feat: Update concurrency formula. (#1907)
## The Problem

There's a "bug" in the scheduler's concurrency formula, and it affects
all strategies.

This is the formula:

```go
	tableConcurrency := max(s.concurrency/minResourceConcurrency, minTableConcurrency)
	resourceConcurrency := tableConcurrency * minResourceConcurrency
```

But these values are hardcoded:
```go
minResourceConcurrency := 100
minTableConcurrency := 1
```

So, substituting those values, the formula becomes:
```go
	tableConcurrency := max(s.concurrency/100, 1)
	resourceConcurrency := tableConcurrency * 100
```

This means that any plugin whose default concurrency is `<= 100` will
have a table concurrency of `1`, even if it's the only table being
synced (assuming no one changes the default).
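To make the effect concrete, here is a minimal sketch of the old formula as a standalone function. `oldFormula` is a hypothetical helper name, plain `int` is assumed for the concurrency type, and the built-in `max` needs Go 1.21 or later.

```go
package main

import "fmt"

// Constants as quoted in the description above.
const (
	minResourceConcurrency = 100
	minTableConcurrency    = 1
)

// oldFormula is a hypothetical helper that mirrors the two lines of the
// original formula. It is an illustration, not the scheduler's actual API.
func oldFormula(concurrency int) (tableConcurrency, resourceConcurrency int) {
	tableConcurrency = max(concurrency/minResourceConcurrency, minTableConcurrency)
	resourceConcurrency = tableConcurrency * minResourceConcurrency
	return tableConcurrency, resourceConcurrency
}

func main() {
	// Integer division means every input below 200 yields a table concurrency of 1.
	for _, c := range []int{10, 50, 100, 199, 200, 1000} {
		t, r := oldFormula(c)
		fmt.Printf("concurrency=%d table=%d resource=%d\n", c, t, r)
	}
}
```

Because of the integer division, the table concurrency only rises above `1` once the configured concurrency reaches `200`.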

## The Fix

I made a small change to the formula: only when concurrency is `<=
100`, `minResourceConcurrency` is replaced with `concurrency/10` (with
a floor of `1`). This decreases the resource concurrency by up to 10x
(a limit we don't seem to be hitting anyway) and increases the table
concurrency by up to 10x.
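The following sketch compares the old and new formulas side by side for a few inputs. It mirrors the logic in this commit's diff, but `oldFormula` and `newFormula` are hypothetical helper names, plain `int` is assumed for the concurrency type, and the built-in `max` needs Go 1.21 or later.

```go
package main

import "fmt"

const (
	minResourceConcurrency = 100
	minTableConcurrency    = 1
)

// oldFormula is the formula before this change (hypothetical helper).
func oldFormula(concurrency int) (table, resource int) {
	table = max(concurrency/minResourceConcurrency, minTableConcurrency)
	return table, table * minResourceConcurrency
}

// newFormula is the formula after this change (hypothetical helper).
// For concurrency <= 100 the divisor shrinks to concurrency/10, floored at 1.
func newFormula(concurrency int) (table, resource int) {
	actualMinResourceConcurrency := minResourceConcurrency
	if concurrency <= minResourceConcurrency {
		actualMinResourceConcurrency = max(concurrency/10, 1)
	}
	table = max(concurrency/actualMinResourceConcurrency, minTableConcurrency)
	return table, table * actualMinResourceConcurrency
}

func main() {
	for _, c := range []int{10, 50, 100, 101, 1000} {
		ot, or := oldFormula(c)
		nt, nr := newFormula(c)
		fmt.Printf("concurrency=%d old=(table %d, resource %d) new=(table %d, resource %d)\n",
			c, ot, or, nt, nr)
	}
}
```

For an input of `100`, the table concurrency goes from `1` to `10` while the resource concurrency stays at `100`. For an input of `10`, the table concurrency is also `10` and the resource concurrency drops from `100` to `10`. Inputs above `100` are unchanged.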

## Plugins affected (on default concurrency)

- bamboo-hr
- bigquery
- ~clickhouse~ (doesn't use scheduler)
- confluence
- crowddev
- ~file~ (doesn't use scheduler)
- leanix
- oracledb
- ~s3~ (doesn't use scheduler)
- sentinelone
- servicenow
- shopify
- sonarqube
- statuspage

## The Results

I'm still working on the results (it's trickier than it seems).

So far, they are very encouraging:

### BigQuery

**Before**

```
$ cli sync bigquery_to_postgresql.yaml
Loading spec(s) from bigquery_to_postgresql.yaml
Starting sync for: bigquery (cloudquery/bigquery@v1.7.0) -> [postgresql (cloudquery/postgresql@v8.6.0)]
Sync completed successfully. Resources: 26139, Errors: 0, Warnings: 0, Time: 2m4s
```

**After**

```
$ cli sync bigquery_to_postgresql.yaml
Loading spec(s) from bigquery_to_postgresql.yaml
Starting sync for: bigquery (cloudquery/bigquery@v1.7.0) -> [postgresql (cloudquery/postgresql@v8.6.0)]
Sync completed successfully. Resources: 26139, Errors: 0, Warnings: 0, Time: 1m27s
```

**Result**

1.43x the original speed (43% faster)

### SentinelOne

**Before**

```
$ cli sync . 
Loading spec(s) from .
Starting sync for: sentinelone (grpc@localhost:7777) -> [postgresql (cloudquery/postgresql@v8.5.5)]
Sync completed successfully. Resources: 1231, Errors: 0, Warnings: 0, Time: 1m4s
```

**After**

```
$ cli sync .
Loading spec(s) from .
Starting sync for: sentinelone (grpc@localhost:7777) -> [postgresql (cloudquery/postgresql@v8.5.5)]
Sync completed successfully. Resources: 1231, Errors: 0, Warnings: 0, Time: 15s
```

**Result**

4.27x the original speed (327% faster)

### SonarQube


**Before**

```
$ cli sync sonarqube_to_postgresql.yaml
Loading spec(s) from sonarqube_to_postgresql.yaml
Starting sync for: sonarqube (grpc@localhost:7777) -> [postgresql (cloudquery/postgresql@v8.6.0)]
Sync completed successfully. Resources: 4594, Errors: 0, Warnings: 0, Time: 39s
```

**After**

```
$ cli sync sonarqube_to_postgresql.yaml
Loading spec(s) from sonarqube_to_postgresql.yaml
Starting sync for: sonarqube (grpc@localhost:7777) -> [postgresql (cloudquery/postgresql@v8.6.0)]
Sync completed successfully. Resources: 4594, Errors: 0, Warnings: 0, Time: 22s
```

**Result**

1.77x the original speed (77% faster)
marianogappa authored Sep 30, 2024
1 parent bea3b00 commit adce99c
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions scheduler/scheduler.go
@@ -159,10 +159,16 @@ func NewScheduler(opts ...Option) *Scheduler {
 	for _, opt := range opts {
 		opt(&s)
 	}
+
+	actualMinResourceConcurrency := minResourceConcurrency
+	if s.concurrency <= minResourceConcurrency {
+		actualMinResourceConcurrency = max(s.concurrency/10, 1)
+	}
+
 	// This is very similar to the concurrent web crawler problem with some minor changes.
 	// We are using DFS/Round-Robin to make sure memory usage is capped at O(h) where h is the height of the tree.
-	tableConcurrency := max(s.concurrency/minResourceConcurrency, minTableConcurrency)
-	resourceConcurrency := tableConcurrency * minResourceConcurrency
+	tableConcurrency := max(s.concurrency/actualMinResourceConcurrency, minTableConcurrency)
+	resourceConcurrency := tableConcurrency * actualMinResourceConcurrency
 	s.tableSems = make([]*semaphore.Weighted, s.maxDepth)
 	for i := uint64(0); i < s.maxDepth; i++ {
 		s.tableSems[i] = semaphore.NewWeighted(int64(tableConcurrency))
