From d9b29b6afa3b596a48b00ef69343a67f2d4a5710 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 8 Nov 2023 11:49:44 +0000 Subject: [PATCH 1/9] Run dashboard on Ray port and set Dashboard link --- dask_databricks/_version.py | 17 +++++++++++++++++ dask_databricks/cli.py | 2 +- dask_databricks/databrickscluster.py | 7 +++++++ 3 files changed, 25 insertions(+), 1 deletion(-) create mode 100644 dask_databricks/_version.py diff --git a/dask_databricks/_version.py b/dask_databricks/_version.py new file mode 100644 index 0000000..2035ce1 --- /dev/null +++ b/dask_databricks/_version.py @@ -0,0 +1,17 @@ +# file generated by setuptools_scm +# don't change, don't track in version control +TYPE_CHECKING = False +if TYPE_CHECKING: + from typing import Tuple, Union + + VERSION_TUPLE = Tuple[Union[int, str], ...] +else: + VERSION_TUPLE = object + +version: str +__version__: str +__version_tuple__: VERSION_TUPLE +version_tuple: VERSION_TUPLE + +__version__ = version = '0.1.dev15+g4d11272.d20231108' +__version_tuple__ = version_tuple = (0, 1, 'dev15', 'g4d11272.d20231108') diff --git a/dask_databricks/cli.py b/dask_databricks/cli.py index 09badd3..4181cdb 100644 --- a/dask_databricks/cli.py +++ b/dask_databricks/cli.py @@ -38,7 +38,7 @@ def run(): if DB_IS_DRIVER == "TRUE": log.info("This node is the Dask scheduler.") - subprocess.Popen(["dask", "scheduler"]) + subprocess.Popen(["dask", "scheduler", "--dashboard-address", ":8265"]) else: log.info("This node is a Dask worker.") log.info(f"Connecting to Dask scheduler at {DB_DRIVER_IP}:8786") diff --git a/dask_databricks/databrickscluster.py b/dask_databricks/databrickscluster.py index 7a83bc5..6502b7f 100644 --- a/dask_databricks/databrickscluster.py +++ b/dask_databricks/databrickscluster.py @@ -39,6 +39,13 @@ async def _start(self): self.scheduler_comm = rpc(f"{self.spark_local_ip}:8786") await super()._start() + def dashboard_link(self): + if spark is None: + raise RuntimeError("Unable to locate spark session. Are you running this on a Databricks driver node?") + cluster_id = spark.conf.get("spark.databricks.clusterUsageTags.clusterId") + org_id = spark.conf.get("spark.databricks.clusterUsageTags.orgId") + return f"https://dbc-dp-{org_id}.cloud.databricks.com/driver-proxy/o/{org_id}/{cluster_id}/8265/status" + def get_client(): """Get a Dask client connected to a Databricks cluster.""" From c6f3825b74efdc1263ee0375befb3e961b117f2e Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 8 Nov 2023 11:50:25 +0000 Subject: [PATCH 2/9] Remove _version.py --- dask_databricks/_version.py | 17 ----------------- 1 file changed, 17 deletions(-) delete mode 100644 dask_databricks/_version.py diff --git a/dask_databricks/_version.py b/dask_databricks/_version.py deleted file mode 100644 index 2035ce1..0000000 --- a/dask_databricks/_version.py +++ /dev/null @@ -1,17 +0,0 @@ -# file generated by setuptools_scm -# don't change, don't track in version control -TYPE_CHECKING = False -if TYPE_CHECKING: - from typing import Tuple, Union - - VERSION_TUPLE = Tuple[Union[int, str], ...] -else: - VERSION_TUPLE = object - -version: str -__version__: str -__version_tuple__: VERSION_TUPLE -version_tuple: VERSION_TUPLE - -__version__ = version = '0.1.dev15+g4d11272.d20231108' -__version_tuple__ = version_tuple = (0, 1, 'dev15', 'g4d11272.d20231108') From 682723cb51bbb72ac2ef2bd2e319c6ed4f34e194 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 8 Nov 2023 11:52:52 +0000 Subject: [PATCH 3/9] Start dashboard on a few ports for experimentation --- dask_databricks/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_databricks/cli.py b/dask_databricks/cli.py index 4181cdb..2d8008f 100644 --- a/dask_databricks/cli.py +++ b/dask_databricks/cli.py @@ -38,7 +38,7 @@ def run(): if DB_IS_DRIVER == "TRUE": log.info("This node is the Dask scheduler.") - subprocess.Popen(["dask", "scheduler", "--dashboard-address", ":8265"]) + subprocess.Popen(["dask", "scheduler", "--dashboard-address", ":8787,:8265,:8087,:8001"]) else: log.info("This node is a Dask worker.") log.info(f"Connecting to Dask scheduler at {DB_DRIVER_IP}:8786") From 59eb2d962d315e6f9ab4ec3c3bb2195328ed684c Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 8 Nov 2023 11:58:11 +0000 Subject: [PATCH 4/9] Make property --- dask_databricks/databrickscluster.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dask_databricks/databrickscluster.py b/dask_databricks/databrickscluster.py index 6502b7f..535673c 100644 --- a/dask_databricks/databrickscluster.py +++ b/dask_databricks/databrickscluster.py @@ -39,6 +39,7 @@ async def _start(self): self.scheduler_comm = rpc(f"{self.spark_local_ip}:8786") await super()._start() + @property def dashboard_link(self): if spark is None: raise RuntimeError("Unable to locate spark session. Are you running this on a Databricks driver node?") From 3cd77509a8b6bb068df27c08a1229676adb0b800 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 8 Nov 2023 12:00:44 +0000 Subject: [PATCH 5/9] Avoid Ray port and just use 8087 --- dask_databricks/cli.py | 2 +- dask_databricks/databrickscluster.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dask_databricks/cli.py b/dask_databricks/cli.py index 2d8008f..0651ca5 100644 --- a/dask_databricks/cli.py +++ b/dask_databricks/cli.py @@ -38,7 +38,7 @@ def run(): if DB_IS_DRIVER == "TRUE": log.info("This node is the Dask scheduler.") - subprocess.Popen(["dask", "scheduler", "--dashboard-address", ":8787,:8265,:8087,:8001"]) + subprocess.Popen(["dask", "scheduler", "--dashboard-address", ":8787,:8087"]) else: log.info("This node is a Dask worker.") log.info(f"Connecting to Dask scheduler at {DB_DRIVER_IP}:8786") diff --git a/dask_databricks/databrickscluster.py b/dask_databricks/databrickscluster.py index 535673c..54b1b03 100644 --- a/dask_databricks/databrickscluster.py +++ b/dask_databricks/databrickscluster.py @@ -45,7 +45,7 @@ def dashboard_link(self): raise RuntimeError("Unable to locate spark session. Are you running this on a Databricks driver node?") cluster_id = spark.conf.get("spark.databricks.clusterUsageTags.clusterId") org_id = spark.conf.get("spark.databricks.clusterUsageTags.orgId") - return f"https://dbc-dp-{org_id}.cloud.databricks.com/driver-proxy/o/{org_id}/{cluster_id}/8265/status" + return f"https://dbc-dp-{org_id}.cloud.databricks.com/driver-proxy/o/{org_id}/{cluster_id}/8087/status" def get_client(): From 591361bd7a8d1b6a01d4716bf14439b4ef10d3e2 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 8 Nov 2023 12:41:08 +0000 Subject: [PATCH 6/9] Find spark session --- dask_databricks/databrickscluster.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/dask_databricks/databrickscluster.py b/dask_databricks/databrickscluster.py index 54b1b03..96be4f8 100644 --- a/dask_databricks/databrickscluster.py +++ b/dask_databricks/databrickscluster.py @@ -6,8 +6,12 @@ from distributed.deploy.cluster import Cluster from tornado.ioloop import IOLoop -# Databricks Notebooks injects the `spark` session variable -if "spark" not in globals(): +# Databricks Notebooks injects the `spark` session variable but we need to create it ourselves +try: + from pyspark.sql import SparkSession + + spark = SparkSession.getActiveSession() +except ImportError: spark = None @@ -41,8 +45,6 @@ async def _start(self): @property def dashboard_link(self): - if spark is None: - raise RuntimeError("Unable to locate spark session. Are you running this on a Databricks driver node?") cluster_id = spark.conf.get("spark.databricks.clusterUsageTags.clusterId") org_id = spark.conf.get("spark.databricks.clusterUsageTags.orgId") return f"https://dbc-dp-{org_id}.cloud.databricks.com/driver-proxy/o/{org_id}/{cluster_id}/8087/status" From 7ccf56597b5a20c4a16cbf6087bfdaa25b52bde2 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 8 Nov 2023 12:54:09 +0000 Subject: [PATCH 7/9] Remove / from dashboard prefix --- dask_databricks/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_databricks/cli.py b/dask_databricks/cli.py index 0651ca5..f59bb56 100644 --- a/dask_databricks/cli.py +++ b/dask_databricks/cli.py @@ -38,7 +38,7 @@ def run(): if DB_IS_DRIVER == "TRUE": log.info("This node is the Dask scheduler.") - subprocess.Popen(["dask", "scheduler", "--dashboard-address", ":8787,:8087"]) + subprocess.Popen(["dask", "scheduler", "--dashboard-prefix", "", "--dashboard-address", ":8787,:8087"]) else: log.info("This node is a Dask worker.") log.info(f"Connecting to Dask scheduler at {DB_DRIVER_IP}:8786") From 9dc1aff7894f7a738d0b40fad9d72f40c72e0c43 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 8 Nov 2023 13:52:01 +0000 Subject: [PATCH 8/9] Trying to set a null prefix a little differently --- dask_databricks/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_databricks/cli.py b/dask_databricks/cli.py index f59bb56..7d9d902 100644 --- a/dask_databricks/cli.py +++ b/dask_databricks/cli.py @@ -38,7 +38,7 @@ def run(): if DB_IS_DRIVER == "TRUE": log.info("This node is the Dask scheduler.") - subprocess.Popen(["dask", "scheduler", "--dashboard-prefix", "", "--dashboard-address", ":8787,:8087"]) + subprocess.Popen(["bash", "-c", "dask scheduler --dashboard-prefix '' --dashboard-address :8787,:8087"]) else: log.info("This node is a Dask worker.") log.info(f"Connecting to Dask scheduler at {DB_DRIVER_IP}:8786") From 4984d90ba4957b06c05890ed1ad2523a8994351b Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 8 Nov 2023 14:27:28 +0000 Subject: [PATCH 9/9] Revert prefix changes --- dask_databricks/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_databricks/cli.py b/dask_databricks/cli.py index 7d9d902..0651ca5 100644 --- a/dask_databricks/cli.py +++ b/dask_databricks/cli.py @@ -38,7 +38,7 @@ def run(): if DB_IS_DRIVER == "TRUE": log.info("This node is the Dask scheduler.") - subprocess.Popen(["bash", "-c", "dask scheduler --dashboard-prefix '' --dashboard-address :8787,:8087"]) + subprocess.Popen(["dask", "scheduler", "--dashboard-address", ":8787,:8087"]) else: log.info("This node is a Dask worker.") log.info(f"Connecting to Dask scheduler at {DB_DRIVER_IP}:8786")