diff --git a/worker/worker.go b/worker/worker.go index 9bd7a92..b6add6a 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -2,6 +2,7 @@ package worker import ( "context" + "errors" "log" "os" "sync" @@ -13,7 +14,7 @@ type Worker struct { workerContext context.Context // Context for the worker's operations. mutex sync.RWMutex // Mutex to control access to shared resources. stopCh chan struct{} // Channel to signal the worker to stop processing tasks. - collector <-chan worker.Task // Channel to receive jobs from the pool's dispatcher. + queue <-chan worker.Task // Channel to receive jobs from the pool's dispatcher. currentProcess worker.Task // Current task being processed by the worker. status worker.Status // Current status of the worker (e.g., running, stopped). errCh chan *worker.Error // Channel to send and receive errors that occur in the worker. @@ -47,3 +48,29 @@ func (w *Worker) SetContext(ctx context.Context) { w.workerContext = ctx } } + +// SetQueue sets the task queue channel for the worker. This method allows +// the worker to be assigned a new task queue channel, which it will use +// to receive tasks. The method ensures that the provided channel is open before +// setting it, returning an error if the channel is closed. +func (w *Worker) SetQueue(queue chan worker.Task) error { + // Use a select statement with a default case to check if the provided channel is closed. + // The select statement attempts to receive from the queue channel. + select { + // Attempt to receive from the queue channel. + case _, ok := <-queue: + // If the receive operation fails, the channel is closed. + // Return an error indicating that the channel is closed. + if !ok { + return errors.New("queue chan is close") + } + // If the receive operation would block, continue without doing anything. + default: + } + + // Set the worker's queue channel to the provided queue channel. + w.queue = queue + + // Return nil indicating that the operation was successful. + return nil +}