diff --git a/CHANGES.md b/CHANGES.md index 3b825f2..9361d63 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,8 @@ # Changelog ## Unreleased +- Removed poor man's relation name quoting. When needed, use + `quote_relation_name` from `sqlalchemy-cratedb` package. ## 2024/08/27 v0.0.13 - DMS/DynamoDB: Use parameterized SQL WHERE clauses instead of inlining values diff --git a/examples/mongodb_cdc_cratedb.py b/examples/mongodb_cdc_cratedb.py index 25fdf00..f117661 100644 --- a/examples/mongodb_cdc_cratedb.py +++ b/examples/mongodb_cdc_cratedb.py @@ -45,7 +45,7 @@ def start(self): for sql in self.cdc_to_sql(): if sql: connection.execute(sa.text(sql)) - connection.execute(sa.text(f"REFRESH TABLE {self.cdc.quote_table_name(self.table_name)};")) + connection.execute(sa.text(f"REFRESH TABLE {self.table_name};")) def cdc_to_sql(self): """ diff --git a/pyproject.toml b/pyproject.toml index cd82f5d..30378e2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -140,6 +140,7 @@ optional-dependencies.test = [ "pytest<9", "pytest-cov<6", "pytest-mock<4", + "sqlalchemy-cratedb>=0.39.0", ] optional-dependencies.zyp = [ "jmespath<1.1", diff --git a/src/commons_codec/model.py b/src/commons_codec/model.py index 93f1e21..74de525 100644 --- a/src/commons_codec/model.py +++ b/src/commons_codec/model.py @@ -2,6 +2,7 @@ import sys import typing as t from enum import auto +from functools import cached_property from attr import Factory from attrs import define @@ -11,28 +12,25 @@ else: from backports.strenum import StrEnum # pragma: no cover +try: + from sqlalchemy_cratedb.support import quote_relation_name +except ImportError: # pragma: no cover + quote_relation_name = None + @define(frozen=True) class TableAddress: schema: str table: str - @property + @cached_property def fqn(self): if not self.schema: raise ValueError("Unable to compute a full-qualified table name without schema name") - return f"{self.quote_identifier(self.schema)}.{self.quote_identifier(self.table)}" - - @staticmethod - def quote_identifier(name: str) -> str: - """ - Poor man's table quoting. - - TODO: Better use or vendorize canonical table quoting function from CrateDB Toolkit, when applicable. - """ - if name and '"' not in name: - name = f'"{name}"' - return name + identifier = f"{self.schema}.{self.table}" + if quote_relation_name: + identifier = quote_relation_name(identifier) + return identifier class ColumnType(StrEnum): diff --git a/src/commons_codec/transform/dynamodb.py b/src/commons_codec/transform/dynamodb.py index 9736ddc..71f55e9 100644 --- a/src/commons_codec/transform/dynamodb.py +++ b/src/commons_codec/transform/dynamodb.py @@ -48,7 +48,7 @@ class DynamoTranslatorBase: def __init__(self, table_name: str): super().__init__() - self.table_name = self.quote_table_name(table_name) + self.table_name = table_name self.deserializer = CrateDBTypeDeserializer() @property @@ -58,17 +58,6 @@ def sql_ddl(self): """ return f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.DATA_COLUMN} OBJECT(DYNAMIC));" - @staticmethod - def quote_table_name(name: str): - """ - Poor man's table quoting. - - TODO: Better use or vendorize canonical table quoting function from CrateDB Toolkit, when applicable. - """ - if '"' not in name and "." not in name: - name = f'"{name}"' - return name - def decode_record(self, item: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: """ Deserialize DynamoDB JSON record into vanilla Python. diff --git a/src/commons_codec/transform/mongodb.py b/src/commons_codec/transform/mongodb.py index aabbd72..98a58bc 100644 --- a/src/commons_codec/transform/mongodb.py +++ b/src/commons_codec/transform/mongodb.py @@ -101,7 +101,7 @@ class MongoDBCDCTranslatorCrateDB(MongoDBCDCTranslatorBase): def __init__(self, table_name: str): super().__init__() - self.table_name = self.quote_table_name(table_name) + self.table_name = table_name @property def sql_ddl(self): @@ -184,14 +184,3 @@ def where_clause(self, record: t.Dict[str, t.Any]) -> str: """ oid = self.get_document_key(record) return f"oid = '{oid}'" - - @staticmethod - def quote_table_name(name: str): - """ - Poor man's table quoting. - - TODO: Better use or vendorize canonical table quoting function from CrateDB Toolkit, when applicable. - """ - if '"' not in name: - name = f'"{name}"' - return name diff --git a/tests/test_model.py b/tests/test_model.py index 881a1d5..d1194dc 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -3,16 +3,21 @@ from commons_codec.model import ColumnType, ColumnTypeMapStore, TableAddress -def test_table_address_success(): +def test_table_address_basic(): ta = TableAddress(schema="foo", table="bar") - assert ta.fqn == '"foo"."bar"' + assert ta.fqn == "foo.bar" + + +def test_table_address_quoting(): + ta = TableAddress(schema="select", table="from") + assert ta.fqn == '"select"."from"' def test_table_address_failure(): ta = TableAddress(schema=None, table="bar") with pytest.raises(ValueError) as ex: _ = ta.fqn - assert ex.match("adcdc") + assert ex.match("Unable to compute a full-qualified table name without schema name") def test_column_type_map_store_serialize(): diff --git a/tests/transform/test_aws_dms.py b/tests/transform/test_aws_dms.py index 40a33b7..bb2ef3c 100644 --- a/tests/transform/test_aws_dms.py +++ b/tests/transform/test_aws_dms.py @@ -213,19 +213,19 @@ def test_decode_cdc_unknown_event(cdc): def test_decode_cdc_sql_ddl_regular(cdc): assert cdc.to_sql(MSG_CONTROL_CREATE_TABLE) == SQLOperation( - statement='CREATE TABLE IF NOT EXISTS "public"."foo" (data OBJECT(DYNAMIC));', parameters=None + statement="CREATE TABLE IF NOT EXISTS public.foo (data OBJECT(DYNAMIC));", parameters=None ) def test_decode_cdc_sql_ddl_awsdms(cdc): assert cdc.to_sql(MSG_CONTROL_AWSDMS) == SQLOperation( - statement='CREATE TABLE IF NOT EXISTS "dms"."awsdms_apply_exceptions" (data OBJECT(DYNAMIC));', parameters=None + statement="CREATE TABLE IF NOT EXISTS dms.awsdms_apply_exceptions (data OBJECT(DYNAMIC));", parameters=None ) def test_decode_cdc_insert(cdc): assert cdc.to_sql(MSG_DATA_INSERT) == SQLOperation( - statement='INSERT INTO "public"."foo" (data) VALUES (:record);', parameters={"record": RECORD_INSERT} + statement="INSERT INTO public.foo (data) VALUES (:record);", parameters={"record": RECORD_INSERT} ) @@ -238,7 +238,7 @@ def test_decode_cdc_update_success(cdc): # Emulate an UPDATE operation. assert cdc.to_sql(MSG_DATA_UPDATE_VALUE) == SQLOperation( - statement='UPDATE "public"."foo" SET ' + statement="UPDATE public.foo SET " "data['age']=:age, data['attributes']=:attributes, data['name']=:name " "WHERE data['id']=:id;", parameters=RECORD_UPDATE, @@ -267,7 +267,7 @@ def test_decode_cdc_delete_success(cdc): # Emulate a DELETE operation. assert cdc.to_sql(MSG_DATA_DELETE) == SQLOperation( - statement='DELETE FROM "public"."foo" WHERE data[\'id\']=:id;', parameters={"id": 45} + statement="DELETE FROM public.foo WHERE data['id']=:id;", parameters={"id": 45} ) diff --git a/tests/transform/test_dynamodb_cdc.py b/tests/transform/test_dynamodb_cdc.py index cdd1dec..c27b7f0 100644 --- a/tests/transform/test_dynamodb_cdc.py +++ b/tests/transform/test_dynamodb_cdc.py @@ -179,7 +179,7 @@ def test_decode_ddb_deserialize_type(): def test_decode_cdc_sql_ddl(): - assert DynamoDBCDCTranslator(table_name="foo").sql_ddl == 'CREATE TABLE IF NOT EXISTS "foo" (data OBJECT(DYNAMIC));' + assert DynamoDBCDCTranslator(table_name="foo").sql_ddl == "CREATE TABLE IF NOT EXISTS foo (data OBJECT(DYNAMIC));" def test_decode_cdc_unknown_source(): @@ -196,7 +196,7 @@ def test_decode_cdc_unknown_event(): def test_decode_cdc_insert_basic(): assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_INSERT_BASIC) == SQLOperation( - statement='INSERT INTO "foo" (data) VALUES (:record);', + statement="INSERT INTO foo (data) VALUES (:record);", parameters={ "record": { "humidity": 84.84, @@ -213,7 +213,7 @@ def test_decode_cdc_insert_basic(): def test_decode_cdc_insert_nested(): assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_INSERT_NESTED) == SQLOperation( - statement='INSERT INTO "foo" (data) VALUES (:record);', + statement="INSERT INTO foo (data) VALUES (:record);", parameters={ "record": { "id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266", @@ -230,7 +230,7 @@ def test_decode_cdc_insert_nested(): def test_decode_cdc_modify_basic(): assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_MODIFY_BASIC) == SQLOperation( - statement='UPDATE "foo" SET ' + statement="UPDATE foo SET " "data['humidity']=:humidity, data['temperature']=:temperature, data['location']=:location, " "data['string_set']=:string_set, data['number_set']=:number_set, data['binary_set']=:binary_set, " "data['empty_string']=:empty_string, data['null_string']=:null_string " @@ -252,7 +252,7 @@ def test_decode_cdc_modify_basic(): def test_decode_cdc_modify_nested(): assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_MODIFY_NESTED) == SQLOperation( - statement='UPDATE "foo" SET ' + statement="UPDATE foo SET " "data['tags']=:tags, data['empty_map']=CAST(:empty_map AS OBJECT), data['empty_list']=:empty_list, " "data['string_set']=:string_set, data['number_set']=:number_set, data['binary_set']=:binary_set, " "data['somemap']=CAST(:somemap AS OBJECT), data['list_of_objects']=CAST(:list_of_objects AS OBJECT[]) " @@ -274,7 +274,7 @@ def test_decode_cdc_modify_nested(): def test_decode_cdc_remove(): assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_REMOVE) == SQLOperation( - statement="DELETE FROM \"foo\" WHERE data['device']=:device AND data['timestamp']=:timestamp;", + statement="DELETE FROM foo WHERE data['device']=:device AND data['timestamp']=:timestamp;", parameters={ "device": "bar", "timestamp": "2024-07-12T01:17:42", diff --git a/tests/transform/test_dynamodb_full.py b/tests/transform/test_dynamodb_full.py index cfac06f..bb1887e 100644 --- a/tests/transform/test_dynamodb_full.py +++ b/tests/transform/test_dynamodb_full.py @@ -35,14 +35,13 @@ def test_sql_ddl(): assert ( - DynamoDBFullLoadTranslator(table_name="foo").sql_ddl - == 'CREATE TABLE IF NOT EXISTS "foo" (data OBJECT(DYNAMIC));' + DynamoDBFullLoadTranslator(table_name="foo").sql_ddl == "CREATE TABLE IF NOT EXISTS foo (data OBJECT(DYNAMIC));" ) def test_to_sql_all_types(): assert DynamoDBFullLoadTranslator(table_name="foo").to_sql(RECORD_ALL_TYPES) == SQLOperation( - statement='INSERT INTO "foo" (data) VALUES (:record);', + statement="INSERT INTO foo (data) VALUES (:record);", parameters={ "record": { "id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266", @@ -59,7 +58,7 @@ def test_to_sql_all_types(): def test_to_sql_list_of_objects(): assert DynamoDBFullLoadTranslator(table_name="foo").to_sql(RECORD_UTM) == SQLOperation( - statement='INSERT INTO "foo" (data) VALUES (:record);', + statement="INSERT INTO foo (data) VALUES (:record);", parameters={ "record": { "utmTags": [ diff --git a/tests/transform/test_mongodb.py b/tests/transform/test_mongodb.py index e2d6441..494aa9f 100644 --- a/tests/transform/test_mongodb.py +++ b/tests/transform/test_mongodb.py @@ -96,7 +96,7 @@ def test_decode_cdc_sql_ddl(): assert ( MongoDBCDCTranslatorCrateDB(table_name="foo").sql_ddl - == 'CREATE TABLE IF NOT EXISTS "foo" (oid TEXT, data OBJECT(DYNAMIC));' + == "CREATE TABLE IF NOT EXISTS foo (oid TEXT, data OBJECT(DYNAMIC));" ) @@ -120,7 +120,7 @@ def test_decode_cdc_optype_empty(): def test_decode_cdc_insert(): assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_INSERT) == SQLOperation( - statement='INSERT INTO "foo" (oid, data) VALUES (:oid, :record);', + statement="INSERT INTO foo (oid, data) VALUES (:oid, :record);", parameters={ "oid": "669683c2b0750b2c84893f3e", "record": { @@ -135,7 +135,7 @@ def test_decode_cdc_insert(): def test_decode_cdc_update(): assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_UPDATE) == SQLOperation( - statement="UPDATE \"foo\" SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';", + statement="UPDATE foo SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';", parameters={ "record": { "_id": {"$oid": "669683c2b0750b2c84893f3e"}, @@ -149,14 +149,14 @@ def test_decode_cdc_update(): def test_decode_cdc_replace(): assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_REPLACE) == SQLOperation( - statement="UPDATE \"foo\" SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';", + statement="UPDATE foo SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';", parameters={"record": {"_id": {"$oid": "669683c2b0750b2c84893f3e"}, "tags": ["deleted"]}}, ) def test_decode_cdc_delete(): assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_DELETE) == SQLOperation( - statement="DELETE FROM \"foo\" WHERE oid = '669693c5002ef91ea9c7a562';", parameters=None + statement="DELETE FROM foo WHERE oid = '669693c5002ef91ea9c7a562';", parameters=None )