General library for Python CDC components running in Keboola Connection environment

keboola/python-cdc-component


CDC component

Adding a new connector

To add a new connector, create a new folder in the db_components directory. Ideally, copy one of the existing folders (e.g. ex_mysql_cdc) and modify it to your needs.

The component specific files are:

  • src/component.py - the main component file, where the component is defined. Inherit from the DebeziumComponent class.
  • src/configuration.py - the configuration file, where the component configuration is defined. Inherit from the DebeziumConfiguration class.
  • src/extractor/extractor.py - the extractor file, where the data is extracted from the database. Inherit from the DebeziumExtractor class.

Component UI and developer portal metadata are defined in the component_config folder.
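Copying an existing connector can be scripted. The sketch below is a minimal illustration only: the helper names scaffold_connector and missing_component_files and the target folder name ex_xx_cdc are hypothetical, not part of the library. It copies a template connector folder and then reports which of the three component-specific files listed above are missing from the copy.

```python
import shutil
from pathlib import Path

# Component-specific files every connector is expected to provide (see the list above).
REQUIRED_FILES = [
    "src/component.py",
    "src/configuration.py",
    "src/extractor/extractor.py",
]


def scaffold_connector(db_components_dir: Path, template: str, new_name: str) -> Path:
    """Copy an existing connector folder as the starting point for a new one (hypothetical helper)."""
    source = db_components_dir / template
    target = db_components_dir / new_name
    if not source.is_dir():
        raise FileNotFoundError(f"Template connector not found: {source}")
    if target.exists():
        raise FileExistsError(f"Connector folder already exists: {target}")
    # Copy the whole tree (src/, component_config/, tests/, ...) in one go.
    shutil.copytree(source, target)
    return target


def missing_component_files(connector_dir: Path) -> list[str]:
    """Return the required component files that do not exist in `connector_dir`."""
    return [rel for rel in REQUIRED_FILES if not (connector_dir / rel).is_file()]
```

For example, scaffold_connector(Path("db_components"), "ex_mysql_cdc", "ex_xx_cdc") run from the repository root would create the new folder, after which the three files still have to be adapted to the new database.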

Implementing the Configuration class

The configuration class is the main class that defines the configuration for the component. It should inherit from the DebeziumConfiguration class. Most of the configuration options are shared among all the components, so generally you only need to implement the following pydantic models. If you need anything extra, you can override the configuration classes/properties.

from enum import Enum

from pydantic import Field

from db_components.db_common.configuration import DbConnectionOptions, DebeziumConfiguration


class Adapter(str, Enum):
    mysql = "MySQL"
    mariadb = "MariaDB"


class XXDbOptions(DbConnectionOptions):
    # any additional options specific for the database
    user: str = Field()
    password: str = Field(alias="#password")
    adapter: Adapter = Adapter.mysql


class XXConfiguration(DebeziumConfiguration[XXDbOptions]):
    # override needed for proper functioning
    db_settings: XXDbOptions
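In Keboola Connection, parameter keys prefixed with # are stored encrypted, which is why the password field is aliased to "#password". The stdlib-only sketch below approximates what validating XXDbOptions does with a raw configuration; it is not a substitute for the pydantic models. The host and port keys are assumed to come from DbConnectionOptions (the extractor reads db_credentials.host and db_credentials.port), and the example values and the read_db_options helper are hypothetical.

```python
from enum import Enum
from typing import Any


class Adapter(str, Enum):
    mysql = "MySQL"
    mariadb = "MariaDB"


# Hypothetical example of the raw parameters a configuration receives.
RAW_PARAMETERS: dict[str, Any] = {
    "db_settings": {
        "host": "db.example.com",
        "port": 3306,
        "user": "cdc_user",
        "#password": "secret",  # '#'-prefixed keys are stored encrypted by Keboola
        "adapter": "MariaDB",
    }
}


def read_db_options(db_settings: dict[str, Any]) -> dict[str, Any]:
    """Stdlib stand-in for validating XXDbOptions (hypothetical helper).

    '#password' is read into the `password` field (the Field alias), and the
    adapter string is resolved by enum value, falling back to the default.
    """
    return {
        "host": db_settings["host"],
        "port": int(db_settings["port"]),
        "user": db_settings["user"],
        "password": db_settings["#password"],
        "adapter": Adapter(db_settings.get("adapter", Adapter.mysql.value)),
    }
```

Because Adapter subclasses str, the resolved member also compares equal to its plain string value, e.g. Adapter.mariadb == "MariaDB".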

Implementing the Extractor class

The extractor class defines how data is extracted from the source database. It should inherit from the DebeziumExtractor class and implement the abstract methods shown below. It also requires a BaseTypeConverter subclass that converts source database types to base types:

from pathlib import Path
from typing import Any, Optional

from db_components.db_common.extractor import DebeziumExtractor
from db_components.db_common.table_schema import BaseTypeConverter

# adjust this import to your component's module layout
from configuration import XXDbOptions

class XXBaseTypeConverter(BaseTypeConverter):
    """
     Converts source db types to BaseTypes
    """
    MAPPING = {"tinyint": "INTEGER",
               "smallint": "INTEGER",
               "mediumint": "INTEGER",
               "int": "INTEGER",
               "bigint": "INTEGER",
               "decimal": "NUMERIC",
               "float": "NUMERIC",
               "double": "NUMERIC",
               "bit": "STRING",
               "date": "DATE",
               "datetime": "TIMESTAMP",
               "timestamp": "TIMESTAMP",
               "time": "TIMESTAMP",
               "year": "INTEGER",
               "char": "STRING",
               "varchar": "STRING",
               "binary": "STRING",
               "varbinary": "STRING",
               "tinyblob": "STRING",
               "blob": "STRING",
               "mediumblob": "STRING",
               "longblob": "STRING",
               "tinytext": "STRING",
               "text": "STRING",
               "mediumtext": "STRING",
               "longtext": "STRING",
               "enum": "STRING",
               "set": "STRING"}
    
    @classmethod
    def supported_types(cls) -> list[str]:
        """
        
          List supported data types. This is used for schema conversion. Unsupported types will be converted to STRING.
        """
        return list(cls.MAPPING.keys())
    
    def __call__(self, source_type: str, length: Optional[str] = None) -> str:
        source_type_lower = source_type.lower()
        match source_type_lower:
            case 'bit' if str(length) == '1':
                return 'BOOLEAN'
            case 'boolean':
                return 'BOOLEAN'
            case _:
                return self.MAPPING.get(source_type_lower, 'STRING')


class XXDebeziumExtractor(DebeziumExtractor):

    def __init__(self, db_credentials: XXDbOptions):
        """
          Prepare JDBC parameters for connector initialization
        """
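        # resolve the component root folder (three levels above this file)
        jdbc_folder = Path(__file__).parent.parent.parent
        # path to the JDBC driver jar (replace DRIVER.jar with the actual driver file name)
        jars = [jdbc_folder.joinpath('jdbc/DRIVER.jar').as_posix()]
        driver_class = 'com.mysql.cj.jdbc.Driver'
        protocol = 'jdbc:mysql'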

        driver_args = {'user': db_credentials.user, 'password': db_credentials.password}
        url_string = f'{protocol}://{db_credentials.host}:{db_credentials.port}'
        super().__init__(db_credentials, jars, driver_args,
                         driver_class, url_string, XXBaseTypeConverter())

    def test_has_replication_privilege(self):
        """
        Test if the user has replication privileges. Used for connection testing.
        """
        pass

    def validate_user_permissions(self):
        """
        Validate user permissions. Used for connection testing.
        """
        pass

    def get_target_position(self) -> Any:
        """
        Get the current position in the source log. 
        This can be left empty if it's not applicable. Many connectors may use DefaultStoppingCondition
        Returns: any specific position representation for the database

        """
        pass

Implementing the Component class

Once you have implemented the Configuration and Extractor classes, you can implement the Component class. The component class is the main class that defines the component. It should inherit from the DebeziumComponent class and implement the following abstract methods:

from db_components.db_common.cdc_component import DEFAULT_TOPIC_NAME, DebeziumComponent
from db_components.db_common.table_schema import ColumnSchema, TableSchema
from db_components.debezium.executor import DefaultStoppingCondition, StoppingCondition

# adjust these imports to your component's module layout
from configuration import XXConfiguration
from extractor.extractor import XXDebeziumExtractor


class XXCDCComponent(DebeziumComponent[XXDebeziumExtractor, XXConfiguration]):

    def _build_stopping_condition(self) -> StoppingCondition:
        """
        Builds stopping condition based on the configuration specific for the connector. 
        If a stopping condition is missing, implement new one.
        Returns:

        """

        max_duration_s = self._configuration.sync_options.max_runtime_s or self.component_timeout
        return DefaultStoppingCondition(max_duration_s, self._configuration.sync_options.max_wait_s)

    @property
    def has_schema_history(self) -> bool:
        """
        Flag if the Debezium connector that component implements uses schema history.
        (all but Postgres, Mongo) use schema history
        Returns:

        """
        return True

    @property
    def system_columns(self) -> list[ColumnSchema]:
        """
        Returns the system columns for the component. They can be specific to each database type,
        e.g.
        [
            ColumnSchema(name="KBC__OPERATION", source_type="STRING"),
            ColumnSchema(name="KBC__EVENT_TIMESTAMP_MS", source_type="TIMESTAMP"),
            ColumnSchema(name="KBC__FILE", source_type="STRING"),
            ColumnSchema(name="KBC__POS", source_type="INTEGER"),
            ColumnSchema(name="KBC__DELETED", source_type="BOOLEAN"),
            ColumnSchema(name="KBC__BATCH_EVENT_ORDER", source_type="INTEGER")
        ]
        """
        return self.DEFAULT_SYSTEM_COLUMNS

    @property
    def system_column_name_mapping(self) -> dict[str, str]:
        """
        Returns the mapping of column names coming from the Debezium connector to system columns (output names),
        e.g. {"kbc__operation": "KBC__OPERATION",
              "kbc__event_timestamp": "KBC__EVENT_TIMESTAMP_MS",
              "kbc__file": "KBC__FILE",
              "kbc__pos": "KBC__POS",
              "__deleted": "KBC__DELETED",
              "kbc__batch_event_order": "KBC__BATCH_EVENT_ORDER"}
        """
        return self.DEFAULT_SYSTEM_COLUMN_NAME_MAPPING

    def _get_db_specific_dbz_properties(self) -> dict:
        """
        Return Debezium connector properties specific to this database type.
        """
        return {}

    def get_tables_in_schema(self, schema: str) -> list[str]:
        """
        Each database schema can be considered a different level (e.g. database, schema)
        so it needs to be implemented
        It can be different for each database type. In many cases you can keep the default implementation.
        Args:
            schema:

        Returns:

        """
        return [f"{t[1]}.{t[2]}" for t in self._client.metadata_provider.get_tables(schema_pattern=schema)]


    def get_table_metadata(self, schema: str, table: str) -> TableSchema:
        """
        Get metadata for a specific table.
        Each database schema can be considered a different level (e.g. database, schema)
        so it needs to be implemented. 
        It can be different for each database type. In many cases you can keep the default implementation.
        Args:
            schema:
            table:

        Returns:

            """
        return self._client.metadata_provider.get_table_metadata(schema=schema,
                                                                 table_name=table)

    def generate_table_key(self, schema: TableSchema, separator: str = '_') -> str:
        """
        Generates table key based on the schema. This is used for metadata indexing.
        It can be different for each database type. In many cases you can keep the default implementation.

        Args:
            schema:
            separator: optional separator

        Returns:

        """
        # TODO: change the topic name (testcdc)
        schema_key = separator.join([DEFAULT_TOPIC_NAME, schema.schema_name, schema.name])
        return schema_key
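The mapping returned by system_column_name_mapping translates column names emitted by the Debezium connector into output column names. A minimal sketch of applying such a mapping to one record, assuming records are plain dicts; the record shape and the rename_system_columns helper are hypothetical, and the library applies the mapping internally.

```python
# Mapping copied from the system_column_name_mapping docstring above.
SYSTEM_COLUMN_NAME_MAPPING = {
    "kbc__operation": "KBC__OPERATION",
    "kbc__event_timestamp": "KBC__EVENT_TIMESTAMP_MS",
    "kbc__file": "KBC__FILE",
    "kbc__pos": "KBC__POS",
    "__deleted": "KBC__DELETED",
    "kbc__batch_event_order": "KBC__BATCH_EVENT_ORDER",
}


def rename_system_columns(record: dict[str, object], mapping: dict[str, str]) -> dict[str, object]:
    """Return a copy of `record` with Debezium column names replaced by output names.

    Columns not present in the mapping (the source table's own columns) keep
    their names, and the original column order is preserved.
    """
    return {mapping.get(name, name): value for name, value in record.items()}
```

Source-table columns such as id pass through unchanged, so only the connector's metadata columns are renamed.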

Tests

Functional tests for Debezium-based CDC components are defined using the datadir test framework. The functional tests are abstracted in the db_components/debezium_core/tests package.

TestDatabaseEnvironment is the base class for the JDBC connection used to prepare the test environment. It is initialized in the DebeziumCDCDatadirTest class and can be used in the set_up.py scripts to prepare the test cases.

  • prepare_initial_table(script_name) - This method creates a test table using the provided SQL script.
    • The folder containing the scripts is defined by the db_test_traits.set_sql_traits_folder(sql_traits_path) method.
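The script-folder mechanism can be illustrated with a hypothetical SQLite-backed stand-in. This is not the real TestDatabaseEnvironment, which connects over JDBC; SketchTestDatabaseEnvironment is an invented name, and for brevity the folder setter lives on the same object here, whereas the library sets it through db_test_traits.set_sql_traits_folder().

```python
import sqlite3
from pathlib import Path
from typing import Optional, Union


class SketchTestDatabaseEnvironment:
    """Hypothetical, SQLite-backed stand-in for TestDatabaseEnvironment.

    Mirrors only the idea: SQL scripts live in one configurable folder and
    are executed by file name to create the initial test tables.
    """

    def __init__(self, connection: sqlite3.Connection):
        self.connection = connection
        self.sql_traits_folder: Optional[Path] = None

    def set_sql_traits_folder(self, path: Union[str, Path]) -> None:
        self.sql_traits_folder = Path(path)

    def prepare_initial_table(self, script_name: str) -> None:
        if self.sql_traits_folder is None:
            raise RuntimeError("Call set_sql_traits_folder() first")
        script = (self.sql_traits_folder / script_name).read_text()
        # executescript runs every statement contained in the script
        self.connection.executescript(script)
```

A set_up.py script would then call prepare_initial_table('sales_table.sql') with only the file name, keeping the test cases independent of where the scripts are stored.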

DebeziumCDCDatadirTest

DebeziumCDCDatadirTest is the base class for all functional tests. It performs the following tasks:

  • Initializes the TestDatabaseEnvironment and injects the instance into the context_parameters['db_client'] variable of the test object.
    • Individual tests can access it from their set_up.py script.
  • Initializes and runs the component.
  • Cleans up the results so they can be compared.
    • Removes the KBC__EVENT_TIMESTAMP_MS column from the result, as its value changes with each run.
    • Removes the id column from the debezium_signals table, as it is a dynamically generated UUID.
    • The results are always sorted by the event order, so these columns can be safely deleted.
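The cleanup of dynamic columns can be sketched with the standard csv module, assuming results are CSV files with a header row. drop_columns is a hypothetical helper, not the framework's code; the same call with {"id"} would cover the debezium_signals table.

```python
import csv
import io

# Columns whose values change on every run and therefore cannot be compared.
DYNAMIC_COLUMNS = {"KBC__EVENT_TIMESTAMP_MS"}


def drop_columns(csv_text: str, columns: set[str]) -> str:
    """Return `csv_text` without the given columns (header and values)."""
    reader = csv.DictReader(io.StringIO(csv_text))
    kept = [name for name in (reader.fieldnames or []) if name not in columns]
    out = io.StringIO()
    # extrasaction="ignore" silently skips the dropped keys in each row dict
    writer = csv.DictWriter(out, fieldnames=kept, extrasaction="ignore", lineterminator="\n")
    writer.writeheader()
    for row in reader:
        writer.writerow(row)
    return out.getvalue()
```

Because the rows keep their original order, the cleaned output stays deterministic as long as the input is already sorted by event order.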

Defining a new test

In the tests folder, create a test_functional.py file with the following content:

import os
import unittest

from datadirtest import DataDirTester

from db_components.debezium.tests.db_test_traits import traits as db_test_traits
from db_components.debezium.tests.functional import DebeziumCDCDatadirTest


class TestComponent(unittest.TestCase):
    # @freeze_time("2024-02-03 14:50:42.833622")
    def test_functional(self):
        # Set the path to the SQL traits folder
        sql_traits_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'sql_test_traits')
        db_test_traits.set_sql_traits_folder(sql_traits_path)
        functional_tests = DataDirTester(test_data_dir_class=DebeziumCDCDatadirTest)
        functional_tests.run()


if __name__ == "__main__":
    unittest.main()

Example set_up.py - initial load

from datadirtest import TestDataDir
from db_components.debezium.tests.functional import TestDatabaseEnvironment



def run(context: TestDataDir):
    # get value from the context parameters injected via DataDirTester constructor
    sql_client: TestDatabaseEnvironment = context.context_parameters['db_client']
    # prepare the initial sales table
    sql_client.prepare_initial_table('sales_table.sql')
    sql_client.create_signal_table()
    print("Running before script")

Example set_up.py - increment

import os
from datadirtest import TestDataDir
from db_components.debezium.tests.functional import TestDatabaseEnvironment


def get_transactions_queries():
    transactions_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'transactions.sql')
    # read the script and split it into individual statements
    with open(transactions_path, 'r') as f:
        return f.read().split(';')


def run(context: TestDataDir):
    # get value from the context parameters injected via DataDirTester constructor
    sql_client: TestDatabaseEnvironment = context.context_parameters['db_client']
    sql_client.connection.connect()

    schema = 'inventory'
    sql_client.perform_query(f'SET search_path TO {schema}')
    
    # run the transactions
    queries = get_transactions_queries()
    for q in queries:
        if q.strip():
            sql_client.perform_query(q)
    sql_client.perform_query('commit')
    sql_client.connection.close()
    print("Running before script")

Chained tests

Use chained tests to test consecutive runs, e.g. an initial load followed by increments. Chained tests are ordered by name and pass state to each other.
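The ordering and state hand-off of chained tests can be illustrated in a few lines. This is a conceptual sketch, not the datadirtest implementation: run_chained and the step names are hypothetical, and each step is a callable that receives the previous step's state (for example, an assumed replication offset) and returns the new state.

```python
from typing import Any, Callable

# A step takes the state left by the previous step and returns the new state.
Step = Callable[[dict[str, Any]], dict[str, Any]]


def run_chained(steps: dict[str, Step]) -> dict[str, Any]:
    """Run steps sorted by name, feeding each the state produced by the previous one."""
    state: dict[str, Any] = {}
    for name in sorted(steps):
        state = steps[name](state)
    return state
```

Because the order is lexicographic, zero-padded prefixes such as 01_ and 02_ keep a test named 10_... from running before one named 2_....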

Simple tests

Simple tests are single, independent test runs.

Installation

  1. Clone this repository.
  2. Navigate to the debezium_core directory.
  3. Run mvn clean install. This generates kbcDebeziumEngine-jar-with-dependencies.jar in the debezium_core/jars directory.

License

MIT licensed, see LICENSE file.
