From 2644560c0562412a3c2209820be07f8f3f8b1846 Mon Sep 17 00:00:00 2001 From: Leo Zhang Date: Tue, 20 Jul 2021 20:41:58 -0700 Subject: [PATCH] fix synchornization handling response (#1002) --- engine/common/synchronization/engine.go | 26 +++++++------------- engine/common/synchronization/engine_test.go | 6 ++--- 2 files changed, 11 insertions(+), 21 deletions(-) diff --git a/engine/common/synchronization/engine.go b/engine/common/synchronization/engine.go index f7ac714e40a..6cf8a0cd5c1 100644 --- a/engine/common/synchronization/engine.go +++ b/engine/common/synchronization/engine.go @@ -340,7 +340,7 @@ func (e *Engine) requestProcessingLoop() { case <-notifier: err := e.processAvailableRequests() if err != nil { - e.log.Fatal().Err(err).Msg("internal error processing queued message") + e.log.Fatal().Err(err).Msg("internal error processing queued requests") } } } @@ -372,7 +372,7 @@ func (e *Engine) responseProcessingLoop() { case <-notifier: err := e.processAvailableResponses() if err != nil { - e.log.Fatal().Err(err).Msg("internal error processing queued message") + e.log.Fatal().Err(err).Msg("internal error processing queued responses") } } } @@ -389,21 +389,15 @@ func (e *Engine) processAvailableResponses() error { msg, ok := e.pendingSyncResponses.Get() if ok { - err := e.onSyncResponse(msg.OriginID, msg.Payload.(*messages.SyncResponse)) + e.onSyncResponse(msg.OriginID, msg.Payload.(*messages.SyncResponse)) e.metrics.MessageHandled(metrics.EngineSynchronization, metrics.MessageSyncResponse) - if err != nil { - return fmt.Errorf("could not process sync response") - } continue } msg, ok = e.pendingBlockResponses.Get() if ok { - err := e.onBlockResponse(msg.OriginID, msg.Payload.(*messages.BlockResponse)) + e.onBlockResponse(msg.OriginID, msg.Payload.(*messages.BlockResponse)) e.metrics.MessageHandled(metrics.EngineSynchronization, metrics.MessageBlockResponse) - if err != nil { - return fmt.Errorf("could not process block response") - } continue } @@ -427,7 +421,7 @@ func (e *Engine) processAvailableRequests() error { if ok { err := e.onSyncRequest(msg.OriginID, msg.Payload.(*messages.SyncRequest)) if err != nil { - return fmt.Errorf("could not process sync request: %w", err) + engine.LogError(e.log, err) } continue } @@ -436,7 +430,7 @@ func (e *Engine) processAvailableRequests() error { if ok { err := e.onRangeRequest(msg.OriginID, msg.Payload.(*messages.RangeRequest)) if err != nil { - return fmt.Errorf("could not process range request: %w", err) + engine.LogError(e.log, err) } continue } @@ -445,7 +439,7 @@ func (e *Engine) processAvailableRequests() error { if ok { err := e.onBatchRequest(msg.OriginID, msg.Payload.(*messages.BatchRequest)) if err != nil { - return fmt.Errorf("could not process batch request: %w", err) + engine.LogError(e.log, err) } continue } @@ -487,11 +481,10 @@ func (e *Engine) onSyncRequest(originID flow.Identifier, req *messages.SyncReque } // onSyncResponse processes a synchronization response. -func (e *Engine) onSyncResponse(originID flow.Identifier, res *messages.SyncResponse) error { +func (e *Engine) onSyncResponse(originID flow.Identifier, res *messages.SyncResponse) { final := e.finalSnapshot().head e.core.HandleHeight(final, res.Height) - return nil } // onRangeRequest processes a request for a range of blocks by height. @@ -590,7 +583,7 @@ func (e *Engine) onBatchRequest(originID flow.Identifier, req *messages.BatchReq } // onBlockResponse processes a response containing a specifically requested block. -func (e *Engine) onBlockResponse(originID flow.Identifier, res *messages.BlockResponse) error { +func (e *Engine) onBlockResponse(originID flow.Identifier, res *messages.BlockResponse) { // process the blocks one by one for _, block := range res.Blocks { if !e.core.HandleBlock(block.Header) { @@ -602,7 +595,6 @@ func (e *Engine) onBlockResponse(originID flow.Identifier, res *messages.BlockRe } e.comp.SubmitLocal(synced) } - return nil } // checkLoop will regularly scan for items that need requesting. diff --git a/engine/common/synchronization/engine_test.go b/engine/common/synchronization/engine_test.go index cce472e67c0..a5b08e7dcfa 100644 --- a/engine/common/synchronization/engine_test.go +++ b/engine/common/synchronization/engine_test.go @@ -215,8 +215,7 @@ func (ss *SyncSuite) TestOnSyncResponse() { // the height should be handled ss.core.On("HandleHeight", ss.head, res.Height) - err := ss.e.onSyncResponse(originID, res) - ss.Assert().Nil(err) + ss.e.onSyncResponse(originID, res) ss.core.AssertExpectations(ss.T()) } @@ -366,8 +365,7 @@ func (ss *SyncSuite) TestOnBlockResponse() { }, ) - err := ss.e.onBlockResponse(originID, res) - ss.Assert().Nil(err) + ss.e.onBlockResponse(originID, res) ss.comp.AssertExpectations(ss.T()) ss.core.AssertExpectations(ss.T()) }