From 2e928e1ecbcf36f3b178f800e93d25d088cd2131 Mon Sep 17 00:00:00 2001
From: Alexander Yastrebov
Date: Mon, 16 Oct 2023 20:30:21 +0200
Subject: [PATCH 1/9] filters/scheduler: handle errors (#2680)

Make lifo, lifoGroup and fifo filters backend error aware and remove
cleanup from the proxy.

Follow up on #1054, #1086 and #2239

Updates #1238

Signed-off-by: Alexander Yastrebov
---
 filters/scheduler/cleanup_test.go | 85 +++++++++++++++++++++++++++++++
 filters/scheduler/fifo.go         |  6 +++
 filters/scheduler/lifo.go         | 14 ++++-
 proxy/proxy.go                    | 24 ---------
 4 files changed, 104 insertions(+), 25 deletions(-)
 create mode 100644 filters/scheduler/cleanup_test.go

diff --git a/filters/scheduler/cleanup_test.go b/filters/scheduler/cleanup_test.go
new file mode 100644
index 0000000000..a479173561
--- /dev/null
+++ b/filters/scheduler/cleanup_test.go
@@ -0,0 +1,85 @@
+package scheduler
+
+import (
+	"net/http"
+	"net/http/httptest"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/zalando/skipper/filters"
+	"github.com/zalando/skipper/proxy"
+	"github.com/zalando/skipper/routing"
+	"github.com/zalando/skipper/routing/testdataclient"
+	"github.com/zalando/skipper/scheduler"
+)
+
+func TestCleanupOnBackendErrors(t *testing.T) {
+	doc := `
+	aroute: *
+	-> lifo(1, 1, "100ms")
+	-> lifoGroup("foo", 1, 1, "100ms")
+	-> lifo(2, 2, "200ms")
+	-> lifoGroup("bar", 1, 1, "100ms")
+	-> fifo(1, 1, "200ms")
+	-> "http://test.invalid"
+	`
+
+	dc, err := testdataclient.NewDoc(doc)
+	require.NoError(t, err)
+	defer dc.Close()
+
+	reg := scheduler.RegistryWith(scheduler.Options{})
+	defer reg.Close()
+
+	fr := make(filters.Registry)
+	fr.Register(NewLIFO())
+	fr.Register(NewLIFOGroup())
+	fr.Register(NewFifo())
+
+	ro := routing.Options{
+		SignalFirstLoad: true,
+		FilterRegistry:  fr,
+		DataClients:     []routing.DataClient{dc},
+		PostProcessors:  []routing.PostProcessor{reg},
+	}
+
+	rt := routing.New(ro)
+	defer rt.Close()
+
+	<-rt.FirstLoad()
+
+	pr := proxy.WithParams(proxy.Params{
+		Routing: rt,
+	})
+	defer pr.Close()
+
+	ts := httptest.NewServer(pr)
+	defer ts.Close()
+
+	rsp, err := http.Get(ts.URL)
+	require.NoError(t, err)
+	rsp.Body.Close()
+
+	var route *routing.Route
+	{
+		req, err := http.NewRequest("GET", ts.URL, nil)
+		require.NoError(t, err)
+
+		route, _ = rt.Get().Do(req)
+		require.NotNil(t, route, "failed to lookup route")
+	}
+
+	for _, f := range route.Filters {
+		if qf, ok := f.Filter.(interface{ GetQueue() *scheduler.Queue }); ok {
+			status := qf.GetQueue().Status()
+			assert.Equal(t, scheduler.QueueStatus{ActiveRequests: 0, QueuedRequests: 0, Closed: false}, status)
+		} else if qf, ok := f.Filter.(interface{ GetQueue() *scheduler.FifoQueue }); ok {
+			status := qf.GetQueue().Status()
+			assert.Equal(t, scheduler.QueueStatus{ActiveRequests: 0, QueuedRequests: 0, Closed: false}, status)
+		} else {
+			t.Fatal("filter does not implement GetQueue()")
+		}
+	}
+}
diff --git a/filters/scheduler/fifo.go b/filters/scheduler/fifo.go
index 040d92a3ad..eca58e59fe 100644
--- a/filters/scheduler/fifo.go
+++ b/filters/scheduler/fifo.go
@@ -150,3 +150,9 @@ func (f *fifoFilter) Response(ctx filters.FilterContext) {
 	pending[last]()
 	ctx.StateBag()[fifoKey] = pending[:last]
 }
+
+// HandleErrorResponse is to opt-in for filters to get called
+// Response(ctx) in case of errors via proxy. It has to return true to opt-in.
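+//
+// The proxy presumably detects this method through an interface type
+// assertion on the filter, so that the queued request is released even
+// when the backend request fails.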
+func (f *fifoFilter) HandleErrorResponse() bool { + return true +} diff --git a/filters/scheduler/lifo.go b/filters/scheduler/lifo.go index 60db279a58..717580bcd9 100644 --- a/filters/scheduler/lifo.go +++ b/filters/scheduler/lifo.go @@ -243,7 +243,7 @@ func (l *lifoFilter) Close() error { // increase the number of inflight requests and respond to the caller, // if the bounded queue returns an error. Status code by Error: // -// - 503 if jobqueue.ErrQueueFull +// - 503 if jobqueue.ErrStackFull // - 502 if jobqueue.ErrTimeout func (l *lifoFilter) Request(ctx filters.FilterContext) { request(l.GetQueue(), scheduler.LIFOKey, ctx) @@ -255,6 +255,12 @@ func (l *lifoFilter) Response(ctx filters.FilterContext) { response(scheduler.LIFOKey, ctx) } +// HandleErrorResponse is to opt-in for filters to get called +// Response(ctx) in case of errors via proxy. It has to return true to opt-in. +func (l *lifoFilter) HandleErrorResponse() bool { + return true +} + func (l *lifoGroupFilter) Group() string { return l.name } @@ -300,6 +306,12 @@ func (l *lifoGroupFilter) Response(ctx filters.FilterContext) { response(scheduler.LIFOKey, ctx) } +// HandleErrorResponse is to opt-in for filters to get called +// Response(ctx) in case of errors via proxy. It has to return true to opt-in. +func (l *lifoGroupFilter) HandleErrorResponse() bool { + return true +} + func request(q *scheduler.Queue, key string, ctx filters.FilterContext) { if q == nil { ctx.Logger().Warnf("Unexpected scheduler.Queue is nil for key %s", key) diff --git a/proxy/proxy.go b/proxy/proxy.go index 2d2ef831d8..ba119efb3f 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -40,7 +40,6 @@ import ( "github.com/zalando/skipper/ratelimit" "github.com/zalando/skipper/rfc" "github.com/zalando/skipper/routing" - "github.com/zalando/skipper/scheduler" "github.com/zalando/skipper/tracing" ) @@ -1046,29 +1045,6 @@ func (p *Proxy) do(ctx *context) (err error) { return errMaxLoopbacksReached } - // this can be deleted after fixing - // https://github.com/zalando/skipper/issues/1238 problem - // happens if we get proxy errors, for example connect errors, - // which would block responses until fifo() timeouts. 
- defer func() { - stateBag := ctx.StateBag() - - pendingFIFO, _ := stateBag[scheduler.FIFOKey].([]func()) - for _, done := range pendingFIFO { - done() - } - - pendingLIFO, _ := stateBag[scheduler.LIFOKey].([]func()) - for _, done := range pendingLIFO { - done() - } - - // Cleanup state bag to avoid double call of done() - // because do() could be called for loopback backend - delete(stateBag, scheduler.FIFOKey) - delete(stateBag, scheduler.LIFOKey) - }() - // proxy global setting if !ctx.wasExecuted() { if settings, retryAfter := p.limiters.Check(ctx.request); retryAfter > 0 { From f85f4a811f6b66063af9db952fd0938ab2407113 Mon Sep 17 00:00:00 2001 From: Alexander Yastrebov Date: Thu, 19 Oct 2023 16:23:49 +0200 Subject: [PATCH 2/9] filters/scheduler: refactor fifo tests (#2693) * separate argument tests from behaviour test * use eskip to define filter configuration * use TestFifo prefix for all test names Signed-off-by: Alexander Yastrebov --- filters/scheduler/fifo_test.go | 260 ++++++++++----------------------- net/httptest/client.go | 6 - 2 files changed, 79 insertions(+), 187 deletions(-) diff --git a/filters/scheduler/fifo_test.go b/filters/scheduler/fifo_test.go index b0ed9bda38..e23a410e66 100644 --- a/filters/scheduler/fifo_test.go +++ b/filters/scheduler/fifo_test.go @@ -5,11 +5,11 @@ import ( "io" "net/http" stdlibhttptest "net/http/httptest" - "net/url" "testing" "time" "github.com/opentracing/opentracing-go/mocktracer" + "github.com/zalando/skipper/eskip" "github.com/zalando/skipper/filters" "github.com/zalando/skipper/metrics/metricstest" "github.com/zalando/skipper/net/httptest" @@ -19,11 +19,12 @@ import ( "github.com/zalando/skipper/scheduler" ) -func TestCreateFifoFilter(t *testing.T) { +func TestFifoCreateFilter(t *testing.T) { for _, tt := range []struct { name string args []interface{} wantParseErr bool + wantConfig scheduler.Config }{ { name: "fifo no args", @@ -51,6 +52,11 @@ func TestCreateFifoFilter(t *testing.T) { 5, "1s", }, + wantConfig: scheduler.Config{ + MaxConcurrency: 3, + MaxQueueSize: 5, + Timeout: 1 * time.Second, + }, }, { name: "fifo wrong type arg1", @@ -108,91 +114,59 @@ func TestCreateFifoFilter(t *testing.T) { if err == nil && tt.wantParseErr { t.Fatal("Failed to get wanted error on create filter") } + if tt.wantParseErr { + return + } - if _, ok := ff.(*fifoFilter); !ok && err == nil { + f, ok := ff.(*fifoFilter) + if !ok { t.Fatal("Failed to convert filter to *fifoFilter") } + + // validate config + config := f.Config() + if config != tt.wantConfig { + t.Fatalf("Failed to get Config, got: %v, want: %v", config, tt.wantConfig) + } + if f.queue != f.GetQueue() { + t.Fatal("Failed to get expected queue") + } }) } } func TestFifo(t *testing.T) { for _, tt := range []struct { - name string - args []interface{} - freq int - per time.Duration - backendTime time.Duration - clientTimeout time.Duration - wantConfig scheduler.Config - wantParseErr bool - wantOkRate float64 - epsilon float64 + name string + filter string + freq int + per time.Duration + backendTime time.Duration + wantOkRate float64 }{ { - name: "fifo defaults", - args: []interface{}{}, - wantParseErr: true, - }, - { - name: "fifo simple ok", - args: []interface{}{ - 3, - 5, - "1s", - }, - freq: 20, - per: 100 * time.Millisecond, - backendTime: 1 * time.Millisecond, - clientTimeout: time.Second, - wantConfig: scheduler.Config{ - MaxConcurrency: 3, - MaxQueueSize: 5, - Timeout: time.Second, - }, - wantParseErr: false, - wantOkRate: 1.0, - epsilon: 1, + name: "fifo simple ok", + filter: 
`fifo(3, 5, "1s")`, + freq: 20, + per: 100 * time.Millisecond, + backendTime: 1 * time.Millisecond, + wantOkRate: 1.0, }, { - name: "fifo with reaching max concurrency and queue timeouts", - args: []interface{}{ - 3, - 5, - "10ms", - }, - freq: 200, - per: 100 * time.Millisecond, - backendTime: 10 * time.Millisecond, - clientTimeout: time.Second, - wantConfig: scheduler.Config{ - MaxConcurrency: 3, - MaxQueueSize: 5, - Timeout: 10 * time.Millisecond, - }, - wantParseErr: false, - wantOkRate: 0.1, - epsilon: 1, + name: "fifo with reaching max concurrency and queue timeouts", + filter: `fifo(3, 5, "10ms")`, + freq: 200, + per: 100 * time.Millisecond, + backendTime: 10 * time.Millisecond, + wantOkRate: 0.1, }, { - name: "fifo with reaching max concurrency and queue full", - args: []interface{}{ - 1, - 1, - "250ms", - }, - freq: 200, - per: 100 * time.Millisecond, - backendTime: 100 * time.Millisecond, - clientTimeout: time.Second, - wantConfig: scheduler.Config{ - MaxConcurrency: 1, - MaxQueueSize: 1, - Timeout: 250 * time.Millisecond, - }, - wantParseErr: false, - wantOkRate: 0.0008, - epsilon: 1, + name: "fifo with reaching max concurrency and queue full", + filter: `fifo(3, 5, "250ms")`, + freq: 200, + per: 100 * time.Millisecond, + backendTime: 100 * time.Millisecond, + wantOkRate: 0.0008, }, } { t.Run(tt.name, func(t *testing.T) { @@ -201,29 +175,6 @@ func TestFifo(t *testing.T) { t.Fatalf("Failed to get name got %s want %s", fs.Name(), filters.FifoName) } - // no parse error - ff, err := fs.CreateFilter(tt.args) - if err != nil && !tt.wantParseErr { - t.Fatalf("Failed to parse filter: %v", err) - } - if err == nil && tt.wantParseErr { - t.Fatalf("want parse error but have no: %v", err) - } - if tt.wantParseErr { - return - } - - // validate config - if f, ok := ff.(*fifoFilter); ok { - config := f.Config() - if config != tt.wantConfig { - t.Fatalf("Failed to get Config, got: %v, want: %v", config, tt.wantConfig) - } - if f.queue != f.GetQueue() { - t.Fatal("Failed to get expected queue") - } - } - metrics := &metricstest.MockMetrics{} reg := scheduler.RegistryWith(scheduler.Options{ Metrics: metrics, @@ -241,22 +192,11 @@ func TestFifo(t *testing.T) { })) defer backend.Close() - var fmtStr string - switch len(tt.args) { - case 0: - fmtStr = `aroute: * -> fifo() -> "%s"` - case 1: - fmtStr = `aroute: * -> fifo(%v) -> "%s"` - case 2: - fmtStr = `aroute: * -> fifo(%v, %v) -> "%s"` - case 3: - fmtStr = `aroute: * -> fifo(%v, %v, "%v") -> "%s"` - default: - t.Fatalf("Test not possible %d >3", len(tt.args)) + if ff := eskip.MustParseFilters(tt.filter); len(ff) != 1 { + t.Fatalf("expected one filter, got %d", len(ff)) } - args := append(tt.args, backend.URL) - doc := fmt.Sprintf(fmtStr, args...) 
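+			// build the route document from the eskip filter snippet under test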
+ doc := fmt.Sprintf(`aroute: * -> %s -> "%s"`, tt.filter, backend.URL) t.Logf("%s", doc) dc, err := testdataclient.NewDoc(doc) @@ -285,14 +225,9 @@ func TestFifo(t *testing.T) { ts := stdlibhttptest.NewServer(pr) defer ts.Close() - reqURL, err := url.Parse(ts.URL) + rsp, err := ts.Client().Get(ts.URL) if err != nil { - t.Fatalf("Failed to parse url %s: %v", ts.URL, err) - } - - rsp, err := http.DefaultClient.Get(reqURL.String()) - if err != nil { - t.Fatalf("Failed to get response from %s: %v", reqURL.String(), err) + t.Fatalf("Failed to get response from %s: %v", ts.URL, err) } defer rsp.Body.Close() @@ -300,7 +235,9 @@ func TestFifo(t *testing.T) { t.Fatalf("Failed to get valid response from endpoint: %d", rsp.StatusCode) } - va := httptest.NewVegetaAttacker(reqURL.String(), tt.freq, tt.per, tt.clientTimeout) + const clientTimeout = 1 * time.Second + + va := httptest.NewVegetaAttacker(ts.URL, tt.freq, tt.per, clientTimeout) va.Attack(io.Discard, 1*time.Second, tt.name) t.Logf("Success [0..1]: %0.2f", va.Success()) @@ -327,40 +264,24 @@ func TestFifo(t *testing.T) { } } -func TestConstantRouteUpdatesFifo(t *testing.T) { +func TestFifoConstantRouteUpdates(t *testing.T) { for _, tt := range []struct { - name string - args []interface{} - freq int - per time.Duration - updateRate time.Duration - backendTime time.Duration - clientTimeout time.Duration - wantConfig scheduler.Config - wantParseErr bool - wantOkRate float64 - epsilon float64 + name string + filter string + freq int + per time.Duration + updateRate time.Duration + backendTime time.Duration + wantOkRate float64 }{ { - name: "fifo simple ok", - args: []interface{}{ - 3, - 5, - "1s", - }, - freq: 20, - per: 100 * time.Millisecond, - updateRate: 25 * time.Millisecond, - backendTime: 1 * time.Millisecond, - clientTimeout: time.Second, - wantConfig: scheduler.Config{ - MaxConcurrency: 3, - MaxQueueSize: 5, - Timeout: time.Second, - }, - wantParseErr: false, - wantOkRate: 1.0, - epsilon: 1, + name: "fifo simple ok", + filter: `fifo(3, 5, "1s")`, + freq: 20, + per: 100 * time.Millisecond, + updateRate: 25 * time.Millisecond, + backendTime: 1 * time.Millisecond, + wantOkRate: 1.0, }, } { t.Run(tt.name, func(t *testing.T) { @@ -369,29 +290,6 @@ func TestConstantRouteUpdatesFifo(t *testing.T) { t.Fatalf("Failed to get name got %s want %s", fs.Name(), filters.FifoName) } - // no parse error - ff, err := fs.CreateFilter(tt.args) - if err != nil && !tt.wantParseErr { - t.Fatalf("Failed to parse filter: %v", err) - } - if err == nil && tt.wantParseErr { - t.Fatalf("want parse error but have no: %v", err) - } - if tt.wantParseErr { - return - } - - // validate config - if f, ok := ff.(*fifoFilter); ok { - config := f.Config() - if config != tt.wantConfig { - t.Fatalf("Failed to get Config, got: %v, want: %v", config, tt.wantConfig) - } - if f.queue != f.GetQueue() { - t.Fatal("Failed to get expected queue") - } - } - metrics := &metricstest.MockMetrics{} reg := scheduler.RegistryWith(scheduler.Options{ Metrics: metrics, @@ -409,8 +307,11 @@ func TestConstantRouteUpdatesFifo(t *testing.T) { })) defer backend.Close() - args := append(tt.args, backend.URL) - doc := fmt.Sprintf(`aroute: * -> fifo(%v, %v, "%v") -> "%s"`, args...) 
+ if ff := eskip.MustParseFilters(tt.filter); len(ff) != 1 { + t.Fatalf("expected one filter, got %d", len(ff)) + } + + doc := fmt.Sprintf(`aroute: * -> %s -> "%s"`, tt.filter, backend.URL) dc, err := testdataclient.NewDoc(doc) if err != nil { @@ -438,14 +339,9 @@ func TestConstantRouteUpdatesFifo(t *testing.T) { ts := stdlibhttptest.NewServer(pr) defer ts.Close() - reqURL, err := url.Parse(ts.URL) + rsp, err := ts.Client().Get(ts.URL) if err != nil { - t.Fatalf("Failed to parse url %s: %v", ts.URL, err) - } - - rsp, err := http.DefaultClient.Get(reqURL.String()) - if err != nil { - t.Fatalf("Failed to get response from %s: %v", reqURL.String(), err) + t.Fatalf("Failed to get response from %s: %v", ts.URL, err) } defer rsp.Body.Close() @@ -475,7 +371,9 @@ func TestConstantRouteUpdatesFifo(t *testing.T) { }(quit, tt.updateRate, doc, newDoc) - va := httptest.NewVegetaAttacker(reqURL.String(), tt.freq, tt.per, tt.clientTimeout) + const clientTimeout = 1 * time.Second + + va := httptest.NewVegetaAttacker(ts.URL, tt.freq, tt.per, clientTimeout) va.Attack(io.Discard, 1*time.Second, tt.name) quit <- struct{}{} diff --git a/net/httptest/client.go b/net/httptest/client.go index 9b6b44f06b..778913cc0c 100644 --- a/net/httptest/client.go +++ b/net/httptest/client.go @@ -2,7 +2,6 @@ package httptest import ( "io" - "log" "strconv" "time" @@ -64,13 +63,8 @@ func (atk *VegetaAttacker) Attack(w io.Writer, d time.Duration, name string) { continue } atk.metrics.Add(res) - //metrics.Latencies.Add(res.Latency) } atk.metrics.Close() - // logrus.Info("histogram reporter:") - // histReporter := vegeta.NewHistogramReporter(atk.metrics.Histogram) - // histReporter.Report(os.Stdout) - log.Print("text reporter:") reporter := vegeta.NewTextReporter(atk.metrics) reporter.Report(w) } From 5d387a571fead893877fba3dba379b93223b28e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sandor=20Sz=C3=BCcs?= Date: Fri, 20 Oct 2023 13:34:55 +0200 Subject: [PATCH 3/9] feature: filter fifoWithBody (#2685) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feature: filter fifoWithBody that works similar to fifo(), but release deferred until body streaming to client was finished * fix: fifo() and fifoWithBody() with canceled requests Both filters did not check for canceled context from request before semaphore.Acquire, see https://github.com/golang/go/issues/63615 Signed-off-by: Sandor Szücs --- docs/reference/filters.md | 25 ++ filters/builtin/builtin.go | 1 + filters/filters.go | 1 + filters/scheduler/fifo.go | 52 ++-- filters/scheduler/fifo_test.go | 445 +++++++++++++++++++++++++++------ proxy/proxy.go | 11 + scheduler/scheduler.go | 14 ++ 7 files changed, 460 insertions(+), 89 deletions(-) diff --git a/docs/reference/filters.md b/docs/reference/filters.md index abfd5aaa70..fd72fbd1fd 100644 --- a/docs/reference/filters.md +++ b/docs/reference/filters.md @@ -2819,6 +2819,31 @@ Example: fifo(100, 150, "10s") ``` +### fifoWithBody + +This Filter is similar to the [lifo](#lifo) filter in regards to +parameters and status codes. +Performance considerations are similar to [fifo](#fifo). + +The difference between fifo and fifoWithBody is that fifo will decrement +the concurrency as soon as the backend sent response headers and +fifoWithBody will decrement the concurrency if the response body was +served. Normally both are very similar, but if you have a fully async +component that serves multiple website fragments, this would decrement +concurrency too early. 
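+
+For illustration, a route that guards a slow streaming backend (the
+backend address below is only a placeholder) could look like this:
+
+```
+r: * -> fifoWithBody(100, 150, "10s") -> "http://streaming-backend.test";
+```
+
+Here the reserved concurrency slot is only handed back once the last
+byte of the response body was written to the client.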
+ +Parameters: + +* MaxConcurrency specifies how many goroutines are allowed to work on this queue (int) +* MaxQueueSize sets the queue size (int) +* Timeout sets the timeout to get request scheduled (time) + +Example: + +``` +fifoWithBody(100, 150, "10s") +``` + ### lifo This Filter changes skipper to handle the route with a bounded last in diff --git a/filters/builtin/builtin.go b/filters/builtin/builtin.go index 0609d87412..d4e1895bf4 100644 --- a/filters/builtin/builtin.go +++ b/filters/builtin/builtin.go @@ -216,6 +216,7 @@ func Filters() []filters.Spec { auth.NewForwardToken(), auth.NewForwardTokenField(), scheduler.NewFifo(), + scheduler.NewFifoWithBody(), scheduler.NewLIFO(), scheduler.NewLIFOGroup(), rfc.NewPath(), diff --git a/filters/filters.go b/filters/filters.go index 6caaaa7a41..9f9a1f333a 100644 --- a/filters/filters.go +++ b/filters/filters.go @@ -327,6 +327,7 @@ const ( SetDynamicBackendUrl = "setDynamicBackendUrl" ApiUsageMonitoringName = "apiUsageMonitoring" FifoName = "fifo" + FifoWithBodyName = "fifoWithBody" LifoName = "lifo" LifoGroupName = "lifoGroup" RfcPathName = "rfcPath" diff --git a/filters/scheduler/fifo.go b/filters/scheduler/fifo.go index eca58e59fe..50ceb95b65 100644 --- a/filters/scheduler/fifo.go +++ b/filters/scheduler/fifo.go @@ -11,24 +11,31 @@ import ( "github.com/zalando/skipper/scheduler" ) -const ( - fifoKey string = "fifo" -) - type ( - fifoSpec struct{} + fifoSpec struct { + typ string + } fifoFilter struct { config scheduler.Config queue *scheduler.FifoQueue + typ string } ) func NewFifo() filters.Spec { - return &fifoSpec{} + return &fifoSpec{ + typ: filters.FifoName, + } +} + +func NewFifoWithBody() filters.Spec { + return &fifoSpec{ + typ: filters.FifoWithBodyName, + } } -func (*fifoSpec) Name() string { - return filters.FifoName +func (s *fifoSpec) Name() string { + return s.typ } // CreateFilter creates a fifoFilter, that will use a semaphore based @@ -65,6 +72,7 @@ func (s *fifoSpec) CreateFilter(args []interface{}) (filters.Filter, error) { } return &fifoFilter{ + typ: s.typ, config: scheduler.Config{ MaxConcurrency: cc, MaxQueueSize: qs, @@ -132,23 +140,29 @@ func (f *fifoFilter) Request(ctx filters.FilterContext) { } // ok - pending, _ := ctx.StateBag()[fifoKey].([]func()) - ctx.StateBag()[fifoKey] = append(pending, done) + pending, _ := ctx.StateBag()[f.typ].([]func()) + ctx.StateBag()[f.typ] = append(pending, done) } // Response will decrease the number of inflight requests to release // the concurrency reservation for the request. 
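+// For fifoWithBody the release is instead deferred until the proxy has
+// finished streaming the response body to the client.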
func (f *fifoFilter) Response(ctx filters.FilterContext) { - pending, ok := ctx.StateBag()[fifoKey].([]func()) - if !ok { - return - } - last := len(pending) - 1 - if last < 0 { - return + switch f.typ { + case filters.FifoName: + pending, ok := ctx.StateBag()[f.typ].([]func()) + if !ok { + return + } + last := len(pending) - 1 + if last < 0 { + return + } + pending[last]() + ctx.StateBag()[f.typ] = pending[:last] + + case filters.FifoWithBodyName: + // nothing to do here, handled in the proxy after copyStream() } - pending[last]() - ctx.StateBag()[fifoKey] = pending[:last] } // HandleErrorResponse is to opt-in for filters to get called diff --git a/filters/scheduler/fifo_test.go b/filters/scheduler/fifo_test.go index e23a410e66..c9f0093f20 100644 --- a/filters/scheduler/fifo_test.go +++ b/filters/scheduler/fifo_test.go @@ -1,14 +1,19 @@ package scheduler import ( + "bytes" "fmt" "io" "net/http" stdlibhttptest "net/http/httptest" + "strings" "testing" + "testing/iotest" "time" "github.com/opentracing/opentracing-go/mocktracer" + "github.com/sirupsen/logrus" + "github.com/zalando/skipper/eskip" "github.com/zalando/skipper/filters" "github.com/zalando/skipper/metrics/metricstest" @@ -19,7 +24,29 @@ import ( "github.com/zalando/skipper/scheduler" ) -func TestFifoCreateFilter(t *testing.T) { +func TestCreateFifoName(t *testing.T) { + for _, tt := range []struct { + name string + filterFunc func() filters.Spec + }{ + { + name: filters.FifoName, + filterFunc: NewFifo, + }, + { + name: filters.FifoWithBodyName, + filterFunc: NewFifoWithBody, + }, + } { + t.Run(tt.name, func(t *testing.T) { + if tt.filterFunc().Name() != tt.name { + t.Fatalf("got %q, want %q", tt.filterFunc().Name(), tt.name) + } + }) + } +} + +func TestCreateFifoFilter(t *testing.T) { for _, tt := range []struct { name string args []interface{} @@ -58,6 +85,33 @@ func TestFifoCreateFilter(t *testing.T) { Timeout: 1 * time.Second, }, }, + { + name: "fifo negative value arg1", + args: []interface{}{ + -3, + 5, + "1s", + }, + wantParseErr: true, + }, + { + name: "fifo negative value arg2", + args: []interface{}{ + 3, + -5, + "1s", + }, + wantParseErr: true, + }, + { + name: "fifo too small value arg3", + args: []interface{}{ + 3, + 5, + "1ns", + }, + wantParseErr: true, + }, { name: "fifo wrong type arg1", args: []interface{}{ @@ -106,43 +160,239 @@ func TestFifoCreateFilter(t *testing.T) { }, } { t.Run(tt.name, func(t *testing.T) { - spec := &fifoSpec{} - ff, err := spec.CreateFilter(tt.args) - if err != nil && !tt.wantParseErr { - t.Fatalf("Failed to parse filter: %v", err) + for _, f := range []func() filters.Spec{NewFifo, NewFifoWithBody} { + spec := f() + ff, err := spec.CreateFilter(tt.args) + if err != nil && !tt.wantParseErr { + t.Fatalf("Failed to parse filter: %v", err) + } + if err == nil && tt.wantParseErr { + t.Fatal("Failed to get wanted error on create filter") + } + + if _, ok := ff.(*fifoFilter); !ok && err == nil { + t.Fatal("Failed to convert filter to *fifoFilter") + } } - if err == nil && tt.wantParseErr { - t.Fatal("Failed to get wanted error on create filter") + }) + } +} + +type flusher struct { + w http.ResponseWriter +} + +func (f *flusher) Flush() { + f.w.(http.Flusher).Flush() +} + +func (f *flusher) Unwrap() http.ResponseWriter { + return f.w +} + +func (f *flusher) Write(p []byte) (n int, err error) { + n, err = f.w.Write(p) + if err == nil { + f.Flush() + } + return +} + +type slowReader struct { + r io.Reader + d time.Duration +} + +func (sr *slowReader) Read(p []byte) (int, error) { + 
logrus.Infof("slowReader: %d", len(p)) + if len(p) == 0 { + return 0, nil + } + time.Sleep(sr.d) + n, err := sr.r.Read(p) + logrus.Infof("slowReader: %d %v", n, err) + return n, err +} + +func TestFifoWithBody(t *testing.T) { + for _, tt := range []struct { + name string + args []interface{} + backendTime time.Duration + responseSize int + wantErr bool + }{ + { + name: "fifoWithBody 1024B with 1 queue should be ok", + args: []interface{}{1, 1, "1s"}, + backendTime: 10 * time.Millisecond, + responseSize: 1024, + }, + { + name: "fifoWithBody 1024B with 0 queue should fail", + args: []interface{}{1, 0, "10ms"}, + backendTime: 50 * time.Millisecond, + responseSize: 1024, + wantErr: true, + }, + { + name: "fifoWithBody 2x 1024B with 1 queue should be ok", + args: []interface{}{1, 1, "1s"}, + backendTime: 10 * time.Millisecond, + responseSize: 2 * 1024, + }, + { + name: "fifoWithBody 2x 1024B with 0 queue should fail", + args: []interface{}{1, 0, "15ms"}, + backendTime: 10 * time.Millisecond, + responseSize: 2 * 1024, + wantErr: true, + }, + } { + t.Run(tt.name, func(t *testing.T) { + + backend := stdlibhttptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Logf("backend path: %s", r.URL.Path) + buf := bytes.NewBufferString(strings.Repeat("A", tt.responseSize)) + halfReader := iotest.HalfReader(buf) + sr := &slowReader{ + d: 100 * time.Millisecond, + r: halfReader, + } + + w.WriteHeader(http.StatusOK) + // sleep here to test the difference between streaming response and not + time.Sleep(tt.backendTime) + // TODO: maybe better to do slow body streaming? + b := make([]byte, 1024) + io.CopyBuffer(&flusher{w}, sr, b) + })) + defer backend.Close() + + // proxy + metrics := &metricstest.MockMetrics{} + defer metrics.Close() + reg := scheduler.RegistryWith(scheduler.Options{ + Metrics: metrics, + EnableRouteFIFOMetrics: true, + }) + defer reg.Close() + fr := make(filters.Registry) + fr.Register(NewFifoWithBody()) + args := append(tt.args, backend.URL) + doc := fmt.Sprintf(`r: * -> fifoWithBody(%v, %v, "%v") -> "%s"`, args...) 
+ t.Logf("%s", doc) + dc, err := testdataclient.NewDoc(doc) + if err != nil { + t.Fatalf("Failed to create testdataclient: %v", err) } - if tt.wantParseErr { - return + defer dc.Close() + ro := routing.Options{ + SignalFirstLoad: true, + FilterRegistry: fr, + DataClients: []routing.DataClient{dc}, + PostProcessors: []routing.PostProcessor{reg}, } + rt := routing.New(ro) + defer rt.Close() + <-rt.FirstLoad() + tracer := &testTracer{MockTracer: mocktracer.New()} + pr := proxy.WithParams(proxy.Params{ + Routing: rt, + OpenTracing: &proxy.OpenTracingParams{Tracer: tracer}, + }) + defer pr.Close() + ts := stdlibhttptest.NewServer(pr) + defer ts.Close() - f, ok := ff.(*fifoFilter) - if !ok { - t.Fatal("Failed to convert filter to *fifoFilter") + // simple test + rsp, err := ts.Client().Get(ts.URL + "/test") + if err != nil { + t.Fatalf("Failed to get response from %s: %v", ts.URL, err) + } + defer rsp.Body.Close() + if rsp.StatusCode != http.StatusOK { + t.Fatalf("Failed to get valid response from endpoint: %d", rsp.StatusCode) + } + b, err := io.ReadAll(rsp.Body) + if err != nil { + t.Fatalf("Failed to read response body from: %v", err) + } + if len(b) != tt.responseSize { + t.Fatalf("Failed to read the size, got: %v, want: %v", len(b), tt.responseSize) } - // validate config - config := f.Config() - if config != tt.wantConfig { - t.Fatalf("Failed to get Config, got: %v, want: %v", config, tt.wantConfig) + t.Log("the streaming test") + // the streaming test + rspCH := make(chan *http.Response) + errCH := make(chan error) + defer func() { + close(rspCH) + close(errCH) + }() + waithCH := make(chan struct{}) + go func() { + rsp, err := ts.Client().Get(ts.URL + "/1") + t.Logf("rsp1: %s", rsp.Status) + close(waithCH) + if err != nil { + errCH <- err + } else { + rspCH <- rsp + } + }() + + <-waithCH + rsp2, err2 := ts.Client().Get(ts.URL + "/2") + t.Logf("rsp2: %s", rsp.Status) + if tt.wantErr { + n, err := io.Copy(io.Discard, rsp2.Body) + if n != 0 { + t.Fatalf("Failed to get error copied %d bytes, err: %v", n, err) + } + rsp2.Body.Close() + } else { + if err2 != nil { + t.Errorf("Failed to do 2nd request: %v", err2) + } else { + b, err2 := io.ReadAll(rsp2.Body) + if err2 != nil { + t.Errorf("Failed 2nd request to read body: %v", err2) + } + if len(b) != tt.responseSize { + t.Errorf("Failed 2nd request to get response size: %d, want: %d", len(b), tt.responseSize) + } + } } - if f.queue != f.GetQueue() { - t.Fatal("Failed to get expected queue") + + // read body from first request + select { + case err := <-errCH: + t.Fatalf("Failed to do request: %v", err) + case rsp := <-rspCH: + t.Logf("client1 got %s", rsp.Status) + b, err := io.ReadAll(rsp.Body) + if err != nil { + t.Fatalf("Failed to read body: %v", err) + } + if len(b) != tt.responseSize { + t.Fatalf("Failed to get response size: %d, want: %d", len(b), tt.responseSize) + } } + }) } } func TestFifo(t *testing.T) { for _, tt := range []struct { - name string - filter string - freq int - per time.Duration - backendTime time.Duration - wantOkRate float64 + name string + filter string + freq int + per time.Duration + backendTime time.Duration + clientTimeout time.Duration + wantOkRate float64 }{ { name: "fifo simple ok", @@ -153,28 +403,70 @@ func TestFifo(t *testing.T) { wantOkRate: 1.0, }, { - name: "fifo with reaching max concurrency and queue timeouts", - filter: `fifo(3, 5, "10ms")`, - freq: 200, - per: 100 * time.Millisecond, - backendTime: 10 * time.Millisecond, - wantOkRate: 0.1, + name: "fifoWithBody simple ok", + filter: `fifoWithBody(3, 
5, "1s")`,
+			freq:          20,
+			per:           100 * time.Millisecond,
+			backendTime:   1 * time.Millisecond,
+			clientTimeout: time.Second,
+			wantOkRate:    1.0,
+		},
+		{
+			name:          "fifo simple client canceled",
+			filter:        `fifo(3, 5, "1s")`,
+			freq:          20,
+			per:           100 * time.Millisecond,
+			backendTime:   1 * time.Millisecond,
+			clientTimeout: time.Nanosecond,
+			wantOkRate:    0,
+		},
+		{
+			name:          "fifoWithBody simple client canceled",
+			filter:        `fifoWithBody(3, 5, "1s")`,
+			freq:          20,
+			per:           100 * time.Millisecond,
+			backendTime:   1 * time.Millisecond,
+			clientTimeout: time.Nanosecond,
+			wantOkRate:    0,
+		},
 		{
-			name:        "fifo with reaching max concurrency and queue timeouts",
-			filter:      `fifo(3, 5, "10ms")`,
-			freq:        200,
-			per:         100 * time.Millisecond,
-			backendTime: 10 * time.Millisecond,
-			wantOkRate:  0.1,
+			name:          "fifo with reaching max concurrency and queue timeouts",
+			filter:        `fifo(3, 5, "10ms")`,
+			freq:          20,
+			per:           10 * time.Millisecond,
+			backendTime:   11 * time.Millisecond,
+			clientTimeout: time.Second,
+			wantOkRate:    0.005,
+		},
+		{
+			name:          "fifoWithBody with reaching max concurrency and queue timeouts",
+			filter:        `fifoWithBody(3, 5, "10ms")`,
+			freq:          20,
+			per:           10 * time.Millisecond,
+			backendTime:   11 * time.Millisecond,
+			clientTimeout: time.Second,
+			wantOkRate:    0.005,
 		},
 		{
-			name:        "fifo with reaching max concurrency and queue full",
-			filter:      `fifo(3, 5, "250ms")`,
-			freq:        200,
-			per:         100 * time.Millisecond,
-			backendTime: 100 * time.Millisecond,
-			wantOkRate:  0.0008,
+			name:          "fifo with reaching max concurrency and queue full",
+			filter:        `fifo(1, 1, "250ms")`,
+			freq:          200,
+			per:           100 * time.Millisecond,
+			backendTime:   100 * time.Millisecond,
+			clientTimeout: time.Second,
+			wantOkRate:    0.0008,
+		},
+		{
+			name:          "fifoWithBody with reaching max concurrency and queue full",
+			filter:        `fifoWithBody(1, 1, "250ms")`,
+			freq:          200,
+			per:           100 * time.Millisecond,
+			backendTime:   100 * time.Millisecond,
+			clientTimeout: time.Second,
+			wantOkRate:    0.0008,
 		},
 	} {
 		t.Run(tt.name, func(t *testing.T) {
-			fs := NewFifo()
-			if fs.Name() != filters.FifoName {
-				t.Fatalf("Failed to get name got %s want %s", fs.Name(), filters.FifoName)
-			}
-
 			metrics := &metricstest.MockMetrics{}
 			reg := scheduler.RegistryWith(scheduler.Options{
 				Metrics:                metrics,
@@ -183,7 +475,8 @@ func TestFifo(t *testing.T) {
 			defer reg.Close()
 
 			fr := make(filters.Registry)
-			fr.Register(fs)
+			fr.Register(NewFifo())
+			fr.Register(NewFifoWithBody())
 
 			backend := stdlibhttptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 				time.Sleep(tt.backendTime)
@@ -235,18 +528,26 @@ func TestFifo(t *testing.T) {
 				t.Fatalf("Failed to get valid response from endpoint: %d", rsp.StatusCode)
 			}
 
-			const clientTimeout = 1 * time.Second
-
-			va := httptest.NewVegetaAttacker(ts.URL, tt.freq, tt.per, clientTimeout)
+			va := httptest.NewVegetaAttacker(ts.URL, tt.freq, tt.per, tt.clientTimeout)
 			va.Attack(io.Discard, 1*time.Second, tt.name)
 
 			t.Logf("Success [0..1]: %0.2f", va.Success())
 			t.Logf("requests: %d", va.TotalRequests())
 
+			count200, _ := va.CountStatus(200)
+			count499, _ := va.CountStatus(0)
+			count502, _ := va.CountStatus(502)
+			count503, _ := va.CountStatus(503)
+			t.Logf("status 200: %d", count200)
+			t.Logf("status 499: %d", count499)
+			t.Logf("status 502: %d", count502)
+			t.Logf("status 503: %d", count503)
+
 			got := va.TotalSuccess()
 			want := tt.wantOkRate * float64(va.TotalRequests())
 			if got < want {
 				t.Fatalf("OK rate too low got<want: %0.0f < %0.0f", got, want)
 			}
 			countOK, ok := va.CountStatus(http.StatusOK)
 			if !ok && tt.wantOkRate > 0 {
 				t.Fatal("no OK")
 			}
@@ -266,30 +567,36 @@ func TestFifo(t *testing.T) {
 	}
 }
 
 func TestFifoConstantRouteUpdates(t *testing.T) {
 	for _, tt := range []struct {
-		name        string
-		filter      string
-		freq        int
-		per 
time.Duration + updateRate time.Duration + backendTime time.Duration + clientTimeout time.Duration + wantOkRate float64 }{ { - name: "fifo simple ok", - filter: `fifo(3, 5, "1s")`, - freq: 20, - per: 100 * time.Millisecond, - updateRate: 25 * time.Millisecond, - backendTime: 1 * time.Millisecond, - wantOkRate: 1.0, + name: "fifo simple ok", + filter: `fifo(3, 5, "1s")`, + freq: 20, + per: 100 * time.Millisecond, + updateRate: 25 * time.Millisecond, + backendTime: 1 * time.Millisecond, + clientTimeout: time.Second, + wantOkRate: 1.0, + }, { + name: "fifoWithBody simple ok", + filter: `fifoWithBody(3, 5, "1s")`, + freq: 20, + per: 100 * time.Millisecond, + updateRate: 25 * time.Millisecond, + backendTime: 1 * time.Millisecond, + clientTimeout: time.Second, + wantOkRate: 1.0, }, } { t.Run(tt.name, func(t *testing.T) { - fs := NewFifo() - if fs.Name() != filters.FifoName { - t.Fatalf("Failed to get name got %s want %s", fs.Name(), filters.FifoName) - } - metrics := &metricstest.MockMetrics{} reg := scheduler.RegistryWith(scheduler.Options{ Metrics: metrics, @@ -298,7 +605,8 @@ func TestFifoConstantRouteUpdates(t *testing.T) { defer reg.Close() fr := make(filters.Registry) - fr.Register(fs) + fr.Register(NewFifo()) + fr.Register(NewFifoWithBody()) backend := stdlibhttptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { time.Sleep(tt.backendTime) @@ -312,7 +620,6 @@ func TestFifoConstantRouteUpdates(t *testing.T) { } doc := fmt.Sprintf(`aroute: * -> %s -> "%s"`, tt.filter, backend.URL) - dc, err := testdataclient.NewDoc(doc) if err != nil { t.Fatalf("Failed to create testdataclient: %v", err) @@ -351,7 +658,7 @@ func TestFifoConstantRouteUpdates(t *testing.T) { // run dataclient updates quit := make(chan struct{}) - newDoc := fmt.Sprintf(`aroute: * -> fifo(100, 200, "250ms") -> "%s"`, backend.URL) + newDoc := fmt.Sprintf(`aroute: * -> %s -> "%s"`, tt.filter, backend.URL) go func(q chan<- struct{}, updateRate time.Duration, doc1, doc2 string) { i := 0 for { @@ -371,9 +678,7 @@ func TestFifoConstantRouteUpdates(t *testing.T) { }(quit, tt.updateRate, doc, newDoc) - const clientTimeout = 1 * time.Second - - va := httptest.NewVegetaAttacker(ts.URL, tt.freq, tt.per, clientTimeout) + va := httptest.NewVegetaAttacker(ts.URL, tt.freq, tt.per, tt.clientTimeout) va.Attack(io.Discard, 1*time.Second, tt.name) quit <- struct{}{} diff --git a/proxy/proxy.go b/proxy/proxy.go index ba119efb3f..3498185683 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -1454,6 +1454,7 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { err := p.do(ctx) + // writeTimeout() filter if d, ok := ctx.StateBag()[filters.WriteTimeout].(time.Duration); ok { e := ctx.ResponseController().SetWriteDeadline(time.Now().Add(d)) if e != nil { @@ -1461,12 +1462,22 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } + // stream response body to client if err != nil { p.errorResponse(ctx, err) } else { p.serveResponse(ctx) } + // fifoWtihBody() filter + if sbf, ok := ctx.StateBag()[filters.FifoWithBodyName]; ok { + if fs, ok := sbf.([]func()); ok { + for i := len(fs) - 1; i >= 0; i-- { + fs[i]() + } + } + } + if ctx.cancelBackendContext != nil { ctx.cancelBackendContext() } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 5da0dd803e..e2e7de35e4 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -155,6 +155,20 @@ func (fq *fifoQueue) wait(ctx context.Context) (func(), error) { cnt := fq.counter fq.mu.RUnlock() + // check request context 
expired + // https://github.com/golang/go/issues/63615 + if err := ctx.Err(); err != nil { + switch err { + case context.DeadlineExceeded: + return nil, ErrQueueTimeout + case context.Canceled: + return nil, ErrClientCanceled + default: + // does not exist yet in Go stdlib as of Go1.18.4 + return nil, err + } + } + // handle queue all := cnt.Add(1) // queue full? From 912f318c015bad8fb3ca8be30d94f3df084e3567 Mon Sep 17 00:00:00 2001 From: Mustafa Saber Date: Wed, 25 Oct 2023 16:14:46 +0200 Subject: [PATCH 4/9] Prevent multiple items per yaml item (#2691) * Don't allow more than one filter/predicate per yaml/json item Signed-off-by: Mustafa Abdelrahman --- cmd/webhook/admission/admission_test.go | 10 +++ .../testdata/rg/rg-with-multiple-filters.json | 49 +++++++++++++ .../rg/rg-with-multiple-predicates.json | 49 +++++++++++++ .../definitions/definitions_test.go | 8 ++- .../kubernetes/definitions/routegroups.go | 2 - .../definitions/routegroupvalidator.go | 32 +++++---- .../testdata/errorwrapdata/errors.log | 6 ++ .../testdata/errorwrapdata/routegroups.json | 71 ++++++++++++++++++- .../validation/route-with-empty-filter.log | 3 +- .../validation/route-with-empty-predicate.log | 3 +- .../route-with-multiple-filters-one-item.log | 1 + .../route-with-multiple-filters-one-item.yaml | 25 +++++++ ...oute-with-multiple-predicates-one-item.log | 1 + ...ute-with-multiple-predicates-one-item.yaml | 26 +++++++ 14 files changed, 265 insertions(+), 21 deletions(-) create mode 100644 cmd/webhook/admission/testdata/rg/rg-with-multiple-filters.json create mode 100644 cmd/webhook/admission/testdata/rg/rg-with-multiple-predicates.json create mode 100644 dataclients/kubernetes/definitions/testdata/errorwrapdata/errors.log create mode 100644 dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-filters-one-item.log create mode 100644 dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-filters-one-item.yaml create mode 100644 dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-predicates-one-item.log create mode 100644 dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-predicates-one-item.yaml diff --git a/cmd/webhook/admission/admission_test.go b/cmd/webhook/admission/admission_test.go index d3869de555..7cb7ca7038 100644 --- a/cmd/webhook/admission/admission_test.go +++ b/cmd/webhook/admission/admission_test.go @@ -122,6 +122,16 @@ func TestRouteGroupAdmitter(t *testing.T) { inputFile: "rg-with-invalid-eskip-filters-and-predicates.json", message: "parse failed after token status, last route id: , position 11: syntax error\\nparse failed after token Method, last route id: Method, position 6: syntax error", }, + { + name: "invalid routgroup multiple filters per json/yaml array item", + inputFile: "rg-with-multiple-filters.json", + message: `single filter expected at \"status(201) -> inlineContent(\"hi\")\"\nsingle filter expected at \" \"`, + }, + { + name: "invalid routgroup multiple predicates per json/yaml array item", + inputFile: "rg-with-multiple-predicates.json", + message: `single predicate expected at \"Method(\"GET\") && Path(\"/\")\"\nsingle predicate expected at \" \"`, + }, } { t.Run(tc.name, func(t *testing.T) { expectedResponse := responseAllowedFmt diff --git a/cmd/webhook/admission/testdata/rg/rg-with-multiple-filters.json b/cmd/webhook/admission/testdata/rg/rg-with-multiple-filters.json new file mode 100644 index 0000000000..02abcfd9a6 --- /dev/null +++ 
b/cmd/webhook/admission/testdata/rg/rg-with-multiple-filters.json @@ -0,0 +1,49 @@ +{ + "request": { + "uid": "req-uid", + "name": "req1", + "operation": "create", + "kind": { + "group": "zalando", + "version": "v1", + "kind": "RouteGroup" + }, + "namespace": "n1", + "object": { + "metadata": { + "name": "rg1", + "namespace": "n1" + }, + "spec": { + "backends": [ + { + "name": "backend", + "type": "shunt" + } + ], + "defaultBackends": [ + { + "backendName": "backend" + } + ], + "routes": [ + { + "backends": [ + { + "backendName": "backend" + } + ], + "filters": [ + "status(201) -> inlineContent(\"hi\")", + " " + ], + "path": "/", + "predicates": [ + "Method(\"GET\")" + ] + } + ] + } + } + } +} diff --git a/cmd/webhook/admission/testdata/rg/rg-with-multiple-predicates.json b/cmd/webhook/admission/testdata/rg/rg-with-multiple-predicates.json new file mode 100644 index 0000000000..47090f6a83 --- /dev/null +++ b/cmd/webhook/admission/testdata/rg/rg-with-multiple-predicates.json @@ -0,0 +1,49 @@ +{ + "request": { + "uid": "req-uid", + "name": "req1", + "operation": "create", + "kind": { + "group": "zalando", + "version": "v1", + "kind": "RouteGroup" + }, + "namespace": "n1", + "object": { + "metadata": { + "name": "rg1", + "namespace": "n1" + }, + "spec": { + "backends": [ + { + "name": "backend", + "type": "shunt" + } + ], + "defaultBackends": [ + { + "backendName": "backend" + } + ], + "routes": [ + { + "backends": [ + { + "backendName": "backend" + } + ], + "filters": [ + "status(201)" + ], + "path": "/", + "predicates": [ + "Method(\"GET\") && Path(\"/\")", + " " + ] + } + ] + } + } + } +} diff --git a/dataclients/kubernetes/definitions/definitions_test.go b/dataclients/kubernetes/definitions/definitions_test.go index 46d98586cd..5edab7fa4c 100644 --- a/dataclients/kubernetes/definitions/definitions_test.go +++ b/dataclients/kubernetes/definitions/definitions_test.go @@ -2,6 +2,7 @@ package definitions_test import ( "os" + "strings" "testing" "github.com/stretchr/testify/assert" @@ -18,10 +19,15 @@ func TestValidateRouteGroups(t *testing.T) { data, err := os.ReadFile("testdata/errorwrapdata/routegroups.json") require.NoError(t, err) + logs, err := os.ReadFile("testdata/errorwrapdata/errors.log") + require.NoError(t, err) + + logsString := strings.TrimSuffix(string(logs), "\n") + rgl, err := definitions.ParseRouteGroupsJSON(data) require.NoError(t, err) err = definitions.ValidateRouteGroups(&rgl) - assert.EqualError(t, err, "route group without name\nerror in route group default/rg1: route group without backend") + assert.EqualError(t, err, logsString) } diff --git a/dataclients/kubernetes/definitions/routegroups.go b/dataclients/kubernetes/definitions/routegroups.go index 626880e7d5..922d363149 100644 --- a/dataclients/kubernetes/definitions/routegroups.go +++ b/dataclients/kubernetes/definitions/routegroups.go @@ -25,8 +25,6 @@ var ( errRouteGroupWithoutName = errors.New("route group without name") errRouteGroupWithoutSpec = errors.New("route group without spec") errInvalidRouteSpec = errors.New("invalid route spec") - errInvalidPredicate = errors.New("invalid predicate") - errInvalidFilter = errors.New("invalid filter") errInvalidMethod = errors.New("invalid method") errBothPathAndPathSubtree = errors.New("path and path subtree in the same route") errMissingBackendReference = errors.New("missing backend reference") diff --git a/dataclients/kubernetes/definitions/routegroupvalidator.go b/dataclients/kubernetes/definitions/routegroupvalidator.go index 9159ea6703..eb6e72f455 100644 --- 
a/dataclients/kubernetes/definitions/routegroupvalidator.go +++ b/dataclients/kubernetes/definitions/routegroupvalidator.go @@ -1,11 +1,19 @@ package definitions import ( + "errors" + "fmt" + "github.com/zalando/skipper/eskip" ) type RouteGroupValidator struct{} +var ( + errSingleFilterExpected = errors.New("single filter expected") + errSinglePredicateExpected = errors.New("single predicate expected") +) + var defaultRouteGroupValidator = &RouteGroupValidator{} // ValidateRouteGroup validates a RouteGroupItem @@ -56,8 +64,12 @@ func (rgv *RouteGroupValidator) filtersValidation(item *RouteGroupItem) error { var errs []error for _, r := range item.Spec.Routes { for _, f := range r.Filters { - _, err := eskip.ParseFilters(f) - errs = append(errs, err) + filters, err := eskip.ParseFilters(f) + if err != nil { + errs = append(errs, err) + } else if len(filters) != 1 { + errs = append(errs, fmt.Errorf("%w at \"%s\"", errSingleFilterExpected, f)) + } } } @@ -68,8 +80,12 @@ func (rgv *RouteGroupValidator) predicatesValidation(item *RouteGroupItem) error var errs []error for _, r := range item.Spec.Routes { for _, p := range r.Predicates { - _, err := eskip.ParsePredicates(p) - errs = append(errs, err) + predicates, err := eskip.ParsePredicates(p) + if err != nil { + errs = append(errs, err) + } else if len(predicates) != 1 { + errs = append(errs, fmt.Errorf("%w at \"%s\"", errSinglePredicateExpected, p)) + } } } return errorsJoin(errs...) @@ -131,14 +147,6 @@ func (r *RouteSpec) validate(hasDefault bool, backends map[string]bool) error { return errBothPathAndPathSubtree } - if hasEmpty(r.Predicates) { - return errInvalidPredicate - } - - if hasEmpty(r.Filters) { - return errInvalidFilter - } - if hasEmpty(r.Methods) { return errInvalidMethod } diff --git a/dataclients/kubernetes/definitions/testdata/errorwrapdata/errors.log b/dataclients/kubernetes/definitions/testdata/errorwrapdata/errors.log new file mode 100644 index 0000000000..6a5ecf8970 --- /dev/null +++ b/dataclients/kubernetes/definitions/testdata/errorwrapdata/errors.log @@ -0,0 +1,6 @@ +route group without name +error in route group default/rg1: route group without backend +single predicate expected at "Path("/foo") && Method("GET")" +single predicate expected at "" +single filter expected at "inlineContent("/foo") -> status(200)" +single filter expected at " " diff --git a/dataclients/kubernetes/definitions/testdata/errorwrapdata/routegroups.json b/dataclients/kubernetes/definitions/testdata/errorwrapdata/routegroups.json index b2d7b9a315..270d491ec9 100644 --- a/dataclients/kubernetes/definitions/testdata/errorwrapdata/routegroups.json +++ b/dataclients/kubernetes/definitions/testdata/errorwrapdata/routegroups.json @@ -45,10 +45,41 @@ "kind": "RouteGroup", "spec": { "backends": [ - { + { + "name": "shunt", + "type": "shunt" + } + ], + "hosts": [ + "test.example.com" + ], + "routes": [ + { + "backends": [ + { + "backendName": "shunt" + } + ], + "filters": [ + "inlineContent(\"/foo\")" + ], + "path": "/foo" + } + ] + } + }, + { + "apiVersion": "zalando.org/v1", + "metadata": { + "name": "rg1" + }, + "kind": "RouteGroup", + "spec": { + "backends": [ + { "name": "shunt", "type": "shunt" - } + } ], "hosts": [ "test.example.com" @@ -60,6 +91,10 @@ "backendName": "shunt" } ], + "predicates": [ + "Path(\"/foo\") && Method(\"GET\")", + "" + ], "filters": [ "inlineContent(\"/foo\")" ], @@ -67,6 +102,38 @@ } ] } + }, + { + "apiVersion": "zalando.org/v1", + "metadata": { + "name": "rg1" + }, + "kind": "RouteGroup", + "spec": { + "backends": [ + { 
+ "name": "shunt", + "type": "shunt" + } + ], + "hosts": [ + "test.example.com" + ], + "routes": [ + { + "backends": [ + { + "backendName": "shunt" + } + ], + "filters": [ + "inlineContent(\"/foo\") -> status(200)", + " " + ], + "path": "/foo" + } + ] + } } ] } diff --git a/dataclients/kubernetes/definitions/testdata/validation/route-with-empty-filter.log b/dataclients/kubernetes/definitions/testdata/validation/route-with-empty-filter.log index 1ccd7d1fe1..6300163089 100644 --- a/dataclients/kubernetes/definitions/testdata/validation/route-with-empty-filter.log +++ b/dataclients/kubernetes/definitions/testdata/validation/route-with-empty-filter.log @@ -1,2 +1 @@ -test-route-group -invalid filter +single filter expected diff --git a/dataclients/kubernetes/definitions/testdata/validation/route-with-empty-predicate.log b/dataclients/kubernetes/definitions/testdata/validation/route-with-empty-predicate.log index d222d6d555..6ec8ec4cbd 100644 --- a/dataclients/kubernetes/definitions/testdata/validation/route-with-empty-predicate.log +++ b/dataclients/kubernetes/definitions/testdata/validation/route-with-empty-predicate.log @@ -1,2 +1 @@ -test-route-group -invalid predicate +single predicate expected diff --git a/dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-filters-one-item.log b/dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-filters-one-item.log new file mode 100644 index 0000000000..6300163089 --- /dev/null +++ b/dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-filters-one-item.log @@ -0,0 +1 @@ +single filter expected diff --git a/dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-filters-one-item.yaml b/dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-filters-one-item.yaml new file mode 100644 index 0000000000..c9fe72cdc2 --- /dev/null +++ b/dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-filters-one-item.yaml @@ -0,0 +1,25 @@ +apiVersion: zalando.org/v1 +kind: RouteGroup +metadata: + name: test-route-group +spec: + hosts: + - example.org + backends: + - name: app + type: service + serviceName: app-svc + servicePort: 80 + defaultBackends: + - backendName: app + routes: + - path: / + methods: + - GET + - HEAD + predicates: + - Foo("X-Bar", "42") + filters: + - foo(42) -> bar(24) + backends: + - backendName: app diff --git a/dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-predicates-one-item.log b/dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-predicates-one-item.log new file mode 100644 index 0000000000..6ec8ec4cbd --- /dev/null +++ b/dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-predicates-one-item.log @@ -0,0 +1 @@ +single predicate expected diff --git a/dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-predicates-one-item.yaml b/dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-predicates-one-item.yaml new file mode 100644 index 0000000000..3bdd72f2d0 --- /dev/null +++ b/dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-predicates-one-item.yaml @@ -0,0 +1,26 @@ +apiVersion: zalando.org/v1 +kind: RouteGroup +metadata: + name: test-route-group +spec: + hosts: + - example.org + backends: + - name: app + type: service + serviceName: app-svc + servicePort: 80 + defaultBackends: + - backendName: app + routes: + - path: / + methods: + - GET + - HEAD + predicates: + - Foo("X-Bar", 
"42") && Bar("X-Foo", "24") + filters: + - foo(42) + - bar(24) + backends: + - backendName: app From 7b433757c323ec3ab1183ef5f47a2b632283d119 Mon Sep 17 00:00:00 2001 From: Mustafa Saber Date: Thu, 26 Oct 2023 12:21:16 +0200 Subject: [PATCH 5/9] Use `%q` instead of `%s` for strings with quotes (#2700) Signed-off-by: Mustafa Abdelrahman --- cmd/webhook/admission/admission_test.go | 4 ++-- dataclients/kubernetes/definitions/routegroupvalidator.go | 4 ++-- .../kubernetes/definitions/testdata/errorwrapdata/errors.log | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cmd/webhook/admission/admission_test.go b/cmd/webhook/admission/admission_test.go index 7cb7ca7038..3fa9a07f7f 100644 --- a/cmd/webhook/admission/admission_test.go +++ b/cmd/webhook/admission/admission_test.go @@ -125,12 +125,12 @@ func TestRouteGroupAdmitter(t *testing.T) { { name: "invalid routgroup multiple filters per json/yaml array item", inputFile: "rg-with-multiple-filters.json", - message: `single filter expected at \"status(201) -> inlineContent(\"hi\")\"\nsingle filter expected at \" \"`, + message: `single filter expected at \"status(201) -> inlineContent(\\\"hi\\\")\"\nsingle filter expected at \" \"`, }, { name: "invalid routgroup multiple predicates per json/yaml array item", inputFile: "rg-with-multiple-predicates.json", - message: `single predicate expected at \"Method(\"GET\") && Path(\"/\")\"\nsingle predicate expected at \" \"`, + message: `single predicate expected at \"Method(\\\"GET\\\") && Path(\\\"/\\\")\"\nsingle predicate expected at \" \"`, }, } { t.Run(tc.name, func(t *testing.T) { diff --git a/dataclients/kubernetes/definitions/routegroupvalidator.go b/dataclients/kubernetes/definitions/routegroupvalidator.go index eb6e72f455..4911f7910c 100644 --- a/dataclients/kubernetes/definitions/routegroupvalidator.go +++ b/dataclients/kubernetes/definitions/routegroupvalidator.go @@ -68,7 +68,7 @@ func (rgv *RouteGroupValidator) filtersValidation(item *RouteGroupItem) error { if err != nil { errs = append(errs, err) } else if len(filters) != 1 { - errs = append(errs, fmt.Errorf("%w at \"%s\"", errSingleFilterExpected, f)) + errs = append(errs, fmt.Errorf("%w at %q", errSingleFilterExpected, f)) } } } @@ -84,7 +84,7 @@ func (rgv *RouteGroupValidator) predicatesValidation(item *RouteGroupItem) error if err != nil { errs = append(errs, err) } else if len(predicates) != 1 { - errs = append(errs, fmt.Errorf("%w at \"%s\"", errSinglePredicateExpected, p)) + errs = append(errs, fmt.Errorf("%w at %q", errSinglePredicateExpected, p)) } } } diff --git a/dataclients/kubernetes/definitions/testdata/errorwrapdata/errors.log b/dataclients/kubernetes/definitions/testdata/errorwrapdata/errors.log index 6a5ecf8970..836138e99f 100644 --- a/dataclients/kubernetes/definitions/testdata/errorwrapdata/errors.log +++ b/dataclients/kubernetes/definitions/testdata/errorwrapdata/errors.log @@ -1,6 +1,6 @@ route group without name error in route group default/rg1: route group without backend -single predicate expected at "Path("/foo") && Method("GET")" +single predicate expected at "Path(\"/foo\") && Method(\"GET\")" single predicate expected at "" -single filter expected at "inlineContent("/foo") -> status(200)" +single filter expected at "inlineContent(\"/foo\") -> status(200)" single filter expected at " " From 08434c6c29e8c6374fc9d4a264e8f4d013152fa6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sandor=20Sz=C3=BCcs?= Date: Wed, 1 Nov 2023 10:44:03 +0100 Subject: [PATCH 6/9] feature: routesrv add prometheus metrics 
(#2710) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit feature: routesrv enable profiling feature: routesrv split supportListener for metrics and profiling from the main listener feature: routesrv shutdown supportServer gracefully test: routesrv shutdown main listener and supportListener test: add more routesrv coverage to reach close to 80% doc: add routesrv metrics and profiling docs Signed-off-by: Sandor Szücs --- docs/operation/operation.md | 202 +++++++++++++++++++++++++++++++++++- routesrv/eskipbytes.go | 52 +++++++++- routesrv/polling.go | 42 +++----- routesrv/redishandler.go | 4 +- routesrv/routesrv.go | 118 ++++++++++++++++----- routesrv/routesrv_test.go | 15 +++ routesrv/shutdown_test.go | 187 +++++++++++++++++++++++++++++++++ 7 files changed, 559 insertions(+), 61 deletions(-) create mode 100644 routesrv/shutdown_test.go diff --git a/docs/operation/operation.md b/docs/operation/operation.md index e40feee974..2ae08caf5d 100644 --- a/docs/operation/operation.md +++ b/docs/operation/operation.md @@ -359,7 +359,9 @@ utilized applications (less than 100 requests per second): Metrics from the [go runtime memstats](https://golang.org/pkg/runtime/#MemStats) are exposed from skipper to the metrics endpoint, default listener -:9911, on path /metrics : +:9911, on path /metrics + +#### Go metrics - Codahale ```json "gauges": { @@ -464,6 +466,119 @@ are exposed from skipper to the metrics endpoint, default listener } ``` +#### Go metrics - Prometheus + +``` +# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles. +# TYPE go_gc_duration_seconds summary +go_gc_duration_seconds{quantile="0"} 4.7279e-05 +go_gc_duration_seconds{quantile="0.25"} 5.9291e-05 +go_gc_duration_seconds{quantile="0.5"} 7.4e-05 +go_gc_duration_seconds{quantile="0.75"} 9.55e-05 +go_gc_duration_seconds{quantile="1"} 0.000199667 +go_gc_duration_seconds_sum 0.001108339 +go_gc_duration_seconds_count 13 +# HELP go_goroutines Number of goroutines that currently exist. +# TYPE go_goroutines gauge +go_goroutines 13 +# HELP go_info Information about the Go environment. +# TYPE go_info gauge +go_info{version="go1.21.3"} 1 +# HELP go_memstats_alloc_bytes Number of bytes allocated and still in use. +# TYPE go_memstats_alloc_bytes gauge +go_memstats_alloc_bytes 6.4856e+06 +# HELP go_memstats_alloc_bytes_total Total number of bytes allocated, even if freed. +# TYPE go_memstats_alloc_bytes_total counter +go_memstats_alloc_bytes_total 4.1797384e+07 +# HELP go_memstats_buck_hash_sys_bytes Number of bytes used by the profiling bucket hash table. +# TYPE go_memstats_buck_hash_sys_bytes gauge +go_memstats_buck_hash_sys_bytes 1.462151e+06 +# HELP go_memstats_frees_total Total number of frees. +# TYPE go_memstats_frees_total counter +go_memstats_frees_total 507460 +# HELP go_memstats_gc_sys_bytes Number of bytes used for garbage collection system metadata. +# TYPE go_memstats_gc_sys_bytes gauge +go_memstats_gc_sys_bytes 4.549296e+06 +# HELP go_memstats_heap_alloc_bytes Number of heap bytes allocated and still in use. +# TYPE go_memstats_heap_alloc_bytes gauge +go_memstats_heap_alloc_bytes 6.4856e+06 +# HELP go_memstats_heap_idle_bytes Number of heap bytes waiting to be used. +# TYPE go_memstats_heap_idle_bytes gauge +go_memstats_heap_idle_bytes 7.421952e+06 +# HELP go_memstats_heap_inuse_bytes Number of heap bytes that are in use. 
+# TYPE go_memstats_heap_inuse_bytes gauge +go_memstats_heap_inuse_bytes 8.372224e+06 +# HELP go_memstats_heap_objects Number of allocated objects. +# TYPE go_memstats_heap_objects gauge +go_memstats_heap_objects 70159 +# HELP go_memstats_heap_released_bytes Number of heap bytes released to OS. +# TYPE go_memstats_heap_released_bytes gauge +go_memstats_heap_released_bytes 6.47168e+06 +# HELP go_memstats_heap_sys_bytes Number of heap bytes obtained from system. +# TYPE go_memstats_heap_sys_bytes gauge +go_memstats_heap_sys_bytes 1.5794176e+07 +# HELP go_memstats_last_gc_time_seconds Number of seconds since 1970 of last garbage collection. +# TYPE go_memstats_last_gc_time_seconds gauge +go_memstats_last_gc_time_seconds 1.6987664839728708e+09 +# HELP go_memstats_lookups_total Total number of pointer lookups. +# TYPE go_memstats_lookups_total counter +go_memstats_lookups_total 0 +# HELP go_memstats_mallocs_total Total number of mallocs. +# TYPE go_memstats_mallocs_total counter +go_memstats_mallocs_total 577619 +# HELP go_memstats_mcache_inuse_bytes Number of bytes in use by mcache structures. +# TYPE go_memstats_mcache_inuse_bytes gauge +go_memstats_mcache_inuse_bytes 19200 +# HELP go_memstats_mcache_sys_bytes Number of bytes used for mcache structures obtained from system. +# TYPE go_memstats_mcache_sys_bytes gauge +go_memstats_mcache_sys_bytes 31200 +# HELP go_memstats_mspan_inuse_bytes Number of bytes in use by mspan structures. +# TYPE go_memstats_mspan_inuse_bytes gauge +go_memstats_mspan_inuse_bytes 302904 +# HELP go_memstats_mspan_sys_bytes Number of bytes used for mspan structures obtained from system. +# TYPE go_memstats_mspan_sys_bytes gauge +go_memstats_mspan_sys_bytes 309624 +# HELP go_memstats_next_gc_bytes Number of heap bytes when next garbage collection will take place. +# TYPE go_memstats_next_gc_bytes gauge +go_memstats_next_gc_bytes 8.206808e+06 +# HELP go_memstats_other_sys_bytes Number of bytes used for other system allocations. +# TYPE go_memstats_other_sys_bytes gauge +go_memstats_other_sys_bytes 2.402169e+06 +# HELP go_memstats_stack_inuse_bytes Number of bytes in use by the stack allocator. +# TYPE go_memstats_stack_inuse_bytes gauge +go_memstats_stack_inuse_bytes 983040 +# HELP go_memstats_stack_sys_bytes Number of bytes obtained from system for stack allocator. +# TYPE go_memstats_stack_sys_bytes gauge +go_memstats_stack_sys_bytes 983040 +# HELP go_memstats_sys_bytes Number of bytes obtained from system. +# TYPE go_memstats_sys_bytes gauge +go_memstats_sys_bytes 2.5531656e+07 +# HELP go_threads Number of OS threads created. +# TYPE go_threads gauge +go_threads 22 +# HELP process_cpu_seconds_total Total user and system CPU time spent in seconds. +# TYPE process_cpu_seconds_total counter +process_cpu_seconds_total 0.42 +# HELP process_max_fds Maximum number of open file descriptors. +# TYPE process_max_fds gauge +process_max_fds 60000 +# HELP process_open_fds Number of open file descriptors. +# TYPE process_open_fds gauge +process_open_fds 10 +# HELP process_resident_memory_bytes Resident memory size in bytes. +# TYPE process_resident_memory_bytes gauge +process_resident_memory_bytes 4.2811392e+07 +# HELP process_start_time_seconds Start time of the process since unix epoch in seconds. +# TYPE process_start_time_seconds gauge +process_start_time_seconds 1.69876646736e+09 +# HELP process_virtual_memory_bytes Virtual memory size in bytes. 
+# TYPE process_virtual_memory_bytes gauge
+process_virtual_memory_bytes 2.823462912e+09
+# HELP process_virtual_memory_max_bytes Maximum amount of virtual memory available in bytes.
+# TYPE process_virtual_memory_max_bytes gauge
+process_virtual_memory_max_bytes 1.8446744073709552e+19
+```
+
 ### Redis - Rate limiting metrics
 
 System metrics exposed by the redisclient:
@@ -491,7 +606,7 @@ See more details about rate limiting at [Rate limiting](../reference/filters.md#
 
 ### Open Policy Agent metrics
 
-If Open Policy Agent filters are enabled, the following counters show up in the `/metrics` endpoint. The bundle-name is the first parameter of the filter so that for example increased error codes can be attributed to a specific source bundle / system. 
+If Open Policy Agent filters are enabled, the following counters show up in the `/metrics` endpoint. The bundle-name is the first parameter of the filter so that for example increased error codes can be attributed to a specific source bundle / system.
 
 - `skipper.opaAuthorizeRequest.custom.decision.allow.`
 - `skipper.opaAuthorizeRequest.custom.decision.deny.`
@@ -505,6 +620,83 @@ The following timer metrics are exposed per used bundle-name:
 
 - `skipper.opaAuthorizeRequest.custom.eval_time.`
 - `skipper.opaServeResponse.custom.eval_time.`
 
+### RouteSRV metrics
+
+RouteSRV exposes the following metrics in Prometheus format:
+
+```
+% curl http://127.0.0.1:9911/metrics
+# 8< Go metrics >8
+
+# HELP routesrv_backend_combined_duration_seconds Duration in seconds of a proxy backend combined.
+# TYPE routesrv_backend_combined_duration_seconds histogram
+routesrv_backend_combined_duration_seconds_bucket{version="v0.18.38",le="0.005"} 5
+routesrv_backend_combined_duration_seconds_bucket{version="v0.18.38",le="0.01"} 5
+routesrv_backend_combined_duration_seconds_bucket{version="v0.18.38",le="0.025"} 5
+routesrv_backend_combined_duration_seconds_bucket{version="v0.18.38",le="0.05"} 5
+routesrv_backend_combined_duration_seconds_bucket{version="v0.18.38",le="0.1"} 5
+routesrv_backend_combined_duration_seconds_bucket{version="v0.18.38",le="0.25"} 5
+routesrv_backend_combined_duration_seconds_bucket{version="v0.18.38",le="0.5"} 5
+routesrv_backend_combined_duration_seconds_bucket{version="v0.18.38",le="1"} 5
+routesrv_backend_combined_duration_seconds_bucket{version="v0.18.38",le="2.5"} 5
+routesrv_backend_combined_duration_seconds_bucket{version="v0.18.38",le="5"} 5
+routesrv_backend_combined_duration_seconds_bucket{version="v0.18.38",le="10"} 5
+routesrv_backend_combined_duration_seconds_bucket{version="v0.18.38",le="+Inf"} 5
+routesrv_backend_combined_duration_seconds_sum{version="v0.18.38"} 0.001349441
+routesrv_backend_combined_duration_seconds_count{version="v0.18.38"} 5
+# HELP routesrv_backend_duration_seconds Duration in seconds of a proxy backend.
+# TYPE routesrv_backend_duration_seconds histogram
+routesrv_backend_duration_seconds_bucket{host="",route="routersv",version="v0.18.38",le="0.005"} 5
+routesrv_backend_duration_seconds_bucket{host="",route="routersv",version="v0.18.38",le="0.01"} 5
+routesrv_backend_duration_seconds_bucket{host="",route="routersv",version="v0.18.38",le="0.025"} 5
+routesrv_backend_duration_seconds_bucket{host="",route="routersv",version="v0.18.38",le="0.05"} 5
+routesrv_backend_duration_seconds_bucket{host="",route="routersv",version="v0.18.38",le="0.1"} 5
+routesrv_backend_duration_seconds_bucket{host="",route="routersv",version="v0.18.38",le="0.25"} 5
+routesrv_backend_duration_seconds_bucket{host="",route="routersv",version="v0.18.38",le="0.5"} 5
+routesrv_backend_duration_seconds_bucket{host="",route="routersv",version="v0.18.38",le="1"} 5
+routesrv_backend_duration_seconds_bucket{host="",route="routersv",version="v0.18.38",le="2.5"} 5
+routesrv_backend_duration_seconds_bucket{host="",route="routersv",version="v0.18.38",le="5"} 5
+routesrv_backend_duration_seconds_bucket{host="",route="routersv",version="v0.18.38",le="10"} 5
+routesrv_backend_duration_seconds_bucket{host="",route="routersv",version="v0.18.38",le="+Inf"} 5
+routesrv_backend_duration_seconds_sum{host="",route="routersv",version="v0.18.38"} 0.001349441
+routesrv_backend_duration_seconds_count{host="",route="routersv",version="v0.18.38"} 5
+# HELP routesrv_custom_gauges Gauges number of custom metrics.
+# TYPE routesrv_custom_gauges gauge
+routesrv_custom_gauges{key="polling_started_timestamp",version="v0.18.38"} 1.69876646881321e+09
+routesrv_custom_gauges{key="redis_endpoints",version="v0.18.38"} 1
+routesrv_custom_gauges{key="routes.byte",version="v0.18.38"} 91378
+routesrv_custom_gauges{key="routes.initialized_timestamp",version="v0.18.38"} 1.6987664689696188e+09
+routesrv_custom_gauges{key="routes.total",version="v0.18.38"} 258
+routesrv_custom_gauges{key="routes.updated_timestamp",version="v0.18.38"} 1.698766468969631e+09
+# HELP routesrv_custom_total Total number of custom metrics.
+# TYPE routesrv_custom_total counter
+routesrv_custom_total{key="200",version="v0.18.38"} 5
+```
+
+Metrics explanation:
+
+- `routesrv_custom_total{key="200",version="v0.18.38"} 5`:
+  5 requests were answered with status code 200 by the current routesrv
+  version `v0.18.38`.
+- `routesrv_custom_gauges{key="polling_started_timestamp",version="v0.18.38"} 1.69876646881321e+09`:
+  routesrv started to poll at 1.69876646881321e+09 seconds since the UNIX epoch
+  (2023-10-31 16:34:28 1705425/2097152 +0100).
+- `routesrv_custom_gauges{key="redis_endpoints", version="v0.18.38"} 1`:
+  The number of Redis endpoints found and served at `/swarm/redis/shards` is 1.
+- `routesrv_custom_gauges{key="routes.byte",version="v0.18.38"} 91378`:
+  The number of bytes that are served at `/routes` is 91378.
+- `routesrv_custom_gauges{key="routes.initialized_timestamp",version="v0.18.38"} 1.6987664689696188e+09`:
+  routesrv initialized the routes at 1.6987664689696188e+09 seconds since the UNIX epoch
+  (2023-10-31 16:34:28 1016719/1048576 +0100).
+- `routesrv_custom_gauges{key="routes.total",version="v0.18.38"} 258`:
+  The number of routes that are served at `/routes` is 258.
+- `routesrv_custom_gauges{key="routes.updated_timestamp",version="v0.18.38"} 1.698766468969631e+09`:
+  The last update of routes by routesrv was at 1.698766468969631e+09 seconds since the UNIX epoch
+  (2023-10-31 16:34:28 4066927/4194304 +0100).
+
+
+If you want to read more about RouteSRV, see [deploy RouteSRV](../kubernetes/ingress-controller.md#routesrv).
+
 ## OpenTracing
 
 Skipper has support for different [OpenTracing API](http://opentracing.io/) vendors, including
@@ -636,7 +828,7 @@ The following tags are added to the Span, labels are taken from the OPA configur
 - `opa.decision_id=`
 - `opa.labels.=`
 
-The labels can for example be used to link to a specific decision in the control plane if they contain URL fragments for the receiving entity. 
+The labels can for example be used to link to a specific decision in the control plane if they contain URL fragments for the receiving entity.
 
 ### Redis rate limiting spans
 
@@ -962,11 +1154,13 @@ predicates and filters involved in the route processing:
 }
 ```
 
-## Profiling skipper
+## Profiling
 
 Go profiling is explained in Go's
 [diagnostics](https://golang.org/doc/diagnostics.html) documentation.
 
+### Profiling skipper or RouteSRV
+
 To enable profiling in skipper you have to use `-enable-profile`. This
 will start a profiling route at `/debug/pprof/profile` on the support
 listener, which defaults to `:9911`.
diff --git a/routesrv/eskipbytes.go b/routesrv/eskipbytes.go
index 620b2415d7..37ebdc664a 100644
--- a/routesrv/eskipbytes.go
+++ b/routesrv/eskipbytes.go
@@ -11,10 +11,41 @@ import (
 
 	ot "github.com/opentracing/opentracing-go"
 	"github.com/zalando/skipper/eskip"
+	"github.com/zalando/skipper/metrics"
 	"github.com/zalando/skipper/routing"
 	"github.com/zalando/skipper/tracing"
 )
 
+type responseWriterInterceptor struct {
+	http.ResponseWriter
+	statusCode   int
+	bytesWritten int
+}
+
+func (w *responseWriterInterceptor) WriteHeader(statusCode int) {
+	w.statusCode = statusCode
+	w.ResponseWriter.WriteHeader(statusCode)
+}
+
+func (w *responseWriterInterceptor) Header() http.Header {
+	return w.ResponseWriter.Header()
+}
+
+func (w *responseWriterInterceptor) Write(p []byte) (int, error) {
+	w.bytesWritten += len(p)
+	return w.ResponseWriter.Write(p)
+}
+
+// Unwrap is used by http.ResponseController, so that callers which need
+// the underlying ResponseWriter can still obtain it.
+func (w *responseWriterInterceptor) Unwrap() http.ResponseWriter {
+	return w.ResponseWriter
+}
+
+var (
+	_ http.ResponseWriter = &responseWriterInterceptor{}
+)
+
 // eskipBytes keeps eskip-formatted routes as a byte slice and
 // provides synchronized r/w access to them. Additionally it can
 // serve as an HTTP handler exposing its content.
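The `responseWriterInterceptor` above follows the common Go pattern of embedding an `http.ResponseWriter` to observe what the wrapped handler writes. As a minimal, self-contained sketch of that pattern, independent of the patch (the `countingWriter` name and the inline logging middleware are hypothetical, for illustration only):

```go
package main

import (
	"fmt"
	"log"
	"net/http"
)

// countingWriter embeds http.ResponseWriter and records the status code
// and the number of body bytes written by the handler it wraps.
type countingWriter struct {
	http.ResponseWriter
	status int
	bytes  int
}

func (w *countingWriter) WriteHeader(code int) {
	w.status = code
	w.ResponseWriter.WriteHeader(code)
}

func (w *countingWriter) Write(p []byte) (int, error) {
	n, err := w.ResponseWriter.Write(p)
	w.bytes += n
	return n, err
}

func main() {
	http.HandleFunc("/", func(rw http.ResponseWriter, r *http.Request) {
		// Handlers that never call WriteHeader implicitly respond with
		// 200 on the first Write, so 200 is the right default here.
		cw := &countingWriter{ResponseWriter: rw, status: http.StatusOK}
		fmt.Fprintln(cw, "hello")
		log.Printf("status=%d bytes=%d", cw.status, cw.bytes)
	})
	log.Fatal(http.ListenAndServe("localhost:8080", nil))
}
```

Counting the return value of `Write` keeps the tally exact even on short writes; the patch counts `len(p)` before delegating, which is an equally reasonable choice for metrics.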
@@ -26,8 +57,9 @@ type eskipBytes struct { count int mu sync.RWMutex - tracer ot.Tracer - now func() time.Time + tracer ot.Tracer + metrics metrics.Metrics + now func() time.Time } // formatAndSet takes a slice of routes and stores them eskip-formatted @@ -54,9 +86,23 @@ func (e *eskipBytes) formatAndSet(routes []*eskip.Route) (_ int, _ string, initi return len(e.data), e.etag, initialized, updated } -func (e *eskipBytes) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (e *eskipBytes) ServeHTTP(rw http.ResponseWriter, r *http.Request) { span := tracing.CreateSpan("serve_routes", r.Context(), e.tracer) defer span.Finish() + start := time.Now() + defer e.metrics.MeasureBackend("routersv", start) + + w := &responseWriterInterceptor{ + ResponseWriter: rw, + statusCode: http.StatusOK, + } + + defer func() { + span.SetTag("status", w.statusCode) + span.SetTag("bytes", w.bytesWritten) + + e.metrics.IncCounter(strconv.Itoa(w.statusCode)) + }() if r.Method != "GET" && r.Method != "HEAD" { w.WriteHeader(http.StatusMethodNotAllowed) diff --git a/routesrv/polling.go b/routesrv/polling.go index b262e5a4dd..00895cf0ac 100644 --- a/routesrv/polling.go +++ b/routesrv/polling.go @@ -8,11 +8,10 @@ import ( "time" ot "github.com/opentracing/opentracing-go" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" log "github.com/sirupsen/logrus" "github.com/zalando/skipper/eskip" "github.com/zalando/skipper/filters/auth" + "github.com/zalando/skipper/metrics" "github.com/zalando/skipper/routing" "github.com/zalando/skipper/tracing" ) @@ -26,24 +25,6 @@ const ( LogRoutesUpdated = "routes updated" ) -var ( - pollingStarted = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: "routesrv", - Name: "polling_started_timestamp", - Help: "UNIX time when the routes polling has started", - }) - routesInitialized = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: "routesrv", - Name: "routes_initialized_timestamp", - Help: "UNIX time when the first routes were received and stored", - }) - routesUpdated = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: "routesrv", - Name: "routes_updated_timestamp", - Help: "UNIX time of the last routes update (initial load counts as well)", - }) -) - type poller struct { client routing.DataClient b *eskipBytes @@ -56,8 +37,9 @@ type poller struct { editRoute []*eskip.Editor cloneRoute []*eskip.Clone - // tracer - tracer ot.Tracer + // visibility + tracer ot.Tracer + metrics metrics.Metrics } func (p *poller) poll(wg *sync.WaitGroup) { @@ -66,21 +48,20 @@ func (p *poller) poll(wg *sync.WaitGroup) { log.WithField("timeout", p.timeout).Info(LogPollingStarted) ticker := time.NewTicker(p.timeout) defer ticker.Stop() - pollingStarted.SetToCurrentTime() + p.setGaugeToCurrentTime("polling_started_timestamp") var lastRoutesById map[string]string for { span := tracing.CreateSpan("poll_routes", context.TODO(), p.tracer) routes, err := p.client.LoadAll() - routes = p.process(routes) - routesCount := len(routes) switch { case err != nil: log.WithError(err).Error(LogRoutesFetchingFailed) + p.metrics.IncCounter("routes.fetch_errors") span.SetTag("error", true) span.LogKV( @@ -89,6 +70,7 @@ func (p *poller) poll(wg *sync.WaitGroup) { ) case routesCount == 0: log.Error(LogRoutesEmpty) + p.metrics.IncCounter("routes.empty") span.SetTag("error", true) span.LogKV( @@ -101,12 +83,14 @@ func (p *poller) poll(wg *sync.WaitGroup) { if initialized { logger.Info(LogRoutesInitialized) span.SetTag("routes.initialized", true) - 
routesInitialized.SetToCurrentTime() + p.setGaugeToCurrentTime("routes.initialized_timestamp") } if updated { logger.Info(LogRoutesUpdated) span.SetTag("routes.updated", true) - routesUpdated.SetToCurrentTime() + p.setGaugeToCurrentTime("routes.updated_timestamp") + p.metrics.UpdateGauge("routes.total", float64(routesCount)) + p.metrics.UpdateGauge("routes.byte", float64(routesBytes)) } span.SetTag("routes.count", routesCount) span.SetTag("routes.bytes", routesBytes) @@ -154,6 +138,10 @@ func (p *poller) process(routes []*eskip.Route) []*eskip.Route { return routes } +func (p *poller) setGaugeToCurrentTime(name string) { + p.metrics.UpdateGauge(name, (float64(time.Now().UnixNano()) / 1e9)) +} + func mapRoutes(routes []*eskip.Route) map[string]string { byId := make(map[string]string) for _, r := range routes { diff --git a/routesrv/redishandler.go b/routesrv/redishandler.go index 1a4fc10ea9..45ad66e715 100644 --- a/routesrv/redishandler.go +++ b/routesrv/redishandler.go @@ -8,6 +8,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/zalando/skipper/dataclients/kubernetes" + "github.com/zalando/skipper/metrics" ) type RedisHandler struct { @@ -37,10 +38,11 @@ func (rh *RedisHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Write(address) } -func getRedisAddresses(namespace, name string, kdc *kubernetes.Client) func() ([]byte, error) { +func getRedisAddresses(namespace, name string, kdc *kubernetes.Client, m metrics.Metrics) func() ([]byte, error) { return func() ([]byte, error) { a := kdc.GetEndpointAddresses(namespace, name) log.Debugf("Redis updater called and found %d redis endpoints: %v", len(a), a) + m.UpdateGauge("redis_endpoints", float64(len(a))) result := RedisEndpoints{} for i := 0; i < len(a); i++ { diff --git a/routesrv/routesrv.go b/routesrv/routesrv.go index d333411199..a4a6aeb1a7 100644 --- a/routesrv/routesrv.go +++ b/routesrv/routesrv.go @@ -9,21 +9,24 @@ import ( "syscall" "time" - "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "github.com/zalando/skipper" "github.com/zalando/skipper/dataclients/kubernetes" "github.com/zalando/skipper/filters/auth" + "github.com/zalando/skipper/metrics" "github.com/zalando/skipper/tracing" ) // RouteServer is used to serve eskip-formatted routes, // that originate from the polled data source. type RouteServer struct { - server *http.Server - poller *poller - wg *sync.WaitGroup + metrics metrics.Metrics + server *http.Server + supportServer *http.Server + poller *poller + wg *sync.WaitGroup } // New returns an initialized route server according to the passed options. @@ -31,7 +34,27 @@ type RouteServer struct { // will stay in an uninitialized state, till StartUpdates is called and // in effect data source is queried and routes initialized/updated. 
func New(opts skipper.Options) (*RouteServer, error) { - rs := &RouteServer{} + if opts.PrometheusRegistry == nil { + opts.PrometheusRegistry = prometheus.NewRegistry() + } + + mopt := metrics.Options{ + Format: metrics.PrometheusKind, + Prefix: "routesrv", + PrometheusRegistry: opts.PrometheusRegistry, + EnableDebugGcMetrics: true, + EnableRuntimeMetrics: true, + EnableProfile: opts.EnableProfile, + BlockProfileRate: opts.BlockProfileRate, + MutexProfileFraction: opts.MutexProfileFraction, + MemProfileRate: opts.MemProfileRate, + } + m := metrics.NewMetrics(mopt) + metricsHandler := metrics.NewHandler(mopt, m) + + rs := &RouteServer{ + metrics: m, + } opentracingOpts := opts.OpenTracing if len(opentracingOpts) == 0 { @@ -42,12 +65,25 @@ func New(opts skipper.Options) (*RouteServer, error) { return nil, err } - b := &eskipBytes{tracer: tracer, now: time.Now} - bs := &eskipBytesStatus{b: b} - handler := http.NewServeMux() - handler.Handle("/health", bs) - handler.Handle("/routes", b) - handler.Handle("/metrics", promhttp.Handler()) + b := &eskipBytes{ + tracer: tracer, + metrics: m, + now: time.Now, + } + bs := &eskipBytesStatus{ + b: b, + } + mux := http.NewServeMux() + mux.Handle("/health", bs) + mux.Handle("/routes", b) + supportHandler := http.NewServeMux() + supportHandler.Handle("/metrics", metricsHandler) + supportHandler.Handle("/metrics/", metricsHandler) + + if opts.EnableProfile { + supportHandler.Handle("/debug/pprof", metricsHandler) + supportHandler.Handle("/debug/pprof/", metricsHandler) + } dataclient, err := kubernetes.New(opts.KubernetesDataClientOptions()) if err != nil { @@ -68,13 +104,20 @@ func New(opts skipper.Options) (*RouteServer, error) { if err != nil { return nil, err } - rh.AddrUpdater = getRedisAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName, dataclient) - handler.Handle("/swarm/redis/shards", rh) + rh.AddrUpdater = getRedisAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName, dataclient, m) + mux.Handle("/swarm/redis/shards", rh) } rs.server = &http.Server{ Addr: opts.Address, - Handler: handler, + Handler: mux, + ReadTimeout: 1 * time.Minute, + ReadHeaderTimeout: 1 * time.Minute, + } + + rs.supportServer = &http.Server{ + Addr: opts.SupportListener, + Handler: supportHandler, ReadTimeout: 1 * time.Minute, ReadHeaderTimeout: 1 * time.Minute, } @@ -89,6 +132,7 @@ func New(opts skipper.Options) (*RouteServer, error) { cloneRoute: opts.CloneRoute, oauth2Config: oauthConfig, tracer: tracer, + metrics: m, } rs.wg = &sync.WaitGroup{} @@ -114,6 +158,15 @@ func (rs *RouteServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { rs.server.Handler.ServeHTTP(w, r) } +func (rs *RouteServer) startSupportListener() { + if rs.supportServer != nil { + err := rs.supportServer.ListenAndServe() + if err != nil { + log.Errorf("Failed support listener: %v", err) + } + } +} + func newShutdownFunc(rs *RouteServer) func(delay time.Duration) { once := sync.Once{} rs.wg.Add(1) @@ -125,6 +178,12 @@ func newShutdownFunc(rs *RouteServer) func(delay time.Duration) { log.Infof("shutting down the server in %s...", delay) time.Sleep(delay) + if rs.supportServer != nil { + if err := rs.supportServer.Shutdown(context.Background()); err != nil { + log.Error("unable to shut down the support server: ", err) + } + log.Info("supportServer shut down") + } if err := rs.server.Shutdown(context.Background()); err != nil { log.Error("unable to shut down the server: ", err) } @@ -133,21 +192,11 @@ func newShutdownFunc(rs *RouteServer) 
func(delay time.Duration) { } } -// Run starts a route server set up according to the passed options. -// It is a blocking call designed to be used as a single call/entry point, -// when running the route server as a standalone binary. It returns, when -// the server is closed, which can happen due to server startup errors or -// gracefully handled SIGTERM signal. In case of a server startup error, -// the error is returned as is. -func Run(opts skipper.Options) error { - rs, err := New(opts) - if err != nil { - return err - } +func run(rs *RouteServer, opts skipper.Options, sigs chan os.Signal) error { + var err error shutdown := newShutdownFunc(rs) - sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGTERM) go func() { <-sigs @@ -156,6 +205,7 @@ func Run(opts skipper.Options) error { rs.StartUpdates() + go rs.startSupportListener() if err = rs.server.ListenAndServe(); err != http.ErrServerClosed { go shutdown(0) } else { @@ -166,3 +216,19 @@ func Run(opts skipper.Options) error { return err } + +// Run starts a route server set up according to the passed options. +// It is a blocking call designed to be used as a single call/entry point, +// when running the route server as a standalone binary. It returns, when +// the server is closed, which can happen due to server startup errors or +// gracefully handled SIGTERM signal. In case of a server startup error, +// the error is returned as is. +func Run(opts skipper.Options) error { + rs, err := New(opts) + if err != nil { + return err + } + sigs := make(chan os.Signal, 1) + return run(rs, opts, sigs) + +} diff --git a/routesrv/routesrv_test.go b/routesrv/routesrv_test.go index c92bb3ccf9..e1ed960ff2 100644 --- a/routesrv/routesrv_test.go +++ b/routesrv/routesrv_test.go @@ -16,6 +16,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + "github.com/zalando/skipper" "github.com/zalando/skipper/dataclients/kubernetes/kubernetestest" "github.com/zalando/skipper/eskip" @@ -109,6 +110,14 @@ func getRoutes(rs *routesrv.RouteServer) *httptest.ResponseRecorder { return w } +func getHealth(rs *routesrv.RouteServer) *httptest.ResponseRecorder { + w := httptest.NewRecorder() + r := httptest.NewRequest("GET", "/health", nil) + rs.ServeHTTP(w, r) + + return w +} + func getRedisURLs(rs *routesrv.RouteServer) *httptest.ResponseRecorder { w := httptest.NewRecorder() r := httptest.NewRequest("GET", "/swarm/redis/shards", nil) @@ -171,6 +180,9 @@ func TestNotInitializedRoutesAreNotServed(t *testing.T) { t.Error("uninitialized routes were served") } wantHTTPCode(t, w, http.StatusNotFound) + + w = getHealth(rs) + wantHTTPCode(t, w, http.StatusServiceUnavailable) } func TestEmptyRoutesAreNotServed(t *testing.T) { @@ -218,6 +230,9 @@ func TestFetchedRoutesAreServedInEskipFormat(t *testing.T) { t.Errorf("served routes do not reflect kubernetes resources: %s", cmp.Diff(got, want)) } wantHTTPCode(t, w, http.StatusOK) + + w = getHealth(rs) + wantHTTPCode(t, w, http.StatusNoContent) } func TestRedisEndpointSlices(t *testing.T) { diff --git a/routesrv/shutdown_test.go b/routesrv/shutdown_test.go new file mode 100644 index 0000000000..4ce164be6c --- /dev/null +++ b/routesrv/shutdown_test.go @@ -0,0 +1,187 @@ +package routesrv + +import ( + "bytes" + "fmt" + "io" + "net" + "net/http" + "net/http/httptest" + "os" + "strings" + "sync" + "syscall" + "testing" + "time" + + "github.com/zalando/skipper" + "github.com/zalando/skipper/dataclients/kubernetes/kubernetestest" +) + +type muxHandler struct { + 
handler http.Handler + mu sync.RWMutex +} + +func (m *muxHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + m.mu.RLock() + defer m.mu.RUnlock() + + m.handler.ServeHTTP(w, r) +} + +func newKubeAPI(t *testing.T, specs ...io.Reader) http.Handler { + t.Helper() + api, err := kubernetestest.NewAPI(kubernetestest.TestAPIOptions{}, specs...) + if err != nil { + t.Fatalf("cannot initialize kubernetes api: %s", err) + } + return api +} +func newKubeServer(t *testing.T, specs ...io.Reader) (*httptest.Server, *muxHandler) { + t.Helper() + handler := &muxHandler{handler: newKubeAPI(t, specs...)} + return httptest.NewUnstartedServer(handler), handler +} + +func loadKubeYAML(t *testing.T, path string) io.Reader { + t.Helper() + y, err := os.ReadFile(path) + if err != nil { + t.Fatalf("failed to open kubernetes resources fixture %s: %v", path, err) + } + return bytes.NewBuffer(y) +} + +func findAddress() (string, error) { + l, err := net.ListenTCP("tcp6", &net.TCPAddr{}) + if err != nil { + return "", err + } + + defer l.Close() + return l.Addr().String(), nil +} + +func TestServerShutdownHTTP(t *testing.T) { + ks, _ := newKubeServer(t, loadKubeYAML(t, "testdata/lb-target-multi.yaml")) + defer ks.Close() + ks.Start() + + o := skipper.Options{ + KubernetesURL: "http://" + ks.Listener.Addr().String(), + SourcePollTimeout: 500 * time.Millisecond, + } + const shutdownDelay = 1 * time.Second + + address, err := findAddress() + if err != nil { + t.Fatalf("Failed to find address: %v", err) + } + supportAddress, err := findAddress() + if err != nil { + t.Fatalf("Failed to find supportAddress: %v", err) + } + + o.Address, o.SupportListener, o.WaitForHealthcheckInterval = address, supportAddress, shutdownDelay + baseURL := "http://" + address + supportBaseURL := "http://" + supportAddress + testEndpoints := []string{baseURL + "/routes", supportBaseURL + "/metrics"} + + t.Logf("kube endpoint: %q", o.KubernetesURL) + for _, u := range testEndpoints { + t.Logf("test endpoint: %q", u) + } + + rs, err := New(o) + if err != nil { + t.Fatalf("Failed to create a routesrv: %v", err) + } + + time.Sleep(o.SourcePollTimeout * 2) + + cli := http.Client{ + Timeout: time.Second, + } + rsp, err := cli.Get(o.KubernetesURL + "/api/v1/services") + if err != nil { + t.Fatalf("Failed to get %q: %v", o.KubernetesURL, err) + } + if rsp.StatusCode != 200 { + t.Fatalf("Failed to get status OK for %q: %d", o.KubernetesURL, rsp.StatusCode) + } + + sigs := make(chan os.Signal, 1) + + errCh := make(chan error) + go func() { + err := run(rs, o, sigs) + if err != nil { + errCh <- err + } + }() + + // make sure we started all listeners correctly + for i := 0; i < 5; i++ { + var ( + err error + rsp *http.Response + ) + + for _, u := range testEndpoints { + rsp, err = http.DefaultClient.Get(u) + if err != nil { + err = fmt.Errorf("failed to get %q: %v", u, err) + time.Sleep(10 * time.Millisecond) + continue + } + if rsp.StatusCode != 200 { + err = fmt.Errorf("failed to get expected status code 200 for %q, got: %d", u, rsp.StatusCode) + + time.Sleep(10 * time.Millisecond) + continue + } + err = nil + } + if i == 4 && err != nil { + t.Fatalf("Failed to get %v", err) + } + } + + // initiate shutdown + sigs <- syscall.SIGTERM + + // test that we can fetch even within termination + time.Sleep(shutdownDelay / 2) + + for _, u := range testEndpoints { + rsp, err := http.DefaultClient.Get(u) + if err != nil { + t.Fatalf("Failed to get %q after SIGTERM: %v", u, err) + } + if rsp.StatusCode != 200 { + t.Fatalf("Failed to get expected status code 
200 for %q after SIGTERM, got: %d", u, rsp.StatusCode) + } + } + + // test that we get connection refused after shutdown + time.Sleep(shutdownDelay / 2) + + for _, u := range testEndpoints { + _, err = http.DefaultClient.Get(u) + switch err { + case nil: + t.Fatalf("Failed to get error as expected: %q", u) + default: + if e := err.Error(); !strings.Contains(e, "refused") { + t.Fatalf("Failed to get connection refused, got: %s", e) + } + } + } + + select { + case err := <-errCh: + t.Fatalf("Failed to shutdown: %v", err) + default: + } +} From 2f5b13fe453e1903d7b15df69579d7cd3a9a8f21 Mon Sep 17 00:00:00 2001 From: Mustafa Saber Date: Wed, 1 Nov 2023 14:50:07 +0100 Subject: [PATCH 7/9] Init validator object in tests (#2690) Signed-off-by: Mustafa Abdelrahman --- cmd/webhook/admission/admission_test.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cmd/webhook/admission/admission_test.go b/cmd/webhook/admission/admission_test.go index 3fa9a07f7f..a2ad4b26da 100644 --- a/cmd/webhook/admission/admission_test.go +++ b/cmd/webhook/admission/admission_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/zalando/skipper/dataclients/kubernetes/definitions" ) const ( @@ -146,7 +147,7 @@ func TestRouteGroupAdmitter(t *testing.T) { req.Header.Set("Content-Type", "application/json") w := httptest.NewRecorder() - rgAdm := &RouteGroupAdmitter{} + rgAdm := &RouteGroupAdmitter{RouteGroupValidator: &definitions.RouteGroupValidator{}} h := Handler(rgAdm) h(w, req) @@ -210,7 +211,7 @@ func TestIngressAdmitter(t *testing.T) { req.Header.Set("Content-Type", "application/json") w := httptest.NewRecorder() - ingressAdm := &IngressAdmitter{} + ingressAdm := &IngressAdmitter{IngressValidator: &definitions.IngressV1Validator{}} h := Handler(ingressAdm) h(w, req) @@ -227,8 +228,8 @@ func TestIngressAdmitter(t *testing.T) { } func TestMalformedRequests(t *testing.T) { - routeGroupHandler := Handler(&RouteGroupAdmitter{}) - ingressHandler := Handler(&IngressAdmitter{}) + routeGroupHandler := Handler(&RouteGroupAdmitter{RouteGroupValidator: &definitions.RouteGroupValidator{}}) + ingressHandler := Handler(&IngressAdmitter{IngressValidator: &definitions.IngressV1Validator{}}) for _, tc := range []struct { name string From 19452245aee596a1f267fca767fdd9bd84a3a0f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sandor=20Sz=C3=BCcs?= Date: Mon, 6 Nov 2023 15:36:34 +0100 Subject: [PATCH 8/9] Feature: tracing set route id to ingress spans (#2714) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feature: set Tag "skipper.route_id" to `route.Id` in all ingress spans including shunt and loopback routes Signed-off-by: Sandor Szücs --- proxy/context.go | 4 +- proxy/proxy.go | 17 ++++- proxy/tracing_test.go | 146 +++++++++++++++++++++++++++++++++++++++++- 3 files changed, 162 insertions(+), 5 deletions(-) diff --git a/proxy/context.go b/proxy/context.go index 418d76ed64..468dc4ac98 100644 --- a/proxy/context.go +++ b/proxy/context.go @@ -296,7 +296,9 @@ func (c *context) Split() (filters.FilterContext, error) { } func (c *context) Loopback() { - err := c.proxy.do(c) + loopSpan := c.Tracer().StartSpan(c.proxy.tracing.initialOperationName, opentracing.ChildOf(c.ParentSpan().Context())) + defer loopSpan.Finish() + err := c.proxy.do(c, loopSpan) if c.response != nil && c.response.Body != nil { if _, err := io.Copy(io.Discard, c.response.Body); err != nil { c.Logger().Errorf("context: error while discarding 
remainder response body: %v.", err) diff --git a/proxy/proxy.go b/proxy/proxy.go index 3498185683..7b6923cb2e 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -1024,7 +1024,7 @@ func stack() []byte { } } -func (p *Proxy) do(ctx *context) (err error) { +func (p *Proxy) do(ctx *context, parentSpan ot.Span) (err error) { defer func() { if r := recover(); r != nil { p.onPanicSometimes.Do(func() { @@ -1068,6 +1068,7 @@ func (p *Proxy) do(ctx *context) (err error) { p.makeErrorResponse(ctx, errRouteLookupFailed) return errRouteLookupFailed } + parentSpan.SetTag(SkipperRouteIDTag, route.Id) ctx.applyRoute(route, params, p.flags.PreserveHost()) @@ -1086,7 +1087,16 @@ func (p *Proxy) do(ctx *context) (err error) { ctx.ensureDefaultResponse() } else if ctx.route.BackendType == eskip.LoopBackend { loopCTX := ctx.clone() - if err := p.do(loopCTX); err != nil { + loopSpan := tracing.CreateSpan("loopback", ctx.request.Context(), p.tracing.tracer) + p.tracing. + setTag(loopSpan, SpanKindTag, SpanKindServer). + setTag(loopSpan, SkipperRouteIDTag, ctx.route.Id) + p.setCommonSpanInfo(ctx.Request().URL, ctx.Request(), loopSpan) + ctx.parentSpan = loopSpan + + defer loopSpan.Finish() + + if err := p.do(loopCTX, loopSpan); err != nil { // in case of error we have to copy the response in this recursion unwinding ctx.response = loopCTX.response if err != nil { @@ -1442,6 +1452,7 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx.startServe = time.Now() ctx.tracer = p.tracing.tracer ctx.initialSpan = span + ctx.parentSpan = span defer func() { if ctx.response != nil && ctx.response.Body != nil { @@ -1452,7 +1463,7 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { } }() - err := p.do(ctx) + err := p.do(ctx, span) // writeTimeout() filter if d, ok := ctx.StateBag()[filters.WriteTimeout].(time.Duration); ok { diff --git a/proxy/tracing_test.go b/proxy/tracing_test.go index 7962358f82..10c55b96eb 100644 --- a/proxy/tracing_test.go +++ b/proxy/tracing_test.go @@ -84,7 +84,8 @@ func TestTracingIngressSpan(t *testing.T) { }) defer s.Close() - doc := fmt.Sprintf(`hello: Path("/hello") -> setPath("/bye") -> setQuery("void") -> "%s"`, s.URL) + routeID := "ingressRoute" + doc := fmt.Sprintf(`%s: Path("/hello") -> setPath("/bye") -> setQuery("void") -> "%s"`, routeID, s.URL) tracer := mocktracer.New() params := Params{ @@ -126,6 +127,7 @@ func TestTracingIngressSpan(t *testing.T) { verifyTag(t, span, SpanKindTag, SpanKindServer) verifyTag(t, span, ComponentTag, "skipper") + verifyTag(t, span, SkipperRouteIDTag, routeID) // to save memory we dropped the URL tag from ingress span //verifyTag(t, span, HTTPUrlTag, "/hello?world") // For server requests there is no scheme://host:port, see https://golang.org/pkg/net/http/#Request verifyTag(t, span, HTTPMethodTag, "GET") @@ -137,6 +139,139 @@ func TestTracingIngressSpan(t *testing.T) { verifyHasTag(t, span, HTTPRemoteIPTag) } +func TestTracingIngressSpanShunt(t *testing.T) { + routeID := "ingressShuntRoute" + doc := fmt.Sprintf(`%s: Path("/hello") -> setPath("/bye") -> setQuery("void") -> status(205) -> `, routeID) + + tracer := mocktracer.New() + params := Params{ + OpenTracing: &OpenTracingParams{ + Tracer: tracer, + }, + Flags: FlagsNone, + } + + t.Setenv("HOSTNAME", "ingress-shunt.tracing.test") + + tp, err := newTestProxyWithParams(doc, params) + if err != nil { + t.Fatal(err) + } + defer tp.close() + + ps := httptest.NewServer(tp.proxy) + defer ps.Close() + + req, err := http.NewRequest("GET", ps.URL+"/hello?world", nil) + if 
err != nil { + t.Fatal(err) + } + req.Header.Set("X-Flow-Id", "test-flow-id") + + rsp, err := ps.Client().Do(req) + if err != nil { + t.Fatal(err) + } + defer rsp.Body.Close() + io.Copy(io.Discard, rsp.Body) + + // client may get response before proxy finishes span + time.Sleep(10 * time.Millisecond) + + span, ok := findSpan(tracer, "ingress") + if !ok { + t.Fatal("ingress span not found") + } + + verifyTag(t, span, SpanKindTag, SpanKindServer) + verifyTag(t, span, ComponentTag, "skipper") + verifyTag(t, span, SkipperRouteIDTag, routeID) + // to save memory we dropped the URL tag from ingress span + //verifyTag(t, span, HTTPUrlTag, "/hello?world") // For server requests there is no scheme://host:port, see https://golang.org/pkg/net/http/#Request + verifyTag(t, span, HTTPMethodTag, "GET") + verifyTag(t, span, HostnameTag, "ingress-shunt.tracing.test") + verifyTag(t, span, HTTPPathTag, "/hello") + verifyTag(t, span, HTTPHostTag, ps.Listener.Addr().String()) + verifyTag(t, span, FlowIDTag, "test-flow-id") + verifyTag(t, span, HTTPStatusCodeTag, uint16(205)) + verifyHasTag(t, span, HTTPRemoteIPTag) +} + +func TestTracingIngressSpanLoopback(t *testing.T) { + shuntRouteID := "ingressShuntRoute" + loop1RouteID := "loop1Route" + loop2RouteID := "loop2Route" + routeIDs := []string{loop2RouteID, loop1RouteID, shuntRouteID} + paths := map[string]string{ + loop2RouteID: "/loop2", + loop1RouteID: "/loop1", + shuntRouteID: "/shunt", + } + + doc := fmt.Sprintf(` +%s: Path("/shunt") -> setPath("/bye") -> setQuery("void") -> status(204) -> ; +%s: Path("/loop1") -> setPath("/shunt") -> ; +%s: Path("/loop2") -> setPath("/loop1") -> ; +`, shuntRouteID, loop1RouteID, loop2RouteID) + + tracer := mocktracer.New() + params := Params{ + OpenTracing: &OpenTracingParams{ + Tracer: tracer, + }, + Flags: FlagsNone, + } + + t.Setenv("HOSTNAME", "ingress-loop.tracing.test") + + tp, err := newTestProxyWithParams(doc, params) + if err != nil { + t.Fatal(err) + } + defer tp.close() + + ps := httptest.NewServer(tp.proxy) + defer ps.Close() + + req, err := http.NewRequest("GET", ps.URL+"/loop2", nil) + if err != nil { + t.Fatal(err) + } + req.Header.Set("X-Flow-Id", "test-flow-id") + + rsp, err := ps.Client().Do(req) + if err != nil { + t.Fatal(err) + } + defer rsp.Body.Close() + io.Copy(io.Discard, rsp.Body) + t.Logf("got response %d", rsp.StatusCode) + + // client may get response before proxy finishes span + time.Sleep(10 * time.Millisecond) + + sp, ok := findSpanByRouteID(tracer, loop2RouteID) + if !ok { + t.Fatalf("span for route %q not found", loop2RouteID) + } + verifyTag(t, sp, HTTPStatusCodeTag, uint16(204)) + + for _, rid := range routeIDs { + span, ok := findSpanByRouteID(tracer, rid) + if !ok { + t.Fatalf("span for route %q not found", rid) + } + verifyTag(t, span, SpanKindTag, SpanKindServer) + verifyTag(t, span, ComponentTag, "skipper") + verifyTag(t, span, SkipperRouteIDTag, rid) + verifyTag(t, span, HTTPMethodTag, "GET") + verifyTag(t, span, HostnameTag, "ingress-loop.tracing.test") + verifyTag(t, span, HTTPPathTag, paths[rid]) + verifyTag(t, span, HTTPHostTag, ps.Listener.Addr().String()) + verifyTag(t, span, FlowIDTag, "test-flow-id") + } +} + func TestTracingSpanName(t *testing.T) { traceContent := fmt.Sprintf("%x", md5.New().Sum([]byte(time.Now().String()))) s := startTestServer(nil, 0, func(r *http.Request) { @@ -562,6 +697,15 @@ func findSpan(tracer *mocktracer.MockTracer, name string) (*mocktracer.MockSpan, return nil, false } +func findSpanByRouteID(tracer *mocktracer.MockTracer, routeID string) 
(*mocktracer.MockSpan, bool) {
+	for _, s := range tracer.FinishedSpans() {
+		if s.Tag(SkipperRouteIDTag) == routeID {
+			return s, true
+		}
+	}
+	return nil, false
+}
+
 func verifyTag(t *testing.T, span *mocktracer.MockSpan, name string, expected interface{}) {
 	t.Helper()
 	if got := span.Tag(name); got != expected {

From e764efe7d617448a31b8f0eedb0642015dde6193 Mon Sep 17 00:00:00 2001
From: Alexander Yastrebov
Date: Thu, 16 Nov 2023 14:33:57 +0100
Subject: [PATCH 9/9] filters/auth: add setRequestHeaderFromSecret filter
 (#2740)

Add a filter to set a request header value from a secret, with an
optional prefix and suffix.

It is similar to `bearerinjector` which is equivalent to
`setRequestHeaderFromSecret("Authorization", "/tokens/my-token", "Bearer ")`

For #1952

Signed-off-by: Alexander Yastrebov
---
 docs/reference/filters.md         |  49 ++++++++++----
 filters/auth/secretheader.go      |  75 ++++++++++++++++++++
 filters/auth/secretheader_test.go | 102 ++++++++++++++++++++++++++++++
 filters/filters.go                |   1 +
 skipper.go                        |   1 +
 5 files changed, 215 insertions(+), 13 deletions(-)
 create mode 100644 filters/auth/secretheader.go
 create mode 100644 filters/auth/secretheader_test.go

diff --git a/docs/reference/filters.md b/docs/reference/filters.md
index fd72fbd1fd..f14265ba03 100644
--- a/docs/reference/filters.md
+++ b/docs/reference/filters.md
@@ -2957,31 +2957,54 @@ the -rfc-patch-path flag. See
 [URI standards interpretation](../operation/operation.md#uri-standards-interpretation).
 
 ## Egress
 
-### bearerinjector
-This filter injects `Bearer` tokens into `Authorization` headers read
-from file providing the token as content. This is only for use cases
-using skipper as sidecar to inject tokens for the application on the
+### setRequestHeaderFromSecret
+
+This filter sets a request header to the secret value, with an optional prefix and suffix.
+This is only for use cases using skipper as sidecar to inject tokens for the application on the
 [**egress**](egress.md) path, if it's used in the **ingress** path you
 likely create a security issue for your application.
 
 This filter should be used as an [egress](egress.md) only feature.
 
+Parameters:
+
+* header name (string)
+* secret name (string)
+* value prefix (string) - optional
+* value suffix (string) - optional
+
 Example:
 
 ```
-egress1: Method("POST") && Host("api.example.com") -> bearerinjector("/tmp/secrets/write-token") -> "https://api.example.com/shoes";
-egress2: Method("GET") && Host("api.example.com") -> bearerinjector("/tmp/secrets/read-token") -> "https://api.example.com/shoes";
+egress1: Method("GET") -> setRequestHeaderFromSecret("Authorization", "/tmp/secrets/get-token") -> "https://api.example.com";
+egress2: Method("POST") -> setRequestHeaderFromSecret("Authorization", "/tmp/secrets/post-token", "foo-") -> "https://api.example.com";
+egress3: Method("PUT") -> setRequestHeaderFromSecret("X-Secret", "/tmp/secrets/put-token", "bar-", "-baz") -> "https://api.example.com";
 ```
 
-To integrate with the `bearerinjector` filter you need to run skipper
-with `-credentials-paths=/tmp/secrets` and specify an update interval
-`-credentials-update-interval=10s`. Files in the credentials path can
-be a directory, which will be able to find all files within this
-directory, but it won't walk subtrees. For the example case, there
-have to be filenames `write-token` and `read-token` within the
+To use the `setRequestHeaderFromSecret` filter you need to run skipper
+with `-credentials-paths=/tmp/secrets` and specify an update interval `-credentials-update-interval=10s`. 
+A credentials path can be a directory, in which case all files directly inside it are found,
+but subtrees are not walked.
+For the example case, there have to be `get-token`, `post-token` and `put-token` files within the
 specified credential paths `/tmp/secrets/`, resulting in
-`/tmp/secrets/write-token` and `/tmp/secrets/read-token`.
+`/tmp/secrets/get-token`, `/tmp/secrets/post-token` and `/tmp/secrets/put-token`.
+
+### bearerinjector
+
+This filter injects `Bearer` tokens into `Authorization` headers read
+from a file that provides the token as content.
+
+It is a special form of `setRequestHeaderFromSecret` with `"Authorization"` header name,
+`"Bearer "` prefix and empty suffix.
+
+Example:
+
+```
+egress: * -> bearerinjector("/tmp/secrets/my-token") -> "https://api.example.com";
+
+// equivalent to setRequestHeaderFromSecret("Authorization", "/tmp/secrets/my-token", "Bearer ")
+```
 
 ## Open Tracing
 
 ### tracingBaggageToTag
diff --git a/filters/auth/secretheader.go b/filters/auth/secretheader.go
new file mode 100644
index 0000000000..cf29b1fab0
--- /dev/null
+++ b/filters/auth/secretheader.go
@@ -0,0 +1,75 @@
+package auth
+
+import (
+	"github.com/zalando/skipper/filters"
+	"github.com/zalando/skipper/secrets"
+)
+
+type (
+	secretHeaderSpec struct {
+		secretsReader secrets.SecretsReader
+	}
+
+	secretHeaderFilter struct {
+		headerName string
+		secretName string
+		prefix     string
+		suffix     string
+
+		secretsReader secrets.SecretsReader
+	}
+)
+
+func NewSetRequestHeaderFromSecret(sr secrets.SecretsReader) filters.Spec {
+	return &secretHeaderSpec{secretsReader: sr}
+}
+
+func (*secretHeaderSpec) Name() string {
+	return filters.SetRequestHeaderFromSecretName
+}
+
+func (s *secretHeaderSpec) CreateFilter(args []interface{}) (filters.Filter, error) {
+	if len(args) < 2 || len(args) > 4 {
+		return nil, filters.ErrInvalidFilterParameters
+	}
+	var ok bool
+
+	f := &secretHeaderFilter{
+		secretsReader: s.secretsReader,
+	}
+
+	f.headerName, ok = args[0].(string)
+	if !ok {
+		return nil, filters.ErrInvalidFilterParameters
+	}
+
+	f.secretName, ok = args[1].(string)
+	if !ok {
+		return nil, filters.ErrInvalidFilterParameters
+	}
+
+	if len(args) > 2 {
+		f.prefix, ok = args[2].(string)
+		if !ok {
+			return nil, filters.ErrInvalidFilterParameters
+		}
+	}
+
+	if len(args) > 3 {
+		f.suffix, ok = args[3].(string)
+		if !ok {
+			return nil, filters.ErrInvalidFilterParameters
+		}
+	}
+
+	return f, nil
+}
+
+func (f *secretHeaderFilter) Request(ctx filters.FilterContext) {
+	value, ok := f.secretsReader.GetSecret(f.secretName)
+	if ok {
+		ctx.Request().Header.Set(f.headerName, f.prefix+string(value)+f.suffix)
+	}
+}
+
+func (*secretHeaderFilter) Response(filters.FilterContext) {}
diff --git a/filters/auth/secretheader_test.go b/filters/auth/secretheader_test.go
new file mode 100644
index 0000000000..a40f348f87
--- /dev/null
+++ b/filters/auth/secretheader_test.go
@@ -0,0 +1,102 @@
+package auth_test
+
+import (
+	"net/http"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"github.com/zalando/skipper/eskip"
+	"github.com/zalando/skipper/filters/auth"
+	"github.com/zalando/skipper/filters/filtertest"
+)
+
+type testSecretsReader struct {
+	name   string
+	secret string
+}
+
+func (tsr *testSecretsReader) GetSecret(name string) ([]byte, bool) {
+	if name == tsr.name {
+		return []byte(tsr.secret), true
+	}
+	return nil, false
+}
+
+func (*testSecretsReader) Close() {}
+
+func TestSetRequestHeaderFromSecretInvalidArgs(t *testing.T) {
+	spec := 
auth.NewSetRequestHeaderFromSecret(nil) + for _, def := range []string{ + `setRequestHeaderFromSecret()`, + `setRequestHeaderFromSecret("X-Secret")`, + `setRequestHeaderFromSecret("X-Secret", 1)`, + `setRequestHeaderFromSecret(1, "/my-secret")`, + `setRequestHeaderFromSecret("X-Secret", "/my-secret", 1)`, + `setRequestHeaderFromSecret("X-Secret", "/my-secret", "prefix", 1)`, + `setRequestHeaderFromSecret("X-Secret", "/my-secret", "prefix", "suffix", "garbage")`, + } { + t.Run(def, func(t *testing.T) { + ff := eskip.MustParseFilters(def) + require.Len(t, ff, 1) + + _, err := spec.CreateFilter(ff[0].Args) + assert.Error(t, err) + }) + } +} + +func TestSetRequestHeaderFromSecret(t *testing.T) { + spec := auth.NewSetRequestHeaderFromSecret(&testSecretsReader{ + name: "/my-secret", + secret: "secret-value", + }) + + assert.Equal(t, "setRequestHeaderFromSecret", spec.Name()) + + for _, tc := range []struct { + def, header, value string + }{ + { + def: `setRequestHeaderFromSecret("X-Secret", "/my-secret")`, + header: "X-Secret", + value: "secret-value", + }, + { + def: `setRequestHeaderFromSecret("X-Secret", "/my-secret", "foo-")`, + header: "X-Secret", + value: "foo-secret-value", + }, + { + def: `setRequestHeaderFromSecret("X-Secret", "/my-secret", "foo-", "-bar")`, + header: "X-Secret", + value: "foo-secret-value-bar", + }, + { + def: `setRequestHeaderFromSecret("X-Secret", "/does-not-exist")`, + header: "X-Secret", + value: "", + }, + } { + t.Run(tc.def, func(t *testing.T) { + ff := eskip.MustParseFilters(tc.def) + require.Len(t, ff, 1) + + f, err := spec.CreateFilter(ff[0].Args) + assert.NoError(t, err) + + ctx := &filtertest.Context{ + FRequest: &http.Request{ + Header: http.Header{}, + }, + } + f.Request(ctx) + + if tc.value != "" { + assert.Equal(t, tc.value, ctx.FRequest.Header.Get(tc.header)) + } else { + assert.NotContains(t, ctx.FRequest.Header, tc.header) + } + }) + } +} diff --git a/filters/filters.go b/filters/filters.go index 9f9a1f333a..d91cc435b4 100644 --- a/filters/filters.go +++ b/filters/filters.go @@ -333,6 +333,7 @@ const ( RfcPathName = "rfcPath" RfcHostName = "rfcHost" BearerInjectorName = "bearerinjector" + SetRequestHeaderFromSecretName = "setRequestHeaderFromSecret" TracingBaggageToTagName = "tracingBaggageToTag" StateBagToTagName = "stateBagToTag" TracingTagName = "tracingTag" diff --git a/skipper.go b/skipper.go index 97ff66990f..eb75610a60 100644 --- a/skipper.go +++ b/skipper.go @@ -1584,6 +1584,7 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { block.NewBlock(o.MaxMatcherBufferSize), block.NewBlockHex(o.MaxMatcherBufferSize), auth.NewBearerInjector(sp), + auth.NewSetRequestHeaderFromSecret(sp), auth.NewJwtValidationWithOptions(tio), auth.TokenintrospectionWithOptions(auth.NewOAuthTokenintrospectionAnyClaims, tio), auth.TokenintrospectionWithOptions(auth.NewOAuthTokenintrospectionAllClaims, tio),