From 2e0784e39715da727dfb9337f3b86e1fe8f4540c Mon Sep 17 00:00:00 2001 From: Dmitriy Tarasov Date: Thu, 24 Aug 2023 18:26:43 +0300 Subject: [PATCH] swarm nodes fix getContainers --- README.md | 2 + cmd/docker-gen/main.go | 3 + internal/generator/generator.go | 358 +++++++++++++++------------ internal/generator/generator_test.go | 27 +- 4 files changed, 211 insertions(+), 179 deletions(-) diff --git a/README.md b/README.md index 4725ce01..1debeaab 100644 --- a/README.md +++ b/README.md @@ -96,6 +96,8 @@ Options: config files with template directives. Config files will be merged if this option is specified multiple times. (default []) -endpoint string docker api endpoint (tcp|unix://..). Default unix:///var/run/docker.sock + -swarm-node value + docker api endpoints from which to listen for events. Default equals to value of `endpoint` argument -interval int notify command interval (secs) -keep-blank-lines diff --git a/cmd/docker-gen/main.go b/cmd/docker-gen/main.go index a28deeac..ebd7f182 100644 --- a/cmd/docker-gen/main.go +++ b/cmd/docker-gen/main.go @@ -34,6 +34,7 @@ var ( interval int keepBlankLines bool endpoint string + swarmNodes stringslice tlsCert string tlsKey string tlsCaCert string @@ -106,6 +107,7 @@ func initFlags() { flag.IntVar(&interval, "interval", 0, "notify command interval (secs)") flag.BoolVar(&keepBlankLines, "keep-blank-lines", false, "keep blank lines in the output file") flag.StringVar(&endpoint, "endpoint", "", "docker api endpoint (tcp|unix://..). Default unix:///var/run/docker.sock") + flag.Var(&swarmNodes, "swarm-node", "swarm node api endpoint (tcp|unix://..).") flag.StringVar(&tlsCert, "tlscert", filepath.Join(certPath, "cert.pem"), "path to TLS client certificate file") flag.StringVar(&tlsKey, "tlskey", filepath.Join(certPath, "key.pem"), "path to TLS client key file") flag.StringVar(&tlsCaCert, "tlscacert", filepath.Join(certPath, "ca.pem"), "path to TLS CA certificate file") @@ -174,6 +176,7 @@ func main() { generator, err := generator.NewGenerator(generator.GeneratorConfig{ Endpoint: endpoint, + SwarmNodes: swarmNodes, TLSKey: tlsKey, TLSCert: tlsCert, TLSCACert: tlsCaCert, diff --git a/internal/generator/generator.go b/internal/generator/generator.go index 240de21a..88b0189f 100644 --- a/internal/generator/generator.go +++ b/internal/generator/generator.go @@ -21,8 +21,10 @@ import ( type generator struct { Client *docker.Client + SwarmClients []*docker.Client Configs config.ConfigFile Endpoint string + SwarmNodes []string TLSVerify bool TLSCert, TLSCaCert, TLSKey string All bool @@ -32,7 +34,8 @@ type generator struct { } type GeneratorConfig struct { - Endpoint string + Endpoint string + SwarmNodes []string TLSCert string TLSKey string @@ -62,16 +65,38 @@ func NewGenerator(gc GeneratorConfig) (*generator, error) { // Grab the docker daemon info once and hold onto it context.SetDockerEnv(apiVersion) + var swarmNodes []string + if len(gc.SwarmNodes) == 0 { + swarmNodes = append(swarmNodes, gc.Endpoint) + } else { + swarmNodes = append(swarmNodes, gc.SwarmNodes...) + } + + var swarmClients []*docker.Client + for _, swarmNodeEndpoint := range swarmNodes { + swarmNodeEndpoint, err := dockerclient.GetEndpoint(swarmNodeEndpoint) + if err != nil { + return nil, fmt.Errorf("bad endpoint: %s", err) + } + client, err := dockerclient.NewDockerClient(swarmNodeEndpoint, gc.TLSVerify, gc.TLSCert, gc.TLSCACert, gc.TLSKey) + if err != nil { + return nil, fmt.Errorf("unable to create docker client: %s", err) + } + swarmClients = append(swarmClients, client) + } + return &generator{ - Client: client, - Endpoint: gc.Endpoint, - TLSVerify: gc.TLSVerify, - TLSCert: gc.TLSCert, - TLSCaCert: gc.TLSCACert, - TLSKey: gc.TLSKey, - All: gc.All, - Configs: gc.ConfigFile, - retry: true, + Client: client, + Endpoint: gc.Endpoint, + SwarmNodes: swarmNodes, + SwarmClients: swarmClients, + TLSVerify: gc.TLSVerify, + TLSCert: gc.TLSCert, + TLSCaCert: gc.TLSCACert, + TLSKey: gc.TLSKey, + All: gc.All, + Configs: gc.ConfigFile, + retry: true, }, nil } @@ -182,7 +207,6 @@ func (g *generator) generateFromEvents() { return } - client := g.Client var watchers []chan *docker.APIEvents for _, cfg := range configs.Config { @@ -215,97 +239,108 @@ func (g *generator) generateFromEvents() { }(cfg) } - // maintains docker client connection and passes events to watchers - go func() { - // channel will be closed by go-dockerclient - eventChan := make(chan *docker.APIEvents, 100) - sigChan, cleanup := newSignalChannel() - defer cleanup() - - for { - watching := false - - if client == nil { - var err error - endpoint, err := dockerclient.GetEndpoint(g.Endpoint) - if err != nil { - log.Printf("Bad endpoint: %s", err) - time.Sleep(10 * time.Second) - continue - } - client, err = dockerclient.NewDockerClient(endpoint, g.TLSVerify, g.TLSCert, g.TLSCaCert, g.TLSKey) - if err != nil { - log.Printf("Unable to connect to docker daemon: %s", err) - time.Sleep(10 * time.Second) - continue - } - } + eventChan := make(chan *docker.APIEvents, 100) + done := make(chan bool) + clientDone := make(chan bool) + for _, endpoint := range g.SwarmNodes { + go func(endpoint string) { + var client *docker.Client + var listenerChan chan *docker.APIEvents for { if client == nil { - break - } - if !watching { - err := client.AddEventListener(eventChan) + var err error + endpoint, err := dockerclient.GetEndpoint(endpoint) + if err != nil { + log.Printf("Bad endpoint: %s", err) + clientDone <- true + return + } + client, err = dockerclient.NewDockerClient(endpoint, g.TLSVerify, g.TLSCert, g.TLSCaCert, g.TLSKey) + if err != nil { + log.Printf("Unable to connect to docker daemon: %s", err) + time.Sleep(10 * time.Second) + continue + } + listenerChan = make(chan *docker.APIEvents, 100) + err = client.AddEventListener(listenerChan) if err != nil && err != docker.ErrListenerAlreadyExists { log.Printf("Error registering docker event listener: %s", err) + client = nil + listenerChan = nil time.Sleep(10 * time.Second) continue } - watching = true log.Println("Watching docker events") // sync all configs after resuming listener - g.generateFromContainers() + eventChan <- nil } select { - case event, ok := <-eventChan: + case event, ok := <-listenerChan: if !ok { log.Printf("Docker daemon connection interrupted") - if watching { - client.RemoveEventListener(eventChan) - watching = false - client = nil - } + client.RemoveEventListener(listenerChan) + client = nil + listenerChan = nil if !g.retry { - // close all watchers and exit - for _, watcher := range watchers { - close(watcher) - } + clientDone <- true return } - // recreate channel and attempt to resume - eventChan = make(chan *docker.APIEvents, 100) time.Sleep(10 * time.Second) - break } if event.Status == "start" || event.Status == "stop" || event.Status == "die" { log.Printf("Received event %s for container %s", event.Status, event.ID[:12]) // fanout event to all watchers - for _, watcher := range watchers { - watcher <- event - } + eventChan <- event } case <-time.After(10 * time.Second): // check for docker liveness err := client.Ping() if err != nil { log.Printf("Unable to ping docker daemon: %s", err) - if watching { - client.RemoveEventListener(eventChan) - watching = false - client = nil - } - } - case sig := <-sigChan: - log.Printf("Received signal: %s\n", sig) - switch sig { - case syscall.SIGTERM, syscall.SIGINT: - // close all watchers and exit - for _, watcher := range watchers { - close(watcher) - } - return + client.RemoveEventListener(listenerChan) + client = nil + listenerChan = nil } + case <-done: + log.Printf("Done signal received") + client.RemoveEventListener(listenerChan) + client = nil + listenerChan = nil + return + } + } + }(endpoint) + } + + go func() { + sigChan, cleanup := newSignalChannel() + defer cleanup() + defer close(done) + defer func() { + for _, watcher := range watchers { + close(watcher) + } + }() + + for { + select { + case event := <-eventChan: + if event == nil { + g.generateFromContainers() + continue + } + // fanout event to all watchers + for _, watcher := range watchers { + watcher <- event + } + case <-clientDone: + return + case sig := <-sigChan: + log.Printf("Received signal: %s\n", sig) + switch sig { + case syscall.SIGTERM, syscall.SIGINT: + return } } } @@ -365,111 +400,112 @@ func (g *generator) getContainers() ([]*context.RuntimeContainer, error) { context.SetServerInfo(apiInfo) } - apiContainers, err := g.Client.ListContainers(docker.ListContainersOptions{ - All: g.All, - Size: false, - }) - if err != nil { - return nil, err - } - containers := []*context.RuntimeContainer{} - for _, apiContainer := range apiContainers { - opts := docker.InspectContainerOptions{ID: apiContainer.ID} - container, err := g.Client.InspectContainerWithOptions(opts) + for _, client := range g.SwarmClients { + apiContainers, err := client.ListContainers(docker.ListContainersOptions{ + All: g.All, + Size: false, + }) if err != nil { - log.Printf("Error inspecting container: %s: %s\n", apiContainer.ID, err) - continue + return nil, err } - registry, repository, tag := dockerclient.SplitDockerImage(container.Config.Image) - runtimeContainer := &context.RuntimeContainer{ - ID: container.ID, - Image: context.DockerImage{ - Registry: registry, - Repository: repository, - Tag: tag, - }, - State: context.State{ - Running: container.State.Running, - }, - Name: strings.TrimLeft(container.Name, "/"), - Hostname: container.Config.Hostname, - Gateway: container.NetworkSettings.Gateway, - Addresses: []context.Address{}, - Networks: []context.Network{}, - Env: make(map[string]string), - Volumes: make(map[string]context.Volume), - Node: context.SwarmNode{}, - Labels: make(map[string]string), - IP: container.NetworkSettings.IPAddress, - IP6LinkLocal: container.NetworkSettings.LinkLocalIPv6Address, - IP6Global: container.NetworkSettings.GlobalIPv6Address, - } - for k, v := range container.NetworkSettings.Ports { - address := context.Address{ + for _, apiContainer := range apiContainers { + opts := docker.InspectContainerOptions{ID: apiContainer.ID} + container, err := client.InspectContainerWithOptions(opts) + if err != nil { + log.Printf("Error inspecting container: %s: %s\n", apiContainer.ID, err) + continue + } + + registry, repository, tag := dockerclient.SplitDockerImage(container.Config.Image) + runtimeContainer := &context.RuntimeContainer{ + ID: container.ID, + Image: context.DockerImage{ + Registry: registry, + Repository: repository, + Tag: tag, + }, + State: context.State{ + Running: container.State.Running, + }, + Name: strings.TrimLeft(container.Name, "/"), + Hostname: container.Config.Hostname, + Gateway: container.NetworkSettings.Gateway, + Addresses: []context.Address{}, + Networks: []context.Network{}, + Env: make(map[string]string), + Volumes: make(map[string]context.Volume), + Node: context.SwarmNode{}, + Labels: make(map[string]string), IP: container.NetworkSettings.IPAddress, IP6LinkLocal: container.NetworkSettings.LinkLocalIPv6Address, IP6Global: container.NetworkSettings.GlobalIPv6Address, - Port: k.Port(), - Proto: k.Proto(), } - if len(v) > 0 { - address.HostPort = v[0].HostPort - address.HostIP = v[0].HostIP - } - runtimeContainer.Addresses = append(runtimeContainer.Addresses, - address) + for k, v := range container.NetworkSettings.Ports { + address := context.Address{ + IP: container.NetworkSettings.IPAddress, + IP6LinkLocal: container.NetworkSettings.LinkLocalIPv6Address, + IP6Global: container.NetworkSettings.GlobalIPv6Address, + Port: k.Port(), + Proto: k.Proto(), + } + if len(v) > 0 { + address.HostPort = v[0].HostPort + address.HostIP = v[0].HostIP + } + runtimeContainer.Addresses = append(runtimeContainer.Addresses, + address) - } - for k, v := range container.NetworkSettings.Networks { - network := context.Network{ - IP: v.IPAddress, - Name: k, - Gateway: v.Gateway, - EndpointID: v.EndpointID, - IPv6Gateway: v.IPv6Gateway, - GlobalIPv6Address: v.GlobalIPv6Address, - MacAddress: v.MacAddress, - GlobalIPv6PrefixLen: v.GlobalIPv6PrefixLen, - IPPrefixLen: v.IPPrefixLen, } + for k, v := range container.NetworkSettings.Networks { + network := context.Network{ + IP: v.IPAddress, + Name: k, + Gateway: v.Gateway, + EndpointID: v.EndpointID, + IPv6Gateway: v.IPv6Gateway, + GlobalIPv6Address: v.GlobalIPv6Address, + MacAddress: v.MacAddress, + GlobalIPv6PrefixLen: v.GlobalIPv6PrefixLen, + IPPrefixLen: v.IPPrefixLen, + } - runtimeContainer.Networks = append(runtimeContainer.Networks, - network) - } - for k, v := range container.Volumes { - runtimeContainer.Volumes[k] = context.Volume{ - Path: k, - HostPath: v, - ReadWrite: container.VolumesRW[k], + runtimeContainer.Networks = append(runtimeContainer.Networks, + network) } - } - if container.Node != nil { - runtimeContainer.Node.ID = container.Node.ID - runtimeContainer.Node.Name = container.Node.Name - runtimeContainer.Node.Address = context.Address{ - IP: container.Node.IP, + for k, v := range container.Volumes { + runtimeContainer.Volumes[k] = context.Volume{ + Path: k, + HostPath: v, + ReadWrite: container.VolumesRW[k], + } + } + if container.Node != nil { + runtimeContainer.Node.ID = container.Node.ID + runtimeContainer.Node.Name = container.Node.Name + runtimeContainer.Node.Address = context.Address{ + IP: container.Node.IP, + } } - } - for _, v := range container.Mounts { - runtimeContainer.Mounts = append(runtimeContainer.Mounts, context.Mount{ - Name: v.Name, - Source: v.Source, - Destination: v.Destination, - Driver: v.Driver, - Mode: v.Mode, - RW: v.RW, - }) - } + for _, v := range container.Mounts { + runtimeContainer.Mounts = append(runtimeContainer.Mounts, context.Mount{ + Name: v.Name, + Source: v.Source, + Destination: v.Destination, + Driver: v.Driver, + Mode: v.Mode, + RW: v.RW, + }) + } - runtimeContainer.Env = utils.SplitKeyValueSlice(container.Config.Env) - runtimeContainer.Labels = container.Config.Labels - containers = append(containers, runtimeContainer) + runtimeContainer.Env = utils.SplitKeyValueSlice(container.Config.Env) + runtimeContainer.Labels = container.Config.Labels + containers = append(containers, runtimeContainer) + } } return containers, nil - } func newSignalChannel() (<-chan os.Signal, func()) { diff --git a/internal/generator/generator_test.go b/internal/generator/generator_test.go index 4675f251..1eded036 100644 --- a/internal/generator/generator_test.go +++ b/internal/generator/generator_test.go @@ -16,8 +16,6 @@ import ( docker "github.com/fsouza/go-dockerclient" dockertest "github.com/fsouza/go-dockerclient/testing" "github.com/nginx-proxy/docker-gen/internal/config" - "github.com/nginx-proxy/docker-gen/internal/context" - "github.com/nginx-proxy/docker-gen/internal/dockerclient" ) func TestGenerateFromEvents(t *testing.T) { @@ -106,11 +104,6 @@ func TestGenerateFromEvents(t *testing.T) { })) serverURL := fmt.Sprintf("tcp://%s", strings.TrimRight(strings.TrimPrefix(server.URL(), "http://"), "/")) - client, err := dockerclient.NewDockerClient(serverURL, false, "", "", "") - if err != nil { - t.Errorf("Failed to create client: %s", err) - } - client.SkipServerVersionCheck = true tmplFile, err := os.CreateTemp(os.TempDir(), "docker-gen-tmpl") if err != nil { @@ -140,16 +133,10 @@ func TestGenerateFromEvents(t *testing.T) { } }() - apiVersion, err := client.Version() - if err != nil { - t.Errorf("Failed to retrieve docker server version info: %v\n", err) - } - context.SetDockerEnv(apiVersion) // prevents a panic - - generator := &generator{ - Client: client, - Endpoint: serverURL, - Configs: config.ConfigFile{ + generator, err := NewGenerator(GeneratorConfig{ + Endpoint: serverURL, + TLSVerify: false, + ConfigFile: config.ConfigFile{ Config: []config.Config{ { Template: tmplFile.Name(), @@ -176,9 +163,13 @@ func TestGenerateFromEvents(t *testing.T) { }, }, }, - retry: false, + }) + if err != nil { + t.Errorf("Error creating generator: %v\n", err) } + generator.retry = false + generator.generateFromEvents() generator.wg.Wait()