-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Counters to shared memory #227
Conversation
5e663ea
to
75da7ac
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First commit:
for (const auto& iter : counters_named) | ||
{ | ||
metadata.counter_positions[iter.first] = metadata.start_counters / sizeof(uint64_t) + static_cast<uint64_t>(iter.second); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since you're using both iter.first and iter.second, use structured bindings with meaningful names i.e. for (const auto& [name, offset] : counters_named)
for (const auto& iter : counters_stats) | ||
{ | ||
metadata.counter_positions[iter.first] = (metadata.start_stats + iter.second) / sizeof(uint64_t); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same with structured bindings, name iter.first and iter.second
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
second commit:
#define SHARED_MEMORY_REREAD_TIMEOUT_MICROSECONDS 100 | ||
#define SHARED_MEMORY_REREAD_MAXIMUM_ATTEMPTS 100 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
constexpr auto SHARED_MEMORY_REREAD_TIMEOUT_MICROSECONDS = 100;
constexpr auto SHARED_MEMORY_REREAD_MAXIMUM_ATTEMPTS = 100;
{ | ||
number_of_attempts++; | ||
std::string message_error; | ||
uint64_t size_mmap; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better to init this variable: uint64_t size_mmap = 0;
common/sdpclient.h
Outdated
while (result != eResultRead::ok) | ||
{ | ||
number_of_attempts++; | ||
std::string message_error; | ||
uint64_t size_mmap; | ||
result = ReadItAgainMainFileDataplane(sdp_data, size_mmap, message_error); | ||
if (result == eResultRead::error) | ||
{ | ||
YANET_LOG_ERROR("File %s. %s\n", YANET_SHARED_MEMORY_FILE_DATAPLANE, message_error.c_str()); | ||
sdp_data.UnmapBuffers(size_mmap); | ||
return eResult::errorInitSharedMemory; | ||
} | ||
else if (result == eResultRead::need_reread) | ||
{ | ||
sdp_data.UnmapBuffers(size_mmap); | ||
if (number_of_attempts >= SHARED_MEMORY_REREAD_MAXIMUM_ATTEMPTS) | ||
{ | ||
YANET_LOG_ERROR("File %s. Attempts were made to read: %d. %s\n", | ||
YANET_SHARED_MEMORY_FILE_DATAPLANE, | ||
number_of_attempts, | ||
message_error.c_str()); | ||
return eResult::errorInitSharedMemory; | ||
} | ||
YANET_LOG_WARNING("File %s. %s\n", YANET_SHARED_MEMORY_FILE_DATAPLANE, message_error.c_str()); | ||
std::this_thread::sleep_for(std::chrono::microseconds{SHARED_MEMORY_REREAD_TIMEOUT_MICROSECONDS}); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMHO it will be better to extract the loop body to some function like AttemptRead
. Maybe even a lambda will be good enough:
int number_of_attempts = 0;
eResultRead result = eResultRead::need_reread;
std::string message_error;
uint64_t size_mmap = 0;
auto attempt_read = [&]() -> eResultRead {
number_of_attempts++;
result = ReadItAgainMainFileDataplane(sdp_data, size_mmap, message_error);
//...
std::this_thread::sleep_for(std::chrono::microseconds{/*...*/});
}
return result;
};
while (result != eResultRead::ok) {
if (auto err = attempt_read(); err == eResult::errorInitSharedMemory) {
return err;
}
}
for (const auto& iter : sdp_data.workers) | ||
{ | ||
sockets_buffer[iter.second.socket] = {nullptr, 0}; | ||
} | ||
for (const auto& iter : sdp_data.workers_gc) | ||
{ | ||
sockets_buffer[iter.second.socket] = {nullptr, 0}; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's sad that since we have a requirement to use gcc from Ubuntu 18, we cannot just do
for (const auto& [id, worker] : sdp_data.workers) {
sockets_buffer[worker.socket] = {nullptr, 0};
}
cause gcc will tell us that id
is unused, which is a bug in that gcc version.
this comment is not a request to do something, just sharing my thoughts :)
common/sdpclient.h
Outdated
for (auto& iter : sockets_buffer) | ||
{ | ||
std::string filename = FileNameWorkerOnNumaNode(iter.first); | ||
auto [buffer, size] = common::ipc::SharedMemory::OpenBuffer(filename, false); | ||
if (buffer == nullptr) | ||
{ | ||
YANET_LOG_ERROR("Error openning shared memory buffer from file: %s\n", filename.c_str()); | ||
return eResult::errorInitSharedMemory; | ||
} | ||
iter.second = {buffer, size}; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here since we use both iter.first
and iter.second
, we can safely use structured bindings:
for (auto& [socket, buffer_pair] : sockets_buffer)
{
std::string filename = FileNameWorkerOnNumaNode(socket);
auto [buffer, size] = common::ipc::SharedMemory::OpenBuffer(filename, false);
if (!buffer)
{
YANET_LOG_ERROR("Error opening shared memory buffer from file: %s\n", filename.c_str());
return eResult::errorInitSharedMemory;
}
buffer_pair = {buffer, size};
}
common/sdpcommon.h
Outdated
for (const auto& iter : data_worker) | ||
{ | ||
info[iter.first] = {tSocketId(iter.second.first), iter.second.second, nullptr}; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for (const auto& [core_id, worker_data] : data_worker)
{
info[tCoreId(core_id)] = {tSocketId(worker_data.first), worker_data.second, nullptr};
}
YANET_LOG_ERROR("Error munmap %d: %s", errno, strerror(errno)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need to exit here? If not, maybe LOG_WARNING
?
uint64_t* aclCounters = common::sdp::ShiftBuffer<uint64_t*>(iter.second.buffer, start_acl_counters); | ||
for (size_t i = 0; i < YANET_CONFIG_ACL_COUNTERS_SIZE; i++) | ||
{ | ||
response[i] += aclCounters[i]; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
auto* aclCounters = common::sdp::ShiftBuffer<uint64_t*>(iter.second.buffer, start_acl_counters);
std::transform(response.begin(), response.end(), aclCounters, response.begin(), std::plus<>{});
uint64_t* worker_bursts = | ||
common::sdp::ShiftBuffer<uint64_t*>(worker_info.buffer, sdp_data->metadata_worker.start_bursts); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
auto* worker_bursts =
std::array<uint64_t, CONFIG_YADECAP_MBUFS_BURST_SIZE + 1> bursts; | ||
uint64_t* worker_bursts = | ||
common::sdp::ShiftBuffer<uint64_t*>(worker_info.buffer, sdp_data->metadata_worker.start_bursts); | ||
memcpy(&bursts[0], worker_bursts, sizeof(uint64_t) * (CONFIG_YADECAP_MBUFS_BURST_SIZE + 1)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
std::memcpy(bursts.data(), worker_bursts, sizeof(uint64_t) * bursts.size());
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Last comment:
Also use reserve
to pre-allocate space for vectors to avoid multiple reallocations in bus.h
functions
std::map<common::idp::requestType, std::string> names = { | ||
{common::idp::requestType::updateGlobalBase, "updateGlobalBase"}, | ||
{common::idp::requestType::updateGlobalBaseBalancer, "updateGlobalBaseBalancer"}, | ||
{common::idp::requestType::getGlobalBase, "getGlobalBase"}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Include "common/idp.h" for symbol common::idp::requestType
cli/bus.h
Outdated
uint64_t* requests = common::sdp::ShiftBuffer<uint64_t*>(sdp_data.dataplane_data, sdp_data.metadata_bus.start_bus_requests); | ||
uint64_t* durations = common::sdp::ShiftBuffer<uint64_t*>(sdp_data.dataplane_data, sdp_data.metadata_bus.start_bus_durations); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use auto
when initializing with a template cast to avoid duplicating the type name:
auto* requests = common::sdp::ShiftBuffer<uint64_t*>(sdp_data.dataplane_data, sdp_data.metadata_bus.start_bus_requests);
auto* durations = common::sdp::ShiftBuffer<uint64_t*>(sdp_data.dataplane_data, sdp_data.metadata_bus.start_bus_durations);
std::map<common::idp::requestType, std::string> names = { | ||
{common::idp::requestType::updateGlobalBase, "updateGlobalBase"}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const std::map<common::idp::requestType, std::string_view> names = {
for (uint32_t index = 0; index < (uint32_t)common::idp::requestType::size; ++index) | ||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
static_cast
@@ -404,6 +404,8 @@ void cBus::clientThread(int clientSocket) | |||
|
|||
std::chrono::duration<double> duration = std::chrono::system_clock::now() - startTime; | |||
|
|||
// The duration time is measured in milliseconds | |||
stats.durations[(uint32_t)type] += static_cast<uint64_t>(1000 * duration.count()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
static_cast
type too
for (const auto& [coreId, worker_info] : sdp_data.workers) | ||
{ | ||
std::vector<influxdb_format::value_t> values; | ||
uint64_t* buffer = common::sdp::ShiftBuffer<uint64_t*>(worker_info.buffer, | ||
sdp_data.metadata_worker.start_counters); | ||
for (const auto& [name, index] : sdp_data.metadata_worker.counter_positions) | ||
{ | ||
values.emplace_back(name.data(), buffer[index]); | ||
} | ||
influxdb_format::print("worker", {{"coreId", coreId}}, values); | ||
} | ||
|
||
for (const auto& [coreId, worker_info] : sdp_data.workers_gc) | ||
{ | ||
std::vector<influxdb_format::value_t> values; | ||
uint64_t* buffer = common::sdp::ShiftBuffer<uint64_t*>(worker_info.buffer, | ||
sdp_data.metadata_worker.start_counters); | ||
for (const auto& [name, index] : sdp_data.metadata_worker_gc.counter_positions) | ||
{ | ||
values.emplace_back(name.data(), buffer[index]); | ||
} | ||
influxdb_format::print("worker_gc", {{"coreId", coreId}}, values); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code duplication. Either extract to a separate function, or use lambda:
auto process_worker_info = [](const tCoreId coreId, const auto& worker_info, const auto& counter_positions, uint64_t start_counters, std::string_view type) {
std::vector<influxdb_format::value_t> values;
auto* buffer = common::sdp::ShiftBuffer<uint64_t*>(worker_info.buffer, start_counters);
for (const auto& [name, index] : counter_positions)
{
values.emplace_back(name.data(), buffer[index]);
}
influxdb_format::print(type.data(), {{"coreId", coreId}}, values);
};
for (const auto& [coreId, worker_info] : sdp_data.workers)
{
process_worker_info(coreId, worker_info, sdp_data.metadata_worker.counter_positions, sdp_data.metadata_worker.start_counters, "worker");
}
for (const auto& [coreId, worker_info] : sdp_data.workers_gc)
{
process_worker_info(coreId, worker_info, sdp_data.metadata_worker_gc.counter_positions, sdp_data.metadata_worker_gc.start_counters, "worker_gc");
}
75da7ac
to
02129a3
Compare
- Work with shared memory Posix standard and System V is supported. - Two functions are implemented for each standard - to create a buffer (on the server) and to open the created buffer (on the client). - When creating a buffer, one can, if necessary, specify the socket (numa node) on which to allocate memory, and whether to use HugeMem.
ad72b55
to
dddfaf5
Compare
- Bus operation time measurements are enabled. - Added error message output when connecting via socket to a dataplane before calling an exception. - 3 commands for getting bus statistics have been added to the cli. - The command to get the main counters has been added to the cli.
dddfaf5
to
b9dd6a2
Compare
I find the PR good enough to be merged despite some improvements could be still done
And when do we plan to address my review notes? There are around 60 unresolved comments, most of which are relatively simple fixes. I don't understand why we are pushing code without resolving these issues, especially since they can be easily addressed. I'm reviewing the project and noticing several areas that still need improvement, many of which I previously flagged for changes. |
At the current time, the operation of obtaining the counters values may require a lot of resources, the following actions are performed:
In this PR, the main Dataplane counters have been moved to shared memory. To do this, the following files are created in /dev/shm/:
A more detailed description of the data storage structure in these files is in the common/sdpcommon.h.
Any client process can open these files and get the values of the required counters.