Skip to content

Commit

Permalink
Channel structure has been completed
Browse files Browse the repository at this point in the history
  • Loading branch information
leventkaragol committed May 21, 2024
1 parent 8be244c commit e8a28a7
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 26 deletions.
4 changes: 2 additions & 2 deletions examples/multiple-producer-multiple-consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ void produce(Channel<std::string>::Producer producer, const std::string& name)
i++;

// Sending string message to the consumer with producer name
producer.send(name + "-Message " + std::to_string(i));
producer.send(name + " Message " + std::to_string(i));

std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
Expand All @@ -28,7 +28,7 @@ void consume(Channel<std::string>::Consumer consumer, const std::string& name)

if (message.has_value())
{
std::cout << name <<"-Received: " << message.value() << std::endl;
std::cout << name <<" Received: " << message.value() << std::endl;
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions examples/multiple-producer-single-consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ void produce(Channel<std::string>::Producer producer, const std::string& name)
i++;

// Sending string message to the consumer with producer name
producer.send(name + "-Message " + std::to_string(i));
producer.send(name + " Message " + std::to_string(i));

std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
Expand All @@ -28,7 +28,7 @@ void consume(Channel<std::string>::Consumer consumer)

if (message.has_value())
{
std::cout << "Received: " << message.value() << std::endl;
std::cout << "Consumer Received: " << message.value() << std::endl;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion examples/single-producer-multiple-consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ void consume(Channel<std::string>::Consumer consumer, const std::string& name)

if (message.has_value())
{
std::cout << name << "-Received: " << message.value() << std::endl;
std::cout << name << " Received: " << message.value() << std::endl;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion examples/single-producer-single-consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ void consume(Channel<std::string>::Consumer consumer)

if (message.has_value())
{
std::cout << "Received: " << message.value() << std::endl;
std::cout << "Consumer Received: " << message.value() << std::endl;
}
}
}
Expand Down
74 changes: 54 additions & 20 deletions src/libcpp-channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,66 +37,100 @@ SOFTWARE.
#include <optional>
#include <memory>
#include <vector>
#include <unordered_map>

namespace lklibs {

namespace lklibs
{
template <typename T>
class Channel {
class Channel
{
private:
struct Data {
std::queue<T> queue_;
struct Data
{
std::queue<std::shared_ptr<T>> queue_;
std::mutex mutex_;
std::condition_variable cond_var_;
std::unordered_map<int, std::queue<std::shared_ptr<T>>> consumer_queues_;
int consumer_id_counter = 0;
};

std::shared_ptr<Data> data_;

public:
Channel() : data_(std::make_shared<Data>()) {}
Channel() : data_(std::make_shared<Data>())
{
}

class Producer {
class Producer
{
public:
explicit Producer(std::shared_ptr<Data> data) : data_(std::move(data)) {}
explicit Producer(std::shared_ptr<Data> data) : data_(std::move(data))
{
}

void send(T value) {
void send(T value)
{
auto message = std::make_shared<T>(std::move(value));

std::unique_lock<std::mutex> lock(data_->mutex_);

data_->queue_.push(std::move(value));
data_->queue_.push(message);

// Broadcast message to all consumers
for (auto& [id, q] : data_->consumer_queues_)
{
q.push(message);
}

lock.unlock();

data_->cond_var_.notify_one();
data_->cond_var_.notify_all();
}

private:
std::shared_ptr<Data> data_;
};

class Consumer {
class Consumer
{
public:
explicit Consumer(std::shared_ptr<Data> data) : data_(std::move(data)) {}
explicit Consumer(std::shared_ptr<Data> data): data_(std::move(data)), consumer_id_(data_->consumer_id_counter++)
{
std::unique_lock<std::mutex> lock(data_->mutex_);

data_->consumer_queues_[consumer_id_] = std::queue<std::shared_ptr<T>>();
}

std::optional<T> receive() {
~Consumer()
{
std::unique_lock<std::mutex> lock(data_->mutex_);

data_->consumer_queues_.erase(consumer_id_);
}

std::optional<T> receive()
{
std::unique_lock<std::mutex> lock(data_->mutex_);

data_->cond_var_.wait(lock, [this]() { return !data_->queue_.empty(); });
auto& q = data_->consumer_queues_[consumer_id_];

if (data_->queue_.empty()) {
data_->cond_var_.wait(lock, [&q]() { return !q.empty(); });

if (q.empty())
{
return std::nullopt; // Spurious wakeup protection
}
}

T value = std::move(data_->queue_.front());
auto message = q.front();

data_->queue_.pop();
q.pop();

return value;
return *message;
}

private:
std::shared_ptr<Data> data_;
int consumer_id_;
};

Producer getProducer()
Expand Down

0 comments on commit e8a28a7

Please sign in to comment.