diff --git a/CHANGES.md b/CHANGES.md
index 6296a71..b8ff482 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,6 +1,8 @@
 # Changelog
 
 ## Unreleased
+- DynamoDB: Change the CrateDB data model to use `pk`, `data`, and `aux` columns.
+  Attention: This is a breaking change.
 
 ## 2024/09/26 v0.0.19
 - DynamoDB CDC: Fix `MODIFY` operation by propagating `NewImage` fully
diff --git a/src/commons_codec/model.py b/src/commons_codec/model.py
index 965553d..66265c1 100644
--- a/src/commons_codec/model.py
+++ b/src/commons_codec/model.py
@@ -131,14 +131,15 @@ def to_sql(self):
 
 
 @define
-class DualRecord:
+class UniversalRecord:
     """
-    Manage two halves of a record.
+    Manage a universal record, including its primary keys and the two halves of the record.
     One bucket stores the typed fields, the other stores the untyped ones.
     """
 
+    pk: t.Dict[str, t.Any]
     typed: t.Dict[str, t.Any]
    untyped: t.Dict[str, t.Any]
 
     def to_dict(self):
-        return {"typed": self.typed, "untyped": self.untyped}
+        return {"pk": self.pk, "typed": self.typed, "untyped": self.untyped}
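For orientation, a minimal sketch (not part of the patch) of the widened record type introduced above. `to_dict()` yields the three buckets which the translators below bind to the `:pk`, `:typed`, and `:untyped` SQL parameters; the values are illustrative.

from commons_codec.model import UniversalRecord

record = UniversalRecord(
    pk={"id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"},
    typed={"temperature": 42.42},
    untyped={"tags": [1, "foo"]},
)
print(record.to_dict())
# {'pk': {'id': '5F9E-...'}, 'typed': {'temperature': 42.42}, 'untyped': {'tags': [1, 'foo']}}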
diff --git a/src/commons_codec/transform/dynamodb.py b/src/commons_codec/transform/dynamodb.py
index c6d08f9..2814b78 100644
--- a/src/commons_codec/transform/dynamodb.py
+++ b/src/commons_codec/transform/dynamodb.py
@@ -8,10 +8,10 @@
 from sqlalchemy_cratedb.support import quote_relation_name
 
 from commons_codec.model import (
-    DualRecord,
     SQLOperation,
-    SQLParameterizedWhereClause,
+    UniversalRecord,
 )
+from commons_codec.transform.dynamodb_model import PrimaryKeySchema
 from commons_codec.util.data import TaggableList
 from commons_codec.vendor.boto3.dynamodb.types import DYNAMODB_CONTEXT, TypeDeserializer
@@ -73,6 +73,10 @@ class DynamoTranslatorBase:
     Translate DynamoDB records into a different representation.
     """
 
+    # Define name of the column where DynamoDB KeySchema fields will get materialized into.
+    # This column uses the `OBJECT(STRICT)` data type.
+    PK_COLUMN = "pk"
+
     # Define name of the column where typed DynamoDB fields will get materialized into.
     # This column uses the `OBJECT(DYNAMIC)` data type.
     TYPED_COLUMN = "data"
@@ -81,9 +85,10 @@ class DynamoTranslatorBase:
     # This column uses the `OBJECT(IGNORED)` data type.
     UNTYPED_COLUMN = "aux"
 
-    def __init__(self, table_name: str):
+    def __init__(self, table_name: str, primary_key_schema: t.Union[PrimaryKeySchema, None] = None):
         super().__init__()
         self.table_name = quote_relation_name(table_name)
+        self.primary_key_schema = primary_key_schema
         self.deserializer = CrateDBTypeDeserializer()
 
     @property
@@ -91,12 +96,16 @@ def sql_ddl(self):
         """`
         Define SQL DDL statement for creating table in CrateDB that stores re-materialized CDC events.
         """
+        if self.primary_key_schema is None:
+            raise IOError("Unable to generate SQL DDL without key schema information")
         return (
-            f"CREATE TABLE IF NOT EXISTS {self.table_name} "
-            f"({self.TYPED_COLUMN} OBJECT(DYNAMIC), {self.UNTYPED_COLUMN} OBJECT(IGNORED));"
+            f"CREATE TABLE IF NOT EXISTS {self.table_name} ("
+            f"{self.PK_COLUMN} OBJECT(STRICT) AS ({', '.join(self.primary_key_schema.to_sql_ddl_clauses())}), "
+            f"{self.TYPED_COLUMN} OBJECT(DYNAMIC), "
+            f"{self.UNTYPED_COLUMN} OBJECT(IGNORED));"
         )
 
-    def decode_record(self, item: t.Dict[str, t.Any]) -> DualRecord:
+    def decode_record(self, item: t.Dict[str, t.Any], key_names: t.Union[t.List[str], None] = None) -> UniversalRecord:
         """
         Deserialize DynamoDB JSON record into vanilla Python.
@@ -124,12 +133,20 @@ def decode_record(self, item: t.Dict[str, t.Any]) -> DualRecord:
         -- https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypeDescriptors
         """
         record = toolz.valmap(self.deserializer.deserialize, item)
+
+        pk = {}
         untyped = {}
+        pk_names = key_names or []
+        if not pk_names and self.primary_key_schema is not None:
+            pk_names = self.primary_key_schema.keys()
         for key, value in record.items():
+            if key in pk_names:
+                pk[key] = value
             if isinstance(value, TaggableList) and value.get_tag("varied", False):
                 untyped[key] = value
+        record = toolz.dissoc(record, *pk.keys())
         record = toolz.dissoc(record, *untyped.keys())
-        return DualRecord(typed=record, untyped=untyped)
+        return UniversalRecord(pk=pk, typed=record, untyped=untyped)
 
 
 class DynamoDBFullLoadTranslator(DynamoTranslatorBase):
@@ -137,7 +154,16 @@ def to_sql(self, data: t.Union[RecordType, t.List[RecordType]]) -> SQLOperation:
         """
         Produce INSERT SQL operations (SQL statement and parameters) from DynamoDB record(s).
         """
-        sql = f"INSERT INTO {self.table_name} ({self.TYPED_COLUMN}, {self.UNTYPED_COLUMN}) VALUES (:typed, :untyped);"
+        sql = (
+            f"INSERT INTO {self.table_name} ("
+            f"{self.PK_COLUMN}, "
+            f"{self.TYPED_COLUMN}, "
+            f"{self.UNTYPED_COLUMN}"
+            f") VALUES ("
+            f":pk, "
+            f":typed, "
+            f":untyped);"
+        )
         if not isinstance(data, list):
             data = [data]
         parameters = [self.decode_record(record).to_dict() for record in data]
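The rewritten DDL and INSERT builders can be exercised like this; a sketch distilled from the expectations asserted in the test suite of this changeset, not additional library code. Outputs are shown as comments and are approximate.

from commons_codec.transform.dynamodb import DynamoDBFullLoadTranslator
from commons_codec.transform.dynamodb_model import PrimaryKeySchema

translator = DynamoDBFullLoadTranslator(
    table_name="foo",
    primary_key_schema=PrimaryKeySchema().add("id", "S"),
)
print(translator.sql_ddl)
# CREATE TABLE IF NOT EXISTS foo (pk OBJECT(STRICT) AS ("id" STRING PRIMARY KEY), data OBJECT(DYNAMIC), aux OBJECT(IGNORED));

operation = translator.to_sql({"id": {"S": "1"}, "temperature": {"N": "42.42"}})
print(operation.statement)
# INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped);
print(operation.parameters)
# [{'pk': {'id': '1'}, 'typed': {'temperature': 42.42}, 'untyped': {}}]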
@@ -166,56 +192,43 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> SQLOperation:
             raise ValueError(f"Unknown eventSource: {event_source}")
 
         if event_name == "INSERT":
-            dual_record = self.decode_record(event["dynamodb"]["NewImage"])
+            record = self.decode_event(event["dynamodb"])
             sql = (
-                f"INSERT INTO {self.table_name} ({self.TYPED_COLUMN}, {self.UNTYPED_COLUMN}) VALUES (:typed, :untyped);"
+                f"INSERT INTO {self.table_name} ("
+                f"{self.PK_COLUMN}, "
+                f"{self.TYPED_COLUMN}, "
+                f"{self.UNTYPED_COLUMN}"
+                f") VALUES ("
+                f":pk, "
+                f":typed, "
+                f":untyped);"
             )
-            parameters = {"typed": dual_record.typed, "untyped": dual_record.untyped}
+            parameters = record.to_dict()
 
         elif event_name == "MODIFY":
-            new_image = event["dynamodb"]["NewImage"]
-            # Drop primary key columns to not update them.
-            # Primary key values should be identical (if chosen identical in DynamoDB and CrateDB),
-            # but CrateDB does not allow having them in an UPDATE's SET clause.
-            for key in event["dynamodb"]["Keys"]:
-                del new_image[key]
-
-            dual_record = self.decode_record(event["dynamodb"]["NewImage"])
-
-            where_clause = self.keys_to_where(event["dynamodb"]["Keys"])
+            record = self.decode_event(event["dynamodb"])
             sql = (
                 f"UPDATE {self.table_name} "
                 f"SET {self.TYPED_COLUMN}=:typed, {self.UNTYPED_COLUMN}=:untyped "
-                f"WHERE {where_clause.to_sql()};"
+                f"WHERE {self.PK_COLUMN}=:pk;"
             )
-            parameters = {"typed": dual_record.typed, "untyped": dual_record.untyped}
-            parameters.update(where_clause.values)
+            parameters = record.to_dict()
 
         elif event_name == "REMOVE":
-            where_clause = self.keys_to_where(event["dynamodb"]["Keys"])
-            sql = f"DELETE FROM {self.table_name} WHERE {where_clause.to_sql()};"
-            parameters = where_clause.values  # noqa: PD011
+            record = self.decode_event(event["dynamodb"])
+            sql = f"DELETE FROM {self.table_name} WHERE {self.PK_COLUMN}=:pk;"
+            parameters = record.to_dict()
 
         else:
             raise ValueError(f"Unknown CDC event name: {event_name}")
 
         return SQLOperation(sql, parameters)
 
-    def keys_to_where(self, keys: t.Dict[str, t.Dict[str, str]]) -> SQLParameterizedWhereClause:
-        """
-        Serialize CDC event's "Keys" representation to an SQL `WHERE` clause in CrateDB SQL syntax.
-
-        IN (top-level stripped):
-        "Keys": {
-            "device": {"S": "foo"},
-            "timestamp": {"S": "2024-07-12T01:17:42"},
-        }
+    def decode_event(self, event: t.Dict[str, t.Any]) -> UniversalRecord:
+        # INSERT and MODIFY events carry a `NewImage`.
+        if "NewImage" in event:
+            return self.decode_record(event["NewImage"], event["Keys"].keys())
 
-        OUT:
-        WHERE data['device'] = 'foo' AND data['timestamp'] = '2024-07-12T01:17:42'
-        """
-        dual_record = self.decode_record(keys)
-        clause = SQLParameterizedWhereClause()
-        for key_name, key_value in dual_record.typed.items():
-            clause.add(lval=f"{self.TYPED_COLUMN}['{key_name}']", name=key_name, value=key_value)
-        return clause
+        # REMOVE events only carry `Keys`.
+        else:
+            return self.decode_record(event["Keys"], event["Keys"].keys())
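On the CDC side, `decode_event` derives the `pk` bucket from the event's `Keys` section, so no `PrimaryKeySchema` is required there. A sketch mirroring the `REMOVE` expectation in the test suite; the event is abridged to the fields the translator reads.

from commons_codec.transform.dynamodb import DynamoDBCDCTranslator

event = {
    "eventSource": "aws:dynamodb",
    "eventName": "REMOVE",
    "dynamodb": {"Keys": {"id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"}}},
}
operation = DynamoDBCDCTranslator(table_name="foo").to_sql(event)
print(operation.statement)   # DELETE FROM foo WHERE pk=:pk;
print(operation.parameters)  # {'pk': {'id': '5F9E-...'}, 'typed': {}, 'untyped': {}}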
diff --git a/src/commons_codec/transform/dynamodb_model.py b/src/commons_codec/transform/dynamodb_model.py
new file mode 100644
index 0000000..95dc75b
--- /dev/null
+++ b/src/commons_codec/transform/dynamodb_model.py
@@ -0,0 +1,82 @@
+import sys
+import typing as t
+
+from attr import Factory, define
+
+if sys.version_info >= (3, 11):
+    from enum import StrEnum
+else:
+    from backports.strenum import StrEnum  # pragma: no cover
+
+
+class AttributeType(StrEnum):
+    STRING = "STRING"
+    NUMBER = "NUMBER"
+    BINARY = "BINARY"
+
+
+DYNAMODB_TYPE_MAP = {
+    "S": AttributeType.STRING,
+    "N": AttributeType.NUMBER,
+    "B": AttributeType.BINARY,
+}
+
+CRATEDB_TYPE_MAP = {
+    AttributeType.STRING: "STRING",
+    AttributeType.NUMBER: "BIGINT",
+    AttributeType.BINARY: "STRING",
+}
+
+
+@define
+class Attribute:
+    name: str
+    type: AttributeType
+
+    @classmethod
+    def from_dynamodb(cls, name: str, type_: str):
+        try:
+            return cls(name=name, type=DYNAMODB_TYPE_MAP[type_])
+        except KeyError as ex:
+            raise KeyError(f"Mapping DynamoDB type failed: name={name}, type={type_}") from ex
+
+    @property
+    def cratedb_type(self):
+        return CRATEDB_TYPE_MAP[self.type]
+
+
+@define
+class PrimaryKeySchema:
+    schema: t.List[Attribute] = Factory(list)
+
+    def add(self, name: str, type: str) -> "PrimaryKeySchema":  # noqa: A002
+        self.schema.append(Attribute.from_dynamodb(name, type))
+        return self
+
+    @classmethod
+    def from_table(cls, table) -> "PrimaryKeySchema":
+        """
+        # attribute_definitions: [{'AttributeName': 'Id', 'AttributeType': 'N'}]
+        # key_schema: [{'AttributeName': 'Id', 'KeyType': 'HASH'}]
+        """
+
+        schema = cls()
+        attribute_type_map: t.Dict[str, str] = {}
+        for attr in table.attribute_definitions:
+            attribute_type_map[attr["AttributeName"]] = attr["AttributeType"]
+
+        for key in table.key_schema:
+            name = key["AttributeName"]
+            type_ = attribute_type_map[name]
+            schema.add(name=name, type=type_)
+
+        return schema
+
+    def keys(self) -> t.List[str]:
+        return [attribute.name for attribute in self.schema]
+
+    def column_names(self) -> t.List[str]:
+        return [f'"{attribute.name}"' for attribute in self.schema]
+
+    def to_sql_ddl_clauses(self) -> t.List[str]:
+        return [f'"{attribute.name}" {attribute.cratedb_type} PRIMARY KEY' for attribute in self.schema]
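A brief sketch of how the new schema helper composes; the `device`/`timestamp` key pair is illustrative and mirrors the fixtures of the previous data model.

from commons_codec.transform.dynamodb_model import PrimaryKeySchema

schema = PrimaryKeySchema().add("device", "S").add("timestamp", "S")
print(schema.keys())                # ['device', 'timestamp']
print(schema.column_names())        # ['"device"', '"timestamp"']
print(schema.to_sql_ddl_clauses())  # ['"device" STRING PRIMARY KEY', '"timestamp" STRING PRIMARY KEY']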
diff --git a/tests/transform/conftest.py b/tests/transform/conftest.py
index 50dcb70..8c33e84 100644
--- a/tests/transform/conftest.py
+++ b/tests/transform/conftest.py
@@ -1,5 +1,8 @@
 import pytest
 
+from commons_codec.transform.dynamodb import DynamoDBCDCTranslator, DynamoDBFullLoadTranslator
+from commons_codec.transform.dynamodb_model import PrimaryKeySchema
+
 RESET_TABLES = [
     "from.dynamodb",
     "from.mongodb",
@@ -13,3 +16,13 @@ def cratedb(cratedb_service):
     """
     cratedb_service.reset(tables=RESET_TABLES)
     yield cratedb_service
+
+
+@pytest.fixture
+def dynamodb_full_translator_foo():
+    return DynamoDBFullLoadTranslator(table_name="foo", primary_key_schema=PrimaryKeySchema().add("id", "S"))
+
+
+@pytest.fixture
+def dynamodb_cdc_translator_foo():
+    return DynamoDBCDCTranslator(table_name="foo")
diff --git a/tests/transform/test_dynamodb_cdc.py b/tests/transform/test_dynamodb_cdc.py
index a5b9dd8..a4d7add 100644
--- a/tests/transform/test_dynamodb_cdc.py
+++ b/tests/transform/test_dynamodb_cdc.py
@@ -3,8 +3,8 @@
 
 import pytest
 
-from commons_codec.model import DualRecord, SQLOperation
-from commons_codec.transform.dynamodb import CrateDBTypeDeserializer, DynamoDBCDCTranslator
+from commons_codec.model import SQLOperation, UniversalRecord
+from commons_codec.transform.dynamodb import CrateDBTypeDeserializer
 
 pytestmark = pytest.mark.dynamodb
 
@@ -27,8 +27,9 @@
         "tableName": "foo",
         "dynamodb": {
             "ApproximateCreationDateTime": 1720740233012995,
-            "Keys": {"device": {"S": "foo"}, "timestamp": {"S": "2024-07-12T01:17:42"}},
+            "Keys": {"id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"}},
             "NewImage": {
+                "id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"},
                 "humidity": {"N": "84.84"},
                 "temperature": {"N": "42.42"},
                 "device": {"S": "foo"},
@@ -80,8 +81,9 @@
         "tableName": "foo",
         "dynamodb": {
             "ApproximateCreationDateTime": 1720742302233719,
-            "Keys": {"device": {"S": "foo"}, "timestamp": {"S": "2024-07-12T01:17:42"}},
+            "Keys": {"id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"}},
             "NewImage": {
+                "id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"},
                 "humidity": {"N": "84.84"},
                 "temperature": {"N": "55.66"},
                 "device": {"S": "bar"},
@@ -114,9 +116,10 @@
         "tableName": "foo",
         "dynamodb": {
             "ApproximateCreationDateTime": 1720742302233719,
-            "Keys": {"device": {"S": "foo"}, "timestamp": {"S": "2024-07-12T01:17:42"}},
+            "Keys": {"id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"}},
             "NewImage": {
-                "device": {"M": {"id": {"S": "bar"}, "serial": {"N": 12345}}},
+                "id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"},
+                "device": {"S": "foo"},
                 "tags": {"L": [{"S": "foo"}, {"S": "bar"}]},
                 "empty_map": {"M": {}},
                 "empty_list": {"L": []},
@@ -153,8 +156,9 @@
         "tableName": "foo",
         "dynamodb": {
             "ApproximateCreationDateTime": 1720742321848352,
-            "Keys": {"device": {"S": "bar"}, "timestamp": {"S": "2024-07-12T01:17:42"}},
+            "Keys": {"id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"}},
             "OldImage": {
+                "id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"},
                 "humidity": {"N": "84.84"},
                 "temperature": {"N": "55.66"},
                 "device": {"S": "bar"},
@@ -176,35 +180,31 @@
 }
 
 
-def test_decode_ddb_deserialize_type():
-    assert DynamoDBCDCTranslator(table_name="foo").decode_record({"foo": {"N": "84.84"}}) == DualRecord(
-        typed={"foo": 84.84}, untyped={}
+def test_decode_ddb_deserialize_type(dynamodb_cdc_translator_foo):
+    assert dynamodb_cdc_translator_foo.decode_record({"foo": {"N": "84.84"}}) == UniversalRecord(
+        pk={}, typed={"foo": 84.84}, untyped={}
     )
 
 
-def test_decode_cdc_sql_ddl():
-    assert (
-        DynamoDBCDCTranslator(table_name="foo").sql_ddl
-        == "CREATE TABLE IF NOT EXISTS foo (data OBJECT(DYNAMIC), aux OBJECT(IGNORED));"
-    )
-
-
-def test_decode_cdc_unknown_source():
+def test_decode_cdc_unknown_source(dynamodb_cdc_translator_foo):
     with pytest.raises(ValueError) as ex:
-        DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_UNKNOWN_SOURCE)
+        dynamodb_cdc_translator_foo.to_sql(MSG_UNKNOWN_SOURCE)
     assert ex.match("Unknown eventSource: foo:bar")
 
 
-def test_decode_cdc_unknown_event():
+def test_decode_cdc_unknown_event(dynamodb_cdc_translator_foo):
     with pytest.raises(ValueError) as ex:
-        DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_UNKNOWN_EVENT)
+        dynamodb_cdc_translator_foo.to_sql(MSG_UNKNOWN_EVENT)
     assert ex.match("Unknown CDC event name: FOOBAR")
 
 
-def test_decode_cdc_insert_basic():
-    assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_INSERT_BASIC) == SQLOperation(
-        statement="INSERT INTO foo (data, aux) VALUES (:typed, :untyped);",
+def test_decode_cdc_insert_basic(dynamodb_cdc_translator_foo):
+    assert dynamodb_cdc_translator_foo.to_sql(MSG_INSERT_BASIC) == SQLOperation(
+        statement="INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped);",
         parameters={
+            "pk": {
+                "id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266",
+            },
             "typed": {
                 "humidity": 84.84,
                 "temperature": 42.42,
@@ -219,12 +219,14 @@ def test_decode_cdc_insert_basic():
     )
 
 
-def test_decode_cdc_insert_nested():
-    assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_INSERT_NESTED) == SQLOperation(
-        statement="INSERT INTO foo (data, aux) VALUES (:typed, :untyped);",
+def test_decode_cdc_insert_nested(dynamodb_cdc_translator_foo):
+    assert dynamodb_cdc_translator_foo.to_sql(MSG_INSERT_NESTED) == SQLOperation(
+        statement="INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped);",
         parameters={
-            "typed": {
+            "pk": {
                 "id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266",
+            },
+            "typed": {
                 "data": {"temperature": 42.42, "humidity": 84.84},
                 "meta": {"timestamp": "2024-07-12T01:17:42", "device": "foo"},
                 "string_set": ["location_1"],
@@ -237,14 +239,16 @@ def test_decode_cdc_insert_nested():
     )
 
 
-def test_decode_cdc_modify_basic():
-    assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_MODIFY_BASIC) == SQLOperation(
-        statement="UPDATE foo SET data=:typed, aux=:untyped "
-        "WHERE data['device']=:device AND data['timestamp']=:timestamp;",
+def test_decode_cdc_modify_basic(dynamodb_cdc_translator_foo):
+    assert dynamodb_cdc_translator_foo.to_sql(MSG_MODIFY_BASIC) == SQLOperation(
+        statement="UPDATE foo SET data=:typed, aux=:untyped WHERE pk=:pk;",
         parameters={
-            "device": "foo",
-            "timestamp": "2024-07-12T01:17:42",
+            "pk": {
+                "id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266",
+            },
             "typed": {
+                "device": "bar",
+                "timestamp": "2024-07-12T01:17:42",
                 "humidity": 84.84,
                 "temperature": 55.66,
                 "location": "Sydney",
@@ -259,14 +263,16 @@ def test_decode_cdc_modify_basic():
     )
 
 
-def test_decode_cdc_modify_nested():
-    assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_MODIFY_NESTED) == SQLOperation(
-        statement="UPDATE foo SET data=:typed, aux=:untyped "
-        "WHERE data['device']=:device AND data['timestamp']=:timestamp;",
+def test_decode_cdc_modify_nested(dynamodb_cdc_translator_foo):
+    assert dynamodb_cdc_translator_foo.to_sql(MSG_MODIFY_NESTED) == SQLOperation(
+        statement="UPDATE foo SET data=:typed, aux=:untyped WHERE pk=:pk;",
        parameters={
-            "device": "foo",
-            "timestamp": "2024-07-12T01:17:42",
+            "pk": {
+                "id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266",
+            },
             "typed": {
+                "device": "foo",
+                "timestamp": "2024-07-12T01:17:42",
                 "tags": ["foo", "bar"],
                 "empty_map": {},
                 "empty_list": [],
@@ -281,12 +287,15 @@ def test_decode_cdc_modify_nested():
     )
 
 
-def test_decode_cdc_remove():
-    assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_REMOVE) == SQLOperation(
-        statement="DELETE FROM foo WHERE data['device']=:device AND data['timestamp']=:timestamp;",
+def test_decode_cdc_remove(dynamodb_cdc_translator_foo):
+    assert dynamodb_cdc_translator_foo.to_sql(MSG_REMOVE) == SQLOperation(
+        statement="DELETE FROM foo WHERE pk=:pk;",
         parameters={
-            "device": "bar",
-            "timestamp": "2024-07-12T01:17:42",
+            "pk": {
+                "id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266",
+            },
+            "typed": {},
+            "untyped": {},
         },
     )
 
diff --git a/tests/transform/test_dynamodb_full.py b/tests/transform/test_dynamodb_full.py
index ece6234..225f364 100644
--- a/tests/transform/test_dynamodb_full.py
+++ b/tests/transform/test_dynamodb_full.py
@@ -41,8 +41,11 @@
     "set_of_strings": {"SS": ["location_1"]},
 }
 
-RECORD_OUT_DATA = {
+RECORD_OUT_PK = {
     "id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266",
+}
+
+RECORD_OUT_DATA = {
     "data": {"temperature": 42.42, "humidity": 84.84},
     "meta": {"timestamp": "2024-07-12T01:17:42", "device": "foo"},
     "location": {
@@ -76,21 +79,29 @@
 }
 
 
-def test_sql_ddl():
+def test_sql_ddl_success(dynamodb_full_translator_foo):
     assert (
-        DynamoDBFullLoadTranslator(table_name="foo").sql_ddl
-        == "CREATE TABLE IF NOT EXISTS foo (data OBJECT(DYNAMIC), aux OBJECT(IGNORED));"
+        dynamodb_full_translator_foo.sql_ddl == "CREATE TABLE IF NOT EXISTS foo "
+        '(pk OBJECT(STRICT) AS ("id" STRING PRIMARY KEY), data OBJECT(DYNAMIC), aux OBJECT(IGNORED));'
     )
 
 
-def test_to_sql_operation():
+def test_sql_ddl_failure(dynamodb_full_translator_foo):
+    translator = DynamoDBFullLoadTranslator(table_name="foo")
+    with pytest.raises(IOError) as ex:
+        _ = translator.sql_ddl
+    assert ex.match("Unable to generate SQL DDL without key schema information")
+
+
+def test_to_sql_operation(dynamodb_full_translator_foo):
     """
     Verify outcome of `DynamoDBFullLoadTranslator.to_sql` operation.
     """
-    assert DynamoDBFullLoadTranslator(table_name="foo").to_sql(RECORD_IN) == SQLOperation(
-        statement="INSERT INTO foo (data, aux) VALUES (:typed, :untyped);",
+    assert dynamodb_full_translator_foo.to_sql(RECORD_IN) == SQLOperation(
+        statement="INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped);",
         parameters=[
             {
+                "pk": RECORD_OUT_PK,
                 "typed": RECORD_OUT_DATA,
                 "untyped": RECORD_OUT_AUX,
             }
@@ -99,13 +110,15 @@ def test_to_sql_operation():
 
 
 @pytest.mark.integration
-def test_to_sql_cratedb(caplog, cratedb):
+def test_to_sql_cratedb(caplog, cratedb, dynamodb_full_translator_foo):
     """
     Verify writing converted DynamoDB record to CrateDB.
     """
 
     # Compute CrateDB operation (SQL+parameters) from DynamoDB record.
-    translator = DynamoDBFullLoadTranslator(table_name="from.dynamodb")
+    translator = DynamoDBFullLoadTranslator(
+        table_name="from.dynamodb", primary_key_schema=dynamodb_full_translator_foo.primary_key_schema
+    )
     operation = translator.to_sql(RECORD_IN)
 
     # Insert into CrateDB.
@@ -118,5 +131,6 @@ def test_to_sql_cratedb(caplog, cratedb):
     assert cratedb.database.count_records("from.dynamodb") == 1
 
     results = cratedb.database.run_sql('SELECT * FROM "from".dynamodb;', records=True)  # noqa: S608
+    assert results[0]["pk"] == RECORD_OUT_PK
     assert results[0]["data"] == RECORD_OUT_DATA
     assert results[0]["aux"] == RECORD_OUT_AUX
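Outside the scope of this changeset, the key schema would typically be derived from a live table description rather than spelled out by hand. A hypothetical wiring sketch, assuming a boto3 DynamoDB `Table` resource, whose `attribute_definitions` and `key_schema` attributes match the shapes documented in `PrimaryKeySchema.from_table()`.

import boto3

from commons_codec.transform.dynamodb import DynamoDBFullLoadTranslator
from commons_codec.transform.dynamodb_model import PrimaryKeySchema

# Hypothetical: derive the primary key schema from a live DynamoDB table.
table = boto3.resource("dynamodb").Table("foo")
translator = DynamoDBFullLoadTranslator(
    table_name="from.dynamodb",
    primary_key_schema=PrimaryKeySchema.from_table(table),
)
print(translator.sql_ddl)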
diff --git a/tests/transform/test_dynamodb_model.py b/tests/transform/test_dynamodb_model.py
new file mode 100644
index 0000000..583c0fa
--- /dev/null
+++ b/tests/transform/test_dynamodb_model.py
@@ -0,0 +1,31 @@
+import pytest
+
+from commons_codec.transform.dynamodb_model import PrimaryKeySchema
+
+
+def test_primary_key_schema_from_table_success():
+    class SurrogateTable:
+        attribute_definitions = [
+            {"AttributeName": "Id", "AttributeType": "N"},
+        ]
+        key_schema = [
+            {"AttributeName": "Id", "KeyType": "HASH"},
+        ]
+
+    pks = PrimaryKeySchema.from_table(SurrogateTable())
+    assert pks == PrimaryKeySchema().add("Id", "N")
+    assert pks.column_names() == ['"Id"']
+
+
+def test_primary_key_schema_from_table_unknown_type():
+    class SurrogateTable:
+        attribute_definitions = [
+            {"AttributeName": "Id", "AttributeType": "F"},
+        ]
+        key_schema = [
+            {"AttributeName": "Id", "KeyType": "HASH"},
+        ]
+
+    with pytest.raises(KeyError) as ex:
+        PrimaryKeySchema.from_table(SurrogateTable())
+    assert ex.match("Mapping DynamoDB type failed: name=Id, type=F")
diff --git a/tests/transform/test_dynamodb_types_cratedb.py b/tests/transform/test_dynamodb_types_cratedb.py
index b83e562..40eec90 100644
--- a/tests/transform/test_dynamodb_types_cratedb.py
+++ b/tests/transform/test_dynamodb_types_cratedb.py
@@ -3,8 +3,8 @@
 
 import pytest
 
-from commons_codec.model import DualRecord
-from commons_codec.transform.dynamodb import CrateDBTypeDeserializer, DynamoDBCDCTranslator
+from commons_codec.model import UniversalRecord
+from commons_codec.transform.dynamodb import CrateDBTypeDeserializer
 
 pytestmark = pytest.mark.dynamodb
 
@@ -21,7 +21,7 @@ def test_deserialize_list(self):
         ]
 
 
-def test_decode_typed_untyped():
-    assert DynamoDBCDCTranslator(table_name="foo").decode_record(
+def test_decode_typed_untyped(dynamodb_cdc_translator_foo):
+    assert dynamodb_cdc_translator_foo.decode_record(
         {"foo": {"N": "84.84"}, "bar": {"L": [{"N": "1"}, {"S": "foo"}]}}
-    ) == DualRecord(typed={"foo": 84.84}, untyped={"bar": [1.0, "foo"]})
+    ) == UniversalRecord(pk={}, typed={"foo": 84.84}, untyped={"bar": [1.0, "foo"]})
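As a companion to the last test, a sketch (illustrative, not part of the patch) of how passing `key_names` routes matching fields into the `pk` bucket instead of the typed one; the printed shape is approximate.

from commons_codec.transform.dynamodb import DynamoDBCDCTranslator

record = DynamoDBCDCTranslator(table_name="foo").decode_record(
    {"foo": {"N": "84.84"}, "bar": {"L": [{"N": "1"}, {"S": "foo"}]}},
    key_names=["foo"],
)
print(record.to_dict())
# {'pk': {'foo': 84.84}, 'typed': {}, 'untyped': {'bar': [1.0, 'foo']}}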