Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use circular buffer in FiniteReplayProvider #23

Merged
merged 9 commits into from
Mar 1, 2024
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,23 @@

This file tracks changes to this project. It follows the [Keep a Changelog format](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

### Removed

- `FiniteReplayProvider.{Count, AutoIDs}` – use the constructor instead

hugowetterberg marked this conversation as resolved.
Show resolved Hide resolved
### Changed

- Due to a change in the internal implementation, the `FiniteReplayProvider` is now able to replay events only if the event with the LastEventID provided by the client is still buffered. Previously if the LastEventID was that of the latest removed event, events would still be replayed. This detail added complexity to the implementation without an apparent significant win, so it was dropped.
### Added

- `NewFiniteReplayProvider` constructor

### Fixed

- `FiniteReplayProvider` doesn't leak memory anymore and respects the stored messages count it was given. Previously when a new message was put after the messages count was reached and some other messages were removed, the total messages count would grow unexpectedly and `FiniteReplayProvider` would store and replay more events than it was configured to.

## [0.8.0] - 2024-01-30

This version removes all external dependencies of `go-sse`. All our bugs are belong to us! It also does some API and documentation cleanups.
Expand Down
7 changes: 5 additions & 2 deletions joe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,11 @@ data: world
func TestJoe_errors(t *testing.T) {
t.Parallel()

fin, err := sse.NewFiniteReplayProvider(1, false)
tests.Equal(t, err, nil, "should create new FiniteReplayProvider")
hugowetterberg marked this conversation as resolved.
Show resolved Hide resolved

j := &sse.Joe{
ReplayProvider: &sse.FiniteReplayProvider{Count: 1},
ReplayProvider: fin,
}
defer j.Shutdown(context.Background()) //nolint:errcheck // irrelevant

Expand All @@ -247,7 +250,7 @@ func TestJoe_errors(t *testing.T) {
return callErr
})

err := j.Subscribe(context.Background(), sse.Subscription{
err = j.Subscribe(context.Background(), sse.Subscription{
Client: client,
LastEventID: sse.ID("0"),
Topics: []string{sse.DefaultTopic},
Expand Down
111 changes: 91 additions & 20 deletions replay_provider.go
Original file line number Diff line number Diff line change
@@ -1,56 +1,127 @@
package sse

import (
"errors"
"strconv"
"time"
)

// NewFiniteReplayProvider creates a finite replay provider with the given max
// count and auto ID behaviour.
//
// Count is the maximum number of events FiniteReplayProvider should hold as
// valid. It must be greater than zero.
//
// AutoIDs configures FiniteReplayProvider to automatically set the IDs of
// events.
func NewFiniteReplayProvider(
count int, autoIDs bool,
) (*FiniteReplayProvider, error) {
if count < 2 {
return nil, errors.New("count must be at least 2")
}

return &FiniteReplayProvider{
cap: count,
buf: make([]messageWithTopics, count),
autoIDs: autoIDs,
}, nil
}

// FiniteReplayProvider is a replay provider that replays at maximum a certain number of events.
// The events must have an ID unless the AutoIDs flag is toggled.
type FiniteReplayProvider struct {

Check failure on line 33 in replay_provider.go

View workflow job for this annotation

GitHub Actions / lint

fieldalignment: struct with 16 pointer bytes could be 8 (govet)
b buffer

// Count is the maximum number of events FiniteReplayProvider should hold as valid.
// It must be a positive integer, or the code will panic.
Count int
// AutoIDs configures FiniteReplayProvider to automatically set the IDs of events.
AutoIDs bool
cap int
buf []messageWithTopics
hugowetterberg marked this conversation as resolved.
Show resolved Hide resolved
head int
tail int
autoIDs bool
currentID int64
}

// Put puts a message into the provider's buffer. If there are more messages than the maximum
// number, the oldest message is removed.
func (f *FiniteReplayProvider) Put(message *Message, topics []string) *Message {
if f.b == nil {
f.b = getBuffer(f.AutoIDs, f.Count)
if len(topics) == 0 {
panic(errors.New(
"go-sse: no topics provided for Message.\n" +
formatMessagePanicString(message)))
}

if f.autoIDs {
f.currentID++

message.ID = ID(strconv.FormatInt(f.currentID, 10))
} else if !message.ID.IsSet() {
panicString := "go-sse: a Message without an ID was given to a provider that doesn't set IDs automatically.\n" + formatMessagePanicString(message)

panic(errors.New(panicString))
}

if f.b.len() == f.b.cap() {
f.b.dequeue()
f.buf[f.tail] = messageWithTopics{message: message, topics: topics}

f.tail++
if f.tail >= f.cap {
f.tail = 0
}

return f.b.queue(message, topics)
if f.tail == f.head {
f.head = f.tail + 1

if f.head > f.cap {
f.head = 0
}
}

return message
}

// Replay replays the messages in the buffer to the listener.
// It doesn't take into account the messages' expiry times.
func (f *FiniteReplayProvider) Replay(subscription Subscription) error {
if f.b == nil {
if f.head == f.tail {
return nil
}

events := f.b.slice(subscription.LastEventID)
if len(events) == 0 {
return nil
// Replay head to end and start to tail when head is after tail.
if f.tail < f.head {
foundFirst, err := replay(subscription, f.buf[f.tail:], false)
if err != nil {
return err
}

_, err = replay(subscription, f.buf[0:f.tail], foundFirst)
if err != nil {
return err
}
} else {
_, err := replay(subscription, f.buf[0:f.tail], false)
if err != nil {
return err
}
}

return subscription.Client.Flush()
}

func replay(
sub Subscription, events []messageWithTopics, foundFirstEvent bool,
) (hasFoundFirstEvent bool, err error) {
for _, e := range events {
if topicsIntersect(subscription.Topics, e.topics) {
if err := subscription.Client.Send(e.message); err != nil {
return err
if !foundFirstEvent && e.message.ID == sub.LastEventID {
foundFirstEvent = true

continue
}

if foundFirstEvent && topicsIntersect(sub.Topics, e.topics) {
if err := sub.Client.Send(e.message); err != nil {
return false, err
}
}
}

return subscription.Client.Flush()
return foundFirstEvent, nil
}

// ValidReplayProvider is a ReplayProvider that replays all the buffered non-expired events.
Expand Down
79 changes: 74 additions & 5 deletions replay_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package sse_test

import (
"fmt"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -108,7 +110,13 @@ func TestValidReplayProvider(t *testing.T) {
func TestFiniteReplayProvider(t *testing.T) {
t.Parallel()

p := &sse.FiniteReplayProvider{Count: 3}
_, err := sse.NewFiniteReplayProvider(1, false)
if err == nil {
t.Fatal("should not create FiniteReplayProvider with count less than 2")
}

p, err := sse.NewFiniteReplayProvider(3, false)
hugowetterberg marked this conversation as resolved.
Show resolved Hide resolved
hugowetterberg marked this conversation as resolved.
Show resolved Hide resolved
tests.Equal(t, err, nil, "should create new FiniteReplayProvider")

tests.Equal(t, p.Replay(sse.Subscription{}), nil, "replay failed on provider without messages")

Expand Down Expand Up @@ -142,11 +150,72 @@ The message is the following:
tests.Equal(t, replayed.String(), "id: 4\ndata: world\n\n", "invalid replayed message")

p.Put(msg(t, "", "5"), []string{"t"})
p.Put(msg(t, "", "6"), []string{"t"})
p.Put(msg(t, "again", "7"), []string{sse.DefaultTopic})
p.Put(msg(t, "again", "6"), []string{sse.DefaultTopic})

replayed = replay(t, p, sse.ID("4"), sse.DefaultTopic, "topic with no messages")[0]
tests.Equal(t, replayed.String(), "id: 7\ndata: again\n\n", "invalid replayed message")
tests.Equal(t, replayed.String(), "id: 6\ndata: again\n\n", "invalid replayed message")

tr, err := sse.NewFiniteReplayProvider(10, false)
tests.Equal(t, err, nil, "should create new FiniteReplayProvider")

testReplayError(t, tr, nil)
}

func TestFiniteReplayProvider_allocations(t *testing.T) {
hugowetterberg marked this conversation as resolved.
Show resolved Hide resolved
p, err := sse.NewFiniteReplayProvider(3, false)
tests.Equal(t, err, nil, "should create new FiniteReplayProvider")

const runs = 100

topics := []string{sse.DefaultTopic}
// Add one to the number of runs to take the warmup run of
// AllocsPerRun() into account.
queue := make([]*sse.Message, runs+1)
lastID := runs

for i := 0; i < len(queue); i++ {
queue[i] = msg(t,
fmt.Sprintf("message %d", i),
strconv.Itoa(i),
)
}

var run int

avgAllocs := testing.AllocsPerRun(runs, func() {
_ = p.Put(queue[run], topics)

run++
})

tests.Equal(t, avgAllocs, 0, "no allocations should be made on Put()")

var replayCount int

cb := mockClient(func(m *sse.Message) error {
if m != nil {
replayCount++
}

return nil
})

sub := sse.Subscription{
Client: cb,
Topics: topics,
}

sub.LastEventID = sse.ID(strconv.Itoa(lastID - 3))

err = p.Replay(sub)
tests.Equal(t, err, nil, "replay from fourth last should succeed")

tests.Equal(t, replayCount, 0, "replay from fourth last should not yield messages")

sub.LastEventID = sse.ID(strconv.Itoa(lastID - 2))

err = p.Replay(sub)
tests.Equal(t, err, nil, "replay from third last should succeed")

testReplayError(t, &sse.FiniteReplayProvider{Count: 10}, nil)
tests.Equal(t, replayCount, 2, "replay from third last should yield 2 messages")
}
Loading