Skip to content

Commit

Permalink
feat: implement StatPersistentCachePeerRequest and StatPersistentCach…
Browse files Browse the repository at this point in the history
…eTaskRequest for persistent cache

Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed Oct 22, 2024
1 parent eb4e101 commit 7740d94
Show file tree
Hide file tree
Showing 10 changed files with 598 additions and 443 deletions.
3 changes: 3 additions & 0 deletions scheduler/resource/persistentcache/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ type Build struct {
// Golang version.
GoVersion string `csv:"goVersion"`

// Rust version.
RustVersion string `csv:"rustVersion"`

// Build platform.
Platform string `csv:"platform"`
}
Expand Down
5 changes: 4 additions & 1 deletion scheduler/resource/persistentcache/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type Peer struct {
// ID is persistent cache peer id.
ID string

// Persistent is whether the peer is persistent.
Persistent bool

// Pieces is finished pieces bitset.
FinishedPieces *bitset.BitSet

Expand Down Expand Up @@ -91,7 +94,7 @@ type Peer struct {
}

// New persistent cache peer instance.
func NewPeer(id, state string, finishedPieces *bitset.BitSet, blockParents []string, task *Task, host *Host,
func NewPeer(id, state string, persistent bool, finishedPieces *bitset.BitSet, blockParents []string, task *Task, host *Host,
cost time.Duration, createdAt, updatedAt time.Time, log *logger.SugaredLoggerOnWith) *Peer {
p := &Peer{
ID: id,
Expand Down
8 changes: 8 additions & 0 deletions scheduler/resource/persistentcache/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ func (p *peerManager) Load(ctx context.Context, peerID string) (*Peer, bool) {
return nil, false
}

persistent, err := strconv.ParseBool(rawPeer["persistent"])
if err != nil {
log.Errorf("parsing persistent failed: %v", err)
return nil, false
}

finishedPieces := &bitset.BitSet{}
if err := finishedPieces.UnmarshalBinary([]byte(rawPeer["finished_pieces"])); err != nil {
log.Errorf("unmarshal finished pieces failed: %v", err)
Expand Down Expand Up @@ -123,6 +129,7 @@ func (p *peerManager) Load(ctx context.Context, peerID string) (*Peer, bool) {
return NewPeer(
rawPeer["id"],
rawPeer["state"],
persistent,
finishedPieces,
blockParents,
task,
Expand Down Expand Up @@ -153,6 +160,7 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error {
pipe.HSet(ctx,
pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peer.ID),
"id", peer.ID,
"persistent", peer.Persistent,
"finished_pieces", finishedPieces,
"state", peer.FSM.Current(),
"block_parents", blockParents,
Expand Down
8 changes: 5 additions & 3 deletions scheduler/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,24 @@ import (

"d7y.io/dragonfly/v2/pkg/rpc/scheduler/server"
"d7y.io/dragonfly/v2/scheduler/config"
resource "d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/resource/persistentcache"
"d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/storage"
)

// New returns a new scheduler server from the given options.
func New(
cfg *config.Config,
resource resource.Resource,
resource standard.Resource,
persistentCacheResource persistentcache.Resource,
scheduling scheduling.Scheduling,
dynconfig config.DynconfigInterface,
storage storage.Storage,
opts ...grpc.ServerOption,
) *grpc.Server {
return server.New(
newSchedulerServerV1(cfg, resource, scheduling, dynconfig, storage),
newSchedulerServerV2(cfg, resource, scheduling, dynconfig, storage),
newSchedulerServerV2(cfg, resource, persistentCacheResource, scheduling, dynconfig, storage),
opts...)
}
8 changes: 5 additions & 3 deletions scheduler/rpcserver/rpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import (

"d7y.io/dragonfly/v2/scheduler/config"
configmocks "d7y.io/dragonfly/v2/scheduler/config/mocks"
resource "d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/resource/persistentcache"
"d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/scheduling/mocks"
storagemocks "d7y.io/dragonfly/v2/scheduler/storage/mocks"
)
Expand Down Expand Up @@ -59,11 +60,12 @@ func TestRPCServer_New(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
scheduling := mocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
resource := standard.NewMockResource(ctl)
persistentCacheResource := persistentcache.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)

svr := New(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svr := New(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage)
tc.expect(t, svr)
})
}
Expand Down
9 changes: 5 additions & 4 deletions scheduler/rpcserver/scheduler_server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ import (

"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/metrics"
resource "d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/resource/persistentcache"
"d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/service"
"d7y.io/dragonfly/v2/scheduler/storage"
)

// TODO Implement v2 version of the rpc server apis.
// schedulerServerV2 is v2 version of the scheduler grpc server.
type schedulerServerV2 struct {
// Service interface.
Expand All @@ -42,12 +42,13 @@ type schedulerServerV2 struct {
// newSchedulerServerV2 returns v2 version of the scheduler server.
func newSchedulerServerV2(
cfg *config.Config,
resource resource.Resource,
resource standard.Resource,
persistentCacheResource persistentcache.Resource,
scheduling scheduling.Scheduling,
dynconfig config.DynconfigInterface,
storage storage.Storage,
) schedulerv2.SchedulerServer {
return &schedulerServerV2{service.NewV2(cfg, resource, scheduling, dynconfig, storage)}
return &schedulerServerV2{service.NewV2(cfg, resource, persistentCacheResource, scheduling, dynconfig, storage)}
}

// AnnouncePeer announces peer to scheduler.
Expand Down
6 changes: 3 additions & 3 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ import (
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/job"
"d7y.io/dragonfly/v2/scheduler/metrics"
persistentcache "d7y.io/dragonfly/v2/scheduler/resource/persistentcache"
standard "d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/resource/persistentcache"
"d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/rpcserver"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/storage"
Expand Down Expand Up @@ -212,7 +212,7 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
schedulerServerOptions = append(schedulerServerOptions, grpc.Creds(rpc.NewInsecureCredentials()))
}

svr := rpcserver.New(cfg, resource, scheduling, dynconfig, s.storage, schedulerServerOptions...)
svr := rpcserver.New(cfg, resource, s.persistentCacheResource, scheduling, dynconfig, s.storage, schedulerServerOptions...)
s.grpcServer = svr

// Initialize metrics.
Expand Down
8 changes: 4 additions & 4 deletions scheduler/service/service_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,7 @@ func (v *V1) triggerSeedPeerTask(ctx context.Context, rg *http.Range, task *reso
}

// storeTask stores a new task or reuses a previous task.
func (v *V1) storeTask(ctx context.Context, req *schedulerv1.PeerTaskRequest, typ commonv2.TaskType) *resource.Task {
func (v *V1) storeTask(_ context.Context, req *schedulerv1.PeerTaskRequest, typ commonv2.TaskType) *resource.Task {
filteredQueryParams := strings.Split(req.UrlMeta.GetFilter(), idgen.FilteredQueryParamsSeparator)

task, loaded := v.resource.TaskManager().Load(req.GetTaskId())
Expand All @@ -834,7 +834,7 @@ func (v *V1) storeTask(ctx context.Context, req *schedulerv1.PeerTaskRequest, ty
}

// storeHost stores a new host or reuses a previous host.
func (v *V1) storeHost(ctx context.Context, peerHost *schedulerv1.PeerHost) *resource.Host {
func (v *V1) storeHost(_ context.Context, peerHost *schedulerv1.PeerHost) *resource.Host {
host, loaded := v.resource.HostManager().Load(peerHost.Id)
if !loaded {
options := []resource.HostOption{resource.WithNetwork(resource.Network{
Expand Down Expand Up @@ -866,7 +866,7 @@ func (v *V1) storeHost(ctx context.Context, peerHost *schedulerv1.PeerHost) *res
}

// storePeer stores a new peer or reuses a previous peer.
func (v *V1) storePeer(ctx context.Context, id string, priority commonv1.Priority, rg string, task *resource.Task, host *resource.Host) *resource.Peer {
func (v *V1) storePeer(_ context.Context, id string, priority commonv1.Priority, rg string, task *resource.Task, host *resource.Host) *resource.Peer {
peer, loaded := v.resource.PeerManager().Load(id)
if !loaded {
options := []resource.PeerOption{}
Expand Down Expand Up @@ -1057,7 +1057,7 @@ func (v *V1) handleBeginOfPiece(ctx context.Context, peer *resource.Peer) {
func (v *V1) handleEndOfPiece(ctx context.Context, peer *resource.Peer) {}

// handlePieceSuccess handles successful piece.
func (v *V1) handlePieceSuccess(ctx context.Context, peer *resource.Peer, pieceResult *schedulerv1.PieceResult) {
func (v *V1) handlePieceSuccess(_ context.Context, peer *resource.Peer, pieceResult *schedulerv1.PieceResult) {
// Distinguish traffic type.
trafficType := commonv2.TrafficType_REMOTE_PEER
if resource.IsPieceBackToSource(pieceResult.DstPid) {
Expand Down
Loading

0 comments on commit 7740d94

Please sign in to comment.