diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index f711def..fca323a 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -1,5 +1,5 @@ set(examples - psu + bps ) foreach(example ${examples}) diff --git a/examples/bps.cxx b/examples/bps.cxx new file mode 100644 index 0000000..252905c --- /dev/null +++ b/examples/bps.cxx @@ -0,0 +1,25 @@ +#include +#include +#include + +int main(void) +{ + pza::core::set_log_level(pza::core::log_level::debug); + + pza::client::ptr cli = std::make_shared("localhost", 1883); + + if (cli->connect() == -1) { + return -1; + } + + pza::bps::ptr bps = std::make_shared("default", "MY BPS 1"); + + cli->register_device(bps); + + bps->channel[0]->volts; + + while (1) + ; + + return 0; +} diff --git a/examples/psu.cxx b/examples/psu.cxx deleted file mode 100644 index 9830e3e..0000000 --- a/examples/psu.cxx +++ /dev/null @@ -1,18 +0,0 @@ -#include -#include - -int main(void) -{ - pza::core::set_log_level(pza::core::log_level::trace); - - pza::client::ptr client = std::make_shared("localhost", 1883); - - if (client->connect() == -1) { - return -1; - } - - while (1) - ; - - return 0; -} diff --git a/source/CMakeLists.txt b/source/CMakeLists.txt index ac0dd7e..2d39e35 100644 --- a/source/CMakeLists.txt +++ b/source/CMakeLists.txt @@ -11,6 +11,18 @@ target_sources(${LIBRARY_NAME} pza/core/core.cxx pza/core/client.cxx pza/core/device.cxx + pza/core/united_interface.cxx + pza/core/interface.cxx + + pza/utils/json.cxx + pza/utils/string.cxx + + pza/devices/bps.cxx + + pza/interfaces/power_channel.cxx + pza/interfaces/ampermeter.cxx + pza/interfaces/voltmeter.cxx + pza/interfaces/bps_chan_ctrl.cxx ) target_include_directories(${LIBRARY_NAME} PUBLIC diff --git a/source/pza/core/client.cxx b/source/pza/core/client.cxx index d811621..2fd353e 100644 --- a/source/pza/core/client.cxx +++ b/source/pza/core/client.cxx @@ -164,7 +164,6 @@ void client::message_arrived(mqtt::const_message_ptr msg) spdlog::trace("message arrived on topic: {}", msg->get_topic()); if (_listeners.count(msg->get_topic()) > 0) { - spdlog::debug("Direct"); _listeners[msg->get_topic()](msg); return; } @@ -178,7 +177,8 @@ void client::message_arrived(mqtt::const_message_ptr msg) int client::scan_devices(void) { - int ret = 0; + bool ret; + std::condition_variable cv; std::unique_lock lock(_mtx); if (is_connected() == false) { @@ -186,57 +186,136 @@ int client::scan_devices(void) return -1; } - _scan_count_expected = 0; - _scan_results.clear(); + _scan_device_count_expected = 0; + _scan_device_results.clear(); spdlog::debug("scanning for devices on {}...", _addr); _subscribe("pza/+/+/device/atts/info", [&](const mqtt::const_message_ptr &msg) { + std::string base_topic = msg->get_topic().substr(0, msg->get_topic().find("/device/atts/info")); spdlog::trace("received device info: {} {}", msg->get_topic(), msg->get_payload_str()); - _scan_results.emplace(msg->get_topic(), msg->get_payload_str()); - _cv.notify_all(); + _scan_device_results.emplace(base_topic, msg->get_payload_str()); + cv.notify_all(); }); _subscribe("pza/+/platforms/+/atts/info", [&](const mqtt::const_message_ptr &msg) { std::string payload = msg->get_payload_str(); std::string topic = msg->get_topic(); + unsigned int val; spdlog::trace("received platform info: {}", payload); - try { - nlohmann::json j = nlohmann::json::parse(payload); - - auto val = j["info"]["number_of_devices"]; - if (val.is_number_unsigned()) - _scan_count_expected += val.get(); - else - spdlog::error("invalid payload: {}", payload); + if (pza::json::get_unsigned_int(payload, "info", "number_of_devices", val) == -1) { + spdlog::error("failed to parse platform info: {}", payload); + return; } - catch (const nlohmann::json::parse_error &exc) { - spdlog::error("failed to parse platform info: {}", exc.what()); - } - _cv.notify_all(); + _scan_device_count_expected += val; + cv.notify_all(); }); _publish("pza", "*"); - if (_cv.wait_for(lock, std::chrono::seconds(_scan_timeout), [&](void) { - return (_scan_count_expected && (_scan_count_expected == _scan_results.size())); - }) == false) { + ret = cv.wait_for(lock, std::chrono::seconds(_scan_timeout), [&](void) { + return (_scan_device_count_expected && (_scan_device_count_expected == _scan_device_results.size())); + }); + + _unsubscribe("pza/+/+/device/atts/info"); + _unsubscribe("pza/+/platforms/+/atts/info"); + + if (ret == false) { spdlog::error("timed out waiting for scan results"); - spdlog::trace("_scan_count_expected = {}, got = {}", _scan_count_expected, _scan_results.size()); - _scan_results.clear(); - ret = -1; + spdlog::trace("_scan_device_count_expected = {}, got = {}", _scan_device_count_expected, _scan_device_results.size()); + return -1; } - _scan_count_expected = 0; - _unsubscribe("pza/+/+/device/atts/info"); - _unsubscribe("pza/+/platforms/+/atts/info"); + spdlog::debug("scan successful, found {} devices", _scan_device_results.size()); + + if (core::get_log_level() == core::log_level::trace) { + for (auto &it : _scan_device_results) { + spdlog::trace("device: {}", it.first); + } + } + + return 0; +} - spdlog::debug("scan complete, found {} devices", _scan_results.size()); +int client::_scan_interfaces(std::unique_lock &lock, const device::ptr &device) +{ + bool ret; + std::condition_variable cv; + std::string itf_topic = device->_get_base_topic() + "/+/atts/info"; + const std::string &scan_payload = _scan_device_results[device->_get_base_topic()]; - for (auto &it : _scan_results) { - spdlog::debug("device: {}", it.first); + if (json::get_unsigned_int(scan_payload, "info", "number_of_interfaces", _scan_itf_count_expected) == -1) { + spdlog::error("Unknown number of interface for device"); + return -1; } - return ret; -} \ No newline at end of file + _scan_itf_results.clear(); + + _subscribe(itf_topic, [&](const mqtt::const_message_ptr &msg) { + std::string base_topic = msg->get_topic().substr(0, msg->get_topic().find("/atts/info")); + spdlog::trace("received interface info: {} {}", msg->get_topic(), msg->get_payload_str()); + base_topic = base_topic.substr(base_topic.find_last_of('/') + 1); + _scan_itf_results.emplace(base_topic, msg->get_payload_str()); + cv.notify_all(); + }); + + _publish(device->_get_device_topic(), "*"); + + ret = cv.wait_for(lock, std::chrono::seconds(_scan_timeout), [&](void) { + return (_scan_itf_count_expected && (_scan_itf_count_expected == _scan_itf_results.size())); + }); + + _unsubscribe(itf_topic); + + if (ret == false) { + spdlog::error("timed out waiting for scan results"); + spdlog::trace("_scan_itf_count_expected = {}, got = {}", _scan_itf_count_expected, _scan_itf_results.size()); + return -1; + } + + spdlog::debug("scan successful, found {} interfaces", _scan_itf_results.size()); + + if (core::get_log_level() == core::log_level::trace) { + for (auto &it : _scan_itf_results) { + spdlog::trace("interface: {}", it.first); + } + } + + return 0; +} + +int client::register_device(const device::ptr &device) +{ + bool sane = false; + bool ret; + std::condition_variable cv; + std::unique_lock lock(_mtx); + + if (device == nullptr) { + spdlog::error("Device is null"); + return -1; + } + + if (_scan_device_results.find(device->_get_base_topic()) == _scan_device_results.end()) { + spdlog::error("Device {} was not scanned", device->_get_base_topic()); + return -1; + } + + _subscribe(device->_get_device_topic() + "/atts/identity", [&](const mqtt::const_message_ptr &msg) { + if (device->_set_identity(msg->get_payload_str()) == 0) + sane = true; + cv.notify_all(); + }); + + ret = cv.wait_for(lock, std::chrono::seconds(_scan_timeout), [&](void) { return (sane); }); + if (ret == false) { + spdlog::error("Device is not sane, that's very troubling"); + return -1; + } + + if (_scan_interfaces(lock, device) == -1) + return -1; + + return device->_propagate_interfaces(_scan_itf_results); +} diff --git a/source/pza/core/client.hxx b/source/pza/core/client.hxx index 9d6acc2..8aa7359 100644 --- a/source/pza/core/client.hxx +++ b/source/pza/core/client.hxx @@ -10,6 +10,10 @@ #include #include +#include +#include +#include + namespace pza { class client : virtual public mqtt::callback @@ -31,6 +35,8 @@ namespace pza const std::string &get_id(void) const { return _id; } int get_port(void) const { return _port; } + int register_device(const device::ptr &device); + private: using listener_map = std::map>; @@ -41,12 +47,16 @@ namespace pza int _port; std::string _id; mqtt::async_client::ptr_t _paho_client; - std::condition_variable _cv; std::mutex _mtx; - std::map _scan_results; - unsigned int _scan_count_expected = 0; listener_map _listeners; + std::map _scan_device_results; + unsigned int _scan_device_count_expected = 0; + + std::map _scan_itf_results; + unsigned int _scan_itf_count_expected = 0; + + void connection_lost(const std::string &cause) override; void message_arrived(mqtt::const_message_ptr msg) override; @@ -58,5 +68,6 @@ namespace pza std::string _convertPattern(const std::string &fnmatchPattern); bool _topic_matches(const std::string &str, const std::string &fnmatchPattern); void _count_devices_to_scan(const std::string &payload); + int _scan_interfaces(std::unique_lock &lock, const device::ptr &device); }; -}; \ No newline at end of file +}; diff --git a/source/pza/core/device.cxx b/source/pza/core/device.cxx index 06106a4..b31c34c 100644 --- a/source/pza/core/device.cxx +++ b/source/pza/core/device.cxx @@ -1 +1,47 @@ -#include "device.hxx" \ No newline at end of file +#include "device.hxx" +#include + +using namespace pza; + +device::device(const std::string group, const std::string name) + : _name(name), + _group(group), + _base_topic("pza/" + group + "/" + name), + _device_topic(_base_topic + "/device") +{ + +} + +void device::reset() +{ + _state = state::orphan; + _model = ""; + _manufacturer = ""; +} + +int device::_set_identity(const std::string &payload) +{ + std::string family; + + if (json::get_string(payload, "identity", "model", _model) == -1) { + spdlog::error("Device does not have a model"); + return -1; + } + + if (json::get_string(payload, "identity", "manufacturer", _manufacturer) == -1) { + spdlog::error("Device does not have a manufacturer"); + return -1; + } + + if (json::get_string(payload, "identity", "family", family) == -1) { + spdlog::error("Device does not have a family"); + return -1; + } + + if (family != get_family()) { + spdlog::error("Device is not compatible {} != {}", family, get_family()); + return -1; + } + + return 0; +} \ No newline at end of file diff --git a/source/pza/core/device.hxx b/source/pza/core/device.hxx index a26b753..2b0d381 100644 --- a/source/pza/core/device.hxx +++ b/source/pza/core/device.hxx @@ -1,10 +1,56 @@ #pragma once +#include +#include +#include +#include + +#include + namespace pza { - class device + class client; + + class device { public: - private: + using ptr = std::shared_ptr; + + friend class client; + + enum class state : unsigned int + { + orphan = 0, + init, + running + }; + + const std::string &get_model() { return _model; } + const std::string &get_manufacturer() { return _manufacturer; } + const std::shared_ptr get_client() { return _cli; } + virtual const std::string get_family() = 0; + + void reset(); + enum state get_state() { return _state; } + + protected: + device(const std::string group, const std::string name); + + virtual int _propagate_interfaces(const std::map &map) = 0; + int _set_identity(const std::string &payload); + const std::string &_get_base_topic() { return _base_topic; } + const std::string &_get_device_topic() { return _device_topic; } + + std::shared_ptr _cli = nullptr; + + std::string _name; + std::string _group; + std::string _model; + std::string _manufacturer; + + std::string _base_topic; + std::string _device_topic; + + enum state _state = state::orphan; }; -}; \ No newline at end of file +}; diff --git a/source/pza/core/united_interface.cxx b/source/pza/core/united_interface.cxx new file mode 100644 index 0000000..2d86087 --- /dev/null +++ b/source/pza/core/united_interface.cxx @@ -0,0 +1 @@ +#include "united_interface.hxx" \ No newline at end of file diff --git a/source/pza/core/united_interface.hxx b/source/pza/core/united_interface.hxx new file mode 100644 index 0000000..7d861d5 --- /dev/null +++ b/source/pza/core/united_interface.hxx @@ -0,0 +1,21 @@ +#pragma once + +#include "interface.hxx" + +#include + +namespace pza +{ + class device; + + class united_interface + { + public: + using ptr = std::shared_ptr; + + virtual int _add_interface(const std::pair &elem) = 0; + + protected: + size_t _size; + }; +}; \ No newline at end of file diff --git a/source/pza/devices/bps.cxx b/source/pza/devices/bps.cxx new file mode 100644 index 0000000..8d09eae --- /dev/null +++ b/source/pza/devices/bps.cxx @@ -0,0 +1,22 @@ +#include "bps.hxx" + +using namespace pza; + +bps::bps(const std::string group, const std::string name) + : device(group, name), + _num_channels(0) +{ + +} + +int bps::_propagate_interfaces(const std::map &map) +{ + int n_channels = 0; + int chan = 0; + std::string s; + + for (auto it = map.begin(); it != map.end() && (*it).first[0] == ':'; ++it) { + if (channel._add_interface(*it) == -1) + return -1; + } +} \ No newline at end of file diff --git a/source/pza/devices/bps.hxx b/source/pza/devices/bps.hxx new file mode 100644 index 0000000..c07734a --- /dev/null +++ b/source/pza/devices/bps.hxx @@ -0,0 +1,28 @@ +#pragma once + +#include +#include + +#include + +namespace pza +{ + class bps : public device + { + public: + using ptr = std::shared_ptr; + + bps(const std::string group, const std::string name); + + const std::string get_family() { return "bps"; } + + size_t get_num_channels() { return _num_channels; } + + power_channel channel; + + private: + int _propagate_interfaces(const std::map &map) override; + + size_t _num_channels; + }; +}; diff --git a/source/pza/interfaces/ampermeter.cxx b/source/pza/interfaces/ampermeter.cxx new file mode 100644 index 0000000..2d68aad --- /dev/null +++ b/source/pza/interfaces/ampermeter.cxx @@ -0,0 +1 @@ +#include "ampermeter.hxx" \ No newline at end of file diff --git a/source/pza/interfaces/ampermeter.hxx b/source/pza/interfaces/ampermeter.hxx new file mode 100644 index 0000000..d4d4ad7 --- /dev/null +++ b/source/pza/interfaces/ampermeter.hxx @@ -0,0 +1,13 @@ +#pragma once + +#include + +namespace pza +{ + class ampermeter + { + public: + using ptr = std::shared_ptr; + private: + }; +}; \ No newline at end of file diff --git a/source/pza/interfaces/bps_chan_ctrl.cxx b/source/pza/interfaces/bps_chan_ctrl.cxx new file mode 100644 index 0000000..6c0e0bf --- /dev/null +++ b/source/pza/interfaces/bps_chan_ctrl.cxx @@ -0,0 +1 @@ +#include "bps_chan_ctrl.hxx" \ No newline at end of file diff --git a/source/pza/interfaces/bps_chan_ctrl.hxx b/source/pza/interfaces/bps_chan_ctrl.hxx new file mode 100644 index 0000000..f4ecb92 --- /dev/null +++ b/source/pza/interfaces/bps_chan_ctrl.hxx @@ -0,0 +1,13 @@ +#pragma once + +#include + +namespace pza +{ + class bps_chan_ctrl + { + public: + using ptr = std::shared_ptr; + private: + }; +}; \ No newline at end of file diff --git a/source/pza/interfaces/power_channel.cxx b/source/pza/interfaces/power_channel.cxx new file mode 100644 index 0000000..3bc915d --- /dev/null +++ b/source/pza/interfaces/power_channel.cxx @@ -0,0 +1,36 @@ +#include "power_channel.hxx" + +using namespace pza; + +int power_channel::_add_interface(const std::pair &elem) +{ + const std::string &name = elem.first; + size_t chan_id; + size_t pos; + std::string type; + + pos = name.find_first_of('_') + 1; + chan_id = std::stoi(name.substr(pos, name.find_last_of(':') - pos)); + if (pza::json::get_string(elem.second, "info", "type", type) == -1) + return -1; + + if (_channels.size() <= chan_id) { + _channels.resize(chan_id + 1); + } + + if (type == "ammeter") + _channels[chan_id].amps = std::make_shared(); + else if (type == "voltmeter") + _channels[chan_id].volts = std::make_shared(); + else if (type == "bps_control") + _channels[chan_id].ctrl = std::make_shared(); + + return 0; +} + +const power_channel::s_channel *power_channel::operator[](size_t index) const +{ + if (index >= _channels.size()) + throw std::out_of_range("index out of range"); + return &_channels.at(index); +} \ No newline at end of file diff --git a/source/pza/interfaces/power_channel.hxx b/source/pza/interfaces/power_channel.hxx new file mode 100644 index 0000000..ab3faff --- /dev/null +++ b/source/pza/interfaces/power_channel.hxx @@ -0,0 +1,33 @@ +#pragma once + +#include + +#include + +#include +#include +#include + +#include +#include + +namespace pza +{ + class power_channel : public united_interface + { + public: + struct s_channel + { + voltmeter::ptr volts; + ampermeter::ptr amps; + bps_chan_ctrl::ptr ctrl; + }; + + const s_channel *operator[](size_t index) const; + + int _add_interface(const std::pair &elem) override; + + private: + std::vector _channels; + }; +}; \ No newline at end of file diff --git a/source/pza/interfaces/voltmeter.cxx b/source/pza/interfaces/voltmeter.cxx new file mode 100644 index 0000000..2f67e6b --- /dev/null +++ b/source/pza/interfaces/voltmeter.cxx @@ -0,0 +1 @@ +#include "voltmeter.hxx" \ No newline at end of file diff --git a/source/pza/interfaces/voltmeter.hxx b/source/pza/interfaces/voltmeter.hxx new file mode 100644 index 0000000..579a063 --- /dev/null +++ b/source/pza/interfaces/voltmeter.hxx @@ -0,0 +1,14 @@ +#pragma once + +#include + +namespace pza +{ + class voltmeter + { + public: + using ptr = std::shared_ptr; + private: + + }; +}; \ No newline at end of file diff --git a/source/pza/utils/json.cxx b/source/pza/utils/json.cxx new file mode 100644 index 0000000..1ae8137 --- /dev/null +++ b/source/pza/utils/json.cxx @@ -0,0 +1,112 @@ +#include "json.hxx" + +using namespace pza; +using namespace json; + +int json::_parse(const std::string &payload, nlohmann::json &json) +{ + try { + json = nlohmann::json::parse(payload); + } catch (nlohmann::json::parse_error &e) { + return -1; + } + return 0; +} + +int json::get_string(const std::string &payload, const std::string &atts, const std::string &key, std::string &str) +{ + nlohmann::json json; + if (_parse(payload, json) < 0) { + return -1; + } + try { + str = json[atts][key].get(); + } catch (nlohmann::json::type_error &e) { + return -1; + } + return 0; +} + +int json::get_int(const std::string &payload, const std::string &atts, const std::string &key, int &i) +{ + nlohmann::json json; + if (_parse(payload, json) < 0) { + return -1; + } + try { + i = json[atts][key].get(); + } catch (nlohmann::json::type_error &e) { + return -1; + } + return 0; +} + +int json::get_unsigned_int(const std::string &payload, const std::string &atts, const std::string &key, unsigned &u) +{ + nlohmann::json json; + if (_parse(payload, json) < 0) { + return -1; + } + try { + u = json[atts][key].get(); + } catch (nlohmann::json::type_error &e) { + return -1; + } + return 0; +} + +int json::get_double(const std::string &payload, const std::string &atts, const std::string &key, double &f) +{ + nlohmann::json json; + if (_parse(payload, json) < 0) { + return -1; + } + try { + f = json[atts][key].get(); + } catch (nlohmann::json::type_error &e) { + return -1; + } + return 0; +} + +int json::get_bool(const std::string &payload, const std::string &atts, const std::string &key, bool &b) +{ + nlohmann::json json; + if (_parse(payload, json) < 0) { + return -1; + } + try { + b = json[atts][key].get(); + } catch (nlohmann::json::type_error &e) { + return -1; + } + return 0; +} + +int json::get_array(const std::string &payload, const std::string &atts, const std::string &key, nlohmann::json &json) +{ + nlohmann::json j; + if (_parse(payload, j) < 0) { + return -1; + } + try { + json = j[atts][key]; + } catch (nlohmann::json::type_error &e) { + return -1; + } + return 0; +} + +int json::get_object(const std::string &payload, const std::string &atts, const std::string &key, nlohmann::json &json) +{ + nlohmann::json j; + if (_parse(payload, j) < 0) { + return -1; + } + try { + json = j[atts][key]; + } catch (nlohmann::json::type_error &e) { + return -1; + } + return 0; +} \ No newline at end of file diff --git a/source/pza/utils/json.hxx b/source/pza/utils/json.hxx new file mode 100644 index 0000000..72ecec6 --- /dev/null +++ b/source/pza/utils/json.hxx @@ -0,0 +1,18 @@ +#pragma once + +#include + +namespace pza +{ + namespace json + { + int get_string(const std::string &payload, const std::string &atts, const std::string &key, std::string &str); + int get_int(const std::string &payload, const std::string &atts, const std::string &key, int &i); + int get_unsigned_int(const std::string &payload, const std::string &atts, const std::string &key, unsigned &u); + int get_double(const std::string &payload, const std::string &atts, const std::string &key, double &f); + int get_bool(const std::string &payload, const std::string &atts, const std::string &key, bool &b); + int get_array(const std::string &payload, const std::string &atts, const std::string &key, nlohmann::json &json); + int get_object(const std::string &payload, const std::string &atts, const std::string &key, nlohmann::json &json); + int _parse(const std::string &payload, nlohmann::json &json); + }; +}; diff --git a/source/pza/utils/string.cxx b/source/pza/utils/string.cxx new file mode 100644 index 0000000..03d0b90 --- /dev/null +++ b/source/pza/utils/string.cxx @@ -0,0 +1,8 @@ +#include "string.hxx" + +using namespace pza; + +bool string::starts_with(const std::string &s, const std::string &prefix) +{ + return (s.rfind(prefix, 0) == 0); +} \ No newline at end of file diff --git a/source/pza/utils/string.hxx b/source/pza/utils/string.hxx new file mode 100644 index 0000000..3482756 --- /dev/null +++ b/source/pza/utils/string.hxx @@ -0,0 +1,11 @@ +#pragma once + +#include + +namespace pza +{ + namespace string + { + bool starts_with(const std::string &s, const std::string &prefix); + }; +}; \ No newline at end of file