Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add compat module for wire compatibility with Akka #13

Merged
merged 2 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ lazy val root: Project = project.in(file("."))
.settings(publish / skip := true)
.settings(OsgiKeys.privatePackage := Nil)
.settings(OsgiKeys.exportPackage := Seq("io.altoo.*"))
.aggregate(core, typed)
.aggregate(core, typed, akkaCompat)

lazy val core: Project = Project("pekko-kryo-serialization", file("pekko-kryo-serialization"))
.settings(moduleSettings)
Expand All @@ -53,6 +53,12 @@ lazy val typed: Project = Project("pekko-kryo-serialization-typed", file("pekko-
.settings(libraryDependencies ++= typedDeps ++ testingDeps)
.dependsOn(core)

lazy val akkaCompat: Project = Project("pekko-kryo-serialization-akka-compat", file("pekko-kryo-serialization-akka-compat"))
.settings(moduleSettings)
.settings(description := "pekko-serialization implementation using kryo - extension for improved wire compatibility with Akka")
.settings(libraryDependencies ++= testingDeps)
.dependsOn(core, typed)

// Dependencies
lazy val coreDeps = Seq(
"io.altoo" %% "scala-kryo-serialization" % scalaKryoVersion,
Expand All @@ -61,6 +67,7 @@ lazy val coreDeps = Seq(
"org.lz4" % "lz4-java" % "1.8.0",
"commons-io" % "commons-io" % "2.11.0" % Test,
"org.scala-lang.modules" %% "scala-collection-compat" % "2.9.0")

lazy val typedDeps = Seq(
"org.apache.pekko" %% "pekko-actor-typed" % pekkoVersion,
"org.apache.pekko" %% "pekko-actor-testkit-typed" % pekkoVersion % Test)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package akka.actor

/**
* Dummy class to register a serializer for akka.actor.ActorRef on Pekko system
*/
class ActorRef
class RepointableActorRef
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package akka.actor.typed

/**
* Dummy class to register a serializer for akka.actor.typed.ActorRef on Pekko system
*/
class ActorRef[-T]
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package akka.util


/**
* Dummy class to register a serializer for akka.util.ByteString on Pekko system
*/
class ByteString
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.altoo.pekko.serialization.kryo.compat

import io.altoo.pekko.serialization.kryo.compat.serializer.CompatActorRefSerializer
import io.altoo.serialization.kryo.pekko.DefaultKryoInitializer
import io.altoo.serialization.kryo.pekko.serializer.ByteStringSerializer
import io.altoo.serialization.kryo.scala.serializer.ScalaKryo

class AkkaCompatKryoInitializer extends DefaultKryoInitializer {

override protected def initPekkoSerializer(kryo: ScalaKryo): Unit = {
super.initPekkoSerializer(kryo)

// registering dummy Akka ActorRef to provide wire compatibility
kryo.addDefaultSerializer(classOf[akka.actor.ActorRef], new CompatActorRefSerializer(system))
kryo.addDefaultSerializer(classOf[akka.actor.RepointableActorRef], new CompatActorRefSerializer(system))
// registering dummy Akka ByteString to provide wire compatibility
kryo.addDefaultSerializer(classOf[akka.util.ByteString], classOf[ByteStringSerializer])
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.altoo.pekko.serialization.kryo.compat

import io.altoo.pekko.serialization.kryo.compat.serializer.{CompatActorRefSerializer, CompatTypedActorRefSerializer}
import io.altoo.serialization.kryo.pekko.serializer.ByteStringSerializer
import io.altoo.serialization.kryo.pekko.typed.TypedKryoInitializer
import io.altoo.serialization.kryo.scala.serializer.ScalaKryo

class TypedAkkaCompatKryoInitializer extends TypedKryoInitializer {

override protected def initPekkoSerializer(kryo: ScalaKryo): Unit = {
super.initPekkoSerializer(kryo)

// registering dummy Akka ActorRef to provide wire compatibility
kryo.addDefaultSerializer(classOf[akka.actor.ActorRef], new CompatActorRefSerializer(system))
kryo.addDefaultSerializer(classOf[akka.actor.RepointableActorRef], new CompatActorRefSerializer(system))
// registering dummy Akka ByteString to provide wire compatibility
kryo.addDefaultSerializer(classOf[akka.util.ByteString], classOf[ByteStringSerializer])
}

override protected def initPekkoTypedSerializer(kryo: ScalaKryo): Unit = {
super.initPekkoTypedSerializer(kryo)

// registering dummy Akka ActorRef to provide wire compatibility
kryo.addDefaultSerializer(classOf[akka.actor.typed.ActorRef[Nothing]], new CompatTypedActorRefSerializer(typedSystem))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* *****************************************************************************
* Copyright 2012 Roman Levenstein
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ****************************************************************************
*/

package io.altoo.pekko.serialization.kryo.compat.serializer

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import org.apache.pekko.actor.{ActorRef, ExtendedActorSystem}
import org.apache.pekko.serialization.Serialization

/**
* Specialized serializer for actor refs.
*
* @author Roman Levenstein
*/
class CompatActorRefSerializer(val system: ExtendedActorSystem) extends Serializer[ActorRef] {

override def read(kryo: Kryo, input: Input, typ: Class[? <: ActorRef]): ActorRef = {
val path = input.readString()
val newPath = path.replace("akka://", "pekko://")
system.provider.resolveActorRef(newPath)
}

override def write(kryo: Kryo, output: Output, obj: ActorRef): Unit = {
output.writeAscii(Serialization.serializedActorPath(obj))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* *****************************************************************************
* Copyright 2012 Roman Levenstein
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ****************************************************************************
*/

package io.altoo.pekko.serialization.kryo.compat.serializer

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import org.apache.pekko.actor.typed.*

/**
* Specialized serializer for typed actor refs.
*
* @author Arman Bilge
*/
class CompatTypedActorRefSerializer(val system: ActorSystem[Nothing]) extends Serializer[ActorRef[Nothing]] {

private val resolver = ActorRefResolver(system)

override def read(kryo: Kryo, input: Input, typ: Class[_ <: ActorRef[Nothing]]): ActorRef[Nothing] = {
val path = input.readString()
val newPath = path.replace("akka://", "pekko://")
resolver.resolveActorRef(newPath)
}

override def write(kryo: Kryo, output: Output, obj: ActorRef[Nothing]): Unit = {
output.writeAscii(resolver.toSerializationFormat(obj))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package io.altoo.pekko.serialization.kryo.compat

import com.typesafe.config.ConfigFactory
import io.altoo.serialization.kryo.pekko.PekkoKryoSerializer
import io.altoo.testing.SampleMessage
import org.apache.pekko.actor.{Actor, ActorSystem, Props}
import org.apache.pekko.serialization.SerializationExtension
import org.apache.pekko.testkit.TestKit
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfterAll, Inside}

object AkkaCompatSerializerTest {
private val testConfig =
"""
|pekko {
| actor {
| serializers {
| kryo = "io.altoo.serialization.kryo.pekko.PekkoKryoSerializer"
| }
| serialization-bindings {
| "org.apache.pekko.actor.ActorRef" = kryo
| "akka.actor.ActorRef" = kryo
| "akka.actor.ActorRef" = kryo
| "io.altoo.testing.SampleMessage" = kryo
| }
| }
|}
|pekko-kryo-serialization {
| trace = true
| id-strategy = "default"
| implicit-registration-logging = true
| post-serialization-transformations = off
|
| kryo-initializer = "io.altoo.pekko.serialization.kryo.compat.AkkaCompatKryoInitializer"
|}
|""".stripMargin

// serialized io.altoo.testing.SampleMessage(actorRef: akka.actor.ActorRef) with akka-kryo-serialization
private val akkaActorRefSerialized = Array[Byte](1, 0, 105, 111, 46, 97, 108, 116, 111, 111, 46, 116, 101, 115, 116, 105, 110, 103, 46, 83, 97, 109, 112, 108, 101, 77, 101, 115, 115, 97,
103, -27, 1, 1, 1, 97, 107, 107, 97, 46, 97, 99, 116, 111, 114, 46, 82, 101, 112, 111, 105, 110, 116, 97, 98, 108, 101, 65, 99, 116, 111, 114, 82, 101, -26, 1, 97, 107, 107, 97, 58, 47,
47, 116, 101, 115, 116, 83, 121, 115, 116, 101, 109, 47, 117, 115, 101, 114, 47, 115, 97, 109, 112, 108, 101, 65, 99, 116, 111, 114, 35, 49, 53, 56, 51, 48, 57, 56, 56, 51, -75)
}

class AkkaCompatSerializerTest extends TestKit(ActorSystem("testSystem", ConfigFactory.parseString(AkkaCompatSerializerTest.testConfig).withFallback(ConfigFactory.load())))
with AnyFlatSpecLike with Matchers with Inside with BeforeAndAfterAll {

private val serialization = SerializationExtension(system)

override protected def afterAll(): Unit = shutdown(system)

behavior of "ActorRefSerializer"

it should "serialize and deserialize actorRef" in {
danischroeter marked this conversation as resolved.
Show resolved Hide resolved
// create actor with path to not get deadLetter ref
system.actorOf(Props(new Actor { def receive: Receive = PartialFunction.empty }), "sampleActor")

val serializer = serialization.serializerFor(classOf[SampleMessage])
serializer shouldBe a[PekkoKryoSerializer]

// deserialize
val deserialized = serializer.fromBinary(AkkaCompatSerializerTest.akkaActorRefSerialized)
deserialized shouldBe a[SampleMessage]
deserialized.asInstanceOf[SampleMessage].actorRef.path.toString shouldBe "pekko://testSystem/user/sampleActor"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.altoo.testing

import org.apache.pekko.actor.ActorRef

// Mirror class using Pekko ActorRef instead of Akka ActorRef
case class SampleMessage(actorRef: ActorRef) extends Serializable
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import com.esotericsoftware.kryo.{Kryo, Serializer}
*/
class ActorRefSerializer(val system: ExtendedActorSystem) extends Serializer[ActorRef] {

override def read(kryo: Kryo, input: Input, typ: Class[_ <: ActorRef]): ActorRef = {
override def read(kryo: Kryo, input: Input, typ: Class[? <: ActorRef]): ActorRef = {
val path = input.readString()
system.provider.resolveActorRef(path)
}
Expand Down
Loading