diff --git a/crates/corro-agent/src/agent.rs b/crates/corro-agent/src/agent.rs index 58f4c4a8..58699929 100644 --- a/crates/corro-agent/src/agent.rs +++ b/crates/corro-agent/src/agent.rs @@ -211,7 +211,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age let clock = Arc::new( uhlc::HLCBuilder::default() - .with_id(actor_id.0.into()) + .with_id(actor_id.into()) .with_max_delta(Duration::from_millis(300)) .build(), ); @@ -1972,7 +1972,7 @@ fn init_migration(tx: &Transaction) -> rusqlite::Result<()> { r#" -- internal bookkeeping CREATE TABLE __corro_bookkeeping ( - actor_id TEXT NOT NULL, + actor_id BLOB NOT NULL, start_version INTEGER NOT NULL, end_version INTEGER, db_version INTEGER, @@ -2022,7 +2022,7 @@ fn init_migration(tx: &Transaction) -> rusqlite::Result<()> { -- SWIM memberships CREATE TABLE __corro_members ( - id TEXT PRIMARY KEY NOT NULL, + actor_id BLOB PRIMARY KEY NOT NULL, address TEXT NOT NULL, state TEXT NOT NULL DEFAULT 'down', diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index ba20cb6f..f2864eee 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -609,6 +609,14 @@ pub async fn bidirectional_sync( } }; + let their_actor_id = their_sync_state.actor_id; + + tx.send(SyncMessage::V1(SyncMessageV1::Clock( + agent.clock().new_timestamp().into(), + ))) + .await + .map_err(|_| SyncSendError::ChannelClosed)?; + tokio::spawn( process_sync( agent.actor_id(), @@ -640,6 +648,16 @@ pub async fn bidirectional_sync( warn!("received sync state message more than once, ignoring"); continue; } + SyncMessage::V1(SyncMessageV1::Clock(ts)) => { + if let Err(e) = agent.clock().update_with_timestamp( + &uhlc::Timestamp::new(ts.to_ntp64(), their_actor_id.into()), + ) { + warn!( + "could not update clock from actor {their_actor_id}: {e}" + ); + } + continue; + } }; count += len; } diff --git a/crates/corro-agent/src/broadcast/mod.rs b/crates/corro-agent/src/broadcast/mod.rs index c9045dbc..7f8896ce 100644 --- a/crates/corro-agent/src/broadcast/mod.rs +++ b/crates/corro-agent/src/broadcast/mod.rs @@ -247,9 +247,9 @@ pub fn runtime_loop( for (id, address, state, foca_state) in states { let db_res = tx.prepare_cached( " - INSERT INTO __corro_members (id, address, state, foca_state) + INSERT INTO __corro_members (actor_id, address, state, foca_state) VALUES (?, ?, ?, ?) - ON CONFLICT (id) DO UPDATE SET + ON CONFLICT (actor_id) DO UPDATE SET address = excluded.address, state = excluded.state, foca_state = excluded.foca_state; @@ -383,9 +383,9 @@ pub fn runtime_loop( trace!( "updating {id} {address} as {state} w/ state: {foca_state:?}", ); - let upserted = tx.prepare_cached("INSERT INTO __corro_members (id, address, state, foca_state) + let upserted = tx.prepare_cached("INSERT INTO __corro_members (actor_id, address, state, foca_state) VALUES (?, ?, ?, ?) - ON CONFLICT (id) DO UPDATE SET + ON CONFLICT (actor_id) DO UPDATE SET address = excluded.address, state = excluded.state, foca_state = excluded.foca_state;")? diff --git a/crates/corro-types/src/actor.rs b/crates/corro-types/src/actor.rs index 5b67402b..267b5f2b 100644 --- a/crates/corro-types/src/actor.rs +++ b/crates/corro-types/src/actor.rs @@ -2,7 +2,7 @@ use std::{fmt, hash::Hash, net::SocketAddr, ops::Deref}; use foca::Identity; use rusqlite::{ - types::{FromSql, FromSqlError, ToSqlOutput}, + types::{FromSql, ToSqlOutput}, ToSql, }; use serde::{Deserialize, Serialize}; @@ -27,6 +27,12 @@ impl ActorId { } } +impl Into for ActorId { + fn into(self) -> uhlc::ID { + self.0.into() + } +} + impl Deref for ActorId { type Target = Uuid; @@ -75,25 +81,13 @@ where impl ToSql for ActorId { fn to_sql(&self) -> rusqlite::Result> { - Ok(ToSqlOutput::Owned(rusqlite::types::Value::Text( - self.0.to_string(), - ))) + self.0.to_sql() } } impl FromSql for ActorId { fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult { - match value { - rusqlite::types::ValueRef::Text(s) => Ok(ActorId( - String::from_utf8_lossy(s) - .parse() - .map_err(|e| FromSqlError::Other(Box::new(e)))?, - )), - rusqlite::types::ValueRef::Blob(b) => Ok(ActorId( - Uuid::from_slice(b).map_err(|e| FromSqlError::Other(Box::new(e)))?, - )), - _ => Err(rusqlite::types::FromSqlError::InvalidType), - } + Ok(Self(FromSql::column_result(value)?)) } } diff --git a/crates/corro-types/src/broadcast.rs b/crates/corro-types/src/broadcast.rs index eb5baab9..b4927730 100644 --- a/crates/corro-types/src/broadcast.rs +++ b/crates/corro-types/src/broadcast.rs @@ -179,6 +179,10 @@ impl Timestamp { OffsetDateTime::from_unix_timestamp(t.as_secs() as i64).unwrap() + time::Duration::nanoseconds(t.subsec_nanos() as i64) } + + pub fn to_ntp64(&self) -> NTP64 { + NTP64(self.0) + } } impl fmt::Display for Timestamp { diff --git a/crates/corro-types/src/sync.rs b/crates/corro-types/src/sync.rs index 22cf267a..21398519 100644 --- a/crates/corro-types/src/sync.rs +++ b/crates/corro-types/src/sync.rs @@ -8,7 +8,7 @@ use tokio_util::codec::{Decoder, LengthDelimitedCodec}; use crate::{ actor::ActorId, agent::{Booked, Bookie, KnownDbVersion}, - broadcast::ChangeV1, + broadcast::{ChangeV1, Timestamp}, }; #[derive(Debug, Clone, Readable, Writable)] @@ -20,6 +20,7 @@ pub enum SyncMessage { pub enum SyncMessageV1 { State(SyncStateV1), Changeset(ChangeV1), + Clock(Timestamp), } #[derive(Debug, Default, Clone, Readable, Writable, Serialize, Deserialize)]