[Backport 8.16] Allow retries for statuses other than 429 in streaming bulk (#2702)

Co-authored-by: Miguel Grinberg <miguel.grinberg@gmail.com>
Co-authored-by: Quentin Pradet <quentin.pradet@elastic.co>
Co-authored-by: Aaron Hoffer <4275843+ayayron@users.noreply.github.com>
4 people authored Nov 12, 2024
1 parent 0544a4b commit ee10e2a
Showing 4 changed files with 116 additions and 20 deletions.
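
For context, this is how the new ``retry_on_status`` option is meant to be used from the caller's side. The snippet below is an illustrative sketch and is not part of this commit; the host URL, index name, and documents are assumptions.

from elasticsearch import Elasticsearch, helpers

client = Elasticsearch("http://localhost:9200")  # assumed local cluster

docs = ({"_index": "my-index", "_id": i, "value": i} for i in range(1000))

# Retry chunks rejected with 429 (the previous hard-coded behaviour) or 503,
# backing off exponentially between attempts as the docstrings below describe.
for ok, item in helpers.streaming_bulk(
    client,
    docs,
    chunk_size=500,
    max_retries=3,
    initial_backoff=2,
    max_backoff=600,
    retry_on_status=(429, 503),
):
    if not ok:
        print("failed:", item)
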
29 changes: 19 additions & 10 deletions elasticsearch/_async/helpers.py
@@ -173,6 +173,7 @@ async def async_streaming_bulk(
     max_backoff: float = 600,
     yield_ok: bool = True,
     ignore_status: Union[int, Collection[int]] = (),
+    retry_on_status: Union[int, Collection[int]] = (429,),
     *args: Any,
     **kwargs: Any,
 ) -> AsyncIterable[Tuple[bool, Dict[str, Any]]]:
@@ -184,10 +185,11 @@ async def async_streaming_bulk(
     entire input is consumed and sent.
 
     If you specify ``max_retries`` it will also retry any documents that were
-    rejected with a ``429`` status code. To do this it will wait (**by calling
-    asyncio.sleep**) for ``initial_backoff`` seconds and then,
-    every subsequent rejection for the same chunk, for double the time every
-    time up to ``max_backoff`` seconds.
+    rejected with a ``429`` status code. Use ``retry_on_status`` to
+    configure which status codes will be retried. To do this it will wait
+    (**by calling asyncio.sleep**) for ``initial_backoff`` seconds
+    and then, every subsequent rejection for the same chunk, for double the time
+    every time up to ``max_backoff`` seconds.
 
     :arg client: instance of :class:`~elasticsearch.AsyncElasticsearch` to use
     :arg actions: iterable or async iterable containing the actions to be executed
@@ -200,8 +202,11 @@ async def async_streaming_bulk(
     :arg expand_action_callback: callback executed on each action passed in,
         should return a tuple containing the action line and the data line
        (`None` if data line should be omitted).
+    :arg retry_on_status: HTTP status code(s) that will trigger a retry
+        (if `None` is specified, only status 429 will be retried).
     :arg max_retries: maximum number of times a document will be retried when
-        ``429`` is received, set to 0 (default) for no retries on ``429``
+        a status in ``retry_on_status`` (defaulting to ``429``) is received,
+        set to 0 (default) for no retries
     :arg initial_backoff: number of seconds we should wait before the first
         retry. Any subsequent retries will be powers of ``initial_backoff *
         2**retry_number``
@@ -213,6 +218,9 @@ async def async_streaming_bulk(
     client = client.options()
     client._client_meta = (("h", "bp"),)
 
+    if isinstance(retry_on_status, int):
+        retry_on_status = (retry_on_status,)
+
     async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:
         async for item in aiter(actions):
             yield expand_action_callback(item)
@@ -264,11 +272,11 @@ async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:
                 ):
                     if not ok:
                         action, info = info.popitem()
-                        # retry if retries enabled, we get 429, and we are not
-                        # in the last attempt
+                        # retry if retries enabled, we are not in the last attempt,
+                        # and status in retry_on_status (defaulting to 429)
                         if (
                             max_retries
-                            and info["status"] == 429
+                            and info["status"] in retry_on_status
                             and (attempt + 1) <= max_retries
                         ):
                             # _process_bulk_chunk expects strings so we need to
@@ -281,8 +289,9 @@ async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:
                         yield ok, info
 
             except ApiError as e:
-                # suppress 429 errors since we will retry them
-                if attempt == max_retries or e.status_code != 429:
+                # suppress any status in retry_on_status (429 by default)
+                # since we will retry them
+                if attempt == max_retries or e.status_code not in retry_on_status:
                     raise
             else:
                 if not to_retry:
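
Usage of the async helper mirrors the sync version. Below is a minimal sketch under the same assumptions as before (a local cluster and a made-up index name); it is not taken from this commit.

import asyncio

from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_streaming_bulk


async def main() -> None:
    client = AsyncElasticsearch("http://localhost:9200")  # assumed local cluster
    docs = [{"_index": "my-index", "_id": i, "value": i} for i in range(100)]
    # A single int is also accepted; the helper normalizes it to a one-element
    # tuple via the isinstance check added in the diff above.
    async for ok, item in async_streaming_bulk(
        client,
        docs,
        max_retries=2,
        retry_on_status=503,
        initial_backoff=1,
    ):
        if not ok:
            print("failed:", item)
    await client.close()


asyncio.run(main())
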
29 changes: 19 additions & 10 deletions elasticsearch/helpers/actions.py
@@ -374,6 +374,7 @@ def streaming_bulk(
     max_backoff: float = 600,
     yield_ok: bool = True,
     ignore_status: Union[int, Collection[int]] = (),
+    retry_on_status: Union[int, Collection[int]] = (429,),
     span_name: str = "helpers.streaming_bulk",
     *args: Any,
     **kwargs: Any,
@@ -386,10 +387,11 @@ def streaming_bulk(
     entire input is consumed and sent.
 
     If you specify ``max_retries`` it will also retry any documents that were
-    rejected with a ``429`` status code. To do this it will wait (**by calling
-    time.sleep which will block**) for ``initial_backoff`` seconds and then,
-    every subsequent rejection for the same chunk, for double the time every
-    time up to ``max_backoff`` seconds.
+    rejected with a ``429`` status code. Use ``retry_on_status`` to
+    configure which status codes will be retried. To do this it will wait
+    (**by calling time.sleep which will block**) for ``initial_backoff`` seconds
+    and then, every subsequent rejection for the same chunk, for double the time
+    every time up to ``max_backoff`` seconds.
 
     :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
     :arg actions: iterable containing the actions to be executed
@@ -402,8 +404,11 @@ def streaming_bulk(
     :arg expand_action_callback: callback executed on each action passed in,
         should return a tuple containing the action line and the data line
        (`None` if data line should be omitted).
+    :arg retry_on_status: HTTP status code(s) that will trigger a retry
+        (if `None` is specified, only status 429 will be retried).
     :arg max_retries: maximum number of times a document will be retried when
-        ``429`` is received, set to 0 (default) for no retries on ``429``
+        a status in ``retry_on_status`` (defaulting to ``429``) is received,
+        set to 0 (default) for no retries
     :arg initial_backoff: number of seconds we should wait before the first
         retry. Any subsequent retries will be powers of ``initial_backoff *
         2**retry_number``
@@ -415,6 +420,9 @@ def streaming_bulk(
     client = client.options()
     client._client_meta = (("h", "bp"),)
 
+    if isinstance(retry_on_status, int):
+        retry_on_status = (retry_on_status,)
+
     serializer = client.transport.serializers.get_serializer("application/json")
 
     bulk_data: List[
@@ -458,11 +466,11 @@ def streaming_bulk(
                 ):
                     if not ok:
                         action, info = info.popitem()
-                        # retry if retries enabled, we get 429, and we are not
-                        # in the last attempt
+                        # retry if retries enabled, we are not in the last attempt,
+                        # and status in retry_on_status (defaulting to 429)
                         if (
                             max_retries
-                            and info["status"] == 429
+                            and info["status"] in retry_on_status
                             and (attempt + 1) <= max_retries
                         ):
                             # _process_bulk_chunk expects bytes so we need to
@@ -475,8 +483,9 @@ def streaming_bulk(
                         yield ok, info
 
             except ApiError as e:
-                # suppress 429 errors since we will retry them
-                if attempt == max_retries or e.status_code != 429:
+                # suppress any status in retry_on_status (429 by default)
+                # since we will retry them
+                if attempt == max_retries or e.status_code not in retry_on_status:
                     raise
             else:
                 if not to_retry:
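
Both docstrings describe the same backoff policy: wait ``initial_backoff`` seconds before the first retry, double the wait on each subsequent retry for the same chunk, and cap it at ``max_backoff``. A small illustrative sketch of that schedule follows; it is not the library's internal code, only a worked example of the documented formula.

from typing import List


def backoff_schedule(
    max_retries: int, initial_backoff: float, max_backoff: float
) -> List[float]:
    # initial_backoff * 2**retry_number, capped at max_backoff
    return [min(max_backoff, initial_backoff * 2**retry) for retry in range(max_retries)]


# With initial_backoff=2 and max_backoff=600, five retries wait 2, 4, 8, 16, 32 seconds.
assert backoff_schedule(5, 2, 600) == [2, 4, 8, 16, 32]
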
39 changes: 39 additions & 0 deletions test_elasticsearch/test_async/test_server/test_helpers.py
@@ -293,6 +293,45 @@ async def streaming_bulk():
         await streaming_bulk()
         assert 4 == failing_client._called
 
+    async def test_connection_timeout_is_retried_with_retry_status_callback(
+        self, async_client
+    ):
+        failing_client = FailingBulkClient(
+            async_client,
+            fail_with=ApiError(
+                message="Connection timed out!",
+                body={},
+                meta=ApiResponseMeta(
+                    status=522, headers={}, http_version="1.1", duration=0, node=None
+                ),
+            ),
+        )
+        docs = [
+            {"_index": "i", "_id": 47, "f": "v"},
+            {"_index": "i", "_id": 45, "f": "v"},
+            {"_index": "i", "_id": 42, "f": "v"},
+        ]
+
+        results = [
+            x
+            async for x in helpers.async_streaming_bulk(
+                failing_client,
+                docs,
+                raise_on_exception=False,
+                raise_on_error=False,
+                chunk_size=1,
+                retry_on_status=522,
+                max_retries=1,
+                initial_backoff=0,
+            )
+        ]
+        assert 3 == len(results)
+        assert [True, True, True] == [r[0] for r in results]
+        await async_client.indices.refresh(index="i")
+        res = await async_client.search(index="i")
+        assert {"value": 3, "relation": "eq"} == res["hits"]["total"]
+        assert 4 == failing_client._called
+
 
 class TestBulk:
     async def test_bulk_works_with_single_item(self, async_client):
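
Both new tests assert ``4 == failing_client._called``: with ``chunk_size=1`` and three documents there are three bulk calls, one of which raises the injected 522 and is retried exactly once because of ``retry_on_status=522`` and ``max_retries=1``, for a total of four. ``FailingBulkClient`` is defined earlier in each test module and is not part of this diff; the sketch below is only a hypothetical reconstruction of its counting behaviour (``fail_at``, ``_called``, and the ``options`` passthrough are inferred, not copied from the test code).

class FailingBulkClientSketch:
    """Hypothetical stand-in for the test modules' FailingBulkClient."""

    def __init__(self, client, fail_at=(2,), fail_with=None):
        self.client = client
        self.fail_at = fail_at      # which bulk call numbers should fail
        self.fail_with = fail_with  # exception injected on those calls
        self._called = 0

    def options(self, **kwargs):
        # streaming_bulk calls client.options(); keep returning the wrapper
        return self

    def bulk(self, *args, **kwargs):
        self._called += 1
        if self._called in self.fail_at:
            raise self.fail_with
        return self.client.bulk(*args, **kwargs)
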
39 changes: 39 additions & 0 deletions test_elasticsearch/test_server/test_helpers.py
@@ -288,6 +288,45 @@ def streaming_bulk():
     assert 4 == failing_client._called
 
 
+def test_connection_timeout_is_retried_with_retry_status_callback(sync_client):
+    failing_client = FailingBulkClient(
+        sync_client,
+        fail_with=ApiError(
+            message="Connection timed out!",
+            body={},
+            meta=ApiResponseMeta(
+                status=522, headers={}, http_version="1.1", duration=0, node=None
+            ),
+        ),
+    )
+    docs = [
+        {"_index": "i", "_id": 47, "f": "v"},
+        {"_index": "i", "_id": 45, "f": "v"},
+        {"_index": "i", "_id": 42, "f": "v"},
+    ]
+
+    results = list(
+        helpers.streaming_bulk(
+            failing_client,
+            docs,
+            index="i",
+            raise_on_exception=False,
+            raise_on_error=False,
+            chunk_size=1,
+            retry_on_status=522,
+            max_retries=1,
+            initial_backoff=0,
+        )
+    )
+    assert 3 == len(results)
+    print(results)
+    assert [True, True, True] == [r[0] for r in results]
+    sync_client.indices.refresh(index="i")
+    res = sync_client.search(index="i")
+    assert {"value": 3, "relation": "eq"} == res["hits"]["total"]
+    assert 4 == failing_client._called
+
+
 def test_bulk_works_with_single_item(sync_client):
     docs = [{"answer": 42, "_id": 1}]
     success, failed = helpers.bulk(sync_client, docs, index="test-index", refresh=True)
