1 /*
2 * frame_threads.c -- implementation of transcode multithreaded filter
3 * processing code.
4 *
5 * Copyright (C) Thomas Oestreich - June 2001
6 * updates and partial rewrite:
7 * Copyright (C) Francesco Romani - October 2007
8 *
9 * This file is part of transcode, a video stream processing tool
10 *
11 * transcode is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU General Public License as published by
13 * the Free Software Foundation; either version 2, or (at your option)
14 * any later version.
15 *
16 * transcode is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
20 *
21 * You should have received a copy of the GNU General Public License
22 * along with GNU Make; see the file COPYING. If not, write to
23 * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 *
25 */
26
27 #include <pthread.h>
28
29 #include "transcode.h"
30 #include "encoder-common.h"
31 #include "framebuffer.h"
32 #include "video_trans.h"
33 #include "audio_trans.h"
34 #include "decoder.h"
35 #include "filter.h"
36
37 #include "frame_threads.h"
38
39 /*************************************************************************/
40
41 typedef struct tcframethreaddata_ TCFrameThreadData;
42 struct tcframethreaddata_ {
43 pthread_t threads[TC_FRAME_THREADS_MAX]; /* thread pool */
44 int count; /* how many workers? */
45
46 pthread_mutex_t lock;
47 volatile int running; /* _pool_ running flag */
48 };
49
50 TCFrameThreadData audio_threads = {
51 .count = 0,
52 .lock = PTHREAD_MUTEX_INITIALIZER,
53 .running = TC_FALSE,
54 };
55
56 TCFrameThreadData video_threads = {
57 .count = 0,
58 .lock = PTHREAD_MUTEX_INITIALIZER,
59 .running = TC_FALSE,
60 };
61
62 /*************************************************************************/
63
64 /*
65 * tc_frame_threads_stop (Thread safe):
66 * set the stop flag for a thread pool.
67 *
68 * Parameters:
69 * data: thread pool descriptor.
70 * Return Value:
71 * None.
72 */
tc_frame_threads_stop(TCFrameThreadData * data)73 static void tc_frame_threads_stop(TCFrameThreadData *data)
74 {
75 pthread_mutex_lock(&data->lock);
76 data->running = TC_FALSE;
77 pthread_mutex_unlock(&data->lock);
78 }
79
80 /*
81 * tc_frame_threads_are_active (Thread safe):
82 * verify if there is a pending stop request for given thread pool.
83 *
84 * Parameters:
85 * data: thread pool descriptor.
86 * Return Value:
87 * !0: there is pending pool stop request.
88 * 0: otherwise.
89 */
tc_frame_threads_are_active(TCFrameThreadData * data)90 static int tc_frame_threads_are_active(TCFrameThreadData *data)
91 {
92 int ret;
93 pthread_mutex_lock(&data->lock);
94 ret = data->running;
95 pthread_mutex_unlock(&data->lock);
96 return ret;
97 }
98
99 /*
100 * stop_requested: verify if the pool thread has to stop.
101 * First thread in the pool notifying the core has to stop must
102 * set the flag to notify the others.
103 *
104 * Parameters:
105 * data: thread pool descriptor.
106 * Return Value:
107 * !0: thread pool has to halt as soon as is possible.
108 * 0: thread pool can continue to run.
109 */
stop_requested(TCFrameThreadData * data)110 static int stop_requested(TCFrameThreadData *data)
111 {
112 return (!tc_running() || !tc_frame_threads_are_active(data));
113 }
114
115
116 /*************************************************************************/
117 /* frame processing core threads */
118 /*************************************************************************/
119
120
/*
 * DUP_vptr_if_cloned: if the given video frame carries TC_FRAME_IS_CLONED,
 * duplicate it with vframe_dup(), clear TC_FRAME_IS_CLONED on both the
 * frame and its duplicate, mark the duplicate with TC_FRAME_WAS_CLONED and
 * hand the duplicate to vframe_push_next() with TC_FRAME_WAIT.
 *
 * The argument is evaluated exactly once, so it may be any expression
 * yielding a vframe_list_t pointer.
 *
 * NOTE(review): the result of vframe_dup() is dereferenced unchecked;
 * confirm that it cannot return NULL.
 */
#define DUP_vptr_if_cloned(vptr) do {                                   \
    vframe_list_t *dup_src_ = (vptr);                                   \
                                                                        \
    if (dup_src_->attributes & TC_FRAME_IS_CLONED) {                    \
        vframe_list_t *tmptr = vframe_dup(dup_src_);                    \
                                                                        \
        /* delete clone flag on both frames */                          \
        tmptr->attributes &= ~TC_FRAME_IS_CLONED;                       \
        dup_src_->attributes &= ~TC_FRAME_IS_CLONED;                    \
                                                                        \
        /* set info for filters */                                      \
        tmptr->attributes |= TC_FRAME_WAS_CLONED;                       \
                                                                        \
        /* this frame is to be processed _after_ the current one */     \
        /* so put it back into the queue */                             \
        vframe_push_next(tmptr, TC_FRAME_WAIT);                         \
    }                                                                   \
} while (0)
139
140
141
/*
 * DUP_aptr_if_cloned: if the given audio frame carries TC_FRAME_IS_CLONED,
 * duplicate it with aframe_dup(), clear TC_FRAME_IS_CLONED on both the
 * frame and its duplicate, mark the duplicate with TC_FRAME_WAS_CLONED and
 * hand the duplicate to aframe_push_next() with TC_FRAME_WAIT.
 *
 * The argument is evaluated exactly once, so it may be any expression
 * yielding an aframe_list_t pointer.
 *
 * NOTE(review): the result of aframe_dup() is dereferenced unchecked;
 * confirm that it cannot return NULL.
 */
#define DUP_aptr_if_cloned(aptr) do {                                   \
    aframe_list_t *dup_src_ = (aptr);                                   \
                                                                        \
    if (dup_src_->attributes & TC_FRAME_IS_CLONED) {                    \
        aframe_list_t *tmptr = aframe_dup(dup_src_);                    \
                                                                        \
        /* delete clone flag on both frames */                          \
        tmptr->attributes &= ~TC_FRAME_IS_CLONED;                       \
        dup_src_->attributes &= ~TC_FRAME_IS_CLONED;                    \
                                                                        \
        /* set info for filters */                                      \
        tmptr->attributes |= TC_FRAME_WAS_CLONED;                       \
                                                                        \
        /* this frame is to be processed _after_ the current one */     \
        /* so put it back into the queue */                             \
        aframe_push_next(tmptr, TC_FRAME_WAIT);                         \
    }                                                                   \
} while (0)
161
162
/*
 * SET_STOP_FLAG: log MESSAGE when `verbose' is at least TC_CLEANUP, then
 * clear the running flag of the pool POOLP through tc_frame_threads_stop().
 */
#define SET_STOP_FLAG(POOLP, MESSAGE) do {                  \
    if (verbose >= TC_CLEANUP) {                            \
        tc_log_msg(__FILE__, "%s", (MESSAGE));              \
    }                                                       \
    tc_frame_threads_stop((POOLP));                         \
} while (0)
168
process_video_frame(void * _vob)169 static void *process_video_frame(void *_vob)
170 {
171 static int res = 0; // XXX
172 vframe_list_t *ptr = NULL;
173 vob_t *vob = _vob;
174
175 while (!stop_requested(&video_threads)) {
176 ptr = vframe_reserve();
177 if (ptr == NULL) {
178 SET_STOP_FLAG(&video_threads, "video interrupted: exiting!");
179 res = 1;
180 break;
181 }
182 if (ptr->attributes & TC_FRAME_IS_END_OF_STREAM) {
183 SET_STOP_FLAG(&video_threads, "video stream end: marking!");
184 }
185
186 if (ptr->attributes & TC_FRAME_IS_SKIPPED) {
187 vframe_remove(ptr); /* release frame buffer memory */
188 continue;
189 }
190
191 if (TC_FRAME_NEED_PROCESSING(ptr)) {
192 // external plugin pre-processing
193 ptr->tag = TC_VIDEO|TC_PRE_M_PROCESS;
194 tc_filter_process((frame_list_t *)ptr);
195
196 if (ptr->attributes & TC_FRAME_IS_SKIPPED) {
197 vframe_remove(ptr); /* release frame buffer memory */
198 continue;
199 }
200
201 // clone if the filter told us to do so.
202 DUP_vptr_if_cloned(ptr);
203
204 // internal processing of video
205 ptr->tag = TC_VIDEO;
206 process_vid_frame(vob, ptr);
207
208 // external plugin post-processing
209 ptr->tag = TC_VIDEO|TC_POST_M_PROCESS;
210 tc_filter_process((frame_list_t *)ptr);
211
212 if (ptr->attributes & TC_FRAME_IS_SKIPPED) {
213 vframe_remove(ptr); /* release frame buffer memory */
214 continue;
215 }
216 }
217
218 vframe_push_next(ptr, TC_FRAME_READY);
219 }
220 if (verbose >= TC_CLEANUP)
221 tc_log_msg(__FILE__, "video stream end: got, so exiting!");
222
223 pthread_exit(&res);
224 return NULL;
225 }
226
227
process_audio_frame(void * _vob)228 static void *process_audio_frame(void *_vob)
229 {
230 static int res = 0; // XXX
231 aframe_list_t *ptr = NULL;
232 vob_t *vob = _vob;
233
234 while (!stop_requested(&audio_threads)) {
235 ptr = aframe_reserve();
236 if (ptr == NULL) {
237 SET_STOP_FLAG(&audio_threads, "audio interrupted: exiting!");
238 break;
239 res = 1;
240 }
241 if (ptr->attributes & TC_FRAME_IS_END_OF_STREAM) {
242 SET_STOP_FLAG(&audio_threads, "audio stream end: marking!");
243 }
244
245 if (ptr->attributes & TC_FRAME_IS_SKIPPED) {
246 aframe_remove(ptr); /* release frame buffer memory */
247 continue;
248 }
249
250 if (TC_FRAME_NEED_PROCESSING(ptr)) {
251 // external plugin pre-processing
252 ptr->tag = TC_AUDIO|TC_PRE_M_PROCESS;
253 tc_filter_process((frame_list_t *)ptr);
254
255 DUP_aptr_if_cloned(ptr);
256
257 if (ptr->attributes & TC_FRAME_IS_SKIPPED) {
258 aframe_remove(ptr); /* release frame buffer memory */
259 continue;
260 }
261
262 // internal processing of audio
263 ptr->tag = TC_AUDIO;
264 process_aud_frame(vob, ptr);
265
266 // external plugin post-processing
267 ptr->tag = TC_AUDIO|TC_POST_M_PROCESS;
268 tc_filter_process((frame_list_t *)ptr);
269
270 if (ptr->attributes & TC_FRAME_IS_SKIPPED) {
271 aframe_remove(ptr); /* release frame buffer memory */
272 continue;
273 }
274 }
275
276 aframe_push_next(ptr, TC_FRAME_READY);
277 }
278 if (verbose >= TC_CLEANUP)
279 tc_log_msg(__FILE__, "audio stream end: got, so exiting!");
280
281 pthread_exit(&res);
282 return NULL;
283 }
284
285 /*************************************************************************/
286
287
tc_frame_threads_have_video_workers(void)288 int tc_frame_threads_have_video_workers(void)
289 {
290 return (video_threads.count > 0);
291 }
292
tc_frame_threads_have_audio_workers(void)293 int tc_frame_threads_have_audio_workers(void)
294 {
295 return (audio_threads.count > 0);
296 }
297
298
/*
 * tc_frame_threads_init: start the video and audio frame processing pools.
 *      A pool is started only if a positive worker count is requested for
 *      it and its running flag is not already set. Worker counts larger
 *      than TC_FRAME_THREADS_MAX are reduced to TC_FRAME_THREADS_MAX, since
 *      that is the capacity of the per-pool thread handle array.
 *      tc_error() is invoked when a thread cannot be created.
 *
 * Parameters:
 *          vob: passed as argument to every worker thread.
 *     vworkers: number of video workers to start (<= 0: none).
 *     aworkers: number of audio workers to start (<= 0: none).
 * Return Value:
 *      None.
 */
void tc_frame_threads_init(vob_t *vob, int vworkers, int aworkers)
{
    int n = 0;

    if (vworkers > 0 && !video_threads.running) {
        /* never write past the end of video_threads.threads[] */
        if (vworkers > (int)TC_FRAME_THREADS_MAX) {
            tc_log_info(__FILE__, "too many video frame processing"
                                  " threads requested (%i), using %i",
                        vworkers, (int)TC_FRAME_THREADS_MAX);
            vworkers = (int)TC_FRAME_THREADS_MAX;
        }

        video_threads.count   = vworkers;
        video_threads.running = TC_TRUE; /* enforce, needed when restarting */

        if (verbose >= TC_DEBUG)
            tc_log_info(__FILE__, "starting %i video frame"
                                  " processing thread(s)", vworkers);

        // start the thread pool
        for (n = 0; n < vworkers; n++) {
            if (pthread_create(&video_threads.threads[n], NULL,
                               process_video_frame, vob) != 0)
                tc_error("failed to start video frame processing thread");
        }
    }

    if (aworkers > 0 && !audio_threads.running) {
        /* never write past the end of audio_threads.threads[] */
        if (aworkers > (int)TC_FRAME_THREADS_MAX) {
            tc_log_info(__FILE__, "too many audio frame processing"
                                  " threads requested (%i), using %i",
                        aworkers, (int)TC_FRAME_THREADS_MAX);
            aworkers = (int)TC_FRAME_THREADS_MAX;
        }

        audio_threads.count   = aworkers;
        audio_threads.running = TC_TRUE; /* enforce, needed when restarting */

        if (verbose >= TC_DEBUG)
            tc_log_info(__FILE__, "starting %i audio frame"
                                  " processing thread(s)", aworkers);

        // start the thread pool
        for (n = 0; n < aworkers; n++) {
            if (pthread_create(&audio_threads.threads[n], NULL,
                               process_audio_frame, vob) != 0)
                tc_error("failed to start audio frame processing thread");
        }
    }
    return;
}
336
tc_frame_threads_close(void)337 void tc_frame_threads_close(void)
338 {
339 void *status = NULL;
340 int n = 0;
341
342 if (audio_threads.count > 0) {
343 tc_frame_threads_stop(&audio_threads);
344 if (verbose >= TC_CLEANUP)
345 tc_log_msg(__FILE__, "wait for %i audio frame processing threads",
346 audio_threads.count);
347 for (n = 0; n < audio_threads.count; n++)
348 pthread_join(audio_threads.threads[n], &status);
349 if (verbose >= TC_CLEANUP)
350 tc_log_msg(__FILE__, "audio frame processing threads canceled");
351 }
352
353 if (video_threads.count > 0) {
354 tc_frame_threads_stop(&video_threads);
355 if (verbose >= TC_CLEANUP)
356 tc_log_msg(__FILE__, "wait for %i video frame processing threads",
357 video_threads.count);
358 for (n = 0; n < video_threads.count; n++)
359 pthread_join(video_threads.threads[n], &status);
360 if (verbose >= TC_CLEANUP)
361 tc_log_msg(__FILE__, "video frame processing threads canceled");
362 }
363 }
364
365
366 /*************************************************************************/
367
368 /*
369 * Local variables:
370 * c-file-style: "stroustrup"
371 * c-file-offsets: ((case-label . *) (statement-case-intro . *))
372 * indent-tabs-mode: nil
373 * End:
374 *
375 * vim: expandtab shiftwidth=4:
376 */
377