Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create ServerErrCh to surface server errors back to caller. #1897

Merged
merged 5 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions manager/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/hashicorp/consul-template/template"
"github.com/hashicorp/consul-template/watch"

multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-multierror"
)

const (
Expand All @@ -38,6 +38,10 @@ type Runner struct {
ErrCh chan error
DoneCh chan struct{}

// ServerErrCh is a channel to surface error responses from the server up the calling stack
// and will only hold a maximum of one error at a time
ServerErrCh chan error

// config is the Config that created this Runner. It is used internally to
// construct other objects and pass data.
config *config.Config
Expand Down Expand Up @@ -199,6 +203,7 @@ func NewRunner(config *config.Config, dry bool) (*Runner, error) {
runner := &Runner{
ErrCh: make(chan error),
DoneCh: make(chan struct{}),
ServerErrCh: make(chan error, 1),
config: config,
dry: dry,
inStream: os.Stdin,
Expand Down Expand Up @@ -434,7 +439,17 @@ func (r *Runner) Start() {
log.Printf("[ERR] (runner) watcher reported error: %s", err)
r.ErrCh <- err
return

case err := <-r.watcher.ServerErrCh():
// If we got a server error we push the error up the stack
log.Printf("[ERR] (runner) sending server error back to caller")
// Drain the error channel if anything already exists
select {
case <-r.ServerErrCh:
continue
default:
}
r.ServerErrCh <- err
goto OUTER
case err := <-r.vaultTokenWatcher.ErrCh():
// Push the error back up the stack
log.Printf("[ERR] (runner): %s", err)
Expand Down
3 changes: 2 additions & 1 deletion watch/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (v *View) DataAndLastIndex() (interface{}, uint64) {
// accounts for interrupts on the interrupt channel. This allows the poll
// function to be fired in a goroutine, but then halted even if the fetch
// function is in the middle of a blocking query.
func (v *View) poll(viewCh chan<- *View, errCh chan<- error) {
func (v *View) poll(viewCh chan<- *View, errCh chan<- error, serverErrCh chan<- error) {
var retries int

for {
Expand Down Expand Up @@ -162,6 +162,7 @@ func (v *View) poll(viewCh chan<- *View, errCh chan<- error) {
case err := <-fetchErrCh:
if !errors.Is(err, errLookup) && v.retryFunc != nil {
retry, sleep := v.retryFunc(retries)
serverErrCh <- err
if retry {
log.Printf("[WARN] (view) %s (retry attempt %d after %q)",
err, retries+1, sleep)
Expand Down
34 changes: 28 additions & 6 deletions watch/view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ func TestPoll_returnsViewCh(t *testing.T) {

viewCh := make(chan *View)
errCh := make(chan error)
serverErrCh := make(chan error)

go view.poll(viewCh, errCh)
go view.poll(viewCh, errCh, serverErrCh)
defer view.stop()

select {
Expand All @@ -44,18 +45,26 @@ func TestPoll_returnsErrCh(t *testing.T) {

viewCh := make(chan *View)
errCh := make(chan error)
serverErrCh := make(chan error)

go view.poll(viewCh, errCh)
go view.poll(viewCh, errCh, serverErrCh)
defer view.stop()

select {
case data := <-viewCh:
t.Errorf("expected no data, but got %+v", data)
case err := <-errCh:
expected := "failed to contact server"
select {
case <-serverErrCh:
t.Errorf("sent server error, expected lookup error")
default:

}
if err.Error() != expected {
t.Errorf("expected %q to be %q", err.Error(), expected)
}

case <-view.stopCh:
t.Errorf("poll received premature stop")
}
Expand All @@ -71,15 +80,18 @@ func TestPoll_stopsViewStopCh(t *testing.T) {

viewCh := make(chan *View)
errCh := make(chan error)
serverErrCh := make(chan error)

go view.poll(viewCh, errCh)
go view.poll(viewCh, errCh, serverErrCh)
view.stop()

select {
case <-viewCh:
t.Errorf("expected no data, but received view data")
case err := <-errCh:
t.Errorf("error while polling: %s", err)
case err := <-serverErrCh:
t.Errorf("server error while polling : %s", err)
case <-time.After(20 * time.Millisecond):
// No data was received, test passes
}
Expand All @@ -95,8 +107,9 @@ func TestPoll_once(t *testing.T) {

viewCh := make(chan *View)
errCh := make(chan error)
serverErrCh := make(chan error)

go view.poll(viewCh, errCh)
go view.poll(viewCh, errCh, serverErrCh)
defer view.stop()

select {
Expand All @@ -113,6 +126,8 @@ func TestPoll_once(t *testing.T) {
t.Errorf("expected no data (should have stopped), but received view data")
case err := <-errCh:
t.Errorf("error while polling: %s", err)
case err := <-serverErrCh:
t.Errorf("server error while polling: %s", err)
case <-view.stopCh:
t.Errorf("poll received premature stop")
case <-time.After(20 * time.Millisecond):
Expand All @@ -133,8 +148,9 @@ func TestPoll_retries(t *testing.T) {

viewCh := make(chan *View)
errCh := make(chan error)
serverErrCh := make(chan error)

go view.poll(viewCh, errCh)
go view.poll(viewCh, errCh, serverErrCh)
defer view.stop()

select {
Expand All @@ -143,6 +159,9 @@ func TestPoll_retries(t *testing.T) {
case <-time.After(100 * time.Millisecond):
}

// need to receive error to avoid timeout
<-serverErrCh

select {
case <-viewCh:
// Got this far, so the test passes
Expand Down Expand Up @@ -291,15 +310,18 @@ func TestStop_stopsPolling(t *testing.T) {

viewCh := make(chan *View)
errCh := make(chan error)
serverErrCh := make(chan error)

go view.poll(viewCh, errCh)
go view.poll(viewCh, errCh, serverErrCh)
view.stop()

select {
case v := <-viewCh:
t.Errorf("got unexpected view: %#v", v)
case err := <-errCh:
t.Error(err)
case err := <-serverErrCh:
t.Error(err)
case <-view.stopCh:
// Successfully stopped
}
Expand Down
15 changes: 14 additions & 1 deletion watch/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ type Watcher struct {
// errCh is the chan where any errors will be published.
errCh chan error

// serverErrCh is the chan where response errors from the server will be published
serverErrCh chan error

// blockQueryWaitTime is amount of time in seconds to do a blocking query for
blockQueryWaitTime time.Duration

Expand Down Expand Up @@ -95,6 +98,7 @@ func NewWatcher(i *NewWatcherInput) *Watcher {
depViewMap: make(map[string]*View),
dataCh: make(chan *View, dataBufferSize),
errCh: make(chan error),
serverErrCh: make(chan error),
maxStale: i.MaxStale,
once: i.Once,
blockQueryWaitTime: i.BlockQueryWaitTime,
Expand All @@ -121,6 +125,15 @@ func (w *Watcher) ErrCh() <-chan error {
return w.errCh
}

// ServerErrCh returns a read-only channel of errors returned by the server
// as a response to each consul-template instance
func (w *Watcher) ServerErrCh() <-chan error {
if w == nil {
return nil
}
return w.serverErrCh
}

// Add adds the given dependency to the list of monitored dependencies
// and start the associated view. If the dependency already exists, no action is
// taken.
Expand Down Expand Up @@ -169,7 +182,7 @@ func (w *Watcher) Add(d dep.Dependency) (bool, error) {
log.Printf("[TRACE] (watcher) %s starting", d)

w.depViewMap[d.String()] = v
go v.poll(w.dataCh, w.errCh)
go v.poll(w.dataCh, w.errCh, w.serverErrCh)

return true, nil
}
Expand Down
Loading