From 66f16aff02bf77d5868a65df92cb6f4b957805fb Mon Sep 17 00:00:00 2001 From: Matt Elgazar Date: Wed, 9 Aug 2023 10:33:44 +0900 Subject: [PATCH 01/10] exclude system.profile from collections --- tap_mongodb/connector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_mongodb/connector.py b/tap_mongodb/connector.py index 366192a..fb512df 100644 --- a/tap_mongodb/connector.py +++ b/tap_mongodb/connector.py @@ -99,7 +99,7 @@ def discover_catalog_entries(self) -> List[Dict[str, Any]]: The discovered catalog entries as a list. """ result: List[Dict] = [] - for collection in self.database.list_collection_names(authorizedCollections=True, nameOnly=True): + for collection in [i for i in self.database.list_collection_names(authorizedCollections=True, nameOnly=True) if i != 'system.profile']: try: self.database[collection].find_one() except PyMongoError: From f5b99224cfc98a84f47444aa3f50b319f878c7a6 Mon Sep 17 00:00:00 2001 From: melgazar9 Date: Thu, 5 Oct 2023 21:40:56 -0500 Subject: [PATCH 02/10] recursive inf replacement error handling over json dict --- pyproject.toml | 2 +- tap_mongodb/connector.py | 2 +- tap_mongodb/streams.py | 22 ++++++++++++++++++---- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 6395c88..00403c1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "tap-mongodb" -version = "2.3.3" +version = "2.3.4" description = "`tap-mongodb` is a Singer tap for MongoDB and AWS DocumentDB, built with the Meltano Singer SDK." readme = "README.md" authors = ["Matt Menzenski"] diff --git a/tap_mongodb/connector.py b/tap_mongodb/connector.py index fb512df..366192a 100644 --- a/tap_mongodb/connector.py +++ b/tap_mongodb/connector.py @@ -99,7 +99,7 @@ def discover_catalog_entries(self) -> List[Dict[str, Any]]: The discovered catalog entries as a list. """ result: List[Dict] = [] - for collection in [i for i in self.database.list_collection_names(authorizedCollections=True, nameOnly=True) if i != 'system.profile']: + for collection in self.database.list_collection_names(authorizedCollections=True, nameOnly=True): try: self.database[collection].find_one() except PyMongoError: diff --git a/tap_mongodb/streams.py b/tap_mongodb/streams.py index 2232920..f71bd17 100644 --- a/tap_mongodb/streams.py +++ b/tap_mongodb/streams.py @@ -4,7 +4,7 @@ from datetime import datetime from typing import Any, Generator, Iterable, Optional - +import math from bson.objectid import ObjectId from pendulum import DateTime from pymongo import ASCENDING @@ -22,7 +22,7 @@ REPLICATION_INCREMENTAL, REPLICATION_LOG_BASED, Stream, - TypeConformanceLevel, + TypeConformanceLevel ) from tap_mongodb.connector import MongoDBConnector @@ -30,6 +30,17 @@ DEFAULT_START_DATE: str = "1970-01-01" +def recursive_replace_inf_in_dict(dct): + for key, value in dct.items(): + if value in [-math.inf, math.inf]: + dct[key] = str(dct[key]) + elif isinstance(value, list): + for i, item in enumerate(value): + if isinstance(item, dict): + recursive_replace_inf_in_dict(item) + elif isinstance(value, dict): + recursive_replace_inf_in_dict(value) + return def to_object_id(replication_key_value: str) -> ObjectId: """Converts an ISO-8601 date string into a BSON ObjectId.""" @@ -163,7 +174,7 @@ def _generate_record_messages(self, record: dict) -> Generator[singer.RecordMess record=record, schema=self.schema, level=self.TYPE_CONFORMANCE_LEVEL, - logger=self.logger, + logger=self.logger ) for stream_map in self.stream_maps: mapped_record = stream_map.transform(record) @@ -173,7 +184,7 @@ def _generate_record_messages(self, record: dict) -> Generator[singer.RecordMess stream=stream_map.stream_alias, record=mapped_record, version=None, - time_extracted=extracted_at, + time_extracted=extracted_at ) yield record_message @@ -199,6 +210,9 @@ def get_records(self, context: dict | None) -> Iterable[dict]: for record in collection.find({"_id": {"$gt": start_date}}).sort([("_id", ASCENDING)]): object_id: ObjectId = record["_id"] incremental_id: IncrementalId = IncrementalId.from_object_id(object_id) + + recursive_replace_inf_in_dict(record) + parsed_record = { "replication_key": str(incremental_id), "object_id": str(object_id), From 9e06e00a32525664e1d6677f8f1320230649dafd Mon Sep 17 00:00:00 2001 From: melgazar9 Date: Thu, 5 Oct 2023 21:56:43 -0500 Subject: [PATCH 03/10] tested --- tap_mongodb/streams.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tap_mongodb/streams.py b/tap_mongodb/streams.py index f71bd17..f5e1fe3 100644 --- a/tap_mongodb/streams.py +++ b/tap_mongodb/streams.py @@ -47,7 +47,6 @@ def to_object_id(replication_key_value: str) -> ObjectId: incremental_id: IncrementalId = IncrementalId.from_string(replication_key_value) return incremental_id.object_id - class MongoDBCollectionStream(Stream): """Stream class for mongodb streams.""" @@ -97,6 +96,7 @@ def primary_keys(self) -> Optional[list[str]]: def primary_keys(self, new_value: list[str]) -> None: """Set primary keys for the stream.""" self._primary_keys = new_value + return @property def is_sorted(self) -> bool: @@ -106,6 +106,7 @@ def is_sorted(self) -> bool: string, and these are alphanumerically sortable. When the tap is running in log-based mode, it is not sorted - the replication key value here is a hex string.""" + return self.replication_method == REPLICATION_INCREMENTAL def _increment_stream_state(self, latest_record: dict[str, Any], *, context: dict | None = None) -> None: @@ -120,7 +121,9 @@ def _increment_stream_state(self, latest_record: dict[str, Any], *, context: dic Raises: ValueError: if configured replication method is unsupported, or if replication key is absent + """ + # This also creates a state entry if one does not yet exist: state_dict = self.get_context_state(context) @@ -193,9 +196,7 @@ def get_records(self, context: dict | None) -> Iterable[dict]: """Return a generator of record-type dictionary objects.""" # pylint: disable=too-many-locals,too-many-branches,too-many-statements bookmark: str = self.get_starting_replication_key_value(context) - should_add_metadata: bool = self.config.get("add_record_metadata", False) - collection: Collection = self._connector.database[self._collection_name] if self.replication_method == REPLICATION_INCREMENTAL: @@ -268,6 +269,7 @@ def get_records(self, context: dict | None) -> Iterable[dict]: else: self.logger.critical(f"operation_failure on collection.watch: {operation_failure}") raise operation_failure + except Exception as exception: self.logger.critical(exception) raise exception From 46d971d193d9ad4c547c015d4f7f475f5267d1b1 Mon Sep 17 00:00:00 2001 From: melgazar9 Date: Fri, 6 Oct 2023 11:26:16 -0500 Subject: [PATCH 04/10] add case when list value is inf --- tap_mongodb/streams.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tap_mongodb/streams.py b/tap_mongodb/streams.py index f5e1fe3..896c962 100644 --- a/tap_mongodb/streams.py +++ b/tap_mongodb/streams.py @@ -38,6 +38,8 @@ def recursive_replace_inf_in_dict(dct): for i, item in enumerate(value): if isinstance(item, dict): recursive_replace_inf_in_dict(item) + elif item in [-math.inf, math.inf]: + value[i] = str(item) elif isinstance(value, dict): recursive_replace_inf_in_dict(value) return From 842a58a2f5a1da849db1f6fa0447a107a6e8763d Mon Sep 17 00:00:00 2001 From: melgazar9 Date: Tue, 10 Oct 2023 19:46:29 -0500 Subject: [PATCH 05/10] changing inf and nan to work with json dumps --- tap_mongodb/streams.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tap_mongodb/streams.py b/tap_mongodb/streams.py index 896c962..71c5de3 100644 --- a/tap_mongodb/streams.py +++ b/tap_mongodb/streams.py @@ -32,14 +32,14 @@ def recursive_replace_inf_in_dict(dct): for key, value in dct.items(): - if value in [-math.inf, math.inf]: - dct[key] = str(dct[key]) + if value in [-math.inf, math.inf, math.nan]: + dct[key] = None elif isinstance(value, list): for i, item in enumerate(value): if isinstance(item, dict): recursive_replace_inf_in_dict(item) - elif item in [-math.inf, math.inf]: - value[i] = str(item) + elif item in [-math.inf, math.inf, math.nan]: + value[i] = None elif isinstance(value, dict): recursive_replace_inf_in_dict(value) return From 46e612c2d655215379a2ecddd0bbf04897de58ed Mon Sep 17 00:00:00 2001 From: melgazar9 Date: Tue, 10 Oct 2023 20:08:58 -0500 Subject: [PATCH 06/10] update version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 00403c1..ce59a4d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "tap-mongodb" -version = "2.3.4" +version = "2.3.5" description = "`tap-mongodb` is a Singer tap for MongoDB and AWS DocumentDB, built with the Meltano Singer SDK." readme = "README.md" authors = ["Matt Menzenski"] From ae48025525c3bb1e3767394e5085f1a7a5b332af Mon Sep 17 00:00:00 2001 From: melgazar9 Date: Tue, 10 Oct 2023 20:12:00 -0500 Subject: [PATCH 07/10] rename fn --- tap_mongodb/streams.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tap_mongodb/streams.py b/tap_mongodb/streams.py index 71c5de3..5aa62cf 100644 --- a/tap_mongodb/streams.py +++ b/tap_mongodb/streams.py @@ -30,18 +30,18 @@ DEFAULT_START_DATE: str = "1970-01-01" -def recursive_replace_inf_in_dict(dct): +def recursive_replace_empty_in_dict(dct): for key, value in dct.items(): if value in [-math.inf, math.inf, math.nan]: dct[key] = None elif isinstance(value, list): for i, item in enumerate(value): if isinstance(item, dict): - recursive_replace_inf_in_dict(item) + recursive_replace_empty_in_dict(item) elif item in [-math.inf, math.inf, math.nan]: value[i] = None elif isinstance(value, dict): - recursive_replace_inf_in_dict(value) + recursive_replace_empty_in_dict(value) return def to_object_id(replication_key_value: str) -> ObjectId: @@ -214,7 +214,7 @@ def get_records(self, context: dict | None) -> Iterable[dict]: object_id: ObjectId = record["_id"] incremental_id: IncrementalId = IncrementalId.from_object_id(object_id) - recursive_replace_inf_in_dict(record) + recursive_replace_empty_in_dict(record) parsed_record = { "replication_key": str(incremental_id), From 6499a68e40f12591ba81e15a1b88687f48072ede Mon Sep 17 00:00:00 2001 From: melgazar9 Date: Thu, 19 Oct 2023 12:43:32 -0500 Subject: [PATCH 08/10] pass checks --- tap_mongodb/streams.py | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/tap_mongodb/streams.py b/tap_mongodb/streams.py index 5aa62cf..f7cb6de 100644 --- a/tap_mongodb/streams.py +++ b/tap_mongodb/streams.py @@ -2,9 +2,10 @@ from __future__ import annotations +import math from datetime import datetime from typing import Any, Generator, Iterable, Optional -import math + from bson.objectid import ObjectId from pendulum import DateTime from pymongo import ASCENDING @@ -18,19 +19,19 @@ from singer_sdk.helpers._state import increment_state from singer_sdk.helpers._typing import conform_record_data_types from singer_sdk.helpers._util import utc_now -from singer_sdk.streams.core import ( - REPLICATION_INCREMENTAL, - REPLICATION_LOG_BASED, - Stream, - TypeConformanceLevel -) +from singer_sdk.streams.core import REPLICATION_INCREMENTAL, REPLICATION_LOG_BASED, Stream, TypeConformanceLevel from tap_mongodb.connector import MongoDBConnector from tap_mongodb.types import IncrementalId DEFAULT_START_DATE: str = "1970-01-01" + def recursive_replace_empty_in_dict(dct): + """ + Recursively replace empty values with None in a dictionary. + NaN, inf, and -inf are unable to be parsed by the json library, so these values will be replaced with None. + """ for key, value in dct.items(): if value in [-math.inf, math.inf, math.nan]: dct[key] = None @@ -42,13 +43,16 @@ def recursive_replace_empty_in_dict(dct): value[i] = None elif isinstance(value, dict): recursive_replace_empty_in_dict(value) - return + return dct + def to_object_id(replication_key_value: str) -> ObjectId: """Converts an ISO-8601 date string into a BSON ObjectId.""" incremental_id: IncrementalId = IncrementalId.from_string(replication_key_value) + return incremental_id.object_id + class MongoDBCollectionStream(Stream): """Stream class for mongodb streams.""" @@ -98,7 +102,7 @@ def primary_keys(self) -> Optional[list[str]]: def primary_keys(self, new_value: list[str]) -> None: """Set primary keys for the stream.""" self._primary_keys = new_value - return + return self @property def is_sorted(self) -> bool: @@ -158,6 +162,8 @@ def _increment_stream_state(self, latest_record: dict[str, Any], *, context: dic check_sorted=self.check_sorted, ) + return self + def _generate_record_messages(self, record: dict) -> Generator[singer.RecordMessage, None, None]: """Write out a RECORD message. @@ -179,19 +185,16 @@ def _generate_record_messages(self, record: dict) -> Generator[singer.RecordMess record=record, schema=self.schema, level=self.TYPE_CONFORMANCE_LEVEL, - logger=self.logger + logger=self.logger, ) + for stream_map in self.stream_maps: mapped_record = stream_map.transform(record) # Emit record if not filtered if mapped_record is not None: record_message = singer.RecordMessage( - stream=stream_map.stream_alias, - record=mapped_record, - version=None, - time_extracted=extracted_at + stream=stream_map.stream_alias, record=mapped_record, version=None, time_extracted=extracted_at ) - yield record_message def get_records(self, context: dict | None) -> Iterable[dict]: From 80223fa0df8658803f6dd8430dbf9edc38efc31b Mon Sep 17 00:00:00 2001 From: melgazar9 Date: Fri, 20 Oct 2023 10:46:37 -0500 Subject: [PATCH 09/10] add plugins --- plugins/loaders/target-jsonl--andyh1203.lock | 34 ++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 plugins/loaders/target-jsonl--andyh1203.lock diff --git a/plugins/loaders/target-jsonl--andyh1203.lock b/plugins/loaders/target-jsonl--andyh1203.lock new file mode 100644 index 0000000..5825fc4 --- /dev/null +++ b/plugins/loaders/target-jsonl--andyh1203.lock @@ -0,0 +1,34 @@ +{ + "plugin_type": "loaders", + "name": "target-jsonl", + "namespace": "target_jsonl", + "variant": "andyh1203", + "label": "JSON Lines (JSONL)", + "docs": "https://hub.meltano.com/loaders/target-jsonl--andyh1203", + "repo": "https://github.com/andyh1203/target-jsonl", + "pip_url": "target-jsonl", + "description": "JSONL loader", + "logo_url": "https://hub.meltano.com/assets/logos/loaders/jsonl.png", + "settings": [ + { + "name": "destination_path", + "kind": "string", + "value": "output", + "label": "Destination Path", + "description": "Sets the destination path the JSONL files are written to, relative\nto the project root.\n\nThe directory needs to exist already, it will not be created\nautomatically.\n\nTo write JSONL files to the project root, set an empty string (`\"\"`).\n" + }, + { + "name": "do_timestamp_file", + "kind": "boolean", + "value": false, + "label": "Include Timestamp in File Names", + "description": "Specifies if the files should get timestamped.\n\nBy default, the resulting file will not have a timestamp in the file name (i.e. `exchange_rate.jsonl`).\n\nIf this option gets set to `true`, the resulting file will have a timestamp associated with it (i.e. `exchange_rate-{timestamp}.jsonl`).\n" + }, + { + "name": "custom_name", + "kind": "string", + "label": "Custom File Name Override", + "description": "Specifies a custom name for the filename, instead of the stream name.\n\nThe file name will be `{custom_name}-{timestamp}.jsonl`, if `do_timestamp_file` is `true`.\nOtherwise the file name will be `{custom_name}.jsonl`.\n\nIf custom name is not provided, the stream name will be used.\n" + } + ] +} \ No newline at end of file From cfb051f1fe41e3c13091bd5ad4593557f24b128e Mon Sep 17 00:00:00 2001 From: melgazar9 Date: Fri, 20 Oct 2023 10:47:56 -0500 Subject: [PATCH 10/10] EOL fix --- plugins/loaders/target-jsonl--andyh1203.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/loaders/target-jsonl--andyh1203.lock b/plugins/loaders/target-jsonl--andyh1203.lock index 5825fc4..11fa0ba 100644 --- a/plugins/loaders/target-jsonl--andyh1203.lock +++ b/plugins/loaders/target-jsonl--andyh1203.lock @@ -31,4 +31,4 @@ "description": "Specifies a custom name for the filename, instead of the stream name.\n\nThe file name will be `{custom_name}-{timestamp}.jsonl`, if `do_timestamp_file` is `true`.\nOtherwise the file name will be `{custom_name}.jsonl`.\n\nIf custom name is not provided, the stream name will be used.\n" } ] -} \ No newline at end of file +}