Skip to content
This repository has been archived by the owner on Nov 10, 2022. It is now read-only.

Commit

Permalink
Improved handling of multiple connections and small refactor
Browse files Browse the repository at this point in the history
Now server is able to handle corrently when multiple write requests are
made to client.
Tested with parallel Web-Socket and REST connections with subscribed
clients.

Small update of code based on pull request comments.

Signed-off-by: Miladinovic Bojan <fixed-term.bojan.miladinovic@se.bosch.com>
  • Loading branch information
Miladinovic Bojan committed Nov 5, 2019
1 parent 495babc commit 6427df0
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 17 deletions.
6 changes: 4 additions & 2 deletions w3c-visserver-api/include/ISubscriptionHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@ class WsServer;
class IVssDatabase;
class IServer;

using SubscriptionId = uint32_t;

class ISubscriptionHandler {
public:
virtual ~ISubscriptionHandler() {}

virtual uint64_t subscribe(WsChannel& channel,
std::shared_ptr<IVssDatabase> db,
const std::string &path) = 0;
virtual int unsubscribe(uint32_t subscribeID) = 0;
virtual int unsubscribeAll(uint32_t connectionID) = 0;
virtual int unsubscribe(SubscriptionId subscribeID) = 0;
virtual int unsubscribeAll(SubscriptionId connectionID) = 0;
virtual int updateByUUID(const std::string &signalUUID, const jsoncons::json &value) = 0;
virtual int updateByPath(const std::string &path, const jsoncons::json &value) = 0;
virtual std::shared_ptr<IServer> getServer() = 0;
Expand Down
9 changes: 5 additions & 4 deletions w3c-visserver-api/include/SubscriptionHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ class WsChannel;
class WsServer;
class ILogger;


using SubConnId = uint64_t;

// Subscription ID: Client ID
typedef std::unordered_map<uint32_t, SubConnId> subscriptions_t;
using subscriptions_t = std::unordered_map<SubscriptionId, SubConnId>;

// Subscription UUID
typedef std::string uuid_t;
Expand All @@ -54,7 +55,7 @@ class SubscriptionHandler : public ISubscriptionHandler {
std::mutex subMutex;
std::thread subThread;
bool threadRun;
std::queue<std::pair<SubConnId, jsoncons::json>> buffer;
std::queue<std::tuple<SubscriptionId, SubConnId, jsoncons::json>> buffer;

public:
SubscriptionHandler(std::shared_ptr<ILogger> loggerUtil,
Expand All @@ -66,8 +67,8 @@ class SubscriptionHandler : public ISubscriptionHandler {
uint64_t subscribe(WsChannel& channel,
std::shared_ptr<IVssDatabase> db,
const std::string &path);
int unsubscribe(uint32_t subscribeID);
int unsubscribeAll(uint32_t connectionID);
int unsubscribe(SubscriptionId subscribeID);
int unsubscribeAll(SubscriptionId connectionID);
int updateByUUID(const std::string &signalUUID, const jsoncons::json &value);
int updateByPath(const std::string &path, const jsoncons::json &value);
std::shared_ptr<IServer> getServer();
Expand Down
13 changes: 12 additions & 1 deletion w3c-visserver-api/src/RestV1ApiHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,19 @@ bool RestV1ApiHandler::GetSignalPath(uint32_t requestId,
if (restTarget.size() && verifyPathAndStrip(restTarget, restDelimiter)) {
while (restTarget.size()) {
// we only accept clean printable characters
const std::regex regexValidWord("^([A-Za-z]+)");
const std::regex regexValidWord("^([A-Za-z0-9]+)");

if (std::regex_search(restTarget, sm, regexValidWord)) {
foundStr = sm.str(1);
if (foundStr.size() == 0) {
JsonResponses::malFormedRequest(
requestId,
json["action"].as_string(),
"Signal path not valid",
json);
ret = false;
break;
}
signalPath += foundStr;
if (verifyPathAndStrip(restTarget, foundStr)) {
if ((restTarget.size() == 0)) {
Expand Down Expand Up @@ -115,6 +124,7 @@ bool RestV1ApiHandler::GetSignalPath(uint32_t requestId,
"Signal path not valid",
json);
ret = false;
break;
}
}
else {
Expand All @@ -124,6 +134,7 @@ bool RestV1ApiHandler::GetSignalPath(uint32_t requestId,
"Signal path URI not valid",
json);
ret = false;
break;
}
}
}
Expand Down
13 changes: 6 additions & 7 deletions w3c-visserver-api/src/SubscriptionHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
#include "Authenticator.hpp"
#include "exception.hpp"
#include "visconf.hpp"
#include "WsChannel.hpp"
#include "VssDatabase.hpp"
#include "WsServer.hpp"
#include "ILogger.hpp"

using namespace std;
Expand Down Expand Up @@ -129,8 +129,8 @@ int SubscriptionHandler::updateByUUID(const string &signalUUID,

for (auto subID : handle->second) {
std::lock_guard<std::mutex> lock(subMutex);
pair<SubConnId, json> newSub;
newSub = std::make_pair(subID.second, value);
tuple<SubscriptionId, SubConnId, json> newSub;
newSub = std::make_tuple(subID.first, subID.second, value);
buffer.push(newSub);
}

Expand Down Expand Up @@ -158,13 +158,12 @@ void* SubscriptionHandler::subThreadRunner() {
auto newSub = buffer.front();
buffer.pop();

auto connId = newSub.first;

jsoncons::json value = newSub.second;
auto connId = std::get<1>(newSub);
jsoncons::json value = std::get<2>(newSub);

jsoncons::json answer;
answer["action"] = "subscribe";
answer["subscriptionId"] = connId;
answer["subscriptionId"] = std::get<0>(newSub);
answer.insert_or_assign("value", value);
answer["timestamp"] = time(NULL);

Expand Down
3 changes: 3 additions & 0 deletions w3c-visserver-api/src/VssDatabase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <limits>
#include <regex>
#include <stdexcept>
#include <boost/algorithm/string.hpp>

#include "exception.hpp"
#include "visconf.hpp"
Expand Down Expand Up @@ -48,6 +49,8 @@ namespace {
void checkTypeAndBound(std::shared_ptr<ILogger> logger, string value_type, jsoncons::json val) {
bool typeValid = false;

boost::algorithm::to_lower(value_type);

if (value_type == "uint8") {
typeValid = true;
long double longDoubleVal;
Expand Down
33 changes: 30 additions & 3 deletions w3c-visserver-api/src/WebSockHttpFlexServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <limits>
#include <regex>
#include <stdexcept>
#include <list>

#include "detect_ssl.hpp"
#include "ssl_stream.hpp"
Expand Down Expand Up @@ -242,6 +243,7 @@ namespace {
boost::asio::steady_timer timer_;
RequestHandler requestHandler_;
WsChannel channel;
std::list<std::string> writeQueue_;
public:
// Construct the session
explicit WebSocketSession(boost::asio::io_context& ioc,
Expand Down Expand Up @@ -423,7 +425,14 @@ namespace {
}

void write(const std::string &message) {
// TODO: add queuing as async_write should be called one at a time
writeQueue_.push_back(message);

// there can be only one async_write request at any single time,
// so queue additional transfers
if (writeQueue_.size() > 1) {
return;
}

boost::asio::buffer_copy(bufferWrite_.prepare(message.size()), boost::asio::buffer(message));
bufferWrite_.commit(message.size()); // commit copied data for write

Expand All @@ -440,8 +449,6 @@ namespace {
}

void onWrite(boost::system::error_code ec, std::size_t bytesTransferred) {
boost::ignore_unused(bytesTransferred);

// Happens when the timer closes the socket
if(ec == boost::asio::error::operation_aborted)
return;
Expand All @@ -453,6 +460,26 @@ namespace {

// Clear the buffer
bufferWrite_.consume(bytesTransferred);

writeQueue_.pop_front();

// check if there is more to write
if (!writeQueue_.empty()) {
auto message = writeQueue_.front();
boost::asio::buffer_copy(bufferWrite_.prepare(message.size()), boost::asio::buffer(message));
bufferWrite_.commit(message.size()); // commit copied data for write

// send message
derived().ws().async_write(
bufferWrite_.data(),
boost::asio::bind_executor(
strand_,
std::bind(
&WebSocketSession::onWrite,
derived().shared_from_this(),
std::placeholders::_1,
std::placeholders::_2)));
}
}
};

Expand Down

0 comments on commit 6427df0

Please sign in to comment.