A progressive Node.js framework for building efficient and scalable server-side applications.
Process and Publish Kafka message by batch in NestJS. Cross compatible with ServerKafka
and ClientKafka
from @nestjs/microservices
package.
$ npm i --save @tawkto/nestjs-batch-kafka
To use the batch kafka consumer, initialize BatchKafkaServer
in your main.ts
file by connecting the microservice to your app.
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
// The config is the same as the KafkaOptions from the @nestjs/microservices package
strategy: new KafkaBatchServer({
client: {
brokers: ['localhost:52800', 'localhost:52801'],
},
consumer: {
groupId: 'test',
heartbeatInterval: 5000,
sessionTimeout: 30000,
},
run: {
autoCommitInterval: 5000,
autoCommitThreshold: 100,
partitionsConsumedConcurrently: 4,
},
})
})
Then you can start consuming the events in batches as follow
@BatchProcessor('test')
async test(
@Payload() data: any[],
@Ctx() context: KafkaBatchContext,
) {
const heartbeat = context.getHeartbeat();
const resolveOffset = context.getResolveOffset();
const commitOffsetsIfNecessary = context.getCommitOffsetsIfNecessary();
await heartbeat();
for (const message of data) {
console.log(message);
}
resolveOffset(context.getMessages().at(-1).offset);
console.log("Batch resolved");
await heartbeat();
await commitOffsetsIfNecessary();
}
The KafkaBatchContext
object provides the necessary components from kafkajs
's EachBatchPayload
:
Method | Type | Description |
---|---|---|
getMessages |
KafkaMessage[] |
Get the raw messages from Kafka in the batch |
getConsumer |
KafkaConsumer |
Get the consumer instance |
getResolveOffset |
function |
Get the resolve offset method |
getHeartbeat |
function |
Get the heartbeat method |
getPause |
function |
Get the pause method |
getCommitOffsetsIfNecessary |
function |
Get the commit offsets if necessary method |
getUncommittedOffsets |
OffsetsByTopicPartition |
Get the uncommitted offsets |
getIsRunning |
boolean |
Indicate if the consumer is still running |
getIsStale |
boolean |
Indicate if the consumer is stale |
The KafkaBatchClient
is exactly the same as the KafkaClient
from the @nestjs/microservices
package, except that client.send
method is removed from the client as batch messages should not be used for request-response
communication. On top of that, KafkaBatchClient
also have the capability to publish batch messages or publish to multiple topics just like in kafkajs
.
@Module({
imports: [
ClientsModule.register([{
name: 'KAFKA_BATCH_CLIENT',
customClass: KafkaBatchClient,
options: {
client: {
brokers: ['localhost:52800', 'localhost:52801'],
},
consumer: {
groupId: 'test',
heartbeatInterval: 5000,
sessionTimeout: 30000,
},
},
}]),
],
})
export class AppModule {}
Then you can inject and use the KafkaBatchClient
in your service as follow
@Injectable()
export class AppService {
constructor(
@Inject('KAFKA_BATCH_CLIENT')
private kafkaClient: KafkaBatchClient,
) {}
async eventToBatch() {
this.kafkaClient.emit('test', { example: 'data'});
}
async publishBatch() {
// equivalent to kafkajs producer.send
this.kafkaClient.emitBatch('test', [{
example: 'data1'
}, {
example: 'data2'
}])
}
async publishBatchTopics() {
// will publish to two topics, topic1 and topic2
// equivalent to kafkajs producer.publishBatch
this.kafkaClient.emitBatchTopics([{
pattern: 'topic1',
data: [{ example: 'data11' }, { example: 'data12' }]
}, {
pattern: 'topic2',
data: [{ example: 'data21' }, { example: 'data22' }]
}])
}
}
Calling send
with the KafkaBatchClient
will result in an error.
this.kafkaClient.send('send', { data: 'data'}); // Error