Skip to content

Commit

Permalink
Optimize big message publishing, fix unreferenced publishes
Browse files Browse the repository at this point in the history
  • Loading branch information
uNetworkingAB committed Sep 28, 2021
1 parent 814a7ee commit 0ede7e5
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 17 deletions.
27 changes: 21 additions & 6 deletions src/App.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ namespace uWS {
/*OpCode*/ int opCode;
bool compress;
};
struct TopicTreeBigMessage {
std::string_view message;
/*OpCode*/ int opCode;
bool compress;
};
}

/* An app is a convenience wrapper of some of the most used fuctionalities and allows a
Expand Down Expand Up @@ -69,7 +74,7 @@ struct TemplatedApp {

public:

TopicTree<TopicTreeMessage> *topicTree = nullptr;
TopicTree<TopicTreeMessage, TopicTreeBigMessage> *topicTree = nullptr;

/* Server name */
TemplatedApp &&addServerName(std::string hostname_pattern, SocketContextOptions options = {}) {
Expand Down Expand Up @@ -113,8 +118,18 @@ struct TemplatedApp {
/* Publishes a message to all websocket contexts - conceptually as if publishing to the one single
* TopicTree of this app (technically there are many TopicTrees, however the concept is that one
* app has one conceptual Topic tree) */
void publish(std::string_view topic, std::string_view message, OpCode opCode, bool compress = false) {
topicTree->publish(nullptr, topic, {std::string(message), opCode, compress});
bool publish(std::string_view topic, std::string_view message, OpCode opCode, bool compress = false) {
/* Anything big bypasses corking efforts */
if (message.length() >= LoopData::CORK_BUFFER_SIZE) {
return topicTree->publishBig(nullptr, topic, {message, opCode, compress}, [](Subscriber *s, TopicTreeBigMessage &message) {
auto *ws = (WebSocket<SSL, true, int> *) s->user;

/* Send will drain if needed */
ws->send(message.message, (OpCode)message.opCode, message.compress);
});
} else {
return topicTree->publish(nullptr, topic, {std::string(message), opCode, compress});
}
}

/* Returns number of subscribers for this topic, or 0 for failure.
Expand Down Expand Up @@ -225,15 +240,15 @@ struct TemplatedApp {
if (!topicTree) {

bool needsUncork = false;
topicTree = new TopicTree<TopicTreeMessage>([needsUncork](Subscriber *s, TopicTreeMessage &message, TopicTree<TopicTreeMessage>::IteratorFlags flags) mutable {
topicTree = new TopicTree<TopicTreeMessage, TopicTreeBigMessage>([needsUncork](Subscriber *s, TopicTreeMessage &message, TopicTree<TopicTreeMessage, TopicTreeBigMessage>::IteratorFlags flags) mutable {
/* Subscriber's user is the socket */
/* Unfortunately we need to cast is to PerSocketData = int
* since many different WebSocketContexts use the same
* TopicTree now */
auto *ws = (WebSocket<SSL, true, int> *) s->user;

/* If this is the first message we try and cork */
if (flags & TopicTree<TopicTreeMessage>::IteratorFlags::FIRST) {
if (flags & TopicTree<TopicTreeMessage, TopicTreeBigMessage>::IteratorFlags::FIRST) {
if (ws->canCork() && !ws->isCorked()) {
((AsyncSocket<SSL> *)ws)->cork();
needsUncork = true;
Expand All @@ -251,7 +266,7 @@ struct TemplatedApp {
}

/* If this is the last message we uncork if we are corked */
if (flags & TopicTree<TopicTreeMessage>::IteratorFlags::LAST) {
if (flags & TopicTree<TopicTreeMessage, TopicTreeBigMessage>::IteratorFlags::LAST) {
/* We should not uncork in all cases? */
if (needsUncork) {
((AsyncSocket<SSL> *)ws)->uncork();
Expand Down
2 changes: 1 addition & 1 deletion src/AsyncSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ struct AsyncSocket {
template <bool, bool, typename> friend struct WebSocketContext;
template <bool> friend struct TemplatedApp;
template <bool, typename> friend struct WebSocketContextData;
template <typename> friend struct TopicTree;
template <typename, typename> friend struct TopicTree;

protected:
/* Returns SSL pointer or FD as pointer */
Expand Down
40 changes: 35 additions & 5 deletions src/TopicTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ struct Topic : std::unordered_set<Subscriber *> {

struct Subscriber {

template <typename> friend struct TopicTree;
template <typename, typename> friend struct TopicTree;

private:
/* We use a factory */
Expand Down Expand Up @@ -74,7 +74,7 @@ struct Subscriber {
}
};

template <typename T>
template <typename T, typename B>
struct TopicTree {

enum IteratorFlags {
Expand Down Expand Up @@ -275,6 +275,27 @@ struct TopicTree {
}
}

/* Big messages bypass all buffering and land directly in backpressure */
template <typename F>
bool publishBig(Subscriber *sender, std::string_view topic, B &&bigMessage, F cb) {
/* Do we even have this topic? */
auto it = topics.find(topic);
if (it == topics.end()) {
return false;
}

/* For all subscribers in topic */
for (Subscriber *s : *it->second) {

/* If we are sender then ignore us */
if (sender != s) {
cb(s, bigMessage);
}
}

return true;
}

/* Linear in number of affected subscribers */
bool publish(Subscriber *sender, std::string_view topic, T &&message) {
/* Do we even have this topic? */
Expand All @@ -290,12 +311,18 @@ struct TopicTree {
drain();
}

/* If nobody references this message, don't buffer it */
bool referencedMessage = false;

/* For all subscribers in topic */
for (Subscriber *s : *it->second) {

/* If we are sender then ignore us */
if (sender != s) {

/* At least one subscriber wants this message */
referencedMessage = true;

/* If we already have too many outgoing messages on this subscriber, drain it now */
if (s->numMessageIndices == 32) {
/* This one does not need to check needsDrainage here but still does. */
Expand All @@ -318,10 +345,13 @@ struct TopicTree {
}

/* Push this message and return with success */
outgoingMessages.emplace_back(message);
return true;
}
if (referencedMessage) {
outgoingMessages.emplace_back(message);
}

/* Success if someone wants it */
return referencedMessage;
}
};

}
Expand Down
10 changes: 8 additions & 2 deletions src/WebSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,9 +304,15 @@ struct WebSocket : AsyncSocket<SSL> {
}

/* Publish as sender, does not receive its own messages even if subscribed to relevant topics */
bool success = webSocketContextData->topicTree->publish(webSocketData->subscriber, topic, {std::string(message), opCode, compress});
if (message.length() >= LoopData::CORK_BUFFER_SIZE) {
return webSocketContextData->topicTree->publishBig(webSocketData->subscriber, topic, {message, opCode, compress}, [](Subscriber *s, TopicTreeBigMessage &message) {
auto *ws = (WebSocket<SSL, true, int> *) s->user;

return success;
ws->send(message.message, (OpCode)message.opCode, message.compress);
});
} else {
return webSocketContextData->topicTree->publish(webSocketData->subscriber, topic, {std::string(message), opCode, compress});
}
}
};

Expand Down
2 changes: 1 addition & 1 deletion src/WebSocketContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ struct WebSocketContext {

public:
/* WebSocket contexts are always child contexts to a HTTP context so no SSL options are needed as they are inherited */
static WebSocketContext *create(Loop */*loop*/, us_socket_context_t *parentSocketContext, TopicTree<TopicTreeMessage> *topicTree) {
static WebSocketContext *create(Loop */*loop*/, us_socket_context_t *parentSocketContext, TopicTree<TopicTreeMessage, TopicTreeBigMessage> *topicTree) {
WebSocketContext *webSocketContext = (WebSocketContext *) us_create_child_socket_context(SSL, parentSocketContext, sizeof(WebSocketContextData<SSL, USERDATA>));
if (!webSocketContext) {
return nullptr;
Expand Down
4 changes: 2 additions & 2 deletions src/WebSocketContextData.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct WebSocketContextData {
public:

/* This one points to the App's shared topicTree */
TopicTree<TopicTreeMessage> *topicTree;
TopicTree<TopicTreeMessage, TopicTreeBigMessage> *topicTree;

/* The callbacks for this context */
MoveOnlyFunction<void(WebSocket<SSL, true, USERDATA> *)> openHandler = nullptr;
Expand Down Expand Up @@ -85,7 +85,7 @@ struct WebSocketContextData {

}

WebSocketContextData(TopicTree<TopicTreeMessage> *topicTree) : topicTree(topicTree) {
WebSocketContextData(TopicTree<TopicTreeMessage, TopicTreeBigMessage> *topicTree) : topicTree(topicTree) {

}
};
Expand Down

0 comments on commit 0ede7e5

Please sign in to comment.