diff --git a/dagster_dbt_processing/assets/constant.py b/dagster_dbt_processing/assets/constant.py index 396aa9c..3d03542 100644 --- a/dagster_dbt_processing/assets/constant.py +++ b/dagster_dbt_processing/assets/constant.py @@ -150,9 +150,9 @@ def review_null_value(df: pl.DataFrame, context: AssetExecutionContext): context.log.info(df.null_count().to_arrow()) -def fetch_metadata(conn): +def fetch_metadata(conn, table_name): metadata = conn.execute( - f"SELECT * FROM duckdb_tables() WHERE table_name = '{TABLE_ANL_BASE_REPORT}'" + f"SELECT * FROM duckdb_tables() WHERE table_name = '{table_name}'" ).fetchdf() return metadata diff --git a/dagster_dbt_processing/assets/ingest_analytics/anl_base_report.py b/dagster_dbt_processing/assets/ingest_analytics/anl_base_report.py index d52a791..e6f302c 100644 --- a/dagster_dbt_processing/assets/ingest_analytics/anl_base_report.py +++ b/dagster_dbt_processing/assets/ingest_analytics/anl_base_report.py @@ -104,7 +104,7 @@ def anl_base_report(context: AssetExecutionContext, duckdb: DuckDBResource): schema=SERVICES_SCHEMA, table=TABLE_ANL_DIM_AGG_BASE_NAME, file_name=TABLE_ANL_DIM_AGG_BASE_NAME.lower().replace("_", ""), context=context) - metadata = fetch_metadata(conn=conn) + metadata = fetch_metadata(conn=conn, table_name=TABLE_ANL_BASE_REPORT) metadata_mapping = create_column_value_dict(metadata) diff --git a/dagster_dbt_processing/assets/ingest_analytics/anl_fhv_vehicle.py b/dagster_dbt_processing/assets/ingest_analytics/anl_fhv_vehicle.py index c5dcd02..b90c21d 100644 --- a/dagster_dbt_processing/assets/ingest_analytics/anl_fhv_vehicle.py +++ b/dagster_dbt_processing/assets/ingest_analytics/anl_fhv_vehicle.py @@ -151,7 +151,7 @@ def anl_fhv_vehicles(context: AssetExecutionContext, duckdb: DuckDBResource): schema=SERVICES_SCHEMA, table=TABLE_ANL_DIM_VEHICLE_YEAR_FHV_VEHICLE, file_name=TABLE_ANL_DIM_VEHICLE_YEAR_FHV_VEHICLE.lower().replace("_", ""), context=context) - metadata = fetch_metadata(conn=conn) + metadata = fetch_metadata(conn=conn, table_name=TABLE_ANL_FHV_VEHICLE) metadata_mapping = create_column_value_dict(metadata) diff --git a/dagster_dbt_processing/assets/ingest_analytics/anl_lost_property.py b/dagster_dbt_processing/assets/ingest_analytics/anl_lost_property.py index 1754af7..e63bb67 100644 --- a/dagster_dbt_processing/assets/ingest_analytics/anl_lost_property.py +++ b/dagster_dbt_processing/assets/ingest_analytics/anl_lost_property.py @@ -119,7 +119,7 @@ def anl_lost_property(context: AssetExecutionContext, duckdb: DuckDBResource): schema=SERVICES_SCHEMA, table=TABLE_ANL_DIM_LOST_PROPERTY_VEHICLES, file_name=TABLE_ANL_DIM_LOST_PROPERTY_VEHICLES.lower().replace("_", ""), context=context) - metadata = fetch_metadata(conn=conn) + metadata = fetch_metadata(conn=conn, table_name=TABLE_ANL_LOST_PROPERTY) metadata_mapping = create_column_value_dict(metadata) diff --git a/dagster_dbt_processing/assets/ingest_analytics/anl_meter_shop.py b/dagster_dbt_processing/assets/ingest_analytics/anl_meter_shop.py index 64d6ccf..9cec4af 100644 --- a/dagster_dbt_processing/assets/ingest_analytics/anl_meter_shop.py +++ b/dagster_dbt_processing/assets/ingest_analytics/anl_meter_shop.py @@ -87,7 +87,7 @@ def anl_meter_shop(context: AssetExecutionContext, duckdb: DuckDBResource): schema=REPORT_SCHEMA, table=TABLE_ANL_METER_SHOPS, file_name=TABLE_ANL_METER_SHOPS.lower().replace("_", ""), context=context) - metadata = fetch_metadata(conn) + metadata = fetch_metadata(conn=conn, table_name=TABLE_ANL_METER_SHOPS) metadata_mapping = create_column_value_dict(metadata) diff --git a/dagster_dbt_processing/assets/ingest_analytics/anl_monthly_report.py b/dagster_dbt_processing/assets/ingest_analytics/anl_monthly_report.py index a2269b5..9e1400c 100644 --- a/dagster_dbt_processing/assets/ingest_analytics/anl_monthly_report.py +++ b/dagster_dbt_processing/assets/ingest_analytics/anl_monthly_report.py @@ -87,7 +87,7 @@ def anl_monthly_report(context: AssetExecutionContext, duckdb: DuckDBResource): schema=REPORT_SCHEMA, table=TABLE_ANL_MONTHLY_REPORT, file_name=TABLE_ANL_MONTHLY_REPORT.lower().replace("_", ""), context=context) - metadata = fetch_metadata(conn) + metadata = fetch_metadata(conn=conn, table_name=TABLE_ANL_MONTHLY_REPORT) metadata_mapping = create_column_value_dict(metadata) diff --git a/dagster_dbt_processing/assets/ingest_analytics/anl_new_driver.py b/dagster_dbt_processing/assets/ingest_analytics/anl_new_driver.py index c2668fe..253f2a8 100644 --- a/dagster_dbt_processing/assets/ingest_analytics/anl_new_driver.py +++ b/dagster_dbt_processing/assets/ingest_analytics/anl_new_driver.py @@ -84,7 +84,7 @@ def anl_new_driver(context: AssetExecutionContext, duckdb: DuckDBResource): schema=REPORT_SCHEMA, table=TABLE_ANL_NEW_DRIVERS, file_name=TABLE_ANL_NEW_DRIVERS.lower().replace("_", ""), context=context) - metadata = fetch_metadata(conn) + metadata = fetch_metadata(conn=conn, table_name=TABLE_ANL_NEW_DRIVERS) metadata_mapping = create_column_value_dict(metadata) diff --git a/dagster_dbt_processing/assets/ingest_analytics/anl_shl_inspect.py b/dagster_dbt_processing/assets/ingest_analytics/anl_shl_inspect.py index 9c827dc..6707f7e 100644 --- a/dagster_dbt_processing/assets/ingest_analytics/anl_shl_inspect.py +++ b/dagster_dbt_processing/assets/ingest_analytics/anl_shl_inspect.py @@ -100,7 +100,7 @@ def anl_shl_inspect(context: AssetExecutionContext, duckdb: DuckDBResource): schema=SERVICES_SCHEMA, table=TABLE_ANL_DIM_INSPECT_BASE, file_name=TABLE_ANL_DIM_INSPECT_BASE.lower().replace("_", ""), context=context) - metadata = fetch_metadata(conn=conn) + metadata = fetch_metadata(conn=conn, table_name=TABLE_ANL_SHL_INSPECT) metadata_mapping = create_column_value_dict(metadata) diff --git a/dagster_dbt_processing/assets/ingest_analytics/anl_shl_permit.py b/dagster_dbt_processing/assets/ingest_analytics/anl_shl_permit.py index ca4f7d0..24d6785 100644 --- a/dagster_dbt_processing/assets/ingest_analytics/anl_shl_permit.py +++ b/dagster_dbt_processing/assets/ingest_analytics/anl_shl_permit.py @@ -115,7 +115,7 @@ def anl_shl_permit(context: AssetExecutionContext, duckdb: DuckDBResource): schema=SERVICES_SCHEMA, table=TABLE_ANL_DIM_BASE, file_name=TABLE_ANL_DIM_BASE.lower().replace("_", ""), context=context) - metadata = fetch_metadata(conn=conn) + metadata = fetch_metadata(conn=conn, table_name=TABLE_ANL_SHL_PERMIT) metadata_mapping = create_column_value_dict(metadata) diff --git a/dagster_dbt_processing/assets/ingest_analytics/anl_taxi_zone.py b/dagster_dbt_processing/assets/ingest_analytics/anl_taxi_zone.py index ede6a25..361c03a 100644 --- a/dagster_dbt_processing/assets/ingest_analytics/anl_taxi_zone.py +++ b/dagster_dbt_processing/assets/ingest_analytics/anl_taxi_zone.py @@ -81,7 +81,7 @@ def anl_taxi_zone(context: AssetExecutionContext, duckdb: DuckDBResource): schema=ZONES_SCHEMA, table=TABLE_ANL_TAXI_ZONES, file_name=TABLE_ANL_TAXI_ZONES.lower().replace("_", ""), context=context) - metadata = fetch_metadata(conn) + metadata = fetch_metadata(conn=conn, table_name=TABLE_ANL_TAXI_ZONES) metadata_mapping = create_column_value_dict(metadata) diff --git a/dagster_dbt_processing/assets/ingest_geometry/geo_nta.py b/dagster_dbt_processing/assets/ingest_geometry/geo_nta.py index 4c4a553..fe1b88b 100644 --- a/dagster_dbt_processing/assets/ingest_geometry/geo_nta.py +++ b/dagster_dbt_processing/assets/ingest_geometry/geo_nta.py @@ -75,7 +75,7 @@ def geo_nta(context: AssetExecutionContext, duckdb: DuckDBResource): schema=ZONES_SCHEMA, table=TABLE_GEO_NTA, file_name=TABLE_GEO_NTA.lower().replace("_", ""), context=context) - metadata = fetch_metadata(conn=conn) + metadata = fetch_metadata(conn=conn, table_name=TABLE_GEO_NTA) metadata_mapping = create_column_value_dict(metadata) diff --git a/dagster_dbt_processing/assets/ingest_geometry/geo_taxi_zone.py b/dagster_dbt_processing/assets/ingest_geometry/geo_taxi_zone.py index ae3ba09..cb14614 100644 --- a/dagster_dbt_processing/assets/ingest_geometry/geo_taxi_zone.py +++ b/dagster_dbt_processing/assets/ingest_geometry/geo_taxi_zone.py @@ -75,7 +75,7 @@ def geo_taxi_zone(context: AssetExecutionContext, duckdb: DuckDBResource): schema=ZONES_SCHEMA, table=TABLE_GEO_TAXI_ZONES, file_name=TABLE_GEO_TAXI_ZONES.lower().replace("_", ""), context=context) - metadata = fetch_metadata(conn) + metadata = fetch_metadata(conn=conn, table_name=TABLE_GEO_TAXI_ZONES) metadata_mapping = create_column_value_dict(metadata) diff --git a/dagster_dbt_processing/assets/ingest_time_series/ts_aqe.py b/dagster_dbt_processing/assets/ingest_time_series/ts_aqe.py index 33604d2..3a842a3 100644 --- a/dagster_dbt_processing/assets/ingest_time_series/ts_aqe.py +++ b/dagster_dbt_processing/assets/ingest_time_series/ts_aqe.py @@ -82,7 +82,7 @@ def ts_aqe(context: AssetExecutionContext, duckdb: DuckDBResource): schema=ENV_SCHEMA, table=TABLE_TS_AQE, file_name=TABLE_TS_AQE.lower().replace("_", ""), context=context) - metadata = fetch_metadata(conn) + metadata = fetch_metadata(conn=conn, table_name=TABLE_TS_AQE) metadata_mapping = create_column_value_dict(metadata) diff --git a/dagster_dbt_processing/assets/ingest_time_series/ts_fhv_trips.py b/dagster_dbt_processing/assets/ingest_time_series/ts_fhv_trips.py index 1dc4bec..ea6f82d 100644 --- a/dagster_dbt_processing/assets/ingest_time_series/ts_fhv_trips.py +++ b/dagster_dbt_processing/assets/ingest_time_series/ts_fhv_trips.py @@ -83,7 +83,7 @@ def ts_fhv_trips(context: AssetExecutionContext, duckdb: DuckDBResource): context.log.info(f"Processing {sub_list} finished _ idx : {idx}") - metadata = fetch_metadata(conn=conn) + metadata = fetch_metadata(conn=conn, table_name=TABLE_TS_FHV_TRIPS) metadata_mapping = create_column_value_dict(metadata) diff --git a/dagster_dbt_processing/assets/ingest_time_series/ts_green_trips.py b/dagster_dbt_processing/assets/ingest_time_series/ts_green_trips.py index 171c79d..0f5df18 100644 --- a/dagster_dbt_processing/assets/ingest_time_series/ts_green_trips.py +++ b/dagster_dbt_processing/assets/ingest_time_series/ts_green_trips.py @@ -118,7 +118,7 @@ def ts_green_trips(context: AssetExecutionContext, duckdb: DuckDBResource): context.log.info(f"Processing {sub_list} finished _ idx : {idx}") - metadata = fetch_metadata(conn=conn) + metadata = fetch_metadata(conn=conn, table_name=TABLE_TS_GREEN_TRIPS) metadata_mapping = create_column_value_dict(metadata) diff --git a/dagster_dbt_processing/assets/ingest_time_series/ts_hourly_monitoring.py b/dagster_dbt_processing/assets/ingest_time_series/ts_hourly_monitoring.py index 0a97591..8bf12f4 100644 --- a/dagster_dbt_processing/assets/ingest_time_series/ts_hourly_monitoring.py +++ b/dagster_dbt_processing/assets/ingest_time_series/ts_hourly_monitoring.py @@ -81,7 +81,7 @@ def ts_hourly_monitoring(context: AssetExecutionContext, duckdb: DuckDBResource) schema=ENV_SCHEMA, table=TABLE_TS_HOURLY_MONITORING, file_name=TABLE_TS_HOURLY_MONITORING.lower().replace("_", ""), context=context) - metadata = fetch_metadata(conn) + metadata = fetch_metadata(conn=conn, table_name=TABLE_TS_HOURLY_MONITORING) metadata_mapping = create_column_value_dict(metadata) diff --git a/dagster_dbt_processing/assets/ingest_time_series/ts_yellow_trips.py b/dagster_dbt_processing/assets/ingest_time_series/ts_yellow_trips.py index 50a8b8b..9cc3218 100644 --- a/dagster_dbt_processing/assets/ingest_time_series/ts_yellow_trips.py +++ b/dagster_dbt_processing/assets/ingest_time_series/ts_yellow_trips.py @@ -108,7 +108,7 @@ def ts_yellow_trips(context: AssetExecutionContext, duckdb: DuckDBResource): file_name=TABLE_TS_DIM_VENDOR.lower().replace("_", ""), context=context, idx=idx) context.log.info(f"Processing {sub_list} finished _ idx : {idx}") - metadata = fetch_metadata(conn=conn) + metadata = fetch_metadata(conn=conn, table_name=TABLE_TS_YELLOW_TRIPS) metadata_mapping = create_column_value_dict(metadata)