From 703982955c09352c558d4949b2400641f31f2173 Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Wed, 5 Sep 2018 13:46:03 +0200 Subject: [PATCH 1/6] [backport from master] Allow a maximum size parameter for HttpEntity 'toStrict' (#2186) And apply configurable default limit otherwise. Fixes #268. Refs #2137. (cherry picked from commit d0cee31292c893994141cc36f772e5c90828c341) --- .../akka/http/javadsl/model/HttpEntity.java | 13 +++++- .../akka/http/javadsl/model/HttpMessage.java | 13 +++++- .../mima-filters/10.1.4.backwards.excludes | 13 ++++++ .../src/main/resources/reference.conf | 4 ++ .../impl/settings/ParserSettingsImpl.scala | 2 + .../scala/akka/http/impl/util/package.scala | 14 ++++-- .../javadsl/settings/ParserSettings.scala | 2 + .../akka/http/scaladsl/model/HttpEntity.scala | 35 +++++++++++--- .../http/scaladsl/model/HttpMessage.scala | 9 ++++ .../scaladsl/settings/ParserSettings.scala | 3 ++ .../http/scaladsl/model/HttpEntitySpec.scala | 40 ++++++++++++++++ .../mima-filters/10.1.4.backwards.excludes | 4 ++ .../server/directives/BasicDirectives.scala | 41 ++++++++++++++++- .../server/directives/BasicDirectives.scala | 46 +++++++++++++++++-- 14 files changed, 221 insertions(+), 18 deletions(-) create mode 100644 akka-http-core/src/main/mima-filters/10.1.4.backwards.excludes create mode 100644 akka-http/src/main/mima-filters/10.1.4.backwards.excludes diff --git a/akka-http-core/src/main/java/akka/http/javadsl/model/HttpEntity.java b/akka-http-core/src/main/java/akka/http/javadsl/model/HttpEntity.java index 30d61c97850..8a3a0c65c62 100644 --- a/akka-http-core/src/main/java/akka/http/javadsl/model/HttpEntity.java +++ b/akka-http-core/src/main/java/akka/http/javadsl/model/HttpEntity.java @@ -136,7 +136,7 @@ public interface HttpEntity { HttpEntity withoutSizeLimit(); /** - * Returns a future of a strict entity that contains the same data as this entity + * Returns a CompletionStage of a strict entity that contains the same data as this entity * which is only 
completed when the complete entity has been collected. As the * duration of receiving the complete entity cannot be predicted, a timeout needs to * be specified to guard the process against running and keeping resources infinitely. @@ -146,6 +146,17 @@ public interface HttpEntity { */ CompletionStage toStrict(long timeoutMillis, Materializer materializer); + /** + * Returns a CompletionStage of a strict entity that contains the same data as this entity + * which is only completed when the complete entity has been collected. As the + * duration of receiving the complete entity cannot be predicted, a timeout needs to + * be specified to guard the process against running and keeping resources infinitely. + * + * Use getDataBytes and stream processing instead if the expected data is big or + * is likely to take a long time. + */ + CompletionStage toStrict(long timeoutMillis, long maxBytes, Materializer materializer); + /** * Discards the entities data bytes by running the {@code dataBytes} Source contained in this entity. * diff --git a/akka-http-core/src/main/java/akka/http/javadsl/model/HttpMessage.java b/akka-http-core/src/main/java/akka/http/javadsl/model/HttpMessage.java index 33262cd6111..d3052043cef 100644 --- a/akka-http-core/src/main/java/akka/http/javadsl/model/HttpMessage.java +++ b/akka-http-core/src/main/java/akka/http/javadsl/model/HttpMessage.java @@ -179,7 +179,7 @@ interface MessageTransformations { Self transformEntityDataBytes(Graph, T> transformer); /** - * Returns a future of Self message with strict entity that contains the same data as this entity + * Returns a CompletionStage of Self message with strict entity that contains the same data as this entity * which is only completed when the complete entity has been collected. As the * duration of receiving the complete entity cannot be predicted, a timeout needs to * be specified to guard the process against running and keeping resources infinitely. 
@@ -188,5 +188,16 @@ interface MessageTransformations { * is likely to take a long time. */ CompletionStage toStrict(long timeoutMillis, Executor ec, Materializer materializer); + + /** + * Returns a CompletionStage of Self message with strict entity that contains the same data as this entity + * which is only completed when the complete entity has been collected. As the + * duration of receiving the complete entity cannot be predicted, a timeout needs to + * be specified to guard the process against running and keeping resources infinitely. + * + * Use getEntity().getDataBytes and stream processing instead if the expected data is big or + * is likely to take a long time. + */ + CompletionStage toStrict(long timeoutMillis, long maxBytes, Executor ec, Materializer materializer); } } diff --git a/akka-http-core/src/main/mima-filters/10.1.4.backwards.excludes b/akka-http-core/src/main/mima-filters/10.1.4.backwards.excludes new file mode 100644 index 00000000000..aa52237c60c --- /dev/null +++ b/akka-http-core/src/main/mima-filters/10.1.4.backwards.excludes @@ -0,0 +1,13 @@ +# Not meant for user extension, so new methods should be fine: +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.model.HttpEntity.toStrict") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.HttpEntity.toStrict") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.model.HttpMessage#MessageTransformations.toStrict") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.HttpMessage.toStrict") + +# ToStrict is private[http] +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.impl.util.ToStrict.this") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.settings.ParserSettings.getMaxToStrictBytes") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.settings.ParserSettings.maxToStrictBytes") 
+ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.settings.ParserSettings.maxToStrictBytes") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.impl.settings.ParserSettingsImpl.*") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.http.impl.settings.ParserSettingsImpl.*") diff --git a/akka-http-core/src/main/resources/reference.conf b/akka-http-core/src/main/resources/reference.conf index bc340224c1d..b5a3a33ed4d 100644 --- a/akka-http-core/src/main/resources/reference.conf +++ b/akka-http-core/src/main/resources/reference.conf @@ -352,6 +352,10 @@ akka.http { # programmatically via `withSizeLimit`.) max-content-length = 8m + # The maximum number of bytes to allow when reading the entire entity into memory with `toStrict` + # (which is used by the `toStrictEntity` and `extractStrictEntity` directives) + max-to-strict-bytes = 8m + # Sets the strictness mode for parsing request target URIs. # The following values are defined: # diff --git a/akka-http-core/src/main/scala/akka/http/impl/settings/ParserSettingsImpl.scala b/akka-http-core/src/main/scala/akka/http/impl/settings/ParserSettingsImpl.scala index 7b20eb83e59..1445703996b 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/settings/ParserSettingsImpl.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/settings/ParserSettingsImpl.scala @@ -23,6 +23,7 @@ private[akka] final case class ParserSettingsImpl( maxHeaderValueLength: Int, maxHeaderCount: Int, maxContentLength: Long, + maxToStrictBytes: Long, maxChunkExtLength: Int, maxChunkSize: Int, uriParsingMode: Uri.ParsingMode, @@ -74,6 +75,7 @@ object ParserSettingsImpl extends SettingsCompanion[ParserSettingsImpl]("akka.ht c getIntBytes "max-header-value-length", c getIntBytes "max-header-count", c getPossiblyInfiniteBytes "max-content-length", + c getPossiblyInfiniteBytes "max-to-strict-bytes", c getIntBytes "max-chunk-ext-length", c getIntBytes "max-chunk-size", Uri.ParsingMode(c getString 
"uri-parsing-mode"), diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/package.scala b/akka-http-core/src/main/scala/akka/http/impl/util/package.scala index 7bbf4d65084..186127d5212 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/package.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/package.scala @@ -92,11 +92,12 @@ package object util { package util { - import akka.http.scaladsl.model.{ ContentType, HttpEntity } - import akka.stream.{ Attributes, Outlet, Inlet, FlowShape } + import akka.http.scaladsl.model.{ ContentType, EntityStreamException, ErrorInfo, HttpEntity } + import akka.stream.{ Attributes, FlowShape, Inlet, Outlet } + import scala.concurrent.duration.FiniteDuration - private[http] class ToStrict(timeout: FiniteDuration, contentType: ContentType) + private[http] class ToStrict(timeout: FiniteDuration, maxBytes: Option[Long], contentType: ContentType) extends GraphStage[FlowShape[ByteString, HttpEntity.Strict]] { val byteStringIn = Inlet[ByteString]("ToStrict.byteStringIn") @@ -124,7 +125,12 @@ package util { setHandler(byteStringIn, new InHandler { override def onPush(): Unit = { bytes ++= grab(byteStringIn) - pull(byteStringIn) + maxBytes match { + case Some(max) if bytes.length > max ⇒ + failStage(new EntityStreamException(new ErrorInfo("Request too large", s"Request was longer than the maximum of $max"))) + case _ ⇒ + pull(byteStringIn) + } } override def onUpstreamFinish(): Unit = { if (isAvailable(httpEntityOut)) { diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/settings/ParserSettings.scala b/akka-http-core/src/main/scala/akka/http/javadsl/settings/ParserSettings.scala index 5d8143bcaaf..0f087e00d89 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/settings/ParserSettings.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/settings/ParserSettings.scala @@ -31,6 +31,7 @@ abstract class ParserSettings private[akka] () extends BodyPartParser.Settings { def 
getMaxHeaderValueLength: Int def getMaxHeaderCount: Int def getMaxContentLength: Long + def getMaxToStrictBytes: Long def getMaxChunkExtLength: Int def getMaxChunkSize: Int def getUriParsingMode: Uri.ParsingMode @@ -55,6 +56,7 @@ abstract class ParserSettings private[akka] () extends BodyPartParser.Settings { def withMaxHeaderValueLength(newValue: Int): ParserSettings = self.copy(maxHeaderValueLength = newValue) def withMaxHeaderCount(newValue: Int): ParserSettings = self.copy(maxHeaderCount = newValue) def withMaxContentLength(newValue: Long): ParserSettings = self.copy(maxContentLength = newValue) + def withMaxToStrictBytes(newValue: Long): ParserSettings = self.copy(maxToStrictBytes = newValue) def withMaxChunkExtLength(newValue: Int): ParserSettings = self.copy(maxChunkExtLength = newValue) def withMaxChunkSize(newValue: Int): ParserSettings = self.copy(maxChunkSize = newValue) def withUriParsingMode(newValue: Uri.ParsingMode): ParserSettings = self.copy(uriParsingMode = newValue.asScala) diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala index 1717ae66fc0..65a334ab92f 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala @@ -70,12 +70,31 @@ sealed trait HttpEntity extends jm.HttpEntity { /** * Collects all possible parts and returns a potentially future Strict entity for easier processing. - * The Future is failed with an TimeoutException if the stream isn't completed after the given timeout. 
- */ - def toStrict(timeout: FiniteDuration)(implicit fm: Materializer): Future[HttpEntity.Strict] = - dataBytes - .via(new akka.http.impl.util.ToStrict(timeout, contentType)) - .runWith(Sink.head) + * The Future is failed with a TimeoutException if the stream isn't completed after the given timeout, + * or with an EntityStreamException when the end of the entity is not reached within the maximum number of bytes + * as configured in `akka.http.parsing.max-to-strict-bytes`. Note that this method does not support different + * defaults for client- and server use: if you want that, use the `toStrict` method and pass in an explicit + * maximum number of bytes. + */ + def toStrict(timeout: FiniteDuration)(implicit fm: Materializer): Future[HttpEntity.Strict] = { + import akka.http.impl.util._ + val config = fm.asInstanceOf[ActorMaterializer].system.settings.config + toStrict(timeout, config.getPossiblyInfiniteBytes("akka.http.parsing.max-to-strict-bytes")) + } + + /** + * Collects all possible parts and returns a potentially future Strict entity for easier processing. + * The Future is failed with a TimeoutException if the stream isn't completed after the given timeout, + * or with an EntityStreamException when the end of the entity is not reached within the maximum number of bytes. + */ + def toStrict(timeout: FiniteDuration, maxBytes: Long)(implicit fm: Materializer): Future[HttpEntity.Strict] = contentLengthOption match { + case Some(contentLength) if contentLength > maxBytes ⇒ + FastFuture.failed(new EntityStreamException(new ErrorInfo("Request too large", s"Request of size $contentLength was longer than the maximum of $maxBytes"))) + case _ ⇒ + dataBytes + .via(new akka.http.impl.util.ToStrict(timeout, Some(maxBytes), contentType)) + .runWith(Sink.head) + } /** * Discards the entities data bytes by running the `dataBytes` Source contained in this `entity`. 
@@ -174,6 +193,10 @@ sealed trait HttpEntity extends jm.HttpEntity { override def toStrict(timeoutMillis: Long, materializer: Materializer): CompletionStage[jm.HttpEntity.Strict] = toStrict(timeoutMillis.millis)(materializer).toJava + /** Java API */ + override def toStrict(timeoutMillis: Long, maxBytes: Long, materializer: Materializer): CompletionStage[jm.HttpEntity.Strict] = + toStrict(timeoutMillis.millis, maxBytes)(materializer).toJava + /** Java API */ override def withContentType(contentType: jm.ContentType): HttpEntity = { import JavaMapping.Implicits._ diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpMessage.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpMessage.scala index fe00df085ff..14e9dcb23ea 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpMessage.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpMessage.scala @@ -95,6 +95,10 @@ sealed trait HttpMessage extends jm.HttpMessage { def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: Materializer): Future[Self] = entity.toStrict(timeout).fast.map(this.withEntity) + /** Returns a shareable and serializable copy of this message with a strict entity. */ + def toStrict(timeout: FiniteDuration, maxBytes: Long)(implicit ec: ExecutionContext, fm: Materializer): Future[Self] = + entity.toStrict(timeout, maxBytes).fast.map(this.withEntity) + /** Returns a copy of this message with the entity and headers set to the given ones. 
*/ def withHeadersAndEntity(headers: immutable.Seq[HttpHeader], entity: MessageEntity): Self @@ -177,6 +181,11 @@ sealed trait HttpMessage extends jm.HttpMessage { val ex = ExecutionContext.fromExecutor(ec) toStrict(timeoutMillis.millis)(ex, materializer).toJava } + /** Java API */ + def toStrict(timeoutMillis: Long, maxBytes: Long, ec: Executor, materializer: Materializer): CompletionStage[Self] = { + val ex = ExecutionContext.fromExecutor(ec) + toStrict(timeoutMillis.millis, maxBytes)(ex, materializer).toJava + } } object HttpMessage { diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/settings/ParserSettings.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/settings/ParserSettings.scala index 7d9c4991ed5..3387ebda305 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/settings/ParserSettings.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/settings/ParserSettings.scala @@ -31,6 +31,7 @@ abstract class ParserSettings private[akka] () extends akka.http.javadsl.setting def maxHeaderValueLength: Int def maxHeaderCount: Int def maxContentLength: Long + def maxToStrictBytes: Long def maxChunkExtLength: Int def maxChunkSize: Int def uriParsingMode: Uri.ParsingMode @@ -52,6 +53,7 @@ abstract class ParserSettings private[akka] () extends akka.http.javadsl.setting override def getUriParsingMode: akka.http.javadsl.model.Uri.ParsingMode = uriParsingMode override def getMaxHeaderCount = maxHeaderCount override def getMaxContentLength = maxContentLength + override def getMaxToStrictBytes = maxToStrictBytes override def getMaxHeaderValueLength = maxHeaderValueLength override def getIncludeTlsSessionInfoHeader = includeTlsSessionInfoHeader override def getIllegalHeaderWarnings = illegalHeaderWarnings @@ -83,6 +85,7 @@ abstract class ParserSettings private[akka] () extends akka.http.javadsl.setting override def withMaxHeaderValueLength(newValue: Int): ParserSettings = self.copy(maxHeaderValueLength = newValue) override def 
withMaxHeaderCount(newValue: Int): ParserSettings = self.copy(maxHeaderCount = newValue) override def withMaxContentLength(newValue: Long): ParserSettings = self.copy(maxContentLength = newValue) + override def withMaxToStrictBytes(newValue: Long): ParserSettings = self.copy(maxToStrictBytes = newValue) override def withMaxChunkExtLength(newValue: Int): ParserSettings = self.copy(maxChunkExtLength = newValue) override def withMaxChunkSize(newValue: Int): ParserSettings = self.copy(maxChunkSize = newValue) override def withIllegalHeaderWarnings(newValue: Boolean): ParserSettings = self.copy(illegalHeaderWarnings = newValue) diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/model/HttpEntitySpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/model/HttpEntitySpec.scala index 24fdce3c4aa..502c8790051 100755 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/model/HttpEntitySpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/model/HttpEntitySpec.scala @@ -20,6 +20,7 @@ import akka.stream.ActorMaterializer import akka.http.scaladsl.model.HttpEntity._ import akka.http.impl.util.StreamUtils import akka.testkit._ +import org.scalatest.concurrent.ScalaFutures import scala.util.Random @@ -99,6 +100,45 @@ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll { }.getMessage must be("HttpEntity.toStrict timed out after 100 milliseconds while still waiting for outstanding data") } } + "support toStrict with the default max size" - { + "Infinite data stream" in { + intercept[EntityStreamException] { + Await.result(Chunked(tpe, Source.repeat(Chunk(abc))).toStrict(awaitAtMost), awaitAtMost) + }.getMessage must be("Request too large: Request was longer than the maximum of 8388608") + } + } + "support toStrict with a max size" - { + "Strict" in { + intercept[EntityStreamException] { + Await.result(Strict(tpe, abc).toStrict(awaitAtMost, maxBytes = 1), awaitAtMost) + }.getMessage must be("Request too large: Request of size 
3 was longer than the maximum of 1") + } + "Default" in { + intercept[EntityStreamException] { + Await.result(Default(tpe, 11, source(abc, de, fgh, ijk)).toStrict(awaitAtMost, maxBytes = 1), awaitAtMost) + }.getMessage must be("Request too large: Request of size 11 was longer than the maximum of 1") + } + "CloseDelimited" in { + intercept[EntityStreamException] { + Await.result(CloseDelimited(tpe, source(abc, de, fgh, ijk)).toStrict(awaitAtMost, maxBytes = 1), awaitAtMost) + }.getMessage must be("Request too large: Request was longer than the maximum of 1") + } + "Chunked w/o LastChunk" in { + intercept[EntityStreamException] { + Await.result(Chunked(tpe, source(Chunk(abc), Chunk(fgh), Chunk(ijk))).toStrict(awaitAtMost, maxBytes = 1), awaitAtMost) + }.getMessage must be("Request too large: Request was longer than the maximum of 1") + } + "Chunked with LastChunk" in { + intercept[EntityStreamException] { + Await.result(Chunked(tpe, source(Chunk(abc), Chunk(fgh), Chunk(ijk), LastChunk)).toStrict(awaitAtMost, maxBytes = 1), awaitAtMost) + }.getMessage must be("Request too large: Request was longer than the maximum of 1") + } + "Infinite data stream" in { + intercept[EntityStreamException] { + Await.result(Chunked(tpe, Source.repeat(Chunk(abc))).toStrict(awaitAtMost, maxBytes = 1), awaitAtMost) + }.getMessage must be("Request too large: Request was longer than the maximum of 1") + } + } "support transformDataBytes" - { "Strict" in { Strict(tpe, abc) must transformTo(Strict(tpe, doubleChars("abc") ++ trailer)) diff --git a/akka-http/src/main/mima-filters/10.1.4.backwards.excludes b/akka-http/src/main/mima-filters/10.1.4.backwards.excludes new file mode 100644 index 00000000000..b566119ca73 --- /dev/null +++ b/akka-http/src/main/mima-filters/10.1.4.backwards.excludes @@ -0,0 +1,4 @@ +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.server.directives.BasicDirectives.toStrictEntity") 
+ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.server.directives.BasicDirectives.toStrictEntity") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.server.directives.BasicDirectives.extractStrictEntity") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.server.directives.BasicDirectives.extractStrictEntity") diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/directives/BasicDirectives.scala b/akka-http/src/main/scala/akka/http/javadsl/server/directives/BasicDirectives.scala index da216dcca23..d60366768d2 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/server/directives/BasicDirectives.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/server/directives/BasicDirectives.scala @@ -303,7 +303,10 @@ abstract class BasicDirectives { def extractRequestEntity(inner: JFunction[RequestEntity, Route]): Route = extractEntity(inner) /** - * WARNING: This will read the entire request entity into memory regardless of size and effectively disable streaming. + * WARNING: This will read the entire request entity into memory and effectively disable streaming. + * + * To help protect against excessive memory use, the request will be aborted if the request is larger + * than allowed by the `akka.http.parsing.max-to-strict-bytes` configuration setting. * * Converts the HttpEntity from the [[akka.http.javadsl.server.RequestContext]] into an * [[akka.http.javadsl.model.HttpEntity.Strict]] and extracts it, or fails the route if unable to drain the @@ -316,7 +319,26 @@ abstract class BasicDirectives { } /** - * WARNING: This will read the entire request entity into memory regardless of size and effectively disable streaming. + * WARNING: This will read the entire request entity into memory and effectively disable streaming. 
+ * + * To help protect against excessive memory use, the request will be aborted if the request is larger + * than allowed by the `akka.http.parsing.max-to-strict-bytes` configuration setting. + * + * Converts the HttpEntity from the [[akka.http.javadsl.server.RequestContext]] into an + * [[akka.http.javadsl.model.HttpEntity.Strict]] and extracts it, or fails the route if unable to drain the + * entire request body within the timeout. + * + * @param timeout The directive is failed if the stream isn't completed after the given timeout. + */ + def extractStrictEntity(timeout: FiniteDuration, maxBytes: Long, inner: JFunction[HttpEntity.Strict, Route]): Route = RouteAdapter { + D.extractStrictEntity(timeout, maxBytes) { strict ⇒ inner.apply(strict).delegate } + } + + /** + * WARNING: This will read the entire request entity into memory and effectively disable streaming. + * + * To help protect against excessive memory use, the request will be aborted if the request is larger + * than allowed by the `akka.http.parsing.max-to-strict-bytes` configuration setting. * * Extracts the [[akka.http.javadsl.server.RequestContext]] itself with the strict HTTP entity, * or fails the route if unable to drain the entire request body within the timeout. @@ -327,4 +349,19 @@ abstract class BasicDirectives { D.toStrictEntity(timeout) { inner.get.delegate } } + /** + * WARNING: This will read the entire request entity into memory and effectively disable streaming. + * + * To help protect against excessive memory use, the request will be aborted if the request is larger + * than allowed by the `akka.http.parsing.max-to-strict-bytes` configuration setting. + * + * Extracts the [[akka.http.javadsl.server.RequestContext]] itself with the strict HTTP entity, + * or fails the route if unable to drain the entire request body within the timeout. + * + * @param timeout The directive is failed if the stream isn't completed after the given timeout. 
+ */ + def toStrictEntity(timeout: FiniteDuration, maxBytes: Long, inner: Supplier[Route]): Route = RouteAdapter { + D.toStrictEntity(timeout, maxBytes) { inner.get.delegate } + } + } diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/BasicDirectives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/BasicDirectives.scala index 85067fededa..0d2548a27b9 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/BasicDirectives.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/BasicDirectives.scala @@ -324,7 +324,10 @@ trait BasicDirectives { def extractDataBytes: Directive1[Source[ByteString, Any]] = BasicDirectives._extractDataBytes /** - * WARNING: This will read the entire request entity into memory regardless of size and effectively disable streaming. + * WARNING: This will read the entire request entity into memory and effectively disable streaming. + * + * To help protect against excessive memory use, the request will be aborted if the request is larger + * than allowed by the `akka.http.parsing.max-to-strict-bytes` configuration setting. * * Converts the HttpEntity from the [[akka.http.scaladsl.server.RequestContext]] into an * [[akka.http.scaladsl.model.HttpEntity.Strict]] and extracts it, or fails the route if unable to drain the @@ -337,7 +340,26 @@ trait BasicDirectives { toStrictEntity(timeout) & extract(_.request.entity.asInstanceOf[HttpEntity.Strict]) /** - * WARNING: This will read the entire request entity into memory regardless of size and effectively disable streaming. + * WARNING: This will read the entire request entity into memory and effectively disable streaming. + * + * To help protect against excessive memory use, the request will be aborted if the request is larger + * than allowed by the `akka.http.parsing.max-to-strict-bytes` configuration setting. 
+ * + * Converts the HttpEntity from the [[akka.http.scaladsl.server.RequestContext]] into an + * [[akka.http.scaladsl.model.HttpEntity.Strict]] and extracts it, or fails the route if unable to drain the + * entire request body within the timeout. + * + * @param timeout The directive is failed if the stream isn't completed after the given timeout. + * @group basic + */ + def extractStrictEntity(timeout: FiniteDuration, maxBytes: Long): Directive1[HttpEntity.Strict] = + toStrictEntity(timeout, maxBytes) & extract(_.request.entity.asInstanceOf[HttpEntity.Strict]) + + /** + * WARNING: This will read the entire request entity into memory and effectively disable streaming. + * + * To help protect against excessive memory use, the request will be aborted if the request is larger + * than allowed by the `akka.http.parsing.max-to-strict-bytes` configuration setting. * * Extracts the [[akka.http.scaladsl.server.RequestContext]] itself with the strict HTTP entity, * or fails the route if unable to drain the entire request body within the timeout. @@ -346,15 +368,31 @@ trait BasicDirectives { * @group basic */ def toStrictEntity(timeout: FiniteDuration): Directive0 = + extractParserSettings flatMap { settings ⇒ + toStrictEntity(timeout, settings.maxToStrictBytes) + } + + /** + * WARNING: This will read the entire request entity into memory and effectively disable streaming. + * + * To help protect against excessive memory use, the request will be aborted if the request is larger + * than allowed by the `akka.http.parsing.max-to-strict-bytes` configuration setting. + * + * Extracts the [[akka.http.scaladsl.server.RequestContext]] itself with the strict HTTP entity, + * or fails the route if unable to drain the entire request body within the timeout. + * + * @param timeout The directive is failed if the stream isn't completed after the given timeout. 
+ * @group basic + */ + def toStrictEntity(timeout: FiniteDuration, maxBytes: Long): Directive0 = Directive { inner ⇒ ctx ⇒ import ctx.{ executionContext, materializer } - ctx.request.entity.toStrict(timeout).flatMap { strictEntity ⇒ + ctx.request.entity.toStrict(timeout, maxBytes).flatMap { strictEntity ⇒ val newCtx = ctx.mapRequest(_.copy(entity = strictEntity)) inner(())(newCtx) } } - } object BasicDirectives extends BasicDirectives { From 3adff26f0909bd9dbd063ba216cb323812dbaa3f Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Wed, 5 Sep 2018 14:43:20 +0200 Subject: [PATCH 2/6] =pro properly apply mima exclusion rules --- ...10.1.4.backwards.excludes => 10.0.13.backwards.excludes} | 6 ++++++ .../src/main/mima-filters/10.1.0.backwards.excludes | 5 ----- ...10.1.4.backwards.excludes => 10.0.13.backwards.excludes} | 0 project/MiMa.scala | 4 +++- 4 files changed, 9 insertions(+), 6 deletions(-) rename akka-http-core/src/main/mima-filters/{10.1.4.backwards.excludes => 10.0.13.backwards.excludes} (81%) delete mode 100644 akka-http-core/src/main/mima-filters/10.1.0.backwards.excludes rename akka-http/src/main/mima-filters/{10.1.4.backwards.excludes => 10.0.13.backwards.excludes} (100%) diff --git a/akka-http-core/src/main/mima-filters/10.1.4.backwards.excludes b/akka-http-core/src/main/mima-filters/10.0.13.backwards.excludes similarity index 81% rename from akka-http-core/src/main/mima-filters/10.1.4.backwards.excludes rename to akka-http-core/src/main/mima-filters/10.0.13.backwards.excludes index aa52237c60c..74864ea4a8e 100644 --- a/akka-http-core/src/main/mima-filters/10.1.4.backwards.excludes +++ b/akka-http-core/src/main/mima-filters/10.0.13.backwards.excludes @@ -1,3 +1,6 @@ +# Don't monitor changes to internal API +ProblemFilters.exclude[Problem]("akka.http.impl.*") + # Not meant for user extension, so new methods should be fine: ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.model.HttpEntity.toStrict") 
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.HttpEntity.toStrict") @@ -11,3 +14,6 @@ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.settings ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.settings.ParserSettings.maxToStrictBytes") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.impl.settings.ParserSettingsImpl.*") ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.http.impl.settings.ParserSettingsImpl.*") + +# Uri conversion additions https://github.com/akka/akka-http/pull/1950 +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.model.Uri.asScala") diff --git a/akka-http-core/src/main/mima-filters/10.1.0.backwards.excludes b/akka-http-core/src/main/mima-filters/10.1.0.backwards.excludes deleted file mode 100644 index df51e27ae52..00000000000 --- a/akka-http-core/src/main/mima-filters/10.1.0.backwards.excludes +++ /dev/null @@ -1,5 +0,0 @@ -# Don't monitor changes to internal API -ProblemFilters.exclude[Problem]("akka.http.impl.*") - -# Uri conversion additions https://github.com/akka/akka-http/pull/1950 -ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.model.Uri.asScala") diff --git a/akka-http/src/main/mima-filters/10.1.4.backwards.excludes b/akka-http/src/main/mima-filters/10.0.13.backwards.excludes similarity index 100% rename from akka-http/src/main/mima-filters/10.1.4.backwards.excludes rename to akka-http/src/main/mima-filters/10.0.13.backwards.excludes diff --git a/project/MiMa.scala b/project/MiMa.scala index 20747cc50f9..8f9d263b8e7 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -34,7 +34,9 @@ object MiMa extends AutoPlugin { "10.0.8", "10.0.9", "10.0.10", - "10.0.11" + "10.0.11", + "10.0.12", + "10.0.13" ) .collect { case version if !ignoredModules.get(name.value).exists(_.contains(version)) => organization.value %% name.value % version From 76723b781b1856f06cce0cc0692a749cae3c6586 Mon Sep 17 
00:00:00 2001 From: Arnout Engelen Date: Wed, 5 Sep 2018 11:00:05 +0200 Subject: [PATCH 3/6] [backport from master] =htc don't throw in Limitable stage but use failStage --- .../akka/http/scaladsl/model/HttpEntity.scala | 2 +- .../server/WithoutSizeLimitSpec.scala | 24 ++++++------ .../directives/MiscDirectivesSpec.scala | 37 ++++++++----------- 3 files changed, 28 insertions(+), 35 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala index 65a334ab92f..592e79b9787 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala @@ -638,7 +638,7 @@ object HttpEntity { case Some(limit: SizeLimit) if limit.isDisabled ⇒ // "no limit" case Some(SizeLimit(bytes, cl @ Some(contentLength))) ⇒ - if (contentLength > bytes) throw EntityStreamSizeException(bytes, cl) + if (contentLength > bytes) failStage(EntityStreamSizeException(bytes, cl)) // else we still count but never throw an error case Some(SizeLimit(bytes, None)) ⇒ maxBytes = bytes diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/WithoutSizeLimitSpec.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/WithoutSizeLimitSpec.scala index 310aee6db0d..d62275a0531 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/WithoutSizeLimitSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/WithoutSizeLimitSpec.scala @@ -10,7 +10,7 @@ import akka.http.scaladsl.model._ import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.Http import akka.stream.ActorMaterializer -import akka.testkit.{ EventFilter, SocketUtil, TestKit } +import akka.testkit.{ SocketUtil, TestKit } import com.typesafe.config.{ Config, ConfigFactory } import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } @@ -49,21 +49,19 @@ class WithoutSizeLimitSpec 
extends WordSpec with Matchers with RequestBuilding w val (hostName, port) = SocketUtil.temporaryServerHostnameAndPort() - EventFilter[EntityStreamSizeException](occurrences = 1).intercept { - val future = for { - _ ← Http().bindAndHandle(route, hostName, port) + val future = for { + _ ← Http().bindAndHandle(route, hostName, port) - requestToNoDirective = Post(s"http://$hostName:$port/noDirective", entityOfSize(801)) - responseWithoutDirective ← Http().singleRequest(requestToNoDirective) - _ = responseWithoutDirective.status shouldEqual StatusCodes.BadRequest + requestToNoDirective = Post(s"http://$hostName:$port/noDirective", entityOfSize(801)) + responseWithoutDirective ← Http().singleRequest(requestToNoDirective) + _ = responseWithoutDirective.status shouldEqual StatusCodes.BadRequest - requestToDirective = Post(s"http://$hostName:$port/withoutSizeLimit", entityOfSize(801)) - responseWithDirective ← Http().singleRequest(requestToDirective) - } yield responseWithDirective + requestToDirective = Post(s"http://$hostName:$port/withoutSizeLimit", entityOfSize(801)) + responseWithDirective ← Http().singleRequest(requestToDirective) + } yield responseWithDirective - val response = Await.result(future, 5 seconds) - response.status shouldEqual StatusCodes.OK - } + val response = Await.result(future, 5 seconds) + response.status shouldEqual StatusCodes.OK } } diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/MiscDirectivesSpec.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/MiscDirectivesSpec.scala index d527a9bf352..bf170f1a91a 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/MiscDirectivesSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/MiscDirectivesSpec.scala @@ -99,10 +99,9 @@ class MiscDirectivesSpec extends RoutingSpec { status shouldEqual StatusCodes.OK } - EventFilter[EntityStreamSizeException](occurrences = 1).intercept { - Post("/abc", 
entityOfSize(501)) ~> Route.seal(route) ~> check { - status shouldEqual StatusCodes.BadRequest - } + Post("/abc", entityOfSize(501)) ~> Route.seal(route) ~> check { + status shouldEqual StatusCodes.BadRequest + entityAs[String] should include("exceeded content length limit") } } @@ -118,15 +117,13 @@ class MiscDirectivesSpec extends RoutingSpec { status shouldEqual StatusCodes.OK } - EventFilter[EntityStreamSizeException](occurrences = 1).intercept { - Post("/abc", formDataOfSize(128)) ~> Route.seal(route) ~> check { - status shouldEqual StatusCodes.BadRequest - responseAs[String] shouldEqual "The request content was malformed:\n" + - "EntityStreamSizeException: actual entity size (Some(134)) " + - "exceeded content length limit (64 bytes)! " + - "You can configure this by setting `akka.http.[server|client].parsing.max-content-length` " + - "or calling `HttpEntity.withSizeLimit` before materializing the dataBytes stream." - } + Post("/abc", formDataOfSize(128)) ~> Route.seal(route) ~> check { + status shouldEqual StatusCodes.BadRequest + responseAs[String] shouldEqual "The request content was malformed:\n" + + "EntityStreamSizeException: actual entity size (Some(134)) " + + "exceeded content length limit (64 bytes)! " + + "You can configure this by setting `akka.http.[server|client].parsing.max-content-length` " + + "or calling `HttpEntity.withSizeLimit` before materializing the dataBytes stream." 
} } @@ -144,10 +141,9 @@ class MiscDirectivesSpec extends RoutingSpec { status shouldEqual StatusCodes.OK } - EventFilter[EntityStreamSizeException](occurrences = 1).intercept { - Post("/abc", entityOfSize(801)) ~> Route.seal(route) ~> check { - status shouldEqual StatusCodes.BadRequest - } + Post("/abc", entityOfSize(801)) ~> Route.seal(route) ~> check { + status shouldEqual StatusCodes.BadRequest + entityAs[String] should include("exceeded content length limit") } val route2 = @@ -163,10 +159,9 @@ class MiscDirectivesSpec extends RoutingSpec { status shouldEqual StatusCodes.OK } - EventFilter[EntityStreamSizeException](occurrences = 1).intercept { - Post("/abc", entityOfSize(401)) ~> Route.seal(route2) ~> check { - status shouldEqual StatusCodes.BadRequest - } + Post("/abc", entityOfSize(401)) ~> Route.seal(route2) ~> check { + status shouldEqual StatusCodes.BadRequest + entityAs[String] should include("exceeded content length limit") } } } From 8c0058b5ab051285f0babfb8b5064304e923fd57 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Wed, 5 Sep 2018 11:00:48 +0200 Subject: [PATCH 4/6] [backport from master] =htc apply limitable stage automatically when withSizeLimit / withoutSizeLimit is called This ensures that the size limit will work in all cases. It may add the stage several times if different size limits are added. 
--- .../engine/parsing/HttpMessageParser.scala | 4 +- .../engine/parsing/HttpResponseParser.scala | 2 +- .../akka/http/scaladsl/model/HttpEntity.scala | 60 +++++++++---------- .../engine/client/PrepareResponseSpec.scala | 2 +- .../engine/server/PrepareRequestsSpec.scala | 2 +- 5 files changed, 34 insertions(+), 36 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/parsing/HttpMessageParser.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/parsing/HttpMessageParser.scala index 3b62063698e..96298f6856c 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/parsing/HttpMessageParser.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/parsing/HttpMessageParser.scala @@ -320,7 +320,7 @@ private[http] trait HttpMessageParser[Output >: MessageOutput <: ParserOutput] { case EntityPart(bytes) ⇒ bytes case EntityStreamError(info) ⇒ throw EntityStreamException(info) } - HttpEntity.Default(contentType(cth), contentLength, HttpEntity.limitableByteSource(data)) + HttpEntity.Default(contentType(cth), contentLength, data) } protected final def chunkedEntity[A <: ParserOutput](cth: Option[`Content-Type`]) = @@ -329,7 +329,7 @@ private[http] trait HttpMessageParser[Output >: MessageOutput <: ParserOutput] { case EntityChunk(chunk) ⇒ chunk case EntityStreamError(info) ⇒ throw EntityStreamException(info) } - HttpEntity.Chunked(contentType(cth), HttpEntity.limitableChunkSource(chunks)) + HttpEntity.Chunked(contentType(cth), chunks) } protected final def addTransferEncodingWithChunkedPeeled(headers: List[HttpHeader], teh: `Transfer-Encoding`): List[HttpHeader] = diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/parsing/HttpResponseParser.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/parsing/HttpResponseParser.scala index 1a2b57a878f..5c60a25a9b1 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/parsing/HttpResponseParser.scala +++ 
b/akka-http-core/src/main/scala/akka/http/impl/engine/parsing/HttpResponseParser.scala @@ -188,7 +188,7 @@ private[http] class HttpResponseParser(protected val settings: ParserSettings, p emitResponseStart { StreamedEntityCreator { entityParts ⇒ val data = entityParts.collect { case EntityPart(bytes) ⇒ bytes } - HttpEntity.CloseDelimited(contentType(cth), HttpEntity.limitableByteSource(data)) + HttpEntity.CloseDelimited(contentType(cth), data) } } setCompletionHandling(HttpMessageParser.CompletionOk) diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala index 592e79b9787..77fbbc5d048 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala @@ -149,11 +149,6 @@ sealed trait HttpEntity extends jm.HttpEntity { * Content-Length and then another limit is applied then this new limit will be evaluated against the new * Content-Length. If the entity is transformed in a way that changes the Content-Length and no new limit is applied * then the previous limit will be applied against the previous Content-Length. - * - * Note that the size limit applied via this method will only have any effect if the `Source` instance contained - * in this entity has been appropriately modified via the `HttpEntity.limitable` method. For all entities created - * by the HTTP layer itself this is always the case, but if you create entities yourself and would like them to - * properly respect limits defined via this method you need to make sure to apply `HttpEntity.limitable` yourself. */ override def withSizeLimit(maxBytes: Long): HttpEntity @@ -164,11 +159,6 @@ sealed trait HttpEntity extends jm.HttpEntity { * application's `max-content-length` config setting. 
It is recommended to always keep an upper limit on accepted * entities to avoid potential attackers flooding you with too large requests/responses, so use this method with caution. * - * Note that the size limit applied via this method will only have any effect if the `Source` instance contained - * in this entity has been appropriately modified via the `HttpEntity.limitable` method. For all entities created - * by the HTTP layer itself this is always the case, but if you create entities yourself and would like them to - * properly respect limits defined via this method you need to make sure to apply `HttpEntity.limitable` yourself. - * * See [[withSizeLimit]] for more details. */ override def withoutSizeLimit: HttpEntity @@ -363,7 +353,7 @@ object HttpEntity { override def withSizeLimit(maxBytes: Long): UniversalEntity = if (data.length <= maxBytes || isKnownEmpty) this - else HttpEntity.Default(contentType, data.length, limitableByteSource(Source.single(data))) withSizeLimit maxBytes + else HttpEntity.Default(contentType, data.length, Source.single(data)) withSizeLimit maxBytes override def withoutSizeLimit: UniversalEntity = withSizeLimit(SizeLimit.Disabled) @@ -421,7 +411,7 @@ object HttpEntity { if (contentType == this.contentType) this else copy(contentType = contentType) override def withSizeLimit(maxBytes: Long): HttpEntity.Default = - copy(data = data withAttributes Attributes(SizeLimit(maxBytes, Some(contentLength)))) + copy(data = Limitable.applyForByteStrings(data, SizeLimit(maxBytes, Some(contentLength)))) override def withoutSizeLimit: HttpEntity.Default = withSizeLimit(SizeLimit.Disabled) @@ -451,10 +441,10 @@ object HttpEntity { override def dataBytes: Source[ByteString, Any] = data override def withSizeLimit(maxBytes: Long): Self = - withData(data withAttributes Attributes(SizeLimit(maxBytes))) + withData(Limitable.applyForByteStrings(data, SizeLimit(maxBytes))) override def withoutSizeLimit: Self = - withData(data withAttributes 
Attributes(SizeLimit(SizeLimit.Disabled))) + withSizeLimit(SizeLimit.Disabled) override def transformDataBytes(transformer: Flow[ByteString, ByteString, Any]): Self = withData(data via transformer) @@ -519,7 +509,7 @@ object HttpEntity { override def dataBytes: Source[ByteString, Any] = chunks.map(_.data).filter(_.nonEmpty) override def withSizeLimit(maxBytes: Long): HttpEntity.Chunked = - copy(chunks = chunks withAttributes Attributes(SizeLimit(maxBytes))) + copy(chunks = Limitable.applyForChunks(chunks, SizeLimit(maxBytes))) override def withoutSizeLimit: HttpEntity.Chunked = withSizeLimit(SizeLimit.Disabled) @@ -605,29 +595,31 @@ object HttpEntity { object LastChunk extends LastChunk("", Nil) /** - * Turns the given source into one that respects the `withSizeLimit` calls when used as a parameter - * to entity constructors. + * Deprecated: no-op, not explicitly needed any more. */ + @deprecated("Not needed explicitly any more. ", "10.1.5") def limitableByteSource[Mat](source: Source[ByteString, Mat]): Source[ByteString, Mat] = - source.via(new Limitable(sizeOfByteString)) + source /** - * Turns the given source into one that respects the `withSizeLimit` calls when used as a parameter - * to entity constructors. + * Deprecated: no-op, not explicitly needed any more. */ + @deprecated("Not needed explicitly any more. 
", "10.1.5") def limitableChunkSource[Mat](source: Source[ChunkStreamPart, Mat]): Source[ChunkStreamPart, Mat] = - source.via(new Limitable(sizeOfChunkStreamPart)) + source - private val sizeOfByteString: ByteString ⇒ Int = _.size - private val sizeOfChunkStreamPart: ChunkStreamPart ⇒ Int = _.data.size - - private val limitableDefaults = Attributes.name("limitable") + private final case class SizeLimit(maxBytes: Long, contentLength: Option[Long] = None) extends Attributes.Attribute { + def isDisabled = maxBytes < 0 + } + private object SizeLimit { + val Disabled = -1 // any negative value will do + } private final class Limitable[T](sizeOf: T ⇒ Int) extends GraphStage[FlowShape[T, T]] { val in = Inlet[T]("Limitable.in") val out = Outlet[T]("Limitable.out") override val shape = FlowShape.of(in, out) - override protected val initialAttributes: Attributes = limitableDefaults + override protected val initialAttributes: Attributes = Limitable.limitableDefaults override def createLogic(attributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { private var maxBytes = -1L @@ -661,12 +653,18 @@ object HttpEntity { setHandlers(in, out, this) } } + private object Limitable { + def applyForByteStrings[Mat](source: Source[ByteString, Mat], limit: SizeLimit): Source[ByteString, Mat] = + applyLimit(source, limit)(_.size) - private final case class SizeLimit(maxBytes: Long, contentLength: Option[Long] = None) extends Attributes.Attribute { - def isDisabled = maxBytes < 0 - } - private object SizeLimit { - val Disabled = -1 // any negative value will do + def applyForChunks[Mat](source: Source[ChunkStreamPart, Mat], limit: SizeLimit): Source[ChunkStreamPart, Mat] = + applyLimit(source, limit)(_.data.size) + + def applyLimit[T, Mat](source: Source[T, Mat], limit: SizeLimit)(sizeOf: T ⇒ Int): Source[T, Mat] = + if (limit.isDisabled) source withAttributes Attributes(limit) // no need to add stage, it's either there or not needed + else 
source.via(new Limitable(sizeOf)) withAttributes Attributes(limit) + + private val limitableDefaults = Attributes.name("limitable") } /** diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/PrepareResponseSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/PrepareResponseSpec.scala index f3b261ee945..889462c4c8c 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/PrepareResponseSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/PrepareResponseSpec.scala @@ -28,7 +28,7 @@ class PrepareResponseSpec extends AkkaSpec { case EntityChunk(chunk) ⇒ chunk case EntityStreamError(info) ⇒ throw EntityStreamException(info) } - HttpEntity.Chunked(ContentTypes.`application/octet-stream`, HttpEntity.limitableChunkSource(chunks)) + HttpEntity.Chunked(ContentTypes.`application/octet-stream`, chunks) }, closeRequested = false) diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/server/PrepareRequestsSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/server/PrepareRequestsSpec.scala index e32d944600e..593c3084bf4 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/server/PrepareRequestsSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/server/PrepareRequestsSpec.scala @@ -29,7 +29,7 @@ class PrepareRequestsSpec extends AkkaSpec { case EntityChunk(chunk) ⇒ chunk case EntityStreamError(info) ⇒ throw EntityStreamException(info) } - HttpEntity.Chunked(ContentTypes.`application/octet-stream`, HttpEntity.limitableChunkSource(chunks)) + HttpEntity.Chunked(ContentTypes.`application/octet-stream`, chunks) }, expect100Continue = true, closeRequested = false) From df3b682848b3844b8cde92e96791f9b610f62e6e Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Thu, 30 Aug 2018 18:14:57 +0200 Subject: [PATCH 5/6] [backport from master] =htp add size limit to `decodeRequest` directives, ref #2137 --- .../mima-filters/10.0.13.backwards.excludes | 6 + 
.../impl/settings/RoutingSettingsImpl.scala | 2 + .../javadsl/settings/RoutingSettings.scala | 1 + .../scaladsl/settings/RoutingSettings.scala | 3 + .../javadsl/settings/RoutingSettingsTest.java | 1 + .../engine/parsing/RequestParserSpec.scala | 2 +- .../settings/SettingsEqualitySpec.scala | 1 + .../http/scaladsl/server/SizeLimitSpec.scala | 241 ++++++++++++++++++ akka-http/src/main/resources/reference.conf | 9 + .../akka/http/scaladsl/coding/Decoder.scala | 8 +- .../server/directives/CodingDirectives.scala | 11 +- .../coding-directives/decodeRequest.md | 6 +- .../coding-directives/decodeRequestWith.md | 6 +- 13 files changed, 283 insertions(+), 14 deletions(-) create mode 100644 akka-http-tests/src/test/scala/akka/http/scaladsl/server/SizeLimitSpec.scala diff --git a/akka-http-core/src/main/mima-filters/10.0.13.backwards.excludes b/akka-http-core/src/main/mima-filters/10.0.13.backwards.excludes index 74864ea4a8e..790c71e5264 100644 --- a/akka-http-core/src/main/mima-filters/10.0.13.backwards.excludes +++ b/akka-http-core/src/main/mima-filters/10.0.13.backwards.excludes @@ -17,3 +17,9 @@ ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.http.impl.settings.P # Uri conversion additions https://github.com/akka/akka-http/pull/1950 ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.model.Uri.asScala") + +# RoutingSettings is @DoNotInherit +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.settings.RoutingSettings.decodeMaxSize") +# Impl classes +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.impl.settings.RoutingSettingsImpl.*") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.http.impl.settings.RoutingSettingsImpl.*") diff --git a/akka-http-core/src/main/scala/akka/http/impl/settings/RoutingSettingsImpl.scala b/akka-http-core/src/main/scala/akka/http/impl/settings/RoutingSettingsImpl.scala index 46c194984b4..00f080e4321 100644 --- 
a/akka-http-core/src/main/scala/akka/http/impl/settings/RoutingSettingsImpl.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/settings/RoutingSettingsImpl.scala @@ -17,6 +17,7 @@ private[http] final case class RoutingSettingsImpl( rangeCountLimit: Int, rangeCoalescingThreshold: Long, decodeMaxBytesPerChunk: Int, + decodeMaxSize: Long, fileIODispatcher: String) extends akka.http.scaladsl.settings.RoutingSettings { override def productPrefix = "RoutingSettings" @@ -30,5 +31,6 @@ object RoutingSettingsImpl extends SettingsCompanion[RoutingSettingsImpl]("akka. c getInt "range-count-limit", c getBytes "range-coalescing-threshold", c getIntBytes "decode-max-bytes-per-chunk", + c getPossiblyInfiniteBytes "decode-max-size", c getString "file-io-dispatcher") } diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/settings/RoutingSettings.scala b/akka-http-core/src/main/scala/akka/http/javadsl/settings/RoutingSettings.scala index d6b0fcdff8c..9b4d5bdf00e 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/settings/RoutingSettings.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/settings/RoutingSettings.scala @@ -28,6 +28,7 @@ abstract class RoutingSettings private[akka] () { self: RoutingSettingsImpl ⇒ def withRangeCountLimit(rangeCountLimit: Int): RoutingSettings = self.copy(rangeCountLimit = rangeCountLimit) def withRangeCoalescingThreshold(rangeCoalescingThreshold: Long): RoutingSettings = self.copy(rangeCoalescingThreshold = rangeCoalescingThreshold) def withDecodeMaxBytesPerChunk(decodeMaxBytesPerChunk: Int): RoutingSettings = self.copy(decodeMaxBytesPerChunk = decodeMaxBytesPerChunk) + def withDecodeMaxSize(decodeMaxSize: Long): RoutingSettings = self.copy(decodeMaxSize = decodeMaxSize) def withFileIODispatcher(fileIODispatcher: String): RoutingSettings = self.copy(fileIODispatcher = fileIODispatcher) } diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/settings/RoutingSettings.scala 
b/akka-http-core/src/main/scala/akka/http/scaladsl/settings/RoutingSettings.scala index 0a9c217e4b9..19825a688b4 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/settings/RoutingSettings.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/settings/RoutingSettings.scala @@ -19,6 +19,7 @@ abstract class RoutingSettings private[akka] () extends akka.http.javadsl.settin def rangeCountLimit: Int def rangeCoalescingThreshold: Long def decodeMaxBytesPerChunk: Int + def decodeMaxSize: Long def fileIODispatcher: String /* Java APIs */ @@ -28,6 +29,7 @@ abstract class RoutingSettings private[akka] () extends akka.http.javadsl.settin def getRangeCountLimit: Int = rangeCountLimit def getRangeCoalescingThreshold: Long = rangeCoalescingThreshold def getDecodeMaxBytesPerChunk: Int = decodeMaxBytesPerChunk + def getDecodeMaxSize: Long = decodeMaxSize def getFileIODispatcher: String = fileIODispatcher override def withVerboseErrorMessages(verboseErrorMessages: Boolean): RoutingSettings = self.copy(verboseErrorMessages = verboseErrorMessages) @@ -36,6 +38,7 @@ abstract class RoutingSettings private[akka] () extends akka.http.javadsl.settin override def withRangeCountLimit(rangeCountLimit: Int): RoutingSettings = self.copy(rangeCountLimit = rangeCountLimit) override def withRangeCoalescingThreshold(rangeCoalescingThreshold: Long): RoutingSettings = self.copy(rangeCoalescingThreshold = rangeCoalescingThreshold) override def withDecodeMaxBytesPerChunk(decodeMaxBytesPerChunk: Int): RoutingSettings = self.copy(decodeMaxBytesPerChunk = decodeMaxBytesPerChunk) + override def withDecodeMaxSize(decodeMaxSize: Long): RoutingSettings = self.copy(decodeMaxSize = decodeMaxSize) override def withFileIODispatcher(fileIODispatcher: String): RoutingSettings = self.copy(fileIODispatcher = fileIODispatcher) } diff --git a/akka-http-core/src/test/java/akka/http/javadsl/settings/RoutingSettingsTest.java 
b/akka-http-core/src/test/java/akka/http/javadsl/settings/RoutingSettingsTest.java index b6531887c4d..be12e795bfa 100644 --- a/akka-http-core/src/test/java/akka/http/javadsl/settings/RoutingSettingsTest.java +++ b/akka-http-core/src/test/java/akka/http/javadsl/settings/RoutingSettingsTest.java @@ -22,6 +22,7 @@ public void testCreateWithActorSystem() { " range-coalescing-threshold = 80\n" + " range-count-limit = 16\n" + " decode-max-bytes-per-chunk = 1m\n" + + " decode-max-size = 8m\n" + " file-io-dispatcher = \"test-only\"\n" + "}"; Config config = ConfigFactory.parseString(testConfig); diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala index d0f5f6f972b..c0e6b052986 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala @@ -38,7 +38,7 @@ abstract class RequestParserSpec(mode: String, newLine: String) extends FreeSpec akka.loglevel = WARNING akka.http.parsing.max-header-value-length = 32 akka.http.parsing.max-uri-length = 40 - akka.http.parsing.max-content-length = 4000000000""") + akka.http.parsing.max-content-length = infinite""") implicit val system = ActorSystem(getClass.getSimpleName, testConf) import system.dispatcher diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/settings/SettingsEqualitySpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/settings/SettingsEqualitySpec.scala index f5dfb3c340f..7ab0485c6a2 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/settings/SettingsEqualitySpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/settings/SettingsEqualitySpec.scala @@ -19,6 +19,7 @@ class SettingsEqualitySpec extends WordSpec with Matchers { range-coalescing-threshold = 80 range-count-limit = 16 decode-max-bytes-per-chunk = 1m + decode-max-size = 8m 
file-io-dispatcher = ${akka.stream.blocking-io-dispatcher} } """).withFallback(ConfigFactory.load).resolve diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/SizeLimitSpec.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/SizeLimitSpec.scala new file mode 100644 index 00000000000..f603b88d61e --- /dev/null +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/SizeLimitSpec.scala @@ -0,0 +1,241 @@ +/* + * Copyright (C) 2009-2018 Lightbend Inc. + */ + +package akka.http.scaladsl.server + +import akka.NotUsed + +import scala.collection.immutable +import akka.actor.ActorSystem +import akka.http.scaladsl.client.RequestBuilding +import akka.http.scaladsl.coding.{ Decoder, Gzip } +import akka.http.scaladsl.model._ +import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.HttpEntity.Chunk +import akka.http.scaladsl.model.headers.{ HttpEncoding, HttpEncodings, `Content-Encoding` } +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{ Flow, Source } +import akka.testkit.{ EventFilter, TestKit } +import akka.util.ByteString +import com.typesafe.config.{ Config, ConfigFactory } +import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.time.{ Millis, Seconds, Span } + +class SizeLimitSpec extends WordSpec with Matchers with RequestBuilding with BeforeAndAfterAll with ScalaFutures { + + val maxContentLength = 800 + // Protect network more than memory: + val decodeMaxSize = 1600 + + val testConf: Config = ConfigFactory.parseString(s""" + akka.loggers = ["akka.testkit.TestEventListener"] + akka.loglevel = ERROR + akka.stdout-loglevel = ERROR + akka.http.parsing.max-content-length = $maxContentLength + akka.http.routing.decode-max-size = $decodeMaxSize + """) + implicit val system = ActorSystem(getClass.getSimpleName, testConf) + import system.dispatcher + implicit val materializer = 
ActorMaterializer() + val random = new scala.util.Random(42) + + implicit val defaultPatience = PatienceConfig(timeout = Span(2, Seconds), interval = Span(5, Millis)) + + "a normal route" should { + val route = path("noDirective") { + post { + entity(as[String]) { _ ⇒ + complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "

Say hello to akka-http

")) + } + } + } + + val binding = Http().bindAndHandle(route, "localhost", port = 0).futureValue + + "accept small POST requests" in { + Http().singleRequest(Post(s"http:/${binding.localAddress}/noDirective", entityOfSize(maxContentLength))) + .futureValue.status shouldEqual StatusCodes.OK + } + + "not accept entities bigger than configured with akka.http.parsing.max-content-length" in { + Http().singleRequest(Post(s"http:/${binding.localAddress}/noDirective", entityOfSize(maxContentLength + 1))) + .futureValue.status shouldEqual StatusCodes.BadRequest + } + } + + "a route with decodeRequest" should { + val route = path("noDirective") { + decodeRequest { + post { + entity(as[String]) { e ⇒ + complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, s"Got request with entity of ${e.length} characters")) + } + } + } + } + + val binding = Http().bindAndHandle(route, "localhost", port = 0).futureValue + + "accept a small request" in { + val response = Http().singleRequest(Post(s"http:/${binding.localAddress}/noDirective", entityOfSize(maxContentLength))).futureValue + response.status shouldEqual StatusCodes.OK + response.entity.dataBytes.runReduce(_ ++ _).futureValue.utf8String shouldEqual (s"Got request with entity of $maxContentLength characters") + } + + "reject a small request that decodes into a large entity" in { + val data = ByteString.fromString("0" * (decodeMaxSize + 1)) + val zippedData = Gzip.encode(data) + val request = HttpRequest( + HttpMethods.POST, + s"http:/${binding.localAddress}/noDirective", + immutable.Seq(`Content-Encoding`(HttpEncodings.gzip)), + HttpEntity(ContentTypes.`text/plain(UTF-8)`, zippedData)) + + zippedData.size should be <= maxContentLength + data.size should be > decodeMaxSize + + Http().singleRequest(request) + .futureValue.status shouldEqual StatusCodes.BadRequest + } + } + + "a route with decodeRequest that results in a large chunked entity" should { + val decoder = decodeTo(chunkedEntityOfSize(decodeMaxSize + 1)) + + val route = 
path("noDirective") { + decodeRequestWith(decoder) { + post { + entity(as[String]) { e ⇒ + complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, s"Got request with entity of ${e.length} characters")) + } + } + } + } + + val binding = Http().bindAndHandle(route, "localhost", port = 0).futureValue + + "reject a small request that decodes into a large chunked entity" in { + val request = Post(s"http:/${binding.localAddress}/noDirective", "x").withHeaders(`Content-Encoding`(HttpEncoding("custom"))) + val response = Http().singleRequest(request).futureValue + response.status shouldEqual StatusCodes.BadRequest + } + } + + "a route with decodeRequest that results in a large non-chunked streaming entity" should { + val decoder = decodeTo(nonChunkedEntityOfSize(decodeMaxSize + 1)) + + val route = path("noDirective") { + decodeRequestWith(decoder) { + post { + entity(as[String]) { e ⇒ + complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, s"Got request with entity of ${e.length} characters")) + } + } + } + } + + val binding = Http().bindAndHandle(route, "localhost", port = 0).futureValue + + "reject a small request that decodes into a large non-chunked streaming entity" in { + val request = Post(s"http:/${binding.localAddress}/noDirective", "x").withHeaders(`Content-Encoding`(HttpEncoding("custom"))) + val response = Http().singleRequest(request).futureValue + response.status shouldEqual StatusCodes.BadRequest + } + } + + "a route with decodeRequest followed by withoutSizeLimit" should { + val route = path("noDirective") { + decodeRequest { + withoutSizeLimit { + post { + entity(as[String]) { e ⇒ + complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, s"Got request with entity of ${e.length} characters")) + } + } + } + } + } + + val binding = Http().bindAndHandle(route, "localhost", port = 0).futureValue + + "accept a small request" in { + Http().singleRequest(Post(s"http:/${binding.localAddress}/noDirective", entityOfSize(maxContentLength))) + .futureValue.status shouldEqual 
StatusCodes.OK + } + + "accept a small request that decodes into a large entity" in { + val data = ByteString.fromString("0" * (decodeMaxSize + 1)) + val zippedData = Gzip.encode(data) + val request = HttpRequest( + HttpMethods.POST, + s"http:/${binding.localAddress}/noDirective", + immutable.Seq(`Content-Encoding`(HttpEncodings.gzip)), + HttpEntity(ContentTypes.`text/plain(UTF-8)`, zippedData)) + + zippedData.size should be <= maxContentLength + data.size should be > decodeMaxSize + + val response = Http().singleRequest(request).futureValue + response.status shouldEqual StatusCodes.OK + response.entity.dataBytes.runReduce(_ ++ _).futureValue.utf8String shouldEqual (s"Got request with entity of ${decodeMaxSize + 1} characters") + } + + "accept a large request that decodes into a large entity" in { + val data = new Array[Byte](decodeMaxSize) + random.nextBytes(data) + val zippedData = Gzip.encode(ByteString(data)) + val request = HttpRequest( + HttpMethods.POST, + s"http:/${binding.localAddress}/noDirective", + immutable.Seq(`Content-Encoding`(HttpEncodings.gzip)), + HttpEntity(ContentTypes.`text/plain(UTF-8)`, zippedData)) + + zippedData.size should be > maxContentLength + data.length should be <= decodeMaxSize + + Http().singleRequest(request) + .futureValue.status shouldEqual StatusCodes.OK + } + } + + "the withoutSizeLimit directive" should { + val route = path("withoutSizeLimit") { + post { + withoutSizeLimit { + entity(as[String]) { _ ⇒ + complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "

Say hello to akka-http

")) + } + } + } + } + + val binding = Http().bindAndHandle(route, "localhost", port = 0).futureValue + + "accept entities bigger than configured with akka.http.parsing.max-content-length" in { + Http().singleRequest(Post(s"http:/${binding.localAddress}/withoutSizeLimit", entityOfSize(maxContentLength + 1))) + .futureValue.status shouldEqual StatusCodes.OK + } + } + + override def afterAll() = TestKit.shutdownActorSystem(system) + + private def byteSource(size: Int): Source[ByteString, Any] = Source(Array.fill[ByteString](size)(ByteString("0")).toVector) + + private def chunkedEntityOfSize(size: Int) = HttpEntity.Chunked(ContentTypes.`text/plain(UTF-8)`, byteSource(size).map(Chunk(_))) + private def nonChunkedEntityOfSize(size: Int): MessageEntity = HttpEntity.Default(ContentTypes.`text/plain(UTF-8)`, size, byteSource(size)) + private def entityOfSize(size: Int) = HttpEntity(ContentTypes.`text/plain(UTF-8)`, "0" * size) + + private def decodeTo(result: MessageEntity): Decoder = new Decoder { + override def encoding: HttpEncoding = HttpEncoding("custom") + + override def maxBytesPerChunk: Int = 1000 + override def withMaxBytesPerChunk(maxBytesPerChunk: Int): Decoder = this + + override def decoderFlow: Flow[ByteString, ByteString, NotUsed] = ??? + + override def decodeMessage(message: HttpMessage) = message.withEntity(result) + } +} diff --git a/akka-http/src/main/resources/reference.conf b/akka-http/src/main/resources/reference.conf index 654c1dda9e1..e92628acbba 100644 --- a/akka-http/src/main/resources/reference.conf +++ b/akka-http/src/main/resources/reference.conf @@ -37,6 +37,15 @@ akka.http.routing { # for an entity data stream. decode-max-bytes-per-chunk = 1m + # Maximum content length after applying a decoding directive. When the directive + # decompresses, for example, an entity compressed with gzip, the resulting stream can be much + # larger than the max-content-length. 
Like with max-content-length, this is not necessarily a + problem when consuming the entity in a streaming fashion, but does risk high memory use + when the entity is made strict or marshalled into an in-memory object. + This limit (like max-content-length) can be overridden on a case-by-case basis using the + withSizeLimit directive. + decode-max-size = 8m + # Fully qualified config path which holds the dispatcher configuration # to be used by FlowMaterialiser when creating Actors for IO operations. file-io-dispatcher = ${akka.stream.blocking-io-dispatcher} diff --git a/akka-http/src/main/scala/akka/http/scaladsl/coding/Decoder.scala b/akka-http/src/main/scala/akka/http/scaladsl/coding/Decoder.scala index e2752b2043d..2309f25cbb1 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/coding/Decoder.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/coding/Decoder.scala @@ -13,13 +13,19 @@ import headers.HttpEncoding import akka.stream.scaladsl.{ Flow, Sink, Source } import scala.concurrent.Future +import scala.util.control.NonFatal trait Decoder { def encoding: HttpEncoding def decodeMessage(message: HttpMessage): message.Self = if (message.headers exists Encoder.isContentEncodingHeader) - message.transformEntityDataBytes(decoderFlow).withHeaders(message.headers filterNot Encoder.isContentEncodingHeader) + message.transformEntityDataBytes(decoderFlow.recover { + case NonFatal(e) ⇒ + throw IllegalRequestException( + StatusCodes.BadRequest, + ErrorInfo("The request's encoding is corrupt", e.getMessage)) + }).withHeaders(message.headers filterNot Encoder.isContentEncodingHeader) else message.self @deprecated("Use Decoder#decodeMessage instead. 
No need for implicit mapper.", since = "10.0.6") diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/CodingDirectives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/CodingDirectives.scala index 5b8bdaf2e12..9e7a3643e69 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/CodingDirectives.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/CodingDirectives.scala @@ -80,16 +80,7 @@ trait CodingDirectives { else extractSettings flatMap { settings ⇒ val effectiveDecoder = decoder.withMaxBytesPerChunk(settings.decodeMaxBytesPerChunk) - mapRequest { request ⇒ - effectiveDecoder.decodeMessage(request).mapEntity { entity ⇒ - entity.transformDataBytes(Flow[ByteString].recover { - case NonFatal(e) ⇒ - throw IllegalRequestException( - StatusCodes.BadRequest, - ErrorInfo("The request's encoding is corrupt", e.getMessage)) - }) - } - } + mapRequest(effectiveDecoder.decodeMessage(_)) & withSizeLimit(settings.decodeMaxSize) } requestEntityEmpty | ( diff --git a/docs/src/main/paradox/routing-dsl/directives/coding-directives/decodeRequest.md b/docs/src/main/paradox/routing-dsl/directives/coding-directives/decodeRequest.md index e4e60c905b8..6f72b67dd0d 100644 --- a/docs/src/main/paradox/routing-dsl/directives/coding-directives/decodeRequest.md +++ b/docs/src/main/paradox/routing-dsl/directives/coding-directives/decodeRequest.md @@ -10,7 +10,11 @@ ## Description -Decompresses the incoming request if it is `gzip` or `deflate` compressed. Uncompressed requests are passed through untouched. If the request encoded with another encoding the request is rejected with an @unidoc[UnsupportedRequestEncodingRejection]. +Decompresses the incoming request if it is `gzip` or `deflate` compressed. Uncompressed requests are passed through untouched. +If the request encoded with another encoding the request is rejected with an @unidoc[UnsupportedRequestEncodingRejection]. 
+If the request entity after decoding exceeds `akka.http.routing.decode-max-size` the stream fails with an +@unidoc[akka.http.scaladsl.model.EntityStreamSizeException]. + ## Example diff --git a/docs/src/main/paradox/routing-dsl/directives/coding-directives/decodeRequestWith.md b/docs/src/main/paradox/routing-dsl/directives/coding-directives/decodeRequestWith.md index fe14f097de9..d856b378e7c 100644 --- a/docs/src/main/paradox/routing-dsl/directives/coding-directives/decodeRequestWith.md +++ b/docs/src/main/paradox/routing-dsl/directives/coding-directives/decodeRequestWith.md @@ -10,7 +10,11 @@ ## Description -Decodes the incoming request if it is encoded with one of the given encoders. If the request encoding doesn't match one of the given encoders the request is rejected with an @unidoc[UnsupportedRequestEncodingRejection]. If no decoders are given the default encoders (`Gzip`, `Deflate`, `NoCoding`) are used. +Decodes the incoming request if it is encoded with one of the given encoders. +If the request encoding doesn't match one of the given encoders the request is rejected with an @unidoc[UnsupportedRequestEncodingRejection]. If no decoders are given the default encoders (`Gzip`, `Deflate`, `NoCoding`) are used. +If the request entity after decoding exceeds `akka.http.routing.decode-max-size` the stream fails with an +@unidoc[akka.http.scaladsl.model.EntityStreamSizeException]. + ## Example From a6197bbd05aea8f2db4d862e737917a084537d85 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Thu, 6 Sep 2018 11:40:44 +0200 Subject: [PATCH 6/6] =doc Remove broken API doc link for 10.0.x. Doesn't seem too useful for backporting any paradox fixes for that one. 
--- .../routing-dsl/directives/coding-directives/decodeRequest.md | 2 +- .../directives/coding-directives/decodeRequestWith.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/src/main/paradox/routing-dsl/directives/coding-directives/decodeRequest.md b/docs/src/main/paradox/routing-dsl/directives/coding-directives/decodeRequest.md index 6f72b67dd0d..6d8e614b0de 100644 --- a/docs/src/main/paradox/routing-dsl/directives/coding-directives/decodeRequest.md +++ b/docs/src/main/paradox/routing-dsl/directives/coding-directives/decodeRequest.md @@ -13,7 +13,7 @@ Decompresses the incoming request if it is `gzip` or `deflate` compressed. Uncompressed requests are passed through untouched. If the request encoded with another encoding the request is rejected with an @unidoc[UnsupportedRequestEncodingRejection]. If the request entity after decoding exceeds `akka.http.routing.decode-max-size` the stream fails with an -@unidoc[akka.http.scaladsl.model.EntityStreamSizeException]. +`akka.http.scaladsl.model.EntityStreamSizeException`. ## Example diff --git a/docs/src/main/paradox/routing-dsl/directives/coding-directives/decodeRequestWith.md b/docs/src/main/paradox/routing-dsl/directives/coding-directives/decodeRequestWith.md index d856b378e7c..c94766d7f33 100644 --- a/docs/src/main/paradox/routing-dsl/directives/coding-directives/decodeRequestWith.md +++ b/docs/src/main/paradox/routing-dsl/directives/coding-directives/decodeRequestWith.md @@ -13,7 +13,7 @@ Decodes the incoming request if it is encoded with one of the given encoders. If the request encoding doesn't match one of the given encoders the request is rejected with an @unidoc[UnsupportedRequestEncodingRejection]. If no decoders are given the default encoders (`Gzip`, `Deflate`, `NoCoding`) are used. If the request entity after decoding exceeds `akka.http.routing.decode-max-size` the stream fails with an -@unidoc[akka.http.scaladsl.model.EntityStreamSizeException]. 
+`akka.http.scaladsl.model.EntityStreamSizeException`. ## Example