-
Notifications
You must be signed in to change notification settings - Fork 1
/
gmb_etl_main.py
61 lines (49 loc) · 1.83 KB
/
gmb_etl_main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import os
import sys
import yaml
import argparse
import shutil
from gmb_transformation import GmbTransformation
from gmb_extraction import GmbExtraction
from bento.common.utils import get_logger
import subprocess
# Command-line interface: one positional argument naming the YAML config file.
cli_parser = argparse.ArgumentParser()
cli_parser.add_argument('config_file', help='The name of the config file')
args = cli_parser.parse_args()
config_file = args.config_file

# Parse the YAML config once; the stages below read their settings from `config`.
# NOTE(review): yaml.FullLoader is not the restricted "safe" loader -- only
# point this script at config files from a trusted source.
with open(config_file) as config_stream:
    config = yaml.load(config_stream, Loader=yaml.FullLoader)

# Logger shared by every stage of this script.
gmb_log = get_logger('GMB Main')
try:
    # Extract data files. The value returned by extract() is kept as
    # `timestamp` for the later stages of the script.
    gmb_extractor = GmbExtraction(config)
    timestamp = gmb_extractor.extract()
except Exception as e:
    # Top-level boundary: any extraction failure aborts the whole ETL run.
    # exception() logs at ERROR level like error(), but also records the
    # traceback so the failing call can be located from the log.
    # NOTE(review): assumes get_logger returns a standard logging.Logger -- confirm.
    gmb_log.exception(e)
    gmb_log.error('GMB data extraction failed, abort the GMB ETL process')
    sys.exit(1)
try:
    # Transform data files. The extraction timestamp is passed as the S3 sub
    # folder, and the download flag is switched off.
    s3_sub_folder = timestamp
    download_data = False
    # Note: the transformer is given the config file *path*, not the parsed dict.
    gmb_transformer = GmbTransformation(config_file, s3_sub_folder, download_data)
    gmb_transformer.transform()
except Exception as e:
    # Top-level boundary: any transformation failure aborts the whole ETL run.
    # exception() logs at ERROR level like error(), but also records the
    # traceback so the failing call can be located from the log.
    # NOTE(review): assumes get_logger returns a standard logging.Logger -- confirm.
    gmb_log.exception(e)
    gmb_log.error('GMB data transformation failed, abort the GMB ETL process')
    sys.exit(1)
try:
    # Copy static files to the transformed data files' folder.
    # Both paths are loop-invariant, so read them from the config once.
    static_files_folder = config['STATIC_FILES']
    transformed_folder = config['OUTPUT_FOLDER_TRANSFORMED']
    for static_file in os.listdir(static_files_folder):
        shutil.copy(os.path.join(static_files_folder, static_file), transformed_folder)
except Exception as e:
    # Top-level boundary: any copy failure (missing config key, missing
    # folder, unreadable entry, ...) aborts the whole ETL run.
    # exception() logs at ERROR level like error(), but also records the
    # traceback so the failing call can be located from the log.
    # NOTE(review): assumes get_logger returns a standard logging.Logger -- confirm.
    gmb_log.exception(e)
    gmb_log.error('GMB static files copying failed, abort the GMB ETL process')
    sys.exit(1)
# Load data files to the neo4j database by running the configured data loader
# script in a child `python3` process, pointed at the transformed data folder.
loader_argv = [
    'python3',
    config['DATA_LOADER'],
    config['DATA_LOADER_CONFIG'],
    '--dataset',
    config['OUTPUT_FOLDER_TRANSFORMED'],
]
loader_exit_code = subprocess.call(loader_argv)

# A non-zero exit status from the data loader means the upload did not succeed.
if loader_exit_code != 0:
    gmb_log.error('GMB data upload failed, abort the GMB ETL process')
    sys.exit(1)