From 7e6b981b062cffdffbcf804f3bf99812bbc4933e Mon Sep 17 00:00:00 2001 From: Deeptiman Date: Wed, 28 Jul 2021 20:34:24 +0530 Subject: [PATCH] Create Batch [Logic] --- producer.go | 37 ++++++++++++------------------------- 1 file changed, 12 insertions(+), 25 deletions(-) diff --git a/producer.go b/producer.go index 064c070..497f3f7 100644 --- a/producer.go +++ b/producer.go @@ -7,8 +7,7 @@ import ( log "github.com/sirupsen/logrus" ) -var ( - itemCounter = 1 +var ( DefaultMaxItems = uint64(100) // maximum no of items packed inside a Batch DefaultMaxWait = time.Duration(30) * time.Second //seconds DefaultBatchNo = int32(1) @@ -74,24 +73,23 @@ func (p *BatchProducer) WatchProducer() { case item := <-p.Watcher: item.BatchNo = int(p.getBatchNo()) - p.Log.WithFields(log.Fields{"Id": item.Id, "BatchNo": item.BatchNo, "Item": item.Item}).Info("BatchProducer") - - items = append(items, *item) - itemCounter++ - if p.isBatchReady() { - p.Log.WithFields(log.Fields{"Item Size": len(items), "MaxItems": p.MaxItems}).Warn("BatchReady") - - itemCounter = 0 + p.Log.WithFields(log.Fields{"Id": item.Id, "Batch_Break": item.Id / int(p.MaxItems), "BatchNo": item.BatchNo, "Item": item.Item}).Info("BatchProducer") + + items = append(items, *item) + + if (item.Id / int(p.MaxItems)) == item.BatchNo { + p.Log.WithFields(log.Fields{"Item Size": len(items), "MaxItems": p.MaxItems}).Warn("BatchReady") items = p.releaseBatch(items) + p.createBatchNo() } - + case <-time.After(p.MaxWait): p.Log.WithFields(log.Fields{"Items": len(items)}).Warn("MaxWait") if len(items) == 0 { return } - itemCounter = 0 + items = p.releaseBatch(items) case <-p.Quit: p.Log.Warn("Quit BatchProducer") @@ -128,23 +126,12 @@ func (p *BatchProducer) CheckRemainingItems(done chan bool) { done <- true } -// isBatchReady verfies that whether the batch ItemCounter++ increases to the MaxItems value -// to create a Batch. -func (p *BatchProducer) isBatchReady() bool { - return uint64(itemCounter) >= p.MaxItems -} - // addBatchNo will increases the current BatchNo to 1 atomically. -func (p *BatchProducer) addBatchNo() { +func (p *BatchProducer) createBatchNo() { atomic.AddInt32(&p.BatchNo, 1) } // getBatchNo will get the current BatchNo from the atomic variable. func (p *BatchProducer) getBatchNo() int32 { - - if itemCounter == 0 { - p.addBatchNo() - } - return atomic.LoadInt32(&p.BatchNo) -} +} \ No newline at end of file