Skip to content

Commit

Permalink
feat: Implement RandomQueue scheduler strategy (#1914)
Browse files Browse the repository at this point in the history
This PR implements a new Scheduler Strategy based on a _Concurrent
Random Queue_. It is based on @erezrokah 's Priority Queue Scheduler
Strategy.

## How does it work

This is hopefully a much simpler scheduling strategy. It doesn't have
any semaphores; it just uses the existing concurrency setting.

Table resolvers (and their relations) get `Push`ed into a work queue,
and `concurrency` workers `Pull` from this queue, but they pull a random
element from it.

## Why it should work better

**The key benefit of this strategy is this:**
- Assumption 1: most slow syncs are actually slow because of rate
limits, not because of I/O limits or too much data.
- Assumption 2: the meaty part of the sync is syncing relations, because
each child table has a resolver per parent.
- Benefit: because every worker pulls a uniformly random element from the
queue, the child resolvers of any given table are picked up at random
points of the sync rather than back to back, so relation API calls are
spread evenly throughout the sync, which helps minimise rate limiting!

## Does it work better?

Still working on results. Notably AWS & Azure yield mixed results; still
have to look into why.

### GCP

**Before**

```
$ cli sync .
Loading spec(s) from .
Starting sync for: gcp (grpc@localhost:7777) -> [postgresql (cloudquery/postgresql@v8.0.7)]
Sync completed successfully. Resources: 25799, Errors: 0, Warnings: 0, Time: 2m23s
```

UPDATE: GCP is moving to the Round Robin strategy, and it is much faster
with Round Robin:

```
$ cli sync .
Loading spec(s) from .
Starting sync for: gcp (grpc@localhost:7777) -> [postgresql (cloudquery/postgresql@v8.0.7)]
Sync completed successfully. Resources: 26355, Errors: 0, Warnings: 0, Time: 40s
```

**After**

```
$ cli sync .
Loading spec(s) from .
Starting sync for: gcp (grpc@localhost:7777) -> [postgresql (cloudquery/postgresql@v8.0.7)]
Sync completed successfully. Resources: 26186, Errors: 0, Warnings: 0, Time: 34s
```

**Result: 76.22% reduction in time (143s → 34s), or 4.21× the original speed (3.21 times faster).**
**Result against Round Robin: 15% reduction in time (40s → 34s), or 1.18×
the Round Robin speed (0.18 times faster; probably within margin of error)**

### BigQuery

**Before**

```
$ cli sync bigquery_to_postgresql.yaml
Loading spec(s) from bigquery_to_postgresql.yaml
Starting sync for: bigquery (cloudquery/bigquery@v1.7.0) -> [postgresql (cloudquery/postgresql@v8.6.0)]
Sync completed successfully. Resources: 26139, Errors: 0, Warnings: 0, Time: 2m7s
```

**After**

```
$ cli sync bigquery_to_postgresql.yaml
Loading spec(s) from bigquery_to_postgresql.yaml
Starting sync for: bigquery (cloudquery/bigquery@v1.7.0) -> [postgresql (cloudquery/postgresql@v8.6.0)]
Sync completed successfully. Resources: 26139, Errors: 0, Warnings: 0, Time: 1m26s
```

**Result: 32.28% reduction in time (127s → 86s), or 1.48× the original speed (0.48 times faster)**

### SentinelOne

**Before** (it was already quite fast due to previous merged
improvement)

```
$ cli sync .
Loading spec(s) from .
Starting sync for: sentinelone (grpc@localhost:7777) -> [postgresql (cloudquery/postgresql@v8.5.5)]
Sync completed successfully. Resources: 1295, Errors: 0, Warnings: 0, Time: 15s
```

**After**

```
$ cli sync .
Loading spec(s) from .
Starting sync for: sentinelone (grpc@localhost:7777) -> [postgresql (cloudquery/postgresql@v8.5.5)]
Sync completed successfully. Resources: 1295, Errors: 0, Warnings: 0, Time: 8s
```

**Result: 46.67% reduction in time (15s → 8s), or 1.875× the original speed (0.875 times faster)**

## How to test

- Add a `go.mod` replace for sdk: `replace
github.com/cloudquery/plugin-sdk/v4 =>
github.com/cloudquery/plugin-sdk/v4
v4.63.1-0.20241002131015-243705c940c6` (check last commit on this PR)
- Run source plugin via grpc locally; make sure to configure the
scheduler strategy to `scheduler.StrategyRandomQueue`.

## How scary is it to merge

- This scheduler strategy is not used by any plugins by default, so in
principle this should be safe to merge.

---------

Co-authored-by: erezrokah <erezrokah@users.noreply.github.com>
  • Loading branch information
marianogappa and erezrokah authored Oct 3, 2024
1 parent 38b4bfd commit af8ac87
Show file tree
Hide file tree
Showing 15 changed files with 681 additions and 96 deletions.
20 changes: 12 additions & 8 deletions scheduler/metrics.go → scheduler/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package scheduler
package metrics

import (
"context"
Expand All @@ -12,6 +12,10 @@ import (
"go.opentelemetry.io/otel/metric"
)

const (
OtelName = "io.cloudquery"
)

// Metrics is deprecated as we move toward open telemetry for tracing and metrics
type Metrics struct {
TableClient map[string]map[string]*TableClientMetrics
Expand Down Expand Up @@ -82,39 +86,39 @@ func (s *Metrics) Equal(other *Metrics) bool {
}

func getOtelMeters(tableName string, clientID string) *OtelMeters {
resources, err := otel.Meter(otelName).Int64Counter("sync.table.resources",
resources, err := otel.Meter(OtelName).Int64Counter("sync.table.resources",
metric.WithDescription("Number of resources synced for a table"),
metric.WithUnit("/{tot}"),
)
if err != nil {
return nil
}

errors, err := otel.Meter(otelName).Int64Counter("sync.table.errors",
errors, err := otel.Meter(OtelName).Int64Counter("sync.table.errors",
metric.WithDescription("Number of errors encountered while syncing a table"),
metric.WithUnit("/{tot}"),
)
if err != nil {
return nil
}

panics, err := otel.Meter(otelName).Int64Counter("sync.table.panics",
panics, err := otel.Meter(OtelName).Int64Counter("sync.table.panics",
metric.WithDescription("Number of panics encountered while syncing a table"),
metric.WithUnit("/{tot}"),
)
if err != nil {
return nil
}

startTime, err := otel.Meter(otelName).Int64Counter("sync.table.start_time",
startTime, err := otel.Meter(OtelName).Int64Counter("sync.table.start_time",
metric.WithDescription("Start time of syncing a table"),
metric.WithUnit("ns"),
)
if err != nil {
return nil
}

endTime, err := otel.Meter(otelName).Int64Counter("sync.table.end_time",
endTime, err := otel.Meter(OtelName).Int64Counter("sync.table.end_time",
metric.WithDescription("End time of syncing a table"),
metric.WithUnit("ns"),
)
Expand All @@ -136,7 +140,7 @@ func getOtelMeters(tableName string, clientID string) *OtelMeters {
}
}

func (s *Metrics) initWithClients(table *schema.Table, clients []schema.ClientMeta) {
func (s *Metrics) InitWithClients(table *schema.Table, clients []schema.ClientMeta) {
s.TableClient[table.Name] = make(map[string]*TableClientMetrics, len(clients))
for _, client := range clients {
tableName := table.Name
Expand All @@ -146,7 +150,7 @@ func (s *Metrics) initWithClients(table *schema.Table, clients []schema.ClientMe
}
}
for _, relation := range table.Relations {
s.initWithClients(relation, clients)
s.InitWithClients(relation, clients)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package scheduler
package metrics

import "testing"

Expand Down
67 changes: 67 additions & 0 deletions scheduler/queue/active_work_signal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package queue

import (
"sync"
"sync/atomic"
)

// activeWorkSignal is a thread-safe coordinator for awaiting a worker pool
// that relies on a queue that might be temporarily empty.
//
// If queue is empty and workers idle, done!
//
// If the queue is empty but a worker is working on a task, we must wait and check
// if it's empty after that worker finishes. That's why we need this.
//
// Use it like this:
//
//   - When a worker picks up a task, call `Add()` (like a WaitGroup)
//   - When a worker finishes a task, call `Done()` (like a WaitGroup)
//
//   - If the queue is empty, check `IsIdle()` to check if no workers are active.
//   - If workers are still active, call `Wait()` to block until state changes.
//
// Every counter change and the check performed by `Wait()` happen while holding
// the condition variable's lock, so a `Done()` can never slip in between the
// check and the moment the waiter is parked (which would lose the wake-up).
type activeWorkSignal struct {
	// countChangeSignal is signalled on every Add/Done; its lock guards counter updates.
	countChangeSignal *sync.Cond
	// activeWorkUnitCount is the number of work units currently being processed.
	activeWorkUnitCount *atomic.Int32
	// isStarted becomes true once the first work unit has been picked up.
	isStarted *atomic.Bool
}

// newActiveWorkSignal returns a signal with no active work that has not started yet.
func newActiveWorkSignal() *activeWorkSignal {
	return &activeWorkSignal{
		countChangeSignal:   sync.NewCond(&sync.Mutex{}),
		activeWorkUnitCount: &atomic.Int32{},
		isStarted:           &atomic.Bool{},
	}
}

// Add means a worker has started working on a task.
//
// Wake up the work queuing goroutine.
func (s *activeWorkSignal) Add() {
	s.countChangeSignal.L.Lock()
	defer s.countChangeSignal.L.Unlock()

	// Increment before flagging the start so IsIdle never observes
	// "started" together with a zero count during the very first Add.
	s.activeWorkUnitCount.Add(1)
	s.isStarted.Store(true)
	s.countChangeSignal.Signal()
}

// Done means a worker has finished working on a task.
//
// Wake up the work queuing goroutine (the sync might have finished).
func (s *activeWorkSignal) Done() {
	s.countChangeSignal.L.Lock()
	defer s.countChangeSignal.L.Unlock()

	s.activeWorkUnitCount.Add(-1)
	s.countChangeSignal.Signal()
}

// IsIdle returns true if work has started and no workers are active.
// If queue is empty and workers idle, done!
func (s *activeWorkSignal) IsIdle() bool {
	return s.isStarted.Load() && s.activeWorkUnitCount.Load() <= 0
}

// Wait blocks until the count of active workers changes. It returns
// immediately when no worker is active.
func (s *activeWorkSignal) Wait() {
	s.countChangeSignal.L.Lock()
	defer s.countChangeSignal.L.Unlock()

	// Checked under the lock: Add/Done also hold it while changing the count,
	// so the count cannot change between this check and parking in Wait.
	if s.activeWorkUnitCount.Load() <= 0 {
		return
	}
	s.countChangeSignal.Wait()
}
41 changes: 41 additions & 0 deletions scheduler/queue/concurrent_random_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package queue

import (
"math/rand"
"sync"
)

// ConcurrentRandomQueue is a generic, thread-safe queue
// that pops random elements in O(1) time.
type ConcurrentRandomQueue[T any] struct {
	mu     sync.Mutex // guards queue and random
	queue  []T
	random *rand.Rand
}

// NewConcurrentRandomQueue returns an empty queue whose pop order is driven by
// a pseudo-random generator seeded with seed. capacityHint pre-sizes the
// backing storage; the queue still grows beyond it when needed.
func NewConcurrentRandomQueue[T any](seed int64, capacityHint int) *ConcurrentRandomQueue[T] {
	return &ConcurrentRandomQueue[T]{
		queue:  make([]T, 0, capacityHint),
		random: rand.New(rand.NewSource(seed)),
	}
}

// Push adds item to the queue.
func (q *ConcurrentRandomQueue[T]) Push(item T) {
	q.mu.Lock()
	defer q.mu.Unlock()

	q.queue = append(q.queue, item)
}

// Pop removes a uniformly random element from the queue and returns a pointer
// to a copy of it, or nil when the queue is empty.
func (q *ConcurrentRandomQueue[T]) Pop() *T {
	q.mu.Lock()
	defer q.mu.Unlock()

	n := len(q.queue)
	if n == 0 {
		return nil
	}

	idx := q.random.Intn(n)
	lastIdx := n - 1

	// Move the last element into the chosen slot, then shrink by one: O(1) removal.
	item := q.queue[idx]
	q.queue[idx] = q.queue[lastIdx]

	// Clear the vacated slot so the backing array does not keep the popped
	// element (and whatever it points to) reachable after it left the queue.
	var zero T
	q.queue[lastIdx] = zero
	q.queue = q.queue[:lastIdx]

	return &item
}
138 changes: 138 additions & 0 deletions scheduler/queue/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package queue

import (
"context"

"github.com/cloudquery/plugin-sdk/v4/caser"
"github.com/cloudquery/plugin-sdk/v4/scheduler/metrics"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/google/uuid"
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"
)

// DefaultWorkerCount is the number of workers a Scheduler starts when no
// WithWorkerCount option is supplied.
const DefaultWorkerCount = 1000

// WorkUnit is an atomic unit of work that the scheduler syncs.
//
// It is one table resolver (same as all other scheduler strategies).
//
// But if it is a non-top-level table, it is bound to a single parent resource.
type WorkUnit struct {
// Table is the table this unit of work belongs to.
Table *schema.Table
// Client is the client metadata paired with Table for this unit.
Client schema.ClientMeta
// Parent is the parent resource a non-top-level table is bound to;
// presumably nil for top-level tables (TODO confirm against the worker).
Parent *schema.Resource
}

// Scheduler syncs work units by pushing them into a ConcurrentRandomQueue and
// letting a pool of workers process randomly popped units.
type Scheduler struct {
// workerCount is the number of workers started by Sync.
workerCount int
// logger, caser, deterministicCQID, metrics and invocationID are handed
// to every worker created by Sync.
logger zerolog.Logger
caser *caser.Caser
deterministicCQID bool
metrics *metrics.Metrics
invocationID string
// seed seeds the random queue created by Sync.
seed int64
}

// Option customises a Scheduler; NewShuffleQueueScheduler applies the given
// options in order after setting its defaults.
type Option func(*Scheduler)

// WithWorkerCount returns an Option that sets how many workers the Scheduler starts.
func WithWorkerCount(workerCount int) Option {
	apply := func(s *Scheduler) {
		s.workerCount = workerCount
	}
	return apply
}

// WithCaser returns an Option that replaces the Scheduler's caser with c.
func WithCaser(c *caser.Caser) Option {
	apply := func(s *Scheduler) {
		s.caser = c
	}
	return apply
}

// WithDeterministicCQID returns an Option that sets the Scheduler's
// deterministicCQID flag.
func WithDeterministicCQID(deterministicCQID bool) Option {
	apply := func(s *Scheduler) {
		s.deterministicCQID = deterministicCQID
	}
	return apply
}

// WithInvocationID returns an Option that overrides the Scheduler's
// invocation ID (a fresh UUID by default).
func WithInvocationID(invocationID string) Option {
	apply := func(s *Scheduler) {
		s.invocationID = invocationID
	}
	return apply
}

// NewShuffleQueueScheduler builds a Scheduler that uses logger, m and seed,
// starts from the defaults (DefaultWorkerCount workers, a new caser, a freshly
// generated invocation ID) and then applies opts in order.
func NewShuffleQueueScheduler(logger zerolog.Logger, m *metrics.Metrics, seed int64, opts ...Option) *Scheduler {
	s := new(Scheduler)
	s.logger = logger
	s.metrics = m
	s.workerCount = DefaultWorkerCount
	s.caser = caser.New()
	s.invocationID = uuid.New().String()
	s.seed = seed

	// Options run last so they can override any of the defaults above.
	for i := range opts {
		opts[i](s)
	}
	return s
}

// Sync processes tableClients with a pool of d.workerCount workers and returns
// once all workers have returned. It is a no-op for an empty tableClients.
//
// The work units are pushed into a seeded ConcurrentRandomQueue. A distributor
// goroutine pops random units and hands them to the workers over an unbuffered
// channel; the queue is also passed to every worker (presumably so workers can
// push work units for relations — confirm against the worker implementation).
// The distributor stops, closing the channel, when ctx is cancelled or when the
// queue is empty and no worker is active.
func (d *Scheduler) Sync(ctx context.Context, tableClients []WorkUnit, resolvedResources chan<- *schema.Resource) {
	if len(tableClients) == 0 {
		return
	}
	queue := NewConcurrentRandomQueue[WorkUnit](d.seed, len(tableClients))
	for _, tc := range tableClients {
		queue.Push(tc)
	}

	jobs := make(chan *WorkUnit)
	// Named workSignal so the local does not shadow the activeWorkSignal type.
	workSignal := newActiveWorkSignal()

	// Worker pool
	workerPool, _ := errgroup.WithContext(ctx)
	for w := 0; w < d.workerCount; w++ {
		workerPool.Go(func() error {
			newWorker(
				jobs,
				queue,
				resolvedResources,
				d.logger,
				d.caser,
				d.invocationID,
				d.deterministicCQID,
				d.metrics,
			).work(ctx, workSignal)
			return nil
		})
	}

	// Work distribution
	go func() {
		defer close(jobs)
		for {
			select {
			case <-ctx.Done():
				return
			default:
			}

			item := queue.Pop()

			// There is work to do
			if item != nil {
				// The send must also watch ctx: once the sync is cancelled
				// there may be no worker left to receive, and a plain send
				// would block this goroutine forever.
				select {
				case jobs <- item:
				case <-ctx.Done():
					return
				}
				continue
			}

			// Queue is empty and no active work, done!
			if workSignal.IsIdle() {
				return
			}

			// Queue is empty and there is active work, wait for changes
			workSignal.Wait()
		}
	}()

	// Workers always return nil, so there is no error to report here.
	_ = workerPool.Wait()
}
Loading

0 comments on commit af8ac87

Please sign in to comment.