Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfixes from garden test and cleanups for libp2p #1785

Merged
merged 11 commits into from
May 23, 2024
Merged
22 changes: 21 additions & 1 deletion cmd/go-quai/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package main

import (
"context"
"net/http"
"os"
"os/signal"
"path/filepath"
"runtime"
godebug "runtime/debug"
"sync"
"syscall"

Expand Down Expand Up @@ -75,7 +78,7 @@ func runStart(cmd *cobra.Command, args []string) error {
defer cancel()

if viper.IsSet(utils.PprofFlag.Name) {
utils.EnablePprof()
EnablePprof()
}

// create a quit channel for services to signal for a clean shutdown
Expand Down Expand Up @@ -126,3 +129,20 @@ func runStart(cmd *cobra.Command, args []string) error {
log.Global.Warn("Node is offline")
return nil
}

func EnablePprof() {
runtime.SetBlockProfileRate(1)
runtime.SetMutexProfileFraction(1)
port := "8085"
go func() {
defer func() {
if r := recover(); r != nil {
log.Global.WithFields(log.Fields{
"error": r,
"stacktrace": string(godebug.Stack()),
}).Error("Go-Quai Panicked")
}
}()
log.Global.Print(http.ListenAndServe("localhost:"+port, nil))
}()
}
2 changes: 1 addition & 1 deletion cmd/utils/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func makeFullNode(p2p quai.NetworkingAPI, nodeLocation common.Location, slicesRu
backend, _ := RegisterQuaiService(stack, p2p, cfg.Quai, cfg.Node.NodeLocation.Context(), currentExpansionNumber, startingExpansionNumber, genesisBlock, logger)
sendfullstats := viper.GetBool(SendFullStatsFlag.Name)
// Add the Quai Stats daemon if requested.
if cfg.Quaistats.URL != "" {
if cfg.Quaistats.URL != "" && backend.ProcessingState() {
RegisterQuaiStatsService(stack, backend, cfg.Quaistats.URL, sendfullstats)
}
return stack, backend
Expand Down
3 changes: 2 additions & 1 deletion consensus/blake3pow/poem.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ func (blake3pow *Blake3pow) WorkShareLogS(wo *types.WorkObject) (*big.Int, error
} else {
wsEntropy = new(big.Int).Set(blake3pow.IntrinsicLogS(powHash))
}

// Discount 2) applies to all shares regardless of the weight
wsEntropy = new(big.Int).Div(wsEntropy, new(big.Int).Exp(big.NewInt(2), big.NewInt(int64(wo.NumberU64(common.ZONE_CTX)-ws.NumberU64())), nil))
// Add the entropy into the total entropy once the discount calculation is done
totalWsEntropy.Add(totalWsEntropy, wsEntropy)
}
Expand Down
1 change: 0 additions & 1 deletion consensus/progpow/poem.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ func (progpow *Progpow) WorkShareLogS(wo *types.WorkObject) (*big.Int, error) {
}
// Discount 2) applies to all shares regardless of the weight
wsEntropy = new(big.Int).Div(wsEntropy, new(big.Int).Exp(big.NewInt(2), big.NewInt(int64(wo.NumberU64(common.ZONE_CTX)-ws.NumberU64())), nil))

// Add the entropy into the total entropy once the discount calculation is done
totalWsEntropy.Add(totalWsEntropy, wsEntropy)
}
Expand Down
37 changes: 20 additions & 17 deletions core/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,8 @@ func (sl *Slice) Append(header *types.WorkObject, domPendingHeader *types.WorkOb
// Relay the new pendingHeader
sl.relayPh(block, pendingHeaderWithTermini, domOrigin, block.Location(), subReorg)

inputs, outputs := block.InputsAndOutputsWithoutCoinbase()
net := int(outputs) - int(inputs)
time10 := common.PrettyDuration(time.Since(start))
sl.logger.WithFields(log.Fields{
"t0_1": time0_1,
Expand All @@ -447,23 +449,24 @@ func (sl *Slice) Append(header *types.WorkObject, domPendingHeader *types.WorkOb
}).Info("Times during sub append")

sl.logger.WithFields(log.Fields{
"number": block.NumberArray(),
"hash": block.Hash(),
"difficulty": block.Difficulty(),
"uncles": len(block.Uncles()),
"totalTxs": len(block.Transactions()),
"etxs emitted": len(block.ExtTransactions()),
"qiTxs": len(block.QiTransactions()),
"quaiTxs": len(block.QuaiTransactions()),
"etxs inbound": len(block.Body().ExternalTransactions()),
"gas": block.GasUsed(),
"gasLimit": block.GasLimit(),
"evmRoot": block.EVMRoot(),
"utxoRoot": block.UTXORoot(),
"etxSetRoot": block.EtxSetRoot(),
"order": order,
"location": block.Location(),
"elapsed": common.PrettyDuration(time.Since(start)),
"number": block.NumberArray(),
"hash": block.Hash(),
"difficulty": block.Difficulty(),
"uncles": len(block.Uncles()),
"totalTxs": len(block.Transactions()),
"etxs emitted": len(block.ExtTransactions()),
"qiTxs": len(block.QiTransactions()),
"net outputs-inputs": net,
"quaiTxs": len(block.QuaiTransactions()),
"etxs inbound": len(block.Body().ExternalTransactions()),
"gas": block.GasUsed(),
"gasLimit": block.GasLimit(),
"evmRoot": block.EVMRoot(),
"utxoRoot": block.UTXORoot(),
"etxSetRoot": block.EtxSetRoot(),
"order": order,
"location": block.Location(),
"elapsed": common.PrettyDuration(time.Since(start)),
}).Info("Appended new block")

if nodeCtx == common.ZONE_CTX {
Expand Down
16 changes: 16 additions & 0 deletions core/types/wo.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,22 @@ func (wo *WorkObject) QuaiTransactionsWithoutCoinbase() []*Transaction {
return quaiTxs
}

func (wo *WorkObject) InputsAndOutputsWithoutCoinbase() (uint, uint) {
inputs := 0
outputs := 0
for i, tx := range wo.Transactions() {
if i == 0 && IsCoinBaseTx(tx, wo.woHeader.parentHash, wo.woHeader.location) {
// ignore the Qi coinbase tx
continue
}
if tx.Type() == QiTxType {
inputs += len(tx.TxIn())
outputs += len(tx.TxOut())
}
}
return uint(inputs), uint(outputs)
}

func (wo *WorkObject) QuaiTransactionsWithFees() []*Transaction {
quaiTxs := make([]*Transaction, 0)
for _, t := range wo.Transactions() {
Expand Down
4 changes: 3 additions & 1 deletion internal/quaiapi/quai_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,8 +877,10 @@ func (s *PublicBlockChainQuaiAPI) GetPendingHeader(ctx context.Context) (map[str
} else if pendingHeader == nil {
return nil, errors.New("no pending header found")
}
// Only keep the Header in the body
pendingHeaderForMining := pendingHeader.WithBody(pendingHeader.Header(), nil, nil, nil, nil, nil)
// Marshal the response.
marshaledPh := pendingHeader.RPCMarshalWorkObject()
marshaledPh := pendingHeaderForMining.RPCMarshalWorkObject()
return marshaledPh, nil
}

Expand Down
26 changes: 17 additions & 9 deletions p2p/node/api.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package node

import (
"context"
"math/big"
"reflect"
"runtime/debug"
Expand All @@ -24,6 +25,8 @@ import (
"github.com/dominant-strategies/go-quai/common"
)

const requestTimeout = 10 * time.Second

// Starts the node and all of its services
func (p *P2PNode) Start() error {
log.Global.Infof("starting P2P node...")
Expand Down Expand Up @@ -116,7 +119,7 @@ func (p *P2PNode) Stop() error {
}
}

func (p *P2PNode) requestFromPeers(topic *pubsubManager.Topic, requestData interface{}, respDataType interface{}, resultChan chan interface{}) {
func (p *P2PNode) requestFromPeers(ctx context.Context, topic *pubsubManager.Topic, requestData interface{}, respDataType interface{}, resultChan chan interface{}) {
go func() {
defer func() {
if r := recover(); r != nil {
Expand All @@ -137,6 +140,7 @@ func (p *P2PNode) requestFromPeers(topic *pubsubManager.Topic, requestData inter
for peerID := range peers {
requestWg.Add(1)
go func(peerID peer.ID) {
defer requestWg.Done()
defer func() {
if r := recover(); r != nil {
log.Global.WithFields(log.Fields{
Expand All @@ -145,15 +149,14 @@ func (p *P2PNode) requestFromPeers(topic *pubsubManager.Topic, requestData inter
}).Error("Go-Quai Panicked")
}
}()
defer requestWg.Done()
p.requestAndWait(peerID, topic, requestData, respDataType, resultChan)
p.requestAndWait(ctx, peerID, topic, requestData, respDataType, resultChan)
}(peerID)
}
requestWg.Wait()
}()
}

func (p *P2PNode) requestAndWait(peerID peer.ID, topic *pubsubManager.Topic, reqData interface{}, respDataType interface{}, resultChan chan interface{}) {
func (p *P2PNode) requestAndWait(ctx context.Context, peerID peer.ID, topic *pubsubManager.Topic, reqData interface{}, respDataType interface{}, resultChan chan interface{}) {
defer func() {
if r := recover(); r != nil {
log.Global.WithFields(log.Fields{
Expand All @@ -175,6 +178,13 @@ func (p *P2PNode) requestAndWait(peerID peer.ID, topic *pubsubManager.Topic, req
select {
case resultChan <- recvd:
// Data sent successfully
case <-ctx.Done():
// Request timed out, return
log.Global.WithFields(log.Fields{
"peerId": peerID,
"message": "Request timed out, data not sent",
}).Warning("Missed data request")

default:
// Optionally log the missed send or handle it in another way
log.Global.WithFields(log.Fields{
Expand Down Expand Up @@ -206,6 +216,8 @@ func (p *P2PNode) Request(location common.Location, requestData interface{}, res
}

resultChan := make(chan interface{}, 10)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
// If it is a hash, first check to see if it is contained in the caches
if hash, ok := requestData.(common.Hash); ok {
result, ok := p.cacheGet(hash, responseDataType, location)
Expand All @@ -215,7 +227,7 @@ func (p *P2PNode) Request(location common.Location, requestData interface{}, res
}
}

p.requestFromPeers(topic, requestData, responseDataType, resultChan)
p.requestFromPeers(ctx, topic, requestData, responseDataType, resultChan)
// TODO: optimize with waitgroups or a doneChan to only query if no peers responded
// Right now this creates too many streams, so don't call this until we have a better solution
// p.queryDHT(location, requestData, responseDataType, resultChan)
Expand Down Expand Up @@ -309,10 +321,6 @@ func (p *P2PNode) GetBlockHashByNumber(number *big.Int, location common.Location
return p.consensus.LookupBlockHashByNumber(number, location)
}

func (p *P2PNode) GetHeader(hash common.Hash, location common.Location) *types.WorkObject {
return p.consensus.GetHeader(hash, location)
}

func (p *P2PNode) GetTrieNode(hash common.Hash, location common.Location) *trie.TrieNodeResponse {
return p.consensus.GetTrieNode(hash, location)
}
Expand Down
94 changes: 48 additions & 46 deletions p2p/node/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,56 +42,58 @@ func (p *P2PNode) eventLoop() {
for {
select {
case evt := <-sub.Out():
switch e := evt.(type) {
case event.EvtLocalProtocolsUpdated:
log.Global.Debugf("Event: 'Local protocols updated' - added: %+v, removed: %+v", e.Added, e.Removed)
case event.EvtLocalAddressesUpdated:
p2pAddr, err := p.p2pAddress()
if err != nil {
log.Global.Errorf("error computing p2p address: %s", err)
} else {
for _, addr := range e.Current {
addr := addr.Address.Encapsulate(p2pAddr)
log.Global.Infof("Event: 'Local address updated': %s", addr)
go func(evt interface{}) {
switch e := evt.(type) {
case event.EvtLocalProtocolsUpdated:
log.Global.Debugf("Event: 'Local protocols updated' - added: %+v, removed: %+v", e.Added, e.Removed)
case event.EvtLocalAddressesUpdated:
p2pAddr, err := p.p2pAddress()
if err != nil {
log.Global.Errorf("error computing p2p address: %s", err)
} else {
for _, addr := range e.Current {
addr := addr.Address.Encapsulate(p2pAddr)
log.Global.Infof("Event: 'Local address updated': %s", addr)
}
// log removed addresses
for _, addr := range e.Removed {
addr := addr.Address.Encapsulate(p2pAddr)
log.Global.Infof("Event: 'Local address removed': %s", addr)
}
}
// log removed addresses
for _, addr := range e.Removed {
addr := addr.Address.Encapsulate(p2pAddr)
log.Global.Infof("Event: 'Local address removed': %s", addr)
case event.EvtLocalReachabilityChanged:
log.Global.Debugf("Event: 'Local reachability changed': %+v", e.Reachability)
case event.EvtNATDeviceTypeChanged:
log.Global.Debugf("Event: 'NAT device type changed' - DeviceType %v, transport: %v", e.NatDeviceType.String(), e.TransportProtocol.String())
case event.EvtPeerProtocolsUpdated:
log.Global.Debugf("Event: 'Peer protocols updated' - added: %+v, removed: %+v, peer: %+v", e.Added, e.Removed, e.Peer)
case event.EvtPeerIdentificationCompleted:
log.Global.Debugf("Event: 'Peer identification completed' - %v", e.Peer)
case event.EvtPeerIdentificationFailed:
log.Global.Debugf("Event 'Peer identification failed' - peer: %v, reason: %v", e.Peer, e.Reason.Error())
case event.EvtPeerConnectednessChanged:
// get the peer info
peerInfo := p.peerManager.GetHost().Peerstore().PeerInfo(e.Peer)
// get the peer ID
peerID := peerInfo.ID
// get the peer protocols
peerProtocols, err := p.peerManager.GetHost().Peerstore().GetProtocols(peerID)
if err != nil {
log.Global.Errorf("error getting peer protocols: %s", err)
}
}
case event.EvtLocalReachabilityChanged:
log.Global.Debugf("Event: 'Local reachability changed': %+v", e.Reachability)
case event.EvtNATDeviceTypeChanged:
log.Global.Debugf("Event: 'NAT device type changed' - DeviceType %v, transport: %v", e.NatDeviceType.String(), e.TransportProtocol.String())
case event.EvtPeerProtocolsUpdated:
log.Global.Debugf("Event: 'Peer protocols updated' - added: %+v, removed: %+v, peer: %+v", e.Added, e.Removed, e.Peer)
case event.EvtPeerIdentificationCompleted:
log.Global.Debugf("Event: 'Peer identification completed' - %v", e.Peer)
case event.EvtPeerIdentificationFailed:
log.Global.Debugf("Event 'Peer identification failed' - peer: %v, reason: %v", e.Peer, e.Reason.Error())
case event.EvtPeerConnectednessChanged:
// get the peer info
peerInfo := p.peerManager.GetHost().Peerstore().PeerInfo(e.Peer)
// get the peer ID
peerID := peerInfo.ID
// get the peer protocols
peerProtocols, err := p.peerManager.GetHost().Peerstore().GetProtocols(peerID)
if err != nil {
log.Global.Errorf("error getting peer protocols: %s", err)
}
// get the peer addresses
peerAddresses := p.peerManager.GetHost().Peerstore().Addrs(peerID)
log.Global.Debugf("Event: 'Peer connectedness change' - Peer %s (peerInfo: %+v) is now %s, protocols: %v, addresses: %v", peerID.String(), peerInfo, e.Connectedness, peerProtocols, peerAddresses)
// get the peer addresses
peerAddresses := p.peerManager.GetHost().Peerstore().Addrs(peerID)
log.Global.Debugf("Event: 'Peer connectedness change' - Peer %s (peerInfo: %+v) is now %s, protocols: %v, addresses: %v", peerID.String(), peerInfo, e.Connectedness, peerProtocols, peerAddresses)

if e.Connectedness == network.NotConnected {
p.peerManager.RemovePeer(peerID)
if e.Connectedness == network.NotConnected {
p.peerManager.RemovePeer(peerID)
}
case *event.EvtNATDeviceTypeChanged:
log.Global.Debugf("Event `NAT device type changed` - DeviceType %v, transport: %v", e.NatDeviceType.String(), e.TransportProtocol.String())
default:
log.Global.Debugf("Received unknown event (type: %T): %+v", e, e)
}
case *event.EvtNATDeviceTypeChanged:
log.Global.Debugf("Event `NAT device type changed` - DeviceType %v, transport: %v", e.NatDeviceType.String(), e.TransportProtocol.String())
default:
log.Global.Debugf("Received unknown event (type: %T): %+v", e, e)
}
}(evt)
case <-p.ctx.Done():
log.Global.Warnf("Context cancel received. Stopping event listener")
return
Expand Down
Loading
Loading