#!/usr/bin/env python3
from cmdlineparse import CmdlineParseTieout
from sfconfig import SfConfig
from sfconn import SfConn
from sflogger import SfLogger
#from snowflake.connector.errors import ProgrammingError
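
# Tie-out validation: for each validation defined in the YAML config,
# compare a FROM table against a TO table in Snowflake -- row counts,
# key overlap, and column-by-column differences -- and record the
# results in a set of TIEOUT_* output tables.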
sf_cfg = SfConfig('config.json')
cmdline = CmdlineParseTieout()
logger = SfLogger(cmdline.args.log_level, __file__)
sf_conn = SfConn(sf_cfg.config, logger)
yaml_cfg = SfConfig(cmdline.args.yaml, 'yaml')
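# Illustrative sketch of the expected YAML layout. The target, table,
# and column names below are hypothetical; only the keys are taken
# from how this script reads the config:
#
# my_target:
#   OUTPUT_DB: MY_DB
#   OUTPUT_SC: MY_SCHEMA
#   OUTPUT_PREFIX: MY_RUN
#   VALIDATIONS:
#     - NAME: orders
#       KEY: [ORDER_ID]
#       FROM_TBL: OLD_DB.PUBLIC.ORDERS
#       TO_TBL: NEW_DB.PUBLIC.ORDERS
#       IGNORE_COLS: [LOAD_TS]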
target = cmdline.args.target
# assume that target is a dict in yaml_config
if target not in yaml_cfg.config:
    logger.error(f"{target} not a target in yaml file: {cmdline.args.yaml}")
    targets = '"' + '", "'.join(yaml_cfg.config.keys()) + '"'
    logger.error(f"Targets available: {targets}")
    exit(1)
config = yaml_cfg.config[target]
CASE_INSENSITIVE = cmdline.args.case_insensitive
treat_null_as_blank = cmdline.args.treat_null_as_blank
output_db = config['OUTPUT_DB']
output_sc = config['OUTPUT_SC']
output_prefix = config['OUTPUT_PREFIX']
output_base = f"{output_db}.{output_sc}.TIEOUT_{output_prefix}"
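# tieout_create_tables is assumed to (re)create the four output tables
# written below: <output_base>_1_OVERVIEW, _2_COLUMNS_SUMMARY,
# _3_COLUMNS_DETAIL and _4_COLUMNS_SKIPPED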
sf_conn.tieout_create_tables(output_base)
validations = config['VALIDATIONS']
for validation in validations:
    name = validation['NAME']
    key = validation['KEY']
    key_iter = "'" + "','".join(key) + "'"
    key_iter_unquoted = ", ".join(key)
    _from_tbl = validation['FROM_TBL']
    _to_tbl = validation['TO_TBL']
    ignore_cols = []
    if 'IGNORE_COLS' in validation:
        ignore_cols = validation['IGNORE_COLS']
    # key columns are matched via the join, so always exclude them from
    # the column-by-column comparison
    ignore_cols.extend(key)
    from_db, from_sc, from_obj = cmdline.sf_val.split_db_sc_obj(_from_tbl)
    to_db, to_sc, to_obj = cmdline.sf_val.split_db_sc_obj(_to_tbl)
    logger.info(f"{target}:{name}: Validating using key {key_iter_unquoted} between {_from_tbl} and {_to_tbl}")
    # logic to detect duplicates should run here
    if cmdline.args.detect_duplicate_key:
        logger.info(f"Detecting duplicate key(s) in from {_from_tbl}")
        qry = f"""
            with cnt as (
                select count(1) as cnt, {key_iter_unquoted}
                from {_from_tbl}
                group by {key_iter_unquoted}
                having cnt > 1
            )
            select count(1) from cnt
        """
        # if the first column of the first row is 0 then there are no duplicates
        dup_curs = sf_conn.run_query(qry)
        dup_row = dup_curs.fetchone()
        dup_keys, = dup_row
        if dup_keys > 0:
            logger.warning(f"Multiple keys of {key_iter_unquoted} ({dup_keys}) have more than one row in {_from_tbl}")
        logger.info(f"Detecting duplicate key(s) in to {_to_tbl}")
        qry = f"""
            with cnt as (
                select count(1) as cnt, {key_iter_unquoted}
                from {_to_tbl}
                group by {key_iter_unquoted}
                having cnt > 1
            )
            select count(1) from cnt
        """
        # if the first column of the first row is 0 then there are no duplicates
        dup_curs = sf_conn.run_query(qry)
        dup_row = dup_curs.fetchone()
        dup_keys, = dup_row
        if dup_keys > 0:
            logger.warning(f"Multiple keys ({dup_keys}) have more than one row in {_to_tbl}")
            # how do we fix this
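    # note: Snowflake's count(k1, k2, ...) counts rows where none of the
    # listed key columns is NULL, so the "total row count" logged below
    # excludes rows with a NULL key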
    f_curs = sf_conn.run_query(f"select count({key_iter_unquoted}) from {from_db}.{from_sc}.{from_obj}")
    from_row = f_curs.fetchone()
    from_rowcnt, = from_row
    t_curs = sf_conn.run_query(f"select count({key_iter_unquoted}) from {to_db}.{to_sc}.{to_obj}")
    to_row = t_curs.fetchone()
    to_rowcnt, = to_row
    only_keys = f"""
        with from_keys as (
            select {key_iter_unquoted} from {_from_tbl}
            minus
            select {key_iter_unquoted} from {_to_tbl}
        ), to_keys as (
            select {key_iter_unquoted} from {_to_tbl}
            minus
            select {key_iter_unquoted} from {_from_tbl}
        ), both_keys as (
            select {key_iter_unquoted} from {_to_tbl}
            intersect
            select {key_iter_unquoted} from {_from_tbl}
        )
        select 'FROM' as type, count({key_iter_unquoted}) from from_keys
        union
        select 'TO' as type, count({key_iter_unquoted}) from to_keys
        union
        select 'BOTH' as type, count({key_iter_unquoted}) from both_keys
    """
    from_only = 0
    to_only = 0
    both = 0
    only_curs = sf_conn.run_query(only_keys)
    for row in only_curs:
        key_type, count = row
        if key_type == 'FROM':
            from_only = count
        if key_type == 'TO':
            to_only = count
        if key_type == 'BOTH':
            both = count
    logger.info(f"{target}:{name}: {_from_tbl} total row count: {from_rowcnt}; unique {key_iter_unquoted}: {from_only}")
    logger.info(f"{target}:{name}: {_to_tbl} total row count: {to_rowcnt}; unique {key_iter_unquoted}: {to_only}")
    curs = sf_conn.run_query(f"""
        insert into {output_base}_1_OVERVIEW
        (name, key, overlap_rowcount, from_tbl, from_rowcount, from_unique, to_tbl, to_rowcount, to_unique)
        select '{name}', array_construct({key_iter}), {both}, '{_from_tbl}', {from_rowcnt}, {from_only}, '{_to_tbl}', {to_rowcnt}, {to_only}
    """)
    #logger.debug("Need to store the keys that are unique")
    logger.info(f"{target}:{name}: Comparing column by column where {key_iter_unquoted} overlaps both tables")
    logger.debug(f"{target}:{name}: Determining columns to compare")
    igncols = ''
    if len(ignore_cols) > 0:
        igncols = "and column_name not in ('" + "','".join(ignore_cols) + "')"
        logger.debug(f"{target}:{name}: Ignoring columns: {ignore_cols}")
    from_cols = f"select column_name from {from_db}.information_schema.columns where table_catalog = '{from_db}' and table_schema = '{from_sc}' and table_name = '{from_obj}' {igncols}"
    to_cols = f"select column_name from {to_db}.information_schema.columns where table_catalog = '{to_db}' and table_schema = '{to_sc}' and table_name = '{to_obj}' {igncols}"
    tbl_columns = f"""
        with from_only as (
            {from_cols} minus {to_cols}
        ), to_only as (
            {to_cols} minus {from_cols}
        ), both as (
            {from_cols} intersect {to_cols}
        )
        select 'FROM' as source, column_name from from_only
        union
        select 'TO' as source, column_name from to_only
        union
        select 'BOTH' as source, column_name from both
        order by column_name asc
    """
    # This can be removed when Snowflake support confirms fix
    # shared_keys = f"""
    # shared_keys as (
    #     select {key_iter_unquoted} from {_from_tbl}
    #     intersect
    #     select {key_iter_unquoted} from {_to_tbl}
    # )
    # """
    shared_keys_list_unquoted = ", ".join(map(lambda x: f"f.{x}", key))
    shared_keys_list_join = " and ".join(map(lambda x: f"f.{x} = t.{x}", key))
    shared_keys = f"""
        shared_keys as (
            select {shared_keys_list_unquoted} from {_from_tbl} f
            inner join {_to_tbl} t on {shared_keys_list_join}
        )
    """
    col_curs = sf_conn.run_query(tbl_columns)
    for row in col_curs:
        source, col_nm = row
        if source == 'BOTH':
            # this needs to be expanded ..
            keys_list = " and ".join(map(lambda x: f"t.{x} = f.{x} and s.{x} = t.{x} and s.{x} = f.{x}", key))
            skey_iter_unquoted = ", ".join(map(lambda x: f"s.{x}", key))
            compare_condition = f"t.{col_nm}, f.{col_nm}"
            if CASE_INSENSITIVE and treat_null_as_blank:
                compare_condition = f"upper(ifnull(t.{col_nm}, '')), upper(ifnull(f.{col_nm}, ''))"
            elif CASE_INSENSITIVE:
                compare_condition = f"upper(t.{col_nm}), upper(f.{col_nm})"
            elif treat_null_as_blank:
                compare_condition = f"ifnull(t.{col_nm}, ''), ifnull(f.{col_nm}, '')"
            comp_sql = f"""
                with {shared_keys}
                select count({skey_iter_unquoted}) as diff from shared_keys s, {_from_tbl} f, {_to_tbl} t
                where {keys_list}
                and NOT(EQUAL_NULL({compare_condition}))
            """
            # Skip this one if it fails and move to the next key?
            comp_curs = sf_conn.run_query(comp_sql)
            comp_row = comp_curs.fetchone()
            diff, = comp_row
            logger.info(f"{target}:{name}: Column: {col_nm} has {diff} differences")
            comp_curs = sf_conn.run_query(f"insert into {output_base}_2_COLUMNS_SUMMARY (name, col_nm, col_diff_cnt) values ('{name}', '{col_nm}', {diff})")
            if diff > 0:
                keys_hash = ", ".join(map(lambda x: f"'{x}', s.{x}", key))
                comp_sql = f"""
                    insert into {output_base}_3_COLUMNS_DETAIL
                    (name, col_nm, key_vals, data_vals)
                    with {shared_keys}
                    select '{name}' as name,
                           '{col_nm}' as col_nm,
                           array_construct({key_iter}) as key_vals, -- the below key is not correct
                           object_construct_keep_null({keys_hash}, '__to_val', t.{col_nm}, '__from_val', f.{col_nm}) as data_vals
                    from shared_keys s, {_from_tbl} f, {_to_tbl} t
                    where {keys_list}
                    and NOT(EQUAL_NULL({compare_condition}))
                """
                comp_curs = sf_conn.run_query(comp_sql)
                logger.info(f"{target}:{name}: Column: {col_nm} diff details stored")
        elif source == 'FROM':
            logger.debug(f"{target}:{name}: Skipping {col_nm} as it only exists in from {_from_tbl}")
            comp_curs = sf_conn.run_query(f"insert into {output_base}_4_COLUMNS_SKIPPED (name, tbl_nm, col_nm) values ('{name}', '{_from_tbl}', '{col_nm}')")
        elif source == 'TO':
            logger.debug(f"{target}:{name}: Skipping {col_nm} as it only exists in to {_to_tbl}")
            comp_curs = sf_conn.run_query(f"insert into {output_base}_4_COLUMNS_SKIPPED (name, tbl_nm, col_nm) values ('{name}', '{_to_tbl}', '{col_nm}')")
sf_conn.close_conn()
exit(0)