Commit bd6cb06
Merge branch 'dev' of https://github.com/Voyz/databay into dev
Voyz committed Dec 17, 2020
2 parents 0d58630 + f7817ae
Showing 54 changed files with 554 additions and 409 deletions.
15 changes: 8 additions & 7 deletions databay/base_planner.py
@@ -13,13 +13,13 @@

 _LOGGER = logging.getLogger('databay.BasePlanner')

+
 class BasePlanner(ABC):
     """
     Base abstract class for a job planner. Implementations should handle scheduling link transfers based on :py:class:`datetime.timedelta` intervals.
     """

-
-    def __init__(self, links:Union[Link, List[Link]]=None):
+    def __init__(self, links: Union[Link, List[Link]] = None):
         """
         :type links: :any:`Link` or list[:any:`Link`]
         :param links: Links that should be added and scheduled.
@@ -38,7 +38,7 @@ def links(self):
         """
         return self._links

-    def add_links(self, links:Union[Link, List[Link]]):
+    def add_links(self, links: Union[Link, List[Link]]):
         """
         Add new links to this planner. This can be run once planner is already running.
@@ -68,7 +68,8 @@ def remove_links(self, links: Link):

         for link in links:
             if link not in self._links:
-                raise MissingLinkError(f'Planner does not contain the link: {link}')
+                raise MissingLinkError(
+                    f'Planner does not contain the link: {link}')

             if link.job is not None:
                 self._unschedule(link)
@@ -112,7 +113,7 @@ def start(self):
             link.on_start()
         self._start_planner()

-    def shutdown(self, wait:bool=True):
+    def shutdown(self, wait: bool = True):
         """
         Shutdown this planner. Links will stop being scheduled after calling this method. Remaining link jobs may still execute after calling this method depending on the concrete planner implementation.
@@ -133,7 +134,7 @@ def _start_planner(self):
         raise NotImplementedError()

     @abstractmethod
-    def _shutdown_planner(self, wait:bool=True):
+    def _shutdown_planner(self, wait: bool = True):
         """
         Override this method to provide shutdown functionality.
         """
@@ -156,4 +157,4 @@ def running(self):

         Override this property to indicate when the underlying scheduling functionality is currently running.
         """
-        raise NotImplementedError()
\ No newline at end of file
+        raise NotImplementedError()
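Note for context: the signatures touched above form BasePlanner's public lifecycle (add_links, remove_links, start, shutdown). A minimal usage sketch against that lifecycle; the concrete ApsPlanner, RandomIntInlet, PrintOutlet and the Link(inlets, outlets, interval) argument order are assumptions taken from the broader databay API, not something this commit shows:

from datetime import timedelta

from databay import Link
from databay.inlets import RandomIntInlet
from databay.outlets import PrintOutlet
from databay.planners import ApsPlanner

# Assumed wiring; Link's exact signature may differ between versions.
link = Link(RandomIntInlet(), PrintOutlet(), timedelta(seconds=5))

planner = ApsPlanner()
planner.add_links(link)      # per the docstring, also allowed while running
planner.start()              # calls link.on_start(), then _start_planner()

planner.shutdown(wait=True)  # remaining jobs may still execute afterwards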
6 changes: 4 additions & 2 deletions databay/config.py
@@ -8,12 +8,14 @@

 IGNORE_WARNINGS = []

+
 def initialise():
     import logging

     from databay.misc.logs import ISO8601Formatter

-    iso8601_formatter = ISO8601Formatter('%(asctime)s|%(levelname)-.1s| %(message)s (%(name)s)', millis_precision=3)# / %(threadName)s)')
+    iso8601_formatter = ISO8601Formatter(
+        '%(asctime)s|%(levelname)-.1s| %(message)s (%(name)s)', millis_precision=3)  # / %(threadName)s)')
     iso8601_formatter.set_pretty(True)

     stream_handler = logging.StreamHandler()
@@ -30,7 +32,7 @@ def initialise():

     if sys.platform.startswith('win') and \
             (sys.stdin.encoding == 'windows-1252' or sys.stdout.encoding == 'windows-1252') and \
-        'windows-1252' not in IGNORE_WARNINGS:
+            'windows-1252' not in IGNORE_WARNINGS:
         default_logger.warning('stdin or stdout encoder is set to \'windows-1252\'. This may cause errors with data streaming. Fix by setting following environment variables: \n\nPYTHONIOENCODING=utf-8\nPYTHONLEGACYWINDOWSSTDIO=utf-8\n\nSet DATABAY_IGNORE_WARNINGS=\'windows-1252\' to ignore this warning.')

     # monkey patch on asyncio.run for Python versions below 3.7.
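Note: the warning above is actionable from the environment. A sketch of the suggested fixes; the variable names come from the warning text itself, and exactly how DATABAY_IGNORE_WARNINGS feeds the IGNORE_WARNINGS list is an assumption, not shown in this diff:

import os

# PYTHONIOENCODING and PYTHONLEGACYWINDOWSSTDIO are read at interpreter
# startup, so set them in the shell (or process launcher), not here:
#   PYTHONIOENCODING=utf-8
#   PYTHONLEGACYWINDOWSSTDIO=utf-8

# Opting out of just this warning; assumed to be consumed when databay
# populates IGNORE_WARNINGS during initialise():
os.environ['DATABAY_IGNORE_WARNINGS'] = 'windows-1252'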
3 changes: 2 additions & 1 deletion databay/errors.py
@@ -8,6 +8,7 @@ class ImplementationError(RuntimeError):
     """ Raised when concrete implementation is incorrect."""
     pass

+
 class InvalidNodeError(RuntimeError):
     """ Raised when invalid node (inlet or outlet) is provided."""
-    pass
\ No newline at end of file
+    pass
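Note: these exceptions pair with the MissingLinkError raised by remove_links in base_planner.py above. A small illustrative handler, assuming MissingLinkError is defined in databay.errors alongside the two classes shown:

from databay.errors import MissingLinkError  # assumed location

def remove_if_present(planner, link):
    # remove_links raises MissingLinkError for links the planner never held
    try:
        planner.remove_links(link)
    except MissingLinkError:
        pass  # already absent, nothing to unschedule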
17 changes: 7 additions & 10 deletions databay/inlet.py
@@ -14,14 +14,12 @@
 import databay as da


-
-
 class Inlet(ABC):
     """
     Abstract class representing an input of the data stream.
     """

-    def __init__(self, metadata:dict=None):
+    def __init__(self, metadata: dict = None):
         """
         :type metadata: dict
         :param metadata: Global metadata that will be attached to each record generated by this inlet. It can be overridden or appended to by providing metadata when creating a record using :py:func:`new_record` function. |default| :code:`None`
@@ -45,7 +43,7 @@ def metadata(self):
         """
         return self._metadata

-    async def _pull(self, update:'da.Update'):
+    async def _pull(self, update: 'da.Update'):
         if self._uses_coroutine:
             data = await self.pull(update)
         else:
@@ -62,9 +60,8 @@ async def _pull(self, update: 'da.Update'):

         return data

-
     @abstractmethod
-    def pull(self, update:'da.Update') -> List[Record]:
+    def pull(self, update: 'da.Update') -> List[Record]:
         """
         Produce new data.
@@ -79,7 +76,7 @@ def pull(self, update: 'da.Update') -> List[Record]:
         """
         raise NotImplementedError()

-    def new_record(self, payload, metadata:dict=None) -> Record:
+    def new_record(self, payload, metadata: dict = None) -> Record:
         """
         Create a new :any:`Record`. This should be the preferred way of creating new records.
@@ -94,7 +91,8 @@ def new_record(self, payload, metadata: dict = None) -> Record:
         :rtype: :any:`Record`
         """

-        full_metadata = {**self._metadata, **(metadata if metadata is not None else {})}
+        full_metadata = {**self._metadata, **
+                         (metadata if metadata is not None else {})}
         full_metadata['__inlet__'] = str(self)
         return Record(payload=payload, metadata=full_metadata)
@@ -181,7 +179,6 @@ def active(self):
         """
         return self._active

-
     def __repr__(self):
         s = "%s(" % (self.__class__.__name__)

@@ -190,4 +187,4 @@ def __repr__(self):

         s += ')'

-        return s
\ No newline at end of file
+        return s
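Note: the methods reformatted above are the whole contract for a custom inlet: override pull() and, preferably, build records with new_record() so that the inlet-wide metadata is merged in and the '__inlet__' tag is attached. A minimal sketch (the class name and payload are illustrative):

import random

from databay import Inlet

class TemperatureInlet(Inlet):
    def pull(self, update):
        reading = random.uniform(18.0, 24.0)
        # per-record metadata overrides the inlet-wide metadata on key clashes
        return [self.new_record(reading, metadata={'unit': 'C'})]

inlet = TemperatureInlet(metadata={'source': 'sensor-1'})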
7 changes: 4 additions & 3 deletions databay/inlets/__init__.py
@@ -2,9 +2,10 @@

 if importlib.util.find_spec('aiohttp') is not None:
     from databay.inlets.http_inlet import HttpInlet
-else: # pragma: no cover
+else:  # pragma: no cover
     def HttpInlet(*args, **kwargs):
-        raise ImportError('aiohttp dependency is required for HttpInlet. Fix by running: pip install "databay[HttpInlet]"')
+        raise ImportError(
+            'aiohttp dependency is required for HttpInlet. Fix by running: pip install "databay[HttpInlet]"')

 from databay.inlets.random_int_inlet import RandomIntInlet
-from databay.inlets.null_inlet import NullInlet
\ No newline at end of file
+from databay.inlets.null_inlet import NullInlet
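Note: this stub pattern keeps the import line working without aiohttp installed and defers the failure to the first call, which is where the install hint becomes visible. The observable behaviour, sketched:

from databay.inlets import HttpInlet      # succeeds even without aiohttp

inlet = HttpInlet('https://example.com')  # without aiohttp, raises ImportError
                                          # carrying the pip install hint above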
10 changes: 6 additions & 4 deletions databay/inlets/file_inlet.py
@@ -2,22 +2,24 @@

 from databay import Inlet

+
 class FileInletMode(Enum):
     """Enum defining the mode in which the FileInlet should read the file."""

-    LINE:str = 'line'
+    LINE: str = 'line'
     """Read file one line per transfer. This will open the file and hold it open for as long as the planner is running."""

-    FILE:str = 'file'
+    FILE: str = 'file'
     """Read the entire file on each transfer. This will only open the file briefly during the transfer."""

+
 class FileInlet(Inlet):

     """
     Inlet producing data by reading a file.
     """

-    def __init__(self, filepath:str, read_mode:FileInletMode= FileInletMode.LINE, *args, **kwargs):
+    def __init__(self, filepath: str, read_mode: FileInletMode = FileInletMode.LINE, *args, **kwargs):
         """
         :param filepath: Path to the file.
         :type filepath: str
@@ -71,4 +73,4 @@ def __repr__(self):

         s += ')'

-        return s
\ No newline at end of file
+        return s
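Note: a short usage sketch for the two read modes; the import path is inferred from this file's location, and whether FileInlet is re-exported from databay.inlets is not shown in this diff:

from databay.inlets.file_inlet import FileInlet, FileInletMode

# One line per transfer; the file handle stays open while the planner runs.
tail_inlet = FileInlet('/var/log/app.log', read_mode=FileInletMode.LINE)

# Whole file per transfer; opened only briefly during each transfer.
snapshot_inlet = FileInlet('/etc/app/config.ini', read_mode=FileInletMode.FILE)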
8 changes: 4 additions & 4 deletions databay/inlets/http_inlet.py
@@ -19,14 +19,15 @@

 _LOGGER = logging.getLogger('databay.HttpInlet')

+
 class HttpInlet(Inlet):
     """
     Inlet for pulling data from a specified URL using `aiohttp <aiohttp.ClientSession.get_>`__.

     .. _aiohttp.ClientSession.get: https://docs.aiohttp.org/en/stable/client_reference.html#aiohttp.ClientSession.get
     """

-    def __init__(self, url:str, json:str=True, *args, **kwargs):
+    def __init__(self, url: str, json: str = True, *args, **kwargs):
         """
         :type url: str
         :param url: URL that should be queried for data.
@@ -60,7 +61,8 @@ async def pull(self, update) -> Union[List[Record], str]:
                 return payload.decode("utf-8")
         except Exception as e:
             if isinstance(e, JSONDecodeError) and 'Expecting value: line 1 column 1 (char 0)' in str(e):
-                raise ValueError(f'Response does not contain valid JSON:\n\n{payload}') from e
+                raise ValueError(
+                    f'Response does not contain valid JSON:\n\n{payload}') from e
             else:
                 raise e
@@ -74,5 +76,3 @@ def __repr__(self):

         s += ')'
         return s
-
-
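Note: per the pull() code above, json controls whether the response body is parsed or returned as raw utf-8 text (it defaults to True and is used as a boolean flag, despite the str annotation). A usage sketch with illustrative URLs:

from databay.inlets import HttpInlet

json_inlet = HttpInlet('https://api.example.com/data')              # parsed JSON
text_inlet = HttpInlet('https://example.com/feed.txt', json=False)  # raw utf-8 text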
5 changes: 3 additions & 2 deletions databay/inlets/null_inlet.py
@@ -1,6 +1,7 @@
 from databay import Inlet

-class NullInlet(Inlet): # pragma: no cover
+
+class NullInlet(Inlet):  # pragma: no cover
    """
    Inlet that doesn't do anything, essentially a 'no-op' inlet.
    """
@@ -11,4 +12,4 @@ def pull(self, update):

        :returns: empty list
        """
-        return []
\ No newline at end of file
+        return []
6 changes: 3 additions & 3 deletions databay/inlets/random_int_inlet.py
@@ -3,12 +3,12 @@
 from databay import Inlet


-class RandomIntInlet(Inlet): # pragma: no cover
+class RandomIntInlet(Inlet):  # pragma: no cover
     """
     Inlet that will generate a random integer within the specified range.
     """

-    def __init__(self, min:int=0, max:int=100, *args, **kwargs):
+    def __init__(self, min: int = 0, max: int = 100, *args, **kwargs):
         """

         :param min: Lower boundary of the random range.
@@ -31,4 +31,4 @@ def pull(self, update):
         :return: Single or multiple records produced.
         :rtype: :any:`Record` or list[:any:`Record`]
         """
-        return random.randint(self.min, self.max)
\ No newline at end of file
+        return random.randint(self.min, self.max)
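Note: pull() here returns a bare int rather than Record objects; wrapping raw payloads into records is assumed to happen in the base Inlet._pull (see inlet.py above). A quick sketch:

from databay.inlets import RandomIntInlet

die = RandomIntInlet(min=1, max=6)  # one simulated die roll per transfer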