Skip to content

Commit

Permalink
fix: active validators not always connected to each other (#844)
Browse files Browse the repository at this point in the history
* chore(dash): decrease log verbosity in validator conn executor

* fix(node): validator conn executor not started correctly

* fix: panic on full node

* fix: dash dialer fails to lookup nodes in addressbook
  • Loading branch information
lklimek authored Jul 24, 2024
1 parent 13492fd commit 3727fa4
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 38 deletions.
24 changes: 12 additions & 12 deletions dash/quorum/validator_conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (vc *ValidatorConnExecutor) subscribe() error {
// receiveEvents processes received events and executes all the logic.
// Returns non-nil error only if fatal error occurred and the main goroutine should be terminated.
func (vc *ValidatorConnExecutor) receiveEvents(ctx context.Context) error {
vc.logger.Debug("ValidatorConnExecutor: waiting for an event")
vc.logger.Trace("ValidatorConnExecutor: waiting for an event")
sCtx, cancel := context.WithCancel(ctx) // TODO check value for correctness
defer cancel()
msg, err := vc.subscription.Next(sCtx)
Expand All @@ -202,7 +202,7 @@ func (vc *ValidatorConnExecutor) receiveEvents(ctx context.Context) error {
vc.logger.Error("cannot handle validator update", "error", err)
return nil // non-fatal, so no error returned to continue the loop
}
vc.logger.Debug("validator updates processed successfully", "event", event)
vc.logger.Trace("validator updates processed successfully", "event", event)
return nil
}

Expand Down Expand Up @@ -264,7 +264,7 @@ func (vc *ValidatorConnExecutor) resolveNodeID(va *types.ValidatorAddress) error
va.NodeID = address.NodeID
return nil // success
}
vc.logger.Debug(
vc.logger.Trace(
"warning: validator node id lookup method failed",
"url", va.String(),
"method", method,
Expand Down Expand Up @@ -305,7 +305,7 @@ func (vc *ValidatorConnExecutor) ensureValidatorsHaveNodeIDs(validators []*types
for _, validator := range validators {
err := vc.resolveNodeID(&validator.NodeAddress)
if err != nil {
vc.logger.Error("cannot determine node id for validator, skipping", "url", validator.String(), "error", err)
vc.logger.Warn("cannot determine node id for validator, skipping", "url", validator.String(), "error", err)
continue
}
results = append(results, validator)
Expand All @@ -318,7 +318,7 @@ func (vc *ValidatorConnExecutor) disconnectValidator(validator types.Validator)
return err
}
id := validator.NodeAddress.NodeID
vc.logger.Debug("disconnecting Validator", "validator", validator, "id", id, "address", validator.NodeAddress.String())
vc.logger.Trace("disconnecting Validator", "validator", validator, "id", id, "address", validator.NodeAddress.String())
if err := vc.dialer.DisconnectAsync(id); err != nil {
return err
}
Expand All @@ -337,7 +337,7 @@ func (vc *ValidatorConnExecutor) disconnectValidators(exceptions validatorMap) e
vc.logger.Error("cannot disconnect Validator", "error", err)
continue
}
vc.logger.Debug("Validator already disconnected", "error", err)
vc.logger.Trace("Validator already disconnected", "error", err)
// We still delete the validator from vc.connectedValidators
}
delete(vc.connectedValidators, currentKey)
Expand Down Expand Up @@ -373,11 +373,11 @@ func (vc *ValidatorConnExecutor) updateConnections() error {
if err := vc.disconnectValidators(newValidators); err != nil {
return fmt.Errorf("cannot disconnect unused validators: %w", err)
}
vc.logger.Debug("filtering validators", "validators", newValidators.String())
vc.logger.Trace("filtering validators", "validators", newValidators.String())
// ensure that we can connect to all validators
newValidators = vc.filterAddresses(newValidators)
// Connect to new validators
vc.logger.Debug("dialing validators", "validators", newValidators.String())
vc.logger.Trace("dialing validators", "validators", newValidators.String())
if err := vc.dial(newValidators); err != nil {
return fmt.Errorf("cannot dial validators: %w", err)
}
Expand All @@ -390,20 +390,20 @@ func (vc *ValidatorConnExecutor) filterAddresses(validators validatorMap) valida
filtered := make(validatorMap, len(validators))
for id, validator := range validators {
if vc.proTxHash != nil && string(id) == vc.proTxHash.String() {
vc.logger.Debug("validator is ourself", "id", id, "address", validator.NodeAddress.String())
vc.logger.Trace("validator is ourself", "id", id, "address", validator.NodeAddress.String())
continue
}

if err := validator.ValidateBasic(); err != nil {
vc.logger.Debug("validator address is invalid", "id", id, "address", validator.NodeAddress.String())
vc.logger.Warn("validator address is invalid", "id", id, "address", validator.NodeAddress.String())
continue
}
if vc.connectedValidators.contains(validator) {
vc.logger.Debug("validator already connected", "id", id)
vc.logger.Trace("validator already connected", "id", id)
continue
}
if vc.dialer.IsDialingOrConnected(validator.NodeAddress.NodeID) {
vc.logger.Debug("already dialing this validator", "id", id, "address", validator.NodeAddress.String())
vc.logger.Trace("already dialing this validator", "id", id, "address", validator.NodeAddress.String())
continue
}

Expand Down
4 changes: 3 additions & 1 deletion internal/p2p/dash_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,14 @@ func (cm *routerDashDialer) lookupIPPort(ctx context.Context, ip net.IP, port ui
for _, nodeID := range peers {
addresses := cm.peerManager.Addresses(nodeID)
for _, addr := range addresses {
if endpoints, err := addr.Resolve(ctx); err != nil {
if endpoints, err := addr.Resolve(ctx); err == nil {
for _, item := range endpoints {
if item.IP.Equal(ip) && item.Port == port {
return item.NodeAddress(nodeID), nil
}
}
} else {
cm.logger.Warn("lookupIPPort: failed to resolve address", "peer", nodeID, "address", addr, "err", err)
}
}
}
Expand Down
46 changes: 22 additions & 24 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ type nodeImpl struct {
shutdownOps closer
rpcEnv *rpccore.Environment
prometheusSrv *http.Server

// Dash
validatorConnExecutor *dashquorum.ValidatorConnExecutor
}

// newDefaultNode returns a Tendermint node with default settings for the
Expand Down Expand Up @@ -238,25 +235,6 @@ func makeNode(
fmt.Errorf("failed to create peer manager: %w", err),
makeCloser(closers))
}

// Start Dash connection executor
var validatorConnExecutor *dashquorum.ValidatorConnExecutor
if len(proTxHash) > 0 {
vcLogger := logger.With("node_proTxHash", proTxHash.ShortString(), "module", "ValidatorConnExecutor")
dcm := p2p.NewRouterDashDialer(peerManager, vcLogger)
validatorConnExecutor, err = dashquorum.NewValidatorConnExecutor(
proTxHash,
eventBus,
dcm,
dashquorum.WithLogger(vcLogger),
dashquorum.WithValidatorsSet(state.Validators),
)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
}

// TODO construct node here:
node := &nodeImpl{
config: cfg,
logger: logger,
Expand All @@ -276,8 +254,6 @@ func makeNode(

shutdownOps: makeCloser(closers),

validatorConnExecutor: validatorConnExecutor,

rpcEnv: &rpccore.Environment{
ProxyApp: proxyApp,

Expand All @@ -295,6 +271,28 @@ func makeNode(
},
}

// Start Dash connection executor
if len(proTxHash) > 0 {
var validatorConnExecutor *dashquorum.ValidatorConnExecutor

vcLogger := logger.With("node_proTxHash", proTxHash.ShortString(), "module", "ValidatorConnExecutor")
dcm := p2p.NewRouterDashDialer(peerManager, vcLogger)
validatorConnExecutor, err = dashquorum.NewValidatorConnExecutor(
proTxHash,
eventBus,
dcm,
dashquorum.WithLogger(vcLogger),
dashquorum.WithValidatorsSet(state.Validators),
)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}

node.services = append(node.services, validatorConnExecutor)
} else {
logger.Debug("ProTxHash not set, so we are not a validator; skipping ValidatorConnExecutor initialization")
}

node.router, err = createRouter(logger, nodeMetrics.p2p, node.NodeInfo, nodeKey, peerManager, cfg, proxyApp)
if err != nil {
return nil, combineCloseError(
Expand Down
2 changes: 1 addition & 1 deletion node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ func TestNodeSetEventSink(t *testing.T) {

logger := log.NewNopLogger()

setupTest := func(t *testing.T, conf *config.Config) []indexer.EventSink {
setupTest := func(t *testing.T, _conf *config.Config) []indexer.EventSink {
eventBus := eventbus.NewDefault(logger.With("module", "events"))
require.NoError(t, eventBus.Start(ctx))

Expand Down

0 comments on commit 3727fa4

Please sign in to comment.