Skip to content

Commit

Permalink
Add Elasticseach sample publisher (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
jhettler authored and Hans Adriaans committed Jun 30, 2022
1 parent 08f5cb1 commit af9580d
Showing 1 changed file with 63 additions and 0 deletions.
63 changes: 63 additions & 0 deletions databuilder/example/scripts/sample_data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,31 @@
"""

import csv
from elasticsearch import Elasticsearch
import logging
from pyhocon import ConfigFactory
import sqlite3
from sqlalchemy.ext.declarative import declarative_base
import textwrap
import uuid

from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer
from databuilder.transformer.elasticsearch_document_transformer import ElasticsearchDocumentTransformer

# change to the address of Elasticsearh service
es = Elasticsearch([
{'host': '0.0.0.0'},
])

DB_FILE = '/tmp/test.db'
SQLITE_CONN_STRING = 'sqlite:////tmp/test.db'
Expand Down Expand Up @@ -147,6 +158,54 @@ def create_sample_job(table_name, model_name):
return job


def create_es_publisher_sample_job():

# loader save data to this location and publisher read if from here
extracted_search_data_path = '/var/tmp/amundsen/search_data.json'

task = DefaultTask(loader=FSElasticsearchJSONLoader(),
extractor=Neo4jSearchDataExtractor(),
transformer=ElasticsearchDocumentTransformer())

# elastic search client instance
elasticsearch_client = es
# unique name of new index in Elasticsearch
elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())
# related to mapping type from /databuilder/publisher/elasticsearch_publisher.py#L38
elasticsearch_new_index_key_type = 'table'
# alias for Elasticsearch used in amundsensearchlibrary/search_service/config.py as an index
elasticsearch_index_alias = 'tables_alias'

job_config = ConfigFactory.from_dict({
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY): neo4j_endpoint,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY):
'databuilder.models.neo4j_data.Neo4jDataResult',
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): neo4j_user,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): neo4j_password,
'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY):
extracted_search_data_path,
'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY): 'w',
'transformer.elasticsearch.{}'.format(ElasticsearchDocumentTransformer.ELASTICSEARCH_INDEX_CONFIG_KEY):
elasticsearch_new_index_key,
'transformer.elasticsearch.{}'.format(ElasticsearchDocumentTransformer.ELASTICSEARCH_DOC_CONFIG_KEY):
elasticsearch_new_index_key_type,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY):
extracted_search_data_path,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY): 'r',
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY):
elasticsearch_client,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY):
elasticsearch_new_index_key,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY):
elasticsearch_index_alias
})

job = DefaultJob(conf=job_config,
task=task,
publisher=ElasticsearchPublisher())
return job


if __name__ == "__main__":
load_table_data_from_csv('sample_table.csv')
load_col_data_from_csv('sample_col.csv')
Expand All @@ -160,3 +219,7 @@ def create_sample_job(table_name, model_name):
job2 = create_sample_job('test_col_metadata',
'example.models.test_column_model.TestColumnMetadata')
job2.launch()

# start Elasticsearch publish job
job3 = create_es_publisher_sample_job()
job3.launch()

0 comments on commit af9580d

Please sign in to comment.