-
Notifications
You must be signed in to change notification settings - Fork 0
/
0_DefaultTablesCreation.py
208 lines (167 loc) · 4.97 KB
/
0_DefaultTablesCreation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# Databricks notebook source
# MAGIC %md
# MAGIC ### TABLE CREATION
# COMMAND ----------
# MAGIC %md
# MAGIC ### INCULDING NOTEBOOKS TO THIS NOTEBOOK
# COMMAND ----------
# MAGIC %md
# MAGIC Including Notebook Named **TableAndDataMappingConfigs**
# COMMAND ----------
# MAGIC %run "./customConfig/TableAndDataMappingConfigs"
# COMMAND ----------
# MAGIC %md
# MAGIC ### MAJOR VARIABLES
# COMMAND ----------
# MAGIC %md
# MAGIC Data Base Variables
# COMMAND ----------
silverDataBase = DetlaLakeDatabases["SilverDatabase"]
bronzeDataBase = DetlaLakeDatabases["BronzeDatabase"]
# COMMAND ----------
# MAGIC %md
# MAGIC Table Variables
# COMMAND ----------
silverItemTable = TableName["silverItemTable"]
silverCommonTable = TableName["silverCommonTable"]
bronzeTable = TableName["bronzeTable"]
# COMMAND ----------
# MAGIC %md
# MAGIC Utility Table Variables
# COMMAND ----------
ingetionStatus = utilityTable["dataIngetionStatus"]
legacyIngetionStatus = utilityTable["dataIngetionLegacyStatus"]
# COMMAND ----------
# MAGIC %md
# MAGIC ### CREATING BRONZE DATABASE
# COMMAND ----------
databaseQuery = f"CREATE DATABASE IF NOT EXISTS {bronzeDataBase}"
useBronzeDBQuery = f"use {bronzeDataBase}"
spark.sql(databaseQuery)
# COMMAND ----------
# MAGIC %md
# MAGIC ### CREATING SILVER DATABASE
# COMMAND ----------
databaseQuery = f"CREATE DATABASE IF NOT EXISTS {silverDataBase}"
useSilverDBQuery = f"use {silverDataBase}"
spark.sql(databaseQuery)
# COMMAND ----------
# MAGIC %md
# MAGIC ### CREATE BRONZE TABLE
# COMMAND ----------
createBronzeTableQuery = '''CREATE TABLE IF NOT EXISTS {} (
DT_CARGA string,
DT_ALTERACAO TIMESTAMP,
ID_TRANSACAO bigint,
DT_TRANSACAO TIMESTAMP,
ID_STATUS_TRANSACAO bigint,
STATUS string,
CD_UNIMED_ORIGEM bigint,
UNIMED_ORIGEM string,
CD_UNIMED_DESTINO bigint,
UNIMED_DESTINO string,
CD_UNIMED_BENEFICIARIO bigint,
ID_BENEFICIARIO string,
COD_BENEFICIARIO string,
FG_RECEM_NATO string,
TIPO_PACIENTE bigint,
NOME_CONTRATADO_EXECUTANTE string,
CNES_CONTRATADO_EXECUTANTE string,
CNPJ_CONTRATADO_EXECUTANTE bigint,
TIPO_PRESTADOR string,
FG_RECURSO_PROPRIO string,
NOME_PROFISSIONAL_EXECUTANTE string,
SG_CONSELHO_PROFISSIONAL_EXECUTANTE string,
NR_CONSELHO_PROFISSIONAL_EXECUTANTE string,
SG_UF_PROFISSIONAL_EXECUTANTE bigint,
CD_CBO_PROFISSIONAL_EXECUTANTE bigint,
TIPO_CONSULTA bigint,
TIPO_ACIDENTE string,
NOME_PROFISSIONAL_SOLICITANTE string,
SG_CONSELHO_PROFISSIONAL_SOLICITANTE string,
NR_CONSELHO_PROFISSIONAL_SOLICITANTE string,
SG_UF_PROFISSIONAL_SOLICITANTE bigint,
CD_CBO_PROFISSIONAL_SOLICITANTE bigint,
CD_TECNICA_UTILIZADA string,
TIPO_PARTICIPACAO string,
TIPO_ATENDIMENTO string,
CD_CARATER_ATENDIMENTO bigint,
TIPO_ENCERRAMENTO string,
DT_EXECUCAO TIMESTAMP,
DT_ATENDIMENTO TIMESTAMP,
NR_SEQ_ITEM bigint,
CD_ITEM_UNICO string,
TIPO_TABELA bigint,
CD_SERVICO string,
TP_GUIA string,
TIPO_INTERNACAO bigint,
TIPO_REGIME_INTERNACAO string,
CD_CID string,
NR_GUIATISSPRESTADOR string,
CHAVE_NOTA string);'''.format(bronzeTable)
createIngetionStatus = '''CREATE TABLE IF NOT EXISTS {} (
queryStartDate timestamp,
queryEndDate timestamp,
data_count bigint,
exception string);'''.format(ingetionStatus)
createLegacyIngestionStatus = '''CREATE TABLE IF NOT EXISTS {} (queryStartDate timestamp,
queryEndDate timestamp,
data_count bigint,
exception string);'''.format(legacyIngetionStatus)
spark.sql(useBronzeDBQuery)
spark.sql(createBronzeTableQuery)
spark.sql(createIngetionStatus)
spark.sql(createLegacyIngestionStatus)
# COMMAND ----------
# MAGIC %md
# MAGIC ### CREATE SILVER TABLE
# COMMAND ----------
silverCommonCreateTableQuery = '''CREATE TABLE IF NOT EXISTS {} (
CHAVE_NOTA string,
processStatus string,
dateCreated string,
lastUpdated string,
DT_TRANSACAO string,
STATUS string,
CD_UNIMED_ORIGEM string,
COD_BENEFICIARIO string,
NOME_PROFISSIONAL_EXECUTANTE string,
SG_CONSELHO_PROFISSIONAL_EXECUTANTE string,
NR_CONSELHO_PROFISSIONAL_EXECUTANTE string,
SG_UF_PROFISSIONAL_EXECUTANTE string,
CD_CBO_PROFISSIONAL_EXECUTANTE string,
TP_GUIA string,
CD_UNIMED_DESTINO string,
FG_RECEM_NATO string,
TIPO_PACIENTE string,
TIPO_PRESTADOR string,
FG_RECURSO_PROPRIO string,
TIPO_CONSULTA string,
TIPO_ACIDENTE string,
NOME_PROFISSIONAL_SOLICITANTE string,
SG_CONSELHO_PROFISSIONAL_SOLICITANTE string,
NR_CONSELHO_PROFISSIONAL_SOLICITANTE string,
SG_UF_PROFISSIONAL_SOLICITANTE string,
CD_CBO_PROFISSIONAL_SOLICITANTE string,
CD_TECNICA_UTILIZADA string,
TIPO_PARTICIPACAO string,
CD_CARATER_ATENDIMENTO string,
TIPO_ENCERRAMENTO string,
TIPO_TABELA string,
TIPO_INTERNACAO string,
TIPO_REGIME_INTERNACAO string,
CD_CID string,
NR_GUIATISSPRESTADOR string);'''.format(silverCommonTable)
silverItemCreateTableQuery = '''CREATE TABLE IF NOT EXISTS {} (
CHAVE_NOTA string,
processStatus string,
dateCreated string,
lastUpdated string,
NOME_CONTRATADO_EXECUTANTE string,
CD_SERVICO string,
CNPJ_CONTRATADO_EXECUTANTE string,
DT_ATENDIMENTO string,
TIPO_ATENDIMENTO string)'''.format(silverItemTable)
spark.sql(useSilverDBQuery)
spark.sql(silverItemCreateTableQuery)
spark.sql(silverCommonCreateTableQuery)