/*
 * lws System Message Distribution
 *
 * Copyright (C) 2019 - 2021 Andy Green <andy@warmcat.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "private-lib-core.h"
#include <assert.h>

/* uncomment me to enable extra debug and sanity checks */
// #define LWS_SMD_DEBUG

#if defined(LWS_SMD_DEBUG)
#define lwsl_smd lwsl_notice
#else
#define lwsl_smd(_s, ...)
#endif

void *
lws_smd_msg_alloc(struct lws_context *ctx, lws_smd_class_t _class, size_t len)
{
	lws_smd_msg_t *msg;

	/* only allow it if someone wants to consume this class of event */

	if (!(ctx->smd._class_filter & _class)) {
		lwsl_info("%s: rejecting class 0x%x as no participant wants it\n",
			  __func__, (unsigned int)_class);
		return NULL;
	}

	assert(len <= LWS_SMD_MAX_PAYLOAD);

	/*
	 * If SS configured, over-allocate LWS_SMD_SS_RX_HEADER_LEN behind
	 * payload, ie, msg_t (gap LWS_SMD_SS_RX_HEADER_LEN) payload
	 */
	msg = lws_malloc(sizeof(*msg) + LWS_SMD_SS_RX_HEADER_LEN_EFF + len,
			 __func__);
	if (!msg)
		return NULL;

	memset(msg, 0, sizeof(*msg));
	msg->timestamp = lws_now_usecs();
	msg->length = (uint16_t)len;
	msg->_class = _class;

	return ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF;
}
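
/*
 * Usage sketch (illustrative, not part of this file: the class and payload
 * are assumptions).  A producer allocates a payload for a class, fills it,
 * then hands ownership to lws_smd_msg_send(); on send failure the caller
 * still owns the message and must free it, as lws_smd_msg_printf() below
 * does.
 *
 *	void *p = lws_smd_msg_alloc(ctx, LWSSMDCL_SYSTEM_STATE, 16);
 *
 *	if (p) {
 *		memcpy(p, "hello", 6);
 *		if (lws_smd_msg_send(ctx, p))
 *			lws_smd_msg_free(&p);
 *	}
 */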

void
lws_smd_msg_free(void **ppay)
{
	lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)*ppay) -
				LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg));

	/* if SS configured, actual alloc is LWS_SMD_SS_RX_HEADER_LEN behind */
	lws_free(msg);
	*ppay = NULL;
}

#if defined(LWS_SMD_DEBUG)
static void
lws_smd_dump(lws_smd_t *smd)
{
	int n = 1;

	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
				   smd->owner_messages.head) {
		lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);

		lwsl_info(" msg %d: %p: ref %d, lat %dms, cls: 0x%x, len %u: '%s'\n",
			    n++, msg, msg->refcount,
			    (unsigned int)((lws_now_usecs() - msg->timestamp) / 1000),
			    msg->_class, msg->length,
			    (const char *)&msg[1] + LWS_SMD_SS_RX_HEADER_LEN_EFF);

	} lws_end_foreach_dll_safe(p, p1);

	n = 1;
	lws_start_foreach_dll(struct lws_dll2 *, p, smd->owner_peers.head) {
		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

		lwsl_info(" peer %d: %p: tail: %p, filt 0x%x\n",
			    n++, pr, pr->tail, pr->_class_filter);
	} lws_end_foreach_dll(p);
}
#endif

static int
_lws_smd_msg_peer_interested_in_msg(lws_smd_peer_t *pr, lws_smd_msg_t *msg)
{
	return !!(msg->_class & pr->_class_filter);
}

/*
 * Figure out what to set the initial refcount for the message to
 */

static int
_lws_smd_msg_assess_peers_interested(lws_smd_t *smd, lws_smd_msg_t *msg,
				     struct lws_smd_peer *exc)
{
	struct lws_context *ctx = lws_container_of(smd, struct lws_context, smd);
	int interested = 0;

	lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

		if (pr != exc && _lws_smd_msg_peer_interested_in_msg(pr, msg))
			/*
			 * This peer wants to consume it
			 */
			interested++;

	} lws_end_foreach_dll(p);

	return interested;
}
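
/*
 * Worked example with made-up numbers: peers A (filter 0x3), B (0xc) and
 * C (0x4) and a message of class 0x4 give an initial refcount of 2, since
 * only B and C pass the interest test; if C is passed as exc, eg, because
 * C itself sourced the message, the refcount starts at 1.
 */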

static int
_lws_smd_class_mask_union(lws_smd_t *smd)
{
	uint32_t mask = 0;

	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
				   smd->owner_peers.head) {
		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

		mask |= pr->_class_filter;

	} lws_end_foreach_dll_safe(p, p1);

	smd->_class_filter = mask;

	return 0;
}
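
/*
 * Eg, with made-up filters: peers with filters 0x3 and 0xc leave the union
 * at 0xf, so lws_smd_msg_alloc() can cheaply reject any class outside it
 * before allocating anything.
 */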

/* Call with message lock held */

static void
_lws_smd_msg_destroy(lws_smd_t *smd, lws_smd_msg_t *msg)
{
	/*
	 * We think we gave the message to everyone and can destroy it.
	 * Sanity check that no peer holds a pointer to this guy
	 */

	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
				   smd->owner_peers.head) {
		lws_smd_peer_t *xpr = lws_container_of(p, lws_smd_peer_t, list);

		if (xpr->tail == msg) {
			lwsl_err("%s: peer %p has msg %p "
				 "we are about to destroy as tail\n",
				 __func__, xpr, msg);
#if !defined(LWS_PLAT_FREERTOS)
			assert(0);
#endif
		}

	} lws_end_foreach_dll_safe(p, p1);

	/*
	 * We have fully delivered the message now, it
	 * can be unlinked and destroyed
	 */
	lwsl_info("%s: destroy msg %p\n", __func__, msg);
	lws_dll2_remove(&msg->list);
	lws_free(msg);
}

/*
 * This is wanting to be threadsafe, limiting the apis we can call
 */

int
_lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc)
{
	lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)pay) -
				LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg));

	if (ctx->smd.owner_messages.count >= ctx->smd_queue_depth) {
		lwsl_warn("%s: rejecting message on queue depth %d\n",
				__func__, (int)ctx->smd.owner_messages.count);
		/* reject the message due to max queue depth reached */
		return 1;
	}

	if (!ctx->smd.delivering)
		lws_mutex_lock(ctx->smd.lock_peers); /* +++++++++++++++ peers */

	msg->refcount = (uint16_t)_lws_smd_msg_assess_peers_interested(
							&ctx->smd, msg, exc);
	if (!msg->refcount) {
		/* possible, considering exc and no other participants */
		lws_free(msg);
		if (!ctx->smd.delivering)
			lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */

		return 0;
	}

	msg->exc = exc;

	/* let's add him on the queue... */

	lws_mutex_lock(ctx->smd.lock_messages); /* +++++++++++++++++ messages */
	lws_dll2_add_tail(&msg->list, &ctx->smd.owner_messages);

	/*
	 * Any peer with no active tail needs to check our class to see if we
	 * should become his tail
	 */

	lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

		if (pr != exc &&
		    !pr->tail && _lws_smd_msg_peer_interested_in_msg(pr, msg)) {
			pr->tail = msg;
			/* tail message has to actually be of interest to the peer */
			assert(!pr->tail || (pr->tail->_class & pr->_class_filter));
		}

	} lws_end_foreach_dll(p);

#if defined(LWS_SMD_DEBUG)
	lwsl_smd("%s: added %p (refc %u) depth now %d\n", __func__,
		 msg, msg->refcount, ctx->smd.owner_messages.count);
	lws_smd_dump(&ctx->smd);
#endif

	lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */

	if (!ctx->smd.delivering)
		lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */

	/* we may be happening from another thread context */
	lws_cancel_service(ctx);

	return 0;
}
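
/*
 * Lifecycle recap of the above: the refcount starts at the number of
 * interested peers; each delivery in _lws_smd_msg_deliver_peer() takes one
 * off, and whichever decrement reaches zero destroys the message.  Peers
 * registering while messages are still queued bump the refcounts of the
 * ones they are interested in, in lws_smd_register().
 */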

/*
 * This is wanting to be threadsafe, limiting the apis we can call
 */

int
lws_smd_msg_send(struct lws_context *ctx, void *pay)
{
	return _lws_smd_msg_send(ctx, pay, NULL);
}

/*
 * This is wanting to be threadsafe, limiting the apis we can call
 */

int
lws_smd_msg_printf(struct lws_context *ctx, lws_smd_class_t _class,
		   const char *format, ...)
{
	lws_smd_msg_t *msg;
	va_list ap;
	void *p;
	int n;

	if (!(ctx->smd._class_filter & _class))
		/*
		 * There's nobody interested in messages of this class atm.
		 * Don't bother generating it, and act like all is well.
		 */
		return 0;

	va_start(ap, format);
	n = vsnprintf(NULL, 0, format, ap);
	va_end(ap);
	if (n < 0 || n + 2 > LWS_SMD_MAX_PAYLOAD)
		/* failed, or too large to send (we allocate n + 2 below) */
		return 1;

	p = lws_smd_msg_alloc(ctx, _class, (size_t)n + 2);
	if (!p)
		return 1;
	msg = (lws_smd_msg_t *)(((uint8_t *)p) - LWS_SMD_SS_RX_HEADER_LEN_EFF -
								sizeof(*msg));
	msg->length = (uint16_t)n;
	va_start(ap, format);
	vsnprintf((char *)p, (unsigned int)n + 2, format, ap);
	va_end(ap);

	/*
	 * locks taken and released in here
	 */

	if (lws_smd_msg_send(ctx, p)) {
		lws_smd_msg_free(&p);
		return 1;
	}

	return 0;
}
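
/*
 * Usage sketch (the class and JSON shape are illustrative assumptions):
 *
 *	lws_smd_msg_printf(ctx, LWSSMDCL_SYSTEM_STATE,
 *			   "{\"state\":\"%s\"}", "OPERATIONAL");
 *
 * If no registered peer's filter includes the class, this returns 0
 * without even formatting the string.
 */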

#if defined(LWS_WITH_SECURE_STREAMS)
int
lws_smd_ss_msg_printf(const char *tag, uint8_t *buf, size_t *len,
		      lws_smd_class_t _class, const char *format, ...)
{
	char *content = (char *)buf + LWS_SMD_SS_RX_HEADER_LEN;
	va_list ap;
	int n;

	if (*len < LWS_SMD_SS_RX_HEADER_LEN)
		return 1;

	lws_ser_wu64be(buf, _class);
	lws_ser_wu64be(buf + 8, 0); /* valgrind notices uninitialized if left */

	va_start(ap, format);
	n = vsnprintf(content, (*len) - LWS_SMD_SS_RX_HEADER_LEN, format, ap);
	va_end(ap);

	if (n > LWS_SMD_MAX_PAYLOAD ||
	    (unsigned int)n > (*len) - LWS_SMD_SS_RX_HEADER_LEN)
		/* too large to send */
		return 1;

	*len = LWS_SMD_SS_RX_HEADER_LEN + (unsigned int)n;

	lwsl_info("%s: %s send cl 0x%x, len %u\n", __func__, tag, (unsigned int)_class,
			(unsigned int)n);

	return 0;
}
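
/*
 * Wire header layout implied by the two lws_ser_wu64be() writes above and
 * the reads in _lws_smd_ss_rx_forward() below:
 *
 *	offset 0:  u64 BE  message class bitmap
 *	offset 8:  u64 BE  source timestamp in us (written as 0 when sending)
 *	offset LWS_SMD_SS_RX_HEADER_LEN: the payload itself
 */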

/*
 * This is a helper that the user rx handler for the LWS_SMD_STREAMTYPENAME
 * SS can call through to with the payload it received from the proxy.  It
 * will then forward the received SMD message to all local (same-context)
 * participants that are interested in that class (except the peer pr it
 * arrived on, so we don't loop).
 */

static int
_lws_smd_ss_rx_forward(struct lws_context *ctx, const char *tag,
		       struct lws_smd_peer *pr, const uint8_t *buf, size_t len)
{
	lws_smd_class_t _class;
	lws_smd_msg_t *msg;
	void *p;

	if (len < LWS_SMD_SS_RX_HEADER_LEN_EFF)
		return 1;

	if (len >= LWS_SMD_MAX_PAYLOAD + LWS_SMD_SS_RX_HEADER_LEN_EFF)
		return 1;

	_class = (lws_smd_class_t)lws_ser_ru64be(buf);

	/* only locally forward messages that we care about in this process */

	if (!(ctx->smd._class_filter & _class))
		/*
		 * There's nobody interested in messages of this class atm.
		 * Don't bother generating it, and act like all is well.
		 */
		return 0;

	p = lws_smd_msg_alloc(ctx, _class, len);
	if (!p)
		return 1;

	msg = (lws_smd_msg_t *)(((uint8_t *)p) - LWS_SMD_SS_RX_HEADER_LEN_EFF -
								sizeof(*msg));
	msg->length = (uint16_t)(len - LWS_SMD_SS_RX_HEADER_LEN_EFF);
	/* adopt the original source timestamp, not time we forwarded it */
	msg->timestamp = (lws_usec_t)lws_ser_ru64be(buf + 8);

	/* copy the message payload in */
	memcpy(p, buf + LWS_SMD_SS_RX_HEADER_LEN_EFF, msg->length);

	/*
	 * locks taken and released in here
	 */

	if (_lws_smd_msg_send(ctx, p, pr)) {
		/* we couldn't send it after all that... */
		lws_smd_msg_free(&p);

		return 1;
	}

	lwsl_info("%s: %s send cl 0x%x, len %u, ts %llu\n", __func__,
		    tag, (unsigned int)_class, msg->length,
		    (unsigned long long)msg->timestamp);

	return 0;
}

int
lws_smd_ss_rx_forward(void *ss_user, const uint8_t *buf, size_t len)
{
	struct lws_ss_handle *h = (struct lws_ss_handle *)
					(((char *)ss_user) - sizeof(*h));
	struct lws_context *ctx = lws_ss_get_context(h);

	return _lws_smd_ss_rx_forward(ctx, lws_ss_tag(h), h->u.smd.smd_peer, buf, len);
}
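
/*
 * Call-site sketch (assumption: myss_rx and its stream binding are
 * illustrative, not defined here).  The rx handler of the
 * LWS_SMD_STREAMTYPENAME SS just hands the proxy payload through, using
 * its userobj as ss_user:
 *
 *	static lws_ss_state_return_t
 *	myss_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
 *	{
 *		if (lws_smd_ss_rx_forward(userobj, buf, len))
 *			lwsl_warn("%s: forward failed\n", __func__);
 *
 *		return LWSSSSRET_OK;
 *	}
 */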

#if defined(LWS_WITH_SECURE_STREAMS_PROXY_API)
int
lws_smd_sspc_rx_forward(void *ss_user, const uint8_t *buf, size_t len)
{
	struct lws_sspc_handle *h = (struct lws_sspc_handle *)
					(((char *)ss_user) - sizeof(*h));
	struct lws_context *ctx = lws_sspc_get_context(h);

	return _lws_smd_ss_rx_forward(ctx, lws_sspc_tag(h), NULL, buf, len);
}
#endif

#endif

/*
 * Peers that deregister need to adjust the refcount of messages they would
 * have been interested in, but didn't take delivery of yet
 */

static void
_lws_smd_peer_destroy(lws_smd_peer_t *pr)
{
	lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t,
					  owner_peers);

	lws_mutex_lock(smd->lock_messages); /* +++++++++ messages */

	lws_dll2_remove(&pr->list);

	/*
	 * We take the approach to adjust the refcount of every would-have-been
	 * delivered message we were interested in
	 */

	while (pr->tail) {

		lws_smd_msg_t *m1 = lws_container_of(pr->tail->list.next,
							lws_smd_msg_t, list);

		if (_lws_smd_msg_peer_interested_in_msg(pr, pr->tail)) {
			if (!--pr->tail->refcount)
				_lws_smd_msg_destroy(smd, pr->tail);
		}

		pr->tail = m1;
	}

	lws_free(pr);

	lws_mutex_unlock(smd->lock_messages); /* messages ------- */
}

static lws_smd_msg_t *
_lws_smd_msg_next_matching_filter(lws_smd_peer_t *pr)
{
	lws_dll2_t *tail = &pr->tail->list;
	lws_smd_msg_t *msg;

	do {
		tail = tail->next;
		if (!tail)
			return NULL;

		msg = lws_container_of(tail, lws_smd_msg_t, list);
		if (msg->exc != pr &&
		    _lws_smd_msg_peer_interested_in_msg(pr, msg))
			return msg;
	} while (1);

	return NULL;
}

/*
 * Delivers only one message to the peer and advances the tail, or sets it
 * to NULL if there are no more filtered queued messages.  Returns nonzero
 * if the tail is non-NULL afterwards.
 *
 * For Proxied SS, only asks for writeable and does not advance or change the
 * tail.
 *
 * This is done so if multiple messages are queued, we don't get a situation
 * where one participant gets them all spammed, then the next etc.  Instead
 * they are delivered round-robin.
 *
 * Requires peer lock, may take message lock
 */

static int
_lws_smd_msg_deliver_peer(struct lws_context *ctx, lws_smd_peer_t *pr)
{
	lws_smd_msg_t *msg;

	if (!pr->tail)
		return 0;

	msg = pr->tail;

	lwsl_smd("%s: deliver cl 0x%x, len %d, refc %d, to peer %p\n",
		    __func__, (unsigned int)msg->_class, (int)msg->length,
		    (int)msg->refcount, pr);

	pr->cb(pr->opaque, msg->_class, msg->timestamp,
	       ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF,
	       (size_t)msg->length);

	assert(msg->refcount);

	/*
	 * If there is one, move forward to the next queued
	 * message that meets the filters of this peer
	 */
	pr->tail = _lws_smd_msg_next_matching_filter(pr);

	/* tail message has to actually be of interest to the peer */
	assert(!pr->tail || (pr->tail->_class & pr->_class_filter));

	lws_mutex_lock(ctx->smd.lock_messages); /* +++++++++ messages */
	if (!--msg->refcount)
		_lws_smd_msg_destroy(&ctx->smd, msg);
	lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */

	return !!pr->tail;
}
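
/*
 * Round-robin illustration with a made-up sequence: with m1..m3 queued and
 * peers A and B interested in all of them, delivery goes A:m1, B:m1, A:m2,
 * B:m2, A:m3, B:m3 rather than A getting m1..m3 before B sees anything,
 * because each call here hands over at most one message per peer and
 * lws_smd_msg_distribute() keeps looping over the peers while any still
 * has a tail.
 */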

/*
 * Called when the event loop could deliver messages synchronously, eg, on
 * entry to idle
 */

int
lws_smd_msg_distribute(struct lws_context *ctx)
{
	char more;

	/* commonly, no messages and nothing to do... */

	if (!ctx->smd.owner_messages.count)
		return 0;

	ctx->smd.delivering = 1;

	do {
		more = 0;
		lws_mutex_lock(ctx->smd.lock_peers); /* +++++++++++++++ peers */

		lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
					   ctx->smd.owner_peers.head) {
			lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

			more = (char)(more | !!_lws_smd_msg_deliver_peer(ctx, pr));

		} lws_end_foreach_dll_safe(p, p1);

		lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */
	} while (more);

	ctx->smd.delivering = 0;

	return 0;
}

struct lws_smd_peer *
lws_smd_register(struct lws_context *ctx, void *opaque, int flags,
		 lws_smd_class_t _class_filter, lws_smd_notification_cb_t cb)
{
	lws_smd_peer_t *pr = lws_zalloc(sizeof(*pr), __func__);

	if (!pr)
		return NULL;

	pr->cb = cb;
	pr->opaque = opaque;
	pr->_class_filter = _class_filter;

	if (!ctx->smd.delivering)
		lws_mutex_lock(ctx->smd.lock_peers); /* +++++++++++++++ peers */

	/*
	 * Let's lock the message list before adding this peer, because we
	 * also have to fix up refcounts on already-queued messages below
	 */

	lws_mutex_lock(ctx->smd.lock_messages); /* +++++++++ messages */

	lws_dll2_add_tail(&pr->list, &ctx->smd.owner_peers);

	/* update the global class mask union to account for new peer mask */
	_lws_smd_class_mask_union(&ctx->smd);

	/*
	 * Now there's a new peer added, any messages we have stashed will try
	 * to deliver to this guy too, if he's interested in that class.  So we
	 * have to update the message refcounts for queued messages-he's-
	 * interested-in accordingly.
	 */

	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
				   ctx->smd.owner_messages.head) {
		lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);

		if (_lws_smd_msg_peer_interested_in_msg(pr, msg))
			msg->refcount++;

	} lws_end_foreach_dll_safe(p, p1);

	/* ... ok we are done adding the peer */

	lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */

	lwsl_info("%s: peer %p (count %u) registered\n", __func__, pr,
			(unsigned int)ctx->smd.owner_peers.count);

	if (!ctx->smd.delivering)
		lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */

	return pr;
}
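
/*
 * Registration sketch (my_smd_cb, the filter choice and the int return per
 * lws_smd_notification_cb_t are assumptions for illustration).  The
 * callback runs from the event loop inside lws_smd_msg_distribute():
 *
 *	static int
 *	my_smd_cb(void *opaque, lws_smd_class_t _class, lws_usec_t ts,
 *		  void *buf, size_t len)
 *	{
 *		lwsl_notice("smd: cl 0x%x: %.*s\n", (unsigned int)_class,
 *			    (int)len, (const char *)buf);
 *
 *		return 0;
 *	}
 *
 *	struct lws_smd_peer *pr = lws_smd_register(ctx, NULL, 0,
 *					LWSSMDCL_SYSTEM_STATE, my_smd_cb);
 *
 * ...and lws_smd_unregister(pr) when no longer interested.
 */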

void
lws_smd_unregister(struct lws_smd_peer *pr)
{
	lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t, owner_peers);

	if (!smd->delivering)
		lws_mutex_lock(smd->lock_peers); /* +++++++++++++++++++ peers */
	lwsl_notice("%s: destroying peer %p\n", __func__, pr);
	_lws_smd_peer_destroy(pr);
	if (!smd->delivering)
		lws_mutex_unlock(smd->lock_peers); /* ----------------- peers */
}

int
lws_smd_message_pending(struct lws_context *ctx)
{
	int ret = 1;

	/*
	 * First cheaply check the common case no messages pending, so there's
	 * definitely nothing for this tsi or anything else
	 */

	if (!ctx->smd.owner_messages.count)
		return 0;

	/*
	 * If there are any messages, check their age and expire ones that
	 * have been hanging around too long
	 */

	lws_mutex_lock(ctx->smd.lock_peers); /* +++++++++++++++++++++++ peers */
	lws_mutex_lock(ctx->smd.lock_messages); /* +++++++++++++++++ messages */

	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
				   ctx->smd.owner_messages.head) {
		lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);

		if ((lws_now_usecs() - msg->timestamp) > ctx->smd_ttl_us) {
			lwsl_warn("%s: timing out queued message %p\n",
					__func__, msg);

			/*
			 * We're forcibly yanking this guy, we can expect that
			 * there might be peers that point to it as their tail.
			 *
			 * In that case, move their tails on to the next guy
			 * they are interested in, if any.
			 */

			lws_start_foreach_dll_safe(struct lws_dll2 *, pp, pp1,
						   ctx->smd.owner_peers.head) {
				lws_smd_peer_t *pr = lws_container_of(pp,
							lws_smd_peer_t, list);

				if (pr->tail == msg)
					pr->tail = _lws_smd_msg_next_matching_filter(pr);

			} lws_end_foreach_dll_safe(pp, pp1);

			/*
			 * No peer should fall foul of the peer tail checks
			 * when destroying the message now.
			 */

			_lws_smd_msg_destroy(&ctx->smd, msg);
		}
	} lws_end_foreach_dll_safe(p, p1);

	lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */

	/*
	 * Walk the peer list
	 */

	lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

		if (pr->tail)
			goto bail;

	} lws_end_foreach_dll(p);

	/*
	 * There's no message pending that we need to handle
	 */

	ret = 0;

bail:
	lws_mutex_unlock(ctx->smd.lock_peers); /* --------------------- peers */

	return ret;
}

int
_lws_smd_destroy(struct lws_context *ctx)
{
	/* stop any message creation */

	ctx->smd._class_filter = 0;

	/*
	 * Walk the message list, destroying them
	 */

	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
				   ctx->smd.owner_messages.head) {
		lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);

		lws_dll2_remove(&msg->list);
		lws_free(msg);

	} lws_end_foreach_dll_safe(p, p1);

	/*
	 * Walk the peer list, destroying them
	 */

	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
				   ctx->smd.owner_peers.head) {
		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

		pr->tail = NULL; /* we just nuked all the messages, ignore */
		_lws_smd_peer_destroy(pr);

	} lws_end_foreach_dll_safe(p, p1);

	lws_mutex_destroy(ctx->smd.lock_messages);
	lws_mutex_destroy(ctx->smd.lock_peers);

	return 0;
}