Skip to content

Commit

Permalink
[METADATA] Expose all COMRPC metadata like Channels and Proxies (also…
Browse files Browse the repository at this point in the history
… the private channels)
  • Loading branch information
pwielders committed Oct 11, 2024
1 parent 0161d8e commit 03955fb
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 31 deletions.
8 changes: 6 additions & 2 deletions Source/Thunder/Controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1216,12 +1216,12 @@ namespace Plugin {
return (Core::ERROR_NONE);
}

Core::hresult Controller::Proxies(const uint32_t linkId, IMetadata::Data::IProxiesIterator*& outProxies) const
Core::hresult Controller::Proxies(const Core::OptionalType<string>& linkId, IMetadata::Data::IProxiesIterator*& outProxies) const
{
Core::hresult result = Core::ERROR_UNKNOWN_KEY;

std::vector<IMetadata::Data::Proxy> collection;
bool proxySearch = RPC::Administrator::Instance().Allocations(linkId, [&collection](const std::vector<ProxyStub::UnknownProxy*>& proxies) {
bool proxySearch = RPC::Administrator::Instance().Allocations(linkId.IsSet() ? linkId.Value() : EMPTY_STRING, [&collection](const std::vector<ProxyStub::UnknownProxy*>& proxies) {
for (const auto& proxy : proxies) {
IMetadata::Data::Proxy data;
data.Count = proxy->ReferenceCount();
Expand All @@ -1232,6 +1232,8 @@ namespace Plugin {
}
});

TRACE(Trace::Information, (_T("Found %d proxies to be listed and the search = [%s]"), collection.size(), proxySearch ? _T("true") : _T("false")));

if (proxySearch == true) {
using Iterator = IMetadata::Data::IProxiesIterator;

Expand Down Expand Up @@ -1440,6 +1442,8 @@ namespace Plugin {
buildInfo.ThreadPoolCount = THREADPOOL_COUNT;
#endif

buildInfo.COMRPCTimeOut = RPC::CommunicationTimeOut;

return (Core::ERROR_NONE);
}
}
Expand Down
2 changes: 1 addition & 1 deletion Source/Thunder/Controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ namespace Plugin {

// IMetadata overrides
Core::hresult Links(IMetadata::Data::ILinksIterator*& links) const override;
Core::hresult Proxies(const uint32_t linkId, IMetadata::Data::IProxiesIterator*& proxies) const override;
Core::hresult Proxies(const Core::OptionalType<string>& linkId, IMetadata::Data::IProxiesIterator*& proxies) const override;
Core::hresult Services(const Core::OptionalType<string>& callsign, IMetadata::Data::IServicesIterator*& services) const override;
Core::hresult CallStack(const uint8_t threadId, IMetadata::Data::ICallStackIterator*& callstack) const override;
Core::hresult Threads(IMetadata::Data::IThreadsIterator*& threads) const override;
Expand Down
2 changes: 1 addition & 1 deletion Source/Thunder/PluginHost.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ POP_WARNING()
printf("Link: %s\n", index.Current().Remote.Value().c_str());
printf("------------------------------------------------------------\n");

RPC::Administrator::Instance().Allocations(index.Current().ID.Value(), [](const std::vector<ProxyStub::UnknownProxy*>& proxies) {
RPC::Administrator::Instance().Allocations(index.Current().Name.Value(), [](const std::vector<ProxyStub::UnknownProxy*>& proxies) {
for (const auto& proxy: proxies) {
Core::instance_id instanceId = proxy->Implementation();
printf("[%s] InstanceId: 0x%" PRIx64 ", RefCount: %d, InterfaceId %d [0x%X]\n", proxy->Name().c_str(), static_cast<uint64_t>(instanceId), proxy->ReferenceCount(), proxy->InterfaceId(), proxy->InterfaceId());
Expand Down
30 changes: 24 additions & 6 deletions Source/Thunder/PluginServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -530,10 +530,11 @@ namespace PluginHost {
ExternalAccess& operator=(const ExternalAccess&) = delete;

ExternalAccess(
const Core::NodeId& source,
const string& sourceName,
const Core::NodeId& sourceNode,
const string& proxyStubPath,
const Core::ProxyType<RPC::InvokeServer>& handler)
: RPC::Communicator(source, proxyStubPath, Core::ProxyType<Core::IIPCServer>(handler))
: RPC::Communicator(sourceName, sourceNode, proxyStubPath, Core::ProxyType<Core::IIPCServer>(handler))
, _plugin(nullptr) {
}
~ExternalAccess() override = default;
Expand Down Expand Up @@ -820,7 +821,7 @@ namespace PluginHost {
, _lastId(0)
, _metadata(plugin.MaxRequests.Value())
, _library()
, _external(PluginNodeId(server, plugin), server.ProxyStubPath(), handler)
, _external('/' + Callsign(), PluginNodeId(server, plugin), server.ProxyStubPath(), handler)
, _administrator(administrator)
, _composit(*this)
, _jobs(administrator)
Expand Down Expand Up @@ -853,6 +854,9 @@ namespace PluginHost {
}

public:
inline const RPC::Communicator& COMServer() const {
return (_external);
}
inline void Submit(Core::ProxyType<Core::IDispatch>&& job) {
_jobs.Push(std::move(job));
}
Expand Down Expand Up @@ -2203,7 +2207,7 @@ namespace PluginHost {
const uint8_t hardKillCheckWaitTime,
const bool delegatedReleases,
const Core::ProxyType<RPC::InvokeServer>& handler)
: RPC::Communicator(node, ProxyStubPathCreator(proxyStubPath, observableProxyStubPath), Core::ProxyType<Core::IIPCServer>(handler))
: RPC::Communicator(_T("/"), node, ProxyStubPathCreator(proxyStubPath, observableProxyStubPath), Core::ProxyType<Core::IIPCServer>(handler))
, _parent(parent)
, _persistentPath(persistentPath)
, _systemPath(systemPath)
Expand Down Expand Up @@ -3191,10 +3195,24 @@ namespace PluginHost {

entry.Activity = element.Source().IsOpen();
entry.State = Metadata::Channel::state::COMRPC;
entry.Name = string("/" EXPAND_AND_QUOTE(APPLICATION_NAME) "/Communicator");
entry.Name = element.Extension().Origin();
entry.Remote = element.Source().RemoteId();
});
_adminLock.Unlock();

for (const auto& entry : _services) {
entry.second->COMServer().Visit([&](const RPC::Communicator::Client& element)
{
Metadata::Channel& entry = metaData.Add();
entry.ID = element.Extension().Id();

entry.Activity = element.Source().IsOpen();
entry.State = Metadata::Channel::state::COMRPC;
entry.Name = element.Extension().Origin();
entry.Remote = element.Source().RemoteId();
});
}

_adminLock.Unlock();
}
uint32_t FromIdentifier(const string& callSign, Core::ProxyType<IShell>& service)
{
Expand Down
4 changes: 4 additions & 0 deletions Source/com/Administrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,10 @@ namespace RPC {
return(index != _stubs.end() ? index->second->Convert(rawImplementation) : nullptr);
}

bool Administrator::IsRequestedLink(const ProxyStub::UnknownProxy* proxy, const string& id) const {
return (proxy->LinkId() == id);
}

void Administrator::DeleteChannel(const Core::ProxyType<Core::IPCChannel>& channel, Proxies& pendingProxies)
{
_adminLock.Lock();
Expand Down
13 changes: 10 additions & 3 deletions Source/com/Administrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,20 +149,26 @@ namespace RPC {
}

template<typename ACTION>
bool Allocations(const uint32_t id, ACTION&& action) const {
bool Allocations(const string& linkId, ACTION&& action) const {
bool found = false;
_adminLock.Lock();
if (id == 0) {
if (linkId.empty() == true) {
for (const auto& proxy : _channelProxyMap) {
action(proxy.second);
}
action(_danglingProxies);
found = true;
}
else if (linkId == _T("/Dangling")) {
action(_danglingProxies);
found = true;
}
else {
ChannelMap::const_iterator index(_channelProxyMap.begin());
while ((found == false) && (index != _channelProxyMap.end())) {
if (index->first != id) {
ASSERT(index->second.size() != 0);

if (IsRequestedLink(index->second[0], linkId) == true) {
index++;
}
else {
Expand Down Expand Up @@ -305,6 +311,7 @@ POP_WARNING()
// ----------------------------------------------------------------------------------------------------
// Methods for the Stub Environment
// ----------------------------------------------------------------------------------------------------
bool IsRequestedLink(const ProxyStub::UnknownProxy* proxy, const string& id) const;
Core::IUnknown* Convert(void* rawImplementation, const uint32_t id);
const Core::IUnknown* Convert(void* rawImplementation, const uint32_t id) const;
void RegisterUnknown(const Core::ProxyType<Core::IPCChannel>& channel, Core::IUnknown* source, const uint32_t id);
Expand Down
9 changes: 6 additions & 3 deletions Source/com/Communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,9 @@ namespace RPC {
uint8_t Communicator::_hardKillCheckWaitTime = 4;

PUSH_WARNING(DISABLE_WARNING_THIS_IN_MEMBER_INITIALIZER_LIST)
Communicator::Communicator(const Core::NodeId& node, const string& proxyStubPath)
: _connectionMap(*this)
Communicator::Communicator(const string& source, const Core::NodeId& node, const string& proxyStubPath)
: _source(source)
, _connectionMap(*this)
, _ipcServer(node, _connectionMap, proxyStubPath) {
if (proxyStubPath.empty() == false) {
RPC::LoadProxyStubs(proxyStubPath);
Expand All @@ -374,10 +375,12 @@ namespace RPC {
}

Communicator::Communicator(
const string& source,
const Core::NodeId& node,
const string& proxyStubPath,
const Core::ProxyType<Core::IIPCServer>& handler)
: _connectionMap(*this)
: _source(source)
, _connectionMap(*this)
, _ipcServer(node, _connectionMap, proxyStubPath, handler) {
if (proxyStubPath.empty() == false) {
RPC::LoadProxyStubs(proxyStubPath);
Expand Down
12 changes: 12 additions & 0 deletions Source/com/Communicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,9 @@ namespace RPC {
{
return _id;
}
const string& Origin() const {
return (_connectionMap->Origin());
}

private:
// Non ref-counted reference to our parent, of which we are a composit :-)
Expand Down Expand Up @@ -1187,6 +1190,9 @@ namespace RPC {
}

public:
const string& Origin() const {
return (_parent.Origin());
}
inline void Register(RPC::IRemoteConnection::INotification* sink)
{
ASSERT(sink != nullptr);
Expand Down Expand Up @@ -1650,9 +1656,11 @@ POP_WARNING()
Communicator& operator=(const Communicator&) = delete;

Communicator(
const string& source,
const Core::NodeId& node,
const string& proxyStubPath);
Communicator(
const string& source,
const Core::NodeId& node,
const string& proxyStubPath,
const Core::ProxyType<Core::IIPCServer>& handler);
Expand All @@ -1669,6 +1677,9 @@ POP_WARNING()
void Visit(ACTION&& action) const {
_ipcServer.Visit(action);
}
const string& Origin() const {
return (_source);
}
inline bool IsListening() const
{
return (_ipcServer.IsListening());
Expand Down Expand Up @@ -1753,6 +1764,7 @@ POP_WARNING()
}

private:
const string _source;
RemoteConnectionMap _connectionMap;
ChannelServer _ipcServer;
static uint8_t _softKillCheckWaitTime;
Expand Down
23 changes: 18 additions & 5 deletions Source/com/IUnknown.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,20 +99,33 @@ namespace ProxyStub {
// -------------------------------------------------------------------------------------------
// PROXY
// -------------------------------------------------------------------------------------------
const Core::SocketPort* UnknownProxy::Socket() const
string UnknownProxy::LinkId() const
{
const Core::SocketPort* result = nullptr;

_adminLock.Lock();
if (_channel.IsValid() == true) {
const RPC::Communicator::Client* comchannel = dynamic_cast<const RPC::Communicator::Client*>(_channel.operator->());
ASSERT(comChannel != nullptr);
if (comchannel != nullptr) {
result = &(comchannel->Source());
return (comchannel->Extension().Origin());
}
}
_adminLock.Unlock();

return (result);
return (_T("/Dangling"));
}
uint32_t UnknownProxy::Id() const
{
uint32_t id = 0;
if (_channel.IsValid() == true) {
const RPC::Communicator::Client* comchannel = dynamic_cast<const RPC::Communicator::Client*>(_channel.operator->());
ASSERT(comChannel != nullptr);
if (comchannel != nullptr) {
const Core::IResource* result = &(comchannel->Source());
ASSERT(result != nullptr);
id = result->Descriptor();
}
}
return (id);
}

static class UnknownInstantiation {
Expand Down
9 changes: 3 additions & 6 deletions Source/com/IUnknown.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ namespace ProxyStub {
{
return (&_parent);
}
const Core::SocketPort* Socket() const;
string LinkId() const;
void* Interface(const Core::instance_id& implementation, const uint32_t id) const
{
void* result = nullptr;
Expand Down Expand Up @@ -426,16 +426,13 @@ namespace ProxyStub {

return (_parent.QueryInterface(id));
}
// The RPC::Administrator uses this to identifiy to what link this
// The RPC::Administrator uses this to identifiy to what link this
// proxy belongs. The LinkId is always called within the lock of the
// RPC::Administrator, and since it is onl used and called from there
// and the clearing of the _channel is also only called from there,
// Invalidate(), It is safe to use it on the _channel in an unlocked
// fashion!!
uint32_t Id() const
{
return (_channel.IsValid() ? _channel->Id() : 0);
}
uint32_t Id() const;
void Invalidate() {
ASSERT(_refCount > 0);
_adminLock.Lock();
Expand Down
9 changes: 5 additions & 4 deletions Source/plugins/IController.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ namespace Controller {
uint8_t InstanceIDBits /* @brief Core instance bits */;
Core::OptionalType<uint8_t> TraceLevel /* @brief Trace level */;
uint8_t ThreadPoolCount /* Number of configured threads on the threadpool */;
uint32_t COMRPCTimeOut /* The number of milliseconds a COMRPC call can take before it is assumed to fail */;
};

struct CallStack {
Expand Down Expand Up @@ -294,9 +295,9 @@ namespace Controller {

string Remote /* @brief IP address (or FQDN) of the other side of the connection */;
state State /* @brief State of the link */;
Core::OptionalType<string> Name /* @brief Name of the connection */;
uint32_t Id /* @brief A unique number identifying the connection */;
bool Activity /* @brief Denotes if there was any activity on this connection */;
Core::OptionalType<string> Name /* @brief Name of the connection */;
};

struct Service {
Expand Down Expand Up @@ -353,13 +354,13 @@ namespace Controller {
// @details If callsign is omitted, metadata of all services is returned.
virtual Core::hresult Services(const Core::OptionalType<string>& callsign /* @index */, Data::IServicesIterator*& services /* @out @extract */) const = 0;

// @property
// @brief Connections list
// @property @deprecated
// @brief Connections list of Thunder connections
virtual Core::hresult Links(Data::ILinksIterator*& links /* @out */) const = 0;

// @property
// @brief Proxies list
virtual Core::hresult Proxies(const uint32_t linkID /* @index */, Data::IProxiesIterator*& proxies /* @out */) const = 0;
virtual Core::hresult Proxies(const Core::OptionalType<string>& linkID /* @index */, Data::IProxiesIterator*& proxies /* @out */) const = 0;

// @property
// @brief Framework version
Expand Down

0 comments on commit 03955fb

Please sign in to comment.