Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
fix: add parameter to fetch_metadata() function
  • Loading branch information
KhaiHuy123 committed Nov 6, 2024
1 parent 00cd378 commit 5828a0f
Show file tree
Hide file tree
Showing 17 changed files with 18 additions and 18 deletions.
4 changes: 2 additions & 2 deletions dagster_dbt_processing/assets/constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ def review_null_value(df: pl.DataFrame, context: AssetExecutionContext):
context.log.info(df.null_count().to_arrow())


def fetch_metadata(conn):
def fetch_metadata(conn, table_name):
metadata = conn.execute(
f"SELECT * FROM duckdb_tables() WHERE table_name = '{TABLE_ANL_BASE_REPORT}'"
f"SELECT * FROM duckdb_tables() WHERE table_name = '{table_name}'"
).fetchdf()
return metadata

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def anl_base_report(context: AssetExecutionContext, duckdb: DuckDBResource):
schema=SERVICES_SCHEMA, table=TABLE_ANL_DIM_AGG_BASE_NAME,
file_name=TABLE_ANL_DIM_AGG_BASE_NAME.lower().replace("_", ""), context=context)

metadata = fetch_metadata(conn=conn)
metadata = fetch_metadata(conn=conn, table_name=TABLE_ANL_BASE_REPORT)

metadata_mapping = create_column_value_dict(metadata)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def anl_fhv_vehicles(context: AssetExecutionContext, duckdb: DuckDBResource):
schema=SERVICES_SCHEMA, table=TABLE_ANL_DIM_VEHICLE_YEAR_FHV_VEHICLE,
file_name=TABLE_ANL_DIM_VEHICLE_YEAR_FHV_VEHICLE.lower().replace("_", ""), context=context)

metadata = fetch_metadata(conn=conn)
metadata = fetch_metadata(conn=conn, table_name=TABLE_ANL_FHV_VEHICLE)

metadata_mapping = create_column_value_dict(metadata)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def anl_lost_property(context: AssetExecutionContext, duckdb: DuckDBResource):
schema=SERVICES_SCHEMA, table=TABLE_ANL_DIM_LOST_PROPERTY_VEHICLES,
file_name=TABLE_ANL_DIM_LOST_PROPERTY_VEHICLES.lower().replace("_", ""), context=context)

metadata = fetch_metadata(conn=conn)
metadata = fetch_metadata(conn=conn, table_name=TABLE_ANL_LOST_PROPERTY)

metadata_mapping = create_column_value_dict(metadata)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def anl_meter_shop(context: AssetExecutionContext, duckdb: DuckDBResource):
schema=REPORT_SCHEMA, table=TABLE_ANL_METER_SHOPS,
file_name=TABLE_ANL_METER_SHOPS.lower().replace("_", ""), context=context)

metadata = fetch_metadata(conn)
metadata = fetch_metadata(conn=conn, table_name=TABLE_ANL_METER_SHOPS)

metadata_mapping = create_column_value_dict(metadata)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def anl_monthly_report(context: AssetExecutionContext, duckdb: DuckDBResource):
schema=REPORT_SCHEMA, table=TABLE_ANL_MONTHLY_REPORT,
file_name=TABLE_ANL_MONTHLY_REPORT.lower().replace("_", ""), context=context)

metadata = fetch_metadata(conn)
metadata = fetch_metadata(conn=conn, table_name=TABLE_ANL_MONTHLY_REPORT)

metadata_mapping = create_column_value_dict(metadata)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def anl_new_driver(context: AssetExecutionContext, duckdb: DuckDBResource):
schema=REPORT_SCHEMA, table=TABLE_ANL_NEW_DRIVERS,
file_name=TABLE_ANL_NEW_DRIVERS.lower().replace("_", ""), context=context)

metadata = fetch_metadata(conn)
metadata = fetch_metadata(conn=conn, table_name=TABLE_ANL_NEW_DRIVERS)

metadata_mapping = create_column_value_dict(metadata)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def anl_shl_inspect(context: AssetExecutionContext, duckdb: DuckDBResource):
schema=SERVICES_SCHEMA, table=TABLE_ANL_DIM_INSPECT_BASE,
file_name=TABLE_ANL_DIM_INSPECT_BASE.lower().replace("_", ""), context=context)

metadata = fetch_metadata(conn=conn)
metadata = fetch_metadata(conn=conn, table_name=TABLE_ANL_SHL_INSPECT)

metadata_mapping = create_column_value_dict(metadata)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def anl_shl_permit(context: AssetExecutionContext, duckdb: DuckDBResource):
schema=SERVICES_SCHEMA, table=TABLE_ANL_DIM_BASE,
file_name=TABLE_ANL_DIM_BASE.lower().replace("_", ""), context=context)

metadata = fetch_metadata(conn=conn)
metadata = fetch_metadata(conn=conn, table_name=TABLE_ANL_SHL_PERMIT)

metadata_mapping = create_column_value_dict(metadata)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def anl_taxi_zone(context: AssetExecutionContext, duckdb: DuckDBResource):
schema=ZONES_SCHEMA, table=TABLE_ANL_TAXI_ZONES,
file_name=TABLE_ANL_TAXI_ZONES.lower().replace("_", ""), context=context)

metadata = fetch_metadata(conn)
metadata = fetch_metadata(conn=conn, table_name=TABLE_ANL_TAXI_ZONES)

metadata_mapping = create_column_value_dict(metadata)

Expand Down
2 changes: 1 addition & 1 deletion dagster_dbt_processing/assets/ingest_geometry/geo_nta.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def geo_nta(context: AssetExecutionContext, duckdb: DuckDBResource):
schema=ZONES_SCHEMA, table=TABLE_GEO_NTA,
file_name=TABLE_GEO_NTA.lower().replace("_", ""), context=context)

metadata = fetch_metadata(conn=conn)
metadata = fetch_metadata(conn=conn, table_name=TABLE_GEO_NTA)

metadata_mapping = create_column_value_dict(metadata)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def geo_taxi_zone(context: AssetExecutionContext, duckdb: DuckDBResource):
schema=ZONES_SCHEMA, table=TABLE_GEO_TAXI_ZONES,
file_name=TABLE_GEO_TAXI_ZONES.lower().replace("_", ""), context=context)

metadata = fetch_metadata(conn)
metadata = fetch_metadata(conn=conn, table_name=TABLE_GEO_TAXI_ZONES)

metadata_mapping = create_column_value_dict(metadata)

Expand Down
2 changes: 1 addition & 1 deletion dagster_dbt_processing/assets/ingest_time_series/ts_aqe.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def ts_aqe(context: AssetExecutionContext, duckdb: DuckDBResource):
schema=ENV_SCHEMA, table=TABLE_TS_AQE,
file_name=TABLE_TS_AQE.lower().replace("_", ""), context=context)

metadata = fetch_metadata(conn)
metadata = fetch_metadata(conn=conn, table_name=TABLE_TS_AQE)

metadata_mapping = create_column_value_dict(metadata)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def ts_fhv_trips(context: AssetExecutionContext, duckdb: DuckDBResource):

context.log.info(f"Processing {sub_list} finished _ idx : {idx}")

metadata = fetch_metadata(conn=conn)
metadata = fetch_metadata(conn=conn, table_name=TABLE_TS_FHV_TRIPS)

metadata_mapping = create_column_value_dict(metadata)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def ts_green_trips(context: AssetExecutionContext, duckdb: DuckDBResource):

context.log.info(f"Processing {sub_list} finished _ idx : {idx}")

metadata = fetch_metadata(conn=conn)
metadata = fetch_metadata(conn=conn, table_name=TABLE_TS_GREEN_TRIPS)

metadata_mapping = create_column_value_dict(metadata)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def ts_hourly_monitoring(context: AssetExecutionContext, duckdb: DuckDBResource)
schema=ENV_SCHEMA, table=TABLE_TS_HOURLY_MONITORING,
file_name=TABLE_TS_HOURLY_MONITORING.lower().replace("_", ""), context=context)

metadata = fetch_metadata(conn)
metadata = fetch_metadata(conn=conn, table_name=TABLE_TS_HOURLY_MONITORING)

metadata_mapping = create_column_value_dict(metadata)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def ts_yellow_trips(context: AssetExecutionContext, duckdb: DuckDBResource):
file_name=TABLE_TS_DIM_VENDOR.lower().replace("_", ""), context=context, idx=idx)
context.log.info(f"Processing {sub_list} finished _ idx : {idx}")

metadata = fetch_metadata(conn=conn)
metadata = fetch_metadata(conn=conn, table_name=TABLE_TS_YELLOW_TRIPS)

metadata_mapping = create_column_value_dict(metadata)

Expand Down

0 comments on commit 5828a0f

Please sign in to comment.