Skip to content

Commit

Permalink
Fix staleness during target handover (#703)
Browse files Browse the repository at this point in the history
* Refactor out DistributedTargets and clean up.

* Add MovedAway method to DistributedTargets and rename existing methods.

* distributed_targets.go tests and benchmarks and refactor

* tests and refactor

* fix race condition in pyroscope test

* workaround for data race in prometheus

* changelog

* fix changelog

* feedback

* use a better named prom fork branch
  • Loading branch information
thampiotr authored May 13, 2024
1 parent 9f6bc3a commit 12fb273
Show file tree
Hide file tree
Showing 16 changed files with 1,201 additions and 102 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ internal API changes are not present.
Main (unreleased)
-----------------

### Bugfixes

- Fixed an issue with `prometheus.scrape` in which targets that move from one
cluster instance to another could have a staleness marker inserted and result
in a gap in metrics (@thampiotr)

v1.1.0-rc.0
-----------

Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -714,9 +714,10 @@ replace (
// * There is a release of Prometheus which contains
// prometheus/prometheus#13002
// and prometheus/prometheus#13497
// and https://github.com/grafana/prometheus/pull/34
// We use the last v1-related tag as the replace statement does not work for v2
// tags without the v2 suffix to the module root.
replace github.com/prometheus/prometheus => github.com/grafana/prometheus v1.8.2-0.20240130142130-51b39f24d406 // cmp_header_order branch
replace github.com/prometheus/prometheus => github.com/grafana/prometheus v1.8.2-0.20240513094155-793c8c9fe88e // cmp_header_order_and_staleness_disabling branch

replace gopkg.in/yaml.v2 => github.com/rfratto/go-yaml v0.0.0-20211119180816-77389c3526dc

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1068,8 +1068,8 @@ github.com/grafana/opentelemetry-collector/service v0.0.0-20240429170914-d1e1018
github.com/grafana/opentelemetry-collector/service v0.0.0-20240429170914-d1e101852ba5/go.mod h1:0djU5YbUIZw4Y+KNYk07tZztXrMK/LNAZyfH8b7f7xA=
github.com/grafana/postgres_exporter v0.15.1-0.20240417113938-9358270470dd h1:vNHdecaOmYgSHMEQRgyzWacV++N38Jp8qLZg0RCsfFo=
github.com/grafana/postgres_exporter v0.15.1-0.20240417113938-9358270470dd/go.mod h1:kR16GJ0ZwWVQ2osW3pgtDJU1a/GXpufrwio0kLG14cg=
github.com/grafana/prometheus v1.8.2-0.20240130142130-51b39f24d406 h1:LVIOYe5j92m10wluP5hgeHqSkOLnZzcPxhYCkdbLXCE=
github.com/grafana/prometheus v1.8.2-0.20240130142130-51b39f24d406/go.mod h1:SRw624aMAxTfryAcP8rOjg4S/sHHaetx2lyJJ2nM83g=
github.com/grafana/prometheus v1.8.2-0.20240513094155-793c8c9fe88e h1:uQDMlJKE+h6TloPTTiSyA8FSMJeU8mQfg1MY1/UrCKA=
github.com/grafana/prometheus v1.8.2-0.20240513094155-793c8c9fe88e/go.mod h1:SRw624aMAxTfryAcP8rOjg4S/sHHaetx2lyJJ2nM83g=
github.com/grafana/pyroscope-go/godeltaprof v0.1.7 h1:C11j63y7gymiW8VugJ9ZW0pWfxTZugdSJyC48olk5KY=
github.com/grafana/pyroscope-go/godeltaprof v0.1.7/go.mod h1:Tk376Nbldo4Cha9RgiU7ik8WKFkNpfds98aUzS8omLE=
github.com/grafana/pyroscope/api v0.4.0 h1:J86DxoNeLOvtJhB1Cn65JMZkXe682D+RqeoIUiYc/eo=
Expand Down
49 changes: 0 additions & 49 deletions internal/component/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"time"

"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/service/cluster"
"github.com/grafana/ckit/shard"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/targetgroup"
Expand All @@ -20,53 +18,6 @@ import (
// component.
type Target map[string]string

// DistributedTargets uses the node's Lookup method to distribute discovery
// targets when a component runs in a cluster.
type DistributedTargets struct {
// useClustering reports whether targets should be sharded across the
// cluster; when false, Get returns every target.
useClustering bool
// cluster is consulted for the peer list and for per-target ownership
// lookups. It may be nil, in which case Get returns every target.
cluster cluster.Cluster
// targets is the full, unsharded list of discovery targets.
targets []Target
}

// NewDistributedTargets builds a DistributedTargets value that lets a
// component shard its targets across the cluster.
//
// e enables clustering, n is the cluster used for ownership lookups, and t is
// the complete list of targets to distribute.
func NewDistributedTargets(e bool, n cluster.Cluster, t []Target) DistributedTargets {
	return DistributedTargets{
		useClustering: e,
		cluster:       n,
		targets:       t,
	}
}

// Get distributes discovery targets in a clustered environment and returns
// the ones owned by the local node.
//
// When clustering is disabled or no cluster is configured, all targets are
// returned unchanged. Otherwise every target is looked up in the cluster by
// its non-meta labels and kept when the first returned peer is the local
// node, when the lookup returns no peers, or when the lookup fails.
//
// With a single-node cluster every lookup is expected to resolve to the local
// node, so all targets are returned.
func (t *DistributedTargets) Get() []Target {
	// TODO(@tpaschalis): Make this into a single code-path to simplify logic.
	if !t.useClustering || t.cluster == nil {
		return t.targets
	}

	// Size the result for an even split of targets between peers; this is
	// only a capacity hint, append grows the slice if the split is uneven.
	peerCount := len(t.cluster.Peers())
	resCap := len(t.targets) + 1
	if peerCount != 0 {
		resCap = (len(t.targets) + 1) / peerCount
	}

	res := make([]Target, 0, resCap)

	for _, tgt := range t.targets {
		peers, err := t.cluster.Lookup(shard.StringKey(tgt.NonMetaLabels().String()), 1, shard.OpReadWrite)
		// A lookup can only fail in case we ask for more owners than the
		// available peers. This should never happen, but in any case we fall
		// back to owning the target ourselves.
		//
		// All ownership conditions are evaluated in a single check so that a
		// target is appended at most once: handling the error separately and
		// then falling through to the empty-peers check would add the same
		// target twice.
		if err != nil || len(peers) == 0 || peers[0].Self {
			res = append(res, tgt)
		}
	}

	return res
}

// Labels converts Target into a set of sorted labels.
func (t Target) Labels() labels.Labels {
var lset labels.Labels
Expand Down
92 changes: 92 additions & 0 deletions internal/component/discovery/distributed_targets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package discovery

import (
"github.com/grafana/ckit/peer"
"github.com/grafana/ckit/shard"

"github.com/grafana/alloy/internal/service/cluster"
)

// DistributedTargets uses the node's Lookup method to distribute discovery
// targets when a component runs in a cluster.
type DistributedTargets struct {
// localTargets holds the targets assigned to the local node.
localTargets []Target
// localTargetKeys is used to cache the key hash computation. Improves time performance by ~20%.
// Entry i is the shard key of localTargets[i].
localTargetKeys []shard.Key
// remoteTargetKeys is the set of shard keys of targets assigned to other
// nodes; MovedToRemoteInstance consults it to detect targets that moved away.
remoteTargetKeys map[shard.Key]struct{}
}

// NewDistributedTargets splits allTargets into the targets owned by the local
// node and the keys of the targets owned by other nodes, so that components
// can dynamically shard targets between instances.
//
// When clusteringEnabled is false or cluster is nil, a no-op cluster is used
// and every target is treated as local. A target is also treated as local
// when its lookup fails or returns no peers.
func NewDistributedTargets(clusteringEnabled bool, cluster cluster.Cluster, allTargets []Target) *DistributedTargets {
	// Substituting a no-op cluster keeps the sharding loop free of special
	// cases for the non-clustered mode.
	if cluster == nil || !clusteringEnabled {
		cluster = disabledCluster{}
	}

	// Capacity hints assume an even split of targets between peers.
	total := len(allTargets)
	expectedLocal := total + 1
	peerCount := len(cluster.Peers())
	if peerCount != 0 {
		expectedLocal = (total + 1) / peerCount
	}

	dt := &DistributedTargets{
		localTargets:     make([]Target, 0, expectedLocal),
		localTargetKeys:  make([]shard.Key, 0, expectedLocal),
		remoteTargetKeys: make(map[shard.Key]struct{}, total-expectedLocal),
	}

	for _, target := range allTargets {
		key := keyFor(target)
		owners, err := cluster.Lookup(key, 1, shard.OpReadWrite)

		// Only a successful lookup that names another node as the first
		// owner makes the target remote; everything else stays local.
		if err == nil && len(owners) > 0 && !owners[0].Self {
			dt.remoteTargetKeys[key] = struct{}{}
			continue
		}

		// Targets and their keys are appended in lockstep so that index i
		// refers to the same target in both slices.
		dt.localTargets = append(dt.localTargets, target)
		dt.localTargetKeys = append(dt.localTargetKeys, key)
	}

	return dt
}

// LocalTargets returns the targets that were assigned to the local cluster
// node when dt was built. The returned slice is the one held by dt, not a
// copy.
func (dt *DistributedTargets) LocalTargets() []Target {
	return dt.localTargets
}

// MovedToRemoteInstance reports which targets were local in prev but are
// owned by a remote node in dt, i.e. active targets that have been handed
// over to another instance.
//
// A target is returned only if it is present in both prev and dt. A nil prev,
// or a prev without local targets, yields no targets.
func (dt *DistributedTargets) MovedToRemoteInstance(prev *DistributedTargets) []Target {
	if prev == nil {
		return nil
	}

	var moved []Target
	for idx, tgt := range prev.localTargets {
		// Reuse the key cached when prev was built instead of rehashing.
		if _, nowRemote := dt.remoteTargetKeys[prev.localTargetKeys[idx]]; !nowRemote {
			continue
		}
		moved = append(moved, tgt)
	}
	return moved
}

// keyFor derives the shard key of a target from the hash of its non-meta
// labels.
func keyFor(tgt Target) shard.Key {
	labelsHash := tgt.NonMetaLabels().Hash()
	return shard.Key(labelsHash)
}

// disabledCluster is a stand-in cluster.Cluster used by NewDistributedTargets
// when clustering is disabled or no cluster is provided.
type disabledCluster struct{}

// Compile-time check that disabledCluster implements cluster.Cluster.
var _ cluster.Cluster = disabledCluster{}

// Lookup ignores its arguments and always reports no peers and no error.
func (disabledCluster) Lookup(key shard.Key, replicationFactor int, op shard.Op) ([]peer.Peer, error) {
	var noPeers []peer.Peer
	return noPeers, nil
}

// Peers always reports an empty (nil) peer list.
func (disabledCluster) Peers() []peer.Peer {
	var noPeers []peer.Peer
	return noPeers
}
Loading

0 comments on commit 12fb273

Please sign in to comment.