1 /*
2  * Authored by Alex Hultman, 2018-2021.
3  * Intellectual property of third-party.
4 
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8 
9  *     http://www.apache.org/licenses/LICENSE-2.0
10 
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef UWS_TOPICTREE_H
19 #define UWS_TOPICTREE_H
20 
#include <cstdint>
#include <exception>
#include <functional>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <string_view>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
32 
33 namespace uWS {
34 
/* Only pointers to Subscriber are stored here, so a forward declaration is enough */
struct Subscriber;

/* A named topic. The object itself is the set of subscribers currently in the topic. */
struct Topic : std::unordered_set<Subscriber *> {

    /* Owned copy of the topic name */
    std::string name;

    /* Copies the viewed characters into name; the subscriber set starts out empty */
    Topic(std::string_view topic) : name(topic) {}
};
45 
46 struct Subscriber {
47 
48     template <typename, typename> friend struct TopicTree;
49 
50 private:
51     /* We use a factory */
52     Subscriber() = default;
53 
54     /* State of prev, next does not matter unless we are needsDrainage() since we are not in the list */
55     Subscriber *prev, *next;
56 
57     /* Any one subscriber can be part of at most 32 publishes before it needs a drain,
58      * or whatever encoding of runs or whatever we might do in the future */
59     uint16_t messageIndices[32];
60 
61     /* This one matters the most, if it is 0 we are not in the list of drainableSubscribers */
62     unsigned char numMessageIndices = 0;
63 
64 public:
65 
66     /* We have a list of topics we subscribe to (read by WebSocket::iterateTopics) */
67     std::set<Topic *> topics;
68 
69     /* User data */
70     void *user;
71 
needsDrainageSubscriber72     bool needsDrainage() {
73         return numMessageIndices;
74     }
75 };
76 
77 template <typename T, typename B>
78 struct TopicTree {
79 
80     enum IteratorFlags {
81         LAST = 1,
82         FIRST = 2
83     };
84 
85     /* Whomever is iterating this topic is locked to not modify its own list */
86     Subscriber *iteratingSubscriber = nullptr;
87 
88 private:
89 
90     /* The drain callback must not publish, unsubscribe or subscribe.
91      * It must only cork, uncork, send, write */
92     std::function<bool(Subscriber *, T &, IteratorFlags)> cb;
93 
94     /* The topics */
95     std::unordered_map<std::string_view, std::unique_ptr<Topic>> topics;
96 
97     /* List of subscribers that needs drainage */
98     Subscriber *drainableSubscribers = nullptr;
99 
100     /* Palette of outgoing messages, up to 64k */
101     std::vector<T> outgoingMessages;
102 
checkIteratingSubscriberTopicTree103     void checkIteratingSubscriber(Subscriber *s) {
104         /* Notify user that they are doing something wrong here */
105         if (iteratingSubscriber == s) {
106             std::cerr << "Error: WebSocket must not subscribe or unsubscribe to topics while iterating its topics!" << std::endl;
107             std::terminate();
108         }
109     }
110 
111     /* Warning: does NOT unlink from drainableSubscribers or modify next, prev. */
drainImplTopicTree112     void drainImpl(Subscriber *s) {
113         /* Before we call cb we need to make sure this subscriber will not report needsDrainage()
114          * since WebSocket::send will call drain from within the cb in that case.*/
115         int numMessageIndices = s->numMessageIndices;
116         s->numMessageIndices = 0;
117 
118         /* Then we emit cb */
119         for (int i = 0; i < numMessageIndices; i++) {
120             T &outgoingMessage = outgoingMessages[s->messageIndices[i]];
121 
122             int flags = (i == numMessageIndices - 1) ? LAST : 0;
123 
124             /* Returning true will stop drainage short (such as when backpressure is too high) */
125             if (cb(s, outgoingMessage, (IteratorFlags)(flags | (i == 0 ? FIRST : 0)))) {
126                 break;
127             }
128         }
129     }
130 
unlinkDrainableSubscriberTopicTree131     void unlinkDrainableSubscriber(Subscriber *s) {
132         if (s->prev) {
133             s->prev->next = s->next;
134         }
135         if (s->next) {
136             s->next->prev = s->prev;
137         }
138         /* If we are the head, then we also need to reset the head */
139         if (drainableSubscribers == s) {
140             drainableSubscribers = s->next;
141         }
142     }
143 
144 public:
145 
TopicTreeTopicTree146     TopicTree(std::function<bool(Subscriber *, T &, IteratorFlags)> cb) : cb(cb) {
147 
148     }
149 
150     /* Returns nullptr if not found */
lookupTopicTopicTree151     Topic *lookupTopic(std::string_view topic) {
152         auto it = topics.find(topic);
153         if (it == topics.end()) {
154             return nullptr;
155         }
156         return it->second.get();
157     }
158 
159     /* Subscribe fails if we already are subscribed */
subscribeTopicTree160     bool subscribe(Subscriber *s, std::string_view topic) {
161         /* Notify user that they are doing something wrong here */
162         checkIteratingSubscriber(s);
163 
164         /* Lookup or create new topic */
165         Topic *topicPtr = lookupTopic(topic);
166         if (!topicPtr) {
167             Topic *newTopic = new Topic(topic);
168             topics.insert({std::string_view(newTopic->name.data(), newTopic->name.length()), std::unique_ptr<Topic>(newTopic)});
169             topicPtr = newTopic;
170         }
171 
172         /* Insert us in topic, insert topic in us */
173         auto [it, inserted] = s->topics.insert(topicPtr);
174         if (!inserted) {
175             return false;
176         }
177         topicPtr->insert(s);
178 
179         /* Success */
180         return true;
181     }
182 
183     /* Returns ok, last */
unsubscribeTopicTree184     std::pair<bool, bool> unsubscribe(Subscriber *s, std::string_view topic) {
185         /* Notify user that they are doing something wrong here */
186         checkIteratingSubscriber(s);
187 
188         /* Lookup topic */
189         Topic *topicPtr = lookupTopic(topic);
190         if (!topicPtr) {
191             /* If the topic doesn't exist we are assumed to still be subscribers of something */
192             return {false, false};
193         }
194 
195         /* Erase from our list first */
196         if (s->topics.erase(topicPtr) == 0) {
197             return {false, false};
198         }
199 
200         /* Remove us from topic */
201         topicPtr->erase(s);
202 
203         /* If there is no subscriber to this topic, remove it */
204         if (!topicPtr->size()) {
205             /* Unique_ptr deletes the topic */
206             topics.erase(topic);
207         }
208 
209         /* If we don't hold any topics we are to be freed altogether */
210         return {true, topics.size() == 0};
211     }
212 
213     /* Factory function for creating a Subscriber */
createSubscriberTopicTree214     Subscriber *createSubscriber() {
215         return new Subscriber();
216     }
217 
218     /* This is used to end a Subscriber, before freeing it */
freeSubscriberTopicTree219     void freeSubscriber(Subscriber *s) {
220 
221         /* I guess we call this one even if we are not subscribers */
222         if (!s) {
223             return;
224         }
225 
226         /* For all topics, unsubscribe */
227         for (Topic *topicPtr : s->topics) {
228             /* If we are the last subscriber, simply remove the whole topic */
229             if (topicPtr->size() == 1) {
230                 topics.erase(topicPtr->name);
231             } else {
232                 /* Otherwise just remove us */
233                 topicPtr->erase(s);
234             }
235         }
236 
237         /* We also need to unlink us */
238         if (s->needsDrainage()) {
239             unlinkDrainableSubscriber(s);
240         }
241 
242         delete s;
243     }
244 
245     /* Mainly used by WebSocket::send to drain one socket before sending */
drainTopicTree246     void drain(Subscriber *s) {
247         /* The list is undefined and cannot be touched unless needsDrainage(). */
248         if (s->needsDrainage()) {
249             /* This function differs from drainImpl by properly unlinking
250             * the subscriber from drainableSubscribers. drainImpl does not. */
251             unlinkDrainableSubscriber(s);
252 
253             /* This one always resets needsDrainage before it calls any cb's.
254              * Otherwise we would stackoverflow when sending after publish but before drain. */
255             drainImpl(s);
256 
257             /* If we drained last subscriber, also clear outgoingMessages */
258             if (!drainableSubscribers) {
259                 outgoingMessages.clear();
260             }
261         }
262     }
263 
264     /* Called everytime we call send, to drain published messages so to sync outgoing messages */
drainTopicTree265     void drain() {
266         if (drainableSubscribers) {
267             /* Drain one socket a time */
268             for (Subscriber *s = drainableSubscribers; s; s = s->next) {
269                 /* Instead of unlinking every single subscriber, we just leave the list undefined
270                  * and reset drainableSubscribers ptr below. */
271                 drainImpl(s);
272             }
273             /* Drain always clears drainableSubscribers and outgoingMessages */
274             drainableSubscribers = nullptr;
275             outgoingMessages.clear();
276         }
277     }
278 
279     /* Big messages bypass all buffering and land directly in backpressure */
280     template <typename F>
publishBigTopicTree281     bool publishBig(Subscriber *sender, std::string_view topic, B &&bigMessage, F cb) {
282         /* Do we even have this topic? */
283         auto it = topics.find(topic);
284         if (it == topics.end()) {
285             return false;
286         }
287 
288         /* For all subscribers in topic */
289         for (Subscriber *s : *it->second) {
290 
291             /* If we are sender then ignore us */
292             if (sender != s) {
293                 cb(s, bigMessage);
294             }
295         }
296 
297         return true;
298     }
299 
300     /* Linear in number of affected subscribers */
publishTopicTree301     bool publish(Subscriber *sender, std::string_view topic, T &&message) {
302         /* Do we even have this topic? */
303         auto it = topics.find(topic);
304         if (it == topics.end()) {
305             return false;
306         }
307 
308         /* If we have more than 65k messages we need to drain every socket. */
309         if (outgoingMessages.size() == UINT16_MAX) {
310             /* If there is a socket that is currently corked, this will be ugly as all sockets will drain
311              * to their own backpressure */
312             drain();
313         }
314 
315         /* If nobody references this message, don't buffer it */
316         bool referencedMessage = false;
317 
318         /* For all subscribers in topic */
319         for (Subscriber *s : *it->second) {
320 
321             /* If we are sender then ignore us */
322             if (sender != s) {
323 
324                 /* At least one subscriber wants this message */
325                 referencedMessage = true;
326 
327                 /* If we already have too many outgoing messages on this subscriber, drain it now */
328                 if (s->numMessageIndices == 32) {
329                     /* This one does not need to check needsDrainage here but still does. */
330                     drain(s);
331                 }
332 
333                 /* Finally we can continue */
334                 s->messageIndices[s->numMessageIndices++] = (uint16_t)outgoingMessages.size();
335                 /* First message adds subscriber to list of drainable subscribers */
336                 if (s->numMessageIndices == 1) {
337                     /* Insert us in the head of drainable subscribers */
338                     s->next = drainableSubscribers;
339                     s->prev = nullptr;
340                     if (s->next) {
341                         s->next->prev = s;
342                     }
343                     drainableSubscribers = s;
344                 }
345             }
346         }
347 
348         /* Push this message and return with success */
349         if (referencedMessage) {
350             outgoingMessages.emplace_back(message);
351         }
352 
353         /* Success if someone wants it */
354         return referencedMessage;
355     }
356 };
357 
358 }
359 
360 #endif
361