Kafka extension for the Nameko microservice framework.
This is a Nameko microservice framework extension to support Kafka entrypoints and dependencies. The motivation behind the project is issue 569. Nameko-kafka provides a simple implementation of the entrypoint based on the approach by calumpeterwebb. It also includes a dependency provider for publishing Kafka messages from within a Nameko service.
The package supports Python >= 3.5.
$ pip install nameko-kafka
The extension can be used as both a service dependency and an entrypoint. Example usage for both cases is shown in the following sections.
The dependency is essentially a python-kafka producer in the form of a Nameko dependency. Nameko uses dependency injection to instantiate the producer. You just need to declare it in your service class as shown below:
from nameko.rpc import rpc
from nameko_kafka import KafkaProducer


class MyService:
    """
    My microservice
    """

    name = "my-service"

    # Kafka dependency
    producer = KafkaProducer(bootstrap_servers='localhost:1234')

    @rpc
    def method(self):
        # Publish message using dependency
        self.producer.send("kafka-topic", value=b"my-message", key=b"my-key")
Here KafkaProducer accepts all options valid for python-kafka's KafkaProducer.
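For example, standard kafka-python producer options such as serializers, retries and acks can be passed straight through. The JSON serializer and the service name below are only illustrative choices, not something the extension requires:

import json

from nameko.rpc import rpc
from nameko_kafka import KafkaProducer


class MyJsonService:
    """
    Illustrative service publishing JSON-encoded messages
    """

    name = "my-json-service"

    # Any kafka-python producer option can be supplied here
    producer = KafkaProducer(
        bootstrap_servers='localhost:1234',
        value_serializer=lambda value: json.dumps(value).encode("utf-8"),
        retries=3,
        acks="all",
    )

    @rpc
    def method(self, payload):
        # The value is JSON-encoded by the configured serializer;
        # the key is still sent as raw bytes
        self.producer.send("kafka-topic", value={"payload": payload}, key=b"my-key")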
You can use the nameko_kafka.consume decorator in your services to process Kafka messages:
from nameko_kafka import consume


class MyService:
    """
    My microservice
    """

    name = "my-service"

    @consume("kafka-topic", group_id="my-group", bootstrap_servers='localhost:1234')
    def method(self, message):
        # Your message handler
        handle_message(message)
The consume decorator accepts all the options valid for python-kafka's KafkaConsumer.
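For instance, kafka-python consumer options such as auto_offset_reset or a value_deserializer can be passed to the decorator in the same way. The JSON deserializer and service name below are only illustrative choices:

import json

from nameko_kafka import consume


class MyConsumerService:
    """
    Illustrative service consuming JSON-encoded messages
    """

    name = "my-consumer-service"

    @consume(
        "kafka-topic",
        group_id="my-group",
        bootstrap_servers='localhost:1234',
        # kafka-python consumer options are passed through as-is
        auto_offset_reset="earliest",
        value_deserializer=lambda value: json.loads(value.decode("utf-8")),
    )
    def method(self, message):
        # Your message handler
        handle_message(message)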
On top of python-kafka's default autocommit feature, the entrypoint also supports three offset commit strategies: at least once, at most once, and exactly once. These correspond to the different message delivery semantics achievable in Kafka. Examples for each are shown in the following subsections.
At least once:

from nameko_kafka import consume, Semantic


class MyService:
    """
    My microservice
    """

    name = "my-service"

    # At least once semantic consumer
    @consume("kafka-topic", group_id="my-group", bootstrap_servers='localhost:1234', semantic=Semantic.AT_LEAST_ONCE)
    def method(self, message):
        # Your message handler
        handle_message(message)
At most once:

from nameko_kafka import consume, Semantic


class MyService:
    """
    My microservice
    """

    name = "my-service"

    # At most once semantic consumer
    @consume("kafka-topic", group_id="my-group", bootstrap_servers='localhost:1234', semantic=Semantic.AT_MOST_ONCE)
    def method(self, message):
        # Your message handler
        handle_message(message)
The exactly once semantic requires persistent storage to save message offsets. Such a persistent store can be implemented using the OffsetStorage interface provided by Nameko-kafka. There can be various backend implementations like RDBMS, NoSQL databases, etc. Support for some comes out of the box:
from pymongo import MongoClient

from nameko_kafka import consume, Semantic
from nameko_kafka.storage import MongoStorage


class MyService:
    """
    My microservice
    """

    name = "my-service"

    # Exactly once semantic consumer
    @consume(
        "kafka-topic",
        group_id="my-group",
        bootstrap_servers='localhost:1234',
        semantic=Semantic.EXACTLY_ONCE,
        storage=MongoStorage(
            # MongoDB backend client
            client=MongoClient('localhost', 27017),
            # Database to use for storage
            db_name="database-name",
            # Collection to use for storage
            collection="collection-name"
        )
    )
    def method(self, message):
        # Your message handler
        handle_message(message)
Note: If the db_name and collection arguments are not specified, the storage uses the default values "nameko_kafka_offsets" and "offsets" respectively.
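For example, relying on those defaults, the storage can be constructed with just a client (a minimal sketch):

from pymongo import MongoClient

from nameko_kafka.storage import MongoStorage

# Offsets will be stored in the default "nameko_kafka_offsets" database,
# in the default "offsets" collection
storage = MongoStorage(client=MongoClient('localhost', 27017))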
You can create your own offset storage by implementing the OffsetStorage interface. It exposes the following methods:
from nameko_kafka.storage.base import OffsetStorage


class MyStorage(OffsetStorage):
    """
    My custom offset storage.
    """

    def setup(self):
        """
        Method for setup of the storage.
        """

    def stop(self):
        """
        Method to teardown the storage.
        """

    def read(self, topic, partition):
        """
        Read last stored offset from storage for
        given topic and partition.

        :param topic: message topic
        :param partition: partition number of the topic
        :returns: last committed offset value
        """

    def write(self, offsets):
        """
        Write offsets to storage.

        :param offsets: mapping between topic-partition
            tuples and corresponding latest offset value, e.g.

            {
                ("topic-1", 0): 1,
                ("topic-1", 1): 3,
                ("topic-2", 1): 10,
                ...
            }
        """
The extension configuration can be set in a Nameko config.yaml file or through environment variables.
In a config.yaml file:

# Config for entrypoint
KAFKA_CONSUMER:
  bootstrap_servers: 'localhost:1234'
  retry_backoff_ms: 100
  ...

# Config for dependency
KAFKA_PRODUCER:
  bootstrap_servers: 'localhost:1234'
  retries: 3
  ...
Or as environment variables:

# Config for entrypoint
KAFKA_CONSUMER='{"bootstrap_servers": "localhost:1234", "retry_backoff_ms": 100}'

# Config for dependency
KAFKA_PRODUCER='{"bootstrap_servers": "localhost:1234", "retries": 3}'
- Kafka Entrypoint
- Kafka Dependency
- Commit strategies:
  - AT_MOST_ONCE_DELIVERY
  - AT_LEAST_ONCE_DELIVERY
  - EXACTLY_ONCE_DELIVERY
- Commit storage for EXACTLY_ONCE_DELIVERY strategy
A Kafka broker is required for development. You can spawn one using the docker-compose.yml file in the tests folder:
$ cd tests
$ docker-compose up -d
To install all package dependencies:
$ pip install .[dev]
or
$ make deps
Other useful commands:
$ pytest --cov=nameko_kafka tests/ # to get coverage report
or
$ make coverage
$ pylint nameko_kafka # to check code quality with PyLint
or
$ make lint
Issue reports and pull requests are always welcome. Thanks!