Skip to content

Commit

Permalink
[BUGFIX] Ensure that Datasource credentials are not persisted to Cl…
Browse files Browse the repository at this point in the history
…oud/disk (#6254)
  • Loading branch information
cdkini authored Nov 3, 2022
1 parent 20188e0 commit a17701e
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,12 @@ def test_yaml_config( # noqa: C901 - complexity 17

usage_stats_event_name: str = "data_context.test_yaml_config"

# Based on the particular object type we are attempting to instantiate,
# we may need the original config, the substituted config, or both.
config = self._test_yaml_config_prepare_config(
yaml_config=yaml_config, usage_stats_event_name=usage_stats_event_name
)
config_with_substitutions = self._test_yaml_config_prepare_substituted_config(
yaml_config, runtime_environment, usage_stats_event_name
)

Expand All @@ -208,42 +213,50 @@ def test_yaml_config( # noqa: C901 - complexity 17
instantiated_class,
usage_stats_event_payload,
) = self._test_instantiation_of_store_from_yaml_config(
name, class_name, config
name=name, class_name=class_name, config=config_with_substitutions
)
elif class_name in self.TEST_YAML_CONFIG_SUPPORTED_DATASOURCE_TYPES:
(
instantiated_class,
usage_stats_event_payload,
) = self._test_instantiation_of_datasource_from_yaml_config(
name, class_name, config
name=name,
class_name=class_name,
config=config, # Uses original config as substitutions are done downstream
)
elif class_name in self.TEST_YAML_CONFIG_SUPPORTED_CHECKPOINT_TYPES:
(
instantiated_class,
usage_stats_event_payload,
) = self._test_instantiation_of_checkpoint_from_yaml_config(
name, class_name, config
name=name, class_name=class_name, config=config_with_substitutions
)
elif class_name in self.TEST_YAML_CONFIG_SUPPORTED_DATA_CONNECTOR_TYPES:
(
instantiated_class,
usage_stats_event_payload,
) = self._test_instantiation_of_data_connector_from_yaml_config(
name, class_name, config, runtime_environment
name=name,
class_name=class_name,
config=config_with_substitutions,
runtime_environment=runtime_environment,
)
elif class_name in self.TEST_YAML_CONFIG_SUPPORTED_PROFILER_TYPES:
(
instantiated_class,
usage_stats_event_payload,
) = self._test_instantiation_of_profiler_from_yaml_config(
name, class_name, config
name=name, class_name=class_name, config=config_with_substitutions
)
else:
(
instantiated_class,
usage_stats_event_payload,
) = self._test_instantiation_of_misc_class_from_yaml_config(
name, config, runtime_environment, usage_stats_event_payload
name=name,
config=config_with_substitutions,
runtime_environment=runtime_environment,
usage_stats_event_payload=usage_stats_event_payload,
)

send_usage_message_from_handler(
Expand Down Expand Up @@ -293,12 +306,37 @@ def test_yaml_config( # noqa: C901 - complexity 17
raise e

def _test_yaml_config_prepare_config(
self, yaml_config: str, usage_stats_event_name: str
) -> CommentedMap:
config = self._load_config_string_as_commented_map(
config_str=yaml_config,
usage_stats_event_name=usage_stats_event_name,
)
return config

def _test_yaml_config_prepare_substituted_config(
self, yaml_config: str, runtime_environment: dict, usage_stats_event_name: str
) -> CommentedMap:
"""
Performs variable substitution and conversion from YAML to CommentedMap.
See `test_yaml_config` for more details.
"""
config_str_with_substituted_variables = (
self._prepare_config_string_with_substituted_variables(
yaml_config=yaml_config,
runtime_environment=runtime_environment,
usage_stats_event_name=usage_stats_event_name,
)
)
config = self._load_config_string_as_commented_map(
config_str=config_str_with_substituted_variables,
usage_stats_event_name=usage_stats_event_name,
)
return config

def _prepare_config_string_with_substituted_variables(
self, yaml_config: str, runtime_environment: dict, usage_stats_event_name: str
) -> str:
try:
substituted_config_variables: Union[
DataContextConfig, dict
Expand All @@ -313,12 +351,13 @@ def _test_yaml_config_prepare_config(
**runtime_environment,
}

config_str_with_substituted_variables: Union[
DataContextConfig, dict
] = substitute_all_config_variables(
yaml_config,
substitutions,
config_str_with_substituted_variables: str = (
substitute_all_config_variables(
yaml_config,
substitutions,
)
)
return config_str_with_substituted_variables
except Exception as e:
usage_stats_event_payload: dict = {
"diagnostic_info": ["__substitution_error__"],
Expand All @@ -331,9 +370,12 @@ def _test_yaml_config_prepare_config(
)
raise e

def _load_config_string_as_commented_map(
self, config_str: str, usage_stats_event_name: str
) -> CommentedMap:
try:
config: CommentedMap = yaml.load(config_str_with_substituted_variables)
return config
substituted_config: CommentedMap = yaml.load(config_str)
return substituted_config

except Exception as e:
usage_stats_event_payload = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,8 +477,11 @@ def save_datasource(
Returns:
The datasource, after storing and retrieving the stored config.
"""
# Chetan - 20221103 - Directly accessing private attr in order to patch security vulnerability around credential leakage.
# This is to be removed once substitution logic is migrated from the context to the individual object level.
config = datasource._raw_config

datasource_config_dict: dict = datasourceConfigSchema.dump(datasource.config)
datasource_config_dict: dict = datasourceConfigSchema.dump(config)
# Manually need to add in class name to the config since it is not part of the runtime obj
datasource_config_dict["class_name"] = datasource.__class__.__name__

Expand All @@ -497,7 +500,10 @@ def save_datasource(
)
updated_datasource: Union[
LegacyDatasource, BaseDatasource
] = self._instantiate_datasource_from_config(config=substituted_config)
] = self._instantiate_datasource_from_config(
raw_config=updated_datasource_config_from_store,
substituted_config=substituted_config,
)
self._cached_datasources[datasource_name] = updated_datasource
return updated_datasource

Expand Down Expand Up @@ -949,17 +955,21 @@ def get_datasource(
datasource_name=datasource_name
)

config: dict = dict(datasourceConfigSchema.dump(datasource_config))
raw_config_dict: dict = dict(datasourceConfigSchema.dump(datasource_config))
raw_config = datasourceConfigSchema.load(raw_config_dict)

substitutions: dict = self._determine_substitutions()
config = substitute_all_config_variables(
config, substitutions, self.DOLLAR_SIGN_ESCAPE_STRING
substituted_config = substitute_all_config_variables(
raw_config_dict, substitutions, self.DOLLAR_SIGN_ESCAPE_STRING
)

# Instantiate the datasource and add it to our in-memory cache of datasources; this does not persist:
datasource_config = datasourceConfigSchema.load(config)
datasource_config = datasourceConfigSchema.load(substituted_config)
datasource: Optional[
Union[LegacyDatasource, BaseDatasource]
] = self._instantiate_datasource_from_config(config=datasource_config)
] = self._instantiate_datasource_from_config(
raw_config=raw_config, substituted_config=substituted_config
)
self._cached_datasources[datasource_name] = datasource
return datasource

Expand Down Expand Up @@ -2717,21 +2727,31 @@ def _initialize_usage_statistics(

def _init_datasources(self) -> None:
"""Initialize the datasources in store"""
config: DataContextConfig = self.get_config_with_variables_substituted(
self.config
)
config: DataContextConfig = self.config
datasources: Dict[str, DatasourceConfig] = cast(
Dict[str, DatasourceConfig], config.datasources
)

substitutions = self._determine_substitutions()

for datasource_name, datasource_config in datasources.items():
try:
config = copy.deepcopy(datasource_config) # type: ignore[assignment]
config_dict = dict(datasourceConfigSchema.dump(config))
datasource_config = datasourceConfigSchema.load(config_dict)
datasource_config.name = datasource_name

raw_config_dict = dict(datasourceConfigSchema.dump(config))
substituted_config_dict: dict = substitute_all_config_variables(
raw_config_dict, substitutions, self.DOLLAR_SIGN_ESCAPE_STRING
)

raw_datasource_config = datasourceConfigSchema.load(raw_config_dict)
substituted_datasource_config = datasourceConfigSchema.load(
substituted_config_dict
)
substituted_datasource_config.name = datasource_name

datasource = self._instantiate_datasource_from_config(
config=datasource_config
raw_config=raw_datasource_config,
substituted_config=substituted_datasource_config,
)
self._cached_datasources[datasource_name] = datasource
except ge_exceptions.DatasourceInitializationError as e:
Expand All @@ -2742,7 +2762,9 @@ def _init_datasources(self) -> None:
pass

def _instantiate_datasource_from_config(
self, config: DatasourceConfig
self,
raw_config: DatasourceConfig,
substituted_config: DatasourceConfig,
) -> Datasource:
"""Instantiate a new datasource.
Args:
Expand All @@ -2755,14 +2777,18 @@ def _instantiate_datasource_from_config(
DatasourceInitializationError
"""
try:
datasource: Datasource = self._build_datasource_from_config(config=config)
datasource: Datasource = self._build_datasource_from_config(
raw_config=raw_config, substituted_config=substituted_config
)
except Exception as e:
raise ge_exceptions.DatasourceInitializationError(
datasource_name=config.name, message=str(e)
datasource_name=substituted_config.name, message=str(e)
)
return datasource

def _build_datasource_from_config(self, config: DatasourceConfig) -> Datasource:
def _build_datasource_from_config(
self, raw_config: DatasourceConfig, substituted_config: DatasourceConfig
) -> Datasource:
"""Instantiate a Datasource from a config.
Args:
Expand All @@ -2776,26 +2802,34 @@ def _build_datasource_from_config(self, config: DatasourceConfig) -> Datasource:
"""
# We convert from the type back to a dictionary for purposes of instantiation
serializer = DictConfigSerializer(schema=datasourceConfigSchema)
config_dict: dict = serializer.serialize(config)
substituted_config_dict: dict = serializer.serialize(substituted_config)

# While the new Datasource classes accept "data_context_root_directory", the Legacy Datasource classes do not.
if config_dict["class_name"] in [
if substituted_config_dict["class_name"] in [
"BaseDatasource",
"Datasource",
]:
config_dict.update({"data_context_root_directory": self.root_directory})
substituted_config_dict.update(
{"data_context_root_directory": self.root_directory}
)
module_name: str = "great_expectations.datasource"
datasource: Datasource = instantiate_class_from_config(
config=config_dict,
config=substituted_config_dict,
runtime_environment={"data_context": self, "concurrency": self.concurrency},
config_defaults={"module_name": module_name},
)
if not datasource:
raise ge_exceptions.ClassInstantiationError(
module_name=module_name,
package_name=None,
class_name=config["class_name"],
class_name=substituted_config_dict["class_name"],
)

# Chetan - 20221103 - Directly accessing private attr in order to patch security vulnerability around credential leakage.
# This is to be removed once substitution logic is migrated from the context to the individual object level.
raw_config_dict: dict = serializer.serialize(raw_config)
datasource._raw_config = raw_config_dict

return datasource

def _perform_substitutions_on_datasource_config(
Expand Down Expand Up @@ -2856,7 +2890,7 @@ def _instantiate_datasource_from_config_and_update_project_config(
if initialize:
try:
datasource = self._instantiate_datasource_from_config(
config=substituted_config
raw_config=config, substituted_config=substituted_config
)
self._cached_datasources[config.name] = datasource
except ge_exceptions.DatasourceInitializationError as e:
Expand Down
5 changes: 5 additions & 0 deletions great_expectations/datasource/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ def __init__(
if batch_kwargs_generators is not None:
self._datasource_config["batch_kwargs_generators"] = batch_kwargs_generators

# Chetan - 20221103 - This attribute is meant to represent the config args used to instantiate the object (before ${VARIABLE} substitution).
# While downstream logic should override this value, we default to `self._datasource_config` as a backup.
# This is to be removed once substitution logic is migrated from the context to the individual object level.
self._raw_config = self._datasource_config

@property
def name(self):
"""
Expand Down
22 changes: 14 additions & 8 deletions great_expectations/datasource/new_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class BaseDatasource:
def __init__(
self,
name: str,
execution_engine=None,
execution_engine: Optional[dict] = None,
data_context_root_directory: Optional[str] = None,
concurrency: Optional[ConcurrencyConfig] = None,
id: Optional[str] = None,
Expand Down Expand Up @@ -59,14 +59,20 @@ def __init__(
runtime_environment={"concurrency": concurrency},
config_defaults={"module_name": "great_expectations.execution_engine"},
)
self._datasource_config: dict = {
"execution_engine": execution_engine,
"id": id,
"name": name,
}
except Exception as e:
raise ge_exceptions.ExecutionEngineError(message=str(e))

self._datasource_config: dict = {
"execution_engine": execution_engine,
"id": id,
"name": name,
}

# Chetan - 20221103 - This attribute is meant to represent the config args used to instantiate the object (before ${VARIABLE} substitution).
# While downstream logic should override this value, we default to `self._datasource_config` as a backup.
# This is to be removed once substitution logic is migrated from the context to the individual object level.
self._raw_config = self._datasource_config

self._data_connectors: dict = {}

def get_batch_from_batch_definition(
Expand Down Expand Up @@ -411,8 +417,8 @@ class Datasource(BaseDatasource):
def __init__(
self,
name: str,
execution_engine=None,
data_connectors=None,
execution_engine: Optional[dict] = None,
data_connectors: Optional[dict] = None,
data_context_root_directory: Optional[str] = None,
concurrency: Optional[ConcurrencyConfig] = None,
id: Optional[str] = None,
Expand Down
Loading

0 comments on commit a17701e

Please sign in to comment.