From 233e57c6eb72d41eaaef6a9cab464f52d7b927dc Mon Sep 17 00:00:00 2001
From: AstaFrode
Date: Wed, 4 Sep 2024 17:59:58 +0800
Subject: [PATCH] Miner filter (#185)

* update peer record

* update config file and env

* update track

* add trackerv2

---
 .env.local                       |   4 +-
 cmd/cmd/run.go                   |   7 +-
 common/confile/conf_test.yaml    |   4 +-
 common/confile/confile.go        |   6 +-
 common/peerrecord/record_peer.go | 285 ++++++++++++++++++++++--
 node/get_location.go             |   7 +-
 node/tasks.go                    | 106 +++++++--
 node/tracker.go                  |  66 +++---
 node/trackerv2.go                | 367 +++++++++++++++++++++++++++++++
 9 files changed, 766 insertions(+), 86 deletions(-)
 create mode 100644 node/trackerv2.go

diff --git a/.env.local b/.env.local
index 3ba4b95..cee45d1 100644
--- a/.env.local
+++ b/.env.local
@@ -60,5 +60,5 @@ ttl=""
 refresh=""
 
-# give priority to storing files to miners with these peerids, multiple separated by spaces
-peerid=""
\ No newline at end of file
+# specify the accounts of the storage miners you want to store to, multiple separated by spaces
+specify_miner=""
\ No newline at end of file
diff --git a/cmd/cmd/run.go b/cmd/cmd/run.go
index a5240a4..728e3ef 100644
--- a/cmd/cmd/run.go
+++ b/cmd/cmd/run.go
@@ -114,8 +114,6 @@ func cmd_run_func(cmd *cobra.Command, args []string) {
 	}
 	defer n.PeerNode.Close()
 
-	n.LoadPeer(filepath.Join(n.GetBasespace(), "peer_record"))
-
 	go node.Subscribe(
 		ctx, n.PeerNode.GetHost(), n.PeerNode.GetBootnode(),
@@ -298,9 +296,8 @@ func readEnv() (*confile.Config, error) {
 	refresh, _ := strconv.Atoi(os.Getenv("refresh"))
 	c.Selector.Refresh = uint32(refresh)
 
-	// high priority peerid
-	peerids := strings.Split(os.Getenv("peerid"), " ")
-	c.Peerid = peerids
+	// specify storage miner accounts
+	c.Shunt.Account = strings.Split(os.Getenv("specify_miner"), " ")
 
 	return c, nil
 }
diff --git a/common/confile/conf_test.yaml b/common/confile/conf_test.yaml
index b212188..5de2893 100644
--- a/common/confile/conf_test.yaml
+++ b/common/confile/conf_test.yaml
@@ -66,5 +66,5 @@ selector:
   refresh: 4
 
 shunt:
-  # give priority to storing files to miners with these peerids
-  peerid:
\ No newline at end of file
+  # specify the accounts of the storage miners you want to store to
+  account:
\ No newline at end of file
diff --git a/common/confile/confile.go b/common/confile/confile.go
index 8437a6a..bb2317c 100644
--- a/common/confile/confile.go
+++ b/common/confile/confile.go
@@ -88,8 +88,8 @@ selector:
   refresh: 4
 
 shunt:
-  # give priority to storing files to miners with these peerids
-  peerid:`
+  # specify the accounts of the storage miners you want to store to
+  account:`
 )
 
 type Application struct {
@@ -134,7 +134,7 @@ type Selector struct {
 }
 
 type Shunt struct {
-	Peerid []string `name:"peerid" toml:"peerid" yaml:"peerid"`
+	Account []string `name:"account" toml:"account" yaml:"account"`
 }
 
 type Config struct {
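One caveat on the readEnv change above: strings.Split("", " ") returns a one-element slice containing the empty string, so an unset specify_miner still yields len(c.Shunt.Account) == 1, which later makes the len(priorityMiners) > 0 check in highPriorityStorage pass. A sketch of a stricter parse; parseAccounts is a hypothetical helper, not part of this patch:

package main

import (
	"fmt"
	"os"
	"strings"
)

// parseAccounts splits the space-separated miner account list.
// strings.Fields returns an empty slice for empty input, unlike
// strings.Split, which returns [""] and makes the priority list
// look non-empty.
func parseAccounts(raw string) []string {
	return strings.Fields(raw)
}

func main() {
	accounts := parseAccounts(os.Getenv("specify_miner"))
	fmt.Println(len(accounts)) // 0 when specify_miner is unset
}
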
diff --git a/common/peerrecord/record_peer.go b/common/peerrecord/record_peer.go
index addcd07..0a28c32 100644
--- a/common/peerrecord/record_peer.go
+++ b/common/peerrecord/record_peer.go
@@ -22,36 +22,93 @@ type PeerRecord interface {
 	// SavePeer saves or updates peer information
 	SavePeer(addr peer.AddrInfo) error
 	//
-	SavePeerAccount(account string, peerid string) error
+	DeletePeer(peerid string)
+	//
+	DeletePeerByAccount(acc string)
+	//
+	SavePeerAccount(account string, peerid string, state string, idle_space uint64) error
 	//
 	HasPeer(peerid string) bool
 	//
 	GetPeer(peerid string) (peer.AddrInfo, bool)
 	//
-	GetPeerByAccount(account string) (peer.AddrInfo, bool)
+	GetPeerByAccount(account string) (AccountInfo, bool)
+	//
+	GetAccountByPeer(peerid string) (string, bool)
 	//
 	GetAllPeerId() []string
 	//
+	GetAllWhitelist() []string
+	//
+	AddToWhitelist(peerid, account string)
+	//
+	AddToBlacklist(peerid, account, reason string)
+	//
+	RemoveFromBlacklist(peerid string)
+	//
+	IsInBlacklist(peerid string) bool
+	//
+	GetBlacklist() map[string]BlacklistInfo
+	//
+	GetBlacklistInfo(peerid string) (BlacklistInfo, bool)
+	//
 	BackupPeer(path string) error
 	//
 	LoadPeer(path string) error
+	//
+	BackupAccountPeer(path string) error
+	//
+	LoadAccountPeer(path string) error
+	//
+	BackupBlacklist(path string) error
+	//
+	LoadBlacklist(path string) error
+	//
+	BackupWhitelist(path string) error
+	//
+	LoadWhitelist(path string) error
 }
 
 type PeerRecordType struct {
-	lock        *sync.RWMutex
-	accLock     *sync.RWMutex
-	peerList    map[string]peer.AddrInfo
-	accountList map[string]peer.AddrInfo
+	lock            *sync.RWMutex
+	accLock         *sync.RWMutex
+	blacklistLock   *sync.RWMutex
+	whitelistLock   *sync.RWMutex
+	peerAccountLock *sync.RWMutex
+	peerList        map[string]peer.AddrInfo
+	peerAccountList map[string]string
+	accountList     map[string]AccountInfo
+	blacklist       map[string]BlacklistInfo
+	whitelist       map[string]string
+}
+
+type AccountInfo struct {
+	Account   string        `json:"account"`
+	State     string        `json:"state"`
+	IdleSpace uint64        `json:"idle_space"`
+	Addrs     peer.AddrInfo `json:"addrs"`
+}
+
+type BlacklistInfo struct {
+	Account string        `json:"account"`
+	Reason  string        `json:"reason"`
+	Addrs   peer.AddrInfo `json:"addrs"`
 }
 
 var _ PeerRecord = (*PeerRecordType)(nil)
 
 func NewPeerRecord() PeerRecord {
 	return &PeerRecordType{
-		lock:        new(sync.RWMutex),
-		accLock:     new(sync.RWMutex),
-		peerList:    make(map[string]peer.AddrInfo, 100),
-		accountList: make(map[string]peer.AddrInfo, 100),
+		lock:            new(sync.RWMutex),
+		accLock:         new(sync.RWMutex),
+		blacklistLock:   new(sync.RWMutex),
+		whitelistLock:   new(sync.RWMutex),
+		peerAccountLock: new(sync.RWMutex),
+		peerList:        make(map[string]peer.AddrInfo, 100),
+		peerAccountList: make(map[string]string, 100),
+		accountList:     make(map[string]AccountInfo, 100),
+		blacklist:       make(map[string]BlacklistInfo, 100),
+		whitelist:       make(map[string]string, 100),
 	}
 }
 
@@ -72,16 +129,53 @@ func (p *PeerRecordType) SavePeer(addr peer.AddrInfo) error {
 	return nil
 }
 
-func (p PeerRecordType) SavePeerAccount(account string, peerid string) error {
+func (p *PeerRecordType) DeletePeer(peerid string) {
+	p.lock.Lock()
+	delete(p.peerList, peerid)
+	p.lock.Unlock()
+
+	p.blacklistLock.Lock()
+	delete(p.blacklist, peerid)
+	p.blacklistLock.Unlock()
+}
+
+func (p *PeerRecordType) DeletePeerByAccount(acc string) {
+	p.accLock.RLock()
+	value, ok := p.accountList[acc]
+	p.accLock.RUnlock()
+	if ok {
+		p.DeletePeer(value.Addrs.ID.String())
+	}
+}
+
+func (p *PeerRecordType) SavePeerAccount(account string, peerid string, state string, idle_space uint64) error {
 	p.lock.RLock()
 	addr, ok := p.peerList[peerid]
 	p.lock.RUnlock()
 	if !ok {
 		return fmt.Errorf("not found peer: %s", peerid)
 	}
+
 	p.accLock.Lock()
-	p.accountList[account] = addr
+	p.accountList[account] = AccountInfo{
+		Addrs:     addr,
+		Account:   account,
+		State:     state,
+		IdleSpace: idle_space,
+	}
 	p.accLock.Unlock()
+
+	p.peerAccountLock.Lock()
+	p.peerAccountList[peerid] = account
+	p.peerAccountLock.Unlock()
+
+	p.blacklistLock.Lock()
+	value, ok := p.blacklist[peerid]
+	if ok {
+		value.Account = account
+		p.blacklist[peerid] = value
+	}
+	p.blacklistLock.Unlock()
 	return nil
}
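Usage of the new account binding, as a sketch: the peer must already be in peerList (via SavePeer) before SavePeerAccount can attach the account, state, and idle space. The import path follows this repo's layout, and the "positive" state string and the cX... account are illustrative assumptions:

package main

import (
	"fmt"

	"github.com/CESSProject/DeOSS/common/peerrecord"
	"github.com/libp2p/go-libp2p/core/peer"
)

func main() {
	rec := peerrecord.NewPeerRecord()

	var addr peer.AddrInfo // real code fills ID and Addrs first
	_ = rec.SavePeer(addr)

	// Fails with "not found peer" if SavePeer was skipped.
	err := rec.SavePeerAccount("cX...", addr.ID.String(), "positive", 96*1024*1024)
	if err != nil {
		fmt.Println(err)
		return
	}

	if info, ok := rec.GetPeerByAccount("cX..."); ok {
		fmt.Println(info.State, info.IdleSpace)
	}
}
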
@@ -99,11 +193,18 @@ func (p *PeerRecordType) GetPeer(peerid string) (peer.AddrInfo, bool) {
 	return addr, ok
 }
 
-func (p *PeerRecordType) GetPeerByAccount(account string) (peer.AddrInfo, bool) {
+func (p *PeerRecordType) GetPeerByAccount(account string) (AccountInfo, bool) {
 	p.accLock.RLock()
-	addr, ok := p.accountList[account]
+	accountInfo, ok := p.accountList[account]
 	p.accLock.RUnlock()
-	return addr, ok
+	return accountInfo, ok
+}
+
+func (p *PeerRecordType) GetAccountByPeer(peerid string) (string, bool) {
+	p.peerAccountLock.RLock()
+	acc, ok := p.peerAccountList[peerid]
+	p.peerAccountLock.RUnlock()
+	return acc, ok
 }
 
 func (p *PeerRecordType) GetAllPeerId() []string {
@@ -118,6 +219,76 @@ func (p *PeerRecordType) GetAllPeerId() []string {
 	return result
 }
 
+func (p *PeerRecordType) GetAllWhitelist() []string {
+	var i int
+	p.whitelistLock.RLock()
+	var result = make([]string, len(p.whitelist))
+	for k := range p.whitelist {
+		result[i] = k
+		i++
+	}
+	p.whitelistLock.RUnlock()
+	return result
+}
+
+func (p *PeerRecordType) AddToWhitelist(peerid, account string) {
+	p.whitelistLock.Lock()
+	p.whitelist[peerid] = account
+	p.whitelistLock.Unlock()
+
+	p.blacklistLock.Lock()
+	delete(p.blacklist, peerid)
+	p.blacklistLock.Unlock()
+}
+
+func (p *PeerRecordType) AddToBlacklist(peerid, account, reason string) {
+	p.lock.RLock()
+	addrs := p.peerList[peerid]
+	p.lock.RUnlock()
+
+	p.blacklistLock.Lock()
+	p.blacklist[peerid] = BlacklistInfo{
+		Addrs:   addrs,
+		Account: account,
+		Reason:  reason,
+	}
+	p.blacklistLock.Unlock()
+
+	p.whitelistLock.Lock()
+	delete(p.whitelist, peerid)
+	p.whitelistLock.Unlock()
+}
+
+func (p *PeerRecordType) RemoveFromBlacklist(peerid string) {
+	p.blacklistLock.Lock()
+	delete(p.blacklist, peerid)
+	p.blacklistLock.Unlock()
+}
+
+func (p *PeerRecordType) IsInBlacklist(peerid string) bool {
+	p.blacklistLock.RLock()
+	_, ok := p.blacklist[peerid]
+	p.blacklistLock.RUnlock()
+	return ok
+}
+
+func (p *PeerRecordType) GetBlacklist() map[string]BlacklistInfo {
+	p.blacklistLock.RLock()
+	var result = make(map[string]BlacklistInfo, len(p.blacklist))
+	for k, v := range p.blacklist {
+		result[k] = v
+	}
+	p.blacklistLock.RUnlock()
+	return result
+}
+
+func (p *PeerRecordType) GetBlacklistInfo(peerid string) (BlacklistInfo, bool) {
+	p.blacklistLock.RLock()
+	result, ok := p.blacklist[peerid]
+	p.blacklistLock.RUnlock()
+	return result, ok
+}
+
 func (p *PeerRecordType) BackupPeer(path string) error {
 	p.lock.RLock()
 	buf, err := json.Marshal(p.peerList)
@@ -145,3 +316,87 @@ func (p *PeerRecordType) LoadPeer(path string) error {
 	p.lock.Unlock()
 	return nil
 }
+
+func (p *PeerRecordType) BackupAccountPeer(path string) error {
+	p.accLock.RLock()
+	buf, err := json.Marshal(p.accountList)
+	if err != nil {
+		p.accLock.RUnlock()
+		return err
+	}
+	p.accLock.RUnlock()
+	err = sutils.WriteBufToFile(buf, path)
+	return err
+}
+
+func (p *PeerRecordType) LoadAccountPeer(path string) error {
+	buf, err := os.ReadFile(path)
+	if err != nil {
+		return err
+	}
+	var data = make(map[string]AccountInfo)
+	err = json.Unmarshal(buf, &data)
+	if err != nil {
+		return err
+	}
+	p.accLock.Lock()
+	p.accountList = data
+	p.accLock.Unlock()
+	return nil
+}
+
+func (p *PeerRecordType) BackupBlacklist(path string) error {
+	p.blacklistLock.RLock()
+	buf, err := json.Marshal(p.blacklist)
+	if err != nil {
+		p.blacklistLock.RUnlock()
+		return err
+	}
+	p.blacklistLock.RUnlock()
+	err = sutils.WriteBufToFile(buf, path)
+	return err
+}
+
+func (p *PeerRecordType) LoadBlacklist(path string) error {
+	buf, err := os.ReadFile(path)
+	if err != nil {
+		return err
+	}
+	var data = make(map[string]BlacklistInfo)
+	err = json.Unmarshal(buf, &data)
+	if err != nil {
+		return err
+	}
+	p.blacklistLock.Lock()
+	p.blacklist = data
+	p.blacklistLock.Unlock()
+	return nil
+}
+
+func (p *PeerRecordType) BackupWhitelist(path string) error {
+	p.whitelistLock.RLock()
+	buf, err := json.Marshal(p.whitelist)
+	if err != nil {
+		p.whitelistLock.RUnlock()
+		return err
+	}
+	p.whitelistLock.RUnlock()
+	err = sutils.WriteBufToFile(buf, path)
+	return err
+}
+
+func (p *PeerRecordType) LoadWhitelist(path string) error {
+	buf, err := os.ReadFile(path)
+	if err != nil {
+		return err
+	}
+	var data = make(map[string]string)
+	err = json.Unmarshal(buf, &data)
+	if err != nil {
+		return err
+	}
+	p.whitelistLock.Lock()
+	p.whitelist = data
+	p.whitelistLock.Unlock()
+	return nil
+}
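Four of the maps now have plain-JSON snapshot persistence through matching Backup*/Load* pairs; TaskMgt below wires them to peer_record, account_record, blacklist_record, and whitelist_record in the workspace. A minimal round trip, with an illustrative path:

package main

import (
	"log"
	"path/filepath"

	"github.com/CESSProject/DeOSS/common/peerrecord"
)

func main() {
	rec := peerrecord.NewPeerRecord()
	ws := "/tmp/deoss-ws" // illustrative workspace path

	// Backup* marshals the map under the read lock and writes
	// the buffer to disk after releasing it.
	if err := rec.BackupBlacklist(filepath.Join(ws, "blacklist_record")); err != nil {
		log.Fatal(err)
	}

	// Load* replaces the in-memory map wholesale, so it is only
	// safe at startup, before concurrent writers exist.
	if err := rec.LoadBlacklist(filepath.Join(ws, "blacklist_record")); err != nil {
		log.Fatal(err)
	}
}

Worth noting: peerAccountList has no snapshot of its own, so after a restart GetAccountByPeer stays empty until SavePeerAccount repopulates it (RefreshMiner does this hourly).
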
diff --git a/node/get_location.go b/node/get_location.go
index 32fae48..7eda3e8 100644
--- a/node/get_location.go
+++ b/node/get_location.go
@@ -102,9 +102,9 @@ func (n *Node) GetFileLocation(c *gin.Context) {
 }
 
 func (n *Node) getMinerAddr(account string) (peer.AddrInfo, string, error) {
-	addr, ok := n.GetPeerByAccount(account)
+	accountInfo, ok := n.GetPeerByAccount(account)
 	if ok {
-		return addr, "", nil
+		return accountInfo.Addrs, "", nil
 	}
 	puk, err := sutils.ParsingPublickey(account)
 	if err != nil {
@@ -115,8 +115,9 @@ func (n *Node) getMinerAddr(account string) (peer.AddrInfo, string, error) {
 		return peer.AddrInfo{}, "", err
 	}
 	peerid := base58.Encode([]byte(string(minerInfo.PeerId[:])))
-	addr, ok = n.GetPeer(peerid)
+	addr, ok := n.GetPeer(peerid)
 	if ok {
+		n.SavePeerAccount(account, peerid, string(minerInfo.State), minerInfo.IdleSpace.Uint64())
 		return addr, base58.Encode([]byte(string(minerInfo.PeerId[:]))), nil
 	}
 	return peer.AddrInfo{}, "", fmt.Errorf("not found peer: %s", peerid)
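getMinerAddr is now a cache-aside lookup: it tries the in-memory account index first, falls back to QueryMinerItems on chain, and writes the result back through SavePeerAccount so the next call is a cache hit. The shape of that pattern, reduced to a standalone sketch (names are illustrative, not from the codebase):

package main

import "fmt"

type resolver struct {
	cache map[string]string                    // account -> peer id
	chain func(account string) (string, error) // on-chain lookup
}

func (r *resolver) lookup(account string) (string, error) {
	if pid, ok := r.cache[account]; ok {
		return pid, nil // fast path: GetPeerByAccount
	}
	pid, err := r.chain(account) // slow path: QueryMinerItems
	if err != nil {
		return "", err
	}
	r.cache[account] = pid // write-back: SavePeerAccount
	return pid, nil
}

func main() {
	r := &resolver{
		cache: map[string]string{},
		chain: func(string) (string, error) { return "12D3KooW...", nil },
	}
	pid, _ := r.lookup("cX...")
	fmt.Println(pid)
}
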
diff --git a/node/tasks.go b/node/tasks.go
index ddbd740..4feb6b8 100644
--- a/node/tasks.go
+++ b/node/tasks.go
@@ -8,24 +8,45 @@
 package node
 
 import (
+	"context"
+	"errors"
 	"math"
 	"path/filepath"
 	"strconv"
 	"time"
 
 	schain "github.com/CESSProject/cess-go-sdk/chain"
-	sconfig "github.com/CESSProject/cess-go-sdk/config"
+	sutils "github.com/CESSProject/cess-go-sdk/utils"
 	"github.com/CESSProject/p2p-go/out"
+	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/mr-tron/base58"
 )
 
 func (n *Node) TaskMgt() {
 	var (
-		err             error
-		ch_trackFile    = make(chan bool, 1)
-		ch_refreshMiner = make(chan bool, 1)
+		err                 error
+		ch_trackFile        = make(chan bool, 1)
+		ch_refreshMiner     = make(chan bool, 1)
+		ch_refreshBlacklist = make(chan bool, 1)
 	)
 
+	err = n.LoadPeer(filepath.Join(n.Workspace(), "peer_record"))
+	if err != nil {
+		n.Log("err", "LoadPeer: "+err.Error())
+	}
+	err = n.LoadAccountPeer(filepath.Join(n.Workspace(), "account_record"))
+	if err != nil {
+		n.Log("err", "LoadAccountPeer: "+err.Error())
+	}
+	err = n.LoadBlacklist(filepath.Join(n.Workspace(), "blacklist_record"))
+	if err != nil {
+		n.Log("err", "LoadBlacklist: "+err.Error())
+	}
+	err = n.LoadWhitelist(filepath.Join(n.Workspace(), "whitelist_record"))
+	if err != nil {
+		n.Log("err", "LoadWhitelist: "+err.Error())
+	}
+
 	ch_trackFile <- true
@@ -34,12 +55,17 @@ func (n *Node) TaskMgt() {
 	task_block := time.NewTicker(time.Duration(time.Second * 27))
 	defer task_block.Stop()
 
 	task_Minute := time.NewTicker(time.Duration(time.Second * 59))
 	defer task_Minute.Stop()
 
+	task_10Minute := time.NewTicker(time.Duration(time.Second * 597))
+	defer task_10Minute.Stop()
+
 	task_Hour := time.NewTicker(time.Duration(time.Second * 3599))
 	defer task_Hour.Stop()
 
 	go n.RefreshMiner(ch_refreshMiner)
+	go n.RefreshBlacklist(ch_refreshBlacklist)
+	go n.TrackerV2()
 
-	count := 0
+	//count := 0
 	chainState := true
 	for {
 		select {
@@ -54,51 +80,85 @@ func (n *Node) TaskMgt() {
 					n.Log("info", "rpc reconnect suc: "+n.GetCurrentRpcAddr())
 				}
 			}
-			count++
-			if count >= 4320 { //blacklist released every 12 hours
-				count = 0
-				n.ClearBlackList()
-			}
+
+			// count++
+			// if count >= 4320 { //blacklist released every 12 hours
+			// 	count = 0
+			// 	n.ClearBlackList()
+			// }
 		case <-task_Minute.C:
-			if len(ch_trackFile) > 0 {
-				<-ch_trackFile
-				go n.Tracker(ch_trackFile)
-			}
 			err := n.RefreshSelf()
 			if err != nil {
 				n.Log("err", err.Error())
 			}
+		case <-task_10Minute.C:
+			go n.BackupPeer(filepath.Join(n.Workspace(), "peer_record"))
+			go n.BackupAccountPeer(filepath.Join(n.Workspace(), "account_record"))
+			go n.BackupBlacklist(filepath.Join(n.Workspace(), "blacklist_record"))
+			go n.BackupWhitelist(filepath.Join(n.Workspace(), "whitelist_record"))
+
+			if len(ch_refreshBlacklist) > 0 {
+				<-ch_refreshBlacklist
+				n.RefreshBlacklist(ch_refreshBlacklist)
+			}
+
+			n.Log("info", "backup peer")
 		case <-task_Hour.C:
 			if len(ch_refreshMiner) > 0 {
 				<-ch_refreshMiner
 				go n.RefreshMiner(ch_refreshMiner)
 			}
+		}
+	}
+}
 
-			go n.BackupPeer(filepath.Join(n.Workspace(), "peer_record"))
+func (n *Node) RefreshBlacklist(ch chan<- bool) {
+	defer func() { ch <- true }()
+	allpeers := n.GetBlacklist()
+	for _, v := range allpeers {
+		if n.ConnectPeer(v.Addrs) {
+			n.RemoveFromBlacklist(v.Addrs.ID.String())
+			n.AddToWhitelist(v.Addrs.ID.String(), "")
 		}
 	}
 }
 
+func (n *Node) ConnectPeer(addr peer.AddrInfo) bool {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+	err := n.Connect(ctx, addr)
+	cancel()
+	return err == nil
+}
+
 func (n *Node) RefreshMiner(ch chan<- bool) {
 	defer func() { ch <- true }()
 
+	peerid := ""
 	sminerList, err := n.QueryAllMiner(-1)
 	if err == nil {
 		for i := 0; i < len(sminerList); i++ {
-			minerinfo, err := n.QueryMinerItems(sminerList[i][:], -1)
+			acc, err := sutils.EncodePublicKeyAsCessAccount(sminerList[i][:])
 			if err != nil {
+				n.Log("err", err.Error())
 				continue
 			}
-			if minerinfo.IdleSpace.Uint64() >= sconfig.FragmentSize {
-				peerid := base58.Encode([]byte(string(minerinfo.PeerId[:])))
-				n.SavePeerAccount(n.GetSignatureAcc(), peerid)
-				// addrinfo, ok := n.GetPeer(peerid)
-				// if ok {
-				// 	n.FlushPeerNodes(scheduler.DEFAULT_TIMEOUT, addrinfo)
-				// }
+			minerinfo, err := n.QueryMinerItems(sminerList[i][:], -1)
+			if err != nil {
+				if !errors.Is(err, schain.ERR_RPC_EMPTY_VALUE) {
+					n.Log("err", err.Error())
+				} else {
+					n.DeletePeerByAccount(acc)
+				}
+				continue
 			}
+			peerid = base58.Encode([]byte(string(minerinfo.PeerId[:])))
+			n.SavePeerAccount(acc, peerid, string(minerinfo.State), minerinfo.IdleSpace.Uint64())
 		}
 	}
 }
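The scheduler changes above: the old per-minute Tracker dispatch and the 12-hour blacklist purge are commented out, TrackerV2 runs as its own goroutine, and the new ~10-minute ticker both snapshots the four record files and retries blacklisted peers (RefreshBlacklist whitelists any blacklisted peer that accepts a 10-second dial). The capacity-1 channel idiom used for RefreshMiner and RefreshBlacklist, shown in isolation:

package main

import (
	"fmt"
	"time"
)

func refresh(ch chan<- bool) {
	defer func() { ch <- true }() // hand the token back when done
	time.Sleep(time.Second)       // stands in for the real work
	fmt.Println("refresh done")
}

func main() {
	// A buffered token channel guarantees at most one refresh
	// goroutine at a time, without a mutex.
	ch := make(chan bool, 1)
	go refresh(ch)

	tick := time.NewTicker(2 * time.Second)
	defer tick.Stop()
	for i := 0; i < 3; i++ {
		<-tick.C
		if len(ch) > 0 { // previous run finished?
			<-ch
			go refresh(ch)
		}
	}
}
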
diff --git a/node/tracker.go b/node/tracker.go
index 2e7e239..e8e9f2f 100644
--- a/node/tracker.go
+++ b/node/tracker.go
@@ -238,7 +238,7 @@ func (n *Node) storageData(record TrackerInfo, completeList []chain.CompleteInfo
 			value.Complete = true
 			value.Miner, _ = sutils.EncodePublicKeyAsCessAccount(v.Miner[:])
 			if p, ok := n.GetPeerByAccount(value.Miner); ok {
-				value.Peerid = p.ID.String()
+				value.Peerid = p.Addrs.ID.String()
 			}
 			dataGroup[uint8(v.Index)] = value
 		}
@@ -258,7 +258,7 @@ func (n *Node) storageData(record TrackerInfo, completeList []chain.CompleteInfo
 		return n.rangeStorage(record, dataGroup)
 	}
 
-	priorityMiners := n.Config.Shunt.Peerid
+	priorityMiners := n.Config.Shunt.Account
 	if len(priorityMiners) > 0 {
 		n.highPriorityStorage(record, dataGroup)
 	}
@@ -294,31 +294,31 @@ func (n *Node) shuntAllStorage(record TrackerInfo, dataGroup map[uint8]datagroup
 			continue
 		}
-		n.Peerstore().AddAddrs(addr.ID, addr.Addrs, time.Minute)
-		n.Logtrack("info", fmt.Sprintf("[%s] will transfer to the miner: %s", record.Fid, addr.ID.String()))
+		n.Peerstore().AddAddrs(addr.Addrs.ID, addr.Addrs.Addrs, time.Minute)
+		n.Logtrack("info", fmt.Sprintf("[%s] will transfer to the miner: %s", record.Fid, addr.Addrs.ID.String()))
 		for j := 0; j < len(v.File); j++ {
 			ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
 			defer cancel()
-			err = n.WriteDataAction(ctx, addr.ID, v.File[j], record.Fid, filepath.Base(v.File[j]))
+			err = n.WriteDataAction(ctx, addr.Addrs.ID, v.File[j], record.Fid, filepath.Base(v.File[j]))
 			if err != nil {
 				failed = true
-				n.Logtrack("err", fmt.Sprintf("[%s] transfer to %s failed: %v", record.Fid, addr.ID.String(), err))
-				n.Feedback(addr.ID.String(), false)
+				n.Logtrack("err", fmt.Sprintf("[%s] transfer to %s failed: %v", record.Fid, addr.Addrs.ID.String(), err))
+				n.Feedback(addr.Addrs.ID.String(), false)
 				break
 			}
-			n.Logtrack("info", fmt.Sprintf("[%s] The %dth fragment of the %dth batch is transferred to %s", record.Fid, j, index, addr.ID.String()))
+			n.Logtrack("info", fmt.Sprintf("[%s] The %dth fragment of the %dth batch is transferred to %s", record.Fid, j, index, addr.Addrs.ID.String()))
 			failed = false
 		}
-		n.Peerstore().ClearAddrs(addr.ID)
+		n.Peerstore().ClearAddrs(addr.Addrs.ID)
 		if !failed {
 			var value datagroup
 			value = dataGroup[index]
 			value.Complete = true
 			value.Miner = acconut
-			value.Peerid = addr.ID.String()
+			value.Peerid = addr.Addrs.ID.String()
 			dataGroup[index] = value
 			//n.Feedback(addr.ID.String(), true)
-			n.Logtrack("info", fmt.Sprintf("[%s] %dth batch of all fragments is transferred to %s", record.Fid, index, addr.ID.String()))
+			n.Logtrack("info", fmt.Sprintf("[%s] %dth batch of all fragments is transferred to %s", record.Fid, index, addr.Addrs.ID.String()))
 			break
 		}
 		allcompleted = false
@@ -349,31 +349,31 @@ func (n *Node) shuntPartStorage(record TrackerInfo, dataGroup map[uint8]datagrou
 		if v.Complete {
 			continue
 		}
-		n.Peerstore().AddAddrs(addr.ID, addr.Addrs, time.Minute)
+		n.Peerstore().AddAddrs(addr.Addrs.ID, addr.Addrs.Addrs, time.Minute)
 		for j := 0; j < len(v.File); j++ {
-			n.Logtrack("info", fmt.Sprintf("[%s] shunt part: will transfer the %dth(%d-%d) batch of fragments to the miner: %s", record.Fid, index, len(v.File), j, addr.ID.String()))
+			n.Logtrack("info", fmt.Sprintf("[%s] shunt part: will transfer the %dth(%d-%d) batch of fragments to the miner: %s", record.Fid, index, len(v.File), j, addr.Addrs.ID.String()))
 			ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
 			defer cancel()
-			err = n.WriteDataAction(ctx, addr.ID, v.File[j], record.Fid, filepath.Base(v.File[j]))
+			err = n.WriteDataAction(ctx, addr.Addrs.ID, v.File[j], record.Fid, filepath.Base(v.File[j]))
 			if err != nil {
 				failed = true
 				n.Logtrack("err", fmt.Sprintf("[%s] shunt part: transfer failed: %v", record.Fid, err))
-				n.Feedback(addr.ID.String(), false)
+				n.Feedback(addr.Addrs.ID.String(), false)
 				break
 			}
 			n.Logtrack("info", fmt.Sprintf("[%s] shunt part: transfer successful", record.Fid))
 			failed = false
 		}
-		n.Peerstore().ClearAddrs(addr.ID)
+		n.Peerstore().ClearAddrs(addr.Addrs.ID)
 		if !failed {
 			var value datagroup
 			value = dataGroup[index]
 			value.Complete = true
 			value.Miner = acconut
-			value.Peerid = addr.ID.String()
+			value.Peerid = addr.Addrs.ID.String()
 			dataGroup[index] = value
 			//n.Feedback(addr.ID.String(), true)
-			n.Logtrack("info", fmt.Sprintf("[%s] shunt part: %dth batch fragments all transferred to: %s %s", record.Fid, index, acconut, addr.ID.String()))
+			n.Logtrack("info", fmt.Sprintf("[%s] shunt part: %dth batch fragments all transferred to: %s %s", record.Fid, index, acconut, addr.Addrs.ID.String()))
 			break
 		}
 		allcompleted = false
fmt.Sprintf("[%s] shunt part: %dth batch fragments all transferred to: %s %s", record.Fid, index, acconut, addr.ID.String())) + n.Logtrack("info", fmt.Sprintf("[%s] shunt part: %dth batch fragments all transferred to: %s %s", record.Fid, index, acconut, addr.Addrs.ID.String())) break } allcompleted = false @@ -450,7 +450,7 @@ func (n *Node) rangeStorage(record TrackerInfo, dataGroup map[uint8]datagroup) e func (n *Node) highPriorityStorage(record TrackerInfo, dataGroup map[uint8]datagroup) error { var err error - priorityPeers := n.Config.Shunt.Peerid + priorityPeers := n.Config.Shunt.Account if len(priorityPeers) <= 0 { return nil } @@ -464,41 +464,41 @@ func (n *Node) highPriorityStorage(record TrackerInfo, dataGroup map[uint8]datag } failed := true n.Logtrack("info", fmt.Sprintf("[%s] will transfer the %dth(%d) batch of fragments to high priority miners", record.Fid, index, len(v.File))) - for _, peerid := range priorityPeers { - if _, ok := sucPeer[peerid]; ok { + for _, acc := range priorityPeers { + addrs, ok := n.GetPeerByAccount(acc) + if !ok { + n.Logtrack("info", fmt.Sprintf("[%s] not found this peer: %s", record.Fid, acc)) continue } - addrs, ok := n.GetPeer(peerid) - if !ok { - n.Logtrack("info", fmt.Sprintf("[%s] not found this peer: %s", record.Fid, peerid)) + if _, ok := sucPeer[addrs.Addrs.ID.String()]; ok { continue } - n.Peerstore().AddAddrs(addrs.ID, addrs.Addrs, time.Minute) - n.Logtrack("info", fmt.Sprintf("[%s] will transfer to the miner: %s", record.Fid, peerid)) + n.Peerstore().AddAddrs(addrs.Addrs.ID, addrs.Addrs.Addrs, time.Minute) + n.Logtrack("info", fmt.Sprintf("[%s] will transfer to the miner: %s", record.Fid, addrs.Addrs.ID.String())) for j := 0; j < len(v.File); j++ { n.Logtrack("info", fmt.Sprintf("[%s] will transfer fragment: %s", record.Fid, v.File[j])) ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - err = n.WriteDataAction(ctx, addrs.ID, v.File[j], record.Fid, filepath.Base(v.File[j])) + err = n.WriteDataAction(ctx, addrs.Addrs.ID, v.File[j], record.Fid, filepath.Base(v.File[j])) if err != nil { failed = true - n.Logtrack("err", fmt.Sprintf("[%s] transfer to %s failed: %v", record.Fid, peerid, err)) - n.Feedback(peerid, false) + n.Logtrack("err", fmt.Sprintf("[%s] transfer to %s failed: %v", record.Fid, addrs.Addrs.ID.String(), err)) + n.Feedback(addrs.Addrs.ID.String(), false) break } - n.Logtrack("info", fmt.Sprintf("[%s] The %dth fragment of the %dth batch is transferred to %s", record.Fid, j, index, peerid)) + n.Logtrack("info", fmt.Sprintf("[%s] The %dth fragment of the %dth batch is transferred to %s", record.Fid, j, index, addrs.Addrs.ID.String())) failed = false } - n.Peerstore().ClearAddrs(addrs.ID) + n.Peerstore().ClearAddrs(addrs.Addrs.ID) if !failed { var value datagroup value = dataGroup[index] value.Complete = true - value.Peerid = peerid + value.Peerid = addrs.Addrs.ID.String() dataGroup[index] = value //n.Feedback(peerid, true) - n.Logtrack("info", fmt.Sprintf("[%s] %dth batch of all fragments is transferred to %s", record.Fid, index, peerid)) + n.Logtrack("info", fmt.Sprintf("[%s] %dth batch of all fragments is transferred to %s", record.Fid, index, addrs.Addrs.ID.String())) break } } diff --git a/node/trackerv2.go b/node/trackerv2.go new file mode 100644 index 0000000..f177cd9 --- /dev/null +++ b/node/trackerv2.go @@ -0,0 +1,367 @@ +/* + Copyright (C) CESS. All rights reserved. + Copyright (C) Cumulus Encrypted Storage System. All rights reserved. 
diff --git a/node/trackerv2.go b/node/trackerv2.go
new file mode 100644
index 0000000..f177cd9
--- /dev/null
+++ b/node/trackerv2.go
@@ -0,0 +1,367 @@
+/*
+	Copyright (C) CESS. All rights reserved.
+	Copyright (C) Cumulus Encrypted Storage System. All rights reserved.
+
+	SPDX-License-Identifier: Apache-2.0
+*/
+
+package node
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"os"
+	"path/filepath"
+	"time"
+
+	"github.com/CESSProject/DeOSS/common/utils"
+	schain "github.com/CESSProject/cess-go-sdk/chain"
+	sconfig "github.com/CESSProject/cess-go-sdk/config"
+	sutils "github.com/CESSProject/cess-go-sdk/utils"
+	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/pkg/errors"
+)
+
+type StorageDataType struct {
+	Fid      string
+	Complete []string
+	Data     [][]string
+}
+
+// tracker
+func (n *Node) TrackerV2() {
+	n.Logtrack("info", ">>>>> start trackerv2 <<<<<")
+	var tNow time.Time
+	for {
+		tNow = time.Now()
+		n.processTrackFiles()
+		if time.Since(tNow).Minutes() < 3.0 {
+			time.Sleep(time.Minute * 3)
+		}
+	}
+}
+
+func (n *Node) processTrackFiles() {
+	defer func() {
+		if err := recover(); err != nil {
+			n.Pnc(utils.RecoverError(err))
+		}
+	}()
+
+	var err error
+	var count uint8
+	var trackFiles []string
+	trackFiles, err = n.ListTrackFiles()
+	if err != nil {
+		n.Logtrack("err", err.Error())
+		return
+	}
+	if len(trackFiles) <= 0 {
+		n.Logtrack("info", "no track files")
+		return
+	}
+
+	n.Logtrack("info", fmt.Sprintf("number of track files: %d", len(trackFiles)))
+
+	count = 0
+	fid := ""
+	var dealFiles = make([]StorageDataType, 0)
+	for i := 0; i < len(trackFiles); i++ {
+		fid = filepath.Base(trackFiles[i])
+		storageDataType, ok, err := n.checkFileState(fid)
+		if err != nil {
+			n.Logtrack("err", fmt.Sprintf("checkFileState: %v", err))
+			continue
+		}
+		if ok {
+			n.Logtrack("info", fmt.Sprintf(" %s storage suc", fid))
+			continue
+		}
+
+		dealFiles = append(dealFiles, storageDataType)
+		count++
+		if count >= 10 {
+			n.Logtrack("info", fmt.Sprintf(" will storage %d files: %v", len(dealFiles), dealFiles))
+			err = n.storageFiles(dealFiles)
+			if err != nil {
+				n.Logtrack("err", err.Error())
+				return
+			}
+			count = 0
+			dealFiles = make([]StorageDataType, 0)
+		}
+	}
+	if len(dealFiles) > 0 {
+		n.Logtrack("info", fmt.Sprintf(" will storage %d files: %v", len(dealFiles), dealFiles))
+		err = n.storageFiles(dealFiles)
+		if err != nil {
+			n.Logtrack("err", err.Error())
+		}
+	}
+}
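processTrackFiles flushes to storageFiles every 10 pending files and once more for the tail; checkFileState, next, decides whether a file is already on chain, needs its fragments reprocessed, or needs a fresh storage order. The batching idiom on its own (hypothetical names):

package main

import "fmt"

// flushEvery mirrors the dealFiles accumulation above: collect
// up to n items, flush, reset, then flush the remainder.
func flushEvery(items []string, n int, flush func([]string)) {
	batch := make([]string, 0, n)
	for _, it := range items {
		batch = append(batch, it)
		if len(batch) >= n {
			flush(batch)
			batch = make([]string, 0, n)
		}
	}
	if len(batch) > 0 {
		flush(batch)
	}
}

func main() {
	flushEvery([]string{"fid1", "fid2", "fid3"}, 2, func(b []string) {
		fmt.Println("storing", b)
	})
}
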
+func (n *Node) checkFileState(fid string) (StorageDataType, bool, error) {
+	recordFile, err := n.ParseTrackFile(fid)
+	if err != nil {
+		return StorageDataType{}, false, fmt.Errorf("[ParseTrackFromFile(%s)] %v", fid, err)
+	}
+
+	_, err = n.QueryFile(fid, -1)
+	if err != nil {
+		if err.Error() != schain.ERR_Empty {
+			return StorageDataType{}, false, err
+		}
+	} else {
+		for i := 0; i < len(recordFile.Segment); i++ {
+			for j := 0; j < len(recordFile.Segment[i].FragmentHash); j++ {
+				os.Remove(filepath.Join(recordFile.CacheDir, recordFile.Segment[i].FragmentHash[j]))
+			}
+		}
+		n.DeleteTrackFile(fid)
+		return StorageDataType{}, true, nil
+	}
+
+	flag := false
+	if recordFile.Segment == nil {
+		flag = true
+	}
+
+	for i := 0; i < len(recordFile.Segment); i++ {
+		for j := 0; j < len(recordFile.Segment[i].FragmentHash); j++ {
+			_, err = os.Stat(recordFile.Segment[i].FragmentHash[j])
+			if err != nil {
+				flag = true
+				break
+			}
+		}
+		if flag {
+			break
+		}
+	}
+
+	if flag {
+		segment, hash, err := n.reFullProcessing(fid, recordFile.Cipher, recordFile.CacheDir)
+		if err != nil {
+			return StorageDataType{}, false, errors.Wrapf(err, "reFullProcessing")
+		}
+		if recordFile.Fid != hash {
+			return StorageDataType{}, false, fmt.Errorf("the fid after reprocessing is inconsistent [%s != %s]", recordFile.Fid, hash)
+		}
+		recordFile.Segment = segment
+	}
+
+	var storageDataType = StorageDataType{
+		Fid:      fid,
+		Complete: make([]string, 0),
+		Data:     make([][]string, 0),
+	}
+
+	dealmap, err := n.QueryDealMap(fid, -1)
+	if err != nil {
+		if err.Error() != schain.ERR_Empty {
+			return StorageDataType{}, false, err
+		}
+	} else {
+		for index := 0; index < (sconfig.DataShards + sconfig.ParShards); index++ {
+			acc, ok := IsComplete(index+1, dealmap.CompleteList)
+			if ok {
+				storageDataType.Complete = append(storageDataType.Complete, acc)
+				continue
+			}
+			var value = make([]string, 0)
+			for i := 0; i < len(recordFile.Segment); i++ {
+				value = append(value, string(recordFile.Segment[i].FragmentHash[index]))
+			}
+			storageDataType.Data = append(storageDataType.Data, value)
+		}
+		return storageDataType, false, nil
+	}
+
+	recordFile.PutFlag = false
+	b, err := json.Marshal(&recordFile)
+	if err != nil {
+		return StorageDataType{}, false, errors.Wrapf(err, "[%s] [json.Marshal]", fid)
+	}
+	err = n.WriteTrackFile(fid, b)
+	if err != nil {
+		return StorageDataType{}, false, errors.Wrapf(err, "[%s] [WriteTrackFile]", fid)
+	}
+
+	// verify the space is authorized
+	authAccs, err := n.QueryAuthorityList(recordFile.Owner, -1)
+	if err != nil {
+		if err.Error() != schain.ERR_Empty {
+			return StorageDataType{}, false, err
+		}
+	}
+
+	flag = false
+	for _, v := range authAccs {
+		if sutils.CompareSlice(n.GetSignatureAccPulickey(), v[:]) {
+			flag = true
+			break
+		}
+	}
+
+	if !flag {
+		// os.RemoveAll(recordFile.CacheDir)
+		// n.DeleteTrackFile(roothash)
+		user, _ := sutils.EncodePublicKeyAsCessAccount(recordFile.Owner)
+		return StorageDataType{}, true, errors.Errorf("[%s] user [%s] has revoked authorization", fid, user)
+	}
+
+	txhash, err := n.PlaceStorageOrder(
+		fid,
+		recordFile.FileName,
+		recordFile.BucketName,
+		recordFile.TerritoryName,
+		recordFile.Segment,
+		recordFile.Owner,
+		recordFile.FileSize,
+	)
+	if err != nil {
+		return StorageDataType{}, false, err
+	}
+	n.Logtrack("info", fmt.Sprintf("[%s] PlaceStorageOrder suc: %s", fid, txhash))
+
+	for index := 0; index < (sconfig.DataShards + sconfig.ParShards); index++ {
+		var value = make([]string, 0)
+		for i := 0; i < len(recordFile.Segment); i++ {
+			value = append(value, string(recordFile.Segment[i].FragmentHash[index]))
+		}
+		storageDataType.Data = append(storageDataType.Data, value)
+	}
+	return storageDataType, false, nil
+}
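The Data matrix checkFileState builds is column-major: it holds one slice per still-incomplete shard index, each collecting that fragment hash from every segment, which is why storagedata later peels off Data[0] for each miner. The construction in isolation (segments[i][j] meaning segment i, shard j; names hypothetical):

package main

import "fmt"

func buildColumns(segments [][]string, complete map[int]bool) [][]string {
	if len(segments) == 0 {
		return nil
	}
	var data [][]string
	for j := 0; j < len(segments[0]); j++ { // one column per shard
		if complete[j] {
			continue // already stored by some miner
		}
		col := make([]string, 0, len(segments))
		for i := range segments {
			col = append(col, segments[i][j])
		}
		data = append(data, col)
	}
	return data
}

func main() {
	segs := [][]string{{"s0f0", "s0f1"}, {"s1f0", "s1f1"}}
	fmt.Println(buildColumns(segs, map[int]bool{1: true})) // [[s0f0 s1f0]]
}
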
+func (n *Node) storageFiles(tracks []StorageDataType) error {
+	allpeers := n.GetAllWhitelist()
+	allpeers = append(allpeers, n.GetAllPeerId()...)
+
+	length := len(allpeers)
+	for i := 0; i < length; i++ {
+		n.Logtrack("info", fmt.Sprintf(" use peer: %s", allpeers[i]))
+		if n.IsInBlacklist(allpeers[i]) {
+			n.Logtrack("info", " peer in blacklist")
+			continue
+		}
+		err := n.storageToPeer(allpeers[i], tracks)
+		if err != nil {
+			n.Logtrack("err", err.Error())
+		}
+	}
+	return nil
+}
+
+func (n *Node) storageToPeer(peerid string, tracks []StorageDataType) error {
+	addr, ok := n.GetPeer(peerid)
+	if !ok {
+		n.Logtrack("err", " peer not found")
+		return fmt.Errorf("%s not found addr", peerid)
+	}
+
+	n.Peerstore().AddAddrs(addr.ID, addr.Addrs, time.Hour)
+	err := n.storagedata(addr.ID, tracks)
+	n.Peerstore().ClearAddrs(addr.ID)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (n *Node) storagedata(peerid peer.ID, tracks []StorageDataType) error {
+	account, _ := n.GetAccountByPeer(peerid.String())
+
+	accountInfo, ok := n.GetPeerByAccount(account)
+	if !ok {
+		n.Logtrack("err", " peer is not a miner")
+		return nil
+	}
+	if accountInfo.State != schain.MINER_STATE_POSITIVE {
+		n.Logtrack("err", fmt.Sprintf(" peer status is not %s", schain.MINER_STATE_POSITIVE))
+		return fmt.Errorf(" %s status is not %s", account, schain.MINER_STATE_POSITIVE)
+	}
+	if accountInfo.IdleSpace < sconfig.FragmentSize*(sconfig.ParShards+sconfig.DataShards) {
+		n.Logtrack("err", " peer space < 96M")
+		return fmt.Errorf(" %s space < 96M", account)
+	}
+	length := len(tracks)
+	for i := 0; i < length; i++ {
+		n.Logtrack("info", fmt.Sprintf(" peer will storage file %s", tracks[i].Fid))
+		if IsStoraged(account, tracks[i].Complete) {
+			n.Logtrack("info", " peer already storaged this file")
+			continue
+		}
+		err := n.storageBatchFragment(peerid, account, tracks[i])
+		if err != nil {
+			return err
+		}
+		if len(tracks[i].Data) > 1 {
+			tracks[i].Data = tracks[i].Data[1:]
+		} else {
+			tracks[i].Data = make([][]string, 0)
+		}
+		accountInfo.IdleSpace -= sconfig.FragmentSize * (sconfig.ParShards + sconfig.DataShards)
+		if accountInfo.IdleSpace < sconfig.FragmentSize*(sconfig.ParShards+sconfig.DataShards) {
+			n.Logtrack("info", " peer space < 96M, stop storage")
+			return nil
+		}
+	}
+	n.Logtrack("info", " all files transferred")
+	return nil
+}
+
+func (n *Node) storageBatchFragment(peerid peer.ID, account string, tracks StorageDataType) error {
+	var err error
+	if len(tracks.Data) <= 0 {
+		n.Logtrack("info", " peer transferred this batch of fragments")
+		return nil
+	}
+	if len(tracks.Data[0]) <= 0 {
+		n.Logtrack("info", " peer transferred all fragments of the file")
+		return nil
+	}
+	for j := 0; j < len(tracks.Data[0]); j++ {
+		err = n.storageFragment(peerid, tracks.Fid, filepath.Base(tracks.Data[0][j]), tracks.Data[0][j])
+		if err != nil {
+			n.Logtrack("info", fmt.Sprintf(" peer transfer %d fragment failed: %v", j, err))
+			//if strings.Contains(err.Error(), "refused") || strings.Contains(err.Error(), "timeout") {
+			n.AddToBlacklist(peerid.String(), account, err.Error())
+			//}
+			return err
+		}
+		n.Logtrack("info", fmt.Sprintf(" peer transfer %d fragment suc", j))
+	}
+	n.Logtrack("info", " peer transfer all fragment suc")
+	n.AddToWhitelist(peerid.String(), account)
+	return nil
+}
+
+func (n *Node) storageFragment(peerid peer.ID, fid, fragmentHash, fragmentPath string) error {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2)
+	defer cancel()
+	err := n.WriteDataAction(ctx, peerid, fragmentPath, fid, fragmentHash)
+	return err
+}
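This block is the miner filter the PR title refers to: a candidate must resolve to a known miner account, be in MINER_STATE_POSITIVE, hold at least FragmentSize x (DataShards + ParShards) of idle space (the "96M" in the logs), and not be blacklisted; a failed transfer blacklists it, a fully transferred batch whitelists it. The same checks as a standalone predicate, sketched over this patch's AccountInfo; the threshold is passed in, and "positive" stands in for schain.MINER_STATE_POSITIVE:

package main

import (
	"fmt"

	"github.com/CESSProject/DeOSS/common/peerrecord"
)

// eligible reproduces the checks at the top of storagedata.
func eligible(info peerrecord.AccountInfo, blacklisted bool, need uint64) error {
	if blacklisted {
		return fmt.Errorf("%s is blacklisted", info.Account)
	}
	if info.State != "positive" {
		return fmt.Errorf("%s state is %q, want positive", info.Account, info.State)
	}
	if info.IdleSpace < need {
		return fmt.Errorf("%s idle space %d < %d", info.Account, info.IdleSpace, need)
	}
	return nil
}

func main() {
	info := peerrecord.AccountInfo{Account: "cX...", State: "positive", IdleSpace: 100 << 20}
	fmt.Println(eligible(info, false, 96<<20)) // <nil>
}
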
+func IsStoraged(account string, complete []string) bool {
+	length := len(complete)
+	for i := 0; i < length; i++ {
+		if account == complete[i] {
+			return true
+		}
+	}
+	return false
+}
+
+func IsComplete(index int, completeInfo []schain.CompleteInfo) (string, bool) {
+	length := len(completeInfo)
+	for i := 0; i < length; i++ {
+		if int(completeInfo[i].Index) == index {
+			acc, _ := sutils.EncodePublicKeyAsCessAccount(completeInfo[i].Miner[:])
+			return acc, true
+		}
+	}
+	return "", false
+}
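A minimal test for the IsStoraged helper, runnable in package node (inputs are made up):

package node

import "testing"

func TestIsStoraged(t *testing.T) {
	complete := []string{"cXaaa", "cXbbb"}
	if !IsStoraged("cXaaa", complete) {
		t.Fatal("expected cXaaa to be reported as stored")
	}
	if IsStoraged("cXccc", complete) {
		t.Fatal("cXccc was never recorded as complete")
	}
}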