Skip to content

Commit

Permalink
Simplify MergeSampleStream and favor *not* switching between streams
Browse files Browse the repository at this point in the history
  • Loading branch information
jacksontj committed May 2, 2018
1 parent dc948d7 commit caad8c4
Show file tree
Hide file tree
Showing 2 changed files with 215 additions and 55 deletions.
98 changes: 48 additions & 50 deletions promhttputil/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,72 +128,70 @@ func MergeValues(antiAffinityBuffer model.Time, a, b model.Value) (model.Value,
}

// MergeSampleStream merges SampleStreams `a` and `b` with the given antiAffinityBuffer.
// When combining series from 2 different prometheus hosts we can run into some problems
// with clock skew (from a variety of sources). The primary one I've run into is issues
// with the time that prometheus stores. Since the time associated with the datapoint is
// the *start* time of the scrape, there can be quite a lot of time (which can vary
// dramatically between hosts) for the exporter to return. In an attempt to mitigate
// this problem we're going to *not* merge any datapoint within antiAffinityBuffer of another point
// we have. This means we can tolerate antiAffinityBuffer/2 on either side (which can be used up by
// either clock skew or this scrape skew).
func MergeSampleStream(antiAffinityBuffer model.Time, a, b *model.SampleStream) (*model.SampleStream, error) {
if a.Metric.Fingerprint() != b.Metric.Fingerprint() {
return nil, fmt.Errorf("Cannot merge mismatch fingerprints")
}

// TODO: really there should be a library method for this in prometheus IMO
// At this point we have 2 sorted lists of datapoints which we need to merge
seenTimes := make(map[model.Time]struct{})
newValues := make([]model.SamplePair, 0, len(a.Values)+len(b.Values))

ai := 0 // Offset in a
bi := 0 // Offset in b

// When combining series from 2 different prometheus hosts we can run into some problems
// with clock skew (from a variety of sources). The primary one I've run into is issues
// with the time that prometheus stores. Since the time associated with the datapoint is
// the *start* time of the scrape, there can be quite a lot of time (which can vary
// dramatically between hosts) for the exporter to return. In an attempt to mitigate
// this problem we're going to *not* merge any datapoint within antiAffinityBuffer of another point
// we have. This means we can tolerate antiAffinityBuffer/2 on either side (which can be used by either
// clock skew or from this scrape skew).

var lastTime model.Time

for {
if ai >= len(a.Values) && bi >= len(b.Values) {
break
}
newValues := make([]model.SamplePair, 0, len(a.Values))

var item model.SamplePair

if ai < len(a.Values) { // If a exists
if bi < len(b.Values) {
// both items
if a.Values[ai].Timestamp < b.Values[bi].Timestamp {
item = a.Values[ai]
ai++
} else {
item = b.Values[bi]
bi++
}
bOffset := 0
aStartBuffered := a.Values[0].Timestamp - antiAffinityBuffer

// start by loading b points before a
if b.Values[0].Timestamp < aStartBuffered {
for i, bValue := range b.Values {
bOffset = i
if bValue.Timestamp < aStartBuffered {
newValues = append(newValues, bValue)
} else {
// Only A
item = a.Values[ai]
ai++
}
} else {
if bi < len(b.Values) {
// Only B
item = b.Values[bi]
bi++
break
}
}
// If we've already seen this timestamp, skip
if _, ok := seenTimes[item.Timestamp]; ok {

}

for _, aValue := range a.Values {
// if we have no points, this one by definition is valid
if len(newValues) == 0 {
newValues = append(newValues, aValue)
continue
}

if (lastTime > 0) && item.Timestamp-lastTime < antiAffinityBuffer {
continue
// if the gap between the last point we kept and this point is > antiAffinityBuffer*2,
// check if b has a point that would fit in there
lastTime := newValues[len(newValues)-1].Timestamp
if (aValue.Timestamp - lastTime) > antiAffinityBuffer*2 {
// We want to see if we have any datapoints in the window that aren't too close
for ; bOffset < len(b.Values); bOffset++ {
bValue := b.Values[bOffset]
if bValue.Timestamp >= aValue.Timestamp {
break
}
if bValue.Timestamp > lastTime+antiAffinityBuffer && bValue.Timestamp < (aValue.Timestamp-antiAffinityBuffer) {
newValues = append(newValues, bValue)
}
}
}
lastTime = item.Timestamp
newValues = append(newValues, aValue)
}

// Otherwise, lets add it
newValues = append(newValues, item)
seenTimes[item.Timestamp] = struct{}{}
lastTime := newValues[len(newValues)-1].Timestamp
for ; bOffset < len(b.Values); bOffset++ {
bValue := b.Values[bOffset]
if bValue.Timestamp > lastTime+antiAffinityBuffer {
newValues = append(newValues, bValue)
}
}

return &model.SampleStream{
Expand Down
172 changes: 167 additions & 5 deletions promhttputil/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,22 +279,149 @@ func TestMergeValues(t *testing.T) {
r: model.Matrix([]*model.SampleStream{
{
model.Metric(model.LabelSet{model.MetricNameLabel: model.LabelValue("hosta")}),
[]model.SamplePair{{
model.Time(100),
model.SampleValue(10),
[]model.SamplePair{
{
model.Time(100),
model.SampleValue(10),
},
{
model.Time(200),
model.SampleValue(10),
},
},
},
}),
},

// Fill missing
// Lots of holes, ensure they are merged correctly
{
name: "Matrix fill missing2",
a: model.Matrix([]*model.SampleStream{
{
model.Metric(model.LabelSet{model.MetricNameLabel: model.LabelValue("hosta")}),
[]model.SamplePair{
{
model.Time(200),
model.SampleValue(10),
}},
},
{
model.Time(400),
model.SampleValue(10),
},
},
},
}),
b: model.Matrix([]*model.SampleStream{
{
model.Metric(model.LabelSet{model.MetricNameLabel: model.LabelValue("hosta")}),
[]model.SamplePair{
{
model.Time(100),
model.SampleValue(10),
},
{
model.Time(300),
model.SampleValue(10),
},
{
model.Time(500),
model.SampleValue(10),
},
},
},
}),
r: model.Matrix([]*model.SampleStream{
{
model.Metric(model.LabelSet{model.MetricNameLabel: model.LabelValue("hosta")}),
[]model.SamplePair{
{
model.Time(100),
model.SampleValue(10),
},
{
model.Time(200),
model.SampleValue(10),
},
{
model.Time(300),
model.SampleValue(10),
},
{
model.Time(400),
model.SampleValue(10),
},
{
model.Time(500),
model.SampleValue(10),
},
},
},
}),
antiAffinity: model.Time(20),
},

// Fill missing
// In this case we have 2 series which have large gaps, but the anti-affinity
// defines that we should not merge them, make sure we don't
{
name: "Matrix fill missing3",
a: model.Matrix([]*model.SampleStream{
{
model.Metric(model.LabelSet{model.MetricNameLabel: model.LabelValue("hosta")}),
[]model.SamplePair{
{
model.Time(200),
model.SampleValue(10),
},
{
model.Time(400),
model.SampleValue(10),
},
},
},
}),
b: model.Matrix([]*model.SampleStream{
{
model.Metric(model.LabelSet{model.MetricNameLabel: model.LabelValue("hosta")}),
[]model.SamplePair{
{
model.Time(100),
model.SampleValue(10),
},
{
model.Time(300),
model.SampleValue(10),
},
{
model.Time(500),
model.SampleValue(10),
},
},
},
}),
r: model.Matrix([]*model.SampleStream{
{
model.Metric(model.LabelSet{model.MetricNameLabel: model.LabelValue("hosta")}),
[]model.SamplePair{
{
model.Time(200),
model.SampleValue(10),
},
{
model.Time(400),
model.SampleValue(10),
},
},
},
}),
antiAffinity: model.Time(100),
},

// Ensure that anti-affinity-buffer is working properly
// if we have 2 matrix values with similar times only one should be put in
{
name: "Matrix fill missing2",
name: "Matrix merge similar",
a: model.Matrix([]*model.SampleStream{
{
model.Metric(model.LabelSet{model.MetricNameLabel: model.LabelValue("hosta")}),
Expand Down Expand Up @@ -324,6 +451,41 @@ func TestMergeValues(t *testing.T) {
}),
antiAffinity: model.Time(2),
},

// Ensure that anti-affinity-buffer is working properly
// we want to prefer values from the "first" series (as that is the one
// we already have). This avoids unnecessary switches between series
{
name: "Matrix merge similar 2",
a: model.Matrix([]*model.SampleStream{
{
model.Metric(model.LabelSet{model.MetricNameLabel: model.LabelValue("hosta")}),
[]model.SamplePair{{
model.Time(101),
model.SampleValue(10),
}},
},
}),
b: model.Matrix([]*model.SampleStream{
{
model.Metric(model.LabelSet{model.MetricNameLabel: model.LabelValue("hosta")}),
[]model.SamplePair{{
model.Time(100),
model.SampleValue(10),
}},
},
}),
r: model.Matrix([]*model.SampleStream{
{
model.Metric(model.LabelSet{model.MetricNameLabel: model.LabelValue("hosta")}),
[]model.SamplePair{{
model.Time(101),
model.SampleValue(10),
}},
},
}),
antiAffinity: model.Time(2),
},
}

for _, test := range tests {
Expand Down

0 comments on commit caad8c4

Please sign in to comment.