This repository has been archived by the owner on Sep 11, 2023. It is now read-only.

Kitura-Next/SwiftKafka

SwiftKafka

A Swift Kafka client for producing to and consuming from event streams.

This works by wrapping the librdkafka C library.

Swift version

The latest version of SwiftKafka requires Swift 5.0 or later. You can download this version of the Swift binaries by following this link.

Usage

Swift Package Manager

Add dependencies

Add the SwiftKafka package to the dependencies within your application’s Package.swift file. Substitute "x.x.x" with the latest SwiftKafka release.

.package(url: "https://github.com/Kitura-Next/SwiftKafka.git", from: "x.x.x")

Add SwiftKafka to your target's dependencies:

.target(name: "example", dependencies: ["SwiftKafka"]),
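
Put together, a complete manifest might look like the sketch below. The package name "example" and the tools version are assumptions chosen to match the snippets above and the Swift 5.0 requirement; "x.x.x" is still a placeholder that you need to replace with a real release.

```swift
// swift-tools-version:5.0
import PackageDescription

let package = Package(
    // "example" is a hypothetical package/target name.
    name: "example",
    dependencies: [
        // Substitute "x.x.x" with the latest SwiftKafka release.
        .package(url: "https://github.com/Kitura-Next/SwiftKafka.git", from: "x.x.x")
    ],
    targets: [
        .target(name: "example", dependencies: ["SwiftKafka"])
    ]
)
```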

Import package

import SwiftKafka

Getting Started

To use SwiftKafka you will need to install the librdkafka package:

macOS

brew install librdkafka

Linux

Install librdkafka from the Confluent APT repositories - see instructions here (following steps 1 and 2 to add the Confluent package signing key and apt repository), and then install librdkafka:

sudo apt install librdkafka

Running a Kafka instance locally

To experiment locally, you can set up your own Kafka server to produce/consume from.

On macOS you can follow this guide on Kafka Installation using Homebrew to run a local server.

On Linux, you can follow this guide for a manual install on Ubuntu.

KafkaConfig

The KafkaConfig class contains your configuration settings for a KafkaConsumer/KafkaProducer.

The class is initialized with default values which can then be changed using the helper functions. For example, to enable all logging you would set the debug variable:

let config = KafkaConfig()
config.debug = [.all]

Alternatively, you can access the configuration dictionary directly on the KafkaConfig object:

let config = KafkaConfig()
config["debug"] = "all"

The list of configuration keys and descriptions can be found in the librdkafka CONFIGURATION.md.

When you pass this class to a producer/consumer, a copy is made so further changes to the instance will not affect existing configurations.
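
A minimal sketch of what the copy behaviour means in practice, using only the KafkaConfig properties and the KafkaConsumer(config:) initializer that appear elsewhere in this README. It needs librdkafka installed but, as far as the description above goes, should not need a running broker.

```swift
import SwiftKafka

do {
    let config = KafkaConfig()
    config.groupId = "Kitura"

    // The consumer takes its own copy of the configuration at this point.
    let quietConsumer = try KafkaConsumer(config: config)

    // This change is made after the copy, so it does not reach quietConsumer.
    config.debug = [.all]

    // A client created afterwards picks up the updated settings.
    let verboseConsumer = try KafkaConsumer(config: config)

    // Reference both clients so the compiler does not warn about unused values.
    _ = (quietConsumer, verboseConsumer)
} catch {
    print("Error creating consumer: \(error)")
}
```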

KafkaProducer

The KafkaProducer class produces messages to a Kafka server.

You can initialize a KafkaProducer using a KafkaConfig instance or with the default configuration.

The producer sends a KafkaProducerRecord with the following fields:

  • topic: The topic where the record will be sent. If this topic doesn't exist, the producer will try to create it.
  • value: The message body that will be sent with the record.
  • partition: The topic partition the record will be sent to. If this is not set, the partition will be assigned automatically.
  • key: If the partition is not set, records with the same key will be sent to the same partition. Since order is guaranteed within a partition, these records will be read in the order they were produced.
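
To illustrate why records with the same key end up on the same partition, here is a self-contained sketch of hash-based partitioning. It is not the partitioner that librdkafka or SwiftKafka actually uses: the function name, the FNV-1a hash, and the partition count are all assumptions made for illustration. The point is only that a deterministic hash of the key, reduced modulo the number of partitions, always gives the same answer for the same key.

```swift
// Hypothetical helper: maps a key to a partition index with a deterministic hash.
// FNV-1a is a stand-in here, not the hash the real client uses.
func partition(forKey key: String, partitionCount: Int) -> Int {
    precondition(partitionCount > 0, "partitionCount must be positive")
    var hash: UInt32 = 2166136261          // FNV-1a 32-bit offset basis
    for byte in key.utf8 {
        hash ^= UInt32(byte)
        hash = hash &* 16777619            // FNV-1a 32-bit prime, wrapping multiply
    }
    return Int(hash % UInt32(partitionCount))
}

// Records sharing a key map to the same partition; other keys may land elsewhere.
let keys = ["Key", "Key", "other-key"]
for key in keys {
    print("\(key) -> partition \(partition(forKey: key, partitionCount: 3))")
}
```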

The send() function is asynchronous. The result is returned in a callback which contains a KafkaConsumerRecord on success or a KafkaError on failure.

The following example produces a message with the value "Hello world" to the "test" topic of a Kafka server running on localhost.

do {
    let producer = try KafkaProducer()
    guard producer.connect(brokers: "localhost:9092") == 1 else {
        throw KafkaError(rawValue: 8)
    }
    producer.send(producerRecord: KafkaProducerRecord(topic: "test", value: "Hello world", key: "Key")) { result in
        switch result {
        case .success(let message):
            print("Message at offset \(message.offset) successfully sent")
        case .failure(let error):
            print("Error producing: \(error)")
        }
    }
} catch {
    print("Error creating producer: \(error)")
}

KafkaConsumer

The KafkaConsumer class consumes messages from a Kafka server.

You can initialize a KafkaConsumer using a KafkaConfig instance or with the default configuration.

You can then subscribe to topics using subscribe(). This will distribute the topic partitions evenly between consumers with the same group id. If you do not set a group id, a random UUID will be used.

Alternatively, you can use assign() to manually set the partition and offset for the consumer.

Both subscribe() and assign() are asynchronous and return immediately. However, the consumer may take up to twice sessionTimeoutMs (default 10 seconds) to connect completely.
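
If that delay matters, one option is to lower the session timeout before creating the consumer. The sketch below sets it through the configuration dictionary, assuming that sessionTimeoutMs corresponds to librdkafka's "session.timeout.ms" key; the 6000 ms value is only an illustration, and brokers may reject values outside their configured limits.

```swift
import SwiftKafka

do {
    let config = KafkaConfig()
    config.groupId = "Kitura"
    // Assumption: "session.timeout.ms" is the librdkafka key behind sessionTimeoutMs.
    // 6000 ms is an illustrative value, not a recommendation.
    config["session.timeout.ms"] = "6000"

    let consumer = try KafkaConsumer(config: config)
    guard consumer.connect(brokers: "localhost:9092") == 1 else {
        throw KafkaError(rawValue: 8)
    }

    // subscribe() returns immediately; the consumer may still be joining the
    // group when it returns, so early polls can come back empty.
    try consumer.subscribe(topics: ["test"])
} catch {
    print("Error setting up consumer: \(error)")
}
```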

To consume messages from Kafka you call poll(timeout:). This will poll Kafka, blocking for timeout seconds. When it completes, it returns an array of KafkaConsumerRecord with the following fields:

  • value: The message value if it can be UTF-8 decoded to a String.
  • valueData: The message value as raw data.
  • key: The message key if it can be UTF-8 decoded to a String.
  • keyData: The message key as raw data.
  • offset: The message offset.
  • topic: The topic that the message was consumed from.
  • partition: The partition that the message was consumed from.

When you have finished consuming, you can call close() to close the connection and unassign the consumer. The unassigned partitions are then rebalanced among the other consumers in the group. If close() is not called, the consumer is closed when the instance is deallocated.

The following example consumes and prints all unread messages from the "test" topic of the Kafka server.

do {
    let config = KafkaConfig()
    config.groupId = "Kitura"
    config.autoOffsetReset = .beginning
    let consumer = try KafkaConsumer(config: config)
    guard consumer.connect(brokers: "localhost:9092") == 1 else {
        throw KafkaError(rawValue: 8)
    }
    try consumer.subscribe(topics: ["test"])
    while true {
        let records = try consumer.poll()
        print(records)
    }
} catch {
    print("Error creating consumer: \(error)")
}
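
The loop above runs until an error is thrown. As a variation, the sketch below polls a fixed number of times, reads individual record fields, and then closes the consumer explicitly. It assumes that value and key are optional strings, that valueData is a Foundation Data value, and that close() may be marked as throwing; the poll count and one-second timeout are arbitrary.

```swift
import SwiftKafka

do {
    let config = KafkaConfig()
    config.groupId = "Kitura"
    config.autoOffsetReset = .beginning
    let consumer = try KafkaConsumer(config: config)
    guard consumer.connect(brokers: "localhost:9092") == 1 else {
        throw KafkaError(rawValue: 8)
    }
    try consumer.subscribe(topics: ["test"])

    // Poll ten times, blocking for up to one second each, so the example terminates.
    for _ in 0..<10 {
        let records = try consumer.poll(timeout: 1)
        for record in records {
            // value is nil when the payload is not valid UTF-8; fall back to the byte count.
            let value = record.value ?? "<\(record.valueData.count) bytes, not UTF-8>"
            let key = record.key ?? "<no key>"
            print("\(record.topic)[\(record.partition)] @ \(record.offset): \(key) = \(value)")
        }
    }

    // Close explicitly so the partitions are rebalanced without waiting for deallocation.
    try consumer.close()
} catch {
    print("Error consuming: \(error)")
}
```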

API Documentation

For more information visit our API reference.

Community

We love to talk server-side Swift, and Kitura. Join our Slack to meet the team!

License

This library is licensed under Apache 2.0. Full license text is available in LICENSE.