Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NET-6966] consul-template support for listing peerings #1869

Merged
merged 5 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# Unreleased Changes

NEW FEATURES:
* Add support for listing Consul peers [NET-6966](https://hashicorp.atlassian.net/browse/NET-6966)

## v0.36.0 (January 3, 2024)

IMPROVEMENTS:
Expand Down
197 changes: 197 additions & 0 deletions dependency/consul_peering.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package dependency

import (
"context"
"encoding/gob"
"fmt"
"github.com/hashicorp/consul/api"
"github.com/pkg/errors"
"log"
"net/url"
"regexp"
"sort"
"time"
)

var (
// Ensure implements
_ Dependency = (*ListPeeringQuery)(nil)

// ListPeeringQueryRe is the regular expression to use.
ListPeeringQueryRe = regexp.MustCompile(`\A` + queryRe + `\z`)
)

func init() {
gob.Register([]*Peering{})
gob.Register([]*PeeringStreamStatus{})
gob.Register([]*PeeringRemoteInfo{})
}

// ListPeeringQuery fetches all peering for a Consul cluster.
// https://developer.hashicorp.com/consul/api-docs/peering#list-all-peerings
type ListPeeringQuery struct {
stopCh chan struct{}

partition string
}

// Peering represent the response of the Consul peering API.
type Peering struct {
ID string
Name string
Partition string
Meta map[string]string
PeeringState string
PeerID string
PeerServerName string
PeerServerAddresses []string
StreamStatus PeeringStreamStatus
Remote PeeringRemoteInfo
}

type PeeringStreamStatus struct {
ImportedServices []string
ExportedServices []string
LastHeartbeat *time.Time
LastReceive *time.Time
LastSend *time.Time
}

type PeeringRemoteInfo struct {
Partition string
Datacenter string
}

func NewListPeeringQuery(s string) (*ListPeeringQuery, error) {
if s != "" && !ListPeeringQueryRe.MatchString(s) {
return nil, fmt.Errorf("list.peering: invalid format: %q", s)
}

m := regexpMatch(ListPeeringQueryRe, s)

queryParams, err := GetConsulQueryOpts(m, "list.peering")
if err != nil {
return nil, err
}

return &ListPeeringQuery{
stopCh: make(chan struct{}, 1),
partition: queryParams.Get(QueryPartition),
}, nil
}

func (l *ListPeeringQuery) Fetch(clients *ClientSet, opts *QueryOptions) (interface{}, *ResponseMetadata, error) {
select {
case <-l.stopCh:
return nil, nil, ErrStopped
default:
}

opts = opts.Merge(&QueryOptions{
ConsulPartition: l.partition,
})

log.Printf("[TRACE] %s: GET %s", l, &url.URL{
Path: "/v1/peerings",
RawQuery: opts.String(),
})

// list peering is a blocking API, so making sure the ctx passed while calling it
// times out after the default wait time.
ctx, cancel := context.WithTimeout(context.Background(), DefaultContextTimeout)
defer cancel()

p, meta, err := clients.Consul().Peerings().List(ctx, opts.ToConsulOpts())
if err != nil {
return nil, nil, errors.Wrap(err, l.String())
}

log.Printf("[TRACE] %s: returned %d results", l, len(p))

peers := make([]*Peering, 0, len(p))
for _, peering := range p {
peers = append(peers, toPeering(peering))
}

// sort so that the result is deterministic
sort.Stable(ByPeer(peers))

rm := &ResponseMetadata{
LastIndex: meta.LastIndex,
LastContact: meta.LastContact,
}

return peers, rm, nil
}

func toPeering(p *api.Peering) *Peering {
return &Peering{
ID: p.ID,
Name: p.Name,
Partition: p.Partition,
Meta: p.Meta,
PeeringState: string(p.State),
PeerID: p.PeerID,
PeerServerName: p.PeerServerName,
PeerServerAddresses: p.PeerServerAddresses,
StreamStatus: PeeringStreamStatus{
ImportedServices: p.StreamStatus.ImportedServices,
ExportedServices: p.StreamStatus.ExportedServices,
LastHeartbeat: p.StreamStatus.LastHeartbeat,
LastReceive: p.StreamStatus.LastReceive,
LastSend: p.StreamStatus.LastSend,
},
Remote: PeeringRemoteInfo{
Partition: p.Remote.Partition,
Datacenter: p.Remote.Datacenter,
},
}
}

func (l *ListPeeringQuery) String() string {
partitionStr := l.partition

if len(partitionStr) > 0 {
partitionStr = fmt.Sprintf("?partition=%s", partitionStr)
} else {
return "list.peerings"
}

return fmt.Sprintf("list.peerings%s", partitionStr)
}

func (l *ListPeeringQuery) Stop() {
close(l.stopCh)
}

func (l *ListPeeringQuery) Type() Type {
return TypeConsul
}

func (l *ListPeeringQuery) CanShare() bool {
return false
}

// ByPeer is a sortable list of peerings in this order:
// 1. State
// 2. Partition
// 3. Name
type ByPeer []*Peering

func (p ByPeer) Len() int { return len(p) }
func (p ByPeer) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

// Less if peer names are cluster-2, cluster-12, cluster-1
// our sorting will be cluster-1, cluster-12, cluster-2
func (p ByPeer) Less(i, j int) bool {
if p[i].PeeringState == p[j].PeeringState {
if p[i].Partition == p[j].Partition {
return p[i].Name < p[j].Name
}
return p[i].Partition < p[j].Partition
}
return p[i].PeeringState < p[j].PeeringState
}
134 changes: 134 additions & 0 deletions dependency/consul_peering_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package dependency

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
)

func TestListPeeringsQuery(t *testing.T) {
cases := []struct {
name string
i string
exp *ListPeeringQuery
err bool
}{
{
"empty",
"",
&ListPeeringQuery{},
false,
},
{
"invalid query param (unsupported key)",
"?unsupported=foo",
nil,
true,
},
{
"peerings",
"peerings",
nil,
true,
},
{
"partition",
"?partition=foo",
&ListPeeringQuery{
partition: "foo",
},
false,
},
}

for i, tc := range cases {
t.Run(fmt.Sprintf("%d_%s", i, tc.name), func(t *testing.T) {
act, err := NewListPeeringQuery(tc.i)
if (err != nil) != tc.err {
t.Fatal(err)
}

if act != nil {
act.stopCh = nil
}

assert.Equal(t, tc.exp, act)
})
}
}

func TestListPeeringsQuery_Fetch(t *testing.T) {
cases := []struct {
name string
i string
exp []string
}{
{
"all",
"",
// the peering generated has random IDs,
// we can't assert on the full response,
// we can assert on the peering names though.
[]string{
"bar",
"foo",
},
},
}

for i, tc := range cases {
t.Run(fmt.Sprintf("%d_%s", i, tc.name), func(t *testing.T) {
p, err := NewListPeeringQuery(tc.i)
if err != nil {
t.Fatal(err)
}

res, _, err := p.Fetch(testClients, nil)
if err != nil {
t.Fatal(err)
}

if res == nil {
t.Fatalf("expected non-nil result")
}

peerNames := make([]string, 0)
for _, peering := range res.([]*Peering) {
peerNames = append(peerNames, peering.Name)
}

assert.Equal(t, tc.exp, peerNames)
})
}
}

func TestListPeeringsQuery_String(t *testing.T) {
cases := []struct {
name string
i string
exp string
}{
{
"empty",
"",
"list.peerings",
},
{
"partition",
"?partition=foo",
"list.peerings?partition=foo",
},
}

for i, tc := range cases {
t.Run(fmt.Sprintf("%d_%s", i, tc.name), func(t *testing.T) {
d, err := NewListPeeringQuery(tc.i)
if err != nil {
t.Fatal(err)
}
str := d.String()
assert.Equal(t, tc.exp, str)
})
}
}
5 changes: 5 additions & 0 deletions dependency/dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ const (
TypeNomad
)

const (
// DefaultContextTimeout context wait timeout for blocking queries.
DefaultContextTimeout = 60 * time.Second
)

// Dependency is an interface for a dependency that Consul Template is capable
// of watching.
type Dependency interface {
Expand Down
22 changes: 22 additions & 0 deletions dependency/dependency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package dependency

import (
"context"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -124,6 +125,11 @@ func TestMain(m *testing.M) {
Fatalf("%v", err)
}

err := createConsulPeerings(clients)
if err != nil {
Fatalf("%v", err)
}

// Wait for Nomad initialization to finish
if err := <-nomadFuture; err != nil {
testConsul.Stop()
Expand Down Expand Up @@ -158,6 +164,22 @@ func TestMain(m *testing.M) {
os.Exit(exit)
}

func createConsulPeerings(clients *ClientSet) error {
generateReq := api.PeeringGenerateTokenRequest{PeerName: "foo"}
_, _, err := clients.consul.client.Peerings().GenerateToken(context.Background(), generateReq, &api.WriteOptions{})
if err != nil {
return err
}

generateReq = api.PeeringGenerateTokenRequest{PeerName: "bar"}
_, _, err = clients.consul.client.Peerings().GenerateToken(context.Background(), generateReq, &api.WriteOptions{})
if err != nil {
return err
}

return nil
}

func runTestConsul(tb testutil.TestingTB) {
consul, err := testutil.NewTestServerConfigT(tb,
func(c *testutil.TestServerConfig) {
Expand Down
Loading