Skip to content

Commit

Permalink
fixes consumer pull disconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
3axap4eHko committed Mar 28, 2024
1 parent 1ad718f commit 52f275c
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions src/utils/kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,17 +170,17 @@ export async function createConsumer(client: Kafka, group: string, topic: string
await consumer.connect();
await consumer.subscribe(consumerOptions);

const pool = new Pool<EachMessagePayload>([], poolOptions);
pool.onDone(() => {
consumer.disconnect();
});

consumer.run({
await consumer.run({
eachMessage: async (payload) => {
pool.push(payload);
},
}).catch(e => console.error(e));

const pool = new Pool<EachMessagePayload>([], poolOptions);
pool.onDone(() => {
consumer.disconnect();
});

return pool;
}

Expand Down

0 comments on commit 52f275c

Please sign in to comment.