Small documentation with my first steps into the akka framework that's made possible use the Actor Model paradigm in the Scala Language
I need to made a multi-threading application in wich thoose threads have to send messages between them. I went trough multiple options like do it all my self using plain sockets and Threads, but the cost of maintenance of a development like that could be very high. After that I thought about a RPC aproach and I was almost decided, but i found that the akka framework has a great compatibility with the project Apache Kafka, wich i use as a message broker in that application, so i finally decided to use akka as a long term bet of using it with Apache Kafka Streams into a Future.
With this selection I'm aware that the cost of a framework like akka is higher than the cost of a RPC framework like Fineagle(Looks like a cool project made by Twitter).
So if you check the akka project webpage, you can see a really fancy definition of the framework, but to keep it simple, it's just a framework that made really easy for developers to develop distributed applications using the Actor model
I think that the best way to understand what is akka and how it works it doing a simple example, so let's develop and application in which we have 2 kinds of Actors, Producers and Consumers. The idea is that, the Producers will send a message through a socket that will be listening a Consumer Actor. Once the Consumer Actor recieves the data, it will print the message.
So with this example, we are going to learn how to send message to actor using akka and how to write and read from sockets,
Producer Consumer
* --------> *
The producer Actor that we are building it's an easy one. The Actor will do the following steps:
- Actor check if can stablish a connection to a socket
- Actor sends data trough the socket
- Actor close the connection
- Actor destroys itself
For this puspose, we are going to use Scala companion objects.
First of all we have to create the following Boilerplate in a file that I'm gonna name SocketProducer.scala
import java.net.InetSocketAddress
import akka.actor.{Actor, ActorLogging, Props}
import akka.io.{IO, Tcp}
import akka.util.ByteString
import scala.collection.mutable
object SocketProducer {
//Needed to create an Actor with a given configuration
def props(host: String, port: Int, messages: mutable.MutableList[String]): Props =
Props(new SocketProducer(host,port,messages))
}
class SocketProducer(host: String, port: Int, messages: mutable.MutableList[String]) extends Actor with ActorLogging{
import SocketProducer._
// function that allows us to do some logic before the Actor is up
override def preStart(): Unit = {
log.info("Starting socket PRODUCER actor with following config {}:{}",host,port)
}
// function that allows us to do some logic once the Actor is down
override def postStop(): Unit = {
log.info("Stopped socket PRODUCER actor")
}
}
Now we need to connect the Actor to the socket, for that purppose we have to communicate our Actor with the IO Actor that manages the communication with the Socket and we also have to implement the receive method of our Actor, because the IO Actor will send use message with information about the socket.
import java.net.InetSocketAddress
import akka.actor.{Actor, ActorLogging, Props}
import akka.io.{IO, Tcp}
import akka.util.ByteString
import scala.collection.mutable
object SocketProducer {
//Needed to create an Actor with a given configuration
def props(host: String, port: Int, messages: mutable.MutableList[String]): Props =
Props(new SocketProducer(host,port,messages))
}
class SocketProducer(host: String, port: Int, messages: mutable.MutableList[String]) extends Actor with ActorLogging{
import SocketProducer._
import akka.io.Tcp._
import context.system
// Actor that manages the low level communication with the socket
// this Actor can send messages to out SocketProducer Actor, because
// a reference to the SocketProducer Actor is implicit passed
// when we invoke IO(Tcp)
IO(Tcp) ! Connect(new InetSocketAddress(host,port))
// function that allows us to do some logic before the Actor is up
override def preStart(): Unit = {
log.info("Starting socket PRODUCER actor with following config {}:{}",host,port)
}
// function that allows us to do some logic once the Actor is down
override def postStop(): Unit = {
log.info("Stopped socket PRODUCER actor")
}
override def receive = {
case CommandFailed(_: Connect) =>
context stop self
//Needed to establish a connection
case c @ Connected(remote, local) =>
//opens connection
val connection = sender()
//establish connection
connection ! Register(self)
//sends messages
messages.foreach(message => {
log.info("Trying to send message: "+message)
connection ! Write(ByteString(message))
})
//close connection
connection ! Close
//this is some black magic that will be explained later
//but the idea it's that it manages the socket status
context become {
case data: ByteString =>
connection ! Write(data)
case CommandFailed(w: Write) =>
// O/S buffer was full
log.warning("write failed")
case Received(data) =>
case "close" =>
connection ! Close
case _: ConnectionClosed =>
log.warning("connection closed")
//kills the Actor
context stop self
}
//if the Actor receives a message that it doesn't understand, it sends a warning through the logger
case x @ _ => log.warning("Something else is up. ---> " + x.toString)
}
}
Ok, we have the Actor, but we need to test if it's working as we tought. First of all, we have to launch the Actor from somewhere in our code. In this case I'm gonna invoke the Actor from a Main scala object.
import actors.SocketProducer
import akka.actor.ActorSystem
import scala.collection.mutable
object Main {
def main(args: Array[String]): Unit = {
//Creates and Actor system in wich will reside/live our actor
val actorSystem = ActorSystem.create("MyActorSystem")
//data that we want to send with our actor
val list = mutable.MutableList("1","2","Three","0100").map(_+"\n")
//I append a \n to each element in the list for visibility
//creates and runs the actor
val actor = actorSystem.actorOf(SocketProducer.props("localhost",9000,list))
}
}
That code is enough to launch our akka Actor, but we need a somebody listen in localhost:9000
to check if everything is alright. Before creating the consumer Actor we can check it using the netcat terminal utiity/program in a terminal as follows:
nc -lk 9000
So if we launch our scala code once netcat is running, netcat should print the following output:
1
2
Three
0100
Everything is working! nice!
import java.net.InetSocketAddress
import akka.actor.{Actor, ActorLogging, Props}
import akka.io.{IO, Tcp}
import akka.util.ByteString
import scala.collection.mutable
object SocketConsumer {
//Needed to create an Actor with a given configuration
def props(port: Int): Props = Props(new SocketConsumer(port))
}
class SocketConsumer(port: Int) extends Actor with ActorLogging{
import SocketConsumer._
import akka.io.Tcp._
import context.system
// Actor that manages the low level communication with the socket
// this Actor can send messages to out SocketProducer Actor, because
// a reference to the SocketProducer Actor is implicit passed
// when we invoke IO(Tcp)
IO(Tcp) ! Bind(self, new InetSocketAddress("localhost", port))
// function that allows us to do some logic before the Actor is up
override def preStart(): Unit = {
log.info("Starting socket CONSUMER actor in port {}",port)
}
// function that allows us to do some logic once the Actor is down
override def postStop(): Unit = {
log.info("Stopped socket CONSUMER actor")
}
override def receive = {
case b @ Bound(localAddress) =>
log.info("CONSUMER bound to: "+b)
case CommandFailed(_: Bind) => context stop self
case c @ Connected(remote, local) =>
val connection = sender()
connection ! Register(self)
case Received(message) => log.info("Received: \n"+ message.decodeString("UTF8"))
case PeerClosed => context stop self
//if the Actor receives a message that it doesn't understand, it sends a warning through the logger
case x @ _ => log.warning("Something else is up. ---> " + x.toString)
}
}
Now we have to add the consumer to our Main scala object:
import actors.{SocketConsumer, SocketProducer}
import akka.actor.ActorSystem
import scala.collection.mutable
object Main {
def main(args: Array[String]): Unit = {
//Creates and Actor system in wich will reside/live our actor
val actorSystem = ActorSystem.create("MyActorSystem")
//data that we want to send with our actor
val list = mutable.MutableList("1","2","Three","0100").map(_+"\n")
//creates and runs the actor
val actorConsumer = actorSystem.actorOf(SocketConsumer.props(9000))
val actorProducer = actorSystem.actorOf(SocketProducer.props("localhost",9000,list))
}
}
Finally if whe launch the program, the output should be:
[INFO] [09/05/2017 13:31:06.639] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/$a] Starting socket CONSUMER actor in port 9000
[INFO] [09/05/2017 13:31:06.639] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/$b] Starting socket PRODUCER actor with following config localhost:9000
[INFO] [09/05/2017 13:31:06.652] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/$a] CONSUMER bound to: Bound(/127.0.0.1:9000)
[INFO] [09/05/2017 13:31:06.657] [MyActorSystem-akka.actor.default-dispatcher-7] [akka://MyActorSystem/user/$b] Trying to send message: 1
[INFO] [09/05/2017 13:31:06.666] [MyActorSystem-akka.actor.default-dispatcher-7] [akka://MyActorSystem/user/$b] Trying to send message: 2
[INFO] [09/05/2017 13:31:06.666] [MyActorSystem-akka.actor.default-dispatcher-7] [akka://MyActorSystem/user/$b] Trying to send message: Three
[INFO] [09/05/2017 13:31:06.666] [MyActorSystem-akka.actor.default-dispatcher-7] [akka://MyActorSystem/user/$b] Trying to send message: 0100
[INFO] [09/05/2017 13:31:06.669] [MyActorSystem-akka.actor.default-dispatcher-7] [akka://MyActorSystem/user/$a] Received:
1
2
Three
0100
[INFO] [09/05/2017 13:31:06.672] [MyActorSystem-akka.actor.default-dispatcher-7] [akka://MyActorSystem/user/$a] Stopped socket CONSUMER actor
[WARN] [09/05/2017 13:31:06.673] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/$b] connection closed
[INFO] [09/05/2017 13:31:06.673] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/$b] Stopped socket PRODUCER actor