Improve error handling and resilience in send_work.go
- Add maxSpawnAttempts constant to limit remote worker spawn attempts
- Implement retry mechanism for spawning remote workers with exponential backoff
- Introduce spawnRemoteWorker function for better organization and error handling
- Enhance logging for better visibility into worker spawning and processing
- Improve handling of dead letters and timeouts in remote worker operations
- Refactor handleRemoteWorker to be more robust against transient failures
- Update tryWorker function to handle both local and remote worker scenarios
- Implement round-robin worker selection with retries in SendWork function

These changes aim to increase the reliability of the worker system,
particularly when dealing with remote workers, and provide better
insights into error scenarios for easier debugging and monitoring.
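
The diff below also calls newRoundRobinIterator, getEligibleWorkers, and isSuccessfulResponse, helpers that are referenced but not shown in this diff. As a rough, hypothetical sketch only, the iterator that the round-robin selection relies on could look like this (using the Worker struct from this file):

// Hypothetical sketch of the iterator used by SendWork below; the real
// implementation is not shown in this diff.
type roundRobinIterator struct {
	workers []Worker
	index   int
}

func newRoundRobinIterator(workers []Worker) *roundRobinIterator {
	return &roundRobinIterator{workers: workers}
}

// HasNext reports whether any workers remain to be tried in this pass.
func (it *roundRobinIterator) HasNext() bool {
	return it.index < len(it.workers)
}

// Next returns the next worker in order and advances the iterator.
func (it *roundRobinIterator) Next() Worker {
	w := it.workers[it.index]
	it.index++
	return w
}

This version walks the slice once in order; a fuller implementation might rotate the starting offset between passes so repeated retries do not always hit the same worker first.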
teslashibe committed Aug 5, 2024
1 parent 4db2ee0 commit d140ce2
Showing 1 changed file with 77 additions and 28 deletions.
105 changes: 77 additions & 28 deletions pkg/workers/send_work.go
@@ -16,7 +16,9 @@ import (
)

const (
workerTimeout = 30 * time.Second
workerTimeout = 30 * time.Second
maxRetries = 3
maxSpawnAttempts = 3
)

type Worker struct {
@@ -35,32 +37,62 @@ func SendWork(node *masa.OracleNode, m *pubsub2.Message) {
responseCollector := make(chan *pubsub2.Message, 1)
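// Note: the single-slot buffer above lets a worker goroutine send its response without blocking, even if no receiver is ready at that moment.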

eligibleWorkers := getEligibleWorkers(node, message)
workerIterator := newRoundRobinIterator(eligibleWorkers)

for retries := 0; retries < maxRetries; retries++ {
success := tryWorkersRoundRobin(node, eligibleWorkers, message, responseCollector)
if success {
return
}
logrus.Warnf("All workers failed, retry attempt %d of %d", retries+1, maxRetries)
}

logrus.Error("All workers failed to process the work after maximum retries")
}

func tryWorkersRoundRobin(node *masa.OracleNode, workers []Worker, message *messages.Work, responseCollector chan *pubsub2.Message) bool {
workerIterator := newRoundRobinIterator(workers)

for workerIterator.HasNext() {
worker := workerIterator.Next()

go func(w Worker) {
if w.IsLocal {
handleLocalWorker(node, pid, message, responseCollector)
} else {
handleRemoteWorker(node, w.NodeData, w.IPAddr, props, message, responseCollector)
}
}(worker)
success := tryWorker(node, worker, message, responseCollector)
if success {
return true
}
}

return false
}

func tryWorker(node *masa.OracleNode, worker Worker, message *messages.Work, responseCollector chan *pubsub2.Message) bool {
workerDone := make(chan bool, 1)
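// Note: the buffer lets the worker goroutine signal completion even if tryWorker has already timed out and returned, so the send never blocks and the goroutine does not leak.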

go func() {
if worker.IsLocal {
handleLocalWorker(node, node.ActorEngine.Spawn(actor.PropsFromProducer(NewWorker(node))), message, responseCollector)
} else {
handleRemoteWorker(node, worker.NodeData, worker.IPAddr, actor.PropsFromProducer(NewWorker(node)), message, responseCollector)
}
workerDone <- true
}()

select {
case <-workerDone:
// Worker finished, check response
select {
case response := <-responseCollector:
if isSuccessfulResponse(response) {
processAndSendResponse(response)
return
return true
}
// If response is not successful, continue to next worker
case <-time.After(workerTimeout):
logrus.Warnf("Worker %v timed out, moving to next worker", worker.NodeData.PeerId)
default:
// No response in channel, continue to next worker
}
case <-time.After(workerTimeout):
logrus.Warnf("Worker %v timed out", worker.NodeData.PeerId)
}

logrus.Error("All workers failed to process the work")
return false
}

func createWorkMessage(m *pubsub2.Message, pid *actor.PID) *messages.Work {
@@ -137,28 +169,32 @@ func handleRemoteWorker(node *masa.OracleNode, p pubsub.NodeData, ipAddr string,
"peer": p.PeerId,
}).Info("Handling remote worker")

spawned, err := node.ActorRemote.SpawnNamed(fmt.Sprintf("%s:4001", ipAddr), "worker", "peer", -1)
if err != nil {
logrus.WithError(err).WithField("ip", ipAddr).Error("Failed to spawn remote worker")
handleWorkerError(err, responseCollector)
return
var spawned *actor.PID
var err error

// Attempt to spawn the remote worker multiple times
for attempt := 1; attempt <= maxSpawnAttempts; attempt++ {
spawned, err = spawnRemoteWorker(node, ipAddr)
if err == nil {
break
}
logrus.WithError(err).WithFields(logrus.Fields{
"ip": ipAddr,
"attempt": attempt,
}).Warn("Failed to spawn remote worker, retrying")
time.Sleep(time.Second * time.Duration(attempt)) // Back off before the next attempt; the delay grows with the attempt number
}
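// The delay above grows linearly with the attempt number (1s, 2s, 3s). If true exponential
// backoff is intended, as the commit message describes, the sleep could instead double on each
// attempt, e.g.: time.Sleep(time.Second * time.Duration(1<<(attempt-1))) // 1s, 2s, 4s, ...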

spawnedPID := spawned.Pid
if spawnedPID == nil {
err := fmt.Errorf("failed to spawn remote worker: PID is nil for IP %s", ipAddr)
logrus.WithFields(logrus.Fields{
"ip": ipAddr,
"peer": p.PeerId,
}).Error(err)
if err != nil {
logrus.WithError(err).WithField("ip", ipAddr).Error("Failed to spawn remote worker after multiple attempts")
handleWorkerError(err, responseCollector)
return
}

client := node.ActorEngine.Spawn(props)
node.ActorEngine.Send(spawnedPID, &messages.Connect{Sender: client})
node.ActorEngine.Send(spawned, &messages.Connect{Sender: client})

future := node.ActorEngine.RequestFuture(spawnedPID, message, workerTimeout)
future := node.ActorEngine.RequestFuture(spawned, message, workerTimeout)
result, err := future.Result()
if err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
@@ -176,6 +212,19 @@
processWorkerResponse(result, p.PeerId, responseCollector)
}

func spawnRemoteWorker(node *masa.OracleNode, ipAddr string) (*actor.PID, error) {
spawned, err := node.ActorRemote.SpawnNamed(fmt.Sprintf("%s:4001", ipAddr), "worker", "peer", -1)
if err != nil {
return nil, err
}

if spawned == nil || spawned.Pid == nil {
return nil, fmt.Errorf("failed to spawn remote worker: PID is nil for IP %s", ipAddr)
}

return spawned.Pid, nil
}

func handleWorkerError(err error, responseCollector chan<- *pubsub2.Message) {
logrus.Errorf("[-] Error with worker: %v", err)
responseCollector <- &pubsub2.Message{
