Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use circular buffer in FiniteReplayProvider #23

Merged
merged 9 commits into from
Mar 1, 2024
7 changes: 5 additions & 2 deletions joe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,11 @@ data: world
func TestJoe_errors(t *testing.T) {
t.Parallel()

fin, err := sse.NewFiniteReplayProvider(1, false)
tests.Equal(t, err, nil, "should create new FiniteReplayProvider")
hugowetterberg marked this conversation as resolved.
Show resolved Hide resolved

j := &sse.Joe{
ReplayProvider: &sse.FiniteReplayProvider{Count: 1},
ReplayProvider: fin,
}
defer j.Shutdown(context.Background()) //nolint:errcheck // irrelevant

Expand All @@ -247,7 +250,7 @@ func TestJoe_errors(t *testing.T) {
return callErr
})

err := j.Subscribe(context.Background(), sse.Subscription{
err = j.Subscribe(context.Background(), sse.Subscription{
Client: client,
LastEventID: sse.ID("0"),
Topics: []string{sse.DefaultTopic},
Expand Down
122 changes: 105 additions & 17 deletions replay_provider.go
Original file line number Diff line number Diff line change
@@ -1,48 +1,136 @@
package sse

import (
"errors"
"strconv"
"sync"
"time"
)

// NewFiniteReplayProvider creates a finite replay provider with the given max
// count and auto ID behaviour.
//
// Count is the maximum number of events FiniteReplayProvider should hold as
// valid. It must be greater than zero.
//
// AutoIDs configures FiniteReplayProvider to automatically set the IDs of
// events.
func NewFiniteReplayProvider(
count int, autoIDs bool,
) (*FiniteReplayProvider, error) {
if count < 1 {
hugowetterberg marked this conversation as resolved.
Show resolved Hide resolved
return nil, errors.New("count must be greater than zero")
}

return &FiniteReplayProvider{
cap: count,
buf: make([]messageWithTopics, count),
count: count,
autoIDs: autoIDs,
}, nil
}

// FiniteReplayProvider is a replay provider that replays at maximum a certain number of events.
// The events must have an ID unless the AutoIDs flag is toggled.
type FiniteReplayProvider struct {
b buffer

// Count is the maximum number of events FiniteReplayProvider should hold as valid.
// It must be a positive integer, or the code will panic.
Count int
// AutoIDs configures FiniteReplayProvider to automatically set the IDs of events.
AutoIDs bool
mut sync.RWMutex
hugowetterberg marked this conversation as resolved.
Show resolved Hide resolved
cap int
buf []messageWithTopics
hugowetterberg marked this conversation as resolved.
Show resolved Hide resolved
head int
tail int
count int
hugowetterberg marked this conversation as resolved.
Show resolved Hide resolved
hugowetterberg marked this conversation as resolved.
Show resolved Hide resolved
autoIDs bool
currentID int64
}

// Put puts a message into the provider's buffer. If there are more messages than the maximum
// number, the oldest message is removed.
func (f *FiniteReplayProvider) Put(message *Message, topics []string) *Message {
if f.b == nil {
f.b = getBuffer(f.AutoIDs, f.Count)
if len(topics) == 0 {
panic(errors.New(
"go-sse: no topics provided for Message.\n" +
formatMessagePanicString(message)))
}

if f.b.len() == f.b.cap() {
f.b.dequeue()
f.mut.Lock()
defer f.mut.Unlock()

if f.autoIDs {
f.currentID++

message.ID = ID(strconv.FormatInt(f.currentID, 10))
} else if !message.ID.IsSet() {
panicString := "go-sse: a Message without an ID was given to a provider that doesn't set IDs automatically.\n" + formatMessagePanicString(message)

panic(errors.New(panicString))
}

f.buf[f.tail] = messageWithTopics{message: message, topics: topics}

f.tail++
if f.tail >= f.cap {
f.tail = 0
}

return f.b.queue(message, topics)
if f.tail == f.head {
f.head = f.tail + 1

if f.head > f.cap {
f.head = 0
}
}

return message
}

// Replay replays the messages in the buffer to the listener.
// It doesn't take into account the messages' expiry times.
func (f *FiniteReplayProvider) Replay(subscription Subscription) error {
if f.b == nil {
f.mut.RLock()

if f.head == f.tail {
f.mut.RUnlock()
return nil
}

events := f.b.slice(subscription.LastEventID)
if len(events) == 0 {
return nil
// The assumption here is that most of the lifetime of an application
// the circular buffer would be full, so we might as well allocate in
// one go.
vs := make([]messageWithTopics, 0, f.cap)

var n int

// Copy head to end and start to tail when head is after tail.
if f.tail < f.head {
n = f.cap - f.tail
copy(vs[0:n], f.buf[f.tail:])
copy(vs[n:n+f.tail], f.buf[0:f.tail])
// If the head is after the tail the buffer is full.
n = f.cap
} else {
n = f.tail - f.head
copy(vs[0:n], f.buf[f.head:f.tail])
}

f.mut.RUnlock()

values := vs[0:n]

for i := range values {
// This could be done as part of the the initial copy to vs,
// leaving it as a separate op for now.
if values[i].message.ID.value == subscription.LastEventID.value {
if i == len(values)-1 {
values = nil
} else {
values = values[i+1:]
}

break
}
hugowetterberg marked this conversation as resolved.
Show resolved Hide resolved
}

for _, e := range events {
for _, e := range values {
if topicsIntersect(subscription.Topics, e.topics) {
if err := subscription.Client.Send(e.message); err != nil {
return err
Expand Down
8 changes: 6 additions & 2 deletions replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ func TestValidReplayProvider(t *testing.T) {
func TestFiniteReplayProvider(t *testing.T) {
t.Parallel()

p := &sse.FiniteReplayProvider{Count: 3}
p, err := sse.NewFiniteReplayProvider(3, false)
hugowetterberg marked this conversation as resolved.
Show resolved Hide resolved
hugowetterberg marked this conversation as resolved.
Show resolved Hide resolved
tests.Equal(t, err, nil, "should create new FiniteReplayProvider")

tests.Equal(t, p.Replay(sse.Subscription{}), nil, "replay failed on provider without messages")

Expand Down Expand Up @@ -148,5 +149,8 @@ The message is the following:
replayed = replay(t, p, sse.ID("4"), sse.DefaultTopic, "topic with no messages")[0]
tests.Equal(t, replayed.String(), "id: 7\ndata: again\n\n", "invalid replayed message")

testReplayError(t, &sse.FiniteReplayProvider{Count: 10}, nil)
tr, err := sse.NewFiniteReplayProvider(10, false)
tests.Equal(t, err, nil, "should create new FiniteReplayProvider")

testReplayError(t, tr, nil)
}