diff --git a/liiatools/cin_census_pipeline/pipeline.py b/liiatools/cin_census_pipeline/pipeline.py index 47a4848..68fa8b4 100644 --- a/liiatools/cin_census_pipeline/pipeline.py +++ b/liiatools/cin_census_pipeline/pipeline.py @@ -145,7 +145,7 @@ def create_current_view(archive: DataframeArchive, process_folder: FS) -> FS: def create_reports(current_folder: FS, process_folder: FS): export_folder = process_folder.makedirs("export", recreate=True) - aggregate = DataframeAggregator(current_folder, load_pipeline_config()) + aggregate = DataframeAggregator(current_folder, load_pipeline_config(), dataset="cin") aggregate_data = aggregate.current() for report in ["PAN"]: diff --git a/liiatools/common/aggregate.py b/liiatools/common/aggregate.py index 14acd03..650bb80 100644 --- a/liiatools/common/aggregate.py +++ b/liiatools/common/aggregate.py @@ -15,9 +15,10 @@ class DataframeAggregator: Only tables and columns defined in the pipeline config are aggregated. """ - def __init__(self, fs: FS, config: PipelineConfig): + def __init__(self, fs: FS, config: PipelineConfig, dataset: str): self.fs = fs self.config = config + self.dataset = dataset def list_files(self) -> List[str]: """ @@ -37,7 +38,7 @@ def load_file(self, file) -> DataContainer: Load a file from the current directory. """ data = DataContainer() - table_id = re.search(r"_([a-zA-Z0-9]*)\.", file) + table_id = re.search(fr"{self.dataset}_([a-zA-Z0-9_]*)\.", file) for table_spec in self.config.table_list: if table_id and table_id.group(1) == table_spec.id: diff --git a/liiatools/ssda903_pipeline/pipeline.py b/liiatools/ssda903_pipeline/pipeline.py index 7f662c2..b6c05b0 100644 --- a/liiatools/ssda903_pipeline/pipeline.py +++ b/liiatools/ssda903_pipeline/pipeline.py @@ -137,7 +137,7 @@ def create_current_view(archive: DataframeArchive, process_folder: FS) -> FS: def create_reports(current_folder: FS, process_folder: FS): export_folder = process_folder.makedirs("export", recreate=True) - aggregate = DataframeAggregator(current_folder, load_pipeline_config()) + aggregate = DataframeAggregator(current_folder, load_pipeline_config(), dataset="ssda903") aggregate_data = aggregate.current() for report in ["PAN", "SUFFICIENCY"]: diff --git a/liiatools_pipeline/ops/common_org.py b/liiatools_pipeline/ops/common_org.py index 2b25d62..1b09293 100644 --- a/liiatools_pipeline/ops/common_org.py +++ b/liiatools_pipeline/ops/common_org.py @@ -92,7 +92,7 @@ def create_reports( f"current/{config.dataset}", recreate=True ) log.info("Aggregating Data Frames...") - aggregate = DataframeAggregator(session_folder, pipeline_config(config)) + aggregate = DataframeAggregator(session_folder, pipeline_config(config), config.dataset) aggregate_data = aggregate.current() log.debug(f"Using config: {config}") for report in pipeline_config(config).retention_period.keys():