Skip to content

Commit

Permalink
Configure poke interval for the filtered index creation by environment (#5114)
Browse files: Browse the repository at this point in the history

* Configure poke interval for the filtered index creation by environment

* Set kwarg explicitly in mapped task

* Adjust poke intervals
  • Loading branch information
stacimc authored Oct 31, 2024
1 parent ab8a52d commit 452558a
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 4 deletions.
3 changes: 2 additions & 1 deletion catalog/dags/common/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ def trigger_and_wait_for_reindex(
max_docs: int | None = None,
refresh: bool = True,
slices: Union[int, Literal["auto"]] = "auto",
poke_interval: int = REFRESH_POKE_INTERVAL,
):
@task
def trigger_reindex(
Expand Down Expand Up @@ -208,7 +209,7 @@ def wait_for_reindex(
slices,
)

wait_for_reindex_task = wait_for_reindex(
wait_for_reindex_task = wait_for_reindex.override(poke_interval=poke_interval)(
task_id=trigger_reindex_task, expected_docs=max_docs, es_host=es_host
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def create_and_populate_filtered_index(
origin_index_name: str,
filtered_index_name: str,
timeout: timedelta,
poke_interval: int,
):
"""
Create and populate a filtered index based on the given origin index, excluding
Expand Down Expand Up @@ -90,6 +91,7 @@ def create_and_populate_filtered_index(
}
},
refresh=False,
poke_interval=poke_interval,
)

refresh_index = es.refresh_index(es_host=es_host, index_name=filtered_index_name)
Expand Down
1 change: 1 addition & 0 deletions catalog/dags/data_refresh/dag_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ def create_data_refresh_dag(
origin_index_name=target_index_name,
filtered_index_name=filtered_index_name,
timeout=data_refresh_config.create_filtered_index_timeout,
poke_interval=data_refresh_config.reindex_poke_interval,
)

# Re-enable Cloudwatch alarms once reindexing is complete, even if it
Expand Down
6 changes: 3 additions & 3 deletions catalog/dags/data_refresh/data_refresh_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ def table_mappings(self) -> list[TableMapping]:
add_primary_key_timeout=timedelta(hours=12),
indexer_worker_timeout=timedelta(days=1),
concurrency_check_poke_interval=int(
os.getenv("DATA_REFRESH_POKE_INTERVAL", 60)
os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 5)
),
reindex_poke_interval=int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60)),
reindex_poke_interval=int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 10)),
),
AUDIO: DataRefreshConfig(
media_type=AUDIO,
Expand All @@ -152,6 +152,6 @@ def table_mappings(self) -> list[TableMapping]:
concurrency_check_poke_interval=int(
os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 30)
),
reindex_poke_interval=int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60)),
reindex_poke_interval=int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 5)),
),
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
AUDIO,
DAG_DEFAULT_ARGS,
MEDIA_TYPES,
REFRESH_POKE_INTERVAL,
STAGING,
)
from common.sensors.constants import STAGING_ES_CONCURRENCY_TAG
Expand Down Expand Up @@ -158,6 +159,7 @@ def create_proportional_by_source_staging_index():
# Do not refresh the index after each partial reindex
refresh=False,
es_host=es_host,
poke_interval=REFRESH_POKE_INTERVAL,
).expand_kwargs(desired_source_counts)

refresh_destination_index = es.refresh_index(
Expand Down

0 comments on commit 452558a

Please sign in to comment.