This is an embedded, reactive, and typesafe Scala-based DSL for declaring flows using functional composition. Open-sourced with Agoda.com's permission.
To get started with SBT, simply add the following to your build.sbt file:
resolvers += Resolver.sonatypeRepo("snapshots")
libraryDependencies += "com.github.dk14" %% "rflows" % "0.2-SNAPSHOT"
Usage examples (tests):
- routing - SimpleRouter defines flows and groups. There is also a before interceptor (after and log are defined as well) that executes before each step (act). Have a look at the ManagableFlows trait if you need automatic timeouts for flows; this works only for the Future-based implementation, not for the "simple" ones (Act.simple and so on). See RoutingDSL for the sources.
- metrics - shows how to measure the time spent on each service. Note that the time spent on a step itself is measured automatically. All such measurements carry a "flowName" tag, and each contains the name of the current step and of the service executed inside that step, so you need to implicitly pass a MetricsContext to ensure that the service is bound to a step. You can get this context from the message itself (see withMeta in the routing examples).
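For example, a service method can accept the MetricsContext as an implicit parameter so that its measurements are attributed to the step currently executing it. This is only a sketch: the import path, method name, and types are assumptions, not the library's actual API.

import scala.concurrent.Future
import api.routing.metrics.MetricsContext // assumed location of MetricsContext

trait MyMeteredService {
  // Measurements taken inside this call are bound to the currently executing
  // step because the MetricsContext is passed along implicitly.
  def callBackend(in: Request)(implicit ctx: MetricsContext): Future[Request] = ???
}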
A flow sequentially executes several acts, splitters, aggregators, or other flows.
To create a new application-level flow, just extend the InstrumentedRouting trait and define your flow inside:
import api.routing.dsl._

trait Routing1 extends InstrumentedRouting[Request] with MyService1 with MyService2 {
  val Flow1 = Act("act1", handler1) |> Act("act2", handler2) |>
    Split("split1", splitter1) |> Aggregate("aggregate1", aggregate) |> Act("act6", handler6)
}
handlerN - a handler for the current flow step
MyServiceN - the service layer used by the handlers. It's recommended to inject these services in mix-in style.
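A minimal sketch of such a service mixin, assuming a hypothetical method name and a Future-based return type:

import scala.concurrent.Future

// Hypothetical service trait; mixing it into Routing1 makes `enrich` available to the handlers.
trait MyService1 {
  def enrich(in: Request): Future[Request] =
    Future.successful(in) // stub; a real implementation would call some backend
}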
So here you can define a flow as a composition (|>) of acts, splitters, and aggregators.
Act.simple just executes a simple processor:
Act.simple("act1", handler1)
handler1 takes an input message and returns an output message.
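A sketch of such a handler; whether it receives the raw message or the Data wrapper is an assumption here, so check the tests for the exact signature:

// Hypothetical synchronous handler for Act.simple: message in, message out.
def handler1(in: Request): Request =
  in // return the message unchanged, or a transformed copy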
Act executes a processor and moves on to the next act (or returns a result) using a Future:
Act("act1", handler1)
handler1 takes an input message and returns a future of the output message, so the next act in the chain (act2) is executed only when the future completes. In combination with Scala's Future composition, this gives you a way to use asynchronous services and build the processing in a reactive way.
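For instance, such a handler might compose two asynchronous service calls. The service methods and the exact handler signature below are hypothetical stubs for the sketch:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global // for the Future composition below

// Hypothetical async service calls (stubs for the sketch).
def enrich(in: Request): Future[Request]   = Future.successful(in)
def validate(in: Request): Future[Request] = Future.successful(in)

// Hypothetical asynchronous handler: act2 runs only after the returned Future completes.
def handler1(in: Request): Future[Request] =
  for {
    enriched  <- enrich(in)
    validated <- validate(enriched)
  } yield validated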
Group groups several sub-flows to be used inside one Split (see below):
implicit object FlowGroup1 extends Group {
  val SubFlow1 = Act("act3", handler3) |> Act("act4", handler4) tagged //do not forget "tagged" word !!
  val SubFlow2 = Act("act5", handler5) tagged
}
You can't mix flows from different groups inside one splitting/routing handler; there is a special compiler check to prevent that. Please don't forget tagged and implicit object, otherwise your splitter won't compile. You can have several groups (implicit objects) in one scope.
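For example, a second group can sit next to FlowGroup1 in the same scope (act7 and handler7 are placeholders, not part of the examples above):

implicit object FlowGroup2 extends Group {
  val SubFlow3 = Act("act7", handler7) tagged // handler7 is hypothetical
}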
Split allows splitting a message into several ones and sending each part to its own (dynamically chosen) flow in a reactive way. You can also send the same message to different flows.
Split("split1", splitter1)
def splitter1(in: Data[Request]) = Seq( //same message, different flows
  Future(in) -> SubFlow1,
  Future(in) -> SubFlow2
)
or
def splitter1(in: Data[Request]) = Seq( //different messages, different flows
  Future(in.part1) -> SubFlow1,
  Future(in.part2) -> SubFlow2
)
or just
Split.simple("split1", simpleSplitter)
def simpleSplitter(in: Data[Request]) = Seq(
  in.part1 -> SubFlow1,
  in.part2 -> SubFlow2
)
The splitter1 function takes a message and returns a list of directives. Each directive is a Future[Message] -> Flow pair, meaning it contains a new message (possibly just a copy of the input) and the flow that should process it.
All flows returned by a splitter should be part of the same group (see above). For instance, SubFlow1 and SubFlow2 are parts of FlowGroup1 (see the diagram). Otherwise, you get a scary compilation error, which is intentional protection from SubFlow Hell.
Route processes a message and routes the result to a dynamically chosen flow (in a reactive way). It is similar to Split (actually a particular case of it), but the router returns a list with only one directive (message + destination) to process. It can be implemented with the same Split construction:
Split("route1", router1)
def router1(in: Data[Request]) =
if (in.isForSubFlow1)
Seq(Future(in.data) -> SubFlow1) else Seq(Future(in.data) -> SubFlow2)
//or just
Split.route("route", router) = //note that this one doesn't need aggregation
if (in.isForSubFlow1)
in.data -> SubFlow1 else in.data -> SubFlow2
Aggregate aggregates messages after they have been split:
Split("split1", splitter1) |> Aggregate("aggregate1", aggregator1)
It should be placed right after the corresponding Split.
Running a flow is just a function call:
Flow1(Seq(incomingMessage1), context)
If you specify several incoming messages, they will be processed in parallel (it's like resuming after a Split). Usually you don't need that.
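Putting it together, a run might look like the sketch below; the object and value names are illustrative:

// Instantiate the routing defined above and run Flow1, passing the request
// itself as the shared context (see "Context" below).
object MyRouting extends Routing1 // assumes MyService1/MyService2 provide concrete members

val incomingMessage1: Request = ??? // your incoming request
val result = MyRouting.Flow1(Seq(incomingMessage1), incomingMessage1)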
Register a special reporter for Yammer metrics:
import java.util.concurrent.TimeUnit
import api.routing.metrics._
DefaultMetricsReporter.localReporter.start(1, TimeUnit.SECONDS)
Add a visualization actor to your spray-can Boot using the MetricsBoot trait:
import akka.actor.ActorSystem
import akka.io.IO
import spray.can.Http
import api.routing.http._

object Boot extends App with MetricsBoot {
  implicit val system = ActorSystem("spray-sample-system")
  IO(Http) ! Http.Bind(metricsService, "localhost", 8080)
}
Data is a wrapper around a message. It contains the message itself, along with its Context and Meta-info.
Context is an object shared between all Flows (like Flow1) that belong to a single Routing (like Routing1).
You specify its type when extending the InstrumentedRouting trait (see the Routing1 example above) and provide the object itself when passing an incoming message to a flow, like Flow(incomingMessage, context). Usually the context is the current request, so you can just write Flow(incomingMessage, incomingMessage).
Meta is meta-information containing the currently executing flow itself (including its name), so you can propagate it to handlers, services, etc. It is used for metrics and logging to identify the concrete flow (act, splitter, or aggregator) where a measured/logged event actually takes place.
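As a sketch, a splitter (or any function that receives the Data wrapper) could look at these parts. Only the data accessor appears in the examples above; the context and meta accessor names here are assumptions:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Hedged sketch: inspect the wrapper before choosing a flow.
def inspectingSplitter(in: Data[Request]) = {
  val msg = in.data // the message itself (this accessor is used in the splitter examples above)
  // in.context would give the shared Context, in.meta the current flow's Meta (assumed accessor names)
  Seq(Future(msg) -> SubFlow1)
}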
TODO: Big Flow Decomposition
TODO: "how to write handler" examples
TODO: how to register before, after, failure handlers
Inspired by the Camel and Akka projects. Thanks to Alexander Nemish, Ilya Tkachuk, and Vitalij Kotlyarenko for some of the ideas.