1 /* thread_pool.c -- A pool of generic worker threads
2
3 Copyright (c) 2013-2020 Genome Research Ltd.
4
5 Author: James Bonfield <jkb@sanger.ac.uk>
6
7 Permission is hereby granted, free of charge, to any person obtaining a copy
8 of this software and associated documentation files (the "Software"), to deal
9 in the Software without restriction, including without limitation the rights
10 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 copies of the Software, and to permit persons to whom the Software is
12 furnished to do so, subject to the following conditions:
13
14 The above copyright notice and this permission notice shall be included in
15 all copies or substantial portions of the Software.
16
17 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20 THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23 DEALINGS IN THE SOFTWARE. */
24
25 #ifndef TEST_MAIN
26 #define HTS_BUILDING_LIBRARY // Enables HTSLIB_EXPORT, see htslib/hts_defs.h
27 #include <config.h>
28 #endif
29
30 #include <stdlib.h>
31 #include <inttypes.h>
32 #include <signal.h>
33 #include <errno.h>
34 #include <stdio.h>
35 #include <string.h>
36 #include <sys/time.h>
37 #include <assert.h>
38 #include <stdarg.h>
39 #include <unistd.h>
40 #include <limits.h>
41
42 #include "thread_pool_internal.h"
43 #include "htslib/hts_log.h"
44
45 // Minimum stack size for threads. Required for some rANS codecs
46 // that use over 2Mbytes of stack for encoder / decoder state
47 #define HTS_MIN_THREAD_STACK (3 * 1024 * 1024)
48
49 static void hts_tpool_process_detach_locked(hts_tpool *p,
50 hts_tpool_process *q);
51
52 //#define DEBUG
53
54 #ifdef DEBUG
// Map the calling thread back to its index in the pool's worker array.
// Returns -1 if the caller is not one of p's worker threads.
static int worker_id(hts_tpool *p) {
    pthread_t self = pthread_self();
    for (int i = 0; i < p->tsize; i++)
        if (pthread_equal(self, p->t[i].tid))
            return i;
    return -1;
}
64
// Debug-build logging helper: forwards a printf-style message to fp.
void DBG_OUT(FILE *fp, char *fmt, ...) {
    va_list ap;
    va_start(ap, fmt);
    vfprintf(fp, fmt, ap);
    va_end(ap);
}
71 #else
72 #define DBG_OUT(...) do{}while(0)
73 #endif
74
75 /* ----------------------------------------------------------------------------
76 * A process-queue to hold results from the thread pool.
77 *
78 * Each thread pool may have jobs of multiple types being queued up and
79 * interleaved, so we attach several job process-queues to a single pool.
80 *
81 * The jobs themselves are expected to push their results onto their
82 * appropriate results queue.
83 */
84
85 /*
86 * Adds a result to the end of the process result queue.
87 *
88 * Returns 0 on success;
89 * -1 on failure
90 */
static int hts_tpool_add_result(hts_tpool_job *j, void *data) {
    // Called by a worker after j->func() completes: records the result
    // on j's process-queue (unless in_only) and marks the job as no
    // longer processing.  Takes and releases the pool mutex itself.
    hts_tpool_process *q = j->q;
    hts_tpool_result *r;

    pthread_mutex_lock(&q->p->pool_m);

    DBG_OUT(stderr, "%d: Adding result to queue %p, serial %"PRId64", %d of %d\n",
            worker_id(j->p), q, j->serial, q->n_output+1, q->qsize);

    // One fewer job in flight; wake anyone waiting for the queue to drain.
    if (--q->n_processing == 0)
        pthread_cond_signal(&q->none_processing_c);

    /* No results queue is fine if we don't want any results back */
    if (q->in_only) {
        pthread_mutex_unlock(&q->p->pool_m);
        return 0;
    }

    if (!(r = malloc(sizeof(*r)))) {
        // Cannot store the result, so the ordered output stream is broken;
        // shut down the whole process-queue (re-locks, hence unlock first).
        pthread_mutex_unlock(&q->p->pool_m);
        hts_tpool_process_shutdown(q);
        return -1;
    }

    r->next = NULL;
    r->data = data;
    r->result_cleanup = j->result_cleanup;
    r->serial = j->serial;

    // Append to the tail of the output list.
    q->n_output++;
    if (q->output_tail) {
        q->output_tail->next = r;
        q->output_tail = r;
    } else {
        q->output_head = q->output_tail = r;
    }

    assert(r->serial >= q->next_serial    // Or it will never be dequeued ...
           || q->next_serial == INT_MAX); // ... unless flush in progress.
    if (r->serial == q->next_serial) {
        // This is the exact result consumers are blocked on, so wake them.
        DBG_OUT(stderr, "%d: Broadcasting result_avail (id %"PRId64")\n",
                worker_id(j->p), r->serial);
        pthread_cond_broadcast(&q->output_avail_c);
        DBG_OUT(stderr, "%d: Broadcast complete\n", worker_id(j->p));
    }

    pthread_mutex_unlock(&q->p->pool_m);

    return 0;
}
141
142 static void wake_next_worker(hts_tpool_process *q, int locked);
143
144 /* Core of hts_tpool_next_result() */
static hts_tpool_result *hts_tpool_next_result_locked(hts_tpool_process *q) {
    // Dequeue the result whose serial number equals q->next_serial.
    // Must be called with q->p->pool_m held.
    // Returns NULL if the in-order result isn't ready or q is shut down.
    hts_tpool_result *r, *last;

    if (q->shutdown)
        return NULL;

    // Linear scan for the in-order result; 'last' trails for unlinking.
    for (last = NULL, r = q->output_head; r; last = r, r = r->next) {
        if (r->serial == q->next_serial)
            break;
    }

    if (r) {
        // Remove r from out linked list
        if (q->output_head == r)
            q->output_head = r->next;
        else
            last->next = r->next;

        if (q->output_tail == r)
            q->output_tail = last;

        if (!q->output_head)
            q->output_tail = NULL;

        q->next_serial++;
        q->n_output--;

        if (q->qsize && q->n_output < q->qsize) {
            // Not technically input full, but can guarantee there is
            // room for the input to go somewhere so we still signal.
            // The waiting code will then check the condition again.
            if (q->n_input < q->qsize)
                pthread_cond_signal(&q->input_not_full_c);
            if (!q->shutdown)
                wake_next_worker(q, 1);
        }
    }

    return r;
}
185
186 /*
187 * Pulls the next item off the process result queue. The caller should free
188 * it (and any internals as appropriate) after use. This doesn't wait for a
189 * result to be present.
190 *
191 * Results will be returned in strict order.
192 *
193 * Returns hts_tpool_result pointer if a result is ready.
194 * NULL if not.
195 */
hts_tpool_next_result(hts_tpool_process * q)196 hts_tpool_result *hts_tpool_next_result(hts_tpool_process *q) {
197 hts_tpool_result *r;
198
199 DBG_OUT(stderr, "Requesting next result on queue %p\n", q);
200
201 pthread_mutex_lock(&q->p->pool_m);
202 r = hts_tpool_next_result_locked(q);
203 pthread_mutex_unlock(&q->p->pool_m);
204
205 DBG_OUT(stderr, "(q=%p) Found %p\n", q, r);
206
207 return r;
208 }
209
210 /*
211 * Pulls the next item off the process result queue. The caller should free
212 * it (and any internals as appropriate) after use. This will wait for
213 * a result to be present if none are currently available.
214 *
215 * Results will be returned in strict order.
216 *
217 * Returns hts_tpool_result pointer if a result is ready.
218 * NULL on error or during shutdown.
219 */
hts_tpool_result *hts_tpool_next_result_wait(hts_tpool_process *q) {
    // Blocking variant of hts_tpool_next_result(): waits until the
    // in-order result appears, or returns NULL once q is shut down.
    hts_tpool_result *r;

    pthread_mutex_lock(&q->p->pool_m);
    while (!(r = hts_tpool_next_result_locked(q))) {
        /* Possible race here now avoided via _locked() call, but in case... */
        struct timeval now;
        struct timespec timeout;

        // 10 second periodic wakeup as a belt-and-braces guard against a
        // missed signal; the loop re-checks the queue each time round.
        gettimeofday(&now, NULL);
        timeout.tv_sec = now.tv_sec + 10;
        timeout.tv_nsec = now.tv_usec * 1000;

        // Hold a reference across the wait so the queue cannot be freed
        // under us; if we turn out to be the last holder, destroy it.
        q->ref_count++;
        if (q->shutdown) {
            int rc = --q->ref_count;
            pthread_mutex_unlock(&q->p->pool_m);
            if (rc == 0)
                hts_tpool_process_destroy(q);
            return NULL;
        }
        pthread_cond_timedwait(&q->output_avail_c, &q->p->pool_m, &timeout);

        q->ref_count--;
    }
    pthread_mutex_unlock(&q->p->pool_m);

    return r;
}
249
250 /*
251 * Returns true if there are no items in the process results queue and
252 * also none still pending.
253 */
// True when q has nothing queued, nothing running, and nothing awaiting
// collection.  Takes the pool mutex for a coherent snapshot.
int hts_tpool_process_empty(hts_tpool_process *q) {
    pthread_mutex_lock(&q->p->pool_m);
    int empty = (q->n_input == 0 && q->n_processing == 0 && q->n_output == 0);
    pthread_mutex_unlock(&q->p->pool_m);
    return empty;
}
263
// Take an extra reference on q, preventing its destruction until a
// matching hts_tpool_process_ref_decr().
void hts_tpool_process_ref_incr(hts_tpool_process *q) {
    pthread_mutex_lock(&q->p->pool_m);
    ++q->ref_count;
    pthread_mutex_unlock(&q->p->pool_m);
}
269
void hts_tpool_process_ref_decr(hts_tpool_process *q) {
    // Drop one reference; the last holder destroys the queue.
    pthread_mutex_lock(&q->p->pool_m);
    if (--q->ref_count <= 0) {
        // Unlock before destroying: hts_tpool_process_destroy re-takes
        // the (recursive) pool mutex and ultimately frees q.
        pthread_mutex_unlock(&q->p->pool_m);
        hts_tpool_process_destroy(q);
        return;
    }

    // Still referenced elsewhere; destruction deferred to the last user.
    pthread_mutex_unlock(&q->p->pool_m);
}
281
282 /*
283 * Returns the number of completed jobs in the process results queue.
284 */
// Number of completed results currently waiting in q's output queue.
int hts_tpool_process_len(hts_tpool_process *q) {
    pthread_mutex_lock(&q->p->pool_m);
    int len = q->n_output;
    pthread_mutex_unlock(&q->p->pool_m);
    return len;
}
294
295 /*
296 * Returns the number of completed jobs in the process results queue plus the
297 * number running and queued up to run.
298 */
// Total work attached to q: completed results plus jobs queued or
// currently executing.
int hts_tpool_process_sz(hts_tpool_process *q) {
    pthread_mutex_lock(&q->p->pool_m);
    int total = q->n_output + q->n_input + q->n_processing;
    pthread_mutex_unlock(&q->p->pool_m);
    return total;
}
308
309 /*
310 * Shutdown a process.
311 *
312 * This sets the shutdown flag and wakes any threads waiting on process
313 * condition variables.
314 */
// Flag q as shut down and wake every thread blocked on any of its
// condition variables.  Caller must hold q->p->pool_m.
static void hts_tpool_process_shutdown_locked(hts_tpool_process *q) {
    q->shutdown = 1;
    pthread_cond_t *conds[] = {
        &q->output_avail_c,
        &q->input_not_full_c,
        &q->input_empty_c,
        &q->none_processing_c,
    };
    for (size_t i = 0; i < sizeof(conds) / sizeof(conds[0]); i++)
        pthread_cond_broadcast(conds[i]);
}
322
// Public shutdown entry point: locks the pool, then flags the queue and
// wakes all of its waiters.
void hts_tpool_process_shutdown(hts_tpool_process *q) {
    hts_tpool *p = q->p;
    pthread_mutex_lock(&p->pool_m);
    hts_tpool_process_shutdown_locked(q);
    pthread_mutex_unlock(&p->pool_m);
}
328
// Report whether q has been shut down (non-zero if so), reading the
// flag under the pool mutex.
int hts_tpool_process_is_shutdown(hts_tpool_process *q) {
    int flag;
    pthread_mutex_lock(&q->p->pool_m);
    flag = q->shutdown;
    pthread_mutex_unlock(&q->p->pool_m);
    return flag;
}
335
336 /*
337 * Frees a result 'r' and if free_data is true also frees
338 * the internal r->data result too.
339 */
void hts_tpool_delete_result(hts_tpool_result *r, int free_data) {
    // Release a result returned by hts_tpool_next_result*().
    // If free_data is true the caller's payload (r->data) is freed too.
    if (!r)
        return;

    // free(NULL) is a no-op, so no need to test r->data first.
    if (free_data)
        free(r->data);

    free(r);
}
349
350 /*
351 * Returns the data portion of a hts_tpool_result, corresponding
352 * to the actual "result" itself.
353 */
// Accessor: extract the user payload (the value returned by the job's
// exec function) from a result handle.
void *hts_tpool_result_data(hts_tpool_result *r) {
    void *payload = r->data;
    return payload;
}
357
358 /*
359 * Initialises a thread process-queue.
360 *
 * In_only, if true, indicates that the process generates no output and
362 * hold any output. Otherwise an output queue is used to store the results
363 * of processing each input job.
364 *
365 * Results hts_tpool_process pointer on success;
366 * NULL on failure
367 */
hts_tpool_process_init(hts_tpool * p,int qsize,int in_only)368 hts_tpool_process *hts_tpool_process_init(hts_tpool *p, int qsize, int in_only) {
369 hts_tpool_process *q = malloc(sizeof(*q));
370 if (!q)
371 return NULL;
372
373 pthread_cond_init(&q->output_avail_c, NULL);
374 pthread_cond_init(&q->input_not_full_c, NULL);
375 pthread_cond_init(&q->input_empty_c, NULL);
376 pthread_cond_init(&q->none_processing_c,NULL);
377
378 q->p = p;
379 q->input_head = NULL;
380 q->input_tail = NULL;
381 q->output_head = NULL;
382 q->output_tail = NULL;
383 q->next_serial = 0;
384 q->curr_serial = 0;
385 q->no_more_input = 0;
386 q->n_input = 0;
387 q->n_output = 0;
388 q->n_processing= 0;
389 q->qsize = qsize;
390 q->in_only = in_only;
391 q->shutdown = 0;
392 q->wake_dispatch = 0;
393 q->ref_count = 1;
394
395 q->next = NULL;
396 q->prev = NULL;
397
398 hts_tpool_process_attach(p, q);
399
400 return q;
401 }
402
403 /* Deallocates memory for a thread process-queue.
404 * Must be called before the thread pool is destroyed.
405 */
void hts_tpool_process_destroy(hts_tpool_process *q) {
    // Drain, detach and free a process-queue.  Safe on NULL.
    // If another thread still holds a reference, destruction is deferred
    // to that thread's ref_decr (we only drop our own reference here).
    DBG_OUT(stderr, "Destroying results queue %p\n", q);

    if (!q)
        return;

    // Prevent dispatch from queuing up any more jobs.
    // We want to reset (and flush) the queue here, before
    // we set the shutdown flag, but we need to avoid races
    // with queue more input during reset.
    pthread_mutex_lock(&q->p->pool_m);
    q->no_more_input = 1;
    pthread_mutex_unlock(&q->p->pool_m);

    // Ensure it's fully drained before destroying the queue
    hts_tpool_process_reset(q, 0);
    pthread_mutex_lock(&q->p->pool_m);
    hts_tpool_process_detach_locked(q->p, q);
    hts_tpool_process_shutdown_locked(q);

    // Maybe a worker is scanning this queue, so delay destruction
    if (--q->ref_count > 0) {
        pthread_mutex_unlock(&q->p->pool_m);
        return;
    }

    // Last reference gone: tear down the condvars and free.
    pthread_cond_destroy(&q->output_avail_c);
    pthread_cond_destroy(&q->input_not_full_c);
    pthread_cond_destroy(&q->input_empty_c);
    pthread_cond_destroy(&q->none_processing_c);
    pthread_mutex_unlock(&q->p->pool_m);

    free(q);

    DBG_OUT(stderr, "Destroyed results queue %p\n", q);
}
442
443
444 /*
445 * Attach and detach a thread process-queue with / from the thread pool
446 * scheduler.
447 *
448 * We need to do attach after making a thread process, but may also wish
449 * to temporarily detach if we wish to stop running jobs on a specific
450 * process while permitting other process to continue.
451 */
void hts_tpool_process_attach(hts_tpool *p, hts_tpool_process *q) {
    // Insert q into the pool's circular doubly-linked list of queues and
    // make it the head, so workers scan it first.
    pthread_mutex_lock(&p->pool_m);
    if (p->q_head) {
        // Splice q in immediately before the current head.
        q->next = p->q_head;
        q->prev = p->q_head->prev;
        p->q_head->prev->next = q;
        p->q_head->prev = q;
    } else {
        // First queue: a one-element circular list points at itself.
        q->next = q;
        q->prev = q;
    }
    p->q_head = q;
    assert(p->q_head && p->q_head->prev && p->q_head->next);
    pthread_mutex_unlock(&p->pool_m);
}
467
static void hts_tpool_process_detach_locked(hts_tpool *p,
                                            hts_tpool_process *q) {
    // Unlink q from the pool's circular queue list.
    // Caller must hold p->pool_m.  No-op if q isn't attached.
    if (!p->q_head || !q->prev || !q->next)
        return;

    // Walk the ring to confirm q really is a member before unlinking.
    hts_tpool_process *curr = p->q_head, *first = curr;
    do {
        if (curr == q) {
            q->next->prev = q->prev;
            q->prev->next = q->next;
            p->q_head = q->next;
            q->next = q->prev = NULL;

            // Last one
            if (p->q_head == q)
                p->q_head = NULL;
            break;
        }

        curr = curr->next;
    } while (curr != first);
}
490
// Public wrapper: detach q from pool p under the pool mutex.
void hts_tpool_process_detach(hts_tpool *p, hts_tpool_process *q) {
    pthread_mutex_lock(&p->pool_m);
    hts_tpool_process_detach_locked(p, q);
    pthread_mutex_unlock(&p->pool_m);
}
496
497
498 /* ----------------------------------------------------------------------------
499 * The thread pool.
500 */
501
502 #define TDIFF(t2,t1) ((t2.tv_sec-t1.tv_sec)*1000000 + t2.tv_usec-t1.tv_usec)
503
504 /*
505 * A worker thread.
506 *
507 * Once woken, each thread checks each process-queue in the pool in turn,
508 * looking for input jobs that also have room for the output (if it requires
509 * storing). If found, we execute it and repeat.
510 *
511 * If we checked all input queues and find no such job, then we wait until we
512 * are signalled to check again.
513 */
static void *tpool_worker(void *arg) {
    // Main loop of one worker thread (arg is its hts_tpool_worker slot).
    // Repeatedly scans the pool's queues for runnable jobs, executing
    // them until the pool shuts down.  The pool mutex is held except
    // while a job's func() is actually running.
    hts_tpool_worker *w = (hts_tpool_worker *)arg;
    hts_tpool *p = w->p;
    hts_tpool_job *j;

    pthread_mutex_lock(&p->pool_m);
    while (!p->shutdown) {
        // Pop an item off the pool queue

        assert(p->q_head == 0 || (p->q_head->prev && p->q_head->next));

        int work_to_do = 0;
        hts_tpool_process *first = p->q_head, *q = first;
        do {
            // Iterate over queues, finding one with jobs and also
            // room to put the result.
            //if (q && q->input_head && !hts_tpool_process_output_full(q)) {
            if (q && q->input_head
                && q->qsize - q->n_output > q->n_processing
                && !q->shutdown) {
                work_to_do = 1;
                break;
            }

            if (q) q = q->next;
        } while (q && q != first);

        if (!work_to_do) {
            // We scanned all queues and cannot process any, so we wait.
            p->nwaiting++;

            // Push this thread to the top of the waiting stack
            if (p->t_stack_top == -1 || p->t_stack_top > w->idx)
                p->t_stack_top = w->idx;

            p->t_stack[w->idx] = 1;
            // printf("%2d: no work. In=%d Proc=%d Out=%d full=%d\n",
            // w->idx, p->q_head->n_input, p->q_head->n_processing, p->q_head->n_output,
            // hts_tpool_process_output_full(p->q_head));
            pthread_cond_wait(&w->pending_c, &p->pool_m);
            p->t_stack[w->idx] = 0;

            /* Find new t_stack_top */
            int i;
            p->t_stack_top = -1;
            for (i = 0; i < p->tsize; i++) {
                if (p->t_stack[i]) {
                    p->t_stack_top = i;
                    break;
                }
            }

            p->nwaiting--;
            continue; // To outer loop.
        }

        // Otherwise work_to_do, so process as many items in this queue as
        // possible before switching to another queue. This means threads
        // often end up being dedicated to one type of work.
        q->ref_count++;
        while (q->input_head && q->qsize - q->n_output > q->n_processing) {
            if (p->shutdown)
                goto shutdown;

            if (q->shutdown)
                // Queue shutdown, but there may be other queues
                break;

            // Dequeue the next input job.
            j = q->input_head;
            assert(j->p == p);

            if (!(q->input_head = j->next))
                q->input_tail = NULL;

            // Transitioning from full queue to not-full means we can wake up
            // any blocked dispatch threads. We broadcast this as it's only
            // happening once (on the transition) rather than every time we
            // are below qsize.
            // (I wish I could remember why io_lib rev 3660 changed this from
            // == to >=, but keeping it just in case!)
            q->n_processing++;
            if (q->n_input-- >= q->qsize)
                pthread_cond_broadcast(&q->input_not_full_c);

            if (q->n_input == 0)
                pthread_cond_signal(&q->input_empty_c);

            p->njobs--; // Total number of jobs; used to adjust to CPU scaling

            // Run the user's function with the pool unlocked so other
            // workers / dispatchers can make progress meanwhile.
            pthread_mutex_unlock(&p->pool_m);

            DBG_OUT(stderr, "%d: Processing queue %p, serial %"PRId64"\n",
                    worker_id(j->p), q, j->serial);

            if (hts_tpool_add_result(j, j->func(j->arg)) < 0)
                goto err;
            //memset(j, 0xbb, sizeof(*j));
            free(j);

            pthread_mutex_lock(&p->pool_m);
        }
        if (--q->ref_count == 0) { // we were the last user
            hts_tpool_process_destroy(q);
        } else {
            // Out of jobs on this queue, so restart search from next one.
            // This is equivalent to "work-stealing".
            if (p->q_head)
                p->q_head = p->q_head->next;
        }
    }

 shutdown:
    pthread_mutex_unlock(&p->pool_m);
#ifdef DEBUG
    fprintf(stderr, "%d: Shutting down\n", worker_id(p));
#endif
    return NULL;

 err:
#ifdef DEBUG
    fprintf(stderr, "%d: Failed to add result\n", worker_id(p));
#endif
    // Hard failure, so shutdown all queues
    pthread_mutex_lock(&p->pool_m);
    hts_tpool_process *first = p->q_head, *q = first;
    if (q) {
        do {
            hts_tpool_process_shutdown_locked(q);
            q->shutdown = 2; // signify error.
            q = q->next;
        } while (q != first);
    }
    pthread_mutex_unlock(&p->pool_m);
    return NULL;
}
649
static void wake_next_worker(hts_tpool_process *q, int locked) {
    // Signal one sleeping worker (the lowest-indexed one) if q has work
    // that can actually run.  'locked' indicates whether the caller
    // already holds the pool mutex.
    if (!q) return;
    hts_tpool *p = q->p;
    if (!locked)
        pthread_mutex_lock(&p->pool_m);

    // Update the q_head to be this queue so we'll start processing
    // the queue we know to have results.
    assert(q->prev && q->next); // attached
    p->q_head = q;

    // Wake up if we have more jobs waiting than CPUs. This partially combats
    // CPU frequency scaling effects. Starting too many threads and then
    // running out of jobs can cause each thread to have lots of start/stop
    // cycles, which then translates often to CPU frequency scaling
    // adjustments. Instead it is better to only start as many threads as we
    // need to keep the throughput up, meaning some threads run flat out and
    // others are idle.
    //
    // This isn't perfect as we need to know how many can actually start,
    // rather than how many are waiting. A limit on output queue size makes
    // these two figures different.
    assert(p->njobs >= q->n_input);

    int running = p->tsize - p->nwaiting;
    int sig = p->t_stack_top >= 0 && p->njobs > p->tsize - p->nwaiting
        && (q->n_processing < q->qsize - q->n_output);

    //#define AVG_USAGE
#ifdef AVG_USAGE
    // Track average number of running threads and try to keep close.
    // We permit this to change, but slowly. This avoids "boom and bust" cycles
    // where we read a lot of data, start a lot of jobs, then become idle again.
    // This way some threads run steadily and others dormant, which is better
    // for throughput.
    //
    // It's 50:50 if this is a good thing. It helps some tasks quite significantly
    // while slightly hindering other (perhaps more usual) jobs.

    if (++p->n_count == 256) {
        p->n_count >>= 1;
        p->n_running >>= 1;
    }
    p->n_running += running;
    // Built in lag to avoid see-sawing. Is this safe in all cases?
    if (sig && p->n_count >= 128 && running*p->n_count > p->n_running+1) sig=0;
#endif

    // Dead diagnostic code, kept for debugging; note it would divide by
    // p->n_count (possibly zero) if ever enabled.
    if (0) {
        printf("%d waiting, %d running, %d output, %d, arun %d => %d\t", p->njobs,
               running, q->n_output, q->qsize - q->n_output,
               p->n_running/p->n_count, sig);
        int i;
        for (i = 0; i < p->tsize; i++)
            putchar("x "[p->t_stack[i]]);
        putchar('\n');
    }

    if (sig)
        pthread_cond_signal(&p->t[p->t_stack_top].pending_c);

    if (!locked)
        pthread_mutex_unlock(&p->pool_m);
}
714
715 /*
716 * Creates a worker pool with n worker threads.
717 *
718 * Returns pool pointer on success;
719 * NULL on failure
720 */
hts_tpool_init(int n)721 hts_tpool *hts_tpool_init(int n) {
722 int t_idx = 0;
723 size_t stack_size = 0;
724 pthread_attr_t pattr;
725 int pattr_init_done = 0;
726 hts_tpool *p = malloc(sizeof(*p));
727 if (!p)
728 return NULL;
729 p->tsize = n;
730 p->njobs = 0;
731 p->nwaiting = 0;
732 p->shutdown = 0;
733 p->q_head = NULL;
734 p->t_stack = NULL;
735 p->n_count = 0;
736 p->n_running = 0;
737 p->t = malloc(n * sizeof(p->t[0]));
738 if (!p->t) {
739 free(p);
740 return NULL;
741 }
742 p->t_stack = malloc(n * sizeof(*p->t_stack));
743 if (!p->t_stack) {
744 free(p->t);
745 free(p);
746 return NULL;
747 }
748 p->t_stack_top = -1;
749
750 pthread_mutexattr_t attr;
751 pthread_mutexattr_init(&attr);
752 pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
753 pthread_mutex_init(&p->pool_m, &attr);
754 pthread_mutexattr_destroy(&attr);
755
756 pthread_mutex_lock(&p->pool_m);
757
758 // Ensure new threads have a reasonably large stack. On some platforms,
759 // for example MacOS which defaults to 512Kb, this is not big enough
760 // for some of the rANS codecs.
761
762 if (pthread_attr_init(&pattr) < 0)
763 goto cleanup;
764 pattr_init_done = 1;
765 if (pthread_attr_getstacksize(&pattr, &stack_size) < 0)
766 goto cleanup;
767 if (stack_size < HTS_MIN_THREAD_STACK) {
768 if (pthread_attr_setstacksize(&pattr, HTS_MIN_THREAD_STACK) < 0)
769 goto cleanup;
770 }
771
772 for (t_idx = 0; t_idx < n; t_idx++) {
773 hts_tpool_worker *w = &p->t[t_idx];
774 p->t_stack[t_idx] = 0;
775 w->p = p;
776 w->idx = t_idx;
777 pthread_cond_init(&w->pending_c, NULL);
778 if (0 != pthread_create(&w->tid, &pattr, tpool_worker, w))
779 goto cleanup;
780 }
781
782 pthread_mutex_unlock(&p->pool_m);
783 pthread_attr_destroy(&pattr);
784
785 return p;
786
787 cleanup: {
788 // Any threads started will be waiting for p->pool_m, so we can
789 // stop them cleanly by setting p->shutdown, releasing the mutex and
790 // waiting for them to finish.
791 int j;
792 int save_errno = errno;
793 hts_log_error("Couldn't start thread pool worker : %s",
794 strerror(errno));
795 p->shutdown = 1;
796 pthread_mutex_unlock(&p->pool_m);
797 for (j = 0; j < t_idx; j++) {
798 pthread_join(p->t[j].tid, NULL);
799 pthread_cond_destroy(&p->t[j].pending_c);
800 }
801 pthread_mutex_destroy(&p->pool_m);
802 if (pattr_init_done)
803 pthread_attr_destroy(&pattr);
804 free(p->t_stack);
805 free(p->t);
806 free(p);
807 errno = save_errno;
808 return NULL;
809 }
810 }
811
812 /*
813 * Returns the number of requested threads for a pool.
814 */
// Number of worker threads the pool was created with.  The value is
// fixed at init time, so no locking is required to read it.
int hts_tpool_size(hts_tpool *p) {
    return p->tsize;
}
818
819 /*
820 * Adds an item to the work pool.
821 *
822 * Returns 0 on success
823 * -1 on failure
824 */
// Blocking dispatch with no cleanup callbacks: convenience wrapper
// around hts_tpool_dispatch3().  Returns 0 on success, -1 on failure.
int hts_tpool_dispatch(hts_tpool *p, hts_tpool_process *q,
                       void *(*func)(void *arg), void *arg) {
    return hts_tpool_dispatch3(p, q, func, arg, NULL, NULL, /*nonblock=*/0);
}
829
830 /*
831 * As above but optional non-block flag.
832 *
833 * nonblock 0 => block if input queue is full
834 * nonblock +1 => don't block if input queue is full, but do not add task
835 * nonblock -1 => add task regardless of whether queue is full (over-size)
836 */
// As hts_tpool_dispatch but with caller-chosen blocking behaviour:
// nonblock 0 = block when full, +1 = EAGAIN when full, -1 = always add.
int hts_tpool_dispatch2(hts_tpool *p, hts_tpool_process *q,
                        void *(*func)(void *arg), void *arg, int nonblock) {
    return hts_tpool_dispatch3(p, q, func, arg, NULL, NULL, nonblock);
}
841
int hts_tpool_dispatch3(hts_tpool *p, hts_tpool_process *q,
                        void *(*exec_func)(void *arg), void *arg,
                        void (*job_cleanup)(void *arg),
                        void (*result_cleanup)(void *data),
                        int nonblock) {
    // Queue exec_func(arg) on process-queue q.  job_cleanup /
    // result_cleanup (either may be NULL) are invoked if pending work is
    // later discarded by hts_tpool_process_reset().
    // nonblock: 0 = block while input is full; +1 = fail with EAGAIN;
    // -1 = enqueue regardless of queue size.
    // Returns 0 on success, -1 on failure.
    hts_tpool_job *j;

    pthread_mutex_lock(&p->pool_m);

    DBG_OUT(stderr, "Dispatching job for queue %p, serial %"PRId64"\n",
            q, q->curr_serial);

    // Fast-fail path for non-blocking callers when the queue is full
    // or no longer accepting input.
    if ((q->no_more_input || q->n_input >= q->qsize) && nonblock == 1) {
        pthread_mutex_unlock(&p->pool_m);
        errno = EAGAIN;
        return -1;
    }

    if (!(j = malloc(sizeof(*j)))) {
        pthread_mutex_unlock(&p->pool_m);
        return -1;
    }
    j->func = exec_func;
    j->arg = arg;
    j->job_cleanup = job_cleanup;
    j->result_cleanup = result_cleanup;
    j->next = NULL;
    j->p = p;
    j->q = q;
    // Serial numbers order results for in-order collection.
    j->serial = q->curr_serial++;

    if (nonblock == 0) {
        // Block until there is room, the queue shuts down, or
        // hts_tpool_wake_dispatch() forces us through.
        while ((q->no_more_input || q->n_input >= q->qsize) &&
               !q->shutdown && !q->wake_dispatch) {
            pthread_cond_wait(&q->input_not_full_c, &q->p->pool_m);
        }
        if (q->no_more_input || q->shutdown) {
            free(j);
            pthread_mutex_unlock(&p->pool_m);
            return -1;
        }
        if (q->wake_dispatch) {
            //fprintf(stderr, "Wake => non-block for this operation\n");
            q->wake_dispatch = 0;
        }
    }

    p->njobs++;  // total across all queues
    q->n_input++; // queue specific

    // Append the job to q's input list.
    if (q->input_tail) {
        q->input_tail->next = j;
        q->input_tail = j;
    } else {
        q->input_head = q->input_tail = j;
    }

    DBG_OUT(stderr, "Dispatched (serial %"PRId64")\n", j->serial);

    // Let a worker know we have data.
    // Keep incoming queue at 1 per running thread, so there is always
    // something waiting when they end their current task. If we go above
    // this signal to start more threads (if available). This has the effect
    // of concentrating jobs to fewer cores when we are I/O bound, which in
    // turn benefits systems with auto CPU frequency scaling.
    if (!q->shutdown)
        wake_next_worker(q, 1);

    pthread_mutex_unlock(&p->pool_m);

    return 0;
}
914
915 /*
916 * Wakes up a single thread stuck in dispatch and make it return with
917 * errno EAGAIN.
918 */
// Force one thread blocked inside hts_tpool_dispatch* to stop waiting;
// it will treat the current operation as non-blocking.
void hts_tpool_wake_dispatch(hts_tpool_process *q) {
    hts_tpool *p = q->p;
    pthread_mutex_lock(&p->pool_m);
    q->wake_dispatch = 1;
    pthread_cond_signal(&q->input_not_full_c);
    pthread_mutex_unlock(&p->pool_m);
}
925
926 /*
927 * Flushes the process-queue, but doesn't exit. This simply drains the queue
928 * and ensures all worker threads have finished their current tasks
929 * associated with this process.
930 *
 * NOTE: This does not mean the worker threads are not executing jobs in
932 * another process-queue.
933 *
934 * Returns 0 on success;
935 * -1 on failure
936 */
int hts_tpool_process_flush(hts_tpool_process *q) {
    // Wait until every job dispatched to q has been consumed from the
    // input queue and has finished processing.  Does not shut q down and
    // does not collect output.  Returns 0 (always succeeds at present).
    int i;
    hts_tpool *p = q->p;

    DBG_OUT(stderr, "Flushing pool %p\n", p);

    // Drains the queue
    pthread_mutex_lock(&p->pool_m);

    // Wake up everything for the final sprint!
    for (i = 0; i < p->tsize; i++)
        if (p->t_stack[i])
            pthread_cond_signal(&p->t[i].pending_c);

    // Ensure there is room for the final sprint.
    // Ideally we shouldn't get here, but the "q->qsize - q->n_output >
    // n_processing" check in tpool_worker means we can trigger a
    // deadlock there. This negates that possibility.
    if (q->qsize < q->n_output + q->n_input + q->n_processing)
        q->qsize = q->n_output + q->n_input + q->n_processing;

    // When shutdown, we won't be launching more, but we can still
    // wait for any processing jobs complete.
    if (q->shutdown) {
        while (q->n_processing)
            pthread_cond_wait(&q->none_processing_c, &p->pool_m);
    }

    // Wait for n_input and n_processing to hit zero.
    // Timed waits (1s) guard against missed signals; each wakeup
    // re-checks the predicate.
    while (!q->shutdown && (q->n_input || q->n_processing)) {
        struct timeval now;
        struct timespec timeout;

        while (q->n_input && !q->shutdown) {
            gettimeofday(&now, NULL);
            timeout.tv_sec = now.tv_sec + 1;
            timeout.tv_nsec = now.tv_usec * 1000;
            pthread_cond_timedwait(&q->input_empty_c, &p->pool_m, &timeout);
        }

        // Note: even if q->shutdown is set, we still have to wait until
        // q->n_processing is zero as we cannot terminate while things are
        // running otherwise we free up the data being worked on.
        while (q->n_processing) {
            gettimeofday(&now, NULL);
            timeout.tv_sec = now.tv_sec + 1;
            timeout.tv_nsec = now.tv_usec * 1000;
            pthread_cond_timedwait(&q->none_processing_c, &p->pool_m,
                                   &timeout);
        }
        if (q->shutdown) break;
    }

    pthread_mutex_unlock(&p->pool_m);

    DBG_OUT(stderr, "Flushed complete for pool %p, queue %p\n", p, q);

    return 0;
}
996
997 /*
998 * Resets a process to the initial state.
999 *
1000 * This removes any queued up input jobs, disables any notification of
1001 * new results/output, flushes what is left and then discards any
1002 * queued output. Anything consumer stuck in a wait on results to
1003 * appear should stay stuck and will only wake up when new data is
1004 * pushed through the queue.
1005 *
1006 * Returns 0 on success;
1007 * -1 on failure
1008 */
int hts_tpool_process_reset(hts_tpool_process *q, int free_results) {
    // Discard all queued input and output on q, wait for in-flight jobs
    // to finish, then rewind the serial counters to zero.  free_results
    // additionally frees each result's data payload.
    // Returns 0 on success, -1 on failure.
    hts_tpool_job *j, *jn, *j_head;
    hts_tpool_result *r, *rn, *r_head;

    pthread_mutex_lock(&q->p->pool_m);
    // prevent next_result from returning data during our flush
    q->next_serial = INT_MAX;

    // Remove any queued input not yet being acted upon
    j_head = q->input_head;
    q->input_head = q->input_tail = NULL;
    q->n_input = 0;

    // Remove any queued output, thus ensuring we have room to flush.
    r_head = q->output_head;
    q->output_head = q->output_tail = NULL;
    q->n_output = 0;
    pthread_mutex_unlock(&q->p->pool_m);

    // Release memory. This can be done unlocked now the lists have been
    // removed from the queue
    for (j = j_head; j; j = jn) {
        jn = j->next;
        if (j->job_cleanup) j->job_cleanup(j->arg);
        free(j);
    }

    for (r = r_head; r; r = rn) {
        rn = r->next;
        if (r->result_cleanup) {
            r->result_cleanup(r->data);
            r->data = NULL;
        }
        hts_tpool_delete_result(r, free_results);
    }

    // Wait for any jobs being processed to complete.
    // (TODO: consider how to cancel any currently processing jobs.
    // Probably this is too hard.)
    if (hts_tpool_process_flush(q) != 0)
        return -1;

    // Remove any new output produced by the jobs that were still running
    // when we started.
    pthread_mutex_lock(&q->p->pool_m);
    r_head = q->output_head;
    q->output_head = q->output_tail = NULL;
    q->n_output = 0;

    // Finally reset the serial back to the starting point.
    q->next_serial = q->curr_serial = 0;
    pthread_cond_signal(&q->input_not_full_c);
    pthread_mutex_unlock(&q->p->pool_m);

    // Discard unwanted output
    for (r = r_head; r; r = rn) {
        //fprintf(stderr, "Discard output %d\n", r->serial);
        rn = r->next;
        if (r->result_cleanup) {
            r->result_cleanup(r->data);
            r->data = NULL;
        }
        hts_tpool_delete_result(r, free_results);
    }

    return 0;
}
1075
1076 /* Returns the process queue size */
hts_tpool_process_qsize(hts_tpool_process * q)1077 int hts_tpool_process_qsize(hts_tpool_process *q) {
1078 return q->qsize;
1079 }
1080
1081 /*
1082 * Destroys a thread pool. The threads are joined into the main
1083 * thread so they will finish their current work load.
1084 */
hts_tpool_destroy(hts_tpool * p)1085 void hts_tpool_destroy(hts_tpool *p) {
1086 int i;
1087
1088 DBG_OUT(stderr, "Destroying pool %p\n", p);
1089
1090 /* Send shutdown message to worker threads */
1091 pthread_mutex_lock(&p->pool_m);
1092 p->shutdown = 1;
1093
1094 DBG_OUT(stderr, "Sending shutdown request\n");
1095
1096 for (i = 0; i < p->tsize; i++)
1097 pthread_cond_signal(&p->t[i].pending_c);
1098
1099 pthread_mutex_unlock(&p->pool_m);
1100
1101 DBG_OUT(stderr, "Shutdown complete\n");
1102
1103 for (i = 0; i < p->tsize; i++)
1104 pthread_join(p->t[i].tid, NULL);
1105
1106 pthread_mutex_destroy(&p->pool_m);
1107 for (i = 0; i < p->tsize; i++)
1108 pthread_cond_destroy(&p->t[i].pending_c);
1109
1110 if (p->t_stack)
1111 free(p->t_stack);
1112
1113 free(p->t);
1114 free(p);
1115
1116 DBG_OUT(stderr, "Destroyed pool %p\n", p);
1117 }
1118
1119
1120 /*
1121 * Destroys a thread pool without waiting on jobs to complete.
1122 * Use hts_tpool_kill(p) to quickly exit after a fatal error.
1123 */
hts_tpool_kill(hts_tpool * p)1124 void hts_tpool_kill(hts_tpool *p) {
1125 int i;
1126
1127 DBG_OUT(stderr, "Destroying pool %p, kill=%d\n", p, kill);
1128
1129 for (i = 0; i < p->tsize; i++)
1130 pthread_kill(p->t[i].tid, SIGINT);
1131
1132 pthread_mutex_destroy(&p->pool_m);
1133 for (i = 0; i < p->tsize; i++)
1134 pthread_cond_destroy(&p->t[i].pending_c);
1135
1136 if (p->t_stack)
1137 free(p->t_stack);
1138
1139 free(p->t);
1140 free(p);
1141
1142 DBG_OUT(stderr, "Destroyed pool %p\n", p);
1143 }
1144
1145
1146 /*=============================================================================
1147 * Test app.
1148 *
1149 * This can be considered both as a basic test and as a worked example for
1150 * various usage patterns.
1151 *=============================================================================
1152 */
1153
1154 #ifdef TEST_MAIN
1155
1156 #include <stdio.h>
1157
1158 #ifndef TASK_SIZE
1159 #define TASK_SIZE 1000
1160 #endif
1161
1162 /*-----------------------------------------------------------------------------
1163 * Unordered x -> x*x test.
1164 * Results arrive in order of completion.
1165 */
// Unordered worker: squares its (heap-allocated) int argument and
// prints the result directly.  Takes ownership of arg and frees it.
// Always returns NULL (results are reported via stdout only).
void *doit_square_u(void *arg) {
    int val = *(int *)arg;
    free(arg);

    // Random delay so jobs complete (and print) out of order.
    usleep(random() % 100000);

    printf("RESULT: %d\n", val * val);

    return NULL;
}
1176
test_square_u(int n)1177 int test_square_u(int n) {
1178 hts_tpool *p = hts_tpool_init(n);
1179 hts_tpool_process *q = hts_tpool_process_init(p, n*2, 1);
1180 int i;
1181
1182 // Dispatch jobs
1183 for (i = 0; i < TASK_SIZE; i++) {
1184 int *ip = malloc(sizeof(*ip));
1185 *ip = i;
1186 hts_tpool_dispatch(p, q, doit_square_u, ip);
1187 }
1188
1189 hts_tpool_process_flush(q);
1190 hts_tpool_process_destroy(q);
1191 hts_tpool_destroy(p);
1192
1193 return 0;
1194 }
1195
1196
1197 /*-----------------------------------------------------------------------------
1198 * Ordered x -> x*x test.
1199 * Results arrive in numerical order.
1200 *
1201 * This implementation uses a non-blocking dispatch to avoid dead-locks
1202 * where one job takes too long to complete.
1203 */
// Ordered worker: squares its (heap-allocated) int argument and
// returns the result as a newly allocated int.  Takes ownership of
// arg and frees it; the caller owns the returned pointer.
// Negative inputs produce a negated square (used as a sentinel).
void *doit_square(void *arg) {
    int job = *(int *)arg;
    free(arg);

    // Every 32nd job is made very slow to stress output-queue filling
    // and extreme out-of-order completion.
    usleep(500000 * ((job & 31) == 31) + random() % 10000);

    int *res = malloc(sizeof(*res));
    *res = (job < 0) ? -job * job : job * job;

    return res;
}
1218
test_square(int n)1219 int test_square(int n) {
1220 hts_tpool *p = hts_tpool_init(n);
1221 hts_tpool_process *q = hts_tpool_process_init(p, n*2, 0);
1222 int i;
1223 hts_tpool_result *r;
1224
1225 // Dispatch jobs
1226 for (i = 0; i < TASK_SIZE; i++) {
1227 int *ip = malloc(sizeof(*ip));
1228 *ip = i;
1229 int blk;
1230
1231 do {
1232 // In the situation where some jobs take much longer than
1233 // others, we could end up blocking here as we haven't got
1234 // any room in the output queue to place it. (We don't launch a
1235 // job if the output queue is full.)
1236
1237 // This happens when the next serial number to fetch is, eg, 50
1238 // but jobs 51-100 have all executed really fast and appeared in
1239 // the output queue before 50. A dispatch & check-results
1240 // alternating loop can fail to find job 50 many times over until
1241 // eventually the dispatch blocks before it arrives.
1242
1243 // Our solution is to dispatch in non-blocking mode so we are
1244 // always to either dispatch or consume a result.
1245 blk = hts_tpool_dispatch2(p, q, doit_square, ip, 1);
1246
1247 // Check for results.
1248 if ((r = hts_tpool_next_result(q))) {
1249 printf("RESULT: %d\n", *(int *)hts_tpool_result_data(r));
1250 hts_tpool_delete_result(r, 1);
1251 }
1252 if (blk == -1) {
1253 // The alternative is a separate thread for dispatching and/or
1254 // consumption of results. See test_squareB.
1255 putchar('.'); fflush(stdout);
1256 usleep(10000);
1257 }
1258 } while (blk == -1);
1259 }
1260
1261 // Wait for any input-queued up jobs or in-progress jobs to complete.
1262 hts_tpool_process_flush(q);
1263
1264 while ((r = hts_tpool_next_result(q))) {
1265 printf("RESULT: %d\n", *(int *)hts_tpool_result_data(r));
1266 hts_tpool_delete_result(r, 1);
1267 }
1268
1269 hts_tpool_process_destroy(q);
1270 hts_tpool_destroy(p);
1271
1272 return 0;
1273 }
1274
1275 /*-----------------------------------------------------------------------------
1276 * Ordered x -> x*x test.
1277 * Results arrive in numerical order.
1278 *
1279 * This implementation uses separate dispatching threads and job consumption
1280 * threads (main thread). This means it can use a blocking calls for
1281 * simplicity elsewhere.
1282 */
// Arguments handed to the dispatcher thread used by test_squareB.
struct squareB_opt {
    hts_tpool *p;          // thread pool the jobs run on
    hts_tpool_process *q;  // process queue the jobs are dispatched to
    int n;                 // number of jobs to dispatch
};
test_squareB_dispatcher(void * arg)1288 static void *test_squareB_dispatcher(void *arg) {
1289 struct squareB_opt *o = (struct squareB_opt *)arg;
1290 int i, *ip;
1291
1292 for (i = 0; i < o->n; i++) {
1293 ip = malloc(sizeof(*ip));
1294 *ip = i;
1295
1296 hts_tpool_dispatch(o->p, o->q, doit_square, ip);
1297 }
1298
1299 // Dispatch an sentinel job to mark the end
1300 *(ip = malloc(sizeof(*ip))) = -1;
1301 hts_tpool_dispatch(o->p, o->q, doit_square, ip);
1302 pthread_exit(NULL);
1303 }
1304
test_squareB(int n)1305 int test_squareB(int n) {
1306 hts_tpool *p = hts_tpool_init(n);
1307 hts_tpool_process *q = hts_tpool_process_init(p, n*2, 0);
1308 struct squareB_opt o = {p, q, TASK_SIZE};
1309 pthread_t tid;
1310
1311 // Launch our job creation thread.
1312 pthread_create(&tid, NULL, test_squareB_dispatcher, &o);
1313
1314 // Consume all results until we find the end-of-job marker.
1315 for(;;) {
1316 hts_tpool_result *r = hts_tpool_next_result_wait(q);
1317 int x = *(int *)hts_tpool_result_data(r);
1318 hts_tpool_delete_result(r, 1);
1319 if (x == -1)
1320 break;
1321 printf("RESULT: %d\n", x);
1322 }
1323
1324 // Wait for any input-queued up jobs or in-progress jobs to complete.
1325 // This should do nothing as we've been executing until the termination
1326 // marker of -1.
1327 hts_tpool_process_flush(q);
1328 assert(hts_tpool_next_result(q) == NULL);
1329
1330 hts_tpool_process_destroy(q);
1331 hts_tpool_destroy(p);
1332 pthread_join(tid, NULL);
1333
1334 return 0;
1335 }
1336
1337
1338 /*-----------------------------------------------------------------------------
1339 * A simple pipeline test.
1340 * We use a dedicated input thread that does the initial generation of job
1341 * and dispatch, several execution steps running in a shared pool, and a
1342 * dedicated output thread that prints up the final result. It's key that our
1343 * pipeline execution stages can run independently and don't themselves have
1344 * any waits. To achieve this we therefore also use some dedicated threads
1345 * that take the output from one queue and resubmits the job as the input to
1346 * the next queue.
1347 *
1348 * More generally this could perhaps be a single pipeline thread that
1349 * marshalls multiple queues and their interactions, but this is simply a
1350 * demonstration of a single pipeline.
1351 *
1352 * Our process fills out the bottom byte of a 32-bit int and then shifts it
1353 * left one byte at a time. Only the final stage needs to be ordered. Each
1354 * stage uses its own queue.
1355 *
1356 * Possible improvement: we only need the last stage to be ordered. By
1357 * allocating our own serial numbers for the first job and manually setting
1358 * these serials in the last job, perhaps we can permit out of order execution
1359 * of all the in-between stages. (I doubt it'll affect speed much though.)
1360 */
1361
1362 static void *pipe_input_thread(void *arg);
1363 static void *pipe_stage1(void *arg);
1364 static void *pipe_stage2(void *arg);
1365 static void *pipe_stage3(void *arg);
1366 static void *pipe_output_thread(void *arg);
1367
// Shared state for the pipeline test: one pool, one process queue
// per stage, and the number of jobs to push through.
typedef struct {
    hts_tpool *p;           // worker pool shared by all stages
    hts_tpool_process *q1;  // stage 1 queue
    hts_tpool_process *q2;  // stage 2 queue
    hts_tpool_process *q3;  // stage 3 (final output) queue
    int n;                  // number of jobs generated by the input thread
} pipe_opt;
1375
// A single unit of work flowing through the pipeline.
typedef struct {
    pipe_opt *o;     // back-pointer to the shared pipeline state
    unsigned int x;  // payload; shifted left one byte per stage
    int eof; // set with last job.
} pipe_job;
1381
pipe_input_thread(void * arg)1382 static void *pipe_input_thread(void *arg) {
1383 pipe_opt *o = (pipe_opt *)arg;
1384
1385 int i;
1386 for (i = 1; i <= o->n; i++) {
1387 pipe_job *j = malloc(sizeof(*j));
1388 j->o = o;
1389 j->x = i;
1390 j->eof = (i == o->n);
1391
1392 printf("I %08x\n", j->x);
1393
1394 if (hts_tpool_dispatch(o->p, o->q1, pipe_stage1, j) != 0) {
1395 free(j);
1396 pthread_exit((void *)1);
1397 }
1398 }
1399
1400 pthread_exit(NULL);
1401 }
1402
pipe_stage1(void * arg)1403 static void *pipe_stage1(void *arg) {
1404 pipe_job *j = (pipe_job *)arg;
1405
1406 j->x <<= 8;
1407 usleep(random() % 10000); // fast job
1408 printf("1 %08x\n", j->x);
1409
1410 return j;
1411 }
1412
pipe_stage1to2(void * arg)1413 static void *pipe_stage1to2(void *arg) {
1414 pipe_opt *o = (pipe_opt *)arg;
1415 hts_tpool_result *r;
1416
1417 while ((r = hts_tpool_next_result_wait(o->q1))) {
1418 pipe_job *j = (pipe_job *)hts_tpool_result_data(r);
1419 hts_tpool_delete_result(r, 0);
1420 if (hts_tpool_dispatch(j->o->p, j->o->q2, pipe_stage2, j) != 0)
1421 pthread_exit((void *)1);
1422 if (j->eof)
1423 break;
1424 }
1425
1426 pthread_exit(NULL);
1427 }
1428
pipe_stage2(void * arg)1429 static void *pipe_stage2(void *arg) {
1430 pipe_job *j = (pipe_job *)arg;
1431
1432 j->x <<= 8;
1433 usleep(random() % 100000); // slow job
1434 printf("2 %08x\n", j->x);
1435
1436 return j;
1437 }
1438
pipe_stage2to3(void * arg)1439 static void *pipe_stage2to3(void *arg) {
1440 pipe_opt *o = (pipe_opt *)arg;
1441 hts_tpool_result *r;
1442
1443 while ((r = hts_tpool_next_result_wait(o->q2))) {
1444 pipe_job *j = (pipe_job *)hts_tpool_result_data(r);
1445 hts_tpool_delete_result(r, 0);
1446 if (hts_tpool_dispatch(j->o->p, j->o->q3, pipe_stage3, j) != 0)
1447 pthread_exit((void *)1);
1448 if (j->eof)
1449 break;
1450 }
1451
1452 pthread_exit(NULL);
1453 }
1454
pipe_stage3(void * arg)1455 static void *pipe_stage3(void *arg) {
1456 pipe_job *j = (pipe_job *)arg;
1457
1458 usleep(random() % 10000); // fast job
1459 j->x <<= 8;
1460 return j;
1461 }
1462
pipe_output_thread(void * arg)1463 static void *pipe_output_thread(void *arg) {
1464 pipe_opt *o = (pipe_opt *)arg;
1465 hts_tpool_result *r;
1466
1467 while ((r = hts_tpool_next_result_wait(o->q3))) {
1468 pipe_job *j = (pipe_job *)hts_tpool_result_data(r);
1469 int eof = j->eof;
1470 printf("O %08x\n", j->x);
1471 hts_tpool_delete_result(r, 1);
1472 if (eof)
1473 break;
1474 }
1475
1476 pthread_exit(NULL);
1477 }
1478
test_pipe(int n)1479 int test_pipe(int n) {
1480 hts_tpool *p = hts_tpool_init(n);
1481 hts_tpool_process *q1 = hts_tpool_process_init(p, n*2, 0);
1482 hts_tpool_process *q2 = hts_tpool_process_init(p, n*2, 0);
1483 hts_tpool_process *q3 = hts_tpool_process_init(p, n*2, 0);
1484 pipe_opt o = {p, q1, q2, q3, TASK_SIZE};
1485 pthread_t tidIto1, tid1to2, tid2to3, tid3toO;
1486 void *retv;
1487 int ret;
1488
1489 // Launch our data source and sink threads.
1490 pthread_create(&tidIto1, NULL, pipe_input_thread, &o);
1491 pthread_create(&tid1to2, NULL, pipe_stage1to2, &o);
1492 pthread_create(&tid2to3, NULL, pipe_stage2to3, &o);
1493 pthread_create(&tid3toO, NULL, pipe_output_thread, &o);
1494
1495 // Wait for tasks to finish.
1496 ret = 0;
1497 pthread_join(tidIto1, &retv); ret |= (retv != NULL);
1498 pthread_join(tid1to2, &retv); ret |= (retv != NULL);
1499 pthread_join(tid2to3, &retv); ret |= (retv != NULL);
1500 pthread_join(tid3toO, &retv); ret |= (retv != NULL);
1501 printf("Return value %d\n", ret);
1502
1503 hts_tpool_process_destroy(q1);
1504 hts_tpool_process_destroy(q2);
1505 hts_tpool_process_destroy(q3);
1506 hts_tpool_destroy(p);
1507
1508 return 0;
1509 }
1510
1511 /*-----------------------------------------------------------------------------*/
// Test-app entry point: selects one of the worked examples by name
// and runs it with the requested number of threads.
int main(int argc, char **argv) {
    int n;
    srandom(0);  // deterministic "random" delays for reproducible runs

    if (argc < 3) {
        fprintf(stderr, "Usage: %s command n_threads\n", argv[0]);
        fprintf(stderr, "Where commands are:\n\n");
        fprintf(stderr, "unordered # Unordered output\n");
        fprintf(stderr, "ordered1  # Main thread with non-block API\n");
        fprintf(stderr, "ordered2  # Dispatch thread, blocking API\n");
        fprintf(stderr, "pipe      # Multi-stage pipeline, several queues\n");
        exit(1);
    }

    // Validate the thread count: atoi alone silently maps garbage
    // (and "0") to 0, which would then be handed to hts_tpool_init.
    n = atoi(argv[2]);
    if (n <= 0) {
        fprintf(stderr, "n_threads must be a positive integer\n");
        exit(1);
    }

    if (strcmp(argv[1], "unordered") == 0) return test_square_u(n);
    if (strcmp(argv[1], "ordered1") == 0)  return test_square(n);
    if (strcmp(argv[1], "ordered2") == 0)  return test_squareB(n);
    if (strcmp(argv[1], "pipe") == 0)      return test_pipe(n);

    fprintf(stderr, "Unknown sub-command\n");
    exit(1);
}
1535 #endif
1536