From 781bb049c4ec75916b908d424b42dd030f1a1780 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Thu, 27 Jul 2023 11:51:48 -0600 Subject: [PATCH] feat: Add `_sdc_sync_started_at` metadata column to indicate the start of the target process --- singer_sdk/plugin_base.py | 14 +++++++++++++- singer_sdk/sinks/core.py | 12 ++++++++---- singer_sdk/sinks/sql.py | 4 ++-- 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/singer_sdk/plugin_base.py b/singer_sdk/plugin_base.py index d81e8f7c3..511d8b7fc 100644 --- a/singer_sdk/plugin_base.py +++ b/singer_sdk/plugin_base.py @@ -24,7 +24,7 @@ from singer_sdk.helpers._classproperty import classproperty from singer_sdk.helpers._compat import metadata from singer_sdk.helpers._secrets import SecretString, is_common_secret_key -from singer_sdk.helpers._util import read_json_file +from singer_sdk.helpers._util import read_json_file, utc_now from singer_sdk.helpers.capabilities import ( FLATTENING_CONFIG, STREAM_MAPS_CONFIG, @@ -155,6 +155,9 @@ def __init__( metrics._setup_logging(self.config) self.metrics_logger = metrics.get_metrics_logger() + # Initialization timestamp + self.__initialized_at = int(utc_now().timestamp()) + def setup_mapper(self) -> None: """Initialize the plugin mapper for this tap.""" self._mapper = PluginMapper( @@ -185,6 +188,15 @@ def mapper(self, mapper: PluginMapper) -> None: """ self._mapper = mapper + @property + def initialized_at(self) -> int: + """Timestamp recorded when the plugin was initialized. + + Returns: + The value of ``int(utc_now().timestamp())`` taken in ``__init__``. + """ + return self.__initialized_at + @classproperty def capabilities(self) -> list[CapabilitiesEnum]: """Get capabilities.
diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index 9928aa6f2..19dfbc31a 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -32,7 +32,7 @@ if t.TYPE_CHECKING: from logging import Logger - from singer_sdk.plugin_base import PluginBase + from singer_sdk.target_base import Target JSONSchemaValidator = Draft7Validator @@ -48,7 +48,7 @@ class Sink(metaclass=abc.ABCMeta): def __init__( self, - target: PluginBase, + target: Target, stream_name: str, schema: dict, key_properties: list[str] | None, @@ -62,6 +62,7 @@ def __init__( key_properties: Primary key of the stream to sink. """ self.logger = target.logger + self.sync_started_at = target.initialized_at self._config = dict(target.config) self._pending_batch: dict | None = None self.stream_name = stream_name @@ -238,7 +239,7 @@ def _add_sdc_metadata_to_record( Args: record: Individual record in the stream. - message: TODO + message: The record message. context: Stream partition or context dictionary. """ record["_sdc_extracted_at"] = message.get("time_extracted") @@ -252,6 +253,7 @@ def _add_sdc_metadata_to_record( record["_sdc_deleted_at"] = record.get("_sdc_deleted_at") record["_sdc_sequence"] = int(round(time.time() * 1000)) record["_sdc_table_version"] = message.get("version") + record["_sdc_sync_started_at"] = self.sync_started_at def _add_sdc_metadata_to_schema(self) -> None: """Add _sdc metadata columns. 
@@ -270,7 +272,7 @@ def _add_sdc_metadata_to_schema(self) -> None: "type": ["null", "string"], "format": "date-time", } - for col in ("_sdc_sequence", "_sdc_table_version"): + for col in ("_sdc_sequence", "_sdc_table_version", "_sdc_sync_started_at"): properties_dict[col] = {"type": ["null", "integer"]} def _remove_sdc_metadata_from_schema(self) -> None: @@ -287,6 +289,7 @@ def _remove_sdc_metadata_from_schema(self) -> None: "_sdc_deleted_at", "_sdc_sequence", "_sdc_table_version", + "_sdc_sync_started_at", ): properties_dict.pop(col, None) @@ -305,6 +308,7 @@ def _remove_sdc_metadata_from_record(self, record: dict) -> None: record.pop("_sdc_deleted_at", None) record.pop("_sdc_sequence", None) record.pop("_sdc_table_version", None) + record.pop("_sdc_sync_started_at", None) # Record validation diff --git a/singer_sdk/sinks/sql.py b/singer_sdk/sinks/sql.py index 9dac99b1e..238e83dec 100644 --- a/singer_sdk/sinks/sql.py +++ b/singer_sdk/sinks/sql.py @@ -20,7 +20,7 @@ if t.TYPE_CHECKING: from sqlalchemy.sql import Executable - from singer_sdk.plugin_base import PluginBase + from singer_sdk.target_base import Target class SQLSink(BatchSink): @@ -32,7 +32,7 @@ class SQLSink(BatchSink): def __init__( self, - target: PluginBase, + target: Target, stream_name: str, schema: dict, key_properties: list[str] | None,