From d151e0775af8e81248c2a41749c68ada38b0cc01 Mon Sep 17 00:00:00 2001 From: wizeguyy Date: Wed, 16 Oct 2024 16:31:55 -0500 Subject: [PATCH] Generate access event on unlock --- common/types.go | 5 ++ core/bodydb.go | 11 +-- core/core.go | 4 + core/events.go | 5 ++ core/headerchain.go | 14 ++- core/state_processor.go | 139 ++++++++++++++++------------- core/worker.go | 2 +- quai/api_backend.go | 8 ++ quai/filters/api.go | 19 +++- quai/filters/filter.go | 1 + quai/filters/filter_system.go | 37 +++++++- quai/filters/filter_system_test.go | 5 ++ 12 files changed, 175 insertions(+), 75 deletions(-) diff --git a/common/types.go b/common/types.go index 47eddac212..6d89c9072b 100644 --- a/common/types.go +++ b/common/types.go @@ -788,3 +788,8 @@ func SetBlockHashForQi(blockHash Hash, nodeLocation Location) Hash { blockHash[3] |= 0x80 // 10000000 in binary (set first bit to 1) return blockHash } + +type Unlock struct { + Addr InternalAddress + Amt *big.Int +} diff --git a/core/bodydb.go b/core/bodydb.go index aebfb508f6..305969d9b7 100644 --- a/core/bodydb.go +++ b/core/bodydb.go @@ -77,7 +77,7 @@ func NewBodyDb(db ethdb.Database, engine consensus.Engine, hc *HeaderChain, chai } // Append -func (bc *BodyDb) Append(block *types.WorkObject) ([]*types.Log, error) { +func (bc *BodyDb) Append(block *types.WorkObject) ([]*types.Log, []common.Unlock, error) { startLock := time.Now() batch := bc.db.NewBatch() @@ -85,12 +85,13 @@ func (bc *BodyDb) Append(block *types.WorkObject) ([]*types.Log, error) { locktime := time.Since(startLock) nodeCtx := bc.NodeCtx() var logs []*types.Log + var unlocks []common.Unlock var err error if nodeCtx == common.ZONE_CTX && bc.ProcessingState() { // Process our block - logs, err = bc.processor.Apply(batch, block) + logs, unlocks, err = bc.processor.Apply(batch, block) if err != nil { - return nil, err + return nil, nil, err } rawdb.WriteTxLookupEntriesByBlock(batch, block, nodeCtx) } @@ -103,9 +104,9 @@ func (bc *BodyDb) Append(block *types.WorkObject) ([]*types.Log, error) { bc.logger.WithField("apply state", common.PrettyDuration(time.Since(stateApply))).Debug("Time taken to") if err = batch.Write(); err != nil { - return nil, err + return nil, nil, err } - return logs, nil + return logs, unlocks, nil } func (bc *BodyDb) ProcessingState() bool { diff --git a/core/core.go b/core/core.go index e513cc6bd5..16654375e1 100644 --- a/core/core.go +++ b/core/core.go @@ -1167,6 +1167,10 @@ func (c *Core) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscription { return c.sl.miner.worker.pendingLogsFeed.Subscribe(ch) } +func (c *Core) SubscribeUnlocks(ch chan<- UnlocksEvent) event.Subscription { + return c.sl.hc.unlocksFeed.Subscribe(ch) +} + // SubscribePendingBlock starts delivering the pending block to the given channel. func (c *Core) SubscribePendingHeader(ch chan<- *types.WorkObject) event.Subscription { return c.sl.miner.SubscribePendingHeader(ch) diff --git a/core/events.go b/core/events.go index 6344fb4bc5..9d3eb4027b 100644 --- a/core/events.go +++ b/core/events.go @@ -24,6 +24,11 @@ type ChainEvent struct { Entropy *big.Int } +type UnlocksEvent struct { + Hash common.Hash + Unlocks []common.Unlock +} + type ChainSideEvent struct { Blocks []*types.WorkObject } diff --git a/core/headerchain.go b/core/headerchain.go index 756ee5b561..b9d39f3ba0 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -63,6 +63,7 @@ type HeaderChain struct { currentExpansionNumber uint8 chainHeadFeed event.Feed + unlocksFeed event.Feed chainSideFeed event.Feed scope event.SubscriptionScope @@ -423,10 +424,16 @@ func (hc *HeaderChain) setStateProcessing() bool { func (hc *HeaderChain) AppendBlock(block *types.WorkObject) error { blockappend := time.Now() // Append block else revert header append - logs, err := hc.bc.Append(block) + logs, unlocks, err := hc.bc.Append(block) if err != nil { return err } + if unlocks != nil && len(unlocks) > 0 { + hc.unlocksFeed.Send(UnlocksEvent{ + Hash: block.Hash(), + Unlocks: unlocks, + }) + } hc.logger.WithField("append block", common.PrettyDuration(time.Since(blockappend))).Debug("Time taken to") if len(logs) > 0 { @@ -1177,6 +1184,11 @@ func (hc *HeaderChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.S return hc.scope.Track(hc.chainHeadFeed.Subscribe(ch)) } +// SubscribeChainHeadEvent registers a subscription of ChainHeadEvent. +func (hc *HeaderChain) SubscribeUnlocksEvent(ch chan<- UnlocksEvent) event.Subscription { + return hc.scope.Track(hc.unlocksFeed.Subscribe(ch)) +} + // SubscribeChainSideEvent registers a subscription of ChainSideEvent. func (hc *HeaderChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription { return hc.scope.Track(hc.chainSideFeed.Subscribe(ch)) diff --git a/core/state_processor.go b/core/state_processor.go index 27b9232104..737edbf97f 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -219,7 +219,7 @@ type UtxosCreatedDeleted struct { // Process returns the receipts and logs accumulated during the process and // returns the amount of gas that was used in the process. If any of the // transactions failed to execute due to insufficient gas it will return an error. -func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (types.Receipts, []*types.Transaction, []*types.Log, *state.StateDB, uint64, uint64, uint64, *multiset.MultiSet, error) { +func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (types.Receipts, []*types.Transaction, []*types.Log, *state.StateDB, uint64, uint64, uint64, *multiset.MultiSet, []common.Unlock, error) { var ( receipts types.Receipts usedGas = new(uint64) @@ -236,7 +236,7 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty start := time.Now() parent := p.hc.GetBlock(block.ParentHash(nodeCtx), block.NumberU64(nodeCtx)-1) if parent == nil { - return types.Receipts{}, []*types.Transaction{}, []*types.Log{}, nil, 0, 0, 0, nil, errors.New("parent block is nil for the block given to process") + return types.Receipts{}, []*types.Transaction{}, []*types.Log{}, nil, 0, 0, 0, nil, nil, errors.New("parent block is nil for the block given to process") } time1 := common.PrettyDuration(time.Since(start)) @@ -254,14 +254,14 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty // Initialize a statedb statedb, err := state.New(parentEvmRoot, parentEtxSetRoot, parentQuaiStateSize, p.stateCache, p.etxCache, p.snaps, nodeLocation, p.logger) if err != nil { - return types.Receipts{}, []*types.Transaction{}, []*types.Log{}, nil, 0, 0, 0, nil, err + return types.Receipts{}, []*types.Transaction{}, []*types.Log{}, nil, 0, 0, 0, nil, nil, err } utxosCreatedDeleted := new(UtxosCreatedDeleted) // utxos created and deleted in this block // Apply the previous inbound ETXs to the ETX set state prevInboundEtxs := rawdb.ReadInboundEtxs(p.hc.bc.db, header.ParentHash(nodeCtx)) if len(prevInboundEtxs) > 0 { if err := statedb.PushETXs(prevInboundEtxs); err != nil { - return nil, nil, nil, nil, 0, 0, 0, nil, fmt.Errorf("could not push prev inbound etxs: %w", err) + return nil, nil, nil, nil, 0, 0, 0, nil, nil, fmt.Errorf("could not push prev inbound etxs: %w", err) } } time2 := common.PrettyDuration(time.Since(start)) @@ -291,7 +291,7 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty blockContext, err := NewEVMBlockContext(header, parent, p.hc, nil) if err != nil { - return nil, nil, nil, nil, 0, 0, 0, nil, err + return nil, nil, nil, nil, 0, 0, 0, nil, nil, err } vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, p.vmConfig) time3 := common.PrettyDuration(time.Since(start)) @@ -327,17 +327,18 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty // Check the min base fee, and max base fee maxBaseFee, err := p.hc.CalcMaxBaseFee(parent) if maxBaseFee == nil && !p.hc.IsGenesisHash(parent.Hash()) { - return nil, nil, nil, nil, 0, 0, 0, nil, fmt.Errorf("could not calculate max base fee %s", err) + return nil, nil, nil, nil, 0, 0, 0, nil, nil, fmt.Errorf("could not calculate max base fee %s", err) } primeTerminus := p.hc.GetHeaderByHash(header.PrimeTerminusHash()) if primeTerminus == nil { - return nil, nil, nil, nil, 0, 0, 0, nil, fmt.Errorf("could not find prime terminus header %032x", header.PrimeTerminusHash()) + return nil, nil, nil, nil, 0, 0, 0, nil, nil, fmt.Errorf("could not find prime terminus header %032x", header.PrimeTerminusHash()) } // Redeem all Quai for the different lock up periods - if err := RedeemLockedQuai(p.hc, header, parent, statedb); err != nil { - return nil, nil, nil, nil, 0, 0, 0, nil, fmt.Errorf("error redeeming locked quai: %w", err) + err, unlocks := RedeemLockedQuai(p.hc, header, parent, statedb) + if err != nil { + return nil, nil, nil, nil, 0, 0, 0, nil, nil, fmt.Errorf("error redeeming locked quai: %w", err) } // Set the min gas price to the lowest gas price in the transaction If that @@ -356,7 +357,7 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty } qiTxFee, fees, etxs, err, timing := ProcessQiTx(tx, p.hc, checkSig, firstQiTx, header, batch, p.hc.headerDb, gp, usedGas, p.hc.pool.signer, p.hc.NodeLocation(), *p.config.ChainID, qiScalingFactor, &etxRLimit, &etxPLimit, utxosCreatedDeleted) if err != nil { - return nil, nil, nil, nil, 0, 0, 0, nil, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err) + return nil, nil, nil, nil, 0, 0, 0, nil, nil, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err) } firstQiTx = false startEtxAppend := time.Now() @@ -374,7 +375,7 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty qiGasPrice := new(big.Int).Div(qiTxFeeInQuai, big.NewInt(int64(types.CalculateBlockQiTxGas(tx, qiScalingFactor, p.hc.NodeLocation())))) if qiGasPrice.Cmp(minBaseFee) < 0 { - return nil, nil, nil, nil, 0, 0, 0, nil, fmt.Errorf("qi tx has base fee less than min base fee not apply tx %d [%v]", i, tx.Hash().Hex()) + return nil, nil, nil, nil, 0, 0, 0, nil, nil, fmt.Errorf("qi tx has base fee less than min base fee not apply tx %d [%v]", i, tx.Hash().Hex()) } // If the gas price from this qi tx is greater than the max base fee @@ -406,7 +407,7 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty msg, err := tx.AsMessageWithSender(types.MakeSigner(p.config, header.Number(nodeCtx)), header.BaseFee(), senders[tx.Hash()]) if err != nil { - return nil, nil, nil, nil, 0, 0, 0, nil, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err) + return nil, nil, nil, nil, 0, 0, 0, nil, nil, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err) } timeSignDelta := time.Since(startProcess) timeSign += timeSignDelta @@ -424,13 +425,13 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty // ETXs MUST be included in order, so popping the first from the queue must equal the first in the block etx, err := statedb.PopETX() if err != nil { - return nil, nil, nil, nil, 0, 0, 0, nil, fmt.Errorf("could not pop etx from statedb: %w", err) + return nil, nil, nil, nil, 0, 0, 0, nil, nil, fmt.Errorf("could not pop etx from statedb: %w", err) } if etx == nil { - return nil, nil, nil, nil, 0, 0, 0, nil, fmt.Errorf("etx %x is nil", tx.Hash()) + return nil, nil, nil, nil, 0, 0, 0, nil, nil, fmt.Errorf("etx %x is nil", tx.Hash()) } if etx.Hash() != tx.Hash() { - return nil, nil, nil, nil, 0, 0, 0, nil, fmt.Errorf("invalid external transaction: etx %x is not in order or not found in unspent etx set", tx.Hash()) + return nil, nil, nil, nil, 0, 0, 0, nil, nil, fmt.Errorf("invalid external transaction: etx %x is not in order or not found in unspent etx set", tx.Hash()) } // check if the tx is a coinbase tx @@ -441,22 +442,22 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty // 4) etx emit threshold numbers if types.IsCoinBaseTx(tx) { if tx.To() == nil { - return nil, nil, nil, nil, 0, 0, 0, nil, fmt.Errorf("coinbase tx %x has no recipient", tx.Hash()) + return nil, nil, nil, nil, 0, 0, 0, nil, nil, fmt.Errorf("coinbase tx %x has no recipient", tx.Hash()) } if len(tx.Data()) == 0 { - return nil, nil, nil, nil, 0, 0, 0, nil, fmt.Errorf("coinbase tx %x has no lockup byte", tx.Hash()) + return nil, nil, nil, nil, 0, 0, 0, nil, nil, fmt.Errorf("coinbase tx %x has no lockup byte", tx.Hash()) } if _, err := tx.To().InternalAddress(); err != nil { - return nil, nil, nil, nil, 0, 0, 0, nil, fmt.Errorf("coinbase tx %x has invalid recipient: %w", tx.Hash(), err) + return nil, nil, nil, nil, 0, 0, 0, nil, nil, fmt.Errorf("coinbase tx %x has invalid recipient: %w", tx.Hash(), err) } lockupByte := tx.Data()[0] if tx.To().IsInQiLedgerScope() { if int(lockupByte) > len(params.LockupByteToBlockDepth) { - return nil, nil, nil, nil, 0, 0, 0, nil, fmt.Errorf("coinbase lockup byte %d is out of range", lockupByte) + return nil, nil, nil, nil, 0, 0, 0, nil, nil, fmt.Errorf("coinbase lockup byte %d is out of range", lockupByte) } lockup := new(big.Int).SetUint64(params.LockupByteToBlockDepth[lockupByte]) if lockup.Uint64() < params.ConversionLockPeriod { - return nil, nil, nil, nil, 0, 0, 0, nil, fmt.Errorf("coinbase lockup period is less than the minimum lockup period of %d blocks", params.ConversionLockPeriod) + return nil, nil, nil, nil, 0, 0, 0, nil, nil, fmt.Errorf("coinbase lockup period is less than the minimum lockup period of %d blocks", params.ConversionLockPeriod) } value := params.CalculateCoinbaseValueWithLockup(tx.Value(), lockupByte) denominations := misc.FindMinDenominations(value) @@ -475,7 +476,7 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty utxo := types.NewUtxoEntry(types.NewTxOut(uint8(denomination), tx.To().Bytes(), lockup)) // the ETX hash is guaranteed to be unique if err := rawdb.CreateUTXO(batch, etx.Hash(), outputIndex, utxo); err != nil { - return nil, nil, nil, nil, 0, 0, 0, nil, err + return nil, nil, nil, nil, 0, 0, 0, nil, nil, err } utxosCreatedDeleted.UtxosCreatedHashes = append(utxosCreatedDeleted.UtxosCreatedHashes, types.UTXOHash(etx.Hash(), outputIndex, utxo)) utxosCreatedDeleted.UtxosCreatedKeys = append(utxosCreatedDeleted.UtxosCreatedKeys, rawdb.UtxoKeyWithDenomination(etx.Hash(), outputIndex, utxo.Denomination)) @@ -487,7 +488,7 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty if block.NumberU64(common.ZONE_CTX) > params.TimeToStartTx { // subtract the minimum tx gas from the gas pool if err := gp.SubGas(params.TxGas); err != nil { - return nil, nil, nil, nil, 0, 0, 0, nil, err + return nil, nil, nil, nil, 0, 0, 0, nil, nil, err } *usedGas += params.TxGas totalEtxGas += params.TxGas @@ -506,7 +507,7 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty } txGas -= params.TxGas if err := gp.SubGas(params.TxGas); err != nil { - return nil, nil, nil, nil, 0, 0, 0, nil, err + return nil, nil, nil, nil, 0, 0, 0, nil, nil, err } *usedGas += params.TxGas totalEtxGas += params.TxGas @@ -525,14 +526,14 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty } txGas -= params.CallValueTransferGas if err := gp.SubGas(params.CallValueTransferGas); err != nil { - return nil, nil, nil, nil, 0, 0, 0, nil, err + return nil, nil, nil, nil, 0, 0, 0, nil, nil, err } *usedGas += params.CallValueTransferGas // In the future we may want to determine what a fair gas cost is totalEtxGas += params.CallValueTransferGas // In the future we may want to determine what a fair gas cost is utxo := types.NewUtxoEntry(types.NewTxOut(uint8(denomination), etx.To().Bytes(), lock)) // the ETX hash is guaranteed to be unique if err := rawdb.CreateUTXO(batch, etx.Hash(), outputIndex, utxo); err != nil { - return nil, nil, nil, nil, 0, 0, 0, nil, err + return nil, nil, nil, nil, 0, 0, 0, nil, nil, err } utxosCreatedDeleted.UtxosCreatedHashes = append(utxosCreatedDeleted.UtxosCreatedHashes, types.UTXOHash(etx.Hash(), outputIndex, utxo)) utxosCreatedDeleted.UtxosCreatedKeys = append(utxosCreatedDeleted.UtxosCreatedKeys, rawdb.UtxoKeyWithDenomination(etx.Hash(), outputIndex, utxo.Denomination)) @@ -544,13 +545,13 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty utxo := types.NewUtxoEntry(types.NewTxOut(uint8(etx.Value().Uint64()), etx.To().Bytes(), big.NewInt(0))) // There are no more checks to be made as the ETX is worked so add it to the set if err := rawdb.CreateUTXO(batch, etx.OriginatingTxHash(), etx.ETXIndex(), utxo); err != nil { - return nil, nil, nil, nil, 0, 0, 0, nil, err + return nil, nil, nil, nil, 0, 0, 0, nil, nil, err } utxosCreatedDeleted.UtxosCreatedHashes = append(utxosCreatedDeleted.UtxosCreatedHashes, types.UTXOHash(etx.OriginatingTxHash(), etx.ETXIndex(), utxo)) utxosCreatedDeleted.UtxosCreatedKeys = append(utxosCreatedDeleted.UtxosCreatedKeys, rawdb.UtxoKeyWithDenomination(etx.OriginatingTxHash(), etx.ETXIndex(), utxo.Denomination)) // This Qi ETX should cost more gas if err := gp.SubGas(params.CallValueTransferGas); err != nil { - return nil, nil, nil, nil, 0, 0, 0, nil, err + return nil, nil, nil, nil, 0, 0, 0, nil, nil, err } *usedGas += params.CallValueTransferGas // In the future we may want to determine what a fair gas cost is totalEtxGas += params.CallValueTransferGas // In the future we may want to determine what a fair gas cost is @@ -562,7 +563,7 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty if types.IsConversionTx(etx) && etx.To().IsInQuaiLedgerScope() { // Qi->Quai Conversion // subtract the minimum tx gas from the gas pool if err := gp.SubGas(params.QiToQuaiConversionGas); err != nil { - return nil, nil, nil, nil, 0, 0, 0, nil, err + return nil, nil, nil, nil, 0, 0, 0, nil, nil, err } *usedGas += params.QiToQuaiConversionGas totalEtxGas += params.QiToQuaiConversionGas @@ -573,7 +574,7 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty receipt, fees, err = applyTransaction(msg, parent, p.config, p.hc, gp, statedb, blockNumber, blockHash, etx, usedGas, usedState, vmenv, &etxRLimit, &etxPLimit, p.logger) statedb.SetBalance(common.ZeroInternal(nodeLocation), prevZeroBal) // Reset the balance to what it previously was. Residual balance will be lost if err != nil { - return nil, nil, nil, nil, 0, 0, 0, nil, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err) + return nil, nil, nil, nil, 0, 0, 0, nil, nil, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err) } addReceipt = true @@ -589,7 +590,7 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty fees := big.NewInt(0) receipt, fees, err = applyTransaction(msg, parent, p.config, p.hc, gp, statedb, blockNumber, blockHash, tx, usedGas, usedState, vmenv, &etxRLimit, &etxPLimit, p.logger) if err != nil { - return nil, nil, nil, nil, 0, 0, 0, nil, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err) + return nil, nil, nil, nil, 0, 0, 0, nil, nil, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err) } addReceipt = true timeTxDelta := time.Since(startTimeTx) @@ -600,7 +601,7 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty gasPrice := tx.GasPrice() if gasPrice.Cmp(minBaseFee) < 0 { - return nil, nil, nil, nil, 0, 0, 0, nil, fmt.Errorf("quai tx has gas price less than min base fee not apply tx %d [%v]", i, tx.Hash().Hex()) + return nil, nil, nil, nil, 0, 0, 0, nil, nil, fmt.Errorf("quai tx has gas price less than min base fee not apply tx %d [%v]", i, tx.Hash().Hex()) } // If the gas price from this quai tx is greater than the max base fee @@ -620,7 +621,7 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty } } else { - return nil, nil, nil, nil, 0, 0, 0, nil, ErrTxTypeNotSupported + return nil, nil, nil, nil, 0, 0, 0, nil, nil, ErrTxTypeNotSupported } for _, etx := range receipt.OutboundEtxs { if receipt.Status == types.ReceiptStatusSuccessful { @@ -635,42 +636,42 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty } if nonEtxExists && block.BaseFee().Cmp(big.NewInt(0)) == 0 { - return nil, nil, nil, nil, 0, 0, 0, nil, fmt.Errorf("block base fee is nil though non etx transactions exist") + return nil, nil, nil, nil, 0, 0, 0, nil, nil, fmt.Errorf("block base fee is nil though non etx transactions exist") } if minGasPrice != nil && block.BaseFee().Cmp(minGasPrice) != 0 { - return nil, nil, nil, nil, 0, 0, 0, nil, fmt.Errorf("invalid base fee used (remote: %d local: %d)", block.BaseFee(), minGasPrice) + return nil, nil, nil, nil, 0, 0, 0, nil, nil, fmt.Errorf("invalid base fee used (remote: %d local: %d)", block.BaseFee(), minGasPrice) } etxAvailable := false oldestIndex, err := statedb.GetOldestIndex() if err != nil { - return nil, nil, nil, nil, 0, 0, 0, nil, fmt.Errorf("could not get oldest index: %w", err) + return nil, nil, nil, nil, 0, 0, 0, nil, nil, fmt.Errorf("could not get oldest index: %w", err) } // Check if there is at least one ETX in the set etx, err := statedb.ReadETX(oldestIndex) if err != nil { - return nil, nil, nil, nil, 0, 0, 0, nil, fmt.Errorf("could not read etx: %w", err) + return nil, nil, nil, nil, 0, 0, 0, nil, nil, fmt.Errorf("could not read etx: %w", err) } if etx != nil { etxAvailable = true } if block.NumberU64(common.ZONE_CTX) <= params.TimeToStartTx && (etxAvailable && etxCount < minimumEtxCount || etxCount > maximumEtxCount) { - return nil, nil, nil, nil, 0, 0, 0, nil, fmt.Errorf("total number of ETXs %d is not within the range %d to %d", etxCount, minimumEtxCount, maximumEtxCount) + return nil, nil, nil, nil, 0, 0, 0, nil, nil, fmt.Errorf("total number of ETXs %d is not within the range %d to %d", etxCount, minimumEtxCount, maximumEtxCount) } if block.NumberU64(common.ZONE_CTX) > params.TimeToStartTx && (etxAvailable && totalEtxGas < minimumEtxGas) || totalEtxGas > maximumEtxGas { p.logger.Errorf("prevInboundEtxs: %d, oldestIndex: %d, etxHash: %s", len(prevInboundEtxs), oldestIndex.Int64(), etx.Hash().Hex()) - return nil, nil, nil, nil, 0, 0, 0, nil, fmt.Errorf("total gas used by ETXs %d is not within the range %d to %d", totalEtxGas, minimumEtxGas, maximumEtxGas) + return nil, nil, nil, nil, 0, 0, 0, nil, nil, fmt.Errorf("total gas used by ETXs %d is not within the range %d to %d", totalEtxGas, minimumEtxGas, maximumEtxGas) } quaiCoinbase, err := block.QuaiCoinbase() if err != nil { - return nil, nil, nil, nil, 0, 0, 0, nil, err + return nil, nil, nil, nil, 0, 0, 0, nil, nil, err } qiCoinbase, err := block.QiCoinbase() if err != nil { - return nil, nil, nil, nil, 0, 0, 0, nil, err + return nil, nil, nil, nil, 0, 0, 0, nil, nil, err } primaryCoinbase := block.PrimaryCoinbase() @@ -713,14 +714,14 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty updatedTokenChoiceSet, err := CalculateTokenChoicesSet(p.hc, parent, emittedEtxs) if err != nil { - return nil, nil, nil, nil, 0, 0, 0, nil, err + return nil, nil, nil, nil, 0, 0, 0, nil, nil, err } var exchangeRate *big.Int var beta0, beta1 *big.Float if parent.NumberU64(common.ZONE_CTX) > params.ControllerKickInBlock { exchangeRate, beta0, beta1, err = CalculateExchangeRate(p.hc, parent, updatedTokenChoiceSet) if err != nil { - return nil, nil, nil, nil, 0, 0, 0, nil, err + return nil, nil, nil, nil, 0, 0, 0, nil, nil, err } } else { exchangeRate = parent.ExchangeRate() @@ -730,14 +731,14 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty } err = rawdb.WriteTokenChoicesSet(batch, block.Hash(), &updatedTokenChoiceSet) if err != nil { - return nil, nil, nil, nil, 0, 0, 0, nil, err + return nil, nil, nil, nil, 0, 0, 0, nil, nil, err } err = rawdb.WriteBetas(batch, block.Hash(), beta0, beta1) if err != nil { - return nil, nil, nil, nil, 0, 0, 0, nil, err + return nil, nil, nil, nil, 0, 0, 0, nil, nil, err } if block.ExchangeRate().Cmp(exchangeRate) != 0 { - return nil, nil, nil, nil, 0, 0, 0, nil, fmt.Errorf("invalid exchange rate used (remote: %d local: %d)", block.ExchangeRate(), exchangeRate) + return nil, nil, nil, nil, 0, 0, 0, nil, nil, fmt.Errorf("invalid exchange rate used (remote: %d local: %d)", block.ExchangeRate(), exchangeRate) } for _, etx := range emittedEtxs { // If the etx is conversion @@ -759,7 +760,7 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty // Finalize the block, applying any consensus engine specific extras (e.g. block rewards) multiSet, utxoSetSize, err := p.engine.Finalize(p.hc, batch, block, statedb, false, parentUtxoSetSize, utxosCreatedDeleted.UtxosCreatedHashes, utxosCreatedDeleted.UtxosDeletedHashes) if err != nil { - return nil, nil, nil, nil, 0, 0, 0, nil, err + return nil, nil, nil, nil, 0, 0, 0, nil, nil, err } time5 := common.PrettyDuration(time.Since(start)) @@ -800,18 +801,19 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty "numTxs": len(block.Transactions()), }).Info("Total Tx Processing Time") if err := rawdb.WriteSpentUTXOs(batch, blockHash, utxosCreatedDeleted.UtxosDeleted); err != nil { // Could do this in Apply instead - return nil, nil, nil, nil, 0, 0, 0, nil, err + return nil, nil, nil, nil, 0, 0, 0, nil, nil, err } if err := rawdb.WriteCreatedUTXOKeys(batch, blockHash, utxosCreatedDeleted.UtxosCreatedKeys); err != nil { // Could do this in Apply instead - return nil, nil, nil, nil, 0, 0, 0, nil, err + return nil, nil, nil, nil, 0, 0, 0, nil, nil, err } - return receipts, emittedEtxs, allLogs, statedb, *usedGas, *usedState, utxoSetSize, multiSet, nil + return receipts, emittedEtxs, allLogs, statedb, *usedGas, *usedState, utxoSetSize, multiSet, unlocks, nil } // RedeemLockedQuai redeems any locked Quai for coinbase addresses at specific block depths. // It processes blocks based on predefined lockup periods and checks for unlockable Quai. // This function is intended to be run as part of the block processing. -func RedeemLockedQuai(hc *HeaderChain, header *types.WorkObject, parent *types.WorkObject, statedb *state.StateDB) error { +// Returns the list of unlocked coinbases +func RedeemLockedQuai(hc *HeaderChain, header *types.WorkObject, parent *types.WorkObject, statedb *state.StateDB) (error, []common.Unlock) { // Array of specific block depths for which we will redeem the Quai blockDepths := []uint64{ params.ConversionLockPeriod, @@ -822,6 +824,7 @@ func RedeemLockedQuai(hc *HeaderChain, header *types.WorkObject, parent *types.W } currentBlockHeight := header.Number(hc.NodeCtx()).Uint64() + unlocks := []common.Unlock{} // Loop through the predefined block depths for _, blockDepth := range blockDepths { @@ -837,7 +840,7 @@ func RedeemLockedQuai(hc *HeaderChain, header *types.WorkObject, parent *types.W // Fetch the block at the calculated target height targetBlock := hc.GetBlockByNumber(targetBlockHeight) if targetBlock == nil { - return fmt.Errorf("block at height %d not found", targetBlockHeight) + return fmt.Errorf("block at height %d not found", targetBlockHeight), nil } for _, etx := range targetBlock.Body().ExternalTransactions() { @@ -846,7 +849,7 @@ func RedeemLockedQuai(hc *HeaderChain, header *types.WorkObject, parent *types.W // Redeem all unlocked Quai for the coinbase address internal, err := etx.To().InternalAddress() if err != nil { - return fmt.Errorf("error converting address to internal address: %v", err) + return fmt.Errorf("error converting address to internal address: %v", err), nil } lockupByte := etx.Data()[0] @@ -868,6 +871,10 @@ func RedeemLockedQuai(hc *HeaderChain, header *types.WorkObject, parent *types.W } hc.logger.Debugf("Redeeming %s locked Quai for %s at block depth %d", balance.String(), internal.Hex(), blockDepth) statedb.AddBalance(internal, balance) + unlocks = append(unlocks, common.Unlock{ + Addr: internal, + Amt: balance, + }) } } @@ -891,10 +898,14 @@ func RedeemLockedQuai(hc *HeaderChain, header *types.WorkObject, parent *types.W } hc.logger.Debugf("Redeeming %s converted Quai for %s at block depth %d", balance.String(), internal.Hex(), blockDepth) statedb.AddBalance(internal, balance) + unlocks = append(unlocks, common.Unlock{ + Addr: internal, + Amt: balance, + }) } } } - return nil + return nil, unlocks } func applyTransaction(msg types.Message, parent *types.WorkObject, config *params.ChainConfig, bc ChainContext, gp *types.GasPool, statedb *state.StateDB, blockNumber *big.Int, blockHash common.Hash, tx *types.Transaction, usedGas *uint64, usedState *uint64, evm *vm.EVM, etxRLimit, etxPLimit *int, logger *log.Logger) (*types.Receipt, *big.Int, error) { @@ -1454,7 +1465,7 @@ func CheckDenominations(inputs, outputs map[uint]uint64) error { } // Apply State -func (p *StateProcessor) Apply(batch ethdb.Batch, block *types.WorkObject) ([]*types.Log, error) { +func (p *StateProcessor) Apply(batch ethdb.Batch, block *types.WorkObject) ([]*types.Log, []common.Unlock, error) { nodeCtx := p.hc.NodeCtx() start := time.Now() blockHash := block.Hash() @@ -1463,15 +1474,15 @@ func (p *StateProcessor) Apply(batch ethdb.Batch, block *types.WorkObject) ([]*t if p.hc.IsGenesisHash(block.ParentHash(nodeCtx)) { parent := p.hc.GetHeaderByHash(parentHash) if parent == nil { - return nil, errors.New("failed to load parent block") + return nil, nil, errors.New("failed to load parent block") } } time1 := common.PrettyDuration(time.Since(start)) time2 := common.PrettyDuration(time.Since(start)) // Process our block - receipts, etxs, logs, statedb, usedGas, usedState, utxoSetSize, multiSet, err := p.Process(block, batch) + receipts, etxs, logs, statedb, usedGas, usedState, utxoSetSize, multiSet, unlocks, err := p.Process(block, batch) if err != nil { - return nil, err + return nil, nil, err } if block.Hash() != blockHash { p.logger.WithFields(log.Fields{ @@ -1482,7 +1493,7 @@ func (p *StateProcessor) Apply(batch ethdb.Batch, block *types.WorkObject) ([]*t time3 := common.PrettyDuration(time.Since(start)) err = p.validator.ValidateState(block, statedb, receipts, etxs, multiSet, usedGas, usedState) if err != nil { - return nil, err + return nil, nil, err } time4 := common.PrettyDuration(time.Since(start)) rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(nodeCtx), receipts) @@ -1496,20 +1507,20 @@ func (p *StateProcessor) Apply(batch ethdb.Batch, block *types.WorkObject) ([]*t // Commit all cached state changes into underlying memory database. root, err := statedb.Commit(true) if err != nil { - return nil, err + return nil, nil, err } etxRoot, err := statedb.CommitEtxs() if err != nil { - return nil, err + return nil, nil, err } time7 := common.PrettyDuration(time.Since(start)) var time8 common.PrettyDuration if err := p.stateCache.TrieDB().Commit(root, false, nil); err != nil { - return nil, err + return nil, nil, err } if err := p.etxCache.TrieDB().Commit(etxRoot, false, nil); err != nil { - return nil, err + return nil, nil, err } time8 = common.PrettyDuration(time.Since(start)) @@ -1528,7 +1539,7 @@ func (p *StateProcessor) Apply(batch ethdb.Batch, block *types.WorkObject) ([]*t rawdb.WriteUTXOSetSize(batch, block.Hash(), utxoSetSize) // Indicate that we have processed the state of the block rawdb.WriteProcessedState(batch, block.Hash()) - return logs, nil + return logs, unlocks, nil } // ApplyTransaction attempts to apply a transaction to the given state database @@ -1755,7 +1766,7 @@ func (p *StateProcessor) StateAtBlock(block *types.WorkObject, reexec uint64, ba if currentBlock == nil { return nil, errors.New("detached block found trying to regenerate state") } - _, _, _, _, _, _, _, _, err := p.Process(currentBlock, batch) + _, _, _, _, _, _, _, _, _, err := p.Process(currentBlock, batch) if err != nil { return nil, fmt.Errorf("processing block %d failed: %v", current.NumberU64(nodeCtx), err) } diff --git a/core/worker.go b/core/worker.go index 6c4cbc8cf5..d3e240d2fc 100644 --- a/core/worker.go +++ b/core/worker.go @@ -1611,7 +1611,7 @@ func (w *worker) prepareWork(genParams *generateParams, wo *types.WorkObject) (* return nil, err } - if err := RedeemLockedQuai(w.hc, proposedWo, parent, env.state); err != nil { + if err, _ := RedeemLockedQuai(w.hc, proposedWo, parent, env.state); err != nil { w.logger.WithField("err", err).Error("Failed to redeem locked Quai") return nil, err } diff --git a/quai/api_backend.go b/quai/api_backend.go index e1dcd4548c..1e4623df09 100644 --- a/quai/api_backend.go +++ b/quai/api_backend.go @@ -296,6 +296,14 @@ func (b *QuaiAPIBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event return b.quai.core.SubscribePendingLogs(ch) } +func (b *QuaiAPIBackend) SubscribeUnlocksEvent(ch chan<- core.UnlocksEvent) event.Subscription { + nodeCtx := b.quai.core.NodeCtx() + if nodeCtx != common.ZONE_CTX { + return nil + } + return b.quai.core.SubscribeUnlocks(ch) +} + func (b *QuaiAPIBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { return b.quai.Core().SubscribeChainEvent(ch) } diff --git a/quai/filters/api.go b/quai/filters/api.go index 29ae4890ba..9b5e69ad2f 100644 --- a/quai/filters/api.go +++ b/quai/filters/api.go @@ -29,6 +29,7 @@ import ( quai "github.com/dominant-strategies/go-quai" "github.com/dominant-strategies/go-quai/common" "github.com/dominant-strategies/go-quai/common/hexutil" + "github.com/dominant-strategies/go-quai/core" "github.com/dominant-strategies/go-quai/core/types" "github.com/dominant-strategies/go-quai/ethdb" "github.com/dominant-strategies/go-quai/event" @@ -317,6 +318,10 @@ func (api *PublicFilterAPI) Accesses(ctx context.Context, addr common.Address) ( } rpcSub := notifier.CreateSubscription() + internalAddr, err := addr.InternalAddress() + if err != nil { + return nil, err + } go func() { defer func() { @@ -331,7 +336,8 @@ func (api *PublicFilterAPI) Accesses(ctx context.Context, addr common.Address) ( api.activeSubscriptions += 1 headers := make(chan *types.WorkObject) headersSub := api.events.SubscribeNewHeads(headers) - + unlocks := make(chan core.UnlocksEvent) + unlocksSub := api.events.SubscribeUnlocks(unlocks) for { select { case h := <-headers: @@ -339,7 +345,7 @@ func (api *PublicFilterAPI) Accesses(ctx context.Context, addr common.Address) ( hash := h.Hash() nodeLocation := api.backend.NodeLocation() nodeCtx := nodeLocation.Context() - if block, err := api.backend.GetBlock(h.Hash(), h.NumberU64(nodeCtx)); err != nil { + if block, err := api.backend.GetBlock(hash, h.NumberU64(nodeCtx)); err != nil { for _, tx := range block.Transactions() { // Check for external accesses if tx.To() == &addr || tx.From(nodeLocation) == &addr { @@ -355,11 +361,20 @@ func (api *PublicFilterAPI) Accesses(ctx context.Context, addr common.Address) ( } } } + case u := <-unlocks: + for _, unlock := range u.Unlocks { + if unlock.Addr == internalAddr { + notifier.Notify(rpcSub.ID, u.Hash) + break + } + } case <-rpcSub.Err(): headersSub.Unsubscribe() + unlocksSub.Unsubscribe() return case <-notifier.Closed(): headersSub.Unsubscribe() + unlocksSub.Unsubscribe() return } } diff --git a/quai/filters/filter.go b/quai/filters/filter.go index 82acd476b1..c47ae9a49b 100644 --- a/quai/filters/filter.go +++ b/quai/filters/filter.go @@ -44,6 +44,7 @@ type Backend interface { SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription SubscribePendingHeaderEvent(ch chan<- *types.WorkObject) event.Subscription + SubscribeUnlocksEvent(ch chan<- core.UnlocksEvent) event.Subscription ProcessingState() bool NodeLocation() common.Location NodeCtx() int diff --git a/quai/filters/filter_system.go b/quai/filters/filter_system.go index 65d28817a3..b45bab424f 100644 --- a/quai/filters/filter_system.go +++ b/quai/filters/filter_system.go @@ -51,6 +51,8 @@ const ( PendingTransactionsSubscription // BlocksSubscription queries hashes for blocks that are imported BlocksSubscription + // UnlocksSubscription queries balances that are recently unlocked + UnlocksSubscription // LastSubscription keeps track of the last index LastIndexSubscription ) @@ -64,7 +66,8 @@ const ( // logsChanSize is the size of channel listening to LogsEvent. logsChanSize = 10 // chainEvChanSize is the size of channel listening to ChainEvent. - chainEvChanSize = 10 + chainEvChanSize = 10 + unlocksEvChanSize = 10 ) type subscription struct { @@ -75,6 +78,7 @@ type subscription struct { logs chan []*types.Log hashes chan []common.Hash headers chan *types.WorkObject + unlocks chan core.UnlocksEvent header chan *types.WorkObject installed chan struct{} // closed when the filter is installed err chan error // closed when the filter is uninstalled @@ -92,6 +96,7 @@ type EventSystem struct { rmLogsSub event.Subscription // Subscription for removed log event pendingLogsSub event.Subscription // Subscription for pending log event chainSub event.Subscription // Subscription for new chain event + unlocksSub event.Subscription // Subscription for new unlocks event // Channels install chan *subscription // install filter for event notification @@ -101,6 +106,7 @@ type EventSystem struct { pendingLogsCh chan []*types.Log // Channel to receive new log event rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event chainCh chan core.ChainEvent // Channel to receive new chain event + unlocksCh chan core.UnlocksEvent // Channel to receive newly unlocked coinbases } // NewEventSystem creates a new manager that listens for event on the given mux, @@ -119,6 +125,7 @@ func NewEventSystem(backend Backend) *EventSystem { rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), pendingLogsCh: make(chan []*types.Log, logsChanSize), chainCh: make(chan core.ChainEvent, chainEvChanSize), + unlocksCh: make(chan core.UnlocksEvent, unlocksEvChanSize), } nodeCtx := backend.NodeCtx() @@ -127,12 +134,13 @@ func NewEventSystem(backend Backend) *EventSystem { m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh) m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh) + m.unlocksSub = m.backend.SubscribeUnlocksEvent(m.unlocksCh) } m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) // Make sure none of the subscriptions are empty if nodeCtx == common.ZONE_CTX && backend.ProcessingState() { - if m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil { + if m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil || m.unlocksSub == nil { backend.Logger().Fatal("Subscribe for event system failed") } } else { @@ -173,6 +181,7 @@ func (sub *Subscription) Unsubscribe() { case <-sub.f.logs: case <-sub.f.hashes: case <-sub.f.headers: + case <-sub.f.unlocks: } } @@ -296,6 +305,21 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.WorkObject) *Subscr return es.subscribe(sub) } +// SubscribeUnlocks creates a subscription that writes the recently unlocked balances +func (es *EventSystem) SubscribeUnlocks(unlocks chan core.UnlocksEvent) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: UnlocksSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + hashes: make(chan []common.Hash), + unlocks: unlocks, + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} + // SubscribePendingTxs creates a subscription that writes transaction hashes for // transactions that enter the transaction pool. func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription { @@ -384,6 +408,12 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) } } +func (es *EventSystem) handleUnlocksEvent(filters filterIndex, ev core.UnlocksEvent) { + for _, f := range filters[UnlocksSubscription] { + f.unlocks <- ev + } +} + // eventLoop (un)installs filters and processes mux events. func (es *EventSystem) eventLoop() { defer func() { @@ -401,6 +431,7 @@ func (es *EventSystem) eventLoop() { es.logsSub.Unsubscribe() es.rmLogsSub.Unsubscribe() es.pendingLogsSub.Unsubscribe() + es.unlocksSub.Unsubscribe() } es.chainSub.Unsubscribe() @@ -457,6 +488,8 @@ func (es *EventSystem) handleZoneEventLoop(index filterIndex) { es.handleRemovedLogs(index, ev) case ev := <-es.pendingLogsCh: es.handlePendingLogs(index, ev) + case ev := <-es.unlocksCh: + es.handleUnlocksEvent(index, ev) case f := <-es.install: if f.typ == MinedAndPendingLogsSubscription { // the type are logs and pending logs subscriptions diff --git a/quai/filters/filter_system_test.go b/quai/filters/filter_system_test.go index cf490b9306..7afb91f940 100644 --- a/quai/filters/filter_system_test.go +++ b/quai/filters/filter_system_test.go @@ -53,6 +53,7 @@ type testBackend struct { pendingLogsFeed event.Feed chainFeed event.Feed pendingHeaderFeed event.Feed + unlocksFeed event.Feed } func (b *testBackend) ChainDb() ethdb.Database { @@ -182,6 +183,10 @@ func (b *testBackend) SubscribePendingHeaderEvent(ch chan<- *types.WorkObject) e return b.pendingHeaderFeed.Subscribe(ch) } +func (b *testBackend) SubscribeUnlocksEvent(ch chan<- core.UnlocksEvent) event.Subscription { + return b.unlocksFeed.Subscribe(ch) +} + // TestPendingTxFilter tests whether pending tx filters retrieve all pending transactions that are posted to the event mux. func TestPendingTxFilter(t *testing.T) { t.Skip("Todo: Fix broken test")