diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index b75a7896eec..b291cf2bcde 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1,6 +1,7 @@ /web/ui @juliusv /web/ui/module @juliusv @nexucis /storage/remote @csmarchbanks @cstyan @bwplotka @tomwilkie +/storage/remote/otlptranslator @gouthamve @jesusvazquez /discovery/kubernetes @brancz /tsdb @jesusvazquez /promql @roidelapluie diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e8b61bcadb2..c464227239d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -11,7 +11,7 @@ jobs: # Whenever the Go version is updated here, .promu.yml # should also be updated. container: - image: quay.io/prometheus/golang-builder:1.20-base + image: quay.io/prometheus/golang-builder:1.21-base steps: - uses: actions/checkout@v3 - uses: prometheus/promci@v0.1.0 @@ -32,7 +32,7 @@ jobs: # Whenever the Go version is updated here, .promu.yml # should also be updated. container: - image: quay.io/prometheus/golang-builder:1.20-base + image: quay.io/prometheus/golang-builder:1.21-base steps: - uses: actions/checkout@v3 @@ -55,7 +55,7 @@ jobs: - uses: actions/checkout@v3 - uses: actions/setup-go@v4 with: - go-version: '>=1.20 <1.21' + go-version: '>=1.21 <1.22' - run: | $TestTargets = go list ./... | Where-Object { $_ -NotMatch "(github.com/prometheus/prometheus/discovery.*|github.com/prometheus/prometheus/config|github.com/prometheus/prometheus/web)"} go test $TestTargets -vet=off -v @@ -66,7 +66,7 @@ jobs: runs-on: ubuntu-latest # The go verson in this image should be N-1 wrt test_go. container: - image: quay.io/prometheus/golang-builder:1.19-base + image: quay.io/prometheus/golang-builder:1.20-base steps: - uses: actions/checkout@v3 - run: make build @@ -79,7 +79,7 @@ jobs: # Whenever the Go version is updated here, .promu.yml # should also be updated. 
container: - image: quay.io/prometheus/golang-builder:1.19-base + image: quay.io/prometheus/golang-builder:1.20-base steps: - uses: actions/checkout@v3 - run: go install ./cmd/promtool/. @@ -172,7 +172,7 @@ jobs: quay_io_login: ${{ secrets.quay_io_login }} quay_io_password: ${{ secrets.quay_io_password }} publish_release: - name: Publish release arfefacts + name: Publish release artefacts runs-on: ubuntu-latest needs: [test_ui, test_go, test_windows, golangci, codeql, build_all] if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags/v2.') diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index 762e920163f..ec8d993fb52 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -27,7 +27,7 @@ jobs: uses: actions/checkout@v3 - uses: actions/setup-go@v4 with: - go-version: '>=1.20 <1.21' + go-version: '>=1.21 <1.22' - name: Initialize CodeQL uses: github/codeql-action/init@v2 diff --git a/.golangci.yml b/.golangci.yml index 4a6daae5948..5aa01e48aa0 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -3,6 +3,9 @@ run: skip-files: # Skip autogenerated files. - ^.*\.(pb|y)\.go$ + skip-dirs: + # Copied it from a different source + - storage/remote/otlptranslator/prometheusremotewrite output: sort-results: true diff --git a/.promu.yml b/.promu.yml index 95e043330c4..30d1c14f787 100644 --- a/.promu.yml +++ b/.promu.yml @@ -1,7 +1,7 @@ go: # Whenever the Go version is updated here, # .circle/config.yml should also be updated. - version: 1.20 + version: 1.21 repository: path: github.com/prometheus/prometheus build: diff --git a/CHANGELOG.md b/CHANGELOG.md index 057342dedbe..4a9d4521a17 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,27 @@ # Changelog +## 2.47.0 / 2023-09-06 + +This release adds an experimental OpenTelemetry (OTLP) Ingestion feature, +and also new setting `keep_dropped_targets` to limit the amount of dropped +targets held in memory. 
This defaults to 0 meaning 'no limit', so we encourage +users with large Prometheus to try setting a limit such as 100. + +* [FEATURE] Web: Add OpenTelemetry (OTLP) Ingestion endpoint. #12571 #12643 +* [FEATURE] Scraping: Optionally limit detail on dropped targets, to save memory. #12647 +* [ENHANCEMENT] TSDB: Write head chunks to disk in the background to reduce blocking. #11818 +* [ENHANCEMENT] PromQL: Speed up aggregate and function queries. #12682 +* [ENHANCEMENT] PromQL: More efficient evaluation of query with `timestamp()`. #12579 +* [ENHANCEMENT] API: Faster streaming of Labels to JSON. #12598 +* [ENHANCEMENT] Agent: Memory pooling optimisation. #12651 +* [ENHANCEMENT] TSDB: Prevent storage space leaks due to terminated snapshots on shutdown. #12664 +* [ENHANCEMENT] Histograms: Refactoring and optimisations. #12352 #12584 #12596 #12711 #12054 +* [ENHANCEMENT] Histograms: Add `histogram_stdvar` and `histogram_stddev` functions. #12614 +* [ENHANCEMENT] Remote-write: add http.resend_count tracing attribute. #12676 +* [ENHANCEMENT] TSDB: Support native histograms in snapshot on shutdown. #12722 +* [BUGFIX] TSDB/Agent: ensure that new series get written to WAL on rollback. #12592 +* [BUGFIX] Scraping: fix infinite loop on exemplar in protobuf format. #12737 + ## 2.46.0 / 2023-07-25 * [FEATURE] Promtool: Add PromQL format and label matcher set/delete commands to promtool. #11411 diff --git a/RELEASE.md b/RELEASE.md index 0d0918191b3..2ae07281b0a 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -51,7 +51,7 @@ Release cadence of first pre-releases being cut is 6 weeks. 
| v2.44 | 2023-04-19 | Bryan Boreham (GitHub: @bboreham) | | v2.45 LTS | 2023-05-31 | Jesus Vazquez (Github: @jesusvazquez) | | v2.46 | 2023-07-12 | Julien Pivotto (GitHub: @roidelapluie) | -| v2.47 | 2023-08-23 | **searching for volunteer** | +| v2.47 | 2023-08-23 | Bryan Boreham (GitHub: @bboreham) | | v2.48 | 2023-10-04 | **searching for volunteer** | If you are interested in volunteering please create a pull request against the [prometheus/prometheus](https://github.com/prometheus/prometheus) repository and propose yourself for the release series of your choice. diff --git a/VERSION b/VERSION index cc8758303e8..bb13a7e3545 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.46.0 +2.47.0 diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index debc0d3f9d0..cab65626aac 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -170,6 +170,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { case "remote-write-receiver": c.web.EnableRemoteWriteReceiver = true level.Warn(logger).Log("msg", "Remote write receiver enabled via feature flag remote-write-receiver. This is DEPRECATED. 
Use --web.enable-remote-write-receiver.") + case "otlp-write-receiver": + c.web.EnableOTLPWriteReceiver = true + level.Info(logger).Log("msg", "Experimental OTLP write receiver enabled") case "expand-external-labels": c.enableExpandExternalLabels = true level.Info(logger).Log("msg", "Experimental expand-external-labels enabled") @@ -211,11 +214,6 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { } } - if c.tsdb.EnableNativeHistograms && c.tsdb.EnableMemorySnapshotOnShutdown { - c.tsdb.EnableMemorySnapshotOnShutdown = false - level.Warn(logger).Log("msg", "memory-snapshot-on-shutdown has been disabled automatically because memory-snapshot-on-shutdown and native-histograms cannot be enabled at the same time.") - } - return nil } @@ -420,7 +418,7 @@ func main() { a.Flag("scrape.discovery-reload-interval", "Interval used by scrape manager to throttle target groups updates."). Hidden().Default("5s").SetValue(&cfg.scrape.DiscoveryReloadInterval) - a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, promql-per-step-stats, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details."). + a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, promql-per-step-stats, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details."). 
Default("").StringsVar(&cfg.featureList) promlogflag.AddFlags(a, &cfg.promlogConfig) diff --git a/cmd/promtool/testdata/unittest.yml b/cmd/promtool/testdata/unittest.yml index e6745aadfed..ff511729ba3 100644 --- a/cmd/promtool/testdata/unittest.yml +++ b/cmd/promtool/testdata/unittest.yml @@ -10,6 +10,21 @@ tests: - series: test_full values: "0 0" + - series: test_repeat + values: "1x2" + + - series: test_increase + values: "1+1x2" + + - series: test_histogram + values: "{{schema:1 sum:-0.3 count:32.1 z_bucket:7.1 z_bucket_w:0.05 buckets:[5.1 10 7] offset:-3 n_buckets:[4.1 5] n_offset:-5}}" + + - series: test_histogram_repeat + values: "{{sum:3 count:2 buckets:[2]}}x2" + + - series: test_histogram_increase + values: "{{sum:3 count:2 buckets:[2]}}+{{sum:1.3 count:1 buckets:[1]}}x2" + - series: test_stale values: "0 stale" @@ -31,6 +46,37 @@ tests: exp_samples: - value: 60 + # Repeat & increase + - expr: test_repeat + eval_time: 2m + exp_samples: + - value: 1 + labels: "test_repeat" + - expr: test_increase + eval_time: 2m + exp_samples: + - value: 3 + labels: "test_increase" + + # Histograms + - expr: test_histogram + eval_time: 1m + exp_samples: + - labels: "test_histogram" + histogram: "{{schema:1 sum:-0.3 count:32.1 z_bucket:7.1 z_bucket_w:0.05 buckets:[5.1 10 7] offset:-3 n_buckets:[4.1 5] n_offset:-5}}" + + - expr: test_histogram_repeat + eval_time: 2m + exp_samples: + - labels: "test_histogram_repeat" + histogram: "{{count:2 sum:3 buckets:[2]}}" + + - expr: test_histogram_increase + eval_time: 2m + exp_samples: + - labels: "test_histogram_increase" + histogram: "{{count:4 sum:5.6 buckets:[4]}}" + # Ensure a value is stale as soon as it is marked as such. 
- expr: test_stale eval_time: 59s diff --git a/cmd/promtool/unittest.go b/cmd/promtool/unittest.go index e934f37c851..575480b0327 100644 --- a/cmd/promtool/unittest.go +++ b/cmd/promtool/unittest.go @@ -29,6 +29,7 @@ import ( "github.com/prometheus/common/model" "gopkg.in/yaml.v2" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" @@ -346,14 +347,29 @@ Outer: var gotSamples []parsedSample for _, s := range got { gotSamples = append(gotSamples, parsedSample{ - Labels: s.Metric.Copy(), - Value: s.F, + Labels: s.Metric.Copy(), + Value: s.F, + Histogram: promql.HistogramTestExpression(s.H), }) } var expSamples []parsedSample for _, s := range testCase.ExpSamples { lb, err := parser.ParseMetric(s.Labels) + var hist *histogram.FloatHistogram + if err == nil && s.Histogram != "" { + _, values, parseErr := parser.ParseSeriesDesc("{} " + s.Histogram) + switch { + case parseErr != nil: + err = parseErr + case len(values) != 1: + err = fmt.Errorf("expected 1 value, got %d", len(values)) + case values[0].Histogram == nil: + err = fmt.Errorf("expected histogram, got %v", values[0]) + default: + hist = values[0].Histogram + } + } if err != nil { err = fmt.Errorf("labels %q: %w", s.Labels, err) errs = append(errs, fmt.Errorf(" expr: %q, time: %s, err: %w", testCase.Expr, @@ -361,8 +377,9 @@ Outer: continue Outer } expSamples = append(expSamples, parsedSample{ - Labels: lb, - Value: s.Value, + Labels: lb, + Value: s.Value, + Histogram: promql.HistogramTestExpression(hist), }) } @@ -530,14 +547,16 @@ type promqlTestCase struct { } type sample struct { - Labels string `yaml:"labels"` - Value float64 `yaml:"value"` + Labels string `yaml:"labels"` + Value float64 `yaml:"value"` + Histogram string `yaml:"histogram"` // A non-empty string means Value is ignored. } // parsedSample is a sample with parsed Labels. 
type parsedSample struct { - Labels labels.Labels - Value float64 + Labels labels.Labels + Value float64 + Histogram string // TestExpression() of histogram.FloatHistogram } func parsedSamplesString(pss []parsedSample) string { @@ -552,5 +571,8 @@ func parsedSamplesString(pss []parsedSample) string { } func (ps *parsedSample) String() string { + if ps.Histogram != "" { + return ps.Labels.String() + " " + ps.Histogram + } return ps.Labels.String() + " " + strconv.FormatFloat(ps.Value, 'E', -1, 64) } diff --git a/config/config.go b/config/config.go index d32fcc33c98..7f7595dcdf9 100644 --- a/config/config.go +++ b/config/config.go @@ -409,6 +409,9 @@ type GlobalConfig struct { // More than this label value length post metric-relabeling will cause the // scrape to fail. 0 means no limit. LabelValueLengthLimit uint `yaml:"label_value_length_limit,omitempty"` + // Keep no more than this many dropped targets per job. + // 0 means no limit. + KeepDroppedTargets uint `yaml:"keep_dropped_targets,omitempty"` } // SetDirectory joins any relative file paths with dir. @@ -514,6 +517,9 @@ type ScrapeConfig struct { // More than this many buckets in a native histogram will cause the scrape to // fail. NativeHistogramBucketLimit uint `yaml:"native_histogram_bucket_limit,omitempty"` + // Keep no more than this many dropped targets per job. + // 0 means no limit. + KeepDroppedTargets uint `yaml:"keep_dropped_targets,omitempty"` // We cannot do proper Go type embedding below as the parser will then parse // values arbitrarily into the overflow maps of further-down types. 
@@ -608,6 +614,9 @@ func (c *ScrapeConfig) Validate(globalConfig GlobalConfig) error { if c.LabelValueLengthLimit == 0 { c.LabelValueLengthLimit = globalConfig.LabelValueLengthLimit } + if c.KeepDroppedTargets == 0 { + c.KeepDroppedTargets = globalConfig.KeepDroppedTargets + } return nil } diff --git a/discovery/hetzner/hcloud.go b/discovery/hetzner/hcloud.go index 4bcfde83020..6d0599dfa24 100644 --- a/discovery/hetzner/hcloud.go +++ b/discovery/hetzner/hcloud.go @@ -91,7 +91,7 @@ func (d *hcloudDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group, er targets := make([]model.LabelSet, len(servers)) for i, server := range servers { labels := model.LabelSet{ - hetznerLabelRole: model.LabelValue(hetznerRoleHcloud), + hetznerLabelRole: model.LabelValue(HetznerRoleHcloud), hetznerLabelServerID: model.LabelValue(fmt.Sprintf("%d", server.ID)), hetznerLabelServerName: model.LabelValue(server.Name), hetznerLabelDatacenter: model.LabelValue(server.Datacenter.Name), diff --git a/discovery/hetzner/hetzner.go b/discovery/hetzner/hetzner.go index 40b28cc2c93..c3f7ec39c3c 100644 --- a/discovery/hetzner/hetzner.go +++ b/discovery/hetzner/hetzner.go @@ -57,7 +57,7 @@ type SDConfig struct { RefreshInterval model.Duration `yaml:"refresh_interval"` Port int `yaml:"port"` - Role role `yaml:"role"` + Role Role `yaml:"role"` hcloudEndpoint string // For tests only. robotEndpoint string // For tests only. } @@ -74,26 +74,26 @@ type refresher interface { refresh(context.Context) ([]*targetgroup.Group, error) } -// role is the role of the target within the Hetzner Ecosystem. -type role string +// Role is the Role of the target within the Hetzner Ecosystem. +type Role string // The valid options for role. 
const ( // Hetzner Robot Role (Dedicated Server) // https://robot.hetzner.com - hetznerRoleRobot role = "robot" + HetznerRoleRobot Role = "robot" // Hetzner Cloud Role // https://console.hetzner.cloud - hetznerRoleHcloud role = "hcloud" + HetznerRoleHcloud Role = "hcloud" ) // UnmarshalYAML implements the yaml.Unmarshaler interface. -func (c *role) UnmarshalYAML(unmarshal func(interface{}) error) error { +func (c *Role) UnmarshalYAML(unmarshal func(interface{}) error) error { if err := unmarshal((*string)(c)); err != nil { return err } switch *c { - case hetznerRoleRobot, hetznerRoleHcloud: + case HetznerRoleRobot, HetznerRoleHcloud: return nil default: return fmt.Errorf("unknown role %q", *c) @@ -143,12 +143,12 @@ func NewDiscovery(conf *SDConfig, logger log.Logger) (*refresh.Discovery, error) func newRefresher(conf *SDConfig, l log.Logger) (refresher, error) { switch conf.Role { - case hetznerRoleHcloud: + case HetznerRoleHcloud: if conf.hcloudEndpoint == "" { conf.hcloudEndpoint = hcloud.Endpoint } return newHcloudDiscovery(conf, l) - case hetznerRoleRobot: + case HetznerRoleRobot: if conf.robotEndpoint == "" { conf.robotEndpoint = "https://robot-ws.your-server.de" } diff --git a/discovery/hetzner/robot.go b/discovery/hetzner/robot.go index 4960880289f..1d8aa9302fb 100644 --- a/discovery/hetzner/robot.go +++ b/discovery/hetzner/robot.go @@ -105,7 +105,7 @@ func (d *robotDiscovery) refresh(context.Context) ([]*targetgroup.Group, error) targets := make([]model.LabelSet, len(servers)) for i, server := range servers { labels := model.LabelSet{ - hetznerLabelRole: model.LabelValue(hetznerRoleRobot), + hetznerLabelRole: model.LabelValue(HetznerRoleRobot), hetznerLabelServerID: model.LabelValue(strconv.Itoa(server.Server.ServerNumber)), hetznerLabelServerName: model.LabelValue(server.Server.ServerName), hetznerLabelDatacenter: model.LabelValue(strings.ToLower(server.Server.Dc)), diff --git a/discovery/marathon/marathon.go b/discovery/marathon/marathon.go index 
cfd3e2c083c..3baf79aff3b 100644 --- a/discovery/marathon/marathon.go +++ b/discovery/marathon/marathon.go @@ -106,14 +106,16 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { if len(c.AuthToken) > 0 && len(c.AuthTokenFile) > 0 { return errors.New("marathon_sd: at most one of auth_token & auth_token_file must be configured") } - if c.HTTPClientConfig.BasicAuth != nil && (len(c.AuthToken) > 0 || len(c.AuthTokenFile) > 0) { - return errors.New("marathon_sd: at most one of basic_auth, auth_token & auth_token_file must be configured") - } - if (len(c.HTTPClientConfig.BearerToken) > 0 || len(c.HTTPClientConfig.BearerTokenFile) > 0) && (len(c.AuthToken) > 0 || len(c.AuthTokenFile) > 0) { - return errors.New("marathon_sd: at most one of bearer_token, bearer_token_file, auth_token & auth_token_file must be configured") - } - if c.HTTPClientConfig.Authorization != nil && (len(c.AuthToken) > 0 || len(c.AuthTokenFile) > 0) { - return errors.New("marathon_sd: at most one of auth_token, auth_token_file & authorization must be configured") + + if len(c.AuthToken) > 0 || len(c.AuthTokenFile) > 0 { + switch { + case c.HTTPClientConfig.BasicAuth != nil: + return errors.New("marathon_sd: at most one of basic_auth, auth_token & auth_token_file must be configured") + case len(c.HTTPClientConfig.BearerToken) > 0 || len(c.HTTPClientConfig.BearerTokenFile) > 0: + return errors.New("marathon_sd: at most one of bearer_token, bearer_token_file, auth_token & auth_token_file must be configured") + case c.HTTPClientConfig.Authorization != nil: + return errors.New("marathon_sd: at most one of auth_token, auth_token_file & authorization must be configured") + } } return c.HTTPClientConfig.Validate() } diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md index 46f286e0091..78ec205f244 100644 --- a/docs/command-line/prometheus.md +++ b/docs/command-line/prometheus.md @@ -52,7 +52,7 @@ The Prometheus monitoring server | --query.timeout | Maximum 
time a query may take before being aborted. Use with server mode only. | `2m` | | --query.max-concurrency | Maximum number of queries executed concurrently. Use with server mode only. | `20` | | --query.max-samples | Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` | -| --enable-feature | Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, promql-per-step-stats, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | | +| --enable-feature | Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, promql-per-step-stats, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | | | --log.level | Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` | | --log.format | Output format of log messages. One of: [logfmt, json] | `logfmt` | diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index 6691902579c..f15a9f914d0 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -106,6 +106,10 @@ global: # change in the future. 
[ target_limit: <int> | default = 0 ] + # Limit per scrape config on the number of targets dropped by relabeling + # that will be kept in memory. 0 means no limit. + [ keep_dropped_targets: <int> | default = 0 ] + # Rule files specifies a list of globs. Rules and alerts are read from # all matching files. rule_files: @@ -415,6 +419,10 @@ metric_relabel_configs: # change in the future. [ target_limit: <int> | default = 0 ] +# Per-job limit on the number of targets dropped by relabeling +# that will be kept in memory. 0 means no limit. +[ keep_dropped_targets: <int> | default = 0 ] + # Limit on total number of positive and negative buckets allowed in a single # native histogram. If this is exceeded, the entire scrape will be treated as # failed. 0 means no limit. diff --git a/docs/configuration/unit_testing_rules.md b/docs/configuration/unit_testing_rules.md index efd168b3585..73d8ddd383c 100644 --- a/docs/configuration/unit_testing_rules.md +++ b/docs/configuration/unit_testing_rules.md @@ -76,18 +76,49 @@ series: # This uses expanding notation. # Expanding notation: -# 'a+bxc' becomes 'a a+b a+(2*b) a+(3*b) … a+(c*b)' -# Read this as series starts at a, then c further samples incrementing by b. -# 'a-bxc' becomes 'a a-b a-(2*b) a-(3*b) … a-(c*b)' -# Read this as series starts at a, then c further samples decrementing by b (or incrementing by negative b). +# 'a+bxn' becomes 'a a+b a+(2*b) a+(3*b) … a+(n*b)' +# Read this as series starts at a, then n further samples incrementing by b. +# 'a-bxn' becomes 'a a-b a-(2*b) a-(3*b) … a-(n*b)' +# Read this as series starts at a, then n further samples decrementing by b (or incrementing by negative b). +# 'axn' becomes 'a a a … a' (n times) - it's a shorthand for 'a+0xn' # There are special values to indicate missing and stale samples: -# '_' represents a missing sample from scrape -# 'stale' indicates a stale sample +# '_' represents a missing sample from scrape +# 'stale' indicates a stale sample # Examples: # 1. 
'-2+4x3' becomes '-2 2 6 10' - series starts at -2, then 3 further samples incrementing by 4. # 2. ' 1-2x4' becomes '1 -1 -3 -5 -7' - series starts at 1, then 4 further samples decrementing by 2. # 3. ' 1x4' becomes '1 1 1 1 1' - shorthand for '1+0x4', series starts at 1, then 4 further samples incrementing by 0. # 4. ' 1 _x3 stale' becomes '1 _ _ _ stale' - the missing sample cannot increment, so 3 missing samples are produced by the '_x3' expression. +# +# Native histogram notation: +# Native histograms can be used instead of floating point numbers using the following notation: +# {{schema:1 sum:-0.3 count:3.1 z_bucket:7.1 z_bucket_w:0.05 buckets:[5.1 10 7] offset:-3 n_buckets:[4.1 5] n_offset:-5}} +# Native histograms support the same expanding notation as floating point numbers, i.e. 'axn', 'a+bxn' and 'a-bxn'. +# All properties are optional and default to 0. The order is not important. The following properties are supported: +# - schema (int): +# Currently valid schema numbers are -4 <= n <= 8. They are all for +# base-2 bucket schemas, where 1 is a bucket boundary in each case, and +# then each power of two is divided into 2^n logarithmic buckets. Or +# in other words, each bucket boundary is the previous boundary times +# 2^(2^-n). +# - sum (float): +# The sum of all observations, including the zero bucket. +# - count (non-negative float): +# The number of observations, including those that are NaN and including the zero bucket. +# - z_bucket (non-negative float): +# The sum of all observations in the zero bucket. +# - z_bucket_w (non-negative float): +# The width of the zero bucket. +# If z_bucket_w > 0, the zero bucket contains all observations -z_bucket_w <= x <= z_bucket_w. +# Otherwise, the zero bucket only contains observations that are exactly 0. +# - buckets (list of non-negative floats): +# Observation counts in positive buckets. Each represents an absolute count. +# - offset (int): +# The starting index of the first entry in the positive buckets. 
+# - n_buckets (list of non-negative floats): +# Observation counts in negative buckets. Each represents an absolute count. +# - n_offset (int): +# The starting index of the first entry in the negative buckets. values: ``` diff --git a/docs/feature_flags.md b/docs/feature_flags.md index 58e49e3b4b2..1cf54c47f84 100644 --- a/docs/feature_flags.md +++ b/docs/feature_flags.md @@ -126,3 +126,11 @@ still ingest those conventional histograms that do not come with a corresponding native histogram. However, if a native histogram is present, Prometheus will ignore the corresponding conventional histogram, with the notable exception of exemplars, which are always ingested. + +## OTLP Receiver + +`--enable-feature=otlp-write-receiver` + +The OTLP receiver allows Prometheus to accept [OpenTelemetry](https://opentelemetry.io/) metrics writes. +Prometheus is best used as a Pull based system, and staleness, `up` metric, and other Pull enabled features +won't work when you push OTLP metrics. \ No newline at end of file diff --git a/docs/installation.md b/docs/installation.md index 05df14a46e1..28f64c0f959 100644 --- a/docs/installation.md +++ b/docs/installation.md @@ -55,6 +55,23 @@ docker run \ prom/prometheus ``` +### Save your Prometheus data + +Prometheus data is stored in `/prometheus` dir inside the container, so the data is cleared every time the container gets restarted. To save your data, you need to set up persistent storage (or bind mounts) for your container. 
+ +Run Prometheus container with persistent storage: + +```bash +# Create persistent volume for your data +docker volume create prometheus-data +# Start Prometheus container +docker run \ + -p 9090:9090 \ + -v /path/to/prometheus.yml:/etc/prometheus/prometheus.yml \ + -v prometheus-data:/prometheus \ + prom/prometheus +``` + ### Custom image To avoid managing a file on the host and bind-mount it, the diff --git a/docs/querying/api.md b/docs/querying/api.md index ca7f64f6227..408d32cdab5 100644 --- a/docs/querying/api.md +++ b/docs/querying/api.md @@ -543,6 +543,7 @@ GET /api/v1/targets ``` Both the active and dropped targets are part of the response by default. +Dropped targets are subject to `keep_dropped_targets` limit, if set. `labels` represents the label set after relabeling has occurred. `discoveredLabels` represent the unmodified labels retrieved during service discovery before relabeling has occurred. @@ -1294,3 +1295,16 @@ Enable the remote write receiver by setting endpoint is `/api/v1/write`. Find more details [here](../storage.md#overview). *New in v2.33* + +## OTLP Receiver + +Prometheus can be configured as a receiver for the OTLP Metrics protocol. This +is not considered an efficient way of ingesting samples. Use it +with caution for specific low-volume use cases. It is not suitable for +replacing the ingestion via scraping. + +Enable the OTLP receiver by the feature flag +`--enable-feature=otlp-write-receiver`. When enabled, the OTLP receiver +endpoint is `/api/v1/otlp/v1/metrics`. + +*New in v2.47* \ No newline at end of file diff --git a/docs/querying/basics.md b/docs/querying/basics.md index 9eb95c66eb5..fa0d44a69be 100644 --- a/docs/querying/basics.md +++ b/docs/querying/basics.md @@ -35,7 +35,7 @@ vector is the only type that can be directly graphed. _Notes about the experimental native histograms:_ * Ingesting native histograms has to be enabled via a [feature - flag](../feature_flags/#native-histograms). 
+ flag](../../feature_flags.md#native-histograms). * Once native histograms have been ingested into the TSDB (and even after disabling the feature flag again), both instant vectors and range vectors may now contain samples that aren't simple floating point numbers (float samples) diff --git a/docs/querying/functions.md b/docs/querying/functions.md index 55ed92ecc8b..6b3a77e9730 100644 --- a/docs/querying/functions.md +++ b/docs/querying/functions.md @@ -14,7 +14,7 @@ vector, which if not provided it will default to the value of the expression _Notes about the experimental native histograms:_ * Ingesting native histograms has to be enabled via a [feature - flag](../feature_flags/#native-histograms). As long as no native histograms + flag](../../feature_flags.md#native-histograms). As long as no native histograms have been ingested into the TSDB, all functions will behave as usual. * Functions that do not explicitly mention native histograms in their documentation (see below) will ignore histogram samples. @@ -145,7 +145,7 @@ delta(cpu_temp_celsius{host="zeus"}[2h]) ``` `delta` acts on native histograms by calculating a new histogram where each -compononent (sum and count of observations, buckets) is the difference between +component (sum and count of observations, buckets) is the difference between the respective component in the first and last native histogram in `v`. However, each element in `v` that contains a mix of float and native histogram samples within the range, will be missing from the result vector. @@ -323,6 +323,19 @@ a histogram. You can use `histogram_quantile(1, v instant-vector)` to get the estimated maximum value stored in a histogram. +## `histogram_stddev()` and `histogram_stdvar()` + +_Both functions only act on native histograms, which are an experimental +feature. 
The behavior of these functions may change in future versions of +Prometheus, including their removal from PromQL._ + +`histogram_stddev(v instant-vector)` returns the estimated standard deviation +of observations in a native histogram, based on the geometric mean of the buckets +where the observations lie. Samples that are not native histograms are ignored and +do not show up in the returned vector. + +Similarly, `histogram_stdvar(v instant-vector)` returns the estimated standard +variance of observations in a native histogram. ## `holt_winters()` @@ -419,11 +432,10 @@ label_join(up{job="api-server",src1="a",src2="b",src3="c"}, "foo", ",", "src1", ## `label_replace()` For each timeseries in `v`, `label_replace(v instant-vector, dst_label string, replacement string, src_label string, regex string)` -matches the regular expression `regex` against the value of the label `src_label`. If it +matches the [regular expression](https://github.com/google/re2/wiki/Syntax) `regex` against the value of the label `src_label`. If it matches, the value of the label `dst_label` in the returned timeseries will be the expansion of `replacement`, together with the original labels in the input. Capturing groups in the -regular expression can be referenced with `$1`, `$2`, etc. If the regular expression doesn't -match then the timeseries is returned unchanged. +regular expression can be referenced with `$1`, `$2`, etc. Named capturing groups in the regular expression can be referenced with `$name` (where `name` is the capturing group name). If the regular expression doesn't match then the timeseries is returned unchanged. `label_replace` acts on float and histogram samples in the same way. 
@@ -433,6 +445,11 @@ This example will return timeseries with the values `a:c` at label `service` and label_replace(up{job="api-server",service="a:c"}, "foo", "$1", "service", "(.*):.*") ``` +This second example has the same effect as the first example, and illustrates use of named capturing groups: +``` +label_replace(up{job="api-server",service="a:c"}, "foo", "$name", "service", "(?P<name>.*):(?P<version>.*)") +``` + ## `ln()` `ln(v instant-vector)` calculates the natural logarithm for all elements in `v`. @@ -491,7 +508,7 @@ rate(http_requests_total{job="api-server"}[5m]) ``` `rate` acts on native histograms by calculating a new histogram where each -compononent (sum and count of observations, buckets) is the rate of increase +component (sum and count of observations, buckets) is the rate of increase between the respective component in the first and last native histogram in `v`. However, each element in `v` that contains a mix of float and native histogram samples within the range, will be missing from the result vector. diff --git a/docs/querying/operators.md b/docs/querying/operators.md index c691a0f1a8a..b92bdd94ac8 100644 --- a/docs/querying/operators.md +++ b/docs/querying/operators.md @@ -310,7 +310,7 @@ so `2 ^ 3 ^ 2` is equivalent to `2 ^ (3 ^ 2)`. ## Operators for native histograms Native histograms are an experimental feature. Ingesting native histograms has -to be enabled via a [feature flag](../feature_flags/#native-histograms). Once +to be enabled via a [feature flag](../../feature_flags.md#native-histograms). Once native histograms have been ingested, they can be queried (even after the feature flag has been disabled again). However, the operator support for native histograms is still very limited. 
diff --git a/documentation/examples/prometheus-kubernetes.yml b/documentation/examples/prometheus-kubernetes.yml index 9a622873422..ad7451c2d7b 100644 --- a/documentation/examples/prometheus-kubernetes.yml +++ b/documentation/examples/prometheus-kubernetes.yml @@ -8,6 +8,11 @@ # If you are using Kubernetes 1.7.2 or earlier, please take note of the comments # for the kubernetes-cadvisor job; you will need to edit or remove this job. +# Keep at most 100 sets of details of targets dropped by relabeling. +# This information is used to display in the UI for troubleshooting. +global: + keep_dropped_targets: 100 + # Scrape config for API servers. # # Kubernetes exposes API servers as endpoints to the default/kubernetes diff --git a/go.mod b/go.mod index 82e53f1851f..b84a4623d78 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/prometheus/prometheus -go 1.19 +go 1.20 require ( github.com/Azure/azure-sdk-for-go v65.0.0+incompatible @@ -50,10 +50,12 @@ require ( github.com/prometheus/common/assets v0.2.0 github.com/prometheus/common/sigv4 v0.1.0 github.com/prometheus/exporter-toolkit v0.10.0 - github.com/scaleway/scaleway-sdk-go v1.0.0-beta.19 + github.com/scaleway/scaleway-sdk-go v1.0.0-beta.20 github.com/shurcooL/httpfs v0.0.0-20230704072500-f1e31cf0ba5c github.com/stretchr/testify v1.8.4 github.com/vultr/govultr/v2 v2.17.2 + go.opentelemetry.io/collector/pdata v1.0.0-rcv0014 + go.opentelemetry.io/collector/semconv v0.81.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 go.opentelemetry.io/otel v1.16.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0 @@ -64,6 +66,7 @@ require ( go.uber.org/atomic v1.11.0 go.uber.org/automaxprocs v1.5.2 go.uber.org/goleak v1.2.1 + go.uber.org/multierr v1.11.0 golang.org/x/net v0.12.0 golang.org/x/oauth2 v0.10.0 golang.org/x/sync v0.3.0 diff --git a/go.sum b/go.sum index 4a5496485d9..f5661fc9e44 100644 --- a/go.sum +++ b/go.sum @@ -448,7 +448,7 @@ github.com/hashicorp/go-uuid v1.0.0/go.mod 
h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= -github.com/hashicorp/go-version v1.2.1 h1:zEfKbn2+PDgroKdiOzqiE8rsmLqU2uwi5PB5pBJ3TkI= +github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -706,8 +706,8 @@ github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDN github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= -github.com/scaleway/scaleway-sdk-go v1.0.0-beta.19 h1:+1H+N9QFl2Sfvia0FBYfMrHYHYhmpZxhSE0wpPL2lYs= -github.com/scaleway/scaleway-sdk-go v1.0.0-beta.19/go.mod h1:fCa7OJZ/9DRTnOKmxvT6pn+LPWUptQAmHF/SBJUGEcg= +github.com/scaleway/scaleway-sdk-go v1.0.0-beta.20 h1:a9hSJdJcd16e0HoMsnFvaHvxB3pxSD+SC7+CISp7xY0= +github.com/scaleway/scaleway-sdk-go v1.0.0-beta.20/go.mod h1:fCa7OJZ/9DRTnOKmxvT6pn+LPWUptQAmHF/SBJUGEcg= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/shoenig/test v0.6.6 h1:Oe8TPH9wAbv++YPNDKJWUnI8Q4PPWCx3UbOfH+FxiMU= @@ -797,6 +797,10 @@ go.opencensus.io v0.22.3/go.mod 
h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= +go.opentelemetry.io/collector/pdata v1.0.0-rcv0014 h1:iT5qH0NLmkGeIdDtnBogYDx7L58t6CaWGL378DEo2QY= +go.opentelemetry.io/collector/pdata v1.0.0-rcv0014/go.mod h1:BRvDrx43kiSoUx3mr7SoA7h9B8+OY99mUK+CZSQFWW4= +go.opentelemetry.io/collector/semconv v0.81.0 h1:lCYNNo3powDvFIaTPP2jDKIrBiV1T92NK4QgL/aHYXw= +go.opentelemetry.io/collector/semconv v0.81.0/go.mod h1:TlYPtzvsXyHOgr5eATi43qEMqwSmIziivJB2uctKswo= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 h1:pginetY7+onl4qN1vl0xW/V/v6OBZ0vVdH+esuJgvmM= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0/go.mod h1:XiYsayHc36K3EByOO6nbAXnAWbrUxdjUROCEeeROOH8= go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s= @@ -828,6 +832,8 @@ go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= diff --git a/model/histogram/float_histogram.go b/model/histogram/float_histogram.go index 782b07df6f3..41873278cbf 100644 --- a/model/histogram/float_histogram.go +++ b/model/histogram/float_histogram.go @@ -15,6 +15,7 
@@ package histogram import ( "fmt" + "math" "strings" ) @@ -93,26 +94,8 @@ func (h *FloatHistogram) CopyToSchema(targetSchema int32) *FloatHistogram { Sum: h.Sum, } - // TODO(beorn7): This is a straight-forward implementation using merging - // iterators for the original buckets and then adding one merged bucket - // after another to the newly created FloatHistogram. It's well possible - // that a more involved implementation performs much better, which we - // could do if this code path turns out to be performance-critical. - var iInSpan, index int32 - for iSpan, iBucket, it := -1, -1, h.floatBucketIterator(true, 0, targetSchema); it.Next(); { - b := it.At() - c.PositiveSpans, c.PositiveBuckets, iSpan, iBucket, iInSpan = addBucket( - b, c.PositiveSpans, c.PositiveBuckets, iSpan, iBucket, iInSpan, index, - ) - index = b.Index - } - for iSpan, iBucket, it := -1, -1, h.floatBucketIterator(false, 0, targetSchema); it.Next(); { - b := it.At() - c.NegativeSpans, c.NegativeBuckets, iSpan, iBucket, iInSpan = addBucket( - b, c.NegativeSpans, c.NegativeBuckets, iSpan, iBucket, iInSpan, index, - ) - index = b.Index - } + c.PositiveSpans, c.PositiveBuckets = mergeToSchema(h.PositiveSpans, h.PositiveBuckets, h.Schema, targetSchema) + c.NegativeSpans, c.NegativeBuckets = mergeToSchema(h.NegativeSpans, h.NegativeBuckets, h.Schema, targetSchema) return &c } @@ -148,6 +131,55 @@ func (h *FloatHistogram) String() string { return sb.String() } +// TestExpression returns the string representation of this histogram as it is used in the internal PromQL testing +// framework as well as in promtool rules unit tests. +// The syntax is described in https://prometheus.io/docs/prometheus/latest/configuration/unit_testing_rules/#series +func (h *FloatHistogram) TestExpression() string { + var res []string + m := h.Copy() + + m.Compact(math.MaxInt) // Compact to reduce the number of positive and negative spans to 1. 
+ + if m.Schema != 0 { + res = append(res, fmt.Sprintf("schema:%d", m.Schema)) + } + if m.Count != 0 { + res = append(res, fmt.Sprintf("count:%g", m.Count)) + } + if m.Sum != 0 { + res = append(res, fmt.Sprintf("sum:%g", m.Sum)) + } + if m.ZeroCount != 0 { + res = append(res, fmt.Sprintf("z_bucket:%g", m.ZeroCount)) + } + if m.ZeroThreshold != 0 { + res = append(res, fmt.Sprintf("z_bucket_w:%g", m.ZeroThreshold)) + } + + addBuckets := func(kind, bucketsKey, offsetKey string, buckets []float64, spans []Span) []string { + if len(spans) > 1 { + panic(fmt.Sprintf("histogram with multiple %s spans not supported", kind)) + } + for _, span := range spans { + if span.Offset != 0 { + res = append(res, fmt.Sprintf("%s:%d", offsetKey, span.Offset)) + } + } + + var bucketStr []string + for _, bucket := range buckets { + bucketStr = append(bucketStr, fmt.Sprintf("%g", bucket)) + } + if len(bucketStr) > 0 { + res = append(res, fmt.Sprintf("%s:[%s]", bucketsKey, strings.Join(bucketStr, " "))) + } + return res + } + res = addBuckets("positive", "buckets", "offset", m.PositiveBuckets, m.PositiveSpans) + res = addBuckets("negative", "n_buckets", "n_offset", m.NegativeBuckets, m.NegativeSpans) + return "{{" + strings.Join(res, " ") + "}}" +} + // ZeroBucket returns the zero bucket. func (h *FloatHistogram) ZeroBucket() Bucket[float64] { return Bucket[float64]{ @@ -177,7 +209,7 @@ func (h *FloatHistogram) Mul(factor float64) *FloatHistogram { return h } -// Div works like Scale but divides instead of multiplies. +// Div works like Mul but divides instead of multiplies. // When dividing by 0, everything will be set to Inf. func (h *FloatHistogram) Div(scalar float64) *FloatHistogram { h.ZeroCount /= scalar @@ -236,23 +268,17 @@ func (h *FloatHistogram) Add(other *FloatHistogram) *FloatHistogram { h.Count += other.Count h.Sum += other.Sum - // TODO(beorn7): If needed, this can be optimized by inspecting the - // spans in other and create missing buckets in h in batches. 
- var iInSpan, index int32 - for iSpan, iBucket, it := -1, -1, other.floatBucketIterator(true, h.ZeroThreshold, h.Schema); it.Next(); { - b := it.At() - h.PositiveSpans, h.PositiveBuckets, iSpan, iBucket, iInSpan = addBucket( - b, h.PositiveSpans, h.PositiveBuckets, iSpan, iBucket, iInSpan, index, - ) - index = b.Index - } - for iSpan, iBucket, it := -1, -1, other.floatBucketIterator(false, h.ZeroThreshold, h.Schema); it.Next(); { - b := it.At() - h.NegativeSpans, h.NegativeBuckets, iSpan, iBucket, iInSpan = addBucket( - b, h.NegativeSpans, h.NegativeBuckets, iSpan, iBucket, iInSpan, index, - ) - index = b.Index + otherPositiveSpans := other.PositiveSpans + otherPositiveBuckets := other.PositiveBuckets + otherNegativeSpans := other.NegativeSpans + otherNegativeBuckets := other.NegativeBuckets + if other.Schema != h.Schema { + otherPositiveSpans, otherPositiveBuckets = mergeToSchema(other.PositiveSpans, other.PositiveBuckets, other.Schema, h.Schema) + otherNegativeSpans, otherNegativeBuckets = mergeToSchema(other.NegativeSpans, other.NegativeBuckets, other.Schema, h.Schema) } + + h.PositiveSpans, h.PositiveBuckets = addBuckets(h.Schema, h.ZeroThreshold, false, h.PositiveSpans, h.PositiveBuckets, otherPositiveSpans, otherPositiveBuckets) + h.NegativeSpans, h.NegativeBuckets = addBuckets(h.Schema, h.ZeroThreshold, false, h.NegativeSpans, h.NegativeBuckets, otherNegativeSpans, otherNegativeBuckets) return h } @@ -263,25 +289,17 @@ func (h *FloatHistogram) Sub(other *FloatHistogram) *FloatHistogram { h.Count -= other.Count h.Sum -= other.Sum - // TODO(beorn7): If needed, this can be optimized by inspecting the - // spans in other and create missing buckets in h in batches. 
- var iInSpan, index int32 - for iSpan, iBucket, it := -1, -1, other.floatBucketIterator(true, h.ZeroThreshold, h.Schema); it.Next(); { - b := it.At() - b.Count *= -1 - h.PositiveSpans, h.PositiveBuckets, iSpan, iBucket, iInSpan = addBucket( - b, h.PositiveSpans, h.PositiveBuckets, iSpan, iBucket, iInSpan, index, - ) - index = b.Index - } - for iSpan, iBucket, it := -1, -1, other.floatBucketIterator(false, h.ZeroThreshold, h.Schema); it.Next(); { - b := it.At() - b.Count *= -1 - h.NegativeSpans, h.NegativeBuckets, iSpan, iBucket, iInSpan = addBucket( - b, h.NegativeSpans, h.NegativeBuckets, iSpan, iBucket, iInSpan, index, - ) - index = b.Index + otherPositiveSpans := other.PositiveSpans + otherPositiveBuckets := other.PositiveBuckets + otherNegativeSpans := other.NegativeSpans + otherNegativeBuckets := other.NegativeBuckets + if other.Schema != h.Schema { + otherPositiveSpans, otherPositiveBuckets = mergeToSchema(other.PositiveSpans, other.PositiveBuckets, other.Schema, h.Schema) + otherNegativeSpans, otherNegativeBuckets = mergeToSchema(other.NegativeSpans, other.NegativeBuckets, other.Schema, h.Schema) } + + h.PositiveSpans, h.PositiveBuckets = addBuckets(h.Schema, h.ZeroThreshold, true, h.PositiveSpans, h.PositiveBuckets, otherPositiveSpans, otherPositiveBuckets) + h.NegativeSpans, h.NegativeBuckets = addBuckets(h.Schema, h.ZeroThreshold, true, h.NegativeSpans, h.NegativeBuckets, otherNegativeSpans, otherNegativeBuckets) return h } @@ -316,103 +334,6 @@ func (h *FloatHistogram) Equals(h2 *FloatHistogram) bool { return true } -// addBucket takes the "coordinates" of the last bucket that was handled and -// adds the provided bucket after it. If a corresponding bucket exists, the -// count is added. If not, the bucket is inserted. The updated slices and the -// coordinates of the inserted or added-to bucket are returned. 
-func addBucket( - b Bucket[float64], - spans []Span, buckets []float64, - iSpan, iBucket int, - iInSpan, index int32, -) ( - newSpans []Span, newBuckets []float64, - newISpan, newIBucket int, newIInSpan int32, -) { - if iSpan == -1 { - // First add, check if it is before all spans. - if len(spans) == 0 || spans[0].Offset > b.Index { - // Add bucket before all others. - buckets = append(buckets, 0) - copy(buckets[1:], buckets) - buckets[0] = b.Count - if len(spans) > 0 && spans[0].Offset == b.Index+1 { - spans[0].Length++ - spans[0].Offset-- - return spans, buckets, 0, 0, 0 - } - spans = append(spans, Span{}) - copy(spans[1:], spans) - spans[0] = Span{Offset: b.Index, Length: 1} - if len(spans) > 1 { - // Convert the absolute offset in the formerly - // first span to a relative offset. - spans[1].Offset -= b.Index + 1 - } - return spans, buckets, 0, 0, 0 - } - if spans[0].Offset == b.Index { - // Just add to first bucket. - buckets[0] += b.Count - return spans, buckets, 0, 0, 0 - } - // We are behind the first bucket, so set everything to the - // first bucket and continue normally. - iSpan, iBucket, iInSpan = 0, 0, 0 - index = spans[0].Offset - } - deltaIndex := b.Index - index - for { - remainingInSpan := int32(spans[iSpan].Length) - iInSpan - if deltaIndex < remainingInSpan { - // Bucket is in current span. - iBucket += int(deltaIndex) - iInSpan += deltaIndex - buckets[iBucket] += b.Count - return spans, buckets, iSpan, iBucket, iInSpan - } - deltaIndex -= remainingInSpan - iBucket += int(remainingInSpan) - iSpan++ - if iSpan == len(spans) || deltaIndex < spans[iSpan].Offset { - // Bucket is in gap behind previous span (or there are no further spans). - buckets = append(buckets, 0) - copy(buckets[iBucket+1:], buckets[iBucket:]) - buckets[iBucket] = b.Count - if deltaIndex == 0 { - // Directly after previous span, extend previous span. 
- if iSpan < len(spans) { - spans[iSpan].Offset-- - } - iSpan-- - iInSpan = int32(spans[iSpan].Length) - spans[iSpan].Length++ - return spans, buckets, iSpan, iBucket, iInSpan - } - if iSpan < len(spans) && deltaIndex == spans[iSpan].Offset-1 { - // Directly before next span, extend next span. - iInSpan = 0 - spans[iSpan].Offset-- - spans[iSpan].Length++ - return spans, buckets, iSpan, iBucket, iInSpan - } - // No next span, or next span is not directly adjacent to new bucket. - // Add new span. - iInSpan = 0 - if iSpan < len(spans) { - spans[iSpan].Offset -= deltaIndex + 1 - } - spans = append(spans, Span{}) - copy(spans[iSpan+1:], spans[iSpan:]) - spans[iSpan] = Span{Length: 1, Offset: deltaIndex} - return spans, buckets, iSpan, iBucket, iInSpan - } - // Try start of next span. - deltaIndex -= spans[iSpan].Offset - iInSpan = 0 - } -} - // Compact eliminates empty buckets at the beginning and end of each span, then // merges spans that are consecutive or at most maxEmptyBuckets apart, and // finally splits spans that contain more consecutive empty buckets than @@ -818,6 +739,11 @@ type floatBucketIterator struct { absoluteStartValue float64 // Never return buckets with an upper bound ≤ this value. } +func (i *floatBucketIterator) At() Bucket[float64] { + // Need to use i.targetSchema rather than i.baseBucketIterator.schema. + return i.baseBucketIterator.at(i.targetSchema) +} + func (i *floatBucketIterator) Next() bool { if i.spansIdx >= len(i.spans) { return false @@ -977,3 +903,202 @@ func (i *allFloatBucketIterator) Next() bool { func (i *allFloatBucketIterator) At() Bucket[float64] { return i.currBucket } + +// targetIdx returns the bucket index in the target schema for the given bucket +// index idx in the original schema. 
+func targetIdx(idx, originSchema, targetSchema int32) int32 { + return ((idx - 1) >> (originSchema - targetSchema)) + 1 +} + +// mergeToSchema is used to merge a FloatHistogram's Spans and Buckets (no matter if +// positive or negative) from the original schema to the target schema. +// The target schema must be smaller than the original schema. +func mergeToSchema(originSpans []Span, originBuckets []float64, originSchema, targetSchema int32) ([]Span, []float64) { + var ( + targetSpans []Span // The spans in the target schema. + targetBuckets []float64 // The buckets in the target schema. + bucketIdx int32 // The index of bucket in the origin schema. + lastTargetBucketIdx int32 // The index of the last added target bucket. + origBucketIdx int // The position of a bucket in originBuckets slice. + ) + + for _, span := range originSpans { + // Determine the index of the first bucket in this span. + bucketIdx += span.Offset + for j := 0; j < int(span.Length); j++ { + // Determine the index of the bucket in the target schema from the index in the original schema. + targetBucketIdx := targetIdx(bucketIdx, originSchema, targetSchema) + + switch { + case len(targetSpans) == 0: + // This is the first span in the targetSpans. + span := Span{ + Offset: targetBucketIdx, + Length: 1, + } + targetSpans = append(targetSpans, span) + targetBuckets = append(targetBuckets, originBuckets[0]) + lastTargetBucketIdx = targetBucketIdx + + case lastTargetBucketIdx == targetBucketIdx: + // The current bucket has to be merged into the same target bucket as the previous bucket. + targetBuckets[len(targetBuckets)-1] += originBuckets[origBucketIdx] + + case (lastTargetBucketIdx + 1) == targetBucketIdx: + // The current bucket has to go into a new target bucket, + // and that bucket is next to the previous target bucket, + // so we add it to the current target span. 
+ targetSpans[len(targetSpans)-1].Length++ + targetBuckets = append(targetBuckets, originBuckets[origBucketIdx]) + lastTargetBucketIdx++ + + case (lastTargetBucketIdx + 1) < targetBucketIdx: + // The current bucket has to go into a new target bucket, + // and that bucket is separated by a gap from the previous target bucket, + // so we need to add a new target span. + span := Span{ + Offset: targetBucketIdx - lastTargetBucketIdx - 1, + Length: 1, + } + targetSpans = append(targetSpans, span) + targetBuckets = append(targetBuckets, originBuckets[origBucketIdx]) + lastTargetBucketIdx = targetBucketIdx + } + + bucketIdx++ + origBucketIdx++ + } + } + + return targetSpans, targetBuckets +} + +// addBuckets adds the buckets described by spansB/bucketsB to the buckets described by spansA/bucketsA, +// creating missing buckets in spansA/bucketsA as needed. +// It returns the resulting spans/buckets (which must be used instead of the original spansA/bucketsA, +// although spansA/bucketsA might get modified by this function). +// All buckets must use the same provided schema. +// Buckets in spansB/bucketsB with an absolute upper limit ≤ threshold are ignored. +// If negative is true, the buckets in spansB/bucketsB are subtracted rather than added. +func addBuckets( + schema int32, threshold float64, negative bool, + spansA []Span, bucketsA []float64, + spansB []Span, bucketsB []float64, +) ([]Span, []float64) { + var ( + iSpan int = -1 + iBucket int = -1 + iInSpan int32 + indexA int32 + indexB int32 + bIdxB int + bucketB float64 + deltaIndex int32 + lowerThanThreshold = true + ) + + for _, spanB := range spansB { + indexB += spanB.Offset + for j := 0; j < int(spanB.Length); j++ { + if lowerThanThreshold && getBound(indexB, schema) <= threshold { + goto nextLoop + } + lowerThanThreshold = false + + bucketB = bucketsB[bIdxB] + if negative { + bucketB *= -1 + } + + if iSpan == -1 { + if len(spansA) == 0 || spansA[0].Offset > indexB { + // Add bucket before all others. 
+ bucketsA = append(bucketsA, 0) + copy(bucketsA[1:], bucketsA) + bucketsA[0] = bucketB + if len(spansA) > 0 && spansA[0].Offset == indexB+1 { + spansA[0].Length++ + spansA[0].Offset-- + goto nextLoop + } else { + spansA = append(spansA, Span{}) + copy(spansA[1:], spansA) + spansA[0] = Span{Offset: indexB, Length: 1} + if len(spansA) > 1 { + // Convert the absolute offset in the formerly + // first span to a relative offset. + spansA[1].Offset -= indexB + 1 + } + goto nextLoop + } + } else if spansA[0].Offset == indexB { + // Just add to first bucket. + bucketsA[0] += bucketB + goto nextLoop + } + iSpan, iBucket, iInSpan = 0, 0, 0 + indexA = spansA[0].Offset + } + deltaIndex = indexB - indexA + for { + remainingInSpan := int32(spansA[iSpan].Length) - iInSpan + if deltaIndex < remainingInSpan { + // Bucket is in current span. + iBucket += int(deltaIndex) + iInSpan += deltaIndex + bucketsA[iBucket] += bucketB + break + } else { + deltaIndex -= remainingInSpan + iBucket += int(remainingInSpan) + iSpan++ + if iSpan == len(spansA) || deltaIndex < spansA[iSpan].Offset { + // Bucket is in gap behind previous span (or there are no further spans). + bucketsA = append(bucketsA, 0) + copy(bucketsA[iBucket+1:], bucketsA[iBucket:]) + bucketsA[iBucket] = bucketB + switch { + case deltaIndex == 0: + // Directly after previous span, extend previous span. + if iSpan < len(spansA) { + spansA[iSpan].Offset-- + } + iSpan-- + iInSpan = int32(spansA[iSpan].Length) + spansA[iSpan].Length++ + goto nextLoop + case iSpan < len(spansA) && deltaIndex == spansA[iSpan].Offset-1: + // Directly before next span, extend next span. + iInSpan = 0 + spansA[iSpan].Offset-- + spansA[iSpan].Length++ + goto nextLoop + default: + // No next span, or next span is not directly adjacent to new bucket. + // Add new span. 
+ iInSpan = 0 + if iSpan < len(spansA) { + spansA[iSpan].Offset -= deltaIndex + 1 + } + spansA = append(spansA, Span{}) + copy(spansA[iSpan+1:], spansA[iSpan:]) + spansA[iSpan] = Span{Length: 1, Offset: deltaIndex} + goto nextLoop + } + } else { + // Try start of next span. + deltaIndex -= spansA[iSpan].Offset + iInSpan = 0 + } + } + } + + nextLoop: + indexA = indexB + indexB++ + bIdxB++ + } + } + + return spansA, bucketsA +} diff --git a/model/histogram/float_histogram_test.go b/model/histogram/float_histogram_test.go index ce749b71017..0b712be4386 100644 --- a/model/histogram/float_histogram_test.go +++ b/model/histogram/float_histogram_test.go @@ -938,6 +938,21 @@ func TestFloatHistogramCompact(t *testing.T) { NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000, 3, 4}, }, }, + { + "cut empty buckets in the middle", + &FloatHistogram{ + PositiveSpans: []Span{{5, 4}}, + PositiveBuckets: []float64{1, 3, 0, 2}, + }, + 0, + &FloatHistogram{ + PositiveSpans: []Span{ + {Offset: 5, Length: 2}, + {Offset: 1, Length: 1}, + }, + PositiveBuckets: []float64{1, 3, 2}, + }, + }, { "cut empty buckets at start or end of spans, even in the middle", &FloatHistogram{ @@ -955,7 +970,7 @@ func TestFloatHistogramCompact(t *testing.T) { }, }, { - "cut empty buckets at start or end but merge spans due to maxEmptyBuckets", + "cut empty buckets at start and end - also merge spans due to maxEmptyBuckets", &FloatHistogram{ PositiveSpans: []Span{{-4, 4}, {5, 3}}, PositiveBuckets: []float64{0, 0, 1, 3.3, 4.2, 0.1, 3.3}, @@ -998,18 +1013,42 @@ func TestFloatHistogramCompact(t *testing.T) { PositiveBuckets: []float64{1, 3.3, 4.2, 0.1, 3.3}, }, }, + { + "cut empty buckets from the middle of a span, avoiding none due to maxEmptyBuckets", + &FloatHistogram{ + PositiveSpans: []Span{{-2, 4}}, + PositiveBuckets: []float64{1, 0, 0, 3.3}, + }, + 1, + &FloatHistogram{ + PositiveSpans: []Span{{-2, 1}, {2, 1}}, + PositiveBuckets: []float64{1, 3.3}, + }, + }, + { + "cut empty buckets and merge spans due to 
maxEmptyBuckets", + &FloatHistogram{ + PositiveSpans: []Span{{-2, 4}, {3, 1}}, + PositiveBuckets: []float64{1, 0, 0, 3.3, 4.2}, + }, + 1, + &FloatHistogram{ + PositiveSpans: []Span{{-2, 1}, {2, 1}, {3, 1}}, + PositiveBuckets: []float64{1, 3.3, 4.2}, + }, + }, { "cut empty buckets from the middle of a span, avoiding some due to maxEmptyBuckets", &FloatHistogram{ - PositiveSpans: []Span{{-4, 6}, {3, 3}}, - PositiveBuckets: []float64{0, 0, 1, 0, 0, 3.3, 4.2, 0.1, 3.3}, + PositiveSpans: []Span{{-4, 6}, {3, 3}, {10, 2}}, + PositiveBuckets: []float64{0, 0, 1, 0, 0, 3.3, 4.2, 0.1, 3.3, 2, 3}, NegativeSpans: []Span{{0, 2}, {3, 5}}, NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000, 0, 3, 4}, }, 1, &FloatHistogram{ - PositiveSpans: []Span{{-2, 1}, {2, 1}, {3, 3}}, - PositiveBuckets: []float64{1, 3.3, 4.2, 0.1, 3.3}, + PositiveSpans: []Span{{-2, 1}, {2, 1}, {3, 3}, {10, 2}}, + PositiveBuckets: []float64{1, 3.3, 4.2, 0.1, 3.3, 2, 3}, NegativeSpans: []Span{{0, 2}, {3, 5}}, NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000, 0, 3, 4}, }, @@ -2205,3 +2244,50 @@ func TestAllReverseFloatBucketIterator(t *testing.T) { }) } } + +func TestFloatBucketIteratorTargetSchema(t *testing.T) { + h := FloatHistogram{ + Count: 405, + Sum: 1008.4, + Schema: 1, + PositiveSpans: []Span{ + {Offset: 0, Length: 4}, + {Offset: 1, Length: 3}, + {Offset: 2, Length: 3}, + }, + PositiveBuckets: []float64{100, 344, 123, 55, 3, 63, 2, 54, 235, 33}, + NegativeSpans: []Span{ + {Offset: 0, Length: 3}, + {Offset: 7, Length: 4}, + {Offset: 1, Length: 3}, + }, + NegativeBuckets: []float64{10, 34, 1230, 54, 67, 63, 2, 554, 235, 33}, + } + expPositiveBuckets := []Bucket[float64]{ + {Lower: 0.25, Upper: 1, LowerInclusive: false, UpperInclusive: true, Count: 100, Index: 0}, + {Lower: 1, Upper: 4, LowerInclusive: false, UpperInclusive: true, Count: 522, Index: 1}, + {Lower: 4, Upper: 16, LowerInclusive: false, UpperInclusive: true, Count: 68, Index: 2}, + {Lower: 16, Upper: 64, LowerInclusive: false, UpperInclusive: 
true, Count: 322, Index: 3}, + } + expNegativeBuckets := []Bucket[float64]{ + {Lower: -1, Upper: -0.25, LowerInclusive: true, UpperInclusive: false, Count: 10, Index: 0}, + {Lower: -4, Upper: -1, LowerInclusive: true, UpperInclusive: false, Count: 1264, Index: 1}, + {Lower: -64, Upper: -16, LowerInclusive: true, UpperInclusive: false, Count: 184, Index: 3}, + {Lower: -256, Upper: -64, LowerInclusive: true, UpperInclusive: false, Count: 791, Index: 4}, + {Lower: -1024, Upper: -256, LowerInclusive: true, UpperInclusive: false, Count: 33, Index: 5}, + } + + it := h.floatBucketIterator(true, 0, -1) + for i, b := range expPositiveBuckets { + require.True(t, it.Next(), "positive iterator exhausted too early") + require.Equal(t, b, it.At(), "bucket %d", i) + } + require.False(t, it.Next(), "positive iterator not exhausted") + + it = h.floatBucketIterator(false, 0, -1) + for i, b := range expNegativeBuckets { + require.True(t, it.Next(), "negative iterator exhausted too early") + require.Equal(t, b, it.At(), "bucket %d", i) + } + require.False(t, it.Next(), "negative iterator not exhausted") +} diff --git a/model/histogram/generic.go b/model/histogram/generic.go index e1de5ffb52d..dad54cb0698 100644 --- a/model/histogram/generic.go +++ b/model/histogram/generic.go @@ -102,16 +102,22 @@ type baseBucketIterator[BC BucketCount, IBC InternalBucketCount] struct { } func (b baseBucketIterator[BC, IBC]) At() Bucket[BC] { + return b.at(b.schema) +} + +// at is an internal version of the exported At to enable using a different +// schema. 
+func (b baseBucketIterator[BC, IBC]) at(schema int32) Bucket[BC] { bucket := Bucket[BC]{ Count: BC(b.currCount), Index: b.currIdx, } if b.positive { - bucket.Upper = getBound(b.currIdx, b.schema) - bucket.Lower = getBound(b.currIdx-1, b.schema) + bucket.Upper = getBound(b.currIdx, schema) + bucket.Lower = getBound(b.currIdx-1, schema) } else { - bucket.Lower = -getBound(b.currIdx, b.schema) - bucket.Upper = -getBound(b.currIdx-1, b.schema) + bucket.Lower = -getBound(b.currIdx, schema) + bucket.Upper = -getBound(b.currIdx-1, schema) } bucket.LowerInclusive = bucket.Lower < 0 bucket.UpperInclusive = bucket.Upper > 0 diff --git a/model/labels/labels_stringlabels.go b/model/labels/labels_stringlabels.go index 223aa6ebf7d..a87545a26bf 100644 --- a/model/labels/labels_stringlabels.go +++ b/model/labels/labels_stringlabels.go @@ -49,12 +49,6 @@ type Labels struct { data string } -type labelSlice []Label - -func (ls labelSlice) Len() int { return len(ls) } -func (ls labelSlice) Swap(i, j int) { ls[i], ls[j] = ls[j], ls[i] } -func (ls labelSlice) Less(i, j int) bool { return ls[i].Name < ls[j].Name } - func decodeSize(data string, index int) (int, int) { // Fast-path for common case of a single byte, value 0..127. b := data[index] @@ -300,13 +294,26 @@ func (ls Labels) Get(name string) string { // Has returns true if the label with the given name is present. func (ls Labels) Has(name string) bool { + if name == "" { // Avoid crash in loop if someone asks for "". + return false // Prometheus does not store blank label names. + } for i := 0; i < len(ls.data); { - var lName string - lName, i = decodeString(ls.data, i) - _, i = decodeString(ls.data, i) - if lName == name { - return true + var size int + size, i = decodeSize(ls.data, i) + if ls.data[i] == name[0] { + lName := ls.data[i : i+size] + i += size + if lName == name { + return true + } + } else { + if ls.data[i] > name[0] { // Stop looking if we've gone past. 
+ break + } + i += size } + size, i = decodeSize(ls.data, i) + i += size } return false } diff --git a/model/labels/labels_test.go b/model/labels/labels_test.go index d91be27cbce..a5401b92440 100644 --- a/model/labels/labels_test.go +++ b/model/labels/labels_test.go @@ -472,16 +472,22 @@ func BenchmarkLabels_Get(b *testing.B) { for _, scenario := range []struct { desc, label string }{ - {"get first label", allLabels[0].Name}, - {"get middle label", allLabels[size/2].Name}, - {"get last label", allLabels[size-1].Name}, - {"get not-found label", "benchmark"}, + {"first label", allLabels[0].Name}, + {"middle label", allLabels[size/2].Name}, + {"last label", allLabels[size-1].Name}, + {"not-found label", "benchmark"}, } { b.Run(scenario.desc, func(b *testing.B) { - b.ResetTimer() - for i := 0; i < b.N; i++ { - _ = labels.Get(scenario.label) - } + b.Run("get", func(b *testing.B) { + for i := 0; i < b.N; i++ { + _ = labels.Get(scenario.label) + } + }) + b.Run("has", func(b *testing.B) { + for i := 0; i < b.N; i++ { + _ = labels.Has(scenario.label) + } + }) }) } }) diff --git a/model/textparse/protobufparse.go b/model/textparse/protobufparse.go index 29ccdb84df0..fbb84a2bd30 100644 --- a/model/textparse/protobufparse.go +++ b/model/textparse/protobufparse.go @@ -56,6 +56,10 @@ type ProtobufParser struct { fieldsDone bool // true if no more fields of a Summary or (legacy) Histogram to be processed. redoClassic bool // true after parsing a native histogram if we need to parse it again as a classic histogram. + // exemplarReturned is set to true each time an exemplar has been + // returned, and set back to false upon each Next() call. + exemplarReturned bool + // state is marked by the entry we are processing. EntryInvalid implies // that we have to decode the next MetricFamily. state Entry @@ -293,8 +297,12 @@ func (p *ProtobufParser) Metric(l *labels.Labels) string { // Exemplar writes the exemplar of the current sample into the passed // exemplar. 
It returns if an exemplar exists or not. In case of a native // histogram, the legacy bucket section is still used for exemplars. To ingest -// all examplars, call the Exemplar method repeatedly until it returns false. +// all exemplars, call the Exemplar method repeatedly until it returns false. func (p *ProtobufParser) Exemplar(ex *exemplar.Exemplar) bool { + if p.exemplarReturned && p.state == EntrySeries { + // We only ever return one exemplar per (non-native-histogram) series. + return false + } m := p.mf.GetMetric()[p.metricPos] var exProto *dto.Exemplar switch p.mf.GetType() { @@ -335,6 +343,7 @@ func (p *ProtobufParser) Exemplar(ex *exemplar.Exemplar) bool { } p.builder.Sort() ex.Labels = p.builder.Labels() + p.exemplarReturned = true return true } @@ -342,6 +351,7 @@ func (p *ProtobufParser) Exemplar(ex *exemplar.Exemplar) bool { // text format parser). It returns (EntryInvalid, io.EOF) if no samples were // read. func (p *ProtobufParser) Next() (Entry, error) { + p.exemplarReturned = false switch p.state { case EntryInvalid: p.metricPos = 0 @@ -554,20 +564,17 @@ func formatOpenMetricsFloat(f float64) string { return s + ".0" } -// isNativeHistogram returns false iff the provided histograms has no sparse -// buckets and a zero threshold of 0 and a zero count of 0. In principle, this -// could still be meant to be a native histogram (with a zero threshold of 0 and -// no observations yet), but for now, we'll treat this case as a conventional -// histogram. -// -// TODO(beorn7): In the final format, there should be an unambiguous way of -// deciding if a histogram should be ingested as a conventional one or a native -// one. +// isNativeHistogram returns false iff the provided histograms has no spans at +// all (neither positive nor negative) and a zero threshold of 0 and a zero +// count of 0. In principle, this could still be meant to be a native histogram +// with a zero threshold of 0 and no observations yet. 
In that case, +// instrumentation libraries should add a "no-op" span (e.g. length zero, offset +// zero) to signal that the histogram is meant to be parsed as a native +// histogram. Failing to do so will cause Prometheus to parse it as a classic +// histogram as long as no observations have happened. func isNativeHistogram(h *dto.Histogram) bool { - return h.GetZeroThreshold() > 0 || - h.GetZeroCount() > 0 || - len(h.GetNegativeDelta()) > 0 || - len(h.GetPositiveDelta()) > 0 || - len(h.GetNegativeCount()) > 0 || - len(h.GetPositiveCount()) > 0 + return len(h.GetPositiveSpan()) > 0 || + len(h.GetNegativeSpan()) > 0 || + h.GetZeroThreshold() > 0 || + h.GetZeroCount() > 0 } diff --git a/model/textparse/protobufparse_test.go b/model/textparse/protobufparse_test.go index 07f84f4aeb2..5436d7f3e3c 100644 --- a/model/textparse/protobufparse_test.go +++ b/model/textparse/protobufparse_test.go @@ -517,6 +517,19 @@ metric: < sample_sum: 1.234 > > +`, + `name: "empty_histogram" +help: "A histogram without observations and with a zero threshold of zero but with a no-op span to identify it as a native histogram." 
+type: HISTOGRAM +metric: < + histogram: < + positive_span: < + offset: 0 + length: 0 + > + > +> + `, } @@ -965,6 +978,25 @@ func TestProtobufParse(t *testing.T) { "__name__", "without_quantiles_sum", ), }, + { + m: "empty_histogram", + help: "A histogram without observations and with a zero threshold of zero but with a no-op span to identify it as a native histogram.", + }, + { + m: "empty_histogram", + typ: MetricTypeHistogram, + }, + { + m: "empty_histogram", + shs: &histogram.Histogram{ + CounterResetHint: histogram.UnknownCounterReset, + PositiveSpans: []histogram.Span{}, + NegativeSpans: []histogram.Span{}, + }, + lset: labels.FromStrings( + "__name__", "empty_histogram", + ), + }, }, }, { @@ -1688,6 +1720,25 @@ func TestProtobufParse(t *testing.T) { "__name__", "without_quantiles_sum", ), }, + { // 78 + m: "empty_histogram", + help: "A histogram without observations and with a zero threshold of zero but with a no-op span to identify it as a native histogram.", + }, + { // 79 + m: "empty_histogram", + typ: MetricTypeHistogram, + }, + { // 80 + m: "empty_histogram", + shs: &histogram.Histogram{ + CounterResetHint: histogram.UnknownCounterReset, + PositiveSpans: []histogram.Span{}, + NegativeSpans: []histogram.Span{}, + }, + lset: labels.FromStrings( + "__name__", "empty_histogram", + ), + }, }, }, } @@ -1728,6 +1779,7 @@ func TestProtobufParse(t *testing.T) { } else { require.Equal(t, true, found, "i: %d", i) require.Equal(t, exp[i].e[0], e, "i: %d", i) + require.False(t, p.Exemplar(&e), "too many exemplars returned, i: %d", i) } case EntryHistogram: diff --git a/prompb/io/prometheus/client/metrics.pb.go b/prompb/io/prometheus/client/metrics.pb.go index 3e4bc7df8fa..83a7da77999 100644 --- a/prompb/io/prometheus/client/metrics.pb.go +++ b/prompb/io/prometheus/client/metrics.pb.go @@ -414,6 +414,9 @@ type Histogram struct { NegativeDelta []int64 `protobuf:"zigzag64,10,rep,packed,name=negative_delta,json=negativeDelta,proto3" json:"negative_delta,omitempty"` 
NegativeCount []float64 `protobuf:"fixed64,11,rep,packed,name=negative_count,json=negativeCount,proto3" json:"negative_count,omitempty"` // Positive buckets for the native histogram. + // Use a no-op span (offset 0, length 0) for a native histogram without any + // observations yet and with a zero_threshold of 0. Otherwise, it would be + // indistinguishable from a classic histogram. PositiveSpan []BucketSpan `protobuf:"bytes,12,rep,name=positive_span,json=positiveSpan,proto3" json:"positive_span"` // Use either "positive_delta" or "positive_count", the former for // regular histograms with integer counts, the latter for float diff --git a/prompb/io/prometheus/client/metrics.proto b/prompb/io/prometheus/client/metrics.proto index 6bbea622f24..3fef2b6d005 100644 --- a/prompb/io/prometheus/client/metrics.proto +++ b/prompb/io/prometheus/client/metrics.proto @@ -97,6 +97,9 @@ message Histogram { repeated double negative_count = 11; // Absolute count of each bucket. // Positive buckets for the native histogram. + // Use a no-op span (offset 0, length 0) for a native histogram without any + // observations yet and with a zero_threshold of 0. Otherwise, it would be + // indistinguishable from a classic histogram. repeated BucketSpan positive_span = 12 [(gogoproto.nullable) = false]; // Use either "positive_delta" or "positive_count", the former for // regular histograms with integer counts, the latter for float diff --git a/promql/bench_test.go b/promql/bench_test.go index 6818498bf8d..c6a528f7bf3 100644 --- a/promql/bench_test.go +++ b/promql/bench_test.go @@ -186,6 +186,10 @@ func rangeQueryCases() []benchCase { expr: "count({__name__!=\"\",l=\"\"})", steps: 1, }, + // Functions which have special handling inside eval() + { + expr: "timestamp(a_X)", + }, } // X in an expr will be replaced by different metric sizes. 
diff --git a/promql/engine.go b/promql/engine.go index 83bbdeff890..816f20721e4 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1143,7 +1143,11 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) } } enh := &EvalNodeHelper{Out: make(Vector, 0, biggestLen)} - seriess := make(map[uint64]Series, biggestLen) // Output series by series hash. + type seriesAndTimestamp struct { + Series + ts int64 + } + seriess := make(map[uint64]seriesAndTimestamp, biggestLen) // Output series by series hash. tempNumSamples := ev.currentSamples var ( @@ -1228,9 +1232,6 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) // Make the function call. enh.Ts = ts result, ws := funcCall(args, bufHelpers, enh) - if result.ContainsSameLabelset() { - ev.errorf("vector cannot contain metrics with the same labelset") - } enh.Out = result[:0] // Reuse result vector. warnings = append(warnings, ws...) @@ -1247,6 +1248,9 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) // If this could be an instant query, shortcut so as not to change sort order. if ev.endTimestamp == ev.startTimestamp { + if result.ContainsSameLabelset() { + ev.errorf("vector cannot contain metrics with the same labelset") + } mat := make(Matrix, len(result)) for i, s := range result { if s.H == nil { @@ -1264,8 +1268,13 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) for _, sample := range result { h := sample.Metric.Hash() ss, ok := seriess[h] - if !ok { - ss = Series{Metric: sample.Metric} + if ok { + if ss.ts == ts { // If we've seen this output series before at this timestamp, it's a duplicate. 
+ ev.errorf("vector cannot contain metrics with the same labelset") + } + ss.ts = ts + } else { + ss = seriesAndTimestamp{Series{Metric: sample.Metric}, ts} } if sample.H == nil { if ss.Floats == nil { @@ -1292,7 +1301,7 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) // Assemble the output matrix. By the time we get here we know we don't have too many samples. mat := make(Matrix, 0, len(seriess)) for _, ss := range seriess { - mat = append(mat, ss) + mat = append(mat, ss.Series) } ev.currentSamples = originalNumSamples + mat.TotalSamples() ev.samplesStats.UpdatePeak(ev.currentSamples) @@ -1387,15 +1396,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { unwrapParenExpr(&arg) vs, ok := arg.(*parser.VectorSelector) if ok { - return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) { - if vs.Timestamp != nil { - // This is a special case only for "timestamp" since the offset - // needs to be adjusted for every point. - vs.Offset = time.Duration(enh.Ts-*vs.Timestamp) * time.Millisecond - } - val, ws := ev.vectorSelector(vs, enh.Ts) - return call([]parser.Value{val}, e.Args, enh), ws - }) + return ev.rangeEvalTimestampFunctionOverVectorSelector(vs, call, e) } } @@ -1833,38 +1834,48 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { panic(fmt.Errorf("unhandled expression of type: %T", expr)) } -// vectorSelector evaluates a *parser.VectorSelector expression. 
-func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vector, storage.Warnings) { - ws, err := checkAndExpandSeriesSet(ev.ctx, node) +func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(vs *parser.VectorSelector, call FunctionCall, e *parser.Call) (parser.Value, storage.Warnings) { + ws, err := checkAndExpandSeriesSet(ev.ctx, vs) if err != nil { ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws}) } - vec := make(Vector, 0, len(node.Series)) - it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) - var chkIter chunkenc.Iterator - for i, s := range node.Series { - chkIter = s.Iterator(chkIter) - it.Reset(chkIter) - t, f, h, ok := ev.vectorSelectorSingle(it, node, ts) - if ok { - vec = append(vec, Sample{ - Metric: node.Series[i].Labels(), - T: t, - F: f, - H: h, - }) + seriesIterators := make([]*storage.MemoizedSeriesIterator, len(vs.Series)) + for i, s := range vs.Series { + it := s.Iterator(nil) + seriesIterators[i] = storage.NewMemoizedIterator(it, durationMilliseconds(ev.lookbackDelta)) + } - ev.currentSamples++ - ev.samplesStats.IncrementSamplesAtTimestamp(ts, 1) - if ev.currentSamples > ev.maxSamples { - ev.error(ErrTooManySamples(env)) - } + return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) { + if vs.Timestamp != nil { + // This is a special case for "timestamp()" when the @ modifier is used, to ensure that + // we return a point for each time step in this case. + // See https://github.com/prometheus/prometheus/issues/8433. 
+ vs.Offset = time.Duration(enh.Ts-*vs.Timestamp) * time.Millisecond } - } - ev.samplesStats.UpdatePeak(ev.currentSamples) - return vec, ws + vec := make(Vector, 0, len(vs.Series)) + for i, s := range vs.Series { + it := seriesIterators[i] + t, f, h, ok := ev.vectorSelectorSingle(it, vs, enh.Ts) + if ok { + vec = append(vec, Sample{ + Metric: s.Labels(), + T: t, + F: f, + H: h, + }) + + ev.currentSamples++ + ev.samplesStats.IncrementSamplesAtTimestamp(enh.Ts, 1) + if ev.currentSamples > ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + } + } + ev.samplesStats.UpdatePeak(ev.currentSamples) + return call([]parser.Value{vec}, e.Args, enh), ws + }) } // vectorSelectorSingle evaluates an instant vector for the iterator of one time series. @@ -2874,7 +2885,7 @@ func btos(b bool) float64 { // result of the op operation. func shouldDropMetricName(op parser.ItemType) bool { switch op { - case parser.ADD, parser.SUB, parser.DIV, parser.MUL, parser.POW, parser.MOD: + case parser.ADD, parser.SUB, parser.DIV, parser.MUL, parser.POW, parser.MOD, parser.ATAN2: return true default: return false diff --git a/promql/engine_test.go b/promql/engine_test.go index ca4a022e092..82e44bcbca8 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -25,8 +25,6 @@ import ( "github.com/go-kit/log" - "github.com/prometheus/prometheus/tsdb/tsdbutil" - "github.com/stretchr/testify/require" "go.uber.org/goleak" @@ -35,7 +33,9 @@ import ( "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/util/stats" + "github.com/prometheus/prometheus/util/teststorage" ) func TestMain(m *testing.M) { @@ -566,6 +566,7 @@ func TestSelectHintsSetCorrectly(t *testing.T) { err error ) ctx := context.Background() + if tc.end == 0 { query, err = engine.NewInstantQuery(ctx, hintsRecorder, nil, tc.query, timestamp.Time(tc.start)) } 
else { @@ -573,7 +574,7 @@ func TestSelectHintsSetCorrectly(t *testing.T) { } require.NoError(t, err) - res := query.Exec(ctx) + res := query.Exec(context.Background()) require.NoError(t, res.Err) require.Equal(t, tc.expected, hintsRecorder.hints) @@ -636,15 +637,11 @@ func TestEngineShutdown(t *testing.T) { } func TestEngineEvalStmtTimestamps(t *testing.T) { - test, err := NewTest(t, ` + storage := LoadedStorage(t, ` load 10s metric 1 2 `) - require.NoError(t, err) - defer test.Close() - - err = test.Run() - require.NoError(t, err) + t.Cleanup(func() { storage.Close() }) cases := []struct { Query string @@ -728,14 +725,15 @@ load 10s t.Run(fmt.Sprintf("%d query=%s", i, c.Query), func(t *testing.T) { var err error var qry Query + engine := newTestEngine() if c.Interval == 0 { - qry, err = test.QueryEngine().NewInstantQuery(test.context, test.Queryable(), nil, c.Query, c.Start) + qry, err = engine.NewInstantQuery(context.Background(), storage, nil, c.Query, c.Start) } else { - qry, err = test.QueryEngine().NewRangeQuery(test.context, test.Queryable(), nil, c.Query, c.Start, c.End, c.Interval) + qry, err = engine.NewRangeQuery(context.Background(), storage, nil, c.Query, c.Start, c.End, c.Interval) } require.NoError(t, err) - res := qry.Exec(test.Context()) + res := qry.Exec(context.Background()) if c.ShouldError { require.Error(t, res.Err, "expected error for the query %q", c.Query) return @@ -748,18 +746,14 @@ load 10s } func TestQueryStatistics(t *testing.T) { - test, err := NewTest(t, ` + storage := LoadedStorage(t, ` load 10s metricWith1SampleEvery10Seconds 1+1x100 metricWith3SampleEvery10Seconds{a="1",b="1"} 1+1x100 metricWith3SampleEvery10Seconds{a="2",b="2"} 1+1x100 metricWith3SampleEvery10Seconds{a="3",b="2"} 1+1x100 `) - require.NoError(t, err) - defer test.Close() - - err = test.Run() - require.NoError(t, err) + t.Cleanup(func() { storage.Close() }) cases := []struct { Query string @@ -1194,7 +1188,7 @@ load 10s }, } - engine := test.QueryEngine() + engine 
:= newTestEngine() engine.enablePerStepStats = true origMaxSamples := engine.maxSamplesPerQuery for _, c := range cases { @@ -1206,13 +1200,13 @@ load 10s var err error var qry Query if c.Interval == 0 { - qry, err = engine.NewInstantQuery(test.context, test.Queryable(), opts, c.Query, c.Start) + qry, err = engine.NewInstantQuery(context.Background(), storage, opts, c.Query, c.Start) } else { - qry, err = engine.NewRangeQuery(test.context, test.Queryable(), opts, c.Query, c.Start, c.End, c.Interval) + qry, err = engine.NewRangeQuery(context.Background(), storage, opts, c.Query, c.Start, c.End, c.Interval) } require.NoError(t, err) - res := qry.Exec(test.Context()) + res := qry.Exec(context.Background()) require.Equal(t, expErr, res.Err) return qry.Stats() @@ -1234,17 +1228,13 @@ load 10s } func TestMaxQuerySamples(t *testing.T) { - test, err := NewTest(t, ` + storage := LoadedStorage(t, ` load 10s metric 1+1x100 bigmetric{a="1"} 1+1x100 bigmetric{a="2"} 1+1x100 `) - require.NoError(t, err) - defer test.Close() - - err = test.Run() - require.NoError(t, err) + t.Cleanup(func() { storage.Close() }) // These test cases should be touching the limit exactly (hence no exceeding). // Exceeding the limit will be tested by doing -1 to the MaxSamples. 
@@ -1382,20 +1372,20 @@ load 10s }, } - engine := test.QueryEngine() for _, c := range cases { t.Run(c.Query, func(t *testing.T) { + engine := newTestEngine() testFunc := func(expError error) { var err error var qry Query if c.Interval == 0 { - qry, err = engine.NewInstantQuery(test.context, test.Queryable(), nil, c.Query, c.Start) + qry, err = engine.NewInstantQuery(context.Background(), storage, nil, c.Query, c.Start) } else { - qry, err = engine.NewRangeQuery(test.context, test.Queryable(), nil, c.Query, c.Start, c.End, c.Interval) + qry, err = engine.NewRangeQuery(context.Background(), storage, nil, c.Query, c.Start, c.End, c.Interval) } require.NoError(t, err) - res := qry.Exec(test.Context()) + res := qry.Exec(context.Background()) stats := qry.Stats() require.Equal(t, expError, res.Err) require.NotNil(t, stats) @@ -1416,7 +1406,8 @@ load 10s } func TestAtModifier(t *testing.T) { - test, err := NewTest(t, ` + engine := newTestEngine() + storage := LoadedStorage(t, ` load 10s metric{job="1"} 0+1x1000 metric{job="2"} 0+2x1000 @@ -1427,11 +1418,7 @@ load 10s load 1ms metric_ms 0+1x10000 `) - require.NoError(t, err) - defer test.Close() - - err = test.Run() - require.NoError(t, err) + t.Cleanup(func() { storage.Close() }) lbls1 := labels.FromStrings("__name__", "metric", "job", "1") lbls2 := labels.FromStrings("__name__", "metric", "job", "2") @@ -1441,7 +1428,7 @@ load 1ms lblsneg := labels.FromStrings("__name__", "metric_neg") // Add some samples with negative timestamp. 
- db := test.TSDB() + db := storage.DB app := db.Appender(context.Background()) ref, err := app.Append(0, lblsneg, -1000000, 1000) require.NoError(t, err) @@ -1630,13 +1617,13 @@ load 1ms var err error var qry Query if c.end == 0 { - qry, err = test.QueryEngine().NewInstantQuery(test.context, test.Queryable(), nil, c.query, start) + qry, err = engine.NewInstantQuery(context.Background(), storage, nil, c.query, start) } else { - qry, err = test.QueryEngine().NewRangeQuery(test.context, test.Queryable(), nil, c.query, start, end, interval) + qry, err = engine.NewRangeQuery(context.Background(), storage, nil, c.query, start, end, interval) } require.NoError(t, err) - res := qry.Exec(test.Context()) + res := qry.Exec(context.Background()) require.NoError(t, res.Err) if expMat, ok := c.result.(Matrix); ok { sort.Sort(expMat) @@ -1955,18 +1942,16 @@ func TestSubquerySelector(t *testing.T) { }, } { t.Run("", func(t *testing.T) { - test, err := NewTest(t, tst.loadString) - require.NoError(t, err) - defer test.Close() + engine := newTestEngine() + storage := LoadedStorage(t, tst.loadString) + t.Cleanup(func() { storage.Close() }) - require.NoError(t, test.Run()) - engine := test.QueryEngine() for _, c := range tst.cases { t.Run(c.Query, func(t *testing.T) { - qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, c.Query, c.Start) + qry, err := engine.NewInstantQuery(context.Background(), storage, nil, c.Query, c.Start) require.NoError(t, err) - res := qry.Exec(test.Context()) + res := qry.Exec(context.Background()) require.Equal(t, c.Result.Err, res.Err) mat := res.Value.(Matrix) sort.Sort(mat) @@ -1977,6 +1962,47 @@ func TestSubquerySelector(t *testing.T) { } } +func TestTimestampFunction_StepsMoreOftenThanSamples(t *testing.T) { + engine := newTestEngine() + storage := LoadedStorage(t, ` +load 1m + metric 0+1x1000 +`) + t.Cleanup(func() { storage.Close() }) + + query := "timestamp(metric)" + start := time.Unix(0, 0) + end := time.Unix(61, 0) + interval 
:= time.Second + + // We expect the value to be 0 for t=0s to t=59s (inclusive), then 60 for t=60s and t=61s. + expectedPoints := []FPoint{} + + for t := 0; t <= 59; t++ { + expectedPoints = append(expectedPoints, FPoint{F: 0, T: int64(t * 1000)}) + } + + expectedPoints = append( + expectedPoints, + FPoint{F: 60, T: 60_000}, + FPoint{F: 60, T: 61_000}, + ) + + expectedResult := Matrix{ + Series{ + Floats: expectedPoints, + Metric: labels.EmptyLabels(), + }, + } + + qry, err := engine.NewRangeQuery(context.Background(), storage, nil, query, start, end, interval) + require.NoError(t, err) + + res := qry.Exec(context.Background()) + require.NoError(t, res.Err) + require.Equal(t, expectedResult, res.Value) +} + type FakeQueryLogger struct { closed bool logs []interface{} @@ -2911,7 +2937,6 @@ func TestPreprocessAndWrapWithStepInvariantExpr(t *testing.T) { } func TestEngineOptsValidation(t *testing.T) { - ctx := context.Background() cases := []struct { opts EngineOpts query string @@ -2971,8 +2996,8 @@ func TestEngineOptsValidation(t *testing.T) { for _, c := range cases { eng := NewEngine(c.opts) - _, err1 := eng.NewInstantQuery(ctx, nil, nil, c.query, time.Unix(10, 0)) - _, err2 := eng.NewRangeQuery(ctx, nil, nil, c.query, time.Unix(0, 0), time.Unix(10, 0), time.Second) + _, err1 := eng.NewInstantQuery(context.Background(), nil, nil, c.query, time.Unix(10, 0)) + _, err2 := eng.NewRangeQuery(context.Background(), nil, nil, c.query, time.Unix(0, 0), time.Unix(10, 0), time.Second) if c.fail { require.Equal(t, c.expError, err1) require.Equal(t, c.expError, err2) @@ -3112,17 +3137,14 @@ func TestRangeQuery(t *testing.T) { } for _, c := range cases { t.Run(c.Name, func(t *testing.T) { - test, err := NewTest(t, c.Load) - require.NoError(t, err) - defer test.Close() - - err = test.Run() - require.NoError(t, err) + engine := newTestEngine() + storage := LoadedStorage(t, c.Load) + t.Cleanup(func() { storage.Close() }) - qry, err := test.QueryEngine().NewRangeQuery(test.context, 
test.Queryable(), nil, c.Query, c.Start, c.End, c.Interval) + qry, err := engine.NewRangeQuery(context.Background(), storage, nil, c.Query, c.Start, c.End, c.Interval) require.NoError(t, err) - res := qry.Exec(test.Context()) + res := qry.Exec(context.Background()) require.NoError(t, res.Err) require.Equal(t, c.Result, res.Value) }) @@ -3132,27 +3154,24 @@ func TestRangeQuery(t *testing.T) { func TestNativeHistogramRate(t *testing.T) { // TODO(beorn7): Integrate histograms into the PromQL testing framework // and write more tests there. - test, err := NewTest(t, "") - require.NoError(t, err) - defer test.Close() + engine := newTestEngine() + storage := teststorage.New(t) + t.Cleanup(func() { storage.Close() }) seriesName := "sparse_histogram_series" lbls := labels.FromStrings("__name__", seriesName) - app := test.Storage().Appender(context.TODO()) + app := storage.Appender(context.Background()) for i, h := range tsdbutil.GenerateTestHistograms(100) { _, err := app.AppendHistogram(0, lbls, int64(i)*int64(15*time.Second/time.Millisecond), h, nil) require.NoError(t, err) } require.NoError(t, app.Commit()) - require.NoError(t, test.Run()) - engine := test.QueryEngine() - queryString := fmt.Sprintf("rate(%s[1m])", seriesName) - qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(int64(5*time.Minute/time.Millisecond))) + qry, err := engine.NewInstantQuery(context.Background(), storage, nil, queryString, timestamp.Time(int64(5*time.Minute/time.Millisecond))) require.NoError(t, err) - res := qry.Exec(test.Context()) + res := qry.Exec(context.Background()) require.NoError(t, res.Err) vector, err := res.Vector() require.NoError(t, err) @@ -3163,7 +3182,7 @@ func TestNativeHistogramRate(t *testing.T) { Schema: 1, ZeroThreshold: 0.001, ZeroCount: 1. / 15., - Count: 8. / 15., + Count: 9. / 15., Sum: 1.226666666666667, PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, PositiveBuckets: []float64{1. 
/ 15., 1. / 15., 1. / 15., 1. / 15.}, @@ -3176,27 +3195,24 @@ func TestNativeHistogramRate(t *testing.T) { func TestNativeFloatHistogramRate(t *testing.T) { // TODO(beorn7): Integrate histograms into the PromQL testing framework // and write more tests there. - test, err := NewTest(t, "") - require.NoError(t, err) - defer test.Close() + engine := newTestEngine() + storage := teststorage.New(t) + t.Cleanup(func() { storage.Close() }) seriesName := "sparse_histogram_series" lbls := labels.FromStrings("__name__", seriesName) - app := test.Storage().Appender(context.TODO()) + app := storage.Appender(context.Background()) for i, fh := range tsdbutil.GenerateTestFloatHistograms(100) { _, err := app.AppendHistogram(0, lbls, int64(i)*int64(15*time.Second/time.Millisecond), nil, fh) require.NoError(t, err) } require.NoError(t, app.Commit()) - require.NoError(t, test.Run()) - engine := test.QueryEngine() - queryString := fmt.Sprintf("rate(%s[1m])", seriesName) - qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(int64(5*time.Minute/time.Millisecond))) + qry, err := engine.NewInstantQuery(context.Background(), storage, nil, queryString, timestamp.Time(int64(5*time.Minute/time.Millisecond))) require.NoError(t, err) - res := qry.Exec(test.Context()) + res := qry.Exec(context.Background()) require.NoError(t, res.Err) vector, err := res.Vector() require.NoError(t, err) @@ -3207,7 +3223,7 @@ func TestNativeFloatHistogramRate(t *testing.T) { Schema: 1, ZeroThreshold: 0.001, ZeroCount: 1. / 15., - Count: 8. / 15., + Count: 9. / 15., Sum: 1.226666666666667, PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. 
/ 15.}, @@ -3239,16 +3255,16 @@ func TestNativeHistogram_HistogramCountAndSum(t *testing.T) { } for _, floatHisto := range []bool{true, false} { t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) { - test, err := NewTest(t, "") - require.NoError(t, err) - t.Cleanup(test.Close) + engine := newTestEngine() + storage := teststorage.New(t) + t.Cleanup(func() { storage.Close() }) seriesName := "sparse_histogram_series" lbls := labels.FromStrings("__name__", seriesName) - engine := test.QueryEngine() ts := int64(10 * time.Minute / time.Millisecond) - app := test.Storage().Appender(context.TODO()) + app := storage.Appender(context.Background()) + var err error if floatHisto { _, err = app.AppendHistogram(0, lbls, ts, nil, h.ToFloat()) } else { @@ -3258,10 +3274,10 @@ func TestNativeHistogram_HistogramCountAndSum(t *testing.T) { require.NoError(t, app.Commit()) queryString := fmt.Sprintf("histogram_count(%s)", seriesName) - qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(ts)) + qry, err := engine.NewInstantQuery(context.Background(), storage, nil, queryString, timestamp.Time(ts)) require.NoError(t, err) - res := qry.Exec(test.Context()) + res := qry.Exec(context.Background()) require.NoError(t, res.Err) vector, err := res.Vector() @@ -3276,10 +3292,10 @@ func TestNativeHistogram_HistogramCountAndSum(t *testing.T) { } queryString = fmt.Sprintf("histogram_sum(%s)", seriesName) - qry, err = engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(ts)) + qry, err = engine.NewInstantQuery(context.Background(), storage, nil, queryString, timestamp.Time(ts)) require.NoError(t, err) - res = qry.Exec(test.Context()) + res = qry.Exec(context.Background()) require.NoError(t, res.Err) vector, err = res.Vector() @@ -3296,6 +3312,165 @@ func TestNativeHistogram_HistogramCountAndSum(t *testing.T) { } } +func TestNativeHistogram_HistogramStdDevVar(t *testing.T) { + // TODO(codesome): 
Integrate histograms into the PromQL testing framework + // and write more tests there. + testCases := []struct { + name string + h *histogram.Histogram + stdVar float64 + }{ + { + name: "1, 2, 3, 4 low-res", + h: &histogram.Histogram{ + Count: 4, + Sum: 10, + Schema: 2, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 1}, + {Offset: 3, Length: 1}, + {Offset: 2, Length: 2}, + }, + PositiveBuckets: []int64{1, 0, 0, 0}, + }, + stdVar: 1.163807968526718, // actual variance: 1.25 + }, + { + name: "1, 2, 3, 4 hi-res", + h: &histogram.Histogram{ + Count: 4, + Sum: 10, + Schema: 8, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 1}, + {Offset: 255, Length: 1}, + {Offset: 149, Length: 1}, + {Offset: 105, Length: 1}, + }, + PositiveBuckets: []int64{1, 0, 0, 0}, + }, + stdVar: 1.2471347737158793, // actual variance: 1.25 + }, + { + name: "-50, -8, 0, 3, 8, 9, 100", + h: &histogram.Histogram{ + Count: 7, + ZeroCount: 1, + Sum: 62, + Schema: 3, + PositiveSpans: []histogram.Span{ + {Offset: 13, Length: 1}, + {Offset: 10, Length: 1}, + {Offset: 1, Length: 1}, + {Offset: 27, Length: 1}, + }, + PositiveBuckets: []int64{1, 0, 0, 0}, + NegativeSpans: []histogram.Span{ + {Offset: 24, Length: 1}, + {Offset: 21, Length: 1}, + }, + NegativeBuckets: []int64{1, 0}, + }, + stdVar: 1544.8582535368798, // actual variance: 1738.4082 + }, + { + name: "-50, -8, 0, 3, 8, 9, 100, NaN", + h: &histogram.Histogram{ + Count: 8, + ZeroCount: 1, + Sum: math.NaN(), + Schema: 3, + PositiveSpans: []histogram.Span{ + {Offset: 13, Length: 1}, + {Offset: 10, Length: 1}, + {Offset: 1, Length: 1}, + {Offset: 27, Length: 1}, + }, + PositiveBuckets: []int64{1, 0, 0, 0}, + NegativeSpans: []histogram.Span{ + {Offset: 24, Length: 1}, + {Offset: 21, Length: 1}, + }, + NegativeBuckets: []int64{1, 0}, + }, + stdVar: math.NaN(), + }, + { + name: "-50, -8, 0, 3, 8, 9, 100, +Inf", + h: &histogram.Histogram{ + Count: 8, + ZeroCount: 1, + Sum: math.Inf(1), + Schema: 3, + PositiveSpans: []histogram.Span{ + 
{Offset: 13, Length: 1}, + {Offset: 10, Length: 1}, + {Offset: 1, Length: 1}, + {Offset: 27, Length: 1}, + }, + PositiveBuckets: []int64{1, 0, 0, 0}, + NegativeSpans: []histogram.Span{ + {Offset: 24, Length: 1}, + {Offset: 21, Length: 1}, + }, + NegativeBuckets: []int64{1, 0}, + }, + stdVar: math.NaN(), + }, + } + for _, tc := range testCases { + for _, floatHisto := range []bool{true, false} { + t.Run(fmt.Sprintf("%s floatHistogram=%t", tc.name, floatHisto), func(t *testing.T) { + engine := newTestEngine() + storage := teststorage.New(t) + t.Cleanup(func() { storage.Close() }) + + seriesName := "sparse_histogram_series" + lbls := labels.FromStrings("__name__", seriesName) + + ts := int64(10 * time.Minute / time.Millisecond) + app := storage.Appender(context.Background()) + var err error + if floatHisto { + _, err = app.AppendHistogram(0, lbls, ts, nil, tc.h.ToFloat()) + } else { + _, err = app.AppendHistogram(0, lbls, ts, tc.h, nil) + } + require.NoError(t, err) + require.NoError(t, app.Commit()) + + queryString := fmt.Sprintf("histogram_stdvar(%s)", seriesName) + qry, err := engine.NewInstantQuery(context.Background(), storage, nil, queryString, timestamp.Time(ts)) + require.NoError(t, err) + + res := qry.Exec(context.Background()) + require.NoError(t, res.Err) + + vector, err := res.Vector() + require.NoError(t, err) + + require.Len(t, vector, 1) + require.Nil(t, vector[0].H) + require.InEpsilon(t, tc.stdVar, vector[0].F, 1e-12) + + queryString = fmt.Sprintf("histogram_stddev(%s)", seriesName) + qry, err = engine.NewInstantQuery(context.Background(), storage, nil, queryString, timestamp.Time(ts)) + require.NoError(t, err) + + res = qry.Exec(context.Background()) + require.NoError(t, res.Err) + + vector, err = res.Vector() + require.NoError(t, err) + + require.Len(t, vector, 1) + require.Nil(t, vector[0].H) + require.InEpsilon(t, math.Sqrt(tc.stdVar), vector[0].F, 1e-12) + }) + } + } +} + func TestNativeHistogram_HistogramQuantile(t *testing.T) { // 
TODO(codesome): Integrate histograms into the PromQL testing framework // and write more tests there. @@ -3489,18 +3664,18 @@ func TestNativeHistogram_HistogramQuantile(t *testing.T) { }, } - test, err := NewTest(t, "") - require.NoError(t, err) - t.Cleanup(test.Close) + engine := newTestEngine() + storage := teststorage.New(t) + t.Cleanup(func() { storage.Close() }) idx := int64(0) for _, floatHisto := range []bool{true, false} { for _, c := range cases { t.Run(fmt.Sprintf("%s floatHistogram=%t", c.text, floatHisto), func(t *testing.T) { seriesName := "sparse_histogram_series" lbls := labels.FromStrings("__name__", seriesName) - engine := test.QueryEngine() ts := idx * int64(10*time.Minute/time.Millisecond) - app := test.Storage().Appender(context.TODO()) + app := storage.Appender(context.Background()) + var err error if floatHisto { _, err = app.AppendHistogram(0, lbls, ts, nil, c.h.ToFloat()) } else { @@ -3512,10 +3687,10 @@ func TestNativeHistogram_HistogramQuantile(t *testing.T) { for j, sc := range c.subCases { t.Run(fmt.Sprintf("%d %s", j, sc.quantile), func(t *testing.T) { queryString := fmt.Sprintf("histogram_quantile(%s, %s)", sc.quantile, seriesName) - qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(ts)) + qry, err := engine.NewInstantQuery(context.Background(), storage, nil, queryString, timestamp.Time(ts)) require.NoError(t, err) - res := qry.Exec(test.Context()) + res := qry.Exec(context.Background()) require.NoError(t, res.Err) vector, err := res.Vector() @@ -3922,16 +4097,16 @@ func TestNativeHistogram_HistogramFraction(t *testing.T) { for _, floatHisto := range []bool{true, false} { for _, c := range cases { t.Run(fmt.Sprintf("%s floatHistogram=%t", c.text, floatHisto), func(t *testing.T) { - test, err := NewTest(t, "") - require.NoError(t, err) - t.Cleanup(test.Close) + engine := newTestEngine() + storage := teststorage.New(t) + t.Cleanup(func() { storage.Close() }) seriesName := 
"sparse_histogram_series" lbls := labels.FromStrings("__name__", seriesName) - engine := test.QueryEngine() ts := idx * int64(10*time.Minute/time.Millisecond) - app := test.Storage().Appender(context.TODO()) + app := storage.Appender(context.Background()) + var err error if floatHisto { _, err = app.AppendHistogram(0, lbls, ts, nil, c.h.ToFloat()) } else { @@ -3943,10 +4118,10 @@ func TestNativeHistogram_HistogramFraction(t *testing.T) { for j, sc := range c.subCases { t.Run(fmt.Sprintf("%d %s %s", j, sc.lower, sc.upper), func(t *testing.T) { queryString := fmt.Sprintf("histogram_fraction(%s, %s, %s)", sc.lower, sc.upper, seriesName) - qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(ts)) + qry, err := engine.NewInstantQuery(context.Background(), storage, nil, queryString, timestamp.Time(ts)) require.NoError(t, err) - res := qry.Exec(test.Context()) + res := qry.Exec(context.Background()) require.NoError(t, res.Err) vector, err := res.Vector() @@ -3980,7 +4155,7 @@ func TestNativeHistogram_Sum_Count_Add_AvgOperator(t *testing.T) { { CounterResetHint: histogram.GaugeType, Schema: 0, - Count: 21, + Count: 25, Sum: 1234.5, ZeroThreshold: 0.001, ZeroCount: 4, @@ -3998,7 +4173,7 @@ func TestNativeHistogram_Sum_Count_Add_AvgOperator(t *testing.T) { { CounterResetHint: histogram.GaugeType, Schema: 0, - Count: 36, + Count: 41, Sum: 2345.6, ZeroThreshold: 0.001, ZeroCount: 5, @@ -4018,7 +4193,7 @@ func TestNativeHistogram_Sum_Count_Add_AvgOperator(t *testing.T) { { CounterResetHint: histogram.GaugeType, Schema: 0, - Count: 36, + Count: 41, Sum: 1111.1, ZeroThreshold: 0.001, ZeroCount: 5, @@ -4045,7 +4220,7 @@ func TestNativeHistogram_Sum_Count_Add_AvgOperator(t *testing.T) { Schema: 0, ZeroThreshold: 0.001, ZeroCount: 14, - Count: 93, + Count: 107, Sum: 4691.2, PositiveSpans: []histogram.Span{ {Offset: 0, Length: 7}, @@ -4062,7 +4237,7 @@ func TestNativeHistogram_Sum_Count_Add_AvgOperator(t *testing.T) { Schema: 0, 
ZeroThreshold: 0.001, ZeroCount: 3.5, - Count: 23.25, + Count: 26.75, Sum: 1172.8, PositiveSpans: []histogram.Span{ {Offset: 0, Length: 7}, @@ -4081,20 +4256,20 @@ func TestNativeHistogram_Sum_Count_Add_AvgOperator(t *testing.T) { for _, c := range cases { for _, floatHisto := range []bool{true, false} { t.Run(fmt.Sprintf("floatHistogram=%t %d", floatHisto, idx0), func(t *testing.T) { - test, err := NewTest(t, "") - require.NoError(t, err) - t.Cleanup(test.Close) + storage := teststorage.New(t) + t.Cleanup(func() { storage.Close() }) seriesName := "sparse_histogram_series" seriesNameOverTime := "sparse_histogram_series_over_time" - engine := test.QueryEngine() + engine := newTestEngine() ts := idx0 * int64(10*time.Minute/time.Millisecond) - app := test.Storage().Appender(context.TODO()) + app := storage.Appender(context.Background()) for idx1, h := range c.histograms { lbls := labels.FromStrings("__name__", seriesName, "idx", fmt.Sprintf("%d", idx1)) // Since we mutate h later, we need to create a copy here. 
+ var err error if floatHisto { _, err = app.AppendHistogram(0, lbls, ts, nil, h.Copy().ToFloat()) } else { @@ -4115,10 +4290,10 @@ func TestNativeHistogram_Sum_Count_Add_AvgOperator(t *testing.T) { require.NoError(t, app.Commit()) queryAndCheck := func(queryString string, ts int64, exp Vector) { - qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(ts)) + qry, err := engine.NewInstantQuery(context.Background(), storage, nil, queryString, timestamp.Time(ts)) require.NoError(t, err) - res := qry.Exec(test.Context()) + res := qry.Exec(context.Background()) require.NoError(t, res.Err) vector, err := res.Vector() @@ -4173,7 +4348,7 @@ func TestNativeHistogram_SubOperator(t *testing.T) { histograms: []histogram.Histogram{ { Schema: 0, - Count: 36, + Count: 41, Sum: 2345.6, ZeroThreshold: 0.001, ZeroCount: 5, @@ -4208,7 +4383,7 @@ func TestNativeHistogram_SubOperator(t *testing.T) { }, expected: histogram.FloatHistogram{ Schema: 0, - Count: 25, + Count: 30, Sum: 1111.1, ZeroThreshold: 0.001, ZeroCount: 2, @@ -4229,7 +4404,7 @@ func TestNativeHistogram_SubOperator(t *testing.T) { histograms: []histogram.Histogram{ { Schema: 0, - Count: 36, + Count: 41, Sum: 2345.6, ZeroThreshold: 0.001, ZeroCount: 5, @@ -4264,7 +4439,7 @@ func TestNativeHistogram_SubOperator(t *testing.T) { }, expected: histogram.FloatHistogram{ Schema: 0, - Count: 25, + Count: 30, Sum: 1111.1, ZeroThreshold: 0.001, ZeroCount: 2, @@ -4299,7 +4474,7 @@ func TestNativeHistogram_SubOperator(t *testing.T) { }, { Schema: 0, - Count: 36, + Count: 41, Sum: 2345.6, ZeroThreshold: 0.001, ZeroCount: 5, @@ -4319,7 +4494,7 @@ func TestNativeHistogram_SubOperator(t *testing.T) { }, expected: histogram.FloatHistogram{ Schema: 0, - Count: -25, + Count: -30, Sum: -1111.1, ZeroThreshold: 0.001, ZeroCount: -2, @@ -4341,19 +4516,18 @@ func TestNativeHistogram_SubOperator(t *testing.T) { for _, c := range cases { for _, floatHisto := range []bool{true, false} { 
t.Run(fmt.Sprintf("floatHistogram=%t %d", floatHisto, idx0), func(t *testing.T) { - test, err := NewTest(t, "") - require.NoError(t, err) - t.Cleanup(test.Close) + engine := newTestEngine() + storage := teststorage.New(t) + t.Cleanup(func() { storage.Close() }) seriesName := "sparse_histogram_series" - engine := test.QueryEngine() - ts := idx0 * int64(10*time.Minute/time.Millisecond) - app := test.Storage().Appender(context.TODO()) + app := storage.Appender(context.Background()) for idx1, h := range c.histograms { lbls := labels.FromStrings("__name__", seriesName, "idx", fmt.Sprintf("%d", idx1)) // Since we mutate h later, we need to create a copy here. + var err error if floatHisto { _, err = app.AppendHistogram(0, lbls, ts, nil, h.Copy().ToFloat()) } else { @@ -4364,15 +4538,25 @@ func TestNativeHistogram_SubOperator(t *testing.T) { require.NoError(t, app.Commit()) queryAndCheck := func(queryString string, exp Vector) { - qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(ts)) + qry, err := engine.NewInstantQuery(context.Background(), storage, nil, queryString, timestamp.Time(ts)) require.NoError(t, err) - res := qry.Exec(test.Context()) + res := qry.Exec(context.Background()) require.NoError(t, res.Err) vector, err := res.Vector() require.NoError(t, err) + if len(vector) == len(exp) { + for i, e := range exp { + got := vector[i].H + if got != e.H { + // Error messages are better if we compare structs, not pointers. 
+ require.Equal(t, *e.H, *got) + } + } + } + require.Equal(t, exp, vector) } @@ -4383,8 +4567,8 @@ func TestNativeHistogram_SubOperator(t *testing.T) { } queryAndCheck(queryString, []Sample{{T: ts, H: &c.expected, Metric: labels.EmptyLabels()}}) }) - idx0++ } + idx0++ } } @@ -4487,20 +4671,20 @@ func TestNativeHistogram_MulDivOperator(t *testing.T) { for _, c := range cases { for _, floatHisto := range []bool{true, false} { t.Run(fmt.Sprintf("floatHistogram=%t %d", floatHisto, idx0), func(t *testing.T) { - test, err := NewTest(t, "") - require.NoError(t, err) - t.Cleanup(test.Close) + storage := teststorage.New(t) + t.Cleanup(func() { storage.Close() }) seriesName := "sparse_histogram_series" floatSeriesName := "float_series" - engine := test.QueryEngine() + engine := newTestEngine() ts := idx0 * int64(10*time.Minute/time.Millisecond) - app := test.Storage().Appender(context.TODO()) + app := storage.Appender(context.Background()) h := c.histogram lbls := labels.FromStrings("__name__", seriesName) // Since we mutate h later, we need to create a copy here. 
+ var err error if floatHisto { _, err = app.AppendHistogram(0, lbls, ts, nil, h.Copy().ToFloat()) } else { @@ -4512,10 +4696,10 @@ func TestNativeHistogram_MulDivOperator(t *testing.T) { require.NoError(t, app.Commit()) queryAndCheck := func(queryString string, exp Vector) { - qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(ts)) + qry, err := engine.NewInstantQuery(context.Background(), storage, nil, queryString, timestamp.Time(ts)) require.NoError(t, err) - res := qry.Exec(test.Context()) + res := qry.Exec(context.Background()) require.NoError(t, res.Err) vector, err := res.Vector() @@ -4616,22 +4800,18 @@ metric 0 1 2 for _, c := range cases { c := c t.Run(c.name, func(t *testing.T) { - test, err := NewTest(t, load) - require.NoError(t, err) - defer test.Close() - - err = test.Run() - require.NoError(t, err) + engine := newTestEngine() + storage := LoadedStorage(t, load) + t.Cleanup(func() { storage.Close() }) - eng := test.QueryEngine() if c.engineLookback != 0 { - eng.lookbackDelta = c.engineLookback + engine.lookbackDelta = c.engineLookback } opts := NewPrometheusQueryOpts(false, c.queryLookback) - qry, err := eng.NewInstantQuery(test.context, test.Queryable(), opts, query, c.ts) + qry, err := engine.NewInstantQuery(context.Background(), storage, opts, query, c.ts) require.NoError(t, err) - res := qry.Exec(test.Context()) + res := qry.Exec(context.Background()) require.NoError(t, res.Err) vec, ok := res.Value.(Vector) require.True(t, ok) diff --git a/promql/functions.go b/promql/functions.go index 96bffab96d3..5c39d6bd8a1 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -996,6 +996,72 @@ func funcHistogramSum(vals []parser.Value, args parser.Expressions, enh *EvalNod return enh.Out } +// === histogram_stddev(Vector parser.ValueTypeVector) Vector === +func funcHistogramStdDev(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector { + inVec := vals[0].(Vector) + + for _, sample 
:= range inVec { + // Skip non-histogram samples. + if sample.H == nil { + continue + } + mean := sample.H.Sum / sample.H.Count + var variance, cVariance float64 + it := sample.H.AllBucketIterator() + for it.Next() { + bucket := it.At() + var val float64 + if bucket.Lower <= 0 && 0 <= bucket.Upper { + val = 0 + } else { + val = math.Sqrt(bucket.Upper * bucket.Lower) + } + delta := val - mean + variance, cVariance = kahanSumInc(bucket.Count*delta*delta, variance, cVariance) + } + variance += cVariance + variance /= sample.H.Count + enh.Out = append(enh.Out, Sample{ + Metric: enh.DropMetricName(sample.Metric), + F: math.Sqrt(variance), + }) + } + return enh.Out +} + +// === histogram_stdvar(Vector parser.ValueTypeVector) Vector === +func funcHistogramStdVar(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector { + inVec := vals[0].(Vector) + + for _, sample := range inVec { + // Skip non-histogram samples. + if sample.H == nil { + continue + } + mean := sample.H.Sum / sample.H.Count + var variance, cVariance float64 + it := sample.H.AllBucketIterator() + for it.Next() { + bucket := it.At() + var val float64 + if bucket.Lower <= 0 && 0 <= bucket.Upper { + val = 0 + } else { + val = math.Sqrt(bucket.Upper * bucket.Lower) + } + delta := val - mean + variance, cVariance = kahanSumInc(bucket.Count*delta*delta, variance, cVariance) + } + variance += cVariance + variance /= sample.H.Count + enh.Out = append(enh.Out, Sample{ + Metric: enh.DropMetricName(sample.Metric), + F: variance, + }) + } + return enh.Out +} + // === histogram_fraction(lower, upper parser.ValueTypeScalar, Vector parser.ValueTypeVector) Vector === func funcHistogramFraction(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector { lower := vals[0].(Vector)[0].F @@ -1377,6 +1443,8 @@ var FunctionCalls = map[string]FunctionCall{ "histogram_fraction": funcHistogramFraction, "histogram_quantile": funcHistogramQuantile, "histogram_sum": funcHistogramSum, + 
"histogram_stddev": funcHistogramStdDev, + "histogram_stdvar": funcHistogramStdVar, "holt_winters": funcHoltWinters, "hour": funcHour, "idelta": funcIdelta, diff --git a/promql/parser/functions.go b/promql/parser/functions.go index 479c7f635d3..45a30219e65 100644 --- a/promql/parser/functions.go +++ b/promql/parser/functions.go @@ -173,6 +173,16 @@ var Functions = map[string]*Function{ ArgTypes: []ValueType{ValueTypeVector}, ReturnType: ValueTypeVector, }, + "histogram_stddev": { + Name: "histogram_stddev", + ArgTypes: []ValueType{ValueTypeVector}, + ReturnType: ValueTypeVector, + }, + "histogram_stdvar": { + Name: "histogram_stdvar", + ArgTypes: []ValueType{ValueTypeVector}, + ReturnType: ValueTypeVector, + }, "histogram_fraction": { Name: "histogram_fraction", ArgTypes: []ValueType{ValueTypeScalar, ValueTypeScalar, ValueTypeVector}, diff --git a/promql/parser/generated_parser.y b/promql/parser/generated_parser.y index b28e9d544c4..f7951db2b08 100644 --- a/promql/parser/generated_parser.y +++ b/promql/parser/generated_parser.y @@ -21,23 +21,28 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/value" + "github.com/prometheus/prometheus/model/histogram" ) %} %union { - node Node - item Item - matchers []*labels.Matcher - matcher *labels.Matcher - label labels.Label - labels labels.Labels - lblList []labels.Label - strings []string - series []SequenceValue - uint uint64 - float float64 - duration time.Duration + node Node + item Item + matchers []*labels.Matcher + matcher *labels.Matcher + label labels.Label + labels labels.Labels + lblList []labels.Label + strings []string + series []SequenceValue + histogram *histogram.FloatHistogram + descriptors map[string]interface{} + bucket_set []float64 + int int64 + uint uint64 + float float64 + duration time.Duration } @@ -54,6 +59,8 @@ IDENTIFIER LEFT_BRACE LEFT_BRACKET LEFT_PAREN +OPEN_HIST +CLOSE_HIST METRIC_IDENTIFIER NUMBER RIGHT_BRACE @@ -64,6 +71,20 @@ SPACE STRING 
TIMES +// Histogram Descriptors. +%token histogramDescStart +%token +SUM_DESC +COUNT_DESC +SCHEMA_DESC +OFFSET_DESC +NEGATIVE_OFFSET_DESC +BUCKETS_DESC +NEGATIVE_BUCKETS_DESC +ZERO_BUCKET_DESC +ZERO_BUCKET_WIDTH_DESC +%token histogramDescEnd + // Operators. %token operatorsStart %token @@ -145,6 +166,10 @@ START_METRIC_SELECTOR %type