Skip to content

Commit

Permalink
Merge pull request #162 from philippseith/bugfix/#161_Unread_body_pre…
Browse files Browse the repository at this point in the history
…vents_reuse_of_connection
  • Loading branch information
philippseith authored Mar 20, 2023
2 parents 11e6992 + 077ac80 commit e11f4c1
Show file tree
Hide file tree
Showing 9 changed files with 1,449 additions and 4,645 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ jobs:
steps:
- uses: actions/checkout@v2
- name: golangci-lint
uses: golangci/golangci-lint-action@v2
uses: golangci/golangci-lint-action@v3
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.46
version: v1.51

test:
name: Test and Coverage
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,5 @@ dist
/.vscode
/cover.out
/coverage.txt

signalr_test/package-lock.json
4 changes: 3 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func NewClient(ctx context.Context, options ...func(Party) error) (Client, error
format: "json",
partyBase: newPartyBase(ctx, info, dbg),
lastID: -1,
backoffFactory: func() backoff.BackOff { return backoff.NewExponentialBackOff() },
}
for _, option := range options {
if option != nil {
Expand Down Expand Up @@ -151,11 +152,12 @@ type client struct {
loop *loop
receiver interface{}
lastID int64
backoffFactory func() backoff.BackOff
}

func (c *client) Start() {
c.setState(ClientConnecting)
boff := backoff.NewExponentialBackOff()
boff := c.backoffFactory()
go func() {
for {
c.setErr(nil)
Expand Down
14 changes: 14 additions & 0 deletions clientoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package signalr
import (
"errors"
"fmt"
"github.com/cenkalti/backoff/v4"
)

// WithConnection sets the Connection of the Client
Expand Down Expand Up @@ -49,6 +50,19 @@ func WithReceiver(receiver interface{}) func(Party) error {
}
}

// WithBackoff sets the backoff.BackOff used for repeated connection attempts in the client.
// See https://pkg.go.dev/github.com/cenkalti/backoff for configuration options.
// If the option is not set, backoff.NewExponentialBackOff() without any further configuration will be used.
func WithBackoff(backoffFactory func() backoff.BackOff) func(party Party) error {
return func(party Party) error {
if client, ok := party.(*client); ok {
client.backoffFactory = backoffFactory
return nil
}
return errors.New("option WithBackoff is client only")
}
}

// TransferFormat sets the transfer format used on the transport. Allowed values are "Text" and "Binary"
func TransferFormat(format string) func(Party) error {
return func(p Party) error {
Expand Down
11 changes: 10 additions & 1 deletion clientoptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package signalr

import (
"context"
"github.com/cenkalti/backoff/v4"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -41,7 +42,15 @@ var _ = Describe("Client options", func() {
Expect(err).NotTo(HaveOccurred())
}, 3.0)
})

Context("only WithBackoff is given", func() {
It("NewClient should not fail", func() {
conn := NewNetConnection(context.TODO(), nil)
_, err := NewClient(context.TODO(), WithConnection(conn), WithBackoff(func() backoff.BackOff {
return backoff.NewExponentialBackOff()
}))
Expect(err).NotTo(HaveOccurred())
}, 3.0)
})
})

})
4 changes: 2 additions & 2 deletions clientsseconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func newClientSSEConnection(address string, connectionID string, body io.ReadClo
}
c.sseReader, c.sseWriter = io.Pipe()
go func() {
defer func() { closeResponseBody(body) }()
p := make([]byte, 1<<15)
loop:
for {
Expand All @@ -60,7 +61,6 @@ func newClientSSEConnection(address string, connectionID string, body io.ReadClo
}
}
}
_ = body.Close()
}()
return &c, nil
}
Expand All @@ -82,6 +82,6 @@ func (c *clientSSEConnection) Write(p []byte) (n int, err error) {
if resp.StatusCode != 200 {
err = fmt.Errorf("POST %v -> %v", c.reqURL, resp.Status)
}
_ = resp.Body.Close()
closeResponseBody(resp.Body)
return len(p), err
}
12 changes: 10 additions & 2 deletions httpconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewHTTPConnection(ctx context.Context, address string, options ...func(*htt
}

if httpConn.client == nil {
httpConn.client = &http.Client{}
httpConn.client = http.DefaultClient
}

reqURL, err := url.Parse(address)
Expand All @@ -77,7 +77,7 @@ func NewHTTPConnection(ctx context.Context, address string, options ...func(*htt
if err != nil {
return nil, err
}
defer func() { _ = resp.Body.Close() }()
defer func() { closeResponseBody(resp.Body) }()

if resp.StatusCode != 200 {
return nil, fmt.Errorf("%v %v -> %v", req.Method, req.URL.String(), resp.Status)
Expand Down Expand Up @@ -156,3 +156,11 @@ func NewHTTPConnection(ctx context.Context, address string, options ...func(*htt

return conn, nil
}

// closeResponseBody reads a http response body to the end and closes it
// See https://blog.cubieserver.de/2022/http-connection-reuse-in-go-clients/
// The body needs to be fully read and closed, otherwise the connection will not be reused
func closeResponseBody(body io.ReadCloser) {
_, _ = io.Copy(io.Discard, body)
_ = body.Close()
}
Loading

0 comments on commit e11f4c1

Please sign in to comment.