diff --git a/README.md b/README.md index 8f85a56..4e444e8 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,7 @@ compared to systemd: and generator for calendar events. It is also published for ScalaJS. With sbt, use: ```sbt - libraryDependencies += "com.github.eikek" %% "calev-core" % "0.5.0" + libraryDependencies += "com.github.eikek" %% "calev-core" % "0.5.0+21-e3e154f9+20210615-1815-SNAPSHOT" ``` - The *fs2* module contains utilities to work with [FS2](https://github.com/functional-streams-for-scala/fs2) streams. @@ -61,19 +61,24 @@ compared to systemd: [fs2-cron](https://github.com/fthomas/fs2-cron) library. It is also published for ScalaJS. With sbt, use ```sbt - libraryDependencies += "com.github.eikek" %% "calev-fs2" % "0.5.0" + libraryDependencies += "com.github.eikek" %% "calev-fs2" % "0.5.0+21-e3e154f9+20210615-1815-SNAPSHOT" ``` - The *doobie* module contains `Meta`, `Read` and `Write` instances for `CalEvent` to use with [doobie](https://github.com/tpolecat/doobie). ```sbt - libraryDependencies += "com.github.eikek" %% "calev-doobie" % "0.5.0" + libraryDependencies += "com.github.eikek" %% "calev-doobie" % "0.5.0+21-e3e154f9+20210615-1815-SNAPSHOT" ``` - The *circe* module defines a json decoder and encoder for `CalEvent` instances to use with [circe](https://github.com/circe/circe). It is also published for ScalaJS. ```sbt - libraryDependencies += "com.github.eikek" %% "calev-circe" % "0.5.0" + libraryDependencies += "com.github.eikek" %% "calev-circe" % "0.5.0+21-e3e154f9+20210615-1815-SNAPSHOT" + ``` +- The *akka* module allows to use calendar events with [Akka Scheduler](https://doc.akka.io/docs/akka/current/scheduler.html) + and [Akka Timers](https://doc.akka.io/docs/akka/current/typed/interaction-patterns.html#typed-scheduling). + ```sbt + libraryDependencies += "com.github.eikek" %% "calev-akka" % "0.5.0+21-e3e154f9+20210615-1815-SNAPSHOT" ``` @@ -158,16 +163,16 @@ import java.time._ ce.asString // res4: String = "*-*-* 00/2:00:00" val now = LocalDateTime.now -// now: LocalDateTime = 2021-06-07T08:58:36.220 +// now: LocalDateTime = 2021-06-15T18:19:16.523 ce.nextElapse(now) -// res5: Option[LocalDateTime] = Some(value = 2021-06-07T10:00) +// res5: Option[LocalDateTime] = Some(value = 2021-06-15T20:00) ce.nextElapses(now, 5) // res6: List[LocalDateTime] = List( -// 2021-06-07T10:00, -// 2021-06-07T12:00, -// 2021-06-07T14:00, -// 2021-06-07T16:00, -// 2021-06-07T18:00 +// 2021-06-15T20:00, +// 2021-06-15T22:00, +// 2021-06-16T00:00, +// 2021-06-16T02:00, +// 2021-06-16T04:00 // ) ``` @@ -200,7 +205,7 @@ import java.time.LocalTime import scala.concurrent.ExecutionContext implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) -// timer: Timer[IO] = cats.effect.internals.IOTimer@2b83ba8c +// timer: Timer[IO] = cats.effect.internals.IOTimer@f5ba38a val printTime = IO(println(LocalTime.now)) // printTime: IO[Unit] = Delay(thunk = ) @@ -221,9 +226,9 @@ val task = CalevFs2.awakeEvery[IO](event).evalMap(_ => printTime) // task: Stream[IO[x], Unit] = Stream(..) task.take(3).compile.drain.unsafeRunSync -// 08:58:38.017 -// 08:58:40.001 -// 08:58:42 +// 18:19:18.025 +// 18:19:20.002 +// 18:19:22.002 ``` @@ -261,8 +266,8 @@ val insert = // acquire = Suspend( // a = PrepareStatement(a = "INSERT INTO mytable (event) VALUES (?)") // ), -// use = doobie.hi.connection$$$Lambda$7902/37512530@6f0227ba, -// release = cats.effect.Bracket$$Lambda$7904/1537174318@5df8aeda +// use = doobie.hi.connection$$$Lambda$14364/1018409754@1fda5d05, +// release = cats.effect.Bracket$$Lambda$14366/1381227270@4dcfea87 // ) // ) @@ -273,8 +278,8 @@ val select = // acquire = Suspend( // a = PrepareStatement(a = "SELECT event FROM mytable WHERE id = 1") // ), -// use = doobie.hi.connection$$$Lambda$7902/37512530@55d0504f, -// release = cats.effect.Bracket$$Lambda$7904/1537174318@30746cde +// use = doobie.hi.connection$$$Lambda$14364/1018409754@55610e82, +// release = cats.effect.Bracket$$Lambda$14366/1381227270@3a81a963 // ) // ) ``` @@ -348,3 +353,22 @@ val read = for { // ) // ) ``` +### Akka + +```scala +import com.github.eikek.calev.CalEvent +import com.github.eikek.calev.akka.CalevTimerScheduler +import _root_.akka.actor.typed.scaladsl.Behaviors._ + +case class Tick(timestamp: ZonedDateTime) + +def calEvent = CalEvent.unsafe("*-*-* *:0/1:0") // every day, every full minute // every day, every full minute + +def behavior = CalevTimerScheduler.withCalendarEvent(calEvent, Tick)( + receiveMessage[Tick] { tick => + println(s"Tick scheduled at ${tick.timestamp} received at: ${Instant.now}") + same + } +) +``` +More examples to come... diff --git a/build.sbt b/build.sbt index 63011ac..272cea8 100644 --- a/build.sbt +++ b/build.sbt @@ -36,7 +36,7 @@ val sharedSettings = Seq( "-Ywarn-value-discard" ) else if (scalaBinaryVersion.value.startsWith("2.13")) - List("-Werror", "-Wdead-code", "-Wunused", "-Wvalue-discard") + List("-Werror", "-Wdead-code", "-Wunused", "-Wvalue-discard", "-Ytasty-reader") else if (scalaBinaryVersion.value.startsWith("3")) List( "-explain", @@ -166,6 +166,24 @@ lazy val circe = crossProject(JSPlatform, JVMPlatform) lazy val circeJVM = circe.jvm lazy val circeJS = circe.js +lazy val akkaJVM = project + .in(file("modules/akka")) + .dependsOn(coreJVM) + .settings(sharedSettings) + .settings(scalafixSettings) + .settings( + name := "calev-akka", + crossScalaVersions := Seq(scala212, scala213), + developers += Developer( + id = "pawelkaczor", + name = "Paweł Kaczor", + url = url("https://github.com/pawelkaczor"), + email = "" + ), + libraryDependencies ++= + Dependencies.akkaAll ++ Dependencies.scalaTest ++ Dependencies.logback.map(_ % Test) + ) + lazy val readme = project .in(file("modules/readme")) .enablePlugins(MdocPlugin) @@ -189,7 +207,7 @@ lazy val readme = project () } ) - .dependsOn(coreJVM, fs2JVM, doobieJVM, circeJVM) + .dependsOn(coreJVM, fs2JVM, doobieJVM, circeJVM, akkaJVM) val root = project .in(file(".")) @@ -198,4 +216,4 @@ val root = project .settings( name := "calev-root" ) - .aggregate(coreJVM, coreJS, fs2JVM, fs2JS, doobieJVM, circeJVM, circeJS) + .aggregate(coreJVM, coreJS, fs2JVM, fs2JS, doobieJVM, circeJVM, circeJS, akkaJVM) diff --git a/docs/readme.md b/docs/readme.md index 65113f4..74fc9cf 100644 --- a/docs/readme.md +++ b/docs/readme.md @@ -75,6 +75,11 @@ compared to systemd: ```sbt libraryDependencies += "com.github.eikek" %% "calev-circe" % "@VERSION@" ``` +- The *akka* module allows to use calendar events with [Akka Scheduler](https://doc.akka.io/docs/akka/current/scheduler.html) + and [Akka Timers](https://doc.akka.io/docs/akka/current/typed/interaction-patterns.html#typed-scheduling). + ```sbt + libraryDependencies += "com.github.eikek" %% "calev-akka" % "@VERSION@" + ``` ## Examples @@ -205,3 +210,22 @@ val read = for { value <- parsed.as[Meeting] } yield value ``` +### Akka + +```scala mdoc +import com.github.eikek.calev.CalEvent +import com.github.eikek.calev.akka.CalevTimerScheduler +import _root_.akka.actor.typed.scaladsl.Behaviors._ + +case class Tick(timestamp: ZonedDateTime) + +def calEvent = CalEvent.unsafe("*-*-* *:0/1:0") // every day, every full minute + +def behavior = CalevTimerScheduler.withCalendarEvent(calEvent, Tick)( + receiveMessage[Tick] { tick => + println(s"Tick scheduled at ${tick.timestamp} received at: ${Instant.now}") + same + } +) +``` +More examples to come... diff --git a/modules/akka/src/main/scala/com/github/eikek/calev/akka/CalevActorScheduling.scala b/modules/akka/src/main/scala/com/github/eikek/calev/akka/CalevActorScheduling.scala new file mode 100644 index 0000000..9a2b70c --- /dev/null +++ b/modules/akka/src/main/scala/com/github/eikek/calev/akka/CalevActorScheduling.scala @@ -0,0 +1,38 @@ +package com.github.eikek.calev.akka + +import java.time.{Clock, ZonedDateTime} +import java.util.concurrent.TimeUnit + +import scala.concurrent.duration.{Duration, FiniteDuration} + +import akka.actor.typed.ActorRef +import akka.actor.typed.scaladsl.ActorContext +import com.github.eikek.calev.CalEvent +import com.typesafe.config.Config + +object CalevActorScheduling { + + implicit class CalevActorScheduling(ctx: ActorContext[_]) { + def scheduleUpcoming[T]( + calEvent: CalEvent, + target: ActorRef[T], + triggerFactory: ZonedDateTime => T, + clock: Clock = Clock.systemDefaultZone() + ): Unit = new UpcomingEventProvider(clock)(calEvent).foreach { + case (instant, delay) => + ctx.scheduleOnce(delay, target, triggerFactory(instant)) + } + } + + implicit final private[akka] class ConfigOps(val config: Config) extends AnyVal { + def getDurationMillis(path: String): FiniteDuration = + getDuration(path, TimeUnit.MILLISECONDS) + + def getDurationNanos(path: String): FiniteDuration = + getDuration(path, TimeUnit.NANOSECONDS) + + private def getDuration(path: String, unit: TimeUnit): FiniteDuration = + Duration(config.getDuration(path, unit), unit) + } + +} diff --git a/modules/akka/src/main/scala/com/github/eikek/calev/akka/CalevInterceptor.scala b/modules/akka/src/main/scala/com/github/eikek/calev/akka/CalevInterceptor.scala new file mode 100644 index 0000000..707228c --- /dev/null +++ b/modules/akka/src/main/scala/com/github/eikek/calev/akka/CalevInterceptor.scala @@ -0,0 +1,42 @@ +package com.github.eikek.calev.akka + +import java.time.{Clock, ZonedDateTime} + +import scala.reflect.ClassTag + +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.{Behavior, BehaviorInterceptor, TypedActorContext} +import com.github.eikek.calev.CalEvent +import com.github.eikek.calev.akka.CalevActorScheduling.ConfigOps + +private[akka] class CalevInterceptor[B, T <: B: ClassTag]( + clock: Clock, + calEvent: CalEvent, + triggerFactory: ZonedDateTime => T +) extends BehaviorInterceptor[T, B] { + + override def aroundStart( + ctx: TypedActorContext[T], + target: BehaviorInterceptor.PreStartTarget[B] + ): Behavior[B] = + scheduleUpcoming(target.start(ctx)) + + override def aroundReceive( + ctx: TypedActorContext[T], + msg: T, + target: BehaviorInterceptor.ReceiveTarget[B] + ): Behavior[B] = + scheduleUpcoming(target(ctx, msg)) + + private def scheduleUpcoming(target: Behavior[B]): Behavior[B] = Behaviors.setup { + ctx => + val config = ctx.system.settings.config + val minInterval = config.getDurationMillis("akka.scheduler.tick-duration") * 4 + + CalevTimerScheduler.withCalevTimers(Some(minInterval), clock) { scheduler => + scheduler.scheduleUpcoming(calEvent, triggerFactory) + target + } + } + +} diff --git a/modules/akka/src/main/scala/com/github/eikek/calev/akka/CalevScheduler.scala b/modules/akka/src/main/scala/com/github/eikek/calev/akka/CalevScheduler.scala new file mode 100644 index 0000000..d6b5a42 --- /dev/null +++ b/modules/akka/src/main/scala/com/github/eikek/calev/akka/CalevScheduler.scala @@ -0,0 +1,19 @@ +package com.github.eikek.calev.akka + +import java.time.Clock + +import scala.concurrent.ExecutionContext + +import akka.actor.typed.Scheduler +import com.github.eikek.calev.CalEvent + +class CalevScheduler(scheduler: Scheduler, clock: Clock = Clock.systemDefaultZone()) { + private val upcomingEventProvider = new UpcomingEventProvider(clock) + + def scheduleUpcoming(calEvent: CalEvent, runnable: Runnable)(implicit + executor: ExecutionContext + ): Unit = + upcomingEventProvider(calEvent).foreach { case (_, delay) => + scheduler.scheduleOnce(delay, runnable) + } +} diff --git a/modules/akka/src/main/scala/com/github/eikek/calev/akka/CalevTimerScheduler.scala b/modules/akka/src/main/scala/com/github/eikek/calev/akka/CalevTimerScheduler.scala new file mode 100644 index 0000000..df5cdcb --- /dev/null +++ b/modules/akka/src/main/scala/com/github/eikek/calev/akka/CalevTimerScheduler.scala @@ -0,0 +1,52 @@ +package com.github.eikek.calev.akka + +import java.time.{Clock, ZonedDateTime} + +import scala.concurrent.duration._ +import scala.reflect.ClassTag + +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.{Behaviors, TimerScheduler} +import com.github.eikek.calev.CalEvent + +object CalevTimerScheduler { + + def withCalevTimers[T]( + minInterval: Option[FiniteDuration] = None, + clock: Clock = Clock.systemDefaultZone() + )(factory: CalevTimerScheduler[T] => Behavior[T]): Behavior[T] = + Behaviors.withTimers { scheduler => + factory(new CalevTimerSchedulerImpl[T](scheduler, clock, minInterval)) + } + + def withCalendarEvent[B, T <: B: ClassTag]( + calEvent: CalEvent, + triggerFactory: ZonedDateTime => T, + clock: Clock = Clock.systemDefaultZone() + )(inner: Behavior[B]): Behavior[T] = + Behaviors.intercept(() => + new CalevInterceptor[B, T](clock, calEvent, triggerFactory) + )(inner) + +} + +trait CalevTimerScheduler[T] { + def scheduleUpcoming(calEvent: CalEvent, triggerFactory: ZonedDateTime => T): Unit +} + +private[akka] class CalevTimerSchedulerImpl[T]( + scheduler: TimerScheduler[T], + clock: Clock, + minInterval: Option[FiniteDuration] +) extends CalevTimerScheduler[T] { + + private val upcomingEventProvider = + new UpcomingEventProvider(clock, minInterval) + + def scheduleUpcoming(calEvent: CalEvent, triggerFactory: ZonedDateTime => T): Unit = + upcomingEventProvider(calEvent) + .foreach { case (instant, delay) => + scheduler.startSingleTimer(triggerFactory.apply(instant), delay) + } + +} diff --git a/modules/akka/src/main/scala/com/github/eikek/calev/akka/UpcomingEventProvider.scala b/modules/akka/src/main/scala/com/github/eikek/calev/akka/UpcomingEventProvider.scala new file mode 100644 index 0000000..8f5bf7f --- /dev/null +++ b/modules/akka/src/main/scala/com/github/eikek/calev/akka/UpcomingEventProvider.scala @@ -0,0 +1,34 @@ +package com.github.eikek.calev.akka + +import java.time.{Clock, Duration => JavaDuration, ZonedDateTime} + +import scala.concurrent.duration._ + +import com.github.eikek.calev.CalEvent + +class UpcomingEventProvider( + clock: Clock, + minInterval: Option[FiniteDuration] = None +) { + private def now = clock.instant().atZone(clock.getZone) + + def apply( + calEvent: CalEvent, + delay: FiniteDuration = Duration.Zero + ): Option[(ZonedDateTime, FiniteDuration)] = { + val refInstant: ZonedDateTime = now.minusNanos(delay.toNanos) + calEvent + .nextElapse(refInstant) + .map { instant => + (instant, JavaDuration.between(refInstant, instant).toMillis.millis) + } + .flatMap { + case (_, duration) + if minInterval.exists(min => duration.toMillis < min.toMillis) => + apply(calEvent, minInterval.get) + case (dt, duration) => + Some((dt, duration)) + } + } + +} diff --git a/modules/akka/src/test/resources/application-test.conf b/modules/akka/src/test/resources/application-test.conf new file mode 100644 index 0000000..11ee217 --- /dev/null +++ b/modules/akka/src/test/resources/application-test.conf @@ -0,0 +1,4 @@ +akka { + loglevel = DEBUG + log-config-on-start = off +} \ No newline at end of file diff --git a/modules/akka/src/test/resources/logback-test.xml b/modules/akka/src/test/resources/logback-test.xml new file mode 100644 index 0000000..b339cd2 --- /dev/null +++ b/modules/akka/src/test/resources/logback-test.xml @@ -0,0 +1,11 @@ + + + + %d{dd.MM HH:mm:ss.SSS} %.-1level [%10.10t] %-25.25logger{24} %X{device}%X{model} %m%n%ex + + + + + + + \ No newline at end of file diff --git a/modules/akka/src/test/scala/com/github/eikek/calev/akka/CalevTimerSchedulerTest.scala b/modules/akka/src/test/scala/com/github/eikek/calev/akka/CalevTimerSchedulerTest.scala new file mode 100644 index 0000000..453da18 --- /dev/null +++ b/modules/akka/src/test/scala/com/github/eikek/calev/akka/CalevTimerSchedulerTest.scala @@ -0,0 +1,75 @@ +package com.github.eikek.calev.akka + +import java.time.temporal.ChronoField +import java.time.{LocalTime, ZonedDateTime} + +import scala.concurrent.duration.{DurationInt, FiniteDuration} + +import akka.actor.testkit.typed.scaladsl.{ + ManualTime, + ScalaTestWithActorTestKit, + TestProbe +} +import akka.actor.typed.scaladsl.Behaviors.{receiveMessage, same} +import com.github.eikek.calev.CalEvent +import org.scalatest.wordspec.AnyWordSpecLike +import org.slf4j.LoggerFactory + +case class Tick(timestamp: ZonedDateTime) + +class CalevTimerSchedulerTest + extends ScalaTestWithActorTestKit(ManualTime.config) + with AnyWordSpecLike { + + val log = LoggerFactory.getLogger(getClass) + val manualTime = ManualTime() + val clock = new TestClock + + "Akka Timer" should { + val probe = TestProbe[Tick]() + + "trigger periodically according to given CalEvent" in { + + val calEvent = CalEvent.unsafe("*-*-* *:0/1:0") // every day, every full minute + + val behavior = CalevTimerScheduler.withCalendarEvent(calEvent, Tick, clock)( + receiveMessage[Tick] { tick => + probe.ref ! tick + log.info( + s"Tick scheduled at ${tick.timestamp.toLocalTime} received at: ${LocalTime.now(clock)}" + ) + same + } + ) + + spawn(behavior) + + expectNoMessagesFor( + (60 - LocalTime.now().get(ChronoField.SECOND_OF_MINUTE)).seconds - 1.second + ) + + timePasses(2.seconds) + probe.expectMessageType[Tick] + + expectNoMessagesFor(30.seconds) + + timePasses(30.seconds) + probe.expectMessageType[Tick] + + expectNoMessagesFor(30.seconds) + } + + def expectNoMessagesFor(duration: FiniteDuration): Unit = { + log.debug("Expect no messages for {}", duration) + manualTime.expectNoMessageFor(100.millis, probe) + timePasses(duration) + manualTime.expectNoMessageFor(100.millis, probe) + } + + def timePasses(duration: FiniteDuration): Unit = { + clock.tick(duration) + manualTime.timePasses(duration) + } + } + +} diff --git a/modules/akka/src/test/scala/com/github/eikek/calev/akka/TestClock.scala b/modules/akka/src/test/scala/com/github/eikek/calev/akka/TestClock.scala new file mode 100644 index 0000000..ed3a979 --- /dev/null +++ b/modules/akka/src/test/scala/com/github/eikek/calev/akka/TestClock.scala @@ -0,0 +1,36 @@ +package com.github.eikek.calev.akka + +import java.time._ + +import scala.concurrent.duration.FiniteDuration + +class TestClock extends Clock { + + @volatile private var _instant = roundToMillis(Instant.now()) + + override def getZone: ZoneId = ZoneOffset.UTC + + override def withZone(zone: ZoneId): Clock = + throw new UnsupportedOperationException("withZone not supported") + + override def instant(): Instant = + _instant + + def setInstant(newInstant: Instant): Unit = + _instant = roundToMillis(newInstant) + + def tick(duration: FiniteDuration): Instant = { + val newInstant = roundToMillis(_instant.plusMillis(duration.toMillis)) + _instant = newInstant + newInstant + } + + private def roundToMillis(i: Instant): Instant = { + // algo taken from java.time.Clock.tick + val epochMilli = i.toEpochMilli + Instant.ofEpochMilli(epochMilli - Math.floorMod(epochMilli, 1L)) + } + + override def toString = + s"TestClock(${_instant})" +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index d2ba35c..0cce1e8 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -2,13 +2,19 @@ import sbt._ object Dependencies { - val circeVersion = "0.14.1" - val doobieVersion = "0.13.4" - val fs2Version = "2.5.6" - val h2Version = "1.4.200" - val log4sVersion = "1.8.2" - val logbackVersion = "1.2.3" - val munitVersion = "0.7.26" + val akkaVersion = "2.6.15" + val circeVersion = "0.14.1" + val doobieVersion = "0.13.4" + val fs2Version = "2.5.6" + val h2Version = "1.4.200" + val log4sVersion = "1.8.2" + val logbackVersion = "1.2.3" + val munitVersion = "0.7.26" + val scalaTestVersion = "3.2.9" + + val scalaTest = Seq( + "org.scalatest" %% "scalatest" % scalaTestVersion % Test + ) val munit = Seq( "org.scalameta" %% "munit" % munitVersion, @@ -46,4 +52,9 @@ object Dependencies { "io.circe" %% "circe-generic" % circeVersion, "io.circe" %% "circe-parser" % circeVersion ) + + val akkaAll = Seq( + "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion, + "com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion % Test + ) }