prefetch
istreeter committed Nov 17, 2024
1 parent 0827fb0 commit f254773
Showing 1 changed file with 22 additions and 11 deletions.
@@ -121,6 +121,7 @@ object Processing {
     val badProcessor = BadRowProcessor(env.appInfo.name, env.appInfo.version)

     in.through(setLatency(env.metrics))
+      .prefetchN(env.cpuParallelism)
       .through(parseAndTransform(env, badProcessor))
       .through(BatchUp.withTimeout(env.batching.maxBytes, env.batching.maxDelay))
       .through(writeToSnowflake(env, badProcessor))
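
Note on the added line: fs2's `prefetchN` inserts a bounded buffer, so the source (plus the latency metric) can run up to `env.cpuParallelism` chunks ahead while the parse/transform and Snowflake-write stages downstream are still busy. A minimal sketch of the operator in isolation, assuming fs2 3.x and cats-effect 3 (the object name, sleeps and buffer size are illustrative, not from this repository):

  import cats.effect.{IO, IOApp}
  import fs2.Stream
  import scala.concurrent.duration._

  object PrefetchSketch extends IOApp.Simple {
    def run: IO[Unit] =
      Stream
        .emits(1 to 10)
        .covary[IO]
        .evalTap(i => IO.sleep(50.millis) *> IO.println(s"produced $i"))
        .prefetchN(4) // keep up to 4 chunks buffered so production overlaps consumption
        .evalMap(i => IO.sleep(200.millis) *> IO.println(s"consumed $i"))
        .compile
        .drain
  }

Without `prefetchN(4)` the two sleeps run strictly in sequence; with it the producer keeps working ahead of the slower consumer until the buffer is full.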
@@ -363,24 +364,34 @@ object Processing {
       env.metrics.addGood(batch.origBatchCount - countBad) *> env.metrics.addBad(countBad)
     }

-  private implicit def batchableTokens: BatchUp.Batchable[BatchAfterTransform, Vector[Unique.Token]] =
-    new BatchUp.Batchable[BatchAfterTransform, Vector[Unique.Token]] {
-      def combine(b: Vector[Unique.Token], a: BatchAfterTransform): Vector[Unique.Token] =
-        b ++ a.tokens
+  /**
+   * Batches up checkpointing tokens, so we checkpoint every 10 seconds, instead of once per batch
+   *
+   * TODO: This should move to common-streams because it will be helpful for other loaders. The
+   * delay is helpful for Kinesis and Kafka, but not for PubSub.
+   */
+  private def emitTokens[F[_]: Async]: Pipe[F, BatchAfterTransform, Unique.Token] = {
+    implicit val batchableTokens: BatchUp.Batchable[BatchAfterTransform, Vector[Unique.Token]] =
+      new BatchUp.Batchable[BatchAfterTransform, Vector[Unique.Token]] {
+        def combine(b: Vector[Unique.Token], a: BatchAfterTransform): Vector[Unique.Token] =
+          b ++ a.tokens

-      def single(a: BatchAfterTransform): Vector[Unique.Token] =
-        a.tokens
+        def single(a: BatchAfterTransform): Vector[Unique.Token] =
+          a.tokens

-      def weightOf(a: BatchAfterTransform): Long =
-        0L
-    }
+        def weightOf(a: BatchAfterTransform): Long =
+          0L
+      }

+    // This will become configurable when we migrate it to common-streams
+    val checkpointDelay = 10.seconds
+
-  private def emitTokens[F[_]: Async]: Pipe[F, BatchAfterTransform, Unique.Token] =
-    BatchUp.withTimeout[F, BatchAfterTransform, Vector[Unique.Token]](Long.MaxValue, 10.seconds).andThen {
+    BatchUp.withTimeout[F, BatchAfterTransform, Vector[Unique.Token]](Long.MaxValue, checkpointDelay).andThen {
       _.flatMap { tokens =>
         Stream.emits(tokens)
       }
     }
+  }

   private def fastGetByIndex[A](items: IndexedSeq[A], index: Long): A = items(index.toInt)
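
Note on the reworked `emitTokens`: the `Batchable` instance gives every batch a weight of 0 and the cap is `Long.MaxValue`, so `BatchUp.withTimeout` never flushes on size; it simply accumulates checkpoint tokens and emits them every `checkpointDelay`, turning per-batch checkpointing into one checkpoint roughly every 10 seconds. A rough sketch of the same idea using plain fs2's `groupWithin` in place of common-streams' `BatchUp` (the `Token` type, `checkpoint` function and the 4096 cap are hypothetical stand-ins):

  import cats.effect.IO
  import fs2.{Chunk, Pipe}
  import scala.concurrent.duration._

  object CheckpointSketch {
    final case class Token(value: String)

    def checkpoint(tokens: Chunk[Token]): IO[Unit] =
      IO.println(s"checkpointing ${tokens.size} tokens at once")

    // groupWithin emits whatever has accumulated after 10 seconds (or after 4096
    // tokens, whichever comes first), so checkpoints are amortised across many
    // upstream batches instead of happening once per batch.
    val emitTokensSketch: Pipe[IO, Token, Unit] =
      _.groupWithin(4096, 10.seconds).evalMap(checkpoint)
  }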

