Skip to content

Commit

Permalink
Process Kafka messages in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
radazen committed May 2, 2024
1 parent d4303dc commit 0d754d5
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 68 deletions.
137 changes: 137 additions & 0 deletions kafka-streamer/batcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package main

import (
"context"
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"sync"
"time"
)

type workerMessage[T any] struct {
Message *kafka.Message
Index int
}

type messageBatcher[T any] struct {
maxSize int
timeoutDuration time.Duration
nextTimeout time.Time
entries []T
errors []error
errorLock sync.Mutex
workerPool chan workerMessage[T]
wg sync.WaitGroup
closeCh chan struct{}
closed bool
parseF func(context.Context, *kafka.Message) (T, error)
submitF func(context.Context, []T) error
}

func newMessageBatcher[T any](maxSize int, timeout time.Duration, workerCount int, parseF func(context.Context, *kafka.Message) (T, error), submitF func(context.Context, []T) error) *messageBatcher[T] {
mb := &messageBatcher[T]{
maxSize: maxSize,
timeoutDuration: timeout,
entries: make([]T, 0, maxSize),
workerPool: make(chan workerMessage[T], workerCount), // Buffered according to number of workers
closeCh: make(chan struct{}),
parseF: parseF,
submitF: submitF,
}

for i := 0; i < workerCount; i++ {
go mb.worker()
}

return mb
}

func (mb *messageBatcher[T]) Add(ctx context.Context, msg *kafka.Message) error {
if mb.closed {
return fmt.Errorf("cannot add message: batcher is stopped")
}

mb.wg.Add(1)
index := len(mb.entries)
if index == 0 {
// The first message added starts the timeout clock
mb.nextTimeout = time.Now().Add(mb.timeoutDuration)
}
mb.entries = append(mb.entries, *new(T))
mb.workerPool <- workerMessage[T]{Message: msg, Index: index}
return nil
}

func (mb *messageBatcher[T]) worker() {
for wm := range mb.workerPool {
result, err := mb.parseF(context.Background(), wm.Message)
if err != nil {
mb.errorLock.Lock()
mb.errors = append(mb.errors, err)
mb.errorLock.Unlock()
} else {
mb.entries[wm.Index] = result
}
mb.wg.Done()
}
}

func (mb *messageBatcher[T]) IsReady() bool {
if len(mb.entries) == 0 {
return false
}

return len(mb.entries) >= mb.maxSize || time.Now().After(mb.nextTimeout)
}

func (mb *messageBatcher[T]) Submit(ctx context.Context, c *kafka.Consumer) error {
if len(mb.entries) == 0 {
return nil
}

mb.wg.Wait()

if len(mb.errors) > 0 {
return fmt.Errorf("errors occurred during batch processing: %v", mb.errors)
}

if readOnlyMode {
mb.Reset()
return nil
}

err := mb.submitF(ctx, mb.entries)
if err != nil {
return fmt.Errorf("failed to submit batch: %w", err)
}

_, err = c.Commit()
if err != nil {
return fmt.Errorf("failed to commit offsets: %w", err)
}

mb.Reset()
return nil
}

func (mb *messageBatcher[T]) Reset() {
// Wait for any outstanding workers to finish processing
if len(mb.entries) > 0 {
mb.wg.Wait()
}

mb.nextTimeout = time.Time{}
mb.entries = make([]T, 0, mb.maxSize)
mb.errors = nil
}

func (mb *messageBatcher[T]) Stop() {
if mb.closed {
return
}
mb.closed = true

mb.wg.Wait() // Wait for all workers to finish processing
close(mb.closeCh) // Close the shutdown signal channel
close(mb.workerPool) // Safely close the worker pool channel
}
82 changes: 14 additions & 68 deletions kafka-streamer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
"time"
)

// Enable for debugging; will not commit offsets or write to the database
const readOnlyMode = false

type streamerConfig struct {
Topic string
Batcher batcher
Expand Down Expand Up @@ -63,7 +66,7 @@ func newEthereumOwnerConfig(deserializer *avro.GenericDeserializer, queries *mir

return &streamerConfig{
Topic: "ethereum.owner.v4",
Batcher: newMessageBatcher(250, time.Second, parseF, submitF),
Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF),
}
}

Expand All @@ -78,7 +81,7 @@ func newEthereumTokenConfig(deserializer *avro.GenericDeserializer, queries *mir

return &streamerConfig{
Topic: "ethereum.nft.v4",
Batcher: newMessageBatcher(250, time.Second, parseF, submitF),
Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF),
}
}

Expand All @@ -99,7 +102,7 @@ func newBaseOwnerConfig(deserializer *avro.GenericDeserializer, queries *mirrord

return &streamerConfig{
Topic: "base.owner.v4",
Batcher: newMessageBatcher(250, time.Second, parseF, submitF),
Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF),
}
}

Expand All @@ -120,7 +123,7 @@ func newBaseTokenConfig(deserializer *avro.GenericDeserializer, queries *mirrord

return &streamerConfig{
Topic: "base.nft.v4",
Batcher: newMessageBatcher(250, time.Second, parseF, submitF),
Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF),
}
}

Expand All @@ -141,7 +144,7 @@ func newZoraOwnerConfig(deserializer *avro.GenericDeserializer, queries *mirrord

return &streamerConfig{
Topic: "zora.owner.v4",
Batcher: newMessageBatcher(250, time.Second, parseF, submitF),
Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF),
}
}

Expand All @@ -162,7 +165,7 @@ func newZoraTokenConfig(deserializer *avro.GenericDeserializer, queries *mirrord

return &streamerConfig{
Topic: "zora.nft.v4",
Batcher: newMessageBatcher(250, time.Second, parseF, submitF),
Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF),
}
}

Expand Down Expand Up @@ -272,73 +275,12 @@ func (m *messageStats) Update(ctx context.Context, msg *kafka.Message) {

type batcher interface {
Reset()
Stop()
Add(context.Context, *kafka.Message) error
IsReady() bool
Submit(context.Context, *kafka.Consumer) error
}

type messageBatcher[T any] struct {
maxSize int
timeoutDuration time.Duration
parseF func(context.Context, *kafka.Message) (T, error)
submitF func(context.Context, []T) error

entries []T
nextTimeout time.Time
}

func newMessageBatcher[T any](maxSize int, timeout time.Duration, parseF func(context.Context, *kafka.Message) (T, error), submitF func(context.Context, []T) error) *messageBatcher[T] {
return &messageBatcher[T]{
maxSize: maxSize,
timeoutDuration: timeout,
parseF: parseF,
submitF: submitF,
}
}

func (b *messageBatcher[T]) Reset() {
b.entries = []T{}
b.nextTimeout = time.Time{}
}

func (b *messageBatcher[T]) Add(ctx context.Context, msg *kafka.Message) error {
t, err := b.parseF(ctx, msg)
if err != nil {
return fmt.Errorf("failed to parse message: %w", err)
}

b.entries = append(b.entries, t)
b.nextTimeout = time.Now().Add(b.timeoutDuration)
return nil
}

func (b *messageBatcher[T]) IsReady() bool {
if len(b.entries) == 0 {
return false
}

return len(b.entries) >= b.maxSize || time.Now().After(b.nextTimeout)
}

func (b *messageBatcher[T]) Submit(ctx context.Context, c *kafka.Consumer) error {
if len(b.entries) == 0 {
return nil
}

err := b.submitF(ctx, b.entries)
if err != nil {
return fmt.Errorf("failed to submit batch: %w", err)
}

_, err = c.Commit()
if err != nil {
return fmt.Errorf("failed to commit offsets: %w", err)
}

b.entries = []T{}
return nil
}

func streamTopic(ctx context.Context, config *streamerConfig) error {
ctx = logger.NewContextWithFields(ctx, logrus.Fields{"topic": config.Topic})

Expand Down Expand Up @@ -776,6 +718,10 @@ type queryBatchExecuter interface {
}

func submitBatch[TBatch queryBatchExecuter, TEntries any](ctx context.Context, queryF func(context.Context, []TEntries) TBatch, entries []TEntries) error {
if readOnlyMode {
return nil
}

b := queryF(ctx, entries)
defer b.Close()

Expand Down

0 comments on commit 0d754d5

Please sign in to comment.