From 6f3fd105826f1741bd4b6e884c52713713af43a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Tue, 16 Jul 2024 18:48:03 -0600 Subject: [PATCH] refactor: Implement abstract `serialize_message` for Singer writers --- singer_sdk/_singerlib/_encoding/simple.py | 65 ------- .../{_encoding => encoding}/__init__.py | 4 +- singer_sdk/_singerlib/encoding/_base.py | 161 ++++++++++++++++++ .../base.py => encoding/_simple.py} | 160 ++++------------- singer_sdk/_singerlib/messages.py | 6 +- singer_sdk/io_base.py | 2 +- 6 files changed, 198 insertions(+), 200 deletions(-) delete mode 100644 singer_sdk/_singerlib/_encoding/simple.py rename singer_sdk/_singerlib/{_encoding => encoding}/__init__.py (57%) create mode 100644 singer_sdk/_singerlib/encoding/_base.py rename singer_sdk/_singerlib/{_encoding/base.py => encoding/_simple.py} (52%) diff --git a/singer_sdk/_singerlib/_encoding/simple.py b/singer_sdk/_singerlib/_encoding/simple.py deleted file mode 100644 index 636a8f981..000000000 --- a/singer_sdk/_singerlib/_encoding/simple.py +++ /dev/null @@ -1,65 +0,0 @@ -from __future__ import annotations - -import json -import logging -import sys -import typing as t - -from singer_sdk._singerlib.exceptions import InvalidInputLine -from singer_sdk._singerlib.json import deserialize_json, serialize_json - -from .base import GenericSingerReader, GenericSingerWriter - -if t.TYPE_CHECKING: - from singer_sdk._singerlib.messages import Message - -logger = logging.getLogger(__name__) - - -class SingerReader(GenericSingerReader[str]): - """Base class for all plugins reading Singer messages as strings from stdin.""" - - default_input = sys.stdin - - def deserialize_json(self, line: str) -> dict: # noqa: PLR6301 - """Deserialize a line of json. - - Args: - line: A single line of json. - - Returns: - A dictionary of the deserialized json. - - Raises: - InvalidInputLine: If the line is not valid JSON. - """ - try: - return deserialize_json(line) - except json.decoder.JSONDecodeError as exc: - logger.exception("Unable to parse:\n%s", line) - msg = f"Unable to parse line as JSON: {line}" - raise InvalidInputLine(msg) from exc - - -class SingerWriter(GenericSingerWriter[str]): - """Interface for all plugins writing Singer messages to stdout.""" - - def serialize_json(self, obj: object) -> str: # noqa: PLR6301 - """Serialize a dictionary into a line of json. - - Args: - obj: A Python object usually a dict. - - Returns: - A string of serialized json. - """ - return serialize_json(obj) - - def write_message(self, message: Message) -> None: - """Write a message to stdout. - - Args: - message: The message to write. - """ - sys.stdout.write(self.format_message(message) + "\n") - sys.stdout.flush() diff --git a/singer_sdk/_singerlib/_encoding/__init__.py b/singer_sdk/_singerlib/encoding/__init__.py similarity index 57% rename from singer_sdk/_singerlib/_encoding/__init__.py rename to singer_sdk/_singerlib/encoding/__init__.py index 7819a1452..2d64c54dc 100644 --- a/singer_sdk/_singerlib/_encoding/__init__.py +++ b/singer_sdk/_singerlib/encoding/__init__.py @@ -1,7 +1,7 @@ from __future__ import annotations -from .base import GenericSingerReader, GenericSingerWriter, SingerMessageType -from .simple import SingerReader, SingerWriter +from ._base import GenericSingerReader, GenericSingerWriter, SingerMessageType +from ._simple import SingerReader, SingerWriter __all__ = [ "GenericSingerReader", diff --git a/singer_sdk/_singerlib/encoding/_base.py b/singer_sdk/_singerlib/encoding/_base.py new file mode 100644 index 000000000..62793776b --- /dev/null +++ b/singer_sdk/_singerlib/encoding/_base.py @@ -0,0 +1,161 @@ +"""Abstract base classes for all Singer messages IO operations.""" + +from __future__ import annotations + +import abc +import enum +import logging +import sys +import typing as t +from collections import Counter, defaultdict + +from singer_sdk._singerlib import exceptions + +if sys.version_info < (3, 11): + from backports.datetime_fromisoformat import MonkeyPatch + + MonkeyPatch.patch_fromisoformat() + +logger = logging.getLogger(__name__) + + +# TODO: Use to default to 'str' here +# https://peps.python.org/pep-0696/ +T = t.TypeVar("T", str, bytes) +M = t.TypeVar("M") + + +class SingerMessageType(str, enum.Enum): + """Singer specification message types.""" + + RECORD = "RECORD" + SCHEMA = "SCHEMA" + STATE = "STATE" + ACTIVATE_VERSION = "ACTIVATE_VERSION" + BATCH = "BATCH" + + +class GenericSingerReader(t.Generic[T], metaclass=abc.ABCMeta): + """Interface for all plugins reading Singer messages as strings or bytes.""" + + @t.final + def listen(self, file_input: t.IO[T] | None = None) -> None: + """Read from input until all messages are processed. + + Args: + file_input: Readable stream of messages. Defaults to standard in. + """ + self._process_lines(file_input or self.default_input) + self._process_endofpipe() + + def _process_lines(self, file_input: t.IO[T]) -> t.Counter[str]: + """Internal method to process jsonl lines from a Singer tap. + + Args: + file_input: Readable stream of messages, each on a separate line. + + Returns: + A counter object for the processed lines. + """ + stats: dict[str, int] = defaultdict(int) + for line in file_input: + line_dict = self.deserialize_json(line) + self._assert_line_requires(line_dict, requires={"type"}) + + record_type: SingerMessageType = line_dict["type"] + if record_type == SingerMessageType.SCHEMA: + self._process_schema_message(line_dict) + + elif record_type == SingerMessageType.RECORD: + self._process_record_message(line_dict) + + elif record_type == SingerMessageType.ACTIVATE_VERSION: + self._process_activate_version_message(line_dict) + + elif record_type == SingerMessageType.STATE: + self._process_state_message(line_dict) + + elif record_type == SingerMessageType.BATCH: + self._process_batch_message(line_dict) + + else: + self._process_unknown_message(line_dict) + + stats[record_type] += 1 + + return Counter(**stats) + + @property + @abc.abstractmethod + def default_input(self) -> t.IO[T]: ... + + @staticmethod + def _assert_line_requires(line_dict: dict, requires: set[str]) -> None: + """Check if dictionary . + + Args: + line_dict: TODO + requires: TODO + + Raises: + InvalidInputLine: raised if any required keys are missing + """ + if not requires.issubset(line_dict): + missing = requires - set(line_dict) + msg = f"Line is missing required {', '.join(missing)} key(s): {line_dict}" + raise exceptions.InvalidInputLine(msg) + + @abc.abstractmethod + def deserialize_json(self, line: T) -> dict: ... + + @abc.abstractmethod + def _process_schema_message(self, message_dict: dict) -> None: ... + + @abc.abstractmethod + def _process_record_message(self, message_dict: dict) -> None: ... + + @abc.abstractmethod + def _process_state_message(self, message_dict: dict) -> None: ... + + @abc.abstractmethod + def _process_activate_version_message(self, message_dict: dict) -> None: ... + + @abc.abstractmethod + def _process_batch_message(self, message_dict: dict) -> None: ... + + def _process_unknown_message(self, message_dict: dict) -> None: # noqa: PLR6301 + """Internal method to process unknown message types from a Singer tap. + + Args: + message_dict: Dictionary representation of the Singer message. + + Raises: + ValueError: raised if a message type is not recognized + """ + record_type = message_dict["type"] + msg = f"Unknown message type '{record_type}' in message." + raise ValueError(msg) + + def _process_endofpipe(self) -> None: # noqa: PLR6301 + logger.debug("End of pipe reached") + + +class GenericSingerWriter(t.Generic[T, M], metaclass=abc.ABCMeta): + """Interface for all plugins writing Singer messages as strings or bytes.""" + + def format_message(self, message: M) -> T: + """Format a message as a JSON string. + + Args: + message: The message to format. + + Returns: + The formatted message. + """ + return self.serialize_message(message) + + @abc.abstractmethod + def serialize_message(self, message: M) -> T: ... + + @abc.abstractmethod + def write_message(self, message: M) -> None: ... diff --git a/singer_sdk/_singerlib/_encoding/base.py b/singer_sdk/_singerlib/encoding/_simple.py similarity index 52% rename from singer_sdk/_singerlib/_encoding/base.py rename to singer_sdk/_singerlib/encoding/_simple.py index 798a4f6dd..5bfb242fb 100644 --- a/singer_sdk/_singerlib/_encoding/base.py +++ b/singer_sdk/_singerlib/encoding/_simple.py @@ -1,41 +1,20 @@ -"""Abstract base classes for all Singer messages IO operations.""" - from __future__ import annotations -import abc -import enum +import json import logging import sys import typing as t -from collections import Counter, defaultdict from dataclasses import asdict, dataclass, field from datetime import datetime, timezone -from singer_sdk._singerlib import exceptions - -if sys.version_info < (3, 11): - from backports.datetime_fromisoformat import MonkeyPatch +from singer_sdk._singerlib.exceptions import InvalidInputLine +from singer_sdk._singerlib.json import deserialize_json, serialize_json - MonkeyPatch.patch_fromisoformat() +from ._base import GenericSingerReader, GenericSingerWriter, SingerMessageType logger = logging.getLogger(__name__) -# TODO: Use to default to 'str' here -# https://peps.python.org/pep-0696/ -T = t.TypeVar("T", str, bytes) - - -class SingerMessageType(str, enum.Enum): - """Singer specification message types.""" - - RECORD = "RECORD" - SCHEMA = "SCHEMA" - STATE = "STATE" - ACTIVATE_VERSION = "ACTIVATE_VERSION" - BATCH = "BATCH" - - def exclude_null_dict(pairs: list[tuple[str, t.Any]]) -> dict[str, t.Any]: """Exclude null values from a dictionary. @@ -215,127 +194,50 @@ def __post_init__(self) -> None: self.type = SingerMessageType.ACTIVATE_VERSION -class GenericSingerReader(t.Generic[T], metaclass=abc.ABCMeta): - """Interface for all plugins reading Singer messages as strings or bytes.""" +class SingerReader(GenericSingerReader[str]): + """Base class for all plugins reading Singer messages as strings from stdin.""" - @t.final - def listen(self, file_input: t.IO[T] | None = None) -> None: - """Read from input until all messages are processed. - - Args: - file_input: Readable stream of messages. Defaults to standard in. - """ - self._process_lines(file_input or self.default_input) - self._process_endofpipe() + default_input = sys.stdin - def _process_lines(self, file_input: t.IO[T]) -> t.Counter[str]: - """Internal method to process jsonl lines from a Singer tap. + def deserialize_json(self, line: str) -> dict: # noqa: PLR6301 + """Deserialize a line of json. Args: - file_input: Readable stream of messages, each on a separate line. + line: A single line of json. Returns: - A counter object for the processed lines. - """ - stats: dict[str, int] = defaultdict(int) - for line in file_input: - line_dict = self.deserialize_json(line) - self._assert_line_requires(line_dict, requires={"type"}) - - record_type: SingerMessageType = line_dict["type"] - if record_type == SingerMessageType.SCHEMA: - self._process_schema_message(line_dict) - - elif record_type == SingerMessageType.RECORD: - self._process_record_message(line_dict) - - elif record_type == SingerMessageType.ACTIVATE_VERSION: - self._process_activate_version_message(line_dict) - - elif record_type == SingerMessageType.STATE: - self._process_state_message(line_dict) - - elif record_type == SingerMessageType.BATCH: - self._process_batch_message(line_dict) - - else: - self._process_unknown_message(line_dict) - - stats[record_type] += 1 - - return Counter(**stats) - - @property - @abc.abstractmethod - def default_input(self) -> t.IO[T]: ... - - @staticmethod - def _assert_line_requires(line_dict: dict, requires: set[str]) -> None: - """Check if dictionary . - - Args: - line_dict: TODO - requires: TODO + A dictionary of the deserialized json. Raises: - InvalidInputLine: raised if any required keys are missing + InvalidInputLine: If the line is not valid JSON. """ - if not requires.issubset(line_dict): - missing = requires - set(line_dict) - msg = f"Line is missing required {', '.join(missing)} key(s): {line_dict}" - raise exceptions.InvalidInputLine(msg) - - @abc.abstractmethod - def deserialize_json(self, line: T) -> dict: ... - - @abc.abstractmethod - def _process_schema_message(self, message_dict: dict) -> None: ... - - @abc.abstractmethod - def _process_record_message(self, message_dict: dict) -> None: ... - - @abc.abstractmethod - def _process_state_message(self, message_dict: dict) -> None: ... + try: + return deserialize_json(line) + except json.decoder.JSONDecodeError as exc: + logger.exception("Unable to parse:\n%s", line) + msg = f"Unable to parse line as JSON: {line}" + raise InvalidInputLine(msg) from exc - @abc.abstractmethod - def _process_activate_version_message(self, message_dict: dict) -> None: ... - @abc.abstractmethod - def _process_batch_message(self, message_dict: dict) -> None: ... +class SingerWriter(GenericSingerWriter[str, Message]): + """Interface for all plugins writing Singer messages to stdout.""" - def _process_unknown_message(self, message_dict: dict) -> None: # noqa: PLR6301 - """Internal method to process unknown message types from a Singer tap. + def serialize_message(self, message: Message) -> str: # noqa: PLR6301 + """Serialize a dictionary into a line of json. Args: - message_dict: Dictionary representation of the Singer message. + message: A Singer message object. - Raises: - ValueError: raised if a message type is not recognized + Returns: + A string of serialized json. """ - record_type = message_dict["type"] - msg = f"Unknown message type '{record_type}' in message." - raise ValueError(msg) - - def _process_endofpipe(self) -> None: # noqa: PLR6301 - logger.debug("End of pipe reached") - - -class GenericSingerWriter(t.Generic[T], metaclass=abc.ABCMeta): - """Interface for all plugins writing Singer messages as strings or bytes.""" + return serialize_json(message.to_dict()) - def format_message(self, message: Message) -> T: - """Format a message as a JSON string. + def write_message(self, message: Message) -> None: + """Write a message to stdout. Args: - message: The message to format. - - Returns: - The formatted message. + message: The message to write. """ - return self.serialize_json(message.to_dict()) - - @abc.abstractmethod - def serialize_json(self, obj: object) -> T: ... - - @abc.abstractmethod - def write_message(self, message: Message) -> None: ... + sys.stdout.write(self.format_message(message) + "\n") + sys.stdout.flush() diff --git a/singer_sdk/_singerlib/messages.py b/singer_sdk/_singerlib/messages.py index b08739126..640219de6 100644 --- a/singer_sdk/_singerlib/messages.py +++ b/singer_sdk/_singerlib/messages.py @@ -2,13 +2,13 @@ from __future__ import annotations -from singer_sdk._singerlib._encoding import SingerWriter -from singer_sdk._singerlib._encoding.base import ( +from .encoding import SingerWriter +from .encoding._base import SingerMessageType +from .encoding._simple import ( ActivateVersionMessage, Message, RecordMessage, SchemaMessage, - SingerMessageType, StateMessage, exclude_null_dict, ) diff --git a/singer_sdk/io_base.py b/singer_sdk/io_base.py index ddc933eb8..f9041beea 100644 --- a/singer_sdk/io_base.py +++ b/singer_sdk/io_base.py @@ -2,4 +2,4 @@ from __future__ import annotations -from singer_sdk._singerlib._encoding import * # noqa: F403 +from singer_sdk._singerlib.encoding import * # noqa: F403