Skip to content

Commit

Permalink
Fixes port mixup while forwarding packets to slow worker
Browse files Browse the repository at this point in the history
  • Loading branch information
vimes committed Oct 6, 2024
1 parent 5ca8f31 commit fd8c5e8
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 21 deletions.
21 changes: 21 additions & 0 deletions dataplane/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,33 @@ class PortMapper
uint16_t ports_count_ = 0;
tPortId dpdk_ports_[std::numeric_limits<tPortId>::max() + 1]; // logical to dpdk
tPortId logical_ports_[std::numeric_limits<tPortId>::max() + 1]; // dpdk to logical

public:
PortMapper()
{
std::fill(std::begin(dpdk_ports_), std::end(dpdk_ports_), INVALID_PORT_ID);
std::fill(std::begin(logical_ports_), std::end(logical_ports_), INVALID_PORT_ID);
}

PortMapper(const PortMapper& other)
{
*this = other;
}

PortMapper& operator=(const PortMapper& other)
{
ports_count_ = other.ports_count_;
std::copy(std::begin(other.dpdk_ports_),
std::end(other.dpdk_ports_),
std::begin(dpdk_ports_));
std::copy(std::begin(other.logical_ports_),
std::end(other.logical_ports_),
std::begin(logical_ports_));
return *this;
}

uint16_t size() const { return ports_count_; }

[[nodiscard]] std::optional<tPortId> Register(tPortId dpdk_port)
{
if (ports_count_ < CONFIG_YADECAP_PORTS_SIZE)
Expand All @@ -51,6 +71,7 @@ class PortMapper
return {};
}
}

tPortId ToDpdk(tPortId logical) const { return dpdk_ports_[logical]; }
tPortId ToLogical(tPortId dpdk) const { return logical_ports_[dpdk]; }
bool ValidDpdk(tPortId dpdk) const { return logical_ports_[dpdk] != INVALID_PORT_ID; }
Expand Down
8 changes: 3 additions & 5 deletions dataplane/dataplane.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1197,8 +1197,6 @@ eResult cDataPlane::InitSlowWorker(const tCoreId core, const CPlaneWorkerConfig&
}
YANET_LOG_ERROR("ending with kni bundleconf\n");

dataplane::KernelInterfaceWorkerConfig kni_config{std::move(kni_bundleconf), &basePermanently.ports};

std::vector<cWorker*> workers_to_service;
for (auto& core : cfg.workers)
{
Expand Down Expand Up @@ -1231,7 +1229,7 @@ eResult cDataPlane::InitSlowWorker(const tCoreId core, const CPlaneWorkerConfig&
std::move(ports_to_service),
std::move(workers_to_service),
std::move(rings_from_gcs),
kni_config,
dataplane::KernelInterfaceWorker{kni_bundleconf},
socket_cplane_mempools.at(socket_id),
config.use_kernel_interface,
config.SWICMPOutRateLimit);
Expand Down Expand Up @@ -1322,9 +1320,9 @@ eResult cDataPlane::initKniQueues()
}

tQueueId qid = 0;
for (auto [core, worker]: slow_workers)
for (auto [core, worker] : slow_workers)
{
(void) worker;
(void)worker;
const auto& socket_id = rte_lcore_to_socket_id(core);
if (!KNIAddRxQueue(qid++, socket_id, socket_cplane_mempools.at(socket_id)))
{
Expand Down
8 changes: 7 additions & 1 deletion dataplane/dpdk.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <rte_mbuf.h>
#include <rte_ring.h>
#include <rte_ethdev.h>

#include "common/type.h"

Expand Down Expand Up @@ -75,9 +76,14 @@ struct Endpoint
{
tPortId port;
tQueueId queue;
Endpoint() = default;
Endpoint() :
Endpoint(-1, -1) {}
Endpoint(tPortId port, tQueueId queue) :
port{port}, queue{queue} {}
};

std::optional<std::string> GetNameByPort(tPortId pid);

std::optional<common::mac_address_t> GetMacAddress(tPortId pid);

} // namespace dpdk
2 changes: 1 addition & 1 deletion dataplane/kernel_interface_handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ bool KernelInterfaceHandle::Add(const std::string& vdev_name, const std::string&
YADECAP_LOG_ERROR("vdev interface '%s' not found\n", vdev_name.data());
return false;
}
YADECAP_LOG_INFO("Successfully added vdev interface '%s' (%s)\n", vdev_name.data(), args.data());
YANET_LOG_INFO("Successfully added vdev interface '%s' with pid %d (%s)\n", vdev_name.data(), kni_port_, args.data());
return true;
}

Expand Down
48 changes: 42 additions & 6 deletions dataplane/kernel_interface_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,16 @@ void KernelInterfaceWorker::RecvFree(const KernelInterface& iface)
rte_pktmbuf_free_bulk(burst, len);
}

KernelInterfaceWorker::KernelInterfaceWorker(const KernelInterfaceWorkerConfig& config) :
port_mapper_{config.port_mapper}
KernelInterfaceWorker::KernelInterfaceWorker(std::vector<KernelInterfaceBundleConfig>& interfaces)
{
for (const auto& iface : config.interfaces)
for (const auto& iface : interfaces)
{
auto mapped = port_mapper_.Register(iface.phy.port);
if (!mapped || mapped != size_)
{
YANET_LOG_ERROR("Failed to register port with kernel interface worker (%d)\n", iface.phy.port);
abort();
}
phy_ports_[size_] = iface.phy.port;
phy_queues_[size_] = iface.phy.queue;
forward_[size_] = KernelInterface{iface.forward};
Expand All @@ -96,6 +101,27 @@ KernelInterfaceWorker::KernelInterfaceWorker(const KernelInterfaceWorkerConfig&
}
}

KernelInterfaceWorker::KernelInterfaceWorker(KernelInterfaceWorker&& other)
{
*this = std::move(other);
}

KernelInterfaceWorker& KernelInterfaceWorker::operator=(KernelInterfaceWorker&& other)
{
size_ = std::exchange(other.size_, 0);
for (std::size_t i = 0; i < size_; ++i)
{
std::swap(phy_ports_[i], other.phy_ports_[i]);
std::swap(phy_queues_[i], other.phy_queues_[i]);
std::swap(forward_[i], other.forward_[i]);
std::swap(in_dump_[i], other.in_dump_[i]);
std::swap(out_dump_[i], other.out_dump_[i]);
std::swap(drop_dump_[i], other.drop_dump_[i]);
}
port_mapper_ = std::move(other.port_mapper_);
return *this;
}

KernelInterfaceWorker::ConstPortArrayRange<tPortId>
KernelInterfaceWorker::PortsIds() const
{
Expand Down Expand Up @@ -179,13 +205,13 @@ void KernelInterfaceWorker::HandlePacketDump(rte_mbuf* mbuf)
{
dataplane::metadata* metadata = YADECAP_METADATA(mbuf);

if (!port_mapper_->ValidDpdk(metadata->flow.data.dump.id))
if (!port_mapper_.ValidDpdk(metadata->flow.data.dump.id))
{
unknown_dump_interface_++;
rte_pktmbuf_free(mbuf);
return;
}
const auto local_port_id = port_mapper_->ToLogical(metadata->flow.data.dump.id);
const auto local_port_id = port_mapper_.ToLogical(metadata->flow.data.dump.id);

using dumpType = common::globalBase::dump_type_e;
switch (metadata->flow.data.dump.type)
Expand All @@ -208,7 +234,17 @@ void KernelInterfaceWorker::HandlePacketDump(rte_mbuf* mbuf)
void KernelInterfaceWorker::HandlePacketFromForwardingPlane(rte_mbuf* mbuf)
{
dataplane::metadata* metadata = YADECAP_METADATA(mbuf);
const auto i = port_mapper_->ToLogical(metadata->fromPortId);
if (!port_mapper_.ValidDpdk(metadata->fromPortId))
{
if (unknown_forward_interface_ < 100 || unknown_forward_interface_ % 100 == 0)
{
YANET_LOG_ERROR("Failed to map dpdk port %d while handling packet from forwarding plane (occurance %ld)\n", metadata->fromPortId, unknown_forward_interface_);
}
++unknown_forward_interface_;
rte_pktmbuf_free(mbuf);
return;
}
const auto i = port_mapper_.ToLogical(metadata->fromPortId);
const auto& delta = forward_[i].PushTracked(mbuf);
stats_[i].opackets += delta.packets;
stats_[i].obytes += delta.bytes;
Expand Down
13 changes: 5 additions & 8 deletions dataplane/kernel_interface_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,6 @@ struct KernelInterfaceBundleConfig
dpdk::Endpoint drop_dump;
};

struct KernelInterfaceWorkerConfig
{
std::vector<KernelInterfaceBundleConfig> interfaces;
dataplane::base::PortMapper* port_mapper;
};

class KernelInterfaceWorker
{
public:
Expand All @@ -76,8 +70,9 @@ class KernelInterfaceWorker
PortArray<KernelInterface> in_dump_;
PortArray<KernelInterface> out_dump_;
PortArray<KernelInterface> drop_dump_;
const dataplane::base::PortMapper* port_mapper_;
dataplane::base::PortMapper port_mapper_;
uint64_t unknown_dump_interface_ = 0;
uint64_t unknown_forward_interface_ = 0;

/**
* @brief Receive packets from interface and free them.
Expand All @@ -86,7 +81,9 @@ class KernelInterfaceWorker
void RecvFree(const KernelInterface& iface);

public:
KernelInterfaceWorker(const KernelInterfaceWorkerConfig& config);
KernelInterfaceWorker(std::vector<KernelInterfaceBundleConfig>& config);
KernelInterfaceWorker(KernelInterfaceWorker&& other);
KernelInterfaceWorker& operator=(KernelInterfaceWorker&& other);
ConstPortArrayRange<tPortId> PortsIds() const;
ConstPortArrayRange<sKniStats> PortsStats() const;
std::optional<std::reference_wrapper<const sKniStats>> PortStats(tPortId pid) const;
Expand Down
1 change: 1 addition & 0 deletions dataplane/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ sources = files('bus.cpp',
'controlplane.cpp',
'dataplane.cpp',
'debug_latch.cpp',
'dpdk.cpp',
'dregress.cpp',
'fragmentation.cpp',
'icmp_translations.cpp',
Expand Down

0 comments on commit fd8c5e8

Please sign in to comment.