From 5acb2cacfc36abd73ef37907b3c1cb174ee2f9ce Mon Sep 17 00:00:00 2001 From: Antonin Bas Date: Wed, 29 May 2024 13:23:47 -0700 Subject: [PATCH] Return receive-only channel in GetMsgChan() (#350) This is a small improvement: the message channel retrieved from the collecting process should be "receive-only". Signed-off-by: Antonin Bas --- pkg/collector/process.go | 2 +- pkg/intermediate/aggregate.go | 4 ++-- pkg/intermediate/worker.go | 4 ++-- pkg/kafka/producer/kafka.go | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/collector/process.go b/pkg/collector/process.go index bad76377..66c7db3a 100644 --- a/pkg/collector/process.go +++ b/pkg/collector/process.go @@ -127,7 +127,7 @@ func (cp *CollectingProcess) GetAddress() net.Addr { return cp.netAddress } -func (cp *CollectingProcess) GetMsgChan() chan *entities.Message { +func (cp *CollectingProcess) GetMsgChan() <-chan *entities.Message { return cp.messageChan } diff --git a/pkg/intermediate/aggregate.go b/pkg/intermediate/aggregate.go index d63188da..55ae3e3b 100644 --- a/pkg/intermediate/aggregate.go +++ b/pkg/intermediate/aggregate.go @@ -43,7 +43,7 @@ type AggregationProcess struct { // mutex allows multiple readers or one writer at the same time mutex sync.RWMutex // messageChan is the channel to receive the message - messageChan chan *entities.Message + messageChan <-chan *entities.Message // workerNum is the number of workers to process the messages workerNum int // workerList is the list of workers @@ -71,7 +71,7 @@ type AggregationProcess struct { } type AggregationInput struct { - MessageChan chan *entities.Message + MessageChan <-chan *entities.Message WorkerNum int CorrelateFields []string AggregateElements *AggregationElements diff --git a/pkg/intermediate/worker.go b/pkg/intermediate/worker.go index fab9c7ff..1051c94b 100644 --- a/pkg/intermediate/worker.go +++ b/pkg/intermediate/worker.go @@ -8,12 +8,12 @@ import ( type worker struct { id int - messageChan chan *entities.Message + messageChan <-chan *entities.Message errChan chan bool job func(*entities.Message) error } -func createWorker(id int, messageChan chan *entities.Message, job func(*entities.Message) error) *worker { +func createWorker(id int, messageChan <-chan *entities.Message, job func(*entities.Message) error) *worker { return &worker{ id, messageChan, diff --git a/pkg/kafka/producer/kafka.go b/pkg/kafka/producer/kafka.go index 41737e7b..f7bbf757 100644 --- a/pkg/kafka/producer/kafka.go +++ b/pkg/kafka/producer/kafka.go @@ -186,7 +186,7 @@ func (kp *KafkaProducer) SendFlowMessage(msg protoreflect.Message, kafkaDelimitM // PublishIPFIXMessages takes in a message channel as input and converts all the messages on // the message channel to flow messages in proto schema. This function exits when // the input message channel is closed. -func (kp *KafkaProducer) PublishIPFIXMessages(msgCh chan *entities.Message) { +func (kp *KafkaProducer) PublishIPFIXMessages(msgCh <-chan *entities.Message) { for msg := range msgCh { flowMsgs := kp.input.ProtoSchemaConvertor.ConvertIPFIXMsgToFlowMsgs(msg) for _, flowMsg := range flowMsgs {