From 7e1f50d17ecb753fd92abc59385046f00cc1a8d2 Mon Sep 17 00:00:00 2001 From: Hugo Wetterberg Date: Mon, 5 Feb 2024 08:34:04 +0100 Subject: [PATCH 1/9] use circular buffer in FiniteReplayProvider Uses NewFiniteReplayProvider() to instantiate the provider instead of direct literal. This allows catching count misconfiguration at instantiation (typically application startup) instead as a later panic. Synchronises access to the underlying buffer and head and tail state using an RWMutex. --- joe_test.go | 7 ++- replay_provider.go | 122 ++++++++++++++++++++++++++++++++++++++------- replay_test.go | 8 ++- 3 files changed, 116 insertions(+), 21 deletions(-) diff --git a/joe_test.go b/joe_test.go index 05c6d33..fe0f25b 100644 --- a/joe_test.go +++ b/joe_test.go @@ -229,8 +229,11 @@ data: world func TestJoe_errors(t *testing.T) { t.Parallel() + fin, err := sse.NewFiniteReplayProvider(1, false) + tests.Equal(t, err, nil, "should create new FiniteReplayProvider") + j := &sse.Joe{ - ReplayProvider: &sse.FiniteReplayProvider{Count: 1}, + ReplayProvider: fin, } defer j.Shutdown(context.Background()) //nolint:errcheck // irrelevant @@ -247,7 +250,7 @@ func TestJoe_errors(t *testing.T) { return callErr }) - err := j.Subscribe(context.Background(), sse.Subscription{ + err = j.Subscribe(context.Background(), sse.Subscription{ Client: client, LastEventID: sse.ID("0"), Topics: []string{sse.DefaultTopic}, diff --git a/replay_provider.go b/replay_provider.go index d178625..d483958 100644 --- a/replay_provider.go +++ b/replay_provider.go @@ -1,48 +1,136 @@ package sse import ( + "errors" + "strconv" + "sync" "time" ) +// NewFiniteReplayProvider creates a finite replay provider with the given max +// count and auto ID behaviour. +// +// Count is the maximum number of events FiniteReplayProvider should hold as +// valid. It must be greater than zero. +// +// AutoIDs configures FiniteReplayProvider to automatically set the IDs of +// events. +func NewFiniteReplayProvider( + count int, autoIDs bool, +) (*FiniteReplayProvider, error) { + if count < 1 { + return nil, errors.New("count must be greater than zero") + } + + return &FiniteReplayProvider{ + cap: count, + buf: make([]messageWithTopics, count), + count: count, + autoIDs: autoIDs, + }, nil +} + // FiniteReplayProvider is a replay provider that replays at maximum a certain number of events. // The events must have an ID unless the AutoIDs flag is toggled. type FiniteReplayProvider struct { - b buffer - - // Count is the maximum number of events FiniteReplayProvider should hold as valid. - // It must be a positive integer, or the code will panic. - Count int - // AutoIDs configures FiniteReplayProvider to automatically set the IDs of events. - AutoIDs bool + mut sync.RWMutex + cap int + buf []messageWithTopics + head int + tail int + count int + autoIDs bool + currentID int64 } // Put puts a message into the provider's buffer. If there are more messages than the maximum // number, the oldest message is removed. func (f *FiniteReplayProvider) Put(message *Message, topics []string) *Message { - if f.b == nil { - f.b = getBuffer(f.AutoIDs, f.Count) + if len(topics) == 0 { + panic(errors.New( + "go-sse: no topics provided for Message.\n" + + formatMessagePanicString(message))) } - if f.b.len() == f.b.cap() { - f.b.dequeue() + f.mut.Lock() + defer f.mut.Unlock() + + if f.autoIDs { + f.currentID++ + + message.ID = ID(strconv.FormatInt(f.currentID, 10)) + } else if !message.ID.IsSet() { + panicString := "go-sse: a Message without an ID was given to a provider that doesn't set IDs automatically.\n" + formatMessagePanicString(message) + + panic(errors.New(panicString)) + } + + f.buf[f.tail] = messageWithTopics{message: message, topics: topics} + + f.tail++ + if f.tail >= f.cap { + f.tail = 0 } - return f.b.queue(message, topics) + if f.tail == f.head { + f.head = f.tail + 1 + + if f.head > f.cap { + f.head = 0 + } + } + + return message } // Replay replays the messages in the buffer to the listener. // It doesn't take into account the messages' expiry times. func (f *FiniteReplayProvider) Replay(subscription Subscription) error { - if f.b == nil { + f.mut.RLock() + + if f.head == f.tail { + f.mut.RUnlock() return nil } - events := f.b.slice(subscription.LastEventID) - if len(events) == 0 { - return nil + // The assumption here is that most of the lifetime of an application + // the circular buffer would be full, so we might as well allocate in + // one go. + vs := make([]messageWithTopics, 0, f.cap) + + var n int + + // Copy head to end and start to tail when head is after tail. + if f.tail < f.head { + n = f.cap - f.tail + copy(vs[0:n], f.buf[f.tail:]) + copy(vs[n:n+f.tail], f.buf[0:f.tail]) + // If the head is after the tail the buffer is full. + n = f.cap + } else { + n = f.tail - f.head + copy(vs[0:n], f.buf[f.head:f.tail]) + } + + f.mut.RUnlock() + + values := vs[0:n] + + for i := range values { + // This could be done as part of the the initial copy to vs, + // leaving it as a separate op for now. + if values[i].message.ID.value == subscription.LastEventID.value { + if i == len(values)-1 { + values = nil + } else { + values = values[i+1:] + } + + break + } } - for _, e := range events { + for _, e := range values { if topicsIntersect(subscription.Topics, e.topics) { if err := subscription.Client.Send(e.message); err != nil { return err diff --git a/replay_test.go b/replay_test.go index 301f6c9..03d513d 100644 --- a/replay_test.go +++ b/replay_test.go @@ -108,7 +108,8 @@ func TestValidReplayProvider(t *testing.T) { func TestFiniteReplayProvider(t *testing.T) { t.Parallel() - p := &sse.FiniteReplayProvider{Count: 3} + p, err := sse.NewFiniteReplayProvider(3, false) + tests.Equal(t, err, nil, "should create new FiniteReplayProvider") tests.Equal(t, p.Replay(sse.Subscription{}), nil, "replay failed on provider without messages") @@ -148,5 +149,8 @@ The message is the following: replayed = replay(t, p, sse.ID("4"), sse.DefaultTopic, "topic with no messages")[0] tests.Equal(t, replayed.String(), "id: 7\ndata: again\n\n", "invalid replayed message") - testReplayError(t, &sse.FiniteReplayProvider{Count: 10}, nil) + tr, err := sse.NewFiniteReplayProvider(10, false) + tests.Equal(t, err, nil, "should create new FiniteReplayProvider") + + testReplayError(t, tr, nil) } From 5043b145d9724a09e91bb9e60127a5fc9cd1921b Mon Sep 17 00:00:00 2001 From: Hugo Wetterberg Date: Mon, 5 Feb 2024 17:41:10 +0100 Subject: [PATCH 2/9] remove unused count field in FiniteReplayProvider Co-authored-by: Teodor Maxim <57960185+tmaxmax@users.noreply.github.com> --- replay_provider.go | 1 - 1 file changed, 1 deletion(-) diff --git a/replay_provider.go b/replay_provider.go index d483958..ad64751 100644 --- a/replay_provider.go +++ b/replay_provider.go @@ -38,7 +38,6 @@ type FiniteReplayProvider struct { buf []messageWithTopics head int tail int - count int autoIDs bool currentID int64 } From 3d87c9d973aafff463dc44437fb027f885177cb1 Mon Sep 17 00:00:00 2001 From: Hugo Wetterberg Date: Mon, 5 Feb 2024 18:23:42 +0100 Subject: [PATCH 3/9] remove RWMutex and last reference to count --- replay_provider.go | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/replay_provider.go b/replay_provider.go index ad64751..2b2eef4 100644 --- a/replay_provider.go +++ b/replay_provider.go @@ -3,7 +3,6 @@ package sse import ( "errors" "strconv" - "sync" "time" ) @@ -25,7 +24,6 @@ func NewFiniteReplayProvider( return &FiniteReplayProvider{ cap: count, buf: make([]messageWithTopics, count), - count: count, autoIDs: autoIDs, }, nil } @@ -33,7 +31,6 @@ func NewFiniteReplayProvider( // FiniteReplayProvider is a replay provider that replays at maximum a certain number of events. // The events must have an ID unless the AutoIDs flag is toggled. type FiniteReplayProvider struct { - mut sync.RWMutex cap int buf []messageWithTopics head int @@ -51,9 +48,6 @@ func (f *FiniteReplayProvider) Put(message *Message, topics []string) *Message { formatMessagePanicString(message))) } - f.mut.Lock() - defer f.mut.Unlock() - if f.autoIDs { f.currentID++ @@ -85,10 +79,7 @@ func (f *FiniteReplayProvider) Put(message *Message, topics []string) *Message { // Replay replays the messages in the buffer to the listener. // It doesn't take into account the messages' expiry times. func (f *FiniteReplayProvider) Replay(subscription Subscription) error { - f.mut.RLock() - if f.head == f.tail { - f.mut.RUnlock() return nil } @@ -111,8 +102,6 @@ func (f *FiniteReplayProvider) Replay(subscription Subscription) error { copy(vs[0:n], f.buf[f.head:f.tail]) } - f.mut.RUnlock() - values := vs[0:n] for i := range values { From 7663eba72df3c51938401bdd2b2380629d256438 Mon Sep 17 00:00:00 2001 From: Hugo Wetterberg Date: Tue, 6 Feb 2024 07:01:49 +0100 Subject: [PATCH 4/9] update changelog --- CHANGELOG.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 22583fb..f612bc1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,20 @@ This file tracks changes to this project. It follows the [Keep a Changelog format](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## Unreleased + +### Removed + +- `FiniteReplayProvider.{Count, AutoIDs}` – use the constructor instead + +### Added + +- `NewFiniteReplayProvider` constructor + +### Fixed + +- `FiniteReplayProvider` doesn't leak memory anymore and respects the stored messages count it was given. Previously when a new message was put after the messages count was reached and some other messages were removed, the total messages count would grow unexpectedly and `FiniteReplayProvider` would store and replay more events than it was configured to. + ## [0.8.0] - 2024-01-30 This version removes all external dependencies of `go-sse`. All our bugs are belong to us! It also does some API and documentation cleanups. From 2d03ee226e6aa027044bf4e0663db10e9b1873ed Mon Sep 17 00:00:00 2001 From: Hugo Wetterberg Date: Tue, 6 Feb 2024 13:07:01 +0100 Subject: [PATCH 5/9] use replay helper function to avoid creating an intermediate buffer TestFiniteReplayProvider had to be adjusted, as it assumed that the event with the ID 4 would still be present after pushing events {5,6,7} into a finite replay provider with a count of 3. TestJoe_errors is failing, but I'm a bit unsure as to what it's actually testing. --- replay_provider.go | 59 +++++++++++++++++++++------------------------- replay_test.go | 5 ++-- 2 files changed, 29 insertions(+), 35 deletions(-) diff --git a/replay_provider.go b/replay_provider.go index 2b2eef4..bec918e 100644 --- a/replay_provider.go +++ b/replay_provider.go @@ -83,50 +83,45 @@ func (f *FiniteReplayProvider) Replay(subscription Subscription) error { return nil } - // The assumption here is that most of the lifetime of an application - // the circular buffer would be full, so we might as well allocate in - // one go. - vs := make([]messageWithTopics, 0, f.cap) - - var n int - - // Copy head to end and start to tail when head is after tail. + // Replay head to end and start to tail when head is after tail. if f.tail < f.head { - n = f.cap - f.tail - copy(vs[0:n], f.buf[f.tail:]) - copy(vs[n:n+f.tail], f.buf[0:f.tail]) - // If the head is after the tail the buffer is full. - n = f.cap + foundFirst, err := replay(subscription, f.buf[f.tail:], false) + if err != nil { + return err + } + + _, err = replay(subscription, f.buf[0:f.tail], foundFirst) + if err != nil { + return err + } } else { - n = f.tail - f.head - copy(vs[0:n], f.buf[f.head:f.tail]) + _, err := replay(subscription, f.buf[0:f.tail], false) + if err != nil { + return err + } } - values := vs[0:n] + return subscription.Client.Flush() +} - for i := range values { - // This could be done as part of the the initial copy to vs, - // leaving it as a separate op for now. - if values[i].message.ID.value == subscription.LastEventID.value { - if i == len(values)-1 { - values = nil - } else { - values = values[i+1:] - } +func replay( + sub Subscription, events []messageWithTopics, foundFirstEvent bool, +) (hasFoundFirstEvent bool, err error) { + for _, e := range events { + if !foundFirstEvent && e.message.ID == sub.LastEventID { + foundFirstEvent = true - break + continue } - } - for _, e := range values { - if topicsIntersect(subscription.Topics, e.topics) { - if err := subscription.Client.Send(e.message); err != nil { - return err + if foundFirstEvent && topicsIntersect(sub.Topics, e.topics) { + if err := sub.Client.Send(e.message); err != nil { + return false, err } } } - return subscription.Client.Flush() + return foundFirstEvent, nil } // ValidReplayProvider is a ReplayProvider that replays all the buffered non-expired events. diff --git a/replay_test.go b/replay_test.go index 03d513d..f6353d6 100644 --- a/replay_test.go +++ b/replay_test.go @@ -143,11 +143,10 @@ The message is the following: tests.Equal(t, replayed.String(), "id: 4\ndata: world\n\n", "invalid replayed message") p.Put(msg(t, "", "5"), []string{"t"}) - p.Put(msg(t, "", "6"), []string{"t"}) - p.Put(msg(t, "again", "7"), []string{sse.DefaultTopic}) + p.Put(msg(t, "again", "6"), []string{sse.DefaultTopic}) replayed = replay(t, p, sse.ID("4"), sse.DefaultTopic, "topic with no messages")[0] - tests.Equal(t, replayed.String(), "id: 7\ndata: again\n\n", "invalid replayed message") + tests.Equal(t, replayed.String(), "id: 6\ndata: again\n\n", "invalid replayed message") tr, err := sse.NewFiniteReplayProvider(10, false) tests.Equal(t, err, nil, "should create new FiniteReplayProvider") From f8658469566839e64b03774b395d92dd3c10484e Mon Sep 17 00:00:00 2001 From: Hugo Wetterberg Date: Fri, 9 Feb 2024 08:59:53 +0100 Subject: [PATCH 6/9] add allocations test for TestFiniteReplayProvider --- replay_test.go | 61 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/replay_test.go b/replay_test.go index f6353d6..6fd1a02 100644 --- a/replay_test.go +++ b/replay_test.go @@ -1,6 +1,8 @@ package sse_test import ( + "fmt" + "strconv" "testing" "time" @@ -153,3 +155,62 @@ The message is the following: testReplayError(t, tr, nil) } + +func TestFiniteReplayProvider_allocations(t *testing.T) { + p, err := sse.NewFiniteReplayProvider(3, false) + tests.Equal(t, err, nil, "should create new FiniteReplayProvider") + + const runs = 100 + + topics := []string{sse.DefaultTopic} + // Add one to the number of runs to take the warmup run of + // AllocsPerRun() into account. + queue := make([]*sse.Message, runs+1) + lastID := runs + + for i := 0; i < len(queue); i++ { + queue[i] = msg(t, + fmt.Sprintf("message %d", i), + strconv.Itoa(i), + ) + } + + var run int + + avgAllocs := testing.AllocsPerRun(runs, func() { + _ = p.Put(queue[run], topics) + + run++ + }) + + tests.Equal(t, avgAllocs, 0, "no allocations should be made on Put()") + + var replayCount int + + cb := mockClient(func(m *sse.Message) error { + if m != nil { + replayCount++ + } + + return nil + }) + + sub := sse.Subscription{ + Client: cb, + Topics: topics, + } + + sub.LastEventID = sse.ID(strconv.Itoa(lastID - 3)) + + err = p.Replay(sub) + tests.Equal(t, err, nil, "replay from fourth last should succeed") + + tests.Equal(t, replayCount, 0, "replay from fourth last should not yield messages") + + sub.LastEventID = sse.ID(strconv.Itoa(lastID - 2)) + + err = p.Replay(sub) + tests.Equal(t, err, nil, "replay from third last should succeed") + + tests.Equal(t, replayCount, 2, "replay from third last should yield 2 messages") +} From cf78cef64858debc43b8e37fecdb94e284c97df8 Mon Sep 17 00:00:00 2001 From: Hugo Wetterberg Date: Fri, 9 Feb 2024 11:10:47 +0100 Subject: [PATCH 7/9] Update CHANGELOG.md Co-authored-by: Teodor Maxim <57960185+tmaxmax@users.noreply.github.com> --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f612bc1..ff44da5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,9 @@ This file tracks changes to this project. It follows the [Keep a Changelog forma - `FiniteReplayProvider.{Count, AutoIDs}` – use the constructor instead +### Changed + +- Due to a change in the internal implementation, the `FiniteReplayProvider` is now able to replay events only if the event with the LastEventID provided by the client is still buffered. Previously if the LastEventID was that of the latest removed event, events would still be replayed. This detail added complexity to the implementation without an apparent significant win, so it was dropped. ### Added - `NewFiniteReplayProvider` constructor From 26c6b7c2bba2b93ae46772c31ffafd226ea7e484 Mon Sep 17 00:00:00 2001 From: Hugo Wetterberg Date: Fri, 9 Feb 2024 11:16:06 +0100 Subject: [PATCH 8/9] require at least count of 2 for FiniteReplayProvider update test to check NewFiniteReplayProvider() count error case. --- replay_provider.go | 4 ++-- replay_test.go | 5 +++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/replay_provider.go b/replay_provider.go index bec918e..e5ceede 100644 --- a/replay_provider.go +++ b/replay_provider.go @@ -17,8 +17,8 @@ import ( func NewFiniteReplayProvider( count int, autoIDs bool, ) (*FiniteReplayProvider, error) { - if count < 1 { - return nil, errors.New("count must be greater than zero") + if count < 2 { + return nil, errors.New("count must be at least 2") } return &FiniteReplayProvider{ diff --git a/replay_test.go b/replay_test.go index 6fd1a02..b1700ee 100644 --- a/replay_test.go +++ b/replay_test.go @@ -110,6 +110,11 @@ func TestValidReplayProvider(t *testing.T) { func TestFiniteReplayProvider(t *testing.T) { t.Parallel() + _, err := sse.NewFiniteReplayProvider(1, false) + if err == nil { + t.Fatal("should not create FiniteReplayProvider with count less than 2") + } + p, err := sse.NewFiniteReplayProvider(3, false) tests.Equal(t, err, nil, "should create new FiniteReplayProvider") From f0b8ad1e3e8113de4b14dcc93bc72ed4e4bd33cb Mon Sep 17 00:00:00 2001 From: Hugo Wetterberg Date: Mon, 19 Feb 2024 08:37:02 +0100 Subject: [PATCH 9/9] fix test and linting errors --- client_test.go | 6 +++--- joe_test.go | 4 ++-- replay_provider.go | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/client_test.go b/client_test.go index 5bd9cdf..235961c 100644 --- a/client_test.go +++ b/client_test.go @@ -181,7 +181,7 @@ func TestConnection_Connect_resetBody(t *testing.T) { }, } - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + ts := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) { time.Sleep(time.Millisecond * 5) })) defer ts.Close() @@ -250,7 +250,7 @@ func TestConnection_Connect_validator(t *testing.T) { }, } - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) + ts := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {})) defer ts.Close() c := &sse.Client{ @@ -611,7 +611,7 @@ func TestConnection_reconnect(t *testing.T) { retries := 0 c := sse.Client{ HTTPClient: ts.Client(), - OnRetry: func(err error, d time.Duration) { + OnRetry: func(_ error, _ time.Duration) { retries++ if retries == 3 { cancel() diff --git a/joe_test.go b/joe_test.go index fe0f25b..bcca1fb 100644 --- a/joe_test.go +++ b/joe_test.go @@ -229,7 +229,7 @@ data: world func TestJoe_errors(t *testing.T) { t.Parallel() - fin, err := sse.NewFiniteReplayProvider(1, false) + fin, err := sse.NewFiniteReplayProvider(2, false) tests.Equal(t, err, nil, "should create new FiniteReplayProvider") j := &sse.Joe{ @@ -277,7 +277,7 @@ func TestJoe_errors(t *testing.T) { err = j.Subscribe(ctx, sse.Subscription{Client: client, Topics: []string{sse.DefaultTopic}}) tests.Equal(t, err, callErr, "error not received from send") - tests.Equal(t, called, 1, "callback was called after subscribe returned") + tests.Equal(t, called, 0, "callback was called after subscribe returned") <-done } diff --git a/replay_provider.go b/replay_provider.go index e5ceede..81b9356 100644 --- a/replay_provider.go +++ b/replay_provider.go @@ -31,8 +31,8 @@ func NewFiniteReplayProvider( // FiniteReplayProvider is a replay provider that replays at maximum a certain number of events. // The events must have an ID unless the AutoIDs flag is toggled. type FiniteReplayProvider struct { - cap int buf []messageWithTopics + cap int head int tail int autoIDs bool