Skip to content

Commit

Permalink
http2: don't crash if stream is enqueued at multiplexer multiple times (
Browse files Browse the repository at this point in the history
#3887)

Fixes #3886
  • Loading branch information
jrudolph authored Jul 26, 2021
1 parent 017543d commit 35d5f8e
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -330,23 +330,24 @@ private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper
trait Sending extends StreamState { _: Product =>
protected def outStream: OutStream

override def pullNextFrame(maxSize: Int): (StreamState, PullFrameResult) = {
val frame = outStream.nextFrame(maxSize)

val res =
outStream.endStreamIfPossible() match {
case Some(trailer) =>
PullFrameResult.SendFrameAndTrailer(frame, trailer)
case None =>
PullFrameResult.SendFrame(frame, outStream.canSend)
}
override def pullNextFrame(maxSize: Int): (StreamState, PullFrameResult) =
if (outStream.canSend) {
val frame = outStream.nextFrame(maxSize)

val res =
outStream.endStreamIfPossible() match {
case Some(trailer) =>
PullFrameResult.SendFrameAndTrailer(frame, trailer)
case None =>
PullFrameResult.SendFrame(frame, outStream.canSend)
}

val nextState =
if (outStream.isDone) handleOutgoingEnded()
else this
val nextState =
if (outStream.isDone) handleOutgoingEnded()
else this

(nextState, res)
}
(nextState, res)
} else (this, PullFrameResult.NothingToSend)

def handleWindowUpdate(windowUpdate: WindowUpdateFrame): StreamState = increaseWindow(windowUpdate.windowSizeIncrement)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,25 @@ class Http2ServerSpec extends AkkaSpecWithMaterializer("""
entityDataOut.sendComplete()
network.expectDATA(TheStreamId, endStream = true, ByteString.empty)
}
"keep sending entity data when WINDOW_UPDATE is received intermediately" inAssertAllStagesStopped new WaitingForResponseDataSetup {
val data1 = ByteString("abcd")
entityDataOut.sendNext(data1)

network.sendWINDOW_UPDATE(TheStreamId, 100)

val data2 = ByteString("efghij")
entityDataOut.sendNext(data2)

val (false, data) = network.expectDATAFrame(TheStreamId)
data shouldEqual data1 ++ data2

// now don't fail if there's demand on the line
network.plainDataProbe.request(1)
network.expectNoBytes(100.millis)

entityDataOut.sendComplete()
network.expectDATA(TheStreamId, endStream = true, ByteString.empty)
}

"parse priority frames" inAssertAllStagesStopped new WaitingForResponseDataSetup {
network.sendPRIORITY(TheStreamId, true, 0, 5)
Expand Down

0 comments on commit 35d5f8e

Please sign in to comment.