From c4d0f3acc14b72cae0816a7e4abc9117da9f5be9 Mon Sep 17 00:00:00 2001 From: Thomas Heck Date: Sun, 7 Mar 2021 14:46:38 +0100 Subject: [PATCH] feat(core): add custom mvreg decoding/encoding util fns --- crdt-enc/src/utils.rs | 65 +++++++++++++++++++++++++++-- crdt-enc/src/utils/version_bytes.rs | 4 ++ 2 files changed, 65 insertions(+), 4 deletions(-) diff --git a/crdt-enc/src/utils.rs b/crdt-enc/src/utils.rs index 2d5c554..5a357a0 100644 --- a/crdt-enc/src/utils.rs +++ b/crdt-enc/src/utils.rs @@ -1,9 +1,11 @@ mod version_bytes; +use futures::TryFutureExt; pub use version_bytes::*; use ::anyhow::{Context, Result}; use ::crdts::{ctx::ReadCtx, CmRDT, CvRDT, MVReg}; +use ::futures::{stream, Future, FutureExt, StreamExt, TryStreamExt}; use ::serde::{de::DeserializeOwned, Deserialize, Serialize}; use ::std::convert::Infallible; use ::uuid::Uuid; @@ -54,6 +56,40 @@ pub fn decode_version_bytes_mvreg( }) } +pub async fn decode_version_bytes_mvreg_custom( + reg: &MVReg, + supported_versions: &[Uuid], + mut buf_decode: M, +) -> Result> +where + T: DeserializeOwned + CvRDT + Default, + M: FnMut(Vec) -> Fut, + Fut: Future>>, +{ + let (vals, read_ctx) = reg.read().split(); + let val = stream::iter(vals) + .map(|vb| { + vb.ensure_versions(supported_versions)?; + Ok(vb.into_inner()) + }) + .map_ok(|buf| { + buf_decode(buf).map(|res| res.context("Custom buffer decode function failed")) + }) + .try_buffer_unordered(16) + .try_fold(T::default(), |mut acc, buf| async move { + let keys = rmp_serde::from_read_ref(&buf).context("Could not parse msgpack value")?; + acc.merge(keys); + Ok(acc) + }) + .await + .context("Could not process mvreg value")?; + Ok(ReadCtx { + add_clock: read_ctx.add_clock, + rm_clock: read_ctx.rm_clock, + val, + }) +} + pub fn encode_version_bytes_mvreg( reg: &mut MVReg, val: ReadCtx, @@ -61,10 +97,31 @@ pub fn encode_version_bytes_mvreg( version: Uuid, ) -> Result<()> { let (val, read_ctx) = val.split(); - let vb = VersionBytes::new( - version, - rmp_serde::to_vec_named(&val).context("Could not serialize value to msgpack")?, - ); + let buf = rmp_serde::to_vec_named(&val).context("Could not serialize value to msgpack")?; + let vb = VersionBytes::new(version, buf); + let op = reg.write(vb, read_ctx.derive_add_ctx(actor)); + reg.apply(op); + Ok(()) +} + +pub async fn encode_version_bytes_mvreg_custom( + reg: &mut MVReg, + val: ReadCtx, + actor: Uuid, + version: Uuid, + mut buf_encode: M, +) -> Result<()> +where + T: Serialize, + M: FnMut(Vec) -> Fut, + Fut: Future>>, +{ + let (val, read_ctx) = val.split(); + let buf = rmp_serde::to_vec_named(&val).context("Could not serialize value to msgpack")?; + let buf = buf_encode(buf) + .await + .context("Custom buffer encode function failed")?; + let vb = VersionBytes::new(version, buf); let op = reg.write(vb, read_ctx.derive_add_ctx(actor)); reg.apply(op); Ok(()) diff --git a/crdt-enc/src/utils/version_bytes.rs b/crdt-enc/src/utils/version_bytes.rs index c118067..91319f2 100644 --- a/crdt-enc/src/utils/version_bytes.rs +++ b/crdt-enc/src/utils/version_bytes.rs @@ -78,6 +78,10 @@ impl VersionBytes { pub fn to_vec(&self) -> Vec { self.as_version_bytes_ref().to_vec() } + + pub fn into_inner(self) -> Vec { + self.1 + } } impl From for Vec {