Skip to content

Commit

Permalink
feat: Add _sdc_sync_started_at metadata column to indicate the star…
Browse files Browse the repository at this point in the history
…t of the target process
  • Loading branch information
edgarrmondragon committed Jul 27, 2023
1 parent b0a8621 commit 781bb04
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 7 deletions.
14 changes: 13 additions & 1 deletion singer_sdk/plugin_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from singer_sdk.helpers._classproperty import classproperty
from singer_sdk.helpers._compat import metadata
from singer_sdk.helpers._secrets import SecretString, is_common_secret_key
from singer_sdk.helpers._util import read_json_file
from singer_sdk.helpers._util import read_json_file, utc_now
from singer_sdk.helpers.capabilities import (
FLATTENING_CONFIG,
STREAM_MAPS_CONFIG,
Expand Down Expand Up @@ -155,6 +155,9 @@ def __init__(
metrics._setup_logging(self.config)
self.metrics_logger = metrics.get_metrics_logger()

# Initialization timestamp
self.__initialized_at = int(utc_now().timestamp())

def setup_mapper(self) -> None:
"""Initialize the plugin mapper for this tap."""
self._mapper = PluginMapper(
Expand Down Expand Up @@ -185,6 +188,15 @@ def mapper(self, mapper: PluginMapper) -> None:
"""
self._mapper = mapper

@property
def initialized_at(self) -> int:
"""Start time of the plugin.
Returns:
The start time of the plugin.
"""
return self.__initialized_at

@classproperty
def capabilities(self) -> list[CapabilitiesEnum]:
"""Get capabilities.
Expand Down
12 changes: 8 additions & 4 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
if t.TYPE_CHECKING:
from logging import Logger

from singer_sdk.plugin_base import PluginBase
from singer_sdk.target_base import Target

JSONSchemaValidator = Draft7Validator

Expand All @@ -48,7 +48,7 @@ class Sink(metaclass=abc.ABCMeta):

def __init__(
self,
target: PluginBase,
target: Target,
stream_name: str,
schema: dict,
key_properties: list[str] | None,
Expand All @@ -62,6 +62,7 @@ def __init__(
key_properties: Primary key of the stream to sink.
"""
self.logger = target.logger
self.sync_started_at = target.initialized_at
self._config = dict(target.config)
self._pending_batch: dict | None = None
self.stream_name = stream_name
Expand Down Expand Up @@ -238,7 +239,7 @@ def _add_sdc_metadata_to_record(
Args:
record: Individual record in the stream.
message: TODO
message: The record message.
context: Stream partition or context dictionary.
"""
record["_sdc_extracted_at"] = message.get("time_extracted")
Expand All @@ -252,6 +253,7 @@ def _add_sdc_metadata_to_record(
record["_sdc_deleted_at"] = record.get("_sdc_deleted_at")
record["_sdc_sequence"] = int(round(time.time() * 1000))
record["_sdc_table_version"] = message.get("version")
record["_sdc_sync_started_at"] = self.sync_started_at

Check warning on line 256 in singer_sdk/sinks/core.py

View check run for this annotation

Codecov / codecov/patch

singer_sdk/sinks/core.py#L256

Added line #L256 was not covered by tests

def _add_sdc_metadata_to_schema(self) -> None:
"""Add _sdc metadata columns.
Expand All @@ -270,7 +272,7 @@ def _add_sdc_metadata_to_schema(self) -> None:
"type": ["null", "string"],
"format": "date-time",
}
for col in ("_sdc_sequence", "_sdc_table_version"):
for col in ("_sdc_sequence", "_sdc_table_version", "_sdc_sync_started_at"):
properties_dict[col] = {"type": ["null", "integer"]}

def _remove_sdc_metadata_from_schema(self) -> None:
Expand All @@ -287,6 +289,7 @@ def _remove_sdc_metadata_from_schema(self) -> None:
"_sdc_deleted_at",
"_sdc_sequence",
"_sdc_table_version",
"_sdc_sync_started_at",
):
properties_dict.pop(col, None)

Expand All @@ -305,6 +308,7 @@ def _remove_sdc_metadata_from_record(self, record: dict) -> None:
record.pop("_sdc_deleted_at", None)
record.pop("_sdc_sequence", None)
record.pop("_sdc_table_version", None)
record.pop("_sdc_sync_started_at", None)

# Record validation

Expand Down
4 changes: 2 additions & 2 deletions singer_sdk/sinks/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
if t.TYPE_CHECKING:
from sqlalchemy.sql import Executable

from singer_sdk.plugin_base import PluginBase
from singer_sdk.target_base import Target


class SQLSink(BatchSink):
Expand All @@ -32,7 +32,7 @@ class SQLSink(BatchSink):

def __init__(
self,
target: PluginBase,
target: Target,
stream_name: str,
schema: dict,
key_properties: list[str] | None,
Expand Down

0 comments on commit 781bb04

Please sign in to comment.