To add a new connector, you need to create a new folder in the db_components
directory. Ideally, copy one of the existing folders (e.g. ex_mysql_cdc) and modify it to your needs.
The component specific files are:
- src/component.py - the main component file, where the component is defined. Inherit from the DebeziumComponent class.
- src/configuration.py - the configuration file, where the component configuration is defined. Inherit from the DebeziumConfiguration class.
- src/extractor/extractor.py - the extractor file, where the data is extracted from the database. Inherit from the DebeziumExtractor class.
Component UI and developer portal metadata are defined in the component_config
folder.
The configuration class is the main class that defines the configuration for the component. It should inherit from the DebeziumConfiguration
class.
Most of the configuration options are shared among all the components. Generally, you only need to implement the following pydantic objects.
If you need anything extra, the configuration classes/properties can be overridden.
from enum import Enum
from pydantic import Field
from db_components.db_common.configuration import DbConnectionOptions, DebeziumConfiguration
class Adapter(str, Enum):
    """String-valued enum of the database adapters the example connector can use."""

    mysql = "MySQL"
    mariadb = "MariaDB"
class XXDbOptions(DbConnectionOptions):
    """Connection options of the XX database; extends the shared DbConnectionOptions."""

    # Declare any additional options specific to the database here.
    user: str = Field()
    # the password is read from the "#password" key of the configuration
    password: str = Field(alias="#password")
    # MySQL is the default adapter
    adapter: Adapter = Adapter.mysql
class XXConfiguration(DebeziumConfiguration[XXDbOptions]):
    """Component configuration bound to the XX-specific connection options."""

    # This override is needed for proper functioning.
    db_settings: XXDbOptions
The extractor class is the main class that defines the extractor. It should inherit from the DebeziumExtractor
class and implement the following abstract methods.
It also requires a BaseTypeConverter class that converts source database types to BaseTypes:
from typing import Any
from pathlib import Path
from typing import Optional
from db_components.db_common.extractor import DebeziumExtractor
from db_components.db_common.table_schema import BaseTypeConverter
class XXBaseTypeConverter(BaseTypeConverter):
    """
    Translates column types reported by the source database into BaseTypes.
    """

    # Lower-case source type name -> BaseType name.
    # The key order is also the order returned by `supported_types()`.
    MAPPING = {
        "tinyint": "INTEGER",
        "smallint": "INTEGER",
        "mediumint": "INTEGER",
        "int": "INTEGER",
        "bigint": "INTEGER",
        "decimal": "NUMERIC",
        "float": "NUMERIC",
        "double": "NUMERIC",
        "bit": "STRING",
        "date": "DATE",
        "datetime": "TIMESTAMP",
        "timestamp": "TIMESTAMP",
        "time": "TIMESTAMP",
        "year": "INTEGER",
        "char": "STRING",
        "varchar": "STRING",
        "binary": "STRING",
        "varbinary": "STRING",
        "tinyblob": "STRING",
        "blob": "STRING",
        "mediumblob": "STRING",
        "longblob": "STRING",
        "tinytext": "STRING",
        "text": "STRING",
        "mediumtext": "STRING",
        "longtext": "STRING",
        "enum": "STRING",
        "set": "STRING",
    }

    @classmethod
    def supported_types(cls) -> list[str]:
        """
        Return the source type names known to MAPPING. This is used for schema conversion.
        Unsupported types will be converted to STRING.
        """
        return list(cls.MAPPING)

    def __call__(self, source_type: str, length: Optional[str] = None) -> str:
        """
        Convert a single source type to its BaseType name.

        Args:
            source_type: type name as reported by the source database (case-insensitive)
            length: optional declared length of the column

        Returns: the BaseType name; 'STRING' when the type is not in MAPPING
        """
        normalized = source_type.lower()

        # a `bit` column of length 1 is treated as a boolean flag
        is_single_bit = normalized == 'bit' and str(length) == '1'
        if is_single_bit or normalized == 'boolean':
            return 'BOOLEAN'

        return self.MAPPING.get(normalized, 'STRING')
class XXDebeziumExtractor(DebeziumExtractor):
    """Extractor template for the XX database built on top of DebeziumExtractor."""

    def __init__(self, db_credentials: XXDbOptions):
        """
        Assemble the JDBC parameters and pass them to the base extractor.

        Args:
            db_credentials: connection options (host, port, user, password are read here)
        """
        # the driver jar is resolved relative to this file, three directory levels up
        base_dir = Path(__file__).parent.parent.parent
        jar_paths = [(base_dir / 'jdbc/DRIVER.jar').as_posix()]

        credentials = {'user': db_credentials.user, 'password': db_credentials.password}
        connection_url = f'jdbc:mysql://{db_credentials.host}:{db_credentials.port}'

        super().__init__(db_credentials, jar_paths, credentials,
                         'com.mysql.cj.jdbc.Driver', connection_url, XXBaseTypeConverter())

    def test_has_replication_privilege(self):
        """
        Check whether the user has replication privileges. Used for connection testing.

        Template placeholder: implement for the specific database.
        """
        pass

    def validate_user_permissions(self):
        """
        Validate the user's permissions. Used for connection testing.

        Template placeholder: implement for the specific database.
        """
        pass

    def get_target_position(self) -> Any:
        """
        Get the current position in the source log.

        This can be left empty if it's not applicable. Many connectors may use DefaultStoppingCondition.

        Returns: any specific position representation for the database
        """
        pass
Once you have implemented the Configuration and Extractor classes, you can implement the Component class.
The component class is the main class that defines the component. It should inherit from the DebeziumComponent
class and implement the following abstract methods:
from db_components.db_common.cdc_component import DebeziumComponent
from db_components.db_common.table_schema import ColumnSchema, TableSchema
from db_components.debezium.executor import DefaultStoppingCondition, StoppingCondition
from db_components.db_common.cdc_component import DEFAULT_TOPIC_NAME
class XXCDCComponent(DebeziumComponent[XXDebeziumExtractor, XXConfiguration]):
    """CDC component template wiring XXDebeziumExtractor and XXConfiguration together."""

    def _build_stopping_condition(self) -> StoppingCondition:
        """
        Create the stopping condition for this connector from the sync options.

        If no existing stopping condition fits the connector, implement a new one.

        Returns: the stopping condition to use for the run
        """
        sync_options = self._configuration.sync_options
        # fall back to the component timeout when max_runtime_s is not set (falsy)
        runtime_limit_s = sync_options.max_runtime_s or self.component_timeout
        return DefaultStoppingCondition(runtime_limit_s, sync_options.max_wait_s)

    @property
    def has_schema_history(self) -> bool:
        """
        Tell whether the Debezium connector implemented by this component uses schema history.

        All connectors except Postgres and Mongo use schema history.
        """
        return True

    @property
    def system_columns(self) -> list[ColumnSchema]:
        """
        System columns of the component. They can be specific for each db type.

        e.g.
        [
            ColumnSchema(name="KBC__OPERATION", source_type="STRING"),
            ColumnSchema(name="KBC__EVENT_TIMESTAMP_MS", source_type="TIMESTAMP"),
            ColumnSchema(name="KBC__FILE", source_type="STRING"),
            ColumnSchema(name="KBC__POS", source_type="INTEGER"),
            ColumnSchema(name="KBC__DELETED", source_type="BOOLEAN"),
            ColumnSchema(name="KBC__BATCH_EVENT_ORDER", source_type="INTEGER")
        ]
        """
        return self.DEFAULT_SYSTEM_COLUMNS

    @property
    def system_column_name_mapping(self) -> dict[str, str]:
        """
        Mapping of column names coming from the Debezium connector to system columns (output names).

        e.g. {"kbc__operation": "KBC__OPERATION",
              "kbc__event_timestamp": "KBC__EVENT_TIMESTAMP_MS",
              "kbc__file": "KBC__FILE",
              "kbc__pos": "KBC__POS",
              "__deleted": "KBC__DELETED",
              "kbc__batch_event_order": "KBC__BATCH_EVENT_ORDER"}
        """
        return self.DEFAULT_SYSTEM_COLUMN_NAME_MAPPING

    def _get_db_specific_dbz_properties(self) -> dict:
        """Template placeholder for database-specific Debezium properties; implement per connector."""
        pass

    def get_tables_in_schema(self, schema: str) -> list[str]:
        """
        List the tables found for the given schema pattern.

        What a "schema" means differs between databases (e.g. database, schema), so this may need
        a database-specific implementation. In many cases the default implementation can be kept.

        Args:
            schema: schema pattern passed to the metadata provider

        Returns: names built as "<item 1>.<item 2>" of each row returned by the metadata provider
        """
        table_rows = self._client.metadata_provider.get_tables(schema_pattern=schema)
        qualified_names = []
        for table_row in table_rows:
            qualified_names.append(f"{table_row[1]}.{table_row[2]}")
        return qualified_names

    def get_table_metadata(self, schema: str, table: str) -> TableSchema:
        """
        Fetch the metadata of a single table from the metadata provider.

        What a "schema" means differs between databases (e.g. database, schema), so this may need
        a database-specific implementation. In many cases the default implementation can be kept.

        Args:
            schema: schema the table belongs to
            table: table name

        Returns: the table schema returned by the metadata provider
        """
        provider = self._client.metadata_provider
        return provider.get_table_metadata(schema=schema, table_name=table)

    def generate_table_key(self, schema: TableSchema, separator: str = '_') -> str:
        """
        Build the table key from the table schema. This is used for metadata indexing.

        It can be different for each database type. In many cases the default implementation can be kept.

        Args:
            schema: table schema providing `schema_name` and `name`
            separator: optional separator placed between the key parts

        Returns: topic name, schema name and table name joined by the separator
        """
        # TODO: change the topic name (testcdc)
        key_parts = (DEFAULT_TOPIC_NAME, schema.schema_name, schema.name)
        return separator.join(key_parts)
Debezium based CDC components functional tests are defined using the datadir
test framework.
The functional tests are abstracted in the db_components/debezium_core/tests package.
TestDatabaseEnvironment is the base class for the JDBC connection used to prepare the test environment.
It is initialized in the DebeziumCDCDatadirTest class and may be used in the set_up.py scripts to prepare the test cases.
prepare_initial_table(script_name)
- This method creates a test table using the provided SQL script.
  - The folder containing the scripts is defined by the db_test_traits.set_sql_traits_folder(sql_traits_path) method.
DebeziumCDCDatadirTest is the base class for all functional tests. It performs the following tasks:
- Initializes the TestDatabaseEnvironment and injects the instance into the context_parameters['db_client'] variable of the test object.
  - This can be accessed from individual tests from the set_up.py script.
- Initializes and runs the component.
- Cleans up the results so they can be compared.
  - Removes the KBC__EVENT_TIMESTAMP_MS column from the result, as this is a dynamic value that changes with each run.
  - Removes the id column from the debezium_signals table, as this is a dynamically generated UUID.
  - The results are always sorted by the event order, so we can safely delete these columns.
In the tests folder create the following:
- functional folder that contains the test cases (one folder per test case).
  - See examples in the db_components/ex_postgres_cdc/tests/functional folder.
- test_functional.py running the DataDir tests.
- sql_test_traits folder containing the SQL scripts for the test cases.
  - Example Postgres scripts are available in the /db_components/debezium/tests/db_test_traits folder; these can be converted using ChatGPT into other SQL dialects.
test_functional.py should contain the following:
import os
import unittest
from datadirtest import DataDirTester
from db_components.debezium.tests.db_test_traits import traits as db_test_traits
from db_components.debezium.tests.functional import DebeziumCDCDatadirTest
class TestComponent(unittest.TestCase):
    """Runs the datadir functional test cases of the component."""

    # @freeze_time("2024-02-03 14:50:42.833622")
    def test_functional(self):
        """Register the local SQL traits folder and run the datadir tests."""
        # the SQL scripts live in the `sql_test_traits` folder next to this file
        tests_dir = os.path.dirname(os.path.realpath(__file__))
        db_test_traits.set_sql_traits_folder(os.path.join(tests_dir, 'sql_test_traits'))

        tester = DataDirTester(test_data_dir_class=DebeziumCDCDatadirTest)
        tester.run()


if __name__ == "__main__":
    unittest.main()
from datadirtest import TestDataDir
from db_components.debezium.tests.functional import TestDatabaseEnvironment
def run(context: TestDataDir):
    """
    Prepare the source database before the test case runs.

    Args:
        context: test data dir object carrying the injected `db_client` context parameter
    """
    # the client is injected into the context parameters via the DataDirTester constructor
    db_env: TestDatabaseEnvironment = context.context_parameters['db_client']

    # create the initial sales table first, then the signal table
    db_env.prepare_initial_table('sales_table.sql')
    db_env.create_signal_table()

    print("Running before script")
import os
from datadirtest import TestDataDir
from db_components.debezium.tests.functional import TestDatabaseEnvironment
def get_transactions_queries():
    """
    Read `transactions.sql` located next to this script and split it into statements.

    Returns: list of the file content fragments separated by ';' (fragments are not stripped,
    so empty or whitespace-only items may be present)
    """
    transactions_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'transactions.sql')
    # use a context manager so the file handle is closed deterministically
    with open(transactions_path, 'r') as sql_file:
        return sql_file.read().split(';')
def run(context: TestDataDir):
    """
    Replay the statements from `transactions.sql` against the `inventory` schema.

    The connection is closed even when one of the statements fails, so a broken test case
    does not leave an open connection behind.

    Args:
        context: test data dir object carrying the injected `db_client` context parameter
    """
    # get value from the context parameters injected via DataDirTester constructor
    sql_client: TestDatabaseEnvironment = context.context_parameters['db_client']
    sql_client.connection.connect()
    try:
        schema = 'inventory'
        sql_client.perform_query(f'SET search_path TO {schema}')
        # run the transactions
        for query in get_transactions_queries():
            # splitting on ';' leaves empty / whitespace-only fragments; skip them
            if query.strip():
                sql_client.perform_query(query)
        sql_client.perform_query('commit')
    finally:
        sql_client.connection.close()
    print("Running before script")
Use chained tests to test consecutive runs, e.g. an initial load followed by increments. The chained tests are ordered by name, and they pass state to each other.
Single test runs:
- Clone this repository
- Navigate to the debezium_core directory
- Run mvn clean install, this will generate kbcDebeziumEngine-jar-with-dependencies.jar in the debezium_core/jars directory.
MIT licensed, see LICENSE file.