Skip to content

Commit

Permalink
set queue chan
Browse files Browse the repository at this point in the history
  • Loading branch information
SyntaxErrorLineNULL committed Aug 17, 2024
1 parent 5a938d5 commit 4813bfa
Showing 1 changed file with 28 additions and 1 deletion.
29 changes: 28 additions & 1 deletion worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package worker

import (
"context"
"errors"
"log"
"os"
"sync"
Expand All @@ -13,7 +14,7 @@ type Worker struct {
workerContext context.Context // Context for the worker's operations.
mutex sync.RWMutex // Mutex to control access to shared resources.
stopCh chan struct{} // Channel to signal the worker to stop processing tasks.
collector <-chan worker.Task // Channel to receive jobs from the pool's dispatcher.
queue <-chan worker.Task // Channel to receive jobs from the pool's dispatcher.
currentProcess worker.Task // Current task being processed by the worker.
status worker.Status // Current status of the worker (e.g., running, stopped).
errCh chan *worker.Error // Channel to send and receive errors that occur in the worker.
Expand Down Expand Up @@ -47,3 +48,29 @@ func (w *Worker) SetContext(ctx context.Context) {
w.workerContext = ctx
}
}

// SetQueue sets the task queue channel for the worker. This method allows
// the worker to be assigned a new task queue channel, which it will use
// to receive tasks. The method ensures that the provided channel is open before
// setting it, returning an error if the channel is closed.
func (w *Worker) SetQueue(queue chan worker.Task) error {
// Use a select statement with a default case to check if the provided channel is closed.
// The select statement attempts to receive from the queue channel.
select {
// Attempt to receive from the queue channel.
case _, ok := <-queue:
// If the receive operation fails, the channel is closed.
// Return an error indicating that the channel is closed.
if !ok {
return errors.New("queue chan is close")
}
// If the receive operation would block, continue without doing anything.
default:
}

// Set the worker's queue channel to the provided queue channel.
w.queue = queue

// Return nil indicating that the operation was successful.
return nil
}

0 comments on commit 4813bfa

Please sign in to comment.