From 492048b8107f6c6a460b6f1bc1ede0ddbd27da0a Mon Sep 17 00:00:00 2001 From: Komal Sukhani Date: Tue, 5 Nov 2019 18:24:55 +0530 Subject: [PATCH] Option to store analytics per minute (#167) Fixes #164 Added `store_analytics_per_minute` config option in aggregate and hybrid pumps. Currently, we generate aggregate data per hour. If this option is enabled, aggregate data will be generated per minute. --- analytics/aggregate.go | 10 +++++++--- pumps/hybrid.go | 13 +++++++++---- pumps/mongo_aggregate.go | 3 ++- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/analytics/aggregate.go b/analytics/aggregate.go index a10380361..ad63f8948 100644 --- a/analytics/aggregate.go +++ b/analytics/aggregate.go @@ -213,7 +213,7 @@ func (f *AnalyticsRecordAggregate) AsChange() bson.M { newUpdate = f.generateBSONFromProperty("", "total", &f.Total, newUpdate) asTime := f.TimeStamp - newTime := time.Date(asTime.Year(), asTime.Month(), asTime.Day(), asTime.Hour(), 0, 0, 0, asTime.Location()) + newTime := time.Date(asTime.Year(), asTime.Month(), asTime.Day(), asTime.Hour(), asTime.Minute(), 0, 0, asTime.Location()) newUpdate["$set"].(bson.M)["timestamp"] = newTime newUpdate["$set"].(bson.M)["expireAt"] = f.ExpireAt newUpdate["$set"].(bson.M)["timeid.year"] = newTime.Year() @@ -369,7 +369,7 @@ func ignoreTag(tag string, ignoreTagPrefixList []string) bool { } // AggregateData calculates aggregated data, returns map orgID => aggregated analytics data -func AggregateData(data []interface{}, trackAllPaths bool, ignoreTagPrefixList []string) map[string]AnalyticsRecordAggregate { +func AggregateData(data []interface{}, trackAllPaths bool, ignoreTagPrefixList []string, storeAnalyticPerMinute bool) map[string]AnalyticsRecordAggregate { analyticsPerOrg := make(map[string]AnalyticsRecordAggregate) for _, v := range data { @@ -387,7 +387,11 @@ func AggregateData(data []interface{}, trackAllPaths bool, ignoreTagPrefixList [ // Set the hourly timestamp & expiry asTime := thisV.TimeStamp - thisAggregate.TimeStamp = time.Date(asTime.Year(), asTime.Month(), asTime.Day(), asTime.Hour(), 0, 0, 0, asTime.Location()) + if storeAnalyticPerMinute { + thisAggregate.TimeStamp = time.Date(asTime.Year(), asTime.Month(), asTime.Day(), asTime.Hour(), asTime.Minute(), 0, 0, asTime.Location()) + } else { + thisAggregate.TimeStamp = time.Date(asTime.Year(), asTime.Month(), asTime.Day(), asTime.Hour(), 0, 0, 0, asTime.Location()) + } thisAggregate.ExpireAt = thisV.ExpireAt thisAggregate.TimeID.Year = asTime.Year() thisAggregate.TimeID.Month = int(asTime.Month()) diff --git a/pumps/hybrid.go b/pumps/hybrid.go index 066b45940..f10516f8a 100644 --- a/pumps/hybrid.go +++ b/pumps/hybrid.go @@ -39,9 +39,10 @@ var ( // HybridPump allows to send analytics to MDCB over RPC type HybridPump struct { - aggregated bool - trackAllPaths bool - ignoreTagPrefixList []string + aggregated bool + trackAllPaths bool + storeAnalyticPerMinute bool + ignoreTagPrefixList []string } func (p *HybridPump) GetName() string { @@ -118,6 +119,10 @@ func (p *HybridPump) Init(config interface{}) error { p.trackAllPaths = trackAllPaths.(bool) } + if storeAnalyticPerMinute, ok := meta["store_analytics_per_minute"]; ok { + p.storeAnalyticPerMinute = storeAnalyticPerMinute.(bool) + } + if list, ok := meta["ignore_tag_prefix_list"]; ok { ignoreTagPrefixList := list.([]interface{}) p.ignoreTagPrefixList = make([]string, len(ignoreTagPrefixList)) @@ -162,7 +167,7 @@ func (p *HybridPump) WriteData(data []interface{}) error { } } else { // send aggregated data // calculate aggregates - aggregates := analytics.AggregateData(data, p.trackAllPaths, p.ignoreTagPrefixList) + aggregates := analytics.AggregateData(data, p.trackAllPaths, p.ignoreTagPrefixList, p.storeAnalyticPerMinute) // turn map with analytics aggregates into JSON payload jsonData, err := json.Marshal(aggregates) diff --git a/pumps/mongo_aggregate.go b/pumps/mongo_aggregate.go index d26623790..4b6b7014e 100644 --- a/pumps/mongo_aggregate.go +++ b/pumps/mongo_aggregate.go @@ -34,6 +34,7 @@ type MongoAggregateConf struct { TrackAllPaths bool `mapstructure:"track_all_paths"` IgnoreTagPrefixList []string `mapstructure:"ignore_tag_prefix_list"` ThresholdLenTagList int `mapstructure:"threshold_len_tag_list"` + StoreAnalyticsPerMinute bool `mapstructure:"store_analytics_per_minute"` } func (m *MongoAggregatePump) New() Pump { @@ -215,7 +216,7 @@ func (m *MongoAggregatePump) WriteData(data []interface{}) error { m.WriteData(data) } else { // calculate aggregates - analyticsPerOrg := analytics.AggregateData(data, m.dbConf.TrackAllPaths, m.dbConf.IgnoreTagPrefixList) + analyticsPerOrg := analytics.AggregateData(data, m.dbConf.TrackAllPaths, m.dbConf.IgnoreTagPrefixList, m.dbConf.StoreAnalyticsPerMinute) // put aggregated data into MongoDB for orgID, filteredData := range analyticsPerOrg {