
Counters to shared memory #227

Merged
merged 9 commits into from
Sep 18, 2024

stal76
Collaborator

@stal76 stal76 commented Jul 11, 2024

Currently, reading counter values can be resource-intensive, because each request involves the following steps:

  • the client process opens a Unix socket to the Dataplane;
  • the Dataplane creates a separate thread, which processes the client request and returns the counter values.

In this PR, the main Dataplane counters have been moved to shared memory. To do this, the following files are created in /dev/shm/:

  • yanet_dataplane.shm - contains general information about the workers in the Dataplane, metadata, and some other information.
  • yanet_workers_node_N.shm - N is the NUMA node number (one file per NUMA node); contains the buffers of all workers running on that NUMA node.

A more detailed description of how data is laid out in these files is given in common/sdpcommon.h.

Any client process can open these files and get the values of the required counters.
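
As a rough illustration of what such a client might do, the sketch below maps a file read-only and reads one 64-bit counter by index. The helper name ReadCounter and the flat array-of-uint64_t layout are assumptions made for this example only; the real layout is the one described in common/sdpcommon.h.

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>

#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

// Hypothetical helper (not part of the PR): maps `path` read-only and returns
// the uint64_t stored at position `index`, or std::nullopt on any failure.
// Assumes the file is a flat array of 64-bit counters, for illustration only.
std::optional<uint64_t> ReadCounter(const std::string& path, std::size_t index)
{
    const int fd = open(path.c_str(), O_RDONLY);
    if (fd < 0)
    {
        return std::nullopt;
    }

    struct stat st{};
    if (fstat(fd, &st) != 0 ||
        static_cast<std::size_t>(st.st_size) < (index + 1) * sizeof(uint64_t))
    {
        close(fd);
        return std::nullopt;
    }

    void* mapped = mmap(nullptr, st.st_size, PROT_READ, MAP_SHARED, fd, 0);
    close(fd); // the mapping stays valid after the descriptor is closed
    if (mapped == MAP_FAILED)
    {
        return std::nullopt;
    }

    const uint64_t value = static_cast<const uint64_t*>(mapped)[index];
    munmap(mapped, st.st_size);
    return value;
}
```

A real client would presumably keep the mapping open between reads instead of remapping on every call; the per-call mapping here only keeps the sketch short.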

@stal76 stal76 force-pushed the counters-to-shared-memory branch 4 times, most recently from 5e663ea to 75da7ac Compare July 12, 2024 12:59
Collaborator

@ol-imorozko ol-imorozko left a comment

First commit:

common/sdpcommon.h (outdated, resolved)
common/sdpcommon.h (outdated, resolved)
common/sdpcommon.h (outdated, resolved)
common/shared_memory.h (resolved)
common/shared_memory.h (resolved)
dataplane/unittest/shared_memory.cpp (outdated, resolved)
dataplane/unittest/shared_memory.cpp (outdated, resolved)
dataplane/unittest/shared_memory.cpp (outdated, resolved)
Comment on lines +450 to +346
for (const auto& iter : counters_named)
{
    metadata.counter_positions[iter.first] = metadata.start_counters / sizeof(uint64_t) + static_cast<uint64_t>(iter.second);
}
Collaborator

@ol-imorozko ol-imorozko Jul 25, 2024

Since you're using both iter.first and iter.second, use structured bindings with meaningful names i.e. for (const auto& [name, offset] : counters_named)
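
A minimal sketch of that suggestion, assuming counters_named maps a counter name to its index and that positions are expressed in uint64_t units. The function name FillCounterPositions and the container types are hypothetical stand-ins, not the PR's actual declarations.

```cpp
#include <cstdint>
#include <map>
#include <string>

// Hypothetical stand-in for the metadata map filled in the PR.
using CounterPositions = std::map<std::string, uint64_t>;

// Converts per-counter indices into absolute positions (in uint64_t units),
// given the byte offset at which the counters block starts.
CounterPositions FillCounterPositions(const std::map<std::string, uint64_t>& counters_named,
                                      uint64_t start_counters_bytes)
{
    CounterPositions positions;
    for (const auto& [name, offset] : counters_named)
    {
        positions[name] = start_counters_bytes / sizeof(uint64_t) + offset;
    }
    return positions;
}
```

With named bindings the arithmetic reads as "block start plus offset" without the reader having to recall what first and second hold.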

Comment on lines +197 to +180
for (const auto& iter : counters_stats)
{
    metadata.counter_positions[iter.first] = (metadata.start_stats + iter.second) / sizeof(uint64_t);
}

Same here: use structured bindings to give iter.first and iter.second meaningful names.

Collaborator

@ol-imorozko ol-imorozko left a comment

second commit:

Comment on lines +9 to +10
#define SHARED_MEMORY_REREAD_TIMEOUT_MICROSECONDS 100
#define SHARED_MEMORY_REREAD_MAXIMUM_ATTEMPTS 100

constexpr auto SHARED_MEMORY_REREAD_TIMEOUT_MICROSECONDS = 100;
constexpr auto SHARED_MEMORY_REREAD_MAXIMUM_ATTEMPTS = 100;
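
One possible variation on this, offered only as a sketch: giving the timeout a std::chrono type lets the unit travel with the value instead of living in the identifier. The names kRereadTimeout, kRereadMaxAttempts and MaxTotalWait are hypothetical; the values mirror the macros under review.

```cpp
#include <chrono>

// Hypothetical names; the values mirror the two macros under review.
constexpr std::chrono::microseconds kRereadTimeout{100};
constexpr int kRereadMaxAttempts = 100;

// Upper bound on the total time spent sleeping between re-read attempts.
constexpr std::chrono::microseconds MaxTotalWait()
{
    return kRereadTimeout * kRereadMaxAttempts;
}
```

A typed constant can then be passed straight to std::this_thread::sleep_for without wrapping it in std::chrono::microseconds{...} at the call site.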

{
number_of_attempts++;
std::string message_error;
uint64_t size_mmap;

Better to init this variable: uint64_t size_mmap = 0;

Comment on lines 32 to 73
while (result != eResultRead::ok)
{
    number_of_attempts++;
    std::string message_error;
    uint64_t size_mmap;
    result = ReadItAgainMainFileDataplane(sdp_data, size_mmap, message_error);
    if (result == eResultRead::error)
    {
        YANET_LOG_ERROR("File %s. %s\n", YANET_SHARED_MEMORY_FILE_DATAPLANE, message_error.c_str());
        sdp_data.UnmapBuffers(size_mmap);
        return eResult::errorInitSharedMemory;
    }
    else if (result == eResultRead::need_reread)
    {
        sdp_data.UnmapBuffers(size_mmap);
        if (number_of_attempts >= SHARED_MEMORY_REREAD_MAXIMUM_ATTEMPTS)
        {
            YANET_LOG_ERROR("File %s. Attempts were made to read: %d. %s\n",
                            YANET_SHARED_MEMORY_FILE_DATAPLANE,
                            number_of_attempts,
                            message_error.c_str());
            return eResult::errorInitSharedMemory;
        }
        YANET_LOG_WARNING("File %s. %s\n", YANET_SHARED_MEMORY_FILE_DATAPLANE, message_error.c_str());
        std::this_thread::sleep_for(std::chrono::microseconds{SHARED_MEMORY_REREAD_TIMEOUT_MICROSECONDS});
    }
}

IMHO it will be better to extract the loop body to some function like AttemptRead. Maybe even a lambda will be good enough:

        int number_of_attempts = 0;
        eResultRead result = eResultRead::need_reread;
        std::string message_error;
        uint64_t size_mmap = 0;

        // Returns an error code to propagate, or std::nullopt to keep looping.
        auto attempt_read = [&]() -> std::optional<eResult> {
            number_of_attempts++;
            result = ReadItAgainMainFileDataplane(sdp_data, size_mmap, message_error);
            //...
                std::this_thread::sleep_for(std::chrono::microseconds{/*...*/});
            }
            return std::nullopt;
        };

        while (result != eResultRead::ok) {
            if (auto err = attempt_read(); err.has_value()) {
                return *err;
            }
        }
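
For reference, here is a self-contained sketch of the same bounded-retry idea with the loop body extracted into a function. Everything in it (ReadResult, ReadWithRetry, the callback signature) is hypothetical and only mirrors the three outcomes discussed above; it is not the PR's code.

```cpp
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical result type mirroring the three outcomes discussed above.
enum class ReadResult { ok, need_reread, error };

// Calls `read_once` until it reports ok, giving up on a hard error or after
// `max_attempts` calls. Sleeps for `pause` between retries.
// Returns true on success; `attempts_made` reports how many calls were made.
bool ReadWithRetry(const std::function<ReadResult()>& read_once,
                   int max_attempts,
                   std::chrono::microseconds pause,
                   int& attempts_made)
{
    attempts_made = 0;
    while (attempts_made < max_attempts)
    {
        ++attempts_made;
        switch (read_once())
        {
            case ReadResult::ok:
                return true;
            case ReadResult::error:
                return false;
            case ReadResult::need_reread:
                break; // leave the switch and retry
        }
        if (attempts_made < max_attempts)
        {
            std::this_thread::sleep_for(pause);
        }
    }
    return false;
}
```

Keeping the attempt counter and the sleep inside one function leaves the caller with a single success or failure decision, which is the readability gain the comment is after.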

Comment on lines +69 to +91
for (const auto& iter : sdp_data.workers)
{
    sockets_buffer[iter.second.socket] = {nullptr, 0};
}
for (const auto& iter : sdp_data.workers_gc)
{
    sockets_buffer[iter.second.socket] = {nullptr, 0};
}

Sadly, since we are required to use the gcc shipped with Ubuntu 18, we cannot simply write

        for (const auto& [id, worker] : sdp_data.workers) {
            sockets_buffer[worker.socket] = {nullptr, 0};
        }

because gcc will warn that id is unused, which is a bug in that gcc version.

This comment is not a request to change anything, just sharing my thoughts :)

Comment on lines 79 to 117
for (auto& iter : sockets_buffer)
{
    std::string filename = FileNameWorkerOnNumaNode(iter.first);
    auto [buffer, size] = common::ipc::SharedMemory::OpenBuffer(filename, false);
    if (buffer == nullptr)
    {
        YANET_LOG_ERROR("Error openning shared memory buffer from file: %s\n", filename.c_str());
        return eResult::errorInitSharedMemory;
    }
    iter.second = {buffer, size};
}

Here since we use both iter.first and iter.second, we can safely use structured bindings:

for (auto& [socket, buffer_pair] : sockets_buffer)
{
    std::string filename = FileNameWorkerOnNumaNode(socket);
    auto [buffer, size] = common::ipc::SharedMemory::OpenBuffer(filename, false);
    if (!buffer)
    {
        YANET_LOG_ERROR("Error opening shared memory buffer from file: %s\n", filename.c_str());
        return eResult::errorInitSharedMemory;
    }
    buffer_pair = {buffer, size};
}

Comment on lines 190 to 207
for (const auto& iter : data_worker)
{
    info[iter.first] = {tSocketId(iter.second.first), iter.second.second, nullptr};
}

for (const auto& [core_id, worker_data] : data_worker)
{
	info[tCoreId(core_id)] = {tSocketId(worker_data.first), worker_data.second, nullptr};
}

Comment on lines +223 to +199
YANET_LOG_ERROR("Error munmap %d: %s", errno, strerror(errno));
}

do we need to exit here? If not, maybe LOG_WARNING?

Comment on lines +1002 to +1015
uint64_t* aclCounters = common::sdp::ShiftBuffer<uint64_t*>(iter.second.buffer, start_acl_counters);
for (size_t i = 0; i < YANET_CONFIG_ACL_COUNTERS_SIZE; i++)
{
    response[i] += aclCounters[i];
}

auto* aclCounters = common::sdp::ShiftBuffer<uint64_t*>(iter.second.buffer, start_acl_counters);
std::transform(response.begin(), response.end(), aclCounters, response.begin(), std::plus<>{});
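
A small stand-alone sketch of that element-wise accumulation, with a hypothetical function name (AccumulateCounters) and a plain std::vector standing in for response. Note that this form iterates over the size of the destination container, so it matches the indexed loop only when that size equals the number of counters in the source buffer.

```cpp
#include <algorithm>
#include <cstdint>
#include <functional>
#include <vector>

// Adds the values read from `worker_counters` element-wise into `totals`.
// Assumes `worker_counters` points at no fewer than totals.size() values.
void AccumulateCounters(std::vector<uint64_t>& totals, const uint64_t* worker_counters)
{
    std::transform(totals.begin(), totals.end(), worker_counters, totals.begin(), std::plus<>{});
}
```

Calling it once per worker buffer leaves totals holding the per-counter sum across all workers.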

Comment on lines +313 to +314
uint64_t* worker_bursts =
common::sdp::ShiftBuffer<uint64_t*>(worker_info.buffer, sdp_data->metadata_worker.start_bursts);

auto* worker_bursts =

std::array<uint64_t, CONFIG_YADECAP_MBUFS_BURST_SIZE + 1> bursts;
uint64_t* worker_bursts =
common::sdp::ShiftBuffer<uint64_t*>(worker_info.buffer, sdp_data->metadata_worker.start_bursts);
memcpy(&bursts[0], worker_bursts, sizeof(uint64_t) * (CONFIG_YADECAP_MBUFS_BURST_SIZE + 1));

std::memcpy(bursts.data(), worker_bursts, sizeof(uint64_t) * bursts.size());
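
To show why deriving the byte count from the array itself is less error-prone, here is a minimal sketch. ShiftBytes is a hypothetical stand-in for a ShiftBuffer-style helper and CopyCounters is invented for the example; neither is the PR's implementation.

```cpp
#include <array>
#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical stand-in for a "shift buffer" helper: returns a typed pointer
// to the location `offset_bytes` past the start of `base`.
template<typename Pointer>
Pointer ShiftBytes(void* base, uint64_t offset_bytes)
{
    return reinterpret_cast<Pointer>(static_cast<char*>(base) + offset_bytes);
}

// Copies N counters, starting `offset_bytes` into `base`, into a local array.
// The byte count comes from the array, so it cannot drift from its size.
template<std::size_t N>
std::array<uint64_t, N> CopyCounters(void* base, uint64_t offset_bytes)
{
    std::array<uint64_t, N> out{};
    const auto* src = ShiftBytes<const uint64_t*>(base, offset_bytes);
    std::memcpy(out.data(), src, sizeof(uint64_t) * out.size());
    return out;
}
```

If the array length changes later, the copy size follows automatically, whereas a repeated CONFIG_... + 1 expression has to be kept in sync by hand.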

Collaborator

@ol-imorozko ol-imorozko left a comment

Last comment:

Also use reserve to pre-allocate space for vectors to avoid multiple reallocations in bus.h functions
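
A sketch of what that could look like, assuming the final element count is known before the loop starts. CollectValues and the pair-based value type are hypothetical and stand in for the functions in bus.h.

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Builds a list of (name, value) pairs from a name -> index table.
// Reserving up front lets the vector allocate once instead of growing
// repeatedly while elements are appended.
std::vector<std::pair<std::string, uint64_t>> CollectValues(
    const std::map<std::string, uint64_t>& positions,
    const uint64_t* buffer)
{
    std::vector<std::pair<std::string, uint64_t>> values;
    values.reserve(positions.size());
    for (const auto& [name, index] : positions)
    {
        values.emplace_back(name, buffer[index]);
    }
    return values;
}
```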

Comment on lines +16 to +19
std::map<common::idp::requestType, std::string> names = {
{common::idp::requestType::updateGlobalBase, "updateGlobalBase"},
{common::idp::requestType::updateGlobalBaseBalancer, "updateGlobalBaseBalancer"},
{common::idp::requestType::getGlobalBase, "getGlobalBase"},

Include "common/idp.h" for symbol common::idp::requestType

cli/bus.h Outdated
Comment on lines 13 to 14
uint64_t* requests = common::sdp::ShiftBuffer<uint64_t*>(sdp_data.dataplane_data, sdp_data.metadata_bus.start_bus_requests);
uint64_t* durations = common::sdp::ShiftBuffer<uint64_t*>(sdp_data.dataplane_data, sdp_data.metadata_bus.start_bus_durations);

Use auto when initializing with a template cast to avoid duplicating the type name:

auto* requests = common::sdp::ShiftBuffer<uint64_t*>(sdp_data.dataplane_data, sdp_data.metadata_bus.start_bus_requests);
auto* durations = common::sdp::ShiftBuffer<uint64_t*>(sdp_data.dataplane_data, sdp_data.metadata_bus.start_bus_durations);

Comment on lines +16 to +17
std::map<common::idp::requestType, std::string> names = {
{common::idp::requestType::updateGlobalBase, "updateGlobalBase"},

const std::map<common::idp::requestType, std::string_view> names = {
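
A stand-alone sketch of a const lookup table keyed by an enum, using a shortened hypothetical RequestType in place of common::idp::requestType. The RequestName helper and its "unknown" fallback are assumptions for illustration.

```cpp
#include <cstdint>
#include <map>
#include <string_view>

// Hypothetical, shortened stand-in for common::idp::requestType.
enum class RequestType : uint32_t { updateGlobalBase, getGlobalBase, size };

// String literals have static storage, so these string_view values never dangle.
const std::map<RequestType, std::string_view> kRequestNames = {
    {RequestType::updateGlobalBase, "updateGlobalBase"},
    {RequestType::getGlobalBase, "getGlobalBase"},
};

// Returns the registered name, or "unknown" for values missing from the table.
std::string_view RequestName(RequestType type)
{
    const auto it = kRequestNames.find(type);
    return it != kRequestNames.end() ? it->second : std::string_view{"unknown"};
}
```

Using find rather than operator[] is what allows the map to be const. One caveat: passing name.data() to a printf-style API is safe here only because views built from string literals are null-terminated.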

Comment on lines +63 to +64
for (uint32_t index = 0; index < (uint32_t)common::idp::requestType::size; ++index)
{

static_cast

@@ -404,6 +404,8 @@ void cBus::clientThread(int clientSocket)

std::chrono::duration<double> duration = std::chrono::system_clock::now() - startTime;

// The duration time is measured in milliseconds
stats.durations[(uint32_t)type] += static_cast<uint64_t>(1000 * duration.count());

static_cast type too
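
One way to keep the casts in a single place is a small index helper, sketched below. RequestType, ToIndex, BusStats and AddDuration are hypothetical names, and the use of std::chrono::steady_clock and duration_cast is a choice made for this example rather than what the PR does.

```cpp
#include <array>
#include <chrono>
#include <cstddef>
#include <cstdint>

// Hypothetical, shortened stand-in for common::idp::requestType.
enum class RequestType : uint32_t { updateGlobalBase, getGlobalBase, size };

// Single place where the enum is converted to an array index.
constexpr std::size_t ToIndex(RequestType type)
{
    return static_cast<std::size_t>(type);
}

struct BusStats
{
    std::array<uint64_t, ToIndex(RequestType::size)> durations_ms{};
};

// Adds the elapsed time, truncated to whole milliseconds, to the slot for `type`.
void AddDuration(BusStats& stats, RequestType type, std::chrono::steady_clock::duration elapsed)
{
    const auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed);
    stats.durations_ms[ToIndex(type)] += static_cast<uint64_t>(ms.count());
}
```

With duration_cast the unit is explicit in the type, so the "1000 *" factor and the comment explaining it are no longer needed.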

Comment on lines +702 to +732
for (const auto& [coreId, worker_info] : sdp_data.workers)
{
    std::vector<influxdb_format::value_t> values;
    uint64_t* buffer = common::sdp::ShiftBuffer<uint64_t*>(worker_info.buffer,
                                                           sdp_data.metadata_worker.start_counters);
    for (const auto& [name, index] : sdp_data.metadata_worker.counter_positions)
    {
        values.emplace_back(name.data(), buffer[index]);
    }
    influxdb_format::print("worker", {{"coreId", coreId}}, values);
}

for (const auto& [coreId, worker_info] : sdp_data.workers_gc)
{
    std::vector<influxdb_format::value_t> values;
    uint64_t* buffer = common::sdp::ShiftBuffer<uint64_t*>(worker_info.buffer,
                                                           sdp_data.metadata_worker.start_counters);
    for (const auto& [name, index] : sdp_data.metadata_worker_gc.counter_positions)
    {
        values.emplace_back(name.data(), buffer[index]);
    }
    influxdb_format::print("worker_gc", {{"coreId", coreId}}, values);
}

Code duplication. Either extract to a separate function, or use lambda:

auto process_worker_info = [](const tCoreId coreId, const auto& worker_info, const auto& counter_positions, uint64_t start_counters, std::string_view type) {
	std::vector<influxdb_format::value_t> values;
	auto* buffer = common::sdp::ShiftBuffer<uint64_t*>(worker_info.buffer, start_counters);
	for (const auto& [name, index] : counter_positions)
	{
		values.emplace_back(name.data(), buffer[index]);
	}
	influxdb_format::print(type.data(), {{"coreId", coreId}}, values);
};

for (const auto& [coreId, worker_info] : sdp_data.workers)
{
	process_worker_info(coreId, worker_info, sdp_data.metadata_worker.counter_positions, sdp_data.metadata_worker.start_counters, "worker");
}

for (const auto& [coreId, worker_info] : sdp_data.workers_gc)
{
	process_worker_info(coreId, worker_info, sdp_data.metadata_worker_gc.counter_positions, sdp_data.metadata_worker_gc.start_counters, "worker_gc");
}

- Shared memory is supported through both the POSIX and System V interfaces.
- Two functions are implemented for each interface: one to create a buffer (on the
server) and one to open the created buffer (on the client).
- When creating a buffer, one can optionally specify the socket (NUMA node)
on which to allocate memory, and whether to use HugeMem.
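
The create/open pairing described in this note could look roughly like the following for System V. The helper names (CreateSegment, OpenSegment, CloseSegment) are hypothetical and are not the API in common/shared_memory.h; NUMA placement and huge pages are left out of the sketch.

```cpp
#include <cstddef>

#include <sys/ipc.h>
#include <sys/shm.h>

// Hypothetical server-side helper: creates a System V segment of `size` bytes
// and returns its identifier, or -1 on failure.
int CreateSegment(std::size_t size)
{
    return shmget(IPC_PRIVATE, size, IPC_CREAT | 0600);
}

// Hypothetical client-side helper: attaches an existing segment by identifier.
// Returns nullptr on failure.
void* OpenSegment(int shm_id)
{
    void* address = shmat(shm_id, nullptr, 0);
    return address == reinterpret_cast<void*>(-1) ? nullptr : address;
}

// Detaches the mapping and marks the segment for removal.
void CloseSegment(int shm_id, void* address)
{
    shmdt(address);
    shmctl(shm_id, IPC_RMID, nullptr);
}
```

In a real server/client split the two sides would agree on a key (for example via ftok) rather than share an identifier obtained with IPC_PRIVATE; the identifier is passed directly here only to keep the example in one process.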
@stal76 stal76 force-pushed the counters-to-shared-memory branch 3 times, most recently from ad72b55 to dddfaf5 Compare September 17, 2024 13:09
@GeorgyKirichenko GeorgyKirichenko dismissed ol-imorozko’s stale review September 18, 2024 06:28

I find the PR good enough to be merged, although some improvements could still be made.

@GeorgyKirichenko GeorgyKirichenko merged commit 6d8e640 into main Sep 18, 2024
8 checks passed
@GeorgyKirichenko GeorgyKirichenko deleted the counters-to-shared-memory branch September 18, 2024 06:29
@ol-imorozko
Collaborator

@GeorgyKirichenko

I find the PR good enough to be merged, although some improvements could still be made.

And when do we plan to address my review notes? There are around 60 unresolved comments, most of which are relatively simple fixes. I don't understand why we are pushing code without resolving these issues, especially since they can be easily addressed.

I'm reviewing the project and noticing several areas that still need improvement, many of which I previously flagged for changes.

Things like:
[image]

or
[image]


3 participants