Skip to content

Commit

Permalink
refactor(p2p): simplify sendMessage
Browse files Browse the repository at this point in the history
  • Loading branch information
cristaloleg committed Jun 6, 2024
1 parent 4c7a15c commit 1e343e6
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 17 deletions.
7 changes: 5 additions & 2 deletions p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,11 @@ func (ex *Exchange[H]) request(
req *p2p_pb.HeaderRequest,
) ([]H, error) {
log.Debugw("requesting peer", "peer", to)
responses, size, duration, err := sendMessage(ctx, ex.host, to, ex.protocolID, req)
ex.metrics.response(ctx, size, duration, err)
start := time.Now()
responses, size, err := sendMessage(ctx, ex.host, to, ex.protocolID, req)
took := time.Since(start)

ex.metrics.response(ctx, size, took, err)
if err != nil {
log.Debugw("err sending request", "peer", to, "err", err)
return nil, err
Expand Down
12 changes: 5 additions & 7 deletions p2p/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"io"
"strings"
"time"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -51,11 +50,10 @@ func sendMessage(
to peer.ID,
protocol protocol.ID,
req *p2p_pb.HeaderRequest,
) ([]*p2p_pb.HeaderResponse, uint64, time.Duration, error) {
startTime := time.Now()
) ([]*p2p_pb.HeaderResponse, uint64, error) {
stream, err := host.NewStream(ctx, to, protocol)
if err != nil {
return nil, 0, 0, fmt.Errorf("header/p2p: failed to open a new stream: %w", err)
return nil, 0, fmt.Errorf("header/p2p: failed to open a new stream: %w", err)
}

// set stream deadline from the context deadline.
Expand All @@ -71,12 +69,12 @@ func sendMessage(
_, err = serde.Write(stream, req)
if err != nil {
stream.Reset() //nolint:errcheck
return nil, 0, 0, fmt.Errorf("header/p2p: failed to write a request: %w", err)
return nil, 0, fmt.Errorf("header/p2p: failed to write a request: %w", err)
}

err = stream.CloseWrite()
if err != nil {
return nil, 0, 0, err
return nil, 0, err
}

headers := make([]*p2p_pb.HeaderResponse, 0)
Expand Down Expand Up @@ -112,7 +110,7 @@ func sendMessage(
// reset stream in case of an error
stream.Reset() //nolint:errcheck
}
return headers, totalRespLn, time.Since(startTime), err
return headers, totalRespLn, err
}

// convertStatusCodeToError converts passed status code into an error.
Expand Down
15 changes: 7 additions & 8 deletions p2p/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,11 @@ func (s *session[H]) doRequest(
ctx, cancel := context.WithTimeout(ctx, s.requestTimeout)
defer cancel()

r, size, duration, err := sendMessage(ctx, s.host, stat.peerID, s.protocolID, req)
s.metrics.response(ctx, size, duration, err)
start := time.Now()
r, size, err := sendMessage(ctx, s.host, stat.peerID, s.protocolID, req)
took := time.Since(start)

s.metrics.response(ctx, size, took, err)
if err != nil {
span.SetStatus(codes.Error, err.Error())
// we should not punish peer at this point and should try to parse responses, despite that error
Expand Down Expand Up @@ -233,7 +236,7 @@ func (s *session[H]) doRequest(
span.SetStatus(codes.Ok, "")

// update peer stats
stat.updateStats(size, duration)
stat.updateStats(size, took)

// ensure that we received the correct amount of headers.
if remainingHeaders > 0 {
Expand Down Expand Up @@ -338,7 +341,7 @@ func processResponses[H header.Header[H]](resps []*p2p_pb.HeaderResponse) ([]H,
return nil, errEmptyResponse
}

hdrs := make([]H, 0)
hdrs := make([]H, 0, len(resps))
for _, resp := range resps {
err := convertStatusCodeToError(resp.StatusCode)
if err != nil {
Expand All @@ -358,9 +361,5 @@ func processResponses[H header.Header[H]](resps []*p2p_pb.HeaderResponse) ([]H,

hdrs = append(hdrs, hdr)
}

if len(hdrs) == 0 {
return nil, header.ErrNotFound
}
return hdrs, nil
}

0 comments on commit 1e343e6

Please sign in to comment.