Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Highlevel consumer using executor #45

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,69 @@ import kafka.serializer.DefaultDecoder
}
```


AkkaDirectConsumer
-----------------

This consumer is based on [Kafka High Level Consumer](https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example). This consumer creates `N` threads per topic where `N` = `streams`. Each thread will block on the iterator until a message becomes available. Messages are then forwarded to your receiving actor in the same manor.

This actor overrides settings for both:
```
consumer.timeout.ms = "-1"
auto.commit.enable = "true"
```
This will not affect the configured values for other actors [AkkaBatchConsumer, AkkaConsumer] and thus you can use both styles of actors in the same project to fit your needs.


This consumer is suitable for low volume messages that require minimal latency (eg: Instant Messaging applications)

Some notes on blocking:
* [Blocking Needs Careful Management](http://doc.akka.io/docs/akka/2.2.3/general/actor-systems.html#Blocking_Needs_Careful_Management)

`AkkaDirectConsumerProps` has similar convenience methods to `AkkaConsumerProps`

So a full example of getting a high level consumer up and running looks like this.

```scala
import akka.actor.{Props, ActorSystem, Actor}
import com.sclasen.akka.kafka.{AkkaDirectConsumer, AkkaDirectConsumerProps}
import kafka.serializer.DefaultDecoder

object Example {
class Printer extends Actor{
def receive = {
case x:Any =>
println(x)
sender ! StreamFSM.Processed
}
}

val system = ActorSystem("test")
val printer = system.actorOf(Props[Printer])

/*
the consumer will have 4 streams (threads that block per topic)
*/
val consumerProps = AkkaDirectConsumerProps.forSystem(
system = system,
zkConnect = "localhost:2181",
topic = "your-kafka-topic",
group = "your-consumer-group",
streams = 4, //one per partition
keyDecoder = new DefaultDecoder(),
msgDecoder = new DefaultDecoder(),
receiver = printer
)

val consumer = new AkkaConsumer(consumerProps)

consumer.start() //returns a Future[Unit] that completes when the connector is started

consumer.stop() //returns a Future[Unit] that completes when the connector is stopped.

}
```

develop
=======

Expand Down
163 changes: 163 additions & 0 deletions src/main/scala/com/sclasen/akka/kafka/AkkaDirectConsumer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package com.sclasen.akka.kafka

import java.util.concurrent.{Executors, TimeUnit}

import akka.actor._
import akka.util.Timeout
import kafka.consumer._
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder

import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.concurrent.duration._

class AkkaDirectConsumer[Key,Msg](props:AkkaDirectConsumerProps[Key,Msg]) {

import AkkaConsumer._

lazy val connector = createConnection(props)
lazy val executor = Executors.newFixedThreadPool(props.streams)

def kafkaConsumerProps(zkConnect:String, groupId:String) = {
val consumerConfig = props.system.settings.config.getConfig("kafka.consumer")
val required = Set("zookeeper.connect" -> zkConnect, "group.id" -> groupId, "consumer.timeout.ms" -> "-1", "auto.commit.enable" -> "true")
val requiredKeys = required.map(_._1)
val consumerProps = consumerConfig.entrySet().asScala
.filter( k => !requiredKeys.contains(k.getKey) )
.map{
entry => entry.getKey -> consumerConfig.getString(entry.getKey)
} ++ required
toProps(consumerProps)
}

def kafkaConsumer(zkConnect:String, groupId:String) = {
Consumer.create(new ConsumerConfig(kafkaConsumerProps(zkConnect, groupId)))
}

def createConnection(props:AkkaDirectConsumerProps[Key,Msg]) = {
import props._
val consumerConfig = new ConsumerConfig(kafkaConsumerProps(zkConnect, group))
Consumer.create(consumerConfig)
}

def createStream = {
props.topicFilterOrTopic match {
case Left(t) =>
props.system.log.info(s"createStream for topic: ${t}")
connector.createMessageStreamsByFilter(t, props.streams, props.keyDecoder, props.msgDecoder)
case Right(t) =>
props.system.log.info(s"createStream for topic: ${t}")
connector.createMessageStreams(Map(t -> props.streams), props.keyDecoder, props.msgDecoder).apply(t)
}
}

def start(): Future[Unit] = Future {
val streams = createStream
streams.foreach { stream =>
executor.submit(new Runnable() {
def run(): Unit = {
val it = stream.iterator()
def hasNext = try {
it.hasNext()
} catch {
case cte: ConsumerTimeoutException =>
props.system.log.warning("AkkaHighLevelConsumer should not see ConsumerTimeoutException")
false
}
props.system.log.debug("blocking on stream")
while (hasNext) {
val msg = props.msgHandler(it.next())
props.receiver ! msg
}
}
})

}
()
}

def stop():Future[Unit] = {
connector.shutdown()
executor.shutdown()
try {
if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
props.system.log.warning("Timed out waiting for consumer threads to shut down, exiting uncleanly");
}
} catch {
case e:InterruptedException =>
props.system.log.warning("Interrupted during shutdown, exiting uncleanly");
}
Future.successful(Unit)
}
}

object AkkaDirectConsumerProps {
def forSystem[Key, Msg](system: ActorSystem,
zkConnect: String,
topic: String,
group: String,
streams: Int,
keyDecoder: Decoder[Key],
msgDecoder: Decoder[Msg],
receiver: ActorRef,
msgHandler: (MessageAndMetadata[Key,Msg]) => Any = defaultHandler[Key, Msg],
connectorActorName:Option[String] = None,
startTimeout: Timeout = Timeout(5 seconds)): AkkaDirectConsumerProps[Key, Msg] =
AkkaDirectConsumerProps(system, system, zkConnect, Right(topic), group, streams, keyDecoder, msgDecoder, msgHandler, receiver, connectorActorName, startTimeout)

def forSystemWithFilter[Key, Msg](system: ActorSystem,
zkConnect: String,
topicFilter: TopicFilter,
group: String,
streams: Int,
keyDecoder: Decoder[Key],
msgDecoder: Decoder[Msg],
receiver: ActorRef,
msgHandler: (MessageAndMetadata[Key,Msg]) => Any = defaultHandler[Key, Msg],
connectorActorName:Option[String] = None,
startTimeout: Timeout = Timeout(5 seconds)): AkkaDirectConsumerProps[Key, Msg] =
AkkaDirectConsumerProps(system, system, zkConnect, Left(topicFilter), group, streams, keyDecoder, msgDecoder, msgHandler, receiver, connectorActorName, startTimeout)


def forContext[Key, Msg](context: ActorContext,
zkConnect: String,
topic: String,
group: String,
streams: Int,
keyDecoder: Decoder[Key],
msgDecoder: Decoder[Msg],
receiver: ActorRef,
msgHandler: (MessageAndMetadata[Key,Msg]) => Any = defaultHandler[Key, Msg],
connectorActorName:Option[String] = None,
startTimeout: Timeout = Timeout(5 seconds)): AkkaDirectConsumerProps[Key, Msg] =
AkkaDirectConsumerProps(context.system, context, zkConnect, Right(topic), group, streams, keyDecoder, msgDecoder, msgHandler, receiver,connectorActorName, startTimeout)

def forContextWithFilter[Key, Msg](context: ActorContext,
zkConnect: String,
topicFilter: TopicFilter,
group: String,
streams: Int,
keyDecoder: Decoder[Key],
msgDecoder: Decoder[Msg],
receiver: ActorRef,
msgHandler: (MessageAndMetadata[Key,Msg]) => Any = defaultHandler[Key, Msg],
connectorActorName:Option[String] = None,
startTimeout: Timeout = Timeout(5 seconds)): AkkaDirectConsumerProps[Key, Msg] =
AkkaDirectConsumerProps(context.system, context, zkConnect, Left(topicFilter), group, streams, keyDecoder, msgDecoder, msgHandler, receiver,connectorActorName, startTimeout)

def defaultHandler[Key,Msg]: (MessageAndMetadata[Key,Msg]) => Any = msg => msg.message()
}

case class AkkaDirectConsumerProps[Key,Msg](system:ActorSystem,
actorRefFactory:ActorRefFactory,
zkConnect:String,
topicFilterOrTopic:Either[TopicFilter,String],
group:String,
streams:Int,
keyDecoder:Decoder[Key],
msgDecoder:Decoder[Msg],
msgHandler: (MessageAndMetadata[Key,Msg]) => Any,
receiver: ActorRef,
connectorActorName:Option[String],
startTimeout:Timeout = Timeout(5 seconds))