Skip to content

Commit

Permalink
Create a channel to surface error from server
Browse files Browse the repository at this point in the history
Create a channel to surface error from server

Created a channel to surface server errors

Create a channel to surface error from server

Dummy commit

Edited linters

Edited runner initialization
  • Loading branch information
divyaac committed Mar 18, 2024
1 parent 062dd60 commit 70ede88
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 11 deletions.
20 changes: 17 additions & 3 deletions manager/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/hashicorp/consul-template/template"
"github.com/hashicorp/consul-template/watch"

multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-multierror"
)

const (
Expand All @@ -38,6 +38,10 @@ type Runner struct {
ErrCh chan error
DoneCh chan struct{}

// ServerCh is a channel to surface error responses from the server up the calling stack
// and will only hold a maximum of one error at a time
ServerCh chan error

// config is the Config that created this Runner. It is used internally to
// construct other objects and pass data.
config *config.Config
Expand Down Expand Up @@ -199,6 +203,7 @@ func NewRunner(config *config.Config, dry bool) (*Runner, error) {
runner := &Runner{
ErrCh: make(chan error),
DoneCh: make(chan struct{}),
ServerCh: make(chan error),
config: config,
dry: dry,
inStream: os.Stdin,
Expand Down Expand Up @@ -434,11 +439,20 @@ func (r *Runner) Start() {
log.Printf("[ERR] (runner) watcher reported error: %s", err)
r.ErrCh <- err
return

case err := <-r.watcher.ServerCh():
// If we got a server error we push the error up the stack
log.Printf("[ERR] (runner) sending server error back to caller")
// Drain the error channel if anything already exists
select {
case <-r.ServerCh:
continue
default:
}
r.ServerCh <- err
goto OUTER
case err := <-r.vaultTokenWatcher.ErrCh():
// Push the error back up the stack
log.Printf("[ERR] (runner): %s", err)
r.ErrCh <- err
return

case tmpl := <-r.quiescenceCh:
Expand Down
3 changes: 2 additions & 1 deletion watch/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (v *View) DataAndLastIndex() (interface{}, uint64) {
// accounts for interrupts on the interrupt channel. This allows the poll
// function to be fired in a goroutine, but then halted even if the fetch
// function is in the middle of a blocking query.
func (v *View) poll(viewCh chan<- *View, errCh chan<- error) {
func (v *View) poll(viewCh chan<- *View, errCh chan<- error, serverErrCh chan<- error) {
var retries int

for {
Expand Down Expand Up @@ -162,6 +162,7 @@ func (v *View) poll(viewCh chan<- *View, errCh chan<- error) {
case err := <-fetchErrCh:
if !errors.Is(err, errLookup) && v.retryFunc != nil {
retry, sleep := v.retryFunc(retries)
serverErrCh <- err
if retry {
log.Printf("[WARN] (view) %s (retry attempt %d after %q)",
err, retries+1, sleep)
Expand Down
34 changes: 28 additions & 6 deletions watch/view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ func TestPoll_returnsViewCh(t *testing.T) {

viewCh := make(chan *View)
errCh := make(chan error)
serverErrCh := make(chan error)

go view.poll(viewCh, errCh)
go view.poll(viewCh, errCh, serverErrCh)
defer view.stop()

select {
Expand All @@ -44,18 +45,26 @@ func TestPoll_returnsErrCh(t *testing.T) {

viewCh := make(chan *View)
errCh := make(chan error)
serverErrCh := make(chan error)

go view.poll(viewCh, errCh)
go view.poll(viewCh, errCh, serverErrCh)
defer view.stop()

select {
case data := <-viewCh:
t.Errorf("expected no data, but got %+v", data)
case err := <-errCh:
expected := "failed to contact server"
select {
case <-serverErrCh:
t.Errorf("sent server error, expected lookup error")
default:

}
if err.Error() != expected {
t.Errorf("expected %q to be %q", err.Error(), expected)
}

case <-view.stopCh:
t.Errorf("poll received premature stop")
}
Expand All @@ -71,15 +80,18 @@ func TestPoll_stopsViewStopCh(t *testing.T) {

viewCh := make(chan *View)
errCh := make(chan error)
serverErrCh := make(chan error)

go view.poll(viewCh, errCh)
go view.poll(viewCh, errCh, serverErrCh)
view.stop()

select {
case <-viewCh:
t.Errorf("expected no data, but received view data")
case err := <-errCh:
t.Errorf("error while polling: %s", err)
case err := <-serverErrCh:
t.Errorf("server error while polling : %s", err)
case <-time.After(20 * time.Millisecond):
// No data was received, test passes
}
Expand All @@ -95,8 +107,9 @@ func TestPoll_once(t *testing.T) {

viewCh := make(chan *View)
errCh := make(chan error)
serverErrCh := make(chan error)

go view.poll(viewCh, errCh)
go view.poll(viewCh, errCh, serverErrCh)
defer view.stop()

select {
Expand All @@ -113,6 +126,8 @@ func TestPoll_once(t *testing.T) {
t.Errorf("expected no data (should have stopped), but received view data")
case err := <-errCh:
t.Errorf("error while polling: %s", err)
case err := <-serverErrCh:
t.Errorf("server error while polling : %s", err)
case <-view.stopCh:
t.Errorf("poll received premature stop")
case <-time.After(20 * time.Millisecond):
Expand All @@ -133,8 +148,9 @@ func TestPoll_retries(t *testing.T) {

viewCh := make(chan *View)
errCh := make(chan error)
serverErrCh := make(chan error)

go view.poll(viewCh, errCh)
go view.poll(viewCh, errCh, serverErrCh)
defer view.stop()

select {
Expand All @@ -143,6 +159,9 @@ func TestPoll_retries(t *testing.T) {
case <-time.After(100 * time.Millisecond):
}

// need to receive error to avoid timeout
<-serverErrCh

select {
case <-viewCh:
// Got this far, so the test passes
Expand Down Expand Up @@ -291,15 +310,18 @@ func TestStop_stopsPolling(t *testing.T) {

viewCh := make(chan *View)
errCh := make(chan error)
serverErrCh := make(chan error)

go view.poll(viewCh, errCh)
go view.poll(viewCh, errCh, serverErrCh)
view.stop()

select {
case v := <-viewCh:
t.Errorf("got unexpected view: %#v", v)
case err := <-errCh:
t.Error(err)
case err := <-serverErrCh:
t.Error(err)
case <-view.stopCh:
// Successfully stopped
}
Expand Down
15 changes: 14 additions & 1 deletion watch/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ type Watcher struct {
// errCh is the chan where any errors will be published.
errCh chan error

// serverCh is the chan where response errors from the server will be published
serverCh chan error

// blockQueryWaitTime is amount of time in seconds to do a blocking query for
blockQueryWaitTime time.Duration

Expand Down Expand Up @@ -95,6 +98,7 @@ func NewWatcher(i *NewWatcherInput) *Watcher {
depViewMap: make(map[string]*View),
dataCh: make(chan *View, dataBufferSize),
errCh: make(chan error),
serverCh: make(chan error),
maxStale: i.MaxStale,
once: i.Once,
blockQueryWaitTime: i.BlockQueryWaitTime,
Expand All @@ -121,6 +125,15 @@ func (w *Watcher) ErrCh() <-chan error {
return w.errCh
}

// ServerCh returns a read-only channel of errors returned by the server
// as a response to each Consul instance
func (w *Watcher) ServerCh() <-chan error {
if w == nil {
return nil
}
return w.serverCh
}

// Add adds the given dependency to the list of monitored dependencies
// and start the associated view. If the dependency already exists, no action is
// taken.
Expand Down Expand Up @@ -169,7 +182,7 @@ func (w *Watcher) Add(d dep.Dependency) (bool, error) {
log.Printf("[TRACE] (watcher) %s starting", d)

w.depViewMap[d.String()] = v
go v.poll(w.dataCh, w.errCh)
go v.poll(w.dataCh, w.errCh, w.serverCh)

return true, nil
}
Expand Down

0 comments on commit 70ede88

Please sign in to comment.