Skip to content

Commit

Permalink
MongoDB: Apply special treatment to certain items
Browse files Browse the repository at this point in the history
Some should be stored as lists, some need to be ignored for now.
  • Loading branch information
amotl committed Sep 3, 2024
1 parent 59e9c1d commit c670f36
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
This means relevant column definitions will not be included in the SQL DDL.
- MongoDB: Make `ctk load table` use the `data OBJECT(DYNAMIC)` mapping strategy.
- MongoDB: Sanitize lists of varying objects
- MongoDB: Apply special treatment to items that should be stored as lists

## 2024/09/02 v0.0.21
- DynamoDB: Add special decoding for varied lists.
Expand Down
9 changes: 6 additions & 3 deletions cratedb_toolkit/io/mongodb/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,20 +99,23 @@ def start(self):
progress_bar = tqdm(total=records_in)
records_out = 0

for document in self.mongodb_collection.find().limit(self.mongodb_limit):
for document in self.mongodb_collection.find().skip(0).limit(self.mongodb_limit):
try:
operation = self.translator.to_sql(document)
logger.debug("SQL operation: %s", operation)
except Exception as ex:
logger_on_error(f"Transforming query failed: {ex}")

Check warning on line 107 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L106-L107

Added lines #L106 - L107 were not covered by tests
continue
# TODO: Make configurable.
# raise
try:
result = connection.execute(sa.text(operation.statement), operation.parameters)
result_size = result.rowcount
records_out += result_size
progress_bar.update(n=result_size)
except Exception as ex:
logger_on_error(f"Executing query failed: {ex}")
logger_on_error(f"Executing query failed: {ex}\nOperation:\n{operation}")

Check warning on line 116 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L115-L116

Added lines #L115 - L116 were not covered by tests
# TODO: Make configurable.
# raise

progress_bar.close()
connection.commit()
Expand Down
69 changes: 69 additions & 0 deletions cratedb_toolkit/io/mongodb/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,40 @@ def timestamp_converter(value):
"undefined": lambda x: None,
}

# TODO: Make configurable, for example by using a Zyp transformation definition.

# Names of items to ignore for now. The selection includes anomalies
# which have not been resolved yet.
special_treatment_ignore_items = [
    # Lists
    "carrier_orders",
    "contacts",
    "equipment",
    "flex_attributes",
    # Maps
    "status",
    "customer",
    "customers",
    "customer_orders",
    "customer_orders[]",
    "sharing_meta_data",
    "ship_locations",
    "sort_attributes",
    "start_date",
    "end_date",
]

# Names of items which are fine once `to_list()` has been applied to them.
special_treatment_to_list = ["external_ids", "flags", "groupName", "lane", "po_number"]

# Names of items which are subject to the "to string" treatment.
special_treatment_to_string = ["custom_id"]


def extract_value(value, parent_type=None):
"""
Expand All @@ -68,6 +102,8 @@ def extract_value(value, parent_type=None):
- https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/
"""
if isinstance(value, dict):
# Custom adjustments to compensate shape errors in source data.
apply_special_treatments(value)
if len(value) == 1:
if "$binary" in value and value["$binary"]["subType"] in ["03", "04"]:
decoded = str(UUID(bytes=base64.b64decode(value["$binary"]["base64"])))
Expand All @@ -89,6 +125,39 @@ def extract_value(value, parent_type=None):
return value


def apply_special_treatments(value):
    """
    Apply hard-coded adjustments to a dictionary, compensating for shape errors in source data.

    The adjustments are driven by the module-level lists `special_treatment_ignore_items`,
    `special_treatment_to_list`, and `special_treatment_to_string`, followed by two manual
    rules for the `users` and `createdBy` items.

    :param value: The dictionary to adjust. It is modified in place; nothing is returned.

    TODO: Needs an integration test feeding two records instead of just one.
    """

    # Ignore certain items including anomalies that are not resolved, yet.
    for ignore_name in special_treatment_ignore_items:
        value.pop(ignore_name, None)

    # Converge certain items to `list` even when defined differently.
    for to_list_name in special_treatment_to_list:
        if to_list_name in value and not isinstance(value[to_list_name], list):
            value[to_list_name] = [value[to_list_name]]

    # Converge certain items to `str` even when defined differently.
    for name in special_treatment_to_string:
        if name in value and not isinstance(value[name], str):
            value[name] = str(value[name])

    # Manual treatment.
    # Some nested objects have been defined as strings, probably in previous schema versions.
    users = value.get("users")
    # Only walk real sequences: `None` or a scalar is not iterable, and would raise `TypeError`.
    if isinstance(users, (list, tuple)):
        for user_item in users:
            # Skip entries that are not objects. On a string, `"user" in user_item` is a
            # substring test, and the subsequent item access would raise `TypeError`.
            if not isinstance(user_item, dict):
                continue
            if "user" in user_item and not isinstance(user_item["user"], dict):
                user_item["user"] = {"id": user_item["user"]}

    if "createdBy" in value and not isinstance(value["createdBy"], dict):
        value["createdBy"] = {"id": value["createdBy"]}

Check warning on line 158 in cratedb_toolkit/io/mongodb/export.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/export.py#L158

Added line #L158 was not covered by tests


@define
class ListOfVaryingObjectsSanitizer:
"""
Expand Down

0 comments on commit c670f36

Please sign in to comment.