Skip to content

Commit

Permalink
feat: implement StatPersistentCachePeerRequest and StatPersistentCach…
Browse files Browse the repository at this point in the history
…eTaskRequest for persistent cache

Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed Oct 22, 2024
1 parent eb4e101 commit 74fc0e6
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 86 deletions.
3 changes: 3 additions & 0 deletions scheduler/resource/persistentcache/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ type Build struct {
// Golang version.
GoVersion string `csv:"goVersion"`

// Rust version.
RustVersion string `csv:"rustVersion"`

// Build platform.
Platform string `csv:"platform"`
}
Expand Down
5 changes: 4 additions & 1 deletion scheduler/resource/persistentcache/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type Peer struct {
// ID is persistent cache peer id.
ID string

// Persistent is whether the peer is persistent.
Persistent bool

// Pieces is finished pieces bitset.
FinishedPieces *bitset.BitSet

Expand Down Expand Up @@ -91,7 +94,7 @@ type Peer struct {
}

// New persistent cache peer instance.
func NewPeer(id, state string, finishedPieces *bitset.BitSet, blockParents []string, task *Task, host *Host,
func NewPeer(id, state string, persistent bool, finishedPieces *bitset.BitSet, blockParents []string, task *Task, host *Host,
cost time.Duration, createdAt, updatedAt time.Time, log *logger.SugaredLoggerOnWith) *Peer {
p := &Peer{
ID: id,
Expand Down
8 changes: 8 additions & 0 deletions scheduler/resource/persistentcache/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ func (p *peerManager) Load(ctx context.Context, peerID string) (*Peer, bool) {
return nil, false
}

persistent, err := strconv.ParseBool(rawPeer["persistent"])
if err != nil {
log.Errorf("parsing persistent failed: %v", err)
return nil, false
}

finishedPieces := &bitset.BitSet{}
if err := finishedPieces.UnmarshalBinary([]byte(rawPeer["finished_pieces"])); err != nil {
log.Errorf("unmarshal finished pieces failed: %v", err)
Expand Down Expand Up @@ -123,6 +129,7 @@ func (p *peerManager) Load(ctx context.Context, peerID string) (*Peer, bool) {
return NewPeer(
rawPeer["id"],
rawPeer["state"],
persistent,
finishedPieces,
blockParents,
task,
Expand Down Expand Up @@ -153,6 +160,7 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error {
pipe.HSet(ctx,
pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peer.ID),
"id", peer.ID,
"persistent", peer.Persistent,
"finished_pieces", finishedPieces,
"state", peer.FSM.Current(),
"block_parents", blockParents,
Expand Down
9 changes: 5 additions & 4 deletions scheduler/rpcserver/scheduler_server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ import (

"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/metrics"
resource "d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/resource/persistentcache"
"d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/service"
"d7y.io/dragonfly/v2/scheduler/storage"
)

// TODO Implement v2 version of the rpc server apis.
// schedulerServerV2 is v2 version of the scheduler grpc server.
type schedulerServerV2 struct {
// Service interface.
Expand All @@ -42,12 +42,13 @@ type schedulerServerV2 struct {
// newSchedulerServerV2 returns v2 version of the scheduler server.
func newSchedulerServerV2(
cfg *config.Config,
resource resource.Resource,
resource standard.Resource,
persistentCacheResource persistentcache.Resource,
scheduling scheduling.Scheduling,
dynconfig config.DynconfigInterface,
storage storage.Storage,
) schedulerv2.SchedulerServer {
return &schedulerServerV2{service.NewV2(cfg, resource, scheduling, dynconfig, storage)}
return &schedulerServerV2{service.NewV2(cfg, resource, persistentCacheResource, scheduling, dynconfig, storage)}
}

// AnnouncePeer announces peer to scheduler.
Expand Down
4 changes: 2 additions & 2 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ import (
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/job"
"d7y.io/dragonfly/v2/scheduler/metrics"
persistentcache "d7y.io/dragonfly/v2/scheduler/resource/persistentcache"
standard "d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/resource/persistentcache"
"d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/rpcserver"

Check failure on line 47 in scheduler/scheduler.go

View workflow job for this annotation

GitHub Actions / Lint

could not import d7y.io/dragonfly/v2/scheduler/rpcserver (-: # d7y.io/dragonfly/v2/scheduler/rpcserver
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/storage"
Expand Down
8 changes: 4 additions & 4 deletions scheduler/service/service_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,7 @@ func (v *V1) triggerSeedPeerTask(ctx context.Context, rg *http.Range, task *reso
}

// storeTask stores a new task or reuses a previous task.
func (v *V1) storeTask(ctx context.Context, req *schedulerv1.PeerTaskRequest, typ commonv2.TaskType) *resource.Task {
func (v *V1) storeTask(_ context.Context, req *schedulerv1.PeerTaskRequest, typ commonv2.TaskType) *resource.Task {
filteredQueryParams := strings.Split(req.UrlMeta.GetFilter(), idgen.FilteredQueryParamsSeparator)

task, loaded := v.resource.TaskManager().Load(req.GetTaskId())
Expand All @@ -834,7 +834,7 @@ func (v *V1) storeTask(ctx context.Context, req *schedulerv1.PeerTaskRequest, ty
}

// storeHost stores a new host or reuses a previous host.
func (v *V1) storeHost(ctx context.Context, peerHost *schedulerv1.PeerHost) *resource.Host {
func (v *V1) storeHost(_ context.Context, peerHost *schedulerv1.PeerHost) *resource.Host {
host, loaded := v.resource.HostManager().Load(peerHost.Id)
if !loaded {
options := []resource.HostOption{resource.WithNetwork(resource.Network{
Expand Down Expand Up @@ -866,7 +866,7 @@ func (v *V1) storeHost(ctx context.Context, peerHost *schedulerv1.PeerHost) *res
}

// storePeer stores a new peer or reuses a previous peer.
func (v *V1) storePeer(ctx context.Context, id string, priority commonv1.Priority, rg string, task *resource.Task, host *resource.Host) *resource.Peer {
func (v *V1) storePeer(_ context.Context, id string, priority commonv1.Priority, rg string, task *resource.Task, host *resource.Host) *resource.Peer {
peer, loaded := v.resource.PeerManager().Load(id)
if !loaded {
options := []resource.PeerOption{}
Expand Down Expand Up @@ -1057,7 +1057,7 @@ func (v *V1) handleBeginOfPiece(ctx context.Context, peer *resource.Peer) {
func (v *V1) handleEndOfPiece(ctx context.Context, peer *resource.Peer) {}

// handlePieceSuccess handles successful piece.
func (v *V1) handlePieceSuccess(ctx context.Context, peer *resource.Peer, pieceResult *schedulerv1.PieceResult) {
func (v *V1) handlePieceSuccess(_ context.Context, peer *resource.Peer, pieceResult *schedulerv1.PieceResult) {
// Distinguish traffic type.
trafficType := commonv2.TrafficType_REMOTE_PEER
if resource.IsPieceBackToSource(pieceResult.DstPid) {
Expand Down
Loading

0 comments on commit 74fc0e6

Please sign in to comment.