Skip to content

Commit

Permalink
DynamoDB: Add pagination support for full-load table loader
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Sep 2, 2024
1 parent 4c72537 commit 46bbc16
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 68 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@


## Unreleased
- DynamoDB: Add special decoding for varied lists.
Store them into a separate `OBJECT(IGNORED)` column in CrateDB.
- DynamoDB: Add pagination support for `full-load` table loader

## 2024/08/27 v0.0.20
- DMS/DynamoDB: Fix table name quoting within CDC processor handler
Expand Down
39 changes: 35 additions & 4 deletions cratedb_toolkit/io/dynamodb/adapter.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import logging
import typing as t

import boto3
from yarl import URL

logger = logging.getLogger(__name__)


class DynamoDBAdapter:
def __init__(self, dynamodb_url: URL, echo: bool = False):
def __init__(self, dynamodb_url: URL):
self.session = boto3.Session(
aws_access_key_id=dynamodb_url.user,
aws_secret_access_key=dynamodb_url.password,
Expand All @@ -15,11 +20,37 @@ def __init__(self, dynamodb_url: URL, echo: bool = False):
self.dynamodb_resource = self.session.resource("dynamodb", endpoint_url=endpoint_url)
self.dynamodb_client = self.session.client("dynamodb", endpoint_url=endpoint_url)

def scan(self, table_name: str):
def scan(
self,
table_name: str,
page_size: int = 1000,
consistent_read: bool = False,
on_error: t.Literal["log", "raise"] = "log",
) -> t.Generator[t.Dict, None, None]:
"""
Return all items from DynamoDB table.
Fetch and generate all items from a DynamoDB table, with pagination.
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.Pagination
"""
return self.dynamodb_client.scan(TableName=table_name)
key = None
while True:
try:
scan_kwargs = {"TableName": table_name, "ConsistentRead": consistent_read, "Limit": page_size}
if key is not None:
scan_kwargs += {"ExclusiveStartKey": key}
response = self.dynamodb_client.scan(**scan_kwargs)
yield response
key = response.get("LastEvaluatedKey", None)
if key is None:
break
except Exception as ex:
if on_error == "log":
logger.exception("Error reading DynamoDB table")
elif on_error == "raise":
raise
else:
raise ValueError(f"Unknown 'on_error' value: {on_error}") from ex
break

def count_records(self, table_name: str):
table = self.dynamodb_resource.Table(table_name)
Expand Down
39 changes: 25 additions & 14 deletions cratedb_toolkit/io/dynamodb/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from cratedb_toolkit.io.dynamodb.adapter import DynamoDBAdapter
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.util import DatabaseAdapter
from cratedb_toolkit.util.data import asbool

logger = logging.getLogger(__name__)

Expand All @@ -23,6 +24,7 @@ def __init__(
dynamodb_url: str,
cratedb_url: str,
progress: bool = False,
debug: bool = True,
):
cratedb_address = DatabaseAddress.from_string(cratedb_url)
cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode()
Expand All @@ -36,38 +38,47 @@ def __init__(
self.translator = DynamoDBFullLoadTranslator(table_name=self.cratedb_table)

self.progress = progress
self.debug = debug

self.page_size: int = int(self.dynamodb_url.query.get("page-size", 1000))
self.consistent_read: bool = asbool(self.dynamodb_url.query.get("consistent-read", False))

def start(self):
"""
Read items from DynamoDB table, convert to SQL INSERT statements, and submit to CrateDB.
"""
records_in = self.dynamodb_adapter.count_records(self.dynamodb_table)
logger.info(f"Source: DynamoDB table={self.dynamodb_table} count={records_in}")
logger_on_error = logger.warning
if self.debug:
logger_on_error = logger.exception
with self.cratedb_adapter.engine.connect() as connection:
if not self.cratedb_adapter.table_exists(self.cratedb_table):
connection.execute(sa.text(self.translator.sql_ddl))
connection.commit()
records_target = self.cratedb_adapter.count_records(self.cratedb_table)
logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}")
progress_bar = tqdm(total=records_in)
result = self.dynamodb_adapter.scan(table_name=self.dynamodb_table)
records_out = 0
for operation in self.items_to_operations(result["Items"]):
for result in self.dynamodb_adapter.scan(
table_name=self.dynamodb_table,
consistent_read=self.consistent_read,
page_size=self.page_size,
):
result_size = len(result["Items"])
try:
operation = self.translator.to_sql(result["Items"])
except Exception as ex:
logger_on_error(f"Transforming query failed: {ex}")
continue
try:
connection.execute(sa.text(operation.statement), operation.parameters)
records_out += 1
except sa.exc.ProgrammingError as ex:
logger.warning(f"Running query failed: {ex}")
progress_bar.update()
records_out += result_size
progress_bar.update(n=result_size)
except Exception as ex:
logger_on_error(f"Executing query failed: {ex}")
progress_bar.close()
connection.commit()
logger.info(f"Number of records written: {records_out}")
if records_out < records_in:
if records_out == 0:
logger.warning("No data has been copied")

def items_to_operations(self, items):
"""
Convert data for record items to INSERT statements.
"""
for item in items:
yield self.translator.to_sql(item)
6 changes: 4 additions & 2 deletions cratedb_toolkit/io/processor/kinesis_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
MESSAGE_FORMAT: str = os.environ.get("MESSAGE_FORMAT", "unknown")
COLUMN_TYPES: str = os.environ.get("COLUMN_TYPES", "")
CRATEDB_SQLALCHEMY_URL: str = os.environ.get("CRATEDB_SQLALCHEMY_URL", "crate://")
CRATEDB_TABLE: str = os.environ.get("CRATEDB_TABLE", "default")
CRATEDB_TABLE: t.Optional[str] = os.environ.get("CRATEDB_TABLE")

logger = logging.getLogger(__name__)
logger.setLevel(LOG_LEVEL)
Expand Down Expand Up @@ -127,7 +127,9 @@ def handler(event, context):
connection.execute(sa.text(operation.statement), parameters=operation.parameters)

# Processing alternating CDC events requires write synchronization.
connection.execute(sa.text(f"REFRESH TABLE {cdc.quote_table_name(CRATEDB_TABLE)}"))
# TODO: Improve interface.
if hasattr(cdc, "table_name"):
connection.execute(sa.text(f"REFRESH TABLE {cdc.table_name}"))

connection.commit()

Expand Down
2 changes: 1 addition & 1 deletion cratedb_toolkit/testing/testcontainers/localstack.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class LocalStackContainerWithKeepalive(KeepaliveContainer, LocalStackContainer):
useful when used within a test matrix. Its default value is `latest`.
"""

LOCALSTACK_VERSION = os.environ.get("LOCALSTACK_VERSION", "latest")
LOCALSTACK_VERSION = os.environ.get("LOCALSTACK_VERSION", "3.7")

def __init__(
self,
Expand Down
16 changes: 16 additions & 0 deletions doc/io/dynamodb/loader.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,20 @@ ctk shell --command "SELECT * FROM testdrive.demo;"
ctk show table "testdrive.demo"
```

## Options

### `page-size`
The source URL accepts the `page-size` option to configure DynamoDB
[pagination]. The default value is `1000`.
```shell
ctk load table .../ProductCatalog?region=us-east-1&page-size=5000
```

### `consistent-read`
The source URL accepts the `consistent-read` option to configure DynamoDB
[read consistency]. The default value is `false`.


## Variants

### CrateDB Cloud
Expand Down Expand Up @@ -66,3 +80,5 @@ docker run \

[Credentials for accessing LocalStack AWS API]: https://docs.localstack.cloud/references/credentials/
[Get started with DynamoDB on LocalStack]: https://docs.localstack.cloud/user-guide/aws/dynamodb/
[pagination]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.Pagination
[read consistency]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.ReadConsistency
8 changes: 4 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ docs = [
]
dynamodb = [
"boto3",
"commons-codec>=0.0.12",
"commons-codec @ git+https://github.com/crate/commons-codec.git@dynamodb-full-load-batch",
]
full = [
"cratedb-toolkit[cfr,cloud,datasets,io,service]",
Expand All @@ -155,11 +155,11 @@ io = [
"sqlalchemy>=2",
]
kinesis = [
"commons-codec>=0.0.12",
"lorrystream[carabas]",
"commons-codec @ git+https://github.com/crate/commons-codec.git@dynamodb-full-load-batch",
"lorrystream[carabas]>=0.0.6",
]
mongodb = [
"commons-codec[mongodb,zyp]>=0.0.12",
"commons-codec[mongodb,zyp] @ git+https://github.com/crate/commons-codec.git@dynamodb-full-load-batch",
"cratedb-toolkit[io]",
"orjson<4,>=3.3.1",
"pymongo<5,>=3.10.1",
Expand Down
42 changes: 42 additions & 0 deletions tests/io/dynamodb/test_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import pytest
from botocore.exceptions import ParamValidationError
from yarl import URL

from cratedb_toolkit.io.dynamodb.adapter import DynamoDBAdapter

pytestmark = pytest.mark.dynamodb


RECORD = {
"Id": {"N": "101"},
}


def test_adapter_scan_success(dynamodb):
dynamodb_url = f"{dynamodb.get_connection_url()}/demo?region=us-east-1"
adapter = DynamoDBAdapter(URL(dynamodb_url))
adapter.scan("foo")


def test_adapter_scan_failure_consistent_read(dynamodb):
"""
Check supplying invalid parameters to `DynamoDBAdapter` fails as expected.
"""
dynamodb_url = f"{dynamodb.get_connection_url()}/demo?region=us-east-1"
adapter = DynamoDBAdapter(URL(dynamodb_url))

with pytest.raises(ParamValidationError) as ex:
next(adapter.scan("demo", consistent_read=-42, on_error="raise"))
assert ex.match("Parameter validation failed:\nInvalid type for parameter ConsistentRead, value: -42.*")


def test_adapter_scan_failure_page_size(dynamodb):
"""
Check supplying invalid parameters to `DynamoDBAdapter` fails as expected.
"""
dynamodb_url = f"{dynamodb.get_connection_url()}/demo?region=us-east-1"
adapter = DynamoDBAdapter(URL(dynamodb_url))

with pytest.raises(ParamValidationError) as ex:
next(adapter.scan("demo", page_size=-1, on_error="raise"))
assert ex.match("Parameter validation failed:\nInvalid value for parameter Limit, value: -1, valid min value: 1")
2 changes: 1 addition & 1 deletion tests/io/dynamodb/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ def test_dynamodb_load_table(caplog, cratedb, dynamodb, dynamodb_test_manager):
"""
CLI test: Invoke `ctk load table` for DynamoDB.
"""
cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"
dynamodb_url = f"{dynamodb.get_connection_url()}/ProductCatalog?region=us-east-1"
cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"

# Populate source database with sample dataset.
dynamodb_test_manager.load_product_catalog()
Expand Down
48 changes: 6 additions & 42 deletions tests/io/dynamodb/test_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,22 @@
pytestmark = pytest.mark.dynamodb


RECORD_UTM = {
RECORD = {
"Id": {"N": "101"},
"utmTags": {
"L": [
{
"M": {
"date": {"S": "2024-08-28T20:05:42.603Z"},
"utm_adgroup": {"L": [{"S": ""}, {"S": ""}]},
"utm_campaign": {"S": "34374686341"},
"utm_medium": {"S": "foobar"},
"utm_source": {"S": "google"},
}
}
]
},
"location": {
"M": {
"coordinates": {"L": [{"S": ""}]},
"meetingPoint": {"S": "At the end of the tunnel"},
"address": {"S": "Salzbergwerk Berchtesgaden"},
},
},
}


def test_dynamodb_copy(caplog, cratedb, dynamodb, dynamodb_test_manager):
def test_dynamodb_copy_success(caplog, cratedb, dynamodb, dynamodb_test_manager):
"""
CLI test: Invoke `ctk load table` for DynamoDB.
Verify `DynamoDBFullLoad` works as expected.
"""

# Define source and target URLs.
cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"
dynamodb_url = f"{dynamodb.get_connection_url()}/demo?region=us-east-1"
cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"

# Populate source database with data.
dynamodb_test_manager.load_records(table_name="demo", records=[RECORD_UTM])
dynamodb_test_manager.load_records(table_name="demo", records=[RECORD])

# Run transfer command.
table_loader = DynamoDBFullLoad(dynamodb_url=dynamodb_url, cratedb_url=cratedb_url)
Expand All @@ -52,20 +32,4 @@ def test_dynamodb_copy(caplog, cratedb, dynamodb, dynamodb_test_manager):
assert cratedb.database.count_records("testdrive.demo") == 1

results = cratedb.database.run_sql("SELECT * FROM testdrive.demo;", records=True) # noqa: S608
assert results[0]["data"] == {
"Id": 101.0,
"utmTags": [
{
"date": "2024-08-28T20:05:42.603Z",
"utm_adgroup": ["", ""],
"utm_campaign": "34374686341",
"utm_medium": "foobar",
"utm_source": "google",
}
],
"location": {
"coordinates": [""],
"meetingPoint": "At the end of the tunnel",
"address": "Salzbergwerk Berchtesgaden",
},
}
assert results[0]["data"] == {"Id": 101.0}

0 comments on commit 46bbc16

Please sign in to comment.