-
Notifications
You must be signed in to change notification settings - Fork 0
/
ManagableRouting.scala
54 lines (36 loc) · 1.45 KB
/
ManagableRouting.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package api.routing.dsl
import java.util.concurrent.{
  Executors,
  ScheduledExecutorService,
  ScheduledThreadPoolExecutor,
  ThreadFactory,
  TimeUnit,
  TimeoutException
}
import com.typesafe.config.{ ConfigFactory, Config }
import scala.concurrent.{ ExecutionContext, Promise, Future }
import scala.concurrent.duration.Duration
import scala.util.Try
/**
 * Routing DSL mixin that puts a time limit on logged flows.
 *
 * The result of every flow passed through `log` is mirrored into a promise.
 * For flows that report a normal name and are not a `Split`, a timer fails
 * that promise with a `TimeoutException` when the flow has not finished
 * within `flowTimeout`. The underlying computation itself is not interrupted;
 * only the future handed back to the caller is failed.
 *
 * Created by dkondratiuk on 7/6/15.
 */
trait ManagableFlows[Ctx] extends RoutingDSLBase[Ctx] {

  /** Longest time a timed flow may run before its result is failed. */
  lazy val flowTimeout = Duration(10, TimeUnit.SECONDS)

  // $COVERAGE-OFF$
  /** Intended retry count; not consulted anywhere in this file yet. */
  lazy val flowRetries = 1 //TODO implement
  // $COVERAGE-ON$

  /**
   * Single-threaded timer used to fire flow timeouts.
   *
   * The worker is a daemon thread so an idle timer never keeps the JVM alive,
   * and cancelled tasks are removed from the queue immediately instead of
   * lingering there until their delay elapses.
   */
  lazy val scheduler: ScheduledExecutorService = {
    val defaultFactory = Executors.defaultThreadFactory()
    val daemonFactory = new ThreadFactory {
      override def newThread(task: Runnable): Thread = {
        val thread = defaultFactory.newThread(task)
        thread.setDaemon(true)
        thread
      }
    }
    val pool = new ScheduledThreadPoolExecutor(1, daemonFactory)
    pool.setRemoveOnCancelPolicy(true)
    pool
  }

  /**
   * Runs the timer-cancelling callback directly on whichever thread completes
   * the flow; the callback is a single cheap `cancel` call, so no hand-off to
   * another pool is needed.
   */
  private lazy val cancelTimerContext: ExecutionContext = new ExecutionContext {
    override def execute(runnable: Runnable): Unit = runnable.run()
    override def reportFailure(cause: Throwable): Unit = ExecutionContext.defaultReporter(cause)
  }

  /**
   * Delegates to the parent `log` and guards the result with a timeout.
   *
   * @param f      flow being executed
   * @param in     input elements passed on to the parent `log`
   * @param action by-name computation producing the flow's output
   * @param ctx    routing context passed on to the parent `log`
   * @return a future that completes with the flow's own result, or fails with
   *         a `TimeoutException` if a timed flow exceeds `flowTimeout`
   */
  override def log[In, Out](f: Flow[In, Out], in: Seq[In])(action: => Future[Seq[Out]])(implicit ctx: Ctx): Future[Seq[Out]] = {
    val underlying = super.log(f, in)(action)
    val guarded = Promise[Seq[Out]]()
    // Whichever of "flow finished" and "timer fired" happens first wins the promise.
    guarded tryCompleteWith underlying
    if (f.hasNormalName && !f.isInstanceOf[Split[_, _, _, _, _]]) {
      val onTimeout = new Runnable {
        override def run(): Unit = guarded tryFailure new TimeoutException(s"Act timeout: ${f.name}!")
      }
      val pending = scheduler.schedule(onTimeout, flowTimeout.toMillis, TimeUnit.MILLISECONDS)
      // Drop the timer as soon as the outcome is known so finished flows do not
      // pile up in the scheduler queue for the rest of the timeout window.
      guarded.future.onComplete(_ => pending.cancel(false))(cancelTimerContext)
    }
    guarded.future
  }
}
/**
 * Variant of `ManagableFlows` whose settings come from Typesafe Config.
 *
 * Each setting falls back to a built-in default when reading it from
 * `config` fails.
 */
trait ManagableFlowsConfig[C] extends ManagableFlows[C] {

  /** Configuration the settings below are read from, obtained via `ConfigFactory.load()`. */
  lazy val config: Config = ConfigFactory.load()

  /** Value of `flow.timeout` in milliseconds, or 10000 ms when it cannot be read. */
  override lazy val flowTimeout = {
    val millis: Long = Try(config.getDuration("flow.timeout", TimeUnit.MILLISECONDS)) getOrElse 10000L
    Duration(millis, TimeUnit.MILLISECONDS)
  }

  // $COVERAGE-OFF$
  /** Value of `flow.retries`, or 1 when it cannot be read. */
  override lazy val flowRetries = Try(config.getInt("flow.retries")) getOrElse 1 //TODO implement
  // $COVERAGE-ON$
}