refactor: make serialization/deserializaion of version boxes more specific
This commit is contained in:
@@ -7,7 +7,6 @@ use ::futures::{
|
|||||||
stream::{self, Stream, StreamExt, TryStreamExt},
|
stream::{self, Stream, StreamExt, TryStreamExt},
|
||||||
};
|
};
|
||||||
use ::std::{
|
use ::std::{
|
||||||
convert::TryFrom,
|
|
||||||
fmt::Debug,
|
fmt::Debug,
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
str::FromStr,
|
str::FromStr,
|
||||||
@@ -55,7 +54,7 @@ impl crdt_enc::storage::Storage for Storage {
|
|||||||
.with_context(|| format!("failed reading local meta file {}", path.display()))?;
|
.with_context(|| format!("failed reading local meta file {}", path.display()))?;
|
||||||
bytes
|
bytes
|
||||||
.map(|bytes| {
|
.map(|bytes| {
|
||||||
let lm = VersionBytes::try_from(bytes.as_ref()).with_context(|| {
|
let lm = VersionBytes::deserialize(&bytes).with_context(|| {
|
||||||
format!("failed parsing local meta file {}", path.display())
|
format!("failed parsing local meta file {}", path.display())
|
||||||
})?;
|
})?;
|
||||||
Ok(lm)
|
Ok(lm)
|
||||||
@@ -103,7 +102,7 @@ impl crdt_enc::storage::Storage for Storage {
|
|||||||
let bytes = fs::read(&path).await.with_context(|| {
|
let bytes = fs::read(&path).await.with_context(|| {
|
||||||
format!("failed reading remote meta file {}", path.display())
|
format!("failed reading remote meta file {}", path.display())
|
||||||
})?;
|
})?;
|
||||||
let rm = VersionBytes::try_from(bytes.as_ref()).with_context(|| {
|
let rm = VersionBytes::deserialize(&bytes).with_context(|| {
|
||||||
format!("failed parsing remote meta file {}", path.display())
|
format!("failed parsing remote meta file {}", path.display())
|
||||||
})?;
|
})?;
|
||||||
Ok((name, rm))
|
Ok((name, rm))
|
||||||
@@ -163,7 +162,7 @@ impl crdt_enc::storage::Storage for Storage {
|
|||||||
let block = fs::read(&path)
|
let block = fs::read(&path)
|
||||||
.await
|
.await
|
||||||
.with_context(|| format!("failed reading state file {}", path.display()))?;
|
.with_context(|| format!("failed reading state file {}", path.display()))?;
|
||||||
let block = VersionBytes::try_from(block.as_ref())
|
let block = VersionBytes::deserialize(&block)
|
||||||
.with_context(|| format!("failed parsing state file {}", path.display()))?;
|
.with_context(|| format!("failed parsing state file {}", path.display()))?;
|
||||||
Ok((name, block))
|
Ok((name, block))
|
||||||
}
|
}
|
||||||
@@ -239,7 +238,7 @@ impl crdt_enc::storage::Storage for Storage {
|
|||||||
return Ok(None);
|
return Ok(None);
|
||||||
};
|
};
|
||||||
|
|
||||||
let data = VersionBytes::try_from(bytes.as_ref())
|
let data = VersionBytes::deserialize(&bytes)
|
||||||
.with_context(|| format!("failed parsing op file {}", path.display()))?;
|
.with_context(|| format!("failed parsing op file {}", path.display()))?;
|
||||||
|
|
||||||
Ok(Some((actor, version, data)))
|
Ok(Some((actor, version, data)))
|
||||||
|
|||||||
@@ -465,7 +465,7 @@ where
|
|||||||
.await
|
.await
|
||||||
.with_context(|| format!("failed decrypting remote state {}", name))?;
|
.with_context(|| format!("failed decrypting remote state {}", name))?;
|
||||||
|
|
||||||
let clear_text = VersionBytesRef::from_slice(&clear_text)?;
|
let clear_text = VersionBytesRef::deserialize(&clear_text)?;
|
||||||
clear_text.ensure_versions(&self.supported_data_versions)?;
|
clear_text.ensure_versions(&self.supported_data_versions)?;
|
||||||
|
|
||||||
let state_wrapper: StateWrapper<S> = rmp_serde::from_read_ref(&clear_text)?;
|
let state_wrapper: StateWrapper<S> = rmp_serde::from_read_ref(&clear_text)?;
|
||||||
@@ -524,7 +524,7 @@ where
|
|||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let clear_text = VersionBytesRef::from_slice(&clear_text)?;
|
let clear_text = VersionBytesRef::deserialize(&clear_text)?;
|
||||||
clear_text.ensure_versions(&self.supported_data_versions)?;
|
clear_text.ensure_versions(&self.supported_data_versions)?;
|
||||||
|
|
||||||
let ops: Vec<_> = rmp_serde::from_read_ref(&clear_text)?;
|
let ops: Vec<_> = rmp_serde::from_read_ref(&clear_text)?;
|
||||||
@@ -700,7 +700,7 @@ where
|
|||||||
|
|
||||||
let data_enc = self
|
let data_enc = self
|
||||||
.cryptor
|
.cryptor
|
||||||
.encrypt(key.key(), &clear_text.to_vec())
|
.encrypt(key.key(), &clear_text.serialize())
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
mod version_bytes;
|
mod version_bytes;
|
||||||
|
|
||||||
use futures::TryFutureExt;
|
|
||||||
pub use version_bytes::*;
|
pub use version_bytes::*;
|
||||||
|
|
||||||
use ::anyhow::{Context, Result};
|
use ::anyhow::{Context, Result};
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
use ::bytes::Buf;
|
use ::bytes::Buf;
|
||||||
use ::serde::{Deserialize, Serialize};
|
use ::serde::{Deserialize, Serialize};
|
||||||
use ::std::{borrow::Cow, convert::TryFrom, fmt, io::IoSlice};
|
use ::std::{borrow::Cow, fmt, io::IoSlice};
|
||||||
use ::uuid::Uuid;
|
use ::uuid::Uuid;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@@ -41,26 +41,16 @@ impl VersionBytes {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn ensure_version(&self, version: Uuid) -> Result<(), VersionError> {
|
pub fn ensure_version(&self, version: Uuid) -> Result<(), VersionError> {
|
||||||
if self.0 != version {
|
self.as_version_bytes_ref().ensure_version(version)
|
||||||
Err(VersionError {
|
|
||||||
expected: vec![version],
|
|
||||||
got: self.0,
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// `versions` needs to be sorted!
|
/// `versions` needs to be sorted!
|
||||||
pub fn ensure_versions(&self, versions: &[Uuid]) -> Result<(), VersionError> {
|
pub fn ensure_versions(&self, versions: &[Uuid]) -> Result<(), VersionError> {
|
||||||
if versions.binary_search(&self.0).is_err() {
|
self.as_version_bytes_ref().ensure_versions(versions)
|
||||||
Err(VersionError {
|
|
||||||
expected: versions.to_owned(),
|
|
||||||
got: self.0,
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn into_inner(self) -> Vec<u8> {
|
||||||
|
self.1
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn as_version_bytes_ref(&self) -> VersionBytesRef<'_> {
|
pub fn as_version_bytes_ref(&self) -> VersionBytesRef<'_> {
|
||||||
@@ -71,16 +61,12 @@ impl VersionBytes {
|
|||||||
VersionBytesBuf::new(self.0, &self.1)
|
VersionBytesBuf::new(self.0, &self.1)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn from_slice(slice: &[u8]) -> Result<VersionBytes, ParseError> {
|
pub fn deserialize(slice: &[u8]) -> Result<VersionBytes, ParseError> {
|
||||||
TryFrom::try_from(slice)
|
Ok(VersionBytesRef::deserialize(slice)?.into())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn to_vec(&self) -> Vec<u8> {
|
pub fn serialize(&self) -> Vec<u8> {
|
||||||
self.as_version_bytes_ref().to_vec()
|
self.as_version_bytes_ref().serialize()
|
||||||
}
|
|
||||||
|
|
||||||
pub fn into_inner(self) -> Vec<u8> {
|
|
||||||
self.1
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -102,14 +88,6 @@ impl AsRef<[u8]> for VersionBytes {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TryFrom<&[u8]> for VersionBytes {
|
|
||||||
type Error = ParseError;
|
|
||||||
|
|
||||||
fn try_from(buf: &[u8]) -> Result<VersionBytes, ParseError> {
|
|
||||||
Ok(VersionBytesRef::try_from(buf)?.into())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||||
pub struct VersionBytesRef<'a>(
|
pub struct VersionBytesRef<'a>(
|
||||||
Uuid,
|
Uuid,
|
||||||
@@ -154,11 +132,19 @@ impl<'a> VersionBytesRef<'a> {
|
|||||||
VersionBytesBuf::new(self.0, &self.1)
|
VersionBytesBuf::new(self.0, &self.1)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn from_slice(slice: &'a [u8]) -> Result<VersionBytesRef<'a>, ParseError> {
|
pub fn deserialize(slice: &'a [u8]) -> Result<VersionBytesRef<'a>, ParseError> {
|
||||||
TryFrom::try_from(slice)
|
if slice.len() < VERSION_LEN {
|
||||||
|
return Err(ParseError::InvalidLength);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn to_vec(&self) -> Vec<u8> {
|
let mut version = [0; 16];
|
||||||
|
version.copy_from_slice(&slice[0..16]);
|
||||||
|
let version = Uuid::from_bytes(version);
|
||||||
|
|
||||||
|
Ok(VersionBytesRef::new(version, &slice[VERSION_LEN..]))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn serialize(&self) -> Vec<u8> {
|
||||||
let mut buf = self.buf();
|
let mut buf = self.buf();
|
||||||
let mut vec = Vec::with_capacity(buf.remaining());
|
let mut vec = Vec::with_capacity(buf.remaining());
|
||||||
while buf.has_remaining() {
|
while buf.has_remaining() {
|
||||||
@@ -183,22 +169,6 @@ impl<'a> From<&'a VersionBytes> for VersionBytesRef<'a> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> TryFrom<&'a [u8]> for VersionBytesRef<'a> {
|
|
||||||
type Error = ParseError;
|
|
||||||
|
|
||||||
fn try_from(buf: &'a [u8]) -> Result<VersionBytesRef<'a>, ParseError> {
|
|
||||||
if buf.len() < VERSION_LEN {
|
|
||||||
return Err(ParseError::InvalidLength);
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut version = [0; 16];
|
|
||||||
version.copy_from_slice(&buf[0..16]);
|
|
||||||
let version = Uuid::from_bytes(version);
|
|
||||||
|
|
||||||
Ok(VersionBytesRef::new(version, &buf[VERSION_LEN..]))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
#[non_exhaustive]
|
#[non_exhaustive]
|
||||||
pub enum ParseError {
|
pub enum ParseError {
|
||||||
|
|||||||
Reference in New Issue
Block a user