From 64f61f65200fcbb03f571311981115e41c53935e Mon Sep 17 00:00:00 2001 From: Douwe Maan Date: Mon, 17 Apr 2023 18:33:04 -0600 Subject: [PATCH 1/6] Don't rebuild stream_maps[stream_name] if stream is already known --- singer_sdk/mapper.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/singer_sdk/mapper.py b/singer_sdk/mapper.py index 031ca0c82..f9bf3017b 100644 --- a/singer_sdk/mapper.py +++ b/singer_sdk/mapper.py @@ -664,11 +664,14 @@ def register_raw_stream_schema( # noqa: PLR0912, C901 if stream_name in self.stream_maps: primary_mapper = self.stream_maps[stream_name][0] if ( - primary_mapper.raw_schema != schema - or primary_mapper.raw_key_properties != key_properties + isinstance(primary_mapper, self.default_mapper_type) + and primary_mapper.raw_schema == schema + and primary_mapper.raw_key_properties == key_properties ): - # Unload/reset stream maps if schema or key properties have changed. - self.stream_maps.pop(stream_name) + return + + # Unload/reset stream maps if schema or key properties have changed. + self.stream_maps.pop(stream_name) if stream_name not in self.stream_maps: # The 0th mapper should be the same-named treatment. 
From f1f960e86c711b843c288c621e4046d1da51cf8c Mon Sep 17 00:00:00 2001 From: Douwe Maan Date: Tue, 18 Apr 2023 21:23:24 -0600 Subject: [PATCH 2/6] Don't emit SCHEMA messages for skipped streams --- samples/sample_mapper/mapper.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/samples/sample_mapper/mapper.py b/samples/sample_mapper/mapper.py index e079bac08..4d2e4c36e 100644 --- a/samples/sample_mapper/mapper.py +++ b/samples/sample_mapper/mapper.py @@ -7,7 +7,7 @@ import singer_sdk._singerlib as singer import singer_sdk.typing as th from singer_sdk.helpers._util import utc_now -from singer_sdk.mapper import PluginMapper +from singer_sdk.mapper import PluginMapper, RemoveRecordTransform from singer_sdk.mapper_base import InlineMapper if t.TYPE_CHECKING: @@ -90,6 +90,10 @@ def map_schema_message( message_dict.get("key_properties", []), ) for stream_map in self.mapper.stream_maps[stream_id]: + if isinstance(stream_map, RemoveRecordTransform): + # Don't emit schema if the stream's records are all ignored. 
+ continue + schema_message = singer.SchemaMessage( stream_map.stream_alias, stream_map.transformed_schema, From 09ea77d5edad5c9197a106e7beb2dd7f37f78077 Mon Sep 17 00:00:00 2001 From: Douwe Maan Date: Mon, 31 Jul 2023 23:52:40 +0000 Subject: [PATCH 3/6] Add support for glob patterns in source stream names --- singer_sdk/mapper.py | 82 +++++++++++++++++++++------------------ tests/core/test_mapper.py | 80 +++++++++++++++++++++++++++++++++++++- 2 files changed, 123 insertions(+), 39 deletions(-) diff --git a/singer_sdk/mapper.py b/singer_sdk/mapper.py index f9bf3017b..e608d8175 100644 --- a/singer_sdk/mapper.py +++ b/singer_sdk/mapper.py @@ -11,6 +11,7 @@ import hashlib import logging import typing as t +import fnmatch from singer_sdk.exceptions import MapExpressionError, StreamMapConfigError from singer_sdk.helpers import _simpleeval as simpleeval @@ -691,59 +692,64 @@ def register_raw_stream_schema( # noqa: PLR0912, C901 if isinstance(stream_map_val, dict) else stream_map_val ) - stream_alias: str = stream_map_key source_stream: str = stream_map_key - if isinstance(stream_def, str) and stream_def != NULL_STRING: - if stream_name == stream_map_key: - # TODO: Add any expected cases for str expressions (currently none) - pass - - msg = f"Option '{stream_map_key}:{stream_def}' is not expected." 
- raise StreamMapConfigError(msg) + stream_alias: str = stream_map_key - if stream_def is None or stream_def == NULL_STRING: - if stream_name != stream_map_key: - continue + is_source_stream_primary = True + if isinstance(stream_def, dict): + if MAPPER_SOURCE_OPTION in stream_def: + # <alias>: __source__: <source> + source_stream = stream_def.pop(MAPPER_SOURCE_OPTION) + is_source_stream_primary = False + elif MAPPER_ALIAS_OPTION in stream_def: + # <source>: __alias__: <alias> + stream_alias = stream_def.pop(MAPPER_ALIAS_OPTION) + + if stream_name == source_stream: + # Exact match + pass + elif fnmatch.fnmatch(stream_name, source_stream): + # Wildcard match + if stream_alias == source_stream: + stream_alias = stream_name + source_stream = stream_name + else: + continue - self.stream_maps[stream_map_key][0] = RemoveRecordTransform( - stream_alias=stream_map_key, + if isinstance(stream_def, dict): + mapper = CustomStreamMap( + stream_alias=stream_alias, + map_transform=stream_def, + map_config=self.map_config, + raw_schema=schema, + key_properties=key_properties, + flattening_options=self.flattening_options, + ) + elif stream_def is None or (isinstance(stream_def, str) and stream_def == NULL_STRING): + mapper = RemoveRecordTransform( + stream_alias=stream_alias, raw_schema=schema, key_properties=None, flattening_options=self.flattening_options, ) - logging.info("Set null tansform as default for '%s'", stream_name) - continue + logging.info(f"Set null transform as default for '{stream_name}'") + + elif isinstance(stream_def, str): + # Non-NULL string values are not currently supported + msg = f"Option '{stream_map_key}:{stream_def}' is not expected." 
) raise StreamMapConfigError(msg) - if MAPPER_SOURCE_OPTION in stream_def: - source_stream = stream_def.pop(MAPPER_SOURCE_OPTION) - - if source_stream != stream_name: - # Not a match - continue - - if MAPPER_ALIAS_OPTION in stream_def: - stream_alias = stream_def.pop(MAPPER_ALIAS_OPTION) - - mapper = CustomStreamMap( - stream_alias=stream_alias, - map_transform=stream_def, - map_config=self.map_config, - raw_schema=schema, - key_properties=key_properties, - flattening_options=self.flattening_options, - ) - - if source_stream == stream_map_key: + if is_source_stream_primary: # Zero-th mapper should be the same-keyed mapper. # Override the default mapper with this custom map. - self.stream_maps[stream_name][0] = mapper + self.stream_maps[source_stream][0] = mapper else: # Additional mappers for aliasing and multi-projection: - self.stream_maps[stream_name].append(mapper) + self.stream_maps[source_stream].append(mapper) diff --git a/tests/core/test_mapper.py b/tests/core/test_mapper.py index 036d7586a..32e300c23 100644 --- a/tests/core/test_mapper.py +++ b/tests/core/test_mapper.py @@ -46,12 +46,13 @@ def sample_catalog_dict() -> dict: Property("name", StringType), Property("owner_email", StringType), Property("description", StringType), - Property("description", StringType), + Property("create_date", StringType), ).to_dict() foobars_schema = PropertiesList( Property("the", StringType), Property("brown", StringType), ).to_dict() + singular_schema = PropertiesList(Property("foo", StringType)).to_dict() return { "streams": [ { @@ -64,6 +65,11 @@ def sample_catalog_dict() -> dict: "tap_stream_id": "foobars", "schema": foobars_schema, }, + { + "stream": "singular", + "tap_stream_id": "singular", + "schema": singular_schema, + }, ], } @@ -106,6 +112,9 @@ def sample_stream(): {"the": "quick"}, {"brown": "fox"}, ], + "singular": [ + {"foo": "bar"}, + ], } @@ -181,6 +190,7 @@ def transformed_result(stream_map_config): {"the": "quick"}, {"brown": "fox"}, ], + "singular": 
[{"foo": "bar"}], # should be unchanged } @@ -200,6 +210,9 @@ def transformed_schemas(): Property("the", StringType), Property("brown", StringType), ).to_dict(), + "singular": PropertiesList( + Property("foo", StringType), + ).to_dict(), } @@ -231,6 +244,7 @@ def cloned_and_aliased_schemas(): Property("name", StringType), Property("owner_email", StringType), Property("description", StringType), + Property("create_date", StringType), ).to_dict() return { "repositories_aliased": properties, @@ -277,6 +291,51 @@ def filtered_schemas(): return {"repositories": PropertiesList(Property("name", StringType)).to_dict()} +# Wildcard + + +@pytest.fixture +def wildcard_stream_maps(): + return { + "*s": { + "db_name": "'database'", + }, + } + + +@pytest.fixture +def wildcard_result(sample_stream): + return { + "repositories": [ + {**record, "db_name": "database"} + for record in sample_stream["repositories"] + ], + "foobars": [ + {**record, "db_name": "database"} for record in sample_stream["foobars"] + ], + "singular": sample_stream["singular"], + } + + +@pytest.fixture +def wildcard_schemas(sample_catalog_dict): + return { + "repositories": PropertiesList( + Property("name", StringType), + Property("owner_email", StringType), + Property("description", StringType), + Property("create_date", StringType), + Property("db_name", StringType), + ).to_dict(), + "foobars": PropertiesList( + Property("the", StringType), + Property("brown", StringType), + Property("db_name", StringType), # added + ).to_dict(), + "singular": PropertiesList(Property("foo", StringType)).to_dict(), # unchanged + } + + def test_map_transforms( sample_stream, sample_catalog_obj, @@ -354,6 +413,25 @@ def test_filter_transforms_w_error( ) +def test_wildcard_transforms( + sample_stream, + sample_catalog_obj, + wildcard_stream_maps, + stream_map_config, + wildcard_result, + wildcard_schemas, +): + _test_transform( + "wildcard", + stream_maps=wildcard_stream_maps, + stream_map_config=stream_map_config, + 
expected_result=wildcard_result, + expected_schemas=wildcard_schemas, + sample_stream=sample_stream, + sample_catalog_obj=sample_catalog_obj, + ) + + def _test_transform( test_name: str, *, From 31773fc1fe8367165981f365de851acfb3af5f83 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 1 Aug 2023 00:22:10 +0000 Subject: [PATCH 4/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- singer_sdk/mapper.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/singer_sdk/mapper.py b/singer_sdk/mapper.py index e608d8175..9855d88e6 100644 --- a/singer_sdk/mapper.py +++ b/singer_sdk/mapper.py @@ -8,10 +8,10 @@ import abc import copy import datetime +import fnmatch import hashlib import logging import typing as t -import fnmatch from singer_sdk.exceptions import MapExpressionError, StreamMapConfigError from singer_sdk.helpers import _simpleeval as simpleeval @@ -725,7 +725,9 @@ def register_raw_stream_schema( # noqa: PLR0912, C901 key_properties=key_properties, flattening_options=self.flattening_options, ) - elif stream_def is None or (isinstance(stream_def, str) and stream_def == NULL_STRING): + elif stream_def is None or ( + isinstance(stream_def, str) and stream_def == NULL_STRING + ): mapper = RemoveRecordTransform( stream_alias=stream_alias, raw_schema=schema, From c1da49ad0082500f49ebfe21c0e578cd47d6c054 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Wed, 31 Jan 2024 17:02:30 -0600 Subject: [PATCH 5/6] Fix wildcard test --- tests/core/test_mapper.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tests/core/test_mapper.py b/tests/core/test_mapper.py index c14b78454..058099f3a 100644 --- a/tests/core/test_mapper.py +++ b/tests/core/test_mapper.py @@ -22,6 +22,7 @@ from singer_sdk.typing import ( ArrayType, BooleanType, + CustomType, IntegerType, NumberType, ObjectType, @@ 
-393,6 +394,7 @@ def wildcard_result(sample_stream): {**record, "db_name": "database"} for record in sample_stream["foobars"] ], "singular": sample_stream["singular"], + "nested_jellybean": sample_stream["nested_jellybean"], } @@ -412,6 +414,18 @@ def wildcard_schemas(): Property("db_name", StringType), # added ).to_dict(), "singular": PropertiesList(Property("foo", StringType)).to_dict(), # unchanged + "nested_jellybean": PropertiesList( # unchanged + Property("id", IntegerType), + Property( + "custom_fields", + ArrayType( + ObjectType( + Property("id", IntegerType), + Property("value", CustomType({})), + ), + ), + ), + ).to_dict(), } From c35f8a3141ab6a931cc845fb520208abfa4406d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= <16805946+edgarrmondragon@users.noreply.github.com> Date: Wed, 7 Feb 2024 21:47:46 -0600 Subject: [PATCH 6/6] Fix types --- singer_sdk/mapper.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/singer_sdk/mapper.py b/singer_sdk/mapper.py index 520ca6a7a..bc6224d7f 100644 --- a/singer_sdk/mapper.py +++ b/singer_sdk/mapper.py @@ -766,6 +766,8 @@ def register_raw_stream_schema( # noqa: PLR0912, C901 else: continue + mapper: CustomStreamMap | RemoveRecordTransform + if isinstance(stream_def, dict): mapper = CustomStreamMap( stream_alias=stream_alias,