diff --git a/das/celestia/celestia.go b/das/celestia/celestia.go index 8e08b9d977..70b7143e47 100644 --- a/das/celestia/celestia.go +++ b/das/celestia/celestia.go @@ -29,16 +29,17 @@ import ( ) type DAConfig struct { - Enable bool `koanf:"enable"` - GasPrice float64 `koanf:"gas-price" reload:"hot"` - GasMultiplier float64 `koanf:"gas-multiplier" reload:"hot"` - Rpc string `koanf:"rpc" reload:"hot"` - ReadRpc string `koanf:"read-rpc" reload:"hot"` - NamespaceId string `koanf:"namespace-id" ` - AuthToken string `koanf:"auth-token" reload:"hot"` - ReadAuthToken string `koanf:"read-auth-token" reload:"hot"` - NoopWriter bool `koanf:"noop-writer" reload:"hot"` - ValidatorConfig *ValidatorConfig `koanf:"validator-config"` + Enable bool `koanf:"enable"` + GasPrice float64 `koanf:"gas-price" reload:"hot"` + GasMultiplier float64 `koanf:"gas-multiplier" reload:"hot"` + Rpc string `koanf:"rpc" reload:"hot"` + ReadRpc string `koanf:"read-rpc" reload:"hot"` + NamespaceId string `koanf:"namespace-id" ` + AuthToken string `koanf:"auth-token" reload:"hot"` + ReadAuthToken string `koanf:"read-auth-token" reload:"hot"` + NoopWriter bool `koanf:"noop-writer" reload:"hot"` + ValidatorConfig *ValidatorConfig `koanf:"validator-config"` + ReorgOnReadFailure bool `koanf:"dangerous-reorg-on-read-failure"` } type ValidatorConfig struct { @@ -110,6 +111,7 @@ func CelestiaDAConfigAddOptions(prefix string, f *pflag.FlagSet) { f.String(prefix+".validator-config"+".tendermint-rpc", "", "Tendermint RPC endpoint, only used for validation") f.String(prefix+".validator-config"+".eth-rpc", "", "L1 Websocket connection, only used for validation") f.String(prefix+".validator-config"+".blobstream", "", "Blobstream address, only used for validation") + f.Bool(prefix+".dangerous-reorg-on-read-failure", false, "DANGEROUS: reorg if any error during reads from celestia node") } func NewCelestiaDA(cfg *DAConfig, ethClient *ethclient.Client) (*CelestiaDA, error) { @@ -127,6 +129,8 @@ func NewCelestiaDA(cfg *DAConfig, ethClient *ethclient.Client) (*CelestiaDA, err if err != nil { return nil, err } + } else { + readClient = daClient } if cfg.NamespaceId == "" { @@ -244,7 +248,7 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error) celestiaLastNonDefaultGasprice.Update(gasPrice) } - proofs, err := c.Client.Blob.GetProof(ctx, height, *c.Namespace, dataBlob.Commitment) + proofs, err := c.ReadClient.Blob.GetProof(ctx, height, *c.Namespace, dataBlob.Commitment) if err != nil { celestiaFailureCounter.Inc(1) log.Warn("Error retrieving proof", "err", err) @@ -255,7 +259,7 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error) for proofs == nil { log.Warn("Retrieved empty proof from GetProof, fetching again...", "proofRetries", proofRetries) time.Sleep(time.Millisecond * 100) - proofs, err = c.Client.Blob.GetProof(ctx, height, *c.Namespace, dataBlob.Commitment) + proofs, err = c.ReadClient.Blob.GetProof(ctx, height, *c.Namespace, dataBlob.Commitment) if err != nil { celestiaFailureCounter.Inc(1) log.Warn("Error retrieving proof", "err", err) @@ -265,7 +269,7 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error) celestiaBlobInclusionRetries.Inc(1) } - included, err := c.Client.Blob.Included(ctx, height, *c.Namespace, proofs, dataBlob.Commitment) + included, err := c.ReadClient.Blob.Included(ctx, height, *c.Namespace, proofs, dataBlob.Commitment) if err != nil || !included { celestiaFailureCounter.Inc(1) log.Warn("Error checking for inclusion", "err", err, "proof", proofs) @@ -274,7 +278,7 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error) log.Info("Succesfully posted blob", "height", height, "commitment", hex.EncodeToString(dataBlob.Commitment)) // we fetch the blob so that we can get the correct start index in the square - dataBlob, err = c.Client.Blob.Get(ctx, height, *c.Namespace, dataBlob.Commitment) + dataBlob, err = c.ReadClient.Blob.Get(ctx, height, *c.Namespace, dataBlob.Commitment) if err != nil { log.Warn("could not fetch blob", "err", err) celestiaFailureCounter.Inc(1) @@ -287,7 +291,7 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error) return nil, errors.New("unexpected response code") } - header, err := c.Client.Header.GetByHeight(ctx, height) + header, err := c.ReadClient.Header.GetByHeight(ctx, height) if err != nil { celestiaFailureCounter.Inc(1) log.Warn("Header retrieval error", "err", err) @@ -359,19 +363,13 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error) func (c *CelestiaDA) Read(ctx context.Context, blobPointer *types.BlobPointer) ([]byte, *types.SquareData, error) { // Wait until our client is synced - var celestiaClient *openrpc.Client - if c.ReadClient != nil { - celestiaClient = c.ReadClient - } else { - celestiaClient = c.Client - } - err := celestiaClient.Header.SyncWait(ctx) + err := c.ReadClient.Header.SyncWait(ctx) if err != nil { log.Error("trouble with client sync", "err", err) return nil, nil, err } - header, err := celestiaClient.Header.GetByHeight(ctx, blobPointer.BlockHeight) + header, err := c.ReadClient.Header.GetByHeight(ctx, blobPointer.BlockHeight) if err != nil { log.Error("could not fetch header", "err", err) return nil, nil, err @@ -380,14 +378,12 @@ func (c *CelestiaDA) Read(ctx context.Context, blobPointer *types.BlobPointer) ( headerDataHash := [32]byte{} copy(headerDataHash[:], header.DataHash) if headerDataHash != blobPointer.DataRoot { - log.Error("Data Root mismatch", " header.DataHash", header.DataHash, "blobPointer.DataRoot", hex.EncodeToString(blobPointer.DataRoot[:])) - return []byte{}, nil, nil + return c.returnErrorHelper(fmt.Errorf("Data Root mismatch, header.DataHash=%v, blobPointer.DataRoot=%v", header.DataHash, hex.EncodeToString(blobPointer.DataRoot[:]))) } - proofs, err := celestiaClient.Blob.GetProof(ctx, blobPointer.BlockHeight, *c.Namespace, blobPointer.TxCommitment[:]) + proofs, err := c.ReadClient.Blob.GetProof(ctx, blobPointer.BlockHeight, *c.Namespace, blobPointer.TxCommitment[:]) if err != nil { - log.Error("Error retrieving proof", "err", err) - return []byte{}, nil, nil + return c.returnErrorHelper(fmt.Errorf("Error retrieving proof, err=%v", err)) } sharesLength := uint64(0) @@ -396,21 +392,19 @@ func (c *CelestiaDA) Read(ctx context.Context, blobPointer *types.BlobPointer) ( } if sharesLength != blobPointer.SharesLength { - log.Error("Share length mismatch", "sharesLength", sharesLength, "blobPointer.SharesLength", blobPointer.SharesLength) - return []byte{}, nil, nil + return c.returnErrorHelper(fmt.Errorf("Share length mismatch, sharesLength=%v, blobPointer.SharesLength=%v", sharesLength, blobPointer.SharesLength)) } - blob, err := celestiaClient.Blob.Get(ctx, blobPointer.BlockHeight, *c.Namespace, blobPointer.TxCommitment[:]) + blob, err := c.ReadClient.Blob.Get(ctx, blobPointer.BlockHeight, *c.Namespace, blobPointer.TxCommitment[:]) if err != nil { // return an empty batch of data because we could not find the blob from the sequencer message - log.Error("Failed to get blob", "height", blobPointer.BlockHeight, "commitment", hex.EncodeToString(blobPointer.TxCommitment[:]), "err", err) - return []byte{}, nil, nil + // we eventually manually reorg, setting ReorgOnReadFailure=true + return c.returnErrorHelper(fmt.Errorf("Failed to get blob, height=%v, commitment=%v, err=%v", blobPointer.BlockHeight, hex.EncodeToString(blobPointer.TxCommitment[:]), err)) } - eds, err := celestiaClient.Share.GetEDS(ctx, header) + eds, err := c.ReadClient.Share.GetEDS(ctx, header) if err != nil { - log.Error("Failed to get EDS", "height", blobPointer.BlockHeight, "err", err) - return []byte{}, nil, nil + return c.returnErrorHelper(fmt.Errorf("Failed to get EDS, height=%v, err=%v", blobPointer.BlockHeight, err)) } squareSize := uint64(eds.Width()) @@ -419,34 +413,29 @@ func (c *CelestiaDA) Read(ctx context.Context, blobPointer *types.BlobPointer) ( startRow := blobPointer.Start / odsSize if blobPointer.Start >= odsSize*odsSize { - log.Error("startIndexOds >= odsSize*odsSize", "startIndexOds", blobPointer.Start, "odsSize*odsSize", odsSize*odsSize) - return []byte{}, nil, nil + return c.returnErrorHelper(fmt.Errorf("startIndexOds >= odsSize*odsSize, startIndexOds=%v, odsSize*odsSize=%v", blobPointer.Start, odsSize*odsSize)) } if blobPointer.Start+blobPointer.SharesLength < 1 { - log.Error("startIndexOds+blobPointer.SharesLength < 1", "startIndexOds+blobPointer.SharesLength", blobPointer.Start+blobPointer.SharesLength) - return []byte{}, nil, nil + return c.returnErrorHelper(fmt.Errorf("startIndexOds+blobPointer.SharesLength < 1, startIndexOds+blobPointer.SharesLength=%v", blobPointer.Start+blobPointer.SharesLength)) } endIndexOds := blobPointer.Start + blobPointer.SharesLength - 1 if endIndexOds >= odsSize*odsSize { - log.Error("endIndexOds >= odsSize*odsSize", "endIndexOds", endIndexOds, "odsSize*odsSize", odsSize*odsSize) - return []byte{}, nil, nil + return c.returnErrorHelper(fmt.Errorf("endIndexOds >= odsSize*odsSize, endIndexOds=%v, odsSize*odsSize=%v", endIndexOds, odsSize*odsSize)) } endRow := endIndexOds / odsSize if endRow >= odsSize || startRow >= odsSize { - log.Error("endRow >= odsSize || startRow >= odsSize", "endRow", endRow, "startRow", startRow, "odsSize", odsSize) - return []byte{}, nil, nil + return c.returnErrorHelper(fmt.Errorf("endRow >= odsSize || startRow >= odsSize, endRow=%v, startRow=%v, odsSize=%v", endRow, startRow, odsSize)) } startColumn := blobPointer.Start % odsSize endColumn := endIndexOds % odsSize if startRow == endRow && startColumn > endColumn { - log.Error("start and end row are the same and startColumn >= endColumn", "startColumn", startColumn, "endColumn+1 ", endColumn+1) - return []byte{}, nil, nil + return c.returnErrorHelper(fmt.Errorf("start and end row are the same and startColumn >= endColumn, startColumn=%v, endColumn+1=%v", startColumn, endColumn+1)) } rows := [][][]byte{} @@ -481,18 +470,11 @@ func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) { if err != nil { celestiaValidationFailureCounter.Inc(1) log.Error("Couldn't unmarshal Celestia blob pointer", "err", err) - return nil, nil - } - - var celestiaClient *openrpc.Client - if c.ReadClient != nil { - celestiaClient = c.ReadClient - } else { - celestiaClient = c.Client + return nil, err } // Get data root from a celestia node - header, err := celestiaClient.Header.GetByHeight(ctx, blobPointer.BlockHeight) + header, err := c.ReadClient.Header.GetByHeight(ctx, blobPointer.BlockHeight) if err != nil { celestiaValidationFailureCounter.Inc(1) log.Warn("Header retrieval error", "err", err) @@ -689,3 +671,13 @@ func (c *CelestiaDA) filter(ctx context.Context, latestBlock uint64, celestiaHei return nil, fmt.Errorf("unable to find Data Commitment Stored event in Blobstream") } + +func (c *CelestiaDA) returnErrorHelper(err error) ([]byte, *types.SquareData, error) { + log.Error(err.Error()) + + if c.Cfg.ReorgOnReadFailure { + return []byte{}, nil, nil + } + + return nil, nil, err +}