Skip to content

Commit

Permalink
Replace poor man's relation name quoting with quote_relation_name
Browse files Browse the repository at this point in the history
... from `sqlalchemy-cratedb` package.
  • Loading branch information
Andreas Motl authored and amotl committed Aug 31, 2024
1 parent 3e32304 commit 664a5f7
Show file tree
Hide file tree
Showing 11 changed files with 41 additions and 62 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Changelog

## Unreleased
- Replace poor man's relation name quoting with implementation
`quote_relation_name` from `sqlalchemy-cratedb` package.

## 2024/08/27 v0.0.13
- DMS/DynamoDB: Use parameterized SQL WHERE clauses instead of inlining values
Expand Down
5 changes: 3 additions & 2 deletions examples/mongodb_cdc_cratedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import pymongo
import sqlalchemy as sa
from sqlalchemy_cratedb.support import quote_relation_name

from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB

Expand All @@ -33,7 +34,7 @@ def __init__(
self.cratedb_client = sa.create_engine(cratedb_sqlalchemy_url, echo=True)
self.mongodb_client = pymongo.MongoClient(mongodb_url)
self.mongodb_collection = self.mongodb_client[mongodb_database][mongodb_collection]
self.table_name = cratedb_table
self.table_name = quote_relation_name(cratedb_table)
self.cdc = MongoDBCDCTranslatorCrateDB(table_name=self.table_name)

def start(self):
Expand All @@ -45,7 +46,7 @@ def start(self):
for sql in self.cdc_to_sql():
if sql:
connection.execute(sa.text(sql))
connection.execute(sa.text(f"REFRESH TABLE {self.cdc.quote_table_name(self.table_name)};"))
connection.execute(sa.text(f"REFRESH TABLE {self.table_name};"))

def cdc_to_sql(self):
"""
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ dependencies = [
"backports-strenum<1.3; python_version<'3.11'",
"cattrs<24",
"simplejson<4",
"sqlalchemy-cratedb>=0.39.0",
"toolz<0.13",
]
optional-dependencies.all = [
Expand Down
17 changes: 4 additions & 13 deletions src/commons_codec/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
import sys
import typing as t
from enum import auto
from functools import cached_property

from attr import Factory
from attrs import define
from sqlalchemy_cratedb.support import quote_relation_name

if sys.version_info >= (3, 11):
from enum import StrEnum
Expand All @@ -17,22 +19,11 @@ class TableAddress:
schema: str
table: str

@property
@cached_property
def fqn(self):
if not self.schema:
raise ValueError("Unable to compute a full-qualified table name without schema name")
return f"{self.quote_identifier(self.schema)}.{self.quote_identifier(self.table)}"

@staticmethod
def quote_identifier(name: str) -> str:
"""
Poor man's table quoting.
TODO: Better use or vendorize canonical table quoting function from CrateDB Toolkit, when applicable.
"""
if name and '"' not in name:
name = f'"{name}"'
return name
return quote_relation_name(f"{self.schema}.{self.table}")


class ColumnType(StrEnum):
Expand Down
14 changes: 2 additions & 12 deletions src/commons_codec/transform/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import typing as t

import toolz
from sqlalchemy_cratedb.support import quote_relation_name

from commons_codec.model import (
SQLOperation,
Expand Down Expand Up @@ -48,7 +49,7 @@ class DynamoTranslatorBase:

def __init__(self, table_name: str):
super().__init__()
self.table_name = self.quote_table_name(table_name)
self.table_name = quote_relation_name(table_name)
self.deserializer = CrateDBTypeDeserializer()

@property
Expand All @@ -58,17 +59,6 @@ def sql_ddl(self):
"""
return f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.DATA_COLUMN} OBJECT(DYNAMIC));"

@staticmethod
def quote_table_name(name: str):
"""
Poor man's table quoting.
TODO: Better use or vendorize canonical table quoting function from CrateDB Toolkit, when applicable.
"""
if '"' not in name and "." not in name:
name = f'"{name}"'
return name

def decode_record(self, item: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
"""
Deserialize DynamoDB JSON record into vanilla Python.
Expand Down
14 changes: 2 additions & 12 deletions src/commons_codec/transform/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import typing as t

from bson.json_util import _json_convert
from sqlalchemy_cratedb.support import quote_relation_name

from commons_codec.model import SQLOperation

Expand Down Expand Up @@ -101,7 +102,7 @@ class MongoDBCDCTranslatorCrateDB(MongoDBCDCTranslatorBase):

def __init__(self, table_name: str):
super().__init__()
self.table_name = self.quote_table_name(table_name)
self.table_name = quote_relation_name(table_name)

@property
def sql_ddl(self):
Expand Down Expand Up @@ -184,14 +185,3 @@ def where_clause(self, record: t.Dict[str, t.Any]) -> str:
"""
oid = self.get_document_key(record)
return f"oid = '{oid}'"

@staticmethod
def quote_table_name(name: str):
"""
Poor man's table quoting.
TODO: Better use or vendorize canonical table quoting function from CrateDB Toolkit, when applicable.
"""
if '"' not in name:
name = f'"{name}"'
return name
11 changes: 8 additions & 3 deletions tests/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,21 @@
from commons_codec.model import ColumnType, ColumnTypeMapStore, TableAddress


def test_table_address_success():
def test_table_address_basic():
ta = TableAddress(schema="foo", table="bar")
assert ta.fqn == '"foo"."bar"'
assert ta.fqn == "foo.bar"


def test_table_address_quoting():
ta = TableAddress(schema="select", table="from")
assert ta.fqn == '"select"."from"'


def test_table_address_failure():
ta = TableAddress(schema=None, table="bar")
with pytest.raises(ValueError) as ex:
_ = ta.fqn
assert ex.match("adcdc")
assert ex.match("Unable to compute a full-qualified table name without schema name")


def test_column_type_map_store_serialize():
Expand Down
10 changes: 5 additions & 5 deletions tests/transform/test_aws_dms.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,19 +213,19 @@ def test_decode_cdc_unknown_event(cdc):

def test_decode_cdc_sql_ddl_regular(cdc):
assert cdc.to_sql(MSG_CONTROL_CREATE_TABLE) == SQLOperation(
statement='CREATE TABLE IF NOT EXISTS "public"."foo" (data OBJECT(DYNAMIC));', parameters=None
statement="CREATE TABLE IF NOT EXISTS public.foo (data OBJECT(DYNAMIC));", parameters=None
)


def test_decode_cdc_sql_ddl_awsdms(cdc):
assert cdc.to_sql(MSG_CONTROL_AWSDMS) == SQLOperation(
statement='CREATE TABLE IF NOT EXISTS "dms"."awsdms_apply_exceptions" (data OBJECT(DYNAMIC));', parameters=None
statement="CREATE TABLE IF NOT EXISTS dms.awsdms_apply_exceptions (data OBJECT(DYNAMIC));", parameters=None
)


def test_decode_cdc_insert(cdc):
assert cdc.to_sql(MSG_DATA_INSERT) == SQLOperation(
statement='INSERT INTO "public"."foo" (data) VALUES (:record);', parameters={"record": RECORD_INSERT}
statement="INSERT INTO public.foo (data) VALUES (:record);", parameters={"record": RECORD_INSERT}
)


Expand All @@ -238,7 +238,7 @@ def test_decode_cdc_update_success(cdc):

# Emulate an UPDATE operation.
assert cdc.to_sql(MSG_DATA_UPDATE_VALUE) == SQLOperation(
statement='UPDATE "public"."foo" SET '
statement="UPDATE public.foo SET "
"data['age']=:age, data['attributes']=:attributes, data['name']=:name "
"WHERE data['id']=:id;",
parameters=RECORD_UPDATE,
Expand Down Expand Up @@ -267,7 +267,7 @@ def test_decode_cdc_delete_success(cdc):

# Emulate a DELETE operation.
assert cdc.to_sql(MSG_DATA_DELETE) == SQLOperation(
statement='DELETE FROM "public"."foo" WHERE data[\'id\']=:id;', parameters={"id": 45}
statement="DELETE FROM public.foo WHERE data['id']=:id;", parameters={"id": 45}
)


Expand Down
12 changes: 6 additions & 6 deletions tests/transform/test_dynamodb_cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def test_decode_ddb_deserialize_type():


def test_decode_cdc_sql_ddl():
assert DynamoDBCDCTranslator(table_name="foo").sql_ddl == 'CREATE TABLE IF NOT EXISTS "foo" (data OBJECT(DYNAMIC));'
assert DynamoDBCDCTranslator(table_name="foo").sql_ddl == "CREATE TABLE IF NOT EXISTS foo (data OBJECT(DYNAMIC));"


def test_decode_cdc_unknown_source():
Expand All @@ -196,7 +196,7 @@ def test_decode_cdc_unknown_event():

def test_decode_cdc_insert_basic():
assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_INSERT_BASIC) == SQLOperation(
statement='INSERT INTO "foo" (data) VALUES (:record);',
statement="INSERT INTO foo (data) VALUES (:record);",
parameters={
"record": {
"humidity": 84.84,
Expand All @@ -213,7 +213,7 @@ def test_decode_cdc_insert_basic():

def test_decode_cdc_insert_nested():
assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_INSERT_NESTED) == SQLOperation(
statement='INSERT INTO "foo" (data) VALUES (:record);',
statement="INSERT INTO foo (data) VALUES (:record);",
parameters={
"record": {
"id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266",
Expand All @@ -230,7 +230,7 @@ def test_decode_cdc_insert_nested():

def test_decode_cdc_modify_basic():
assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_MODIFY_BASIC) == SQLOperation(
statement='UPDATE "foo" SET '
statement="UPDATE foo SET "
"data['humidity']=:humidity, data['temperature']=:temperature, data['location']=:location, "
"data['string_set']=:string_set, data['number_set']=:number_set, data['binary_set']=:binary_set, "
"data['empty_string']=:empty_string, data['null_string']=:null_string "
Expand All @@ -252,7 +252,7 @@ def test_decode_cdc_modify_basic():

def test_decode_cdc_modify_nested():
assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_MODIFY_NESTED) == SQLOperation(
statement='UPDATE "foo" SET '
statement="UPDATE foo SET "
"data['tags']=:tags, data['empty_map']=CAST(:empty_map AS OBJECT), data['empty_list']=:empty_list, "
"data['string_set']=:string_set, data['number_set']=:number_set, data['binary_set']=:binary_set, "
"data['somemap']=CAST(:somemap AS OBJECT), data['list_of_objects']=CAST(:list_of_objects AS OBJECT[]) "
Expand All @@ -274,7 +274,7 @@ def test_decode_cdc_modify_nested():

def test_decode_cdc_remove():
assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_REMOVE) == SQLOperation(
statement="DELETE FROM \"foo\" WHERE data['device']=:device AND data['timestamp']=:timestamp;",
statement="DELETE FROM foo WHERE data['device']=:device AND data['timestamp']=:timestamp;",
parameters={
"device": "bar",
"timestamp": "2024-07-12T01:17:42",
Expand Down
7 changes: 3 additions & 4 deletions tests/transform/test_dynamodb_full.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,13 @@

def test_sql_ddl():
assert (
DynamoDBFullLoadTranslator(table_name="foo").sql_ddl
== 'CREATE TABLE IF NOT EXISTS "foo" (data OBJECT(DYNAMIC));'
DynamoDBFullLoadTranslator(table_name="foo").sql_ddl == "CREATE TABLE IF NOT EXISTS foo (data OBJECT(DYNAMIC));"
)


def test_to_sql_all_types():
assert DynamoDBFullLoadTranslator(table_name="foo").to_sql(RECORD_ALL_TYPES) == SQLOperation(
statement='INSERT INTO "foo" (data) VALUES (:record);',
statement="INSERT INTO foo (data) VALUES (:record);",
parameters={
"record": {
"id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266",
Expand All @@ -59,7 +58,7 @@ def test_to_sql_all_types():

def test_to_sql_list_of_objects():
assert DynamoDBFullLoadTranslator(table_name="foo").to_sql(RECORD_UTM) == SQLOperation(
statement='INSERT INTO "foo" (data) VALUES (:record);',
statement="INSERT INTO foo (data) VALUES (:record);",
parameters={
"record": {
"utmTags": [
Expand Down
10 changes: 5 additions & 5 deletions tests/transform/test_mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
def test_decode_cdc_sql_ddl():
assert (
MongoDBCDCTranslatorCrateDB(table_name="foo").sql_ddl
== 'CREATE TABLE IF NOT EXISTS "foo" (oid TEXT, data OBJECT(DYNAMIC));'
== "CREATE TABLE IF NOT EXISTS foo (oid TEXT, data OBJECT(DYNAMIC));"
)


Expand All @@ -120,7 +120,7 @@ def test_decode_cdc_optype_empty():

def test_decode_cdc_insert():
assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_INSERT) == SQLOperation(
statement='INSERT INTO "foo" (oid, data) VALUES (:oid, :record);',
statement="INSERT INTO foo (oid, data) VALUES (:oid, :record);",
parameters={
"oid": "669683c2b0750b2c84893f3e",
"record": {
Expand All @@ -135,7 +135,7 @@ def test_decode_cdc_insert():

def test_decode_cdc_update():
assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_UPDATE) == SQLOperation(
statement="UPDATE \"foo\" SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';",
statement="UPDATE foo SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';",
parameters={
"record": {
"_id": {"$oid": "669683c2b0750b2c84893f3e"},
Expand All @@ -149,14 +149,14 @@ def test_decode_cdc_update():

def test_decode_cdc_replace():
assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_REPLACE) == SQLOperation(
statement="UPDATE \"foo\" SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';",
statement="UPDATE foo SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';",
parameters={"record": {"_id": {"$oid": "669683c2b0750b2c84893f3e"}, "tags": ["deleted"]}},
)


def test_decode_cdc_delete():
assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_DELETE) == SQLOperation(
statement="DELETE FROM \"foo\" WHERE oid = '669693c5002ef91ea9c7a562';", parameters=None
statement="DELETE FROM foo WHERE oid = '669693c5002ef91ea9c7a562';", parameters=None
)


Expand Down

0 comments on commit 664a5f7

Please sign in to comment.