Skip to content

Commit

Permalink
Introduce Block and Header views to optimize bandwidth usage across t…
Browse files Browse the repository at this point in the history
…opics
  • Loading branch information
Djadih committed May 7, 2024
1 parent 21e5e79 commit 7243dfc
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 23 deletions.
3 changes: 2 additions & 1 deletion cmd/utils/hierarchical_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ func (hc *HierarchicalCoordinator) startNode(logPath string, quaiBackend quai.Co
quaiBackend.SetApiBackend(&apiBackend, location)

// Subscribe to the new topics after setting the api backend
hc.p2p.Subscribe(location, &types.WorkObject{})
hc.p2p.Subscribe(location, &types.WorkObjectBlockView{})
hc.p2p.Subscribe(location, &types.WorkObjectHeaderView{})
hc.p2p.Subscribe(location, common.Hash{})
hc.p2p.Subscribe(location, &types.Transaction{})

Expand Down
29 changes: 29 additions & 0 deletions core/types/wo.go
Original file line number Diff line number Diff line change
Expand Up @@ -989,3 +989,32 @@ func (wb *WorkObjectBody) RPCMarshalWorkObjectBody() map[string]interface{} {

return result
}

////////////////////////////////////////////////////////////
///////////////////// Work Object Views ////////////////////
////////////////////////////////////////////////////////////

type WorkObjectBlockView struct {
*WorkObject
}

type WorkObjectHeaderView struct {
*WorkObject
}

////////////////////////////////////////////////////////////
////////////// View Conversion/Getter Methods //////////////
////////////////////////////////////////////////////////////

func (wo *WorkObject) ConvertToHeaderView() *WorkObjectHeaderView {
newWo := NewWorkObjectWithHeader(wo, nil, common.ZONE_CTX, HeaderObject)
return &WorkObjectHeaderView{
WorkObject: newWo,
}
}

func (wo *WorkObject) ConvertToBlockView() *WorkObjectBlockView {
return &WorkObjectBlockView{
WorkObject: wo,
}
}
2 changes: 1 addition & 1 deletion internal/quaiapi/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ type Backend interface {
Logger() *log.Logger

// P2P apis
BroadcastBlock(block *types.WorkObject, location common.Location) error
BroadcastWorkObject(*types.WorkObject, common.Location) error
}

func GetAPIs(apiBackend Backend) []rpc.API {
Expand Down
2 changes: 1 addition & 1 deletion internal/quaiapi/quai_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ func (s *PublicBlockChainQuaiAPI) ReceiveMinedHeader(ctx context.Context, raw js

// Broadcast the block and announce chain insertion event
if block.Header() != nil {
err := s.b.BroadcastBlock(block, s.b.NodeLocation())
err := s.b.BroadcastWorkObject(block, s.b.NodeLocation())
if err != nil {
s.b.Logger().WithField("err", err).Error("Error broadcasting block")
}
Expand Down
26 changes: 12 additions & 14 deletions p2p/pb/proto_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/dominant-strategies/go-quai/common"
"github.com/dominant-strategies/go-quai/core/types"
"github.com/dominant-strategies/go-quai/log"
"github.com/dominant-strategies/go-quai/trie"
)

Expand Down Expand Up @@ -190,29 +189,28 @@ func DecodeQuaiResponse(respMsg *QuaiResponseMessage) (uint32, interface{}, erro
// Converts a custom go type to a proto type and marhsals it into a protobuf message
func ConvertAndMarshal(data interface{}) ([]byte, error) {
switch data := data.(type) {
case *types.WorkObject:
log.Global.Tracef("marshalling block: %+v", data)
protoBlock, err := data.ProtoEncode(types.BlockObject)
if err != nil {
return nil, err
case *types.WorkObjectHeaderView, *types.WorkObjectBlockView:
var protoBlock *types.ProtoWorkObject
var err error
switch data := data.(type) {
case *types.WorkObjectHeaderView:
protoBlock, err = data.ProtoEncode(types.HeaderObject)
case *types.WorkObjectBlockView:
protoBlock, err = data.ProtoEncode(types.BlockObject)
default:
return nil, errors.New("unsupported data type")
}
return proto.Marshal(protoBlock)
case *types.Header:
log.Global.Tracef("marshalling header: %+v", data)
protoHeader, err := data.ProtoEncode()
if err != nil {
return nil, err
}
return proto.Marshal(protoHeader)
return proto.Marshal(protoBlock)
case *types.Transaction:
log.Global.Tracef("marshalling transaction: %+v", data)
protoTransaction, err := data.ProtoEncode()
if err != nil {
return nil, err
}
return proto.Marshal(protoTransaction)
case common.Hash:
log.Global.Tracef("marshalling hash: %+v", data)
protoHash := data.ProtoEncode()
return proto.Marshal(protoHash)
default:
Expand All @@ -223,7 +221,7 @@ func ConvertAndMarshal(data interface{}) ([]byte, error) {
// Unmarshals a protobuf message into a proto type and converts it to a custom go type
func UnmarshalAndConvert(data []byte, sourceLocation common.Location, dataPtr *interface{}, datatype interface{}) error {
switch datatype.(type) {
case *types.WorkObject:
case *types.WorkObjectHeaderView, *types.WorkObjectBlockView:
protoWorkObject := &types.ProtoWorkObject{}
err := proto.Unmarshal(data, protoWorkObject)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion p2p/pubsubManager/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ const (
func TopicName(genesis common.Hash, location common.Location, data interface{}) (string, error) {
baseTopic := strings.Join([]string{genesis.String(), location.Name()}, "/")
switch data.(type) {
case *types.WorkObject:
case *types.WorkObjectHeaderView:
return strings.Join([]string{baseTopic, C_headerType}, "/"), nil
case *types.WorkObjectBlockView:
return strings.Join([]string{baseTopic, C_workObjectType}, "/"), nil
case common.Hash:
return strings.Join([]string{baseTopic, C_hashType}, "/"), nil
Expand Down
20 changes: 15 additions & 5 deletions quai/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,9 +574,19 @@ func (b *QuaiAPIBackend) SubscribeExpansionEvent(ch chan<- core.ExpansionEvent)
return b.quai.core.SubscribeExpansionEvent(ch)
}

// ///////////////////////////
// /////// P2P ///////////////
// ///////////////////////////
func (b *QuaiAPIBackend) BroadcastBlock(block *types.WorkObject, location common.Location) error {
return b.quai.p2p.Broadcast(location, block)
// ////////////////////////////
// //////// P2P ///////////////
// ////////////////////////////
func (b *QuaiAPIBackend) BroadcastWorkObject(wo *types.WorkObject, location common.Location) error {
err := b.quai.p2p.Broadcast(location, wo.ConvertToBlockView())
if err != nil {
return err
}

err = b.quai.p2p.Broadcast(location, wo.ConvertToHeaderView())
if err != nil {
return err
}

return nil
}

0 comments on commit 7243dfc

Please sign in to comment.