Skip to content

Commit

Permalink
Merge pull request #15 from vinted/query_add_zstd_support
Browse files Browse the repository at this point in the history
query: add zstd/snappy compression support
  • Loading branch information
GiedriusS authored Aug 22, 2022
2 parents 584920c + 8b9b5a0 commit 2dac4f0
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 4 deletions.
4 changes: 4 additions & 0 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@ import (
"go.uber.org/automaxprocs/maxprocs"
"gopkg.in/alecthomas/kingpin.v2"

"github.com/mostynb/go-grpc-compression/zstd"
"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/tracing/client"
)

// To register the encoder.
var _ = zstd.Name

func main() {
// We use mmaped resources in most of the components so hardcode PanicOnFault to true. This allows us to recover (if we can e.g if queries
// are temporarily accessing unmapped memory).
Expand Down
12 changes: 12 additions & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/go-kit/log/level"
grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/tags"
"github.com/mostynb/go-grpc-compression/zstd"
"github.com/oklog/run"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
Expand All @@ -26,6 +27,8 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/thanos-io/thanos/pkg/extgrpc/snappy"
"google.golang.org/grpc"

apiv1 "github.com/thanos-io/thanos/pkg/api/query"
"github.com/thanos-io/thanos/pkg/compact/downsample"
Expand Down Expand Up @@ -59,6 +62,7 @@ const (
promqlNegativeOffset = "promql-negative-offset"
promqlAtModifier = "promql-at-modifier"
queryPushdown = "query-pushdown"
compressionNone = "none"
)

// registerQuery registers a query command.
Expand All @@ -75,6 +79,9 @@ func registerQuery(app *extkingpin.App) {
key := cmd.Flag("grpc-client-tls-key", "TLS Key for the client's certificate").Default("").String()
caCert := cmd.Flag("grpc-client-tls-ca", "TLS CA Certificates to use to verify gRPC servers").Default("").String()
serverName := cmd.Flag("grpc-client-server-name", "Server name to verify the hostname on the returned gRPC certificates. See https://tools.ietf.org/html/rfc4366#section-3.1").Default("").String()
compressionOptions := strings.Join([]string{snappy.Name, zstd.Name, compressionNone}, ", ")

grpcCompression := cmd.Flag("grpc-compression", "Compression algorithm to use for gRPC requests to other receivers. Must be one of: "+compressionOptions).Default(compressionNone).Enum(snappy.Name, zstd.Name, compressionNone)

webRoutePrefix := cmd.Flag("web.route-prefix", "Prefix for API and UI endpoints. This allows thanos UI to be served on a sub-path. Defaults to the value of --web.external-prefix. This option is analogous to --web.route-prefix of Prometheus.").Default("").String()
webExternalPrefix := cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the UI query web interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos UI to be served behind a reverse proxy that strips a URL sub-path.").Default("").String()
Expand Down Expand Up @@ -236,6 +243,7 @@ func registerQuery(app *extkingpin.App) {
*grpcKey,
*grpcClientCA,
*grpcMaxConnAge,
*grpcCompression,
*secure,
*skipVerify,
*cert,
Expand Down Expand Up @@ -303,6 +311,7 @@ func runQuery(
grpcKey string,
grpcClientCA string,
grpcMaxConnAge time.Duration,
grpcCompression string,
secure bool,
skipVerify bool,
cert string,
Expand Down Expand Up @@ -368,6 +377,9 @@ func runQuery(
if err != nil {
return errors.Wrap(err, "building gRPC client")
}
if grpcCompression != compressionNone {
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(grpcCompression)))
}

fileSDCache := cache.New()
dnsStoreProvider := dns.NewProvider(
Expand Down
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/hashicorp/golang-lru v0.5.4
github.com/jpillora/backoff v1.0.0
github.com/klauspost/compress v1.13.6
github.com/klauspost/compress v1.15.9
github.com/leanovate/gopter v0.2.4
github.com/lightstep/lightstep-tracer-go v0.18.1
github.com/lovoo/gcloud-opentracing v0.3.0
Expand Down Expand Up @@ -99,7 +99,10 @@ require (

require github.com/viney-shih/go-cache v1.1.4

require github.com/rueian/rueidis v0.0.65
require (
github.com/mostynb/go-grpc-compression v1.1.17
github.com/rueian/rueidis v0.0.65
)

require (
cloud.google.com/go v0.99.0 // indirect
Expand Down
12 changes: 10 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,7 @@ github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVB
github.com/franela/goblin v0.0.0-20210519012713-85d372ac71e2/go.mod h1:VzmDKDJVZI3aJmnRI9VjAn9nJ8qPPsN1fqzr9dqInIo=
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
Expand Down Expand Up @@ -1208,8 +1209,9 @@ github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYs
github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/cpuid v1.2.3/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid v1.3.1 h1:5JNjFYYQrZeKRJ0734q51WCEEn2huer72Dc7K+R/b6s=
github.com/klauspost/cpuid v1.3.1/go.mod h1:bYW4mA6ZgKPob1/Dlai2LviZJO7KGI3uoWLd42rAQw4=
Expand All @@ -1225,8 +1227,9 @@ github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/pty v1.0.0/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
Expand Down Expand Up @@ -1381,6 +1384,8 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/morikuni/aec v0.0.0-20170113033406-39771216ff4c/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
github.com/mostynb/go-grpc-compression v1.1.17 h1:N9t6taOJN3mNTTi0wDf4e3lp/G/ON1TP67Pn0vTUA9I=
github.com/mostynb/go-grpc-compression v1.1.17/go.mod h1:FUSBr0QjKqQgoDG/e0yiqlR6aqyXC39+g/hFLDfSsEY=
github.com/mozillazg/go-cos v0.13.0/go.mod h1:Zp6DvvXn0RUOXGJ2chmWt2bLEqRAnJnS3DnAZsJsoaE=
github.com/mozillazg/go-httpheader v0.2.1 h1:geV7TrjbL8KXSyvghnFm+NyTux/hxwueTSrwhe88TQQ=
github.com/mozillazg/go-httpheader v0.2.1/go.mod h1:jJ8xECTlalr6ValeXYdOF8fFUISeBAdw6E61aqQma60=
Expand Down Expand Up @@ -1523,6 +1528,7 @@ github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR
github.com/peterh/liner v1.0.1-0.20180619022028-8c1271fcf47f/go.mod h1:xIteQHvHuaLYG9IFj6mSxM0fCKrs34IrEQUhOYuGPHc=
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1-0.20171018195549-f15c970de5b7/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -1639,6 +1645,8 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L
github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rs/cors v1.6.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/rs/cors v1.8.0/go.mod h1:EBwu+T5AvHOcXwvZIkQFjUN6s8Czyqw12GL/Y0tUyRM=
Expand Down
90 changes: 90 additions & 0 deletions pkg/extgrpc/snappy/snappy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package snappy

import (
"io"
"sync"

"github.com/klauspost/compress/snappy"
"google.golang.org/grpc/encoding"
)

// Name is the name registered for the snappy compressor.
const Name = "snappy"

func init() {
encoding.RegisterCompressor(newCompressor())
}

type compressor struct {
writersPool sync.Pool
readersPool sync.Pool
}

func newCompressor() *compressor {
c := &compressor{}
c.readersPool = sync.Pool{
New: func() interface{} {
return snappy.NewReader(nil)
},
}
c.writersPool = sync.Pool{
New: func() interface{} {
return snappy.NewBufferedWriter(nil)
},
}
return c
}

func (c *compressor) Name() string {
return Name
}

func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) {
wr := c.writersPool.Get().(*snappy.Writer)
wr.Reset(w)
return writeCloser{wr, &c.writersPool}, nil
}

func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
dr := c.readersPool.Get().(*snappy.Reader)
dr.Reset(r)
return reader{dr, &c.readersPool}, nil
}

type writeCloser struct {
writer *snappy.Writer
pool *sync.Pool
}

func (w writeCloser) Write(p []byte) (n int, err error) {
return w.writer.Write(p)
}

func (w writeCloser) Close() error {
defer func() {
w.writer.Reset(nil)
w.pool.Put(w.writer)
}()

if w.writer != nil {
return w.writer.Close()
}
return nil
}

type reader struct {
reader *snappy.Reader
pool *sync.Pool
}

func (r reader) Read(p []byte) (n int, err error) {
n, err = r.reader.Read(p)
if err == io.EOF {
r.reader.Reset(nil)
r.pool.Put(r.reader)
}
return n, err
}

0 comments on commit 2dac4f0

Please sign in to comment.