1 /*
2 * libwebsockets - small server side websockets and web server implementation
3 *
4 * Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com>
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to
8 * deal in the Software without restriction, including without limitation the
9 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 * sell copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22 * IN THE SOFTWARE.
23 */
24
25 #if !defined(_GNU_SOURCE)
26 #define _GNU_SOURCE
27 #endif
28
29 #if defined(WIN32)
30 #define HAVE_STRUCT_TIMESPEC
31 #if defined(pid_t)
32 #undef pid_t
33 #endif
34 #endif
35 #include <pthread.h>
36
37 #include "private-lib-core.h"
38
39 #include <string.h>
40 #include <stdio.h>
41
42 struct lws_threadpool;
43
/*
 * One unit of work given to a threadpool.  It is linked on the tp's pending
 * or done queue through task_queue_next, or referenced from a worker's
 * pool->task while that worker runs it.
 */
struct lws_threadpool_task {
	struct lws_threadpool_task *task_queue_next; /* next on tp pending / done queue */

	struct lws_threadpool *tp; /* threadpool this task was enqueued on */
	char name[32]; /* formatted from the enqueue format string */
	struct lws_threadpool_task_args args; /* copy of the args given at enqueue */

	lws_dll2_t list; /* membership of the wsi's tp_task_owner list */

	lws_usec_t created; /* us timestamp taken at enqueue */
	lws_usec_t acquired; /* us timestamp a worker took it; 0 while still queued */
	lws_usec_t done; /* us timestamp it was put on the done queue; 0 until then */
	lws_usec_t entered_state; /* us timestamp of the last state_transition() */

	lws_usec_t acc_running; /* accumulated us spent inside the task callback */
	lws_usec_t acc_syncing; /* accumulated us spent in lws_threadpool_worker_sync() */

	pthread_cond_t wake_idle; /* worker waits on this while SYNCING */

	enum lws_threadpool_task_status status;

	int late_sync_retries; /* number of sync waits that timed out */

	char wanted_writeable_cb; /* nonzero: ask for writable cb from service thread */
	char outlive; /* set once the task cb returned LWS_TP_RETURN_FLAG_OUTLIVE */
};
70
/* State for one worker thread; tp->pool_list is an array of these */
struct lws_pool {
	struct lws_threadpool *tp; /* threadpool this worker belongs to */
	pthread_t thread; /* handle from pthread_create(), joined in destroy */
	pthread_mutex_t lock; /* part of task wake_idle */
	struct lws_threadpool_task *task; /* task currently held, NULL when idle */
	lws_usec_t acquired; /* us timestamp the current task was taken */
	int worker_index; /* index assigned at creation, used in logs */
};
79
/*
 * A pool of worker threads plus its pending and done task queues.  The
 * struct lws_pool array is allocated in the same block, directly after
 * this struct.
 */
struct lws_threadpool {
	pthread_mutex_t lock; /* protects all pool lists */
	pthread_cond_t wake_idle; /* idle workers wait here for queued tasks */
	struct lws_pool *pool_list; /* array of per-worker state */

	struct lws_context *context;
	struct lws_threadpool *tp_list; /* context list of threadpools */

	struct lws_threadpool_task *task_queue_head; /* pending; new tasks added at head */
	struct lws_threadpool_task *task_done_head; /* finished / stopped tasks */

	char name[32];

	int threads_in_pool; /* count of workers successfully started */
	int queue_depth; /* number of tasks on task_queue_head */
	int done_queue_depth; /* number of tasks on task_done_head */
	int max_queue_depth; /* enqueue is refused when queue_depth equals this */
	int running_tasks; /* number of tasks currently held by workers */

	unsigned int destroying:1; /* set by finish / destroy; workers exit on it */
};
101
102 static int
ms_delta(lws_usec_t now,lws_usec_t then)103 ms_delta(lws_usec_t now, lws_usec_t then)
104 {
105 return (int)((now - then) / 1000);
106 }
107
108 static void
us_accrue(lws_usec_t * acc,lws_usec_t then)109 us_accrue(lws_usec_t *acc, lws_usec_t then)
110 {
111 lws_usec_t now = lws_now_usecs();
112
113 *acc += now - then;
114 }
115
116 static int
pc_delta(lws_usec_t now,lws_usec_t then,lws_usec_t us)117 pc_delta(lws_usec_t now, lws_usec_t then, lws_usec_t us)
118 {
119 lws_usec_t delta = (now - then) + 1;
120
121 return (int)((us * 100) / delta);
122 }
123
124 static void
__lws_threadpool_task_dump(struct lws_threadpool_task * task,char * buf,int len)125 __lws_threadpool_task_dump(struct lws_threadpool_task *task, char *buf, int len)
126 {
127 lws_usec_t now = lws_now_usecs();
128 char *end = buf + len - 1;
129 int syncms = 0, runms = 0;
130
131 if (!task->acquired) {
132 buf += lws_snprintf(buf, lws_ptr_diff_size_t(end, buf),
133 "task: %s, QUEUED queued: %dms",
134 task->name, ms_delta(now, task->created));
135
136 return;
137 }
138
139 if (task->acc_running)
140 runms = (int)task->acc_running;
141
142 if (task->acc_syncing)
143 syncms = (int)task->acc_syncing;
144
145 if (!task->done) {
146 buf += lws_snprintf(buf, lws_ptr_diff_size_t(end, buf),
147 "task: %s, ONGOING state %d (%dms) alive: %dms "
148 "(queued %dms, acquired: %dms, "
149 "run: %d%%, sync: %d%%)", task->name, task->status,
150 ms_delta(now, task->entered_state),
151 ms_delta(now, task->created),
152 ms_delta(task->acquired, task->created),
153 ms_delta(now, task->acquired),
154 pc_delta(now, task->acquired, runms),
155 pc_delta(now, task->acquired, syncms));
156
157 return;
158 }
159
160 lws_snprintf(buf, lws_ptr_diff_size_t(end, buf),
161 "task: %s, DONE state %d lived: %dms "
162 "(queued %dms, on thread: %dms, "
163 "ran: %d%%, synced: %d%%)", task->name, task->status,
164 ms_delta(task->done, task->created),
165 ms_delta(task->acquired, task->created),
166 ms_delta(task->done, task->acquired),
167 pc_delta(task->done, task->acquired, runms),
168 pc_delta(task->done, task->acquired, syncms));
169 }
170
/*
 * Debug dump of a threadpool's queues and workers.
 *
 * The whole body is currently compiled out by the "#if 0", so calling this
 * does nothing.  When enabled, it logs every task on the pending queue, on
 * each worker and on the done queue while holding tp->lock, and reports an
 * error if a walked count disagrees with the tp's own counter.
 */
void
lws_threadpool_dump(struct lws_threadpool *tp)
{
#if 0
	//defined(_DEBUG)
	struct lws_threadpool_task **c;
	char buf[160];
	int n, count;

	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */

	lwsl_thread("%s: tp: %s, Queued: %d, Run: %d, Done: %d\n", __func__,
		    tp->name, tp->queue_depth, tp->running_tasks,
		    tp->done_queue_depth);

	/* pending queue */
	count = 0;
	c = &tp->task_queue_head;
	while (*c) {
		struct lws_threadpool_task *task = *c;
		__lws_threadpool_task_dump(task, buf, sizeof(buf));
		lwsl_thread(" - %s\n", buf);
		count++;

		c = &(*c)->task_queue_next;
	}

	if (count != tp->queue_depth)
		lwsl_err("%s: tp says queue depth %d, but actually %d\n",
			 __func__, tp->queue_depth, count);

	/* tasks currently held by workers */
	count = 0;
	for (n = 0; n < tp->threads_in_pool; n++) {
		struct lws_pool *pool = &tp->pool_list[n];
		struct lws_threadpool_task *task = pool->task;

		if (task) {
			__lws_threadpool_task_dump(task, buf, sizeof(buf));
			lwsl_thread(" - worker %d: %s\n", n, buf);
			count++;
		}
	}

	if (count != tp->running_tasks)
		lwsl_err("%s: tp says %d running_tasks, but actually %d\n",
			 __func__, tp->running_tasks, count);

	/* done queue */
	count = 0;
	c = &tp->task_done_head;
	while (*c) {
		struct lws_threadpool_task *task = *c;
		__lws_threadpool_task_dump(task, buf, sizeof(buf));
		lwsl_thread(" - %s\n", buf);
		count++;

		c = &(*c)->task_queue_next;
	}

	if (count != tp->done_queue_depth)
		lwsl_err("%s: tp says done_queue_depth %d, but actually %d\n",
			 __func__, tp->done_queue_depth, count);

	pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */
#endif
}
235
236 static void
state_transition(struct lws_threadpool_task * task,enum lws_threadpool_task_status status)237 state_transition(struct lws_threadpool_task *task,
238 enum lws_threadpool_task_status status)
239 {
240 task->entered_state = lws_now_usecs();
241 task->status = status;
242 }
243
244 static struct lws *
task_to_wsi(struct lws_threadpool_task * task)245 task_to_wsi(struct lws_threadpool_task *task)
246 {
247 #if defined(LWS_WITH_SECURE_STREAMS)
248 if (task->args.ss)
249 return task->args.ss->wsi;
250 #endif
251 return task->args.wsi;
252 }
253
254 static void
lws_threadpool_task_cleanup_destroy(struct lws_threadpool_task * task)255 lws_threadpool_task_cleanup_destroy(struct lws_threadpool_task *task)
256 {
257 if (task->args.cleanup)
258 task->args.cleanup(task_to_wsi(task), task->args.user);
259
260 lws_dll2_remove(&task->list);
261
262 lwsl_thread("%s: tp %p: cleaned finished task for %s\n",
263 __func__, task->tp, lws_wsi_tag(task_to_wsi(task)));
264
265 lws_free(task);
266 }
267
268 static void
__lws_threadpool_reap(struct lws_threadpool_task * task)269 __lws_threadpool_reap(struct lws_threadpool_task *task)
270 {
271 struct lws_threadpool_task **c, *t = NULL;
272 struct lws_threadpool *tp = task->tp;
273
274 /* remove the task from the done queue */
275
276 if (tp) {
277 c = &tp->task_done_head;
278
279 while (*c) {
280 if ((*c) == task) {
281 t = *c;
282 *c = t->task_queue_next;
283 t->task_queue_next = NULL;
284 tp->done_queue_depth--;
285
286 lwsl_thread("%s: tp %s: reaped task %s\n", __func__,
287 tp->name, lws_wsi_tag(task_to_wsi(task)));
288
289 break;
290 }
291 c = &(*c)->task_queue_next;
292 }
293
294 if (!t) {
295 lwsl_err("%s: task %p not in done queue\n", __func__, task);
296 /*
297 * This shouldn't occur, but in this case not really
298 * safe to assume there's a task to destroy
299 */
300 return;
301 }
302 } else
303 lwsl_err("%s: task->tp NULL already\n", __func__);
304
305 /* call the task's cleanup and delete the task itself */
306
307 lws_threadpool_task_cleanup_destroy(task);
308 }
309
310 /*
311 * this gets called from each tsi service context after the service was
312 * cancelled... we need to ask for the writable callback from the matching
313 * tsi context for any wsis bound to a worked thread that need it
314 */
315
316 int
lws_threadpool_tsi_context(struct lws_context * context,int tsi)317 lws_threadpool_tsi_context(struct lws_context *context, int tsi)
318 {
319 struct lws_threadpool_task **c, *task = NULL;
320 struct lws_threadpool *tp;
321 struct lws *wsi;
322
323 lws_context_lock(context, __func__);
324
325 tp = context->tp_list_head;
326 while (tp) {
327 int n;
328
329 /* for the running (syncing...) tasks... */
330
331 for (n = 0; n < tp->threads_in_pool; n++) {
332 struct lws_pool *pool = &tp->pool_list[n];
333
334 task = pool->task;
335 if (!task)
336 continue;
337
338 wsi = task_to_wsi(task);
339 if (!wsi || wsi->tsi != tsi ||
340 (!task->wanted_writeable_cb &&
341 task->status != LWS_TP_STATUS_SYNCING))
342 continue;
343
344 task->wanted_writeable_cb = 0;
345 lws_memory_barrier();
346
347 /*
348 * finally... we can ask for the callback on
349 * writable from the correct service thread
350 * context
351 */
352
353 lws_callback_on_writable(wsi);
354 }
355
356 /* for the done tasks... */
357
358 c = &tp->task_done_head;
359
360 while (*c) {
361 task = *c;
362 wsi = task_to_wsi(task);
363
364 if (wsi && wsi->tsi == tsi &&
365 (task->wanted_writeable_cb ||
366 task->status == LWS_TP_STATUS_SYNCING)) {
367
368 task->wanted_writeable_cb = 0;
369 lws_memory_barrier();
370
371 /*
372 * finally... we can ask for the callback on
373 * writable from the correct service thread
374 * context
375 */
376
377 lws_callback_on_writable(wsi);
378 }
379
380 c = &task->task_queue_next;
381 }
382
383 tp = tp->tp_list;
384 }
385
386 lws_context_unlock(context);
387
388 return 0;
389 }
390
391 static int
lws_threadpool_worker_sync(struct lws_pool * pool,struct lws_threadpool_task * task)392 lws_threadpool_worker_sync(struct lws_pool *pool,
393 struct lws_threadpool_task *task)
394 {
395 enum lws_threadpool_task_status temp;
396 struct timespec abstime;
397 struct lws *wsi;
398 int tries = 15;
399
400 /* block until writable acknowledges */
401 lwsl_debug("%s: %p: LWS_TP_RETURN_SYNC in\n", __func__, task);
402 pthread_mutex_lock(&pool->lock); /* ======================= pool lock */
403
404 lwsl_info("%s: %s: task %p (%s): syncing with %s\n", __func__,
405 pool->tp->name, task, task->name, lws_wsi_tag(task_to_wsi(task)));
406
407 temp = task->status;
408 state_transition(task, LWS_TP_STATUS_SYNCING);
409 while (tries--) {
410 wsi = task_to_wsi(task);
411
412 /*
413 * if the wsi is no longer attached to this task, there is
414 * nothing we can sync to usefully. Since the work wants to
415 * sync, it means we should react to the situation by telling
416 * the task it can't continue usefully by stopping it.
417 */
418
419 if (!wsi) {
420 lwsl_thread("%s: %s: task %p (%s): No longer bound to any "
421 "wsi to sync to\n", __func__, pool->tp->name,
422 task, task->name);
423
424 state_transition(task, LWS_TP_STATUS_STOPPING);
425 goto done;
426 }
427
428 /*
429 * So tries times this is the maximum time between SYNC asking
430 * for a callback on writable and actually getting it we are
431 * willing to sit still for.
432 *
433 * If it is exceeded, we will stop the task.
434 */
435 abstime.tv_sec = time(NULL) + 2;
436 abstime.tv_nsec = 0;
437
438 task->wanted_writeable_cb = 1;
439 lws_memory_barrier();
440
441 /*
442 * This will cause lws_threadpool_tsi_context() to get called
443 * from each tsi service context, where we can safely ask for
444 * a callback on writeable on the wsi we are associated with.
445 */
446 lws_cancel_service(lws_get_context(wsi));
447
448 /*
449 * so the danger here is that we asked for a writable callback
450 * on the wsi, but for whatever reason, we are never going to
451 * get one. To avoid deadlocking forever, we allow a set time
452 * for the sync to happen naturally, otherwise the cond wait
453 * times out and we stop the task.
454 */
455
456 if (pthread_cond_timedwait(&task->wake_idle, &pool->lock,
457 &abstime) == ETIMEDOUT) {
458 task->late_sync_retries++;
459 if (!tries) {
460 lwsl_err("%s: %s: task %p (%s): SYNC timed out "
461 "(associated %s)\n",
462 __func__, pool->tp->name, task,
463 task->name, lws_wsi_tag(task_to_wsi(task)));
464
465 state_transition(task, LWS_TP_STATUS_STOPPING);
466 goto done;
467 }
468
469 continue;
470 } else
471 break;
472 }
473
474 if (task->status == LWS_TP_STATUS_SYNCING)
475 state_transition(task, temp);
476
477 lwsl_debug("%s: %p: LWS_TP_RETURN_SYNC out\n", __func__, task);
478
479 done:
480 pthread_mutex_unlock(&pool->lock); /* ----------------- - pool unlock */
481
482 return 0;
483 }
484
#if !defined(WIN32)
/* its address is the value lws_threadpool_worker() passes to pthread_exit() */
static int dummy;
#endif
488
/*
 * Worker thread entry point; d is this worker's struct lws_pool.
 *
 * Loops until tp->destroying is seen.  Each pass waits on tp->wake_idle for
 * a pending task, detaches the entry at the tail of tp->task_queue_head,
 * calls the task callback repeatedly while the task stays in
 * LWS_TP_STATUS_RUNNING, then puts the task on tp->task_done_head.  A
 * finished / stopped task with no args.wsi is reaped here; otherwise service
 * is cancelled so the service thread can look at the task's status.
 */
static void *
lws_threadpool_worker(void *d)
{
	struct lws_threadpool_task **c, **c2, *task;
	struct lws_pool *pool = d;
	struct lws_threadpool *tp = pool->tp;
	char buf[160];

	while (!tp->destroying) {

		/* we have no running task... wait and get one from the queue */

		pthread_mutex_lock(&tp->lock); /* =================== tp lock */

		/*
		 * if there's no task already waiting in the queue, wait for
		 * the wake_idle condition to signal us that might have changed
		 */
		while (!tp->task_queue_head && !tp->destroying)
			pthread_cond_wait(&tp->wake_idle, &tp->lock);

		if (tp->destroying) {
			lwsl_notice("%s: bailing\n", __func__);
			/* doneski drops tp->lock, then the outer loop exits */
			goto doneski;
		}

		c = &tp->task_queue_head;
		c2 = NULL;
		task = NULL;
		pool->task = NULL;

		/* look at the queue tail */
		/* c2 ends up as the link that points at the last entry */
		while (*c) {
			c2 = c;
			c = &(*c)->task_queue_next;
		}

		/* is there a task at the queue tail? */
		if (c2 && *c2) {
			pool->task = task = *c2;
			task->acquired = pool->acquired = lws_now_usecs();
			/* remove it from the queue */
			*c2 = task->task_queue_next;
			task->task_queue_next = NULL;
			tp->queue_depth--;
			/* mark it as running */
			state_transition(task, LWS_TP_STATUS_RUNNING);
		}

		/* someone else got it first... wait and try again */
		if (!task) {
			pthread_mutex_unlock(&tp->lock); /* ------ tp unlock */
			continue;
		}

		task->wanted_writeable_cb = 0;

		/* we have acquired a new task */

		__lws_threadpool_task_dump(task, buf, sizeof(buf));

		lwsl_thread("%s: %s: worker %d ACQUIRING: %s\n",
			    __func__, tp->name, pool->worker_index, buf);
		tp->running_tasks++;

		pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */

		/*
		 * 1) The task can return with LWS_TP_RETURN_CHECKING_IN to
		 * "resurface" periodically, and get called again with
		 * cont = 1 immediately to indicate it is picking up where it
		 * left off if the task is not being "stopped".
		 *
		 * This allows long tasks to respond to requests to stop in
		 * a clean and opaque way.
		 *
		 * 2) The task can return with LWS_TP_RETURN_SYNC to register
		 * a "callback on writable" request on the service thread and
		 * block until it hears back from the WRITABLE handler.
		 *
		 * This allows the work on the thread to be synchronized to the
		 * previous work being dispatched cleanly.
		 *
		 * 3) The task can return with LWS_TP_RETURN_FINISHED to
		 * indicate its work is completed nicely.
		 *
		 * 4) The task can return with LWS_TP_RETURN_STOPPED to indicate
		 * it stopped and cleaned up after incomplete work.
		 */

		do {
			lws_usec_t then;
			int n;

			/*
			 * the callback is still called after this, with the
			 * STOPPING status, so it can react to being stopped
			 */
			if (tp->destroying || !task_to_wsi(task)) {
				lwsl_info("%s: stopping on wsi gone\n", __func__);
				state_transition(task, LWS_TP_STATUS_STOPPING);
			}

			then = lws_now_usecs();
			n = (int)task->args.task(task->args.user, task->status);
			lwsl_debug(" %d, status %d\n", n, task->status);
			us_accrue(&task->acc_running, then);
			if (n & LWS_TP_RETURN_FLAG_OUTLIVE)
				task->outlive = 1;
			/* the low three bits carry the return code */
			switch (n & 7) {
			case LWS_TP_RETURN_CHECKING_IN:
				/* if not destroying the tp, continue */
				break;
			case LWS_TP_RETURN_SYNC:
				if (!task_to_wsi(task)) {
					lwsl_debug("%s: task that wants to "
						   "outlive lost wsi asked "
						   "to sync: bypassed\n",
						   __func__);
					break;
				}
				/* block until writable acknowledges */
				then = lws_now_usecs();
				lws_threadpool_worker_sync(pool, task);
				us_accrue(&task->acc_syncing, then);
				break;
			case LWS_TP_RETURN_FINISHED:
				state_transition(task, LWS_TP_STATUS_FINISHED);
				break;
			case LWS_TP_RETURN_STOPPED:
				state_transition(task, LWS_TP_STATUS_STOPPED);
				break;
			}
		} while (task->status == LWS_TP_STATUS_RUNNING);

		pthread_mutex_lock(&tp->lock); /* =================== tp lock */

		tp->running_tasks--;

		/* the callback loop has exited, so a stop request is complete */
		if (pool->task->status == LWS_TP_STATUS_STOPPING)
			state_transition(task, LWS_TP_STATUS_STOPPED);

		/* move the task to the done queue */

		pool->task->task_queue_next = tp->task_done_head;
		tp->task_done_head = task;
		tp->done_queue_depth++;
		pool->task->done = lws_now_usecs();

		/* note: this tests args.wsi directly, not task_to_wsi() */
		if (!pool->task->args.wsi &&
		    (pool->task->status == LWS_TP_STATUS_STOPPED ||
		     pool->task->status == LWS_TP_STATUS_FINISHED)) {

			__lws_threadpool_task_dump(pool->task, buf, sizeof(buf));
			lwsl_thread("%s: %s: worker %d REAPING: %s\n",
				    __func__, tp->name, pool->worker_index,
				    buf);

			/*
			 * there is no longer any wsi attached, so nothing is
			 * going to take care of reaping us. So we must take
			 * care of it ourselves.
			 */
			__lws_threadpool_reap(pool->task);
		} else {

			__lws_threadpool_task_dump(pool->task, buf, sizeof(buf));
			lwsl_thread("%s: %s: worker %d DONE: %s\n",
				    __func__, tp->name, pool->worker_index,
				    buf);

			/* signal the associated wsi to take a fresh look at
			 * task status */

			if (task_to_wsi(pool->task)) {
				task->wanted_writeable_cb = 1;

				lws_cancel_service(
					lws_get_context(task_to_wsi(pool->task)));
			}
		}

doneski:
		/* reached with tp->lock held on every path */
		pool->task = NULL;
		pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */
	}

	lwsl_notice("%s: Exiting\n", __func__);

	/* threadpool is being destroyed */
#if !defined(WIN32)
	pthread_exit(&dummy);
#endif

	return NULL;
}
681
682 struct lws_threadpool *
lws_threadpool_create(struct lws_context * context,const struct lws_threadpool_create_args * args,const char * format,...)683 lws_threadpool_create(struct lws_context *context,
684 const struct lws_threadpool_create_args *args,
685 const char *format, ...)
686 {
687 struct lws_threadpool *tp;
688 va_list ap;
689 int n;
690
691 tp = lws_malloc(sizeof(*tp) + (sizeof(struct lws_pool) * (unsigned int)args->threads),
692 "threadpool alloc");
693 if (!tp)
694 return NULL;
695
696 memset(tp, 0, sizeof(*tp) + (sizeof(struct lws_pool) * (unsigned int)args->threads));
697 tp->pool_list = (struct lws_pool *)(tp + 1);
698 tp->max_queue_depth = args->max_queue_depth;
699
700 va_start(ap, format);
701 n = vsnprintf(tp->name, sizeof(tp->name) - 1, format, ap);
702 va_end(ap);
703
704 lws_context_lock(context, __func__);
705
706 tp->context = context;
707 tp->tp_list = context->tp_list_head;
708 context->tp_list_head = tp;
709
710 lws_context_unlock(context);
711
712 pthread_mutex_init(&tp->lock, NULL);
713 pthread_cond_init(&tp->wake_idle, NULL);
714
715 for (n = 0; n < args->threads; n++) {
716 #if defined(LWS_HAS_PTHREAD_SETNAME_NP)
717 char name[16];
718 #endif
719 tp->pool_list[n].tp = tp;
720 tp->pool_list[n].worker_index = n;
721 pthread_mutex_init(&tp->pool_list[n].lock, NULL);
722 if (pthread_create(&tp->pool_list[n].thread, NULL,
723 lws_threadpool_worker, &tp->pool_list[n])) {
724 lwsl_err("thread creation failed\n");
725 } else {
726 #if defined(LWS_HAS_PTHREAD_SETNAME_NP)
727 lws_snprintf(name, sizeof(name), "%s-%d", tp->name, n);
728 pthread_setname_np(tp->pool_list[n].thread, name);
729 #endif
730 tp->threads_in_pool++;
731 }
732 }
733
734 return tp;
735 }
736
737 void
lws_threadpool_finish(struct lws_threadpool * tp)738 lws_threadpool_finish(struct lws_threadpool *tp)
739 {
740 struct lws_threadpool_task **c, *task;
741
742 pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
743
744 /* nothing new can start, running jobs will abort as STOPPED and the
745 * pool threads will exit ASAP (they are joined in destroy) */
746 tp->destroying = 1;
747
748 /* stop everyone in the pending queue and move to the done queue */
749
750 c = &tp->task_queue_head;
751 while (*c) {
752 task = *c;
753 *c = task->task_queue_next;
754 task->task_queue_next = tp->task_done_head;
755 tp->task_done_head = task;
756 state_transition(task, LWS_TP_STATUS_STOPPED);
757 tp->queue_depth--;
758 tp->done_queue_depth++;
759 task->done = lws_now_usecs();
760
761 c = &task->task_queue_next;
762 }
763
764 pthread_cond_broadcast(&tp->wake_idle);
765 pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */
766 }
767
768 void
lws_threadpool_destroy(struct lws_threadpool * tp)769 lws_threadpool_destroy(struct lws_threadpool *tp)
770 {
771 struct lws_threadpool_task *task, *next;
772 struct lws_threadpool **ptp;
773 void *retval;
774 int n;
775
776 /* remove us from the context list of threadpools */
777
778 lws_context_lock(tp->context, __func__);
779 ptp = &tp->context->tp_list_head;
780
781 while (*ptp) {
782 if (*ptp == tp) {
783 *ptp = tp->tp_list;
784 break;
785 }
786 ptp = &(*ptp)->tp_list;
787 }
788
789 lws_context_unlock(tp->context);
790
791 /*
792 * Wake up the threadpool guys and tell them to exit
793 */
794
795 pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
796 tp->destroying = 1;
797 pthread_cond_broadcast(&tp->wake_idle);
798 pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */
799
800 lws_threadpool_dump(tp);
801
802 lwsl_info("%s: waiting for threads to rejoin\n", __func__);
803 #if defined(WIN32)
804 Sleep(1000);
805 #endif
806
807 for (n = 0; n < tp->threads_in_pool; n++) {
808 task = tp->pool_list[n].task;
809
810 pthread_join(tp->pool_list[n].thread, &retval);
811 pthread_mutex_destroy(&tp->pool_list[n].lock);
812 }
813 lwsl_info("%s: all threadpools exited\n", __func__);
814 #if defined(WIN32)
815 Sleep(1000);
816 #endif
817
818 task = tp->task_done_head;
819 while (task) {
820 next = task->task_queue_next;
821 lws_threadpool_task_cleanup_destroy(task);
822 tp->done_queue_depth--;
823 task = next;
824 }
825
826 pthread_mutex_destroy(&tp->lock);
827
828 memset(tp, 0xdd, sizeof(*tp));
829 lws_free(tp);
830 }
831
832 /*
833 * We want to stop and destroy the tasks and related priv.
834 */
835
836 int
lws_threadpool_dequeue_task(struct lws_threadpool_task * task)837 lws_threadpool_dequeue_task(struct lws_threadpool_task *task)
838 {
839 struct lws_threadpool *tp;
840 struct lws_threadpool_task **c;
841 int n;
842
843 tp = task->tp;
844 pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
845
846 if (task->outlive && !tp->destroying) {
847
848 /* disconnect from wsi, and wsi from task */
849
850 lws_dll2_remove(&task->list);
851 task->args.wsi = NULL;
852 #if defined(LWS_WITH_SECURE_STREAMS)
853 task->args.ss = NULL;
854 #endif
855
856 goto bail;
857 }
858
859
860 c = &tp->task_queue_head;
861
862 /* is he queued waiting for a chance to run? Mark him as stopped and
863 * move him on to the done queue */
864
865 while (*c) {
866 if ((*c) == task) {
867 *c = task->task_queue_next;
868 task->task_queue_next = tp->task_done_head;
869 tp->task_done_head = task;
870 state_transition(task, LWS_TP_STATUS_STOPPED);
871 tp->queue_depth--;
872 tp->done_queue_depth++;
873 task->done = lws_now_usecs();
874
875 lwsl_debug("%s: tp %p: removed queued task %s\n",
876 __func__, tp, lws_wsi_tag(task_to_wsi(task)));
877
878 break;
879 }
880 c = &(*c)->task_queue_next;
881 }
882
883 /* is he on the done queue? */
884
885 c = &tp->task_done_head;
886 while (*c) {
887 if ((*c) == task) {
888 *c = task->task_queue_next;
889 task->task_queue_next = NULL;
890 lws_threadpool_task_cleanup_destroy(task);
891 tp->done_queue_depth--;
892 goto bail;
893 }
894 c = &(*c)->task_queue_next;
895 }
896
897 /* he's not in the queue... is he already running on a thread? */
898
899 for (n = 0; n < tp->threads_in_pool; n++) {
900 if (!tp->pool_list[n].task || tp->pool_list[n].task != task)
901 continue;
902
903 /*
904 * ensure we don't collide with tests or changes in the
905 * worker thread
906 */
907 pthread_mutex_lock(&tp->pool_list[n].lock);
908
909 /*
910 * mark him as having been requested to stop...
911 * the caller will hear about it in his service thread
912 * context as a request to close
913 */
914 state_transition(task, LWS_TP_STATUS_STOPPING);
915
916 /* disconnect from wsi, and wsi from task */
917
918 lws_dll2_remove(&task->list);
919 task->args.wsi = NULL;
920 #if defined(LWS_WITH_SECURE_STREAMS)
921 task->args.ss = NULL;
922 #endif
923
924 pthread_mutex_unlock(&tp->pool_list[n].lock);
925
926 lwsl_debug("%s: tp %p: request stop running task "
927 "for %s\n", __func__, tp,
928 lws_wsi_tag(task_to_wsi(task)));
929
930 break;
931 }
932
933 if (n == tp->threads_in_pool) {
934 /* can't find it */
935 lwsl_notice("%s: tp %p: no task for %s, decoupling\n",
936 __func__, tp, lws_wsi_tag(task_to_wsi(task)));
937 lws_dll2_remove(&task->list);
938 task->args.wsi = NULL;
939 #if defined(LWS_WITH_SECURE_STREAMS)
940 task->args.ss = NULL;
941 #endif
942 }
943
944 bail:
945 pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */
946
947 return 0;
948 }
949
950 int
lws_threadpool_dequeue(struct lws * wsi)951 lws_threadpool_dequeue(struct lws *wsi) /* deprecated */
952 {
953 struct lws_threadpool_task *task;
954
955 if (!wsi->tp_task_owner.count)
956 return 0;
957 assert(wsi->tp_task_owner.count != 1);
958
959 task = lws_container_of(wsi->tp_task_owner.head,
960 struct lws_threadpool_task, list);
961
962 return lws_threadpool_dequeue_task(task);
963 }
964
965 struct lws_threadpool_task *
lws_threadpool_enqueue(struct lws_threadpool * tp,const struct lws_threadpool_task_args * args,const char * format,...)966 lws_threadpool_enqueue(struct lws_threadpool *tp,
967 const struct lws_threadpool_task_args *args,
968 const char *format, ...)
969 {
970 struct lws_threadpool_task *task = NULL;
971 va_list ap;
972
973 if (tp->destroying)
974 return NULL;
975
976 #if defined(LWS_WITH_SECURE_STREAMS)
977 assert(args->ss || args->wsi);
978 #endif
979
980 pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
981
982 /*
983 * if there's room on the queue, the job always goes on the queue
984 * first, then any free thread may pick it up after the wake_idle
985 */
986
987 if (tp->queue_depth == tp->max_queue_depth) {
988 lwsl_notice("%s: queue reached limit %d\n", __func__,
989 tp->max_queue_depth);
990
991 goto bail;
992 }
993
994 /*
995 * create the task object
996 */
997
998 task = lws_malloc(sizeof(*task), __func__);
999 if (!task)
1000 goto bail;
1001
1002 memset(task, 0, sizeof(*task));
1003 pthread_cond_init(&task->wake_idle, NULL);
1004 task->args = *args;
1005 task->tp = tp;
1006 task->created = lws_now_usecs();
1007
1008 va_start(ap, format);
1009 vsnprintf(task->name, sizeof(task->name) - 1, format, ap);
1010 va_end(ap);
1011
1012 /*
1013 * add him on the tp task queue
1014 */
1015
1016 task->task_queue_next = tp->task_queue_head;
1017 state_transition(task, LWS_TP_STATUS_QUEUED);
1018 tp->task_queue_head = task;
1019 tp->queue_depth++;
1020
1021 /*
1022 * mark the wsi itself as depending on this tp (so wsi close for
1023 * whatever reason can clean up)
1024 */
1025
1026 #if defined(LWS_WITH_SECURE_STREAMS)
1027 if (args->ss)
1028 lws_dll2_add_tail(&task->list, &args->ss->wsi->tp_task_owner);
1029 else
1030 #endif
1031 lws_dll2_add_tail(&task->list, &args->wsi->tp_task_owner);
1032
1033 lwsl_thread("%s: tp %s: enqueued task %p (%s) for %s, depth %d\n",
1034 __func__, tp->name, task, task->name,
1035 lws_wsi_tag(task_to_wsi(task)), tp->queue_depth);
1036
1037 /* alert any idle thread there's something new on the task list */
1038
1039 lws_memory_barrier();
1040 pthread_cond_signal(&tp->wake_idle);
1041
1042 bail:
1043 pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */
1044
1045 return task;
1046 }
1047
1048 /* this should be called from the service thread */
1049
1050 enum lws_threadpool_task_status
lws_threadpool_task_status(struct lws_threadpool_task * task,void ** user)1051 lws_threadpool_task_status(struct lws_threadpool_task *task, void **user)
1052 {
1053 enum lws_threadpool_task_status status;
1054 struct lws_threadpool *tp = task->tp;
1055
1056 if (!tp)
1057 return LWS_TP_STATUS_FINISHED;
1058
1059 *user = task->args.user;
1060 status = task->status;
1061
1062 if (status == LWS_TP_STATUS_FINISHED ||
1063 status == LWS_TP_STATUS_STOPPED) {
1064 char buf[160];
1065
1066 pthread_mutex_lock(&tp->lock); /* ================ tpool lock */
1067 __lws_threadpool_task_dump(task, buf, sizeof(buf));
1068 lwsl_thread("%s: %s: service thread REAPING: %s\n",
1069 __func__, tp->name, buf);
1070 __lws_threadpool_reap(task);
1071 lws_memory_barrier();
1072 pthread_mutex_unlock(&tp->lock); /* ------------ tpool unlock */
1073 }
1074
1075 return status;
1076 }
1077
1078 enum lws_threadpool_task_status
lws_threadpool_task_status_noreap(struct lws_threadpool_task * task)1079 lws_threadpool_task_status_noreap(struct lws_threadpool_task *task)
1080 {
1081 return task->status;
1082 }
1083
1084 enum lws_threadpool_task_status
lws_threadpool_task_status_wsi(struct lws * wsi,struct lws_threadpool_task ** _task,void ** user)1085 lws_threadpool_task_status_wsi(struct lws *wsi,
1086 struct lws_threadpool_task **_task, void **user)
1087 {
1088 struct lws_threadpool_task *task;
1089
1090 if (!wsi->tp_task_owner.count) {
1091 lwsl_notice("%s: wsi has no task, ~=FINISHED\n", __func__);
1092 return LWS_TP_STATUS_FINISHED;
1093 }
1094
1095 assert(wsi->tp_task_owner.count == 1); /* see deprecation docs in hdr */
1096
1097 task = lws_container_of(wsi->tp_task_owner.head,
1098 struct lws_threadpool_task, list);
1099
1100 *_task = task;
1101
1102 return lws_threadpool_task_status(task, user);
1103 }
1104
1105 void
lws_threadpool_task_sync(struct lws_threadpool_task * task,int stop)1106 lws_threadpool_task_sync(struct lws_threadpool_task *task, int stop)
1107 {
1108 lwsl_debug("%s\n", __func__);
1109 if (!task)
1110 return;
1111
1112 if (stop)
1113 state_transition(task, LWS_TP_STATUS_STOPPING);
1114
1115 pthread_mutex_lock(&task->tp->lock);
1116 pthread_cond_signal(&task->wake_idle);
1117 pthread_mutex_unlock(&task->tp->lock);
1118 }
1119
1120 int
lws_threadpool_foreach_task_wsi(struct lws * wsi,void * user,int (* cb)(struct lws_threadpool_task * task,void * user))1121 lws_threadpool_foreach_task_wsi(struct lws *wsi, void *user,
1122 int (*cb)(struct lws_threadpool_task *task,
1123 void *user))
1124 {
1125 struct lws_threadpool_task *task1;
1126
1127 if (wsi->tp_task_owner.head == NULL)
1128 return 0;
1129
1130 task1 = lws_container_of(wsi->tp_task_owner.head,
1131 struct lws_threadpool_task, list);
1132
1133 pthread_mutex_lock(&task1->tp->lock); /* ================ tpool lock */
1134
1135 lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
1136 wsi->tp_task_owner.head) {
1137 struct lws_threadpool_task *task = lws_container_of(d,
1138 struct lws_threadpool_task, list);
1139
1140 if (cb(task, user)) {
1141 pthread_mutex_unlock(&task1->tp->lock); /* ------------ tpool unlock */
1142 return 1;
1143 }
1144
1145 } lws_end_foreach_dll_safe(d, d1);
1146
1147 pthread_mutex_unlock(&task1->tp->lock); /* ------------ tpool unlock */
1148
1149 return 0;
1150 }
1151
#if defined(LWS_WITH_SECURE_STREAMS)
/*
 * Secure-streams form of lws_threadpool_foreach_task_wsi(): walks the tasks
 * on the ss's wsi.  Returns 0 without calling cb if the ss has no wsi.
 */
int
lws_threadpool_foreach_task_ss(struct lws_ss_handle *ss, void *user,
			       int (*cb)(struct lws_threadpool_task *task,
					 void *user))
{
	return ss->wsi ? lws_threadpool_foreach_task_wsi(ss->wsi, user, cb) : 0;
}
#endif
1164
1165 static int
disassociate_wsi(struct lws_threadpool_task * task,void * user)1166 disassociate_wsi(struct lws_threadpool_task *task,
1167 void *user)
1168 {
1169 task->args.wsi = NULL;
1170 lws_dll2_remove(&task->list);
1171
1172 return 0;
1173 }
1174
1175 void
lws_threadpool_wsi_closing(struct lws * wsi)1176 lws_threadpool_wsi_closing(struct lws *wsi)
1177 {
1178 lws_threadpool_foreach_task_wsi(wsi, NULL, disassociate_wsi);
1179 }
1180
1181 struct lws_threadpool_task *
lws_threadpool_get_task_wsi(struct lws * wsi)1182 lws_threadpool_get_task_wsi(struct lws *wsi)
1183 {
1184 if (wsi->tp_task_owner.head == NULL)
1185 return NULL;
1186
1187 return lws_container_of(wsi->tp_task_owner.head,
1188 struct lws_threadpool_task, list);
1189 }
1190
#if defined(LWS_WITH_SECURE_STREAMS)
/*
 * Secure-streams form of lws_threadpool_get_task_wsi(): returns the task at
 * the head of the task list of the ss's wsi.
 *
 * Returns NULL if the ss has no wsi or the wsi has no task.  The wsi check
 * is needed because lws_threadpool_get_task_wsi() dereferences its
 * argument; lws_threadpool_foreach_task_ss() guards the same way.
 */
struct lws_threadpool_task *
lws_threadpool_get_task_ss(struct lws_ss_handle *ss)
{
	if (!ss->wsi)
		return NULL;

	return lws_threadpool_get_task_wsi(ss->wsi);
}
#endif
1198