diff --git a/crdt-enc-gpgme/Cargo.toml b/crdt-enc-gpgme/Cargo.toml index c2681ba..1b20692 100644 --- a/crdt-enc-gpgme/Cargo.toml +++ b/crdt-enc-gpgme/Cargo.toml @@ -14,6 +14,7 @@ uuid = "0.8" crdts = "6.2" gpgme = "0.9" dyn-clone = "1" +phf = {version = "0.8", features = ["macros"]} [dependencies.crdt-enc] path = "../crdt-enc" diff --git a/crdt-enc-gpgme/src/lib.rs b/crdt-enc-gpgme/src/lib.rs index c9cceae..46f6721 100644 --- a/crdt-enc-gpgme/src/lib.rs +++ b/crdt-enc-gpgme/src/lib.rs @@ -3,7 +3,8 @@ use ::async_trait::async_trait; use ::crdt_enc::{ key_cryptor::Keys, utils::{ - decode_version_bytes_mvreg_custom, encode_version_bytes_mvreg_custom, LockBox, VersionBytes, + decode_version_bytes_mvreg_custom_phf, encode_version_bytes_mvreg_custom, LockBox, + VersionBytes, }, CoreSubHandle, Info, }; @@ -14,10 +15,10 @@ use ::uuid::Uuid; const CURRENT_VERSION: Uuid = Uuid::from_u128(0xe69cb68e_7fbb_41aa_8d22_87eace7a04c9); -// needs to be sorted! -const SUPPORTED_VERSIONS: &[Uuid] = &[ - Uuid::from_u128(0xe69cb68e_7fbb_41aa_8d22_87eace7a04c9), // current -]; +static SUPPORTED_VERSIONS: phf::Set = phf::phf_set! { + // current + 0x_e69cb68e_7fbb_41aa_8d22_87eace7a04c9_u128, +}; pub fn init() { gpgme::init(); @@ -88,12 +89,15 @@ impl crdt_enc::key_cryptor::KeyCryptor for KeyHandler { Ok((data.remote_meta.clone(), core)) })?; - let keys_ctx = - decode_version_bytes_mvreg_custom(&remote_meta, SUPPORTED_VERSIONS, |buf| async move { + let keys_ctx = decode_version_bytes_mvreg_custom_phf( + &remote_meta, + &SUPPORTED_VERSIONS, + |buf| async move { // TODO: decrypt key Ok(buf) - }) - .await?; + }, + ) + .await?; core.set_keys(keys_ctx).await?; diff --git a/crdt-enc/Cargo.toml b/crdt-enc/Cargo.toml index 48a6732..02c6679 100644 --- a/crdt-enc/Cargo.toml +++ b/crdt-enc/Cargo.toml @@ -15,6 +15,7 @@ thiserror = "1" futures = "0.3" dyn-clone = "1" bytes = "1" +phf = {version = "0.8", features = ["macros"]} [dependencies.uuid] version = "0.8" diff --git a/crdt-enc/src/lib.rs b/crdt-enc/src/lib.rs index 110a906..487503e 100644 --- a/crdt-enc/src/lib.rs +++ b/crdt-enc/src/lib.rs @@ -25,10 +25,10 @@ use ::uuid::Uuid; const CURRENT_VERSION: Uuid = Uuid::from_u128(0xe834d789_101b_4634_9823_9de990a9051f); -// needs to be sorted! -const SUPPORTED_VERSIONS: &[Uuid] = &[ - Uuid::from_u128(0xe834d789_101b_4634_9823_9de990a9051f), // current -]; +static SUPPORTED_VERSIONS: phf::Set = phf::phf_set! { + // current + 0x_e834d789_101b_4634_9823_9de990a9051f_u128, +}; #[async_trait] pub trait CoreSubHandle @@ -254,7 +254,7 @@ where .context("failed getting local meta")?; let local_meta: LocalMeta = match local_meta { Some(local_meta) => { - local_meta.ensure_versions(SUPPORTED_VERSIONS)?; + local_meta.ensure_versions_phf(&SUPPORTED_VERSIONS)?; rmp_serde::from_read_ref(&local_meta)? } None => { @@ -432,7 +432,7 @@ where .map(|(name, state)| { let key = key.clone(); async move { - state.ensure_versions(SUPPORTED_VERSIONS)?; + state.ensure_versions_phf(&SUPPORTED_VERSIONS)?; let clear_text = self .cryptor @@ -497,7 +497,7 @@ where .map(|(actor, version, data)| { let key = key.clone(); async move { - data.ensure_versions(SUPPORTED_VERSIONS)?; + data.ensure_versions_phf(&SUPPORTED_VERSIONS)?; let clear_text = self .cryptor .decrypt(key.key(), data.into_inner()) @@ -575,7 +575,7 @@ where .context("failed loading remote meta while reading remote metas")? .into_iter() .map(|(name, vbox)| { - vbox.ensure_versions(SUPPORTED_VERSIONS)?; + vbox.ensure_versions_phf(&SUPPORTED_VERSIONS)?; let remote_meta: RemoteMeta = rmp_serde::from_read_ref(&vbox)?; diff --git a/crdt-enc/src/utils.rs b/crdt-enc/src/utils.rs index 46edea4..e1b6107 100644 --- a/crdt-enc/src/utils.rs +++ b/crdt-enc/src/utils.rs @@ -55,6 +55,7 @@ pub fn decode_version_bytes_mvreg( }) } +/// `supported_versions` needs to be sorted pub async fn decode_version_bytes_mvreg_custom( reg: &MVReg, supported_versions: &[Uuid], @@ -89,6 +90,40 @@ where }) } +pub async fn decode_version_bytes_mvreg_custom_phf( + reg: &MVReg, + supported_versions: &phf::Set, + mut buf_decode: M, +) -> Result> +where + T: DeserializeOwned + CvRDT + Default, + M: FnMut(Vec) -> Fut, + Fut: Future>>, +{ + let (vals, read_ctx) = reg.read().split(); + let val = stream::iter(vals) + .map(|vb| { + vb.ensure_versions_phf(supported_versions)?; + Ok(vb.into_inner()) + }) + .map_ok(|buf| { + buf_decode(buf).map(|res| res.context("Custom buffer decode function failed")) + }) + .try_buffer_unordered(16) + .try_fold(T::default(), |mut acc, buf| async move { + let keys = rmp_serde::from_read_ref(&buf).context("Could not parse msgpack value")?; + acc.merge(keys); + Ok(acc) + }) + .await + .context("Could not process mvreg value")?; + Ok(ReadCtx { + add_clock: read_ctx.add_clock, + rm_clock: read_ctx.rm_clock, + val, + }) +} + pub fn encode_version_bytes_mvreg( reg: &mut MVReg, val: ReadCtx, diff --git a/crdt-enc/src/utils/version_bytes.rs b/crdt-enc/src/utils/version_bytes.rs index 513bc8e..326b482 100644 --- a/crdt-enc/src/utils/version_bytes.rs +++ b/crdt-enc/src/utils/version_bytes.rs @@ -49,6 +49,10 @@ impl VersionBytes { self.as_version_bytes_ref().ensure_versions(versions) } + pub fn ensure_versions_phf(&self, versions: &phf::Set) -> Result<(), VersionError> { + self.as_version_bytes_ref().ensure_versions_phf(versions) + } + pub fn into_inner(self) -> Vec { self.1 } @@ -128,6 +132,21 @@ impl<'a> VersionBytesRef<'a> { } } + pub fn ensure_versions_phf(&self, versions: &phf::Set) -> Result<(), VersionError> { + if versions.contains(&self.version().as_u128()) { + Ok(()) + } else { + Err(VersionError { + expected: versions + .iter() + .copied() + .map(|v| Uuid::from_u128(v)) + .collect(), + got: self.0, + }) + } + } + pub fn buf(&self) -> VersionBytesBuf<'_> { VersionBytesBuf::new(self.0, &self.1) }