diff --git a/interface.go b/interface.go index 80835288..a2ab3e56 100644 --- a/interface.go +++ b/interface.go @@ -3,7 +3,6 @@ package header import ( "context" "errors" - "fmt" pubsub "github.com/libp2p/go-libp2p-pubsub" ) @@ -59,16 +58,6 @@ var ( ErrHeadersLimitExceeded = errors.New("header/p2p: header limit per 1 request exceeded") ) -// ErrNonAdjacent is returned when Store is appended with a header not adjacent to the stored head. -type ErrNonAdjacent struct { - Head uint64 - Attempted uint64 -} - -func (ena *ErrNonAdjacent) Error() string { - return fmt.Sprintf("header/store: non-adjacent: head %d, attempted %d", ena.Head, ena.Attempted) -} - // Store encompasses the behavior necessary to store and retrieve Headers // from a node's local storage. type Store[H Header[H]] interface { @@ -88,10 +77,6 @@ type Store[H Header[H]] interface { HasAt(context.Context, uint64) bool // Append stores and verifies the given Header(s). - // It requires them to be adjacent and in ascending order, - // as it applies them contiguously on top of the current head height. - // It returns the amount of successfully applied headers, - // so caller can understand what given header was invalid, if any. Append(context.Context, ...H) error // GetRange returns the range [from:to). diff --git a/store/store.go b/store/store.go index 4b12ffe5..657a22c7 100644 --- a/store/store.go +++ b/store/store.go @@ -322,14 +322,6 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { // collect valid headers verified := make([]H, 0, lh) for i, h := range headers { - // currently store requires all headers to be appended sequentially and adjacently - // TODO(@Wondertan): Further pruning friendly Store design should reevaluate this requirement - if h.Height() != head.Height()+1 { - return &header.ErrNonAdjacent{ - Head: head.Height(), - Attempted: h.Height(), - } - } err = head.Verify(h) if err != nil { @@ -350,7 +342,8 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { // otherwise, stop the loop and apply headers appeared to be valid break } - verified, head = append(verified, h), h + verified = append(verified, h) + head = h } onWrite := func() { diff --git a/sync/sync_head.go b/sync/sync_head.go index ff6c4f8b..c8776552 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -119,7 +119,7 @@ func (s *Syncer[H]) setSubjectiveHead(ctx context.Context, netHead H) { // * Remove ErrNonAdjacent // * Remove writeHead from the canonical store implementation err := s.store.Append(ctx, netHead) - var nonAdj *header.ErrNonAdjacent + var nonAdj *errNonAdjacent if err != nil && !errors.As(err, &nonAdj) { // might be a storage error or something else, but we can still try to continue processing netHead log.Errorw("storing new network header", diff --git a/sync/sync_store.go b/sync/sync_store.go index 4ea8ecf7..8254a6cb 100644 --- a/sync/sync_store.go +++ b/sync/sync_store.go @@ -2,11 +2,23 @@ package sync import ( "context" + "errors" + "fmt" "sync/atomic" "github.com/celestiaorg/go-header" ) +// errNonAdjacent is returned when syncer is appended with a header not adjacent to the stored head. +type errNonAdjacent struct { + Head uint64 + Attempted uint64 +} + +func (ena *errNonAdjacent) Error() string { + return fmt.Sprintf("sync: non-adjacent: head %d, attempted %d", ena.Head, ena.Attempted) +} + // syncStore is a Store wrapper that provides synchronization over writes and reads // for Head of underlying Store. Useful for Stores that do not guarantee synchrony between Append // and Head method. @@ -31,6 +43,25 @@ func (s *syncStore[H]) Head(ctx context.Context) (H, error) { } func (s *syncStore[H]) Append(ctx context.Context, headers ...H) error { + if len(headers) == 0 { + return nil + } + + head, err := s.Head(ctx) + if err != nil && !errors.Is(err, context.Canceled) { + panic(err) + } + + for _, h := range headers { + if h.Height() != head.Height()+1 { + return &errNonAdjacent{ + Head: head.Height(), + Attempted: h.Height(), + } + } + head = h + } + if err := s.Store.Append(ctx, headers...); err != nil { return err }