Skip to content

Commit

Permalink
Implement adaptive timeouts based on historical node performance in n…
Browse files Browse the repository at this point in the history
…ode tracker
  • Loading branch information
jdutchak committed Aug 1, 2024
1 parent 52abaee commit a65940c
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 23 deletions.
1 change: 1 addition & 0 deletions cmd/masa-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func main() {
if node.IsStaked {
go workers.MonitorWorkers(ctx, node)
go masa.SubscribeToBlocks(ctx, node)
go node.NodeTracker.ClearExpiredWorkerTimeouts()
}

// Listen for SIGINT (CTRL+C)
Expand Down
46 changes: 23 additions & 23 deletions pkg/pubsub/node_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,29 +44,29 @@ func (m *JSONMultiaddr) UnmarshalJSON(b []byte) error {
}

type NodeData struct {
Multiaddrs []JSONMultiaddr `json:"multiaddrs,omitempty"`
PeerId peer.ID `json:"peerId"`
FirstJoinedUnix int64 `json:"firstJoined,omitempty"`
LastJoinedUnix int64 `json:"lastJoined,omitempty"`
LastLeftUnix int64 `json:"-"`
LastUpdatedUnix int64 `json:"lastUpdated,omitempty"`
CurrentUptime time.Duration `json:"uptime,omitempty"`
CurrentUptimeStr string `json:"uptimeStr,omitempty"`
AccumulatedUptime time.Duration `json:"accumulatedUptime,omitempty"`
AccumulatedUptimeStr string `json:"accumulatedUptimeStr,omitempty"`
EthAddress string `json:"ethAddress,omitempty"`
Activity int `json:"activity,omitempty"`
IsActive bool `json:"isActive"`
IsStaked bool `json:"isStaked"`
SelfIdentified bool `json:"-"`
IsValidator bool `json:"isValidator"`
IsTwitterScraper bool `json:"isTwitterScraper"`
IsDiscordScraper bool `json:"isDiscordScraper"`
IsTelegramScraper bool `json:"isTelegramScraper"`
IsWebScraper bool `json:"isWebScraper"`
Records any `json:"records,omitempty"`
Version string `json:"version"`
TimeoutChecks map[string]time.Time `json:"timeoutChecks,omitempty"`
Multiaddrs []JSONMultiaddr `json:"multiaddrs,omitempty"`
PeerId peer.ID `json:"peerId"`
FirstJoinedUnix int64 `json:"firstJoined,omitempty"`
LastJoinedUnix int64 `json:"lastJoined,omitempty"`
LastLeftUnix int64 `json:"-"`
LastUpdatedUnix int64 `json:"lastUpdated,omitempty"`
CurrentUptime time.Duration `json:"uptime,omitempty"`
CurrentUptimeStr string `json:"uptimeStr,omitempty"`
AccumulatedUptime time.Duration `json:"accumulatedUptime,omitempty"`
AccumulatedUptimeStr string `json:"accumulatedUptimeStr,omitempty"`
EthAddress string `json:"ethAddress,omitempty"`
Activity int `json:"activity,omitempty"`
IsActive bool `json:"isActive"`
IsStaked bool `json:"isStaked"`
SelfIdentified bool `json:"-"`
IsValidator bool `json:"isValidator"`
IsTwitterScraper bool `json:"isTwitterScraper"`
IsDiscordScraper bool `json:"isDiscordScraper"`
IsTelegramScraper bool `json:"isTelegramScraper"`
IsWebScraper bool `json:"isWebScraper"`
Records any `json:"records,omitempty"`
Version string `json:"version"`
WorkerTimeout time.Time `json:"workerTimeout,omitempty"`
}

// NewNodeData creates a new NodeData struct initialized with the given
Expand Down
27 changes: 27 additions & 0 deletions pkg/pubsub/node_event_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,35 @@ func (net *NodeEventTracker) ClearExpiredBufferEntries() {
}
}

// RemoveNodeData removes the node data associated with the given peer ID from the NodeEventTracker.
// It deletes the node data from the internal map and removes any corresponding entry
// from the connect buffer. This function is typically called when a peer disconnects
// or is no longer part of the network.
//
// Parameters:
// - peerID: A string representing the ID of the peer to be removed.
func (net *NodeEventTracker) RemoveNodeData(peerID string) {
net.nodeData.Delete(peerID)
delete(net.ConnectBuffer, peerID)
logrus.Infof("[+] Removed peer %s from NodeTracker", peerID)
}

// ClearExpiredWorkerTimeouts periodically checks and clears expired worker timeouts.
// It runs in an infinite loop, sleeping for 5 minutes between each iteration.
// For each node in the network, it checks if the worker timeout has expired (after 60 minutes).
// If a timeout has expired, it resets the WorkerTimeout to zero and updates the node data.
// This function helps manage the availability of workers in the network by clearing
// temporary timeout states.
func (net *NodeEventTracker) ClearExpiredWorkerTimeouts() {
for {
time.Sleep(5 * time.Minute) // Check every 5 minutes
now := time.Now()

for _, nodeData := range net.GetAllNodeData() {
if !nodeData.WorkerTimeout.IsZero() && now.Sub(nodeData.WorkerTimeout) >= 60*time.Minute {
nodeData.WorkerTimeout = time.Time{} // Reset to zero value
net.AddOrUpdateNodeData(&nodeData, true)

Check warning

Code scanning / gosec

Errors unhandled. Warning

Errors unhandled.
}
}
}
}
15 changes: 15 additions & 0 deletions pkg/workers/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,14 @@ func SendWork(node *masa.OracleNode, m *pubsub2.Message) {
if (p.PeerId.String() != node.Host.ID().String()) &&
p.IsStaked &&
node.NodeTracker.GetNodeData(p.PeerId.String()).CanDoWork(pubsub.WorkerCategory(message.Type)) {

// Check WorkerTimeout
nodeData := node.NodeTracker.GetNodeData(p.PeerId.String())
if !nodeData.WorkerTimeout.IsZero() && time.Since(nodeData.WorkerTimeout) < 60*time.Minute {
logrus.Infof("[+] Skipping worker %s due to timeout", p.PeerId)
continue
}

logrus.Infof("[+] Worker Address: %s", ipAddr)
wg.Add(1)
go func(p pubsub.NodeData) {
Expand Down Expand Up @@ -445,6 +453,13 @@ func processValidatorData(data *pubsub2.Message, validatorDataMap map[string]int
if _, ok := response["error"].(string); ok {
logrus.Infof("[+] Work failed %s", response["error"])

// Set WorkerTimeout for the node
nodeData := node.NodeTracker.GetNodeData(data.ReceivedFrom.String())
if nodeData != nil {
nodeData.WorkerTimeout = time.Now()
node.NodeTracker.AddOrUpdateNodeData(nodeData, true)

Check warning

Code scanning / gosec

Errors unhandled. Warning

Errors unhandled.
}

} else if work, ok := response["data"].(string); ok {
processWork(data, work, startTime, node)

Expand Down

0 comments on commit a65940c

Please sign in to comment.