-
Notifications
You must be signed in to change notification settings - Fork 4
/
udp_generator.cpp
171 lines (144 loc) · 4.96 KB
/
udp_generator.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
#include "udp_generator.hpp"

#include <netdb.h>
#include <signal.h>
#include <time.h>
#include <unistd.h>

#include <algorithm>
#include <cstdio>
#include <cstring>
#include <iostream>

#include "rate_limiter.hpp"
// Constructs an idle generator: it is flagged as stopped and owns no worker
// threads until start() is called.
UDPGenerator::UDPGenerator() {
    m_stopped = true;
    m_threads.reset();
}
// Destructor: delegates shutdown to stop() before the members are destroyed.
UDPGenerator::~UDPGenerator() {
    this->stop();
}
void UDPGenerator::setHost(const char* host) {
m_host = host;
}
const char* UDPGenerator::getHost() {
return m_host;
}
void UDPGenerator::setPort(int port) {
m_port = port;
}
int UDPGenerator::getPort() {
return m_port;
}
void UDPGenerator::setNumThreads(int num_threads) {
m_num_threads = num_threads;
}
int UDPGenerator::getNumThreads() {
return m_num_threads;
}
void UDPGenerator::setStopAfterPackets(unsigned int packets) {
m_stopAfterPackets = packets;
}
void UDPGenerator::setStopAfterSeconds(unsigned int seconds) {
m_stopAfterSeconds = seconds;
}
void UDPGenerator::setPacketsPerSecond(double packets_per_second) {
m_packets_per_second = packets_per_second;
}
double UDPGenerator::getPacketsPerSecond() {
return m_packets_per_second;
}
void UDPGenerator::setReportInterval(int report_interval) {
m_report_interval = report_interval;
}
int UDPGenerator::getReportInterval() {
return m_report_interval;
}
void UDPGenerator::setNumPacketsPerSend(unsigned int num_packets_per_send) {
m_num_packets_per_send = num_packets_per_send;
}
unsigned int UDPGenerator::getNumPacketsPerSend() {
return m_num_packets_per_send;
}
// Resolves `hostname` for a numeric port.
//
// Formats `port` as a decimal service string and forwards to the
// string-service overload.
//
// @param hostname  Host name or address literal to resolve.
// @param family    Address family passed through to getaddrinfo().
// @param port      Port number to resolve for.
// @param pAddr     Receives the first resolved socket address on success.
// @return          The getaddrinfo() result code (0 on success).
int UDPGenerator::resolvehelper(const char *hostname, int family, int port, sockaddr_storage *pAddr) {
    char service[16];
    // Use the caller-supplied port; previously the argument was ignored and
    // the configured getPort() value was formatted instead.
    snprintf(service, sizeof(service), "%d", port);
    return resolvehelper(hostname, family, service, pAddr);
}
// Resolves `hostname`/`service` with getaddrinfo() and copies the first
// returned address into `*pAddr`.
//
// @param hostname  Host name or address literal to resolve.
// @param family    Address family placed in the lookup hints.
// @param service   Service name or port string.
// @param pAddr     Receives the first resolved socket address on success;
//                  left untouched on failure.
// @return          The getaddrinfo() result code (0 on success).
int UDPGenerator::resolvehelper(const char *hostname, int family, const char *service, sockaddr_storage *pAddr) {
    addrinfo query = {};
    query.ai_family = family;
    // Restrict the lookup to datagram sockets; otherwise getaddrinfo returns
    // every address once per socket type.
    query.ai_socktype = SOCK_DGRAM;

    addrinfo *matches = nullptr;
    const int status = getaddrinfo(hostname, service, &query, &matches);
    if (status != 0) {
        return status;
    }

    // Only the head of the result list is used.
    memcpy(pAddr, matches->ai_addr, matches->ai_addrlen);
    freeaddrinfo(matches);
    return status;
}
int UDPGenerator::start() {
if (m_packets_per_second > 0) {
m_limiter = std::unique_ptr<RateLimiter>(new RateLimiter());
m_limiter.get()->set_rate(m_packets_per_second);
}
m_sequence_counter.store(0);
m_stopped = false;
m_threads = std::unique_ptr<std::thread[]>(new std::thread[m_num_threads]);
for (int i = 0; i < m_num_threads; ++i) {
if (m_packets_per_second > 0) {
m_report_every_n_packets = (unsigned long)(m_packets_per_second) * m_report_interval;
m_threads[i] = std::thread(&UDPGenerator::runWithRateLimit, this, i);
} else {
m_report_every_n_packets = (unsigned long)1000000 * m_report_interval;
m_threads[i] = std::thread(&UDPGenerator::runWithoutRateLimit, this, i);
}
}
return 0;
}
// Worker loop used when a send rate is configured.
//
// Each iteration waits on the rate limiter for one batch, reserves the
// batch's sequence range from the shared counter, sends it, prints a
// progress line at the configured cadence, and raises SIGUSR1 on the process
// once a stop-after limit (seconds or packets) is reached.
//
// @param threadid  Index of this worker, forwarded to sendPackets().
void UDPGenerator::runWithRateLimit(int threadid) {
    const int DATE_BUFF_SIZE = 100;
    char date_buff[DATE_BUFF_SIZE];
    const time_t startTime = time(0);
    while (!m_stopped) {
        m_limiter->aquire(m_num_packets_per_send);

        // fetch_add returns the value *before* the addition, which is the
        // sequence number of the first packet in this batch. Subtracting the
        // batch size from it (as before) wrapped around on the first batch.
        const unsigned long long first_seq = m_sequence_counter.fetch_add(m_num_packets_per_send);
        const unsigned long long sent = first_seq + m_num_packets_per_send;
        // NOTE(review): third argument is presumably the batch's starting sequence number - confirm.
        sendPackets(threadid, m_num_packets_per_send, first_seq);

        const time_t now = time(0);
        // Skip reporting when the cadence is 0 to avoid a modulo-by-zero.
        if (m_report_every_n_packets != 0 && sent % m_report_every_n_packets == 0) {
            // localtime_r: several workers may format a timestamp at once.
            struct tm local_tm;
            localtime_r(&now, &local_tm);
            strftime(date_buff, DATE_BUFF_SIZE, "%Y-%m-%d %H:%M:%S.000", &local_tm);
            printf("%s: Sent %llu packets.\n", date_buff, sent);
            fflush(stdout);
        }

        const bool time_limit_hit = m_stopAfterSeconds && (now - startTime >= m_stopAfterSeconds);
        const bool packet_limit_hit = m_stopAfterPackets && (sent >= m_stopAfterPackets);
        if (time_limit_hit || packet_limit_hit) {
            kill(getpid(), SIGUSR1);
            // Signal delivery is asynchronous; leave now so this worker does
            // not keep sending and re-signalling past the limit.
            return;
        }
    }
}
// Worker loop used when no send rate is configured (send as fast as possible).
//
// Each iteration reserves one batch's sequence range from the shared counter,
// sends it, prints a progress line at the configured cadence, and raises
// SIGUSR1 on the process once a stop-after limit (seconds or packets) is
// reached.
//
// @param threadid  Index of this worker, forwarded to sendPackets().
void UDPGenerator::runWithoutRateLimit(int threadid) {
    const int DATE_BUFF_SIZE = 100;
    char date_buff[DATE_BUFF_SIZE];
    const time_t startTime = time(0);
    while (!m_stopped) {
        // fetch_add returns the value *before* the addition, which is the
        // sequence number of the first packet in this batch. Subtracting the
        // batch size from it (as before) wrapped around on the first batch.
        const unsigned long long first_seq = m_sequence_counter.fetch_add(m_num_packets_per_send);
        const unsigned long long sent = first_seq + m_num_packets_per_send;
        // NOTE(review): third argument is presumably the batch's starting sequence number - confirm.
        sendPackets(threadid, m_num_packets_per_send, first_seq);

        const time_t now = time(0);
        // Skip reporting when the cadence is 0 to avoid a modulo-by-zero.
        if (m_report_every_n_packets != 0 && sent % m_report_every_n_packets == 0) {
            // localtime_r: several workers may format a timestamp at once.
            struct tm local_tm;
            localtime_r(&now, &local_tm);
            strftime(date_buff, DATE_BUFF_SIZE, "%Y-%m-%d %H:%M:%S.000", &local_tm);
            printf("%s: Sent %llu packets.\n", date_buff, sent);
            // Flush so progress is visible promptly, matching runWithRateLimit().
            fflush(stdout);
        }

        const bool time_limit_hit = m_stopAfterSeconds && (now - startTime >= m_stopAfterSeconds);
        const bool packet_limit_hit = m_stopAfterPackets && (sent >= m_stopAfterPackets);
        if (time_limit_hit || packet_limit_hit) {
            kill(getpid(), SIGUSR1);
            // Signal delivery is asynchronous; leave now so this worker does
            // not keep sending and re-signalling past the limit.
            return;
        }
    }
}
// Stops the generator: raises the stop flag so the worker loops exit, then
// joins every worker thread. Does nothing if the generator is not running.
void UDPGenerator::stop() {
    // Already stopped (or never started): nothing to do. The guard used to be
    // inverted, which made stop() return immediately while workers were
    // running and left them unjoined.
    if (m_stopped) {
        return;
    }
    m_stopped = true;
    std::cout << "Stopping..." << std::endl;
    if (m_threads != nullptr) {
        for (int i = 0; i < m_num_threads; ++i) {
            // join() on a non-joinable thread throws std::system_error.
            if (m_threads[i].joinable()) {
                m_threads[i].join();
            }
        }
    }
}