diff --git a/client/daemon/peer/peertask_base.go b/client/daemon/peer/peertask_base.go index 98b38ce29..81206e0f8 100644 --- a/client/daemon/peer/peertask_base.go +++ b/client/daemon/peer/peertask_base.go @@ -49,7 +49,8 @@ const ( reasonContextCanceled = "context canceled" reasonPeerGoneFromScheduler = "scheduler says client should disconnect" - failedCodeNotSet = 0 + failedReasonNotSet = "unknown" + failedCodeNotSet = 0 ) var errPeerPacketChanged = errors.New("peer packet changed") @@ -344,6 +345,14 @@ func (pt *peerTask) pullPiecesFromPeers(pti Task, cleanUnfinishedFunc func()) { }() // wait first available peer select { + case <-pt.ctx.Done(): + err := pt.ctx.Err() + pt.Errorf("context done due to %s", err) + if pt.failedReason == failedReasonNotSet && err != nil { + pt.failedReason = err.Error() + } + pt.span.AddEvent(fmt.Sprintf("pulling pieces end due to %s", err)) + return case <-pt.peerPacketReady: // preparePieceTasksByPeer func already send piece result with error pt.Infof("new peer client ready, scheduler time cost: %dus, main peer: %s", diff --git a/client/daemon/peer/peertask_file.go b/client/daemon/peer/peertask_file.go index 966c94b09..6cf63735f 100644 --- a/client/daemon/peer/peertask_file.go +++ b/client/daemon/peer/peertask_file.go @@ -176,7 +176,7 @@ func newFilePeerTask(ctx context.Context, readyPieces: NewBitmap(), requestedPieces: NewBitmap(), failedPieceCh: make(chan int32, 4), - failedReason: "unknown", + failedReason: failedReasonNotSet, failedCode: dfcodes.UnknownError, contentLength: -1, totalPiece: -1, diff --git a/client/daemon/peer/peertask_stream.go b/client/daemon/peer/peertask_stream.go index ab28b42b2..d58b42439 100644 --- a/client/daemon/peer/peertask_stream.go +++ b/client/daemon/peer/peertask_stream.go @@ -67,7 +67,8 @@ func newStreamPeerTask(ctx context.Context, span.SetAttributes(config.AttributePeerID.String(request.PeerId)) span.SetAttributes(semconv.HTTPURLKey.String(request.Url)) - logger.Debugf("request overview, url: %s, filter: %s, meta: %s, biz: %s", request.Url, request.Filter, request.UrlMeta, request.BizId) + logger.Debugf("request overview, pid: %s, url: %s, filter: %s, meta: %s, biz: %s", + request.PeerId, request.Url, request.Filter, request.UrlMeta, request.BizId) // trace register _, regSpan := tracer.Start(ctx, config.SpanRegisterTask) result, err := schedulerClient.RegisterPeerTask(ctx, request) @@ -153,7 +154,7 @@ func newStreamPeerTask(ctx context.Context, readyPieces: NewBitmap(), requestedPieces: NewBitmap(), failedPieceCh: make(chan int32, 4), - failedReason: "unknown", + failedReason: failedReasonNotSet, failedCode: dfcodes.UnknownError, contentLength: -1, totalPiece: -1, @@ -229,7 +230,7 @@ func (s *streamPeerTask) Start(ctx context.Context) (io.Reader, map[string]strin select { case <-s.ctx.Done(): var err error - if s.failedReason != "" { + if s.failedReason != failedReasonNotSet { err = errors.Errorf(s.failedReason) } else { err = errors.Errorf("ctx.PeerTaskDone due to: %s", s.ctx.Err()) @@ -243,7 +244,7 @@ func (s *streamPeerTask) Start(ctx context.Context) (io.Reader, map[string]strin return nil, attr, err case <-s.done: var err error - if s.failedReason != "" { + if s.failedReason != failedReasonNotSet { err = errors.Errorf(s.failedReason) } else { err = errors.Errorf("stream peer task early done")