Name : Google BigQuery Sink Connector
Original Developer: WePay and Confluent
Link to GitHub: https://github.com/confluentinc/kafka-connect-bigquery
License: Apache 2.0
Status: TESTED

One of:
NOT TESTED - built, not tested
TESTED - tested end-to-end in a limited number of use cases
MAINTAINED - tested and has unit tests or integration tests
- Follow Google's instructions to create a service account and export a JSON key.
- Refer to BigQuery's description of roles and permissions to pick the ones your setup requires.
- Create the target dataset.
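For example, the dataset can be created with the `bq` CLI (a sketch; assumes the Google Cloud SDK is installed and authenticated, and uses the project and dataset names from the sample config below - adjust them to yours):

```shell
# Create the target dataset (names match the sample config; adjust to yours).
bq --project_id=my-bigquery-project mk --dataset BQ_CONNECTOR_TEST
```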
Sample config for the connector (saved below as ~/bigquery.yaml):

```yaml
processingGuarantees: "EFFECTIVELY_ONCE"
configs:
  topic: "bq-test01"
  kafkaConnectorSinkClass: "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
  offsetStorageTopic: "bq-sink-offsets01"
  batchSize: "10"
  lingerTimeMs: "10"
  # required to be true for BigQuery
  sanitizeTopicName: "true"
  kafkaConnectorConfigProperties:
    name: "bq-sink1"
    connector.class: "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
    tasks.max: "1"
    topics: "bq-test01"
    project: "my-bigquery-project"
    defaultDataset: "BQ_CONNECTOR_TEST"
    keyfile: "/Users/me/my-bigquery-key.json"
    keySource: "FILE"
    kafkaDataFieldName: "topicMetaData"
    autoCreateTables: "true"
    sanitizeFieldNames: "true"
    bufferSize: "10"
    queueSize: "100"
```
Make sure you use the correct topic names, project, dataset, key file path, and so on.
Please refer to the connector's documentation and code for details on additional supported parameters.
- Start Pulsar standalone
$ bin/pulsar standalone
- Set retention for the namespace:
$ bin/pulsar-admin namespaces set-retention public/default -t -1 -s 100M
- Set the topic schema:
$ cat ~/schema_js.json
{
"type": "JSON",
"schema": "{\"type\":\"record\",\"name\":\"TestSchema\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"col1\",\"type\":[\"null\",\"string\"]},{\"name\":\"col2\",\"type\":[\"null\",\"string\"]}]}",
"properties": {}
}
$ bin/pulsar-admin schemas upload --filename ~/schema_js.json bq-test01
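If you prefer to script the step above, the schema file can be written and sanity-checked before uploading (a sketch; the path matches the one used above):

```shell
# Write the schema definition shown above to ~/schema_js.json.
# Note: "schema" holds the Avro record schema as an escaped JSON string.
cat > ~/schema_js.json <<'EOF'
{
  "type": "JSON",
  "schema": "{\"type\":\"record\",\"name\":\"TestSchema\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"col1\",\"type\":[\"null\",\"string\"]},{\"name\":\"col2\",\"type\":[\"null\",\"string\"]}]}",
  "properties": {}
}
EOF
# Sanity check: both the wrapper and the embedded Avro schema must parse as JSON.
python3 -c "import json, os; d = json.load(open(os.path.expanduser('~/schema_js.json'))); json.loads(d['schema']); print('schema OK')"
```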
- Start the connector
$ bin/pulsar-admin sinks localrun \
-a ~/src/pulsar-3rdparty-connectors/pulsar-connectors/bigquery/target/pulsar-3rdparty-pulsar-connectors-bigquery-0.1.0-SNAPSHOT.nar \
--name bq-sink \
--namespace public/default \
--parallelism 1 \
-i bq-test01 \
--sink-config-file ~/bigquery.yaml
Make sure that the connector started normally.
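With localrun the connector runs in the foreground, so watch the console output for errors. If you instead deploy the sink to a cluster with `sinks create`, its state can be queried (a sketch; the sink name matches the one used above):

```shell
# Query the sink's status (applies to sinks deployed via "sinks create").
bin/pulsar-admin sinks status --tenant public --namespace default --name bq-sink
```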
- Send messages to Pulsar
$ cat ~/a.json
{"col1": "val1", "col2": "val2"}
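The message file above can be created and validated like this (a sketch, using the same path):

```shell
# Write one sample record matching the TestSchema fields to ~/a.json.
cat > ~/a.json <<'EOF'
{"col1": "val1", "col2": "val2"}
EOF
# Sanity check: the record must be valid JSON with the expected fields.
python3 -c "import json, os; d = json.load(open(os.path.expanduser('~/a.json'))); print(sorted(d))"
```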
$ bin/pulsar-client produce bq-test01 -f ~/a.json -n 20 -vs json:"{\"type\":\"record\",\"name\":\"TestSchema\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"col1\",\"type\":[\"null\",\"string\"]},{\"name\":\"col2\",\"type\":[\"null\",\"string\"]}]}"
Please note that the -vs parameter (passes the schema for the value) is not supported by some earlier Pulsar 2.8.x versions.
- Check the messages in BigQuery
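For example, with the `bq` CLI (a sketch; assumes the Google Cloud SDK is installed and authenticated). Since sanitizeTopicName is true in the sample config, the connector sanitizes the topic name when deriving the table name, so the topic bq-test01 would map to a table such as bq_test01:

```shell
# Query the target table (table name assumed after topic-name sanitization).
bq --project_id=my-bigquery-project query --nouse_legacy_sql \
  'SELECT col1, col2 FROM BQ_CONNECTOR_TEST.bq_test01 LIMIT 10'
```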