-
Notifications
You must be signed in to change notification settings - Fork 187
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding the serialization features. #1666
Conversation
Don't be put off by the lines change count, 80% of that is generated code from msgp |
Tests are failing from a data race in the tests themselves since I am accessing them directly. Lemme see if I can fix that. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
great work, I like the deduplication algo. I only did a first pass, I'm not yet familiar with the full picture
internal/component/prometheus/remote/queue/serialization/seralizer_test.go
Outdated
Show resolved
Hide resolved
Stats are protocol agnostic, so that if used in prometheus or otel environment they can add their own specific metrics and we dont define protocol specific in the lower level structs. The component in the last PR will expose prometheus compatible ones derived from the callback. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First pass. Looking good :)
@@ -141,7 +141,7 @@ lint: alloylint | |||
# final command runs tests for all other submodules. | |||
test: | |||
$(GO_ENV) go test $(GO_FLAGS) -race $(shell go list ./... | grep -v /integration-tests/) | |||
$(GO_ENV) go test $(GO_FLAGS) ./internal/static/integrations/node_exporter ./internal/static/logs ./internal/component/otelcol/processor/tail_sampling ./internal/component/loki/source/file ./internal/component/loki/source/docker | |||
$(GO_ENV) go test $(GO_FLAGS) ./internal/static/integrations/node_exporter ./internal/static/logs ./internal/component/otelcol/processor/tail_sampling ./internal/component/loki/source/file ./internal/component/loki/source/docker ./internal/component/prometheus/remote/queue/serialization |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'd be running these tests twice, second time without -race
- I don't see the reason why, is that an accident?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is one test that will not be ran twice since I am accessing the var directly to test its value. The others will be ran, I could add the //go:build race to the others. Note most of our exclusions above have some tests that run twice.
internal/component/prometheus/remote/queue/serialization/appender.go
Outdated
Show resolved
Hide resolved
internal/component/prometheus/remote/queue/serialization/appender.go
Outdated
Show resolved
Hide resolved
ts.TS = t | ||
ts.Value = v | ||
ts.Hash = l.Hash() | ||
err := a.s.SendSeries(a.ctx, ts) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it guaranteed that ts
will be returned eventually to the object pool? Would we have a leak if, e.g. the component was removed from Alloy config? I don't see any issues, but would be nice to make this code a bit more clear that this is what's going on, with naming or comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be a required that all time series are returned. Though not in this PR this is checked in a future test via OutStandingTimeSeriesBinary
atomic int. There are end to end tests that ensure at the end of the test this is zero.
internal/component/prometheus/remote/queue/serialization/serializer.go
Outdated
Show resolved
Hide resolved
internal/component/prometheus/remote/queue/types/serialization.go
Outdated
Show resolved
Hide resolved
stringsSlice := make([]string, len(strMapToInt)) | ||
for stringValue, index := range strMapToInt { | ||
stringsSlice[index] = stringValue | ||
} | ||
group.Strings = stringsSlice |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stringsSlice := make([]string, len(strMapToInt)) | |
for stringValue, index := range strMapToInt { | |
stringsSlice[index] = stringValue | |
} | |
group.Strings = stringsSlice | |
dictionary := make([]string, len(strMapToInt)) | |
for stringValue, index := range strMapToInt { | |
dictionary[index] = stringValue | |
} | |
group.dictionary = dictionary |
I like to use the concept of dictionary here, or lookup table... it makes it easier to figure out what's going on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean to use an actual map? Or a rename like above?
} | ||
group.Strings = stringsSlice | ||
|
||
buf, err := group.MarshalMsg(s.msgpBuffer) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sooo... is it worth it to do the dictionary stuff? I guess yes, but on the other hand I know that compression algos would do something similar automatically, snappy can refer to previous part of the data to reduce repetition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One second had a bug in my test re-evaluating.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright back with much more verifiable test.
//go:generate msgp
package main
import (
"fmt"
"math/rand"
"reflect"
"github.com/golang/snappy"
)
// 5 long really random
// 5371616
// 4732108
// 5 long half random
// 5050060
// 3929185
// 5 long quarter random
// 4455979
// 2918973
func main() {
metrics := make([]map[string]string, 0)
// 100k metrics with 10 labels each
for i := 0; i < 100_000; i++ {
metrics = append(metrics, getLabels())
}
ss := &StringString{Labels: metrics}
bb, err := ss.MarshalMsg(nil)
if err != nil {
panic(err)
}
out := snappy.Encode(nil, bb)
dc, _ := snappy.Decode(nil, out)
err = validateStringString(dc, metrics)
if err != nil {
panic(err)
}
println(fmt.Printf("dictionary based is %d bytes", len(out)))
ib := &IndexBased{
String: make([]string, 0),
Names: make([][]uint32, 0),
Values: make([][]uint32, 0),
}
alignIndexBased(ib, metrics)
bb, err = ib.MarshalMsg(nil)
if err != nil {
panic(err)
}
out = snappy.Encode(nil, bb)
dc, _ = snappy.Decode(nil, out)
err = validateIndexBased(dc, metrics)
println(fmt.Printf("index based is %d bytes", len(out)))
}
func validateStringString(bb []byte, metrics []map[string]string) error {
ss := &StringString{}
_, err := ss.UnmarshalMsg(bb)
if err != nil {
return err
}
for i, m := range metrics {
if !reflect.DeepEqual(ss.Labels[i], m) {
return fmt.Errorf("invalid metric at index %d", i)
}
}
return nil
}
func validateIndexBased(bb []byte, metrics []map[string]string) error {
ss := &IndexBased{}
_, err := ss.UnmarshalMsg(bb)
if err != nil {
return err
}
for i, m := range metrics {
if !reflect.DeepEqual(getMetric(ss.Names[i], ss.Values[i], ss.String), m) {
return fmt.Errorf("invalid metric at index %d", i)
}
}
return nil
}
func getMetric(names []uint32, values []uint32, strings []string) map[string]string {
metric := make(map[string]string)
for i, v := range names {
metric[strings[v]] = strings[values[i]]
}
return metric
}
func alignIndexBased(ib *IndexBased, strings []map[string]string) {
index := 0
stringsList := make(map[string]int)
for _, metric := range strings {
names := make([]uint32, 0)
values := make([]uint32, 0)
for k, v := range metric {
keyIndex, ok := stringsList[k]
if !ok {
stringsList[k] = index
ib.String = append(ib.String, k)
keyIndex = index
index++
}
valIndex, ok := stringsList[v]
if !ok {
stringsList[v] = index
ib.String = append(ib.String, v)
valIndex = index
index++
}
names = append(names, uint32(keyIndex))
values = append(values, uint32(valIndex))
}
ib.Names = append(ib.Names, names)
ib.Values = append(ib.Values, values)
}
ib.String = make([]string, len(stringsList))
for k, v := range stringsList {
ib.String[v] = k
}
}
func getLabels() map[string]string {
retLbls := make(map[string]string, 0)
for i := 0; i < 10; i++ {
retLbls[fmt.Sprintf("label_%d", i)] = randString()
}
return retLbls
}
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
var halfRandom = []rune("abcdefghijklmnopqrstuvwxyz")
var quarterRandom = []rune("abcdefghijkl")
func randString() string {
b := make([]rune, rand.Intn(5))
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return string(b)
}
type IndexBased struct {
Names [][]uint32
Values [][]uint32
String []string
}
type StringString struct {
Labels []map[string]string
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general the index based is never worse, and IMO in many cases is 60% of the size of the pure string based.
Results from the above test, changing out the letterRunes to small sets.
// 5 char long really random
// 5371616 string map
// 4732108 index based
// 5 long half random
// 5050060
// 3929185
// 5 long quarter random
// 4455979
// 2918973
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The lower the cardinality the better but even in worse case its not terrible.
internal/component/prometheus/remote/queue/serialization/serializer_bench_test.go
Outdated
Show resolved
Hide resolved
…der.go Co-authored-by: Piotr <17101802+thampiotr@users.noreply.github.com>
…lizer.go Co-authored-by: Piotr <17101802+thampiotr@users.noreply.github.com>
Co-authored-by: Piotr <17101802+thampiotr@users.noreply.github.com>
…lizer.go Co-authored-by: Piotr <17101802+thampiotr@users.noreply.github.com>
Going to merge this and we can revisit any followup in the big merge on specific points. |
This adds the serialization side of converting series to the binary format. The binary format is time series where the strings are deduplicated with actual marshalling handled by the msgp library. I tested roughly 6 libraries listed here. Msgp hit the sweetspot of ease of use and size and features. Such as reusing arrays if they were passed in.