diff --git a/asab/storage/elasticsearch.py b/asab/storage/elasticsearch.py index 0ca0f0c2b..0b0df75d9 100644 --- a/asab/storage/elasticsearch.py +++ b/asab/storage/elasticsearch.py @@ -48,7 +48,6 @@ class StorageService(StorageServiceABC): def __init__(self, app, service_name, config_section_name='asab:storage'): super().__init__(app, service_name) - self.Loop = app.Loop self.URL = Config.get(config_section_name, 'elasticsearch_url') parsed_url = urllib.parse.urlparse(self.URL) @@ -62,49 +61,23 @@ def __init__(self, app, service_name, config_section_name='asab:storage'): # Authorization: username or API-key username = Config.get(config_section_name, 'elasticsearch_username') + password = Config.get(config_section_name, 'elasticsearch_password') api_key = Config.get(config_section_name, 'elasticsearch_api_key') - if username != '' and api_key != '': - L.warning("Both username and API key specified. ES Storage service may not function properly. Please choose one option.") - - if username == '': - self._auth = None - else: - password = Config.get(config_section_name, 'elasticsearch_password') - self._auth = aiohttp.BasicAuth(login=username, password=password) - - self._ClientSession = None - # Create headers for requests - self.Headers = {'Content-Type': 'application/json'} - if api_key != '': - self.Headers['Authorization'] = "ApiKey {}".format(api_key) + self.Headers = build_headers(username, password, api_key) + # Build ssl context self.SSLContextBuilder = SSLContextBuilder(config_section_name) - - - async def finalize(self, app): - """ - Close the current client session. - """ - if self._ClientSession is not None and not self._ClientSession.closed: - await self._ClientSession.close() - self._ClientSession = None - - - def session(self): - """ - Get the current client session. - """ - if self._ClientSession is None: - self._ClientSession = aiohttp.ClientSession(auth=self._auth) - elif self._ClientSession.closed: - self._ClientSession = aiohttp.ClientSession(auth=self._auth) - return self._ClientSession + if len(self.ServerUrls) > 0 and self.ServerUrls[0].startswith('https://'): + self.SSLContext = self.SSLContextBuilder.build(ssl.PROTOCOL_TLS_CLIENT) + else: + self.SSLContext = None async def is_connected(self) -> bool: - """Check if the service is connected to ElasticSearch cluster. + """ + Check if the service is connected to ElasticSearch cluster. Raises: ConnectionError: Connection failed. @@ -112,41 +85,36 @@ async def is_connected(self) -> bool: Returns: bool: True if the service is connected. """ - for url in self.ServerUrls: - - if url.startswith('https://'): - ssl_context = self.SSLContextBuilder.build(ssl.PROTOCOL_TLS_CLIENT) - else: - ssl_context = None - - try: - async with self.session().request( - method="GET", - url=url, - ssl=ssl_context, - headers=self.Headers, - ) as resp: - await self.session().close() - if resp.status not in {200, 201}: - resp = await resp.json() - L.error("Failed to connect to ElasticSearch.", struct_data={ - "code": resp.get("status"), - "reason": resp.get("error", {}).get("reason") - }) - return False + async with aiohttp.ClientSession() as session: + for url in self.ServerUrls: + try: + async with session.request( + method="GET", + url=url, + ssl=self.SSLContext, + headers=self.Headers, + ) as resp: + if resp.status not in {200, 201}: + resp = await resp.json() + L.error("Failed to connect to ElasticSearch.", struct_data={ + "code": resp.get("status"), + "reason": resp.get("error", {}).get("reason") + }) + return False - except aiohttp.client_exceptions.ClientConnectorError: - if url == self.ServerUrls[-1]: - raise ConnectionError("Failed to connect to '{}'.".format(url)) - else: - L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) + except aiohttp.client_exceptions.ClientConnectorError: + if url == self.ServerUrls[-1]: + raise ConnectionError("Failed to connect to '{}'.".format(url)) + else: + L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) L.info("Connected to ElasticSearch.", struct_data={"urls": self.ServerUrls}) return True async def get(self, index: str, obj_id: str, decrypt=None) -> dict: - """Get object by its index and object ID. + """ + Get object by its index and object ID. Args: index (str): Index for the query. @@ -164,49 +132,45 @@ async def get(self, index: str, obj_id: str, decrypt=None) -> dict: """ if decrypt is not None: raise NotImplementedError("AES encryption for ElasticSearch not implemented") - for url in self.ServerUrls: - request_url = "{}{}/_doc/{}".format(url, index, obj_id) - try: - - if url.startswith('https://'): - ssl_context = self.SSLContextBuilder.build(ssl.PROTOCOL_TLS_CLIENT) - else: - ssl_context = None - - async with self.session().request( - method="GET", - url=request_url, - ssl=ssl_context, - headers=self.Headers, - ) as resp: - if resp.status == 401: - raise ConnectionRefusedError("Response code 401: Unauthorized. Provide authorization by specifying either user name and password or api key.") - elif resp.status not in {200, 201}: - resp = await resp.json() - raise ConnectionError("Failed to retrieve data from ElasticSearch. Got {}: {}".format( - resp.get("status"), - resp.get("error", {}).get("reason") - )) + + async with aiohttp.ClientSession() as session: + for url in self.ServerUrls: + request_url = "{}{}/_doc/{}".format(url, index, obj_id) + try: + async with session.get( + url=request_url, + ssl=self.SSLContext, + headers=self.Headers, + ) as resp: + if resp.status == 401: + raise ConnectionRefusedError("Response code 401: Unauthorized. Provide authorization by specifying either user name and password or api key.") + elif resp.status not in {200, 201}: + resp = await resp.json() + raise ConnectionError("Failed to retrieve data from ElasticSearch. Got {}: {}".format( + resp.get("status"), + resp.get("error", {}).get("reason") + )) + else: + obj = await resp.json() + if not obj.get("found"): + raise KeyError("No existing object with ID {}".format(obj_id)) + ret = obj['_source'] + ret['_v'] = obj['_version'] + ret['_id'] = obj['_id'] + return ret + except aiohttp.client_exceptions.ClientConnectorError: + if url == self.ServerUrls[-1]: + raise ConnectionError("Failed to connect to '{}'".format(url)) else: - obj = await resp.json() - if not obj.get("found"): - raise KeyError("No existing object with ID {}".format(obj_id)) - ret = obj['_source'] - ret['_v'] = obj['_version'] - ret['_id'] = obj['_id'] - return ret - except aiohttp.client_exceptions.ClientConnectorError: - if url == self.ServerUrls[-1]: - raise ConnectionError("Failed to connect to '{}'".format(url)) - else: - L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) + L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) async def get_by(self, collection: str, key: str, value, decrypt=None): raise NotImplementedError("get_by") async def delete(self, index: str, _id=None) -> dict: - """Delete an entire index or document from that index. + """ + Delete an entire index or document from that index. Args: index: Index to delete. @@ -221,50 +185,46 @@ async def delete(self, index: str, _id=None) -> dict: Returns: The deleted document or message that the entire index was deleted. """ - for url in self.ServerUrls: - try: - if _id: - request_url = "{}{}/_doc/{}?refresh={}".format(url, index, _id, self.Refresh) - else: - request_url = "{}{}".format(url, index) - - if url.startswith('https://'): - ssl_context = self.SSLContextBuilder.build(ssl.PROTOCOL_TLS_CLIENT) - else: - ssl_context = None - - async with self.session().request( - method="DELETE", - url=request_url, - ssl=ssl_context, - headers=self.Headers - ) as resp: - if resp.status == 401: - raise ConnectionRefusedError("Response code 401: Unauthorized. Provide authorization by specifying either user name and password or api key.") - elif resp.status == 404: - raise KeyError("No existing object with ID {}".format(_id)) - elif resp.status not in {200, 201}: - raise ConnectionError("Failed to retrieve data from ElasticSearch. Got {}: {}".format( - resp.get("status"), - resp.get("error", {}).get("reason") - )) + async with aiohttp.ClientSession() as session: + for url in self.ServerUrls: + try: + if _id: + request_url = "{}{}/_doc/{}?refresh={}".format(url, index, _id, self.Refresh) else: - json_response = await resp.json() + request_url = "{}{}".format(url, index) + + async with session.delete( + url=request_url, + ssl=self.SSLContext, + headers=self.Headers + ) as resp: + if resp.status == 401: + raise ConnectionRefusedError("Response code 401: Unauthorized. Provide authorization by specifying either user name and password or api key.") + elif resp.status == 404: + raise KeyError("No existing object with ID {}".format(_id)) + elif resp.status not in {200, 201}: + raise ConnectionError("Failed to retrieve data from ElasticSearch. Got {}: {}".format( + resp.get("status"), + resp.get("error", {}).get("reason") + )) + else: + json_response = await resp.json() - if json_response.get("acknowledged", False): + if json_response.get("acknowledged", False): + return json_response + assert json_response["result"] == "deleted", "Document was not deleted" return json_response - assert json_response["result"] == "deleted", "Document was not deleted" - await self.session().close() - return json_response - except aiohttp.client_exceptions.ClientConnectorError: - if url == self.ServerUrls[-1]: - raise Exception("Failed to connect to '{}'".format(url)) - else: - L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) + + except aiohttp.client_exceptions.ClientConnectorError: + if url == self.ServerUrls[-1]: + raise Exception("Failed to connect to '{}'".format(url)) + else: + L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) async def mapping(self, index: str) -> dict: - """Retrieve mapping definitions for one index. + """ + Retrieve mapping definitions for one index. :param index: Specified index. :type index: str @@ -273,62 +233,54 @@ async def mapping(self, index: str) -> dict: Returns: dict: Mapping definitions for the index. """ - for url in self.ServerUrls: - request_url = "{}{}/_mapping".format(url, index) - - if url.startswith('https://'): - ssl_context = self.SSLContextBuilder.build(ssl.PROTOCOL_TLS_CLIENT) - else: - ssl_context = None - - try: - async with self.session().request( - method="GET", - url=request_url, - ssl=ssl_context, - headers=self.Headers - ) as resp: - obj = await resp.json() - await self.session().close() - return obj - except aiohttp.client_exceptions.ClientConnectorError: - if url == self.ServerUrls[-1]: - raise ConnectionError("Failed to connect to '{}'".format(url)) - else: - L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) + async with aiohttp.ClientSession() as session: + for url in self.ServerUrls: + request_url = "{}{}/_mapping".format(url, index) + + try: + async with session.get( + url=request_url, + ssl=ssl_context, + headers=self.Headers + ) as resp: + obj = await resp.json() + return obj + + except aiohttp.client_exceptions.ClientConnectorError: + if url == self.ServerUrls[-1]: + raise ConnectionError("Failed to connect to '{}'".format(url)) + else: + L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) + async def get_index_template(self, template_name: str) -> dict: - """Retrieve ECS Index template for the given template name. + """ + Retrieve ECS Index template for the given template name. :param template_name: The name of the ECS template to retrieve. :type template_name: str :raise Exception: Raised if connection to all server URLs fails. :return: ElasticSearch Index template. """ - for url in self.ServerUrls: - request_url = "{}_index_template/{}?format=json".format(url, template_name) - - if url.startswith('https://'): - ssl_context = self.SSLContextBuilder.build(ssl.PROTOCOL_TLS_CLIENT) - else: - ssl_context = None - - try: - async with self.session().request( - method="GET", - url=request_url, - headers=self.Headers, - ssl=ssl_context, - ) as resp: - assert resp.status == 200, "Unexpected response code: {}".format(resp.status) - content = await resp.json() - await self.session().close() - return content - except aiohttp.client_exceptions.ClientConnectorError: - if url == self.ServerUrls[-1]: - raise Exception("Failed to connect to '{}'".format(url)) - else: - L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) + async with aiohttp.ClientSession() as session: + for url in self.ServerUrls: + request_url = "{}_index_template/{}?format=json".format(url, template_name) + + try: + async with session.get( + url=request_url, + headers=self.Headers, + ssl=self.SSLContext, + ) as resp: + assert resp.status == 200, "Unexpected response code: {}".format(resp.status) + content = await resp.json() + return content + + except aiohttp.client_exceptions.ClientConnectorError: + if url == self.ServerUrls[-1]: + raise Exception("Failed to connect to '{}'".format(url)) + else: + L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) async def put_index_template(self, template_name: str, template: dict) -> dict: @@ -339,85 +291,74 @@ async def put_index_template(self, template_name: str, template: dict) -> dict: :return: JSON response. :raise Exception: Raised if connection to all server URLs fails. """ - for url in self.ServerUrls: - request_url = "{}_index_template/{}".format(url, template_name) - # L.warning("Posting index template into url: {}".format(request_url)) - - if url.startswith('https://'): - ssl_context = self.SSLContextBuilder.build(ssl.PROTOCOL_TLS_CLIENT) - else: - ssl_context = None - - try: - async with self.session().request( - method="PUT", - url=request_url, - data=json.dumps(template), - headers=self.Headers, - ssl=ssl_context, - ) as resp: - - if resp.status != 200: - raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text())) - - resp = await resp.json() - await self.session().close() - return resp - - except aiohttp.client_exceptions.ClientConnectorError: - if url == self.ServerUrls[-1]: - raise Exception("Failed to connect to '{}'".format(url)) - else: - L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) - return {} + async with aiohttp.ClientSession() as session: + for url in self.ServerUrls: + request_url = "{}_index_template/{}".format(url, template_name) + + try: + async with session.put( + url=request_url, + data=json.dumps(template), + headers=self.Headers, + ssl=self.SSLContext, + ) as resp: + + if resp.status != 200: + raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text())) + + resp = await resp.json() + return resp + + except aiohttp.client_exceptions.ClientConnectorError: + if url == self.ServerUrls[-1]: + raise Exception("Failed to connect to '{}'".format(url)) + else: + L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) + return {} async def reindex(self, previous_index, new_index): - for url in self.ServerUrls: - try: - if url.endswith('/'): - request_url = "{}_reindex".format(url) - else: - request_url = "{}/_reindex".format(url) - - if url.startswith('https://'): - ssl_context = self.SSLContextBuilder.build(ssl.PROTOCOL_TLS_CLIENT) - else: - ssl_context = None - - async with self.session().request( - method="POST", - url=request_url, - headers=self.Headers, - ssl=ssl_context, - data=json.dumps({ - "source": { - "index": previous_index, - }, - "dest": { - "index": new_index, - } - }) - ) as resp: + async with aiohttp.ClientSession() as session: + for url in self.ServerUrls: + try: + if url.endswith('/'): + request_url = "{}_reindex".format(url) + else: + request_url = "{}/_reindex".format(url) - if resp.status != 200: - raise AssertionError( - "Unexpected response code when reindexing: {}, {}".format( - resp.status, await resp.text() + async with session.post( + url=request_url, + headers=self.Headers, + ssl=self.SSLContext, + data=json.dumps({ + "source": { + "index": previous_index, + }, + "dest": { + "index": new_index, + } + }) + ) as resp: + + if resp.status != 200: + raise AssertionError( + "Unexpected response code when reindexing: {}, {}".format( + resp.status, await resp.text() + ) ) - ) - resp = await resp.json() - await self.session().close() - return resp - except aiohttp.client_exceptions.ClientConnectorError: - if url == self.ServerUrls[-1]: - raise ConnectionError("Failed to connect to '{}'".format(url)) - else: - L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) + resp = await resp.json() + return resp + + except aiohttp.client_exceptions.ClientConnectorError: + if url == self.ServerUrls[-1]: + raise ConnectionError("Failed to connect to '{}'".format(url)) + else: + L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) async def scroll(self, index: str, body: typing.Optional[dict] = None) -> dict: - """Retrieve the next batch of results for a scrolling search. + """ + Retrieve the next batch of results for a scrolling search. :param index: The index name. :type index: str @@ -432,62 +373,60 @@ async def scroll(self, index: str, body: typing.Optional[dict] = None) -> dict: } scroll_id = None - while True: - for url in self.ServerUrls: - - if url.startswith('https://'): - ssl_context = self.SSLContextBuilder.build(ssl.PROTOCOL_TLS_CLIENT) - else: - ssl_context = None + async with aiohttp.ClientSession() as session: + while True: + for url in self.ServerUrls: - if scroll_id is None: - path = "{}/_search?scroll={}".format( - index, self.ScrollTimeout - ) - request_body = body - else: - path = "_search/scroll" - request_body = { - "scroll": self.ScrollTimeout, - "scroll_id": scroll_id, - } - request_url = "{}{}".format(url, path) - try: - async with self.session().request( - method="POST", - url=request_url, - json=request_body, - headers=self.Headers, - ssl=ssl_context, - ) as resp: - if resp.status != 200: - data = await resp.text() - L.error( - "Failed to fetch data from ElasticSearch: {} from {}\n{}".format( - resp.status, url, data - ) - ) - break - response_json = await resp.json() - await self.session().close() - except aiohttp.client_exceptions.ClientConnectorError: - if url == self.ServerUrls[-1]: - raise Exception( - "Failed to connect to '{}'".format( - url - ) + if scroll_id is None: + path = "{}/_search?scroll={}".format( + index, self.ScrollTimeout ) + request_body = body else: - L.warning( - "Failed to connect to '{}', iterating to another cluster node".format( - url + path = "_search/scroll" + request_body = { + "scroll": self.ScrollTimeout, + "scroll_id": scroll_id, + } + request_url = "{}{}".format(url, path) + + try: + async with session.post( + url=request_url, + json=request_body, + headers=self.Headers, + ssl=self.SSLContext, + ) as resp: + if resp.status != 200: + data = await resp.text() + L.error( + "Failed to fetch data from ElasticSearch: {} from {}\n{}".format( + resp.status, url, data + ) + ) + break + response_json = await resp.json() + + except aiohttp.client_exceptions.ClientConnectorError: + if url == self.ServerUrls[-1]: + raise Exception( + "Failed to connect to '{}'".format( + url + ) + ) + else: + L.warning( + "Failed to connect to '{}', iterating to another cluster node".format( + url + ) ) - ) - scroll_id = response_json.get("_scroll_id") - if scroll_id is None: - break - return response_json + scroll_id = response_json.get("_scroll_id") + if scroll_id is None: + break + + return response_json + def upsertor(self, index: str, obj_id=None, version: int = 0): return ElasticSearchUpsertor(self, index, obj_id, version) @@ -518,30 +457,29 @@ async def list(self, index: str, _from: int = 0, size: int = 10000, body: typing } } } - for url in self.ServerUrls: - - if url.startswith('https://'): - ssl_context = self.SSLContextBuilder.build(ssl.PROTOCOL_TLS_CLIENT) - else: - ssl_context = None - - try: - request_url = "{}{}/_search?size={}&from={}&version=true".format(url, index, size, _from) - async with self.session().request( - method="GET", - url=request_url, - json=body, - headers=self.Headers, - ssl=ssl_context, - ) as resp: - assert resp.status == 200, "Unexpected response code: {}".format(resp.status) - content = await resp.json() - return content - except aiohttp.client_exceptions.ClientConnectorError: - if url == self.ServerUrls[-1]: - raise Exception("Failed to connect to '{}'".format(url)) - else: - L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) + + async with aiohttp.ClientSession() as session: + for url in self.ServerUrls: + + try: + request_url = "{}{}/_search?size={}&from={}&version=true".format(url, index, size, _from) + + async with session.get( + url=request_url, + json=body, + headers=self.Headers, + ssl=self.SSLContext, + ) as resp: + assert resp.status == 200, "Unexpected response code: {}".format(resp.status) + content = await resp.json() + return content + + except aiohttp.client_exceptions.ClientConnectorError: + if url == self.ServerUrls[-1]: + raise Exception("Failed to connect to '{}'".format(url)) + else: + L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) + async def count(self, index) -> int: """ @@ -551,28 +489,27 @@ async def count(self, index) -> int: :return: The number of matches for a given index. :raise Exception: Connection failed. """ - for url in self.ServerUrls: - try: - count_url = "{}{}/_count".format(url, index) - if url.startswith('https://'): - ssl_context = self.SSLContextBuilder.build(ssl.PROTOCOL_TLS_CLIENT) - else: - ssl_context = None - - async with self.session().request( - method="GET", - url=count_url, - ssl=ssl_context, - headers=self.Headers - ) as resp: - assert resp.status == 200, "Unexpected response code: {}".format(resp.status) - total_count = await resp.json() - return total_count - except aiohttp.client_exceptions.ClientConnectorError: - if url == self.ServerUrls[-1]: - raise Exception("Failed to connect to '{}'".format(url)) - else: - L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) + + async with aiohttp.ClientSession() as session: + for url in self.ServerUrls: + try: + count_url = "{}{}/_count".format(url, index) + + async with session.get( + url=count_url, + headers=self.Headers, + ssl=self.SSLContext, + ) as resp: + assert resp.status == 200, "Unexpected response code: {}".format(resp.status) + total_count = await resp.json() + return total_count + + except aiohttp.client_exceptions.ClientConnectorError: + if url == self.ServerUrls[-1]: + raise Exception("Failed to connect to '{}'".format(url)) + else: + L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) + async def indices(self, search_string=None): """ @@ -580,28 +517,23 @@ async def indices(self, search_string=None): :param search_string: A search string. Default to None. """ - for url in self.ServerUrls: - if url.startswith('https://'): - ssl_context = self.SSLContextBuilder.build(ssl.PROTOCOL_TLS_CLIENT) - else: - ssl_context = None - - try: - request_url = "{}_cat/indices/{}?format=json".format(url, search_string if search_string is not None else "*") - async with self.session().request( - method="GET", - url=request_url, - ssl=ssl_context, - headers=self.Headers - ) as resp: - assert resp.status == 200, "Unexpected response code: {}".format(resp.status) - return await resp.json() - - except aiohttp.client_exceptions.ClientConnectorError: - if url == self.ServerUrls[-1]: - raise Exception("Failed to connect to '{}'".format(url)) - else: - L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) + async with aiohttp.ClientSession() as session: + for url in self.ServerUrls: + try: + request_url = "{}_cat/indices/{}?format=json".format(url, search_string if search_string is not None else "*") + async with session.get( + url=request_url, + ssl=self.SSLContext, + headers=self.Headers + ) as resp: + assert resp.status == 200, "Unexpected response code: {}".format(resp.status) + return await resp.json() + + except aiohttp.client_exceptions.ClientConnectorError: + if url == self.ServerUrls[-1]: + raise Exception("Failed to connect to '{}'".format(url)) + else: + L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) async def empty_index(self, index, settings=None): @@ -613,32 +545,28 @@ async def empty_index(self, index, settings=None): if settings is None: settings = {} - for url in self.ServerUrls: - if url.startswith('https://'): - ssl_context = self.SSLContextBuilder.build(ssl.PROTOCOL_TLS_CLIENT) - else: - ssl_context = None - - try: - request_url = "{}{}".format(url, index) - async with self.session().request( - method="PUT", - url=request_url, - json=settings, - ssl=ssl_context, - headers=self.Headers - ) as resp: - - if resp.status != 200: - raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text())) - - return await resp.json() - - except aiohttp.client_exceptions.ClientConnectorError: - if url == self.ServerUrls[-1]: - raise Exception("Failed to connect to '{}'".format(url)) - else: - L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) + async with aiohttp.ClientSession() as session: + for url in self.ServerUrls: + + try: + request_url = "{}{}".format(url, index) + async with session.put( + url=request_url, + json=settings, + ssl=self.SSLContext, + headers=self.Headers + ) as resp: + + if resp.status != 200: + raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text())) + + return await resp.json() + + except aiohttp.client_exceptions.ClientConnectorError: + if url == self.ServerUrls[-1]: + raise Exception("Failed to connect to '{}'".format(url)) + else: + L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) async def put_policy(self, policy_name, settings=None): @@ -649,32 +577,28 @@ async def put_policy(self, policy_name, settings=None): if settings is None: settings = {} - for url in self.ServerUrls: - if url.startswith('https://'): - ssl_context = self.SSLContextBuilder.build(ssl.PROTOCOL_TLS_CLIENT) - else: - ssl_context = None - - try: - request_url = "{}_ilm/policy/{}".format(url, policy_name) - async with self.session().request( - method="PUT", - url=request_url, - json=settings, - ssl=ssl_context, - headers=self.Headers - ) as resp: - - if resp.status != 200: - raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text())) - - return await resp.json() - - except aiohttp.client_exceptions.ClientConnectorError: - if url == self.ServerUrls[-1]: - raise Exception("Failed to connect to '{}'".format(url)) - else: - L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) + async with aiohttp.ClientSession() as session: + for url in self.ServerUrls: + + try: + request_url = "{}_ilm/policy/{}".format(url, policy_name) + async with session.put( + url=request_url, + json=settings, + ssl=ssl.SSLContext, + headers=self.Headers + ) as resp: + + if resp.status != 200: + raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text())) + + return await resp.json() + + except aiohttp.client_exceptions.ClientConnectorError: + if url == self.ServerUrls[-1]: + raise Exception("Failed to connect to '{}'".format(url)) + else: + L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) async def policies(self): @@ -683,31 +607,27 @@ async def policies(self): :param search_string: A search string. Default to None. """ - for url in self.ServerUrls: - if url.startswith('https://'): - ssl_context = self.SSLContextBuilder.build(ssl.PROTOCOL_TLS_CLIENT) - else: - ssl_context = None - - try: - request_url = "{}_ilm/policy".format(url) - async with self.session().request( - method="GET", - url=request_url, - ssl=ssl_context, - headers=self.Headers - ) as resp: - - if resp.status != 200: - raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text())) - - return await resp.json() - - except aiohttp.client_exceptions.ClientConnectorError: - if url == self.ServerUrls[-1]: - raise Exception("Failed to connect to '{}'".format(url)) - else: - L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) + async with aiohttp.ClientSession() as session: + for url in self.ServerUrls: + + try: + request_url = "{}_ilm/policy".format(url) + async with session.get( + url=request_url, + ssl=self.SSLContext, + headers=self.Headers + ) as resp: + + if resp.status != 200: + raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text())) + + return await resp.json() + + except aiohttp.client_exceptions.ClientConnectorError: + if url == self.ServerUrls[-1]: + raise Exception("Failed to connect to '{}'".format(url)) + else: + L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) class ElasticSearchUpsertor(UpsertorABC): @@ -722,11 +642,6 @@ def __init__(self, storage, collection, obj_id, version=None): if version == 0: self.ModSet['_c'] = now # Set the creation timestamp - api_key = Config.get('asab:storage', 'elasticsearch_api_key') - self.Headers = {'Content-Type': 'application/json'} - if api_key != '': - self.Headers['Authorization'] = "ApiKey {}".format(api_key) - @classmethod def generate_id(cls): @@ -761,70 +676,19 @@ async def _insert_new_object(self): raise NotImplementedError("yet") # This is insert of the new document, the ObjId is to be generated by the ElasicSearch - for url in self.Storage.ServerUrls: - request_url = "{}{}/_doc?refresh={}".format( - url, self.Collection, self.Storage.Refresh - ) - - if url.startswith('https://'): - ssl_context = self.Storage.SSLContextBuilder.build(ssl.PROTOCOL_TLS_CLIENT) - else: - ssl_context = None - - try: - async with self.Storage.session().request( - method="POST", - url=request_url, - headers=self.Headers, - json=upsert_data, - ssl=ssl_context - ) as resp: - if resp.status == 401: - raise ConnectionRefusedError("Response code 401: Unauthorized. Provide authorization by specifying either user name and password or api key.") - elif resp.status not in {200, 201}: - raise ConnectionError("Unexpected response code: {}".format(resp.status)) - else: - resp_json = await resp.json() - self.ObjId = resp_json['_id'] - await self.Storage.session().close() - return self.ObjId - - except aiohttp.client_exceptions.ClientConnectorError: - if url == self.Storage.ServerUrls[-1]: - raise Exception("Failed to connect to '{}'".format(url)) - else: - L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) - - except aiohttp.client_exceptions.ServerDisconnectedError: - raise Exception("Failed to connect to '{}'".format(url)) - - except ValueError as err: - raise ConnectionError("Both username and API key specified. Please choose one option. {}".format(err)) - - # except Exception: - # raise Exception("Failed to connect to '{}'".format(url)) - - async def _update_existing_object(self): - upsert_data = {"doc": {}, "doc_as_upsert": True} - - if len(self.ModSet) > 0: - for k, v in self.ModSet.items(): - upsert_data["doc"][k] = serialize(self.ModSet[k]) + async with aiohttp.ClientSession() as session: for url in self.Storage.ServerUrls: - if url.startswith('https://'): - ssl_context = self.Storage.SSLContextBuilder.build(ssl.PROTOCOL_TLS_CLIENT) - else: - ssl_context = None + request_url = "{}{}/_doc?refresh={}".format( + url, self.Collection, self.Storage.Refresh + ) try: - request_url = "{}{}/_update/{}?refresh={}".format(url, self.Collection, self.ObjId, self.Storage.Refresh) - async with self.Storage.session().request( - method="POST", + async with session.post( url=request_url, + headers=self.Storage.Headers, json=upsert_data, - headers=self.Headers, - ssl=ssl_context, + ssl=self.Storage.SSLContext, ) as resp: if resp.status == 401: raise ConnectionRefusedError("Response code 401: Unauthorized. Provide authorization by specifying either user name and password or api key.") @@ -832,10 +696,9 @@ async def _update_existing_object(self): raise ConnectionError("Unexpected response code: {}".format(resp.status)) else: resp_json = await resp.json() - assert resp_json["result"] == "updated" or resp_json[ - "result"] == "created", "Creating/updating was unsuccessful" - await self.Storage.session().close() + self.ObjId = resp_json['_id'] return self.ObjId + except aiohttp.client_exceptions.ClientConnectorError: if url == self.Storage.ServerUrls[-1]: raise Exception("Failed to connect to '{}'".format(url)) @@ -845,9 +708,74 @@ async def _update_existing_object(self): except aiohttp.client_exceptions.ServerDisconnectedError: raise Exception("Failed to connect to '{}'".format(url)) + except ValueError as err: + raise ConnectionError("Both username and API key specified. Please choose one option. {}".format(err)) + + # except Exception: + # raise Exception("Failed to connect to '{}'".format(url)) + + + async def _update_existing_object(self): + upsert_data = {"doc": {}, "doc_as_upsert": True} + + if len(self.ModSet) > 0: + for k, v in self.ModSet.items(): + upsert_data["doc"][k] = serialize(self.ModSet[k]) + + async with aiohttp.ClientSession() as session: + for url in self.Storage.ServerUrls: + + try: + request_url = "{}{}/_update/{}?refresh={}".format(url, self.Collection, self.ObjId, self.Storage.Refresh) + async with session.post( + url=request_url, + json=upsert_data, + headers=self.Storage.Headers, + ssl=self.Storage.SSLContext, + ) as resp: + if resp.status == 401: + raise ConnectionRefusedError("Response code 401: Unauthorized. Provide authorization by specifying either user name and password or api key.") + elif resp.status not in {200, 201}: + raise ConnectionError("Unexpected response code: {}".format(resp.status)) + else: + resp_json = await resp.json() + assert resp_json["result"] == "updated" or resp_json[ + "result"] == "created", "Creating/updating was unsuccessful" + return self.ObjId + + except aiohttp.client_exceptions.ClientConnectorError: + if url == self.Storage.ServerUrls[-1]: + raise Exception("Failed to connect to '{}'".format(url)) + else: + L.warning("Failed to connect to '{}', iterating to another cluster node".format(url)) + + except aiohttp.client_exceptions.ServerDisconnectedError: + raise Exception("Failed to connect to '{}'".format(url)) + def serialize(v): if isinstance(v, datetime.datetime): return v.timestamp() else: return v + + +def build_headers(username, password, api_key): + + # Check configurations + if username != '' and username is not None and api_key != '' and api_key is not None: + raise ValueError("Both username and API key can't be specified. Please choose one option.") + + headers = { + 'Content-Type': 'application/json', + } + + # Build headers + if username != '' and username is not None: + auth = aiohttp.BasicAuth(username, password) + headers['Authorization'] = auth.encode() + + elif api_key != '' and api_key is not None: + headers['Authorization'] = 'ApiKey {}'.format(api_key) + + return headers