From caad8c43b893d6ce817003b2278cb7010c31649d Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Wed, 2 May 2018 09:24:46 -0700 Subject: [PATCH] Simplify MergeSampleStream and favor *not* switching between streams --- promhttputil/merge.go | 98 +++++++++++---------- promhttputil/merge_test.go | 172 +++++++++++++++++++++++++++++++++++-- 2 files changed, 215 insertions(+), 55 deletions(-) diff --git a/promhttputil/merge.go b/promhttputil/merge.go index 39588bd5e..2de15d8e7 100644 --- a/promhttputil/merge.go +++ b/promhttputil/merge.go @@ -128,6 +128,14 @@ func MergeValues(antiAffinityBuffer model.Time, a, b model.Value) (model.Value, } // MergeSampleStream merges SampleStreams `a` and `b` with the given antiAffinityBuffer +// When combining series from 2 different prometheus hosts we can run into some problems +// with clock skew (from a variety of sources). The primary one I've run into is issues +// with the time that prometheus stores. Since the time associated with the datapoint is +// the *start* time of the scrape, there can be quite a lot of time (which can vary +// dramatically between hosts) for the exporter to return. In an attempt to mitigate +// this problem we're going to *not* merge any datapoint within antiAffinityBuffer of another point +// we have. This means we can tolerate antiAffinityBuffer/2 on either side (which can be used by either +// clock skew or from this scrape skew). func MergeSampleStream(antiAffinityBuffer model.Time, a, b *model.SampleStream) (*model.SampleStream, error) { if a.Metric.Fingerprint() != b.Metric.Fingerprint() { return nil, fmt.Errorf("Cannot merge mismatch fingerprints") @@ -135,65 +143,55 @@ func MergeSampleStream(antiAffinityBuffer model.Time, a, b *model.SampleStream) // TODO: really there should be a library method for this in prometheus IMO // At this point we have 2 sorted lists of datapoints which we need to merge - seenTimes := make(map[model.Time]struct{}) - newValues := make([]model.SamplePair, 0, len(a.Values)+len(b.Values)) - - ai := 0 // Offset in a - bi := 0 // Offset in b - - // When combining series from 2 different prometheus hosts we can run into some problems - // with clock skew (from a variety of sources). The primary one I've run into is issues - // with the time that prometheus stores. Since the time associated with the datapoint is - // the *start* time of the scrape, there can be quite a lot of time (which can vary - // dramatically between hosts) for the exporter to return. In an attempt to mitigate - // this problem we're going to *not* merge any datapoint within antiAffinityBuffer of another point - // we have. This means we can tolerate antiAffinityBuffer/2 on either side (which can be used by either - // clock skew or from this scrape skew). - - var lastTime model.Time - - for { - if ai >= len(a.Values) && bi >= len(b.Values) { - break - } + newValues := make([]model.SamplePair, 0, len(a.Values)) - var item model.SamplePair - - if ai < len(a.Values) { // If a exists - if bi < len(b.Values) { - // both items - if a.Values[ai].Timestamp < b.Values[bi].Timestamp { - item = a.Values[ai] - ai++ - } else { - item = b.Values[bi] - bi++ - } + bOffset := 0 + aStartBuffered := a.Values[0].Timestamp - antiAffinityBuffer + + // start by loading b points before a + if b.Values[0].Timestamp < aStartBuffered { + for i, bValue := range b.Values { + bOffset = i + if bValue.Timestamp < aStartBuffered { + newValues = append(newValues, bValue) } else { - // Only A - item = a.Values[ai] - ai++ - } - } else { - if bi < len(b.Values) { - // Only B - item = b.Values[bi] - bi++ + break } } - // If we've already seen this timestamp, skip - if _, ok := seenTimes[item.Timestamp]; ok { + + } + + for _, aValue := range a.Values { + // if we have no points, this one by definition is valid + if len(newValues) == 0 { + newValues = append(newValues, aValue) continue } - if (lastTime > 0) && item.Timestamp-lastTime < antiAffinityBuffer { - continue + // if there is a gap between the last 2 points > antiAffinityBuffer + // check if b has a point that would fit in there + lastTime := newValues[len(newValues)-1].Timestamp + if (aValue.Timestamp - lastTime) > antiAffinityBuffer*2 { + // We want to see if we have any datapoints in the window that aren't too close + for ; bOffset < len(b.Values); bOffset++ { + bValue := b.Values[bOffset] + if bValue.Timestamp >= aValue.Timestamp { + break + } + if bValue.Timestamp > lastTime+antiAffinityBuffer && bValue.Timestamp < (aValue.Timestamp-antiAffinityBuffer) { + newValues = append(newValues, bValue) + } + } } - lastTime = item.Timestamp + newValues = append(newValues, aValue) + } - // Otherwise, lets add it - newValues = append(newValues, item) - seenTimes[item.Timestamp] = struct{}{} + lastTime := newValues[len(newValues)-1].Timestamp + for ; bOffset < len(b.Values); bOffset++ { + bValue := b.Values[bOffset] + if bValue.Timestamp > lastTime+antiAffinityBuffer { + newValues = append(newValues, bValue) + } } return &model.SampleStream{ diff --git a/promhttputil/merge_test.go b/promhttputil/merge_test.go index 0997bb69d..415b5e109 100644 --- a/promhttputil/merge_test.go +++ b/promhttputil/merge_test.go @@ -279,22 +279,149 @@ func TestMergeValues(t *testing.T) { r: model.Matrix([]*model.SampleStream{ { model.Metric(model.LabelSet{model.MetricNameLabel: model.LabelValue("hosta")}), - []model.SamplePair{{ - model.Time(100), - model.SampleValue(10), + []model.SamplePair{ + { + model.Time(100), + model.SampleValue(10), + }, + { + model.Time(200), + model.SampleValue(10), + }, }, + }, + }), + }, + + // Fill missing + // Lots of holes, ensure they are merged correctly + { + name: "Matrix fill missing2", + a: model.Matrix([]*model.SampleStream{ + { + model.Metric(model.LabelSet{model.MetricNameLabel: model.LabelValue("hosta")}), + []model.SamplePair{ { model.Time(200), model.SampleValue(10), - }}, + }, + { + model.Time(400), + model.SampleValue(10), + }, + }, }, }), + b: model.Matrix([]*model.SampleStream{ + { + model.Metric(model.LabelSet{model.MetricNameLabel: model.LabelValue("hosta")}), + []model.SamplePair{ + { + model.Time(100), + model.SampleValue(10), + }, + { + model.Time(300), + model.SampleValue(10), + }, + { + model.Time(500), + model.SampleValue(10), + }, + }, + }, + }), + r: model.Matrix([]*model.SampleStream{ + { + model.Metric(model.LabelSet{model.MetricNameLabel: model.LabelValue("hosta")}), + []model.SamplePair{ + { + model.Time(100), + model.SampleValue(10), + }, + { + model.Time(200), + model.SampleValue(10), + }, + { + model.Time(300), + model.SampleValue(10), + }, + { + model.Time(400), + model.SampleValue(10), + }, + { + model.Time(500), + model.SampleValue(10), + }, + }, + }, + }), + antiAffinity: model.Time(20), + }, + + // Fill missing + // In this case we have 2 series which have large gaps, but the anti-affinity + // defines that we should not merge them, make sure we don't + { + name: "Matrix fill missing3", + a: model.Matrix([]*model.SampleStream{ + { + model.Metric(model.LabelSet{model.MetricNameLabel: model.LabelValue("hosta")}), + []model.SamplePair{ + { + model.Time(200), + model.SampleValue(10), + }, + { + model.Time(400), + model.SampleValue(10), + }, + }, + }, + }), + b: model.Matrix([]*model.SampleStream{ + { + model.Metric(model.LabelSet{model.MetricNameLabel: model.LabelValue("hosta")}), + []model.SamplePair{ + { + model.Time(100), + model.SampleValue(10), + }, + { + model.Time(300), + model.SampleValue(10), + }, + { + model.Time(500), + model.SampleValue(10), + }, + }, + }, + }), + r: model.Matrix([]*model.SampleStream{ + { + model.Metric(model.LabelSet{model.MetricNameLabel: model.LabelValue("hosta")}), + []model.SamplePair{ + { + model.Time(200), + model.SampleValue(10), + }, + { + model.Time(400), + model.SampleValue(10), + }, + }, + }, + }), + antiAffinity: model.Time(100), }, // Ensure that anti-affinity-buffer is working properly // if we have 2 matrix values with similar times only one should be put in { - name: "Matrix fill missing2", + name: "Matrix merge similar", a: model.Matrix([]*model.SampleStream{ { model.Metric(model.LabelSet{model.MetricNameLabel: model.LabelValue("hosta")}), @@ -324,6 +451,41 @@ func TestMergeValues(t *testing.T) { }), antiAffinity: model.Time(2), }, + + // Ensure that anti-affinity-buffer is working properly + // we want to prefer balues from the "first" series (as that is the one + // we already have. This avoids unnecessary switches between series + { + name: "Matrix merge similar 2", + a: model.Matrix([]*model.SampleStream{ + { + model.Metric(model.LabelSet{model.MetricNameLabel: model.LabelValue("hosta")}), + []model.SamplePair{{ + model.Time(101), + model.SampleValue(10), + }}, + }, + }), + b: model.Matrix([]*model.SampleStream{ + { + model.Metric(model.LabelSet{model.MetricNameLabel: model.LabelValue("hosta")}), + []model.SamplePair{{ + model.Time(100), + model.SampleValue(10), + }}, + }, + }), + r: model.Matrix([]*model.SampleStream{ + { + model.Metric(model.LabelSet{model.MetricNameLabel: model.LabelValue("hosta")}), + []model.SamplePair{{ + model.Time(101), + model.SampleValue(10), + }}, + }, + }), + antiAffinity: model.Time(2), + }, } for _, test := range tests {