diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 39e547c85b..61ed57697e 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -111,7 +111,7 @@ use crate::types::RetryMethod; pub(crate) const MUTEX_READ_ERR: &str = "Failed to obtain read lock. Poisoned mutex?"; const MUTEX_WRITE_ERR: &str = "Failed to obtain write lock. Poisoned mutex?"; -/// This represents an async Redis Cluster connection. It stores the +/// This represents an async Cluster connection. It stores the /// underlying connections maintained for each node in the cluster, as well /// as common parameters for connecting to nodes and executing commands. #[derive(Clone)] @@ -144,7 +144,7 @@ where /// Special handling for `SCAN` command, using `cluster_scan`. /// If you wish to use a match pattern, use [`cluster_scan_with_pattern`]. - /// Perform a `SCAN` command on a Redis cluster, using scan state object in order to handle changes in topology + /// Perform a `SCAN` command on a cluster, using scan state object in order to handle changes in topology /// and make sure that all keys that were in the cluster from start to end of the scan are scanned. /// In order to make sure all keys in the cluster scanned, topology refresh occurs more frequently and may affect performance. /// @@ -154,7 +154,8 @@ where /// for each subsequent iteration use the returned [`ScanStateRC`]. /// * `count` - An optional count of keys requested, /// the amount returned can vary and not obligated to return exactly count. - /// * `object_type` - An optional [`ObjectType`] enum of requested key redis type. + /// * `object_type` - An optional [`ObjectType`] enum of requested key data type. + /// * `allow_non_covered_slots` - A boolean flag to allow missing slots in the cluster. /// /// # Returns /// @@ -177,7 +178,7 @@ where /// let mut keys: Vec = vec![]; /// loop { /// let (next_cursor, scan_keys): (ScanStateRC, Vec) = - /// connection.cluster_scan(scan_state_rc, None, None).await.unwrap(); + /// connection.cluster_scan(scan_state_rc, None, None, false).await.unwrap(); /// scan_state_rc = next_cursor; /// let mut scan_keys = scan_keys /// .into_iter() @@ -196,14 +197,21 @@ where scan_state_rc: ScanStateRC, count: Option, object_type: Option, + allow_non_covered_slots: bool, ) -> RedisResult<(ScanStateRC, Vec)> { - let cluster_scan_args = ClusterScanArgs::new(scan_state_rc, None, count, object_type); + let cluster_scan_args = ClusterScanArgs::new( + scan_state_rc, + None, + count, + object_type, + allow_non_covered_slots, + ); self.route_cluster_scan(cluster_scan_args).await } /// Special handling for `SCAN` command, using `cluster_scan_with_pattern`. /// It is a special case of [`cluster_scan`], with an additional match pattern. - /// Perform a `SCAN` command on a Redis cluster, using scan state object in order to handle changes in topology + /// Perform a `SCAN` command on a cluster, using scan state object in order to handle changes in topology /// and make sure that all keys that were in the cluster from start to end of the scan are scanned. /// In order to make sure all keys in the cluster scanned, topology refresh occurs more frequently and may affect performance. /// @@ -214,7 +222,8 @@ where /// * `match_pattern` - A match pattern of requested keys. /// * `count` - An optional count of keys requested, /// the amount returned can vary and not obligated to return exactly count. - /// * `object_type` - An optional [`ObjectType`] enum of requested key redis type. + /// * `object_type` - An optional [`ObjectType`] enum of requested key data type. + /// * `allow_non_covered_slots` - A boolean flag to allow missing slots in the cluster. /// /// # Returns /// @@ -237,7 +246,7 @@ where /// let mut keys: Vec = vec![]; /// loop { /// let (next_cursor, scan_keys): (ScanStateRC, Vec) = - /// connection.cluster_scan_with_pattern(scan_state_rc, b"my_key", None, None).await.unwrap(); + /// connection.cluster_scan_with_pattern(scan_state_rc, b"my_key", None, None, false).await.unwrap(); /// scan_state_rc = next_cursor; /// let mut scan_keys = scan_keys /// .into_iter() @@ -257,12 +266,14 @@ where match_pattern: K, count: Option, object_type: Option, + allow_non_covered_slots: bool, ) -> RedisResult<(ScanStateRC, Vec)> { let cluster_scan_args = ClusterScanArgs::new( scan_state_rc, Some(match_pattern.to_redis_args().concat()), count, object_type, + allow_non_covered_slots, ); self.route_cluster_scan(cluster_scan_args).await } @@ -282,7 +293,7 @@ where .map_err(|_| { RedisError::from(io::Error::new( io::ErrorKind::BrokenPipe, - "redis_cluster: Unable to send command", + "cluster: Unable to send command", )) })?; receiver @@ -290,7 +301,7 @@ where .unwrap_or_else(|_| { Err(RedisError::from(io::Error::new( io::ErrorKind::BrokenPipe, - "redis_cluster: Unable to receive command", + "cluster: Unable to receive command", ))) }) .map(|response| match response { @@ -319,7 +330,7 @@ where .map_err(|_| { RedisError::from(io::Error::new( io::ErrorKind::BrokenPipe, - "redis_cluster: Unable to send command", + "cluster: Unable to send command", )) })?; receiver @@ -327,7 +338,7 @@ where .unwrap_or_else(|_| { Err(RedisError::from(io::Error::new( io::ErrorKind::BrokenPipe, - "redis_cluster: Unable to receive command", + "cluster: Unable to receive command", ))) }) .map(|response| match response { diff --git a/glide-core/redis-rs/redis/src/commands/cluster_scan.rs b/glide-core/redis-rs/redis/src/commands/cluster_scan.rs index 0fccb0e6f5..1770f6591f 100644 --- a/glide-core/redis-rs/redis/src/commands/cluster_scan.rs +++ b/glide-core/redis-rs/redis/src/commands/cluster_scan.rs @@ -6,11 +6,10 @@ use crate::cluster_async::{ use crate::cluster_routing::SlotAddr; use crate::cluster_topology::SLOT_SIZE; use crate::{cmd, from_redis_value, Cmd, ErrorKind, RedisError, RedisResult, Value}; -use async_trait::async_trait; use std::sync::Arc; use strum_macros::Display; -/// This module contains the implementation of scanning operations in a Redis cluster. +/// This module contains the implementation of scanning operations in a cluster. /// /// The [`ClusterScanArgs`] struct represents the arguments for a cluster scan operation, /// including the scan state reference, match pattern, count, and object type. @@ -18,10 +17,10 @@ use strum_macros::Display; /// The [[`ScanStateRC`]] struct is a wrapper for managing the state of a scan operation in a cluster. /// It holds a reference to the scan state and provides methods for accessing the state. /// -/// The [[`ClusterInScan`]] trait defines the methods for interacting with a Redis cluster during scanning, +/// The [[`ClusterInScan`]] trait defines the methods for interacting with a cluster during scanning, /// including retrieving address information, refreshing slot mapping, and routing commands to specific address. /// -/// The [[`ScanState`]] struct represents the state of a scan operation in a Redis cluster. +/// The [[`ScanState`]] struct represents the state of a scan operation in a cluster. /// It holds information about the current scan state, including the cursor position, scanned slots map, /// address being scanned, and address's epoch. @@ -31,28 +30,42 @@ const BITS_ARRAY_SIZE: usize = NUM_OF_SLOTS / BITS_PER_U64; const END_OF_SCAN: u16 = NUM_OF_SLOTS as u16 + 1; type SlotsBitsArray = [u64; BITS_ARRAY_SIZE]; +// Used in cases where allow non covered slots is set to true, +// and the while iterating in order to find responsive slot we end up with no address to reach, +// This used as a flag to the caller that the string return is not an address, and the scan ends. +const NO_ADDRESSES_FOUND: &str = "NO_ADDRESSES_FOUND"; + +/// Arguments for performing a cluster-wide `SCAN` operation. +/// +/// This struct holds the parameters needed to perform a scan across a cluster, +/// including the scan state, match pattern, count, object type, and whether to allow +/// non-covered slots. #[derive(Clone)] pub(crate) struct ClusterScanArgs { pub(crate) scan_state_cursor: ScanStateRC, match_pattern: Option>, count: Option, object_type: Option, + allow_non_covered_slots: bool, } +/// Represents the type of an object used to filter keys by data type. +/// +/// This enum is used with the `TYPE` option in the `SCAN` command to +/// filter keys by their data type. #[derive(Debug, Clone, Display)] -/// Represents the type of an object in Redis. pub enum ObjectType { - /// Represents a string object in Redis. + /// String data type. String, - /// Represents a list object in Redis. + /// List data type. List, - /// Represents a set object in Redis. + /// Set data type. Set, - /// Represents a sorted set object in Redis. + /// Sorted set data type. ZSet, - /// Represents a hash object in Redis. + /// Hash data type. Hash, - /// Represents a stream object in Redis. + /// Stream data type. Stream, } @@ -62,12 +75,14 @@ impl ClusterScanArgs { match_pattern: Option>, count: Option, object_type: Option, + allow_non_covered_slots: bool, ) -> Self { Self { scan_state_cursor, match_pattern, count, object_type, + allow_non_covered_slots, } } } @@ -80,10 +95,11 @@ pub enum ScanStateStage { Finished, } +/// Wrapper struct for managing the state of a cluster scan operation. +/// +/// This struct holds an `Arc` to the actual scan state and a status indicating +/// whether the scan is initiating, in progress, or finished. #[derive(Debug, Clone, Default)] -/// A wrapper struct for managing the state of a scan operation in a cluster. -/// It holds a reference to the scan state and provides methods for accessing the state. -/// The `status` field indicates the status of the scan operation. pub struct ScanStateRC { scan_state_rc: Arc>, status: ScanStateStage, @@ -130,11 +146,18 @@ impl ScanStateRC { } } -/// This trait defines the methods for interacting with a Redis cluster during scanning. -#[async_trait] +/// Trait defining methods for interacting with a cluster during scanning. +/// +/// Implementors of this trait provide functionalities such as retrieving the address +/// for a slot, getting the epoch of an address, routing commands, and checking cluster coverage. pub(crate) trait ClusterInScan { /// Retrieves the address associated with a given slot in the cluster. - async fn get_address_by_slot(&self, slot: u16) -> RedisResult>; + async fn get_address_by_slot( + &self, + slot: u16, + scanned_slots_map: &mut SlotsBitsArray, + allow_non_covered_slots: bool, + ) -> RedisResult>; /// Retrieves the epoch of a given address in the cluster. /// The epoch represents the version of the address, which is updated when a failover occurs or slots migrate in. @@ -143,7 +166,7 @@ pub(crate) trait ClusterInScan { /// Retrieves the slots assigned to a given address in the cluster. async fn get_slots_of_address(&self, address: Arc) -> Vec; - /// Routes a Redis command to a specific address in the cluster. + /// Routes a command to a specific address in the cluster. async fn route_command(&self, cmd: Cmd, address: &str) -> RedisResult; /// Check if all slots are covered by the cluster @@ -153,10 +176,10 @@ pub(crate) trait ClusterInScan { async fn refresh_if_topology_changed(&self) -> RedisResult; } -/// Represents the state of a scan operation in a Redis cluster. +/// Represents the state of a cluster scan operation. /// -/// This struct holds information about the current scan state, including the cursor position, -/// the scanned slots map, the address being scanned, and the address's epoch. +/// This struct keeps track of the current cursor, which slots have been scanned, +/// the address currently being scanned, and the epoch of that address. #[derive(PartialEq, Debug, Clone)] pub(crate) struct ScanState { // the real cursor in the scan operation @@ -217,10 +240,20 @@ impl ScanState { /// and the address set to the address associated with slot 0. /// The address epoch is set to the epoch of the address. /// If the address epoch cannot be retrieved, the method returns an error. - async fn initiate_scan(connection: &C) -> RedisResult { - let new_scanned_slots_map: SlotsBitsArray = [0; BITS_ARRAY_SIZE]; + async fn initiate_scan( + connection: &C, + allow_non_covered_slots: bool, + ) -> RedisResult { + let mut new_scanned_slots_map: SlotsBitsArray = [0; BITS_ARRAY_SIZE]; let new_cursor = 0; - let address = connection.get_address_by_slot(0).await?; + let address = connection + .get_address_by_slot(0, &mut new_scanned_slots_map, allow_non_covered_slots) + .await?; + + if *address == NO_ADDRESSES_FOUND { + return Ok(ScanState::create_finished_state()); + } + let address_epoch = connection.get_address_epoch(&address).await.unwrap_or(0); Ok(ScanState::new( new_cursor, @@ -256,25 +289,35 @@ impl ScanState { async fn creating_state_without_slot_changes( &self, connection: &C, + allow_non_covered_slots: bool, ) -> RedisResult { - let next_slot = self.get_next_slot(&self.scanned_slots_map).unwrap_or(0); + let mut scanned_slots_map = self.scanned_slots_map; + let next_slot = self.get_next_slot(&scanned_slots_map).unwrap_or(0); let new_address = if next_slot == END_OF_SCAN { return Ok(ScanState::create_finished_state()); } else { - connection.get_address_by_slot(next_slot).await + connection + .get_address_by_slot(next_slot, &mut scanned_slots_map, allow_non_covered_slots) + .await? }; - match new_address { - Ok(address) => { - let new_epoch = connection.get_address_epoch(&address).await.unwrap_or(0); + + match new_address.as_ref().as_str() { + NO_ADDRESSES_FOUND => { + return Ok(ScanState::create_finished_state()); + } + _ => { + let new_epoch = connection + .get_address_epoch(&new_address) + .await + .unwrap_or(0); Ok(ScanState::new( 0, - self.scanned_slots_map, - address, + scanned_slots_map, + new_address, new_epoch, ScanStateStage::InProgress, )) } - Err(err) => Err(err), } } @@ -287,6 +330,7 @@ impl ScanState { async fn create_updated_scan_state_for_completed_address( &mut self, connection: &C, + allow_non_covered_slots: bool, ) -> RedisResult { connection .refresh_if_topology_changed() @@ -306,7 +350,9 @@ impl ScanState { .await .unwrap_or(0); if new_address_epoch != self.address_epoch { - return self.creating_state_without_slot_changes(connection).await; + return self + .creating_state_without_slot_changes(connection, allow_non_covered_slots) + .await; } // If epoch wasn't changed, the slots owned by the address after the refresh are all valid as slots that been scanned // So we will update the scanned_slots_map with the slots owned by the address @@ -323,51 +369,73 @@ impl ScanState { let new_address = if next_slot == END_OF_SCAN { return Ok(ScanState::create_finished_state()); } else { - connection.get_address_by_slot(next_slot).await + connection + .get_address_by_slot(next_slot, &mut scanned_slots_map, allow_non_covered_slots) + .await? }; - match new_address { - Ok(new_address) => { - let new_epoch = connection - .get_address_epoch(&new_address) - .await - .unwrap_or(0); - let new_cursor = 0; - Ok(ScanState::new( - new_cursor, - scanned_slots_map, - new_address, - new_epoch, - ScanStateStage::InProgress, - )) - } - Err(err) => Err(err), + + if *new_address == NO_ADDRESSES_FOUND { + return Ok(ScanState::create_finished_state()); } + let new_epoch = connection + .get_address_epoch(&new_address) + .await + .unwrap_or(0); + let new_cursor = 0; + Ok(ScanState::new( + new_cursor, + scanned_slots_map, + new_address, + new_epoch, + ScanStateStage::InProgress, + )) } } // Implement the [`ClusterInScan`] trait for [`InnerCore`] of async cluster connection. -#[async_trait] impl ClusterInScan for Core where C: ConnectionLike + Connect + Clone + Send + Sync + 'static, { - async fn get_address_by_slot(&self, slot: u16) -> RedisResult> { - let address = self - .get_address_from_slot(slot, SlotAddr::ReplicaRequired) - .await; - match address { - Some(addr) => Ok(addr), - None => { - if self.are_all_slots_covered().await { - Err(RedisError::from(( + async fn get_address_by_slot( + &self, + mut slot: u16, + scanned_slots_map: &mut SlotsBitsArray, + allow_non_covered_slots: bool, + ) -> RedisResult> { + loop { + let address = self + .get_address_from_slot(slot, SlotAddr::ReplicaRequired) + .await; + + match address { + Some(addr) => return Ok(addr), + None => { + if allow_non_covered_slots { + // Mark the current slot as scanned before moving to next + let slot_index = slot as usize / BITS_PER_U64; + let slot_bit = slot as usize % BITS_PER_U64; + scanned_slots_map[slot_index] |= 1 << slot_bit; + + slot += 1; + if slot == NUM_OF_SLOTS as u16 { + return Ok(NO_ADDRESSES_FOUND.to_string().into()); + } + continue; + } + + if self.are_all_slots_covered().await { + return + Err::, RedisError>(RedisError::from(( ErrorKind::IoError, "Failed to get connection to the node cover the slot, please check the cluster configuration ", - ))) - } else { - Err(RedisError::from(( + ))); + } + return + Err::, RedisError>(RedisError::from(( ErrorKind::NotAllSlotsCovered, "All slots are not covered by the cluster, please check the cluster configuration ", - ))) + ))); } } } @@ -411,19 +479,19 @@ where } } -/// Perform a cluster scan operation. -/// This function performs a scan operation in a Redis cluster using the given [`ClusterInScan`] connection. -/// It scans the cluster for keys based on the given `ClusterScanArgs` arguments. -/// The function returns a tuple containing the new scan state cursor and the keys found in the scan operation. -/// If the scan operation fails, an error is returned. +/// Performs a cluster-wide `SCAN` operation. +/// +/// This function scans the cluster for keys based on the provided arguments +/// and returns a tuple containing the new scan state and the keys found. /// /// # Arguments -/// * `core` - The connection to the Redis cluster. -/// * `cluster_scan_args` - The arguments for the cluster scan operation. +/// +/// * `core` - The cluster connection. +/// * `cluster_scan_args` - Arguments for the scan operation. /// /// # Returns -/// A tuple containing the new scan state cursor and the keys found in the scan operation. -/// If the scan operation fails, an error is returned. +/// +/// A result containing the new `ScanStateRC` and a vector of `Value`s representing the keys. pub(crate) async fn cluster_scan( core: C, cluster_scan_args: ClusterScanArgs, @@ -436,11 +504,12 @@ where match_pattern, count, object_type, + allow_non_covered_slots, } = cluster_scan_args; // If scan_state is None, meaning we start a new scan let scan_state = match scan_state_cursor.get_state_from_wrapper() { Some(state) => state, - None => match ScanState::initiate_scan(&core).await { + None => match ScanState::initiate_scan(&core, allow_non_covered_slots).await { Ok(state) => state, Err(err) => { return Err(err); @@ -467,8 +536,15 @@ where ErrorKind::IoError | ErrorKind::AllConnectionsUnavailable | ErrorKind::ConnectionNotFoundForRoute => { - let retry = - retry_scan(&scan_state, &core, match_pattern, count, object_type).await?; + let retry = retry_scan( + &scan_state, + &core, + match_pattern, + count, + object_type, + allow_non_covered_slots, + ) + .await?; (from_redis_value(&retry.0?)?, retry.1) } _ => return Err(err), @@ -479,7 +555,7 @@ where // we will update the scan state to get the next address to scan if new_cursor == 0 { scan_state = scan_state - .create_updated_scan_state_for_completed_address(&core) + .create_updated_scan_state_for_completed_address(&core, allow_non_covered_slots) .await?; } @@ -498,7 +574,19 @@ where Ok((ScanStateRC::from_scan_state(scan_state), new_keys)) } -// Send the scan command to the address in the scan_state +/// Sends the `SCAN` command to the specified address. +/// +/// # Arguments +/// +/// * `scan_state` - The current scan state. +/// * `core` - The cluster connection. +/// * `match_pattern` - Optional pattern to match keys. +/// * `count` - Optional count of keys to return per scan. +/// * `object_type` - Optional object type to filter keys. +/// +/// # Returns +/// +/// A `RedisResult` containing the response from the `SCAN` command. async fn send_scan( scan_state: &ScanState, core: &C, @@ -525,17 +613,31 @@ where .await } -// If the scan command failed to route to the address we will check we will first refresh the slots, we will check if all slots are covered by cluster, -// and if so we will try to get a new address to scan for handling case of failover. -// if all slots are not covered by the cluster we will return an error indicating that the cluster is not well configured. -// if all slots are covered by cluster but we failed to get a new address to scan we will return an error indicating that we failed to get a new address to scan. -// if we got a new address to scan but the scan command failed to route to the address we will return an error indicating that we failed to route the command. +/// Attempts to retry the scan operation in case of routing failures. +/// +/// This function handles cases where the scan command fails due to topology changes +/// or unreachable nodes, and attempts to recover by refreshing the cluster state and +/// retrying the scan. +/// +/// # Arguments +/// +/// * `scan_state` - The current scan state. +/// * `core` - The cluster connection. +/// * `match_pattern` - Optional pattern to match keys. +/// * `count` - Optional count of keys to return per scan. +/// * `object_type` - Optional object type to filter keys. +/// * `allow_non_covered_slots` - Whether to allow scanning slots not covered by the cluster. +/// +/// # Returns +/// +/// A `RedisResult` containing a tuple with the scan result and the updated scan state. async fn retry_scan( scan_state: &ScanState, core: &C, match_pattern: Option>, count: Option, object_type: Option, + allow_non_covered_slots: bool, ) -> RedisResult<(RedisResult, ScanState)> where C: ClusterInScan, @@ -549,7 +651,7 @@ where format!("{:?}", err), )) })?; - if !core.are_all_slots_covered().await { + if !core.are_all_slots_covered().await && !allow_non_covered_slots { return Err(RedisError::from(( ErrorKind::NotAllSlotsCovered, "Not all slots are covered by the cluster, please check the cluster configuration", @@ -562,12 +664,20 @@ where let next_slot = scan_state .get_next_slot(&scan_state.scanned_slots_map) .unwrap_or(0); - let address = core.get_address_by_slot(next_slot).await?; + let mut scanned_slots_map = scan_state.scanned_slots_map; + let address = core + .get_address_by_slot(next_slot, &mut scanned_slots_map, allow_non_covered_slots) + .await?; + + if *address == NO_ADDRESSES_FOUND { + let finished_state = ScanState::create_finished_state(); + return Ok((Ok(Value::Nil), finished_state)); + } let new_epoch = core.get_address_epoch(&address).await.unwrap_or(0); let scan_state = &ScanState::new( 0, - scan_state.scanned_slots_map, + scanned_slots_map, // Use the updated map that includes skipped slots address, new_epoch, ScanStateStage::InProgress, @@ -632,12 +742,16 @@ mod tests { } // Create a mock connection struct MockConnection; - #[async_trait] impl ClusterInScan for MockConnection { async fn refresh_if_topology_changed(&self) -> RedisResult { Ok(true) } - async fn get_address_by_slot(&self, _slot: u16) -> RedisResult> { + async fn get_address_by_slot( + &self, + _slot: u16, + _scanned_slots_map: &mut SlotsBitsArray, + _allow_non_covered_slots: bool, + ) -> RedisResult> { Ok("mock_address".to_string().into()) } async fn get_address_epoch(&self, _address: &str) -> Result { @@ -661,7 +775,7 @@ mod tests { #[tokio::test] async fn test_initiate_scan() { let connection = MockConnection; - let scan_state = ScanState::initiate_scan(&connection).await.unwrap(); + let scan_state = ScanState::initiate_scan(&connection, true).await.unwrap(); // Assert that the scan state is initialized correctly assert_eq!(scan_state.cursor, 0); @@ -704,10 +818,10 @@ mod tests { #[tokio::test] async fn test_update_scan_state_and_get_next_address() { let connection = MockConnection; - let scan_state = ScanState::initiate_scan(&connection).await; + let scan_state = ScanState::initiate_scan(&connection, true).await; let updated_scan_state = scan_state .unwrap() - .create_updated_scan_state_for_completed_address(&connection) + .create_updated_scan_state_for_completed_address(&connection, true) .await .unwrap(); @@ -736,7 +850,7 @@ mod tests { ); let scanned_slots_map = scan_state.scanned_slots_map; let updated_scan_state = scan_state - .creating_state_without_slot_changes(&connection) + .creating_state_without_slot_changes(&connection, true) .await .unwrap(); assert_eq!(updated_scan_state.scanned_slots_map, scanned_slots_map); diff --git a/glide-core/redis-rs/redis/tests/test_cluster_scan.rs b/glide-core/redis-rs/redis/tests/test_cluster_scan.rs index cfc4bae594..48df6fc9ab 100644 --- a/glide-core/redis-rs/redis/tests/test_cluster_scan.rs +++ b/glide-core/redis-rs/redis/tests/test_cluster_scan.rs @@ -5,7 +5,7 @@ mod support; mod test_cluster_scan_async { use crate::support::*; use rand::Rng; - use redis::cluster_routing::{RoutingInfo, SingleNodeRoutingInfo}; + use redis::cluster_routing::{MultipleNodeRoutingInfo, RoutingInfo, SingleNodeRoutingInfo}; use redis::{cmd, from_redis_value, ObjectType, RedisResult, ScanStateRC, Value}; use std::time::Duration; @@ -67,7 +67,7 @@ mod test_cluster_scan_async { let mut keys: Vec = vec![]; loop { let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection - .cluster_scan(scan_state_rc, None, None) + .cluster_scan(scan_state_rc, None, None, false) .await .unwrap(); scan_state_rc = next_cursor; @@ -114,7 +114,7 @@ mod test_cluster_scan_async { loop { count += 1; let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection - .cluster_scan(scan_state_rc, None, None) + .cluster_scan(scan_state_rc, None, None, false) .await .unwrap(); scan_state_rc = next_cursor; @@ -189,18 +189,21 @@ mod test_cluster_scan_async { let mut keys: Vec = Vec::new(); let mut count = 0; let mut result: RedisResult = Ok(Value::Nil); + let mut next_cursor = ScanStateRC::new(); + let mut scan_keys; loop { count += 1; - let scan_response: RedisResult<(ScanStateRC, Vec)> = - connection.cluster_scan(scan_state_rc, None, None).await; - let (next_cursor, scan_keys) = match scan_response { - Ok((cursor, keys)) => (cursor, keys), + let scan_response: RedisResult<(ScanStateRC, Vec)> = connection + .cluster_scan(scan_state_rc, None, None, false) + .await; + (next_cursor, scan_keys) = match scan_response { + Ok((cursor, keys)) => (cursor.clone(), keys), Err(e) => { result = Err(e); break; } }; - scan_state_rc = next_cursor; + scan_state_rc = next_cursor.clone(); keys.extend(scan_keys.into_iter().map(|v| from_redis_value(&v).unwrap())); if scan_state_rc.is_finished() { break; @@ -225,6 +228,44 @@ mod test_cluster_scan_async { } // We expect an error of finding address assert!(result.is_err()); + + // Test we can continue scanning after the fail using allow_non_covered_slots=true + scan_state_rc = next_cursor; + // config cluster to allow missing slots + let mut config_cmd = cmd("CONFIG"); + config_cmd + .arg("SET") + .arg("cluster-require-full-coverage") + .arg("no"); + let res: RedisResult = connection + .route_command( + &config_cmd, + RoutingInfo::MultiNode((MultipleNodeRoutingInfo::AllNodes, None)), + ) + .await; + print!("config result: {:?}", res); + loop { + let res = connection + .cluster_scan(scan_state_rc.clone(), None, None, true) + .await; + let (next_cursor, scan_keys): (ScanStateRC, Vec) = match res { + Ok((cursor, keys)) => (cursor.clone(), keys), + Err(e) => { + println!("error: {:?}", e); + break; + } + }; + scan_state_rc = next_cursor; + let mut scan_keys = scan_keys + .into_iter() + .map(|v| from_redis_value(&v).unwrap()) + .collect::>(); + keys.append(&mut scan_keys); + if scan_state_rc.is_finished() { + break; + } + } + assert!(scan_state_rc.is_finished()); } #[tokio::test] @@ -268,8 +309,9 @@ mod test_cluster_scan_async { let mut count = 0; loop { count += 1; - let scan_response: RedisResult<(ScanStateRC, Vec)> = - connection.cluster_scan(scan_state_rc, None, None).await; + let scan_response: RedisResult<(ScanStateRC, Vec)> = connection + .cluster_scan(scan_state_rc, None, None, false) + .await; if scan_response.is_err() { println!("error: {:?}", scan_response); } @@ -439,12 +481,13 @@ mod test_cluster_scan_async { } // Scan the keys let mut scan_state_rc = ScanStateRC::new(); - let mut keys: Vec = Vec::new(); + let mut keys: Vec = vec![]; let mut count = 0; loop { count += 1; - let scan_response: RedisResult<(ScanStateRC, Vec)> = - connection.cluster_scan(scan_state_rc, None, None).await; + let scan_response: RedisResult<(ScanStateRC, Vec)> = connection + .cluster_scan(scan_state_rc, None, None, false) + .await; if scan_response.is_err() { println!("error: {:?}", scan_response); } @@ -513,7 +556,7 @@ mod test_cluster_scan_async { let mut keys: Vec = vec![]; loop { let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection - .cluster_scan(scan_state_rc, None, None) + .cluster_scan(scan_state_rc, None, None, false) .await .unwrap(); scan_state_rc = next_cursor; @@ -574,7 +617,7 @@ mod test_cluster_scan_async { let mut keys: Vec = vec![]; loop { let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection - .cluster_scan(scan_state_rc, None, None) + .cluster_scan(scan_state_rc, None, None, false) .await .unwrap(); scan_state_rc = next_cursor; @@ -643,7 +686,7 @@ mod test_cluster_scan_async { let mut keys: Vec = vec![]; loop { let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection - .cluster_scan_with_pattern(scan_state_rc, "key:pattern:*", None, None) + .cluster_scan_with_pattern(scan_state_rc, "key:pattern:*", None, None, false) .await .unwrap(); scan_state_rc = next_cursor; @@ -702,7 +745,7 @@ mod test_cluster_scan_async { let mut keys: Vec = vec![]; loop { let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection - .cluster_scan(scan_state_rc, None, Some(ObjectType::Set)) + .cluster_scan(scan_state_rc, None, Some(ObjectType::Set), false) .await .unwrap(); scan_state_rc = next_cursor; @@ -756,11 +799,11 @@ mod test_cluster_scan_async { let mut comparing_times = 0; loop { let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection - .cluster_scan(scan_state_rc.clone(), Some(100), None) + .cluster_scan(scan_state_rc.clone(), Some(100), None, false) .await .unwrap(); let (_, scan_without_count_keys): (ScanStateRC, Vec) = connection - .cluster_scan(scan_state_rc, Some(100), None) + .cluster_scan(scan_state_rc, Some(100), None, false) .await .unwrap(); if !scan_keys.is_empty() && !scan_without_count_keys.is_empty() { @@ -821,8 +864,9 @@ mod test_cluster_scan_async { let mut count = 0; loop { count += 1; - let scan_response: RedisResult<(ScanStateRC, Vec)> = - connection.cluster_scan(scan_state_rc, None, None).await; + let scan_response: RedisResult<(ScanStateRC, Vec)> = connection + .cluster_scan(scan_state_rc, None, None, false) + .await; if scan_response.is_err() { println!("error: {:?}", scan_response); } @@ -835,7 +879,7 @@ mod test_cluster_scan_async { if count == 5 { drop(cluster); let scan_response: RedisResult<(ScanStateRC, Vec)> = connection - .cluster_scan(scan_state_rc.clone(), None, None) + .cluster_scan(scan_state_rc.clone(), None, None, false) .await; assert!(scan_response.is_err()); break; @@ -844,8 +888,9 @@ mod test_cluster_scan_async { cluster = TestClusterContext::new(3, 0); connection = cluster.async_connection(None).await; loop { - let scan_response: RedisResult<(ScanStateRC, Vec)> = - connection.cluster_scan(scan_state_rc, None, None).await; + let scan_response: RedisResult<(ScanStateRC, Vec)> = connection + .cluster_scan(scan_state_rc, None, None, false) + .await; if scan_response.is_err() { println!("error: {:?}", scan_response); } @@ -857,4 +902,86 @@ mod test_cluster_scan_async { } } } + + #[tokio::test] + #[serial_test::serial] + async fn test_async_cluster_scan_with_missing_node() { + // Create a cluster with 3 nodes + let cluster = TestClusterContext::new_with_cluster_client_builder( + 3, + 0, + |builder| builder.retries(0), + false, + ); + let mut connection = cluster.async_connection(None).await; + + let mut config_cmd = cmd("CONFIG"); + config_cmd + .arg("SET") + .arg("cluster-require-full-coverage") + .arg("no"); + let _: RedisResult = connection + .route_command( + &config_cmd, + RoutingInfo::MultiNode((MultipleNodeRoutingInfo::AllNodes, None)), + ) + .await; + // Kill one node + let mut cluster_nodes = cluster.get_cluster_nodes().await; + let slot_distribution = cluster.get_slots_ranges_distribution(&cluster_nodes); + let killed_node_routing = kill_one_node(&cluster, slot_distribution.clone()).await; + let ready = cluster.wait_for_fail_to_finish(&killed_node_routing).await; + match ready { + Ok(_) => {} + Err(e) => { + println!("error: {:?}", e); + } + } + + // Compare slot distribution before and after killing a node + cluster_nodes = cluster.get_cluster_nodes().await; + let new_slot_distribution = cluster.get_slots_ranges_distribution(&cluster_nodes); + assert_ne!(slot_distribution, new_slot_distribution); + let mut excepted_keys: Vec = vec![]; + // Set some keys + for i in 0..100 { + let key = format!("key{}", i); + let res: Result<(), redis::RedisError> = redis::cmd("SET") + .arg(&key) + .arg("value") + .query_async(&mut connection) + .await; + if res.is_ok() { + excepted_keys.push(key); + } + } + + // Scan the keys + let mut scan_state_rc = ScanStateRC::new(); + let mut keys: Vec = vec![]; + loop { + let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection + .cluster_scan(scan_state_rc, None, None, true) + .await + .unwrap(); + scan_state_rc = next_cursor; + let mut scan_keys = scan_keys + .into_iter() + .map(|v| from_redis_value(&v).unwrap()) + .collect::>(); // Change the type of `keys` to `Vec` + keys.append(&mut scan_keys); + if scan_state_rc.is_finished() { + break; + } + } + // Check if all keys available scanned + keys.sort(); + keys.dedup(); + excepted_keys.sort(); + excepted_keys.dedup(); + for key in excepted_keys.iter() { + assert!(keys.contains(key)); + } + assert!(keys.len() > 0); + } }