Remove some dependencies that are no longer available.
jaeadams committed Mar 1, 2024
1 parent bbb93a8 commit c2f7a8c
Showing 1 changed file with 68 additions and 11 deletions.
G2Loader.py — 79 changes: 68 additions & 11 deletions
@@ -2,6 +2,7 @@

 import argparse
 import importlib
+import io
 import json
 import math
 import os
@@ -20,9 +21,7 @@
 from multiprocessing import Process, Queue, Value, Manager
 from queue import Empty, Full
 
-import DumpStack
 import G2Paths
-from CompressedFile import fileRowParser, isCompressedFile, openPossiblyCompressedFile
 from G2ConfigTables import G2ConfigTables
 from G2IniParams import G2IniParams
 from G2Project import G2Project
@@ -349,11 +348,74 @@ def check_resources_and_startup(return_queue, thread_count, do_purge):
     return_queue.put(thread_count)
 
 
+def fileRowParser(line, fileData, rowNum=0, errors_file=None, errors_short=False, errors_disable=False):
+
+    def write_error(row_num, line, msg='ERROR: Unknown error'):
+        ''' Write error to terminal and file if not disabled '''
+
+        print(f' ERROR: {msg} {row_num} ({line[:50]})', flush=True)
+
+        if errors_file and not errors_disable:
+            if not errors_short:
+                errors_file.write(f'\n{str(datetime.now())} ERROR: {msg} {row_num}\n\t{line}\n')
+            else:
+                errors_file.write(f'\n{str(datetime.now())} ERROR: {msg} {row_num}\n')
+            errors_file.flush()
+
+    line = line.strip()
+
+    if len(line) == 0:
+        print(f' WARNING: Row {rowNum} is blank')
+        return None
+
+    # It's a JSON string
+    if fileData['FILE_FORMAT'] == 'JSON':
+        try:
+            rowData = json.loads(line)
+        except Exception:
+            write_error(rowNum, line, 'Invalid JSON in row')
+            return None
+
+        return rowData
+
+    # It's a UMF string
+    if fileData['FILE_FORMAT'] == 'UMF':
+        if not (line.upper().startswith('<UMF_DOC') and line.upper().endswith('/UMF_DOC>')):
+            write_error(rowNum, line, 'Invalid UMF in row')
+            return None
+
+        return line
+
+    # It's a CSV variant
+    else:
+
+        # --handling for multi-character delimiters as csv module does not allow for it
+        try:
+            if fileData['MULTICHAR_DELIMITER']:
+                rowData = [removeQuoteChar(x.strip()) for x in line.split(fileData['DELIMITER'])]
+            else:
+                rowData = [removeQuoteChar(x.strip()) for x in next(csv.reader([line], delimiter=fileData['DELIMITER'], skipinitialspace=True))]
+
+        except Exception:
+            write_error(rowNum, line, 'Row could not be parsed')
+            try:
+                print(line)
+            except Exception:
+                pass
+            return None
+
+        if len(''.join(map(str, rowData)).strip()) == 0:
+            print(f' WARNING: Row {rowNum} is blank')
+            return ''  # skip rows with no data
+
+        if 'HEADER_ROW' in fileData:
+            rowData = dict(zip(fileData['HEADER_ROW'], rowData))
+
+        return rowData
+
+
 def perform_load():
     """ Main processing when not in redo only mode """
 
     exit_code = 0
-    DumpStack.listen()
     proc_start_time = time.time()
 
     # Prepare the G2 configuration
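
For illustration, a minimal sketch of how the now-inlined fileRowParser behaves for JSON and CSV rows. The fileData dictionaries below are hypothetical (only the keys referenced above are used), and removeQuoteChar is assumed to be defined elsewhere in G2Loader.py:

# Hypothetical usage of the inlined fileRowParser (removeQuoteChar assumed available)
json_spec = {'FILE_FORMAT': 'JSON'}
csv_spec = {'FILE_FORMAT': 'CSV', 'DELIMITER': ',', 'MULTICHAR_DELIMITER': False,
            'HEADER_ROW': ['NAME', 'CITY']}

fileRowParser('{"NAME": "Ann", "CITY": "Oslo"}', json_spec, rowNum=1)
# -> {'NAME': 'Ann', 'CITY': 'Oslo'}

fileRowParser('Ann, Oslo', csv_spec, rowNum=2)
# -> {'NAME': 'Ann', 'CITY': 'Oslo'}   (CSV row zipped against HEADER_ROW)

fileRowParser('not json', json_spec, rowNum=3)
# -> None, after write_error() reports 'Invalid JSON in row'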
@@ -408,7 +470,7 @@ def perform_load():
     print(f'\n{"-"*30} Loading {"-"*30}\n')
 
     # Drop to a single thread for files under 500k
-    if os.path.getsize(file_path) < (100000 if isCompressedFile(file_path) else 500000):
+    if os.path.getsize(file_path) < (500000):
         print(' Dropping to single thread due to small file size')
         transport_thread_count = 1
     else:
@@ -417,11 +479,8 @@
     # Shuffle the source file for performance, unless directed not to or in test mode or single threaded
     if not cli_args.noShuffle and not cli_args.testMode and transport_thread_count > 1:
 
-        if isCompressedFile(file_path):
-            print('INFO: Not shuffling compressed file. Please ensure the data was shuffled before compressing!\n')
-
         # If it looks like source file was previously shuffled by G2Loader don't do it again
-        elif SHUF_NO_DEL_TAG in file_path or SHUF_TAG in file_path:
+        if SHUF_NO_DEL_TAG in file_path or SHUF_TAG in file_path:
 
             shuf_detected = True
             print(f'INFO: Not shuffling source file, previously shuffled. {SHUF_TAG} or {SHUF_NO_DEL_TAG} in file name\n')
@@ -498,7 +557,7 @@ def perform_load():

         file_path = str(shuf_file_path)
 
-    file_reader = openPossiblyCompressedFile(file_path, 'r')
+    file_reader = io.open(file_path, 'r', encoding='utf-8-sig')
     # --file_reader = safe_csv_reader(csv.reader(csvFile, fileFormat), cnt_bad_parse)
 
     # Use previously stored header row, so get rid of this one
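
The 'utf-8-sig' codec matters here: source files may begin with a UTF-8 byte-order mark, which openPossiblyCompressedFile presumably absorbed before this change; 'utf-8-sig' strips it transparently, whereas plain 'utf-8' would leak it into the first field. A small sketch with a hypothetical file name:

import io

# Hypothetical demonstration: 'utf-8-sig' silently drops a leading BOM
with io.open('/tmp/example.json', 'w', encoding='utf-8-sig') as f:  # writes a BOM
    f.write('{"NAME": "Ann"}')

with io.open('/tmp/example.json', 'r', encoding='utf-8') as f:
    print(repr(f.read()[0]))      # '\ufeff' -- BOM leaks through
with io.open('/tmp/example.json', 'r', encoding='utf-8-sig') as f:
    print(repr(f.read()[0]))      # '{' -- BOM stripped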
@@ -1191,7 +1250,6 @@ def redo_feed(q, debug_trace, redo_mode, redo_mode_interval):
 def load_redo_queue_and_process():
 
     exit_code = 0
-    DumpStack.listen()
     proc_start_time = time.time()
 
     thread_list, work_queue = start_loader_process_and_threads(default_thread_count)
@@ -1436,7 +1494,6 @@ def governor_setup():

     signal.signal(signal.SIGINT, signal_int)
     signal.signal(signal.SIGTERM, signal_int)
-    DumpStack.listen()
 
     tmp_path = os.path.join(tempfile.gettempdir(), 'senzing', 'g2')
 
