diff --git a/src/App.h b/src/App.h index 284ece52c..4da74e1d5 100644 --- a/src/App.h +++ b/src/App.h @@ -27,6 +27,11 @@ namespace uWS { /*OpCode*/ int opCode; bool compress; }; + struct TopicTreeBigMessage { + std::string_view message; + /*OpCode*/ int opCode; + bool compress; + }; } /* An app is a convenience wrapper of some of the most used fuctionalities and allows a @@ -69,7 +74,7 @@ struct TemplatedApp { public: - TopicTree *topicTree = nullptr; + TopicTree *topicTree = nullptr; /* Server name */ TemplatedApp &&addServerName(std::string hostname_pattern, SocketContextOptions options = {}) { @@ -113,8 +118,18 @@ struct TemplatedApp { /* Publishes a message to all websocket contexts - conceptually as if publishing to the one single * TopicTree of this app (technically there are many TopicTrees, however the concept is that one * app has one conceptual Topic tree) */ - void publish(std::string_view topic, std::string_view message, OpCode opCode, bool compress = false) { - topicTree->publish(nullptr, topic, {std::string(message), opCode, compress}); + bool publish(std::string_view topic, std::string_view message, OpCode opCode, bool compress = false) { + /* Anything big bypasses corking efforts */ + if (message.length() >= LoopData::CORK_BUFFER_SIZE) { + return topicTree->publishBig(nullptr, topic, {message, opCode, compress}, [](Subscriber *s, TopicTreeBigMessage &message) { + auto *ws = (WebSocket *) s->user; + + /* Send will drain if needed */ + ws->send(message.message, (OpCode)message.opCode, message.compress); + }); + } else { + return topicTree->publish(nullptr, topic, {std::string(message), opCode, compress}); + } } /* Returns number of subscribers for this topic, or 0 for failure. @@ -225,7 +240,7 @@ struct TemplatedApp { if (!topicTree) { bool needsUncork = false; - topicTree = new TopicTree([needsUncork](Subscriber *s, TopicTreeMessage &message, TopicTree::IteratorFlags flags) mutable { + topicTree = new TopicTree([needsUncork](Subscriber *s, TopicTreeMessage &message, TopicTree::IteratorFlags flags) mutable { /* Subscriber's user is the socket */ /* Unfortunately we need to cast is to PerSocketData = int * since many different WebSocketContexts use the same @@ -233,7 +248,7 @@ struct TemplatedApp { auto *ws = (WebSocket *) s->user; /* If this is the first message we try and cork */ - if (flags & TopicTree::IteratorFlags::FIRST) { + if (flags & TopicTree::IteratorFlags::FIRST) { if (ws->canCork() && !ws->isCorked()) { ((AsyncSocket *)ws)->cork(); needsUncork = true; @@ -251,7 +266,7 @@ struct TemplatedApp { } /* If this is the last message we uncork if we are corked */ - if (flags & TopicTree::IteratorFlags::LAST) { + if (flags & TopicTree::IteratorFlags::LAST) { /* We should not uncork in all cases? */ if (needsUncork) { ((AsyncSocket *)ws)->uncork(); diff --git a/src/AsyncSocket.h b/src/AsyncSocket.h index 9cf7aa5f8..1b28a8593 100644 --- a/src/AsyncSocket.h +++ b/src/AsyncSocket.h @@ -49,7 +49,7 @@ struct AsyncSocket { template friend struct WebSocketContext; template friend struct TemplatedApp; template friend struct WebSocketContextData; - template friend struct TopicTree; + template friend struct TopicTree; protected: /* Returns SSL pointer or FD as pointer */ diff --git a/src/TopicTree.h b/src/TopicTree.h index a8371283d..a027f5cb8 100644 --- a/src/TopicTree.h +++ b/src/TopicTree.h @@ -45,7 +45,7 @@ struct Topic : std::unordered_set { struct Subscriber { - template friend struct TopicTree; + template friend struct TopicTree; private: /* We use a factory */ @@ -74,7 +74,7 @@ struct Subscriber { } }; -template +template struct TopicTree { enum IteratorFlags { @@ -275,6 +275,27 @@ struct TopicTree { } } + /* Big messages bypass all buffering and land directly in backpressure */ + template + bool publishBig(Subscriber *sender, std::string_view topic, B &&bigMessage, F cb) { + /* Do we even have this topic? */ + auto it = topics.find(topic); + if (it == topics.end()) { + return false; + } + + /* For all subscribers in topic */ + for (Subscriber *s : *it->second) { + + /* If we are sender then ignore us */ + if (sender != s) { + cb(s, bigMessage); + } + } + + return true; + } + /* Linear in number of affected subscribers */ bool publish(Subscriber *sender, std::string_view topic, T &&message) { /* Do we even have this topic? */ @@ -290,12 +311,18 @@ struct TopicTree { drain(); } + /* If nobody references this message, don't buffer it */ + bool referencedMessage = false; + /* For all subscribers in topic */ for (Subscriber *s : *it->second) { /* If we are sender then ignore us */ if (sender != s) { + /* At least one subscriber wants this message */ + referencedMessage = true; + /* If we already have too many outgoing messages on this subscriber, drain it now */ if (s->numMessageIndices == 32) { /* This one does not need to check needsDrainage here but still does. */ @@ -318,10 +345,13 @@ struct TopicTree { } /* Push this message and return with success */ - outgoingMessages.emplace_back(message); - return true; - } + if (referencedMessage) { + outgoingMessages.emplace_back(message); + } + /* Success if someone wants it */ + return referencedMessage; + } }; } diff --git a/src/WebSocket.h b/src/WebSocket.h index f5c10a3fb..ed9c508c1 100644 --- a/src/WebSocket.h +++ b/src/WebSocket.h @@ -304,9 +304,15 @@ struct WebSocket : AsyncSocket { } /* Publish as sender, does not receive its own messages even if subscribed to relevant topics */ - bool success = webSocketContextData->topicTree->publish(webSocketData->subscriber, topic, {std::string(message), opCode, compress}); + if (message.length() >= LoopData::CORK_BUFFER_SIZE) { + return webSocketContextData->topicTree->publishBig(webSocketData->subscriber, topic, {message, opCode, compress}, [](Subscriber *s, TopicTreeBigMessage &message) { + auto *ws = (WebSocket *) s->user; - return success; + ws->send(message.message, (OpCode)message.opCode, message.compress); + }); + } else { + return webSocketContextData->topicTree->publish(webSocketData->subscriber, topic, {std::string(message), opCode, compress}); + } } }; diff --git a/src/WebSocketContext.h b/src/WebSocketContext.h index cd4acbf38..f9fad2662 100644 --- a/src/WebSocketContext.h +++ b/src/WebSocketContext.h @@ -404,7 +404,7 @@ struct WebSocketContext { public: /* WebSocket contexts are always child contexts to a HTTP context so no SSL options are needed as they are inherited */ - static WebSocketContext *create(Loop */*loop*/, us_socket_context_t *parentSocketContext, TopicTree *topicTree) { + static WebSocketContext *create(Loop */*loop*/, us_socket_context_t *parentSocketContext, TopicTree *topicTree) { WebSocketContext *webSocketContext = (WebSocketContext *) us_create_child_socket_context(SSL, parentSocketContext, sizeof(WebSocketContextData)); if (!webSocketContext) { return nullptr; diff --git a/src/WebSocketContextData.h b/src/WebSocketContextData.h index 06138e7ff..ccd1530e2 100644 --- a/src/WebSocketContextData.h +++ b/src/WebSocketContextData.h @@ -42,7 +42,7 @@ struct WebSocketContextData { public: /* This one points to the App's shared topicTree */ - TopicTree *topicTree; + TopicTree *topicTree; /* The callbacks for this context */ MoveOnlyFunction *)> openHandler = nullptr; @@ -85,7 +85,7 @@ struct WebSocketContextData { } - WebSocketContextData(TopicTree *topicTree) : topicTree(topicTree) { + WebSocketContextData(TopicTree *topicTree) : topicTree(topicTree) { } };