From 0f771ecb312faee4a3a41c869b65a0a71d9b9150 Mon Sep 17 00:00:00 2001
From: Kalyan Dutia
Date: Thu, 20 May 2021 10:01:29 +0100
Subject: [PATCH] es dump stability improvements

- parallel_bulk -> streaming_bulk, as parallel_bulk has problems with memory consumption
- add retry_on_timeout=True and max_retries=100 to Elasticsearch connector
- refresh interval for es indexing resets to 1s even if the bulk load exits early
---
 CHANGELOG.md                   |  1 +
 elastic_wikidata/dump_to_es.py | 54 +++++++++++++++++++---------------
 2 files changed, 32 insertions(+), 23 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f5e2c07..411901e 100755
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,7 @@ All notable changes documented below.
 
 ## 1.0.0-beta
 - **enhancement (breaking change):** properties now passed as whitespace-separated list rather than comma-separated. They can also be passed through a config file by giving the `--properties` option a filename to a file that exists.
+- **stability improvements:** `elasticsearch.helpers.streaming_bulk` now used instead of `elasticsearch.helpers.parallel_bulk` due to issues with memory usage of the latter. Bulk load now retries on timeout.
 
 ## 0.3.7
 - **fix:** reading from JSON dump forces utf-8
diff --git a/elastic_wikidata/dump_to_es.py b/elastic_wikidata/dump_to_es.py
index 39579a3..8433af0 100755
--- a/elastic_wikidata/dump_to_es.py
+++ b/elastic_wikidata/dump_to_es.py
@@ -2,7 +2,7 @@
 from itertools import islice
 from tqdm.auto import tqdm
 from elasticsearch import Elasticsearch
-from elasticsearch.helpers import parallel_bulk
+from elasticsearch.helpers import streaming_bulk
 from typing import Union
 from elastic_wikidata.wd_entities import (
     get_entities,
@@ -87,11 +87,16 @@ def start_elasticsearch(self):
                     self.es_credentials["ELASTICSEARCH_USER"],
                     self.es_credentials["ELASTICSEARCH_PASSWORD"],
                 ),
+                max_retries=100,
+                retry_on_timeout=True,
             )
         else:
             # run on localhost
             print("Connecting to Elasticsearch on localhost")
-            self.es = Elasticsearch()
+            self.es = Elasticsearch(
+                max_retries=100,
+                retry_on_timeout=True,
+            )
 
         mappings = {
             "mappings": {
@@ -122,28 +127,31 @@ def dump_to_es(self):
         elif self.entities:
             action_generator = self.generate_actions_from_entities()
 
-        for ok, action in tqdm(
-            parallel_bulk(
-                client=self.es,
-                index=self.index_name,
-                actions=action_generator,
-                chunk_size=self.config["chunk_size"],
-                queue_size=self.config["queue_size"],
-            ),
-        ):
-            if not ok:
-                print(action)
-                errors.append(action)
-            successes += ok
+        try:
+            for ok, action in tqdm(
+                streaming_bulk(
+                    client=self.es,
+                    index=self.index_name,
+                    actions=action_generator,
+                    chunk_size=self.config["chunk_size"],
+                    # queue_size=self.config["queue_size"],
+                    max_retries=3,
+                ),
+            ):
+                if not ok:
+                    print(action)
+                    errors.append(action)
+                successes += ok
 
-        if self.disable_refresh_on_index:
-            # reset back to default
-            print("Refresh interval set back to default of 1s.")
-            self.es.indices.put_settings({"index": {"refresh_interval": "1s"}})
+        finally:
+            if self.disable_refresh_on_index:
+                # reset back to default
+                print("Refresh interval set back to default of 1s.")
+                self.es.indices.put_settings({"index": {"refresh_interval": "1s"}})
 
     def process_doc(self, doc: dict) -> dict:
         """
-        Processes a single document from the JSON dump, returning a filtered version of that document. 
""" lang = self.wiki_options["lang"] @@ -153,8 +161,8 @@ def process_doc(self, doc: dict) -> dict: def generate_actions_from_dump(self): """ - Generator to yield a processed document from the Wikidata JSON dump. - Each line of the Wikidata JSON dump is a separate document. + Generator to yield a processed document from the Wikidata JSON dump. + Each line of the Wikidata JSON dump is a separate document. """ with open(self.dump_path, "r", encoding="utf-8") as f: objects = (json.loads(line) for line in f) @@ -170,7 +178,7 @@ def generate_actions_from_dump(self): def generate_actions_from_entities(self): """ - Generator to yield processed document from list of entities. Calls are made to + Generator to yield processed document from list of entities. Calls are made to wbgetentities API with page size of 50 to retrieve documents. """