1 /*
2  *  frame_threads.c -- implementation of transcode multithreaded filter
3  *                     processing code.
4  *
5  *  Copyright (C) Thomas Oestreich - June 2001
6  *  updates and partial rewrite:
7  *  Copyright (C) Francesco Romani - October 2007
8  *
9  *  This file is part of transcode, a video stream processing tool
10  *
11  *  transcode is free software; you can redistribute it and/or modify
12  *  it under the terms of the GNU General Public License as published by
13  *  the Free Software Foundation; either version 2, or (at your option)
14  *  any later version.
15  *
16  *  transcode is distributed in the hope that it will be useful,
17  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *  GNU General Public License for more details.
20  *
21  *  You should have received a copy of the GNU General Public License
22  *  along with GNU Make; see the file COPYING.  If not, write to
23  *  the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  */
26 
27 #include <pthread.h>
28 
29 #include "transcode.h"
30 #include "encoder-common.h"
31 #include "framebuffer.h"
32 #include "video_trans.h"
33 #include "audio_trans.h"
34 #include "decoder.h"
35 #include "filter.h"
36 
37 #include "frame_threads.h"
38 
39 /*************************************************************************/
40 
41 typedef struct tcframethreaddata_ TCFrameThreadData;
42 struct tcframethreaddata_ {
43     pthread_t threads[TC_FRAME_THREADS_MAX];    /* thread pool           */
44     int count;                                  /* how many workers?     */
45 
46     pthread_mutex_t lock;
47     volatile int running;                       /* _pool_ running flag   */
48 };
49 
50 TCFrameThreadData audio_threads = {
51     .count   = 0,
52     .lock    = PTHREAD_MUTEX_INITIALIZER,
53     .running = TC_FALSE,
54 };
55 
56 TCFrameThreadData video_threads = {
57     .count   = 0,
58     .lock    = PTHREAD_MUTEX_INITIALIZER,
59     .running = TC_FALSE,
60 };
61 
62 /*************************************************************************/
63 
64 /*
65  * tc_frame_threads_stop (Thread safe):
66  * set the stop flag for a thread pool.
67  *
68  * Parameters:
69  *      data: thread pool descriptor.
70  * Return Value:
71  *      None.
72  */
tc_frame_threads_stop(TCFrameThreadData * data)73 static void tc_frame_threads_stop(TCFrameThreadData *data)
74 {
75     pthread_mutex_lock(&data->lock);
76     data->running = TC_FALSE;
77     pthread_mutex_unlock(&data->lock);
78 }
79 
80 /*
81  * tc_frame_threads_are_active (Thread safe):
82  * verify if there is a pending stop request for given thread pool.
83  *
84  * Parameters:
85  *      data: thread pool descriptor.
86  * Return Value:
87  *      !0: there is pending pool stop request.
88  *       0: otherwise.
89  */
tc_frame_threads_are_active(TCFrameThreadData * data)90 static int tc_frame_threads_are_active(TCFrameThreadData *data)
91 {
92     int ret;
93     pthread_mutex_lock(&data->lock);
94     ret = data->running;
95     pthread_mutex_unlock(&data->lock);
96     return ret;
97 }
98 
99 /*
100  * stop_requested: verify if the pool thread has to stop.
101  * First thread in the pool notifying the core has to stop must
102  * set the flag to notify the others.
103  *
104  * Parameters:
105  *      data: thread pool descriptor.
106  * Return Value:
107  *      !0: thread pool has to halt as soon as is possible.
108  *       0: thread pool can continue to run.
109  */
stop_requested(TCFrameThreadData * data)110 static int stop_requested(TCFrameThreadData *data)
111 {
112     return (!tc_running() || !tc_frame_threads_are_active(data));
113 }
114 
115 
116 /*************************************************************************/
117 /*         frame processing core threads                                 */
118 /*************************************************************************/
119 
120 
/*
 * DUP_vptr_if_cloned: honour a clone request on a video frame.
 * If the frame carries TC_FRAME_IS_CLONED, duplicate it, mark the
 * duplicate with TC_FRAME_WAS_CLONED and queue it in TC_FRAME_WAIT
 * state, so it is processed _after_ the current frame. The request
 * flag is always removed from the original frame.
 *
 * The argument is evaluated exactly once. A NULL result from
 * vframe_dup() is tolerated: the clone is dropped with a log message
 * instead of being dereferenced.
 * NOTE(review): whether vframe_dup() can really return NULL is not
 * visible from this file -- the guard is defensive.
 */
#define DUP_vptr_if_cloned(vptr) do { \
    vframe_list_t *dup_src_ = (vptr); \
    \
    if (dup_src_->attributes & TC_FRAME_IS_CLONED) { \
        vframe_list_t *dup_new_ = vframe_dup(dup_src_); \
        \
        /* the request is consumed whether or not the copy worked */ \
        dup_src_->attributes &= ~TC_FRAME_IS_CLONED; \
        \
        if (dup_new_ == NULL) { \
            tc_log_msg(__FILE__, "video frame cloning failed:" \
                                 " clone request dropped"); \
        } else { \
            /* clear the request flag on the copy as well ... */ \
            dup_new_->attributes &= ~TC_FRAME_IS_CLONED; \
            /* ... and tell the filters this frame is a duplicate */ \
            dup_new_->attributes |= TC_FRAME_WAS_CLONED; \
            \
            /* to be processed _after_ the current frame, */ \
            /* so put it back into the queue */ \
            vframe_push_next(dup_new_, TC_FRAME_WAIT); \
        } \
    } \
} while (0)
139 
140 
141 
/*
 * DUP_aptr_if_cloned: honour a clone request on an audio frame.
 * If the frame carries TC_FRAME_IS_CLONED, duplicate it, mark the
 * duplicate with TC_FRAME_WAS_CLONED and queue it in TC_FRAME_WAIT
 * state, so it is processed _after_ the current frame. The request
 * flag is always removed from the original frame.
 *
 * The argument is evaluated exactly once. A NULL result from
 * aframe_dup() is tolerated: the clone is dropped with a log message
 * instead of being dereferenced.
 * NOTE(review): whether aframe_dup() can really return NULL is not
 * visible from this file -- the guard is defensive.
 */
#define DUP_aptr_if_cloned(aptr) do { \
    aframe_list_t *dup_src_ = (aptr); \
    \
    if (dup_src_->attributes & TC_FRAME_IS_CLONED) { \
        aframe_list_t *dup_new_ = aframe_dup(dup_src_); \
        \
        /* the request is consumed whether or not the copy worked */ \
        dup_src_->attributes &= ~TC_FRAME_IS_CLONED; \
        \
        if (dup_new_ == NULL) { \
            tc_log_msg(__FILE__, "audio frame cloning failed:" \
                                 " clone request dropped"); \
        } else { \
            /* clear the request flag on the copy as well ... */ \
            dup_new_->attributes &= ~TC_FRAME_IS_CLONED; \
            /* ... and tell the filters this frame is a duplicate */ \
            dup_new_->attributes |= TC_FRAME_WAS_CLONED; \
            \
            /* to be processed _after_ the current frame, */ \
            /* so put it back into the queue */ \
            aframe_push_next(dup_new_, TC_FRAME_WAIT); \
        } \
    } \
} while (0)
161 
162 
/*
 * SET_STOP_FLAG: clear the running flag of the pool DATAP; MSG is
 * logged first, but only when the verbosity is at least TC_CLEANUP.
 */
#define SET_STOP_FLAG(DATAP, MSG) do { \
    const int announce_ = (verbose >= TC_CLEANUP); \
    \
    if (announce_) { \
        tc_log_msg(__FILE__, "%s", (MSG)); \
    } \
    tc_frame_threads_stop((DATAP)); \
} while (0)
168 
process_video_frame(void * _vob)169 static void *process_video_frame(void *_vob)
170 {
171     static int res = 0; // XXX
172     vframe_list_t *ptr = NULL;
173     vob_t *vob = _vob;
174 
175     while (!stop_requested(&video_threads)) {
176         ptr = vframe_reserve();
177         if (ptr == NULL) {
178             SET_STOP_FLAG(&video_threads, "video interrupted: exiting!");
179             res = 1;
180             break;
181         }
182         if (ptr->attributes & TC_FRAME_IS_END_OF_STREAM) {
183             SET_STOP_FLAG(&video_threads, "video stream end: marking!");
184         }
185 
186         if (ptr->attributes & TC_FRAME_IS_SKIPPED) {
187             vframe_remove(ptr);  /* release frame buffer memory */
188             continue;
189         }
190 
191         if (TC_FRAME_NEED_PROCESSING(ptr)) {
192             // external plugin pre-processing
193             ptr->tag = TC_VIDEO|TC_PRE_M_PROCESS;
194             tc_filter_process((frame_list_t *)ptr);
195 
196             if (ptr->attributes & TC_FRAME_IS_SKIPPED) {
197                 vframe_remove(ptr);  /* release frame buffer memory */
198                 continue;
199             }
200 
201             // clone if the filter told us to do so.
202             DUP_vptr_if_cloned(ptr);
203 
204             // internal processing of video
205             ptr->tag = TC_VIDEO;
206             process_vid_frame(vob, ptr);
207 
208             // external plugin post-processing
209             ptr->tag = TC_VIDEO|TC_POST_M_PROCESS;
210             tc_filter_process((frame_list_t *)ptr);
211 
212             if (ptr->attributes & TC_FRAME_IS_SKIPPED) {
213                 vframe_remove(ptr);  /* release frame buffer memory */
214                 continue;
215             }
216         }
217 
218         vframe_push_next(ptr, TC_FRAME_READY);
219     }
220     if (verbose >= TC_CLEANUP)
221         tc_log_msg(__FILE__, "video stream end: got, so exiting!");
222 
223     pthread_exit(&res);
224     return NULL;
225 }
226 
227 
process_audio_frame(void * _vob)228 static void *process_audio_frame(void *_vob)
229 {
230     static int res = 0; // XXX
231     aframe_list_t *ptr = NULL;
232     vob_t *vob = _vob;
233 
234     while (!stop_requested(&audio_threads)) {
235         ptr = aframe_reserve();
236         if (ptr == NULL) {
237             SET_STOP_FLAG(&audio_threads, "audio interrupted: exiting!");
238             break;
239             res = 1;
240         }
241         if (ptr->attributes & TC_FRAME_IS_END_OF_STREAM) {
242             SET_STOP_FLAG(&audio_threads, "audio stream end: marking!");
243         }
244 
245         if (ptr->attributes & TC_FRAME_IS_SKIPPED) {
246             aframe_remove(ptr);  /* release frame buffer memory */
247             continue;
248         }
249 
250         if (TC_FRAME_NEED_PROCESSING(ptr)) {
251             // external plugin pre-processing
252             ptr->tag = TC_AUDIO|TC_PRE_M_PROCESS;
253             tc_filter_process((frame_list_t *)ptr);
254 
255             DUP_aptr_if_cloned(ptr);
256 
257             if (ptr->attributes & TC_FRAME_IS_SKIPPED) {
258                 aframe_remove(ptr);  /* release frame buffer memory */
259                 continue;
260             }
261 
262             // internal processing of audio
263             ptr->tag = TC_AUDIO;
264             process_aud_frame(vob, ptr);
265 
266             // external plugin post-processing
267             ptr->tag = TC_AUDIO|TC_POST_M_PROCESS;
268             tc_filter_process((frame_list_t *)ptr);
269 
270             if (ptr->attributes & TC_FRAME_IS_SKIPPED) {
271                 aframe_remove(ptr);  /* release frame buffer memory */
272                 continue;
273             }
274         }
275 
276         aframe_push_next(ptr, TC_FRAME_READY);
277     }
278     if (verbose >= TC_CLEANUP)
279         tc_log_msg(__FILE__, "audio stream end: got, so exiting!");
280 
281     pthread_exit(&res);
282     return NULL;
283 }
284 
285 /*************************************************************************/
286 
287 
tc_frame_threads_have_video_workers(void)288 int tc_frame_threads_have_video_workers(void)
289 {
290     return (video_threads.count > 0);
291 }
292 
tc_frame_threads_have_audio_workers(void)293 int tc_frame_threads_have_audio_workers(void)
294 {
295     return (audio_threads.count > 0);
296 }
297 
298 
tc_frame_threads_init(vob_t * vob,int vworkers,int aworkers)299 void tc_frame_threads_init(vob_t *vob, int vworkers, int aworkers)
300 {
301     int n = 0;
302 
303     if (vworkers > 0 && !video_threads.running) {
304         video_threads.count   = vworkers;
305         video_threads.running = TC_TRUE; /* enforce, needed when restarting */
306 
307         if (verbose >= TC_DEBUG)
308             tc_log_info(__FILE__, "starting %i video frame"
309                                  " processing thread(s)", vworkers);
310 
311         // start the thread pool
312         for (n = 0; n < vworkers; n++) {
313             if (pthread_create(&video_threads.threads[n], NULL,
314                                process_video_frame, vob) != 0)
315                 tc_error("failed to start video frame processing thread");
316         }
317     }
318 
319     if (aworkers > 0 && !audio_threads.running) {
320         audio_threads.count   = aworkers;
321         audio_threads.running = TC_TRUE; /* enforce, needed when restarting */
322 
323         if (verbose >= TC_DEBUG)
324             tc_log_info(__FILE__, "starting %i audio frame"
325                                  " processing thread(s)", aworkers);
326 
327         // start the thread pool
328         for (n = 0; n < aworkers; n++) {
329             if (pthread_create(&audio_threads.threads[n], NULL,
330                                process_audio_frame, vob) != 0)
331                 tc_error("failed to start audio frame processing thread");
332         }
333     }
334     return;
335 }
336 
tc_frame_threads_close(void)337 void tc_frame_threads_close(void)
338 {
339     void *status = NULL;
340     int n = 0;
341 
342     if (audio_threads.count > 0) {
343         tc_frame_threads_stop(&audio_threads);
344         if (verbose >= TC_CLEANUP)
345             tc_log_msg(__FILE__, "wait for %i audio frame processing threads",
346                        audio_threads.count);
347         for (n = 0; n < audio_threads.count; n++)
348             pthread_join(audio_threads.threads[n], &status);
349         if (verbose >= TC_CLEANUP)
350             tc_log_msg(__FILE__, "audio frame processing threads canceled");
351     }
352 
353     if (video_threads.count > 0) {
354         tc_frame_threads_stop(&video_threads);
355         if (verbose >= TC_CLEANUP)
356             tc_log_msg(__FILE__, "wait for %i video frame processing threads",
357                        video_threads.count);
358         for (n = 0; n < video_threads.count; n++)
359             pthread_join(video_threads.threads[n], &status);
360         if (verbose >= TC_CLEANUP)
361             tc_log_msg(__FILE__, "video frame processing threads canceled");
362     }
363 }
364 
365 
366 /*************************************************************************/
367 
368 /*
369  * Local variables:
370  *   c-file-style: "stroustrup"
371  *   c-file-offsets: ((case-label . *) (statement-case-intro . *))
372  *   indent-tabs-mode: nil
373  * End:
374  *
375  * vim: expandtab shiftwidth=4:
376  */
377