feat: use phf::Set for supported versions
This commit is contained in:
@@ -14,6 +14,7 @@ uuid = "0.8"
|
|||||||
crdts = "6.2"
|
crdts = "6.2"
|
||||||
gpgme = "0.9"
|
gpgme = "0.9"
|
||||||
dyn-clone = "1"
|
dyn-clone = "1"
|
||||||
|
phf = {version = "0.8", features = ["macros"]}
|
||||||
|
|
||||||
[dependencies.crdt-enc]
|
[dependencies.crdt-enc]
|
||||||
path = "../crdt-enc"
|
path = "../crdt-enc"
|
||||||
|
|||||||
@@ -3,7 +3,8 @@ use ::async_trait::async_trait;
|
|||||||
use ::crdt_enc::{
|
use ::crdt_enc::{
|
||||||
key_cryptor::Keys,
|
key_cryptor::Keys,
|
||||||
utils::{
|
utils::{
|
||||||
decode_version_bytes_mvreg_custom, encode_version_bytes_mvreg_custom, LockBox, VersionBytes,
|
decode_version_bytes_mvreg_custom_phf, encode_version_bytes_mvreg_custom, LockBox,
|
||||||
|
VersionBytes,
|
||||||
},
|
},
|
||||||
CoreSubHandle, Info,
|
CoreSubHandle, Info,
|
||||||
};
|
};
|
||||||
@@ -14,10 +15,10 @@ use ::uuid::Uuid;
|
|||||||
|
|
||||||
const CURRENT_VERSION: Uuid = Uuid::from_u128(0xe69cb68e_7fbb_41aa_8d22_87eace7a04c9);
|
const CURRENT_VERSION: Uuid = Uuid::from_u128(0xe69cb68e_7fbb_41aa_8d22_87eace7a04c9);
|
||||||
|
|
||||||
// needs to be sorted!
|
static SUPPORTED_VERSIONS: phf::Set<u128> = phf::phf_set! {
|
||||||
const SUPPORTED_VERSIONS: &[Uuid] = &[
|
// current
|
||||||
Uuid::from_u128(0xe69cb68e_7fbb_41aa_8d22_87eace7a04c9), // current
|
0x_e69cb68e_7fbb_41aa_8d22_87eace7a04c9_u128,
|
||||||
];
|
};
|
||||||
|
|
||||||
pub fn init() {
|
pub fn init() {
|
||||||
gpgme::init();
|
gpgme::init();
|
||||||
@@ -88,11 +89,14 @@ impl crdt_enc::key_cryptor::KeyCryptor for KeyHandler {
|
|||||||
Ok((data.remote_meta.clone(), core))
|
Ok((data.remote_meta.clone(), core))
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
let keys_ctx =
|
let keys_ctx = decode_version_bytes_mvreg_custom_phf(
|
||||||
decode_version_bytes_mvreg_custom(&remote_meta, SUPPORTED_VERSIONS, |buf| async move {
|
&remote_meta,
|
||||||
|
&SUPPORTED_VERSIONS,
|
||||||
|
|buf| async move {
|
||||||
// TODO: decrypt key
|
// TODO: decrypt key
|
||||||
Ok(buf)
|
Ok(buf)
|
||||||
})
|
},
|
||||||
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
core.set_keys(keys_ctx).await?;
|
core.set_keys(keys_ctx).await?;
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ thiserror = "1"
|
|||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
dyn-clone = "1"
|
dyn-clone = "1"
|
||||||
bytes = "1"
|
bytes = "1"
|
||||||
|
phf = {version = "0.8", features = ["macros"]}
|
||||||
|
|
||||||
[dependencies.uuid]
|
[dependencies.uuid]
|
||||||
version = "0.8"
|
version = "0.8"
|
||||||
|
|||||||
@@ -25,10 +25,10 @@ use ::uuid::Uuid;
|
|||||||
|
|
||||||
const CURRENT_VERSION: Uuid = Uuid::from_u128(0xe834d789_101b_4634_9823_9de990a9051f);
|
const CURRENT_VERSION: Uuid = Uuid::from_u128(0xe834d789_101b_4634_9823_9de990a9051f);
|
||||||
|
|
||||||
// needs to be sorted!
|
static SUPPORTED_VERSIONS: phf::Set<u128> = phf::phf_set! {
|
||||||
const SUPPORTED_VERSIONS: &[Uuid] = &[
|
// current
|
||||||
Uuid::from_u128(0xe834d789_101b_4634_9823_9de990a9051f), // current
|
0x_e834d789_101b_4634_9823_9de990a9051f_u128,
|
||||||
];
|
};
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub trait CoreSubHandle
|
pub trait CoreSubHandle
|
||||||
@@ -254,7 +254,7 @@ where
|
|||||||
.context("failed getting local meta")?;
|
.context("failed getting local meta")?;
|
||||||
let local_meta: LocalMeta = match local_meta {
|
let local_meta: LocalMeta = match local_meta {
|
||||||
Some(local_meta) => {
|
Some(local_meta) => {
|
||||||
local_meta.ensure_versions(SUPPORTED_VERSIONS)?;
|
local_meta.ensure_versions_phf(&SUPPORTED_VERSIONS)?;
|
||||||
rmp_serde::from_read_ref(&local_meta)?
|
rmp_serde::from_read_ref(&local_meta)?
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
@@ -432,7 +432,7 @@ where
|
|||||||
.map(|(name, state)| {
|
.map(|(name, state)| {
|
||||||
let key = key.clone();
|
let key = key.clone();
|
||||||
async move {
|
async move {
|
||||||
state.ensure_versions(SUPPORTED_VERSIONS)?;
|
state.ensure_versions_phf(&SUPPORTED_VERSIONS)?;
|
||||||
|
|
||||||
let clear_text = self
|
let clear_text = self
|
||||||
.cryptor
|
.cryptor
|
||||||
@@ -497,7 +497,7 @@ where
|
|||||||
.map(|(actor, version, data)| {
|
.map(|(actor, version, data)| {
|
||||||
let key = key.clone();
|
let key = key.clone();
|
||||||
async move {
|
async move {
|
||||||
data.ensure_versions(SUPPORTED_VERSIONS)?;
|
data.ensure_versions_phf(&SUPPORTED_VERSIONS)?;
|
||||||
let clear_text = self
|
let clear_text = self
|
||||||
.cryptor
|
.cryptor
|
||||||
.decrypt(key.key(), data.into_inner())
|
.decrypt(key.key(), data.into_inner())
|
||||||
@@ -575,7 +575,7 @@ where
|
|||||||
.context("failed loading remote meta while reading remote metas")?
|
.context("failed loading remote meta while reading remote metas")?
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(name, vbox)| {
|
.map(|(name, vbox)| {
|
||||||
vbox.ensure_versions(SUPPORTED_VERSIONS)?;
|
vbox.ensure_versions_phf(&SUPPORTED_VERSIONS)?;
|
||||||
|
|
||||||
let remote_meta: RemoteMeta = rmp_serde::from_read_ref(&vbox)?;
|
let remote_meta: RemoteMeta = rmp_serde::from_read_ref(&vbox)?;
|
||||||
|
|
||||||
|
|||||||
@@ -55,6 +55,7 @@ pub fn decode_version_bytes_mvreg<T: DeserializeOwned + CvRDT + Default>(
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// `supported_versions` needs to be sorted
|
||||||
pub async fn decode_version_bytes_mvreg_custom<T, M, Fut>(
|
pub async fn decode_version_bytes_mvreg_custom<T, M, Fut>(
|
||||||
reg: &MVReg<VersionBytes, Uuid>,
|
reg: &MVReg<VersionBytes, Uuid>,
|
||||||
supported_versions: &[Uuid],
|
supported_versions: &[Uuid],
|
||||||
@@ -89,6 +90,40 @@ where
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn decode_version_bytes_mvreg_custom_phf<T, M, Fut>(
|
||||||
|
reg: &MVReg<VersionBytes, Uuid>,
|
||||||
|
supported_versions: &phf::Set<u128>,
|
||||||
|
mut buf_decode: M,
|
||||||
|
) -> Result<ReadCtx<T, Uuid>>
|
||||||
|
where
|
||||||
|
T: DeserializeOwned + CvRDT + Default,
|
||||||
|
M: FnMut(Vec<u8>) -> Fut,
|
||||||
|
Fut: Future<Output = Result<Vec<u8>>>,
|
||||||
|
{
|
||||||
|
let (vals, read_ctx) = reg.read().split();
|
||||||
|
let val = stream::iter(vals)
|
||||||
|
.map(|vb| {
|
||||||
|
vb.ensure_versions_phf(supported_versions)?;
|
||||||
|
Ok(vb.into_inner())
|
||||||
|
})
|
||||||
|
.map_ok(|buf| {
|
||||||
|
buf_decode(buf).map(|res| res.context("Custom buffer decode function failed"))
|
||||||
|
})
|
||||||
|
.try_buffer_unordered(16)
|
||||||
|
.try_fold(T::default(), |mut acc, buf| async move {
|
||||||
|
let keys = rmp_serde::from_read_ref(&buf).context("Could not parse msgpack value")?;
|
||||||
|
acc.merge(keys);
|
||||||
|
Ok(acc)
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.context("Could not process mvreg value")?;
|
||||||
|
Ok(ReadCtx {
|
||||||
|
add_clock: read_ctx.add_clock,
|
||||||
|
rm_clock: read_ctx.rm_clock,
|
||||||
|
val,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
pub fn encode_version_bytes_mvreg<T: Serialize>(
|
pub fn encode_version_bytes_mvreg<T: Serialize>(
|
||||||
reg: &mut MVReg<VersionBytes, Uuid>,
|
reg: &mut MVReg<VersionBytes, Uuid>,
|
||||||
val: ReadCtx<T, Uuid>,
|
val: ReadCtx<T, Uuid>,
|
||||||
|
|||||||
@@ -49,6 +49,10 @@ impl VersionBytes {
|
|||||||
self.as_version_bytes_ref().ensure_versions(versions)
|
self.as_version_bytes_ref().ensure_versions(versions)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn ensure_versions_phf(&self, versions: &phf::Set<u128>) -> Result<(), VersionError> {
|
||||||
|
self.as_version_bytes_ref().ensure_versions_phf(versions)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn into_inner(self) -> Vec<u8> {
|
pub fn into_inner(self) -> Vec<u8> {
|
||||||
self.1
|
self.1
|
||||||
}
|
}
|
||||||
@@ -128,6 +132,21 @@ impl<'a> VersionBytesRef<'a> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn ensure_versions_phf(&self, versions: &phf::Set<u128>) -> Result<(), VersionError> {
|
||||||
|
if versions.contains(&self.version().as_u128()) {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(VersionError {
|
||||||
|
expected: versions
|
||||||
|
.iter()
|
||||||
|
.copied()
|
||||||
|
.map(|v| Uuid::from_u128(v))
|
||||||
|
.collect(),
|
||||||
|
got: self.0,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn buf(&self) -> VersionBytesBuf<'_> {
|
pub fn buf(&self) -> VersionBytesBuf<'_> {
|
||||||
VersionBytesBuf::new(self.0, &self.1)
|
VersionBytesBuf::new(self.0, &self.1)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user