// qospacketqueue.h -- taken from a fork of halfgaar/FlashMQ.
/*
This file is part of FlashMQ (https://www.flashmq.org)
Copyright (C) 2021-2023 Wiebe Cazemier
FlashMQ is free software: you can redistribute it and/or modify
it under the terms of The Open Software License 3.0 (OSL-3.0).
See LICENSE for license details.
*/
#ifndef QOSPACKETQUEUE_H
#define QOSPACKETQUEUE_H
#include <sys/types.h>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <list>
#include <map>
#include <memory>
#include <unordered_map>
#include "types.h"
#include "publishcopyfactory.h"
/**
 * @brief The QueuedPublish class pairs a Publish with the packet id it was queued under.
 *
 * The packet id is kept here rather than inside the Publish object, because packet ids are determined/tracked
 * per client/session.
 *
 * Objects expose owning 'prev' and 'next' links to other QueuedPublish objects, so they can be chained together.
 *
 * NOTE(review): both links are owning shared_ptrs, so two neighbours that point at each other keep each other alive
 * until one of the links is reset. Confirm the owning queue resets them on every removal and on teardown.
 *
 * NOTE(review): only the copy constructor is deleted. Whether the implicit copy assignment operator is usable depends
 * on Publish being copy-assignable; consider deleting it explicitly after confirming nothing relies on it.
 */
class QueuedPublish
{
private:
    Publish publish;
    uint16_t packet_id = 0;

public:
    // Links to the neighbouring entries. Public, so the code managing the chain can rewire them directly.
    std::shared_ptr<QueuedPublish> prev;
    std::shared_ptr<QueuedPublish> next;

    // Takes the publish as an rvalue, together with the packet id to associate with it.
    QueuedPublish(Publish &&publish, uint16_t packet_id);

    // Not copy-constructible.
    QueuedPublish(const QueuedPublish &other) = delete;

    // Accessors. They are defined outside this header; the unit of the footprint is presumably bytes (TODO confirm).
    Publish &getPublish();
    uint16_t getPacketId() const;
    size_t getApproximateMemoryFootprint() const;
};
/**
 * @brief The QoSPublishQueue class stores QueuedPublish objects, indexed by packet id and reachable through head/tail.
 *
 * All member functions are defined outside this header, so the notes below state only what the declarations show;
 * anything beyond that is marked as an assumption.
 */
class QoSPublishQueue
{
    // Presumably the two ends of the chain formed through QueuedPublish::prev / QueuedPublish::next.
    std::shared_ptr<QueuedPublish> head;
    std::shared_ptr<QueuedPublish> tail;

    // Queued publishes, keyed by packet id.
    std::unordered_map<uint16_t, std::shared_ptr<QueuedPublish>> queue;

    // Steady-clock time points mapped to packet ids, ordered by time point. Presumably the expiry moment of each queued
    // publish. Because this is a std::map, a given time point can be present only once; TODO confirm how two publishes
    // with an identical time point are handled.
    std::map<std::chrono::time_point<std::chrono::steady_clock>, uint16_t> queueExpirations;

    // Starts out at time_point::max(). Presumably the earliest pending expiry, so expiry checks can be skipped cheaply;
    // TODO confirm.
    std::chrono::time_point<std::chrono::steady_clock> nextExpireAt = std::chrono::time_point<std::chrono::steady_clock>::max();

    // Signed counter, whereas getByteSize() returns size_t. Presumably the summed footprint of what is queued; TODO confirm.
    ssize_t qosQueueBytes = 0;

    void addToExpirationQueue(std::shared_ptr<QueuedPublish> &qp);
    void eraseFromMapAndRelinkList(std::unordered_map<uint16_t, std::shared_ptr<QueuedPublish>>::iterator pos);
    void addToHeadOfLinkedList(std::shared_ptr<QueuedPublish> &qp);

public:
    QoSPublishQueue() = default;

    // We make this uncopyable because of the linked list QueuedPublish objects, making a deep-copy difficult.
    //
    // NOTE(review): only the copy constructor is deleted; the implicitly-declared copy assignment operator is still
    // available and would copy the shared_ptr members shallowly. Consider deleting it as well once it is confirmed
    // that no caller assigns queues.
    QoSPublishQueue(const QoSPublishQueue &other) = delete;

    // Takes a packet id and reports a bool; presumably whether an entry with that id was present and removed (TODO confirm).
    bool erase(const uint16_t packet_id);

    size_t size() const;
    size_t getByteSize() const;

    // Two ways of queueing under packet id 'id': from a PublishCopyFactory plus the QoS/retain parameters, or by handing
    // over a ready-made Publish as an rvalue.
    void queuePublish(PublishCopyFactory &copyFactory, uint16_t id, uint8_t new_max_qos, bool retainAsPublished);
    void queuePublish(Publish &&pub, uint16_t id);

    // Returns an int; presumably the number of messages that were removed because they expired (TODO confirm).
    int clearExpiredMessages();

    // getTail() hands out a const reference to a shared_ptr; popNext() returns a shared_ptr by value. Whether either can
    // be empty (e.g. on an empty queue) is not visible from this header; check before dereferencing.
    const std::shared_ptr<QueuedPublish> &getTail() const;
    std::shared_ptr<QueuedPublish> popNext();
};
#endif // QOSPACKETQUEUE_H