From a356addd68afaac5afcecca21b828610417eb9bb Mon Sep 17 00:00:00 2001
From: skirui-source
Date: Tue, 7 Nov 2023 18:11:28 -0800
Subject: [PATCH 1/9] add alternative worker commands, config options

---
 dask_databricks/cli.py | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/dask_databricks/cli.py b/dask_databricks/cli.py
index 7849d06..d5548b1 100644
--- a/dask_databricks/cli.py
+++ b/dask_databricks/cli.py
@@ -19,7 +19,9 @@ def main():
     """Tools to launch Dask on Databricks."""
 
 
 @main.command()
-def run():
+@click.option('--worker-command', help='Custom worker command')
+@click.option('--worker-args', help='Additional worker arguments as a single string')
+def run(worker_command, worker_args):
     """Run Dask processes on a Databricks cluster."""
     log = get_logger()
@@ -48,7 +50,15 @@ def run():
             except ConnectionRefusedError:
                 log.info("Scheduler not available yet. Waiting...")
                 time.sleep(1)
-        subprocess.Popen(["dask", "worker", f"tcp://{DB_DRIVER_IP}:8786"])
+
+        # Construct the worker command
+        worker_command = worker_command.split() if worker_command else ["dask", "worker"]
+        if worker_args:
+            worker_command.extend(worker_args.split())
+
+        worker_command.append(f"tcp://{DB_DRIVER_IP}:8786")
+
+        subprocess.Popen(worker_command)
 
 
 if __name__ == "__main__":

From 6efb91ae80f0a5ac8a651d2457524265407083ee Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Wed, 8 Nov 2023 21:55:48 +0000
Subject: [PATCH 2/9] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 dask_databricks/cli.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dask_databricks/cli.py b/dask_databricks/cli.py
index e94b830..f628b1f 100644
--- a/dask_databricks/cli.py
+++ b/dask_databricks/cli.py
@@ -58,7 +58,7 @@ def run(worker_command, worker_args):
         worker_command = worker_command.split() if worker_command else ["dask", "worker"]
         if worker_args:
             worker_command.extend(worker_args.split())
-
+
         worker_command.append(f"tcp://{DB_DRIVER_IP}:8786")
 
         subprocess.Popen(worker_command)

From 6bda422d282a1bbe957aaf5d52dc3a3d2ba71fc8 Mon Sep 17 00:00:00 2001
From: skirui-source
Date: Wed, 8 Nov 2023 18:44:10 -0800
Subject: [PATCH 3/9] add support for Json worker args

---
 dask_databricks/cli.py | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/dask_databricks/cli.py b/dask_databricks/cli.py
index e94b830..c1d5dbd 100644
--- a/dask_databricks/cli.py
+++ b/dask_databricks/cli.py
@@ -1,3 +1,4 @@
+import json
 import logging
 import os
 import socket
@@ -21,7 +22,7 @@ def main():
 
 @main.command()
 @click.option('--worker-command', help='Custom worker command')
-@click.option('--worker-args', help='Additional worker arguments as a single string')
+@click.option('--worker-args', help='Additional worker arguments')
 def run(worker_command, worker_args):
     """Run Dask processes on a Databricks cluster."""
     log = get_logger()
@@ -54,11 +55,19 @@ def run(worker_command, worker_args):
                 log.info("Scheduler not available yet. Waiting...")
                 time.sleep(1)
 
+        if worker_args:
+            try:
+                # Try to decode the JSON-encoded worker_args
+                worker_args_list = json.loads(worker_args)
+                if not isinstance(worker_args_list, list):
+                    raise ValueError("The JSON-encoded worker_args must be a list.")
+            except json.JSONDecodeError:
+                # If decoding as JSON fails, split worker_args by spaces
+                worker_args_list = worker_args.split()
+
         # Construct the worker command
         worker_command = worker_command.split() if worker_command else ["dask", "worker"]
-        if worker_args:
-            worker_command.extend(worker_args.split())
-
+        worker_command.extend(worker_args_list)
         worker_command.append(f"tcp://{DB_DRIVER_IP}:8786")
 
         subprocess.Popen(worker_command)
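A note on the `--worker-args` handling introduced in [PATCH 3/9]: it accepts either a JSON-encoded list or a plain space-separated string. The following self-contained sketch mirrors that parsing behaviour; the helper name `parse_worker_args` is illustrative and is not part of the patch.

```python
import json


def parse_worker_args(worker_args: str) -> list:
    """Mirror the patch's parsing: try a JSON-encoded list first, fall back to whitespace splitting."""
    try:
        args = json.loads(worker_args)
        if not isinstance(args, list):
            raise ValueError("The JSON-encoded worker_args must be a list.")
        return args
    except json.JSONDecodeError:
        return worker_args.split()


# Both calls produce ["--nthreads", "8", "--memory-limit", "auto"]:
print(parse_worker_args('["--nthreads", "8", "--memory-limit", "auto"]'))
print(parse_worker_args("--nthreads 8 --memory-limit auto"))
```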
Waiting...") time.sleep(1) + if worker_args: + try: + # Try to decode the JSON-encoded worker_args + worker_args_list = json.loads(worker_args) + if not isinstance(worker_args_list, list): + raise ValueError("The JSON-encoded worker_args must be a list.") + except json.JSONDecodeError: + # If decoding as JSON fails, split worker_args by spaces + worker_args_list = worker_args.split() + # Construct the worker command worker_command = worker_command.split() if worker_command else ["dask", "worker"] - if worker_args: - worker_command.extend(worker_args.split()) - + worker_command.extend(worker_args_list) worker_command.append(f"tcp://{DB_DRIVER_IP}:8786") subprocess.Popen(worker_command) From 37938830db4ff370dba5f4b363a6fb6a95f46e81 Mon Sep 17 00:00:00 2001 From: skirui-source Date: Wed, 8 Nov 2023 20:00:27 -0800 Subject: [PATCH 4/9] polls the scheduler for health check --- dask_databricks/cli.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/dask_databricks/cli.py b/dask_databricks/cli.py index c1d5dbd..5f0c88b 100644 --- a/dask_databricks/cli.py +++ b/dask_databricks/cli.py @@ -41,14 +41,20 @@ def run(worker_command, worker_args): if DB_IS_DRIVER == "TRUE": log.info("This node is the Dask scheduler.") - subprocess.Popen(["dask", "scheduler", "--dashboard-address", ":8787,:8087"]) + scheduler_process = subprocess.Popen(["dask", "scheduler", "--dashboard-address", ":8787,:8087"]) + time.sleep(5) # Sleep for 5 seconds to give the scheduler time to start + if scheduler_process.poll() is not None: + log.error("Scheduler process has exited.") + sys.exit(1) else: + # Specify the same port for all workers + worker_port = 8786 log.info("This node is a Dask worker.") - log.info(f"Connecting to Dask scheduler at {DB_DRIVER_IP}:8786") + log.info(f"Connecting to Dask scheduler at {DB_DRIVER_IP}:{worker_port}") while True: try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect((DB_DRIVER_IP, 8786)) + sock.connect((DB_DRIVER_IP, worker_port)) sock.close() break except ConnectionRefusedError: @@ -63,12 +69,13 @@ def run(worker_command, worker_args): raise ValueError("The JSON-encoded worker_args must be a list.") except json.JSONDecodeError: # If decoding as JSON fails, split worker_args by spaces + # TODO: Are there other cases to account for? 
From 458240d6add64e5408c014529d276ed301dfbc6a Mon Sep 17 00:00:00 2001
From: skirui-source
Date: Wed, 8 Nov 2023 23:50:31 -0800
Subject: [PATCH 5/9] extra health checks for dask workers

---
 dask_databricks/cli.py | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git a/dask_databricks/cli.py b/dask_databricks/cli.py
index 5f0c88b..584eac4 100644
--- a/dask_databricks/cli.py
+++ b/dask_databricks/cli.py
@@ -42,7 +42,7 @@ def run(worker_command, worker_args):
     if DB_IS_DRIVER == "TRUE":
         log.info("This node is the Dask scheduler.")
         scheduler_process = subprocess.Popen(["dask", "scheduler", "--dashboard-address", ":8787,:8087"])
-        time.sleep(5)  # Sleep for 5 seconds to give the scheduler time to start
+        time.sleep(5)  # give the scheduler time to start
         if scheduler_process.poll() is not None:
             log.error("Scheduler process has exited.")
             sys.exit(1)
@@ -79,6 +79,24 @@ def run(worker_command, worker_args):
 
         subprocess.Popen(worker_command)
+
+        # # Start the worker subprocess and capture its output
+        # log.info("Starting the worker...")
+        # worker_process = subprocess.Popen(worker_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
+
+        # # Watch the worker's stdout for "Start worker at:" log line
+        # while True:
+        #     line = worker_process.stdout.readline()
+        #     if not line:
+        #         break  # If there's no more output, the worker process has finished
+        #     if "Start worker at:" in line:
+        #         # Log the message
+        #         log.info(line.strip())
+        #         break  # Exit the loop after capturing the message
+
+        # return_code = worker_process.poll()
+        # if return_code is not None:
+        #     log.info(f"Worker process has exited with return code {return_code}")
 
 
 if __name__ == "__main__":
     main()

From 3f6065bb2d24c79b15e28ff873404110cd6cc2e7 Mon Sep 17 00:00:00 2001
From: Jacob Tomlinson
Date: Thu, 9 Nov 2023 14:07:48 +0000
Subject: [PATCH 6/9] Handle dask not being on the path

---
 dask_databricks/cli.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/dask_databricks/cli.py b/dask_databricks/cli.py
index 584eac4..4323a03 100644
--- a/dask_databricks/cli.py
+++ b/dask_databricks/cli.py
@@ -41,7 +41,9 @@ def run(worker_command, worker_args):
     if DB_IS_DRIVER == "TRUE":
         log.info("This node is the Dask scheduler.")
-        scheduler_process = subprocess.Popen(["dask", "scheduler", "--dashboard-address", ":8787,:8087"])
+        scheduler_process = subprocess.Popen(
+            [sys.executable, "-m", "dask", "scheduler", "--dashboard-address", ":8787,:8087"]
+        )
         time.sleep(5)  # give the scheduler time to start
         if scheduler_process.poll() is not None:
             log.error("Scheduler process has exited.")
             sys.exit(1)
@@ -79,7 +79,7 @@ def run(worker_command, worker_args):
         worker_command.extend(worker_args_list)
         worker_command.append(f"tcp://{DB_DRIVER_IP}:{worker_port}")
 
-        subprocess.Popen(worker_command)
+        subprocess.Popen([sys.executable, "-m"] + worker_command)
 
         # # Start the worker subprocess and capture its output
         # log.info("Starting the worker...")

From 93755ac95aa0d99f39057a534fd000881bc0f2e6 Mon Sep 17 00:00:00 2001
From: Jacob Tomlinson
Date: Thu, 9 Nov 2023 14:32:02 +0000
Subject: [PATCH 7/9] Revert sys.executable change

---
 dask_databricks/cli.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/dask_databricks/cli.py b/dask_databricks/cli.py
index 4323a03..584eac4 100644
--- a/dask_databricks/cli.py
+++ b/dask_databricks/cli.py
@@ -41,9 +41,7 @@ def run(worker_command, worker_args):
     if DB_IS_DRIVER == "TRUE":
         log.info("This node is the Dask scheduler.")
-        scheduler_process = subprocess.Popen(
-            [sys.executable, "-m", "dask", "scheduler", "--dashboard-address", ":8787,:8087"]
-        )
+        scheduler_process = subprocess.Popen(["dask", "scheduler", "--dashboard-address", ":8787,:8087"])
         time.sleep(5)  # give the scheduler time to start
         if scheduler_process.poll() is not None:
             log.error("Scheduler process has exited.")
             sys.exit(1)
@@ -79,7 +77,7 @@ def run(worker_command, worker_args):
         worker_command.extend(worker_args_list)
         worker_command.append(f"tcp://{DB_DRIVER_IP}:{worker_port}")
 
-        subprocess.Popen([sys.executable, "-m"] + worker_command)
+        subprocess.Popen(worker_command)
 
         # # Start the worker subprocess and capture its output
         # log.info("Starting the worker...")
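[PATCH 5/9] above also sketches, but leaves commented out, a stricter worker health check that watches the worker's output for the `Start worker at:` line. Purely as an illustration of what that commented block is aiming for, a cleaned-up version could look like this (the function name and the `marker` default are assumptions, not project API):

```python
import subprocess


def wait_for_startup_line(cmd, marker="Start worker at:", log=print):
    """Start `cmd`, then block until its combined output contains `marker` or the process exits."""
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    for line in process.stdout:
        if marker in line:
            log(line.strip())
            break
    else:
        # stdout closed without the marker appearing, so the process has already exited
        log(f"Process exited early with return code {process.wait()}")
    return process
```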
From f849a4c29ce6e18ae20334f4708fa6fa88c11a79 Mon Sep 17 00:00:00 2001
From: Jacob Tomlinson
Date: Thu, 9 Nov 2023 15:04:53 +0000
Subject: [PATCH 8/9] Fix when worker args are not specified

---
 dask_databricks/cli.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/dask_databricks/cli.py b/dask_databricks/cli.py
index 584eac4..745e8fa 100644
--- a/dask_databricks/cli.py
+++ b/dask_databricks/cli.py
@@ -61,6 +61,8 @@ def run(worker_command, worker_args):
             except ConnectionRefusedError:
                 log.info("Scheduler not available yet. Waiting...")
                 time.sleep(1)
 
+        # Construct the worker command
+        worker_command = worker_command.split() if worker_command else ["dask", "worker"]
         if worker_args:
             try:
                 # Try to decode the JSON-encoded worker_args
                 worker_args_list = json.loads(worker_args)
                 if not isinstance(worker_args_list, list):
                     raise ValueError("The JSON-encoded worker_args must be a list.")
             except json.JSONDecodeError:
                 # If decoding as JSON fails, split worker_args by spaces
                 # TODO: Are there other cases to account for?
                 worker_args_list = worker_args.split()
 
-        # Construct the worker command
-        worker_command = worker_command.split() if worker_command else ["dask", "worker"]
-        worker_command.extend(worker_args_list)
+            worker_command.extend(worker_args_list)
         worker_command.append(f"tcp://{DB_DRIVER_IP}:{worker_port}")
 
         subprocess.Popen(worker_command)
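[PATCH 8/9] builds the base worker command before the optional-args block; previously, omitting `--worker-args` left `worker_args_list` undefined, so the unconditional `extend` call failed. A hypothetical extraction of the fixed flow (the function name and the example IP address are illustrative only):

```python
import json


def build_worker_command(driver_ip, worker_command=None, worker_args=None, worker_port=8786):
    """Build the worker argv the way the patched code does: base command first, optional args after."""
    command = worker_command.split() if worker_command else ["dask", "worker"]
    if worker_args:
        try:
            args = json.loads(worker_args)
            if not isinstance(args, list):
                raise ValueError("The JSON-encoded worker_args must be a list.")
        except json.JSONDecodeError:
            args = worker_args.split()
        command.extend(args)
    command.append(f"tcp://{driver_ip}:{worker_port}")
    return command


# "10.0.0.1" is just an example driver address.
assert build_worker_command("10.0.0.1") == ["dask", "worker", "tcp://10.0.0.1:8786"]
assert build_worker_command("10.0.0.1", worker_args="--nthreads 4") == [
    "dask", "worker", "--nthreads", "4", "tcp://10.0.0.1:8786"
]
```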
From 5301b63f2f17b46cfaf156406f1871084bcfc65f Mon Sep 17 00:00:00 2001
From: Jacob Tomlinson
Date: Thu, 9 Nov 2023 16:11:30 +0000
Subject: [PATCH 9/9] Clean up and add --cuda flag

---
 dask_databricks/cli.py | 44 ++++++++++++++++++++----------------------
 1 file changed, 21 insertions(+), 23 deletions(-)

diff --git a/dask_databricks/cli.py b/dask_databricks/cli.py
index 745e8fa..91d317a 100644
--- a/dask_databricks/cli.py
+++ b/dask_databricks/cli.py
@@ -23,7 +23,14 @@ def main():
 
 @main.command()
 @click.option('--worker-command', help='Custom worker command')
 @click.option('--worker-args', help='Additional worker arguments')
-def run(worker_command, worker_args):
+@click.option(
+    "--cuda",
+    is_flag=True,
+    show_default=True,
+    default=False,
+    help="Use `dask cuda worker` from the dask-cuda package when starting workers.",
+)
+def run(worker_command, worker_args, cuda):
     """Run Dask processes on a Databricks cluster."""
     log = get_logger()
@@ -44,7 +51,7 @@ def run(worker_command, worker_args):
         scheduler_process = subprocess.Popen(["dask", "scheduler", "--dashboard-address", ":8787,:8087"])
         time.sleep(5)  # give the scheduler time to start
         if scheduler_process.poll() is not None:
-            log.error("Scheduler process has exited.")
+            log.error("Scheduler process has exited prematurely.")
             sys.exit(1)
     else:
         # Specify the same port for all workers
@@ -62,7 +69,13 @@ def run(worker_command, worker_args):
                 time.sleep(1)
 
         # Construct the worker command
-        worker_command = worker_command.split() if worker_command else ["dask", "worker"]
+        if worker_command:
+            worker_command = worker_command.split()
+        elif cuda:
+            worker_command = ["dask", "cuda", "worker"]
+        else:
+            worker_command = ["dask", "worker"]
+
         if worker_args:
             try:
                 # Try to decode the JSON-encoded worker_args
@@ -71,31 +84,16 @@ def run(worker_command, worker_args):
                     raise ValueError("The JSON-encoded worker_args must be a list.")
             except json.JSONDecodeError:
                 # If decoding as JSON fails, split worker_args by spaces
-                # TODO: Are there other cases to account for?
                 worker_args_list = worker_args.split()
 
             worker_command.extend(worker_args_list)
         worker_command.append(f"tcp://{DB_DRIVER_IP}:{worker_port}")
 
-        subprocess.Popen(worker_command)
-
-        # # Start the worker subprocess and capture its output
-        # log.info("Starting the worker...")
-        # worker_process = subprocess.Popen(worker_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
-
-        # # Watch the worker's stdout for "Start worker at:" log line
-        # while True:
-        #     line = worker_process.stdout.readline()
-        #     if not line:
-        #         break  # If there's no more output, the worker process has finished
-        #     if "Start worker at:" in line:
-        #         # Log the message
-        #         log.info(line.strip())
-        #         break  # Exit the loop after capturing the message
-
-        # return_code = worker_process.poll()
-        # if return_code is not None:
-        #     log.info(f"Worker process has exited with return code {return_code}")
+        worker_process = subprocess.Popen(worker_command)
+        time.sleep(5)  # give the worker time to start
+        if worker_process.poll() is not None:
+            log.error("Worker process has exited prematurely.")
+            sys.exit(1)
 
 
 if __name__ == "__main__":
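With [PATCH 9/9], both the scheduler and the worker are started and then polled once after a short grace period, exiting non-zero if the process has already died. A condensed sketch of that supervision pattern (the helper name and the example address are illustrative; in the patch the logic lives inline in `run()`):

```python
import subprocess
import sys
import time


def start_and_check(cmd, grace_period=5):
    """Start a process and fail fast if it exits within the grace period."""
    process = subprocess.Popen(cmd)
    time.sleep(grace_period)  # give the process time to start
    if process.poll() is not None:
        print(f"{cmd[0]} process has exited prematurely.", file=sys.stderr)
        sys.exit(1)
    return process


# Roughly what the driver and worker nodes end up doing:
# start_and_check(["dask", "scheduler", "--dashboard-address", ":8787,:8087"])
# start_and_check(["dask", "cuda", "worker", "tcp://10.0.0.1:8786"])  # when --cuda is passed
```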