Skip to content

Commit

Permalink
added add_sqlsink and update get_sink to call it
Browse files Browse the repository at this point in the history
  • Loading branch information
BuzzCutNorman committed Jul 20, 2023
1 parent 4a84b47 commit e3d31e9
Showing 1 changed file with 86 additions and 1 deletion.
87 changes: 86 additions & 1 deletion singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

from singer_sdk.connectors import SQLConnector
from singer_sdk.mapper import PluginMapper
from singer_sdk.sinks import Sink
from singer_sdk.sinks import Sink, SQLSink

_MAX_PARALLELISM = 8

Expand Down Expand Up @@ -633,3 +633,88 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None:
super().append_builtin_config(config_jsonschema)

pass

def add_sqlsink(
self,
stream_name: str,
schema: dict,
key_properties: list[str] | None = None,
) -> SQLSink:
"""Create a sink and register it.
This method is internal to the SDK and should not need to be overridden.
Args:
stream_name: Name of the stream.
schema: Schema of the stream.
key_properties: Primary key of the stream.
Returns:
A new sink for the stream.
"""
self.logger.info("Initializing '%s' target sink...", self.name)
sink_class = self.default_sink_class
sink = sink_class(
target=self,
stream_name=stream_name,
schema=schema,
key_properties=key_properties,
connector=self.target_connector,
)
sink.setup()
self._sinks_active[stream_name] = sink
return sink

def get_sink(
self,
stream_name: str,
*,
record: dict | None = None,
schema: dict | None = None,
key_properties: list[str] | None = None,
) -> SQLSink:
"""Return a sink for the given stream name.
A new sink will be created if `schema` is provided and if either `schema` or
`key_properties` has changed. If so, the old sink becomes archived and held
until the next drain_all() operation.
Developers only need to override this method if they want to provide a different
sink depending on the values within the `record` object. Otherwise, please see
`default_sink_class` property and/or the `get_sink_class()` method.
Raises :class:`singer_sdk.exceptions.RecordsWithoutSchemaException` if sink does
not exist and schema is not sent.
Args:
stream_name: Name of the stream.
record: Record being processed.
schema: Stream schema.
key_properties: Primary key of the stream.
Returns:
The sink used for this target.
"""
_ = record # Custom implementations may use record in sink selection.
if schema is None:
self._assert_sink_exists(stream_name)
return self._sinks_active[stream_name]

existing_sink = self._sinks_active.get(stream_name, None)
if not existing_sink:
return self.add_sqlsink(stream_name, schema, key_properties)

if (
existing_sink.schema != schema
or existing_sink.key_properties != key_properties
):
self.logger.info(
"Schema or key properties for '%s' stream have changed. "
"Initializing a new '%s' sink...",
stream_name,
stream_name,
)
self._sinks_to_clear.append(self._sinks_active.pop(stream_name))
return self.add_sqlsink(stream_name, schema, key_properties)

return existing_sink

0 comments on commit e3d31e9

Please sign in to comment.