1 /*  thread_pool.c -- A pool of generic worker threads
2 
3     Copyright (c) 2013-2020 Genome Research Ltd.
4 
5     Author: James Bonfield <jkb@sanger.ac.uk>
6 
7 Permission is hereby granted, free of charge, to any person obtaining a copy
8 of this software and associated documentation files (the "Software"), to deal
9 in the Software without restriction, including without limitation the rights
10 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 copies of the Software, and to permit persons to whom the Software is
12 furnished to do so, subject to the following conditions:
13 
14 The above copyright notice and this permission notice shall be included in
15 all copies or substantial portions of the Software.
16 
17 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20 THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23 DEALINGS IN THE SOFTWARE.  */
24 
25 #ifndef TEST_MAIN
26 #define HTS_BUILDING_LIBRARY // Enables HTSLIB_EXPORT, see htslib/hts_defs.h
27 #include <config.h>
28 #endif
29 
30 #include <stdlib.h>
31 #include <inttypes.h>
32 #include <signal.h>
33 #include <errno.h>
34 #include <stdio.h>
35 #include <string.h>
36 #include <sys/time.h>
37 #include <assert.h>
38 #include <stdarg.h>
39 #include <unistd.h>
40 #include <limits.h>
41 
42 #include "thread_pool_internal.h"
43 #include "htslib/hts_log.h"
44 
45 // Minimum stack size for threads.  Required for some rANS codecs
46 // that use over 2Mbytes of stack for encoder / decoder state
47 #define HTS_MIN_THREAD_STACK (3 * 1024 * 1024)
48 
49 static void hts_tpool_process_detach_locked(hts_tpool *p,
50                                             hts_tpool_process *q);
51 
52 //#define DEBUG
53 
54 #ifdef DEBUG
static int worker_id(hts_tpool *p) {
    // Map the calling thread back to its index within the pool's worker
    // array, or -1 if the caller is not one of this pool's workers.
    pthread_t self = pthread_self();
    int idx;
    for (idx = 0; idx < p->tsize; idx++)
        if (pthread_equal(self, p->t[idx].tid))
            return idx;
    return -1;
}
64 
void DBG_OUT(FILE *fp, char *fmt, ...) {
    // Debug-build logging helper: forward a printf-style format to fp.
    va_list ap;
    va_start(ap, fmt);
    vfprintf(fp, fmt, ap);
    va_end(ap);
}
71 #else
72 #define DBG_OUT(...) do{}while(0)
73 #endif
74 
75 /* ----------------------------------------------------------------------------
76  * A process-queue to hold results from the thread pool.
77  *
78  * Each thread pool may have jobs of multiple types being queued up and
79  * interleaved, so we attach several job process-queues to a single pool.
80  *
81  * The jobs themselves are expected to push their results onto their
82  * appropriate results queue.
83  */
84 
85 /*
86  * Adds a result to the end of the process result queue.
87  *
88  * Returns 0 on success;
89  *        -1 on failure
90  */
static int hts_tpool_add_result(hts_tpool_job *j, void *data) {
    hts_tpool_process *q = j->q;
    hts_tpool_result *r;

    pthread_mutex_lock(&q->p->pool_m);

    DBG_OUT(stderr, "%d: Adding result to queue %p, serial %"PRId64", %d of %d\n",
            worker_id(j->p), q, j->serial, q->n_output+1, q->qsize);

    // This job has finished processing; wake anyone waiting for the
    // queue to become fully drained (e.g. a flush).
    if (--q->n_processing == 0)
        pthread_cond_signal(&q->none_processing_c);

    /* No results queue is fine if we don't want any results back */
    if (q->in_only) {
        pthread_mutex_unlock(&q->p->pool_m);
        return 0;
    }

    if (!(r = malloc(sizeof(*r)))) {
        // Allocation failure is fatal for this queue: shut it down so
        // consumers don't block forever waiting on this serial number.
        pthread_mutex_unlock(&q->p->pool_m);
        hts_tpool_process_shutdown(q);
        return -1;
    }

    r->next = NULL;
    r->data = data;
    r->result_cleanup = j->result_cleanup;
    r->serial = j->serial;

    // Append to the tail of the output (results) list.
    q->n_output++;
    if (q->output_tail) {
        q->output_tail->next = r;
        q->output_tail = r;
    } else {
        q->output_head = q->output_tail = r;
    }

    assert(r->serial >= q->next_serial    // Or it will never be dequeued ...
           || q->next_serial == INT_MAX); // ... unless flush in progress.
    if (r->serial == q->next_serial) {
        // This is the exact result consumers are waiting on, so wake them.
        DBG_OUT(stderr, "%d: Broadcasting result_avail (id %"PRId64")\n",
                worker_id(j->p), r->serial);
        pthread_cond_broadcast(&q->output_avail_c);
        DBG_OUT(stderr, "%d: Broadcast complete\n", worker_id(j->p));
    }

    pthread_mutex_unlock(&q->p->pool_m);

    return 0;
}
141 
142 static void wake_next_worker(hts_tpool_process *q, int locked);
143 
144 /* Core of hts_tpool_next_result() */
static hts_tpool_result *hts_tpool_next_result_locked(hts_tpool_process *q) {
    hts_tpool_result *r, *last;

    if (q->shutdown)
        return NULL;

    // Scan for the result carrying the next expected serial number, so
    // results are handed out in strict dispatch order.
    for (last = NULL, r = q->output_head; r; last = r, r = r->next) {
        if (r->serial == q->next_serial)
            break;
    }

    if (r) {
        // Remove r from our linked list
        if (q->output_head == r)
            q->output_head = r->next;
        else
            last->next = r->next;

        if (q->output_tail == r)
            q->output_tail = last;

        if (!q->output_head)
            q->output_tail = NULL;

        q->next_serial++;
        q->n_output--;

        if (q->qsize && q->n_output < q->qsize) {
            // Not technically input full, but can guarantee there is
            // room for the input to go somewhere so we still signal.
            // The waiting code will then check the condition again.
            if (q->n_input < q->qsize)
                pthread_cond_signal(&q->input_not_full_c);
            if (!q->shutdown)
                wake_next_worker(q, 1);
        }
    }

    return r;
}
185 
186 /*
187  * Pulls the next item off the process result queue.  The caller should free
188  * it (and any internals as appropriate) after use.  This doesn't wait for a
189  * result to be present.
190  *
191  * Results will be returned in strict order.
192  *
193  * Returns hts_tpool_result pointer if a result is ready.
194  *         NULL if not.
195  */
hts_tpool_next_result(hts_tpool_process * q)196 hts_tpool_result *hts_tpool_next_result(hts_tpool_process *q) {
197     hts_tpool_result *r;
198 
199     DBG_OUT(stderr, "Requesting next result on queue %p\n", q);
200 
201     pthread_mutex_lock(&q->p->pool_m);
202     r = hts_tpool_next_result_locked(q);
203     pthread_mutex_unlock(&q->p->pool_m);
204 
205     DBG_OUT(stderr, "(q=%p) Found %p\n", q, r);
206 
207     return r;
208 }
209 
210 /*
211  * Pulls the next item off the process result queue.  The caller should free
212  * it (and any internals as appropriate) after use.  This will wait for
213  * a result to be present if none are currently available.
214  *
215  * Results will be returned in strict order.
216  *
217  * Returns hts_tpool_result pointer if a result is ready.
218  *         NULL on error or during shutdown.
219  */
hts_tpool_result *hts_tpool_next_result_wait(hts_tpool_process *q) {
    hts_tpool_result *r;

    pthread_mutex_lock(&q->p->pool_m);
    while (!(r = hts_tpool_next_result_locked(q))) {
        /* Possible race here now avoided via _locked() call, but in case... */
        struct timeval now;
        struct timespec timeout;

        // 10 second timeout is belt-and-braces against a missed wakeup;
        // on timeout we simply loop and re-check the queue.
        gettimeofday(&now, NULL);
        timeout.tv_sec = now.tv_sec + 10;
        timeout.tv_nsec = now.tv_usec * 1000;

        // Hold a reference while blocked so the queue cannot be freed
        // from under us by a concurrent hts_tpool_process_destroy().
        q->ref_count++;
        if (q->shutdown) {
            int rc = --q->ref_count;
            pthread_mutex_unlock(&q->p->pool_m);
            // We dropped the last reference: complete the deferred destroy.
            if (rc == 0)
                hts_tpool_process_destroy(q);
            return NULL;
        }
        pthread_cond_timedwait(&q->output_avail_c, &q->p->pool_m, &timeout);

        q->ref_count--;
    }
    pthread_mutex_unlock(&q->p->pool_m);

    return r;
}
249 
250 /*
251  * Returns true if there are no items in the process results queue and
252  * also none still pending.
253  */
int hts_tpool_process_empty(hts_tpool_process *q) {
    // True when nothing is queued, nothing is running and nothing is
    // awaiting collection.
    pthread_mutex_lock(&q->p->pool_m);
    int is_empty = (q->n_input == 0 && q->n_processing == 0
                    && q->n_output == 0);
    pthread_mutex_unlock(&q->p->pool_m);
    return is_empty;
}
263 
void hts_tpool_process_ref_incr(hts_tpool_process *q) {
    // Take an extra reference on the queue, under the pool lock.
    pthread_mutex_t *lock = &q->p->pool_m;
    pthread_mutex_lock(lock);
    q->ref_count++;
    pthread_mutex_unlock(lock);
}
269 
void hts_tpool_process_ref_decr(hts_tpool_process *q) {
    // Drop a reference; the final holder tears the queue down.
    pthread_mutex_lock(&q->p->pool_m);
    int remaining = --q->ref_count;
    pthread_mutex_unlock(&q->p->pool_m);

    // Destroy outside the lock, matching the original behaviour
    // (hts_tpool_process_destroy re-acquires pool_m itself).
    if (remaining <= 0)
        hts_tpool_process_destroy(q);
}
281 
282 /*
283  * Returns the number of completed jobs in the process results queue.
284  */
int hts_tpool_process_len(hts_tpool_process *q) {
    // Count of completed jobs currently waiting in the results queue.
    pthread_mutex_lock(&q->p->pool_m);
    int n = q->n_output;
    pthread_mutex_unlock(&q->p->pool_m);
    return n;
}
294 
295 /*
296  * Returns the number of completed jobs in the process results queue plus the
297  * number running and queued up to run.
298  */
int hts_tpool_process_sz(hts_tpool_process *q) {
    // Total jobs associated with this queue: completed, queued and running.
    pthread_mutex_lock(&q->p->pool_m);
    int total = q->n_output + q->n_input + q->n_processing;
    pthread_mutex_unlock(&q->p->pool_m);
    return total;
}
308 
309 /*
310  * Shutdown a process.
311  *
312  * This sets the shutdown flag and wakes any threads waiting on process
313  * condition variables.
314  */
static void hts_tpool_process_shutdown_locked(hts_tpool_process *q) {
    // Mark the queue as shut down and wake every thread that may be
    // blocked on any of its condition variables.  pool_m must be held.
    q->shutdown = 1;
    pthread_cond_t *conds[] = {
        &q->output_avail_c,
        &q->input_not_full_c,
        &q->input_empty_c,
        &q->none_processing_c,
    };
    size_t i;
    for (i = 0; i < sizeof(conds) / sizeof(conds[0]); i++)
        pthread_cond_broadcast(conds[i]);
}
322 
void hts_tpool_process_shutdown(hts_tpool_process *q) {
    // Public entry point: take the pool lock, then do the real work.
    pthread_mutex_t *lock = &q->p->pool_m;
    pthread_mutex_lock(lock);
    hts_tpool_process_shutdown_locked(q);
    pthread_mutex_unlock(lock);
}
328 
int hts_tpool_process_is_shutdown(hts_tpool_process *q) {
    // Snapshot the shutdown flag under the pool lock.
    // Non-zero means shut down (2 indicates an error-triggered shutdown).
    pthread_mutex_lock(&q->p->pool_m);
    int flag = q->shutdown;
    pthread_mutex_unlock(&q->p->pool_m);
    return flag;
}
335 
336 /*
337  * Frees a result 'r' and if free_data is true also frees
338  * the internal r->data result too.
339  */
/*
 * Frees a result 'r' and if free_data is true also frees
 * the internal r->data result too.
 */
void hts_tpool_delete_result(hts_tpool_result *r, int free_data) {
    if (!r)
        return;

    // free(NULL) is a well-defined no-op, so there is no need to test
    // r->data separately before freeing it.
    if (free_data)
        free(r->data);

    free(r);
}
349 
350 /*
351  * Returns the data portion of a hts_tpool_result, corresponding
352  * to the actual "result" itself.
353  */
void *hts_tpool_result_data(hts_tpool_result *r) {
    // Plain accessor: the job callback's return value, as stored by
    // hts_tpool_add_result().  No locking needed; r is owned by caller.
    return r->data;
}
357 
358 /*
359  * Initialises a thread process-queue.
360  *
 * In_only, if true, indicates that the process does not need to
 * hold any output.  Otherwise an output queue is used to store the results
 * of processing each input job.
 *
 * Returns hts_tpool_process pointer on success;
 *         NULL on failure
367  */
hts_tpool_process_init(hts_tpool * p,int qsize,int in_only)368 hts_tpool_process *hts_tpool_process_init(hts_tpool *p, int qsize, int in_only) {
369     hts_tpool_process *q = malloc(sizeof(*q));
370     if (!q)
371         return NULL;
372 
373     pthread_cond_init(&q->output_avail_c,   NULL);
374     pthread_cond_init(&q->input_not_full_c, NULL);
375     pthread_cond_init(&q->input_empty_c,    NULL);
376     pthread_cond_init(&q->none_processing_c,NULL);
377 
378     q->p           = p;
379     q->input_head  = NULL;
380     q->input_tail  = NULL;
381     q->output_head = NULL;
382     q->output_tail = NULL;
383     q->next_serial = 0;
384     q->curr_serial = 0;
385     q->no_more_input = 0;
386     q->n_input     = 0;
387     q->n_output    = 0;
388     q->n_processing= 0;
389     q->qsize       = qsize;
390     q->in_only     = in_only;
391     q->shutdown    = 0;
392     q->wake_dispatch = 0;
393     q->ref_count   = 1;
394 
395     q->next        = NULL;
396     q->prev        = NULL;
397 
398     hts_tpool_process_attach(p, q);
399 
400     return q;
401 }
402 
403 /* Deallocates memory for a thread process-queue.
404  * Must be called before the thread pool is destroyed.
405  */
void hts_tpool_process_destroy(hts_tpool_process *q) {
    DBG_OUT(stderr, "Destroying results queue %p\n", q);

    if (!q)
        return;

    // Prevent dispatch from queuing up any more jobs.
    // We want to reset (and flush) the queue here, before
    // we set the shutdown flag, but we need to avoid races
    // with queue more input during reset.
    pthread_mutex_lock(&q->p->pool_m);
    q->no_more_input = 1;
    pthread_mutex_unlock(&q->p->pool_m);

    // Ensure it's fully drained before destroying the queue
    hts_tpool_process_reset(q, 0);
    pthread_mutex_lock(&q->p->pool_m);
    hts_tpool_process_detach_locked(q->p, q);
    hts_tpool_process_shutdown_locked(q);

    // Maybe a worker is scanning this queue, so delay destruction.
    // The last holder to drop ref_count re-enters here and frees for real.
    if (--q->ref_count > 0) {
        pthread_mutex_unlock(&q->p->pool_m);
        return;
    }

    // Last reference gone: release condvars, then the structure itself.
    pthread_cond_destroy(&q->output_avail_c);
    pthread_cond_destroy(&q->input_not_full_c);
    pthread_cond_destroy(&q->input_empty_c);
    pthread_cond_destroy(&q->none_processing_c);
    pthread_mutex_unlock(&q->p->pool_m);

    free(q);

    DBG_OUT(stderr, "Destroyed results queue %p\n", q);
}
442 
443 
444 /*
445  * Attach and detach a thread process-queue with / from the thread pool
446  * scheduler.
447  *
448  * We need to do attach after making a thread process, but may also wish
449  * to temporarily detach if we wish to stop running jobs on a specific
450  * process while permitting other process to continue.
451  */
void hts_tpool_process_attach(hts_tpool *p, hts_tpool_process *q) {
    pthread_mutex_lock(&p->pool_m);
    if (p->q_head) {
        // Insert q just before the current head of the circular
        // doubly-linked list of process queues.
        q->next = p->q_head;
        q->prev = p->q_head->prev;
        p->q_head->prev->next = q;
        p->q_head->prev = q;
    } else {
        // First queue: a one-element circular list points at itself.
        q->next = q;
        q->prev = q;
    }
    // New queue becomes the scheduler's starting point.
    p->q_head = q;
    assert(p->q_head && p->q_head->prev && p->q_head->next);
    pthread_mutex_unlock(&p->pool_m);
}
467 
static void hts_tpool_process_detach_locked(hts_tpool *p,
                                            hts_tpool_process *q) {
    // Nothing to do if the pool has no queues or q is already detached
    // (detached queues carry NULL prev/next links).
    if (!p->q_head || !q->prev || !q->next)
        return;

    // Walk the circular list to confirm q really belongs to this pool
    // before unlinking it.
    hts_tpool_process *curr = p->q_head, *first = curr;
    do {
        if (curr == q) {
            q->next->prev = q->prev;
            q->prev->next = q->next;
            p->q_head = q->next;
            q->next = q->prev = NULL;

            // Last one: q was the sole element, so the list is now empty.
            if (p->q_head == q)
                p->q_head = NULL;
            break;
        }

        curr = curr->next;
    } while (curr != first);
}
490 
void hts_tpool_process_detach(hts_tpool *p, hts_tpool_process *q) {
    // Public wrapper: acquire the pool lock around the real detach.
    pthread_mutex_t *lock = &p->pool_m;
    pthread_mutex_lock(lock);
    hts_tpool_process_detach_locked(p, q);
    pthread_mutex_unlock(lock);
}
496 
497 
498 /* ----------------------------------------------------------------------------
499  * The thread pool.
500  */
501 
502 #define TDIFF(t2,t1) ((t2.tv_sec-t1.tv_sec)*1000000 + t2.tv_usec-t1.tv_usec)
503 
504 /*
505  * A worker thread.
506  *
507  * Once woken, each thread checks each process-queue in the pool in turn,
508  * looking for input jobs that also have room for the output (if it requires
509  * storing).  If found, we execute it and repeat.
510  *
511  * If we checked all input queues and find no such job, then we wait until we
512  * are signalled to check again.
513  */
static void *tpool_worker(void *arg) {
    hts_tpool_worker *w = (hts_tpool_worker *)arg;
    hts_tpool *p = w->p;
    hts_tpool_job *j;

    // The pool mutex is held for the entire scheduling loop; it is only
    // dropped while actually executing a job's callback.
    pthread_mutex_lock(&p->pool_m);
    while (!p->shutdown) {
        // Pop an item off the pool queue

        assert(p->q_head == 0 || (p->q_head->prev && p->q_head->next));

        int work_to_do = 0;
        hts_tpool_process *first = p->q_head, *q = first;
        do {
            // Iterate over queues, finding one with jobs and also
            // room to put the result.
            //if (q && q->input_head && !hts_tpool_process_output_full(q)) {
            if (q && q->input_head
                && q->qsize - q->n_output > q->n_processing
                && !q->shutdown) {
                work_to_do = 1;
                break;
            }

            if (q) q = q->next;
        } while (q && q != first);

        if (!work_to_do) {
            // We scanned all queues and cannot process any, so we wait.
            p->nwaiting++;

            // Push this thread to the top of the waiting stack
            if (p->t_stack_top == -1 || p->t_stack_top > w->idx)
                p->t_stack_top = w->idx;

            p->t_stack[w->idx] = 1;
//            printf("%2d: no work.  In=%d Proc=%d Out=%d  full=%d\n",
//                   w->idx, p->q_head->n_input, p->q_head->n_processing, p->q_head->n_output,
//                   hts_tpool_process_output_full(p->q_head));
            pthread_cond_wait(&w->pending_c, &p->pool_m);
            p->t_stack[w->idx] = 0;

            /* Find new t_stack_top */
            int i;
            p->t_stack_top = -1;
            for (i = 0; i < p->tsize; i++) {
                if (p->t_stack[i]) {
                    p->t_stack_top = i;
                    break;
                }
            }

            p->nwaiting--;
            continue; // To outer loop.
        }

        // Otherwise work_to_do, so process as many items in this queue as
        // possible before switching to another queue.  This means threads
        // often end up being dedicated to one type of work.
        q->ref_count++; // pin q so it cannot be destroyed while in use here
        while (q->input_head && q->qsize - q->n_output > q->n_processing) {
            if (p->shutdown)
                goto shutdown;

            if (q->shutdown)
                // Queue shutdown, but there may be other queues
                break;

            // Detach the head job from the input list.
            j = q->input_head;
            assert(j->p == p);

            if (!(q->input_head = j->next))
                q->input_tail = NULL;

            // Transitioning from full queue to not-full means we can wake up
            // any blocked dispatch threads.  We broadcast this as it's only
            // happening once (on the transition) rather than every time we
            // are below qsize.
            // (I wish I could remember why io_lib rev 3660 changed this from
            //  == to >=, but keeping it just in case!)
            q->n_processing++;
            if (q->n_input-- >= q->qsize)
                pthread_cond_broadcast(&q->input_not_full_c);

            if (q->n_input == 0)
                pthread_cond_signal(&q->input_empty_c);

            p->njobs--; // Total number of jobs; used to adjust to CPU scaling

            // Run the user callback without holding the pool lock.
            pthread_mutex_unlock(&p->pool_m);

            DBG_OUT(stderr, "%d: Processing queue %p, serial %"PRId64"\n",
                    worker_id(j->p), q, j->serial);

            if (hts_tpool_add_result(j, j->func(j->arg)) < 0)
                goto err;
            //memset(j, 0xbb, sizeof(*j));
            free(j);

            pthread_mutex_lock(&p->pool_m);
        }
        if (--q->ref_count == 0) { // we were the last user
            hts_tpool_process_destroy(q);
        } else {
            // Out of jobs on this queue, so restart search from next one.
            // This is equivalent to "work-stealing".
            if (p->q_head)
                p->q_head = p->q_head->next;
        }
    }

 shutdown:
    pthread_mutex_unlock(&p->pool_m);
#ifdef DEBUG
    fprintf(stderr, "%d: Shutting down\n", worker_id(p));
#endif
    return NULL;

 err:
#ifdef DEBUG
    fprintf(stderr, "%d: Failed to add result\n", worker_id(p));
#endif
    // Hard failure, so shutdown all queues
    pthread_mutex_lock(&p->pool_m);
    hts_tpool_process *first = p->q_head, *q = first;
    if (q) {
        do {
            hts_tpool_process_shutdown_locked(q);
            q->shutdown = 2; // signify error.
            q = q->next;
        } while (q != first);
    }
    pthread_mutex_unlock(&p->pool_m);
    return NULL;
}
649 
static void wake_next_worker(hts_tpool_process *q, int locked) {
    if (!q) return;
    hts_tpool *p = q->p;
    if (!locked)
        pthread_mutex_lock(&p->pool_m);

    // Update the q_head to be this queue so we'll start processing
    // the queue we know to have results.
    assert(q->prev && q->next); // attached
    p->q_head = q;

    // Wake up if we have more jobs waiting than CPUs. This partially combats
    // CPU frequency scaling effects.  Starting too many threads and then
    // running out of jobs can cause each thread to have lots of start/stop
    // cycles, which then translates often to CPU frequency scaling
    // adjustments.  Instead it is better to only start as many threads as we
    // need to keep the throughput up, meaning some threads run flat out and
    // others are idle.
    //
    // This isn't perfect as we need to know how many can actually start,
    // rather than how many are waiting.  A limit on output queue size makes
    // these two figures different.
    assert(p->njobs >= q->n_input);

    int running = p->tsize - p->nwaiting;
    // Signal only when: an idle thread exists (t_stack_top >= 0), there are
    // more pending jobs than running threads, and this queue still has
    // output room to accept another in-flight job.
    int sig = p->t_stack_top >= 0 && p->njobs > p->tsize - p->nwaiting
        && (q->n_processing < q->qsize - q->n_output);

//#define AVG_USAGE
#ifdef AVG_USAGE
    // Track average number of running threads and try to keep close.
    // We permit this to change, but slowly.  This avoids "boom and bust" cycles
    // where we read a lot of data, start a lot of jobs, then become idle again.
    // This way some threads run steadily and others dormant, which is better
    // for throughput.
    //
    // It's 50:50 if this is a good thing.  It helps some tasks quite significantly
    // while slightly hindering other (perhaps more usual) jobs.

    if (++p->n_count == 256) {
        p->n_count >>= 1;
        p->n_running >>= 1;
    }
    p->n_running += running;
    // Built in lag to avoid see-sawing.  Is this safe in all cases?
    if (sig && p->n_count >= 128 && running*p->n_count > p->n_running+1) sig=0;
#endif

    // Dead debug dump, kept compiled to avoid bit-rot.
    if (0) {
        printf("%d waiting, %d running, %d output, %d, arun %d => %d\t", p->njobs,
               running, q->n_output, q->qsize - q->n_output,
               p->n_running/p->n_count, sig);
        int i;
        for (i = 0; i < p->tsize; i++)
            putchar("x "[p->t_stack[i]]);
        putchar('\n');
    }

    // Wake the lowest-indexed idle worker.
    if (sig)
        pthread_cond_signal(&p->t[p->t_stack_top].pending_c);

    if (!locked)
        pthread_mutex_unlock(&p->pool_m);
}
714 
715 /*
716  * Creates a worker pool with n worker threads.
717  *
718  * Returns pool pointer on success;
719  *         NULL on failure
720  */
hts_tpool_init(int n)721 hts_tpool *hts_tpool_init(int n) {
722     int t_idx = 0;
723     size_t stack_size = 0;
724     pthread_attr_t pattr;
725     int pattr_init_done = 0;
726     hts_tpool *p = malloc(sizeof(*p));
727     if (!p)
728         return NULL;
729     p->tsize = n;
730     p->njobs = 0;
731     p->nwaiting = 0;
732     p->shutdown = 0;
733     p->q_head = NULL;
734     p->t_stack = NULL;
735     p->n_count = 0;
736     p->n_running = 0;
737     p->t = malloc(n * sizeof(p->t[0]));
738     if (!p->t) {
739         free(p);
740         return NULL;
741     }
742     p->t_stack = malloc(n * sizeof(*p->t_stack));
743     if (!p->t_stack) {
744         free(p->t);
745         free(p);
746         return NULL;
747     }
748     p->t_stack_top = -1;
749 
750     pthread_mutexattr_t attr;
751     pthread_mutexattr_init(&attr);
752     pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
753     pthread_mutex_init(&p->pool_m, &attr);
754     pthread_mutexattr_destroy(&attr);
755 
756     pthread_mutex_lock(&p->pool_m);
757 
758     // Ensure new threads have a reasonably large stack.  On some platforms,
759     // for example MacOS which defaults to 512Kb, this is not big enough
760     // for some of the rANS codecs.
761 
762     if (pthread_attr_init(&pattr) < 0)
763         goto cleanup;
764     pattr_init_done = 1;
765     if (pthread_attr_getstacksize(&pattr, &stack_size) < 0)
766         goto cleanup;
767     if (stack_size < HTS_MIN_THREAD_STACK) {
768         if (pthread_attr_setstacksize(&pattr, HTS_MIN_THREAD_STACK) < 0)
769             goto cleanup;
770     }
771 
772     for (t_idx = 0; t_idx < n; t_idx++) {
773         hts_tpool_worker *w = &p->t[t_idx];
774         p->t_stack[t_idx] = 0;
775         w->p = p;
776         w->idx = t_idx;
777         pthread_cond_init(&w->pending_c, NULL);
778         if (0 != pthread_create(&w->tid, &pattr, tpool_worker, w))
779             goto cleanup;
780     }
781 
782     pthread_mutex_unlock(&p->pool_m);
783     pthread_attr_destroy(&pattr);
784 
785     return p;
786 
787  cleanup: {
788         // Any threads started will be waiting for p->pool_m, so we can
789         // stop them cleanly by setting p->shutdown, releasing the mutex and
790         // waiting for them to finish.
791         int j;
792         int save_errno = errno;
793         hts_log_error("Couldn't start thread pool worker : %s",
794                       strerror(errno));
795         p->shutdown = 1;
796         pthread_mutex_unlock(&p->pool_m);
797         for (j = 0; j < t_idx; j++) {
798             pthread_join(p->t[j].tid, NULL);
799             pthread_cond_destroy(&p->t[j].pending_c);
800         }
801         pthread_mutex_destroy(&p->pool_m);
802         if (pattr_init_done)
803             pthread_attr_destroy(&pattr);
804         free(p->t_stack);
805         free(p->t);
806         free(p);
807         errno = save_errno;
808         return NULL;
809     }
810 }
811 
812 /*
813  * Returns the number of requested threads for a pool.
814  */
int hts_tpool_size(hts_tpool *p) {
    // The worker-thread count given to hts_tpool_init(); immutable, so no
    // locking is needed.
    return p->tsize;
}
818 
819 /*
820  * Adds an item to the work pool.
821  *
822  * Returns 0 on success
823  *        -1 on failure
824  */
int hts_tpool_dispatch(hts_tpool *p, hts_tpool_process *q,
                    void *(*func)(void *arg), void *arg) {
    // Convenience wrapper: blocking dispatch with no cleanup callbacks.
    return hts_tpool_dispatch3(p, q, func, arg, NULL, NULL, 0);
}
829 
830 /*
831  * As above but optional non-block flag.
832  *
833  * nonblock  0 => block if input queue is full
834  * nonblock +1 => don't block if input queue is full, but do not add task
835  * nonblock -1 => add task regardless of whether queue is full (over-size)
836  */
int hts_tpool_dispatch2(hts_tpool *p, hts_tpool_process *q,
                        void *(*func)(void *arg), void *arg, int nonblock) {
    // Convenience wrapper: no cleanup callbacks, caller-chosen block mode.
    return hts_tpool_dispatch3(p, q, func, arg, NULL, NULL, nonblock);
}
841 
int hts_tpool_dispatch3(hts_tpool *p, hts_tpool_process *q,
                        void *(*exec_func)(void *arg), void *arg,
                        void (*job_cleanup)(void *arg),
                        void (*result_cleanup)(void *data),
                        int nonblock) {
    hts_tpool_job *j;

    pthread_mutex_lock(&p->pool_m);

    DBG_OUT(stderr, "Dispatching job for queue %p, serial %"PRId64"\n",
            q, q->curr_serial);

    // Non-blocking mode (+1): fail fast with EAGAIN when the input queue
    // is full or no longer accepting input.
    if ((q->no_more_input || q->n_input >= q->qsize) && nonblock == 1) {
        pthread_mutex_unlock(&p->pool_m);
        errno = EAGAIN;
        return -1;
    }

    if (!(j = malloc(sizeof(*j)))) {
        pthread_mutex_unlock(&p->pool_m);
        return -1;
    }
    j->func = exec_func;
    j->arg = arg;
    j->job_cleanup = job_cleanup;
    j->result_cleanup = result_cleanup;
    j->next = NULL;
    j->p = p;
    j->q = q;
    j->serial = q->curr_serial++; // ordering key for in-order result pickup

    if (nonblock == 0) {
        // Blocking mode: wait until room appears in the input queue, the
        // queue shuts down, or hts_tpool_wake_dispatch() interrupts us.
        while ((q->no_more_input || q->n_input >= q->qsize) &&
               !q->shutdown && !q->wake_dispatch) {
            pthread_cond_wait(&q->input_not_full_c, &q->p->pool_m);
        }
        if (q->no_more_input || q->shutdown) {
            free(j);
            pthread_mutex_unlock(&p->pool_m);
            return -1;
        }
        if (q->wake_dispatch) {
            //fprintf(stderr, "Wake => non-block for this operation\n");
            q->wake_dispatch = 0;
        }
    }

    p->njobs++;    // total across all queues
    q->n_input++;  // queue specific

    // Append the job to the tail of the input list.
    if (q->input_tail) {
        q->input_tail->next = j;
        q->input_tail = j;
    } else {
        q->input_head = q->input_tail = j;
    }

    DBG_OUT(stderr, "Dispatched (serial %"PRId64")\n", j->serial);

    // Let a worker know we have data.
    // Keep incoming queue at 1 per running thread, so there is always
    // something waiting when they end their current task.  If we go above
    // this signal to start more threads (if available). This has the effect
    // of concentrating jobs to fewer cores when we are I/O bound, which in
    // turn benefits systems with auto CPU frequency scaling.
    if (!q->shutdown)
        wake_next_worker(q, 1);

    pthread_mutex_unlock(&p->pool_m);

    return 0;
}
914 
/*
 * Wakes up a single thread stuck in dispatch and makes it return with
 * errno EAGAIN.
 */
void hts_tpool_wake_dispatch(hts_tpool_process *q) {
    pthread_mutex_t *pool_lock = &q->p->pool_m;

    pthread_mutex_lock(pool_lock);
    // hts_tpool_dispatch3 checks this flag in its blocking wait loop and,
    // on seeing it set, gives up waiting for queue space.
    q->wake_dispatch = 1;
    pthread_cond_signal(&q->input_not_full_c);
    pthread_mutex_unlock(pool_lock);
}
925 
/*
 * Flushes the process-queue, but doesn't exit. This simply drains the queue
 * and ensures all worker threads have finished their current tasks
 * associated with this process.
 *
 * NOTE: This does not mean the worker threads are not executing jobs in
 * another process-queue.
 *
 * Returns 0 on success;
 *        -1 on failure
 */
int hts_tpool_process_flush(hts_tpool_process *q) {
    int i;
    hts_tpool *p = q->p;

    DBG_OUT(stderr, "Flushing pool %p\n", p);

    // Drains the queue
    pthread_mutex_lock(&p->pool_m);

    // Wake up everything for the final sprint!
    for (i = 0; i < p->tsize; i++)
        if (p->t_stack[i])
            pthread_cond_signal(&p->t[i].pending_c);

    // Ensure there is room for the final sprint.
    // Ideally we shouldn't get here, but the "q->qsize - q->n_output >
    // n_processing" check in tpool_worker means we can trigger a
    // deadlock there.  This negates that possibility.
    if (q->qsize < q->n_output + q->n_input + q->n_processing)
        q->qsize = q->n_output + q->n_input + q->n_processing;

    // When shutdown, we won't be launching more, but we can still
    // wait for any processing jobs to complete.
    if (q->shutdown) {
        while (q->n_processing)
            pthread_cond_wait(&q->none_processing_c, &p->pool_m);
    }

    // Wait for n_input and n_processing to hit zero.
    while (!q->shutdown && (q->n_input || q->n_processing)) {
        struct timeval now;
        struct timespec timeout;

        // 1-second timed waits so we periodically re-check q->shutdown
        // instead of sleeping indefinitely on the condition variable.
        while (q->n_input && !q->shutdown) {
            gettimeofday(&now, NULL);
            timeout.tv_sec = now.tv_sec + 1;
            timeout.tv_nsec = now.tv_usec * 1000;
            pthread_cond_timedwait(&q->input_empty_c, &p->pool_m, &timeout);
        }

        // Note: even if q->shutdown is set, we still have to wait until
        // q->n_processing is zero as we cannot terminate while things are
        // running otherwise we free up the data being worked on.
        while (q->n_processing) {
            gettimeofday(&now, NULL);
            timeout.tv_sec = now.tv_sec + 1;
            timeout.tv_nsec = now.tv_usec * 1000;
            pthread_cond_timedwait(&q->none_processing_c, &p->pool_m,
                                   &timeout);
        }
        if (q->shutdown) break;
    }

    pthread_mutex_unlock(&p->pool_m);

    DBG_OUT(stderr, "Flushed complete for pool %p, queue %p\n", p, q);

    return 0;
}
996 
997 /*
998  * Resets a process to the initial state.
999  *
1000  * This removes any queued up input jobs, disables any notification of
1001  * new results/output, flushes what is left and then discards any
 * queued output.  Any consumer stuck in a wait on results to
 * appear should stay stuck and will only wake up when new data is
 * pushed through the queue.
1005  *
1006  * Returns 0 on success;
1007  *        -1 on failure
1008  */
int hts_tpool_process_reset(hts_tpool_process *q, int free_results) {
    hts_tpool_job *j, *jn, *j_head;
    hts_tpool_result *r, *rn, *r_head;

    pthread_mutex_lock(&q->p->pool_m);
    // prevent next_result from returning data during our flush
    q->next_serial = INT_MAX;

    // Remove any queued input not yet being acted upon
    j_head = q->input_head;
    q->input_head = q->input_tail = NULL;
    q->n_input = 0;

    // Remove any queued output, thus ensuring we have room to flush.
    r_head = q->output_head;
    q->output_head = q->output_tail = NULL;
    q->n_output = 0;
    pthread_mutex_unlock(&q->p->pool_m);

    // Release memory.  This can be done unlocked now the lists have been
    // removed from the queue
    for (j = j_head; j; j = jn) {
        jn = j->next;
        if (j->job_cleanup) j->job_cleanup(j->arg);
        free(j);
    }

    for (r = r_head; r; r = rn) {
        rn = r->next;
        if (r->result_cleanup) {
            r->result_cleanup(r->data);
            // Cleared so hts_tpool_delete_result does not act on the
            // already-released data pointer.
            r->data = NULL;
        }
        hts_tpool_delete_result(r, free_results);
    }

    // Wait for any jobs being processed to complete.
    // (TODO: consider how to cancel any currently processing jobs.
    // Probably this is too hard.)
    if (hts_tpool_process_flush(q) != 0)
        return -1;

    // Remove any new output produced by the in-flight jobs we just
    // waited on during the flush.
    pthread_mutex_lock(&q->p->pool_m);
    r_head = q->output_head;
    q->output_head = q->output_tail = NULL;
    q->n_output = 0;

    // Finally reset the serial back to the starting point.
    q->next_serial = q->curr_serial = 0;
    // Wake any dispatcher blocked waiting for input-queue room.
    pthread_cond_signal(&q->input_not_full_c);
    pthread_mutex_unlock(&q->p->pool_m);

    // Discard unwanted output
    for (r = r_head; r; r = rn) {
        //fprintf(stderr, "Discard output %d\n", r->serial);
        rn = r->next;
        if (r->result_cleanup) {
            r->result_cleanup(r->data);
            r->data = NULL;
        }
        hts_tpool_delete_result(r, free_results);
    }

    return 0;
}
1075 
/* Returns the process queue size */
int hts_tpool_process_qsize(hts_tpool_process *q) {
    // qsize is the capacity bound checked by hts_tpool_dispatch3
    // (q->n_input >= q->qsize) and grown by hts_tpool_process_flush.
    return q->qsize;
}
1080 
1081 /*
1082  * Destroys a thread pool.  The threads are joined into the main
1083  * thread so they will finish their current work load.
1084  */
hts_tpool_destroy(hts_tpool * p)1085 void hts_tpool_destroy(hts_tpool *p) {
1086     int i;
1087 
1088     DBG_OUT(stderr, "Destroying pool %p\n", p);
1089 
1090     /* Send shutdown message to worker threads */
1091     pthread_mutex_lock(&p->pool_m);
1092     p->shutdown = 1;
1093 
1094     DBG_OUT(stderr, "Sending shutdown request\n");
1095 
1096     for (i = 0; i < p->tsize; i++)
1097         pthread_cond_signal(&p->t[i].pending_c);
1098 
1099     pthread_mutex_unlock(&p->pool_m);
1100 
1101     DBG_OUT(stderr, "Shutdown complete\n");
1102 
1103     for (i = 0; i < p->tsize; i++)
1104         pthread_join(p->t[i].tid, NULL);
1105 
1106     pthread_mutex_destroy(&p->pool_m);
1107     for (i = 0; i < p->tsize; i++)
1108         pthread_cond_destroy(&p->t[i].pending_c);
1109 
1110     if (p->t_stack)
1111         free(p->t_stack);
1112 
1113     free(p->t);
1114     free(p);
1115 
1116     DBG_OUT(stderr, "Destroyed pool %p\n", p);
1117 }
1118 
1119 
1120 /*
1121  * Destroys a thread pool without waiting on jobs to complete.
1122  * Use hts_tpool_kill(p) to quickly exit after a fatal error.
1123  */
hts_tpool_kill(hts_tpool * p)1124 void hts_tpool_kill(hts_tpool *p) {
1125     int i;
1126 
1127     DBG_OUT(stderr, "Destroying pool %p, kill=%d\n", p, kill);
1128 
1129     for (i = 0; i < p->tsize; i++)
1130         pthread_kill(p->t[i].tid, SIGINT);
1131 
1132     pthread_mutex_destroy(&p->pool_m);
1133     for (i = 0; i < p->tsize; i++)
1134         pthread_cond_destroy(&p->t[i].pending_c);
1135 
1136     if (p->t_stack)
1137         free(p->t_stack);
1138 
1139     free(p->t);
1140     free(p);
1141 
1142     DBG_OUT(stderr, "Destroyed pool %p\n", p);
1143 }
1144 
1145 
1146 /*=============================================================================
1147  * Test app.
1148  *
1149  * This can be considered both as a basic test and as a worked example for
1150  * various usage patterns.
1151  *=============================================================================
1152  */
1153 
1154 #ifdef TEST_MAIN
1155 
1156 #include <stdio.h>
1157 
1158 #ifndef TASK_SIZE
1159 #define TASK_SIZE 1000
1160 #endif
1161 
1162 /*-----------------------------------------------------------------------------
1163  * Unordered x -> x*x test.
1164  * Results arrive in order of completion.
1165  */
/* Worker for the unordered test: prints job*job and frees its input.
 * Returns NULL, so no result is queued. */
void *doit_square_u(void *arg) {
    int value = *(int *)arg;

    // Random delay so results appear in completion order, not job order.
    usleep(random() % 100000);

    printf("RESULT: %d\n", value * value);

    free(arg);
    return NULL;
}
1176 
test_square_u(int n)1177 int test_square_u(int n) {
1178     hts_tpool *p = hts_tpool_init(n);
1179     hts_tpool_process *q = hts_tpool_process_init(p, n*2, 1);
1180     int i;
1181 
1182     // Dispatch jobs
1183     for (i = 0; i < TASK_SIZE; i++) {
1184         int *ip = malloc(sizeof(*ip));
1185         *ip = i;
1186         hts_tpool_dispatch(p, q, doit_square_u, ip);
1187     }
1188 
1189     hts_tpool_process_flush(q);
1190     hts_tpool_process_destroy(q);
1191     hts_tpool_destroy(p);
1192 
1193     return 0;
1194 }
1195 
1196 
1197 /*-----------------------------------------------------------------------------
1198  * Ordered x -> x*x test.
1199  * Results arrive in numerical order.
1200  *
1201  * This implementation uses a non-blocking dispatch to avoid dead-locks
1202  * where one job takes too long to complete.
1203  */
/* Worker for the ordered tests: returns a malloc'd int holding job*job
 * (negated for negative jobs, so the -1 sentinel maps to -1). */
void *doit_square(void *arg) {
    int value = *(int *)arg;
    int *out;

    // One excessively slow, to stress test output queue filling and
    // excessive out of order scenarios.
    usleep(500000 * ((value & 31) == 31) + random() % 10000);

    out = malloc(sizeof(*out));
    int sq = value * value;
    *out = (value < 0) ? -sq : sq;

    free(arg);
    return out;
}
1218 
test_square(int n)1219 int test_square(int n) {
1220     hts_tpool *p = hts_tpool_init(n);
1221     hts_tpool_process *q = hts_tpool_process_init(p, n*2, 0);
1222     int i;
1223     hts_tpool_result *r;
1224 
1225     // Dispatch jobs
1226     for (i = 0; i < TASK_SIZE; i++) {
1227         int *ip = malloc(sizeof(*ip));
1228         *ip = i;
1229         int blk;
1230 
1231         do {
1232             // In the situation where some jobs take much longer than
1233             // others, we could end up blocking here as we haven't got
1234             // any room in the output queue to place it. (We don't launch a
1235             // job if the output queue is full.)
1236 
1237             // This happens when the next serial number to fetch is, eg, 50
1238             // but jobs 51-100 have all executed really fast and appeared in
1239             // the output queue before 50.  A dispatch & check-results
1240             // alternating loop can fail to find job 50 many times over until
1241             // eventually the dispatch blocks before it arrives.
1242 
1243             // Our solution is to dispatch in non-blocking mode so we are
1244             // always to either dispatch or consume a result.
1245             blk = hts_tpool_dispatch2(p, q, doit_square, ip, 1);
1246 
1247             // Check for results.
1248             if ((r = hts_tpool_next_result(q))) {
1249                 printf("RESULT: %d\n", *(int *)hts_tpool_result_data(r));
1250                 hts_tpool_delete_result(r, 1);
1251             }
1252             if (blk == -1) {
1253                 // The alternative is a separate thread for dispatching and/or
1254                 // consumption of results. See test_squareB.
1255                 putchar('.'); fflush(stdout);
1256                 usleep(10000);
1257             }
1258         } while (blk == -1);
1259     }
1260 
1261     // Wait for any input-queued up jobs or in-progress jobs to complete.
1262     hts_tpool_process_flush(q);
1263 
1264     while ((r = hts_tpool_next_result(q))) {
1265         printf("RESULT: %d\n", *(int *)hts_tpool_result_data(r));
1266         hts_tpool_delete_result(r, 1);
1267     }
1268 
1269     hts_tpool_process_destroy(q);
1270     hts_tpool_destroy(p);
1271 
1272     return 0;
1273 }
1274 
1275 /*-----------------------------------------------------------------------------
1276  * Ordered x -> x*x test.
1277  * Results arrive in numerical order.
1278  *
1279  * This implementation uses separate dispatching threads and job consumption
1280  * threads (main thread).  This means it can use a blocking calls for
1281  * simplicity elsewhere.
1282  */
/* Options handed to the test_squareB dispatcher thread. */
struct squareB_opt {
    hts_tpool *p;         // pool shared with the consuming main thread
    hts_tpool_process *q; // process-queue the jobs are dispatched to
    int n;                // number of squaring jobs to dispatch
};
test_squareB_dispatcher(void * arg)1288 static void *test_squareB_dispatcher(void *arg) {
1289     struct squareB_opt *o = (struct squareB_opt *)arg;
1290     int i, *ip;
1291 
1292     for (i = 0; i < o->n; i++) {
1293         ip = malloc(sizeof(*ip));
1294         *ip = i;
1295 
1296         hts_tpool_dispatch(o->p, o->q, doit_square, ip);
1297     }
1298 
1299     // Dispatch an sentinel job to mark the end
1300     *(ip = malloc(sizeof(*ip))) = -1;
1301     hts_tpool_dispatch(o->p, o->q, doit_square, ip);
1302     pthread_exit(NULL);
1303 }
1304 
test_squareB(int n)1305 int test_squareB(int n) {
1306     hts_tpool *p = hts_tpool_init(n);
1307     hts_tpool_process *q = hts_tpool_process_init(p, n*2, 0);
1308     struct squareB_opt o = {p, q, TASK_SIZE};
1309     pthread_t tid;
1310 
1311     // Launch our job creation thread.
1312     pthread_create(&tid, NULL, test_squareB_dispatcher, &o);
1313 
1314     // Consume all results until we find the end-of-job marker.
1315     for(;;) {
1316         hts_tpool_result *r = hts_tpool_next_result_wait(q);
1317         int x = *(int *)hts_tpool_result_data(r);
1318         hts_tpool_delete_result(r, 1);
1319         if (x == -1)
1320             break;
1321         printf("RESULT: %d\n", x);
1322     }
1323 
1324     // Wait for any input-queued up jobs or in-progress jobs to complete.
1325     // This should do nothing as we've been executing until the termination
1326     // marker of -1.
1327     hts_tpool_process_flush(q);
1328     assert(hts_tpool_next_result(q) == NULL);
1329 
1330     hts_tpool_process_destroy(q);
1331     hts_tpool_destroy(p);
1332     pthread_join(tid, NULL);
1333 
1334     return 0;
1335 }
1336 
1337 
1338 /*-----------------------------------------------------------------------------
1339  * A simple pipeline test.
1340  * We use a dedicated input thread that does the initial generation of job
1341  * and dispatch, several execution steps running in a shared pool, and a
1342  * dedicated output thread that prints up the final result.  It's key that our
1343  * pipeline execution stages can run independently and don't themselves have
1344  * any waits.  To achieve this we therefore also use some dedicated threads
1345  * that take the output from one queue and resubmits the job as the input to
1346  * the next queue.
1347  *
1348  * More generally this could perhaps be a single pipeline thread that
1349  * marshalls multiple queues and their interactions, but this is simply a
1350  * demonstration of a single pipeline.
1351  *
1352  * Our process fills out the bottom byte of a 32-bit int and then shifts it
1353  * left one byte at a time.  Only the final stage needs to be ordered.  Each
1354  * stage uses its own queue.
1355  *
1356  * Possible improvement: we only need the last stage to be ordered.  By
1357  * allocating our own serial numbers for the first job and manually setting
1358  * these serials in the last job, perhaps we can permit out of order execution
1359  * of all the in-between stages.  (I doubt it'll affect speed much though.)
1360  */
1361 
// Pipeline stage and shuttle functions, defined below (see test_pipe).
static void *pipe_input_thread(void *arg);
static void *pipe_stage1(void *arg);
static void *pipe_stage2(void *arg);
static void *pipe_stage3(void *arg);
static void *pipe_output_thread(void *arg);

// Shared pipeline configuration: one pool with one queue per stage.
typedef struct {
    hts_tpool *p;           // worker pool shared by all three stages
    hts_tpool_process *q1;  // stage 1 queue
    hts_tpool_process *q2;  // stage 2 queue
    hts_tpool_process *q3;  // stage 3 (final, ordered output) queue
    int n;                  // number of jobs fed into the pipeline
} pipe_opt;

// One unit of work flowing through the pipeline.
typedef struct {
    pipe_opt *o;    // back-pointer to the pipeline configuration
    unsigned int x; // payload; each stage shifts in another byte
    int eof; // set with last job.
} pipe_job;
1381 
pipe_input_thread(void * arg)1382 static void *pipe_input_thread(void *arg) {
1383     pipe_opt *o = (pipe_opt *)arg;
1384 
1385     int i;
1386     for (i = 1; i <= o->n; i++) {
1387         pipe_job *j = malloc(sizeof(*j));
1388         j->o = o;
1389         j->x = i;
1390         j->eof = (i == o->n);
1391 
1392         printf("I  %08x\n", j->x);
1393 
1394         if (hts_tpool_dispatch(o->p, o->q1, pipe_stage1, j) != 0) {
1395             free(j);
1396             pthread_exit((void *)1);
1397         }
1398     }
1399 
1400     pthread_exit(NULL);
1401 }
1402 
pipe_stage1(void * arg)1403 static void *pipe_stage1(void *arg) {
1404     pipe_job *j = (pipe_job *)arg;
1405 
1406     j->x <<= 8;
1407     usleep(random() % 10000); // fast job
1408     printf("1  %08x\n", j->x);
1409 
1410     return j;
1411 }
1412 
pipe_stage1to2(void * arg)1413 static void *pipe_stage1to2(void *arg) {
1414     pipe_opt *o = (pipe_opt *)arg;
1415     hts_tpool_result *r;
1416 
1417     while ((r = hts_tpool_next_result_wait(o->q1))) {
1418         pipe_job *j = (pipe_job *)hts_tpool_result_data(r);
1419         hts_tpool_delete_result(r, 0);
1420         if (hts_tpool_dispatch(j->o->p, j->o->q2, pipe_stage2, j) != 0)
1421             pthread_exit((void *)1);
1422         if (j->eof)
1423             break;
1424     }
1425 
1426     pthread_exit(NULL);
1427 }
1428 
pipe_stage2(void * arg)1429 static void *pipe_stage2(void *arg) {
1430     pipe_job *j = (pipe_job *)arg;
1431 
1432     j->x <<= 8;
1433     usleep(random() % 100000); // slow job
1434     printf("2  %08x\n", j->x);
1435 
1436     return j;
1437 }
1438 
pipe_stage2to3(void * arg)1439 static void *pipe_stage2to3(void *arg) {
1440     pipe_opt *o = (pipe_opt *)arg;
1441     hts_tpool_result *r;
1442 
1443     while ((r = hts_tpool_next_result_wait(o->q2))) {
1444         pipe_job *j = (pipe_job *)hts_tpool_result_data(r);
1445         hts_tpool_delete_result(r, 0);
1446         if (hts_tpool_dispatch(j->o->p, j->o->q3, pipe_stage3, j) != 0)
1447             pthread_exit((void *)1);
1448         if (j->eof)
1449             break;
1450     }
1451 
1452     pthread_exit(NULL);
1453 }
1454 
pipe_stage3(void * arg)1455 static void *pipe_stage3(void *arg) {
1456     pipe_job *j = (pipe_job *)arg;
1457 
1458     usleep(random() % 10000); // fast job
1459     j->x <<= 8;
1460     return j;
1461 }
1462 
pipe_output_thread(void * arg)1463 static void *pipe_output_thread(void *arg) {
1464     pipe_opt *o = (pipe_opt *)arg;
1465     hts_tpool_result *r;
1466 
1467     while ((r = hts_tpool_next_result_wait(o->q3))) {
1468         pipe_job *j = (pipe_job *)hts_tpool_result_data(r);
1469         int eof = j->eof;
1470         printf("O  %08x\n", j->x);
1471         hts_tpool_delete_result(r, 1);
1472         if (eof)
1473             break;
1474     }
1475 
1476     pthread_exit(NULL);
1477 }
1478 
test_pipe(int n)1479 int test_pipe(int n) {
1480     hts_tpool *p = hts_tpool_init(n);
1481     hts_tpool_process *q1 = hts_tpool_process_init(p, n*2, 0);
1482     hts_tpool_process *q2 = hts_tpool_process_init(p, n*2, 0);
1483     hts_tpool_process *q3 = hts_tpool_process_init(p, n*2, 0);
1484     pipe_opt o = {p, q1, q2, q3, TASK_SIZE};
1485     pthread_t tidIto1, tid1to2, tid2to3, tid3toO;
1486     void *retv;
1487     int ret;
1488 
1489     // Launch our data source and sink threads.
1490     pthread_create(&tidIto1, NULL, pipe_input_thread,  &o);
1491     pthread_create(&tid1to2, NULL, pipe_stage1to2,     &o);
1492     pthread_create(&tid2to3, NULL, pipe_stage2to3,     &o);
1493     pthread_create(&tid3toO, NULL, pipe_output_thread, &o);
1494 
1495     // Wait for tasks to finish.
1496     ret = 0;
1497     pthread_join(tidIto1, &retv); ret |= (retv != NULL);
1498     pthread_join(tid1to2, &retv); ret |= (retv != NULL);
1499     pthread_join(tid2to3, &retv); ret |= (retv != NULL);
1500     pthread_join(tid3toO, &retv); ret |= (retv != NULL);
1501     printf("Return value %d\n", ret);
1502 
1503     hts_tpool_process_destroy(q1);
1504     hts_tpool_process_destroy(q2);
1505     hts_tpool_process_destroy(q3);
1506     hts_tpool_destroy(p);
1507 
1508     return 0;
1509 }
1510 
1511 /*-----------------------------------------------------------------------------*/
/* Test entry point: picks one of the four demos by sub-command name. */
int main(int argc, char **argv) {
    srandom(0); // deterministic delays across runs

    if (argc < 3) {
        fprintf(stderr, "Usage: %s command n_threads\n", argv[0]);
        fprintf(stderr, "Where commands are:\n\n");
        fprintf(stderr, "unordered       # Unordered output\n");
        fprintf(stderr, "ordered1        # Main thread with non-block API\n");
        fprintf(stderr, "ordered2        # Dispatch thread, blocking API\n");
        fprintf(stderr, "pipe            # Multi-stage pipeline, several queues\n");
        exit(1);
    }

    const char *cmd = argv[1];
    int nthreads = atoi(argv[2]);

    if (strcmp(cmd, "unordered") == 0) return test_square_u(nthreads);
    if (strcmp(cmd, "ordered1") == 0)  return test_square(nthreads);
    if (strcmp(cmd, "ordered2") == 0)  return test_squareB(nthreads);
    if (strcmp(cmd, "pipe") == 0)      return test_pipe(nthreads);

    fprintf(stderr, "Unknown sub-command\n");
    exit(1);
}
1535 #endif
1536