1 /**
2  * \file mlt_consumer.c
3  * \brief abstraction for all consumer services
4  * \see mlt_consumer_s
5  *
6  * Copyright (C) 2003-2021 Meltytech, LLC
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License as published by the Free Software Foundation; either
11  * version 2.1 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public
19  * License along with this library; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
21  */
22 
23 #include "mlt_consumer.h"
24 #include "mlt_factory.h"
25 #include "mlt_producer.h"
26 #include "mlt_frame.h"
27 #include "mlt_profile.h"
28 #include "mlt_log.h"
29 
30 #include <stdio.h>
31 #include <string.h>
32 #include <stdlib.h>
33 #include <sys/time.h>
34 #include <stdatomic.h>
35 
36 /** Define this if you want an automatic deinterlace (if necessary) when the
37  * consumer's producer is not running at normal speed.
38  */
39 #undef DEINTERLACE_ON_NOT_NORMAL_SPEED
40 
41 /** This is not the ideal place for this, but it is needed by VDPAU as well.
42  */
43 pthread_mutex_t mlt_sdl_mutex = PTHREAD_MUTEX_INITIALIZER;
44 
45 /** \brief private members of mlt_consumer */
46 
47 typedef struct
48 {
49 	int real_time;
50 	atomic_int ahead;
51 	int preroll;
52 	mlt_image_format image_format;
53 	mlt_audio_format audio_format;
54 	mlt_deque queue;
55 	void *ahead_thread;
56 	pthread_mutex_t queue_mutex;
57 	pthread_cond_t queue_cond;
58 	pthread_mutex_t put_mutex;
59 	pthread_cond_t put_cond;
60 	mlt_frame put;
61 	int put_active;
62 	mlt_event event_listener;
63 	mlt_position position;
64 	pthread_mutex_t position_mutex;
65 	int is_purge;
66 	int aud_counter;
67 	double fps;
68 	int channels;
69 	int frequency;
70 	atomic_int speed;
71 	/* additional fields added for the parallel work queue */
72 	mlt_deque worker_threads;
73 	pthread_mutex_t done_mutex;
74 	pthread_cond_t done_cond;
75 	int consecutive_dropped;
76 	int consecutive_rendered;
77 	int process_head;
78 	atomic_int started;
79 	pthread_t *threads; /**< used to deallocate all threads */
80 }
81 consumer_private;
82 
83 static void mlt_consumer_property_changed(mlt_properties owner, mlt_consumer self, mlt_event_data );
84 static void apply_profile_properties( mlt_consumer self, mlt_profile profile, mlt_properties properties );
85 static void on_consumer_frame_show(mlt_properties owner, mlt_consumer self, mlt_event_data );
86 static void mlt_thread_create( mlt_consumer self, mlt_thread_function_t function );
87 static void mlt_thread_join( mlt_consumer self );
88 static void consumer_read_ahead_start( mlt_consumer self );
89 
90 /** Initialize a consumer service.
91  *
92  * \public \memberof mlt_consumer_s
93  * \param self the consumer to initialize
94  * \param child a pointer to the object for the subclass
95  * \param profile the \p mlt_profile_s to use (optional but recommended,
96  * uses the environment variable MLT if self is NULL)
97  * \return true if there was an error
98  */
99 
mlt_consumer_init(mlt_consumer self,void * child,mlt_profile profile)100 int mlt_consumer_init( mlt_consumer self, void *child, mlt_profile profile )
101 {
102 	int error = 0;
103 	memset( self, 0, sizeof( struct mlt_consumer_s ) );
104 	self->child = child;
105 	consumer_private *priv = self->local = calloc( 1, sizeof( consumer_private ) );
106 
107 	error = mlt_service_init( &self->parent, self );
108 	if ( error == 0 )
109 	{
110 		// Get the properties from the service
111 		mlt_properties properties = MLT_SERVICE_PROPERTIES( &self->parent );
112 
113 		// Apply profile to properties
114 		if ( profile == NULL )
115 		{
116 			// Normally the application creates the profile and controls its lifetime
117 			// This is the fallback exception handling
118 			profile = mlt_profile_init( NULL );
119 			mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
120 			mlt_properties_set_data( properties, "_profile", profile, 0, (mlt_destructor)mlt_profile_close, NULL );
121 		}
122 		apply_profile_properties( self, profile, properties );
123 		mlt_properties_set( properties, "mlt_type", "consumer" );
124 
125 		// Default rescaler for all consumers
126 		mlt_properties_set( properties, "rescale", "bilinear" );
127 
128 		// Default read ahead buffer size
129 		mlt_properties_set_int( properties, "buffer", 25 );
130 		mlt_properties_set_int( properties, "drop_max", 5 );
131 
132 		// Default audio frequency and channels
133 		mlt_properties_set_int( properties, "frequency", 48000 );
134 		mlt_properties_set_int( properties, "channels", 2 );
135 
136 		// Default of all consumers is real time
137 		mlt_properties_set_int( properties, "real_time", 1 );
138 
139 		// Default to environment test card
140 		mlt_properties_set( properties, "test_card", mlt_environment( "MLT_TEST_CARD" ) );
141 
142 		// Hmm - default all consumers to yuv422 with s16 :-/
143 		priv->image_format = mlt_image_yuv422;
144 		priv->audio_format = mlt_audio_s16;
145 
146 		mlt_events_register( properties, "consumer-frame-show" );
147 		mlt_events_register( properties, "consumer-frame-render" );
148 		mlt_events_register( properties, "consumer-thread-started" );
149 		mlt_events_register( properties, "consumer-thread-stopped" );
150 		mlt_events_register( properties, "consumer-stopping" );
151 		mlt_events_register( properties, "consumer-stopped" );
152 		mlt_events_register( properties, "consumer-thread-create" );
153 		mlt_events_register( properties, "consumer-thread-join" );
154 		mlt_events_listen( properties, self, "consumer-frame-show", ( mlt_listener )on_consumer_frame_show );
155 
156 		// Register a property-changed listener to handle the profile property -
157 		// subsequent properties can override the profile
158 		priv->event_listener = mlt_events_listen( properties, self, "property-changed", ( mlt_listener )mlt_consumer_property_changed );
159 
160 		// Create the push mutex and condition
161 		pthread_mutex_init( &priv->put_mutex, NULL );
162 		pthread_cond_init( &priv->put_cond, NULL );
163 
164 		pthread_mutex_init( &priv->position_mutex, NULL );
165 	}
166 	return error;
167 }
168 
169 /** Convert the profile into properties on the consumer.
170  *
171  * \private \memberof mlt_consumer_s
172  * \param self a consumer
173  * \param profile a profile
174  * \param properties a properties list (typically, the consumer's)
175  */
176 
apply_profile_properties(mlt_consumer self,mlt_profile profile,mlt_properties properties)177 static void apply_profile_properties( mlt_consumer self, mlt_profile profile, mlt_properties properties )
178 {
179 	consumer_private *priv = self->local;
180 	mlt_event_block( priv->event_listener );
181 	mlt_properties_set_double( properties, "fps", mlt_profile_fps( profile ) );
182 	mlt_properties_set_int( properties, "frame_rate_num", profile->frame_rate_num );
183 	mlt_properties_set_int( properties, "frame_rate_den", profile->frame_rate_den );
184 	mlt_properties_set_int( properties, "width", profile->width );
185 	mlt_properties_set_int( properties, "height", profile->height );
186 	mlt_properties_set_int( properties, "progressive", profile->progressive );
187 	mlt_properties_set_double( properties, "aspect_ratio", mlt_profile_sar( profile )  );
188 	mlt_properties_set_int( properties, "sample_aspect_num", profile->sample_aspect_num );
189 	mlt_properties_set_int( properties, "sample_aspect_den", profile->sample_aspect_den );
190 	mlt_properties_set_double( properties, "display_ratio", mlt_profile_dar( profile )  );
191 	mlt_properties_set_int( properties, "display_aspect_num", profile->display_aspect_num );
192 	mlt_properties_set_int( properties, "display_aspect_den", profile->display_aspect_den );
193 	mlt_properties_set_int( properties, "colorspace", profile->colorspace );
194 	mlt_event_unblock( priv->event_listener );
195 }
196 
197 /** The property-changed event listener
198  *
199  * \private \memberof mlt_consumer_s
200  * \param owner the events object
201  * \param self the consumer
202  * \param name the name of the property that changed
203  */
204 
mlt_consumer_property_changed(mlt_properties owner,mlt_consumer self,mlt_event_data event_data)205 static void mlt_consumer_property_changed( mlt_properties owner, mlt_consumer self, mlt_event_data event_data )
206 {
207 	const char *name = mlt_event_data_to_string(event_data);
208 	if ( name && !strcmp( name, "mlt_profile" ) )
209 	{
210 		// Get the properties
211 		mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
212 
213 		// Get the current profile
214 		mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
215 
216 		// Load the new profile
217 		mlt_profile new_profile = mlt_profile_init( mlt_properties_get( properties, name ) );
218 
219 		if ( new_profile )
220 		{
221 			// Copy the profile
222 			if ( profile != NULL )
223 			{
224 				free( profile->description );
225 				memcpy( profile, new_profile, sizeof( struct mlt_profile_s ) );
226 				profile->description = strdup( new_profile->description );
227 			}
228 			else
229 			{
230 				profile = new_profile;
231 			}
232 
233 			// Apply to properties
234 			apply_profile_properties( self, profile, properties );
235 			mlt_profile_close( new_profile );
236 		}
237  	}
238 	else if ( !strcmp( name, "frame_rate_num" ) )
239 	{
240 		mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
241 		mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
242 		if ( profile )
243 		{
244 			profile->frame_rate_num = mlt_properties_get_int( properties, "frame_rate_num" );
245 			mlt_properties_set_double( properties, "fps", mlt_profile_fps( profile ) );
246 		}
247 	}
248 	else if ( !strcmp( name, "frame_rate_den" ) )
249 	{
250 		mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
251 		mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
252 		if ( profile )
253 		{
254 			profile->frame_rate_den = mlt_properties_get_int( properties, "frame_rate_den" );
255 			mlt_properties_set_double( properties, "fps", mlt_profile_fps( profile ) );
256 		}
257 	}
258 	else if ( !strcmp( name, "width" ) )
259 	{
260 		mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
261 		mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
262 		if ( profile )
263 			profile->width = mlt_properties_get_int( properties, "width" );
264 	}
265 	else if ( !strcmp( name, "height" ) )
266 	{
267 		mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
268 		mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
269 		if ( profile )
270 			profile->height = mlt_properties_get_int( properties, "height" );
271 	}
272 	else if ( !strcmp( name, "progressive" ) )
273 	{
274 		mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
275 		mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
276 		if ( profile )
277 			profile->progressive = mlt_properties_get_int( properties, "progressive" );
278 	}
279 	else if ( !strcmp( name, "sample_aspect_num" ) )
280 	{
281 		mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
282 		mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
283 		if ( profile )
284 		{
285 			profile->sample_aspect_num = mlt_properties_get_int( properties, "sample_aspect_num" );
286 			mlt_properties_set_double( properties, "aspect_ratio", mlt_profile_sar( profile )  );
287 		}
288 	}
289 	else if ( !strcmp( name, "sample_aspect_den" ) )
290 	{
291 		mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
292 		mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
293 		if ( profile )
294 		{
295 			profile->sample_aspect_den = mlt_properties_get_int( properties, "sample_aspect_den" );
296 			mlt_properties_set_double( properties, "aspect_ratio", mlt_profile_sar( profile )  );
297 		}
298 	}
299 	else if ( !strcmp( name, "display_aspect_num" ) )
300 	{
301 		mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
302 		mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
303 		if ( profile )
304 		{
305 			profile->display_aspect_num = mlt_properties_get_int( properties, "display_aspect_num" );
306 			mlt_properties_set_double( properties, "display_ratio", mlt_profile_dar( profile )  );
307 		}
308 	}
309 	else if ( !strcmp( name, "display_aspect_den" ) )
310 	{
311 		mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
312 		mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
313 		if ( profile )
314 		{
315 			profile->display_aspect_den = mlt_properties_get_int( properties, "display_aspect_den" );
316 			mlt_properties_set_double( properties, "display_ratio", mlt_profile_dar( profile )  );
317 		}
318 	}
319 	else if ( !strcmp( name, "colorspace" ) )
320 	{
321 		mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
322 		mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
323 		if ( profile )
324 			profile->colorspace = mlt_properties_get_int( properties, "colorspace" );
325 	}
326 }
327 
328 /** A listener on the consumer-frame-show event
329  *
330  * Saves the position of the frame shown.
331  *
332  * \private \memberof mlt_consumer_s
333  * \param owner the events object
334  * \param consumer the consumer on which this event occurred
335  * \param frame the frame that was shown
336  */
337 
on_consumer_frame_show(mlt_properties owner,mlt_consumer consumer,mlt_event_data event_data)338 static void on_consumer_frame_show( mlt_properties owner, mlt_consumer consumer, mlt_event_data event_data )
339 {
340 	mlt_frame frame = mlt_event_data_to_frame(event_data);
341 	if ( frame ) {
342 		consumer_private* priv = consumer->local;
343 		pthread_mutex_lock( &priv->position_mutex );
344 		priv->position = mlt_frame_get_position( frame );
345 		pthread_mutex_unlock( &priv->position_mutex );
346 	}
347 }
348 
349 /** Create a new consumer.
350  *
351  * \public \memberof mlt_consumer_s
352  * \param profile a profile (optional, but recommended)
353  * \return a new consumer
354  */
355 
mlt_consumer_new(mlt_profile profile)356 mlt_consumer mlt_consumer_new( mlt_profile profile )
357 {
358 	// Create the memory for the structure
359 	mlt_consumer self = malloc( sizeof( struct mlt_consumer_s ) );
360 
361 	// Initialise it
362 	if ( self != NULL && mlt_consumer_init( self, NULL, profile ) == 0 )
363 	{
364 		// Return it
365 		return self;
366 	}
367 	else
368 	{
369 		free(self);
370 		return NULL;
371 	}
372 }
373 
374 /** Get the parent service object.
375  *
376  * \public \memberof mlt_consumer_s
377  * \param self a consumer
378  * \return the parent service class
379  * \see MLT_CONSUMER_SERVICE
380  */
381 
mlt_consumer_service(mlt_consumer self)382 mlt_service mlt_consumer_service( mlt_consumer self )
383 {
384 	return self != NULL ? &self->parent : NULL;
385 }
386 
387 /** Get the consumer properties.
388  *
389  * \public \memberof mlt_consumer_s
390  * \param self a consumer
391  * \return the consumer's properties list
392  * \see MLT_CONSUMER_PROPERTIES
393  */
394 
mlt_consumer_properties(mlt_consumer self)395 mlt_properties mlt_consumer_properties( mlt_consumer self )
396 {
397 	return self != NULL ? MLT_SERVICE_PROPERTIES( &self->parent ) : NULL;
398 }
399 
400 /** Connect the consumer to the producer.
401  *
402  * \public \memberof mlt_consumer_s
403  * \param self a consumer
404  * \param producer a producer
405  * \return > 0 warning, == 0 success, < 0 serious error,
406  *         1 = this service does not accept input,
407  *         2 = the producer is invalid,
408  *         3 = the producer is already registered with this consumer
409  */
410 
mlt_consumer_connect(mlt_consumer self,mlt_service producer)411 int mlt_consumer_connect( mlt_consumer self, mlt_service producer )
412 {
413 	return mlt_service_connect_producer( &self->parent, producer, 0 );
414 }
415 
416 /** Set the audio format to use in the render thread.
417  *
418  * \private \memberof mlt_consumer_s
419  * \param self a consumer
420  */
421 
set_audio_format(mlt_consumer self)422 static void set_audio_format( mlt_consumer self )
423 {
424 	// Get the audio format to use for rendering threads.
425 	consumer_private *priv = self->local;
426 	mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
427 	const char *format = mlt_properties_get( properties, "mlt_audio_format" );
428 	if ( format )
429 	{
430 		if ( !strcmp( format, "none" ) )
431 			priv->audio_format = mlt_audio_none;
432 		else if ( !strcmp( format, "s32" ) )
433 			priv->audio_format = mlt_audio_s32;
434 		else if ( !strcmp( format, "s32le" ) )
435 			priv->audio_format = mlt_audio_s32le;
436 		else if ( !strcmp( format, "float" ) )
437 			priv->audio_format = mlt_audio_float;
438 		else if ( !strcmp( format, "f32le" ) )
439 			priv->audio_format = mlt_audio_f32le;
440 		else if ( !strcmp( format, "u8" ) )
441 			priv->audio_format = mlt_audio_u8;
442 	}
443 }
444 
445 /** Set the image format to use in render threads.
446  *
447  * \private \memberof mlt_consumer_s
448  * \param self a consumer
449  */
450 
set_image_format(mlt_consumer self)451 static void set_image_format( mlt_consumer self )
452 {
453 	// Get the image format to use for rendering threads.
454 	consumer_private *priv = self->local;
455 	mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
456 	const char* format = mlt_properties_get( properties, "mlt_image_format" );
457 	if ( format )
458 	{
459 		priv->image_format = mlt_image_format_id( format );
460 		if ( mlt_image_invalid == priv->image_format )
461 			priv->image_format = mlt_image_yuv422;
462 		// mlt_image_movit is for internal use only.
463 		// Remapping it glsl_texture prevents breaking existing apps
464 		// using the legacy "glsl" name.
465 		else if ( mlt_image_movit == priv->image_format )
466 			priv->image_format = mlt_image_opengl_texture;
467 	}
468 }
469 
470 /** Start the consumer.
471  *
472  * \public \memberof mlt_consumer_s
473  * \param self a consumer
474  * \return true if there was an error
475  */
476 
mlt_consumer_start(mlt_consumer self)477 int mlt_consumer_start( mlt_consumer self )
478 {
479 	if (!self) {
480 		return 1;
481 	}
482 
483 	int error = 0;
484 
485 	if ( !mlt_consumer_is_stopped( self ) )
486 		return error;
487 
488 	consumer_private *priv = self->local;
489 
490 	// Stop listening to the property-changed event
491 	mlt_event_block( priv->event_listener );
492 
493 	// Get the properties
494 	mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
495 
496 	// Determine if there's a test card producer
497 	char *test_card = mlt_properties_get( properties, "test_card" );
498 
499 	// Just to make sure nothing is hanging around...
500 	pthread_mutex_lock( &priv->put_mutex );
501 	priv->put = NULL;
502 	priv->put_active = 1;
503 	pthread_mutex_unlock( &priv->put_mutex );
504 
505 	// Deal with it now.
506 	if ( test_card != NULL )
507 	{
508 		if ( mlt_properties_get_data( properties, "test_card_producer", NULL ) == NULL )
509 		{
510 			// Create a test card producer
511 			mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
512 			mlt_producer producer = mlt_factory_producer( profile, NULL, test_card );
513 
514 			// Do we have a producer
515 			if ( producer != NULL )
516 			{
517 				// Test card should loop I guess...
518 				mlt_properties_set( MLT_PRODUCER_PROPERTIES( producer ), "eof", "loop" );
519 				//mlt_producer_set_speed( producer, 0 );
520 				//mlt_producer_set_in_and_out( producer, 0, 0 );
521 
522 				// Set the test card on the consumer
523 				mlt_properties_set_data( properties, "test_card_producer", producer, 0, ( mlt_destructor )mlt_producer_close, NULL );
524 			}
525 		}
526 	}
527 	else
528 	{
529 		// Allow the hash table to speed things up
530 		mlt_properties_set_data( properties, "test_card_producer", NULL, 0, NULL, NULL );
531 	}
532 
533 	// The profile could have changed between a stop and a restart.
534 	apply_profile_properties( self, mlt_service_profile( MLT_CONSUMER_SERVICE(self) ), properties );
535 
536 	// Set the frame duration in microseconds for the frame-dropping heuristic
537 	int frame_rate_num = mlt_properties_get_int( properties, "frame_rate_num" );
538 	int frame_rate_den = mlt_properties_get_int( properties, "frame_rate_den" );
539 	int frame_duration = 0;
540 
541 	if ( frame_rate_num && frame_rate_den )
542 	{
543 		frame_duration = 1000000 / frame_rate_num * frame_rate_den;
544 	}
545 
546 	mlt_properties_set_int( properties, "frame_duration", frame_duration );
547 	mlt_properties_set_int( properties, "drop_count", 0 );
548 
549 	// Check and run an ante command
550 	if ( mlt_properties_get( properties, "ante" ) )
551 		if ( system( mlt_properties_get( properties, "ante" ) ) == -1 )
552 			mlt_log( MLT_CONSUMER_SERVICE( self ), MLT_LOG_ERROR, "system(%s) failed!\n", mlt_properties_get( properties, "ante" ) );
553 
554 	// Set the real_time preference
555 	priv->real_time = mlt_properties_get_int( properties, "real_time" );
556 
557 	// For worker threads implementation, buffer must be at least # threads
558 	if ( abs( priv->real_time ) > 1 && mlt_properties_get_int( properties, "buffer" ) <= abs( priv->real_time ) )
559 		mlt_properties_set_int( properties, "_buffer", abs( priv->real_time ) + 1 );
560 
561 	// Store the parameters for audio processing.
562 	priv->aud_counter = 0;
563 	priv->fps = mlt_properties_get_double( properties, "fps" );
564 	priv->channels = mlt_properties_get_int( properties, "channels" );
565 	priv->frequency = mlt_properties_get_int( properties, "frequency" );
566 	priv->preroll = 1;
567 
568 #ifdef _WIN32
569 	if ( priv->real_time == 1 || priv->real_time == -1 )
570 		consumer_read_ahead_start( self );
571 #endif
572 
573 	// Start the service
574 	if ( self->start != NULL )
575 		error = self->start( self );
576 
577 	return error;
578 }
579 
580 /** An alternative method to feed frames into the consumer.
581  *
582  * Only valid if the consumer itself is not connected.
583  *
584  * \public \memberof mlt_consumer_s
585  * \param self a consumer
586  * \param frame a frame
587  * \return true (ignore self for now)
588  */
589 
mlt_consumer_put_frame(mlt_consumer self,mlt_frame frame)590 int mlt_consumer_put_frame( mlt_consumer self, mlt_frame frame )
591 {
592 	int error = 1;
593 
594 	// Get the service associated to the consumer
595 	mlt_service service = MLT_CONSUMER_SERVICE( self );
596 
597 	if ( mlt_service_producer( service ) == NULL )
598 	{
599 		struct timeval now;
600 		struct timespec tm;
601 		consumer_private *priv = self->local;
602 
603 		mlt_properties_set_int( MLT_CONSUMER_PROPERTIES(self), "put_pending", 1 );
604 		pthread_mutex_lock( &priv->put_mutex );
605 		while ( priv->put_active && priv->put != NULL )
606 		{
607 			gettimeofday( &now, NULL );
608 			tm.tv_sec = now.tv_sec + 1;
609 			tm.tv_nsec = now.tv_usec * 1000;
610 			pthread_cond_timedwait( &priv->put_cond, &priv->put_mutex, &tm );
611 		}
612 		mlt_properties_set_int( MLT_CONSUMER_PROPERTIES(self), "put_pending", 0 );
613 		if ( priv->put_active && priv->put == NULL )
614 			priv->put = frame;
615 		else
616 			mlt_frame_close( frame );
617 		pthread_cond_broadcast( &priv->put_cond );
618 		pthread_mutex_unlock( &priv->put_mutex );
619 	}
620 	else
621 	{
622 		mlt_frame_close( frame );
623 	}
624 
625 	return error;
626 }
627 
628 /** Protected method for consumer to get frames from connected service
629  *
630  * \public \memberof mlt_consumer_s
631  * \param self a consumer
632  * \return a frame
633  */
634 
mlt_consumer_get_frame(mlt_consumer self)635 mlt_frame mlt_consumer_get_frame( mlt_consumer self )
636 {
637 	// Frame to return
638 	mlt_frame frame = NULL;
639 
640 	// Get the service associated to the consumer
641 	mlt_service service = MLT_CONSUMER_SERVICE( self );
642 
643 	// Get the consumer properties
644 	mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
645 
646 	// Get the frame
647 	if ( mlt_service_producer( service ) == NULL && mlt_properties_get_int( properties, "put_mode" ) )
648 	{
649 		struct timeval now;
650 		struct timespec tm;
651 		consumer_private *priv = self->local;
652 
653 		pthread_mutex_lock( &priv->put_mutex );
654 		while ( priv->put_active && priv->put == NULL )
655 		{
656 			gettimeofday( &now, NULL );
657 			tm.tv_sec = now.tv_sec + 1;
658 			tm.tv_nsec = now.tv_usec * 1000;
659 			pthread_cond_timedwait( &priv->put_cond, &priv->put_mutex, &tm );
660 		}
661 		frame = priv->put;
662 		priv->put = NULL;
663 		pthread_cond_broadcast( &priv->put_cond );
664 		pthread_mutex_unlock( &priv->put_mutex );
665 		if ( frame != NULL )
666 			mlt_service_apply_filters( service, frame, 0 );
667 	}
668 	else if ( mlt_service_producer( service ) != NULL )
669 	{
670 		mlt_service_get_frame( service, &frame, 0 );
671 	}
672 	else
673 	{
674 		frame = mlt_frame_init( service );
675 	}
676 
677 	if ( frame != NULL )
678 	{
679 		// Get the frame properties
680 		mlt_properties frame_properties = MLT_FRAME_PROPERTIES( frame );
681 
682 		// Get the test card producer
683 		mlt_producer test_card = mlt_properties_get_data( properties, "test_card_producer", NULL );
684 
685 		// Attach the test frame producer to it.
686 		if ( test_card != NULL )
687 			mlt_properties_set_data( frame_properties, "test_card_producer", test_card, 0, NULL, NULL );
688 
689 		// Pass along the interpolation and deinterlace options
690 		// TODO: get rid of consumer_deinterlace and use profile.progressive
691 		mlt_properties_set( frame_properties, "rescale.interp", mlt_properties_get( properties, "rescale" ) );
692 		mlt_properties_set_int( frame_properties, "consumer_deinterlace", mlt_properties_get_int( properties, "progressive" ) | mlt_properties_get_int( properties, "deinterlace" ) );
693 		mlt_properties_set( frame_properties, "deinterlace_method", mlt_properties_get( properties, "deinterlace_method" ) );
694 		mlt_properties_set_int( frame_properties, "consumer_tff", mlt_properties_get_int( properties, "top_field_first" ) );
695 		mlt_properties_set( frame_properties, "consumer_color_trc", mlt_properties_get( properties, "color_trc" ) );
696 		mlt_properties_set( frame_properties, "consumer_channel_layout", mlt_properties_get( properties, "channel_layout" ) );
697 	}
698 
699 	// Return the frame
700 	return frame;
701 }
702 
703 /** Compute the time difference between now and a time value.
704  *
705  * \private \memberof mlt_consumer_s
706  * \param time1 a time value to be compared against now
707  * \return the difference in microseconds
708  */
709 
time_difference(struct timeval * time1)710 static inline long time_difference( struct timeval *time1 )
711 {
712 	struct timeval time2;
713 	time2.tv_sec = time1->tv_sec;
714 	time2.tv_usec = time1->tv_usec;
715 	gettimeofday( time1, NULL );
716 	return time1->tv_sec * 1000000 + time1->tv_usec - time2.tv_sec * 1000000 - time2.tv_usec;
717 }
718 
719 /** The thread procedure for asynchronously pulling frames through the service
720  * network connected to a consumer.
721  *
722  * \private \memberof mlt_consumer_s
723  * \param arg a consumer
724  */
725 
consumer_read_ahead_thread(void * arg)726 static void *consumer_read_ahead_thread( void *arg )
727 {
728 	// The argument is the consumer
729 	mlt_consumer self = arg;
730 	consumer_private *priv = self->local;
731 
732 	// Get the properties of the consumer
733 	mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
734 
735 	// Get the width and height
736 	int width = mlt_properties_get_int( properties, "width" );
737 	int height = mlt_properties_get_int( properties, "height" );
738 
739 	// See if video is turned off
740 	int video_off = mlt_properties_get_int( properties, "video_off" );
741 	int preview_off = mlt_properties_get_int( properties, "preview_off" );
742 	int preview_format = mlt_properties_get_int( properties, "preview_format" );
743 
744 	// Audio processing variables
745 	int samples = 0;
746 	void *audio = NULL;
747 
748 	// See if audio is turned off
749 	int audio_off = mlt_properties_get_int( properties, "audio_off" );
750 
751 	// General frame variable
752 	mlt_frame frame = NULL;
753 	uint8_t *image = NULL;
754 
755 	// Time structures
756 	struct timeval ante;
757 
758 	// Average time for get_frame and get_image
759 	int count = 0;
760 	int skipped = 0;
761 	int64_t time_process = 0;
762 	int skip_next = 0;
763 	mlt_position pos = 0;
764 	mlt_position start_pos = 0;
765 	mlt_position last_pos = 0;
766 	int frame_duration = mlt_properties_get_int( properties, "frame_duration" );
767 	int drop_max = mlt_properties_get_int( properties, "drop_max" );
768 
769 	if ( preview_off && preview_format != 0 )
770 		priv->image_format = preview_format;
771 
772 	set_audio_format( self );
773 	set_image_format( self );
774 
775 	mlt_events_fire( properties, "consumer-thread-started", mlt_event_data_none() );
776 
777 	// Get the first frame
778 	frame = mlt_consumer_get_frame( self );
779 	priv->speed = mlt_properties_get_int( MLT_FRAME_PROPERTIES( frame ), "_speed" );
780 
781 	if ( frame )
782 	{
783 		// Get the audio of the first frame
784 		if ( !audio_off )
785 		{
786 			samples = mlt_audio_calculate_frame_samples( priv->fps, priv->frequency, priv->aud_counter++ );
787 			mlt_frame_get_audio( frame, &audio, &priv->audio_format, &priv->frequency, &priv->channels, &samples );
788 		}
789 
790 		// Get the image of the first frame
791 		if ( !video_off )
792 		{
793 			mlt_events_fire( MLT_CONSUMER_PROPERTIES( self ), "consumer-frame-render", mlt_event_data_from_frame(frame) );
794 			mlt_frame_get_image( frame, &image, &priv->image_format, &width, &height, 0 );
795 		}
796 
797 		// Mark as rendered
798 		mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "rendered", 1 );
799 		last_pos = start_pos = pos = mlt_frame_get_position( frame );
800 	}
801 
802 	// Get the starting time (can ignore the times above)
803 	gettimeofday( &ante, NULL );
804 
805 	// Continue to read ahead
806 	while ( priv->ahead )
807 	{
808 		// Get the maximum size of the buffer
809 		int buffer = (priv->speed == 0) ? 1 : MAX(mlt_properties_get_int( properties, "buffer" ), 0) + 1;
810 
811 		// Put the current frame into the queue
812 		pthread_mutex_lock( &priv->queue_mutex );
813 		while( priv->ahead && mlt_deque_count( priv->queue ) >= buffer )
814 			pthread_cond_wait( &priv->queue_cond, &priv->queue_mutex );
815 		if ( priv->is_purge )
816 		{
817 			mlt_frame_close( frame );
818 			priv->is_purge = 0;
819 		}
820 		else
821 		{
822 			mlt_deque_push_back( priv->queue, frame );
823 		}
824 		pthread_cond_broadcast( &priv->queue_cond );
825 		pthread_mutex_unlock( &priv->queue_mutex );
826 
827 		mlt_log_timings_begin();
828 		// Get the next frame
829 		frame = mlt_consumer_get_frame( self );
830 		mlt_log_timings_end( NULL, "mlt_consumer_get_frame" );
831 
832 		// If there's no frame, we're probably stopped...
833 		if ( frame == NULL )
834 			continue;
835 		pos = mlt_frame_get_position( frame );
836 		priv->speed = mlt_properties_get_int( MLT_FRAME_PROPERTIES( frame ), "_speed" );
837 
838 		// WebVfx uses this to setup a consumer-stopping event handler.
839 		mlt_properties_set_data( MLT_FRAME_PROPERTIES( frame ), "consumer", self, 0, NULL, NULL );
840 
841 		// Increment the counter used for averaging processing cost
842 		count ++;
843 
844 		// Always process audio
845 		if ( !audio_off )
846 		{
847 			samples = mlt_audio_calculate_frame_samples( priv->fps, priv->frequency, priv->aud_counter++ );
848 			mlt_frame_get_audio( frame, &audio, &priv->audio_format, &priv->frequency, &priv->channels, &samples );
849 		}
850 
851 		// All non-normal playback frames should be shown
852 		if ( priv->speed != 1 )
853 		{
854 #ifdef DEINTERLACE_ON_NOT_NORMAL_SPEED
855 			mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "consumer_deinterlace", 1 );
856 #endif
857 			// Indicate seeking or trick-play
858 			start_pos = pos;
859 		}
860 
861 		// If skip flag not set or frame-dropping disabled
862 		if ( !skip_next || priv->real_time == -1 )
863 		{
864 			if ( !video_off )
865 			{
866 				// Reset width/height - could have been changed by previous mlt_frame_get_image
867 				width = mlt_properties_get_int( properties, "width" );
868 				height = mlt_properties_get_int( properties, "height" );
869 
870 				// Get the image
871 				mlt_events_fire( MLT_CONSUMER_PROPERTIES( self ), "consumer-frame-render", mlt_event_data_from_frame(frame) );
872 				mlt_log_timings_begin();
873 				mlt_frame_get_image( frame, &image, &priv->image_format, &width, &height, 0 );
874 				mlt_log_timings_end( NULL, "mlt_frame_get_image" );
875 			}
876 
877 			// Indicate the rendered image is available.
878 			mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "rendered", 1 );
879 
880 			// Reset consecutively-skipped counter
881 			skipped = 0;
882 		}
883 		else // Skip image processing
884 		{
885 			// Increment the number of consecutively-skipped frames
886 			skipped++;
887 
888 			// If too many (1 sec) consecutively-skipped frames
889 			if ( skipped > drop_max )
890 			{
891 				// Reset cost tracker
892 				time_process = 0;
893 				count = 1;
894 				mlt_log_verbose( self, "too many frames dropped - forcing next frame\n" );
895 			}
896 		}
897 
898 		// Get the time to process this frame
899 		int64_t time_current = time_difference( &ante );
900 
901 		// If the current time is not suddenly some large amount
902 		if ( time_current < time_process / count * 20 || !time_process || count < 5 )
903 		{
904 			// Accumulate the cost for processing this frame
905 			time_process += time_current;
906 		}
907 		else
908 		{
909 			mlt_log_debug( self, "current %"PRId64" threshold %"PRId64" count %d\n",
910 				time_current, (int64_t) (time_process / count * 20), count );
911 			// Ignore the cost of this frame's time
912 			count--;
913 		}
914 
915 		// Determine if we started, resumed, or seeked
916 		if ( pos != last_pos + 1 )
917 			start_pos = pos;
918 		last_pos = pos;
919 
920 		// Do not skip the first 20% of buffer at start, resume, or seek
921 		if ( pos - start_pos <= buffer / 5 + 1 )
922 		{
923 			// Reset cost tracker
924 			time_process = 0;
925 			count = 1;
926 		}
927 
928 		// Reset skip flag
929 		skip_next = 0;
930 
931 		// Only consider skipping if the buffer level is low (or really small)
932 		if ( mlt_deque_count( priv->queue ) <= buffer / 5 + 1 && count > 1 )
933 		{
934 			// Skip next frame if average cost exceeds frame duration.
935 			if ( time_process / count > frame_duration )
936 				skip_next = 1;
937 			if ( skip_next )
938 				mlt_log_debug( self, "avg usec %"PRId64" (%"PRId64"/%d) duration %d\n",
939 					time_process/count, time_process, count, frame_duration);
940 		}
941 	}
942 
943 	// Remove the last frame
944 	mlt_frame_close( frame );
945 
946 	// Wipe the queue
947 	pthread_mutex_lock( &priv->queue_mutex );
948 	while ( mlt_deque_count( priv->queue ) )
949 		mlt_frame_close( mlt_deque_pop_back( priv->queue ) );
950 
951 	// Close the queue
952 	mlt_deque_close( priv->queue );
953 	priv->queue = NULL;
954 	pthread_mutex_unlock( &priv->queue_mutex );
955 
956 	mlt_events_fire( MLT_CONSUMER_PROPERTIES(self), "consumer-thread-stopped", mlt_event_data_none() );
957 
958 	return NULL;
959 }
960 
961 /** Locate the first unprocessed frame in the queue.
962  *
963  * When playing with realtime behavior, we do not use the true head, but
964  * rather an adjusted process_head. The process_head is adjusted based on
965  * the rate of frame-dropping or recovery from frame-dropping. The idea is
966  * that as the level of frame-dropping increases to move the process_head
967  * closer to the tail because the frames are not completing processing prior
968  * to their playout! Then, as frames are not dropped the process_head moves
969  * back closer to the head of the queue so that worker threads can work
970  * ahead of the playout point (queue head).
971  *
972  * \private \memberof mlt_consumer_s
973  * \param self a consumer
974  * \return an index into the queue
975  */
976 
first_unprocessed_frame(mlt_consumer self)977 static inline int first_unprocessed_frame( mlt_consumer self )
978 {
979 	consumer_private *priv = self->local;
980 	int index = priv->real_time <= 0 ? 0 : priv->process_head;
981 	while ( index < mlt_deque_count( priv->queue ) && MLT_FRAME( mlt_deque_peek( priv->queue, index ) )->is_processing )
982 		index++;
983 	return index;
984 }
985 
986 /** The worker thread procedure for parallel processing frames.
987  *
988  * \private \memberof mlt_consumer_s
989  * \param arg a consumer
990  */
991 
consumer_worker_thread(void * arg)992 static void *consumer_worker_thread( void *arg )
993 {
994 	// The argument is the consumer
995 	mlt_consumer self = arg;
996 	consumer_private *priv = self->local;
997 
998 	// Get the properties of the consumer
999 	mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
1000 
1001 	// Get the width and height
1002 	int width = mlt_properties_get_int( properties, "width" );
1003 	int height = mlt_properties_get_int( properties, "height" );
1004 	mlt_image_format format = priv->image_format;
1005 
1006 	// See if video is turned off
1007 	int video_off = mlt_properties_get_int( properties, "video_off" );
1008 	int preview_off = mlt_properties_get_int( properties, "preview_off" );
1009 	int preview_format = mlt_properties_get_int( properties, "preview_format" );
1010 
1011 	// General frame variable
1012 	mlt_frame frame = NULL;
1013 	uint8_t *image = NULL;
1014 
1015 	if ( preview_off && preview_format != 0 )
1016 		format = preview_format;
1017 
1018 	mlt_events_fire( properties, "consumer-thread-started", mlt_event_data_none() );
1019 
1020 	// Continue to read ahead
1021 	while ( priv->ahead )
1022 	{
1023 		// Get the next unprocessed frame from the work queue
1024 		pthread_mutex_lock( &priv->queue_mutex );
1025 		int index = first_unprocessed_frame( self );
1026 		while ( priv->ahead && index >= mlt_deque_count( priv->queue ) )
1027 		{
1028 			mlt_log_debug( MLT_CONSUMER_SERVICE(self), "waiting in worker index = %d queue count = %d\n",
1029 				index, mlt_deque_count( priv->queue ) );
1030 			pthread_cond_wait( &priv->queue_cond, &priv->queue_mutex );
1031 			index = first_unprocessed_frame( self );
1032 		}
1033 
1034 		// Mark the frame for processing
1035 		frame = mlt_deque_peek( priv->queue, index );
1036 		if ( frame )
1037 		{
1038 			mlt_log_debug( MLT_CONSUMER_SERVICE(self), "worker processing index = %d frame " MLT_POSITION_FMT " queue count = %d\n",
1039 				index, mlt_frame_get_position(frame), mlt_deque_count( priv->queue ) );
1040 			frame->is_processing = 1;
1041 			mlt_properties_inc_ref( MLT_FRAME_PROPERTIES( frame ) );
1042 		}
1043 		pthread_mutex_unlock( &priv->queue_mutex );
1044 
1045 		// If there's no frame, we're probably stopped...
1046 		if ( frame == NULL )
1047 			continue;
1048 
1049 		// WebVfx uses this to setup a consumer-stopping event handler.
1050 		mlt_properties_set_data( MLT_FRAME_PROPERTIES( frame ), "consumer", self, 0, NULL, NULL );
1051 
1052 #ifdef DEINTERLACE_ON_NOT_NORMAL_SPEED
1053 		// All non normal playback frames should be shown
1054 		if ( mlt_properties_get_int( MLT_FRAME_PROPERTIES( frame ), "_speed" ) != 1 )
1055 			mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "consumer_deinterlace", 1 );
1056 #endif
1057 
1058 		// Get the image
1059 		if ( !video_off )
1060 		{
1061 			// Fetch width/height again
1062 			width = mlt_properties_get_int( properties, "width" );
1063 			height = mlt_properties_get_int( properties, "height" );
1064 			mlt_events_fire( MLT_CONSUMER_PROPERTIES( self ), "consumer-frame-render", mlt_event_data_from_frame(frame) );
1065 			mlt_frame_get_image( frame, &image, &format, &width, &height, 0 );
1066 		}
1067 		mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "rendered", 1 );
1068 		mlt_frame_close( frame );
1069 
1070 		// Tell a waiting thread (non-realtime main consumer thread) that we are done.
1071 		pthread_mutex_lock( &priv->done_mutex );
1072 		pthread_cond_broadcast( &priv->done_cond );
1073 		pthread_mutex_unlock( &priv->done_mutex );
1074 	}
1075 
1076 	return NULL;
1077 }
1078 
1079 /** Start the read/render thread.
1080  *
1081  * \private \memberof mlt_consumer_s
1082  * \param self a consumer
1083  */
1084 
consumer_read_ahead_start(mlt_consumer self)1085 static void consumer_read_ahead_start( mlt_consumer self )
1086 {
1087 	consumer_private *priv = self->local;
1088 
1089 	if ( priv->started )
1090 		return;
1091 
1092 	// We're running now
1093 	priv->ahead = 1;
1094 
1095 	// Create the frame queue
1096 	priv->queue = mlt_deque_init( );
1097 
1098 	// Create the queue mutex
1099 	pthread_mutex_init( &priv->queue_mutex, NULL );
1100 
1101 	// Create the condition
1102 	pthread_cond_init( &priv->queue_cond, NULL );
1103 
1104 	// Create the read ahead
1105 	mlt_thread_create( self, (mlt_thread_function_t) consumer_read_ahead_thread );
1106 	priv->started = 1;
1107 }
1108 
1109 /** Start the worker threads.
1110  *
1111  * \private \memberof mlt_consumer_s
1112  * \param self a consumer
1113  */
1114 
consumer_work_start(mlt_consumer self)1115 static void consumer_work_start( mlt_consumer self )
1116 {
1117 	consumer_private *priv = self->local;
1118 	int n = abs( priv->real_time );
1119 	pthread_t *thread;
1120 
1121 	if ( priv->started )
1122 		return;
1123 
1124 	thread = calloc( 1, sizeof( pthread_t ) * n );
1125 
1126 	// We're running now
1127 	priv->ahead = 1;
1128 	priv->threads = thread;
1129 
1130 	// These keep track of the acceleration of frame dropping or recovery.
1131 	priv->consecutive_dropped = 0;
1132 	priv->consecutive_rendered = 0;
1133 
1134 	// This is the position in the queue from which to look for a frame to process.
1135 	// If we always start from the head, then we may likely not complete processing
1136 	// before the frame is played out.
1137 	priv->process_head = 0;
1138 
1139 	// Create the queues
1140 	priv->queue = mlt_deque_init();
1141 	priv->worker_threads = mlt_deque_init();
1142 
1143 	// Create the mutexes
1144 	pthread_mutex_init( &priv->queue_mutex, NULL );
1145 	pthread_mutex_init( &priv->done_mutex, NULL );
1146 
1147 	// Create the conditions
1148 	pthread_cond_init( &priv->queue_cond, NULL );
1149 	pthread_cond_init( &priv->done_cond, NULL );
1150 
1151 	// Create the read ahead
1152 	if ( mlt_properties_get( MLT_CONSUMER_PROPERTIES( self ), "priority" ) )
1153 	{
1154 
1155 		struct sched_param priority;
1156 		pthread_attr_t thread_attributes;
1157 
1158 		priority.sched_priority = mlt_properties_get_int( MLT_CONSUMER_PROPERTIES( self ), "priority" );
1159 		pthread_attr_init( &thread_attributes );
1160 		pthread_attr_setschedpolicy( &thread_attributes, SCHED_OTHER );
1161 		pthread_attr_setschedparam( &thread_attributes, &priority );
1162 		pthread_attr_setinheritsched( &thread_attributes, PTHREAD_EXPLICIT_SCHED );
1163 		pthread_attr_setscope( &thread_attributes, PTHREAD_SCOPE_SYSTEM );
1164 
1165 		while ( n-- )
1166 		{
1167 			if ( pthread_create( thread, &thread_attributes, consumer_worker_thread, self ) < 0 ) {
1168 				if ( pthread_create( thread, NULL, consumer_worker_thread, self ) == 0 )
1169 					mlt_deque_push_back( priv->worker_threads, thread );
1170 			} else {
1171 				mlt_deque_push_back( priv->worker_threads, thread );
1172 			}
1173 			thread++;
1174 		}
1175 		pthread_attr_destroy( &thread_attributes );
1176 	}
1177 
1178 	else
1179 	{
1180 		while ( n-- )
1181 		{
1182 			if ( pthread_create( thread, NULL, consumer_worker_thread, self ) == 0 )
1183 				mlt_deque_push_back( priv->worker_threads, thread );
1184 			thread++;
1185 		}
1186 	}
1187 	priv->started = 1;
1188 }
1189 
1190 /** Stop the read/render thread.
1191  *
1192  * \private \memberof mlt_consumer_s
1193  * \param self a consumer
1194  */
1195 
consumer_read_ahead_stop(mlt_consumer self)1196 static void consumer_read_ahead_stop( mlt_consumer self )
1197 {
1198 	consumer_private *priv = self->local;
1199 
1200 	// Make sure we're running
1201 	int expected = 1;
1202 	if ( atomic_compare_exchange_strong( &priv->started, &expected, 0 ) )
1203 	{
1204 		// Inform thread to stop
1205 		priv->ahead = 0;
1206 		mlt_events_fire( MLT_CONSUMER_PROPERTIES(self), "consumer-stopping", mlt_event_data_none() );
1207 
1208 		// Broadcast to the condition in case it's waiting
1209 		pthread_mutex_lock( &priv->queue_mutex );
1210 		pthread_cond_broadcast( &priv->queue_cond );
1211 		pthread_mutex_unlock( &priv->queue_mutex );
1212 
1213 		// Broadcast to the put condition in case it's waiting
1214 		pthread_mutex_lock( &priv->put_mutex );
1215 		pthread_cond_broadcast( &priv->put_cond );
1216 		pthread_mutex_unlock( &priv->put_mutex );
1217 
1218 		// Join the thread
1219 		mlt_thread_join( self );
1220 
1221 		// Destroy the frame queue mutex
1222 		pthread_mutex_destroy( &priv->queue_mutex );
1223 
1224 		// Destroy the condition
1225 		pthread_cond_destroy( &priv->queue_cond );
1226 	}
1227 }
1228 
1229 /** Stop the worker threads.
1230  *
1231  * \private \memberof mlt_consumer_s
1232  * \param self a consumer
1233  */
1234 
consumer_work_stop(mlt_consumer self)1235 static void consumer_work_stop( mlt_consumer self )
1236 {
1237 	consumer_private *priv = self->local;
1238 
1239 	// Make sure we're running
1240 	int expected = 1;
1241 	if ( atomic_compare_exchange_strong( &priv->started, &expected, 0 ) )
1242 	{
1243 		// Inform thread to stop
1244 		priv->ahead = 0;
1245 		mlt_events_fire( MLT_CONSUMER_PROPERTIES(self), "consumer-stopping", mlt_event_data_none() );
1246 
1247 		// Broadcast to the queue condition in case it's waiting
1248 		pthread_mutex_lock( &priv->queue_mutex );
1249 		pthread_cond_broadcast( &priv->queue_cond );
1250 		pthread_mutex_unlock( &priv->queue_mutex );
1251 
1252 		// Broadcast to the put condition in case it's waiting
1253 		pthread_mutex_lock( &priv->put_mutex );
1254 		pthread_cond_broadcast( &priv->put_cond );
1255 		pthread_mutex_unlock( &priv->put_mutex );
1256 
1257 		// Broadcast to the done condition in case it's waiting
1258 		pthread_mutex_lock( &priv->done_mutex );
1259 		pthread_cond_broadcast( &priv->done_cond );
1260 		pthread_mutex_unlock( &priv->done_mutex );
1261 
1262 		// Join the threads
1263 		pthread_t *thread;
1264 		while ( ( thread = mlt_deque_pop_back( priv->worker_threads ) ) )
1265 			pthread_join( *thread, NULL );
1266 
1267 		// Deallocate the array of threads
1268 		free( priv->threads );
1269 
1270 		// Destroy the mutexes
1271 		pthread_mutex_destroy( &priv->queue_mutex );
1272 		pthread_mutex_destroy( &priv->done_mutex );
1273 
1274 		// Destroy the conditions
1275 		pthread_cond_destroy( &priv->queue_cond );
1276 		pthread_cond_destroy( &priv->done_cond );
1277 
1278 		// Wipe the queues
1279 		while ( mlt_deque_count( priv->queue ) )
1280 			mlt_frame_close( mlt_deque_pop_back( priv->queue ) );
1281 
1282 		// Close the queues
1283 		mlt_deque_close( priv->queue );
1284 		mlt_deque_close( priv->worker_threads );
1285 
1286 		mlt_events_fire( MLT_CONSUMER_PROPERTIES(self), "consumer-thread-stopped", mlt_event_data_none() );
1287 	}
1288 }
1289 
1290 /** Flush the read/render thread's buffer.
1291  *
1292  * \public \memberof mlt_consumer_s
1293  * \param self a consumer
1294  */
1295 
mlt_consumer_purge(mlt_consumer self)1296 void mlt_consumer_purge( mlt_consumer self )
1297 {
1298 	if ( self )
1299 	{
1300 		consumer_private *priv = self->local;
1301 
1302 		pthread_mutex_lock( &priv->put_mutex );
1303 		if ( priv->put ) {
1304 			mlt_frame_close( priv->put );
1305 			priv->put = NULL;
1306 		}
1307 		pthread_cond_broadcast( &priv->put_cond );
1308 		pthread_mutex_unlock( &priv->put_mutex );
1309 
1310 		if ( self->purge )
1311 			self->purge( self );
1312 
1313 		if ( priv->started && priv->real_time )
1314 			pthread_mutex_lock( &priv->queue_mutex );
1315 
1316 		while ( priv->started && mlt_deque_count( priv->queue ) )
1317 			mlt_frame_close( mlt_deque_pop_back( priv->queue ) );
1318 
1319 		if ( priv->started && priv->real_time )
1320 		{
1321 			priv->is_purge = 1;
1322 			pthread_cond_broadcast( &priv->queue_cond );
1323 			pthread_mutex_unlock( &priv->queue_mutex );
1324 			if ( abs( priv->real_time ) > 1 )
1325 			{
1326 				pthread_mutex_lock( &priv->done_mutex );
1327 				pthread_cond_broadcast( &priv->done_cond );
1328 				pthread_mutex_unlock( &priv->done_mutex );
1329 			}
1330 		}
1331 
1332 		pthread_mutex_lock( &priv->put_mutex );
1333 		if ( priv->put ) {
1334 			mlt_frame_close( priv->put );
1335 			priv->put = NULL;
1336 		}
1337 		pthread_cond_broadcast( &priv->put_cond );
1338 		pthread_mutex_unlock( &priv->put_mutex );
1339 	}
1340 }
1341 
1342 /** Use multiple worker threads and a work queue.
1343  */
1344 
worker_get_frame(mlt_consumer self,mlt_properties properties)1345 static mlt_frame worker_get_frame( mlt_consumer self, mlt_properties properties )
1346 {
1347 	// Frame to return
1348 	mlt_frame frame = NULL;
1349 	consumer_private *priv = self->local;
1350 	int threads = abs( priv->real_time );
1351 	int audio_off = mlt_properties_get_int( properties, "audio_off" );
1352 	int samples = 0;
1353 	void *audio = NULL;
1354 	int buffer = mlt_properties_get_int( properties, "_buffer" );
1355 	buffer = buffer > 0 ? buffer : mlt_properties_get_int( properties, "buffer" );
1356 	// This is a heuristic to determine a suitable minimum buffer size for the number of threads.
1357 	int headroom = (priv->real_time < 0) ? threads : (2 + threads * threads);
1358 	buffer = MAX(buffer, headroom);
1359 
1360 	// Start worker threads if not already started.
1361 	if ( ! priv->ahead )
1362 	{
1363 		int prefill = mlt_properties_get_int( properties, "prefill" );
1364 		prefill = prefill > 0 && prefill < buffer ? prefill : buffer;
1365 
1366 		set_audio_format( self );
1367 		set_image_format( self );
1368 		consumer_work_start( self );
1369 
1370 		// Fill the work queue.
1371 		int i = buffer;
1372 		while ( priv->ahead && i-- )
1373 		{
1374 			frame = mlt_consumer_get_frame( self );
1375 			if ( frame )
1376 			{
1377 				// Process the audio
1378 				if ( !audio_off )
1379 				{
1380 					samples = mlt_audio_calculate_frame_samples( priv->fps, priv->frequency, priv->aud_counter++ );
1381 					mlt_frame_get_audio( frame, &audio, &priv->audio_format, &priv->frequency, &priv->channels, &samples );
1382 				}
1383 				pthread_mutex_lock( &priv->queue_mutex );
1384 				mlt_deque_push_back( priv->queue, frame );
1385 				pthread_cond_signal( &priv->queue_cond );
1386 				pthread_mutex_unlock( &priv->queue_mutex );
1387 				priv->speed = mlt_properties_get_int( MLT_FRAME_PROPERTIES( frame ), "_speed" );
1388 				buffer = (priv->speed == 0) ? 1 : buffer;
1389 			}
1390 		}
1391 
1392 		// Wait for prefill
1393 		while ( priv->ahead && first_unprocessed_frame( self ) < prefill )
1394 		{
1395 			pthread_mutex_lock( &priv->done_mutex );
1396 			pthread_cond_wait( &priv->done_cond, &priv->done_mutex );
1397 			pthread_mutex_unlock( &priv->done_mutex );
1398 		}
1399 		priv->process_head = threads;
1400 	}
1401 
1402 //	mlt_log_verbose( MLT_CONSUMER_SERVICE(self), "size %d done count %d work count %d process_head %d\n",
1403 //		threads, first_unprocessed_frame( self ), mlt_deque_count( priv->queue ), priv->process_head );
1404 
1405 	// Feed the work queue
1406 	while ( priv->ahead && mlt_deque_count( priv->queue ) < buffer )
1407 	{
1408 		frame = mlt_consumer_get_frame( self );
1409 		if ( frame )
1410 		{
1411 			// Process the audio
1412 			if ( !audio_off )
1413 			{
1414 				samples = mlt_audio_calculate_frame_samples( priv->fps, priv->frequency, priv->aud_counter++ );
1415 				mlt_frame_get_audio( frame, &audio, &priv->audio_format, &priv->frequency, &priv->channels, &samples );
1416 			}
1417 			pthread_mutex_lock( &priv->queue_mutex );
1418 			mlt_deque_push_back( priv->queue, frame );
1419 			pthread_cond_signal( &priv->queue_cond );
1420 			pthread_mutex_unlock( &priv->queue_mutex );
1421 			priv->speed = mlt_properties_get_int( MLT_FRAME_PROPERTIES( frame ), "_speed" );
1422 			buffer = (priv->speed == 0) ? 1 : buffer;
1423 		}
1424 	}
1425 
1426 	// Wait if not realtime.
1427 	while ( priv->ahead && priv->real_time < 0 && !priv->is_purge &&
1428 		!( mlt_properties_get_int( MLT_FRAME_PROPERTIES( MLT_FRAME( mlt_deque_peek_front( priv->queue ) ) ), "rendered" ) ) )
1429 	{
1430 		pthread_mutex_lock( &priv->done_mutex );
1431 		pthread_cond_wait( &priv->done_cond, &priv->done_mutex );
1432 		pthread_mutex_unlock( &priv->done_mutex );
1433 	}
1434 
1435 	// Get the frame from the queue.
1436 	pthread_mutex_lock( &priv->queue_mutex );
1437 	frame = mlt_deque_pop_front( priv->queue );
1438 	pthread_mutex_unlock( &priv->queue_mutex );
1439 	if ( ! frame ) {
1440 		priv->is_purge = 0;
1441 		return frame;
1442 	}
1443 
1444 	// Adapt the worker process head to the runtime conditions.
1445 	if ( priv->real_time > 0 )
1446 	{
1447 		if ( mlt_properties_get_int( MLT_FRAME_PROPERTIES( frame ), "rendered" ) )
1448 		{
1449 			priv->consecutive_dropped = 0;
1450 			if ( priv->process_head > threads && priv->consecutive_rendered >= priv->process_head )
1451 				priv->process_head--;
1452 			else
1453 				priv->consecutive_rendered++;
1454 		}
1455 		else
1456 		{
1457 			priv->consecutive_rendered = 0;
1458 			if ( priv->process_head < buffer - threads && priv->consecutive_dropped > threads )
1459 				priv->process_head++;
1460 			else
1461 				priv->consecutive_dropped++;
1462 		}
1463 //		mlt_log_verbose( MLT_CONSUMER_SERVICE(self), "dropped %d rendered %d process_head %d\n",
1464 //			priv->consecutive_dropped, priv->consecutive_rendered, priv->process_head );
1465 
1466 		// Check for too many consecutively dropped frames
1467 		if ( priv->consecutive_dropped > mlt_properties_get_int( properties, "drop_max" ) )
1468 		{
1469 			int orig_buffer = mlt_properties_get_int( properties, "buffer" );
1470 			int prefill = mlt_properties_get_int( properties, "prefill" );
1471 			mlt_log_verbose( self, "too many frames dropped - " );
1472 
1473 			// If using a default low-latency buffer level (SDL) and below the limit
1474 			if ( ( orig_buffer == 1 || prefill == 1 ) && buffer < (threads + 1) * 10 )
1475 			{
1476 				// Auto-scale the buffer to compensate
1477 				mlt_log_verbose( self, "increasing buffer to %d\n", buffer + threads );
1478 				mlt_properties_set_int( properties, "_buffer", buffer + threads );
1479 				priv->consecutive_dropped = priv->fps / 2;
1480 			}
1481 			else
1482 			{
1483 				// Tell the consumer to render it
1484 				mlt_log_verbose( self, "forcing next frame\n" );
1485 				mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "rendered", 1 );
1486 				priv->consecutive_dropped = 0;
1487 			}
1488 		}
1489 		if ( !mlt_properties_get_int( MLT_FRAME_PROPERTIES(frame), "rendered") )
1490 		{
1491 			int dropped = mlt_properties_get_int( properties, "drop_count" );
1492 			mlt_properties_set_int( properties, "drop_count", ++dropped );
1493 			mlt_log_verbose( MLT_CONSUMER_SERVICE(self), "dropped video frame %d\n", dropped );
1494 		}
1495 	}
1496 	if ( priv->is_purge ) {
1497 		priv->is_purge = 0;
1498 		mlt_frame_close( frame );
1499 		frame = NULL;
1500 	}
1501 	return frame;
1502 }
1503 
1504 /** Get the next frame from the producer connected to a consumer.
1505  *
1506  * Typically, one uses this instead of \p mlt_consumer_get_frame to make
1507  * the asynchronous/real-time behavior configurable at runtime.
1508  * You should close the frame returned from this when you are done with it.
1509  *
1510  * \public \memberof mlt_consumer_s
1511  * \param self a consumer
1512  * \return a frame
1513  */
1514 
mlt_consumer_rt_frame(mlt_consumer self)1515 mlt_frame mlt_consumer_rt_frame( mlt_consumer self )
1516 {
1517 	// Frame to return
1518 	mlt_frame frame = NULL;
1519 
1520 	// Get the properties
1521 	mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
1522 	consumer_private *priv = self->local;
1523 
1524 	// Check if the user has requested real time or not
1525 	if ( priv->real_time > 1 || priv->real_time < -1 )
1526 	{
1527 		// see above
1528 		return worker_get_frame( self, properties );
1529 	}
1530 	else if ( priv->real_time == 1 || priv->real_time == -1 )
1531 	{
1532 		int size = 1;
1533 
1534 		if ( priv->preroll )
1535 		{
1536 			int buffer = mlt_properties_get_int( properties, "buffer" );
1537 			int prefill = mlt_properties_get_int( properties, "prefill" );
1538 #ifndef _WIN32
1539 			consumer_read_ahead_start( self );
1540 #endif
1541 			if ( buffer > 1 && priv->speed )
1542 				size = prefill > 0 && prefill < buffer ? prefill : buffer;
1543 			priv->preroll = 0;
1544 		}
1545 
1546 		// Get frame from queue
1547 		pthread_mutex_lock( &priv->queue_mutex );
1548 		mlt_log_timings_begin();
1549 		while( priv->ahead && mlt_deque_count( priv->queue ) < size )
1550 			pthread_cond_wait( &priv->queue_cond, &priv->queue_mutex );
1551 		frame = mlt_deque_pop_front( priv->queue );
1552 		mlt_log_timings_end( NULL, "wait_for_frame_queue" );
1553 		pthread_cond_broadcast( &priv->queue_cond );
1554 		pthread_mutex_unlock( &priv->queue_mutex );
1555 		if ( priv->real_time == 1 && frame &&
1556 			 !mlt_properties_get_int( MLT_FRAME_PROPERTIES(frame), "rendered" ) )
1557 		{
1558 			int dropped = mlt_properties_get_int( properties, "drop_count" );
1559 			mlt_properties_set_int( properties, "drop_count", ++dropped );
1560 			mlt_log_verbose( MLT_CONSUMER_SERVICE(self), "dropped video frame %d\n", dropped );
1561 		}
1562 	}
1563 	else // real_time == 0
1564 	{
1565 		if ( !priv->ahead )
1566 		{
1567 			priv->ahead = 1;
1568 			mlt_events_fire( properties, "consumer-thread-started", mlt_event_data_none() );
1569 		}
1570 		// Get the frame in non real time
1571 		frame = mlt_consumer_get_frame( self );
1572 
1573 		// This isn't true, but from the consumers perspective it is
1574 		if ( frame != NULL )
1575 		{
1576 			mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "rendered", 1 );
1577 
1578 			// WebVfx uses this to setup a consumer-stopping event handler.
1579 			mlt_properties_set_data( MLT_FRAME_PROPERTIES( frame ), "consumer", self, 0, NULL, NULL );
1580 		}
1581 	}
1582 
1583 	return frame;
1584 }
1585 
1586 /** Callback for the implementation to indicate a stopped condition.
1587  *
1588  * \public \memberof mlt_consumer_s
1589  * \param self a consumer
1590  */
1591 
mlt_consumer_stopped(mlt_consumer self)1592 void mlt_consumer_stopped( mlt_consumer self )
1593 {
1594 	mlt_properties_set_int( MLT_CONSUMER_PROPERTIES( self ), "running", 0 );
1595 	mlt_events_fire( MLT_CONSUMER_PROPERTIES( self ), "consumer-stopped", mlt_event_data_none() );
1596 	mlt_event_unblock( ( ( consumer_private* ) self->local )->event_listener );
1597 }
1598 
1599 /** Stop the consumer.
1600  *
1601  * \public \memberof mlt_consumer_s
1602  * \param self a consumer
1603  * \return true if there was an error
1604  */
1605 
mlt_consumer_stop(mlt_consumer self)1606 int mlt_consumer_stop( mlt_consumer self )
1607 {
1608 	if (!self) {
1609 		return 1;
1610 	}
1611 
1612 	// Get the properties
1613 	mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
1614 	consumer_private *priv = self->local;
1615 
1616 	// Just in case...
1617 	mlt_log( MLT_CONSUMER_SERVICE( self ), MLT_LOG_DEBUG, "stopping put waiting\n" );
1618 	pthread_mutex_lock( &priv->put_mutex );
1619 	priv->put_active = 0;
1620 	pthread_cond_broadcast( &priv->put_cond );
1621 	pthread_mutex_unlock( &priv->put_mutex );
1622 
1623 	// Stop the consumer
1624 	mlt_log( MLT_CONSUMER_SERVICE( self ), MLT_LOG_DEBUG, "stopping consumer\n" );
1625 
1626 	// Cancel the read ahead threads
1627 	if ( priv->started )
1628 	{
1629 		// Unblock the consumer calling mlt_consumer_rt_frame
1630 		pthread_mutex_lock( &priv->queue_mutex );
1631 		pthread_cond_broadcast( &priv->queue_cond );
1632 		pthread_mutex_unlock( &priv->queue_mutex );
1633 	}
1634 
1635 	// Invoke the child callback
1636 	if ( self->stop != NULL )
1637 		self->stop( self );
1638 
1639 	// Check if the user has requested real time or not and stop if necessary
1640 	mlt_log( MLT_CONSUMER_SERVICE( self ), MLT_LOG_DEBUG, "stopping read_ahead\n" );
1641 	if ( abs( priv->real_time ) == 1 )
1642 		consumer_read_ahead_stop( self );
1643 	else if ( abs( priv->real_time ) > 1 )
1644 		consumer_work_stop( self );
1645 
1646 	// Kill the test card
1647 	mlt_properties_set_data( properties, "test_card_producer", NULL, 0, NULL, NULL );
1648 
1649 	// Check and run a post command
1650 	if ( mlt_properties_get( properties, "post" ) )
1651 		if (system( mlt_properties_get( properties, "post" ) ) == -1 )
1652 			mlt_log( MLT_CONSUMER_SERVICE( self ), MLT_LOG_ERROR, "system(%s) failed!\n", mlt_properties_get( properties, "post" ) );
1653 
1654 	mlt_log( MLT_CONSUMER_SERVICE( self ), MLT_LOG_DEBUG, "stopped\n" );
1655 
1656 	return 0;
1657 }
1658 
1659 /** Determine if the consumer is stopped.
1660  *
1661  * \public \memberof mlt_consumer_s
1662  * \param self a consumer
1663  * \return true if the consumer is stopped
1664  */
1665 
mlt_consumer_is_stopped(mlt_consumer self)1666 int mlt_consumer_is_stopped( mlt_consumer self )
1667 {
1668 	// Check if the consumer is stopped
1669 	if ( self && self->is_stopped )
1670 		return self->is_stopped( self );
1671 
1672 	return 0;
1673 }
1674 
1675 /** Close and destroy the consumer.
1676  *
1677  * \public \memberof mlt_consumer_s
1678  * \param self a consumer
1679  */
1680 
mlt_consumer_close(mlt_consumer self)1681 void mlt_consumer_close( mlt_consumer self )
1682 {
1683 	if ( self != NULL && mlt_properties_dec_ref( MLT_CONSUMER_PROPERTIES( self ) ) <= 0 )
1684 	{
1685 		// Get the childs close function
1686 		void ( *consumer_close )( ) = self->close;
1687 
1688 		if ( consumer_close )
1689 		{
1690 			// Just in case...
1691 			//mlt_consumer_stop( self );
1692 
1693 			self->close = NULL;
1694 			consumer_close( self );
1695 		}
1696 		else
1697 		{
1698 			consumer_private *priv = self->local;
1699 
1700 			// Make sure it only gets called once
1701 			self->parent.close = NULL;
1702 
1703 			// Destroy the push mutex and condition
1704 			pthread_mutex_destroy( &priv->put_mutex );
1705 			pthread_cond_destroy( &priv->put_cond );
1706 
1707 			pthread_mutex_destroy( &priv->position_mutex );
1708 
1709 			mlt_service_close( &self->parent );
1710 			free( priv );
1711 		}
1712 	}
1713 }
1714 
1715 /** Get the position of the last frame shown.
1716  *
1717  * \public \memberof mlt_consumer_s
1718  * \param consumer a consumer
1719  * \return the position
1720  */
1721 
mlt_consumer_position(mlt_consumer consumer)1722 mlt_position mlt_consumer_position( mlt_consumer consumer )
1723 {
1724 	consumer_private* priv = consumer->local;
1725 	pthread_mutex_lock( &priv->position_mutex );
1726 	mlt_position result = priv->position;
1727 	pthread_mutex_unlock( &priv->position_mutex );
1728 	return result;
1729 }
1730 
mlt_thread_create(mlt_consumer self,mlt_thread_function_t function)1731 static void mlt_thread_create(mlt_consumer self, mlt_thread_function_t function )
1732 {
1733 	consumer_private *priv = self->local;
1734 	mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
1735 
1736 	if ( mlt_properties_get( MLT_CONSUMER_PROPERTIES( self ), "priority" ) )
1737 	{
1738 		struct sched_param priority;
1739 		priority.sched_priority = mlt_properties_get_int( MLT_CONSUMER_PROPERTIES( self ), "priority" );
1740 		mlt_event_data_thread data = {
1741 		    .thread = &priv->ahead_thread,
1742 		    .priority = &priority.sched_priority,
1743 		    .function = function,
1744 		    .data = self
1745 		};
1746 		if ( mlt_events_fire( properties, "consumer-thread-create", mlt_event_data_from_object(&data) ) < 1 )
1747 		{
1748 			pthread_attr_t thread_attributes;
1749 			pthread_attr_init( &thread_attributes );
1750 			pthread_attr_setschedpolicy( &thread_attributes, SCHED_OTHER );
1751 			pthread_attr_setschedparam( &thread_attributes, &priority );
1752 			pthread_attr_setinheritsched( &thread_attributes, PTHREAD_EXPLICIT_SCHED );
1753 			pthread_attr_setscope( &thread_attributes, PTHREAD_SCOPE_SYSTEM );
1754 			priv->ahead_thread = malloc( sizeof( pthread_t ) );
1755 			pthread_t *handle = priv->ahead_thread;
1756 			if ( pthread_create( ( pthread_t* ) &( *handle ), &thread_attributes, function, self ) < 0 )
1757 				pthread_create( ( pthread_t* ) &( *handle ), NULL, function, self );
1758 			pthread_attr_destroy( &thread_attributes );
1759 		}
1760 	}
1761 	else
1762 	{
1763 		int priority = -1;
1764 		mlt_event_data_thread data = {
1765 		    .thread = &priv->ahead_thread,
1766 		    .priority = &priority,
1767 		    .function = function,
1768 		    .data = self
1769 		};
1770 		if ( mlt_events_fire( properties, "consumer-thread-create", mlt_event_data_from_object(&data) ) < 1 )
1771 		{
1772 			priv->ahead_thread = malloc( sizeof( pthread_t ) );
1773 			pthread_t *handle = priv->ahead_thread;
1774 			pthread_create( ( pthread_t* ) &( *handle ), NULL, function, self );
1775 		}
1776 	}
1777 }
1778 
mlt_thread_join(mlt_consumer self)1779 static void mlt_thread_join( mlt_consumer self )
1780 {
1781 	consumer_private *priv = self->local;
1782 	if ( mlt_events_fire( MLT_CONSUMER_PROPERTIES(self), "consumer-thread-join",
1783 		mlt_event_data_from_object(priv->ahead_thread) ) < 1 )
1784 	{
1785 		pthread_t *handle = priv->ahead_thread;
1786 		pthread_join( *handle, NULL );
1787 		free( priv->ahead_thread );
1788 	}
1789 	priv->ahead_thread = NULL;
1790 }
1791