diff --git a/.secrets.baseline b/.secrets.baseline index 2ee1138..90c6f4f 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -3,7 +3,7 @@ "files": "^.secrets.baseline$", "lines": null }, - "generated_at": "2023-03-29T12:57:17Z", + "generated_at": "2023-06-16T15:45:19Z", "plugins_used": [ { "name": "AWSKeyDetector" @@ -60,21 +60,21 @@ "hashed_secret": "dc724af18fbdd4e59189f5fe768a5f8311527050", "is_secret": false, "is_verified": false, - "line_number": 43, + "line_number": 129, "type": "Secret Keyword" }, { "hashed_secret": "a9dfb7e3922027233b135425b52be3d2befba04a", "is_secret": false, "is_verified": false, - "line_number": 200, + "line_number": 190, "type": "Hex High Entropy String" }, { "hashed_secret": "a54e9b6f5de30071ba228af8111ab64dddea5aec", "is_secret": false, "is_verified": false, - "line_number": 203, + "line_number": 193, "type": "Hex High Entropy String" } ], diff --git a/.travis.yml b/.travis.yml index 9dc99a5..cca5790 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,6 @@ language: python dist: focal python: - - "3.6" - "3.7" - "3.8" - "3.9" diff --git a/cdisutils/net.py b/cdisutils/net.py index a6e54c7..31f975d 100644 --- a/cdisutils/net.py +++ b/cdisutils/net.py @@ -3,14 +3,6 @@ import os import functools -# Migrated to .storage, re-exported to maintain interface -from .storage import ( # noqa - BotoManager, - cancel_stale_multiparts, - md5sum_with_size, - url_for_boto_key, -) - class ContextDecorator: def __call__(self, f): diff --git a/cdisutils/storage.py b/cdisutils/storage.py deleted file mode 100644 index 1a744e7..0000000 --- a/cdisutils/storage.py +++ /dev/null @@ -1,681 +0,0 @@ -""" -cdisutils.storage ----------------------------------- - -Utilities for working with object stores using boto/libcloud. - -TODO: add tests - -""" -import hashlib -import json -import re -import socket -import ssl - -import boto -from boto.s3 import connection as boto_connection -from dateutil import parser -from datetime import timedelta, datetime - -import cdisutils.log -import cdisutils.parsers - - -class S3ConnectionProxyFix(boto_connection.S3Connection): - """Custom S3Connection class to workaround boto proxy issue. - - This fix will apply to any Python 3 boto code that attempts to interact with - Amazon S3 because it needs a proxy to access it. - - With the Python 3 upgrade came breaking changes specifically pertaining to - bytes and strings. The sendall() socket command no longer accepts strings. - We have to convert them all to bytes before sending. - - There have been attempts to merge a fix for this into the main boto branch - but they have not been successful. 
- - https://github.com/boto/boto/pull/2718 - - https://github.com/boto/boto/pull/3699 - """ - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.logger = cdisutils.log.get_logger("S3ConnectionProxyFix") - - def _get_host(self, host=None, port=None): - if host and port: - return "%s:%d" % (host, port) - else: - return "%s:%d" % (self.host, self.port) - - def _get_socket(self): - timeout = self.http_connection_kwargs.get("timeout") - if timeout is not None: - return socket.create_connection((self.proxy, int(self.proxy_port)), timeout) - return socket.create_connection((self.proxy, int(self.proxy_port))) - - def _send_header(self, sock, host): - self.logger.debug("Proxy connection: CONNECT %s HTTP/1.0\r\n", host) - sock.sendall(("CONNECT %s HTTP/1.0\r\n" % host).encode()) - sock.sendall(("User-Agent: %s\r\n" % boto.UserAgent).encode()) - if self.proxy_user and self.proxy_pass: - for k, v in self.get_proxy_auth_header().items(): - sock.sendall((f"{k}: {v}\r\n").encode()) - # See discussion about this config option at - # https://groups.google.com/forum/?fromgroups#!topic/boto-dev/teenFvOq2Cc - if boto.config.getbool("Boto", "send_crlf_after_proxy_auth_headers", False): - sock.sendall(b"\r\n") - else: - sock.sendall(b"\r\n") - - def _check_conn(self, sock): - try: - resp = boto.compat.http_client.HTTPResponse(sock, debuglevel=self.debug) - resp.begin() - - if resp.status != 200: - # Fake a socket error, use a code that make it obvious it hasn't - # been generated by the socket library - raise OSError( - -71, - "Error talking to HTTP proxy %s:%s: %s (%s)" - % (self.proxy, self.proxy_port, resp.status, resp.reason), - ) - - except Exception as e: - self.logger.error(e) - - finally: - # We can safely close the response, it duped the original socket - resp.close() - - def proxy_ssl(self, host=None, port=None): - host = self._get_host(host, port) - sock = self._get_socket() - self._send_header(sock, host) - self._check_conn(sock) - - h = boto.compat.http_client.HTTPConnection(host) - - if self.https_validate_certificates: - msg = "wrapping ssl socket for proxied connection; " - if self.ca_certificates_file: - msg += "CA certificate file=%s" % self.ca_certificates_file - else: - msg += "using system provided SSL certs" - self.logger.debug(msg) - key_file = self.http_connection_kwargs.get("key_file", None) - cert_file = self.http_connection_kwargs.get("cert_file", None) - sslSock = ssl.wrap_socket( - sock, - keyfile=key_file, - certfile=cert_file, - cert_reqs=ssl.CERT_REQUIRED, - ca_certs=self.ca_certificates_file, - ) - cert = sslSock.getpeercert() - hostname = self.host.split(":", 0)[0] - if not boto.https_connection.ValidateCertificateHostname(cert, hostname): - raise boto.https_connection.InvalidCertificateException( - hostname, cert, "hostname mismatch" - ) - else: - # Fallback for old Python without ssl.wrap_socket - if hasattr(boto.compat.http_client, "ssl"): - sslSock = boto.compat.http_client.ssl.SSLSocket(sock) - else: - sslSock = socket.ssl(sock, None, None) - sslSock = boto.compat.http_client.FakeSocket(sock, sslSock) - - # This is a bit unclean - h.sock = sslSock - return h - - -def url_for_boto_key(key): - template = "s3://{host}/{bucket}/{name}" - return template.format( - host=key.bucket.connection.host, bucket=key.bucket.name, name=key.name - ) - - -def md5sum_with_size(iterable): - """ - Get md5sum and size given an iterable (eg: a boto key) - """ - md5 = hashlib.md5() - size = 0 - for chunk in iterable: - md5.update(chunk) - size += len(chunk) - return 
md5.hexdigest(), size - - -def cancel_stale_multiparts(bucket, stale_days=7): - """ - Cancel uploads that are stale for more than [stale_days] - File state shouldn't be effected as there might be ongoing upload for this key - """ - uploads = bucket.get_all_multipart_uploads() - for upload in uploads: - initiated = parser.parse(upload.initiated) - if (datetime.now(initiated.tzinfo) - initiated) > timedelta(days=stale_days): - upload.cancel_upload() - - -def filter_s3_urls(urls): - return [url for url in urls if cdisutils.parsers.S3URLParser(url).scheme == "s3"] - - -class StorageError(Exception): - pass - - -class KeyLookupError(LookupError, StorageError): - pass - - -class StorageClient: - """Class that abstracts away storage interfaces""" - - log = cdisutils.log.get_logger("storage_client") - - def __init__(self, boto_manager): - """Constructs a StorageClient - - :param s3_config: - BotoManager or config for BotoManager - - """ - self.boto_manager = boto_manager - - @classmethod - def from_configs(cls, boto_manager, **kwargs): - return cls(boto_manager=BotoManager(**boto_manager), **kwargs) - - def get_key_from_urls(self, urls): - """Loop through list of urls to fetch boto key - - :raises: :class:.KeyLookupError if no urls succeeded - - """ - - remaining_urls = filter_s3_urls(urls) - if not urls: - raise KeyLookupError(f"No s3 urls found in {urls}") - self.log.debug(f"using {remaining_urls} s3 of {urls}") - - errors = {} - - while remaining_urls: - url = remaining_urls.pop(0) - self.log.debug(f"fetching key: '{url}'") - - try: - return self.boto_manager.get_url(url) - except Exception as e: - self.log.warning(f"failed to fetch '{url}': {e}") - errors[url] = e - - raise KeyLookupError(f"failed to fetch key from any: {errors}") - - -class BotoManager: - """ - A class that abstracts away boto calls to multiple underlying - object stores. Given a map from hostname -> arguments to - connect_s3, it will maintain connections to all of those hosts - which can be used transparently through this object. - """ - - log = cdisutils.log.get_logger("boto_manager") - - def __init__(self, config={}, lazy=False, host_aliases={}, stream_status=False): - """ - Config map should be a map from hostname to args, e.g.: - { - "cleversafe.service.consul: { - "aws_access_key_id": "foo", - "aws_secret_access_key": "bar", - "is_secure": False, - . . . - }, - } - - :param host_aliases: - A *REGEX* map from names that match the regex to hostnames - provided in config - e.g. 
``{'aws\.accessor1\.mirror': 'cleversafe.service.consul'}`` - """ - - self.config = config - for host, kwargs in self.config.items(): - # we need to pass the host argument in when we connect, so - # set it here - kwargs["host"] = host - if "calling_format" not in kwargs: - kwargs["calling_format"] = boto_connection.OrdinaryCallingFormat() - - self.host_aliases = host_aliases - - self.conns = {} - if not lazy: - self.connect() - - self.s3_inst_info = { - "ceph": { - "secure": True, - "url": "ceph.service.consul", - "access_key": "", - "secret_key": "", - }, - "ceph2": { - "secure": True, - "url": "gdc-cephb-objstore.osdc.io", - "access_key": "", - "secret_key": "", - }, - "cleversafe": { - "secure": True, - "url": "gdc-accessors.osdc.io", - "access_key": "", - "secret_key": "", - }, - } - self.stream_status = stream_status - - # magic number here for multipart chunk size, change with care - self.mp_chunk_size = 1073741824 # 1GiB - - # semi-magic number here, worth playing with if speed issues seen - self.chunk_size = 16777216 - - @property - def hosts(self): - return self.conns.keys() - - def connect(self): - for host, kwargs in self.config.items(): - self.conns[host] = S3ConnectionProxyFix(**kwargs) - - def new_connection_to(self, host): - return S3ConnectionProxyFix(**self.config[host]) - - def __getitem__(self, host): - return self.get_connection(host) - - def harmonize_host(self, host): - matches = { - alias: aliased_host - for alias, aliased_host in self.host_aliases.items() - if re.match(alias, host) - } - - if len(matches) > 1: - self.log.warning(f"matched multiple aliases: {matches}") - - if matches: - self.log.info("using matched aliases: {}".format(matches.keys())) - return next(iter(matches.values())) - else: - return host - - def get_connection(self, host): - return self.conns[self.harmonize_host(host)] - - def get_url( - self, - url, - get_bucket_validate=True, - get_bucket_headers=None, - get_key_validate=True, - get_key_headers=None, - ): - """ - Parse an s3://host/bucket/key formatted url and return the - corresponding boto Key object. 
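[Editor's note: a minimal sketch of how the BotoManager interface removed by this patch was typically driven; the hostname, credentials, and alias pattern below are placeholders taken from the docstrings above, not real endpoints.]

    from cdisutils.storage import BotoManager  # module removed by this change

    # Placeholder endpoint and credentials, mirroring the docstring example above.
    config = {
        "cleversafe.service.consul": {
            "aws_access_key_id": "foo",
            "aws_secret_access_key": "bar",
            "is_secure": False,
        },
    }
    # Regex aliases map alternate names onto hosts defined in `config`.
    aliases = {r"aws\.accessor1\.mirror": "cleversafe.service.consul"}

    manager = BotoManager(config=config, host_aliases=aliases, lazy=True)

    # get_url() parses an s3://host/bucket/key URL, resolves the aliased host,
    # and returns the corresponding boto Key object.
    key = manager.get_url("s3://aws.accessor1.mirror/some-bucket/path/to/object")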
- """ - parsed_url = cdisutils.parsers.S3URLParser(url) - scheme = parsed_url.scheme - if scheme != "s3": - raise RuntimeError(f"{url} is not an s3 url") - host = parsed_url.netloc - bucket = self.get_connection(host).get_bucket( - parsed_url.bucket, - validate=get_bucket_validate, - headers=get_bucket_headers, - ) - return bucket.get_key( - parsed_url.key, validate=get_key_validate, headers=get_key_headers - ) - - def list_buckets(self, host=None): - total_files = 0 - if host: - if host in self.conns: - bucket_list = self.conns[host].get_all_buckets() - for bucket in bucket_list: - file_count, bucket_size = self.get_count_of_keys_in_s3_bucket( - conn, bucket.name - ) - self.log.info(bucket.name, file_count) - total_files = total_files + file_count - - self.log.info(f"{total_files} files in {len(bucket_list)} buckets") - else: - self.log.error(f"No connection to host {host} found") - else: - self.log.error("No host given") - - def get_s3_bucket(self, host=None, bucket_name=None): - bucket = None - if host: - if host in self.conns: - try: - self.log.info(f"Getting bucket {bucket_name} from {host}") - bucket = self.conns[host].get_bucket(bucket_name) - except Exception as e: - if e.error_code == "NoSuchBucket": - print("Bucket not found") - else: - self.log.error(e) - else: - self.log.error(f"No connection to host {host} found") - else: - self.log.error("No host given") - - self.log.info(f"Returning {bucket}") - return bucket - - def upload_file( - self, host=None, bucket_name=None, file_name=None, key_name=None, calc_md5=False - ): - if calc_md5: - md5_sum = md5.new() - - bucket = self.get_s3_bucket(host=host, bucket_name=bucket_name) - if not bucket: - bucket = self.conns[host].create_bucket(bucket_name) - - new_key = Key(bucket) - self.log.info(f"Creating key: {key_name}") - new_key.key = key_name - new_key.set_contents_from_filename(file_name) - - def get_all_s3_files(self): - all_s3_files = {} - for s3_inst in self.s3_inst_info.keys(): - try: - cs_conn = self.connect_to_s3(s3_inst) - except: - self.log.warn("Unable to connect to %s, skipping" % s3_inst) - else: - self.log.info("Connected to S3, cs_conn = ", cs_conn) - if cs_conn != None: - all_s3_files[s3_inst] = self.get_s3_file_counts(cs_conn, s3_inst) - return all_s3_files - - def get_all_bucket_names(self, host=None): - bucket_names = [] - try: - bucket_list = conn.get_all_buckets() - except Exception as e: - self.log.error("Unable to list buckets: %s" % e) - else: - for instance in bucket_list: - bucket_names.append(instance.name) - return bucket_names - - def print_running_status(self, transferred_bytes, start_time): - size_info = self.get_nearest_file_size(transferred_bytes) - cur_time = time.clock() - base_transfer_rate = float(transferred_bytes) / float(cur_time - start_time) - transfer_info = self.get_nearest_file_size(base_transfer_rate) - cur_conv_size = float(transferred_bytes) / float(size_info[0]) - cur_conv_rate = base_transfer_rate / float(transfer_info[0]) - sys.stdout.write( - "{:7.02f} {} : {:6.02f} {} per sec\r".format( - cur_conv_size, size_info[1], cur_conv_rate, transfer_info[1] - ) - ) - sys.stdout.flush() - - def get_file_key(self, host=None, bucket_name=None, key_name=None): - key = None - bucket = self.get_s3_bucket(host=host, bucket_name=bucket_name) - if bucket: - self.log.info(f"Getting key {key_name}") - key = bucket.get_key(key_name) - - return key - - def load_file( - self, - uri=None, - host=None, - bucket_name=None, - key_name=None, - stream_status=False, - chunk_size=16777216, - ): - - downloading = 
True - error = False - file_data = bytearray() - total_transfer = 0 - chunk = [] - if uri: - uri_data = cdisutils.parsers.S3URLParser(uri) - host = uri_data.netloc - os_name = uri_data.netloc.split(".")[0] - bucket_name = uri_data.bucket - key_name = uri_data.key - - if not self.conns[host]: - self.log.info("Making OS connections") - conn = self.connect() - else: - conn = self.conns[host] - - # get the key from the bucket - self.log.info(f"Getting {key_name} from {bucket_name}") - try: - file_key = self.get_file_key( - host=host, bucket_name=bucket_name, key_name=key_name - ) - except Exception as e: - self.log.error(f"Unable to get {key_name} from {bucket_name}") - self.log.error(e) - else: - if file_key: - while downloading == True: - try: - chunk = file_key.read(size=chunk_size) - except Exception as e: - error = True - downloading = False - self.log.error( - "Error {} reading bytes, got {} bytes".format( - str(e), len(chunk) - ) - ) - total_transfer = total_transfer + len(chunk) - else: - if len(chunk) < chunk_size: - downloading = False - total_transfer += len(chunk) - file_data.extend(chunk) - if stream_status: - sys.stdout.write( - "{:6.02}%%\r".format( - float(total_transfer) / float(file_key.size) * 100.0 - ) - ) - sys.stdout.flush() - else: - self.log.warn(f"Unable to find key {os_name}/{bucket_name}/{key_name}") - - self.log.info(f"{len(str(file_data))} lines received") - return str(file_data) - - def parse_data_file(self, uri=None, data_type="tsv", custom_delimiter=None): - """Processes loaded data as a tsv, csv, or - json, returning it as a list of dicts""" - key_data = [] - header = None - skipped_lines = 0 - delimiters = {"tsv": "\t", "csv": ",", "json": "", "other": ""} - other_delimiters = [" ", ",", ";"] - - file_data = self.load_file(uri=uri) - - if data_type not in delimiters.keys(): - print("Unable to process data type %s" % data_type) - print("Valid data types:") - print(list(delimiters.keys())) - else: - if data_type == "other": - if custom_delimiter: - delimiter = custom_delimiter - else: - print("With data_type 'other', a delimiter is needed") - raise - else: - delimiter = delimiters[data_type] - - if data_type == "json": - for line in file_data.split("\n"): - line_data = json.loads(line) - key_data.append(line_data) - # load as tsv/csv, assuming the first row is the header - # that provides keys for the dict - else: - for line in file_data.split("\n"): - if delimiter in line: - if len(line.strip("\n").strip()): - if not header: - header = line.strip("\n").split(delimiter) - else: - line_data = dict( - zip(header, line.strip("\n").split(delimiter)) - ) - key_data.append(line_data) - else: - # ok, let's see if we can be smart here - # if not header: - # remaining_chars = set([c for c in line if not c.isalnum()]) - skipped_lines += 1 - - print( - "%d lines in file, %d processed" - % (len(file_data.split("\n")), len(key_data)) - ) - return key_data - - def md5_s3_key(self, conn, which_bucket, key_name, chunk_size=16777216): - result = {"transfer_time": 0, "bytes_transferred": 0} - m = md5.new() - sha = hashlib.sha256() - size = 0 - retries = 0 - result["start_time"] = time.time() - error = False - running = False - file_key = self.get_file_key(conn, which_bucket, key_name) - if file_key: - running = True - file_key.BufferSize = self.chunk_size - else: - self.log.warning(f"Unable to find key {which_bucket} {key_name}") - - while running == True: - try: - # chunk = file_key.read() - chunk = file_key.read(size=chunk_size) - except: - if len(chunk) == 0: - if retries > 
10: - print("Error reading bytes") - error = True - break - else: - retries += 1 - print("%d: Error reading bytes, retry %d" % (id, retries)) - time.sleep(2) - else: - print( - "%d: Error %s reading bytes, got %d bytes" - % (id, str((sys.exc_info())[1]), len(chunk)) - ) - total_transfer = total_transfer + len(chunk) - m.update(chunk) - sha.update(chunk) - retries = 0 - else: - if len(chunk) == 0: - running = False - result["bytes_transferred"] += len(chunk) - if file_key.size > 0: - sys.stdout.write( - "%6.02f%%\r" - % ( - float(result["bytes_transferred"]) - / float(file_key.size) - * 100.0 - ) - ) - else: - sys.stdout.write("0.00%%\r") - - sys.stdout.flush() - m.update(chunk) - sha.update(chunk) - retries = 0 - result["transfer_time"] = time.time() - result["start_time"] - result["md5_sum"] = m.hexdigest() - result["sha256_sum"] = sha.hexdigest() - return result - - -def is_probably_swift_segments(obj): - """The way OpenStack swift works is that you can't upload anything - larger than 5GB. When you try to do so, swift instead uploads each - 5GB chunk of that object to it's own 'segment' (which is in a - special segments bucket) and instead of the thing you wanted to - upload, stores a json doc with information about how to get the - pieces of the thing you actually wanted. This function is a - heuristic predicate that returns True if the libcloud object - passed in is probably one of these swift segment indicators (note - that I say probably because there is no way to be sure, its - possible, although unlikely, that the key really does consist of a - jsob blob that just happens to look like a segment indicatior). - """ - if obj.size > 10**6: - return False - try: - blob = json.loads("".join(obj.as_stream())) - except ValueError: - return False - if ( - type(blob) == list - and len(blob) > 0 - and "path" in blob[0] - and "etag" in blob[0] - and "size_bytes" in blob[0] - ): - return True - else: - return False - - -def swift_stream(obj): - """Given a libcloud object containing one of the aforementioned swift - segment indicators, return a generator that yields byte chunks of - the chunked object it represents. 
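[Editor's note: to make the swift-segment heuristic above concrete, here is roughly the manifest shape it looks for and the same shape check it applies; the paths, etags, and sizes are invented for illustration.]

    import json

    # Invented segment manifest of the shape is_probably_swift_segments() expects.
    manifest_text = json.dumps([
        {"path": "/big_uploads_segments/huge.bam/000001",
         "etag": "d41d8cd98f00b204e9800998ecf8427e",
         "size_bytes": 5368709120},
        {"path": "/big_uploads_segments/huge.bam/000002",
         "etag": "0cc175b9c0f1b6a831c399e269772661",
         "size_bytes": 1073741824},
    ])

    blob = json.loads(manifest_text)
    # Same shape test the predicate applies after ruling out objects > 1 MB.
    assert isinstance(blob, list) and blob
    assert all(k in blob[0] for k in ("path", "etag", "size_bytes"))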
- """ - doc = json.loads("".join(obj.as_stream())) - driver = obj.container.driver - for segment in doc: - container_name, obj_name = segment["path"].split("/", 2)[1:3] - bucket = driver.get_container(container_name) - yield from bucket.get_object(obj_name).as_stream() diff --git a/dev-requirements.txt b/dev-requirements.txt index 78ebeda..d38d013 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,26 +1,19 @@ # -# This file is autogenerated by pip-compile with python 3.6 -# To update, run: +# This file is autogenerated by pip-compile with Python 3.7 +# by the following command: # -# pip-compile --extra=dev --output-file=dev-requirements.txt +# pip-compile --extra=dev --output-file=dev-requirements.txt --resolver=backtracking setup.py # atomicwrites==1.4.0 # via pytest attrs==19.3.0 # via pytest -aws-xray-sdk==2.11.0 - # via moto -boto==2.49.0 +boto3==1.26.153 # via # cdisutils (setup.py) # moto -boto3==1.16.63 +botocore==1.29.153 # via - # cdisutils (setup.py) - # moto -botocore==1.19.63 - # via - # aws-xray-sdk # boto3 # moto # s3transfer @@ -28,29 +21,32 @@ certifi==2020.6.20 # via requests cffi==1.15.1 # via cryptography -chardet==3.0.4 +charset-normalizer==3.1.0 # via requests -click==7.1.2 +click==8.1.3 # via flask -cookies==2.2.1 - # via moto coverage[toml]==5.2.1 # via pytest-cov -cryptography==3.2.1 +cryptography==41.0.1 # via moto -docker==4.4.4 - # via moto -flask==1.1.4 +flask==2.2.5 + # via + # cdisutils (setup.py) + # flask-cors +flask-cors==3.0.10 # via cdisutils (setup.py) idna==2.10 # via requests -importlib-metadata==0.23 +importlib-metadata==6.6.0 # via + # click + # flask + # moto # pluggy # pytest -itsdangerous==1.1.0 +itsdangerous==2.1.2 # via flask -jinja2==2.11.2 +jinja2==3.1.2 # via # flask # moto @@ -58,15 +54,13 @@ jmespath==0.10.0 # via # boto3 # botocore -jsondiff==1.1.1 - # via moto -markupsafe==1.1.1 - # via jinja2 -mock==3.0.5 - # via moto +markupsafe==2.1.3 + # via + # jinja2 + # werkzeug more-itertools==5.0.0 # via pytest -moto==1.2.0 +moto==4.1.11 # via cdisutils (setup.py) packaging==20.4 # via pytest @@ -74,8 +68,6 @@ pluggy==0.13.1 # via pytest py==1.9.0 # via pytest -pyaml==21.10.1 - # via moto pycparser==2.21 # via cffi pyparsing==2.4.7 @@ -91,46 +83,46 @@ python-dateutil==2.8.2 # botocore # cdisutils (setup.py) # moto -pytz==2020.1 - # via moto pyyaml==5.3.1 # via # cdisutils (setup.py) - # pyaml -requests==2.24.0 + # responses +requests==2.31.0 # via - # docker + # cdisutils (setup.py) # moto -s3transfer==0.3.7 + # responses +responses==0.23.1 + # via moto +s3transfer==0.6.1 # via boto3 six==1.16.0 # via - # cryptography - # docker - # mock + # flask-cors # more-itertools - # moto # packaging # pytest # python-dateutil - # websocket-client toml==0.10.1 # via coverage -urllib3==1.25.11 +types-pyyaml==6.0.12.10 + # via responses +typing-extensions==4.6.3 + # via + # importlib-metadata + # responses +urllib3==1.26.16 # via # botocore # cdisutils (setup.py) # requests + # responses wcwidth==0.2.5 # via pytest -websocket-client==0.59.0 - # via docker -werkzeug==1.0.1 +werkzeug==2.2.3 # via # flask # moto -wrapt==1.15.0 - # via aws-xray-sdk xmltodict==0.12.0 # via moto zipp==1.2.0 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..67e107f --- /dev/null +++ b/requirements.txt @@ -0,0 +1,30 @@ +# +# This file is autogenerated by pip-compile with Python 3.7 +# by the following command: +# +# pip-compile --output-file=requirements.txt --resolver=backtracking setup.py +# +boto3==1.26.153 + # via cdisutils 
(setup.py) +botocore==1.29.153 + # via + # boto3 + # s3transfer +jmespath==1.0.1 + # via + # boto3 + # botocore +python-dateutil==2.8.2 + # via + # botocore + # cdisutils (setup.py) +pyyaml==5.4.1 + # via cdisutils (setup.py) +s3transfer==0.6.1 + # via boto3 +six==1.16.0 + # via python-dateutil +urllib3==1.26.16 + # via + # botocore + # cdisutils (setup.py) diff --git a/setup.py b/setup.py index 431a2b6..ca190e3 100644 --- a/setup.py +++ b/setup.py @@ -18,19 +18,18 @@ long_description_content_type="text/markdown", classifiers=[ "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", "License :: OSI Approved :: Apache Software License", ], - python_requires=">=3.6, <4", + python_requires=">=3.7, <4", packages=["cdisutils"], # Note that some of these ranges have generous lower bounds since some # consumers of cdisutils might still expect older versions. install_requires=[ - "boto~=2.36", - "boto3~=1.9", + "boto3~=1.26", "python-dateutil~=2.4", "PyYAML>=3.11,<6.0", - "urllib3>=1.0,<1.26", # for moto and boto2 + "urllib3~=1.26", ], # Some lesser-used parts of cdisutils require extra dependencies. extras_require={ @@ -45,10 +44,12 @@ # bin/nova_status.py "nova": ["python-novaclient~=3.2"], "dev": [ - "moto>1", + "moto~=4.1", + "requests~=2.31", "pytest>4.6", "pytest-cov>2.10", - "flask", + "flask~=2.0", # required for moto_server + "flask_cors~=3.0", # required for moto_server ], }, ) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 8e55191..768f5af 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -18,11 +18,11 @@ class MotoServer(Thread): without interfering with other aspects of the code """ - def __init__(self, port=7000, is_secure=True): + def __init__(self, hostname="localhost", port=7000, is_secure=True): Thread.__init__(self) self._proc = None self.port = port - self.hostname = "localhost" + self.hostname = hostname self.server_id = str(uuid.uuid4()) self.is_secure = is_secure @@ -83,6 +83,27 @@ def stop(self): logger.info("Shutdown successful for mock Boto Service id: " + self.server_id) +@pytest.fixture +def moto_server_factory(): + """Fixure factory + + Yields: + function: Creates a function that can be invoked to start multiple moto servers. 
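[Editor's note: a minimal usage sketch for this fixture factory, assuming arbitrary local ports; every started server is stopped by the fixture's teardown loop below.]

    def test_two_mock_s3_backends(moto_server_factory):
        # Each call starts an independent moto server on its own port.
        server_a = moto_server_factory(hostname="localhost", port=7001)
        server_b = moto_server_factory(hostname="localhost", port=7002)

        # MotoServer exposes the host:port it serves on via .url, as the new
        # integration test asserts.
        assert server_a.url == "localhost:7001"
        assert server_b.url == "localhost:7002"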
+ """ + servers = [] + + def _gen_moto_server(hostname="localhost", port=7000, is_secure=True): + server = MotoServer(hostname=hostname, port=port, is_secure=is_secure) + server.start() + servers.append(server) + return server + + yield _gen_moto_server + + for server in servers: + server.stop() + + @pytest.fixture(scope="module") def moto_server(): diff --git a/tests/integration/test_storage.py b/tests/integration/test_storage.py deleted file mode 100644 index 3241cd3..0000000 --- a/tests/integration/test_storage.py +++ /dev/null @@ -1,51 +0,0 @@ -import pytest - -from cdisutils.storage import ( - cancel_stale_multiparts, - md5sum_with_size, - url_for_boto_key, -) - -import boto - - -@pytest.fixture -def get_bucket(moto_server_no_ssl): - conn = boto.connect_s3( - host="localhost", - port=7000, - calling_format="boto.s3.connection.OrdinaryCallingFormat", - is_secure=False, - aws_access_key_id="testing", - aws_secret_access_key="testing", - ) - buck = conn.create_bucket("foo") - return buck - - -def test_url_for_boto_key(get_bucket): - key = get_bucket.new_key("bar") - assert url_for_boto_key(key) == "s3://localhost/foo/bar" - - -def test_cancel_stale_multiparts(get_bucket): - upload = get_bucket.initiate_multipart_upload("test_key") - cancel_stale_multiparts(get_bucket) - # moto hard coded multipart initiate date to 2010-11-10T20:48:33.000Z, - # so newly created uploads actually are created > 7 days. - assert len(get_bucket.get_all_multipart_uploads()) == 0 - - -def test_cancel_stale_multiparts_does_not_cancel_active_uploads(get_bucket): - upload = get_bucket.initiate_multipart_upload("test_key") - # moto hard coded multipart initiate date to 2010-11-10T20:48:33.000Z - cancel_stale_multiparts(get_bucket, stale_days=100000000) - assert len(get_bucket.get_all_multipart_uploads()) == 1 - - -def test_md5sum_with_size(get_bucket): - key = get_bucket.new_key("test_key") - key.set_contents_from_string("test") - (md5, size) = md5sum_with_size(key) - assert size == key.size - assert md5.encode("ascii") == key.md5 diff --git a/tests/integration/test_storage3.py b/tests/integration/test_storage3.py index 2a937f6..d800df5 100644 --- a/tests/integration/test_storage3.py +++ b/tests/integration/test_storage3.py @@ -1,9 +1,11 @@ import os - +import time +import typing import boto3 import pytest from cdisutils.storage3 import Boto3Manager +from tests.integration.conftest import MotoServer LARGE_NUMBER_TO_WRITE = 10000000 @@ -13,7 +15,7 @@ TEST_BUCKET = "test_bucket" -def get_config(): +def get_config() -> typing.Dict: return { "localhost:7000": { "aws_secret_access_key": "testing", @@ -97,88 +99,76 @@ def test_list_buckets(): assert bucket_list[0]["Name"] == TEST_BUCKET -@pytest.mark.usefixtures("create_large_object") -def test_create_multipart_upload(): - config = get_config() - manager = Boto3Manager(config) - src_url = f"s3://localhost:7000/{TEST_BUCKET}/{ORIGINAL_FILE_NAME}" - dst_url = f"s3://localhost:7000/{TEST_BUCKET}/{COPIED_FILE_NAME}" - mp_info = manager.create_multipart_upload(src_url=src_url, dst_url=dst_url) - - assert "md5_sum" in mp_info - mp_info.pop("md5_sum") - assert "sha256_sum" in mp_info - mp_info.pop("sha256_sum") - assert "mp_id" in mp_info - mp_info.pop("mp_id") - assert "stream_buffer" in mp_info - mp_info.pop("stream_buffer") - assert "start_time" in mp_info - mp_info.pop("start_time") - - assert mp_info == { - "dst_info": { - "url": "s3://localhost:7000/test_bucket/copied_file", - "s3_loc": "localhost:7000", - "bucket_name": "test_bucket", - "key_name": "copied_file", +def 
test_simulate_cleversafe_to_aws_multipart_copy( + create_large_file, moto_server_factory +): + """ + The multipart upload is used to support transferring large files from one S3 provider to another. + """ + # Start the servers + host_a = "localhost" + port_a = 7001 + url_a = f"{host_a}:{port_a}" + moto_server_a = moto_server_factory(hostname=host_a, port=port_a) + assert url_a == moto_server_a.url + + host_b = "localhost" + port_b = 7002 + url_b = f"{host_b}:{port_b}" + moto_server_b = moto_server_factory(hostname=host_b, port=port_b) + assert url_b == moto_server_b.url + + # Setup the BotoManager + s3_configs = { + f"{moto_server_a.url}": { + "aws_secret_access_key": "testing", + "aws_access_key_id": "testing", + "verify": False, }, - "src_info": { - "url": "s3://localhost:7000/test_bucket/original_file", - "s3_loc": "localhost:7000", - "bucket_name": "test_bucket", - "key_name": "original_file", + f"{moto_server_b.url}": { + "aws_secret_access_key": "testing", + "aws_access_key_id": "testing", + "verify": False, }, - "mp_chunk_size": 1073741824, - "download_chunk_size": 16777216, - "cur_size": 0, - "chunk_index": 1, - "total_size": 0, - "manifest": {"Parts": []}, } + manager = Boto3Manager(s3_configs) + conn_a = manager.get_connection(f"{moto_server_a.url}") + conn_b = manager.get_connection(f"{moto_server_b.url}") - -@pytest.mark.usefixtures("create_large_object") -def test_complete_multipart_upload(): - config = get_config() - manager = Boto3Manager(config) - src_url = f"s3://localhost:7000/{TEST_BUCKET}/{ORIGINAL_FILE_NAME}" - dst_url = f"s3://localhost:7000/{TEST_BUCKET}/{COPIED_FILE_NAME}" - mp_info = manager.create_multipart_upload(src_url=src_url, dst_url=dst_url) - - manager.complete_multipart_upload(mp_info=mp_info) - - conn = manager.get_connection("localhost:7000") - head = conn.head_object(Bucket=TEST_BUCKET, Key=ORIGINAL_FILE_NAME) + # load in the initial data + conn_a.create_bucket(Bucket=TEST_BUCKET) + conn_a.put_object( + Body=open(str(create_large_file), "rb"), + Bucket=TEST_BUCKET, + Key=ORIGINAL_FILE_NAME, + ) + head = conn_a.head_object(Bucket=TEST_BUCKET, Key=ORIGINAL_FILE_NAME) assert head["ResponseMetadata"]["HTTPStatusCode"] == 200 assert head["ContentLength"] == LARGE_NUMBER_TO_WRITE * len("test") - conn.delete_object(Bucket=TEST_BUCKET, Key=COPIED_FILE_NAME) + # create the destination bucket + conn_b.create_bucket(Bucket=TEST_BUCKET) + # Copy files from one host to the other using multipart + src_url = f"s3://{host_a}:{port_a}/{TEST_BUCKET}/{ORIGINAL_FILE_NAME}" + dst_url = f"s3://{host_b}:{port_b}/{TEST_BUCKET}/{COPIED_FILE_NAME}" -@pytest.mark.usefixtures("create_large_object") -def test_copy_multipart_file(): - config = get_config() - manager = Boto3Manager(config) - src_url = f"s3://localhost:7000/{TEST_BUCKET}/{ORIGINAL_FILE_NAME}" - dst_url = f"s3://localhost:7000/{TEST_BUCKET}/{COPIED_FILE_NAME}" - mp_info = manager.create_multipart_upload(src_url=src_url, dst_url=dst_url) + src_info = manager.parse_url(src_url) + dst_info = manager.parse_url(dst_url) - res = manager.copy_multipart_file( - src_info=mp_info["src_info"], dst_info=mp_info["dst_info"] - ) + # copy_multipart_file invokes the other multipart operation: create_multipart_upload, complete_multipart_upload + res = manager.copy_multipart_file(src_info=src_info, dst_info=dst_info) assert res == { "md5_sum": "bc0354f0646794a755a4276435ec5a6c", "sha256_sum": "c97d1f1ab2ae91dbe05ad8e20bc58fc6f3af28e98d98ca8dbeee31a9d32e1e5b", "bytes_transferred": 40000000, } - conn = 
manager.get_connection("localhost:7000") - head = conn.head_object(Bucket=TEST_BUCKET, Key=ORIGINAL_FILE_NAME) + head = conn_b.head_object(Bucket=TEST_BUCKET, Key=COPIED_FILE_NAME) assert head["ResponseMetadata"]["HTTPStatusCode"] == 200 assert head["ContentLength"] == LARGE_NUMBER_TO_WRITE * len("test") - conn.delete_object(Bucket=TEST_BUCKET, Key=COPIED_FILE_NAME) + conn_b.delete_object(Bucket=TEST_BUCKET, Key=COPIED_FILE_NAME) @pytest.mark.usefixtures("create_large_object") diff --git a/tests/unit/test_storage_botomanager.py b/tests/unit/test_storage_botomanager.py deleted file mode 100644 index f1ddbda..0000000 --- a/tests/unit/test_storage_botomanager.py +++ /dev/null @@ -1,42 +0,0 @@ -from unittest.mock import MagicMock - -import pytest -from boto.s3.connection import S3Connection - -import cdisutils -from cdisutils.storage import BotoManager -from tests.unit.utils import get_config - - -def test_basic_connect(): - """These tests are admitedly rudimentary. I wanted to use moto but - moto works by intercepting calls to s3.amazonaws.com, so it - won't work for testing mutiple hosts like this -___- - """ - config = get_config() - manager = BotoManager(config) - aws_conn_mock = manager["s3.amazonaws.com"] - assert aws_conn_mock.host == "s3.amazonaws.com" - assert aws_conn_mock.is_secure is True - site_conn_mock = manager["s3.myinstallation.org"] - assert site_conn_mock.host == "s3.myinstallation.org" - assert site_conn_mock.is_secure is False - - -def connect_s3_mock(*args, **kwargs): - kwargs["spec"] = S3Connection - return MagicMock(*args, **kwargs) - - -def test_get_url(monkeypatch): - monkeypatch.setattr(cdisutils.storage, "S3ConnectionProxyFix", MagicMock()) - config = get_config() - manager = BotoManager(config) - manager.get_url("s3://s3.amazonaws.com/bucket/dir/key") - mock = manager["s3.amazonaws.com"] - mock.get_bucket.assert_called_once_with("bucket", headers=None, validate=True) - mock.get_bucket().get_key.assert_called_once_with( - "dir/key", headers=None, validate=True - ) - with pytest.raises(KeyError): - manager.get_url("s3://fake_host/bucket/dir/key") diff --git a/tests/unit/test_storage_client.py b/tests/unit/test_storage_client.py deleted file mode 100644 index 4176766..0000000 --- a/tests/unit/test_storage_client.py +++ /dev/null @@ -1,82 +0,0 @@ -""" -Test suite for utils that interact with the storage system -""" - -from cdisutils.storage import StorageClient - -import pytest - -INDEXD_CONFIG = dict(host="localhost", port=8000, auth=()) - - -@pytest.fixture -def storage_client(): - return StorageClient.from_configs( - { - "boto_manager": { - "config": { - "aws_access_key_id": "foo", - "aws_secret_access_key": "bar", - "is_secure": False, - }, - "lazy": True, - } - } - ) - - -@pytest.mark.parametrize( - "config", - [ - { - "boto_manager": { - "config": { - "localhost:5555": { - "aws_access_key_id": "foo", - "aws_secret_access_key": "bar", - "is_secure": False, - } - }, - "lazy": True, - } - }, - { - "boto_manager": { - "config": { - "localhost:5555": { - "aws_access_key_id": "foo", - "aws_secret_access_key": "bar", - "is_secure": False, - } - }, - "host_aliases": {".*service.dns.resolve": "localhost"}, - "lazy": True, - }, - }, - ], -) -def test_from_configs(config): - StorageClient.from_configs(**config) - - -@pytest.mark.parametrize( - "aliases,host,expected", - [ - ({r"aws\..*\.com": "a", r"aws\..*\.org": "b"}, "aws.custom.com", "a"), - ({r"aws\..*\.com": "b"}, "c", "c"), - ], -) -def test_aliased_connections(aliases, host, expected): - kwargs = 
{"aws_access_key_id": "", "aws_secret_access_key": ""} - StorageClient.from_configs( - **{ - "boto_manager": { - "config": { - "a": kwargs, - "b": kwargs, - "c": kwargs, - }, - "host_aliases": aliases, - } - } - ).boto_manager.get_connection(host).host == expected diff --git a/tox.ini b/tox.ini index 1da6a4a..4082999 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py36,py37,py38,py39 +envlist = py37,py38,py39 requires = pip >= 18.1 skip_missing_interpreters =
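[Editor's note: with the boto2-based cdisutils.storage module removed, the boto3-backed Boto3Manager in cdisutils.storage3 is the supported path. A short sketch assembled from the calls exercised in tests/integration/test_storage3.py; the endpoint and credentials are the tests' local-moto placeholders, not production values, and a single endpoint is used here only to keep the sketch short (the new test copies across two moto servers).]

    from cdisutils.storage3 import Boto3Manager

    # Placeholder endpoint/credentials matching the integration-test config.
    config = {
        "localhost:7000": {
            "aws_access_key_id": "testing",
            "aws_secret_access_key": "testing",
            "verify": False,
        },
    }
    manager = Boto3Manager(config)

    # get_connection() hands back a boto3-style S3 client for the given host.
    conn = manager.get_connection("localhost:7000")
    conn.create_bucket(Bucket="test_bucket")
    conn.put_object(Body=b"test", Bucket="test_bucket", Key="original_file")

    # parse_url() + copy_multipart_file() drive the multipart copy flow that
    # test_simulate_cleversafe_to_aws_multipart_copy exercises.
    src_info = manager.parse_url("s3://localhost:7000/test_bucket/original_file")
    dst_info = manager.parse_url("s3://localhost:7000/test_bucket/copied_file")
    result = manager.copy_multipart_file(src_info=src_info, dst_info=dst_info)
    print(result["bytes_transferred"], result["md5_sum"], result["sha256_sum"])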