Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DynamoDB: Improve to_sql() to accept list of records #43

Merged
merged 1 commit into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
`quote_relation_name` from `sqlalchemy-cratedb` package.
- DynamoDB: Add special decoding for varied lists, storing them into a separate
`OBJECT(IGNORED)` column in CrateDB
- DynamoDB: Improve `to_sql()` to accept list of records

## 2024/08/27 v0.0.13
- DMS/DynamoDB: Use parameterized SQL WHERE clauses instead of inlining values
Expand Down
3 changes: 3 additions & 0 deletions src/commons_codec/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,6 @@ class DualRecord:

typed: t.Dict[str, t.Any]
untyped: t.Dict[str, t.Any]

def to_dict(self):
return {"typed": self.typed, "untyped": self.untyped}
12 changes: 8 additions & 4 deletions src/commons_codec/transform/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
# Inhibit Rounded Exceptions
DYNAMODB_CONTEXT.traps[decimal.Rounded] = False

RecordType = t.Dict[str, t.Any]


class CrateDBTypeDeserializer(TypeDeserializer):
def _deserialize_n(self, value):
Expand Down Expand Up @@ -132,13 +134,15 @@ def decode_record(self, item: t.Dict[str, t.Any]) -> DualRecord:


class DynamoDBFullLoadTranslator(DynamoTranslatorBase):
def to_sql(self, record: t.Dict[str, t.Any]) -> SQLOperation:
def to_sql(self, data: t.Union[RecordType, t.List[RecordType]]) -> SQLOperation:
"""
Produce INSERT|UPDATE|DELETE SQL statement from INSERT|MODIFY|REMOVE CDC event record.
Produce INSERT SQL operations (SQL statement and parameters) from DynamoDB record(s).
"""
dual_record = self.decode_record(record)
sql = f"INSERT INTO {self.table_name} ({self.TYPED_COLUMN}, {self.UNTYPED_COLUMN}) VALUES (:typed, :untyped);"
return SQLOperation(sql, {"typed": dual_record.typed, "untyped": dual_record.untyped})
if not isinstance(data, list):
data = [data]
parameters = [self.decode_record(record).to_dict() for record in data]
return SQLOperation(sql, parameters)


class DynamoDBCDCTranslator(DynamoTranslatorBase):
Expand Down
12 changes: 7 additions & 5 deletions tests/transform/test_dynamodb_full.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,12 @@ def test_to_sql_operation():
"""
assert DynamoDBFullLoadTranslator(table_name="foo").to_sql(RECORD_IN) == SQLOperation(
statement="INSERT INTO foo (data, aux) VALUES (:typed, :untyped);",
parameters={
"typed": RECORD_OUT_DATA,
"untyped": RECORD_OUT_AUX,
},
parameters=[
{
"typed": RECORD_OUT_DATA,
"untyped": RECORD_OUT_AUX,
}
],
)


Expand All @@ -103,7 +105,7 @@ def test_to_sql_cratedb(caplog, cratedb):

# Compute CrateDB operation (SQL+parameters) from DynamoDB record.
translator = DynamoDBFullLoadTranslator(table_name="from.dynamodb")
operation = translator.to_sql(record=RECORD_IN)
operation = translator.to_sql(RECORD_IN)

# Insert into CrateDB.
cratedb.database.run_sql(translator.sql_ddl)
Expand Down