Skip to content

Commit

Permalink
Merge pull request #171 from Aiven-Open/exg-s3-retry
Browse files Browse the repository at this point in the history
Improve retry handling for S3 transfers
  • Loading branch information
giacomo-alzetta-aiven authored Mar 1, 2024
2 parents e09948f + 24d4c81 commit 02f5ae6
Showing 1 changed file with 40 additions and 42 deletions.
82 changes: 40 additions & 42 deletions rohmu/object_storage/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ def __init__(
s3={"addressing_style": S3AddressingStyle(addressing_style).value},
signature_version=signature_version,
proxies=proxies,
retries={
"max_attempts": 10,
"mode": "standard",
},
**timeouts,
)
if not is_verify_tls and cert_path is not None:
Expand Down Expand Up @@ -402,54 +406,48 @@ def multipart_upload_file_object(
if not data:
break

attempts = 10
start_of_part_upload = time.monotonic()
while True:
attempts -= 1
self.stats.operation(StorageOperation.store_file, size=len(data))
self.stats.operation(StorageOperation.store_file, size=len(data))
try:
cup_response = self.s3_client.upload_part(
Body=data,
Bucket=self.bucket_name,
Key=path,
PartNumber=part_number,
UploadId=mp_id,
)
except botocore.exceptions.ClientError as ex:
self.log.exception("Uploading part %d for %s failed", part_number, path)
self.stats.operation(StorageOperation.multipart_aborted)
try:
cup_response = self.s3_client.upload_part(
Body=data,
self.s3_client.abort_multipart_upload(
Bucket=self.bucket_name,
Key=path,
PartNumber=part_number,
UploadId=mp_id,
)
except botocore.exceptions.ClientError as ex:
self.log.exception("Uploading part %d for %s failed, attempts left: %d", part_number, path, attempts)
if attempts <= 0:
self.stats.operation(StorageOperation.multipart_aborted)
try:
self.s3_client.abort_multipart_upload(
Bucket=self.bucket_name,
Key=path,
UploadId=mp_id,
)
finally:
err = f"Multipart upload of {path} failed: {ex.__class__.__name__}: {ex}"
raise StorageError(err) from ex
else:
time.sleep(1.0)
else:
self.log.info(
"Uploaded part %s of %s, size %s in %.2fs",
part_number,
chunks,
len(data),
time.monotonic() - start_of_part_upload,
)
parts.append(
{
"ETag": cup_response["ETag"],
"PartNumber": part_number,
}
)
part_number += 1
bytes_sent += len(data)
if progress_fn:
# TODO: change this to incremental progress. Size parameter is currently unused.
progress_fn(bytes_sent, size) # type: ignore[arg-type]
break
finally:
err = f"Multipart upload of {path} failed: {ex.__class__.__name__}: {ex}"
raise StorageError(err) from ex
else:
self.log.info(
"Uploaded part %s of %s, size %s in %.2fs",
part_number,
chunks,
len(data),
time.monotonic() - start_of_part_upload,
)
parts.append(
{
"ETag": cup_response["ETag"],
"PartNumber": part_number,
}
)
part_number += 1
bytes_sent += len(data)
if progress_fn:
# TODO: change this to incremental progress. Size parameter is currently unused.
progress_fn(bytes_sent, size) # type: ignore[arg-type]
break

self.stats.operation(StorageOperation.multipart_complete)
try:
Expand Down

0 comments on commit 02f5ae6

Please sign in to comment.