diff --git a/kafka-streamer/batcher.go b/kafka-streamer/batcher.go
new file mode 100644
index 000000000..0f6ac1f3c
--- /dev/null
+++ b/kafka-streamer/batcher.go
@@ -0,0 +1,137 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"github.com/confluentinc/confluent-kafka-go/kafka"
+	"sync"
+	"time"
+)
+
+type workerMessage[T any] struct {
+	Message *kafka.Message
+	Index   int
+}
+
+type messageBatcher[T any] struct {
+	maxSize         int
+	timeoutDuration time.Duration
+	nextTimeout     time.Time
+	entries         []T
+	errors          []error
+	errorLock       sync.Mutex
+	workerPool      chan workerMessage[T]
+	wg              sync.WaitGroup
+	closeCh         chan struct{}
+	closed          bool
+	parseF          func(context.Context, *kafka.Message) (T, error)
+	submitF         func(context.Context, []T) error
+}
+
+func newMessageBatcher[T any](maxSize int, timeout time.Duration, workerCount int, parseF func(context.Context, *kafka.Message) (T, error), submitF func(context.Context, []T) error) *messageBatcher[T] {
+	mb := &messageBatcher[T]{
+		maxSize:         maxSize,
+		timeoutDuration: timeout,
+		entries:         make([]T, 0, maxSize),
+		workerPool:      make(chan workerMessage[T], workerCount), // Buffered according to number of workers
+		closeCh:         make(chan struct{}),
+		parseF:          parseF,
+		submitF:         submitF,
+	}
+
+	for i := 0; i < workerCount; i++ {
+		go mb.worker()
+	}
+
+	return mb
+}
+
+func (mb *messageBatcher[T]) Add(ctx context.Context, msg *kafka.Message) error {
+	if mb.closed {
+		return fmt.Errorf("cannot add message: batcher is stopped")
+	}
+
+	mb.wg.Add(1)
+	index := len(mb.entries)
+	if index == 0 {
+		// The first message added starts the timeout clock
+		mb.nextTimeout = time.Now().Add(mb.timeoutDuration)
+	}
+	mb.entries = append(mb.entries, *new(T))
+	mb.workerPool <- workerMessage[T]{Message: msg, Index: index}
+	return nil
+}
+
+func (mb *messageBatcher[T]) worker() {
+	for wm := range mb.workerPool {
+		result, err := mb.parseF(context.Background(), wm.Message)
+		if err != nil {
+			mb.errorLock.Lock()
+			mb.errors = append(mb.errors, err)
+			mb.errorLock.Unlock()
+		} else {
+			mb.entries[wm.Index] = result
+		}
+		mb.wg.Done()
+	}
+}
+
+func (mb *messageBatcher[T]) IsReady() bool {
+	if len(mb.entries) == 0 {
+		return false
+	}
+
+	return len(mb.entries) >= mb.maxSize || time.Now().After(mb.nextTimeout)
+}
+
+func (mb *messageBatcher[T]) Submit(ctx context.Context, c *kafka.Consumer) error {
+	if len(mb.entries) == 0 {
+		return nil
+	}
+
+	mb.wg.Wait()
+
+	if len(mb.errors) > 0 {
+		return fmt.Errorf("errors occurred during batch processing: %v", mb.errors)
+	}
+
+	if readOnlyMode {
+		mb.Reset()
+		return nil
+	}
+
+	err := mb.submitF(ctx, mb.entries)
+	if err != nil {
+		return fmt.Errorf("failed to submit batch: %w", err)
+	}
+
+	_, err = c.Commit()
+	if err != nil {
+		return fmt.Errorf("failed to commit offsets: %w", err)
+	}
+
+	mb.Reset()
+	return nil
+}
+
+func (mb *messageBatcher[T]) Reset() {
+	// Wait for any outstanding workers to finish processing
+	if len(mb.entries) > 0 {
+		mb.wg.Wait()
+	}
+
+	mb.nextTimeout = time.Time{}
+	mb.entries = make([]T, 0, mb.maxSize)
+	mb.errors = nil
+}
+
+func (mb *messageBatcher[T]) Stop() {
+	if mb.closed {
+		return
+	}
+	mb.closed = true
+
+	mb.wg.Wait()         // Wait for all workers to finish processing
+	close(mb.closeCh)    // Close the shutdown signal channel
+	close(mb.workerPool) // Safely close the worker pool channel
+}
diff --git a/kafka-streamer/main.go b/kafka-streamer/main.go
index d933a09cc..e67d89012 100644
--- a/kafka-streamer/main.go
+++ b/kafka-streamer/main.go
@@ -25,6 +25,9 @@ import (
 	"time"
 )
 
+// Enable for debugging; will not commit offsets or write to the database
+const readOnlyMode = false
+
 type streamerConfig struct {
 	Topic   string
 	Batcher batcher
@@ -63,7 +66,7 @@ func newEthereumOwnerConfig(deserializer *avro.GenericDeserializer, queries *mir
 
 	return &streamerConfig{
 		Topic:   "ethereum.owner.v4",
-		Batcher: newMessageBatcher(250, time.Second, parseF, submitF),
+		Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF),
 	}
 }
 
@@ -78,7 +81,7 @@ func newEthereumTokenConfig(deserializer *avro.GenericDeserializer, queries *mir
 
 	return &streamerConfig{
 		Topic:   "ethereum.nft.v4",
-		Batcher: newMessageBatcher(250, time.Second, parseF, submitF),
+		Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF),
 	}
 }
 
@@ -99,7 +102,7 @@ func newBaseOwnerConfig(deserializer *avro.GenericDeserializer, queries *mirrord
 
 	return &streamerConfig{
 		Topic:   "base.owner.v4",
-		Batcher: newMessageBatcher(250, time.Second, parseF, submitF),
+		Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF),
 	}
 }
 
@@ -120,7 +123,7 @@ func newBaseTokenConfig(deserializer *avro.GenericDeserializer, queries *mirrord
 
 	return &streamerConfig{
 		Topic:   "base.nft.v4",
-		Batcher: newMessageBatcher(250, time.Second, parseF, submitF),
+		Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF),
 	}
 }
 
@@ -141,7 +144,7 @@ func newZoraOwnerConfig(deserializer *avro.GenericDeserializer, queries *mirrord
 
 	return &streamerConfig{
 		Topic:   "zora.owner.v4",
-		Batcher: newMessageBatcher(250, time.Second, parseF, submitF),
+		Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF),
 	}
 }
 
@@ -162,7 +165,7 @@ func newZoraTokenConfig(deserializer *avro.GenericDeserializer, queries *mirrord
 
 	return &streamerConfig{
 		Topic:   "zora.nft.v4",
-		Batcher: newMessageBatcher(250, time.Second, parseF, submitF),
+		Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF),
 	}
 }
 
@@ -272,73 +275,12 @@ func (m *messageStats) Update(ctx context.Context, msg *kafka.Message) {
 
 type batcher interface {
 	Reset()
+	Stop()
 	Add(context.Context, *kafka.Message) error
 	IsReady() bool
 	Submit(context.Context, *kafka.Consumer) error
 }
 
-type messageBatcher[T any] struct {
-	maxSize         int
-	timeoutDuration time.Duration
-	parseF          func(context.Context, *kafka.Message) (T, error)
-	submitF         func(context.Context, []T) error
-
-	entries     []T
-	nextTimeout time.Time
-}
-
-func newMessageBatcher[T any](maxSize int, timeout time.Duration, parseF func(context.Context, *kafka.Message) (T, error), submitF func(context.Context, []T) error) *messageBatcher[T] {
-	return &messageBatcher[T]{
-		maxSize:         maxSize,
-		timeoutDuration: timeout,
-		parseF:          parseF,
-		submitF:         submitF,
-	}
-}
-
-func (b *messageBatcher[T]) Reset() {
-	b.entries = []T{}
-	b.nextTimeout = time.Time{}
-}
-
-func (b *messageBatcher[T]) Add(ctx context.Context, msg *kafka.Message) error {
-	t, err := b.parseF(ctx, msg)
-	if err != nil {
-		return fmt.Errorf("failed to parse message: %w", err)
-	}
-
-	b.entries = append(b.entries, t)
-	b.nextTimeout = time.Now().Add(b.timeoutDuration)
-	return nil
-}
-
-func (b *messageBatcher[T]) IsReady() bool {
-	if len(b.entries) == 0 {
-		return false
-	}
-
-	return len(b.entries) >= b.maxSize || time.Now().After(b.nextTimeout)
-}
-
-func (b *messageBatcher[T]) Submit(ctx context.Context, c *kafka.Consumer) error {
-	if len(b.entries) == 0 {
-		return nil
-	}
-
-	err := b.submitF(ctx, b.entries)
-	if err != nil {
-		return fmt.Errorf("failed to submit batch: %w", err)
-	}
-
-	_, err = c.Commit()
-	if err != nil {
-		return fmt.Errorf("failed to commit offsets: %w", err)
-	}
-
-	b.entries = []T{}
-	return nil
-}
-
 func streamTopic(ctx context.Context, config *streamerConfig) error {
 	ctx = logger.NewContextWithFields(ctx, logrus.Fields{"topic": config.Topic})
 
@@ -776,6 +718,10 @@ type queryBatchExecuter interface {
 }
 
 func submitBatch[TBatch queryBatchExecuter, TEntries any](ctx context.Context, queryF func(context.Context, []TEntries) TBatch, entries []TEntries) error {
+	if readOnlyMode {
+		return nil
+	}
+
 	b := queryF(ctx, entries)
 	defer b.Close()