/*****************************************************************************
 * This file is part of Kvazaar HEVC encoder.
 *
 * Copyright (c) 2021, Tampere University, ITU/ISO/IEC, project contributors
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 * * Redistributions of source code must retain the above copyright notice, this
 *   list of conditions and the following disclaimer.
 *
 * * Redistributions in binary form must reproduce the above copyright notice, this
 *   list of conditions and the following disclaimer in the documentation and/or
 *   other materials provided with the distribution.
 *
 * * Neither the name of the Tampere University or ITU/ISO/IEC nor the names of its
 *   contributors may be used to endorse or promote products derived from
 *   this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 ****************************************************************************/

#include "global.h"
#include "threadqueue.h"

#include <errno.h> // ETIMEDOUT
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "threads.h"


/**
 * \file
 *
 * Lock acquisition order:
 *
 * 1. When locking a job and its dependency, the dependency must be locked
 * first and then the job depending on it.
 *
 * 2. When locking a job and the thread queue, the thread queue must be
 * locked first and then the job.
 *
 * 3. When accessing threadqueue_job_t.next, the thread queue must be
 * locked.
 */
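
/*
 * Typical use of this API, as a minimal sketch (the job function and its
 * argument below are hypothetical and error handling is omitted; real
 * callers live elsewhere in the encoder):
 *
 *     static void do_work(void *arg) { (void)arg; }
 *
 *     threadqueue_queue_t *tq = kvz_threadqueue_init(4);
 *     threadqueue_job_t *first  = kvz_threadqueue_job_create(do_work, NULL);
 *     threadqueue_job_t *second = kvz_threadqueue_job_create(do_work, NULL);
 *
 *     // second may only run after first has completed.
 *     kvz_threadqueue_job_dep_add(second, first);
 *
 *     kvz_threadqueue_submit(tq, first);
 *     kvz_threadqueue_submit(tq, second);
 *     kvz_threadqueue_waitfor(tq, second);
 *
 *     kvz_threadqueue_free_job(&first);
 *     kvz_threadqueue_free_job(&second);
 *     kvz_threadqueue_free(tq);
 */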

#define THREADQUEUE_LIST_REALLOC_SIZE 32

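// Convenience wrappers for the pthread calls used below. On failure they
// print an error, assert in debug builds, and make the enclosing function
// bail out by returning 0 (or NULL for pointer-returning functions).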
#define PTHREAD_COND_SIGNAL(c) \
  if (pthread_cond_signal((c)) != 0) { \
    fprintf(stderr, "pthread_cond_signal(%s=%p) failed!\n", #c, c); \
    assert(0); \
    return 0; \
  }

#define PTHREAD_COND_BROADCAST(c) \
  if (pthread_cond_broadcast((c)) != 0) { \
    fprintf(stderr, "pthread_cond_broadcast(%s=%p) failed!\n", #c, c); \
    assert(0); \
    return 0; \
  }

#define PTHREAD_COND_WAIT(c,l) \
  if (pthread_cond_wait((c),(l)) != 0) { \
    fprintf(stderr, "pthread_cond_wait(%s=%p, %s=%p) failed!\n", #c, c, #l, l); \
    assert(0); \
    return 0; \
  }

#define PTHREAD_LOCK(l) \
  if (pthread_mutex_lock((l)) != 0) { \
    fprintf(stderr, "pthread_mutex_lock(%s) failed!\n", #l); \
    assert(0); \
    return 0; \
  }

#define PTHREAD_UNLOCK(l) \
  if (pthread_mutex_unlock((l)) != 0) { \
    fprintf(stderr, "pthread_mutex_unlock(%s) failed!\n", #l); \
    assert(0); \
    return 0; \
  }


typedef enum {
  /**
   * \brief Job has been submitted, but is not allowed to run yet.
   */
  THREADQUEUE_JOB_STATE_PAUSED,

  /**
   * \brief Job is waiting for dependencies.
   */
  THREADQUEUE_JOB_STATE_WAITING,

  /**
   * \brief Job is ready to run.
   */
  THREADQUEUE_JOB_STATE_READY,

  /**
   * \brief Job is running.
   */
  THREADQUEUE_JOB_STATE_RUNNING,

  /**
   * \brief Job is completed.
   */
  THREADQUEUE_JOB_STATE_DONE,

} threadqueue_job_state;
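
// A job moves through these states as follows: kvz_threadqueue_submit turns
// PAUSED into WAITING (unfinished dependencies remain) or READY (none left),
// a worker decrementing the last dependency turns WAITING into READY, and the
// worker that picks the job up takes it through RUNNING to DONE. With a
// zero-thread queue, submit runs the job inline and it goes straight to DONE.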


struct threadqueue_job_t {
  pthread_mutex_t lock;

  threadqueue_job_state state;

  /**
   * \brief Number of dependencies that have not been completed yet.
   */
  int ndepends;

  /**
   * \brief Reverse dependencies.
   *
   * Array of pointers to jobs that depend on this one. They must still exist
   * when this job finishes, because they cannot have run before it.
   */
  struct threadqueue_job_t **rdepends;

  /**
   * \brief Number of elements in rdepends.
   */
  int rdepends_count;

  /**
   * \brief Allocated size of rdepends.
   */
  int rdepends_size;

  /**
   * \brief Reference count
   */
  int refcount;

  /**
   * \brief Pointer to the function to execute.
   */
  void (*fptr)(void *arg);

  /**
   * \brief Argument for fptr.
   */
  void *arg;

  /**
   * \brief Pointer to the next job in the queue.
   */
  struct threadqueue_job_t *next;

};
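
// Reference counting: every pointer stored in the ready-job list and in an
// rdepends array holds its own reference, taken with kvz_threadqueue_copy_ref
// and released with kvz_threadqueue_free_job.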


struct threadqueue_queue_t {
  pthread_mutex_t lock;

  /**
   * \brief Job available condition variable
   *
   * Signalled when there is a new job to do.
   */
  pthread_cond_t job_available;

  /**
   * \brief Job done condition variable
   *
   * Signalled when a job has been completed.
   */
  pthread_cond_t job_done;

  /**
   * \brief Array containing spawned threads
   */
  pthread_t *threads;

  /**
   * \brief Number of threads spawned
   */
  int thread_count;

  /**
   * \brief Number of threads running
   */
  int thread_running_count;

  /**
   * \brief If true, threads should stop ASAP.
   */
  bool stop;

  /**
   * \brief Pointer to the first ready job
   */
  threadqueue_job_t *first;

  /**
   * \brief Pointer to the last ready job
   */
  threadqueue_job_t *last;
};


/**
 * \brief Add a job to the queue of jobs ready to run.
 *
 * The caller must have locked the thread queue and the job. This function
 * takes ownership of the job.
 */
static void threadqueue_push_job(threadqueue_queue_t * threadqueue,
                                 threadqueue_job_t *job)
{
  assert(job->ndepends == 0);
  job->state = THREADQUEUE_JOB_STATE_READY;

  if (threadqueue->first == NULL) {
    threadqueue->first = job;
  } else {
    threadqueue->last->next = job;
  }

  threadqueue->last = job;
  job->next = NULL;
}


/**
 * \brief Retrieve a job from the queue of jobs ready to run.
 *
 * The caller must have locked the thread queue. The calling function
 * receives ownership of the job.
 */
static threadqueue_job_t * threadqueue_pop_job(threadqueue_queue_t * threadqueue)
{
  assert(threadqueue->first != NULL);

  threadqueue_job_t *job = threadqueue->first;
  threadqueue->first = job->next;
  job->next = NULL;

  if (threadqueue->first == NULL) {
    threadqueue->last = NULL;
  }

  return job;
}


/**
 * \brief Function executed by worker threads.
 */
static void* threadqueue_worker(void* threadqueue_opaque)
{
  threadqueue_queue_t * const threadqueue = (threadqueue_queue_t *) threadqueue_opaque;

  PTHREAD_LOCK(&threadqueue->lock);

  for (;;) {
    while (!threadqueue->stop && threadqueue->first == NULL) {
      // Wait until there is something to do in the queue.
      PTHREAD_COND_WAIT(&threadqueue->job_available, &threadqueue->lock);
    }

    if (threadqueue->stop) {
      break;
    }

    // Get a job and remove it from the queue.
    threadqueue_job_t *job = threadqueue_pop_job(threadqueue);

    PTHREAD_LOCK(&job->lock);
    assert(job->state == THREADQUEUE_JOB_STATE_READY);
    job->state = THREADQUEUE_JOB_STATE_RUNNING;
    PTHREAD_UNLOCK(&job->lock);
    PTHREAD_UNLOCK(&threadqueue->lock);

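    // Run the job without holding the queue lock or the job lock, so other
    // workers can keep picking up jobs while this one executes.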
    job->fptr(job->arg);

    PTHREAD_LOCK(&threadqueue->lock);
    PTHREAD_LOCK(&job->lock);
    assert(job->state == THREADQUEUE_JOB_STATE_RUNNING);
    job->state = THREADQUEUE_JOB_STATE_DONE;

    PTHREAD_COND_SIGNAL(&threadqueue->job_done);

    // Go through all the jobs that depend on this one, decreasing their
    // ndepends. Count how many jobs can now start executing so we know how
    // many threads to wake up.
    int num_new_jobs = 0;
    for (int i = 0; i < job->rdepends_count; ++i) {
      threadqueue_job_t * const depjob = job->rdepends[i];
      // The dependency (job) is locked before the job depending on it.
      // This must be the same order as in kvz_threadqueue_job_dep_add.
      PTHREAD_LOCK(&depjob->lock);

      assert(depjob->state == THREADQUEUE_JOB_STATE_WAITING ||
             depjob->state == THREADQUEUE_JOB_STATE_PAUSED);
      assert(depjob->ndepends > 0);
      depjob->ndepends--;

      if (depjob->ndepends == 0 && depjob->state == THREADQUEUE_JOB_STATE_WAITING) {
        // Move the job to ready jobs.
        threadqueue_push_job(threadqueue, kvz_threadqueue_copy_ref(depjob));
        num_new_jobs++;
      }

      // Clear this reference to the job.
      PTHREAD_UNLOCK(&depjob->lock);
      kvz_threadqueue_free_job(&job->rdepends[i]);
    }
    job->rdepends_count = 0;

    PTHREAD_UNLOCK(&job->lock);
    kvz_threadqueue_free_job(&job);

    // The current thread will process one of the new jobs, so we wake up
    // one thread fewer than the number of new jobs.
    for (int i = 0; i < num_new_jobs - 1; i++) {
      pthread_cond_signal(&threadqueue->job_available);
    }
  }

  threadqueue->thread_running_count--;
  PTHREAD_UNLOCK(&threadqueue->lock);
  return NULL;
}


/**
 * \brief Initialize the queue.
 *
 * \return pointer to the queue, or NULL on failure
 */
threadqueue_queue_t * kvz_threadqueue_init(int thread_count)
{
  threadqueue_queue_t *threadqueue = MALLOC(threadqueue_queue_t, 1);
  if (!threadqueue) {
    goto failed;
  }

  if (pthread_mutex_init(&threadqueue->lock, NULL) != 0) {
    fprintf(stderr, "pthread_mutex_init failed!\n");
    goto failed;
  }

  if (pthread_cond_init(&threadqueue->job_available, NULL) != 0) {
    fprintf(stderr, "pthread_cond_init failed!\n");
    goto failed;
  }

  if (pthread_cond_init(&threadqueue->job_done, NULL) != 0) {
    fprintf(stderr, "pthread_cond_init failed!\n");
    goto failed;
  }

  threadqueue->threads = MALLOC(pthread_t, thread_count);
  if (!threadqueue->threads) {
    fprintf(stderr, "Could not malloc threadqueue->threads!\n");
    goto failed;
  }
  threadqueue->thread_count = 0;
  threadqueue->thread_running_count = 0;

  threadqueue->stop = false;

  threadqueue->first              = NULL;
  threadqueue->last               = NULL;

  // Lock the queue before creating threads, to ensure they all have correct information.
  PTHREAD_LOCK(&threadqueue->lock);
  for (int i = 0; i < thread_count; i++) {
    if (pthread_create(&threadqueue->threads[i], NULL, threadqueue_worker, threadqueue) != 0) {
        fprintf(stderr, "pthread_create failed!\n");
        // Unlock before cleanup to avoid deadlocking on the queue lock in
        // kvz_threadqueue_stop, which is called by kvz_threadqueue_free.
        PTHREAD_UNLOCK(&threadqueue->lock);
        goto failed;
    }
    threadqueue->thread_count++;
    threadqueue->thread_running_count++;
  }
  PTHREAD_UNLOCK(&threadqueue->lock);

  return threadqueue;

failed:
  kvz_threadqueue_free(threadqueue);
  return NULL;
}


/**
 * \brief Create a job and return a pointer to it.
 *
 * The job is created in a paused state. Function kvz_threadqueue_submit
 * must be called on the job in order to have it run.
 *
 * \return pointer to the job, or NULL on failure
 */
threadqueue_job_t * kvz_threadqueue_job_create(void (*fptr)(void *arg), void *arg)
{
  threadqueue_job_t *job = MALLOC(threadqueue_job_t, 1);
  if (!job) {
    fprintf(stderr, "Could not alloc job!\n");
    return NULL;
  }

  if (pthread_mutex_init(&job->lock, NULL) != 0) {
    fprintf(stderr, "pthread_mutex_init(job) failed!\n");
    // Free the partially constructed job so it is not leaked.
    FREE_POINTER(job);
    return NULL;
  }

  job->state          = THREADQUEUE_JOB_STATE_PAUSED;
  job->ndepends       = 0;
  job->rdepends       = NULL;
  job->rdepends_count = 0;
  job->rdepends_size  = 0;
  job->refcount       = 1;
  job->fptr           = fptr;
  job->arg            = arg;

  return job;
}

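
/**
 * \brief Submit a job for execution.
 *
 * Depending on the number of unfinished dependencies, the job either starts
 * waiting or is added to the queue of ready jobs. With a zero-thread queue
 * the job is run immediately in the calling thread.
 *
 * \return 1 on success, 0 on failure
 */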
int kvz_threadqueue_submit(threadqueue_queue_t * const threadqueue, threadqueue_job_t *job)
{
  PTHREAD_LOCK(&threadqueue->lock);
  PTHREAD_LOCK(&job->lock);
  assert(job->state == THREADQUEUE_JOB_STATE_PAUSED);

  if (threadqueue->thread_count == 0) {
    // When not using threads, run the job immediately.
    job->fptr(job->arg);
    job->state = THREADQUEUE_JOB_STATE_DONE;
  } else if (job->ndepends == 0) {
    threadqueue_push_job(threadqueue, kvz_threadqueue_copy_ref(job));
    pthread_cond_signal(&threadqueue->job_available);
  } else {
    job->state = THREADQUEUE_JOB_STATE_WAITING;
  }
  PTHREAD_UNLOCK(&job->lock);
  PTHREAD_UNLOCK(&threadqueue->lock);

  return 1;
}


/**
 * \brief Add a dependency between two jobs.
 *
 * \param job           job that should be executed after dependency
 * \param dependency    job that should be executed before job
 *
 * \return 1 on success, 0 on failure
 *
 */
int kvz_threadqueue_job_dep_add(threadqueue_job_t *job, threadqueue_job_t *dependency)
{
  // Lock the dependency first and then the job depending on it.
  // This must be the same order as in threadqueue_worker.
  PTHREAD_LOCK(&dependency->lock);

  if (dependency->state == THREADQUEUE_JOB_STATE_DONE) {
    // The dependency has been completed already so there is nothing to do.
    PTHREAD_UNLOCK(&dependency->lock);
    return 1;
  }

  // Make room for the reverse dependency before incrementing ndepends so
  // that a failed allocation does not leave the job waiting forever.
  if (dependency->rdepends_count >= dependency->rdepends_size) {
    int new_size = dependency->rdepends_size + THREADQUEUE_LIST_REALLOC_SIZE;
    size_t bytes = new_size * sizeof(threadqueue_job_t*);
    threadqueue_job_t **new_rdepends = realloc(dependency->rdepends, bytes);
    if (!new_rdepends) {
      fprintf(stderr, "Could not realloc rdepends!\n");
      PTHREAD_UNLOCK(&dependency->lock);
      return 0;
    }
    dependency->rdepends = new_rdepends;
    dependency->rdepends_size = new_size;
  }

  PTHREAD_LOCK(&job->lock);
  job->ndepends++;
  PTHREAD_UNLOCK(&job->lock);

  // Add the reverse dependency.
  dependency->rdepends[dependency->rdepends_count++] = kvz_threadqueue_copy_ref(job);

  PTHREAD_UNLOCK(&dependency->lock);

  return 1;
}


/**
 * \brief Get a new pointer to a job.
 *
 * Increment reference count and return the job.
 */
threadqueue_job_t *kvz_threadqueue_copy_ref(threadqueue_job_t *job)
{
  int32_t new_refcount = KVZ_ATOMIC_INC(&job->refcount);
  // The caller should have had another reference and we added one
  // reference so refcount should be at least 2.
  assert(new_refcount >= 2);
  return job;
}


/**
 * \brief Free a job.
 *
 * Decrement reference count of the job. If no references exist any more,
 * deallocate associated memory and destroy mutexes.
 *
 * Sets the job pointer to NULL.
 */
void kvz_threadqueue_free_job(threadqueue_job_t **job_ptr)
{
  threadqueue_job_t *job = *job_ptr;
  if (job == NULL) return;
  *job_ptr = NULL;

  int new_refcount = KVZ_ATOMIC_DEC(&job->refcount);
  if (new_refcount > 0) {
    // There are still references so we don't free the data yet.
    return;
  }

  assert(new_refcount == 0);

  for (int i = 0; i < job->rdepends_count; i++) {
    kvz_threadqueue_free_job(&job->rdepends[i]);
  }
  job->rdepends_count = 0;

  FREE_POINTER(job->rdepends);
  pthread_mutex_destroy(&job->lock);
  FREE_POINTER(job);
}


/**
 * \brief Wait for a job to be completed.
 *
 * \return 1 on success, 0 on failure
 */
int kvz_threadqueue_waitfor(threadqueue_queue_t * threadqueue, threadqueue_job_t * job)
{
  PTHREAD_LOCK(&job->lock);
  while (job->state != THREADQUEUE_JOB_STATE_DONE) {
    PTHREAD_COND_WAIT(&threadqueue->job_done, &job->lock);
  }
  PTHREAD_UNLOCK(&job->lock);

  return 1;
}


/**
 * \brief Stop all threads after they finish the current jobs.
 *
 * Block until all threads have stopped.
 *
 * \return 1 on success, 0 on failure
 */
int kvz_threadqueue_stop(threadqueue_queue_t * const threadqueue)
{
  PTHREAD_LOCK(&threadqueue->lock);

  if (threadqueue->stop) {
    // The threadqueue should have stopped already.
    assert(threadqueue->thread_running_count == 0);
    PTHREAD_UNLOCK(&threadqueue->lock);
    return 1;
  }

  // Tell all threads to stop.
  threadqueue->stop = true;
  PTHREAD_COND_BROADCAST(&threadqueue->job_available);
  PTHREAD_UNLOCK(&threadqueue->lock);

  // Wait for them to stop.
  for (int i = 0; i < threadqueue->thread_count; i++) {
    if (pthread_join(threadqueue->threads[i], NULL) != 0) {
      fprintf(stderr, "pthread_join failed!\n");
      return 0;
    }
  }

  return 1;
}


/**
 * \brief Stop all threads and free allocated resources.
 */
void kvz_threadqueue_free(threadqueue_queue_t *threadqueue)
{
  if (threadqueue == NULL) return;

  kvz_threadqueue_stop(threadqueue);

  // Free all jobs.
  while (threadqueue->first) {
    threadqueue_job_t *next = threadqueue->first->next;
    kvz_threadqueue_free_job(&threadqueue->first);
    threadqueue->first = next;
  }
  threadqueue->last = NULL;

  FREE_POINTER(threadqueue->threads);
  threadqueue->thread_count = 0;

  if (pthread_mutex_destroy(&threadqueue->lock) != 0) {
    fprintf(stderr, "pthread_mutex_destroy failed!\n");
  }

  if (pthread_cond_destroy(&threadqueue->job_available) != 0) {
    fprintf(stderr, "pthread_cond_destroy failed!\n");
  }

  if (pthread_cond_destroy(&threadqueue->job_done) != 0) {
    fprintf(stderr, "pthread_cond_destroy failed!\n");
  }

  FREE_POINTER(threadqueue);
}