From c120282ca6fa8372681e64802c7585185f358b18 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Tue, 24 Sep 2024 16:03:20 +0200 Subject: [PATCH 1/9] feat(sync): bifurcation for syncTarget --- sync/sync_head.go | 75 ++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 64 insertions(+), 11 deletions(-) diff --git a/sync/sync_head.go b/sync/sync_head.go index c74347b7..f5d82181 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -3,6 +3,7 @@ package sync import ( "context" "errors" + "fmt" "time" "github.com/celestiaorg/go-header" @@ -173,22 +174,74 @@ func (s *Syncer[H]) verify(ctx context.Context, newHead H) (bool, error) { } var verErr *header.VerifyError - if errors.As(err, &verErr) && !verErr.SoftFailure { - logF := log.Warnw - if errors.Is(err, header.ErrKnownHeader) { - logF = log.Debugw - } - logF("invalid network header", - "height_of_invalid", newHead.Height(), - "hash_of_invalid", newHead.Hash(), - "height_of_subjective", sbjHead.Height(), - "hash_of_subjective", sbjHead.Hash(), - "reason", verErr.Reason) + if !errors.As(err, &verErr) { + return false, nil + } + + if verErr.SoftFailure { + err := s.verifySkipping(ctx, sbjHead.Height(), newHead) + return false, err + } + + logF := log.Warnw + if errors.Is(err, header.ErrKnownHeader) { + logF = log.Debugw } + logF("invalid network header", + "height_of_invalid", newHead.Height(), + "hash_of_invalid", newHead.Hash(), + "height_of_subjective", sbjHead.Height(), + "hash_of_subjective", sbjHead.Hash(), + "reason", verErr.Reason, + ) return verErr.SoftFailure, err } +/* +Subjective head is 500, network head is 1000. +Header at height 1000 does not have sufficient validator set overlap, +so the client downloads height 750 (which does have enough sufficient overlap), +verifies it against 500 and advances the subjective head to 750. + +Client tries to apply height 1000 against 750 and if there is sufficient overlap, +it applies 1000 as the subjective head. 
+If not, it downloads the halfway point and retries the process. +*/ +func (s *Syncer[H]) verifySkipping(ctx context.Context, subjHeight uint64, networkHeader H) error { + diff := networkHeader.Height() - subjHeight + if diff <= 0 { + panic(fmt.Sprintf("implementation bug: diff is %d", diff)) + } + + for diff > 0 { + diff = diff / 2 + subjHeight += diff + + subjHeader, err := s.getter.GetByHeight(ctx, subjHeight) + if err != nil { + return err + } + + if err := header.Verify(subjHeader, networkHeader); err == nil { + return nil + } + } + return &NewValidatorSetCantBeTrustedError{ + NetHeadHeight: networkHeader.Height(), + NetHeadHash: networkHeader.Hash(), + } +} + +type NewValidatorSetCantBeTrustedError struct { + NetHeadHeight uint64 + NetHeadHash []byte +} + +func (e *NewValidatorSetCantBeTrustedError) Error() string { + return fmt.Sprintf("sync: new validator set cant be trusted: head %d, attempted %s", e.NetHeadHeight, e.NetHeadHash) +} + // isExpired checks if header is expired against trusting period. 
func isExpired[H header.Header[H]](header H, period time.Duration) bool { expirationTime := header.Time().Add(period) From dfd308d1eb9ccefd4a7062193a2b517d7381ab82 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Fri, 4 Oct 2024 13:27:34 +0200 Subject: [PATCH 2/9] fix --- sync/sync_head.go | 6 ++++++ sync/sync_head_test.go | 44 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/sync/sync_head.go b/sync/sync_head.go index f5d82181..cec9a485 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -223,6 +223,12 @@ func (s *Syncer[H]) verifySkipping(ctx context.Context, subjHeight uint64, netwo return err } + _, _ = s.subjectiveHead(ctx) + + if err := header.Verify(subjHeader, networkHeader); err == nil { + return nil + } + if err := header.Verify(subjHeader, networkHeader); err == nil { return nil } diff --git a/sync/sync_head_test.go b/sync/sync_head_test.go index eecd3a35..6ccb7308 100644 --- a/sync/sync_head_test.go +++ b/sync/sync_head_test.go @@ -95,6 +95,50 @@ func TestSyncer_HeadWithTrustedHead(t *testing.T) { require.True(t, wrappedGetter.withTrustedHead) } +func TestSyncer_HeadWithNotEnoughValidators(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + head := suite.Head() + + localStore := newTestStore(t, ctx, head) + remoteStore := newTestStore(t, ctx, head) + + err := remoteStore.Append(ctx, suite.GenDummyHeaders(100)...) 
+ require.NoError(t, err) + + // create a wrappedGetter to track exchange interactions + wrappedGetter := newWrappedGetter(local.NewExchange(remoteStore)) + + syncer, err := NewSyncer( + wrappedGetter, + localStore, + headertest.NewDummySubscriber(), + WithBlockTime(time.Nanosecond), + WithRecencyThreshold(time.Nanosecond), // forces a request for a new sync target + // ensures that syncer's store contains a subjective head that is within + // the unbonding period so that the syncer can use a header from the network + // as a sync target + WithTrustingPeriod(time.Hour), + ) + require.NoError(t, err) + + // start the syncer which triggers a Head request that will + // load the syncer's subjective head from the store, and request + // a new sync target from the network rather than from trusted peers + err = syncer.Start(ctx) + require.NoError(t, err) + t.Cleanup(func() { + err = syncer.Stop(ctx) + require.NoError(t, err) + }) + + // ensure the syncer really requested Head from the network + // rather than from trusted peers + require.True(t, wrappedGetter.withTrustedHead) +} + type wrappedGetter struct { ex header.Exchange[*headertest.DummyHeader] From 9b8a974a8db6513ecdc037425cd3d5bdd040673a Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Wed, 9 Oct 2024 15:01:18 +0200 Subject: [PATCH 3/9] new algo --- sync/sync_head.go | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/sync/sync_head.go b/sync/sync_head.go index cec9a485..691a1cfc 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -179,7 +179,7 @@ func (s *Syncer[H]) verify(ctx context.Context, newHead H) (bool, error) { } if verErr.SoftFailure { - err := s.verifySkipping(ctx, sbjHead.Height(), newHead) + err := s.verifySkipping(ctx, sbjHead, newHead) return false, err } @@ -208,31 +208,42 @@ Client tries to apply height 1000 against 750 and if there is sufficient overlap it applies 1000 as the subjective head. 
If not, it downloads the halfway point and retries the process. */ -func (s *Syncer[H]) verifySkipping(ctx context.Context, subjHeight uint64, networkHeader H) error { +func (s *Syncer[H]) verifySkipping(ctx context.Context, subjHead, networkHeader H) error { + subjHeight := subjHead.Height() + diff := networkHeader.Height() - subjHeight if diff <= 0 { panic(fmt.Sprintf("implementation bug: diff is %d", diff)) } - for diff > 0 { - diff = diff / 2 - subjHeight += diff + for diff > 1 { + candidateHeight := subjHeight + diff/2 - subjHeader, err := s.getter.GetByHeight(ctx, subjHeight) + candidateHeader, err := s.getter.GetByHeight(ctx, candidateHeight) if err != nil { return err } - _, _ = s.subjectiveHead(ctx) - - if err := header.Verify(subjHeader, networkHeader); err == nil { - return nil + if err := header.Verify(subjHead, candidateHeader); err != nil { + // candidate failed, go deeper in 1st half. + diff = diff / 2 + continue } - if err := header.Verify(subjHeader, networkHeader); err == nil { + // candidate was validated properly, update subjHead. + subjHead = candidateHeader + // TODO: s.setSubjectiveHead(ctx, subjHead) + + if err := header.Verify(subjHead, networkHeader); err == nil { + // network head validate properly, return success. return nil } + + // new subjHead failed, go deeper in 2nd half. 
+ subjHeight = subjHead.Height() + diff = networkHeader.Height() - subjHeight } + return &NewValidatorSetCantBeTrustedError{ NetHeadHeight: networkHeader.Height(), NetHeadHash: networkHeader.Hash(), From 6ff7cb7b47f621bf481e695de30ba2a4159be3f0 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Mon, 14 Oct 2024 11:58:08 +0200 Subject: [PATCH 4/9] add tests --- headertest/dummy_header.go | 6 ++ sync/sync_head.go | 2 +- sync/sync_head_test.go | 142 ++++++++++++++++++++++++++++++++++++- 3 files changed, 148 insertions(+), 2 deletions(-) diff --git a/headertest/dummy_header.go b/headertest/dummy_header.go index e8480dde..108c4bc3 100644 --- a/headertest/dummy_header.go +++ b/headertest/dummy_header.go @@ -30,6 +30,9 @@ type DummyHeader struct { // SoftFailure allows for testing scenarios where a header would fail // verification with SoftFailure set to true SoftFailure bool + + // VerifyFn can be used to change header.Verify behaviour per header. + VerifyFn func(hdr *DummyHeader) error `json:"-"` } func RandDummyHeader(t *testing.T) *DummyHeader { @@ -100,6 +103,9 @@ func (d *DummyHeader) IsExpired(period time.Duration) bool { } func (d *DummyHeader) Verify(hdr *DummyHeader) error { + if d.VerifyFn != nil { + return d.VerifyFn(hdr) + } if hdr.VerifyFailure { return &header.VerifyError{Reason: ErrDummyVerify, SoftFailure: hdr.SoftFailure} } diff --git a/sync/sync_head.go b/sync/sync_head.go index 691a1cfc..50bcf6d4 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -256,7 +256,7 @@ type NewValidatorSetCantBeTrustedError struct { } func (e *NewValidatorSetCantBeTrustedError) Error() string { - return fmt.Sprintf("sync: new validator set cant be trusted: head %d, attempted %s", e.NetHeadHeight, e.NetHeadHash) + return fmt.Sprintf("sync: new validator set cant be trusted: head %d, attempted %x", e.NetHeadHeight, e.NetHeadHash) } // isExpired checks if header is expired against trusting period. 
diff --git a/sync/sync_head_test.go b/sync/sync_head_test.go index 6ccb7308..e04bfcdc 100644 --- a/sync/sync_head_test.go +++ b/sync/sync_head_test.go @@ -139,6 +139,146 @@ func TestSyncer_HeadWithNotEnoughValidators(t *testing.T) { require.True(t, wrappedGetter.withTrustedHead) } +func TestSyncer_verifySkipping(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + const total = 1000 + const badHeaderHeight = total + 1 + + suite := headertest.NewTestSuite(t) + head := suite.Head() + + localStore := newTestStore(t, ctx, head) + remoteStore := newTestStore(t, ctx, head) + + // create a wrappedGetter to track exchange interactions + wrappedGetter := newWrappedGetter(local.NewExchange(remoteStore)) + + syncer, err := NewSyncer( + wrappedGetter, + localStore, + headertest.NewDummySubscriber(), + WithBlockTime(time.Nanosecond), + WithRecencyThreshold(time.Nanosecond), // forces a request for a new sync target + // ensures that syncer's store contains a subjective head that is within + // the unbonding period so that the syncer can use a header from the network + // as a sync target + WithTrustingPeriod(time.Hour), + ) + require.NoError(t, err) + + // start the syncer which triggers a Head request that will + // load the syncer's subjective head from the store, and request + // a new sync target from the network rather than from trusted peers + err = syncer.Start(ctx) + require.NoError(t, err) + t.Cleanup(func() { + err = syncer.Stop(ctx) + require.NoError(t, err) + }) + + t.Run("success (with bad candidates)", func(t *testing.T) { + const iters = 4 + + headers := suite.GenDummyHeaders(total) + err = remoteStore.Append(ctx, headers...) 
+ require.NoError(t, err) + + var verifyCounter atomic.Int32 + for i := range total { + headers[i].VerifyFn = func(hdr *headertest.DummyHeader) error { + if i >= 501 { + return nil + } + + verifyCounter.Add(1) + if verifyCounter.Load() <= iters { + return &header.VerifyError{ + Reason: headertest.ErrDummyVerify, + SoftFailure: hdr.SoftFailure, + } + } + return nil + } + } + + headers[total-1].VerifyFailure = true + headers[total-1].SoftFailure = true + + subjHead, err := syncer.subjectiveHead(ctx) + require.NoError(t, err) + + err = syncer.verifySkipping(ctx, subjHead, headers[total-1]) + require.NoError(t, err) + }) + + t.Run("success", func(t *testing.T) { + const iters = 4 + + headers := suite.GenDummyHeaders(total) + err = remoteStore.Append(ctx, headers...) + require.NoError(t, err) + + var verifyCounter atomic.Int32 + for i := range total { + headers[i].VerifyFn = func(hdr *headertest.DummyHeader) error { + if hdr.Height() != badHeaderHeight { + return nil + } + + verifyCounter.Add(1) + if verifyCounter.Load() >= iters { + return nil + } + + return &header.VerifyError{ + Reason: headertest.ErrDummyVerify, + SoftFailure: hdr.SoftFailure, + } + } + } + + headers[total-1].VerifyFailure = true + headers[total-1].SoftFailure = true + + subjHead, err := syncer.subjectiveHead(ctx) + require.NoError(t, err) + + err = syncer.verifySkipping(ctx, subjHead, headers[total-1]) + require.NoError(t, err) + }) + + t.Run("cannot verify", func(t *testing.T) { + headers := suite.GenDummyHeaders(total) + err = remoteStore.Append(ctx, headers...) 
+ require.NoError(t, err) + + for i := range total { + headers[i].VerifyFn = func(hdr *headertest.DummyHeader) error { + if hdr.Height() != badHeaderHeight { + return nil + } + + return &header.VerifyError{ + Reason: headertest.ErrDummyVerify, + SoftFailure: hdr.SoftFailure, + } + } + } + + headers[total-1].VerifyFailure = true + headers[total-1].SoftFailure = true + + subjHead, err := syncer.subjectiveHead(ctx) + require.NoError(t, err) + + err = syncer.verifySkipping(ctx, subjHead, headers[total-1]) + var verErr *NewValidatorSetCantBeTrustedError + assert.ErrorAs(t, err, &verErr) + }) +} + type wrappedGetter struct { ex header.Exchange[*headertest.DummyHeader] @@ -170,7 +310,7 @@ func (t *wrappedGetter) Get(ctx context.Context, hash header.Hash) (*headertest. } func (t *wrappedGetter) GetByHeight(ctx context.Context, u uint64) (*headertest.DummyHeader, error) { - return nil, errors.New("implement me") + return t.ex.GetByHeight(ctx, u) } func (t *wrappedGetter) GetRangeByHeight( From 46eabdc2d69aa5bb2b7d878420277edf21fc531f Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Mon, 14 Oct 2024 12:40:36 +0200 Subject: [PATCH 5/9] fix returned error and split tests --- sync/sync_head.go | 36 ++++--- sync/sync_head_test.go | 206 ++++++++++++++++++++++++++++------------- 2 files changed, 157 insertions(+), 85 deletions(-) diff --git a/sync/sync_head.go b/sync/sync_head.go index 50bcf6d4..f808f6c6 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -174,28 +174,26 @@ func (s *Syncer[H]) verify(ctx context.Context, newHead H) (bool, error) { } var verErr *header.VerifyError - if !errors.As(err, &verErr) { - return false, nil - } + if errors.As(err, &verErr) { + if verErr.SoftFailure { + err := s.verifySkipping(ctx, sbjHead, newHead) + var errValSet *NewValidatorSetCantBeTrustedError + return errors.As(err, &errValSet), err + } - if verErr.SoftFailure { - err := s.verifySkipping(ctx, sbjHead, newHead) - return false, err + logF := log.Warnw + if errors.Is(err, 
header.ErrKnownHeader) { + logF = log.Debugw + } + logF("invalid network header", + "height_of_invalid", newHead.Height(), + "hash_of_invalid", newHead.Hash(), + "height_of_subjective", sbjHead.Height(), + "hash_of_subjective", sbjHead.Hash(), + "reason", verErr.Reason) } - logF := log.Warnw - if errors.Is(err, header.ErrKnownHeader) { - logF = log.Debugw - } - logF("invalid network header", - "height_of_invalid", newHead.Height(), - "hash_of_invalid", newHead.Hash(), - "height_of_subjective", sbjHead.Height(), - "hash_of_subjective", sbjHead.Hash(), - "reason", verErr.Reason, - ) - - return verErr.SoftFailure, err + return false, err } /* diff --git a/sync/sync_head_test.go b/sync/sync_head_test.go index e04bfcdc..a5a7ea16 100644 --- a/sync/sync_head_test.go +++ b/sync/sync_head_test.go @@ -139,7 +139,7 @@ func TestSyncer_HeadWithNotEnoughValidators(t *testing.T) { require.True(t, wrappedGetter.withTrustedHead) } -func TestSyncer_verifySkipping(t *testing.T) { +func TestSyncer_verifySkippingSuccess(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) @@ -178,105 +178,179 @@ func TestSyncer_verifySkipping(t *testing.T) { require.NoError(t, err) }) - t.Run("success (with bad candidates)", func(t *testing.T) { - const iters = 4 + const iters = 4 - headers := suite.GenDummyHeaders(total) - err = remoteStore.Append(ctx, headers...) - require.NoError(t, err) + headers := suite.GenDummyHeaders(total) + err = remoteStore.Append(ctx, headers...) 
+ require.NoError(t, err) - var verifyCounter atomic.Int32 - for i := range total { - headers[i].VerifyFn = func(hdr *headertest.DummyHeader) error { - if i >= 501 { - return nil - } + var verifyCounter atomic.Int32 + for i := range total { + headers[i].VerifyFn = func(hdr *headertest.DummyHeader) error { + if hdr.Height() != badHeaderHeight { + return nil + } - verifyCounter.Add(1) - if verifyCounter.Load() <= iters { - return &header.VerifyError{ - Reason: headertest.ErrDummyVerify, - SoftFailure: hdr.SoftFailure, - } - } + verifyCounter.Add(1) + if verifyCounter.Load() >= iters { return nil } + + return &header.VerifyError{ + Reason: headertest.ErrDummyVerify, + SoftFailure: hdr.SoftFailure, + } } + } - headers[total-1].VerifyFailure = true - headers[total-1].SoftFailure = true + headers[total-1].VerifyFailure = true + headers[total-1].SoftFailure = true - subjHead, err := syncer.subjectiveHead(ctx) - require.NoError(t, err) + subjHead, err := syncer.subjectiveHead(ctx) + require.NoError(t, err) - err = syncer.verifySkipping(ctx, subjHead, headers[total-1]) - require.NoError(t, err) - }) + err = syncer.verifySkipping(ctx, subjHead, headers[total-1]) + require.NoError(t, err) +} + +func TestSyncer_verifySkippingSuccessWithBadCandidates(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + const total = 1000 + const badHeaderHeight = total + 1 - t.Run("success", func(t *testing.T) { - const iters = 4 + suite := headertest.NewTestSuite(t) + head := suite.Head() - headers := suite.GenDummyHeaders(total) - err = remoteStore.Append(ctx, headers...) 
+ localStore := newTestStore(t, ctx, head) + remoteStore := newTestStore(t, ctx, head) + + // create a wrappedGetter to track exchange interactions + wrappedGetter := newWrappedGetter(local.NewExchange(remoteStore)) + + syncer, err := NewSyncer( + wrappedGetter, + localStore, + headertest.NewDummySubscriber(), + WithBlockTime(time.Nanosecond), + WithRecencyThreshold(time.Nanosecond), // forces a request for a new sync target + // ensures that syncer's store contains a subjective head that is within + // the unbonding period so that the syncer can use a header from the network + // as a sync target + WithTrustingPeriod(time.Hour), + ) + require.NoError(t, err) + + // start the syncer which triggers a Head request that will + // load the syncer's subjective head from the store, and request + // a new sync target from the network rather than from trusted peers + err = syncer.Start(ctx) + require.NoError(t, err) + t.Cleanup(func() { + err = syncer.Stop(ctx) require.NoError(t, err) + }) - var verifyCounter atomic.Int32 - for i := range total { - headers[i].VerifyFn = func(hdr *headertest.DummyHeader) error { - if hdr.Height() != badHeaderHeight { - return nil - } + const iters = 4 - verifyCounter.Add(1) - if verifyCounter.Load() >= iters { - return nil - } + headers := suite.GenDummyHeaders(total) + err = remoteStore.Append(ctx, headers...) 
+ require.NoError(t, err) + + var verifyCounter atomic.Int32 + for i := range total { + headers[i].VerifyFn = func(hdr *headertest.DummyHeader) error { + if i >= 501 { + return nil + } + verifyCounter.Add(1) + if verifyCounter.Load() <= iters { return &header.VerifyError{ Reason: headertest.ErrDummyVerify, SoftFailure: hdr.SoftFailure, } } + return nil } + } - headers[total-1].VerifyFailure = true - headers[total-1].SoftFailure = true + headers[total-1].VerifyFailure = true + headers[total-1].SoftFailure = true - subjHead, err := syncer.subjectiveHead(ctx) - require.NoError(t, err) + subjHead, err := syncer.subjectiveHead(ctx) + require.NoError(t, err) + + err = syncer.verifySkipping(ctx, subjHead, headers[total-1]) + require.NoError(t, err) +} + +func TestSyncer_verifySkippingCannotVerify(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + const total = 1000 + const badHeaderHeight = total + 1 + + suite := headertest.NewTestSuite(t) + head := suite.Head() + + localStore := newTestStore(t, ctx, head) + remoteStore := newTestStore(t, ctx, head) + + // create a wrappedGetter to track exchange interactions + wrappedGetter := newWrappedGetter(local.NewExchange(remoteStore)) - err = syncer.verifySkipping(ctx, subjHead, headers[total-1]) + syncer, err := NewSyncer( + wrappedGetter, + localStore, + headertest.NewDummySubscriber(), + WithBlockTime(time.Nanosecond), + WithRecencyThreshold(time.Nanosecond), // forces a request for a new sync target + // ensures that syncer's store contains a subjective head that is within + // the unbonding period so that the syncer can use a header from the network + // as a sync target + WithTrustingPeriod(time.Hour), + ) + require.NoError(t, err) + + // start the syncer which triggers a Head request that will + // load the syncer's subjective head from the store, and request + // a new sync target from the network rather than from trusted peers + err = syncer.Start(ctx) + 
require.NoError(t, err) + t.Cleanup(func() { + err = syncer.Stop(ctx) require.NoError(t, err) }) - t.Run("cannot verify", func(t *testing.T) { - headers := suite.GenDummyHeaders(total) - err = remoteStore.Append(ctx, headers...) - require.NoError(t, err) + headers := suite.GenDummyHeaders(total) + err = remoteStore.Append(ctx, headers...) + require.NoError(t, err) - for i := range total { - headers[i].VerifyFn = func(hdr *headertest.DummyHeader) error { - if hdr.Height() != badHeaderHeight { - return nil - } + for i := range total { + headers[i].VerifyFn = func(hdr *headertest.DummyHeader) error { + if hdr.Height() != badHeaderHeight { + return nil + } - return &header.VerifyError{ - Reason: headertest.ErrDummyVerify, - SoftFailure: hdr.SoftFailure, - } + return &header.VerifyError{ + Reason: headertest.ErrDummyVerify, + SoftFailure: hdr.SoftFailure, } } + } - headers[total-1].VerifyFailure = true - headers[total-1].SoftFailure = true + headers[total-1].VerifyFailure = true + headers[total-1].SoftFailure = true - subjHead, err := syncer.subjectiveHead(ctx) - require.NoError(t, err) + subjHead, err := syncer.subjectiveHead(ctx) + require.NoError(t, err) - err = syncer.verifySkipping(ctx, subjHead, headers[total-1]) - var verErr *NewValidatorSetCantBeTrustedError - assert.ErrorAs(t, err, &verErr) - }) + err = syncer.verifySkipping(ctx, subjHead, headers[total-1]) + var verErr *NewValidatorSetCantBeTrustedError + assert.ErrorAs(t, err, &verErr, "%T", err) } type wrappedGetter struct { From f9d2dafb8063db19404bdee2eacb468fcb1b4dec Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Mon, 14 Oct 2024 13:08:40 +0200 Subject: [PATCH 6/9] remove old comment --- header.go | 3 ++- p2p/server_test.go | 2 +- sync/sync_head.go | 20 ++++++++++---------- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/header.go b/header.go index 010ab2ca..75cfa4d0 100644 --- a/header.go +++ b/header.go @@ -12,7 +12,8 @@ type Header[H any] interface { // New creates new instance of a 
header. // It exists to overcome limitation of Go's type system. // See: - // https://go.googlesource.com/proposal/+/refs/heads/master/design/43651-type-parameters.md#pointer-method-example + // + //https://go.googlesource.com/proposal/+/refs/heads/master/design/43651-type-parameters.md#pointer-method-example New() H // IsZero reports whether Header is a zero value of it's concrete type. IsZero() bool diff --git a/p2p/server_test.go b/p2p/server_test.go index 1e896b2e..77496d92 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -176,7 +176,7 @@ func (timeoutStore[H]) Append(ctx context.Context, _ ...H) error { return ctx.Err() } -func (timeoutStore[H]) GetRange(ctx context.Context, _ uint64, _ uint64) ([]H, error) { +func (timeoutStore[H]) GetRange(ctx context.Context, _, _ uint64) ([]H, error) { <-ctx.Done() return nil, ctx.Err() } diff --git a/sync/sync_head.go b/sync/sync_head.go index f808f6c6..552d5b62 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -196,16 +196,16 @@ func (s *Syncer[H]) verify(ctx context.Context, newHead H) (bool, error) { return false, err } -/* -Subjective head is 500, network head is 1000. -Header at height 1000 does not have sufficient validator set overlap, -so the client downloads height 750 (which does have enough sufficient overlap), -verifies it against 500 and advances the subjective head to 750. - -Client tries to apply height 1000 against 750 and if there is sufficient overlap, -it applies 1000 as the subjective head. -If not, it downloads the halfway point and retries the process. -*/ +// verifySkipping will try to find such headers in range (subjHead, networkHeader) +// that can be verified by subjHead, literally: +// +// header.Verify(subjHead, candidate) +// +// and also such headers can verify `networkHeader`, literally +// +// header.Verify(candidate, networkHeader) +// +// When such candidates cannot be found [NewValidatorSetCantBeTrustedError] will be returned. 
func (s *Syncer[H]) verifySkipping(ctx context.Context, subjHead, networkHeader H) error { subjHeight := subjHead.Height() From 6d035aa10d8c15f9556b4c4bad3bdbe50dae493c Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Tue, 5 Nov 2024 15:10:54 +0100 Subject: [PATCH 7/9] review suggestions --- sync/sync_head.go | 21 +++++++++++---------- sync/sync_head_test.go | 2 +- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/sync/sync_head.go b/sync/sync_head.go index 552d5b62..1fdacfef 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -196,14 +196,11 @@ func (s *Syncer[H]) verify(ctx context.Context, newHead H) (bool, error) { return false, err } -// verifySkipping will try to find such headers in range (subjHead, networkHeader) -// that can be verified by subjHead, literally: -// -// header.Verify(subjHead, candidate) -// -// and also such headers can verify `networkHeader`, literally -// -// header.Verify(candidate, networkHeader) +// verifySkipping performs a bifurcated header verification process such that +// it tries to find a header (or several headers if necessary) +// between the networkHead and the subjectiveHead such that non-adjacent +// (or in the worst case adjacent) verification passes and the networkHead +// can be verified as a valid sync target against the syncer's subjectiveHead. // // When such candidates cannot be found [NewValidatorSetCantBeTrustedError] will be returned. 
func (s *Syncer[H]) verifySkipping(ctx context.Context, subjHead, networkHeader H) error { @@ -211,7 +208,11 @@ func (s *Syncer[H]) verifySkipping(ctx context.Context, subjHead, networkHeader diff := networkHeader.Height() - subjHeight if diff <= 0 { - panic(fmt.Sprintf("implementation bug: diff is %d", diff)) + panic(fmt.Sprintf("implementation bug: diff %d, subjective height %d (%X), network height %d (%X)", + diff, + subjHeight, subjHead.Hash(), + networkHeader.Height(), networkHeader.Hash(), + )) } for diff > 1 { @@ -230,7 +231,7 @@ func (s *Syncer[H]) verifySkipping(ctx context.Context, subjHead, networkHeader // candidate was validated properly, update subjHead. subjHead = candidateHeader - // TODO: s.setSubjectiveHead(ctx, subjHead) + s.setSubjectiveHead(ctx, subjHead) if err := header.Verify(subjHead, networkHeader); err == nil { // network head validate properly, return success. diff --git a/sync/sync_head_test.go b/sync/sync_head_test.go index a5a7ea16..9d9d6965 100644 --- a/sync/sync_head_test.go +++ b/sync/sync_head_test.go @@ -350,7 +350,7 @@ func TestSyncer_verifySkippingCannotVerify(t *testing.T) { err = syncer.verifySkipping(ctx, subjHead, headers[total-1]) var verErr *NewValidatorSetCantBeTrustedError - assert.ErrorAs(t, err, &verErr, "%T", err) + assert.ErrorIs(t, err, verErr, "%T", err) } type wrappedGetter struct { From 821c9b4525eaf1cd01e2e50380148ca04acce548 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Mon, 11 Nov 2024 11:01:36 +0100 Subject: [PATCH 8/9] review suggestions --- sync/metrics.go | 18 +++++++++++++++++ sync/sync_head.go | 18 ++++------------- sync/sync_head_test.go | 44 ++++++++++++++++++++++++++---------------- 3 files changed, 49 insertions(+), 31 deletions(-) diff --git a/sync/metrics.go b/sync/metrics.go index 590bd5ba..2269a539 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -22,6 +22,7 @@ type metrics struct { trustedPeersOutOfSync metric.Int64Counter outdatedHeader metric.Int64Counter subjectiveInit 
metric.Int64Counter + failedAgainstSubjHead metric.Int64Counter subjectiveHead atomic.Int64 @@ -71,6 +72,16 @@ func newMetrics() (*metrics, error) { return nil, err } + failedAgainstSubjHead, err := meter.Int64Counter( + "hdr_sync_subj_validation_failed", + metric.WithDescription( + "tracks how many times validation against subjective head failed", + ), + ) + if err != nil { + return nil, err + } + subjectiveHead, err := meter.Int64ObservableGauge( "hdr_sync_subjective_head_gauge", metric.WithDescription("subjective head height"), @@ -112,6 +123,7 @@ func newMetrics() (*metrics, error) { trustedPeersOutOfSync: trustedPeersOutOfSync, outdatedHeader: outdatedHeader, subjectiveInit: subjectiveInit, + failedAgainstSubjHead: failedAgainstSubjHead, syncLoopDurationHist: syncLoopDurationHist, syncLoopRunningInst: syncLoopRunningInst, requestRangeTimeHist: requestRangeTimeHist, @@ -186,6 +198,12 @@ func (m *metrics) newSubjectiveHead(ctx context.Context, height uint64, timestam }) } +func (m *metrics) failedValidationAgainstSubjHead(ctx context.Context) { + m.observe(ctx, func(ctx context.Context) { + m.failedAgainstSubjHead.Add(ctx, 1) + }) +} + func (m *metrics) rangeRequestStart() { if m == nil { return diff --git a/sync/sync_head.go b/sync/sync_head.go index 1fdacfef..41a661d3 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -177,8 +177,7 @@ func (s *Syncer[H]) verify(ctx context.Context, newHead H) (bool, error) { if errors.As(err, &verErr) { if verErr.SoftFailure { err := s.verifySkipping(ctx, sbjHead, newHead) - var errValSet *NewValidatorSetCantBeTrustedError - return errors.As(err, &errValSet), err + return err != nil, err } logF := log.Warnw @@ -243,19 +242,10 @@ func (s *Syncer[H]) verifySkipping(ctx context.Context, subjHead, networkHeader diff = networkHeader.Height() - subjHeight } - return &NewValidatorSetCantBeTrustedError{ - NetHeadHeight: networkHeader.Height(), - NetHeadHash: networkHeader.Hash(), - } -} - -type NewValidatorSetCantBeTrustedError 
struct { - NetHeadHeight uint64 - NetHeadHash []byte -} + s.metrics.failedValidationAgainstSubjHead(ctx) + log.Warnw("sync: header validation against subjHead", "height", networkHeader.Height(), "hash", networkHeader.Hash().String()) -func (e *NewValidatorSetCantBeTrustedError) Error() string { - return fmt.Sprintf("sync: new validator set cant be trusted: head %d, attempted %x", e.NetHeadHeight, e.NetHeadHash) + return fmt.Errorf("sync: header validation against subjHead height:%d hash:%x", networkHeader.Height(), networkHeader.Hash().String()) } // isExpired checks if header is expired against trusting period. diff --git a/sync/sync_head_test.go b/sync/sync_head_test.go index 9d9d6965..4179f6a0 100644 --- a/sync/sync_head_test.go +++ b/sync/sync_head_test.go @@ -139,13 +139,12 @@ func TestSyncer_HeadWithNotEnoughValidators(t *testing.T) { require.True(t, wrappedGetter.withTrustedHead) } +// Test will simulate a case with upto `iters` failures before we will get to +// the header that can be verified against subjectiveHead. func TestSyncer_verifySkippingSuccess(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) - const total = 1000 - const badHeaderHeight = total + 1 - suite := headertest.NewTestSuite(t) head := suite.Head() @@ -178,12 +177,18 @@ func TestSyncer_verifySkippingSuccess(t *testing.T) { require.NoError(t, err) }) + // when + const total = 1000 + const badHeaderHeight = total + 1 // make the last header bad const iters = 4 headers := suite.GenDummyHeaders(total) err = remoteStore.Append(ctx, headers...) require.NoError(t, err) + // configure header verification method is such way + // that the first [iters] verification will fail + // but all other will be ok. 
var verifyCounter atomic.Int32 for i := range total { headers[i].VerifyFn = func(hdr *headertest.DummyHeader) error { @@ -213,13 +218,12 @@ func TestSyncer_verifySkippingSuccess(t *testing.T) { require.NoError(t, err) } +// Test simulates a case with up to `iters` failures before we get to +// a header that can be verified against subjectiveHead. func TestSyncer_verifySkippingSuccessWithBadCandidates(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) - const total = 1000 - const badHeaderHeight = total + 1 - suite := headertest.NewTestSuite(t) head := suite.Head() @@ -252,12 +256,17 @@ func TestSyncer_verifySkippingSuccessWithBadCandidates(t *testing.T) { require.NoError(t, err) }) + const total = 1000 + const badHeaderHeight = total + 1 const iters = 4 headers := suite.GenDummyHeaders(total) err = remoteStore.Append(ctx, headers...) require.NoError(t, err) + // configure the header verification method in such a way + // that the first [iters] verifications fail + // but all others succeed. var verifyCounter atomic.Int32 for i := range total { headers[i].VerifyFn = func(hdr *headertest.DummyHeader) error { @@ -266,13 +275,13 @@ func TestSyncer_verifySkippingSuccessWithBadCandidates(t *testing.T) { } verifyCounter.Add(1) - if verifyCounter.Load() <= iters { - return &header.VerifyError{ - Reason: headertest.ErrDummyVerify, - SoftFailure: hdr.SoftFailure, - } + if verifyCounter.Load() > iters { + return nil + } + return &header.VerifyError{ + Reason: headertest.ErrDummyVerify, + SoftFailure: hdr.SoftFailure, } - return nil } } @@ -286,13 +295,12 @@ func TestSyncer_verifySkippingSuccessWithBadCandidates(t *testing.T) { require.NoError(t, err) } +// Test simulates a case when no headers can be verified against subjectiveHead. +// As a result, verifySkipping returns an error.
func TestSyncer_verifySkippingCannotVerify(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) - const total = 1000 - const badHeaderHeight = total + 1 - suite := headertest.NewTestSuite(t) head := suite.Head() @@ -325,6 +333,9 @@ func TestSyncer_verifySkippingCannotVerify(t *testing.T) { require.NoError(t, err) }) + const total = 1000 + const badHeaderHeight = total + 1 + headers := suite.GenDummyHeaders(total) err = remoteStore.Append(ctx, headers...) require.NoError(t, err) @@ -349,8 +360,7 @@ func TestSyncer_verifySkippingCannotVerify(t *testing.T) { require.NoError(t, err) err = syncer.verifySkipping(ctx, subjHead, headers[total-1]) - var verErr *NewValidatorSetCantBeTrustedError - assert.ErrorIs(t, err, verErr, "%T", err) + assert.Error(t, err) } type wrappedGetter struct { From f9bbccb0e4808edbd332ee8f65d1b83ae6143f13 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Wed, 13 Nov 2024 11:15:52 +0100 Subject: [PATCH 9/9] fix panic msg --- sync/sync_head.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sync/sync_head.go b/sync/sync_head.go index 41a661d3..b2fc399f 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -207,8 +207,7 @@ func (s *Syncer[H]) verifySkipping(ctx context.Context, subjHead, networkHeader diff := networkHeader.Height() - subjHeight if diff <= 0 { - panic(fmt.Sprintf("implementation bug: diff %d, subjective height %d (%X), network height %d (%X)", - diff, + panic(fmt.Sprintf("implementation bug:\n subjective head height %d, hash %X,\n network head height %d, hash %X", subjHeight, subjHead.Hash(), networkHeader.Height(), networkHeader.Hash(), ))