Skip to content

Commit

Permalink
Dynamic upload parallelism factor
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Nov 17, 2024
1 parent 4a3a929 commit b1f1019
Show file tree
Hide file tree
Showing 11 changed files with 70 additions and 42 deletions.
6 changes: 4 additions & 2 deletions config/config.azure.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@
# - Events are emitted to Snowflake for a maximum of this duration, even if the `maxBytes` size has not been reached
"maxDelay": "1 second"

# - How many batches can we send simultaneously over the network to Snowflake.
"uploadConcurrency": 1
# - Controls how many batches we can send simultaneously over the network to Snowflake.
# -- E.g. If there are 4 available processors, and uploadParallelismFactor = 2.5, then we send up to 10 batches in parallel
# -- Adjusting this value can cause the app to use more or less of the available CPU.
"uploadParallelismFactor": 2.5
}

# -- Controls how the app splits the workload into concurrent batches which can be run in parallel.
Expand Down
6 changes: 4 additions & 2 deletions config/config.kinesis.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@
# - Events are emitted to Snowflake for a maximum of this duration, even if the `maxBytes` size has not been reached
"maxDelay": "1 second"

# - How many batches can we send simultaneously over the network to Snowflake.
"uploadConcurrency": 1
# - Controls how many batches we can send simultaneously over the network to Snowflake.
# -- E.g. If there are 4 available processors, and uploadParallelismFactor = 2.5, then we send up to 10 batches in parallel
# -- Adjusting this value can cause the app to use more or less of the available CPU.
"uploadParallelismFactor": 2.5
}

# -- Controls how the app splits the workload into concurrent batches which can be run in parallel.
Expand Down
6 changes: 4 additions & 2 deletions config/config.pubsub.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@
# - Events are emitted to Snowflake for a maximum of this duration, even if the `maxBytes` size has not been reached
"maxDelay": "1 second"

# - How many batches can we send simultaneously over the network to Snowflake.
"uploadConcurrency": 1
# - Controls how many batches we can send simultaneously over the network to Snowflake.
# -- E.g. If there are 4 available processors, and uploadParallelismFactor = 2.5, then we send up to 10 batches in parallel
# -- Adjusting this value can cause the app to use more or less of the available CPU.
"uploadParallelismFactor": 2.5
}

# -- Controls how the app splits the workload into concurrent batches which can be run in parallel.
Expand Down
2 changes: 1 addition & 1 deletion modules/core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"batching": {
"maxBytes": 16000000
"maxDelay": "1 second"
"uploadConcurrency": 3
"uploadParallelismFactor": 2.5
}
"cpuParallelismFraction": 0.75

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ object Config {
case class Batching(
maxBytes: Long,
maxDelay: FiniteDuration,
uploadConcurrency: Int
uploadParallelismFactor: BigDecimal
)

case class Metrics(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,14 @@ object Environment {
badSink <- toSink(config.output.bad.sink).onError(_ => Resource.eval(appHealth.beUnhealthyForRuntimeService(RuntimeService.BadSink)))
metrics <- Resource.eval(Metrics.build(config.monitoring.metrics))
tableManager <- Resource.eval(TableManager.make(config.output.good, appHealth, config.retries))
channelProviders <- Vector.range(0, config.batching.uploadConcurrency).traverse { index =>
cpuParallelism = chooseCpuParallelism(config)
uploadParallelism = chooseUploadParallelism(config)
channelProviders <- Vector.range(0, uploadParallelism).traverse { index =>
for {
channelOpener <- Channel.opener(config.output.good, config.retries, appHealth, index)
channelProvider <- Channel.provider(channelOpener, config.retries, appHealth)
} yield channelProvider
}
cpuParallelism = chooseCpuParallelism(config)
} yield Environment(
appInfo = appInfo,
source = sourceAndAck,
Expand All @@ -95,4 +96,15 @@ object Environment {
(Runtime.getRuntime.availableProcessors * config.cpuParallelismFraction)
.setScale(0, BigDecimal.RoundingMode.UP)
.toInt

/**
 * See the description of `uploadParallelism` on the [[Environment]] class
 *
 * For bigger instances (more cores) we produce batches more quickly, and so need higher upload
 * parallelism so that uploading is not the bottleneck.
 *
 * The factor is multiplied by the number of available processors and rounded up, so the result
 * is always at least 1 for any positive factor.
 */
private def chooseUploadParallelism(config: Config[Any, Any]): Int = {
  val cores  = BigDecimal(Runtime.getRuntime.availableProcessors)
  val scaled = cores * config.batching.uploadParallelismFactor
  scaled.setScale(0, BigDecimal.RoundingMode.UP).toInt
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -363,24 +363,34 @@ object Processing {
env.metrics.addGood(batch.origBatchCount - countBad) *> env.metrics.addBad(countBad)
}

private implicit def batchableTokens: BatchUp.Batchable[BatchAfterTransform, Vector[Unique.Token]] =
new BatchUp.Batchable[BatchAfterTransform, Vector[Unique.Token]] {
def combine(b: Vector[Unique.Token], a: BatchAfterTransform): Vector[Unique.Token] =
b ++ a.tokens
/**
* Batches up checkpointing tokens, so we checkpoint every 10 seconds, instead of once per batch
*
* TODO: This should move to common-streams because it will be helpful for other loaders. The
* delay is helpful for Kinesis and Kafka, but not for PubSub.
*/
private def emitTokens[F[_]: Async]: Pipe[F, BatchAfterTransform, Unique.Token] = {
implicit val batchableTokens: BatchUp.Batchable[BatchAfterTransform, Vector[Unique.Token]] =
new BatchUp.Batchable[BatchAfterTransform, Vector[Unique.Token]] {
def combine(b: Vector[Unique.Token], a: BatchAfterTransform): Vector[Unique.Token] =
b ++ a.tokens

def single(a: BatchAfterTransform): Vector[Unique.Token] =
a.tokens
def single(a: BatchAfterTransform): Vector[Unique.Token] =
a.tokens

def weightOf(a: BatchAfterTransform): Long =
0L
}
def weightOf(a: BatchAfterTransform): Long =
0L
}

// This will become configurable when we migrate it to common-streams
val checkpointDelay = 10.seconds

private def emitTokens[F[_]: Async]: Pipe[F, BatchAfterTransform, Unique.Token] =
BatchUp.withTimeout[F, BatchAfterTransform, Vector[Unique.Token]](Long.MaxValue, 10.seconds).andThen {
BatchUp.withTimeout[F, BatchAfterTransform, Vector[Unique.Token]](Long.MaxValue, checkpointDelay).andThen {
_.flatMap { tokens =>
Stream.emits(tokens)
}
}
}

private def fastGetByIndex[A](items: IndexedSeq[A], index: Long): A = items(index.toInt)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ object MockEnvironment {
metrics = testMetrics(state),
appHealth = testAppHealth(state),
batching = Config.Batching(
maxBytes = 16000000,
maxDelay = 10.seconds,
uploadConcurrency = 1
maxBytes = 16000000,
maxDelay = 10.seconds,
uploadParallelismFactor = BigDecimal(1)
),
cpuParallelism = 2,
schemasToSkip = List.empty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ object KafkaConfigSpec {
)
),
batching = Config.Batching(
maxBytes = 16000000,
maxDelay = 1.second,
uploadConcurrency = 3
maxBytes = 16000000,
maxDelay = 1.second,
uploadParallelismFactor = BigDecimal(2.5)
),
cpuParallelismFraction = BigDecimal(0.75),
retries = Config.Retries(
Expand Down Expand Up @@ -186,9 +186,9 @@ object KafkaConfigSpec {
)
),
batching = Config.Batching(
maxBytes = 16000000,
maxDelay = 1.second,
uploadConcurrency = 1
maxBytes = 16000000,
maxDelay = 1.second,
uploadParallelismFactor = BigDecimal(2.5)
),
cpuParallelismFraction = BigDecimal(0.75),
retries = Config.Retries(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ object KinesisConfigSpec {
)
),
batching = Config.Batching(
maxBytes = 16000000,
maxDelay = 1.second,
uploadConcurrency = 3
maxBytes = 16000000,
maxDelay = 1.second,
uploadParallelismFactor = BigDecimal(2.5)
),
cpuParallelismFraction = BigDecimal(0.75),
retries = Config.Retries(
Expand Down Expand Up @@ -177,9 +177,9 @@ object KinesisConfigSpec {
)
),
batching = Config.Batching(
maxBytes = 16000000,
maxDelay = 1.second,
uploadConcurrency = 1
maxBytes = 16000000,
maxDelay = 1.second,
uploadParallelismFactor = BigDecimal(2.5)
),
cpuParallelismFraction = BigDecimal(0.75),
retries = Config.Retries(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ object PubsubConfigSpec {
)
),
batching = Config.Batching(
maxBytes = 16000000,
maxDelay = 1.second,
uploadConcurrency = 3
maxBytes = 16000000,
maxDelay = 1.second,
uploadParallelismFactor = BigDecimal(2.5)
),
cpuParallelismFraction = BigDecimal(0.75),
retries = Config.Retries(
Expand Down Expand Up @@ -176,9 +176,9 @@ object PubsubConfigSpec {
)
),
batching = Config.Batching(
maxBytes = 16000000,
maxDelay = 1.second,
uploadConcurrency = 1
maxBytes = 16000000,
maxDelay = 1.second,
uploadParallelismFactor = BigDecimal(2.5)
),
cpuParallelismFraction = BigDecimal(0.75),
retries = Config.Retries(
Expand Down

0 comments on commit b1f1019

Please sign in to comment.