From 14e0fba2380e4d96bf53b268c83ac67a2e70c34f Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Wed, 28 Aug 2024 21:23:55 +0200 Subject: [PATCH] DynamoDB: Add software test for decoding list of objects --- tests/io/dynamodb/manager.py | 29 ++++++++++++++++++ tests/io/dynamodb/test_copy.py | 54 ++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+) create mode 100644 tests/io/dynamodb/test_copy.py diff --git a/tests/io/dynamodb/manager.py b/tests/io/dynamodb/manager.py index ce62c12..68c347d 100644 --- a/tests/io/dynamodb/manager.py +++ b/tests/io/dynamodb/manager.py @@ -1,4 +1,5 @@ import json +import typing as t from pathlib import Path from yarl import URL @@ -37,3 +38,31 @@ def load_product_catalog(self): self.adapter.dynamodb_client.batch_write_item(RequestItems=data) table.load() return table + + def load_records(self, table_name: str, records: t.List[t.Dict[str, t.Any]]): + table = self.adapter.dynamodb_resource.Table(table_name) + try: + table.delete() + except Exception: # noqa: S110 + pass + + table = self.adapter.dynamodb_resource.create_table( + TableName=table_name, + KeySchema=[ + {"AttributeName": "Id", "KeyType": "HASH"}, + ], + AttributeDefinitions=[ + {"AttributeName": "Id", "AttributeType": "N"}, + ], + ProvisionedThroughput={ + "ReadCapacityUnits": 1, + "WriteCapacityUnits": 1, + }, + TableClass="STANDARD", + ) + table.wait_until_exists() + + for record in records: + self.adapter.dynamodb_client.put_item(TableName=table_name, Item=record) + table.load() + return table diff --git a/tests/io/dynamodb/test_copy.py b/tests/io/dynamodb/test_copy.py new file mode 100644 index 0000000..b758d59 --- /dev/null +++ b/tests/io/dynamodb/test_copy.py @@ -0,0 +1,54 @@ +from cratedb_toolkit.io.dynamodb.copy import DynamoDBFullLoad + +RECORD_UTM = { + "Id": {"N": "101"}, + "utmTags": { + "L": [ + { + "M": { + "date": {"S": "2024-08-28T20:05:42.603Z"}, + "utm_adgroup": {"L": [{"S": ""}, {"S": ""}]}, + "utm_campaign": {"S": "34374686341"}, + "utm_medium": {"S": "foobar"}, + "utm_source": {"S": "google"}, + } + } + ] + }, +} + + +def test_dynamodb_copy_list_of_objects(caplog, cratedb, dynamodb, dynamodb_test_manager): + """ + CLI test: Invoke `ctk load table` for DynamoDB. + """ + + # Define source and target URLs. + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" + dynamodb_url = f"{dynamodb.get_connection_url()}/demo?region=us-east-1" + + # Populate source database with data. + dynamodb_test_manager.load_records(table_name="demo", records=[RECORD_UTM]) + + # Run transfer command. + table_loader = DynamoDBFullLoad(dynamodb_url=dynamodb_url, cratedb_url=cratedb_url) + table_loader.start() + + # Verify data in target database. + assert cratedb.database.table_exists("testdrive.demo") is True + assert cratedb.database.refresh_table("testdrive.demo") is True + assert cratedb.database.count_records("testdrive.demo") == 1 + + results = cratedb.database.run_sql("SELECT * FROM testdrive.demo;", records=True) # noqa: S608 + assert results[0]["data"] == { + "Id": 101.0, + "utmTags": [ + { + "date": "2024-08-28T20:05:42.603Z", + "utm_adgroup": ["", ""], + "utm_campaign": "34374686341", + "utm_medium": "foobar", + "utm_source": "google", + } + ], + }