Skip to content

Commit

Permalink
DynamoDB: Change CrateDB data model to use (pk, data, aux) columns
Browse files Browse the repository at this point in the history
By breaking the primary key information out of the main record's data
bucket, the main record can be updated as-is on CDC MODIFY operations.
  • Loading branch information
amotl committed Sep 30, 2024
1 parent 553aaba commit 66bf65d
Show file tree
Hide file tree
Showing 9 changed files with 271 additions and 106 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Changelog

## Unreleased
- DynamoDB: Change CrateDB data model to use (`pk`, `data`, `aux`) columns
Attention: This is a breaking change.

## 2024/09/26 v0.0.19
- DynamoDB CDC: Fix `MODIFY` operation by propagating `NewImage` fully
Expand Down
7 changes: 4 additions & 3 deletions src/commons_codec/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,15 @@ def to_sql(self):


@define
class DualRecord:
class UniversalRecord:
"""
Manage two halves of a record.
Manage a universal record including primary keys and two halves of a record.
One bucket stores the typed fields, the other stores the untyped ones.
"""

pk: t.Dict[str, t.Any]
typed: t.Dict[str, t.Any]
untyped: t.Dict[str, t.Any]

def to_dict(self):
return {"typed": self.typed, "untyped": self.untyped}
return {"pk": self.pk, "typed": self.typed, "untyped": self.untyped}
101 changes: 57 additions & 44 deletions src/commons_codec/transform/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
from sqlalchemy_cratedb.support import quote_relation_name

from commons_codec.model import (
DualRecord,
SQLOperation,
SQLParameterizedWhereClause,
UniversalRecord,
)
from commons_codec.transform.dynamodb_model import PrimaryKeySchema
from commons_codec.util.data import TaggableList
from commons_codec.vendor.boto3.dynamodb.types import DYNAMODB_CONTEXT, TypeDeserializer

Expand Down Expand Up @@ -73,6 +73,10 @@ class DynamoTranslatorBase:
Translate DynamoDB records into a different representation.
"""

# Define name of the column where KeySchema DynamoDB fields will get materialized into.
# This column uses the `OBJECT(DYNAMIC)` data type.
PK_COLUMN = "pk"

# Define name of the column where typed DynamoDB fields will get materialized into.
# This column uses the `OBJECT(DYNAMIC)` data type.
TYPED_COLUMN = "data"
Expand All @@ -81,22 +85,27 @@ class DynamoTranslatorBase:
# This column uses the `OBJECT(IGNORED)` data type.
UNTYPED_COLUMN = "aux"

def __init__(self, table_name: str):
def __init__(self, table_name: str, primary_key_schema: PrimaryKeySchema = None):
super().__init__()
self.table_name = quote_relation_name(table_name)
self.primary_key_schema = primary_key_schema
self.deserializer = CrateDBTypeDeserializer()

@property
def sql_ddl(self):
"""`
Define SQL DDL statement for creating table in CrateDB that stores re-materialized CDC events.
"""
if self.primary_key_schema is None:
raise IOError("Unable to generate SQL DDL without key schema information")
return (
f"CREATE TABLE IF NOT EXISTS {self.table_name} "
f"({self.TYPED_COLUMN} OBJECT(DYNAMIC), {self.UNTYPED_COLUMN} OBJECT(IGNORED));"
f"CREATE TABLE IF NOT EXISTS {self.table_name} ("
f"{self.PK_COLUMN} OBJECT(STRICT) AS ({', '.join(self.primary_key_schema.to_sql_ddl_clauses())}), "
f"{self.TYPED_COLUMN} OBJECT(DYNAMIC), "
f"{self.UNTYPED_COLUMN} OBJECT(IGNORED));"
)

def decode_record(self, item: t.Dict[str, t.Any]) -> DualRecord:
def decode_record(self, item: t.Dict[str, t.Any], key_names: t.Union[t.List[str], None] = None) -> UniversalRecord:
"""
Deserialize DynamoDB JSON record into vanilla Python.
Expand Down Expand Up @@ -124,20 +133,37 @@ def decode_record(self, item: t.Dict[str, t.Any]) -> DualRecord:
-- https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypeDescriptors
"""
record = toolz.valmap(self.deserializer.deserialize, item)

pk = {}
untyped = {}
pk_names = key_names or []
if not pk_names and self.primary_key_schema is not None:
pk_names = self.primary_key_schema.keys()
for key, value in record.items():
if key in pk_names:
pk[key] = value
if isinstance(value, TaggableList) and value.get_tag("varied", False):
untyped[key] = value
record = toolz.dissoc(record, *pk.keys())
record = toolz.dissoc(record, *untyped.keys())
return DualRecord(typed=record, untyped=untyped)
return UniversalRecord(pk=pk, typed=record, untyped=untyped)


class DynamoDBFullLoadTranslator(DynamoTranslatorBase):
def to_sql(self, data: t.Union[RecordType, t.List[RecordType]]) -> SQLOperation:
"""
Produce INSERT SQL operations (SQL statement and parameters) from DynamoDB record(s).
"""
sql = f"INSERT INTO {self.table_name} ({self.TYPED_COLUMN}, {self.UNTYPED_COLUMN}) VALUES (:typed, :untyped);"
sql = (
f"INSERT INTO {self.table_name} ("
f"{self.PK_COLUMN}, "
f"{self.TYPED_COLUMN}, "
f"{self.UNTYPED_COLUMN}"
f") VALUES ("
f":pk, "
f":typed, "
f":untyped);"
)
if not isinstance(data, list):
data = [data]
parameters = [self.decode_record(record).to_dict() for record in data]
Expand Down Expand Up @@ -166,56 +192,43 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> SQLOperation:
raise ValueError(f"Unknown eventSource: {event_source}")

if event_name == "INSERT":
dual_record = self.decode_record(event["dynamodb"]["NewImage"])
record = self.decode_event(event["dynamodb"])
sql = (
f"INSERT INTO {self.table_name} ({self.TYPED_COLUMN}, {self.UNTYPED_COLUMN}) VALUES (:typed, :untyped);"
f"INSERT INTO {self.table_name} ("
f"{self.PK_COLUMN}, "
f"{self.TYPED_COLUMN}, "
f"{self.UNTYPED_COLUMN}"
f") VALUES ("
f":pk, "
f":typed, "
f":untyped);"
)
parameters = {"typed": dual_record.typed, "untyped": dual_record.untyped}
parameters = record.to_dict()

elif event_name == "MODIFY":
new_image = event["dynamodb"]["NewImage"]
# Drop primary key columns to not update them.
# Primary key values should be identical (if chosen identical in DynamoDB and CrateDB),
# but CrateDB does not allow having them in an UPDATE's SET clause.
for key in event["dynamodb"]["Keys"]:
del new_image[key]

dual_record = self.decode_record(event["dynamodb"]["NewImage"])

where_clause = self.keys_to_where(event["dynamodb"]["Keys"])
record = self.decode_event(event["dynamodb"])
sql = (
f"UPDATE {self.table_name} "
f"SET {self.TYPED_COLUMN}=:typed, {self.UNTYPED_COLUMN}=:untyped "
f"WHERE {where_clause.to_sql()};"
f"WHERE {self.PK_COLUMN}=:pk;"
)
parameters = {"typed": dual_record.typed, "untyped": dual_record.untyped}
parameters.update(where_clause.values)
parameters = record.to_dict()

elif event_name == "REMOVE":
where_clause = self.keys_to_where(event["dynamodb"]["Keys"])
sql = f"DELETE FROM {self.table_name} WHERE {where_clause.to_sql()};"
parameters = where_clause.values # noqa: PD011
record = self.decode_event(event["dynamodb"])
sql = f"DELETE FROM {self.table_name} WHERE {self.PK_COLUMN}=:pk;"
parameters = record.to_dict()

else:
raise ValueError(f"Unknown CDC event name: {event_name}")

return SQLOperation(sql, parameters)

def keys_to_where(self, keys: t.Dict[str, t.Dict[str, str]]) -> SQLParameterizedWhereClause:
"""
Serialize CDC event's "Keys" representation to an SQL `WHERE` clause in CrateDB SQL syntax.
IN (top-level stripped):
"Keys": {
"device": {"S": "foo"},
"timestamp": {"S": "2024-07-12T01:17:42"},
}
def decode_event(self, event: t.Dict[str, t.Any]) -> UniversalRecord:
# That's for INSERT+MODIFY.
if "NewImage" in event:
return self.decode_record(event["NewImage"], event["Keys"].keys())

OUT:
WHERE data['device'] = 'foo' AND data['timestamp'] = '2024-07-12T01:17:42'
"""
dual_record = self.decode_record(keys)
clause = SQLParameterizedWhereClause()
for key_name, key_value in dual_record.typed.items():
clause.add(lval=f"{self.TYPED_COLUMN}['{key_name}']", name=key_name, value=key_value)
return clause
# That's for REMOVE.
else:
return self.decode_record(event["Keys"], event["Keys"].keys())
82 changes: 82 additions & 0 deletions src/commons_codec/transform/dynamodb_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import sys
import typing as t

from attr import Factory, define

if sys.version_info >= (3, 11):
from enum import StrEnum
else:
from backports.strenum import StrEnum # pragma: no cover


class AttributeType(StrEnum):
STRING = "STRING"
NUMBER = "NUMBER"
BINARY = "BINARY"


DYNAMODB_TYPE_MAP = {
"S": AttributeType.STRING,
"N": AttributeType.NUMBER,
"B": AttributeType.BINARY,
}

CRATEDB_TYPE_MAP = {
AttributeType.STRING: "STRING",
AttributeType.NUMBER: "BIGINT",
AttributeType.BINARY: "STRING",
}


@define
class Attribute:
name: str
type: AttributeType

@classmethod
def from_dynamodb(cls, name: str, type_: str):
try:
return cls(name=name, type=DYNAMODB_TYPE_MAP[type_])
except KeyError as ex:
raise KeyError(f"Mapping DynamoDB type failed: name={name}, type={type_}") from ex

@property
def cratedb_type(self):
return CRATEDB_TYPE_MAP[self.type]


@define
class PrimaryKeySchema:
schema: t.List[Attribute] = Factory(list)

def add(self, name: str, type: str) -> "PrimaryKeySchema": # noqa: A002
self.schema.append(Attribute.from_dynamodb(name, type))
return self

@classmethod
def from_table(cls, table) -> "PrimaryKeySchema":
"""
# attribute_definitions: [{'AttributeName': 'Id', 'AttributeType': 'N'}]
# key_schema: [{'AttributeName': 'Id', 'KeyType': 'HASH'}]
"""

schema = cls()
attribute_type_map: t.Dict[str, str] = {}
for attr in table.attribute_definitions:
attribute_type_map[attr["AttributeName"]] = attr["AttributeType"]

for key in table.key_schema:
name = key["AttributeName"]
type_ = attribute_type_map[name]
schema.add(name=name, type=type_)

return schema

def keys(self) -> t.List[str]:
return [attribute.name for attribute in self.schema]

def column_names(self) -> t.List[str]:
return [f'"{attribute.name}"' for attribute in self.schema]

def to_sql_ddl_clauses(self) -> t.List[str]:
return [f'"{attribute.name}" {attribute.cratedb_type} PRIMARY KEY' for attribute in self.schema]
13 changes: 13 additions & 0 deletions tests/transform/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import pytest

from commons_codec.transform.dynamodb import DynamoDBCDCTranslator, DynamoDBFullLoadTranslator
from commons_codec.transform.dynamodb_model import PrimaryKeySchema

RESET_TABLES = [
"from.dynamodb",
"from.mongodb",
Expand All @@ -13,3 +16,13 @@ def cratedb(cratedb_service):
"""
cratedb_service.reset(tables=RESET_TABLES)
yield cratedb_service


@pytest.fixture
def dynamodb_full_translator_foo():
return DynamoDBFullLoadTranslator(table_name="foo", primary_key_schema=PrimaryKeySchema().add("id", "S"))


@pytest.fixture
def dynamodb_cdc_translator_foo():
return DynamoDBCDCTranslator(table_name="foo")
Loading

0 comments on commit 66bf65d

Please sign in to comment.