Merge pull request #51 from Nordstrom/config_updates
Configuration updates
dylanmei authored Jun 3, 2024
2 parents b157154 + 18e295e commit 3ca54bd
Showing 15 changed files with 181 additions and 270 deletions.
11 changes: 6 additions & 5 deletions .gitignore
@@ -19,10 +19,6 @@
*.tar.gz
*.rar

# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
/target/

# Eclipse
.classpath
.project
@@ -33,4 +29,9 @@ dependency-reduced-pom.xml

# IntelliJ
.idea
*.iml
*.iml

# Files relevant to this project
target/
config/sink-connector.properties
config/source-connector.properties
127 changes: 24 additions & 103 deletions README.md
@@ -1,6 +1,7 @@
# kafka-connect-sqs

The SQS connector plugin provides the ability to use AWS SQS queues as both a source (from an SQS queue into a Kafka topic) or sink (out of a Kafka topic into an SQS queue).
The SQS connector plugin provides the ability to use AWS SQS queues as both a source (from an SQS
queue into a Kafka topic) or sink (out of a Kafka topic into an SQS queue).

## Compatibility matrix

@@ -10,12 +11,11 @@ The SQS connector plugin provides the ability to use AWS SQS queues as both a so
|1.5|3.3.2|1.12.409|
|1.6|3.4.1|1.12.669|

Running the connector on versions of Kafka Connect prior to 3.0 is not recommended.

## Building the distributable

You can build the connector with Maven using the standard lifecycle goals:

```sh
mvn clean
mvn package
```
@@ -37,27 +37,10 @@ Optional properties:
* `sqs.message.attributes.include.list`: The comma-separated list of MessageAttribute names to include; if empty, all message attributes are included. Default is the empty string.
* `sqs.message.attributes.partition.key`: The name of a single AWS SQS MessageAttribute to use as the partition key. If not specified, the SQS message ID is used as the partition key.
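
For illustration, a source connector that opts in to message attributes might add settings like these (a sketch; the queue URL and the `trace-id`/`source-system` attribute names are placeholders):

```properties
name: my-sqs-source
connector.class: com.nordstrom.kafka.connect.sqs.SqsSourceConnector
topics: my-topic
sqs.queue.url: https://sqs.<AWS_REGION>.amazonaws.com/<AWS_ACCOUNT>/my-queue
# Copy SQS MessageAttributes into Kafka headers
sqs.message.attributes.enabled: true
# Include only these attributes (an empty list includes all of them)
sqs.message.attributes.include.list: trace-id,source-system
# Partition records by this attribute instead of the SQS message ID
sqs.message.attributes.partition.key: trace-id
```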

### Sample connector configuration

```json
{
"config": {
"connector.class": "com.nordstrom.kafka.connect.sqs.SqsSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"name": "my-sqs-source",
"sqs.max.messages": "5",
"sqs.queue.url": "https://sqs.<AWS_REGION>.amazonaws.com/<AWS_ACCOUNT>/my-queue",
"sqs.wait.time.seconds": "5",
"topics": "my-topic",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
},
"name": "my-sqs-source"
}
```

### Sample IAM policy

Ensure the authentication principal has privileges to read messages from the SQS queue.
When using this connector, ensure the authentication principal has privileges to read messages from
the SQS queue.

```json
{
@@ -90,23 +73,6 @@ Optional properties:
* `sqs.message.attributes.enabled`: If true, the connector copies Kafka headers into SQS MessageAttributes (only string headers are currently supported). Default is false.
* `sqs.message.attributes.include.list`: The comma-separated list of header names to include; if empty, all headers are included. Default is the empty string.
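
Similarly, a sink connector that forwards Kafka headers as SQS MessageAttributes might look like this (a sketch; the queue URL and header names are placeholders):

```properties
name: my-sqs-sink
connector.class: com.nordstrom.kafka.connect.sqs.SqsSinkConnector
topics: my-topic
sqs.queue.url: https://sqs.<AWS_REGION>.amazonaws.com/<AWS_ACCOUNT>/my-queue
# Copy Kafka headers into SQS MessageAttributes (string headers only)
sqs.message.attributes.enabled: true
# Forward only these headers (an empty list forwards all of them)
sqs.message.attributes.include.list: trace-id,source-system
```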

### Sample connector configuration

```json
{
"config": {
"connector.class": "com.nordstrom.kafka.connect.sqs.SqsSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"name": "my-sqs-sink",
"sqs.queue.url": "https://sqs.<AWS_REGION>.amazonaws.com/<AWS_ACCOUNT>/my-queue",
"sqs.region": "<AWS_REGION>",
"topics": "my-topic",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
},
"name": "my-sqs-sink"
}
```

### Sample SQS queue policy

Define a corresponding SQS queue policy that allows the connector to send messages to the SQS queue:
@@ -131,7 +97,8 @@ Define a corresponding SQS queue policy that allows the connector to send messag

### Sample IAM policy

Ensure the authentication principal has privileges to send messages to the SQS queue.
When using this connector, ensure the authentication principal has privileges to send messages to
the SQS queue.

```json
{
@@ -205,73 +172,27 @@ The IAM role will have a corresponding trust policy. For example:
}
```

## Running the demo

### Build the connector plugin
## Running the connector

Build the connector jar file:

```shell
mvn clean package
```
This example demonstrates using the sink connector to send a message from Kafka to an SQS queue.

### Run the connector using Docker Compose
- Set up an SQS queue
- Set up Kafka; use the cluster defined in `docker-compose.yml` if you don't have one
- Customize the files in the `config` directory; for example, copy `config/sink-connector.properties.example` to `config/sink-connector.properties` and edit it, as shown below
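
A minimal version of that last step, assuming the copy-then-edit convention implied by the `.gitignore` entries for `config/sink-connector.properties`:

```sh
cp config/sink-connector.properties.example config/sink-connector.properties
# Edit sqs.queue.url (and any other settings) to match your queue
```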

Ensure you have `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables exported in your shell. Docker Compose will pass these values into the `connect` container.

Use the provided [Docker Compose](https://docs.docker.com/compose) file and run `docker-compose up`.

With the [Kafka Connect REST interface](https://docs.confluent.io/current/connect/references/restapi.html), verify the SQS sink and source connectors are installed and ready: `curl http://localhost:8083/connector-plugins`.
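
Once a connector instance has been created, the same REST interface reports its health; for example (the connector name `my-sqs-sink` is an assumption from the sample configuration):

```sh
# Shows connector state and the state of each task
curl http://localhost:8083/connectors/my-sqs-sink/status
```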

### AWS

The demo assumes you have an AWS account, valid credentials in `~/.aws/credentials`, and the
`AWS_PROFILE` and `AWS_REGION` environment variables set to appropriate values.

These are required so that Kafka Connect will have access to the SQS queues.

### The flow

We will use the AWS Console to put a message into an SQS queue. A source connector will read messages
from the queue and write the messages to a Kafka topic. A sink connector will read messages from the
topic and write to a _different_ SQS queue.
Now, start the sink connector in standalone mode:

```sh
$KAFKA_HOME/bin/connect-standalone.sh \
config/connect-worker.properties config/sink-connector.properties
```

```
 __
( o>  chirp --> [chirps-q] --source connector--> [chirps-t] --sink connector--> [chirped-q]
///\              (SQS)                            (Kafka topic)                  (SQS)
\V_/_
```

### Create AWS SQS queues

Create `chirps-q` and `chirped-q` SQS queues using the AWS Console. Take note of the `URL:` values for each
as you will need them to configure the connectors later.
Use a tool to produce messages to the Kafka topic.

### Create the connectors

The `source` connector configuration is defined in `demos/sqs-source-chirps.json`; the `sink` connector
configuration is defined in `demos/sqs-sink-chirped.json`. You will have to modify the `sqs.queue.url`
parameter to reflect the values noted when you created the queues.

Create the connectors using the Kafka Connect REST API:

```shell
curl -XPOST -H 'Content-Type: application/json' http://localhost:8083/connectors -d @demos/sqs-source-chirps.json
curl -XPOST -H 'Content-Type: application/json' http://localhost:8083/connectors -d @demos/sqs-sink-chirped.json
```

```sh
bin/kafka-console-producer --bootstrap-server localhost:9092 \
--topic hello-sqs-sink \
--property parse.headers=true \
--property 'headers.delimiter=\t'
>test:abc\t{"hello":"world"}
```
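
To confirm delivery, you can read the message back from the queue, for example with the AWS CLI (the queue URL is the placeholder from the sample configuration; `--message-attribute-names All` also returns the forwarded `test` header):

```sh
aws sqs receive-message \
  --queue-url https://sqs.us-west-2.amazonaws.com/<Your AWS account>/hello-sqs-sink \
  --message-attribute-names All
```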

### Send and receive messages

Using the AWS Console (or the AWS CLI), send a message to the `chirps-q`.

The source connector will read the message from the queue and write it to the `chirps-t` Kafka topic.

The `sink` connector will read the message from the topic and write it to the `chirped-q` queue.

Use the AWS Console (or the AWS CLI) to read your message from the `chirped-q`.
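
If you prefer the AWS CLI to the console, the send and receive steps look roughly like this (the queue URLs are the placeholders noted when you created the queues):

```sh
# Put a message onto the source queue
aws sqs send-message \
  --queue-url https://sqs.<AWS_REGION>.amazonaws.com/<AWS_ACCOUNT>/chirps-q \
  --message-body "chirp"

# After the connectors have processed it, read it back from the destination queue
aws sqs receive-message \
  --queue-url https://sqs.<AWS_REGION>.amazonaws.com/<AWS_ACCOUNT>/chirped-q
```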
10 changes: 10 additions & 0 deletions config/connect-worker.properties
@@ -0,0 +1,10 @@
bootstrap.servers=localhost:9092

plugin.path=./target/plugin/
offset.storage.file.filename=/tmp/connect.offsets

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
8 changes: 8 additions & 0 deletions config/sink-connector.properties.example
@@ -0,0 +1,8 @@
name: sqs-sink
connector.class: com.nordstrom.kafka.connect.sqs.SqsSinkConnector
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.storage.StringConverter

topics: hello-sqs-sink
sqs.queue.url: https://sqs.us-west-2.amazonaws.com/<Your AWS account>/hello-sqs-sink
sqs.message.attributes.enabled: true
9 changes: 9 additions & 0 deletions config/source-connector.properties.example
@@ -0,0 +1,9 @@
name: sqs-source
connector.class: com.nordstrom.kafka.connect.sqs.SqsSourceConnector
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.storage.StringConverter

topics: hello-sqs-source
sqs.queue.url: https://sqs.us-west-2.amazonaws.com/<Your AWS account>/hello-sqs-source
sqs.wait.time.seconds: 5
sqs.message.attributes.enabled: true
11 changes: 0 additions & 11 deletions demos/sqs-sink-chirped.json

This file was deleted.

13 changes: 0 additions & 13 deletions demos/sqs-source-chirps.json

This file was deleted.

71 changes: 19 additions & 52 deletions docker-compose.yml
@@ -2,64 +2,31 @@ version: "3"

services:
zookeeper:
image: confluentinc/cp-zookeeper:7.1.1
image: confluentinc/cp-zookeeper:7.4.5
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- 2181:2181
logging: { driver: none }
logging:
driver: none

broker:
image: confluentinc/cp-kafka:7.1.1
kafka-broker:
image: confluentinc/cp-kafka:7.4.5
ports:
- 9092:9092
- 9093:9093
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_LISTENERS=PUBLIC://0.0.0.0:9092,INTERNAL://0.0.0.0:19092
- KAFKA_ADVERTISED_LISTENERS=PUBLIC://localhost:9092,INTERNAL://broker:19092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PUBLIC:PLAINTEXT,INTERNAL:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
- KAFKA_NUM_PARTITIONS=2
- KAFKA_DEFAULT_REPLICATION_FACTOR=1
- KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=10
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- KAFKA_DELETE_TOPIC_ENABLE=true
- KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false

- KAFKA_LOG4J_ROOT_LOGLEVEL=INFO
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: PUBLIC://0.0.0.0:9092,INTERNAL://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PUBLIC://localhost:9092,INTERNAL://kafka-broker:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PUBLIC:PLAINTEXT,INTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_NUM_PARTITIONS: 2
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: 10
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_DELETE_TOPIC_ENABLE: "true"
KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false"
KAFKA_LOG4J_LOGGERS: kafka=WARN,kafka.controller=INFO,kafka.server.KafkaServer=INFO,org.apache.zookeeper=WARN
depends_on: [zookeeper]
logging: { driver: none }

# NB: run connect locally in stand-alone mode to debug
connect:
image: confluentinc/cp-kafka-connect:7.1.1
ports:
- 8083:8083
environment:
- CONNECT_BOOTSTRAP_SERVERS=broker:19092
- CONNECT_REST_ADVERTISED_HOST_NAME=connect
- CONNECT_REST_PORT=8083
- CONNECT_GROUP_ID=connect
- CONNECT_CONFIG_STORAGE_TOPIC=_connect_configs
- CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1
- CONNECT_OFFSET_STORAGE_TOPIC=_connect_offsets
- CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1
- CONNECT_STATUS_STORAGE_TOPIC=_connect_status
- CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1
- CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter

- CONNECT_PLUGIN_PATH=/opt/connectors
- KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/etc/log4j.properties

- AWS_PROFILE
- AWS_REGION
- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
volumes:
- ~/.aws:/root/.aws
- ./target/plugin:/opt/connectors
- ./config/log4j.properties:/etc/log4j.properties
depends_on: [broker]
2 changes: 1 addition & 1 deletion pom.xml
@@ -22,7 +22,7 @@
<groupId>com.nordstrom.kafka.connect.sqs</groupId>
<artifactId>kafka-connect-sqs</artifactId>
<name>Kafka Connect SQS Sink/Source Connector</name>
<version>1.6.1</version>
<version>1.6.2</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
