Skip to content

Commit

Permalink
optimize logBody to not buffer and pass x-flow-id to the log, such th…
Browse files Browse the repository at this point in the history
…at it can be searched in log analysis

Signed-off-by: Sandor Szücs <sandor.szuecs@zalando.de>
  • Loading branch information
szuecs committed Nov 22, 2023
1 parent f0678f3 commit c6c0c7d
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 32 deletions.
1 change: 0 additions & 1 deletion filters/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func blockMatcher(matches []toBlockKeys) func(b []byte) (int, error) {
return func(b []byte) (int, error) {
for _, s := range matches {
s := s
println("blockMatcher:", string(b), len(string(b)), "contains?:", string(s.Str))
if bytes.Contains(b, s.Str) {
b = nil
return 0, net.ErrBlocked
Expand Down
32 changes: 14 additions & 18 deletions filters/diag/logbody.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package diag

import (
"context"
"fmt"
"strings"

"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/filters/flowid"
"github.com/zalando/skipper/net"
)

Expand Down Expand Up @@ -59,13 +61,12 @@ func (lb logBody) Request(ctx filters.FilterContext) {

req := ctx.Request()
if req.Body != nil {
req.Body = net.WrapBody(
req.Context(),
func(p []byte) (int, error) {
ctx.Logger().Infof(`logBody("request"): %q`, p)
return len(p), nil
},
req.Body)
req.Body = net.LogBody(
context.Background(),
fmt.Sprintf(`logBody("request") %s: `, req.Header.Get(flowid.HeaderName)),
ctx.Logger().Infof,
req.Body,
)
}
}

Expand All @@ -76,16 +77,11 @@ func (lb logBody) Response(ctx filters.FilterContext) {

rsp := ctx.Response()
if rsp.Body != nil {
// if this is not set we get from curl
// Error while processing content unencoding: invalid stored block lengths
rsp.Header.Del("Content-Length")
rsp.ContentLength = -1

rsp.Body = net.WrapBody(
context.Background(), // not sure if it makes sense to be cancellable here
func(p []byte) (int, error) {
ctx.Logger().Infof(`logBody("response"): %q`, p)
return len(p), nil
}, rsp.Body)
rsp.Body = net.LogBody(
context.Background(),
fmt.Sprintf(`logBody("response") %s: `, ctx.Request().Header.Get(flowid.HeaderName)),
ctx.Logger().Infof,
rsp.Body,
)
}
}
49 changes: 37 additions & 12 deletions net/httpbody.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,38 @@ const (
MaxBufferAbort
)

type logBody struct {
ctx context.Context
fmtstr string
log func(format string, args ...interface{})
input io.ReadCloser
}

func newLogBody(ctx context.Context, fmtstr string, log func(format string, args ...interface{}), rc io.ReadCloser) io.ReadCloser {
return &logBody{
ctx: ctx,
fmtstr: fmtstr,
input: rc,
log: log,
}
}

func (lb *logBody) Read(p []byte) (int, error) {
n, err := lb.input.Read(p)
if n > 0 {
lb.log("%s%s", lb.fmtstr, p)
}
return n, err
}

func (lb *logBody) Close() error {
return lb.input.Close()
}

func LogBody(ctx context.Context, fmtstr string, log func(format string, args ...interface{}), rc io.ReadCloser) io.ReadCloser {
return newLogBody(ctx, fmtstr, log, rc)
}

type matcher struct {
ctx context.Context
once sync.Once
Expand Down Expand Up @@ -60,7 +92,6 @@ func newMatcher(
if maxBufferSize < rsize {
rsize = maxBufferSize
}
println("maxBufferSize:", maxBufferSize, "rsize:", rsize)

return &matcher{
ctx: ctx,
Expand All @@ -80,9 +111,7 @@ func (m *matcher) readNTimes(times int) (bool, error) {
var consumedInput bool
for i := 0; i < times; i++ {
n, err := m.input.Read(m.readBuffer)
println("readNTimes(", times, "): read n bytes:", n, "eof:", err == io.EOF)
n2, err2 := m.pending.Write(m.readBuffer[:n])
println("readNTimes(", times, "): wrote n2 bytes:", n2, "eof:", err == io.EOF)
_, err2 := m.pending.Write(m.readBuffer[:n])

if n > 0 {
consumedInput = true
Expand All @@ -92,7 +121,7 @@ func (m *matcher) readNTimes(times int) (bool, error) {
return consumedInput, err
}
if err2 != nil {
return consumedInput, err
return consumedInput, err2
}

}
Expand All @@ -104,7 +133,6 @@ func (m *matcher) fill(requested int) error {
readSize := 1
for m.ready.Len() < requested {
consumedInput, err := m.readNTimes(readSize)
println("fill(", requested, "), m.ready.Len():", m.ready.Len(), "consumedInput:", consumedInput, "m.pending.Len():", m.pending.Len())
if !consumedInput {
io.CopyBuffer(m.ready, m.pending, m.readBuffer)
return err
Expand Down Expand Up @@ -136,7 +164,6 @@ func (m *matcher) fill(requested int) error {
}

func (m *matcher) Read(p []byte) (int, error) {
println("matcher.Read: len(p):", len(p))
if m.closed {
return 0, ErrClosed
}
Expand All @@ -159,8 +186,6 @@ func (m *matcher) Read(p []byte) (int, error) {
}

n, _ := m.ready.Read(p)
println("matcher.Read n bytes:", n)

if n == 0 && len(p) > 0 && m.err != nil {
return 0, m.err
}
Expand All @@ -172,13 +197,13 @@ func (m *matcher) Read(p []byte) (int, error) {
return 0, m.ctx.Err()
default:
}
n, err := m.f(p)
println("matcher.Read f processed n bytes:", n)

n, err := m.f(p)
if err != nil {
m.closed = true

if err == ErrBlocked {
switch err {
case ErrBlocked:
m.metrics.IncCounter("blocked.requests")
}

Expand Down
121 changes: 120 additions & 1 deletion net/httpbody_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/zalando/skipper/filters/flowid"
)

type mybuf struct {
Expand Down Expand Up @@ -40,6 +43,122 @@ func blockMatcher(matches []toBlockKeys) func(b []byte) (int, error) {
}
}

func TestHttpBodyLogBody(t *testing.T) {
t.Run("logbody request", func(t *testing.T) {
sent := strings.Repeat("a", 1024)
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
b := make([]byte, 0, 1024)
buf := bytes.NewBuffer(b)
_, err := io.Copy(buf, r.Body)
if err != nil {
t.Fatalf("Failed to read body on backend receiver: %v", err)
}

if got := buf.String(); got != sent {
t.Fatalf("Failed to get request body in backend. want: %q, got: %q", sent, got)
}
w.WriteHeader(200)
w.Write([]byte("OK"))
}))
defer backend.Close()

lgbuf := &bytes.Buffer{}

var b mybuf
b.buf = bytes.NewBufferString(sent)

req, err := http.NewRequest("POST", backend.URL, b.buf)
if err != nil {
t.Fatalf("Failed to create request: %v", err)
}
req.Header.Add(flowid.HeaderName, "foo")

lg := func(format string, args ...interface{}) {
s := fmt.Sprintf(format, args...)
lgbuf.WriteString(s)
}

body := LogBody(
context.Background(),
fmt.Sprintf(`logBody("request") %s: `, req.Header.Get(flowid.HeaderName)),
lg,
req.Body,
)
defer body.Close()
req.Body = body

rsp, err := (&http.Client{}).Do(req)
if err != nil {
t.Fatalf("Failed to do POST request, got err: %v", err)
}
defer rsp.Body.Close()

if rsp.StatusCode != http.StatusOK {
t.Fatalf("Failed to get the expected status code 200, got: %d", rsp.StatusCode)
}

lgData := lgbuf.String()
wantLogData := fmt.Sprintf(`logBody("request") %s: %s`, req.Header.Get(flowid.HeaderName), sent)
if wantLogData != lgData {
t.Fatalf("Failed to get log %q, got %q", wantLogData, lgData)
}
})

t.Run("logbody response", func(t *testing.T) {
sent := strings.Repeat("a", 512)
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(200)
w.Write([]byte(sent))
}))
defer backend.Close()

lgbuf := &bytes.Buffer{}

req, err := http.NewRequest("GET", backend.URL, nil)
if err != nil {
t.Fatalf("Failed to create request: %v", err)
}
req.Header.Add(flowid.HeaderName, "bar")

rsp, err := backend.Client().Do(req)
if err != nil {
t.Fatalf("Failed to do POST request, got err: %v", err)
}

if rsp.StatusCode != http.StatusOK {
t.Fatalf("Failed to get the expected status code 200, got: %d", rsp.StatusCode)
}

lg := func(format string, args ...interface{}) {
s := fmt.Sprintf(format, args...)
lgbuf.WriteString(s)
}
body := LogBody(
context.Background(),
fmt.Sprintf(`logBody("response") %s: `, req.Header.Get(flowid.HeaderName)),
lg,
rsp.Body,
)
defer body.Close()
rsp.Body = body

var buf bytes.Buffer
io.Copy(&buf, rsp.Body)
rsp.Body.Close()
rspBody := buf.String()
if rspBody != sent {
t.Fatalf("Failed to get sent %q, got rspbody %q", sent, rspBody)
}

lgData := lgbuf.String()
wantLogData := fmt.Sprintf(`logBody("response") %s: %s`, req.Header.Get(flowid.HeaderName), sent)
if wantLogData != lgData {
t.Fatalf("Failed to get log %q, got %q", wantLogData, lgData)
}
})

}

func TestHttpBodyReadOnly(t *testing.T) {
sent := "hell0 foo bar"

Expand Down Expand Up @@ -84,7 +203,7 @@ func TestHttpBodyReadOnly(t *testing.T) {
defer body.Close()
rsp, err := (&http.Client{}).Post(okBackend.URL, "text/plain", body)
if err != nil {
t.Fatalf("Expected POST request to be blocked, got err: %v", err)
t.Fatalf("Failed to do POST request: %v", err)
}

if rsp.StatusCode != http.StatusOK {
Expand Down

0 comments on commit c6c0c7d

Please sign in to comment.