/*
 * libwebsockets - small server side websockets and web server implementation
 *
 * Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#if !defined(_GNU_SOURCE)
#define _GNU_SOURCE
#endif

#if defined(WIN32)
#define HAVE_STRUCT_TIMESPEC
#if defined(pid_t)
#undef pid_t
#endif
#endif
#include <pthread.h>

#include "private-lib-core.h"

#include <string.h>
#include <stdio.h>

struct lws_threadpool;

struct lws_threadpool_task {
	struct lws_threadpool_task	*task_queue_next;

	struct lws_threadpool		*tp;
	char				name[32];
	struct lws_threadpool_task_args args;

	lws_dll2_t			list;

	lws_usec_t			created;
	lws_usec_t			acquired;
	lws_usec_t			done;
	lws_usec_t			entered_state;

	lws_usec_t			acc_running;
	lws_usec_t			acc_syncing;

	pthread_cond_t			wake_idle;

	enum lws_threadpool_task_status status;

	int				late_sync_retries;

	char				wanted_writeable_cb;
	char				outlive;
};

struct lws_pool {
	struct lws_threadpool		*tp;
	pthread_t			thread;
	pthread_mutex_t			lock; /* part of task wake_idle */
	struct lws_threadpool_task	*task;
	lws_usec_t			acquired;
	int				worker_index;
};

struct lws_threadpool {
	pthread_mutex_t			lock; /* protects all pool lists */
	pthread_cond_t			wake_idle;
	struct lws_pool			*pool_list;

	struct lws_context		*context;
	struct lws_threadpool		*tp_list; /* context list of threadpools */

	struct lws_threadpool_task	*task_queue_head;
	struct lws_threadpool_task	*task_done_head;

	char				name[32];

	int				threads_in_pool;
	int				queue_depth;
	int				done_queue_depth;
	int				max_queue_depth;
	int				running_tasks;

	unsigned int			destroying:1;
};
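
/*
 * Illustrative lifecycle sketch (not part of the build): the public entry
 * points below are typically used in this order.  The function names are
 * real; the surrounding variables (context, ca, args, task) are placeholders
 * the caller supplies.
 *
 *	struct lws_threadpool *tp = lws_threadpool_create(context, &ca, "p");
 *	...
 *	task = lws_threadpool_enqueue(tp, &args, "task-name");
 *	...					// service thread polls...
 *	lws_threadpool_task_status(task, &user);
 *	...
 *	lws_threadpool_finish(tp);	// stop accepting / queuing work
 *	lws_threadpool_destroy(tp);	// join workers and free everything
 */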

static int
ms_delta(lws_usec_t now, lws_usec_t then)
{
	return (int)((now - then) / 1000);
}

static void
us_accrue(lws_usec_t *acc, lws_usec_t then)
{
	lws_usec_t now = lws_now_usecs();

	*acc += now - then;
}

static int
pc_delta(lws_usec_t now, lws_usec_t then, lws_usec_t us)
{
	lws_usec_t delta = (now - then) + 1;

	return (int)((us * 100) / delta);
}
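
/*
 * Worked example: if 20ms of wall time elapsed between then and now, and the
 * task accrued us = 10000us (10ms) in the state of interest, pc_delta()
 * returns (10000 * 100) / 20001, ie, 49, read as "about 49%".
 */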

static void
__lws_threadpool_task_dump(struct lws_threadpool_task *task, char *buf, int len)
{
	lws_usec_t now = lws_now_usecs();
	char *end = buf + len - 1;
	int syncms = 0, runms = 0;

	if (!task->acquired) {
		buf += lws_snprintf(buf, lws_ptr_diff_size_t(end, buf),
				    "task: %s, QUEUED queued: %dms",
				    task->name, ms_delta(now, task->created));

		return;
	}

	if (task->acc_running)
		runms = (int)task->acc_running;

	if (task->acc_syncing)
		syncms = (int)task->acc_syncing;

	if (!task->done) {
		buf += lws_snprintf(buf, lws_ptr_diff_size_t(end, buf),
			"task: %s, ONGOING state %d (%dms) alive: %dms "
			"(queued %dms, acquired: %dms, "
			"run: %d%%, sync: %d%%)", task->name, task->status,
			ms_delta(now, task->entered_state),
			ms_delta(now, task->created),
			ms_delta(task->acquired, task->created),
			ms_delta(now, task->acquired),
			pc_delta(now, task->acquired, runms),
			pc_delta(now, task->acquired, syncms));

		return;
	}

	lws_snprintf(buf, lws_ptr_diff_size_t(end, buf),
		"task: %s, DONE state %d lived: %dms "
		"(queued %dms, on thread: %dms, "
		"ran: %d%%, synced: %d%%)", task->name, task->status,
		ms_delta(task->done, task->created),
		ms_delta(task->acquired, task->created),
		ms_delta(task->done, task->acquired),
		pc_delta(task->done, task->acquired, runms),
		pc_delta(task->done, task->acquired, syncms));
}

void
lws_threadpool_dump(struct lws_threadpool *tp)
{
#if 0
	//defined(_DEBUG)
	struct lws_threadpool_task **c;
	char buf[160];
	int n, count;

	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */

	lwsl_thread("%s: tp: %s, Queued: %d, Run: %d, Done: %d\n", __func__,
		    tp->name, tp->queue_depth, tp->running_tasks,
		    tp->done_queue_depth);

	count = 0;
	c = &tp->task_queue_head;
	while (*c) {
		struct lws_threadpool_task *task = *c;
		__lws_threadpool_task_dump(task, buf, sizeof(buf));
		lwsl_thread("  - %s\n", buf);
		count++;

		c = &(*c)->task_queue_next;
	}

	if (count != tp->queue_depth)
		lwsl_err("%s: tp says queue depth %d, but actually %d\n",
			 __func__, tp->queue_depth, count);

	count = 0;
	for (n = 0; n < tp->threads_in_pool; n++) {
		struct lws_pool *pool = &tp->pool_list[n];
		struct lws_threadpool_task *task = pool->task;

		if (task) {
			__lws_threadpool_task_dump(task, buf, sizeof(buf));
			lwsl_thread("  - worker %d: %s\n", n, buf);
			count++;
		}
	}

	if (count != tp->running_tasks)
		lwsl_err("%s: tp says %d running_tasks, but actually %d\n",
			 __func__, tp->running_tasks, count);

	count = 0;
	c = &tp->task_done_head;
	while (*c) {
		struct lws_threadpool_task *task = *c;
		__lws_threadpool_task_dump(task, buf, sizeof(buf));
		lwsl_thread("  - %s\n", buf);
		count++;

		c = &(*c)->task_queue_next;
	}

	if (count != tp->done_queue_depth)
		lwsl_err("%s: tp says done_queue_depth %d, but actually %d\n",
			 __func__, tp->done_queue_depth, count);

	pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */
#endif
}

static void
state_transition(struct lws_threadpool_task *task,
		 enum lws_threadpool_task_status status)
{
	task->entered_state = lws_now_usecs();
	task->status = status;
}

static struct lws *
task_to_wsi(struct lws_threadpool_task *task)
{
#if defined(LWS_WITH_SECURE_STREAMS)
	if (task->args.ss)
		return task->args.ss->wsi;
#endif
	return task->args.wsi;
}

static void
lws_threadpool_task_cleanup_destroy(struct lws_threadpool_task *task)
{
	if (task->args.cleanup)
		task->args.cleanup(task_to_wsi(task), task->args.user);

	lws_dll2_remove(&task->list);

	lwsl_thread("%s: tp %p: cleaned finished task for %s\n",
		    __func__, task->tp, lws_wsi_tag(task_to_wsi(task)));

	lws_free(task);
}

static void
__lws_threadpool_reap(struct lws_threadpool_task *task)
{
	struct lws_threadpool_task **c, *t = NULL;
	struct lws_threadpool *tp = task->tp;

	/* remove the task from the done queue */

	if (tp) {
		c = &tp->task_done_head;

		while (*c) {
			if ((*c) == task) {
				t = *c;
				*c = t->task_queue_next;
				t->task_queue_next = NULL;
				tp->done_queue_depth--;

				lwsl_thread("%s: tp %s: reaped task %s\n", __func__,
					   tp->name, lws_wsi_tag(task_to_wsi(task)));

				break;
			}
			c = &(*c)->task_queue_next;
		}

		if (!t) {
			lwsl_err("%s: task %p not in done queue\n", __func__, task);
			/*
			 * This shouldn't occur, but in this case not really
			 * safe to assume there's a task to destroy
			 */
			return;
		}
	} else
		lwsl_err("%s: task->tp NULL already\n", __func__);

	/* call the task's cleanup and delete the task itself */

	lws_threadpool_task_cleanup_destroy(task);
}
/*
 * this gets called from each tsi service context after the service was
 * cancelled... we need to ask for the writable callback from the matching
 * tsi context for any wsis bound to a worker thread that need it (there is
 * a usage sketch after this function)
 */

int
lws_threadpool_tsi_context(struct lws_context *context, int tsi)
{
	struct lws_threadpool_task **c, *task = NULL;
	struct lws_threadpool *tp;
	struct lws *wsi;

	lws_context_lock(context, __func__);

	tp = context->tp_list_head;
	while (tp) {
		int n;

		/* for the running (syncing...) tasks... */

		for (n = 0; n < tp->threads_in_pool; n++) {
			struct lws_pool *pool = &tp->pool_list[n];

			task = pool->task;
			if (!task)
				continue;

			wsi = task_to_wsi(task);
			if (!wsi || wsi->tsi != tsi ||
			    (!task->wanted_writeable_cb &&
			     task->status != LWS_TP_STATUS_SYNCING))
				continue;

			task->wanted_writeable_cb = 0;
			lws_memory_barrier();

			/*
			 * finally... we can ask for the callback on
			 * writable from the correct service thread
			 * context
			 */

			lws_callback_on_writable(wsi);
		}

		/* for the done tasks... */

		c = &tp->task_done_head;

		while (*c) {
			task = *c;
			wsi = task_to_wsi(task);

			if (wsi && wsi->tsi == tsi &&
			    (task->wanted_writeable_cb ||
			     task->status == LWS_TP_STATUS_SYNCING)) {

				task->wanted_writeable_cb = 0;
				lws_memory_barrier();

				/*
				 * finally... we can ask for the callback on
				 * writable from the correct service thread
				 * context
				 */

				lws_callback_on_writable(wsi);
			}

			c = &task->task_queue_next;
		}

		tp = tp->tp_list;
	}

	lws_context_unlock(context);

	return 0;
}
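
/*
 * Usage sketch (an assumption modelled on the lws minimal threadpool
 * example, not part of this file's build): the user protocol handler
 * forwards the cancel-service event to us from its own service thread,
 * so the writable callback request above happens on the right tsi.
 *
 *	case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
 *		// tsi 0 shown, assuming a single service thread
 *		lws_threadpool_tsi_context(lws_get_context(wsi), 0);
 *		break;
 */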

static int
lws_threadpool_worker_sync(struct lws_pool *pool,
			   struct lws_threadpool_task *task)
{
	enum lws_threadpool_task_status temp;
	struct timespec abstime;
	struct lws *wsi;
	int tries = 15;

	/* block until writable acknowledges */
	lwsl_debug("%s: %p: LWS_TP_RETURN_SYNC in\n", __func__, task);
	pthread_mutex_lock(&pool->lock); /* ======================= pool lock */

	lwsl_info("%s: %s: task %p (%s): syncing with %s\n", __func__,
		    pool->tp->name, task, task->name, lws_wsi_tag(task_to_wsi(task)));

	temp = task->status;
	state_transition(task, LWS_TP_STATUS_SYNCING);
	while (tries--) {
		wsi = task_to_wsi(task);

		/*
		 * if the wsi is no longer attached to this task, there is
		 * nothing we can sync to usefully.  Since the work wants to
		 * sync, it means we should react to the situation by telling
		 * the task it can't continue usefully by stopping it.
		 */

		if (!wsi) {
			lwsl_thread("%s: %s: task %p (%s): No longer bound to any "
				 "wsi to sync to\n", __func__, pool->tp->name,
				 task, task->name);

			state_transition(task, LWS_TP_STATUS_STOPPING);
			goto done;
		}

		/*
		 * Each wait below lasts up to 2s, so tries x 2s bounds the
		 * maximum time we are willing to sit still between SYNC
		 * asking for a callback on writable and actually getting it.
		 *
		 * If that is exceeded, we will stop the task.
		 */
		abstime.tv_sec = time(NULL) + 2;
		abstime.tv_nsec = 0;

		task->wanted_writeable_cb = 1;
		lws_memory_barrier();

		/*
		 * This will cause lws_threadpool_tsi_context() to get called
		 * from each tsi service context, where we can safely ask for
		 * a callback on writeable on the wsi we are associated with.
		 */
		lws_cancel_service(lws_get_context(wsi));

		/*
		 * so the danger here is that we asked for a writable callback
		 * on the wsi, but for whatever reason, we are never going to
		 * get one.  To avoid deadlocking forever, we allow a set time
		 * for the sync to happen naturally, otherwise the cond wait
		 * times out and we stop the task.
		 */

		if (pthread_cond_timedwait(&task->wake_idle, &pool->lock,
					   &abstime) == ETIMEDOUT) {
			task->late_sync_retries++;
			if (!tries) {
				lwsl_err("%s: %s: task %p (%s): SYNC timed out "
					 "(associated %s)\n",
					 __func__, pool->tp->name, task,
					 task->name, lws_wsi_tag(task_to_wsi(task)));

				state_transition(task, LWS_TP_STATUS_STOPPING);
				goto done;
			}

			continue;
		} else
			break;
	}

	if (task->status == LWS_TP_STATUS_SYNCING)
		state_transition(task, temp);

	lwsl_debug("%s: %p: LWS_TP_RETURN_SYNC out\n", __func__, task);

done:
	pthread_mutex_unlock(&pool->lock); /* ----------------- - pool unlock */

	return 0;
}

#if !defined(WIN32)
static int dummy;
#endif

static void *
lws_threadpool_worker(void *d)
{
	struct lws_threadpool_task **c, **c2, *task;
	struct lws_pool *pool = d;
	struct lws_threadpool *tp = pool->tp;
	char buf[160];

	while (!tp->destroying) {

		/* we have no running task... wait and get one from the queue */

		pthread_mutex_lock(&tp->lock); /* =================== tp lock */

		/*
		 * if there's no task already waiting in the queue, wait for
		 * the wake_idle condition to signal us that might have changed
		 */
		while (!tp->task_queue_head && !tp->destroying)
			pthread_cond_wait(&tp->wake_idle, &tp->lock);

		if (tp->destroying) {
			lwsl_notice("%s: bailing\n", __func__);
			goto doneski;
		}

		c = &tp->task_queue_head;
		c2 = NULL;
		task = NULL;
		pool->task = NULL;

		/* look at the queue tail */
		while (*c) {
			c2 = c;
			c = &(*c)->task_queue_next;
		}

		/* is there a task at the queue tail? */
		if (c2 && *c2) {
			pool->task = task = *c2;
			task->acquired = pool->acquired = lws_now_usecs();
			/* remove it from the queue */
			*c2 = task->task_queue_next;
			task->task_queue_next = NULL;
			tp->queue_depth--;
			/* mark it as running */
			state_transition(task, LWS_TP_STATUS_RUNNING);
		}

		/* someone else got it first... wait and try again */
		if (!task) {
			pthread_mutex_unlock(&tp->lock);  /* ------ tp unlock */
			continue;
		}

		task->wanted_writeable_cb = 0;

		/* we have acquired a new task */

		__lws_threadpool_task_dump(task, buf, sizeof(buf));

		lwsl_thread("%s: %s: worker %d ACQUIRING: %s\n",
			    __func__, tp->name, pool->worker_index, buf);
		tp->running_tasks++;

		pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */

		/*
		 * 1) The task can return with LWS_TP_RETURN_CHECKING_IN to
		 * "resurface" periodically, and get called again with
		 * cont = 1 immediately to indicate it is picking up where it
		 * left off if the task is not being "stopped".
		 *
		 * This allows long tasks to respond to requests to stop in
		 * a clean and opaque way.
		 *
		 * 2) The task can return with LWS_TP_RETURN_SYNC to register
		 * a "callback on writable" request on the service thread and
		 * block until it hears back from the WRITABLE handler.
		 *
		 * This allows the work on the thread to be synchronized to the
		 * previous work being dispatched cleanly.
		 *
		 * 3) The task can return with LWS_TP_RETURN_FINISHED to
		 * indicate its work is completed nicely.
		 *
		 * 4) The task can return with LWS_TP_RETURN_STOPPED to indicate
		 * it stopped and cleaned up after incomplete work.
		 */
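		/*
		 * An illustrative (hypothetical, not part of this file) task
		 * function following the conventions above might look like:
		 *
		 *	static enum lws_threadpool_task_return
		 *	my_task(void *user, enum lws_threadpool_task_status s)
		 *	{
		 *		if (s == LWS_TP_STATUS_STOPPING)
		 *			return LWS_TP_RETURN_STOPPED;
		 *
		 *		if (do_chunk_of_work(user))	// placeholder
		 *			return LWS_TP_RETURN_FINISHED;
		 *
		 *		if (result_ready(user))		// placeholder
		 *			return LWS_TP_RETURN_SYNC;
		 *
		 *		return LWS_TP_RETURN_CHECKING_IN;
		 *	}
		 */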

		do {
			lws_usec_t then;
			int n;

			if (tp->destroying || !task_to_wsi(task)) {
				lwsl_info("%s: stopping on wsi gone\n", __func__);
				state_transition(task, LWS_TP_STATUS_STOPPING);
			}

			then = lws_now_usecs();
			n = (int)task->args.task(task->args.user, task->status);
			lwsl_debug("   %d, status %d\n", n, task->status);
			us_accrue(&task->acc_running, then);
			if (n & LWS_TP_RETURN_FLAG_OUTLIVE)
				task->outlive = 1;
			switch (n & 7) {
			case LWS_TP_RETURN_CHECKING_IN:
				/* if not destroying the tp, continue */
				break;
			case LWS_TP_RETURN_SYNC:
				if (!task_to_wsi(task)) {
					lwsl_debug("%s: task that wants to "
						    "outlive lost wsi asked "
						    "to sync: bypassed\n",
						    __func__);
					break;
				}
				/* block until writable acknowledges */
				then = lws_now_usecs();
				lws_threadpool_worker_sync(pool, task);
				us_accrue(&task->acc_syncing, then);
				break;
			case LWS_TP_RETURN_FINISHED:
				state_transition(task, LWS_TP_STATUS_FINISHED);
				break;
			case LWS_TP_RETURN_STOPPED:
				state_transition(task, LWS_TP_STATUS_STOPPED);
				break;
			}
		} while (task->status == LWS_TP_STATUS_RUNNING);

		pthread_mutex_lock(&tp->lock); /* =================== tp lock */

		tp->running_tasks--;

		if (pool->task->status == LWS_TP_STATUS_STOPPING)
			state_transition(task, LWS_TP_STATUS_STOPPED);

		/* move the task to the done queue */

		pool->task->task_queue_next = tp->task_done_head;
		tp->task_done_head = task;
		tp->done_queue_depth++;
		pool->task->done = lws_now_usecs();

		if (!pool->task->args.wsi &&
		    (pool->task->status == LWS_TP_STATUS_STOPPED ||
		     pool->task->status == LWS_TP_STATUS_FINISHED)) {

			__lws_threadpool_task_dump(pool->task, buf, sizeof(buf));
			lwsl_thread("%s: %s: worker %d REAPING: %s\n",
				    __func__, tp->name, pool->worker_index,
				    buf);

			/*
			 * there is no longer any wsi attached, so nothing is
			 * going to take care of reaping us.  So we must take
			 * care of it ourselves.
			 */
			__lws_threadpool_reap(pool->task);
		} else {

			__lws_threadpool_task_dump(pool->task, buf, sizeof(buf));
			lwsl_thread("%s: %s: worker %d DONE: %s\n",
				    __func__, tp->name, pool->worker_index,
				    buf);

			/* signal the associated wsi to take a fresh look at
			 * task status */

			if (task_to_wsi(pool->task)) {
				task->wanted_writeable_cb = 1;

				lws_cancel_service(
					lws_get_context(task_to_wsi(pool->task)));
			}
		}

doneski:
		pool->task = NULL;
		pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */
	}

	lwsl_notice("%s: Exiting\n", __func__);

	/* threadpool is being destroyed */
#if !defined(WIN32)
	pthread_exit(&dummy);
#endif

	return NULL;
}

struct lws_threadpool *
lws_threadpool_create(struct lws_context *context,
		      const struct lws_threadpool_create_args *args,
		      const char *format, ...)
{
	struct lws_threadpool *tp;
	va_list ap;
	int n;

	tp = lws_malloc(sizeof(*tp) + (sizeof(struct lws_pool) * (unsigned int)args->threads),
			"threadpool alloc");
	if (!tp)
		return NULL;

	memset(tp, 0, sizeof(*tp) + (sizeof(struct lws_pool) * (unsigned int)args->threads));
	tp->pool_list = (struct lws_pool *)(tp + 1);
	tp->max_queue_depth = args->max_queue_depth;

	va_start(ap, format);
	n = vsnprintf(tp->name, sizeof(tp->name) - 1, format, ap);
	va_end(ap);

	lws_context_lock(context, __func__);

	tp->context = context;
	tp->tp_list = context->tp_list_head;
	context->tp_list_head = tp;

	lws_context_unlock(context);

	pthread_mutex_init(&tp->lock, NULL);
	pthread_cond_init(&tp->wake_idle, NULL);

	for (n = 0; n < args->threads; n++) {
#if defined(LWS_HAS_PTHREAD_SETNAME_NP)
		char name[16];
#endif
		tp->pool_list[n].tp = tp;
		tp->pool_list[n].worker_index = n;
		pthread_mutex_init(&tp->pool_list[n].lock, NULL);
		if (pthread_create(&tp->pool_list[n].thread, NULL,
				   lws_threadpool_worker, &tp->pool_list[n])) {
			lwsl_err("thread creation failed\n");
		} else {
#if defined(LWS_HAS_PTHREAD_SETNAME_NP)
			lws_snprintf(name, sizeof(name), "%s-%d", tp->name, n);
			pthread_setname_np(tp->pool_list[n].thread, name);
#endif
			tp->threads_in_pool++;
		}
	}

	return tp;
}
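
/*
 * Illustrative creation sketch (not part of this file): the pool and queue
 * sizes are whatever suits the application, and the name format args work
 * like printf().
 *
 *	struct lws_threadpool_create_args ca;
 *
 *	memset(&ca, 0, sizeof(ca));
 *	ca.threads = 4;			// worker pthreads to start
 *	ca.max_queue_depth = 8;		// enqueue fails beyond this
 *
 *	tp = lws_threadpool_create(context, &ca, "%s", "mypool");
 */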

void
lws_threadpool_finish(struct lws_threadpool *tp)
{
	struct lws_threadpool_task **c, *task;

	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */

	/* nothing new can start, running jobs will abort as STOPPED and the
	 * pool threads will exit ASAP (they are joined in destroy) */
	tp->destroying = 1;

	/* stop everyone in the pending queue and move to the done queue */

	c = &tp->task_queue_head;
	while (*c) {
		task = *c;
		/* unlink from the pending queue... */
		*c = task->task_queue_next;
		/* ...and push onto the done queue as STOPPED */
		task->task_queue_next = tp->task_done_head;
		tp->task_done_head = task;
		state_transition(task, LWS_TP_STATUS_STOPPED);
		tp->queue_depth--;
		tp->done_queue_depth++;
		task->done = lws_now_usecs();

		/*
		 * the unlink already advanced *c to the next pending task,
		 * so no explicit iterator step here
		 */
	}

	pthread_cond_broadcast(&tp->wake_idle);
	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */
}

void
lws_threadpool_destroy(struct lws_threadpool *tp)
{
	struct lws_threadpool_task *task, *next;
	struct lws_threadpool **ptp;
	void *retval;
	int n;

	/* remove us from the context list of threadpools */

	lws_context_lock(tp->context, __func__);
	ptp = &tp->context->tp_list_head;

	while (*ptp) {
		if (*ptp == tp) {
			*ptp = tp->tp_list;
			break;
		}
		ptp = &(*ptp)->tp_list;
	}

	lws_context_unlock(tp->context);

	/*
	 * Wake up the threadpool guys and tell them to exit
	 */

	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
	tp->destroying = 1;
	pthread_cond_broadcast(&tp->wake_idle);
	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */

	lws_threadpool_dump(tp);

	lwsl_info("%s: waiting for threads to rejoin\n", __func__);
#if defined(WIN32)
	Sleep(1000);
#endif

	for (n = 0; n < tp->threads_in_pool; n++) {
		pthread_join(tp->pool_list[n].thread, &retval);
		pthread_mutex_destroy(&tp->pool_list[n].lock);
	}
	lwsl_info("%s: all pool threads exited\n", __func__);
#if defined(WIN32)
	Sleep(1000);
#endif

	task = tp->task_done_head;
	while (task) {
		next = task->task_queue_next;
		lws_threadpool_task_cleanup_destroy(task);
		tp->done_queue_depth--;
		task = next;
	}

	pthread_mutex_destroy(&tp->lock);

	memset(tp, 0xdd, sizeof(*tp));
	lws_free(tp);
}

/*
 * We want to stop and destroy the tasks and related priv.
 */

int
lws_threadpool_dequeue_task(struct lws_threadpool_task *task)
{
	struct lws_threadpool *tp;
	struct lws_threadpool_task **c;
	int n;

	tp = task->tp;
	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */

	if (task->outlive && !tp->destroying) {

		/* disconnect from wsi, and wsi from task */

		lws_dll2_remove(&task->list);
		task->args.wsi = NULL;
#if defined(LWS_WITH_SECURE_STREAMS)
		task->args.ss = NULL;
#endif

		goto bail;
	}

	c = &tp->task_queue_head;

	/* is he queued waiting for a chance to run?  Mark him as stopped and
	 * move him on to the done queue */

	while (*c) {
		if ((*c) == task) {
			*c = task->task_queue_next;
			task->task_queue_next = tp->task_done_head;
			tp->task_done_head = task;
			state_transition(task, LWS_TP_STATUS_STOPPED);
			tp->queue_depth--;
			tp->done_queue_depth++;
			task->done = lws_now_usecs();

			lwsl_debug("%s: tp %p: removed queued task %s\n",
				    __func__, tp, lws_wsi_tag(task_to_wsi(task)));

			break;
		}
		c = &(*c)->task_queue_next;
	}

	/* is he on the done queue? */

	c = &tp->task_done_head;
	while (*c) {
		if ((*c) == task) {
			*c = task->task_queue_next;
			task->task_queue_next = NULL;
			lws_threadpool_task_cleanup_destroy(task);
			tp->done_queue_depth--;
			goto bail;
		}
		c = &(*c)->task_queue_next;
	}

	/* he's not in the queue... is he already running on a thread? */

	for (n = 0; n < tp->threads_in_pool; n++) {
		if (!tp->pool_list[n].task || tp->pool_list[n].task != task)
			continue;

		/*
		 * ensure we don't collide with tests or changes in the
		 * worker thread
		 */
		pthread_mutex_lock(&tp->pool_list[n].lock);

		/*
		 * mark him as having been requested to stop...
		 * the caller will hear about it in his service thread
		 * context as a request to close
		 */
		state_transition(task, LWS_TP_STATUS_STOPPING);

		/* disconnect from wsi, and wsi from task */

		lws_dll2_remove(&task->list);
		task->args.wsi = NULL;
#if defined(LWS_WITH_SECURE_STREAMS)
		task->args.ss = NULL;
#endif

		pthread_mutex_unlock(&tp->pool_list[n].lock);

		lwsl_debug("%s: tp %p: request stop running task "
			    "for %s\n", __func__, tp,
			    lws_wsi_tag(task_to_wsi(task)));

		break;
	}

	if (n == tp->threads_in_pool) {
		/* can't find it */
		lwsl_notice("%s: tp %p: no task for %s, decoupling\n",
			    __func__, tp, lws_wsi_tag(task_to_wsi(task)));
		lws_dll2_remove(&task->list);
		task->args.wsi = NULL;
#if defined(LWS_WITH_SECURE_STREAMS)
		task->args.ss = NULL;
#endif
	}

bail:
	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */

	return 0;
}

int
lws_threadpool_dequeue(struct lws *wsi) /* deprecated */
{
	struct lws_threadpool_task *task;

	if (!wsi->tp_task_owner.count)
		return 0;
	assert(wsi->tp_task_owner.count == 1); /* see deprecation docs in hdr */

	task = lws_container_of(wsi->tp_task_owner.head,
				struct lws_threadpool_task, list);

	return lws_threadpool_dequeue_task(task);
}

struct lws_threadpool_task *
lws_threadpool_enqueue(struct lws_threadpool *tp,
		       const struct lws_threadpool_task_args *args,
		       const char *format, ...)
{
	struct lws_threadpool_task *task = NULL;
	va_list ap;

	if (tp->destroying)
		return NULL;

#if defined(LWS_WITH_SECURE_STREAMS)
	assert(args->ss || args->wsi);
#endif

	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */

	/*
	 * if there's room on the queue, the job always goes on the queue
	 * first, then any free thread may pick it up after the wake_idle
	 */

	if (tp->queue_depth == tp->max_queue_depth) {
		lwsl_notice("%s: queue reached limit %d\n", __func__,
			    tp->max_queue_depth);

		goto bail;
	}

	/*
	 * create the task object
	 */

	task = lws_malloc(sizeof(*task), __func__);
	if (!task)
		goto bail;

	memset(task, 0, sizeof(*task));
	pthread_cond_init(&task->wake_idle, NULL);
	task->args = *args;
	task->tp = tp;
	task->created = lws_now_usecs();

	va_start(ap, format);
	vsnprintf(task->name, sizeof(task->name) - 1, format, ap);
	va_end(ap);

	/*
	 * add him on the tp task queue
	 */

	task->task_queue_next = tp->task_queue_head;
	state_transition(task, LWS_TP_STATUS_QUEUED);
	tp->task_queue_head = task;
	tp->queue_depth++;

	/*
	 * mark the wsi itself as depending on this tp (so wsi close for
	 * whatever reason can clean up)
	 */

#if defined(LWS_WITH_SECURE_STREAMS)
	if (args->ss)
		lws_dll2_add_tail(&task->list, &args->ss->wsi->tp_task_owner);
	else
#endif
		lws_dll2_add_tail(&task->list, &args->wsi->tp_task_owner);

	lwsl_thread("%s: tp %s: enqueued task %p (%s) for %s, depth %d\n",
		    __func__, tp->name, task, task->name,
		    lws_wsi_tag(task_to_wsi(task)), tp->queue_depth);

	/* alert any idle thread there's something new on the task list */

	lws_memory_barrier();
	pthread_cond_signal(&tp->wake_idle);

bail:
	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */

	return task;
}
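
/*
 * Illustrative enqueue sketch (not part of this file): my_task and
 * my_cleanup are placeholder user callbacks, priv is a placeholder
 * user-private pointer.
 *
 *	struct lws_threadpool_task_args args;
 *
 *	memset(&args, 0, sizeof(args));
 *	args.wsi = wsi;			// wsi to sync the task to
 *	args.user = priv;
 *	args.task = my_task;		// runs on a pool thread
 *	args.cleanup = my_cleanup;	// called when the task is destroyed
 *
 *	task = lws_threadpool_enqueue(tp, &args, "job-%d", seq);
 */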

/* this should be called from the service thread */

enum lws_threadpool_task_status
lws_threadpool_task_status(struct lws_threadpool_task *task, void **user)
{
	enum lws_threadpool_task_status status;
	struct lws_threadpool *tp = task->tp;

	if (!tp)
		return LWS_TP_STATUS_FINISHED;

	*user = task->args.user;
	status = task->status;

	if (status == LWS_TP_STATUS_FINISHED ||
	    status == LWS_TP_STATUS_STOPPED) {
		char buf[160];

		pthread_mutex_lock(&tp->lock); /* ================ tpool lock */
		__lws_threadpool_task_dump(task, buf, sizeof(buf));
		lwsl_thread("%s: %s: service thread REAPING: %s\n",
			    __func__, tp->name, buf);
		__lws_threadpool_reap(task);
		lws_memory_barrier();
		pthread_mutex_unlock(&tp->lock); /* ------------ tpool unlock */
	}

	return status;
}

enum lws_threadpool_task_status
lws_threadpool_task_status_noreap(struct lws_threadpool_task *task)
{
	return task->status;
}

enum lws_threadpool_task_status
lws_threadpool_task_status_wsi(struct lws *wsi,
			       struct lws_threadpool_task **_task, void **user)
{
	struct lws_threadpool_task *task;

	if (!wsi->tp_task_owner.count) {
		lwsl_notice("%s: wsi has no task, ~=FINISHED\n", __func__);
		return LWS_TP_STATUS_FINISHED;
	}

	assert(wsi->tp_task_owner.count == 1); /* see deprecation docs in hdr */

	task = lws_container_of(wsi->tp_task_owner.head,
				struct lws_threadpool_task, list);

	*_task = task;

	return lws_threadpool_task_status(task, user);
}
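
/*
 * Illustrative polling sketch (an assumption modelled on the lws minimal
 * threadpool example, not part of this file): the service thread checks
 * the task from its WRITEABLE callback, and releases a SYNCing task once
 * it has emitted the pending result.
 *
 *	case LWS_CALLBACK_SERVER_WRITEABLE:
 *		switch (lws_threadpool_task_status_wsi(wsi, &task, &priv)) {
 *		case LWS_TP_STATUS_SYNCING:
 *			// ...emit the result with lws_write()...
 *			lws_threadpool_task_sync(task, 0);
 *			break;
 *		case LWS_TP_STATUS_FINISHED:
 *		case LWS_TP_STATUS_STOPPED:
 *			return 0;	// task already reaped by the call
 *		default:
 *			break;
 *		}
 *		break;
 */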

void
lws_threadpool_task_sync(struct lws_threadpool_task *task, int stop)
{
	lwsl_debug("%s\n", __func__);
	if (!task)
		return;

	if (stop)
		state_transition(task, LWS_TP_STATUS_STOPPING);

	pthread_mutex_lock(&task->tp->lock);
	pthread_cond_signal(&task->wake_idle);
	pthread_mutex_unlock(&task->tp->lock);
}

int
lws_threadpool_foreach_task_wsi(struct lws *wsi, void *user,
				int (*cb)(struct lws_threadpool_task *task,
					  void *user))
{
	struct lws_threadpool_task *task1;

	if (wsi->tp_task_owner.head == NULL)
		return 0;

	task1 = lws_container_of(wsi->tp_task_owner.head,
				 struct lws_threadpool_task, list);

	pthread_mutex_lock(&task1->tp->lock); /* ================ tpool lock */

	lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
				   wsi->tp_task_owner.head) {
		struct lws_threadpool_task *task = lws_container_of(d,
					struct lws_threadpool_task, list);

		if (cb(task, user)) {
			pthread_mutex_unlock(&task1->tp->lock); /* ------------ tpool unlock */
			return 1;
		}

	} lws_end_foreach_dll_safe(d, d1);

	pthread_mutex_unlock(&task1->tp->lock); /* ------------ tpool unlock */

	return 0;
}

#if defined(LWS_WITH_SECURE_STREAMS)
int
lws_threadpool_foreach_task_ss(struct lws_ss_handle *ss, void *user,
			       int (*cb)(struct lws_threadpool_task *task,
					 void *user))
{
	if (!ss->wsi)
		return 0;

	return lws_threadpool_foreach_task_wsi(ss->wsi, user, cb);
}
#endif

static int
disassociate_wsi(struct lws_threadpool_task *task,
		 void *user)
{
	task->args.wsi = NULL;
	lws_dll2_remove(&task->list);

	return 0;
}

void
lws_threadpool_wsi_closing(struct lws *wsi)
{
	lws_threadpool_foreach_task_wsi(wsi, NULL, disassociate_wsi);
}

struct lws_threadpool_task *
lws_threadpool_get_task_wsi(struct lws *wsi)
{
	if (wsi->tp_task_owner.head == NULL)
		return NULL;

	return lws_container_of(wsi->tp_task_owner.head,
				 struct lws_threadpool_task, list);
}

#if defined(LWS_WITH_SECURE_STREAMS)
struct lws_threadpool_task *
lws_threadpool_get_task_ss(struct lws_ss_handle *ss)
{
	return lws_threadpool_get_task_wsi(ss->wsi);
}
#endif