From c6c0c7dcb5965ec840cb448c278cb3778d5abf01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sandor=20Sz=C3=BCcs?= Date: Mon, 20 Nov 2023 16:31:32 +0100 Subject: [PATCH] optimize logBody to not buffer and pass x-flow-id to the log, such that it can be searched in log analysis MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sandor Szücs --- filters/block/block.go | 1 - filters/diag/logbody.go | 32 +++++------ net/httpbody.go | 49 ++++++++++++---- net/httpbody_test.go | 121 +++++++++++++++++++++++++++++++++++++++- 4 files changed, 171 insertions(+), 32 deletions(-) diff --git a/filters/block/block.go b/filters/block/block.go index 0a1c8091e1..4524c6f5a0 100644 --- a/filters/block/block.go +++ b/filters/block/block.go @@ -88,7 +88,6 @@ func blockMatcher(matches []toBlockKeys) func(b []byte) (int, error) { return func(b []byte) (int, error) { for _, s := range matches { s := s - println("blockMatcher:", string(b), len(string(b)), "contains?:", string(s.Str)) if bytes.Contains(b, s.Str) { b = nil return 0, net.ErrBlocked diff --git a/filters/diag/logbody.go b/filters/diag/logbody.go index 4c41b3210b..f18f78f568 100644 --- a/filters/diag/logbody.go +++ b/filters/diag/logbody.go @@ -2,9 +2,11 @@ package diag import ( "context" + "fmt" "strings" "github.com/zalando/skipper/filters" + "github.com/zalando/skipper/filters/flowid" "github.com/zalando/skipper/net" ) @@ -59,13 +61,12 @@ func (lb logBody) Request(ctx filters.FilterContext) { req := ctx.Request() if req.Body != nil { - req.Body = net.WrapBody( - req.Context(), - func(p []byte) (int, error) { - ctx.Logger().Infof(`logBody("request"): %q`, p) - return len(p), nil - }, - req.Body) + req.Body = net.LogBody( + context.Background(), + fmt.Sprintf(`logBody("request") %s: `, req.Header.Get(flowid.HeaderName)), + ctx.Logger().Infof, + req.Body, + ) } } @@ -76,16 +77,11 @@ func (lb logBody) Response(ctx filters.FilterContext) { rsp := ctx.Response() if rsp.Body != nil { - // if this is not set we get from curl - // Error while processing content unencoding: invalid stored block lengths - rsp.Header.Del("Content-Length") - rsp.ContentLength = -1 - - rsp.Body = net.WrapBody( - context.Background(), // not sure if it makes sense to be cancellable here - func(p []byte) (int, error) { - ctx.Logger().Infof(`logBody("response"): %q`, p) - return len(p), nil - }, rsp.Body) + rsp.Body = net.LogBody( + context.Background(), + fmt.Sprintf(`logBody("response") %s: `, ctx.Request().Header.Get(flowid.HeaderName)), + ctx.Logger().Infof, + rsp.Body, + ) } } diff --git a/net/httpbody.go b/net/httpbody.go index a17af98470..6718c0eb87 100644 --- a/net/httpbody.go +++ b/net/httpbody.go @@ -26,6 +26,38 @@ const ( MaxBufferAbort ) +type logBody struct { + ctx context.Context + fmtstr string + log func(format string, args ...interface{}) + input io.ReadCloser +} + +func newLogBody(ctx context.Context, fmtstr string, log func(format string, args ...interface{}), rc io.ReadCloser) io.ReadCloser { + return &logBody{ + ctx: ctx, + fmtstr: fmtstr, + input: rc, + log: log, + } +} + +func (lb *logBody) Read(p []byte) (int, error) { + n, err := lb.input.Read(p) + if n > 0 { + lb.log("%s%s", lb.fmtstr, p) + } + return n, err +} + +func (lb *logBody) Close() error { + return lb.input.Close() +} + +func LogBody(ctx context.Context, fmtstr string, log func(format string, args ...interface{}), rc io.ReadCloser) io.ReadCloser { + return newLogBody(ctx, fmtstr, log, rc) +} + type matcher struct { ctx context.Context once sync.Once @@ -60,7 +92,6 @@ func newMatcher( if maxBufferSize < rsize { rsize = maxBufferSize } - println("maxBufferSize:", maxBufferSize, "rsize:", rsize) return &matcher{ ctx: ctx, @@ -80,9 +111,7 @@ func (m *matcher) readNTimes(times int) (bool, error) { var consumedInput bool for i := 0; i < times; i++ { n, err := m.input.Read(m.readBuffer) - println("readNTimes(", times, "): read n bytes:", n, "eof:", err == io.EOF) - n2, err2 := m.pending.Write(m.readBuffer[:n]) - println("readNTimes(", times, "): wrote n2 bytes:", n2, "eof:", err == io.EOF) + _, err2 := m.pending.Write(m.readBuffer[:n]) if n > 0 { consumedInput = true @@ -92,7 +121,7 @@ func (m *matcher) readNTimes(times int) (bool, error) { return consumedInput, err } if err2 != nil { - return consumedInput, err + return consumedInput, err2 } } @@ -104,7 +133,6 @@ func (m *matcher) fill(requested int) error { readSize := 1 for m.ready.Len() < requested { consumedInput, err := m.readNTimes(readSize) - println("fill(", requested, "), m.ready.Len():", m.ready.Len(), "consumedInput:", consumedInput, "m.pending.Len():", m.pending.Len()) if !consumedInput { io.CopyBuffer(m.ready, m.pending, m.readBuffer) return err @@ -136,7 +164,6 @@ func (m *matcher) fill(requested int) error { } func (m *matcher) Read(p []byte) (int, error) { - println("matcher.Read: len(p):", len(p)) if m.closed { return 0, ErrClosed } @@ -159,8 +186,6 @@ func (m *matcher) Read(p []byte) (int, error) { } n, _ := m.ready.Read(p) - println("matcher.Read n bytes:", n) - if n == 0 && len(p) > 0 && m.err != nil { return 0, m.err } @@ -172,13 +197,13 @@ func (m *matcher) Read(p []byte) (int, error) { return 0, m.ctx.Err() default: } - n, err := m.f(p) - println("matcher.Read f processed n bytes:", n) + n, err := m.f(p) if err != nil { m.closed = true - if err == ErrBlocked { + switch err { + case ErrBlocked: m.metrics.IncCounter("blocked.requests") } diff --git a/net/httpbody_test.go b/net/httpbody_test.go index 876396cc91..b0afe3c273 100644 --- a/net/httpbody_test.go +++ b/net/httpbody_test.go @@ -4,12 +4,15 @@ import ( "bytes" "context" "errors" + "fmt" "io" "net/http" "net/http/httptest" "strings" "testing" "time" + + "github.com/zalando/skipper/filters/flowid" ) type mybuf struct { @@ -40,6 +43,122 @@ func blockMatcher(matches []toBlockKeys) func(b []byte) (int, error) { } } +func TestHttpBodyLogBody(t *testing.T) { + t.Run("logbody request", func(t *testing.T) { + sent := strings.Repeat("a", 1024) + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + b := make([]byte, 0, 1024) + buf := bytes.NewBuffer(b) + _, err := io.Copy(buf, r.Body) + if err != nil { + t.Fatalf("Failed to read body on backend receiver: %v", err) + } + + if got := buf.String(); got != sent { + t.Fatalf("Failed to get request body in backend. want: %q, got: %q", sent, got) + } + w.WriteHeader(200) + w.Write([]byte("OK")) + })) + defer backend.Close() + + lgbuf := &bytes.Buffer{} + + var b mybuf + b.buf = bytes.NewBufferString(sent) + + req, err := http.NewRequest("POST", backend.URL, b.buf) + if err != nil { + t.Fatalf("Failed to create request: %v", err) + } + req.Header.Add(flowid.HeaderName, "foo") + + lg := func(format string, args ...interface{}) { + s := fmt.Sprintf(format, args...) + lgbuf.WriteString(s) + } + + body := LogBody( + context.Background(), + fmt.Sprintf(`logBody("request") %s: `, req.Header.Get(flowid.HeaderName)), + lg, + req.Body, + ) + defer body.Close() + req.Body = body + + rsp, err := (&http.Client{}).Do(req) + if err != nil { + t.Fatalf("Failed to do POST request, got err: %v", err) + } + defer rsp.Body.Close() + + if rsp.StatusCode != http.StatusOK { + t.Fatalf("Failed to get the expected status code 200, got: %d", rsp.StatusCode) + } + + lgData := lgbuf.String() + wantLogData := fmt.Sprintf(`logBody("request") %s: %s`, req.Header.Get(flowid.HeaderName), sent) + if wantLogData != lgData { + t.Fatalf("Failed to get log %q, got %q", wantLogData, lgData) + } + }) + + t.Run("logbody response", func(t *testing.T) { + sent := strings.Repeat("a", 512) + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(200) + w.Write([]byte(sent)) + })) + defer backend.Close() + + lgbuf := &bytes.Buffer{} + + req, err := http.NewRequest("GET", backend.URL, nil) + if err != nil { + t.Fatalf("Failed to create request: %v", err) + } + req.Header.Add(flowid.HeaderName, "bar") + + rsp, err := backend.Client().Do(req) + if err != nil { + t.Fatalf("Failed to do POST request, got err: %v", err) + } + + if rsp.StatusCode != http.StatusOK { + t.Fatalf("Failed to get the expected status code 200, got: %d", rsp.StatusCode) + } + + lg := func(format string, args ...interface{}) { + s := fmt.Sprintf(format, args...) + lgbuf.WriteString(s) + } + body := LogBody( + context.Background(), + fmt.Sprintf(`logBody("response") %s: `, req.Header.Get(flowid.HeaderName)), + lg, + rsp.Body, + ) + defer body.Close() + rsp.Body = body + + var buf bytes.Buffer + io.Copy(&buf, rsp.Body) + rsp.Body.Close() + rspBody := buf.String() + if rspBody != sent { + t.Fatalf("Failed to get sent %q, got rspbody %q", sent, rspBody) + } + + lgData := lgbuf.String() + wantLogData := fmt.Sprintf(`logBody("response") %s: %s`, req.Header.Get(flowid.HeaderName), sent) + if wantLogData != lgData { + t.Fatalf("Failed to get log %q, got %q", wantLogData, lgData) + } + }) + +} + func TestHttpBodyReadOnly(t *testing.T) { sent := "hell0 foo bar" @@ -84,7 +203,7 @@ func TestHttpBodyReadOnly(t *testing.T) { defer body.Close() rsp, err := (&http.Client{}).Post(okBackend.URL, "text/plain", body) if err != nil { - t.Fatalf("Expected POST request to be blocked, got err: %v", err) + t.Fatalf("Failed to do POST request: %v", err) } if rsp.StatusCode != http.StatusOK {