Header -> Block Conversion + PeerDB + Topic Changes #1735

Merged 14 commits on May 20, 2024
16 changes: 11 additions & 5 deletions cmd/utils/hierarchical_coordinator.go
@@ -133,11 +133,17 @@ func (hc *HierarchicalCoordinator) startNode(logPath string, quaiBackend quai.Co
stack, apiBackend := makeFullNode(hc.p2p, location, hc.slicesRunning, hc.currentExpansionNumber, genesisBlock, logger)
quaiBackend.SetApiBackend(&apiBackend, location)

// Subscribe to the new topics after setting the api backend
hc.p2p.Subscribe(location, &types.WorkObject{})
hc.p2p.Subscribe(location, common.Hash{})
hc.p2p.Subscribe(location, &types.Transactions{})
hc.p2p.Subscribe(location, &types.WorkObjectHeader{})
hc.p2p.Subscribe(location, &types.WorkObjectHeaderView{})

if quaiBackend.ProcessingState(location) && location.Context() == common.ZONE_CTX {
// Subscribe to the new topics after setting the api backend
hc.p2p.Subscribe(location, &types.WorkObjectHeader{})
hc.p2p.Subscribe(location, &types.Transactions{})
}

if location.Context() == common.PRIME_CTX || location.Context() == common.REGION_CTX || quaiBackend.ProcessingState(location) {
hc.p2p.Subscribe(location, &types.WorkObjectBlockView{})
}

StartNode(stack)

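To make the new subscription split easier to review, here is a minimal, self-contained sketch of the branching (the `Location` stand-in, the context constants, and the string topic names are assumptions made for illustration; the real code subscribes with concrete types such as `&types.WorkObjectHeaderView{}`):

```go
// Sketch only: the Location type, context constants, and subscriber interface
// below are stand-ins for the real go-quai types, used purely to illustrate
// the branching added in startNode.
package main

import "fmt"

type Location []byte

const (
	PRIME_CTX  = 0
	REGION_CTX = 1
	ZONE_CTX   = 2
)

// Context is assumed to report the chain level from the location length:
// prime has no bytes, a region has one, a zone has two.
func (l Location) Context() int { return len(l) }

type subscriber interface {
	Subscribe(loc Location, datatype interface{}) error
}

// subscribeTopics mirrors the new logic: every chain subscribes to the
// header-view topic, state-processing zones also take headers and
// transactions, and prime/region chains (or state-processing zones) take
// the block-view topic.
func subscribeTopics(p subscriber, loc Location, processingState bool) {
	p.Subscribe(loc, "WorkObjectHeaderView")

	if processingState && loc.Context() == ZONE_CTX {
		p.Subscribe(loc, "WorkObjectHeader")
		p.Subscribe(loc, "Transactions")
	}

	if loc.Context() == PRIME_CTX || loc.Context() == REGION_CTX || processingState {
		p.Subscribe(loc, "WorkObjectBlockView")
	}
}

type logSub struct{}

func (logSub) Subscribe(loc Location, datatype interface{}) error {
	fmt.Printf("subscribe %v -> %v\n", loc, datatype)
	return nil
}

func main() {
	subscribeTopics(logSub{}, Location{0, 0}, true) // a zone running its own slice
	subscribeTopics(logSub{}, Location{}, false)    // prime
}
```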
2 changes: 1 addition & 1 deletion common/proto_common.pb.go

Some generated files are not rendered by default.

102 changes: 102 additions & 0 deletions common/types.go
@@ -62,6 +62,10 @@ var (
Zero = Address{&ZeroExternal} // For utility purposes only. It is out-of-scope for state purposes.
)

var (
ErrInvalidLocation = errors.New("invalid location")
)

// Hash represents the 32 byte Keccak256 hash of arbitrary data.
type Hash [HashLength]byte

@@ -537,6 +541,104 @@ func (l Location) MarshalJSON() ([]byte, error) {
return json.Marshal(intSlice)
}

// NewLocation verifies the inputs for region and zone and returns a valid location
func NewLocation(region, zone int) (Location, error) {
loc := Location{}
err := loc.SetRegion(region)
if err != nil {
return nil, err
}
err = loc.SetZone(zone)
if err != nil {
return nil, err
}

return loc, nil
}

func (l *Location) SetRegion(region int) error {
if region < 0 || region >= 0xf {
return ErrInvalidLocation
}
if len(*l) < 1 {
// Extend the location to include the region if it's too short
newLoc := make([]byte, 1)
*l = newLoc
}
(*l)[0] = byte(region)
return nil
}

func (l *Location) SetZone(zone int) error {
if zone < 0 || zone > 0xf {
return ErrInvalidLocation
}
if len(*l) < 2 {
// Extend the slice while preserving the first byte, if it exists
newSlice := make([]byte, 2)
if len(*l) > 0 {
newSlice[0] = (*l)[0] // Preserve existing first byte
}
*l = newSlice
}
(*l)[1] = byte(zone)
return nil
}

// regionMappings maps region names to their corresponding byte values.
var regionMappings = map[string]byte{
"cyprus": 0,
"paxos": 1,
"hydra": 2,
}

// LocationFromName parses a location name and returns a Location.
func LocationFromName(name string) (Location, error) {
if name == "" || name == "prime" {
return Location{}, nil
}

parts := strings.Fields(name)
if len(parts) == 1 {
// Check if name was provided as a string
regionIndex, err := parseRegion(parts[0])
if err != nil {
log.Global.WithField("error", err).Error("Error parsing region index")
return Location{}, err
}
return Location{byte(regionIndex)}, nil
} else if len(parts) == 2 {
// Check if name was provided as a string
regionIndex, err := parseRegion(parts[0])
if err != nil {
log.Global.WithField("error", err).Error("Error parsing region index")
return Location{}, err
}
zoneIndex, err := strconv.Atoi(parts[1])
if err != nil {
log.Global.WithField("error", err).Error("Error parsing zone index")
return nil, err
}
return Location{byte(regionIndex), byte(zoneIndex - 1)}, nil
}

return nil, fmt.Errorf("invalid location format")
}

// parseRegion attempts to parse a region from a string.
func parseRegion(part string) (byte, error) {
// Check if the part is a region name
if regionIndex, ok := regionMappings[strings.ToLower(part)]; ok {
return regionIndex, nil
}
// Otherwise, treat it as a numerical region index
regionIndex, err := strconv.Atoi(part)
if err != nil {
return 0, err
}
return byte(regionIndex), nil
}

func IsInChainScope(b []byte, nodeLocation Location) bool {
nodeCtx := nodeLocation.Context()
// IsInChainScope should only be called for a zone chain
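A short usage sketch of the new constructors added to `common/types.go`; the module import path is assumed to be the go-quai repository root, but the call signatures match the diff above (the zone in a name is 1-indexed, so "paxos 3" maps to a zone byte of 2):

```go
// Illustrative usage only; the import path is an assumption and errors are
// handled crudely for brevity.
package main

import (
	"fmt"

	"github.com/dominant-strategies/go-quai/common"
)

func main() {
	// Build a location from explicit region/zone indices.
	loc, err := common.NewLocation(1, 2)
	if err != nil {
		panic(err)
	}
	fmt.Println(loc)

	// Parse the same location from a human-readable name. The zone part of
	// the name is 1-indexed, so "paxos 3" yields a zone byte of 2.
	byName, err := common.LocationFromName("paxos 3")
	if err != nil {
		panic(err)
	}
	fmt.Println(byName)

	// Out-of-range inputs are rejected with ErrInvalidLocation.
	if _, err := common.NewLocation(16, 0); err != nil {
		fmt.Println(err)
	}
}
```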
2 changes: 1 addition & 1 deletion consensus/blake3pow/consensus.go
@@ -644,7 +644,7 @@ func (blake3pow *Blake3pow) FinalizeAndAssemble(chain consensus.ChainHeaderReade
return nil, err
}
// Header seems complete, assemble into a block and return
return types.NewWorkObject(header.WorkObjectHeader(), woBody, nil, types.BlockObject), nil
return types.NewWorkObject(header.WorkObjectHeader(), woBody, nil), nil
}

// NodeLocation returns the location of the node
2 changes: 1 addition & 1 deletion consensus/progpow/consensus.go
@@ -693,7 +693,7 @@ func (progpow *Progpow) FinalizeAndAssemble(chain consensus.ChainHeaderReader, h
return nil, err
}
// Header seems complete, assemble into a block and return
return types.NewWorkObject(header.WorkObjectHeader(), woBody, nil, types.BlockObject), nil
return types.NewWorkObject(header.WorkObjectHeader(), woBody, nil), nil
}

func (progpow *Progpow) NodeLocation() common.Location {
12 changes: 7 additions & 5 deletions core/block_validator.go
@@ -93,11 +93,13 @@ func (v *BlockValidator) ValidateBody(block *types.WorkObject) error {
if hash := types.CalcUncleHash(block.Uncles()); hash != header.UncleHash() {
return fmt.Errorf("uncle root hash mismatch: have %x, want %x", hash, header.UncleHash())
}
if hash := types.DeriveSha(block.Transactions(), trie.NewStackTrie(nil)); hash != header.TxHash() {
return fmt.Errorf("transaction root hash mismatch: have %x, want %x", hash, header.TxHash())
}
if hash := types.DeriveSha(block.ExtTransactions(), trie.NewStackTrie(nil)); hash != header.EtxHash() {
return fmt.Errorf("external transaction root hash mismatch: have %x, want %x", hash, header.EtxHash())
if v.hc.ProcessingState() {
if hash := types.DeriveSha(block.Transactions(), trie.NewStackTrie(nil)); hash != header.TxHash() {
return fmt.Errorf("transaction root hash mismatch: have %x, want %x", hash, header.TxHash())
}
if hash := types.DeriveSha(block.ExtTransactions(), trie.NewStackTrie(nil)); hash != header.EtxHash() {
return fmt.Errorf("external transaction root hash mismatch: have %x, want %x", hash, header.EtxHash())
}
}
}
return nil
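A reduced sketch of the guard added above, using stand-in types (the real check recomputes the roots with `types.DeriveSha` over the block body): nodes that do not process state for this slice skip the transaction and external-transaction root checks, while the uncle-hash check shown earlier remains outside the new guard.

```go
// Stand-in types only: the point is the ProcessingState guard around the
// root checks, not the hashing itself.
package main

import (
	"bytes"
	"fmt"
)

type header struct {
	txRoot  []byte
	etxRoot []byte
}

type block struct {
	header  header
	txRoot  []byte // root recomputed from the block's transactions
	etxRoot []byte // root recomputed from the block's external transactions
}

type validator struct {
	processingState bool // mirrors v.hc.ProcessingState()
}

func (v *validator) validateBody(b *block) error {
	// Body-derived roots are only verified on nodes that actually process
	// state for this slice; header-only nodes skip the recomputation.
	if v.processingState {
		if !bytes.Equal(b.txRoot, b.header.txRoot) {
			return fmt.Errorf("transaction root hash mismatch: have %x, want %x", b.txRoot, b.header.txRoot)
		}
		if !bytes.Equal(b.etxRoot, b.header.etxRoot) {
			return fmt.Errorf("external transaction root hash mismatch: have %x, want %x", b.etxRoot, b.header.etxRoot)
		}
	}
	return nil
}

func main() {
	bad := &block{header: header{txRoot: []byte{1}}, txRoot: []byte{2}}
	fmt.Println((&validator{processingState: false}).validateBody(bad)) // <nil>: check skipped
	fmt.Println((&validator{processingState: true}).validateBody(bad))  // mismatch error
}
```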
3 changes: 2 additions & 1 deletion core/bodydb.go
@@ -43,7 +43,8 @@ type BodyDb struct {
woCache *lru.Cache
processor *StateProcessor

slicesRunning []common.Location
slicesRunning []common.Location
processingState bool

logger *log.Logger
}
31 changes: 27 additions & 4 deletions core/headerchain.go
@@ -72,9 +72,10 @@ type HeaderChain struct {
running int32 // 0 if chain is running, 1 when stopped
procInterrupt int32 // interrupt signaler for block processing

headermu sync.RWMutex
heads []*types.WorkObject
slicesRunning []common.Location
headermu sync.RWMutex
heads []*types.WorkObject
slicesRunning []common.Location
processingState bool

logger *log.Logger

@@ -124,6 +125,9 @@ func NewHeaderChain(db ethdb.Database, engine consensus.Engine, pEtxsRollupFetch
return nil, err
}

// Record if the chain is processing state
hc.processingState = hc.setStateProcessing()

pendingEtxsRollup, _ := lru.New(c_maxPendingEtxsRollup)
hc.pendingEtxsRollup = pendingEtxsRollup

@@ -314,7 +318,26 @@ func (hc *HeaderChain) AppendHeader(header *types.WorkObject) error {
return nil
}
func (hc *HeaderChain) ProcessingState() bool {
return hc.bc.ProcessingState()
return hc.processingState
}

func (hc *HeaderChain) setStateProcessing() bool {
nodeCtx := hc.NodeCtx()
for _, slice := range hc.slicesRunning {
switch nodeCtx {
case common.PRIME_CTX:
return true
case common.REGION_CTX:
if slice.Region() == hc.NodeLocation().Region() {
return true
}
case common.ZONE_CTX:
if slice.Equal(hc.NodeLocation()) {
return true
}
}
}
return false
}

// Append
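The new setStateProcessing helper effectively implements a small decision table; here is a sketch with stand-in types (the location model and helper names are assumptions): prime always processes state, a region processes if any running slice falls inside it, and a zone processes only if its exact location is among the running slices.

```go
// Sketch only: locations are modelled as plain byte slices and the context
// constants are redeclared locally for illustration.
package main

import (
	"bytes"
	"fmt"
)

const (
	PRIME_CTX  = 0
	REGION_CTX = 1
	ZONE_CTX   = 2
)

type location []byte

func (l location) region() byte          { return l[0] }
func (l location) equal(o location) bool { return bytes.Equal(l, o) }

// processesState mirrors the shape of HeaderChain.setStateProcessing.
func processesState(nodeCtx int, nodeLoc location, slicesRunning []location) bool {
	for _, slice := range slicesRunning {
		switch nodeCtx {
		case PRIME_CTX:
			return true
		case REGION_CTX:
			if slice.region() == nodeLoc.region() {
				return true
			}
		case ZONE_CTX:
			if slice.equal(nodeLoc) {
				return true
			}
		}
	}
	return false
}

func main() {
	running := []location{{0, 0}, {0, 1}} // e.g. cyprus-1 and cyprus-2
	fmt.Println(processesState(PRIME_CTX, location{}, running))    // true
	fmt.Println(processesState(REGION_CTX, location{0}, running))  // true
	fmt.Println(processesState(REGION_CTX, location{1}, running))  // false
	fmt.Println(processesState(ZONE_CTX, location{0, 1}, running)) // true
	fmt.Println(processesState(ZONE_CTX, location{1, 0}, running)) // false
}
```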
24 changes: 12 additions & 12 deletions core/rawdb/accessors_chain.go
@@ -545,7 +545,7 @@ func DeleteTermini(db ethdb.KeyValueWriter, hash common.Hash) {
}

// ReadWorkObjectHeader retrieves the work object header stored at hash.
func ReadWorkObjectHeader(db ethdb.Reader, hash common.Hash, woType int) *types.WorkObjectHeader {
func ReadWorkObjectHeader(db ethdb.Reader, hash common.Hash, woType types.WorkObjectView) *types.WorkObjectHeader {
var key []byte
switch woType {
case types.BlockObject:
@@ -577,7 +577,7 @@ func ReadWorkObjectHeader(db ethdb.Reader, hash common.Hash, woType int) *types.
}

// WriteWorkObjectHeader writes the work object header of the terminus hash.
func WriteWorkObjectHeader(db ethdb.KeyValueWriter, hash common.Hash, workObject *types.WorkObject, woType int, nodeCtx int) {
func WriteWorkObjectHeader(db ethdb.KeyValueWriter, hash common.Hash, workObject *types.WorkObject, woType types.WorkObjectView, nodeCtx int) {
var key []byte
switch woType {
case types.BlockObject:
@@ -601,7 +601,7 @@ func WriteWorkObjectHeader(db ethdb.KeyValueWriter, hash common.Hash, workObject
}

// DeleteWorkObjectHeader deletes the work object header stored for the header hash.
func DeleteWorkObjectHeader(db ethdb.KeyValueWriter, hash common.Hash, woType int) {
func DeleteWorkObjectHeader(db ethdb.KeyValueWriter, hash common.Hash, woType types.WorkObjectView) {
var key []byte
switch woType {
case types.BlockObject:
@@ -617,7 +617,7 @@ func DeleteWorkObjectHeader(db ethdb.KeyValueWriter, hash common.Hash, woType in
}

// ReadWorkObject retrieves the work object stored at hash.
func ReadWorkObject(db ethdb.Reader, hash common.Hash, woType int) *types.WorkObject {
func ReadWorkObject(db ethdb.Reader, hash common.Hash, woType types.WorkObjectView) *types.WorkObject {
workObjectHeader := ReadWorkObjectHeader(db, hash, woType)
if workObjectHeader == nil {
return nil
@@ -626,10 +626,10 @@ func ReadWorkObject(db ethdb.Reader, hash common.Hash, woType int) *types.WorkOb
if workObjectBody == nil {
return nil
}
return types.NewWorkObject(workObjectHeader, workObjectBody, nil, woType) //TODO: mmtx transaction
return types.NewWorkObject(workObjectHeader, workObjectBody, nil) //TODO: mmtx transaction
}

func ReadWorkObjectHeaderOnly(db ethdb.Reader, hash common.Hash, woType int) *types.WorkObject {
func ReadWorkObjectHeaderOnly(db ethdb.Reader, hash common.Hash, woType types.WorkObjectView) *types.WorkObject {
workObjectHeader := ReadWorkObjectHeader(db, hash, woType)
if workObjectHeader == nil {
return nil
@@ -638,17 +638,17 @@ func ReadWorkObjectHeaderOnly(db ethdb.Reader, hash common.Hash, woType int) *ty
if workObjectBodyHeaderOnly == nil {
return nil
}
return types.NewWorkObject(workObjectHeader, workObjectBodyHeaderOnly, nil, woType)
return types.NewWorkObject(workObjectHeader, workObjectBodyHeaderOnly, nil)
}

// WriteWorkObject writes the work object of the terminus hash.
func WriteWorkObject(db ethdb.KeyValueWriter, hash common.Hash, workObject *types.WorkObject, woType int, nodeCtx int) {
func WriteWorkObject(db ethdb.KeyValueWriter, hash common.Hash, workObject *types.WorkObject, woType types.WorkObjectView, nodeCtx int) {
WriteWorkObjectBody(db, hash, workObject, woType, nodeCtx)
WriteWorkObjectHeader(db, hash, workObject, woType, nodeCtx)
}

// DeleteWorkObject deletes the work object stored for the header hash.
func DeleteWorkObject(db ethdb.KeyValueWriter, hash common.Hash, number uint64, woType int) {
func DeleteWorkObject(db ethdb.KeyValueWriter, hash common.Hash, number uint64, woType types.WorkObjectView) {
DeleteWorkObjectBody(db, hash)
DeleteWorkObjectHeader(db, hash, woType) //TODO: mmtx transaction
DeleteHeader(db, hash, number)
@@ -657,7 +657,7 @@ func DeleteWorkObject(db ethdb.KeyValueWriter, hash common.Hash, number uint64,

// DeleteWorkObjectWithoutNumber removes all block data associated with a hash, except
// the hash to number mapping.
func DeleteBlockWithoutNumber(db ethdb.KeyValueWriter, hash common.Hash, number uint64, woType int) {
func DeleteBlockWithoutNumber(db ethdb.KeyValueWriter, hash common.Hash, number uint64, woType types.WorkObjectView) {
DeleteWorkObjectBody(db, hash)
DeleteWorkObjectHeader(db, hash, woType) //TODO: mmtx transaction
DeleteReceipts(db, hash, number)
@@ -705,7 +705,7 @@ func ReadWorkObjectBodyHeaderOnly(db ethdb.Reader, hash common.Hash) *types.Work
}

// WriteWorkObjectBody writes the work object body of the terminus hash.
func WriteWorkObjectBody(db ethdb.KeyValueWriter, hash common.Hash, workObject *types.WorkObject, woType int, nodeCtx int) {
func WriteWorkObjectBody(db ethdb.KeyValueWriter, hash common.Hash, workObject *types.WorkObject, woType types.WorkObjectView, nodeCtx int) {

key := workObjectBodyKey(hash)
WriteHeaderNumber(db, hash, workObject.NumberU64(nodeCtx))
@@ -1079,7 +1079,7 @@ func ReadBadWorkObject(db ethdb.Reader, hash common.Hash) *types.WorkObject {
}
for _, bad := range *badWorkObjects {
if bad.woHeader.Hash() == hash {
return types.NewWorkObject(bad.woHeader, bad.woBody, nil, types.BlockObject)
return types.NewWorkObject(bad.woHeader, bad.woBody, nil)
}
}
return nil
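One motivation for changing woType from a bare int to types.WorkObjectView in these accessors is type safety at call sites; a small sketch of the idea (BlockObject is taken from the diff, while the second constant and the key prefixes are placeholders, not the real go-quai values):

```go
// Sketch: a named view type instead of a bare int keeps callers from passing
// arbitrary integer variables into the rawdb accessors.
package main

import "fmt"

type WorkObjectView int

const (
	BlockObject  WorkObjectView = iota
	HeaderObject                // placeholder name, for illustration only
)

// workObjectHeaderKey shows the shape of the switch used in the accessors:
// the view selects which key space the header lives under.
func workObjectHeaderKey(hash [32]byte, woType WorkObjectView) ([]byte, error) {
	switch woType {
	case BlockObject:
		return append([]byte("wb"), hash[:]...), nil // assumed prefix
	case HeaderObject:
		return append([]byte("wh"), hash[:]...), nil // assumed prefix
	default:
		return nil, fmt.Errorf("unknown work object view %d", woType)
	}
}

func main() {
	var h [32]byte
	key, err := workObjectHeaderKey(h, BlockObject)
	fmt.Println(len(key), err)

	// With the typed parameter, an int variable no longer satisfies the
	// signature without an explicit WorkObjectView(...) conversion.
	var raw int = 3
	_, err = workObjectHeaderKey(h, WorkObjectView(raw))
	fmt.Println(err) // unknown work object view 3
}
```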
2 changes: 1 addition & 1 deletion core/rawdb/db.pb.go

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions core/slice.go
@@ -1311,7 +1311,7 @@ func (sl *Slice) ConstructLocalMinedBlock(wo *types.WorkObject) (*types.WorkObje
wo.Body().SetTransactions(nil)
wo.Body().SetExtTransactions(nil)
wo.Body().SetInterlinkHashes(interlinkHashes)
pendingBlockBody = types.NewWorkObject(wo.WorkObjectHeader(), wo.Body(), nil, types.BlockObject)
pendingBlockBody = types.NewWorkObject(wo.WorkObjectHeader(), wo.Body(), nil)
}
// Load uncles because they are not included in the block response.
txs := make([]*types.Transaction, len(pendingBlockBody.Transactions()))
@@ -1340,7 +1340,7 @@ func (sl *Slice) ConstructLocalMinedBlock(wo *types.WorkObject) (*types.WorkObje
pendingBlockBody.Body().SetExtTransactions(etxs)
pendingBlockBody.Body().SetManifest(subManifest)
pendingBlockBody.Body().SetInterlinkHashes(interlinkHashes)
block := types.NewWorkObject(wo.WorkObjectHeader(), pendingBlockBody.Body(), nil, types.BlockObject)
block := types.NewWorkObject(wo.WorkObjectHeader(), pendingBlockBody.Body(), nil)

if err := sl.validator.ValidateBody(block); err != nil {
return block, err