Skip to content

Commit

Permalink
Changed cluster_manager script to wait for replicas to be ready and i…
Browse files Browse the repository at this point in the history
…n sync
  • Loading branch information
barshaul committed Aug 6, 2023
1 parent d17d942 commit febecf6
Showing 1 changed file with 107 additions and 6 deletions.
113 changes: 107 additions & 6 deletions utils/cluster_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,18 +364,22 @@ def create_servers(


def create_cluster(
servers: List[RedisServer], replica_count: int, cluster_folder: str, use_tls: bool
servers: List[RedisServer],
shard_count: int,
replica_count: int,
cluster_folder: str,
use_tls: bool,
):
tic = time.perf_counter()
servers = (str(server) for server in servers)
servers_tuple = (str(server) for server in servers)
logging.debug("## Starting cluster creation...")
p = subprocess.Popen(
[
"redis-cli",
*get_redis_cli_option_args(cluster_folder, use_tls),
"--cluster",
"create",
*servers,
*servers_tuple,
"--cluster-replicas",
str(replica_count),
"--cluster-yes",
Expand All @@ -398,12 +402,107 @@ def create_cluster(
f"Waiting for server with log {log_file} to reach status OK.\n"
f"See {dir}/redis.log for more information"
)

if replica_count > 0:
wait_for_nodes_to_become_replicas(
shard_count * replica_count, servers, cluster_folder, use_tls
)
wait_for_sync_to_finish(servers, cluster_folder, use_tls)
logging.debug("The cluster was successfully created!")
toc = time.perf_counter()
logging.debug(f"create_cluster {cluster_folder} Elapsed time: {toc - tic:0.4f}")


def wait_for_nodes_to_become_replicas(
num_of_replicas: int, servers: List[RedisServer], cluster_folder: str, use_tls: bool
):
for server in servers:
cmd_args = [
"redis-cli",
"-h",
server.host,
"-p",
str(server.port),
*get_redis_cli_option_args(cluster_folder, use_tls),
"cluster",
"nodes",
]
logging.debug(f"Executing: {cmd_args}")
retries = 50
while retries >= 0:
try:
p = subprocess.Popen(
cmd_args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
output, err = p.communicate(timeout=5)
if err:
raise Exception(
f"Failed to execute command: {p.args}\n Return code: {p.returncode}\n Error: {err}"
)
if output.count("slave") == num_of_replicas:
logging.debug(
f"All replicas found in the topology view of {server}"
)
break
else:
retries -= 1
time.sleep(0.5)
except subprocess.TimeoutExpired:
time.sleep(0.5)
retries -= 1
if retries == 0:
raise Exception(
"Timeout exceeded trying to wait for nodes to become replicas"
)


def wait_for_sync_to_finish(
servers: List[RedisServer], cluster_folder: str, use_tls: bool
):
for server in servers:
cmd_args = [
"redis-cli",
"-h",
server.host,
"-p",
str(server.port),
*get_redis_cli_option_args(cluster_folder, use_tls),
"info",
"replication",
]
logging.debug(f"Executing: {cmd_args}")
retries = 50
while retries >= 0:
try:
p = subprocess.Popen(
cmd_args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
output, err = p.communicate(timeout=5)
if err:
raise Exception(
f"Failed to execute command: {p.args}\n Return code: {p.returncode}\n Error: {err}"
)
if "role:master" in output or "master_link_status:up" in output:
logging.debug(f"Server {server} is ready!")
break
else:
retries -= 1
time.sleep(0.5)
continue
except subprocess.TimeoutExpired:
time.sleep(0.5)
retries -= 1
if retries < 0:
raise Exception(
f"Timeout exceeded trying to wait for replica {server.host}:{server:port} to finish syncing"
)


def wait_for_server(
server: RedisServer,
cluster_folder: str,
Expand All @@ -430,7 +529,7 @@ def wait_for_server(
try:
output, err = p.communicate(timeout=1)
if output.strip() == "PONG":
logging.debug(f"Server {server} is ready!")
logging.debug(f"Server {server} is up!")
return True
if p.returncode != 0:
logging.debug(
Expand Down Expand Up @@ -814,7 +913,9 @@ def main():
cluster_folder,
args.tls,
)
create_cluster(servers, args.replica_count, cluster_folder, args.tls)
create_cluster(
servers, args.shard_count, args.replica_count, cluster_folder, args.tls
)
servers_str = ",".join(str(server) for server in servers)
toc = time.perf_counter()
logging.info(f"Created cluster in {toc - tic:0.4f} seconds")
Expand Down

0 comments on commit febecf6

Please sign in to comment.