# kafka-connect-redis-sink

A Kafka Connect sink connector used to save data from Kafka to Redis with different data type options.
## Requirements

- Maven 3.1 or later
- Java 8 or later
- Git (optional)
## Build

Clone the repository using the following command (or download the project directly):

```
git clone https://github.com/mangrrua/kafka-connect-redis-sink.git
```

Go to the project directory:

```
cd kafka-connect-redis-sink
```

Build the project:

```
mvn clean package
```

The jar with dependencies will be created in the `/target` directory under the name `project-name-1.0-jar-with-dependencies.jar`.
## Dependencies

- Jedis is used to connect to Redis.
- FasterXML Jackson is used to convert the record's output value to a JSON string.
- JUnit and Mockito are used for testing.
This Redis sink connector supports the cache (string) and hashes Redis data types with different options. You can write JSON data stored in Kafka topics to Redis using the supported data formats and different Redis key options.

The connector currently works only with schemaless JSON data (schema support will be added in later versions). Thus, you must set the `value.converter` and `value.converter.schemas.enable` properties in the Connect `standalone/distributed.properties` as below:

```
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
```

The connector receives JSON data as a `Map<String, Object>` and generates its output according to your configuration. If a record value is not a Map (i.e., not a JSON object), the connector ignores the record.
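A minimal sketch of this filtering rule (the class and method names here are illustrative, not the connector's real ones): only values that are Maps survive; strings, nulls, and other types are dropped.

```java
import java.util.*;

public class RecordFilter {

    // Hypothetical helper mirroring the connector's behavior: only Map
    // (i.e., JSON object) record values are accepted; any other value
    // type is silently ignored.
    static List<Map<String, Object>> acceptedValues(List<Object> recordValues) {
        List<Map<String, Object>> accepted = new ArrayList<>();
        for (Object value : recordValues) {
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> map = (Map<String, Object>) value;
                accepted.add(map);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<Object> values = Arrays.asList(
                Collections.singletonMap("id", "125"), // JSON object -> kept
                "plain string",                        // not a Map -> ignored
                null);                                 // null -> ignored
        System.out.println(acceptedValues(values).size()); // prints 1
    }
}
```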
You can specify the Redis key in two ways:

- **Use the Kafka record key as the Redis key.** With this option, the record key is used as the Redis key, and the record value is used as the Redis value. If you use this option, the `key.converter` and `key.converter.schemas.enable` properties in the Connect `standalone/distributed.properties` must be set as below:

  ```
  key.converter=org.apache.kafka.connect.storage.StringConverter
  key.converter.schemas.enable=false
  ```

  In this case, if the record key is a non-null, non-empty string, it is assigned as the Redis key. Otherwise, the connector ignores the record and does not send it to Redis.

- **Use any fields of the record value.** With this option, you can select any fields from your JSON input (the record value), and the values of these selected fields are used as the Redis key. If more than one field is selected, `redis.key.delimiter` is used to join the parts of the Redis key. The remaining fields are used as the Redis value according to the selected Redis data type (cache or hashes). A record is saved to Redis only if at least one selected field is a primitive, non-null, non-empty value; otherwise, the connector ignores the record and does not send it to Redis.

The `use.record.key` property controls which of these options is used to decide the Redis key.
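The field-based key strategy can be sketched like this (a simplified, illustrative helper, not the connector's actual code): join the usable selected-field values with the delimiter, and signal "ignore this record" with `null` when no selected field has a usable value.

```java
import java.util.*;

public class KeyBuilder {

    // Hypothetical sketch of the "use any fields of the record value"
    // strategy: join the values of the selected fields with the delimiter.
    // Returns null when no selected field holds a usable (non-null,
    // non-empty, primitive-like) value, mirroring the "record is ignored"
    // rule described above.
    static String buildKey(Map<String, Object> recordValue,
                           List<String> keyFields, String delimiter) {
        StringJoiner joiner = new StringJoiner(delimiter);
        boolean hasUsableField = false;
        for (String field : keyFields) {
            Object v = recordValue.get(field);
            boolean primitiveLike = v != null
                    && !(v instanceof Map) && !(v instanceof Collection);
            if (primitiveLike && !v.toString().isEmpty()) {
                joiner.add(v.toString());
                hasUsableField = true;
            }
        }
        return hasUsableField ? joiner.toString() : null;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("name", "shelly");
        record.put("surname", "stark");
        // prints shelly-stark
        System.out.println(buildKey(record, Arrays.asList("name", "surname"), "-"));
    }
}
```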
The connector supports both Redis Standalone and Redis Sentinel connections. Jedis is used for this because of its lightweight API.
This connector supports two data types for writing data to Redis: string (cache) and hashes. For the hashes data structure, the connector also supports two write options:

- String (cache) data type
  - Cache Writer
- Hashes data type
  - Hash Field Writer
  - Hash Value Writer

You can choose the writer you want using the `redis.writer` property.
### 1. String Data Type (Cache Writer)

In this format, the Redis command `SET redisKey redisValue` is used. The Redis key and value are generated according to your configuration, and the generated output value is converted to a JSON string before being saved as the Redis value. Finally, the output records are saved to Redis.
Example input data in a Kafka topic:

```
record key   : "96533"
record value : {"id": "125", "name": "shelly", "surname": "stark", "age": "25", "address": "address", "job": "Computer Engineer"}
```
If you set `use.record.key` to `true`, the Redis key will be `96533`, and the Redis value will be the JSON string:

```
{"id": "125", "name": "shelly", "surname": "stark", "age": "25", "address": "address", "job": "Computer Engineer"}
```
If you set `use.record.key` to `false` and the selected field name for the Redis key is `id`, the Redis key will be `125`, and the Redis value will be:

```
{"name": "shelly", "surname": "stark", "age": "25", "address": "address", "job": "Computer Engineer"}
```
You can also select multiple fields for the Redis key, such as `name` and `surname`, and set any `redis.key.delimiter` you want (for example, `-`). In this case, the Redis key will be `shelly-stark`, and the Redis value will be:

```
{"id": "125", "age": "25", "address": "address", "job": "Computer Engineer"}
```
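The "remaining fields" behavior of the cache writer can be sketched as follows (an illustrative helper, not the connector's real class; the real connector serializes the resulting Map to a JSON string with Jackson):

```java
import java.util.*;

public class CacheValueSketch {

    // Hypothetical sketch of how the cache writer derives the Redis value
    // when the key is built from record-value fields: the selected key
    // fields are removed, and whatever remains becomes the Redis value.
    static Map<String, Object> remainingValue(Map<String, Object> recordValue,
                                              List<String> keyFields) {
        Map<String, Object> value = new LinkedHashMap<>(recordValue);
        for (String field : keyFields) {
            value.remove(field);
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("id", "125");
        record.put("name", "shelly");
        record.put("surname", "stark");
        // prints {name=shelly, surname=stark}
        System.out.println(remainingValue(record, Collections.singletonList("id")));
    }
}
```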
### 2. Hashes Data Type

The connector uses the Redis command `HMSET redisKey subKey subValue subKey1 subValue1`. You can use two writers for the hashes data structure:
- **Hash Field Writer**

  This writer receives your record data, generates the Redis key, converts each field of the record value to a `subKey - subValue` pair for the Redis hashes structure, and saves these fields under the generated Redis key.

  Example input data in a Kafka topic:

  ```
  record key   : "96533"
  record value : {"id": "125", "name": "shelly", "surname": "stark", "age": "25", "address": "address", "job": "Computer Engineer"}
  ```

  If you set `use.record.key` to `true`, the Redis key will be `96533`, and the Redis value will be (the value is basically a Map):

  ```
  "id": "125", "name": "shelly", "surname": "stark", "age": "25", "address": "address", "job": "Computer Engineer"
  ```

  If you set `use.record.key` to `false` and the selected field names are `name` and `surname`, the Redis key will be `shelly.stark` (using the default `.` delimiter), and the Redis value will be:

  ```
  "id": "125", "age": "25", "address": "address", "job": "Computer Engineer"
  ```
- **Hash Value Writer**

  Sometimes values need to be collected under one specific key. In this case, the generated Redis key is used as the `subKey` under the specified table name; the writer uses the `redis.table` configuration for this. The writer converts the Kafka record to the same format as the cache writer.

  Example input data in a Kafka topic:

  ```
  record key   : "96533"
  record value : {"id": "125", "name": "shelly", "surname": "stark", "age": "25", "address": "address", "job": "Computer Engineer"}

  record key   : "12345"
  record value : {"id": "10", "name": "jaime", "surname": "older", "age": "29", "address": "address1", "job": "Doctor"}
  ```

  If you set `redis.table` to `redis-kafka-table` and `use.record.key` to `true`, the records will be saved like this:

  ```
  redis-kafka-table:
    "96533": {"id": "125", "name": "shelly", "surname": "stark", "age": "25", "address": "address", "job": "Computer Engineer"}
    "12345": {"id": "10", "name": "jaime", "surname": "older", "age": "29", "address": "address1", "job": "Doctor"}
  ```
As in the other writers, you can use multiple fields as the Redis key with a specific key delimiter.
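The two hash writers differ only in how they lay the data out. A minimal in-memory sketch of both layouts (the class and method names are illustrative; the real connector issues `HMSET` through Jedis and serializes values with Jackson):

```java
import java.util.*;

public class HashWriterSketch {

    // hash_field layout: each field of the record value becomes one
    // subKey/subValue pair of a hash whose key is the generated Redis key
    // (HMSET redisKey field1 value1 field2 value2 ...).
    static Map<String, String> hashFieldLayout(Map<String, Object> recordValue) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : recordValue.entrySet()) {
            fields.put(e.getKey(), String.valueOf(e.getValue()));
        }
        return fields;
    }

    // hash_value layout: the whole record value (a JSON string in the real
    // connector; plain toString() stands in here) becomes one subValue
    // stored under the generated Redis key, inside the hash named by
    // redis.table (HMSET tableName redisKey jsonValue).
    static void hashValueLayout(Map<String, Map<String, String>> store,
                                String tableName, String redisKey,
                                Map<String, Object> recordValue) {
        store.computeIfAbsent(tableName, k -> new LinkedHashMap<>())
             .put(redisKey, recordValue.toString());
    }

    public static void main(String[] args) {
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("id", "125");
        value.put("name", "shelly");

        System.out.println(hashFieldLayout(value)); // {id=125, name=shelly}

        Map<String, Map<String, String>> store = new LinkedHashMap<>();
        hashValueLayout(store, "redis-kafka-table", "96533", value);
        // the record now sits under the table key, addressed by "96533"
        System.out.println(store.get("redis-kafka-table").containsKey("96533"));
    }
}
```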
## Configuration Properties

| Property | Description | Type | Default Value | Valid Values |
|---|---|---|---|---|
| redis.writer | Type of writer used to store data in Redis. | string | cache | cache, hash_field, hash_value |
| redis.type | Redis connection type. | string | standalone | standalone, sentinel |
| use.record.key | If true, the Kafka record key is used as the Redis key. Otherwise, the redis.key.fields property is used to decide the Redis key. | boolean | true | true, false |
| redis.key.fields | Fields of the record value used to decide the Redis key. For multiple fields, set this value in field1:field2 format. This option is used by the writers when use.record.key is false. | string | none | none |
| redis.key.delimiter | Delimiter for the selected fields when multiple fields form the Redis key. | string | . | any string value |
| redis.key.prefix | A prefix for the Redis key. | string | null | any string value |
| redis.table | If you use 'hash_value' to save data to Redis, you must specify the table name. This represents a key in Redis; Redis records generated from received Kafka records will be saved under this table. | string | sinkTable | any string value |
| redis.connection.hosts | Host and port information used to connect to Redis, in host:port format. If there is more than one host (e.g. Redis Sentinel), use host:port,host1:port1 format. | string | localhost:6379 | none |
| redis.connection.timeout | Maximum time (in milliseconds) to wait when connecting to Redis. | int | 2000 | none |
| redis.connection.password | Redis password. | string | null | none |
| redis.dbNum | Database number where data will be saved. | int | 0 | none |
## Example Configuration

```
name=my-redis-sink
connector.class=connect.RedisSinkConnector
tasks.max=1
topics=test-sink
redis.writer=cache
redis.type=standalone
use.record.key=false
#redis.key.fields=fieldName
#redis.key.prefix=PRODUCT:
#redis.key.delimiter=,
#redis.table=tbl
redis.connection.hosts=localhost:6379
#redis.connection.password=none
#redis.connection.timeout=2000
redis.dbNum=0
```
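Assuming a standard Kafka distribution and that the connector jar is on the Connect plugin/class path, a configuration like the one above could be launched in standalone mode roughly as follows (the properties file names here are illustrative):

```
bin/connect-standalone.sh config/connect-standalone.properties redis-sink.properties
```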