Skip to content

Commit

Permalink
rpatterns: Make memcursor concurrent safe
Browse files Browse the repository at this point in the history
Summary: This MR enables the memory cursor to be concurrent safe in cases where the testing involves concurrency and prevents a panic when multiple go routines try to write to the store at the same time causing a panic with the message "concurrent map writes"
  • Loading branch information
andrewwormald committed Nov 16, 2023
1 parent 69ee1a6 commit 2371e7c
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 0 deletions.
7 changes: 7 additions & 0 deletions rpatterns/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rpatterns
import (
"context"
"strconv"
"sync"

"github.com/luno/reflex"
)
Expand Down Expand Up @@ -75,14 +76,20 @@ func MemCursorStore(opts ...MemOpt) reflex.CursorStore {
}

type memCursorStore struct {
mu sync.Mutex
cursors map[string]string
}

func (m *memCursorStore) GetCursor(_ context.Context, consumerName string) (string, error) {
m.mu.Lock()
defer m.mu.Unlock()
return m.cursors[consumerName], nil
}

func (m *memCursorStore) SetCursor(_ context.Context, consumerName string, cursor string) error {
m.mu.Lock()
defer m.mu.Unlock()

if m.cursors == nil {
m.cursors = make(map[string]string)
}
Expand Down
35 changes: 35 additions & 0 deletions rpatterns/cursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math/rand"
"sync"
"testing"

"github.com/luno/jettison/jtest"
Expand Down Expand Up @@ -160,3 +161,37 @@ func TestMemoryCStore(t *testing.T) {
require.NoError(t, err)
require.Equal(t, c2, actual)
}

func TestConcurrentWrites(t *testing.T) {
var (
writerReadyGroup sync.WaitGroup
writerCompletedGroup sync.WaitGroup
)

store := rpatterns.MemCursorStore()
ctx := context.Background()

writerCount := 100
writerReadyGroup.Add(writerCount)
for i := 0; i < writerCount; i++ {
writerCompletedGroup.Add(1)
go func(writerReadyGroup *sync.WaitGroup, writerCompletedGroup *sync.WaitGroup) {
writerReadyGroup.Done()
writerReadyGroup.Wait()
for i := 0; i < 100; i++ {
// Write the thing
val := fmt.Sprintf("%v", i)
err := store.SetCursor(ctx, "single-key", val)
jtest.RequireNil(t, err)

// Flush is a noop but covering it will ensure that it stays concurrent safe
err = store.Flush(ctx)
jtest.RequireNil(t, err)
}
writerCompletedGroup.Done()
}(&writerReadyGroup, &writerCompletedGroup)
}

writerReadyGroup.Wait()
writerCompletedGroup.Wait()
}

0 comments on commit 2371e7c

Please sign in to comment.