Skip to content

Commit

Permalink
Merge pull request #3737 from raboof/persistent-connection-max-attempts
Browse files Browse the repository at this point in the history
[h2 client] persistent connection max attempts
  • Loading branch information
johanandren authored Jan 15, 2021
2 parents c156686 + 0da2f3a commit 5bc4b2e
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Added method to method that is not to be extended:
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.settings.Http2ClientSettings.maxPersistentAttempts")
ProblemFilters.exclude[Problem]("akka.http.scaladsl.settings.Http2ClientSettings#Http2ClientSettingsImpl.*")
ProblemFilters.exclude[Problem]("akka.http.impl.engine.http2.OutgoingConnectionBuilderImpl*")
5 changes: 5 additions & 0 deletions akka-http-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,11 @@ akka.http {
# Fail the connection if a sent ping is not acknowledged within this timeout.
# When zero the ping-interval is used, if set the value must be evenly divisible by less than or equal to the ping-interval.
ping-timeout = 0s

# The maximum number of times a connections is attempted
# before giving up and returning an error.
# Set to zero to retry indefinitely.
max-persistent-attempts = 0
}

#client-settings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package akka.http.impl.engine.http2

import java.util.concurrent.CompletionStage

import akka.NotUsed
import akka.actor.ClassicActorSystemProvider
import akka.annotation.InternalApi
Expand All @@ -15,7 +14,7 @@ import akka.http.impl.engine.http2.client.PersistentConnection
import akka.http.scaladsl.Http.OutgoingConnection
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse
import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.http.scaladsl.settings.{ ClientConnectionSettings, Http2ClientSettings }
import akka.stream.scaladsl.Flow
import akka.http.javadsl
import akka.http.javadsl.{ OutgoingConnectionBuilder => JOutgoingConnectionBuilder }
Expand All @@ -38,6 +37,7 @@ private[akka] object OutgoingConnectionBuilderImpl {
host,
None,
clientConnectionSettings = ClientConnectionSettings(system),
http2Settings = Http2ClientSettings(system),
connectionContext = None,
log = system.classicSystem.log,
system = system,
Expand All @@ -48,6 +48,7 @@ private[akka] object OutgoingConnectionBuilderImpl {
host: String,
port: Option[Int],
clientConnectionSettings: ClientConnectionSettings,
http2Settings: Http2ClientSettings,
connectionContext: Option[HttpsConnectionContext],
log: LoggingAdapter,
system: ClassicActorSystemProvider,
Expand Down Expand Up @@ -80,15 +81,15 @@ private[akka] object OutgoingConnectionBuilderImpl {
}

override def managedPersistentHttp2(): Flow[HttpRequest, HttpResponse, NotUsed] =
PersistentConnection.managedConnection(http2())
PersistentConnection.managedConnection(http2(), http2Settings.maxPersistentAttempts)

override def http2WithPriorKnowledge(): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = {
// http/2 prior knowledge plaintext
Http2(system).outgoingConnectionPriorKnowledge(host, port.getOrElse(80), clientConnectionSettings, log)
}

override def managedPersistentHttp2WithPriorKnowledge(): Flow[HttpRequest, HttpResponse, NotUsed] =
PersistentConnection.managedConnection(http2WithPriorKnowledge())
PersistentConnection.managedConnection(http2WithPriorKnowledge(), http2Settings.maxPersistentAttempts)

override private[akka] def toJava: JOutgoingConnectionBuilder = new JavaAdapter(this)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@ package akka.http.impl.engine.http2.client

import akka.NotUsed
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.http.scaladsl.Http.OutgoingConnection
import akka.http.scaladsl.model.{ AttributeKey, HttpRequest, HttpResponse, RequestResponseAssociation, StatusCodes }
import akka.stream.scaladsl.{ Flow, Source }
import akka.stream.scaladsl.{ Flow, Keep, Source }
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler, StageLogging }
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }

import scala.concurrent.{ Future, Promise }
import scala.util.{ Failure, Success }

/** INTERNAL API */
@InternalApi
private[http2] object PersistentConnection {
Expand All @@ -31,8 +36,11 @@ private[http2] object PersistentConnection {
* * generate error responses with 502 status code
* * custom attribute contains internal error information
*/
def managedConnection(connectionFlow: Flow[HttpRequest, HttpResponse, Any]): Flow[HttpRequest, HttpResponse, NotUsed] =
Flow.fromGraph(new Stage(connectionFlow))
def managedConnection(connectionFlow: Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]], maxAttempts: Int): Flow[HttpRequest, HttpResponse, NotUsed] =
Flow.fromGraph(new Stage(connectionFlow, maxAttempts match {
case 0 => None
case n => Some(n)
}))

private class AssociationTag extends RequestResponseAssociation
private val associationTagKey = AttributeKey[AssociationTag]("PersistentConnection.associationTagKey")
Expand All @@ -41,7 +49,7 @@ private[http2] object PersistentConnection {
StatusCodes.BadGateway,
entity = "The server closed the connection before delivering a response.")

private class Stage(connectionFlow: Flow[HttpRequest, HttpResponse, Any]) extends GraphStage[FlowShape[HttpRequest, HttpResponse]] {
private class Stage(connectionFlow: Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]], maxAttempts: Option[Int]) extends GraphStage[FlowShape[HttpRequest, HttpResponse]] {
val requestIn = Inlet[HttpRequest]("PersistentConnection.requestIn")
val responseOut = Outlet[HttpResponse]("PersistentConnection.responseOut")

Expand All @@ -53,22 +61,77 @@ private[http2] object PersistentConnection {

trait State extends InHandler with OutHandler
object Unconnected extends State {
override def onPush(): Unit = connect()
override def onPush(): Unit = connect(maxAttempts)
override def onPull(): Unit =
if (!hasBeenPulled(requestIn)) // requestIn might already have been pulled when we failed and went back to Unconnected
if (!isAvailable(requestIn) && !hasBeenPulled(requestIn)) // requestIn might already have been pulled when we failed and went back to Unconnected
pull(requestIn)

}

def connect(): Unit = {
def connect(connectsLeft: Option[Int]): Unit = {
val requestOut = new SubSourceOutlet[HttpRequest]("PersistentConnection.requestOut")
val responseIn = new SubSinkInlet[HttpResponse]("PersistentConnection.responseIn")
val connection = Promise[OutgoingConnection]()

Source.fromGraph(requestOut.source)
.via(connectionFlow)
.runWith(responseIn.sink)(subFusingMaterializer)
become(new Connecting(connection.future, requestOut, responseIn, connectsLeft.map(_ - 1)))

become(new Connected(requestOut, responseIn))
connection.completeWith(Source.fromGraph(requestOut.source)
.viaMat(connectionFlow)(Keep.right)
.toMat(responseIn.sink)(Keep.left)
.run()(subFusingMaterializer))
}

class Connecting(
connected: Future[OutgoingConnection],
requestOut: SubSourceOutlet[HttpRequest],
responseIn: SubSinkInlet[HttpResponse],
connectsLeft: Option[Int]
) extends State {
connected.onComplete({
case Success(_) =>
onConnected.invoke(())
case Failure(cause) =>
onFailed.invoke(cause)
})(ExecutionContexts.parasitic)

var requestOutPulled = false
requestOut.setHandler(new OutHandler {
override def onPull(): Unit =
requestOutPulled = true
override def onDownstreamFinish(): Unit = ()
})
responseIn.setHandler(new InHandler {
override def onPush(): Unit = ()
override def onUpstreamFinish(): Unit = ()
override def onUpstreamFailure(ex: Throwable): Unit = ()
})

override def onPush(): Unit = ()

override def onPull(): Unit = {
if (!isAvailable(requestIn) && !hasBeenPulled(requestIn)) // requestIn might already have been pulled when we failed and went back to Unconnected
pull(requestIn)
}

val onConnected = getAsyncCallback[Unit] { (_) =>
val newState = new Connected(requestOut, responseIn)
become(newState)
if (requestOutPulled) {
if (isAvailable(requestIn)) newState.dispatchRequest(grab(requestIn))
else if (!hasBeenPulled(requestIn)) pull(requestIn)
}
}
val onFailed = getAsyncCallback[Throwable] { cause =>
responseIn.cancel()
requestOut.fail(new RuntimeException("connection broken"))

if (connectsLeft.contains(0)) {
failStage(cause)
} else {
setHandler(requestIn, Unconnected)
log.info(s"failed, trying to connect again: ${cause.getMessage}${connectsLeft.map(n => s" ($n left)").getOrElse("")}")
connect(connectsLeft)
}
}
}

class Connected(
Expand Down Expand Up @@ -117,7 +180,9 @@ private[http2] object PersistentConnection {
requestOut.push(req.addAttribute(associationTagKey, tag))
}

override def onPush(): Unit = dispatchRequest(grab(requestIn))
override def onPush(): Unit = {
dispatchRequest(grab(requestIn))
}
override def onPull(): Unit = responseIn.pull()

override def onUpstreamFinish(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ trait Http2ClientSettings { self: scaladsl.settings.Http2ClientSettings.Http2Cli
def getPingTimeout: Duration = Duration.ofMillis(pingTimeout.toMillis)
def withPingTimeout(timeout: Duration): Http2ClientSettings = copy(pingTimeout = timeout.toMillis.millis)

def getMaxPersistentAttempts: Int = maxPersistentAttempts
def withMaxPersistentAttempts(max: Int): Http2ClientSettings = copy(maxPersistentAttempts = max)
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ trait Http2ClientSettings extends javadsl.settings.Http2ClientSettings with Http
def pingTimeout: FiniteDuration
def withPingTimeout(timeout: FiniteDuration): Http2ClientSettings = copy(pingTimeout = timeout)

def maxPersistentAttempts: Int
override def withMaxPersistentAttempts(max: Int): Http2ClientSettings = copy(maxPersistentAttempts = max)

@InternalApi
private[http] def internalSettings: Option[Http2InternalClientSettings]
@InternalApi
Expand All @@ -184,13 +187,15 @@ object Http2ClientSettings extends SettingsCompanion[Http2ClientSettings] {
logFrames: Boolean,
pingInterval: FiniteDuration,
pingTimeout: FiniteDuration,
maxPersistentAttempts: Int,
internalSettings: Option[Http2InternalClientSettings])
extends Http2ClientSettings with javadsl.settings.Http2ClientSettings {
require(maxConcurrentStreams >= 0, "max-concurrent-streams must be >= 0")
require(requestEntityChunkSize > 0, "request-entity-chunk-size must be > 0")
require(incomingConnectionLevelBufferSize > 0, "incoming-connection-level-buffer-size must be > 0")
require(incomingStreamLevelBufferSize > 0, "incoming-stream-level-buffer-size must be > 0")
require(outgoingControlFrameBufferSize > 0, "outgoing-control-frame-buffer-size must be > 0")
require(maxPersistentAttempts >= 0, "max-persistent-attempts must be >= 0")
Http2CommonSettings.validate(this)
}

Expand All @@ -204,6 +209,7 @@ object Http2ClientSettings extends SettingsCompanion[Http2ClientSettings] {
logFrames = c.getBoolean("log-frames"),
pingInterval = c.getFiniteDuration("ping-interval"),
pingTimeout = c.getFiniteDuration("ping-timeout"),
maxPersistentAttempts = c.getInt("max-persistent-attempts"),
internalSettings = None // no possibility to configure internal settings with config
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import akka.testkit.TestProbe
import akka.util.ByteString
import org.scalatest.concurrent.ScalaFutures

import java.net.InetSocketAddress
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{ Future, Promise }
Expand All @@ -32,6 +33,7 @@ class Http2PersistentClientSpec extends AkkaSpecWithMaterializer(
"""akka.http.server.remote-address-header = on
akka.http.server.preview.enable-http2 = on
akka.http.client.http2.log-frames = on
akka.http.client.http2.max-persistent-attempts = 5
akka.http.client.log-unencrypted-network-bytes = 100
akka.actor.serialize-messages = false
""") with ScalaFutures {
Expand Down Expand Up @@ -182,6 +184,62 @@ class Http2PersistentClientSpec extends AkkaSpecWithMaterializer(
// request should have come in through another connection
serverRequest2.clientPort should not be (clientPort)
}
"when the first connection fails to materialize" inAssertAllStagesStopped new TestSetup(tls) {
var first = true
override def clientSettings = super.clientSettings.withTransport(ClientTransport.withCustomResolver((host, port) => {
if (first) {
first = false
// First request returns an address where we are not listening::
Future.successful(new InetSocketAddress("example.invalid", 80))
} else
Future.successful(server.binding.localAddress)
}))

client.sendRequest(
HttpRequest(
method = HttpMethods.POST,
entity = "ping",
headers = headers.`Accept-Encoding`(HttpEncodings.gzip) :: Nil
)
.addAttribute(requestIdAttr, RequestId("request-1"))
)
// need some demand on response side, otherwise, no requests will be pulled in
client.responsesIn.request(1)
client.requestsOut.ensureSubscription()

val serverRequest = server.expectRequest()
serverRequest.request.attribute(Http2.streamId) should not be empty
serverRequest.request.method shouldBe HttpMethods.POST
serverRequest.request.header[headers.`Accept-Encoding`] should not be empty
serverRequest.entityAsString shouldBe "ping"

// now respond
server.sendResponseFor(serverRequest, HttpResponse(entity = "pong"))

val response = client.expectResponse()
Unmarshal(response.entity).to[String].futureValue shouldBe "pong"
response.attribute(requestIdAttr).get.id shouldBe "request-1"
}
}
"eventually fail" should {
"when connecting keeps failing" inAssertAllStagesStopped new TestSetup(tls) {
override def clientSettings = super.clientSettings
.withTransport(ClientTransport.withCustomResolver((_, _) => {
Future.successful(new InetSocketAddress("example.invalid", 80))
}))

client.sendRequest(
HttpRequest(
method = HttpMethods.POST,
entity = "ping",
headers = headers.`Accept-Encoding`(HttpEncodings.gzip) :: Nil
)
.addAttribute(requestIdAttr, RequestId("request-1"))
)
// need some demand on response side, otherwise, no requests will be pulled in
client.responsesIn.request(1)
client.requestsOut.expectCancellation()
}
}
"not leak any stages if completed" should {
"when waiting for a response" inAssertAllStagesStopped new TestSetup(tls) {
Expand Down Expand Up @@ -228,7 +286,16 @@ class Http2PersistentClientSpec extends AkkaSpecWithMaterializer(

class TestSetup(tls: Boolean) {
def serverSettings: ServerSettings = ServerSettings(system)
def clientSettings: ClientConnectionSettings = ClientConnectionSettings(system)
def clientSettings: ClientConnectionSettings =
ClientConnectionSettings(system).withTransport(new ClientTransport {
override def connectTo(host: String, port: Int, settings: ClientConnectionSettings)(implicit system: ActorSystem): Flow[ByteString, ByteString, Future[Http.OutgoingConnection]] = {
Flow.fromGraph(KillSwitches.single[ByteString])
.mapMaterializedValue { killer =>
killProbe.ref ! killer
}
.viaMat(ClientTransport.TCP.connectTo(server.binding.localAddress.getHostString, server.binding.localAddress.getPort, settings)(system))(Keep.right)
}
})

val killProbe = TestProbe()
def killConnection(): Unit = killProbe.expectMsgType[UniqueKillSwitch].abort(new RuntimeException("connection was killed"))
Expand All @@ -254,19 +321,11 @@ class Http2PersistentClientSpec extends AkkaSpecWithMaterializer(
request.sendResponse(response)
}
object client {
val transport = new ClientTransport {
override def connectTo(host: String, port: Int, settings: ClientConnectionSettings)(implicit system: ActorSystem): Flow[ByteString, ByteString, Future[Http.OutgoingConnection]] =
Flow.fromGraph(KillSwitches.single[ByteString])
.mapMaterializedValue { killer =>
killProbe.ref ! killer
}
.viaMat(ClientTransport.TCP.connectTo(server.binding.localAddress.getHostString, server.binding.localAddress.getPort, settings)(system))(Keep.right)
}

lazy val clientFlow = {
val builder = Http().connectionTo("akka.example.org")
.withCustomHttpsConnectionContext(ExampleHttpContexts.exampleClientContext)
.withClientConnectionSettings(clientSettings.withTransport(transport))
.withClientConnectionSettings(clientSettings)

if (tls) builder.managedPersistentHttp2()
else builder.managedPersistentHttp2WithPriorKnowledge()
Expand Down

0 comments on commit 5bc4b2e

Please sign in to comment.