Skip to content

Commit

Permalink
Make new cluster peer discovery default
Browse files Browse the repository at this point in the history
  • Loading branch information
thampiotr committed Oct 3, 2024
1 parent 9229619 commit af36368
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 267 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ Main (unreleased)
- The `mimir.rules.kubernetes` component now supports adding extra label matchers
to all queries discovered via `PrometheusRule` CRDs. (@thampiotr)

- The `cluster.use-discovery-v1` flag is now deprecated since there were no issues found with the v2 cluster discovery mechanism. (@thampiotr)

### Bugfixes

- Update windows_exporter from v0.27.2 vo v0.27.3: (@jkroepke)
Expand Down
4 changes: 0 additions & 4 deletions docs/sources/reference/cli/run.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ The following flags are supported:
* `--cluster.advertise-interfaces`: List of interfaces used to infer an address to advertise. Set to `all` to use all available network interfaces on the system. (default `"eth0,en0"`).
* `--cluster.max-join-peers`: Number of peers to join from the discovered set (default `5`).
* `--cluster.name`: Name to prevent nodes without this identifier from joining the cluster (default `""`).
* `--cluster.use-discovery-v1`: Use the older, v1 version of cluster peer discovery mechanism (default `false`). Note that this flag will be deprecated in the future and eventually removed.
* `--cluster.enable-tls`: Specifies whether TLS should be used for communication between peers (default `false`).
* `--cluster.tls-ca-path`: Path to the CA certificate file used for peer communication over TLS.
* `--cluster.tls-cert-path`: Path to the certificate file used for peer communication over TLS.
Expand Down Expand Up @@ -146,9 +145,6 @@ When `--cluster.name` is provided, nodes will only join peers who share the same
By default, the cluster name is empty, and any node that doesn't set the flag can join.
Attempting to join a cluster with a wrong `--cluster.name` will result in a "failed to join memberlist" error.

The `--cluster.use-discovery-v1` flag can be used to switch back to the older, v1 version of the cluster peer discovery mechanism
in case of any issues with the newer version. This flag will be deprecated in the future and eventually removed.

### Clustering states

Clustered {{< param "PRODUCT_NAME" >}}s are in one of three states:
Expand Down
197 changes: 34 additions & 163 deletions internal/alloycli/cluster_builder.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
package alloycli

import (
"errors"
"fmt"
stdlog "log"
"net"
"os"
"strconv"
"time"

"github.com/go-kit/log"
"github.com/grafana/ckit/advertise"
"github.com/hashicorp/go-discover"
"github.com/hashicorp/go-discover/provider/k8s"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"

Expand All @@ -26,23 +22,21 @@ type clusterOptions struct {
Metrics prometheus.Registerer
Tracer trace.TracerProvider

EnableClustering bool
NodeName string
AdvertiseAddress string
ListenAddress string
JoinPeers []string
DiscoverPeers string
RejoinInterval time.Duration
AdvertiseInterfaces []string
ClusterMaxJoinPeers int
ClusterName string
EnableStateUpdatesLimiter bool
EnableDiscoveryV2 bool
EnableTLS bool
TLSCAPath string
TLSCertPath string
TLSKeyPath string
TLSServerName string
EnableClustering bool
NodeName string
AdvertiseAddress string
ListenAddress string
JoinPeers []string
DiscoverPeers string
RejoinInterval time.Duration
AdvertiseInterfaces []string
ClusterMaxJoinPeers int
ClusterName string
EnableTLS bool
TLSCAPath string
TLSCertPath string
TLSKeyPath string
TLSServerName string
}

func buildClusterService(opts clusterOptions) (*cluster.Service, error) {
Expand All @@ -53,17 +47,16 @@ func buildClusterService(opts clusterOptions) (*cluster.Service, error) {
Metrics: opts.Metrics,
Tracer: opts.Tracer,

EnableClustering: opts.EnableClustering,
NodeName: opts.NodeName,
RejoinInterval: opts.RejoinInterval,
ClusterMaxJoinPeers: opts.ClusterMaxJoinPeers,
ClusterName: opts.ClusterName,
EnableStateUpdatesLimiter: opts.EnableStateUpdatesLimiter,
EnableTLS: opts.EnableTLS,
TLSCAPath: opts.TLSCAPath,
TLSCertPath: opts.TLSCertPath,
TLSKeyPath: opts.TLSKeyPath,
TLSServerName: opts.TLSServerName,
EnableClustering: opts.EnableClustering,
NodeName: opts.NodeName,
RejoinInterval: opts.RejoinInterval,
ClusterMaxJoinPeers: opts.ClusterMaxJoinPeers,
ClusterName: opts.ClusterName,
EnableTLS: opts.EnableTLS,
TLSCAPath: opts.TLSCAPath,
TLSCertPath: opts.TLSCertPath,
TLSKeyPath: opts.TLSKeyPath,
TLSServerName: opts.TLSServerName,
}

if config.NodeName == "" {
Expand All @@ -80,41 +73,16 @@ func buildClusterService(opts clusterOptions) (*cluster.Service, error) {
return nil, err
}

// New, refactored and improved peer discovery.
// TODO(alloy/#1274): Remove the old peer discovery code once this becomes default.
if opts.EnableDiscoveryV2 {
config.DiscoverPeers, err = discovery.NewPeerDiscoveryFn(discovery.Options{
JoinPeers: opts.JoinPeers,
DiscoverPeers: opts.DiscoverPeers,
DefaultPort: listenPort,
Logger: opts.Log,
Tracer: opts.Tracer,
})
if err != nil {
return nil, err
}
} else {
switch {
case len(opts.JoinPeers) > 0 && opts.DiscoverPeers != "":
return nil, fmt.Errorf("at most one of join peers and discover peers may be set")

case len(opts.JoinPeers) > 0:
config.DiscoverPeers = newStaticDiscovery(opts.JoinPeers, listenPort, opts.Log)

case opts.DiscoverPeers != "":
discoverFunc, err := newDynamicDiscovery(config.Log, opts.DiscoverPeers, listenPort)
if err != nil {
return nil, err
}
config.DiscoverPeers = discoverFunc

default:
// Here, both JoinPeers and DiscoverPeers are empty. This is desirable when
// starting a seed node that other nodes connect to, so we don't require
// one of the fields to be set.
}
config.DiscoverPeers, err = discovery.NewPeerDiscoveryFn(discovery.Options{
JoinPeers: opts.JoinPeers,
DiscoverPeers: opts.DiscoverPeers,
DefaultPort: listenPort,
Logger: opts.Log,
Tracer: opts.Tracer,
})
if err != nil {
return nil, err
}

return cluster.New(config)
}

Expand Down Expand Up @@ -165,100 +133,3 @@ func appendDefaultPort(addr string, port int) string {
}
return net.JoinHostPort(addr, strconv.Itoa(port))
}

type discoverFunc func() ([]string, error)

func newStaticDiscovery(providedAddr []string, defaultPort int, log log.Logger) discovery.DiscoverFn {
return func() ([]string, error) {
addresses, err := buildJoinAddresses(providedAddr, log)
if err != nil {
return nil, fmt.Errorf("static peer discovery: %w", err)
}
for i := range addresses {
// Default to using the same advertise port as the local node. This may
// break in some cases, so the user should make sure the port numbers
// align on as many nodes as possible.
addresses[i] = appendDefaultPort(addresses[i], defaultPort)
}
return addresses, nil
}
}

func buildJoinAddresses(providedAddr []string, log log.Logger) ([]string, error) {
// Currently we don't consider it an error to not have any join addresses.
if len(providedAddr) == 0 {
return nil, nil
}
var (
result []string
deferredErr error
)
for _, addr := range providedAddr {
// If it's a host:port, use it as is.
_, _, err := net.SplitHostPort(addr)
if err != nil {
wrappedErr := fmt.Errorf("failed to extract host and port: %w", err)
deferredErr = errors.Join(deferredErr, wrappedErr)
} else {
level.Debug(log).Log("msg", "found a host:port cluster join address", "addr", addr)
result = append(result, addr)
break
}

// If it's an IP address, use it.
ip := net.ParseIP(addr)
if ip != nil {
level.Debug(log).Log("msg", "found an IP cluster join address", "addr", addr)
result = append(result, ip.String())
break
}

// Otherwise, do a DNS lookup and return all the records found.
_, srvs, err := net.LookupSRV("", "", addr)
if err != nil {
level.Warn(log).Log("msg", "failed to resolve SRV records", "addr", addr, "err", err)
wrappedErr := fmt.Errorf("failed to resolve SRV records: %w", err)
deferredErr = errors.Join(deferredErr, wrappedErr)
} else {
level.Debug(log).Log("msg", "found cluster join addresses via SRV records", "addr", addr, "count", len(srvs))
for _, srv := range srvs {
result = append(result, srv.Target)
}
}
}
if len(result) == 0 {
return nil, fmt.Errorf("failed to find any valid join addresses: %w", deferredErr)
}
return result, nil
}

func newDynamicDiscovery(l log.Logger, config string, defaultPort int) (discovery.DiscoverFn, error) {
providers := make(map[string]discover.Provider, len(discover.Providers)+1)
for k, v := range discover.Providers {
providers[k] = v
}

// Custom providers that aren't enabled by default
providers["k8s"] = &k8s.Provider{}

discoverer, err := discover.New(discover.WithProviders(providers))
if err != nil {
return nil, fmt.Errorf("bootstrapping peer discovery: %w", err)
}

return func() ([]string, error) {
addrs, err := discoverer.Addrs(config, stdlog.New(log.NewStdlibAdapter(l), "", 0))
if err != nil {
return nil, fmt.Errorf("discovering peers: %w", err)
}

for i := range addrs {
// Default to using the same advertise port as the local node. This may
// break in some cases, so the user should make sure the port numbers
// align on as many nodes as possible.
addrs[i] = appendDefaultPort(addrs[i], defaultPort)
}

return addrs, nil
}, nil
}
60 changes: 8 additions & 52 deletions internal/alloycli/cluster_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,24 @@ import (

"github.com/go-kit/log"
"github.com/stretchr/testify/require"

"github.com/grafana/alloy/internal/runtime/tracing"
)

func TestBuildClusterService(t *testing.T) {
tracer, err := tracing.New(tracing.DefaultOptions)
require.NoError(t, err)

opts := clusterOptions{
JoinPeers: []string{"foo", "bar"},
DiscoverPeers: "provider=aws key1=val1 key2=val2",
Log: log.NewLogfmtLogger(os.Stderr),
Tracer: tracer,
}

cs, err := buildClusterService(opts)
require.Nil(t, cs)
require.EqualError(t, err, "at most one of join peers and discover peers may be set")
require.ErrorContains(t, err, "at most one of join peers and discover peers may be set")
}

func TestGetAdvertiseAddress(t *testing.T) {
Expand Down Expand Up @@ -55,54 +62,3 @@ func TestGetAdvertiseAddress(t *testing.T) {
require.Equal(t, "127.0.0.1:80", addr)
})
}

func TestStaticDiscovery(t *testing.T) {
t.Run("no addresses provided", func(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stdout)
sd := newStaticDiscovery([]string{}, 12345, logger)
actual, err := sd()
require.NoError(t, err)
require.Nil(t, actual)
})
t.Run("host and port provided", func(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stdout)
sd := newStaticDiscovery([]string{"host:8080"}, 12345, logger)
actual, err := sd()
require.NoError(t, err)
require.Equal(t, []string{"host:8080"}, actual)
})
t.Run("IP provided and default port added", func(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stdout)
sd := newStaticDiscovery([]string{"192.168.0.1"}, 12345, logger)
actual, err := sd()
require.NoError(t, err)
require.Equal(t, []string{"192.168.0.1:12345"}, actual)
})
t.Run("fallback to next host and port provided", func(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stdout)
sd := newStaticDiscovery([]string{"this | wont | work", "host2:8080"}, 12345, logger)
actual, err := sd()
require.NoError(t, err)
require.Equal(t, []string{"host2:8080"}, actual)
})
t.Run("fallback to next host and port provided", func(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stdout)
sd := newStaticDiscovery([]string{"this | wont | work", "host2:8080"}, 12345, logger)
actual, err := sd()
require.NoError(t, err)
require.Equal(t, []string{"host2:8080"}, actual)
})
t.Run("nothing found", func(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stdout)
sd := newStaticDiscovery([]string{"this | wont | work"}, 12345, logger)
actual, err := sd()
require.Nil(t, actual)
require.ErrorContains(t, err, "failed to find any valid join addresses")
// We can only check the error messages in Alloy's code.
// We cannot check for error messages from other Go libraries,
// because they may differ based on operating system and
// on env var settings such as GODEBUG=netdns.
require.ErrorContains(t, err, "failed to extract host and port")
require.ErrorContains(t, err, "failed to resolve SRV records")
})
}
Loading

0 comments on commit af36368

Please sign in to comment.