Skip to content

Commit

Permalink
Add allow_missing_slots flag to cluster scan functions
Browse files Browse the repository at this point in the history
  • Loading branch information
avifenesh committed Nov 14, 2024
1 parent 7d72b87 commit c004756
Show file tree
Hide file tree
Showing 3 changed files with 380 additions and 128 deletions.
35 changes: 23 additions & 12 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ use crate::types::RetryMethod;

pub(crate) const MUTEX_READ_ERR: &str = "Failed to obtain read lock. Poisoned mutex?";
const MUTEX_WRITE_ERR: &str = "Failed to obtain write lock. Poisoned mutex?";
/// This represents an async Redis Cluster connection. It stores the
/// This represents an async Cluster connection. It stores the
/// underlying connections maintained for each node in the cluster, as well
/// as common parameters for connecting to nodes and executing commands.
#[derive(Clone)]
Expand Down Expand Up @@ -144,7 +144,7 @@ where

/// Special handling for `SCAN` command, using `cluster_scan`.
/// If you wish to use a match pattern, use [`cluster_scan_with_pattern`].
/// Perform a `SCAN` command on a Redis cluster, using scan state object in order to handle changes in topology
/// Perform a `SCAN` command on a cluster, using scan state object in order to handle changes in topology
/// and make sure that all keys that were in the cluster from start to end of the scan are scanned.
/// In order to make sure all keys in the cluster scanned, topology refresh occurs more frequently and may affect performance.
///
Expand All @@ -154,7 +154,8 @@ where
/// for each subsequent iteration use the returned [`ScanStateRC`].
/// * `count` - An optional count of keys requested,
/// the amount returned can vary and not obligated to return exactly count.
/// * `object_type` - An optional [`ObjectType`] enum of requested key redis type.
/// * `object_type` - An optional [`ObjectType`] enum of requested key data type.
/// * `allow_non_covered_slots` - A boolean flag to allow missing slots in the cluster.
///
/// # Returns
///
Expand All @@ -177,7 +178,7 @@ where
/// let mut keys: Vec<String> = vec![];
/// loop {
/// let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) =
/// connection.cluster_scan(scan_state_rc, None, None).await.unwrap();
/// connection.cluster_scan(scan_state_rc, None, None, false).await.unwrap();
/// scan_state_rc = next_cursor;
/// let mut scan_keys = scan_keys
/// .into_iter()
Expand All @@ -196,14 +197,21 @@ where
scan_state_rc: ScanStateRC,
count: Option<usize>,
object_type: Option<ObjectType>,
allow_non_covered_slots: bool,
) -> RedisResult<(ScanStateRC, Vec<Value>)> {
let cluster_scan_args = ClusterScanArgs::new(scan_state_rc, None, count, object_type);
let cluster_scan_args = ClusterScanArgs::new(
scan_state_rc,
None,
count,
object_type,
allow_non_covered_slots,
);
self.route_cluster_scan(cluster_scan_args).await
}

/// Special handling for `SCAN` command, using `cluster_scan_with_pattern`.
/// It is a special case of [`cluster_scan`], with an additional match pattern.
/// Perform a `SCAN` command on a Redis cluster, using scan state object in order to handle changes in topology
/// Perform a `SCAN` command on a cluster, using scan state object in order to handle changes in topology
/// and make sure that all keys that were in the cluster from start to end of the scan are scanned.
/// In order to make sure all keys in the cluster scanned, topology refresh occurs more frequently and may affect performance.
///
Expand All @@ -214,7 +222,8 @@ where
/// * `match_pattern` - A match pattern of requested keys.
/// * `count` - An optional count of keys requested,
/// the amount returned can vary and not obligated to return exactly count.
/// * `object_type` - An optional [`ObjectType`] enum of requested key redis type.
/// * `object_type` - An optional [`ObjectType`] enum of requested key data type.
/// * `allow_non_covered_slots` - A boolean flag to allow missing slots in the cluster.
///
/// # Returns
///
Expand All @@ -237,7 +246,7 @@ where
/// let mut keys: Vec<String> = vec![];
/// loop {
/// let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) =
/// connection.cluster_scan_with_pattern(scan_state_rc, b"my_key", None, None).await.unwrap();
/// connection.cluster_scan_with_pattern(scan_state_rc, b"my_key", None, None, false).await.unwrap();
/// scan_state_rc = next_cursor;
/// let mut scan_keys = scan_keys
/// .into_iter()
Expand All @@ -257,12 +266,14 @@ where
match_pattern: K,
count: Option<usize>,
object_type: Option<ObjectType>,
allow_non_covered_slots: bool,
) -> RedisResult<(ScanStateRC, Vec<Value>)> {
let cluster_scan_args = ClusterScanArgs::new(
scan_state_rc,
Some(match_pattern.to_redis_args().concat()),
count,
object_type,
allow_non_covered_slots,
);
self.route_cluster_scan(cluster_scan_args).await
}
Expand All @@ -282,15 +293,15 @@ where
.map_err(|_| {
RedisError::from(io::Error::new(
io::ErrorKind::BrokenPipe,
"redis_cluster: Unable to send command",
"cluster: Unable to send command",
))
})?;
receiver
.await
.unwrap_or_else(|_| {
Err(RedisError::from(io::Error::new(
io::ErrorKind::BrokenPipe,
"redis_cluster: Unable to receive command",
"cluster: Unable to receive command",
)))
})
.map(|response| match response {
Expand Down Expand Up @@ -319,15 +330,15 @@ where
.map_err(|_| {
RedisError::from(io::Error::new(
io::ErrorKind::BrokenPipe,
"redis_cluster: Unable to send command",
"cluster: Unable to send command",
))
})?;
receiver
.await
.unwrap_or_else(|_| {
Err(RedisError::from(io::Error::new(
io::ErrorKind::BrokenPipe,
"redis_cluster: Unable to receive command",
"cluster: Unable to receive command",
)))
})
.map(|response| match response {
Expand Down
Loading

0 comments on commit c004756

Please sign in to comment.