feat(core): add custom mvreg decoding/encoding util fns

This commit is contained in:
2021-03-07 14:46:38 +01:00
parent e59748951b
commit c4d0f3acc1
2 changed files with 65 additions and 4 deletions

View File

@@ -1,9 +1,11 @@
mod version_bytes; mod version_bytes;
use futures::TryFutureExt;
pub use version_bytes::*; pub use version_bytes::*;
use ::anyhow::{Context, Result}; use ::anyhow::{Context, Result};
use ::crdts::{ctx::ReadCtx, CmRDT, CvRDT, MVReg}; use ::crdts::{ctx::ReadCtx, CmRDT, CvRDT, MVReg};
use ::futures::{stream, Future, FutureExt, StreamExt, TryStreamExt};
use ::serde::{de::DeserializeOwned, Deserialize, Serialize}; use ::serde::{de::DeserializeOwned, Deserialize, Serialize};
use ::std::convert::Infallible; use ::std::convert::Infallible;
use ::uuid::Uuid; use ::uuid::Uuid;
@@ -54,6 +56,40 @@ pub fn decode_version_bytes_mvreg<T: DeserializeOwned + CvRDT + Default>(
}) })
} }
pub async fn decode_version_bytes_mvreg_custom<T, M, Fut>(
reg: &MVReg<VersionBytes, Uuid>,
supported_versions: &[Uuid],
mut buf_decode: M,
) -> Result<ReadCtx<T, Uuid>>
where
T: DeserializeOwned + CvRDT + Default,
M: FnMut(Vec<u8>) -> Fut,
Fut: Future<Output = Result<Vec<u8>>>,
{
let (vals, read_ctx) = reg.read().split();
let val = stream::iter(vals)
.map(|vb| {
vb.ensure_versions(supported_versions)?;
Ok(vb.into_inner())
})
.map_ok(|buf| {
buf_decode(buf).map(|res| res.context("Custom buffer decode function failed"))
})
.try_buffer_unordered(16)
.try_fold(T::default(), |mut acc, buf| async move {
let keys = rmp_serde::from_read_ref(&buf).context("Could not parse msgpack value")?;
acc.merge(keys);
Ok(acc)
})
.await
.context("Could not process mvreg value")?;
Ok(ReadCtx {
add_clock: read_ctx.add_clock,
rm_clock: read_ctx.rm_clock,
val,
})
}
pub fn encode_version_bytes_mvreg<T: Serialize>( pub fn encode_version_bytes_mvreg<T: Serialize>(
reg: &mut MVReg<VersionBytes, Uuid>, reg: &mut MVReg<VersionBytes, Uuid>,
val: ReadCtx<T, Uuid>, val: ReadCtx<T, Uuid>,
@@ -61,10 +97,31 @@ pub fn encode_version_bytes_mvreg<T: Serialize>(
version: Uuid, version: Uuid,
) -> Result<()> { ) -> Result<()> {
let (val, read_ctx) = val.split(); let (val, read_ctx) = val.split();
let vb = VersionBytes::new( let buf = rmp_serde::to_vec_named(&val).context("Could not serialize value to msgpack")?;
version, let vb = VersionBytes::new(version, buf);
rmp_serde::to_vec_named(&val).context("Could not serialize value to msgpack")?, let op = reg.write(vb, read_ctx.derive_add_ctx(actor));
); reg.apply(op);
Ok(())
}
pub async fn encode_version_bytes_mvreg_custom<T, M, Fut>(
reg: &mut MVReg<VersionBytes, Uuid>,
val: ReadCtx<T, Uuid>,
actor: Uuid,
version: Uuid,
mut buf_encode: M,
) -> Result<()>
where
T: Serialize,
M: FnMut(Vec<u8>) -> Fut,
Fut: Future<Output = Result<Vec<u8>>>,
{
let (val, read_ctx) = val.split();
let buf = rmp_serde::to_vec_named(&val).context("Could not serialize value to msgpack")?;
let buf = buf_encode(buf)
.await
.context("Custom buffer encode function failed")?;
let vb = VersionBytes::new(version, buf);
let op = reg.write(vb, read_ctx.derive_add_ctx(actor)); let op = reg.write(vb, read_ctx.derive_add_ctx(actor));
reg.apply(op); reg.apply(op);
Ok(()) Ok(())

View File

@@ -78,6 +78,10 @@ impl VersionBytes {
pub fn to_vec(&self) -> Vec<u8> { pub fn to_vec(&self) -> Vec<u8> {
self.as_version_bytes_ref().to_vec() self.as_version_bytes_ref().to_vec()
} }
pub fn into_inner(self) -> Vec<u8> {
self.1
}
} }
impl From<VersionBytes> for Vec<u8> { impl From<VersionBytes> for Vec<u8> {