
Commit

feat: Implement round-robin worker selection for distributed task processing

- Add round-robin iterator for fair worker distribution
- Prioritize local worker when eligible
- Cycle through remote workers in subsequent calls
- Improve error handling and logging for worker responses
- Enhance code readability and maintainability

This update ensures a balanced workload across all available workers
over time, while still prioritizing local processing when possible.
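
The round-robin iterator referenced in SendWork is not part of this diff; only its call sites appear below. For orientation, here is a minimal sketch of what a local-first, rotating iterator over the Worker type might look like. Everything here is an illustrative assumption (including the rrOffset counter), not the repository's actual implementation.

// Hypothetical sketch, not the actual masa-oracle implementation:
// a local-first iterator whose remote ordering rotates across calls.
type roundRobinIterator struct {
    workers []Worker
    pos     int
}

// rrOffset is assumed package-level state so that successive SendWork
// calls start from a different remote worker. A real implementation
// would guard it with a mutex.
var rrOffset int

func newRoundRobinIterator(workers []Worker) *roundRobinIterator {
    var local, remote []Worker
    for _, w := range workers {
        if w.IsLocal {
            local = append(local, w) // local worker goes first
        } else {
            remote = append(remote, w)
        }
    }
    if len(remote) > 0 {
        // Rotate the remote list by a persistent offset so load
        // spreads across calls.
        off := rrOffset % len(remote)
        rrOffset++
        remote = append(remote[off:], remote[:off]...)
    }
    return &roundRobinIterator{workers: append(local, remote...)}
}

// HasNext reports whether any workers remain in this pass.
func (it *roundRobinIterator) HasNext() bool { return it.pos < len(it.workers) }

// Next returns the next worker in local-first, rotated order.
func (it *roundRobinIterator) Next() Worker {
    w := it.workers[it.pos]
    it.pos++
    return w
}

Under these assumptions, each SendWork call tries the local worker first and then walks the remote workers starting from a different offset each time, which is what balances the workload over repeated calls.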
teslashibe committed Aug 5, 2024
1 parent 349386f commit 4db2ee0
Showing 1 changed file with 33 additions and 26 deletions.
59 changes: 33 additions & 26 deletions pkg/workers/send_work.go
@@ -2,7 +2,6 @@ package workers
 
 import (
     "fmt"
-    "sync"
     "time"
 
     masa "github.com/masa-finance/masa-oracle/pkg"
@@ -18,8 +17,6 @@ import (
 
 const (
     workerTimeout = 30 * time.Second
-    queueTimeout  = 8 * time.Second
-    maxWorkers    = 5
 )
 
 type Worker struct {
Expand All @@ -35,29 +32,35 @@ func SendWork(node *masa.OracleNode, m *pubsub2.Message) {
pid := node.ActorEngine.Spawn(props)
message := createWorkMessage(m, pid)

responseCollector := make(chan *pubsub2.Message, 100)
timeout := time.After(queueTimeout)
responseCollector := make(chan *pubsub2.Message, 1)

eligibleWorkers := getEligibleWorkers(node, message)
workerIterator := newRoundRobinIterator(eligibleWorkers)

var wg sync.WaitGroup
for i := 0; i < maxWorkers && workerIterator.HasNext(); i++ {
for workerIterator.HasNext() {
worker := workerIterator.Next()
wg.Add(1)

go func(w Worker) {
defer wg.Done()
if w.IsLocal {
handleLocalWorker(node, pid, message, responseCollector)
} else {
handleRemoteWorker(node, w.NodeData, w.IPAddr, props, message, responseCollector)
}
}(worker)
}

go queueResponses(responseCollector, timeout)
select {
case response := <-responseCollector:
if isSuccessfulResponse(response) {
processAndSendResponse(response)
return
}
// If response is not successful, continue to next worker
case <-time.After(workerTimeout):
logrus.Warnf("Worker %v timed out, moving to next worker", worker.NodeData.PeerId)
}
}

wg.Wait()
logrus.Error("All workers failed to process the work")
}

func createWorkMessage(m *pubsub2.Message, pid *actor.PID) *messages.Work {
@@ -195,19 +198,23 @@ func processWorkerResponse(result interface{}, workerID interface{}, responseCol
     responseCollector <- msg
 }
 
-func queueResponses(responseCollector <-chan *pubsub2.Message, timeout <-chan time.Time) {
-    var responses []*pubsub2.Message
-    for {
-        select {
-        case response := <-responseCollector:
-            responses = append(responses, response)
-            logrus.Infof("Adding response from %s to responses list. Total responses: %d", response.ReceivedFrom, len(responses))
-        case <-timeout:
-            logrus.Infof("Timeout reached, sending all responses to workerDoneCh. Total responses: %d", len(responses))
-            for _, resp := range responses {
-                workerDoneCh <- resp
-            }
-            return
-        }
-    }
-}
+func isSuccessfulResponse(response *pubsub2.Message) bool {
+    if response.ValidatorData == nil {
+        return true
+    }
+    validatorData, ok := response.ValidatorData.(map[string]interface{})
+    if !ok {
+        return false
+    }
+    errorVal, exists := validatorData["error"]
+    return !exists || errorVal == nil
+}
+
+func processAndSendResponse(response *pubsub2.Message) {
+    logrus.Infof("Processing and sending successful response")
+    workerDoneCh <- response
+}
+
+func init() {
+    workerDoneCh = make(chan *pubsub2.Message, 100)
+}
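
A note on the new validation path: isSuccessfulResponse treats a message with no ValidatorData as success, a non-map ValidatorData as failure, and a map as failure only when it carries a non-nil "error" entry. A minimal, self-contained illustration of that rule, with the pubsub message type elided so the snippet operates on the raw ValidatorData value (the isSuccessful name here is hypothetical):

package main

import "fmt"

// isSuccessful mirrors the logic of isSuccessfulResponse from the commit,
// but takes the raw ValidatorData value so the example stays runnable.
func isSuccessful(validatorData interface{}) bool {
    if validatorData == nil {
        return true // no validator data at all is treated as success
    }
    data, ok := validatorData.(map[string]interface{})
    if !ok {
        return false // an unexpected shape is treated as failure
    }
    errVal, exists := data["error"]
    return !exists || errVal == nil
}

func main() {
    fmt.Println(isSuccessful(nil))                                        // true
    fmt.Println(isSuccessful(map[string]interface{}{"data": "ok"}))       // true
    fmt.Println(isSuccessful(map[string]interface{}{"error": "timeout"})) // false
    fmt.Println(isSuccessful(42))                                         // false
}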
