diff --git a/utils/cluster_manager.py b/utils/cluster_manager.py
index 73ad26f991..a05b7f0b96 100644
--- a/utils/cluster_manager.py
+++ b/utils/cluster_manager.py
@@ -364,10 +364,14 @@ def create_servers(
 
 
 def create_cluster(
-    servers: List[RedisServer], replica_count: int, cluster_folder: str, use_tls: bool
+    servers: List[RedisServer],
+    shard_count: int,
+    replica_count: int,
+    cluster_folder: str,
+    use_tls: bool,
 ):
     tic = time.perf_counter()
-    servers = (str(server) for server in servers)
+    servers_tuple = tuple(str(server) for server in servers)
     logging.debug("## Starting cluster creation...")
     p = subprocess.Popen(
         [
@@ -375,7 +379,7 @@ def create_cluster(
             *get_redis_cli_option_args(cluster_folder, use_tls),
             "--cluster",
             "create",
-            *servers,
+            *servers_tuple,
             "--cluster-replicas",
             str(replica_count),
             "--cluster-yes",
@@ -398,12 +402,125 @@ def create_cluster(
                 f"Waiting for server with log {log_file} to reach status OK.\n"
                 f"See {dir}/redis.log for more information"
             )
-
+    if replica_count > 0:
+        wait_for_nodes_to_become_replicas(
+            shard_count * replica_count, servers, cluster_folder, use_tls
+        )
+    wait_for_sync_to_finish(servers, cluster_folder, use_tls)
     logging.debug("The cluster was successfully created!")
     toc = time.perf_counter()
     logging.debug(f"create_cluster {cluster_folder} Elapsed time: {toc - tic:0.4f}")
 
 
+def wait_for_nodes_to_become_replicas(
+    num_of_replicas: int, servers: List[RedisServer], cluster_folder: str, use_tls: bool
+):
+    """
+    Wait until each server's CLUSTER NODES output lists `num_of_replicas` replicas.
+
+    Replicas are counted as occurrences of "slave" in the redis-cli output. Each
+    server is polled every 0.5 seconds; an Exception is raised if redis-cli
+    writes to stderr or if the retries are exhausted.
+    """
+    for server in servers:
+        cmd_args = [
+            "redis-cli",
+            "-h",
+            server.host,
+            "-p",
+            str(server.port),
+            *get_redis_cli_option_args(cluster_folder, use_tls),
+            "cluster",
+            "nodes",
+        ]
+        logging.debug(f"Executing: {cmd_args}")
+        retries = 50
+        while retries >= 0:
+            try:
+                # subprocess.run kills the child process when the timeout expires
+                p = subprocess.run(
+                    cmd_args,
+                    stdout=subprocess.PIPE,
+                    stderr=subprocess.PIPE,
+                    text=True,
+                    timeout=5,
+                )
+                if p.stderr:
+                    raise Exception(
+                        f"Failed to execute command: {p.args}\n Return code: {p.returncode}\n Error: {p.stderr}"
+                    )
+                if p.stdout.count("slave") == num_of_replicas:
+                    logging.debug(
+                        f"All replicas found in the topology view of {server}"
+                    )
+                    break
+                else:
+                    retries -= 1
+                    time.sleep(0.5)
+            except subprocess.TimeoutExpired:
+                time.sleep(0.5)
+                retries -= 1
+        # The counter only goes negative when no attempt reached `break`
+        if retries < 0:
+            raise Exception(
+                "Timeout exceeded trying to wait for nodes to become replicas"
+            )
+
+
+def wait_for_sync_to_finish(
+    servers: List[RedisServer], cluster_folder: str, use_tls: bool
+):
+    """
+    Wait until every server is a master or a replica whose master link is up.
+
+    Each server is polled with INFO REPLICATION through redis-cli every
+    0.5 seconds. An Exception is raised if redis-cli writes to stderr or if
+    the retries are exhausted.
+    """
+    for server in servers:
+        cmd_args = [
+            "redis-cli",
+            "-h",
+            server.host,
+            "-p",
+            str(server.port),
+            *get_redis_cli_option_args(cluster_folder, use_tls),
+            "info",
+            "replication",
+        ]
+        logging.debug(f"Executing: {cmd_args}")
+        retries = 50
+        while retries >= 0:
+            try:
+                # subprocess.run kills the child process when the timeout expires
+                p = subprocess.run(
+                    cmd_args,
+                    stdout=subprocess.PIPE,
+                    stderr=subprocess.PIPE,
+                    text=True,
+                    timeout=5,
+                )
+                if p.stderr:
+                    raise Exception(
+                        f"Failed to execute command: {p.args}\n Return code: {p.returncode}\n Error: {p.stderr}"
+                    )
+                if "role:master" in p.stdout or "master_link_status:up" in p.stdout:
+                    logging.debug(f"Server {server} is ready!")
+                    break
+                else:
+                    retries -= 1
+                    time.sleep(0.5)
+                    continue
+            except subprocess.TimeoutExpired:
+                time.sleep(0.5)
+                retries -= 1
+        # The counter only goes negative when no attempt reached `break`
+        if retries < 0:
+            raise Exception(
+                f"Timeout exceeded trying to wait for replica {server.host}:{server.port} to finish syncing"
+            )
+
+
 def wait_for_server(
     server: RedisServer,
     cluster_folder: str,
@@ -430,7 +547,7 @@ def wait_for_server(
         try:
             output, err = p.communicate(timeout=1)
             if output.strip() == "PONG":
-                logging.debug(f"Server {server} is ready!")
+                logging.debug(f"Server {server} is up!")
                 return True
             if p.returncode != 0:
                 logging.debug(
@@ -814,7 +931,9 @@ def main():
             cluster_folder,
             args.tls,
         )
-        create_cluster(servers, args.replica_count, cluster_folder, args.tls)
+        create_cluster(
+            servers, args.shard_count, args.replica_count, cluster_folder, args.tls
+        )
         servers_str = ",".join(str(server) for server in servers)
         toc = time.perf_counter()
         logging.info(f"Created cluster in {toc - tic:0.4f} seconds")