diff --git a/catalog/dags/common/constants.py b/catalog/dags/common/constants.py index 0105ecda921..c56778d473e 100644 --- a/catalog/dags/common/constants.py +++ b/catalog/dags/common/constants.py @@ -50,6 +50,11 @@ ES_PROD_HTTP_CONN_ID = "elasticsearch_http_production" REFRESH_POKE_INTERVAL = int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 30)) DATA_REFRESH_POOL = os.getenv("DATA_REFRESH_POOL", "data_refresh") +DATA_REFRESH_ALTER_BATCH_SIZE = ( + int(os.getenv("DATA_REFRESH_ALTER_BATCH_SIZE")) + if os.getenv("DATA_REFRESH_ALTER_BATCH_SIZE") + else None +) @dataclass diff --git a/catalog/dags/data_refresh/alter_data.py b/catalog/dags/data_refresh/alter_data.py index 260900bd3d1..0c490a2c149 100644 --- a/catalog/dags/data_refresh/alter_data.py +++ b/catalog/dags/data_refresh/alter_data.py @@ -199,7 +199,7 @@ def alter_table_data( batches = get_alter_batches( estimated_record_count.output, - batch_size="{{ var.value.get('DATA_REFRESH_ALTER_BATCH_SIZE', none) }}", + batch_size=data_refresh_config.alter_data_batch_size, ) alter_data = alter_data_batch.partial( diff --git a/catalog/dags/data_refresh/data_refresh_types.py b/catalog/dags/data_refresh/data_refresh_types.py index 0041443047d..8d0e9ecfaf6 100644 --- a/catalog/dags/data_refresh/data_refresh_types.py +++ b/catalog/dags/data_refresh/data_refresh_types.py @@ -10,7 +10,12 @@ from datetime import datetime, timedelta from typing import Optional -from common.constants import AUDIO, IMAGE, REFRESH_POKE_INTERVAL +from common.constants import ( + AUDIO, + DATA_REFRESH_ALTER_BATCH_SIZE, + IMAGE, + REFRESH_POKE_INTERVAL, +) from data_refresh import queries @@ -70,6 +75,10 @@ class DataRefreshConfig: data refresh may take. 
copy_data_timeout: timedelta expressing the amount of time it may take to copy the upstream table into the downstream DB + add_primary_key_timeout: timedelta expressing the amount of time it may take to + add the primary key to the temp table + alter_data_batch_size: optional int number of records to process per batch in + alter_data tasks; None processes all records in a single batch indexer_worker_timeout: timedelta expressing the amount of time it may take for any individual indexer worker to perform its portion of the distributed reindex @@ -96,6 +105,7 @@ class DataRefreshConfig: dag_timeout: timedelta = timedelta(days=1) copy_data_timeout: timedelta = timedelta(hours=1) add_primary_key_timeout: timedelta = timedelta(hours=1) + alter_data_batch_size: Optional[int] = DATA_REFRESH_ALTER_BATCH_SIZE indexer_worker_timeout: timedelta = timedelta(hours=12) index_readiness_timeout: timedelta = timedelta(days=1) create_filtered_index_timeout: timedelta = timedelta(days=1) @@ -126,6 +136,10 @@ def table_mappings(self) -> list[TableMapping]: dag_timeout=timedelta(days=4), copy_data_timeout=timedelta(hours=12), add_primary_key_timeout=timedelta(hours=12), + # Larger batches for image data refresh to avoid overloading XCOMs + alter_data_batch_size=( + DATA_REFRESH_ALTER_BATCH_SIZE or 1_000_000 + ), indexer_worker_timeout=timedelta(days=1), concurrency_check_poke_interval=int( os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 5)