Skip to content

Commit

Permalink
Merge pull request eclipse-uprotocol#192 from pranavishere2/notificat…
Browse files Browse the repository at this point in the history
…ionSink

Implementation for NotificationSink
  • Loading branch information
gregmedd authored Jun 28, 2024
2 parents 89a8789 + b070250 commit 3b80662
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 10 deletions.
11 changes: 8 additions & 3 deletions include/up-cpp/communication/NotificationSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ namespace uprotocol::communication {
/// receiving half of the notification model.
struct NotificationSink {
using ListenCallback = transport::UTransport::ListenCallback;

using SinkOrStatus =
utils::Expected<std::unique_ptr<NotificationSink>, v1::UStatus>;
using ListenHandle = transport::UTransport::ListenHandle;

/// @brief Create a notification sink to receive notifications.
///
Expand Down Expand Up @@ -68,12 +68,17 @@ struct NotificationSink {
///
/// @throws std::invalid_argument if listener is not connected.
NotificationSink(std::shared_ptr<transport::UTransport> transport,
transport::UTransport::ListenHandle&& listener);
ListenHandle&& listener);

private:
std::shared_ptr<transport::UTransport> transport_;
ListenHandle listener_;

transport::UTransport::ListenHandle listener_;
// Allow the protected constructor for this class to be used in make_unique
// inside of subscribe()
friend std::unique_ptr<NotificationSink> std::make_unique<
NotificationSink, std::shared_ptr<transport::UTransport>, ListenHandle>(
std::shared_ptr<uprotocol::transport::UTransport>&&, ListenHandle&&);
};

} // namespace uprotocol::communication
Expand Down
24 changes: 24 additions & 0 deletions src/communication/NotificationSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,27 @@
// SPDX-License-Identifier: Apache-2.0

#include "up-cpp/communication/NotificationSink.h"

namespace uprotocol::communication {
NotificationSink::SinkOrStatus NotificationSink::create(
std::shared_ptr<transport::UTransport> transport,
const uprotocol::v1::UUri& sink, ListenCallback&& callback,
std::optional<uprotocol::v1::UUri>&& source_filter) {
if (!transport) {
throw std::invalid_argument("transport cannot be null");
}
auto listener = transport->registerListener(sink, std::move(callback),
std::move(source_filter));
if (!listener) {
return uprotocol::utils::Unexpected(listener.error());
}
return std::make_unique<NotificationSink>(
std::forward<std::shared_ptr<transport::UTransport>>(transport),
std::forward<ListenHandle&&>(std::move(listener).value()));
}

NotificationSink::NotificationSink(
std::shared_ptr<transport::UTransport> transport,
NotificationSink::ListenHandle&& listener)
: transport_(std::move(transport)), listener_(std::move(listener)) {}
} // namespace uprotocol::communication
181 changes: 174 additions & 7 deletions test/coverage/communication/NotificationSinkTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,199 @@
//
// SPDX-License-Identifier: Apache-2.0

#include <google/protobuf/util/message_differencer.h>
#include <gtest/gtest.h>
#include <up-cpp/communication/NotificationSink.h>

#include <random>
#include <string>

#include "UTransportMock.h"
#include "up-cpp/datamodel/validator/UUri.h"

namespace {
using MsgDiff = google::protobuf::util::MessageDifferencer;
using namespace uprotocol::communication;

class TestFixture : public testing::Test {
class NotificationSinkTest : public testing::Test {
protected:
// Run once per TEST_F.
// Used to set up clean environments per test.
void SetUp() override {}
void SetUp() override { buildValidSinkURI(); }
void TearDown() override {}

// Run once per execution of the test application.
// Used for setup of all tests. Has access to this instance.
TestFixture() = default;
~TestFixture() = default;
NotificationSinkTest() = default;
~NotificationSinkTest() = default;

void buildValidSinkURI(const std::string& authority = "192.168.1.10") {
testSink_.set_authority_name(authority);
testSink_.set_ue_id(0x18000);
testSink_.set_ue_version_major(0x1);
testSink_.set_resource_id(0x0);
}

// Run once per execution of the test application.
// Used only for global setup outside of tests.
static void SetUpTestSuite() {}
static void TearDownTestSuite() {}

uprotocol::v1::UUri testSink_;
size_t capture_count_ = 0;
uprotocol::v1::UMessage capture_msg_;

public:
void handleCallbackMessage(const uprotocol::v1::UMessage& message);
};

// TODO replace
TEST_F(TestFixture, SomeTestName) {}
void NotificationSinkTest::handleCallbackMessage(
const uprotocol::v1::UMessage& message) {
capture_msg_ = message;
capture_count_++;
}

std::string get_random_string(size_t length) {
auto randchar = []() -> char {
const char charset[] =
"0123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz";
const size_t max_index = (sizeof(charset) - 1);
return charset[rand() % max_index];
};
std::string str(length, 0);
std::generate_n(str.begin(), length, randchar);
return str;
}

// Positive test case with no source filter
TEST_F(NotificationSinkTest, NotificationSinkSuccessWithoutSourceFilter) {
auto transport =
std::make_shared<uprotocol::test::UTransportMock>(testSink_);

auto callback = [this](const auto& arg) {
return this->handleCallbackMessage(arg);
};

std::optional<uprotocol::v1::UUri> source_filter;

auto result = NotificationSink::create(
transport, testSink_, std::move(callback), std::move(source_filter));

EXPECT_TRUE(transport->listener_);
EXPECT_TRUE(result.has_value());

auto handle = std::move(result).value();
EXPECT_TRUE(handle);
EXPECT_TRUE(MsgDiff::Equals(testSink_, transport->sink_filter_));

const size_t max_count = 100;
for (auto i = 0; i < max_count; i++) {
uprotocol::v1::UMessage msg;
auto attr = std::make_shared<uprotocol::v1::UAttributes>();
*msg.mutable_attributes() = *attr;
msg.set_payload(get_random_string(1400));
transport->mockMessage(msg);
EXPECT_EQ(i + 1, capture_count_);
EXPECT_TRUE(MsgDiff::Equals(msg, capture_msg_));
}
}

// Positive test case with source filter
TEST_F(NotificationSinkTest, NotificationSinkSuccessWithSourceFilter) {
auto transport =
std::make_shared<uprotocol::test::UTransportMock>(testSink_);

auto callback = [this](const auto& arg) {
return this->handleCallbackMessage(arg);
};

std::optional<uprotocol::v1::UUri> source_filter = uprotocol::v1::UUri();
if (source_filter) {
source_filter->set_authority_name("192.168.1.11");
source_filter->set_ue_id(0x18001);
source_filter->set_ue_version_major(0x1);
source_filter->set_resource_id(0x0);
}

auto result = NotificationSink::create(
transport, testSink_, std::move(callback), std::move(source_filter));

EXPECT_TRUE(transport->listener_);
EXPECT_TRUE(result.has_value());

auto handle = std::move(result).value();
EXPECT_TRUE(handle);
EXPECT_TRUE(MsgDiff::Equals(testSink_, transport->sink_filter_));

const size_t max_count = 100;
for (auto i = 0; i < max_count; i++) {
uprotocol::v1::UMessage msg;
auto attr = std::make_shared<uprotocol::v1::UAttributes>();
*msg.mutable_attributes() = *attr;
msg.set_payload(get_random_string(1400));
transport->mockMessage(msg);
EXPECT_EQ(i + 1, capture_count_);
EXPECT_TRUE(MsgDiff::Equals(msg, capture_msg_));
}
}

// Simulate Error code from transport mock
TEST_F(NotificationSinkTest, NotificationSinkFailWithErroCode) {
auto transport =
std::make_shared<uprotocol::test::UTransportMock>(testSink_);

auto callback = [this](const auto& arg) {
return this->handleCallbackMessage(arg);
};

std::optional<uprotocol::v1::UUri> source_filter;

uprotocol::v1::UStatus expectedStatus;
expectedStatus.set_code(uprotocol::v1::UCode::ABORTED);
transport->registerListener_status_ = expectedStatus;

auto result = NotificationSink::create(
transport, testSink_, std::move(callback), std::move(source_filter));

auto actualStatus = std::move(result).error();
EXPECT_EQ(actualStatus.code(), expectedStatus.code());
}

// Notification sink with null transport
TEST_F(NotificationSinkTest, NotificationSinkNullTransport) {
// set transport to null
auto transport = nullptr;
std::optional<uprotocol::v1::UUri> source_filter;

auto callback = [this](const auto& arg) {
return this->handleCallbackMessage(arg);
};

EXPECT_THROW(auto result = NotificationSink::create(
transport, testSink_, std::move(callback),
std::move(source_filter)),
std::invalid_argument);
}

// notification sink with null callback
TEST_F(NotificationSinkTest, NotificationSinkNullCallback) {
auto transport =
std::make_shared<uprotocol::test::UTransportMock>(testSink_);

std::optional<uprotocol::v1::UUri> source_filter;

// bind to null callback
auto result = NotificationSink::create(
transport, testSink_, std::move(nullptr), std::move(source_filter));

const size_t max_count = 100;
uprotocol::v1::UMessage msg;
auto attr = std::make_shared<uprotocol::v1::UAttributes>();
*msg.mutable_attributes() = *attr;
msg.set_payload(get_random_string(1400));
EXPECT_THROW(transport->mockMessage(msg), std::bad_function_call);
}

} // namespace
} // namespace

0 comments on commit 3b80662

Please sign in to comment.