Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move s3 HEAD bucket to lib. #5067

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 73 additions & 77 deletions src/toil/jobStores/aws/jobStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
uploadFromPath,
)
from toil.jobStores.utils import ReadablePipe, ReadableTransformingPipe, WritablePipe
from toil.lib.aws.s3 import head_s3_bucket
from toil.lib.aws import build_tag_dict_from_env
from toil.lib.aws.session import establish_boto3_session
from toil.lib.aws.utils import (
Expand Down Expand Up @@ -821,85 +822,80 @@ def bucket_retry_predicate(error):
return False

bucketExisted = True
for attempt in retry_s3(predicate=bucket_retry_predicate):
with attempt:
try:
# the head_bucket() call makes sure that the bucket exists and the user can access it
self.s3_client.head_bucket(Bucket=bucket_name)

bucket = self.s3_resource.Bucket(bucket_name)
except ClientError as e:
error_http_status = get_error_status(e)
if error_http_status == 404:
bucketExisted = False
logger.debug("Bucket '%s' does not exist.", bucket_name)
if create:
bucket = create_s3_bucket(
self.s3_resource, bucket_name, self.region
)
# Wait until the bucket exists before checking the region and adding tags
bucket.wait_until_exists()

# It is possible for create_bucket to return but
# for an immediate request for the bucket region to
# produce an S3ResponseError with code
# NoSuchBucket. We let that kick us back up to the
# main retry loop.
assert (
get_bucket_region(bucket_name) == self.region
), f"bucket_name: {bucket_name}, {get_bucket_region(bucket_name)} != {self.region}"

tags = build_tag_dict_from_env()

if tags:
flat_tags = flatten_tags(tags)
bucket_tagging = self.s3_resource.BucketTagging(bucket_name)
bucket_tagging.put(Tagging={'TagSet': flat_tags})

# Configure bucket so that we can make objects in
# it public, which was the historical default.
enable_public_objects(bucket_name)
elif block:
raise
else:
return None
elif error_http_status == 301:
# This is raised if the user attempts to get a bucket in a region outside
# the specified one, if the specified one is not `us-east-1`. The us-east-1
# server allows a user to use buckets from any region.
raise BucketLocationConflictException(get_bucket_region(bucket_name))
else:
raise
else:
bucketRegion = get_bucket_region(bucket_name)
if bucketRegion != self.region:
raise BucketLocationConflictException(bucketRegion)

if versioning and not bucketExisted:
# only call this method on bucket creation
bucket.Versioning().enable()
# Now wait until versioning is actually on. Some uploads
# would come back with no versions; maybe they were
# happening too fast and this setting isn't sufficiently
# consistent?
time.sleep(1)
while not self._getBucketVersioning(bucket_name):
logger.warning(f"Waiting for versioning activation on bucket '{bucket_name}'...")
time.sleep(1)
elif check_versioning_consistency:
# now test for versioning consistency
# we should never see any of these errors since 'versioning' should always be true
bucket_versioning = self._getBucketVersioning(bucket_name)
if bucket_versioning != versioning:
assert False, 'Cannot modify versioning on existing bucket'
elif bucket_versioning is None:
assert False, 'Cannot use a bucket with versioning suspended'
if bucketExisted:
logger.debug(f"Using pre-existing job store bucket '{bucket_name}'.")
try:
# make sure bucket exists and user can access it
head_s3_bucket(Bucket=bucket_name)

bucket = self.s3_resource.Bucket(bucket_name)
except ClientError as e:
error_http_status = get_error_status(e)
if error_http_status == 404:
bucketExisted = False
logger.debug("Bucket '%s' does not exist.", bucket_name)
if create:
bucket = create_s3_bucket(self.s3_resource, bucket_name, self.region)
# Wait until the bucket exists before checking the region and adding tags
bucket.wait_until_exists()

# It is possible for create_bucket to return but
# for an immediate request for the bucket region to
# produce an S3ResponseError with code
# NoSuchBucket. We let that kick us back up to the
# main retry loop.
assert (get_bucket_region(bucket_name) == self.region),\
f"bucket_name: {bucket_name}, {get_bucket_region(bucket_name)} != {self.region}"

tags = build_tag_dict_from_env()

if tags:
flat_tags = flatten_tags(tags)
bucket_tagging = self.s3_resource.BucketTagging(bucket_name)
bucket_tagging.put(Tagging={'TagSet': flat_tags})

# Configure bucket so that we can make objects in
# it public, which was the historical default.
enable_public_objects(bucket_name)
elif block:
raise
else:
logger.debug(f"Created new job store bucket '{bucket_name}' with versioning state {versioning}.")
return None
elif error_http_status == 301:
# This is raised if the user attempts to get a bucket in a region outside
# the specified one, if the specified one is not `us-east-1`. The us-east-1
# server allows a user to use buckets from any region.
raise BucketLocationConflictException(get_bucket_region(bucket_name))
else:
raise
else:
bucketRegion = get_bucket_region(bucket_name)
if bucketRegion != self.region:
raise BucketLocationConflictException(bucketRegion)

if versioning and not bucketExisted:
# only call this method on bucket creation
bucket.Versioning().enable()
# Now wait until versioning is actually on. Some uploads
# would come back with no versions; maybe they were
# happening too fast and this setting isn't sufficiently
# consistent?
time.sleep(1)
while not self._getBucketVersioning(bucket_name):
logger.warning(f"Waiting for versioning activation on bucket '{bucket_name}'...")
time.sleep(1)
elif check_versioning_consistency:
# now test for versioning consistency
# we should never see any of these errors since 'versioning' should always be true
bucket_versioning = self._getBucketVersioning(bucket_name)
if bucket_versioning != versioning:
assert False, 'Cannot modify versioning on existing bucket'
elif bucket_versioning is None:
assert False, 'Cannot use a bucket with versioning suspended'
if bucketExisted:
logger.debug(f"Using pre-existing job store bucket '{bucket_name}'.")
else:
logger.debug(f"Created new job store bucket '{bucket_name}' with versioning state {versioning}.")

return bucket
return bucket

def _bindDomain(self, domain_name: str, create: bool = False, block: bool = True) -> None:
"""
Expand Down
15 changes: 13 additions & 2 deletions src/toil/lib/aws/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import List
from typing import Dict, Any, Optional, List

from mypy_boto3_s3.type_defs import ListMultipartUploadsOutputTypeDef
from mypy_boto3_s3.type_defs import ListMultipartUploadsOutputTypeDef, HeadBucketOutputTypeDef

from toil.lib.aws import session, AWSServerErrors
from toil.lib.retry import retry
Expand All @@ -23,6 +23,17 @@


@retry(errors=[AWSServerErrors])
def head_s3_bucket(bucket: str, region: Optional[str] = None) -> HeadBucketOutputTypeDef:
    """
    Issue a HEAD request against an S3 bucket and return the raw response.

    The call succeeds only when the bucket exists and the caller is allowed
    to access it; otherwise botocore raises a ClientError. AWS server-side
    errors are retried via the ``@retry`` decorator.

    NOTE(review): callers must use the lowercase ``bucket=`` keyword — this
    is not boto3's ``Bucket=`` parameter; confirm call sites match.

    :param bucket: AWS bucket name
    :param region: Region that we want to look for the bucket in
    """
    client = session.client("s3", region_name=region)
    response = client.head_bucket(Bucket=bucket)
    return response


def list_multipart_uploads(bucket: str, region: str, prefix: str, max_uploads: int = 1) -> ListMultipartUploadsOutputTypeDef:
    """
    List in-progress multipart uploads in an S3 bucket and return the response.

    :param bucket: AWS bucket name
    :param region: Region to create the S3 client in
    :param prefix: Only uploads whose keys begin with this prefix are listed
    :param max_uploads: Maximum number of uploads to return (defaults to 1)
    """
    client = session.client("s3", region_name=region)
    return client.list_multipart_uploads(
        Bucket=bucket, Prefix=prefix, MaxUploads=max_uploads
    )