Skip to content

Commit

Permalink
Added docs to OperationQueueSupport
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Sep 28, 2023
1 parent b55617f commit 38f8a79
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import scala.language.implicitConversions
* database.collection.op.upsert(docs: _*): IO[Unit]
*
* This will queue up to `opFlushSize` and then stream the batch in `opChunkSize` into the database. Very useful when
* you are needing to do various operations across potentially multiple collections efficiently.
* you need to perform various operations across potentially multiple collections efficiently. Make sure to call
* `flushQueue()` when finished so that no queued operations are left un-pushed.
*/
trait OperationQueueSupport {
protected def opFlushSize: Int = 10_000
Expand All @@ -28,7 +29,13 @@ trait OperationQueueSupport {
q.asInstanceOf[OperationsQueue[D, M]]
}

def flushQueue(): IO[Unit] = ops.values.map(_.flush()).toList.sequence.void
/**
 * Flushes every registered operations queue, applying all of their pending operations.
 */
def flushQueue(): IO[Unit] = ops.values.toList.map(_.op.flush()).sequence.void

def clear(): IO[Unit] = IO(ops.clear())
/**
 * Removes all registered operation queues, discarding any pending (un-flushed) operations they hold.
 */
def clearQueue(): IO[Unit] = IO {
  ops.clear()
}
}
32 changes: 25 additions & 7 deletions driver/src/main/scala/com/outr/arango/queue/OperationsQueue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,25 @@ import java.util.concurrent.atomic.AtomicInteger

case class OperationsQueue[D <: Document[D], M <: DocumentModel[D]](collection: DocumentCollection[D, M],
flushSize: Int,
chunkSize: Int) {
oq =>
chunkSize: Int) { oq =>
private var queues = List.empty[OpQueue]

/**
 * Provide queue operations on a collection. Call `flush()` at the end to make sure all batched data is pushed.
 *
 * Each queue is created lazily on first use and batches documents until flushed in `chunkSize` chunks.
 */
object op {
// Streams queued documents into the collection as inserts.
lazy val insert: OpQueue = OpQueue(stream => collection.stream.insert(stream, chunkSize).void)
// Streams queued documents into the collection as upserts (insert-or-update).
lazy val upsert: OpQueue = OpQueue(stream => collection.stream.upsert(stream, chunkSize).void)
// Streams queued documents' ids into the collection as deletes — only `_id` is used.
lazy val delete: OpQueue = OpQueue(stream => collection.stream.delete(stream.map(_._id), chunkSize).void)
}

def flush(): IO[Unit] = queues.map(_.flush()).sequence.void
/**
 * Flushes this collection's operation queues.
 *
 * @param fullFlush when true, all pending operations are applied; when false, flushing only occurs until
 *                  the operation count is below the flushSize threshold.
 */
def flush(fullFlush: Boolean = true): IO[Unit] = {
  val pending = queues.map(_.flush(fullFlush))
  pending.sequence.void
}
}

case class OpQueue(process: fs2.Stream[IO, D] => IO[Unit]) {
oq.synchronized {
Expand All @@ -46,18 +54,28 @@ case class OperationsQueue[D <: Document[D], M <: DocumentModel[D]](collection:
}
}

/**
 * Queues operations for the supplied docs. If the addition pushes the pending count to the flushSize
 * threshold, a partial flush occurs before this returns. Otherwise, this is a very fast operation.
 */
def apply(docs: D*): IO[Unit] = IO {
  docs.foreach(queue.add)
  counter.addAndGet(docs.length)
}.flatMap { queued =>
  if (queued < flushSize) {
    IO.unit
  } else {
    // Partial flush: only drain enough to drop back below the flushSize threshold.
    flush(fullFlush = false)
  }
}

def flush(fullFlush: Boolean = false): IO[Unit] = IO(take(chunkSize)).flatMap { list =>
/**
* Flushes the queue
*
* @param fullFlush if true, all operations are applied. If false, flushing only occurs until the operation count
* is below the flushSize threshold.
*/
def flush(fullFlush: Boolean = true): IO[Unit] = IO(take(chunkSize)).flatMap { list =>
if (list.isEmpty) {
IO.unit
} else {
Expand All @@ -73,4 +91,4 @@ case class OperationsQueue[D <: Document[D], M <: DocumentModel[D]](collection:
}
}
}
}
}

0 comments on commit 38f8a79

Please sign in to comment.