1 /*****************************************************************************
2 * This file is part of Kvazaar HEVC encoder.
3 *
4 * Copyright (c) 2021, Tampere University, ITU/ISO/IEC, project contributors
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without modification,
8 * are permitted provided that the following conditions are met:
9 *
10 * * Redistributions of source code must retain the above copyright notice, this
11 * list of conditions and the following disclaimer.
12 *
13 * * Redistributions in binary form must reproduce the above copyright notice, this
14 * list of conditions and the following disclaimer in the documentation and/or
15 * other materials provided with the distribution.
16 *
17 * * Neither the name of the Tampere University or ITU/ISO/IEC nor the names of its
18 * contributors may be used to endorse or promote products derived from
19 * this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
22 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
24 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
25 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
26 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
28 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
30 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 ****************************************************************************/
32
33 #include "global.h"
34 #include "threadqueue.h"
35
36 #include <errno.h> // ETIMEDOUT
37 #include <pthread.h>
38 #include <stdio.h>
39 #include <stdlib.h>
40 #include <string.h>
41
42 #include "threads.h"
43
44
/**
 * \file
 *
 * Lock acquisition order:
 *
 * 1. When locking a job and its dependency, the dependency must be locked
 *    first and then the job depending on it.
 *
 * 2. When locking a job and the thread queue, the thread queue must be
 *    locked first and then the job.
 *
 * 3. When accessing threadqueue_job_t.next, the thread queue must be
 *    locked.
 */
59
/**
 * \brief Number of slots by which a job's rdepends array is enlarged each
 *        time it runs out of room.
 */
enum { THREADQUEUE_LIST_REALLOC_SIZE = 32 };
61
/*
 * Checked wrappers around the pthread primitives used in this file.
 *
 * Each wrapper calls the underlying pthread function and, if it reports an
 * error, prints a diagnostic to stderr, triggers assert(0) and finally
 * executes `return 0;` in the *calling* function. They can therefore only
 * be used inside functions where `return 0` is a valid statement.
 *
 * The bodies are wrapped in do { ... } while (0) so that every wrapper
 * expands to exactly one statement and composes safely with unbraced
 * if/else. Pointers passed to %p are cast to void * as the format requires.
 */

#define PTHREAD_COND_SIGNAL(c) \
  do { \
    if (pthread_cond_signal((c)) != 0) { \
      fprintf(stderr, "pthread_cond_signal(%s=%p) failed!\n", #c, (void *)(c)); \
      assert(0); \
      return 0; \
    } \
  } while (0)

#define PTHREAD_COND_BROADCAST(c) \
  do { \
    if (pthread_cond_broadcast((c)) != 0) { \
      fprintf(stderr, "pthread_cond_broadcast(%s=%p) failed!\n", #c, (void *)(c)); \
      assert(0); \
      return 0; \
    } \
  } while (0)

#define PTHREAD_COND_WAIT(c,l) \
  do { \
    if (pthread_cond_wait((c),(l)) != 0) { \
      fprintf(stderr, "pthread_cond_wait(%s=%p, %s=%p) failed!\n", #c, (void *)(c), #l, (void *)(l)); \
      assert(0); \
      return 0; \
    } \
  } while (0)

#define PTHREAD_LOCK(l) \
  do { \
    if (pthread_mutex_lock((l)) != 0) { \
      fprintf(stderr, "pthread_mutex_lock(%s) failed!\n", #l); \
      assert(0); \
      return 0; \
    } \
  } while (0)

#define PTHREAD_UNLOCK(l) \
  do { \
    if (pthread_mutex_unlock((l)) != 0) { \
      fprintf(stderr, "pthread_mutex_unlock(%s) failed!\n", #l); \
      assert(0); \
      return 0; \
    } \
  } while (0)
96
97
/**
 * \brief Life cycle of a job, listed in the order the states are normally
 *        visited.
 */
typedef enum {
  /** \brief Job has been created but not submitted, so it may not run yet. */
  THREADQUEUE_JOB_STATE_PAUSED = 0,

  /** \brief Job has been submitted and still has unfinished dependencies. */
  THREADQUEUE_JOB_STATE_WAITING = 1,

  /** \brief Job sits in the queue of jobs ready to run. */
  THREADQUEUE_JOB_STATE_READY = 2,

  /** \brief Job is currently being executed. */
  THREADQUEUE_JOB_STATE_RUNNING = 3,

  /** \brief Job has finished executing. */
  THREADQUEUE_JOB_STATE_DONE = 4,

} threadqueue_job_state;
125
126
127 struct threadqueue_job_t {
128 pthread_mutex_t lock;
129
130 threadqueue_job_state state;
131
132 /**
133 * \brief Number of dependencies that have not been completed yet.
134 */
135 int ndepends;
136
137 /**
138 * \brief Reverse dependencies.
139 *
140 * Array of pointers to jobs that depend on this one. They have to exist
141 * when the thread finishes, because they cannot be run before.
142 */
143 struct threadqueue_job_t **rdepends;
144
145 /**
146 * \brief Number of elements in rdepends.
147 */
148 int rdepends_count;
149
150 /**
151 * \brief Allocated size of rdepends.
152 */
153 int rdepends_size;
154
155 /**
156 * \brief Reference count
157 */
158 int refcount;
159
160 /**
161 * \brief Pointer to the function to execute.
162 */
163 void (*fptr)(void *arg);
164
165 /**
166 * \brief Argument for fptr.
167 */
168 void *arg;
169
170 /**
171 * \brief Pointer to the next job in the queue.
172 */
173 struct threadqueue_job_t *next;
174
175 };
176
177
178 struct threadqueue_queue_t {
179 pthread_mutex_t lock;
180
181 /**
182 * \brief Job available condition variable
183 *
184 * Signalled when there is a new job to do.
185 */
186 pthread_cond_t job_available;
187
188 /**
189 * \brief Job done condition variable
190 *
191 * Signalled when a job has been completed.
192 */
193 pthread_cond_t job_done;
194
195 /**
196 * Array containing spawned threads
197 */
198 pthread_t *threads;
199
200 /**
201 * \brief Number of threads spawned
202 */
203 int thread_count;
204
205 /**
206 * \brief Number of threads running
207 */
208 int thread_running_count;
209
210 /**
211 * \brief If true, threads should stop ASAP.
212 */
213 bool stop;
214
215 /**
216 * \brief Pointer to the first ready job
217 */
218 threadqueue_job_t *first;
219
220 /**
221 * \brief Pointer to the last ready job
222 */
223 threadqueue_job_t *last;
224 };
225
226
227 /**
228 * \brief Add a job to the queue of jobs ready to run.
229 *
230 * The caller must have locked the thread queue and the job. This function
231 * takes the ownership of the job.
232 */
threadqueue_push_job(threadqueue_queue_t * threadqueue,threadqueue_job_t * job)233 static void threadqueue_push_job(threadqueue_queue_t * threadqueue,
234 threadqueue_job_t *job)
235 {
236 assert(job->ndepends == 0);
237 job->state = THREADQUEUE_JOB_STATE_READY;
238
239 if (threadqueue->first == NULL) {
240 threadqueue->first = job;
241 } else {
242 threadqueue->last->next = job;
243 }
244
245 threadqueue->last = job;
246 job->next = NULL;
247 }
248
249
250 /**
251 * \brief Retrieve a job from the queue of jobs ready to run.
252 *
253 * The caller must have locked the thread queue. The calling function
254 * receives the ownership of the job.
255 */
threadqueue_pop_job(threadqueue_queue_t * threadqueue)256 static threadqueue_job_t * threadqueue_pop_job(threadqueue_queue_t * threadqueue)
257 {
258 assert(threadqueue->first != NULL);
259
260 threadqueue_job_t *job = threadqueue->first;
261 threadqueue->first = job->next;
262 job->next = NULL;
263
264 if (threadqueue->first == NULL) {
265 threadqueue->last = NULL;
266 }
267
268 return job;
269 }
270
271
272 /**
273 * \brief Function executed by worker threads.
274 */
threadqueue_worker(void * threadqueue_opaque)275 static void* threadqueue_worker(void* threadqueue_opaque)
276 {
277 threadqueue_queue_t * const threadqueue = (threadqueue_queue_t *) threadqueue_opaque;
278
279 PTHREAD_LOCK(&threadqueue->lock);
280
281 for (;;) {
282 while (!threadqueue->stop && threadqueue->first == NULL) {
283 // Wait until there is something to do in the queue.
284 PTHREAD_COND_WAIT(&threadqueue->job_available, &threadqueue->lock);
285 }
286
287 if (threadqueue->stop) {
288 break;
289 }
290
291 // Get a job and remove it from the queue.
292 threadqueue_job_t *job = threadqueue_pop_job(threadqueue);
293
294 PTHREAD_LOCK(&job->lock);
295 assert(job->state == THREADQUEUE_JOB_STATE_READY);
296 job->state = THREADQUEUE_JOB_STATE_RUNNING;
297 PTHREAD_UNLOCK(&job->lock);
298 PTHREAD_UNLOCK(&threadqueue->lock);
299
300 job->fptr(job->arg);
301
302 PTHREAD_LOCK(&threadqueue->lock);
303 PTHREAD_LOCK(&job->lock);
304 assert(job->state == THREADQUEUE_JOB_STATE_RUNNING);
305 job->state = THREADQUEUE_JOB_STATE_DONE;
306
307 PTHREAD_COND_SIGNAL(&threadqueue->job_done);
308
309 // Go through all the jobs that depend on this one, decreasing their
310 // ndepends. Count how many jobs can now start executing so we know how
311 // many threads to wake up.
312 int num_new_jobs = 0;
313 for (int i = 0; i < job->rdepends_count; ++i) {
314 threadqueue_job_t * const depjob = job->rdepends[i];
315 // The dependency (job) is locked before the job depending on it.
316 // This must be the same order as in kvz_threadqueue_job_dep_add.
317 PTHREAD_LOCK(&depjob->lock);
318
319 assert(depjob->state == THREADQUEUE_JOB_STATE_WAITING ||
320 depjob->state == THREADQUEUE_JOB_STATE_PAUSED);
321 assert(depjob->ndepends > 0);
322 depjob->ndepends--;
323
324 if (depjob->ndepends == 0 && depjob->state == THREADQUEUE_JOB_STATE_WAITING) {
325 // Move the job to ready jobs.
326 threadqueue_push_job(threadqueue, kvz_threadqueue_copy_ref(depjob));
327 num_new_jobs++;
328 }
329
330 // Clear this reference to the job.
331 PTHREAD_UNLOCK(&depjob->lock);
332 kvz_threadqueue_free_job(&job->rdepends[i]);
333 }
334 job->rdepends_count = 0;
335
336 PTHREAD_UNLOCK(&job->lock);
337 kvz_threadqueue_free_job(&job);
338
339 // The current thread will process one of the new jobs so we wake up
340 // one threads less than the the number of new jobs.
341 for (int i = 0; i < num_new_jobs - 1; i++) {
342 pthread_cond_signal(&threadqueue->job_available);
343 }
344 }
345
346 threadqueue->thread_running_count--;
347 PTHREAD_UNLOCK(&threadqueue->lock);
348 return NULL;
349 }
350
351
352 /**
353 * \brief Initialize the queue.
354 *
355 * \return 1 on success, 0 on failure
356 */
kvz_threadqueue_init(int thread_count)357 threadqueue_queue_t * kvz_threadqueue_init(int thread_count)
358 {
359 threadqueue_queue_t *threadqueue = MALLOC(threadqueue_queue_t, 1);
360 if (!threadqueue) {
361 goto failed;
362 }
363
364 if (pthread_mutex_init(&threadqueue->lock, NULL) != 0) {
365 fprintf(stderr, "pthread_mutex_init failed!\n");
366 goto failed;
367 }
368
369 if (pthread_cond_init(&threadqueue->job_available, NULL) != 0) {
370 fprintf(stderr, "pthread_cond_init failed!\n");
371 goto failed;
372 }
373
374 if (pthread_cond_init(&threadqueue->job_done, NULL) != 0) {
375 fprintf(stderr, "pthread_cond_init failed!\n");
376 goto failed;
377 }
378
379 threadqueue->threads = MALLOC(pthread_t, thread_count);
380 if (!threadqueue->threads) {
381 fprintf(stderr, "Could not malloc threadqueue->threads!\n");
382 goto failed;
383 }
384 threadqueue->thread_count = 0;
385 threadqueue->thread_running_count = 0;
386
387 threadqueue->stop = false;
388
389 threadqueue->first = NULL;
390 threadqueue->last = NULL;
391
392 // Lock the queue before creating threads, to ensure they all have correct information.
393 PTHREAD_LOCK(&threadqueue->lock);
394 for (int i = 0; i < thread_count; i++) {
395 if (pthread_create(&threadqueue->threads[i], NULL, threadqueue_worker, threadqueue) != 0) {
396 fprintf(stderr, "pthread_create failed!\n");
397 goto failed;
398 }
399 threadqueue->thread_count++;
400 threadqueue->thread_running_count++;
401 }
402 PTHREAD_UNLOCK(&threadqueue->lock);
403
404 return threadqueue;
405
406 failed:
407 kvz_threadqueue_free(threadqueue);
408 return NULL;
409 }
410
411
412 /**
413 * \brief Create a job and return a pointer to it.
414 *
415 * The job is created in a paused state. Function kvz_threadqueue_submit
416 * must be called on the job in order to have it run.
417 *
418 * \return pointer to the job, or NULL on failure
419 */
kvz_threadqueue_job_create(void (* fptr)(void * arg),void * arg)420 threadqueue_job_t * kvz_threadqueue_job_create(void (*fptr)(void *arg), void *arg)
421 {
422 threadqueue_job_t *job = MALLOC(threadqueue_job_t, 1);
423 if (!job) {
424 fprintf(stderr, "Could not alloc job!\n");
425 return NULL;
426 }
427
428 if (pthread_mutex_init(&job->lock, NULL) != 0) {
429 fprintf(stderr, "pthread_mutex_init(job) failed!\n");
430 return NULL;
431 }
432
433 job->state = THREADQUEUE_JOB_STATE_PAUSED;
434 job->ndepends = 0;
435 job->rdepends = NULL;
436 job->rdepends_count = 0;
437 job->rdepends_size = 0;
438 job->refcount = 1;
439 job->fptr = fptr;
440 job->arg = arg;
441
442 return job;
443 }
444
445
/**
 * \brief Submit a paused job to the thread queue.
 *
 * If the queue has no worker threads, the job is executed right here in
 * the calling thread. Otherwise the job is put on the ready list when it
 * has no unfinished dependencies, or marked as waiting when it has.
 *
 * \param threadqueue  queue that should run the job
 * \param job          job in the paused state
 *
 * \return 1 on success, 0 if locking or unlocking a mutex fails
 */
int kvz_threadqueue_submit(threadqueue_queue_t * const threadqueue, threadqueue_job_t *job)
{
  // Lock order: the thread queue first, then the job.
  PTHREAD_LOCK(&threadqueue->lock);
  PTHREAD_LOCK(&job->lock);
  assert(job->state == THREADQUEUE_JOB_STATE_PAUSED);

  if (threadqueue->thread_count != 0) {
    if (job->ndepends != 0) {
      // A worker moves the job to the ready list once its last dependency
      // has finished.
      job->state = THREADQUEUE_JOB_STATE_WAITING;
    } else {
      // The ready list gets a reference of its own.
      threadqueue_push_job(threadqueue, kvz_threadqueue_copy_ref(job));
      pthread_cond_signal(&threadqueue->job_available);
    }
  } else {
    // When not using threads, run the job immediately.
    job->fptr(job->arg);
    job->state = THREADQUEUE_JOB_STATE_DONE;
  }

  PTHREAD_UNLOCK(&job->lock);
  PTHREAD_UNLOCK(&threadqueue->lock);

  return 1;
}
467
468
469 /**
470 * \brief Add a dependency between two jobs.
471 *
472 * \param job job that should be executed after dependency
473 * \param dependency job that should be executed before job
474 *
475 * \return 1 on success, 0 on failure
476 *
477 */
kvz_threadqueue_job_dep_add(threadqueue_job_t * job,threadqueue_job_t * dependency)478 int kvz_threadqueue_job_dep_add(threadqueue_job_t *job, threadqueue_job_t *dependency)
479 {
480 // Lock the dependency first and then the job depending on it.
481 // This must be the same order as in threadqueue_worker.
482 PTHREAD_LOCK(&dependency->lock);
483
484 if (dependency->state == THREADQUEUE_JOB_STATE_DONE) {
485 // The dependency has been completed already so there is nothing to do.
486 PTHREAD_UNLOCK(&dependency->lock);
487 return 1;
488 }
489
490 PTHREAD_LOCK(&job->lock);
491 job->ndepends++;
492 PTHREAD_UNLOCK(&job->lock);
493
494 // Add the reverse dependency
495 if (dependency->rdepends_count >= dependency->rdepends_size) {
496 dependency->rdepends_size += THREADQUEUE_LIST_REALLOC_SIZE;
497 size_t bytes = dependency->rdepends_size * sizeof(threadqueue_job_t*);
498 dependency->rdepends = realloc(dependency->rdepends, bytes);
499 }
500 dependency->rdepends[dependency->rdepends_count++] = kvz_threadqueue_copy_ref(job);
501
502 PTHREAD_UNLOCK(&dependency->lock);
503
504 return 1;
505 }
506
507
508 /**
509 * \brief Get a new pointer to a job.
510 *
511 * Increment reference count and return the job.
512 */
kvz_threadqueue_copy_ref(threadqueue_job_t * job)513 threadqueue_job_t *kvz_threadqueue_copy_ref(threadqueue_job_t *job)
514 {
515 int32_t new_refcount = KVZ_ATOMIC_INC(&job->refcount);
516 // The caller should have had another reference and we added one
517 // reference so refcount should be at least 2.
518 assert(new_refcount >= 2);
519 return job;
520 }
521
522
523 /**
524 * \brief Free a job.
525 *
526 * Decrement reference count of the job. If no references exist any more,
527 * deallocate associated memory and destroy mutexes.
528 *
529 * Sets the job pointer to NULL.
530 */
kvz_threadqueue_free_job(threadqueue_job_t ** job_ptr)531 void kvz_threadqueue_free_job(threadqueue_job_t **job_ptr)
532 {
533 threadqueue_job_t *job = *job_ptr;
534 if (job == NULL) return;
535 *job_ptr = NULL;
536
537 int new_refcount = KVZ_ATOMIC_DEC(&job->refcount);
538 if (new_refcount > 0) {
539 // There are still references so we don't free the data yet.
540 return;
541 }
542
543 assert(new_refcount == 0);
544
545 for (int i = 0; i < job->rdepends_count; i++) {
546 kvz_threadqueue_free_job(&job->rdepends[i]);
547 }
548 job->rdepends_count = 0;
549
550 FREE_POINTER(job->rdepends);
551 pthread_mutex_destroy(&job->lock);
552 FREE_POINTER(job);
553 }
554
555
556 /**
557 * \brief Wait for a job to be completed.
558 *
559 * \return 1 on success, 0 on failure
560 */
kvz_threadqueue_waitfor(threadqueue_queue_t * threadqueue,threadqueue_job_t * job)561 int kvz_threadqueue_waitfor(threadqueue_queue_t * threadqueue, threadqueue_job_t * job)
562 {
563 PTHREAD_LOCK(&job->lock);
564 while (job->state != THREADQUEUE_JOB_STATE_DONE) {
565 PTHREAD_COND_WAIT(&threadqueue->job_done, &job->lock);
566 }
567 PTHREAD_UNLOCK(&job->lock);
568
569 return 1;
570 }
571
572
573 /**
574 * \brief Stop all threads after they finish the current jobs.
575 *
576 * Block until all threads have stopped.
577 *
578 * \return 1 on success, 0 on failure
579 */
kvz_threadqueue_stop(threadqueue_queue_t * const threadqueue)580 int kvz_threadqueue_stop(threadqueue_queue_t * const threadqueue)
581 {
582 PTHREAD_LOCK(&threadqueue->lock);
583
584 if (threadqueue->stop) {
585 // The threadqueue should have stopped already.
586 assert(threadqueue->thread_running_count == 0);
587 PTHREAD_UNLOCK(&threadqueue->lock);
588 return 1;
589 }
590
591 // Tell all threads to stop.
592 threadqueue->stop = true;
593 PTHREAD_COND_BROADCAST(&threadqueue->job_available);
594 PTHREAD_UNLOCK(&threadqueue->lock);
595
596 // Wait for them to stop.
597 for (int i = 0; i < threadqueue->thread_count; i++) {
598 if (pthread_join(threadqueue->threads[i], NULL) != 0) {
599 fprintf(stderr, "pthread_join failed!\n");
600 return 0;
601 }
602 }
603
604 return 1;
605 }
606
607
608 /**
609 * \brief Stop all threads and free allocated resources.
610 *
611 * \return 1 on success, 0 on failure
612 */
kvz_threadqueue_free(threadqueue_queue_t * threadqueue)613 void kvz_threadqueue_free(threadqueue_queue_t *threadqueue)
614 {
615 if (threadqueue == NULL) return;
616
617 kvz_threadqueue_stop(threadqueue);
618
619 // Free all jobs.
620 while (threadqueue->first) {
621 threadqueue_job_t *next = threadqueue->first->next;
622 kvz_threadqueue_free_job(&threadqueue->first);
623 threadqueue->first = next;
624 }
625 threadqueue->last = NULL;
626
627 FREE_POINTER(threadqueue->threads);
628 threadqueue->thread_count = 0;
629
630 if (pthread_mutex_destroy(&threadqueue->lock) != 0) {
631 fprintf(stderr, "pthread_mutex_destroy failed!\n");
632 }
633
634 if (pthread_cond_destroy(&threadqueue->job_available) != 0) {
635 fprintf(stderr, "pthread_cond_destroy failed!\n");
636 }
637
638 if (pthread_cond_destroy(&threadqueue->job_done) != 0) {
639 fprintf(stderr, "pthread_cond_destroy failed!\n");
640 }
641
642 FREE_POINTER(threadqueue);
643 }
644