Skip to content

Commit

Permalink
Moved qserver zmq listener socket to netstreamer thread since we were…
Browse files Browse the repository at this point in the history
… getting seg faults with two threads
  • Loading branch information
Arthur Glowacki committed Nov 18, 2024
1 parent 7160865 commit 4efbb57
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 114 deletions.
119 changes: 27 additions & 92 deletions src/mvc/BlueskyComm.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,20 @@
//---------------------------------------------------------------------------


class BlueskyComm : public QThread
class BlueskyComm
{

Q_OBJECT

public:

/**
* Constructor.
*/
BlueskyComm(QString str_ip, QObject* parent = nullptr) : QThread(parent)
BlueskyComm(zmq::context_t *context, QString str_ip)
{

std::string conn_str = "tcp://"+str_ip.toStdString()+":60615";
std::string lsn_str = "tcp://"+str_ip.toStdString()+":60625";
_context = new zmq::context_t(1);
_zmq_comm_socket = new zmq::socket_t(*_context, ZMQ_REQ);
_zmq_comm_socket = new zmq::socket_t(*context, ZMQ_REQ);
_zmq_comm_socket->connect(conn_str);
_zmq_lsn_socket = new zmq::socket_t(*_context, ZMQ_SUB);
_zmq_lsn_socket->connect(lsn_str);
_zmq_lsn_socket->set(zmq::sockopt::subscribe, "QS_Console");
_zmq_lsn_socket->set(zmq::sockopt::rcvtimeo, 1000); //set timeout to 1000ms

}

//---------------------------------------------------------------------------
Expand All @@ -57,19 +48,7 @@ class BlueskyComm : public QThread
_zmq_comm_socket->close();
delete _zmq_comm_socket;
}
if(_zmq_lsn_socket != nullptr)
{
_zmq_lsn_socket->close();
delete _zmq_lsn_socket;
}
if (_context != nullptr)
{
_context->close();
delete _context;
}
_zmq_comm_socket = nullptr;
_zmq_lsn_socket = nullptr;
_context = nullptr;
}

//---------------------------------------------------------------------------
Expand Down Expand Up @@ -172,9 +151,9 @@ class BlueskyComm : public QThread
zmq::message_t message;
QByteArray msg_arr = gen_send_mesg("environment_open", nullptr);
zmq::message_t zmsg(msg_arr.data(), msg_arr.length());
zmq::send_result_t s_res = _zmq_comm_socket->send(zmsg, zmq::send_flags::none);
zmq::send_result_t s_res = _zmq_comm_socket->send(zmsg);

zmq::recv_result_t r_res = _zmq_comm_socket->recv(message, zmq::recv_flags::none);
zmq::recv_result_t r_res = _zmq_comm_socket->recv(message);
if(r_res.has_value())
{
QJsonObject reply = QJsonDocument::fromJson(QString::fromUtf8((char*)message.data(), message.size()).toUtf8()).object();
Expand Down Expand Up @@ -206,9 +185,9 @@ class BlueskyComm : public QThread
zmq::message_t message;
QByteArray msg_arr = gen_send_mesg("environment_close", nullptr);
zmq::message_t zmsg(msg_arr.data(), msg_arr.length());
zmq::send_result_t s_res = _zmq_comm_socket->send(zmsg, zmq::send_flags::none);
zmq::send_result_t s_res = _zmq_comm_socket->send(zmsg);

zmq::recv_result_t r_res = _zmq_comm_socket->recv(message, zmq::recv_flags::none);
zmq::recv_result_t r_res = _zmq_comm_socket->recv(message);
if(r_res.has_value())
{
QJsonObject reply = QJsonDocument::fromJson(QString::fromUtf8((char*)message.data(), message.size()).toUtf8()).object();
Expand Down Expand Up @@ -240,9 +219,9 @@ class BlueskyComm : public QThread
zmq::message_t message;
QByteArray msg_arr = gen_send_mesg("queue_start", nullptr);
zmq::message_t zmsg(msg_arr.data(), msg_arr.length());
zmq::send_result_t s_res = _zmq_comm_socket->send(zmsg, zmq::send_flags::none);
zmq::send_result_t s_res = _zmq_comm_socket->send(zmsg);

zmq::recv_result_t r_res = _zmq_comm_socket->recv(message, zmq::recv_flags::none);
zmq::recv_result_t r_res = _zmq_comm_socket->recv(message);
if(r_res.has_value())
{
QJsonObject reply = QJsonDocument::fromJson(QString::fromUtf8((char*)message.data(), message.size()).toUtf8()).object();
Expand Down Expand Up @@ -274,9 +253,9 @@ class BlueskyComm : public QThread
zmq::message_t message;
QByteArray msg_arr = gen_send_mesg("queue_stop", nullptr);
zmq::message_t zmsg(msg_arr.data(), msg_arr.length());
zmq::send_result_t s_res = _zmq_comm_socket->send(zmsg, zmq::send_flags::none);
zmq::send_result_t s_res = _zmq_comm_socket->send(zmsg);

zmq::recv_result_t r_res = _zmq_comm_socket->recv(message, zmq::recv_flags::none);
zmq::recv_result_t r_res = _zmq_comm_socket->recv(message);
if(r_res.has_value())
{
QJsonObject reply = QJsonDocument::fromJson(QString::fromUtf8((char*)message.data(), message.size()).toUtf8()).object();
Expand Down Expand Up @@ -312,9 +291,9 @@ class BlueskyComm : public QThread
params["user_group"] = "primary";
QByteArray msg_arr = gen_send_mesg2("queue_item_add", params);
zmq::message_t zmsg(msg_arr.data(), msg_arr.length());
zmq::send_result_t s_res = _zmq_comm_socket->send(zmsg, zmq::send_flags::none);
zmq::send_result_t s_res = _zmq_comm_socket->send(zmsg);

zmq::recv_result_t r_res = _zmq_comm_socket->recv(message, zmq::recv_flags::none);
zmq::recv_result_t r_res = _zmq_comm_socket->recv(message);
if(r_res.has_value())
{
QJsonObject reply = QJsonDocument::fromJson(QString::fromUtf8((char*)message.data(), message.size()).toUtf8()).object();
Expand Down Expand Up @@ -350,9 +329,9 @@ class BlueskyComm : public QThread
params["user_group"] = "primary";
QByteArray msg_arr = gen_send_mesg2("queue_item_update", params);
zmq::message_t zmsg(msg_arr.data(), msg_arr.length());
zmq::send_result_t s_res = _zmq_comm_socket->send(zmsg, zmq::send_flags::none);
zmq::send_result_t s_res = _zmq_comm_socket->send(zmsg);

zmq::recv_result_t r_res = _zmq_comm_socket->recv(message, zmq::recv_flags::none);
zmq::recv_result_t r_res = _zmq_comm_socket->recv(message);
if(r_res.has_value())
{
QJsonObject reply = QJsonDocument::fromJson(QString::fromUtf8((char*)message.data(), message.size()).toUtf8()).object();
Expand Down Expand Up @@ -388,9 +367,9 @@ class BlueskyComm : public QThread
params["pos_dest"] = destRow;
QByteArray msg_arr = gen_send_mesg2("queue_item_move", params);
zmq::message_t zmsg(msg_arr.data(), msg_arr.length());
zmq::send_result_t s_res = _zmq_comm_socket->send(zmsg, zmq::send_flags::none);
zmq::send_result_t s_res = _zmq_comm_socket->send(zmsg);

zmq::recv_result_t r_res = _zmq_comm_socket->recv(message, zmq::recv_flags::none);
zmq::recv_result_t r_res = _zmq_comm_socket->recv(message);
if(r_res.has_value())
{
QJsonObject reply = QJsonDocument::fromJson(QString::fromUtf8((char*)message.data(), message.size()).toUtf8()).object();
Expand Down Expand Up @@ -424,9 +403,9 @@ class BlueskyComm : public QThread
params["pos"] = row;
QByteArray msg_arr = gen_send_mesg2("queue_item_remove", params);
zmq::message_t zmsg(msg_arr.data(), msg_arr.length());
zmq::send_result_t s_res = _zmq_comm_socket->send(zmsg, zmq::send_flags::none);
zmq::send_result_t s_res = _zmq_comm_socket->send(zmsg);

zmq::recv_result_t r_res = _zmq_comm_socket->recv(message, zmq::recv_flags::none);
zmq::recv_result_t r_res = _zmq_comm_socket->recv(message);
if(r_res.has_value())
{
QJsonObject reply = QJsonDocument::fromJson(QString::fromUtf8((char*)message.data(), message.size()).toUtf8()).object();
Expand Down Expand Up @@ -462,9 +441,9 @@ class BlueskyComm : public QThread
params["user_group"] = "primary";
QByteArray msg_arr = gen_send_mesg("plans_allowed", &params);
zmq::message_t zmsg(msg_arr.data(), msg_arr.length());
zmq::send_result_t s_res = _zmq_comm_socket->send(zmsg, zmq::send_flags::none);;
zmq::send_result_t s_res = _zmq_comm_socket->send(zmsg);;

zmq::recv_result_t r_res = _zmq_comm_socket->recv(message, zmq::recv_flags::none);
zmq::recv_result_t r_res = _zmq_comm_socket->recv(message);
if(r_res.has_value())
{
QJsonObject reply = QJsonDocument::fromJson(QString::fromUtf8((char*)message.data(), message.size()).toUtf8()).object();
Expand Down Expand Up @@ -568,9 +547,9 @@ class BlueskyComm : public QThread
zmq::message_t message;
QByteArray msg_arr = gen_send_mesg("queue_get", nullptr);
zmq::message_t zmsg(msg_arr.data(), msg_arr.length());
zmq::send_result_t s_res = _zmq_comm_socket->send(zmsg, zmq::send_flags::none);
zmq::send_result_t s_res = _zmq_comm_socket->send(zmsg);

zmq::recv_result_t r_res = _zmq_comm_socket->recv(message, zmq::recv_flags::none);
zmq::recv_result_t r_res = _zmq_comm_socket->recv(message);
if(r_res.has_value())
{
QJsonObject reply = QJsonDocument::fromJson(QString::fromUtf8((char*)message.data(), message.size()).toUtf8()).object();
Expand Down Expand Up @@ -781,9 +760,9 @@ class BlueskyComm : public QThread
zmq::message_t message;
QByteArray msg_arr = gen_send_mesg("history_clear", nullptr);
zmq::message_t zmsg(msg_arr.data(), msg_arr.length());
zmq::send_result_t s_res = _zmq_comm_socket->send(zmsg, zmq::send_flags::none);
zmq::send_result_t s_res = _zmq_comm_socket->send(zmsg);

zmq::recv_result_t r_res = _zmq_comm_socket->recv(message, zmq::recv_flags::none);
zmq::recv_result_t r_res = _zmq_comm_socket->recv(message);
if(r_res.has_value())
{
msg = QString::fromUtf8((char*)message.data(), message.size());
Expand All @@ -808,9 +787,9 @@ class BlueskyComm : public QThread
zmq::message_t message;
QByteArray msg_arr = gen_send_mesg("history_get", nullptr);
zmq::message_t zmsg(msg_arr.data(), msg_arr.length());
zmq::send_result_t s_res = _zmq_comm_socket->send(zmsg, zmq::send_flags::none);
zmq::send_result_t s_res = _zmq_comm_socket->send(zmsg);

zmq::recv_result_t r_res = _zmq_comm_socket->recv(message, zmq::recv_flags::none);
zmq::recv_result_t r_res = _zmq_comm_socket->recv(message);
if(r_res.has_value())
{
if(raw_mesg)
Expand Down Expand Up @@ -963,54 +942,10 @@ class BlueskyComm : public QThread

//---------------------------------------------------------------------------

public slots:
void run() override
{
_running = true;
zmq::message_t token, message;
while(_running)
{
zmq::recv_result_t r_res = _zmq_lsn_socket->recv(token, zmq::recv_flags::none);
if(r_res.has_value())
{
std::string s1 ((char*)token.data(), token.size());
if(s1 == "QS_Console")
{
zmq::recv_result_t r_res2 = _zmq_comm_socket->recv(message, zmq::recv_flags::none);
if(r_res2.has_value())
{
QJsonObject rootJson = QJsonDocument::fromJson(QString::fromUtf8((char*)message.data(), message.size()).toUtf8()).object();
if(rootJson.contains("msg"))
{
QString msg = rootJson["msg"].toString();
msg.chop(1);
emit newData(msg);
}
else
{
// logI<<data.toStdString()<<"\n"; // may cause issues coming from a thread
}
}
}
}
}
_zmq_lsn_socket->close();
}
void stop() {_running = false;}

signals:
void newData(const QString&);

protected:

bool _running;

zmq::context_t *_context;

zmq::socket_t *_zmq_comm_socket;

zmq::socket_t *_zmq_lsn_socket;

};


Expand Down
25 changes: 17 additions & 8 deletions src/mvc/LiveMapsElementsWidget.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ LiveMapsElementsWidget::LiveMapsElementsWidget(QString ip, QString port, QWidget
_qserverComm = nullptr;
_mapsElementsWidget = nullptr;
_last_packet = nullptr;
_qserverComm = nullptr;
//_currentModel = new MapsH5Model();
_currentModel = nullptr;
_num_images = 0;
_prev_dataset_name = " ";
_context = new zmq::context_t(1);
_qserver_ip_addr = new QLineEdit("127.0.0.1");
_qline_ip_addr = new QLineEdit();
if(ip.length() > 0)
Expand Down Expand Up @@ -62,6 +64,11 @@ LiveMapsElementsWidget::~LiveMapsElementsWidget()
}
_currentModel = nullptr;

if(_qserverComm != nullptr)
{
delete _qserverComm;
}

if(_streamWorker != nullptr)
{
_streamWorker->stop();
Expand All @@ -77,6 +84,13 @@ LiveMapsElementsWidget::~LiveMapsElementsWidget()
_mapsElementsWidget = nullptr;
}

if (_context != nullptr)
{
_context->close();
delete _context;
}
_context = nullptr;

}

//---------------------------------------------------------------------------
Expand Down Expand Up @@ -167,15 +181,9 @@ void LiveMapsElementsWidget::updateIp()

if(_qserverComm != nullptr)
{
disconnect(_qserverComm, &BlueskyComm::newData, _scan_queue_widget, &ScanQueueWidget::newDataArrived);
_qserverComm->stop();
_qserverComm->quit();
_qserverComm->wait();
delete _qserverComm;
}
_qserverComm = new BlueskyComm(_qserver_ip_addr->text(), this);
connect(_qserverComm, &BlueskyComm::newData, _scan_queue_widget, &ScanQueueWidget::newDataArrived);
_qserverComm->start();
_qserverComm = new BlueskyComm(_context, _qserver_ip_addr->text());

if(_streamWorker != nullptr)
{
Expand All @@ -185,8 +193,9 @@ void LiveMapsElementsWidget::updateIp()
_streamWorker->wait();
delete _streamWorker;
}
_streamWorker = new NetStreamWorker(_qline_ip_addr->text(), _qline_port->text(), this);
_streamWorker = new NetStreamWorker(_context, _qline_ip_addr->text(), _qline_port->text(), _qserver_ip_addr->text(), this);
connect(_streamWorker, &NetStreamWorker::newData, this, &LiveMapsElementsWidget::newDataArrived);
connect(_streamWorker, &NetStreamWorker::newStringData, _scan_queue_widget, &ScanQueueWidget::newDataArrived);
_streamWorker->start();
if(_last_packet != nullptr)
delete _last_packet;
Expand Down
2 changes: 2 additions & 0 deletions src/mvc/LiveMapsElementsWidget.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ public slots:

std::string _prev_dataset_name;

zmq::context_t *_context;

int _num_images;
};

Expand Down
Loading

0 comments on commit 4efbb57

Please sign in to comment.