From 3727fa4ed69624571107ceefdfaadb9570d9fad5 Mon Sep 17 00:00:00 2001 From: lklimek <842586+lklimek@users.noreply.github.com> Date: Wed, 24 Jul 2024 14:23:28 +0200 Subject: [PATCH] fix: active validators not always connected to each other (#844) * chore(dash): decrease log verbosity in validator conn executor * fix(node): validator conn executor not started correctly * fix: panic on full node * fix: dash dialer fails to lookup nodes in addressbook --- dash/quorum/validator_conn_executor.go | 24 +++++++------- internal/p2p/dash_dialer.go | 4 ++- node/node.go | 46 ++++++++++++-------------- node/node_test.go | 2 +- 4 files changed, 38 insertions(+), 38 deletions(-) diff --git a/dash/quorum/validator_conn_executor.go b/dash/quorum/validator_conn_executor.go index ffad0b475b..bcef5c3330 100644 --- a/dash/quorum/validator_conn_executor.go +++ b/dash/quorum/validator_conn_executor.go @@ -184,7 +184,7 @@ func (vc *ValidatorConnExecutor) subscribe() error { // receiveEvents processes received events and executes all the logic. // Returns non-nil error only if fatal error occurred and the main goroutine should be terminated. func (vc *ValidatorConnExecutor) receiveEvents(ctx context.Context) error { - vc.logger.Debug("ValidatorConnExecutor: waiting for an event") + vc.logger.Trace("ValidatorConnExecutor: waiting for an event") sCtx, cancel := context.WithCancel(ctx) // TODO check value for correctness defer cancel() msg, err := vc.subscription.Next(sCtx) @@ -202,7 +202,7 @@ func (vc *ValidatorConnExecutor) receiveEvents(ctx context.Context) error { vc.logger.Error("cannot handle validator update", "error", err) return nil // non-fatal, so no error returned to continue the loop } - vc.logger.Debug("validator updates processed successfully", "event", event) + vc.logger.Trace("validator updates processed successfully", "event", event) return nil } @@ -264,7 +264,7 @@ func (vc *ValidatorConnExecutor) resolveNodeID(va *types.ValidatorAddress) error va.NodeID = address.NodeID return nil // success } - vc.logger.Debug( + vc.logger.Trace( "warning: validator node id lookup method failed", "url", va.String(), "method", method, @@ -305,7 +305,7 @@ func (vc *ValidatorConnExecutor) ensureValidatorsHaveNodeIDs(validators []*types for _, validator := range validators { err := vc.resolveNodeID(&validator.NodeAddress) if err != nil { - vc.logger.Error("cannot determine node id for validator, skipping", "url", validator.String(), "error", err) + vc.logger.Warn("cannot determine node id for validator, skipping", "url", validator.String(), "error", err) continue } results = append(results, validator) @@ -318,7 +318,7 @@ func (vc *ValidatorConnExecutor) disconnectValidator(validator types.Validator) return err } id := validator.NodeAddress.NodeID - vc.logger.Debug("disconnecting Validator", "validator", validator, "id", id, "address", validator.NodeAddress.String()) + vc.logger.Trace("disconnecting Validator", "validator", validator, "id", id, "address", validator.NodeAddress.String()) if err := vc.dialer.DisconnectAsync(id); err != nil { return err } @@ -337,7 +337,7 @@ func (vc *ValidatorConnExecutor) disconnectValidators(exceptions validatorMap) e vc.logger.Error("cannot disconnect Validator", "error", err) continue } - vc.logger.Debug("Validator already disconnected", "error", err) + vc.logger.Trace("Validator already disconnected", "error", err) // We still delete the validator from vc.connectedValidators } delete(vc.connectedValidators, currentKey) @@ -373,11 +373,11 @@ func (vc *ValidatorConnExecutor) updateConnections() error { if err := vc.disconnectValidators(newValidators); err != nil { return fmt.Errorf("cannot disconnect unused validators: %w", err) } - vc.logger.Debug("filtering validators", "validators", newValidators.String()) + vc.logger.Trace("filtering validators", "validators", newValidators.String()) // ensure that we can connect to all validators newValidators = vc.filterAddresses(newValidators) // Connect to new validators - vc.logger.Debug("dialing validators", "validators", newValidators.String()) + vc.logger.Trace("dialing validators", "validators", newValidators.String()) if err := vc.dial(newValidators); err != nil { return fmt.Errorf("cannot dial validators: %w", err) } @@ -390,20 +390,20 @@ func (vc *ValidatorConnExecutor) filterAddresses(validators validatorMap) valida filtered := make(validatorMap, len(validators)) for id, validator := range validators { if vc.proTxHash != nil && string(id) == vc.proTxHash.String() { - vc.logger.Debug("validator is ourself", "id", id, "address", validator.NodeAddress.String()) + vc.logger.Trace("validator is ourself", "id", id, "address", validator.NodeAddress.String()) continue } if err := validator.ValidateBasic(); err != nil { - vc.logger.Debug("validator address is invalid", "id", id, "address", validator.NodeAddress.String()) + vc.logger.Warn("validator address is invalid", "id", id, "address", validator.NodeAddress.String()) continue } if vc.connectedValidators.contains(validator) { - vc.logger.Debug("validator already connected", "id", id) + vc.logger.Trace("validator already connected", "id", id) continue } if vc.dialer.IsDialingOrConnected(validator.NodeAddress.NodeID) { - vc.logger.Debug("already dialing this validator", "id", id, "address", validator.NodeAddress.String()) + vc.logger.Trace("already dialing this validator", "id", id, "address", validator.NodeAddress.String()) continue } diff --git a/internal/p2p/dash_dialer.go b/internal/p2p/dash_dialer.go index 64d70ecff3..65b9372b84 100644 --- a/internal/p2p/dash_dialer.go +++ b/internal/p2p/dash_dialer.go @@ -110,12 +110,14 @@ func (cm *routerDashDialer) lookupIPPort(ctx context.Context, ip net.IP, port ui for _, nodeID := range peers { addresses := cm.peerManager.Addresses(nodeID) for _, addr := range addresses { - if endpoints, err := addr.Resolve(ctx); err != nil { + if endpoints, err := addr.Resolve(ctx); err == nil { for _, item := range endpoints { if item.IP.Equal(ip) && item.Port == port { return item.NodeAddress(nodeID), nil } } + } else { + cm.logger.Warn("lookupIPPort: failed to resolve address", "peer", nodeID, "address", addr, "err", err) } } } diff --git a/node/node.go b/node/node.go index 67292f777e..e4ba7e6300 100644 --- a/node/node.go +++ b/node/node.go @@ -73,9 +73,6 @@ type nodeImpl struct { shutdownOps closer rpcEnv *rpccore.Environment prometheusSrv *http.Server - - // Dash - validatorConnExecutor *dashquorum.ValidatorConnExecutor } // newDefaultNode returns a Tendermint node with default settings for the @@ -238,25 +235,6 @@ func makeNode( fmt.Errorf("failed to create peer manager: %w", err), makeCloser(closers)) } - - // Start Dash connection executor - var validatorConnExecutor *dashquorum.ValidatorConnExecutor - if len(proTxHash) > 0 { - vcLogger := logger.With("node_proTxHash", proTxHash.ShortString(), "module", "ValidatorConnExecutor") - dcm := p2p.NewRouterDashDialer(peerManager, vcLogger) - validatorConnExecutor, err = dashquorum.NewValidatorConnExecutor( - proTxHash, - eventBus, - dcm, - dashquorum.WithLogger(vcLogger), - dashquorum.WithValidatorsSet(state.Validators), - ) - if err != nil { - return nil, combineCloseError(err, makeCloser(closers)) - } - } - - // TODO construct node here: node := &nodeImpl{ config: cfg, logger: logger, @@ -276,8 +254,6 @@ func makeNode( shutdownOps: makeCloser(closers), - validatorConnExecutor: validatorConnExecutor, - rpcEnv: &rpccore.Environment{ ProxyApp: proxyApp, @@ -295,6 +271,28 @@ func makeNode( }, } + // Start Dash connection executor + if len(proTxHash) > 0 { + var validatorConnExecutor *dashquorum.ValidatorConnExecutor + + vcLogger := logger.With("node_proTxHash", proTxHash.ShortString(), "module", "ValidatorConnExecutor") + dcm := p2p.NewRouterDashDialer(peerManager, vcLogger) + validatorConnExecutor, err = dashquorum.NewValidatorConnExecutor( + proTxHash, + eventBus, + dcm, + dashquorum.WithLogger(vcLogger), + dashquorum.WithValidatorsSet(state.Validators), + ) + if err != nil { + return nil, combineCloseError(err, makeCloser(closers)) + } + + node.services = append(node.services, validatorConnExecutor) + } else { + logger.Debug("ProTxHash not set, so we are not a validator; skipping ValidatorConnExecutor initialization") + } + node.router, err = createRouter(logger, nodeMetrics.p2p, node.NodeInfo, nodeKey, peerManager, cfg, proxyApp) if err != nil { return nil, combineCloseError( diff --git a/node/node_test.go b/node/node_test.go index 2d085c33f3..7f8671a349 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -672,7 +672,7 @@ func TestNodeSetEventSink(t *testing.T) { logger := log.NewNopLogger() - setupTest := func(t *testing.T, conf *config.Config) []indexer.EventSink { + setupTest := func(t *testing.T, _conf *config.Config) []indexer.EventSink { eventBus := eventbus.NewDefault(logger.With("module", "events")) require.NoError(t, eventBus.Start(ctx))