Skip to content

Commit

Permalink
Changes in place, accidentally set rustfmt too aggressive...
Browse files Browse the repository at this point in the history
  • Loading branch information
jscatena88 committed Jul 2, 2024
1 parent 8cfafa1 commit ead29dd
Show file tree
Hide file tree
Showing 14 changed files with 610 additions and 264 deletions.
32 changes: 16 additions & 16 deletions src/codec/meta/actor_settings.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
use ecdsa::signature::rand_core::CryptoRngCore;
use futures::{AsyncWrite, AsyncWriteExt};
use winnow::binary::le_u8;
use winnow::token::take;
use winnow::Parser;
use winnow::{binary::le_u8, token::take, Parser};

use crate::codec::crypto::{
AccessKey, AsymLockedAccessKey, AsymLockedAccessKeyError, SigningKey, VerifyingKey,
use crate::codec::{
crypto::{AccessKey, AsymLockedAccessKey, AsymLockedAccessKeyError, SigningKey, VerifyingKey},
header::AccessMask,
meta::{UserAgent, VectorClockActor, VectorClockActorSnapshot},
ParserResult, Stream,
};
use crate::codec::header::AccessMask;
use crate::codec::meta::{UserAgent, VectorClock, VectorClockSnapshot};
use crate::codec::{ParserResult, Stream};

use super::ActorId;

const KEY_PRESENT_BIT: u8 = 0b0000_0001;

#[derive(Clone, Debug)]
pub struct ActorSettings {
verifying_key: VerifyingKey,
vector_clock: VectorClock,
vector_clock: VectorClockActor,

access_mask: AccessMask,
filesystem_key: Option<AsymLockedAccessKey>,
Expand Down Expand Up @@ -80,7 +80,7 @@ impl ActorSettings {
let mut written_bytes = 0;

written_bytes += self.verifying_key.encode(writer).await?;
written_bytes += self.vector_clock.encode(writer).await?;
written_bytes += self.vector_clock.to_snapshot().encode(writer).await?;
written_bytes += self.access_mask.encode(writer).await?;
written_bytes += self.user_agent().encode(writer).await?;

Expand Down Expand Up @@ -176,8 +176,8 @@ impl ActorSettings {
Ok(Some(open_key))
}

pub fn new(verifying_key: VerifyingKey, access_mask: AccessMask) -> Self {
let vector_clock = VectorClock::initialize();
pub fn new(verifying_key: VerifyingKey, access_mask: AccessMask, actor_id: ActorId) -> Self {
let vector_clock = VectorClockActor::initialize(actor_id);
let user_agent = UserAgent::current();

Self {
Expand All @@ -195,7 +195,7 @@ impl ActorSettings {

pub fn parse(input: Stream) -> ParserResult<Self> {
let (input, verifying_key) = VerifyingKey::parse(input)?;
let (input, vector_clock) = VectorClock::parse(input)?;
let (input, vector_clock) = VectorClockActor::parse(input)?;
let (input, access_mask) = AccessMask::parse(input)?;
let (input, user_agent) = UserAgent::parse(input)?;

Expand All @@ -220,7 +220,7 @@ impl ActorSettings {

pub const fn size() -> usize {
VerifyingKey::size()
+ VectorClock::size()
+ VectorClockActorSnapshot::size()
+ AccessMask::size()
+ UserAgent::size()
+ 3 * (1 + AsymLockedAccessKey::size())
Expand All @@ -234,8 +234,8 @@ impl ActorSettings {
self.user_agent.clone()
}

pub fn vector_clock(&self) -> VectorClockSnapshot {
VectorClockSnapshot::from(&self.vector_clock)
pub fn vector_clock(&self) -> VectorClockActorSnapshot {
(&self.vector_clock).into()
}

pub fn verifying_key(&self) -> VerifyingKey {
Expand Down
8 changes: 7 additions & 1 deletion src/codec/meta/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,10 @@ pub use journal_checkpoint::JournalCheckpoint;
pub use meta_key::MetaKey;
pub use permanent_id::PermanentId;
pub use user_agent::UserAgent;
pub use vector_clock::{VectorClock, VectorClockSnapshot};
pub use vector_clock::{
Actor as VectorClockActor, ActorSnapshot as VectorClockActorSnapshot,
Filesystem as VectorClockFilesystem,
FilesystemActorSnapshot as VectorClockFilesystemActorSnapshot,
FilesystemSnapshot as VectorClockFilesystemSnapshot, Node as VectorClockNode,
NodeActorSnapshot as VectorClockNodeActorSnapshot, NodeSnapshot as VectorClockNodeSnapshot,
};
87 changes: 87 additions & 0 deletions src/codec/meta/vector_clock/actor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use super::clock_inner::{ClockInner, ClockInnerSnapshot};
use crate::codec::{ActorId, ParserResult, Stream};

use futures::AsyncWrite;

#[derive(Debug, PartialEq, Clone)]
pub struct Actor {
id: ActorId,
clock: ClockInner,
}

impl Actor {
fn new(id: ActorId, clock: ClockInner) -> Self {
Self { id, clock }
}

pub fn initialize(actor_id: ActorId) -> Self {
Self::new(actor_id, ClockInner::initialize())
}

pub fn to_snapshot(&self) -> ActorSnapshot {
self.into()
}

pub async fn encode<W: AsyncWrite + Unpin + Send>(
&self,
writer: &mut W,
) -> std::io::Result<usize> {
ActorSnapshot::from(self).encode(writer).await
}

pub fn parse(input: Stream) -> ParserResult<Self> {
let (input, snapshot) = ActorSnapshot::parse(input)?;
Ok((input, Self::new(snapshot.id, snapshot.clock.into())))
}
}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct ActorSnapshot {
id: ActorId,
clock: ClockInnerSnapshot,
}

impl ActorSnapshot {
fn new(id: ActorId, clock: ClockInnerSnapshot) -> Self {
Self { id, clock }
}

pub const fn size() -> usize {
ActorId::size() + ClockInnerSnapshot::size()
}

pub async fn encode<W: AsyncWrite + Unpin + Send>(
&self,
writer: &mut W,
) -> std::io::Result<usize> {
self.id.encode(writer).await?;
self.clock.encode(writer).await
}

pub fn parse(input: Stream) -> ParserResult<Self> {
let (input, id) = ActorId::parse(input)?;
let (input, clock) = ClockInnerSnapshot::parse(input)?;
Ok((input, Self::new(id, clock)))
}
}

impl PartialOrd for ActorSnapshot {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl Ord for ActorSnapshot {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.id.cmp(&other.id).then(self.clock.cmp(&other.clock))
}
}

impl From<&Actor> for ActorSnapshot {
fn from(value: &Actor) -> Self {
Self {
id: value.id,
clock: (&value.clock).into(),
}
}
}
197 changes: 197 additions & 0 deletions src/codec/meta/vector_clock/clock_inner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
use futures::{AsyncWrite, AsyncWriteExt};
use std::{
cmp::PartialEq,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use winnow::{binary::le_u64, Parser};

use crate::codec::{ParserResult, Stream};

/// Vector clocks are used as monotonic clocks for a particular actor or resource within the
/// filesystem and is used for providing strict ordering of events. The internal value is
/// initialized to a random value when a new one is initialized.
///
/// Internally this uses an atomic 64 unsigned integer for the ID, wrapping is an allowed behavior
/// and must be handled by all consumers. We consider a roll over valid if the last known ID was
/// within 262,144 (2^18) ticks of rolling over.

#[derive(Debug, Clone)]
pub struct ClockInner(Arc<AtomicU64>);

impl ClockInner {
pub fn initialize() -> Self {
// TODO: make this actually initialize to a random value as the docs above indicate
Self::from(ClockInnerSnapshot::from(0))
}

pub fn to_snapshot(&self) -> ClockInnerSnapshot {
ClockInnerSnapshot::from(self)
}
}

impl From<ClockInnerSnapshot> for ClockInner {
fn from(val: ClockInnerSnapshot) -> Self {
Self(Arc::new(AtomicU64::new(val.0)))
}
}

impl PartialEq for ClockInner {
fn eq(&self, other: &Self) -> bool {
self.0
.load(Ordering::Relaxed)
.eq(&other.0.load(Ordering::Relaxed))
}
}

const WRAP_THRESHOLD: u64 = 2 ^ 18;

/// A snapshot of a [`VectorClock`] at a specific value
///
/// These are what get stored to record the state of a vector
/// during specific operations
///
/// # Wrapping Behavior
/// These must functionally monotonically increase, but if we overflow
/// the underlying value we will wrap around. This is handled by
/// comparing the values with a threshold to determine if the
/// wrapped value is greater than the non-wrapped value.
/// The threshold is 2^18, or 262,144.
/// [`PartialOrd`] and [`Ord`] are implemented to handle this.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct ClockInnerSnapshot(pub(super) u64);

impl ClockInnerSnapshot {
pub async fn encode<W: AsyncWrite + Unpin + Send>(
&self,
writer: &mut W,
) -> std::io::Result<usize> {
let clock_bytes = self.0.to_le_bytes();

writer.write_all(&clock_bytes).await?;

Ok(clock_bytes.len())
}

pub fn parse(input: Stream) -> ParserResult<Self> {
let (input, value) = le_u64.parse_peek(input)?;
Ok((input, Self(value)))
}

pub const fn size() -> usize {
8
}
}

impl From<&ClockInner> for ClockInnerSnapshot {
fn from(value: &ClockInner) -> Self {
Self(value.0.load(Ordering::Relaxed))
}
}

impl From<u64> for ClockInnerSnapshot {
fn from(value: u64) -> Self {
Self(value)
}
}

impl PartialOrd for ClockInnerSnapshot {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl Ord for ClockInnerSnapshot {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
if self.0 < WRAP_THRESHOLD || other.0 < WRAP_THRESHOLD {
self.0
.wrapping_add(WRAP_THRESHOLD)
.cmp(&other.0.wrapping_add(WRAP_THRESHOLD))
} else {
self.0.cmp(&other.0)
}
}
}

#[cfg(test)]
mod test {
use super::*;
use winnow::Partial;

#[cfg(target_arch = "wasm32")]
use wasm_bindgen_test::*;

/// Test PartialOrd implementation
#[test]
fn test_partial_ord() {
// Basic ordering
assert!(ClockInnerSnapshot(1) < ClockInnerSnapshot(2));
assert!(ClockInnerSnapshot(2) > ClockInnerSnapshot(1));
assert!(ClockInnerSnapshot(1) == ClockInnerSnapshot(1));

// Wrapping
assert!(ClockInnerSnapshot(u64::MAX) < ClockInnerSnapshot(0));
assert!(ClockInnerSnapshot(0) > ClockInnerSnapshot(u64::MAX));
assert!(ClockInnerSnapshot(u64::MAX) < ClockInnerSnapshot(WRAP_THRESHOLD - 1));
assert!(ClockInnerSnapshot(u64::MAX) > ClockInnerSnapshot(WRAP_THRESHOLD));
}

/// Test Ord implementation
#[test]
fn test_ord() {
// Basic ordering
assert_eq!(
ClockInnerSnapshot(1).cmp(&ClockInnerSnapshot(2)),
std::cmp::Ordering::Less
);
assert_eq!(
ClockInnerSnapshot(2).cmp(&ClockInnerSnapshot(1)),
std::cmp::Ordering::Greater
);
assert_eq!(
ClockInnerSnapshot(1).cmp(&ClockInnerSnapshot(1)),
std::cmp::Ordering::Equal
);

// Wrapping
assert_eq!(
ClockInnerSnapshot(u64::MAX).cmp(&ClockInnerSnapshot(0)),
std::cmp::Ordering::Less
);
assert_eq!(
ClockInnerSnapshot(0).cmp(&ClockInnerSnapshot(u64::MAX)),
std::cmp::Ordering::Greater
);
assert_eq!(
ClockInnerSnapshot(u64::MAX).cmp(&ClockInnerSnapshot(WRAP_THRESHOLD - 1)),
std::cmp::Ordering::Less
);
assert_eq!(
ClockInnerSnapshot(u64::MAX).cmp(&ClockInnerSnapshot(WRAP_THRESHOLD)),
std::cmp::Ordering::Greater
);
}

#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test(async))]
#[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
async fn test_user_agent_roundtrip() {
let checkpoint = ClockInner::initialize();

let mut buffer = Vec::with_capacity(ClockInnerSnapshot::size());
checkpoint
.to_snapshot()
.encode(&mut buffer)
.await
.expect("encoding success");

let partial = Partial::new(buffer.as_slice());
let (remaining, parsed) = ClockInnerSnapshot::parse(partial).expect("round trip");

let parsed = ClockInner::from(parsed);

assert!(remaining.is_empty());
assert_eq!(checkpoint, parsed);
}
}
Loading

0 comments on commit ead29dd

Please sign in to comment.