Skip to content

Commit

Permalink
http2: use mutable.Queue for state in multiplexer (#3873)
Browse files Browse the repository at this point in the history
Co-authored-by: Arnout Engelen <arnout@engelen.eu>
  • Loading branch information
jrudolph and raboof authored Jul 22, 2021
1 parent a9140d2 commit 867a8a5
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ private[http2] abstract class Http2Demux(http2Settings: Http2CommonSettings, ini
push(frameOut, event)
}

val multiplexer = createMultiplexer(StreamPrioritizer.first())
val multiplexer = createMultiplexer(StreamPrioritizer.First)
setHandler(frameOut, multiplexer)

val pingState = ConfigurablePing.PingState(http2Settings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import akka.macros.LogHelper
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.OutHandler
import akka.stream.stage.StageLogging
import com.github.ghik.silencer.silent

import scala.collection.immutable
import scala.collection.mutable

/**
* INTERNAL API
Expand Down Expand Up @@ -41,11 +42,10 @@ private[http2] trait Http2Multiplexer {
}

@InternalApi
private[http2] sealed abstract class PullFrameResult {
def frame: DataFrame
}
private[http2] sealed abstract class PullFrameResult
@InternalApi
private[http2] object PullFrameResult {
final case object NothingToSend extends PullFrameResult
final case class SendFrame(frame: DataFrame, hasMore: Boolean) extends PullFrameResult
final case class SendFrameAndTrailer(frame: DataFrame, trailer: FrameEvent) extends PullFrameResult
}
Expand Down Expand Up @@ -130,7 +130,10 @@ private[http2] trait Http2MultiplexerSupport { logic: GraphStageLogic with Stage
private var _state: MultiplexerState = Idle

def hasFlushedAllData: Boolean = allDataFlushed(_state)
private def allDataFlushed(state: MultiplexerState): Boolean = (state == WaitingForData || state == Idle)
private def allDataFlushed(state: MultiplexerState): Boolean = (state eq WaitingForData) || (state eq Idle)

private val controlFrameBuffer: mutable.Queue[FrameEvent] = new mutable.Queue[FrameEvent]
private val sendableOutstreams: mutable.Queue[Int] = new mutable.Queue[Int]

private def updateState(transition: MultiplexerState => MultiplexerState): Unit = {
val oldState = _state
Expand All @@ -139,34 +142,48 @@ private[http2] trait Http2MultiplexerSupport { logic: GraphStageLogic with Stage

if (isDebugEnabled && newState.name != oldState.name) recordStateChange(oldState.name, newState.name)
if (allDataFlushed(newState)) onAllDataFlushed()
allowReadingIncomingFrames(controlFrameBuffer.size < settings.outgoingControlFrameBufferSize)
}

private[http2] sealed trait MultiplexerState extends Product {
sealed trait MultiplexerState extends Product {
def name: String = productPrefix

def onPull(): MultiplexerState
@silent("references private")
def pushControlFrame(frame: FrameEvent): MultiplexerState
def connectionWindowAvailable(): MultiplexerState
def enqueueOutStream(streamId: Int): MultiplexerState
def closeStream(streamId: Int): MultiplexerState

protected def sendDataFrame(streamId: Int, sendableOutstreams: immutable.Set[Int]): MultiplexerState = {
protected def sendDataFrame(streamId: Int): MultiplexerState = {
val maxBytesToSend = currentMaxFrameSize min connectionWindowLeft
val result = pullNextFrame(streamId, maxBytesToSend)
val frame = result.frame
pushFrameOut(frame)
connectionWindowLeft -= frame.payload.length
def send(frame: DataFrame): Unit = {
pushFrameOut(frame)
connectionWindowLeft -= frame.payload.length
}

result match {
case PullFrameResult.SendFrame(_, hasMore) =>
if (hasMore) WaitingForNetworkToSendData(sendableOutstreams + streamId)
else {
val remainingStreams = sendableOutstreams - streamId
if (remainingStreams.isEmpty) Idle
else WaitingForNetworkToSendData(remainingStreams)
case PullFrameResult.SendFrame(frame, hasMore) =>
send(frame)
if (hasMore) {
sendableOutstreams += streamId
WaitingForNetworkToSendData
} else {
if (sendableOutstreams.isEmpty) Idle
else WaitingForNetworkToSendData
}
case PullFrameResult.SendFrameAndTrailer(_, trailer) =>
WaitingForNetworkToSendControlFrames(Vector(trailer), sendableOutstreams - streamId)
case PullFrameResult.SendFrameAndTrailer(frame, trailer) =>
send(frame)
controlFrameBuffer += trailer
WaitingForNetworkToSendControlFrames
case PullFrameResult.NothingToSend =>
// We are pulled but the stream that wanted to send, now chose otherwise.
// This can happen if either the stream got closed in the meantime, or if the stream was added to the queue
// multiple times, which can happen because `enqueueOutStream` is supposed to be idempotent but we don't check
// if we added an element several times to the queue (because it's inefficient).
if (sendableOutstreams.isEmpty) WaitingForData
else WaitingForNetworkToSendData.onPull()
}
}
}
Expand All @@ -178,11 +195,17 @@ private[http2] trait Http2MultiplexerSupport { logic: GraphStageLogic with Stage
// WaitingForNetworkToSendData: Data frames queued but no network demand
// WaitingForConnectionWindow: Data frames queued, demand from the network, but no connection-level window available

private[http2] case object Idle extends MultiplexerState {
case object Idle extends MultiplexerState {
def onPull(): MultiplexerState = WaitingForData
def pushControlFrame(frame: FrameEvent): MultiplexerState = WaitingForNetworkToSendControlFrames(Vector(frame), immutable.TreeSet.empty)
def pushControlFrame(frame: FrameEvent): MultiplexerState = {
controlFrameBuffer += frame
WaitingForNetworkToSendControlFrames
}
def connectionWindowAvailable(): MultiplexerState = this
def enqueueOutStream(streamId: Int): MultiplexerState = WaitingForNetworkToSendData(immutable.TreeSet(streamId))
def enqueueOutStream(streamId: Int): MultiplexerState = {
sendableOutstreams += streamId
WaitingForNetworkToSendData
}
def closeStream(streamId: Int): MultiplexerState = this
}

Expand All @@ -194,99 +217,89 @@ private[http2] trait Http2MultiplexerSupport { logic: GraphStageLogic with Stage
}
def connectionWindowAvailable(): MultiplexerState = this // nothing to do, as there is no data to send
def enqueueOutStream(streamId: Int): MultiplexerState =
if (connectionWindowLeft == 0) WaitingForConnectionWindow(immutable.TreeSet(streamId))
else sendDataFrame(streamId, Set.empty)
if (connectionWindowLeft == 0) {
sendableOutstreams += streamId
WaitingForConnectionWindow
} else sendDataFrame(streamId)
def closeStream(streamId: Int): MultiplexerState = this
}

/** Not yet pulled but data waiting to be sent */
private[http2] case class WaitingForNetworkToSendControlFrames(controlFrameBuffer: immutable.Vector[FrameEvent], sendableOutstreams: immutable.Set[Int]) extends MultiplexerState {
require(controlFrameBuffer.nonEmpty)
allowReadingIncomingFrames(controlFrameBuffer.size < settings.outgoingControlFrameBufferSize)
def onPull(): MultiplexerState = controlFrameBuffer match {
case first +: remaining =>
pushFrameOut(first)
allowReadingIncomingFrames(remaining.length < settings.outgoingControlFrameBufferSize)
if (remaining.isEmpty && sendableOutstreams.isEmpty) Idle
else if (remaining.isEmpty) WaitingForNetworkToSendData(sendableOutstreams)
else copy(remaining, sendableOutstreams)
case object WaitingForNetworkToSendControlFrames extends MultiplexerState {
def onPull(): MultiplexerState = {
val first = controlFrameBuffer.dequeue()
pushFrameOut(first)
if (controlFrameBuffer.isEmpty && sendableOutstreams.isEmpty) Idle
else if (controlFrameBuffer.isEmpty) WaitingForNetworkToSendData
else this
}
def pushControlFrame(frame: FrameEvent): MultiplexerState = {
controlFrameBuffer += frame
this
}
def pushControlFrame(frame: FrameEvent): MultiplexerState = copy(controlFrameBuffer = controlFrameBuffer :+ frame)
def connectionWindowAvailable(): MultiplexerState = this
def enqueueOutStream(streamId: Int): MultiplexerState =
if (!sendableOutstreams.contains(streamId))
copy(sendableOutstreams = sendableOutstreams + streamId)
else
this
def enqueueOutStream(streamId: Int): MultiplexerState = {
sendableOutstreams += streamId
this
}

def closeStream(streamId: Int): MultiplexerState =
if (sendableOutstreams.contains(streamId)) {
val sendableExceptClosed = sendableOutstreams - streamId
copy(sendableOutstreams = sendableExceptClosed)
} else
this
def closeStream(streamId: Int): MultiplexerState = {
// leave stream in sendableOutstreams, to be skipped in sendDataFrame
this
}
}

private[http2] abstract class WithSendableOutStreams extends MultiplexerState {
def sendableOutstreams: immutable.Set[Int]
def withSendableOutstreams(sendableOutStreams: immutable.Set[Int]): WithSendableOutStreams

protected def sendNext(): MultiplexerState = {
val chosenId = prioritizer.chooseSubstream(sendableOutstreams)
sendDataFrame(chosenId, sendableOutstreams)
}
abstract class WithSendableOutStreams extends MultiplexerState {
protected def sendNext(): MultiplexerState =
if (prioritizer eq StreamPrioritizer.First)
sendDataFrame(sendableOutstreams.dequeue())
else {
val chosenId = prioritizer.chooseSubstream(sendableOutstreams.toSet)
// expensive operation, to be optimized when prioritizers can be configured
// in 2.12.x there's no Queue.-=, when 2.12.x support is dropped, this can be
// changed to Queue.-=
sendableOutstreams.dequeueAll(_ == chosenId)
sendDataFrame(chosenId)
}

def closeStream(streamId: Int): MultiplexerState =
if (sendableOutstreams.contains(streamId)) {
val sendableExceptClosed = sendableOutstreams - streamId

if (sendableExceptClosed.isEmpty)
if (pulled) WaitingForData else Idle
else withSendableOutstreams(sendableExceptClosed)
} else
this
// leave stream in sendableOutstreams, to be skipped in sendDataFrame
this

def pulled: Boolean
}

private[http2] case class WaitingForNetworkToSendData(sendableOutstreams: immutable.Set[Int]) extends WithSendableOutStreams {
require(sendableOutstreams.nonEmpty)
case object WaitingForNetworkToSendData extends WithSendableOutStreams {
def onPull(): MultiplexerState =
if (connectionWindowLeft > 0) sendNext()
else // do nothing and wait for window first
WaitingForConnectionWindow(sendableOutstreams)
WaitingForConnectionWindow

def pushControlFrame(frame: FrameEvent): MultiplexerState = WaitingForNetworkToSendControlFrames(Vector(frame), sendableOutstreams)
def pushControlFrame(frame: FrameEvent): MultiplexerState = {
controlFrameBuffer += frame
WaitingForNetworkToSendControlFrames
}
def connectionWindowAvailable(): MultiplexerState = this
def enqueueOutStream(streamId: Int): MultiplexerState =
if (!sendableOutstreams.contains(streamId))
copy(sendableOutstreams = sendableOutstreams + streamId)
else
this

def withSendableOutstreams(sendableOutStreams: Set[Int]) =
WaitingForNetworkToSendData(sendableOutStreams)
def enqueueOutStream(streamId: Int): MultiplexerState = {
sendableOutstreams += streamId
this
}

override def pulled = false
}

/** Pulled and data is pending but no connection-level window available */
private[http2] case class WaitingForConnectionWindow(sendableOutstreams: immutable.Set[Int]) extends WithSendableOutStreams {
require(sendableOutstreams.nonEmpty)
case object WaitingForConnectionWindow extends WithSendableOutStreams {
def onPull(): MultiplexerState = throw new IllegalStateException(s"pull unexpected while waiting for connection window")
def pushControlFrame(frame: FrameEvent): MultiplexerState = {
pushFrameOut(frame)
WaitingForNetworkToSendData(sendableOutstreams)
WaitingForNetworkToSendData
}
def connectionWindowAvailable(): MultiplexerState = sendNext()
def enqueueOutStream(streamId: Int): MultiplexerState =
if (!sendableOutstreams.contains(streamId))
copy(sendableOutstreams = sendableOutstreams + streamId)
else
this

def withSendableOutstreams(sendableOutStreams: Set[Int]) =
WaitingForConnectionWindow(sendableOutStreams)
def enqueueOutStream(streamId: Int): MultiplexerState = {
sendableOutstreams += streamId
this
}

override def pulled = true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper
nextStateStream(buffer)
}

def pullNextFrame(maxSize: Int): (StreamState, PullFrameResult) = throw new IllegalStateException(s"pullNextFrame not supported in state $stateName")
def pullNextFrame(maxSize: Int): (StreamState, PullFrameResult) = (this, PullFrameResult.NothingToSend)
def incomingStreamPulled(): StreamState = throw new IllegalStateException(s"incomingStreamPulled not supported in state $stateName")

/** Called to cleanup any state when the connection is torn down */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@ private[http2] trait StreamPrioritizer {
@InternalApi
private[http2] object StreamPrioritizer {
/** A prioritizer that ignores priority information and just sends to the first stream */
def first(): StreamPrioritizer =
new StreamPrioritizer {
def updatePriority(priorityFrame: PriorityFrame): Unit = ()
def chooseSubstream(streams: Set[Int]): Int = streams.head
}
object First extends StreamPrioritizer {
def updatePriority(priorityFrame: PriorityFrame): Unit = ()
def chooseSubstream(streams: Set[Int]): Int = streams.head
}

def usingPriorityTree(): StreamPrioritizer =
new StreamPrioritizer {
Expand Down

0 comments on commit 867a8a5

Please sign in to comment.