diff --git a/kedro/framework/cli/catalog.py b/kedro/framework/cli/catalog.py
index 5fd64fdd43..c9a58af432 100644
--- a/kedro/framework/cli/catalog.py
+++ b/kedro/framework/cli/catalog.py
@@ -1,9 +1,11 @@
 """A collection of CLI commands for working with Kedro catalog."""
+import copy
 from collections import defaultdict
 
 import click
 import yaml
 from click import secho
+from parse import parse
 
 from kedro.framework.cli.utils import KedroCliError, env_option, split_string
 from kedro.framework.project import pipelines, settings
@@ -174,3 +176,68 @@ def _add_missing_datasets_to_catalog(missing_ds, catalog_path):
     catalog_path.parent.mkdir(exist_ok=True)
     with catalog_path.open(mode="w") as catalog_file:
         yaml.safe_dump(catalog_config, catalog_file, default_flow_style=False)
+
+
+def pick_best_match(matches):
+    """Return the (pattern, parse result) pair with the most specific pattern."""
+    matches = sorted(matches, key=lambda x: (-specificity(x[0]), -x[0].count("{"), x[0]))
+    return matches[0]
+
+
+def specificity(pattern):
+    """Return the number of characters outside curly brackets, i.e. the part
+    of the pattern that must be matched exactly rather than by a placeholder.
+
+    Example:
+        specificity("{namespace}.companies") = 10
+        specificity("{namespace}.{dataset}") = 1
+        specificity("france.companies") = 16
+    """
+    # Parse the pattern against itself to get its placeholder names, then
+    # blank them out so that only the literal characters remain.
+    pattern_variables = parse(pattern, pattern).named
+    for k in pattern_variables:
+        pattern_variables[k] = ""
+    specific_characters = pattern.format(**pattern_variables)
+    return len(specific_characters)
+
+
+@catalog.command("resolve")
+@env_option
+@click.pass_obj
+def resolve_catalog_datasets(metadata: ProjectMetadata, env):
+    """Resolve catalog patterns against the datasets used in the pipelines."""
+    session = _create_session(metadata.package_name, env=env)
+    context = session.load_context()
+    catalog_conf = context.config_loader["catalog"]
+
+    # Gather all datasets used in the project pipelines.
+    pipeline_datasets = {
+        dataset for pl_obj in pipelines.values() for dataset in pl_obj.data_sets()
+    }
+
+    result_catalog = {}
+    for pipeline_dataset in pipeline_datasets:
+        matches = []
+        for ds_name in catalog_conf:
+            result = parse(ds_name, pipeline_dataset)
+            if not result:
+                continue
+            # We have found a match!
+            matches.append((ds_name, result))
+        if not matches:
+            # Not in the catalog: most likely parameters or a MemoryDataSet.
+            continue
+        best_match, result = pick_best_match(matches)
+        best_match_config = copy.deepcopy(catalog_conf[best_match])
+        # Substitute the parsed values into the best-matching catalog entry.
+        for key, value in best_match_config.items():
+            string_value = str(value)
+            try:
+                best_match_config[key] = string_value.format_map(result.named)
+            except KeyError:
+                # The config has a placeholder that is not present in the
+                # dataset name; report it and keep the value unformatted.
+                secho(f"'{key}' has invalid catalog configuration")
+        result_catalog[pipeline_dataset] = best_match_config
+    secho(yaml.dump(result_catalog))
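
Reviewer note: the matching rules above are easiest to see in isolation. Below is a minimal standalone sketch, not part of the patch; the patterns and the dataset name are invented for illustration, and it assumes only the same `parse` package the patch imports:

    from parse import parse

    def specificity(pattern):
        # Number of characters matched literally (outside {placeholders}).
        named = parse(pattern, pattern).named
        return len(pattern.format(**{k: "" for k in named}))

    patterns = ["{namespace}.{dataset}", "{namespace}.companies", "france.companies"]
    dataset = "france.companies"

    # Keep only the patterns that actually parse the dataset name.
    matches = [(p, parse(p, dataset)) for p in patterns]
    matches = [(p, r) for p, r in matches if r]

    # Same sort key as pick_best_match: most specific first, then more
    # placeholders, then alphabetical.
    best = sorted(matches, key=lambda m: (-specificity(m[0]), -m[0].count("{"), m[0]))[0]
    print(best[0])        # -> france.companies
    print(best[1].named)  # -> {} (nothing left to substitute)

The fully literal pattern wins because all 16 of its characters sit outside brackets, which is exactly what the docstring examples in `specificity` describe.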
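
The substitution step is plain `str.format_map` over every value of the winning entry, so a catalog entry can interpolate the parsed placeholders anywhere in its config. A hedged sketch of that step (the catalog entry below is hypothetical, not taken from the patch or the kedro test fixtures):

    entry = {
        "type": "pandas.CSVDataSet",
        "filepath": "data/01_raw/{namespace}/{dataset}.csv",
    }
    named = {"namespace": "france", "dataset": "companies"}

    resolved = {k: str(v).format_map(named) for k, v in entry.items()}
    print(resolved["filepath"])  # -> data/01_raw/france/companies.csv

    # A placeholder absent from the parse result raises KeyError, which the
    # command reports as invalid catalog configuration for that key:
    try:
        "data/{region}.csv".format_map(named)
    except KeyError as exc:
        print(f"missing placeholder: {exc}")  # -> missing placeholder: 'region'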