Skip to content

Commit

Permalink
feat(p2p): cover Exchange with traces (#150)
Browse files Browse the repository at this point in the history
  • Loading branch information
vgonkivs authored Feb 1, 2024
1 parent 30ce8bc commit aecd7cf
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 16 deletions.
63 changes: 58 additions & 5 deletions p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,20 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

"github.com/celestiaorg/go-header"
p2p_pb "github.com/celestiaorg/go-header/p2p/pb"
)

var log = logging.Logger("header/p2p")
var (
log = logging.Logger("header/p2p")

tracerClient = otel.Tracer("header/p2p-client")
)

// minHeadResponses is the minimum number of headers of the same height
// received from peers to determine the network head. If all trusted peers
Expand Down Expand Up @@ -113,6 +121,8 @@ func (ex *Exchange[H]) Stop(ctx context.Context) error {
// and return the highest one.
func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (H, error) {
log.Debug("requesting head")
ctx, span := tracerClient.Start(ctx, "head")
defer span.End()

reqCtx := ctx
startTime := time.Now()
Expand Down Expand Up @@ -157,8 +167,15 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
)
for _, from := range peers {
go func(from peer.ID) {
_, newSpan := tracerClient.Start(
ctx, "requesting peer",
trace.WithAttributes(attribute.String("peerID", from.String())),
)
defer newSpan.End()

headers, err := ex.request(reqCtx, from, headerReq)
if err != nil {
newSpan.SetStatus(codes.Error, err.Error())
log.Errorw("head request to peer failed", "peer", from, "err", err)
headerRespCh <- zero
return
Expand All @@ -171,6 +188,7 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
if errors.As(err, &verErr) && verErr.SoftFailure {
log.Debugw("received head from tracked peer that soft-failed verification",
"tracked peer", from, "err", err)
newSpan.SetStatus(codes.Error, err.Error())
headerRespCh <- headers[0]
return
}
Expand All @@ -180,10 +198,12 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
}
logF("verifying head received from tracked peer", "tracked peer", from,
"height", headers[0].Height(), "err", err)
newSpan.SetStatus(codes.Error, err.Error())
headerRespCh <- zero
return
}
}
newSpan.SetStatus(codes.Ok, "")
// request ensures that the result slice will have at least one Header
headerRespCh <- headers[0]
}(from)
Expand All @@ -206,22 +226,25 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
status = headStatusTimeout
}

span.SetStatus(codes.Error, fmt.Sprintf("head request %s", status))
ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, status)
return zero, ctx.Err()
case <-ex.ctx.Done():
ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, headStatusCanceled)
span.SetStatus(codes.Error, "exchange client stopped")
return zero, ex.ctx.Err()
}
}

head, err := bestHead[H](headers)
if err != nil {
ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, headStatusNoHeaders)
span.SetStatus(codes.Error, headStatusNoHeaders)
return zero, err
}

ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, headStatusOk)
span.SetStatus(codes.Ok, "")
return head, nil
}

Expand All @@ -230,10 +253,17 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
// thereafter.
func (ex *Exchange[H]) GetByHeight(ctx context.Context, height uint64) (H, error) {
log.Debugw("requesting header", "height", height)
ctx, span := tracerClient.Start(ctx, "get-by-height",
trace.WithAttributes(
attribute.Int64("height", int64(height)),
))
defer span.End()
var zero H
// sanity check height
if height == 0 {
return zero, fmt.Errorf("specified request height must be greater than 0")
err := fmt.Errorf("specified request height must be greater than 0")
span.SetStatus(codes.Error, err.Error())
return zero, err
}
// create request
req := &p2p_pb.HeaderRequest{
Expand All @@ -242,8 +272,10 @@ func (ex *Exchange[H]) GetByHeight(ctx context.Context, height uint64) (H, error
}
headers, err := ex.performRequest(ctx, req)
if err != nil {
span.SetStatus(codes.Error, err.Error())
return zero, err
}
span.SetStatus(codes.Ok, "")
return headers[0], nil
}

Expand All @@ -254,19 +286,36 @@ func (ex *Exchange[H]) GetRangeByHeight(
from H,
to uint64,
) ([]H, error) {
ctx, span := tracerClient.Start(ctx, "get-range-by-height",
trace.WithAttributes(
attribute.Int64("from", int64(from.Height())),
attribute.Int64("to", int64(to)),
))
defer span.End()
session := newSession[H](
ex.ctx, ex.host, ex.peerTracker, ex.protocolID, ex.Params.RangeRequestTimeout, ex.metrics, withValidation(from),
)
defer session.close()
// we request the next header height that we don't have: `fromHead`+1
amount := to - (from.Height() + 1)
return session.getRangeByHeight(ctx, from.Height()+1, amount, ex.Params.MaxHeadersPerRangeRequest)
result, err := session.getRangeByHeight(ctx, from.Height()+1, amount, ex.Params.MaxHeadersPerRangeRequest)
if err != nil {
span.SetStatus(codes.Error, err.Error())
return nil, err
}
span.SetStatus(codes.Ok, "")
return result, nil
}

// Get performs a request for the Header by the given hash corresponding
// to the RawHeader. Note that the Header must be verified thereafter.
func (ex *Exchange[H]) Get(ctx context.Context, hash header.Hash) (H, error) {
log.Debugw("requesting header", "hash", hash.String())
ctx, span := tracerClient.Start(ctx, "get-by-hash",
trace.WithAttributes(
attribute.String("hash", hash.String()),
))
defer span.End()
var zero H
// create request
req := &p2p_pb.HeaderRequest{
Expand All @@ -275,12 +324,16 @@ func (ex *Exchange[H]) Get(ctx context.Context, hash header.Hash) (H, error) {
}
headers, err := ex.performRequest(ctx, req)
if err != nil {
span.SetStatus(codes.Error, err.Error())
return zero, err
}

if !bytes.Equal(headers[0].Hash(), hash) {
return zero, fmt.Errorf("incorrect hash in header: expected %x, got %x", hash, headers[0].Hash())
err = fmt.Errorf("incorrect hash in header: expected %x, got %x", hash, headers[0].Hash())
span.SetStatus(codes.Error, err.Error())
return zero, err
}
span.SetStatus(codes.Ok, "")
return headers[0], nil
}

Expand Down
8 changes: 4 additions & 4 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
)

var (
tracer = otel.Tracer("header/server")
tracerServ = otel.Tracer("header/server")
)

// ExchangeServer represents the server-side component for
Expand Down Expand Up @@ -173,7 +173,7 @@ func (serv *ExchangeServer[H]) handleRequestByHash(hash []byte) ([]H, error) {
log.Debugw("server: handling header request", "hash", header.Hash(hash).String())
ctx, cancel := context.WithTimeout(serv.ctx, serv.Params.RangeRequestTimeout)
defer cancel()
ctx, span := tracer.Start(ctx, "request-by-hash", trace.WithAttributes(
ctx, span := tracerServ.Start(ctx, "request-by-hash", trace.WithAttributes(
attribute.String("hash", header.Hash(hash).String()),
))
defer span.End()
Expand Down Expand Up @@ -204,7 +204,7 @@ func (serv *ExchangeServer[H]) handleRequest(from, to uint64) ([]H, error) {
}

startTime := time.Now()
ctx, span := tracer.Start(serv.ctx, "request-range", trace.WithAttributes(
ctx, span := tracerServ.Start(serv.ctx, "request-range", trace.WithAttributes(
attribute.Int64("from", int64(from)),
attribute.Int64("to", int64(to))))
defer span.End()
Expand Down Expand Up @@ -273,7 +273,7 @@ func (serv *ExchangeServer[H]) handleHeadRequest() ([]H, error) {
log.Debug("server: handling head request")
ctx, cancel := context.WithTimeout(serv.ctx, serv.Params.RangeRequestTimeout)
defer cancel()
ctx, span := tracer.Start(ctx, "request-head")
ctx, span := tracerServ.Start(ctx, "request-head")
defer span.End()

head, err := serv.store.Head(ctx)
Expand Down
46 changes: 39 additions & 7 deletions p2p/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,19 @@ import (

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/protocol"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

"github.com/celestiaorg/go-header"
p2p_pb "github.com/celestiaorg/go-header/p2p/pb"
)

var (
tracerSession = otel.Tracer("header/p2p-session")
)

// errEmptyResponse means that server side closes the connection without sending at least 1
// response.
var errEmptyResponse = errors.New("empty response")
Expand Down Expand Up @@ -77,9 +85,15 @@ func newSession[H header.Header[H]](
func (s *session[H]) getRangeByHeight(
ctx context.Context,
from, amount, headersPerPeer uint64,
) ([]H, error) {
) (_ []H, err error) {
log.Debugw("requesting headers", "from", from, "to", from+amount-1) // -1 need to exclude to+1 height

ctx, span := tracerSession.Start(ctx, "get-range-by-height", trace.WithAttributes(
attribute.Int64("from", int64(from)),
attribute.Int64("to", int64(from+amount-1)),
))
defer span.End()

requests := prepareRequests(from, amount, headersPerPeer)
result := make(chan []H, len(requests))
s.reqCh = make(chan *p2p_pb.HeaderRequest, len(requests))
Expand All @@ -94,8 +108,11 @@ LOOP:
for {
select {
case <-s.ctx.Done():
return nil, errors.New("header/p2p: exchange is closed")
err = errors.New("header/p2p: exchange is closed")
span.SetStatus(codes.Error, err.Error())
return nil, err
case <-ctx.Done():
span.SetStatus(codes.Error, ctx.Err().Error())
return nil, ctx.Err()
case res := <-result:
headers = append(headers, res...)
Expand All @@ -113,6 +130,7 @@ LOOP:
"from", headers[0].Height(),
"to", headers[len(headers)-1].Height(),
)
span.SetStatus(codes.Ok, "")
return headers, nil
}

Expand Down Expand Up @@ -152,19 +170,28 @@ func (s *session[H]) doRequest(
req *p2p_pb.HeaderRequest,
headers chan []H,
) {
ctx, span := tracerSession.Start(ctx, "request-headers-from-peer", trace.WithAttributes(
attribute.String("peerID", stat.peerID.String()),
attribute.Int64("from", int64(req.GetOrigin())),
attribute.Int64("amount", int64(req.Amount)),
))
defer span.End()

ctx, cancel := context.WithTimeout(ctx, s.requestTimeout)
defer cancel()

r, size, duration, err := sendMessage(ctx, s.host, stat.peerID, s.protocolID, req)
s.metrics.response(ctx, size, duration, err)
if err != nil {
span.SetStatus(codes.Error, err.Error())
// we should not punish peer at this point and should try to parse responses, despite that error
// was received.
log.Debugw("requesting headers from peer failed", "peer", stat.peerID, "err", err)
}

h, err := s.processResponses(r)
if err != nil {
span.SetStatus(codes.Error, err.Error())
logFn := log.Errorw

switch err {
Expand Down Expand Up @@ -195,21 +222,26 @@ func (s *session[H]) doRequest(
"requestedAmount", req.Amount,
)

remainingHeaders := req.Amount - uint64(len(h))

span.SetStatus(codes.Ok, "")

// update peer stats
stat.updateStats(size, duration)

responseLn := uint64(len(h))
// ensure that we received the correct amount of headers.
if responseLn < req.Amount {
from := h[responseLn-1].Height()
amount := req.Amount - responseLn
if remainingHeaders > 0 {
span.AddEvent("remaining headers", trace.WithAttributes(
attribute.Int64("amount", int64(remainingHeaders))),
)

from := h[uint64(len(h))-1].Height()
select {
case <-s.ctx.Done():
return
// create a new request with the remaining headers.
// prepareRequests will return a slice with 1 element at this point
case s.reqCh <- prepareRequests(from+1, amount, req.Amount)[0]:
case s.reqCh <- prepareRequests(from+1, remainingHeaders, req.Amount)[0]:
log.Debugw("sending additional request to get remaining headers")
}
}
Expand Down

0 comments on commit aecd7cf

Please sign in to comment.