diff --git a/das/celestia/celestia.go b/das/celestia/celestia.go index c6f95011ad..f6b3a90222 100644 --- a/das/celestia/celestia.go +++ b/das/celestia/celestia.go @@ -20,6 +20,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/offchainlabs/nitro/das/celestia/types" "github.com/offchainlabs/nitro/solgen/go/celestiagen" @@ -44,6 +45,19 @@ type ValidatorConfig struct { BlobstreamAddr string `koanf:"blobstream"` } +var ( + celestiaDALastSuccesfulActionGauge = metrics.NewRegisteredGauge("celestia/action/last_success", nil) + celestiaLastNonDefaultGasprice = metrics.NewRegisteredGaugeFloat64("celestia/last_gas_price", nil) + celestiaSuccessCounter = metrics.NewRegisteredCounter("celestia/action/celestia_success", nil) + celestiaFailureCounter = metrics.NewRegisteredCounter("celestia/action/celestia_failure", nil) + celestiaGasRetries = metrics.NewRegisteredCounter("celestia/action/gas_retries", nil) + celestiaBlobInclusionRetries = metrics.NewRegisteredCounter("celestia/action/inclusion_retries", nil) + + celestiaValidationLastSuccesfulActionGauge = metrics.NewRegisteredGauge("celestia/validation/last_success", nil) + celestiaValidationSuccessCounter = metrics.NewRegisteredCounter("celestia/validation/blobstream_success", nil) + celestiaValidationFailureCounter = metrics.NewRegisteredCounter("celestia/validation/blobstream_failure", nil) +) + var ( // ErrTxTimedout is the error message returned by the DA when mempool is congested ErrTxTimedout = errors.New("timed out waiting for tx to be included in a block") @@ -159,6 +173,7 @@ func NewCelestiaDA(cfg *DAConfig, ethClient *ethclient.Client) (*CelestiaDA, err func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error) { if c.Cfg.NoopWriter { log.Warn("NoopWriter enabled, falling back", "c.Cfg.NoopWriter", c.Cfg.NoopWriter) + celestiaFailureCounter.Inc(1) return nil, errors.New("NoopWriter enabled") } // set a 5 minute timeout context on submissions @@ -168,6 +183,7 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error) defer cancel() dataBlob, err := blob.NewBlobV0(*c.Namespace, message) if err != nil { + celestiaFailureCounter.Inc(1) log.Warn("Error creating blob", "err", err) return nil, err } @@ -187,23 +203,30 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error) } else { gasPrice = gasPrice * c.Cfg.GasMultiplier } + + celestiaGasRetries.Inc(1) continue default: + celestiaFailureCounter.Inc(1) log.Warn("Blob Submission error", "err", err) return nil, err } } if height == 0 { + celestiaFailureCounter.Inc(1) log.Warn("Unexpected height from blob response", "height", height) return nil, errors.New("unexpected response code") } submitted = true + + celestiaLastNonDefaultGasprice.Update(gasPrice) } proofs, err := c.Client.Blob.GetProof(ctx, height, *c.Namespace, dataBlob.Commitment) if err != nil { + celestiaFailureCounter.Inc(1) log.Warn("Error retrieving proof", "err", err) return nil, err } @@ -214,14 +237,17 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error) time.Sleep(time.Millisecond * 100) proofs, err = c.Client.Blob.GetProof(ctx, height, *c.Namespace, dataBlob.Commitment) if err != nil { + celestiaFailureCounter.Inc(1) log.Warn("Error retrieving proof", "err", err) return nil, err } proofRetries++ + celestiaBlobInclusionRetries.Inc(1) } included, err := c.Client.Blob.Included(ctx, height, *c.Namespace, proofs, dataBlob.Commitment) if err != nil || !included { + celestiaFailureCounter.Inc(1) log.Warn("Error checking for inclusion", "err", err, "proof", proofs) return nil, err } @@ -230,16 +256,19 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error) // we fetch the blob so that we can get the correct start index in the square dataBlob, err = c.Client.Blob.Get(ctx, height, *c.Namespace, dataBlob.Commitment) if err != nil { + celestiaFailureCounter.Inc(1) return nil, err } if dataBlob.Index() <= 0 { + celestiaFailureCounter.Inc(1) log.Warn("Unexpected index from blob response", "index", dataBlob.Index()) return nil, errors.New("unexpected response code") } header, err := c.Client.Header.GetByHeight(ctx, height) if err != nil { + celestiaFailureCounter.Inc(1) log.Warn("Header retrieval error", "err", err) return nil, err } @@ -263,6 +292,7 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error) // startRow startRow := blobIndex / squareSize if odsSize*startRow > blobIndex { + celestiaFailureCounter.Inc(1) // return an empty batch return nil, fmt.Errorf("storing Celestia information, odsSize*startRow=%v was larger than blobIndex=%v", odsSize*startRow, dataBlob.Index()) } @@ -278,6 +308,7 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error) blobPointerData, err := blobPointer.MarshalBinary() if err != nil { + celestiaFailureCounter.Inc(1) log.Warn("BlobPointer MashalBinary error", "err", err) return nil, err } @@ -285,18 +316,23 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error) buf := new(bytes.Buffer) err = binary.Write(buf, binary.BigEndian, CelestiaMessageHeaderFlag) if err != nil { + celestiaFailureCounter.Inc(1) log.Warn("batch type byte serialization failed", "err", err) return nil, err } err = binary.Write(buf, binary.BigEndian, blobPointerData) if err != nil { + celestiaFailureCounter.Inc(1) log.Warn("blob pointer data serialization failed", "err", err) return nil, err } serializedBlobPointerData := buf.Bytes() - log.Trace("celestia.CelestiaDA.Store", "serialized_blob_pointer", serializedBlobPointerData) + + celestiaSuccessCounter.Inc(1) + celestiaDALastSuccesfulActionGauge.Update(time.Now().Unix()) + return serializedBlobPointerData, nil } @@ -403,6 +439,7 @@ func (c *CelestiaDA) Read(ctx context.Context, blobPointer *types.BlobPointer) ( func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) { if c.Prover == nil { + celestiaValidationFailureCounter.Inc(1) return nil, fmt.Errorf("no celestia prover config found") } @@ -413,6 +450,7 @@ func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) { blobBytes := buf.Bytes() err := blobPointer.UnmarshalBinary(blobBytes) if err != nil { + celestiaValidationFailureCounter.Inc(1) log.Error("Couldn't unmarshal Celestia blob pointer", "err", err) return nil, nil } @@ -420,12 +458,14 @@ func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) { // Get data root from a celestia node header, err := c.Client.Header.GetByHeight(ctx, blobPointer.BlockHeight) if err != nil { + celestiaValidationFailureCounter.Inc(1) log.Warn("Header retrieval error", "err", err) return nil, err } latestBlockNumber, err := c.Prover.EthClient.BlockNumber(context.Background()) if err != nil { + celestiaValidationFailureCounter.Inc(1) return nil, err } @@ -436,6 +476,7 @@ func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) { Context: ctx, }) if err != nil { + celestiaValidationFailureCounter.Inc(1) return nil, err } @@ -453,12 +494,14 @@ func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) { event, err = c.filter(ctx, latestBlockNumber, blobPointer.BlockHeight, backwards) if err != nil { + celestiaValidationFailureCounter.Inc(1) return nil, err } // get the block data root inclusion proof to the data root tuple root dataRootProof, err := c.Prover.Trpc.DataRootInclusionProof(ctx, blobPointer.BlockHeight, event.StartBlock, event.EndBlock) if err != nil { + celestiaValidationFailureCounter.Inc(1) return nil, err } @@ -486,6 +529,7 @@ func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) { proof, ) if err != nil { + celestiaValidationFailureCounter.Inc(1) return nil, err } @@ -494,6 +538,7 @@ func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) { if valid { sharesProof, err := c.Prover.Trpc.ProveShares(ctx, blobPointer.BlockHeight, blobPointer.Start, blobPointer.Start+blobPointer.SharesLength) if err != nil { + celestiaValidationFailureCounter.Inc(1) log.Error("Unable to get ShareProof", "err", err) return nil, err } @@ -504,31 +549,28 @@ func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) { celestiaVerifierAbi, err := celestiagen.CelestiaBatchVerifierMetaData.GetAbi() if err != nil { + celestiaValidationFailureCounter.Inc(1) log.Error("Could not get ABI for Celestia Batch Verifier", "err", err) return nil, err } verifyProofABI := celestiaVerifierAbi.Methods["verifyProof"] - // need to encode function signature to this proofData, err := verifyProofABI.Inputs.Pack( common.HexToAddress(c.Cfg.ValidatorConfig.BlobstreamAddr), namespaceNode, rowProof, attestationProof, ) if err != nil { + celestiaValidationFailureCounter.Inc(1) log.Error("Could not pack structs into ABI", "err", err) return nil, err } - fmt.Printf("Proof Data: %v\n", proofData) - - // // apend size of batch + proofData - // sizeBytes := make([]byte, 4) - // binary.BigEndian.PutUint32(sizeBytes, uint32((len(proofData)))+msgLength) - // proofData = append(proofData, sizeBytes...) - + celestiaValidationSuccessCounter.Inc(1) + celestiaValidationLastSuccesfulActionGauge.Update(time.Now().Unix()) return proofData, nil } + celestiaValidationFailureCounter.Inc(1) return nil, err }