Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(nitro): reorg fix and minor refactoring #17

Merged
merged 4 commits into from
Jul 29, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 47 additions & 55 deletions das/celestia/celestia.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,17 @@ import (
)

type DAConfig struct {
Enable bool `koanf:"enable"`
GasPrice float64 `koanf:"gas-price" reload:"hot"`
GasMultiplier float64 `koanf:"gas-multiplier" reload:"hot"`
Rpc string `koanf:"rpc" reload:"hot"`
ReadRpc string `koanf:"read-rpc" reload:"hot"`
NamespaceId string `koanf:"namespace-id" `
AuthToken string `koanf:"auth-token" reload:"hot"`
ReadAuthToken string `koanf:"read-auth-token" reload:"hot"`
NoopWriter bool `koanf:"noop-writer" reload:"hot"`
ValidatorConfig *ValidatorConfig `koanf:"validator-config"`
Enable bool `koanf:"enable"`
GasPrice float64 `koanf:"gas-price" reload:"hot"`
GasMultiplier float64 `koanf:"gas-multiplier" reload:"hot"`
Rpc string `koanf:"rpc" reload:"hot"`
ReadRpc string `koanf:"read-rpc" reload:"hot"`
NamespaceId string `koanf:"namespace-id" `
AuthToken string `koanf:"auth-token" reload:"hot"`
ReadAuthToken string `koanf:"read-auth-token" reload:"hot"`
NoopWriter bool `koanf:"noop-writer" reload:"hot"`
ValidatorConfig *ValidatorConfig `koanf:"validator-config"`
ReorgOnReadFailure bool `koanf:"dangerous-reorg-on-read-failure"`
}

type ValidatorConfig struct {
Expand Down Expand Up @@ -110,6 +111,7 @@ func CelestiaDAConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.String(prefix+".validator-config"+".tendermint-rpc", "", "Tendermint RPC endpoint, only used for validation")
f.String(prefix+".validator-config"+".eth-rpc", "", "L1 Websocket connection, only used for validation")
f.String(prefix+".validator-config"+".blobstream", "", "Blobstream address, only used for validation")
f.Bool(prefix+".dangerous-reorg-on-read-failure", false, "DANGEROUS: reorg if any error during reads from celestia node")
}

func NewCelestiaDA(cfg *DAConfig, ethClient *ethclient.Client) (*CelestiaDA, error) {
Expand All @@ -127,6 +129,8 @@ func NewCelestiaDA(cfg *DAConfig, ethClient *ethclient.Client) (*CelestiaDA, err
if err != nil {
return nil, err
}
} else {
readClient = daClient
}

if cfg.NamespaceId == "" {
Expand Down Expand Up @@ -244,7 +248,7 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)
celestiaLastNonDefaultGasprice.Update(gasPrice)
}

proofs, err := c.Client.Blob.GetProof(ctx, height, *c.Namespace, dataBlob.Commitment)
proofs, err := c.ReadClient.Blob.GetProof(ctx, height, *c.Namespace, dataBlob.Commitment)
if err != nil {
celestiaFailureCounter.Inc(1)
log.Warn("Error retrieving proof", "err", err)
Expand All @@ -255,7 +259,7 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)
for proofs == nil {
log.Warn("Retrieved empty proof from GetProof, fetching again...", "proofRetries", proofRetries)
time.Sleep(time.Millisecond * 100)
proofs, err = c.Client.Blob.GetProof(ctx, height, *c.Namespace, dataBlob.Commitment)
proofs, err = c.ReadClient.Blob.GetProof(ctx, height, *c.Namespace, dataBlob.Commitment)
if err != nil {
celestiaFailureCounter.Inc(1)
log.Warn("Error retrieving proof", "err", err)
Expand All @@ -265,7 +269,7 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)
celestiaBlobInclusionRetries.Inc(1)
}

included, err := c.Client.Blob.Included(ctx, height, *c.Namespace, proofs, dataBlob.Commitment)
included, err := c.ReadClient.Blob.Included(ctx, height, *c.Namespace, proofs, dataBlob.Commitment)
if err != nil || !included {
celestiaFailureCounter.Inc(1)
log.Warn("Error checking for inclusion", "err", err, "proof", proofs)
Expand All @@ -274,7 +278,7 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)
log.Info("Succesfully posted blob", "height", height, "commitment", hex.EncodeToString(dataBlob.Commitment))

// we fetch the blob so that we can get the correct start index in the square
dataBlob, err = c.Client.Blob.Get(ctx, height, *c.Namespace, dataBlob.Commitment)
dataBlob, err = c.ReadClient.Blob.Get(ctx, height, *c.Namespace, dataBlob.Commitment)
if err != nil {
log.Warn("could not fetch blob", "err", err)
celestiaFailureCounter.Inc(1)
Expand All @@ -287,7 +291,7 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)
return nil, errors.New("unexpected response code")
}

header, err := c.Client.Header.GetByHeight(ctx, height)
header, err := c.ReadClient.Header.GetByHeight(ctx, height)
if err != nil {
celestiaFailureCounter.Inc(1)
log.Warn("Header retrieval error", "err", err)
Expand Down Expand Up @@ -359,19 +363,13 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)

func (c *CelestiaDA) Read(ctx context.Context, blobPointer *types.BlobPointer) ([]byte, *types.SquareData, error) {
// Wait until our client is synced
var celestiaClient *openrpc.Client
if c.ReadClient != nil {
celestiaClient = c.ReadClient
} else {
celestiaClient = c.Client
}
err := celestiaClient.Header.SyncWait(ctx)
err := c.ReadClient.Header.SyncWait(ctx)
if err != nil {
log.Error("trouble with client sync", "err", err)
return nil, nil, err
}

header, err := celestiaClient.Header.GetByHeight(ctx, blobPointer.BlockHeight)
header, err := c.ReadClient.Header.GetByHeight(ctx, blobPointer.BlockHeight)
if err != nil {
log.Error("could not fetch header", "err", err)
return nil, nil, err
Expand All @@ -380,14 +378,12 @@ func (c *CelestiaDA) Read(ctx context.Context, blobPointer *types.BlobPointer) (
headerDataHash := [32]byte{}
copy(headerDataHash[:], header.DataHash)
if headerDataHash != blobPointer.DataRoot {
log.Error("Data Root mismatch", " header.DataHash", header.DataHash, "blobPointer.DataRoot", hex.EncodeToString(blobPointer.DataRoot[:]))
Ferret-san marked this conversation as resolved.
Show resolved Hide resolved
return []byte{}, nil, nil
return c.returnErrorHelper(fmt.Errorf("Data Root mismatch, header.DataHash=%v, blobPointer.DataRoot=%v", header.DataHash, hex.EncodeToString(blobPointer.DataRoot[:])))
}

proofs, err := celestiaClient.Blob.GetProof(ctx, blobPointer.BlockHeight, *c.Namespace, blobPointer.TxCommitment[:])
proofs, err := c.ReadClient.Blob.GetProof(ctx, blobPointer.BlockHeight, *c.Namespace, blobPointer.TxCommitment[:])
if err != nil {
log.Error("Error retrieving proof", "err", err)
Ferret-san marked this conversation as resolved.
Show resolved Hide resolved
return []byte{}, nil, nil
return c.returnErrorHelper(fmt.Errorf("Error retrieving proof, err=%v", err))
}

sharesLength := uint64(0)
Expand All @@ -396,21 +392,19 @@ func (c *CelestiaDA) Read(ctx context.Context, blobPointer *types.BlobPointer) (
}

if sharesLength != blobPointer.SharesLength {
log.Error("Share length mismatch", "sharesLength", sharesLength, "blobPointer.SharesLength", blobPointer.SharesLength)
Ferret-san marked this conversation as resolved.
Show resolved Hide resolved
return []byte{}, nil, nil
return c.returnErrorHelper(fmt.Errorf("Share length mismatch, sharesLength=%v, blobPointer.SharesLength=%v", sharesLength, blobPointer.SharesLength))
}

blob, err := celestiaClient.Blob.Get(ctx, blobPointer.BlockHeight, *c.Namespace, blobPointer.TxCommitment[:])
blob, err := c.ReadClient.Blob.Get(ctx, blobPointer.BlockHeight, *c.Namespace, blobPointer.TxCommitment[:])
if err != nil {
// return an empty batch of data because we could not find the blob from the sequencer message
log.Error("Failed to get blob", "height", blobPointer.BlockHeight, "commitment", hex.EncodeToString(blobPointer.TxCommitment[:]), "err", err)
return []byte{}, nil, nil
// we eventually manually reorg, setting ReorgOnReadFailure=true
return c.returnErrorHelper(fmt.Errorf("Failed to get blob, height=%v, commitment=%v, err=%v", blobPointer.BlockHeight, hex.EncodeToString(blobPointer.TxCommitment[:]), err))
Ferret-san marked this conversation as resolved.
Show resolved Hide resolved
}

eds, err := celestiaClient.Share.GetEDS(ctx, header)
eds, err := c.ReadClient.Share.GetEDS(ctx, header)
if err != nil {
log.Error("Failed to get EDS", "height", blobPointer.BlockHeight, "err", err)
return []byte{}, nil, nil
return c.returnErrorHelper(fmt.Errorf("Failed to get EDS, height=%v, err=%v", blobPointer.BlockHeight, err))
Ferret-san marked this conversation as resolved.
Show resolved Hide resolved
}

squareSize := uint64(eds.Width())
Expand All @@ -419,34 +413,29 @@ func (c *CelestiaDA) Read(ctx context.Context, blobPointer *types.BlobPointer) (
startRow := blobPointer.Start / odsSize

if blobPointer.Start >= odsSize*odsSize {
log.Error("startIndexOds >= odsSize*odsSize", "startIndexOds", blobPointer.Start, "odsSize*odsSize", odsSize*odsSize)
return []byte{}, nil, nil
return c.returnErrorHelper(fmt.Errorf("startIndexOds >= odsSize*odsSize, startIndexOds=%v, odsSize*odsSize=%v", blobPointer.Start, odsSize*odsSize))
}

if blobPointer.Start+blobPointer.SharesLength < 1 {
log.Error("startIndexOds+blobPointer.SharesLength < 1", "startIndexOds+blobPointer.SharesLength", blobPointer.Start+blobPointer.SharesLength)
return []byte{}, nil, nil
return c.returnErrorHelper(fmt.Errorf("startIndexOds+blobPointer.SharesLength < 1, startIndexOds+blobPointer.SharesLength=%v", blobPointer.Start+blobPointer.SharesLength))
}

endIndexOds := blobPointer.Start + blobPointer.SharesLength - 1
if endIndexOds >= odsSize*odsSize {
log.Error("endIndexOds >= odsSize*odsSize", "endIndexOds", endIndexOds, "odsSize*odsSize", odsSize*odsSize)
return []byte{}, nil, nil
return c.returnErrorHelper(fmt.Errorf("endIndexOds >= odsSize*odsSize, endIndexOds=%v, odsSize*odsSize=%v", endIndexOds, odsSize*odsSize))
Ferret-san marked this conversation as resolved.
Show resolved Hide resolved
}

endRow := endIndexOds / odsSize

if endRow >= odsSize || startRow >= odsSize {
log.Error("endRow >= odsSize || startRow >= odsSize", "endRow", endRow, "startRow", startRow, "odsSize", odsSize)
return []byte{}, nil, nil
return c.returnErrorHelper(fmt.Errorf("endRow >= odsSize || startRow >= odsSize, endRow=%v, startRow=%v, odsSize=%v", endRow, startRow, odsSize))
}

startColumn := blobPointer.Start % odsSize
endColumn := endIndexOds % odsSize

if startRow == endRow && startColumn > endColumn {
log.Error("start and end row are the same and startColumn >= endColumn", "startColumn", startColumn, "endColumn+1 ", endColumn+1)
return []byte{}, nil, nil
return c.returnErrorHelper(fmt.Errorf("start and end row are the same and startColumn >= endColumn, startColumn=%v, endColumn+1=%v", startColumn, endColumn+1))
Ferret-san marked this conversation as resolved.
Show resolved Hide resolved
}

rows := [][][]byte{}
Expand Down Expand Up @@ -481,18 +470,11 @@ func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) {
if err != nil {
celestiaValidationFailureCounter.Inc(1)
log.Error("Couldn't unmarshal Celestia blob pointer", "err", err)
return nil, nil
}

var celestiaClient *openrpc.Client
if c.ReadClient != nil {
celestiaClient = c.ReadClient
} else {
celestiaClient = c.Client
return nil, err
}

// Get data root from a celestia node
header, err := celestiaClient.Header.GetByHeight(ctx, blobPointer.BlockHeight)
header, err := c.ReadClient.Header.GetByHeight(ctx, blobPointer.BlockHeight)
if err != nil {
celestiaValidationFailureCounter.Inc(1)
log.Warn("Header retrieval error", "err", err)
Expand Down Expand Up @@ -689,3 +671,13 @@ func (c *CelestiaDA) filter(ctx context.Context, latestBlock uint64, celestiaHei

return nil, fmt.Errorf("unable to find Data Commitment Stored event in Blobstream")
}

func (c *CelestiaDA) returnErrorHelper(err error) ([]byte, *types.SquareData, error) {
log.Error(err.Error())

if c.Cfg.ReorgOnReadFailure {
return []byte{}, nil, nil
}

return nil, nil, err
}
Loading