Skip to content

Commit

Permalink
Trade limit (#239)
Browse files Browse the repository at this point in the history
* Optimize trading timing, fix: #238

* update version to v0.6.2
  • Loading branch information
AstaFrode authored Sep 6, 2024
1 parent 649c56b commit d3d1a70
Show file tree
Hide file tree
Showing 13 changed files with 454 additions and 702 deletions.
3 changes: 0 additions & 3 deletions .env.local
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,3 @@ MY_ADDR="cXgaee2N8E77JJv9gdsGAckv1Qsf3hqWYf7NL4q6ZuQzuAUtB"

# testnet
RPC_ADDRS="ws://localhost:9944"

# testnet
BOOTSTRAP_NODES="_dnsaddr.boot-miner-testnet.cess.network"
3 changes: 0 additions & 3 deletions .env.testnet
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,3 @@ MY_ADDR="cXgaee2N8E77JJv9gdsGAckv1Qsf3hqWYf7NL4q6ZuQzuAUtB" # root

# testnet
RPC_ADDRS="wss://testnet-rpc.cess.network/ws/"

# testnet
BOOTSTRAP_NODES="_dnsaddr.boot-miner-testnet.cess.network"
104 changes: 40 additions & 64 deletions chain/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,9 @@ func (c *ChainClient) QueryCountedServiceFailed(accountID []byte, block int32) (
// - string: block hash
// - error: error message
func (c *ChainClient) SubmitIdleProof(idleProof []types.U8) (string, error) {
if !c.GetRpcState() {
return "", ERR_RPC_CONNECTION
}

<-c.tradeCh
defer func() {
c.tradeCh <- true
if err := recover(); err != nil {
log.Println(utils.RecoverError(err))
}
Expand Down Expand Up @@ -227,6 +225,14 @@ func (c *ChainClient) SubmitIdleProof(idleProof []types.U8) (string, error) {
return blockhash, err
}

if !c.GetRpcState() {
err = c.ReconnectRpc()
if err != nil {
err = fmt.Errorf("rpc err: [%s] [tx] [%s] %s", c.GetCurrentRpcAddr(), ExtName_Audit_submit_idle_proof, ERR_RPC_CONNECTION.Error())
return blockhash, err
}
}

ok, err := c.api.RPC.State.GetStorageLatest(key, &accountInfo)
if err != nil {
err = fmt.Errorf("rpc err: [%s] [tx] [%s] GetStorageLatest: %v", c.GetCurrentRpcAddr(), ExtName_Audit_submit_idle_proof, err)
Expand Down Expand Up @@ -254,17 +260,6 @@ func (c *ChainClient) SubmitIdleProof(idleProof []types.U8) (string, error) {
return blockhash, err
}

<-c.txTicker.C

if !c.GetRpcState() {
err = c.ReconnectRpc()
if err != nil {
err = fmt.Errorf("rpc err: [%s] [tx] [%s] %s", c.GetCurrentRpcAddr(), ExtName_Audit_submit_idle_proof, ERR_RPC_CONNECTION.Error())
return blockhash, err
}
<-c.txTicker.C
}

// Do the transfer and track the actual status
sub, err := c.api.RPC.Author.SubmitAndWatchExtrinsic(ext)
if err != nil {
Expand All @@ -274,7 +269,6 @@ func (c *ChainClient) SubmitIdleProof(idleProof []types.U8) (string, error) {
if err != nil {
return blockhash, errors.Wrap(err, "[Sign]")
}
<-c.txTicker.C
sub, err = c.api.RPC.Author.SubmitAndWatchExtrinsic(ext)
if err != nil {
err = fmt.Errorf("rpc err: [%s] [tx] [%s] SubmitAndWatchExtrinsic: %v", c.GetCurrentRpcAddr(), ExtName_Audit_submit_idle_proof, err)
Expand Down Expand Up @@ -315,11 +309,9 @@ func (c *ChainClient) SubmitIdleProof(idleProof []types.U8) (string, error) {
// - string: block hash
// - error: error message
func (c *ChainClient) SubmitServiceProof(serviceProof []types.U8) (string, error) {
if !c.GetRpcState() {
return "", ERR_RPC_CONNECTION
}

<-c.tradeCh
defer func() {
c.tradeCh <- true
if err := recover(); err != nil {
log.Println(utils.RecoverError(err))
}
Expand All @@ -344,6 +336,14 @@ func (c *ChainClient) SubmitServiceProof(serviceProof []types.U8) (string, error
return blockhash, err
}

if !c.GetRpcState() {
err = c.ReconnectRpc()
if err != nil {
err = fmt.Errorf("rpc err: [%s] [tx] [%s] %s", c.GetCurrentRpcAddr(), ExtName_Audit_submit_service_proof, ERR_RPC_CONNECTION.Error())
return blockhash, err
}
}

ok, err := c.api.RPC.State.GetStorageLatest(key, &accountInfo)
if err != nil {
err = fmt.Errorf("rpc err: [%s] [tx] [%s] GetStorageLatest: %v", c.GetCurrentRpcAddr(), ExtName_Audit_submit_service_proof, err)
Expand Down Expand Up @@ -371,17 +371,6 @@ func (c *ChainClient) SubmitServiceProof(serviceProof []types.U8) (string, error
return blockhash, err
}

<-c.txTicker.C

if !c.GetRpcState() {
err = c.ReconnectRpc()
if err != nil {
err = fmt.Errorf("rpc err: [%s] [tx] [%s] %s", c.GetCurrentRpcAddr(), ExtName_Audit_submit_service_proof, ERR_RPC_CONNECTION.Error())
return blockhash, err
}
<-c.txTicker.C
}

// Do the transfer and track the actual status
sub, err := c.api.RPC.Author.SubmitAndWatchExtrinsic(ext)
if err != nil {
Expand All @@ -391,7 +380,6 @@ func (c *ChainClient) SubmitServiceProof(serviceProof []types.U8) (string, error
if err != nil {
return blockhash, errors.Wrap(err, "[Sign]")
}
<-c.txTicker.C
sub, err = c.api.RPC.Author.SubmitAndWatchExtrinsic(ext)
if err != nil {
err = fmt.Errorf("rpc err: [%s] [tx] [%s] SubmitAndWatchExtrinsic: %v", c.GetCurrentRpcAddr(), ExtName_Audit_submit_service_proof, err)
Expand Down Expand Up @@ -438,11 +426,9 @@ func (c *ChainClient) SubmitServiceProof(serviceProof []types.U8) (string, error
// - string: block hash
// - error: error message
func (c *ChainClient) SubmitVerifyIdleResult(totalProofHash []types.U8, front, rear types.U64, accumulator Accumulator, result types.Bool, sig types.Bytes, teePuk WorkerPublicKey) (string, error) {
if !c.GetRpcState() {
return "", ERR_RPC_CONNECTION
}

<-c.tradeCh
defer func() {
c.tradeCh <- true
if err := recover(); err != nil {
log.Println(utils.RecoverError(err))
}
Expand All @@ -467,6 +453,14 @@ func (c *ChainClient) SubmitVerifyIdleResult(totalProofHash []types.U8, front, r
return blockhash, err
}

if !c.GetRpcState() {
err = c.ReconnectRpc()
if err != nil {
err = fmt.Errorf("rpc err: [%s] [tx] [%s] %s", c.GetCurrentRpcAddr(), ExtName_Audit_submit_verify_idle_result, ERR_RPC_CONNECTION.Error())
return blockhash, err
}
}

ok, err := c.api.RPC.State.GetStorageLatest(key, &accountInfo)
if err != nil {
err = fmt.Errorf("rpc err: [%s] [tx] [%s] GetStorageLatest: %v", c.GetCurrentRpcAddr(), ExtName_Audit_submit_verify_idle_result, err)
Expand Down Expand Up @@ -494,17 +488,6 @@ func (c *ChainClient) SubmitVerifyIdleResult(totalProofHash []types.U8, front, r
return blockhash, err
}

<-c.txTicker.C

if !c.GetRpcState() {
err = c.ReconnectRpc()
if err != nil {
err = fmt.Errorf("rpc err: [%s] [tx] [%s] %s", c.GetCurrentRpcAddr(), ExtName_Audit_submit_verify_idle_result, ERR_RPC_CONNECTION.Error())
return blockhash, err
}
<-c.txTicker.C
}

// Do the transfer and track the actual status
sub, err := c.api.RPC.Author.SubmitAndWatchExtrinsic(ext)
if err != nil {
Expand All @@ -514,7 +497,6 @@ func (c *ChainClient) SubmitVerifyIdleResult(totalProofHash []types.U8, front, r
if err != nil {
return blockhash, errors.Wrap(err, "[Sign]")
}
<-c.txTicker.C
sub, err = c.api.RPC.Author.SubmitAndWatchExtrinsic(ext)
if err != nil {
err = fmt.Errorf("rpc err: [%s] [tx] [%s] SubmitAndWatchExtrinsic: %v", c.GetCurrentRpcAddr(), ExtName_Audit_submit_verify_idle_result, err)
Expand Down Expand Up @@ -558,11 +540,9 @@ func (c *ChainClient) SubmitVerifyIdleResult(totalProofHash []types.U8, front, r
// - string: block hash
// - error: error message
func (c *ChainClient) SubmitVerifyServiceResult(result types.Bool, sign types.Bytes, bloomFilter BloomFilter, teePuk WorkerPublicKey) (string, error) {
if !c.GetRpcState() {
return "", ERR_RPC_CONNECTION
}

<-c.tradeCh
defer func() {
c.tradeCh <- true
if err := recover(); err != nil {
log.Println(utils.RecoverError(err))
}
Expand All @@ -587,6 +567,14 @@ func (c *ChainClient) SubmitVerifyServiceResult(result types.Bool, sign types.By
return blockhash, err
}

if !c.GetRpcState() {
err = c.ReconnectRpc()
if err != nil {
err = fmt.Errorf("rpc err: [%s] [tx] [%s] %s", c.GetCurrentRpcAddr(), ExtName_Audit_submit_verify_service_result, ERR_RPC_CONNECTION.Error())
return blockhash, err
}
}

ok, err := c.api.RPC.State.GetStorageLatest(key, &accountInfo)
if err != nil {
err = fmt.Errorf("rpc err: [%s] [tx] [%s] GetStorageLatest: %v", c.GetCurrentRpcAddr(), ExtName_Audit_submit_verify_service_result, err)
Expand Down Expand Up @@ -614,17 +602,6 @@ func (c *ChainClient) SubmitVerifyServiceResult(result types.Bool, sign types.By
return blockhash, err
}

<-c.txTicker.C

if !c.GetRpcState() {
err = c.ReconnectRpc()
if err != nil {
err = fmt.Errorf("rpc err: [%s] [tx] [%s] %s", c.GetCurrentRpcAddr(), ExtName_Audit_submit_verify_service_result, ERR_RPC_CONNECTION.Error())
return blockhash, err
}
<-c.txTicker.C
}

// Do the transfer and track the actual status
sub, err := c.api.RPC.Author.SubmitAndWatchExtrinsic(ext)
if err != nil {
Expand All @@ -634,7 +611,6 @@ func (c *ChainClient) SubmitVerifyServiceResult(result types.Bool, sign types.By
if err != nil {
return blockhash, errors.Wrap(err, "[Sign]")
}
<-c.txTicker.C
sub, err = c.api.RPC.Author.SubmitAndWatchExtrinsic(ext)
if err != nil {
err = fmt.Errorf("rpc err: [%s] [tx] [%s] SubmitAndWatchExtrinsic: %v", c.GetCurrentRpcAddr(), ExtName_Audit_submit_verify_service_result, err)
Expand Down
25 changes: 10 additions & 15 deletions chain/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,9 @@ func (c *ChainClient) QueryInactiveIssuance(block int32) (string, error) {
// - string: block hash
// - error: error message
func (c *ChainClient) TransferToken(dest string, amount string) (string, error) {
if !c.GetRpcState() {
return "", ERR_RPC_CONNECTION
}

<-c.tradeCh
defer func() {
c.tradeCh <- true
if err := recover(); err != nil {
log.Println(utils.RecoverError(err))
}
Expand Down Expand Up @@ -197,6 +195,14 @@ func (c *ChainClient) TransferToken(dest string, amount string) (string, error)
return blockhash, err
}

if !c.GetRpcState() {
err = c.ReconnectRpc()
if err != nil {
err = fmt.Errorf("rpc err: [%s] [tx] [%s] %s", c.GetCurrentRpcAddr(), ExtName_Balances_transferKeepAlive, ERR_RPC_CONNECTION.Error())
return blockhash, err
}
}

ok, err = c.api.RPC.State.GetStorageLatest(key, &accountInfo)
if err != nil {
err = fmt.Errorf("rpc err: [%s] [tx] [%s] GetStorageLatest: %v", c.GetCurrentRpcAddr(), ExtName_Balances_transferKeepAlive, err)
Expand Down Expand Up @@ -224,17 +230,6 @@ func (c *ChainClient) TransferToken(dest string, amount string) (string, error)
return blockhash, err
}

<-c.txTicker.C

if !c.GetRpcState() {
err = c.ReconnectRpc()
if err != nil {
err = fmt.Errorf("rpc err: [%s] [tx] [%s] %s", c.GetCurrentRpcAddr(), ExtName_Balances_transferKeepAlive, ERR_RPC_CONNECTION.Error())
return blockhash, err
}
<-c.txTicker.C
}

// Do the transfer and track the actual status
sub, err := c.api.RPC.Author.SubmitAndWatchExtrinsic(ext)
if err != nil {
Expand Down
10 changes: 6 additions & 4 deletions chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
type ChainClient struct {
lock *sync.Mutex
chainStLock *sync.Mutex
txTicker *time.Ticker
api *gsrpc.SubstrateAPI
metadata *types.Metadata
runtimeVersion *types.RuntimeVersion
Expand All @@ -39,12 +38,13 @@ type ChainClient struct {
keyring signature.KeyringPair
rpcAddr []string
currentRpcAddr string
packingTime time.Duration
tokenSymbol string
networkEnv string
signatureAcc string
name string
balance uint64
packingTime time.Duration
tradeCh chan bool
rpcState bool
}

Expand All @@ -65,11 +65,12 @@ func NewChainClientUnconnectedRpc(ctx context.Context, name string, rpcs []strin
var chainClient = &ChainClient{
lock: new(sync.Mutex),
chainStLock: new(sync.Mutex),
txTicker: time.NewTicker(BlockInterval),
tradeCh: make(chan bool, 1),
rpcAddr: rpcs,
packingTime: t,
name: name,
}
chainClient.tradeCh <- true
if mnemonic != "" {
chainClient.keyring, err = signature.KeyringPairFromSecret(mnemonic, 0)
if err != nil {
Expand Down Expand Up @@ -100,12 +101,13 @@ func NewChainClient(ctx context.Context, name string, rpcs []string, mnemonic st
chainClient = &ChainClient{
lock: new(sync.Mutex),
chainStLock: new(sync.Mutex),
txTicker: time.NewTicker(BlockInterval),
tradeCh: make(chan bool, 1),
rpcAddr: rpcs,
packingTime: t,
name: name,
}
)
chainClient.tradeCh <- true

log.SetOutput(io.Discard)
for i := 0; i < len(rpcs); i++ {
Expand Down
Loading

0 comments on commit d3d1a70

Please sign in to comment.