Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use circular buffer in FiniteReplayProvider #23

Merged
merged 9 commits into from
Mar 1, 2024
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,23 @@

This file tracks changes to this project. It follows the [Keep a Changelog format](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

### Removed

- `FiniteReplayProvider.{Count, AutoIDs}` – use the constructor instead

hugowetterberg marked this conversation as resolved.
Show resolved Hide resolved
### Changed

- Due to a change in the internal implementation, the `FiniteReplayProvider` is now able to replay events only if the event with the LastEventID provided by the client is still buffered. Previously if the LastEventID was that of the latest removed event, events would still be replayed. This detail added complexity to the implementation without an apparent significant win, so it was dropped.
### Added

- `NewFiniteReplayProvider` constructor

### Fixed

- `FiniteReplayProvider` doesn't leak memory anymore and respects the stored messages count it was given. Previously when a new message was put after the messages count was reached and some other messages were removed, the total messages count would grow unexpectedly and `FiniteReplayProvider` would store and replay more events than it was configured to.

## [0.8.0] - 2024-01-30

This version removes all external dependencies of `go-sse`. All our bugs are belong to us! It also does some API and documentation cleanups.
Expand Down
6 changes: 3 additions & 3 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func TestConnection_Connect_resetBody(t *testing.T) {
},
}

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
ts := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {
time.Sleep(time.Millisecond * 5)
}))
defer ts.Close()
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestConnection_Connect_validator(t *testing.T) {
},
}

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
ts := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {}))
defer ts.Close()

c := &sse.Client{
Expand Down Expand Up @@ -611,7 +611,7 @@ func TestConnection_reconnect(t *testing.T) {
retries := 0
c := sse.Client{
HTTPClient: ts.Client(),
OnRetry: func(err error, d time.Duration) {
OnRetry: func(_ error, _ time.Duration) {
retries++
if retries == 3 {
cancel()
Expand Down
9 changes: 6 additions & 3 deletions joe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,11 @@ data: world
func TestJoe_errors(t *testing.T) {
t.Parallel()

fin, err := sse.NewFiniteReplayProvider(2, false)
tests.Equal(t, err, nil, "should create new FiniteReplayProvider")

j := &sse.Joe{
ReplayProvider: &sse.FiniteReplayProvider{Count: 1},
ReplayProvider: fin,
}
defer j.Shutdown(context.Background()) //nolint:errcheck // irrelevant

Expand All @@ -247,7 +250,7 @@ func TestJoe_errors(t *testing.T) {
return callErr
})

err := j.Subscribe(context.Background(), sse.Subscription{
err = j.Subscribe(context.Background(), sse.Subscription{
Client: client,
LastEventID: sse.ID("0"),
Topics: []string{sse.DefaultTopic},
Expand All @@ -274,7 +277,7 @@ func TestJoe_errors(t *testing.T) {

err = j.Subscribe(ctx, sse.Subscription{Client: client, Topics: []string{sse.DefaultTopic}})
tests.Equal(t, err, callErr, "error not received from send")
tests.Equal(t, called, 1, "callback was called after subscribe returned")
tests.Equal(t, called, 0, "callback was called after subscribe returned")

<-done
}
Expand Down
111 changes: 91 additions & 20 deletions replay_provider.go
Original file line number Diff line number Diff line change
@@ -1,56 +1,127 @@
package sse

import (
"errors"
"strconv"
"time"
)

// NewFiniteReplayProvider creates a finite replay provider with the given max
// count and auto ID behaviour.
//
// Count is the maximum number of events FiniteReplayProvider should hold as
// valid. It must be greater than zero.
//
// AutoIDs configures FiniteReplayProvider to automatically set the IDs of
// events.
func NewFiniteReplayProvider(
count int, autoIDs bool,
) (*FiniteReplayProvider, error) {
if count < 2 {
return nil, errors.New("count must be at least 2")
}

return &FiniteReplayProvider{
cap: count,
buf: make([]messageWithTopics, count),
autoIDs: autoIDs,
}, nil
}

// FiniteReplayProvider is a replay provider that replays at maximum a certain number of events.
// The events must have an ID unless the AutoIDs flag is toggled.
type FiniteReplayProvider struct {
b buffer

// Count is the maximum number of events FiniteReplayProvider should hold as valid.
// It must be a positive integer, or the code will panic.
Count int
// AutoIDs configures FiniteReplayProvider to automatically set the IDs of events.
AutoIDs bool
buf []messageWithTopics
cap int
head int
tail int
autoIDs bool
currentID int64
}

// Put puts a message into the provider's buffer. If there are more messages than the maximum
// number, the oldest message is removed.
func (f *FiniteReplayProvider) Put(message *Message, topics []string) *Message {
if f.b == nil {
f.b = getBuffer(f.AutoIDs, f.Count)
if len(topics) == 0 {
panic(errors.New(
"go-sse: no topics provided for Message.\n" +
formatMessagePanicString(message)))
}

if f.autoIDs {
f.currentID++

message.ID = ID(strconv.FormatInt(f.currentID, 10))

Check warning on line 54 in replay_provider.go

View check run for this annotation

Codecov / codecov/patch

replay_provider.go#L52-L54

Added lines #L52 - L54 were not covered by tests
} else if !message.ID.IsSet() {
panicString := "go-sse: a Message without an ID was given to a provider that doesn't set IDs automatically.\n" + formatMessagePanicString(message)

panic(errors.New(panicString))
}

if f.b.len() == f.b.cap() {
f.b.dequeue()
f.buf[f.tail] = messageWithTopics{message: message, topics: topics}

f.tail++
if f.tail >= f.cap {
f.tail = 0
}

return f.b.queue(message, topics)
if f.tail == f.head {
f.head = f.tail + 1

if f.head > f.cap {
f.head = 0
}

Check warning on line 73 in replay_provider.go

View check run for this annotation

Codecov / codecov/patch

replay_provider.go#L72-L73

Added lines #L72 - L73 were not covered by tests
}

return message
}

// Replay replays the messages in the buffer to the listener.
// It doesn't take into account the messages' expiry times.
func (f *FiniteReplayProvider) Replay(subscription Subscription) error {
if f.b == nil {
if f.head == f.tail {
return nil
}

events := f.b.slice(subscription.LastEventID)
if len(events) == 0 {
return nil
// Replay head to end and start to tail when head is after tail.
if f.tail < f.head {
foundFirst, err := replay(subscription, f.buf[f.tail:], false)
if err != nil {
return err
}

_, err = replay(subscription, f.buf[0:f.tail], foundFirst)
if err != nil {
return err
}

Check warning on line 96 in replay_provider.go

View check run for this annotation

Codecov / codecov/patch

replay_provider.go#L95-L96

Added lines #L95 - L96 were not covered by tests
} else {
_, err := replay(subscription, f.buf[0:f.tail], false)
if err != nil {
return err
}

Check warning on line 101 in replay_provider.go

View check run for this annotation

Codecov / codecov/patch

replay_provider.go#L100-L101

Added lines #L100 - L101 were not covered by tests
}

return subscription.Client.Flush()
}

func replay(
sub Subscription, events []messageWithTopics, foundFirstEvent bool,
) (hasFoundFirstEvent bool, err error) {
for _, e := range events {
if topicsIntersect(subscription.Topics, e.topics) {
if err := subscription.Client.Send(e.message); err != nil {
return err
if !foundFirstEvent && e.message.ID == sub.LastEventID {
foundFirstEvent = true

continue
}

if foundFirstEvent && topicsIntersect(sub.Topics, e.topics) {
if err := sub.Client.Send(e.message); err != nil {
return false, err
}
}
}

return subscription.Client.Flush()
return foundFirstEvent, nil
}

// ValidReplayProvider is a ReplayProvider that replays all the buffered non-expired events.
Expand Down
79 changes: 74 additions & 5 deletions replay_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package sse_test

import (
"fmt"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -108,7 +110,13 @@ func TestValidReplayProvider(t *testing.T) {
func TestFiniteReplayProvider(t *testing.T) {
t.Parallel()

p := &sse.FiniteReplayProvider{Count: 3}
_, err := sse.NewFiniteReplayProvider(1, false)
if err == nil {
t.Fatal("should not create FiniteReplayProvider with count less than 2")
}

p, err := sse.NewFiniteReplayProvider(3, false)
hugowetterberg marked this conversation as resolved.
Show resolved Hide resolved
hugowetterberg marked this conversation as resolved.
Show resolved Hide resolved
tests.Equal(t, err, nil, "should create new FiniteReplayProvider")

tests.Equal(t, p.Replay(sse.Subscription{}), nil, "replay failed on provider without messages")

Expand Down Expand Up @@ -142,11 +150,72 @@ The message is the following:
tests.Equal(t, replayed.String(), "id: 4\ndata: world\n\n", "invalid replayed message")

p.Put(msg(t, "", "5"), []string{"t"})
p.Put(msg(t, "", "6"), []string{"t"})
p.Put(msg(t, "again", "7"), []string{sse.DefaultTopic})
p.Put(msg(t, "again", "6"), []string{sse.DefaultTopic})

replayed = replay(t, p, sse.ID("4"), sse.DefaultTopic, "topic with no messages")[0]
tests.Equal(t, replayed.String(), "id: 7\ndata: again\n\n", "invalid replayed message")
tests.Equal(t, replayed.String(), "id: 6\ndata: again\n\n", "invalid replayed message")

tr, err := sse.NewFiniteReplayProvider(10, false)
tests.Equal(t, err, nil, "should create new FiniteReplayProvider")

testReplayError(t, tr, nil)
}

func TestFiniteReplayProvider_allocations(t *testing.T) {
hugowetterberg marked this conversation as resolved.
Show resolved Hide resolved
p, err := sse.NewFiniteReplayProvider(3, false)
tests.Equal(t, err, nil, "should create new FiniteReplayProvider")

const runs = 100

topics := []string{sse.DefaultTopic}
// Add one to the number of runs to take the warmup run of
// AllocsPerRun() into account.
queue := make([]*sse.Message, runs+1)
lastID := runs

for i := 0; i < len(queue); i++ {
queue[i] = msg(t,
fmt.Sprintf("message %d", i),
strconv.Itoa(i),
)
}

var run int

avgAllocs := testing.AllocsPerRun(runs, func() {
_ = p.Put(queue[run], topics)

run++
})

tests.Equal(t, avgAllocs, 0, "no allocations should be made on Put()")

var replayCount int

cb := mockClient(func(m *sse.Message) error {
if m != nil {
replayCount++
}

return nil
})

sub := sse.Subscription{
Client: cb,
Topics: topics,
}

sub.LastEventID = sse.ID(strconv.Itoa(lastID - 3))

err = p.Replay(sub)
tests.Equal(t, err, nil, "replay from fourth last should succeed")

tests.Equal(t, replayCount, 0, "replay from fourth last should not yield messages")

sub.LastEventID = sse.ID(strconv.Itoa(lastID - 2))

err = p.Replay(sub)
tests.Equal(t, err, nil, "replay from third last should succeed")

testReplayError(t, &sse.FiniteReplayProvider{Count: 10}, nil)
tests.Equal(t, replayCount, 2, "replay from third last should yield 2 messages")
}
Loading