Skip to content

Commit

Permalink
feature/decouple adapters from core (#972)
Browse files Browse the repository at this point in the history
* Add Github action for integration test

* Update tox

* Fetch spark from https link

* Use Spark version 3.1.2

* Seperate running Spark session and thrift

* Use Spark 3.1.2 and Hadoop 3.2

* Reset tox.ini

* Remove base pythons in tox.ini

* Fix reference to Docker compose file

* Remove timeout

* Remove artifact steps

* Bump Spark and Hadoop versions

* Reset Spark and Hadoop version

* Update comment

* Add changie

* add databricks and PR execution protections

* use single quotes

* remove `_target` suffix

* add comment to test

* specify container user as root

* formatting

* remove python setup for pre-existing container

* download simba

* fix curl call

* fix curl call

* fix curl call

* fix curl call

* fix curl call

* fix curl call

* fix db test naming

* confirm ODBC driver installed

* add odbc driver env var

* add odbc driver env var

* specify platform

* check odbc driver integrity

* add dbt user env var

* add dbt user env var

* fix host_name env var

* try removing architecture arg

* swap back to pull_request_target

* try running on host instead of container

* Update .github/workflows/integration.yml

Co-authored-by: Emily Rockman <emily.rockman@dbtlabs.com>

* try running odbcinst -j

* remove bash

* add sudo

* add sudo

* update odbc.ini

* install libsasl2-modules-gssapi-mit

* install libsasl2-modules-gssapi-mit

* set -e on odbc install

* set -e on odbc install

* set -e on odbc install

* sudo echo odbc.inst

* remove postgres components

* remove release related items

* remove irrelevant output

* move long bash script into its own file

* update integration.yml to align with other adapters

* revert name change

* revert name change

* combine databricks and spark tests

* combine databricks and spark tests

* Add dagger

* remove platform

* add dagger setup

* add dagger setup

* set env vars

* install requirements

* install requirements

* add DEFAULT_ENV_VARS and test_path arg

* remove circle ci

* formatting

* update changie

* Update .changes/unreleased/Under the Hood-20230929-161218.yaml

Co-authored-by: Emily Rockman <emily.rockman@dbtlabs.com>

* formatting fixes and simplify env_var handling

* remove tox, update CONTRIBUTING.md and cleanup GHA workflows

* remove tox, update CONTRIBUTING.md and cleanup GHA workflows

* install test reqs in main.yml

* install test reqs in main.yml

* formatting

* remove tox from dev-requirements.txt and Makefile

* clarify spark crt instantiation

* add comments on python-version

* initial migration changes

* unpin

* implement core / adapters decoupling

* fix list_relations

* fix typing and exception imports

* fix typing and exception imports

* add changie

* replace dbt.common with dbt_common

* update setup.py

* add dbt-adapters

* update setup.py

* fix credentials import

* fix dev-requirements.txt

* dagger improvements to caching and installing package under test

* update requirements

* add cluster start fixture

* update conftest.py

* re-order dagger setup to reduce cache invalidation

* renove dbt-core version dependency version check

---------

Co-authored-by: Cor Zuurmond <jczuurmond@protonmail.com>
Co-authored-by: Florian Eiden <florian.eiden@fleid.fr>
Co-authored-by: Emily Rockman <emily.rockman@dbtlabs.com>
Co-authored-by: Mike Alfare <13974384+mikealfare@users.noreply.github.com>
Co-authored-by: Mike Alfare <mike.alfare@dbtlabs.com>
  • Loading branch information
6 people authored Jan 25, 2024
1 parent e97918b commit 5d90ff9
Show file tree
Hide file tree
Showing 16 changed files with 188 additions and 132 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20240111-114806.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Update import paths and list_relations to support decoupling adapters/core
time: 2024-01-11T11:48:06.120111-08:00
custom:
Author: colin-rogers-dbt
Issue: "972"
2 changes: 1 addition & 1 deletion dagger/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
dagger-io~=0.8.0
dagger-io~=0.9.7
python-dotenv
47 changes: 35 additions & 12 deletions dagger/run_dbt_spark_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import argparse
import sys
from typing import Dict

import anyio as anyio
import dagger as dagger
Expand All @@ -19,7 +20,7 @@
TESTING_ENV_VARS.update({"ODBC_DRIVER": "/opt/simba/spark/lib/64/libsparkodbc_sb64.so"})


def env_variables(envs: dict[str, str]):
def env_variables(envs: Dict[str, str]):
def env_variables_inner(ctr: dagger.Container):
for key, value in envs.items():
ctr = ctr.with_env_variable(key, value)
Expand All @@ -28,18 +29,19 @@ def env_variables_inner(ctr: dagger.Container):
return env_variables_inner


async def get_postgres_container(client: dagger.Client) -> (dagger.Container, str):
ctr = await (
def get_postgres_container(client: dagger.Client) -> (dagger.Container, str):
ctr = (
client.container()
.from_("postgres:13")
.with_env_variable("POSTGRES_PASSWORD", "postgres")
.with_exposed_port(PG_PORT)
.as_service()
)

return ctr, "postgres_db"


async def get_spark_container(client: dagger.Client) -> (dagger.Container, str):
def get_spark_container(client: dagger.Client) -> (dagger.Service, str):
spark_dir = client.host().directory("./dagger/spark-container")
spark_ctr_base = (
client.container()
Expand All @@ -63,7 +65,7 @@ async def get_spark_container(client: dagger.Client) -> (dagger.Container, str):
)

# postgres is the metastore here
pg_ctr, pg_host = await get_postgres_container(client)
pg_ctr, pg_host = get_postgres_container(client)

spark_ctr = (
spark_ctr_base.with_service_binding(alias=pg_host, service=pg_ctr)
Expand All @@ -77,6 +79,7 @@ async def get_spark_container(client: dagger.Client) -> (dagger.Container, str):
]
)
.with_exposed_port(10000)
.as_service()
)

return spark_ctr, "spark_db"
Expand All @@ -85,29 +88,49 @@ async def get_spark_container(client: dagger.Client) -> (dagger.Container, str):
async def test_spark(test_args):
async with dagger.Connection(dagger.Config(log_output=sys.stderr)) as client:
test_profile = test_args.profile
req_files = client.host().directory("./", include=["*.txt", "*.env", "*.ini"])

# create cache volumes, these are persisted between runs saving time when developing locally
os_reqs_cache = client.cache_volume("os_reqs")
pip_cache = client.cache_volume("pip")

# setup directories as we don't want to copy the whole repo into the container
req_files = client.host().directory(
"./", include=["*.txt", "*.env", "*.ini", "*.md", "setup.py"]
)
dbt_spark_dir = client.host().directory("./dbt")
test_dir = client.host().directory("./tests")
scripts = client.host().directory("./dagger/scripts")

platform = dagger.Platform("linux/amd64")
tst_container = (
client.container(platform=platform)
.from_("python:3.8-slim")
.with_directory("/.", req_files)
.with_directory("/dbt", dbt_spark_dir)
.with_directory("/tests", test_dir)
.with_mounted_cache("/var/cache/apt/archives", os_reqs_cache)
.with_mounted_cache("/root/.cache/pip", pip_cache)
# install OS deps first so any local changes don't invalidate the cache
.with_directory("/scripts", scripts)
.with_exec("./scripts/install_os_reqs.sh")
.with_exec(["./scripts/install_os_reqs.sh"])
# install dbt-spark + python deps
.with_directory("/src", req_files)
.with_directory("src/dbt", dbt_spark_dir)
.with_directory("src/tests", test_dir)
.with_workdir("/src")
.with_exec(["pip", "install", "-U", "pip"])
.with_exec(["pip", "install", "-r", "requirements.txt"])
.with_exec(["pip", "install", "-r", "dev-requirements.txt"])
.with_exec(["pip", "install", "-e", "."])
)

if test_profile == "apache_spark":
spark_ctr, spark_host = await get_spark_container(client)
spark_ctr, spark_host = get_spark_container(client)
tst_container = tst_container.with_service_binding(alias=spark_host, service=spark_ctr)

elif test_profile in ["databricks_cluster", "databricks_sql_endpoint"]:
tst_container = tst_container.with_exec("./scripts/configure_odbc.sh")
tst_container = (
tst_container.with_workdir("/")
.with_exec(["./scripts/configure_odbc.sh"])
.with_workdir("/src")
)

elif test_profile == "spark_session":
tst_container = tst_container.with_exec(["pip", "install", "pyspark"])
Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/spark/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Any, Dict, Optional, TypeVar, Union

from dbt.adapters.base.column import Column
from dbt.dataclass_schema import dbtClassMixin
from dbt_common.dataclass_schema import dbtClassMixin

Self = TypeVar("Self", bound="SparkColumn")

Expand Down
57 changes: 29 additions & 28 deletions dbt/adapters/spark/connections.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
from contextlib import contextmanager

import dbt.exceptions
from dbt.adapters.base import Credentials
from dbt.adapters.contracts.connection import (
AdapterResponse,
ConnectionState,
Connection,
Credentials,
)
from dbt.adapters.events.logging import AdapterLogger
from dbt.adapters.exceptions import FailedToConnectError
from dbt.adapters.sql import SQLConnectionManager
from dbt.contracts.connection import ConnectionState, AdapterResponse
from dbt.events import AdapterLogger
from dbt.utils import DECIMALS
from dbt_common.exceptions import DbtConfigError, DbtRuntimeError, DbtDatabaseError

from dbt_common.utils.encoding import DECIMALS
from dbt.adapters.spark import __version__

try:
Expand All @@ -22,8 +28,7 @@
pyodbc = None
from datetime import datetime
import sqlparams
from dbt.contracts.connection import Connection
from dbt.dataclass_schema import StrEnum
from dbt_common.dataclass_schema import StrEnum
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Union, Tuple, List, Generator, Iterable, Sequence

Expand Down Expand Up @@ -92,15 +97,15 @@ def cluster_id(self) -> Optional[str]:

def __post_init__(self) -> None:
if self.method is None:
raise dbt.exceptions.DbtRuntimeError("Must specify `method` in profile")
raise DbtRuntimeError("Must specify `method` in profile")
if self.host is None:
raise dbt.exceptions.DbtRuntimeError("Must specify `host` in profile")
raise DbtRuntimeError("Must specify `host` in profile")
if self.schema is None:
raise dbt.exceptions.DbtRuntimeError("Must specify `schema` in profile")
raise DbtRuntimeError("Must specify `schema` in profile")

# spark classifies database and schema as the same thing
if self.database is not None and self.database != self.schema:
raise dbt.exceptions.DbtRuntimeError(
raise DbtRuntimeError(
f" schema: {self.schema} \n"
f" database: {self.database} \n"
f"On Spark, database must be omitted or have the same value as"
Expand All @@ -112,7 +117,7 @@ def __post_init__(self) -> None:
try:
import pyodbc # noqa: F401
except ImportError as e:
raise dbt.exceptions.DbtRuntimeError(
raise DbtRuntimeError(
f"{self.method} connection method requires "
"additional dependencies. \n"
"Install the additional required dependencies with "
Expand All @@ -121,7 +126,7 @@ def __post_init__(self) -> None:
) from e

if self.method == SparkConnectionMethod.ODBC and self.cluster and self.endpoint:
raise dbt.exceptions.DbtRuntimeError(
raise DbtRuntimeError(
"`cluster` and `endpoint` cannot both be set when"
f" using {self.method} method to connect to Spark"
)
Expand All @@ -130,7 +135,7 @@ def __post_init__(self) -> None:
self.method == SparkConnectionMethod.HTTP
or self.method == SparkConnectionMethod.THRIFT
) and not (ThriftState and THttpClient and hive):
raise dbt.exceptions.DbtRuntimeError(
raise DbtRuntimeError(
f"{self.method} connection method requires "
"additional dependencies. \n"
"Install the additional required dependencies with "
Expand All @@ -141,7 +146,7 @@ def __post_init__(self) -> None:
try:
import pyspark # noqa: F401
except ImportError as e:
raise dbt.exceptions.DbtRuntimeError(
raise DbtRuntimeError(
f"{self.method} connection method requires "
"additional dependencies. \n"
"Install the additional required dependencies with "
Expand Down Expand Up @@ -291,13 +296,11 @@ def execute(self, sql: str, bindings: Optional[List[Any]] = None) -> None:
if poll_state.errorMessage:
logger.debug("Poll response: {}".format(poll_state))
logger.debug("Poll status: {}".format(state))
raise dbt.exceptions.DbtDatabaseError(poll_state.errorMessage)
raise DbtDatabaseError(poll_state.errorMessage)

elif state not in STATE_SUCCESS:
status_type = ThriftState._VALUES_TO_NAMES.get(state, "Unknown<{!r}>".format(state))
raise dbt.exceptions.DbtDatabaseError(
"Query failed with status: {}".format(status_type)
)
raise DbtDatabaseError("Query failed with status: {}".format(status_type))

logger.debug("Poll status: {}, query complete".format(state))

Expand Down Expand Up @@ -358,9 +361,9 @@ def exception_handler(self, sql: str) -> Generator[None, None, None]:
thrift_resp = exc.args[0]
if hasattr(thrift_resp, "status"):
msg = thrift_resp.status.errorMessage
raise dbt.exceptions.DbtRuntimeError(msg)
raise DbtRuntimeError(msg)
else:
raise dbt.exceptions.DbtRuntimeError(str(exc))
raise DbtRuntimeError(str(exc))

def cancel(self, connection: Connection) -> None:
connection.handle.cancel()
Expand Down Expand Up @@ -390,7 +393,7 @@ def validate_creds(cls, creds: Any, required: Iterable[str]) -> None:

for key in required:
if not hasattr(creds, key):
raise dbt.exceptions.DbtProfileError(
raise DbtConfigError(
"The config '{}' is required when using the {} method"
" to connect to Spark".format(key, method)
)
Expand Down Expand Up @@ -481,7 +484,7 @@ def open(cls, connection: Connection) -> Connection:
endpoint=creds.endpoint
)
else:
raise dbt.exceptions.DbtProfileError(
raise DbtConfigError(
"Either `cluster` or `endpoint` must set when"
" using the odbc method to connect to Spark"
)
Expand Down Expand Up @@ -525,9 +528,7 @@ def open(cls, connection: Connection) -> Connection:
Connection(server_side_parameters=creds.server_side_parameters)
)
else:
raise dbt.exceptions.DbtProfileError(
f"invalid credential method: {creds.method}"
)
raise DbtConfigError(f"invalid credential method: {creds.method}")
break
except Exception as e:
exc = e
Expand All @@ -537,7 +538,7 @@ def open(cls, connection: Connection) -> Connection:
msg = "Failed to connect"
if creds.token is not None:
msg += ", is your token valid?"
raise dbt.exceptions.FailedToConnectError(msg) from e
raise FailedToConnectError(msg) from e
retryable_message = _is_retryable_error(e)
if retryable_message and creds.connect_retries > 0:
msg = (
Expand All @@ -558,7 +559,7 @@ def open(cls, connection: Connection) -> Connection:
logger.warning(msg)
time.sleep(creds.connect_timeout)
else:
raise dbt.exceptions.FailedToConnectError("failed to connect") from e
raise FailedToConnectError("failed to connect") from e
else:
raise exc # type: ignore

Expand Down
Loading

0 comments on commit 5d90ff9

Please sign in to comment.