From 88968d272cd04c377e6e9427d2948b1754fc1b83 Mon Sep 17 00:00:00 2001 From: Nicolas Vollmar Date: Fri, 29 Sep 2023 15:12:11 +0200 Subject: [PATCH 1/2] Add compat module for wire compatibility with Akka --- build.sbt | 9 ++- .../src/main/scala/akka/actor/ActorRef.scala | 7 ++ .../scala/akka/actor/typed/ActorRef.scala | 6 ++ .../src/main/scala/akka/util/ByteString.scala | 7 ++ .../compat/AkkaCompatKryoInitializer.scala | 19 ++++++ .../TypedAkkaCompatKryoInitializer.scala | 26 ++++++++ .../serializer/CompatActorRefSerializer.scala | 42 ++++++++++++ .../CompatTypedActorRefSerializer.scala | 43 ++++++++++++ .../compat/AkkaCompatSerializerTest.scala | 66 +++++++++++++++++++ .../io/altoo/testing/SampleMessage.scala | 6 ++ .../pekko/serializer/ActorRefSerializer.scala | 2 +- 11 files changed, 231 insertions(+), 2 deletions(-) create mode 100644 pekko-kryo-serialization-akka-compat/src/main/scala/akka/actor/ActorRef.scala create mode 100644 pekko-kryo-serialization-akka-compat/src/main/scala/akka/actor/typed/ActorRef.scala create mode 100644 pekko-kryo-serialization-akka-compat/src/main/scala/akka/util/ByteString.scala create mode 100644 pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/AkkaCompatKryoInitializer.scala create mode 100644 pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/TypedAkkaCompatKryoInitializer.scala create mode 100644 pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/serializer/CompatActorRefSerializer.scala create mode 100644 pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/serializer/CompatTypedActorRefSerializer.scala create mode 100644 pekko-kryo-serialization-akka-compat/src/test/scala/io/altoo/pekko/serialization/kryo/compat/AkkaCompatSerializerTest.scala create mode 100644 pekko-kryo-serialization-akka-compat/src/test/scala/io/altoo/testing/SampleMessage.scala diff --git a/build.sbt b/build.sbt index 83b6988..0abe63b 100644 --- a/build.sbt +++ b/build.sbt @@ -26,7 +26,7 @@ lazy val root: Project = project.in(file(".")) .settings(publish / skip := true) .settings(OsgiKeys.privatePackage := Nil) .settings(OsgiKeys.exportPackage := Seq("io.altoo.*")) - .aggregate(core, typed) + .aggregate(core, typed, akkaCompat) lazy val core: Project = Project("pekko-kryo-serialization", file("pekko-kryo-serialization")) .settings(moduleSettings) @@ -53,6 +53,12 @@ lazy val typed: Project = Project("pekko-kryo-serialization-typed", file("pekko- .settings(libraryDependencies ++= typedDeps ++ testingDeps) .dependsOn(core) +lazy val akkaCompat: Project = Project("pekko-kryo-serialization-akka-compat", file("pekko-kryo-serialization-akka-compat")) + .settings(moduleSettings) + .settings(description := "pekko-serialization implementation using kryo - extension for improved wire compatibility with Akka") + .settings(libraryDependencies ++= testingDeps) + .dependsOn(core, typed) + // Dependencies lazy val coreDeps = Seq( "io.altoo" %% "scala-kryo-serialization" % scalaKryoVersion, @@ -61,6 +67,7 @@ lazy val coreDeps = Seq( "org.lz4" % "lz4-java" % "1.8.0", "commons-io" % "commons-io" % "2.11.0" % Test, "org.scala-lang.modules" %% "scala-collection-compat" % "2.9.0") + lazy val typedDeps = Seq( "org.apache.pekko" %% "pekko-actor-typed" % pekkoVersion, "org.apache.pekko" %% "pekko-actor-testkit-typed" % pekkoVersion % Test) diff --git a/pekko-kryo-serialization-akka-compat/src/main/scala/akka/actor/ActorRef.scala b/pekko-kryo-serialization-akka-compat/src/main/scala/akka/actor/ActorRef.scala new file mode 100644 index 0000000..b93fd56 --- /dev/null +++ b/pekko-kryo-serialization-akka-compat/src/main/scala/akka/actor/ActorRef.scala @@ -0,0 +1,7 @@ +package akka.actor + +/** + * Dummy class to register a serializer for akka.actor.ActorRef on Pekko system + */ +class ActorRef +class RepointableActorRef diff --git a/pekko-kryo-serialization-akka-compat/src/main/scala/akka/actor/typed/ActorRef.scala b/pekko-kryo-serialization-akka-compat/src/main/scala/akka/actor/typed/ActorRef.scala new file mode 100644 index 0000000..5207d91 --- /dev/null +++ b/pekko-kryo-serialization-akka-compat/src/main/scala/akka/actor/typed/ActorRef.scala @@ -0,0 +1,6 @@ +package akka.actor.typed + +/** + * Dummy class to register a serializer for akka.actor.typed.ActorRef on Pekko system + */ +class ActorRef[-T] diff --git a/pekko-kryo-serialization-akka-compat/src/main/scala/akka/util/ByteString.scala b/pekko-kryo-serialization-akka-compat/src/main/scala/akka/util/ByteString.scala new file mode 100644 index 0000000..aad2850 --- /dev/null +++ b/pekko-kryo-serialization-akka-compat/src/main/scala/akka/util/ByteString.scala @@ -0,0 +1,7 @@ +package akka.util + + +/** + * Dummy class to register a serializer for akka.util.ByteString on Pekko system + */ +class ByteString diff --git a/pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/AkkaCompatKryoInitializer.scala b/pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/AkkaCompatKryoInitializer.scala new file mode 100644 index 0000000..715ad78 --- /dev/null +++ b/pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/AkkaCompatKryoInitializer.scala @@ -0,0 +1,19 @@ +package io.altoo.pekko.serialization.kryo.compat + +import io.altoo.pekko.serialization.kryo.compat.serializer.CompatActorRefSerializer +import io.altoo.serialization.kryo.pekko.DefaultKryoInitializer +import io.altoo.serialization.kryo.pekko.serializer.ByteStringSerializer +import io.altoo.serialization.kryo.scala.serializer.ScalaKryo + +class AkkaCompatKryoInitializer extends DefaultKryoInitializer { + + override protected def initPekkoSerializer(kryo: ScalaKryo): Unit = { + super.initPekkoSerializer(kryo) + + // registering dummy Akka ActorRef to provide wire compatibility + kryo.addDefaultSerializer(classOf[akka.actor.ActorRef], new CompatActorRefSerializer(system)) + kryo.addDefaultSerializer(classOf[akka.actor.RepointableActorRef], new CompatActorRefSerializer(system)) + // registering dummy Akka ByteString to provide wire compatibility + kryo.addDefaultSerializer(classOf[akka.util.ByteString], classOf[ByteStringSerializer]) + } +} diff --git a/pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/TypedAkkaCompatKryoInitializer.scala b/pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/TypedAkkaCompatKryoInitializer.scala new file mode 100644 index 0000000..9f2c8a2 --- /dev/null +++ b/pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/TypedAkkaCompatKryoInitializer.scala @@ -0,0 +1,26 @@ +package io.altoo.pekko.serialization.kryo.compat + +import io.altoo.pekko.serialization.kryo.compat.serializer.{CompatActorRefSerializer, CompatTypedActorRefSerializer} +import io.altoo.serialization.kryo.pekko.serializer.ByteStringSerializer +import io.altoo.serialization.kryo.pekko.typed.TypedKryoInitializer +import io.altoo.serialization.kryo.scala.serializer.ScalaKryo + +class TypedAkkaCompatKryoInitializer extends TypedKryoInitializer { + + override protected def initPekkoSerializer(kryo: ScalaKryo): Unit = { + super.initPekkoSerializer(kryo) + + // registering dummy Akka ActorRef to provide wire compatibility + kryo.addDefaultSerializer(classOf[akka.actor.ActorRef], new CompatActorRefSerializer(system)) + kryo.addDefaultSerializer(classOf[akka.actor.RepointableActorRef], new CompatActorRefSerializer(system)) + // registering dummy Akka ByteString to provide wire compatibility + kryo.addDefaultSerializer(classOf[akka.util.ByteString], classOf[ByteStringSerializer]) + } + + override protected def initPekkoTypedSerializer(kryo: ScalaKryo): Unit = { + super.initPekkoTypedSerializer(kryo) + + // registering dummy Akka ActorRef to provide wire compatibility + kryo.addDefaultSerializer(classOf[akka.actor.typed.ActorRef[Nothing]], new CompatTypedActorRefSerializer(typedSystem)) + } +} diff --git a/pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/serializer/CompatActorRefSerializer.scala b/pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/serializer/CompatActorRefSerializer.scala new file mode 100644 index 0000000..089b1c8 --- /dev/null +++ b/pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/serializer/CompatActorRefSerializer.scala @@ -0,0 +1,42 @@ +/** + * ***************************************************************************** + * Copyright 2012 Roman Levenstein + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * **************************************************************************** + */ + +package io.altoo.pekko.serialization.kryo.compat.serializer + +import com.esotericsoftware.kryo.io.{Input, Output} +import com.esotericsoftware.kryo.{Kryo, Serializer} +import org.apache.pekko.actor.{ActorRef, ExtendedActorSystem} +import org.apache.pekko.serialization.Serialization + +/** + * Specialized serializer for actor refs. + * + * @author Roman Levenstein + */ +class CompatActorRefSerializer(val system: ExtendedActorSystem) extends Serializer[ActorRef] { + + override def read(kryo: Kryo, input: Input, typ: Class[? <: ActorRef]): ActorRef = { + val path = input.readString() + val newPath = path.replace("akka://", "pekko://") + system.provider.resolveActorRef(newPath) + } + + override def write(kryo: Kryo, output: Output, obj: ActorRef): Unit = { + output.writeAscii(Serialization.serializedActorPath(obj)) + } +} diff --git a/pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/serializer/CompatTypedActorRefSerializer.scala b/pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/serializer/CompatTypedActorRefSerializer.scala new file mode 100644 index 0000000..4b6086f --- /dev/null +++ b/pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/serializer/CompatTypedActorRefSerializer.scala @@ -0,0 +1,43 @@ +/** + * ***************************************************************************** + * Copyright 2012 Roman Levenstein + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * **************************************************************************** + */ + +package io.altoo.pekko.serialization.kryo.compat.serializer + +import com.esotericsoftware.kryo.io.{Input, Output} +import com.esotericsoftware.kryo.{Kryo, Serializer} +import org.apache.pekko.actor.typed.* + +/** + * Specialized serializer for typed actor refs. + * + * @author Arman Bilge + */ +class CompatTypedActorRefSerializer(val system: ActorSystem[Nothing]) extends Serializer[ActorRef[Nothing]] { + + private val resolver = ActorRefResolver(system) + + override def read(kryo: Kryo, input: Input, typ: Class[_ <: ActorRef[Nothing]]): ActorRef[Nothing] = { + val path = input.readString() + val newPath = path.replace("akka://", "pekko://") + resolver.resolveActorRef(newPath) + } + + override def write(kryo: Kryo, output: Output, obj: ActorRef[Nothing]): Unit = { + output.writeAscii(resolver.toSerializationFormat(obj)) + } +} diff --git a/pekko-kryo-serialization-akka-compat/src/test/scala/io/altoo/pekko/serialization/kryo/compat/AkkaCompatSerializerTest.scala b/pekko-kryo-serialization-akka-compat/src/test/scala/io/altoo/pekko/serialization/kryo/compat/AkkaCompatSerializerTest.scala new file mode 100644 index 0000000..66a0493 --- /dev/null +++ b/pekko-kryo-serialization-akka-compat/src/test/scala/io/altoo/pekko/serialization/kryo/compat/AkkaCompatSerializerTest.scala @@ -0,0 +1,66 @@ +package io.altoo.pekko.serialization.kryo.compat + +import com.typesafe.config.ConfigFactory +import io.altoo.serialization.kryo.pekko.PekkoKryoSerializer +import io.altoo.testing.SampleMessage +import org.apache.pekko.actor.{Actor, ActorSystem, Props} +import org.apache.pekko.serialization.SerializationExtension +import org.apache.pekko.testkit.TestKit +import org.scalatest.flatspec.AnyFlatSpecLike +import org.scalatest.matchers.should.Matchers +import org.scalatest.{BeforeAndAfterAll, Inside} + +object AkkaCompatSerializerTest { + private val testConfig = + """ + |pekko { + | actor { + | serializers { + | kryo = "io.altoo.serialization.kryo.pekko.PekkoKryoSerializer" + | } + | serialization-bindings { + | "org.apache.pekko.actor.ActorRef" = kryo + | "akka.actor.ActorRef" = kryo + | "akka.actor.ActorRef" = kryo + | "io.altoo.testing.SampleMessage" = kryo + | } + | } + |} + |pekko-kryo-serialization { + | trace = true + | id-strategy = "default" + | implicit-registration-logging = true + | post-serialization-transformations = off + | + | kryo-initializer = "io.altoo.pekko.serialization.kryo.compat.AkkaCompatKryoInitializer" + |} + |""".stripMargin + + // serialized io.altoo.testing.SampleMessage(actorRef: akka.actor.ActorRef) with akka-kryo-serialization + private val akkaActorRefSerialized = Array[Byte](1, 0, 105, 111, 46, 97, 108, 116, 111, 111, 46, 116, 101, 115, 116, 105, 110, 103, 46, 83, 97, 109, 112, 108, 101, 77, 101, 115, 115, 97, + 103, -27, 1, 1, 1, 97, 107, 107, 97, 46, 97, 99, 116, 111, 114, 46, 82, 101, 112, 111, 105, 110, 116, 97, 98, 108, 101, 65, 99, 116, 111, 114, 82, 101, -26, 1, 97, 107, 107, 97, 58, 47, + 47, 116, 101, 115, 116, 83, 121, 115, 116, 101, 109, 47, 117, 115, 101, 114, 47, 115, 97, 109, 112, 108, 101, 65, 99, 116, 111, 114, 35, 49, 53, 56, 51, 48, 57, 56, 56, 51, -75) +} + +class AkkaCompatSerializerTest extends TestKit(ActorSystem("testSystem", ConfigFactory.parseString(AkkaCompatSerializerTest.testConfig).withFallback(ConfigFactory.load()))) + with AnyFlatSpecLike with Matchers with Inside with BeforeAndAfterAll { + + private val serialization = SerializationExtension(system) + + override protected def afterAll(): Unit = shutdown(system) + + behavior of "ActorRefSerializer" + + it should "serialize and deserialize actorRef" in { + // create actor with path to not get deadLetter ref + system.actorOf(Props(new Actor { def receive: Receive = PartialFunction.empty }), "sampleActor") + + val serializer = serialization.serializerFor(classOf[SampleMessage]) + serializer shouldBe a[PekkoKryoSerializer] + + // deserialize + val deserialized = serializer.fromBinary(AkkaCompatSerializerTest.akkaActorRefSerialized) + deserialized shouldBe a[SampleMessage] + deserialized.asInstanceOf[SampleMessage].actorRef.path.toString shouldBe "pekko://testSystem/user/sampleActor" + } +} diff --git a/pekko-kryo-serialization-akka-compat/src/test/scala/io/altoo/testing/SampleMessage.scala b/pekko-kryo-serialization-akka-compat/src/test/scala/io/altoo/testing/SampleMessage.scala new file mode 100644 index 0000000..161d3f6 --- /dev/null +++ b/pekko-kryo-serialization-akka-compat/src/test/scala/io/altoo/testing/SampleMessage.scala @@ -0,0 +1,6 @@ +package io.altoo.testing + +import org.apache.pekko.actor.ActorRef + +// Mirror class using Pekko ActorRef instead of Akka ActorRef +case class SampleMessage(actorRef: ActorRef) extends Serializable diff --git a/pekko-kryo-serialization/src/main/scala/io/altoo/serialization/kryo/pekko/serializer/ActorRefSerializer.scala b/pekko-kryo-serialization/src/main/scala/io/altoo/serialization/kryo/pekko/serializer/ActorRefSerializer.scala index 5705aeb..5660647 100644 --- a/pekko-kryo-serialization/src/main/scala/io/altoo/serialization/kryo/pekko/serializer/ActorRefSerializer.scala +++ b/pekko-kryo-serialization/src/main/scala/io/altoo/serialization/kryo/pekko/serializer/ActorRefSerializer.scala @@ -30,7 +30,7 @@ import com.esotericsoftware.kryo.{Kryo, Serializer} */ class ActorRefSerializer(val system: ExtendedActorSystem) extends Serializer[ActorRef] { - override def read(kryo: Kryo, input: Input, typ: Class[_ <: ActorRef]): ActorRef = { + override def read(kryo: Kryo, input: Input, typ: Class[? <: ActorRef]): ActorRef = { val path = input.readString() system.provider.resolveActorRef(path) } From 9534736a56451bab9dd8b047d1b47a7747b6c203 Mon Sep 17 00:00:00 2001 From: Nicolas Vollmar Date: Mon, 16 Oct 2023 14:54:32 +0200 Subject: [PATCH 2/2] Extend migration documentation --- migration-guide.md | 18 ++++++++++++++++-- .../kryo/compat/AkkaCompatSerializerTest.scala | 2 +- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/migration-guide.md b/migration-guide.md index 16fad71..70b3d9c 100644 --- a/migration-guide.md +++ b/migration-guide.md @@ -4,5 +4,19 @@ pekko-kryo-serialization - migration guide Migration from akka-kryo-serialization to pekko-kryo-serialization ----------------------------- * You should upgrade to akka-kryo-serialization 2.5.0 before migrating to pekko-kryo-serialization and perform the respective [migrations](https://github.com/altoo-ag/akka-kryo-serialization/blob/master/migration-guide.md). -* TODO - incorporate pekko migration guide... - +* To support efforts for live migration from Akka to Pekko, compat modules are available in both Akka and Pekko Kryo Serialization to help with wire compatibility of custom messages containing ActorRefs and ByteStrings. + ``` + # on Pekko + libraryDependencies += "io.altoo" %% "pekko-kryo-serialization-akka-compat" % "1.0.1" + + # on Akka + libraryDependencies += "io.altoo" %% "pekko-kryo-serialization-akka-compat" % "2.5.2" + ``` + Then configure (or derive from if using a custom initializer) `AkkaCompatKryoInitializer` on Pekko, and `PekkoCompatKryoInitializer` on Akka. + ``` + # on Pekko + pekko-kryo-serialization.kryo-initializer = "io.altoo.pekko.serialization.kryo.compat.AkkaCompatKryoInitializer" + + # on Akka + kka-kryo-serialization.kryo-initializer = "io.altoo.akka.serialization.kryo.compat.PekkoCompatKryoInitializer" + ``` diff --git a/pekko-kryo-serialization-akka-compat/src/test/scala/io/altoo/pekko/serialization/kryo/compat/AkkaCompatSerializerTest.scala b/pekko-kryo-serialization-akka-compat/src/test/scala/io/altoo/pekko/serialization/kryo/compat/AkkaCompatSerializerTest.scala index 66a0493..aff8043 100644 --- a/pekko-kryo-serialization-akka-compat/src/test/scala/io/altoo/pekko/serialization/kryo/compat/AkkaCompatSerializerTest.scala +++ b/pekko-kryo-serialization-akka-compat/src/test/scala/io/altoo/pekko/serialization/kryo/compat/AkkaCompatSerializerTest.scala @@ -51,7 +51,7 @@ class AkkaCompatSerializerTest extends TestKit(ActorSystem("testSystem", ConfigF behavior of "ActorRefSerializer" - it should "serialize and deserialize actorRef" in { + it should "deserialize actorRef from Akka" in { // create actor with path to not get deadLetter ref system.actorOf(Props(new Actor { def receive: Receive = PartialFunction.empty }), "sampleActor")