A Kafka Client for Loopback4 built on top of KafkaJS.
Install the package, which provides KafkaClientComponent, using npm or yarn:
$ [npm install | yarn add] loopback4-kafka-client
Configure and load KafkaClientComponent in the application constructor as shown below.
import {
  KafkaClientBindings,
  KafkaClientComponent,
  KafkaClientOptions,
} from 'loopback4-kafka-client';
// ...
export class MyApplication extends BootMixin(
  ServiceMixin(RepositoryMixin(RestApplication)),
) {
  constructor(options: ApplicationConfig = {}) {
    // super() must be called before `this` is used in a derived class
    super(options);
    this.configure<KafkaClientOptions>(KafkaClientBindings.Component).to({
      initObservers: true, // if you want to init consumer lifeCycleObserver
      topics: [Topics.First], // if you want to use producers for given topics (Topics is your own enum)
      connection: {
        // refer https://kafka.js.org/docs/configuration
        brokers: [process.env.KAFKA_SERVER ?? ''],
      },
    });
    this.bind(KafkaClientBindings.ProducerConfiguration).to({
      // your producer config
      // refer https://kafka.js.org/docs/producing#options
    });
    this.bind(KafkaClientBindings.ConsumerConfiguration).to({
      // refer https://kafka.js.org/docs/consuming#options
      groupId: process.env.KAFKA_CONSUMER_GROUP,
    });
    this.component(KafkaClientComponent);
    // ...
  }
  // ...
}
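The configuration above falls back to an empty broker string when `KAFKA_SERVER` is unset, which tends to surface only later as a connection error. A minimal sketch of a fail-fast alternative follows. `parseBrokers` is a hypothetical helper, not part of loopback4-kafka-client, and it assumes `KAFKA_SERVER` may hold a comma-separated list of `host:port` entries.

```typescript
// Hypothetical helper (not part of loopback4-kafka-client): turns a
// comma-separated KAFKA_SERVER value into a list of broker addresses.
function parseBrokers(value: string | undefined): string[] {
  const brokers = (value ?? '')
    .split(',')
    .map(entry => entry.trim())
    .filter(entry => entry.length > 0);
  if (brokers.length === 0) {
    // Fail at startup instead of passing an empty broker string to the client.
    throw new Error('KAFKA_SERVER must list at least one broker (host:port)');
  }
  return brokers;
}

// Possible use inside the application constructor (assumption: same option shape as above):
// connection: {brokers: parseBrokers(process.env.KAFKA_SERVER)},
```

Whether to fail fast or to allow a default broker is a design choice; the point of the sketch is only that the decision is made explicitly at startup.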
Producers and consumers work on a stream, which defines the topic and events used by the application. You can implement the IStreamDefinition interface to create your own stream class.
export class TestStream implements IStreamDefinition {
  topic = Topics.First;
  messages: {
    // [<event type key from enum>] : <event type or interface>
    [Events.start]: StartEvent;
    [Events.stop]: StopEvent;
  };
}
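To illustrate why the `messages` map is useful, here is a self-contained sketch of how an event key can select its payload type at compile time. The enums, the event interfaces, `StreamDefinitionSketch`, `PayloadOf`, and `envelope` are local stand-ins invented for this example; the package's own IStreamDefinition may be shaped differently.

```typescript
// Local stand-ins for the example; real applications define their own.
enum Topics {
  First = 'first',
}
enum Events {
  start = 'start',
  stop = 'stop',
}
interface StartEvent {
  startedAt: string;
}
interface StopEvent {
  stoppedAt: string;
  reason: string;
}

// Assumed shape of a stream definition: one topic plus an event-to-payload map.
interface StreamDefinitionSketch {
  topic: string;
  messages: Record<string, unknown>;
}

interface TestStream extends StreamDefinitionSketch {
  topic: Topics.First;
  messages: {
    [Events.start]: StartEvent;
    [Events.stop]: StopEvent;
  };
}

// Looks up the payload type for event K of stream S.
type PayloadOf<
  S extends StreamDefinitionSketch,
  K extends keyof S['messages'],
> = S['messages'][K];

// Pairs an event with its payload; the compiler rejects a mismatched payload.
function envelope<K extends keyof TestStream['messages']>(
  event: K,
  payload: PayloadOf<TestStream, K>,
): {event: K; payload: PayloadOf<TestStream, K>} {
  return {event, payload};
}

const started = envelope(Events.start, {startedAt: '2024-01-01T00:00:00Z'});
// envelope(Events.start, {reason: 'x'}); // would not compile: wrong payload type
```

The same indexed-access idea is what lets consumers and producers that are parameterized by a stream and an event key agree on the payload type without repeating it.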
A Consumer is a LoopBack extension that the KafkaConsumerService uses to initialize consumers. It must implement the IConsumer interface and should use the @consumer() decorator. If you want the consumers to start when your application starts, set initObservers to true in the component configuration.
// application.ts
this.configure(KafkaClientBindings.Component).to({
  // ...
  initObservers: true,
  // ...
});
// start.consumer.ts
@consumer<TestStream, Events.start>()
export class StartConsumer implements IConsumer<TestStream, Events.start> {
  topic: Topics.First = Topics.First;
  event: Events.start = Events.start;
  // Option 1: write the handler as a method
  handler(payload: StartEvent) {
    console.log(payload);
  }
  // Option 2 (instead of the method above, since a class cannot declare
  // `handler` twice): inject the handler through the constructor
  // constructor(
  //   @inject('test.handler.start')
  //   public handler: StreamHandler<TestStream, Events.start>,
  // ) {}
}
If you want to write a shared handler for different events, you can use eventHandlerKey to bind a handler in the application:
// application.ts
// event keys use the same casing as in the stream definition (Events.start, Events.stop)
this.bind(eventHandlerKey(Events.start)).to((payload: StartEvent) => {
  console.log(payload);
});
this.bind(eventHandlerKey<TestStream, Events.stop>(Events.stop)).toProvider(
  CustomEventHandlerProvider,
);
You can then inject the handler using the @eventHandler decorator:
// start.consumer.ts
@consumer<TestStream, Events.start>()
export class StartConsumer implements IConsumer<TestStream, Events.start> {
  constructor(
    @eventHandler<TestStream>(Events.start)
    public handler: StreamHandler<TestStream, Events.start>,
  ) {}
  topic: Topics.First = Topics.First;
  event: Events.start = Events.start;
}
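The shared-handler pattern amounts to registering a function under a key derived from the event and resolving it wherever it is needed. The sketch below models that idea with a plain `Map` instead of the LoopBack context. `handlerKey`, `HandlerRegistry`, and the key format are hypothetical and do not describe how eventHandlerKey is actually implemented.

```typescript
enum Events {
  start = 'start',
  stop = 'stop',
}
interface EventPayloads {
  [Events.start]: {startedAt: string};
  [Events.stop]: {stoppedAt: string};
}
type Handler<K extends Events> = (payload: EventPayloads[K]) => void;

// Hypothetical key format, invented for this sketch.
function handlerKey(event: Events): string {
  return `sketch.handlers.${event}`;
}

class HandlerRegistry {
  private readonly handlers = new Map<string, (payload: never) => void>();

  // Registers one handler per event key.
  bind<K extends Events>(event: K, handler: Handler<K>): void {
    this.handlers.set(handlerKey(event), handler);
  }

  // Returns the handler bound for the event, or throws if none was bound.
  resolve<K extends Events>(event: K): Handler<K> {
    const handler = this.handlers.get(handlerKey(event));
    if (!handler) {
      throw new Error(`No handler bound for event "${event}"`);
    }
    return handler as Handler<K>;
  }
}

const registry = new HandlerRegistry();
const seen: string[] = [];
registry.bind(Events.start, payload => {
  seen.push(payload.startedAt);
});

// Any number of consumers can share the handler by resolving the same key.
registry.resolve(Events.start)({startedAt: '2024-01-01T00:00:00Z'});
```

Resolving an event that was never bound throws in this sketch, which mirrors the general expectation that a handler is bound in the application before a consumer asks for it.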
A Producer is a LoopBack service for producing messages for a particular topic. You can inject a producer using the @producer(TOPIC_NAME) decorator.
Note: The topic name passed to the decorator must first be configured in the component configuration's topics property:
// application.ts
// ...
this.configure(KafkaClientBindings.Component).to({
  // ...
  topics: [Topics.First],
  // ...
});
// ...
// test.service.ts
// ...
class TestService {
  constructor(
    @producer(Topics.First)
    private producer: Producer<TestStream>,
  ) {}
}
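Once injected, a producer would typically be used to send typed payloads for an event on its topic. The sketch below shows the idea with a local, in-memory stand-in so that it runs without a broker. `ProducerSketch`, its `send(event, payloads)` signature, `InMemoryProducer`, and `notifyStarted` are assumptions made for illustration; check the package's exported Producer type for the real method signature.

```typescript
enum Events {
  start = 'start',
  stop = 'stop',
}
interface TestMessages {
  [Events.start]: {startedAt: string};
  [Events.stop]: {stoppedAt: string};
}

// Assumed producer shape, for this sketch only.
interface ProducerSketch<M> {
  send<K extends keyof M>(event: K, payloads: M[K][]): Promise<void>;
}

interface RecordedMessage {
  event: string;
  value: string;
}

// In-memory stand-in that records what a real producer would publish.
class InMemoryProducer<M> implements ProducerSketch<M> {
  readonly sent: RecordedMessage[] = [];

  async send<K extends keyof M>(event: K, payloads: M[K][]): Promise<void> {
    for (const payload of payloads) {
      this.sent.push({event: String(event), value: JSON.stringify(payload)});
    }
  }
}

class TestService {
  constructor(private readonly producer: ProducerSketch<TestMessages>) {}

  // Publishes a single start event carrying the given timestamp.
  notifyStarted(startedAt: string): Promise<void> {
    return this.producer.send(Events.start, [{startedAt}]);
  }
}

const producer = new InMemoryProducer<TestMessages>();
const service = new TestService(producer);
void service.notifyStarted('2024-01-01T00:00:00Z');
```

Depending on an interface rather than a concrete client keeps the service testable: in the application the decorator supplies the real producer, while a unit test can pass a recording stand-in like the one above.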