From f0678f3fbfb1e1ff0a34a872b3662fa3fc1944a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sandor=20Sz=C3=BCcs?= Date: Fri, 19 May 2023 22:48:20 +0200 Subject: [PATCH] httpbody wrapper blockmatcher with func passing feature: implement net.HttpBody to allow streaming filters on http body request and response refactoring: blockContent,blockContentHex filters to use the new implementation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit refactor: move ErrBlock from proxy to net refactor: reuse types and enum and expose options to set httpbody matcher config drop noleak because of transport related leaks feature: read write body wrapper and test api with porting compress filter Signed-off-by: Sandor Szücs --- filters/block/block.go | 61 ++++-- filters/block/block_test.go | 376 +++++++++++++++++++++-------- filters/block/matcher.go | 209 ------------------- filters/builtin/builtin.go | 1 + filters/diag/absorb.go | 3 +- filters/diag/logbody.go | 91 +++++++++ filters/diag/logbody_test.go | 148 ++++++++++++++ filters/filters.go | 1 + net/httpbody.go | 245 +++++++++++++++++++++++ net/httpbody_test.go | 333 +++++++++++++++++++++++++++++++ proxy/proxy.go | 4 +- proxy/teebody.go | 3 +- 12 files changed, 1096 insertions(+), 379 deletions(-) delete mode 100644 filters/block/matcher.go create mode 100644 filters/diag/logbody.go create mode 100644 filters/diag/logbody_test.go create mode 100644 net/httpbody.go create mode 100644 net/httpbody_test.go diff --git a/filters/block/block.go b/filters/block/block.go index f01b59ce63..0a1c8091e1 100644 --- a/filters/block/block.go +++ b/filters/block/block.go @@ -1,10 +1,12 @@ package block import ( + "bytes" "encoding/hex" "errors" "github.com/zalando/skipper/filters" + "github.com/zalando/skipper/net" ) var ( @@ -16,10 +18,16 @@ type blockSpec struct { hex bool } +type toBlockKeys struct{ Str []byte } + +func (b toBlockKeys) String() string { + return string(b.Str) +} + type block struct { - 
toblockList []toblockKeys + toblockList []toBlockKeys maxEditorBuffer uint64 - maxBufferHandling maxBufferHandling + maxBufferHandling net.MaxBufferHandling } // NewBlockFilter *deprecated* version of NewBlock @@ -52,7 +60,7 @@ func (bs *blockSpec) CreateFilter(args []interface{}) (filters.Filter, error) { return nil, filters.ErrInvalidFilterParameters } - sargs := make([]toblockKeys, 0, len(args)) + sargs := make([]toBlockKeys, 0, len(args)) for _, w := range args { v, ok := w.(string) if !ok { @@ -63,33 +71,50 @@ func (bs *blockSpec) CreateFilter(args []interface{}) (filters.Filter, error) { if err != nil { return nil, err } - sargs = append(sargs, toblockKeys{str: a}) + sargs = append(sargs, toBlockKeys{Str: a}) } else { - sargs = append(sargs, toblockKeys{str: []byte(v)}) + sargs = append(sargs, toBlockKeys{Str: []byte(v)}) } } - b := &block{ + return &block{ toblockList: sargs, - maxBufferHandling: maxBufferBestEffort, + maxBufferHandling: net.MaxBufferBestEffort, maxEditorBuffer: bs.MaxMatcherBufferSize, - } + }, nil +} - return *b, nil +func blockMatcher(matches []toBlockKeys) func(b []byte) (int, error) { + return func(b []byte) (int, error) { + for _, s := range matches { + s := s + println("blockMatcher:", string(b), len(string(b)), "contains?:", string(s.Str)) + if bytes.Contains(b, s.Str) { + b = nil + return 0, net.ErrBlocked + } + } + return len(b), nil + } } -func (b block) Request(ctx filters.FilterContext) { +func (b *block) Request(ctx filters.FilterContext) { req := ctx.Request() if req.ContentLength == 0 { return } - - req.Body = newMatcher( - req.Body, - b.toblockList, - b.maxEditorBuffer, - b.maxBufferHandling, - ) + // fix filter chaining - https://github.com/zalando/skipper/issues/2605 + ctx.Request().Header.Del("Content-Length") + ctx.Request().ContentLength = -1 + + req.Body = net.WrapBodyWithOptions( + req.Context(), + net.BodyOptions{ + MaxBufferHandling: b.maxBufferHandling, + ReadBufferSize: b.maxEditorBuffer, + }, + 
blockMatcher(b.toblockList), + req.Body) } -func (block) Response(filters.FilterContext) {} +func (*block) Response(filters.FilterContext) {} diff --git a/filters/block/block_test.go b/filters/block/block_test.go index ed858bdc98..335ccf5c10 100644 --- a/filters/block/block_test.go +++ b/filters/block/block_test.go @@ -11,7 +11,7 @@ import ( "github.com/zalando/skipper/eskip" "github.com/zalando/skipper/filters" - "github.com/zalando/skipper/proxy" + "github.com/zalando/skipper/net" "github.com/zalando/skipper/proxy/proxytest" ) @@ -39,22 +39,26 @@ func TestMatcher(t *testing.T) { { name: "empty string", content: "", + block: []byte(".class"), err: nil, }, { name: "small string", content: ".class", - err: proxy.ErrBlocked, + block: []byte(".class"), + err: net.ErrBlocked, }, { name: "small string without match", content: "foxi", + block: []byte(".class"), err: nil, }, { name: "small string with match", content: "fox.class.foo.blah", - err: proxy.ErrBlocked, + block: []byte(".class"), + err: net.ErrBlocked, }, { name: "hex string 0x00 without match", @@ -65,42 +69,43 @@ func TestMatcher(t *testing.T) { name: "hex string 0x00 with match", content: "fox.c\x00.foo.blah", block: []byte("\x00"), - err: proxy.ErrBlocked, + err: net.ErrBlocked, }, { name: "hex string with uppercase match content string with lowercase", content: "fox.c\x0A.foo.blah", block: []byte("\x0a"), - err: proxy.ErrBlocked, + err: net.ErrBlocked, }, { name: "hex string 0x00 0x0a with match", content: "fox.c\x00\x0a.foo.blah", block: []byte{0, 10}, - err: proxy.ErrBlocked, + err: net.ErrBlocked, }, { name: "long string", content: strings.Repeat("A", 8192), + block: []byte(".class"), }} { t.Run(tt.name, func(t *testing.T) { - block := []byte(".class") - if len(tt.block) != 0 { - block = tt.block - } r := &nonBlockingReader{initialContent: []byte(tt.content)} - toblockList := []toblockKeys{{str: block}} + toblockList := []toBlockKeys{{Str: tt.block}} - bmb := newMatcher(r, toblockList, 2097152, 
maxBufferBestEffort) + req, err := http.NewRequest("POST", "http://test.example", r) + if err != nil { + t.Fatalf("Failed to create request with body: %v", err) + } + + bmb := net.WrapBody(req.Context(), blockMatcher(toblockList), req.Body) - t.Logf("Content: %s", r.initialContent) p := make([]byte, len(r.initialContent)) n, err := bmb.Read(p) if err != tt.err { t.Fatalf("Failed to get expected err %v, got: %v", tt.err, err) } if err != nil { - if err == proxy.ErrBlocked { + if err == net.ErrBlocked { t.Logf("Stop! Request has some blocked content!") } else { t.Errorf("Failed to read: %v", err) @@ -108,60 +113,10 @@ func TestMatcher(t *testing.T) { } else if n != len(tt.content) { t.Errorf("Failed to read content length %d, got %d", len(tt.content), n) } - }) } } -func TestMatcherErrorCases(t *testing.T) { - toblockList := []toblockKeys{{str: []byte(".class")}} - t.Run("maxBufferAbort", func(t *testing.T) { - r := &nonBlockingReader{initialContent: []byte("fppppppppp .class")} - bmb := newMatcher(r, toblockList, 5, maxBufferAbort) - p := make([]byte, len(r.initialContent)) - _, err := bmb.Read(p) - if err != ErrMatcherBufferFull { - t.Errorf("Failed to get expected error %v, got: %v", ErrMatcherBufferFull, err) - } - }) - - t.Run("maxBuffer", func(t *testing.T) { - r := &nonBlockingReader{initialContent: []byte("fppppppppp .class")} - bmb := newMatcher(r, toblockList, 5, maxBufferBestEffort) - p := make([]byte, len(r.initialContent)) - _, err := bmb.Read(p) - if err != nil { - t.Errorf("Failed to read: %v", err) - } - }) - - t.Run("maxBuffer read on closed reader", func(t *testing.T) { - pipeR, pipeW := io.Pipe() - initialContent := []byte("fppppppppp") - go pipeW.Write(initialContent) - bmb := newMatcher(pipeR, toblockList, 5, maxBufferBestEffort) - p := make([]byte, len(initialContent)+10) - pipeR.Close() - _, err := bmb.Read(p) - if err == nil || err != io.ErrClosedPipe { - t.Errorf("Failed to get correct read error: %v", err) - } - }) - - t.Run("maxBuffer read 
on initial closed reader", func(t *testing.T) { - pipeR, _ := io.Pipe() - initialContent := []byte("fppppppppp") - bmb := newMatcher(pipeR, toblockList, 5, maxBufferBestEffort) - p := make([]byte, len(initialContent)+10) - pipeR.Close() - bmb.Close() - _, err := bmb.Read(p) - if err == nil || err.Error() != "reader closed" { - t.Errorf("Failed to get correct read error: %v", err) - } - }) -} - func TestBlockCreateFilterErrors(t *testing.T) { spec := NewBlock(1024) @@ -181,26 +136,90 @@ func TestBlockCreateFilterErrors(t *testing.T) { } func TestBlock(t *testing.T) { - backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + io.Copy(io.Discard, r.Body) + r.Body.Close() w.WriteHeader(200) w.Write([]byte("OK")) })) defer backend.Close() spec := NewBlock(1024) - args := []interface{}{"foo"} fr := make(filters.Registry) fr.Register(spec) - r := &eskip.Route{Filters: []*eskip.Filter{{Name: spec.Name(), Args: args}}, Backend: backend.URL} - - proxy := proxytest.New(fr, r) - defer proxy.Close() - reqURL, err := url.Parse(proxy.URL) - if err != nil { - t.Errorf("Failed to parse url %s: %v", proxy.URL, err) - } t.Run("block request", func(t *testing.T) { + args := []interface{}{"foo"} + r := &eskip.Route{Filters: []*eskip.Filter{{Name: spec.Name(), Args: args}}, Backend: backend.URL} + proxy := proxytest.New(fr, r) + defer proxy.Close() + reqURL, err := url.Parse(proxy.URL) + if err != nil { + t.Errorf("Failed to parse url %s: %v", proxy.URL, err) + } + + buf := bytes.NewBufferString("hello foo world") + req, err := http.NewRequest("POST", reqURL.String(), buf) + if err != nil { + t.Fatal(err) + } + + rsp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatal(err) + } + defer rsp.Body.Close() + if rsp.StatusCode != 400 { + t.Errorf("Not Blocked response status code %d", rsp.StatusCode) + } + }) + + t.Run("block request chain first 
blocks", func(t *testing.T) { + args := []interface{}{"foo"} + args2 := []interface{}{"bar"} + r := &eskip.Route{ + Filters: []*eskip.Filter{ + {Name: spec.Name(), Args: args}, + {Name: spec.Name(), Args: args2}, + }, Backend: backend.URL} + proxy := proxytest.New(fr, r) + defer proxy.Close() + reqURL, err := url.Parse(proxy.URL) + if err != nil { + t.Errorf("Failed to parse url %s: %v", proxy.URL, err) + } + + buf := bytes.NewBufferString("hello foo world") + req, err := http.NewRequest("POST", reqURL.String(), buf) + if err != nil { + t.Fatal(err) + } + + rsp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatal(err) + } + defer rsp.Body.Close() + if rsp.StatusCode != 400 { + t.Errorf("Not Blocked response status code %d", rsp.StatusCode) + } + }) + + t.Run("block request chain second blocks", func(t *testing.T) { + args := []interface{}{"bar"} + args2 := []interface{}{"foo"} + r := &eskip.Route{ + Filters: []*eskip.Filter{ + {Name: spec.Name(), Args: args}, + {Name: spec.Name(), Args: args2}, + }, Backend: backend.URL} + proxy := proxytest.New(fr, r) + defer proxy.Close() + reqURL, err := url.Parse(proxy.URL) + if err != nil { + t.Errorf("Failed to parse url %s: %v", proxy.URL, err) + } + buf := bytes.NewBufferString("hello foo world") req, err := http.NewRequest("POST", reqURL.String(), buf) if err != nil { @@ -218,6 +237,15 @@ func TestBlock(t *testing.T) { }) t.Run("pass request", func(t *testing.T) { + args := []interface{}{"foo"} + r := &eskip.Route{Filters: []*eskip.Filter{{Name: spec.Name(), Args: args}}, Backend: backend.URL} + proxy := proxytest.New(fr, r) + defer proxy.Close() + reqURL, err := url.Parse(proxy.URL) + if err != nil { + t.Errorf("Failed to parse url %s: %v", proxy.URL, err) + } + buf := bytes.NewBufferString("hello world") req, err := http.NewRequest("POST", reqURL.String(), buf) if err != nil { @@ -234,7 +262,97 @@ func TestBlock(t *testing.T) { } }) + t.Run("pass request with filter chain and check content", func(t *testing.T) 
{ + content := "hello world" + + be := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + res, err := io.ReadAll(r.Body) + r.Body.Close() + if err != nil { + w.WriteHeader(500) + w.Write([]byte("Failed to read body")) + return + } + if s := string(res); s != content { + t.Logf("backend received: %q", s) + w.WriteHeader(400) + w.Write([]byte("wrong body")) + return + } + w.WriteHeader(200) + w.Write([]byte("OK")) + })) + defer be.Close() + + args := []interface{}{"foo"} + args2 := []interface{}{"bar"} + r := &eskip.Route{Filters: []*eskip.Filter{ + {Name: spec.Name(), Args: args}, + {Name: spec.Name(), Args: args2}, + }, Backend: be.URL} + proxy := proxytest.New(fr, r) + defer proxy.Close() + reqURL, err := url.Parse(proxy.URL) + if err != nil { + t.Errorf("Failed to parse url %s: %v", proxy.URL, err) + } + + buf := bytes.NewBufferString(content) + req, err := http.NewRequest("POST", reqURL.String(), buf) + if err != nil { + t.Fatal(err) + } + + rsp, err := proxy.Client().Do(req) + if err != nil { + t.Fatal(err) + } + result, _ := io.ReadAll(rsp.Body) + defer rsp.Body.Close() + if rsp.StatusCode != 200 { + t.Errorf("Blocked response status code %d: %s", rsp.StatusCode, string(result)) + } + }) + t.Run("pass request on empty body", func(t *testing.T) { + args := []interface{}{"foo"} + r := &eskip.Route{Filters: []*eskip.Filter{{Name: spec.Name(), Args: args}}, Backend: backend.URL} + proxy := proxytest.New(fr, r) + defer proxy.Close() + reqURL, err := url.Parse(proxy.URL) + if err != nil { + t.Errorf("Failed to parse url %s: %v", proxy.URL, err) + } + + buf := bytes.NewBufferString("") + req, err := http.NewRequest("POST", reqURL.String(), buf) + if err != nil { + t.Fatal(err) + } + + rsp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatal(err) + } + defer rsp.Body.Close() + if rsp.StatusCode != 200 { + t.Errorf("Blocked response status code %d", rsp.StatusCode) + } + }) + t.Run("pass request on empty body with filter 
chain", func(t *testing.T) { + args := []interface{}{"foo"} + args2 := []interface{}{"bar"} + r := &eskip.Route{Filters: []*eskip.Filter{ + {Name: spec.Name(), Args: args}, + {Name: spec.Name(), Args: args2}, + }, Backend: backend.URL} + proxy := proxytest.New(fr, r) + defer proxy.Close() + reqURL, err := url.Parse(proxy.URL) + if err != nil { + t.Errorf("Failed to parse url %s: %v", proxy.URL, err) + } + buf := bytes.NewBufferString("") req, err := http.NewRequest("POST", reqURL.String(), buf) if err != nil { @@ -251,6 +369,7 @@ func TestBlock(t *testing.T) { } }) } + func TestBlockHex(t *testing.T) { backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(200) @@ -259,19 +378,19 @@ func TestBlockHex(t *testing.T) { defer backend.Close() spec := NewBlockHex(1024) - args := []interface{}{`000a`} fr := make(filters.Registry) fr.Register(spec) - r := &eskip.Route{Filters: []*eskip.Filter{{Name: spec.Name(), Args: args}}, Backend: backend.URL} - - proxy := proxytest.New(fr, r) - defer proxy.Close() - reqURL, err := url.Parse(proxy.URL) - if err != nil { - t.Errorf("Failed to parse url %s: %v", proxy.URL, err) - } t.Run("block request", func(t *testing.T) { + args := []interface{}{`000a`} + r := &eskip.Route{Filters: []*eskip.Filter{{Name: spec.Name(), Args: args}}, Backend: backend.URL} + proxy := proxytest.New(fr, r) + defer proxy.Close() + reqURL, err := url.Parse(proxy.URL) + if err != nil { + t.Errorf("Failed to parse url %s: %v", proxy.URL, err) + } + buf := bytes.NewBufferString("hello \x00\x0afoo world") req, err := http.NewRequest("POST", reqURL.String(), buf) if err != nil { @@ -289,6 +408,15 @@ func TestBlockHex(t *testing.T) { }) t.Run("block request binary data in request", func(t *testing.T) { + args := []interface{}{`000a`} + r := &eskip.Route{Filters: []*eskip.Filter{{Name: spec.Name(), Args: args}}, Backend: backend.URL} + proxy := proxytest.New(fr, r) + defer proxy.Close() + reqURL, err := 
url.Parse(proxy.URL) + if err != nil { + t.Errorf("Failed to parse url %s: %v", proxy.URL, err) + } + buf := bytes.NewBuffer([]byte{65, 65, 31, 0, 10, 102, 111, 111, 31}) req, err := http.NewRequest("POST", reqURL.String(), buf) if err != nil { @@ -306,6 +434,15 @@ func TestBlockHex(t *testing.T) { }) t.Run("pass request", func(t *testing.T) { + args := []interface{}{`000a`} + r := &eskip.Route{Filters: []*eskip.Filter{{Name: spec.Name(), Args: args}}, Backend: backend.URL} + proxy := proxytest.New(fr, r) + defer proxy.Close() + reqURL, err := url.Parse(proxy.URL) + if err != nil { + t.Errorf("Failed to parse url %s: %v", proxy.URL, err) + } + buf := bytes.NewBufferString("hello \x00a\x0a world") req, err := http.NewRequest("POST", reqURL.String(), buf) if err != nil { @@ -323,6 +460,15 @@ func TestBlockHex(t *testing.T) { }) t.Run("pass request binary data in request", func(t *testing.T) { + args := []interface{}{`000a`} + r := &eskip.Route{Filters: []*eskip.Filter{{Name: spec.Name(), Args: args}}, Backend: backend.URL} + proxy := proxytest.New(fr, r) + defer proxy.Close() + reqURL, err := url.Parse(proxy.URL) + if err != nil { + t.Errorf("Failed to parse url %s: %v", proxy.URL, err) + } + buf := bytes.NewBuffer([]byte{65, 65, 31, 0, 11, 102, 111, 111, 31}) req, err := http.NewRequest("POST", reqURL.String(), buf) if err != nil { @@ -339,67 +485,3 @@ func TestBlockHex(t *testing.T) { } }) } - -func BenchmarkBlock(b *testing.B) { - - fake := func(source string, len int) string { - return strings.Repeat(source[:2], len) // partially matches target - } - - fakematch := func(source string, len int) string { - return strings.Repeat(source, len) // matches target - } - - for _, tt := range []struct { - name string - tomatch []byte - bm []byte - }{ - { - name: "Small Stream without blocking", - tomatch: []byte(".class"), - bm: []byte(fake(".class", 1<<20)), // Test with 1Mib - }, - { - name: "Small Stream with blocking", - tomatch: []byte(".class"), - bm: 
[]byte(fakematch(".class", 1<<20)), - }, - { - name: "Medium Stream without blocking", - tomatch: []byte(".class"), - bm: []byte(fake(".class", 1<<24)), // Test with ~10Mib - }, - { - name: "Medium Stream with blocking", - tomatch: []byte(".class"), - bm: []byte(fakematch(".class", 1<<24)), - }, - { - name: "Large Stream without blocking", - tomatch: []byte(".class"), - bm: []byte(fake(".class", 1<<27)), // Test with ~100Mib - }, - { - name: "Large Stream with blocking", - tomatch: []byte(".class"), - bm: []byte(fakematch(".class", 1<<27)), - }} { - b.Run(tt.name, func(b *testing.B) { - target := &nonBlockingReader{initialContent: tt.bm} - r := &http.Request{ - Body: target, - } - toblockList := []toblockKeys{{str: tt.tomatch}} - bmb := newMatcher(r.Body, toblockList, 2097152, maxBufferBestEffort) - p := make([]byte, len(target.initialContent)) - b.Logf("Number of loops: %b", b.N) - for n := 0; n < b.N; n++ { - _, err := bmb.Read(p) - if err != nil { - return - } - } - }) - } -} diff --git a/filters/block/matcher.go b/filters/block/matcher.go deleted file mode 100644 index b64050328b..0000000000 --- a/filters/block/matcher.go +++ /dev/null @@ -1,209 +0,0 @@ -package block - -import ( - "bytes" - "errors" - "io" - "sync" - - "github.com/zalando/skipper/metrics" - "github.com/zalando/skipper/proxy" -) - -type toblockKeys struct{ str []byte } - -const ( - readBufferSize uint64 = 8192 -) - -type maxBufferHandling int - -const ( - maxBufferBestEffort maxBufferHandling = iota - maxBufferAbort -) - -// matcher provides a reader that wraps an input reader, and blocks the request -// if a pattern was found. -// -// It reads enough data until at least a complete match of the -// pattern is met or the maxBufferSize is reached. When the pattern matches the entire -// buffered input, the replaced content is returned to the caller when maxBufferSize is -// reached. 
This also means that more replacements can happen than if we edited the -// entire content in one piece, but this is necessary to be able to use the matcher for -// input with unknown length. -// -// When the maxBufferHandling is set to maxBufferAbort, then the streaming is aborted -// and the rest of the payload is dropped. -// -// To limit the number of repeated scans over the buffered data, the size of the -// additional data read from the input grows exponentially with every iteration that -// didn't result with any matched data blocked. If there was any matched data -// the read size is reset to the initial value. -// -// When the input returns an error, e.g. EOF, the matcher finishes matching the buffered -// data, blocks or return it to the caller. -// -// When the matcher is closed, it doesn't read anymore from the input or return any -// buffered data. If the input implements io.Closer, closing the matcher closes the -// input, too. -type matcher struct { - once sync.Once - input io.ReadCloser - toblockList []toblockKeys - maxBufferSize uint64 - maxBufferHandling maxBufferHandling - readBuffer []byte - - ready *bytes.Buffer - pending *bytes.Buffer - - metrics metrics.Metrics - - err error - closed bool -} - -var ( - ErrMatcherBufferFull = errors.New("matcher buffer full") -) - -func newMatcher( - input io.ReadCloser, - toblockList []toblockKeys, - maxBufferSize uint64, - mbh maxBufferHandling, -) *matcher { - - rsize := readBufferSize - if maxBufferSize < rsize { - rsize = maxBufferSize - } - - return &matcher{ - once: sync.Once{}, - input: input, - toblockList: toblockList, - maxBufferSize: maxBufferSize, - maxBufferHandling: mbh, - readBuffer: make([]byte, rsize), - pending: bytes.NewBuffer(nil), - ready: bytes.NewBuffer(nil), - metrics: metrics.Default, - } -} - -func (m *matcher) readNTimes(times int) (bool, error) { - var consumedInput bool - for i := 0; i < times; i++ { - n, err := m.input.Read(m.readBuffer) - m.pending.Write(m.readBuffer[:n]) - if n 
> 0 { - consumedInput = true - } - - if err != nil { - return consumedInput, err - } - - } - - return consumedInput, nil -} - -func (m *matcher) match(b []byte) (int, error) { - var consumed int - - for _, s := range m.toblockList { - if bytes.Contains(b, s.str) { - b = nil - return 0, proxy.ErrBlocked - } - } - consumed += len(b) - return consumed, nil - -} - -func (m *matcher) fill(requested int) error { - readSize := 1 - for m.ready.Len() < requested { - consumedInput, err := m.readNTimes(readSize) - if !consumedInput { - io.CopyBuffer(m.ready, m.pending, m.readBuffer) - return err - } - - if uint64(m.pending.Len()) > m.maxBufferSize { - switch m.maxBufferHandling { - case maxBufferAbort: - return ErrMatcherBufferFull - default: - _, err := m.match(m.pending.Bytes()) - if err != nil { - return err - } - m.pending.Reset() - readSize = 1 - } - } - - readSize *= 2 - } - return nil -} - -func (m *matcher) Read(p []byte) (int, error) { - if m.closed { - return 0, ErrClosed - } - - if m.ready.Len() == 0 && m.err != nil { - return 0, m.err - } - - if m.ready.Len() < len(p) { - m.err = m.fill(len(p)) - } - - if m.err == ErrMatcherBufferFull { - return 0, ErrMatcherBufferFull - } - - if m.err == proxy.ErrBlocked { - m.metrics.IncCounter("blocked.requests") - return 0, proxy.ErrBlocked - } - - n, _ := m.ready.Read(p) - - if n == 0 && len(p) > 0 && m.err != nil { - return 0, m.err - } - - n, err := m.match(p) - - if err != nil { - m.closed = true - - if err == proxy.ErrBlocked { - m.metrics.IncCounter("blocked.requests") - } - - return 0, err - } - - return n, nil -} - -// Close closes the undelrying reader if it implements io.Closer. 
-func (m *matcher) Close() error { - var err error - m.once.Do(func() { - m.closed = true - if c, ok := m.input.(io.Closer); ok { - err = c.Close() - } - }) - return err -} diff --git a/filters/builtin/builtin.go b/filters/builtin/builtin.go index 0609d87412..b60cf6f033 100644 --- a/filters/builtin/builtin.go +++ b/filters/builtin/builtin.go @@ -179,6 +179,7 @@ func Filters() []filters.Spec { diag.NewAbsorb(), diag.NewAbsorbSilent(), diag.NewLogHeader(), + diag.NewLogBody(), diag.NewUniformRequestLatency(), diag.NewUniformResponseLatency(), diag.NewNormalRequestLatency(), diff --git a/filters/diag/absorb.go b/filters/diag/absorb.go index 0d53601236..f48b891ca8 100644 --- a/filters/diag/absorb.go +++ b/filters/diag/absorb.go @@ -68,9 +68,8 @@ func NewAbsorbSilent() filters.Spec { func (a *absorb) Name() string { if a.silent { return filters.AbsorbSilentName - } else { - return filters.AbsorbName } + return filters.AbsorbName } func (a *absorb) CreateFilter(args []interface{}) (filters.Filter, error) { return a, nil } diff --git a/filters/diag/logbody.go b/filters/diag/logbody.go new file mode 100644 index 0000000000..4c41b3210b --- /dev/null +++ b/filters/diag/logbody.go @@ -0,0 +1,91 @@ +package diag + +import ( + "context" + "strings" + + "github.com/zalando/skipper/filters" + "github.com/zalando/skipper/net" +) + +type logBody struct { + request bool + response bool +} + +// NewLogBody creates a filter specification for the 'logBody()' filter. +func NewLogBody() filters.Spec { return logBody{} } + +// Name returns the logBody filter name. 
+func (logBody) Name() string { + return filters.LogBodyName +} + +func (logBody) CreateFilter(args []interface{}) (filters.Filter, error) { + var ( + request = false + response = false + ) + + // default behavior + if len(args) == 0 { + request = true + } + + for i := range args { + opt, ok := args[i].(string) + if !ok { + return nil, filters.ErrInvalidFilterParameters + } + switch strings.ToLower(opt) { + case "response": + response = true + case "request": + request = true + } + + } + + return logBody{ + request: request, + response: response, + }, nil +} + +func (lb logBody) Request(ctx filters.FilterContext) { + if !lb.request { + return + } + + req := ctx.Request() + if req.Body != nil { + req.Body = net.WrapBody( + req.Context(), + func(p []byte) (int, error) { + ctx.Logger().Infof(`logBody("request"): %q`, p) + return len(p), nil + }, + req.Body) + } +} + +func (lb logBody) Response(ctx filters.FilterContext) { + if !lb.response { + return + } + + rsp := ctx.Response() + if rsp.Body != nil { + // if this is not set we get from curl + // Error while processing content unencoding: invalid stored block lengths + rsp.Header.Del("Content-Length") + rsp.ContentLength = -1 + + rsp.Body = net.WrapBody( + context.Background(), // not sure if it makes sense to be cancellable here + func(p []byte) (int, error) { + ctx.Logger().Infof(`logBody("response"): %q`, p) + return len(p), nil + }, rsp.Body) + } +} diff --git a/filters/diag/logbody_test.go b/filters/diag/logbody_test.go new file mode 100644 index 0000000000..906fd05699 --- /dev/null +++ b/filters/diag/logbody_test.go @@ -0,0 +1,148 @@ +package diag + +import ( + "bytes" + "fmt" + "io" + "net/http" + "os" + "strings" + "testing" + + log "github.com/sirupsen/logrus" + "github.com/zalando/skipper/eskip" + "github.com/zalando/skipper/filters" + "github.com/zalando/skipper/proxy/proxytest" +) + +func TestLogBody(t *testing.T) { + defer func() { + log.SetOutput(os.Stderr) + }() + + t.Run("Request", func(t *testing.T) 
{ + beRoutes := eskip.MustParse(`r: * -> absorbSilent() -> repeatContent("a", 10) -> `) + fr := make(filters.Registry) + fr.Register(NewLogBody()) + fr.Register(NewAbsorbSilent()) + fr.Register(NewRepeat()) + be := proxytest.New(fr, beRoutes...) + defer be.Close() + + routes := eskip.MustParse(fmt.Sprintf(`r: * -> logBody("request") -> "%s"`, be.URL)) + p := proxytest.New(fr, routes...) + defer p.Close() + + content := "testrequest" + logbuf := bytes.NewBuffer(nil) + log.SetOutput(logbuf) + buf := bytes.NewBufferString(content) + rsp, err := http.DefaultClient.Post(p.URL, "text/plain", buf) + log.SetOutput(os.Stderr) + if err != nil { + t.Fatalf("Failed to POST: %v", err) + } + defer rsp.Body.Close() + + if got := logbuf.String(); !strings.Contains(got, content) { + t.Fatalf("Failed to find %q log, got: %q", content, got) + } + }) + + t.Run("Request is default", func(t *testing.T) { + beRoutes := eskip.MustParse(`r: * -> absorbSilent() -> repeatContent("a", 10) -> `) + fr := make(filters.Registry) + fr.Register(NewLogBody()) + fr.Register(NewAbsorbSilent()) + fr.Register(NewRepeat()) + be := proxytest.New(fr, beRoutes...) + defer be.Close() + + routes := eskip.MustParse(fmt.Sprintf(`r: * -> logBody() -> "%s"`, be.URL)) + p := proxytest.New(fr, routes...) + defer p.Close() + + content := "testrequestisdefault" + logbuf := bytes.NewBuffer(nil) + log.SetOutput(logbuf) + buf := bytes.NewBufferString(content) + rsp, err := http.DefaultClient.Post(p.URL, "text/plain", buf) + log.SetOutput(os.Stderr) + if err != nil { + t.Fatalf("Failed to POST: %v", err) + } + defer rsp.Body.Close() + + if got := logbuf.String(); !strings.Contains(got, content) { + t.Fatalf("Failed to find %q log, got: %q", content, got) + } + }) + + t.Run("Response", func(t *testing.T) { + beRoutes := eskip.MustParse(`r: * -> repeatContent("a", 10) -> `) + fr := make(filters.Registry) + fr.Register(NewLogBody()) + fr.Register(NewRepeat()) + be := proxytest.New(fr, beRoutes...) 
+ defer be.Close() + + routes := eskip.MustParse(fmt.Sprintf(`r: * -> logBody("response") -> "%s"`, be.URL)) + p := proxytest.New(fr, routes...) + defer p.Close() + + content := "testrequest" + logbuf := bytes.NewBuffer(nil) + log.SetOutput(logbuf) + buf := bytes.NewBufferString(content) + rsp, err := http.DefaultClient.Post(p.URL, "text/plain", buf) + if err != nil { + t.Fatalf("Failed to do post request: %v", err) + } + + defer rsp.Body.Close() + io.Copy(io.Discard, rsp.Body) + log.SetOutput(os.Stderr) + + got := logbuf.String() + if strings.Contains(got, content) { + t.Fatalf("Found request body %q in %q", content, got) + } + // repeatContent("a", 10) + if !strings.Contains(got, "aaaaaaaaaa") { + t.Fatalf("Failed to find rsp content %q log, got: %q", "aaaaaaaaaa", got) + } + }) + t.Run("Request-response", func(t *testing.T) { + beRoutes := eskip.MustParse(`r: * -> repeatContent("a", 10) -> `) + fr := make(filters.Registry) + fr.Register(NewLogBody()) + fr.Register(NewRepeat()) + be := proxytest.New(fr, beRoutes...) + defer be.Close() + + routes := eskip.MustParse(fmt.Sprintf(`r: * -> logBody("request","response") -> "%s"`, be.URL)) + p := proxytest.New(fr, routes...) 
+ defer p.Close() + + requestContent := "testrequestresponse" + logbuf := bytes.NewBuffer(nil) + log.SetOutput(logbuf) + buf := bytes.NewBufferString(requestContent) + rsp, err := http.DefaultClient.Post(p.URL, "text/plain", buf) + if err != nil { + t.Fatalf("Failed to get respone: %v", err) + } + defer rsp.Body.Close() + io.Copy(io.Discard, rsp.Body) + log.SetOutput(os.Stderr) + + got := logbuf.String() + if !strings.Contains(got, requestContent) { + t.Fatalf("Failed to find req %q log, got: %q", requestContent, got) + } + // repeatContent("a", 10) + if !strings.Contains(got, "aaaaaaaaaa") { + t.Fatalf("Failed to find %q log, got: %q", "aaaaaaaaaa", got) + } + }) +} diff --git a/filters/filters.go b/filters/filters.go index 6caaaa7a41..4e57ae9be4 100644 --- a/filters/filters.go +++ b/filters/filters.go @@ -262,6 +262,7 @@ const ( NormalResponseLatencyName = "normalResponseLatency" HistogramRequestLatencyName = "histogramRequestLatency" HistogramResponseLatencyName = "histogramResponseLatency" + LogBodyName = "logBody" LogHeaderName = "logHeader" TeeName = "tee" TeenfName = "teenf" diff --git a/net/httpbody.go b/net/httpbody.go new file mode 100644 index 0000000000..a17af98470 --- /dev/null +++ b/net/httpbody.go @@ -0,0 +1,245 @@ +package net + +import ( + "bytes" + "context" + "errors" + "io" + "sync" + + "github.com/zalando/skipper/metrics" +) + +var ( + ErrClosed = errors.New("reader closed") + ErrBlocked = errors.New("blocked string match found in body") +) + +const ( + defaultReadBufferSize uint64 = 8192 +) + +type MaxBufferHandling int + +const ( + MaxBufferBestEffort MaxBufferHandling = iota + MaxBufferAbort +) + +type matcher struct { + ctx context.Context + once sync.Once + input io.ReadCloser + f func([]byte) (int, error) + maxBufferSize uint64 + maxBufferHandling MaxBufferHandling + readBuffer []byte + + ready *bytes.Buffer + pending *bytes.Buffer + + metrics metrics.Metrics + + err error + closed bool +} + +var ( + ErrMatcherBufferFull = 
errors.New("matcher buffer full") +) + +func newMatcher( + ctx context.Context, + input io.ReadCloser, + f func([]byte) (int, error), + maxBufferSize uint64, + mbh MaxBufferHandling, +) *matcher { + + rsize := defaultReadBufferSize + if maxBufferSize < rsize { + rsize = maxBufferSize + } + println("maxBufferSize:", maxBufferSize, "rsize:", rsize) + + return &matcher{ + ctx: ctx, + once: sync.Once{}, + input: input, + f: f, + maxBufferSize: maxBufferSize, + maxBufferHandling: mbh, + readBuffer: make([]byte, rsize), + pending: bytes.NewBuffer(nil), + ready: bytes.NewBuffer(nil), + metrics: metrics.Default, + } +} + +func (m *matcher) readNTimes(times int) (bool, error) { + var consumedInput bool + for i := 0; i < times; i++ { + n, err := m.input.Read(m.readBuffer) + println("readNTimes(", times, "): read n bytes:", n, "eof:", err == io.EOF) + n2, err2 := m.pending.Write(m.readBuffer[:n]) + println("readNTimes(", times, "): wrote n2 bytes:", n2, "eof:", err == io.EOF) + + if n > 0 { + consumedInput = true + } + + if err != nil { + return consumedInput, err + } + if err2 != nil { + return consumedInput, err + } + + } + + return consumedInput, nil +} + +func (m *matcher) fill(requested int) error { + readSize := 1 + for m.ready.Len() < requested { + consumedInput, err := m.readNTimes(readSize) + println("fill(", requested, "), m.ready.Len():", m.ready.Len(), "consumedInput:", consumedInput, "m.pending.Len():", m.pending.Len()) + if !consumedInput { + io.CopyBuffer(m.ready, m.pending, m.readBuffer) + return err + } + + if uint64(m.pending.Len()) > m.maxBufferSize { + switch m.maxBufferHandling { + case MaxBufferAbort: + return ErrMatcherBufferFull + default: + select { + case <-m.ctx.Done(): + m.Close() + return m.ctx.Err() + default: + } + _, err := m.f(m.pending.Bytes()) + if err != nil { + return err + } + m.pending.Reset() + readSize = 1 + } + } + + readSize *= 2 + } + return nil +} + +func (m *matcher) Read(p []byte) (int, error) { + println("matcher.Read: len(p):", 
len(p)) + if m.closed { + return 0, ErrClosed + } + + if m.ready.Len() == 0 && m.err != nil { + return 0, m.err + } + + if m.ready.Len() < len(p) { + m.err = m.fill(len(p)) + } + + if m.err == ErrMatcherBufferFull { + return 0, ErrMatcherBufferFull + } + + if m.err == ErrBlocked { + m.metrics.IncCounter("blocked.requests") + return 0, ErrBlocked + } + + n, _ := m.ready.Read(p) + println("matcher.Read n bytes:", n) + + if n == 0 && len(p) > 0 && m.err != nil { + return 0, m.err + } + p = p[:n] + + select { + case <-m.ctx.Done(): + m.Close() + return 0, m.ctx.Err() + default: + } + n, err := m.f(p) + println("matcher.Read f processed n bytes:", n) + + if err != nil { + m.closed = true + + if err == ErrBlocked { + m.metrics.IncCounter("blocked.requests") + } + + return 0, err + } + return n, nil +} + +// Close closes the underlying reader if it implements io.Closer. +func (m *matcher) Close() error { + var err error + m.once.Do(func() { + m.closed = true + if c, ok := m.input.(io.Closer); ok { + err = c.Close() + } + }) + return err +} + +/* + Wants: + - [x] filters can read the body content for example WAF scoring + - [ ] filters can change the body content for example sedRequest() + - [x] filters need to be chainable (support -> ) + - [x] filters need to be able to stop streaming to request blockContent() or WAF deny() + + TODO(sszuecs): + + 1) major optimization: use registry pattern and have only one body + wrapped for concatenating readers and run all f() in a loop, so + streaming does not happen for all but once for all + readers. Important if one write is between two readers we can not + do this, so we need to detect this case. + + 3) in case we ErrBlock, then we break the loop or cancel the + context to stop processing. The registry control layer should be + able to stop all processing. + +*/ + +// WrapBody wraps the given ReadCloser such that the given function f +// runs along streaming the http body to the target. 
A target can be +// the request target or the response target. +// +// NOTE: This function is *experimental* and will likely change or disappear in the future. +func WrapBody(ctx context.Context, f func([]byte) (int, error), rc io.ReadCloser) io.ReadCloser { + return newMatcher(ctx, rc, f, defaultReadBufferSize, MaxBufferBestEffort) +} + +type BodyOptions struct { + MaxBufferHandling MaxBufferHandling + ReadBufferSize uint64 +} + +// WrapBodyWithOptions wraps the given ReadCloser such that the given +// function f runs along streaming the http body to the target. A +// target can be the request target or the response target. It applies +// given BodyOptions to the matcher. +// +// NOTE: This function is *experimental* and will likely change or disappear in the future. +func WrapBodyWithOptions(ctx context.Context, bo BodyOptions, f func([]byte) (int, error), rc io.ReadCloser) io.ReadCloser { + return newMatcher(ctx, rc, f, bo.ReadBufferSize, bo.MaxBufferHandling) +} diff --git a/net/httpbody_test.go b/net/httpbody_test.go new file mode 100644 index 0000000000..876396cc91 --- /dev/null +++ b/net/httpbody_test.go @@ -0,0 +1,333 @@ +package net + +import ( + "bytes" + "context" + "errors" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" +) + +type mybuf struct { + buf *bytes.Buffer +} + +func (mybuf) Close() error { + return nil +} + +func (b mybuf) Read(p []byte) (int, error) { + return b.buf.Read(p) +} + +type toBlockKeys struct{ Str []byte } + +func blockMatcher(matches []toBlockKeys) func(b []byte) (int, error) { + return func(b []byte) (int, error) { + var consumed int + for _, s := range matches { + if bytes.Contains(b, s.Str) { + b = nil + return 0, ErrBlocked + } + } + consumed += len(b) + return consumed, nil + } +} + +func TestHttpBodyReadOnly(t *testing.T) { + sent := "hell0 foo bar" + + okBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + b := make([]byte, 0, 1024) + buf := 
bytes.NewBuffer(b) + n, err := io.Copy(buf, r.Body) + if err != nil { + t.Fatalf("Failed to read body on backend receiver: %v", err) + } + + t.Logf("read(%d): %s", n, buf) + if got := buf.String(); got != sent { + t.Fatalf("Failed to get request body in okbackend. want: %q, got: %q", sent, got) + } + w.WriteHeader(200) + // w.Write([]byte("OK")) + w.Write(b[:n]) + })) + defer okBackend.Close() + + blockedBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + b := make([]byte, 1024) + buf := bytes.NewBuffer(b) + _, err := io.Copy(buf, r.Body) + + // body started to stream but was cut by sender + if err != io.ErrUnexpectedEOF { + t.Logf("expected 'io.ErrUnexpectedEOF' got: %v", err) + } + + w.WriteHeader(200) + w.Write([]byte("OK")) + })) + defer blockedBackend.Close() + + t.Run("single block matcher without match", func(t *testing.T) { + var b mybuf + b.buf = bytes.NewBufferString(sent) + + body := WrapBody(context.Background(), blockMatcher([]toBlockKeys{{Str: []byte("no match")}}), b) + defer body.Close() + rsp, err := (&http.Client{}).Post(okBackend.URL, "text/plain", body) + if err != nil { + t.Fatalf("Expected POST request to be blocked, got err: %v", err) + } + + if rsp.StatusCode != http.StatusOK { + t.Fatalf("Failed to get the expected status code 200, got: %d", rsp.StatusCode) + } + var buf bytes.Buffer + io.Copy(&buf, rsp.Body) + rsp.Body.Close() + if got := buf.String(); got != sent { + t.Fatalf("Failed to get %q, got %q", sent, got) + } + }) + + t.Run("double block matcher without match", func(t *testing.T) { + var b mybuf + b.buf = bytes.NewBufferString(sent) + + bod := WrapBody(context.Background(), blockMatcher([]toBlockKeys{{Str: []byte("no-match")}}), b) + defer bod.Close() + body := WrapBody(context.Background(), blockMatcher([]toBlockKeys{{Str: []byte("no match")}}), bod) + defer body.Close() + rsp, err := (&http.Client{}).Post(okBackend.URL, "text/plain", body) + if err != nil { + t.Fatalf("Failed to POST 
request: %v", err) + } + + if rsp.StatusCode != http.StatusOK { + t.Fatalf("Failed to get 200 status code, got: %v", rsp.StatusCode) + } + var buf bytes.Buffer + io.Copy(&buf, rsp.Body) + rsp.Body.Close() + if got := buf.String(); got != sent { + t.Fatalf("Failed to get %q, got %q", sent, got) + } + }) + + t.Run("single block matcher with match", func(t *testing.T) { + + var b mybuf + b.buf = bytes.NewBufferString("hell0 foo bar") + + body := WrapBody(context.Background(), blockMatcher([]toBlockKeys{{Str: []byte("foo")}}), b) + defer body.Close() + rsp, err := (&http.Client{}).Post(blockedBackend.URL, "text/plain", body) + if !errors.Is(err, ErrBlocked) { + if rsp != nil { + t.Errorf("rsp should be nil, status code: %d", rsp.StatusCode) + } + t.Fatalf("Expected POST request to be blocked, got err: %v", err) + } + }) + + t.Run("double block matcher with first match", func(t *testing.T) { + var b mybuf + b.buf = bytes.NewBufferString("hell0 foo bar") + + body := WrapBody(context.Background(), blockMatcher([]toBlockKeys{{Str: []byte("foo")}}), b) + body = WrapBody(context.Background(), blockMatcher([]toBlockKeys{{Str: []byte("no match")}}), body) + defer body.Close() + rsp, err := (&http.Client{}).Post(blockedBackend.URL, "text/plain", body) + + if !errors.Is(err, ErrBlocked) { + if rsp != nil { + t.Errorf("rsp should be nil, status code: %d", rsp.StatusCode) + } + t.Fatalf("Expected POST request to be blocked, got err: %v", err) + } + }) + + t.Run("double block matcher with second match", func(t *testing.T) { + var b mybuf + b.buf = bytes.NewBufferString("hell0 foo bar") + + body := WrapBody(context.Background(), blockMatcher([]toBlockKeys{{Str: []byte("no match")}}), b) + body = WrapBody(context.Background(), blockMatcher([]toBlockKeys{{Str: []byte("bar")}}), body) + defer body.Close() + rsp, err := (&http.Client{}).Post(blockedBackend.URL, "text/plain", body) + + if !errors.Is(err, ErrBlocked) { + if rsp != nil { + t.Errorf("rsp should be nil, status code: %d", 
rsp.StatusCode) + } + t.Fatalf("Expected POST request to be blocked, got err: %v", err) + } + }) + +} + +type nonBlockingReader struct { + initialContent []byte +} + +func (r *nonBlockingReader) Read(p []byte) (int, error) { + n := copy(p, r.initialContent) + r.initialContent = r.initialContent[n:] + return n, nil +} + +func (r *nonBlockingReader) Close() error { + return nil +} + +type slowBlockingReader struct { + initialContent []byte +} + +func (r *slowBlockingReader) Read(p []byte) (int, error) { + time.Sleep(250 * time.Millisecond) + n := copy(p, r.initialContent) + r.initialContent = r.initialContent[n:] + return n, nil +} + +func (r *slowBlockingReader) Close() error { + return nil +} + +// TODO(sszuecs): test all error cases for matcher, the following we had for blockContent() filter +func TestMatcherErrorCases(t *testing.T) { + toblockList := []toBlockKeys{{Str: []byte(".class")}} + t.Run("maxBufferAbort", func(t *testing.T) { + r := &nonBlockingReader{initialContent: []byte("fppppppppp .class")} + bmb := newMatcher(context.Background(), r, blockMatcher(toblockList), 5, MaxBufferAbort) + p := make([]byte, len(r.initialContent)) + _, err := bmb.Read(p) + if err != ErrMatcherBufferFull { + t.Errorf("Failed to get expected error %v, got: %v", ErrMatcherBufferFull, err) + } + }) + + t.Run("maxBuffer", func(t *testing.T) { + r := &nonBlockingReader{initialContent: []byte("fppppppppp .class")} + bmb := newMatcher(context.Background(), r, blockMatcher(toblockList), 5, MaxBufferBestEffort) + p := make([]byte, len(r.initialContent)) + _, err := bmb.Read(p) + if err != nil { + t.Errorf("Failed to read: %v", err) + } + }) + + t.Run("cancel read", func(t *testing.T) { + r := &slowBlockingReader{initialContent: []byte("fppppppppp .class")} + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Millisecond)) + defer cancel() + bmb := newMatcher(ctx, r, blockMatcher(toblockList), 5, MaxBufferBestEffort) + p := make([]byte, 
len(r.initialContent)) + _, err := bmb.Read(p) + if err == nil { + t.Errorf("Failed to cancel read: %v", err) + } + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatalf("Failed to get deadline exceeded, got: %T", err) + } + }) + + t.Run("maxBuffer read on closed reader", func(t *testing.T) { + pipeR, pipeW := io.Pipe() + initialContent := []byte("fppppppppp") + go pipeW.Write(initialContent) + bmb := newMatcher(context.Background(), pipeR, blockMatcher(toblockList), 5, MaxBufferBestEffort) + p := make([]byte, len(initialContent)+10) + pipeR.Close() + _, err := bmb.Read(p) + if err == nil || err != io.ErrClosedPipe { + t.Errorf("Failed to get correct read error: %v", err) + } + }) + + t.Run("maxBuffer read on initial closed reader", func(t *testing.T) { + pipeR, _ := io.Pipe() + initialContent := []byte("fppppppppp") + bmb := newMatcher(context.Background(), pipeR, blockMatcher(toblockList), 5, MaxBufferBestEffort) + p := make([]byte, len(initialContent)+10) + pipeR.Close() + bmb.Close() + _, err := bmb.Read(p) + if err == nil || err.Error() != "reader closed" { + t.Errorf("Failed to get correct read error: %v", err) + } + }) +} + +func BenchmarkBlock(b *testing.B) { + + fake := func(source string, len int) string { + return strings.Repeat(source[:2], len) // partially matches target + } + + fakematch := func(source string, len int) string { + return strings.Repeat(source, len) // matches target + } + + for _, tt := range []struct { + name string + tomatch []byte + bm []byte + }{ + { + name: "Small Stream without blocking", + tomatch: []byte(".class"), + bm: []byte(fake(".class", 1<<20)), // Test with 1Mib + }, + { + name: "Small Stream with blocking", + tomatch: []byte(".class"), + bm: []byte(fakematch(".class", 1<<20)), + }, + { + name: "Medium Stream without blocking", + tomatch: []byte(".class"), + bm: []byte(fake(".class", 1<<24)), // Test with ~10Mib + }, + { + name: "Medium Stream with blocking", + tomatch: []byte(".class"), + bm: 
[]byte(fakematch(".class", 1<<24)), + }, + { + name: "Large Stream without blocking", + tomatch: []byte(".class"), + bm: []byte(fake(".class", 1<<27)), // Test with ~100Mib + }, + { + name: "Large Stream with blocking", + tomatch: []byte(".class"), + bm: []byte(fakematch(".class", 1<<27)), + }} { + b.Run(tt.name, func(b *testing.B) { + target := &nonBlockingReader{initialContent: tt.bm} + r := &http.Request{ + Body: target, + } + toblockList := []toBlockKeys{{Str: tt.tomatch}} + bmb := newMatcher(context.Background(), r.Body, blockMatcher(toblockList), 2097152, MaxBufferBestEffort) + p := make([]byte, len(target.initialContent)) + b.Logf("Number of loops: %b", b.N) + for n := 0; n < b.N; n++ { + _, err := bmb.Read(p) + if err != nil { + return + } + } + }) + } +} diff --git a/proxy/proxy.go b/proxy/proxy.go index 2d2ef831d8..6fb34e7e85 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -36,6 +36,7 @@ import ( "github.com/zalando/skipper/loadbalancer" "github.com/zalando/skipper/logging" "github.com/zalando/skipper/metrics" + skpnet "github.com/zalando/skipper/net" "github.com/zalando/skipper/proxy/fastcgi" "github.com/zalando/skipper/ratelimit" "github.com/zalando/skipper/rfc" @@ -268,7 +269,6 @@ const ( ) var ( - ErrBlocked = errors.New("blocked string match found in body") errRouteLookupFailed = &proxyError{err: errRouteLookup} errCircuitBreakerOpen = &proxyError{ err: errors.New("circuit breaker open"), @@ -891,7 +891,7 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co ctx.proxySpan.LogKV("http_roundtrip", EndEvent) if err != nil { - if errors.Is(err, ErrBlocked) { + if errors.Is(err, skpnet.ErrBlocked) { p.tracing.setTag(ctx.proxySpan, BlockTag, true) p.tracing.setTag(ctx.proxySpan, HTTPStatusCodeTag, uint16(http.StatusBadRequest)) return nil, &proxyError{err: err, code: http.StatusBadRequest} diff --git a/proxy/teebody.go b/proxy/teebody.go index a9bb637cff..590efe6092 100644 --- a/proxy/teebody.go +++ b/proxy/teebody.go @@ 
-1,10 +1,11 @@ package proxy import ( - log "github.com/sirupsen/logrus" "io" "net/http" "net/url" + + log "github.com/sirupsen/logrus" ) type teeTie struct {