Skip to content

Commit

Permalink
Add compat module for wire compatibility with Akka
Browse files Browse the repository at this point in the history
  • Loading branch information
nvollmar committed Oct 16, 2023
1 parent df9525e commit 88968d2
Show file tree
Hide file tree
Showing 11 changed files with 231 additions and 2 deletions.
9 changes: 8 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ lazy val root: Project = project.in(file("."))
.settings(publish / skip := true)
.settings(OsgiKeys.privatePackage := Nil)
.settings(OsgiKeys.exportPackage := Seq("io.altoo.*"))
.aggregate(core, typed)
.aggregate(core, typed, akkaCompat)

lazy val core: Project = Project("pekko-kryo-serialization", file("pekko-kryo-serialization"))
.settings(moduleSettings)
Expand All @@ -53,6 +53,12 @@ lazy val typed: Project = Project("pekko-kryo-serialization-typed", file("pekko-
.settings(libraryDependencies ++= typedDeps ++ testingDeps)
.dependsOn(core)

lazy val akkaCompat: Project = Project("pekko-kryo-serialization-akka-compat", file("pekko-kryo-serialization-akka-compat"))
.settings(moduleSettings)
.settings(description := "pekko-serialization implementation using kryo - extension for improved wire compatibility with Akka")
.settings(libraryDependencies ++= testingDeps)
.dependsOn(core, typed)

// Dependencies
lazy val coreDeps = Seq(
"io.altoo" %% "scala-kryo-serialization" % scalaKryoVersion,
Expand All @@ -61,6 +67,7 @@ lazy val coreDeps = Seq(
"org.lz4" % "lz4-java" % "1.8.0",
"commons-io" % "commons-io" % "2.11.0" % Test,
"org.scala-lang.modules" %% "scala-collection-compat" % "2.9.0")

lazy val typedDeps = Seq(
"org.apache.pekko" %% "pekko-actor-typed" % pekkoVersion,
"org.apache.pekko" %% "pekko-actor-testkit-typed" % pekkoVersion % Test)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package akka.actor

/**
* Dummy class to register a serializer for akka.actor.ActorRef on Pekko system
*/
class ActorRef
class RepointableActorRef
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package akka.actor.typed

/**
* Dummy class to register a serializer for akka.actor.typed.ActorRef on Pekko system
*/
class ActorRef[-T]
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package akka.util


/**
* Dummy class to register a serializer for akka.util.ByteString on Pekko system
*/
class ByteString
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.altoo.pekko.serialization.kryo.compat

import io.altoo.pekko.serialization.kryo.compat.serializer.CompatActorRefSerializer
import io.altoo.serialization.kryo.pekko.DefaultKryoInitializer
import io.altoo.serialization.kryo.pekko.serializer.ByteStringSerializer
import io.altoo.serialization.kryo.scala.serializer.ScalaKryo

class AkkaCompatKryoInitializer extends DefaultKryoInitializer {

override protected def initPekkoSerializer(kryo: ScalaKryo): Unit = {
super.initPekkoSerializer(kryo)

// registering dummy Akka ActorRef to provide wire compatibility
kryo.addDefaultSerializer(classOf[akka.actor.ActorRef], new CompatActorRefSerializer(system))
kryo.addDefaultSerializer(classOf[akka.actor.RepointableActorRef], new CompatActorRefSerializer(system))
// registering dummy Akka ByteString to provide wire compatibility
kryo.addDefaultSerializer(classOf[akka.util.ByteString], classOf[ByteStringSerializer])
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.altoo.pekko.serialization.kryo.compat

import io.altoo.pekko.serialization.kryo.compat.serializer.{CompatActorRefSerializer, CompatTypedActorRefSerializer}
import io.altoo.serialization.kryo.pekko.serializer.ByteStringSerializer
import io.altoo.serialization.kryo.pekko.typed.TypedKryoInitializer
import io.altoo.serialization.kryo.scala.serializer.ScalaKryo

class TypedAkkaCompatKryoInitializer extends TypedKryoInitializer {

override protected def initPekkoSerializer(kryo: ScalaKryo): Unit = {
super.initPekkoSerializer(kryo)

// registering dummy Akka ActorRef to provide wire compatibility
kryo.addDefaultSerializer(classOf[akka.actor.ActorRef], new CompatActorRefSerializer(system))
kryo.addDefaultSerializer(classOf[akka.actor.RepointableActorRef], new CompatActorRefSerializer(system))
// registering dummy Akka ByteString to provide wire compatibility
kryo.addDefaultSerializer(classOf[akka.util.ByteString], classOf[ByteStringSerializer])
}

override protected def initPekkoTypedSerializer(kryo: ScalaKryo): Unit = {
super.initPekkoTypedSerializer(kryo)

// registering dummy Akka ActorRef to provide wire compatibility
kryo.addDefaultSerializer(classOf[akka.actor.typed.ActorRef[Nothing]], new CompatTypedActorRefSerializer(typedSystem))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* *****************************************************************************
* Copyright 2012 Roman Levenstein
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ****************************************************************************
*/

package io.altoo.pekko.serialization.kryo.compat.serializer

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import org.apache.pekko.actor.{ActorRef, ExtendedActorSystem}
import org.apache.pekko.serialization.Serialization

/**
* Specialized serializer for actor refs.
*
* @author Roman Levenstein
*/
class CompatActorRefSerializer(val system: ExtendedActorSystem) extends Serializer[ActorRef] {

override def read(kryo: Kryo, input: Input, typ: Class[? <: ActorRef]): ActorRef = {
val path = input.readString()
val newPath = path.replace("akka://", "pekko://")
system.provider.resolveActorRef(newPath)
}

override def write(kryo: Kryo, output: Output, obj: ActorRef): Unit = {
output.writeAscii(Serialization.serializedActorPath(obj))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* *****************************************************************************
* Copyright 2012 Roman Levenstein
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ****************************************************************************
*/

package io.altoo.pekko.serialization.kryo.compat.serializer

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import org.apache.pekko.actor.typed.*

/**
* Specialized serializer for typed actor refs.
*
* @author Arman Bilge
*/
class CompatTypedActorRefSerializer(val system: ActorSystem[Nothing]) extends Serializer[ActorRef[Nothing]] {

private val resolver = ActorRefResolver(system)

override def read(kryo: Kryo, input: Input, typ: Class[_ <: ActorRef[Nothing]]): ActorRef[Nothing] = {
val path = input.readString()
val newPath = path.replace("akka://", "pekko://")
resolver.resolveActorRef(newPath)
}

override def write(kryo: Kryo, output: Output, obj: ActorRef[Nothing]): Unit = {
output.writeAscii(resolver.toSerializationFormat(obj))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package io.altoo.pekko.serialization.kryo.compat

import com.typesafe.config.ConfigFactory
import io.altoo.serialization.kryo.pekko.PekkoKryoSerializer
import io.altoo.testing.SampleMessage
import org.apache.pekko.actor.{Actor, ActorSystem, Props}
import org.apache.pekko.serialization.SerializationExtension
import org.apache.pekko.testkit.TestKit
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfterAll, Inside}

object AkkaCompatSerializerTest {
private val testConfig =
"""
|pekko {
| actor {
| serializers {
| kryo = "io.altoo.serialization.kryo.pekko.PekkoKryoSerializer"
| }
| serialization-bindings {
| "org.apache.pekko.actor.ActorRef" = kryo
| "akka.actor.ActorRef" = kryo
| "akka.actor.ActorRef" = kryo
| "io.altoo.testing.SampleMessage" = kryo
| }
| }
|}
|pekko-kryo-serialization {
| trace = true
| id-strategy = "default"
| implicit-registration-logging = true
| post-serialization-transformations = off
|
| kryo-initializer = "io.altoo.pekko.serialization.kryo.compat.AkkaCompatKryoInitializer"
|}
|""".stripMargin

// serialized io.altoo.testing.SampleMessage(actorRef: akka.actor.ActorRef) with akka-kryo-serialization
private val akkaActorRefSerialized = Array[Byte](1, 0, 105, 111, 46, 97, 108, 116, 111, 111, 46, 116, 101, 115, 116, 105, 110, 103, 46, 83, 97, 109, 112, 108, 101, 77, 101, 115, 115, 97,
103, -27, 1, 1, 1, 97, 107, 107, 97, 46, 97, 99, 116, 111, 114, 46, 82, 101, 112, 111, 105, 110, 116, 97, 98, 108, 101, 65, 99, 116, 111, 114, 82, 101, -26, 1, 97, 107, 107, 97, 58, 47,
47, 116, 101, 115, 116, 83, 121, 115, 116, 101, 109, 47, 117, 115, 101, 114, 47, 115, 97, 109, 112, 108, 101, 65, 99, 116, 111, 114, 35, 49, 53, 56, 51, 48, 57, 56, 56, 51, -75)
}

class AkkaCompatSerializerTest extends TestKit(ActorSystem("testSystem", ConfigFactory.parseString(AkkaCompatSerializerTest.testConfig).withFallback(ConfigFactory.load())))
with AnyFlatSpecLike with Matchers with Inside with BeforeAndAfterAll {

private val serialization = SerializationExtension(system)

override protected def afterAll(): Unit = shutdown(system)

behavior of "ActorRefSerializer"

it should "serialize and deserialize actorRef" in {
// create actor with path to not get deadLetter ref
system.actorOf(Props(new Actor { def receive: Receive = PartialFunction.empty }), "sampleActor")

val serializer = serialization.serializerFor(classOf[SampleMessage])
serializer shouldBe a[PekkoKryoSerializer]

// deserialize
val deserialized = serializer.fromBinary(AkkaCompatSerializerTest.akkaActorRefSerialized)
deserialized shouldBe a[SampleMessage]
deserialized.asInstanceOf[SampleMessage].actorRef.path.toString shouldBe "pekko://testSystem/user/sampleActor"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.altoo.testing

import org.apache.pekko.actor.ActorRef

// Mirror class using Pekko ActorRef instead of Akka ActorRef
case class SampleMessage(actorRef: ActorRef) extends Serializable
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import com.esotericsoftware.kryo.{Kryo, Serializer}
*/
class ActorRefSerializer(val system: ExtendedActorSystem) extends Serializer[ActorRef] {

override def read(kryo: Kryo, input: Input, typ: Class[_ <: ActorRef]): ActorRef = {
override def read(kryo: Kryo, input: Input, typ: Class[? <: ActorRef]): ActorRef = {
val path = input.readString()
system.provider.resolveActorRef(path)
}
Expand Down

0 comments on commit 88968d2

Please sign in to comment.