Skip to content

Commit

Permalink
Merge pull request #36 from superfly/swim-leave
Browse files Browse the repository at this point in the history
(more) Gracefully leave SWIM cluster
  • Loading branch information
jeromegn authored Aug 24, 2023
2 parents 09a5ab4 + afb28bc commit a97f0de
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 23 deletions.
9 changes: 6 additions & 3 deletions crates/corro-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -713,8 +713,11 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> {
.and_then(|rows| rows.collect::<rusqlite::Result<Vec<String>>>())
{
Ok(foca_states) => {
foca_states.iter().filter_map(|state| match serde_json::from_str(state.as_str()) {
Ok(fs) => Some(fs),
foca_states.iter().filter_map(|state| match serde_json::from_str::<foca::Member<Actor>>(state.as_str()) {
Ok(fs) => match fs.state() {
foca::State::Suspect => None,
_ => Some(fs)
},
Err(e) => {
error!("could not deserialize foca member state: {e} (json: {state})");
None
Expand Down Expand Up @@ -1080,7 +1083,7 @@ async fn handle_notifications(
let added = { agent.members().write().add_member(&actor) };
debug!("Member Up {actor:?} (added: {added})");
if added {
debug!("Member Up {actor:?}");
info!("Member Up {actor:?}");
increment_counter!("corro.gossip.member.added", "id" => actor.id().0.to_string(), "addr" => actor.addr().to_string());
// actually added a member
// notify of new cluster size
Expand Down
8 changes: 4 additions & 4 deletions crates/corro-agent/src/api/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ fn build_quinn_transport_config(config: &GossipConfig) -> quinn::TransportConfig
.unwrap(),
));

// max 1024 concurrent bidirectional streams
transport_config.max_concurrent_bidi_streams(1024u32.into());
// max concurrent bidirectional streams
transport_config.max_concurrent_bidi_streams(32u32.into());

// max 10240 concurrent unidirectional streams
transport_config.max_concurrent_uni_streams(10240u32.into());
// max concurrent unidirectional streams
transport_config.max_concurrent_uni_streams(512u32.into());

if let Some(max_mtu) = config.max_mtu {
info!("Setting maximum MTU for QUIC at {max_mtu}");
Expand Down
39 changes: 25 additions & 14 deletions crates/corro-agent/src/broadcast/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
collections::HashMap,
net::SocketAddr,
num::NonZeroU32,
pin::Pin,
Expand All @@ -16,6 +17,7 @@ use metrics::{gauge, histogram};
use parking_lot::RwLock;
use rand::{rngs::StdRng, seq::IteratorRandom, SeedableRng};
use rusqlite::params;
use spawn::spawn_counted;
use speedy::Writable;
use strum::EnumDiscriminants;
use tokio::{
Expand Down Expand Up @@ -132,7 +134,7 @@ pub fn runtime_loop(

// foca SWIM operations loop.
// NOTE: every turn of that loop should be fast or else we risk being suspected of being down
tokio::spawn({
spawn_counted({
let config = config.clone();
let agent = agent.clone();
let mut tripwire = tripwire.clone();
Expand All @@ -142,7 +144,7 @@ pub fn runtime_loop(

let member_events_chunks =
tokio_stream::wrappers::BroadcastStream::new(member_events.resubscribe())
.chunks_timeout(100, Duration::from_secs(30));
.chunks_timeout(1000, Duration::from_secs(2));
tokio::pin!(member_events_chunks);

#[derive(EnumDiscriminants)]
Expand Down Expand Up @@ -283,6 +285,9 @@ pub fn runtime_loop(
}
}

// extra time for leave message to propagate
tokio::time::sleep(Duration::from_secs(2)).await;

break;
}
Branch::Foca(input) => match input {
Expand Down Expand Up @@ -331,8 +336,12 @@ pub fn runtime_loop(
let splitted: Vec<_> = evts
.iter()
.flatten()
.filter_map(|evt| {
let actor = evt.actor();
.fold(HashMap::new(), |mut acc, evt| {
acc.insert(evt.actor(), evt.as_str());
acc
})
.into_iter()
.filter_map(|(actor, evt)| {
let foca_state = {
// need to bind this...
let foca_state = foca
Expand All @@ -350,9 +359,8 @@ pub fn runtime_loop(
foca_state
};

foca_state.map(|foca_state| {
(actor.id(), actor.addr(), evt.as_str(), foca_state)
})
foca_state
.map(|foca_state| (actor.id(), actor.addr(), evt, foca_state))
})
.collect();

Expand All @@ -372,22 +380,25 @@ pub fn runtime_loop(
let tx = conn.transaction()?;

for (id, address, state, foca_state) in splitted {
tx.prepare_cached(
"
INSERT INTO __corro_members (id, address, state, foca_state)
trace!(
"updating {id} {address} as {state} w/ state: {foca_state:?}",
);
let upserted = tx.prepare_cached("INSERT INTO __corro_members (id, address, state, foca_state)
VALUES (?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
address = excluded.address,
state = excluded.state,
foca_state = excluded.foca_state;
",
)?
foca_state = excluded.foca_state;")?
.execute(params![
id,
address.to_string(),
state,
foca_state
])?;

if upserted != 1 {
warn!("did not update member");
}
}

tx.commit()?;
Expand Down Expand Up @@ -691,7 +702,7 @@ fn transmit_broadcast(payload: Bytes, transport: Transport, addr: SocketAddr) {
}

if let Err(e) = stream.finish().await {
warn!("error finishing broadcast uni stream to {addr}: {e}");
debug!("could not finish broadcast uni stream to {addr}: {e}");
}
});
}
9 changes: 8 additions & 1 deletion crates/corro-types/src/actor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fmt, net::SocketAddr, ops::Deref};
use std::{fmt, hash::Hash, net::SocketAddr, ops::Deref};

use foca::Identity;
use rusqlite::{
Expand Down Expand Up @@ -105,6 +105,13 @@ pub struct Actor {
bump: u16,
}

/// Hashes an [`Actor`] by its `id` and `addr` only.
///
/// Every other field of the struct (for example `bump`) is deliberately left
/// out of the hash, so two actors that differ only in those fields land in the
/// same hash bucket.
///
/// NOTE(review): `Hash` must agree with `Eq` (equal values must hash equally).
/// Hashing a subset of the fields satisfies that as long as equality also
/// compares `id` and `addr` -- confirm against the `PartialEq`/`Eq` impl.
impl Hash for Actor {
    fn hash<H>(&self, hasher: &mut H)
    where
        H: std::hash::Hasher,
    {
        // Destructure so it is explicit which fields contribute to the hash;
        // `..` covers the fields that are intentionally ignored.
        let Self { id, addr, .. } = self;

        // Order matters for the resulting hash value: `id` first, then `addr`.
        id.hash(hasher);
        addr.hash(hasher);
    }
}

impl Actor {
pub fn new(id: ActorId, addr: SocketAddr) -> Self {
Self {
Expand Down
2 changes: 1 addition & 1 deletion crates/corrosion/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ struct Cli {
long = "config",
short,
global = true,
default_value = "corrosion.toml"
default_value = "/etc/corrosion/config.toml"
)]
config_path: Utf8PathBuf,

Expand Down

0 comments on commit a97f0de

Please sign in to comment.