Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate envelope writing from marshalling #586

Closed
wants to merge 10 commits into from
18 changes: 6 additions & 12 deletions buffer_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
)

const (
initialBufferSize = 512
initialBufferSize = bytes.MinRead
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bytes.MinRead == 512 but more for showing intent, we want to avoid another allocation on first read.

maxRecycleBufferSize = 8 * 1024 * 1024 // if >8MiB, don't hold onto a buffer
)

Expand All @@ -29,26 +29,20 @@ type bufferPool struct {
}

func newBufferPool() *bufferPool {
return &bufferPool{
Pool: sync.Pool{
New: func() any {
return bytes.NewBuffer(make([]byte, 0, initialBufferSize))
},
},
}
return &bufferPool{}
}

func (b *bufferPool) Get() *bytes.Buffer {
if buf, ok := b.Pool.Get().(*bytes.Buffer); ok {
buf.Reset()
return buf
}
return bytes.NewBuffer(make([]byte, 0, initialBufferSize))
}

func (b *bufferPool) Put(buffer *bytes.Buffer) {
if buffer.Cap() > maxRecycleBufferSize {
func (b *bufferPool) Put(buf *bytes.Buffer) {
if buf.Cap() > maxRecycleBufferSize {
return
}
buffer.Reset()
b.Pool.Put(buffer)
b.Pool.Put(buf)
}
16 changes: 12 additions & 4 deletions compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ func newCompressionPool(
}
}

func (c *compressionPool) Decompress(dst *bytes.Buffer, src *bytes.Buffer, readMaxBytes int64) *Error {
func (c *compressionPool) Decompress(pool *bufferPool, src *bytes.Buffer, readMaxBytes int64) *Error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compression now takes a pool and swaps with the source on success.

tmp := pool.Get()
defer pool.Put(tmp)

decompressor, err := c.getDecompressor(src)
if err != nil {
return errorf(CodeInvalidArgument, "get decompressor: %w", err)
Expand All @@ -87,7 +90,7 @@ func (c *compressionPool) Decompress(dst *bytes.Buffer, src *bytes.Buffer, readM
if readMaxBytes > 0 && readMaxBytes < math.MaxInt64 {
reader = io.LimitReader(decompressor, readMaxBytes+1)
}
bytesRead, err := dst.ReadFrom(reader)
bytesRead, err := tmp.ReadFrom(reader)
if err != nil {
_ = c.putDecompressor(decompressor)
return errorf(CodeInvalidArgument, "decompress: %w", err)
Expand All @@ -103,11 +106,15 @@ func (c *compressionPool) Decompress(dst *bytes.Buffer, src *bytes.Buffer, readM
if err := c.putDecompressor(decompressor); err != nil {
return errorf(CodeUnknown, "recycle decompressor: %w", err)
}
*tmp, *src = *src, *tmp // swap buffers
return nil
}

func (c *compressionPool) Compress(dst *bytes.Buffer, src *bytes.Buffer) *Error {
compressor, err := c.getCompressor(dst)
func (c *compressionPool) Compress(pool *bufferPool, src *bytes.Buffer) *Error {
tmp := pool.Get()
defer pool.Put(tmp)

compressor, err := c.getCompressor(tmp)
if err != nil {
return errorf(CodeUnknown, "get compressor: %w", err)
}
Expand All @@ -118,6 +125,7 @@ func (c *compressionPool) Compress(dst *bytes.Buffer, src *bytes.Buffer) *Error
if err := c.putCompressor(compressor); err != nil {
return errorf(CodeInternal, "recycle compressor: %w", err)
}
*tmp, *src = *src, *tmp // swap buffers
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func receiveUnaryResponse[T any](conn StreamingClientConn) (*Response[T], error)
// In a well-formed stream, the response message may be followed by a block
// of in-stream trailers or HTTP trailers. To ensure that we receive the
// trailers, try to read another message from the stream.
if err := conn.Receive(new(T)); err == nil {
if err := conn.Receive(nil); err == nil {
return nil, NewError(CodeUnknown, errors.New("unary stream has multiple messages"))
} else if err != nil && !errors.Is(err, io.EOF) {
return nil, NewError(CodeUnknown, err)
Expand Down
31 changes: 16 additions & 15 deletions connect_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ func TestGRPCMissingTrailersError(t *testing.T) {
assert.Equal(t, connectErr.Code(), connect.CodeInternal)
assert.True(
t,
strings.HasSuffix(connectErr.Message(), "protocol error: no Grpc-Status trailer: unexpected EOF"),
strings.HasSuffix(connectErr.Message(), "protocol error: no Grpc-Status trailer"),
)
}

Expand Down Expand Up @@ -2165,7 +2165,7 @@ func TestStreamUnexpectedEOF(t *testing.T) {
assert.Nil(t, err)
},
expectCode: connect.CodeInternal,
expectMsg: "internal: protocol error: no Grpc-Status trailer: unexpected EOF",
expectMsg: "internal: protocol error: no Grpc-Status trailer",
}, {
name: "grpc-web_missing_end",
options: []connect.ClientOption{connect.WithProtoJSON(), connect.WithGRPCWeb()},
Expand All @@ -2178,7 +2178,7 @@ func TestStreamUnexpectedEOF(t *testing.T) {
assert.Nil(t, err)
},
expectCode: connect.CodeInternal,
expectMsg: "internal: protocol error: no Grpc-Status trailer: unexpected EOF",
expectMsg: "internal: protocol error: no Grpc-Status trailer",
}, {
name: "connect_partial_payload",
options: []connect.ClientOption{connect.WithProtoJSON()},
Expand All @@ -2190,8 +2190,8 @@ func TestStreamUnexpectedEOF(t *testing.T) {
_, err = responseWriter.Write(payload[:len(payload)-1])
assert.Nil(t, err)
},
expectCode: connect.CodeInvalidArgument,
expectMsg: fmt.Sprintf("invalid_argument: protocol error: promised %d bytes in enveloped message, got %d bytes", len(payload), len(payload)-1),
expectCode: connect.CodeInternal,
expectMsg: "internal: incomplete envelope: unexpected EOF",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed internal_argument I don't think it should be raised by the framework. Based on doc string: https://pkg.go.dev/google.golang.org/grpc@v1.58.0/codes#Code

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you check what grpc-go does here? I think I changed this to CodeInvalidArgument specifically to match them, but I may be wrong.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Starts as io.ErrUnexpectedEOF and converted to status.Error(codes.Internal, err.Error())

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What direction is this? Returning "internal" from the server, when the client has sent an invalid stream, seems very wrong. Internal should be reserved for internal server errors, not errors in the client (akin to 5xx vs 4xx). You generally want exception reporting/alerting for internal server errors since they indicate a likely bug in your code; you do not want any such alerting for client errors, because then a client can induce alerts in your system w/ carefully crafted (invalid) requests.

So I agree that this should be "internal" only in cases where it's the server that sent the invalid message.

}, {
name: "grpc_partial_payload",
options: []connect.ClientOption{connect.WithProtoJSON(), connect.WithGRPC()},
Expand All @@ -2203,8 +2203,8 @@ func TestStreamUnexpectedEOF(t *testing.T) {
_, err = responseWriter.Write(payload[:len(payload)-1])
assert.Nil(t, err)
},
expectCode: connect.CodeInvalidArgument,
expectMsg: fmt.Sprintf("invalid_argument: protocol error: promised %d bytes in enveloped message, got %d bytes", len(payload), len(payload)-1),
expectCode: connect.CodeInternal,
expectMsg: "internal: incomplete envelope: unexpected EOF",
}, {
name: "grpc-web_partial_payload",
options: []connect.ClientOption{connect.WithProtoJSON(), connect.WithGRPCWeb()},
Expand All @@ -2216,8 +2216,8 @@ func TestStreamUnexpectedEOF(t *testing.T) {
_, err = responseWriter.Write(payload[:len(payload)-1])
assert.Nil(t, err)
},
expectCode: connect.CodeInvalidArgument,
expectMsg: fmt.Sprintf("invalid_argument: protocol error: promised %d bytes in enveloped message, got %d bytes", len(payload), len(payload)-1),
expectCode: connect.CodeInternal,
expectMsg: "internal: incomplete envelope: unexpected EOF",
}, {
name: "connect_partial_frame",
options: []connect.ClientOption{connect.WithProtoJSON()},
Expand All @@ -2227,8 +2227,8 @@ func TestStreamUnexpectedEOF(t *testing.T) {
_, err := responseWriter.Write(head[:4])
assert.Nil(t, err)
},
expectCode: connect.CodeInvalidArgument,
expectMsg: "invalid_argument: protocol error: incomplete envelope: unexpected EOF",
expectCode: connect.CodeInternal,
expectMsg: "internal: incomplete envelope: unexpected EOF",
}, {
name: "grpc_partial_frame",
options: []connect.ClientOption{connect.WithProtoJSON(), connect.WithGRPC()},
Expand All @@ -2238,8 +2238,8 @@ func TestStreamUnexpectedEOF(t *testing.T) {
_, err := responseWriter.Write(head[:4])
assert.Nil(t, err)
},
expectCode: connect.CodeInvalidArgument,
expectMsg: "invalid_argument: protocol error: incomplete envelope: unexpected EOF",
expectCode: connect.CodeInternal,
expectMsg: "internal: incomplete envelope: unexpected EOF",
}, {
name: "grpc-web_partial_frame",
options: []connect.ClientOption{connect.WithProtoJSON(), connect.WithGRPCWeb()},
Expand All @@ -2249,8 +2249,8 @@ func TestStreamUnexpectedEOF(t *testing.T) {
_, err := responseWriter.Write(head[:4])
assert.Nil(t, err)
},
expectCode: connect.CodeInvalidArgument,
expectMsg: "invalid_argument: protocol error: incomplete envelope: unexpected EOF",
expectCode: connect.CodeInternal,
expectMsg: "internal: incomplete envelope: unexpected EOF",
}, {
name: "connect_excess_eof",
options: []connect.ClientOption{connect.WithProtoJSON()},
Expand Down Expand Up @@ -2320,6 +2320,7 @@ func TestStreamUnexpectedEOF(t *testing.T) {
for i := 0; stream.Receive() && i < upTo; i++ {
assert.Equal(t, stream.Msg().Number, 42)
}
t.Log("err:", stream.Err())
assert.NotNil(t, stream.Err())
assert.Equal(t, connect.CodeOf(stream.Err()), testcase.expectCode)
assert.Equal(t, stream.Err().Error(), testcase.expectMsg)
Expand Down
Loading