Skip to content

Commit

Permalink
[ntcore] Move ServerImpl to nt::server namespace
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterJohnson committed Oct 16, 2024
1 parent 40dfc53 commit cc5ce94
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 86 deletions.
1 change: 1 addition & 0 deletions ntcore/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ file(
src/main/native/cpp/net/*.cpp
src/main/native/cpp/net3/*.cpp
src/main/native/cpp/networktables/*.cpp
src/main/native/cpp/server/*.cpp
src/main/native/cpp/tables/*.cpp
)
add_library(ntcore ${ntcore_native_src})
Expand Down
4 changes: 2 additions & 2 deletions ntcore/src/main/native/cpp/NetworkServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

#include "net/ClientMessageQueue.h"
#include "net/Message.h"
#include "net/ServerImpl.h"
#include "ntcore_cpp.h"
#include "server/ServerImpl.h"

namespace wpi {
class Logger;
Expand Down Expand Up @@ -78,7 +78,7 @@ class NetworkServer {
using Queue = net::LocalClientMessageQueue;
net::ClientMessage m_localMsgs[Queue::kBlockSize];

net::ServerImpl m_serverImpl;
server::ServerImpl m_serverImpl;

// shared with user (must be atomic or mutex-protected)
std::atomic<wpi::uv::Async<>*> m_flushLocalAtomic{nullptr};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
#include <wpi/raw_ostream.h>
#include <wpi/timestamp.h>

#include "IConnectionList.h"
#include "Log.h"
#include "NetworkInterface.h"
#include "Types_internal.h"
#include "net/Message.h"
#include "net/WireEncoder.h"
Expand All @@ -35,7 +33,7 @@
#include "ntcore_c.h"

using namespace nt;
using namespace nt::net;
using namespace nt::server;
using namespace mpack;

// maximum amount of time the wire can be not ready to send another
Expand Down Expand Up @@ -293,7 +291,7 @@ void ServerImpl::ClientData4Base::ClientSubscribe(

// update periodic sender (if not local)
if (!m_local) {
m_periodMs = UpdatePeriodCalc(m_periodMs, sub->periodMs);
m_periodMs = net::UpdatePeriodCalc(m_periodMs, sub->periodMs);
m_setPeriodic(m_periodMs);
}

Expand All @@ -312,7 +310,7 @@ void ServerImpl::ClientData4Base::ClientSubscribe(
bool wasSubscribed =
tcdIt != topic->clients.end() && !tcdIt->second.subscribers.empty();
bool wasSubscribedValue =
wasSubscribed ? tcdIt->second.sendMode != ValueSendMode::kDisabled
wasSubscribed ? tcdIt->second.sendMode != net::ValueSendMode::kDisabled
: false;

bool added = false;
Expand Down Expand Up @@ -344,7 +342,7 @@ void ServerImpl::ClientData4Base::ClientSubscribe(

for (auto topic : dataToSend) {
DEBUG4("send last value for {} to client {}", topic->name, m_id);
SendValue(topic, topic->lastValue, ValueSendMode::kAll);
SendValue(topic, topic->lastValue, net::ValueSendMode::kAll);
}
}

Expand Down Expand Up @@ -372,7 +370,7 @@ void ServerImpl::ClientData4Base::ClientUnsubscribe(int subuid) {

// loop over all subscribers to update period
if (!m_local) {
m_periodMs = CalculatePeriod(
m_periodMs = net::CalculatePeriod(
m_subscribers, [](auto& x) { return x.getSecond()->periodMs; });
m_setPeriodic(m_periodMs);
}
Expand All @@ -392,7 +390,7 @@ void ServerImpl::ClientData4Base::ClientSetValue(int pubuid,

void ServerImpl::ClientDataLocal::SendValue(TopicData* topic,
const Value& value,
ValueSendMode mode) {
net::ValueSendMode mode) {
if (m_server.m_local) {
m_server.m_local->ServerSetValue(topic->localTopic, value);
}
Expand Down Expand Up @@ -435,32 +433,32 @@ void ServerImpl::ClientDataLocal::SendPropertiesUpdate(TopicData* topic,
}

bool ServerImpl::ClientData4Base::DoProcessIncomingMessages(
ClientMessageQueue& queue, size_t max) {
net::ClientMessageQueue& queue, size_t max) {
DEBUG4("ProcessIncomingMessage()");
max = (std::min)(m_msgsBuf.size(), max);
std::span<ClientMessage> msgs =
std::span<net::ClientMessage> msgs =
queue.ReadQueue(wpi::take_front(std::span{m_msgsBuf}, max));

// just map as a normal client into client=0 calls
bool updatepub = false;
bool updatesub = false;
for (const auto& elem : msgs) { // NOLINT
// common case is value, so check that first
if (auto msg = std::get_if<ClientValueMsg>(&elem.contents)) {
if (auto msg = std::get_if<net::ClientValueMsg>(&elem.contents)) {
ClientSetValue(msg->pubuid, msg->value);
} else if (auto msg = std::get_if<PublishMsg>(&elem.contents)) {
} else if (auto msg = std::get_if<net::PublishMsg>(&elem.contents)) {
ClientPublish(msg->pubuid, msg->name, msg->typeStr, msg->properties,
msg->options);
updatepub = true;
} else if (auto msg = std::get_if<UnpublishMsg>(&elem.contents)) {
} else if (auto msg = std::get_if<net::UnpublishMsg>(&elem.contents)) {
ClientUnpublish(msg->pubuid);
updatepub = true;
} else if (auto msg = std::get_if<SetPropertiesMsg>(&elem.contents)) {
} else if (auto msg = std::get_if<net::SetPropertiesMsg>(&elem.contents)) {
ClientSetProperties(msg->name, msg->update);
} else if (auto msg = std::get_if<SubscribeMsg>(&elem.contents)) {
} else if (auto msg = std::get_if<net::SubscribeMsg>(&elem.contents)) {
ClientSubscribe(msg->subuid, msg->topicNames, msg->options);
updatesub = true;
} else if (auto msg = std::get_if<UnsubscribeMsg>(&elem.contents)) {
} else if (auto msg = std::get_if<net::UnsubscribeMsg>(&elem.contents)) {
ClientUnsubscribe(msg->subuid);
updatesub = true;
}
Expand Down Expand Up @@ -502,7 +500,7 @@ bool ServerImpl::ClientData4::ProcessIncomingBinary(
int pubuid;
Value value;
std::string error;
if (!WireDecodeBinary(&data, &pubuid, &value, &error, 0)) {
if (!net::WireDecodeBinary(&data, &pubuid, &value, &error, 0)) {
m_wire.Disconnect(fmt::format("binary decode error: {}", error));
break;
}
Expand All @@ -512,7 +510,7 @@ bool ServerImpl::ClientData4::ProcessIncomingBinary(
auto now = wpi::Now();
DEBUG4("RTT ping from {}, responding with time={}", m_id, now);
m_wire.SendBinary(
[&](auto& os) { WireEncodeBinary(os, -1, now, value); });
[&](auto& os) { net::WireEncodeBinary(os, -1, now, value); });
continue;
}

Expand All @@ -531,7 +529,7 @@ bool ServerImpl::ClientData4::ProcessIncomingBinary(
}

void ServerImpl::ClientData4::SendValue(TopicData* topic, const Value& value,
ValueSendMode mode) {
net::ValueSendMode mode) {
m_outgoing.SendValue(topic->id, value, mode);
}

Expand All @@ -545,8 +543,8 @@ void ServerImpl::ClientData4::SendAnnounce(TopicData* topic,

if (m_local) {
int unsent = m_wire.WriteText([&](auto& os) {
WireEncodeAnnounce(os, topic->name, topic->id, topic->typeStr,
topic->properties, pubuid);
net::WireEncodeAnnounce(os, topic->name, topic->id, topic->typeStr,
topic->properties, pubuid);
});
if (unsent < 0) {
return; // error
Expand All @@ -556,8 +554,8 @@ void ServerImpl::ClientData4::SendAnnounce(TopicData* topic,
}
}
m_outgoing.SendMessage(
topic->id, AnnounceMsg{topic->name, static_cast<int>(topic->id),
topic->typeStr, pubuid, topic->properties});
topic->id, net::AnnounceMsg{topic->name, static_cast<int>(topic->id),
topic->typeStr, pubuid, topic->properties});
m_server.m_controlReady = true;
}

Expand All @@ -569,8 +567,9 @@ void ServerImpl::ClientData4::SendUnannounce(TopicData* topic) {
sent = false;

if (m_local) {
int unsent = m_wire.WriteText(
[&](auto& os) { WireEncodeUnannounce(os, topic->name, topic->id); });
int unsent = m_wire.WriteText([&](auto& os) {
net::WireEncodeUnannounce(os, topic->name, topic->id);
});
if (unsent < 0) {
return; // error
}
Expand All @@ -579,7 +578,7 @@ void ServerImpl::ClientData4::SendUnannounce(TopicData* topic) {
}
}
m_outgoing.SendMessage(
topic->id, UnannounceMsg{topic->name, static_cast<int>(topic->id)});
topic->id, net::UnannounceMsg{topic->name, static_cast<int>(topic->id)});
m_outgoing.EraseId(topic->id);
m_server.m_controlReady = true;
}
Expand All @@ -593,7 +592,7 @@ void ServerImpl::ClientData4::SendPropertiesUpdate(TopicData* topic,

if (m_local) {
int unsent = m_wire.WriteText([&](auto& os) {
WireEncodePropertiesUpdate(os, topic->name, update, ack);
net::WireEncodePropertiesUpdate(os, topic->name, update, ack);
});
if (unsent < 0) {
return; // error
Expand All @@ -603,7 +602,7 @@ void ServerImpl::ClientData4::SendPropertiesUpdate(TopicData* topic,
}
}
m_outgoing.SendMessage(topic->id,
PropertiesUpdateMsg{topic->name, update, ack});
net::PropertiesUpdateMsg{topic->name, update, ack});
m_server.m_controlReady = true;
}

Expand All @@ -618,8 +617,8 @@ void ServerImpl::ClientData4::SendOutgoing(uint64_t curTimeMs, bool flush) {

void ServerImpl::ClientData4::UpdatePeriod(TopicData::TopicClientData& tcd,
TopicData* topic) {
uint32_t period =
CalculatePeriod(tcd.subscribers, [](auto& x) { return x->periodMs; });
uint32_t period = net::CalculatePeriod(tcd.subscribers,
[](auto& x) { return x->periodMs; });
DEBUG4("updating {} period to {} ms", topic->name, period);
m_outgoing.SetPeriod(topic->id, period);
}
Expand All @@ -640,21 +639,21 @@ bool ServerImpl::ClientData3::ProcessIncomingBinary(
}

void ServerImpl::ClientData3::SendValue(TopicData* topic, const Value& value,
ValueSendMode mode) {
net::ValueSendMode mode) {
if (m_state != kStateRunning) {
if (mode == ValueSendMode::kImm) {
mode = ValueSendMode::kAll;
if (mode == net::ValueSendMode::kImm) {
mode = net::ValueSendMode::kAll;
}
} else if (m_local) {
mode = ValueSendMode::kImm; // always send local immediately
mode = net::ValueSendMode::kImm; // always send local immediately
}
TopicData3* topic3 = GetTopic3(topic);
bool added = false;

switch (mode) {
case ValueSendMode::kDisabled: // do nothing
case net::ValueSendMode::kDisabled: // do nothing
break;
case ValueSendMode::kImm: // send immediately
case net::ValueSendMode::kImm: // send immediately
++topic3->seqNum;
if (topic3->sentAssign) {
net3::WireEncodeEntryUpdate(m_wire.Send().stream(), topic->id,
Expand All @@ -669,7 +668,7 @@ void ServerImpl::ClientData3::SendValue(TopicData* topic, const Value& value,
Flush();
}
break;
case ValueSendMode::kNormal: {
case net::ValueSendMode::kNormal: {
// replace, or append if not present
wpi::DenseMap<NT_Topic, size_t>::iterator it;
std::tie(it, added) =
Expand All @@ -686,7 +685,7 @@ void ServerImpl::ClientData3::SendValue(TopicData* topic, const Value& value,
}
}
// fallthrough
case ValueSendMode::kAll: // append to outgoing
case net::ValueSendMode::kAll: // append to outgoing
if (!added) {
m_outgoingValueMap[topic->id] = m_outgoing.size();
}
Expand Down Expand Up @@ -889,7 +888,7 @@ void ServerImpl::ClientData3::ClientHello(std::string_view self_id,
options.prefixMatch = true;
sub = std::make_unique<SubscriberData>(
this, std::span<const std::string>{{prefix}}, 0, options);
m_periodMs = UpdatePeriodCalc(m_periodMs, sub->periodMs);
m_periodMs = net::UpdatePeriodCalc(m_periodMs, sub->periodMs);
m_setPeriodic(m_periodMs);

{
Expand Down Expand Up @@ -1212,7 +1211,7 @@ ServerImpl::ServerImpl(wpi::Logger& logger) : m_logger{logger} {

std::pair<std::string, int> ServerImpl::AddClient(
std::string_view name, std::string_view connInfo, bool local,
WireConnection& wire, ServerImpl::SetPeriodicFunc setPeriodic) {
net::WireConnection& wire, ServerImpl::SetPeriodicFunc setPeriodic) {
if (name.empty()) {
name = "NT4";
}
Expand Down Expand Up @@ -1842,7 +1841,7 @@ void ServerImpl::SetValue(ClientData* client, TopicData* topic,

for (auto&& tcd : topic->clients) {
if (tcd.first != client &&
tcd.second.sendMode != ValueSendMode::kDisabled) {
tcd.second.sendMode != net::ValueSendMode::kDisabled) {
tcd.first->SendValue(topic, value, tcd.second.sendMode);
}
}
Expand Down Expand Up @@ -1942,8 +1941,8 @@ void ServerImpl::SendOutgoing(int clientId, uint64_t curTimeMs) {
}
}

void ServerImpl::SetLocal(ServerMessageHandler* local,
ClientMessageQueue* queue) {
void ServerImpl::SetLocal(net::ServerMessageHandler* local,
net::ClientMessageQueue* queue) {
DEBUG4("SetLocal()");
m_local = local;
m_localClient->SetQueue(queue);
Expand Down
Loading

0 comments on commit cc5ce94

Please sign in to comment.