From 3b77d991a84060dc1e056e9fcd0ec7c71b1d41dc Mon Sep 17 00:00:00 2001 From: doyoubi Date: Sun, 6 Dec 2020 21:48:07 +0800 Subject: [PATCH] Support compression from coordinator to proxy --- Cargo.lock | 8 +++ Cargo.toml | 2 + src/common/config.rs | 6 +- src/common/proto.rs | 119 ++++++++++++++++++++++++++++-- src/coordinator/core.rs | 1 + src/coordinator/migration.rs | 6 +- src/coordinator/service.rs | 14 ++-- src/coordinator/sync.rs | 131 +++++++++++++++++++++++++++------- src/protocol/client.rs | 16 +++-- src/replication/replicator.rs | 16 ++++- tests/proxy_manager_test.rs | 15 +++- 11 files changed, 283 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bde32ad8..eaded3aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -418,6 +418,11 @@ name = "base64" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "base64" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "bitflags" version = "1.2.1" @@ -2114,6 +2119,7 @@ dependencies = [ "atoi 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "atomic-option 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "backtrace 0.3.50 (registry+https://github.com/rust-lang/crates.io-index)", + "base64 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)", "btoi 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)", "caseless 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2128,6 +2134,7 @@ dependencies = [ "derivative 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "either 1.6.1 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", + "flate2 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "futures-batch 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures-timer 3.0.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2433,6 +2440,7 @@ dependencies = [ "checksum backtrace 0.3.50 (registry+https://github.com/rust-lang/crates.io-index)" = "46254cf2fdcdf1badb5934448c1bcbe046a56537b3987d96c51a7afc5d03f293" "checksum base64 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7" "checksum base64 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3441f0f7b02788e948e47f457ca01f1d7e6d92c693bc132c22b087d3141c03ff" +"checksum base64 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)" = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" "checksum bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" "checksum brotli-sys 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4445dea95f4c2b41cde57cc9fee236ae4dbae88d8fcbdb4750fc1bb5d86aaecd" "checksum brotli2 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0cb036c3eade309815c15ddbacec5b22c4d1f3983a774ab2eac2e3e9ea85568e" diff --git a/Cargo.toml b/Cargo.toml index 324f1748..67a1d9a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,6 +58,8 @@ backtrace = "0.3" jemallocator = "0.3.0" async-trait = "0.1" derivative = "2.1.1" +flate2 = "1" +base64 = "0.13.0" [profile.release] debug = true diff --git a/src/common/config.rs b/src/common/config.rs index 58072714..f065560f 100644 --- a/src/common/config.rs +++ b/src/common/config.rs @@ -4,7 +4,7 @@ use std::collections::HashMap; use std::str::FromStr; use std::sync::atomic::{AtomicU64, Ordering}; -#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] pub struct ClusterConfig { #[serde(default)] pub compression_strategy: CompressionStrategy, @@ -74,7 +74,7 @@ impl ClusterConfig { } } -#[derive(Debug, PartialEq, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum CompressionStrategy { Disabled = 0, // Only allow SET, SETEX, PSETEX, SETNX, GET, GETSET , MGET, MSET, MSETNX commands for String data type @@ -137,7 +137,7 @@ impl<'de> Deserialize<'de> for CompressionStrategy { } } -#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] pub struct MigrationConfig { pub max_migration_time: u64, pub max_blocking_time: u64, diff --git a/src/common/proto.rs b/src/common/proto.rs index 1efb5c61..0fe0ec6a 100644 --- a/src/common/proto.rs +++ b/src/common/proto.rs @@ -4,8 +4,12 @@ use crate::common::cluster::ClusterName; use crate::common::config::ClusterConfig; use crate::common::utils::extract_host_from_address; use crate::protocol::{Array, BulkStr, Resp}; +use flate2::Compression; use std::collections::HashMap; use std::convert::TryFrom; +use std::io; +use std::io::Read; +use std::io::Write; use std::iter::Peekable; use std::str; @@ -30,23 +34,89 @@ macro_rules! try_get { #[derive(Debug, Clone, PartialEq)] pub struct ClusterMapFlags { pub force: bool, + pub compress: bool, } impl ClusterMapFlags { pub fn to_arg(&self) -> String { + let mut flags = Vec::new(); if self.force { - "FORCE".to_string() - } else { + flags.push("FORCE"); + } + if self.compress { + flags.push("COMPRESS"); + } + + if flags.is_empty() { "NOFLAG".to_string() + } else { + flags.join(",") } } pub fn from_arg(flags_str: &str) -> Self { let force = has_flags(flags_str, ',', "FORCE"); - ClusterMapFlags { force } + let compress = has_flags(flags_str, ',', "COMPRESS"); + ClusterMapFlags { force, compress } } } +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] +pub struct ProxyClusterMetaData { + local: ProxyClusterMap, + peer: ProxyClusterMap, + clusters_config: ClusterConfigMap, +} + +impl ProxyClusterMetaData { + pub fn new( + local: ProxyClusterMap, + peer: ProxyClusterMap, + clusters_config: ClusterConfigMap, + ) -> Self { + Self { + local, + peer, + clusters_config, + } + } + + pub fn gen_compressed_data(&self) -> Result { + let s = serde_json::to_string(&self).map_err(|err| { + error!("failed to encode json for meta: {:?}", err); + MetaCompressError::Json + })?; + let mut encoder = flate2::write::GzEncoder::new(Vec::new(), Compression::fast()); + encoder + .write_all(s.as_bytes()) + .map_err(MetaCompressError::Io)?; + let buf = encoder.finish().map_err(MetaCompressError::Io)?; + Ok(base64::encode(&buf)) + } + + pub fn from_compressed_data(data: String) -> Result { + let raw = base64::decode(data).map_err(|err| { + error!("failed to decode base64 for meta: {:?}", err); + MetaCompressError::Base64 + })?; + let r = io::Cursor::new(raw); + let mut gz = flate2::read::GzDecoder::new(r); + let mut s = String::new(); + gz.read_to_string(&mut s).map_err(MetaCompressError::Io)?; + serde_json::from_str(s.as_str()).map_err(|err| { + error!("failed to decode json for meta: {:?}", err); + MetaCompressError::Json + }) + } +} + +#[derive(Debug)] +pub enum MetaCompressError { + Io(io::Error), + Json, + Base64, +} + const PEER_PREFIX: &str = "PEER"; const CONFIG_PREFIX: &str = "CONFIG"; @@ -95,6 +165,14 @@ impl ProxyClusterMeta { &self.clusters_config } + pub fn gen_data(&self) -> ProxyClusterMetaData { + ProxyClusterMetaData { + local: self.local.clone(), + peer: self.peer.clone(), + clusters_config: self.clusters_config.clone(), + } + } + pub fn from_resp>( resp: &Resp, ) -> Result<(Self, Result<(), ParseExtendedMetaError>), CmdParseError> { @@ -178,9 +256,20 @@ impl ProxyClusterMeta { } args } + + pub fn to_compressed_args(&self) -> Result, MetaCompressError> { + let data = ProxyClusterMetaData::new( + self.local.clone(), + self.peer.clone(), + self.clusters_config.clone(), + ) + .gen_compressed_data()?; + let args = vec![self.epoch.to_string(), self.flags.to_arg(), data]; + Ok(args) + } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] pub struct ProxyClusterMap { cluster_map: HashMap>>, } @@ -280,7 +369,7 @@ impl ProxyClusterMap { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] pub struct ClusterConfigMap { config_map: HashMap, } @@ -802,6 +891,11 @@ mod tests { args.sort(); cluster_args.sort(); assert_eq!(args, cluster_args); + + let metadata = cluster_meta.gen_data(); + let d = metadata.gen_compressed_data().unwrap(); + let metadata2 = ProxyClusterMetaData::from_compressed_data(d).unwrap(); + assert_eq!(metadata, metadata2); } #[test] @@ -835,6 +929,11 @@ mod tests { .compression_strategy, CompressionStrategy::SetGetOnly ); + + let metadata = cluster_meta.gen_data(); + let d = metadata.gen_compressed_data().unwrap(); + let metadata2 = ProxyClusterMetaData::from_compressed_data(d).unwrap(); + assert_eq!(metadata, metadata2); } #[test] @@ -866,6 +965,11 @@ mod tests { assert!(extended_res.is_err()); assert_eq!(cluster_meta.epoch, 233); assert!(cluster_meta.flags.force); + + let metadata = cluster_meta.gen_data(); + let d = metadata.gen_compressed_data().unwrap(); + let metadata2 = ProxyClusterMetaData::from_compressed_data(d).unwrap(); + assert_eq!(metadata, metadata2); } #[test] @@ -897,6 +1001,11 @@ mod tests { assert!(extended_res.is_err()); assert_eq!(cluster_meta.epoch, 233); assert!(cluster_meta.flags.force); + + let metadata = cluster_meta.gen_data(); + let d = metadata.gen_compressed_data().unwrap(); + let metadata2 = ProxyClusterMetaData::from_compressed_data(d).unwrap(); + assert_eq!(metadata, metadata2); } #[test] diff --git a/src/coordinator/core.rs b/src/coordinator/core.rs index f756bc53..9958e583 100644 --- a/src/coordinator/core.rs +++ b/src/coordinator/core.rs @@ -563,6 +563,7 @@ pub enum CoordinateError { InvalidReply, InvalidAddress, InvalidConfig, + CompressionError, } impl fmt::Display for CoordinateError { diff --git a/src/coordinator/migration.rs b/src/coordinator/migration.rs index f1965ade..5cfaee7c 100644 --- a/src/coordinator/migration.rs +++ b/src/coordinator/migration.rs @@ -150,7 +150,7 @@ mod tests { ) } - fn create_client_func() -> impl RedisClient { + fn create_client_func(_enable_compression: bool) -> impl RedisClient { let mut mock_client = MockRedisClient::new(); let info_mgr_cmd = vec![b"UMCTL".to_vec(), b"INFOMGR".to_vec()]; @@ -171,7 +171,7 @@ mod tests { #[tokio::test] async fn test_migration_state_checker() { - let factory = DummyRedisClientFactory::new(create_client_func); + let factory = DummyRedisClientFactory::new(create_client_func, false); let checker = MigrationStateRespChecker::new(Arc::new(factory)); let res: Vec<_> = checker.check("127.0.0.1:6000".to_string()).collect().await; assert_eq!(res.len(), 1); @@ -232,7 +232,7 @@ mod tests { // Integrate together. #[tokio::test] async fn test_migration_state_sync() { - let factory = Arc::new(DummyRedisClientFactory::new(create_client_func)); + let factory = Arc::new(DummyRedisClientFactory::new(create_client_func, false)); let checker = MigrationStateRespChecker::new(factory); let mut mock_mani_broker = MockMetaManipulationBroker::new(); diff --git a/src/coordinator/service.rs b/src/coordinator/service.rs index 30df2546..ce33d2a1 100644 --- a/src/coordinator/service.rs +++ b/src/coordinator/service.rs @@ -107,10 +107,11 @@ impl, client_factory: Arc, + enable_compression: bool, ) -> impl ProxyMetaSynchronizer { let proxy_retriever = BrokerOrderedProxiesRetriever::new(data_broker.clone()); let meta_retriever = BrokerMetaRetriever::new(data_broker); - let sender = ProxyMetaRespSender::new(client_factory); + let sender = ProxyMetaRespSender::new(client_factory, enable_compression); ProxyMetaRespSynchronizer::new(proxy_retriever, meta_retriever, sender) } @@ -124,12 +125,13 @@ impl, mani_broker: Arc, client_factory: Arc, + enable_compression: bool, ) -> impl MigrationStateSynchronizer { let proxy_retriever = BrokerProxiesRetriever::new(data_broker.clone()); let checker = MigrationStateRespChecker::new(client_factory.clone()); let committer = BrokerMigrationCommitter::new(mani_broker); let meta_retriever = BrokerMetaRetriever::new(data_broker); - let sender = ProxyMetaRespSender::new(client_factory); + let sender = ProxyMetaRespSender::new(client_factory, enable_compression); ParMigrationStateSynchronizer::new( proxy_retriever, checker, @@ -166,8 +168,11 @@ impl { client_factory: Arc, + enable_compression: bool, } impl ProxyMetaRespSender { - pub fn new(client_factory: Arc) -> Self { - Self { client_factory } + pub fn new(client_factory: Arc, enable_compression: bool) -> Self { + Self { + client_factory, + enable_compression, + } } } @@ -28,18 +34,27 @@ impl ProxyMetaRespSender { .await .map_err(CoordinateError::Redis)?; let proxy_with_only_masters = filter_proxy_masters(proxy.clone()); + let repl_flags = ClusterMapFlags { + force: false, + compress: false, + }; send_meta( &mut client, "SETREPL".to_string(), - generate_repl_meta_cmd_args(proxy, ClusterMapFlags { force: false }), - ) - .await?; - send_meta( - &mut client, - "SETCLUSTER".to_string(), - generate_proxy_meta_cmd_args(ClusterMapFlags { force: false }, proxy_with_only_masters), + generate_repl_meta_cmd_args(proxy, repl_flags), ) .await?; + + let flags = ClusterMapFlags { + force: false, + compress: self.enable_compression, + }; + let meta_cmd_args = + generate_proxy_meta_cmd_args(flags, proxy_with_only_masters).map_err(|err| { + error!("FATAL_ERROR: failed to generate {:?}", err); + CoordinateError::CompressionError + })?; + send_meta(&mut client, "SETCLUSTER".to_string(), meta_cmd_args).await?; Ok(()) } } @@ -91,7 +106,10 @@ impl ProxyMetaRetriever for BrokerMetaRetriever { } } -fn generate_proxy_meta_cmd_args(flags: ClusterMapFlags, proxy: Proxy) -> Vec { +fn generate_proxy_meta_cmd_args( + flags: ClusterMapFlags, + proxy: Proxy, +) -> Result, MetaCompressError> { let epoch = proxy.get_epoch(); let clusters_config = ClusterConfigMap::new(proxy.get_clusters_config().clone()); @@ -115,8 +133,13 @@ fn generate_proxy_meta_cmd_args(flags: ClusterMapFlags, proxy: Proxy) -> Vec Vec { + let epoch = 7799; + let flags = ClusterMapFlags::from_arg("COMPRESS"); + + let mut local_map = HashMap::new(); + let mut node_map = HashMap::new(); + let slots = vec!["1".to_string(), "233-666".to_string()]; + node_map.insert( + "127.0.0.1:7001".to_string(), + vec![SlotRange::from_strings(&mut slots.into_iter().peekable()).unwrap()], + ); + local_map.insert(ClusterName::try_from("mycluster").unwrap(), node_map); + let local = ProxyClusterMap::new(local_map); + + let peer = ProxyClusterMap::new(HashMap::new()); + let mut cluster_map = HashMap::new(); + cluster_map.insert( + ClusterName::try_from("mycluster").unwrap(), + ClusterConfig::default(), + ); + let clusters_config = ClusterConfigMap::new(cluster_map); + + let meta = ProxyClusterMeta::new(epoch, flags, local, peer, clusters_config); + meta.to_compressed_args().unwrap() + } + #[test] fn test_master_generate_repl_meta_cmd_args() { let proxy = gen_testing_proxy(Role::Master); - let args = generate_repl_meta_cmd_args(proxy, ClusterMapFlags { force: false }); + let args = generate_repl_meta_cmd_args( + proxy, + ClusterMapFlags { + force: false, + compress: false, + }, + ); assert_eq!(args, gen_master_args()) } #[test] fn test_replica_generate_repl_meta_cmd_args() { let proxy = gen_testing_proxy(Role::Replica); - let args = generate_repl_meta_cmd_args(proxy, ClusterMapFlags { force: true }); + let args = generate_repl_meta_cmd_args( + proxy, + ClusterMapFlags { + force: true, + compress: false, + }, + ); assert_eq!(args, gen_replica_args()) } @@ -382,7 +443,7 @@ mod tests { assert!(res.is_err()); } - fn create_client_func() -> impl RedisClient { + fn create_client_func(enable_compression: bool) -> impl RedisClient { let call_times = Arc::new(AtomicUsize::new(0)); let mut mock_client = MockRedisClient::new(); @@ -394,14 +455,22 @@ mod tests { .map(|s| s.into_bytes()) .collect(), ); + let mut set_cluster_cmd = vec![b"UMCTL".to_vec(), b"SETCLUSTER".to_vec()]; - set_cluster_cmd.append( - &mut gen_set_cluster_args() + if enable_compression { + let mut args = gen_set_cluster_compress_args() .into_iter() .map(|s| s.into_bytes()) - .collect(), - ); - set_cluster_cmd.push(b"CONFIG".to_vec()); + .collect(); + set_cluster_cmd.append(&mut args); + } else { + let mut args = gen_set_cluster_args() + .into_iter() + .map(|s| s.into_bytes()) + .collect(); + set_cluster_cmd.append(&mut args); + set_cluster_cmd.push(b"CONFIG".to_vec()); + } mock_client .expect_execute_single() @@ -409,6 +478,8 @@ mod tests { if call_times.load(Ordering::SeqCst) == 0 { call_times.fetch_add(1, Ordering::SeqCst); command.eq(&set_repl_cmd) + } else if enable_compression { + command.eq(&set_cluster_cmd) } else { // Ignore the config part let cmd = command.get(0..set_cluster_cmd.len()).unwrap().to_vec(); @@ -423,8 +494,13 @@ mod tests { #[tokio::test] async fn test_meta_resp_sender() { - let client_factory = DummyRedisClientFactory::new(create_client_func); - let sender = ProxyMetaRespSender::new(Arc::new(client_factory)); + test_meta_resp_sender_helper(false).await; + test_meta_resp_sender_helper(true).await; + } + + async fn test_meta_resp_sender_helper(enable_compression: bool) { + let client_factory = DummyRedisClientFactory::new(create_client_func, enable_compression); + let sender = ProxyMetaRespSender::new(Arc::new(client_factory), enable_compression); let proxy = gen_testing_proxy(Role::Master); let res = sender.send_meta(proxy).await; assert!(res.is_ok()); @@ -455,6 +531,11 @@ mod tests { // Integrate together #[tokio::test] async fn test_meta_sync() { + test_meta_sync_helper(false).await; + test_meta_sync_helper(true).await; + } + + async fn test_meta_sync_helper(enable_compression: bool) { let mut mock_broker = MockMetaDataBroker::new(); let proxy_addr = gen_testing_proxy(Role::Master).get_address().to_string(); let proxy_addr2 = proxy_addr.clone(); @@ -490,8 +571,8 @@ mod tests { let proxies_retriever = BrokerProxiesRetriever::new(mock_broker.clone()); let meta_retriever = BrokerMetaRetriever::new(mock_broker); - let client_factory = DummyRedisClientFactory::new(create_client_func); - let sender = ProxyMetaRespSender::new(Arc::new(client_factory)); + let client_factory = DummyRedisClientFactory::new(create_client_func, enable_compression); + let sender = ProxyMetaRespSender::new(Arc::new(client_factory), enable_compression); let sync = ProxyMetaRespSynchronizer::new(proxies_retriever, meta_retriever, sender); let results: Vec<_> = sync.run().collect().await; diff --git a/src/protocol/client.rs b/src/protocol/client.rs index fb402c5e..53e43edd 100644 --- a/src/protocol/client.rs +++ b/src/protocol/client.rs @@ -139,25 +139,29 @@ fn process_multi_cmd_result( pub struct DummyRedisClientFactory where C: RedisClient + Sync + 'static, - F: Fn() -> C + ThreadSafe, + F: Fn(bool) -> C + ThreadSafe, { create_func: F, + enable_compression: bool, } impl DummyRedisClientFactory where C: RedisClient + Sync + 'static, - F: Fn() -> C + ThreadSafe, + F: Fn(bool) -> C + ThreadSafe, { - pub fn new(create_func: F) -> Self { - Self { create_func } + pub fn new(create_func: F, enable_compression: bool) -> Self { + Self { + create_func, + enable_compression, + } } } impl RedisClientFactory for DummyRedisClientFactory where C: RedisClient + Sync + 'static, - F: Fn() -> C + ThreadSafe, + F: Fn(bool) -> C + ThreadSafe, { type Client = C; @@ -165,7 +169,7 @@ where &'s self, _address: String, ) -> Pin> + Send + 's>> { - let client = (self.create_func)(); + let client = (self.create_func)(self.enable_compression); Box::pin(async move { Ok(client) }) } } diff --git a/src/replication/replicator.rs b/src/replication/replicator.rs index 29f4549b..34810755 100644 --- a/src/replication/replicator.rs +++ b/src/replication/replicator.rs @@ -209,7 +209,13 @@ mod tests { assert!(r.is_ok()); let meta = r.unwrap(); assert_eq!(meta.epoch, 233); - assert_eq!(meta.flags, ClusterMapFlags { force: true }); + assert_eq!( + meta.flags, + ClusterMapFlags { + force: true, + compress: false + } + ); assert_eq!(meta.masters.len(), 1); assert_eq!(meta.replicas.len(), 0); @@ -231,7 +237,13 @@ mod tests { assert!(r.is_ok()); let meta = r.unwrap(); assert_eq!(meta.epoch, 233); - assert_eq!(meta.flags, ClusterMapFlags { force: false }); + assert_eq!( + meta.flags, + ClusterMapFlags { + force: false, + compress: false + } + ); assert_eq!(meta.masters.len(), 1); assert_eq!(meta.replicas.len(), 1); diff --git a/tests/proxy_manager_test.rs b/tests/proxy_manager_test.rs index de40b10c..a3f2a61f 100644 --- a/tests/proxy_manager_test.rs +++ b/tests/proxy_manager_test.rs @@ -113,7 +113,10 @@ mod tests { ReplicatorMeta { epoch: 233, - flags: ClusterMapFlags { force: false }, + flags: ClusterMapFlags { + force: false, + compress: false, + }, masters: vec![MasterMeta { cluster_name: cluster_name.clone(), master_node_address: "127.0.0.1:6379".to_string(), @@ -132,7 +135,10 @@ mod tests { ReplicatorMeta { epoch: 233, - flags: ClusterMapFlags { force: false }, + flags: ClusterMapFlags { + force: false, + compress: false, + }, masters: vec![MasterMeta { cluster_name: cluster_name.clone(), master_node_address: "127.0.0.1:6379".to_string(), @@ -147,7 +153,10 @@ mod tests { ReplicatorMeta { epoch: 233, - flags: ClusterMapFlags { force: false }, + flags: ClusterMapFlags { + force: false, + compress: false, + }, masters: vec![], replicas: vec![ReplicaMeta { cluster_name: cluster_name.clone(),