Skip to content

Commit

Permalink
feat(store): heightSub doesn't require adjacement headers
Browse files Browse the repository at this point in the history
  • Loading branch information
cristaloleg committed Jun 11, 2024
1 parent 72d6506 commit f01ab8e
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 17 deletions.
38 changes: 21 additions & 17 deletions store/heightsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,17 @@ func (hs *heightSub[H]) Height() uint64 {
}

// SetHeight sets the new head height for heightSub.
// Only the higher height can be set, otherwise no-op.
func (hs *heightSub[H]) SetHeight(height uint64) {
hs.height.Store(height)
for {
curr := hs.height.Load()
if curr > height {
return
}
if hs.height.CompareAndSwap(curr, height) {
return
}
}
}

// Sub subscribes for a header of a given height.
Expand Down Expand Up @@ -89,12 +98,7 @@ func (hs *heightSub[H]) Pub(headers ...H) {
return
}

height := hs.Height()
from, to := headers[0].Height(), headers[ln-1].Height()
if height+1 != from && height != 0 { // height != 0 is needed to enable init from any height and not only 1
log.Fatalf("PLEASE FILE A BUG REPORT: headers given to the heightSub are in the wrong order: expected %d, got %d", height+1, from)
return
}
hs.SetHeight(to)

hs.heightReqsLk.Lock()
Expand All @@ -114,17 +118,17 @@ func (hs *heightSub[H]) Pub(headers ...H) {
return
}

// instead of looping over each header in 'headers', we can loop over each request
// which will drastically decrease idle iterations, as there will be less requests than headers
for height, reqs := range hs.heightReqs {
// then we look if any of the requests match the given range of headers
if height >= from && height <= to {
// and if so, calculate its position and fulfill requests
h := headers[height-from]
for req := range reqs {
req <- h // reqs must always be buffered, so this won't block
}
delete(hs.heightReqs, height)
for _, h := range headers {
height := h.Height()

reqs, ok := hs.heightReqs[height]
if !ok {
continue
}

for req := range reqs {
req <- h // reqs must always be buffered, so this won't block
}
delete(hs.heightReqs, height)
}
}
31 changes: 31 additions & 0 deletions store/heightsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,37 @@ func TestHeightSub(t *testing.T) {
}
}

func TestHeightSubNonAdjacement(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

hs := newHeightSub[*headertest.DummyHeader]()

{
h := headertest.RandDummyHeader(t)
h.HeightI = 100
hs.SetHeight(99)
hs.Pub(h)
}

{
go func() {
// fixes flakiness on CI
time.Sleep(time.Millisecond)

h1 := headertest.RandDummyHeader(t)
h1.HeightI = 200
h2 := headertest.RandDummyHeader(t)
h2.HeightI = 300
hs.Pub(h1, h2)
}()

h, err := hs.Sub(ctx, 200)
assert.NoError(t, err)
assert.NotNil(t, h)
}
}

func TestHeightSubCancellation(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down
6 changes: 6 additions & 0 deletions store/store.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package store

import (
"cmp"
"context"
"errors"
"fmt"
"slices"
"sync/atomic"
"time"

Expand Down Expand Up @@ -319,6 +321,10 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
head = *headPtr
}

slices.SortFunc(headers, func(a, b H) int {
return cmp.Compare(a.Height(), b.Height())
})

// collect valid headers
verified := make([]H, 0, lh)
for i, h := range headers {
Expand Down

0 comments on commit f01ab8e

Please sign in to comment.