Skip to content

Commit

Permalink
Merge pull request #97 from vinted/pull_worker_change
Browse files Browse the repository at this point in the history
Pull worker change
  • Loading branch information
GiedriusS authored Apr 12, 2024
2 parents bba1b1b + 0ec44c1 commit 1663138
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 114 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7080](https://github.com/thanos-io/thanos/pull/7080) Receive: race condition in handler Close() when stopped early
- [#7132](https://github.com/thanos-io/thanos/pull/7132) Documentation: fix broken helm installation instruction
- [#7134](https://github.com/thanos-io/thanos/pull/7134) Store, Compact: Revert the recursive block listing mechanism introduced in https://github.com/thanos-io/thanos/pull/6474 and use the same strategy as in 0.31. Introduce a `--block-discovery-strategy` flag to control the listing strategy so that a recursive lister can still be used if the tradeoff of slower but cheaper discovery is preferred.
- [#7122](https://github.com/thanos-io/thanos/pull/7122) Store Gateway: Fix lazy expanded postings estimate base cardinality using posting group with remove keys.
- [#7224](https://github.com/thanos-io/thanos/pull/7224) Query-frontend: Add Redis username to the client configuration.
- [#7220](https://github.com/thanos-io/thanos/pull/7220) Store Gateway: Fix lazy expanded postings caching partial expanded postings and bug of estimating remove postings with non existent value. Added `PromQLSmith` based fuzz test to improve correctness.
- [#7244](https://github.com/thanos-io/thanos/pull/7244) Query: Fix Internal Server Error unknown targetHealth: "unknown" when trying to open the targets page.
- [#7248](https://github.com/thanos-io/thanos/pull/7248) Receive: Fix RemoteWriteAsync was sequentially executed causing high latency in the ingestion path.

### Added
- [#7105](https://github.com/thanos-io/thanos/pull/7105) Rule: add flag `--query.enable-x-functions` to allow usage of extended promql functions (xrate, xincrease, xdelta) in loaded rules
Expand Down
37 changes: 19 additions & 18 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,24 +237,25 @@ func runReceive(
}

webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Writer: writer,
ListenAddress: conf.rwAddress,
Registry: reg,
Endpoint: conf.endpoint,
TenantHeader: conf.tenantHeader,
TenantField: conf.tenantField,
DefaultTenantID: conf.defaultTenantID,
ReplicaHeader: conf.replicaHeader,
ReplicationFactor: conf.replicationFactor,
RelabelConfigs: relabelConfig,
ReceiverMode: receiveMode,
Tracer: tracer,
TLSConfig: rwTLSConfig,
DialOpts: dialOpts,
ForwardTimeout: time.Duration(*conf.forwardTimeout),
MaxBackoff: time.Duration(*conf.maxBackoff),
TSDBStats: dbs,
Limiter: limiter,
Writer: writer,
ListenAddress: conf.rwAddress,
Registry: reg,
Endpoint: conf.endpoint,
TenantHeader: conf.tenantHeader,
TenantField: conf.tenantField,
DefaultTenantID: conf.defaultTenantID,
ReplicaHeader: conf.replicaHeader,
ReplicationFactor: conf.replicationFactor,
RelabelConfigs: relabelConfig,
ReceiverMode: receiveMode,
Tracer: tracer,
TLSConfig: rwTLSConfig,
DialOpts: dialOpts,
ForwardTimeout: time.Duration(*conf.forwardTimeout),
MaxBackoff: time.Duration(*conf.maxBackoff),
TSDBStats: dbs,
Limiter: limiter,
AsyncForwardWorkerCount: conf.asyncForwardWorkerCount,
})

grpcProbe := prober.NewGRPC()
Expand Down
73 changes: 73 additions & 0 deletions pkg/pool/worker_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package pool

import (
"context"
"sync"
)

// Work is a unit of item to be worked on, like Java Runnable.
type Work func()

// WorkerPool is a pool of goroutines that are reusable, similar to Java ThreadPool.
type WorkerPool interface {
// Init initializes the worker pool.
Init()

// Go waits until the next worker becomes available and executes the given work.
Go(work Work)

// Close cancels all workers and waits for them to finish.
Close()

// Size returns the number of workers in the pool.
Size() int
}

type workerPool struct {
sync.Once
workCh chan Work
cancel context.CancelFunc
}

func NewWorkerPool(workers uint) WorkerPool {
return &workerPool{
workCh: make(chan Work, workers),
}
}

func (p *workerPool) Init() {
p.Do(func() {
ctx, cancel := context.WithCancel(context.Background())
p.cancel = cancel

for i := 0; i < p.Size(); i++ {
go func() {
for {
select {
case <-ctx.Done():
// TODO: exhaust workCh before exit
return
case work := <-p.workCh:
work()
}
}
}()
}
})
}

func (p *workerPool) Go(work Work) {
p.Init()
p.workCh <- work
}

func (p *workerPool) Close() {
p.cancel()
}

func (p *workerPool) Size() int {
return cap(p.workCh)
}
35 changes: 35 additions & 0 deletions pkg/pool/worker_pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package pool

import (
"sync"
"testing"

"github.com/stretchr/testify/require"
)

func TestGo(t *testing.T) {
var expectedWorksDone uint32
var workerPoolSize uint
var mu sync.Mutex
workerPoolSize = 5
p := NewWorkerPool(workerPoolSize)
p.Init()
defer p.Close()

var wg sync.WaitGroup
for i := 0; i < int(workerPoolSize*3); i++ {
wg.Add(1)
p.Go(func() {
mu.Lock()
defer mu.Unlock()
expectedWorksDone++
wg.Done()
})
}
wg.Wait()
require.Equal(t, uint32(workerPoolSize*3), expectedWorksDone)
require.Equal(t, int(workerPoolSize), p.Size())
}
127 changes: 31 additions & 96 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/thanos-io/thanos/pkg/logging"

extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/pool"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/server/http/middleware"
"github.com/thanos-io/thanos/pkg/store/labelpb"
Expand Down Expand Up @@ -144,6 +145,7 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
if workers == 0 {
workers = 1
}
level.Info(logger).Log("msg", "Starting receive handler with async forward workers", "workers", workers)

h := &Handler{
logger: logger,
Expand Down Expand Up @@ -1266,90 +1268,23 @@ func newReplicationErrors(threshold, numErrors int) []*replicationErrors {
return errs
}

func (pw *peerWorker) initWorkers() {
pw.initWorkersOnce.Do(func() {
work := make(chan peerWorkItem)
pw.work = work

ctx, cancel := context.WithCancel(context.Background())
pw.turnOffGoroutines = cancel

for i := 0; i < int(pw.asyncWorkerCount); i++ {
go func() {
for {
select {
case <-ctx.Done():
return
case w := <-work:
pw.forwardDelay.Observe(time.Since(w.sendTime).Seconds())

tracing.DoInSpan(w.workItemCtx, "receive_forward", func(ctx context.Context) {
_, err := storepb.NewWriteableStoreClient(pw.cc).RemoteWrite(ctx, w.req)
w.workResult <- peerWorkResponse{
er: w.er,
err: errors.Wrapf(err, "forwarding request to endpoint %v", w.er.endpoint),
}
if err != nil {
sp := trace.SpanFromContext(ctx)
sp.SetAttributes(attribute.Bool("error", true))
sp.SetAttributes(attribute.String("error.msg", err.Error()))
}
close(w.workResult)
}, opentracing.Tags{
"endpoint": w.er.endpoint,
"replica": w.er.replica,
})

}
}
}()
}

})
}

func newPeerWorker(cc *grpc.ClientConn, forwardDelay prometheus.Histogram, asyncWorkerCount uint) *peerWorker {
return &peerWorker{
cc: cc,
asyncWorkerCount: asyncWorkerCount,
forwardDelay: forwardDelay,
cc: cc,
wp: pool.NewWorkerPool(asyncWorkerCount),
forwardDelay: forwardDelay,
}
}

type peerWorkItem struct {
cc *grpc.ClientConn
req *storepb.WriteRequest
workItemCtx context.Context

workResult chan peerWorkResponse
er endpointReplica
sendTime time.Time
}

func (pw *peerWorker) RemoteWrite(ctx context.Context, in *storepb.WriteRequest, opts ...grpc.CallOption) (*storepb.WriteResponse, error) {
pw.initWorkers()

w := peerWorkItem{
cc: pw.cc,
req: in,
workResult: make(chan peerWorkResponse, 1),
workItemCtx: ctx,
sendTime: time.Now(),
}

pw.work <- w
return nil, (<-w.workResult).err
return storepb.NewWriteableStoreClient(pw.cc).RemoteWrite(ctx, in)
}

type peerWorker struct {
cc *grpc.ClientConn
wp pool.WorkerPool

work chan peerWorkItem
turnOffGoroutines func()

initWorkersOnce sync.Once
asyncWorkerCount uint
forwardDelay prometheus.Histogram
forwardDelay prometheus.Histogram
}

func newPeerGroup(backoff backoff.Backoff, forwardDelay prometheus.Histogram, asyncForwardWorkersCount uint, dialOpts ...grpc.DialOption) peersContainer {
Expand All @@ -1373,29 +1308,29 @@ type peersContainer interface {
reset()
}

type peerWorkResponse struct {
er endpointReplica
err error
}

func (p *peerWorker) RemoteWriteAsync(ctx context.Context, req *storepb.WriteRequest, er endpointReplica, seriesIDs []int, responseWriter chan writeResponse, cb func(error)) {
p.initWorkers()

w := peerWorkItem{
cc: p.cc,
req: req,
workResult: make(chan peerWorkResponse, 1),
workItemCtx: ctx,
er: er,

sendTime: time.Now(),
}

p.work <- w
res := <-w.workResult

responseWriter <- newWriteResponse(seriesIDs, res.err, er)
cb(res.err)
now := time.Now()
p.wp.Go(func() {
p.forwardDelay.Observe(time.Since(now).Seconds())

tracing.DoInSpan(ctx, "receive_forward", func(ctx context.Context) {
_, err := storepb.NewWriteableStoreClient(p.cc).RemoteWrite(ctx, req)
responseWriter <- newWriteResponse(
seriesIDs,
errors.Wrapf(err, "forwarding request to endpoint %v", er.endpoint),
er,
)
if err != nil {
sp := trace.SpanFromContext(ctx)
sp.SetAttributes(attribute.Bool("error", true))
sp.SetAttributes(attribute.String("error.msg", err.Error()))
}
cb(err)
}, opentracing.Tags{
"endpoint": er.endpoint,
"replica": er.replica,
})
})
}

type peerGroup struct {
Expand Down Expand Up @@ -1423,7 +1358,7 @@ func (p *peerGroup) close(addr string) error {
return nil
}

p.connections[addr].turnOffGoroutines()
p.connections[addr].wp.Close()
delete(p.connections, addr)
if err := c.cc.Close(); err != nil {
return fmt.Errorf("closing connection for %s", addr)
Expand Down

0 comments on commit 1663138

Please sign in to comment.