Skip to content

Commit

Permalink
Make alter data batch size configurable by media type (#5124)
Browse files Browse the repository at this point in the history
  • Loading branch information
stacimc authored Oct 31, 2024
1 parent 452558a commit ea7766d
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 2 deletions.
5 changes: 5 additions & 0 deletions catalog/dags/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@
ES_PROD_HTTP_CONN_ID = "elasticsearch_http_production"
REFRESH_POKE_INTERVAL = int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 30))
DATA_REFRESH_POOL = os.getenv("DATA_REFRESH_POOL", "data_refresh")
DATA_REFRESH_ALTER_BATCH_SIZE = (
int(os.getenv("DATA_REFRESH_ALTER_BATCH_SIZE"))
if os.getenv("DATA_REFRESH_ALTER_BATCH_SIZE")
else None
)


@dataclass
Expand Down
2 changes: 1 addition & 1 deletion catalog/dags/data_refresh/alter_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ def alter_table_data(

batches = get_alter_batches(
estimated_record_count.output,
batch_size="{{ var.value.get('DATA_REFRESH_ALTER_BATCH_SIZE', none) }}",
batch_size=data_refresh_config.alter_data_batch_size,
)

alter_data = alter_data_batch.partial(
Expand Down
16 changes: 15 additions & 1 deletion catalog/dags/data_refresh/data_refresh_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@
from datetime import datetime, timedelta
from typing import Optional

from common.constants import AUDIO, IMAGE, REFRESH_POKE_INTERVAL
from common.constants import (
AUDIO,
DATA_REFRESH_ALTER_BATCH_SIZE,
IMAGE,
REFRESH_POKE_INTERVAL,
)
from data_refresh import queries


Expand Down Expand Up @@ -70,6 +75,10 @@ class DataRefreshConfig:
data refresh may take.
copy_data_timeout: timedelta expressing the amount of time it may take to
copy the upstream table into the downstream DB
add_primary_key_timeout: timedelta expressing the amount of time it may take to
add the primary key to the temp table
alter_data_batch_size: int number of records to process per batch in alter_data
tasks
indexer_worker_timeout: timedelta expressing the amount of time it may take for
any individual indexer worker to perform its portion of
the distributed reindex
Expand All @@ -96,6 +105,7 @@ class DataRefreshConfig:
dag_timeout: timedelta = timedelta(days=1)
copy_data_timeout: timedelta = timedelta(hours=1)
add_primary_key_timeout: timedelta = timedelta(hours=1)
alter_data_batch_size: int = DATA_REFRESH_ALTER_BATCH_SIZE
indexer_worker_timeout: timedelta = timedelta(hours=12)
index_readiness_timeout: timedelta = timedelta(days=1)
create_filtered_index_timeout: timedelta = timedelta(days=1)
Expand Down Expand Up @@ -126,6 +136,10 @@ def table_mappings(self) -> list[TableMapping]:
dag_timeout=timedelta(days=4),
copy_data_timeout=timedelta(hours=12),
add_primary_key_timeout=timedelta(hours=12),
# Larger batches for image data refresh to avoid overloading XCOMs
alter_data_batch_size=int(
os.getenv("DATA_REFRESH_ALTER_BATCH_SIZE", 1_000_000)
),
indexer_worker_timeout=timedelta(days=1),
concurrency_check_poke_interval=int(
os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 5)
Expand Down

0 comments on commit ea7766d

Please sign in to comment.