diff --git a/folly/coro/AsyncPipe.h b/folly/coro/AsyncPipe.h index d3b5a135e59..43225119217 100644 --- a/folly/coro/AsyncPipe.h +++ b/folly/coro/AsyncPipe.h @@ -266,6 +266,12 @@ class BoundedAsyncPipe { return pipe_.write(std::forward(u)); } + size_t getAvailableSpace() { return semaphore_->getAvailableTokens(); } + + size_t getOccupiedSpace() { + return semaphore_->getCapacity() - getAvailableSpace(); + } + void close(exception_wrapper&& w) && { std::move(pipe_).close(std::move(w)); } void close() && { std::move(pipe_).close(); } diff --git a/folly/coro/test/AsyncPipeTest.cpp b/folly/coro/test/AsyncPipeTest.cpp index dd3b51abfc8..a542557f7fa 100644 --- a/folly/coro/test/AsyncPipeTest.cpp +++ b/folly/coro/test/AsyncPipeTest.cpp @@ -353,6 +353,26 @@ TEST(BoundedAsyncPipeTest, PublishConsume) { }()); } +TEST(BoundedAsyncPipeTest, PipeCapacity) { + folly::coro::blockingWait([]() -> folly::coro::Task { + auto [generator, pipe] = + folly::coro::BoundedAsyncPipe::create(/* tokens */ 10); + EXPECT_EQ(pipe.getAvailableSpace(), 10); + EXPECT_EQ(pipe.getOccupiedSpace(), 0); + for (int i = 0; i < 7; ++i) { + EXPECT_TRUE(co_await pipe.write(i)); + } + EXPECT_EQ(pipe.getAvailableSpace(), 3); + EXPECT_EQ(pipe.getOccupiedSpace(), 7); + for (int i = 0; i < 7; ++i) { + auto item = co_await generator.next(); + } + EXPECT_EQ(pipe.getAvailableSpace(), 10); + EXPECT_EQ(pipe.getOccupiedSpace(), 0); + std::move(pipe).close(); + }()); +} + TEST(BoundedAsyncPipeTest, PublisherBlocks) { folly::coro::blockingWait([]() -> folly::coro::Task { folly::ManualExecutor executor;