A collection of Single Message Transforms (SMTs) for Apache Kafka Connect.
$ ./mvnw clean install
This transformation inserts the current system timestamp into a record header. It is useful when you need to capture the time at which a record is processed, for example when a sink connector consumes it. Comparing this header with the record's production time gives the delay between production and consumption.
"transforms": "insertHeaderTs",
"transforms.insertHeaderTs.type": "org.cjmencias.kafka.connect.transforms.InsertHeaderCurrentTimestamp",
"transforms.insertHeaderTs.header": "laststreamdate"
This predicate checks whether a given field exists in the record's value.
"predicates": "hasFieldAfter",
"predicates.hasFieldAfter.type": "org.cjmencias.kafka.connect.transforms.predicates.HasField",
"predicates.hasFieldAfter.field": "after"
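A predicate only takes effect once a transformation references it. As a minimal sketch, the fragment below combines the two configurations above so that the timestamp header is added only to records whose value contains an `after` field. The `predicate` property is standard Kafka Connect configuration; pairing these two particular components is an illustrative assumption, not a setup this project documents.

```json
"transforms": "insertHeaderTs",
"transforms.insertHeaderTs.type": "org.cjmencias.kafka.connect.transforms.InsertHeaderCurrentTimestamp",
"transforms.insertHeaderTs.header": "laststreamdate",
"transforms.insertHeaderTs.predicate": "hasFieldAfter",
"predicates": "hasFieldAfter",
"predicates.hasFieldAfter.type": "org.cjmencias.kafka.connect.transforms.predicates.HasField",
"predicates.hasFieldAfter.field": "after"
```

Adding `"transforms.insertHeaderTs.negate": "true"` would invert the condition, so the transformation would apply only to records that lack the field.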