diff --git a/crdt-enc-gpgme/Cargo.toml b/crdt-enc-gpgme/Cargo.toml index 98953b3..3e030cb 100644 --- a/crdt-enc-gpgme/Cargo.toml +++ b/crdt-enc-gpgme/Cargo.toml @@ -7,11 +7,11 @@ edition = "2018" [dependencies] serde = "1" serde_bytes = "0.11" -rmp-serde = "0.14" +rmp-serde = "0.15" async-trait = "0.1" anyhow = "1" uuid = "0.8" -crdts = "4" +crdts = "5" gpgme = "0.9" dyn-clone = "1" diff --git a/crdt-enc-gpgme/src/lib.rs b/crdt-enc-gpgme/src/lib.rs index eba93a3..71529b3 100644 --- a/crdt-enc-gpgme/src/lib.rs +++ b/crdt-enc-gpgme/src/lib.rs @@ -9,7 +9,7 @@ use crdt_enc::{ }; use crdts::{CmRDT, CvRDT, MVReg, Orswot}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use std::{borrow::Cow, fmt::Debug, sync::Mutex as SyncMutex}; +use std::{borrow::Cow, convert::Infallible, fmt::Debug, sync::Mutex as SyncMutex}; use uuid::Uuid; pub fn init() { @@ -48,6 +48,12 @@ pub struct Meta { } impl CvRDT for Meta { + type Validation = Infallible; + + fn validate_merge(&self, _other: &Self) -> Result<(), Infallible> { + Ok(()) + } + fn merge(&mut self, other: Self) { self.key_fps.merge(other.key_fps); } @@ -117,12 +123,9 @@ impl crdt_enc::key_cryptor::KeyCryptor for KeyHandler { let read_ctx = data.remote_meta.read(); - let actor = data.info.as_ref().context("info is none")?.actor(); - let write_ctx = read_ctx.derive_add_ctx(actor); - let mut keys = read_ctx .val - .into_iter() + .iter() .try_fold(Keys::default(), |mut acc, vb| { // TODO: check version // TODO: decrypt key @@ -133,6 +136,9 @@ impl crdt_enc::key_cryptor::KeyCryptor for KeyHandler { keys.merge(new_keys); + let actor = data.info.as_ref().context("info is none")?.actor(); + let write_ctx = read_ctx.derive_add_ctx(actor); + let op = data.remote_meta.write( VersionBytes::new( // TODO diff --git a/crdt-enc-sodium/Cargo.toml b/crdt-enc-sodium/Cargo.toml index 0a19d7a..14f685c 100644 --- a/crdt-enc-sodium/Cargo.toml +++ b/crdt-enc-sodium/Cargo.toml @@ -5,11 +5,11 @@ authors = ["Thomas Heck "] edition = "2018" [dependencies] -crdts = "4" +crdts = "5" sodiumoxide = "0.2" anyhow = "1" serde = "1" -rmp-serde = "0.14" +rmp-serde = "0.15" serde_bytes = "0.11" uuid = "0.8" async-trait = "0.1" diff --git a/crdt-enc-tokio/Cargo.toml b/crdt-enc-tokio/Cargo.toml index cb08a45..58303d4 100644 --- a/crdt-enc-tokio/Cargo.toml +++ b/crdt-enc-tokio/Cargo.toml @@ -5,22 +5,26 @@ authors = ["Thomas Heck "] edition = "2018" [dependencies] -crdts = "4" +crdts = "5" serde = "1" async-trait = "0.1" futures = "0.3" anyhow = "1" uuid = "0.8" data-encoding = "2" -bytes = "0.5" +bytes = "1" [dependencies.tiny-keccak] version = "2" features = ["sha3"] [dependencies.tokio] -version = "0.2" -features = ["fs", "stream", "io-util"] +version = "1" +features = ["fs", "io-util"] + +[dependencies.tokio-stream] +version = "0.1" +features = ["fs"] [dependencies.crdt-enc] path = "../crdt-enc" diff --git a/crdt-enc-tokio/src/lib.rs b/crdt-enc-tokio/src/lib.rs index 75fc539..837590b 100644 --- a/crdt-enc-tokio/src/lib.rs +++ b/crdt-enc-tokio/src/lib.rs @@ -23,6 +23,7 @@ use tokio::{ fs, io::{self, AsyncWrite, AsyncWriteExt}, }; +use tokio_stream::wrappers::ReadDirStream; use uuid::Uuid; #[derive(Debug)] @@ -383,7 +384,7 @@ fn read_dir_optional(path: PathBuf) -> impl Stream> async move { match fs::read_dir(&path).await { Ok(dir) => { - let entry_stream = dir.map(move |entry| { + let entry_stream = ReadDirStream::new(dir).map(move |entry| { entry.with_context(|| format!("failed getting entry from {}", path.display())) }); Ok(Either::Left(entry_stream)) @@ -413,7 +414,7 @@ async fn write_content_addressible_file( let mut digest = Sha3::v256(); let mut buf = bytes.buf(); while buf.has_remaining() { - let b = buf.bytes(); + let b = buf.chunk(); digest.update(b); buf.advance(b.len()); } diff --git a/crdt-enc/Cargo.toml b/crdt-enc/Cargo.toml index 18fcaa1..3d2e3e6 100644 --- a/crdt-enc/Cargo.toml +++ b/crdt-enc/Cargo.toml @@ -5,16 +5,16 @@ authors = ["Thomas Heck "] edition = "2018" [dependencies] -crdts = "4" +crdts = "5" serde = "1" serde_bytes = "0.11" -rmp-serde = "0.14" +rmp-serde = "0.15" async-trait = "0.1" anyhow = "1" thiserror = "1" futures = "0.3" dyn-clone = "1" -bytes = "0.5" +bytes = "1" [dependencies.uuid] version = "0.8" diff --git a/crdt-enc/src/key_cryptor.rs b/crdt-enc/src/key_cryptor.rs index 9c7cac2..9a899b6 100644 --- a/crdt-enc/src/key_cryptor.rs +++ b/crdt-enc/src/key_cryptor.rs @@ -9,6 +9,7 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::{ borrow::Borrow, cmp::{Eq, Ord, Ordering, PartialEq}, + convert::Infallible, fmt::Debug, hash::{Hash, Hasher}, sync::Arc, @@ -42,6 +43,12 @@ pub struct Keys { } impl CvRDT for Keys { + type Validation = Infallible; + + fn validate_merge(&self, _other: &Self) -> Result<(), Infallible> { + Ok(()) + } + fn merge(&mut self, other: Keys) { self.latest_key_id.merge(other.latest_key_id); self.keys.merge(other.keys); diff --git a/crdt-enc/src/lib.rs b/crdt-enc/src/lib.rs index ee36d9b..605a094 100644 --- a/crdt-enc/src/lib.rs +++ b/crdt-enc/src/lib.rs @@ -22,6 +22,7 @@ use futures::{ use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::{ collections::HashSet, + convert::Infallible, default::Default, fmt::Debug, mem, @@ -740,6 +741,12 @@ struct RemoteMeta { } impl CvRDT for RemoteMeta { + type Validation = Infallible; + + fn validate_merge(&self, _other: &Self) -> Result<(), Infallible> { + Ok(()) + } + fn merge(&mut self, other: Self) { self.storage.merge(other.storage); self.cryptor.merge(other.cryptor); diff --git a/crdt-enc/src/utils.rs b/crdt-enc/src/utils.rs index 887c8e8..3da10c2 100644 --- a/crdt-enc/src/utils.rs +++ b/crdt-enc/src/utils.rs @@ -4,6 +4,7 @@ pub use version_bytes::*; use crdts::{CmRDT, CvRDT}; use serde::{Deserialize, Serialize}; +use std::convert::Infallible; #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct EmptyCrdt; @@ -11,9 +12,21 @@ pub struct EmptyCrdt; impl CmRDT for EmptyCrdt { type Op = (); + type Validation = Infallible; + + fn validate_op(&self, op: &Self::Op) -> Result<(), Infallible> { + Ok(()) + } + fn apply(&mut self, _op: Self::Op) {} } impl CvRDT for EmptyCrdt { + type Validation = Infallible; + + fn validate_merge(&self, _other: &Self) -> Result<(), Infallible> { + Ok(()) + } + fn merge(&mut self, _other: Self) {} } diff --git a/crdt-enc/src/utils/version_bytes.rs b/crdt-enc/src/utils/version_bytes.rs index 2cc846a..d8f1a1c 100644 --- a/crdt-enc/src/utils/version_bytes.rs +++ b/crdt-enc/src/utils/version_bytes.rs @@ -226,7 +226,7 @@ impl<'a> ::bytes::Buf for VersionBytesBuf<'a> { BUF_VERSION_LEN_BYTES + self.content.len() - self.pos } - fn bytes(&self) -> &[u8] { + fn chunk(&self) -> &[u8] { if self.pos < BUF_VERSION_LEN_BYTES { &self.version_len[self.pos..] } else { @@ -244,7 +244,7 @@ impl<'a> ::bytes::Buf for VersionBytesBuf<'a> { self.pos += cnt; } - fn bytes_vectored<'b>(&'b self, dst: &mut [IoSlice<'b>]) -> usize { + fn chunks_vectored<'b>(&'b self, dst: &mut [IoSlice<'b>]) -> usize { // TODO: TESTING! if dst.len() == 0 { diff --git a/examples/test/Cargo.toml b/examples/test/Cargo.toml index 4d0236b..b02407a 100644 --- a/examples/test/Cargo.toml +++ b/examples/test/Cargo.toml @@ -10,7 +10,7 @@ crdt-enc-tokio = {path="../../crdt-enc-tokio"} crdt-enc-sodium = {path="../../crdt-enc-sodium"} crdt-enc-gpgme = {path="../../crdt-enc-gpgme"} uuid = {version = "0.8", features = ["serde", "v4"]} -crdts = "4" -tokio = {version = "0.2", features = ["macros"]} +crdts = "5" +tokio = {version = "1", features = ["macros", "rt"]} anyhow = "1" futures = "0.3" diff --git a/examples/test/src/main.rs b/examples/test/src/main.rs index 1f75c5f..ca95c32 100644 --- a/examples/test/src/main.rs +++ b/examples/test/src/main.rs @@ -7,7 +7,7 @@ use uuid::Uuid; const CURRENT_DATA_VERSION: Uuid = Uuid::from_u128(1u128); -#[tokio::main] +#[tokio::main(flavor = "current_thread")] async fn main() -> Result<()> { crdt_enc_sodium::init(); @@ -44,8 +44,8 @@ async fn main() -> Result<()> { let op = repo.with_state(|s: &crdts::MVReg| { let read_ctx = s.read(); - let add_ctx = read_ctx.derive_add_ctx(info.actor()); - let op = s.write(read_ctx.val.into_iter().max().unwrap_or(0) + 1, add_ctx); + let new_val = read_ctx.val.iter().copied().max().unwrap_or(0) + 1; + let op = s.write(new_val, read_ctx.derive_add_ctx(info.actor())); Ok(op) })?;