Flake8
ateska committed Nov 9, 2023
1 parent e592477 commit 7b09ab5
Showing 1 changed file with 12 additions and 85 deletions.
97 changes: 12 additions & 85 deletions asab/storage/elasticsearch.py
@@ -1,19 +1,18 @@
import ssl
import time
import json
import aiohttp
import logging
import datetime
import urllib.parse
import typing
import contextlib

import aiohttp

from .service import StorageServiceABC
from .upsertor import UpsertorABC
from ..config import Config
from ..tls import SSLContextBuilder

import ssl

#

L = logging.getLogger(__name__)
@@ -53,7 +52,7 @@ def __init__(self, app, service_name, config_section_name='asab:storage'):

self.ServerUrls = []
urls = Config.getmultiline(config_section_name, 'elasticsearch_url')
for url in urls:
parsed_url = urllib.parse.urlparse(url)
self.ServerUrls += [
urllib.parse.urlunparse((parsed_url.scheme, netloc, parsed_url.path, None, None, None))
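
The loop above normalizes every configured cluster URL into `self.ServerUrls`. A minimal sketch of that normalization, assuming a hypothetical two-node configuration; the `netloc` used in the real code is assembled outside this hunk, so the sketch substitutes `parsed_url.netloc`:

```python
import urllib.parse

# Hypothetical multiline 'elasticsearch_url' configuration values:
urls = [
	"https://es01.example.com:9200/",
	"https://es02.example.com:9200/",
]

server_urls = []
for url in urls:
	parsed_url = urllib.parse.urlparse(url)
	# Keep scheme, netloc and path; drop params, query and fragment.
	server_urls.append(urllib.parse.urlunparse(
		(parsed_url.scheme, parsed_url.netloc, parsed_url.path, None, None, None)
	))

# server_urls == ['https://es01.example.com:9200/', 'https://es02.example.com:9200/']
```
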
@@ -124,7 +123,7 @@ async def is_connected(self) -> bool:
"reason": resp.get("error", {}).get("reason")
})
return False

else:
L.info("Connected to ElasticSearch.", struct_data={"urls": self.ServerUrls})
return True
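
A hedged usage sketch of the connectivity check above; the service lookup name is the usual asab pattern, but treat it as an assumption rather than something shown in this diff:

```python
import asab

async def check(app: asab.Application):
	# Assumption: the storage module registers under this service name.
	storage = app.get_service("asab.StorageService")
	if await storage.is_connected():
		print("Connected to ElasticSearch.")
	else:
		print("ElasticSearch is not reachable.")
```
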
@@ -152,14 +151,14 @@ async def get(self, index: str, obj_id: str, decrypt=None) -> dict:
raise NotImplementedError("AES encryption for ElasticSearch not implemented")

async with self.request("GET", "{}/_doc/{}".format(index, obj_id)) as resp:

if resp.status not in {200, 201}:
resp = await resp.json()
raise ConnectionError("Failed to retrieve data from ElasticSearch. Got {}: {}".format(
resp.get("status"),
resp.get("error", {}).get("reason")
))

else:
obj = await resp.json()
if not obj.get("found"):
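
A minimal call sketch for `get()`; the index name and document ID are illustrative, and the error branches mirror the checks visible in the hunk above:

```python
# Sketch: fetch one document by ID.
async def fetch_user(storage):
	try:
		return await storage.get("users", "user-001")
	except KeyError:
		print("Document not found.")        # the 'found' check above
	except ConnectionError as e:
		print("ElasticSearch error:", e)    # non-200/201 response
```
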
@@ -196,16 +195,16 @@ async def delete(self, index: str, _id=None) -> dict:
else:
path = "{}".format(index)

async with self.request("DELETE", path) as resp:
async with self.request("DELETE", path) as resp:
if resp.status == 404:
raise KeyError("No existing object with ID {}".format(_id))

elif resp.status not in {200, 201}:
raise ConnectionError("Failed to retrieve data from ElasticSearch. Got {}: {}".format(
resp.get("status"),
resp.get("error", {}).get("reason")
))

else:
json_response = await resp.json()
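
A sketch of both `delete()` forms shown above, one with a document ID and one without (which targets the whole index); the names are illustrative:

```python
# Sketch: delete one document, then drop an entire index (no _id).
async def cleanup(storage):
	try:
		await storage.delete("users", "user-001")
		await storage.delete("stale-index")
	except KeyError:
		print("No existing object with that ID.")
```
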

@@ -241,7 +240,7 @@ async def get_index_template(self, template_name: str) -> dict:
:raise Exception: Raised if connection to all server URLs fails.
:return: ElasticSearch Index template.
"""
async with self.request("GET", "_index_template/{}?format=json".format(template_name)) as resp:
async with self.request("GET", "_index_template/{}?format=json".format(template_name)) as resp:
if resp.status != 200:
raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))
return await resp.json()
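
A usage sketch for the template getter above; the template name is illustrative, and the response shape assumed here follows the ElasticSearch `_index_template` API:

```python
# Sketch: fetch an index template and print the names it contains.
async def show_template(storage):
	template = await storage.get_index_template("my-template")
	for t in template.get("index_templates", []):
		print(t["name"])
```
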
@@ -265,7 +264,7 @@ async def put_index_template(self, template_name: str, template: dict) -> dict:

async def reindex(self, previous_index, new_index):

-data={
+data = {
"source": {
"index": previous_index,
},
@@ -285,78 +284,6 @@ async def reindex(self, previous_index, new_index):
return await resp.json()
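
`reindex()` posts the `source`/`dest` body shown above to the ElasticSearch `_reindex` API. A hedged call sketch with illustrative index names:

```python
# Sketch: copy every document from 'events-v1' into 'events-v2'.
async def migrate(storage):
	result = await storage.reindex("events-v1", "events-v2")
	print("reindexed:", result.get("total"), "docs in", result.get("took"), "ms")
```
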


async def scroll(self, index: str, body: typing.Optional[dict] = None) -> dict:
"""
Retrieve the next batch of results for a scrolling search.
:param index: The index name.
:type index: str
:param body: Custom body for the request. Defaults to None.
:type body: dict
:return: JSON response.
:raise Exception: Raised if connection to all server URLs fails.
"""
if body is None:
body = {
"query": {"bool": {"must": {"match_all": {}}}}
}

scroll_id = None
async with aiohttp.ClientSession() as session:
while True:
for url in self.ServerUrls:

if scroll_id is None:
path = "{}/_search?scroll={}".format(
index, self.ScrollTimeout
)
request_body = body
else:
path = "_search/scroll"
request_body = {
"scroll": self.ScrollTimeout,
"scroll_id": scroll_id,
}
request_url = "{}{}".format(url, path)

try:
async with session.post(
url=request_url,
json=request_body,
headers=self.Headers,
ssl=self.SSLContext,
) as resp:
if resp.status != 200:
data = await resp.text()
L.error(
"Failed to fetch data from ElasticSearch: {} from {}\n{}".format(
resp.status, url, data
)
)
break
response_json = await resp.json()

except aiohttp.client_exceptions.ClientConnectorError:
if url == self.ServerUrls[-1]:
raise Exception(
"Failed to connect to '{}'".format(
url
)
)
else:
L.warning(
"Failed to connect to '{}', iterating to another cluster node".format(
url
)
)

scroll_id = response_json.get("_scroll_id")
if scroll_id is None:
break

return response_json
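
The entire `scroll()` helper above is removed by this commit. For callers who still need scroll-style paging, here is a rough standalone sketch of the same pattern with plain aiohttp; the base URL, index, and timeout are illustrative, and clearing the scroll context is omitted for brevity:

```python
import aiohttp

async def scroll_all(base_url, index, scroll_timeout="1m"):
	"""Yield batches of hits via the ElasticSearch scroll API."""
	async with aiohttp.ClientSession() as session:
		# The initial search opens the scroll context.
		async with session.post(
			"{}{}/_search?scroll={}".format(base_url, index, scroll_timeout),
			json={"query": {"match_all": {}}},
		) as resp:
			body = await resp.json()

		while True:
			hits = body.get("hits", {}).get("hits", [])
			if not hits:
				break
			yield hits
			# Subsequent batches are fetched with the returned scroll_id.
			async with session.post(
				"{}_search/scroll".format(base_url),
				json={"scroll": scroll_timeout, "scroll_id": body["_scroll_id"]},
			) as resp:
				body = await resp.json()
```

Usage would be along the lines of `async for hits in scroll_all("http://localhost:9200/", "users"): ...`.
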


def upsertor(self, index: str, obj_id=None, version: int = 0):
return ElasticSearchUpsertor(self, index, obj_id, version)
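
`upsertor()` is the factory for write operations. A minimal sketch of the usual asab upsertor flow; the `set()`/`execute()` calls are the asab upsertor contract, assumed here rather than shown in this diff:

```python
# Sketch: create a new document through the upsertor.
async def create_user(storage):
	u = storage.upsertor("users")       # pass obj_id/version to update instead
	u.set("name", "Alice")              # assumption: asab upsertor API
	object_id = await u.execute()       # returns the document ID
	return object_id
```
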

