diff --git a/build.sbt b/build.sbt index 83b6988..0abe63b 100644 --- a/build.sbt +++ b/build.sbt @@ -26,7 +26,7 @@ lazy val root: Project = project.in(file(".")) .settings(publish / skip := true) .settings(OsgiKeys.privatePackage := Nil) .settings(OsgiKeys.exportPackage := Seq("io.altoo.*")) - .aggregate(core, typed) + .aggregate(core, typed, akkaCompat) lazy val core: Project = Project("pekko-kryo-serialization", file("pekko-kryo-serialization")) .settings(moduleSettings) @@ -53,6 +53,12 @@ lazy val typed: Project = Project("pekko-kryo-serialization-typed", file("pekko- .settings(libraryDependencies ++= typedDeps ++ testingDeps) .dependsOn(core) +lazy val akkaCompat: Project = Project("pekko-kryo-serialization-akka-compat", file("pekko-kryo-serialization-akka-compat")) + .settings(moduleSettings) + .settings(description := "pekko-serialization implementation using kryo - extension for improved wire compatibility with Akka") + .settings(libraryDependencies ++= testingDeps) + .dependsOn(core, typed) + // Dependencies lazy val coreDeps = Seq( "io.altoo" %% "scala-kryo-serialization" % scalaKryoVersion, @@ -61,6 +67,7 @@ lazy val coreDeps = Seq( "org.lz4" % "lz4-java" % "1.8.0", "commons-io" % "commons-io" % "2.11.0" % Test, "org.scala-lang.modules" %% "scala-collection-compat" % "2.9.0") + lazy val typedDeps = Seq( "org.apache.pekko" %% "pekko-actor-typed" % pekkoVersion, "org.apache.pekko" %% "pekko-actor-testkit-typed" % pekkoVersion % Test) diff --git a/pekko-kryo-serialization-akka-compat/src/main/scala/akka/actor/ActorRef.scala b/pekko-kryo-serialization-akka-compat/src/main/scala/akka/actor/ActorRef.scala new file mode 100644 index 0000000..b93fd56 --- /dev/null +++ b/pekko-kryo-serialization-akka-compat/src/main/scala/akka/actor/ActorRef.scala @@ -0,0 +1,7 @@ +package akka.actor + +/** + * Dummy class to register a serializer for akka.actor.ActorRef on Pekko system + */ +class ActorRef +class RepointableActorRef diff --git a/pekko-kryo-serialization-akka-compat/src/main/scala/akka/actor/typed/ActorRef.scala b/pekko-kryo-serialization-akka-compat/src/main/scala/akka/actor/typed/ActorRef.scala new file mode 100644 index 0000000..5207d91 --- /dev/null +++ b/pekko-kryo-serialization-akka-compat/src/main/scala/akka/actor/typed/ActorRef.scala @@ -0,0 +1,6 @@ +package akka.actor.typed + +/** + * Dummy class to register a serializer for akka.actor.typed.ActorRef on Pekko system + */ +class ActorRef[-T] diff --git a/pekko-kryo-serialization-akka-compat/src/main/scala/akka/util/ByteString.scala b/pekko-kryo-serialization-akka-compat/src/main/scala/akka/util/ByteString.scala new file mode 100644 index 0000000..aad2850 --- /dev/null +++ b/pekko-kryo-serialization-akka-compat/src/main/scala/akka/util/ByteString.scala @@ -0,0 +1,7 @@ +package akka.util + + +/** + * Dummy class to register a serializer for akka.util.ByteString on Pekko system + */ +class ByteString diff --git a/pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/AkkaCompatKryoInitializer.scala b/pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/AkkaCompatKryoInitializer.scala new file mode 100644 index 0000000..715ad78 --- /dev/null +++ b/pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/AkkaCompatKryoInitializer.scala @@ -0,0 +1,19 @@ +package io.altoo.pekko.serialization.kryo.compat + +import io.altoo.pekko.serialization.kryo.compat.serializer.CompatActorRefSerializer +import io.altoo.serialization.kryo.pekko.DefaultKryoInitializer +import io.altoo.serialization.kryo.pekko.serializer.ByteStringSerializer +import io.altoo.serialization.kryo.scala.serializer.ScalaKryo + +class AkkaCompatKryoInitializer extends DefaultKryoInitializer { + + override protected def initPekkoSerializer(kryo: ScalaKryo): Unit = { + super.initPekkoSerializer(kryo) + + // registering dummy Akka ActorRef to provide wire compatibility + kryo.addDefaultSerializer(classOf[akka.actor.ActorRef], new CompatActorRefSerializer(system)) + kryo.addDefaultSerializer(classOf[akka.actor.RepointableActorRef], new CompatActorRefSerializer(system)) + // registering dummy Akka ByteString to provide wire compatibility + kryo.addDefaultSerializer(classOf[akka.util.ByteString], classOf[ByteStringSerializer]) + } +} diff --git a/pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/TypedAkkaCompatKryoInitializer.scala b/pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/TypedAkkaCompatKryoInitializer.scala new file mode 100644 index 0000000..9f2c8a2 --- /dev/null +++ b/pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/TypedAkkaCompatKryoInitializer.scala @@ -0,0 +1,26 @@ +package io.altoo.pekko.serialization.kryo.compat + +import io.altoo.pekko.serialization.kryo.compat.serializer.{CompatActorRefSerializer, CompatTypedActorRefSerializer} +import io.altoo.serialization.kryo.pekko.serializer.ByteStringSerializer +import io.altoo.serialization.kryo.pekko.typed.TypedKryoInitializer +import io.altoo.serialization.kryo.scala.serializer.ScalaKryo + +class TypedAkkaCompatKryoInitializer extends TypedKryoInitializer { + + override protected def initPekkoSerializer(kryo: ScalaKryo): Unit = { + super.initPekkoSerializer(kryo) + + // registering dummy Akka ActorRef to provide wire compatibility + kryo.addDefaultSerializer(classOf[akka.actor.ActorRef], new CompatActorRefSerializer(system)) + kryo.addDefaultSerializer(classOf[akka.actor.RepointableActorRef], new CompatActorRefSerializer(system)) + // registering dummy Akka ByteString to provide wire compatibility + kryo.addDefaultSerializer(classOf[akka.util.ByteString], classOf[ByteStringSerializer]) + } + + override protected def initPekkoTypedSerializer(kryo: ScalaKryo): Unit = { + super.initPekkoTypedSerializer(kryo) + + // registering dummy Akka ActorRef to provide wire compatibility + kryo.addDefaultSerializer(classOf[akka.actor.typed.ActorRef[Nothing]], new CompatTypedActorRefSerializer(typedSystem)) + } +} diff --git a/pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/serializer/CompatActorRefSerializer.scala b/pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/serializer/CompatActorRefSerializer.scala new file mode 100644 index 0000000..089b1c8 --- /dev/null +++ b/pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/serializer/CompatActorRefSerializer.scala @@ -0,0 +1,42 @@ +/** + * ***************************************************************************** + * Copyright 2012 Roman Levenstein + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * **************************************************************************** + */ + +package io.altoo.pekko.serialization.kryo.compat.serializer + +import com.esotericsoftware.kryo.io.{Input, Output} +import com.esotericsoftware.kryo.{Kryo, Serializer} +import org.apache.pekko.actor.{ActorRef, ExtendedActorSystem} +import org.apache.pekko.serialization.Serialization + +/** + * Specialized serializer for actor refs. + * + * @author Roman Levenstein + */ +class CompatActorRefSerializer(val system: ExtendedActorSystem) extends Serializer[ActorRef] { + + override def read(kryo: Kryo, input: Input, typ: Class[? <: ActorRef]): ActorRef = { + val path = input.readString() + val newPath = path.replace("akka://", "pekko://") + system.provider.resolveActorRef(newPath) + } + + override def write(kryo: Kryo, output: Output, obj: ActorRef): Unit = { + output.writeAscii(Serialization.serializedActorPath(obj)) + } +} diff --git a/pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/serializer/CompatTypedActorRefSerializer.scala b/pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/serializer/CompatTypedActorRefSerializer.scala new file mode 100644 index 0000000..4b6086f --- /dev/null +++ b/pekko-kryo-serialization-akka-compat/src/main/scala/io/altoo/pekko/serialization/kryo/compat/serializer/CompatTypedActorRefSerializer.scala @@ -0,0 +1,43 @@ +/** + * ***************************************************************************** + * Copyright 2012 Roman Levenstein + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * **************************************************************************** + */ + +package io.altoo.pekko.serialization.kryo.compat.serializer + +import com.esotericsoftware.kryo.io.{Input, Output} +import com.esotericsoftware.kryo.{Kryo, Serializer} +import org.apache.pekko.actor.typed.* + +/** + * Specialized serializer for typed actor refs. + * + * @author Arman Bilge + */ +class CompatTypedActorRefSerializer(val system: ActorSystem[Nothing]) extends Serializer[ActorRef[Nothing]] { + + private val resolver = ActorRefResolver(system) + + override def read(kryo: Kryo, input: Input, typ: Class[_ <: ActorRef[Nothing]]): ActorRef[Nothing] = { + val path = input.readString() + val newPath = path.replace("akka://", "pekko://") + resolver.resolveActorRef(newPath) + } + + override def write(kryo: Kryo, output: Output, obj: ActorRef[Nothing]): Unit = { + output.writeAscii(resolver.toSerializationFormat(obj)) + } +} diff --git a/pekko-kryo-serialization-akka-compat/src/test/scala/io/altoo/pekko/serialization/kryo/compat/AkkaCompatSerializerTest.scala b/pekko-kryo-serialization-akka-compat/src/test/scala/io/altoo/pekko/serialization/kryo/compat/AkkaCompatSerializerTest.scala new file mode 100644 index 0000000..3e27b25 --- /dev/null +++ b/pekko-kryo-serialization-akka-compat/src/test/scala/io/altoo/pekko/serialization/kryo/compat/AkkaCompatSerializerTest.scala @@ -0,0 +1,80 @@ +package io.altoo.pekko.serialization.kryo.compat + +import com.typesafe.config.ConfigFactory +import io.altoo.serialization.kryo.pekko.PekkoKryoSerializer +import io.altoo.testing.SampleMessage +import org.apache.pekko.actor.{Actor, ActorSystem, Props} +import org.apache.pekko.serialization.SerializationExtension +import org.apache.pekko.testkit.TestKit +import org.scalatest.flatspec.AnyFlatSpecLike +import org.scalatest.matchers.should.Matchers +import org.scalatest.{BeforeAndAfterAll, Inside} + +object AkkaCompatSerializerTest { + private val testConfig = + """ + |pekko { + | actor { + | serializers { + | kryo = "io.altoo.serialization.kryo.pekko.PekkoKryoSerializer" + | } + | serialization-bindings { + | "org.apache.pekko.actor.ActorRef" = kryo + | "akka.actor.ActorRef" = kryo + | "akka.actor.ActorRef" = kryo + | "io.altoo.testing.SampleMessage" = kryo + | } + | } + |} + |pekko-kryo-serialization { + | trace = true + | id-strategy = "default" + | implicit-registration-logging = true + | post-serialization-transformations = off + | + | kryo-initializer = "io.altoo.pekko.serialization.kryo.compat.AkkaCompatKryoInitializer" + |} + |""".stripMargin + + // serialized io.altoo.testing.SampleMessage(actorRef: akka.actor.ActorRef) with akka-kryo-serialization + private val akkaActorRefSerialized = Array[Byte](1, 0, 105, 111, 46, 97, 108, 116, 111, 111, 46, 116, 101, 115, 116, 105, 110, 103, 46, 83, 97, 109, 112, 108, 101, 77, 101, 115, 115, 97, + 103, -27, 1, 1, 1, 97, 107, 107, 97, 46, 97, 99, 116, 111, 114, 46, 82, 101, 112, 111, 105, 110, 116, 97, 98, 108, 101, 65, 99, 116, 111, 114, 82, 101, -26, 1, 97, 107, 107, 97, 58, 47, + 47, 116, 101, 115, 116, 83, 121, 115, 116, 101, 109, 47, 117, 115, 101, 114, 47, 115, 97, 109, 112, 108, 101, 65, 99, 116, 111, 114, 35, 49, 53, 56, 51, 48, 57, 56, 56, 51, -75) + + // serialized io.altoo.testing.SampleMessage(actorRef: akka.actor.ActorRef) with akka-kryo-serialization + private val pekkoActorRefSerialized = Array[Byte](1, 0, 105, 111, 46, 97, 108, 116, 111, 111, 46, 116, 101, 115, 116, 105, 110, 103, 46, 83, 97, 109, 112, 108, 101, 77, 101, 115, 115, 97, + 103, -27, 1, 1, 1, 111, 114, 103, 46, 97, 112, 97, 99, 104, 101, 46, 112, 101, 107, 107, 111, 46, 97, 99, 116, 111, 114, 46, 82, 101, 112, 111, 105, 110, 116, 97, 98, 108, 101, 65, 99, + 116, 111, 114, 82, 101, -26, 1, 112, 101, 107, 107, 111, 58, 47, 47, 116, 101, 115, 116, 83, 121, 115, 116, 101, 109, 47, 117, 115, 101, 114, 47, 115, 97, 109, 112, 108, 101, 65, 99, 116, + 111, 114, 35, 56, 48, 52, 54, 54, 57, 49, 52, -79) + +} + +class AkkaCompatSerializerTest extends TestKit(ActorSystem("testSystem", ConfigFactory.parseString(AkkaCompatSerializerTest.testConfig).withFallback(ConfigFactory.load()))) + with AnyFlatSpecLike with Matchers with Inside with BeforeAndAfterAll { + + private val serialization = SerializationExtension(system) + + override protected def afterAll(): Unit = shutdown(system) + + behavior of "ActorRefSerializer" + + ignore should "serialize and deserialize actorRef" in { + // create actor with path to not get deadLetter ref + system.actorOf(Props(new Actor { def receive: Receive = PartialFunction.empty }), "sampleActor") + + val serializer = serialization.serializerFor(classOf[SampleMessage]) + serializer shouldBe a[PekkoKryoSerializer] + + // deserialize + val deserialized = serializer.fromBinary(AkkaCompatSerializerTest.akkaActorRefSerialized) + deserialized shouldBe a[SampleMessage] + deserialized.asInstanceOf[SampleMessage].actorRef.path.toString shouldBe "pekko://testSystem/user/sampleActor" + } + + it should "serialize" in { + val ref = system.actorOf(Props(new Actor { def receive: Receive = PartialFunction.empty }), "sampleActor") + val serializer = serialization.serializerFor(classOf[SampleMessage]) + serializer shouldBe a[PekkoKryoSerializer] + println(serializer.toBinary(SampleMessage(ref)).mkString("Array[Byte](", ", ", ")")) + } +} diff --git a/pekko-kryo-serialization-akka-compat/src/test/scala/io/altoo/testing/SampleMessage.scala b/pekko-kryo-serialization-akka-compat/src/test/scala/io/altoo/testing/SampleMessage.scala new file mode 100644 index 0000000..161d3f6 --- /dev/null +++ b/pekko-kryo-serialization-akka-compat/src/test/scala/io/altoo/testing/SampleMessage.scala @@ -0,0 +1,6 @@ +package io.altoo.testing + +import org.apache.pekko.actor.ActorRef + +// Mirror class using Pekko ActorRef instead of Akka ActorRef +case class SampleMessage(actorRef: ActorRef) extends Serializable diff --git a/pekko-kryo-serialization/src/main/scala/io/altoo/serialization/kryo/pekko/serializer/ActorRefSerializer.scala b/pekko-kryo-serialization/src/main/scala/io/altoo/serialization/kryo/pekko/serializer/ActorRefSerializer.scala index 5705aeb..5660647 100644 --- a/pekko-kryo-serialization/src/main/scala/io/altoo/serialization/kryo/pekko/serializer/ActorRefSerializer.scala +++ b/pekko-kryo-serialization/src/main/scala/io/altoo/serialization/kryo/pekko/serializer/ActorRefSerializer.scala @@ -30,7 +30,7 @@ import com.esotericsoftware.kryo.{Kryo, Serializer} */ class ActorRefSerializer(val system: ExtendedActorSystem) extends Serializer[ActorRef] { - override def read(kryo: Kryo, input: Input, typ: Class[_ <: ActorRef]): ActorRef = { + override def read(kryo: Kryo, input: Input, typ: Class[? <: ActorRef]): ActorRef = { val path = input.readString() system.provider.resolveActorRef(path) }