1 /*
2  * This file is part of FFmpeg.
3  *
4  * FFmpeg is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * FFmpeg is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with FFmpeg; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17  */
18 
19 #include <stdatomic.h>
20 #include "slicethread.h"
21 #include "mem.h"
22 #include "thread.h"
23 #include "avassert.h"
24 
25 #if HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS
26 
27 typedef struct WorkerContext {
28     AVSliceThread   *ctx;
29     pthread_mutex_t mutex;
30     pthread_cond_t  cond;
31     pthread_t       thread;
32     int             done;
33 } WorkerContext;
34 
35 struct AVSliceThread {
36     WorkerContext   *workers;
37     int             nb_threads;
38     int             nb_active_threads;
39     int             nb_jobs;
40 
41     atomic_uint     first_job;
42     atomic_uint     current_job;
43     pthread_mutex_t done_mutex;
44     pthread_cond_t  done_cond;
45     int             done;
46     int             finished;
47 
48     void            *priv;
49     void            (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads);
50     void            (*main_func)(void *priv);
51 };
52 
run_jobs(AVSliceThread * ctx)53 static int run_jobs(AVSliceThread *ctx)
54 {
55     unsigned nb_jobs    = ctx->nb_jobs;
56     unsigned nb_active_threads = ctx->nb_active_threads;
57     unsigned first_job    = atomic_fetch_add_explicit(&ctx->first_job, 1, memory_order_acq_rel);
58     unsigned current_job  = first_job;
59 
60     do {
61         ctx->worker_func(ctx->priv, current_job, first_job, nb_jobs, nb_active_threads);
62     } while ((current_job = atomic_fetch_add_explicit(&ctx->current_job, 1, memory_order_acq_rel)) < nb_jobs);
63 
64     return current_job == nb_jobs + nb_active_threads - 1;
65 }
66 
thread_worker(void * v)67 static void *attribute_align_arg thread_worker(void *v)
68 {
69     WorkerContext *w = v;
70     AVSliceThread *ctx = w->ctx;
71 
72     pthread_mutex_lock(&w->mutex);
73     pthread_cond_signal(&w->cond);
74 
75     while (1) {
76         w->done = 1;
77         while (w->done)
78             pthread_cond_wait(&w->cond, &w->mutex);
79 
80         if (ctx->finished) {
81             pthread_mutex_unlock(&w->mutex);
82             return NULL;
83         }
84 
85         if (run_jobs(ctx)) {
86             pthread_mutex_lock(&ctx->done_mutex);
87             ctx->done = 1;
88             pthread_cond_signal(&ctx->done_cond);
89             pthread_mutex_unlock(&ctx->done_mutex);
90         }
91     }
92 }
93 
avpriv_slicethread_create(AVSliceThread ** pctx,void * priv,void (* worker_func)(void * priv,int jobnr,int threadnr,int nb_jobs,int nb_threads),void (* main_func)(void * priv),int nb_threads)94 int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
95                               void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
96                               void (*main_func)(void *priv),
97                               int nb_threads)
98 {
99     AVSliceThread *ctx;
100     int nb_workers, i;
101 
102     av_assert0(nb_threads >= 0);
103     if (!nb_threads) {
104         int nb_cpus = av_cpu_count();
105         if (nb_cpus > 1)
106             nb_threads = nb_cpus + 1;
107         else
108             nb_threads = 1;
109     }
110 
111     nb_workers = nb_threads;
112     if (!main_func)
113         nb_workers--;
114 
115     *pctx = ctx = av_mallocz(sizeof(*ctx));
116     if (!ctx)
117         return AVERROR(ENOMEM);
118 
119     if (nb_workers && !(ctx->workers = av_calloc(nb_workers, sizeof(*ctx->workers)))) {
120         av_freep(pctx);
121         return AVERROR(ENOMEM);
122     }
123 
124     ctx->priv        = priv;
125     ctx->worker_func = worker_func;
126     ctx->main_func   = main_func;
127     ctx->nb_threads  = nb_threads;
128     ctx->nb_active_threads = 0;
129     ctx->nb_jobs     = 0;
130     ctx->finished    = 0;
131 
132     atomic_init(&ctx->first_job, 0);
133     atomic_init(&ctx->current_job, 0);
134     pthread_mutex_init(&ctx->done_mutex, NULL);
135     pthread_cond_init(&ctx->done_cond, NULL);
136     ctx->done        = 0;
137 
138     for (i = 0; i < nb_workers; i++) {
139         WorkerContext *w = &ctx->workers[i];
140         int ret;
141         w->ctx = ctx;
142         pthread_mutex_init(&w->mutex, NULL);
143         pthread_cond_init(&w->cond, NULL);
144         pthread_mutex_lock(&w->mutex);
145         w->done = 0;
146 
147         if (ret = pthread_create(&w->thread, NULL, thread_worker, w)) {
148             ctx->nb_threads = main_func ? i : i + 1;
149             pthread_mutex_unlock(&w->mutex);
150             pthread_cond_destroy(&w->cond);
151             pthread_mutex_destroy(&w->mutex);
152             avpriv_slicethread_free(pctx);
153             return AVERROR(ret);
154         }
155 
156         while (!w->done)
157             pthread_cond_wait(&w->cond, &w->mutex);
158         pthread_mutex_unlock(&w->mutex);
159     }
160 
161     return nb_threads;
162 }
163 
avpriv_slicethread_execute(AVSliceThread * ctx,int nb_jobs,int execute_main)164 void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
165 {
166     int nb_workers, i, is_last = 0;
167 
168     av_assert0(nb_jobs > 0);
169     ctx->nb_jobs           = nb_jobs;
170     ctx->nb_active_threads = FFMIN(nb_jobs, ctx->nb_threads);
171     atomic_store_explicit(&ctx->first_job, 0, memory_order_relaxed);
172     atomic_store_explicit(&ctx->current_job, ctx->nb_active_threads, memory_order_relaxed);
173     nb_workers             = ctx->nb_active_threads;
174     if (!ctx->main_func || !execute_main)
175         nb_workers--;
176 
177     for (i = 0; i < nb_workers; i++) {
178         WorkerContext *w = &ctx->workers[i];
179         pthread_mutex_lock(&w->mutex);
180         w->done = 0;
181         pthread_cond_signal(&w->cond);
182         pthread_mutex_unlock(&w->mutex);
183     }
184 
185     if (ctx->main_func && execute_main)
186         ctx->main_func(ctx->priv);
187     else
188         is_last = run_jobs(ctx);
189 
190     if (!is_last) {
191         pthread_mutex_lock(&ctx->done_mutex);
192         while (!ctx->done)
193             pthread_cond_wait(&ctx->done_cond, &ctx->done_mutex);
194         ctx->done = 0;
195         pthread_mutex_unlock(&ctx->done_mutex);
196     }
197 }
198 
avpriv_slicethread_free(AVSliceThread ** pctx)199 void avpriv_slicethread_free(AVSliceThread **pctx)
200 {
201     AVSliceThread *ctx;
202     int nb_workers, i;
203 
204     if (!pctx || !*pctx)
205         return;
206 
207     ctx = *pctx;
208     nb_workers = ctx->nb_threads;
209     if (!ctx->main_func)
210         nb_workers--;
211 
212     ctx->finished = 1;
213     for (i = 0; i < nb_workers; i++) {
214         WorkerContext *w = &ctx->workers[i];
215         pthread_mutex_lock(&w->mutex);
216         w->done = 0;
217         pthread_cond_signal(&w->cond);
218         pthread_mutex_unlock(&w->mutex);
219     }
220 
221     for (i = 0; i < nb_workers; i++) {
222         WorkerContext *w = &ctx->workers[i];
223         pthread_join(w->thread, NULL);
224         pthread_cond_destroy(&w->cond);
225         pthread_mutex_destroy(&w->mutex);
226     }
227 
228     pthread_cond_destroy(&ctx->done_cond);
229     pthread_mutex_destroy(&ctx->done_mutex);
230     av_freep(&ctx->workers);
231     av_freep(pctx);
232 }
233 
234 #else /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */
235 
avpriv_slicethread_create(AVSliceThread ** pctx,void * priv,void (* worker_func)(void * priv,int jobnr,int threadnr,int nb_jobs,int nb_threads),void (* main_func)(void * priv),int nb_threads)236 int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
237                               void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
238                               void (*main_func)(void *priv),
239                               int nb_threads)
240 {
241     *pctx = NULL;
242     return AVERROR(EINVAL);
243 }
244 
avpriv_slicethread_execute(AVSliceThread * ctx,int nb_jobs,int execute_main)245 void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
246 {
247     av_assert0(0);
248 }
249 
avpriv_slicethread_free(AVSliceThread ** pctx)250 void avpriv_slicethread_free(AVSliceThread **pctx)
251 {
252     av_assert0(!pctx || !*pctx);
253 }
254 
255 #endif /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */
256