Skip to content

Commit

Permalink
Coalesce some onWrite callbacks
Browse files Browse the repository at this point in the history
Summary: If there are multiple writes in the same loop, add up the bytes written and only deliver onWrite stats callback once.

Reviewed By: mjoras

Differential Revision: D56315021

fbshipit-source-id: b4a1005f9f9c61a3667d41febb32e836b85e3ea8
  • Loading branch information
afrind authored and facebook-github-bot committed Apr 21, 2024
1 parent dc8c032 commit 0fe62f6
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 9 deletions.
14 changes: 7 additions & 7 deletions proxygen/lib/http/session/HQSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1066,12 +1066,19 @@ void HQSession::runLoopCallback() noexcept {

// Then handle the writes
// Write all the control streams first
auto maxToSendOrig = maxToSend_;
maxToSend_ -= writeControlStreams(maxToSend_);
// Then write the request streams
if (!txnEgressQueue_.empty() && maxToSend_ > 0) {
// TODO: we could send FIN only?
maxToSend_ = writeRequestStreams(maxToSend_);
}
auto sent = maxToSendOrig - maxToSend_;
if (sent > 0) {
if (infoCallback_) {
infoCallback_->onWrite(*this, sent);
}
}
// Zero out maxToSend_ here. We won't egress anything else until the next
// onWriteReady call
maxToSend_ = 0;
Expand Down Expand Up @@ -1769,10 +1776,6 @@ uint64_t HQSession::controlStreamWriteImpl(HQControlStream* ctrlStream,
<< __func__ << " after write sess=" << *this
<< ": streamID=" << ctrlStream->getEgressStreamId() << " sent=" << sendLen
<< " buflen=" << static_cast<int>(ctrlStream->writeBuf_.chainLength());
if (infoCallback_) {
infoCallback_->onWrite(*this, sendLen);
}

return sendLen;
}

Expand Down Expand Up @@ -2035,9 +2038,6 @@ uint64_t HQSession::requestStreamWriteImpl(HQStreamTransportBase* hqStream,
<< " buflen=" << hqStream->writeBufferSize()
<< " hasPendingBody=" << hqStream->txn_.hasPendingBody()
<< " EOM=" << hqStream->pendingEOM_;
if (infoCallback_) {
infoCallback_->onWrite(*this, sent);
}
CHECK_GE(maxEgress, sent);

bool flowControlBlocked = (sent == streamSendWindow && !sendEof);
Expand Down
8 changes: 7 additions & 1 deletion proxygen/lib/http/session/HTTPSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2236,6 +2236,7 @@ void HTTPSession::runLoopCallback() noexcept {
});
}

uint64_t bytesWritten = 0;
for (uint32_t i = 0; i < kMaxWritesPerLoop; ++i) {
bodyBytesPerWriteBuf_ = 0;
bool cork = true;
Expand Down Expand Up @@ -2281,8 +2282,13 @@ void HTTPSession::runLoopCallback() noexcept {
// updateWriteBufSize called in scope guard
break;
}
bytesWritten += len;
// writeChain can result in a writeError and trigger the shutdown code path
}
if (infoCallback_ && bytesWritten > 0) {
infoCallback_->onWrite(*this, bytesWritten);
}

if (numActiveWrites_ == 0 && !writesShutdown() && hasMoreWrites() &&
(!connFlowControl_ || connFlowControl_->getAvailableSend())) {
scheduleWrite();
Expand Down Expand Up @@ -2800,7 +2806,7 @@ void HTTPSession::writeSuccess() noexcept {
writeTimeout_.cancelTimeout();
pendingWrite_.reset();

if (infoCallback_) {
if (infoCallback_ && !inLoopCallback_) {
infoCallback_->onWrite(*this, bytesWritten);
}

Expand Down
2 changes: 1 addition & 1 deletion proxygen/lib/http/session/test/HQSessionTestCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class HQSessionTest
auto dirModifier =
(direction_ == proxygen::TransportDirection::DOWNSTREAM) ? 0 : 1;
EXPECT_CALL(infoCb_, onWrite(testing::_, testing::_))
.Times(testing::AtLeast(numCtrlStreams_));
.Times(testing::AtLeast(1));
for (auto i = 0; i < numCtrlStreams_; i++) {
folly::Optional<proxygen::HTTPCodec::StreamID> expectedStreamID =
i * 4 + 2 + dirModifier;
Expand Down

0 comments on commit 0fe62f6

Please sign in to comment.