From 5f2aa1950f8a491b2f46e730859fc373329e839f Mon Sep 17 00:00:00 2001 From: Jesse Cotton Date: Tue, 6 Aug 2024 18:57:18 -0700 Subject: [PATCH] fix(targets): Make sink assertion compatible with stream maps (#2589) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Move sink exists assertion to be compatible with stream_maps * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Edgar Ramírez Mondragón <16805946+edgarrmondragon@users.noreply.github.com> --- singer_sdk/target_base.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 8ecdfa851..e600b6b51 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -334,7 +334,8 @@ def _process_record_message(self, message_dict: dict) -> None: self._assert_line_requires(message_dict, requires={"stream", "record"}) stream_name = message_dict["stream"] - self._assert_sink_exists(stream_name) + if stream_name not in self.mapper.stream_maps: + self._assert_sink_exists(stream_name) for stream_map in self.mapper.stream_maps[stream_name]: raw_record = copy.copy(message_dict["record"]) @@ -343,6 +344,7 @@ def _process_record_message(self, message_dict: dict) -> None: # Record was filtered out by the map transform continue + self._assert_sink_exists(stream_map.stream_alias) sink = self.get_sink(stream_map.stream_alias, record=transformed_record) context = sink._get_context(transformed_record) # noqa: SLF001 if sink.include_sdc_metadata_properties: