From c0a099909eae842943ac8d6c98f06624b65dfae6 Mon Sep 17 00:00:00 2001 From: Thomas Heck Date: Sun, 21 Feb 2021 18:00:26 +0100 Subject: [PATCH] feat: impl `Spawn` for `Core` so that sub systems are able to spawn tasks --- crdt-enc/src/lib.rs | 19 +++++++++++++++++-- examples/test/.pijulignore | 1 - examples/test/src/main.rs | 14 ++++++++++++++ 3 files changed, 31 insertions(+), 3 deletions(-) delete mode 100644 examples/test/.pijulignore diff --git a/crdt-enc/src/lib.rs b/crdt-enc/src/lib.rs index 70dfc62..a0d14f6 100644 --- a/crdt-enc/src/lib.rs +++ b/crdt-enc/src/lib.rs @@ -16,6 +16,7 @@ use ::dyn_clone::DynClone; use ::futures::{ lock::Mutex as AsyncMutex, stream::{self, StreamExt, TryStreamExt}, + task, }; use ::serde::{de::DeserializeOwned, Deserialize, Serialize}; use ::std::{ @@ -38,7 +39,7 @@ const SUPPORTED_VERSIONS: &[Uuid] = &[ #[async_trait] pub trait CoreSubHandle where - Self: 'static + Debug + Send + Sync + DynClone, + Self: 'static + Debug + Send + Sync + DynClone + task::Spawn, { fn info(&self) -> Info; @@ -56,6 +57,16 @@ where ) -> Result<()>; } +impl task::Spawn for Core { + fn spawn_obj(&self, future: task::FutureObj<'static, ()>) -> Result<(), task::SpawnError> { + self.spawn.spawn_obj(future) + } + + fn status(&self) -> Result<(), task::SpawnError> { + self.spawn.status() + } +} + #[async_trait] impl CoreSubHandle for Arc> where @@ -190,15 +201,17 @@ where // } // } +pub trait CoreSpawn: task::Spawn + Debug + Send + Sync {} + #[derive(Debug)] pub struct Core { + spawn: Box, storage: ST, cryptor: C, key_cryptor: KC, // use sync `std::sync::Mutex` here because it has less overhead than async mutex, we are // holding it for a very shot time and do not `.await` while the lock is held. data: SyncMutex>, - // task_mgr: task::TaskMgr, supported_data_versions: Vec, current_data_version: Uuid, apply_ops_lock: AsyncMutex<()>, @@ -248,6 +261,7 @@ where supported_data_versions.sort_unstable(); let core = Arc::new(Core { + spawn: options.spawn, storage: options.storage, cryptor: options.cryptor, key_cryptor: options.key_cryptor, @@ -729,6 +743,7 @@ where } pub struct OpenOptions { + pub spawn: Box, pub storage: ST, pub cryptor: C, pub key_cryptor: KC, diff --git a/examples/test/.pijulignore b/examples/test/.pijulignore deleted file mode 100644 index 3af0ccb..0000000 --- a/examples/test/.pijulignore +++ /dev/null @@ -1 +0,0 @@ -/data diff --git a/examples/test/src/main.rs b/examples/test/src/main.rs index f426a07..9693313 100644 --- a/examples/test/src/main.rs +++ b/examples/test/src/main.rs @@ -2,11 +2,24 @@ use ::anyhow::Result; use ::crdt_enc_gpgme::KeyHandler; use ::crdt_enc_sodium::EncHandler; use ::crdt_enc_tokio::Storage; +use ::futures::task; use ::uuid::Uuid; const CURRENT_DATA_VERSION: Uuid = Uuid::from_u128(0xaadfd5a6_6e19_4b24_a802_4fa27c72f20c); const SUPPORTED_DATA_VERSIONS: &[Uuid] = &[CURRENT_DATA_VERSION]; +#[derive(Debug)] +struct TokioSpawn; + +impl crdt_enc::CoreSpawn for TokioSpawn {} + +impl task::Spawn for TokioSpawn { + fn spawn_obj(&self, future: task::FutureObj<'static, ()>) -> Result<(), task::SpawnError> { + // drop tokios `JoinHandle` + tokio::spawn(future); + Ok(()) + } +} #[tokio::main(flavor = "current_thread")] async fn main() -> Result<()> { @@ -18,6 +31,7 @@ async fn main() -> Result<()> { let cryptor = EncHandler::new(); let key_cryptor = KeyHandler::new(); let open_options = crdt_enc::OpenOptions { + spawn: Box::new(TokioSpawn), storage, cryptor, key_cryptor,