Skip to content

Commit

Permalink
Remove poor man's relation name quoting
Browse files Browse the repository at this point in the history
When needed, use `quote_relation_name` from `sqlalchemy-cratedb`
package.
  • Loading branch information
Andreas Motl committed Aug 29, 2024
1 parent 3e32304 commit d42fdad
Show file tree
Hide file tree
Showing 11 changed files with 44 additions and 61 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Changelog

## Unreleased
- Removed poor man's relation name quoting. When needed, use
`quote_relation_name` from `sqlalchemy-cratedb` package.

## 2024/08/27 v0.0.13
- DMS/DynamoDB: Use parameterized SQL WHERE clauses instead of inlining values
Expand Down
2 changes: 1 addition & 1 deletion examples/mongodb_cdc_cratedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def start(self):
for sql in self.cdc_to_sql():
if sql:
connection.execute(sa.text(sql))
connection.execute(sa.text(f"REFRESH TABLE {self.cdc.quote_table_name(self.table_name)};"))
connection.execute(sa.text(f"REFRESH TABLE {self.table_name};"))

def cdc_to_sql(self):
"""
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ optional-dependencies.test = [
"pytest<9",
"pytest-cov<6",
"pytest-mock<4",
"sqlalchemy-cratedb>=0.39.0",
]
optional-dependencies.zyp = [
"jmespath<1.1",
Expand Down
24 changes: 11 additions & 13 deletions src/commons_codec/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import sys
import typing as t
from enum import auto
from functools import cached_property

from attr import Factory
from attrs import define
Expand All @@ -11,28 +12,25 @@
else:
from backports.strenum import StrEnum # pragma: no cover

try:
from sqlalchemy_cratedb.support import quote_relation_name
except ImportError: # pragma: no cover
quote_relation_name = None


@define(frozen=True)
class TableAddress:
schema: str
table: str

@property
@cached_property
def fqn(self):
if not self.schema:
raise ValueError("Unable to compute a full-qualified table name without schema name")
return f"{self.quote_identifier(self.schema)}.{self.quote_identifier(self.table)}"

@staticmethod
def quote_identifier(name: str) -> str:
"""
Poor man's table quoting.
TODO: Better use or vendorize canonical table quoting function from CrateDB Toolkit, when applicable.
"""
if name and '"' not in name:
name = f'"{name}"'
return name
identifier = f"{self.schema}.{self.table}"
if quote_relation_name:
identifier = quote_relation_name(identifier)
return identifier


class ColumnType(StrEnum):
Expand Down
13 changes: 1 addition & 12 deletions src/commons_codec/transform/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class DynamoTranslatorBase:

def __init__(self, table_name: str):
super().__init__()
self.table_name = self.quote_table_name(table_name)
self.table_name = table_name
self.deserializer = CrateDBTypeDeserializer()

@property
Expand All @@ -58,17 +58,6 @@ def sql_ddl(self):
"""
return f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.DATA_COLUMN} OBJECT(DYNAMIC));"

@staticmethod
def quote_table_name(name: str):
"""
Poor man's table quoting.
TODO: Better use or vendorize canonical table quoting function from CrateDB Toolkit, when applicable.
"""
if '"' not in name and "." not in name:
name = f'"{name}"'
return name

def decode_record(self, item: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
"""
Deserialize DynamoDB JSON record into vanilla Python.
Expand Down
13 changes: 1 addition & 12 deletions src/commons_codec/transform/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class MongoDBCDCTranslatorCrateDB(MongoDBCDCTranslatorBase):

def __init__(self, table_name: str):
super().__init__()
self.table_name = self.quote_table_name(table_name)
self.table_name = table_name

@property
def sql_ddl(self):
Expand Down Expand Up @@ -184,14 +184,3 @@ def where_clause(self, record: t.Dict[str, t.Any]) -> str:
"""
oid = self.get_document_key(record)
return f"oid = '{oid}'"

@staticmethod
def quote_table_name(name: str):
"""
Poor man's table quoting.
TODO: Better use or vendorize canonical table quoting function from CrateDB Toolkit, when applicable.
"""
if '"' not in name:
name = f'"{name}"'
return name
11 changes: 8 additions & 3 deletions tests/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,21 @@
from commons_codec.model import ColumnType, ColumnTypeMapStore, TableAddress


def test_table_address_success():
def test_table_address_basic():
ta = TableAddress(schema="foo", table="bar")
assert ta.fqn == '"foo"."bar"'
assert ta.fqn == "foo.bar"


def test_table_address_quoting():
ta = TableAddress(schema="select", table="from")
assert ta.fqn == '"select"."from"'


def test_table_address_failure():
ta = TableAddress(schema=None, table="bar")
with pytest.raises(ValueError) as ex:
_ = ta.fqn
assert ex.match("adcdc")
assert ex.match("Unable to compute a full-qualified table name without schema name")


def test_column_type_map_store_serialize():
Expand Down
10 changes: 5 additions & 5 deletions tests/transform/test_aws_dms.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,19 +213,19 @@ def test_decode_cdc_unknown_event(cdc):

def test_decode_cdc_sql_ddl_regular(cdc):
assert cdc.to_sql(MSG_CONTROL_CREATE_TABLE) == SQLOperation(
statement='CREATE TABLE IF NOT EXISTS "public"."foo" (data OBJECT(DYNAMIC));', parameters=None
statement="CREATE TABLE IF NOT EXISTS public.foo (data OBJECT(DYNAMIC));", parameters=None
)


def test_decode_cdc_sql_ddl_awsdms(cdc):
assert cdc.to_sql(MSG_CONTROL_AWSDMS) == SQLOperation(
statement='CREATE TABLE IF NOT EXISTS "dms"."awsdms_apply_exceptions" (data OBJECT(DYNAMIC));', parameters=None
statement="CREATE TABLE IF NOT EXISTS dms.awsdms_apply_exceptions (data OBJECT(DYNAMIC));", parameters=None
)


def test_decode_cdc_insert(cdc):
assert cdc.to_sql(MSG_DATA_INSERT) == SQLOperation(
statement='INSERT INTO "public"."foo" (data) VALUES (:record);', parameters={"record": RECORD_INSERT}
statement="INSERT INTO public.foo (data) VALUES (:record);", parameters={"record": RECORD_INSERT}
)


Expand All @@ -238,7 +238,7 @@ def test_decode_cdc_update_success(cdc):

# Emulate an UPDATE operation.
assert cdc.to_sql(MSG_DATA_UPDATE_VALUE) == SQLOperation(
statement='UPDATE "public"."foo" SET '
statement="UPDATE public.foo SET "
"data['age']=:age, data['attributes']=:attributes, data['name']=:name "
"WHERE data['id']=:id;",
parameters=RECORD_UPDATE,
Expand Down Expand Up @@ -267,7 +267,7 @@ def test_decode_cdc_delete_success(cdc):

# Emulate a DELETE operation.
assert cdc.to_sql(MSG_DATA_DELETE) == SQLOperation(
statement='DELETE FROM "public"."foo" WHERE data[\'id\']=:id;', parameters={"id": 45}
statement="DELETE FROM public.foo WHERE data['id']=:id;", parameters={"id": 45}
)


Expand Down
12 changes: 6 additions & 6 deletions tests/transform/test_dynamodb_cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def test_decode_ddb_deserialize_type():


def test_decode_cdc_sql_ddl():
assert DynamoDBCDCTranslator(table_name="foo").sql_ddl == 'CREATE TABLE IF NOT EXISTS "foo" (data OBJECT(DYNAMIC));'
assert DynamoDBCDCTranslator(table_name="foo").sql_ddl == "CREATE TABLE IF NOT EXISTS foo (data OBJECT(DYNAMIC));"


def test_decode_cdc_unknown_source():
Expand All @@ -196,7 +196,7 @@ def test_decode_cdc_unknown_event():

def test_decode_cdc_insert_basic():
assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_INSERT_BASIC) == SQLOperation(
statement='INSERT INTO "foo" (data) VALUES (:record);',
statement="INSERT INTO foo (data) VALUES (:record);",
parameters={
"record": {
"humidity": 84.84,
Expand All @@ -213,7 +213,7 @@ def test_decode_cdc_insert_basic():

def test_decode_cdc_insert_nested():
assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_INSERT_NESTED) == SQLOperation(
statement='INSERT INTO "foo" (data) VALUES (:record);',
statement="INSERT INTO foo (data) VALUES (:record);",
parameters={
"record": {
"id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266",
Expand All @@ -230,7 +230,7 @@ def test_decode_cdc_insert_nested():

def test_decode_cdc_modify_basic():
assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_MODIFY_BASIC) == SQLOperation(
statement='UPDATE "foo" SET '
statement="UPDATE foo SET "
"data['humidity']=:humidity, data['temperature']=:temperature, data['location']=:location, "
"data['string_set']=:string_set, data['number_set']=:number_set, data['binary_set']=:binary_set, "
"data['empty_string']=:empty_string, data['null_string']=:null_string "
Expand All @@ -252,7 +252,7 @@ def test_decode_cdc_modify_basic():

def test_decode_cdc_modify_nested():
assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_MODIFY_NESTED) == SQLOperation(
statement='UPDATE "foo" SET '
statement="UPDATE foo SET "
"data['tags']=:tags, data['empty_map']=CAST(:empty_map AS OBJECT), data['empty_list']=:empty_list, "
"data['string_set']=:string_set, data['number_set']=:number_set, data['binary_set']=:binary_set, "
"data['somemap']=CAST(:somemap AS OBJECT), data['list_of_objects']=CAST(:list_of_objects AS OBJECT[]) "
Expand All @@ -274,7 +274,7 @@ def test_decode_cdc_modify_nested():

def test_decode_cdc_remove():
assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_REMOVE) == SQLOperation(
statement="DELETE FROM \"foo\" WHERE data['device']=:device AND data['timestamp']=:timestamp;",
statement="DELETE FROM foo WHERE data['device']=:device AND data['timestamp']=:timestamp;",
parameters={
"device": "bar",
"timestamp": "2024-07-12T01:17:42",
Expand Down
7 changes: 3 additions & 4 deletions tests/transform/test_dynamodb_full.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,13 @@

def test_sql_ddl():
assert (
DynamoDBFullLoadTranslator(table_name="foo").sql_ddl
== 'CREATE TABLE IF NOT EXISTS "foo" (data OBJECT(DYNAMIC));'
DynamoDBFullLoadTranslator(table_name="foo").sql_ddl == "CREATE TABLE IF NOT EXISTS foo (data OBJECT(DYNAMIC));"
)


def test_to_sql_all_types():
assert DynamoDBFullLoadTranslator(table_name="foo").to_sql(RECORD_ALL_TYPES) == SQLOperation(
statement='INSERT INTO "foo" (data) VALUES (:record);',
statement="INSERT INTO foo (data) VALUES (:record);",
parameters={
"record": {
"id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266",
Expand All @@ -59,7 +58,7 @@ def test_to_sql_all_types():

def test_to_sql_list_of_objects():
assert DynamoDBFullLoadTranslator(table_name="foo").to_sql(RECORD_UTM) == SQLOperation(
statement='INSERT INTO "foo" (data) VALUES (:record);',
statement="INSERT INTO foo (data) VALUES (:record);",
parameters={
"record": {
"utmTags": [
Expand Down
10 changes: 5 additions & 5 deletions tests/transform/test_mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
def test_decode_cdc_sql_ddl():
assert (
MongoDBCDCTranslatorCrateDB(table_name="foo").sql_ddl
== 'CREATE TABLE IF NOT EXISTS "foo" (oid TEXT, data OBJECT(DYNAMIC));'
== "CREATE TABLE IF NOT EXISTS foo (oid TEXT, data OBJECT(DYNAMIC));"
)


Expand All @@ -120,7 +120,7 @@ def test_decode_cdc_optype_empty():

def test_decode_cdc_insert():
assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_INSERT) == SQLOperation(
statement='INSERT INTO "foo" (oid, data) VALUES (:oid, :record);',
statement="INSERT INTO foo (oid, data) VALUES (:oid, :record);",
parameters={
"oid": "669683c2b0750b2c84893f3e",
"record": {
Expand All @@ -135,7 +135,7 @@ def test_decode_cdc_insert():

def test_decode_cdc_update():
assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_UPDATE) == SQLOperation(
statement="UPDATE \"foo\" SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';",
statement="UPDATE foo SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';",
parameters={
"record": {
"_id": {"$oid": "669683c2b0750b2c84893f3e"},
Expand All @@ -149,14 +149,14 @@ def test_decode_cdc_update():

def test_decode_cdc_replace():
assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_REPLACE) == SQLOperation(
statement="UPDATE \"foo\" SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';",
statement="UPDATE foo SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';",
parameters={"record": {"_id": {"$oid": "669683c2b0750b2c84893f3e"}, "tags": ["deleted"]}},
)


def test_decode_cdc_delete():
assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_DELETE) == SQLOperation(
statement="DELETE FROM \"foo\" WHERE oid = '669693c5002ef91ea9c7a562';", parameters=None
statement="DELETE FROM foo WHERE oid = '669693c5002ef91ea9c7a562';", parameters=None
)


Expand Down

0 comments on commit d42fdad

Please sign in to comment.