Skip to content

Commit

Permalink
Expose available/occupied space in AsyncBoundedpipe
Browse files Browse the repository at this point in the history
Summary:
Many times applications using AsyncBoundedPipe need some insights into the depth of the Pipe.
This diff exposes available/occupied capacity of the pipe

Reviewed By: aditya-jalan

Differential Revision: D62446509

fbshipit-source-id: ec9f10e8c9bdd98869f88592a5c03fb55c91fc7b
  • Loading branch information
Ranjan Banerjee authored and facebook-github-bot committed Sep 11, 2024
1 parent b3dcc31 commit 4966216
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 0 deletions.
6 changes: 6 additions & 0 deletions folly/coro/AsyncPipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,12 @@ class BoundedAsyncPipe {
return pipe_.write(std::forward<U>(u));
}

size_t getAvailableSpace() { return semaphore_->getAvailableTokens(); }

size_t getOccupiedSpace() {
return semaphore_->getCapacity() - getAvailableSpace();
}

void close(exception_wrapper&& w) && { std::move(pipe_).close(std::move(w)); }
void close() && { std::move(pipe_).close(); }

Expand Down
20 changes: 20 additions & 0 deletions folly/coro/test/AsyncPipeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,26 @@ TEST(BoundedAsyncPipeTest, PublishConsume) {
}());
}

TEST(BoundedAsyncPipeTest, PipeCapacity) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
auto [generator, pipe] =
folly::coro::BoundedAsyncPipe<int>::create(/* tokens */ 10);
EXPECT_EQ(pipe.getAvailableSpace(), 10);
EXPECT_EQ(pipe.getOccupiedSpace(), 0);
for (int i = 0; i < 7; ++i) {
EXPECT_TRUE(co_await pipe.write(i));
}
EXPECT_EQ(pipe.getAvailableSpace(), 3);
EXPECT_EQ(pipe.getOccupiedSpace(), 7);
for (int i = 0; i < 7; ++i) {
auto item = co_await generator.next();
}
EXPECT_EQ(pipe.getAvailableSpace(), 10);
EXPECT_EQ(pipe.getOccupiedSpace(), 0);
std::move(pipe).close();
}());
}

TEST(BoundedAsyncPipeTest, PublisherBlocks) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
folly::ManualExecutor executor;
Expand Down

0 comments on commit 4966216

Please sign in to comment.