Skip to content

Commit

Permalink
Support compression from coordinator to proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
doyoubi committed Dec 7, 2020
1 parent 536a99a commit 3b77d99
Show file tree
Hide file tree
Showing 11 changed files with 283 additions and 51 deletions.
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ backtrace = "0.3"
jemallocator = "0.3.0"
async-trait = "0.1"
derivative = "2.1.1"
flate2 = "1"
base64 = "0.13.0"

[profile.release]
debug = true
Expand Down
6 changes: 3 additions & 3 deletions src/common/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::collections::HashMap;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct ClusterConfig {
#[serde(default)]
pub compression_strategy: CompressionStrategy,
Expand Down Expand Up @@ -74,7 +74,7 @@ impl ClusterConfig {
}
}

#[derive(Debug, PartialEq, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionStrategy {
Disabled = 0,
// Only allow SET, SETEX, PSETEX, SETNX, GET, GETSET, MGET, MSET, MSETNX commands for String data type
Expand Down Expand Up @@ -137,7 +137,7 @@ impl<'de> Deserialize<'de> for CompressionStrategy {
}
}

#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct MigrationConfig {
pub max_migration_time: u64,
pub max_blocking_time: u64,
Expand Down
119 changes: 114 additions & 5 deletions src/common/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ use crate::common::cluster::ClusterName;
use crate::common::config::ClusterConfig;
use crate::common::utils::extract_host_from_address;
use crate::protocol::{Array, BulkStr, Resp};
use flate2::Compression;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::io;
use std::io::Read;
use std::io::Write;
use std::iter::Peekable;
use std::str;

Expand All @@ -30,23 +34,89 @@ macro_rules! try_get {
/// Flags attached to a cluster-map update, serialized as a comma-joined
/// argument string ("FORCE,COMPRESS", or "NOFLAG" when empty) by
/// `to_arg` and parsed back by `from_arg`.
#[derive(Debug, Clone, PartialEq)]
pub struct ClusterMapFlags {
// "FORCE" token was present in the flag string.
// NOTE(review): presumably forces the proxy to accept the meta update — confirm with callers.
pub force: bool,
// "COMPRESS" token was present: the metadata payload argument is
// gzip-compressed and base64-encoded (see ProxyClusterMetaData).
pub compress: bool,
}

impl ClusterMapFlags {
pub fn to_arg(&self) -> String {
let mut flags = Vec::new();
if self.force {
"FORCE".to_string()
} else {
flags.push("FORCE");
}
if self.compress {
flags.push("COMPRESS");
}

if flags.is_empty() {
"NOFLAG".to_string()
} else {
flags.join(",")
}
}

pub fn from_arg(flags_str: &str) -> Self {
let force = has_flags(flags_str, ',', "FORCE");
ClusterMapFlags { force }
let compress = has_flags(flags_str, ',', "COMPRESS");
ClusterMapFlags { force, compress }
}
}

/// Full snapshot of a proxy's cluster metadata bundled for (de)serialization:
/// the local slot map, the peer slot map, and per-cluster configuration.
/// Serde derives allow it to be round-tripped as JSON for the compressed
/// coordinator-to-proxy transfer.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct ProxyClusterMetaData {
// Slot ranges this proxy serves itself — presumably; confirm against ProxyClusterMeta.
local: ProxyClusterMap,
// Slot ranges served by peer proxies.
peer: ProxyClusterMap,
// Per-cluster configuration (e.g. compression strategy).
clusters_config: ClusterConfigMap,
}

impl ProxyClusterMetaData {
    /// Bundle the local map, peer map, and per-cluster config into one snapshot.
    pub fn new(
        local: ProxyClusterMap,
        peer: ProxyClusterMap,
        clusters_config: ClusterConfigMap,
    ) -> Self {
        Self {
            local,
            peer,
            clusters_config,
        }
    }

    /// Encode the metadata as JSON, gzip-compress it (fastest level), and
    /// base64-encode the compressed bytes into a transfer-safe string.
    pub fn gen_compressed_data(&self) -> Result<String, MetaCompressError> {
        let json = serde_json::to_string(&self).map_err(|err| {
            error!("failed to encode json for meta: {:?}", err);
            MetaCompressError::Json
        })?;

        let mut gz_writer = flate2::write::GzEncoder::new(Vec::new(), Compression::fast());
        gz_writer
            .write_all(json.as_bytes())
            .map_err(MetaCompressError::Io)?;
        let compressed = gz_writer.finish().map_err(MetaCompressError::Io)?;

        Ok(base64::encode(&compressed))
    }

    /// Inverse of `gen_compressed_data`: base64-decode, gunzip, then parse JSON.
    pub fn from_compressed_data(data: String) -> Result<Self, MetaCompressError> {
        let compressed = base64::decode(data).map_err(|err| {
            error!("failed to decode base64 for meta: {:?}", err);
            MetaCompressError::Base64
        })?;

        let mut gz_reader = flate2::read::GzDecoder::new(io::Cursor::new(compressed));
        let mut json = String::new();
        gz_reader
            .read_to_string(&mut json)
            .map_err(MetaCompressError::Io)?;

        serde_json::from_str(json.as_str()).map_err(|err| {
            error!("failed to decode json for meta: {:?}", err);
            MetaCompressError::Json
        })
    }
}

/// Errors raised while compressing or decompressing cluster metadata
/// (see `ProxyClusterMetaData::gen_compressed_data` / `from_compressed_data`).
#[derive(Debug)]
pub enum MetaCompressError {
// Underlying gzip read/write failure.
Io(io::Error),
// JSON (de)serialization failed; details are logged at the call site.
Json,
// Input string was not valid base64.
Base64,
}

const PEER_PREFIX: &str = "PEER";
const CONFIG_PREFIX: &str = "CONFIG";

Expand Down Expand Up @@ -95,6 +165,14 @@ impl ProxyClusterMeta {
&self.clusters_config
}

pub fn gen_data(&self) -> ProxyClusterMetaData {
ProxyClusterMetaData {
local: self.local.clone(),
peer: self.peer.clone(),
clusters_config: self.clusters_config.clone(),
}
}

pub fn from_resp<T: AsRef<[u8]>>(
resp: &Resp<T>,
) -> Result<(Self, Result<(), ParseExtendedMetaError>), CmdParseError> {
Expand Down Expand Up @@ -178,9 +256,20 @@ impl ProxyClusterMeta {
}
args
}

pub fn to_compressed_args(&self) -> Result<Vec<String>, MetaCompressError> {
let data = ProxyClusterMetaData::new(
self.local.clone(),
self.peer.clone(),
self.clusters_config.clone(),
)
.gen_compressed_data()?;
let args = vec![self.epoch.to_string(), self.flags.to_arg(), data];
Ok(args)
}
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct ProxyClusterMap {
cluster_map: HashMap<ClusterName, HashMap<String, Vec<SlotRange>>>,
}
Expand Down Expand Up @@ -280,7 +369,7 @@ impl ProxyClusterMap {
}
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct ClusterConfigMap {
config_map: HashMap<ClusterName, ClusterConfig>,
}
Expand Down Expand Up @@ -802,6 +891,11 @@ mod tests {
args.sort();
cluster_args.sort();
assert_eq!(args, cluster_args);

let metadata = cluster_meta.gen_data();
let d = metadata.gen_compressed_data().unwrap();
let metadata2 = ProxyClusterMetaData::from_compressed_data(d).unwrap();
assert_eq!(metadata, metadata2);
}

#[test]
Expand Down Expand Up @@ -835,6 +929,11 @@ mod tests {
.compression_strategy,
CompressionStrategy::SetGetOnly
);

let metadata = cluster_meta.gen_data();
let d = metadata.gen_compressed_data().unwrap();
let metadata2 = ProxyClusterMetaData::from_compressed_data(d).unwrap();
assert_eq!(metadata, metadata2);
}

#[test]
Expand Down Expand Up @@ -866,6 +965,11 @@ mod tests {
assert!(extended_res.is_err());
assert_eq!(cluster_meta.epoch, 233);
assert!(cluster_meta.flags.force);

let metadata = cluster_meta.gen_data();
let d = metadata.gen_compressed_data().unwrap();
let metadata2 = ProxyClusterMetaData::from_compressed_data(d).unwrap();
assert_eq!(metadata, metadata2);
}

#[test]
Expand Down Expand Up @@ -897,6 +1001,11 @@ mod tests {
assert!(extended_res.is_err());
assert_eq!(cluster_meta.epoch, 233);
assert!(cluster_meta.flags.force);

let metadata = cluster_meta.gen_data();
let d = metadata.gen_compressed_data().unwrap();
let metadata2 = ProxyClusterMetaData::from_compressed_data(d).unwrap();
assert_eq!(metadata, metadata2);
}

#[test]
Expand Down
1 change: 1 addition & 0 deletions src/coordinator/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,7 @@ pub enum CoordinateError {
InvalidReply,
InvalidAddress,
InvalidConfig,
CompressionError,
}

impl fmt::Display for CoordinateError {
Expand Down
6 changes: 3 additions & 3 deletions src/coordinator/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ mod tests {
)
}

fn create_client_func() -> impl RedisClient {
fn create_client_func(_enable_compression: bool) -> impl RedisClient {
let mut mock_client = MockRedisClient::new();

let info_mgr_cmd = vec![b"UMCTL".to_vec(), b"INFOMGR".to_vec()];
Expand All @@ -171,7 +171,7 @@ mod tests {

#[tokio::test]
async fn test_migration_state_checker() {
let factory = DummyRedisClientFactory::new(create_client_func);
let factory = DummyRedisClientFactory::new(create_client_func, false);
let checker = MigrationStateRespChecker::new(Arc::new(factory));
let res: Vec<_> = checker.check("127.0.0.1:6000".to_string()).collect().await;
assert_eq!(res.len(), 1);
Expand Down Expand Up @@ -232,7 +232,7 @@ mod tests {
// Integrate together.
#[tokio::test]
async fn test_migration_state_sync() {
let factory = Arc::new(DummyRedisClientFactory::new(create_client_func));
let factory = Arc::new(DummyRedisClientFactory::new(create_client_func, false));
let checker = MigrationStateRespChecker::new(factory);

let mut mock_mani_broker = MockMetaManipulationBroker::new();
Expand Down
14 changes: 10 additions & 4 deletions src/coordinator/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,11 @@ impl<DB: MetaDataBroker + ThreadSafe, MB: MetaManipulationBroker, F: RedisClient
/// Build the proxy-meta synchronizer pipeline: an ordered proxy retriever,
/// a meta retriever, and a sender that pushes meta to each proxy.
/// `enable_compression` is forwarded to the sender so it can use the
/// compressed metadata transfer.
fn gen_proxy_meta_synchronizer(
    data_broker: Arc<DB>,
    client_factory: Arc<F>,
    enable_compression: bool,
) -> impl ProxyMetaSynchronizer {
    // Both retrievers need the broker; the second one takes ownership.
    let proxy_retriever = BrokerOrderedProxiesRetriever::new(data_broker.clone());
    let meta_retriever = BrokerMetaRetriever::new(data_broker);
    // The stale pre-change `ProxyMetaRespSender::new(client_factory)` line is
    // removed: it was a leftover duplicate binding from the diff rendering.
    let sender = ProxyMetaRespSender::new(client_factory, enable_compression);
    ProxyMetaRespSynchronizer::new(proxy_retriever, meta_retriever, sender)
}

Expand All @@ -124,12 +125,13 @@ impl<DB: MetaDataBroker + ThreadSafe, MB: MetaManipulationBroker, F: RedisClient
data_broker: Arc<DB>,
mani_broker: Arc<MB>,
client_factory: Arc<F>,
enable_compression: bool,
) -> impl MigrationStateSynchronizer {
let proxy_retriever = BrokerProxiesRetriever::new(data_broker.clone());
let checker = MigrationStateRespChecker::new(client_factory.clone());
let committer = BrokerMigrationCommitter::new(mani_broker);
let meta_retriever = BrokerMetaRetriever::new(data_broker);
let sender = ProxyMetaRespSender::new(client_factory);
let sender = ProxyMetaRespSender::new(client_factory, enable_compression);
ParMigrationStateSynchronizer::new(
proxy_retriever,
checker,
Expand Down Expand Up @@ -166,8 +168,11 @@ impl<DB: MetaDataBroker + ThreadSafe, MB: MetaManipulationBroker, F: RedisClient
loop {
trace!("start sync proxy meta data");
defer!(trace!("proxy meta sync finished a round"));
let sync =
Self::gen_proxy_meta_synchronizer(data_broker.clone(), client_factory.clone());
let sync = Self::gen_proxy_meta_synchronizer(
data_broker.clone(),
client_factory.clone(),
self.config.enable_compression,
);
let mut s = sync.run();
while let Some(r) = s.next().await {
if let Err(e) = r {
Expand Down Expand Up @@ -206,6 +211,7 @@ impl<DB: MetaDataBroker + ThreadSafe, MB: MetaManipulationBroker, F: RedisClient
data_broker.clone(),
mani_broker.clone(),
client_factory.clone(),
self.config.enable_compression,
);
let mut s = sync.run();
while let Some(r) = s.next().await {
Expand Down
Loading

0 comments on commit 3b77d99

Please sign in to comment.