From 16d1121416b74dcf9bd2fbe41805859fa2c43d23 Mon Sep 17 00:00:00 2001 From: doyoubi Date: Fri, 10 Apr 2020 11:23:34 +0800 Subject: [PATCH 1/5] Add active_redirection config --- src/bin/server_proxy.rs | 1 + src/migration/manager.rs | 1 + src/migration/scan_task.rs | 74 ++++++++++++++++++++--------- src/proxy/backend.rs | 45 +----------------- src/proxy/cluster.rs | 96 ++++++++++++++++++++++++-------------- src/proxy/manager.rs | 5 +- src/proxy/redirection.rs | 0 src/proxy/service.rs | 3 ++ 8 files changed, 124 insertions(+), 101 deletions(-) create mode 100644 src/proxy/redirection.rs diff --git a/src/bin/server_proxy.rs b/src/bin/server_proxy.rs index 204b39e6..03057135 100644 --- a/src/bin/server_proxy.rs +++ b/src/bin/server_proxy.rs @@ -88,6 +88,7 @@ fn gen_conf() -> Result { .get::("session_batch_max_time") .unwrap_or_else(|_| 400_000), session_batch_buf, + active_redirection: s.get::("active_redirection").unwrap_or_else(|_| false), }; Ok(config) } diff --git a/src/migration/manager.rs b/src/migration/manager.rs index 02d40a49..1a412761 100644 --- a/src/migration/manager.rs +++ b/src/migration/manager.rs @@ -498,6 +498,7 @@ where } let task = Arc::new(RedisScanImportingTask::new( + config.clone(), mgr_config.clone(), meta.clone(), slot_range.clone(), diff --git a/src/migration/scan_task.rs b/src/migration/scan_task.rs index 7a1eeeab..ab20d585 100644 --- a/src/migration/scan_task.rs +++ b/src/migration/scan_task.rs @@ -10,12 +10,12 @@ use crate::common::config::AtomicMigrationConfig; use crate::common::resp_execution::keep_connecting_and_sending_cmd; use crate::common::response::NOT_READY_FOR_SWITCHING_REPLY; use crate::common::track::TrackedFutureRegistry; -use crate::common::utils::{pretty_print_bytes, ThreadSafe}; +use crate::common::utils::{gen_moved, get_slot, pretty_print_bytes, ThreadSafe}; use crate::common::version::UNDERMOON_MIGRATION_VERSION; use crate::protocol::RespVec; use crate::protocol::{RedisClientError, RedisClientFactory, Resp}; use crate::proxy::backend::{ - CmdTask, CmdTaskFactory, CmdTaskSender, CmdTaskSenderFactory, RedirectionSenderFactory, ReqTask, + CmdTask, CmdTaskFactory, CmdTaskSender, CmdTaskSenderFactory, ReqTask, }; use crate::proxy::blocking::{BlockingHandle, BlockingHintTask, TaskBlockingController}; use crate::proxy::cluster::ClusterSendError; @@ -25,6 +25,7 @@ use atomic_option::AtomicOption; use futures::channel::oneshot; use futures::{future, select, Future, FutureExt, TryFutureExt}; use futures_timer::Delay; +use std::marker::PhantomData; use std::pin::Pin; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -43,12 +44,13 @@ where meta: MigrationMeta, state: Arc, client_factory: Arc, - redirection_sender_factory: RedirectionSenderFactory, stop_signal_sender: AtomicOption>, stop_signal_receiver: AtomicOption>, task: Arc, blocking_ctrl: Arc, future_registry: Arc, + phantom: PhantomData, + active_redirection: bool, } impl RedisScanMigratingTask @@ -59,7 +61,7 @@ where { #[allow(clippy::too_many_arguments)] pub fn new( - _config: Arc, + config: Arc, mgr_config: Arc, cluster_name: ClusterName, slot_range: SlotRange, @@ -76,8 +78,8 @@ where client_factory.clone(), mgr_config.clone(), ); - let redirection_sender_factory = RedirectionSenderFactory::default(); let range_map = RangeMap::from(slot_range.get_range_list()); + let active_redirection = config.active_redirection; Self { mgr_config, cluster_name, @@ -86,12 +88,13 @@ where meta, state: Arc::new(AtomicMigrationState::initial_state()), client_factory, - redirection_sender_factory, stop_signal_sender: AtomicOption::new(Box::new(stop_signal_sender)), stop_signal_receiver: AtomicOption::new(Box::new(stop_signal_receiver)), task: Arc::new(task), blocking_ctrl, future_registry, + phantom: PhantomData, + active_redirection, } } @@ -393,12 +396,11 @@ where _ => (), } - let redirection_sender = self - .redirection_sender_factory - .create(self.meta.dst_proxy_address.clone()); - redirection_sender - .send(cmd_task) - .map_err(|_e| ClusterSendError::MigrationError) + handle_redirection( + cmd_task, + self.meta.dst_proxy_address.clone(), + self.active_redirection, + ) } fn get_state(&self) -> MigrationState { @@ -456,11 +458,11 @@ where state: Arc, _client_factory: Arc, _sender_factory: Arc, - redirection_sender_factory: RedirectionSenderFactory, stop_signal_sender: AtomicOption>, stop_signal_receiver: AtomicOption>, cmd_handler: RestoreDataCmdTaskHandler::Sender>, _cmd_task_factory: Arc, + active_redirection: bool, } impl RedisScanImportingTask @@ -471,6 +473,7 @@ where ::Sender: ThreadSafe + CmdTaskSender>, { pub fn new( + config: Arc, mgr_config: Arc, meta: MigrationMeta, slot_range: SlotRange, @@ -483,8 +486,8 @@ where let cmd_handler = RestoreDataCmdTaskHandler::new(src_sender, dst_sender, cmd_task_factory.clone()); let (stop_signal_sender, stop_signal_receiver) = oneshot::channel(); - let redirection_sender_factory = RedirectionSenderFactory::default(); let range_map = RangeMap::from(slot_range.get_range_list()); + let active_redirection = config.active_redirection; Self { _mgr_config: mgr_config, meta, @@ -492,11 +495,11 @@ where state: Arc::new(AtomicMigrationState::initial_state()), _client_factory: client_factory, _sender_factory: sender_factory, - redirection_sender_factory, stop_signal_sender: AtomicOption::new(Box::new(stop_signal_sender)), stop_signal_receiver: AtomicOption::new(Box::new(stop_signal_receiver)), cmd_handler, _cmd_task_factory: cmd_task_factory, + active_redirection, } } } @@ -546,12 +549,11 @@ where cmd_task: Self::Task, ) -> Result<(), ClusterSendError>> { if self.state.get_state() == MigrationState::PreCheck { - let redirection_sender = self - .redirection_sender_factory - .create(self.meta.src_proxy_address.clone()); - return redirection_sender - .send(cmd_task) - .map_err(|_e| ClusterSendError::MigrationError); + return handle_redirection( + cmd_task, + self.meta.src_proxy_address.clone(), + self.active_redirection, + ); } self.cmd_handler.handle_cmd_task(cmd_task); @@ -613,3 +615,33 @@ impl Drop for ImportingTaskHandle { self.send_stop_signal() } } + +fn handle_redirection( + cmd_task: T, + redirection_address: String, + active_redirection: bool, +) -> Result<(), ClusterSendError>> { + let key = match cmd_task.get_key() { + Some(key) => key, + None => { + let resp = Resp::Error("missing key".to_string().into_bytes()); + cmd_task.set_resp_result(Ok(resp)); + return Ok(()); + } + }; + + let slot = get_slot(key); + + if active_redirection { + let cmd_task = BlockingHintTask::new(cmd_task, false); + Err(ClusterSendError::Moved { + task: cmd_task, + slot, + address: redirection_address, + }) + } else { + let resp = Resp::Error(gen_moved(get_slot(key), redirection_address).into_bytes()); + cmd_task.set_resp_result(Ok(resp)); + Ok(()) + } +} diff --git a/src/proxy/backend.rs b/src/proxy/backend.rs index 6b5a50c8..7982999a 100644 --- a/src/proxy/backend.rs +++ b/src/proxy/backend.rs @@ -4,7 +4,7 @@ use super::slowlog::TaskEvent; use crate::common::batch::TryChunksTimeoutStreamExt; use crate::common::response::ERR_BACKEND_CONNECTION; use crate::common::track::TrackedFutureRegistry; -use crate::common::utils::{gen_moved, get_slot, resolve_first_address, ThreadSafe}; +use crate::common::utils::{resolve_first_address, ThreadSafe}; use crate::protocol::{ new_simple_packet_codec, DecodeError, EncodeError, EncodedPacket, FromResp, MonoPacket, OptionalMulti, Packet, Resp, RespCodec, RespVec, @@ -765,49 +765,6 @@ impl CmdTaskSender for CachedSender { } } -pub struct RedirectionSender { - redirection_address: String, - phantom: PhantomData, -} - -impl CmdTaskSender for RedirectionSender { - type Task = T; - - fn send(&self, cmd_task: Self::Task) -> Result<(), BackendError> { - let key = match cmd_task.get_key() { - Some(key) => key, - None => { - let resp = Resp::Error("missing key".to_string().into_bytes()); - cmd_task.set_resp_result(Ok(resp)); - return Ok(()); - } - }; - let resp = - Resp::Error(gen_moved(get_slot(key), self.redirection_address.clone()).into_bytes()); - cmd_task.set_resp_result(Ok(resp)); - Ok(()) - } -} - -pub struct RedirectionSenderFactory(PhantomData); - -impl Default for RedirectionSenderFactory { - fn default() -> Self { - Self(PhantomData) - } -} - -impl CmdTaskSenderFactory for RedirectionSenderFactory { - type Sender = RedirectionSender; - - fn create(&self, address: String) -> Self::Sender { - RedirectionSender { - redirection_address: address, - phantom: PhantomData, - } - } -} - pub type BackendSenderFactory = CachedSenderFactory>>; diff --git a/src/proxy/cluster.rs b/src/proxy/cluster.rs index 77bf70c9..d40606b6 100644 --- a/src/proxy/cluster.rs +++ b/src/proxy/cluster.rs @@ -66,6 +66,7 @@ where pub fn from_cluster_map>( cluster_meta: &ProxyClusterMeta, sender_factory: &F, + active_redirection: bool, ) -> Self { let epoch = cluster_meta.get_epoch(); @@ -84,8 +85,12 @@ where let mut remote_clusters = HashMap::new(); for (cluster_name, slot_ranges) in cluster_meta.get_peer().get_map().iter() { - let remote_cluster = - RemoteCluster::from_slot_map(cluster_name.clone(), epoch, slot_ranges.clone()); + let remote_cluster = RemoteCluster::from_slot_map( + cluster_name.clone(), + epoch, + slot_ranges.clone(), + active_redirection, + ); remote_clusters.insert(cluster_name.clone(), remote_cluster); } Self { @@ -236,15 +241,31 @@ where // We combine the nodes and slot_map to let them fit into // the same lock with a smaller critical section // compared to the one we need if splitting them. -struct LocalBackend { +struct SenderMap { nodes: HashMap, slot_map: SlotMap, } +impl SenderMap { + fn from_slot_map>( + sender_factory: &F, + slot_map: &HashMap>, + ) -> Self { + let mut nodes = HashMap::new(); + for addr in slot_map.keys() { + nodes.insert(addr.to_string(), sender_factory.create(addr.to_string())); + } + Self { + nodes, + slot_map: SlotMap::from_ranges(slot_map.clone()), + } + } +} + pub struct LocalCluster { name: ClusterName, epoch: u64, - local_backend: LocalBackend, + local_backend: SenderMap, slot_ranges: HashMap>, config: ClusterConfig, } @@ -257,14 +278,7 @@ impl LocalCluster { slot_map: HashMap>, config: ClusterConfig, ) -> Self { - let mut nodes = HashMap::new(); - for addr in slot_map.keys() { - nodes.insert(addr.to_string(), sender_factory.create(addr.to_string())); - } - let local_backend = LocalBackend { - nodes, - slot_map: SlotMap::from_ranges(slot_map.clone()), - }; + let local_backend = SenderMap::from_slot_map(sender_factory, &slot_map); LocalCluster { name, epoch, @@ -351,6 +365,7 @@ pub struct RemoteCluster { epoch: u64, slot_map: SlotMap, slot_ranges: HashMap>, + active_redirection: bool, } impl RemoteCluster { @@ -358,12 +373,14 @@ impl RemoteCluster { name: ClusterName, epoch: u64, slot_map: HashMap>, - ) -> RemoteCluster { - RemoteCluster { + active_redirection: bool, + ) -> Self { + Self { name, epoch, slot_map: SlotMap::from_ranges(slot_map.clone()), slot_ranges: slot_map, + active_redirection, } } @@ -386,11 +403,22 @@ impl RemoteCluster { return Err(ClusterSendError::MissingKey); } }; - match self.slot_map.get_by_key(key) { + + let slot = get_slot(key); + + match self.slot_map.get(slot) { Some(addr) => { - let resp = Resp::Error(gen_moved(get_slot(key), addr.to_string()).into_bytes()); - cmd_task.set_resp_result(Ok(resp)); - Ok(()) + if self.active_redirection { + Err(ClusterSendError::Moved { + task: cmd_task, + slot, + address: addr.to_string(), + }) + } else { + let resp = Resp::Error(gen_moved(get_slot(key), addr.to_string()).into_bytes()); + cmd_task.set_resp_result(Ok(resp)); + Ok(()) + } } None => { let resp = Resp::Error(format!("slot not covered {:?}", key).into_bytes()); @@ -464,6 +492,11 @@ pub enum ClusterSendError { SlotNotCovered, Backend(BackendError), MigrationError, + Moved { + task: T, + slot: usize, + address: String, + }, } impl fmt::Display for ClusterSendError { @@ -483,31 +516,24 @@ impl fmt::Debug for ClusterSendError { Self::SlotNotCovered => "ClusterSendError::SlotNotCovered".to_string(), Self::Backend(err) => format!("ClusterSendError::Backend({})", err), Self::MigrationError => "ClusterSendError::MigrationError".to_string(), + Self::Moved { slot, address, .. } => { + format!("ClusterSendError::Moved({} {})", slot, address) + } }; write!(f, "{}", s) } } impl Error for ClusterSendError { - fn description(&self) -> &str { - match self { - ClusterSendError::MissingKey => "missing key", - ClusterSendError::ClusterNotFound(_) => "cluster not found", - ClusterSendError::SlotNotFound(_) => "slot not found", - ClusterSendError::Backend(_) => "backend error", - ClusterSendError::SlotNotCovered => "slot not covered", - ClusterSendError::MigrationError => "migration queue error", - } - } - fn cause(&self) -> Option<&dyn Error> { match self { - ClusterSendError::MissingKey => None, - ClusterSendError::ClusterNotFound(_) => None, - ClusterSendError::SlotNotFound(_) => None, - ClusterSendError::Backend(err) => Some(err), - ClusterSendError::SlotNotCovered => None, - ClusterSendError::MigrationError => None, + Self::MissingKey => None, + Self::ClusterNotFound(_) => None, + Self::SlotNotFound(_) => None, + Self::Backend(err) => Some(err), + Self::SlotNotCovered => None, + Self::MigrationError => None, + Self::Moved { .. } => None, } } } diff --git a/src/proxy/manager.rs b/src/proxy/manager.rs index 543e785e..31f25936 100644 --- a/src/proxy/manager.rs +++ b/src/proxy/manager.rs @@ -159,6 +159,8 @@ impl> MetaManager } pub fn set_meta(&self, cluster_meta: ProxyClusterMeta) -> Result<(), ClusterMetaError> { + let active_redirection = self.config.active_redirection; + let sender_factory = &self.sender_factory; let migration_manager = &self.migration_manager; @@ -171,7 +173,8 @@ impl> MetaManager } let old_meta_map = self.meta_map.load(); - let cluster_map = ClusterBackendMap::from_cluster_map(&cluster_meta, sender_factory); + let cluster_map = + ClusterBackendMap::from_cluster_map(&cluster_meta, sender_factory, active_redirection); let (migration_map, new_tasks) = migration_manager.create_new_migration_map( &old_meta_map.migration_map, cluster_meta.get_local(), diff --git a/src/proxy/redirection.rs b/src/proxy/redirection.rs new file mode 100644 index 00000000..e69de29b diff --git a/src/proxy/service.rs b/src/proxy/service.rs index 919cdaf1..019b1faa 100644 --- a/src/proxy/service.rs +++ b/src/proxy/service.rs @@ -29,6 +29,7 @@ pub struct ServerProxyConfig { pub session_batch_min_time: usize, pub session_batch_max_time: usize, pub session_batch_buf: NonZeroUsize, + pub active_redirection: bool, } impl ServerProxyConfig { @@ -52,6 +53,7 @@ impl ServerProxyConfig { "session_batch_min_time" => Ok(self.session_batch_min_time.to_string()), "session_batch_max_time" => Ok(self.session_batch_max_time.to_string()), "session_batch_buf" => Ok(self.session_batch_buf.to_string()), + "active_redirection" => Ok(self.active_redirection.to_string()), _ => Err(ConfigError::FieldNotFound), } } @@ -80,6 +82,7 @@ impl ServerProxyConfig { "session_batch_min_time" => Err(ConfigError::ReadonlyField), "session_batch_max_time" => Err(ConfigError::ReadonlyField), "session_batch_buf" => Err(ConfigError::ReadonlyField), + "active_redirection" => Err(ConfigError::ReadonlyField), _ => Err(ConfigError::FieldNotFound), } } From ccbdd84c812d5b62611f740de3dfdf0aaed5d6b9 Mon Sep 17 00:00:00 2001 From: doyoubi Date: Fri, 10 Apr 2020 15:59:15 +0800 Subject: [PATCH 2/5] Support active redirection on migration --- conf/server-proxy.toml | 6 +++ src/bin/server_proxy.rs | 4 +- src/migration/scan_task.rs | 1 + src/proxy/cluster.rs | 94 +++++++++++++++++++++++++++---------- src/proxy/manager.rs | 55 ++++++++++++++++++---- src/proxy/redirection.rs | 1 + tests/proxy_manager_test.rs | 1 + 7 files changed, 128 insertions(+), 34 deletions(-) diff --git a/conf/server-proxy.toml b/conf/server-proxy.toml index a1dfd2d8..1ce82ec1 100644 --- a/conf/server-proxy.toml +++ b/conf/server-proxy.toml @@ -26,3 +26,9 @@ backend_batch_buf = 10 session_batch_min_time = 20000 session_batch_max_time = 400000 session_batch_buf = 10 + +# Active Redirection Mode +# When active_redirection is enabled, +# all the server proxies will handle the redirection inside. +# Clients don't need to be a Redis Cluster Client. +active_redirection = false diff --git a/src/bin/server_proxy.rs b/src/bin/server_proxy.rs index 03057135..d63cf126 100644 --- a/src/bin/server_proxy.rs +++ b/src/bin/server_proxy.rs @@ -88,7 +88,9 @@ fn gen_conf() -> Result { .get::("session_batch_max_time") .unwrap_or_else(|_| 400_000), session_batch_buf, - active_redirection: s.get::("active_redirection").unwrap_or_else(|_| false), + active_redirection: s + .get::("active_redirection") + .unwrap_or_else(|_| false), }; Ok(config) } diff --git a/src/migration/scan_task.rs b/src/migration/scan_task.rs index ab20d585..6fc94afa 100644 --- a/src/migration/scan_task.rs +++ b/src/migration/scan_task.rs @@ -634,6 +634,7 @@ fn handle_redirection( if active_redirection { let cmd_task = BlockingHintTask::new(cmd_task, false); + // Proceed the command inside this proxy. Err(ClusterSendError::Moved { task: cmd_task, slot, diff --git a/src/proxy/cluster.rs b/src/proxy/cluster.rs index d40606b6..08fae8e1 100644 --- a/src/proxy/cluster.rs +++ b/src/proxy/cluster.rs @@ -39,17 +39,19 @@ pub trait ClusterTag { fn set_cluster_name(&mut self, cluster_name: ClusterName); } -pub struct ClusterBackendMap +pub struct ClusterBackendMap where ::Task: ClusterTag, + S: CmdTaskSender::Task>, { local_clusters: HashMap>, - remote_clusters: HashMap, + remote_clusters: HashMap>, } -impl Default for ClusterBackendMap +impl Default for ClusterBackendMap where ::Task: ClusterTag, + S: CmdTaskSender::Task>, { fn default() -> Self { Self { @@ -59,13 +61,18 @@ where } } -impl ClusterBackendMap +impl ClusterBackendMap where ::Task: ClusterTag, + S: CmdTaskSender::Task>, { - pub fn from_cluster_map>( + pub fn from_cluster_map< + F: CmdTaskSenderFactory, + PF: CmdTaskSenderFactory, + >( cluster_meta: &ProxyClusterMeta, sender_factory: &F, + peer_sender_factory: &PF, active_redirection: bool, ) -> Self { let epoch = cluster_meta.get_epoch(); @@ -86,6 +93,7 @@ where let mut remote_clusters = HashMap::new(); for (cluster_name, slot_ranges) in cluster_meta.get_peer().get_map().iter() { let remote_cluster = RemoteCluster::from_slot_map( + peer_sender_factory, cluster_name.clone(), epoch, slot_ranges.clone(), @@ -166,6 +174,26 @@ where } } + pub fn send_remote_directly( + &self, + cmd_task: ::Task, + slot: usize, + address: String, + ) -> Result<(), ClusterSendError<::Task>> { + match self.remote_clusters.get(&cmd_task.get_cluster_name()) { + Some(remote_cluster) => { + remote_cluster.send_remote_directly(cmd_task, slot, address.as_str()) + } + None => { + let resp = Resp::Error( + format!("slot not found: {}", cmd_task.get_cluster_name()).into_bytes(), + ); + cmd_task.set_resp_result(Ok(resp)); + Err(ClusterSendError::SlotNotCovered) + } + } + } + pub fn get_clusters(&self) -> Vec { self.local_clusters.keys().cloned().collect() } @@ -360,27 +388,33 @@ impl LocalCluster { } } -pub struct RemoteCluster { +pub struct RemoteCluster { name: ClusterName, epoch: u64, slot_map: SlotMap, slot_ranges: HashMap>, - active_redirection: bool, + remote_backend: Option>, } -impl RemoteCluster { - pub fn from_slot_map( +impl RemoteCluster { + pub fn from_slot_map>( + sender_factory: &F, name: ClusterName, epoch: u64, slot_map: HashMap>, active_redirection: bool, ) -> Self { + let remote_backend = if active_redirection { + Some(SenderMap::from_slot_map(sender_factory, &slot_map)) + } else { + None + }; Self { name, epoch, slot_map: SlotMap::from_ranges(slot_map.clone()), slot_ranges: slot_map, - active_redirection, + remote_backend, } } @@ -394,7 +428,10 @@ impl RemoteCluster { Resp::Arr(Array::Arr(arr)) } - pub fn send_remote(&self, cmd_task: T) -> Result<(), ClusterSendError> { + pub fn send_remote( + &self, + cmd_task: ::Task, + ) -> Result<(), ClusterSendError<::Task>> { let key = match cmd_task.get_key() { Some(key) => key, None => { @@ -407,19 +444,7 @@ impl RemoteCluster { let slot = get_slot(key); match self.slot_map.get(slot) { - Some(addr) => { - if self.active_redirection { - Err(ClusterSendError::Moved { - task: cmd_task, - slot, - address: addr.to_string(), - }) - } else { - let resp = Resp::Error(gen_moved(get_slot(key), addr.to_string()).into_bytes()); - cmd_task.set_resp_result(Ok(resp)); - Ok(()) - } - } + Some(addr) => self.send_remote_directly(cmd_task, slot, addr), None => { let resp = Resp::Error(format!("slot not covered {:?}", key).into_bytes()); cmd_task.set_resp_result(Ok(resp)); @@ -428,6 +453,27 @@ impl RemoteCluster { } } + pub fn send_remote_directly( + &self, + cmd_task: ::Task, + slot: usize, + address: &str, + ) -> Result<(), ClusterSendError<::Task>> { + if let Some(remote_backend) = self.remote_backend.as_ref() { + match remote_backend.nodes.get(address) { + Some(sender) => sender.send(cmd_task).map_err(ClusterSendError::Backend), + None => { + warn!("failed to get node"); + Err(ClusterSendError::SlotNotFound(cmd_task)) + } + } + } else { + let resp = Resp::Error(gen_moved(slot, address.to_string()).into_bytes()); + cmd_task.set_resp_result(Ok(resp)); + Ok(()) + } + } + pub fn gen_remote_cluster_nodes( &self, migration_states: &HashMap, diff --git a/src/proxy/manager.rs b/src/proxy/manager.rs index 31f25936..ddaa8554 100644 --- a/src/proxy/manager.rs +++ b/src/proxy/manager.rs @@ -22,26 +22,28 @@ use crate::migration::manager::{MigrationManager, MigrationMap, SwitchError}; use crate::migration::task::MgrSubCmd; use crate::migration::task::SwitchArg; use crate::protocol::{Array, BulkStr, RedisClientFactory, Resp, RespPacket, RespVec}; -use crate::proxy::backend::{CmdTask, ConnFactory}; +use crate::proxy::backend::{CachedSenderFactory, CmdTask, ConnFactory}; use crate::replication::manager::ReplicatorManager; use crate::replication::replicator::ReplicatorMeta; use arc_swap::ArcSwap; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; -pub struct MetaMap +pub struct MetaMap where ::Task: ClusterTag, + S: CmdTaskSender::Task>, T: CmdTask + ClusterTag, { - cluster_map: ClusterBackendMap, + cluster_map: ClusterBackendMap, migration_map: MigrationMap, deleting_task_map: DeleteKeysTaskMap, } -impl MetaMap +impl MetaMap where ::Task: ClusterTag, + S: CmdTaskSender::Task>, T: CmdTask + ClusterTag, { pub fn empty() -> Self { @@ -55,7 +57,7 @@ where } } - pub fn get_cluster_map(&self) -> &ClusterBackendMap { + pub fn get_cluster_map(&self) -> &ClusterBackendMap { &self.cluster_map } } @@ -67,9 +69,20 @@ type SenderFactory = BlockingBackendSenderFactory< C, BlockingTaskRetrySender, >; + +type PeerSenderFactory = SenderFactory; +//type PeerSenderFactory = CachedSenderFactory>; + type MigrationSenderFactory = MigrationBackendSenderFactory; -pub type SharedMetaMap = - Arc as CmdTaskSenderFactory>::Sender, CmdCtx>>>; +pub type SharedMetaMap = Arc< + ArcSwap< + MetaMap< + as CmdTaskSenderFactory>::Sender, + as CmdTaskSenderFactory>::Sender, + CmdCtx, + >, + >, +>; pub struct MetaManager> { config: Arc, @@ -82,6 +95,7 @@ pub struct MetaManager> replicator_manager: ReplicatorManager, migration_manager: MigrationManager, CmdCtxFactory>, sender_factory: SenderFactory, + peer_sender_factory: PeerSenderFactory, blocking_map: Arc, BlockingTaskRetrySender>>, } @@ -103,6 +117,7 @@ impl> MetaManager ); let blocking_map = Arc::new(BlockingMap::new(basic_sender_factory, blocking_task_sender)); let sender_factory = gen_blocking_sender_factory(blocking_map.clone()); + let peer_sender_factory = gen_blocking_sender_factory(blocking_map.clone()); let migration_sender_factory = Arc::new(gen_migration_sender_factory( config.clone(), Arc::new(ReplyCommitHandlerFactory::default()), @@ -130,6 +145,7 @@ impl> MetaManager future_registry, ), sender_factory, + peer_sender_factory, blocking_map, } } @@ -162,6 +178,7 @@ impl> MetaManager let active_redirection = self.config.active_redirection; let sender_factory = &self.sender_factory; + let peer_sender_factory = &self.peer_sender_factory; let migration_manager = &self.migration_manager; let _guard = self.lock.lock().expect("MetaManager::set_meta"); @@ -173,8 +190,12 @@ impl> MetaManager } let old_meta_map = self.meta_map.load(); - let cluster_map = - ClusterBackendMap::from_cluster_map(&cluster_meta, sender_factory, active_redirection); + let cluster_map = ClusterBackendMap::from_cluster_map( + &cluster_meta, + sender_factory, + peer_sender_factory, + active_redirection, + ); let (migration_map, new_tasks) = migration_manager.create_new_migration_map( &old_meta_map.migration_map, cluster_meta.get_local(), @@ -297,6 +318,22 @@ pub fn send_cmd_ctx>( Ok(()) => return, Err(e) => match e { ClusterSendError::SlotNotFound(cmd_ctx) => cmd_ctx, + ClusterSendError::Moved { + task, + slot, + address, + } => { + let res = meta_map + .cluster_map + .send_remote_directly(task.into(), slot, address); + if let Err(e) = res { + match e { + ClusterSendError::MissingKey => (), + err => warn!("Failed to forward cmd_ctx to remote: {:?}", err), + } + } + return; + } err => { error!("migration send task failed: {:?}", err); return; diff --git a/src/proxy/redirection.rs b/src/proxy/redirection.rs index e69de29b..8b137891 100644 --- a/src/proxy/redirection.rs +++ b/src/proxy/redirection.rs @@ -0,0 +1 @@ + diff --git a/tests/proxy_manager_test.rs b/tests/proxy_manager_test.rs index ae107731..48e7e152 100644 --- a/tests/proxy_manager_test.rs +++ b/tests/proxy_manager_test.rs @@ -58,6 +58,7 @@ mod tests { session_batch_min_time: 10000, session_batch_max_time: 10000, session_batch_buf: NonZeroUsize::new(50).unwrap(), + active_redirection: false, } } From 10dfc09ca066ac7a2a72d4182f564c4a9ab91432 Mon Sep 17 00:00:00 2001 From: doyoubi Date: Fri, 10 Apr 2020 17:06:52 +0800 Subject: [PATCH 3/5] make remote sender different from local sender --- src/proxy/backend.rs | 10 ++++++++++ src/proxy/blocking.rs | 8 +++++++- src/proxy/cluster.rs | 45 +++++++++++++++++++++++++++++++++---------- src/proxy/manager.rs | 22 ++++++++++++--------- 4 files changed, 65 insertions(+), 20 deletions(-) diff --git a/src/proxy/backend.rs b/src/proxy/backend.rs index 7982999a..398a3324 100644 --- a/src/proxy/backend.rs +++ b/src/proxy/backend.rs @@ -62,6 +62,16 @@ pub trait CmdTask: ThreadSafe { fn log_event(&mut self, event: TaskEvent); } +pub trait IntoTask: CmdTask { + fn into_task(self) -> T; +} + +impl IntoTask for T { + fn into_task(self) -> T { + self + } +} + pub trait CmdTaskFactory { type Task: CmdTask; diff --git a/src/proxy/blocking.rs b/src/proxy/blocking.rs index 262ff6ce..f83fde4c 100644 --- a/src/proxy/blocking.rs +++ b/src/proxy/blocking.rs @@ -1,6 +1,6 @@ use super::backend::{ BackendError, CachedSenderFactory, CmdTask, CmdTaskResultHandler, CmdTaskResultHandlerFactory, - CmdTaskSender, CmdTaskSenderFactory, ConnFactory, RRSenderGroupFactory, + CmdTaskSender, CmdTaskSenderFactory, ConnFactory, IntoTask, RRSenderGroupFactory, RecoverableBackendNodeFactory, }; use super::cluster::ClusterTag; @@ -509,3 +509,9 @@ impl ClusterTag for BlockingHintTask { self.inner.set_cluster_name(cluster_name) } } + +impl IntoTask for BlockingHintTask { + fn into_task(self) -> T { + self.into_inner() + } +} diff --git a/src/proxy/cluster.rs b/src/proxy/cluster.rs index 08fae8e1..aed9750c 100644 --- a/src/proxy/cluster.rs +++ b/src/proxy/cluster.rs @@ -1,5 +1,4 @@ -use super::backend::CmdTask; -use super::backend::{BackendError, CmdTaskSender, CmdTaskSenderFactory}; +use super::backend::{BackendError, CmdTask, CmdTaskSender, CmdTaskSenderFactory, IntoTask}; use super::slot::SlotMap; use crate::common::cluster::{ClusterName, RangeList, SlotRange, SlotRangeTag}; use crate::common::config::ClusterConfig; @@ -42,7 +41,7 @@ pub trait ClusterTag { pub struct ClusterBackendMap where ::Task: ClusterTag, - S: CmdTaskSender::Task>, + ::Task: IntoTask<

::Task>, { local_clusters: HashMap>, remote_clusters: HashMap>, @@ -51,7 +50,7 @@ where impl Default for ClusterBackendMap where ::Task: ClusterTag, - S: CmdTaskSender::Task>, + ::Task: IntoTask<

::Task>, { fn default() -> Self { Self { @@ -64,7 +63,7 @@ where impl ClusterBackendMap where ::Task: ClusterTag, - S: CmdTaskSender::Task>, + ::Task: IntoTask<

::Task>, { pub fn from_cluster_map< F: CmdTaskSenderFactory, @@ -142,18 +141,18 @@ where pub fn send( &self, cmd_task: ::Task, - ) -> Result<(), ClusterSendError<::Task>> { + ) -> Result<(), ClusterSendError<

::Task>> { let (cmd_task, cluster_exists) = match self.local_clusters.get(&cmd_task.get_cluster_name()) { Some(local_cluster) => match local_cluster.send(cmd_task) { Err(ClusterSendError::SlotNotFound(cmd_task)) => (cmd_task, true), - others => return others, + others => return others.map_err(|err| err.map_task(|task| task.into_task())), }, None => (cmd_task, false), }; match self.remote_clusters.get(&cmd_task.get_cluster_name()) { - Some(remote_cluster) => remote_cluster.send_remote(cmd_task), + Some(remote_cluster) => remote_cluster.send_remote(cmd_task.into_task()), None => { if cluster_exists { let resp = Resp::Error( @@ -179,10 +178,10 @@ where cmd_task: ::Task, slot: usize, address: String, - ) -> Result<(), ClusterSendError<::Task>> { + ) -> Result<(), ClusterSendError<

::Task>> { match self.remote_clusters.get(&cmd_task.get_cluster_name()) { Some(remote_cluster) => { - remote_cluster.send_remote_directly(cmd_task, slot, address.as_str()) + remote_cluster.send_remote_directly(cmd_task.into_task(), slot, address.as_str()) } None => { let resp = Resp::Error( @@ -584,6 +583,32 @@ impl Error for ClusterSendError { } } +impl ClusterSendError { + pub fn map_task(self: Self, f: F) -> ClusterSendError

+ where + P: CmdTask, + F: Fn(T) -> P, + { + match self { + Self::MissingKey => ClusterSendError::MissingKey, + Self::ClusterNotFound(cluster) => ClusterSendError::ClusterNotFound(cluster), + Self::SlotNotFound(task) => ClusterSendError::SlotNotFound(f(task)), + Self::Backend(err) => ClusterSendError::Backend(err), + Self::SlotNotCovered => ClusterSendError::SlotNotCovered, + Self::MigrationError => ClusterSendError::MigrationError, + Self::Moved { + task, + slot, + address, + } => ClusterSendError::Moved { + task: f(task), + slot, + address, + }, + } + } +} + fn gen_cluster_nodes_helper( name: &ClusterName, epoch: u64, diff --git a/src/proxy/manager.rs b/src/proxy/manager.rs index ddaa8554..a8175af1 100644 --- a/src/proxy/manager.rs +++ b/src/proxy/manager.rs @@ -1,6 +1,6 @@ use super::backend::{ - gen_migration_sender_factory, BackendError, CmdTaskSender, CmdTaskSenderFactory, - MigrationBackendSenderFactory, + gen_migration_sender_factory, gen_sender_factory, BackendError, BackendSenderFactory, CmdTask, + CmdTaskSender, CmdTaskSenderFactory, ConnFactory, IntoTask, MigrationBackendSenderFactory, }; use super::blocking::{ gen_basic_blocking_sender_factory, gen_blocking_sender_factory, BasicBlockingSenderFactory, @@ -22,7 +22,6 @@ use crate::migration::manager::{MigrationManager, MigrationMap, SwitchError}; use crate::migration::task::MgrSubCmd; use crate::migration::task::SwitchArg; use crate::protocol::{Array, BulkStr, RedisClientFactory, Resp, RespPacket, RespVec}; -use crate::proxy::backend::{CachedSenderFactory, CmdTask, ConnFactory}; use crate::replication::manager::ReplicatorManager; use crate::replication::replicator::ReplicatorMeta; use arc_swap::ArcSwap; @@ -32,7 +31,7 @@ use std::sync::{Arc, Mutex}; pub struct MetaMap where ::Task: ClusterTag, - S: CmdTaskSender::Task>, + ::Task: IntoTask<

::Task>, T: CmdTask + ClusterTag, { cluster_map: ClusterBackendMap, @@ -43,7 +42,7 @@ where impl MetaMap where ::Task: ClusterTag, - S: CmdTaskSender::Task>, + ::Task: IntoTask<

::Task>, T: CmdTask + ClusterTag, { pub fn empty() -> Self { @@ -70,8 +69,7 @@ type SenderFactory = BlockingBackendSenderFactory< BlockingTaskRetrySender, >; -type PeerSenderFactory = SenderFactory; -//type PeerSenderFactory = CachedSenderFactory>; +type PeerSenderFactory = BackendSenderFactory; type MigrationSenderFactory = MigrationBackendSenderFactory; pub type SharedMetaMap = Arc< @@ -117,7 +115,13 @@ impl> MetaManager ); let blocking_map = Arc::new(BlockingMap::new(basic_sender_factory, blocking_task_sender)); let sender_factory = gen_blocking_sender_factory(blocking_map.clone()); - let peer_sender_factory = gen_blocking_sender_factory(blocking_map.clone()); + let reply_commit_handler_factory = Arc::new(ReplyCommitHandlerFactory::default()); + let peer_sender_factory = gen_sender_factory( + config.clone(), + reply_commit_handler_factory, + conn_factory.clone(), + future_registry.clone(), + ); let migration_sender_factory = Arc::new(gen_migration_sender_factory( config.clone(), Arc::new(ReplyCommitHandlerFactory::default()), @@ -325,7 +329,7 @@ pub fn send_cmd_ctx>( } => { let res = meta_map .cluster_map - .send_remote_directly(task.into(), slot, address); + .send_remote_directly(task, slot, address); if let Err(e) = res { match e { ClusterSendError::MissingKey => (), From 436e11985bd602a7f2e90d0211b192bf546d5c49 Mon Sep 17 00:00:00 2001 From: doyoubi Date: Fri, 10 Apr 2020 19:52:07 +0800 Subject: [PATCH 4/5] Support active redirection in chaos test --- Makefile | 12 ++++++++++-- chaostest/config.py | 1 + chaostest/render_compose.py | 25 ++++++++++++++++++++++--- chaostest/test_stack_mem_broker.yml.j2 | 1 + src/proxy/cluster.rs | 19 ++++++++----------- 5 files changed, 42 insertions(+), 16 deletions(-) diff --git a/Makefile b/Makefile index 7cd904d3..70e7736a 100644 --- a/Makefile +++ b/Makefile @@ -77,11 +77,19 @@ docker-overmoon: docker-compose -f examples/docker-compose-overmoon.yml up start-func-test: - python chaostest/render_compose.py mem_broker + python chaostest/render_compose.py -t mem_broker + docker stack deploy --compose-file chaostest/chaos-docker-compose.yml chaos + +start-func-test-active: + python chaostest/render_compose.py -t mem_broker -a docker stack deploy --compose-file chaostest/chaos-docker-compose.yml chaos start-chaos: - python chaostest/render_compose.py mem_broker enable_failure + python chaostest/render_compose.py -t mem_broker -f + docker stack deploy --compose-file chaostest/chaos-docker-compose.yml chaos + +start-chaos-active: + python chaostest/render_compose.py -t mem_broker -f -a docker stack deploy --compose-file chaostest/chaos-docker-compose.yml chaos stop-chaos: diff --git a/chaostest/config.py b/chaostest/config.py index aec27366..14f47cda 100644 --- a/chaostest/config.py +++ b/chaostest/config.py @@ -17,6 +17,7 @@ 'broker_port': 7799, 'broker_address': 'broker:7799', 'etcd_port': 2379, + 'active_redirection': False, 'pumba_commands': { 'kill': "--random --interval 60s kill 're2:(server_proxy|coordinator|overmoon).*'", 'delay': "--random --interval 20s netem --duration 5s delay 're2:(server_proxy|coordinator|overmoon).*'", diff --git a/chaostest/render_compose.py b/chaostest/render_compose.py index c60ef7e7..e727d488 100644 --- a/chaostest/render_compose.py +++ b/chaostest/render_compose.py @@ -1,4 +1,5 @@ import sys +import argparse from jinja2 import Environment, FileSystemLoader @@ -18,12 +19,30 @@ def render_docker_compose(docker_compose_yml, is_mem_broker): if __name__ == '__main__': # Usage: - # python chaostest/render_compose.py [overmoon|mem_broker] [enable_failure] - is_mem_broker = len(sys.argv) >= 2 and sys.argv[1] == 'mem_broker' - enable_failure = len(sys.argv) >= 3 and sys.argv[2] == 'enable_failure' + # python chaostest/render_compose.py -t [overmoon|mem_broker] [-f] [-a] + parser = argparse.ArgumentParser(description='Render docker-compose file for chaos testing') + + parser.add_argument('-t', action='store', dest='test_type',default='mem_broker') + parser.add_argument('-f', action='store_true', dest="enable_failure_injection", default=False) + parser.add_argument('-a', action="store_true", dest="active_redirection", default=False) + + results = parser.parse_args() + + is_mem_broker = results.test_type == 'mem_broker' + enable_failure = results.enable_failure_injection + active_redirection = results.active_redirection + if not enable_failure: print("Disable fault injection") config.DOCKER_COMPOSE_CONFIG['pumba_commands'] = {} else: print("Enable fault injection") + + if active_redirection: + config.DOCKER_COMPOSE_CONFIG['active_redirection'] = True + print("Enable active redirection") + else: + config.DOCKER_COMPOSE_CONFIG['active_redirection'] = False + print("Disable active redirection") + render_docker_compose('chaostest/chaos-docker-compose.yml', is_mem_broker) diff --git a/chaostest/test_stack_mem_broker.yml.j2 b/chaostest/test_stack_mem_broker.yml.j2 index c83c5a06..39c73d1a 100644 --- a/chaostest/test_stack_mem_broker.yml.j2 +++ b/chaostest/test_stack_mem_broker.yml.j2 @@ -76,6 +76,7 @@ server_proxy{{ proxy_port }}: - UNDERMOON_SESSION_CHANNEL_SIZE=4096 - UNDERMOON_BACKEND_CHANNEL_SIZE=4096 - UNDERMOON_BACKEND_CONN_NUM=4 + - UNDERMOON_ACTIVE_REDIRECTION={{ active_redirection }} {% endfor %} {% endfilter %} diff --git a/src/proxy/cluster.rs b/src/proxy/cluster.rs index aed9750c..7b4efaf5 100644 --- a/src/proxy/cluster.rs +++ b/src/proxy/cluster.rs @@ -265,9 +265,6 @@ where } } -// We combine the nodes and slot_map to let them fit into -// the same lock with a smaller critical section -// compared to the one we need if splitting them. struct SenderMap { nodes: HashMap, slot_map: SlotMap, @@ -387,16 +384,16 @@ impl LocalCluster { } } -pub struct RemoteCluster { +pub struct RemoteCluster { name: ClusterName, epoch: u64, slot_map: SlotMap, slot_ranges: HashMap>, - remote_backend: Option>, + remote_backend: Option>, } -impl RemoteCluster { - pub fn from_slot_map>( +impl RemoteCluster

{ + pub fn from_slot_map>( sender_factory: &F, name: ClusterName, epoch: u64, @@ -429,8 +426,8 @@ impl RemoteCluster { pub fn send_remote( &self, - cmd_task: ::Task, - ) -> Result<(), ClusterSendError<::Task>> { + cmd_task:

::Task, + ) -> Result<(), ClusterSendError<

::Task>> { let key = match cmd_task.get_key() { Some(key) => key, None => { @@ -454,10 +451,10 @@ impl RemoteCluster { pub fn send_remote_directly( &self, - cmd_task: ::Task, + cmd_task:

::Task, slot: usize, address: &str, - ) -> Result<(), ClusterSendError<::Task>> { + ) -> Result<(), ClusterSendError<

::Task>> { if let Some(remote_backend) = self.remote_backend.as_ref() { match remote_backend.nodes.get(address) { Some(sender) => sender.send(cmd_task).map_err(ClusterSendError::Backend), From ff49c57b82f5b149759c2cea3f5e15edf36b61d2 Mon Sep 17 00:00:00 2001 From: doyoubi Date: Fri, 10 Apr 2020 21:05:19 +0800 Subject: [PATCH 5/5] Remove unused file --- src/proxy/redirection.rs | 1 - 1 file changed, 1 deletion(-) delete mode 100644 src/proxy/redirection.rs diff --git a/src/proxy/redirection.rs b/src/proxy/redirection.rs deleted file mode 100644 index 8b137891..00000000 --- a/src/proxy/redirection.rs +++ /dev/null @@ -1 +0,0 @@ -