Skip to content

Commit

Permalink
Add an accessor for context of a fanout channel
Browse files Browse the repository at this point in the history
Summary: Title

Reviewed By: yfeldblum

Differential Revision: D59761840

fbshipit-source-id: 4de9ce742a215f6d7f5fbf783b3cfa3e8634a956
  • Loading branch information
Melwyn DSouza authored and facebook-github-bot committed Sep 11, 2024
1 parent 7731806 commit 04caa53
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 0 deletions.
9 changes: 9 additions & 0 deletions folly/channels/FanoutChannel-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ void FanoutChannel<ValueType, ContextType>::close(exception_wrapper ex) && {
processor_ = nullptr;
}

template <typename ValueType, typename ContextType>
ContextType FanoutChannel<ValueType, ContextType>::getContext() const {
return processor_->getContext();
}

namespace detail {

template <typename ValueType, typename ContextType>
Expand All @@ -100,6 +105,8 @@ class IFanoutChannelProcessor : public IChannelCallback {
virtual void closeSubscribers(CloseResult closeResult) = 0;

virtual void destroyHandle(CloseResult closeResult) = 0;

virtual ContextType getContext() = 0;
};

/**
Expand Down Expand Up @@ -210,6 +217,8 @@ class FanoutChannelProcessor
return state_.wlock()->fanoutSender.anySubscribers();
}

ContextType getContext() { return state_.rlock()->context; }

private:
/**
* Called when one of the channels we are listening to has an update (either
Expand Down
5 changes: 5 additions & 0 deletions folly/channels/FanoutChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ class FanoutChannel {
*/
void close(exception_wrapper ex = exception_wrapper()) &&;

/**
* Get the context
*/
ContextType getContext() const;

private:
TProcessor* processor_;
};
Expand Down
2 changes: 2 additions & 0 deletions folly/channels/test/FanoutChannelTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ TEST_F(FanoutChannelFixture, ReceiveValue_FanoutBroadcastsValues) {
sender.write(2);
executor_.drain();

EXPECT_EQ(fanoutChannel.getContext().version, 2);

auto [handle3, callback3] = processValues(
fanoutChannel.subscribe([](const LatestVersion& latestVersion) {
EXPECT_EQ(latestVersion.numSubscribers, 2);
Expand Down

0 comments on commit 04caa53

Please sign in to comment.