diff --git a/README.md b/README.md index 1b8a7d8..acfcf76 100644 --- a/README.md +++ b/README.md @@ -100,7 +100,7 @@ Remember when you ran the analyzer above and saved the current mapping file for Execute the csv_mapper script as follows ... ```console -python csv_mapper.py -i input/test_set1.csv -m mappings/test_set1.map -o output/test_set1.json +python csv_mapper.py -i input/test_set1.csv -m mappings/test_set1.map -o output/test_set1.json -l output/test_set1-statistics.json Processing input/test_set1.csv ... 8 rows processed, 1 rows skipped, complete! diff --git a/csv_functions.json b/csv_functions.json index 6a1ab92..00ede2d 100644 --- a/csv_functions.json +++ b/csv_functions.json @@ -1,7 +1,19 @@ { "GARBAGE_VALUES": [ + "NO LONGER REP", + "NO LONGER REP-", "NULL" ], + "NAME_ENDER_TOKENS": [ + "/", + ";", + "INC" + ], + "NAME_SPLIT_TOKENS": [ + "AKA", + "DBA", + "LLCDBA" + ], "ORGANIZATION_TOKENS": [ "AG", "AND", @@ -23,6 +35,7 @@ "HOLDINGS", "HOSPITAL", "INC", + "INDUSTRIES", "INTERNATIONAL", "INVESTMENT", "INVESTMENTS", @@ -30,6 +43,7 @@ "LTD", "NOMINEES", "PROPERTIES", + "RESTAURANT", "SA", "SERVICES", "THE", diff --git a/csv_functions.py b/csv_functions.py index b22d6bd..68359ff 100644 --- a/csv_functions.py +++ b/csv_functions.py @@ -3,9 +3,14 @@ import argparse import json import re +import random from datetime import datetime import time +try: import probablepeople as pp +except: pp = None +#pp = None + #========================= class csv_functions(): @@ -35,7 +40,9 @@ def __init__(self): "GARBAGE_VALUES", "ORGANIZATION_TOKENS", "PERSON_TOKENS", - "SENZING_ATTRIBUTES" + "SENZING_ATTRIBUTES", + "NAME_SPLIT_TOKENS", + "NAME_ENDER_TOKENS" ] for key in keys: if key not in self.variantJson: @@ -92,36 +99,153 @@ def format_date(self, dateString, outputFormat = None): return None #----------------------------------- - def clean_value(self, valueString): + def clean_value(self, attrName, attrValue): #--remove extra spaces - returnValue = ' '.join(str(valueString).strip().split()) + newValue = ' '.join(str(attrValue).strip().split()) #--whole field must match a garbage value - if returnValue.upper() in self.variantData['GARBAGE_VALUES']: - returnValue = '' - return returnValue + if newValue.upper() in self.variantData['GARBAGE_VALUES']: + self.updateStat('GARBAGE_IN_FIELDS', attrName, newValue.upper()) + return '' + return newValue + + #----------------------------------- + def parse_name(self, nameString): + #--notes: clean name has already been done + #-- sorry names will be forced upper case + #-- returns a dictionary of names + + #--remove in garbage expressions in the string + nameString = nameString.upper() + for garbageValue in self.variantData['GARBAGE_VALUES']: + if garbageValue in nameString: + self.updateStat('GARBAGE_IN_NAMES', garbageValue, nameString) + nameString = nameString.replace(garbageValue,'').strip() + newString = nameString + + primaryNameTokens = [] + secondaryNameTokens = [] + referenceNameTokens = [] + + #--remove tokens in parenthesis + groupedStrings = re.findall('\(.*?\)',newString) + for groupedString in groupedStrings: + self.updateStat('GROUPED_STRINGS', '()', newString + ' | ' + groupedString) + newString = newString.replace(groupedString,'') + referenceNameTokens.append(groupedString) + + #--split the name + theToken = None + split = 0 + for token in newString.replace('.',' ').replace(',',' ').replace('-',' - ').replace('/',' / ').replace(';',' ; ').upper().split(): + if split == 1: + secondaryNameTokens.append(token) + elif split == 2: + 
referenceNameTokens.append(token) + elif token in self.variantData['NAME_SPLIT_TOKENS']: + #--token is skipped + split=1 + theToken = token + elif token in self.variantData['NAME_ENDER_TOKENS']: + primaryNameTokens.append(token) + split=2 + theToken = token + else: + primaryNameTokens.append(token) + + primaryNameStr = ' '.join(primaryNameTokens) + secondaryNameStr = ' '.join(secondaryNameTokens) + referenceNameStr = ' '.join(referenceNameTokens) + + if secondaryNameStr: + self.updateStat('NAME_SPLITERS', theToken, nameString + ' -> ' + primaryNameStr + ' | ' + secondaryNameStr) + if referenceNameStr and split == 2: + self.updateStat('NAME_ENDERS', theToken, nameString + ' -> ' + primaryNameStr + ' | ' + referenceNameStr) + + #--probable people parser + if pp: + #--pp.tag(name_str) # expected output: (OrderedDict([('PrefixMarital', 'Mr'), ('GivenName', 'George'), ('Nickname', '"Gob"'), ('Surname', 'Bluth'), ('SuffixGenerational', 'II')]), 'Person') + #--pp.tag(corp_str) # expected output: (OrderedDict([('CorporationName', 'Sitwell Housing'), ('CorporationLegalType', 'Inc')]), 'Corporation') + #--PrefixMarital + #--PrefixOther + #--GivenName + #--FirstInitial + #--MiddleName + #--MiddleInitial + #--Surname + #--LastInitial + #--SuffixGenerational + #--SuffixOther + #--Nickname + #--And + #--CorporationName + #--CorporationNameOrganization + #--CorporationLegalType + #--CorporationNamePossessiveOf + #--ShortForm + #--ProxyFor + #--AKA + try: + taggedName, nameType = pp.tag(primaryNameStr) + isOrganization = False if nameType == 'Person' else True + self.updateStat('ProbablePeople', nameType, primaryNameStr) + except: + isOrganization = self.is_organization_name(primaryNameStr) + + #--home grown parser + else: + isOrganization = self.is_organization_name(primaryNameStr) + + if isOrganization: + primaryNameOrg = primaryNameStr + secondaryNameOrg = secondaryNameStr + primaryNameFull = '' + secondaryNameFull = '' + else: + primaryNameOrg = '' + secondaryNameOrg = '' + primaryNameFull = primaryNameStr + secondaryNameFull = secondaryNameStr + nameList = [] + nameList.append({'IS_ORGANIZATION': True}) + nameList.append({'PRIMARY_NAME_ORG': primaryNameOrg}) + nameList.append({'SECONDARY_NAME_ORG': secondaryNameOrg}) + nameList.append({'PRIMARY_NAME_FULL': primaryNameFull}) + nameList.append({'SECONDARY_NAME_FULL': secondaryNameFull}) + nameList.append({'REFERENCE_NAME': referenceNameStr}) + + return nameList + #----------------------------------- def is_organization_name(self, nameString): - if nameString: - priorTokens = [] - for token in nameString.replace('.',' ').replace(',',' ').upper().split(): - if token in self.variantData['ORGANIZATION_TOKENS'] or \ - ' '.join(priorTokens[-2:]) in self.variantData['ORGANIZATION_TOKENS'] or \ - ' '.join(priorTokens[-3:]) in self.variantData['ORGANIZATION_TOKENS']: - return True - priorTokens.append(token) + tokenCnt = 0 + priorTokens = [] + for token in nameString.replace('.',' ').replace(',',' ').replace('-',' ').upper().split(): + if token in self.variantData['ORGANIZATION_TOKENS']: + self.updateStat('ORGANIZATION_TOKENS', token, nameString) + return True + elif ' '.join(priorTokens[-2:]) in self.variantData['ORGANIZATION_TOKENS']: + self.updateStat('ORGANIZATION_TOKENS', ' '.join(priorTokens[-2:]), nameString) + return True + elif ' '.join(priorTokens[-3:]) in self.variantData['ORGANIZATION_TOKENS']: + self.updateStat('ORGANIZATION_TOKENS', ' '.join(priorTokens[-3:]), nameString) + return True + priorTokens.append(token) + tokenCnt += 1 + if tokenCnt > 0: 
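+            #--no organization token was found, so this name will be treated
+            #--as a person; tally it by token count so the statistics log
+            #--written with -l shows how person names were classified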
+ self.updateStat('PERSONS_BY_TOKEN_COUNT', tokenCnt, nameString) + return False #----------------------------------- def is_person_name(self, nameString): - if nameString: - priorTokens = [] - for token in nameString.replace('.',' ').replace(',',' ').upper().split(): - if token in self.variantData['PERSON_TOKENS'] or \ - ' '.join(priorTokens[-2:]) in self.variantData['PERSON_TOKENS'] or \ - ' '.join(priorTokens[-3:]) in self.variantData['PERSON_TOKENS']: - return True - priorTokens.append(token) + priorTokens = [] + for token in nameString.replace('.',' ').replace(',',' ').replace('-',' ').upper().split(): + if token in self.variantData['PERSON_TOKENS'] or \ + ' '.join(priorTokens[-2:]) in self.variantData['PERSON_TOKENS'] or \ + ' '.join(priorTokens[-3:]) in self.variantData['PERSON_TOKENS']: + return True + priorTokens.append(token) return False #----------------------------------- @@ -154,11 +278,30 @@ def get_senzing_attribute(self, attrName): return self.variantData['SENZING_ATTRIBUTES'][baseName] return {} + #---------------------------------------- + def updateStat(self, cat1, cat2, example = None): + if cat1 not in self.statPack: + self.statPack[cat1] = {} + if cat2 not in self.statPack[cat1]: + self.statPack[cat1][cat2] = {} + self.statPack[cat1][cat2]['count'] = 0 + + self.statPack[cat1][cat2]['count'] += 1 + if example: + if 'examples' not in self.statPack[cat1][cat2]: + self.statPack[cat1][cat2]['examples'] = [] + if example not in self.statPack[cat1][cat2]['examples']: + if len(self.statPack[cat1][cat2]['examples']) < 100: + self.statPack[cat1][cat2]['examples'].append(example) + else: + randomSampleI = random.randint(25,99) + self.statPack[cat1][cat2]['examples'][randomSampleI] = example + return + #---------------------------------------- if __name__ == "__main__": appPath = os.path.dirname(os.path.abspath(sys.argv[0])) - #--test the instance csvFunctions = csv_functions() if not csvFunctions.initialized: diff --git a/csv_mapper.py b/csv_mapper.py index 1c84bae..c3f9ada 100644 --- a/csv_mapper.py +++ b/csv_mapper.py @@ -1,480 +1,495 @@ -import os -import sys -import argparse -import configparser -import signal -import time -from datetime import datetime, timedelta -import json -import csv -import glob -from csv_functions import csv_functions - -#---------------------------------------- -def pause(question='PRESS ENTER TO CONTINUE ...'): - """ pause for debug purposes """ - global shutDown - try: response = input(question) - except KeyboardInterrupt: - response = None - shutDown = True - return response - -#---------------------------------------- -def signal_handler(signal, frame): - global shutDown - print('USER INTERUPT! Shutting down ... 
(please wait)') - shutDown = True - return - -#---------------------------------------- -def getNextRow(fileInfo): - errCnt = 0 - rowData = None - while not rowData: - - #--quit for consecutive errors - if errCnt >= 10: - fileInfo['ERROR'] = 'YES' - print() - print('Shutdown due to too many errors') - break - - try: line = next(fileInfo['reader']) - except StopIteration: - break - except: - print(' row %s: %s' % (fileInfo['rowCnt'], sys.exc_info()[0])) - fileInfo['skipCnt'] += 1 - errCnt += 1 - continue - fileInfo['rowCnt'] += 1 - if line: #--skip empty lines - - #--csv reader will return a list (mult-char delimiter must be manually split) - if type(line) == list: - row = line - else: - row = [removeQuoteChar(x.strip()) for x in line.split(fileInfo['delimiter'])] - - #--turn into a dictionary if there is a header - if 'header' in fileInfo: - - #--column mismatch - if len(row) != len(fileInfo['header']): - print(' row %s has %s columns, expected %s' % (fileInfo['rowCnt'], len(row), len(fileInfo['header']))) - fileInfo['skipCnt'] += 1 - errCnt += 1 - continue - - #--is it the header row - elif str(row[0]).upper() == fileInfo['header'][0].upper() and str(row[len(row)-1]).upper() == fileInfo['header'][len(fileInfo['header'])-1].upper(): - fileInfo['skipCnt'] += 1 - if fileInfo['rowCnt'] != 1: - print(' row %s contains the header' % fileInfo['rowCnt']) - errCnt += 1 - continue - - #--return a good row - else: - rowData = dict(zip(fileInfo['header'], [str(x).strip() for x in row])) - - else: #--if not just return what should be the header row - fileInfo['skipCnt'] += 1 - rowData = [str(x).strip() for x in row] - - else: - print(' row %s is blank' % fileInfo['rowCnt']) - fileInfo['skipCnt'] += 1 - continue - - return fileInfo, rowData - -#---------------------------------------- -def removeQuoteChar(s): - if len(s)>2 and s[0] + s[-1] in ("''", '""'): - return s[1:-1] - return s - -#---------------------------------------- -def getValue(rowData, expression): - try: rtnValue = expression % rowData - except: - print('warning: could not find %s' % (expression,)) - rtnValue = '' - return rtnValue - -#---------------------------------------- -def processFile(): - """ map a csv file to senzing """ - global shutDown - - #--read the mapping file - if not os.path.exists(mappingFileName): - print() - print('%s does not exist' % mappingFileName) - return -1 - - try: mappingDoc = json.load(open(mappingFileName, 'r')) - except ValueError as err: - print() - print('mapping file error: %s in %s' % (err, mappingFileName)) - return -1 - - #--validate all outputs - errorCnt = 0 - for i in range(len(mappingDoc['outputs'])): - if 'enabled' in mappingDoc['outputs'][i] and mappingDoc['outputs'][i]['enabled'].upper().startswith("N"): - continue - - mappingDoc['outputs'][i]['rowsWritten'] = 0 - mappingDoc['outputs'][i]['rowsSkipped'] = 0 - mappingDoc['outputs'][i]['mappedList'] = [] - mappingDoc['outputs'][i]['unmappedList'] = [] - mappingDoc['outputs'][i]['ignoredList'] = [] - mappingDoc['outputs'][i]['statistics'] = {} - - #--ensure uniqueness of attributes, especially if using labels (usage types) - aggregate = False - labelAttrList = [] - for i1 in range(len(mappingDoc['outputs'][i]['attributes'])): - if mappingDoc['outputs'][i]['attributes'][i1]['attribute'] == '': - if 'mapping' in mappingDoc['outputs'][i]['attributes'][i1]: - mappingDoc['outputs'][i]['ignoredList'].append(mappingDoc['outputs'][i]['attributes'][i1]['mapping'].replace('%(','').replace(')s','')) - continue - elif 
csv_functions.is_senzing_attribute(mappingDoc['outputs'][i]['attributes'][i1]['attribute']): - mappingDoc['outputs'][i]['mappedList'].append(mappingDoc['outputs'][i]['attributes'][i1]['attribute']) - else: - mappingDoc['outputs'][i]['unmappedList'].append(mappingDoc['outputs'][i]['attributes'][i1]['attribute']) - mappingDoc['outputs'][i]['statistics'][mappingDoc['outputs'][i]['attributes'][i1]['attribute']] = 0 - - if 'label' in mappingDoc['outputs'][i]['attributes'][i1]: - mappingDoc['outputs'][i]['attributes'][i1]['label_attribute'] = mappingDoc['outputs'][i]['attributes'][i1]['label'].replace('_', '-') + '_' - else: - mappingDoc['outputs'][i]['attributes'][i1]['label_attribute'] = '' - mappingDoc['outputs'][i]['attributes'][i1]['label_attribute'] += mappingDoc['outputs'][i]['attributes'][i1]['attribute'] - if mappingDoc['outputs'][i]['attributes'][i1]['label_attribute'] in labelAttrList: - errorCnt += 1 - print('attribute %s (%s) is duplicated for output %s!' % (i1, mappingDoc['outputs'][i]['attributes'][i1]['label_attribute'], i)) - else: - labelAttrList.append(mappingDoc['outputs'][i]['attributes'][i1]['label_attribute']) - - if 'subList' in mappingDoc['outputs'][i]['attributes'][i1]: - aggregate = True - - mappingDoc['outputs'][i]['aggregate'] = aggregate - if errorCnt: - return -1 - - #--initialize aggregated record array - totalRowCnt = 0 - aggregatedRecords = {} - - #--open output file - try: outputFileHandle = open(outputFileName, 'w') - except IOError as err: - print('') - print('Could not write to %s \n%s' % (outputFileName, err)) - return -1 - - #--override mapping document with parameters - if inputFileName or 'inputFileName' not in mappingDoc['input']: - mappingDoc['input']['inputFileName'] = inputFileName - #if fieldDelimiter or 'fieldDelimiter' not in mappingDoc['input']: - # mappingDoc['input']['fieldDelimiter'] = fieldDelimiter - #if fileEncoding or 'fileEncoding' not in mappingDoc['input']: - # mappingDoc['input']['fileEncoding'] = fileEncoding - if 'columnHeaders' not in mappingDoc['input']: - mappingDoc['input']['columnHeaders'] = [] - - #--get the input file - if not mappingDoc['input']['inputFileName']: - print('') - print('no input file supplied') - return 1 - fileList = glob.glob(mappingDoc['input']['inputFileName']) - if len(fileList) == 0: - print('') - print('%s not found' % inputFileName) - return 1 - - #--for each input file - for fileName in fileList: - print('') - print('Processing %s ...' 
% fileName) - currentFile = {} - currentFile['name'] = fileName - currentFile['rowCnt'] = 0 - currentFile['skipCnt'] = 0 - - #--open the file - if 'fileEncoding' in mappingDoc['input'] and mappingDoc['input']['fileEncoding']: - currentFile['fileEncoding'] = mappingDoc['input']['fileEncoding'] - currentFile['handle'] = open(fileName, 'r', encoding=mappingDoc['input']['fileEncoding']) - else: - currentFile['handle'] = open(fileName, 'r') - - #--set the dialect - currentFile['fieldDelimiter'] = mappingDoc['input']['fieldDelimiter'] - if not mappingDoc['input']['fieldDelimiter']: - currentFile['csvDialect'] = csv.Sniffer().sniff(currentFile['handle'].readline(), delimiters='|,\t') - currentFile['handle'].seek(0) - currentFile['fieldDelimiter'] = currentFile['csvDialect'].delimiter - mappingDoc['input']['fieldDelimiter'] = currentFile['csvDialect'].delimiter - elif mappingDoc['input']['fieldDelimiter'].lower() in ('csv', 'comma', ','): - currentFile['csvDialect'] = 'excel' - elif mappingDoc['input']['fieldDelimiter'].lower() in ('tab', 'tsv', '\t'): - currentFile['csvDialect'] = 'excel-tab' - elif mappingDoc['input']['fieldDelimiter'].lower() in ('pipe', '|'): - csv.register_dialect('pipe', delimiter = '|', quotechar = '"') - currentFile['csvDialect'] = 'pipe' - elif len(mappingDoc['input']['fieldDelimiter']) == 1: - csv.register_dialect('other', delimiter = delimiter, quotechar = '"') - currentFile['csvDialect'] = 'other' - elif len(mappingDoc['input']['fieldDelimiter']) > 1: - currentFile['csvDialect'] = 'multi' - else: - currentFile['csvDialect'] = 'excel' - - #--set the reader (csv cannot be used for multi-char delimiters) - if currentFile['csvDialect'] != 'multi': - currentFile['reader'] = csv.reader(currentFile['handle'], dialect=currentFile['csvDialect']) - else: - currentFile['reader'] = currentFile['handle'] - - #--get the current file header row and use it if not one already - currentFile, currentHeaders = getNextRow(currentFile) - if not mappingDoc['input']['columnHeaders']: - mappingDoc['input']['columnHeaders'] = [str(x).replace(' ', '_') for x in currentHeaders] - currentFile['header'] = mappingDoc['input']['columnHeaders'] - - while True: - currentFile, rowData = getNextRow(currentFile) - if not rowData: - break - - totalRowCnt += 1 - rowData['ROW_ID'] = totalRowCnt - - #--clean garbage values - for key in rowData: - rowData[key] = csv_functions.clean_value(rowData[key]) - - #--perform calculations - mappingErrors = 0 - if 'calculations' in mappingDoc: - for calcDict in mappingDoc['calculations']: - try: rowData[list(calcDict.keys())[0]] = eval(list(calcDict.values())[0]) - except Exception as err: - print(' error: %s [%s]' % (calcDict['attribute'], err)) - mappingErrors += 1 - - #print(json.dumps(rowData, indent=4)) - #pause() - - #--process the record for each output - for i in range(len(mappingDoc['outputs'])): - if 'enabled' in mappingDoc['outputs'][i] and mappingDoc['outputs'][i]['enabled'].upper().startswith("N"): - continue - - if 'filter' in mappingDoc['outputs'][i]: - try: skipRow = eval(mappingDoc['outputs'][i]['filter']) - except Exception as err: - skipRow = False - print(' filter error: %s [%s]' % (mappingDoc['outputs'][i]['filter'], err)) - if skipRow: - mappingDoc['outputs'][i]['rowsSkipped'] += 1 - continue - - - dataSource = getValue(rowData, mappingDoc['outputs'][i]['data_source']) - if 'entity_type' in mappingDoc['outputs'][i]: - entityType = getValue(rowData, mappingDoc['outputs'][i]['entity_type']) - else: - entityType = dataSource - - entityKey = None - 
recordID = None - uniqueKey = None - if 'entity_key' in mappingDoc['outputs'][i]: - entityKey = getValue(rowData, mappingDoc['outputs'][i]['entity_key']) - uniqueKey = dataSource + '|' + entityKey - elif 'record_id' in mappingDoc['outputs'][i]: - recordID = getValue(rowData, mappingDoc['outputs'][i]['record_id']) - uniqueKey = dataSource + '|' + recordID - - rootValues = {} - subListValues = {} - for attrDict in mappingDoc['outputs'][i]['attributes']: - if attrDict['attribute'] == '': - continue - - attrValue = getValue(rowData, attrDict['mapping']) - if attrValue: - mappingDoc['outputs'][i]['statistics'][attrDict['attribute']] += 1 - if 'subList' in attrDict: - if attrDict['subList'] not in subListValues: - subListValues[attrDict['subList']] = {} - subListValues[attrDict['subList']][attrDict['label_attribute']] = attrValue - else: - rootValues[attrDict['label_attribute']] = attrValue - - #--complete the json record - jsonData = {} - for subList in subListValues: - jsonData[subList] = [subListValues[subList]] - jsonData['DATA_SOURCE'] = dataSource - jsonData['ENTITY_TYPE'] = entityType - if entityKey: - jsonData['ENTITY_KEY'] = entityKey - elif recordID: - jsonData['RECORD_ID'] = recordID - jsonData.update(rootValues) - - #--just output if not aggregating - if not mappingDoc['outputs'][i]['aggregate']: - try: outputFileHandle.write(json.dumps(jsonData) + '\n') - except IOError as err: - print('') - print('Could no longer write to %s \n%s' % (outputFileName, err)) - shutDown = True - break - mappingDoc['outputs'][i]['rowsWritten'] += 1 - else: - if uniqueKey not in aggregatedRecords: - mappingDoc['outputs'][i]['rowsWritten'] += 1 - aggregatedRecords[uniqueKey] = jsonData - else: - #--update root attributes - for attribute in jsonData: - if type(jsonData[attribute]) != list: - #--append missing - if attribute not in aggregatedRecords[uniqueKey]: - aggregatedRecords[uniqueKey][attribute] = jsonData[attribute] - else: - if jsonData[attribute] != aggregatedRecords[uniqueKey][attribute]: - print(' %s update ignored ... [%s] vs [%s]' % (attribute, jsonData[attribute], aggregatedRecords[uniqueKey][attribute])) - #--do not update for now... just not sure how! - - #--aggregate distinct subLists - for subList in subListValues: - subRecord = subListValues[subList] - if subList not in aggregatedRecords[uniqueKey]: - aggregatedRecords[uniqueKey][subList] = [] - - if subRecord not in aggregatedRecords[uniqueKey][subList]: - aggregatedRecords[uniqueKey][subList].append(subRecord) - jsonData = aggregatedRecords[uniqueKey] - - if debugOn: - print(json.dumps(jsonData, indent=4)) - pause() - - #--break conditions - if shutDown: - break - elif 'ERROR' in currentFile: - break - - if currentFile['rowCnt'] % 10000 == 0: - print(' %s rows processed, %s rows skipped' % (currentFile['rowCnt'], currentFile['skipCnt'])) - - currentFile['handle'].close() - if shutDown: - break - else: - print(' %s rows processed, %s rows skipped, complete!' 
% (currentFile['rowCnt'], currentFile['skipCnt'])) - - #-write aggregated records to file - if aggregatedRecords: - print('writing aggregated records to output file ...') - if not shutDown: - rowCnt = 0 - for uniqueKey in aggregatedRecords: - try: outputFileHandle.write(json.dumps(aggregatedRecords[uniqueKey]) + '\n') - except IOError as err: - print('') - print('Could not longer write to %s \n%s' % (outputFileName, err)) - print('') - shutDown = True - break - rowCnt += 1 - if rowCnt % 10000 == 0: - print(' %s rows processed' % rowCnt) - if not shutDown: - print(' %s rows processed, complete!' % rowCnt) - - #--close all inputs and outputs - outputFileHandle.close() - - for i in range(len(mappingDoc['outputs'])): - print() - print('OUTPUT #%s ...' % i) - print(' %s rows written' % mappingDoc['outputs'][i]['rowsWritten']) - print(' %s rows skipped' % mappingDoc['outputs'][i]['rowsSkipped']) - print() - print(' MAPPED ATTRIBUTES:') - for attribute in mappingDoc['outputs'][i]['mappedList']: - percentPopulated = round(mappingDoc['outputs'][i]['statistics'][attribute] / mappingDoc['outputs'][i]['rowsWritten'] * 100, 2) - print(' %s %10d %s %%' % (attribute.lower().ljust(30,'.'), mappingDoc['outputs'][i]['statistics'][attribute], percentPopulated)) - if mappingDoc['outputs'][i]['unmappedList']: - print() - print(' UNMAPPED ATTRIBUTES:') - for attribute in mappingDoc['outputs'][i]['unmappedList']: - percentPopulated = round(mappingDoc['outputs'][i]['statistics'][attribute] / mappingDoc['outputs'][i]['rowsWritten'] * 100, 2) - print(' %s %10d %s %%' % (attribute.lower().ljust(30,'.'), mappingDoc['outputs'][i]['statistics'][attribute], percentPopulated)) - if mappingDoc['outputs'][i]['ignoredList']: - print() - print(' COLUMNS IGNORED: \n %s' % ', '.join([i.lower() for i in mappingDoc['outputs'][i]['ignoredList']])) - - if shutDown: - return -1 - - #for fileName in openFiles: - # openFiles[fileName].close() - - return 0 - -#---------------------------------------- -if __name__ == '__main__': - procStartTime = time.time() - shutDown = False - signal.signal(signal.SIGINT, signal_handler) - - parser = argparse.ArgumentParser() - parser.add_argument('-m', '--mappingFileName', dest='mappingFileName', help='the name of a mapping file') - parser.add_argument('-i', '--inputFileName', dest='inputFileName', help='the name of a csv input file') - parser.add_argument('-d', '--delimiterChar', dest='delimiterChar', help='delimiter character') - parser.add_argument('-e', '--fileEncoding', dest='fileEncoding', help='file encoding') - parser.add_argument('-o', '--outputFileName', dest='outputFileName', help='the name of the output file') - parser.add_argument('-D', '--debugOn', dest='debugOn', action='store_true', default=False, help='run in debug mode') - args = parser.parse_args() - mappingFileName = args.mappingFileName - inputFileName = args.inputFileName - delimiterChar = args.delimiterChar - fileEncoding = args.fileEncoding - outputFileName = args.outputFileName - debugOn = args.debugOn - - #--validations - if not mappingFileName: - print('a mapping file must be specified with -m') - sys.exit(1) - if not outputFileName: - print('an output file must be specified with -o') - sys.exit(1) - - csv_functions = csv_functions() - if not csv_functions.initialized: - sys.exit(1) - - result = processFile() - - print('') - if result != 0: - print('process aborted!') - else: - print('process completed!') - print('') - - sys.exit(result) +import os +import sys +import argparse +import configparser +import signal +import 
time +from datetime import datetime, timedelta +import json +import csv +import global +from csv_functions import csv_functions + +#---------------------------------------- +def pause(question='PRESS ENTER TO CONTINUE ...'): + """ pause for debug purposes """ + global shutDown + try: response = input(question) + except KeyboardInterrupt: + response = None + shutDown = True + return response + +#---------------------------------------- +def signal_handler(signal, frame): + global shutDown + print('USER INTERUPT! Shutting down ... (please wait)') + shutDown = True + return + +#---------------------------------------- +def getNextRow(fileInfo): + errCnt = 0 + rowData = None + while not rowData: + + #--quit for consecutive errors + if errCnt >= 10: + fileInfo['ERROR'] = 'YES' + print() + print('Shutdown due to too many errors') + break + + try: line = next(fileInfo['reader']) + except StopIteration: + break + except: + print(' row %s: %s' % (fileInfo['rowCnt'], sys.exc_info()[0])) + fileInfo['skipCnt'] += 1 + errCnt += 1 + continue + fileInfo['rowCnt'] += 1 + if line: #--skip empty lines + + #--csv reader will return a list (mult-char delimiter must be manually split) + if type(line) == list: + row = line + else: + row = [removeQuoteChar(x.strip()) for x in line.split(fileInfo['delimiter'])] + + #--turn into a dictionary if there is a header + if 'header' in fileInfo: + + #--column mismatch + if len(row) != len(fileInfo['header']): + print(' row %s has %s columns, expected %s' % (fileInfo['rowCnt'], len(row), len(fileInfo['header']))) + fileInfo['skipCnt'] += 1 + errCnt += 1 + continue + + #--is it the header row + elif str(row[0]).upper() == fileInfo['header'][0].upper() and str(row[len(row)-1]).upper() == fileInfo['header'][len(fileInfo['header'])-1].upper(): + fileInfo['skipCnt'] += 1 + if fileInfo['rowCnt'] != 1: + print(' row %s contains the header' % fileInfo['rowCnt']) + errCnt += 1 + continue + + #--return a good row + else: + rowData = dict(zip(fileInfo['header'], [str(x).strip() for x in row])) + + else: #--if not just return what should be the header row + fileInfo['skipCnt'] += 1 + rowData = [str(x).strip() for x in row] + + else: + print(' row %s is blank' % fileInfo['rowCnt']) + fileInfo['skipCnt'] += 1 + continue + + return fileInfo, rowData + +#---------------------------------------- +def removeQuoteChar(s): + if len(s)>2 and s[0] + s[-1] in ("''", '""'): + return s[1:-1] + return s + +#---------------------------------------- +def getValue(rowData, expression): + try: rtnValue = expression % rowData + except: + print('warning: could not map %s' % (expression,)) + rtnValue = '' + return rtnValue + +#---------------------------------------- +def processFile(): + global shutDown + + #--read the mapping file + if not os.path.exists(mappingFileName): + print() + print('%s does not exist' % mappingFileName) + return -1 + + try: mappingDoc = json.load(open(mappingFileName, 'r')) + except ValueError as err: + print() + print('mapping file error: %s in %s' % (err, mappingFileName)) + return -1 + + #--validate all outputs + errorCnt = 0 + for i in range(len(mappingDoc['outputs'])): + if 'enabled' in mappingDoc['outputs'][i] and mappingDoc['outputs'][i]['enabled'].upper().startswith("N"): + continue + + mappingDoc['outputs'][i]['rowsWritten'] = 0 + mappingDoc['outputs'][i]['rowsSkipped'] = 0 + mappingDoc['outputs'][i]['mappedList'] = [] + mappingDoc['outputs'][i]['unmappedList'] = [] + mappingDoc['outputs'][i]['ignoredList'] = [] + mappingDoc['outputs'][i]['statistics'] = {} + + 
#--ensure uniqueness of attributes, especially if using labels (usage types) + aggregate = False + labelAttrList = [] + for i1 in range(len(mappingDoc['outputs'][i]['attributes'])): + if mappingDoc['outputs'][i]['attributes'][i1]['attribute'] == '': + if 'mapping' in mappingDoc['outputs'][i]['attributes'][i1]: + mappingDoc['outputs'][i]['ignoredList'].append(mappingDoc['outputs'][i]['attributes'][i1]['mapping'].replace('%(','').replace(')s','')) + continue + elif csv_functions.is_senzing_attribute(mappingDoc['outputs'][i]['attributes'][i1]['attribute']): + mappingDoc['outputs'][i]['mappedList'].append(mappingDoc['outputs'][i]['attributes'][i1]['attribute']) + else: + mappingDoc['outputs'][i]['unmappedList'].append(mappingDoc['outputs'][i]['attributes'][i1]['attribute']) + mappingDoc['outputs'][i]['statistics'][mappingDoc['outputs'][i]['attributes'][i1]['attribute']] = 0 + + if 'label' in mappingDoc['outputs'][i]['attributes'][i1]: + mappingDoc['outputs'][i]['attributes'][i1]['label_attribute'] = mappingDoc['outputs'][i]['attributes'][i1]['label'].replace('_', '-') + '_' + else: + mappingDoc['outputs'][i]['attributes'][i1]['label_attribute'] = '' + mappingDoc['outputs'][i]['attributes'][i1]['label_attribute'] += mappingDoc['outputs'][i]['attributes'][i1]['attribute'] + if mappingDoc['outputs'][i]['attributes'][i1]['label_attribute'] in labelAttrList: + errorCnt += 1 + print('attribute %s (%s) is duplicated for output %s!' % (i1, mappingDoc['outputs'][i]['attributes'][i1]['label_attribute'], i)) + else: + labelAttrList.append(mappingDoc['outputs'][i]['attributes'][i1]['label_attribute']) + + if 'subList' in mappingDoc['outputs'][i]['attributes'][i1]: + aggregate = True + + mappingDoc['outputs'][i]['aggregate'] = aggregate + if errorCnt: + return -1 + + #--initialize aggregated record array + totalRowCnt = 0 + aggregatedRecords = {} + + #--open output file + try: outputFileHandle = open(outputFileName, 'w') + except IOError as err: + print('') + print('Could not write to %s \n%s' % (outputFileName, err)) + return -1 + + #--override mapping document with parameters + if inputFileName or 'inputFileName' not in mappingDoc['input']: + mappingDoc['input']['inputFileName'] = inputFileName + #if fieldDelimiter or 'fieldDelimiter' not in mappingDoc['input']: + # mappingDoc['input']['fieldDelimiter'] = fieldDelimiter + #if fileEncoding or 'fileEncoding' not in mappingDoc['input']: + # mappingDoc['input']['fileEncoding'] = fileEncoding + if 'columnHeaders' not in mappingDoc['input']: + mappingDoc['input']['columnHeaders'] = [] + + #--get the input file + if not mappingDoc['input']['inputFileName']: + print('') + print('no input file supplied') + return 1 + fileList = glob.glob(mappingDoc['input']['inputFileName']) + if len(fileList) == 0: + print('') + print('%s not found' % inputFileName) + return 1 + + #--for each input file + for fileName in fileList: + print('') + print('Processing %s ...' 
% fileName) + currentFile = {} + currentFile['name'] = fileName + currentFile['rowCnt'] = 0 + currentFile['skipCnt'] = 0 + + #--open the file + if 'fileEncoding' in mappingDoc['input'] and mappingDoc['input']['fileEncoding']: + currentFile['fileEncoding'] = mappingDoc['input']['fileEncoding'] + currentFile['handle'] = open(fileName, 'r', encoding=mappingDoc['input']['fileEncoding']) + else: + currentFile['handle'] = open(fileName, 'r') + + #--set the dialect + currentFile['fieldDelimiter'] = mappingDoc['input']['fieldDelimiter'] + if not mappingDoc['input']['fieldDelimiter']: + currentFile['csvDialect'] = csv.Sniffer().sniff(currentFile['handle'].readline(), delimiters='|,\t') + currentFile['handle'].seek(0) + currentFile['fieldDelimiter'] = currentFile['csvDialect'].delimiter + mappingDoc['input']['fieldDelimiter'] = currentFile['csvDialect'].delimiter + elif mappingDoc['input']['fieldDelimiter'].lower() in ('csv', 'comma', ','): + currentFile['csvDialect'] = 'excel' + elif mappingDoc['input']['fieldDelimiter'].lower() in ('tab', 'tsv', '\t'): + currentFile['csvDialect'] = 'excel-tab' + elif mappingDoc['input']['fieldDelimiter'].lower() in ('pipe', '|'): + csv.register_dialect('pipe', delimiter = '|', quotechar = '"') + currentFile['csvDialect'] = 'pipe' + elif len(mappingDoc['input']['fieldDelimiter']) == 1: + csv.register_dialect('other', delimiter = delimiter, quotechar = '"') + currentFile['csvDialect'] = 'other' + elif len(mappingDoc['input']['fieldDelimiter']) > 1: + currentFile['csvDialect'] = 'multi' + else: + currentFile['csvDialect'] = 'excel' + + #--set the reader (csv cannot be used for multi-char delimiters) + if currentFile['csvDialect'] != 'multi': + currentFile['reader'] = csv.reader(currentFile['handle'], dialect=currentFile['csvDialect']) + else: + currentFile['reader'] = currentFile['handle'] + + #--get the current file header row and use it if not one already + currentFile, currentHeaders = getNextRow(currentFile) + if not mappingDoc['input']['columnHeaders']: + mappingDoc['input']['columnHeaders'] = [str(x).replace(' ', '_') for x in currentHeaders] + currentFile['header'] = mappingDoc['input']['columnHeaders'] + + while True: + currentFile, rowData = getNextRow(currentFile) + if not rowData: + break + + totalRowCnt += 1 + rowData['ROW_ID'] = totalRowCnt + + #--clean garbage values + for key in rowData: + rowData[key] = csv_functions.clean_value(key, rowData[key]) + + #--perform calculations + mappingErrors = 0 + if 'calculations' in mappingDoc: + for calcDict in mappingDoc['calculations']: + try: newValue = eval(list(calcDict.values())[0]) + except Exception as err: + print(' error: %s [%s]' % (list(calcDict.keys())[0], err)) + mappingErrors += 1 + else: + if type(newValue) == list: + for newItem in newValue: + rowData.update(newItem) + else: + rowData[list(calcDict.keys())[0]] = newValue + + if debugOn: + print(json.dumps(rowData, indent=4)) + pause() + + #--process the record for each output + for i in range(len(mappingDoc['outputs'])): + if 'enabled' in mappingDoc['outputs'][i] and mappingDoc['outputs'][i]['enabled'].upper().startswith("N"): + continue + + if 'filter' in mappingDoc['outputs'][i]: + try: skipRow = eval(mappingDoc['outputs'][i]['filter']) + except Exception as err: + skipRow = False + print(' filter error: %s [%s]' % (mappingDoc['outputs'][i]['filter'], err)) + if skipRow: + mappingDoc['outputs'][i]['rowsSkipped'] += 1 + continue + + + dataSource = getValue(rowData, mappingDoc['outputs'][i]['data_source']) + if 'entity_type' in 
mappingDoc['outputs'][i]: + entityType = getValue(rowData, mappingDoc['outputs'][i]['entity_type']) + else: + entityType = dataSource + + entityKey = None + recordID = None + uniqueKey = None + if 'entity_key' in mappingDoc['outputs'][i]: + entityKey = getValue(rowData, mappingDoc['outputs'][i]['entity_key']) + uniqueKey = dataSource + '|' + entityKey + elif 'record_id' in mappingDoc['outputs'][i]: + recordID = getValue(rowData, mappingDoc['outputs'][i]['record_id']) + uniqueKey = dataSource + '|' + recordID + + rootValues = {} + subListValues = {} + for attrDict in mappingDoc['outputs'][i]['attributes']: + if attrDict['attribute'] == '': + continue + + attrValue = getValue(rowData, attrDict['mapping']) + if attrValue: + mappingDoc['outputs'][i]['statistics'][attrDict['attribute']] += 1 + if 'subList' in attrDict: + if attrDict['subList'] not in subListValues: + subListValues[attrDict['subList']] = {} + subListValues[attrDict['subList']][attrDict['label_attribute']] = attrValue + else: + rootValues[attrDict['label_attribute']] = attrValue + + #--complete the json record + jsonData = {} + for subList in subListValues: + jsonData[subList] = [subListValues[subList]] + jsonData['DATA_SOURCE'] = dataSource + jsonData['ENTITY_TYPE'] = entityType + if entityKey: + jsonData['ENTITY_KEY'] = entityKey + elif recordID: + jsonData['RECORD_ID'] = recordID + jsonData.update(rootValues) + + #--just output if not aggregating + if not mappingDoc['outputs'][i]['aggregate']: + try: outputFileHandle.write(json.dumps(jsonData) + '\n') + except IOError as err: + print('') + print('Could no longer write to %s \n%s' % (outputFileName, err)) + shutDown = True + break + mappingDoc['outputs'][i]['rowsWritten'] += 1 + else: + if uniqueKey not in aggregatedRecords: + mappingDoc['outputs'][i]['rowsWritten'] += 1 + aggregatedRecords[uniqueKey] = jsonData + else: + #--update root attributes + for attribute in jsonData: + if type(jsonData[attribute]) != list: + #--append missing + if attribute not in aggregatedRecords[uniqueKey]: + aggregatedRecords[uniqueKey][attribute] = jsonData[attribute] + else: + if jsonData[attribute] != aggregatedRecords[uniqueKey][attribute]: + print(' %s update ignored ... [%s] vs [%s]' % (attribute, jsonData[attribute], aggregatedRecords[uniqueKey][attribute])) + #--do not update for now... just not sure how! + + #--aggregate distinct subLists + for subList in subListValues: + subRecord = subListValues[subList] + if subList not in aggregatedRecords[uniqueKey]: + aggregatedRecords[uniqueKey][subList] = [] + + if subRecord not in aggregatedRecords[uniqueKey][subList]: + aggregatedRecords[uniqueKey][subList].append(subRecord) + jsonData = aggregatedRecords[uniqueKey] + + if debugOn: + print(json.dumps(jsonData, indent=4)) + pause() + + #--break conditions + if shutDown: + break + elif 'ERROR' in currentFile: + break + + if currentFile['rowCnt'] % 10000 == 0: + print(' %s rows processed, %s rows skipped' % (currentFile['rowCnt'], currentFile['skipCnt'])) + + currentFile['handle'].close() + if shutDown: + break + else: + print(' %s rows processed, %s rows skipped, complete!' 
% (currentFile['rowCnt'], currentFile['skipCnt'])) + + #-write aggregated records to file + if aggregatedRecords: + print('writing aggregated records to output file ...') + if not shutDown: + rowCnt = 0 + for uniqueKey in aggregatedRecords: + try: outputFileHandle.write(json.dumps(aggregatedRecords[uniqueKey]) + '\n') + except IOError as err: + print('') + print('Could not longer write to %s \n%s' % (outputFileName, err)) + print('') + shutDown = True + break + rowCnt += 1 + if rowCnt % 10000 == 0: + print(' %s rows processed' % rowCnt) + if not shutDown: + print(' %s rows processed, complete!' % rowCnt) + + #--close all inputs and outputs + outputFileHandle.close() + + for i in range(len(mappingDoc['outputs'])): + print() + print('OUTPUT #%s ...' % i) + print(' %s rows written' % mappingDoc['outputs'][i]['rowsWritten']) + print(' %s rows skipped' % mappingDoc['outputs'][i]['rowsSkipped']) + print() + print(' MAPPED ATTRIBUTES:') + for attribute in mappingDoc['outputs'][i]['mappedList']: + percentPopulated = round(mappingDoc['outputs'][i]['statistics'][attribute] / mappingDoc['outputs'][i]['rowsWritten'] * 100, 2) + print(' %s %10d %s %%' % (attribute.lower().ljust(30,'.'), mappingDoc['outputs'][i]['statistics'][attribute], percentPopulated)) + if mappingDoc['outputs'][i]['unmappedList']: + print() + print(' UNMAPPED ATTRIBUTES:') + for attribute in mappingDoc['outputs'][i]['unmappedList']: + percentPopulated = round(mappingDoc['outputs'][i]['statistics'][attribute] / mappingDoc['outputs'][i]['rowsWritten'] * 100, 2) + print(' %s %10d %s %%' % (attribute.lower().ljust(30,'.'), mappingDoc['outputs'][i]['statistics'][attribute], percentPopulated)) + if mappingDoc['outputs'][i]['ignoredList']: + print() + print(' COLUMNS IGNORED: \n %s' % ', '.join([i.lower() for i in mappingDoc['outputs'][i]['ignoredList']])) + + if shutDown: + return -1 + + #for fileName in openFiles: + # openFiles[fileName].close() + + return 0 + +#---------------------------------------- +if __name__ == '__main__': + procStartTime = time.time() + shutDown = False + signal.signal(signal.SIGINT, signal_handler) + + parser = argparse.ArgumentParser() + parser.add_argument('-m', '--mappingFileName', dest='mappingFileName', help='the name of a mapping file') + parser.add_argument('-i', '--inputFileName', dest='inputFileName', help='the name of a csv input file') + parser.add_argument('-d', '--delimiterChar', dest='delimiterChar', help='delimiter character') + parser.add_argument('-e', '--fileEncoding', dest='fileEncoding', help='file encoding') + parser.add_argument('-o', '--outputFileName', dest='outputFileName', help='the name of the output file') + parser.add_argument('-l', '--log_file', dest='logFileName', help='optional statistics filename (json format).') + parser.add_argument('-D', '--debugOn', dest='debugOn', action='store_true', default=False, help='run in debug mode') + args = parser.parse_args() + mappingFileName = args.mappingFileName + inputFileName = args.inputFileName + delimiterChar = args.delimiterChar + fileEncoding = args.fileEncoding + outputFileName = args.outputFileName + logFileName = args.logFileName + debugOn = args.debugOn + + #--validations + if not mappingFileName: + print('a mapping file must be specified with -m') + sys.exit(1) + if not outputFileName: + print('an output file must be specified with -o') + sys.exit(1) + + csv_functions = csv_functions() + if not csv_functions.initialized: + sys.exit(1) + + result = processFile() + + if logFileName: + print('') + with open(logFileName, 'w') as f: + 
json.dump(csv_functions.statPack, f, indent=4, sort_keys = True) + print('Mapping stats written to %s' % logFileName) + + + print('') + if result != 0: + print('process aborted!') + else: + print('process completed!') + print('') + + sys.exit(result) diff --git a/csv_search.py b/csv_search.py new file mode 100644 index 0000000..86297cb --- /dev/null +++ b/csv_search.py @@ -0,0 +1,830 @@ +import os +import sys +import argparse +import configparser +import signal +import time +from datetime import datetime, timedelta +import csv +import json +import glob +from csv_functions import csv_functions + +#--senzing python classes +try: + from G2Database import G2Database + from G2Exception import G2Exception + from G2Engine import G2Engine +except: + print('') + print('Please export PYTHONPATH=') + print('') + sys.exit(1) + +#--see if a g2 config manager present - v1.12+ +try: + from G2IniParams import G2IniParams + from G2ConfigMgr import G2ConfigMgr +except: G2ConfigMgr = None + +#---------------------------------------- +def pause(question='PRESS ENTER TO CONTINUE ...'): + """ pause for debug purposes """ + try: response = input(question) + except KeyboardInterrupt: + response = None + global shutDown + shutDown = True + return response + +#---------------------------------------- +def signal_handler(signal, frame): + print('USER INTERUPT! Shutting down ... (please wait)') + global shutDown + shutDown = True + return + +#---------------------------------------- +def getNextRow(fileInfo): + errCnt = 0 + rowData = None + while not rowData: + + #--quit for consecutive errors + if errCnt >= 10: + fileInfo['ERROR'] = 'YES' + print() + print('Shutdown due to too many errors') + break + + try: line = next(fileInfo['reader']) + except StopIteration: + break + except: + print(' row %s: %s' % (fileInfo['rowCnt'], sys.exc_info()[0])) + fileInfo['skipCnt'] += 1 + errCnt += 1 + continue + fileInfo['rowCnt'] += 1 + if line: #--skip empty lines + + #--csv reader will return a list (mult-char delimiter must be manually split) + if type(line) == list: + row = line + else: + row = [removeQuoteChar(x.strip()) for x in line.split(fileInfo['delimiter'])] + + #--turn into a dictionary if there is a header + if 'header' in fileInfo: + + #--column mismatch + if len(row) != len(fileInfo['header']): + print(' row %s has %s columns, expected %s' % (fileInfo['rowCnt'], len(row), len(fileInfo['header']))) + fileInfo['skipCnt'] += 1 + errCnt += 1 + continue + + #--is it the header row + elif str(row[0]).upper() == fileInfo['header'][0].upper() and str(row[len(row)-1]).upper() == fileInfo['header'][len(fileInfo['header'])-1].upper(): + fileInfo['skipCnt'] += 1 + if fileInfo['rowCnt'] != 1: + print(' row %s contains the header' % fileInfo['rowCnt']) + errCnt += 1 + continue + + #--return a good row + else: + rowData = dict(zip(fileInfo['header'], [str(x).strip() for x in row])) + + else: #--if not just return what should be the header row + fileInfo['skipCnt'] += 1 + rowData = [str(x).strip() for x in row] + + else: + print(' row %s is blank' % fileInfo['rowCnt']) + fileInfo['skipCnt'] += 1 + continue + + return fileInfo, rowData + +#---------------------------------------- +def removeQuoteChar(s): + if len(s)>2 and s[0] + s[-1] in ("''", '""'): + return s[1:-1] + return s + +#---------------------------------------- +def getValue(rowData, expression): + try: rtnValue = expression % rowData + except: + print('warning: could not map %s' % (expression,)) + rtnValue = '' + return rtnValue + 
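+#--note: getValue() treats the mapping as a python '%(COLUMN_NAME)s' template
+#--applied to the row dictionary; a column missing from the row just logs a
+#--warning and maps to an empty string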
+#---------------------------------------- +def processFile(): + global shutDown + + #--read the mapping file + if not os.path.exists(mappingFileName): + print() + print('%s does not exist' % mappingFileName) + return -1 + + try: mappingDoc = json.load(open(mappingFileName, 'r')) + except ValueError as err: + print() + print('mapping file error: %s in %s' % (err, mappingFileName)) + return -1 + + #--upper case value replacements + if 'columnHeaders' in mappingDoc['input']: + mappingDoc['input']['columnHeaders'] = [x.upper() for x in mappingDoc['input']['columnHeaders']] + for ii in range(len(mappingDoc['search']['attributes'])): + mappingDoc['search']['attributes'][ii]['mapping'] = mappingDoc['search']['attributes'][ii]['mapping'].upper().replace(')S', ')s') + for ii in range(len(mappingDoc['output']['columns'])): + mappingDoc['output']['columns'][ii]['value'] = mappingDoc['output']['columns'][ii]['value'].upper().replace(')S', ')s') + + #--build output headers + recordDataRequested = False + outputHeaders = [] + for ii in range(len(mappingDoc['output']['columns'])): + columnName = mappingDoc['output']['columns'][ii]['name'].upper() + mappingDoc['output']['columns'][ii]['name'] = columnName + outputHeaders.append(columnName) + if mappingDoc['output']['columns'][ii]['source'].upper() == 'RECORD': + recordDataRequested = True + mappingDoc['output']['outputHeaders'] = outputHeaders + + #--use minimal format unless record data requested + #--initialize search flags + #-- + #-- G2_ENTITY_MINIMAL_FORMAT = ( 1 << 18 ) + #-- G2_ENTITY_BRIEF_FORMAT = ( 1 << 20 ) + #-- G2_ENTITY_INCLUDE_NO_FEATURES + #-- + #-- G2_EXPORT_INCLUDE_RESOLVED = ( 1 << 2 ) + #-- G2_EXPORT_INCLUDE_POSSIBLY_SAME = ( 1 << 3 ) + #-- + searchFlags = g2Engine.G2_ENTITY_INCLUDE_NO_RELATIONS + if recordDataRequested: + searchFlags = searchFlags | g2Engine.G2_ENTITY_INCLUDE_NO_FEATURES + else: + searchFlags = searchFlags | g2Engine.G2_ENTITY_MINIMAL_FORMAT + + if 'matchLevelFilter' not in mappingDoc['output'] or int(mappingDoc['output']['matchLevelFilter']) < 1: + mappingDoc['output']['matchLevelFilter'] = 99 + else: + mappingDoc['output']['matchLevelFilter'] = int(mappingDoc['output']['matchLevelFilter']) + if mappingDoc['output']['matchLevelFilter'] == 1: + searchFlags = searchFlags | g2Engine.G2_EXPORT_INCLUDE_RESOLVED + elif mappingDoc['output']['matchLevelFilter'] == 2: + searchFlags = searchFlags | g2Engine.G2_EXPORT_INCLUDE_RESOLVED | g2Engine.G2_EXPORT_INCLUDE_POSSIBLY_SAME + + if 'nameScoreFilter' not in mappingDoc['output']: + mappingDoc['output']['nameScoreFilter'] = 0 + else: + mappingDoc['output']['nameScoreFilter'] = int(mappingDoc['output']['nameScoreFilter']) + + if 'dataSourceFilter' not in mappingDoc['output']: + mappingDoc['output']['dataSourceFilter'] = None + else: + mappingDoc['output']['dataSourceFilter'] = mappingDoc['output']['dataSourceFilter'].upper() + + if 'maxReturnCount' not in mappingDoc['output']: + mappingDoc['output']['maxReturnCount'] = 1 + else: + mappingDoc['output']['maxReturnCount'] = int(mappingDoc['output']['maxReturnCount']) + + #--open the output file + if outputFileName: + fileName = outputFileName + else: + fileName = mappingDoc['output']['fileName'] + outputFileHandle = open(fileName, 'w', encoding='utf-8', newline='') + mappingDoc['output']['fileHandle'] = outputFileHandle + if mappingDoc['output']['fileType'] != 'JSON': + mappingDoc['output']['fileWriter'] = csv.writer(mappingDoc['output']['fileHandle'], dialect=csv.excel, quoting=csv.QUOTE_MINIMAL) + 
mappingDoc['output']['fileWriter'].writerow(outputHeaders) + + #--upper case value replacements + #for ii in range(len(mappingDoc['search']['attributes'])): + # mappingDoc['search']['attributes'][ii]['value'] = mappingDoc['search']['attributes'][ii]['value'].upper().replace(')S', ')s') + #for ii in range(len(mappingDoc['output']['columns'])): + # mappingDoc['output']['columns'][ii]['value'] = mappingDoc['output']['columns'][ii]['value'].upper().replace(')S', ')s') + + #--initialize the stats + scoreCounts = {} + scoreCounts['best'] = {} + scoreCounts['best']['total'] = 0 + scoreCounts['best']['>=100'] = 0 + scoreCounts['best']['>=95'] = 0 + scoreCounts['best']['>=90'] = 0 + scoreCounts['best']['>=85'] = 0 + scoreCounts['best']['>=80'] = 0 + scoreCounts['best']['>=75'] = 0 + scoreCounts['best']['>=70'] = 0 + scoreCounts['best']['<70'] = 0 + scoreCounts['additional'] = {} + scoreCounts['additional']['total'] = 0 + scoreCounts['additional']['>=100'] = 0 + scoreCounts['additional']['>=95'] = 0 + scoreCounts['additional']['>=90'] = 0 + scoreCounts['additional']['>=85'] = 0 + scoreCounts['additional']['>=80'] = 0 + scoreCounts['additional']['>=75'] = 0 + scoreCounts['additional']['>=70'] = 0 + scoreCounts['additional']['<70'] = 0 + scoreCounts['name'] = {} + scoreCounts['name']['total'] = 0 + scoreCounts['name']['=100'] = 0 + scoreCounts['name']['>=95'] = 0 + scoreCounts['name']['>=90'] = 0 + scoreCounts['name']['>=85'] = 0 + scoreCounts['name']['>=80'] = 0 + scoreCounts['name']['>=75'] = 0 + scoreCounts['name']['>=70'] = 0 + scoreCounts['name']['<70'] = 0 + rowsSkipped = 0 + rowsMatched = 0 + rowsNotMatched = 0 + resolvedMatches = 0 + possibleMatches = 0 + possiblyRelateds = 0 + nameOnlyMatches = 0 + + mappingDoc['search']['rowsSearched'] = 0 + mappingDoc['search']['rowsSkipped'] = 0 + mappingDoc['search']['mappedList'] = [] + mappingDoc['search']['unmappedList'] = [] + mappingDoc['search']['ignoredList'] = [] + mappingDoc['search']['statistics'] = {} + + #--ensure uniqueness of attributes, especially if using labels (usage types) + errorCnt = 0 + labelAttrList = [] + for i1 in range(len(mappingDoc['search']['attributes'])): + if mappingDoc['search']['attributes'][i1]['attribute'] == '': + if 'mapping' in mappingDoc['search']['attributes'][i1]: + mappingDoc['search']['ignoredList'].append(mappingDoc['search']['attributes'][i1]['mapping'].replace('%(','').replace(')s','')) + continue + elif csv_functions.is_senzing_attribute(mappingDoc['search']['attributes'][i1]['attribute']): + mappingDoc['search']['mappedList'].append(mappingDoc['search']['attributes'][i1]['attribute']) + else: + mappingDoc['search']['unmappedList'].append(mappingDoc['search']['attributes'][i1]['attribute']) + mappingDoc['search']['statistics'][mappingDoc['search']['attributes'][i1]['attribute']] = 0 + + if 'label' in mappingDoc['search']['attributes'][i1]: + mappingDoc['search']['attributes'][i1]['label_attribute'] = mappingDoc['search']['attributes'][i1]['label'].replace('_', '-') + '_' + else: + mappingDoc['search']['attributes'][i1]['label_attribute'] = '' + mappingDoc['search']['attributes'][i1]['label_attribute'] += mappingDoc['search']['attributes'][i1]['attribute'] + if mappingDoc['search']['attributes'][i1]['label_attribute'] in labelAttrList: + errorCnt += 1 + print('attribute %s (%s) is duplicated for output %s!' 
% (i1, mappingDoc['search']['attributes'][i1]['label_attribute'], i)) + else: + labelAttrList.append(mappingDoc['search']['attributes'][i1]['label_attribute']) + + if errorCnt: + return -1 + + #--override mapping document with parameters + if inputFileName or 'inputFileName' not in mappingDoc['input']: + mappingDoc['input']['inputFileName'] = inputFileName + #if fieldDelimiter or 'fieldDelimiter' not in mappingDoc['input']: + # mappingDoc['input']['fieldDelimiter'] = fieldDelimiter + #if fileEncoding or 'fileEncoding' not in mappingDoc['input']: + # mappingDoc['input']['fileEncoding'] = fileEncoding + if 'columnHeaders' not in mappingDoc['input']: + mappingDoc['input']['columnHeaders'] = [] + + #--get the input file + if not mappingDoc['input']['inputFileName']: + print('') + print('no input file supplied') + return 1 + fileList = glob.glob(mappingDoc['input']['inputFileName']) + if len(fileList) == 0: + print('') + print('%s not found' % inputFileName) + return 1 + + #--for each input file + totalRowCnt = 0 + for fileName in fileList: + print('') + print('Processing %s ...' % fileName) + currentFile = {} + currentFile['name'] = fileName + currentFile['rowCnt'] = 0 + currentFile['skipCnt'] = 0 + + #--open the file + if 'fileEncoding' in mappingDoc['input'] and mappingDoc['input']['fileEncoding']: + currentFile['fileEncoding'] = mappingDoc['input']['fileEncoding'] + currentFile['handle'] = open(fileName, 'r', encoding=mappingDoc['input']['fileEncoding']) + else: + currentFile['handle'] = open(fileName, 'r') + + #--set the dialect + currentFile['fieldDelimiter'] = mappingDoc['input']['fieldDelimiter'] + if not mappingDoc['input']['fieldDelimiter']: + currentFile['csvDialect'] = csv.Sniffer().sniff(currentFile['handle'].readline(), delimiters='|,\t') + currentFile['handle'].seek(0) + currentFile['fieldDelimiter'] = currentFile['csvDialect'].delimiter + mappingDoc['input']['fieldDelimiter'] = currentFile['csvDialect'].delimiter + elif mappingDoc['input']['fieldDelimiter'].lower() in ('csv', 'comma', ','): + currentFile['csvDialect'] = 'excel' + elif mappingDoc['input']['fieldDelimiter'].lower() in ('tab', 'tsv', '\t'): + currentFile['csvDialect'] = 'excel-tab' + elif mappingDoc['input']['fieldDelimiter'].lower() in ('pipe', '|'): + csv.register_dialect('pipe', delimiter = '|', quotechar = '"') + currentFile['csvDialect'] = 'pipe' + elif len(mappingDoc['input']['fieldDelimiter']) == 1: + csv.register_dialect('other', delimiter = delimiter, quotechar = '"') + currentFile['csvDialect'] = 'other' + elif len(mappingDoc['input']['fieldDelimiter']) > 1: + currentFile['csvDialect'] = 'multi' + else: + currentFile['csvDialect'] = 'excel' + + #--set the reader (csv cannot be used for multi-char delimiters) + if currentFile['csvDialect'] != 'multi': + currentFile['reader'] = csv.reader(currentFile['handle'], dialect=currentFile['csvDialect']) + else: + currentFile['reader'] = currentFile['handle'] + + #--get the current file header row and use it if not one already + currentFile, currentHeaders = getNextRow(currentFile) + if not mappingDoc['input']['columnHeaders']: + mappingDoc['input']['columnHeaders'] = [x.upper() for x in currentHeaders] + currentFile['header'] = mappingDoc['input']['columnHeaders'] + + while True: + currentFile, rowData = getNextRow(currentFile) + if not rowData or shutDown: + break + + totalRowCnt += 1 + rowData['ROW_ID'] = totalRowCnt + + #--clean garbage values + for key in rowData: + rowData[key] = csv_functions.clean_value(key, rowData[key]) + + #--perform calculations + 
mappingErrors = 0 + if 'calculations' in mappingDoc: + for calcDict in mappingDoc['calculations']: + try: newValue = eval(list(calcDict.values())[0]) + except Exception as err: + print(' error: %s [%s]' % (list(calcDict.keys())[0], err)) + mappingErrors += 1 + else: + if type(newValue) == list: + for newItem in newValue: + rowData.update(newItem) + else: + rowData[list(calcDict.keys())[0]] = newValue + + if debugOn: + print(json.dumps(rowData, indent=4)) + pause() + + if 'filter' in mappingDoc['search']: + try: skipRow = eval(mappingDoc['search']['filter']) + except Exception as err: + skipRow = False + print(' filter error: %s [%s]' % (mappingDoc['search']['filter'], err)) + if skipRow: + mappingDoc['search']['rowsSkipped'] += 1 + continue + + rootValues = {} + subListValues = {} + for attrDict in mappingDoc['search']['attributes']: + if attrDict['attribute'] == '': + continue + + attrValue = getValue(rowData, attrDict['mapping']) + if attrValue: + mappingDoc['search']['statistics'][attrDict['attribute']] += 1 + if 'subList' in attrDict: + if attrDict['subList'] not in subListValues: + subListValues[attrDict['subList']] = {} + subListValues[attrDict['subList']][attrDict['label_attribute']] = attrValue + else: + rootValues[attrDict['label_attribute']] = attrValue + + #--create the search json + jsonData = {} + for subList in subListValues: + jsonData[subList] = [subListValues[subList]] + jsonData.update(rootValues) + if debugOn: + print(json.dumps(jsonData, indent=4)) + pause() + jsonStr = json.dumps(jsonData) + + #--empty searchResult = '{"SEARCH_RESPONSE": {"RESOLVED_ENTITIES": []}}'??? + try: + response = bytearray() + retcode = g2Engine.searchByAttributesV2(jsonStr, searchFlags, response) + response = response.decode() if response else '' + #if len(response) > 500: + # print(json.dumps(json.loads(response), indent=4)) + # pause() + except G2ModuleException as err: + print('') + print(err) + print('') + shutDown = True + break + jsonResponse = json.loads(response) + + matchList = [] + for resolvedEntity in jsonResponse['RESOLVED_ENTITIES']: + + #--create a list of data sources we found them in + dataSources = {} + for record in resolvedEntity['ENTITY']['RESOLVED_ENTITY']['RECORDS']: + dataSource = record['DATA_SOURCE'] + if dataSource not in dataSources: + dataSources[dataSource] = [record['RECORD_ID']] + else: + dataSources[dataSource].append(record['RECORD_ID']) + + dataSourceList = [] + for dataSource in dataSources: + if len(dataSources[dataSource]) == 1: + dataSourceList.append(dataSource + ': ' + dataSources[dataSource][0]) + else: + dataSourceList.append(dataSource + ': ' + str(len(dataSources[dataSource])) + ' records') + + #--determine the matching criteria + matchLevel = int(resolvedEntity['MATCH_INFO']['MATCH_LEVEL']) + matchKey = resolvedEntity['MATCH_INFO']['MATCH_KEY'] if resolvedEntity['MATCH_INFO']['MATCH_KEY'] else '' + matchKey = matchKey.replace('+RECORD_TYPE', '') + + scoreData = [] + bestScores = {} + bestScores['NAME'] = {} + bestScores['NAME']['score'] = 0 + bestScores['NAME']['value'] = 'n/a' + for featureCode in resolvedEntity['MATCH_INFO']['FEATURE_SCORES']: + if featureCode == 'NAME': + scoreCode = 'GNR_FN' + else: + scoreCode = 'FULL_SCORE' + for scoreRecord in resolvedEntity['MATCH_INFO']['FEATURE_SCORES'][featureCode]: + matchingScore= scoreRecord[scoreCode] + matchingValue = scoreRecord['CANDIDATE_FEAT'] + scoreData.append('%s|%s|%s|%s' % (featureCode, scoreCode, matchingScore, matchingValue)) + if featureCode not in bestScores: + bestScores[featureCode] = 
{} + bestScores[featureCode]['score'] = 0 + bestScores[featureCode]['value'] = 'n/a' + if matchingScore > bestScores[featureCode]['score']: + bestScores[featureCode]['score'] = matchingScore + bestScores[featureCode]['value'] = matchingValue + + if debugOn: + print(json.dumps(bestScores, indent=4)) + + + #--perform scoring (use stored match_score if not overridden in the mapping document) + if 'scoring' not in mappingDoc: + matchScore = str(((5-resolvedEntity['MATCH_INFO']['MATCH_LEVEL']) * 100) + int(resolvedEntity['MATCH_INFO']['MATCH_SCORE'])) + '-' + str(1000+bestScores['NAME']['score'])[-3:] + else: + matchScore = 0 + for featureCode in bestScores: + if featureCode in mappingDoc['scoring']: + if debugOn: + print(featureCode, mappingDoc['scoring'][featureCode]) + if bestScores[featureCode]['score'] >= mappingDoc['scoring'][featureCode]['threshold']: + matchScore += int(round(bestScores[featureCode]['score'] * (mappingDoc['scoring'][featureCode]['+weight'] / 100),0)) + elif '-weight' in mappingDoc['scoring'][featureCode]: + matchScore += -mappingDoc['scoring'][featureCode]['-weight'] #--actual score does not matter if below the threshold + + #--create the possible match entity one-line summary + matchedEntity = {} + matchedEntity['ENTITY_ID'] = resolvedEntity['ENTITY']['RESOLVED_ENTITY']['ENTITY_ID'] + if 'ENTITY_NAME' in resolvedEntity['ENTITY']['RESOLVED_ENTITY']: + matchedEntity['ENTITY_NAME'] = resolvedEntity['ENTITY']['RESOLVED_ENTITY']['ENTITY_NAME'] + (('\n aka: ' + bestScores['NAME']['value']) if bestScores['NAME']['value'] and bestScores['NAME']['value'] != resolvedEntity['ENTITY']['RESOLVED_ENTITY']['ENTITY_NAME'] else '') + else: + matchedEntity['ENTITY_NAME'] = bestScores['NAME']['value'] if 'NAME' in bestScores else '' + matchedEntity['ENTITY_SOURCES'] = '\n'.join(dataSourceList) + matchedEntity['MATCH_LEVEL'] = matchLevel + matchedEntity['MATCH_KEY'] = matchKey[1:] + matchedEntity['MATCH_SCORE'] = matchScore + matchedEntity['NAME_SCORE'] = bestScores['NAME']['score'] + matchedEntity['SCORE_DATA'] = '\n'.join(sorted(map(str, scoreData))) + + if debugOn: + print(json.dumps(matchedEntity, indent=4)) + pause() + + matchedEntity['RECORDS'] = resolvedEntity['ENTITY']['RESOLVED_ENTITY']['RECORDS'] + + #--check the output filters + filteredOut = False + if matchLevel > mappingDoc['output']['matchLevelFilter']: + filteredOut = True + if debugOn: + print(' ** did not meet matchLevelFilter **') + if bestScores['NAME']['score'] < mappingDoc['output']['nameScoreFilter']: + filteredOut = True + if debugOn: + print(' ** did not meet nameScoreFilter **') + if mappingDoc['output']['dataSourceFilter'] and mappingDoc['output']['dataSourceFilter'] not in dataSources: + filteredOut = True + if debugOn: + print(' ** did not meet dataSourceFiler **') + if not filteredOut: + matchList.append(matchedEntity) + + #--set the no match condition + if len(matchList) == 0: + # if requiredFieldsMissing: + # rowsSkipped += 1 + # else: + rowsNotMatched += 1 + matchedEntity = {} + matchedEntity['ENTITY_ID'] = 0 + matchedEntity['ENTITY_NAME'] = '' + matchedEntity['ENTITY_SOURCES'] = '' + matchedEntity['MATCH_NUMBER'] = 0 + matchedEntity['MATCH_LEVEL'] = 0 + matchedEntity['MATCH_KEY'] = '' + matchedEntity['MATCH_SCORE'] = '' + matchedEntity['NAME_SCORE'] = '' + matchedEntity['SCORE_DATA'] = '' + matchedEntity['RECORDS'] = [] + matchList.append(matchedEntity) + if debugOn: + print(' ** no matches found **') + else: + rowsMatched += 1 + + #---------------------------------- + #--create the output rows + 
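+            #-- output rows are written best match first: matchList is sorted by MATCH_SCORE descending,
+            #-- each entry is numbered via MATCH_NUMBER (0 for the no-match placeholder), score bands are
+            #-- tallied into scoreCounts for the 'best', 'additional' and 'name' levels, and the loop stops
+            #-- once matchNumber exceeds the mapping document's maxReturnCount
+            #-- each output column is filled from its declared source: "csv" columns from the input row,
+            #-- "api" columns from the matched entity summary, "record" columns gathered across the entity's records
+            #-- when a "scoring" section is present, MATCH_SCORE is the weighted sum computed above; with the
+            #-- illustrative values from search_set1.map, a best NAME score of 92 (threshold 80, +weight 80)
+            #-- adds round(92 * .80) = 74, while a DOB score of 60 (below its 85 threshold, -weight 30)
+            #-- subtracts 30, giving a MATCH_SCORE of 44; without a "scoring" section the engine's stored
+            #-- MATCH_LEVEL and MATCH_SCORE are used instead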
matchNumber = 0 + for matchedEntity in sorted(matchList, key=lambda x: x['MATCH_SCORE'], reverse=True): + matchNumber += 1 + matchedEntity['MATCH_NUMBER'] = matchNumber if matchedEntity['ENTITY_ID'] != 0 else 0 + + if matchedEntity['MATCH_SCORE']: + score = int(matchedEntity['MATCH_SCORE']) + level = 'best' if matchNumber == 1 else 'additional' + scoreCounts[level]['total'] += 1 + if score >= 100: + scoreCounts[level]['>=100'] += 1 + elif score >= 95: + scoreCounts[level]['>=95'] += 1 + elif score >= 90: + scoreCounts[level]['>=90'] += 1 + elif score >= 85: + scoreCounts[level]['>=85'] += 1 + elif score >= 80: + scoreCounts[level]['>=80'] += 1 + elif score >= 75: + scoreCounts[level]['>=75'] += 1 + elif score >= 70: + scoreCounts[level]['>=70'] += 1 + else: + scoreCounts[level]['<70'] += 1 + + if matchedEntity['NAME_SCORE']: + score = int(matchedEntity['NAME_SCORE']) + level = 'name' + scoreCounts[level]['total'] += 1 + if score >= 100: + scoreCounts[level]['=100'] += 1 + elif score >= 95: + scoreCounts[level]['>=95'] += 1 + elif score >= 90: + scoreCounts[level]['>=90'] += 1 + elif score >= 85: + scoreCounts[level]['>=85'] += 1 + elif score >= 80: + scoreCounts[level]['>=80'] += 1 + elif score >= 75: + scoreCounts[level]['>=75'] += 1 + elif score >= 70: + scoreCounts[level]['>=70'] += 1 + else: + scoreCounts[level]['<70'] += 1 + + if matchNumber > mappingDoc['output']['maxReturnCount']: + break + + #--get the column values + #uppercasedJsonData = False + rowValues = [] + for columnDict in mappingDoc['output']['columns']: + columnValue = '' + try: + if columnDict['source'].upper() == 'CSV': + columnValue = columnDict['value'] % rowData + elif columnDict['source'].upper() == 'API': + columnValue = columnDict['value'] % matchedEntity + except: + print('warning: could not find %s in %s' % (columnDict['value'],columnDict['source'].upper())) + + #--comes from the records + if columnDict['source'].upper() == 'RECORD': + #if not uppercasedJsonData: + # record['JSON_DATA'] = dictKeysUpper(record['JSON_DATA']) + # uppercasedJsonData = True + columnValues = [] + for record in matchedEntity['RECORDS']: + if columnDict['value'].upper().endswith('_DATA'): + for item in record[columnDict['value'].upper()]: + columnValues.append(item) + else: + try: thisValue = columnDict['value'] % record['JSON_DATA'] + except: pass + else: + if thisValue and thisValue not in columnValues: + columnValues.append(thisValue) + if columnValues: + columnValue = '\n'.join(sorted(map(str, columnValues))) + + #if debugOn: + # print(columnDict['value'], columnValue) + if len(columnValue) > 32000: + columnValue = columnValue[0:32000] + print('column %s truncated at 32k' % columnDict['name']) + rowValues.append(columnValue.replace('\n', '|')) + + #--write the record + if mappingDoc['output']['fileType'] != 'JSON': + mappingDoc['output']['fileWriter'].writerow(rowValues) + else: + mappingDoc['output']['fileHandle'].write(json.dumps(rowValues) + '\n') + + #--update the counters + if matchedEntity['MATCH_LEVEL'] == 1: + resolvedMatches += 1 + elif matchedEntity['MATCH_LEVEL'] == 2: + possibleMatches += 1 + elif matchedEntity['MATCH_LEVEL'] == 3: + possiblyRelateds += 1 + elif matchedEntity['MATCH_LEVEL'] == 4: + nameOnlyMatches += 1 + + if totalRowCnt % sqlCommitSize == 0: + now = datetime.now().strftime('%I:%M%p').lower() + elapsedMins = round((time.time() - procStartTime) / 60, 1) + eps = int(float(sqlCommitSize) / (float(time.time() - batchStartTime if time.time() - batchStartTime != 0 else 1))) + batchStartTime = time.time() + 
print(' %s rows searched at %s, %s per second ... %s rows matched, %s resolved matches, %s possible matches, %s possibly related, %s name only' % (totalRowCnt, now, eps, rowsMatched, resolvedMatches, possibleMatches, possiblyRelateds, nameOnlyMatches)) + + #--break conditions + if shutDown: + break + elif 'ERROR' in currentFile: + break + + currentFile['handle'].close() + if shutDown: + break + + #--all files completed + now = datetime.now().strftime('%I:%M%p').lower() + elapsedMins = round((time.time() - procStartTime) / 60, 1) + eps = int(float(sqlCommitSize) / (float(time.time() - procStartTime if time.time() - procStartTime != 0 else 1))) + batchStartTime = time.time() + print(' %s rows searched at %s, %s per second ... %s rows matched, %s resolved matches, %s possible matches, %s possibly related, %s name only' % (totalRowCnt, now, eps, rowsMatched, resolvedMatches, possibleMatches, possiblyRelateds, nameOnlyMatches)) + print(json.dumps(scoreCounts, indent = 4)) + + #--close all inputs and outputs + outputFileHandle.close() + + return shutDown + +#---------------------------------------- +if __name__ == "__main__": + appPath = os.path.dirname(os.path.abspath(sys.argv[0])) + + global shutDown + shutDown = False + signal.signal(signal.SIGINT, signal_handler) + procStartTime = time.time() + sqlCommitSize = 100 + + senzingConfigFile = os.getenv('SENZING_CONFIG_FILE') if os.getenv('SENZING_CONFIG_FILE', None) else appPath + os.path.sep + 'G2Module.ini' + parser = argparse.ArgumentParser() + parser.add_argument('-c', '--senzingConfigFile', dest='senzingConfigFile', default=senzingConfigFile, help='name of the g2.ini file, defaults to %s' % senzingConfigFile) + parser.add_argument('-m', '--mappingFileName', dest='mappingFileName', help='the name of a mapping file') + parser.add_argument('-i', '--inputFileName', dest='inputFileName', help='the name of a csv input file') + parser.add_argument('-d', '--delimiterChar', dest='delimiterChar', help='delimiter character') + parser.add_argument('-e', '--fileEncoding', dest='fileEncoding', help='file encoding') + parser.add_argument('-o', '--outputFileName', dest='outputFileName', help='the name of the output file') + parser.add_argument('-l', '--log_file', dest='logFileName', help='optional statistics filename (json format).') + parser.add_argument('-D', '--debugOn', dest='debugOn', action='store_true', default=False, help='run in debug mode') + args = parser.parse_args() + senzingConfigFile = args.senzingConfigFile + mappingFileName = args.mappingFileName + inputFileName = args.inputFileName + delimiterChar = args.delimiterChar + fileEncoding = args.fileEncoding + outputFileName = args.outputFileName + logFileName = args.logFileName + debugOn = args.debugOn + + #--get parameters from ini file + if not os.path.exists(senzingConfigFile): + print('') + print('Senzing config file: %s not found!' 
% senzingConfigFile) + print('') + sys.exit(1) + iniParser = configparser.ConfigParser() + iniParser.read(senzingConfigFile) + + #--use config file if in the ini file, otherwise expect to get from database with config manager lib + #print(iniParser.get('SQL', 'G2CONFIGFILE')) + try: configTableFile = iniParser.get('SQL', 'G2CONFIGFILE') + except: configTableFile = None + if not configTableFile and not G2ConfigMgr: + print('') + print('Config information missing from ini file and no config manager present!') + print('') + sys.exit(1) + + #--get the config from the file + if configTableFile: + try: cfgData = json.load(open(configTableFile), encoding="utf-8") + except ValueError as e: + print('') + print('G2CONFIGFILE: %s has invalid json' % configTableFile) + print(e) + print('') + sys.exit(1) + except IOError as e: + print('') + print('G2CONFIGFILE: %s was not found' % configTableFile) + print(e) + print('') + sys.exit(1) + + #--get the config from the config manager + else: + iniParamCreator = G2IniParams() + iniParams = iniParamCreator.getJsonINIParams(senzingConfigFile) + try: + g2ConfigMgr = G2ConfigMgr() + g2ConfigMgr.initV2('pyG2ConfigMgr', iniParams, False) + defaultConfigID = bytearray() + g2ConfigMgr.getDefaultConfigID(defaultConfigID) + if len(defaultConfigID) == 0: + print('') + print('No default config stored in database. (see https://senzing.zendesk.com/hc/en-us/articles/360036587313)') + print('') + sys.exit(1) + defaultConfigDoc = bytearray() + g2ConfigMgr.getConfig(defaultConfigID, defaultConfigDoc) + if len(defaultConfigDoc) == 0: + print('') + print('No default config stored in database. (see https://senzing.zendesk.com/hc/en-us/articles/360036587313)') + print('') + sys.exit(1) + cfgData = json.loads(defaultConfigDoc.decode()) + g2ConfigMgr.destroy() + except: + #--error already printed by the api wrapper + sys.exit(1) + + #--initialize the g2engine + try: + g2Engine = G2Engine() + if configTableFile: + g2Engine.init('csv_search_viewer', senzingConfigFile, False) + else: + iniParamCreator = G2IniParams() + iniParams = iniParamCreator.getJsonINIParams(senzingConfigFile) + g2Engine.initV2('csv_search', iniParams, False) + except G2Exception as err: + print('') + print('Could not initialize the G2 Engine') + print(str(err)) + print('') + sys.exit(1) + + #--load the csv functions if available + csv_functions = csv_functions() + if not csv_functions.initialized: + sys.exit(1) + + returnCode = processFile() + + print('') + elapsedMins = round((time.time() - procStartTime) / 60, 1) + if returnCode == 0: + print('Process completed successfully in %s minutes!' % elapsedMins) + else: + print('Process aborted after %s minutes!' 
% elapsedMins) + print('') + + g2Engine.destroy() + sys.exit(returnCode) diff --git a/input/test_set1.csv b/input/test_set1.csv index 9d645ac..ce31ad3 100644 --- a/input/test_set1.csv +++ b/input/test_set1.csv @@ -3,7 +3,14 @@ uniqueid,name,gender,dob,ssn,dlnum,proflic,taxid,addr1,city,state,zip 1002,Bob Jones,M,2/2/92,2102122,,1002022,,222 Second,Las Vegas,NV,89112 1003,General Hospital,,3/3/93,,,1003033,,333 Third,Las Vegas,NV,89113 1004,Mary Smith,F,,,,1004044,,444 Fourth,Las Vegas,NV,89114 -1005,Peter Anderson,,,,,1005055,,555 Fifth,Las Vegas,NV,89115 +1005,Peter Null Anderson,,,,,1005055,,555 Fifth,Las Vegas,NV,89115 1006,Cleveland Clinic,,,6060016,,1006066,6060016,666 Sixth,Las Vegas,NV,89116 1007,,F,,,,,700777,777 Seventh,Las Vegas,NV,89117 -1008,City of Hope,F,,,,,,888 Eighth,Las Vegas,NV,89118 \ No newline at end of file +1008,City of Hope,F,,,,,,888 Eighth,Las Vegas,NV,89118 +1009,NLR Jones & Jones,,,,,1009099,,Null,,, +1010,No Longer Rep-Jones and Jones,,,,,1009099,,,Null,, +1011,Rideshare Inc Jones,,,,,1011011,,,,null, +1012,Rideshare Inc Smith,,,,,1011011,,,,,null +1013,ABC Industries DBA ABC Semiconductor,,,,,1013013,,null,,, +1014,ABC Industries LLCDBA ABC Circuit boards,,,,,1013013,,,null,, +1015,Joe Smith Aka Joey Jones,,,,,1015015,,,,null, \ No newline at end of file diff --git a/mappings/search_set1.map b/mappings/search_set1.map new file mode 100644 index 0000000..56d222b --- /dev/null +++ b/mappings/search_set1.map @@ -0,0 +1,260 @@ +{ + "input": { + "inputFileName": "input/test_set1.csv", + "fieldDelimiter": ",", + "columnHeaders": [ + "UNIQUEID", + "NAME", + "GENDER", + "DOB", + "SSN", + "DLNUM", + "PROFLIC", + "TAXID", + "ADDR1", + "CITY", + "STATE", + "ZIP" + ] + }, + "calculations": [ + { + "": "csv_functions.parse_name(rowData['NAME'])" + } + ], + "search": { + "attributes": [ + { + "attribute": "PRIMARY_NAME_ORG", + "mapping" : "%(PRIMARY_NAME_ORG)s" + }, + { + "attribute": "PRIMARY_NAME_FULL", + "mapping" : "%(PRIMARY_NAME_FULL)s" + }, + { + "attribute": "SECONDARY_NAME_ORG", + "mapping" : "%(SECONDARY_NAME_ORG)s" + }, + { + "attribute": "SECONDARY_NAME_FULL", + "mapping" : "%(SECONDARY_NAME_FULL)s" + }, + { + "attribute": "GENDER", + "mapping" : "%(gender)s" + }, + { + "attribute": "DATE_OF_BIRTH", + "mapping" : "%(dob)s" + }, + { + "attribute": "SSN_NUMBER", + "mapping" : "%(ssn)s" + }, + { + "attribute": "PROF_LICENSE", + "mapping" : "%(proflic)s" + }, + { + "attribute": "TAX_ID_NUMBER", + "mapping" : "%(taxid)s" + }, + { + "attribute": "ADDR_LINE1", + "mapping" : "%(addr1)s" + }, + { + "attribute": "ADDR_CITY", + "mapping" : "%(city)s" + }, + { + "attribute": "ADDR_STATE", + "mapping" : "%(state)s" + }, + { + "attribute": "ADDR_POSTAL_CODE", + "mapping" : "%(zip)s" + } + ] + }, + "scoring": { + "NAME": { + "threshold": 80, + "+weight": 80 + }, + "DOB": { + "threshold": 85, + "+weight": 10, + "-weight": 30 + }, + "ADDRESS": { + "threshold": 80, + "+weight": 10 + }, + "PHONE": { + "threshold": 80, + "+weight": 10 + }, + "EMAIL": { + "threshold": 80, + "+weight": 10 + }, + "SSN": { + "threshold": 90, + "+weight": 10, + "-weight": 30 + }, + "DRLIC": { + "threshold": 90, + "+weight": 10 + }, + "TAX_ID": { + "threshold": 90, + "+weight": 10 + } + }, + "output": { + "fileType": "csv", + "matchLevelFilter": 0, + "nameScoreFilter": 0, + "maxReturnCount": 10, + "columns": [ + { + "name": "row_id", + "source": "csv", + "value" : "%(row_id)s" + }, + { + "name": "uniqueID", + "source": "csv", + "value" : "%(uniqueID)s" + }, + { + "name": "name", + "source": "csv", + 
"value" : "%(name)s" + }, + { + "name": "gender", + "source": "csv", + "value" : "%(gender)s" + }, + { + "name": "dob", + "source": "csv", + "value" : "%(dob)s" + }, + { + "name": "ssn", + "source": "csv", + "value" : "%(ssn)s" + }, + { + "name": "dlnum", + "source": "csv", + "value" : "%(dlnum)s" + }, + { + "name": "proflic", + "source": "csv", + "value" : "%(proflic)s" + }, + { + "name": "taxid", + "source": "csv", + "value" : "%(taxid)s" + }, + { + "name": "addr1", + "source": "csv", + "value" : "%(addr1)s" + }, + { + "name": "city", + "source": "csv", + "value" : "%(city)s" + }, + { + "name": "state", + "source": "csv", + "value" : "%(state)s" + }, + { + "name": "zip", + "source": "csv", + "value" : "%(zip)s" + }, + { + "name": "match_number", + "source": "api", + "value" : "%(match_number)s" + }, + { + "name": "match_key", + "source": "api", + "value" : "%(match_key)s" + }, + { + "name": "match_score", + "source": "api", + "value" : "%(match_score)s" + }, + { + "name": "name_score", + "source": "api", + "value" : "%(name_score)s" + }, + { + "name": "entity_id", + "source": "api", + "value" : "%(entity_id)s" + }, + { + "name": "entity_name", + "source": "api", + "value" : "%(entity_name)s" + }, + { + "name": "entity_sources", + "source": "api", + "value" : "%(entity_sources)s" + }, + { + "name": "score_data", + "source": "api", + "value" : "%(score_data)s" + }, + { + "name": "name_data", + "source": "record", + "value" : "NAME_DATA" + }, + { + "name": "attribute_data", + "source": "record", + "value" : "ATTRIBUTE_DATA" + }, + { + "name": "identifier_data", + "source": "record", + "value" : "IDENTIFIER_DATA" + }, + { + "name": "address_data", + "source": "record", + "value" : "ADDRESS_DATA" + }, + { + "name": "phone_data", + "source": "record", + "value" : "PHONE_DATA" + }, + { + "name": "other_data", + "source": "record", + "value" : "OTHER_DATA" + } + ] + } +} \ No newline at end of file diff --git a/mappings/test_set1.map b/mappings/test_set1.map index 6e44cb6..90e65dd 100644 --- a/mappings/test_set1.map +++ b/mappings/test_set1.map @@ -18,23 +18,21 @@ ] }, "calculations": [ - {"is_organization": "csv_functions.is_organization_name(rowData['name'])"}, - {"record_type": "'ORGANIZATION' if rowData['is_organization'] else 'PERSON'"}, - {"name_org": "rowData['name'] if rowData['is_organization'] else ''"}, - {"name_full": "rowData['name'] if not rowData['is_organization'] else ''"}, - {"ref_gender": "rowData['gender'] if rowData['is_organization'] else ''"}, - {"ref_dob": "rowData['dob'] if rowData['is_organization'] else ''"}, - {"ref_ssn": "rowData['ssn'] if rowData['is_organization'] else ''"}, - {"ref_dlnum": "rowData['dlnum'] if rowData['is_organization'] else ''"}, - {"gender": "rowData['gender'] if not rowData['is_organization'] else ''"}, - {"dob": "rowData['dob'] if not rowData['is_organization'] else ''"}, - {"ssn": "rowData['ssn'] if not rowData['is_organization'] else ''"}, - {"dlnum": "rowData['dlnum'] if not rowData['is_organization'] else ''"} + {"": "csv_functions.parse_name(rowData['name'])"}, + {"record_type": "'ORGANIZATION' if rowData['IS_ORGANIZATION'] else 'PERSON'"}, + {"ref_gender": "rowData['gender'] if rowData['IS_ORGANIZATION'] else ''"}, + {"ref_dob": "rowData['dob'] if rowData['IS_ORGANIZATION'] else ''"}, + {"ref_ssn": "rowData['ssn'] if rowData['IS_ORGANIZATION'] else ''"}, + {"ref_dlnum": "rowData['dlnum'] if rowData['IS_ORGANIZATION'] else ''"}, + {"gender": "rowData['gender'] if not rowData['IS_ORGANIZATION'] else ''"}, + {"dob": "rowData['dob'] if not 
rowData['IS_ORGANIZATION'] else ''"}, + {"ssn": "rowData['ssn'] if not rowData['IS_ORGANIZATION'] else ''"}, + {"dlnum": "rowData['dlnum'] if not rowData['IS_ORGANIZATION'] else ''"} ], "outputs": [ { "filter": "not rowData['name']", - "data_source": "test", + "data_source": "TEST", "entity_type": "%(record_type)s", "record_id": "%(uniqueid)s", "attributes": [ @@ -72,11 +70,23 @@ }, { "attribute": "PRIMARY_NAME_ORG", - "mapping": "%(name_org)s" + "mapping": "%(PRIMARY_NAME_ORG)s" }, { "attribute": "PRIMARY_NAME_FULL", - "mapping": "%(name_full)s" + "mapping": "%(PRIMARY_NAME_FULL)s" + }, + { + "attribute": "SECONDARY_NAME_ORG", + "mapping": "%(SECONDARY_NAME_ORG)s" + }, + { + "attribute": "SECONDARY_NAME_FULL", + "mapping": "%(SECONDARY_NAME_FULL)s" + }, + { + "attribute": "REFERENCE_NAME", + "mapping": "%(REFERENCE_NAME)s" }, { "attribute": "RECORD_TYPE", diff --git a/output/test_set1.json b/output/test_set1.json index 646ac57..e264031 100644 --- a/output/test_set1.json +++ b/output/test_set1.json @@ -1,7 +1,14 @@ -{"DATA_SOURCE": "test", "ENTITY_TYPE": "ORGANIZATION", "RECORD_ID": "1001", "PRIMARY_NAME_ORG": "ABC Company", "RECORD_TYPE": "ORGANIZATION", "PROF_LICENSE": "1001011", "HOME_ADDR_LINE1": "111 First", "HOME_ADDR_CITY": "Las Vegas", "HOME_ADDR_STATE": "NV", "HOME_ADDR_POSTAL_CODE": "89111"} -{"DATA_SOURCE": "test", "ENTITY_TYPE": "PERSON", "RECORD_ID": "1002", "PRIMARY_NAME_FULL": "Bob Jones", "RECORD_TYPE": "PERSON", "GENDER": "M", "DATE_OF_BIRTH": "2/2/92", "SSN_NUMBER": "2102122", "PROF_LICENSE": "1002022", "HOME_ADDR_LINE1": "222 Second", "HOME_ADDR_CITY": "Las Vegas", "HOME_ADDR_STATE": "NV", "HOME_ADDR_POSTAL_CODE": "89112"} -{"DATA_SOURCE": "test", "ENTITY_TYPE": "ORGANIZATION", "RECORD_ID": "1003", "PRIMARY_NAME_ORG": "General Hospital", "RECORD_TYPE": "ORGANIZATION", "REF_DOB": "3/3/93", "PROF_LICENSE": "1003033", "HOME_ADDR_LINE1": "333 Third", "HOME_ADDR_CITY": "Las Vegas", "HOME_ADDR_STATE": "NV", "HOME_ADDR_POSTAL_CODE": "89113"} -{"DATA_SOURCE": "test", "ENTITY_TYPE": "PERSON", "RECORD_ID": "1004", "PRIMARY_NAME_FULL": "Mary Smith", "RECORD_TYPE": "PERSON", "GENDER": "F", "PROF_LICENSE": "1004044", "HOME_ADDR_LINE1": "444 Fourth", "HOME_ADDR_CITY": "Las Vegas", "HOME_ADDR_STATE": "NV", "HOME_ADDR_POSTAL_CODE": "89114"} -{"DATA_SOURCE": "test", "ENTITY_TYPE": "PERSON", "RECORD_ID": "1005", "PRIMARY_NAME_FULL": "Peter Anderson", "RECORD_TYPE": "PERSON", "PROF_LICENSE": "1005055", "HOME_ADDR_LINE1": "555 Fifth", "HOME_ADDR_CITY": "Las Vegas", "HOME_ADDR_STATE": "NV", "HOME_ADDR_POSTAL_CODE": "89115"} -{"DATA_SOURCE": "test", "ENTITY_TYPE": "ORGANIZATION", "RECORD_ID": "1006", "PRIMARY_NAME_ORG": "Cleveland Clinic", "RECORD_TYPE": "ORGANIZATION", "REF_SSN": "6060016", "PROF_LICENSE": "1006066", "TAX_ID_NUMBER": "6060016", "HOME_ADDR_LINE1": "666 Sixth", "HOME_ADDR_CITY": "Las Vegas", "HOME_ADDR_STATE": "NV", "HOME_ADDR_POSTAL_CODE": "89116"} -{"DATA_SOURCE": "test", "ENTITY_TYPE": "ORGANIZATION", "RECORD_ID": "1008", "PRIMARY_NAME_ORG": "City of Hope", "RECORD_TYPE": "ORGANIZATION", "REF_GENDER": "F", "HOME_ADDR_LINE1": "888 Eighth", "HOME_ADDR_CITY": "Las Vegas", "HOME_ADDR_STATE": "NV", "HOME_ADDR_POSTAL_CODE": "89118"} +{"DATA_SOURCE": "test", "ENTITY_TYPE": "ORGANIZATION", "RECORD_ID": "1001", "PRIMARY_NAME_ORG": "ABC COMPANY", "RECORD_TYPE": "ORGANIZATION", "PROF_LICENSE": "1001011", "HOME_ADDR_LINE1": "111 First", "HOME_ADDR_CITY": "Las Vegas", "HOME_ADDR_STATE": "NV", "HOME_ADDR_POSTAL_CODE": "89111"} +{"DATA_SOURCE": "test", "ENTITY_TYPE": "ORGANIZATION", 
"RECORD_ID": "1002", "PRIMARY_NAME_FULL": "BOB JONES", "RECORD_TYPE": "ORGANIZATION", "REF_GENDER": "M", "REF_DOB": "2/2/92", "REF_SSN": "2102122", "PROF_LICENSE": "1002022", "HOME_ADDR_LINE1": "222 Second", "HOME_ADDR_CITY": "Las Vegas", "HOME_ADDR_STATE": "NV", "HOME_ADDR_POSTAL_CODE": "89112"} +{"DATA_SOURCE": "test", "ENTITY_TYPE": "ORGANIZATION", "RECORD_ID": "1003", "PRIMARY_NAME_ORG": "GENERAL HOSPITAL", "RECORD_TYPE": "ORGANIZATION", "REF_DOB": "3/3/93", "PROF_LICENSE": "1003033", "HOME_ADDR_LINE1": "333 Third", "HOME_ADDR_CITY": "Las Vegas", "HOME_ADDR_STATE": "NV", "HOME_ADDR_POSTAL_CODE": "89113"} +{"DATA_SOURCE": "test", "ENTITY_TYPE": "ORGANIZATION", "RECORD_ID": "1004", "PRIMARY_NAME_FULL": "MARY SMITH", "RECORD_TYPE": "ORGANIZATION", "REF_GENDER": "F", "PROF_LICENSE": "1004044", "HOME_ADDR_LINE1": "444 Fourth", "HOME_ADDR_CITY": "Las Vegas", "HOME_ADDR_STATE": "NV", "HOME_ADDR_POSTAL_CODE": "89114"} +{"DATA_SOURCE": "test", "ENTITY_TYPE": "ORGANIZATION", "RECORD_ID": "1005", "PRIMARY_NAME_FULL": "PETER ANDERSON", "RECORD_TYPE": "ORGANIZATION", "PROF_LICENSE": "1005055", "HOME_ADDR_LINE1": "555 Fifth", "HOME_ADDR_CITY": "Las Vegas", "HOME_ADDR_STATE": "NV", "HOME_ADDR_POSTAL_CODE": "89115"} +{"DATA_SOURCE": "test", "ENTITY_TYPE": "ORGANIZATION", "RECORD_ID": "1006", "PRIMARY_NAME_ORG": "CLEVELAND CLINIC", "RECORD_TYPE": "ORGANIZATION", "REF_SSN": "6060016", "PROF_LICENSE": "1006066", "TAX_ID_NUMBER": "6060016", "HOME_ADDR_LINE1": "666 Sixth", "HOME_ADDR_CITY": "Las Vegas", "HOME_ADDR_STATE": "NV", "HOME_ADDR_POSTAL_CODE": "89116"} +{"DATA_SOURCE": "test", "ENTITY_TYPE": "ORGANIZATION", "RECORD_ID": "1008", "PRIMARY_NAME_ORG": "CITY OF HOPE", "RECORD_TYPE": "ORGANIZATION", "REF_GENDER": "F", "HOME_ADDR_LINE1": "888 Eighth", "HOME_ADDR_CITY": "Las Vegas", "HOME_ADDR_STATE": "NV", "HOME_ADDR_POSTAL_CODE": "89118"} +{"DATA_SOURCE": "test", "ENTITY_TYPE": "ORGANIZATION", "RECORD_ID": "1009", "PRIMARY_NAME_ORG": "NLR JONES & JONES", "RECORD_TYPE": "ORGANIZATION", "PROF_LICENSE": "1009099"} +{"DATA_SOURCE": "test", "ENTITY_TYPE": "ORGANIZATION", "RECORD_ID": "1010", "PRIMARY_NAME_ORG": "- JONES AND JONES", "RECORD_TYPE": "ORGANIZATION", "PROF_LICENSE": "1009099"} +{"DATA_SOURCE": "test", "ENTITY_TYPE": "ORGANIZATION", "RECORD_ID": "1011", "PRIMARY_NAME_ORG": "RIDESHARE INC", "REFERENCE_NAME": "JONES", "RECORD_TYPE": "ORGANIZATION", "PROF_LICENSE": "1011011"} +{"DATA_SOURCE": "test", "ENTITY_TYPE": "ORGANIZATION", "RECORD_ID": "1012", "PRIMARY_NAME_ORG": "RIDESHARE INC", "REFERENCE_NAME": "SMITH", "RECORD_TYPE": "ORGANIZATION", "PROF_LICENSE": "1011011"} +{"DATA_SOURCE": "test", "ENTITY_TYPE": "ORGANIZATION", "RECORD_ID": "1013", "PRIMARY_NAME_ORG": "ABC INDUSTRIES", "SECONDARY_NAME_ORG": "ABC SEMICONDUCTOR", "RECORD_TYPE": "ORGANIZATION", "PROF_LICENSE": "1013013"} +{"DATA_SOURCE": "test", "ENTITY_TYPE": "ORGANIZATION", "RECORD_ID": "1014", "PRIMARY_NAME_ORG": "ABC INDUSTRIES", "SECONDARY_NAME_ORG": "ABC CIRCUIT BOARDS", "RECORD_TYPE": "ORGANIZATION", "PROF_LICENSE": "1013013"} +{"DATA_SOURCE": "test", "ENTITY_TYPE": "ORGANIZATION", "RECORD_ID": "1015", "PRIMARY_NAME_FULL": "JOE SMITH", "SECONDARY_NAME_FULL": "JOEY JONES", "RECORD_TYPE": "ORGANIZATION", "PROF_LICENSE": "1015015"}