commit 6ce29d99fbb1d667455def5d9d63700406a55e1e Author: Thomas Heck Date: Thu Dec 24 11:25:50 2020 +0100 import from pijul repo / WIP diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..f9366fa --- /dev/null +++ b/.editorconfig @@ -0,0 +1,8 @@ +root = true + +[*] +indent_style = space +indent_size = 4 +charset = utf-8 +trim_trailing_whitespace = true +insert_final_newline = true diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e9e2199 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target/ +/Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..05fb40e --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,9 @@ +[workspace] + +members = [ + "crdt-enc", + "crdt-enc-gpgme", + "crdt-enc-sodium", + "crdt-enc-tokio", + "examples/*", +] diff --git a/README.md b/README.md new file mode 100644 index 0000000..221c6d8 --- /dev/null +++ b/README.md @@ -0,0 +1,24 @@ +# Encrypted CRDTs + +Main goal: make an encrypted layer that can be used to store CRDTs on +something like *syncthing*. + +## tech + +Changes are stored as CRDT-Ops or full-States. Resulting files are immutable (files are written only +once and never changed after that; but can be deleted) and content-addressable (the name of the file +is the hash of its content). + +### compaction + +Before writing out a change the device can decide to do a compaction on the data by writing out +a full-state and removing all merged state and applied op files. + +### header + +* resembles LUKS (the actual encryption key used for the data isn't derived from the password(s)) + * no need to re-encrypt all files after key change (but can be re-encrypted if needed. 
+ this requires a compaction and the old encryption key needs to be stored to be able to apply ops + coming from other devices that are still using the old key) + * allows password management / multiple passwords +* store header as full-state CRDT diff --git a/crdt-enc-gpgme/Cargo.toml b/crdt-enc-gpgme/Cargo.toml new file mode 100644 index 0000000..98953b3 --- /dev/null +++ b/crdt-enc-gpgme/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "crdt-enc-gpgme" +version = "0.1.0" +authors = ["Thomas Heck "] +edition = "2018" + +[dependencies] +serde = "1" +serde_bytes = "0.11" +rmp-serde = "0.14" +async-trait = "0.1" +anyhow = "1" +uuid = "0.8" +crdts = "4" +gpgme = "0.9" +dyn-clone = "1" + +[dependencies.crdt-enc] +path = "../crdt-enc" diff --git a/crdt-enc-gpgme/src/lib.rs b/crdt-enc-gpgme/src/lib.rs new file mode 100644 index 0000000..eba93a3 --- /dev/null +++ b/crdt-enc-gpgme/src/lib.rs @@ -0,0 +1,200 @@ +use anyhow::{Context, Error, Result}; +use async_trait::async_trait; +use crdt_enc::{ + cryptor::Cryptor, + key_cryptor::Keys, + storage::Storage, + utils::{VersionBytes, VersionBytesRef}, + CoreSubHandle, Info, +}; +use crdts::{CmRDT, CvRDT, MVReg, Orswot}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use std::{borrow::Cow, fmt::Debug, sync::Mutex as SyncMutex}; +use uuid::Uuid; + +pub fn init() { + gpgme::init(); +} + +#[derive(Debug)] +struct MutData { + info: Option, + core: Option>, + remote_meta: MVReg, +} + +#[derive(Debug)] +pub struct KeyHandler { + data: SyncMutex, +} + +impl KeyHandler { + pub fn new() -> KeyHandler { + let data = MutData { + info: None, + core: None, + remote_meta: MVReg::new(), + }; + + KeyHandler { + data: SyncMutex::new(data), + } + } +} + +#[derive(Debug, Default, Serialize, Deserialize, Clone)] +pub struct Meta { + key_fps: Orswot, +} + +impl CvRDT for Meta { + fn merge(&mut self, other: Self) { + self.key_fps.merge(other.key_fps); + } +} + +#[async_trait] +impl crdt_enc::key_cryptor::KeyCryptor for KeyHandler { 
async fn init(&self, core: &dyn CoreSubHandle) -> Result<()> { + let mut data = self + .data + .lock() + .map_err(|err| Error::msg(err.to_string()))?; + data.core = Some(dyn_clone::clone_box(core)); + Ok(()) + } + + async fn set_info(&self, info: &Info) -> Result<()> { + let mut data = self + .data + .lock() + .map_err(|err| Error::msg(err.to_string()))?; + data.info = Some(info.clone()); + Ok(()) + } + + async fn set_remote_meta( + &self, + new_remote_meta: Option>, + ) -> Result<()> { + let (keys, core) = { + let mut data = self + .data + .lock() + .map_err(|err| Error::msg(err.to_string()))?; + + if let Some(new_remote_meta) = new_remote_meta { + data.remote_meta.merge(new_remote_meta); + } + + let keys = data.remote_meta.read().val.into_iter().try_fold( + Keys::default(), + |mut acc, vb| { + // TODO: check version + // TODO: decrypt key + let keys = rmp_serde::from_read_ref(&vb).context("")?; + acc.merge(keys); + Result::<_, Error>::Ok(acc) + }, + )?; + + let core = dyn_clone::clone_box(&**data.core.as_ref().context("core is none")?); + + (keys, core) + }; + + core.set_keys(keys).await?; + + Ok(()) + } + + async fn set_keys(&self, new_keys: Keys) -> Result<()> { + let (rm, core) = { + let mut data = self + .data + .lock() + .map_err(|err| Error::msg(err.to_string()))?; + + let read_ctx = data.remote_meta.read(); + + let actor = data.info.as_ref().context("info is none")?.actor(); + let write_ctx = read_ctx.derive_add_ctx(actor); + + let mut keys = read_ctx + .val + .into_iter() + .try_fold(Keys::default(), |mut acc, vb| { + // TODO: check version + // TODO: decrypt key + let keys = rmp_serde::from_read_ref(&vb).context("")?; + acc.merge(keys); + Result::<_, Error>::Ok(acc) + })?; + + keys.merge(new_keys); + + let op = data.remote_meta.write( + VersionBytes::new( + // TODO + Uuid::nil(), + rmp_serde::to_vec_named(&keys)?, + ), + write_ctx, + ); + data.remote_meta.apply(op); + + let core = dyn_clone::clone_box(&**data.core.as_ref().context("core is none")?); + + 
(data.remote_meta.clone(), core) + }; + + core.set_remote_meta_key_cryptor(rm).await?; + + Ok(()) + } + + // encrypt: + // let mut pgp_ctx = gpgme::Context::from_protocol(gpgme::Protocol::OpenPgp) + // .context("gpgme init fail TODO")?; + + // let recp_pgp_keys = meta + // .key_fps + // .read() + // .val + // .into_iter() + // .map(|fp| pgp_ctx.get_key(fp.as_ref()).context("TODO gpgme get key")) + // .collect::>>()?; + + // let meta_keys = MetaKeys { + // meta: meta.clone(), + // keys: Cow::Borrowed(keys), + // }; + + // let meta_keys = rmp_serde::to_vec_named(&meta_keys).context("")?; + + // let mut enc = Vec::new(); + + // // TODO: check enc_res + // let _enc_res = pgp_ctx + // .encrypt(&recp_pgp_keys, &meta_keys, &mut enc) + // .context("TODO gpgme enc")?; + // } + + // async fn decrypt(&self) -> Result { + // // let mut pgp_ctx = gpgme::Context::from_protocol(gpgme::Protocol::OpenPgp) + // // .context("gpgme init fail TODO")?; + + // // let mut clear_text = Vec::new(); + + // // // TODO: check dec_res + // // let _dec_res = pgp_ctx + // // .decrypt(enc_meta_keys, &mut clear_text) + // // .context("TODO gpgme dec")?; + + // // let meta_keys: MetaKeys = rmp_serde::from_read_ref(&clear_text).context("")?; + + // // Ok((meta_keys.meta, meta_keys.keys.into())) + + // Ok(Keys::default()) + // } +} diff --git a/crdt-enc-sodium/Cargo.toml b/crdt-enc-sodium/Cargo.toml new file mode 100644 index 0000000..0a19d7a --- /dev/null +++ b/crdt-enc-sodium/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "crdt-enc-sodium" +version = "0.1.0" +authors = ["Thomas Heck "] +edition = "2018" + +[dependencies] +crdts = "4" +sodiumoxide = "0.2" +anyhow = "1" +serde = "1" +rmp-serde = "0.14" +serde_bytes = "0.11" +uuid = "0.8" +async-trait = "0.1" + +[dependencies.crdt-enc] +path = "../crdt-enc" diff --git a/crdt-enc-sodium/src/lib.rs b/crdt-enc-sodium/src/lib.rs new file mode 100644 index 0000000..76c289e --- /dev/null +++ b/crdt-enc-sodium/src/lib.rs @@ -0,0 +1,85 @@ +use 
anyhow::{Context, Error, Result}; +use async_trait::async_trait; +use crdt_enc::{ + key_cryptor::KeyCryptor, + storage::Storage, + utils::{VersionBytes, VersionBytesRef}, +}; +use crdts::{CmRDT, CvRDT}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use sodiumoxide::crypto::secretbox; +use std::{borrow::Cow, fmt::Debug}; +use uuid::Uuid; + +// c7f269be-0ff5-4a77-99c3-7c23c96d5cb4 +const DATA_VERSION: Uuid = Uuid::from_u128(0xc7f269be0ff54a7799c37c23c96d5cb4); + +// 5df28591-439a-4cef-8ca6-8433276cc9ed +const KEY_VERSION: Uuid = Uuid::from_u128(0x5df28591439a4cef8ca68433276cc9ed); + +pub fn init() { + sodiumoxide::init().expect("sodium init failed"); +} + +#[derive(Debug)] +pub struct EncHandler; + +impl EncHandler { + pub fn new() -> EncHandler { + EncHandler + } +} + +#[async_trait] +impl crdt_enc::cryptor::Cryptor for EncHandler { + async fn gen_key(&self) -> Result { + let key = secretbox::gen_key(); + Ok(VersionBytes::new(KEY_VERSION, key.as_ref().into())) + } + + async fn encrypt(&self, key: VersionBytesRef<'_>, clear_text: &[u8]) -> Result> { + key.ensure_version(KEY_VERSION) + .context("not matching key version")?; + let key = secretbox::Key::from_slice(key.as_ref()).context("invalid key length")?; + + let nonce = secretbox::gen_nonce(); + let enc_data = secretbox::seal(clear_text, &nonce, &key); + let enc_box = EncBox { + nonce, + enc_data: enc_data.into(), + }; + let enc_box_bytes = + rmp_serde::to_vec_named(&enc_box).context("failed to encode encryption box")?; + let version_box = VersionBytesRef::new(DATA_VERSION, enc_box_bytes.as_ref()); + let version_box_bytes = + rmp_serde::to_vec_named(&version_box).context("failed to encode version box")?; + Ok(version_box_bytes) + } + + async fn decrypt(&self, key: VersionBytesRef<'_>, enc_data: &[u8]) -> Result> { + key.ensure_version(KEY_VERSION) + .context("not matching key version")?; + let key = secretbox::Key::from_slice(key.as_ref()).context("invalid key length")?; + + let version_box: 
VersionBytesRef = + rmp_serde::from_read_ref(enc_data).context("failed to parse version box")?; + version_box + .ensure_version(DATA_VERSION) + .context("not matching version of encryption box")?; + + let enc_box: EncBox = rmp_serde::from_read_ref(version_box.as_ref()) + .context("failed to parse encryption box")?; + let clear_text = secretbox::open(&enc_box.enc_data, &enc_box.nonce, &key) + .map_err(|_| Error::msg("failed decrypting data"))?; + Ok(clear_text) + } +} + +#[derive(Serialize, Deserialize, Debug)] +struct EncBox<'a> { + nonce: secretbox::Nonce, + + #[serde(borrow)] + #[serde(with = "serde_bytes")] + enc_data: Cow<'a, [u8]>, +} diff --git a/crdt-enc-tokio/Cargo.toml b/crdt-enc-tokio/Cargo.toml new file mode 100644 index 0000000..cb08a45 --- /dev/null +++ b/crdt-enc-tokio/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "crdt-enc-tokio" +version = "0.1.0" +authors = ["Thomas Heck "] +edition = "2018" + +[dependencies] +crdts = "4" +serde = "1" +async-trait = "0.1" +futures = "0.3" +anyhow = "1" +uuid = "0.8" +data-encoding = "2" +bytes = "0.5" + +[dependencies.tiny-keccak] +version = "2" +features = ["sha3"] + +[dependencies.tokio] +version = "0.2" +features = ["fs", "stream", "io-util"] + +[dependencies.crdt-enc] +path = "../crdt-enc" diff --git a/crdt-enc-tokio/src/lib.rs b/crdt-enc-tokio/src/lib.rs new file mode 100644 index 0000000..75fc539 --- /dev/null +++ b/crdt-enc-tokio/src/lib.rs @@ -0,0 +1,445 @@ +use ::bytes::Buf; +use anyhow::{ensure, Context, Error, Result}; +use async_trait::async_trait; +use crdt_enc::{ + cryptor::Cryptor, + key_cryptor::KeyCryptor, + utils::{VersionBytes, VersionBytesRef}, +}; +use crdts::{CmRDT, CvRDT}; +use futures::{ + future::{Either, TryFutureExt}, + stream::{self, Stream, StreamExt, TryStreamExt}, +}; +use serde::{de::DeserializeOwned, Serialize}; +use std::{ + convert::TryFrom, + fmt::{Debug, Write}, + path::{Path, PathBuf}, + str::FromStr, +}; +use tiny_keccak::{Hasher, Sha3}; +use tokio::{ + fs, + io::{self, 
AsyncWrite, AsyncWriteExt}, +}; +use uuid::Uuid; + +#[derive(Debug)] +pub struct Storage { + local_path: PathBuf, + remote_path: PathBuf, +} + +impl Storage { + pub fn new(local_path: PathBuf, remote_path: PathBuf) -> Result { + ensure!( + local_path.is_absolute(), + "local path {} is not absolute", + local_path.display() + ); + ensure!( + remote_path.is_absolute(), + "remote path {} is not absolute", + remote_path.display() + ); + + Ok(Storage { + local_path, + remote_path, + }) + } +} + +#[async_trait] +impl crdt_enc::storage::Storage for Storage { + async fn load_local_meta(&self) -> Result> { + let path = self.local_path.join("meta-data.msgpack"); + let bytes = read_file_optional(&path) + .await + .with_context(|| format!("failed reading local meta file {}", path.display()))?; + bytes + .map(|bytes| { + let lm = VersionBytes::try_from(bytes.as_ref()).with_context(|| { + format!("failed parsing local meta file {}", path.display()) + })?; + Ok(lm) + }) + .transpose() + } + + async fn store_local_meta(&self, meta: VersionBytes) -> Result<()> { + fs::create_dir_all(&self.local_path) + .await + .with_context(|| format!("failed creating local dir {:?}", self.local_path))?; + + let path = self.local_path.join("meta-data.msgpack"); + // TODO: catch concurrent writes, locking? 
+ write_file(&path, meta.buf()) + .await + .with_context(|| format!("failed writing local meta file {:?}", path))?; + Ok(()) + } + + async fn list_remote_meta_names(&self) -> Result> { + let meta_dir = self.remote_path.join("meta"); + read_dir_optional_files(meta_dir) + .map_err(|err| err.context("failed listing remote meta entries")) + .and_then(|entry| async move { + let name = entry.file_name().into_string().ok().with_context(|| { + format!( + "failed converting remote meta entry name to string for {}", + entry.path().display() + ) + })?; + Ok(name) + }) + .try_collect() + .await + } + + async fn load_remote_metas(&self, names: Vec) -> Result> { + let futs = names.into_iter().map(|name| { + let mut path = self.remote_path.join("meta"); + path.push(&name); + let path = path; + + async move { + let bytes = fs::read(&path).await.with_context(|| { + format!("failed reading remote meta file {}", path.display()) + })?; + let rm = VersionBytes::try_from(bytes.as_ref()).with_context(|| { + format!("failed parsing remote meta file {}", path.display()) + })?; + Ok((name, rm)) + } + }); + + stream::iter(futs).buffer_unordered(32).try_collect().await + } + + async fn store_remote_meta(&self, meta: VersionBytes) -> Result { + let meta_dir = self.remote_path.join("meta"); + write_content_addressible_file(&meta_dir, &meta.as_version_bytes_ref()) + .await + .context("failed writing remote meta file") + } + + async fn remove_remote_metas(&self, names: Vec) -> Result<()> { + let futs = names.into_iter().map(|name| { + let mut path = self.remote_path.join("meta"); + path.push(&name); + let path = path; + + async move { + remove_file_optional(&path) + .await + .with_context(|| format!("failed removing remote meta file {}", name)) + } + }); + + stream::iter(futs).buffer_unordered(32).try_collect().await + } + + async fn list_state_names(&self) -> Result> { + let states_dir = self.remote_path.join("states"); + read_dir_optional_files(states_dir) + .map_err(|err| err.context("failed 
listing states")) + .and_then(|entry| async move { + let name = entry.file_name().into_string().ok().with_context(|| { + format!( + "failed converting state name to string for state file {}", + entry.path().display() + ) + })?; + Ok(name) + }) + .try_collect() + .await + } + + async fn load_states(&self, names: Vec) -> Result> { + let futs = names.into_iter().map(|name| { + let mut path = self.remote_path.join("states"); + path.push(&name); + let path = path; + + async move { + let block = fs::read(&path) + .await + .with_context(|| format!("failed reading state file {}", path.display()))?; + let block = VersionBytes::try_from(block.as_ref()) + .with_context(|| format!("failed parsing state file {}", path.display()))?; + Ok((name, block)) + } + }); + + stream::iter(futs).buffer_unordered(32).try_collect().await + } + + async fn store_state(&self, bytes: VersionBytes) -> Result { + let states_dir = self.remote_path.join("states"); + write_content_addressible_file(&states_dir, &bytes.as_version_bytes_ref()) + .await + .context("failed writing state file") + } + + async fn remove_states(&self, names: Vec) -> Result> { + let futs = names + .iter() + .map(|name| { + let mut path = self.remote_path.join("states"); + path.push(&name); + let path = path; + + async move { + remove_file_optional(&path) + .await + .with_context(|| format!("failed removing state file {}", name)) + } + }) + .map(Ok); + + stream::iter(futs) + .try_for_each_concurrent(32, |f| f) + .await?; + + Ok(names) + } + + async fn list_op_actors(&self) -> Result> { + let ops_dir = self.remote_path.join("ops"); + read_dir_optional_dirs(ops_dir) + .map_err(|err| err.context("failed listing actors")) + .and_then(|entry| async move { + let actor = entry.file_name(); + let actor = actor.to_str().with_context(|| { + format!("error converting actor dir name {:?} to string", actor) + })?; + let actor = Uuid::from_str(actor).with_context(|| { + format!("error converting actor dir string {} into uuid", actor) + })?; 
+ Ok(actor) + }) + .try_collect() + .await + } + + async fn load_ops( + &self, + actor_first_versions: Vec<(Uuid, u64)>, + ) -> Result> { + async fn get_entry( + path: &Path, + actor: Uuid, + version: u64, + ) -> Result> { + let bytes = read_file_optional(path) + .await + .with_context(|| format!("failed reading op file {}", path.display()))?; + + let bytes = if let Some(bytes) = bytes { + bytes + } else { + return Ok(None); + }; + + let data = VersionBytes::try_from(bytes.as_ref()) + .with_context(|| format!("failed parsing op file {}", path.display()))?; + + Ok(Some((actor, version, data))) + } + + let path = self.remote_path.join("ops"); + + stream::iter(actor_first_versions) + .map(move |(actor, first_version)| { + let path = path.join(actor.to_string()); + + async move { + let ops = stream::iter(first_version..) + .then(move |version| { + let path = path.join(version.to_string()); + async move { get_entry(&path, actor, version).await } + }) + .take_while(|res| { + let res = match res { + Ok(None) => false, + Ok(Some(_)) => true, + Err(_) => true, + }; + async move { res } + }) + .try_filter_map(|opt| async move { Ok(opt) }) + .try_collect::>() + .await?; + + Result::<_, Error>::Ok(stream::iter(ops).map(Ok)) + } + }) + .buffer_unordered(32) + .try_flatten() + .try_collect() + .await + } + + async fn store_ops(&self, actor: Uuid, version: u64, bytes: VersionBytes) -> Result<()> { + let mut path = self.remote_path.join("ops"); + path.push(actor.to_string()); + + fs::create_dir_all(&path) + .await + .with_context(|| format!("failed creating op dir {:?} for actor {}", path, actor))?; + + path.push(version.to_string()); + write_new_file(&path, bytes.buf()) + .await + .with_context(|| format!("failed writing ops file {:?}", path))?; + Ok(()) + } + + async fn remove_ops(&self, names: Vec<(Uuid, u64)>) -> Result<()> { + let futs = names.into_iter().map(|(actor, version)| { + let mut path = self.remote_path.join("ops"); + path.push(actor.to_string()); + 
path.push(version.to_string()); + let path = path; + + async move { + remove_file_optional(&path).await.with_context(|| { + format!( + "failed removing ops file {} for actor {} version {}", + path.display(), + actor, + version + ) + }) + } + }); + + stream::iter(futs).buffer_unordered(32).try_collect().await + } +} + +async fn write_file(path: &Path, buf: impl Buf) -> io::Result<()> { + write_file_inner(path, buf, false).await +} + +async fn write_new_file(path: &Path, buf: impl Buf) -> io::Result<()> { + write_file_inner(path, buf, true).await +} + +async fn write_file_inner(path: &Path, mut buf: impl Buf, create_new: bool) -> io::Result<()> { + let mut open_options = fs::OpenOptions::new(); + if create_new { + open_options.create_new(true); + } else { + open_options.create(true).truncate(true); + } + let mut file = open_options.write(true).open(path).await?; + + while buf.has_remaining() { + file.write_buf(&mut buf).await?; + } + + // flush internal buffers + file.flush().await?; + // fsync + file.sync_all().await?; + // TODO: close explicitly to catch closing errors + // TODO: 1. write to tmp file 2. 
rename tmp file to real file + Ok(()) +} + +fn read_dir_optional_dirs(path: PathBuf) -> impl Stream> + 'static { + read_dir_optional_filter_types(path, false) +} + +fn read_dir_optional_files(path: PathBuf) -> impl Stream> + 'static { + read_dir_optional_filter_types(path, true) +} + +fn read_dir_optional_filter_types( + path: PathBuf, + is_file: bool, +) -> impl Stream> + 'static { + read_dir_optional(path) + .map(move |entry| async move { + let entry = entry?; + let ty = entry.file_type().await.with_context(|| { + format!("failed getting file type for {}", entry.path().display()) + })?; + match is_file { + true if ty.is_file() => Ok(Some(entry)), + false if ty.is_dir() => Ok(Some(entry)), + _ => Ok(None), + } + }) + .buffer_unordered(32) + .try_filter_map(|res| async move { Ok(res) }) +} + +fn read_dir_optional(path: PathBuf) -> impl Stream> + 'static { + async move { + match fs::read_dir(&path).await { + Ok(dir) => { + let entry_stream = dir.map(move |entry| { + entry.with_context(|| format!("failed getting entry from {}", path.display())) + }); + Ok(Either::Left(entry_stream)) + } + Err(err) if err.kind() == io::ErrorKind::NotFound => { + let empty = stream::empty(); + Ok(Either::Right(empty)) + } + Err(err) => Err(err).context(format!("failed listing entries in {}", path.display())), + } + } + .try_flatten_stream() +} + +async fn read_file_optional(path: &Path) -> Result>> { + match fs::read(&path).await { + Ok(bytes) => Ok(Some(bytes)), + Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None), + Err(err) => Err(err).context(format!("failed reading file {}", path.display())), + } +} + +async fn write_content_addressible_file( + dir_path: &Path, + bytes: &VersionBytesRef<'_>, +) -> Result { + let mut digest = Sha3::v256(); + let mut buf = bytes.buf(); + while buf.has_remaining() { + let b = buf.bytes(); + digest.update(b); + buf.advance(b.len()); + } + let mut digest_output = [0; 32]; + digest.finalize(&mut digest_output); + let block_id = 
data_encoding::BASE32_NOPAD.encode(&digest_output); + + fs::create_dir_all(dir_path) + .await + .with_context(|| format!("failed creating dir {}", dir_path.display()))?; + let file_path = dir_path.join(&block_id); + write_new_file(&file_path, bytes.buf()) + .await + .with_context(|| { + format!( + "failed writing content addressible file {}", + file_path.display() + ) + })?; + Ok(block_id) +} + +async fn remove_file_optional(path: &Path) -> Result<()> { + match fs::remove_file(path).await { + Ok(()) => Ok(()), + Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()), + Err(err) => Err(err).context(format!("failed removing file {}", path.display())), + } +} diff --git a/crdt-enc/Cargo.toml b/crdt-enc/Cargo.toml new file mode 100644 index 0000000..18fcaa1 --- /dev/null +++ b/crdt-enc/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "crdt-enc" +version = "0.1.0" +authors = ["Thomas Heck "] +edition = "2018" + +[dependencies] +crdts = "4" +serde = "1" +serde_bytes = "0.11" +rmp-serde = "0.14" +async-trait = "0.1" +anyhow = "1" +thiserror = "1" +futures = "0.3" +dyn-clone = "1" +bytes = "0.5" + +[dependencies.uuid] +version = "0.8" +features = ["serde", "v4"] diff --git a/crdt-enc/src/cryptor.rs b/crdt-enc/src/cryptor.rs new file mode 100644 index 0000000..7ec8d1b --- /dev/null +++ b/crdt-enc/src/cryptor.rs @@ -0,0 +1,32 @@ +use crate::{ + utils::{VersionBytes, VersionBytesRef}, + CoreSubHandle, Info, KeyCryptor, Storage, +}; +use anyhow::Result; +use async_trait::async_trait; +use crdts::{CmRDT, CvRDT, MVReg}; +use serde::{de::DeserializeOwned, Serialize}; +use std::{fmt::Debug, sync::Arc}; +use uuid::Uuid; + +#[async_trait] +pub trait Cryptor +where + Self: 'static + Debug + Send + Sync + Sized, +{ + async fn init(&self, _core: &dyn CoreSubHandle) -> Result<()> { + Ok(()) + } + + async fn set_info(&self, _info: &Info) -> Result<()> { + Ok(()) + } + + async fn set_remote_meta(&self, _data: Option>) -> Result<()> { + Ok(()) + } + + async fn gen_key(&self) -> 
Result; + async fn encrypt(&self, key: VersionBytesRef<'_>, clear_text: &[u8]) -> Result>; + async fn decrypt(&self, key: VersionBytesRef<'_>, enc_data: &[u8]) -> Result>; +} diff --git a/crdt-enc/src/key_cryptor.rs b/crdt-enc/src/key_cryptor.rs new file mode 100644 index 0000000..9c7cac2 --- /dev/null +++ b/crdt-enc/src/key_cryptor.rs @@ -0,0 +1,133 @@ +use crate::{ + utils::{VersionBytes, VersionBytesRef}, + CoreSubHandle, Cryptor, Info, Storage, +}; +use anyhow::Result; +use async_trait::async_trait; +use crdts::{CmRDT, CvRDT, MVReg, Orswot}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use std::{ + borrow::Borrow, + cmp::{Eq, Ord, Ordering, PartialEq}, + fmt::Debug, + hash::{Hash, Hasher}, + sync::Arc, +}; +use uuid::Uuid; + +#[async_trait] +pub trait KeyCryptor +where + Self: 'static + Debug + Send + Sync + Sized, +{ + async fn init(&self, _core: &dyn CoreSubHandle) -> Result<()> { + Ok(()) + } + + async fn set_info(&self, _info: &Info) -> Result<()> { + Ok(()) + } + + async fn set_remote_meta(&self, _data: Option>) -> Result<()> { + Ok(()) + } + + async fn set_keys(&self, keys: Keys) -> Result<()>; +} + +#[derive(Clone, Debug, Serialize, Deserialize, Default)] +pub struct Keys { + latest_key_id: MVReg, + keys: Orswot, +} + +impl CvRDT for Keys { + fn merge(&mut self, other: Keys) { + self.latest_key_id.merge(other.latest_key_id); + self.keys.merge(other.keys); + } +} + +impl Keys { + pub fn get_key(&self, key_id: Uuid) -> Option { + self.keys.read().val.take(&key_id) + } + + pub fn latest_key(&self) -> Option { + let mut keys = self.keys.read().val; + self.latest_key_id + .read() + .val + .into_iter() + .flat_map(move |id| keys.take(&id)) + .min() + } + + pub fn insert_latest_key(&mut self, actor: Uuid, new_key: Key) { + let key_id = new_key.id(); + + let write_ctx = self.keys.read_ctx().derive_add_ctx(actor); + let op = self.keys.add(new_key, write_ctx); + self.keys.apply(op); + + let write_ctx = 
self.latest_key_id.read_ctx().derive_add_ctx(actor); + let op = self.latest_key_id.write(key_id, write_ctx); + self.latest_key_id.apply(op); + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Key { + id: Uuid, + key: VersionBytes, +} + +impl Key { + pub fn new(key: VersionBytes) -> Key { + Self::new_with_id(Uuid::new_v4(), key) + } + + pub fn new_with_id(id: Uuid, key: VersionBytes) -> Key { + Key { id, key } + } + + pub fn id(&self) -> Uuid { + self.id + } + + pub fn key(&self) -> VersionBytesRef<'_> { + self.key.as_version_bytes_ref() + } +} + +impl Borrow for Key { + fn borrow(&self) -> &Uuid { + &self.id + } +} + +impl Hash for Key { + fn hash(&self, state: &mut H) { + self.id.hash(state); + } +} + +impl PartialEq for Key { + fn eq(&self, other: &Self) -> bool { + self.id.eq(&other.id) + } +} + +impl Eq for Key {} + +impl PartialOrd for Key { + fn partial_cmp(&self, other: &Self) -> Option { + self.id.partial_cmp(&other.id) + } +} + +impl Ord for Key { + fn cmp(&self, other: &Self) -> Ordering { + self.id.cmp(&other.id) + } +} diff --git a/crdt-enc/src/lib.rs b/crdt-enc/src/lib.rs new file mode 100644 index 0000000..ee36d9b --- /dev/null +++ b/crdt-enc/src/lib.rs @@ -0,0 +1,759 @@ +pub mod cryptor; +pub mod key_cryptor; +pub mod storage; +pub mod task; +pub mod utils; + +use crate::{ + cryptor::Cryptor, + key_cryptor::{Key, KeyCryptor, Keys}, + storage::Storage, + utils::VersionBytes, +}; +use anyhow::{Context, Error, Result}; +use async_trait::async_trait; +use crdts::{CmRDT, CvRDT, MVReg, VClock}; +use dyn_clone::DynClone; +use futures::{ + future::BoxFuture, + lock::Mutex as AsyncMutex, + stream::{self, StreamExt, TryStreamExt}, +}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use std::{ + collections::HashSet, + default::Default, + fmt::Debug, + mem, + sync::{Arc, Mutex as SyncMutex}, +}; +use uuid::Uuid; + +#[async_trait] +pub trait CoreSubHandle +where + Self: 'static + Debug + Send + Sync + DynClone, +{ + async fn 
compact(&self) -> Result<()>; + async fn read_remote(&self) -> Result<()>; + async fn read_remote_meta(&self) -> Result<()>; + + async fn set_keys(&self, keys: Keys) -> Result<()>; + + async fn set_remote_meta_storage(&self, remote_meta: MVReg) -> Result<()>; + async fn set_remote_meta_cryptor(&self, remote_meta: MVReg) -> Result<()>; + async fn set_remote_meta_key_cryptor( + &self, + remote_meta: MVReg, + ) -> Result<()>; +} + +#[async_trait] +impl CoreSubHandle for Arc> +where + S: 'static + + CmRDT + + CvRDT + + Default + + Serialize + + DeserializeOwned + + Clone + + Debug + + Send + + Sync, + ::Op: 'static + Serialize + DeserializeOwned + Clone + Send, + ST: Storage, + C: Cryptor, + KC: KeyCryptor, +{ + async fn compact(&self) -> Result<()> { + self.compact().await + } + + async fn read_remote(&self) -> Result<()> { + self.read_remote().await + } + + async fn read_remote_meta(&self) -> Result<()> { + self.read_remote_meta().await + } + + async fn set_keys(&self, keys: Keys) -> Result<()> { + self.set_keys(keys).await + } + + async fn set_remote_meta_storage(&self, remote_meta: MVReg) -> Result<()> { + self.set_remote_meta_storage(remote_meta).await + } + + async fn set_remote_meta_cryptor(&self, remote_meta: MVReg) -> Result<()> { + self.set_remote_meta_cryptor(remote_meta).await + } + + async fn set_remote_meta_key_cryptor( + &self, + remote_meta: MVReg, + ) -> Result<()> { + self.set_remote_meta_key_cryptor(remote_meta).await + } +} + +// #[async_trait] +// pub trait CoreTrait +// where +// Self: 'static + Debug + Send + Sync + Clone, +// ::Op: 'static + Serialize + DeserializeOwned + Clone + Send, +// { +// type State: 'static +// + CmRDT +// + CvRDT +// + Default +// + Serialize +// + DeserializeOwned +// + Clone +// + Debug +// + Send +// + Sync; + +// async fn compact(&self) -> Result<()>; +// async fn read_remote(&self) -> Result<()>; +// async fn read_remote_meta(&self) -> Result<()>; +// async fn apply_ops(&self, ops: Vec<::Op>) -> Result<()>; + +// 
async fn set_remote_meta_storage(&self, remote_meta: MVReg) -> Result<()>; +// async fn set_remote_meta_cryptor(&self, remote_meta: MVReg) -> Result<()>; +// async fn set_remote_meta_key_cryptor(&self, remote_meta: MVReg) +// -> Result<()>; +// } + +// #[async_trait] +// impl CoreTrait for Arc> +// where +// S: 'static +// + CmRDT +// + CvRDT +// + Default +// + Serialize +// + DeserializeOwned +// + Clone +// + Debug +// + Send +// + Sync, +// ::Op: 'static + Serialize + DeserializeOwned + Clone + Send, +// ST: Storage, +// C: Cryptor, +// KC: KeyCryptor, +// { +// type State = S; + +// async fn compact(&self) -> Result<()> { +// self.compact_().await +// } + +// async fn read_remote(&self) -> Result<()> { +// self.read_remote_().await +// } + +// async fn read_remote_meta(&self) -> Result<()> { +// self.read_remote_meta_(false).await +// } + +// async fn apply_ops(&self, ops: Vec<::Op>) -> Result<()> { +// self.apply_ops_(ops).await +// } + +// async fn set_remote_meta_storage(&self, remote_meta: MVReg) -> Result<()> { +// self.set_remote_meta_storage_(remote_meta).await +// } + +// async fn set_remote_meta_cryptor(&self, remote_meta: MVReg) -> Result<()> { +// self.set_remote_meta_cryptor_(remote_meta).await +// } + +// async fn set_remote_meta_key_cryptor( +// &self, +// remote_meta: MVReg, +// ) -> Result<()> { +// self.set_remote_meta_key_cryptor_(remote_meta).await +// } +// } + +#[derive(Debug)] +pub struct Core { + storage: ST, + cryptor: C, + key_cryptor: KC, + // use sync `std::sync::Mutex` here because it has less overhead than async mutex, we are + // holding it for a very short time and do not `.await` while the lock is held.
+ data: SyncMutex>, + // task_mgr: task::TaskMgr, + supported_data_versions: Vec, + current_data_version: Uuid, + apply_ops_lock: AsyncMutex<()>, +} + +#[derive(Debug)] +struct CoreMutData { + local_meta: Option, + remote_meta: RemoteMeta, + keys: Keys, + state: StateWrapper, + read_states: HashSet, + read_remote_metas: HashSet, +} + +impl Core +where + S: 'static + + CmRDT + + CvRDT + + Default + + Serialize + + DeserializeOwned + + Clone + + Debug + + Send + + Sync, + ::Op: 'static + Serialize + DeserializeOwned + Clone + Send, + ST: Storage, + C: Cryptor, + KC: KeyCryptor, +{ + pub async fn open(options: OpenOptions) -> Result<(Arc, Info)> { + let core_data = SyncMutex::new(CoreMutData { + local_meta: None, + remote_meta: RemoteMeta::default(), + keys: Keys::default(), + state: StateWrapper { + next_op_versions: Default::default(), + state: Default::default(), + }, + read_states: HashSet::new(), + read_remote_metas: HashSet::new(), + }); + + let mut supported_data_versions = options.supported_data_versions; + supported_data_versions.sort_unstable(); + + let core = Arc::new(Core { + storage: options.storage, + cryptor: options.cryptor, + key_cryptor: options.key_cryptor, + supported_data_versions, + current_data_version: options.current_data_version, + data: core_data, + apply_ops_lock: AsyncMutex::new(()), + }); + + futures::try_join![ + core.storage.init(&core), + core.cryptor.init(&core), + core.key_cryptor.init(&core), + ]?; + + let local_meta = core + .storage + .load_local_meta() + .await + .context("failed getting local meta")?; + let local_meta: LocalMeta = match local_meta { + Some(local_meta) => { + local_meta.ensure_versions(&core.supported_data_versions)?; + rmp_serde::from_read_ref(&local_meta)? 
+ } + None => { + if !options.create { + return Err(Error::msg( + "local meta does not exist, and `create` option is not set", + )); + } + let local_meta = LocalMeta { + local_actor_id: Uuid::new_v4(), + }; + // TODO: use core version + let vbox = VersionBytes::new( + core.current_data_version, + rmp_serde::to_vec_named(&local_meta)?, + ); + + core.storage + .store_local_meta(vbox) + .await + .context("failed storing local meta")?; + local_meta + } + }; + + let info = Info { + actor: local_meta.local_actor_id, + }; + + core.with_mut_data(|data| { + data.local_meta = Some(local_meta); + Ok(()) + })?; + + futures::try_join![ + core.storage.set_info(&info), + core.cryptor.set_info(&info), + core.key_cryptor.set_info(&info), + ]?; + + core.read_remote_meta_(true).await?; + + let insert_new_key = core.with_mut_data(|data| Ok(data.keys.latest_key().is_none()))?; + + if insert_new_key { + let new_key = core.cryptor.gen_key().await?; + + let keys = core.with_mut_data(|data| { + data.keys.insert_latest_key(info.actor(), Key::new(new_key)); + Ok(data.keys.clone()) + })?; + + core.key_cryptor.set_keys(keys).await?; + } + + Ok((core, info)) + } + + fn with_mut_data(self: &Arc, f: F) -> Result + where + F: FnOnce(&mut CoreMutData) -> Result, + { + let mut data = self + .data + .lock() + .map_err(|err| Error::msg(format!("unable to lock `CoreMutData`: {}", err)))?; + + f(&mut *data) + } + + /// Locks the core's data; do not call recursively + pub fn with_state(self: &Arc, f: F) -> Result + where + F: FnOnce(&S) -> Result, + { + self.with_mut_data(|data| f(&data.state.state)) + } + + pub async fn compact(self: &Arc) -> Result<()> { + self.read_remote().await?; + + let (clear_text, states_to_remove, ops_to_remove, key) = self.with_mut_data(|data| { + let clear_text = rmp_serde::to_vec_named(&data.state)?; + + let states_to_remove = data.read_states.iter().cloned().collect(); + + let ops_to_remove = data + .state + .next_op_versions + .iter() + .map(|dot| (dot.actor.clone(), dot.counter -
1)) + .collect(); + + let key = data.keys.latest_key().context("no latest key")?; + + Ok((clear_text, states_to_remove, ops_to_remove, key)) + })?; + + // a failing cryptor is a recoverable error: report it to the caller instead of panicking + let data_enc = self + .cryptor + .encrypt(key.key(), &clear_text) + .await + .context("failed encrypting state while compacting")?; + + let enc_data = VersionBytes::new(self.current_data_version, data_enc); + + // first store new state + let new_state_name = self.storage.store_state(enc_data).await?; + + // then remove old states and ops + let (removed_states, _) = futures::try_join![ + self.storage.remove_states(states_to_remove), + self.storage.remove_ops(ops_to_remove), + ]?; + + self.with_mut_data(|data| { + for removed_state in removed_states { + data.read_states.remove(&removed_state); + } + + data.read_states.insert(new_state_name); + Ok(()) + })?; + + Ok(()) + } + + async fn set_keys(self: &Arc, keys: Keys) -> Result<()> { + self.with_mut_data(|data| { + data.keys.merge(keys); + Ok(()) + })?; + + Ok(()) + } + + pub async fn read_remote(self: &Arc) -> Result<()> { + let states_read = self.read_remote_states().await?; + let ops_read = self.read_remote_ops().await?; + + if states_read || ops_read { + // TODO: notify app of state changes + } + + Ok(()) + } + + async fn read_remote_states(self: &Arc) -> Result { + let names = self + .storage + .list_state_names() + .await + .context("failed getting state entry names while reading remote states")?; + + let (states_to_read, key) = self.with_mut_data(|data| { + let states_to_read: Vec<_> = names + .into_iter() + .filter(|name| !data.read_states.contains(name)) + .collect(); + + let key = data.keys.latest_key().context("no latest key")?; + + Ok((states_to_read, key)) + })?; + + let new_states = self + .storage + .load_states(states_to_read) + .await + .context("failed loading state content while reading remote states")?; + + let new_states: Vec<_> = stream::iter(new_states) + .map(|(name, state)| { + let key = key.clone(); + async move { + // TODO: use "Core"s version because we are storing the state in a wrapper with +
// other data, and also store app version + state.ensure_versions(&self.supported_data_versions)?; + + let clear_text = self + .cryptor + .decrypt(key.key(), state.as_ref()) + .await + .with_context(|| format!("failed decrypting remote state {}", name))?; + + let state_wrapper: StateWrapper = rmp_serde::from_read_ref(&clear_text)?; + + Result::<_>::Ok((name, state_wrapper)) + } + }) + .buffer_unordered(16) + .try_collect() + .await?; + + let states_read = !new_states.is_empty(); + + self.with_mut_data(|data| { + for (name, state_wrapper) in new_states { + data.state.state.merge(state_wrapper.state); + data.state + .next_op_versions + .merge(state_wrapper.next_op_versions); + data.read_states.insert(name); + } + Ok(()) + })?; + + Ok(states_read) + } + + async fn read_remote_ops(self: &Arc) -> Result { + let actors = self + .storage + .list_op_actors() + .await + .context("failed getting op actor entries while reading remote ops")?; + + let (ops_to_read, key) = self.with_mut_data(|data| { + let ops_to_read: Vec<_> = actors + .into_iter() + .map(|actor| (actor, data.state.next_op_versions.get(&actor))) + .collect(); + + let key = data.keys.latest_key().context("no latest key")?; + + Ok((ops_to_read, key)) + })?; + + let new_ops = self.storage.load_ops(ops_to_read).await?; + + let new_ops: Vec<_> = stream::iter(new_ops) + .map(|(actor, version, data)| { + let key = key.clone(); + async move { + // TODO: use cores version + data.ensure_versions(&self.supported_data_versions)?; + // remote data may be undecryptable (wrong key, corruption): report it + // as an error, like `read_remote_states` does, instead of panicking + let clear_text = self + .cryptor + .decrypt(key.key(), data.as_ref()) + .await + .with_context(|| { + format!( + "failed decrypting remote ops of actor {} (version {})", + actor, version + ) + })?; + + let ops: Vec<_> = rmp_serde::from_read_ref(&clear_text)?; + // TODO: check apps version + + Result::<_, Error>::Ok((actor, version, ops)) + } + }) + .buffered(16) + .try_collect() + .await?; + + let ops_read = self.with_mut_data(|data| { + let mut ops_read = false; + for (actor, version, ops) in new_ops { + let expected_version = data.state.next_op_versions.get(&actor); + + if version <
expected_version { + // already read that version (concurrent call to this fn between us reading + // the ops and processing them) + continue; + } + + if expected_version < version { + return Err(Error::msg( + "Unexpected op version. Got ops in the wrong order? Bug in storage?", + )); + } + + for op in ops { + data.state.state.apply(op); + } + + let version_inc = data.state.next_op_versions.inc(actor); + data.state.next_op_versions.apply(version_inc); + + ops_read = true; + } + + Ok(ops_read) + })?; + + Ok(ops_read) + } + + async fn read_remote_meta(self: &Arc) -> Result<()> { + self.read_remote_meta_(false).await + } + + async fn read_remote_meta_(self: &Arc, force_notify: bool) -> Result<()> { + let names = self + .storage + .list_remote_meta_names() + .await + .context("failed getting remote meta entry names while reading remote metas")?; + + let remote_metas_to_read = self.with_mut_data(|data| { + let remote_metas_to_read: Vec<_> = names + .into_iter() + .filter(|name| !data.read_remote_metas.contains(name)) + .collect(); + Ok(remote_metas_to_read) + })?; + + let remote_metas = self + .storage + .load_remote_metas(remote_metas_to_read) + .await + .context("failed loading remote meta while reading remote metas")? + .into_iter() + .map(|(name, vbox)| { + // TODO: use "Core"s version because we are storing the state in a wrapper with + // other data, and also store app version + vbox.ensure_versions(&self.supported_data_versions)?; + + let remote_meta: RemoteMeta = rmp_serde::from_read_ref(&vbox)?; + + Ok((name, remote_meta)) + }) + .collect::>>()?; + + let remote_meta = if !remote_metas.is_empty() { + self.with_mut_data(|data| { + for (name, meta) in remote_metas { + data.remote_meta.merge(meta); + data.read_remote_metas.insert(name); + } + + Ok(Some(data.remote_meta.clone())) + })? 
+ } else { + None + }; + + if let Some(remote_meta) = remote_meta { + futures::try_join![ + self.storage.set_remote_meta(Some(remote_meta.storage)), + self.cryptor.set_remote_meta(Some(remote_meta.cryptor)), + self.key_cryptor + .set_remote_meta(Some(remote_meta.key_cryptor)), + ]?; + } else if force_notify { + futures::try_join![ + self.storage.set_remote_meta(None), + self.cryptor.set_remote_meta(None), + self.key_cryptor.set_remote_meta(None), + ]?; + } + + Ok(()) + } + + async fn set_remote_meta_storage( + self: &Arc, + remote_meta: MVReg, + ) -> Result<()> { + self.with_mut_data(|data| { + data.remote_meta.storage.merge(remote_meta); + Ok(()) + })?; + + self.store_remote_meta().await + } + + async fn set_remote_meta_cryptor( + self: &Arc, + remote_meta: MVReg, + ) -> Result<()> { + self.with_mut_data(|data| { + data.remote_meta.cryptor.merge(remote_meta); + Ok(()) + })?; + + self.store_remote_meta().await + } + + async fn set_remote_meta_key_cryptor( + self: &Arc, + remote_meta: MVReg, + ) -> Result<()> { + self.with_mut_data(|data| { + data.remote_meta.key_cryptor.merge(remote_meta); + Ok(()) + })?; + + self.store_remote_meta().await + } + + async fn store_remote_meta(self: &Arc) -> Result<()> { + let vbox = self.with_mut_data(|data| { + let bytes = rmp_serde::to_vec_named(&data.remote_meta)?; + // TODO: use core version + Ok(VersionBytes::new(self.current_data_version, bytes)) + })?; + + let new_name = self.storage.store_remote_meta(vbox).await?; + + let names_to_remove = self.with_mut_data(|data| { + let names_to_remove = data.read_remote_metas.drain().collect(); + data.read_remote_metas.insert(new_name); + Ok(names_to_remove) + })?; + + self.storage.remove_remote_metas(names_to_remove).await?; + + Ok(()) + } + + pub async fn apply_ops(self: &Arc, ops: Vec) -> Result<()> { + // don't allow concurrent op applies + let apply_ops_lock = self.apply_ops_lock.lock().await; + + let clear_text = rmp_serde::to_vec_named(&ops)?; + + let key = 
self.with_mut_data(|data| data.keys.latest_key().context("no latest key"))?; + + // report encryption failures to the caller instead of panicking; the early return + // drops `apply_ops_lock`, so the lock is still released + let data_enc = self + .cryptor + .encrypt(key.key(), &clear_text) + .await + .context("failed encrypting ops")?; + + // TODO: add key id + // let block = Block { + // data_version: self.current_data_version, + // key_id: Uuid::nil(), + // data_enc, + // }; + + // TODO: use core version + let data_enc = VersionBytes::new(self.current_data_version, data_enc); + + let (actor, version) = self.with_mut_data(|data| { + let actor = data + .local_meta + .as_ref() + .ok_or_else(|| Error::msg("local meta not loaded"))? + .local_actor_id; + let version = data.state.next_op_versions.get(&actor); + Ok((actor, version)) + })?; + + self.storage.store_ops(actor, version, data_enc).await?; + + self.with_mut_data(|data| { + for op in ops { + data.state.state.apply(op); + } + + let version_inc = data.state.next_op_versions.inc(actor); + data.state.next_op_versions.apply(version_inc); + Ok(()) + })?; + + // release lock by hand to prevent an early release by accident + mem::drop(apply_ops_lock); + + Ok(()) + } +} + +pub struct OpenOptions { + pub storage: ST, + pub cryptor: C, + pub key_cryptor: KC, + pub create: bool, + pub supported_data_versions: Vec, + pub current_data_version: Uuid, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct LocalMeta { + pub(crate) local_actor_id: Uuid, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub(crate) struct StateWrapper { + pub(crate) next_op_versions: VClock, + pub(crate) state: S, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +struct RemoteMeta { + storage: MVReg, + cryptor: MVReg, + key_cryptor: MVReg, +} + +impl CvRDT for RemoteMeta { + fn merge(&mut self, other: Self) { + self.storage.merge(other.storage); + self.cryptor.merge(other.cryptor); + self.key_cryptor.merge(other.key_cryptor); + } +} + +#[derive(Debug, Clone)] +pub struct Info { + actor: Uuid, +} + +impl Info { + pub fn actor(&self) -> Uuid { + self.actor + } +} diff --git
a/crdt-enc/src/storage.rs b/crdt-enc/src/storage.rs new file mode 100644 index 0000000..ef9436c --- /dev/null +++ b/crdt-enc/src/storage.rs @@ -0,0 +1,48 @@ +use crate::{utils::VersionBytes, CoreSubHandle, Cryptor, Info, KeyCryptor}; +use anyhow::Result; +use async_trait::async_trait; +use crdts::{CmRDT, CvRDT, MVReg}; +use serde::{de::DeserializeOwned, Serialize}; +use std::{fmt::Debug, sync::Arc}; +use uuid::Uuid; + +#[async_trait] +pub trait Storage +where + Self: 'static + Debug + Send + Sync + Sized, +{ + async fn init(&self, _core: &dyn CoreSubHandle) -> Result<()> { + Ok(()) + } + + async fn set_info(&self, _info: &Info) -> Result<()> { + Ok(()) + } + + async fn set_remote_meta(&self, _data: Option>) -> Result<()> { + Ok(()) + } + + async fn load_local_meta(&self) -> Result>; + async fn store_local_meta(&self, data: VersionBytes) -> Result<()>; + + async fn list_remote_meta_names(&self) -> Result>; + async fn load_remote_metas(&self, names: Vec) -> Result>; + async fn store_remote_meta(&self, data: VersionBytes) -> Result; + async fn remove_remote_metas(&self, names: Vec) -> Result<()>; + + async fn list_state_names(&self) -> Result>; + async fn load_states(&self, names: Vec) -> Result>; + async fn store_state(&self, data: VersionBytes) -> Result; + async fn remove_states(&self, names: Vec) -> Result>; + + async fn list_op_actors(&self) -> Result>; + + /// needs to return the ops ordered by version of that actor + async fn load_ops( + &self, + actor_first_versions: Vec<(Uuid, u64)>, + ) -> Result>; + async fn store_ops(&self, actor: Uuid, version: u64, data: VersionBytes) -> Result<()>; + async fn remove_ops(&self, actor_last_verions: Vec<(Uuid, u64)>) -> Result<()>; +} diff --git a/crdt-enc/src/task.rs b/crdt-enc/src/task.rs new file mode 100644 index 0000000..f668772 --- /dev/null +++ b/crdt-enc/src/task.rs @@ -0,0 +1,159 @@ +use anyhow::Result; +use futures::{ + channel::mpsc, + future::{self, BoxFuture, Future, FutureExt}, + stream::FuturesUnordered, + 
stream::{FusedStream, StreamExt}, + task::{self, Poll, SpawnError}, +}; +use std::{fmt, pin::Pin, result::Result as StdResult}; + +// thread_local! { +// // need to use `Box` here, +// static TL_DATA: RefCell>> = RefCell::new(None); +// } + +// pub struct TaskMgrAccessor; + +// impl TaskMgrAccessor { +// pub fn with(f: F) -> R +// where +// T: 'static, +// F: FnOnce(&mut T) -> R, +// { +// TL_DATA.with(|data| { +// let mut data = data.borrow_mut(); +// let data = data.as_mut().expect("TaskMgrAccessor data not set"); +// if let Some(data) = data.downcast_mut::() { +// f(data) +// } else { +// panic!(format!( +// "Data in TaskMgrAccessor has wrong type, expected type: {}", +// any::type_name::() +// )); +// } +// }) +// } + +// pub fn set_with(val: T, f: F) -> (T, R) +// where +// T: 'static, +// F: FnOnce() -> R, +// { +// TL_DATA.with(|data| { +// let mut data = data.borrow_mut(); +// *data = Some(Box::new(val)); +// }); + +// let res = f(); + +// let val = TL_DATA.with(|data| { +// let mut data = data.borrow_mut(); +// let data = data.take().expect("TaskMgrAccessor data not set"); +// if let Ok(data) = data.downcast::() { +// *data +// } else { +// panic!(format!( +// "Data in TaskMgrAccessor has wrong type, expected type: {}", +// any::type_name::() +// )); +// } +// }); + +// (val, res) +// } +// } + +pub struct TaskMgrExecutor { + futs: FuturesUnordered>>, + rx: mpsc::UnboundedReceiver>>, +} + +impl Future for TaskMgrExecutor { + type Output = Result<()>; + + fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll { + while let Poll::Ready(Some(fut)) = self.rx.poll_next_unpin(ctx) { + self.futs.push(fut); + } + + if self.futs.is_empty() { + if self.rx.is_terminated() { + // no running tasks & the receiver closed => exit + return Poll::Ready(Ok(())); + } else { + return Poll::Pending; + } + } + + while let Poll::Ready(res) = self.futs.poll_next_unpin(ctx) { + match res { + Some(Ok(())) => {} + Some(Err(err)) => { + return Poll::Ready(Err(err)); + } + 
None => { + return Poll::Ready(Ok(())); + } + } + } + + Poll::Pending + } +} + +impl fmt::Debug for TaskMgrExecutor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TaskMgrExecutor").finish() + } +} + +#[derive(Clone)] +pub struct TaskMgr { + tx: mpsc::UnboundedSender>>, +} + +impl TaskMgr { + pub fn new() -> (Self, TaskMgrExecutor) { + let (tx, rx) = mpsc::unbounded(); + + ( + TaskMgr { tx }, + TaskMgrExecutor { + futs: FuturesUnordered::new(), + rx, + }, + ) + } + + pub fn spawn(&self, fut: F) -> StdResult<(), SpawnError> + where + F: 'static + Send + Future>, + { + self.tx + .unbounded_send(fut.boxed()) + .map_err(|_| SpawnError::shutdown())?; + Ok(()) + } + + pub fn spawn_with_handle( + &self, + fut: F, + ) -> StdResult, SpawnError> + where + F: 'static + Send + Future, + F::Output: 'static + Send, + { + let (remote, handle) = fut.remote_handle(); + self.tx + .unbounded_send(remote.map(|()| Result::Ok(())).boxed()) + .map_err(|_| SpawnError::shutdown())?; + Ok(handle) + } +} + +impl fmt::Debug for TaskMgr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TaskMgr").finish() + } +} diff --git a/crdt-enc/src/utils.rs b/crdt-enc/src/utils.rs new file mode 100644 index 0000000..887c8e8 --- /dev/null +++ b/crdt-enc/src/utils.rs @@ -0,0 +1,19 @@ +mod version_bytes; + +pub use version_bytes::*; + +use crdts::{CmRDT, CvRDT}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct EmptyCrdt; + +impl CmRDT for EmptyCrdt { + type Op = (); + + fn apply(&mut self, _op: Self::Op) {} +} + +impl CvRDT for EmptyCrdt { + fn merge(&mut self, _other: Self) {} +} diff --git a/crdt-enc/src/utils/version_bytes.rs b/crdt-enc/src/utils/version_bytes.rs new file mode 100644 index 0000000..2cc846a --- /dev/null +++ b/crdt-enc/src/utils/version_bytes.rs @@ -0,0 +1,273 @@ +use serde::{Deserialize, Serialize}; +use serde_bytes; +use std::{borrow::Cow, convert::TryFrom, fmt, 
io::IoSlice}; +use uuid::Uuid; + +#[derive(Debug)] +pub struct VersionError { + expected: Vec, + got: Uuid, +} + +impl fmt::Display for VersionError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "version check failed, got: {}, expected one of: ", + self.got + )?; + for (i, e) in self.expected.iter().enumerate() { + if i != 0 { + f.write_str(", ")?; + } + fmt::Display::fmt(e, f)?; + } + Ok(()) + } +} + +impl std::error::Error for VersionError {} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct VersionBytes(Uuid, #[serde(with = "serde_bytes")] Vec); + +impl VersionBytes { + pub fn new(version: Uuid, content: Vec) -> VersionBytes { + VersionBytes(version, content) + } + + pub fn version(&self) -> Uuid { + self.0 + } + + pub fn ensure_version(&self, version: Uuid) -> Result<(), VersionError> { + if self.0 != version { + Err(VersionError { + expected: vec![version], + got: self.0, + }) + } else { + Ok(()) + } + } + + /// `versions` needs to be sorted! 
+ pub fn ensure_versions(&self, versions: &[Uuid]) -> Result<(), VersionError> { + if versions.binary_search(&self.0).is_err() { + Err(VersionError { + expected: versions.to_owned(), + got: self.0, + }) + } else { + Ok(()) + } + } + + pub fn as_version_bytes_ref(&self) -> VersionBytesRef<'_> { + VersionBytesRef::new(self.0, &self.1) + } + + pub fn buf(&self) -> VersionBytesBuf<'_> { + VersionBytesBuf::new(self.0, &self.1) + } +} + +impl From for Vec { + fn from(v: VersionBytes) -> Vec { + v.1 + } +} + +impl From> for VersionBytes { + fn from(v: VersionBytesRef<'_>) -> VersionBytes { + VersionBytes::new(v.0, v.1.into()) + } +} + +impl AsRef<[u8]> for VersionBytes { + fn as_ref(&self) -> &[u8] { + self.1.as_ref() + } +} + +impl TryFrom<&[u8]> for VersionBytes { + type Error = ParseError; + + fn try_from(buf: &[u8]) -> Result { + Ok(VersionBytesRef::try_from(buf)?.into()) + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct VersionBytesRef<'a>( + Uuid, + #[serde(borrow)] + #[serde(with = "serde_bytes")] + Cow<'a, [u8]>, +); + +impl<'a> VersionBytesRef<'a> { + pub fn new(version: Uuid, content: &'a [u8]) -> VersionBytesRef<'a> { + VersionBytesRef(version, Cow::Borrowed(content)) + } + + pub fn version(&self) -> Uuid { + self.0 + } + + pub fn ensure_version(&self, version: Uuid) -> Result<(), VersionError> { + if self.0 != version { + Err(VersionError { + expected: vec![version], + got: self.0, + }) + } else { + Ok(()) + } + } + + /// `versions` needs to be sorted! 
+ pub fn ensure_versions(&self, versions: &[Uuid]) -> Result<(), VersionError> { + if versions.binary_search(&self.0).is_err() { + Err(VersionError { + expected: versions.to_owned(), + got: self.0, + }) + } else { + Ok(()) + } + } + + pub fn buf(&self) -> VersionBytesBuf<'_> { + VersionBytesBuf::new(self.0, &self.1) + } +} + +impl<'a> AsRef<[u8]> for VersionBytesRef<'a> { + fn as_ref(&self) -> &[u8] { + self.1.as_ref() + } +} + +impl<'a> From<&'a VersionBytes> for VersionBytesRef<'a> { + fn from(v: &'a VersionBytes) -> VersionBytesRef<'a> { + VersionBytesRef::new(v.0, &v.1) + } +} + +impl<'a> TryFrom<&'a [u8]> for VersionBytesRef<'a> { + type Error = ParseError; + + fn try_from(buf: &'a [u8]) -> Result, ParseError> { + if buf.len() < BUF_VERSION_LEN_BYTES { + return Err(ParseError::InvalidLength); + } + + let mut version = [0; 16]; + version.copy_from_slice(&buf[0..16]); + let version = Uuid::from_bytes(version); + + let mut len = [0; 8]; + len.copy_from_slice(&buf[16..24]); + let len = + usize::try_from(u64::from_le_bytes(len)).map_err(|_| ParseError::InvalidLength)?; + + // TODO: check for max len? 
+ + if buf.len() - BUF_VERSION_LEN_BYTES != len { + return Err(ParseError::InvalidLength); + } + + Ok(VersionBytesRef::new(version, &buf[BUF_VERSION_LEN_BYTES..])) + } +} + +#[derive(Debug)] +#[non_exhaustive] +pub enum ParseError { + InvalidLength, +} + +impl fmt::Display for ParseError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "invalid length".fmt(f) + } +} + +impl std::error::Error for ParseError {} + +const BUF_VERSION_LEN_BYTES: usize = 16 + 8; + +#[derive(Debug, Clone)] +pub struct VersionBytesBuf<'a> { + pos: usize, + version_len: [u8; BUF_VERSION_LEN_BYTES], + content: &'a [u8], +} + +impl<'a> VersionBytesBuf<'a> { + pub fn new(version: Uuid, content: &'a [u8]) -> VersionBytesBuf<'a> { + let mut version_len = [0; BUF_VERSION_LEN_BYTES]; + version_len[0..16].copy_from_slice(version.as_bytes()); + + let len = u64::try_from(content.len()).expect("Could not convert len (usize) into u64"); + version_len[16..].copy_from_slice(&len.to_le_bytes()); + + VersionBytesBuf { + pos: 0, + version_len, + content, + } + } +} + +impl<'a> ::bytes::Buf for VersionBytesBuf<'a> { + fn remaining(&self) -> usize { + BUF_VERSION_LEN_BYTES + self.content.len() - self.pos + } + + fn bytes(&self) -> &[u8] { + if self.pos < BUF_VERSION_LEN_BYTES { + &self.version_len[self.pos..] + } else { + let pos = self.pos - BUF_VERSION_LEN_BYTES; + if self.content.len() <= pos { + &[] + } else { + &self.content[pos..] + } + } + } + + fn advance(&mut self, cnt: usize) { + assert!(cnt <= self.remaining()); + self.pos += cnt; + } + + fn bytes_vectored<'b>(&'b self, dst: &mut [IoSlice<'b>]) -> usize { + // TODO: TESTING! 
+ + if dst.is_empty() { + return 0; + } + + if self.pos < BUF_VERSION_LEN_BYTES { + dst[0] = IoSlice::new(&self.version_len[self.pos..]); + + if dst.len() == 1 { + 1 + } else { + dst[1] = IoSlice::new(self.content); + 2 + } + } else { + let pos = self.pos - BUF_VERSION_LEN_BYTES; + if self.content.len() <= pos { + 0 + } else { + dst[0] = IoSlice::new(&self.content[pos..]); + 1 + } + } + } +} diff --git a/examples/test/.pijulignore b/examples/test/.pijulignore new file mode 100644 index 0000000..3af0ccb --- /dev/null +++ b/examples/test/.pijulignore @@ -0,0 +1 @@ +/data diff --git a/examples/test/Cargo.toml b/examples/test/Cargo.toml new file mode 100644 index 0000000..4d0236b --- /dev/null +++ b/examples/test/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "example-test" +version = "0.1.0" +authors = ["Thomas Heck "] +edition = "2018" + +[dependencies] +crdt-enc = {path="../../crdt-enc"} +crdt-enc-tokio = {path="../../crdt-enc-tokio"} +crdt-enc-sodium = {path="../../crdt-enc-sodium"} +crdt-enc-gpgme = {path="../../crdt-enc-gpgme"} +uuid = {version = "0.8", features = ["serde", "v4"]} +crdts = "4" +tokio = {version = "0.2", features = ["macros"]} +anyhow = "1" +futures = "0.3" diff --git a/examples/test/src/main.rs b/examples/test/src/main.rs new file mode 100644 index 0000000..1f75c5f --- /dev/null +++ b/examples/test/src/main.rs @@ -0,0 +1,57 @@ +use anyhow::Result; +use crdt_enc_gpgme::KeyHandler; +use crdt_enc_sodium::EncHandler; +use crdt_enc_tokio::Storage; +use futures::future::FutureExt; +use uuid::Uuid; + +const CURRENT_DATA_VERSION: Uuid = Uuid::from_u128(1u128); + +#[tokio::main] +async fn main() -> Result<()> { + crdt_enc_sodium::init(); + + // `main` already returns `Result`, so report an I/O failure instead of panicking + let data_dir = std::fs::canonicalize("./")?.join("data"); + + let storage = Storage::new(data_dir.join("local"), data_dir.join("remote"))?; + let cryptor = EncHandler::new(); + let key_cryptor = KeyHandler::new(); + let open_options = crdt_enc::OpenOptions { + storage, + cryptor, + key_cryptor, + create: true,
+ supported_data_versions: [CURRENT_DATA_VERSION].iter().cloned().collect(), + current_data_version: CURRENT_DATA_VERSION, + }; + let (repo, info) = crdt_enc::Core::open(open_options).await?; + + // let actor_id = repo.actor_id(); + + // repo.run(futures::stream::empty()).try_for_each(|state| { + // dbg!(state); + // }); + + // dbg!(&repo); + + repo.read_remote().await?; + + // dbg!(&repo); + + // repo.compact().await?; + + // dbg!(&repo); + + let op = repo.with_state(|s: &crdts::MVReg| { + let read_ctx = s.read(); + let add_ctx = read_ctx.derive_add_ctx(info.actor()); + let op = s.write(read_ctx.val.into_iter().max().unwrap_or(0) + 1, add_ctx); + Ok(op) + })?; + + dbg!(&op); + + repo.apply_ops(vec![op]).await?; + + Ok(()) +} diff --git a/shell.nix b/shell.nix new file mode 100644 index 0000000..615021c --- /dev/null +++ b/shell.nix @@ -0,0 +1,15 @@ +let + pkgs = import {}; +in + pkgs.stdenv.mkDerivation { + name = "crdt-enc"; + buildInputs = [ + pkgs.gpgme + pkgs.libsodium + + pkgs.cargo + pkgs.rustc + pkgs.rustfmt + pkgs.rust-analyzer + ]; + }