-
Notifications
You must be signed in to change notification settings - Fork 100
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Separate envelope writing from marshalling #586
Changes from all commits
8098575
4711456
7c05a40
739164e
c710296
9ddd904
9c11e78
2ab6cea
b6e4587
623056f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -78,7 +78,10 @@ func newCompressionPool( | |
} | ||
} | ||
|
||
func (c *compressionPool) Decompress(dst *bytes.Buffer, src *bytes.Buffer, readMaxBytes int64) *Error { | ||
func (c *compressionPool) Decompress(pool *bufferPool, src *bytes.Buffer, readMaxBytes int64) *Error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Compression now takes a pool and swaps with the source on success. |
||
tmp := pool.Get() | ||
defer pool.Put(tmp) | ||
|
||
decompressor, err := c.getDecompressor(src) | ||
if err != nil { | ||
return errorf(CodeInvalidArgument, "get decompressor: %w", err) | ||
|
@@ -87,7 +90,7 @@ func (c *compressionPool) Decompress(dst *bytes.Buffer, src *bytes.Buffer, readM | |
if readMaxBytes > 0 && readMaxBytes < math.MaxInt64 { | ||
reader = io.LimitReader(decompressor, readMaxBytes+1) | ||
} | ||
bytesRead, err := dst.ReadFrom(reader) | ||
bytesRead, err := tmp.ReadFrom(reader) | ||
if err != nil { | ||
_ = c.putDecompressor(decompressor) | ||
return errorf(CodeInvalidArgument, "decompress: %w", err) | ||
|
@@ -103,11 +106,15 @@ func (c *compressionPool) Decompress(dst *bytes.Buffer, src *bytes.Buffer, readM | |
if err := c.putDecompressor(decompressor); err != nil { | ||
return errorf(CodeUnknown, "recycle decompressor: %w", err) | ||
} | ||
*tmp, *src = *src, *tmp // swap buffers | ||
return nil | ||
} | ||
|
||
func (c *compressionPool) Compress(dst *bytes.Buffer, src *bytes.Buffer) *Error { | ||
compressor, err := c.getCompressor(dst) | ||
func (c *compressionPool) Compress(pool *bufferPool, src *bytes.Buffer) *Error { | ||
tmp := pool.Get() | ||
defer pool.Put(tmp) | ||
|
||
compressor, err := c.getCompressor(tmp) | ||
if err != nil { | ||
return errorf(CodeUnknown, "get compressor: %w", err) | ||
} | ||
|
@@ -118,6 +125,7 @@ func (c *compressionPool) Compress(dst *bytes.Buffer, src *bytes.Buffer) *Error | |
if err := c.putCompressor(compressor); err != nil { | ||
return errorf(CodeInternal, "recycle compressor: %w", err) | ||
} | ||
*tmp, *src = *src, *tmp // swap buffers | ||
return nil | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -707,7 +707,7 @@ func TestGRPCMissingTrailersError(t *testing.T) { | |
assert.Equal(t, connectErr.Code(), connect.CodeInternal) | ||
assert.True( | ||
t, | ||
strings.HasSuffix(connectErr.Message(), "protocol error: no Grpc-Status trailer: unexpected EOF"), | ||
strings.HasSuffix(connectErr.Message(), "protocol error: no Grpc-Status trailer"), | ||
) | ||
} | ||
|
||
|
@@ -2165,7 +2165,7 @@ func TestStreamUnexpectedEOF(t *testing.T) { | |
assert.Nil(t, err) | ||
}, | ||
expectCode: connect.CodeInternal, | ||
expectMsg: "internal: protocol error: no Grpc-Status trailer: unexpected EOF", | ||
expectMsg: "internal: protocol error: no Grpc-Status trailer", | ||
}, { | ||
name: "grpc-web_missing_end", | ||
options: []connect.ClientOption{connect.WithProtoJSON(), connect.WithGRPCWeb()}, | ||
|
@@ -2178,7 +2178,7 @@ func TestStreamUnexpectedEOF(t *testing.T) { | |
assert.Nil(t, err) | ||
}, | ||
expectCode: connect.CodeInternal, | ||
expectMsg: "internal: protocol error: no Grpc-Status trailer: unexpected EOF", | ||
expectMsg: "internal: protocol error: no Grpc-Status trailer", | ||
}, { | ||
name: "connect_partial_payload", | ||
options: []connect.ClientOption{connect.WithProtoJSON()}, | ||
|
@@ -2190,8 +2190,8 @@ func TestStreamUnexpectedEOF(t *testing.T) { | |
_, err = responseWriter.Write(payload[:len(payload)-1]) | ||
assert.Nil(t, err) | ||
}, | ||
expectCode: connect.CodeInvalidArgument, | ||
expectMsg: fmt.Sprintf("invalid_argument: protocol error: promised %d bytes in enveloped message, got %d bytes", len(payload), len(payload)-1), | ||
expectCode: connect.CodeInternal, | ||
expectMsg: "internal: incomplete envelope: unexpected EOF", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you check what There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Starts as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What direction is this? Returning "internal" from the server, when the client has sent an invalid stream, seems very wrong. Internal should be reserved for internal server errors, not errors in the client (akin to 5xx vs 4xx). You generally want exception reporting/alerting for internal server errors since they indicate a likely bug in your code; you do not want any such alerting for client errors, because then a client can induce alerts in your system w/ carefully crafted (invalid) requests. So I agree that this should be "internal" only in cases where it's the server that sent the invalid message. |
||
}, { | ||
name: "grpc_partial_payload", | ||
options: []connect.ClientOption{connect.WithProtoJSON(), connect.WithGRPC()}, | ||
|
@@ -2203,8 +2203,8 @@ func TestStreamUnexpectedEOF(t *testing.T) { | |
_, err = responseWriter.Write(payload[:len(payload)-1]) | ||
assert.Nil(t, err) | ||
}, | ||
expectCode: connect.CodeInvalidArgument, | ||
expectMsg: fmt.Sprintf("invalid_argument: protocol error: promised %d bytes in enveloped message, got %d bytes", len(payload), len(payload)-1), | ||
expectCode: connect.CodeInternal, | ||
expectMsg: "internal: incomplete envelope: unexpected EOF", | ||
}, { | ||
name: "grpc-web_partial_payload", | ||
options: []connect.ClientOption{connect.WithProtoJSON(), connect.WithGRPCWeb()}, | ||
|
@@ -2216,8 +2216,8 @@ func TestStreamUnexpectedEOF(t *testing.T) { | |
_, err = responseWriter.Write(payload[:len(payload)-1]) | ||
assert.Nil(t, err) | ||
}, | ||
expectCode: connect.CodeInvalidArgument, | ||
expectMsg: fmt.Sprintf("invalid_argument: protocol error: promised %d bytes in enveloped message, got %d bytes", len(payload), len(payload)-1), | ||
expectCode: connect.CodeInternal, | ||
expectMsg: "internal: incomplete envelope: unexpected EOF", | ||
}, { | ||
name: "connect_partial_frame", | ||
options: []connect.ClientOption{connect.WithProtoJSON()}, | ||
|
@@ -2227,8 +2227,8 @@ func TestStreamUnexpectedEOF(t *testing.T) { | |
_, err := responseWriter.Write(head[:4]) | ||
assert.Nil(t, err) | ||
}, | ||
expectCode: connect.CodeInvalidArgument, | ||
expectMsg: "invalid_argument: protocol error: incomplete envelope: unexpected EOF", | ||
expectCode: connect.CodeInternal, | ||
expectMsg: "internal: incomplete envelope: unexpected EOF", | ||
}, { | ||
name: "grpc_partial_frame", | ||
options: []connect.ClientOption{connect.WithProtoJSON(), connect.WithGRPC()}, | ||
|
@@ -2238,8 +2238,8 @@ func TestStreamUnexpectedEOF(t *testing.T) { | |
_, err := responseWriter.Write(head[:4]) | ||
assert.Nil(t, err) | ||
}, | ||
expectCode: connect.CodeInvalidArgument, | ||
expectMsg: "invalid_argument: protocol error: incomplete envelope: unexpected EOF", | ||
expectCode: connect.CodeInternal, | ||
expectMsg: "internal: incomplete envelope: unexpected EOF", | ||
}, { | ||
name: "grpc-web_partial_frame", | ||
options: []connect.ClientOption{connect.WithProtoJSON(), connect.WithGRPCWeb()}, | ||
|
@@ -2249,8 +2249,8 @@ func TestStreamUnexpectedEOF(t *testing.T) { | |
_, err := responseWriter.Write(head[:4]) | ||
assert.Nil(t, err) | ||
}, | ||
expectCode: connect.CodeInvalidArgument, | ||
expectMsg: "invalid_argument: protocol error: incomplete envelope: unexpected EOF", | ||
expectCode: connect.CodeInternal, | ||
expectMsg: "internal: incomplete envelope: unexpected EOF", | ||
}, { | ||
name: "connect_excess_eof", | ||
options: []connect.ClientOption{connect.WithProtoJSON()}, | ||
|
@@ -2320,6 +2320,7 @@ func TestStreamUnexpectedEOF(t *testing.T) { | |
for i := 0; stream.Receive() && i < upTo; i++ { | ||
assert.Equal(t, stream.Msg().Number, 42) | ||
} | ||
t.Log("err:", stream.Err()) | ||
assert.NotNil(t, stream.Err()) | ||
assert.Equal(t, connect.CodeOf(stream.Err()), testcase.expectCode) | ||
assert.Equal(t, stream.Err().Error(), testcase.expectMsg) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bytes.MinRead
== 512 but more for showing intent, we want to avoid another allocation on first read.