Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pre release #49

Merged
merged 11 commits into from
Jan 19, 2024
53 changes: 53 additions & 0 deletions bench/pyroscope_pipeline_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package pyroscopereceiver

import (
"path/filepath"
"testing"

"github.com/metrico/otel-collector/receiver/pyroscopereceiver/testclient"
"github.com/stretchr/testify/assert"
)

type request struct {
urlParams map[string]string
jfr string
}

// Benchmarks a running otelcol pyroscope write pipeline (collector and Clickhouse).
// Adjust collectorAddr to bench a your target if needed.
// Example: GOMAXPROCS=1 go test -bench ^BenchmarkPyroscopePipeline$ github.com/metrico/otel-collector/receiver/pyroscopereceiver -benchtime 10s -count 6
func BenchmarkPyroscopePipeline(b *testing.B) {
dist := []request{
{
urlParams: map[string]string{
"name": "com.example.App{dc=us-east-1,kubernetes_pod_name=app-abcd1234}",
"from": "1700332322",
"until": "1700332329",
"format": "jfr",
"sampleRate": "100",
},
jfr: filepath.Join("..", "receiver", "pyroscopereceiver", "testdata", "cortex-dev-01__kafka-0__cpu__0.jfr"),
},
{
urlParams: map[string]string{
"name": "com.example.App{dc=us-east-1,kubernetes_pod_name=app-abcd1234}",
"from": "1700332322",
"until": "1700332329",
"format": "jfr",
},
jfr: filepath.Join("..", "receiver", "pyroscopereceiver", "testdata", "memory_alloc_live_example.jfr"),
},
}
collectorAddr := "http://0.0.0.0:8062"

b.ReportAllocs()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
j := 0
for pb.Next() {
err := testclient.Ingest(collectorAddr, dist[j].urlParams, dist[j].jfr)
assert.NoError(b, err, "failed to ingest")
j = (j + 1) % len(dist)
}
})
}
1 change: 1 addition & 0 deletions exporter/clickhouseprofileexporter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func initMetrics(meter metric.Meter) error {
if otelcolExporterClickhouseProfileFlushTimeMillis, err = meter.Int64Histogram(
fmt.Sprint(prefix, "flush_time_millis"),
metric.WithDescription("Clickhouse profile exporter flush time in millis"),
metric.WithExplicitBucketBoundaries(0, 5, 10, 20, 50, 100, 200, 500, 1000, 5000),
); err != nil {
return err
}
Expand Down
2 changes: 0 additions & 2 deletions receiver/pyroscopereceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ Implements the Pyroscope ingest protocol and conveys the accepted profiles as Op

- `protocols`: sets the application layer protocols that the receiver will serve. See [Supported Protocols](#supported-protocols). Default is http/s on 0.0.0.0:8062 with max request body size of: 5e6 + 1e6.
- `timeout`: sets the server reponse timeout. Default is 10 seconds.
- `request_body_uncompressed_size_bytes`: sets the expected value for uncompressed request body size in bytes to size pipeline buffers and optimize allocations based on exported metrics. Default is 0.
- `parsed_body_uncompressed_size_bytes`: sets the expected value for uncompressed parsed body size in bytes to size pipeline buffers and optimize allocations based on exported metrics. Default is 0.

## Example

Expand Down
12 changes: 0 additions & 12 deletions receiver/pyroscopereceiver/buf/prepare.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"compress/gzip"
"fmt"
"io"

"github.com/metrico/otel-collector/receiver/pyroscopereceiver/buf"
)

type codec uint8
Expand All @@ -17,15 +15,13 @@ const (

// Decodes compressed streams
type Decompressor struct {
uncompressedSizeBytes int64
maxUncompressedSizeBytes int64
decoders map[codec]func(body io.Reader) (io.Reader, error)
}

// Creates a new decompressor
func NewDecompressor(uncompressedSizeBytes int64, maxUncompressedSizeBytes int64) *Decompressor {
func NewDecompressor(maxUncompressedSizeBytes int64) *Decompressor {
return &Decompressor{
uncompressedSizeBytes: uncompressedSizeBytes,
maxUncompressedSizeBytes: maxUncompressedSizeBytes,
decoders: map[codec]func(r io.Reader) (io.Reader, error){
Gzip: func(r io.Reader) (io.Reader, error) {
Expand All @@ -39,36 +35,34 @@ func NewDecompressor(uncompressedSizeBytes int64, maxUncompressedSizeBytes int64
}
}

func (d *Decompressor) readBytes(r io.Reader) (*bytes.Buffer, error) {
buf := buf.PrepareBuffer(d.uncompressedSizeBytes)

func (d *Decompressor) readBytes(r io.Reader, out *bytes.Buffer) error {
// read max+1 to validate size via a single Read()
lr := io.LimitReader(r, d.maxUncompressedSizeBytes+1)

n, err := buf.ReadFrom(lr)
n, err := out.ReadFrom(lr)
if err != nil {
return nil, err
return err
}
if n < 1 {
return nil, fmt.Errorf("empty profile")
return fmt.Errorf("empty profile")
}
if n > d.maxUncompressedSizeBytes {
return nil, fmt.Errorf("body size exceeds the limit %d bytes", d.maxUncompressedSizeBytes)
return fmt.Errorf("body size exceeds the limit %d bytes", d.maxUncompressedSizeBytes)
}
return buf, nil
return nil
}

// Decodes the accepted reader, applying the configured size limit to avoid oom by compression bomb
func (d *Decompressor) Decompress(r io.Reader, c codec) (*bytes.Buffer, error) {
func (d *Decompressor) Decompress(r io.Reader, c codec, out *bytes.Buffer) error {
decoder, ok := d.decoders[c]
if !ok {
return nil, fmt.Errorf("unsupported encoding")
return fmt.Errorf("unsupported encoding")
}

dr, err := decoder(r)
if err != nil {
return nil, err
return err
}

return d.readBytes(dr)
return d.readBytes(dr, out)
}
15 changes: 0 additions & 15 deletions receiver/pyroscopereceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,6 @@ type Config struct {

// Cofigures timeout for synchronous request handling by the receiver server
Timeout time.Duration `mapstructure:"timeout"`
// Configures the expected value for uncompressed request body size in bytes to size pipeline buffers
// and optimize allocations based on exported metrics
RequestBodyUncompressedSizeBytes int64 `mapstructure:"request_body_uncompressed_size_bytes"`
// Configures the expected value for uncompressed parsed body size in bytes to size pipeline buffers
// and optimize allocations based on exported metrics
ParsedBodyUncompressedSizeBytes int64 `mapstructure:"parsed_body_uncompressed_size_bytes"`
}

var _ component.Config = (*Config)(nil)
Expand All @@ -38,14 +32,5 @@ func (cfg *Config) Validate() error {
if cfg.Protocols.Http.MaxRequestBodySize < 1 {
return fmt.Errorf("max_request_body_size must be positive")
}
if cfg.RequestBodyUncompressedSizeBytes < 0 {
return fmt.Errorf("request_body_uncompressed_size_bytes must be positive")
}
if cfg.RequestBodyUncompressedSizeBytes > cfg.Protocols.Http.MaxRequestBodySize {
return fmt.Errorf("expected value cannot be greater than max")
}
if cfg.ParsedBodyUncompressedSizeBytes < 0 {
return fmt.Errorf("parsed_body_uncompressed_size_bytes must be positive")
}
return nil
}
12 changes: 4 additions & 8 deletions receiver/pyroscopereceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@ import (
const (
typeStr = "pyroscopereceiver"

defaultHttpAddr = "0.0.0.0:8062"
defaultMaxRequestBodySize = 5e6 + 1e6 // reserve for metadata
defaultTimeout = 10 * time.Second
defaultRequestBodyUncompressedSizeBytesExpectedValue = 0
defaultParsedBodyUncompressedSizeBytesExpectedValue = 0
defaultHttpAddr = "0.0.0.0:8062"
defaultMaxRequestBodySize = 5e6 + 1e6 // reserve for metadata
defaultTimeout = 10 * time.Second
)

func createDefaultConfig() component.Config {
Expand All @@ -28,9 +26,7 @@ func createDefaultConfig() component.Config {
MaxRequestBodySize: defaultMaxRequestBodySize,
},
},
Timeout: defaultTimeout,
RequestBodyUncompressedSizeBytes: defaultRequestBodyUncompressedSizeBytesExpectedValue,
ParsedBodyUncompressedSizeBytes: defaultParsedBodyUncompressedSizeBytesExpectedValue,
Timeout: defaultTimeout,
}
}

Expand Down
5 changes: 2 additions & 3 deletions receiver/pyroscopereceiver/jfrparser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
pprof_proto "github.com/google/pprof/profile"
jfr_parser "github.com/grafana/jfr-parser/parser"
jfr_types "github.com/grafana/jfr-parser/parser/types"
"github.com/metrico/otel-collector/receiver/pyroscopereceiver/buf"
profile_types "github.com/metrico/otel-collector/receiver/pyroscopereceiver/types"
)

Expand Down Expand Up @@ -55,7 +54,7 @@ func NewJfrPprofParser() *jfrPprofParser {
}

// Parses the jfr buffer into pprof. The returned slice may be empty without an error.
func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata, parsedBodyUncompressedSizeBytes int64) ([]profile_types.ProfileIR, error) {
func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata) ([]profile_types.ProfileIR, error) {
var (
period int64
event string
Expand Down Expand Up @@ -115,7 +114,7 @@ func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata, pa
for _, pr := range pa.proftab {
if nil != pr {
// assuming jfr-pprof conversion should not expand memory footprint, transitively applying jfr limit on pprof
pr.prof.Payload = buf.PrepareBuffer(parsedBodyUncompressedSizeBytes)
pr.prof.Payload = new(bytes.Buffer)
pr.pprof.WriteUncompressed(pr.prof.Payload)
ps = append(ps, pr.prof)
}
Expand Down
3 changes: 3 additions & 0 deletions receiver/pyroscopereceiver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,21 @@ func initMetrics(meter metric.Meter) error {
if otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes, err = meter.Int64Histogram(
fmt.Sprint(prefix, "request_body_uncompressed_size_bytes"),
metric.WithDescription("Pyroscope receiver uncompressed request body size in bytes"),
metric.WithExplicitBucketBoundaries(0, 1024, 4096, 16384, 32768, 65536, 131072, 262144, 524288, 1048576),
); err != nil {
return err
}
if otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes, err = meter.Int64Histogram(
fmt.Sprint(prefix, "parsed_body_uncompressed_size_bytes"),
metric.WithDescription("Pyroscope receiver uncompressed parsed body size in bytes"),
metric.WithExplicitBucketBoundaries(0, 1024, 4096, 16384, 32768, 65536, 131072, 262144, 524288, 1048576),
); err != nil {
return err
}
if otelcolReceiverPyroscopeHttpResponseTimeMillis, err = meter.Int64Histogram(
fmt.Sprint(prefix, "http_response_time_millis"),
metric.WithDescription("Pyroscope receiver http response time in millis"),
metric.WithExplicitBucketBoundaries(0, 5, 10, 20, 50, 100, 200, 500, 1000, 5000),
); err != nil {
return err
}
Expand Down
49 changes: 49 additions & 0 deletions receiver/pyroscopereceiver/pool_alloc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package pyroscopereceiver

import (
"bytes"
"compress/gzip"
"os"
"path/filepath"
"sync"
"testing"

"github.com/metrico/otel-collector/receiver/pyroscopereceiver/compress"
"github.com/stretchr/testify/assert"
)

func TestAllocDecompress(t *testing.T) {
dist := []string{
filepath.Join("testdata", "cortex-dev-01__kafka-0__cpu__0.jfr"),
filepath.Join("testdata", "memory_alloc_live_example.jfr"),
}
compressed := []*bytes.Buffer{
loadCompressed(t, dist[0]),
loadCompressed(t, dist[1]),
}
d := compress.NewDecompressor(1024 * 1024 * 1024)
j := 0
p := &sync.Pool{}

n := testing.AllocsPerRun(100, func() {
buf := acquireBuf(p)
d.Decompress(compressed[j], compress.Gzip, buf)
releaseBuf(p, buf)
j = (j + 1) % len(dist)
})
t.Logf("\naverage alloc count: %f", n)
}

func loadCompressed(t *testing.T, jfr string) *bytes.Buffer {
uncompressed, err := os.ReadFile(jfr)
if err != nil {
assert.NoError(t, err, "failed to load jfr")
}
compressed := new(bytes.Buffer)
gw := gzip.NewWriter(compressed)
if _, err := gw.Write(uncompressed); err != nil {
assert.NoError(t, err, "failed to compress jfr")
}
gw.Close()
return compressed
}
42 changes: 32 additions & 10 deletions receiver/pyroscopereceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@ type pyroscopeReceiver struct {
decompressor *compress.Decompressor
httpServer *http.Server
shutdownWg sync.WaitGroup

uncompressedBufPool *sync.Pool
}

type parser interface {
// Parses the given input buffer into the collector's profile IR
Parse(buf *bytes.Buffer, md profile_types.Metadata, parsedBodyUncompressedSizeBytes int64) ([]profile_types.ProfileIR, error)
Parse(buf *bytes.Buffer, md profile_types.Metadata) ([]profile_types.ProfileIR, error)
}

type params struct {
Expand All @@ -72,13 +74,14 @@ type params struct {

func newPyroscopeReceiver(cfg *Config, consumer consumer.Logs, set *receiver.CreateSettings) (*pyroscopeReceiver, error) {
recv := &pyroscopeReceiver{
cfg: cfg,
set: set,
logger: set.Logger,
meter: set.MeterProvider.Meter(typeStr),
next: consumer,
cfg: cfg,
set: set,
logger: set.Logger,
meter: set.MeterProvider.Meter(typeStr),
next: consumer,
uncompressedBufPool: &sync.Pool{},
}
recv.decompressor = compress.NewDecompressor(recv.cfg.RequestBodyUncompressedSizeBytes, recv.cfg.Protocols.Http.MaxRequestBodySize)
recv.decompressor = compress.NewDecompressor(recv.cfg.Protocols.Http.MaxRequestBodySize)
recv.httpMux = http.NewServeMux()
recv.httpMux.HandleFunc(ingestPath, func(resp http.ResponseWriter, req *http.Request) {
recv.httpHandlerIngest(resp, req)
Expand Down Expand Up @@ -151,7 +154,7 @@ func (recv *pyroscopeReceiver) handle(ctx context.Context, resp http.ResponseWri
}

otelcolReceiverPyroscopeHttpRequestTotal.Add(ctx, 1, metric.WithAttributeSet(*newOtelcolAttrSetHttp(pm.name, errorCodeSuccess)))
otelcolReceiverPyroscopeHttpResponseTimeMillis.Record(ctx, time.Now().Unix()-startTimeFromContext(ctx), metric.WithAttributeSet(*newOtelcolAttrSetHttp(pm.name, errorCodeSuccess)))
otelcolReceiverPyroscopeHttpResponseTimeMillis.Record(ctx, time.Now().UnixMilli()-startTimeFromContext(ctx), metric.WithAttributeSet(*newOtelcolAttrSetHttp(pm.name, errorCodeSuccess)))
writeResponseNoContent(resp)
}()
return c
Expand Down Expand Up @@ -221,6 +224,20 @@ func newOtelcolAttrSetHttp(service string, errorCode string) *attribute.Set {
return &s
}

func acquireBuf(p *sync.Pool) *bytes.Buffer {
v := p.Get()
if v == nil {
v = new(bytes.Buffer)
}
buf := v.(*bytes.Buffer)
return buf
}

func releaseBuf(p *sync.Pool, buf *bytes.Buffer) {
buf.Reset()
p.Put(buf)
}

func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request, pm params) (plog.Logs, error) {
var (
tmp []string
Expand All @@ -243,7 +260,12 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque
}
defer f.Close()

buf, err := recv.decompressor.Decompress(f, compress.Gzip)
buf := acquireBuf(recv.uncompressedBufPool)
defer func() {
releaseBuf(recv.uncompressedBufPool, buf)
}()

err = recv.decompressor.Decompress(f, compress.Gzip, buf)
if err != nil {
return logs, fmt.Errorf("failed to decompress body: %w", err)
}
Expand All @@ -261,7 +283,7 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque
md.SampleRateHertz = hz
}

ps, err := pa.Parse(buf, md, recv.cfg.ParsedBodyUncompressedSizeBytes)
ps, err := pa.Parse(buf, md)
if err != nil {
return logs, fmt.Errorf("failed to parse pprof: %w", err)
}
Expand Down
Loading
Loading