From 972c9b20dc469cc4a9d1807b001129fd9e9b421a Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Mon, 3 Jun 2024 16:47:20 +0200 Subject: [PATCH 1/3] rework again --- store/heightsub.go | 2 +- store/store.go | 12 ++++++------ sync/sync_head.go | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/store/heightsub.go b/store/heightsub.go index 2335001d..aa58338d 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -92,7 +92,7 @@ func (hs *heightSub[H]) Pub(headers ...H) { height := hs.Height() from, to := headers[0].Height(), headers[ln-1].Height() if height+1 != from && height != 0 { // height != 0 is needed to enable init from any height and not only 1 - log.Fatalf("PLEASE FILE A BUG REPORT: headers given to the heightSub are in the wrong order: expected %d, got %d", height+1, from) + // log.Fatalf("PLEASE FILE A BUG REPORT: headers given to the heightSub are in the wrong order: expected %d, got %d", height+1, from) return } hs.SetHeight(to) diff --git a/store/store.go b/store/store.go index 4b12ffe5..0323301e 100644 --- a/store/store.go +++ b/store/store.go @@ -324,12 +324,12 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { for i, h := range headers { // currently store requires all headers to be appended sequentially and adjacently // TODO(@Wondertan): Further pruning friendly Store design should reevaluate this requirement - if h.Height() != head.Height()+1 { - return &header.ErrNonAdjacent{ - Head: head.Height(), - Attempted: h.Height(), - } - } + // if h.Height() != head.Height()+1 { + // return &header.ErrNonAdjacent{ + // Head: head.Height(), + // Attempted: h.Height(), + // } + // } err = head.Verify(h) if err != nil { diff --git a/sync/sync_head.go b/sync/sync_head.go index ff6c4f8b..c4d51382 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -119,8 +119,8 @@ func (s *Syncer[H]) setSubjectiveHead(ctx context.Context, netHead H) { // * Remove ErrNonAdjacent // * Remove writeHead from the canonical store implementation err := s.store.Append(ctx, netHead) - var nonAdj *header.ErrNonAdjacent - if err != nil && !errors.As(err, &nonAdj) { + // var nonAdj *header.ErrNonAdjacent + if err != nil { //&& !errors.As(err, &nonAdj) { // might be a storage error or something else, but we can still try to continue processing netHead log.Errorw("storing new network header", "height", netHead.Height(), From 5a10a681ac1aa486656fb846834305da5fb2d84d Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Mon, 3 Jun 2024 17:08:34 +0200 Subject: [PATCH 2/3] move check to sync --- interface.go | 15 --------------- store/store.go | 11 ++--------- sync/sync_head.go | 4 ++-- sync/sync_store.go | 31 +++++++++++++++++++++++++++++++ 4 files changed, 35 insertions(+), 26 deletions(-) diff --git a/interface.go b/interface.go index 80835288..a2ab3e56 100644 --- a/interface.go +++ b/interface.go @@ -3,7 +3,6 @@ package header import ( "context" "errors" - "fmt" pubsub "github.com/libp2p/go-libp2p-pubsub" ) @@ -59,16 +58,6 @@ var ( ErrHeadersLimitExceeded = errors.New("header/p2p: header limit per 1 request exceeded") ) -// ErrNonAdjacent is returned when Store is appended with a header not adjacent to the stored head. -type ErrNonAdjacent struct { - Head uint64 - Attempted uint64 -} - -func (ena *ErrNonAdjacent) Error() string { - return fmt.Sprintf("header/store: non-adjacent: head %d, attempted %d", ena.Head, ena.Attempted) -} - // Store encompasses the behavior necessary to store and retrieve Headers // from a node's local storage. type Store[H Header[H]] interface { @@ -88,10 +77,6 @@ type Store[H Header[H]] interface { HasAt(context.Context, uint64) bool // Append stores and verifies the given Header(s). - // It requires them to be adjacent and in ascending order, - // as it applies them contiguously on top of the current head height. - // It returns the amount of successfully applied headers, - // so caller can understand what given header was invalid, if any. Append(context.Context, ...H) error // GetRange returns the range [from:to). diff --git a/store/store.go b/store/store.go index 0323301e..657a22c7 100644 --- a/store/store.go +++ b/store/store.go @@ -322,14 +322,6 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { // collect valid headers verified := make([]H, 0, lh) for i, h := range headers { - // currently store requires all headers to be appended sequentially and adjacently - // TODO(@Wondertan): Further pruning friendly Store design should reevaluate this requirement - // if h.Height() != head.Height()+1 { - // return &header.ErrNonAdjacent{ - // Head: head.Height(), - // Attempted: h.Height(), - // } - // } err = head.Verify(h) if err != nil { @@ -350,7 +342,8 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { // otherwise, stop the loop and apply headers appeared to be valid break } - verified, head = append(verified, h), h + verified = append(verified, h) + head = h } onWrite := func() { diff --git a/sync/sync_head.go b/sync/sync_head.go index c4d51382..c8776552 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -119,8 +119,8 @@ func (s *Syncer[H]) setSubjectiveHead(ctx context.Context, netHead H) { // * Remove ErrNonAdjacent // * Remove writeHead from the canonical store implementation err := s.store.Append(ctx, netHead) - // var nonAdj *header.ErrNonAdjacent - if err != nil { //&& !errors.As(err, &nonAdj) { + var nonAdj *errNonAdjacent + if err != nil && !errors.As(err, &nonAdj) { // might be a storage error or something else, but we can still try to continue processing netHead log.Errorw("storing new network header", "height", netHead.Height(), diff --git a/sync/sync_store.go b/sync/sync_store.go index 4ea8ecf7..8254a6cb 100644 --- a/sync/sync_store.go +++ b/sync/sync_store.go @@ -2,11 +2,23 @@ package sync import ( "context" + "errors" + "fmt" "sync/atomic" "github.com/celestiaorg/go-header" ) +// errNonAdjacent is returned when syncer is appended with a header not adjacent to the stored head. +type errNonAdjacent struct { + Head uint64 + Attempted uint64 +} + +func (ena *errNonAdjacent) Error() string { + return fmt.Sprintf("sync: non-adjacent: head %d, attempted %d", ena.Head, ena.Attempted) +} + // syncStore is a Store wrapper that provides synchronization over writes and reads // for Head of underlying Store. Useful for Stores that do not guarantee synchrony between Append // and Head method. @@ -31,6 +43,25 @@ func (s *syncStore[H]) Head(ctx context.Context) (H, error) { } func (s *syncStore[H]) Append(ctx context.Context, headers ...H) error { + if len(headers) == 0 { + return nil + } + + head, err := s.Head(ctx) + if err != nil && !errors.Is(err, context.Canceled) { + panic(err) + } + + for _, h := range headers { + if h.Height() != head.Height()+1 { + return &errNonAdjacent{ + Head: head.Height(), + Attempted: h.Height(), + } + } + head = h + } + if err := s.Store.Append(ctx, headers...); err != nil { return err } From f10cce8834a8de9285a3ca1b6abd7a2e1b88fe82 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Mon, 3 Jun 2024 17:11:08 +0200 Subject: [PATCH 3/3] rollback logging --- store/heightsub.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/heightsub.go b/store/heightsub.go index aa58338d..2335001d 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -92,7 +92,7 @@ func (hs *heightSub[H]) Pub(headers ...H) { height := hs.Height() from, to := headers[0].Height(), headers[ln-1].Height() if height+1 != from && height != 0 { // height != 0 is needed to enable init from any height and not only 1 - // log.Fatalf("PLEASE FILE A BUG REPORT: headers given to the heightSub are in the wrong order: expected %d, got %d", height+1, from) + log.Fatalf("PLEASE FILE A BUG REPORT: headers given to the heightSub are in the wrong order: expected %d, got %d", height+1, from) return } hs.SetHeight(to)