Skip to content

Commit

Permalink
[ntcore] Move ServerImpl detail classes to separate files
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterJohnson committed Oct 16, 2024
1 parent cc5ce94 commit f086214
Show file tree
Hide file tree
Showing 21 changed files with 1,917 additions and 1,619 deletions.
13 changes: 13 additions & 0 deletions ntcore/src/main/native/cpp/server/Constants.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright(c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.

#pragma once

#include <stdint.h>

namespace nt::server {

inline constexpr uint32_t kMinPeriodMs = 5;

} // namespace nt::server
16 changes: 16 additions & 0 deletions ntcore/src/main/native/cpp/server/Functions.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright(c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.

#pragma once

#include <functional>
#include <string_view>

namespace nt::server {

using SetPeriodicFunc = std::function<void(uint32_t repeatMs)>;
using Connected3Func =
std::function<void(std::string_view name, uint16_t proto)>;

} // namespace nt::server
31 changes: 31 additions & 0 deletions ntcore/src/main/native/cpp/server/MessagePackWriter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright(c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.

#pragma once

#include <stdint.h>

#include <vector>

#include <wpi/MessagePack.h>
#include <wpi/raw_ostream.h>

namespace nt::server {

struct Writer : public mpack::mpack_writer_t {
Writer() {
mpack::mpack_writer_init(this, buf, sizeof(buf));
mpack::mpack_writer_set_context(this, &os);
mpack::mpack_writer_set_flush(
this, [](mpack::mpack_writer_t* w, const char* buffer, size_t count) {
static_cast<wpi::raw_ostream*>(w->context)->write(buffer, count);
});
}

std::vector<uint8_t> bytes;
wpi::raw_uvector_ostream os{bytes};
char buf[128];
};

} // namespace nt::server
61 changes: 61 additions & 0 deletions ntcore/src/main/native/cpp/server/ServerClient.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.

#include "ServerClient.h"

#include <wpi/MessagePack.h>

#include "server/MessagePackWriter.h"
#include "server/ServerImpl.h"
#include "server/ServerPublisher.h"

using namespace nt::server;
using namespace mpack;

void ServerClient::UpdateMetaClientPub() {
if (!m_metaPub) {
return;
}
Writer w;
mpack_start_array(&w, m_publishers.size());
for (auto&& pub : m_publishers) {
mpack_write_object_bytes(
&w, reinterpret_cast<const char*>(pub.second->metaClient.data()),
pub.second->metaClient.size());
}
mpack_finish_array(&w);
if (mpack_writer_destroy(&w) == mpack_ok) {
m_server.SetValue(nullptr, m_metaPub, Value::MakeRaw(std::move(w.bytes)));
}
}

void ServerClient::UpdateMetaClientSub() {
if (!m_metaSub) {
return;
}
Writer w;
mpack_start_array(&w, m_subscribers.size());
for (auto&& sub : m_subscribers) {
mpack_write_object_bytes(
&w, reinterpret_cast<const char*>(sub.second->metaClient.data()),
sub.second->metaClient.size());
}
mpack_finish_array(&w);
if (mpack_writer_destroy(&w) == mpack_ok) {
m_server.SetValue(nullptr, m_metaSub, Value::MakeRaw(std::move(w.bytes)));
}
}

std::span<ServerSubscriber*> ServerClient::GetSubscribers(
std::string_view name, bool special,
wpi::SmallVectorImpl<ServerSubscriber*>& buf) {
buf.resize(0);
for (auto&& subPair : m_subscribers) {
ServerSubscriber* subscriber = subPair.getSecond().get();
if (subscriber->Matches(name, special)) {
buf.emplace_back(subscriber);
}
}
return {buf.data(), buf.size()};
}
95 changes: 95 additions & 0 deletions ntcore/src/main/native/cpp/server/ServerClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright(c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.

#pragma once

#include <stdint.h>

#include <optional>
#include <span>
#include <string_view>

#include <wpi/json_fwd.h>

#include "net/NetworkOutgoingQueue.h"
#include "server/Functions.h"
#include "server/ServerPublisher.h"
#include "server/ServerSubscriber.h"

namespace wpi {
class Logger;
template <typename T>
class SmallVectorImpl;
} // namespace wpi

namespace nt::server {

struct ServerTopic;
class ServerImpl;
struct TopicClientData;

class ServerClient {
public:
ServerClient(std::string_view name, std::string_view connInfo, bool local,
SetPeriodicFunc setPeriodic, ServerImpl& server, int id,
wpi::Logger& logger)
: m_name{name},
m_connInfo{connInfo},
m_local{local},
m_setPeriodic{std::move(setPeriodic)},
m_server{server},
m_id{id},
m_logger{logger} {}
virtual ~ServerClient() = default;

// these return true if any messages have been queued for later processing
virtual bool ProcessIncomingText(std::string_view data) = 0;
virtual bool ProcessIncomingBinary(std::span<const uint8_t> data) = 0;

virtual void SendValue(ServerTopic* topic, const Value& value,
net::ValueSendMode mode) = 0;
virtual void SendAnnounce(ServerTopic* topic, std::optional<int> pubuid) = 0;
virtual void SendUnannounce(ServerTopic* topic) = 0;
virtual void SendPropertiesUpdate(ServerTopic* topic, const wpi::json& update,
bool ack) = 0;
virtual void SendOutgoing(uint64_t curTimeMs, bool flush) = 0;
virtual void Flush() = 0;

// later processing -- returns true if more to process
virtual bool ProcessIncomingMessages(size_t max) = 0;

void UpdateMetaClientPub();
void UpdateMetaClientSub();

std::span<ServerSubscriber*> GetSubscribers(
std::string_view name, bool special,
wpi::SmallVectorImpl<ServerSubscriber*>& buf);

std::string_view GetName() const { return m_name; }
int GetId() const { return m_id; }

virtual void UpdatePeriod(TopicClientData& tcd, ServerTopic* topic) {}

protected:
std::string m_name;
std::string m_connInfo;
bool m_local; // local to machine
SetPeriodicFunc m_setPeriodic;
// TODO: make this per-topic?
uint32_t m_periodMs{UINT32_MAX};
ServerImpl& m_server;
int m_id;

wpi::Logger& m_logger;

wpi::DenseMap<int, std::unique_ptr<ServerPublisher>> m_publishers;
wpi::DenseMap<int, std::unique_ptr<ServerSubscriber>> m_subscribers;

public:
// meta topics
ServerTopic* m_metaPub = nullptr;
ServerTopic* m_metaSub = nullptr;
};

} // namespace nt::server
Loading

0 comments on commit f086214

Please sign in to comment.