From c670f369ffe919ac456606a226b15e6f97daeb34 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 3 Sep 2024 13:18:22 +0200 Subject: [PATCH] MongoDB: Apply special treatment to certain items Some should be stored as lists, some need to be ignored for now. --- CHANGES.md | 1 + cratedb_toolkit/io/mongodb/copy.py | 9 ++-- cratedb_toolkit/io/mongodb/export.py | 69 ++++++++++++++++++++++++++++ 3 files changed, 76 insertions(+), 3 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 5c43831..78640cc 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -12,6 +12,7 @@ This means relevant column definitions will not be included into the SQL DDL. - MongoDB: Make `ctk load table` use the `data OBJECT(DYNAMIC)` mapping strategy. - MongoDB: Sanitize lists of varying objects +- MongoDB: Apply special treatment to items that should be stored as lists ## 2024/09/02 v0.0.21 - DynamoDB: Add special decoding for varied lists. diff --git a/cratedb_toolkit/io/mongodb/copy.py b/cratedb_toolkit/io/mongodb/copy.py index a735a1b..853bbe1 100644 --- a/cratedb_toolkit/io/mongodb/copy.py +++ b/cratedb_toolkit/io/mongodb/copy.py @@ -99,20 +99,23 @@ def start(self): progress_bar = tqdm(total=records_in) records_out = 0 - for document in self.mongodb_collection.find().limit(self.mongodb_limit): + for document in self.mongodb_collection.find().skip(0).limit(self.mongodb_limit): try: operation = self.translator.to_sql(document) logger.debug("SQL operation: %s", operation) except Exception as ex: logger_on_error(f"Transforming query failed: {ex}") - continue + # TODO: Make configurable. + # raise try: result = connection.execute(sa.text(operation.statement), operation.parameters) result_size = result.rowcount records_out += result_size progress_bar.update(n=result_size) except Exception as ex: - logger_on_error(f"Executing query failed: {ex}") + logger_on_error(f"Executing query failed: {ex}\nOperation:\n{operation}") + # TODO: Make configurable. + # raise progress_bar.close() connection.commit() diff --git a/cratedb_toolkit/io/mongodb/export.py b/cratedb_toolkit/io/mongodb/export.py index 2e5d711..4119d69 100644 --- a/cratedb_toolkit/io/mongodb/export.py +++ b/cratedb_toolkit/io/mongodb/export.py @@ -59,6 +59,40 @@ def timestamp_converter(value): "undefined": lambda x: None, } +# TODO: Make configurable, for example by using a Zyp transformation definition. +# Ignored items include anomalies not resolved yet. +special_treatment_ignore_items = [ + # Lists + "carrier_orders", + "contacts", + "equipment", + "flex_attributes", + # Maps + "status", + "customer", + "customers", + "customer_orders", + "customer_orders[]", + "sharing_meta_data", + "ship_locations", + "sort_attributes", + "start_date", + "end_date", +] +# New + +# Those items are fine after applying `to_list()`. +special_treatment_to_list = [ + "external_ids", + "flags", + "groupName", + "lane", + "po_number", +] +special_treatment_to_string = [ + "custom_id", +] + def extract_value(value, parent_type=None): """ @@ -68,6 +102,8 @@ def extract_value(value, parent_type=None): - https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/ """ if isinstance(value, dict): + # Custom adjustments to compensate shape errors in source data. + apply_special_treatments(value) if len(value) == 1: if "$binary" in value and value["$binary"]["subType"] in ["03", "04"]: decoded = str(UUID(bytes=base64.b64decode(value["$binary"]["base64"]))) @@ -89,6 +125,39 @@ def extract_value(value, parent_type=None): return value +def apply_special_treatments(value): + """ + Apply special treatments to value that can't be described otherwise up until now. + + TODO: Needs an integration test feeding two records instead of just one. + """ + + # Ignore certain items including anomalies that are not resolved, yet. + for ignore_name in special_treatment_ignore_items: + if ignore_name in value: + del value[ignore_name] + + # Converge certain items to `list` even when defined differently. + for to_list_name in special_treatment_to_list: + if to_list_name in value and not isinstance(value[to_list_name], list): + value[to_list_name] = [value[to_list_name]] + + # Converge certain items to `str` even when defined differently. + for name in special_treatment_to_string: + if name in value and not isinstance(value[name], str): + value[name] = str(value[name]) + + # Manual treatment. + # Some nested objects have been defined as strings, probably in previous schema versions. + if "users" in value: + for user_item in value["users"]: + if "user" in user_item and not isinstance(user_item["user"], dict): + user_item["user"] = {"id": user_item["user"]} + + if "createdBy" in value and not isinstance(value["createdBy"], dict): + value["createdBy"] = {"id": value["createdBy"]} + + @define class ListOfVaryingObjectsSanitizer: """