1 /*
2 * lws System Message Distribution
3 *
4 * Copyright (C) 2019 - 2021 Andy Green <andy@warmcat.com>
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to
8 * deal in the Software without restriction, including without limitation the
9 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 * sell copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22 * IN THE SOFTWARE.
23 */
24
25 #include "private-lib-core.h"
26 #include <assert.h>
27
/*
 * Uncomment the define below to enable the extra debug logging and the queue
 * dump helper in this file
 */
// #define LWS_SMD_DEBUG

/*
 * lwsl_smd() maps to lwsl_notice() when LWS_SMD_DEBUG is defined, otherwise
 * it expands to nothing
 */
#if !defined(LWS_SMD_DEBUG)
#define lwsl_smd(_s, ...)
#else
#define lwsl_smd lwsl_notice
#endif
37
38 void *
lws_smd_msg_alloc(struct lws_context * ctx,lws_smd_class_t _class,size_t len)39 lws_smd_msg_alloc(struct lws_context *ctx, lws_smd_class_t _class, size_t len)
40 {
41 lws_smd_msg_t *msg;
42
43 /* only allow it if someone wants to consume this class of event */
44
45 if (!(ctx->smd._class_filter & _class)) {
46 lwsl_info("%s: rejecting class 0x%x as no participant wants it\n",
47 __func__, (unsigned int)_class);
48 return NULL;
49 }
50
51 assert(len <= LWS_SMD_MAX_PAYLOAD);
52
53
54 /*
55 * If SS configured, over-allocate LWS_SMD_SS_RX_HEADER_LEN behind
56 * payload, ie, msg_t (gap LWS_SMD_SS_RX_HEADER_LEN) payload
57 */
58 msg = lws_malloc(sizeof(*msg) + LWS_SMD_SS_RX_HEADER_LEN_EFF + len,
59 __func__);
60 if (!msg)
61 return NULL;
62
63 memset(msg, 0, sizeof(*msg));
64 msg->timestamp = lws_now_usecs();
65 msg->length = (uint16_t)len;
66 msg->_class = _class;
67
68 return ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF;
69 }
70
71 void
lws_smd_msg_free(void ** ppay)72 lws_smd_msg_free(void **ppay)
73 {
74 lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)*ppay) -
75 LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg));
76
77 /* if SS configured, actual alloc is LWS_SMD_SS_RX_HEADER_LEN behind */
78 lws_free(msg);
79 *ppay = NULL;
80 }
81
82 #if defined(LWS_SMD_DEBUG)
83 static void
lws_smd_dump(lws_smd_t * smd)84 lws_smd_dump(lws_smd_t *smd)
85 {
86 int n = 1;
87
88 lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
89 smd->owner_messages.head) {
90 lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);
91
92 lwsl_info(" msg %d: %p: ref %d, lat %dms, cls: 0x%x, len %u: '%s'\n",
93 n++, msg, msg->refcount,
94 (unsigned int)((lws_now_usecs() - msg->timestamp) / 1000),
95 msg->length, msg->_class,
96 (const char *)&msg[1] + LWS_SMD_SS_RX_HEADER_LEN_EFF);
97
98 } lws_end_foreach_dll_safe(p, p1);
99
100 n = 1;
101 lws_start_foreach_dll(struct lws_dll2 *, p, smd->owner_peers.head) {
102 lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
103
104 lwsl_info(" peer %d: %p: tail: %p, filt 0x%x\n",
105 n++, pr, pr->tail, pr->_class_filter);
106 } lws_end_foreach_dll(p);
107 }
108 #endif
109
110 static int
_lws_smd_msg_peer_interested_in_msg(lws_smd_peer_t * pr,lws_smd_msg_t * msg)111 _lws_smd_msg_peer_interested_in_msg(lws_smd_peer_t *pr, lws_smd_msg_t *msg)
112 {
113 return !!(msg->_class & pr->_class_filter);
114 }
115
116 /*
117 * Figure out what to set the initial refcount for the message to
118 */
119
120 static int
_lws_smd_msg_assess_peers_interested(lws_smd_t * smd,lws_smd_msg_t * msg,struct lws_smd_peer * exc)121 _lws_smd_msg_assess_peers_interested(lws_smd_t *smd, lws_smd_msg_t *msg,
122 struct lws_smd_peer *exc)
123 {
124 struct lws_context *ctx = lws_container_of(smd, struct lws_context, smd);
125 int interested = 0;
126
127 lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
128 lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
129
130 if (pr != exc && _lws_smd_msg_peer_interested_in_msg(pr, msg))
131 /*
132 * This peer wants to consume it
133 */
134 interested++;
135
136 } lws_end_foreach_dll(p);
137
138 return interested;
139 }
140
141 static int
_lws_smd_class_mask_union(lws_smd_t * smd)142 _lws_smd_class_mask_union(lws_smd_t *smd)
143 {
144 uint32_t mask = 0;
145
146 lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
147 smd->owner_peers.head) {
148 lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
149
150 mask |= pr->_class_filter;
151
152 } lws_end_foreach_dll_safe(p, p1);
153
154 smd->_class_filter = mask;
155
156 return 0;
157 }
158
159 /* Call with message lock held */
160
161 static void
_lws_smd_msg_destroy(lws_smd_t * smd,lws_smd_msg_t * msg)162 _lws_smd_msg_destroy(lws_smd_t *smd, lws_smd_msg_t *msg)
163 {
164 /*
165 * We think we gave the message to everyone and can destroy it.
166 * Sanity check that no peer holds a pointer to this guy
167 */
168
169 lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
170 smd->owner_peers.head) {
171 lws_smd_peer_t *xpr = lws_container_of(p, lws_smd_peer_t, list);
172
173 if (xpr->tail == msg) {
174 lwsl_err("%s: peer %p has msg %p "
175 "we are about to destroy as tail\n",
176 __func__, xpr, msg);
177 #if !defined(LWS_PLAT_FREERTOS)
178 assert(0);
179 #endif
180 }
181
182 } lws_end_foreach_dll_safe(p, p1);
183
184 /*
185 * We have fully delivered the message now, it
186 * can be unlinked and destroyed
187 */
188 lwsl_info("%s: destroy msg %p\n", __func__, msg);
189 lws_dll2_remove(&msg->list);
190 lws_free(msg);
191 }
192
193 /*
194 * This is wanting to be threadsafe, limiting the apis we can call
195 */
196
197 int
_lws_smd_msg_send(struct lws_context * ctx,void * pay,struct lws_smd_peer * exc)198 _lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc)
199 {
200 lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)pay) -
201 LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg));
202
203 if (ctx->smd.owner_messages.count >= ctx->smd_queue_depth) {
204 lwsl_warn("%s: rejecting message on queue depth %d\n",
205 __func__, (int)ctx->smd.owner_messages.count);
206 /* reject the message due to max queue depth reached */
207 return 1;
208 }
209
210 if (!ctx->smd.delivering)
211 lws_mutex_lock(ctx->smd.lock_peers); /* +++++++++++++++ peers */
212
213 msg->refcount = (uint16_t)_lws_smd_msg_assess_peers_interested(
214 &ctx->smd, msg, exc);
215 if (!msg->refcount) {
216 /* possible, condsidering exc and no other participants */
217 lws_free(msg);
218 if (!ctx->smd.delivering)
219 lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */
220
221 return 0;
222 }
223
224 msg->exc = exc;
225
226 /* let's add him on the queue... */
227
228 lws_mutex_lock(ctx->smd.lock_messages); /* +++++++++++++++++ messages */
229 lws_dll2_add_tail(&msg->list, &ctx->smd.owner_messages);
230
231 /*
232 * Any peer with no active tail needs to check our class to see if we
233 * should become his tail
234 */
235
236 lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
237 lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
238
239 if (pr != exc &&
240 !pr->tail && _lws_smd_msg_peer_interested_in_msg(pr, msg)) {
241 pr->tail = msg;
242 /* tail message has to actually be of interest to the peer */
243 assert(!pr->tail || (pr->tail->_class & pr->_class_filter));
244 }
245
246 } lws_end_foreach_dll(p);
247
248 #if defined(LWS_SMD_DEBUG)
249 lwsl_smd("%s: added %p (refc %u) depth now %d\n", __func__,
250 msg, msg->refcount, ctx->smd.owner_messages.count);
251 lws_smd_dump(&ctx->smd);
252 #endif
253
254 lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */
255
256 if (!ctx->smd.delivering)
257 lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */
258
259 /* we may be happening from another thread context */
260 lws_cancel_service(ctx);
261
262 return 0;
263 }
264
/*
 * Queue the message whose payload is pay, with no peer excluded from
 * delivery.  Returns whatever _lws_smd_msg_send() returns.
 *
 * This is wanting to be threadsafe, which limits the apis it can call.
 */

int
lws_smd_msg_send(struct lws_context *ctx, void *pay)
{
	struct lws_smd_peer *nobody_excluded = NULL;

	return _lws_smd_msg_send(ctx, pay, nobody_excluded);
}
274
275 /*
276 * This is wanting to be threadsafe, limiting the apis we can call
277 */
278
279 int
lws_smd_msg_printf(struct lws_context * ctx,lws_smd_class_t _class,const char * format,...)280 lws_smd_msg_printf(struct lws_context *ctx, lws_smd_class_t _class,
281 const char *format, ...)
282 {
283 lws_smd_msg_t *msg;
284 va_list ap;
285 void *p;
286 int n;
287
288 if (!(ctx->smd._class_filter & _class))
289 /*
290 * There's nobody interested in messages of this class atm.
291 * Don't bother generating it, and act like all is well.
292 */
293 return 0;
294
295 va_start(ap, format);
296 n = vsnprintf(NULL, 0, format, ap);
297 va_end(ap);
298 if (n > LWS_SMD_MAX_PAYLOAD)
299 /* too large to send */
300 return 1;
301
302 p = lws_smd_msg_alloc(ctx, _class, (size_t)n + 2);
303 if (!p)
304 return 1;
305 msg = (lws_smd_msg_t *)(((uint8_t *)p) - LWS_SMD_SS_RX_HEADER_LEN_EFF -
306 sizeof(*msg));
307 msg->length = (uint16_t)n;
308 va_start(ap, format);
309 vsnprintf((char *)p, (unsigned int)n + 2, format, ap);
310 va_end(ap);
311
312 /*
313 * locks taken and released in here
314 */
315
316 if (lws_smd_msg_send(ctx, p)) {
317 lws_smd_msg_free(&p);
318 return 1;
319 }
320
321 return 0;
322 }
323
324 #if defined(LWS_WITH_SECURE_STREAMS)
325 int
lws_smd_ss_msg_printf(const char * tag,uint8_t * buf,size_t * len,lws_smd_class_t _class,const char * format,...)326 lws_smd_ss_msg_printf(const char *tag, uint8_t *buf, size_t *len,
327 lws_smd_class_t _class, const char *format, ...)
328 {
329 char *content = (char *)buf + LWS_SMD_SS_RX_HEADER_LEN;
330 va_list ap;
331 int n;
332
333 if (*len < LWS_SMD_SS_RX_HEADER_LEN)
334 return 1;
335
336 lws_ser_wu64be(buf, _class);
337 lws_ser_wu64be(buf + 8, 0); /* valgrind notices uninitialized if left */
338
339 va_start(ap, format);
340 n = vsnprintf(content, (*len) - LWS_SMD_SS_RX_HEADER_LEN, format, ap);
341 va_end(ap);
342
343 if (n > LWS_SMD_MAX_PAYLOAD ||
344 (unsigned int)n > (*len) - LWS_SMD_SS_RX_HEADER_LEN)
345 /* too large to send */
346 return 1;
347
348 *len = LWS_SMD_SS_RX_HEADER_LEN + (unsigned int)n;
349
350 lwsl_info("%s: %s send cl 0x%x, len %u\n", __func__, tag, (unsigned int)_class,
351 (unsigned int)n);
352
353 return 0;
354 }
355
356 /*
357 * This is a helper that user rx handler for LWS_SMD_STREAMTYPENAME SS can
358 * call through to with the payload it received from the proxy. It will then
359 * forward the recieved SMD message to all local (same-context) participants
360 * that are interested in that class (except ones with callback skip_cb, so
361 * we don't loop).
362 */
363
364 static int
_lws_smd_ss_rx_forward(struct lws_context * ctx,const char * tag,struct lws_smd_peer * pr,const uint8_t * buf,size_t len)365 _lws_smd_ss_rx_forward(struct lws_context *ctx, const char *tag,
366 struct lws_smd_peer *pr, const uint8_t *buf, size_t len)
367 {
368 lws_smd_class_t _class;
369 lws_smd_msg_t *msg;
370 void *p;
371
372 if (len < LWS_SMD_SS_RX_HEADER_LEN_EFF)
373 return 1;
374
375 if (len >= LWS_SMD_MAX_PAYLOAD + LWS_SMD_SS_RX_HEADER_LEN_EFF)
376 return 1;
377
378 _class = (lws_smd_class_t)lws_ser_ru64be(buf);
379
380 if (_class == LWSSMDCL_METRICS) {
381
382 }
383
384 /* only locally forward messages that we care about in this process */
385
386 if (!(ctx->smd._class_filter & _class))
387 /*
388 * There's nobody interested in messages of this class atm.
389 * Don't bother generating it, and act like all is well.
390 */
391 return 0;
392
393 p = lws_smd_msg_alloc(ctx, _class, len);
394 if (!p)
395 return 1;
396
397 msg = (lws_smd_msg_t *)(((uint8_t *)p) - LWS_SMD_SS_RX_HEADER_LEN_EFF -
398 sizeof(*msg));
399 msg->length = (uint16_t)(len - LWS_SMD_SS_RX_HEADER_LEN_EFF);
400 /* adopt the original source timestamp, not time we forwarded it */
401 msg->timestamp = (lws_usec_t)lws_ser_ru64be(buf + 8);
402
403 /* copy the message payload in */
404 memcpy(p, buf + LWS_SMD_SS_RX_HEADER_LEN_EFF, msg->length);
405
406 /*
407 * locks taken and released in here
408 */
409
410 if (_lws_smd_msg_send(ctx, p, pr)) {
411 /* we couldn't send it after all that... */
412 lws_smd_msg_free(&p);
413
414 return 1;
415 }
416
417 lwsl_info("%s: %s send cl 0x%x, len %u, ts %llu\n", __func__,
418 tag, (unsigned int)_class, msg->length,
419 (unsigned long long)msg->timestamp);
420
421 return 0;
422 }
423
424 int
lws_smd_ss_rx_forward(void * ss_user,const uint8_t * buf,size_t len)425 lws_smd_ss_rx_forward(void *ss_user, const uint8_t *buf, size_t len)
426 {
427 struct lws_ss_handle *h = (struct lws_ss_handle *)
428 (((char *)ss_user) - sizeof(*h));
429 struct lws_context *ctx = lws_ss_get_context(h);
430
431 return _lws_smd_ss_rx_forward(ctx, lws_ss_tag(h), h->u.smd.smd_peer, buf, len);
432 }
433
434 #if defined(LWS_WITH_SECURE_STREAMS_PROXY_API)
435 int
lws_smd_sspc_rx_forward(void * ss_user,const uint8_t * buf,size_t len)436 lws_smd_sspc_rx_forward(void *ss_user, const uint8_t *buf, size_t len)
437 {
438 struct lws_sspc_handle *h = (struct lws_sspc_handle *)
439 (((char *)ss_user) - sizeof(*h));
440 struct lws_context *ctx = lws_sspc_get_context(h);
441
442 return _lws_smd_ss_rx_forward(ctx, lws_sspc_tag(h), NULL, buf, len);
443 }
444 #endif
445
446 #endif
447
448 /*
449 * Peers that deregister need to adjust the refcount of messages they would
450 * have been interested in, but didn't take delivery of yet
451 */
452
453 static void
_lws_smd_peer_destroy(lws_smd_peer_t * pr)454 _lws_smd_peer_destroy(lws_smd_peer_t *pr)
455 {
456 lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t,
457 owner_peers);
458
459 lws_mutex_lock(smd->lock_messages); /* +++++++++ messages */
460
461 lws_dll2_remove(&pr->list);
462
463 /*
464 * We take the approach to adjust the refcount of every would-have-been
465 * delivered message we were interested in
466 */
467
468 while (pr->tail) {
469
470 lws_smd_msg_t *m1 = lws_container_of(pr->tail->list.next,
471 lws_smd_msg_t, list);
472
473 if (_lws_smd_msg_peer_interested_in_msg(pr, pr->tail)) {
474 if (!--pr->tail->refcount)
475 _lws_smd_msg_destroy(smd, pr->tail);
476 }
477
478 pr->tail = m1;
479 }
480
481 lws_free(pr);
482
483 lws_mutex_unlock(smd->lock_messages); /* messages ------- */
484 }
485
486 static lws_smd_msg_t *
_lws_smd_msg_next_matching_filter(lws_smd_peer_t * pr)487 _lws_smd_msg_next_matching_filter(lws_smd_peer_t *pr)
488 {
489 lws_dll2_t *tail = &pr->tail->list;
490 lws_smd_msg_t *msg;
491
492 do {
493 tail = tail->next;
494 if (!tail)
495 return NULL;
496
497 msg = lws_container_of(tail, lws_smd_msg_t, list);
498 if (msg->exc != pr &&
499 _lws_smd_msg_peer_interested_in_msg(pr, msg))
500 return msg;
501 } while (1);
502
503 return NULL;
504 }
505
506 /*
507 * Delivers only one message to the peer and advances the tail, or sets to NULL
508 * if no more filtered queued messages. Returns nonzero if tail non-NULL.
509 *
510 * For Proxied SS, only asks for writeable and does not advance or change the
511 * tail.
512 *
513 * This is done so if multiple messages queued, we don't get a situation where
514 * one participant gets them all spammed, then the next etc. Instead they are
515 * delivered round-robin.
516 *
517 * Requires peer lock, may take message lock
518 */
519
520 static int
_lws_smd_msg_deliver_peer(struct lws_context * ctx,lws_smd_peer_t * pr)521 _lws_smd_msg_deliver_peer(struct lws_context *ctx, lws_smd_peer_t *pr)
522 {
523 lws_smd_msg_t *msg;
524
525 if (!pr->tail)
526 return 0;
527
528 msg = lws_container_of(pr->tail, lws_smd_msg_t, list);
529
530
531 lwsl_smd("%s: deliver cl 0x%x, len %d, refc %d, to peer %p\n",
532 __func__, (unsigned int)msg->_class, (int)msg->length,
533 (int)msg->refcount, pr);
534
535 pr->cb(pr->opaque, msg->_class, msg->timestamp,
536 ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF,
537 (size_t)msg->length);
538
539 assert(msg->refcount);
540
541 /*
542 * If there is one, move forward to the next queued
543 * message that meets the filters of this peer
544 */
545 pr->tail = _lws_smd_msg_next_matching_filter(pr);
546
547 /* tail message has to actually be of interest to the peer */
548 assert(!pr->tail || (pr->tail->_class & pr->_class_filter));
549
550 lws_mutex_lock(ctx->smd.lock_messages); /* +++++++++ messages */
551 if (!--msg->refcount)
552 _lws_smd_msg_destroy(&ctx->smd, msg);
553 lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */
554
555 return !!pr->tail;
556 }
557
558 /*
559 * Called when the event loop could deliver messages synchronously, eg, on
560 * entry to idle
561 */
562
563 int
lws_smd_msg_distribute(struct lws_context * ctx)564 lws_smd_msg_distribute(struct lws_context *ctx)
565 {
566 char more;
567
568 /* commonly, no messages and nothing to do... */
569
570 if (!ctx->smd.owner_messages.count)
571 return 0;
572
573 ctx->smd.delivering = 1;
574
575 do {
576 more = 0;
577 lws_mutex_lock(ctx->smd.lock_peers); /* +++++++++++++++ peers */
578
579 lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
580 ctx->smd.owner_peers.head) {
581 lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
582
583 more = (char)(more | !!_lws_smd_msg_deliver_peer(ctx, pr));
584
585 } lws_end_foreach_dll_safe(p, p1);
586
587 lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */
588 } while (more);
589
590 ctx->smd.delivering = 0;
591
592 return 0;
593 }
594
595 struct lws_smd_peer *
lws_smd_register(struct lws_context * ctx,void * opaque,int flags,lws_smd_class_t _class_filter,lws_smd_notification_cb_t cb)596 lws_smd_register(struct lws_context *ctx, void *opaque, int flags,
597 lws_smd_class_t _class_filter, lws_smd_notification_cb_t cb)
598 {
599 lws_smd_peer_t *pr = lws_zalloc(sizeof(*pr), __func__);
600
601 if (!pr)
602 return NULL;
603
604 pr->cb = cb;
605 pr->opaque = opaque;
606 pr->_class_filter = _class_filter;
607
608 if (!ctx->smd.delivering)
609 lws_mutex_lock(ctx->smd.lock_peers); /* +++++++++++++++ peers */
610
611 /*
612 * Let's lock the message list before adding this peer... because...
613 */
614
615 lws_mutex_lock(ctx->smd.lock_messages); /* +++++++++ messages */
616
617 lws_dll2_add_tail(&pr->list, &ctx->smd.owner_peers);
618
619 /* update the global class mask union to account for new peer mask */
620 _lws_smd_class_mask_union(&ctx->smd);
621
622 /*
623 * Now there's a new peer added, any messages we have stashed will try
624 * to deliver to this guy too, if he's interested in that class. So we
625 * have to update the message refcounts for queued messages-he's-
626 * interested-in accordingly.
627 */
628
629 lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
630 ctx->smd.owner_messages.head) {
631 lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);
632
633 if (_lws_smd_msg_peer_interested_in_msg(pr, msg))
634 msg->refcount++;
635
636 } lws_end_foreach_dll_safe(p, p1);
637
638 /* ... ok we are done adding the peer */
639
640 lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */
641
642 lwsl_info("%s: peer %p (count %u) registered\n", __func__, pr,
643 (unsigned int)ctx->smd.owner_peers.count);
644
645 if (!ctx->smd.delivering)
646 lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */
647
648 return pr;
649 }
650
651 void
lws_smd_unregister(struct lws_smd_peer * pr)652 lws_smd_unregister(struct lws_smd_peer *pr)
653 {
654 lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t, owner_peers);
655
656 if (!smd->delivering)
657 lws_mutex_lock(smd->lock_peers); /* +++++++++++++++++++ peers */
658 lwsl_notice("%s: destroying peer %p\n", __func__, pr);
659 _lws_smd_peer_destroy(pr);
660 if (!smd->delivering)
661 lws_mutex_unlock(smd->lock_peers); /* ----------------- peers */
662 }
663
664 int
lws_smd_message_pending(struct lws_context * ctx)665 lws_smd_message_pending(struct lws_context *ctx)
666 {
667 int ret = 1;
668
669 /*
670 * First cheaply check the common case no messages pending, so there's
671 * definitely nothing for this tsi or anything else
672 */
673
674 if (!ctx->smd.owner_messages.count)
675 return 0;
676
677 /*
678 * If there are any messages, check their age and expire ones that
679 * have been hanging around too long
680 */
681
682 lws_mutex_lock(ctx->smd.lock_peers); /* +++++++++++++++++++++++ peers */
683 lws_mutex_lock(ctx->smd.lock_messages); /* +++++++++++++++++ messages */
684
685 lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
686 ctx->smd.owner_messages.head) {
687 lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);
688
689 if ((lws_now_usecs() - msg->timestamp) > ctx->smd_ttl_us) {
690 lwsl_warn("%s: timing out queued message %p\n",
691 __func__, msg);
692
693 /*
694 * We're forcibly yanking this guy, we can expect that
695 * there might be peers that point to it as their tail.
696 *
697 * In that case, move their tails on to the next guy
698 * they are interested in, if any.
699 */
700
701 lws_start_foreach_dll_safe(struct lws_dll2 *, pp, pp1,
702 ctx->smd.owner_peers.head) {
703 lws_smd_peer_t *pr = lws_container_of(pp,
704 lws_smd_peer_t, list);
705
706 if (pr->tail == msg)
707 pr->tail = _lws_smd_msg_next_matching_filter(pr);
708
709 } lws_end_foreach_dll_safe(pp, pp1);
710
711 /*
712 * No peer should fall foul of the peer tail checks
713 * when destroying the message now.
714 */
715
716 _lws_smd_msg_destroy(&ctx->smd, msg);
717 }
718 } lws_end_foreach_dll_safe(p, p1);
719
720 lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */
721
722 /*
723 * Walk the peer list
724 */
725
726 lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
727 lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
728
729 if (pr->tail)
730 goto bail;
731
732 } lws_end_foreach_dll(p);
733
734 /*
735 * There's no message pending that we need to handle
736 */
737
738 ret = 0;
739
740 bail:
741 lws_mutex_unlock(ctx->smd.lock_peers); /* --------------------- peers */
742
743 return ret;
744 }
745
746 int
_lws_smd_destroy(struct lws_context * ctx)747 _lws_smd_destroy(struct lws_context *ctx)
748 {
749 /* stop any message creation */
750
751 ctx->smd._class_filter = 0;
752
753 /*
754 * Walk the message list, destroying them
755 */
756
757 lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
758 ctx->smd.owner_messages.head) {
759 lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);
760
761 lws_dll2_remove(&msg->list);
762 lws_free(msg);
763
764 } lws_end_foreach_dll_safe(p, p1);
765
766 /*
767 * Walk the peer list, destroying them
768 */
769
770 lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
771 ctx->smd.owner_peers.head) {
772 lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
773
774 pr->tail = NULL; /* we just nuked all the messages, ignore */
775 _lws_smd_peer_destroy(pr);
776
777 } lws_end_foreach_dll_safe(p, p1);
778
779 lws_mutex_destroy(ctx->smd.lock_messages);
780 lws_mutex_destroy(ctx->smd.lock_peers);
781
782 return 0;
783 }
784