From 4966216f6db57a896d4f865f48129f350cc7af40 Mon Sep 17 00:00:00 2001 From: Ranjan Banerjee Date: Tue, 10 Sep 2024 18:36:23 -0700 Subject: [PATCH] Expose available/occupied space in AsyncBoundedpipe Summary: Many times applications using AsyncBoundedPipe need some insights into the depth of the Pipe. This diff exposes available/occupied capacity of the pipe Reviewed By: aditya-jalan Differential Revision: D62446509 fbshipit-source-id: ec9f10e8c9bdd98869f88592a5c03fb55c91fc7b --- folly/coro/AsyncPipe.h | 6 ++++++ folly/coro/test/AsyncPipeTest.cpp | 20 ++++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/folly/coro/AsyncPipe.h b/folly/coro/AsyncPipe.h index d3b5a135e59..43225119217 100644 --- a/folly/coro/AsyncPipe.h +++ b/folly/coro/AsyncPipe.h @@ -266,6 +266,12 @@ class BoundedAsyncPipe { return pipe_.write(std::forward(u)); } + size_t getAvailableSpace() { return semaphore_->getAvailableTokens(); } + + size_t getOccupiedSpace() { + return semaphore_->getCapacity() - getAvailableSpace(); + } + void close(exception_wrapper&& w) && { std::move(pipe_).close(std::move(w)); } void close() && { std::move(pipe_).close(); } diff --git a/folly/coro/test/AsyncPipeTest.cpp b/folly/coro/test/AsyncPipeTest.cpp index dd3b51abfc8..a542557f7fa 100644 --- a/folly/coro/test/AsyncPipeTest.cpp +++ b/folly/coro/test/AsyncPipeTest.cpp @@ -353,6 +353,26 @@ TEST(BoundedAsyncPipeTest, PublishConsume) { }()); } +TEST(BoundedAsyncPipeTest, PipeCapacity) { + folly::coro::blockingWait([]() -> folly::coro::Task { + auto [generator, pipe] = + folly::coro::BoundedAsyncPipe::create(/* tokens */ 10); + EXPECT_EQ(pipe.getAvailableSpace(), 10); + EXPECT_EQ(pipe.getOccupiedSpace(), 0); + for (int i = 0; i < 7; ++i) { + EXPECT_TRUE(co_await pipe.write(i)); + } + EXPECT_EQ(pipe.getAvailableSpace(), 3); + EXPECT_EQ(pipe.getOccupiedSpace(), 7); + for (int i = 0; i < 7; ++i) { + auto item = co_await generator.next(); + } + EXPECT_EQ(pipe.getAvailableSpace(), 10); + EXPECT_EQ(pipe.getOccupiedSpace(), 0); + std::move(pipe).close(); + }()); +} + TEST(BoundedAsyncPipeTest, PublisherBlocks) { folly::coro::blockingWait([]() -> folly::coro::Task { folly::ManualExecutor executor;