diff --git a/manager/runner.go b/manager/runner.go index 856822b4e..5388fbf61 100644 --- a/manager/runner.go +++ b/manager/runner.go @@ -23,7 +23,7 @@ import ( "github.com/hashicorp/consul-template/template" "github.com/hashicorp/consul-template/watch" - multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/go-multierror" ) const ( @@ -38,6 +38,10 @@ type Runner struct { ErrCh chan error DoneCh chan struct{} + // ServerErrCh is a channel to surface error responses from the server up the calling stack + // and will only hold a maximum of one error at a time + ServerErrCh chan error + // config is the Config that created this Runner. It is used internally to // construct other objects and pass data. config *config.Config @@ -199,6 +203,7 @@ func NewRunner(config *config.Config, dry bool) (*Runner, error) { runner := &Runner{ ErrCh: make(chan error), DoneCh: make(chan struct{}), + ServerErrCh: make(chan error, 1), config: config, dry: dry, inStream: os.Stdin, @@ -434,7 +439,17 @@ func (r *Runner) Start() { log.Printf("[ERR] (runner) watcher reported error: %s", err) r.ErrCh <- err return - + case err := <-r.watcher.ServerErrCh(): + // If we got a server error we push the error up the stack + log.Printf("[ERR] (runner) sending server error back to caller") + // Drain the error channel if anything already exists + select { + case <-r.ServerErrCh: + continue + default: + } + r.ServerErrCh <- err + goto OUTER case err := <-r.vaultTokenWatcher.ErrCh(): // Push the error back up the stack log.Printf("[ERR] (runner): %s", err) diff --git a/watch/view.go b/watch/view.go index 59c45bea6..f29e1531b 100644 --- a/watch/view.go +++ b/watch/view.go @@ -121,7 +121,7 @@ func (v *View) DataAndLastIndex() (interface{}, uint64) { // accounts for interrupts on the interrupt channel. This allows the poll // function to be fired in a goroutine, but then halted even if the fetch // function is in the middle of a blocking query. -func (v *View) poll(viewCh chan<- *View, errCh chan<- error) { +func (v *View) poll(viewCh chan<- *View, errCh chan<- error, serverErrCh chan<- error) { var retries int for { @@ -162,6 +162,7 @@ func (v *View) poll(viewCh chan<- *View, errCh chan<- error) { case err := <-fetchErrCh: if !errors.Is(err, errLookup) && v.retryFunc != nil { retry, sleep := v.retryFunc(retries) + serverErrCh <- err if retry { log.Printf("[WARN] (view) %s (retry attempt %d after %q)", err, retries+1, sleep) diff --git a/watch/view_test.go b/watch/view_test.go index 1e9ee8e7c..a39dd03f2 100644 --- a/watch/view_test.go +++ b/watch/view_test.go @@ -20,8 +20,9 @@ func TestPoll_returnsViewCh(t *testing.T) { viewCh := make(chan *View) errCh := make(chan error) + serverErrCh := make(chan error) - go view.poll(viewCh, errCh) + go view.poll(viewCh, errCh, serverErrCh) defer view.stop() select { @@ -44,8 +45,9 @@ func TestPoll_returnsErrCh(t *testing.T) { viewCh := make(chan *View) errCh := make(chan error) + serverErrCh := make(chan error) - go view.poll(viewCh, errCh) + go view.poll(viewCh, errCh, serverErrCh) defer view.stop() select { @@ -53,9 +55,16 @@ func TestPoll_returnsErrCh(t *testing.T) { t.Errorf("expected no data, but got %+v", data) case err := <-errCh: expected := "failed to contact server" + select { + case <-serverErrCh: + t.Errorf("sent server error, expected lookup error") + default: + + } if err.Error() != expected { t.Errorf("expected %q to be %q", err.Error(), expected) } + case <-view.stopCh: t.Errorf("poll received premature stop") } @@ -71,8 +80,9 @@ func TestPoll_stopsViewStopCh(t *testing.T) { viewCh := make(chan *View) errCh := make(chan error) + serverErrCh := make(chan error) - go view.poll(viewCh, errCh) + go view.poll(viewCh, errCh, serverErrCh) view.stop() select { @@ -80,6 +90,8 @@ func TestPoll_stopsViewStopCh(t *testing.T) { t.Errorf("expected no data, but received view data") case err := <-errCh: t.Errorf("error while polling: %s", err) + case err := <-serverErrCh: + t.Errorf("server error while polling : %s", err) case <-time.After(20 * time.Millisecond): // No data was received, test passes } @@ -95,8 +107,9 @@ func TestPoll_once(t *testing.T) { viewCh := make(chan *View) errCh := make(chan error) + serverErrCh := make(chan error) - go view.poll(viewCh, errCh) + go view.poll(viewCh, errCh, serverErrCh) defer view.stop() select { @@ -113,6 +126,8 @@ func TestPoll_once(t *testing.T) { t.Errorf("expected no data (should have stopped), but received view data") case err := <-errCh: t.Errorf("error while polling: %s", err) + case err := <-serverErrCh: + t.Errorf("server error while polling: %s", err) case <-view.stopCh: t.Errorf("poll received premature stop") case <-time.After(20 * time.Millisecond): @@ -133,8 +148,9 @@ func TestPoll_retries(t *testing.T) { viewCh := make(chan *View) errCh := make(chan error) + serverErrCh := make(chan error) - go view.poll(viewCh, errCh) + go view.poll(viewCh, errCh, serverErrCh) defer view.stop() select { @@ -143,6 +159,9 @@ func TestPoll_retries(t *testing.T) { case <-time.After(100 * time.Millisecond): } + // need to receive error to avoid timeout + <-serverErrCh + select { case <-viewCh: // Got this far, so the test passes @@ -291,8 +310,9 @@ func TestStop_stopsPolling(t *testing.T) { viewCh := make(chan *View) errCh := make(chan error) + serverErrCh := make(chan error) - go view.poll(viewCh, errCh) + go view.poll(viewCh, errCh, serverErrCh) view.stop() select { @@ -300,6 +320,8 @@ func TestStop_stopsPolling(t *testing.T) { t.Errorf("got unexpected view: %#v", v) case err := <-errCh: t.Error(err) + case err := <-serverErrCh: + t.Error(err) case <-view.stopCh: // Successfully stopped } diff --git a/watch/watcher.go b/watch/watcher.go index 4eb025b99..571e43fcf 100644 --- a/watch/watcher.go +++ b/watch/watcher.go @@ -30,6 +30,9 @@ type Watcher struct { // errCh is the chan where any errors will be published. errCh chan error + // serverErrCh is the chan where response errors from the server will be published + serverErrCh chan error + // blockQueryWaitTime is amount of time in seconds to do a blocking query for blockQueryWaitTime time.Duration @@ -95,6 +98,7 @@ func NewWatcher(i *NewWatcherInput) *Watcher { depViewMap: make(map[string]*View), dataCh: make(chan *View, dataBufferSize), errCh: make(chan error), + serverErrCh: make(chan error), maxStale: i.MaxStale, once: i.Once, blockQueryWaitTime: i.BlockQueryWaitTime, @@ -121,6 +125,15 @@ func (w *Watcher) ErrCh() <-chan error { return w.errCh } +// ServerErrCh returns a read-only channel of errors returned by the server +// as a response to each consul-template instance +func (w *Watcher) ServerErrCh() <-chan error { + if w == nil { + return nil + } + return w.serverErrCh +} + // Add adds the given dependency to the list of monitored dependencies // and start the associated view. If the dependency already exists, no action is // taken. @@ -169,7 +182,7 @@ func (w *Watcher) Add(d dep.Dependency) (bool, error) { log.Printf("[TRACE] (watcher) %s starting", d) w.depViewMap[d.String()] = v - go v.poll(w.dataCh, w.errCh) + go v.poll(w.dataCh, w.errCh, w.serverErrCh) return true, nil }