Skip to content

parkhub/iris

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

70 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

iris

Kafka and Avro mashup

A kafka library that is meant to validate produced messages and decode consumed messages using schemas defined in Confluent's Schema Registry.

It uses the excellent node-rdkafka library under the hood by wrapping a producer's produce method or a consumer's consume method with encodings that work with a schema registry backed Kafka deployment. It uses avsc to handle avro encoding/decoding of JSON messages.

Build Status Code Coverage Dependencies version Apache License PRs Welcome Roadmap Semantic Release Commitizen

Watch on GitHub Star on GitHub Tweet

Install

npm install @parkhub/iris

Usage

Give it the url for your schema registry, initialize it and create clients!

Make sure to brush up on your Kafka knowledge before tweaking with producers/consumers options!

import iris from '@parkhub/iris';

const registryUrl = 'http://registry:8081';
const brokerList = 'kafka-1:9092,kafka-2:9092';
const schemaCfgs = [
  {
    topic: 'TestTopic'
  }, {
   topic: 'OtherTopic'
   version: '1.2'
   }
];
async function startApp() {
  const kafka = await iris({ registryUrl, brokerList, SchemaCfgs })
	  .initialize();

  const consumer = await kafka.createConsumer({
    groupId: 'consumer-group',
    topicCfgs: {
      'consume.callback.max.messages': 100
    }
  }).connect();

  consumer.subscribe(['TestTopic'], (data) => {
    const { message, topic, schemaId } = data;
    console.log('Message received', JSON.stringify(message, null, 4));
    console.log('Message from topic', topic);
    console.log('SchemaId used to parse message', schemaId);
  });

  const producer = await kafka.createProducer({
    'client.id': 'kafka',
    'dr_cb': true
  }).connect();


  const message = {
    name: 'satsuki',
    age: 19
  };

  producer.produce('TestTopic', null, message);

  return kafka;
}

startApp()
  .then(async (kafka) => {
    kafka.disconnectAllClients();
  })
  .catch(err => console.error(err));

API

The API for Iris clients follows pretty closely to those of node-rdkafka with the exception of a few methods that have been promisified!

These methods for both Producers/Consumers are:

  • getMetadata
  • connect
  • disconnect

They take the same configurations as described in the node-rdkafka api docs except the callback if you're using the promise API.

HOWEVER, due to node-rdkafka using its methods to do some internal magic, you also have the option of using the callback API. Just pass in the callback along with each method's arguments.

All producer/consumer clients listen to the same events, take the same configurations(with a few exceptions listed below) and behave the same way. Make sure to take a look at node-rdkafka configurations for more detail!

Consumer API

createConsumer({ groupId, Kafka Consumer Configurations, topicCfgs(valid Kafka Consumer Topic Configurations) })

Differences

This method follows closely with Kafka Consumer Configurations except that groupId is used here instead of 'group.id' Any other valid configuration can be passed to the Consumer by following the same semantics used in node-rdkafka configurations. And the same applies for topicCfgs Object and topic configurations.

Only the [standard consumer api] is supported at the moment. I've also joined the "subscribe" process so subscribe actually takes an array of topics and the handler. So you don't need to call subscribe, then consume then listen on 'data' event. Everything is done when you call the subscribe method.

A consumer handler will receive the following structure:

{
  message: 'Decoded Message',
  topic: 'Topic the message came from',
  schemaId: 'The schemaId used to encode the topic',
  key: 'Key for this kafka topic',
  size: 'Size of message in bytes',
  partition: 'Partition the message was on',
  offset: 'Offset the message was read from'
}
import iris from '@parkhub/iris';

// Using async/await
(async function startConsumer() {
  const consumer = iris.createConsumer({
    groupId: 'consumer-group',
    topicCfgs: {
      'consume.callback.max.messages': 100
    }
  });

  await consumer.connect();

  const handler = data => console.log(data);
  consumer.subscribe(['MY_TOPIC'], handler);

  console.log('DONE!');
})();

// Using Promises
iris.createConsumer({ connection: 'kafka:9092', groupId: 'MY_GROUP_ID'})
  .then(consumer => {
    const handler = message => console.log(message);
    consumer.subscribe(['MY_TOPIC'], handler);
  })
  .then(() => console.log('DONE!'))
  .catch(err => console.log('ERROR!', err));

Producer API

createProducer(Kafka Producer Configurations)

Differences

The only difference is that iris' only supports [standard-api producer] clients. Everything else remains the same.

import iris from '@parkhub/iris';

// Using async/await
(async function startProducer() {
  const producer = await iris.createProducer({
    'client.id': 'kafka',
    'dr_cb': true
  });

  await producer.connect();

  producer.produce('TestTopic', null, 'message');

  await producer.disconnect();

  console.log('DONE!');
}());

// Using Promises
iris
  .createProducer({ 'client.id': 'kafka:9092', dr_cb: true })
  .then((producer) => {
    producer.produce('TestTopic', null, 'message');

    return producer.disconnect();
  })
  .then(() => console.log('DONE!'))
  .catch(err => console.log('ERROR!', err));

Development Guide

In this section you will be able to find out how to get started developing for iris.

Requirements

  • Must have the latest version of Docker installed.

Downloading

git clone git@github.com:parkhub/iris.git

Building

docker-compose up iris-integration

Running Tests

Integration Tests

When you run docker-compose up iris-integration it will actually run the integration tests. As you make changes to the project the tests will rerun.

Unit Tests

To run the unit test simply make sure to install the packages locally by running npm start. Then all you need to do is run the test command.

npm start test

Creating a Commit

We use semantic-release to manage our releases. If you haven't worked with it before please take a look at their project to understand more about how it works.

  1. First I like to run the validate command before running through the commit process because if it fails on validation when your committing then you will have to go through the commit process again. To run the validate command simply run this:

    npm start validate

  2. To start a new release, make sure you have added your files to git and then run this command:

    npm start commit

    This will take you through the release process. Follow the directions and read the steps throughly.

  3. After you have committed your code and it passes the linter then you can push your branch up to Github and create a pull request.