Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various clean ups and changes #38

Merged
merged 7 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions crates/corro-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age

let clock = Arc::new(
uhlc::HLCBuilder::default()
.with_id(actor_id.0.into())
.with_id(actor_id.into())
.with_max_delta(Duration::from_millis(300))
.build(),
);
Expand Down Expand Up @@ -1972,7 +1972,7 @@ fn init_migration(tx: &Transaction) -> rusqlite::Result<()> {
r#"
-- internal bookkeeping
CREATE TABLE __corro_bookkeeping (
actor_id TEXT NOT NULL,
actor_id BLOB NOT NULL,
start_version INTEGER NOT NULL,
end_version INTEGER,
db_version INTEGER,
Expand Down Expand Up @@ -2022,7 +2022,7 @@ fn init_migration(tx: &Transaction) -> rusqlite::Result<()> {

-- SWIM memberships
CREATE TABLE __corro_members (
id TEXT PRIMARY KEY NOT NULL,
actor_id BLOB PRIMARY KEY NOT NULL,
address TEXT NOT NULL,

state TEXT NOT NULL DEFAULT 'down',
Expand Down
18 changes: 18 additions & 0 deletions crates/corro-agent/src/api/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,14 @@ pub async fn bidirectional_sync(
}
};

let their_actor_id = their_sync_state.actor_id;

tx.send(SyncMessage::V1(SyncMessageV1::Clock(
agent.clock().new_timestamp().into(),
)))
.await
.map_err(|_| SyncSendError::ChannelClosed)?;

tokio::spawn(
process_sync(
agent.actor_id(),
Expand Down Expand Up @@ -640,6 +648,16 @@ pub async fn bidirectional_sync(
warn!("received sync state message more than once, ignoring");
continue;
}
SyncMessage::V1(SyncMessageV1::Clock(ts)) => {
if let Err(e) = agent.clock().update_with_timestamp(
&uhlc::Timestamp::new(ts.to_ntp64(), their_actor_id.into()),
) {
warn!(
"could not update clock from actor {their_actor_id}: {e}"
);
}
continue;
}
};
count += len;
}
Expand Down
8 changes: 4 additions & 4 deletions crates/corro-agent/src/broadcast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,9 @@ pub fn runtime_loop(
for (id, address, state, foca_state) in states {
let db_res = tx.prepare_cached(
"
INSERT INTO __corro_members (id, address, state, foca_state)
INSERT INTO __corro_members (actor_id, address, state, foca_state)
VALUES (?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
ON CONFLICT (actor_id) DO UPDATE SET
address = excluded.address,
state = excluded.state,
foca_state = excluded.foca_state;
Expand Down Expand Up @@ -383,9 +383,9 @@ pub fn runtime_loop(
trace!(
"updating {id} {address} as {state} w/ state: {foca_state:?}",
);
let upserted = tx.prepare_cached("INSERT INTO __corro_members (id, address, state, foca_state)
let upserted = tx.prepare_cached("INSERT INTO __corro_members (actor_id, address, state, foca_state)
VALUES (?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
ON CONFLICT (actor_id) DO UPDATE SET
address = excluded.address,
state = excluded.state,
foca_state = excluded.foca_state;")?
Expand Down
24 changes: 9 additions & 15 deletions crates/corro-types/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{fmt, hash::Hash, net::SocketAddr, ops::Deref};

use foca::Identity;
use rusqlite::{
types::{FromSql, FromSqlError, ToSqlOutput},
types::{FromSql, ToSqlOutput},
ToSql,
};
use serde::{Deserialize, Serialize};
Expand All @@ -27,6 +27,12 @@ impl ActorId {
}
}

impl Into<uhlc::ID> for ActorId {
fn into(self) -> uhlc::ID {
self.0.into()
}
}

impl Deref for ActorId {
type Target = Uuid;

Expand Down Expand Up @@ -75,25 +81,13 @@ where

impl ToSql for ActorId {
fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
Ok(ToSqlOutput::Owned(rusqlite::types::Value::Text(
self.0.to_string(),
)))
self.0.to_sql()
}
}

impl FromSql for ActorId {
fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
match value {
rusqlite::types::ValueRef::Text(s) => Ok(ActorId(
String::from_utf8_lossy(s)
.parse()
.map_err(|e| FromSqlError::Other(Box::new(e)))?,
)),
rusqlite::types::ValueRef::Blob(b) => Ok(ActorId(
Uuid::from_slice(b).map_err(|e| FromSqlError::Other(Box::new(e)))?,
)),
_ => Err(rusqlite::types::FromSqlError::InvalidType),
}
Ok(Self(FromSql::column_result(value)?))
}
}

Expand Down
4 changes: 4 additions & 0 deletions crates/corro-types/src/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ impl Timestamp {
OffsetDateTime::from_unix_timestamp(t.as_secs() as i64).unwrap()
+ time::Duration::nanoseconds(t.subsec_nanos() as i64)
}

pub fn to_ntp64(&self) -> NTP64 {
NTP64(self.0)
}
}

impl fmt::Display for Timestamp {
Expand Down
3 changes: 2 additions & 1 deletion crates/corro-types/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tokio_util::codec::{Decoder, LengthDelimitedCodec};
use crate::{
actor::ActorId,
agent::{Booked, Bookie, KnownDbVersion},
broadcast::ChangeV1,
broadcast::{ChangeV1, Timestamp},
};

#[derive(Debug, Clone, Readable, Writable)]
Expand All @@ -20,6 +20,7 @@ pub enum SyncMessage {
pub enum SyncMessageV1 {
State(SyncStateV1),
Changeset(ChangeV1),
Clock(Timestamp),
}

#[derive(Debug, Default, Clone, Readable, Writable, Serialize, Deserialize)]
Expand Down