Skip to content

Commit

Permalink
Reactive Graph Scheduler Update
Browse files Browse the repository at this point in the history
* Refactor of existential IDs
* Enhanced collision detection between ID footprints
* Scheduler complete redesign
  • Loading branch information
gvonness authored Jun 26, 2023
1 parent 06dafa9 commit 35a0857
Show file tree
Hide file tree
Showing 26 changed files with 1,135 additions and 1,098 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ThisBuild / baseVersion := "0.7.0"
ThisBuild / baseVersion := "0.8.0"

ThisBuild / organization := "ai.entrolution"
ThisBuild / organizationName := "Greg von Nessi"
Expand All @@ -13,7 +13,7 @@ ThisBuild / scmInfo := Some(
)

ThisBuild / startYear := Some(2020)
ThisBuild / endYear := Some(2022)
ThisBuild / endYear := Some(2023)

ThisBuild / spiewakCiReleaseSnapshots := false
ThisBuild / spiewakMainBranches := Seq("main")
Expand Down
4 changes: 2 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import sbt._
object DependencyVersions {
val scala2p13Version = "2.13.8"

val catsEffectVersion = "3.3.14"
val catsFreeVersion = "2.8.0"
val catsEffectVersion = "3.4.8"
val catsFreeVersion = "2.9.0"

val catsEffectTestingVersion = "1.4.0"
}
Expand Down
25 changes: 7 additions & 18 deletions src/main/scala/bengal/stm/STM.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 Greg von Nessi
* Copyright 2020-2023 Greg von Nessi
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,8 +23,7 @@ import bengal.stm.model.runtime._
import bengal.stm.runtime.{TxnCompilerContext, TxnLogContext, TxnRuntimeContext}

import cats.effect.Ref
import cats.effect.implicits.genSpawnOps
import cats.effect.kernel.{Async, Deferred}
import cats.effect.kernel.Async
import cats.effect.std.Semaphore
import cats.implicits._

Expand Down Expand Up @@ -94,21 +93,15 @@ object STM {
stm

def runtime[F[_]: Async]: F[STM[F]] =
runtime(FiniteDuration(Long.MaxValue, NANOSECONDS),
Runtime.getRuntime.availableProcessors() * 2
)
runtime(FiniteDuration(Long.MaxValue, NANOSECONDS))

def runtime[F[_]: Async](
retryMaxWait: FiniteDuration,
maxWaitingToProcessInLoop: Int
retryMaxWait: FiniteDuration
): F[STM[F]] =
for {
idGenVar <- Ref.of[F, Long](0)
idGenTxn <- Ref.of[F, Long](0)
runningSemaphore <- Semaphore[F](1)
waitingSemaphore <- Semaphore[F](1)
schedulerTrigger <- Deferred[F, Unit]
schedulerTriggerRef <- Ref.of(schedulerTrigger)
graphBuilderSemaphore <- Semaphore[F](1)
stm <- Async[F].delay {
new STM[F] {
override val txnVarIdGen: Ref[F, TxnVarId] = idGenVar
Expand All @@ -117,11 +110,8 @@ object STM {
val txnRuntime: TxnRuntime = new TxnRuntime {
override val scheduler: TxnScheduler =
TxnScheduler(
runningSemaphore,
waitingSemaphore,
schedulerTriggerRef,
retryMaxWait,
maxWaitingToProcessInLoop
graphBuilderSemaphore = graphBuilderSemaphore,
retryWaitMaxDuration = retryMaxWait
)
}

Expand All @@ -137,6 +127,5 @@ object STM {
txnRuntime.commit(txn)
}
}
_ <- stm.txnRuntime.scheduler.reprocessingRecursion.start
} yield stm
}
2 changes: 1 addition & 1 deletion src/main/scala/bengal/stm/api/internal/TxnApiContext.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 Greg von Nessi
* Copyright 2020-2023 Greg von Nessi
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/bengal/stm/model/AsyncImplicits.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 Greg von Nessi
* Copyright 2020-2023 Greg von Nessi
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/bengal/stm/model/TxnAdt.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 Greg von Nessi
* Copyright 2020-2023 Greg von Nessi
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/bengal/stm/model/TxnAdtContext.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 Greg von Nessi
* Copyright 2020-2023 Greg von Nessi
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/bengal/stm/model/TxnErratum.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 Greg von Nessi
* Copyright 2020-2023 Greg von Nessi
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
14 changes: 4 additions & 10 deletions src/main/scala/bengal/stm/model/TxnStateEntity.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 Greg von Nessi
* Copyright 2020-2023 Greg von Nessi
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,8 +19,8 @@ package bengal.stm.model

import bengal.stm.model.runtime._

import cats.effect.Ref
import cats.effect.std.Semaphore
import cats.effect.{Deferred, Ref}

import java.util.UUID

Expand All @@ -33,15 +33,9 @@ private[stm] trait TxnStateEntity[F[_], V] {
// Note: We run this through a deterministic UUID mapping
// to mitigate the chance of increment-based IDs colliding
// with bare hash codes
private[stm] final val runtimeId: TxnVarRuntimeId =
UUID.nameUUIDFromBytes(id.toString.getBytes).hashCode()
private[stm] final lazy val runtimeId: TxnVarRuntimeId =
TxnVarRuntimeId(UUID.nameUUIDFromBytes(id.toString.getBytes).hashCode())

protected def value: Ref[F, V]
private[stm] def commitLock: Semaphore[F]
private[stm] def txnRetrySignals: TxnSignals[F]

private[stm] def registerRetry(
signal: Deferred[F, Unit]
): F[Unit] =
txnRetrySignals.update(signals => signals ++ Set(signal))
}
23 changes: 6 additions & 17 deletions src/main/scala/bengal/stm/model/TxnVar.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 Greg von Nessi
* Copyright 2020-2023 Greg von Nessi
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,34 +20,24 @@ package bengal.stm.model
import bengal.stm.STM
import bengal.stm.model.runtime._

import cats.effect.Ref
import cats.effect.kernel.Async
import cats.effect.std.Semaphore
import cats.effect.{Deferred, Ref}
import cats.syntax.all._

case class TxnVar[F[_]: Async, T](
case class TxnVar[F[_], T](
private[stm] val id: TxnVarId,
protected val value: Ref[F, T],
private[stm] val commitLock: Semaphore[F],
private[stm] val txnRetrySignals: TxnSignals[F]
private[stm] val commitLock: Semaphore[F]
) extends TxnStateEntity[F, T] {

private[stm] def completeRetrySignals: F[Unit] =
for {
signals <- txnRetrySignals.getAndSet(Set())
_ <- signals.toList.traverse(_.complete(()))
} yield ()

private[stm] lazy val get: F[T] =
value.get

private[stm] def set(
newValue: T
): F[Unit] =
for {
_ <- value.set(newValue)
_ <- completeRetrySignals
} yield ()
value.set(newValue)
}

object TxnVar {
Expand All @@ -57,6 +47,5 @@ object TxnVar {
id <- STM[F].txnVarIdGen.updateAndGet(_ + 1)
valueRef <- Async[F].ref(value)
lock <- Semaphore[F](1)
signals <- Async[F].ref(Set[Deferred[F, Unit]]())
} yield TxnVar(id, valueRef, lock, signals)
} yield TxnVar(id, valueRef, lock)
}
58 changes: 10 additions & 48 deletions src/main/scala/bengal/stm/model/TxnVarMap.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 Greg von Nessi
* Copyright 2020-2023 Greg von Nessi
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,9 +20,9 @@ package bengal.stm.model
import bengal.stm.STM
import bengal.stm.model.runtime._

import cats.effect.Ref
import cats.effect.kernel.Async
import cats.effect.std.Semaphore
import cats.effect.{Deferred, Ref}
import cats.syntax.all._

import java.util.UUID
Expand All @@ -34,15 +34,8 @@ case class TxnVarMap[F[_]: STM: Async, K, V](
private[stm] val commitLock: Semaphore[F],
private val internalStructureLock: Semaphore[F],
private val internalSignalLock: Semaphore[F],
private[stm] val txnRetrySignals: TxnSignals[F]
) extends TxnStateEntity[F, VarIndex[F, K, V]] {

private def completeRetrySignals: F[Unit] =
for {
signals <- txnRetrySignals.getAndSet(Set())
_ <- signals.toList.traverse(_.complete(()))
} yield ()

private def withLock[A](semaphore: Semaphore[F])(
fa: F[A]
): F[A] =
Expand Down Expand Up @@ -72,32 +65,13 @@ case class TxnVarMap[F[_]: STM: Async, K, V](
}
} yield result

private[stm] def getId(key: K): F[Option[TxnVarId]] =
getTxnVar(key).map(_.map(_.id))

private[stm] def getRuntimeExistentialId(key: K): TxnVarRuntimeId =
UUID.nameUUIDFromBytes((id, key).toString.getBytes).hashCode()

private[stm] def getRuntimeActualisedId(
key: K
): F[Option[TxnVarRuntimeId]] =
getTxnVar(key).map(_.map(_.runtimeId))
private def getRuntimeExistentialId(key: K): TxnVarRuntimeId =
TxnVarRuntimeId(UUID.nameUUIDFromBytes((id, key).toString.getBytes).hashCode())

private[stm] def getRuntimeId(
key: K
): F[List[TxnVarRuntimeId]] =
getRuntimeActualisedId(key).map(
List(_, Some(getRuntimeExistentialId(key))).flatten
)

// Get transactional IDs for any keys already existing
// in the map
private[stm] def getIdsForKeys(
keySet: Set[K]
): F[Set[TxnVarId]] =
for {
ids <- keySet.toList.traverse(getId)
} yield ids.flatten.toSet
): F[TxnVarRuntimeId] =
Async[F].delay(getRuntimeExistentialId(key).addParent(runtimeId))

// Only called when key is known to not exist
private def add(newKey: K, newValue: V): F[Unit] =
Expand All @@ -106,7 +80,6 @@ case class TxnVarMap[F[_]: STM: Async, K, V](
_ <- withLock(internalStructureLock)(
value.update(_ += (newKey -> newTxnVar))
)
_ <- completeRetrySignals
} yield ()

private[stm] def addOrUpdate(key: K, newValue: V): F[Unit] =
Expand All @@ -116,7 +89,7 @@ case class TxnVarMap[F[_]: STM: Async, K, V](
case Some(tVar) =>
withLock(internalStructureLock)(
tVar.set(newValue)
) >> completeRetrySignals
)
case None =>
add(key, newValue)
}
Expand All @@ -126,21 +99,12 @@ case class TxnVarMap[F[_]: STM: Async, K, V](
for {
txnVarMap <- value.get
_ <- txnVarMap.get(key) match {
case Some(txnVar) =>
for {
_ <- withLock(internalStructureLock)(value.update(_ -= key))
_ <- txnVar.completeRetrySignals
_ <- completeRetrySignals
} yield ()
case Some(_) =>
withLock(internalStructureLock)(value.update(_ -= key))
case None =>
Async[F].unit
}
} yield ()

private[stm] override def registerRetry(
signal: Deferred[F, Unit]
): F[Unit] =
withLock(internalSignalLock)(txnRetrySignals.update(_ + signal))
}

object TxnVarMap {
Expand All @@ -155,12 +119,10 @@ object TxnVarMap {
lock <- Semaphore[F](1)
internalStructureLock <- Semaphore[F](1)
internalSignalLock <- Semaphore[F](1)
signals <- Async[F].ref(Set[Deferred[F, Unit]]())
} yield TxnVarMap(id,
valuesRef,
lock,
internalStructureLock,
internalSignalLock,
signals
internalSignalLock
)
}
2 changes: 1 addition & 1 deletion src/main/scala/bengal/stm/model/package.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 Greg von Nessi
* Copyright 2020-2023 Greg von Nessi
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
24 changes: 24 additions & 0 deletions src/main/scala/bengal/stm/model/runtime/ExecutionStatus.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2020-2023 Greg von Nessi
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ai.entrolution
package bengal.stm.model.runtime

sealed trait ExecutionStatus

case object Scheduled extends ExecutionStatus
case object Running extends ExecutionStatus
case object NotScheduled extends ExecutionStatus
Loading

0 comments on commit 35a0857

Please sign in to comment.