generated from guionardo/python-package
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #4 from guionardo/feature/config
Added base config and basic docs to cache
- Loading branch information
Showing
15 changed files
with
608 additions
and
129 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
lint: | ||
pylint -j 4 --fail-under=9 $(git ls-files '*.py') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ pylint = "*" | |
|
||
[packages] | ||
redis = "*" | ||
pyyaml = "*" | ||
|
||
[requires] | ||
python_version = "3.8" |
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
__all__ = ['BaseConfig'] | ||
|
||
from .base_config import BaseConfig |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,155 @@ | ||
"""Base Configuration module""" | ||
|
||
import json | ||
import os | ||
from datetime import date, datetime, timedelta | ||
|
||
import yaml | ||
|
||
from .class_properties import get_envs, get_fields_default_values, get_types | ||
|
||
|
||
def _create_config_field(internal_name: str, field_type: type, | ||
is_list: bool, default_value: any, field_name: str): | ||
if not field_type: | ||
if default_value is None: | ||
raise ValueError(f'No type specified for {internal_name}') | ||
field_type = type(default_value) | ||
if default_value is None: | ||
default_value = field_type() | ||
if not field_name: | ||
field_name = internal_name | ||
|
||
def __parse_value_type(value, value_type): | ||
if value_type in (int, float, bool, str): | ||
return value_type(value) | ||
|
||
if value_type == datetime: | ||
return datetime.strptime(value, '%Y-%m-%d %H:%M:%S') | ||
if value_type == date: | ||
return date.strptime(value, '%Y-%m-%d') | ||
if value_type == timedelta: | ||
return timedelta(**value) | ||
if issubclass(value_type, BaseConfig): | ||
return value_type(value) | ||
if isinstance(value_type, list): | ||
return [__parse_value_type(v, value_type[0]) for v in value] | ||
|
||
raise TypeError(f'Unknown type {value_type}') | ||
|
||
def load(source: dict) -> any: | ||
if field_name in source: | ||
source_value = source[field_name] | ||
if not is_list: | ||
source_value = __parse_value_type(source_value, field_type) | ||
else: | ||
source_value = [__parse_value_type(v, field_type) | ||
for v in source_value] | ||
else: | ||
source_value = default_value | ||
|
||
return source_value | ||
|
||
return load | ||
|
||
|
||
class BaseConfig: | ||
"""Base Configuration class | ||
Field types: | ||
- int | ||
- float | ||
- bool | ||
- str | ||
- datetime.datetime | ||
- datetime.date | ||
- datetime.timedelta | ||
- BaseConfig inherited classes | ||
- list (of types above) | ||
""" | ||
|
||
def __init__(self, source=None, **dict_source): | ||
self.__fields = self.__parse_fields() | ||
self.__load(source or dict_source) | ||
|
||
def __parse_fields(self): | ||
|
||
default_values = get_fields_default_values(self) | ||
types = get_types(self) | ||
envs = get_envs(self) | ||
|
||
all_field_names = set(default_values.keys()) | set(types.keys()) | ||
|
||
data = { | ||
field: _create_config_field(field, | ||
*types.get(field, (None, False)), | ||
default_values.get(field, None), | ||
envs.get(field, field)) | ||
|
||
for field in all_field_names | ||
} | ||
|
||
return data | ||
|
||
def __load(self, source): | ||
if source is None: | ||
return | ||
|
||
if isinstance(source, str): | ||
if os.path.isfile(source): | ||
source = self.__parse_file(source) | ||
else: | ||
source = self.__parse_content(source) | ||
|
||
if not isinstance(source, dict): | ||
return | ||
|
||
for member, field in self.__fields.items(): | ||
setattr(self, member, field(source)) | ||
|
||
def __parse_value_type(self, value_type, value): | ||
if value_type in (int, float, bool, str): | ||
return value_type(value) | ||
|
||
if value_type == datetime: | ||
return datetime.strptime(value, '%Y-%m-%d %H:%M:%S') | ||
if value_type == date: | ||
return date.strptime(value, '%Y-%m-%d') | ||
if value_type == timedelta: | ||
return timedelta(**value) | ||
if issubclass(value_type, BaseConfig): | ||
return value_type(value) | ||
if isinstance(value_type, list): | ||
return [self.__parse_value_type(value_type[0], v) for v in value] | ||
|
||
raise TypeError(f'Unknown type {value_type}') | ||
|
||
def __parse_file(self, path): | ||
try: | ||
with open(path, 'r', encoding='utf-8') as file: | ||
content = file.read() | ||
except Exception as exc: | ||
raise ValueError(f'Error reading file {path}: {exc}') from exc | ||
|
||
return self.__parse_content(content) | ||
|
||
def __parse_content(self, content): | ||
try: | ||
return json.loads(content) | ||
except json.JSONDecodeError: | ||
# Pass to yaml parser | ||
... | ||
except Exception as exc: | ||
raise ValueError(f'Error parsing content {exc}') from exc | ||
try: | ||
return yaml.load(content, yaml.SafeLoader) | ||
except Exception as exc: | ||
raise ValueError(f'Error parsing content {exc}') from exc | ||
|
||
def __repr__(self) -> str: | ||
fields = ", ".join( | ||
f"{k}={repr(getattr(self,k))}" for k in self.__fields) | ||
return f'{self.__class__.__name__}({fields})' | ||
|
||
def __dict__(self) -> dict: | ||
return |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
"""Class Properties functions""" | ||
|
||
import inspect | ||
|
||
from typing import Dict, Tuple | ||
|
||
|
||
def get_fields_default_values(cls) -> Dict[str, any]: | ||
"""Returns dict with fields (key) and default_value""" | ||
data = {member: default_value | ||
for member, default_value in inspect.getmembers(cls) | ||
if not member.startswith('__') and not callable(default_value)} | ||
return data | ||
|
||
|
||
def get_types(cls) -> Dict[str, Tuple[type, bool]]: | ||
"""Returns dict with fields (key) and (type, is_list)""" | ||
|
||
annotations = getattr(cls, '__annotations__', {}) | ||
data = {} | ||
for member, member_type in annotations.items(): | ||
if member_type is list: | ||
raise TypeError(f'{cls}.{member} must be declared List[type]') | ||
if getattr(member_type, '_name', 'NONE') == 'List': | ||
data[member] = (member_type.__args__[0], True) | ||
else: | ||
data[member] = (member_type, False) | ||
|
||
return data | ||
|
||
|
||
def get_comments(cls) -> Dict[str, str]: | ||
"""Returns dict with fields (key) and comment""" | ||
ret = {} | ||
# inspect things coming from parent classes | ||
if not inspect.isclass(cls): | ||
cls = cls.__class__ | ||
for parent in cls.__bases__: | ||
if parent != object: | ||
ret.update(get_comments(parent)) | ||
|
||
# get class field names | ||
fields = set(member for member, default_value in inspect.getmembers(cls) | ||
if not member.startswith('__') and not callable(default_value)) | \ | ||
set(member for member, _ in getattr(cls, '__annotations__', {}).items()) | ||
|
||
# get all lines | ||
lines = [line | ||
for line in inspect.getsourcelines(cls)[0] | ||
if line.strip() and not '"""' in line and not "'''" in line] | ||
first_line = lines[1] | ||
indent_root = lines[1][0:len(first_line)-len(first_line.lstrip())] | ||
|
||
# get lines from root indentation with FIELD definition with comments | ||
field_lines = [line | ||
for line in lines | ||
if line.startswith(indent_root) and line[len(indent_root)] != ' ' | ||
and ('=' in line or ':' in line) | ||
and '#' in line | ||
] | ||
for line in field_lines: | ||
sep = ':' if ':' in line else '=' | ||
field_name = line.split(sep)[0].strip() | ||
|
||
if field_name not in fields: | ||
continue | ||
|
||
comment = line.split('#', maxsplit=1)[1].strip() | ||
ret[field_name] = comment | ||
|
||
# get lines from __init__ with FIELD definition with comments | ||
for line in inspect.getsourcelines(cls.__init__)[0][1:]: | ||
line = line.strip() | ||
if not (line.startswith('self.') and '=' in line and '#' in line): | ||
continue | ||
field_name = line[5:].split('=')[0].strip() | ||
comment = line.split('#')[1].strip() | ||
ret[field_name] = comment | ||
|
||
return ret | ||
|
||
|
||
def get_envs(cls) -> Dict[str, str]: | ||
"""Get environment variable definitions for fields""" | ||
|
||
comments = get_comments(cls) | ||
envs = {} | ||
for field, comment in comments.items(): | ||
if 'ENV:' in comment: | ||
env = comment.split('ENV:')[1].strip() | ||
if env: | ||
envs[field] = env | ||
return envs |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,2 @@ | ||
redis==4.3.3 | ||
redis==4.3.3 | ||
PyYAML==6.0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
Oops, something went wrong.