diff --git a/client/client.cc b/client/client.cc index d1bdf48..d7d6788 100644 --- a/client/client.cc +++ b/client/client.cc @@ -26,7 +26,7 @@ absl::Status ClientImpl::CheckConnected() const { } absl::Status ClientImpl::Init(const std::string &server_socket, - const std::string &client_name) { + const std::string &client_name) { if (socket_.Connected()) { return absl::InternalError("Client is already connected to the server; " "Init() called twice perhaps?"); @@ -98,7 +98,7 @@ CollectBuffers(const google::protobuf::RepeatedPtrField &buffers, absl::StatusOr ClientImpl::CreatePublisher(const std::string &channel_name, int slot_size, - int num_slots, const PublisherOptions &opts) { + int num_slots, const PublisherOptions &opts) { if (absl::Status status = CheckConnected(); !status.ok()) { return status; } @@ -183,7 +183,7 @@ ClientImpl::CreatePublisher(const std::string &channel_name, int slot_size, absl::StatusOr ClientImpl::CreateSubscriber(const std::string &channel_name, - const SubscriberOptions &opts) { + const SubscriberOptions &opts) { if (absl::Status status = CheckConnected(); !status.ok()) { return status; } @@ -247,7 +247,7 @@ ClientImpl::CreateSubscriber(const std::string &channel_name, } absl::StatusOr ClientImpl::GetMessageBuffer(PublisherImpl *publisher, - int32_t max_size) { + int32_t max_size) { publisher->ClearPollFd(); int32_t slot_size = publisher->SlotSize(); @@ -306,21 +306,20 @@ absl::StatusOr ClientImpl::GetMessageBuffer(PublisherImpl *publisher, return buffer; } -absl::StatusOr ClientImpl::PublishMessage(PublisherImpl *publisher, - int64_t message_size) { +absl::StatusOr +ClientImpl::PublishMessage(PublisherImpl *publisher, int64_t message_size) { return PublishMessageInternal(publisher, message_size, /*omit_prefix=*/false); } absl::StatusOr -ClientImpl::PublishMessageInternal(PublisherImpl *publisher, int64_t message_size, - bool omit_prefix) { +ClientImpl::PublishMessageInternal(PublisherImpl *publisher, + int64_t message_size, bool omit_prefix) { // Check if there are any new subscribers and if so, load their trigger fds. if (absl::Status status = ReloadSubscribersIfNecessary(publisher); !status.ok()) { return status; } publisher->SetMessageSize(message_size); - MessageSlot *old_slot = publisher->CurrentSlot(); if (debug_) { if (old_slot != nullptr) { @@ -373,7 +372,7 @@ ClientImpl::PublishMessageInternal(PublisherImpl *publisher, int64_t message_siz } absl::Status ClientImpl::WaitForReliablePublisher(PublisherImpl *publisher, - co::Coroutine *c) { + co::Coroutine *c) { if (absl::Status status = CheckConnected(); !status.ok()) { return status; } @@ -406,7 +405,7 @@ absl::Status ClientImpl::WaitForReliablePublisher(PublisherImpl *publisher, } absl::Status ClientImpl::WaitForSubscriber(SubscriberImpl *subscriber, - co::Coroutine *c) { + co::Coroutine *c) { if (absl::Status status = CheckConnected(); !status.ok()) { return status; } @@ -433,7 +432,7 @@ absl::Status ClientImpl::WaitForSubscriber(SubscriberImpl *subscriber, absl::StatusOr ClientImpl::ReadMessageInternal(SubscriberImpl *subscriber, ReadMode mode, - bool pass_activation, bool clear_trigger) { + bool pass_activation, bool clear_trigger) { if (clear_trigger) { subscriber->ClearPollFd(); } @@ -476,7 +475,6 @@ ClientImpl::ReadMessageInternal(SubscriberImpl *subscriber, ReadMode mode, if (new_slot == nullptr) { // I'm out of messages to read, trigger the publishers to give me // some more. This is only for reliable publishers. - subscriber->SetSlot(nullptr); subscriber->TriggerReliablePublishers(); subscriber->UnmapUnusedBuffers(); return Message(); @@ -510,8 +508,8 @@ ClientImpl::ReadMessageInternal(SubscriberImpl *subscriber, ReadMode mode, subscriber->CurrentOrdinal(), subscriber->Timestamp()); } -absl::StatusOr ClientImpl::ReadMessage(SubscriberImpl *subscriber, - ReadMode mode) { +absl::StatusOr +ClientImpl::ReadMessage(SubscriberImpl *subscriber, ReadMode mode) { // If the channel is a placeholder (no publishers present), look // in the SCB to see if a new publisher has been created and if so, // talk to the server to get the information to reload the shared @@ -536,7 +534,8 @@ absl::StatusOr ClientImpl::ReadMessage(SubscriberImpl *subscriber } absl::StatusOr -ClientImpl::FindMessageInternal(SubscriberImpl *subscriber, uint64_t timestamp) { +ClientImpl::FindMessageInternal(SubscriberImpl *subscriber, + uint64_t timestamp) { MessageSlot *new_slot = subscriber->FindMessage(timestamp); if (new_slot == nullptr) { @@ -547,8 +546,8 @@ ClientImpl::FindMessageInternal(SubscriberImpl *subscriber, uint64_t timestamp) subscriber->CurrentOrdinal(), subscriber->Timestamp()); } -absl::StatusOr ClientImpl::FindMessage(SubscriberImpl *subscriber, - uint64_t timestamp) { +absl::StatusOr +ClientImpl::FindMessage(SubscriberImpl *subscriber, uint64_t timestamp) { // If the channel is a placeholder (no publishers present), contact the // server to see if there is now a publisher. This will reload the shared // memory. If there still isn't a publisher, we will still be a placeholder. @@ -604,8 +603,9 @@ int64_t ClientImpl::GetCurrentOrdinal(SubscriberImpl *sub) const { return slot->ordinal; } -absl::StatusOr ClientImpl::ReloadBuffersIfNecessary(ClientChannel *channel, - ChannelLock *lock) { +absl::StatusOr +ClientImpl::ReloadBuffersIfNecessary(ClientChannel *channel, + ChannelLock *lock) { if (!channel->BuffersChanged()) { return false; } @@ -702,7 +702,8 @@ absl::Status ClientImpl::ReloadSubscriber(SubscriberImpl *subscriber) { return absl::OkStatus(); } -absl::Status ClientImpl::ReloadSubscribersIfNecessary(PublisherImpl *publisher) { +absl::Status +ClientImpl::ReloadSubscribersIfNecessary(PublisherImpl *publisher) { if (absl::Status status = CheckConnected(); !status.ok()) { return status; } @@ -884,7 +885,7 @@ const ChannelCounters &ClientImpl::GetChannelCounters(ClientChannel *channel) { } absl::Status ClientImpl::ResizeChannel(PublisherImpl *publisher, - int32_t new_slot_size) { + int32_t new_slot_size) { if (publisher->IsFixedSize()) { return absl::InternalError(absl::StrFormat( "Channel %s is fixed size at %d bytes; can't increase it to %d bytes", @@ -930,9 +931,9 @@ absl::Status ClientImpl::ResizeChannel(PublisherImpl *publisher, return status; } -absl::Status -ClientImpl::SendRequestReceiveResponse(const Request &req, Response &response, - std::vector &fds) { +absl::Status ClientImpl::SendRequestReceiveResponse( + const Request &req, Response &response, + std::vector &fds) { // SendMessage needs 4 bytes before the buffer passed to // use for the length. char *sendbuf = buffer_ + sizeof(int32_t); diff --git a/client/client_test.cc b/client/client_test.cc index 579589f..4babaa0 100644 --- a/client/client_test.cc +++ b/client/client_test.cc @@ -482,6 +482,12 @@ TEST_F(ClientTest, PublishSingleMessageAndRead) { msg = sub->ReadMessage(); ASSERT_TRUE(msg.ok()); ASSERT_EQ(0, msg->length); + + // Read again to make sure we get another 0. + // Regression test. + msg = sub->ReadMessage(); + ASSERT_TRUE(msg.ok()); + ASSERT_EQ(0, msg->length); } TEST_F(ClientTest, PublishAndResize) {