Skip to content

Commit

Permalink
[BUGFIX] Raise warning if data_connector cannot be built on config lo…
Browse files Browse the repository at this point in the history
…ad (#8658)
  • Loading branch information
Kilo59 authored Sep 1, 2023
1 parent 0d7f0c0 commit df69219
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 8 deletions.
36 changes: 32 additions & 4 deletions great_expectations/datasource/fluent/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,12 +568,40 @@ def _save_context_project_config(self) -> Union[Datasource, None]:
return None

def _rebuild_asset_data_connectors(self) -> None:
"""If Datasource required a data_connector we need to build the data_connector for each asset"""
"""
If Datasource required a data_connector we need to build the data_connector for each asset.
A warning is raised if a data_connector cannot be built for an asset.
Not all users will have access to the needed dependencies (packages or credentials) for every asset.
Missing dependencies will stop them from using the asset but should not stop them from loading it from config.
"""
asset_build_failure_direct_cause: dict[str, Exception | BaseException] = {}

if self.data_connector_type:
for data_asset in self.assets:
# check if data_connector exist before rebuilding?
connect_options = getattr(data_asset, "connect_options", {})
self._build_data_connector(data_asset, **connect_options)
try:
# check if data_connector exist before rebuilding?
connect_options = getattr(data_asset, "connect_options", {})
self._build_data_connector(data_asset, **connect_options)
except Exception as dc_build_err:
logger.info(
f"Unable to build data_connector for {self.type} {data_asset.type} {data_asset.name}",
exc_info=True,
)
# reveal direct cause instead of generic, unhelpful MyDatasourceError
asset_build_failure_direct_cause[data_asset.name] = (
dc_build_err.__cause__ or dc_build_err
)
if asset_build_failure_direct_cause:
# TODO: allow users to opt out of these warnings
names_and_error: List[str] = [
f"{name}:{type(exc).__name__}"
for (name, exc) in asset_build_failure_direct_cause.items()
]
warnings.warn(
f"data_connector build failure for {self.name} assets - {', '.join(names_and_error)}",
category=RuntimeWarning,
)

@staticmethod
def parse_order_by_sorters(
Expand Down
16 changes: 12 additions & 4 deletions tests/datasource/fluent/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import functools
import logging
import pathlib
import warnings
from pprint import pformat as pf
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -330,6 +331,13 @@ def cloud_storage_get_client_doubles(
)


@pytest.fixture
def filter_data_connector_build_warning():
with warnings.catch_warnings() as w:
warnings.simplefilter("ignore", RuntimeWarning)
yield w


@pytest.fixture
def fluent_only_config(
fluent_gx_config_yml_str: str, seed_ds_env_vars: tuple
Expand Down Expand Up @@ -365,7 +373,7 @@ def fluent_yaml_config_file(
@pytest.fixture
@functools.lru_cache(maxsize=1)
def seeded_file_context(
cloud_storage_get_client_doubles,
filter_data_connector_build_warning,
fluent_yaml_config_file: pathlib.Path,
seed_ds_env_vars: tuple,
) -> FileDataContext:
Expand All @@ -378,7 +386,7 @@ def seeded_file_context(

@pytest.fixture
def seed_cloud(
cloud_storage_get_client_doubles,
filter_data_connector_build_warning,
cloud_api_fake: responses.RequestsMock,
fluent_only_config: GxConfig,
):
Expand Down Expand Up @@ -410,8 +418,8 @@ def seeded_cloud_context(

@pytest.fixture(
params=[
pytest.param("seeded_file_context", marks=pytest.mark.filesystem),
pytest.param("seeded_cloud_context", marks=pytest.mark.cloud),
pytest.param("seeded_file_context", marks=[pytest.mark.filesystem]),
pytest.param("seeded_cloud_context", marks=[pytest.mark.cloud]),
]
)
def seeded_contexts(
Expand Down
31 changes: 31 additions & 0 deletions tests/datasource/fluent/test_contexts.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pathlib
import re
import urllib.parse
from collections import defaultdict
from pprint import pformat as pf
from typing import TYPE_CHECKING

Expand Down Expand Up @@ -367,5 +368,35 @@ def test_payload_sent_to_cloud(
)


@pytest.mark.filesystem
def test_data_connectors_are_built_on_config_load(
cloud_storage_get_client_doubles,
seeded_file_context: FileDataContext,
):
"""
Ensure that all Datasources that require data_connectors have their data_connectors
created when loaded from config.
"""
context = seeded_file_context
dc_datasources: dict[str, list[str]] = defaultdict(list)

assert context.fluent_datasources
for datasource in context.fluent_datasources.values():
if datasource.data_connector_type:
print(f"class: {datasource.__class__.__name__}")
print(f"type: {datasource.type}")
print(f"data_connector: {datasource.data_connector_type.__name__}")
print(f"name: {datasource.name}", end="\n\n")

dc_datasources[datasource.type].append(datasource.name)

for asset in datasource.assets:
assert isinstance(asset._data_connector, datasource.data_connector_type)
print()

print(f"Datasources with DataConnectors\n{pf(dict(dc_datasources))}")
assert dc_datasources


if __name__ == "__main__":
pytest.main([__file__, "-vv"])

0 comments on commit df69219

Please sign in to comment.