refactor: PEP 8 convention adjustments
IvanildoBarauna committed May 24, 2024
1 parent 9518787 commit 56b8a79
Showing 9 changed files with 63 additions and 64 deletions.
41 changes: 22 additions & 19 deletions etl/common/utils/logs.py
@@ -1,9 +1,10 @@
import logging
import os

LOG_FORMAT='%(asctime)s :: %(levelname)s :: %(message)s'
LOG_FORMAT = "%(asctime)s :: %(levelname)s :: %(message)s"

def consoleLogger(module):

def ConsoleLogger(module):
"""
Creates a console logger for logging messages to the console and a log file.
@@ -15,29 +16,30 @@ def consoleLogger(module):
"""
dir_name = f"etl/common/logs/"
os.makedirs(dir_name, exist_ok=True)

with open(dir_name + f"{module}.log", "w") as f:
f.write("")

logging.basicConfig(
filename=dir_name + f"{module}.log",
level=logging.DEBUG,
format=LOG_FORMAT,
datefmt='%Y-%m-%d %H:%M:%S'
datefmt="%Y-%m-%d %H:%M:%S",
)

consoleLog = logging.getLogger("consoleLogger")
consoleLog.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
formatter = logging.Formatter(LOG_FORMAT)
ch.setFormatter(formatter)

consoleLog.addHandler(ch)

return consoleLog

def loggingInfo(msg, module):


def logging_info(msg, module):
"""
Logs an informational message.
@@ -51,12 +53,12 @@ def loggingInfo(msg, module):
if logging.getLogger("consoleLogger").hasHandlers():
logger = logging.getLogger("consoleLogger")
else:
logger = consoleLogger(module=module)
logger = ConsoleLogger(module=module)

logger.info(msg=msg)


def loggingError(msg, module):
def logging_error(msg, module):
"""
Logs an error message.
@@ -70,11 +72,12 @@ def loggingError(msg, module):
if logging.getLogger("consoleLogger").hasHandlers():
logger = logging.getLogger("consoleLogger")
else:
logger = consoleLogger(module=module)
logger = ConsoleLogger(module=module)

logger.error(msg=msg)

def loggingWarn(msg,module):


def logging_warn(msg, module):
"""
Logs a warning message.
@@ -88,6 +91,6 @@ def loggingWarn(msg,module):
if logging.getLogger("consoleLogger").hasHandlers():
logger = logging.getLogger("consoleLogger")
else:
logger = consoleLogger(module=module)
logger = ConsoleLogger(module=module)

logger.warning(msg=msg)
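Taken together, the hunks above rename the module's camelCase helpers to snake_case (`logging_info`, `logging_error`, `logging_warn`) and the factory to `ConsoleLogger`. A minimal usage sketch of the renamed helpers, assuming callers pass a short directory-derived label as `module` (the label below is illustrative):

```python
from etl.common.utils.logs import logging_info, logging_warn, logging_error

MODULE = "extract"  # illustrative label; real callers derive it via log_file_name()

logging_info("Starting extraction", MODULE)   # INFO to console and etl/common/logs/extract.log
logging_warn("Param: USD-XYZ is not valid for call", MODULE)
logging_error("DataFrame is empty", MODULE)
```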
5 changes: 2 additions & 3 deletions etl/config/logFile.py
@@ -1,7 +1,6 @@
import os


def logFileName(file: str) -> str:
def log_file_name(file: str) -> str:
current_dir = os.path.dirname(os.path.relpath(file))
WORK_DIR = current_dir.split("/")[-1:][0]
return WORK_DIR
return current_dir.split("/")[-1:][0]
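The helper now returns the expression directly. As a quick, hedged illustration (assuming the code runs from the repository root): `log_file_name` yields the last directory segment of the file's relative path, which the other modules use as their log label.

```python
from etl.config.logFile import log_file_name

# For a module living under etl/models/extract/, the label is the directory name.
print(log_file_name("etl/models/extract/params_validator.py"))  # expected: "extract"
```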
11 changes: 5 additions & 6 deletions etl/models/extract/params_validator.py
@@ -1,17 +1,16 @@
import requests
from etl.common.utils.logs import loggingWarn
from etl.common.utils.logs import logging_warn
from etl.config.datasource import API
from etl.config.logFile import logFileName
from etl.config.logFile import log_file_name

WORK_DIR = logFileName(file=__file__)
WORK_DIR = log_file_name(file=__file__)


class ParamsValidator:
def __init__(self, params: list) -> None:
self.params = params
# self.validParams = self.__ValidParamsForCall__()

def ValidParamsForCall(self) -> list:
def valid_params_for_call(self) -> list:
"""
Returns a list of valid parameters for the pipeline execution.
@@ -26,7 +25,7 @@ def ValidParamsForCall(self) -> list:
if param in AvaliableList:
valParams.append(param)
else:
loggingWarn(f"Param: {param} is not valid for call", WORK_DIR)
logging_warn(f"Param: {param} is not valid for call", WORK_DIR)

if valParams:
return valParams
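The remainder of this hunk is collapsed in this view. As a hedged sketch of how the renamed method is meant to be called (the `KeyError` on an all-invalid input is inferred from the tests below, not from this hunk):

```python
from etl.models.extract.params_validator import ParamsValidator

validator = ParamsValidator(["USD-BRL", "NOT-A-PAIR"])
try:
    valid = validator.valid_params_for_call()  # keeps only parities the API actually offers
    print(valid)  # e.g. ["USD-BRL"]; the invalid pair is only reported via logging_warn
except KeyError:
    print("no valid parameters for this call")
```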
11 changes: 6 additions & 5 deletions etl/models/extract/test_api_data_extractor.py
@@ -3,12 +3,13 @@

def test_extraction_init():
params = ["USD-BRL", "USD-BRLT", "CAD-BRL"]
ext = extraction(params)
assert ext.ValidParams == params
extractor = extraction(params)
response, valid_params = extractor.run()
assert valid_params == params


def test_extraction_run_success():
params = ["USD-BRL", "USD-BRLT", "CAD-BRL"]
ext = extraction(params)
json_data = ext.__run__(ext.ValidParams)
assert isinstance(json_data, dict)
extractor = extraction(params)
response, valid_params = extractor.run()
assert isinstance(response, dict)
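Both tests now drive the extractor solely through `run()`, which is assumed to return a `(response, valid_params)` pair. A compact sketch of that assumed contract (the import path is inferred from the test file's location, not shown in this diff):

```python
from etl.models.extract.api_data_extractor import extraction  # assumed import path

extractor = extraction(["USD-BRL", "CAD-BRL"])
response, valid_params = extractor.run()  # response: dict keyed by parity; valid_params: params actually used
```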
7 changes: 3 additions & 4 deletions etl/models/extract/test_params_validator.py
@@ -1,4 +1,3 @@
from matplotlib.rcsetup import validate_ps_distiller
import pytest
from etl.models.extract.params_validator import ParamsValidator

@@ -20,19 +19,19 @@ def mixed_params():

def test_valid_params(valid_params):
validator = ParamsValidator(valid_params)
valid = validator.ValidParamsForCall()
valid = validator.valid_params_for_call()
assert validator.params == valid_params
assert valid_params == valid


def test_invalid_params(invalid_params):
validator = ParamsValidator(invalid_params)
with pytest.raises(KeyError):
validator.ValidParamsForCall()
validator.valid_params_for_call()


def test_mixed_params(mixed_params, valid_params):
validator = ParamsValidator(mixed_params)
validated = validator.ValidParamsForCall()
validated = validator.valid_params_for_call()
assert validator.params == mixed_params
assert validated == valid_params
11 changes: 5 additions & 6 deletions etl/models/load/parquet_loader.py
@@ -1,16 +1,15 @@
from math import e
from tqdm import tqdm
import pandas as pd

from etl.config.logFile import logFileName
from etl.common.utils.logs import loggingError, loggingInfo
from etl.config.logFile import log_file_name
from etl.common.utils.logs import logging_error, logging_info
from etl.common.utils.common import (
DefaultTimestampStr,
DefaultOutputFolder,
DefaultUTCDatetime,
)

WORK_DIR = logFileName(file=__file__)
dir = log_file_name(file=__file__)


class load:
@@ -24,7 +23,7 @@ def run(self):
df = pd.DataFrame([self.dic])

if df.empty:
loggingError("DataFrame is empty", WORK_DIR)
logging_error("DataFrame is empty", dir)
raise ValueError("DataFrame is empty")

# Add new columns to the DataFrame
@@ -37,7 +36,7 @@ def run(self):
try:
df.to_parquet(f"{DefaultOutputFolder()}{param}-{ts}.parquet")
except Exception as e:
loggingError(f"Error writing parquet file: {e}", WORK_DIR)
logging_error(f"Error writing parquet file: {e}", dir)

# Append list with the file path
extracted_files.append(f"{param}-{ts}.parquet")
17 changes: 8 additions & 9 deletions etl/models/transform/publisher.py
@@ -1,22 +1,21 @@
import time
import queue
from tqdm import tqdm
from etl.common.utils.logs import loggingInfo
from etl.config.logFile import logFileName
from etl.config.logFile import log_file_name


WORK_DIR = logFileName(file=__file__)
WORK_DIR = log_file_name(file=__file__)


class transformation:
def __init__(self, json_response: dict, params, fila: object):
def __init__(self, json_response: dict, params, queue: queue.Queue):
self.json_response = json_response
self.validParams = params
self.fila = fila
self.valid_params = params
self.queue = queue

def publish(self):
for param in tqdm(
self.validParams, total=len(self.validParams), desc="Producing Data"
self.valid_params, total=len(self.valid_params), desc="Producing Data"
):
dic = self.json_response[param.replace("-", "")]
time.sleep(0.2)
self.fila.put(dic) # type: ignore
self.queue.put(dic)
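With the constructor now taking an explicit `queue.Queue`, the publisher can be wired to any consumer. A minimal producer/consumer sketch under assumed wiring (the real orchestration lives in the pipeline executor, which this commit does not touch; the payload keys follow the `param.replace("-", "")` lookup used in `publish()`):

```python
import queue
import threading

from etl.models.transform.publisher import transformation

q: queue.Queue = queue.Queue()
json_response = {"USDBRL": {"bid": "5.10"}, "CADBRL": {"bid": "3.75"}}  # illustrative payload
producer = transformation(json_response, ["USD-BRL", "CAD-BRL"], q)

def consume() -> None:
    for _ in range(2):   # drain the two items the producer will publish
        print(q.get())
        q.task_done()

threading.Thread(target=consume, daemon=True).start()
producer.publish()       # puts each parity dict onto the shared queue
q.join()                 # block until the consumer has processed everything
```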
20 changes: 10 additions & 10 deletions etl/run.py
@@ -8,28 +8,28 @@
start = time.time()


def GenerateRandomParams(ParamsQty: int) -> list:
def generate_random_params(params_qty: int) -> list:
"""
Generate a list of random parameters from the available list of parities.
Args:
ParamsQty (int): The number of random parameters to generate.
params_qty (int): The number of random parameters to generate.
Returns:
list: A list of randomly generated parameters.
"""
AvaliableList = list(requests.get(API.ENDPOINT_AVALIABLE_PARITIES).json())
min = random.randint(0, len(AvaliableList) - ParamsQty)
max = min + ParamsQty
if max > len(AvaliableList):
max = len(AvaliableList)
if ParamsQty == len(AvaliableList):
avaliable_list = list(requests.get(API.ENDPOINT_AVALIABLE_PARITIES).json())
min = random.randint(0, len(avaliable_list) - params_qty)
max = min + params_qty
if max > len(avaliable_list):
max = len(avaliable_list)
if params_qty == len(avaliable_list):
max -= max
return AvaliableList[min: max - 1]
return avaliable_list[min : max - 1]


def main(total_files: int = 2):
NewExec = PipelineExecutor(*GenerateRandomParams(total_files))
NewExec = PipelineExecutor(*generate_random_params(total_files))
NewExec.pipeline_run()


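The window selection above slices with `[min : max - 1]`, so the result comes back one element shorter than `params_qty`. A small self-contained sketch of that behaviour with the HTTP call replaced by a fixed list (names here are illustrative, not from the repository):

```python
import random

def pick_window(available: list, qty: int) -> list:
    """Mirrors generate_random_params' slicing, with the API call stubbed out."""
    lo = random.randint(0, len(available) - qty)
    hi = lo + qty
    if hi > len(available):
        hi = len(available)
    if qty == len(available):
        hi -= hi  # mirrors the qty == len(avaliable_list) special case above
    return available[lo : hi - 1]

parities = ["USD-BRL", "EUR-BRL", "CAD-BRL", "GBP-BRL", "JPY-BRL"]
print(pick_window(parities, 3))  # two parities, i.e. qty - 1, matching the assertion in test_run.py
```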
4 changes: 2 additions & 2 deletions etl/test_run.py
@@ -1,6 +1,6 @@
import pytest
import requests
from etl.run import GenerateRandomParams
from etl.run import generate_random_params


def test_generate_random_params():
@@ -9,7 +9,7 @@ def test_generate_random_params():
requests.get = lambda url: MockResponse(mock_response)

params_qty = 3
result = GenerateRandomParams(params_qty)
result = generate_random_params(params_qty)

assert isinstance(result, list)
assert len(result) == params_qty - 1
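The test stubs `requests.get` with a lambda returning a `MockResponse`; its definition sits in the collapsed part of the file, but a plausible minimal stand-in (purely an assumption) only needs a `json()` method, since `generate_random_params` calls `.json()` on the response:

```python
class MockResponse:
    """Assumed shape of the test helper; only json() is exercised by generate_random_params."""

    def __init__(self, payload):
        self.payload = payload

    def json(self):
        return self.payload
```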
