Skip to content

Commit

Permalink
[ntcore] Server round robin message processing (#7191)
Browse files Browse the repository at this point in the history
Each client has an incoming queue of ClientMessage.

In the read callback:
- Parse and process only ping messages and a limited number of messages;
  anything else will get put into the queue and not processed
- If we queued some messages, we tell the network we stopped reading; this will
  result in back-pressure if we are reading too slowly.  We also start an idle
  handle to process the queued messages.

In the idle handle callback:
- For each client, process just a few pending messages.  This is performed in
  round-robin fashion across all clients with pending messages
- When a client's queue becomes empty, we re-enable the network read
- When all client queues are empty, we stop the idle handle (so we don't spin)

For local client processing, we use round-robin processing for most cases (including FlushLocal),
but still do batch processing of all local changes for explicit network Flush() calls.
  • Loading branch information
PeterJohnson authored Oct 11, 2024
1 parent 8870d98 commit a621ceb
Show file tree
Hide file tree
Showing 15 changed files with 438 additions and 246 deletions.
24 changes: 16 additions & 8 deletions ntcore/src/main/native/cpp/NetworkClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ NetworkClientBase::NetworkClientBase(int inst, std::string_view id,
m_id{id},
m_localQueue{logger},
m_loop{*m_loopRunner.GetLoop()} {
m_localMsgs.reserve(net::NetworkLoopQueue::kInitialQueueSize);

INFO("starting network client");
}

Expand Down Expand Up @@ -194,9 +192,14 @@ NetworkClient3::~NetworkClient3() {
}

void NetworkClient3::HandleLocal() {
m_localQueue.ReadQueue(&m_localMsgs);
if (m_clientImpl) {
m_clientImpl->HandleLocal(m_localMsgs);
for (;;) {
auto msgs = m_localQueue.ReadQueue(m_localMsgs);
if (msgs.empty()) {
return;
}
if (m_clientImpl) {
m_clientImpl->HandleLocal(msgs);
}
}
}

Expand Down Expand Up @@ -358,9 +361,14 @@ NetworkClient::~NetworkClient() {
}

void NetworkClient::HandleLocal() {
m_localQueue.ReadQueue(&m_localMsgs);
if (m_clientImpl) {
m_clientImpl->HandleLocal(std::move(m_localMsgs));
for (;;) {
auto msgs = m_localQueue.ReadQueue(m_localMsgs);
if (msgs.empty()) {
return;
}
if (m_clientImpl) {
m_clientImpl->HandleLocal(msgs);
}
}
}

Expand Down
9 changes: 4 additions & 5 deletions ntcore/src/main/native/cpp/NetworkClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include <atomic>
#include <functional>
#include <memory>
#include <optional>
#include <span>
#include <string>
#include <string_view>
Expand All @@ -23,12 +22,11 @@

#include "INetworkClient.h"
#include "net/ClientImpl.h"
#include "net/ClientMessageQueue.h"
#include "net/Message.h"
#include "net/NetworkLoopQueue.h"
#include "net/WebSocketConnection.h"
#include "net3/ClientImpl3.h"
#include "net3/UvStreamConnection3.h"
#include "ntcore_cpp.h"

namespace wpi {
class Logger;
Expand Down Expand Up @@ -80,7 +78,8 @@ class NetworkClientBase : public INetworkClient {
std::shared_ptr<wpi::uv::Async<>> m_flushLocal;
std::shared_ptr<wpi::uv::Async<>> m_flush;

std::vector<net::ClientMessage> m_localMsgs;
using Queue = net::LocalClientMessageQueue;
net::ClientMessage m_localMsgs[Queue::kBlockSize];

std::vector<std::pair<std::string, unsigned int>> m_servers;

Expand All @@ -91,7 +90,7 @@ class NetworkClientBase : public INetworkClient {
std::atomic<wpi::uv::Async<>*> m_flushLocalAtomic{nullptr};
std::atomic<wpi::uv::Async<>*> m_flushAtomic{nullptr};

net::NetworkLoopQueue m_localQueue;
Queue m_localQueue;

int m_connHandle = 0;

Expand Down
56 changes: 41 additions & 15 deletions ntcore/src/main/native/cpp/NetworkServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ namespace uv = wpi::uv;
// use a larger max message size for websockets
static constexpr size_t kMaxMessageSize = 2 * 1024 * 1024;

static constexpr size_t kClientProcessMessageCountMax = 16;

class NetworkServer::ServerConnection {
public:
ServerConnection(NetworkServer& server, std::string_view addr,
Expand Down Expand Up @@ -105,7 +107,6 @@ class NetworkServer::ServerConnection4 final
void NetworkServer::ServerConnection::SetupOutgoingTimer() {
m_outgoingTimer = uv::Timer::Create(m_server.m_loop);
m_outgoingTimer->timeout.connect([this] {
m_server.HandleLocal();
m_server.m_serverImpl.SendOutgoing(m_clientId,
m_server.m_loop.Now().count());
});
Expand Down Expand Up @@ -172,8 +173,10 @@ NetworkServer::ServerConnection3::ServerConnection3(
ConnectionClosed();
});
stream->data.connect([this](uv::Buffer& buf, size_t size) {
m_server.m_serverImpl.ProcessIncomingBinary(
m_clientId, {reinterpret_cast<const uint8_t*>(buf.base), size});
if (m_server.m_serverImpl.ProcessIncomingBinary(
m_clientId, {reinterpret_cast<const uint8_t*>(buf.base), size})) {
m_server.m_idle->Start();
}
});
stream->StartRead();

Expand Down Expand Up @@ -293,10 +296,14 @@ void NetworkServer::ServerConnection4::ProcessWsUpgrade() {
ConnectionClosed();
});
m_websocket->text.connect([this](std::string_view data, bool) {
m_server.m_serverImpl.ProcessIncomingText(m_clientId, data);
if (m_server.m_serverImpl.ProcessIncomingText(m_clientId, data)) {
m_server.m_idle->Start();
}
});
m_websocket->binary.connect([this](std::span<const uint8_t> data, bool) {
m_server.m_serverImpl.ProcessIncomingBinary(m_clientId, data);
if (m_server.m_serverImpl.ProcessIncomingBinary(m_clientId, data)) {
m_server.m_idle->Start();
}
});

SetupOutgoingTimer();
Expand All @@ -320,12 +327,11 @@ NetworkServer::NetworkServer(std::string_view persistentFilename,
m_serverImpl{logger},
m_localQueue{logger},
m_loop(*m_loopRunner.GetLoop()) {
m_localMsgs.reserve(net::NetworkLoopQueue::kInitialQueueSize);
m_loopRunner.ExecAsync([=, this](uv::Loop& loop) {
// connect local storage to server
m_serverImpl.SetLocal(&m_localStorage);
m_serverImpl.SetLocal(&m_localStorage, &m_localQueue);
m_localStorage.StartNetwork(&m_localQueue);
HandleLocal();
ProcessAllLocal();

// load persistent file first, then initialize
uv::QueueWork(m_loop, [this] { LoadPersistent(); }, [this] { Init(); });
Expand All @@ -350,9 +356,9 @@ void NetworkServer::Flush() {
}
}

void NetworkServer::HandleLocal() {
m_localQueue.ReadQueue(&m_localMsgs);
m_serverImpl.HandleLocal(m_localMsgs);
void NetworkServer::ProcessAllLocal() {
while (m_serverImpl.ProcessLocalMessages(128)) {
}
}

void NetworkServer::LoadPersistent() {
Expand Down Expand Up @@ -421,8 +427,10 @@ void NetworkServer::Init() {
m_readLocalTimer = uv::Timer::Create(m_loop);
if (m_readLocalTimer) {
m_readLocalTimer->timeout.connect([this] {
HandleLocal();
m_serverImpl.SendAllOutgoing(m_loop.Now().count(), false);
if (m_serverImpl.ProcessLocalMessages(kClientProcessMessageCountMax)) {
DEBUG4("Starting idle processing");
m_idle->Start(); // more to process
}
});
m_readLocalTimer->Start(uv::Timer::Time{100}, uv::Timer::Time{100});
}
Expand All @@ -447,18 +455,36 @@ void NetworkServer::Init() {
m_flush = uv::Async<>::Create(m_loop);
if (m_flush) {
m_flush->wakeup.connect([this] {
HandleLocal();
ProcessAllLocal();
m_serverImpl.SendAllOutgoing(m_loop.Now().count(), true);
});
}
m_flushAtomic = m_flush.get();

m_flushLocal = uv::Async<>::Create(m_loop);
if (m_flushLocal) {
m_flushLocal->wakeup.connect([this] { HandleLocal(); });
m_flushLocal->wakeup.connect([this] {
if (m_serverImpl.ProcessLocalMessages(kClientProcessMessageCountMax)) {
DEBUG4("Starting idle processing");
m_idle->Start(); // more to process
}
});
}
m_flushLocalAtomic = m_flushLocal.get();

m_idle = uv::Idle::Create(m_loop);
if (m_idle) {
m_idle->idle.connect([this] {
if (m_serverImpl.ProcessIncomingMessages(kClientProcessMessageCountMax)) {
DEBUG4("Starting idle processing");
m_idle->Start(); // more to process
} else {
DEBUG4("Stopping idle processing");
m_idle->Stop(); // go back to sleep
}
});
}

INFO("Listening on NT3 port {}, NT4 port {}", m_port3, m_port4);

if (m_port3 != 0) {
Expand Down
11 changes: 7 additions & 4 deletions ntcore/src/main/native/cpp/NetworkServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@

#include <wpinet/EventLoopRunner.h>
#include <wpinet/uv/Async.h>
#include <wpinet/uv/Idle.h>
#include <wpinet/uv/Timer.h>

#include "net/ClientMessageQueue.h"
#include "net/Message.h"
#include "net/NetworkLoopQueue.h"
#include "net/ServerImpl.h"
#include "ntcore_cpp.h"

Expand Down Expand Up @@ -49,7 +50,7 @@ class NetworkServer {
class ServerConnection3;
class ServerConnection4;

void HandleLocal();
void ProcessAllLocal();
void LoadPersistent();
void SavePersistent(std::string_view filename, std::string_view data);
void Init();
Expand All @@ -71,9 +72,11 @@ class NetworkServer {
std::shared_ptr<wpi::uv::Timer> m_savePersistentTimer;
std::shared_ptr<wpi::uv::Async<>> m_flushLocal;
std::shared_ptr<wpi::uv::Async<>> m_flush;
std::shared_ptr<wpi::uv::Idle> m_idle;
bool m_shutdown = false;

std::vector<net::ClientMessage> m_localMsgs;
using Queue = net::LocalClientMessageQueue;
net::ClientMessage m_localMsgs[Queue::kBlockSize];

net::ServerImpl m_serverImpl;

Expand All @@ -87,7 +90,7 @@ class NetworkServer {
};
std::vector<Connection> m_connections;

net::NetworkLoopQueue m_localQueue;
Queue m_localQueue;

wpi::EventLoopRunner m_loopRunner;
wpi::uv::Loop& m_loop;
Expand Down
2 changes: 1 addition & 1 deletion ntcore/src/main/native/cpp/net/ClientImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ void ClientImpl::ProcessIncomingBinary(uint64_t curTimeMs,
}
}

void ClientImpl::HandleLocal(std::vector<ClientMessage>&& msgs) {
void ClientImpl::HandleLocal(std::span<ClientMessage> msgs) {
DEBUG4("HandleLocal()");
for (auto&& elem : msgs) {
// common case is value
Expand Down
2 changes: 1 addition & 1 deletion ntcore/src/main/native/cpp/net/ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class ClientImpl final : private ServerMessageHandler {

void ProcessIncomingText(std::string_view data);
void ProcessIncomingBinary(uint64_t curTimeMs, std::span<const uint8_t> data);
void HandleLocal(std::vector<ClientMessage>&& msgs);
void HandleLocal(std::span<ClientMessage> msgs);

void SendOutgoing(uint64_t curTimeMs, bool flush);

Expand Down
87 changes: 87 additions & 0 deletions ntcore/src/main/native/cpp/net/ClientMessageQueue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.

#pragma once

#include <span>
#include <string>

#include <wpi/FastQueue.h>
#include <wpi/mutex.h>

#include "Message.h"
#include "MessageHandler.h"

namespace wpi {
class Logger;
} // namespace wpi

namespace nt::net {

class ClientMessageQueue {
public:
virtual ~ClientMessageQueue() = default;

virtual std::span<ClientMessage> ReadQueue(std::span<ClientMessage> out) = 0;
virtual void ClearQueue() = 0;
};

namespace detail {

template <size_t MaxValueSize, bool IsMutexed>
class ClientMessageQueueImpl final : public ClientMessageHandler,
public ClientMessageQueue {
public:
static constexpr size_t kBlockSize = 64;

explicit ClientMessageQueueImpl(wpi::Logger& logger) : m_logger{logger} {}

bool empty() const { return m_queue.empty(); }

// ClientMessageQueue - calls to these read the queue
std::span<ClientMessage> ReadQueue(std::span<ClientMessage> out) final;
void ClearQueue() final;

// ClientMessageHandler - calls to these append to the queue
void ClientPublish(int pubuid, std::string_view name,
std::string_view typeStr, const wpi::json& properties,
const PubSubOptionsImpl& options) final;
void ClientUnpublish(int pubuid) final;
void ClientSetProperties(std::string_view name,
const wpi::json& update) final;
void ClientSubscribe(int subuid, std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) final;
void ClientUnsubscribe(int subuid) final;
void ClientSetValue(int pubuid, const Value& value) final;

private:
wpi::FastQueue<ClientMessage, kBlockSize> m_queue{kBlockSize - 1};
wpi::Logger& m_logger;

class NoMutex {
public:
void lock() {}
void unlock() {}
};
[[no_unique_address]]
std::conditional_t<IsMutexed, wpi::mutex, NoMutex> m_mutex;

struct ValueSize {
size_t size{0};
bool errored{false};
};
struct Empty {};
[[no_unique_address]]
std::conditional_t<MaxValueSize != 0, ValueSize, Empty> m_valueSize;
};

} // namespace detail

using LocalClientMessageQueue =
detail::ClientMessageQueueImpl<2 * 1024 * 1024, true>;
using NetworkIncomingClientQueue = detail::ClientMessageQueueImpl<0, false>;

} // namespace nt::net

#include "ClientMessageQueue.inc"
Loading

0 comments on commit a621ceb

Please sign in to comment.