// Copyright 2022 Blues Inc. All rights reserved.
// Use of this source code is governed by licenses granted by the
// copyright holder including that found in the LICENSE file.

package main

import (
	"archive/zip"
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"sort"
	"sync"
	"time"
)

// On-disk stats file format: plain JSON or zip-compressed JSON
const zipType = ".zip"
const jsonType = ".json"
const currentType = zipType

// AggregatedStat is a structure used to aggregate stats across service instances
type AggregatedStat struct {
	Started                 int64                    `json:"started,omitempty"`
	Time                    int64                    `json:"time,omitempty"`
	DiskReads               uint64                   `json:"disk_read,omitempty"`
	DiskWrites              uint64                   `json:"disk_write,omitempty"`
	NetReceived             uint64                   `json:"net_received,omitempty"`
	NetSent                 uint64                   `json:"net_sent,omitempty"`
	HttpConnTotal           uint64                   `json:"http_conn,omitempty"`
	HttpConnReused          uint64                   `json:"http_conn_reused,omitempty"`
	HandlersEphemeral       int64                    `json:"handlers_ephemeral,omitempty"`
	HandlersDiscovery       int64                    `json:"handlers_discovery,omitempty"`
	HandlersContinuous      int64                    `json:"handlers_continuous,omitempty"`
	HandlersNotification    int64                    `json:"handlers_notification,omitempty"`
	NewHandlersEphemeral    int64                    `json:"handlers_ephemeral_new,omitempty"`
	NewHandlersDiscovery    int64                    `json:"handlers_discovery_new,omitempty"`
	NewHandlersContinuous   int64                    `json:"handlers_continuous_new,omitempty"`
	NewHandlersNotification int64                    `json:"handlers_notification_new,omitempty"`
	EventsReceived          int64                    `json:"events_received,omitempty"`
	EventsRouted            int64                    `json:"events_routed,omitempty"`
	DatabaseReads           int64                    `json:"database_reads,omitempty"`
	DatabaseWrites          int64                    `json:"database_writes,omitempty"`
	APITotal                int64                    `json:"api_total,omitempty"`
	Databases               map[string]StatsDatabase `json:"databases,omitempty"`
	Caches                  map[string]StatsCache    `json:"caches,omitempty"`
	API                     map[string]int64         `json:"api,omitempty"`
	Fatals                  map[string]int64         `json:"fatals,omitempty"`
}

// Periodic stats publisher. The stats publisher maintains, in the local system's data directory,
// a file that shadows what it keeps in-memory: 1 day's worth of stats data starting at UTC midnight.
// One of these files is maintained for each host being monitored. On an hourly basis aligned with
// midnight UTC, these files are archived to an S3 bucket.
//
// Separately, there is a goroutine responsible for examining the in-memory structure and streaming
// new values out to real-time listeners including DataDog. This process takes our native stats
// format, aggregates the service endpoints, converts our stats into publishable metrics, and
// publishes them.

// HostStats represents a set of stats aggregated for a host. We use this structure both for
// the files we write (which are UTC midnight-based for 1 day) and for the in-memory structure
// we maintain (which covers up to a rolling 48 hours).
type HostStats struct {
	Name       string                 `json:"name,omitempty"`
	Addr       string                 `json:"address,omitempty"`
	Time       int64                  `json:"time,omitempty"`
	BucketMins int64                  `json:"minutes,omitempty"`
	Stats      map[string][]StatsStat `json:"stats,omitempty"`
}
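
// exampleHostStats is an illustrative sketch (not used by the service) of the
// in-memory layout: Stats is keyed by service-instance ID, and each array is
// ordered most-recent-first in BucketMins-sized steps. All values are hypothetical.
func exampleHostStats() HostStats {
	t := int64(1735689600) // 2025-01-01 00:00:00 UTC
	return HostStats{
		Name:       "api-a",
		Addr:       "10.0.0.1",
		Time:       t,
		BucketMins: 5,
		Stats: map[string][]StatsStat{
			"instance-0": {
				{SnapshotTaken: t},       // newest bucket
				{SnapshotTaken: t - 300}, // one 5-minute bucket earlier
			},
		},
	}
}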

// Globals
const secs1Day = (60 * 60 * 24)

var statsInitCompleted int64
var statsMaintainNow *Event
var statsLock sync.Mutex
var stats map[string]HostStats
var statsServiceVersions map[string]string

// Trace
const addStatsTrace = true

// Stats maintenance task
func statsMaintainer() {
	var err error

	// Load past stats into the in-memory maps
	statsInit()

	// Wait for a signal to update them, or a timeout
	for {
		lastUpdatedDay := todayTime()

		// Proceed if signalled, else do this several times per hour
		// because stats are only maintained by services for an hour.
		statsMaintainNow.Wait(time.Minute * time.Duration(Config.MonitorPeriodMins))

		// Maintain stats for every enabled host
		for _, host := range Config.MonitoredHosts {
			if !host.Disabled {
				_, _, err = statsUpdateHost(host.Name, host.Addr, lastUpdatedDay != todayTime())
				if err != nil {
					fmt.Printf("%s: error updating stats: %s\n", host.Name, err)
				}
			}
		}
	}
}

// Get the stats filename for a given UTC date
func statsFilename(host string, serviceVersion string, filetime int64, filetype string) (filename string) {
	return host + "-" + serviceVersion + "-" + time.Unix(filetime, 0).UTC().Format("20060102") + filetype
}
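
// exampleStatsFilename is a hedged illustration of the naming scheme above; the
// host and version are hypothetical. 1735689600 is 2025-01-01 00:00:00 UTC, so
// with the UTC-normalized date format this returns "api-a-3.2.1-20250101.zip".
func exampleStatsFilename() string {
	return statsFilename("api-a", "3.2.1", 1735689600, zipType)
}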

// Get the stats filename's full path
func statsFilepath(host string, serviceVersion string, filetime int64, filetype string) (filepath string) {
	return configDataDirectory + "/" + statsFilename(host, serviceVersion, filetime, filetype)
}

// Load stats from files
func uLoadStats(hostname string, hostaddr string, serviceVersion string, bucketSecs int64) (err error) {

	// Begin by clearing out the host
	statsServiceVersions[hostname] = ""
	uStatsVerify(hostname, hostaddr, serviceVersion, bucketSecs)

	// Load today's file; a missing or unreadable file is expected and not an error
	var hs HostStats
	hs, err = readFileLocally(hostname, serviceVersion, todayTime())
	if err != nil {
		err = nil
	} else {
		added, _, err := uStatsAdd(hostname, hs.Addr, hs.Stats)
		if err != nil {
			fmt.Printf("stats: %s\n", err)
		}
		if added > 0 {
			fmt.Printf("stats: loaded %d stats for %s from today\n", added, hostname)
		}
	}

	// Load yesterday's file, likewise tolerating its absence
	hs, err = readFileLocally(hostname, serviceVersion, yesterdayTime())
	if err != nil {
		err = nil
	} else {
		added, _, err := uStatsAdd(hostname, hs.Addr, hs.Stats)
		if err != nil {
			fmt.Printf("stats: %s\n", err)
		}
		if added > 0 {
			fmt.Printf("stats: loaded %d stats for %s from yesterday\n", added, hostname)
		}
	}

	// Done
	return
}

// Initialize the stats subsystem; past stats are loaded on the first maintenance pass
func statsInit() {

	// Create the maintenance event and pre-trigger a maintenance cycle
	statsMaintainNow = EventNew()
	statsMaintainNow.Signal()

	// Initialize stats maps
	stats = make(map[string]HostStats)
	statsServiceVersions = make(map[string]string)

	// Remember when initialization completed, so the initial file-system load
	// isn't re-published to real-time listeners
	statsInitCompleted = time.Now().UTC().Unix()
}

// Verify that the stats buckets are set up properly
func uStatsVerify(hostname string, hostaddr string, serviceVersion string, bucketSecs int64) {

	// If service version is wrong, initialize
	if serviceVersion != statsServiceVersions[hostname] {
		statsServiceVersions[hostname] = serviceVersion
		hs := HostStats{}
		hs.Name = hostname
		hs.Addr = hostaddr
		hs.BucketMins = bucketSecs / 60
		stats[hostname] = hs
		fmt.Printf("stats: reset stats for %s\n", hostname)
	}
}

// See if stats are uniform across the instances. For timing reasons we may not
// fetch them at exactly the same moment, in which case they are reported as stale.
func statsAreUniform(s map[string][]StatsStat) (uniform bool, err error) {
	maxTime := int64(0)
	uniform = true
	for _, sis := range s {
		if len(sis) == 0 {
			uniform = false
			err = fmt.Errorf("stats unavailable")
			return
		}
		if maxTime < sis[0].SnapshotTaken {
			maxTime = sis[0].SnapshotTaken
		}
		if sis[0].SnapshotTaken != maxTime {
			uniform = false
		}
	}
	if uniform {
		return
	}
	fmt.Printf("STALE HANDLER STATS\n")
	for siid, sis := range s {
		if maxTime != sis[0].SnapshotTaken {
			fmt.Printf(" %d != %d %s\n", maxTime, sis[0].SnapshotTaken, siid)
		}
	}
	err = fmt.Errorf("stale stats results")
	return
}
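
// exampleUniformity is a hedged sketch of how non-uniform snapshots are detected:
// two hypothetical instances whose newest snapshots differ by one 5-minute bucket
// are reported as stale.
func exampleUniformity() {
	t := int64(1735689600)
	s := map[string][]StatsStat{
		"instance-0": {{SnapshotTaken: t}},
		"instance-1": {{SnapshotTaken: t - 300}}, // one bucket behind
	}
	if uniform, err := statsAreUniform(s); !uniform {
		fmt.Printf("not uniform: %s\n", err)
	}
}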

// Validate the continuity of the specified stats arrays, correcting any possible corruption.
// Note that they must have the same start time but can be of varying lengths, because
// handlers start at different times.
func uValidateStats(fixupType string, s map[string][]StatsStat, normalizedTime int64, bucketSecs64 int64) (totalEntries int, blankEntries int, err error) {
	bucketSecs := int(bucketSecs64)
	if normalizedTime == 0 {
		for _, sis := range s {
			if len(sis) == 0 {
				continue
			}
			if sis[0].SnapshotTaken > normalizedTime {
				normalizedTime = sis[0].SnapshotTaken
			}
			if sis[0].SnapshotTaken != normalizedTime {
				fmt.Printf("NONUNIFORM buckets (normalizedTime: %d)\n", normalizedTime)
				for siid, sis2 := range s {
					fmt.Printf("%s %d\n", siid, sis2[0].SnapshotTaken)
				}
				break
			}
		}
		if normalizedTime == 0 {
			err = fmt.Errorf("no stats")
			return
		}
	}

	// Iterate over each stats array, normalizing to normalizedTime
	for siid, sis := range s {

		// Do a pre-check to see if the entire array is fine
		bad := false
		for i := 0; i < len(sis); i++ {
			if sis[i].SnapshotTaken != normalizedTime-int64(i*bucketSecs) {
				bad = true
				t1 := time.Unix(sis[i].SnapshotTaken, 0).UTC()
				t1s := t1.Format("01-02 15:04:05")
				t2 := time.Unix(normalizedTime-int64(i*bucketSecs), 0).UTC()
				t2s := t2.Format("01-02 15:04:05")
				fmt.Printf("fixup %s: len:%d entry %d's time %s != expected time %s\n", fixupType, len(sis), i, t1s, t2s)
			}
			if sis[i].OSMemTotal == 0 {
				blankEntries++
			}
			totalEntries++
		}

		// Don't do the fixup if it's fine
		if !bad {
			continue
		}
		fmt.Printf("fixup %s: doing fixup: length:%d total:%d blank:%d\n", fixupType, len(sis), totalEntries, blankEntries)
		statsAnalyze("BAD DATA: ", sis, int64(bucketSecs))

		// Do the fixup, which is a slow process: rebuild a clean timeline, then
		// place each stat into the bucket its snapshot time implies
		newStats := make([]StatsStat, len(sis))
		for i := 0; i < len(sis); i++ {
			newStats[i].SnapshotTaken = normalizedTime - int64(bucketSecs*i)
		}
		for sn, stat := range sis {
			i := int(normalizedTime-stat.SnapshotTaken) / bucketSecs
			if i < 0 || i >= len(sis) {
				fmt.Printf("can't place stat %d during fixup\n", i)
			} else {
				if newStats[i].SnapshotTaken != stat.SnapshotTaken {
					fmt.Printf("fixup: stat time %d does not land exactly on bucket %d\n", stat.SnapshotTaken, i)
				} else {
					if sn != i {
						fmt.Printf("fixup: placed %d in %d\n", sn, i)
					}
					newStats[i] = stat
				}
			}
		}

		// Done
		fmt.Printf("fixup %s: %s FIXED UP to be of length %d instead of %d\n", fixupType, siid, len(newStats), len(s[siid]))
		s[siid] = newStats
	}

	// Done
	return
}
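
// exampleBucketIndex is a hedged sketch of the bucket-index arithmetic used by the
// fixup above, with hypothetical values: a stat whose snapshot is two buckets older
// than normalizedTime lands at index 2.
func exampleBucketIndex() int {
	normalizedTime := int64(1735689600) // hypothetical newest bucket time
	bucketSecs := 300                   // 5-minute buckets
	snapshotTaken := normalizedTime - int64(2*bucketSecs)
	return int(normalizedTime-snapshotTaken) / bucketSecs // == 2
}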

// Add stats to the in-memory vector of stats.
func uStatsAdd(hostname string, hostaddr string, s map[string][]StatsStat) (added int, addedStats map[string][]StatsStat, err error) {

	// Exit if no map (this is to be expected in initialization cases)
	if s == nil {
		fmt.Printf("uStatsAdd: nil stats\n")
		return
	}
	fmt.Printf("uStatsAdd: adding stats from %d handlers of %s\n", len(s), hostname)

	// Initialize output map
	addedStats = make(map[string][]StatsStat)

	// Get the host's stats record
	hs := stats[hostname]
	if hs.Stats == nil {
		hs.Stats = map[string][]StatsStat{}
	}
	bucketMins := hs.BucketMins
	bucketSecs := bucketMins * 60

	// Exit if the host stats record is invalid
	if hs.BucketMins == 0 {
		err = fmt.Errorf("uStatsAdd: %s: *** invalid host stats ***", hostname)
		fmt.Printf("%s\n", err)
		return
	}

	// Validate both the existing stats arrays and the ones being added, as a sanity check
	if len(s) > 0 {
		var totalEntries, blankEntries int
		totalEntries, blankEntries, err = uValidateStats("new", s, 0, bucketSecs)
		if err != nil {
			return
		}
		if blankEntries > 0 {
			fmt.Printf("uStatsAdd: adding %d blank entries (of %d total) to %s\n", blankEntries, totalEntries, hostname)
		}
	}
	if len(hs.Stats) > 0 {
		_, _, err = uValidateStats("existing", hs.Stats, hs.Time, bucketSecs)
		if err != nil {
			return
		}
	}

	// Make sure there are map entries for all the service instances we're adding, and
	// that we can always feel safe in referencing the [0] entry of a stats array.
	// Also, compute the least recent time so we can extend all arrays to that point
	var mostRecentTime, leastRecentTime int64
	for siid, sis := range s {
		if hs.Stats[siid] == nil {
			hs.Stats[siid] = []StatsStat{}
		}
		if mostRecentTime == 0 {
			mostRecentTime = sis[0].SnapshotTaken
		}
		if mostRecentTime != sis[0].SnapshotTaken || mostRecentTime == 0 {
			err = fmt.Errorf("uStatsAdd: %s: *** non-uniform stats time (%d != %d) ***", hostname, sis[0].SnapshotTaken, mostRecentTime)
			return
		}
		lrt := mostRecentTime - (int64(len(sis)) * bucketSecs)
		if leastRecentTime == 0 || lrt < leastRecentTime {
			leastRecentTime = lrt
		}
	}
	if addStatsTrace {
		fmt.Printf("uStatsAdd: %s: recent:%d least:%d\n", hostname, mostRecentTime, leastRecentTime)
	}

	// If the base time needs to be updated, do so
	if hs.Time == 0 {
		hs.Time = mostRecentTime
		fmt.Printf("uStatsAdd: %s: initializing time\n", hostname)
	}

	// If the incoming time is more recent than the existing base time, extend all arrays at the front
	if mostRecentTime > hs.Time {
		arrayEntries := (mostRecentTime - hs.Time) / bucketSecs
		if addStatsTrace {
			fmt.Printf("adding %d entries at front (more recent)\n", arrayEntries)
		}
		z := make([]StatsStat, arrayEntries)
		for i := int64(0); i < arrayEntries; i++ {
			z[i].SnapshotTaken = mostRecentTime - (bucketSecs * i)
		}
		for siid := range hs.Stats {
			hs.Stats[siid] = append(z, hs.Stats[siid]...)
		}
		hs.Time = mostRecentTime
	}

	// If the incoming window reaches further back than an existing array, extend that array at the end
	for siid, sis := range hs.Stats {
		hsLeastRecentTime := hs.Time - (int64(len(sis)) * bucketSecs)
		if hsLeastRecentTime > leastRecentTime {
			arrayEntries := (hsLeastRecentTime - leastRecentTime) / bucketSecs
			if addStatsTrace {
				fmt.Printf("for %s adding %d entries at end\n", siid, arrayEntries)
			}
			z := make([]StatsStat, arrayEntries)
			for i := int64(0); i < arrayEntries; i++ {
				z[i].SnapshotTaken = hsLeastRecentTime - (bucketSecs * i)
			}
			hs.Stats[siid] = append(hs.Stats[siid], z...)
		}
	}

	// Checkpoint the stats here so that if an error happens below, our stat arrays
	// will at least all be the correct length
	stats[hostname] = hs

	// As purely a sanity check on the work above, validate the core assumption that
	// all siids encompass the window of the stats being inserted
	for _, sis := range hs.Stats {
		if hs.Time != sis[0].SnapshotTaken {
			err = fmt.Errorf("*** error: unexpected %d != snapshot taken %d", hs.Time, sis[0].SnapshotTaken)
			fmt.Printf("%s\n", err)
			statsAnalyze("", sis, bucketSecs)
			return
		}
		if hs.Time < mostRecentTime {
			err = fmt.Errorf("*** error: unexpected %d < most recent time %d", hs.Time, mostRecentTime)
			fmt.Printf("%s\n", err)
			statsAnalyze("", sis, bucketSecs)
			return
		}
	}

	// For each new stat coming in, set the array contents
	for siid, sis := range s {
		var newStats []StatsStat
		for sn, snew := range sis {
			i := (hs.Time - snew.SnapshotTaken) / bucketSecs
			if i < 0 || i >= int64(len(hs.Stats[siid])) {
				fmt.Printf("*** error: out of bounds %d, %d\n", i, len(hs.Stats[siid]))
				continue
			}
			if hs.Stats[siid][i].SnapshotTaken != snew.SnapshotTaken {
				fmt.Printf("target-currentIndex:%d source-newIndex:%d out of place? %d != %d\n", i, sn, hs.Stats[siid][i].SnapshotTaken, snew.SnapshotTaken)
			}
			if snew.OSMemTotal != 0 {
				hs.Stats[siid][i] = snew
				newStats = append(newStats, snew)
				added++
			}
		}
		if len(newStats) > 0 {
			addedStats[siid] = newStats
		}
	}

	// Update the main stats
	stats[hostname] = hs
	return
}

// Analyze stats for a host
func statsAnalyzeHost(hostname string) {

	// Lock, and exit if no stats are loaded yet
	statsLock.Lock()
	defer statsLock.Unlock()
	if !uStatsLoaded(hostname) {
		return
	}

	// Perform the analysis
	hs := stats[hostname]
	t := time.Unix(hs.Time, 0).UTC()
	ts := t.Format("01-02 15:04:05")
	fmt.Printf("Stats for host %s (%s)\n", hostname, ts)
	for siid, sis := range hs.Stats {
		fmt.Printf(" %s\n", siid)
		statsAnalyze(" ", sis, hs.BucketMins*60)
	}
}

// Analyze stats, verifying that snapshot times descend in exact bucketSecs steps
func statsAnalyze(prefix string, stats []StatsStat, bucketSecs int64) {
	var highest, lowest, prev int64
	count := 0
	for i, s := range stats {
		blank := ""
		if s.OSMemTotal == 0 {
			blank = "blank"
		}
		count++
		if highest == 0 {
			highest = s.SnapshotTaken
		}
		lowest = s.SnapshotTaken
		if prev == 0 {
			t2 := time.Unix(lowest, 0).UTC()
			t2s := t2.Format("01-02 15:04:05")
			fmt.Printf("%s*** %d ok this:%s %s\n", prefix, i, t2s, blank)
		} else {
			if lowest >= prev {
				t1 := time.Unix(prev, 0).UTC()
				t1s := t1.Format("01-02 15:04:05")
				t2 := time.Unix(lowest, 0).UTC()
				t2s := t2.Format("01-02 15:04:05")
				fmt.Printf("%s*** not descending %d prev:%s this:%s %s\n", prefix, i, t1s, t2s, blank)
			} else {
				shouldBe := prev - bucketSecs
				if shouldBe != lowest {
					t1 := time.Unix(lowest, 0).UTC()
					t1s := t1.Format("01-02 15:04:05")
					t2 := time.Unix(shouldBe, 0).UTC()
					t2s := t2.Format("01-02 15:04:05")
					fmt.Printf("%s*** not exact %d this:%s shouldBe:%s %s\n", prefix, i, t1s, t2s, blank)
				} else {
					t2 := time.Unix(lowest, 0).UTC()
					t2s := t2.Format("01-02 15:04:05")
					fmt.Printf("%s*** %d ok this:%s %s\n", prefix, i, t2s, blank)
				}
			}
		}
		prev = lowest
	}
	t1 := time.Unix(highest, 0).UTC()
	t1s := t1.Format("01-02 15:04:05")
	t2 := time.Unix(lowest, 0).UTC()
	t2s := t2.Format("01-02 15:04:05")
	fmt.Printf("%s%s - %s (%d entries)\n", prefix, t1s, t2s, count)
}

// Extract stats for the given host for a time range
func statsExtract(hostname string, beginTime int64, duration int64) (hsret HostStats, exists bool) {

	// Lock, and exit if no stats are loaded yet
	statsLock.Lock()
	defer statsLock.Unlock()
	if !uStatsLoaded(hostname) {
		fmt.Printf("%s not loaded\n", hostname)
		exists = false
		return
	}

	// Perform the extraction
	return uStatsExtract(hostname, beginTime, duration)
}

// Extract stats for the given host for a time range, with the lock held
func uStatsExtract(hostname string, beginTime int64, duration int64) (hsret HostStats, exists bool) {

	// Get the existing value, and exit if we want the whole thing
	hsret, exists = stats[hostname]
	if duration == 0 {
		return
	}

	// Initialize host stats
	hs := hsret
	hsret = HostStats{}
	hsret.Name = hs.Name
	hsret.Addr = hs.Addr
	hsret.BucketMins = hs.BucketMins
	hsret.Stats = map[string][]StatsStat{}

	// Loop, appending and filtering
	for siid, sis := range hs.Stats {
		if len(sis) == 0 {
			continue
		}

		// Initialize a new return array
		sisret := []StatsStat{}

		// Iterate over the stats, filtering. We use the knowledge that the stats
		// are ordered most-recent to least-recent to stop the scan early.
		for _, s := range sis {
			if s.SnapshotTaken < beginTime {
				break
			}
			if s.SnapshotTaken < (beginTime + duration) {
				sisret = append(sisret, s)
				if s.SnapshotTaken > hsret.Time {
					hsret.Time = s.SnapshotTaken
				}
			}
		}

		// Store the stats for this instance
		if len(sisret) != 0 {
			hsret.Stats[siid] = sisret
		}
	}

	// Done
	return
}

// Get the UTC time of today's midnight
func todayTime() int64 {
	return (time.Now().UTC().Unix() / secs1Day) * secs1Day
}

// Get the UTC time of yesterday's midnight
func yesterdayTime() int64 {
	return todayTime() - secs1Day
}
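
// exampleMidnights is a hedged sketch of the truncation arithmetic above, using a
// hypothetical instant: 1735734896 is 2025-01-01 12:34:56 UTC.
func exampleMidnights() (today int64, yesterday int64) {
	now := int64(1735734896)
	today = (now / secs1Day) * secs1Day // 1735689600 == 2025-01-01 00:00:00 UTC
	yesterday = today - secs1Day        // 1735603200 == 2024-12-31 00:00:00 UTC
	return
}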

// Update the files with the data currently in-memory
func uSaveStats(hostname string, serviceVersion string) (err error) {

	// Write today's stats to the file system, then archive them to S3
	contents, err := writeFileLocally(hostname, serviceVersion, todayTime(), secs1Day)
	if err != nil {
		fmt.Printf("stats: error writing %s: %s\n", statsFilename(hostname, serviceVersion, todayTime(), currentType), err)
	} else {
		err = s3UploadStats(statsFilename(hostname, serviceVersion, todayTime(), currentType), contents)
		if err != nil {
			fmt.Printf("stats: error uploading %s to S3: %s\n", statsFilename(hostname, serviceVersion, todayTime(), currentType), err)
		}
	}
	return
}

// Return true if stats are loaded for the host
func uStatsLoaded(hostname string) bool {
	_, statsExist := stats[hostname]
	return statsServiceVersions[hostname] != "" && statsExist
}

// Update the host's data structures both in-memory and on-disk
func statsUpdateHost(hostname string, hostaddr string, reload bool) (ss serviceSummary, handlers map[string]AppHandler, err error) {

	// Only one in here at a time
	statsLock.Lock()
	defer statsLock.Unlock()

	// Get a set of uniform stats across the instances. If we ping at the wrong time we may get
	// inconsistent stats across the instances, so just retry
	var serviceVersionChanged bool
	var statsLastHour map[string][]StatsStat
	for retries := 0; ; retries++ {
		serviceVersionChanged, ss, handlers, statsLastHour, err = watcherGetStats(hostname, hostaddr)
		if err != nil {
			return
		}
		var uniform bool
		uniform, err = statsAreUniform(statsLastHour)
		if uniform {
			break
		}
		if retries > 10 {
			return
		}
		fmt.Printf("retrying: %s\n", err)
		time.Sleep(10 * time.Second)
	}

	// If the stats for that service version were never yet loaded, load them
	if !uStatsLoaded(hostname) {
		err = uLoadStats(hostname, hostaddr, ss.ServiceVersion, ss.BucketSecs)
		if err != nil {
			fmt.Printf("stats: error loading %s stats: %s\n", hostname, err)
			return
		}
		serviceVersionChanged = false
	}

	// If the service version changed, make sure that we write and re-load the stats
	// using the new service version. We do this because when the service version
	// changes, all the node IDs change and thus spreadsheets would be unusable.
	if reload || serviceVersionChanged {
		fmt.Printf("stats: %s service version changed\n", hostname)
		err = uSaveStats(hostname, ss.ServiceVersion)
		if err != nil {
			fmt.Printf("stats: error saving %s stats: %s\n", hostname, err)
		}
		err = uLoadStats(hostname, hostaddr, ss.ServiceVersion, ss.BucketSecs)
		if err != nil {
			fmt.Printf("stats: error loading %s stats: %s\n", hostname, err)
		}
	}

	// Verify that the in-memory stats are set up properly
	uStatsVerify(hostname, hostaddr, ss.ServiceVersion, ss.BucketSecs)

	// Update the stats in-memory
	added, addedStats, err := uStatsAdd(hostname, hostaddr, statsLastHour)
	if err != nil {
		fmt.Printf("stats: error adding stats: %s\n", err)
	}
	if added > 0 {
		fmt.Printf("stats: added %d new stats for %s\n", added, hostname)
	}

	// Save the stats (best-effort) in case we crash
	uSaveStats(hostname, ss.ServiceVersion)

	// If this is just the initial set of stats being loaded from the file system, ignore it;
	// otherwise write the stats to DataDog
	if len(addedStats) > 0 && time.Now().UTC().Unix() > statsInitCompleted+60 {
		datadogUploadStats(hostname, ss.BucketSecs, addedStats)
	}

	// Done
	return
}

// Read a stats file locally
func readFileLocally(hostname string, serviceVersion string, beginTime int64) (hs HostStats, err error) {
	fmt.Printf("reading %s\n", statsFilename(hostname, serviceVersion, beginTime, currentType))

	// Read the contents
	var contents []byte
	filepath := statsFilepath(hostname, serviceVersion, beginTime, currentType)
	contents, err = os.ReadFile(filepath)
	if err != nil {
		return
	}

	// If it's a zip type, unzip the first non-empty file within the archive
	if currentType == zipType {
		lenBefore := len(contents)
		archive, err2 := zip.NewReader(bytes.NewReader(contents), int64(len(contents)))
		if err2 != nil {
			err = err2
			return
		}
		for _, zf := range archive.File {
			f, err2 := zf.Open()
			if err2 != nil {
				err = err2
				return
			}
			contents, err = io.ReadAll(f)
			f.Close()
			if err != nil {
				return
			}
			if len(contents) > 0 {
				break
			}
		}
		fmt.Printf("readFile: unzipped %d to %d\n", lenBefore, len(contents))
	}

	// Unmarshal it
	err = json.Unmarshal(contents, &hs)
	if err != nil {
		fmt.Printf("readFile: unmarshal error (%s): %s\n", statsFilename(hostname, serviceVersion, beginTime, currentType), err)
		return
	}
	return
}

// Write a stats file locally
func writeFileLocally(hostname string, serviceVersion string, beginTime int64, duration int64) (contents []byte, err error) {

	// Marshal the stats into a byte slice
	hs, _ := uStatsExtract(hostname, beginTime, duration)
	contents, err = json.Marshal(hs)
	if err != nil {
		fmt.Printf("writeFileLocally: marshal error (%s): %s\n", hostname, err)
		return
	}

	// If desired, convert the bytes to zip format
	if currentType == zipType {
		lenBefore := len(contents)
		buf := new(bytes.Buffer)
		zipWriter := zip.NewWriter(buf)
		zipFile, err2 := zipWriter.Create(statsFilename(hostname, serviceVersion, beginTime, jsonType))
		if err2 != nil {
			err = err2
			return
		}
		_, err = zipFile.Write(contents)
		if err != nil {
			return
		}
		err = zipWriter.Close()
		if err != nil {
			return
		}
		contents = buf.Bytes()
		fmt.Printf("writeFile: zipped %d to %d\n", lenBefore, len(contents))
	}

	// Write the file
	err = os.WriteFile(statsFilepath(hostname, serviceVersion, beginTime, currentType), contents, 0644)
	if err != nil {
		return
	}

	// Return the contents
	return
}
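
// exampleRoundTrip is a hedged sketch of persisting and re-reading one day of stats
// for a hypothetical host and version. Like the other u-prefixed helpers it touches,
// it assumes the caller holds statsLock and that stats for the host are in memory.
func exampleRoundTrip() (HostStats, error) {
	if _, err := writeFileLocally("api-a", "3.2.1", todayTime(), secs1Day); err != nil {
		return HostStats{}, err
	}
	return readFileLocally("api-a", "3.2.1", todayTime())
}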

// statRecency sorts aggregated stats new-to-old
type statRecency []AggregatedStat

func (list statRecency) Len() int      { return len(list) }
func (list statRecency) Swap(i, j int) { list[i], list[j] = list[j], list[i] }
func (list statRecency) Less(i, j int) bool {
	return list[i].Time > list[j].Time
}
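
// sortStatsNewToOld is a sort.Slice equivalent of the statRecency interface above
// (a sketch; the aggregation below keeps the interface form).
func sortStatsNewToOld(list []AggregatedStat) {
	sort.Slice(list, func(i, j int) bool {
		return list[i].Time > list[j].Time // newest first
	})
}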

// Aggregate a notehub stats structure across service instances back into a StatsStat structure
func statsAggregateAsStatsStat(allStats map[string][]StatsStat, bucketSecs int64) (aggregatedStats []StatsStat) {
	as := statsAggregate(allStats, bucketSecs)

	// Pull them together
	for _, s := range as {
		lbs := StatsStat{}
		lbs.SnapshotTaken = s.Time
		lbs.OSDiskRead = s.DiskReads
		lbs.OSDiskWrite = s.DiskWrites
		lbs.OSNetReceived = s.NetReceived
		lbs.OSNetSent = s.NetSent
		lbs.HttpConnTotal = s.HttpConnTotal
		lbs.HttpConnReused = s.HttpConnReused
		lbs.DiscoveryHandlersActivated = s.NewHandlersDiscovery
		lbs.EphemeralHandlersActivated = s.NewHandlersEphemeral
		lbs.ContinuousHandlersActivated = s.NewHandlersContinuous
		lbs.NotificationHandlersActivated = s.NewHandlersNotification
		lbs.DiscoveryHandlersDeactivated = s.HandlersDiscovery
		lbs.EphemeralHandlersDeactivated = s.HandlersEphemeral
		lbs.ContinuousHandlersDeactivated = s.HandlersContinuous
		lbs.NotificationHandlersDeactivated = s.HandlersNotification
		lbs.EventsEnqueued = s.EventsReceived
		lbs.EventsRouted = s.EventsRouted
		lbs.Databases = s.Databases
		lbs.Caches = s.Caches
		lbs.API = s.API
		lbs.Fatals = s.Fatals
		aggregatedStats = append(aggregatedStats, lbs)
	}
	return
}

// Aggregate a notehub stats structure across service instances
func statsAggregate(allStats map[string][]StatsStat, bucketSecs int64) (aggregatedStats []AggregatedStat) {

	// Exit if there's nothing to aggregate
	if len(allStats) == 0 {
		return
	}

	// Create a data structure that aggregates stats, under the assumption that the stat
	// buckets are aligned on a common timebase.
	aggregatedStatsByBucket := make(map[int]AggregatedStat)
	for _, sis := range allStats {
		for _, s := range sis {
			bucketID := int(s.SnapshotTaken / bucketSecs)
			as := aggregatedStatsByBucket[bucketID]
			as.Time = int64(bucketID) * bucketSecs

			// Aggregate common stats across instances
			as.DiskReads += s.OSDiskRead
			as.DiskWrites += s.OSDiskWrite
			as.NetReceived += s.OSNetReceived
			as.NetSent += s.OSNetSent
			as.HttpConnTotal += s.HttpConnTotal
			as.HttpConnReused += s.HttpConnReused

			// Aggregate handlers
			as.NewHandlersEphemeral += s.EphemeralHandlersActivated
			as.NewHandlersContinuous += s.ContinuousHandlersActivated
			as.NewHandlersDiscovery += s.DiscoveryHandlersActivated
			as.NewHandlersNotification += s.NotificationHandlersActivated
			as.HandlersEphemeral += s.EphemeralHandlersDeactivated
			as.HandlersContinuous += s.ContinuousHandlersDeactivated
			as.HandlersDiscovery += s.DiscoveryHandlersDeactivated
			as.HandlersNotification += s.NotificationHandlersDeactivated

			// Events
			as.EventsReceived += s.EventsEnqueued
			as.EventsRouted += s.EventsRouted

			// Databases
			if as.Databases == nil {
				as.Databases = map[string]StatsDatabase{}
			}
			if s.Databases != nil {
				for key, db := range s.Databases {
					as.DatabaseReads += db.Reads
					as.DatabaseWrites += db.Writes
					v := as.Databases[key]
					v.Reads += db.Reads
					v.Writes += db.Writes
					if db.ReadMsMax > v.ReadMsMax {
						v.ReadMsMax = db.ReadMsMax
					}
					if db.WriteMsMax > v.WriteMsMax {
						v.WriteMsMax = db.WriteMsMax
					}
					as.Databases[key] = v
				}
			}

			// Caches
			if as.Caches == nil {
				as.Caches = map[string]StatsCache{}
			}
			if s.Caches != nil {
				for key, cache := range s.Caches {
					v := as.Caches[key]
					if cache.Invalidations > v.Invalidations {
						v.Invalidations = cache.Invalidations
					}
					if cache.EntriesHWM > v.EntriesHWM {
						v.EntriesHWM = cache.EntriesHWM
					}
					as.Caches[key] = v
				}
			}

			// API calls
			if as.API == nil {
				as.API = map[string]int64{}
			}
			if s.API != nil {
				for key, apiCalls := range s.API {
					as.APITotal += apiCalls
					as.API[key] += apiCalls
				}
			}

			// Fatals
			if as.Fatals == nil {
				as.Fatals = map[string]int64{}
			}
			if s.Fatals != nil {
				for key, fatals := range s.Fatals {
					as.Fatals[key] += fatals
				}
			}

			// Update the aggregated stat
			aggregatedStatsByBucket[bucketID] = as
		}
	}

	// Generate a flat array of stats
	for _, s := range aggregatedStatsByBucket {
		aggregatedStats = append(aggregatedStats, s)
	}

	// Sort the stats new-to-old
	sort.Sort(statRecency(aggregatedStats))

	// Done
	return
}
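
// exampleAggregate is a hedged usage sketch of statsAggregate: two hypothetical
// instances share one aligned 5-minute bucket, so the result is a single
// AggregatedStat whose Time equals that bucket's start.
func exampleAggregate() []AggregatedStat {
	t := int64(1735689600) // divisible by 300, so it is its own bucket start
	all := map[string][]StatsStat{
		"instance-0": {{SnapshotTaken: t}},
		"instance-1": {{SnapshotTaken: t}},
	}
	return statsAggregate(all, 300)
}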