Commit

Merge branch 'main' of github.com:jacobtomlinson/dask-databricks into main
skirui-source committed Nov 8, 2023
2 parents a356add + 8af31a1 commit da34b30
Showing 10 changed files with 187 additions and 74 deletions.
27 changes: 27 additions & 0 deletions .github/workflows/test.yaml
@@ -0,0 +1,27 @@
name: "Test"
on:
pull_request:
push:

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
test:
runs-on: ubuntu-latest
timeout-minutes: 45
strategy:
fail-fast: false
matrix:
python-version: ["3.9", "3.10", "3.11"]

steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Install hatch
run: pipx install hatch
- name: Run tests
run: hatch run test:run
2 changes: 2 additions & 0 deletions .gitignore
@@ -158,3 +158,5 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

+_version.py
18 changes: 18 additions & 0 deletions .pre-commit-config.yaml
@@ -0,0 +1,18 @@
# NOTE: autoupdate does not pick up flake8-bugbear since it is a transitive
# dependency. Make sure to update flake8-bugbear manually on a regular basis.
repos:
  - repo: https://github.com/psf/black
    rev: 23.10.1
    hooks:
      - id: black
        language_version: python3
        exclude: versioneer.py
        args:
          - --target-version=py39
  - repo: https://github.com/astral-sh/ruff-pre-commit
    # Ruff version.
    rev: "v0.1.3"
    hooks:
      - id: ruff
        language_version: python3
        args: [--fix, --exit-non-zero-on-fix]
21 changes: 16 additions & 5 deletions README.md
@@ -17,14 +17,12 @@ To launch a Dask cluster on Databricks you need to create an [init script](https
dask databricks run
```

-Then from your Databricks Notebook you can use the `DatabricksCluster` class to quickly connect a Dask `Client` to the scheduler running on the Spark Driver Node.
+Then from your Databricks Notebook you can quickly connect a Dask `Client` to the scheduler running on the Spark Driver Node.

```python
-from dask.distributed import Client
-from dask_databricks import DatabricksCluster
+import dask_databricks

-cluster = DatabricksCluster()
-client = Client(cluster)
+client = dask_databricks.get_client()
```

Now you can submit work from your notebook to the multi-node Dask cluster.
@@ -36,3 +34,16 @@ def inc(x):
x = client.submit(inc, 10)
x.result()
```

### Dashboard

You can access the [Dask dashboard](https://docs.dask.org/en/latest/dashboard.html) via the Databricks driver-node proxy. The link can be found in the `Client` or `DatabricksCluster` repr, or programmatically via `client.dashboard_link`.

```python
>>> print(client.dashboard_link)
https://dbc-dp-xxxx.cloud.databricks.com/driver-proxy/o/xxxx/xx-xxx-xxxx/8087/status
```

![](https://user-images.githubusercontent.com/1610850/281442274-450d41c6-2eb6-42a1-8de6-c4a1a1b84193.png)

![](https://user-images.githubusercontent.com/1610850/281441285-9b84d5f1-d58a-45dc-9354-7385e1599d1f.png)
4 changes: 0 additions & 4 deletions dask_databricks/__about__.py

This file was deleted.

20 changes: 19 additions & 1 deletion dask_databricks/__init__.py
@@ -2,4 +2,22 @@
#
# SPDX-License-Identifier: BSD-3

-from .databrickscluster import DatabricksCluster
+from .databrickscluster import DatabricksCluster, get_client  # noqa

+# Define the variable '__version__':
+try:
+    # If setuptools_scm is installed (e.g. in a development environment with
+    # an editable install), then use it to determine the version dynamically.
+    from setuptools_scm import get_version
+
+    # This will fail with LookupError if the package is not installed in
+    # editable mode or if Git is not installed.
+    __version__ = get_version(root="..", relative_to=__file__)
+except (ImportError, LookupError):
+    # As a fallback, use the version that is hard-coded in the file.
+    try:
+        from dask_databricks._version import __version__  # noqa: F401
+    except ModuleNotFoundError:
+        # The user is probably trying to run this without having installed
+        # the package, so complain.
+        raise RuntimeError("dask-databricks is not correctly installed. " "Please install it with pip.")
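As a quick illustration of the fallback logic above (not part of the diff): whichever branch succeeds, the package ends up exposing a `__version__` attribute.

```python
import dask_databricks

# Resolved dynamically via setuptools_scm in an editable install, or from the
# generated _version.py in a built package; the printed value is illustrative.
print(dask_databricks.__version__)
```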
21 changes: 12 additions & 9 deletions dask_databricks/cli.py
@@ -1,23 +1,24 @@
-import click
import logging
import os
import socket
import subprocess
import sys
import time

+import click
from rich.logging import RichHandler


def get_logger():
-    logging.basicConfig(
-        level="INFO", format="%(message)s", datefmt="[%X]", handlers=[RichHandler()]
-    )
+    logging.basicConfig(level="INFO", format="%(message)s", datefmt="[%X]", handlers=[RichHandler()])
    return logging.getLogger("dask_databricks")


@click.group(name="databricks")
def main():
    """Tools to launch Dask on Databricks."""


@main.command()
@click.option('--worker-command', help='Custom worker command')
@click.option('--worker-args', help='Additional worker arguments as a single string')
@@ -27,17 +28,19 @@ def run(worker_command, worker_args):

log.info("Setting up Dask on a Databricks cluster.")

DB_IS_DRIVER = os.getenv('DB_IS_DRIVER')
DB_DRIVER_IP = os.getenv('DB_DRIVER_IP')
DB_IS_DRIVER = os.getenv("DB_IS_DRIVER")
DB_DRIVER_IP = os.getenv("DB_DRIVER_IP")

if DB_DRIVER_IP is None or DB_IS_DRIVER is None:
log.error("Unable to find expected environment variables DB_IS_DRIVER and DB_DRIVER_IP. "
"Are you running this command on a Databricks multi-node cluster?")
log.error(
"Unable to find expected environment variables DB_IS_DRIVER and DB_DRIVER_IP. "
"Are you running this command on a Databricks multi-node cluster?"
)
sys.exit(1)

if DB_IS_DRIVER == "TRUE":
log.info("This node is the Dask scheduler.")
subprocess.Popen(["dask", "scheduler"])
subprocess.Popen(["dask", "scheduler", "--dashboard-address", ":8787,:8087"])
else:
log.info("This node is a Dask worker.")
log.info(f"Connecting to Dask scheduler at {DB_DRIVER_IP}:8786")
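The remainder of this hunk (the worker branch) is collapsed in this view. For orientation only, here is a hedged sketch of its general shape — starting a `dask worker` pointed at the scheduler on the driver node. Every detail below is an assumption rather than the project's code; the real branch also honors the `--worker-command` and `--worker-args` options declared above.

```python
import os
import subprocess

# Hypothetical sketch of the collapsed worker branch: launch a Dask worker
# that connects to the scheduler the driver node runs on port 8786.
DB_DRIVER_IP = os.getenv("DB_DRIVER_IP")
subprocess.Popen(["dask", "worker", f"tcp://{DB_DRIVER_IP}:8786"])
```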
39 changes: 29 additions & 10 deletions dask_databricks/databrickscluster.py
@@ -1,25 +1,34 @@
import os
import uuid
+from typing import Optional

-from distributed.deploy.cluster import Cluster
from distributed.core import rpc
-from typing import Optional
+from distributed.deploy.cluster import Cluster
from tornado.ioloop import IOLoop

-# Databricks Notebooks injects the `spark` session variable
-if 'spark' not in globals():
+# Databricks Notebooks injects the `spark` session variable but we need to create it ourselves
+try:
+    from pyspark.sql import SparkSession
+
+    spark = SparkSession.getActiveSession()
+except ImportError:
    spark = None


class DatabricksCluster(Cluster):
    """Connect to a Dask cluster deployed via databricks."""

-    def __init__(self,
+    def __init__(
+        self,
        loop: Optional[IOLoop] = None,
-        asynchronous: bool = False,):
+        asynchronous: bool = False,
+    ):
        self.spark_local_ip = os.getenv("SPARK_LOCAL_IP")
        if self.spark_local_ip is None:
-            raise KeyError("Unable to find expected environment variable SPARK_LOCAL_IP. "
-                           "Are you running this on a Databricks driver node?")
+            raise KeyError(
+                "Unable to find expected environment variable SPARK_LOCAL_IP. "
+                "Are you running this on a Databricks driver node?"
+            )
        try:
            name = spark.conf.get("spark.databricks.clusterUsageTags.clusterId")
        except AttributeError:
@@ -30,7 +39,17 @@ def __init__(self,
        self._loop_runner.start()
        self.sync(self._start)

-
    async def _start(self):
-        self.scheduler_comm = rpc(f'{self.spark_local_ip}:8786')
+        self.scheduler_comm = rpc(f"{self.spark_local_ip}:8786")
        await super()._start()
+
+    @property
+    def dashboard_link(self):
+        cluster_id = spark.conf.get("spark.databricks.clusterUsageTags.clusterId")
+        org_id = spark.conf.get("spark.databricks.clusterUsageTags.orgId")
+        return f"https://dbc-dp-{org_id}.cloud.databricks.com/driver-proxy/o/{org_id}/{cluster_id}/8087/status"
+
+
+def get_client():
+    """Get a Dask client connected to a Databricks cluster."""
+    return DatabricksCluster().get_client()
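Putting the new pieces together, notebook usage matches the README example above; a minimal sketch, assuming the init script has already started the scheduler and workers:

```python
import dask_databricks

# Connect a Client to the scheduler running on the Spark driver node.
client = dask_databricks.get_client()

# The dashboard is also served on port 8087 (see the cli.py change above),
# which is the port the Databricks driver proxy exposes in dashboard_link.
print(client.dashboard_link)
```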
13 changes: 12 additions & 1 deletion dask_databricks/tests/test_databricks.py
@@ -4,14 +4,15 @@
from dask.distributed import Client
from distributed.deploy import Cluster, LocalCluster

-from dask_databricks import DatabricksCluster
+from dask_databricks import DatabricksCluster, get_client
+

@pytest.fixture(scope="session")
def dask_cluster():
    """Start a LocalCluster to simulate the cluster that would be started on Databricks."""
    return LocalCluster(scheduler_port=8786)


@pytest.fixture
def remove_spark_local_ip():
    original_spark_local_ip = os.getenv("SPARK_LOCAL_IP")
@@ -21,6 +22,7 @@ def remove_spark_local_ip():
    if original_spark_local_ip:
        os.environ["SPARK_LOCAL_IP"] = original_spark_local_ip

+
@pytest.fixture
def set_spark_local_ip():
    original_spark_local_ip = os.getenv("SPARK_LOCAL_IP")
@@ -31,10 +33,12 @@ def set_spark_local_ip():
    else:
        del os.environ["SPARK_LOCAL_IP"]

+
def test_databricks_cluster_raises_key_error_when_initialised_outside_of_databricks(remove_spark_local_ip):
    with pytest.raises(KeyError):
        DatabricksCluster()

+
def test_databricks_cluster_create(set_spark_local_ip, dask_cluster):
    cluster = DatabricksCluster()
    assert isinstance(cluster, Cluster)
@@ -45,3 +49,10 @@ def test_databricks_cluster_create_client(set_spark_local_ip, dask_cluster):
    client = Client(cluster)
    assert isinstance(client, Client)
    assert client.submit(sum, (10, 1)).result() == 11
+
+
+def test_get_client(set_spark_local_ip, dask_cluster):
+    client = get_client()
+    assert isinstance(client, Client)
+    assert isinstance(client.cluster, DatabricksCluster)
+    assert client.submit(sum, (10, 1)).result() == 11
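These tests stand in a `LocalCluster` on port 8786 for the real Databricks cluster. To run only the new test, something like the following should work (a sketch, assuming pytest is installed and this is run from the repository root):

```python
import pytest

# Equivalent to `pytest dask_databricks/tests/test_databricks.py::test_get_client -v`
# from the command line.
pytest.main(["dask_databricks/tests/test_databricks.py::test_get_client", "-v"])
```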