diff --git a/asab/storage/elasticsearch.py b/asab/storage/elasticsearch.py
index b350eefb5..64cdc8223 100644
--- a/asab/storage/elasticsearch.py
+++ b/asab/storage/elasticsearch.py
@@ -1,19 +1,18 @@
+import ssl
 import time
-import json
-import aiohttp
 import logging
 import datetime
 import urllib.parse
 import typing
 import contextlib
 
+import aiohttp
+
 from .service import StorageServiceABC
 from .upsertor import UpsertorABC
 from ..config import Config
 from ..tls import SSLContextBuilder
 
-import ssl
-
 #
 
 L = logging.getLogger(__name__)
@@ -53,7 +52,7 @@ def __init__(self, app, service_name, config_section_name='asab:storage'):
 
 		self.ServerUrls = []
 		urls = Config.getmultiline(config_section_name, 'elasticsearch_url')
-		for url in urls: 
+		for url in urls:
 			parsed_url = urllib.parse.urlparse(url)
 			self.ServerUrls += [
 				urllib.parse.urlunparse((parsed_url.scheme, netloc, parsed_url.path, None, None, None))
@@ -124,7 +123,7 @@ async def is_connected(self) -> bool:
 					"reason": resp.get("error", {}).get("reason")
 				})
 				return False
-
+
 			else:
 				L.info("Connected to ElasticSearch.", struct_data={"urls": self.ServerUrls})
 				return True
@@ -152,14 +151,14 @@ async def get(self, index: str, obj_id: str, decrypt=None) -> dict:
 			raise NotImplementedError("AES encryption for ElasticSearch not implemented")
 
 		async with self.request("GET", "{}/_doc/{}".format(index, obj_id)) as resp:
-
+
 			if resp.status not in {200, 201}:
 				resp = await resp.json()
 				raise ConnectionError("Failed to retrieve data from ElasticSearch. Got {}: {}".format(
 					resp.get("status"),
 					resp.get("error", {}).get("reason")
 				))
-
+
 			else:
 				obj = await resp.json()
 				if not obj.get("found"):
@@ -196,16 +195,16 @@ async def delete(self, index: str, _id=None) -> dict:
 		else:
 			path = "{}".format(index)
 
-		async with self.request("DELETE", path) as resp: 
+		async with self.request("DELETE", path) as resp:
 			if resp.status == 404:
 				raise KeyError("No existing object with ID {}".format(_id))
-
+
 			elif resp.status not in {200, 201}:
 				raise ConnectionError("Failed to retrieve data from ElasticSearch. Got {}: {}".format(
 					resp.get("status"),
 					resp.get("error", {}).get("reason")
 				))
-
+
 			else:
 				json_response = await resp.json()
 
@@ -241,7 +240,7 @@ async def get_index_template(self, template_name: str) -> dict:
 		:raise Exception: Raised if connection to all server URLs fails.
 		:return: ElasticSearch Index template.
 		"""
-		async with self.request("GET", "_index_template/{}?format=json".format(template_name)) as resp: 
+		async with self.request("GET", "_index_template/{}?format=json".format(template_name)) as resp:
 			if resp.status != 200:
 				raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))
 			return await resp.json()
@@ -265,7 +264,7 @@
 
 
 	async def reindex(self, previous_index, new_index):
-		data={
+		data = {
 			"source": {
 				"index": previous_index,
 			},
@@ -285,78 +284,6 @@ async def reindex(self, previous_index, new_index):
 		return await resp.json()
 
 
-	async def scroll(self, index: str, body: typing.Optional[dict] = None) -> dict:
-		"""
-		Retrieve the next batch of results for a scrolling search.
-
-		:param index: The index name.
-		:type index: str
-		:param body: Custom body for the request. Defaults to None.
-		:type body: dict
-		:return: JSON response.
-		:raise Exception: Raised if connection to all server URLs fails.
-		"""
-		if body is None:
-			body = {
-				"query": {"bool": {"must": {"match_all": {}}}}
-			}
-
-		scroll_id = None
-		async with aiohttp.ClientSession() as session:
-			while True:
-				for url in self.ServerUrls:
-
-					if scroll_id is None:
-						path = "{}/_search?scroll={}".format(
-							index, self.ScrollTimeout
-						)
-						request_body = body
-					else:
-						path = "_search/scroll"
-						request_body = {
-							"scroll": self.ScrollTimeout,
-							"scroll_id": scroll_id,
-						}
-					request_url = "{}{}".format(url, path)
-
-					try:
-						async with session.post(
-							url=request_url,
-							json=request_body,
-							headers=self.Headers,
-							ssl=self.SSLContext,
-						) as resp:
-							if resp.status != 200:
-								data = await resp.text()
-								L.error(
-									"Failed to fetch data from ElasticSearch: {} from {}\n{}".format(
-										resp.status, url, data
-									)
-								)
-								break
-							response_json = await resp.json()
-
-					except aiohttp.client_exceptions.ClientConnectorError:
-						if url == self.ServerUrls[-1]:
-							raise Exception(
-								"Failed to connect to '{}'".format(
-									url
-								)
-							)
-						else:
-							L.warning(
-								"Failed to connect to '{}', iterating to another cluster node".format(
-									url
-								)
-							)
-
-				scroll_id = response_json.get("_scroll_id")
-				if scroll_id is None:
-					break
-
-		return response_json
-
-
 	def upsertor(self, index: str, obj_id=None, version: int = 0):
 		return ElasticSearchUpsertor(self, index, obj_id, version)
 