Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support built-in Redis Cluster Proxy #140

Merged
merged 5 commits into from
Apr 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,19 @@ docker-overmoon:
docker-compose -f examples/docker-compose-overmoon.yml up

start-func-test:
python chaostest/render_compose.py mem_broker
python chaostest/render_compose.py -t mem_broker
docker stack deploy --compose-file chaostest/chaos-docker-compose.yml chaos

start-func-test-active:
python chaostest/render_compose.py -t mem_broker -a
docker stack deploy --compose-file chaostest/chaos-docker-compose.yml chaos

start-chaos:
python chaostest/render_compose.py mem_broker enable_failure
python chaostest/render_compose.py -t mem_broker -f
docker stack deploy --compose-file chaostest/chaos-docker-compose.yml chaos

start-chaos-active:
python chaostest/render_compose.py -t mem_broker -f -a
docker stack deploy --compose-file chaostest/chaos-docker-compose.yml chaos

stop-chaos:
Expand Down
1 change: 1 addition & 0 deletions chaostest/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
'broker_port': 7799,
'broker_address': 'broker:7799',
'etcd_port': 2379,
'active_redirection': False,
'pumba_commands': {
'kill': "--random --interval 60s kill 're2:(server_proxy|coordinator|overmoon).*'",
'delay': "--random --interval 20s netem --duration 5s delay 're2:(server_proxy|coordinator|overmoon).*'",
Expand Down
25 changes: 22 additions & 3 deletions chaostest/render_compose.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import sys
import argparse

from jinja2 import Environment, FileSystemLoader

Expand All @@ -18,12 +19,30 @@ def render_docker_compose(docker_compose_yml, is_mem_broker):

if __name__ == '__main__':
# Usage:
# python chaostest/render_compose.py [overmoon|mem_broker] [enable_failure]
is_mem_broker = len(sys.argv) >= 2 and sys.argv[1] == 'mem_broker'
enable_failure = len(sys.argv) >= 3 and sys.argv[2] == 'enable_failure'
# python chaostest/render_compose.py -t [overmoon|mem_broker] [-f] [-a]
parser = argparse.ArgumentParser(description='Render docker-compose file for chaos testing')

parser.add_argument('-t', action='store', dest='test_type',default='mem_broker')
parser.add_argument('-f', action='store_true', dest="enable_failure_injection", default=False)
parser.add_argument('-a', action="store_true", dest="active_redirection", default=False)

results = parser.parse_args()

is_mem_broker = results.test_type == 'mem_broker'
enable_failure = results.enable_failure_injection
active_redirection = results.active_redirection

if not enable_failure:
print("Disable fault injection")
config.DOCKER_COMPOSE_CONFIG['pumba_commands'] = {}
else:
print("Enable fault injection")

if active_redirection:
config.DOCKER_COMPOSE_CONFIG['active_redirection'] = True
print("Enable active redirection")
else:
config.DOCKER_COMPOSE_CONFIG['active_redirection'] = False
print("Disable active redirection")

render_docker_compose('chaostest/chaos-docker-compose.yml', is_mem_broker)
1 change: 1 addition & 0 deletions chaostest/test_stack_mem_broker.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ server_proxy{{ proxy_port }}:
- UNDERMOON_SESSION_CHANNEL_SIZE=4096
- UNDERMOON_BACKEND_CHANNEL_SIZE=4096
- UNDERMOON_BACKEND_CONN_NUM=4
- UNDERMOON_ACTIVE_REDIRECTION={{ active_redirection }}
{% endfor %}
{% endfilter %}

Expand Down
6 changes: 6 additions & 0 deletions conf/server-proxy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,9 @@ backend_batch_buf = 10
session_batch_min_time = 20000
session_batch_max_time = 400000
session_batch_buf = 10

# Active Redirection Mode
# When active_redirection is enabled,
# all the server proxies will handle the redirection inside.
# Clients don't need to be a Redis Cluster Client.
active_redirection = false
3 changes: 3 additions & 0 deletions src/bin/server_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ fn gen_conf() -> Result<ServerProxyConfig, &'static str> {
.get::<usize>("session_batch_max_time")
.unwrap_or_else(|_| 400_000),
session_batch_buf,
active_redirection: s
.get::<bool>("active_redirection")
.unwrap_or_else(|_| false),
};
Ok(config)
}
Expand Down
1 change: 1 addition & 0 deletions src/migration/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@ where
}

let task = Arc::new(RedisScanImportingTask::new(
config.clone(),
mgr_config.clone(),
meta.clone(),
slot_range.clone(),
Expand Down
75 changes: 54 additions & 21 deletions src/migration/scan_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ use crate::common::config::AtomicMigrationConfig;
use crate::common::resp_execution::keep_connecting_and_sending_cmd;
use crate::common::response::NOT_READY_FOR_SWITCHING_REPLY;
use crate::common::track::TrackedFutureRegistry;
use crate::common::utils::{pretty_print_bytes, ThreadSafe};
use crate::common::utils::{gen_moved, get_slot, pretty_print_bytes, ThreadSafe};
use crate::common::version::UNDERMOON_MIGRATION_VERSION;
use crate::protocol::RespVec;
use crate::protocol::{RedisClientError, RedisClientFactory, Resp};
use crate::proxy::backend::{
CmdTask, CmdTaskFactory, CmdTaskSender, CmdTaskSenderFactory, RedirectionSenderFactory, ReqTask,
CmdTask, CmdTaskFactory, CmdTaskSender, CmdTaskSenderFactory, ReqTask,
};
use crate::proxy::blocking::{BlockingHandle, BlockingHintTask, TaskBlockingController};
use crate::proxy::cluster::ClusterSendError;
Expand All @@ -25,6 +25,7 @@ use atomic_option::AtomicOption;
use futures::channel::oneshot;
use futures::{future, select, Future, FutureExt, TryFutureExt};
use futures_timer::Delay;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::sync::Arc;
Expand All @@ -43,12 +44,13 @@ where
meta: MigrationMeta,
state: Arc<AtomicMigrationState>,
client_factory: Arc<RCF>,
redirection_sender_factory: RedirectionSenderFactory<T>,
stop_signal_sender: AtomicOption<oneshot::Sender<()>>,
stop_signal_receiver: AtomicOption<oneshot::Receiver<()>>,
task: Arc<ScanMigrationTask>,
blocking_ctrl: Arc<BC>,
future_registry: Arc<TrackedFutureRegistry>,
phantom: PhantomData<T>,
active_redirection: bool,
}

impl<RCF, T, BC> RedisScanMigratingTask<RCF, T, BC>
Expand All @@ -59,7 +61,7 @@ where
{
#[allow(clippy::too_many_arguments)]
pub fn new(
_config: Arc<ServerProxyConfig>,
config: Arc<ServerProxyConfig>,
mgr_config: Arc<AtomicMigrationConfig>,
cluster_name: ClusterName,
slot_range: SlotRange,
Expand All @@ -76,8 +78,8 @@ where
client_factory.clone(),
mgr_config.clone(),
);
let redirection_sender_factory = RedirectionSenderFactory::default();
let range_map = RangeMap::from(slot_range.get_range_list());
let active_redirection = config.active_redirection;
Self {
mgr_config,
cluster_name,
Expand All @@ -86,12 +88,13 @@ where
meta,
state: Arc::new(AtomicMigrationState::initial_state()),
client_factory,
redirection_sender_factory,
stop_signal_sender: AtomicOption::new(Box::new(stop_signal_sender)),
stop_signal_receiver: AtomicOption::new(Box::new(stop_signal_receiver)),
task: Arc::new(task),
blocking_ctrl,
future_registry,
phantom: PhantomData,
active_redirection,
}
}

Expand Down Expand Up @@ -393,12 +396,11 @@ where
_ => (),
}

let redirection_sender = self
.redirection_sender_factory
.create(self.meta.dst_proxy_address.clone());
redirection_sender
.send(cmd_task)
.map_err(|_e| ClusterSendError::MigrationError)
handle_redirection(
cmd_task,
self.meta.dst_proxy_address.clone(),
self.active_redirection,
)
}

fn get_state(&self) -> MigrationState {
Expand Down Expand Up @@ -456,11 +458,11 @@ where
state: Arc<AtomicMigrationState>,
_client_factory: Arc<RCF>,
_sender_factory: Arc<TSF>,
redirection_sender_factory: RedirectionSenderFactory<CTF::Task>,
stop_signal_sender: AtomicOption<oneshot::Sender<()>>,
stop_signal_receiver: AtomicOption<oneshot::Receiver<()>>,
cmd_handler: RestoreDataCmdTaskHandler<CTF, <TSF as CmdTaskSenderFactory>::Sender>,
_cmd_task_factory: Arc<CTF>,
active_redirection: bool,
}

impl<RCF, TSF, CTF> RedisScanImportingTask<RCF, TSF, CTF>
Expand All @@ -471,6 +473,7 @@ where
<TSF as CmdTaskSenderFactory>::Sender: ThreadSafe + CmdTaskSender<Task = ReqTask<CTF::Task>>,
{
pub fn new(
config: Arc<ServerProxyConfig>,
mgr_config: Arc<AtomicMigrationConfig>,
meta: MigrationMeta,
slot_range: SlotRange,
Expand All @@ -483,20 +486,20 @@ where
let cmd_handler =
RestoreDataCmdTaskHandler::new(src_sender, dst_sender, cmd_task_factory.clone());
let (stop_signal_sender, stop_signal_receiver) = oneshot::channel();
let redirection_sender_factory = RedirectionSenderFactory::default();
let range_map = RangeMap::from(slot_range.get_range_list());
let active_redirection = config.active_redirection;
Self {
_mgr_config: mgr_config,
meta,
range_map,
state: Arc::new(AtomicMigrationState::initial_state()),
_client_factory: client_factory,
_sender_factory: sender_factory,
redirection_sender_factory,
stop_signal_sender: AtomicOption::new(Box::new(stop_signal_sender)),
stop_signal_receiver: AtomicOption::new(Box::new(stop_signal_receiver)),
cmd_handler,
_cmd_task_factory: cmd_task_factory,
active_redirection,
}
}
}
Expand Down Expand Up @@ -546,12 +549,11 @@ where
cmd_task: Self::Task,
) -> Result<(), ClusterSendError<BlockingHintTask<Self::Task>>> {
if self.state.get_state() == MigrationState::PreCheck {
let redirection_sender = self
.redirection_sender_factory
.create(self.meta.src_proxy_address.clone());
return redirection_sender
.send(cmd_task)
.map_err(|_e| ClusterSendError::MigrationError);
return handle_redirection(
cmd_task,
self.meta.src_proxy_address.clone(),
self.active_redirection,
);
}

self.cmd_handler.handle_cmd_task(cmd_task);
Expand Down Expand Up @@ -613,3 +615,34 @@ impl Drop for ImportingTaskHandle {
self.send_stop_signal()
}
}

fn handle_redirection<T: CmdTask>(
cmd_task: T,
redirection_address: String,
active_redirection: bool,
) -> Result<(), ClusterSendError<BlockingHintTask<T>>> {
let key = match cmd_task.get_key() {
Some(key) => key,
None => {
let resp = Resp::Error("missing key".to_string().into_bytes());
cmd_task.set_resp_result(Ok(resp));
return Ok(());
}
};

let slot = get_slot(key);

if active_redirection {
let cmd_task = BlockingHintTask::new(cmd_task, false);
// Proceed the command inside this proxy.
Err(ClusterSendError::Moved {
task: cmd_task,
slot,
address: redirection_address,
})
} else {
let resp = Resp::Error(gen_moved(get_slot(key), redirection_address).into_bytes());
cmd_task.set_resp_result(Ok(resp));
Ok(())
}
}
55 changes: 11 additions & 44 deletions src/proxy/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use super::slowlog::TaskEvent;
use crate::common::batch::TryChunksTimeoutStreamExt;
use crate::common::response::ERR_BACKEND_CONNECTION;
use crate::common::track::TrackedFutureRegistry;
use crate::common::utils::{gen_moved, get_slot, resolve_first_address, ThreadSafe};
use crate::common::utils::{resolve_first_address, ThreadSafe};
use crate::protocol::{
new_simple_packet_codec, DecodeError, EncodeError, EncodedPacket, FromResp, MonoPacket,
OptionalMulti, Packet, Resp, RespCodec, RespVec,
Expand Down Expand Up @@ -62,6 +62,16 @@ pub trait CmdTask: ThreadSafe {
fn log_event(&mut self, event: TaskEvent);
}

pub trait IntoTask<T: CmdTask>: CmdTask {
fn into_task(self) -> T;
}

impl<T: CmdTask> IntoTask<T> for T {
fn into_task(self) -> T {
self
}
}

pub trait CmdTaskFactory {
type Task: CmdTask;

Expand Down Expand Up @@ -765,49 +775,6 @@ impl<S: CmdTaskSender> CmdTaskSender for CachedSender<S> {
}
}

pub struct RedirectionSender<T: CmdTask> {
redirection_address: String,
phantom: PhantomData<T>,
}

impl<T: CmdTask> CmdTaskSender for RedirectionSender<T> {
type Task = T;

fn send(&self, cmd_task: Self::Task) -> Result<(), BackendError> {
let key = match cmd_task.get_key() {
Some(key) => key,
None => {
let resp = Resp::Error("missing key".to_string().into_bytes());
cmd_task.set_resp_result(Ok(resp));
return Ok(());
}
};
let resp =
Resp::Error(gen_moved(get_slot(key), self.redirection_address.clone()).into_bytes());
cmd_task.set_resp_result(Ok(resp));
Ok(())
}
}

pub struct RedirectionSenderFactory<T: CmdTask>(PhantomData<T>);

impl<T: CmdTask> Default for RedirectionSenderFactory<T> {
fn default() -> Self {
Self(PhantomData)
}
}

impl<T: CmdTask> CmdTaskSenderFactory for RedirectionSenderFactory<T> {
type Sender = RedirectionSender<T>;

fn create(&self, address: String) -> Self::Sender {
RedirectionSender {
redirection_address: address,
phantom: PhantomData,
}
}
}

pub type BackendSenderFactory<F, CF> =
CachedSenderFactory<RRSenderGroupFactory<RecoverableBackendNodeFactory<F, CF>>>;

Expand Down
8 changes: 7 additions & 1 deletion src/proxy/blocking.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::backend::{
BackendError, CachedSenderFactory, CmdTask, CmdTaskResultHandler, CmdTaskResultHandlerFactory,
CmdTaskSender, CmdTaskSenderFactory, ConnFactory, RRSenderGroupFactory,
CmdTaskSender, CmdTaskSenderFactory, ConnFactory, IntoTask, RRSenderGroupFactory,
RecoverableBackendNodeFactory,
};
use super::cluster::ClusterTag;
Expand Down Expand Up @@ -509,3 +509,9 @@ impl<T: CmdTask + ClusterTag> ClusterTag for BlockingHintTask<T> {
self.inner.set_cluster_name(cluster_name)
}
}

impl<T: CmdTask + ClusterTag> IntoTask<T> for BlockingHintTask<T> {
fn into_task(self) -> T {
self.into_inner()
}
}
Loading