From ebe2128a9d4847b297d7fae2252c0881f6fb6d06 Mon Sep 17 00:00:00 2001 From: AstaFrode Date: Tue, 23 Aug 2022 11:00:50 +0800 Subject: [PATCH] Added blacklist and optimized connection scheduling logic. --- cmd/cmd.go | 39 ++++++++--- configs/config.go | 6 ++ internal/chain/chainstate.go | 40 ++++++++++++ internal/chain/transaction.go | 98 +++++++++++++++++++--------- internal/pattern/blacklist.go | 42 ++++++++++++ internal/rpc/main.go | 11 ++-- internal/task/miner.go | 1 + internal/task/space.go | 119 ++++++++++++++++++++++++++-------- 8 files changed, 281 insertions(+), 75 deletions(-) create mode 100644 internal/pattern/blacklist.go diff --git a/cmd/cmd.go b/cmd/cmd.go index bba0ae7..d817496 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -460,7 +460,7 @@ func Command_Run_Runfunc(cmd *cobra.Command, args []string) { } // start-up - task.Run() + go task.Run() rpc.Rpc_Main() } @@ -629,9 +629,19 @@ func Command_UpdateAddress_Runfunc(cmd *cobra.Command, args []string) { //Parse command arguments and configuration file parseFlags(cmd) - txhash, _, err := chain.UpdateAddress(configs.C.SignatureAcc, base58.Encode([]byte(os.Args[2]))) - if txhash == "" { - log.Printf("\x1b[%dm[err]\x1b[0m Update failed, Please try again later.%v\n", 41, err) + txhash, err := chain.UpdateAddress(configs.C.SignatureAcc, base58.Encode([]byte(os.Args[2]))) + if err != nil { + if err.Error() == chain.ERR_Empty { + log.Println("[err] Please check your wallet balance.") + } else { + if txhash != "" { + msg := configs.HELP_common + fmt.Sprintf(" %v\n", txhash) + msg += configs.HELP_UpdateAddress + log.Printf("[pending] %v\n", msg) + } else { + log.Printf("[err] %v.\n", err) + } + } os.Exit(1) } log.Printf("\x1b[%dm[ok]\x1b[0m success\n", 42) @@ -651,9 +661,19 @@ func Command_UpdateIncome_Runfunc(cmd *cobra.Command, args []string) { } //Parse command arguments and configuration file parseFlags(cmd) - txhash, _, err := chain.UpdateIncome(configs.C.SignatureAcc, types.NewAccountID(pubkey)) - if txhash == "" { - log.Printf("\x1b[%dm[err]\x1b[0m Update failed, Please try again later.%v\n", 41, err) + txhash, err := chain.UpdateIncome(configs.C.SignatureAcc, types.NewAccountID(pubkey)) + if err != nil { + if err.Error() == chain.ERR_Empty { + log.Println("[err] Please check your wallet balance.") + } else { + if txhash != "" { + msg := configs.HELP_common + fmt.Sprintf(" %v\n", txhash) + msg += configs.HELP_UpdataBeneficiary + log.Printf("[pending] %v\n", msg) + } else { + log.Printf("[err] %v.\n", err) + } + } os.Exit(1) } log.Printf("\x1b[%dm[ok]\x1b[0m success\n", 42) @@ -773,9 +793,8 @@ func parseProfile() { configs.Shared_g = data.Shared_g configs.Shared_params = data.Shared_params configs.Spk = data.Spk - pattern.SetMinerAcc(acc) - pattern.SetMinerSignAddr(configs.C.IncomeAcc) } - + pattern.SetMinerAcc(acc) + pattern.SetMinerSignAddr(configs.C.IncomeAcc) configs.BaseDir = filepath.Join(configs.C.MountedPath, addr, configs.BaseDir) } diff --git a/configs/config.go b/configs/config.go index dd3037b..c9349d3 100644 --- a/configs/config.go +++ b/configs/config.go @@ -41,6 +41,12 @@ const ( HELP_register = ` 3.Check the Sminer_Registered transaction event result in the block hash above: If system.ExtrinsicFailed is prompted, it means failure; If system.ExtrinsicSuccess is prompted, it means success;` + HELP_UpdateAddress = ` 3.Check the Sminer_UpdataIp transaction event result in the block hash above: + If system.ExtrinsicFailed is prompted, it means failure; + If system.ExtrinsicSuccess is prompted, it means success;` + HELP_UpdataBeneficiary = ` 3.Check the Sminer_UpdataBeneficiary transaction event result in the block hash above: + If system.ExtrinsicFailed is prompted, it means failure; + If system.ExtrinsicSuccess is prompted, it means success;` ) // Miner info diff --git a/internal/chain/chainstate.go b/internal/chain/chainstate.go index 24738b6..31f01d0 100644 --- a/internal/chain/chainstate.go +++ b/internal/chain/chainstate.go @@ -234,3 +234,43 @@ func GetBlockHeight(api *gsrpc.SubstrateAPI) (types.U32, error) { } return types.U32(block.Block.Header.Number), nil } + +func GetAccountInfo(puk []byte) (types.AccountInfo, error) { + defer func() { + if err := recover(); err != nil { + Pnc.Sugar().Errorf("%v", tools.RecoverError(err)) + } + }() + + var data types.AccountInfo + + api, err := GetRpcClient_Safe(configs.C.RpcAddr) + defer Free() + if err != nil { + return data, errors.Wrap(err, "[GetRpcClient_Safe]") + } + + meta, err := GetMetadata(api) + if err != nil { + return data, errors.Wrap(err, "[GetMetadata]") + } + + b, err := types.EncodeToBytes(types.NewAccountID(puk)) + if err != nil { + return data, errors.Wrap(err, "[EncodeToBytes]") + } + + key, err := types.CreateStorageKey(meta, "System", "Account", b) + if err != nil { + return data, errors.Wrap(err, "[CreateStorageKey]") + } + + ok, err := api.RPC.State.GetStorageLatest(key, &data) + if err != nil { + return data, errors.Wrap(err, "[GetStorageLatest]") + } + if !ok { + return data, errors.New(ERR_Empty) + } + return data, nil +} diff --git a/internal/chain/transaction.go b/internal/chain/transaction.go index c227fe8..6803e4c 100644 --- a/internal/chain/transaction.go +++ b/internal/chain/transaction.go @@ -763,7 +763,7 @@ func ClearFiller(api *gsrpc.SubstrateAPI, signaturePrk string) (int, error) { } } -func UpdateAddress(transactionPrK, addr string) (string, int, error) { +func UpdateAddress(transactionPrK, addr string) (string, error) { var ( err error accountInfo types.AccountInfo @@ -777,50 +777,50 @@ func UpdateAddress(transactionPrK, addr string) (string, int, error) { api, err := NewRpcClient(configs.C.RpcAddr) if err != nil { - return "", configs.Code_500, errors.Wrap(err, "NewRpcClient err") + return "", errors.Wrap(err, "NewRpcClient err") } keyring, err := signature.KeyringPairFromSecret(transactionPrK, 0) if err != nil { - return "", configs.Code_500, errors.Wrap(err, "KeyringPairFromSecret err") + return "", errors.Wrap(err, "KeyringPairFromSecret err") } meta, err := api.RPC.State.GetMetadataLatest() if err != nil { - return "", configs.Code_500, errors.Wrap(err, "GetMetadataLatest err") + return "", errors.Wrap(err, "GetMetadataLatest err") } c, err := types.NewCall(meta, ChainTx_Sminer_UpdateIp, types.Bytes([]byte(addr))) if err != nil { - return "", configs.Code_500, errors.Wrap(err, "NewCall err") + return "", errors.Wrap(err, "NewCall err") } ext := types.NewExtrinsic(c) if err != nil { - return "", configs.Code_500, errors.Wrap(err, "NewExtrinsic err") + return "", errors.Wrap(err, "NewExtrinsic err") } genesisHash, err := api.RPC.Chain.GetBlockHash(0) if err != nil { - return "", configs.Code_500, errors.Wrap(err, "GetBlockHash err") + return "", errors.Wrap(err, "GetBlockHash err") } rv, err := api.RPC.State.GetRuntimeVersionLatest() if err != nil { - return "", configs.Code_500, errors.Wrap(err, "GetRuntimeVersionLatest err") + return "", errors.Wrap(err, "GetRuntimeVersionLatest err") } key, err := types.CreateStorageKey(meta, "System", "Account", keyring.PublicKey) if err != nil { - return "", configs.Code_500, errors.Wrap(err, "CreateStorageKey System Account err") + return "", errors.Wrap(err, "CreateStorageKey System Account err") } ok, err := api.RPC.State.GetStorageLatest(key, &accountInfo) if err != nil { - return "", configs.Code_500, errors.Wrap(err, "GetStorageLatest err") + return "", errors.Wrap(err, "GetStorageLatest err") } if !ok { - return "", configs.Code_500, errors.New("GetStorageLatest return value is empty") + return "", errors.New("GetStorageLatest return value is empty") } o := types.SignatureOptions{ @@ -836,13 +836,13 @@ func UpdateAddress(transactionPrK, addr string) (string, int, error) { // Sign the transaction err = ext.Sign(keyring, o) if err != nil { - return "", configs.Code_500, errors.Wrap(err, "Sign err") + return "", errors.Wrap(err, "Sign err") } // Do the transfer and track the actual status sub, err := api.RPC.Author.SubmitAndWatchExtrinsic(ext) if err != nil { - return "", configs.Code_500, errors.Wrap(err, "SubmitAndWatchExtrinsic err") + return "", errors.Wrap(err, "SubmitAndWatchExtrinsic err") } defer sub.Unsubscribe() timeout := time.After(time.Second * configs.TimeToWaitEvents_S) @@ -850,18 +850,35 @@ func UpdateAddress(transactionPrK, addr string) (string, int, error) { select { case status := <-sub.Chan(): if status.IsInBlock { + events := MyEventRecords{} txhash, _ := types.EncodeToHexString(status.AsInBlock) - return txhash, configs.Code_600, nil + keye, err := types.CreateStorageKey(meta, "System", "Events", nil) + if err != nil { + return txhash, errors.Wrap(err, "GetKeyEvents") + } + h, err := api.RPC.State.GetStorageRaw(keye, status.AsInBlock) + if err != nil { + return txhash, errors.Wrap(err, "GetStorageRaw") + } + + types.EventRecordsRaw(*h).DecodeEventRecords(meta, &events) + + if len(events.Sminer_UpdataIp) > 0 { + if string(events.Sminer_UpdataIp[0].Acc[:]) == string(pattern.GetMinerAcc()) { + return txhash, nil + } + } + return txhash, errors.New(ERR_Failed) } case err = <-sub.Err(): - return "", configs.Code_500, err + return "", errors.Wrap(err, "<-sub") case <-timeout: - return "", configs.Code_500, errors.Errorf("timeout") + return "", errors.Errorf("timeout") } } } -func UpdateIncome(transactionPrK string, acc types.AccountID) (string, int, error) { +func UpdateIncome(transactionPrK string, acc types.AccountID) (string, error) { var ( err error accountInfo types.AccountInfo @@ -873,49 +890,49 @@ func UpdateIncome(transactionPrK string, acc types.AccountID) (string, int, erro }() api, err := NewRpcClient(configs.C.RpcAddr) if err != nil { - return "", configs.Code_500, errors.Wrap(err, "NewRpcClient err") + return "", errors.Wrap(err, "NewRpcClient err") } keyring, err := signature.KeyringPairFromSecret(transactionPrK, 0) if err != nil { - return "", configs.Code_500, errors.Wrap(err, "KeyringPairFromSecret err") + return "", errors.Wrap(err, "KeyringPairFromSecret err") } meta, err := api.RPC.State.GetMetadataLatest() if err != nil { - return "", configs.Code_500, errors.Wrap(err, "GetMetadataLatest err") + return "", errors.Wrap(err, "GetMetadataLatest err") } c, err := types.NewCall(meta, ChainTx_Sminer_UpdateBeneficiary, acc) if err != nil { - return "", configs.Code_500, errors.Wrap(err, "NewCall err") + return "", errors.Wrap(err, "NewCall err") } ext := types.NewExtrinsic(c) if err != nil { - return "", configs.Code_500, errors.Wrap(err, "NewExtrinsic err") + return "", errors.Wrap(err, "NewExtrinsic err") } genesisHash, err := api.RPC.Chain.GetBlockHash(0) if err != nil { - return "", configs.Code_500, errors.Wrap(err, "GetBlockHash err") + return "", errors.Wrap(err, "GetBlockHash err") } rv, err := api.RPC.State.GetRuntimeVersionLatest() if err != nil { - return "", configs.Code_500, errors.Wrap(err, "GetRuntimeVersionLatest err") + return "", errors.Wrap(err, "GetRuntimeVersionLatest err") } key, err := types.CreateStorageKey(meta, "System", "Account", keyring.PublicKey) if err != nil { - return "", configs.Code_500, errors.Wrap(err, "CreateStorageKey System Account err") + return "", errors.Wrap(err, "CreateStorageKey System Account err") } ok, err := api.RPC.State.GetStorageLatest(key, &accountInfo) if err != nil { - return "", configs.Code_500, errors.Wrap(err, "GetStorageLatest err") + return "", errors.Wrap(err, "GetStorageLatest err") } if !ok { - return "", configs.Code_500, errors.New("GetStorageLatest return value is empty") + return "", errors.New("GetStorageLatest return value is empty") } o := types.SignatureOptions{ @@ -931,13 +948,13 @@ func UpdateIncome(transactionPrK string, acc types.AccountID) (string, int, erro // Sign the transaction err = ext.Sign(keyring, o) if err != nil { - return "", configs.Code_500, errors.Wrap(err, "Sign err") + return "", errors.Wrap(err, "Sign err") } // Do the transfer and track the actual status sub, err := api.RPC.Author.SubmitAndWatchExtrinsic(ext) if err != nil { - return "", configs.Code_500, errors.Wrap(err, "SubmitAndWatchExtrinsic err") + return "", errors.Wrap(err, "SubmitAndWatchExtrinsic err") } defer sub.Unsubscribe() timeout := time.After(time.Second * configs.TimeToWaitEvents_S) @@ -945,13 +962,30 @@ func UpdateIncome(transactionPrK string, acc types.AccountID) (string, int, erro select { case status := <-sub.Chan(): if status.IsInBlock { + events := MyEventRecords{} txhash, _ := types.EncodeToHexString(status.AsInBlock) - return txhash, configs.Code_600, nil + keye, err := types.CreateStorageKey(meta, "System", "Events", nil) + if err != nil { + return txhash, errors.Wrap(err, "GetKeyEvents") + } + h, err := api.RPC.State.GetStorageRaw(keye, status.AsInBlock) + if err != nil { + return txhash, errors.Wrap(err, "GetStorageRaw") + } + + types.EventRecordsRaw(*h).DecodeEventRecords(meta, &events) + + if len(events.Sminer_UpdataBeneficiary) > 0 { + if string(events.Sminer_UpdataBeneficiary[0].Acc[:]) == string(pattern.GetMinerAcc()) { + return txhash, nil + } + } + return txhash, errors.New(ERR_Failed) } case err = <-sub.Err(): - return "", configs.Code_500, err + return "", errors.Wrap(err, "<-sub") case <-timeout: - return "", configs.Code_500, errors.Errorf("timeout") + return "", errors.Errorf("timeout") } } } diff --git a/internal/pattern/blacklist.go b/internal/pattern/blacklist.go new file mode 100644 index 0000000..aa47742 --- /dev/null +++ b/internal/pattern/blacklist.go @@ -0,0 +1,42 @@ +package pattern + +import ( + "sync" + "time" +) + +type Blacklist struct { + list map[string]int64 + l *sync.Mutex +} + +var blacklist *Blacklist + +func init() { + blacklist = new(Blacklist) + blacklist.l = new(sync.Mutex) + blacklist.list = make(map[string]int64, 10) +} + +func AddToBlacklist(key string) { + blacklist.l.Lock() + blacklist.list[key] = time.Now().Unix() + blacklist.l.Unlock() +} + +func IsInBlacklist(key string) bool { + blacklist.l.Lock() + _, ok := blacklist.list[key] + blacklist.l.Unlock() + return ok +} + +func DeleteExpiredBlacklist() { + blacklist.l.Lock() + for k, v := range blacklist.list { + if time.Since(time.Unix(v, 0)).Minutes() > 60 { + delete(blacklist.list, k) + } + } + blacklist.l.Unlock() +} diff --git a/internal/rpc/main.go b/internal/rpc/main.go index f87f4dc..a1e23fc 100644 --- a/internal/rpc/main.go +++ b/internal/rpc/main.go @@ -38,6 +38,7 @@ const ( RpcMethod_Scheduler_Space = "space" RpcMethod_Scheduler_Spacefile = "spacefile" RpcMethod_Scheduler_FillerBack = "fillerback" + RpcMethod_Scheduler_FillerFall = "fillerfall" RpcMethod_Scheduler_State = "state" RpcFileBuffer = 1024 * 1024 //1MB ) @@ -361,7 +362,7 @@ func (MService) ReadfiletagAction(body []byte) (proto.Message, error) { } // -func WriteData(cli *Client, service, method string, t time.Duration, body []byte) (int, []byte, bool, error) { +func WriteData(cli *Client, service, method string, t time.Duration, body []byte) (int, string, []byte, bool, error) { req := &ReqMsg{ Service: service, Method: method, @@ -371,18 +372,18 @@ func WriteData(cli *Client, service, method string, t time.Duration, body []byte resp, err := cli.Call(ctx, req) if err != nil { cli.Close() - return 0, nil, true, errors.Wrap(err, "Call err:") + return 0, "", nil, true, errors.Wrap(err, "Call err:") } var b RespBody if len(resp.Body) == 0 { - return 0, nil, false, errors.New("empty body") + return 0, "", nil, false, errors.New("empty body") } err = proto.Unmarshal(resp.Body, &b) if err != nil { - return 0, nil, false, errors.Wrap(err, "Unmarshal:") + return 0, "", nil, false, errors.Wrap(err, "Unmarshal:") } - return int(b.Code), b.Data, false, nil + return int(b.Code), b.Msg, b.Data, false, nil } diff --git a/internal/task/miner.go b/internal/task/miner.go index 79c953d..38f875f 100644 --- a/internal/task/miner.go +++ b/internal/task/miner.go @@ -39,6 +39,7 @@ func task_self_judgment(ch chan bool) { if failcount >= 10 { os.Exit(1) } + pattern.DeleteExpiredBlacklist() time.Sleep(time.Minute * 5) } } diff --git a/internal/task/space.go b/internal/task/space.go index 1818bae..11e5d30 100644 --- a/internal/task/space.go +++ b/internal/task/space.go @@ -13,6 +13,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "math/big" "net/http" "os" "path/filepath" @@ -112,6 +113,8 @@ func task_SpaceManagement(ch chan bool) { } } + Flr.Sugar().Infof("Connected to %v", pattern.GetMinerRecentSche()) + if time.Since(tSpace).Minutes() >= 10 { availableSpace, err = calcAvailableSpace() if err != nil { @@ -140,7 +143,7 @@ func task_SpaceManagement(ch chan bool) { continue } - respCode, respBody, clo, err := rpc.WriteData( + respCode, respMsg, respBody, clo, err := rpc.WriteData( client, rpc.RpcService_Scheduler, rpc.RpcMethod_Scheduler_Space, @@ -149,7 +152,9 @@ func task_SpaceManagement(ch chan bool) { ) reconn = clo if err != nil { - Flr.Sugar().Errorf("%v", err) + fail_sche := pattern.GetMinerRecentSche() + Flr.Sugar().Errorf("Space: %v, code:%v msg:%v err:%v", fail_sche, respCode, respMsg, err) + pattern.AddToBlacklist(fail_sche) time.Sleep(time.Second * time.Duration(tools.RandomInRange(5, 30))) continue } @@ -161,17 +166,41 @@ func task_SpaceManagement(ch chan bool) { Flr.Sugar().Errorf(" %v", err) continue } - var fillerurl string = "http://" + string(base58.Decode(basefiller.MinerIp[0])) + "/" + basefiller.FillerId + mip := string(base58.Decode(basefiller.MinerIp[0])) + var fillerurl string = "http://" + mip + "/" + basefiller.FillerId var fillertagurl string = fillerurl + ".tag" - fillerbody, err := getFiller(fillerurl) + if pattern.IsInBlacklist(mip) { + continue + } + Flr.Sugar().Infof("%v", fillerurl) + fillerbody, err := getFiller(fillerurl, time.Duration(time.Second*90)) if err != nil { - time.Sleep(time.Second * time.Duration(tools.RandomInRange(3, 6))) - fillerbody, err = getFiller(fillerurl) + Flr.Sugar().Errorf("%v", err) + pattern.AddToBlacklist(mip) + // + var req_back FillerBackReq + req_back.Publickey = pattern.GetMinerAcc() + req_back.FileId = []byte(basefiller.FillerId) + req_back.FileHash = nil + req_back_req, err := proto.Marshal(&req_back) if err != nil { Flr.Sugar().Errorf("%v", err) time.Sleep(time.Second * time.Duration(tools.RandomInRange(5, 10))) continue } + + _, _, _, reconn, err = rpc.WriteData( + client, + rpc.RpcService_Scheduler, + rpc.RpcMethod_Scheduler_FillerFall, + time.Duration(time.Second*30), + req_back_req, + ) + if err != nil { + Flr.Sugar().Errorf("%v", err) + } + time.Sleep(time.Second * time.Duration(tools.RandomInRange(3, 6))) + continue } spacefilefullpath := filepath.Join(configs.SpaceDir, basefiller.FillerId) err = write_file(spacefilefullpath, fillerbody) @@ -181,12 +210,12 @@ func task_SpaceManagement(ch chan bool) { time.Sleep(time.Second * time.Duration(tools.RandomInRange(5, 10))) continue } - fillertagbody, err := getFiller(fillertagurl) + fillertagbody, err := getFiller(fillertagurl, time.Duration(time.Second*20)) if err != nil { - time.Sleep(time.Second * time.Duration(tools.RandomInRange(3, 6))) - fillertagbody, err = getFiller(fillertagurl) if err != nil { Flr.Sugar().Errorf("%v", err) + pattern.AddToBlacklist(mip) + os.Remove(spacefilefullpath) time.Sleep(time.Second * time.Duration(tools.RandomInRange(3, 6))) continue } @@ -201,6 +230,7 @@ func task_SpaceManagement(ch chan bool) { time.Sleep(time.Second * time.Duration(tools.RandomInRange(5, 10))) continue } + hash, err := tools.CalcFileHash(spacefilefullpath) if err != nil { os.Remove(tagfilefullpath) @@ -222,33 +252,49 @@ func task_SpaceManagement(ch chan bool) { continue } - _, _, reconn, err = rpc.WriteData( + respCode, _, _, reconn, err = rpc.WriteData( client, rpc.RpcService_Scheduler, rpc.RpcMethod_Scheduler_FillerBack, - time.Duration(time.Second*30), + time.Duration(time.Second*20), req_back_req, ) - if err != nil { - if clo { + if respCode != 200 { + if reconn { client, err = ReConnect(pattern.GetMinerRecentSche()) if err != nil { Flr.Sugar().Errorf("%v", err) + os.Remove(tagfilefullpath) + os.Remove(spacefilefullpath) + fail_sche := pattern.GetMinerRecentSche() + pattern.AddToBlacklist(fail_sche) time.Sleep(time.Second * time.Duration(tools.RandomInRange(5, 10))) continue } - _, _, reconn, err = rpc.WriteData( + respCode, _, _, reconn, err = rpc.WriteData( client, rpc.RpcService_Scheduler, rpc.RpcMethod_Scheduler_FillerBack, - time.Duration(time.Second*30), + time.Duration(time.Second*20), req_back_req, ) - if err != nil { + if respCode != 200 { Flr.Sugar().Errorf("%v", err) + os.Remove(tagfilefullpath) + os.Remove(spacefilefullpath) + fail_sche := pattern.GetMinerRecentSche() + pattern.AddToBlacklist(fail_sche) time.Sleep(time.Second * time.Duration(tools.RandomInRange(5, 10))) continue } + } else { + reconn = true + Flr.Sugar().Errorf("%v", err) + os.Remove(tagfilefullpath) + os.Remove(spacefilefullpath) + fail_sche := pattern.GetMinerRecentSche() + pattern.AddToBlacklist(fail_sche) + continue } } Flr.Sugar().Infof("C-filler: %v", basefiller.FillerId) @@ -256,8 +302,10 @@ func task_SpaceManagement(ch chan bool) { } if respCode != 200 { - Flr.Sugar().Errorf("%v", respCode) - time.Sleep(time.Second * time.Duration(tools.RandomInRange(10, 30))) + fail_sche := pattern.GetMinerRecentSche() + Flr.Sugar().Errorf("Call %v, code:%v msg:%v", fail_sche, respCode, respMsg) + pattern.AddToBlacklist(fail_sche) + time.Sleep(time.Second * 3) reconn = true continue } @@ -309,17 +357,18 @@ func task_SpaceManagement(ch chan bool) { time.Sleep(time.Second * time.Duration(tools.RandomInRange(5, 10))) break } - respCode, respBody, clo, err = rpc.WriteData( + respCode, respMsg, respBody, reconn, err = rpc.WriteData( client, rpc.RpcService_Scheduler, rpc.RpcMethod_Scheduler_Spacefile, - time.Duration(time.Second*100), + time.Duration(time.Second*60), req_b, ) - reconn = clo if err != nil { - Flr.Sugar().Errorf(" %v", err) f.Close() + fail_sche := pattern.GetMinerRecentSche() + Flr.Sugar().Errorf("Spacefile: %v, code:%v msg:%v err:%v", fail_sche, respCode, respMsg, err) + pattern.AddToBlacklist(fail_sche) os.Remove(tagfilefullpath) os.Remove(spacefilefullpath) time.Sleep(time.Second * time.Duration(tools.RandomInRange(5, 10))) @@ -377,12 +426,26 @@ func connectionScheduler(schds []chain.SchedulerInfo) (*rpc.Client, error) { var wsURL string for i := 0; i < len(schds); i++ { wsURL = "ws://" + string(base58.Decode(string(schds[i].Ip))) + if pattern.IsInBlacklist(wsURL) { + continue + } + accountinfo, err := chain.GetAccountInfo(schds[i].Controller_user[:]) + if err != nil { + if err.Error() == chain.ERR_Empty { + pattern.AddToBlacklist(wsURL) + } + continue + } + if accountinfo.Data.Free.CmpAbs(new(big.Int).SetUint64(2000000000000)) == -1 { + pattern.AddToBlacklist(wsURL) + continue + } ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) cli, err = rpc.DialWebsocket(ctx, wsURL, "") if err != nil { continue } - respCode, respBody, _, _ := rpc.WriteData( + respCode, _, respBody, _, _ := rpc.WriteData( cli, rpc.RpcService_Scheduler, rpc.RpcMethod_Scheduler_State, @@ -403,7 +466,7 @@ func connectionScheduler(schds []chain.SchedulerInfo) (*rpc.Client, error) { resu = resu << 8 resu += int32(respBody[3]) } - if resu < 5 { + if resu < 10 { pattern.SetMinerRecentSche(wsURL) return cli, nil } @@ -411,14 +474,13 @@ func connectionScheduler(schds []chain.SchedulerInfo) (*rpc.Client, error) { cli.Close() } var ok = false - var threshold int32 = 5 + var threshold int32 = 10 for !ok { for k, v := range state { - if (threshold-5) <= v && v < threshold { + if (threshold-10) <= v && v < threshold { ctx, _ := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) cli, err = rpc.DialWebsocket(ctx, k, "") if err == nil { - Flr.Sugar().Infof("Connect to %v", k) pattern.SetMinerRecentSche(k) ok = true break @@ -437,10 +499,11 @@ func connectionScheduler(schds []chain.SchedulerInfo) (*rpc.Client, error) { return cli, err } -func getFiller(url string) ([]byte, error) { +func getFiller(url string, t time.Duration) ([]byte, error) { req, err := http.NewRequest(http.MethodGet, url, nil) client := &http.Client{ + Timeout: t, Transport: globalTransport, } resp, err := client.Do(req)