feat(extract): Added new extract.py script
- Adds a new script `extract.py` that encapsulates the data extraction process
- Includes functionality to:
  - Load environment variables
  - Connect to SQL Server and MongoDB data sources
  - Extract data from the sources
  - Save the extracted data to a specified destination folder
- Provides a command-line interface using the `typer` library (see the sketch below)
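
A minimal sketch of how `typer` derives that interface, assuming its default convention of mapping keyword arguments to CLI options (the `fetch` name and values here are illustrative, not part of the commit):

import typer

def fetch(
    path: str = None,       # becomes the --path option
    limit: int = 0,         # becomes --limit
    parallel: bool = True,  # booleans become flag pairs: --parallel/--no-parallel
):
    print(path, limit, parallel)

if __name__ == '__main__':
    typer.run(fetch)  # e.g. python fetch.py --limit 100 --no-parallel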
Ronaldo S.A. Batista committed Sep 12, 2024
1 parent d8cf854 commit fd4acc5
Showing 5 changed files with 129 additions and 21 deletions.
4 changes: 2 additions & 2 deletions extracao/stations.py
@@ -96,8 +96,8 @@ def extraction(self) -> L:
         return self.sources.attrgot('df')

     def update(self):
-        df = self.extraction()
-        self.df = self._format(df)
+        dfs = self.extraction()
+        self.df = self._format(dfs)

     @staticmethod
     def _simplify_sources(df):
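The `df` to `dfs` rename reflects that `extraction()` returns one DataFrame per source, collected by fastcore's `attrgot`. A minimal sketch of that behaviour, using stand-in strings instead of real DataFrames:

from fastcore.foundation import L

class Source:
    def __init__(self, df):
        self.df = df

sources = L(Source('anatel_df'), Source('aeronautica_df'))  # stand-ins for the real sources
dfs = sources.attrgot('df')  # gathers the .df attribute of every element
print(list(dfs))             # ['anatel_df', 'aeronautica_df']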
5 changes: 0 additions & 5 deletions release.bat

This file was deleted.

10 changes: 0 additions & 10 deletions release.sh

This file was deleted.

66 changes: 66 additions & 0 deletions scripts/extract.py
@@ -0,0 +1,66 @@
import json
import os
import shutil
import subprocess
import sys
import time
import warnings
from datetime import datetime

import pandas as pd
import typer
from dotenv import find_dotenv, load_dotenv
from fastcore.xtras import Path

from extracao.stations import Estacoes

load_dotenv(find_dotenv(), override=True)
warnings.simplefilter('ignore')

SQLSERVER_PARAMS = dict(
    driver=os.environ.get('SQL_DRIVER'),
    server=os.environ.get('SQL_SERVER'),
    database=os.environ.get('SQL_DATABASE'),
    trusted_conn=True,
    mult_results=True,
    encrypt=False,
    timeout=int(os.environ.get('SQL_TIMEOUT')),
)

# On non-Windows platforms, switch from trusted connections to username/password authentication
if sys.platform in ('linux', 'darwin', 'cygwin'):
    SQLSERVER_PARAMS.update(
        {
            'trusted_conn': False,
            'mult_results': False,
            'username': os.environ.get('USERNAME'),
            'password': os.environ.get('PASSWORD'),
        }
    )

MONGO_URI: str = os.environ.get('MONGO_URI')


def get_db(
    path: str = os.environ.get('DESTINATION'),  # Folder where the files are saved
    limit: int = 0,  # Maximum number of records to extract from each MongoDB source; 0 means no limit
    parallel: bool = True,  # If true, requests are issued to each data source in parallel
    read_cache: bool = False,  # If true, reads the already existing data; otherwise refreshes the data
    reprocess_sources: bool = False,
) -> 'pd.DataFrame':  # Returns the DataFrame with the Anatel and Aeronautics databases
    """Encapsulates instantiating and updating the data"""
    start = time.perf_counter()
    data = Estacoes(SQLSERVER_PARAMS, MONGO_URI, limit, parallel, read_cache, reprocess_sources)
    data.update()
    if path is not None:
        path = Path(path)  # a Path instance is always truthy, so no need to test it
        path.mkdir(parents=True, exist_ok=True)
        print(f'Saving data to {path}')
        # robocopy via PowerShell assumes a Windows host; the command string must not be
        # wrapped in extra quotes, or PowerShell evaluates it as a string literal
        subprocess.run(
            ['powershell', '-Command', f'robocopy {data.folder} {path} /E /IS /IT'],
            check=False,
        )
    print(f'Elapsed time: {time.perf_counter() - start} seconds')


if __name__ == '__main__':
    typer.run(get_db)
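
For reference, a hypothetical `.env` file covering the variables read by the script above; the names come from the code, every value is a placeholder:

SQL_DRIVER=ODBC Driver 17 for SQL Server
SQL_SERVER=sqlserver.example.internal
SQL_DATABASE=example_db
SQL_TIMEOUT=60
USERNAME=svc_extract
PASSWORD=changeme
MONGO_URI=mongodb://localhost:27017
DESTINATION=C:\temp\rfdatahub

(USERNAME and PASSWORD are only read on linux, darwin, or cygwin.)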
65 changes: 61 additions & 4 deletions scripts/main.py
@@ -4,6 +4,7 @@
 import warnings
 from datetime import datetime
 import sys
+import subprocess

 import pandas as pd
 import typer
@@ -52,17 +53,73 @@ def get_db(
     data = Estacoes(SQLSERVER_PARAMS, MONGO_URI, limit, parallel, read_cache, reprocess_sources)
     data.update()
     data.save()
-    mod_times = {'ANATEL': datetime.now().strftime('%d/%m/%Y %H:%M:%S')}
-    mod_times['AERONAUTICA'] = datetime.now().strftime('%d/%m/%Y %H:%M:%S')
+    mod_time = datetime.now().strftime('%d/%m/%Y %H:%M:%S')
+    mod_times = {'ANATEL': mod_time, 'AERONAUTICA': mod_time, 'ReleaseDate': mod_time}
     versiondb = json.loads((data.folder / 'Release.json').read_text())
-    mod_times['ReleaseDate'] = datetime.now().strftime('%d/%m/%Y %H:%M:%S')
+    # Bump the patch component of the version, e.g. 1.2.3 -> 1.2.4
+    version = versiondb['rfdatahub']['Version']
+    version_parts = version.split('.')
+    version_parts[-1] = str(int(version_parts[-1]) + 1)
+    new_version = '.'.join(version_parts)
+    versiondb['rfdatahub']['Version'] = new_version
     versiondb['rfdatahub'].update(mod_times)
     json.dump(versiondb, (data.folder / 'Release.json').open('w'))
     if path is not None:
         if (path := Path(path)).exists():
             # path.mkdir(parents=True, exist_ok=True)
             print(f'Saving data to {path}')
-            shutil.copytree(str(data.folder), str(path), dirs_exist_ok=True)
+            # The command string must not carry extra quotes, or PowerShell treats it as a string literal
+            subprocess.run(
+                ['powershell', '-Command', f'robocopy {data.folder} {path} /E /IS /IT'],
+                check=False,
+            )
+
+    # Create a release with the version tag and generate release notes
+    subprocess.run(
+        [
+            'gh',
+            'release',
+            'create',
+            new_version,
+            '--generate-notes',
+            '.\\extracao\\datasources\\arquivos\\saida\\estacoes.parquet.gzip',
+            '.\\extracao\\datasources\\arquivos\\saida\\log.parquet.gzip',
+        ],
+        check=False,
+    )
+
+    # Delete the 'rfdatahub' release and clean up the tag
+    subprocess.run(
+        [
+            'gh',
+            'release',
+            'delete',
+            'rfdatahub',
+            '--cleanup-tag',
+            '-R',
+            'InovaFiscaliza/.github',
+            '-y',
+        ],
+        check=False,
+    )
+
+    # Create a new 'rfdatahub' release with a specific title, notes, and files
+    subprocess.run(
+        [
+            'gh',
+            'release',
+            'create',
+            'rfdatahub',
+            '-t',
+            'RFDataHub',
+            '--notes',
+            new_version,
+            '.\\extracao\\datasources\\arquivos\\saida\\estacoes.parquet.gzip',
+            '.\\extracao\\datasources\\arquivos\\saida\\log.parquet.gzip',
+            '.\\extracao\\datasources\\arquivos\\saida\\Release.json',
+            '-R',
+            'InovaFiscaliza/.github',
+        ],
+        check=False,
+    )

     print(f'Elapsed time: {time.perf_counter() - start} seconds')

