Skip to content

Commit

Permalink
Merge pull request #79 from warp-contracts/janekolszak/fix-downloadin…
Browse files Browse the repository at this point in the history
…g-blocks

janekolszak/fix downloading blocks
  • Loading branch information
szynwelski authored Oct 13, 2023
2 parents d8be41d + 4b0a79a commit f188381
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 53 deletions.
33 changes: 17 additions & 16 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app

import (
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
Expand Down Expand Up @@ -634,25 +635,25 @@ func New(
app.MountMemoryStores(memKeys)

// initialize BaseApp
// anteHandler, err := sequencerante.NewAnteHandler(
// sequencerante.HandlerOptions{
// AccountKeeper: app.AccountKeeper,
// BankKeeper: app.BankKeeper,
// FeegrantKeeper: app.FeeGrantKeeper,
// SignModeHandler: encodingConfig.TxConfig.SignModeHandler(),
// SigGasConsumer: sequencerante.SigVerificationGasConsumer,
// BlockInteractions: blockInteractions,
// },
// )
// if err != nil {
// panic(fmt.Errorf("failed to create AnteHandler: %w", err))
// }

// app.SetAnteHandler(anteHandler)
anteHandler, err := sequencerante.NewAnteHandler(
sequencerante.HandlerOptions{
AccountKeeper: app.AccountKeeper,
BankKeeper: app.BankKeeper,
FeegrantKeeper: app.FeeGrantKeeper,
SignModeHandler: encodingConfig.TxConfig.SignModeHandler(),
SigGasConsumer: sequencerante.SigVerificationGasConsumer,
BlockInteractions: blockInteractions,
},
)
if err != nil {
panic(fmt.Errorf("failed to create AnteHandler: %w", err))
}

app.SetAnteHandler(anteHandler)
app.SetInitChainer(app.InitChainer)
app.SetBeginBlocker(app.BeginBlocker)
app.SetEndBlocker(app.EndBlocker)
// app.SetPrepareProposal(sequencerproposal.NewPrepareProposalHandler(&app.SequencerKeeper, app.ArweaveBlocksController, app.txConfig))
app.SetPrepareProposal(sequencerproposal.NewPrepareProposalHandler(&app.SequencerKeeper, app.ArweaveBlocksController, app.txConfig))
app.SetProcessProposal(sequencerproposal.NewProcessProposalHandler(app.txConfig, app.BlockValidator, app.Logger()))

if loadLatest {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/spf13/cobra v1.6.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
github.com/warp-contracts/syncer v0.2.41
github.com/warp-contracts/syncer v0.2.42
golang.org/x/crypto v0.13.0
google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529
google.golang.org/grpc v1.56.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1248,8 +1248,8 @@ github.com/ulikunitz/xz v0.5.11 h1:kpFauv27b6ynzBNT/Xy+1k+fK4WswhN/6PN5WhFAGw8=
github.com/ulikunitz/xz v0.5.11/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/warp-contracts/syncer v0.2.41 h1:4w6C4X4sFSl2nHkIc0ZATZh0d1/ck7xfX3Nio4dZKeI=
github.com/warp-contracts/syncer v0.2.41/go.mod h1:FuuesdBgrN56inr420w2QITI0c/9gcaUiSCbopkcbqo=
github.com/warp-contracts/syncer v0.2.42 h1:QkME9abBhaZ65COICfsOSHR8ufiofIzHe9AsZjXnMWI=
github.com/warp-contracts/syncer v0.2.42/go.mod h1:FuuesdBgrN56inr420w2QITI0c/9gcaUiSCbopkcbqo=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
25 changes: 17 additions & 8 deletions x/sequencer/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
// Controller for fetching Arweave blocks to add them to the sequencer blockchain or validate blocks added by the Proposer.
type ArweaveBlocksController interface {
// Sets the last block height accepted by the sequencer network
SetLastAcceptedBlockHeight(uint64)
SetLastAcceptedBlock(*types.ArweaveBlockInfo)

// Gracefully stops the controller, waits for all tasks to finish
StopWait()
Expand All @@ -40,6 +40,7 @@ type SyncerController struct {
// Runtime state
mtx sync.Mutex
lastAcceptedArweaveHeight uint64
lastAcceptedArweaveHash arweave.Base64String

blockDownloader *listener.BlockDownloader
store *Store
Expand Down Expand Up @@ -95,7 +96,7 @@ func NewController(log log.Logger, configPath string) (out ArweaveBlocksControll
if self.lastAcceptedArweaveHeight > 0 {
// This is a restart from the watchdog so set the start height
// Otherwise it will be set later
self.blockDownloader.WithHeightRange(self.lastAcceptedArweaveHeight+1, math.MaxUint64)
self.blockDownloader.WithHeightRange(self.lastAcceptedArweaveHeight, math.MaxUint64)
}

transactionDownloader := listener.NewTransactionDownloader(self.config).
Expand Down Expand Up @@ -162,19 +163,27 @@ func (self *SyncerController) StopWait() {
self.StopWait()
}

func (self *SyncerController) SetLastAcceptedBlockHeight(height uint64) {
func (self *SyncerController) SetLastAcceptedBlock(block *types.ArweaveBlockInfo) {
if !self.isRunning() {
return
}
self.mtx.Lock()
defer self.mtx.Unlock()

if self.lastAcceptedArweaveHeight == 0 {
// This is the first time we set the height
self.blockDownloader.SetStartHeight(self.lastAcceptedArweaveHeight + 1)
} else {
if self.lastAcceptedArweaveHeight != 0 {
// Called after the initialization
self.store.RemoveNextArweaveBlocksUpToHeight(height)
self.store.RemoveNextArweaveBlocksUpToHeight(block.Height)
return
}

// This is the first time we see the last accepted block
// Start downloading blocks from the next one
self.lastAcceptedArweaveHeight = block.Height
var hash arweave.Base64String
err := hash.Decode(block.Hash)
if err != nil {
panic(err)
}
self.lastAcceptedArweaveHash = hash
self.blockDownloader.SetPreviousBlock(self.lastAcceptedArweaveHeight, self.lastAcceptedArweaveHash)
}
2 changes: 1 addition & 1 deletion x/sequencer/controller/controller_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ type ArweaveBlocksControllerMock struct {
block *types.NextArweaveBlock
}

func (mock ArweaveBlocksControllerMock) SetLastAcceptedBlockHeight(height uint64) {
func (mock ArweaveBlocksControllerMock) SetLastAcceptedBlock(*types.ArweaveBlockInfo) {
}

func (mock ArweaveBlocksControllerMock) GetNextArweaveBlock(height uint64) *types.NextArweaveBlock {
Expand Down
2 changes: 1 addition & 1 deletion x/sequencer/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,5 +177,5 @@ func (am AppModule) startOrUpdateArweaveBlocksController(ctx sdk.Context) {
panic("Last Arweave Block is not set when the BeginBlock method is called, and should be set when the blockchain is started")
}

am.arweaveBlocksController.SetLastAcceptedBlockHeight(lastArweaveBlock.ArweaveBlock.Height)
am.arweaveBlocksController.SetLastAcceptedBlock(lastArweaveBlock.ArweaveBlock)
}
47 changes: 23 additions & 24 deletions x/sequencer/proposal/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,30 +100,29 @@ func (v *BlockValidator) validateInParallel(txValidator *TxValidator, txs []sdk.
}

func (v *BlockValidator) ValidateBlock(block *Block) error {
return nil
// if v == nil {
// return nil
// }

// // sending the block to the input channel (with checking whether the task is not stopped)
// select {
// case <-v.Ctx.Done():
// return nil
// case <-block.ctx.Done():
// return nil
// case v.input <- block:
// }

// // receiving the validation result from the output channel (with checking whether the task is not stopped)
// select {
// case <-v.Ctx.Done():
// return nil
// case <-block.ctx.Done():
// return nil
// case err := <-v.output:
// // in case of a closed channel, err will be nil
// return err
// }
if v == nil {
return nil
}

// sending the block to the input channel (with checking whether the task is not stopped)
select {
case <-v.Ctx.Done():
return nil
case <-block.ctx.Done():
return nil
case v.input <- block:
}

// receiving the validation result from the output channel (with checking whether the task is not stopped)
select {
case <-v.Ctx.Done():
return nil
case <-block.ctx.Done():
return nil
case err := <-v.output:
// in case of a closed channel, err will be nil
return err
}
}

func (v *BlockValidator) StopWait() {
Expand Down

0 comments on commit f188381

Please sign in to comment.