diff --git a/.github/workflows/docker-build-publish.yml b/.github/workflows/docker-build-publish.yml
index c922e8b901..2ce01c81df 100644
--- a/.github/workflows/docker-build-publish.yml
+++ b/.github/workflows/docker-build-publish.yml
@@ -18,6 +18,6 @@ jobs:
     permissions:
       contents: write
       packages: write
-    uses: celestiaorg/.github/.github/workflows/reusable_dockerfile_pipeline.yml@v0.2.0 # yamllint disable-line rule:line-length
+    uses: celestiaorg/.github/.github/workflows/reusable_dockerfile_pipeline.yml@v0.2.2 # yamllint disable-line rule:line-length
     with:
       dockerfile: Dockerfile
diff --git a/Makefile b/Makefile
index 310e65cbe0..2567b225d3 100644
--- a/Makefile
+++ b/Makefile
@@ -1,6 +1,7 @@
 SHELL=/usr/bin/env bash
 PROJECTNAME=$(shell basename "$(PWD)")
-LDFLAGS=-ldflags="-X 'main.buildTime=$(shell date)' -X 'main.lastCommit=$(shell git rev-parse HEAD)' -X 'main.semanticVersion=$(shell git describe --tags --dirty=-dev 2>/dev/null || git rev-parse --abbrev-ref HEAD)'"
+versioningPath := "github.com/celestiaorg/celestia-node/nodebuilder/node"
+LDFLAGS=-ldflags="-X '$(versioningPath).buildTime=$(shell date)' -X '$(versioningPath).lastCommit=$(shell git rev-parse HEAD)' -X '$(versioningPath).semanticVersion=$(shell git describe --tags --dirty=-dev 2>/dev/null || git rev-parse --abbrev-ref HEAD)'"
 ifeq (${PREFIX},)
 	PREFIX := /usr/local
 endif
diff --git a/cmd/celestia/rpc.go b/cmd/celestia/rpc.go
index 169ca4b8d1..64b21aaf86 100644
--- a/cmd/celestia/rpc.go
+++ b/cmd/celestia/rpc.go
@@ -195,7 +195,7 @@ func parseParams(method string, params []string) []interface{} {
 		// 1. Height
 		num, err := strconv.ParseUint(params[0], 10, 64)
 		if err != nil {
-			panic("Error parsing gas limit: uint64 could not be parsed.")
+			panic("Error parsing height: uint64 could not be parsed.")
 		}
 		parsedParams[0] = num
 		// 2. NamespaceID
@@ -215,7 +215,7 @@ func parseParams(method string, params []string) []interface{} {
 		// 1. Height
 		num, err := strconv.ParseUint(params[0], 10, 64)
 		if err != nil {
-			panic("Error parsing gas limit: uint64 could not be parsed.")
+			panic("Error parsing height: uint64 could not be parsed.")
 		}
 		parsedParams[0] = num
 		// 2. Namespace
diff --git a/cmd/celestia/util.go b/cmd/celestia/util.go
index 85505c3a60..a38860d1f7 100644
--- a/cmd/celestia/util.go
+++ b/cmd/celestia/util.go
@@ -26,12 +26,6 @@ func persistentPreRunEnv(cmd *cobra.Command, nodeType node.Type, _ []string) err
 		return err
 	}
 	ctx = cmdnode.WithNetwork(ctx, parsedNetwork)
-	ctx = cmdnode.WithNodeBuildInfo(ctx, &node.BuildInfo{
-		LastCommit:      lastCommit,
-		SemanticVersion: semanticVersion,
-		SystemVersion:   systemVersion,
-		GolangVersion:   golangVersion,
-	})

 	// loads existing config into the environment
 	ctx, err = cmdnode.ParseNodeFlags(ctx, cmd, cmdnode.Network(ctx))
diff --git a/cmd/celestia/version.go b/cmd/celestia/version.go
index 462f17b474..f0d379e7a7 100644
--- a/cmd/celestia/version.go
+++ b/cmd/celestia/version.go
@@ -2,18 +2,10 @@ package main

 import (
 	"fmt"
-	"runtime"

 	"github.com/spf13/cobra"
-)
-
-var (
-	buildTime       string
-	lastCommit      string
-	semanticVersion string
-
-	systemVersion   = fmt.Sprintf("%s/%s", runtime.GOARCH, runtime.GOOS)
-	golangVersion   = runtime.Version()
+
+	"github.com/celestiaorg/celestia-node/nodebuilder/node"
 )

 var versionCmd = &cobra.Command{
@@ -24,9 +16,10 @@ var versionCmd = &cobra.Command{
 }

 func printBuildInfo(_ *cobra.Command, _ []string) {
-	fmt.Printf("Semantic version: %s\n", semanticVersion)
-	fmt.Printf("Commit: %s\n", lastCommit)
-	fmt.Printf("Build Date: %s\n", buildTime)
-	fmt.Printf("System version: %s\n", systemVersion)
-	fmt.Printf("Golang version: %s\n", golangVersion)
+	buildInfo := node.GetBuildInfo()
+	fmt.Printf("Semantic version: %s\n", buildInfo.SemanticVersion)
+	fmt.Printf("Commit: %s\n", buildInfo.LastCommit)
+	fmt.Printf("Build Date: %s\n", buildInfo.BuildTime)
+	fmt.Printf("System version: %s\n", buildInfo.SystemVersion)
+	fmt.Printf("Golang version: %s\n", buildInfo.GolangVersion)
 }
diff --git a/cmd/env.go b/cmd/env.go
index ca915d884f..f9860a2de8 100644
--- a/cmd/env.go
+++ b/cmd/env.go
@@ -38,11 +38,6 @@ func NodeConfig(ctx context.Context) nodebuilder.Config {
 	return cfg
 }

-// NodeInfo reads the node build inforamtion from the context.
-func NodeInfo(ctx context.Context) node.BuildInfo {
-	return ctx.Value(buildInfo{}).(node.BuildInfo)
-}
-
 // WithNodeType sets the node type in the given context.
 func WithNodeType(ctx context.Context, tp node.Type) context.Context {
 	return context.WithValue(ctx, nodeTypeKey{}, tp)
@@ -78,16 +73,10 @@ func WithNodeConfig(ctx context.Context, config *nodebuilder.Config) context.Con
 	return context.WithValue(ctx, configKey{}, *config)
 }

-// WithNodeConfig sets the node config build information.
-func WithNodeBuildInfo(ctx context.Context, info *node.BuildInfo) context.Context { - return context.WithValue(ctx, buildInfo{}, *info) -} - type ( optionsKey struct{} configKey struct{} storePathKey struct{} nodeTypeKey struct{} networkKey struct{} - buildInfo struct{} ) diff --git a/cmd/flags_misc.go b/cmd/flags_misc.go index 4483e17201..4a11977b86 100644 --- a/cmd/flags_misc.go +++ b/cmd/flags_misc.go @@ -11,13 +11,8 @@ import ( otelpyroscope "github.com/pyroscope-io/otel-profiling-go" "github.com/spf13/cobra" flag "github.com/spf13/pflag" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" - "go.opentelemetry.io/otel/sdk/resource" - tracesdk "go.opentelemetry.io/otel/sdk/trace" - semconv "go.opentelemetry.io/otel/semconv/v1.11.0" - "go.opentelemetry.io/otel/trace" "github.com/celestiaorg/celestia-node/logs" "github.com/celestiaorg/celestia-node/nodebuilder" @@ -199,7 +194,6 @@ func ParseMiscFlags(ctx context.Context, cmd *cobra.Command) (context.Context, e } if ok { - var tp trace.TracerProvider opts := []otlptracehttp.Option{ otlptracehttp.WithCompression(otlptracehttp.GzipCompression), otlptracehttp.WithEndpoint(cmd.Flag(tracingEndpointFlag).Value.String()), @@ -210,30 +204,13 @@ func ParseMiscFlags(ctx context.Context, cmd *cobra.Command) (context.Context, e opts = append(opts, otlptracehttp.WithInsecure()) } - exp, err := otlptracehttp.New(cmd.Context(), opts...) - if err != nil { - return ctx, err - } - - tp = tracesdk.NewTracerProvider( - tracesdk.WithSampler(tracesdk.AlwaysSample()), - // Always be sure to batch in production. - tracesdk.WithBatcher(exp), - // Record information about this application in a Resource. - tracesdk.WithResource(resource.NewWithAttributes( - semconv.SchemaURL, - semconv.ServiceNameKey.String(fmt.Sprintf("Celestia-%s", NodeType(ctx).String())), - // TODO(@Wondertan): Versioning: semconv.ServiceVersionKey - )), - ) - + pyroOpts := make([]otelpyroscope.Option, 0) ok, err = cmd.Flags().GetBool(pyroscopeTracing) if err != nil { panic(err) } if ok { - tp = otelpyroscope.NewTracerProvider( - tp, + pyroOpts = append(pyroOpts, otelpyroscope.WithAppName("celestia.da-node"), otelpyroscope.WithPyroscopeURL(cmd.Flag(pyroscopeEndpoint).Value.String()), otelpyroscope.WithRootSpanOnly(true), @@ -242,8 +219,7 @@ func ParseMiscFlags(ctx context.Context, cmd *cobra.Command) (context.Context, e otelpyroscope.WithProfileBaselineURL(true), ) } - - otel.SetTracerProvider(tp) + ctx = WithNodeOptions(ctx, nodebuilder.WithTraces(opts, pyroOpts)) } ok, err = cmd.Flags().GetBool(metricsFlag) @@ -262,7 +238,7 @@ func ParseMiscFlags(ctx context.Context, cmd *cobra.Command) (context.Context, e opts = append(opts, otlpmetrichttp.WithInsecure()) } - ctx = WithNodeOptions(ctx, nodebuilder.WithMetrics(opts, NodeType(ctx), NodeInfo(ctx))) + ctx = WithNodeOptions(ctx, nodebuilder.WithMetrics(opts, NodeType(ctx))) } ok, err = cmd.Flags().GetBool(p2pMetrics) diff --git a/core/eds_test.go b/core/eds_test.go index 2cc6f1c7cc..6a2026ee58 100644 --- a/core/eds_test.go +++ b/core/eds_test.go @@ -47,6 +47,7 @@ func TestEmptySquareWithZeroTxs(t *testing.T) { eds, err = app.ExtendBlock(data, appconsts.LatestVersion) require.NoError(t, err) - dah := da.NewDataAvailabilityHeader(eds) + dah, err := da.NewDataAvailabilityHeader(eds) + require.NoError(t, err) assert.Equal(t, share.EmptyRoot().Hash(), dah.Hash()) } diff --git a/das/metrics.go b/das/metrics.go index 1dcf5c8165..42b472d909 
100644 --- a/das/metrics.go +++ b/das/metrics.go @@ -6,11 +6,9 @@ import ( "sync/atomic" "time" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric/global" - "go.opentelemetry.io/otel/metric/instrument" - "go.opentelemetry.io/otel/metric/instrument/syncfloat64" - "go.opentelemetry.io/otel/metric/instrument/syncint64" + "go.opentelemetry.io/otel/metric" "github.com/celestiaorg/celestia-node/header" ) @@ -22,73 +20,70 @@ const ( ) var ( - meter = global.MeterProvider().Meter("das") + meter = otel.Meter("das") ) type metrics struct { - sampled syncint64.Counter - sampleTime syncfloat64.Histogram - getHeaderTime syncfloat64.Histogram - newHead syncint64.Counter + sampled metric.Int64Counter + sampleTime metric.Float64Histogram + getHeaderTime metric.Float64Histogram + newHead metric.Int64Counter lastSampledTS uint64 } func (d *DASer) InitMetrics() error { - sampled, err := meter.SyncInt64().Counter("das_sampled_headers_counter", - instrument.WithDescription("sampled headers counter")) + sampled, err := meter.Int64Counter("das_sampled_headers_counter", + metric.WithDescription("sampled headers counter")) if err != nil { return err } - sampleTime, err := meter.SyncFloat64().Histogram("das_sample_time_hist", - instrument.WithDescription("duration of sampling a single header")) + sampleTime, err := meter.Float64Histogram("das_sample_time_hist", + metric.WithDescription("duration of sampling a single header")) if err != nil { return err } - getHeaderTime, err := meter.SyncFloat64().Histogram("das_get_header_time_hist", - instrument.WithDescription("duration of getting header from header store")) + getHeaderTime, err := meter.Float64Histogram("das_get_header_time_hist", + metric.WithDescription("duration of getting header from header store")) if err != nil { return err } - newHead, err := meter.SyncInt64().Counter("das_head_updated_counter", - instrument.WithDescription("amount of times DAS'er advanced network head")) + newHead, err := meter.Int64Counter("das_head_updated_counter", + metric.WithDescription("amount of times DAS'er advanced network head")) if err != nil { return err } - lastSampledTS, err := meter.AsyncInt64().Gauge("das_latest_sampled_ts", - instrument.WithDescription("latest sampled timestamp")) + lastSampledTS, err := meter.Int64ObservableGauge("das_latest_sampled_ts", + metric.WithDescription("latest sampled timestamp")) if err != nil { return err } - busyWorkers, err := meter.AsyncInt64().Gauge("das_busy_workers_amount", - instrument.WithDescription("number of active parallel workers in DAS'er")) + busyWorkers, err := meter.Int64ObservableGauge("das_busy_workers_amount", + metric.WithDescription("number of active parallel workers in DAS'er")) if err != nil { return err } - networkHead, err := meter.AsyncInt64().Gauge("das_network_head", - instrument.WithDescription("most recent network head")) + networkHead, err := meter.Int64ObservableGauge("das_network_head", + metric.WithDescription("most recent network head")) if err != nil { return err } - sampledChainHead, err := meter.AsyncInt64().Gauge("das_sampled_chain_head", - instrument.WithDescription("height of the sampled chain - all previous headers have been successfully sampled")) + sampledChainHead, err := meter.Int64ObservableGauge("das_sampled_chain_head", + metric.WithDescription("height of the sampled chain - all previous headers have been successfully sampled")) if err != nil { return err } - totalSampled, err := meter. - AsyncInt64(). 
- Gauge( - "das_total_sampled_headers", - instrument.WithDescription("total sampled headers gauge"), - ) + totalSampled, err := meter.Int64ObservableGauge("das_total_sampled_headers", + metric.WithDescription("total sampled headers gauge"), + ) if err != nil { return err } @@ -100,36 +95,38 @@ func (d *DASer) InitMetrics() error { newHead: newHead, } - err = meter.RegisterCallback( - []instrument.Asynchronous{ - lastSampledTS, - busyWorkers, - networkHead, - sampledChainHead, - totalSampled, - }, - func(ctx context.Context) { - stats, err := d.sampler.stats(ctx) - if err != nil { - log.Errorf("observing stats: %s", err.Error()) - } - - for jobType, amount := range stats.workersByJobType() { - busyWorkers.Observe(ctx, amount, - attribute.String(jobTypeLabel, string(jobType))) - } - - networkHead.Observe(ctx, int64(stats.NetworkHead)) - sampledChainHead.Observe(ctx, int64(stats.SampledChainHead)) - - if ts := atomic.LoadUint64(&d.sampler.metrics.lastSampledTS); ts != 0 { - lastSampledTS.Observe(ctx, int64(ts)) - } - - totalSampled.Observe(ctx, int64(stats.totalSampled())) - }, - ) + callback := func(ctx context.Context, observer metric.Observer) error { + stats, err := d.sampler.stats(ctx) + if err != nil { + log.Errorf("observing stats: %s", err.Error()) + return err + } + + for jobType, amount := range stats.workersByJobType() { + observer.ObserveInt64(busyWorkers, amount, + metric.WithAttributes( + attribute.String(jobTypeLabel, string(jobType)), + )) + } + + observer.ObserveInt64(networkHead, int64(stats.NetworkHead)) + observer.ObserveInt64(sampledChainHead, int64(stats.SampledChainHead)) + + if ts := atomic.LoadUint64(&d.sampler.metrics.lastSampledTS); ts != 0 { + observer.ObserveInt64(lastSampledTS, int64(ts)) + } + observer.ObserveInt64(totalSampled, int64(stats.totalSampled())) + return nil + } + + _, err = meter.RegisterCallback(callback, + lastSampledTS, + busyWorkers, + networkHead, + sampledChainHead, + totalSampled, + ) if err != nil { return fmt.Errorf("registering metrics callback: %w", err) } @@ -153,16 +150,18 @@ func (m *metrics) observeSample( ctx = context.Background() } m.sampleTime.Record(ctx, sampleTime.Seconds(), - attribute.Bool(failedLabel, err != nil), - attribute.Int(headerWidthLabel, len(h.DAH.RowRoots)), - attribute.String(jobTypeLabel, string(jobType)), - ) + metric.WithAttributes( + attribute.Bool(failedLabel, err != nil), + attribute.Int(headerWidthLabel, len(h.DAH.RowRoots)), + attribute.String(jobTypeLabel, string(jobType)), + )) m.sampled.Add(ctx, 1, - attribute.Bool(failedLabel, err != nil), - attribute.Int(headerWidthLabel, len(h.DAH.RowRoots)), - attribute.String(jobTypeLabel, string(jobType)), - ) + metric.WithAttributes( + attribute.Bool(failedLabel, err != nil), + attribute.Int(headerWidthLabel, len(h.DAH.RowRoots)), + attribute.String(jobTypeLabel, string(jobType)), + )) atomic.StoreUint64(&m.lastSampledTS, uint64(time.Now().UTC().Unix())) } diff --git a/go.mod b/go.mod index 8dd663736b..ba5156ccff 100644 --- a/go.mod +++ b/go.mod @@ -10,12 +10,12 @@ require ( github.com/BurntSushi/toml v1.3.2 github.com/alecthomas/jsonschema v0.0.0-20200530073317-71f438968921 github.com/benbjohnson/clock v1.3.5 - github.com/celestiaorg/celestia-app v1.0.0-rc8 - github.com/celestiaorg/go-fraud v0.1.0 - github.com/celestiaorg/go-header v0.2.11 + github.com/celestiaorg/celestia-app v1.0.0-rc9 + github.com/celestiaorg/go-fraud v0.1.2 + github.com/celestiaorg/go-header v0.2.12 github.com/celestiaorg/go-libp2p-messenger v0.2.0 github.com/celestiaorg/nmt v0.17.0 - 
github.com/celestiaorg/rsmt2d v0.9.0 + github.com/celestiaorg/rsmt2d v0.10.0 github.com/cosmos/cosmos-sdk v0.46.13 github.com/cosmos/cosmos-sdk/api v0.1.0 github.com/cristalhq/jwt v1.2.0 @@ -64,13 +64,14 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.4 github.com/tendermint/tendermint v0.34.28 - go.opentelemetry.io/otel v1.13.0 - go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.34.0 - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.11.2 - go.opentelemetry.io/otel/metric v0.34.0 - go.opentelemetry.io/otel/sdk v1.11.2 - go.opentelemetry.io/otel/sdk/metric v0.34.0 - go.opentelemetry.io/otel/trace v1.13.0 + go.opentelemetry.io/otel v1.16.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.39.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.16.0 + go.opentelemetry.io/otel/metric v1.16.0 + go.opentelemetry.io/otel/sdk v1.16.0 + go.opentelemetry.io/otel/sdk/metric v0.39.0 + go.opentelemetry.io/otel/trace v1.16.0 go.opentelemetry.io/proto/otlp v0.19.0 go.uber.org/fx v1.19.3 go.uber.org/zap v1.24.0 @@ -105,7 +106,7 @@ require ( github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect github.com/celestiaorg/merkletree v0.0.0-20210714075610-a84dc3ddbbe4 // indirect github.com/celestiaorg/quantum-gravity-bridge v1.3.0 // indirect - github.com/cenkalti/backoff/v4 v4.2.0 // indirect + github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/chzyer/readline v1.5.1 // indirect @@ -176,7 +177,7 @@ require ( github.com/gorilla/websocket v1.5.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect - github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect github.com/gtank/merlin v0.1.1 // indirect github.com/gtank/ristretto255 v0.1.2 // indirect @@ -306,21 +307,22 @@ require ( github.com/zondax/ledger-go v0.14.1 // indirect go.etcd.io/bbolt v1.3.6 // indirect go.opencensus.io v0.24.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.2 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.34.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.2 // indirect + go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.16.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.39.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/dig v1.17.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/mod v0.10.0 // indirect golang.org/x/net v0.10.0 // indirect - golang.org/x/oauth2 v0.7.0 // indirect + golang.org/x/oauth2 v0.8.0 // indirect golang.org/x/term v0.8.0 // indirect golang.org/x/tools v0.9.1 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/api v0.114.0 // indirect google.golang.org/appengine v1.6.7 // indirect - google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect + google.golang.org/genproto v0.0.0-20230526203410-71b5a4ffd15e // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect gopkg.in/ini.v1 v1.67.0 // indirect 
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect gopkg.in/yaml.v2 v2.4.0 // indirect @@ -331,10 +333,10 @@ require ( ) replace ( - github.com/cosmos/cosmos-sdk => github.com/celestiaorg/cosmos-sdk v1.15.0-sdk-v0.46.13 + github.com/cosmos/cosmos-sdk => github.com/celestiaorg/cosmos-sdk v1.16.0-sdk-v0.46.13 github.com/filecoin-project/dagstore => github.com/celestiaorg/dagstore v0.0.0-20230413141458-735ab09a15d6 github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1 // broken goleveldb needs to be replaced for the cosmos-sdk and celestia-app github.com/syndtr/goleveldb => github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 - github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.23.0-tm-v0.34.28 + github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.24.0-tm-v0.34.28 ) diff --git a/go.sum b/go.sum index 0ad1e62fb1..cfe93a5fa7 100644 --- a/go.sum +++ b/go.sum @@ -344,18 +344,18 @@ github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7 github.com/bwesterb/go-ristretto v1.2.0/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0= github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= -github.com/celestiaorg/celestia-app v1.0.0-rc8 h1:bBQY8tR3DCc8uA7c2ney4AeWwEn9Ob1lc1QryGKVF2M= -github.com/celestiaorg/celestia-app v1.0.0-rc8/go.mod h1:X0R6s+LvfusZu+jBj/2SbTm4Nb/H1R2MD1CnR4fwQno= -github.com/celestiaorg/celestia-core v1.23.0-tm-v0.34.28 h1:G7/rq6xTnuFf3XsVZEcl/Sa6vtagm9NQNhaUaSgjvy0= -github.com/celestiaorg/celestia-core v1.23.0-tm-v0.34.28/go.mod h1:J/GsBjoTZaFz71VeyrLZbG8rV+Rzi6oFEUZUipQ97hQ= -github.com/celestiaorg/cosmos-sdk v1.15.0-sdk-v0.46.13 h1:vaQKgaOm0w58JAvOgn2iDohqjH7kvvRqVKiMcBDWifA= -github.com/celestiaorg/cosmos-sdk v1.15.0-sdk-v0.46.13/go.mod h1:G9XkhOJZde36FH0kt/1ayg4ZaioZEQmmRfMa/zQig0I= +github.com/celestiaorg/celestia-app v1.0.0-rc9 h1:6xDYE+OziXO/rLeYy/MutnJpE8M2sIPryZ/ifSWUmdc= +github.com/celestiaorg/celestia-app v1.0.0-rc9/go.mod h1:aGFnIIdA30DtFzznYbcfMdNnXiUebfEUkkrQu8imC3I= +github.com/celestiaorg/celestia-core v1.24.0-tm-v0.34.28 h1:eXS3v26nob8Xs2+flKHVxcTzhzQW44KgTcooR3OxnK4= +github.com/celestiaorg/celestia-core v1.24.0-tm-v0.34.28/go.mod h1:J/GsBjoTZaFz71VeyrLZbG8rV+Rzi6oFEUZUipQ97hQ= +github.com/celestiaorg/cosmos-sdk v1.16.0-sdk-v0.46.13 h1:N1PrCWcYkaODeIQyyVBmDKDTwiQWZ31bgtTEYIGeby8= +github.com/celestiaorg/cosmos-sdk v1.16.0-sdk-v0.46.13/go.mod h1:xpBZc/OYZ736hp0IZlBGNUhEgCD9C+bKs8yNLZibyv0= github.com/celestiaorg/dagstore v0.0.0-20230413141458-735ab09a15d6 h1:/yCwMCoOPcYCiG18u8/1pv5eXF04xczoQO3sR0bKsgM= github.com/celestiaorg/dagstore v0.0.0-20230413141458-735ab09a15d6/go.mod h1:ta/DlqIH10bvhwqJIw51Nq3QU4XVMp6pz3f0Deve9fM= -github.com/celestiaorg/go-fraud v0.1.0 h1:v6mZvlmf2J5ELZfPnrtmmOvKbaYIUs/erDWPO8NbZyY= -github.com/celestiaorg/go-fraud v0.1.0/go.mod h1:yoNM35cKMAkt5Mi/Qx3Wi9bnPilLi8n6RpHZVglTUDs= -github.com/celestiaorg/go-header v0.2.11 h1:dLpuUfpGNxFfJNw3Ar3SqWc+AeyT1DlTP5mLjx9Ths8= -github.com/celestiaorg/go-header v0.2.11/go.mod h1:i9OpY70+PJ1xPw1IgMfF0Pk6vBD6VWPmjY3bgubJBcU= +github.com/celestiaorg/go-fraud v0.1.2 h1:Bf7yIN3lZ4IR/Vlu5OtmcVCVNESBKEJ/xwu28rRKGA8= +github.com/celestiaorg/go-fraud v0.1.2/go.mod h1:kHZXQY+6gd1kYkoWRFFKgWyrLPWRgDN3vd1Ll9gE/oo= +github.com/celestiaorg/go-header v0.2.12 h1:3H9nir20+MTY1vXbLxOUOV05ZspotR6JOiZGKxACHCQ= +github.com/celestiaorg/go-header v0.2.12/go.mod 
h1:NhiWq97NtAYyRBu8quzYOUghQULjgOzO2Ql0iVEFOf0= github.com/celestiaorg/go-libp2p-messenger v0.2.0 h1:/0MuPDcFamQMbw9xTZ73yImqgTO3jHV7wKHvWD/Irao= github.com/celestiaorg/go-libp2p-messenger v0.2.0/go.mod h1:s9PIhMi7ApOauIsfBcQwbr7m+HBzmVfDIS+QLdgzDSo= github.com/celestiaorg/go-verifcid v0.0.1-lazypatch h1:9TSe3w1cmJmbWlweCwCTIZkan7jV8M+KwglXpdD+UG8= @@ -366,13 +366,13 @@ github.com/celestiaorg/nmt v0.17.0 h1:/k8YLwJvuHgT/jQ435zXKaDX811+sYEMXL4B/vYdSL github.com/celestiaorg/nmt v0.17.0/go.mod h1:ZndCeAR4l9lxm7W51ouoyTo1cxhtFgK+4DpEIkxRA3A= github.com/celestiaorg/quantum-gravity-bridge v1.3.0 h1:9zPIp7w1FWfkPnn16y3S4FpFLnQtS7rm81CUVcHEts0= github.com/celestiaorg/quantum-gravity-bridge v1.3.0/go.mod h1:6WOajINTDEUXpSj5UZzod16UZ96ZVB/rFNKyM+Mt1gI= -github.com/celestiaorg/rsmt2d v0.9.0 h1:kon78I748ZqjNzI8OAqPN+2EImuZuanj/6gTh8brX3o= -github.com/celestiaorg/rsmt2d v0.9.0/go.mod h1:E06nDxfoeBDltWRvTR9dLviiUZI5/6mLXAuhSJzz3Iw= +github.com/celestiaorg/rsmt2d v0.10.0 h1:8dprr6CW5mCk5YPnbiLdirojw9YsJOE+XB+GORb8sT0= +github.com/celestiaorg/rsmt2d v0.10.0/go.mod h1:BiCZkCJfhDHUEOJKXUeu+CudjluecKvRTqHcuxKvodc= github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= -github.com/cenkalti/backoff/v4 v4.2.0 h1:HN5dHm3WBOgndBH6E8V0q2jIYIR3s9yglV8k/+MN3u4= -github.com/cenkalti/backoff/v4 v4.2.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/cp v0.1.0 h1:SE+dxFebS7Iik5LK0tsi1k9ZCxEaFX4AjQmoyA+1dJk= github.com/cespare/cp v0.1.0/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s= @@ -856,8 +856,9 @@ github.com/grpc-ecosystem/grpc-gateway v1.8.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 h1:BZHcxBETFHIdVyhyEfOvn/RdU/QGdLI4y34qQGjGWO0= github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU= github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0= github.com/gtank/merlin v0.1.1-0.20191105220539-8318aed1a79f/go.mod h1:T86dnYJhcGOh5BjZFCJWTDeTK7XW8uE+E21Cy/bIQ+s= @@ -2056,31 +2057,31 @@ go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo= go.opentelemetry.io/otel v1.4.1/go.mod h1:StM6F/0fSwpd8dKWDCdRr7uRvEPYdW0hBSlbdTiUde4= 
-go.opentelemetry.io/otel v1.13.0 h1:1ZAKnNQKwBBxFtww/GwxNUyTf0AxkZzrukO8MeXqe4Y= -go.opentelemetry.io/otel v1.13.0/go.mod h1:FH3RtdZCzRkJYFTCsAKDy9l/XYjMdNv6QrkFFB8DvVg= -go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.2 h1:htgM8vZIF8oPSCxa341e3IZ4yr/sKxgu8KZYllByiVY= -go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.2/go.mod h1:rqbht/LlhVBgn5+k3M5QK96K5Xb0DvXpMJ5SFQpY6uw= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.34.0 h1:kpskzLZ60cJ48SJ4uxWa6waBL+4kSV6nVK8rP+QM8Wg= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.34.0/go.mod h1:4+x3i62TEegDHuzNva0bMcAN8oUi5w4liGb1d/VgPYo= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.34.0 h1:t4Ajxj8JGjxkqoBtbkCOY2cDUl9RwiNE9LPQavooi9U= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.34.0/go.mod h1:WO7omosl4P7JoanH9NgInxDxEn2F2M5YinIh8EyeT8w= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.2 h1:fqR1kli93643au1RKo0Uma3d2aPQKT+WBKfTSBaKbOc= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.2/go.mod h1:5Qn6qvgkMsLDX+sYK64rHb1FPhpn0UtxF+ouX1uhyJE= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.11.2 h1:Us8tbCmuN16zAnK5TC69AtODLycKbwnskQzaB6DfFhc= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.11.2/go.mod h1:GZWSQQky8AgdJj50r1KJm8oiQiIPaAX7uZCFQX9GzC8= +go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s= +go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4= +go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.16.0 h1:t4ZwRPU+emrcvM2e9DHd0Fsf0JTPVcbfa/BhTDF03d0= +go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.16.0/go.mod h1:vLarbg68dH2Wa77g71zmKQqlQ8+8Rq3GRG31uc0WcWI= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.39.0 h1:f6BwB2OACc3FCbYVznctQ9V6KK7Vq6CjmYXJ7DeSs4E= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.39.0/go.mod h1:UqL5mZ3qs6XYhDnZaW1Ps4upD+PX6LipH40AoeuIlwU= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.39.0 h1:IZXpCEtI7BbX01DRQEWTGDkvjMB6hEhiEZXS+eg2YqY= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.39.0/go.mod h1:xY111jIZtWb+pUUgT4UiiSonAaY2cD2Ts5zvuKLki3o= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0 h1:cbsD4cUcviQGXdw8+bo5x2wazq10SKz8hEbtCRPcU78= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0/go.mod h1:JgXSGah17croqhJfhByOLVY719k1emAXC8MVhCIJlRs= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.16.0 h1:iqjq9LAB8aK++sKVcELezzn655JnBNdsDhghU4G/So8= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.16.0/go.mod h1:hGXzO5bhhSHZnKvrDaXB82Y9DRFour0Nz/KrBh7reWw= go.opentelemetry.io/otel/metric v0.20.0/go.mod h1:598I5tYlH1vzBjn+BTuhzTCSb/9debfNp6R3s7Pr1eU= -go.opentelemetry.io/otel/metric v0.34.0 h1:MCPoQxcg/26EuuJwpYN1mZTeCYAUGx8ABxfW07YkjP8= -go.opentelemetry.io/otel/metric v0.34.0/go.mod h1:ZFuI4yQGNCupurTXCwkeD/zHBt+C2bR7bw5JqUm/AP8= +go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo= +go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4= go.opentelemetry.io/otel/oteltest v0.20.0/go.mod h1:L7bgKf9ZB7qCwT9Up7i9/pn0PWIa9FqQ2IQ8LoxiGnw= go.opentelemetry.io/otel/sdk v0.20.0/go.mod h1:g/IcepuwNsoiX5Byy2nNV0ySUF1em498m7hBWC279Yc= -go.opentelemetry.io/otel/sdk v1.11.2 h1:GF4JoaEx7iihdMFu30sOyRx52HDHOkl9xQ8SMqNXUiU= -go.opentelemetry.io/otel/sdk v1.11.2/go.mod h1:wZ1WxImwpq+lVRo4vsmSOxdd+xwoUJ6rqyLc3SyX9aU= 
-go.opentelemetry.io/otel/sdk/metric v0.34.0 h1:7ElxfQpXCFZlRTvVRTkcUvK8Gt5DC8QzmzsLsO2gdzo= -go.opentelemetry.io/otel/sdk/metric v0.34.0/go.mod h1:l4r16BIqiqPy5rd14kkxllPy/fOI4tWo1jkpD9Z3ffQ= +go.opentelemetry.io/otel/sdk v1.16.0 h1:Z1Ok1YsijYL0CSJpHt4cS3wDDh7p572grzNrBMiMWgE= +go.opentelemetry.io/otel/sdk v1.16.0/go.mod h1:tMsIuKXuuIWPBAOrH+eHtvhTL+SntFtXF9QD68aP6p4= +go.opentelemetry.io/otel/sdk/metric v0.39.0 h1:Kun8i1eYf48kHH83RucG93ffz0zGV1sh46FAScOTuDI= +go.opentelemetry.io/otel/sdk/metric v0.39.0/go.mod h1:piDIRgjcK7u0HCL5pCA4e74qpK/jk3NiUoAHATVAmiI= go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw= go.opentelemetry.io/otel/trace v1.4.1/go.mod h1:iYEVbroFCNut9QkwEczV9vMRPHNKSSwYZjulEtsmhFc= -go.opentelemetry.io/otel/trace v1.13.0 h1:CBgRZ6ntv+Amuj1jDsMhZtlAPT6gbyIRdaIzFhfBSdY= -go.opentelemetry.io/otel/trace v1.13.0/go.mod h1:muCvmmO9KKpvuXSf3KKAXXB2ygNYHQ+ZfI5X08d3tds= +go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs= +go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw= go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= @@ -2323,8 +2324,8 @@ golang.org/x/oauth2 v0.0.0-20220822191816-0ebed06d0094/go.mod h1:h4gKUeWbJ4rQPri golang.org/x/oauth2 v0.0.0-20220909003341-f21342109be1/go.mod h1:h4gKUeWbJ4rQPri7E0u6Gs4e9Ri2zaLxzw5DI5XGrYg= golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783/go.mod h1:h4gKUeWbJ4rQPri7E0u6Gs4e9Ri2zaLxzw5DI5XGrYg= golang.org/x/oauth2 v0.1.0/go.mod h1:G9FE4dLTsbXUu90h/Pf85g4w1D+SSAgR+q46nJZ8M4A= -golang.org/x/oauth2 v0.7.0 h1:qe6s0zUXlPX80/dITx3440hWZ7GwMwgDDyrSGTPJG/g= -golang.org/x/oauth2 v0.7.0/go.mod h1:hPLQkd9LyjfXTiRohC/41GhcFqxisoUQ99sCUOHO9x4= +golang.org/x/oauth2 v0.8.0 h1:6dkIjl3j3LtZ/O3sTgZTMsLKSftL/B8Zgq4huOIIUu8= +golang.org/x/oauth2 v0.8.0/go.mod h1:yr7u4HXZRm1R1kBWqr/xKNqewf0plRYoB7sla+BCIXE= golang.org/x/perf v0.0.0-20180704124530-6e6d33e29852/go.mod h1:JLpeXjPJfIyPr5TlbXLkXWLhP8nz10XfvxElABhCtcw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -2764,8 +2765,12 @@ google.golang.org/genproto v0.0.0-20221010155953-15ba04fc1c0e/go.mod h1:3526vdqw google.golang.org/genproto v0.0.0-20221014173430-6e2ab493f96b/go.mod h1:1vXfmgAz9N9Jx0QA82PqRVauvCz1SGSz739p0f183jM= google.golang.org/genproto v0.0.0-20221014213838-99cd37c6964a/go.mod h1:1vXfmgAz9N9Jx0QA82PqRVauvCz1SGSz739p0f183jM= google.golang.org/genproto v0.0.0-20221025140454-527a21cfbd71/go.mod h1:9qHF0xnpdSfF6knlcsnpzUu5y+rpwgbvsyGAZPBMg4s= -google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A= -google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= +google.golang.org/genproto v0.0.0-20230526203410-71b5a4ffd15e h1:Ao9GzfUMPH3zjVfzXG5rlWlk+Q8MXWKwWpwVQE1MXfw= +google.golang.org/genproto v0.0.0-20230526203410-71b5a4ffd15e/go.mod h1:zqTuNwFlFRsw5zIts5VnzLQxSRqh+CGOTVMlYbY0Eyk= +google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc h1:kVKPf/IiYSBWEWtkIn6wZXwWGCnLKcC8oWfZvXjsGnM= +google.golang.org/genproto/googleapis/api 
v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc h1:XSJ8Vk1SWuNr8S18z1NZSziL0CPIXLCCMDOEFtHBOFc= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= diff --git a/header/header.go b/header/header.go index e59d3802c1..0e386be8b5 100644 --- a/header/header.go +++ b/header/header.go @@ -75,12 +75,18 @@ func MakeExtendedHeader( vals *core.ValidatorSet, eds *rsmt2d.ExtendedDataSquare, ) (*ExtendedHeader, error) { - var dah DataAvailabilityHeader + var ( + dah DataAvailabilityHeader + err error + ) switch eds { case nil: dah = EmptyDAH() default: - dah = da.NewDataAvailabilityHeader(eds) + dah, err = da.NewDataAvailabilityHeader(eds) + } + if err != nil { + return nil, err } eh := &ExtendedHeader{ diff --git a/header/headertest/testing.go b/header/headertest/testing.go index ceeb9bb164..b20d389452 100644 --- a/header/headertest/testing.go +++ b/header/headertest/testing.go @@ -325,7 +325,8 @@ func FraudMaker(t *testing.T, faultHeight int64, bServ blockservice.BlockService func ExtendedHeaderFromEDS(t *testing.T, height uint64, eds *rsmt2d.ExtendedDataSquare) *header.ExtendedHeader { valSet, vals := RandValidatorSet(10, 10) gen := RandRawHeader(t) - dah := da.NewDataAvailabilityHeader(eds) + dah, err := da.NewDataAvailabilityHeader(eds) + require.NoError(t, err) gen.DataHash = dah.Hash() gen.ValidatorsHash = valSet.Hash() @@ -355,7 +356,8 @@ func CreateFraudExtHeader( square := edstest.RandByzantineEDS(t, len(eh.DAH.RowRoots)) err := ipld.ImportEDS(context.Background(), square, serv) require.NoError(t, err) - dah := da.NewDataAvailabilityHeader(square) + dah, err := da.NewDataAvailabilityHeader(square) + require.NoError(t, err) eh.DAH = &dah eh.RawHeader.DataHash = dah.Hash() return eh, square diff --git a/header/headertest/verify_test.go b/header/headertest/verify_test.go index 177352a4cf..33bcf72642 100644 --- a/header/headertest/verify_test.go +++ b/header/headertest/verify_test.go @@ -37,14 +37,6 @@ func TestVerify(t *testing.T) { }, err: true, }, - { - prepare: func() libhead.Header { - untrusted := *untrustedNonAdj - untrusted.Commit = NewTestSuite(t, 2).Commit(RandRawHeader(t)) - return &untrusted - }, - err: true, - }, { prepare: func() libhead.Header { untrusted := *untrustedAdj diff --git a/header/verify.go b/header/verify.go index 18e7f91ea6..827f6c1d1b 100644 --- a/header/verify.go +++ b/header/verify.go @@ -5,8 +5,6 @@ import ( "fmt" "time" - "github.com/tendermint/tendermint/light" - libhead "github.com/celestiaorg/go-header" ) @@ -47,12 +45,6 @@ func (eh *ExtendedHeader) Verify(untrusted libhead.Header) error { return nil } - // Ensure that untrusted commit has enough of trusted commit's power. 
-	err := eh.ValidatorSet.VerifyCommitLightTrusting(eh.ChainID(), untrst.Commit, light.DefaultTrustLevel)
-	if err != nil {
-		return &libhead.VerifyError{Reason: err}
-	}
-
 	return nil
 }
diff --git a/nodebuilder/node/buildInfo.go b/nodebuilder/node/buildInfo.go
index 5f5bdde28e..53d8554d4d 100644
--- a/nodebuilder/node/buildInfo.go
+++ b/nodebuilder/node/buildInfo.go
@@ -1,9 +1,35 @@
 package node

-// BuildInfo stores all necessary information for the current build.
+import (
+	"fmt"
+	"runtime"
+)
+
+var (
+	buildTime       string
+	lastCommit      string
+	semanticVersion string
+
+	systemVersion = fmt.Sprintf("%s/%s", runtime.GOARCH, runtime.GOOS)
+	golangVersion = runtime.Version()
+)
+
+// BuildInfo represents all necessary information about current build.
 type BuildInfo struct {
+	BuildTime       string
 	LastCommit      string
 	SemanticVersion string
 	SystemVersion   string
 	GolangVersion   string
 }
+
+// GetBuildInfo returns information about current build.
+func GetBuildInfo() *BuildInfo {
+	return &BuildInfo{
+		buildTime,
+		lastCommit,
+		semanticVersion,
+		systemVersion,
+		golangVersion,
+	}
+}
diff --git a/nodebuilder/node/metrics.go b/nodebuilder/node/metrics.go
index 625e8425e8..7d722524e8 100644
--- a/nodebuilder/node/metrics.go
+++ b/nodebuilder/node/metrics.go
@@ -4,11 +4,11 @@ import (
 	"context"
 	"time"

-	"go.opentelemetry.io/otel/metric/global"
-	"go.opentelemetry.io/otel/metric/instrument"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/metric"
 )

-var meter = global.MeterProvider().Meter("node")
+var meter = otel.Meter("node")

 var (
 	timeStarted time.Time
@@ -17,37 +17,34 @@ var (

 // WithMetrics registers node metrics.
 func WithMetrics() error {
-	nodeStartTS, err := meter.
-		AsyncFloat64().
-		Gauge(
-			"node_start_ts",
-			instrument.WithDescription("timestamp when the node was started"),
-		)
+	nodeStartTS, err := meter.Int64ObservableGauge(
+		"node_start_ts",
+		metric.WithDescription("timestamp when the node was started"),
+	)
 	if err != nil {
 		return err
 	}

-	totalNodeRunTime, err := meter.
-		AsyncFloat64().
- Counter( - "node_runtime_counter_in_seconds", - instrument.WithDescription("total time the node has been running"), - ) + totalNodeRunTime, err := meter.Float64ObservableCounter( + "node_runtime_counter_in_seconds", + metric.WithDescription("total time the node has been running"), + ) if err != nil { return err } - return meter.RegisterCallback( - []instrument.Asynchronous{nodeStartTS, totalNodeRunTime}, - func(ctx context.Context) { - if !nodeStarted { - // Observe node start timestamp - timeStarted = time.Now() - nodeStartTS.Observe(ctx, float64(timeStarted.Unix())) - nodeStarted = true - } - - totalNodeRunTime.Observe(ctx, time.Since(timeStarted).Seconds()) - }, - ) + callback := func(ctx context.Context, observer metric.Observer) error { + if !nodeStarted { + // Observe node start timestamp + timeStarted = time.Now() + observer.ObserveInt64(nodeStartTS, timeStarted.Unix()) + nodeStarted = true + } + + observer.ObserveFloat64(totalNodeRunTime, time.Since(timeStarted).Seconds()) + return nil + } + + _, err = meter.RegisterCallback(callback, nodeStartTS, totalNodeRunTime) + return err } diff --git a/nodebuilder/node_test.go b/nodebuilder/node_test.go index e6775419c7..f481162b5c 100644 --- a/nodebuilder/node_test.go +++ b/nodebuilder/node_test.go @@ -81,7 +81,6 @@ func TestLifecycle_WithMetrics(t *testing.T) { otlpmetrichttp.WithInsecure(), }, tt.tp, - node.BuildInfo{}, ), ) require.NotNil(t, node) diff --git a/nodebuilder/p2p/genesis.go b/nodebuilder/p2p/genesis.go index cbe5628d9f..1fb6b10a55 100644 --- a/nodebuilder/p2p/genesis.go +++ b/nodebuilder/p2p/genesis.go @@ -24,7 +24,7 @@ func GenesisFor(net Network) (string, error) { // NOTE: Every time we add a new long-running network, its genesis hash has to be added here. var genesisList = map[Network]string{ Arabica: "7A5FABB19713D732D967B1DA84FA0DF5E87A7B62302D783F78743E216C1A3550", - Mocha: "831B81ADDC5CE999EBB0C150B778F76DAAD9E09DF75FACF164B1F11DCE93E2E1", + Mocha: "79A97034D569C4199A867439B1B7B77D4E1E1D9697212755E1CE6D920CDBB541", BlockspaceRace: "1A8491A72F73929680DAA6C93E3B593579261B2E76536BFA4F5B97D6FE76E088", Private: "", } diff --git a/nodebuilder/settings.go b/nodebuilder/settings.go index bea3c78ad2..9a9f7fcf47 100644 --- a/nodebuilder/settings.go +++ b/nodebuilder/settings.go @@ -7,11 +7,16 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/pyroscope-io/client/pyroscope" + otelpyroscope "github.com/pyroscope-io/otel-profiling-go" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" - "go.opentelemetry.io/otel/metric/global" - "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + sdk "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" + tracesdk "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.11.0" + "go.opentelemetry.io/otel/trace" "go.uber.org/fx" "github.com/celestiaorg/go-fraud" @@ -62,10 +67,9 @@ func WithPyroscope(endpoint string, nodeType node.Type) fx.Option { } // WithMetrics enables metrics exporting for the node. 
-func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type, buildInfo node.BuildInfo) fx.Option { +func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Option { baseComponents := fx.Options( fx.Supply(metricOpts), - fx.Supply(buildInfo), fx.Invoke(initializeMetrics), fx.Invoke(state.WithMetrics), fx.Invoke(fraud.WithMetrics), @@ -105,13 +109,55 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type, buildIn return opts } +func WithTraces(opts []otlptracehttp.Option, pyroOpts []otelpyroscope.Option) fx.Option { + options := fx.Options( + fx.Supply(opts), + fx.Supply(pyroOpts), + fx.Invoke(initializeTraces), + ) + return options +} + +func initializeTraces( + ctx context.Context, + nodeType node.Type, + peerID peer.ID, + network p2p.Network, + opts []otlptracehttp.Option, + pyroOpts []otelpyroscope.Option, +) error { + client := otlptracehttp.NewClient(opts...) + exporter, err := otlptrace.New(ctx, client) + if err != nil { + return fmt.Errorf("creating OTLP trace exporter: %w", err) + } + + var tp trace.TracerProvider + tp = tracesdk.NewTracerProvider( + tracesdk.WithSampler(tracesdk.AlwaysSample()), + // Always be sure to batch in production. + tracesdk.WithBatcher(exporter), + // Record information about this application in a Resource. + tracesdk.WithResource(resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceNamespaceKey.String(nodeType.String()), + semconv.ServiceNameKey.String(fmt.Sprintf("%s/%s", network.String(), peerID.String()))), + )) + + if len(pyroOpts) > 0 { + tp = otelpyroscope.NewTracerProvider(tp, pyroOpts...) + } + otel.SetTracerProvider(tp) + return nil +} + // initializeMetrics initializes the global meter provider. func initializeMetrics( ctx context.Context, lc fx.Lifecycle, peerID peer.ID, nodeType node.Type, - buildInfo node.BuildInfo, + network p2p.Network, opts []otlpmetrichttp.Option, ) error { exp, err := otlpmetrichttp.New(ctx, opts...) @@ -119,19 +165,17 @@ func initializeMetrics( return err } - provider := metric.NewMeterProvider( - metric.WithReader(metric.NewPeriodicReader(exp, metric.WithTimeout(2*time.Second))), - metric.WithResource(resource.NewWithAttributes( + provider := sdk.NewMeterProvider( + sdk.WithReader(sdk.NewPeriodicReader(exp, sdk.WithTimeout(2*time.Second))), + sdk.WithResource(resource.NewWithAttributes( semconv.SchemaURL, - semconv.ServiceNamespaceKey.String(fmt.Sprintf("Celestia-%s", nodeType.String())), - semconv.ServiceNameKey.String(fmt.Sprintf("semver-%s", buildInfo.SemanticVersion)), - semconv.ServiceInstanceIDKey.String(peerID.String()), - ))) + semconv.ServiceNamespaceKey.String(nodeType.String()), + semconv.ServiceNameKey.String(fmt.Sprintf("%s/%s", network.String(), peerID.String()))))) lc.Append(fx.Hook{ OnStop: func(ctx context.Context) error { return provider.Shutdown(ctx) }, }) - global.SetMeterProvider(provider) + otel.SetMeterProvider(provider) return nil } diff --git a/nodebuilder/share/constructors.go b/nodebuilder/share/constructors.go index 5cb0e41e53..a1b7e39713 100644 --- a/nodebuilder/share/constructors.go +++ b/nodebuilder/share/constructors.go @@ -53,9 +53,12 @@ func newModule(getter share.Getter, avail share.Availability) Module { // ensureEmptyCARExists adds an empty EDS to the provided EDS store. 
func ensureEmptyCARExists(ctx context.Context, store *eds.Store) error { emptyEDS := share.EmptyExtendedDataSquare() - emptyDAH := da.NewDataAvailabilityHeader(emptyEDS) + emptyDAH, err := da.NewDataAvailabilityHeader(emptyEDS) + if err != nil { + return err + } - err := store.Put(ctx, emptyDAH.Hash(), emptyEDS) + err = store.Put(ctx, emptyDAH.Hash(), emptyEDS) if errors.Is(err, dagstore.ErrShardExists) { return nil } diff --git a/nodebuilder/share/share_test.go b/nodebuilder/share/share_test.go index 388fd9af07..7c440a6dbf 100644 --- a/nodebuilder/share/share_test.go +++ b/nodebuilder/share/share_test.go @@ -27,7 +27,8 @@ func Test_EmptyCARExists(t *testing.T) { require.NoError(t, err) eds := share.EmptyExtendedDataSquare() - dah := da.NewDataAvailabilityHeader(eds) + dah, err := da.NewDataAvailabilityHeader(eds) + require.NoError(t, err) // add empty EDS to store err = ensureEmptyCARExists(ctx, edsStore) diff --git a/nodebuilder/store.go b/nodebuilder/store.go index 0ba87a5494..acbcf93caf 100644 --- a/nodebuilder/store.go +++ b/nodebuilder/store.go @@ -105,24 +105,18 @@ func (f *fsStore) PutConfig(cfg *Config) error { } func (f *fsStore) Keystore() (_ keystore.Keystore, err error) { - f.lock.RLock() - defer f.lock.RUnlock() if f.keys == nil { return nil, fmt.Errorf("node: no Keystore found") } return f.keys, nil } -func (f *fsStore) Datastore() (_ datastore.Batching, err error) { - f.lock.RLock() +func (f *fsStore) Datastore() (datastore.Batching, error) { + f.dataMu.Lock() + defer f.dataMu.Unlock() if f.data != nil { - f.lock.RUnlock() return f.data, nil } - f.lock.RUnlock() - - f.lock.Lock() - defer f.lock.Unlock() opts := dsbadger.DefaultOptions // this should be copied @@ -145,29 +139,30 @@ func (f *fsStore) Datastore() (_ datastore.Batching, err error) { // TODO(@Wondertan): Make configurable with more conservative defaults for Light Node opts.MaxTableSize = 64 << 20 - f.data, err = dsbadger.NewDatastore(dataPath(f.path), &opts) + ds, err := dsbadger.NewDatastore(dataPath(f.path), &opts) if err != nil { return nil, fmt.Errorf("node: can't open Badger Datastore: %w", err) } - return f.data, nil + f.data = ds + return ds, nil } func (f *fsStore) Close() (err error) { err = errors.Join(err, f.dirLock.Unlock()) + f.dataMu.Lock() if f.data != nil { err = errors.Join(err, f.data.Close()) } + f.dataMu.Unlock() return } type fsStore struct { - path string - - data datastore.Batching - keys keystore.Keystore - - lock sync.RWMutex // protects all the fields + path string + dataMu sync.Mutex + data datastore.Batching + keys keystore.Keystore dirLock fslock.Handle // protects directory } diff --git a/nodebuilder/tests/swamp/config.go b/nodebuilder/tests/swamp/config.go index 630920609f..e6c06b40cc 100644 --- a/nodebuilder/tests/swamp/config.go +++ b/nodebuilder/tests/swamp/config.go @@ -15,8 +15,8 @@ type Config struct { // 100ms func DefaultConfig() *Config { cfg := core.DefaultTestConfig() - // target height duration lower than this tend to be flakier - cfg.Tendermint.Consensus.TargetHeightDuration = 200 * time.Millisecond + // timeout commit lower than this tend to be flakier + cfg.Tendermint.Consensus.TimeoutCommit = 200 * time.Millisecond return &Config{ cfg, } @@ -31,7 +31,7 @@ func WithBlockTime(t time.Duration) Option { // for empty block c.Tendermint.Consensus.CreateEmptyBlocksInterval = t // for filled block - c.Tendermint.Consensus.TargetHeightDuration = t + c.Tendermint.Consensus.TimeoutCommit = t c.Tendermint.Consensus.SkipTimeoutCommit = false } } diff --git 
a/share/availability/full/availability.go b/share/availability/full/availability.go index cfa5bd0c39..4b19183be1 100644 --- a/share/availability/full/availability.go +++ b/share/availability/full/availability.go @@ -72,13 +72,14 @@ func (fa *ShareAvailability) SharesAvailable(ctx context.Context, root *share.Ro _, err := fa.getter.GetEDS(ctx, root) if err != nil { + if errors.Is(err, context.Canceled) { + return err + } log.Errorw("availability validation failed", "root", root.String(), "err", err.Error()) var byzantineErr *byzantine.ErrByzantine if ipldFormat.IsNotFound(err) || errors.Is(err, context.DeadlineExceeded) && !errors.As(err, &byzantineErr) { return share.ErrNotAvailable } - - return err } return err } diff --git a/share/availability/light/availability.go b/share/availability/light/availability.go index 761671b955..cc2e08129e 100644 --- a/share/availability/light/availability.go +++ b/share/availability/light/availability.go @@ -84,13 +84,13 @@ func (la *ShareAvailability) SharesAvailable(ctx context.Context, dah *share.Roo } if err != nil { - if !errors.Is(err, context.Canceled) { - log.Errorw("availability validation failed", "root", dah.String(), "err", err.Error()) + if errors.Is(err, context.Canceled) { + return err } + log.Errorw("availability validation failed", "root", dah.String(), "err", err.Error()) if ipldFormat.IsNotFound(err) || errors.Is(err, context.DeadlineExceeded) { return share.ErrNotAvailable } - return err } } diff --git a/share/availability/light/availability_test.go b/share/availability/light/availability_test.go index 1709c3f4b7..48813a33f9 100644 --- a/share/availability/light/availability_test.go +++ b/share/availability/light/availability_test.go @@ -132,7 +132,8 @@ func TestGetShares(t *testing.T) { eds, err := getter.GetEDS(ctx, dah) require.NoError(t, err) - gotDAH := da.NewDataAvailabilityHeader(eds) + gotDAH, err := da.NewDataAvailabilityHeader(eds) + require.NoError(t, err) require.True(t, dah.Equals(&gotDAH)) } diff --git a/share/availability/test/testing.go b/share/availability/test/testing.go index 19f63f114a..27d6669061 100644 --- a/share/availability/test/testing.go +++ b/share/availability/test/testing.go @@ -34,7 +34,8 @@ func RandFillBS(t *testing.T, n int, bServ blockservice.BlockService) *share.Roo func FillBS(t *testing.T, bServ blockservice.BlockService, shares []share.Share) *share.Root { eds, err := ipld.AddShares(context.TODO(), shares, bServ) require.NoError(t, err) - dah := da.NewDataAvailabilityHeader(eds) + dah, err := da.NewDataAvailabilityHeader(eds) + require.NoError(t, err) return &dah } diff --git a/share/eds/byzantine/bad_encoding.go b/share/eds/byzantine/bad_encoding.go index 0672096b25..9d6cbff229 100644 --- a/share/eds/byzantine/bad_encoding.go +++ b/share/eds/byzantine/bad_encoding.go @@ -2,7 +2,6 @@ package byzantine import ( "bytes" - "errors" "fmt" "github.com/celestiaorg/celestia-app/pkg/wrapper" @@ -113,50 +112,62 @@ func (p *BadEncodingProof) UnmarshalBinary(data []byte) error { func (p *BadEncodingProof) Validate(hdr libhead.Header) error { header, ok := hdr.(*header.ExtendedHeader) if !ok { - panic(fmt.Sprintf("invalid header type: expected %T, got %T", header, hdr)) + panic(fmt.Sprintf("invalid header type received during BEFP validation: expected %T, got %T", header, hdr)) } if header.Height() != int64(p.BlockHeight) { - return errors.New("fraud: incorrect block height") + return fmt.Errorf("incorrect block height during BEFP validation: expected %d, got %d", + p.BlockHeight, header.Height(), + ) } - 
merkleRowRoots := header.DAH.RowRoots - merkleColRoots := header.DAH.ColumnRoots - if len(merkleRowRoots) != len(merkleColRoots) { + + if len(header.DAH.RowRoots) != len(header.DAH.ColumnRoots) { // NOTE: This should never happen as callers of this method should not feed it with a // malformed extended header. panic(fmt.Sprintf( - "fraud: invalid extended header: length of row and column roots do not match. (rowRoots=%d) (colRoots=%d)", - len(merkleRowRoots), - len(merkleColRoots)), + "invalid extended header: length of row and column roots do not match. (rowRoots=%d) (colRoots=%d)", + len(header.DAH.RowRoots), + len(header.DAH.ColumnRoots)), ) } - if int(p.Index) >= len(merkleRowRoots) { - return fmt.Errorf("fraud: invalid proof: index out of bounds (%d >= %d)", int(p.Index), len(merkleRowRoots)) + + // merkleRoots are the roots against which we are going to check the inclusion of the received + // shares. Changing the order of the roots to prove the shares relative to the orthogonal axis, + // because inside the rsmt2d library rsmt2d.Row = 0 and rsmt2d.Col = 1 + merkleRoots := header.DAH.RowRoots + if p.Axis == rsmt2d.Row { + merkleRoots = header.DAH.ColumnRoots } - if len(merkleRowRoots) != len(p.Shares) { - return fmt.Errorf("fraud: invalid proof: incorrect number of shares %d != %d", len(p.Shares), len(merkleRowRoots)) + if int(p.Index) >= len(merkleRoots) { + return fmt.Errorf("invalid %s proof: index out of bounds (%d >= %d)", + BadEncoding, int(p.Index), len(merkleRoots), + ) } - - root := merkleRowRoots[p.Index] - if p.Axis == rsmt2d.Col { - root = merkleColRoots[p.Index] + if len(p.Shares) != len(merkleRoots) { + // Since p.Shares should contain all the shares from either a row or a + // column, it should exactly match the number of row roots. In this + // context, the number of row roots is the width of the extended data + // square. + return fmt.Errorf("invalid %s proof: incorrect number of shares %d != %d", + BadEncoding, len(p.Shares), len(merkleRoots), + ) } // verify that Merkle proofs correspond to particular shares. - shares := make([][]byte, len(merkleRowRoots)) + shares := make([][]byte, len(merkleRoots)) for index, shr := range p.Shares { if shr == nil { continue } // validate inclusion of the share into one of the DAHeader roots - if ok := shr.Validate(ipld.MustCidFromNamespacedSha256(root)); !ok { - return fmt.Errorf("fraud: invalid proof: incorrect share received at index %d", index) + if ok := shr.Validate(ipld.MustCidFromNamespacedSha256(merkleRoots[index])); !ok { + return fmt.Errorf("invalid %s proof: incorrect share received at index %d", BadEncoding, index) } // NMTree commits the additional namespace while rsmt2d does not know about, so we trim it // this is ugliness from NMTWrapper that we have to embrace ¯\_(ツ)_/¯ shares[index] = share.GetData(shr.Share) } - odsWidth := uint64(len(merkleRowRoots) / 2) + odsWidth := uint64(len(merkleRoots) / 2) codec := share.DefaultRSMT2DCodec() // rebuild a row or col. @@ -183,10 +194,15 @@ func (p *BadEncodingProof) Validate(hdr libhead.Header) error { return err } + // root is a merkle root of the row/col where ErrByzantine occurred + root := header.DAH.RowRoots[p.Index] + if p.Axis == rsmt2d.Col { + root = header.DAH.ColumnRoots[p.Index] + } + // comparing rebuilt Merkle Root of bad row/col with respective Merkle Root of row/col from block. 
if bytes.Equal(expectedRoot, root) { - return errors.New("fraud: invalid proof: recomputed Merkle root matches the DAH's row/column root") + return fmt.Errorf("invalid %s proof: recomputed Merkle root matches the DAH's row/column root", BadEncoding) } - return nil } diff --git a/share/eds/byzantine/bad_encoding_test.go b/share/eds/byzantine/bad_encoding_test.go index 5f6adac595..49cf64c2c2 100644 --- a/share/eds/byzantine/bad_encoding_test.go +++ b/share/eds/byzantine/bad_encoding_test.go @@ -25,8 +25,9 @@ func TestBadEncodingFraudProof(t *testing.T) { bServ := mdutils.Bserv() square := edstest.RandByzantineEDS(t, 16) - dah := da.NewDataAvailabilityHeader(square) - err := ipld.ImportEDS(ctx, square, bServ) + dah, err := da.NewDataAvailabilityHeader(square) + require.NoError(t, err) + err = ipld.ImportEDS(ctx, square, bServ) require.NoError(t, err) var errRsmt2d *rsmt2d.ErrByzantineData @@ -55,7 +56,8 @@ func TestIncorrectBadEncodingFraudProof(t *testing.T) { eds, err := ipld.AddShares(ctx, shares, bServ) require.NoError(t, err) - dah := da.NewDataAvailabilityHeader(eds) + dah, err := da.NewDataAvailabilityHeader(eds) + require.NoError(t, err) // get an arbitrary row row := uint(squareSize / 2) diff --git a/share/eds/byzantine/byzantine.go b/share/eds/byzantine/byzantine.go index b9c8ef414f..0fcd78273e 100644 --- a/share/eds/byzantine/byzantine.go +++ b/share/eds/byzantine/byzantine.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/ipfs/go-blockservice" + "golang.org/x/sync/errgroup" "github.com/celestiaorg/celestia-app/pkg/da" "github.com/celestiaorg/rsmt2d" @@ -35,17 +36,41 @@ func NewErrByzantine( dah *da.DataAvailabilityHeader, errByz *rsmt2d.ErrByzantineData, ) *ErrByzantine { - root := [][][]byte{ - dah.RowRoots, + // changing the order to collect proofs against an orthogonal axis + roots := [][][]byte{ dah.ColumnRoots, - }[errByz.Axis][errByz.Index] - sharesWithProof, err := GetProofsForShares( - ctx, - bGetter, - ipld.MustCidFromNamespacedSha256(root), - errByz.Shares, - ) - if err != nil { + dah.RowRoots, + }[errByz.Axis] + + sharesWithProof := make([]*ShareWithProof, len(errByz.Shares)) + sharesAmount := 0 + + errGr, ctx := errgroup.WithContext(ctx) + for index, share := range errByz.Shares { + // skip further shares if we already requested half of them, which is enough to recompute the row + // or col + if sharesAmount == len(dah.RowRoots)/2 { + break + } + + if share == nil { + continue + } + sharesAmount++ + + index := index + errGr.Go(func() error { + share, err := getProofsAt( + ctx, bGetter, + ipld.MustCidFromNamespacedSha256(roots[index]), + int(errByz.Index), len(errByz.Shares), + ) + sharesWithProof[index] = share + return err + }) + } + + if err := errGr.Wait(); err != nil { // Fatal as rsmt2d proved that error is byzantine, // but we cannot properly collect the proof, // so verification will fail and thus services won't be stopped @@ -53,7 +78,6 @@ func NewErrByzantine( // TODO(@Wondertan): Find a better way to handle log.Fatalw("getting proof for ErrByzantine", "err", err) } - return &ErrByzantine{ Index: uint32(errByz.Index), Shares: sharesWithProof, diff --git a/share/eds/byzantine/share_proof.go b/share/eds/byzantine/share_proof.go index b8e39ee1d3..d6aa9dad51 100644 --- a/share/eds/byzantine/share_proof.go +++ b/share/eds/byzantine/share_proof.go @@ -78,24 +78,38 @@ func GetProofsForShares( proofs := make([]*ShareWithProof, len(shares)) for index, share := range shares { if share != nil { - proof := make([]cid.Cid, 0) - // TODO(@vgonkivs): Combine GetLeafData and 
GetProof in one function as the are traversing the same - // tree. Add options that will control what data will be fetched. - s, err := ipld.GetLeaf(ctx, bGetter, root, index, len(shares)) + proof, err := getProofsAt(ctx, bGetter, root, index, len(shares)) if err != nil { return nil, err } - proof, err = ipld.GetProof(ctx, bGetter, root, proof, index, len(shares)) - if err != nil { - return nil, err - } - proofs[index] = NewShareWithProof(index, s.RawData(), proof) + proofs[index] = proof } } - return proofs, nil } +func getProofsAt( + ctx context.Context, + bGetter blockservice.BlockGetter, + root cid.Cid, + index, + total int, +) (*ShareWithProof, error) { + proof := make([]cid.Cid, 0) + // TODO(@vgonkivs): Combine GetLeafData and GetProof in one function as the are traversing the same + // tree. Add options that will control what data will be fetched. + node, err := ipld.GetLeaf(ctx, bGetter, root, index, total) + if err != nil { + return nil, err + } + + proof, err = ipld.GetProof(ctx, bGetter, root, proof, index, total) + if err != nil { + return nil, err + } + return NewShareWithProof(index, node.RawData(), proof), nil +} + func ProtoToShare(protoShares []*pb.Share) []*ShareWithProof { shares := make([]*ShareWithProof, len(protoShares)) for i, share := range protoShares { diff --git a/share/eds/byzantine/share_proof_test.go b/share/eds/byzantine/share_proof_test.go index 0f63d4f0c9..db1db64f80 100644 --- a/share/eds/byzantine/share_proof_test.go +++ b/share/eds/byzantine/share_proof_test.go @@ -27,7 +27,8 @@ func TestGetProof(t *testing.T) { in, err := ipld.AddShares(ctx, shares, bServ) require.NoError(t, err) - dah := da.NewDataAvailabilityHeader(in) + dah, err := da.NewDataAvailabilityHeader(in) + require.NoError(t, err) var tests = []struct { roots [][]byte }{ @@ -63,7 +64,8 @@ func TestGetProofs(t *testing.T) { in, err := ipld.AddShares(ctx, shares, bServ) require.NoError(t, err) - dah := da.NewDataAvailabilityHeader(in) + dah, err := da.NewDataAvailabilityHeader(in) + require.NoError(t, err) for _, root := range dah.ColumnRoots { rootCid := ipld.MustCidFromNamespacedSha256(root) data := make([][]byte, 0, in.Width()) diff --git a/share/eds/eds.go b/share/eds/eds.go index 544f3c2438..cc775491c9 100644 --- a/share/eds/eds.go +++ b/share/eds/eds.go @@ -102,7 +102,10 @@ func initializeWriter(ctx context.Context, eds *rsmt2d.ExtendedDataSquare, w io. return nil, fmt.Errorf("recomputing data square: %w", err) } // compute roots - eds.RowRoots() + _, err = eds.RowRoots() + if err != nil { + return nil, fmt.Errorf("computing row roots: %w", err) + } // commit the batch to DAG err = batchAdder.Commit() if err != nil { @@ -231,8 +234,18 @@ func prependNamespace(quadrant int, shr share.Share) []byte { // rootsToCids converts the EDS's Row and Column roots to CIDs. func rootsToCids(eds *rsmt2d.ExtendedDataSquare) ([]cid.Cid, error) { - var err error - roots := append(eds.RowRoots(), eds.ColRoots()...) + rowRoots, err := eds.RowRoots() + if err != nil { + return nil, err + } + colRoots, err := eds.ColRoots() + if err != nil { + return nil, err + } + + roots := make([][]byte, 0, len(rowRoots)+len(colRoots)) + roots = append(roots, rowRoots...) + roots = append(roots, colRoots...) 
rootCids := make([]cid.Cid, len(roots)) for i, r := range roots { rootCids[i], err = ipld.CidFromNamespacedSha256(r) @@ -283,7 +296,10 @@ func ReadEDS(ctx context.Context, r io.Reader, root share.DataHash) (eds *rsmt2d return nil, fmt.Errorf("share: computing eds: %w", err) } - newDah := da.NewDataAvailabilityHeader(eds) + newDah, err := da.NewDataAvailabilityHeader(eds) + if err != nil { + return nil, err + } if !bytes.Equal(newDah.Hash(), root) { return nil, fmt.Errorf( "share: content integrity mismatch: imported root %s doesn't match expected root %s", diff --git a/share/eds/eds_test.go b/share/eds/eds_test.go index 0ef211ec6f..af870f60b3 100644 --- a/share/eds/eds_test.go +++ b/share/eds/eds_test.go @@ -167,14 +167,25 @@ func TestInnerNodeBatchSize(t *testing.T) { func TestReadWriteRoundtrip(t *testing.T) { eds := writeRandomEDS(t) - dah := da.NewDataAvailabilityHeader(eds) + dah, err := da.NewDataAvailabilityHeader(eds) + require.NoError(t, err) f := openWrittenEDS(t) defer f.Close() loaded, err := ReadEDS(context.Background(), f, dah.Hash()) require.NoError(t, err, "error reading EDS from file") - require.Equal(t, eds.RowRoots(), loaded.RowRoots()) - require.Equal(t, eds.ColRoots(), loaded.ColRoots()) + + rowRoots, err := eds.RowRoots() + require.NoError(t, err) + loadedRowRoots, err := loaded.RowRoots() + require.NoError(t, err) + require.Equal(t, rowRoots, loadedRowRoots) + + colRoots, err := eds.ColRoots() + require.NoError(t, err) + loadedColRoots, err := loaded.ColRoots() + require.NoError(t, err) + require.Equal(t, colRoots, loadedColRoots) } func TestReadEDS(t *testing.T) { @@ -187,17 +198,22 @@ func TestReadEDS(t *testing.T) { loaded, err := ReadEDS(context.Background(), f, dah.Hash()) require.NoError(t, err, "error reading EDS from file") - require.Equal(t, dah.RowRoots, loaded.RowRoots()) - require.Equal(t, dah.ColumnRoots, loaded.ColRoots()) + rowRoots, err := loaded.RowRoots() + require.NoError(t, err) + require.Equal(t, dah.RowRoots, rowRoots) + colRoots, err := loaded.ColRoots() + require.NoError(t, err) + require.Equal(t, dah.ColumnRoots, colRoots) } func TestReadEDSContentIntegrityMismatch(t *testing.T) { writeRandomEDS(t) - dah := da.NewDataAvailabilityHeader(edstest.RandEDS(t, 4)) + dah, err := da.NewDataAvailabilityHeader(edstest.RandEDS(t, 4)) + require.NoError(t, err) f := openWrittenEDS(t) defer f.Close() - _, err := ReadEDS(context.Background(), f, dah.Hash()) + _, err = ReadEDS(context.Background(), f, dah.Hash()) require.ErrorContains(t, err, "share: content integrity mismatch: imported root") } @@ -209,7 +225,8 @@ func BenchmarkReadWriteEDS(b *testing.B) { b.Cleanup(cancel) for originalDataWidth := 4; originalDataWidth <= 64; originalDataWidth *= 2 { eds := edstest.RandEDS(b, originalDataWidth) - dah := da.NewDataAvailabilityHeader(eds) + dah, err := da.NewDataAvailabilityHeader(eds) + require.NoError(b, err) b.Run(fmt.Sprintf("Writing %dx%d", originalDataWidth, originalDataWidth), func(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { @@ -280,7 +297,8 @@ func createTestData(t *testing.T, testDir string) { //nolint:unused err = WriteEDS(ctx, eds, f) require.NoError(t, err, "writing EDS to file") f.Close() - dah := da.NewDataAvailabilityHeader(eds) + dah, err := da.NewDataAvailabilityHeader(eds) + require.NoError(t, err) header, err := json.MarshalIndent(dah, "", "") require.NoError(t, err, "marshaling example root") diff --git a/share/eds/ods_test.go b/share/eds/ods_test.go index eb243a4022..5b6ed5568b 100644 --- a/share/eds/ods_test.go +++ 
b/share/eds/ods_test.go @@ -89,6 +89,16 @@ func TestODSReaderReconstruction(t *testing.T) { // reconstruct EDS from ODSReader loaded, err := ReadEDS(ctx, odsR, dah.Hash()) assert.NoError(t, err) - require.Equal(t, eds.RowRoots(), loaded.RowRoots()) - require.Equal(t, eds.ColRoots(), loaded.ColRoots()) + + rowRoots, err := eds.RowRoots() + require.NoError(t, err) + loadedRowRoots, err := loaded.RowRoots() + require.NoError(t, err) + require.Equal(t, rowRoots, loadedRowRoots) + + colRoots, err := eds.ColRoots() + require.NoError(t, err) + loadedColRoots, err := loaded.ColRoots() + require.NoError(t, err) + require.Equal(t, colRoots, loadedColRoots) } diff --git a/share/eds/retriever.go b/share/eds/retriever.go index b3cf363056..2483a37a92 100644 --- a/share/eds/retriever.go +++ b/share/eds/retriever.go @@ -127,7 +127,7 @@ func (r *Retriever) newSession(ctx context.Context, dah *da.DataAvailabilityHead return &tree } - square, err := rsmt2d.ImportExtendedDataSquare(make([][]byte, size*size), share.DefaultRSMT2DCodec(), treeFn) + square, err := rsmt2d.NewExtendedDataSquare(share.DefaultRSMT2DCodec(), treeFn, uint(size), share.Size) if err != nil { return nil, err } @@ -283,10 +283,12 @@ func (rs *retrievalSession) doRequest(ctx context.Context, q *quadrant) { if rs.isReconstructed() { return } - if rs.square.GetCell(uint(x), uint(y)) != nil { + if err := rs.square.SetCell(uint(x), uint(y), share); err != nil { + // safe to ignore as: + // * share size already verified + // * the same share might come from either Row or Col return } - rs.square.SetCell(uint(x), uint(y), share) // if we have >= 1/4 of the square we can start trying to Reconstruct // TODO(@Wondertan): This is not an ideal way to know when to start // reconstruction and can cause idle reconstruction tries in some cases, diff --git a/share/eds/retriever_test.go b/share/eds/retriever_test.go index c90697862f..2277f894a1 100644 --- a/share/eds/retriever_test.go +++ b/share/eds/retriever_test.go @@ -59,7 +59,8 @@ func TestRetriever_Retrieve(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, time.Minute*5) // the timeout is big for the max size which is long defer cancel() - dah := da.NewDataAvailabilityHeader(in) + dah, err := da.NewDataAvailabilityHeader(in) + require.NoError(t, err) out, err := r.Retrieve(ctx, &dah) require.NoError(t, err) assert.True(t, share.EqualEDS(in, out)) @@ -93,7 +94,8 @@ func TestRetriever_ByzantineError(t *testing.T) { require.NoError(t, err) // ensure we rcv an error - dah := da.NewDataAvailabilityHeader(attackerEDS) + dah, err := da.NewDataAvailabilityHeader(attackerEDS) + require.NoError(t, err) r := NewRetriever(bserv) _, err = r.Retrieve(ctx, &dah) var errByz *byzantine.ErrByzantine @@ -116,7 +118,8 @@ func TestRetriever_MultipleRandQuadrants(t *testing.T) { in, err := ipld.AddShares(ctx, shares, bServ) require.NoError(t, err) - dah := da.NewDataAvailabilityHeader(in) + dah, err := da.NewDataAvailabilityHeader(in) + require.NoError(t, err) ses, err := r.newSession(ctx, &dah) require.NoError(t, err) diff --git a/share/eds/store_test.go b/share/eds/store_test.go index 7f2f548f63..491b63c48a 100644 --- a/share/eds/store_test.go +++ b/share/eds/store_test.go @@ -271,7 +271,8 @@ func BenchmarkStore(b *testing.B) { // pause the timer for initializing test data b.StopTimer() eds := edstest.RandEDS(b, 128) - dah := da.NewDataAvailabilityHeader(eds) + dah, err := da.NewDataAvailabilityHeader(eds) + require.NoError(b, err) b.StartTimer() err = edsStore.Put(ctx, dah.Hash(), eds) @@ -286,11 +287,12 @@ func 
BenchmarkStore(b *testing.B) { // pause the timer for initializing test data b.StopTimer() eds := edstest.RandEDS(b, 128) - dah := da.NewDataAvailabilityHeader(eds) + dah, err := da.NewDataAvailabilityHeader(eds) + require.NoError(b, err) _ = edsStore.Put(ctx, dah.Hash(), eds) b.StartTimer() - _, err := edsStore.Get(ctx, dah.Hash()) + _, err = edsStore.Get(ctx, dah.Hash()) require.NoError(b, err) } }) @@ -306,7 +308,8 @@ func newStore(t *testing.T) (*Store, error) { func randomEDS(t *testing.T) (*rsmt2d.ExtendedDataSquare, share.Root) { eds := edstest.RandEDS(t, 4) - dah := da.NewDataAvailabilityHeader(eds) + dah, err := da.NewDataAvailabilityHeader(eds) + require.NoError(t, err) return eds, dah } diff --git a/share/empty.go b/share/empty.go index b0e1e6e6ae..07d48f2f07 100644 --- a/share/empty.go +++ b/share/empty.go @@ -54,7 +54,10 @@ func initEmpty() { } emptyBlockEDS = eds - dah := da.NewDataAvailabilityHeader(eds) + dah, err := da.NewDataAvailabilityHeader(eds) + if err != nil { + panic(fmt.Errorf("failed to create empty DAH: %w", err)) + } minDAH := da.MinDataAvailabilityHeader() if !bytes.Equal(minDAH.Hash(), dah.Hash()) { panic(fmt.Sprintf("mismatch in calculated minimum DAH and minimum DAH from celestia-app, "+ diff --git a/share/getters/getter_test.go b/share/getters/getter_test.go index 7af8af2f26..571129f029 100644 --- a/share/getters/getter_test.go +++ b/share/getters/getter_test.go @@ -232,7 +232,8 @@ func TestIPLDGetter(t *testing.T) { func randomEDS(t *testing.T) (*rsmt2d.ExtendedDataSquare, share.Root) { eds := edstest.RandEDS(t, 4) - dah := da.NewDataAvailabilityHeader(eds) + dah, err := da.NewDataAvailabilityHeader(eds) + require.NoError(t, err) return eds, dah } @@ -260,7 +261,8 @@ func randomEDSWithDoubledNamespace(t *testing.T, size int) (*rsmt2d.ExtendedData wrapper.NewConstructor(uint64(size)), ) require.NoError(t, err, "failure to recompute the extended data square") - dah := da.NewDataAvailabilityHeader(eds) + dah, err := da.NewDataAvailabilityHeader(eds) + require.NoError(t, err) return eds, share.GetNamespace(randShares[idx1]), dah } diff --git a/share/getters/shrex.go b/share/getters/shrex.go index bb5e7ca7a7..c754d73c1d 100644 --- a/share/getters/shrex.go +++ b/share/getters/shrex.go @@ -6,11 +6,9 @@ import ( "fmt" "time" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric/global" - "go.opentelemetry.io/otel/metric/instrument" - "go.opentelemetry.io/otel/metric/instrument/syncint64" - "go.opentelemetry.io/otel/metric/unit" + "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" "github.com/celestiaorg/rsmt2d" @@ -32,11 +30,11 @@ const ( defaultMinAttemptsCount = 3 ) -var meter = global.MeterProvider().Meter("shrex/getter") +var meter = otel.Meter("shrex/getter") type metrics struct { - edsAttempts syncint64.Histogram - ndAttempts syncint64.Histogram + edsAttempts metric.Int64Histogram + ndAttempts metric.Int64Histogram } func (m *metrics) recordEDSAttempt(ctx context.Context, attemptCount int, success bool) { @@ -46,7 +44,9 @@ func (m *metrics) recordEDSAttempt(ctx context.Context, attemptCount int, succes if ctx.Err() != nil { ctx = context.Background() } - m.edsAttempts.Record(ctx, int64(attemptCount), attribute.Bool("success", success)) + m.edsAttempts.Record(ctx, int64(attemptCount), + metric.WithAttributes( + attribute.Bool("success", success))) } func (m *metrics) recordNDAttempt(ctx context.Context, attemptCount int, success bool) { @@ -56,23 +56,23 @@ func (m *metrics) recordNDAttempt(ctx 
context.Context, attemptCount int, success if ctx.Err() != nil { ctx = context.Background() } - m.ndAttempts.Record(ctx, int64(attemptCount), attribute.Bool("success", success)) + m.ndAttempts.Record(ctx, int64(attemptCount), + metric.WithAttributes( + attribute.Bool("success", success))) } func (sg *ShrexGetter) WithMetrics() error { - edsAttemptHistogram, err := meter.SyncInt64().Histogram( + edsAttemptHistogram, err := meter.Int64Histogram( "getters_shrex_eds_attempts_per_request", - instrument.WithUnit(unit.Dimensionless), - instrument.WithDescription("Number of attempts per shrex/eds request"), + metric.WithDescription("Number of attempts per shrex/eds request"), ) if err != nil { return err } - ndAttemptHistogram, err := meter.SyncInt64().Histogram( + ndAttemptHistogram, err := meter.Int64Histogram( "getters_shrex_nd_attempts_per_request", - instrument.WithUnit(unit.Dimensionless), - instrument.WithDescription("Number of attempts per shrex/nd request"), + metric.WithDescription("Number of attempts per shrex/nd request"), ) if err != nil { return err diff --git a/share/getters/shrex_test.go b/share/getters/shrex_test.go index e1628d8725..0dbe7e44db 100644 --- a/share/getters/shrex_test.go +++ b/share/getters/shrex_test.go @@ -198,7 +198,8 @@ func newStore(t *testing.T) (*eds.Store, error) { func generateTestEDS(t *testing.T) (*rsmt2d.ExtendedDataSquare, da.DataAvailabilityHeader, share.Namespace) { eds := edstest.RandEDS(t, 4) - dah := da.NewDataAvailabilityHeader(eds) + dah, err := da.NewDataAvailabilityHeader(eds) + require.NoError(t, err) max := nmt.MaxNamespace(dah.RowRoots[(len(dah.RowRoots))/2-1], share.NamespaceSize) return eds, dah, max } diff --git a/share/getters/testing.go b/share/getters/testing.go index 4734557d7f..71c6231f3c 100644 --- a/share/getters/testing.go +++ b/share/getters/testing.go @@ -5,6 +5,8 @@ import ( "fmt" "testing" + "github.com/stretchr/testify/require" + "github.com/celestiaorg/celestia-app/pkg/da" "github.com/celestiaorg/rsmt2d" @@ -15,7 +17,8 @@ import ( // TestGetter provides a testing SingleEDSGetter and the root of the EDS it holds. 
func TestGetter(t *testing.T) (share.Getter, *share.Root) { eds := edstest.RandEDS(t, 8) - dah := da.NewDataAvailabilityHeader(eds) + dah, err := da.NewDataAvailabilityHeader(eds) + require.NoError(t, err) return &SingleEDSGetter{ EDS: eds, }, &dah @@ -52,7 +55,10 @@ func (seg *SingleEDSGetter) GetSharesByNamespace(context.Context, *share.Root, s } func (seg *SingleEDSGetter) checkRoot(root *share.Root) error { - dah := da.NewDataAvailabilityHeader(seg.EDS) + dah, err := da.NewDataAvailabilityHeader(seg.EDS) + if err != nil { + return err + } if !root.Equals(&dah) { return fmt.Errorf("unknown EDS: have %s, asked %s", dah.String(), root.String()) } diff --git a/share/ipld/add.go b/share/ipld/add.go index fddfe8e99e..5807a320fd 100644 --- a/share/ipld/add.go +++ b/share/ipld/add.go @@ -39,7 +39,10 @@ func AddShares( return nil, fmt.Errorf("failure to recompute the extended data square: %w", err) } // compute roots - eds.RowRoots() + _, err = eds.RowRoots() + if err != nil { + return nil, err + } // commit the batch to ipfs return eds, batchAdder.Commit() } @@ -67,7 +70,10 @@ func ImportShares( return nil, fmt.Errorf("failure to recompute the extended data square: %w", err) } // compute roots - eds.RowRoots() + _, err = eds.RowRoots() + if err != nil { + return nil, err + } // commit the batch to DAG return eds, batchAdder.Commit() } diff --git a/share/ipld/get_shares_test.go b/share/ipld/get_shares_test.go index cd26f759b3..7ee7704fc0 100644 --- a/share/ipld/get_shares_test.go +++ b/share/ipld/get_shares_test.go @@ -45,7 +45,9 @@ func TestGetShare(t *testing.T) { for i, leaf := range shares { row := i / size pos := i - (size * row) - share, err := GetShare(ctx, bServ, MustCidFromNamespacedSha256(eds.RowRoots()[row]), pos, size*2) + rowRoots, err := eds.RowRoots() + require.NoError(t, err) + share, err := GetShare(ctx, bServ, MustCidFromNamespacedSha256(rowRoots[row]), pos, size*2) require.NoError(t, err) assert.Equal(t, leaf, share) } @@ -87,8 +89,10 @@ func TestBlockRecovery(t *testing.T) { require.NoError(t, err) // calculate roots using the first complete square - rowRoots := testEds.RowRoots() - colRoots := testEds.ColRoots() + rowRoots, err := testEds.RowRoots() + require.NoError(t, err) + colRoots, err := testEds.ColRoots() + require.NoError(t, err) flat := share.ExtractEDS(testEds) @@ -173,9 +177,11 @@ func TestGetSharesByNamespace(t *testing.T) { require.NoError(t, err) var shares []share.Share - for _, row := range eds.RowRoots() { + rowRoots, err := eds.RowRoots() + require.NoError(t, err) + for _, row := range rowRoots { rcid := MustCidFromNamespacedSha256(row) - rowShares, _, err := GetSharesByNamespace(ctx, bServ, rcid, namespace, len(eds.RowRoots())) + rowShares, _, err := GetSharesByNamespace(ctx, bServ, rcid, namespace, len(rowRoots)) if errors.Is(err, ErrNamespaceOutsideRange) { continue } @@ -208,7 +214,8 @@ func TestCollectLeavesByNamespace_IncompleteData(t *testing.T) { eds, err := AddShares(ctx, shares, bServ) require.NoError(t, err) - roots := eds.RowRoots() + roots, err := eds.RowRoots() + require.NoError(t, err) // remove the second share from the first row rcid := MustCidFromNamespacedSha256(roots[0]) @@ -302,7 +309,10 @@ func TestCollectLeavesByNamespace_MultipleRowsContainingSameNamespaceId(t *testi eds, err := AddShares(ctx, shares, bServ) require.NoError(t, err) - for _, row := range eds.RowRoots() { + rowRoots, err := eds.RowRoots() + require.NoError(t, err) + + for _, row := range rowRoots { rcid := MustCidFromNamespacedSha256(row) data := 
NewNamespaceData(len(shares), namespace, WithLeaves()) err := data.CollectLeavesByNamespace(ctx, bServ, rcid) @@ -356,9 +366,11 @@ func TestGetSharesWithProofsByNamespace(t *testing.T) { require.NoError(t, err) var shares []share.Share - for _, row := range eds.RowRoots() { + rowRoots, err := eds.RowRoots() + require.NoError(t, err) + for _, row := range rowRoots { rcid := MustCidFromNamespacedSha256(row) - rowShares, proof, err := GetSharesByNamespace(ctx, bServ, rcid, namespace, len(eds.RowRoots())) + rowShares, proof, err := GetSharesByNamespace(ctx, bServ, rcid, namespace, len(rowRoots)) if namespace.IsOutsideRange(row, row) { require.ErrorIs(t, err, ErrNamespaceOutsideRange) continue @@ -446,16 +458,18 @@ func assertNoRowContainsNID( namespace share.Namespace, isAbsent bool, ) { - rowRootCount := len(eds.RowRoots()) + rowRoots, err := eds.RowRoots() + require.NoError(t, err) + rowRootCount := len(rowRoots) // get all row root cids rowRootCIDs := make([]cid.Cid, rowRootCount) - for i, rowRoot := range eds.RowRoots() { + for i, rowRoot := range rowRoots { rowRootCIDs[i] = MustCidFromNamespacedSha256(rowRoot) } // for each row root cid check if the min namespace exists var absentCount, foundAbsenceRows int - for _, rowRoot := range eds.RowRoots() { + for _, rowRoot := range rowRoots { var outsideRange bool if !namespace.IsOutsideRange(rowRoot, rowRoot) { // namespace does belong to namespace range of the row diff --git a/share/ipld/nmt_test.go b/share/ipld/nmt_test.go index aa125ab3c7..77268d7112 100644 --- a/share/ipld/nmt_test.go +++ b/share/ipld/nmt_test.go @@ -26,7 +26,8 @@ func TestNamespaceFromCID(t *testing.T) { for i, tt := range tests { t.Run(strconv.Itoa(i), func(t *testing.T) { - dah := da.NewDataAvailabilityHeader(tt.eds) + dah, err := da.NewDataAvailabilityHeader(tt.eds) + require.NoError(t, err) // check to make sure NamespacedHash is correctly derived from CID for _, row := range dah.RowRoots { c, err := CidFromNamespacedSha256(row) diff --git a/share/p2p/discovery/discovery.go b/share/p2p/discovery/discovery.go index 96eda8bd78..9779d72195 100644 --- a/share/p2p/discovery/discovery.go +++ b/share/p2p/discovery/discovery.go @@ -251,7 +251,10 @@ func (d *Discovery) discover(ctx context.Context) bool { log.Debugw("reached soft peer limit, skipping discovery", "size", size) return true } - log.Infow("discovering peers", "want", want) + // TODO @renaynay: eventually, have a mechanism to catch if wanted amount of peers + // has not been discovered in X amount of time so that users are warned of degraded + // FN connectivity. 
+ log.Debugw("discovering peers", "want", want) // we use errgroup as it provide limits var wg errgroup.Group diff --git a/share/p2p/discovery/metrics.go b/share/p2p/discovery/metrics.go index b6adbb1984..99c9bb4548 100644 --- a/share/p2p/discovery/metrics.go +++ b/share/p2p/discovery/metrics.go @@ -5,11 +5,9 @@ import ( "fmt" "github.com/libp2p/go-libp2p/core/peer" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric/global" - "go.opentelemetry.io/otel/metric/instrument" - "go.opentelemetry.io/otel/metric/instrument/asyncint64" - "go.opentelemetry.io/otel/metric/instrument/syncint64" + "go.opentelemetry.io/otel/metric" ) const ( @@ -28,18 +26,18 @@ const ( ) var ( - meter = global.MeterProvider().Meter("share_discovery") + meter = otel.Meter("share_discovery") ) type handlePeerResult string type metrics struct { - peersAmount asyncint64.Gauge - discoveryResult syncint64.Counter // attributes: enough_peers[bool],is_canceled[bool] - handlePeerResult syncint64.Counter // attributes: result[string] - advertise syncint64.Counter // attributes: failed[bool] - peerAdded syncint64.Counter - peerRemoved syncint64.Counter + peersAmount metric.Int64ObservableGauge + discoveryResult metric.Int64Counter // attributes: enough_peers[bool],is_canceled[bool] + handlePeerResult metric.Int64Counter // attributes: result[string] + advertise metric.Int64Counter // attributes: failed[bool] + peerAdded metric.Int64Counter + peerRemoved metric.Int64Counter } // WithMetrics turns on metric collection in discoery. @@ -54,44 +52,44 @@ func (d *Discovery) WithMetrics() error { } func initMetrics(d *Discovery) (*metrics, error) { - peersAmount, err := meter.AsyncInt64().Gauge("discovery_amount_of_peers", - instrument.WithDescription("amount of peers in discovery set")) + peersAmount, err := meter.Int64ObservableGauge("discovery_amount_of_peers", + metric.WithDescription("amount of peers in discovery set")) if err != nil { return nil, err } - discoveryResult, err := meter.SyncInt64().Counter("discovery_find_peers_result", - instrument.WithDescription("result of find peers run")) + discoveryResult, err := meter.Int64Counter("discovery_find_peers_result", + metric.WithDescription("result of find peers run")) if err != nil { return nil, err } - handlePeerResultCounter, err := meter.SyncInt64().Counter("discovery_handler_peer_result", - instrument.WithDescription("result handling found peer")) + handlePeerResultCounter, err := meter.Int64Counter("discovery_handler_peer_result", + metric.WithDescription("result handling found peer")) if err != nil { return nil, err } - advertise, err := meter.SyncInt64().Counter("discovery_advertise_event", - instrument.WithDescription("advertise events counter")) + advertise, err := meter.Int64Counter("discovery_advertise_event", + metric.WithDescription("advertise events counter")) if err != nil { return nil, err } - peerAdded, err := meter.SyncInt64().Counter("discovery_add_peer", - instrument.WithDescription("add peer to discovery set counter")) + peerAdded, err := meter.Int64Counter("discovery_add_peer", + metric.WithDescription("add peer to discovery set counter")) if err != nil { return nil, err } - peerRemoved, err := meter.SyncInt64().Counter("discovery_remove_peer", - instrument.WithDescription("remove peer from discovery set counter")) + peerRemoved, err := meter.Int64Counter("discovery_remove_peer", + metric.WithDescription("remove peer from discovery set counter")) if err != nil { return nil, err } - backOffSize, err := 
meter.AsyncInt64().Gauge("discovery_backoff_amount", - instrument.WithDescription("amount of peers in backoff")) + backOffSize, err := meter.Int64ObservableGauge("discovery_backoff_amount", + metric.WithDescription("amount of peers in backoff")) if err != nil { return nil, err } @@ -105,16 +103,12 @@ func initMetrics(d *Discovery) (*metrics, error) { peerRemoved: peerRemoved, } - err = meter.RegisterCallback( - []instrument.Asynchronous{ - peersAmount, - backOffSize, - }, - func(ctx context.Context) { - peersAmount.Observe(ctx, int64(d.set.Size())) - backOffSize.Observe(ctx, int64(d.connector.Size())) - }, - ) + callback := func(ctx context.Context, observer metric.Observer) error { + observer.ObserveInt64(peersAmount, int64(d.set.Size())) + observer.ObserveInt64(backOffSize, int64(d.connector.Size())) + return nil + } + _, err = meter.RegisterCallback(callback, peersAmount, backOffSize) if err != nil { return nil, fmt.Errorf("registering metrics callback: %w", err) } @@ -130,7 +124,8 @@ func (m *metrics) observeFindPeers(ctx context.Context, isEnoughPeers bool) { } m.discoveryResult.Add(ctx, 1, - attribute.Bool(discoveryEnoughPeersKey, isEnoughPeers)) + metric.WithAttributes( + attribute.Bool(discoveryEnoughPeersKey, isEnoughPeers))) } func (m *metrics) observeHandlePeer(ctx context.Context, result handlePeerResult) { @@ -142,7 +137,8 @@ func (m *metrics) observeHandlePeer(ctx context.Context, result handlePeerResult } m.handlePeerResult.Add(ctx, 1, - attribute.String(handlePeerResultKey, string(result))) + metric.WithAttributes( + attribute.String(handlePeerResultKey, string(result)))) } func (m *metrics) observeAdvertise(ctx context.Context, err error) { @@ -154,7 +150,8 @@ func (m *metrics) observeAdvertise(ctx context.Context, err error) { } m.advertise.Add(ctx, 1, - attribute.Bool(advertiseFailedKey, err != nil)) + metric.WithAttributes( + attribute.Bool(advertiseFailedKey, err != nil))) } func (m *metrics) observeOnPeersUpdate(_ peer.ID, isAdded bool) { diff --git a/share/p2p/metrics.go b/share/p2p/metrics.go index 87c1e2eeb0..1942be5d6b 100644 --- a/share/p2p/metrics.go +++ b/share/p2p/metrics.go @@ -4,14 +4,12 @@ import ( "context" "fmt" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric/global" - "go.opentelemetry.io/otel/metric/instrument" - "go.opentelemetry.io/otel/metric/instrument/syncint64" - "go.opentelemetry.io/otel/metric/unit" + "go.opentelemetry.io/otel/metric" ) -var meter = global.MeterProvider().Meter("shrex/eds") +var meter = otel.Meter("shrex/eds") type status string @@ -24,7 +22,7 @@ const ( ) type Metrics struct { - totalRequestCounter syncint64.Counter + totalRequestCounter metric.Int64Counter } // ObserveRequests increments the total number of requests sent with the given status as an @@ -36,14 +34,16 @@ func (m *Metrics) ObserveRequests(ctx context.Context, count int64, status statu if ctx.Err() != nil { ctx = context.Background() } - m.totalRequestCounter.Add(ctx, count, attribute.String("status", string(status))) + m.totalRequestCounter.Add(ctx, count, + metric.WithAttributes( + attribute.String("status", string(status)), + )) } func InitClientMetrics(protocol string) (*Metrics, error) { - totalRequestCounter, err := meter.SyncInt64().Counter( + totalRequestCounter, err := meter.Int64Counter( fmt.Sprintf("shrex_%s_client_total_requests", protocol), - instrument.WithUnit(unit.Dimensionless), - instrument.WithDescription(fmt.Sprintf("Total count of sent shrex/%s requests", protocol)), + 
metric.WithDescription(fmt.Sprintf("Total count of sent shrex/%s requests", protocol)), ) if err != nil { return nil, err @@ -55,10 +55,9 @@ func InitClientMetrics(protocol string) (*Metrics, error) { } func InitServerMetrics(protocol string) (*Metrics, error) { - totalRequestCounter, err := meter.SyncInt64().Counter( + totalRequestCounter, err := meter.Int64Counter( fmt.Sprintf("shrex_%s_server_total_responses", protocol), - instrument.WithUnit(unit.Dimensionless), - instrument.WithDescription(fmt.Sprintf("Total count of sent shrex/%s responses", protocol)), + metric.WithDescription(fmt.Sprintf("Total count of sent shrex/%s responses", protocol)), ) if err != nil { return nil, err diff --git a/share/p2p/peers/metrics.go b/share/p2p/peers/metrics.go index bf4d544d9f..95d1ce65d9 100644 --- a/share/p2p/peers/metrics.go +++ b/share/p2p/peers/metrics.go @@ -8,11 +8,9 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/peer" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric/global" - "go.opentelemetry.io/otel/metric/instrument" - "go.opentelemetry.io/otel/metric/instrument/asyncint64" - "go.opentelemetry.io/otel/metric/instrument/syncint64" + "go.opentelemetry.io/otel/metric" "github.com/celestiaorg/celestia-node/share/p2p/shrexsub" ) @@ -52,7 +50,7 @@ const ( ) var ( - meter = global.MeterProvider().Meter("shrex_peer_manager") + meter = otel.Meter("shrex_peer_manager") ) type blacklistPeerReason string @@ -64,63 +62,63 @@ type poolStatus string type peerSource string type metrics struct { - getPeer syncint64.Counter // attributes: source, is_instant - getPeerWaitTimeHistogram syncint64.Histogram // attributes: source - getPeerPoolSizeHistogram syncint64.Histogram // attributes: source - doneResult syncint64.Counter // attributes: source, done_result - validationResult syncint64.Counter // attributes: validation_result - - shrexPools asyncint64.Gauge // attributes: pool_status - fullNodesPool asyncint64.Gauge // attributes: pool_status + getPeer metric.Int64Counter // attributes: source, is_instant + getPeerWaitTimeHistogram metric.Int64Histogram // attributes: source + getPeerPoolSizeHistogram metric.Int64Histogram // attributes: source + doneResult metric.Int64Counter // attributes: source, done_result + validationResult metric.Int64Counter // attributes: validation_result + + shrexPools metric.Int64ObservableGauge // attributes: pool_status + fullNodesPool metric.Int64ObservableGauge // attributes: pool_status blacklistedPeersByReason sync.Map - blacklistedPeers asyncint64.Gauge // attributes: blacklist_reason + blacklistedPeers metric.Int64ObservableGauge // attributes: blacklist_reason } func initMetrics(manager *Manager) (*metrics, error) { - getPeer, err := meter.SyncInt64().Counter("peer_manager_get_peer_counter", - instrument.WithDescription("get peer counter")) + getPeer, err := meter.Int64Counter("peer_manager_get_peer_counter", + metric.WithDescription("get peer counter")) if err != nil { return nil, err } - getPeerWaitTimeHistogram, err := meter.SyncInt64().Histogram("peer_manager_get_peer_ms_time_hist", - instrument.WithDescription("get peer time histogram(ms), observed only for async get(is_instant = false)")) + getPeerWaitTimeHistogram, err := meter.Int64Histogram("peer_manager_get_peer_ms_time_hist", + metric.WithDescription("get peer time histogram(ms), observed only for async get(is_instant = false)")) if err != nil { return nil, err } - getPeerPoolSizeHistogram, err := 
meter.SyncInt64().Histogram("peer_manager_get_peer_pool_size_hist", - instrument.WithDescription("amount of available active peers in pool at time when get was called")) + getPeerPoolSizeHistogram, err := meter.Int64Histogram("peer_manager_get_peer_pool_size_hist", + metric.WithDescription("amount of available active peers in pool at time when get was called")) if err != nil { return nil, err } - doneResult, err := meter.SyncInt64().Counter("peer_manager_done_result_counter", - instrument.WithDescription("done results counter")) + doneResult, err := meter.Int64Counter("peer_manager_done_result_counter", + metric.WithDescription("done results counter")) if err != nil { return nil, err } - validationResult, err := meter.SyncInt64().Counter("peer_manager_validation_result_counter", - instrument.WithDescription("validation result counter")) + validationResult, err := meter.Int64Counter("peer_manager_validation_result_counter", + metric.WithDescription("validation result counter")) if err != nil { return nil, err } - shrexPools, err := meter.AsyncInt64().Gauge("peer_manager_pools_gauge", - instrument.WithDescription("pools amount")) + shrexPools, err := meter.Int64ObservableGauge("peer_manager_pools_gauge", + metric.WithDescription("pools amount")) if err != nil { return nil, err } - fullNodesPool, err := meter.AsyncInt64().Gauge("peer_manager_full_nodes_gauge", - instrument.WithDescription("full nodes pool peers amount")) + fullNodesPool, err := meter.Int64ObservableGauge("peer_manager_full_nodes_gauge", + metric.WithDescription("full nodes pool peers amount")) if err != nil { return nil, err } - blacklisted, err := meter.AsyncInt64().Gauge("peer_manager_blacklisted_peers", - instrument.WithDescription("blacklisted peers amount")) + blacklisted, err := meter.Int64ObservableGauge("peer_manager_blacklisted_peers", + metric.WithDescription("blacklisted peers amount")) if err != nil { return nil, err } @@ -136,33 +134,31 @@ func initMetrics(manager *Manager) (*metrics, error) { blacklistedPeers: blacklisted, } - err = meter.RegisterCallback( - []instrument.Asynchronous{ - shrexPools, - fullNodesPool, - blacklisted, - }, - func(ctx context.Context) { - for poolStatus, count := range manager.shrexPools() { - shrexPools.Observe(ctx, count, - attribute.String(poolStatusKey, string(poolStatus))) - } - - fullNodesPool.Observe(ctx, int64(manager.fullNodes.len()), - attribute.String(peerStatusKey, string(peerStatusActive))) - fullNodesPool.Observe(ctx, int64(manager.fullNodes.cooldown.len()), - attribute.String(peerStatusKey, string(peerStatusCooldown))) - - metrics.blacklistedPeersByReason.Range(func(key, value any) bool { - reason := key.(blacklistPeerReason) - amount := value.(int) - blacklisted.Observe(ctx, int64(amount), - attribute.String(blacklistPeerReasonKey, string(reason))) - return true - }) - }, - ) + callback := func(ctx context.Context, observer metric.Observer) error { + for poolStatus, count := range manager.shrexPools() { + observer.ObserveInt64(shrexPools, count, + metric.WithAttributes( + attribute.String(poolStatusKey, string(poolStatus)))) + } + observer.ObserveInt64(fullNodesPool, int64(manager.fullNodes.len()), + metric.WithAttributes( + attribute.String(peerStatusKey, string(peerStatusActive)))) + observer.ObserveInt64(fullNodesPool, int64(manager.fullNodes.cooldown.len()), + metric.WithAttributes( + attribute.String(peerStatusKey, string(peerStatusCooldown)))) + + metrics.blacklistedPeersByReason.Range(func(key, value any) bool { + reason := key.(blacklistPeerReason) + amount := 
value.(int) + observer.ObserveInt64(blacklisted, int64(amount), + metric.WithAttributes( + attribute.String(blacklistPeerReasonKey, string(reason)))) + return true + }) + return nil + } + _, err = meter.RegisterCallback(callback, shrexPools, fullNodesPool, blacklisted) if err != nil { return nil, fmt.Errorf("registering metrics callback: %w", err) } @@ -180,17 +176,20 @@ func (m *metrics) observeGetPeer( ctx = context.Background() } m.getPeer.Add(ctx, 1, - attribute.String(sourceKey, string(source)), - attribute.Bool(isInstantKey, waitTime == 0)) + metric.WithAttributes( + attribute.String(sourceKey, string(source)), + attribute.Bool(isInstantKey, waitTime == 0))) if source == sourceShrexSub { m.getPeerPoolSizeHistogram.Record(ctx, int64(poolSize), - attribute.String(sourceKey, string(source))) + metric.WithAttributes( + attribute.String(sourceKey, string(source)))) } // record wait time only for async gets if waitTime > 0 { m.getPeerWaitTimeHistogram.Record(ctx, waitTime.Milliseconds(), - attribute.String(sourceKey, string(source))) + metric.WithAttributes( + attribute.String(sourceKey, string(source)))) } } @@ -201,8 +200,9 @@ func (m *metrics) observeDoneResult(source peerSource, result result) { ctx := context.Background() m.doneResult.Add(ctx, 1, - attribute.String(sourceKey, string(source)), - attribute.String(doneResultKey, string(result))) + metric.WithAttributes( + attribute.String(sourceKey, string(source)), + attribute.String(doneResultKey, string(result)))) } // validationObserver is a middleware that observes validation results as metrics @@ -230,7 +230,8 @@ func (m *metrics) validationObserver(validator shrexsub.ValidatorFn) shrexsub.Va } m.validationResult.Add(ctx, 1, - attribute.String(validationResultKey, resStr)) + metric.WithAttributes( + attribute.String(validationResultKey, resStr))) return res } } diff --git a/share/p2p/shrexeds/exchange_test.go b/share/p2p/shrexeds/exchange_test.go index 21fe9f77a1..b1d2e0ad18 100644 --- a/share/p2p/shrexeds/exchange_test.go +++ b/share/p2p/shrexeds/exchange_test.go @@ -35,7 +35,8 @@ func TestExchange_RequestEDS(t *testing.T) { // Testcase: EDS is immediately available t.Run("EDS_Available", func(t *testing.T) { eds := edstest.RandEDS(t, 4) - dah := da.NewDataAvailabilityHeader(eds) + dah, err := da.NewDataAvailabilityHeader(eds) + require.NoError(t, err) err = store.Put(ctx, dah.Hash(), eds) require.NoError(t, err) @@ -48,7 +49,8 @@ func TestExchange_RequestEDS(t *testing.T) { t.Run("EDS_AvailableAfterDelay", func(t *testing.T) { storageDelay := time.Second eds := edstest.RandEDS(t, 4) - dah := da.NewDataAvailabilityHeader(eds) + dah, err := da.NewDataAvailabilityHeader(eds) + require.NoError(t, err) go func() { time.Sleep(storageDelay) err = store.Put(ctx, dah.Hash(), eds) @@ -77,8 +79,9 @@ func TestExchange_RequestEDS(t *testing.T) { timeoutCtx, cancel := context.WithTimeout(ctx, time.Second) t.Cleanup(cancel) eds := edstest.RandEDS(t, 4) - dah := da.NewDataAvailabilityHeader(eds) - _, err := client.RequestEDS(timeoutCtx, dah.Hash(), server.host.ID()) + dah, err := da.NewDataAvailabilityHeader(eds) + require.NoError(t, err) + _, err = client.RequestEDS(timeoutCtx, dah.Hash(), server.host.ID()) require.ErrorIs(t, err, p2p.ErrNotFound) }) diff --git a/share/p2p/shrexnd/exchange_test.go b/share/p2p/shrexnd/exchange_test.go index e8d3e439c0..8d1dad17c0 100644 --- a/share/p2p/shrexnd/exchange_test.go +++ b/share/p2p/shrexnd/exchange_test.go @@ -45,7 +45,8 @@ func TestExchange_RequestND_NotFound(t *testing.T) { t.Cleanup(cancel) eds := 
edstest.RandEDS(t, 4) - dah := da.NewDataAvailabilityHeader(eds) + dah, err := da.NewDataAvailabilityHeader(eds) + require.NoError(t, err) require.NoError(t, edsStore.Put(ctx, dah.Hash(), eds)) randNamespace := dah.RowRoots[(len(dah.RowRoots)-1)/2][:share.NamespaceSize] diff --git a/state/metrics.go b/state/metrics.go index e465e2833d..169023b5f9 100644 --- a/state/metrics.go +++ b/state/metrics.go @@ -3,32 +3,28 @@ package state import ( "context" - "go.opentelemetry.io/otel/metric/global" - "go.opentelemetry.io/otel/metric/instrument" - "go.opentelemetry.io/otel/metric/unit" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" ) -var meter = global.MeterProvider().Meter("state") +var meter = otel.Meter("state") func WithMetrics(ca *CoreAccessor) { - pfbCounter, _ := meter.AsyncInt64().Counter( + pfbCounter, _ := meter.Int64ObservableCounter( "pfb_count", - instrument.WithUnit(unit.Dimensionless), - instrument.WithDescription("Total count of submitted PayForBlob transactions"), + metric.WithDescription("Total count of submitted PayForBlob transactions"), ) - lastPfbTimestamp, _ := meter.AsyncInt64().Counter( + lastPfbTimestamp, _ := meter.Int64ObservableCounter( "last_pfb_timestamp", - instrument.WithUnit(unit.Milliseconds), - instrument.WithDescription("Timestamp of the last submitted PayForBlob transaction"), + metric.WithDescription("Timestamp of the last submitted PayForBlob transaction"), ) - err := meter.RegisterCallback( - []instrument.Asynchronous{pfbCounter, lastPfbTimestamp}, - func(ctx context.Context) { - pfbCounter.Observe(ctx, ca.payForBlobCount) - lastPfbTimestamp.Observe(ctx, ca.lastPayForBlob) - }, - ) + callback := func(ctx context.Context, observer metric.Observer) error { + observer.ObserveInt64(pfbCounter, ca.payForBlobCount) + observer.ObserveInt64(lastPfbTimestamp, ca.lastPayForBlob) + return nil + } + _, err := meter.RegisterCallback(callback, pfbCounter, lastPfbTimestamp) if err != nil { panic(err) }
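
For reference, much of the test churn above comes from two APIs becoming fallible: rsmt2d's RowRoots/ColRoots and celestia-app's da.NewDataAvailabilityHeader now return an error alongside their result. A minimal sketch of the new call pattern, with a hypothetical verifyEDSRoot helper that mirrors the integrity check in ReadEDS (not part of this change):

import (
	"bytes"
	"fmt"

	"github.com/celestiaorg/celestia-app/pkg/da"
	"github.com/celestiaorg/rsmt2d"

	"github.com/celestiaorg/celestia-node/share"
)

// verifyEDSRoot recomputes the DAH of an extended data square and checks it
// against an expected data root, mirroring the integrity check in ReadEDS.
// Hypothetical helper shown only to illustrate the fallible signatures.
func verifyEDSRoot(eds *rsmt2d.ExtendedDataSquare, expected share.DataHash) error {
	// RowRoots (and ColRoots) can now fail, so the result must be checked
	// instead of the call being used purely for its caching side effect.
	if _, err := eds.RowRoots(); err != nil {
		return fmt.Errorf("computing row roots: %w", err)
	}

	dah, err := da.NewDataAvailabilityHeader(eds)
	if err != nil {
		return fmt.Errorf("building DAH: %w", err)
	}

	if !bytes.Equal(dah.Hash(), expected) {
		return fmt.Errorf("content integrity mismatch: imported root %X doesn't match expected root %X",
			dah.Hash(), expected)
	}
	return nil
}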
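
The BadEncodingProof changes verify the proof's shares against the roots of the orthogonal axis: shares taken from a row are proven under the column roots and vice versa, since the rsmt2d library numbers the axes Row = 0 and Col = 1. A small illustrative helper capturing that selection (not part of the change itself):

import (
	"github.com/celestiaorg/celestia-app/pkg/da"
	"github.com/celestiaorg/rsmt2d"
)

// orthogonalRoots picks the DAH roots a BadEncodingProof's shares are verified
// against: a proof spanning a row carries shares that are each committed to by
// a column root, and vice versa.
func orthogonalRoots(dah *da.DataAvailabilityHeader, axis rsmt2d.Axis) [][]byte {
	if axis == rsmt2d.Row {
		return dah.ColumnRoots
	}
	return dah.RowRoots
}

Each non-nil proof share is then validated against merkleRoots[index], and only the final comparison of the rebuilt Merkle root uses the root of the original axis at p.Index.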
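
NewErrByzantine now collects share proofs concurrently: one goroutine per received share, capped at half the extended width, which is enough to recompute the row or column. The sketch below condenses that fan-out with errgroup, written as if it lived in the byzantine package; ShareWithProof is the package's own type and fetch stands in for getProofsAt:

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// collectHalf launches one goroutine per non-nil share and stops scheduling
// once half of the positions have been requested; any single failure cancels
// the remaining fetches via the errgroup context.
func collectHalf(
	ctx context.Context,
	shares [][]byte,
	fetch func(ctx context.Context, index int) (*ShareWithProof, error),
) ([]*ShareWithProof, error) {
	out := make([]*ShareWithProof, len(shares))
	requested := 0

	errGr, ctx := errgroup.WithContext(ctx)
	for index, shr := range shares {
		// half of the extended width is enough to recompute a row or column
		if requested == len(shares)/2 {
			break
		}
		if shr == nil {
			continue // nothing was received for this position
		}
		requested++

		index := index // capture the loop variable for the goroutine
		errGr.Go(func() error {
			proof, err := fetch(ctx, index)
			out[index] = proof // each goroutine writes a distinct slot, so no locking is needed
			return err
		})
	}
	return out, errGr.Wait()
}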
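
In the retriever, the session square is now allocated empty via rsmt2d.NewExtendedDataSquare and shares are written with SetCell, whose error doubles as the "cell already populated" signal that the old GetCell pre-check provided. A sketch of that tolerant insert (putShare is a hypothetical wrapper around the call shown in doRequest):

import "github.com/celestiaorg/rsmt2d"

// putShare inserts a received share into the session square, treating a
// SetCell failure as benign: the share size was verified upstream, and the
// same share may legitimately arrive from both its row and its column.
func putShare(square *rsmt2d.ExtendedDataSquare, x, y uint, shr []byte) bool {
	if err := square.SetCell(x, y, shr); err != nil {
		return false // cell already set (or rejected); nothing else to do
	}
	return true
}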
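
All of the metrics files follow the same OpenTelemetry migration: instruments come straight from the meter (otel.Meter, meter.Int64Counter, meter.Int64Histogram), descriptions use metric.WithDescription, and attributes are wrapped in metric.WithAttributes instead of being passed as bare varargs. A self-contained sketch of the synchronous side, with illustrative instrument and attribute names:

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
)

var meter = otel.Meter("example/subsystem")

type exampleMetrics struct {
	requests metric.Int64Counter
	attempts metric.Int64Histogram
}

func newExampleMetrics() (*exampleMetrics, error) {
	requests, err := meter.Int64Counter(
		"example_total_requests",
		metric.WithDescription("total number of requests"),
	)
	if err != nil {
		return nil, err
	}
	attempts, err := meter.Int64Histogram(
		"example_attempts_per_request",
		metric.WithDescription("attempts made per request"),
	)
	if err != nil {
		return nil, err
	}
	return &exampleMetrics{requests: requests, attempts: attempts}, nil
}

func (m *exampleMetrics) observe(ctx context.Context, attempts int64, success bool) {
	if m == nil {
		return // metrics are optional; a nil receiver is a no-op
	}
	if ctx.Err() != nil {
		ctx = context.Background() // still record the observation if the request context is done
	}
	// attributes are now wrapped in metric.WithAttributes rather than passed directly
	m.requests.Add(ctx, 1,
		metric.WithAttributes(attribute.Bool("success", success)))
	m.attempts.Record(ctx, attempts,
		metric.WithAttributes(attribute.Bool("success", success)))
}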
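
The asynchronous instruments (peersAmount, backOffSize, pfbCounter, and so on) move to the callback-registration form: RegisterCallback now takes the callback first and the observed instruments after it, and values are reported through the metric.Observer. A sketch under the same assumptions as the previous example, with the size/backoff functions standing in for the real state being observed:

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"
)

// registerGauges wires two observable gauges to a single callback.
// Instrument names and the size/backoff accessors are illustrative.
func registerGauges(size, backoff func() int) (metric.Registration, error) {
	meter := otel.Meter("example/subsystem")

	peersAmount, err := meter.Int64ObservableGauge("example_amount_of_peers",
		metric.WithDescription("amount of peers in the set"))
	if err != nil {
		return nil, err
	}
	backOffSize, err := meter.Int64ObservableGauge("example_backoff_amount",
		metric.WithDescription("amount of peers in backoff"))
	if err != nil {
		return nil, err
	}

	// the callback receives an Observer and returns an error; every instrument
	// it reports on is listed after the callback when registering
	callback := func(_ context.Context, observer metric.Observer) error {
		observer.ObserveInt64(peersAmount, int64(size()))
		observer.ObserveInt64(backOffSize, int64(backoff()))
		return nil
	}
	reg, err := meter.RegisterCallback(callback, peersAmount, backOffSize)
	if err != nil {
		return nil, fmt.Errorf("registering metrics callback: %w", err)
	}
	return reg, nil
}

The returned Registration can be kept to unregister the callback later; the code above discards it with a blank identifier, which keeps the callback alive for the lifetime of the meter provider.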