Skip to content

Commit

Permalink
[NET-6966] consul-template support for listing peerings
Browse files Browse the repository at this point in the history
- new template function to access consul peerings in consul template.
- test cases for the same.
  • Loading branch information
kkavish committed Jan 19, 2024
1 parent 8fdab02 commit 1066c87
Show file tree
Hide file tree
Showing 7 changed files with 403 additions and 0 deletions.
197 changes: 197 additions & 0 deletions dependency/consul_peering.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package dependency

import (
"context"
"encoding/gob"
"fmt"
"github.com/hashicorp/consul/api"
"github.com/pkg/errors"
"log"
"net/url"
"regexp"
"sort"
"time"
)

var (
// Ensure implements
_ Dependency = (*ListPeeringQuery)(nil)

// ListPeeringQueryRe is the regular expression to use.
ListPeeringQueryRe = regexp.MustCompile(`\A` + queryRe + `\z`)
)

func init() {
gob.Register([]*Peering{})
gob.Register([]*PeeringStreamStatus{})
gob.Register([]*PeeringRemoteInfo{})
}

// ListPeeringQuery fetches all peering for a Consul cluster.
// https://developer.hashicorp.com/consul/api-docs/peering#list-all-peerings
type ListPeeringQuery struct {
stopCh chan struct{}

partition string
}

// Peering represent the response of the Consul peering API.
type Peering struct {
ID string
Name string
Partition string
Meta map[string]string
PeeringState string
PeerID string
PeerServerName string
PeerServerAddresses []string
StreamStatus PeeringStreamStatus
Remote PeeringRemoteInfo
}

type PeeringStreamStatus struct {
ImportedServices []string
ExportedServices []string
LastHeartbeat *time.Time
LastReceive *time.Time
LastSend *time.Time
}

type PeeringRemoteInfo struct {
Partition string
Datacenter string
}

func NewListPeeringQuery(s string) (*ListPeeringQuery, error) {
if s != "" && !ListPeeringQueryRe.MatchString(s) {
return nil, fmt.Errorf("list.peering: invalid format: %q", s)
}

m := regexpMatch(ListPeeringQueryRe, s)

queryParams, err := GetConsulQueryOpts(m, "list.peering")
if err != nil {
return nil, err
}

return &ListPeeringQuery{
stopCh: make(chan struct{}, 1),
partition: queryParams.Get(QueryPartition),
}, nil
}

func (l *ListPeeringQuery) Fetch(clients *ClientSet, opts *QueryOptions) (interface{}, *ResponseMetadata, error) {
select {
case <-l.stopCh:
return nil, nil, ErrStopped
default:
}

opts = opts.Merge(&QueryOptions{
ConsulPartition: l.partition,
})

log.Printf("[TRACE] %s: GET %s", l, &url.URL{
Path: "/v1/peerings",
RawQuery: opts.String(),
})

// list peering is a blocking API, so making sure the ctx passed while calling it
// times out after the default wait time.
ctx, cancel := context.WithTimeout(context.Background(), DefaultContextTimeout)
defer cancel()

p, meta, err := clients.Consul().Peerings().List(ctx, opts.ToConsulOpts())
if err != nil {
return nil, nil, errors.Wrap(err, l.String())
}

log.Printf("[TRACE] %s: returned %d results", l, len(p))

peers := make([]*Peering, 0, len(p))
for _, peering := range p {
peers = append(peers, toPeering(peering))
}

// sort so that the result is deterministic
sort.Stable(ByPeer(peers))

rm := &ResponseMetadata{
LastIndex: meta.LastIndex,
LastContact: meta.LastContact,
}

return peers, rm, nil
}

func toPeering(p *api.Peering) *Peering {
return &Peering{
ID: p.ID,
Name: p.Name,
Partition: p.Partition,
Meta: p.Meta,
PeeringState: fmt.Sprintf("%s", p.State),
PeerID: p.PeerID,
PeerServerName: p.PeerServerName,
PeerServerAddresses: p.PeerServerAddresses,
StreamStatus: PeeringStreamStatus{
ImportedServices: p.StreamStatus.ImportedServices,
ExportedServices: p.StreamStatus.ExportedServices,
LastHeartbeat: p.StreamStatus.LastHeartbeat,
LastReceive: p.StreamStatus.LastReceive,
LastSend: p.StreamStatus.LastSend,
},
Remote: PeeringRemoteInfo{
Partition: p.Remote.Partition,
Datacenter: p.Remote.Datacenter,
},
}
}

func (l *ListPeeringQuery) String() string {
partitionStr := l.partition

if len(partitionStr) > 0 {
partitionStr = fmt.Sprintf("?partition=%s", partitionStr)
} else {
return "list.peerings"
}

return fmt.Sprintf("list.peerings%s", partitionStr)
}

func (l *ListPeeringQuery) Stop() {
close(l.stopCh)
}

func (l *ListPeeringQuery) Type() Type {
return TypeConsul
}

func (l *ListPeeringQuery) CanShare() bool {
return false
}

// ByPeer is a sortable list of peerings in this order:
// 1. State
// 2. Partition
// 3. Name
type ByPeer []*Peering

func (p ByPeer) Len() int { return len(p) }
func (p ByPeer) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

// Less if peer names are cluster-2, cluster-12, cluster-1
// our sorting will be cluster-1, cluster-12, cluster-2
func (p ByPeer) Less(i, j int) bool {
if p[i].PeeringState == p[j].PeeringState {
if p[i].Partition == p[j].Partition {
return p[i].Name < p[j].Name
}
return p[i].Partition < p[j].Partition
}
return p[i].PeeringState < p[j].PeeringState
}
134 changes: 134 additions & 0 deletions dependency/consul_peering_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package dependency

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
)

func TestListPeeringsQuery(t *testing.T) {
cases := []struct {
name string
i string
exp *ListPeeringQuery
err bool
}{
{
"empty",
"",
&ListPeeringQuery{},
false,
},
{
"invalid query param (unsupported key)",
"?unsupported=foo",
nil,
true,
},
{
"peerings",
"peerings",
nil,
true,
},
{
"partition",
"?partition=foo",
&ListPeeringQuery{
partition: "foo",
},
false,
},
}

for i, tc := range cases {
t.Run(fmt.Sprintf("%d_%s", i, tc.name), func(t *testing.T) {
act, err := NewListPeeringQuery(tc.i)
if (err != nil) != tc.err {
t.Fatal(err)
}

if act != nil {
act.stopCh = nil
}

assert.Equal(t, tc.exp, act)
})
}
}

func TestListPeeringsQuery_Fetch(t *testing.T) {
cases := []struct {
name string
i string
exp []string
}{
{
"all",
"",
// the peering generated has random IDs,
// we can't assert on the full response,
// we can assert on the peering names though.
[]string{
"bar",
"foo",
},
},
}

for i, tc := range cases {
t.Run(fmt.Sprintf("%d_%s", i, tc.name), func(t *testing.T) {
p, err := NewListPeeringQuery(tc.i)
if err != nil {
t.Fatal(err)
}

res, _, err := p.Fetch(testClients, nil)
if err != nil {
t.Fatal(err)
}

if res == nil {
t.Fatalf("expected non-nil result")
}

peerNames := make([]string, 0)
for _, peering := range res.([]*Peering) {
peerNames = append(peerNames, peering.Name)
}

assert.Equal(t, tc.exp, peerNames)
})
}
}

func TestListPeeringsQuery_String(t *testing.T) {
cases := []struct {
name string
i string
exp string
}{
{
"empty",
"",
"list.peerings",
},
{
"partition",
"?partition=foo",
"list.peerings?partition=foo",
},
}

for i, tc := range cases {
t.Run(fmt.Sprintf("%d_%s", i, tc.name), func(t *testing.T) {
d, err := NewListPeeringQuery(tc.i)
if err != nil {
t.Fatal(err)
}
str := d.String()
assert.Equal(t, tc.exp, str)
})
}
}
5 changes: 5 additions & 0 deletions dependency/dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ const (
TypeNomad
)

const (
// DefaultContextTimeout context wait timeout for blocking queries.
DefaultContextTimeout = 60 * time.Second
)

// Dependency is an interface for a dependency that Consul Template is capable
// of watching.
type Dependency interface {
Expand Down
22 changes: 22 additions & 0 deletions dependency/dependency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package dependency

import (
"context"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -124,6 +125,11 @@ func TestMain(m *testing.M) {
Fatalf("%v", err)
}

err := createConsulPeerings(clients)
if err != nil {
Fatalf("%v", err)
}

// Wait for Nomad initialization to finish
if err := <-nomadFuture; err != nil {
testConsul.Stop()
Expand Down Expand Up @@ -158,6 +164,22 @@ func TestMain(m *testing.M) {
os.Exit(exit)
}

func createConsulPeerings(clients *ClientSet) error {
generateReq := api.PeeringGenerateTokenRequest{PeerName: "foo"}
_, _, err := clients.consul.client.Peerings().GenerateToken(context.Background(), generateReq, &api.WriteOptions{})
if err != nil {
return err
}

generateReq = api.PeeringGenerateTokenRequest{PeerName: "bar"}
_, _, err = clients.consul.client.Peerings().GenerateToken(context.Background(), generateReq, &api.WriteOptions{})
if err != nil {
return err
}

return nil
}

func runTestConsul(tb testutil.TestingTB) {
consul, err := testutil.NewTestServerConfigT(tb,
func(c *testutil.TestServerConfig) {
Expand Down
Loading

0 comments on commit 1066c87

Please sign in to comment.