From 1066c878a503acc04fc1cd46644d1ef3749bdd12 Mon Sep 17 00:00:00 2001 From: kumarkavish Date: Fri, 19 Jan 2024 23:07:34 +0530 Subject: [PATCH 1/5] [NET-6966] consul-template support for listing peerings - new template function to access consul peerings in consul template. - test cases for the same. --- dependency/consul_peering.go | 197 ++++++++++++++++++++++++++++++ dependency/consul_peering_test.go | 134 ++++++++++++++++++++ dependency/dependency.go | 5 + dependency/dependency_test.go | 22 ++++ template/funcs.go | 22 ++++ template/template.go | 1 + template/template_test.go | 22 ++++ 7 files changed, 403 insertions(+) create mode 100644 dependency/consul_peering.go create mode 100644 dependency/consul_peering_test.go diff --git a/dependency/consul_peering.go b/dependency/consul_peering.go new file mode 100644 index 000000000..b81075010 --- /dev/null +++ b/dependency/consul_peering.go @@ -0,0 +1,197 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package dependency + +import ( + "context" + "encoding/gob" + "fmt" + "github.com/hashicorp/consul/api" + "github.com/pkg/errors" + "log" + "net/url" + "regexp" + "sort" + "time" +) + +var ( + // Ensure implements + _ Dependency = (*ListPeeringQuery)(nil) + + // ListPeeringQueryRe is the regular expression to use. + ListPeeringQueryRe = regexp.MustCompile(`\A` + queryRe + `\z`) +) + +func init() { + gob.Register([]*Peering{}) + gob.Register([]*PeeringStreamStatus{}) + gob.Register([]*PeeringRemoteInfo{}) +} + +// ListPeeringQuery fetches all peering for a Consul cluster. +// https://developer.hashicorp.com/consul/api-docs/peering#list-all-peerings +type ListPeeringQuery struct { + stopCh chan struct{} + + partition string +} + +// Peering represent the response of the Consul peering API. +type Peering struct { + ID string + Name string + Partition string + Meta map[string]string + PeeringState string + PeerID string + PeerServerName string + PeerServerAddresses []string + StreamStatus PeeringStreamStatus + Remote PeeringRemoteInfo +} + +type PeeringStreamStatus struct { + ImportedServices []string + ExportedServices []string + LastHeartbeat *time.Time + LastReceive *time.Time + LastSend *time.Time +} + +type PeeringRemoteInfo struct { + Partition string + Datacenter string +} + +func NewListPeeringQuery(s string) (*ListPeeringQuery, error) { + if s != "" && !ListPeeringQueryRe.MatchString(s) { + return nil, fmt.Errorf("list.peering: invalid format: %q", s) + } + + m := regexpMatch(ListPeeringQueryRe, s) + + queryParams, err := GetConsulQueryOpts(m, "list.peering") + if err != nil { + return nil, err + } + + return &ListPeeringQuery{ + stopCh: make(chan struct{}, 1), + partition: queryParams.Get(QueryPartition), + }, nil +} + +func (l *ListPeeringQuery) Fetch(clients *ClientSet, opts *QueryOptions) (interface{}, *ResponseMetadata, error) { + select { + case <-l.stopCh: + return nil, nil, ErrStopped + default: + } + + opts = opts.Merge(&QueryOptions{ + ConsulPartition: l.partition, + }) + + log.Printf("[TRACE] %s: GET %s", l, &url.URL{ + Path: "/v1/peerings", + RawQuery: opts.String(), + }) + + // list peering is a blocking API, so making sure the ctx passed while calling it + // times out after the default wait time. + ctx, cancel := context.WithTimeout(context.Background(), DefaultContextTimeout) + defer cancel() + + p, meta, err := clients.Consul().Peerings().List(ctx, opts.ToConsulOpts()) + if err != nil { + return nil, nil, errors.Wrap(err, l.String()) + } + + log.Printf("[TRACE] %s: returned %d results", l, len(p)) + + peers := make([]*Peering, 0, len(p)) + for _, peering := range p { + peers = append(peers, toPeering(peering)) + } + + // sort so that the result is deterministic + sort.Stable(ByPeer(peers)) + + rm := &ResponseMetadata{ + LastIndex: meta.LastIndex, + LastContact: meta.LastContact, + } + + return peers, rm, nil +} + +func toPeering(p *api.Peering) *Peering { + return &Peering{ + ID: p.ID, + Name: p.Name, + Partition: p.Partition, + Meta: p.Meta, + PeeringState: fmt.Sprintf("%s", p.State), + PeerID: p.PeerID, + PeerServerName: p.PeerServerName, + PeerServerAddresses: p.PeerServerAddresses, + StreamStatus: PeeringStreamStatus{ + ImportedServices: p.StreamStatus.ImportedServices, + ExportedServices: p.StreamStatus.ExportedServices, + LastHeartbeat: p.StreamStatus.LastHeartbeat, + LastReceive: p.StreamStatus.LastReceive, + LastSend: p.StreamStatus.LastSend, + }, + Remote: PeeringRemoteInfo{ + Partition: p.Remote.Partition, + Datacenter: p.Remote.Datacenter, + }, + } +} + +func (l *ListPeeringQuery) String() string { + partitionStr := l.partition + + if len(partitionStr) > 0 { + partitionStr = fmt.Sprintf("?partition=%s", partitionStr) + } else { + return "list.peerings" + } + + return fmt.Sprintf("list.peerings%s", partitionStr) +} + +func (l *ListPeeringQuery) Stop() { + close(l.stopCh) +} + +func (l *ListPeeringQuery) Type() Type { + return TypeConsul +} + +func (l *ListPeeringQuery) CanShare() bool { + return false +} + +// ByPeer is a sortable list of peerings in this order: +// 1. State +// 2. Partition +// 3. Name +type ByPeer []*Peering + +func (p ByPeer) Len() int { return len(p) } +func (p ByPeer) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + +// Less if peer names are cluster-2, cluster-12, cluster-1 +// our sorting will be cluster-1, cluster-12, cluster-2 +func (p ByPeer) Less(i, j int) bool { + if p[i].PeeringState == p[j].PeeringState { + if p[i].Partition == p[j].Partition { + return p[i].Name < p[j].Name + } + return p[i].Partition < p[j].Partition + } + return p[i].PeeringState < p[j].PeeringState +} diff --git a/dependency/consul_peering_test.go b/dependency/consul_peering_test.go new file mode 100644 index 000000000..e36072b4c --- /dev/null +++ b/dependency/consul_peering_test.go @@ -0,0 +1,134 @@ +package dependency + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestListPeeringsQuery(t *testing.T) { + cases := []struct { + name string + i string + exp *ListPeeringQuery + err bool + }{ + { + "empty", + "", + &ListPeeringQuery{}, + false, + }, + { + "invalid query param (unsupported key)", + "?unsupported=foo", + nil, + true, + }, + { + "peerings", + "peerings", + nil, + true, + }, + { + "partition", + "?partition=foo", + &ListPeeringQuery{ + partition: "foo", + }, + false, + }, + } + + for i, tc := range cases { + t.Run(fmt.Sprintf("%d_%s", i, tc.name), func(t *testing.T) { + act, err := NewListPeeringQuery(tc.i) + if (err != nil) != tc.err { + t.Fatal(err) + } + + if act != nil { + act.stopCh = nil + } + + assert.Equal(t, tc.exp, act) + }) + } +} + +func TestListPeeringsQuery_Fetch(t *testing.T) { + cases := []struct { + name string + i string + exp []string + }{ + { + "all", + "", + // the peering generated has random IDs, + // we can't assert on the full response, + // we can assert on the peering names though. + []string{ + "bar", + "foo", + }, + }, + } + + for i, tc := range cases { + t.Run(fmt.Sprintf("%d_%s", i, tc.name), func(t *testing.T) { + p, err := NewListPeeringQuery(tc.i) + if err != nil { + t.Fatal(err) + } + + res, _, err := p.Fetch(testClients, nil) + if err != nil { + t.Fatal(err) + } + + if res == nil { + t.Fatalf("expected non-nil result") + } + + peerNames := make([]string, 0) + for _, peering := range res.([]*Peering) { + peerNames = append(peerNames, peering.Name) + } + + assert.Equal(t, tc.exp, peerNames) + }) + } +} + +func TestListPeeringsQuery_String(t *testing.T) { + cases := []struct { + name string + i string + exp string + }{ + { + "empty", + "", + "list.peerings", + }, + { + "partition", + "?partition=foo", + "list.peerings?partition=foo", + }, + } + + for i, tc := range cases { + t.Run(fmt.Sprintf("%d_%s", i, tc.name), func(t *testing.T) { + d, err := NewListPeeringQuery(tc.i) + if err != nil { + t.Fatal(err) + } + str := d.String() + assert.Equal(t, tc.exp, str) + }) + } +} diff --git a/dependency/dependency.go b/dependency/dependency.go index e96cda3c4..46d62135f 100644 --- a/dependency/dependency.go +++ b/dependency/dependency.go @@ -42,6 +42,11 @@ const ( TypeNomad ) +const ( + // DefaultContextTimeout context wait timeout for blocking queries. + DefaultContextTimeout = 60 * time.Second +) + // Dependency is an interface for a dependency that Consul Template is capable // of watching. type Dependency interface { diff --git a/dependency/dependency_test.go b/dependency/dependency_test.go index 2d5b2fa99..0948da841 100644 --- a/dependency/dependency_test.go +++ b/dependency/dependency_test.go @@ -4,6 +4,7 @@ package dependency import ( + "context" "encoding/json" "fmt" "io" @@ -124,6 +125,11 @@ func TestMain(m *testing.M) { Fatalf("%v", err) } + err := createConsulPeerings(clients) + if err != nil { + Fatalf("%v", err) + } + // Wait for Nomad initialization to finish if err := <-nomadFuture; err != nil { testConsul.Stop() @@ -158,6 +164,22 @@ func TestMain(m *testing.M) { os.Exit(exit) } +func createConsulPeerings(clients *ClientSet) error { + generateReq := api.PeeringGenerateTokenRequest{PeerName: "foo"} + _, _, err := clients.consul.client.Peerings().GenerateToken(context.Background(), generateReq, &api.WriteOptions{}) + if err != nil { + return err + } + + generateReq = api.PeeringGenerateTokenRequest{PeerName: "bar"} + _, _, err = clients.consul.client.Peerings().GenerateToken(context.Background(), generateReq, &api.WriteOptions{}) + if err != nil { + return err + } + + return nil +} + func runTestConsul(tb testutil.TestingTB) { consul, err := testutil.NewTestServerConfigT(tb, func(c *testutil.TestServerConfig) { diff --git a/template/funcs.go b/template/funcs.go index 3635d0b14..09db41976 100644 --- a/template/funcs.go +++ b/template/funcs.go @@ -363,6 +363,28 @@ func nodesFunc(b *Brain, used, missing *dep.Set) func(...string) ([]*dep.Node, e } } +// peeringsFunc returns or accumulates peerings. +func peeringsFunc(b *Brain, used, missing *dep.Set) func(...string) ([]*dep.Peering, error) { + return func(s ...string) ([]*dep.Peering, error) { + result := []*dep.Peering{} + + d, err := dep.NewListPeeringQuery(strings.Join(s, "")) + if err != nil { + return nil, err + } + + used.Add(d) + + if value, ok := b.Recall(d); ok { + return value.([]*dep.Peering), nil + } + + missing.Add(d) + + return result, nil + } +} + // pkiCertFunc returns a PKI cert from Vault func pkiCertFunc(b *Brain, used, missing *dep.Set, destPath string) func(...string) (interface{}, error) { return func(s ...string) (interface{}, error) { diff --git a/template/template.go b/template/template.go index f43a62dd6..ffc624c2e 100644 --- a/template/template.go +++ b/template/template.go @@ -328,6 +328,7 @@ func funcMap(i *funcMapInput) template.FuncMap { "safeLs": safeLsFunc(i.brain, i.used, i.missing), "node": nodeFunc(i.brain, i.used, i.missing), "nodes": nodesFunc(i.brain, i.used, i.missing), + "peerings": peeringsFunc(i.brain, i.used, i.missing), "secret": secretFunc(i.brain, i.used, i.missing), "secrets": secretsFunc(i.brain, i.used, i.missing), "service": serviceFunc(i.brain, i.used, i.missing), diff --git a/template/template_test.go b/template/template_test.go index 9e1839ab9..7b21a8318 100644 --- a/template/template_test.go +++ b/template/template_test.go @@ -477,6 +477,28 @@ func TestTemplate_Execute(t *testing.T) { "node1node2", false, }, + { + "func_peerings", + &NewTemplateInput{ + Contents: `{{ range peerings }}{{ .Name }}{{ end }}`, + }, + &ExecuteInput{ + Brain: func() *Brain { + b := NewBrain() + d, err := dep.NewListPeeringQuery("") + if err != nil { + t.Fatal(err) + } + b.Remember(d, []*dep.Peering{ + {Name: "cluster-01"}, + {Name: "cluster-02"}, + }) + return b + }(), + }, + "cluster-01cluster-02", + false, + }, { "func_secret_read", &NewTemplateInput{ From e767b4afeb8d051224ae58ce43286a21da0d4f8e Mon Sep 17 00:00:00 2001 From: kumarkavish Date: Fri, 19 Jan 2024 23:09:55 +0530 Subject: [PATCH 2/5] [NET-6966] consul-template support for listing peerings - CHANGELOG.md --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d6d80beb9..f2ee459fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +# Unreleased Changes + +IMPROVEMENTS: +* Add support for listing Consul peers [NET-6966](https://hashicorp.atlassian.net/browse/NET-6966) + ## v0.36.0 (January 3, 2024) IMPROVEMENTS: From d8f0d94aebe78e094fe506d77567905fa023d06b Mon Sep 17 00:00:00 2001 From: kumarkavish Date: Fri, 19 Jan 2024 23:13:41 +0530 Subject: [PATCH 3/5] [NET-6966] consul-template support for listing peerings - lint issues --- dependency/consul_peering.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dependency/consul_peering.go b/dependency/consul_peering.go index b81075010..e7ab46973 100644 --- a/dependency/consul_peering.go +++ b/dependency/consul_peering.go @@ -133,7 +133,7 @@ func toPeering(p *api.Peering) *Peering { Name: p.Name, Partition: p.Partition, Meta: p.Meta, - PeeringState: fmt.Sprintf("%s", p.State), + PeeringState: string(p.State), PeerID: p.PeerID, PeerServerName: p.PeerServerName, PeerServerAddresses: p.PeerServerAddresses, From 5f2837e2576ba150062fe106f73835d02605000e Mon Sep 17 00:00:00 2001 From: kumarkavish Date: Sat, 20 Jan 2024 00:37:16 +0530 Subject: [PATCH 4/5] [NET-6966] consul-template support for listing peerings - correction --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f2ee459fb..6a0fcaf4c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Unreleased Changes -IMPROVEMENTS: +NEW FEATURES: * Add support for listing Consul peers [NET-6966](https://hashicorp.atlassian.net/browse/NET-6966) ## v0.36.0 (January 3, 2024) From 970b431da49c4b7f7664bcdc282c0ed5e5f4c21a Mon Sep 17 00:00:00 2001 From: kumarkavish Date: Tue, 23 Jan 2024 00:36:17 +0530 Subject: [PATCH 5/5] [NET-6966] consul-template support for listing peerings - docs for peering template --- docs/templating-language.md | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/docs/templating-language.md b/docs/templating-language.md index 459b8e203..91c90318f 100644 --- a/docs/templating-language.md +++ b/docs/templating-language.md @@ -490,6 +490,39 @@ To query a different data center and order by shortest trip time to ourselves: To access map data such as `TaggedAddresses` or `Meta`, use [Go's text/template][text-template] map indexing. +### `peerings` + +Query [Consul][consul] for all peerings. + +```golang +{{ peerings "?" }} +``` +The `` attribute is optional; if omitted, `default` partition will be queried. `` can be used to set the Consul partition. `` accepts a url query-parameter format, e.g.: + +```golang +{{ range peerings "?partition=partition" }} +{{ .Name }}{{ end }} +``` +For example: + +```golang +{{ range peerings }} +{{ .Name }}{{ end }} +``` + +renders + +```text +foo +bar +``` + +For complete list of available fields, see consul's documentation on [CT-Peerings](https://github.com/hashicorp/consul-template/blob/main/dependency/consul_peering.go#L42) + +To access map data such as `Meta` or slice such as `PeerServerAddresses`, use +[Go's text/template][text-template] map indexing. + + ### `secret` #### Format