diff --git a/ntcore/src/main/native/cpp/server/Constants.h b/ntcore/src/main/native/cpp/server/Constants.h new file mode 100644 index 00000000000..654f5be091b --- /dev/null +++ b/ntcore/src/main/native/cpp/server/Constants.h @@ -0,0 +1,13 @@ +// Copyright(c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include + +namespace nt::server { + +inline constexpr uint32_t kMinPeriodMs = 5; + +} // namespace nt::server diff --git a/ntcore/src/main/native/cpp/server/Functions.h b/ntcore/src/main/native/cpp/server/Functions.h new file mode 100644 index 00000000000..f09b4a93710 --- /dev/null +++ b/ntcore/src/main/native/cpp/server/Functions.h @@ -0,0 +1,16 @@ +// Copyright(c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include +#include + +namespace nt::server { + +using SetPeriodicFunc = std::function; +using Connected3Func = + std::function; + +} // namespace nt::server diff --git a/ntcore/src/main/native/cpp/server/MessagePackWriter.h b/ntcore/src/main/native/cpp/server/MessagePackWriter.h new file mode 100644 index 00000000000..766d9aa513b --- /dev/null +++ b/ntcore/src/main/native/cpp/server/MessagePackWriter.h @@ -0,0 +1,31 @@ +// Copyright(c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include + +#include + +#include +#include + +namespace nt::server { + +struct Writer : public mpack::mpack_writer_t { + Writer() { + mpack::mpack_writer_init(this, buf, sizeof(buf)); + mpack::mpack_writer_set_context(this, &os); + mpack::mpack_writer_set_flush( + this, [](mpack::mpack_writer_t* w, const char* buffer, size_t count) { + static_cast(w->context)->write(buffer, count); + }); + } + + std::vector bytes; + wpi::raw_uvector_ostream os{bytes}; + char buf[128]; +}; + +} // namespace nt::server diff --git a/ntcore/src/main/native/cpp/server/ServerClient.cpp b/ntcore/src/main/native/cpp/server/ServerClient.cpp new file mode 100644 index 00000000000..8987ebd5ce2 --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerClient.cpp @@ -0,0 +1,61 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#include "ServerClient.h" + +#include + +#include "server/MessagePackWriter.h" +#include "server/ServerImpl.h" +#include "server/ServerPublisher.h" + +using namespace nt::server; +using namespace mpack; + +void ServerClient::UpdateMetaClientPub() { + if (!m_metaPub) { + return; + } + Writer w; + mpack_start_array(&w, m_publishers.size()); + for (auto&& pub : m_publishers) { + mpack_write_object_bytes( + &w, reinterpret_cast(pub.second->metaClient.data()), + pub.second->metaClient.size()); + } + mpack_finish_array(&w); + if (mpack_writer_destroy(&w) == mpack_ok) { + m_server.SetValue(nullptr, m_metaPub, Value::MakeRaw(std::move(w.bytes))); + } +} + +void ServerClient::UpdateMetaClientSub() { + if (!m_metaSub) { + return; + } + Writer w; + mpack_start_array(&w, m_subscribers.size()); + for (auto&& sub : m_subscribers) { + mpack_write_object_bytes( + &w, reinterpret_cast(sub.second->metaClient.data()), + sub.second->metaClient.size()); + } + mpack_finish_array(&w); + if (mpack_writer_destroy(&w) == mpack_ok) { + m_server.SetValue(nullptr, m_metaSub, Value::MakeRaw(std::move(w.bytes))); + } +} + +std::span ServerClient::GetSubscribers( + std::string_view name, bool special, + wpi::SmallVectorImpl& buf) { + buf.resize(0); + for (auto&& subPair : m_subscribers) { + ServerSubscriber* subscriber = subPair.getSecond().get(); + if (subscriber->Matches(name, special)) { + buf.emplace_back(subscriber); + } + } + return {buf.data(), buf.size()}; +} diff --git a/ntcore/src/main/native/cpp/server/ServerClient.h b/ntcore/src/main/native/cpp/server/ServerClient.h new file mode 100644 index 00000000000..fec1629ad2f --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerClient.h @@ -0,0 +1,95 @@ +// Copyright(c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include + +#include +#include +#include + +#include + +#include "net/NetworkOutgoingQueue.h" +#include "server/Functions.h" +#include "server/ServerPublisher.h" +#include "server/ServerSubscriber.h" + +namespace wpi { +class Logger; +template +class SmallVectorImpl; +} // namespace wpi + +namespace nt::server { + +struct ServerTopic; +class ServerImpl; +struct TopicClientData; + +class ServerClient { + public: + ServerClient(std::string_view name, std::string_view connInfo, bool local, + SetPeriodicFunc setPeriodic, ServerImpl& server, int id, + wpi::Logger& logger) + : m_name{name}, + m_connInfo{connInfo}, + m_local{local}, + m_setPeriodic{std::move(setPeriodic)}, + m_server{server}, + m_id{id}, + m_logger{logger} {} + virtual ~ServerClient() = default; + + // these return true if any messages have been queued for later processing + virtual bool ProcessIncomingText(std::string_view data) = 0; + virtual bool ProcessIncomingBinary(std::span data) = 0; + + virtual void SendValue(ServerTopic* topic, const Value& value, + net::ValueSendMode mode) = 0; + virtual void SendAnnounce(ServerTopic* topic, std::optional pubuid) = 0; + virtual void SendUnannounce(ServerTopic* topic) = 0; + virtual void SendPropertiesUpdate(ServerTopic* topic, const wpi::json& update, + bool ack) = 0; + virtual void SendOutgoing(uint64_t curTimeMs, bool flush) = 0; + virtual void Flush() = 0; + + // later processing -- returns true if more to process + virtual bool ProcessIncomingMessages(size_t max) = 0; + + void UpdateMetaClientPub(); + void UpdateMetaClientSub(); + + std::span GetSubscribers( + std::string_view name, bool special, + wpi::SmallVectorImpl& buf); + + std::string_view GetName() const { return m_name; } + int GetId() const { return m_id; } + + virtual void UpdatePeriod(TopicClientData& tcd, ServerTopic* topic) {} + + protected: + std::string m_name; + std::string m_connInfo; + bool m_local; // local to machine + SetPeriodicFunc m_setPeriodic; + // TODO: make this per-topic? + uint32_t m_periodMs{UINT32_MAX}; + ServerImpl& m_server; + int m_id; + + wpi::Logger& m_logger; + + wpi::DenseMap> m_publishers; + wpi::DenseMap> m_subscribers; + + public: + // meta topics + ServerTopic* m_metaPub = nullptr; + ServerTopic* m_metaSub = nullptr; +}; + +} // namespace nt::server diff --git a/ntcore/src/main/native/cpp/server/ServerClient3.cpp b/ntcore/src/main/native/cpp/server/ServerClient3.cpp new file mode 100644 index 00000000000..b162603a574 --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerClient3.cpp @@ -0,0 +1,490 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#include "ServerClient3.h" + +#include + +#include "Log.h" +#include "Types_internal.h" +#include "net3/WireEncoder3.h" +#include "server/ServerImpl.h" +#include "server/ServerPublisher.h" +#include "server/ServerTopic.h" + +using namespace nt::server; + +// maximum amount of time the wire can be not ready to send another +// transmission before we close the connection +static constexpr uint32_t kWireMaxNotReadyUs = 1000000; + +bool ServerClient3::TopicData3::UpdateFlags(ServerTopic* topic) { + unsigned int newFlags = topic->persistent ? NT_PERSISTENT : 0; + bool updated = flags != newFlags; + flags = newFlags; + return updated; +} + +bool ServerClient3::ProcessIncomingBinary(std::span data) { + if (!m_decoder.Execute(&data)) { + m_wire.Disconnect(m_decoder.GetError()); + } + return false; +} + +void ServerClient3::SendValue(ServerTopic* topic, const Value& value, + net::ValueSendMode mode) { + if (m_state != kStateRunning) { + if (mode == net::ValueSendMode::kImm) { + mode = net::ValueSendMode::kAll; + } + } else if (m_local) { + mode = net::ValueSendMode::kImm; // always send local immediately + } + TopicData3* topic3 = GetTopic3(topic); + bool added = false; + + switch (mode) { + case net::ValueSendMode::kDisabled: // do nothing + break; + case net::ValueSendMode::kImm: // send immediately + ++topic3->seqNum; + if (topic3->sentAssign) { + net3::WireEncodeEntryUpdate(m_wire.Send().stream(), topic->id, + topic3->seqNum.value(), value); + } else { + net3::WireEncodeEntryAssign(m_wire.Send().stream(), topic->name, + topic->id, topic3->seqNum.value(), value, + topic3->flags); + topic3->sentAssign = true; + } + if (m_local) { + Flush(); + } + break; + case net::ValueSendMode::kNormal: { + // replace, or append if not present + wpi::DenseMap::iterator it; + std::tie(it, added) = + m_outgoingValueMap.try_emplace(topic->id, m_outgoing.size()); + if (!added && it->second < m_outgoing.size()) { + auto& msg = m_outgoing[it->second]; + if (msg.Is(net3::Message3::kEntryUpdate) || + msg.Is(net3::Message3::kEntryAssign)) { + if (msg.id() == topic->id) { // should always be true + msg.SetValue(value); + break; + } + } + } + } + // fallthrough + case net::ValueSendMode::kAll: // append to outgoing + if (!added) { + m_outgoingValueMap[topic->id] = m_outgoing.size(); + } + ++topic3->seqNum; + if (topic3->sentAssign) { + m_outgoing.emplace_back(net3::Message3::EntryUpdate( + topic->id, topic3->seqNum.value(), value)); + } else { + m_outgoing.emplace_back(net3::Message3::EntryAssign( + topic->name, topic->id, topic3->seqNum.value(), value, + topic3->flags)); + topic3->sentAssign = true; + } + break; + } +} + +void ServerClient3::SendAnnounce(ServerTopic* topic, std::optional pubuid) { + // ignore if we've not yet built the subscriber + if (m_subscribers.empty()) { + return; + } + + // subscribe to all non-special topics + if (!topic->special) { + topic->clients[this].AddSubscriber(m_subscribers[0].get()); + m_server.UpdateMetaTopicSub(topic); + } + + // NT3 requires a value to send the assign message, so the assign message + // will get sent when the first value is sent (by SendValue). +} + +void ServerClient3::SendUnannounce(ServerTopic* topic) { + auto it = m_topics3.find(topic); + if (it == m_topics3.end()) { + return; // never sent to client + } + bool sentAssign = it->second.sentAssign; + m_topics3.erase(it); + if (!sentAssign) { + return; // never sent to client + } + + // map to NT3 delete message + if (m_local && m_state == kStateRunning) { + net3::WireEncodeEntryDelete(m_wire.Send().stream(), topic->id); + Flush(); + } else { + m_outgoing.emplace_back(net3::Message3::EntryDelete(topic->id)); + } +} + +void ServerClient3::SendPropertiesUpdate(ServerTopic* topic, + const wpi::json& update, bool ack) { + if (ack) { + return; // we don't ack in NT3 + } + auto it = m_topics3.find(topic); + if (it == m_topics3.end()) { + return; // never sent to client + } + TopicData3* topic3 = &it->second; + // Don't send flags update unless we've already sent an assign message. + // The assign message will contain the updated flags when we eventually + // send it. + if (topic3->UpdateFlags(topic) && topic3->sentAssign) { + if (m_local && m_state == kStateRunning) { + net3::WireEncodeFlagsUpdate(m_wire.Send().stream(), topic->id, + topic3->flags); + Flush(); + } else { + m_outgoing.emplace_back( + net3::Message3::FlagsUpdate(topic->id, topic3->flags)); + } + } +} + +void ServerClient3::SendOutgoing(uint64_t curTimeMs, bool flush) { + if (m_outgoing.empty() || m_state != kStateRunning) { + return; // nothing to do + } + + // rate limit frequency of transmissions + if (curTimeMs < (m_lastSendMs + kMinPeriodMs)) { + return; + } + + if (!m_wire.Ready()) { + uint64_t lastFlushTime = m_wire.GetLastFlushTime(); + uint64_t now = wpi::Now(); + if (lastFlushTime != 0 && now > (lastFlushTime + kWireMaxNotReadyUs)) { + m_wire.Disconnect("transmit stalled"); + } + return; + } + + auto out = m_wire.Send(); + for (auto&& msg : m_outgoing) { + net3::WireEncode(out.stream(), msg); + } + m_wire.Flush(); + m_outgoing.resize(0); + m_outgoingValueMap.clear(); + m_lastSendMs = curTimeMs; +} + +void ServerClient3::KeepAlive() { + DEBUG4("KeepAlive({})", m_id); + if (m_state != kStateRunning) { + m_decoder.SetError("received unexpected KeepAlive message"); + return; + } + // ignore +} + +void ServerClient3::ServerHelloDone() { + DEBUG4("ServerHelloDone({})", m_id); + m_decoder.SetError("received unexpected ServerHelloDone message"); +} + +void ServerClient3::ClientHelloDone() { + DEBUG4("ClientHelloDone({})", m_id); + if (m_state != kStateServerHelloComplete) { + m_decoder.SetError("received unexpected ClientHelloDone message"); + return; + } + m_state = kStateRunning; +} + +void ServerClient3::ClearEntries() { + DEBUG4("ClearEntries({})", m_id); + if (m_state != kStateRunning) { + m_decoder.SetError("received unexpected ClearEntries message"); + return; + } + + for (auto topic3it : m_topics3) { + ServerTopic* topic = topic3it.first; + + // make sure we send assign the next time + topic3it.second.sentAssign = false; + + // unpublish from this client (if it was previously published) + if (topic3it.second.published) { + topic3it.second.published = false; + auto publisherIt = m_publishers.find(topic3it.second.pubuid); + if (publisherIt != m_publishers.end()) { + // remove publisher from topic + topic->RemovePublisher(this, publisherIt->second.get()); + + // remove publisher from client + m_publishers.erase(publisherIt); + + // update meta data + m_server.UpdateMetaTopicPub(topic); + UpdateMetaClientPub(); + } + } + + // set retained=false + m_server.SetProperties(this, topic, {{"retained", false}}); + } +} + +void ServerClient3::ProtoUnsup(unsigned int proto_rev) { + DEBUG4("ProtoUnsup({})", m_id); + m_decoder.SetError("received unexpected ProtoUnsup message"); +} + +void ServerClient3::ClientHello(std::string_view self_id, + unsigned int proto_rev) { + DEBUG4("ClientHello({}, '{}', {:04x})", m_id, self_id, proto_rev); + if (m_state != kStateInitial) { + m_decoder.SetError("received unexpected ClientHello message"); + return; + } + if (proto_rev != 0x0300) { + net3::WireEncodeProtoUnsup(m_wire.Send().stream(), 0x0300); + Flush(); + m_decoder.SetError( + fmt::format("unsupported protocol version {:04x}", proto_rev)); + return; + } + // create a unique name (just ignore provided client id) + m_name = fmt::format("NT3@{}", m_connInfo); + m_connected(m_name, 0x0300); + m_connected = nullptr; // no longer required + + // create client meta topics + m_metaPub = m_server.CreateMetaTopic(fmt::format("$clientpub${}", m_name)); + m_metaSub = m_server.CreateMetaTopic(fmt::format("$clientsub${}", m_name)); + + // subscribe and send initial assignments + auto& sub = m_subscribers[0]; + std::string prefix; + PubSubOptions options; + options.prefixMatch = true; + sub = std::make_unique( + this, std::span{{prefix}}, 0, options); + m_periodMs = net::UpdatePeriodCalc(m_periodMs, sub->periodMs); + m_setPeriodic(m_periodMs); + + { + auto out = m_wire.Send(); + net3::WireEncodeServerHello(out.stream(), 0, "server"); + for (auto&& topic : m_server.m_topics) { + if (topic && !topic->special && topic->IsPublished() && + topic->lastValue) { + DEBUG4("client {}: initial announce of '{}' (id {})", m_id, topic->name, + topic->id); + topic->clients[this].AddSubscriber(sub.get()); + m_server.UpdateMetaTopicSub(topic.get()); + + TopicData3* topic3 = GetTopic3(topic.get()); + ++topic3->seqNum; + net3::WireEncodeEntryAssign(out.stream(), topic->name, topic->id, + topic3->seqNum.value(), topic->lastValue, + topic3->flags); + topic3->sentAssign = true; + } + } + net3::WireEncodeServerHelloDone(out.stream()); + } + Flush(); + m_state = kStateServerHelloComplete; + + // update meta topics + UpdateMetaClientPub(); + UpdateMetaClientSub(); +} + +void ServerClient3::ServerHello(unsigned int flags, std::string_view self_id) { + DEBUG4("ServerHello({}, {}, {})", m_id, flags, self_id); + m_decoder.SetError("received unexpected ServerHello message"); +} + +void ServerClient3::EntryAssign(std::string_view name, unsigned int id, + unsigned int seq_num, const Value& value, + unsigned int flags) { + DEBUG4("EntryAssign({}, {}, {}, {}, {})", m_id, id, seq_num, + static_cast(value.type()), flags); + if (id != 0xffff) { + DEBUG3("ignored EntryAssign from {} with non-0xffff id {}", m_id, id); + return; + } + + // convert from NT3 info + auto typeStr = TypeToString(value.type()); + wpi::json properties = wpi::json::object(); + properties["retained"] = true; // treat all NT3 published topics as retained + properties["cached"] = true; // treat all NT3 published topics as cached + if ((flags & NT_PERSISTENT) != 0) { + properties["persistent"] = true; + } + + // create topic + auto topic = m_server.CreateTopic(this, name, typeStr, properties); + TopicData3* topic3 = GetTopic3(topic); + if (topic3->published || topic3->sentAssign) { + WARN("ignoring client {} duplicate publish of '{}'", m_id, name); + return; + } + ++topic3->seqNum; + topic3->published = true; + topic3->pubuid = m_nextPubUid++; + topic3->sentAssign = true; + + // create publisher + auto [publisherIt, isNew] = m_publishers.try_emplace( + topic3->pubuid, + std::make_unique(this, topic, topic3->pubuid)); + if (!isNew) { + return; // shouldn't happen, but just in case... + } + + // add publisher to topic + topic->AddPublisher(this, publisherIt->getSecond().get()); + + // update meta data + m_server.UpdateMetaTopicPub(topic); + UpdateMetaClientPub(); + + // acts as an announce + data update + SendAnnounce(topic, topic3->pubuid); + m_server.SetValue(this, topic, value); + + // respond with assign message with assigned topic ID + if (m_local && m_state == kStateRunning) { + net3::WireEncodeEntryAssign(m_wire.Send().stream(), topic->name, topic->id, + topic3->seqNum.value(), value, topic3->flags); + } else { + m_outgoing.emplace_back(net3::Message3::EntryAssign( + topic->name, topic->id, topic3->seqNum.value(), value, topic3->flags)); + } +} + +void ServerClient3::EntryUpdate(unsigned int id, unsigned int seq_num, + const Value& value) { + DEBUG4("EntryUpdate({}, {}, {}, {})", m_id, id, seq_num, + static_cast(value.type())); + if (m_state != kStateRunning) { + m_decoder.SetError("received unexpected EntryUpdate message"); + return; + } + + if (id >= m_server.m_topics.size()) { + DEBUG3("ignored EntryUpdate from {} on non-existent topic {}", m_id, id); + return; + } + ServerTopic* topic = m_server.m_topics[id].get(); + if (!topic || !topic->IsPublished()) { + DEBUG3("ignored EntryUpdate from {} on non-existent topic {}", m_id, id); + return; + } + + TopicData3* topic3 = GetTopic3(topic); + if (!topic3->published) { + topic3->published = true; + topic3->pubuid = m_nextPubUid++; + + // create publisher + auto [publisherIt, isNew] = m_publishers.try_emplace( + topic3->pubuid, + std::make_unique(this, topic, topic3->pubuid)); + if (isNew) { + // add publisher to topic + topic->AddPublisher(this, publisherIt->getSecond().get()); + + // update meta data + m_server.UpdateMetaTopicPub(topic); + UpdateMetaClientPub(); + } + } + topic3->seqNum = net3::SequenceNumber{seq_num}; + + m_server.SetValue(this, topic, value); +} + +void ServerClient3::FlagsUpdate(unsigned int id, unsigned int flags) { + DEBUG4("FlagsUpdate({}, {}, {})", m_id, id, flags); + if (m_state != kStateRunning) { + m_decoder.SetError("received unexpected FlagsUpdate message"); + return; + } + if (id >= m_server.m_topics.size()) { + DEBUG3("ignored FlagsUpdate from {} on non-existent topic {}", m_id, id); + return; + } + ServerTopic* topic = m_server.m_topics[id].get(); + if (!topic || !topic->IsPublished()) { + DEBUG3("ignored FlagsUpdate from {} on non-existent topic {}", m_id, id); + return; + } + if (topic->special) { + DEBUG3("ignored FlagsUpdate from {} on special topic {}", m_id, id); + return; + } + m_server.SetFlags(this, topic, flags); +} + +void ServerClient3::EntryDelete(unsigned int id) { + DEBUG4("EntryDelete({}, {})", m_id, id); + if (m_state != kStateRunning) { + m_decoder.SetError("received unexpected EntryDelete message"); + return; + } + if (id >= m_server.m_topics.size()) { + DEBUG3("ignored EntryDelete from {} on non-existent topic {}", m_id, id); + return; + } + ServerTopic* topic = m_server.m_topics[id].get(); + if (!topic || !topic->IsPublished()) { + DEBUG3("ignored EntryDelete from {} on non-existent topic {}", m_id, id); + return; + } + if (topic->special) { + DEBUG3("ignored EntryDelete from {} on special topic {}", m_id, id); + return; + } + + auto topic3it = m_topics3.find(topic); + if (topic3it != m_topics3.end()) { + // make sure we send assign the next time + topic3it->second.sentAssign = false; + + // unpublish from this client (if it was previously published) + if (topic3it->second.published) { + topic3it->second.published = false; + auto publisherIt = m_publishers.find(topic3it->second.pubuid); + if (publisherIt != m_publishers.end()) { + // remove publisher from topic + topic->RemovePublisher(this, publisherIt->second.get()); + + // remove publisher from client + m_publishers.erase(publisherIt); + + // update meta data + m_server.UpdateMetaTopicPub(topic); + UpdateMetaClientPub(); + } + } + } + + // set retained=false + m_server.SetProperties(this, topic, {{"retained", false}}); +} diff --git a/ntcore/src/main/native/cpp/server/ServerClient3.h b/ntcore/src/main/native/cpp/server/ServerClient3.h new file mode 100644 index 00000000000..6444e24f895 --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerClient3.h @@ -0,0 +1,95 @@ +// Copyright(c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include "ServerClient.h" + +#include "net/ClientMessageQueue.h" +#include "net3/Message3.h" +#include "net3/SequenceNumber.h" +#include "net3/WireConnection3.h" +#include "net3/WireDecoder3.h" +#include "server/Functions.h" + +namespace nt::server { + +class ServerClient3 final : public ServerClient, private net3::MessageHandler3 { + public: + ServerClient3(std::string_view connInfo, bool local, + net3::WireConnection3& wire, Connected3Func connected, + SetPeriodicFunc setPeriodic, ServerImpl& server, int id, + wpi::Logger& logger) + : ServerClient{"", connInfo, local, setPeriodic, server, id, logger}, + m_connected{std::move(connected)}, + m_wire{wire}, + m_decoder{*this}, + m_incoming{logger} {} + + bool ProcessIncomingText(std::string_view data) final { return false; } + bool ProcessIncomingBinary(std::span data) final; + + bool ProcessIncomingMessages(size_t max) final { return false; } + + void SendValue(ServerTopic* topic, const Value& value, + net::ValueSendMode mode) final; + void SendAnnounce(ServerTopic* topic, std::optional pubuid) final; + void SendUnannounce(ServerTopic* topic) final; + void SendPropertiesUpdate(ServerTopic* topic, const wpi::json& update, + bool ack) final; + void SendOutgoing(uint64_t curTimeMs, bool flush) final; + + void Flush() final { m_wire.Flush(); } + + private: + // MessageHandler3 interface + void KeepAlive() final; + void ServerHelloDone() final; + void ClientHelloDone() final; + void ClearEntries() final; + void ProtoUnsup(unsigned int proto_rev) final; + void ClientHello(std::string_view self_id, unsigned int proto_rev) final; + void ServerHello(unsigned int flags, std::string_view self_id) final; + void EntryAssign(std::string_view name, unsigned int id, unsigned int seq_num, + const Value& value, unsigned int flags) final; + void EntryUpdate(unsigned int id, unsigned int seq_num, + const Value& value) final; + void FlagsUpdate(unsigned int id, unsigned int flags) final; + void EntryDelete(unsigned int id) final; + void ExecuteRpc(unsigned int id, unsigned int uid, + std::span params) final {} + void RpcResponse(unsigned int id, unsigned int uid, + std::span result) final {} + + Connected3Func m_connected; + net3::WireConnection3& m_wire; + + enum State { kStateInitial, kStateServerHelloComplete, kStateRunning }; + State m_state{kStateInitial}; + net3::WireDecoder3 m_decoder; + + net::NetworkIncomingClientQueue m_incoming; + std::vector m_outgoing; + wpi::DenseMap m_outgoingValueMap; + int64_t m_nextPubUid{1}; + uint64_t m_lastSendMs{0}; + + struct TopicData3 { + explicit TopicData3(ServerTopic* topic) { UpdateFlags(topic); } + + unsigned int flags{0}; + net3::SequenceNumber seqNum; + bool sentAssign{false}; + bool published{false}; + int64_t pubuid{0}; + + bool UpdateFlags(ServerTopic* topic); + }; + wpi::DenseMap m_topics3; + TopicData3* GetTopic3(ServerTopic* topic) { + return &m_topics3.try_emplace(topic, topic).first->second; + } +}; + +} // namespace nt::server diff --git a/ntcore/src/main/native/cpp/server/ServerClient4.cpp b/ntcore/src/main/native/cpp/server/ServerClient4.cpp new file mode 100644 index 00000000000..9c8ca2cf53d --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerClient4.cpp @@ -0,0 +1,160 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#include "ServerClient4.h" + +#include + +#include "Log.h" +#include "net/WireDecoder.h" +#include "server/ServerImpl.h" +#include "server/ServerTopic.h" + +using namespace nt::server; + +bool ServerClient4::ProcessIncomingText(std::string_view data) { + constexpr int kMaxImmProcessing = 10; + bool queueWasEmpty = m_incoming.empty(); + // can't directly process, because we don't know how big it is + WireDecodeText(data, m_incoming, m_logger); + if (queueWasEmpty && + DoProcessIncomingMessages(m_incoming, kMaxImmProcessing)) { + m_wire.StopRead(); + return true; + } + return false; +} + +bool ServerClient4::ProcessIncomingBinary(std::span data) { + constexpr int kMaxImmProcessing = 10; + // if we've already queued, keep queuing + int count = m_incoming.empty() ? 0 : kMaxImmProcessing; + for (;;) { + if (data.empty()) { + break; + } + + // decode message + int pubuid; + Value value; + std::string error; + if (!net::WireDecodeBinary(&data, &pubuid, &value, &error, 0)) { + m_wire.Disconnect(fmt::format("binary decode error: {}", error)); + break; + } + + // respond to RTT ping + if (pubuid == -1) { + auto now = wpi::Now(); + DEBUG4("RTT ping from {}, responding with time={}", m_id, now); + m_wire.SendBinary( + [&](auto& os) { net::WireEncodeBinary(os, -1, now, value); }); + continue; + } + + // handle value set + if (++count < kMaxImmProcessing) { + ClientSetValue(pubuid, value); + } else { + m_incoming.ClientSetValue(pubuid, value); + } + } + if (count >= kMaxImmProcessing) { + m_wire.StopRead(); + return true; + } + return false; +} + +void ServerClient4::SendValue(ServerTopic* topic, const Value& value, + net::ValueSendMode mode) { + m_outgoing.SendValue(topic->id, value, mode); +} + +void ServerClient4::SendAnnounce(ServerTopic* topic, std::optional pubuid) { + auto& sent = m_announceSent[topic]; + if (sent) { + return; + } + sent = true; + + if (m_local) { + int unsent = m_wire.WriteText([&](auto& os) { + net::WireEncodeAnnounce(os, topic->name, topic->id, topic->typeStr, + topic->properties, pubuid); + }); + if (unsent < 0) { + return; // error + } + if (unsent == 0 && m_wire.Flush() == 0) { + return; + } + } + m_outgoing.SendMessage( + topic->id, net::AnnounceMsg{topic->name, static_cast(topic->id), + topic->typeStr, pubuid, topic->properties}); + m_server.m_controlReady = true; +} + +void ServerClient4::SendUnannounce(ServerTopic* topic) { + auto& sent = m_announceSent[topic]; + if (!sent) { + return; + } + sent = false; + + if (m_local) { + int unsent = m_wire.WriteText([&](auto& os) { + net::WireEncodeUnannounce(os, topic->name, topic->id); + }); + if (unsent < 0) { + return; // error + } + if (unsent == 0 && m_wire.Flush() == 0) { + return; + } + } + m_outgoing.SendMessage( + topic->id, net::UnannounceMsg{topic->name, static_cast(topic->id)}); + m_outgoing.EraseId(topic->id); + m_server.m_controlReady = true; +} + +void ServerClient4::SendPropertiesUpdate(ServerTopic* topic, + const wpi::json& update, bool ack) { + if (!m_announceSent.lookup(topic)) { + return; + } + + if (m_local) { + int unsent = m_wire.WriteText([&](auto& os) { + net::WireEncodePropertiesUpdate(os, topic->name, update, ack); + }); + if (unsent < 0) { + return; // error + } + if (unsent == 0 && m_wire.Flush() == 0) { + return; + } + } + m_outgoing.SendMessage(topic->id, + net::PropertiesUpdateMsg{topic->name, update, ack}); + m_server.m_controlReady = true; +} + +void ServerClient4::SendOutgoing(uint64_t curTimeMs, bool flush) { + if (m_wire.GetVersion() >= 0x0401) { + if (!m_ping.Send(curTimeMs)) { + return; + } + } + m_outgoing.SendOutgoing(curTimeMs, flush); +} + +void ServerClient4::UpdatePeriod(TopicClientData& tcd, ServerTopic* topic) { + uint32_t period = net::CalculatePeriod(tcd.subscribers, + [](auto& x) { return x->periodMs; }); + DEBUG4("updating {} period to {} ms", topic->name, period); + m_outgoing.SetPeriod(topic->id, period); +} diff --git a/ntcore/src/main/native/cpp/server/ServerClient4.h b/ntcore/src/main/native/cpp/server/ServerClient4.h new file mode 100644 index 00000000000..bd89b246190 --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerClient4.h @@ -0,0 +1,59 @@ +// Copyright(c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include + +#include "net/NetworkPing.h" +#include "net/WireConnection.h" +#include "server/Functions.h" +#include "server/ServerClient4Base.h" + +namespace nt::server { + +class ServerClient4 final : public ServerClient4Base { + public: + ServerClient4(std::string_view name, std::string_view connInfo, bool local, + net::WireConnection& wire, SetPeriodicFunc setPeriodic, + ServerImpl& server, int id, wpi::Logger& logger) + : ServerClient4Base{name, connInfo, local, setPeriodic, server, id, logger}, + m_wire{wire}, + m_ping{wire}, + m_incoming{logger}, + m_outgoing{wire, local} {} + + bool ProcessIncomingText(std::string_view data) final; + bool ProcessIncomingBinary(std::span data) final; + + bool ProcessIncomingMessages(size_t max) final { + if (!DoProcessIncomingMessages(m_incoming, max)) { + m_wire.StartRead(); + return false; + } + return true; + } + + void SendValue(ServerTopic* topic, const Value& value, + net::ValueSendMode mode) final; + void SendAnnounce(ServerTopic* topic, std::optional pubuid) final; + void SendUnannounce(ServerTopic* topic) final; + void SendPropertiesUpdate(ServerTopic* topic, const wpi::json& update, + bool ack) final; + void SendOutgoing(uint64_t curTimeMs, bool flush) final; + + void Flush() final {} + + void UpdatePeriod(TopicClientData& tcd, ServerTopic* topic) final; + + public: + net::WireConnection& m_wire; + + private: + net::NetworkPing m_ping; + net::NetworkIncomingClientQueue m_incoming; + net::NetworkOutgoingQueue m_outgoing; +}; + +} // namespace nt::server diff --git a/ntcore/src/main/native/cpp/server/ServerClient4Base.cpp b/ntcore/src/main/native/cpp/server/ServerClient4Base.cpp new file mode 100644 index 00000000000..cf14aa57fed --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerClient4Base.cpp @@ -0,0 +1,247 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#include "ServerClient4Base.h" + +#include + +#include +#include + +#include "Log.h" +#include "server/Constants.h" +#include "server/ServerImpl.h" +#include "server/ServerPublisher.h" + +using namespace nt::server; + +void ServerClient4Base::ClientPublish(int pubuid, std::string_view name, + std::string_view typeStr, + const wpi::json& properties, + const PubSubOptionsImpl& options) { + DEBUG3("ClientPublish({}, {}, {}, {})", m_id, name, pubuid, typeStr); + auto topic = m_server.CreateTopic(this, name, typeStr, properties); + + // create publisher + auto [publisherIt, isNew] = m_publishers.try_emplace( + pubuid, std::make_unique(this, topic, pubuid)); + if (!isNew) { + WARN("client {} duplicate publish of pubuid {}", m_id, pubuid); + } else { + // add publisher to topic + topic->AddPublisher(this, publisherIt->getSecond().get()); + + // update meta data + m_server.UpdateMetaTopicPub(topic); + } + + // respond with announce with pubuid to client + DEBUG4("client {}: announce {} pubuid {}", m_id, topic->name, pubuid); + SendAnnounce(topic, pubuid); +} + +void ServerClient4Base::ClientUnpublish(int pubuid) { + DEBUG3("ClientUnpublish({}, {})", m_id, pubuid); + auto publisherIt = m_publishers.find(pubuid); + if (publisherIt == m_publishers.end()) { + return; // nothing to do + } + auto publisher = publisherIt->getSecond().get(); + auto topic = publisher->topic; + + // remove publisher from topic + topic->RemovePublisher(this, publisher); + + // remove publisher from client + m_publishers.erase(publisherIt); + + // update meta data + m_server.UpdateMetaTopicPub(topic); + + // delete topic if no longer published + if (!topic->IsPublished()) { + m_server.DeleteTopic(topic); + } +} + +void ServerClient4Base::ClientSetProperties(std::string_view name, + const wpi::json& update) { + DEBUG4("ClientSetProperties({}, {}, {})", m_id, name, update.dump()); + auto topicIt = m_server.m_nameTopics.find(name); + if (topicIt == m_server.m_nameTopics.end() || + !topicIt->second->IsPublished()) { + WARN( + "server ignoring SetProperties({}) from client {} on unpublished topic " + "'{}'; publish or set a value first", + update.dump(), m_id, name); + return; // nothing to do + } + auto topic = topicIt->second; + if (topic->special) { + WARN("server ignoring SetProperties({}) from client {} on meta topic '{}'", + update.dump(), m_id, name); + return; // nothing to do + } + m_server.SetProperties(nullptr, topic, update); +} + +void ServerClient4Base::ClientSubscribe(int subuid, + std::span topicNames, + const PubSubOptionsImpl& options) { + DEBUG4("ClientSubscribe({}, ({}), {})", m_id, fmt::join(topicNames, ","), + subuid); + auto& sub = m_subscribers[subuid]; + bool replace = false; + if (sub) { + // replace subscription + sub->Update(topicNames, options); + replace = true; + } else { + // create + sub = std::make_unique(this, topicNames, subuid, options); + } + + // limit subscriber min period + if (sub->periodMs < kMinPeriodMs) { + sub->periodMs = kMinPeriodMs; + } + + // update periodic sender (if not local) + if (!m_local) { + m_periodMs = net::UpdatePeriodCalc(m_periodMs, sub->periodMs); + m_setPeriodic(m_periodMs); + } + + // see if this immediately subscribes to any topics + // for transmit efficiency, we want to batch announcements and values, so + // send announcements in first loop and remember what we want to send in + // second loop. + std::vector dataToSend; + dataToSend.reserve(m_server.m_topics.size()); + for (auto&& topic : m_server.m_topics) { + auto tcdIt = topic->clients.find(this); + bool removed = tcdIt != topic->clients.end() && replace && + tcdIt->second.subscribers.erase(sub.get()); + + // is client already subscribed? + bool wasSubscribed = + tcdIt != topic->clients.end() && !tcdIt->second.subscribers.empty(); + bool wasSubscribedValue = + wasSubscribed ? tcdIt->second.sendMode != net::ValueSendMode::kDisabled + : false; + + bool added = false; + if (sub->Matches(topic->name, topic->special)) { + if (tcdIt == topic->clients.end()) { + tcdIt = topic->clients.try_emplace(this).first; + } + tcdIt->second.AddSubscriber(sub.get()); + added = true; + } + + if (added ^ removed) { + UpdatePeriod(tcdIt->second, topic.get()); + m_server.UpdateMetaTopicSub(topic.get()); + } + + // announce topic to client if not previously announced + if (added && !removed && !wasSubscribed) { + DEBUG4("client {}: announce {}", m_id, topic->name); + SendAnnounce(topic.get(), std::nullopt); + } + + // send last value + if (added && !sub->options.topicsOnly && !wasSubscribedValue && + topic->lastValue) { + dataToSend.emplace_back(topic.get()); + } + } + + for (auto topic : dataToSend) { + DEBUG4("send last value for {} to client {}", topic->name, m_id); + SendValue(topic, topic->lastValue, net::ValueSendMode::kAll); + } +} + +void ServerClient4Base::ClientUnsubscribe(int subuid) { + DEBUG3("ClientUnsubscribe({}, {})", m_id, subuid); + auto subIt = m_subscribers.find(subuid); + if (subIt == m_subscribers.end() || !subIt->getSecond()) { + return; // nothing to do + } + auto sub = subIt->getSecond().get(); + + // remove from topics + for (auto&& topic : m_server.m_topics) { + auto tcdIt = topic->clients.find(this); + if (tcdIt != topic->clients.end()) { + if (tcdIt->second.subscribers.erase(sub)) { + UpdatePeriod(tcdIt->second, topic.get()); + m_server.UpdateMetaTopicSub(topic.get()); + } + } + } + + // delete it from client (future value sets will be ignored) + m_subscribers.erase(subIt); + + // loop over all subscribers to update period + if (!m_local) { + m_periodMs = net::CalculatePeriod( + m_subscribers, [](auto& x) { return x.getSecond()->periodMs; }); + m_setPeriodic(m_periodMs); + } +} + +void ServerClient4Base::ClientSetValue(int pubuid, const Value& value) { + DEBUG4("ClientSetValue({}, {})", m_id, pubuid); + auto publisherIt = m_publishers.find(pubuid); + if (publisherIt == m_publishers.end()) { + WARN("unrecognized client {} pubuid {}, ignoring set", m_id, pubuid); + return; // ignore unrecognized pubuids + } + auto topic = publisherIt->getSecond().get()->topic; + m_server.SetValue(this, topic, value); +} + +bool ServerClient4Base::DoProcessIncomingMessages( + net::ClientMessageQueue& queue, size_t max) { + DEBUG4("ProcessIncomingMessage()"); + max = (std::min)(m_msgsBuf.size(), max); + std::span msgs = + queue.ReadQueue(wpi::take_front(std::span{m_msgsBuf}, max)); + + // just map as a normal client into client=0 calls + bool updatepub = false; + bool updatesub = false; + for (const auto& elem : msgs) { // NOLINT + // common case is value, so check that first + if (auto msg = std::get_if(&elem.contents)) { + ClientSetValue(msg->pubuid, msg->value); + } else if (auto msg = std::get_if(&elem.contents)) { + ClientPublish(msg->pubuid, msg->name, msg->typeStr, msg->properties, + msg->options); + updatepub = true; + } else if (auto msg = std::get_if(&elem.contents)) { + ClientUnpublish(msg->pubuid); + updatepub = true; + } else if (auto msg = std::get_if(&elem.contents)) { + ClientSetProperties(msg->name, msg->update); + } else if (auto msg = std::get_if(&elem.contents)) { + ClientSubscribe(msg->subuid, msg->topicNames, msg->options); + updatesub = true; + } else if (auto msg = std::get_if(&elem.contents)) { + ClientUnsubscribe(msg->subuid); + updatesub = true; + } + } + if (updatepub) { + UpdateMetaClientPub(); + } + if (updatesub) { + UpdateMetaClientSub(); + } + + return msgs.size() == max; // don't know for sure, but there might be more +} diff --git a/ntcore/src/main/native/cpp/server/ServerClient4Base.h b/ntcore/src/main/native/cpp/server/ServerClient4Base.h new file mode 100644 index 00000000000..cda8ab8100a --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerClient4Base.h @@ -0,0 +1,47 @@ +// Copyright(c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include + +#include + +#include "net/ClientMessageQueue.h" +#include "server/Functions.h" +#include "server/ServerClient.h" + +namespace nt::server { + +class ServerClient4Base : public ServerClient, + protected net::ClientMessageHandler { + public: + ServerClient4Base(std::string_view name, std::string_view connInfo, + bool local, SetPeriodicFunc setPeriodic, ServerImpl& server, + int id, wpi::Logger& logger) + : ServerClient{name, connInfo, local, setPeriodic, server, id, logger} {} + + protected: + // ClientMessageHandler interface + void ClientPublish(int pubuid, std::string_view name, + std::string_view typeStr, const wpi::json& properties, + const PubSubOptionsImpl& options) final; + void ClientUnpublish(int pubuid) final; + void ClientSetProperties(std::string_view name, + const wpi::json& update) final; + void ClientSubscribe(int subuid, std::span topicNames, + const PubSubOptionsImpl& options) final; + void ClientUnsubscribe(int subuid) final; + + void ClientSetValue(int pubuid, const Value& value) final; + + bool DoProcessIncomingMessages(net::ClientMessageQueue& queue, size_t max); + + wpi::DenseMap m_announceSent; + + private: + std::array m_msgsBuf; +}; + +} // namespace nt::server diff --git a/ntcore/src/main/native/cpp/server/ServerClientLocal.cpp b/ntcore/src/main/native/cpp/server/ServerClientLocal.cpp new file mode 100644 index 00000000000..557a30f11b3 --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerClientLocal.cpp @@ -0,0 +1,52 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#include "ServerClientLocal.h" + +#include "server/ServerImpl.h" + +using namespace nt::server; + +void ServerClientLocal::SendValue(ServerTopic* topic, const Value& value, + net::ValueSendMode mode) { + if (m_server.m_local) { + m_server.m_local->ServerSetValue(topic->localTopic, value); + } +} + +void ServerClientLocal::SendAnnounce(ServerTopic* topic, + std::optional pubuid) { + if (m_server.m_local) { + auto& sent = m_announceSent[topic]; + if (sent) { + return; + } + sent = true; + + topic->localTopic = m_server.m_local->ServerAnnounce( + topic->name, 0, topic->typeStr, topic->properties, pubuid); + } +} + +void ServerClientLocal::SendUnannounce(ServerTopic* topic) { + if (m_server.m_local) { + auto& sent = m_announceSent[topic]; + if (!sent) { + return; + } + sent = false; + m_server.m_local->ServerUnannounce(topic->name, topic->localTopic); + } +} + +void ServerClientLocal::SendPropertiesUpdate(ServerTopic* topic, + const wpi::json& update, + bool ack) { + if (m_server.m_local) { + if (!m_announceSent.lookup(topic)) { + return; + } + m_server.m_local->ServerPropertiesUpdate(topic->name, update, ack); + } +} diff --git a/ntcore/src/main/native/cpp/server/ServerClientLocal.h b/ntcore/src/main/native/cpp/server/ServerClientLocal.h new file mode 100644 index 00000000000..74a211b761d --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerClientLocal.h @@ -0,0 +1,46 @@ +// Copyright(c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include +#include + +#include "server/ServerClient4Base.h" + +namespace nt::server { + +class ServerClientLocal final : public ServerClient4Base { + public: + ServerClientLocal(ServerImpl& server, int id, wpi::Logger& logger) + : ServerClient4Base{"", "", true, [](uint32_t) {}, server, id, logger} {} + + bool ProcessIncomingText(std::string_view data) final { return false; } + bool ProcessIncomingBinary(std::span data) final { + return false; + } + + bool ProcessIncomingMessages(size_t max) final { + if (!m_queue) { + return false; + } + return DoProcessIncomingMessages(*m_queue, max); + } + + void SendValue(ServerTopic* topic, const Value& value, + net::ValueSendMode mode) final; + void SendAnnounce(ServerTopic* topic, std::optional pubuid) final; + void SendUnannounce(ServerTopic* topic) final; + void SendPropertiesUpdate(ServerTopic* topic, const wpi::json& update, + bool ack) final; + void SendOutgoing(uint64_t curTimeMs, bool flush) final {} + void Flush() final {} + + void SetQueue(net::ClientMessageQueue* queue) { m_queue = queue; } + + private: + net::ClientMessageQueue* m_queue = nullptr; +}; + +} // namespace nt::server diff --git a/ntcore/src/main/native/cpp/server/ServerImpl.cpp b/ntcore/src/main/native/cpp/server/ServerImpl.cpp index 4d7c9fb5bca..63f48d5752b 100644 --- a/ntcore/src/main/native/cpp/server/ServerImpl.cpp +++ b/ntcore/src/main/native/cpp/server/ServerImpl.cpp @@ -24,1194 +24,33 @@ #include #include "Log.h" -#include "Types_internal.h" -#include "net/Message.h" #include "net/WireEncoder.h" #include "net3/WireConnection3.h" #include "net3/WireEncoder3.h" #include "networktables/NetworkTableValue.h" #include "ntcore_c.h" +#include "server/MessagePackWriter.h" +#include "server/ServerClient3.h" +#include "server/ServerClient4.h" +#include "server/ServerClientLocal.h" +#include "server/ServerPublisher.h" +#include "server/ServerTopic.h" using namespace nt; using namespace nt::server; using namespace mpack; -// maximum amount of time the wire can be not ready to send another -// transmission before we close the connection -static constexpr uint32_t kWireMaxNotReadyUs = 1000000; - -namespace { -struct Writer : public mpack_writer_t { - Writer() { - mpack_writer_init(this, buf, sizeof(buf)); - mpack_writer_set_context(this, &os); - mpack_writer_set_flush( - this, [](mpack_writer_t* w, const char* buffer, size_t count) { - static_cast(w->context)->write(buffer, count); - }); - } - - std::vector bytes; - wpi::raw_uvector_ostream os{bytes}; - char buf[128]; -}; -} // namespace - -static void WriteOptions(mpack_writer_t& w, const PubSubOptionsImpl& options) { - int size = - (options.sendAll ? 1 : 0) + (options.topicsOnly ? 1 : 0) + - (options.periodicMs != PubSubOptionsImpl::kDefaultPeriodicMs ? 1 : 0) + - (options.prefixMatch ? 1 : 0); - mpack_start_map(&w, size); - if (options.sendAll) { - mpack_write_str(&w, "all"); - mpack_write_bool(&w, true); - } - if (options.topicsOnly) { - mpack_write_str(&w, "topicsonly"); - mpack_write_bool(&w, true); - } - if (options.periodicMs != PubSubOptionsImpl::kDefaultPeriodicMs) { - mpack_write_str(&w, "periodic"); - mpack_write_float(&w, options.periodicMs / 1000.0); - } - if (options.prefixMatch) { - mpack_write_str(&w, "prefix"); - mpack_write_bool(&w, true); - } - mpack_finish_map(&w); -} - -void ServerImpl::PublisherData::UpdateMeta() { - { - Writer w; - mpack_start_map(&w, 2); - mpack_write_str(&w, "uid"); - mpack_write_int(&w, pubuid); - mpack_write_str(&w, "topic"); - mpack_write_str(&w, topic->name); - mpack_finish_map(&w); - if (mpack_writer_destroy(&w) == mpack_ok) { - metaClient = std::move(w.bytes); - } - } - { - Writer w; - mpack_start_map(&w, 2); - mpack_write_str(&w, "client"); - if (client) { - mpack_write_str(&w, client->GetName()); - } else { - mpack_write_str(&w, ""); - } - mpack_write_str(&w, "pubuid"); - mpack_write_int(&w, pubuid); - mpack_finish_map(&w); - if (mpack_writer_destroy(&w) == mpack_ok) { - metaTopic = std::move(w.bytes); - } - } -} - -void ServerImpl::SubscriberData::UpdateMeta() { - { - Writer w; - mpack_start_map(&w, 3); - mpack_write_str(&w, "uid"); - mpack_write_int(&w, subuid); - mpack_write_str(&w, "topics"); - mpack_start_array(&w, topicNames.size()); - for (auto&& name : topicNames) { - mpack_write_str(&w, name); - } - mpack_finish_array(&w); - mpack_write_str(&w, "options"); - WriteOptions(w, options); - mpack_finish_map(&w); - if (mpack_writer_destroy(&w) == mpack_ok) { - metaClient = std::move(w.bytes); - } - } - { - Writer w; - mpack_start_map(&w, 3); - mpack_write_str(&w, "client"); - if (client) { - mpack_write_str(&w, client->GetName()); - } else { - mpack_write_str(&w, ""); - } - mpack_write_str(&w, "subuid"); - mpack_write_int(&w, subuid); - mpack_write_str(&w, "options"); - WriteOptions(w, options); - mpack_finish_map(&w); - if (mpack_writer_destroy(&w) == mpack_ok) { - metaTopic = std::move(w.bytes); - } - } -} - -void ServerImpl::ClientData::UpdateMetaClientPub() { - if (!m_metaPub) { - return; - } - Writer w; - mpack_start_array(&w, m_publishers.size()); - for (auto&& pub : m_publishers) { - mpack_write_object_bytes( - &w, reinterpret_cast(pub.second->metaClient.data()), - pub.second->metaClient.size()); - } - mpack_finish_array(&w); - if (mpack_writer_destroy(&w) == mpack_ok) { - m_server.SetValue(nullptr, m_metaPub, Value::MakeRaw(std::move(w.bytes))); - } -} - -void ServerImpl::ClientData::UpdateMetaClientSub() { - if (!m_metaSub) { - return; - } - Writer w; - mpack_start_array(&w, m_subscribers.size()); - for (auto&& sub : m_subscribers) { - mpack_write_object_bytes( - &w, reinterpret_cast(sub.second->metaClient.data()), - sub.second->metaClient.size()); - } - mpack_finish_array(&w); - if (mpack_writer_destroy(&w) == mpack_ok) { - m_server.SetValue(nullptr, m_metaSub, Value::MakeRaw(std::move(w.bytes))); - } -} - -std::span ServerImpl::ClientData::GetSubscribers( - std::string_view name, bool special, - wpi::SmallVectorImpl& buf) { - buf.resize(0); - for (auto&& subPair : m_subscribers) { - SubscriberData* subscriber = subPair.getSecond().get(); - if (subscriber->Matches(name, special)) { - buf.emplace_back(subscriber); - } - } - return {buf.data(), buf.size()}; -} - -void ServerImpl::ClientData4Base::ClientPublish( - int pubuid, std::string_view name, std::string_view typeStr, - const wpi::json& properties, const PubSubOptionsImpl& options) { - DEBUG3("ClientPublish({}, {}, {}, {})", m_id, name, pubuid, typeStr); - auto topic = m_server.CreateTopic(this, name, typeStr, properties); - - // create publisher - auto [publisherIt, isNew] = m_publishers.try_emplace( - pubuid, std::make_unique(this, topic, pubuid)); - if (!isNew) { - WARN("client {} duplicate publish of pubuid {}", m_id, pubuid); - } else { - // add publisher to topic - topic->AddPublisher(this, publisherIt->getSecond().get()); - - // update meta data - m_server.UpdateMetaTopicPub(topic); - } - - // respond with announce with pubuid to client - DEBUG4("client {}: announce {} pubuid {}", m_id, topic->name, pubuid); - SendAnnounce(topic, pubuid); -} - -void ServerImpl::ClientData4Base::ClientUnpublish(int pubuid) { - DEBUG3("ClientUnpublish({}, {})", m_id, pubuid); - auto publisherIt = m_publishers.find(pubuid); - if (publisherIt == m_publishers.end()) { - return; // nothing to do - } - auto publisher = publisherIt->getSecond().get(); - auto topic = publisher->topic; - - // remove publisher from topic - topic->RemovePublisher(this, publisher); - - // remove publisher from client - m_publishers.erase(publisherIt); - - // update meta data - m_server.UpdateMetaTopicPub(topic); - - // delete topic if no longer published - if (!topic->IsPublished()) { - m_server.DeleteTopic(topic); - } -} - -void ServerImpl::ClientData4Base::ClientSetProperties(std::string_view name, - const wpi::json& update) { - DEBUG4("ClientSetProperties({}, {}, {})", m_id, name, update.dump()); - auto topicIt = m_server.m_nameTopics.find(name); - if (topicIt == m_server.m_nameTopics.end() || - !topicIt->second->IsPublished()) { - WARN( - "server ignoring SetProperties({}) from client {} on unpublished topic " - "'{}'; publish or set a value first", - update.dump(), m_id, name); - return; // nothing to do - } - auto topic = topicIt->second; - if (topic->special) { - WARN("server ignoring SetProperties({}) from client {} on meta topic '{}'", - update.dump(), m_id, name); - return; // nothing to do - } - m_server.SetProperties(nullptr, topic, update); -} - -void ServerImpl::ClientData4Base::ClientSubscribe( - int subuid, std::span topicNames, - const PubSubOptionsImpl& options) { - DEBUG4("ClientSubscribe({}, ({}), {})", m_id, fmt::join(topicNames, ","), - subuid); - auto& sub = m_subscribers[subuid]; - bool replace = false; - if (sub) { - // replace subscription - sub->Update(topicNames, options); - replace = true; - } else { - // create - sub = std::make_unique(this, topicNames, subuid, options); - } - - // limit subscriber min period - if (sub->periodMs < kMinPeriodMs) { - sub->periodMs = kMinPeriodMs; - } - - // update periodic sender (if not local) - if (!m_local) { - m_periodMs = net::UpdatePeriodCalc(m_periodMs, sub->periodMs); - m_setPeriodic(m_periodMs); - } - - // see if this immediately subscribes to any topics - // for transmit efficiency, we want to batch announcements and values, so - // send announcements in first loop and remember what we want to send in - // second loop. - std::vector dataToSend; - dataToSend.reserve(m_server.m_topics.size()); - for (auto&& topic : m_server.m_topics) { - auto tcdIt = topic->clients.find(this); - bool removed = tcdIt != topic->clients.end() && replace && - tcdIt->second.subscribers.erase(sub.get()); - - // is client already subscribed? - bool wasSubscribed = - tcdIt != topic->clients.end() && !tcdIt->second.subscribers.empty(); - bool wasSubscribedValue = - wasSubscribed ? tcdIt->second.sendMode != net::ValueSendMode::kDisabled - : false; - - bool added = false; - if (sub->Matches(topic->name, topic->special)) { - if (tcdIt == topic->clients.end()) { - tcdIt = topic->clients.try_emplace(this).first; - } - tcdIt->second.AddSubscriber(sub.get()); - added = true; - } - - if (added ^ removed) { - UpdatePeriod(tcdIt->second, topic.get()); - m_server.UpdateMetaTopicSub(topic.get()); - } - - // announce topic to client if not previously announced - if (added && !removed && !wasSubscribed) { - DEBUG4("client {}: announce {}", m_id, topic->name); - SendAnnounce(topic.get(), std::nullopt); - } - - // send last value - if (added && !sub->options.topicsOnly && !wasSubscribedValue && - topic->lastValue) { - dataToSend.emplace_back(topic.get()); - } - } - - for (auto topic : dataToSend) { - DEBUG4("send last value for {} to client {}", topic->name, m_id); - SendValue(topic, topic->lastValue, net::ValueSendMode::kAll); - } -} - -void ServerImpl::ClientData4Base::ClientUnsubscribe(int subuid) { - DEBUG3("ClientUnsubscribe({}, {})", m_id, subuid); - auto subIt = m_subscribers.find(subuid); - if (subIt == m_subscribers.end() || !subIt->getSecond()) { - return; // nothing to do - } - auto sub = subIt->getSecond().get(); - - // remove from topics - for (auto&& topic : m_server.m_topics) { - auto tcdIt = topic->clients.find(this); - if (tcdIt != topic->clients.end()) { - if (tcdIt->second.subscribers.erase(sub)) { - UpdatePeriod(tcdIt->second, topic.get()); - m_server.UpdateMetaTopicSub(topic.get()); - } - } - } - - // delete it from client (future value sets will be ignored) - m_subscribers.erase(subIt); - - // loop over all subscribers to update period - if (!m_local) { - m_periodMs = net::CalculatePeriod( - m_subscribers, [](auto& x) { return x.getSecond()->periodMs; }); - m_setPeriodic(m_periodMs); - } -} - -void ServerImpl::ClientData4Base::ClientSetValue(int pubuid, - const Value& value) { - DEBUG4("ClientSetValue({}, {})", m_id, pubuid); - auto publisherIt = m_publishers.find(pubuid); - if (publisherIt == m_publishers.end()) { - WARN("unrecognized client {} pubuid {}, ignoring set", m_id, pubuid); - return; // ignore unrecognized pubuids - } - auto topic = publisherIt->getSecond().get()->topic; - m_server.SetValue(this, topic, value); -} - -void ServerImpl::ClientDataLocal::SendValue(TopicData* topic, - const Value& value, - net::ValueSendMode mode) { - if (m_server.m_local) { - m_server.m_local->ServerSetValue(topic->localTopic, value); - } -} - -void ServerImpl::ClientDataLocal::SendAnnounce(TopicData* topic, - std::optional pubuid) { - if (m_server.m_local) { - auto& sent = m_announceSent[topic]; - if (sent) { - return; - } - sent = true; - - topic->localTopic = m_server.m_local->ServerAnnounce( - topic->name, 0, topic->typeStr, topic->properties, pubuid); - } -} - -void ServerImpl::ClientDataLocal::SendUnannounce(TopicData* topic) { - if (m_server.m_local) { - auto& sent = m_announceSent[topic]; - if (!sent) { - return; - } - sent = false; - m_server.m_local->ServerUnannounce(topic->name, topic->localTopic); - } -} - -void ServerImpl::ClientDataLocal::SendPropertiesUpdate(TopicData* topic, - const wpi::json& update, - bool ack) { - if (m_server.m_local) { - if (!m_announceSent.lookup(topic)) { - return; - } - m_server.m_local->ServerPropertiesUpdate(topic->name, update, ack); - } -} - -bool ServerImpl::ClientData4Base::DoProcessIncomingMessages( - net::ClientMessageQueue& queue, size_t max) { - DEBUG4("ProcessIncomingMessage()"); - max = (std::min)(m_msgsBuf.size(), max); - std::span msgs = - queue.ReadQueue(wpi::take_front(std::span{m_msgsBuf}, max)); - - // just map as a normal client into client=0 calls - bool updatepub = false; - bool updatesub = false; - for (const auto& elem : msgs) { // NOLINT - // common case is value, so check that first - if (auto msg = std::get_if(&elem.contents)) { - ClientSetValue(msg->pubuid, msg->value); - } else if (auto msg = std::get_if(&elem.contents)) { - ClientPublish(msg->pubuid, msg->name, msg->typeStr, msg->properties, - msg->options); - updatepub = true; - } else if (auto msg = std::get_if(&elem.contents)) { - ClientUnpublish(msg->pubuid); - updatepub = true; - } else if (auto msg = std::get_if(&elem.contents)) { - ClientSetProperties(msg->name, msg->update); - } else if (auto msg = std::get_if(&elem.contents)) { - ClientSubscribe(msg->subuid, msg->topicNames, msg->options); - updatesub = true; - } else if (auto msg = std::get_if(&elem.contents)) { - ClientUnsubscribe(msg->subuid); - updatesub = true; - } - } - if (updatepub) { - UpdateMetaClientPub(); - } - if (updatesub) { - UpdateMetaClientSub(); - } - - return msgs.size() == max; // don't know for sure, but there might be more -} - -bool ServerImpl::ClientData4::ProcessIncomingText(std::string_view data) { - constexpr int kMaxImmProcessing = 10; - bool queueWasEmpty = m_incoming.empty(); - // can't directly process, because we don't know how big it is - WireDecodeText(data, m_incoming, m_logger); - if (queueWasEmpty && - DoProcessIncomingMessages(m_incoming, kMaxImmProcessing)) { - m_wire.StopRead(); - return true; - } - return false; -} - -bool ServerImpl::ClientData4::ProcessIncomingBinary( - std::span data) { - constexpr int kMaxImmProcessing = 10; - // if we've already queued, keep queuing - int count = m_incoming.empty() ? 0 : kMaxImmProcessing; - for (;;) { - if (data.empty()) { - break; - } - - // decode message - int pubuid; - Value value; - std::string error; - if (!net::WireDecodeBinary(&data, &pubuid, &value, &error, 0)) { - m_wire.Disconnect(fmt::format("binary decode error: {}", error)); - break; - } - - // respond to RTT ping - if (pubuid == -1) { - auto now = wpi::Now(); - DEBUG4("RTT ping from {}, responding with time={}", m_id, now); - m_wire.SendBinary( - [&](auto& os) { net::WireEncodeBinary(os, -1, now, value); }); - continue; - } - - // handle value set - if (++count < kMaxImmProcessing) { - ClientSetValue(pubuid, value); - } else { - m_incoming.ClientSetValue(pubuid, value); - } - } - if (count >= kMaxImmProcessing) { - m_wire.StopRead(); - return true; - } - return false; -} - -void ServerImpl::ClientData4::SendValue(TopicData* topic, const Value& value, - net::ValueSendMode mode) { - m_outgoing.SendValue(topic->id, value, mode); -} - -void ServerImpl::ClientData4::SendAnnounce(TopicData* topic, - std::optional pubuid) { - auto& sent = m_announceSent[topic]; - if (sent) { - return; - } - sent = true; - - if (m_local) { - int unsent = m_wire.WriteText([&](auto& os) { - net::WireEncodeAnnounce(os, topic->name, topic->id, topic->typeStr, - topic->properties, pubuid); - }); - if (unsent < 0) { - return; // error - } - if (unsent == 0 && m_wire.Flush() == 0) { - return; - } - } - m_outgoing.SendMessage( - topic->id, net::AnnounceMsg{topic->name, static_cast(topic->id), - topic->typeStr, pubuid, topic->properties}); - m_server.m_controlReady = true; -} - -void ServerImpl::ClientData4::SendUnannounce(TopicData* topic) { - auto& sent = m_announceSent[topic]; - if (!sent) { - return; - } - sent = false; - - if (m_local) { - int unsent = m_wire.WriteText([&](auto& os) { - net::WireEncodeUnannounce(os, topic->name, topic->id); - }); - if (unsent < 0) { - return; // error - } - if (unsent == 0 && m_wire.Flush() == 0) { - return; - } - } - m_outgoing.SendMessage( - topic->id, net::UnannounceMsg{topic->name, static_cast(topic->id)}); - m_outgoing.EraseId(topic->id); - m_server.m_controlReady = true; -} - -void ServerImpl::ClientData4::SendPropertiesUpdate(TopicData* topic, - const wpi::json& update, - bool ack) { - if (!m_announceSent.lookup(topic)) { - return; - } - - if (m_local) { - int unsent = m_wire.WriteText([&](auto& os) { - net::WireEncodePropertiesUpdate(os, topic->name, update, ack); - }); - if (unsent < 0) { - return; // error - } - if (unsent == 0 && m_wire.Flush() == 0) { - return; - } - } - m_outgoing.SendMessage(topic->id, - net::PropertiesUpdateMsg{topic->name, update, ack}); - m_server.m_controlReady = true; -} - -void ServerImpl::ClientData4::SendOutgoing(uint64_t curTimeMs, bool flush) { - if (m_wire.GetVersion() >= 0x0401) { - if (!m_ping.Send(curTimeMs)) { - return; - } - } - m_outgoing.SendOutgoing(curTimeMs, flush); -} - -void ServerImpl::ClientData4::UpdatePeriod(TopicData::TopicClientData& tcd, - TopicData* topic) { - uint32_t period = net::CalculatePeriod(tcd.subscribers, - [](auto& x) { return x->periodMs; }); - DEBUG4("updating {} period to {} ms", topic->name, period); - m_outgoing.SetPeriod(topic->id, period); -} - -bool ServerImpl::ClientData3::TopicData3::UpdateFlags(TopicData* topic) { - unsigned int newFlags = topic->persistent ? NT_PERSISTENT : 0; - bool updated = flags != newFlags; - flags = newFlags; - return updated; -} - -bool ServerImpl::ClientData3::ProcessIncomingBinary( - std::span data) { - if (!m_decoder.Execute(&data)) { - m_wire.Disconnect(m_decoder.GetError()); - } - return false; -} - -void ServerImpl::ClientData3::SendValue(TopicData* topic, const Value& value, - net::ValueSendMode mode) { - if (m_state != kStateRunning) { - if (mode == net::ValueSendMode::kImm) { - mode = net::ValueSendMode::kAll; - } - } else if (m_local) { - mode = net::ValueSendMode::kImm; // always send local immediately - } - TopicData3* topic3 = GetTopic3(topic); - bool added = false; - - switch (mode) { - case net::ValueSendMode::kDisabled: // do nothing - break; - case net::ValueSendMode::kImm: // send immediately - ++topic3->seqNum; - if (topic3->sentAssign) { - net3::WireEncodeEntryUpdate(m_wire.Send().stream(), topic->id, - topic3->seqNum.value(), value); - } else { - net3::WireEncodeEntryAssign(m_wire.Send().stream(), topic->name, - topic->id, topic3->seqNum.value(), value, - topic3->flags); - topic3->sentAssign = true; - } - if (m_local) { - Flush(); - } - break; - case net::ValueSendMode::kNormal: { - // replace, or append if not present - wpi::DenseMap::iterator it; - std::tie(it, added) = - m_outgoingValueMap.try_emplace(topic->id, m_outgoing.size()); - if (!added && it->second < m_outgoing.size()) { - auto& msg = m_outgoing[it->second]; - if (msg.Is(net3::Message3::kEntryUpdate) || - msg.Is(net3::Message3::kEntryAssign)) { - if (msg.id() == topic->id) { // should always be true - msg.SetValue(value); - break; - } - } - } - } - // fallthrough - case net::ValueSendMode::kAll: // append to outgoing - if (!added) { - m_outgoingValueMap[topic->id] = m_outgoing.size(); - } - ++topic3->seqNum; - if (topic3->sentAssign) { - m_outgoing.emplace_back(net3::Message3::EntryUpdate( - topic->id, topic3->seqNum.value(), value)); - } else { - m_outgoing.emplace_back(net3::Message3::EntryAssign( - topic->name, topic->id, topic3->seqNum.value(), value, - topic3->flags)); - topic3->sentAssign = true; - } - break; - } -} - -void ServerImpl::ClientData3::SendAnnounce(TopicData* topic, - std::optional pubuid) { - // ignore if we've not yet built the subscriber - if (m_subscribers.empty()) { - return; - } - - // subscribe to all non-special topics - if (!topic->special) { - topic->clients[this].AddSubscriber(m_subscribers[0].get()); - m_server.UpdateMetaTopicSub(topic); - } - - // NT3 requires a value to send the assign message, so the assign message - // will get sent when the first value is sent (by SendValue). -} - -void ServerImpl::ClientData3::SendUnannounce(TopicData* topic) { - auto it = m_topics3.find(topic); - if (it == m_topics3.end()) { - return; // never sent to client - } - bool sentAssign = it->second.sentAssign; - m_topics3.erase(it); - if (!sentAssign) { - return; // never sent to client - } - - // map to NT3 delete message - if (m_local && m_state == kStateRunning) { - net3::WireEncodeEntryDelete(m_wire.Send().stream(), topic->id); - Flush(); - } else { - m_outgoing.emplace_back(net3::Message3::EntryDelete(topic->id)); - } -} - -void ServerImpl::ClientData3::SendPropertiesUpdate(TopicData* topic, - const wpi::json& update, - bool ack) { - if (ack) { - return; // we don't ack in NT3 - } - auto it = m_topics3.find(topic); - if (it == m_topics3.end()) { - return; // never sent to client - } - TopicData3* topic3 = &it->second; - // Don't send flags update unless we've already sent an assign message. - // The assign message will contain the updated flags when we eventually - // send it. - if (topic3->UpdateFlags(topic) && topic3->sentAssign) { - if (m_local && m_state == kStateRunning) { - net3::WireEncodeFlagsUpdate(m_wire.Send().stream(), topic->id, - topic3->flags); - Flush(); - } else { - m_outgoing.emplace_back( - net3::Message3::FlagsUpdate(topic->id, topic3->flags)); - } - } -} - -void ServerImpl::ClientData3::SendOutgoing(uint64_t curTimeMs, bool flush) { - if (m_outgoing.empty() || m_state != kStateRunning) { - return; // nothing to do - } - - // rate limit frequency of transmissions - if (curTimeMs < (m_lastSendMs + kMinPeriodMs)) { - return; - } - - if (!m_wire.Ready()) { - uint64_t lastFlushTime = m_wire.GetLastFlushTime(); - uint64_t now = wpi::Now(); - if (lastFlushTime != 0 && now > (lastFlushTime + kWireMaxNotReadyUs)) { - m_wire.Disconnect("transmit stalled"); - } - return; - } - - auto out = m_wire.Send(); - for (auto&& msg : m_outgoing) { - net3::WireEncode(out.stream(), msg); - } - m_wire.Flush(); - m_outgoing.resize(0); - m_outgoingValueMap.clear(); - m_lastSendMs = curTimeMs; -} - -void ServerImpl::ClientData3::KeepAlive() { - DEBUG4("KeepAlive({})", m_id); - if (m_state != kStateRunning) { - m_decoder.SetError("received unexpected KeepAlive message"); - return; - } - // ignore -} - -void ServerImpl::ClientData3::ServerHelloDone() { - DEBUG4("ServerHelloDone({})", m_id); - m_decoder.SetError("received unexpected ServerHelloDone message"); -} - -void ServerImpl::ClientData3::ClientHelloDone() { - DEBUG4("ClientHelloDone({})", m_id); - if (m_state != kStateServerHelloComplete) { - m_decoder.SetError("received unexpected ClientHelloDone message"); - return; - } - m_state = kStateRunning; -} - -void ServerImpl::ClientData3::ClearEntries() { - DEBUG4("ClearEntries({})", m_id); - if (m_state != kStateRunning) { - m_decoder.SetError("received unexpected ClearEntries message"); - return; - } - - for (auto topic3it : m_topics3) { - TopicData* topic = topic3it.first; - - // make sure we send assign the next time - topic3it.second.sentAssign = false; - - // unpublish from this client (if it was previously published) - if (topic3it.second.published) { - topic3it.second.published = false; - auto publisherIt = m_publishers.find(topic3it.second.pubuid); - if (publisherIt != m_publishers.end()) { - // remove publisher from topic - topic->RemovePublisher(this, publisherIt->second.get()); - - // remove publisher from client - m_publishers.erase(publisherIt); - - // update meta data - m_server.UpdateMetaTopicPub(topic); - UpdateMetaClientPub(); - } - } - - // set retained=false - m_server.SetProperties(this, topic, {{"retained", false}}); - } -} - -void ServerImpl::ClientData3::ProtoUnsup(unsigned int proto_rev) { - DEBUG4("ProtoUnsup({})", m_id); - m_decoder.SetError("received unexpected ProtoUnsup message"); -} - -void ServerImpl::ClientData3::ClientHello(std::string_view self_id, - unsigned int proto_rev) { - DEBUG4("ClientHello({}, '{}', {:04x})", m_id, self_id, proto_rev); - if (m_state != kStateInitial) { - m_decoder.SetError("received unexpected ClientHello message"); - return; - } - if (proto_rev != 0x0300) { - net3::WireEncodeProtoUnsup(m_wire.Send().stream(), 0x0300); - Flush(); - m_decoder.SetError( - fmt::format("unsupported protocol version {:04x}", proto_rev)); - return; - } - // create a unique name (just ignore provided client id) - m_name = fmt::format("NT3@{}", m_connInfo); - m_connected(m_name, 0x0300); - m_connected = nullptr; // no longer required - - // create client meta topics - m_metaPub = m_server.CreateMetaTopic(fmt::format("$clientpub${}", m_name)); - m_metaSub = m_server.CreateMetaTopic(fmt::format("$clientsub${}", m_name)); - - // subscribe and send initial assignments - auto& sub = m_subscribers[0]; - std::string prefix; - PubSubOptions options; - options.prefixMatch = true; - sub = std::make_unique( - this, std::span{{prefix}}, 0, options); - m_periodMs = net::UpdatePeriodCalc(m_periodMs, sub->periodMs); - m_setPeriodic(m_periodMs); - - { - auto out = m_wire.Send(); - net3::WireEncodeServerHello(out.stream(), 0, "server"); - for (auto&& topic : m_server.m_topics) { - if (topic && !topic->special && topic->IsPublished() && - topic->lastValue) { - DEBUG4("client {}: initial announce of '{}' (id {})", m_id, topic->name, - topic->id); - topic->clients[this].AddSubscriber(sub.get()); - m_server.UpdateMetaTopicSub(topic.get()); - - TopicData3* topic3 = GetTopic3(topic.get()); - ++topic3->seqNum; - net3::WireEncodeEntryAssign(out.stream(), topic->name, topic->id, - topic3->seqNum.value(), topic->lastValue, - topic3->flags); - topic3->sentAssign = true; - } - } - net3::WireEncodeServerHelloDone(out.stream()); - } - Flush(); - m_state = kStateServerHelloComplete; - - // update meta topics - UpdateMetaClientPub(); - UpdateMetaClientSub(); -} - -void ServerImpl::ClientData3::ServerHello(unsigned int flags, - std::string_view self_id) { - DEBUG4("ServerHello({}, {}, {})", m_id, flags, self_id); - m_decoder.SetError("received unexpected ServerHello message"); -} - -void ServerImpl::ClientData3::EntryAssign(std::string_view name, - unsigned int id, unsigned int seq_num, - const Value& value, - unsigned int flags) { - DEBUG4("EntryAssign({}, {}, {}, {}, {})", m_id, id, seq_num, - static_cast(value.type()), flags); - if (id != 0xffff) { - DEBUG3("ignored EntryAssign from {} with non-0xffff id {}", m_id, id); - return; - } - - // convert from NT3 info - auto typeStr = TypeToString(value.type()); - wpi::json properties = wpi::json::object(); - properties["retained"] = true; // treat all NT3 published topics as retained - properties["cached"] = true; // treat all NT3 published topics as cached - if ((flags & NT_PERSISTENT) != 0) { - properties["persistent"] = true; - } - - // create topic - auto topic = m_server.CreateTopic(this, name, typeStr, properties); - TopicData3* topic3 = GetTopic3(topic); - if (topic3->published || topic3->sentAssign) { - WARN("ignoring client {} duplicate publish of '{}'", m_id, name); - return; - } - ++topic3->seqNum; - topic3->published = true; - topic3->pubuid = m_nextPubUid++; - topic3->sentAssign = true; - - // create publisher - auto [publisherIt, isNew] = m_publishers.try_emplace( - topic3->pubuid, - std::make_unique(this, topic, topic3->pubuid)); - if (!isNew) { - return; // shouldn't happen, but just in case... - } - - // add publisher to topic - topic->AddPublisher(this, publisherIt->getSecond().get()); - - // update meta data - m_server.UpdateMetaTopicPub(topic); - UpdateMetaClientPub(); - - // acts as an announce + data update - SendAnnounce(topic, topic3->pubuid); - m_server.SetValue(this, topic, value); - - // respond with assign message with assigned topic ID - if (m_local && m_state == kStateRunning) { - net3::WireEncodeEntryAssign(m_wire.Send().stream(), topic->name, topic->id, - topic3->seqNum.value(), value, topic3->flags); - } else { - m_outgoing.emplace_back(net3::Message3::EntryAssign( - topic->name, topic->id, topic3->seqNum.value(), value, topic3->flags)); - } -} - -void ServerImpl::ClientData3::EntryUpdate(unsigned int id, unsigned int seq_num, - const Value& value) { - DEBUG4("EntryUpdate({}, {}, {}, {})", m_id, id, seq_num, - static_cast(value.type())); - if (m_state != kStateRunning) { - m_decoder.SetError("received unexpected EntryUpdate message"); - return; - } - - if (id >= m_server.m_topics.size()) { - DEBUG3("ignored EntryUpdate from {} on non-existent topic {}", m_id, id); - return; - } - TopicData* topic = m_server.m_topics[id].get(); - if (!topic || !topic->IsPublished()) { - DEBUG3("ignored EntryUpdate from {} on non-existent topic {}", m_id, id); - return; - } - - TopicData3* topic3 = GetTopic3(topic); - if (!topic3->published) { - topic3->published = true; - topic3->pubuid = m_nextPubUid++; - - // create publisher - auto [publisherIt, isNew] = m_publishers.try_emplace( - topic3->pubuid, - std::make_unique(this, topic, topic3->pubuid)); - if (isNew) { - // add publisher to topic - topic->AddPublisher(this, publisherIt->getSecond().get()); - - // update meta data - m_server.UpdateMetaTopicPub(topic); - UpdateMetaClientPub(); - } - } - topic3->seqNum = net3::SequenceNumber{seq_num}; - - m_server.SetValue(this, topic, value); -} - -void ServerImpl::ClientData3::FlagsUpdate(unsigned int id, unsigned int flags) { - DEBUG4("FlagsUpdate({}, {}, {})", m_id, id, flags); - if (m_state != kStateRunning) { - m_decoder.SetError("received unexpected FlagsUpdate message"); - return; - } - if (id >= m_server.m_topics.size()) { - DEBUG3("ignored FlagsUpdate from {} on non-existent topic {}", m_id, id); - return; - } - TopicData* topic = m_server.m_topics[id].get(); - if (!topic || !topic->IsPublished()) { - DEBUG3("ignored FlagsUpdate from {} on non-existent topic {}", m_id, id); - return; - } - if (topic->special) { - DEBUG3("ignored FlagsUpdate from {} on special topic {}", m_id, id); - return; - } - m_server.SetFlags(this, topic, flags); -} - -void ServerImpl::ClientData3::EntryDelete(unsigned int id) { - DEBUG4("EntryDelete({}, {})", m_id, id); - if (m_state != kStateRunning) { - m_decoder.SetError("received unexpected EntryDelete message"); - return; - } - if (id >= m_server.m_topics.size()) { - DEBUG3("ignored EntryDelete from {} on non-existent topic {}", m_id, id); - return; - } - TopicData* topic = m_server.m_topics[id].get(); - if (!topic || !topic->IsPublished()) { - DEBUG3("ignored EntryDelete from {} on non-existent topic {}", m_id, id); - return; - } - if (topic->special) { - DEBUG3("ignored EntryDelete from {} on special topic {}", m_id, id); - return; - } - - auto topic3it = m_topics3.find(topic); - if (topic3it != m_topics3.end()) { - // make sure we send assign the next time - topic3it->second.sentAssign = false; - - // unpublish from this client (if it was previously published) - if (topic3it->second.published) { - topic3it->second.published = false; - auto publisherIt = m_publishers.find(topic3it->second.pubuid); - if (publisherIt != m_publishers.end()) { - // remove publisher from topic - topic->RemovePublisher(this, publisherIt->second.get()); - - // remove publisher from client - m_publishers.erase(publisherIt); - - // update meta data - m_server.UpdateMetaTopicPub(topic); - UpdateMetaClientPub(); - } - } - } - - // set retained=false - m_server.SetProperties(this, topic, {{"retained", false}}); -} - -bool ServerImpl::TopicData::SetProperties(const wpi::json& update) { - if (!update.is_object()) { - return false; - } - bool updated = false; - for (auto&& elem : update.items()) { - if (elem.value().is_null()) { - properties.erase(elem.key()); - } else { - properties[elem.key()] = elem.value(); - } - updated = true; - } - if (updated) { - RefreshProperties(); - } - return updated; -} - -void ServerImpl::TopicData::RefreshProperties() { - persistent = false; - retained = false; - cached = true; - - auto persistentIt = properties.find("persistent"); - if (persistentIt != properties.end()) { - if (auto val = persistentIt->get_ptr()) { - persistent = *val; - } - } - - auto retainedIt = properties.find("retained"); - if (retainedIt != properties.end()) { - if (auto val = retainedIt->get_ptr()) { - retained = *val; - } - } - - auto cachedIt = properties.find("cached"); - if (cachedIt != properties.end()) { - if (auto val = cachedIt->get_ptr()) { - cached = *val; - } - } - - if (!cached) { - lastValue = {}; - lastValueClient = nullptr; - } - - if (!cached && persistent) { - WARN("topic {}: disabling cached property disables persistent storage", - name); - } -} - -bool ServerImpl::TopicData::SetFlags(unsigned int flags_) { - bool updated; - if ((flags_ & NT_PERSISTENT) != 0) { - updated = !persistent; - persistent = true; - properties["persistent"] = true; - } else { - updated = persistent; - persistent = false; - properties.erase("persistent"); - } - if ((flags_ & NT_RETAINED) != 0) { - updated |= !retained; - retained = true; - properties["retained"] = true; - } else { - updated |= retained; - retained = false; - properties.erase("retained"); - } - if ((flags_ & NT_UNCACHED) != 0) { - updated |= cached; - cached = false; - properties["cached"] = false; - lastValue = {}; - lastValueClient = nullptr; - } else { - updated |= !cached; - cached = true; - properties.erase("cached"); - } - if (!cached && persistent) { - WARN("topic {}: disabling cached property disables persistent storage", - name); - } - return updated; -} - -bool ServerImpl::SubscriberData::Matches(std::string_view name, bool special) { - for (auto&& topicName : topicNames) { - if ((!options.prefixMatch && name == topicName) || - (options.prefixMatch && (!special || !topicName.empty()) && - wpi::starts_with(name, topicName))) { - return true; - } - } - return false; -} - ServerImpl::ServerImpl(wpi::Logger& logger) : m_logger{logger} { // local is client 0 - m_clients.emplace_back(std::make_unique(*this, 0, logger)); - m_localClient = static_cast(m_clients.back().get()); + m_clients.emplace_back(std::make_unique(*this, 0, logger)); + m_localClient = static_cast(m_clients.back().get()); } -std::pair ServerImpl::AddClient( - std::string_view name, std::string_view connInfo, bool local, - net::WireConnection& wire, ServerImpl::SetPeriodicFunc setPeriodic) { +std::pair ServerImpl::AddClient(std::string_view name, + std::string_view connInfo, + bool local, + net::WireConnection& wire, + SetPeriodicFunc setPeriodic) { if (name.empty()) { name = "NT4"; } @@ -1232,9 +71,9 @@ std::pair ServerImpl::AddClient( std::string dedupName = fmt::format("{}@{}", name, index); auto& clientData = m_clients[index]; - clientData = std::make_unique(dedupName, connInfo, local, wire, - std::move(setPeriodic), *this, - index, m_logger); + clientData = std::make_unique(dedupName, connInfo, local, wire, + std::move(setPeriodic), *this, + index, m_logger); // create client meta topics clientData->m_metaPub = @@ -1252,8 +91,8 @@ std::pair ServerImpl::AddClient( int ServerImpl::AddClient3(std::string_view connInfo, bool local, net3::WireConnection3& wire, - ServerImpl::Connected3Func connected, - ServerImpl::SetPeriodicFunc setPeriodic) { + Connected3Func connected, + SetPeriodicFunc setPeriodic) { size_t index = m_clients.size(); // find an empty slot; we can't check for duplicates until we get a hello. // just do a linear search as number of clients is typically small (<10) @@ -1267,7 +106,7 @@ int ServerImpl::AddClient3(std::string_view connInfo, bool local, m_clients.emplace_back(); } - m_clients[index] = std::make_unique( + m_clients[index] = std::make_unique( connInfo, local, wire, std::move(connected), std::move(setPeriodic), *this, index, m_logger); @@ -1280,7 +119,7 @@ std::shared_ptr ServerImpl::RemoveClient(int clientId) { auto& client = m_clients[clientId]; // remove all publishers and subscribers for this client - wpi::SmallVector toDelete; + wpi::SmallVector toDelete; for (auto&& topic : m_topics) { bool pubChanged = false; bool subChanged = false; @@ -1693,11 +532,11 @@ std::string ServerImpl::LoadPersistent(std::string_view in) { return allerrors; } -ServerImpl::TopicData* ServerImpl::CreateTopic(ClientData* client, - std::string_view name, - std::string_view typeStr, - const wpi::json& properties, - bool special) { +ServerTopic* ServerImpl::CreateTopic(ServerClient* client, + std::string_view name, + std::string_view typeStr, + const wpi::json& properties, + bool special) { auto& topic = m_nameTopics[name]; if (topic) { if (typeStr != topic->typeStr) { @@ -1709,7 +548,7 @@ ServerImpl::TopicData* ServerImpl::CreateTopic(ClientData* client, } else { // new topic unsigned int id = m_topics.emplace_back( - std::make_unique(m_logger, name, typeStr, properties)); + std::make_unique(m_logger, name, typeStr, properties)); topic = m_topics[id].get(); topic->id = id; topic->special = special; @@ -1720,7 +559,7 @@ ServerImpl::TopicData* ServerImpl::CreateTopic(ClientData* client, } // look for subscriber matching prefixes - wpi::SmallVector subscribersBuf; + wpi::SmallVector subscribersBuf; auto subscribers = aClient->GetSubscribers(name, topic->special, subscribersBuf); @@ -1760,11 +599,11 @@ ServerImpl::TopicData* ServerImpl::CreateTopic(ClientData* client, return topic; } -ServerImpl::TopicData* ServerImpl::CreateMetaTopic(std::string_view name) { +ServerTopic* ServerImpl::CreateMetaTopic(std::string_view name) { return CreateTopic(nullptr, name, "msgpack", {{"retained", true}}, true); } -void ServerImpl::DeleteTopic(TopicData* topic) { +void ServerImpl::DeleteTopic(ServerTopic* topic) { if (!topic) { return; } @@ -1790,7 +629,7 @@ void ServerImpl::DeleteTopic(TopicData* topic) { m_topics.erase(topic->id); } -void ServerImpl::SetProperties(ClientData* client, TopicData* topic, +void ServerImpl::SetProperties(ServerClient* client, ServerTopic* topic, const wpi::json& update) { DEBUG4("SetProperties({}, {}, {})", client ? client->GetId() : -1, topic->name, update.dump()); @@ -1804,7 +643,7 @@ void ServerImpl::SetProperties(ClientData* client, TopicData* topic, } } -void ServerImpl::SetFlags(ClientData* client, TopicData* topic, +void ServerImpl::SetFlags(ServerClient* client, ServerTopic* topic, unsigned int flags) { bool wasPersistent = topic->persistent; if (topic->SetFlags(flags)) { @@ -1822,7 +661,7 @@ void ServerImpl::SetFlags(ClientData* client, TopicData* topic, } } -void ServerImpl::SetValue(ClientData* client, TopicData* topic, +void ServerImpl::SetValue(ServerClient* client, ServerTopic* topic, const Value& value) { // update retained value if from same client or timestamp newer if (topic->cached && (!topic->lastValue || topic->lastValueClient == client || @@ -1868,7 +707,7 @@ void ServerImpl::UpdateMetaClients(const std::vector& conns) { } } -void ServerImpl::UpdateMetaTopicPub(TopicData* topic) { +void ServerImpl::UpdateMetaTopicPub(ServerTopic* topic) { if (!topic->metaPub) { return; } @@ -1891,7 +730,7 @@ void ServerImpl::UpdateMetaTopicPub(TopicData* topic) { } } -void ServerImpl::UpdateMetaTopicSub(TopicData* topic) { +void ServerImpl::UpdateMetaTopicSub(ServerTopic* topic) { if (!topic->metaSub) { return; } @@ -1914,7 +753,7 @@ void ServerImpl::UpdateMetaTopicSub(TopicData* topic) { } } -void ServerImpl::PropertiesChanged(ClientData* client, TopicData* topic, +void ServerImpl::PropertiesChanged(ServerClient* client, ServerTopic* topic, const wpi::json& update) { // removing some properties can result in the topic being unpublished if (!topic->IsPublished()) { diff --git a/ntcore/src/main/native/cpp/server/ServerImpl.h b/ntcore/src/main/native/cpp/server/ServerImpl.h index 55c39826178..13e4dfe5b3b 100644 --- a/ntcore/src/main/native/cpp/server/ServerImpl.h +++ b/ntcore/src/main/native/cpp/server/ServerImpl.h @@ -6,33 +6,19 @@ #include -#include -#include #include #include #include #include -#include #include -#include -#include #include #include -#include +#include -#include "PubSubOptions.h" -#include "net/ClientMessageQueue.h" -#include "net/Message.h" -#include "net/NetworkOutgoingQueue.h" -#include "net/NetworkPing.h" -#include "net/WireConnection.h" -#include "net/WireDecoder.h" -#include "net/WireEncoder.h" -#include "net3/Message3.h" -#include "net3/SequenceNumber.h" -#include "net3/WireConnection3.h" -#include "net3/WireDecoder3.h" +#include "server/Functions.h" +#include "server/ServerClient.h" +#include "server/ServerTopic.h" namespace wpi { class Logger; @@ -42,8 +28,9 @@ class raw_ostream; } // namespace wpi namespace nt::net { -struct ClientMessage; +class ClientMessageQueue; class LocalInterface; +class ServerMessageHandler; class WireConnection; } // namespace nt::net @@ -53,12 +40,20 @@ class WireConnection3; namespace nt::server { +class ServerClient; +class ServerClient3; +class ServerClient4; +class ServerClient4Base; +class ServerClientLocal; + class ServerImpl final { - public: - using SetPeriodicFunc = std::function; - using Connected3Func = - std::function; + friend class ServerClient; + friend class ServerClient3; + friend class ServerClient4; + friend class ServerClient4Base; + friend class ServerClientLocal; + public: explicit ServerImpl(wpi::Logger& logger); void SendAllOutgoing(uint64_t curTimeMs, bool flush); @@ -95,423 +90,39 @@ class ServerImpl final { std::string LoadPersistent(std::string_view in); private: - static constexpr uint32_t kMinPeriodMs = 5; - - class ClientData; - struct PublisherData; - struct SubscriberData; - - struct TopicData { - TopicData(wpi::Logger& logger, std::string_view name, - std::string_view typeStr) - : m_logger{logger}, name{name}, typeStr{typeStr} {} - TopicData(wpi::Logger& logger, std::string_view name, - std::string_view typeStr, wpi::json properties) - : m_logger{logger}, - name{name}, - typeStr{typeStr}, - properties(std::move(properties)) { - RefreshProperties(); - } - - bool IsPublished() const { - return persistent || retained || publisherCount != 0; - } - - // returns true if properties changed - bool SetProperties(const wpi::json& update); - void RefreshProperties(); - bool SetFlags(unsigned int flags_); - - wpi::Logger& m_logger; // Must be m_logger for WARN macro to work - std::string name; - unsigned int id; - Value lastValue; - ClientData* lastValueClient = nullptr; - std::string typeStr; - wpi::json properties = wpi::json::object(); - unsigned int publisherCount{0}; - bool persistent{false}; - bool retained{false}; - bool cached{true}; - bool special{false}; - int localTopic{0}; - - void AddPublisher(ClientData* client, PublisherData* pub) { - if (clients[client].publishers.insert(pub).second) { - ++publisherCount; - } - } - - void RemovePublisher(ClientData* client, PublisherData* pub) { - if (clients[client].publishers.erase(pub)) { - --publisherCount; - } - } - - struct TopicClientData { - wpi::SmallPtrSet publishers; - wpi::SmallPtrSet subscribers; - net::ValueSendMode sendMode = net::ValueSendMode::kDisabled; - - bool AddSubscriber(SubscriberData* sub) { - bool added = subscribers.insert(sub).second; - if (!sub->options.topicsOnly) { - if (sub->options.sendAll) { - sendMode = net::ValueSendMode::kAll; - } else if (sendMode == net::ValueSendMode::kDisabled) { - sendMode = net::ValueSendMode::kNormal; - } - } - return added; - } - }; - wpi::SmallDenseMap clients; - - // meta topics - TopicData* metaPub = nullptr; - TopicData* metaSub = nullptr; - }; - - class ClientData { - public: - ClientData(std::string_view name, std::string_view connInfo, bool local, - ServerImpl::SetPeriodicFunc setPeriodic, ServerImpl& server, - int id, wpi::Logger& logger) - : m_name{name}, - m_connInfo{connInfo}, - m_local{local}, - m_setPeriodic{std::move(setPeriodic)}, - m_server{server}, - m_id{id}, - m_logger{logger} {} - virtual ~ClientData() = default; - - // these return true if any messages have been queued for later processing - virtual bool ProcessIncomingText(std::string_view data) = 0; - virtual bool ProcessIncomingBinary(std::span data) = 0; - - virtual void SendValue(TopicData* topic, const Value& value, - net::ValueSendMode mode) = 0; - virtual void SendAnnounce(TopicData* topic, std::optional pubuid) = 0; - virtual void SendUnannounce(TopicData* topic) = 0; - virtual void SendPropertiesUpdate(TopicData* topic, const wpi::json& update, - bool ack) = 0; - virtual void SendOutgoing(uint64_t curTimeMs, bool flush) = 0; - virtual void Flush() = 0; - - // later processing -- returns true if more to process - virtual bool ProcessIncomingMessages(size_t max) = 0; - - void UpdateMetaClientPub(); - void UpdateMetaClientSub(); - - std::span GetSubscribers( - std::string_view name, bool special, - wpi::SmallVectorImpl& buf); - - std::string_view GetName() const { return m_name; } - int GetId() const { return m_id; } - - virtual void UpdatePeriod(TopicData::TopicClientData& tcd, - TopicData* topic) {} - - protected: - std::string m_name; - std::string m_connInfo; - bool m_local; // local to machine - ServerImpl::SetPeriodicFunc m_setPeriodic; - // TODO: make this per-topic? - uint32_t m_periodMs{UINT32_MAX}; - ServerImpl& m_server; - int m_id; - - wpi::Logger& m_logger; - - wpi::DenseMap> m_publishers; - wpi::DenseMap> m_subscribers; - - public: - // meta topics - TopicData* m_metaPub = nullptr; - TopicData* m_metaSub = nullptr; - }; - - class ClientData4Base : public ClientData, - protected net::ClientMessageHandler { - public: - ClientData4Base(std::string_view name, std::string_view connInfo, - bool local, ServerImpl::SetPeriodicFunc setPeriodic, - ServerImpl& server, int id, wpi::Logger& logger) - : ClientData{name, connInfo, local, setPeriodic, server, id, logger} {} - - protected: - // ClientMessageHandler interface - void ClientPublish(int pubuid, std::string_view name, - std::string_view typeStr, const wpi::json& properties, - const PubSubOptionsImpl& options) final; - void ClientUnpublish(int pubuid) final; - void ClientSetProperties(std::string_view name, - const wpi::json& update) final; - void ClientSubscribe(int subuid, std::span topicNames, - const PubSubOptionsImpl& options) final; - void ClientUnsubscribe(int subuid) final; - - void ClientSetValue(int pubuid, const Value& value) final; - - bool DoProcessIncomingMessages(net::ClientMessageQueue& queue, size_t max); - - wpi::DenseMap m_announceSent; - - private: - std::array m_msgsBuf; - }; - - class ClientDataLocal final : public ClientData4Base { - public: - ClientDataLocal(ServerImpl& server, int id, wpi::Logger& logger) - : ClientData4Base{"", "", true, [](uint32_t) {}, server, id, logger} {} - - bool ProcessIncomingText(std::string_view data) final { return false; } - bool ProcessIncomingBinary(std::span data) final { - return false; - } - - bool ProcessIncomingMessages(size_t max) final { - if (!m_queue) { - return false; - } - return DoProcessIncomingMessages(*m_queue, max); - } - - void SendValue(TopicData* topic, const Value& value, - net::ValueSendMode mode) final; - void SendAnnounce(TopicData* topic, std::optional pubuid) final; - void SendUnannounce(TopicData* topic) final; - void SendPropertiesUpdate(TopicData* topic, const wpi::json& update, - bool ack) final; - void SendOutgoing(uint64_t curTimeMs, bool flush) final {} - void Flush() final {} - - void SetQueue(net::ClientMessageQueue* queue) { m_queue = queue; } - - private: - net::ClientMessageQueue* m_queue = nullptr; - }; - - class ClientData4 final : public ClientData4Base { - public: - ClientData4(std::string_view name, std::string_view connInfo, bool local, - net::WireConnection& wire, - ServerImpl::SetPeriodicFunc setPeriodic, ServerImpl& server, - int id, wpi::Logger& logger) - : ClientData4Base{name, connInfo, local, setPeriodic, - server, id, logger}, - m_wire{wire}, - m_ping{wire}, - m_incoming{logger}, - m_outgoing{wire, local} {} - - bool ProcessIncomingText(std::string_view data) final; - bool ProcessIncomingBinary(std::span data) final; - - bool ProcessIncomingMessages(size_t max) final { - if (!DoProcessIncomingMessages(m_incoming, max)) { - m_wire.StartRead(); - return false; - } - return true; - } - - void SendValue(TopicData* topic, const Value& value, - net::ValueSendMode mode) final; - void SendAnnounce(TopicData* topic, std::optional pubuid) final; - void SendUnannounce(TopicData* topic) final; - void SendPropertiesUpdate(TopicData* topic, const wpi::json& update, - bool ack) final; - void SendOutgoing(uint64_t curTimeMs, bool flush) final; - - void Flush() final {} - - void UpdatePeriod(TopicData::TopicClientData& tcd, TopicData* topic) final; - - public: - net::WireConnection& m_wire; - - private: - net::NetworkPing m_ping; - net::NetworkIncomingClientQueue m_incoming; - net::NetworkOutgoingQueue m_outgoing; - }; - - class ClientData3 final : public ClientData, private net3::MessageHandler3 { - public: - ClientData3(std::string_view connInfo, bool local, - net3::WireConnection3& wire, - ServerImpl::Connected3Func connected, - ServerImpl::SetPeriodicFunc setPeriodic, ServerImpl& server, - int id, wpi::Logger& logger) - : ClientData{"", connInfo, local, setPeriodic, server, id, logger}, - m_connected{std::move(connected)}, - m_wire{wire}, - m_decoder{*this}, - m_incoming{logger} {} - - bool ProcessIncomingText(std::string_view data) final { return false; } - bool ProcessIncomingBinary(std::span data) final; - - bool ProcessIncomingMessages(size_t max) final { return false; } - - void SendValue(TopicData* topic, const Value& value, - net::ValueSendMode mode) final; - void SendAnnounce(TopicData* topic, std::optional pubuid) final; - void SendUnannounce(TopicData* topic) final; - void SendPropertiesUpdate(TopicData* topic, const wpi::json& update, - bool ack) final; - void SendOutgoing(uint64_t curTimeMs, bool flush) final; - - void Flush() final { m_wire.Flush(); } - - private: - // MessageHandler3 interface - void KeepAlive() final; - void ServerHelloDone() final; - void ClientHelloDone() final; - void ClearEntries() final; - void ProtoUnsup(unsigned int proto_rev) final; - void ClientHello(std::string_view self_id, unsigned int proto_rev) final; - void ServerHello(unsigned int flags, std::string_view self_id) final; - void EntryAssign(std::string_view name, unsigned int id, - unsigned int seq_num, const Value& value, - unsigned int flags) final; - void EntryUpdate(unsigned int id, unsigned int seq_num, - const Value& value) final; - void FlagsUpdate(unsigned int id, unsigned int flags) final; - void EntryDelete(unsigned int id) final; - void ExecuteRpc(unsigned int id, unsigned int uid, - std::span params) final {} - void RpcResponse(unsigned int id, unsigned int uid, - std::span result) final {} - - ServerImpl::Connected3Func m_connected; - net3::WireConnection3& m_wire; - - enum State { kStateInitial, kStateServerHelloComplete, kStateRunning }; - State m_state{kStateInitial}; - net3::WireDecoder3 m_decoder; - - net::NetworkIncomingClientQueue m_incoming; - std::vector m_outgoing; - wpi::DenseMap m_outgoingValueMap; - int64_t m_nextPubUid{1}; - uint64_t m_lastSendMs{0}; - - struct TopicData3 { - explicit TopicData3(TopicData* topic) { UpdateFlags(topic); } - - unsigned int flags{0}; - net3::SequenceNumber seqNum; - bool sentAssign{false}; - bool published{false}; - int64_t pubuid{0}; - - bool UpdateFlags(TopicData* topic); - }; - wpi::DenseMap m_topics3; - TopicData3* GetTopic3(TopicData* topic) { - return &m_topics3.try_emplace(topic, topic).first->second; - } - }; - - struct PublisherData { - PublisherData(ClientData* client, TopicData* topic, int64_t pubuid) - : client{client}, topic{topic}, pubuid{pubuid} { - UpdateMeta(); - } - - void UpdateMeta(); - - ClientData* client; - TopicData* topic; - int64_t pubuid; - std::vector metaClient; - std::vector metaTopic; - }; - - struct SubscriberData { - SubscriberData(ClientData* client, std::span topicNames, - int64_t subuid, const PubSubOptionsImpl& options) - : client{client}, - topicNames{topicNames.begin(), topicNames.end()}, - subuid{subuid}, - options{options}, - periodMs(std::lround(options.periodicMs / 10.0) * 10) { - UpdateMeta(); - if (periodMs < kMinPeriodMs) { - periodMs = kMinPeriodMs; - } - } - - void Update(std::span topicNames_, - const PubSubOptionsImpl& options_) { - topicNames = {topicNames_.begin(), topicNames_.end()}; - options = options_; - UpdateMeta(); - periodMs = std::lround(options_.periodicMs / 10.0) * 10; - if (periodMs < kMinPeriodMs) { - periodMs = kMinPeriodMs; - } - } - - bool Matches(std::string_view name, bool special); - - void UpdateMeta(); - - ClientData* client; - std::vector topicNames; - int64_t subuid; - PubSubOptionsImpl options; - std::vector metaClient; - std::vector metaTopic; - wpi::DenseMap topics; - // in options as double, but copy here as integer; rounded to the nearest - // 10 ms - uint32_t periodMs; - }; - wpi::Logger& m_logger; net::ServerMessageHandler* m_local{nullptr}; bool m_controlReady{false}; - ClientDataLocal* m_localClient; - std::vector> m_clients; - wpi::UidVector, 16> m_topics; - wpi::StringMap m_nameTopics; + ServerClientLocal* m_localClient; + std::vector> m_clients; + wpi::UidVector, 16> m_topics; + wpi::StringMap m_nameTopics; bool m_persistentChanged{false}; // global meta topics (other meta topics are linked to from the specific // client or topic) - TopicData* m_metaClients; + ServerTopic* m_metaClients; void DumpPersistent(wpi::raw_ostream& os); // helper functions - TopicData* CreateTopic(ClientData* client, std::string_view name, - std::string_view typeStr, const wpi::json& properties, - bool special = false); - TopicData* CreateMetaTopic(std::string_view name); - void DeleteTopic(TopicData* topic); - void SetProperties(ClientData* client, TopicData* topic, + ServerTopic* CreateTopic(ServerClient* client, std::string_view name, + std::string_view typeStr, + const wpi::json& properties, bool special = false); + ServerTopic* CreateMetaTopic(std::string_view name); + void DeleteTopic(ServerTopic* topic); + void SetProperties(ServerClient* client, ServerTopic* topic, const wpi::json& update); - void SetFlags(ClientData* client, TopicData* topic, unsigned int flags); - void SetValue(ClientData* client, TopicData* topic, const Value& value); + void SetFlags(ServerClient* client, ServerTopic* topic, unsigned int flags); + void SetValue(ServerClient* client, ServerTopic* topic, const Value& value); // update meta topic values from data structures void UpdateMetaClients(const std::vector& conns); - void UpdateMetaTopicPub(TopicData* topic); - void UpdateMetaTopicSub(TopicData* topic); + void UpdateMetaTopicPub(ServerTopic* topic); + void UpdateMetaTopicSub(ServerTopic* topic); - void PropertiesChanged(ClientData* client, TopicData* topic, + void PropertiesChanged(ServerClient* client, ServerTopic* topic, const wpi::json& update); }; diff --git a/ntcore/src/main/native/cpp/server/ServerPublisher.cpp b/ntcore/src/main/native/cpp/server/ServerPublisher.cpp new file mode 100644 index 00000000000..b041180d582 --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerPublisher.cpp @@ -0,0 +1,45 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#include "ServerPublisher.h" + +#include + +#include "server/MessagePackWriter.h" +#include "server/ServerClient.h" +#include "server/ServerTopic.h" + +using namespace nt::server; +using namespace mpack; + +void ServerPublisher::UpdateMeta() { + { + Writer w; + mpack_start_map(&w, 2); + mpack_write_str(&w, "uid"); + mpack_write_int(&w, pubuid); + mpack_write_str(&w, "topic"); + mpack_write_str(&w, topic->name); + mpack_finish_map(&w); + if (mpack_writer_destroy(&w) == mpack_ok) { + metaClient = std::move(w.bytes); + } + } + { + Writer w; + mpack_start_map(&w, 2); + mpack_write_str(&w, "client"); + if (client) { + mpack_write_str(&w, client->GetName()); + } else { + mpack_write_str(&w, ""); + } + mpack_write_str(&w, "pubuid"); + mpack_write_int(&w, pubuid); + mpack_finish_map(&w); + if (mpack_writer_destroy(&w) == mpack_ok) { + metaTopic = std::move(w.bytes); + } + } +} diff --git a/ntcore/src/main/native/cpp/server/ServerPublisher.h b/ntcore/src/main/native/cpp/server/ServerPublisher.h new file mode 100644 index 00000000000..dbd518c75af --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerPublisher.h @@ -0,0 +1,31 @@ +// Copyright(c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include + +#include + +namespace nt::server { + +class ServerClient; +struct ServerTopic; + +struct ServerPublisher { + ServerPublisher(ServerClient* client, ServerTopic* topic, int64_t pubuid) + : client{client}, topic{topic}, pubuid{pubuid} { + UpdateMeta(); + } + + void UpdateMeta(); + + ServerClient* client; + ServerTopic* topic; + int64_t pubuid; + std::vector metaClient; + std::vector metaTopic; +}; + +} // namespace nt::server diff --git a/ntcore/src/main/native/cpp/server/ServerSubscriber.cpp b/ntcore/src/main/native/cpp/server/ServerSubscriber.cpp new file mode 100644 index 00000000000..a9d69ccbfaa --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerSubscriber.cpp @@ -0,0 +1,91 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#include "ServerSubscriber.h" + +#include +#include + +#include "PubSubOptions.h" +#include "server/MessagePackWriter.h" +#include "server/ServerClient.h" + +using namespace nt; +using namespace nt::server; +using namespace mpack; + +static void WriteOptions(mpack_writer_t& w, const PubSubOptionsImpl& options) { + int size = + (options.sendAll ? 1 : 0) + (options.topicsOnly ? 1 : 0) + + (options.periodicMs != PubSubOptionsImpl::kDefaultPeriodicMs ? 1 : 0) + + (options.prefixMatch ? 1 : 0); + mpack_start_map(&w, size); + if (options.sendAll) { + mpack_write_str(&w, "all"); + mpack_write_bool(&w, true); + } + if (options.topicsOnly) { + mpack_write_str(&w, "topicsonly"); + mpack_write_bool(&w, true); + } + if (options.periodicMs != PubSubOptionsImpl::kDefaultPeriodicMs) { + mpack_write_str(&w, "periodic"); + mpack_write_float(&w, options.periodicMs / 1000.0); + } + if (options.prefixMatch) { + mpack_write_str(&w, "prefix"); + mpack_write_bool(&w, true); + } + mpack_finish_map(&w); +} + +bool ServerSubscriber::Matches(std::string_view name, bool special) { + for (auto&& topicName : topicNames) { + if ((!options.prefixMatch && name == topicName) || + (options.prefixMatch && (!special || !topicName.empty()) && + wpi::starts_with(name, topicName))) { + return true; + } + } + return false; +} + +void ServerSubscriber::UpdateMeta() { + { + Writer w; + mpack_start_map(&w, 3); + mpack_write_str(&w, "uid"); + mpack_write_int(&w, subuid); + mpack_write_str(&w, "topics"); + mpack_start_array(&w, topicNames.size()); + for (auto&& name : topicNames) { + mpack_write_str(&w, name); + } + mpack_finish_array(&w); + mpack_write_str(&w, "options"); + WriteOptions(w, options); + mpack_finish_map(&w); + if (mpack_writer_destroy(&w) == mpack_ok) { + metaClient = std::move(w.bytes); + } + } + { + Writer w; + mpack_start_map(&w, 3); + mpack_write_str(&w, "client"); + if (client) { + mpack_write_str(&w, client->GetName()); + } else { + mpack_write_str(&w, ""); + } + mpack_write_str(&w, "subuid"); + mpack_write_int(&w, subuid); + mpack_write_str(&w, "options"); + WriteOptions(w, options); + mpack_finish_map(&w); + if (mpack_writer_destroy(&w) == mpack_ok) { + metaTopic = std::move(w.bytes); + } + } +} diff --git a/ntcore/src/main/native/cpp/server/ServerSubscriber.h b/ntcore/src/main/native/cpp/server/ServerSubscriber.h new file mode 100644 index 00000000000..7b213af2185 --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerSubscriber.h @@ -0,0 +1,66 @@ +// Copyright(c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include + +#include +#include +#include +#include + +#include + +#include "PubSubOptions.h" +#include "server/Constants.h" + +namespace nt::server { + +class ServerClient; +struct ServerTopic; + +struct ServerSubscriber { + ServerSubscriber(ServerClient* client, + std::span topicNames, int64_t subuid, + const PubSubOptionsImpl& options) + : client{client}, + topicNames{topicNames.begin(), topicNames.end()}, + subuid{subuid}, + options{options}, + periodMs(std::lround(options.periodicMs / 10.0) * 10) { + UpdateMeta(); + if (periodMs < kMinPeriodMs) { + periodMs = kMinPeriodMs; + } + } + + void Update(std::span topicNames_, + const PubSubOptionsImpl& options_) { + topicNames = {topicNames_.begin(), topicNames_.end()}; + options = options_; + UpdateMeta(); + periodMs = std::lround(options_.periodicMs / 10.0) * 10; + if (periodMs < kMinPeriodMs) { + periodMs = kMinPeriodMs; + } + } + + bool Matches(std::string_view name, bool special); + + void UpdateMeta(); + + ServerClient* client; + std::vector topicNames; + int64_t subuid; + PubSubOptionsImpl options; + std::vector metaClient; + std::vector metaTopic; + wpi::DenseMap topics; + // in options as double, but copy here as integer; rounded to the nearest + // 10 ms + uint32_t periodMs; +}; + +} // namespace nt::server diff --git a/ntcore/src/main/native/cpp/server/ServerTopic.cpp b/ntcore/src/main/native/cpp/server/ServerTopic.cpp new file mode 100644 index 00000000000..826a49e9c24 --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerTopic.cpp @@ -0,0 +1,103 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#include "ServerTopic.h" + +#include "Log.h" + +using namespace nt::server; + +bool ServerTopic::SetProperties(const wpi::json& update) { + if (!update.is_object()) { + return false; + } + bool updated = false; + for (auto&& elem : update.items()) { + if (elem.value().is_null()) { + properties.erase(elem.key()); + } else { + properties[elem.key()] = elem.value(); + } + updated = true; + } + if (updated) { + RefreshProperties(); + } + return updated; +} + +void ServerTopic::RefreshProperties() { + persistent = false; + retained = false; + cached = true; + + auto persistentIt = properties.find("persistent"); + if (persistentIt != properties.end()) { + if (auto val = persistentIt->get_ptr()) { + persistent = *val; + } + } + + auto retainedIt = properties.find("retained"); + if (retainedIt != properties.end()) { + if (auto val = retainedIt->get_ptr()) { + retained = *val; + } + } + + auto cachedIt = properties.find("cached"); + if (cachedIt != properties.end()) { + if (auto val = cachedIt->get_ptr()) { + cached = *val; + } + } + + if (!cached) { + lastValue = {}; + lastValueClient = nullptr; + } + + if (!cached && persistent) { + WARN("topic {}: disabling cached property disables persistent storage", + name); + } +} + +bool ServerTopic::SetFlags(unsigned int flags_) { + bool updated; + if ((flags_ & NT_PERSISTENT) != 0) { + updated = !persistent; + persistent = true; + properties["persistent"] = true; + } else { + updated = persistent; + persistent = false; + properties.erase("persistent"); + } + if ((flags_ & NT_RETAINED) != 0) { + updated |= !retained; + retained = true; + properties["retained"] = true; + } else { + updated |= retained; + retained = false; + properties.erase("retained"); + } + if ((flags_ & NT_UNCACHED) != 0) { + updated |= cached; + cached = false; + properties["cached"] = false; + lastValue = {}; + lastValueClient = nullptr; + } else { + updated |= !cached; + cached = true; + properties.erase("cached"); + } + if (!cached && persistent) { + WARN("topic {}: disabling cached property disables persistent storage", + name); + } + return updated; +} diff --git a/ntcore/src/main/native/cpp/server/ServerTopic.h b/ntcore/src/main/native/cpp/server/ServerTopic.h new file mode 100644 index 00000000000..dc8a7058b99 --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerTopic.h @@ -0,0 +1,100 @@ +// Copyright(c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include + +#include +#include +#include + +#include "net/NetworkOutgoingQueue.h" +#include "networktables/NetworkTableValue.h" +#include "server/ServerSubscriber.h" + +namespace wpi { +class Logger; +} // namespace wpi + +namespace nt::server { + +class ServerClient; +struct ServerPublisher; +struct ServerSubscriber; + +struct TopicClientData { + wpi::SmallPtrSet publishers; + wpi::SmallPtrSet subscribers; + net::ValueSendMode sendMode = net::ValueSendMode::kDisabled; + + bool AddSubscriber(ServerSubscriber* sub) { + bool added = subscribers.insert(sub).second; + if (!sub->options.topicsOnly) { + if (sub->options.sendAll) { + sendMode = net::ValueSendMode::kAll; + } else if (sendMode == net::ValueSendMode::kDisabled) { + sendMode = net::ValueSendMode::kNormal; + } + } + return added; + } +}; + +struct ServerTopic { + ServerTopic(wpi::Logger& logger, std::string_view name, + std::string_view typeStr) + : m_logger{logger}, name{name}, typeStr{typeStr} {} + ServerTopic(wpi::Logger& logger, std::string_view name, + std::string_view typeStr, wpi::json properties) + : m_logger{logger}, + name{name}, + typeStr{typeStr}, + properties(std::move(properties)) { + RefreshProperties(); + } + + bool IsPublished() const { + return persistent || retained || publisherCount != 0; + } + + // returns true if properties changed + bool SetProperties(const wpi::json& update); + void RefreshProperties(); + bool SetFlags(unsigned int flags_); + + wpi::Logger& m_logger; // Must be m_logger for WARN macro to work + std::string name; + unsigned int id; + Value lastValue; + ServerClient* lastValueClient = nullptr; + std::string typeStr; + wpi::json properties = wpi::json::object(); + unsigned int publisherCount{0}; + bool persistent{false}; + bool retained{false}; + bool cached{true}; + bool special{false}; + int localTopic{0}; + + void AddPublisher(ServerClient* client, ServerPublisher* pub) { + if (clients[client].publishers.insert(pub).second) { + ++publisherCount; + } + } + + void RemovePublisher(ServerClient* client, ServerPublisher* pub) { + if (clients[client].publishers.erase(pub)) { + --publisherCount; + } + } + + wpi::SmallDenseMap clients; + + // meta topics + ServerTopic* metaPub = nullptr; + ServerTopic* metaSub = nullptr; +}; + +} // namespace nt::server