1 /* 2 * Authored by Alex Hultman, 2018-2021. 3 * Intellectual property of third-party. 4 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 9 * http://www.apache.org/licenses/LICENSE-2.0 10 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 #ifndef UWS_TOPICTREE_H 19 #define UWS_TOPICTREE_H 20 21 #include <map> 22 #include <list> 23 #include <iostream> 24 #include <unordered_set> 25 #include <utility> 26 #include <memory> 27 #include <unordered_map> 28 #include <vector> 29 #include <string_view> 30 #include <functional> 31 #include <set> 32 33 namespace uWS { 34 35 struct Subscriber; 36 37 struct Topic : std::unordered_set<Subscriber *> { 38 TopicTopic39 Topic(std::string_view topic) : name(topic) { 40 41 } 42 43 std::string name; 44 }; 45 46 struct Subscriber { 47 48 template <typename, typename> friend struct TopicTree; 49 50 private: 51 /* We use a factory */ 52 Subscriber() = default; 53 54 /* State of prev, next does not matter unless we are needsDrainage() since we are not in the list */ 55 Subscriber *prev, *next; 56 57 /* Any one subscriber can be part of at most 32 publishes before it needs a drain, 58 * or whatever encoding of runs or whatever we might do in the future */ 59 uint16_t messageIndices[32]; 60 61 /* This one matters the most, if it is 0 we are not in the list of drainableSubscribers */ 62 unsigned char numMessageIndices = 0; 63 64 public: 65 66 /* We have a list of topics we subscribe to (read by WebSocket::iterateTopics) */ 67 std::set<Topic *> topics; 68 69 /* User data */ 70 void *user; 71 needsDrainageSubscriber72 bool needsDrainage() { 73 return numMessageIndices; 74 } 75 }; 76 77 template <typename T, typename B> 78 struct TopicTree { 79 80 enum IteratorFlags { 81 LAST = 1, 82 FIRST = 2 83 }; 84 85 /* Whomever is iterating this topic is locked to not modify its own list */ 86 Subscriber *iteratingSubscriber = nullptr; 87 88 private: 89 90 /* The drain callback must not publish, unsubscribe or subscribe. 91 * It must only cork, uncork, send, write */ 92 std::function<bool(Subscriber *, T &, IteratorFlags)> cb; 93 94 /* The topics */ 95 std::unordered_map<std::string_view, std::unique_ptr<Topic>> topics; 96 97 /* List of subscribers that needs drainage */ 98 Subscriber *drainableSubscribers = nullptr; 99 100 /* Palette of outgoing messages, up to 64k */ 101 std::vector<T> outgoingMessages; 102 checkIteratingSubscriberTopicTree103 void checkIteratingSubscriber(Subscriber *s) { 104 /* Notify user that they are doing something wrong here */ 105 if (iteratingSubscriber == s) { 106 std::cerr << "Error: WebSocket must not subscribe or unsubscribe to topics while iterating its topics!" << std::endl; 107 std::terminate(); 108 } 109 } 110 111 /* Warning: does NOT unlink from drainableSubscribers or modify next, prev. */ drainImplTopicTree112 void drainImpl(Subscriber *s) { 113 /* Before we call cb we need to make sure this subscriber will not report needsDrainage() 114 * since WebSocket::send will call drain from within the cb in that case.*/ 115 int numMessageIndices = s->numMessageIndices; 116 s->numMessageIndices = 0; 117 118 /* Then we emit cb */ 119 for (int i = 0; i < numMessageIndices; i++) { 120 T &outgoingMessage = outgoingMessages[s->messageIndices[i]]; 121 122 int flags = (i == numMessageIndices - 1) ? LAST : 0; 123 124 /* Returning true will stop drainage short (such as when backpressure is too high) */ 125 if (cb(s, outgoingMessage, (IteratorFlags)(flags | (i == 0 ? FIRST : 0)))) { 126 break; 127 } 128 } 129 } 130 unlinkDrainableSubscriberTopicTree131 void unlinkDrainableSubscriber(Subscriber *s) { 132 if (s->prev) { 133 s->prev->next = s->next; 134 } 135 if (s->next) { 136 s->next->prev = s->prev; 137 } 138 /* If we are the head, then we also need to reset the head */ 139 if (drainableSubscribers == s) { 140 drainableSubscribers = s->next; 141 } 142 } 143 144 public: 145 TopicTreeTopicTree146 TopicTree(std::function<bool(Subscriber *, T &, IteratorFlags)> cb) : cb(cb) { 147 148 } 149 150 /* Returns nullptr if not found */ lookupTopicTopicTree151 Topic *lookupTopic(std::string_view topic) { 152 auto it = topics.find(topic); 153 if (it == topics.end()) { 154 return nullptr; 155 } 156 return it->second.get(); 157 } 158 159 /* Subscribe fails if we already are subscribed */ subscribeTopicTree160 bool subscribe(Subscriber *s, std::string_view topic) { 161 /* Notify user that they are doing something wrong here */ 162 checkIteratingSubscriber(s); 163 164 /* Lookup or create new topic */ 165 Topic *topicPtr = lookupTopic(topic); 166 if (!topicPtr) { 167 Topic *newTopic = new Topic(topic); 168 topics.insert({std::string_view(newTopic->name.data(), newTopic->name.length()), std::unique_ptr<Topic>(newTopic)}); 169 topicPtr = newTopic; 170 } 171 172 /* Insert us in topic, insert topic in us */ 173 auto [it, inserted] = s->topics.insert(topicPtr); 174 if (!inserted) { 175 return false; 176 } 177 topicPtr->insert(s); 178 179 /* Success */ 180 return true; 181 } 182 183 /* Returns ok, last */ unsubscribeTopicTree184 std::pair<bool, bool> unsubscribe(Subscriber *s, std::string_view topic) { 185 /* Notify user that they are doing something wrong here */ 186 checkIteratingSubscriber(s); 187 188 /* Lookup topic */ 189 Topic *topicPtr = lookupTopic(topic); 190 if (!topicPtr) { 191 /* If the topic doesn't exist we are assumed to still be subscribers of something */ 192 return {false, false}; 193 } 194 195 /* Erase from our list first */ 196 if (s->topics.erase(topicPtr) == 0) { 197 return {false, false}; 198 } 199 200 /* Remove us from topic */ 201 topicPtr->erase(s); 202 203 /* If there is no subscriber to this topic, remove it */ 204 if (!topicPtr->size()) { 205 /* Unique_ptr deletes the topic */ 206 topics.erase(topic); 207 } 208 209 /* If we don't hold any topics we are to be freed altogether */ 210 return {true, topics.size() == 0}; 211 } 212 213 /* Factory function for creating a Subscriber */ createSubscriberTopicTree214 Subscriber *createSubscriber() { 215 return new Subscriber(); 216 } 217 218 /* This is used to end a Subscriber, before freeing it */ freeSubscriberTopicTree219 void freeSubscriber(Subscriber *s) { 220 221 /* I guess we call this one even if we are not subscribers */ 222 if (!s) { 223 return; 224 } 225 226 /* For all topics, unsubscribe */ 227 for (Topic *topicPtr : s->topics) { 228 /* If we are the last subscriber, simply remove the whole topic */ 229 if (topicPtr->size() == 1) { 230 topics.erase(topicPtr->name); 231 } else { 232 /* Otherwise just remove us */ 233 topicPtr->erase(s); 234 } 235 } 236 237 /* We also need to unlink us */ 238 if (s->needsDrainage()) { 239 unlinkDrainableSubscriber(s); 240 } 241 242 delete s; 243 } 244 245 /* Mainly used by WebSocket::send to drain one socket before sending */ drainTopicTree246 void drain(Subscriber *s) { 247 /* The list is undefined and cannot be touched unless needsDrainage(). */ 248 if (s->needsDrainage()) { 249 /* This function differs from drainImpl by properly unlinking 250 * the subscriber from drainableSubscribers. drainImpl does not. */ 251 unlinkDrainableSubscriber(s); 252 253 /* This one always resets needsDrainage before it calls any cb's. 254 * Otherwise we would stackoverflow when sending after publish but before drain. */ 255 drainImpl(s); 256 257 /* If we drained last subscriber, also clear outgoingMessages */ 258 if (!drainableSubscribers) { 259 outgoingMessages.clear(); 260 } 261 } 262 } 263 264 /* Called everytime we call send, to drain published messages so to sync outgoing messages */ drainTopicTree265 void drain() { 266 if (drainableSubscribers) { 267 /* Drain one socket a time */ 268 for (Subscriber *s = drainableSubscribers; s; s = s->next) { 269 /* Instead of unlinking every single subscriber, we just leave the list undefined 270 * and reset drainableSubscribers ptr below. */ 271 drainImpl(s); 272 } 273 /* Drain always clears drainableSubscribers and outgoingMessages */ 274 drainableSubscribers = nullptr; 275 outgoingMessages.clear(); 276 } 277 } 278 279 /* Big messages bypass all buffering and land directly in backpressure */ 280 template <typename F> publishBigTopicTree281 bool publishBig(Subscriber *sender, std::string_view topic, B &&bigMessage, F cb) { 282 /* Do we even have this topic? */ 283 auto it = topics.find(topic); 284 if (it == topics.end()) { 285 return false; 286 } 287 288 /* For all subscribers in topic */ 289 for (Subscriber *s : *it->second) { 290 291 /* If we are sender then ignore us */ 292 if (sender != s) { 293 cb(s, bigMessage); 294 } 295 } 296 297 return true; 298 } 299 300 /* Linear in number of affected subscribers */ publishTopicTree301 bool publish(Subscriber *sender, std::string_view topic, T &&message) { 302 /* Do we even have this topic? */ 303 auto it = topics.find(topic); 304 if (it == topics.end()) { 305 return false; 306 } 307 308 /* If we have more than 65k messages we need to drain every socket. */ 309 if (outgoingMessages.size() == UINT16_MAX) { 310 /* If there is a socket that is currently corked, this will be ugly as all sockets will drain 311 * to their own backpressure */ 312 drain(); 313 } 314 315 /* If nobody references this message, don't buffer it */ 316 bool referencedMessage = false; 317 318 /* For all subscribers in topic */ 319 for (Subscriber *s : *it->second) { 320 321 /* If we are sender then ignore us */ 322 if (sender != s) { 323 324 /* At least one subscriber wants this message */ 325 referencedMessage = true; 326 327 /* If we already have too many outgoing messages on this subscriber, drain it now */ 328 if (s->numMessageIndices == 32) { 329 /* This one does not need to check needsDrainage here but still does. */ 330 drain(s); 331 } 332 333 /* Finally we can continue */ 334 s->messageIndices[s->numMessageIndices++] = (uint16_t)outgoingMessages.size(); 335 /* First message adds subscriber to list of drainable subscribers */ 336 if (s->numMessageIndices == 1) { 337 /* Insert us in the head of drainable subscribers */ 338 s->next = drainableSubscribers; 339 s->prev = nullptr; 340 if (s->next) { 341 s->next->prev = s; 342 } 343 drainableSubscribers = s; 344 } 345 } 346 } 347 348 /* Push this message and return with success */ 349 if (referencedMessage) { 350 outgoingMessages.emplace_back(message); 351 } 352 353 /* Success if someone wants it */ 354 return referencedMessage; 355 } 356 }; 357 358 } 359 360 #endif 361