-
Notifications
You must be signed in to change notification settings - Fork 0
/
pipeline_01.py
118 lines (93 loc) · 3.64 KB
/
pipeline_01.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import os
import gdown
import duckdb
import pandas as pd
from sqlalchemy import create_engine
from dotenv import load_dotenv
from duckdb import DuckDBPyRelation
from pandas import DataFrame
from datetime import datetime
load_dotenv()
def conectar_banco():
"""Conecta ao banco de dados DuckDB; cria o banco se não existir."""
return duckdb.connect(database='duckdb.db', read_only=False)
def inicializar_tabela(con):
"""Cria a tabela se ela não existir."""
con.execute(
"""
CREATE TABLE IF NOT EXISTS historico_arquivos (
nome_arquivo VARCHAR,
horario_processamento TIMESTAMP
)
"""
)
def registrar_arquivo(con, nome_arquivo):
"""Registra um novo arquivo no banco de dados com o horário atual."""
con.execute(
"""
INSERT INTO historico_arquivos (nome_arquivo, horario_processamento)
VALUES (?, ?)
""",
(nome_arquivo, datetime.now()),
)
def arquivos_processados(con):
"""Retorna um set com os nomes de todos os arquivos já processados."""
return set(
row[0]
for row in con.execute(
'SELECT nome_arquivo FROM historico_arquivos'
).fetchall()
)
def baixar_os_arquivos_do_google_drive(url_pasta, diretorio_local):
os.makedirs(diretorio_local, exist_ok=True)
gdown.download_folder(
url_pasta, output=diretorio_local, quiet=False, use_cookies=False
)
# Função para listar arquivos CSV no diretório especificado
def listar_arquivos_csv(diretorio):
arquivos_csv = []
todos_os_arquivos = os.listdir(diretorio)
for arquivo in todos_os_arquivos:
if arquivo.endswith('.csv'):
caminho_completo = os.path.join(diretorio, arquivo)
arquivos_csv.append(caminho_completo)
return arquivos_csv
# Função para ler um arquivo CSV e retornar um DataFrame duckdb
def ler_csv(caminho_do_arquivo):
dataframe_duckdb = duckdb.read_csv(caminho_do_arquivo)
return dataframe_duckdb
# Função para adicionar uma coluna de total de vendas
def transformar(df: DuckDBPyRelation) -> DataFrame:
# Executa a consulta SQL que inclui a nova coluna, operando sobre a tabela virtual
df_transformado = duckdb.sql(
'SELECT *, quantidade * valor AS total_vendas FROM df'
).df()
# Remove o registro da tabela virtual para limpeza
return df_transformado
# Função para converter o Duckdb em Pandas e salvar o DataFrame no PostgreSQL
def salvar_no_postgres(df_duckdb, tabela):
DATABASE_URL = os.getenv(
'DATABASE_URL'
) # Ex: 'postgresql://user:password@localhost:5432/database_name'
engine = create_engine(DATABASE_URL)
# Salvar o DataFrame no PostgreSQL
df_duckdb.to_sql(tabela, con=engine, if_exists='append', index=False)
# Transformacao
if __name__ == '__main__':
url_pasta = 'https://drive.google.com/drive/folders/19flL9P8UV9aSu4iQtM6Ymv-77VtFcECP'
diretorio_local = './pasta_gdown'
# baixar_os_arquivos_do_google_drive(url_pasta, diretorio_local)
lista_de_arquivos = listar_arquivos_csv(diretorio_local)
con = conectar_banco()
inicializar_tabela(con)
processados = arquivos_processados(con)
for caminho_do_arquivo in lista_de_arquivos:
nome_arquivo = os.path.basename(caminho_do_arquivo)
if nome_arquivo not in processados:
df = ler_csv(caminho_do_arquivo)
df_transformado = transformar(df)
salvar_no_postgres(df_transformado, 'vendas_calculado')
registrar_arquivo(con, nome_arquivo)
print(f'Arquivo {nome_arquivo} processado e salvo.')
else:
print(f'Arquivo {nome_arquivo} já foi processado anteriormente.')