feat: impl Spawn for Core

so that sub systems are able to spawn tasks
This commit is contained in:
2021-02-21 18:00:26 +01:00
parent 5b928c96f9
commit c0a099909e
3 changed files with 31 additions and 3 deletions

View File

@@ -16,6 +16,7 @@ use ::dyn_clone::DynClone;
use ::futures::{ use ::futures::{
lock::Mutex as AsyncMutex, lock::Mutex as AsyncMutex,
stream::{self, StreamExt, TryStreamExt}, stream::{self, StreamExt, TryStreamExt},
task,
}; };
use ::serde::{de::DeserializeOwned, Deserialize, Serialize}; use ::serde::{de::DeserializeOwned, Deserialize, Serialize};
use ::std::{ use ::std::{
@@ -38,7 +39,7 @@ const SUPPORTED_VERSIONS: &[Uuid] = &[
#[async_trait] #[async_trait]
pub trait CoreSubHandle pub trait CoreSubHandle
where where
Self: 'static + Debug + Send + Sync + DynClone, Self: 'static + Debug + Send + Sync + DynClone + task::Spawn,
{ {
fn info(&self) -> Info; fn info(&self) -> Info;
@@ -56,6 +57,16 @@ where
) -> Result<()>; ) -> Result<()>;
} }
impl<S, ST, C, KC> task::Spawn for Core<S, ST, C, KC> {
fn spawn_obj(&self, future: task::FutureObj<'static, ()>) -> Result<(), task::SpawnError> {
self.spawn.spawn_obj(future)
}
fn status(&self) -> Result<(), task::SpawnError> {
self.spawn.status()
}
}
#[async_trait] #[async_trait]
impl<S, ST, C, KC> CoreSubHandle for Arc<Core<S, ST, C, KC>> impl<S, ST, C, KC> CoreSubHandle for Arc<Core<S, ST, C, KC>>
where where
@@ -190,15 +201,17 @@ where
// } // }
// } // }
pub trait CoreSpawn: task::Spawn + Debug + Send + Sync {}
#[derive(Debug)] #[derive(Debug)]
pub struct Core<S, ST, C, KC> { pub struct Core<S, ST, C, KC> {
spawn: Box<dyn CoreSpawn>,
storage: ST, storage: ST,
cryptor: C, cryptor: C,
key_cryptor: KC, key_cryptor: KC,
// use sync `std::sync::Mutex` here because it has less overhead than async mutex, we are // use sync `std::sync::Mutex` here because it has less overhead than async mutex, we are
// holding it for a very shot time and do not `.await` while the lock is held. // holding it for a very shot time and do not `.await` while the lock is held.
data: SyncMutex<CoreMutData<S>>, data: SyncMutex<CoreMutData<S>>,
// task_mgr: task::TaskMgr,
supported_data_versions: Vec<Uuid>, supported_data_versions: Vec<Uuid>,
current_data_version: Uuid, current_data_version: Uuid,
apply_ops_lock: AsyncMutex<()>, apply_ops_lock: AsyncMutex<()>,
@@ -248,6 +261,7 @@ where
supported_data_versions.sort_unstable(); supported_data_versions.sort_unstable();
let core = Arc::new(Core { let core = Arc::new(Core {
spawn: options.spawn,
storage: options.storage, storage: options.storage,
cryptor: options.cryptor, cryptor: options.cryptor,
key_cryptor: options.key_cryptor, key_cryptor: options.key_cryptor,
@@ -729,6 +743,7 @@ where
} }
pub struct OpenOptions<ST, C, KC> { pub struct OpenOptions<ST, C, KC> {
pub spawn: Box<dyn CoreSpawn>,
pub storage: ST, pub storage: ST,
pub cryptor: C, pub cryptor: C,
pub key_cryptor: KC, pub key_cryptor: KC,

View File

@@ -1 +0,0 @@
/data

View File

@@ -2,11 +2,24 @@ use ::anyhow::Result;
use ::crdt_enc_gpgme::KeyHandler; use ::crdt_enc_gpgme::KeyHandler;
use ::crdt_enc_sodium::EncHandler; use ::crdt_enc_sodium::EncHandler;
use ::crdt_enc_tokio::Storage; use ::crdt_enc_tokio::Storage;
use ::futures::task;
use ::uuid::Uuid; use ::uuid::Uuid;
const CURRENT_DATA_VERSION: Uuid = Uuid::from_u128(0xaadfd5a6_6e19_4b24_a802_4fa27c72f20c); const CURRENT_DATA_VERSION: Uuid = Uuid::from_u128(0xaadfd5a6_6e19_4b24_a802_4fa27c72f20c);
const SUPPORTED_DATA_VERSIONS: &[Uuid] = &[CURRENT_DATA_VERSION]; const SUPPORTED_DATA_VERSIONS: &[Uuid] = &[CURRENT_DATA_VERSION];
#[derive(Debug)]
struct TokioSpawn;
impl crdt_enc::CoreSpawn for TokioSpawn {}
impl task::Spawn for TokioSpawn {
fn spawn_obj(&self, future: task::FutureObj<'static, ()>) -> Result<(), task::SpawnError> {
// drop tokios `JoinHandle`
tokio::spawn(future);
Ok(())
}
}
#[tokio::main(flavor = "current_thread")] #[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> { async fn main() -> Result<()> {
@@ -18,6 +31,7 @@ async fn main() -> Result<()> {
let cryptor = EncHandler::new(); let cryptor = EncHandler::new();
let key_cryptor = KeyHandler::new(); let key_cryptor = KeyHandler::new();
let open_options = crdt_enc::OpenOptions { let open_options = crdt_enc::OpenOptions {
spawn: Box::new(TokioSpawn),
storage, storage,
cryptor, cryptor,
key_cryptor, key_cryptor,