Skip to content

Commit

Permalink
Modifications to currentReadOffset updates for flow control
Browse files Browse the repository at this point in the history
Summary: With reliable resets, we're not going to advance the `currentReadOffset` to the `finalSize` until the application has read all of the reliable data. This is because we could still buffer additional bytes until we get all the reliable data, so we don't want to prematurely send flow control.

Reviewed By: mjoras

Differential Revision: D67681087

fbshipit-source-id: 9b041ce2ae15ccda4b8c6594759fcfe2deb04f64
  • Loading branch information
Aman Sharma authored and facebook-github-bot committed Jan 8, 2025
1 parent 075e9e6 commit 1d82b3a
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 5 deletions.
40 changes: 39 additions & 1 deletion quic/flowcontrol/QuicFlowController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,18 @@ void updateFlowControlOnRead(
uint64_t lastReadOffset,
TimePoint readTime) {
DCHECK_GE(stream.currentReadOffset, lastReadOffset);
auto diff = stream.currentReadOffset - lastReadOffset;
uint64_t diff = 0;
if (stream.reliableSizeFromPeer &&
stream.currentReadOffset >= *stream.reliableSizeFromPeer) {
CHECK(stream.finalReadOffset.hasValue())
<< "We got a reset from the peer, but the finalReadOffset is not set.";
// We've read all reliable bytes, so we can advance the currentReadOffset
// to the final size.
diff = *stream.finalReadOffset - lastReadOffset;
stream.currentReadOffset = *stream.finalReadOffset;
} else {
diff = stream.currentReadOffset - lastReadOffset;
}
incrementWithOverFlowCheck(
stream.conn.flowControlState.sumCurReadOffset, diff);
if (maybeSendConnWindowUpdate(stream.conn, readTime)) {
Expand All @@ -228,6 +239,33 @@ void updateFlowControlOnRead(
}
}

void updateFlowControlOnReceiveReset(
QuicStreamState& stream,
TimePoint resetTime) {
CHECK(stream.reliableSizeFromPeer.hasValue())
<< "updateFlowControlOnReceiveReset has been called, "
<< "but reliableSizeFromPeer has not been set";
CHECK(stream.finalReadOffset.hasValue())
<< "updateFlowControlOnReceiveReset has been called, "
<< "but finalReadOffset has not been set";
if (stream.currentReadOffset >= *stream.reliableSizeFromPeer) {
// We only advance the currentReadOffset to the final size if the
// application has read all of the reliable bytes. We don't do this
// earlier because we'll buffer additional data that arrives.
auto diff = *stream.finalReadOffset - stream.currentReadOffset;
stream.currentReadOffset = *stream.finalReadOffset;
incrementWithOverFlowCheck(
stream.conn.flowControlState.sumCurReadOffset, diff);
if (maybeSendConnWindowUpdate(stream.conn, resetTime)) {
VLOG(4) << "Reset trigger conn window update "
<< " readOffset=" << stream.conn.flowControlState.sumCurReadOffset
<< " maxOffset="
<< stream.conn.flowControlState.advertisedMaxOffset
<< " window=" << stream.conn.flowControlState.windowSize;
}
}
}

void updateFlowControlOnWriteToSocket(
QuicStreamState& stream,
uint64_t length) {
Expand Down
4 changes: 4 additions & 0 deletions quic/flowcontrol/QuicFlowController.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ void updateFlowControlOnRead(
uint64_t lastReadOffset,
TimePoint readTime);

void updateFlowControlOnReceiveReset(
QuicStreamState& stream,
TimePoint resetTime);

void updateFlowControlOnWriteToSocket(QuicStreamState& stream, uint64_t length);

void updateFlowControlOnWriteToStream(QuicStreamState& stream, uint64_t length);
Expand Down
109 changes: 108 additions & 1 deletion quic/flowcontrol/test/QuicFlowControlTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ TEST_F(QuicFlowControlTest, UpdateBadFlowControlOnStreamData) {
EXPECT_EQ(conn_.flowControlState.sumMaxObservedOffset, 600);
}

TEST_F(QuicFlowControlTest, UpdateFlowControlOnRead) {
TEST_F(QuicFlowControlTest, UpdateFlowControlOnReadBasic) {
auto qLogger = std::make_shared<FileQLogger>(VantagePoint::Client);
conn_.qLogger = qLogger;

Expand Down Expand Up @@ -741,6 +741,113 @@ TEST_F(QuicFlowControlTest, UpdateFlowControlOnRead) {
EXPECT_EQ(event->update, getFlowControlEvent(700));
}

// We've received a reliable reset. Now, we're receiving additional data,
// but we haven't yet received all of the reliable data.
TEST_F(QuicFlowControlTest, UpdateFlowControlOnReadReliableReset1) {
StreamId id = 3;
QuicStreamState stream(id, conn_);
stream.reliableSizeFromPeer = 100;
stream.finalReadOffset = 150;
stream.currentReadOffset = 10;
stream.flowControlState.windowSize = 20;
stream.flowControlState.advertisedMaxOffset = 10000;

conn_.flowControlState.windowSize = 1000;
conn_.flowControlState.advertisedMaxOffset = 10000;
conn_.flowControlState.sumCurReadOffset = 40;

// Simulate the reading of 10 bytes
stream.currentReadOffset = 20;
updateFlowControlOnRead(stream, 10, Clock::now());
EXPECT_EQ(conn_.flowControlState.sumCurReadOffset, 50);
EXPECT_EQ(stream.currentReadOffset, 20);
}

// We've received a reliable reset. Now, we're receiving additional data,
// and we've received all of the reliable data.
TEST_F(QuicFlowControlTest, UpdateFlowControlOnReadReliableReset2) {
StreamId id = 3;
QuicStreamState stream(id, conn_);
stream.reliableSizeFromPeer = 100;
stream.finalReadOffset = 150;
stream.currentReadOffset = 10;
stream.flowControlState.windowSize = 20;
stream.flowControlState.advertisedMaxOffset = 10000;

conn_.flowControlState.windowSize = 1000;
conn_.flowControlState.advertisedMaxOffset = 10000;
conn_.flowControlState.sumCurReadOffset = 40;

// Simulate the reading of 90 bytes
stream.currentReadOffset = 100;
updateFlowControlOnRead(stream, 10, Clock::now());

EXPECT_EQ(conn_.flowControlState.sumCurReadOffset, 180);
EXPECT_EQ(stream.currentReadOffset, 150);
}

// We're receiving a reliable reset with a reliable size > the amount
// of data we've read so far.
TEST_F(QuicFlowControlTest, UpdateFlowControlOnReceiveReset1) {
StreamId id = 3;
QuicStreamState stream(id, conn_);
stream.currentReadOffset = 10;
stream.flowControlState.windowSize = 20;
stream.flowControlState.advertisedMaxOffset = 10000;

conn_.flowControlState.windowSize = 1000;
conn_.flowControlState.advertisedMaxOffset = 10000;
conn_.flowControlState.sumCurReadOffset = 40;

// Simulate the receiving of a reliable reset
stream.reliableSizeFromPeer = 100;
stream.finalReadOffset = 150;
updateFlowControlOnReceiveReset(stream, Clock::now());
EXPECT_EQ(conn_.flowControlState.sumCurReadOffset, 40);
EXPECT_EQ(stream.currentReadOffset, 10);
}

// We're receiving a reliable reset with a reliable size <= the amount
// of data we've read so far.
TEST_F(QuicFlowControlTest, UpdateFlowControlOnReceiveReset2) {
StreamId id = 3;
QuicStreamState stream(id, conn_);
stream.currentReadOffset = 10;
stream.flowControlState.windowSize = 20;
stream.flowControlState.advertisedMaxOffset = 10000;

conn_.flowControlState.windowSize = 1000;
conn_.flowControlState.advertisedMaxOffset = 10000;
conn_.flowControlState.sumCurReadOffset = 40;

// Simulate the receiving of a reliable reset
stream.reliableSizeFromPeer = 10;
stream.finalReadOffset = 150;
updateFlowControlOnReceiveReset(stream, Clock::now());
EXPECT_EQ(conn_.flowControlState.sumCurReadOffset, 180);
EXPECT_EQ(stream.currentReadOffset, 150);
}

// We're receiving a non-reliable reset
TEST_F(QuicFlowControlTest, UpdateFlowControlOnReceiveReset3) {
StreamId id = 3;
QuicStreamState stream(id, conn_);
stream.currentReadOffset = 10;
stream.flowControlState.windowSize = 20;
stream.flowControlState.advertisedMaxOffset = 10000;

conn_.flowControlState.windowSize = 1000;
conn_.flowControlState.advertisedMaxOffset = 10000;
conn_.flowControlState.sumCurReadOffset = 40;

// Simulate the receiving of a non-reliable reset
stream.reliableSizeFromPeer = 0;
stream.finalReadOffset = 150;
updateFlowControlOnReceiveReset(stream, Clock::now());
EXPECT_EQ(conn_.flowControlState.sumCurReadOffset, 180);
EXPECT_EQ(stream.currentReadOffset, 150);
}

TEST_F(QuicFlowControlTest, UpdateFlowControlOnWrite) {
StreamId id = 3;
QuicStreamState stream(id, conn_);
Expand Down
4 changes: 1 addition & 3 deletions quic/state/stream/StreamStateFunctions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,8 @@ void onResetQuicStream(QuicStreamState& stream, const RstStreamFrame& frame) {
// If the currentReadOffset > finalReadOffset we have already processed
// all the bytes until FIN, so we don't need to do anything for the read
// side of the flow controller.
auto lastReadOffset = stream.currentReadOffset;
stream.currentReadOffset = frame.finalSize;
stream.maxOffsetObserved = frame.finalSize;
updateFlowControlOnRead(stream, lastReadOffset, Clock::now());
updateFlowControlOnReceiveReset(stream, Clock::now());
}
stream.conn.streamManager->updateReadableStreams(stream);
stream.conn.streamManager->updateWritableStreams(stream);
Expand Down

0 comments on commit 1d82b3a

Please sign in to comment.