From f7561671714d14f9fa76940f9465119a6e65b525 Mon Sep 17 00:00:00 2001 From: Gaius Date: Mon, 21 Oct 2024 20:29:48 +0800 Subject: [PATCH] feat: add peer manager for persistent cache task Signed-off-by: Gaius --- internal/dflog/logger.go | 8 +- pkg/redis/redis.go | 11 + .../resource/persistentcache/host_manager.go | 104 ++++---- scheduler/resource/persistentcache/peer.go | 1 + .../resource/persistentcache/peer_manager.go | 237 ++++++++++++++++++ scheduler/resource/persistentcache/task.go | 2 +- .../resource/persistentcache/task_manager.go | 44 ++-- 7 files changed, 328 insertions(+), 79 deletions(-) create mode 100644 scheduler/resource/persistentcache/peer_manager.go diff --git a/internal/dflog/logger.go b/internal/dflog/logger.go index 8ef75d32f08..8df4571308b 100644 --- a/internal/dflog/logger.go +++ b/internal/dflog/logger.go @@ -141,15 +141,15 @@ func WithTask(taskID, url string) *SugaredLoggerOnWith { } } -func WithPersistentCacheTask(taskID string) *SugaredLoggerOnWith { +func WithHost(hostID, hostname, ip string) *SugaredLoggerOnWith { return &SugaredLoggerOnWith{ - withArgs: []any{"taskID", taskID}, + withArgs: []any{"hostID", hostID, "hostname", hostname, "ip", ip}, } } -func WithHost(hostID, hostname, ip string) *SugaredLoggerOnWith { +func WithPeerID(peerID string) *SugaredLoggerOnWith { return &SugaredLoggerOnWith{ - withArgs: []any{"hostID", hostID, "hostname", hostname, "ip", ip}, + withArgs: []any{"peerID", peerID}, } } diff --git a/pkg/redis/redis.go b/pkg/redis/redis.go index d2d741f84f7..260258b5e9f 100644 --- a/pkg/redis/redis.go +++ b/pkg/redis/redis.go @@ -141,11 +141,17 @@ func MakePersistentCacheTasksInScheduler(schedulerClusterID uint) string { return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s", schedulerClusterID, PersistentCacheTasksNamespace)) } +// MakePersistentCachePeersOfPersistentCacheTaskInScheduler make persistent cache peers of persistent cache task in scheduler. +func MakePersistentCachePeersOfPersistentCacheTaskInScheduler(schedulerClusterID uint, taskID string) string { + return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s:%s:%s", schedulerClusterID, PersistentCacheTasksNamespace, taskID, PersistentCachePeersNamespace)) +} + // MakePersistentCachePeerKeyInScheduler make persistent cache peer key in scheduler. func MakePersistentCachePeerKeyInScheduler(schedulerClusterID uint, peerID string) string { return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s:%s", schedulerClusterID, PersistentCachePeersNamespace, peerID)) } +// MakePersistentCachePeersInScheduler make persistent cache peers in scheduler. func MakePersistentCachePeersInScheduler(schedulerClusterID uint) string { return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s", schedulerClusterID, PersistentCachePeersNamespace)) } @@ -159,3 +165,8 @@ func MakePersistentCacheHostKeyInScheduler(schedulerClusterID uint, hostID strin func MakePersistentCacheHostsInScheduler(schedulerClusterID uint) string { return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s", schedulerClusterID, PersistentCacheHostsNamespace)) } + +// MakePersistentCachePeersOfPersistentCacheHostInScheduler make persistent cache peers of persistent cache host in scheduler. 
+func MakePersistentCachePeersOfPersistentCacheHostInScheduler(schedulerClusterID uint, hostID string) string { + return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s:%s:%s", schedulerClusterID, PersistentCacheHostsNamespace, hostID, PersistentCachePeersNamespace)) +} diff --git a/scheduler/resource/persistentcache/host_manager.go b/scheduler/resource/persistentcache/host_manager.go index 3d1caad7d1c..20d52706480 100644 --- a/scheduler/resource/persistentcache/host_manager.go +++ b/scheduler/resource/persistentcache/host_manager.go @@ -20,7 +20,6 @@ package persistentcache import ( "context" - "fmt" "strconv" "time" @@ -34,13 +33,13 @@ import ( // HostManager is the interface used for host manager. type HostManager interface { - // Load returns host for a key. + // Load returns host by a key. Load(context.Context, string) (*Host, bool) // Store sets host. Store(context.Context, *Host) - // Delete deletes host for a key. + // Delete deletes host by a key. Delete(context.Context, string) // LoadAll returns all hosts. @@ -63,140 +62,141 @@ func newHostManager(cfg *config.Config, rdb redis.UniversalClient) HostManager { return &hostManager{config: cfg, rdb: rdb} } -// Load returns host for a key. +// Load returns host by a key. func (t *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) { + log := logger.WithHostID(hostID) rawHost, err := t.rdb.HGetAll(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(t.config.Manager.SchedulerClusterID, hostID)).Result() if err != nil { - fmt.Println("getting host failed from Redis:", err) + log.Errorf("getting host failed from redis: %v", err) return nil, false } // Set integer fields from raw host. port, err := strconv.ParseInt(rawHost["port"], 10, 32) if err != nil { - fmt.Println("parsing port failed:", err) + log.Errorf("parsing port failed: %v", err) return nil, false } downloadPort, err := strconv.ParseInt(rawHost["download_port"], 10, 32) if err != nil { - fmt.Println("parsing download port failed:", err) + log.Errorf("parsing download port failed: %v", err) return nil, false } concurrentUploadLimit, err := strconv.ParseInt(rawHost["concurrent_upload_limit"], 10, 32) if err != nil { - fmt.Println("parsing concurrent upload limit failed:", err) + log.Errorf("parsing concurrent upload limit failed: %v", err) return nil, false } concurrentUploadCount, err := strconv.ParseInt(rawHost["concurrent_upload_count"], 10, 32) if err != nil { - fmt.Println("parsing concurrent upload count failed:", err) + log.Errorf("parsing concurrent upload count failed: %v", err) return nil, false } uploadCount, err := strconv.ParseInt(rawHost["upload_count"], 10, 64) if err != nil { - fmt.Println("parsing upload count failed:", err) + log.Errorf("parsing upload count failed: %v", err) return nil, false } uploadFailedCount, err := strconv.ParseInt(rawHost["upload_failed_count"], 10, 64) if err != nil { - fmt.Println("parsing upload failed count failed:", err) + log.Errorf("parsing upload failed count failed: %v", err) return nil, false } // Set boolean fields from raw host. diableShared, err := strconv.ParseBool(rawHost["disable_shared"]) if err != nil { - fmt.Println("parsing disable shared failed:", err) + log.Errorf("parsing disable shared failed: %v", err) return nil, false } // Set cpu fields from raw host. 
cpuLogicalCount, err := strconv.ParseUint(rawHost["cpu_logical_count"], 10, 32) if err != nil { - fmt.Println("parsing cpu logical count failed:", err) + log.Errorf("parsing cpu logical count failed: %v", err) return nil, false } cpuPhysicalCount, err := strconv.ParseUint(rawHost["cpu_physical_count"], 10, 32) if err != nil { - fmt.Println("parsing cpu physical count failed:", err) + log.Errorf("parsing cpu physical count failed: %v", err) return nil, false } cpuPercent, err := strconv.ParseFloat(rawHost["cpu_percent"], 64) if err != nil { - fmt.Println("parsing cpu percent failed:", err) + log.Errorf("parsing cpu percent failed: %v", err) return nil, false } cpuProcessPercent, err := strconv.ParseFloat(rawHost["cpu_processe_percent"], 64) if err != nil { - fmt.Println("parsing cpu process percent failed:", err) + log.Errorf("parsing cpu process percent failed: %v", err) return nil, false } cpuTimesUser, err := strconv.ParseFloat(rawHost["cpu_times_user"], 64) if err != nil { - fmt.Println("parsing cpu times user failed:", err) + log.Errorf("parsing cpu times user failed: %v", err) return nil, false } cpuTimesSystem, err := strconv.ParseFloat(rawHost["cpu_times_system"], 64) if err != nil { - fmt.Println("parsing cpu times system failed:", err) + log.Errorf("parsing cpu times system failed: %v", err) return nil, false } cpuTimesIdle, err := strconv.ParseFloat(rawHost["cpu_times_idle"], 64) if err != nil { - fmt.Println("parsing cpu times idle failed:", err) + log.Errorf("parsing cpu times idle failed: %v", err) return nil, false } cpuTimesNice, err := strconv.ParseFloat(rawHost["cpu_times_nice"], 64) if err != nil { - fmt.Println("parsing cpu times nice failed:", err) + log.Errorf("parsing cpu times nice failed: %v", err) return nil, false } cpuTimesIowait, err := strconv.ParseFloat(rawHost["cpu_times_iowait"], 64) if err != nil { - fmt.Println("parsing cpu times iowait failed:", err) + log.Errorf("parsing cpu times iowait failed: %v", err) return nil, false } cpuTimesIrq, err := strconv.ParseFloat(rawHost["cpu_times_irq"], 64) if err != nil { - fmt.Println("parsing cpu times irq failed:", err) + log.Errorf("parsing cpu times irq failed: %v", err) return nil, false } cpuTimesSoftirq, err := strconv.ParseFloat(rawHost["cpu_times_softirq"], 64) if err != nil { - fmt.Println("parsing cpu times softirq failed:", err) + log.Errorf("parsing cpu times softirq failed: %v", err) return nil, false } cpuTimesSteal, err := strconv.ParseFloat(rawHost["cpu_times_steal"], 64) if err != nil { - fmt.Println("parsing cpu times steal failed:", err) + log.Errorf("parsing cpu times steal failed: %v", err) return nil, false } cpuTimesGuest, err := strconv.ParseFloat(rawHost["cpu_times_guest"], 64) if err != nil { - fmt.Println("parsing cpu times guest failed:", err) + log.Errorf("parsing cpu times guest failed: %v", err) return nil, false } cpuTimesGuestNice, err := strconv.ParseFloat(rawHost["cpu_times_guest_nice"], 64) if err != nil { - fmt.Println("parsing cpu times guest nice failed:", err) + log.Errorf("parsing cpu times guest nice failed: %v", err) return nil, false } @@ -222,37 +222,37 @@ func (t *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) { // Set memory fields from raw host. 
memoryTotal, err := strconv.ParseUint(rawHost["memory_total"], 10, 64) if err != nil { - fmt.Println("parsing memory total failed:", err) + log.Errorf("parsing memory total failed: %v", err) return nil, false } memoryAvailable, err := strconv.ParseUint(rawHost["memory_available"], 10, 64) if err != nil { - fmt.Println("parsing memory available failed:", err) + log.Errorf("parsing memory available failed: %v", err) return nil, false } memoryUsed, err := strconv.ParseUint(rawHost["memory_used"], 10, 64) if err != nil { - fmt.Println("parsing memory used failed:", err) + log.Errorf("parsing memory used failed: %v", err) return nil, false } memoryUsedPercent, err := strconv.ParseFloat(rawHost["memory_used_percent"], 64) if err != nil { - fmt.Println("parsing memory used percent failed:", err) + log.Errorf("parsing memory used percent failed: %v", err) return nil, false } memoryProcessUsedPercent, err := strconv.ParseFloat(rawHost["memory_processe_used_percent"], 64) if err != nil { - fmt.Println("parsing memory process used percent failed:", err) + log.Errorf("parsing memory process used percent failed: %v", err) return nil, false } memoryFree, err := strconv.ParseUint(rawHost["memory_free"], 10, 64) if err != nil { - fmt.Println("parsing memory free failed:", err) + log.Errorf("parsing memory free failed: %v", err) return nil, false } @@ -268,37 +268,37 @@ func (t *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) { // Set network fields from raw host. networkTCPConnectionCount, err := strconv.ParseUint(rawHost["network_tcp_connection_count"], 10, 32) if err != nil { - fmt.Println("parsing network tcp connection count failed:", err) + log.Errorf("parsing network tcp connection count failed: %v", err) return nil, false } networkUploadTCPConnectionCount, err := strconv.ParseUint(rawHost["network_upload_tcp_connection_count"], 10, 32) if err != nil { - fmt.Println("parsing network upload tcp connection count failed:", err) + log.Errorf("parsing network upload tcp connection count failed: %v", err) return nil, false } downloadRate, err := strconv.ParseUint(rawHost["network_download_rate"], 10, 64) if err != nil { - fmt.Println("parsing download rate failed:", err) + log.Errorf("parsing download rate failed: %v", err) return nil, false } downloadRateLimit, err := strconv.ParseUint(rawHost["network_download_rate_limit"], 10, 64) if err != nil { - fmt.Println("parsing download rate limit failed:", err) + log.Errorf("parsing download rate limit failed: %v", err) return nil, false } uploadRate, err := strconv.ParseUint(rawHost["network_upload_rate"], 10, 64) if err != nil { - fmt.Println("parsing upload rate failed:", err) + log.Errorf("parsing upload rate failed: %v", err) return nil, false } uploadRateLimit, err := strconv.ParseUint(rawHost["network_upload_rate_limit"], 10, 64) if err != nil { - fmt.Println("parsing upload rate limit failed:", err) + log.Errorf("parsing upload rate limit failed: %v", err) return nil, false } @@ -316,49 +316,49 @@ func (t *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) { // Set disk fields from raw host. 
diskTotal, err := strconv.ParseUint(rawHost["disk_total"], 10, 64) if err != nil { - fmt.Println("parsing disk total failed:", err) + log.Errorf("parsing disk total failed: %v", err) return nil, false } diskFree, err := strconv.ParseUint(rawHost["disk_free"], 10, 64) if err != nil { - fmt.Println("parsing disk free failed:", err) + log.Errorf("parsing disk free failed: %v", err) return nil, false } diskUsed, err := strconv.ParseUint(rawHost["disk_used"], 10, 64) if err != nil { - fmt.Println("parsing disk used failed:", err) + log.Errorf("parsing disk used failed: %v", err) return nil, false } diskUsedPercent, err := strconv.ParseFloat(rawHost["disk_used_percent"], 64) if err != nil { - fmt.Println("parsing disk used percent failed:", err) + log.Errorf("parsing disk used percent failed: %v", err) return nil, false } diskInodesTotal, err := strconv.ParseUint(rawHost["disk_inodes_total"], 10, 64) if err != nil { - fmt.Println("parsing disk inodes total failed:", err) + log.Errorf("parsing disk inodes total failed: %v", err) return nil, false } diskInodesUsed, err := strconv.ParseUint(rawHost["disk_inodes_used"], 10, 64) if err != nil { - fmt.Println("parsing disk inodes used failed:", err) + log.Errorf("parsing disk inodes used failed: %v", err) return nil, false } diskInodesFree, err := strconv.ParseUint(rawHost["disk_inodes_free"], 10, 64) if err != nil { - fmt.Println("parsing disk inodes free failed:", err) + log.Errorf("parsing disk inodes free failed: %v", err) return nil, false } diskInodesUsedPercent, err := strconv.ParseFloat(rawHost["disk_inodes_used_percent"], 64) if err != nil { - fmt.Println("parsing disk inodes used percent failed:", err) + log.Errorf("parsing disk inodes used percent failed: %v", err) return nil, false } @@ -383,19 +383,19 @@ func (t *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) { // Set time fields from raw host. announceInterval, err := strconv.ParseInt(rawHost["announce_interval"], 10, 32) if err != nil { - fmt.Println("parsing announce interval failed:", err) + log.Errorf("parsing announce interval failed: %v", err) return nil, false } createdAt, err := time.Parse(time.RFC3339, rawHost["created_at"]) if err != nil { - fmt.Println("parsing created at failed:", err) + log.Errorf("parsing created at failed: %v", err) return nil, false } updatedAt, err := time.Parse(time.RFC3339, rawHost["updated_at"]) if err != nil { - fmt.Println("parsing updated at failed:", err) + log.Errorf("parsing updated at failed: %v", err) return nil, false } @@ -493,7 +493,7 @@ func (t *hostManager) Store(ctx context.Context, host *Host) { "updated_at", host.UpdatedAt.Format(time.RFC3339)) } -// Delete deletes host for a key. +// Delete deletes host by a key. 
func (t *hostManager) Delete(ctx context.Context, hostID string) { t.rdb.Del(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(t.config.Manager.SchedulerClusterID, hostID)) } @@ -513,14 +513,14 @@ func (t *hostManager) LoadAll(ctx context.Context) ([]*Host, error) { hostKeys, cursor, err = t.rdb.Scan(ctx, cursor, pkgredis.MakePersistentCacheHostsInScheduler(t.config.Manager.SchedulerClusterID), 10).Result() if err != nil { - logger.Warn("scan hosts failed") + logger.Error("scan hosts failed") return nil, err } for _, hostKey := range hostKeys { host, loaded := t.Load(ctx, hostKey) if !loaded { - logger.WithHostID(hostKey).Warn("load host failed") + logger.WithHostID(hostKey).Error("load host failed") continue } diff --git a/scheduler/resource/persistentcache/peer.go b/scheduler/resource/persistentcache/peer.go index adc7384cd02..c9e1850c551 100644 --- a/scheduler/resource/persistentcache/peer.go +++ b/scheduler/resource/persistentcache/peer.go @@ -129,6 +129,7 @@ func NewPeer(id, state string, finishedPieces *bitset.BitSet, blockParents []str }, }, ) + p.FSM.SetState(state) return p } diff --git a/scheduler/resource/persistentcache/peer_manager.go b/scheduler/resource/persistentcache/peer_manager.go new file mode 100644 index 00000000000..0f9fbcd8a79 --- /dev/null +++ b/scheduler/resource/persistentcache/peer_manager.go @@ -0,0 +1,237 @@ +/* + * Copyright 2024 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package persistentcache + +import ( + "context" + "encoding/json" + "errors" + "strconv" + "time" + + "github.com/bits-and-blooms/bitset" + "github.com/redis/go-redis/v9" + + logger "d7y.io/dragonfly/v2/internal/dflog" + pkgredis "d7y.io/dragonfly/v2/pkg/redis" + "d7y.io/dragonfly/v2/scheduler/config" +) + +// PeerManager is the interface used for peer manager. +type PeerManager interface { + // Load returns peer by a key. + Load(context.Context, string) (*Peer, bool) + + // Store sets peer. + Store(context.Context, *Peer) error + + // Delete deletes peer by a key. + Delete(context.Context, string) error + + // LoadAll returns all peers. + LoadAll(context.Context) ([]*Peer, error) +} + +// peerManager contains content for peer manager. +type peerManager struct { + // Config is scheduler config. + config *config.Config + + // taskManager is the manager of task. + taskManager TaskManager + + // hostManager is the manager of host. + hostManager HostManager + + // Redis universal client interface. + rdb redis.UniversalClient +} + +// TODO: Use newPeerManager for resource management. +// New peer manager interface. +// nolint +func newPeerManager(cfg *config.Config, taskManager TaskManager, hostManager HostManager, rdb redis.UniversalClient) PeerManager { + return &peerManager{config: cfg, taskManager: taskManager, hostManager: hostManager, rdb: rdb} +} + +// Load returns persistent cache peer by a key. 
+func (p *peerManager) Load(ctx context.Context, peerID string) (*Peer, bool) {
+	log := logger.WithPeerID(peerID)
+	rawPeer, err := p.rdb.HGetAll(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peerID)).Result()
+	if err != nil {
+		log.Errorf("getting peer failed from redis: %v", err)
+		return nil, false
+	}
+
+	finishedPieces := &bitset.BitSet{}
+	if err := finishedPieces.UnmarshalBinary([]byte(rawPeer["finished_pieces"])); err != nil {
+		log.Errorf("unmarshal finished pieces failed: %v", err)
+		return nil, false
+	}
+
+	blockParents := []string{}
+	if err := json.Unmarshal([]byte(rawPeer["block_parents"]), &blockParents); err != nil {
+		log.Errorf("unmarshal block parents failed: %v", err)
+		return nil, false
+	}
+
+	// Set time fields from raw peer.
+	cost, err := strconv.ParseInt(rawPeer["cost"], 10, 32)
+	if err != nil {
+		log.Errorf("parsing cost failed: %v", err)
+		return nil, false
+	}
+
+	createdAt, err := time.Parse(time.RFC3339, rawPeer["created_at"])
+	if err != nil {
+		log.Errorf("parsing created at failed: %v", err)
+		return nil, false
+	}
+
+	updatedAt, err := time.Parse(time.RFC3339, rawPeer["updated_at"])
+	if err != nil {
+		log.Errorf("parsing updated at failed: %v", err)
+		return nil, false
+	}
+
+	host, loaded := p.hostManager.Load(ctx, rawPeer["host_id"])
+	if !loaded {
+		log.Error("host not found")
+		return nil, false
+	}
+
+	task, loaded := p.taskManager.Load(ctx, rawPeer["task_id"])
+	if !loaded {
+		log.Error("task not found")
+		return nil, false
+	}
+
+	return NewPeer(
+		rawPeer["id"],
+		rawPeer["state"],
+		finishedPieces,
+		blockParents,
+		task,
+		host,
+		time.Duration(cost),
+		createdAt,
+		updatedAt,
+		logger.WithPeer(host.ID, task.ID, rawPeer["id"]),
+	), true
+}
+
+// Store sets persistent cache peer.
+func (p *peerManager) Store(ctx context.Context, peer *Peer) error {
+	finishedPieces, err := peer.FinishedPieces.MarshalBinary()
+	if err != nil {
+		peer.Log.Errorf("marshal finished pieces failed: %v", err)
+		return err
+	}
+
+	blockParents, err := json.Marshal(peer.BlockParents)
+	if err != nil {
+		peer.Log.Errorf("marshal block parents failed: %v", err)
+		return err
+	}
+
+	if _, err := p.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
+		// Store peer information and set expiration.
+		pipe.HSet(ctx,
+			pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peer.ID),
+			"id", peer.ID,
+			"finished_pieces", finishedPieces,
+			"state", peer.FSM.Current(),
+			"block_parents", blockParents,
+			"task_id", peer.Task.ID,
+			"host_id", peer.Host.ID,
+			"cost", peer.Cost,
+			"created_at", peer.CreatedAt.Format(time.RFC3339),
+			"updated_at", peer.UpdatedAt.Format(time.RFC3339))
+		pipe.Expire(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peer.ID), peer.Task.TTL)
+
+		// Store the association with task and set expiration.
+		pipe.SAdd(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), peer.ID)
+		pipe.Expire(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), peer.Task.TTL)
+
+		// Store the association with host.
+		pipe.SAdd(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), peer.ID)
+		return nil
+	}); err != nil {
+		peer.Log.Errorf("store peer failed: %v", err)
+		return err
+	}
+
+	return nil
+}
+
+// Delete deletes persistent cache peer by a key, and it will delete the association with task and host at the same time.
+func (p *peerManager) Delete(ctx context.Context, peerID string) error {
+	log := logger.WithPeerID(peerID)
+	if _, err := p.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
+		rawPeer, err := p.rdb.HGetAll(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peerID)).Result()
+		if err != nil {
+			return errors.New("getting peer failed from redis")
+		}
+
+		pipe.Del(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peerID))
+		pipe.SRem(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, rawPeer["task_id"]), peerID)
+		pipe.SRem(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, rawPeer["host_id"]), peerID)
+		return nil
+	}); err != nil {
+		log.Errorf("delete peer failed: %v", err)
+		return err
+	}
+
+	return nil
+}
+
+// LoadAll returns all persistent cache peers.
+func (p *peerManager) LoadAll(ctx context.Context) ([]*Peer, error) {
+	var (
+		peers  []*Peer
+		cursor uint64
+	)
+
+	for {
+		var (
+			peerKeys []string
+			err      error
+		)
+
+		peerKeys, cursor, err = p.rdb.Scan(ctx, cursor, pkgredis.MakePersistentCachePeersInScheduler(p.config.Manager.SchedulerClusterID), 10).Result()
+		if err != nil {
+			logger.Error("scan peers failed")
+			return nil, err
+		}
+
+		for _, peerKey := range peerKeys {
+			peer, loaded := p.Load(ctx, peerKey)
+			if !loaded {
+				logger.WithPeerID(peerKey).Error("load peer failed")
+				continue
+			}
+
+			peers = append(peers, peer)
+		}
+
+		if cursor == 0 {
+			break
+		}
+	}
+
+	return peers, nil
+}
diff --git a/scheduler/resource/persistentcache/task.go b/scheduler/resource/persistentcache/task.go
index f945734f338..7472f787422 100644
--- a/scheduler/resource/persistentcache/task.go
+++ b/scheduler/resource/persistentcache/task.go
@@ -115,7 +115,7 @@ func NewTask(id, tag, application, state string, persistentReplicaCount uint64,
 		TTL:       time.Hour * 24,
 		CreatedAt: createdAt,
 		UpdatedAt: updatedAt,
-		Log:       logger.WithPersistentCacheTask(id),
+		Log:       logger.WithTaskID(id),
 	}
 
 	// Initialize state machine.
diff --git a/scheduler/resource/persistentcache/task_manager.go b/scheduler/resource/persistentcache/task_manager.go
index 3603308e055..8ba04082072 100644
--- a/scheduler/resource/persistentcache/task_manager.go
+++ b/scheduler/resource/persistentcache/task_manager.go
@@ -20,7 +20,6 @@ package persistentcache
 
 import (
 	"context"
-	"fmt"
 	"strconv"
 	"time"
 
@@ -34,13 +33,13 @@ import (
 
 // TaskManager is the interface used for persistent cache task manager.
 type TaskManager interface {
-	// Load returns persistent cache task for a key.
+	// Load returns persistent cache task by a key.
 	Load(context.Context, string) (*Task, bool)
 
 	// Store sets persistent cache task.
 	Store(context.Context, *Task) error
 
-	// Delete deletes persistent cache task for a key.
+	// Delete deletes persistent cache task by a key.
 	Delete(context.Context, string)
 
 	// LoadAll returns all persistent cache tasks.
@@ -63,68 +62,69 @@ func newTaskManager(cfg *config.Config, rdb redis.UniversalClient) TaskManager { return &taskManager{config: cfg, rdb: rdb} } -// Load returns persistent cache task for a key. +// Load returns persistent cache task by a key. func (t *taskManager) Load(ctx context.Context, taskID string) (*Task, bool) { + log := logger.WithTaskID(taskID) rawTask, err := t.rdb.HGetAll(ctx, pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, taskID)).Result() if err != nil { - fmt.Println("getting task failed from Redis:", err) + log.Errorf("getting task failed from redis: %v", err) return nil, false } // Set integer fields from raw task. persistentReplicaCount, err := strconv.ParseUint(rawTask["persistent_replica_count"], 10, 64) if err != nil { - fmt.Println("parsing persistent replica count failed:", err) + log.Errorf("parsing persistent replica count failed: %v", err) return nil, false } replicaCount, err := strconv.ParseUint(rawTask["replica_count"], 10, 64) if err != nil { - fmt.Println("parsing replica count failed:", err) + log.Errorf("parsing replica count failed: %v", err) return nil, false } pieceLength, err := strconv.ParseInt(rawTask["piece_length"], 10, 32) if err != nil { - fmt.Println("parsing piece length failed:", err) + log.Errorf("parsing piece length failed: %v", err) return nil, false } contentLength, err := strconv.ParseInt(rawTask["content_length"], 10, 64) if err != nil { - fmt.Println("parsing content length failed:", err) + log.Errorf("parsing content length failed: %v", err) return nil, false } totalPieceCount, err := strconv.ParseInt(rawTask["total_piece_count"], 10, 32) if err != nil { - fmt.Println("parsing total piece count failed:", err) + log.Errorf("parsing total piece count failed: %v", err) return nil, false } // Set time fields from raw task. ttl, err := strconv.ParseInt(rawTask["ttl"], 10, 32) if err != nil { - fmt.Println("parsing ttl failed:", err) + log.Errorf("parsing ttl failed: %v", err) return nil, false } createdAt, err := time.Parse(time.RFC3339, rawTask["created_at"]) if err != nil { - fmt.Println("parsing created at failed:", err) + log.Errorf("parsing created at failed: %v", err) return nil, false } updatedAt, err := time.Parse(time.RFC3339, rawTask["updated_at"]) if err != nil { - fmt.Println("parsing updated at failed:", err) + log.Errorf("parsing updated at failed: %v", err) return nil, false } // Set digest from raw task. digest, err := digest.Parse(rawTask["digest"]) if err != nil { - fmt.Println("parsing digest failed:", err) + log.Errorf("parsing digest failed: %v", err) return nil, false } @@ -142,14 +142,14 @@ func (t *taskManager) Load(ctx context.Context, taskID string) (*Task, bool) { time.Duration(ttl), createdAt, updatedAt, - logger.WithPersistentCacheTask(rawTask["id"]), + logger.WithTaskID(rawTask["id"]), ), true } // Store sets persistent cache task. 
func (t *taskManager) Store(ctx context.Context, task *Task) error { if _, err := t.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error { - t.rdb.HSet(ctx, + pipe.HSet(ctx, pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, task.ID), "id", task.ID, "persistent_replica_count", task.PersistentReplicaCount, @@ -160,22 +160,22 @@ func (t *taskManager) Store(ctx context.Context, task *Task) error { "piece_length", task.PieceLength, "content_length", task.ContentLength, "total_piece_count", task.TotalPieceCount, - "state", TaskStatePending, + "state", task.FSM.Current(), "ttl", task.TTL, "created_at", task.CreatedAt.Format(time.RFC3339), "updated_at", task.UpdatedAt.Format(time.RFC3339)) - t.rdb.Expire(ctx, pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, task.ID), task.TTL) + pipe.Expire(ctx, pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, task.ID), task.TTL) return nil }); err != nil { - task.Log.Warnf("store task failed: %v", err) + task.Log.Errorf("store task failed: %v", err) return err } return nil } -// Delete deletes persistent cache task for a key. +// Delete deletes persistent cache task by a key. func (t *taskManager) Delete(ctx context.Context, taskID string) { t.rdb.Del(ctx, pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, taskID)) } @@ -195,14 +195,14 @@ func (t *taskManager) LoadAll(ctx context.Context) ([]*Task, error) { taskKeys, cursor, err = t.rdb.Scan(ctx, cursor, pkgredis.MakePersistentCacheTasksInScheduler(t.config.Manager.SchedulerClusterID), 10).Result() if err != nil { - logger.Warn("scan tasks failed") + logger.Error("scan tasks failed") return nil, err } for _, taskKey := range taskKeys { task, loaded := t.Load(ctx, taskKey) if !loaded { - logger.WithTaskID(taskKey).Warn("load task failed") + logger.WithTaskID(taskKey).Error("load task failed") continue }
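Reviewer note: a minimal wiring sketch (not part of this patch) of how the new PeerManager is expected to compose with the existing host and task managers, since peerManager.Load resolves a peer's stored host_id and task_id hash fields back to live objects. The constructor signatures match this patch; the function name exampleWiring and the assumption that cfg and rdb come from the scheduler bootstrap are illustrative only, and newPeerManager is still unexported and marked nolint pending resource-layer integration.

package persistentcache

import (
	"github.com/redis/go-redis/v9"

	"d7y.io/dragonfly/v2/scheduler/config"
)

// exampleWiring builds the three managers in dependency order. The peer
// manager takes both the task and host managers because Load converts the
// stored host_id/task_id fields into *Host and *Task before calling NewPeer.
func exampleWiring(cfg *config.Config, rdb redis.UniversalClient) PeerManager {
	hostManager := newHostManager(cfg, rdb)
	taskManager := newTaskManager(cfg, rdb)
	return newPeerManager(cfg, taskManager, hostManager, rdb)
}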