From d6fedda82b5349504d72645f9c2be2942ad99f06 Mon Sep 17 00:00:00 2001 From: Jeff Learman Date: Wed, 26 Jun 2024 11:40:57 -0400 Subject: [PATCH] Avoid polling in worker The polling architecture with 200 ms sleep in ProcessEventMessages() has two drawbacks: - it imposes a rate limit of 5 events per second for any input source - it imposes an average latency of 100 ms per event The polling loop can be avoided using a goroutine per worker that posts incoming events to the `messageStream` channel, and pending on that channel in ProcessEventMessages(). A short `messageStream` queue guarantees fairness: no worker's latest event will be postponed for more than the length of the channel. Signed-off-by: Jeff Learman --- worker/worker.go | 71 ++++++++++++++++++------------------------------ 1 file changed, 27 insertions(+), 44 deletions(-) diff --git a/worker/worker.go b/worker/worker.go index 8dc2ff175..f15cc5fd4 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -517,7 +517,7 @@ func (m *MessageHandlerRegistry) Add(mh interface { } } -func (m *MessageHandlerRegistry) Remove(name string) { +func (m *MessageHandlerRegistry) remove(name string) { if _, ok := m.Handlers[name]; ok { delete(m.Handlers, name) } @@ -542,7 +542,7 @@ func eventHandler(incoming events.Message, workers *MessageHandlerRegistry) (str switch incoming.(type) { case *events.WorkerStopMessage: msg, _ := incoming.(*events.WorkerStopMessage) - workers.Remove(msg.Name()) + workers.remove(msg.Name()) workerStatusManager.SetWorkerStatus(msg.Name(), STATUS_TERMINATED) return successMsg, nil } @@ -557,29 +557,24 @@ func eventHandler(incoming events.Message, workers *MessageHandlerRegistry) (str return successMsg, nil } -// This function combines all messages (events) from workers into a single global message queue. From this -// global queue, each message will get delivered to each worker by the event handler function. -func mux(workers *MessageHandlerRegistry, muxed chan events.Message) chan events.Message { - +// Start a goroutine for each worker to forward worker messages into the given "multiplexed" channel. +// From this global queue, each message will get delivered to each worker by the event handler function. +func mux(workers *MessageHandlerRegistry, muxed chan events.Message) { for _, w := range workers.Handlers { - select { - case ev := <-(*w).Messages(): - muxed <- ev - default: // nothing - } + go func(c <-chan events.Message) { + for v := range c { + muxed <- v + } + }((*w).Messages()) } - - return muxed } func (workers *MessageHandlerRegistry) ProcessEventMessages() { - // 200 messages should be plenty. We will never get more than 1 message from every worker each time - // we write into this stream. messageStream := make(chan events.Message, 200) + mux(workers, messageStream) - last := int64(0) - + // Process messages on the combined worker message queue as they arrive for { // Exit the event processing loop if all workers have deregistered. if workers.IsEmpty() { @@ -587,35 +582,23 @@ func (workers *MessageHandlerRegistry) ProcessEventMessages() { break } - // Grab messages that are outbound from the workers. - messageStream = mux(workers, messageStream) - - // Process any new messages on the combined worker message queue. - done := false - for !done { - select { - case msg := <-messageStream: - glog.V(3).Infof(mdLogString(fmt.Sprintf("Handling Message (%T): %v\n", msg, msg.ShortString()))) - glog.V(5).Infof(mdLogString(fmt.Sprintf("Handling Message (%T): %v\n", msg, msg))) - - // Push outbound messages into each worker. - if successMsg, err := eventHandler(msg, workers); err != nil { - // error! do some barfing and then continue - glog.Errorf(mdLogString(fmt.Sprintf("Error occurred handling message: %s, Error: %v\n", msg, err))) - } else { - glog.V(2).Infof(mdLogString(fmt.Sprintf("Success handling message: %s\n", successMsg))) - } - default: - now := time.Now().Unix() - if now-last > 30 { - glog.V(5).Infof(mdLogString(fmt.Sprintf("No incoming messages for router to handle"))) - last = now - } - done = true - } + msg, ok := <-messageStream + if !ok { + // channel closed: currently won't happen + glog.V(3).Infof(mdLogString("Muxed channel closed: Terminating")) + break } - time.Sleep(200 * time.Millisecond) + glog.V(3).Infof(mdLogString(fmt.Sprintf("Handling Message (%T): %v\n", msg, msg.ShortString()))) + glog.V(5).Infof(mdLogString(fmt.Sprintf("Handling Message (%T): %v\n", msg, msg))) + + // Push outbound messages into each worker. + if successMsg, err := eventHandler(msg, workers); err != nil { + // error! do some barfing and then continue + glog.Errorf(mdLogString(fmt.Sprintf("Error occurred handling message: %s, Error: %v\n", msg, err))) + } else { + glog.V(2).Infof(mdLogString(fmt.Sprintf("Success handling message: %s\n", successMsg))) + } } // Brief delay just in case.