Skip to content

Commit

Permalink
rpatterns: Make memcursor concurrent safe
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewwormald committed Nov 16, 2023
1 parent 69ee1a6 commit ece2ec5
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 0 deletions.
7 changes: 7 additions & 0 deletions rpatterns/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rpatterns
import (
"context"
"strconv"
"sync"

"github.com/luno/reflex"
)
Expand Down Expand Up @@ -75,14 +76,20 @@ func MemCursorStore(opts ...MemOpt) reflex.CursorStore {
}

type memCursorStore struct {
mu sync.Mutex
cursors map[string]string
}

func (m *memCursorStore) GetCursor(_ context.Context, consumerName string) (string, error) {
m.mu.Lock()
defer m.mu.Unlock()
return m.cursors[consumerName], nil
}

func (m *memCursorStore) SetCursor(_ context.Context, consumerName string, cursor string) error {
m.mu.Lock()
defer m.mu.Unlock()

if m.cursors == nil {
m.cursors = make(map[string]string)
}
Expand Down
33 changes: 33 additions & 0 deletions rpatterns/cursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math/rand"
"sync"
"testing"

"github.com/luno/jettison/jtest"
Expand Down Expand Up @@ -160,3 +161,35 @@ func TestMemoryCStore(t *testing.T) {
require.NoError(t, err)
require.Equal(t, c2, actual)
}

func TestConcurrentWrites(t *testing.T) {
var (
writerReadyGroup sync.WaitGroup
writerCompletedGroup sync.WaitGroup
)

store := rpatterns.MemCursorStore()

writerCount := 100
writerReadyGroup.Add(writerCount)
for i := 0; i < writerCount; i++ {
writerCompletedGroup.Add(1)
go func(writerReadyGroup *sync.WaitGroup, writerCompletedGroup *sync.WaitGroup) {
writerReadyGroup.Done()
for i := 0; i < 100; i++ {
// Write the thing
val := fmt.Sprintf("%v", i)
err := store.SetCursor(context.TODO(), "single-key", val)
jtest.RequireNil(t, err)

// Flush is a noop but covering it will ensure that it stays concurrent safe
err = store.Flush(context.TODO())
jtest.RequireNil(t, err)
}
writerCompletedGroup.Done()
}(&writerReadyGroup, &writerCompletedGroup)
}

writerReadyGroup.Wait()
writerCompletedGroup.Wait()
}

0 comments on commit ece2ec5

Please sign in to comment.