1 /*
2 * This file is part of FFmpeg.
3 *
4 * FFmpeg is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
8 *
9 * FFmpeg is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
13 *
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with FFmpeg; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18
19 #include <stdatomic.h>
20 #include "slicethread.h"
21 #include "mem.h"
22 #include "thread.h"
23 #include "avassert.h"
24
25 #if HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS
26
27 typedef struct WorkerContext {
28 AVSliceThread *ctx;
29 pthread_mutex_t mutex;
30 pthread_cond_t cond;
31 pthread_t thread;
32 int done;
33 } WorkerContext;
34
35 struct AVSliceThread {
36 WorkerContext *workers;
37 int nb_threads;
38 int nb_active_threads;
39 int nb_jobs;
40
41 atomic_uint first_job;
42 atomic_uint current_job;
43 pthread_mutex_t done_mutex;
44 pthread_cond_t done_cond;
45 int done;
46 int finished;
47
48 void *priv;
49 void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads);
50 void (*main_func)(void *priv);
51 };
52
/**
 * Run jobs on the calling thread until the shared job counter runs out.
 *
 * The thread first claims a slot from first_job; that slot is both the index
 * of its first job and the thread number passed to worker_func. Further jobs
 * are claimed one at a time from current_job.
 *
 * @param ctx slice-thread context whose nb_jobs, nb_active_threads and
 *            counters have been set up for the current run
 * @return nonzero if the calling thread was the last one to run out of jobs,
 *         zero otherwise
 */
static int run_jobs(AVSliceThread *ctx)
{
    const unsigned total_jobs     = ctx->nb_jobs;
    const unsigned active_threads = ctx->nb_active_threads;
    const unsigned slot = atomic_fetch_add_explicit(&ctx->first_job, 1, memory_order_acq_rel);
    unsigned job = slot;

    for (;;) {
        ctx->worker_func(ctx->priv, job, slot, total_jobs, active_threads);

        job = atomic_fetch_add_explicit(&ctx->current_job, 1, memory_order_acq_rel);
        if (job >= total_jobs)
            break;
    }

    /*
     * avpriv_slicethread_execute() seeds current_job with nb_active_threads,
     * and every participating thread overshoots nb_jobs exactly once, so the
     * final overshoot observes nb_jobs + nb_active_threads - 1.
     */
    return job == total_jobs + active_threads - 1;
}
66
thread_worker(void * v)67 static void *attribute_align_arg thread_worker(void *v)
68 {
69 WorkerContext *w = v;
70 AVSliceThread *ctx = w->ctx;
71
72 pthread_mutex_lock(&w->mutex);
73 pthread_cond_signal(&w->cond);
74
75 while (1) {
76 w->done = 1;
77 while (w->done)
78 pthread_cond_wait(&w->cond, &w->mutex);
79
80 if (ctx->finished) {
81 pthread_mutex_unlock(&w->mutex);
82 return NULL;
83 }
84
85 if (run_jobs(ctx)) {
86 pthread_mutex_lock(&ctx->done_mutex);
87 ctx->done = 1;
88 pthread_cond_signal(&ctx->done_cond);
89 pthread_mutex_unlock(&ctx->done_mutex);
90 }
91 }
92 }
93
/**
 * Allocate a slice-threading context and start its worker threads.
 *
 * @param pctx        receives the new context; set to NULL on failure
 * @param priv        opaque pointer handed to worker_func and main_func
 * @param worker_func callback invoked once per job
 * @param main_func   optional callback; when NULL the calling thread counts
 *                    as one of the nb_threads and one fewer worker is spawned
 * @param nb_threads  requested thread count, must be >= 0; 0 selects
 *                    av_cpu_count() + 1 when more than one CPU is reported,
 *                    otherwise 1
 * @return the thread count on success, a negative AVERROR code if an
 *         allocation, a mutex/condition-variable initialisation or a thread
 *         creation fails
 */
int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
                              void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
                              void (*main_func)(void *priv),
                              int nb_threads)
{
    AVSliceThread *ctx;
    int nb_workers, i, ret;

    av_assert0(nb_threads >= 0);
    if (!nb_threads) {
        int nb_cpus = av_cpu_count();
        if (nb_cpus > 1)
            nb_threads = nb_cpus + 1;
        else
            nb_threads = 1;
    }

    nb_workers = nb_threads;
    if (!main_func)
        nb_workers--;

    *pctx = ctx = av_mallocz(sizeof(*ctx));
    if (!ctx)
        return AVERROR(ENOMEM);

    if (nb_workers && !(ctx->workers = av_calloc(nb_workers, sizeof(*ctx->workers)))) {
        av_freep(pctx);
        return AVERROR(ENOMEM);
    }

    ctx->priv              = priv;
    ctx->worker_func       = worker_func;
    ctx->main_func         = main_func;
    ctx->nb_threads        = nb_threads;
    ctx->nb_active_threads = 0;
    ctx->nb_jobs           = 0;
    ctx->finished          = 0;
    ctx->done              = 0;

    atomic_init(&ctx->first_job, 0);
    atomic_init(&ctx->current_job, 0);

    /*
     * The context-level primitives are unwound by hand: calling
     * avpriv_slicethread_free() here would destroy objects that were never
     * initialised.
     */
    ret = pthread_mutex_init(&ctx->done_mutex, NULL);
    if (ret) {
        av_freep(&ctx->workers);
        av_freep(pctx);
        return AVERROR(ret);
    }
    ret = pthread_cond_init(&ctx->done_cond, NULL);
    if (ret) {
        pthread_mutex_destroy(&ctx->done_mutex);
        av_freep(&ctx->workers);
        av_freep(pctx);
        return AVERROR(ret);
    }

    for (i = 0; i < nb_workers; i++) {
        WorkerContext *w = &ctx->workers[i];

        w->ctx = ctx;
        ret = pthread_mutex_init(&w->mutex, NULL);
        if (ret)
            goto fail;
        ret = pthread_cond_init(&w->cond, NULL);
        if (ret) {
            pthread_mutex_destroy(&w->mutex);
            goto fail;
        }

        /* Hold the mutex across creation so the worker's ready signal
         * cannot be missed. */
        pthread_mutex_lock(&w->mutex);
        w->done = 0;

        ret = pthread_create(&w->thread, NULL, thread_worker, w);
        if (ret) {
            pthread_mutex_unlock(&w->mutex);
            pthread_cond_destroy(&w->cond);
            pthread_mutex_destroy(&w->mutex);
            goto fail;
        }

        /* Wait until the worker has marked itself idle. */
        while (!w->done)
            pthread_cond_wait(&w->cond, &w->mutex);
        pthread_mutex_unlock(&w->mutex);
    }

    return nb_threads;

fail:
    /* Only workers [0, i) are fully set up; shrink nb_threads so that
     * avpriv_slicethread_free() stops and destroys exactly those. */
    ctx->nb_threads = main_func ? i : i + 1;
    avpriv_slicethread_free(pctx);
    return AVERROR(ret);
}
163
/**
 * Run nb_jobs jobs across the thread pool and block until all are finished.
 *
 * @param ctx          slice-thread context
 * @param nb_jobs      number of jobs to run, must be > 0
 * @param execute_main when nonzero and a main_func was registered, the
 *                     calling thread runs main_func instead of taking jobs
 */
void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
{
    int use_main, wake_count, idx;
    int finished_last = 0;

    av_assert0(nb_jobs > 0);
    ctx->nb_jobs           = nb_jobs;
    ctx->nb_active_threads = FFMIN(nb_jobs, ctx->nb_threads);

    /* Initial jobs 0..nb_active_threads-1 are handed out via first_job,
     * the remaining ones via current_job. */
    atomic_store_explicit(&ctx->first_job, 0, memory_order_relaxed);
    atomic_store_explicit(&ctx->current_job, ctx->nb_active_threads, memory_order_relaxed);

    /* When the caller takes jobs itself it occupies one active slot, so one
     * fewer worker has to be woken. */
    use_main   = ctx->main_func && execute_main;
    wake_count = use_main ? ctx->nb_active_threads : ctx->nb_active_threads - 1;

    for (idx = 0; idx < wake_count; idx++) {
        WorkerContext *worker = &ctx->workers[idx];

        pthread_mutex_lock(&worker->mutex);
        worker->done = 0;
        pthread_cond_signal(&worker->cond);
        pthread_mutex_unlock(&worker->mutex);
    }

    if (use_main)
        ctx->main_func(ctx->priv);
    else
        finished_last = run_jobs(ctx);

    /* If the caller ran out of jobs last, every job is already complete. */
    if (finished_last)
        return;

    pthread_mutex_lock(&ctx->done_mutex);
    while (!ctx->done)
        pthread_cond_wait(&ctx->done_cond, &ctx->done_mutex);
    ctx->done = 0;
    pthread_mutex_unlock(&ctx->done_mutex);
}
198
/**
 * Stop all worker threads and release the context.
 *
 * Does nothing when pctx or *pctx is NULL.
 *
 * @param pctx pointer to the context pointer; the context is released with
 *             av_freep()
 */
void avpriv_slicethread_free(AVSliceThread **pctx)
{
    AVSliceThread *st;
    int worker_count, k;

    if (!pctx || !*pctx)
        return;

    st = *pctx;
    /* Without a main_func the caller is one of the nb_threads. */
    worker_count = st->main_func ? st->nb_threads : st->nb_threads - 1;

    /* Raise the exit flag first, then wake every idle worker so it sees it. */
    st->finished = 1;
    for (k = 0; k < worker_count; k++) {
        WorkerContext *worker = &st->workers[k];

        pthread_mutex_lock(&worker->mutex);
        worker->done = 0;
        pthread_cond_signal(&worker->cond);
        pthread_mutex_unlock(&worker->mutex);
    }

    /* Join each worker before destroying the primitives it uses. */
    for (k = 0; k < worker_count; k++) {
        WorkerContext *worker = &st->workers[k];

        pthread_join(worker->thread, NULL);
        pthread_cond_destroy(&worker->cond);
        pthread_mutex_destroy(&worker->mutex);
    }

    pthread_cond_destroy(&st->done_cond);
    pthread_mutex_destroy(&st->done_mutex);
    av_freep(&st->workers);
    av_freep(pctx);
}
233
#else /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS */
235
/**
 * Fallback used when no threading backend is compiled in: no context is
 * created.
 *
 * @param pctx receives NULL
 * @return always AVERROR(EINVAL)
 */
int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
                              void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
                              void (*main_func)(void *priv),
                              int nb_threads)
{
    const int err = AVERROR(EINVAL);

    *pctx = NULL;
    return err;
}
244
/**
 * Fallback used when no threading backend is compiled in.
 *
 * The fallback create function never yields a context, so any call here
 * trips the assertion.
 */
void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
{
    /* Unconditional failure: there is nothing to execute on. */
    av_assert0(0);
}
249
/**
 * Fallback used when no threading backend is compiled in.
 *
 * Asserts that the caller passes either a NULL pointer or a pointer to a
 * NULL context; nothing is freed.
 */
void avpriv_slicethread_free(AVSliceThread **pctx)
{
    /* A non-NULL context cannot have come from the fallback create function. */
    av_assert0(!pctx || !*pctx);
}
254
#endif /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS */
256