Skip to content

Latest commit

 

History

History
235 lines (198 loc) · 11.7 KB

File metadata and controls

235 lines (198 loc) · 11.7 KB

Twelve Days of Single Message Transforms - Day 12 - Community Transformations

Apache Kafka ships with many Single Message Transformations included - but the great thing about it being an open API is that people can, and do, write their own transformations. Many of these are shared with the wider community, and in this final installment of the series I’m going to look at some of the transformations written by Jeremy Custenborder and available in kafka-connect-transform-common which can be downloaded and installed from Confluent Hub (or built from source, if you like that kind of thing). Also check out the XML transformation by the same author, which I’ve written about previously.

🎥 Recording

maxresdefault

Setup

  1. Clone the repository

    git clone https://github.com/confluentinc/demo-scene.git
    cd demo-scene/kafka-connect-single-message-transforms
  2. Bring the stack up

    docker-compose up -d
  3. Wait for Kafka Connect to start up

    bash -c ' \
    echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n"
    while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do
      echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
      sleep 5
    done
    echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n"
    '

Generate test data

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-day12-00/config \
    -d '{
        "connector.class"                 : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.day12-sys01.with"          : "#{Internet.uuid}",
        "genv.day12-sys01.amount.with"    : "#{Commerce.price}",
        "genv.day12-sys01.units.with"     : "#{number.number_between '\''1'\'','\''99'\''}",
        "genv.day12-sys01.txn_date.with"  : "#{date.past '\''10'\'','\''DAYS'\''}",
        "genv.day12-sys01.product.with"   : "#{Beer.name}",
        "genv.day12-sys01.source.with"    : "SYS01",
        "topic.day12-sys01.throttle.ms"   : 1000
    }'

Change the topic case

👉 Reference

curl -i -X PUT -H "Accept:application/json" \
  -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day12-00/config \
  -d '{
      "connector.class"          : "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url"           : "jdbc:mysql://mysql:3306/demo",
      "connection.user"          : "mysqluser",
      "connection.password"      : "mysqlpw",
      "topics"                   : "day12-sys01",
      "tasks.max"                : "4",
      "auto.create"              : "true",
      "auto.evolve"              : "true",

      "transforms"               : "topicCase",
      "transforms.topicCase.type": "com.github.jcustenborder.kafka.connect.transform.common.ChangeTopicCase",
      "transforms.topicCase.from": "LOWER_HYPHEN",
      "transforms.topicCase.to"  : "UPPER_CAMEL"
      }'

The source topic name of day12-sys01 gets modified to Day12Sys01:

mysql> show tables;
+----------------+
| Tables_in_demo |
+----------------+
| Day12Sys01     |

Use the timestamp of a field as the message timestamp

A nice little triumvirate of transformations here, which use the timestamp in a field of a message to modify the topic name.

The three steps are:

  1. TimestampConverter to transform the field from a string to a Timestamp (not necessary if it already is)

  2. ExtractTimestamp to set the timestamp of the Kafka message to the value of the specified field

  3. TimestampRouter to modify the topic name to include the timestamp components required

curl -i -X PUT -H "Accept:application/json" \
  -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day12-01/config \
  -d '{
      "connector.class"                        : "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url"                         : "jdbc:mysql://mysql:3306/demo",
      "connection.user"                        : "mysqluser",
      "connection.password"                    : "mysqlpw",
      "topics"                                 : "day12-sys01",
      "tasks.max"                              : "4",
      "auto.create"                            : "true",
      "auto.evolve"                            : "true",

      "transforms"                              : "convertTS,extractTS,setTopicName",
      "transforms.convertTS.type"               : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
      "transforms.convertTS.field"              : "txn_date",
      "transforms.convertTS.format"             : "EEE MMM dd HH:mm:ss zzz yyyy",
      "transforms.convertTS.target.type"        : "Timestamp",
      "transforms.extractTS.type"               : "com.github.jcustenborder.kafka.connect.transform.common.ExtractTimestamp$Value",
      "transforms.extractTS.field.name"         : "txn_date",
      "transforms.setTopicName.type"            : "org.apache.kafka.connect.transforms.TimestampRouter",
      "transforms.setTopicName.topic.format"    : "${topic}_${timestamp}",
      "transforms.setTopicName.timestamp.format": "YYYY-MM-dd"
      }'

Resulting topic takes the date from the message field txn_date and generates table names accordingly:

mysql> show tables;
+------------------------+
| Tables_in_demo         |
+------------------------+
| day12-sys01_2020-12-07 |
| day12-sys01_2020-12-08 |
| day12-sys01_2020-12-09 |
| day12-sys01_2020-12-10 |
| day12-sys01_2020-12-11 |
| day12-sys01_2020-12-12 |
| day12-sys01_2020-12-13 |
| day12-sys01_2020-12-14 |
| day12-sys01_2020-12-15 |
| day12-sys01_2020-12-16 |
+------------------------+
12 rows in set (0.01 sec)

Add the current timestamp to the message payload

👉 Reference

curl -i -X PUT -H "Accept:application/json" \
  -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day12-02/config \
  -d '{
      "connector.class"          : "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url"           : "jdbc:mysql://mysql:3306/demo",
      "connection.user"          : "mysqluser",
      "connection.password"      : "mysqlpw",
      "topics"                   : "day12-sys01",
      "tasks.max"                : "4",
      "auto.create"              : "true",
      "auto.evolve"              : "true",

      "transforms"                : "addTSNow",
      "transforms.addTSNow.type"  : "com.github.jcustenborder.kafka.connect.transform.common.TimestampNowField$Value",
      "transforms.addTSNow.fields": "processingTS"
      }'
mysql> select product, amount, txn_date, processingTS from `day12-sys01` ORDER BY units  LIMIT 5;
+------------------------------+--------+------------------------------+-------------------------+
| product                      | amount | txn_date                     | processingTS            |
+------------------------------+--------+------------------------------+-------------------------+
| Sublimely Self-Righteous Ale | 61.25  | Mon Dec 14 09:12:03 GMT 2020 | 2020-12-17 00:43:02.550 |
| Arrogant Bastard Ale         | 88.65  | Wed Dec 09 18:05:02 GMT 2020 | 2020-12-17 00:43:02.559 |
| Sublimely Self-Righteous Ale | 30.81  | Fri Dec 11 14:49:14 GMT 2020 | 2020-12-17 00:43:02.551 |
| Arrogant Bastard Ale         | 20.45  | Tue Dec 08 10:30:21 GMT 2020 | 2020-12-17 00:43:02.223 |
| Sublimely Self-Righteous Ale | 56.95  | Wed Dec 16 23:12:23 GMT 2020 | 2020-12-17 00:43:02.233 |
+------------------------------+--------+------------------------------+-------------------------+
5 rows in set (0.00 sec)

Using SimulatorSinkConnector (and Single Message Transform TRACE logging)

Not a transformation as such, but a useful tip for examining the output of Transforms without needing to route the data to an actual target:

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/sink-simulator-day12-02/config \
    -d '{
        "connector.class"           : "com.github.jcustenborder.kafka.connect.simulator.SimulatorSinkConnector",
        "topics"                    : "day12-sys01",
        "log.entries"               : "true",
        "transforms"                : "addTSNow",
        "transforms.addTSNow.type"  : "com.github.jcustenborder.kafka.connect.transform.common.TimestampNowField$Value",
        "transforms.addTSNow.fields": "processingTS"
    }'

You can see the message after it’s been processed by the transform(s) in the Kafka Connect worker log:

[2020-12-18 00:29:59,651] INFO [sink-simulator-day12-02|task-0] record.value=Struct{units=39,product=Delirium Tremens,amount=32.60,txn_date=Wed Dec 16 07:27:19 GMT 2020,source=SYS01,processingTS=Fri Dec 18 00:29:59 GMT 2020} (com.github.jcustenborder.kafka.connect.simulator.SimulatorSinkTask:50)

You can also get the Kafka Connect runtime to log TRACE messages that show the source messages before a transformation (c.f. Changing the Logging Level for Kafka Connect Dynamically):

curl -s -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime.TransformationChain \
    -d '{"level": "TRACE"}' \
    | jq '.'

With that set the Kafka Connect worker then logs the record before it is transformed, and then from the SimulatorSink its state after transform:

[2020-12-18 00:31:54,572] TRACE [sink-simulator-day12-02|task-0] Applying transformation com.github.jcustenborder.kafka.connect.transform.common.TimestampNowField$Value to SinkRecord{kafkaOffset=121, timestampType=CreateTime} ConnectRecord{topic='day12-sys01', kafkaPartition=0, key=fd403528-90c3-45a1-a1c5-3f9ebe2799be, keySchema=Schema{STRING}, value=Struct{units=6,product=Nugget Nectar,amount=91.30,txn_date=Thu Dec 10 06:51:22 GMT 2020,source=SYS01}, valueSchema=Schema{io.mdrogalis.Gen0:STRUCT}, timestamp=1608251514568, headers=ConnectHeaders(headers=)} (org.apache.kafka.connect.runtime.TransformationChain:47)
[2020-12-18 00:31:54,572] INFO [sink-simulator-day12-02|task-0] record.value=Struct{units=6,product=Nugget Nectar,amount=91.30,txn_date=Thu Dec 10 06:51:22 GMT 2020,source=SYS01,processingTS=Fri Dec 18 00:31:54 GMT 2020} (com.github.jcustenborder.kafka.connect.simulator.SimulatorSinkTask:50)