diff --git a/build.sbt b/build.sbt index 3cbbe84..b9f99a0 100644 --- a/build.sbt +++ b/build.sbt @@ -1,4 +1,4 @@ -ThisBuild / baseVersion := "0.9.1" +ThisBuild / baseVersion := "0.9.2" ThisBuild / organization := "ai.entrolution" ThisBuild / organizationName := "Greg von Nessi" diff --git a/src/main/scala/bengal/stm/STM.scala b/src/main/scala/bengal/stm/STM.scala index c2329ea..1370f3b 100644 --- a/src/main/scala/bengal/stm/STM.scala +++ b/src/main/scala/bengal/stm/STM.scala @@ -95,6 +95,7 @@ object STM { idGenVar <- Ref.of[F, Long](0) idGenTxn <- Ref.of[F, Long](0) graphBuilderSemaphore <- Semaphore[F](1) + retrySemaphore <- Semaphore[F](1) stm <- Async[F].delay { new STM[F] { override val txnVarIdGen: Ref[F, TxnVarId] = idGenVar @@ -102,7 +103,9 @@ object STM { val txnRuntime: TxnRuntime = new TxnRuntime { override val scheduler: TxnScheduler = - TxnScheduler(graphBuilderSemaphore) + TxnScheduler(graphBuilderSemaphore = graphBuilderSemaphore, + retrySemaphore = retrySemaphore + ) } override def allocateTxnVar[V](value: V): F[TxnVar[F, V]] = diff --git a/src/main/scala/bengal/stm/runtime/TxnRuntimeContext.scala b/src/main/scala/bengal/stm/runtime/TxnRuntimeContext.scala index 668835a..f42b38b 100644 --- a/src/main/scala/bengal/stm/runtime/TxnRuntimeContext.scala +++ b/src/main/scala/bengal/stm/runtime/TxnRuntimeContext.scala @@ -55,10 +55,50 @@ private[stm] trait TxnRuntimeContext[F[_]] { private[stm] case class TxnScheduler( graphBuilderSemaphore: Semaphore[F], - activeTransactions: MutableMap[TxnId, AnalysedTxn[_]] + activeTransactions: MutableMap[TxnId, AnalysedTxn[_]], + retrySemaphore: Semaphore[F], + retryMap: MutableMap[IdFootprint, F[Unit]] ) { override val toString: String = "TxnScheduler" + def checkRetryQueue(idFootprint: IdFootprint): F[Unit] = + for { + _ <- retrySemaphore.acquire + triggeredFootprints <- + retryMap.keys.toList.parTraverse { waitingFootprint => + Async[F].ifM( + Async[F].delay( + !idFootprint.isCompatibleWith(waitingFootprint) + ) + )( + retryMap(waitingFootprint) >> Async[F].pure( + Option(waitingFootprint) + ), + Async[F].pure(None.asInstanceOf[Option[IdFootprint]]) + ) + }.map(_.flatten) + _ <- triggeredFootprints.traverse(footprint => + Async[F].delay(retryMap.remove(footprint)) + ) + _ <- retrySemaphore.release + } yield () + + def submitTxnForRetry(analysedTxn: AnalysedTxn[_]): F[Unit] = + for { + _ <- retrySemaphore.acquire + footprint <- Async[F].delay(analysedTxn.idFootprint) + execSpec <- Async[F].delay(retryMap.get(footprint)) + _ <- Async[F].delay(execSpec match { + case Some(spec) => + retryMap.update(footprint, + spec >> submitTxn(analysedTxn).start.void + ) + case None => + retryMap.addOne(footprint -> submitTxn(analysedTxn).start.void) + }) + _ <- retrySemaphore.release + } yield () + def submitTxnForImmediateRetry(analysedTxn: AnalysedTxn[_]): F[Unit] = for { _ <- analysedTxn.resetDependencyTally @@ -108,6 +148,7 @@ private[stm] trait TxnRuntimeContext[F[_]] { _ <- testAndLink.parTraverse(_.joinWithNever) _ <- analysedTxn.checkExecutionReadiness _ <- graphBuilderSemaphore.release + _ <- checkRetryQueue(analysedTxn.idFootprint).start } yield () def submitTxn(analysedTxn: AnalysedTxn[_]): F[Unit] = @@ -135,6 +176,7 @@ private[stm] trait TxnRuntimeContext[F[_]] { _ <- testAndLink.parTraverse(_.joinWithNever) _ <- analysedTxn.checkExecutionReadiness _ <- graphBuilderSemaphore.release + _ <- checkRetryQueue(analysedTxn.idFootprint).start } yield () def registerCompletion(analysedTxn: AnalysedTxn[_]): F[Unit] = @@ -156,11 +198,14 @@ private[stm] trait TxnRuntimeContext[F[_]] { private[stm] object TxnScheduler { private[stm] def apply( - graphBuilderSemaphore: Semaphore[F] + graphBuilderSemaphore: Semaphore[F], + retrySemaphore: Semaphore[F] ): TxnScheduler = TxnScheduler( activeTransactions = MutableMap(), - graphBuilderSemaphore = graphBuilderSemaphore + graphBuilderSemaphore = graphBuilderSemaphore, + retrySemaphore = retrySemaphore, + retryMap = MutableMap() ) } @@ -173,11 +218,12 @@ private[stm] trait TxnRuntimeContext[F[_]] { dependencyTally: Ref[F, Int], unsubSpecs: MutableMap[TxnId, F[Unit]], executionStatus: Ref[F, ExecutionStatus], + hasDownstream: Ref[F, Boolean], scheduler: TxnScheduler ) { private[stm] val resetDependencyTally: F[Unit] = - dependencyTally.set(0) + dependencyTally.set(0) >> hasDownstream.set(false) private[stm] val checkExecutionReadiness: F[Unit] = Async[F].ifM(dependencyTally.get.map(_ == 0))( @@ -208,6 +254,7 @@ private[stm] trait TxnRuntimeContext[F[_]] { txn.id -> txn.unsubscribeUpstreamDependency ) ) + _ <- hasDownstream.set(true) } yield () ) @@ -293,7 +340,10 @@ private[stm] trait TxnRuntimeContext[F[_]] { Right[Throwable, V](result.asInstanceOf[V]) ) case TxnResultRetry => - ex.submitTxn(this) + Async[F].ifM(hasDownstream.get)( + ex.submitTxn(this), + ex.submitTxnForRetry(this) + ) case TxnResultLogDirty(idFootprintRefinement) => ex.submitTxnForImmediateRetry( this.copy(idFootprint = @@ -335,6 +385,7 @@ private[stm] trait TxnRuntimeContext[F[_]] { } completionSignal <- Deferred[F, Either[Throwable, V]] dependencyTally <- Ref[F].of(0) + hasDownstream <- Ref[F].of(false) executionStatus <- Ref[F].of(NotScheduled.asInstanceOf[ExecutionStatus]) id <- txnIdGen.getAndUpdate(_ + 1) analysedTxn <- @@ -347,6 +398,7 @@ private[stm] trait TxnRuntimeContext[F[_]] { dependencyTally = dependencyTally, unsubSpecs = MutableMap(), executionStatus = executionStatus, + hasDownstream = hasDownstream, scheduler = scheduler ) )