Skip to content

Commit

Permalink
Merge pull request #17 from emilianobonassi/reorg_fix
Browse files Browse the repository at this point in the history
fix(nitro): reorg fix and minor refactoring
  • Loading branch information
Ferret-san authored Jul 29, 2024
2 parents 055e71a + 00d7e40 commit 5a9e7a0
Showing 1 changed file with 47 additions and 55 deletions.
102 changes: 47 additions & 55 deletions das/celestia/celestia.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,17 @@ import (
)

type DAConfig struct {
Enable bool `koanf:"enable"`
GasPrice float64 `koanf:"gas-price" reload:"hot"`
GasMultiplier float64 `koanf:"gas-multiplier" reload:"hot"`
Rpc string `koanf:"rpc" reload:"hot"`
ReadRpc string `koanf:"read-rpc" reload:"hot"`
NamespaceId string `koanf:"namespace-id" `
AuthToken string `koanf:"auth-token" reload:"hot"`
ReadAuthToken string `koanf:"read-auth-token" reload:"hot"`
NoopWriter bool `koanf:"noop-writer" reload:"hot"`
ValidatorConfig *ValidatorConfig `koanf:"validator-config"`
Enable bool `koanf:"enable"`
GasPrice float64 `koanf:"gas-price" reload:"hot"`
GasMultiplier float64 `koanf:"gas-multiplier" reload:"hot"`
Rpc string `koanf:"rpc" reload:"hot"`
ReadRpc string `koanf:"read-rpc" reload:"hot"`
NamespaceId string `koanf:"namespace-id" `
AuthToken string `koanf:"auth-token" reload:"hot"`
ReadAuthToken string `koanf:"read-auth-token" reload:"hot"`
NoopWriter bool `koanf:"noop-writer" reload:"hot"`
ValidatorConfig *ValidatorConfig `koanf:"validator-config"`
ReorgOnReadFailure bool `koanf:"dangerous-reorg-on-read-failure"`
}

type ValidatorConfig struct {
Expand Down Expand Up @@ -110,6 +111,7 @@ func CelestiaDAConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.String(prefix+".validator-config"+".tendermint-rpc", "", "Tendermint RPC endpoint, only used for validation")
f.String(prefix+".validator-config"+".eth-rpc", "", "L1 Websocket connection, only used for validation")
f.String(prefix+".validator-config"+".blobstream", "", "Blobstream address, only used for validation")
f.Bool(prefix+".dangerous-reorg-on-read-failure", false, "DANGEROUS: reorg if any error during reads from celestia node")
}

func NewCelestiaDA(cfg *DAConfig, ethClient *ethclient.Client) (*CelestiaDA, error) {
Expand All @@ -127,6 +129,8 @@ func NewCelestiaDA(cfg *DAConfig, ethClient *ethclient.Client) (*CelestiaDA, err
if err != nil {
return nil, err
}
} else {
readClient = daClient
}

if cfg.NamespaceId == "" {
Expand Down Expand Up @@ -244,7 +248,7 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)
celestiaLastNonDefaultGasprice.Update(gasPrice)
}

proofs, err := c.Client.Blob.GetProof(ctx, height, *c.Namespace, dataBlob.Commitment)
proofs, err := c.ReadClient.Blob.GetProof(ctx, height, *c.Namespace, dataBlob.Commitment)
if err != nil {
celestiaFailureCounter.Inc(1)
log.Warn("Error retrieving proof", "err", err)
Expand All @@ -255,7 +259,7 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)
for proofs == nil {
log.Warn("Retrieved empty proof from GetProof, fetching again...", "proofRetries", proofRetries)
time.Sleep(time.Millisecond * 100)
proofs, err = c.Client.Blob.GetProof(ctx, height, *c.Namespace, dataBlob.Commitment)
proofs, err = c.ReadClient.Blob.GetProof(ctx, height, *c.Namespace, dataBlob.Commitment)
if err != nil {
celestiaFailureCounter.Inc(1)
log.Warn("Error retrieving proof", "err", err)
Expand All @@ -265,7 +269,7 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)
celestiaBlobInclusionRetries.Inc(1)
}

included, err := c.Client.Blob.Included(ctx, height, *c.Namespace, proofs, dataBlob.Commitment)
included, err := c.ReadClient.Blob.Included(ctx, height, *c.Namespace, proofs, dataBlob.Commitment)
if err != nil || !included {
celestiaFailureCounter.Inc(1)
log.Warn("Error checking for inclusion", "err", err, "proof", proofs)
Expand All @@ -274,7 +278,7 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)
log.Info("Succesfully posted blob", "height", height, "commitment", hex.EncodeToString(dataBlob.Commitment))

// we fetch the blob so that we can get the correct start index in the square
dataBlob, err = c.Client.Blob.Get(ctx, height, *c.Namespace, dataBlob.Commitment)
dataBlob, err = c.ReadClient.Blob.Get(ctx, height, *c.Namespace, dataBlob.Commitment)
if err != nil {
log.Warn("could not fetch blob", "err", err)
celestiaFailureCounter.Inc(1)
Expand All @@ -287,7 +291,7 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)
return nil, errors.New("unexpected response code")
}

header, err := c.Client.Header.GetByHeight(ctx, height)
header, err := c.ReadClient.Header.GetByHeight(ctx, height)
if err != nil {
celestiaFailureCounter.Inc(1)
log.Warn("Header retrieval error", "err", err)
Expand Down Expand Up @@ -359,19 +363,13 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)

func (c *CelestiaDA) Read(ctx context.Context, blobPointer *types.BlobPointer) ([]byte, *types.SquareData, error) {
// Wait until our client is synced
var celestiaClient *openrpc.Client
if c.ReadClient != nil {
celestiaClient = c.ReadClient
} else {
celestiaClient = c.Client
}
err := celestiaClient.Header.SyncWait(ctx)
err := c.ReadClient.Header.SyncWait(ctx)
if err != nil {
log.Error("trouble with client sync", "err", err)
return nil, nil, err
}

header, err := celestiaClient.Header.GetByHeight(ctx, blobPointer.BlockHeight)
header, err := c.ReadClient.Header.GetByHeight(ctx, blobPointer.BlockHeight)
if err != nil {
log.Error("could not fetch header", "err", err)
return nil, nil, err
Expand All @@ -380,14 +378,12 @@ func (c *CelestiaDA) Read(ctx context.Context, blobPointer *types.BlobPointer) (
headerDataHash := [32]byte{}
copy(headerDataHash[:], header.DataHash)
if headerDataHash != blobPointer.DataRoot {
log.Error("Data Root mismatch", " header.DataHash", header.DataHash, "blobPointer.DataRoot", hex.EncodeToString(blobPointer.DataRoot[:]))
return []byte{}, nil, nil
return c.returnErrorHelper(fmt.Errorf("Data Root mismatch, header.DataHash=%v, blobPointer.DataRoot=%v", header.DataHash, hex.EncodeToString(blobPointer.DataRoot[:])))
}

proofs, err := celestiaClient.Blob.GetProof(ctx, blobPointer.BlockHeight, *c.Namespace, blobPointer.TxCommitment[:])
proofs, err := c.ReadClient.Blob.GetProof(ctx, blobPointer.BlockHeight, *c.Namespace, blobPointer.TxCommitment[:])
if err != nil {
log.Error("Error retrieving proof", "err", err)
return []byte{}, nil, nil
return c.returnErrorHelper(fmt.Errorf("Error retrieving proof, err=%v", err))
}

sharesLength := uint64(0)
Expand All @@ -396,21 +392,19 @@ func (c *CelestiaDA) Read(ctx context.Context, blobPointer *types.BlobPointer) (
}

if sharesLength != blobPointer.SharesLength {
log.Error("Share length mismatch", "sharesLength", sharesLength, "blobPointer.SharesLength", blobPointer.SharesLength)
return []byte{}, nil, nil
return c.returnErrorHelper(fmt.Errorf("Share length mismatch, sharesLength=%v, blobPointer.SharesLength=%v", sharesLength, blobPointer.SharesLength))
}

blob, err := celestiaClient.Blob.Get(ctx, blobPointer.BlockHeight, *c.Namespace, blobPointer.TxCommitment[:])
blob, err := c.ReadClient.Blob.Get(ctx, blobPointer.BlockHeight, *c.Namespace, blobPointer.TxCommitment[:])
if err != nil {
// return an empty batch of data because we could not find the blob from the sequencer message
log.Error("Failed to get blob", "height", blobPointer.BlockHeight, "commitment", hex.EncodeToString(blobPointer.TxCommitment[:]), "err", err)
return []byte{}, nil, nil
// we eventually manually reorg, setting ReorgOnReadFailure=true
return c.returnErrorHelper(fmt.Errorf("Failed to get blob, height=%v, commitment=%v, err=%v", blobPointer.BlockHeight, hex.EncodeToString(blobPointer.TxCommitment[:]), err))
}

eds, err := celestiaClient.Share.GetEDS(ctx, header)
eds, err := c.ReadClient.Share.GetEDS(ctx, header)
if err != nil {
log.Error("Failed to get EDS", "height", blobPointer.BlockHeight, "err", err)
return []byte{}, nil, nil
return c.returnErrorHelper(fmt.Errorf("Failed to get EDS, height=%v, err=%v", blobPointer.BlockHeight, err))
}

squareSize := uint64(eds.Width())
Expand All @@ -419,34 +413,29 @@ func (c *CelestiaDA) Read(ctx context.Context, blobPointer *types.BlobPointer) (
startRow := blobPointer.Start / odsSize

if blobPointer.Start >= odsSize*odsSize {
log.Error("startIndexOds >= odsSize*odsSize", "startIndexOds", blobPointer.Start, "odsSize*odsSize", odsSize*odsSize)
return []byte{}, nil, nil
return c.returnErrorHelper(fmt.Errorf("startIndexOds >= odsSize*odsSize, startIndexOds=%v, odsSize*odsSize=%v", blobPointer.Start, odsSize*odsSize))
}

if blobPointer.Start+blobPointer.SharesLength < 1 {
log.Error("startIndexOds+blobPointer.SharesLength < 1", "startIndexOds+blobPointer.SharesLength", blobPointer.Start+blobPointer.SharesLength)
return []byte{}, nil, nil
return c.returnErrorHelper(fmt.Errorf("startIndexOds+blobPointer.SharesLength < 1, startIndexOds+blobPointer.SharesLength=%v", blobPointer.Start+blobPointer.SharesLength))
}

endIndexOds := blobPointer.Start + blobPointer.SharesLength - 1
if endIndexOds >= odsSize*odsSize {
log.Error("endIndexOds >= odsSize*odsSize", "endIndexOds", endIndexOds, "odsSize*odsSize", odsSize*odsSize)
return []byte{}, nil, nil
return c.returnErrorHelper(fmt.Errorf("endIndexOds >= odsSize*odsSize, endIndexOds=%v, odsSize*odsSize=%v", endIndexOds, odsSize*odsSize))
}

endRow := endIndexOds / odsSize

if endRow >= odsSize || startRow >= odsSize {
log.Error("endRow >= odsSize || startRow >= odsSize", "endRow", endRow, "startRow", startRow, "odsSize", odsSize)
return []byte{}, nil, nil
return c.returnErrorHelper(fmt.Errorf("endRow >= odsSize || startRow >= odsSize, endRow=%v, startRow=%v, odsSize=%v", endRow, startRow, odsSize))
}

startColumn := blobPointer.Start % odsSize
endColumn := endIndexOds % odsSize

if startRow == endRow && startColumn > endColumn {
log.Error("start and end row are the same and startColumn >= endColumn", "startColumn", startColumn, "endColumn+1 ", endColumn+1)
return []byte{}, nil, nil
return c.returnErrorHelper(fmt.Errorf("start and end row are the same and startColumn >= endColumn, startColumn=%v, endColumn+1=%v", startColumn, endColumn+1))
}

rows := [][][]byte{}
Expand Down Expand Up @@ -481,18 +470,11 @@ func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) {
if err != nil {
celestiaValidationFailureCounter.Inc(1)
log.Error("Couldn't unmarshal Celestia blob pointer", "err", err)
return nil, nil
}

var celestiaClient *openrpc.Client
if c.ReadClient != nil {
celestiaClient = c.ReadClient
} else {
celestiaClient = c.Client
return nil, err
}

// Get data root from a celestia node
header, err := celestiaClient.Header.GetByHeight(ctx, blobPointer.BlockHeight)
header, err := c.ReadClient.Header.GetByHeight(ctx, blobPointer.BlockHeight)
if err != nil {
celestiaValidationFailureCounter.Inc(1)
log.Warn("Header retrieval error", "err", err)
Expand Down Expand Up @@ -689,3 +671,13 @@ func (c *CelestiaDA) filter(ctx context.Context, latestBlock uint64, celestiaHei

return nil, fmt.Errorf("unable to find Data Commitment Stored event in Blobstream")
}

func (c *CelestiaDA) returnErrorHelper(err error) ([]byte, *types.SquareData, error) {
log.Error(err.Error())

if c.Cfg.ReorgOnReadFailure {
return []byte{}, nil, nil
}

return nil, nil, err
}

0 comments on commit 5a9e7a0

Please sign in to comment.