From af9580dee9c96eaf9cf87957f702be138db61b82 Mon Sep 17 00:00:00 2001 From: Jakub Hettler Date: Sat, 20 Apr 2019 08:20:54 +0200 Subject: [PATCH] Add Elasticseach sample publisher (#23) --- .../example/scripts/sample_data_loader.py | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/databuilder/example/scripts/sample_data_loader.py b/databuilder/example/scripts/sample_data_loader.py index b27f643f5d..0d9df76723 100644 --- a/databuilder/example/scripts/sample_data_loader.py +++ b/databuilder/example/scripts/sample_data_loader.py @@ -3,20 +3,31 @@ """ import csv +from elasticsearch import Elasticsearch import logging from pyhocon import ConfigFactory import sqlite3 from sqlalchemy.ext.declarative import declarative_base import textwrap +import uuid +from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor from databuilder.job.job import DefaultJob from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader +from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader from databuilder.publisher import neo4j_csv_publisher +from databuilder.extractor.neo4j_extractor import Neo4jExtractor from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher +from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher from databuilder.task.task import DefaultTask from databuilder.transformer.base_transformer import NoopTransformer +from databuilder.transformer.elasticsearch_document_transformer import ElasticsearchDocumentTransformer +# change to the address of Elasticsearch service +es = Elasticsearch([ + {'host': '0.0.0.0'}, +]) DB_FILE = '/tmp/test.db' SQLITE_CONN_STRING = 'sqlite:////tmp/test.db' @@ -147,6 +158,54 @@ def create_sample_job(table_name, model_name): return job +def create_es_publisher_sample_job(): + + # loader saves data to this location and publisher reads it 
from here + extracted_search_data_path = '/var/tmp/amundsen/search_data.json' + + task = DefaultTask(loader=FSElasticsearchJSONLoader(), + extractor=Neo4jSearchDataExtractor(), + transformer=ElasticsearchDocumentTransformer()) + + # elastic search client instance + elasticsearch_client = es + # unique name of new index in Elasticsearch + elasticsearch_new_index_key = 'tables' + str(uuid.uuid4()) + # related to mapping type from /databuilder/publisher/elasticsearch_publisher.py#L38 + elasticsearch_new_index_key_type = 'table' + # alias for Elasticsearch used in amundsensearchlibrary/search_service/config.py as an index + elasticsearch_index_alias = 'tables_alias' + + job_config = ConfigFactory.from_dict({ + 'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY): neo4j_endpoint, + 'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY): + 'databuilder.models.neo4j_data.Neo4jDataResult', + 'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): neo4j_user, + 'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): neo4j_password, + 'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY): + extracted_search_data_path, + 'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY): 'w', + 'transformer.elasticsearch.{}'.format(ElasticsearchDocumentTransformer.ELASTICSEARCH_INDEX_CONFIG_KEY): + elasticsearch_new_index_key, + 'transformer.elasticsearch.{}'.format(ElasticsearchDocumentTransformer.ELASTICSEARCH_DOC_CONFIG_KEY): + elasticsearch_new_index_key_type, + 'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY): + extracted_search_data_path, + 'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY): 'r', + 'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY): + elasticsearch_client, + 
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY): + elasticsearch_new_index_key, + 'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY): + elasticsearch_index_alias + }) + + job = DefaultJob(conf=job_config, + task=task, + publisher=ElasticsearchPublisher()) + return job + + if __name__ == "__main__": load_table_data_from_csv('sample_table.csv') load_col_data_from_csv('sample_col.csv') @@ -160,3 +219,7 @@ def create_sample_job(table_name, model_name): job2 = create_sample_job('test_col_metadata', 'example.models.test_column_model.TestColumnMetadata') job2.launch() + + # start Elasticsearch publish job + job3 = create_es_publisher_sample_job() + job3.launch()