diff --git a/Cargo.lock b/Cargo.lock index f0253c0d..eaded3aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -323,6 +323,18 @@ name = "arrayvec" version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "async-compression" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)", + "flate2 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-core 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", + "memchr 2.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "pin-project-lite 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "async-trait" version = "0.1.40" @@ -406,6 +418,11 @@ name = "base64" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "base64" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "bitflags" version = "1.2.1" @@ -1676,6 +1693,7 @@ name = "reqwest" version = "0.10.8" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ + "async-compression 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "base64 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)", "encoding_rs 0.8.24 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2101,6 +2119,7 @@ dependencies = [ "atoi 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "atomic-option 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "backtrace 0.3.50 (registry+https://github.com/rust-lang/crates.io-index)", + "base64 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)", "btoi 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)", "caseless 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2115,6 +2134,7 @@ dependencies = [ "derivative 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "either 1.6.1 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", + "flate2 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "futures-batch 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures-timer 3.0.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2410,6 +2430,7 @@ dependencies = [ "checksum arc-swap 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)" = "bc4662175ead9cd84451d5c35070517777949a2ed84551764129cedb88384841" "checksum arc-swap 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)" = "4d25d88fd6b8041580a654f9d0c581a047baee2b3efee13275f2fc392fc75034" "checksum arrayvec 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cff77d8686867eceff3105329d4698d96c2391c176d5d03adc90c7389162b5b8" +"checksum async-compression 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "9021768bcce77296b64648cc7a7460e3df99979b97ed5c925c38d1cc83778d98" "checksum async-trait 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "687c230d85c0a52504709705fc8a53e4a692b83a2184f03dae73e38e1e93a783" "checksum atoi 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e0afb7287b68575f5ca0e5c7e40191cbd4be59d325781f46faa603e176eaef47" "checksum atomic-option 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0db678acb667b525ac40a324fc5f7d3390e29239b31c7327bb8157f5b4fff593" @@ -2419,6 +2440,7 @@ dependencies = [ "checksum backtrace 0.3.50 (registry+https://github.com/rust-lang/crates.io-index)" = "46254cf2fdcdf1badb5934448c1bcbe046a56537b3987d96c51a7afc5d03f293" "checksum base64 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7" "checksum base64 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3441f0f7b02788e948e47f457ca01f1d7e6d92c693bc132c22b087d3141c03ff" +"checksum base64 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)" = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" "checksum bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" "checksum brotli-sys 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4445dea95f4c2b41cde57cc9fee236ae4dbae88d8fcbdb4750fc1bb5d86aaecd" "checksum brotli2 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0cb036c3eade309815c15ddbacec5b22c4d1f3983a774ab2eac2e3e9ea85568e" diff --git a/Cargo.toml b/Cargo.toml index c6c9a4e4..67a1d9a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ crc16 = "0.4.0" crc64 = "1.0.0" caseless = "0.2.1" arc-swap = "0.3.11" -reqwest = { version = "0.10.1", features = ["json"] } +reqwest = { version = "0.10.1", features = ["json", "gzip"] } serde = "1.0" serde_derive = "1.0.88" serde_json = "1.0" @@ -58,6 +58,8 @@ backtrace = "0.3" jemallocator = "0.3.0" async-trait = "0.1" derivative = "2.1.1" +flate2 = "1" +base64 = "0.13.0" [profile.release] debug = true diff --git a/conf/coordinator.toml b/conf/coordinator.toml index ac2fa3a5..32fe45bd 100644 --- a/conf/coordinator.toml +++ b/conf/coordinator.toml @@ -3,3 +3,5 @@ address="127.0.0.1:6699" broker_address = "127.0.0.1:7799" reporter_id = "127.0.0.1:6699" thread_number = 2 +# Set this to true for large cluster +enable_compression = false diff --git a/src/bin/coordinator.rs b/src/bin/coordinator.rs index 8c20941d..90c6b426 100644 --- a/src/bin/coordinator.rs +++ b/src/bin/coordinator.rs @@ -58,12 +58,17 @@ fn gen_conf() -> CoordinatorConfig { let proxy_timeout = s.get::("proxy_timeout").unwrap_or_else(|_| 2); + let enable_compression = s + .get::("enable_compression") + .unwrap_or_else(|_| false); + CoordinatorConfig { address, broker_addresses: Arc::new(ArcSwap::new(Arc::new(broker_address_list))), reporter_id, thread_number, proxy_timeout, + enable_compression, } } @@ -74,6 +79,7 @@ fn gen_service( let data_broker = Arc::new(HttpMetaBroker::new( config.broker_addresses.clone(), http_client.clone(), + config.enable_compression, )); let mani_broker = Arc::new(HttpMetaManipulationBroker::new( config.broker_addresses.clone(), diff --git a/src/bin/mem_broker.rs b/src/bin/mem_broker.rs index 639bcc69..df1fb702 100644 --- a/src/bin/mem_broker.rs +++ b/src/bin/mem_broker.rs @@ -189,6 +189,7 @@ async fn main() -> std::io::Result<()> { let service = service.clone(); App::new() .wrap(middleware::Logger::default()) + .wrap(middleware::Compress::default()) .configure(|cfg| configure_app(cfg, service.clone())) }) .bind(&address)? diff --git a/src/common/config.rs b/src/common/config.rs index 58072714..f065560f 100644 --- a/src/common/config.rs +++ b/src/common/config.rs @@ -4,7 +4,7 @@ use std::collections::HashMap; use std::str::FromStr; use std::sync::atomic::{AtomicU64, Ordering}; -#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] pub struct ClusterConfig { #[serde(default)] pub compression_strategy: CompressionStrategy, @@ -74,7 +74,7 @@ impl ClusterConfig { } } -#[derive(Debug, PartialEq, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum CompressionStrategy { Disabled = 0, // Only allow SET, SETEX, PSETEX, SETNX, GET, GETSET , MGET, MSET, MSETNX commands for String data type @@ -137,7 +137,7 @@ impl<'de> Deserialize<'de> for CompressionStrategy { } } -#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] pub struct MigrationConfig { pub max_migration_time: u64, pub max_blocking_time: u64, diff --git a/src/common/proto.rs b/src/common/proto.rs index 1efb5c61..73f70e02 100644 --- a/src/common/proto.rs +++ b/src/common/proto.rs @@ -4,8 +4,12 @@ use crate::common::cluster::ClusterName; use crate::common::config::ClusterConfig; use crate::common::utils::extract_host_from_address; use crate::protocol::{Array, BulkStr, Resp}; +use flate2::Compression; use std::collections::HashMap; use std::convert::TryFrom; +use std::io; +use std::io::Read; +use std::io::Write; use std::iter::Peekable; use std::str; @@ -30,23 +34,89 @@ macro_rules! try_get { #[derive(Debug, Clone, PartialEq)] pub struct ClusterMapFlags { pub force: bool, + pub compress: bool, } impl ClusterMapFlags { pub fn to_arg(&self) -> String { + let mut flags = Vec::new(); if self.force { - "FORCE".to_string() - } else { + flags.push("FORCE"); + } + if self.compress { + flags.push("COMPRESS"); + } + + if flags.is_empty() { "NOFLAG".to_string() + } else { + flags.join(",") } } pub fn from_arg(flags_str: &str) -> Self { let force = has_flags(flags_str, ',', "FORCE"); - ClusterMapFlags { force } + let compress = has_flags(flags_str, ',', "COMPRESS"); + ClusterMapFlags { force, compress } } } +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] +pub struct ProxyClusterMetaData { + local: ProxyClusterMap, + peer: ProxyClusterMap, + clusters_config: ClusterConfigMap, +} + +impl ProxyClusterMetaData { + pub fn new( + local: ProxyClusterMap, + peer: ProxyClusterMap, + clusters_config: ClusterConfigMap, + ) -> Self { + Self { + local, + peer, + clusters_config, + } + } + + pub fn gen_compressed_data(&self) -> Result { + let s = serde_json::to_string(&self).map_err(|err| { + error!("failed to encode json for meta: {:?}", err); + MetaCompressError::Json + })?; + let mut encoder = flate2::write::GzEncoder::new(Vec::new(), Compression::fast()); + encoder + .write_all(s.as_bytes()) + .map_err(MetaCompressError::Io)?; + let buf = encoder.finish().map_err(MetaCompressError::Io)?; + Ok(base64::encode(&buf)) + } + + pub fn from_compressed_data(data: String) -> Result { + let raw = base64::decode(data).map_err(|err| { + error!("failed to decode base64 for meta: {:?}", err); + MetaCompressError::Base64 + })?; + let r = io::Cursor::new(raw); + let mut gz = flate2::read::GzDecoder::new(r); + let mut s = String::new(); + gz.read_to_string(&mut s).map_err(MetaCompressError::Io)?; + serde_json::from_str(s.as_str()).map_err(|err| { + error!("failed to decode json for meta: {:?}", err); + MetaCompressError::Json + }) + } +} + +#[derive(Debug)] +pub enum MetaCompressError { + Io(io::Error), + Json, + Base64, +} + const PEER_PREFIX: &str = "PEER"; const CONFIG_PREFIX: &str = "CONFIG"; @@ -95,6 +165,14 @@ impl ProxyClusterMeta { &self.clusters_config } + pub fn gen_data(&self) -> ProxyClusterMetaData { + ProxyClusterMetaData { + local: self.local.clone(), + peer: self.peer.clone(), + clusters_config: self.clusters_config.clone(), + } + } + pub fn from_resp>( resp: &Resp, ) -> Result<(Self, Result<(), ParseExtendedMetaError>), CmdParseError> { @@ -127,6 +205,36 @@ impl ProxyClusterMeta { let flags = ClusterMapFlags::from_arg(&try_get!(it.next())); + if flags.compress { + let compressed_data = it.next().ok_or_else(|| { + error!("failed to get compressed data for UMCTL SETCLUSTER"); + CmdParseError {} + })?; + let data = + ProxyClusterMetaData::from_compressed_data(compressed_data).map_err(|err| { + error!( + "failed to parse compressed data for UMCTL SETCLUSTER: {:?}", + err + ); + CmdParseError {} + })?; + let ProxyClusterMetaData { + local, + peer, + clusters_config, + } = data; + return Ok(( + Self { + epoch, + flags, + local, + peer, + clusters_config, + }, + Ok(()), + )); + } + let local = ProxyClusterMap::parse(it)?; let mut peer = ProxyClusterMap::new(HashMap::new()); let mut clusters_config = ClusterConfigMap::default(); @@ -178,9 +286,20 @@ impl ProxyClusterMeta { } args } + + pub fn to_compressed_args(&self) -> Result, MetaCompressError> { + let data = ProxyClusterMetaData::new( + self.local.clone(), + self.peer.clone(), + self.clusters_config.clone(), + ) + .gen_compressed_data()?; + let args = vec![self.epoch.to_string(), self.flags.to_arg(), data]; + Ok(args) + } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] pub struct ProxyClusterMap { cluster_map: HashMap>>, } @@ -280,7 +399,7 @@ impl ProxyClusterMap { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] pub struct ClusterConfigMap { config_map: HashMap, } @@ -802,6 +921,11 @@ mod tests { args.sort(); cluster_args.sort(); assert_eq!(args, cluster_args); + + let metadata = cluster_meta.gen_data(); + let d = metadata.gen_compressed_data().unwrap(); + let metadata2 = ProxyClusterMetaData::from_compressed_data(d).unwrap(); + assert_eq!(metadata, metadata2); } #[test] @@ -835,6 +959,11 @@ mod tests { .compression_strategy, CompressionStrategy::SetGetOnly ); + + let metadata = cluster_meta.gen_data(); + let d = metadata.gen_compressed_data().unwrap(); + let metadata2 = ProxyClusterMetaData::from_compressed_data(d).unwrap(); + assert_eq!(metadata, metadata2); } #[test] @@ -866,6 +995,11 @@ mod tests { assert!(extended_res.is_err()); assert_eq!(cluster_meta.epoch, 233); assert!(cluster_meta.flags.force); + + let metadata = cluster_meta.gen_data(); + let d = metadata.gen_compressed_data().unwrap(); + let metadata2 = ProxyClusterMetaData::from_compressed_data(d).unwrap(); + assert_eq!(metadata, metadata2); } #[test] @@ -897,6 +1031,11 @@ mod tests { assert!(extended_res.is_err()); assert_eq!(cluster_meta.epoch, 233); assert!(cluster_meta.flags.force); + + let metadata = cluster_meta.gen_data(); + let d = metadata.gen_compressed_data().unwrap(); + let metadata2 = ProxyClusterMetaData::from_compressed_data(d).unwrap(); + assert_eq!(metadata, metadata2); } #[test] diff --git a/src/coordinator/core.rs b/src/coordinator/core.rs index f756bc53..9958e583 100644 --- a/src/coordinator/core.rs +++ b/src/coordinator/core.rs @@ -563,6 +563,7 @@ pub enum CoordinateError { InvalidReply, InvalidAddress, InvalidConfig, + CompressionError, } impl fmt::Display for CoordinateError { diff --git a/src/coordinator/http_meta_broker.rs b/src/coordinator/http_meta_broker.rs index d90992f4..e63e42e3 100644 --- a/src/coordinator/http_meta_broker.rs +++ b/src/coordinator/http_meta_broker.rs @@ -14,14 +14,20 @@ pub struct HttpMetaBroker { broker_addresses: BrokerAddresses, broker_index: AtomicUsize, client: reqwest::Client, + enable_compression: bool, } impl HttpMetaBroker { - pub fn new(broker_addresses: BrokerAddresses, client: reqwest::Client) -> Self { + pub fn new( + broker_addresses: BrokerAddresses, + client: reqwest::Client, + enable_compression: bool, + ) -> Self { HttpMetaBroker { broker_addresses, broker_index: AtomicUsize::new(0), client, + enable_compression, } } } @@ -63,10 +69,18 @@ impl HttpMetaBroker { let url = self .gen_url(&format!("/clusters/meta/{}", name)) .ok_or_else(|| MetaDataBrokerError::NoBroker)?; - let response = self.client.get(&url).send().await.map_err(|e| { - error!("failed to get cluster {:?}", e); - MetaDataBrokerError::RequestFailed - })?; + let encoding = gen_accept_encoding(self.enable_compression); + + let response = self + .client + .get(&url) + .header(reqwest::header::ACCEPT_ENCODING, encoding) + .send() + .await + .map_err(|e| { + error!("failed to get cluster {:?}", e); + MetaDataBrokerError::RequestFailed + })?; let ClusterPayload { cluster } = response.json().await.map_err(|e| { error!("failed to get cluster from json {:?}", e); MetaDataBrokerError::InvalidReply @@ -98,10 +112,18 @@ impl HttpMetaBroker { let url = self .gen_url(&format!("/proxies/meta/{}", address)) .ok_or_else(|| MetaDataBrokerError::NoBroker)?; - let response = self.client.get(&url).send().await.map_err(|e| { - error!("failed to get proxy {:?}", e); - MetaDataBrokerError::RequestFailed - })?; + let encoding = gen_accept_encoding(self.enable_compression); + + let response = self + .client + .get(&url) + .header(reqwest::header::ACCEPT_ENCODING, encoding) + .send() + .await + .map_err(|e| { + error!("failed to get proxy {:?}", e); + MetaDataBrokerError::RequestFailed + })?; let ProxyPayload { proxy } = response.json().await.map_err(move |e| { error!("failed to get proxy {} from json {:?}", address, e); MetaDataBrokerError::InvalidReply @@ -249,6 +271,14 @@ impl MetaDataBroker for HttpMetaBroker { } } +fn gen_accept_encoding(enable_compression: bool) -> &'static str { + if enable_compression { + "gzip" + } else { + "identity" + } +} + #[derive(Deserialize, Serialize)] pub struct ClusterNamesPayload { pub names: Vec, diff --git a/src/coordinator/migration.rs b/src/coordinator/migration.rs index f1965ade..5cfaee7c 100644 --- a/src/coordinator/migration.rs +++ b/src/coordinator/migration.rs @@ -150,7 +150,7 @@ mod tests { ) } - fn create_client_func() -> impl RedisClient { + fn create_client_func(_enable_compression: bool) -> impl RedisClient { let mut mock_client = MockRedisClient::new(); let info_mgr_cmd = vec![b"UMCTL".to_vec(), b"INFOMGR".to_vec()]; @@ -171,7 +171,7 @@ mod tests { #[tokio::test] async fn test_migration_state_checker() { - let factory = DummyRedisClientFactory::new(create_client_func); + let factory = DummyRedisClientFactory::new(create_client_func, false); let checker = MigrationStateRespChecker::new(Arc::new(factory)); let res: Vec<_> = checker.check("127.0.0.1:6000".to_string()).collect().await; assert_eq!(res.len(), 1); @@ -232,7 +232,7 @@ mod tests { // Integrate together. #[tokio::test] async fn test_migration_state_sync() { - let factory = Arc::new(DummyRedisClientFactory::new(create_client_func)); + let factory = Arc::new(DummyRedisClientFactory::new(create_client_func, false)); let checker = MigrationStateRespChecker::new(factory); let mut mock_mani_broker = MockMetaManipulationBroker::new(); diff --git a/src/coordinator/service.rs b/src/coordinator/service.rs index 26b394ce..ce33d2a1 100644 --- a/src/coordinator/service.rs +++ b/src/coordinator/service.rs @@ -31,6 +31,7 @@ pub struct CoordinatorConfig { pub reporter_id: String, pub thread_number: usize, pub proxy_timeout: usize, + pub enable_compression: bool, } impl CoordinatorConfig { @@ -106,10 +107,11 @@ impl, client_factory: Arc, + enable_compression: bool, ) -> impl ProxyMetaSynchronizer { let proxy_retriever = BrokerOrderedProxiesRetriever::new(data_broker.clone()); let meta_retriever = BrokerMetaRetriever::new(data_broker); - let sender = ProxyMetaRespSender::new(client_factory); + let sender = ProxyMetaRespSender::new(client_factory, enable_compression); ProxyMetaRespSynchronizer::new(proxy_retriever, meta_retriever, sender) } @@ -123,12 +125,13 @@ impl, mani_broker: Arc, client_factory: Arc, + enable_compression: bool, ) -> impl MigrationStateSynchronizer { let proxy_retriever = BrokerProxiesRetriever::new(data_broker.clone()); let checker = MigrationStateRespChecker::new(client_factory.clone()); let committer = BrokerMigrationCommitter::new(mani_broker); let meta_retriever = BrokerMetaRetriever::new(data_broker); - let sender = ProxyMetaRespSender::new(client_factory); + let sender = ProxyMetaRespSender::new(client_factory, enable_compression); ParMigrationStateSynchronizer::new( proxy_retriever, checker, @@ -165,8 +168,11 @@ impl { client_factory: Arc, + enable_compression: bool, } impl ProxyMetaRespSender { - pub fn new(client_factory: Arc) -> Self { - Self { client_factory } + pub fn new(client_factory: Arc, enable_compression: bool) -> Self { + Self { + client_factory, + enable_compression, + } } } @@ -28,18 +34,27 @@ impl ProxyMetaRespSender { .await .map_err(CoordinateError::Redis)?; let proxy_with_only_masters = filter_proxy_masters(proxy.clone()); + let repl_flags = ClusterMapFlags { + force: false, + compress: false, + }; send_meta( &mut client, "SETREPL".to_string(), - generate_repl_meta_cmd_args(proxy, ClusterMapFlags { force: false }), - ) - .await?; - send_meta( - &mut client, - "SETCLUSTER".to_string(), - generate_proxy_meta_cmd_args(ClusterMapFlags { force: false }, proxy_with_only_masters), + generate_repl_meta_cmd_args(proxy, repl_flags), ) .await?; + + let flags = ClusterMapFlags { + force: false, + compress: self.enable_compression, + }; + let meta_cmd_args = + generate_proxy_meta_cmd_args(flags, proxy_with_only_masters).map_err(|err| { + error!("FATAL_ERROR: failed to generate {:?}", err); + CoordinateError::CompressionError + })?; + send_meta(&mut client, "SETCLUSTER".to_string(), meta_cmd_args).await?; Ok(()) } } @@ -91,7 +106,10 @@ impl ProxyMetaRetriever for BrokerMetaRetriever { } } -fn generate_proxy_meta_cmd_args(flags: ClusterMapFlags, proxy: Proxy) -> Vec { +fn generate_proxy_meta_cmd_args( + flags: ClusterMapFlags, + proxy: Proxy, +) -> Result, MetaCompressError> { let epoch = proxy.get_epoch(); let clusters_config = ClusterConfigMap::new(proxy.get_clusters_config().clone()); @@ -115,8 +133,13 @@ fn generate_proxy_meta_cmd_args(flags: ClusterMapFlags, proxy: Proxy) -> Vec Vec { + let epoch = 7799; + let flags = ClusterMapFlags::from_arg("COMPRESS"); + + let mut local_map = HashMap::new(); + let mut node_map = HashMap::new(); + let slots = vec!["1".to_string(), "233-666".to_string()]; + node_map.insert( + "127.0.0.1:7001".to_string(), + vec![SlotRange::from_strings(&mut slots.into_iter().peekable()).unwrap()], + ); + local_map.insert(ClusterName::try_from("mycluster").unwrap(), node_map); + let local = ProxyClusterMap::new(local_map); + + let peer = ProxyClusterMap::new(HashMap::new()); + let mut cluster_map = HashMap::new(); + cluster_map.insert( + ClusterName::try_from("mycluster").unwrap(), + ClusterConfig::default(), + ); + let clusters_config = ClusterConfigMap::new(cluster_map); + + let meta = ProxyClusterMeta::new(epoch, flags, local, peer, clusters_config); + meta.to_compressed_args().unwrap() + } + #[test] fn test_master_generate_repl_meta_cmd_args() { let proxy = gen_testing_proxy(Role::Master); - let args = generate_repl_meta_cmd_args(proxy, ClusterMapFlags { force: false }); + let args = generate_repl_meta_cmd_args( + proxy, + ClusterMapFlags { + force: false, + compress: false, + }, + ); assert_eq!(args, gen_master_args()) } #[test] fn test_replica_generate_repl_meta_cmd_args() { let proxy = gen_testing_proxy(Role::Replica); - let args = generate_repl_meta_cmd_args(proxy, ClusterMapFlags { force: true }); + let args = generate_repl_meta_cmd_args( + proxy, + ClusterMapFlags { + force: true, + compress: false, + }, + ); assert_eq!(args, gen_replica_args()) } @@ -382,7 +443,7 @@ mod tests { assert!(res.is_err()); } - fn create_client_func() -> impl RedisClient { + fn create_client_func(enable_compression: bool) -> impl RedisClient { let call_times = Arc::new(AtomicUsize::new(0)); let mut mock_client = MockRedisClient::new(); @@ -394,14 +455,22 @@ mod tests { .map(|s| s.into_bytes()) .collect(), ); + let mut set_cluster_cmd = vec![b"UMCTL".to_vec(), b"SETCLUSTER".to_vec()]; - set_cluster_cmd.append( - &mut gen_set_cluster_args() + if enable_compression { + let mut args = gen_set_cluster_compress_args() .into_iter() .map(|s| s.into_bytes()) - .collect(), - ); - set_cluster_cmd.push(b"CONFIG".to_vec()); + .collect(); + set_cluster_cmd.append(&mut args); + } else { + let mut args = gen_set_cluster_args() + .into_iter() + .map(|s| s.into_bytes()) + .collect(); + set_cluster_cmd.append(&mut args); + set_cluster_cmd.push(b"CONFIG".to_vec()); + } mock_client .expect_execute_single() @@ -409,6 +478,8 @@ mod tests { if call_times.load(Ordering::SeqCst) == 0 { call_times.fetch_add(1, Ordering::SeqCst); command.eq(&set_repl_cmd) + } else if enable_compression { + command.eq(&set_cluster_cmd) } else { // Ignore the config part let cmd = command.get(0..set_cluster_cmd.len()).unwrap().to_vec(); @@ -423,8 +494,13 @@ mod tests { #[tokio::test] async fn test_meta_resp_sender() { - let client_factory = DummyRedisClientFactory::new(create_client_func); - let sender = ProxyMetaRespSender::new(Arc::new(client_factory)); + test_meta_resp_sender_helper(false).await; + test_meta_resp_sender_helper(true).await; + } + + async fn test_meta_resp_sender_helper(enable_compression: bool) { + let client_factory = DummyRedisClientFactory::new(create_client_func, enable_compression); + let sender = ProxyMetaRespSender::new(Arc::new(client_factory), enable_compression); let proxy = gen_testing_proxy(Role::Master); let res = sender.send_meta(proxy).await; assert!(res.is_ok()); @@ -455,6 +531,11 @@ mod tests { // Integrate together #[tokio::test] async fn test_meta_sync() { + test_meta_sync_helper(false).await; + test_meta_sync_helper(true).await; + } + + async fn test_meta_sync_helper(enable_compression: bool) { let mut mock_broker = MockMetaDataBroker::new(); let proxy_addr = gen_testing_proxy(Role::Master).get_address().to_string(); let proxy_addr2 = proxy_addr.clone(); @@ -490,8 +571,8 @@ mod tests { let proxies_retriever = BrokerProxiesRetriever::new(mock_broker.clone()); let meta_retriever = BrokerMetaRetriever::new(mock_broker); - let client_factory = DummyRedisClientFactory::new(create_client_func); - let sender = ProxyMetaRespSender::new(Arc::new(client_factory)); + let client_factory = DummyRedisClientFactory::new(create_client_func, enable_compression); + let sender = ProxyMetaRespSender::new(Arc::new(client_factory), enable_compression); let sync = ProxyMetaRespSynchronizer::new(proxies_retriever, meta_retriever, sender); let results: Vec<_> = sync.run().collect().await; diff --git a/src/protocol/client.rs b/src/protocol/client.rs index fb402c5e..53e43edd 100644 --- a/src/protocol/client.rs +++ b/src/protocol/client.rs @@ -139,25 +139,29 @@ fn process_multi_cmd_result( pub struct DummyRedisClientFactory where C: RedisClient + Sync + 'static, - F: Fn() -> C + ThreadSafe, + F: Fn(bool) -> C + ThreadSafe, { create_func: F, + enable_compression: bool, } impl DummyRedisClientFactory where C: RedisClient + Sync + 'static, - F: Fn() -> C + ThreadSafe, + F: Fn(bool) -> C + ThreadSafe, { - pub fn new(create_func: F) -> Self { - Self { create_func } + pub fn new(create_func: F, enable_compression: bool) -> Self { + Self { + create_func, + enable_compression, + } } } impl RedisClientFactory for DummyRedisClientFactory where C: RedisClient + Sync + 'static, - F: Fn() -> C + ThreadSafe, + F: Fn(bool) -> C + ThreadSafe, { type Client = C; @@ -165,7 +169,7 @@ where &'s self, _address: String, ) -> Pin> + Send + 's>> { - let client = (self.create_func)(); + let client = (self.create_func)(self.enable_compression); Box::pin(async move { Ok(client) }) } } diff --git a/src/replication/replicator.rs b/src/replication/replicator.rs index 29f4549b..34810755 100644 --- a/src/replication/replicator.rs +++ b/src/replication/replicator.rs @@ -209,7 +209,13 @@ mod tests { assert!(r.is_ok()); let meta = r.unwrap(); assert_eq!(meta.epoch, 233); - assert_eq!(meta.flags, ClusterMapFlags { force: true }); + assert_eq!( + meta.flags, + ClusterMapFlags { + force: true, + compress: false + } + ); assert_eq!(meta.masters.len(), 1); assert_eq!(meta.replicas.len(), 0); @@ -231,7 +237,13 @@ mod tests { assert!(r.is_ok()); let meta = r.unwrap(); assert_eq!(meta.epoch, 233); - assert_eq!(meta.flags, ClusterMapFlags { force: false }); + assert_eq!( + meta.flags, + ClusterMapFlags { + force: false, + compress: false + } + ); assert_eq!(meta.masters.len(), 1); assert_eq!(meta.replicas.len(), 1); diff --git a/tests/proxy_manager_test.rs b/tests/proxy_manager_test.rs index de40b10c..a3f2a61f 100644 --- a/tests/proxy_manager_test.rs +++ b/tests/proxy_manager_test.rs @@ -113,7 +113,10 @@ mod tests { ReplicatorMeta { epoch: 233, - flags: ClusterMapFlags { force: false }, + flags: ClusterMapFlags { + force: false, + compress: false, + }, masters: vec![MasterMeta { cluster_name: cluster_name.clone(), master_node_address: "127.0.0.1:6379".to_string(), @@ -132,7 +135,10 @@ mod tests { ReplicatorMeta { epoch: 233, - flags: ClusterMapFlags { force: false }, + flags: ClusterMapFlags { + force: false, + compress: false, + }, masters: vec![MasterMeta { cluster_name: cluster_name.clone(), master_node_address: "127.0.0.1:6379".to_string(), @@ -147,7 +153,10 @@ mod tests { ReplicatorMeta { epoch: 233, - flags: ClusterMapFlags { force: false }, + flags: ClusterMapFlags { + force: false, + compress: false, + }, masters: vec![], replicas: vec![ReplicaMeta { cluster_name: cluster_name.clone(),