This repository has been archived by the owner on Feb 9, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhandlers.go
185 lines (152 loc) · 4.54 KB
/
handlers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package main
import (
"bytes"
"context"
"errors"
"log"
"net"
"strconv"
"strings"
"time"
)
// NewDatagram returns the Datagram struct completed
func NewDatagram(buffer []byte, addr net.Addr, nbytes int) Datagram {
return Datagram{
NumberOfBytes: nbytes,
RemoteAddr: addr,
Buffer: buffer,
}
}
// ReceiverDatagram receive the UDP datagram and return
// to the channel the Datagram received
func ReceiverDatagram(ctx context.Context, conn net.PacketConn, done chan<- error, receivedDatagram chan Datagram) {
for {
buffer := make([]byte, packetSizeUDP)
nbytes, addr, err := conn.ReadFrom(buffer)
if err != nil {
<-ctx.Done()
done <- err
}
d := NewDatagram(buffer, addr, nbytes)
receivedDatagram <- d
}
}
// splitNameMetric split the raw name of metric to human name to this metric
func splitNameMetric(n string) string {
var nameSplitted []string
var suffix string
nameSplitted = strings.Split(n, ".")
if len(nameSplitted) > 0 {
suffix = "." + nameSplitted[len(nameSplitted)-1]
}
nameSplitted = strings.Split(n, ";")
if len(nameSplitted) >= 2 {
return nameSplitted[0] + suffix
}
nameSplitted = strings.Split(n, "@")
if len(nameSplitted) >= 2 {
return nameSplitted[0] + suffix
}
return n
}
// handleDatagram handle the msg from udp datagram packet
func handleDatagram(d Datagram) (nameRaw string, name string, value float64, typeOf string, err error) {
var msg []byte
if len(d.Buffer) == 0 {
err = errors.New("the length of the buffer of datagram is zero")
return
}
msg = d.Buffer[:d.NumberOfBytes]
idx := bytes.IndexByte(d.Buffer, '\n')
// protocol does not require line to end in \n
if idx >= 0 { // \n not found
msg = d.Buffer[:idx]
}
sMsg := string(msg)
splittedMsg := strings.Split(sMsg, ":")
splittedValueType := strings.Split(splittedMsg[1], "|")
nameRaw = splittedMsg[0]
name = splitNameMetric(nameRaw)
typeOf = splittedValueType[1]
value, err = strconv.ParseFloat(splittedValueType[0], 64)
return
}
// ParseStatsDMetric parse the Datagram received to a StatsDMetric
func (d Datagram) ParseStatsDMetric() (StatsDMetric, error) {
stats := StatsDMetric{}
var err error
stats.NameRaw, stats.Name, stats.Value, stats.Type, err = handleDatagram(d)
log.Printf("ParseStatsDMetric: stats: %+v\n", stats)
return stats, err
}
// ParseMetric parse the Datagram received to a Metric
// with all fields completed and ready for storage
func (d Datagram) ParseMetric(statsd *StatsDServer) (Metric, error) {
m := Metric{}
var err error
m.Hostname = statsd.Config.Hostname
m.SourceIP, m.SourcePort, err = net.SplitHostPort(d.RemoteAddr.String())
m.Timestamp = time.Now()
m.Prefix = ""
m.Stats, err = d.ParseStatsDMetric()
return m, err
}
// Save store the metric in the storage previous configured
func (metric Metric) Save(statsd *StatsDServer) {
if err := statsd.Storage.SaveMetric(metric); err != nil {
log.Printf("metric.Save(): statsd.Storage.SaveMetric(metric) error: %+v\n", err)
return
}
}
// IsSupported check if the metric type is supported by storage
func (metric Metric) IsSupported(statsd *StatsDServer) (supported bool) {
switch metric.Stats.Type {
case "ms":
supported = true
}
return
}
// Process all the things about metric and save
func (metric Metric) Process(statsd *StatsDServer) {
var err error
if supported := metric.IsSupported(statsd); !supported {
log.Printf("metric.Process(): metric is not supported; metric.Stats=%+v", metric.Stats)
return
}
// initialization for each metric
statsd.Storage, err = NewStorage(statsd.Config.StorageType, statsd.Config.StorageURL)
if err != nil {
log.Printf("metric.Process(): NewStorage error=%+v", err)
return
}
// check if item exists
if !statsd.Cache.ItemExists(metric) {
// if not exists, create the item in the cache
err := statsd.Cache.SaveItem(metric)
if err != nil {
log.Printf("metric.Process(): statsd.Cache.SaveItem(metric) error: %+v\n", err)
}
// and create the item in the storage, if necessary
if err := statsd.Storage.SaveItem(metric); err != nil {
log.Printf("metric.Process(): statsd.Storage.SaveItem(metric) error: %+v\n", err)
}
}
// Save the metric
metric.Save(statsd)
}
// RunMetrics receive all Datagram by a channel and run all operations
// for proccess and save/storage this metric
func RunMetrics(ctx context.Context, done chan<- error, receivedDatagram <-chan Datagram, statsd *StatsDServer) {
for {
select {
case <-ctx.Done():
return
case d := <-receivedDatagram:
metric, err := d.ParseMetric(statsd)
if err != nil {
done <- err
}
go metric.Process(statsd)
}
}
}