Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store): heightSub doesn't require adjacent headers #197

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 21 additions & 17 deletions store/heightsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,17 @@ func (hs *heightSub[H]) Height() uint64 {
}

// SetHeight sets the new head height for heightSub.
// Only the higher height can be set, otherwise no-op.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before this change heightSub was vulnerable against setting lower height. I bet we had a place that did this but by dropping adjacency requirement better to make it resilient against this. Objections?

On the other side with a new headers sorting in store.Store.Append we cannot set lower header anymore (even if 2 parallel calls to .Append will happen we still write them sequentially).

(hm, what if we write [100 .. 150) and then [90..100))

func (hs *heightSub[H]) SetHeight(height uint64) {
hs.height.Store(height)
for {
curr := hs.height.Load()
if curr > height {
return
}
if hs.height.CompareAndSwap(curr, height) {
return
}
}
}

// Sub subscribes for a header of a given height.
Expand Down Expand Up @@ -89,12 +98,7 @@ func (hs *heightSub[H]) Pub(headers ...H) {
return
}

height := hs.Height()
from, to := headers[0].Height(), headers[ln-1].Height()
if height+1 != from && height != 0 { // height != 0 is needed to enable init from any height and not only 1
Copy link
Contributor Author

@cristaloleg cristaloleg Jun 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what I mentioned in our Slack conversation. We don't wait adjacent headers anymore + don't fail.

log.Fatalf("PLEASE FILE A BUG REPORT: headers given to the heightSub are in the wrong order: expected %d, got %d", height+1, from)
return
}
hs.SetHeight(to)

hs.heightReqsLk.Lock()
Expand All @@ -114,17 +118,17 @@ func (hs *heightSub[H]) Pub(headers ...H) {
return
}

// instead of looping over each header in 'headers', we can loop over each request
// which will drastically decrease idle iterations, as there will be less requests than headers
for height, reqs := range hs.heightReqs {
// then we look if any of the requests match the given range of headers
if height >= from && height <= to {
// and if so, calculate its position and fulfill requests
h := headers[height-from]
for req := range reqs {
req <- h // reqs must always be buffered, so this won't block
}
delete(hs.heightReqs, height)
for _, h := range headers {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one of the places where we assume continuous range of headers. If headers doesn't have gaps then headers[height-from] works fine (line 123 above).

I will revert this change, this was done purely for the discussion.

height := h.Height()

reqs, ok := hs.heightReqs[height]
if !ok {
continue
}

for req := range reqs {
req <- h // reqs must always be buffered, so this won't block
}
delete(hs.heightReqs, height)
}
}
31 changes: 31 additions & 0 deletions store/heightsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,37 @@ func TestHeightSub(t *testing.T) {
}
}

func TestHeightSubNonAdjacement(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

hs := newHeightSub[*headertest.DummyHeader]()

{
h := headertest.RandDummyHeader(t)
h.HeightI = 100
hs.SetHeight(99)
hs.Pub(h)
}

{
go func() {
// fixes flakiness on CI
time.Sleep(time.Millisecond)

h1 := headertest.RandDummyHeader(t)
h1.HeightI = 200
h2 := headertest.RandDummyHeader(t)
h2.HeightI = 300
hs.Pub(h1, h2)
}()

h, err := hs.Sub(ctx, 200)
assert.NoError(t, err)
assert.NotNil(t, h)
}
}

func TestHeightSubCancellation(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down
6 changes: 6 additions & 0 deletions store/store.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package store

import (
"cmp"
"context"
"errors"
"fmt"
"slices"
"sync/atomic"
"time"

Expand Down Expand Up @@ -319,6 +321,10 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
head = *headPtr
}

slices.SortFunc(headers, func(a, b H) int {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sort headers in ascending order before processing.

return cmp.Compare(a.Height(), b.Height())
})

// collect valid headers
verified := make([]H, 0, lh)
for i, h := range headers {
Expand Down
Loading