#akka-spark-streams

Akka and Spark streaming, with Algebird.

#Domain

```scala
object Category extends Enumeration {
  val info, warning, error = Value
}

/** Event model */
case class Event(guid: String, category: Category.Value, action: String, timestamp: String)
```
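
The examples below also rely on a `RandomEvent()` factory that this README does not reproduce. A minimal sketch of what it might look like (the action values and timestamp format are assumptions):

```scala
import java.util.UUID
import scala.util.Random

// Hypothetical factory; the real implementation lives elsewhere in the repo.
object RandomEvent {
  private val actions = Seq("click", "view", "purchase")

  def apply(): Event = Event(
    guid = UUID.randomUUID().toString,
    category = Category(Random.nextInt(Category.maxId)),
    action = actions(Random.nextInt(actions.length)),
    timestamp = System.currentTimeMillis().toString
  )
}
```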

#Spark Streaming

```scala
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

object Functions {
  // Count events per category over a 1-second window sliding every second.
  val eventProcessor: DStream[Event] => Unit = { stream =>
    val categoryCount = stream.map(event => (event.category, 1))
    // The inverse function (_ - _) lets Spark update the window incrementally;
    // this variant requires checkpointing to be enabled on the context.
    val grouped = categoryCount.reduceByKeyAndWindow(_ + _, _ - _, Seconds(1), Seconds(1))
    grouped.foreachRDD(rdd => rdd.foreach {
      case (category, count) => println("count per window - " + category + ":" + count)
    })
  }
}
```
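
The driver snippet below assumes a `StreamingContext` named `ssc` created elsewhere in the repo. A minimal setup sketch (master, app name, and checkpoint path are assumptions):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("akka-spark-streams")
val ssc = new StreamingContext(conf, Seconds(1))
// The inverse-reduce form of reduceByKeyAndWindow requires a checkpoint directory.
ssc.checkpoint("/tmp/akka-spark-streams-checkpoint")
```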

```scala
import scala.collection.mutable.Queue
import scala.concurrent.duration._
import org.apache.spark.SparkEnv
import org.apache.spark.rdd.RDD

val rddQueue = new Queue[RDD[Event]]()

// Create the QueueInputDStream and use it to do some processing
val inputStream = ssc.queueStream(rddQueue)
eventProcessor(inputStream)
ssc.start()

// Publish events: reuse Spark's internal actor system (exposed by pre-2.0
// Spark) to enqueue an RDD of 100 random events every second.
val actorSystem = SparkEnv.get.actorSystem
import actorSystem.dispatcher

val cancellable =
  actorSystem.scheduler.schedule(0.milliseconds, 1.second) {
    rddQueue += ssc.sparkContext.makeRDD((1 to 100).map(_ => RandomEvent()), 10)
  }
actorSystem.scheduler.scheduleOnce(10.seconds) {
  cancellable.cancel()
}
actorSystem.scheduler.scheduleOnce(20.seconds) {
  ssc.stop()
}
```
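
With `queueStream`'s default one-RDD-per-batch behaviour, each 1-second batch dequeues a single RDD of 100 events, so every window should report category counts summing to roughly 100.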

#Akka Streaming
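
The `Transformer` type used below is not defined in this README; it is presumably an alias declared elsewhere in the repo, along these lines:

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow

// Assumed alias; Akka Streams releases before 2.4 would use Unit instead of NotUsed.
type Transformer[-In, +Out] = Flow[In, Out, NotUsed]
```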

```scala
def countsByCategoryFilter = {
  println("\nFilter\n----------------")

  // Per-category pipeline: keep one category, window for 1 second,
  // emit a (category, count) pair per window.
  def processor(category: String): Transformer[Event, (String, Int)] = Flow[Event]
    .filter(_.category.toString == category)
    .groupedWithin(Int.MaxValue, 1.second)
    .map[(String, Int)](es => (category, es.length))

  val g = RunnableGraph.fromGraph(GraphDSL.create() {
    implicit builder =>
      import GraphDSL.Implicits._

      // Source
      val A: Outlet[Event] = builder.add(events).out

      // Flows
      val B: UniformFanOutShape[Event, Event] = builder.add(Broadcast[Event](3))

      val C: FlowShape[Event, (String, Int)] = builder.add(processor(Category.info.toString))
      val D: FlowShape[Event, (String, Int)] = builder.add(processor(Category.warning.toString))
      val E: FlowShape[Event, (String, Int)] = builder.add(processor(Category.error.toString))

      val F: UniformFanInShape[(String, Int), (String, Int)] = builder.add(Merge[(String, Int)](3))

      // Sinks
      //val G: Inlet[Any] = builder.add(Sink.foreach(println)).in
      val G: SinkShape[Any] = builder.add(Sink.actorSubscriber(Props(classOf[SinkActor], "sinkActor")))

      A ~> B ~> C ~> F ~> G
           B ~> D ~> F
           B ~> E ~> F

      ClosedShape
  })

  g.run()
}
```
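
Neither the `events` source nor `SinkActor` is shown in this README. Minimal sketches under assumed names and rates:

```scala
import akka.actor.Cancellable
import akka.stream.actor.{ActorSubscriber, ActorSubscriberMessage, OneByOneRequestStrategy}
import akka.stream.scaladsl.Source
import scala.concurrent.duration._

// Hypothetical source: one random event every 10 milliseconds.
val events: Source[Event, Cancellable] =
  Source.tick(0.seconds, 10.millis, ()).map(_ => RandomEvent())

// Hypothetical sink actor: prints each element it receives.
class SinkActor(name: String) extends ActorSubscriber {
  override val requestStrategy = OneByOneRequestStrategy

  def receive = {
    case ActorSubscriberMessage.OnNext(elem) => println(s"$name: $elem")
    case ActorSubscriberMessage.OnComplete   => context.stop(self)
    case ActorSubscriberMessage.OnError(err) => println(s"$name failed: $err")
  }
}
```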

```scala
def countsByCategoryMap = {
  println("\nMap\n----------------")

  // Single pipeline: window for 1 second, then group the whole window by category.
  val processor: Transformer[Event, Map[String, Int]] = Flow[Event]
    .groupedWithin(Int.MaxValue, 1.second)
    .map(w => w.groupBy(_.category).map { case (category, events) => category.toString -> events.length })

  val g = RunnableGraph.fromGraph(GraphDSL.create() {
    implicit builder =>
      import GraphDSL.Implicits._

      // Source
      val A: Outlet[Event] = builder.add(events).out

      // Flows
      val B: FlowShape[Event, Map[String, Int]] = builder.add(processor)

      // Sinks
      val C: Inlet[Any] = builder.add(Sink.foreach(println)).in

      A ~> B ~> C

      ClosedShape
  })

  g.run()
}
```
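
Both variants produce the same per-category counts. The filter version broadcasts each event to one branch per category and merges the resulting tuples, emitting a pair for every category in every window; the map version keeps a single pipeline, groups each window in memory, and emits one Map per window containing only the categories that actually occurred.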
