Skip to content

Commit

Permalink
Merge pull request #2278 from valkey-io/detect_disconnects
Browse files Browse the repository at this point in the history
Integrate passive disconnect handling and automatic reconnect functionality
  • Loading branch information
ikolomi authored Sep 12, 2024
2 parents 7d5de72 + 68c6b3b commit 11da7d9
Show file tree
Hide file tree
Showing 11 changed files with 248 additions and 142 deletions.
1 change: 1 addition & 0 deletions glide-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ once_cell = "1.18.0"
arcstr = "1.1.5"
sha1_smol = "1.0.0"
nanoid = "0.4.0"
async-trait = { version = "0.1.24" }

[features]
socket-layer = ["directories", "integer-encoding", "num_cpus", "protobuf", "tokio-util"]
Expand Down
10 changes: 8 additions & 2 deletions glide-core/benches/connections_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use redis::{
aio::{ConnectionLike, ConnectionManager, MultiplexedConnection},
cluster::ClusterClientBuilder,
cluster_async::ClusterConnection,
AsyncCommands, ConnectionAddr, ConnectionInfo, RedisConnectionInfo, RedisResult, Value,
AsyncCommands, ConnectionAddr, ConnectionInfo, GlideConnectionOptions, RedisConnectionInfo,
RedisResult, Value,
};
use std::env;
use tokio::runtime::{Builder, Runtime};
Expand Down Expand Up @@ -83,7 +84,12 @@ fn get_connection_info(address: ConnectionAddr) -> redis::ConnectionInfo {
fn multiplexer_benchmark(c: &mut Criterion, address: ConnectionAddr, group: &str) {
benchmark(c, address, "multiplexer", group, |address, runtime| {
let client = redis::Client::open(get_connection_info(address)).unwrap();
runtime.block_on(async { client.get_multiplexed_tokio_connection(None).await.unwrap() })
runtime.block_on(async {
client
.get_multiplexed_tokio_connection(GlideConnectionOptions::default())
.await
.unwrap()
})
});
}

Expand Down
22 changes: 16 additions & 6 deletions glide-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,16 @@ pub const HEARTBEAT_SLEEP_DURATION: Duration = Duration::from_secs(1);
pub const DEFAULT_RETRIES: u32 = 3;
pub const DEFAULT_RESPONSE_TIMEOUT: Duration = Duration::from_millis(250);
pub const DEFAULT_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_millis(250);
pub const DEFAULT_PERIODIC_CHECKS_INTERVAL: Duration = Duration::from_secs(60);
pub const DEFAULT_PERIODIC_TOPOLOGY_CHECKS_INTERVAL: Duration = Duration::from_secs(60);
pub const INTERNAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(250);
pub const FINISHED_SCAN_CURSOR: &str = "finished";

// The connection check interval is currently not exposed to the user via ConnectionRequest,
// as improper configuration could negatively impact performance or pub/sub resiliency.
// A 3-second interval provides a reasonable balance between connection validation
// and performance overhead.
pub const CONNECTION_CHECKS_INTERVAL: Duration = Duration::from_secs(3);

pub(super) fn get_port(address: &NodeAddress) -> u16 {
const DEFAULT_PORT: u16 = 6379;
if address.port == 0 {
Expand Down Expand Up @@ -443,19 +449,19 @@ async fn create_cluster_client(
.collect();
let read_from = request.read_from.unwrap_or_default();
let read_from_replicas = !matches!(read_from, ReadFrom::Primary); // TODO - implement different read from replica strategies.
let periodic_checks = match request.periodic_checks {
let periodic_topology_checks = match request.periodic_checks {
Some(PeriodicCheck::Disabled) => None,
Some(PeriodicCheck::Enabled) => Some(DEFAULT_PERIODIC_CHECKS_INTERVAL),
Some(PeriodicCheck::Enabled) => Some(DEFAULT_PERIODIC_TOPOLOGY_CHECKS_INTERVAL),
Some(PeriodicCheck::ManualInterval(interval)) => Some(interval),
None => Some(DEFAULT_PERIODIC_CHECKS_INTERVAL),
None => Some(DEFAULT_PERIODIC_TOPOLOGY_CHECKS_INTERVAL),
};
let mut builder = redis::cluster::ClusterClientBuilder::new(initial_nodes)
.connection_timeout(INTERNAL_CONNECTION_TIMEOUT)
.retries(DEFAULT_RETRIES);
if read_from_replicas {
builder = builder.read_from_replicas();
}
if let Some(interval_duration) = periodic_checks {
if let Some(interval_duration) = periodic_topology_checks {
builder = builder.periodic_topology_checks(interval_duration);
}
builder = builder.use_protocol(request.protocol.unwrap_or_default());
Expand All @@ -473,6 +479,10 @@ async fn create_cluster_client(
if let Some(pubsub_subscriptions) = redis_connection_info.pubsub_subscriptions {
builder = builder.pubsub_subscriptions(pubsub_subscriptions);
}

// Always use with Glide
builder = builder.periodic_connections_checks(CONNECTION_CHECKS_INTERVAL);

let client = builder.build()?;
client.get_async_connection(push_sender).await
}
Expand Down Expand Up @@ -571,7 +581,7 @@ fn sanitized_request_string(request: &ConnectionRequest) -> String {
Some(PeriodicCheck::Disabled) => "\nPeriodic Checks: Disabled".to_string(),
Some(PeriodicCheck::Enabled) => format!(
"\nPeriodic Checks: Enabled with default interval of {:?}",
DEFAULT_PERIODIC_CHECKS_INTERVAL
DEFAULT_PERIODIC_TOPOLOGY_CHECKS_INTERVAL
),
Some(PeriodicCheck::ManualInterval(interval)) => format!(
"\nPeriodic Checks: Enabled with manual interval of {:?}s",
Expand Down
74 changes: 62 additions & 12 deletions glide-core/src/client/reconnecting_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@
*/
use super::{NodeAddress, TlsMode};
use crate::retry_strategies::RetryStrategy;
use async_trait::async_trait;
use futures_intrusive::sync::ManualResetEvent;
use logger_core::{log_debug, log_trace, log_warn};
use redis::aio::MultiplexedConnection;
use redis::{PushInfo, RedisConnectionInfo, RedisError, RedisResult};
use logger_core::{log_debug, log_error, log_trace, log_warn};
use redis::aio::{DisconnectNotifier, MultiplexedConnection};
use redis::{GlideConnectionOptions, PushInfo, RedisConnectionInfo, RedisError, RedisResult};
use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, Notify};
use tokio::task;
use tokio::time::timeout;
use tokio_retry::Retry;

use super::{run_with_timeout, DEFAULT_CONNECTION_ATTEMPT_TIMEOUT};
Expand Down Expand Up @@ -46,7 +48,7 @@ struct InnerReconnectingConnection {
#[derive(Clone)]
pub(super) struct ReconnectingConnection {
inner: Arc<InnerReconnectingConnection>,
push_sender: Option<mpsc::UnboundedSender<PushInfo>>,
connection_options: GlideConnectionOptions,
}

impl fmt::Debug for ReconnectingConnection {
Expand All @@ -57,22 +59,59 @@ impl fmt::Debug for ReconnectingConnection {

async fn get_multiplexed_connection(
client: &redis::Client,
push_sender: Option<mpsc::UnboundedSender<PushInfo>>,
connection_options: &GlideConnectionOptions,
) -> RedisResult<MultiplexedConnection> {
run_with_timeout(
Some(DEFAULT_CONNECTION_ATTEMPT_TIMEOUT),
client.get_multiplexed_async_connection(push_sender),
client.get_multiplexed_async_connection(connection_options.clone()),
)
.await
}

/// `DisconnectNotifier` implementation backed by a Tokio `Notify`.
///
/// The `Notify` is held behind an `Arc`, so every clone of this struct
/// (including the boxed clones produced by `clone_box`) refers to the same
/// underlying notification primitive.
#[derive(Clone)]
struct TokioDisconnectNotifier {
// Shared handle: signalled by `notify_disconnect`, awaited by
// `wait_for_disconnect_with_timeout`.
disconnect_notifier: Arc<Notify>,
}

#[async_trait]
impl DisconnectNotifier for TokioDisconnectNotifier {
    /// Signals the shared `Notify` that the connection has been lost.
    fn notify_disconnect(&mut self) {
        self.disconnect_notifier.notify_one();
    }

    /// Waits until a disconnect is signalled or `max_wait` elapses, whichever
    /// happens first.
    ///
    /// The outcome of the timeout is deliberately discarded, so the caller
    /// cannot tell from this call alone which of the two events occurred.
    async fn wait_for_disconnect_with_timeout(&self, max_wait: &Duration) {
        let disconnect_signal = self.disconnect_notifier.notified();
        // Both "notified" and "timed out" simply return to the caller.
        let _ = timeout(*max_wait, disconnect_signal).await;
    }

    /// Returns a boxed clone of this notifier; the clone shares the same
    /// underlying `Notify` through the `Arc`.
    fn clone_box(&self) -> Box<dyn DisconnectNotifier> {
        let duplicate: TokioDisconnectNotifier = self.clone();
        Box::new(duplicate)
    }
}

impl TokioDisconnectNotifier {
    /// Creates a notifier with its own freshly allocated `Notify`.
    fn new() -> Self {
        let disconnect_notifier = Arc::new(Notify::new());
        Self {
            disconnect_notifier,
        }
    }
}

async fn create_connection(
connection_backend: ConnectionBackend,
retry_strategy: RetryStrategy,
push_sender: Option<mpsc::UnboundedSender<PushInfo>>,
) -> Result<ReconnectingConnection, (ReconnectingConnection, RedisError)> {
let client = &connection_backend.connection_info;
let action = || get_multiplexed_connection(client, push_sender.clone());
let connection_options = GlideConnectionOptions {
push_sender,
disconnect_notifier: Some::<Box<dyn DisconnectNotifier>>(Box::new(
TokioDisconnectNotifier::new(),
)),
};
let action = || get_multiplexed_connection(client, &connection_options);

match Retry::spawn(retry_strategy.get_iterator(), action).await {
Ok(connection) => {
Expand All @@ -91,7 +130,7 @@ async fn create_connection(
state: Mutex::new(ConnectionState::Connected(connection)),
backend: connection_backend,
}),
push_sender,
connection_options,
})
}
Err(err) => {
Expand All @@ -110,7 +149,7 @@ async fn create_connection(
state: Mutex::new(ConnectionState::InitializedDisconnected),
backend: connection_backend,
}),
push_sender,
connection_options,
};
connection.reconnect();
Err((connection, err))
Expand Down Expand Up @@ -220,7 +259,6 @@ impl ReconnectingConnection {
log_debug("reconnect", "starting");

let connection_clone = self.clone();
let push_sender = self.push_sender.clone();
// The reconnect task is spawned instead of awaited here, so that the reconnect attempt will continue in the
// background, regardless of whether the calling task is dropped or not.
task::spawn(async move {
Expand All @@ -234,7 +272,8 @@ impl ReconnectingConnection {
// Client was dropped, reconnection attempts can stop
return;
}
match get_multiplexed_connection(client, push_sender.clone()).await {
match get_multiplexed_connection(client, &connection_clone.connection_options).await
{
Ok(mut connection) => {
if connection
.send_packed_command(&redis::cmd("PING"))
Expand Down Expand Up @@ -268,4 +307,15 @@ impl ReconnectingConnection {
ConnectionState::Reconnecting
)
}

pub async fn wait_for_disconnect_with_timeout(&self, max_wait: &Duration) {
// disconnect_notifier should always exists
if let Some(disconnect_notifier) = &self.connection_options.disconnect_notifier {
disconnect_notifier
.wait_for_disconnect_with_timeout(max_wait)
.await;
} else {
log_error("disconnect notifier", "BUG! Disconnect notifier is not set");
}
}
}
46 changes: 44 additions & 2 deletions glide-core/src/client/standalone_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@ use super::reconnecting_connection::ReconnectingConnection;
use super::{ConnectionRequest, NodeAddress, TlsMode};
use crate::retry_strategies::RetryStrategy;
use futures::{future, stream, StreamExt};
#[cfg(feature = "standalone_heartbeat")]
use logger_core::log_debug;
use logger_core::log_warn;
use rand::Rng;
use redis::aio::ConnectionLike;
use redis::cluster_routing::{self, is_readonly_cmd, ResponsePolicy, Routable, RoutingInfo};
use redis::{PushInfo, RedisError, RedisResult, Value};
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use tokio::sync::mpsc;
#[cfg(feature = "standalone_heartbeat")]
use tokio::task;

#[derive(Debug)]
Expand Down Expand Up @@ -190,6 +189,10 @@ impl StandaloneClient {
Self::start_heartbeat(node.clone());
}

for node in nodes.iter() {
Self::start_periodic_connection_check(node.clone());
}

Ok(Self {
inner: Arc::new(DropWrapper {
primary_index,
Expand Down Expand Up @@ -416,6 +419,45 @@ impl StandaloneClient {
}
});
}

/// Spawns a background task that passively monitors the connection and
/// triggers a reconnect when the connection is found to be closed.
///
/// This is a cheaper alternative to `start_heartbeat()`: it sends no PING
/// commands to the server and only inspects the connection state. Each
/// iteration first waits for a disconnect notification or for
/// `CONNECTION_CHECKS_INTERVAL` to elapse. The task ends once the
/// connection reports that it was dropped.
fn start_periodic_connection_check(reconnecting_connection: ReconnectingConnection) {
    task::spawn(async move {
        loop {
            reconnecting_connection
                .wait_for_disconnect_with_timeout(&super::CONNECTION_CHECKS_INTERVAL)
                .await;

            // The client is gone, so there is nothing left to monitor.
            if reconnecting_connection.is_dropped() {
                log_debug(
                    "StandaloneClient",
                    "connection checker stopped after connection was dropped",
                );
                break;
            }

            match reconnecting_connection.try_get_connection().await {
                // No usable connection right now: a reconnect is already in progress.
                None => {
                    log_debug(
                        "StandaloneClient",
                        "connection checker is skipping a connections since its reconnecting",
                    );
                }
                // The connection exists but is closed: start reconnecting.
                Some(connection) if connection.is_closed() => {
                    log_debug(
                        "StandaloneClient",
                        "connection checker has triggered reconnect",
                    );
                    reconnecting_connection.reconnect();
                }
                // Connection is open; check again on the next iteration.
                Some(_) => {}
            }
        }
    });
}
}

async fn get_connection_and_replication_info(
Expand Down
Loading

0 comments on commit 11da7d9

Please sign in to comment.