Skip to content

Commit

Permalink
Merge pull request #1533 from jrudolph/jr/w/1312-rewrite-poolslot
Browse files Browse the repository at this point in the history
#1312 Rewrite of PoolFlow / PoolSlot / PoolConductor as single GraphStage
  • Loading branch information
jrudolph authored Nov 30, 2017
2 parents a365fdc + ecb76a8 commit 0643345
Show file tree
Hide file tree
Showing 14 changed files with 1,434 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,8 @@ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.settings
# New settings in `@DoNotInherit` classes
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.settings.ParserSettings.getModeledHeaderParsing")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.settings.ParserSettings.modeledHeaderParsing")

# New poolImplementation setting on @DoNotInherit class
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.settings.ConnectionPoolSettings.poolImplementation")
# New responseEntitySubscriptionTimeout setting
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.settings.ConnectionPoolSettings.responseEntitySubscriptionTimeout")
10 changes: 10 additions & 0 deletions akka-http-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,16 @@ akka.http {
# will automatically terminate itself. Set to `infinite` to completely disable idle timeouts.
idle-timeout = 30 s

# The pool implementation to use. Currently supported are:
# - legacy: the original, still default, pool implementation
# - new: the new still-evolving pool implementation, that will receive fixes and new features
pool-implementation = legacy

# The "new" pool implementation will fail a connection early and clear the slot if a response entity was not
# subscribed during the given time period after the response was dispatched. In busy systems the timeout might be
# too tight if a response is not picked up quick enough after it was dispatched by the pool.
response-entity-subscription-timeout = 1.second

# Modify this section to tweak client settings only for host connection pools APIs like `Http().superPool` or
# `Http().singleRequest`.
client = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package akka.http.impl.engine.client

import akka.NotUsed
import akka.annotation.InternalApi
import akka.http.impl.engine.client.PoolConductor.PoolSlotsSetting
import akka.http.scaladsl.settings.ConnectionPoolSettings

Expand All @@ -16,7 +17,9 @@ import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.Http

private object PoolFlow {
/** Internal API */
@InternalApi
private[client] object PoolFlow {

case class RequestContext(request: HttpRequest, responsePromise: Promise[HttpResponse], retriesLeft: Int) {
require(retriesLeft >= 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ package akka.http.impl.engine.client
import akka.actor._
import akka.event.{ LogSource, Logging, LoggingAdapter }
import akka.http.impl.engine.client.PoolFlow._
import akka.http.impl.engine.client.pool.NewHostConnectionPool
import akka.http.impl.util.RichHttpRequest
import akka.http.scaladsl.model._
import akka.http.scaladsl.Http
import akka.http.scaladsl.settings.PoolImplementation
import akka.macros.LogHelper
import akka.stream.actor.ActorPublisherMessage._
import akka.stream.actor.ActorSubscriberMessage._
Expand Down Expand Up @@ -98,7 +100,12 @@ private class PoolInterfaceActor(gateway: PoolGateway)(implicit fm: Materializer
val connectionFlow =
Http().outgoingConnectionUsingTransport(host, port, settings.transport, connectionContext, settings.connectionSettings, setup.log)

val poolFlow = PoolFlow(connectionFlow, settings, log).named("PoolFlow")
val poolFlow =
settings.poolImplementation match {
case PoolImplementation.Legacy PoolFlow(connectionFlow, settings, log).named("PoolFlow")
case PoolImplementation.New NewHostConnectionPool(connectionFlow, settings, log).named("PoolFlow")
}

Source.fromPublisher(ActorPublisher(self)).via(poolFlow).runWith(Sink.fromSubscriber(ActorSubscriber[ResponseContext](self)))
}

Expand Down
Loading

0 comments on commit 0643345

Please sign in to comment.