Skip to content

Commit

Permalink
fix(store): properly update store head (#207)
Browse files Browse the repository at this point in the history
## Overview

The main idea of this PR is to make `Store[H].Head` working properly, to
be precise: returning head that was written to the disk (*). Along with
that `heightSub.height` is increased monotonically to prevent bugs when
we have store appends out of order.

To test everything I'm adding 2 new tests: one that verifies out of
order appends and another when which does this concurrently. Which
helped to find 2 or even 3 edge cases during coding.

Fixes #201
  • Loading branch information
cristaloleg authored Oct 22, 2024
1 parent 4b626c3 commit 18f0eb1
Show file tree
Hide file tree
Showing 6 changed files with 359 additions and 89 deletions.
36 changes: 22 additions & 14 deletions store/heightsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package store
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"

Expand All @@ -28,27 +29,35 @@ func newHeightSub[H header.Header[H]]() *heightSub[H] {
}
}

// Height reports current height.
func (hs *heightSub[H]) Height() uint64 {
return hs.height.Load()
func (hs *heightSub[H]) isInited() bool {
return hs.height.Load() != 0
}

// SetHeight sets the new head height for heightSub.
func (hs *heightSub[H]) SetHeight(height uint64) {
hs.height.Store(height)
// setHeight sets the new head height for heightSub.
// Only higher than current height can be set.
func (hs *heightSub[H]) setHeight(height uint64) {
for {
curr := hs.height.Load()
if curr >= height {
return
}
if hs.height.CompareAndSwap(curr, height) {
return
}
}
}

// Sub subscribes for a header of a given height.
// It can return errElapsedHeight, which means a requested header was already provided
// and caller should get it elsewhere.
func (hs *heightSub[H]) Sub(ctx context.Context, height uint64) (H, error) {
var zero H
if hs.Height() >= height {
if hs.height.Load() >= height {
return zero, errElapsedHeight
}

hs.heightReqsLk.Lock()
if hs.Height() >= height {
if hs.height.Load() >= height {
// This is a rare case we have to account for.
// The lock above can park a goroutine long enough for hs.height to change for a requested height,
// leaving the request never fulfilled and the goroutine deadlocked.
Expand Down Expand Up @@ -81,21 +90,20 @@ func (hs *heightSub[H]) Sub(ctx context.Context, height uint64) (H, error) {

// Pub processes all the outstanding subscriptions matching the given headers.
// Pub is only safe when called from one goroutine.
// For Pub to work correctly, heightSub has to be initialized with SetHeight
// For Pub to work correctly, heightSub has to be initialized with setHeight
// so that given headers are contiguous to the height on heightSub.
func (hs *heightSub[H]) Pub(headers ...H) {
ln := len(headers)
if ln == 0 {
return
}

height := hs.Height()
from, to := headers[0].Height(), headers[ln-1].Height()
if height+1 != from && height != 0 { // height != 0 is needed to enable init from any height and not only 1
log.Fatalf("PLEASE FILE A BUG REPORT: headers given to the heightSub are in the wrong order: expected %d, got %d", height+1, from)
return
if from > to {
panic(fmt.Sprintf("from must be lower than to, have: %d and %d", from, to))
}
hs.SetHeight(to)

hs.setHeight(to)

hs.heightReqsLk.Lock()
defer hs.heightReqsLk.Unlock()
Expand Down
64 changes: 63 additions & 1 deletion store/heightsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestHeightSub(t *testing.T) {
{
h := headertest.RandDummyHeader(t)
h.HeightI = 100
hs.SetHeight(99)
hs.setHeight(99)
hs.Pub(h)

h, err := hs.Sub(ctx, 10)
Expand All @@ -47,6 +47,68 @@ func TestHeightSub(t *testing.T) {
}
}

// Test heightSub can accept non-adj headers without a problem.
func TestHeightSubNonAdjacement(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

hs := newHeightSub[*headertest.DummyHeader]()

{
h := headertest.RandDummyHeader(t)
h.HeightI = 100
hs.setHeight(99)
hs.Pub(h)
}

{
go func() {
// fixes flakiness on CI
time.Sleep(time.Millisecond)

h1 := headertest.RandDummyHeader(t)
h1.HeightI = 200
h2 := headertest.RandDummyHeader(t)
h2.HeightI = 300
hs.Pub(h1, h2)
}()

h, err := hs.Sub(ctx, 200)
assert.NoError(t, err)
assert.NotNil(t, h)
}
}

func TestHeightSub_monotonicHeight(t *testing.T) {
hs := newHeightSub[*headertest.DummyHeader]()

{
h := headertest.RandDummyHeader(t)
h.HeightI = 100
hs.setHeight(99)
hs.Pub(h)
}

{
h1 := headertest.RandDummyHeader(t)
h1.HeightI = 200
h2 := headertest.RandDummyHeader(t)
h2.HeightI = 300
hs.Pub(h1, h2)
}

{

h1 := headertest.RandDummyHeader(t)
h1.HeightI = 120
h2 := headertest.RandDummyHeader(t)
h2.HeightI = 130
hs.Pub(h1, h2)
}

assert.Equal(t, hs.height.Load(), uint64(300))
}

func TestHeightSubCancellation(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down
116 changes: 80 additions & 36 deletions store/store.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package store

import (
"cmp"
"context"
"errors"
"fmt"
"slices"
"sync/atomic"
"time"

Expand Down Expand Up @@ -51,6 +53,7 @@ type Store[H header.Header[H]] struct {
writesDn chan struct{}
// writeHead maintains the current write head
writeHead atomic.Pointer[H]

// pending keeps headers pending to be written in one batch
pending *batch[H]

Expand Down Expand Up @@ -112,7 +115,7 @@ func newStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store
}

func (s *Store[H]) Init(ctx context.Context, initial H) error {
if s.heightSub.Height() != 0 {
if s.heightSub.isInited() {
return errors.New("store already initialized")
}
// trust the given header as the initial head
Expand Down Expand Up @@ -164,27 +167,37 @@ func (s *Store[H]) Stop(ctx context.Context) error {
}

func (s *Store[H]) Height() uint64 {
return s.heightSub.Height()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

head, err := s.Head(ctx)
if err != nil {
if errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded) ||
errors.Is(err, datastore.ErrNotFound) {
return 0
}
panic(err)
}
return head.Height()
}

// Head returns the highest contiguous header written to the store.
func (s *Store[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, error) {
head, err := s.GetByHeight(ctx, s.heightSub.Height())
if err == nil {
return head, nil
headPtr := s.writeHead.Load()
if headPtr != nil {
return *headPtr, nil
}

var zero H
head, err = s.readHead(ctx)
switch {
default:
head, err := s.readHead(ctx)
if err != nil {
var zero H
return zero, err
case errors.Is(err, datastore.ErrNotFound), errors.Is(err, header.ErrNotFound):
return zero, header.ErrNoHead
case err == nil:
s.heightSub.SetHeight(head.Height())
log.Infow("loaded head", "height", head.Height(), "hash", head.Hash())
return head, nil
}

s.writeHead.CompareAndSwap(nil, &head)

return head, nil
}

func (s *Store[H]) Get(ctx context.Context, hash header.Hash) (H, error) {
Expand Down Expand Up @@ -231,12 +244,16 @@ func (s *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) {
return h, nil
}

return s.getByHeight(ctx, height)
}

func (s *Store[H]) getByHeight(ctx context.Context, height uint64) (H, error) {
var zero H
hash, err := s.heightIndex.HashByHeight(ctx, height)
if err != nil {
if errors.Is(err, datastore.ErrNotFound) {
return zero, header.ErrNotFound
}

return zero, err
}

Expand Down Expand Up @@ -300,29 +317,27 @@ func (s *Store[H]) HasAt(_ context.Context, height uint64) bool {
return height != uint64(0) && s.Height() >= height
}

// Append the given headers to the store. Real write to the disk happens
// asynchronously and might fail without reporting error (just logging).
func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
lh := len(headers)
if lh == 0 {
return nil
}

var err error
// take current write head to verify headers against
var head H
headPtr := s.writeHead.Load()
if headPtr == nil {
head, err = s.Head(ctx)
if err != nil {
return err
}
} else {
head = *headPtr
head, err := s.Head(ctx)
if err != nil {
return err
}

slices.SortFunc(headers, func(a, b H) int {
return cmp.Compare(a.Height(), b.Height())
})

// collect valid headers
verified := make([]H, 0, lh)
for i, h := range headers {

err = head.Verify(h)
if err != nil {
var verErr *header.VerifyError
Expand All @@ -346,27 +361,19 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
head = h
}

onWrite := func() {
newHead := verified[len(verified)-1]
s.writeHead.Store(&newHead)
log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash())
s.metrics.newHead(newHead.Height())
}

// queue headers to be written on disk
select {
case s.writes <- verified:
// we return an error here after writing,
// as there might be an invalid header in between of a given range
onWrite()
return err
default:
s.metrics.writesQueueBlocked(ctx)
}

// if the writes queue is full, we block until it is not
select {
case s.writes <- verified:
onWrite()
return err
case <-s.writesDn:
return errStoppedStore
Expand Down Expand Up @@ -413,6 +420,8 @@ func (s *Store[H]) flushLoop() {
time.Sleep(sleep)
}

s.tryAdvanceHead(ctx, toFlush...)

s.metrics.flush(ctx, time.Since(startTime), s.pending.Len(), false)
// reset pending
s.pending.Reset()
Expand Down Expand Up @@ -501,6 +510,41 @@ func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) {
return data, nil
}

// try advance heighest writeHead based on passed or already written headers.
func (s *Store[H]) tryAdvanceHead(ctx context.Context, headers ...H) {
writeHead := s.writeHead.Load()
if writeHead == nil || len(headers) == 0 {
return
}

currHeight := (*writeHead).Height()

// advance based on passed headers.
for i := 0; i < len(headers); i++ {
if headers[i].Height() != currHeight+1 {
break
}
newHead := headers[i]
s.writeHead.Store(&newHead)
currHeight++
}

// TODO(cristaloleg): benchmark this timeout or make it dynamic.
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

// advance based on already written headers.
for {
h, err := s.getByHeight(ctx, currHeight+1)
if err != nil {
break
}
newHead := h
s.writeHead.Store(&newHead)
currHeight++
}
}

// indexTo saves mapping between header Height and Hash to the given batch.
func indexTo[H header.Header[H]](ctx context.Context, batch datastore.Batch, headers ...H) error {
for _, h := range headers {
Expand Down
Loading

0 comments on commit 18f0eb1

Please sign in to comment.