Skip to content

Commit

Permalink
Merge pull request #136 from pawelkaczor/akka-timers-without-trigger
Browse files Browse the repository at this point in the history
Add calev-akka (Akka Typed Support) - without using Trigger directly
  • Loading branch information
eikek authored Jun 16, 2021
2 parents 56095da + b017fe6 commit 750061a
Show file tree
Hide file tree
Showing 13 changed files with 417 additions and 29 deletions.
62 changes: 43 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ compared to systemd:
and generator for calendar events. It is also published for ScalaJS.
With sbt, use:
```sbt
libraryDependencies += "com.github.eikek" %% "calev-core" % "0.5.0"
libraryDependencies += "com.github.eikek" %% "calev-core" % "0.5.0+21-e3e154f9+20210615-1815-SNAPSHOT"
```
- The *fs2* module contains utilities to work with
[FS2](https://github.com/functional-streams-for-scala/fs2) streams.
Expand All @@ -61,19 +61,24 @@ compared to systemd:
[fs2-cron](https://github.com/fthomas/fs2-cron) library. It is also published
for ScalaJS. With sbt, use
```sbt
libraryDependencies += "com.github.eikek" %% "calev-fs2" % "0.5.0"
libraryDependencies += "com.github.eikek" %% "calev-fs2" % "0.5.0+21-e3e154f9+20210615-1815-SNAPSHOT"
```
- The *doobie* module contains `Meta`, `Read` and `Write` instances
for `CalEvent` to use with
[doobie](https://github.com/tpolecat/doobie).
```sbt
libraryDependencies += "com.github.eikek" %% "calev-doobie" % "0.5.0"
libraryDependencies += "com.github.eikek" %% "calev-doobie" % "0.5.0+21-e3e154f9+20210615-1815-SNAPSHOT"
```
- The *circe* module defines a json decoder and encoder for `CalEvent`
instances to use with [circe](https://github.com/circe/circe). It is also
published for ScalaJS.
```sbt
libraryDependencies += "com.github.eikek" %% "calev-circe" % "0.5.0"
libraryDependencies += "com.github.eikek" %% "calev-circe" % "0.5.0+21-e3e154f9+20210615-1815-SNAPSHOT"
```
- The *akka* module allows to use calendar events with [Akka Scheduler](https://doc.akka.io/docs/akka/current/scheduler.html)
and [Akka Timers](https://doc.akka.io/docs/akka/current/typed/interaction-patterns.html#typed-scheduling).
```sbt
libraryDependencies += "com.github.eikek" %% "calev-akka" % "0.5.0+21-e3e154f9+20210615-1815-SNAPSHOT"
```


Expand Down Expand Up @@ -158,16 +163,16 @@ import java.time._
ce.asString
// res4: String = "*-*-* 00/2:00:00"
val now = LocalDateTime.now
// now: LocalDateTime = 2021-06-07T08:58:36.220
// now: LocalDateTime = 2021-06-15T18:19:16.523
ce.nextElapse(now)
// res5: Option[LocalDateTime] = Some(value = 2021-06-07T10:00)
// res5: Option[LocalDateTime] = Some(value = 2021-06-15T20:00)
ce.nextElapses(now, 5)
// res6: List[LocalDateTime] = List(
// 2021-06-07T10:00,
// 2021-06-07T12:00,
// 2021-06-07T14:00,
// 2021-06-07T16:00,
// 2021-06-07T18:00
// 2021-06-15T20:00,
// 2021-06-15T22:00,
// 2021-06-16T00:00,
// 2021-06-16T02:00,
// 2021-06-16T04:00
// )
```

Expand Down Expand Up @@ -200,7 +205,7 @@ import java.time.LocalTime
import scala.concurrent.ExecutionContext

implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
// timer: Timer[IO] = cats.effect.internals.IOTimer@2b83ba8c
// timer: Timer[IO] = cats.effect.internals.IOTimer@f5ba38a

val printTime = IO(println(LocalTime.now))
// printTime: IO[Unit] = Delay(thunk = <function0>)
Expand All @@ -221,9 +226,9 @@ val task = CalevFs2.awakeEvery[IO](event).evalMap(_ => printTime)
// task: Stream[IO[x], Unit] = Stream(..)

task.take(3).compile.drain.unsafeRunSync
// 08:58:38.017
// 08:58:40.001
// 08:58:42
// 18:19:18.025
// 18:19:20.002
// 18:19:22.002
```


Expand Down Expand Up @@ -261,8 +266,8 @@ val insert =
// acquire = Suspend(
// a = PrepareStatement(a = "INSERT INTO mytable (event) VALUES (?)")
// ),
// use = doobie.hi.connection$$$Lambda$7902/37512530@6f0227ba,
// release = cats.effect.Bracket$$Lambda$7904/1537174318@5df8aeda
// use = doobie.hi.connection$$$Lambda$14364/1018409754@1fda5d05,
// release = cats.effect.Bracket$$Lambda$14366/1381227270@4dcfea87
// )
// )

Expand All @@ -273,8 +278,8 @@ val select =
// acquire = Suspend(
// a = PrepareStatement(a = "SELECT event FROM mytable WHERE id = 1")
// ),
// use = doobie.hi.connection$$$Lambda$7902/37512530@55d0504f,
// release = cats.effect.Bracket$$Lambda$7904/1537174318@30746cde
// use = doobie.hi.connection$$$Lambda$14364/1018409754@55610e82,
// release = cats.effect.Bracket$$Lambda$14366/1381227270@3a81a963
// )
// )
```
Expand Down Expand Up @@ -348,3 +353,22 @@ val read = for {
// )
// )
```
### Akka

```scala
import com.github.eikek.calev.CalEvent
import com.github.eikek.calev.akka.CalevTimerScheduler
import _root_.akka.actor.typed.scaladsl.Behaviors._

case class Tick(timestamp: ZonedDateTime)

def calEvent = CalEvent.unsafe("*-*-* *:0/1:0") // every day, every full minute // every day, every full minute

def behavior = CalevTimerScheduler.withCalendarEvent(calEvent, Tick)(
receiveMessage[Tick] { tick =>
println(s"Tick scheduled at ${tick.timestamp} received at: ${Instant.now}")
same
}
)
```
More examples to come...
24 changes: 21 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ val sharedSettings = Seq(
"-Ywarn-value-discard"
)
else if (scalaBinaryVersion.value.startsWith("2.13"))
List("-Werror", "-Wdead-code", "-Wunused", "-Wvalue-discard")
List("-Werror", "-Wdead-code", "-Wunused", "-Wvalue-discard", "-Ytasty-reader")
else if (scalaBinaryVersion.value.startsWith("3"))
List(
"-explain",
Expand Down Expand Up @@ -166,6 +166,24 @@ lazy val circe = crossProject(JSPlatform, JVMPlatform)
lazy val circeJVM = circe.jvm
lazy val circeJS = circe.js

lazy val akkaJVM = project
.in(file("modules/akka"))
.dependsOn(coreJVM)
.settings(sharedSettings)
.settings(scalafixSettings)
.settings(
name := "calev-akka",
crossScalaVersions := Seq(scala212, scala213),
developers += Developer(
id = "pawelkaczor",
name = "Paweł Kaczor",
url = url("https://github.com/pawelkaczor"),
email = ""
),
libraryDependencies ++=
Dependencies.akkaAll ++ Dependencies.scalaTest ++ Dependencies.logback.map(_ % Test)
)

lazy val readme = project
.in(file("modules/readme"))
.enablePlugins(MdocPlugin)
Expand All @@ -189,7 +207,7 @@ lazy val readme = project
()
}
)
.dependsOn(coreJVM, fs2JVM, doobieJVM, circeJVM)
.dependsOn(coreJVM, fs2JVM, doobieJVM, circeJVM, akkaJVM)

val root = project
.in(file("."))
Expand All @@ -198,4 +216,4 @@ val root = project
.settings(
name := "calev-root"
)
.aggregate(coreJVM, coreJS, fs2JVM, fs2JS, doobieJVM, circeJVM, circeJS)
.aggregate(coreJVM, coreJS, fs2JVM, fs2JS, doobieJVM, circeJVM, circeJS, akkaJVM)
24 changes: 24 additions & 0 deletions docs/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ compared to systemd:
```sbt
libraryDependencies += "com.github.eikek" %% "calev-circe" % "@VERSION@"
```
- The *akka* module allows to use calendar events with [Akka Scheduler](https://doc.akka.io/docs/akka/current/scheduler.html)
and [Akka Timers](https://doc.akka.io/docs/akka/current/typed/interaction-patterns.html#typed-scheduling).
```sbt
libraryDependencies += "com.github.eikek" %% "calev-akka" % "@VERSION@"
```


## Examples
Expand Down Expand Up @@ -205,3 +210,22 @@ val read = for {
value <- parsed.as[Meeting]
} yield value
```
### Akka

```scala mdoc
import com.github.eikek.calev.CalEvent
import com.github.eikek.calev.akka.CalevTimerScheduler
import _root_.akka.actor.typed.scaladsl.Behaviors._

case class Tick(timestamp: ZonedDateTime)

def calEvent = CalEvent.unsafe("*-*-* *:0/1:0") // every day, every full minute

def behavior = CalevTimerScheduler.withCalendarEvent(calEvent, Tick)(
receiveMessage[Tick] { tick =>
println(s"Tick scheduled at ${tick.timestamp} received at: ${Instant.now}")
same
}
)
```
More examples to come...
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.github.eikek.calev.akka

import java.time.{Clock, ZonedDateTime}
import java.util.concurrent.TimeUnit

import scala.concurrent.duration.{Duration, FiniteDuration}

import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.ActorContext
import com.github.eikek.calev.CalEvent
import com.typesafe.config.Config

object CalevActorScheduling {

implicit class CalevActorScheduling(ctx: ActorContext[_]) {
def scheduleUpcoming[T](
calEvent: CalEvent,
target: ActorRef[T],
triggerFactory: ZonedDateTime => T,
clock: Clock = Clock.systemDefaultZone()
): Unit = new UpcomingEventProvider(clock)(calEvent).foreach {
case (instant, delay) =>
ctx.scheduleOnce(delay, target, triggerFactory(instant))
}
}

implicit final private[akka] class ConfigOps(val config: Config) extends AnyVal {
def getDurationMillis(path: String): FiniteDuration =
getDuration(path, TimeUnit.MILLISECONDS)

def getDurationNanos(path: String): FiniteDuration =
getDuration(path, TimeUnit.NANOSECONDS)

private def getDuration(path: String, unit: TimeUnit): FiniteDuration =
Duration(config.getDuration(path, unit), unit)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.github.eikek.calev.akka

import java.time.{Clock, ZonedDateTime}

import scala.reflect.ClassTag

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{Behavior, BehaviorInterceptor, TypedActorContext}
import com.github.eikek.calev.CalEvent
import com.github.eikek.calev.akka.CalevActorScheduling.ConfigOps

private[akka] class CalevInterceptor[B, T <: B: ClassTag](
clock: Clock,
calEvent: CalEvent,
triggerFactory: ZonedDateTime => T
) extends BehaviorInterceptor[T, B] {

override def aroundStart(
ctx: TypedActorContext[T],
target: BehaviorInterceptor.PreStartTarget[B]
): Behavior[B] =
scheduleUpcoming(target.start(ctx))

override def aroundReceive(
ctx: TypedActorContext[T],
msg: T,
target: BehaviorInterceptor.ReceiveTarget[B]
): Behavior[B] =
scheduleUpcoming(target(ctx, msg))

private def scheduleUpcoming(target: Behavior[B]): Behavior[B] = Behaviors.setup {
ctx =>
val config = ctx.system.settings.config
val minInterval = config.getDurationMillis("akka.scheduler.tick-duration") * 4

CalevTimerScheduler.withCalevTimers(Some(minInterval), clock) { scheduler =>
scheduler.scheduleUpcoming(calEvent, triggerFactory)
target
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.github.eikek.calev.akka

import java.time.Clock

import scala.concurrent.ExecutionContext

import akka.actor.typed.Scheduler
import com.github.eikek.calev.CalEvent

class CalevScheduler(scheduler: Scheduler, clock: Clock = Clock.systemDefaultZone()) {
private val upcomingEventProvider = new UpcomingEventProvider(clock)

def scheduleUpcoming(calEvent: CalEvent, runnable: Runnable)(implicit
executor: ExecutionContext
): Unit =
upcomingEventProvider(calEvent).foreach { case (_, delay) =>
scheduler.scheduleOnce(delay, runnable)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.github.eikek.calev.akka

import java.time.{Clock, ZonedDateTime}

import scala.concurrent.duration._
import scala.reflect.ClassTag

import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.{Behaviors, TimerScheduler}
import com.github.eikek.calev.CalEvent

object CalevTimerScheduler {

def withCalevTimers[T](
minInterval: Option[FiniteDuration] = None,
clock: Clock = Clock.systemDefaultZone()
)(factory: CalevTimerScheduler[T] => Behavior[T]): Behavior[T] =
Behaviors.withTimers { scheduler =>
factory(new CalevTimerSchedulerImpl[T](scheduler, clock, minInterval))
}

def withCalendarEvent[B, T <: B: ClassTag](
calEvent: CalEvent,
triggerFactory: ZonedDateTime => T,
clock: Clock = Clock.systemDefaultZone()
)(inner: Behavior[B]): Behavior[T] =
Behaviors.intercept(() =>
new CalevInterceptor[B, T](clock, calEvent, triggerFactory)
)(inner)

}

trait CalevTimerScheduler[T] {
def scheduleUpcoming(calEvent: CalEvent, triggerFactory: ZonedDateTime => T): Unit
}

private[akka] class CalevTimerSchedulerImpl[T](
scheduler: TimerScheduler[T],
clock: Clock,
minInterval: Option[FiniteDuration]
) extends CalevTimerScheduler[T] {

private val upcomingEventProvider =
new UpcomingEventProvider(clock, minInterval)

def scheduleUpcoming(calEvent: CalEvent, triggerFactory: ZonedDateTime => T): Unit =
upcomingEventProvider(calEvent)
.foreach { case (instant, delay) =>
scheduler.startSingleTimer(triggerFactory.apply(instant), delay)
}

}
Loading

0 comments on commit 750061a

Please sign in to comment.