es dump stability improvements
- parallel_bulk -> streaming_bulk, as parallel_bulk has problems with memory consumption (see the sketch below)
- add retry_on_timeout=True and max_retries=100 to the Elasticsearch connector
- refresh interval for ES indexing now resets to 1s even if the bulk load exits early
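
A minimal sketch of the pattern these changes describe, assuming a hypothetical index name and action generator (neither is taken from this repo); the actual change is in the diff below:

```python
from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk

# retry_on_timeout / max_retries keep the connector alive through slow bulk requests
es = Elasticsearch(max_retries=100, retry_on_timeout=True)

def generate_actions():
    # yield one document at a time: streaming_bulk sends fixed-size chunks
    # without the worker-thread buffering that makes parallel_bulk memory-hungry
    for i in range(1000):
        yield {"id": f"Q{i}", "labels": {"en": f"item {i}"}}

successes, errors = 0, []
for ok, result in streaming_bulk(
    client=es,
    index="wikidump",  # hypothetical index name
    actions=generate_actions(),
    chunk_size=1000,
    max_retries=3,  # per-chunk retries on 429s, on top of the client-level retries
):
    if not ok:
        errors.append(result)
    successes += ok
```

streaming_bulk yields one (ok, result) pair per document, so progress can still be wrapped in tqdm exactly as the diff below does.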
kdutia committed May 20, 2021
1 parent e60bcc6 commit 0f771ec
Showing 2 changed files with 32 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -4,6 +4,7 @@ All notable changes documented below.

## 1.0.0-beta
- **enhancement (breaking change):** properties now passed as whitespace-separated list rather than comma-separated. They can also be passed through a config file by giving the `--properties` option a filename to a file that exists.
+- **stability improvements:** `elasticsearch.helpers.streaming_bulk` is now used instead of `elasticsearch.helpers.parallel_bulk` due to issues with memory usage of the latter. The bulk load now retries on timeout.

## 0.3.7
- **fix:** reading from JSON dump forces utf-8
54 changes: 31 additions & 23 deletions elastic_wikidata/dump_to_es.py
@@ -2,7 +2,7 @@
from itertools import islice
from tqdm.auto import tqdm
from elasticsearch import Elasticsearch
-from elasticsearch.helpers import parallel_bulk
+from elasticsearch.helpers import streaming_bulk
from typing import Union
from elastic_wikidata.wd_entities import (
    get_entities,
@@ -87,11 +87,16 @@ def start_elasticsearch(self):
                    self.es_credentials["ELASTICSEARCH_USER"],
                    self.es_credentials["ELASTICSEARCH_PASSWORD"],
                ),
+                max_retries=100,
+                retry_on_timeout=True,
            )
        else:
            # run on localhost
            print("Connecting to Elasticsearch on localhost")
-            self.es = Elasticsearch()
+            self.es = Elasticsearch(
+                max_retries=100,
+                retry_on_timeout=True,
+            )

        mappings = {
            "mappings": {
@@ -122,28 +127,31 @@ def dump_to_es(self):
        elif self.entities:
            action_generator = self.generate_actions_from_entities()

-        for ok, action in tqdm(
-            parallel_bulk(
-                client=self.es,
-                index=self.index_name,
-                actions=action_generator,
-                chunk_size=self.config["chunk_size"],
-                queue_size=self.config["queue_size"],
-            ),
-        ):
-            if not ok:
-                print(action)
-                errors.append(action)
-            successes += ok
+        try:
+            for ok, action in tqdm(
+                streaming_bulk(
+                    client=self.es,
+                    index=self.index_name,
+                    actions=action_generator,
+                    chunk_size=self.config["chunk_size"],
+                    # queue_size=self.config["queue_size"],
+                    max_retries=3,
+                ),
+            ):
+                if not ok:
+                    print(action)
+                    errors.append(action)
+                successes += ok

-        if self.disable_refresh_on_index:
-            # reset back to default
-            print("Refresh interval set back to default of 1s.")
-            self.es.indices.put_settings({"index": {"refresh_interval": "1s"}})
+        finally:
+            if self.disable_refresh_on_index:
+                # reset back to default
+                print("Refresh interval set back to default of 1s.")
+                self.es.indices.put_settings({"index": {"refresh_interval": "1s"}})

    def process_doc(self, doc: dict) -> dict:
        """
-        Processes a single document from the JSON dump, returning a filtered version of that document.
+        Processes a single document from the JSON dump, returning a filtered version of that document.
        """

        lang = self.wiki_options["lang"]
@@ -153,8 +161,8 @@ def process_doc(self, doc: dict) -> dict:

    def generate_actions_from_dump(self):
        """
-        Generator to yield a processed document from the Wikidata JSON dump.
-        Each line of the Wikidata JSON dump is a separate document.
+        Generator to yield a processed document from the Wikidata JSON dump.
+        Each line of the Wikidata JSON dump is a separate document.
        """
        with open(self.dump_path, "r", encoding="utf-8") as f:
            objects = (json.loads(line) for line in f)
@@ -170,7 +178,7 @@ def generate_actions_from_dump(self):

    def generate_actions_from_entities(self):
        """
-        Generator to yield processed document from list of entities. Calls are made to
+        Generator to yield processed document from list of entities. Calls are made to
        wbgetentities API with page size of 50 to retrieve documents.
        """

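The `disable_refresh_on_index` branch in the `finally` block above implies that refresh was switched off before indexing started; that code is outside this diff, so the following is only a rough sketch of the surrounding pattern, with a hypothetical index name:

```python
from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk

es = Elasticsearch(max_retries=100, retry_on_timeout=True)
index_name = "wikidump"  # hypothetical index name, not taken from this repo

# switch refresh off while bulk loading to reduce indexing overhead ...
es.indices.put_settings({"index": {"refresh_interval": "-1"}}, index=index_name)
try:
    docs = ({"id": f"Q{i}"} for i in range(100))  # stand-in for the dump generator
    for ok, result in streaming_bulk(client=es, index=index_name, actions=docs):
        if not ok:
            print(result)
finally:
    # ... and always restore the 1s default, even if the bulk load exits early
    es.indices.put_settings({"index": {"refresh_interval": "1s"}}, index=index_name)
```

Putting the restore in `finally` is what makes the third bullet in the commit message hold: the index no longer gets stuck with refresh disabled when the load fails partway through.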
