diff --git a/crdt-enc/src/utils.rs b/crdt-enc/src/utils.rs index d1daf0a..2d5c554 100644 --- a/crdt-enc/src/utils.rs +++ b/crdt-enc/src/utils.rs @@ -2,9 +2,11 @@ mod version_bytes; pub use version_bytes::*; -use ::crdts::{CmRDT, CvRDT}; -use ::serde::{Deserialize, Serialize}; +use ::anyhow::{Context, Result}; +use ::crdts::{ctx::ReadCtx, CmRDT, CvRDT, MVReg}; +use ::serde::{de::DeserializeOwned, Deserialize, Serialize}; use ::std::convert::Infallible; +use ::uuid::Uuid; #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct EmptyCrdt; @@ -30,3 +32,40 @@ impl CvRDT for EmptyCrdt { fn merge(&mut self, _other: Self) {} } + +pub fn decode_version_bytes_mvreg( + reg: &MVReg, + supported_versions: &[Uuid], +) -> Result> { + let (vals, read_ctx) = reg.read().split(); + let val = vals + .into_iter() + .try_fold(T::default(), |mut acc, vb| -> Result { + vb.ensure_versions(supported_versions)?; + let keys = rmp_serde::from_read_ref(&vb).context("Could not parse msgpack value")?; + acc.merge(keys); + Ok(acc) + }) + .context("Could not process mvreg value")?; + Ok(ReadCtx { + add_clock: read_ctx.add_clock, + rm_clock: read_ctx.rm_clock, + val, + }) +} + +pub fn encode_version_bytes_mvreg( + reg: &mut MVReg, + val: ReadCtx, + actor: Uuid, + version: Uuid, +) -> Result<()> { + let (val, read_ctx) = val.split(); + let vb = VersionBytes::new( + version, + rmp_serde::to_vec_named(&val).context("Could not serialize value to msgpack")?, + ); + let op = reg.write(vb, read_ctx.derive_add_ctx(actor)); + reg.apply(op); + Ok(()) +}