Skip to content

Commit

Permalink
Option to store analytics per minute (#167)
Browse files Browse the repository at this point in the history
Fixes #164 
Added `store_analytics_per_minute` config option in aggregate and hybrid pumps.

Currently, we generate aggregate data per hour. If this option is enabled, aggregate data will be generated per minute.
  • Loading branch information
komalsukhani authored and buger committed Nov 5, 2019
1 parent 6a1381c commit 492048b
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 8 deletions.
10 changes: 7 additions & 3 deletions analytics/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (f *AnalyticsRecordAggregate) AsChange() bson.M {
newUpdate = f.generateBSONFromProperty("", "total", &f.Total, newUpdate)

asTime := f.TimeStamp
newTime := time.Date(asTime.Year(), asTime.Month(), asTime.Day(), asTime.Hour(), 0, 0, 0, asTime.Location())
newTime := time.Date(asTime.Year(), asTime.Month(), asTime.Day(), asTime.Hour(), asTime.Minute(), 0, 0, asTime.Location())
newUpdate["$set"].(bson.M)["timestamp"] = newTime
newUpdate["$set"].(bson.M)["expireAt"] = f.ExpireAt
newUpdate["$set"].(bson.M)["timeid.year"] = newTime.Year()
Expand Down Expand Up @@ -369,7 +369,7 @@ func ignoreTag(tag string, ignoreTagPrefixList []string) bool {
}

// AggregateData calculates aggregated data, returns map orgID => aggregated analytics data
func AggregateData(data []interface{}, trackAllPaths bool, ignoreTagPrefixList []string) map[string]AnalyticsRecordAggregate {
func AggregateData(data []interface{}, trackAllPaths bool, ignoreTagPrefixList []string, storeAnalyticPerMinute bool) map[string]AnalyticsRecordAggregate {
analyticsPerOrg := make(map[string]AnalyticsRecordAggregate)

for _, v := range data {
Expand All @@ -387,7 +387,11 @@ func AggregateData(data []interface{}, trackAllPaths bool, ignoreTagPrefixList [

// Set the hourly timestamp & expiry
asTime := thisV.TimeStamp
thisAggregate.TimeStamp = time.Date(asTime.Year(), asTime.Month(), asTime.Day(), asTime.Hour(), 0, 0, 0, asTime.Location())
if storeAnalyticPerMinute {
thisAggregate.TimeStamp = time.Date(asTime.Year(), asTime.Month(), asTime.Day(), asTime.Hour(), asTime.Minute(), 0, 0, asTime.Location())
} else {
thisAggregate.TimeStamp = time.Date(asTime.Year(), asTime.Month(), asTime.Day(), asTime.Hour(), 0, 0, 0, asTime.Location())
}
thisAggregate.ExpireAt = thisV.ExpireAt
thisAggregate.TimeID.Year = asTime.Year()
thisAggregate.TimeID.Month = int(asTime.Month())
Expand Down
13 changes: 9 additions & 4 deletions pumps/hybrid.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ var (

// HybridPump allows to send analytics to MDCB over RPC
type HybridPump struct {
aggregated bool
trackAllPaths bool
ignoreTagPrefixList []string
aggregated bool
trackAllPaths bool
storeAnalyticPerMinute bool
ignoreTagPrefixList []string
}

func (p *HybridPump) GetName() string {
Expand Down Expand Up @@ -118,6 +119,10 @@ func (p *HybridPump) Init(config interface{}) error {
p.trackAllPaths = trackAllPaths.(bool)
}

if storeAnalyticPerMinute, ok := meta["store_analytics_per_minute"]; ok {
p.storeAnalyticPerMinute = storeAnalyticPerMinute.(bool)
}

if list, ok := meta["ignore_tag_prefix_list"]; ok {
ignoreTagPrefixList := list.([]interface{})
p.ignoreTagPrefixList = make([]string, len(ignoreTagPrefixList))
Expand Down Expand Up @@ -162,7 +167,7 @@ func (p *HybridPump) WriteData(data []interface{}) error {
}
} else { // send aggregated data
// calculate aggregates
aggregates := analytics.AggregateData(data, p.trackAllPaths, p.ignoreTagPrefixList)
aggregates := analytics.AggregateData(data, p.trackAllPaths, p.ignoreTagPrefixList, p.storeAnalyticPerMinute)

// turn map with analytics aggregates into JSON payload
jsonData, err := json.Marshal(aggregates)
Expand Down
3 changes: 2 additions & 1 deletion pumps/mongo_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type MongoAggregateConf struct {
TrackAllPaths bool `mapstructure:"track_all_paths"`
IgnoreTagPrefixList []string `mapstructure:"ignore_tag_prefix_list"`
ThresholdLenTagList int `mapstructure:"threshold_len_tag_list"`
StoreAnalyticsPerMinute bool `mapstructure:"store_analytics_per_minute"`
}

func (m *MongoAggregatePump) New() Pump {
Expand Down Expand Up @@ -215,7 +216,7 @@ func (m *MongoAggregatePump) WriteData(data []interface{}) error {
m.WriteData(data)
} else {
// calculate aggregates
analyticsPerOrg := analytics.AggregateData(data, m.dbConf.TrackAllPaths, m.dbConf.IgnoreTagPrefixList)
analyticsPerOrg := analytics.AggregateData(data, m.dbConf.TrackAllPaths, m.dbConf.IgnoreTagPrefixList, m.dbConf.StoreAnalyticsPerMinute)

// put aggregated data into MongoDB
for orgID, filteredData := range analyticsPerOrg {
Expand Down

0 comments on commit 492048b

Please sign in to comment.