diff --git a/akka-http-core/src/main/mima-filters/10.2.2.backwards.excludes/h2-client-persistent-connection-max-attempts.backwards.excludes b/akka-http-core/src/main/mima-filters/10.2.2.backwards.excludes/h2-client-persistent-connection-max-attempts.backwards.excludes new file mode 100644 index 00000000000..e137b075938 --- /dev/null +++ b/akka-http-core/src/main/mima-filters/10.2.2.backwards.excludes/h2-client-persistent-connection-max-attempts.backwards.excludes @@ -0,0 +1,4 @@ +# Added method to method that is not to be extended: +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.settings.Http2ClientSettings.maxPersistentAttempts") +ProblemFilters.exclude[Problem]("akka.http.scaladsl.settings.Http2ClientSettings#Http2ClientSettingsImpl.*") +ProblemFilters.exclude[Problem]("akka.http.impl.engine.http2.OutgoingConnectionBuilderImpl*") diff --git a/akka-http-core/src/main/resources/reference.conf b/akka-http-core/src/main/resources/reference.conf index 9a50f7aceea..83964120980 100644 --- a/akka-http-core/src/main/resources/reference.conf +++ b/akka-http-core/src/main/resources/reference.conf @@ -447,6 +447,11 @@ akka.http { # Fail the connection if a sent ping is not acknowledged within this timeout. # When zero the ping-interval is used, if set the value must be evenly divisible by less than or equal to the ping-interval. ping-timeout = 0s + + # The maximum number of times a connections is attempted + # before giving up and returning an error. + # Set to zero to retry indefinitely. + max-persistent-attempts = 0 } #client-settings diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/http2/OutgoingConnectionBuilderImpl.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/http2/OutgoingConnectionBuilderImpl.scala index 0a874f8727f..cb19c0259e0 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/http2/OutgoingConnectionBuilderImpl.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/http2/OutgoingConnectionBuilderImpl.scala @@ -5,7 +5,6 @@ package akka.http.impl.engine.http2 import java.util.concurrent.CompletionStage - import akka.NotUsed import akka.actor.ClassicActorSystemProvider import akka.annotation.InternalApi @@ -15,7 +14,7 @@ import akka.http.impl.engine.http2.client.PersistentConnection import akka.http.scaladsl.Http.OutgoingConnection import akka.http.scaladsl.model.HttpRequest import akka.http.scaladsl.model.HttpResponse -import akka.http.scaladsl.settings.ClientConnectionSettings +import akka.http.scaladsl.settings.{ ClientConnectionSettings, Http2ClientSettings } import akka.stream.scaladsl.Flow import akka.http.javadsl import akka.http.javadsl.{ OutgoingConnectionBuilder => JOutgoingConnectionBuilder } @@ -38,6 +37,7 @@ private[akka] object OutgoingConnectionBuilderImpl { host, None, clientConnectionSettings = ClientConnectionSettings(system), + http2Settings = Http2ClientSettings(system), connectionContext = None, log = system.classicSystem.log, system = system, @@ -48,6 +48,7 @@ private[akka] object OutgoingConnectionBuilderImpl { host: String, port: Option[Int], clientConnectionSettings: ClientConnectionSettings, + http2Settings: Http2ClientSettings, connectionContext: Option[HttpsConnectionContext], log: LoggingAdapter, system: ClassicActorSystemProvider, @@ -80,7 +81,7 @@ private[akka] object OutgoingConnectionBuilderImpl { } override def managedPersistentHttp2(): Flow[HttpRequest, HttpResponse, NotUsed] = - PersistentConnection.managedConnection(http2()) + PersistentConnection.managedConnection(http2(), http2Settings.maxPersistentAttempts) override def http2WithPriorKnowledge(): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = { // http/2 prior knowledge plaintext @@ -88,7 +89,7 @@ private[akka] object OutgoingConnectionBuilderImpl { } override def managedPersistentHttp2WithPriorKnowledge(): Flow[HttpRequest, HttpResponse, NotUsed] = - PersistentConnection.managedConnection(http2WithPriorKnowledge()) + PersistentConnection.managedConnection(http2WithPriorKnowledge(), http2Settings.maxPersistentAttempts) override private[akka] def toJava: JOutgoingConnectionBuilder = new JavaAdapter(this) } diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/http2/client/PersistentConnection.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/http2/client/PersistentConnection.scala index 76c353663b5..1093a201d14 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/http2/client/PersistentConnection.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/http2/client/PersistentConnection.scala @@ -6,11 +6,16 @@ package akka.http.impl.engine.http2.client import akka.NotUsed import akka.annotation.InternalApi +import akka.dispatch.ExecutionContexts +import akka.http.scaladsl.Http.OutgoingConnection import akka.http.scaladsl.model.{ AttributeKey, HttpRequest, HttpResponse, RequestResponseAssociation, StatusCodes } -import akka.stream.scaladsl.{ Flow, Source } +import akka.stream.scaladsl.{ Flow, Keep, Source } import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler, StageLogging } import akka.stream.{ Attributes, FlowShape, Inlet, Outlet } +import scala.concurrent.{ Future, Promise } +import scala.util.{ Failure, Success } + /** INTERNAL API */ @InternalApi private[http2] object PersistentConnection { @@ -31,8 +36,11 @@ private[http2] object PersistentConnection { * * generate error responses with 502 status code * * custom attribute contains internal error information */ - def managedConnection(connectionFlow: Flow[HttpRequest, HttpResponse, Any]): Flow[HttpRequest, HttpResponse, NotUsed] = - Flow.fromGraph(new Stage(connectionFlow)) + def managedConnection(connectionFlow: Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]], maxAttempts: Int): Flow[HttpRequest, HttpResponse, NotUsed] = + Flow.fromGraph(new Stage(connectionFlow, maxAttempts match { + case 0 => None + case n => Some(n) + })) private class AssociationTag extends RequestResponseAssociation private val associationTagKey = AttributeKey[AssociationTag]("PersistentConnection.associationTagKey") @@ -41,7 +49,7 @@ private[http2] object PersistentConnection { StatusCodes.BadGateway, entity = "The server closed the connection before delivering a response.") - private class Stage(connectionFlow: Flow[HttpRequest, HttpResponse, Any]) extends GraphStage[FlowShape[HttpRequest, HttpResponse]] { + private class Stage(connectionFlow: Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]], maxAttempts: Option[Int]) extends GraphStage[FlowShape[HttpRequest, HttpResponse]] { val requestIn = Inlet[HttpRequest]("PersistentConnection.requestIn") val responseOut = Outlet[HttpResponse]("PersistentConnection.responseOut") @@ -53,22 +61,77 @@ private[http2] object PersistentConnection { trait State extends InHandler with OutHandler object Unconnected extends State { - override def onPush(): Unit = connect() + override def onPush(): Unit = connect(maxAttempts) override def onPull(): Unit = - if (!hasBeenPulled(requestIn)) // requestIn might already have been pulled when we failed and went back to Unconnected + if (!isAvailable(requestIn) && !hasBeenPulled(requestIn)) // requestIn might already have been pulled when we failed and went back to Unconnected pull(requestIn) - } - def connect(): Unit = { + def connect(connectsLeft: Option[Int]): Unit = { val requestOut = new SubSourceOutlet[HttpRequest]("PersistentConnection.requestOut") val responseIn = new SubSinkInlet[HttpResponse]("PersistentConnection.responseIn") + val connection = Promise[OutgoingConnection]() - Source.fromGraph(requestOut.source) - .via(connectionFlow) - .runWith(responseIn.sink)(subFusingMaterializer) + become(new Connecting(connection.future, requestOut, responseIn, connectsLeft.map(_ - 1))) - become(new Connected(requestOut, responseIn)) + connection.completeWith(Source.fromGraph(requestOut.source) + .viaMat(connectionFlow)(Keep.right) + .toMat(responseIn.sink)(Keep.left) + .run()(subFusingMaterializer)) + } + + class Connecting( + connected: Future[OutgoingConnection], + requestOut: SubSourceOutlet[HttpRequest], + responseIn: SubSinkInlet[HttpResponse], + connectsLeft: Option[Int] + ) extends State { + connected.onComplete({ + case Success(_) => + onConnected.invoke(()) + case Failure(cause) => + onFailed.invoke(cause) + })(ExecutionContexts.parasitic) + + var requestOutPulled = false + requestOut.setHandler(new OutHandler { + override def onPull(): Unit = + requestOutPulled = true + override def onDownstreamFinish(): Unit = () + }) + responseIn.setHandler(new InHandler { + override def onPush(): Unit = () + override def onUpstreamFinish(): Unit = () + override def onUpstreamFailure(ex: Throwable): Unit = () + }) + + override def onPush(): Unit = () + + override def onPull(): Unit = { + if (!isAvailable(requestIn) && !hasBeenPulled(requestIn)) // requestIn might already have been pulled when we failed and went back to Unconnected + pull(requestIn) + } + + val onConnected = getAsyncCallback[Unit] { (_) => + val newState = new Connected(requestOut, responseIn) + become(newState) + if (requestOutPulled) { + if (isAvailable(requestIn)) newState.dispatchRequest(grab(requestIn)) + else if (!hasBeenPulled(requestIn)) pull(requestIn) + } + } + val onFailed = getAsyncCallback[Throwable] { cause => + responseIn.cancel() + requestOut.fail(new RuntimeException("connection broken")) + + if (connectsLeft.contains(0)) { + failStage(cause) + } else { + setHandler(requestIn, Unconnected) + log.info(s"failed, trying to connect again: ${cause.getMessage}${connectsLeft.map(n => s" ($n left)").getOrElse("")}") + connect(connectsLeft) + } + } } class Connected( @@ -117,7 +180,9 @@ private[http2] object PersistentConnection { requestOut.push(req.addAttribute(associationTagKey, tag)) } - override def onPush(): Unit = dispatchRequest(grab(requestIn)) + override def onPush(): Unit = { + dispatchRequest(grab(requestIn)) + } override def onPull(): Unit = responseIn.pull() override def onUpstreamFinish(): Unit = { diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/settings/Http2ClientSettings.scala b/akka-http-core/src/main/scala/akka/http/javadsl/settings/Http2ClientSettings.scala index 39174408fb6..abdd851e2bb 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/settings/Http2ClientSettings.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/settings/Http2ClientSettings.scala @@ -34,4 +34,6 @@ trait Http2ClientSettings { self: scaladsl.settings.Http2ClientSettings.Http2Cli def getPingTimeout: Duration = Duration.ofMillis(pingTimeout.toMillis) def withPingTimeout(timeout: Duration): Http2ClientSettings = copy(pingTimeout = timeout.toMillis.millis) + def getMaxPersistentAttempts: Int = maxPersistentAttempts + def withMaxPersistentAttempts(max: Int): Http2ClientSettings = copy(maxPersistentAttempts = max) } diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/settings/Http2ServerSettings.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/settings/Http2ServerSettings.scala index 9552342cbb1..83ae7c6ab9c 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/settings/Http2ServerSettings.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/settings/Http2ServerSettings.scala @@ -163,6 +163,9 @@ trait Http2ClientSettings extends javadsl.settings.Http2ClientSettings with Http def pingTimeout: FiniteDuration def withPingTimeout(timeout: FiniteDuration): Http2ClientSettings = copy(pingTimeout = timeout) + def maxPersistentAttempts: Int + override def withMaxPersistentAttempts(max: Int): Http2ClientSettings = copy(maxPersistentAttempts = max) + @InternalApi private[http] def internalSettings: Option[Http2InternalClientSettings] @InternalApi @@ -184,6 +187,7 @@ object Http2ClientSettings extends SettingsCompanion[Http2ClientSettings] { logFrames: Boolean, pingInterval: FiniteDuration, pingTimeout: FiniteDuration, + maxPersistentAttempts: Int, internalSettings: Option[Http2InternalClientSettings]) extends Http2ClientSettings with javadsl.settings.Http2ClientSettings { require(maxConcurrentStreams >= 0, "max-concurrent-streams must be >= 0") @@ -191,6 +195,7 @@ object Http2ClientSettings extends SettingsCompanion[Http2ClientSettings] { require(incomingConnectionLevelBufferSize > 0, "incoming-connection-level-buffer-size must be > 0") require(incomingStreamLevelBufferSize > 0, "incoming-stream-level-buffer-size must be > 0") require(outgoingControlFrameBufferSize > 0, "outgoing-control-frame-buffer-size must be > 0") + require(maxPersistentAttempts >= 0, "max-persistent-attempts must be >= 0") Http2CommonSettings.validate(this) } @@ -204,6 +209,7 @@ object Http2ClientSettings extends SettingsCompanion[Http2ClientSettings] { logFrames = c.getBoolean("log-frames"), pingInterval = c.getFiniteDuration("ping-interval"), pingTimeout = c.getFiniteDuration("ping-timeout"), + maxPersistentAttempts = c.getInt("max-persistent-attempts"), internalSettings = None // no possibility to configure internal settings with config ) } diff --git a/akka-http2-support/src/test/scala/akka/http/impl/engine/http2/Http2PersistentClientSpec.scala b/akka-http2-support/src/test/scala/akka/http/impl/engine/http2/Http2PersistentClientSpec.scala index e6ffa20bf10..d84f903438f 100644 --- a/akka-http2-support/src/test/scala/akka/http/impl/engine/http2/Http2PersistentClientSpec.scala +++ b/akka-http2-support/src/test/scala/akka/http/impl/engine/http2/Http2PersistentClientSpec.scala @@ -22,6 +22,7 @@ import akka.testkit.TestProbe import akka.util.ByteString import org.scalatest.concurrent.ScalaFutures +import java.net.InetSocketAddress import scala.collection.immutable import scala.concurrent.duration._ import scala.concurrent.{ Future, Promise } @@ -32,6 +33,7 @@ class Http2PersistentClientSpec extends AkkaSpecWithMaterializer( """akka.http.server.remote-address-header = on akka.http.server.preview.enable-http2 = on akka.http.client.http2.log-frames = on + akka.http.client.http2.max-persistent-attempts = 5 akka.http.client.log-unencrypted-network-bytes = 100 akka.actor.serialize-messages = false """) with ScalaFutures { @@ -182,6 +184,62 @@ class Http2PersistentClientSpec extends AkkaSpecWithMaterializer( // request should have come in through another connection serverRequest2.clientPort should not be (clientPort) } + "when the first connection fails to materialize" inAssertAllStagesStopped new TestSetup(tls) { + var first = true + override def clientSettings = super.clientSettings.withTransport(ClientTransport.withCustomResolver((host, port) => { + if (first) { + first = false + // First request returns an address where we are not listening:: + Future.successful(new InetSocketAddress("example.invalid", 80)) + } else + Future.successful(server.binding.localAddress) + })) + + client.sendRequest( + HttpRequest( + method = HttpMethods.POST, + entity = "ping", + headers = headers.`Accept-Encoding`(HttpEncodings.gzip) :: Nil + ) + .addAttribute(requestIdAttr, RequestId("request-1")) + ) + // need some demand on response side, otherwise, no requests will be pulled in + client.responsesIn.request(1) + client.requestsOut.ensureSubscription() + + val serverRequest = server.expectRequest() + serverRequest.request.attribute(Http2.streamId) should not be empty + serverRequest.request.method shouldBe HttpMethods.POST + serverRequest.request.header[headers.`Accept-Encoding`] should not be empty + serverRequest.entityAsString shouldBe "ping" + + // now respond + server.sendResponseFor(serverRequest, HttpResponse(entity = "pong")) + + val response = client.expectResponse() + Unmarshal(response.entity).to[String].futureValue shouldBe "pong" + response.attribute(requestIdAttr).get.id shouldBe "request-1" + } + } + "eventually fail" should { + "when connecting keeps failing" inAssertAllStagesStopped new TestSetup(tls) { + override def clientSettings = super.clientSettings + .withTransport(ClientTransport.withCustomResolver((_, _) => { + Future.successful(new InetSocketAddress("example.invalid", 80)) + })) + + client.sendRequest( + HttpRequest( + method = HttpMethods.POST, + entity = "ping", + headers = headers.`Accept-Encoding`(HttpEncodings.gzip) :: Nil + ) + .addAttribute(requestIdAttr, RequestId("request-1")) + ) + // need some demand on response side, otherwise, no requests will be pulled in + client.responsesIn.request(1) + client.requestsOut.expectCancellation() + } } "not leak any stages if completed" should { "when waiting for a response" inAssertAllStagesStopped new TestSetup(tls) { @@ -228,7 +286,16 @@ class Http2PersistentClientSpec extends AkkaSpecWithMaterializer( class TestSetup(tls: Boolean) { def serverSettings: ServerSettings = ServerSettings(system) - def clientSettings: ClientConnectionSettings = ClientConnectionSettings(system) + def clientSettings: ClientConnectionSettings = + ClientConnectionSettings(system).withTransport(new ClientTransport { + override def connectTo(host: String, port: Int, settings: ClientConnectionSettings)(implicit system: ActorSystem): Flow[ByteString, ByteString, Future[Http.OutgoingConnection]] = { + Flow.fromGraph(KillSwitches.single[ByteString]) + .mapMaterializedValue { killer => + killProbe.ref ! killer + } + .viaMat(ClientTransport.TCP.connectTo(server.binding.localAddress.getHostString, server.binding.localAddress.getPort, settings)(system))(Keep.right) + } + }) val killProbe = TestProbe() def killConnection(): Unit = killProbe.expectMsgType[UniqueKillSwitch].abort(new RuntimeException("connection was killed")) @@ -254,19 +321,11 @@ class Http2PersistentClientSpec extends AkkaSpecWithMaterializer( request.sendResponse(response) } object client { - val transport = new ClientTransport { - override def connectTo(host: String, port: Int, settings: ClientConnectionSettings)(implicit system: ActorSystem): Flow[ByteString, ByteString, Future[Http.OutgoingConnection]] = - Flow.fromGraph(KillSwitches.single[ByteString]) - .mapMaterializedValue { killer => - killProbe.ref ! killer - } - .viaMat(ClientTransport.TCP.connectTo(server.binding.localAddress.getHostString, server.binding.localAddress.getPort, settings)(system))(Keep.right) - } lazy val clientFlow = { val builder = Http().connectionTo("akka.example.org") .withCustomHttpsConnectionContext(ExampleHttpContexts.exampleClientContext) - .withClientConnectionSettings(clientSettings.withTransport(transport)) + .withClientConnectionSettings(clientSettings) if (tls) builder.managedPersistentHttp2() else builder.managedPersistentHttp2WithPriorKnowledge()