Skip to content

Commit

Permalink
add metrics to das/celestia
Browse files Browse the repository at this point in the history
  • Loading branch information
Ferret-san committed Jun 4, 2024
1 parent a9e30f7 commit 4f1f17f
Showing 1 changed file with 51 additions and 9 deletions.
60 changes: 51 additions & 9 deletions das/celestia/celestia.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/offchainlabs/nitro/das/celestia/types"
"github.com/offchainlabs/nitro/solgen/go/celestiagen"

Expand All @@ -44,6 +45,19 @@ type ValidatorConfig struct {
BlobstreamAddr string `koanf:"blobstream"`
}

var (
celestiaDALastSuccesfulActionGauge = metrics.NewRegisteredGauge("celestia/action/last_success", nil)
celestiaLastNonDefaultGasprice = metrics.NewRegisteredGaugeFloat64("celestia/last_gas_price", nil)
celestiaSuccessCounter = metrics.NewRegisteredCounter("celestia/action/celestia_success", nil)
celestiaFailureCounter = metrics.NewRegisteredCounter("celestia/action/celestia_failure", nil)
celestiaGasRetries = metrics.NewRegisteredCounter("celestia/action/gas_retries", nil)
celestiaBlobInclusionRetries = metrics.NewRegisteredCounter("celestia/action/inclusion_retries", nil)

celestiaValidationLastSuccesfulActionGauge = metrics.NewRegisteredGauge("celestia/validation/last_success", nil)
celestiaValidationSuccessCounter = metrics.NewRegisteredCounter("celestia/validation/blobstream_success", nil)
celestiaValidationFailureCounter = metrics.NewRegisteredCounter("celestia/validation/blobstream_failure", nil)
)

var (
// ErrTxTimedout is the error message returned by the DA when mempool is congested
ErrTxTimedout = errors.New("timed out waiting for tx to be included in a block")
Expand Down Expand Up @@ -159,6 +173,7 @@ func NewCelestiaDA(cfg *DAConfig, ethClient *ethclient.Client) (*CelestiaDA, err
func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error) {
if c.Cfg.NoopWriter {
log.Warn("NoopWriter enabled, falling back", "c.Cfg.NoopWriter", c.Cfg.NoopWriter)
celestiaFailureCounter.Inc(1)
return nil, errors.New("NoopWriter enabled")
}
// set a 5 minute timeout context on submissions
Expand All @@ -168,6 +183,7 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)
defer cancel()
dataBlob, err := blob.NewBlobV0(*c.Namespace, message)
if err != nil {
celestiaFailureCounter.Inc(1)
log.Warn("Error creating blob", "err", err)
return nil, err
}
Expand All @@ -187,23 +203,30 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)
} else {
gasPrice = gasPrice * c.Cfg.GasMultiplier
}

celestiaGasRetries.Inc(1)
continue
default:
celestiaFailureCounter.Inc(1)
log.Warn("Blob Submission error", "err", err)
return nil, err
}
}

if height == 0 {
celestiaFailureCounter.Inc(1)
log.Warn("Unexpected height from blob response", "height", height)
return nil, errors.New("unexpected response code")
}

submitted = true

celestiaLastNonDefaultGasprice.Update(gasPrice)
}

proofs, err := c.Client.Blob.GetProof(ctx, height, *c.Namespace, dataBlob.Commitment)
if err != nil {
celestiaFailureCounter.Inc(1)
log.Warn("Error retrieving proof", "err", err)
return nil, err
}
Expand All @@ -214,14 +237,17 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)
time.Sleep(time.Millisecond * 100)
proofs, err = c.Client.Blob.GetProof(ctx, height, *c.Namespace, dataBlob.Commitment)
if err != nil {
celestiaFailureCounter.Inc(1)
log.Warn("Error retrieving proof", "err", err)
return nil, err
}
proofRetries++
celestiaBlobInclusionRetries.Inc(1)
}

included, err := c.Client.Blob.Included(ctx, height, *c.Namespace, proofs, dataBlob.Commitment)
if err != nil || !included {
celestiaFailureCounter.Inc(1)
log.Warn("Error checking for inclusion", "err", err, "proof", proofs)
return nil, err
}
Expand All @@ -230,16 +256,19 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)
// we fetch the blob so that we can get the correct start index in the square
dataBlob, err = c.Client.Blob.Get(ctx, height, *c.Namespace, dataBlob.Commitment)
if err != nil {
celestiaFailureCounter.Inc(1)
return nil, err
}

if dataBlob.Index() <= 0 {
celestiaFailureCounter.Inc(1)
log.Warn("Unexpected index from blob response", "index", dataBlob.Index())
return nil, errors.New("unexpected response code")
}

header, err := c.Client.Header.GetByHeight(ctx, height)
if err != nil {
celestiaFailureCounter.Inc(1)
log.Warn("Header retrieval error", "err", err)
return nil, err
}
Expand All @@ -263,6 +292,7 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)
// startRow
startRow := blobIndex / squareSize
if odsSize*startRow > blobIndex {
celestiaFailureCounter.Inc(1)
// return an empty batch
return nil, fmt.Errorf("storing Celestia information, odsSize*startRow=%v was larger than blobIndex=%v", odsSize*startRow, dataBlob.Index())
}
Expand All @@ -278,25 +308,31 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)

blobPointerData, err := blobPointer.MarshalBinary()
if err != nil {
celestiaFailureCounter.Inc(1)
log.Warn("BlobPointer MashalBinary error", "err", err)
return nil, err
}

buf := new(bytes.Buffer)
err = binary.Write(buf, binary.BigEndian, CelestiaMessageHeaderFlag)
if err != nil {
celestiaFailureCounter.Inc(1)
log.Warn("batch type byte serialization failed", "err", err)
return nil, err
}

err = binary.Write(buf, binary.BigEndian, blobPointerData)
if err != nil {
celestiaFailureCounter.Inc(1)
log.Warn("blob pointer data serialization failed", "err", err)
return nil, err
}

serializedBlobPointerData := buf.Bytes()
log.Trace("celestia.CelestiaDA.Store", "serialized_blob_pointer", serializedBlobPointerData)

celestiaSuccessCounter.Inc(1)
celestiaDALastSuccesfulActionGauge.Update(time.Now().Unix())

return serializedBlobPointerData, nil
}

Expand Down Expand Up @@ -403,6 +439,7 @@ func (c *CelestiaDA) Read(ctx context.Context, blobPointer *types.BlobPointer) (

func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) {
if c.Prover == nil {
celestiaValidationFailureCounter.Inc(1)
return nil, fmt.Errorf("no celestia prover config found")
}

Expand All @@ -413,19 +450,22 @@ func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) {
blobBytes := buf.Bytes()
err := blobPointer.UnmarshalBinary(blobBytes)
if err != nil {
celestiaValidationFailureCounter.Inc(1)
log.Error("Couldn't unmarshal Celestia blob pointer", "err", err)
return nil, nil
}

// Get data root from a celestia node
header, err := c.Client.Header.GetByHeight(ctx, blobPointer.BlockHeight)
if err != nil {
celestiaValidationFailureCounter.Inc(1)
log.Warn("Header retrieval error", "err", err)
return nil, err
}

latestBlockNumber, err := c.Prover.EthClient.BlockNumber(context.Background())
if err != nil {
celestiaValidationFailureCounter.Inc(1)
return nil, err
}

Expand All @@ -436,6 +476,7 @@ func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) {
Context: ctx,
})
if err != nil {
celestiaValidationFailureCounter.Inc(1)
return nil, err
}

Expand All @@ -453,12 +494,14 @@ func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) {

event, err = c.filter(ctx, latestBlockNumber, blobPointer.BlockHeight, backwards)
if err != nil {
celestiaValidationFailureCounter.Inc(1)
return nil, err
}

// get the block data root inclusion proof to the data root tuple root
dataRootProof, err := c.Prover.Trpc.DataRootInclusionProof(ctx, blobPointer.BlockHeight, event.StartBlock, event.EndBlock)
if err != nil {
celestiaValidationFailureCounter.Inc(1)
return nil, err
}

Expand Down Expand Up @@ -486,6 +529,7 @@ func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) {
proof,
)
if err != nil {
celestiaValidationFailureCounter.Inc(1)
return nil, err
}

Expand All @@ -494,6 +538,7 @@ func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) {
if valid {
sharesProof, err := c.Prover.Trpc.ProveShares(ctx, blobPointer.BlockHeight, blobPointer.Start, blobPointer.Start+blobPointer.SharesLength)
if err != nil {
celestiaValidationFailureCounter.Inc(1)
log.Error("Unable to get ShareProof", "err", err)
return nil, err
}
Expand All @@ -504,31 +549,28 @@ func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) {

celestiaVerifierAbi, err := celestiagen.CelestiaBatchVerifierMetaData.GetAbi()
if err != nil {
celestiaValidationFailureCounter.Inc(1)
log.Error("Could not get ABI for Celestia Batch Verifier", "err", err)
return nil, err
}

verifyProofABI := celestiaVerifierAbi.Methods["verifyProof"]

// need to encode function signature to this
proofData, err := verifyProofABI.Inputs.Pack(
common.HexToAddress(c.Cfg.ValidatorConfig.BlobstreamAddr), namespaceNode, rowProof, attestationProof,
)
if err != nil {
celestiaValidationFailureCounter.Inc(1)
log.Error("Could not pack structs into ABI", "err", err)
return nil, err
}

fmt.Printf("Proof Data: %v\n", proofData)

// // apend size of batch + proofData
// sizeBytes := make([]byte, 4)
// binary.BigEndian.PutUint32(sizeBytes, uint32((len(proofData)))+msgLength)
// proofData = append(proofData, sizeBytes...)

celestiaValidationSuccessCounter.Inc(1)
celestiaValidationLastSuccesfulActionGauge.Update(time.Now().Unix())
return proofData, nil
}

celestiaValidationFailureCounter.Inc(1)
return nil, err
}

Expand Down

0 comments on commit 4f1f17f

Please sign in to comment.