Skip to content

Commit

Permalink
Support for BPS interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
wardru committed Aug 15, 2023
1 parent 4cab994 commit ffa2727
Show file tree
Hide file tree
Showing 24 changed files with 593 additions and 59 deletions.
2 changes: 1 addition & 1 deletion examples/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
set(examples
psu
bps
)

foreach(example ${examples})
Expand Down
25 changes: 25 additions & 0 deletions examples/bps.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#include <pza/core/client.hxx>
#include <pza/core/core.hxx>
#include <pza/devices/bps.hxx>

int main(void)
{
pza::core::set_log_level(pza::core::log_level::debug);

pza::client::ptr cli = std::make_shared<pza::client>("localhost", 1883);

if (cli->connect() == -1) {
return -1;
}

pza::bps::ptr bps = std::make_shared<pza::bps>("default", "MY BPS 1");

cli->register_device(bps);

bps->channel[0]->volts;

while (1)
;

return 0;
}
18 changes: 0 additions & 18 deletions examples/psu.cxx

This file was deleted.

12 changes: 12 additions & 0 deletions source/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@ target_sources(${LIBRARY_NAME}
pza/core/core.cxx
pza/core/client.cxx
pza/core/device.cxx
pza/core/united_interface.cxx
pza/core/interface.cxx

pza/utils/json.cxx
pza/utils/string.cxx

pza/devices/bps.cxx

pza/interfaces/power_channel.cxx
pza/interfaces/ampermeter.cxx
pza/interfaces/voltmeter.cxx
pza/interfaces/bps_chan_ctrl.cxx
)

target_include_directories(${LIBRARY_NAME} PUBLIC
Expand Down
143 changes: 111 additions & 32 deletions source/pza/core/client.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ void client::message_arrived(mqtt::const_message_ptr msg)
spdlog::trace("message arrived on topic: {}", msg->get_topic());

if (_listeners.count(msg->get_topic()) > 0) {
spdlog::debug("Direct");
_listeners[msg->get_topic()](msg);
return;
}
Expand All @@ -178,65 +177,145 @@ void client::message_arrived(mqtt::const_message_ptr msg)

int client::scan_devices(void)
{
int ret = 0;
bool ret;
std::condition_variable cv;
std::unique_lock<std::mutex> lock(_mtx);

if (is_connected() == false) {
spdlog::error("scan failed. Not connected to {}", _addr);
return -1;
}

_scan_count_expected = 0;
_scan_results.clear();
_scan_device_count_expected = 0;
_scan_device_results.clear();

spdlog::debug("scanning for devices on {}...", _addr);

_subscribe("pza/+/+/device/atts/info", [&](const mqtt::const_message_ptr &msg) {
std::string base_topic = msg->get_topic().substr(0, msg->get_topic().find("/device/atts/info"));
spdlog::trace("received device info: {} {}", msg->get_topic(), msg->get_payload_str());
_scan_results.emplace(msg->get_topic(), msg->get_payload_str());
_cv.notify_all();
_scan_device_results.emplace(base_topic, msg->get_payload_str());
cv.notify_all();
});

_subscribe("pza/+/platforms/+/atts/info", [&](const mqtt::const_message_ptr &msg) {
std::string payload = msg->get_payload_str();
std::string topic = msg->get_topic();
unsigned int val;

spdlog::trace("received platform info: {}", payload);
try {
nlohmann::json j = nlohmann::json::parse(payload);

auto val = j["info"]["number_of_devices"];
if (val.is_number_unsigned())
_scan_count_expected += val.get<unsigned int>();
else
spdlog::error("invalid payload: {}", payload);
if (pza::json::get_unsigned_int(payload, "info", "number_of_devices", val) == -1) {
spdlog::error("failed to parse platform info: {}", payload);
return;
}
catch (const nlohmann::json::parse_error &exc) {
spdlog::error("failed to parse platform info: {}", exc.what());
}
_cv.notify_all();
_scan_device_count_expected += val;
cv.notify_all();
});

_publish("pza", "*");

if (_cv.wait_for(lock, std::chrono::seconds(_scan_timeout), [&](void) {
return (_scan_count_expected && (_scan_count_expected == _scan_results.size()));
}) == false) {
ret = cv.wait_for(lock, std::chrono::seconds(_scan_timeout), [&](void) {
return (_scan_device_count_expected && (_scan_device_count_expected == _scan_device_results.size()));
});

_unsubscribe("pza/+/+/device/atts/info");
_unsubscribe("pza/+/platforms/+/atts/info");

if (ret == false) {
spdlog::error("timed out waiting for scan results");
spdlog::trace("_scan_count_expected = {}, got = {}", _scan_count_expected, _scan_results.size());
_scan_results.clear();
ret = -1;
spdlog::trace("_scan_device_count_expected = {}, got = {}", _scan_device_count_expected, _scan_device_results.size());
return -1;
}

_scan_count_expected = 0;
_unsubscribe("pza/+/+/device/atts/info");
_unsubscribe("pza/+/platforms/+/atts/info");
spdlog::debug("scan successful, found {} devices", _scan_device_results.size());

if (core::get_log_level() == core::log_level::trace) {
for (auto &it : _scan_device_results) {
spdlog::trace("device: {}", it.first);
}
}

return 0;
}

spdlog::debug("scan complete, found {} devices", _scan_results.size());
int client::_scan_interfaces(std::unique_lock<std::mutex> &lock, const device::ptr &device)
{
bool ret;
std::condition_variable cv;
std::string itf_topic = device->_get_base_topic() + "/+/atts/info";
const std::string &scan_payload = _scan_device_results[device->_get_base_topic()];

for (auto &it : _scan_results) {
spdlog::debug("device: {}", it.first);
if (json::get_unsigned_int(scan_payload, "info", "number_of_interfaces", _scan_itf_count_expected) == -1) {
spdlog::error("Unknown number of interface for device");
return -1;
}

return ret;
}
_scan_itf_results.clear();

_subscribe(itf_topic, [&](const mqtt::const_message_ptr &msg) {
std::string base_topic = msg->get_topic().substr(0, msg->get_topic().find("/atts/info"));
spdlog::trace("received interface info: {} {}", msg->get_topic(), msg->get_payload_str());
base_topic = base_topic.substr(base_topic.find_last_of('/') + 1);
_scan_itf_results.emplace(base_topic, msg->get_payload_str());
cv.notify_all();
});

_publish(device->_get_device_topic(), "*");

ret = cv.wait_for(lock, std::chrono::seconds(_scan_timeout), [&](void) {
return (_scan_itf_count_expected && (_scan_itf_count_expected == _scan_itf_results.size()));
});

_unsubscribe(itf_topic);

if (ret == false) {
spdlog::error("timed out waiting for scan results");
spdlog::trace("_scan_itf_count_expected = {}, got = {}", _scan_itf_count_expected, _scan_itf_results.size());
return -1;
}

spdlog::debug("scan successful, found {} interfaces", _scan_itf_results.size());

if (core::get_log_level() == core::log_level::trace) {
for (auto &it : _scan_itf_results) {
spdlog::trace("interface: {}", it.first);
}
}

return 0;
}

int client::register_device(const device::ptr &device)
{
bool sane = false;
bool ret;
std::condition_variable cv;
std::unique_lock<std::mutex> lock(_mtx);

if (device == nullptr) {
spdlog::error("Device is null");
return -1;
}

if (_scan_device_results.find(device->_get_base_topic()) == _scan_device_results.end()) {
spdlog::error("Device {} was not scanned", device->_get_base_topic());
return -1;
}

_subscribe(device->_get_device_topic() + "/atts/identity", [&](const mqtt::const_message_ptr &msg) {
if (device->_set_identity(msg->get_payload_str()) == 0)
sane = true;
cv.notify_all();
});

ret = cv.wait_for(lock, std::chrono::seconds(_scan_timeout), [&](void) { return (sane); });
if (ret == false) {
spdlog::error("Device is not sane, that's very troubling");
return -1;
}

if (_scan_interfaces(lock, device) == -1)
return -1;

return device->_propagate_interfaces(_scan_itf_results);
}
19 changes: 15 additions & 4 deletions source/pza/core/client.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
#include <nlohmann/json.hpp>
#include <spdlog/spdlog.h>

#include <pza/core/core.hxx>
#include <pza/core/device.hxx>
#include <pza/utils/json.hxx>

namespace pza
{
class client : virtual public mqtt::callback
Expand All @@ -31,6 +35,8 @@ namespace pza
const std::string &get_id(void) const { return _id; }
int get_port(void) const { return _port; }

int register_device(const device::ptr &device);

private:
using listener_map = std::map<std::string, std::function<void(mqtt::const_message_ptr)>>;

Expand All @@ -41,12 +47,16 @@ namespace pza
int _port;
std::string _id;
mqtt::async_client::ptr_t _paho_client;
std::condition_variable _cv;
std::mutex _mtx;
std::map<std::string, std::string> _scan_results;
unsigned int _scan_count_expected = 0;
listener_map _listeners;

std::map<std::string, std::string> _scan_device_results;
unsigned int _scan_device_count_expected = 0;

std::map<std::string, std::string> _scan_itf_results;
unsigned int _scan_itf_count_expected = 0;


void connection_lost(const std::string &cause) override;
void message_arrived(mqtt::const_message_ptr msg) override;

Expand All @@ -58,5 +68,6 @@ namespace pza
std::string _convertPattern(const std::string &fnmatchPattern);
bool _topic_matches(const std::string &str, const std::string &fnmatchPattern);
void _count_devices_to_scan(const std::string &payload);
int _scan_interfaces(std::unique_lock<std::mutex> &lock, const device::ptr &device);
};
};
};
48 changes: 47 additions & 1 deletion source/pza/core/device.cxx
Original file line number Diff line number Diff line change
@@ -1 +1,47 @@
#include "device.hxx"
#include "device.hxx"
#include <pza/core/client.hxx>

using namespace pza;

device::device(const std::string group, const std::string name)
: _name(name),
_group(group),
_base_topic("pza/" + group + "/" + name),
_device_topic(_base_topic + "/device")
{

}

void device::reset()
{
_state = state::orphan;
_model = "";
_manufacturer = "";
}

int device::_set_identity(const std::string &payload)
{
std::string family;

if (json::get_string(payload, "identity", "model", _model) == -1) {
spdlog::error("Device does not have a model");
return -1;
}

if (json::get_string(payload, "identity", "manufacturer", _manufacturer) == -1) {
spdlog::error("Device does not have a manufacturer");
return -1;
}

if (json::get_string(payload, "identity", "family", family) == -1) {
spdlog::error("Device does not have a family");
return -1;
}

if (family != get_family()) {
spdlog::error("Device is not compatible {} != {}", family, get_family());
return -1;
}

return 0;
}
Loading

0 comments on commit ffa2727

Please sign in to comment.