1 /*
2  * consumer_rtaudio.c -- output through RtAudio audio wrapper
3  * Copyright (C) 2011-2021 Meltytech, LLC
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with consumer library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
18  */
19 
20 #include <framework/mlt.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <pthread.h>
24 #include <sys/time.h>
25 #ifdef USE_INTERNAL_RTAUDIO
26 #include "RtAudio.h"
27 #else
28 #include <RtAudio.h>
29 #endif
30 
31 static void consumer_refresh_cb(mlt_consumer sdl, mlt_consumer consumer, mlt_event_data );
32 static int  rtaudio_callback( void *outputBuffer, void *inputBuffer,
33 	unsigned int nFrames, double streamTime, RtAudioStreamStatus status, void *userData );
34 static void *consumer_thread_proxy( void *arg );
35 static void *video_thread_proxy( void *arg );
36 
rtaudio_api_str(RtAudio::Api api)37 static const char *rtaudio_api_str( RtAudio::Api api )
38 {
39 	switch( api )
40 	{
41 		case RtAudio::UNSPECIFIED: return "UNSPECIFIED";
42 		case RtAudio::LINUX_ALSA: return "LINUX_ALSA";
43 		case RtAudio::LINUX_PULSE: return "LINUX_PULSE";
44 		case RtAudio::LINUX_OSS: return "LINUX_OSS";
45 		case RtAudio::UNIX_JACK: return "UNIX_JACK";
46 		case RtAudio::MACOSX_CORE: return "MACOSX_CORE";
47 		case RtAudio::WINDOWS_WASAPI: return "WINDOWS_WASAPI";
48 		case RtAudio::WINDOWS_ASIO: return "WINDOWS_ASIO";
49 		case RtAudio::WINDOWS_DS: return "WINDOWS_DS";
50 		case RtAudio::RTAUDIO_DUMMY: return "RTAUDIO_DUMMY";
51 	}
52 	return "UNKNOWN!?!";
53 }
54 
55 class RtAudioConsumer
56 {
57 public:
58 	struct mlt_consumer_s consumer;
59 	RtAudio*              rt;
60 	int                   device_id;
61 	mlt_deque             queue;
62 	pthread_t             thread;
63 	int                   joined;
64 	int                   running;
65 	int                   out_channels;
66 	uint8_t               audio_buffer[4096 * 10];
67 	int                   audio_avail;
68 	pthread_mutex_t       audio_mutex;
69 	pthread_cond_t        audio_cond;
70 	pthread_mutex_t       video_mutex;
71 	pthread_cond_t        video_cond;
72 	int                   playing;
73 	pthread_cond_t        refresh_cond;
74 	pthread_mutex_t       refresh_mutex;
75 	int                   refresh_count;
76 	bool                  is_purge;
77 
getConsumer()78 	mlt_consumer getConsumer()
79 		{ return &consumer; }
80 
RtAudioConsumer()81 	RtAudioConsumer()
82 		: rt(NULL)
83 		, device_id(-1)
84 		, queue(NULL)
85 		, joined(0)
86 		, running(0)
87 		, audio_avail(0)
88 		, playing(0)
89 		, refresh_count(0)
90 		, is_purge(false)
91 	{
92 		memset( &consumer, 0, sizeof( consumer ) );
93 	}
94 
~RtAudioConsumer()95 	~RtAudioConsumer()
96 	{
97 		// Close the queue
98 		mlt_deque_close( queue );
99 
100 		// Destroy mutexes
101 		pthread_mutex_destroy( &audio_mutex );
102 		pthread_cond_destroy( &audio_cond );
103 		pthread_mutex_destroy( &video_mutex );
104 		pthread_cond_destroy( &video_cond );
105 		pthread_mutex_destroy( &refresh_mutex );
106 		pthread_cond_destroy( &refresh_cond );
107 
108 		if ( rt && rt->isStreamOpen() )
109 			rt->closeStream();
110 		delete rt;
111 		rt = NULL;
112 	}
113 
create_rtaudio(RtAudio::Api api,int channels,int frequency)114 	bool create_rtaudio( RtAudio::Api api, int channels, int frequency )
115 	{
116 		mlt_properties properties = MLT_CONSUMER_PROPERTIES( getConsumer() );
117 		const char *resource = mlt_properties_get( properties, "resource" );
118 		unsigned int bufferFrames = mlt_properties_get_int( properties, "audio_buffer" );
119 
120 		mlt_log_info( getConsumer(), "Attempt to open RtAudio: %s\t%d\t%d\n", rtaudio_api_str( api ), channels, frequency );
121 		rt = new RtAudio( api );
122 
123 		if( !rt )
124 		{
125 			return false;
126 		}
127 
128 		if ( rt->getDeviceCount() < 1 )
129 		{
130 			mlt_log_warning( getConsumer(), "no audio devices found\n" );
131 			delete rt;
132 			rt = NULL;
133 			return false;
134 		}
135 
136 #ifndef __LINUX_ALSA__
137 		device_id = rt->getDefaultOutputDevice();
138 #endif
139 		if ( resource && strcmp( resource, "" ) && strcmp( resource, "default" ) )
140 		{
141 			// Get device ID by name
142 			unsigned int n = rt->getDeviceCount();
143 			RtAudio::DeviceInfo info;
144 			unsigned int i;
145 
146 			for ( i = 0; i < n; i++ )
147 			{
148 				info = rt->getDeviceInfo( i );
149 				mlt_log_verbose( NULL, "RtAudio device %d = %s\n", i, info.name.c_str() );
150 				if ( info.probed && info.name == resource )
151 				{
152 					device_id = i;
153 					break;
154 				}
155 			}
156 			// Name selection failed, try arg as numeric
157 			if ( i == n )
158 				device_id = (int) strtol( resource, NULL, 0 );
159 		}
160 
161 		RtAudio::StreamParameters parameters;
162 		parameters.deviceId = device_id;
163 		parameters.nChannels = channels;
164 		parameters.firstChannel = 0;
165 		RtAudio::StreamOptions options;
166 
167 		if ( device_id == -1 )
168 		{
169 			options.flags = RTAUDIO_ALSA_USE_DEFAULT;
170 			parameters.deviceId = 0;
171 		}
172 		if ( resource )
173 		{
174 			unsigned n = rt->getDeviceCount();
175 			for (unsigned i = 0; i < n; i++) {
176 				RtAudio::DeviceInfo info = rt->getDeviceInfo( i );
177 				if ( info.name == resource ) {
178 					device_id = parameters.deviceId = i;
179 					break;
180 				}
181 			}
182 		}
183 
184 		try {
185 			if ( rt->isStreamOpen() ) {
186 				 rt->closeStream();
187 			}
188 			rt->openStream( &parameters, NULL, RTAUDIO_SINT16,
189 				frequency, &bufferFrames, &rtaudio_callback, this, &options );
190 			rt->startStream();
191 		}
192 #ifdef RTERROR_H
193 		catch ( RtError& e ) {
194 #else
195 		catch ( RtAudioError& e ) {
196 #endif
197 			mlt_log_info( getConsumer(), "%s\n", e.getMessage().c_str() );
198 			delete rt;
199 			rt = NULL;
200 			return false;
201 		}
202 		mlt_log_info( getConsumer(), "Opened RtAudio: %s\t%d\t%d\n", rtaudio_api_str( rt->getCurrentApi() ), channels, frequency );
203 		return true;
204 	}
205 
206 	bool find_and_create_rtaudio( int requested_channels, int frequency, int* actual_channels )
207 	{
208 		bool result = false;
209 #ifdef __WINDOWS_DS__
210 		// Prefer DirectSound on Windows
211 		RtAudio::Api PREFERRED_API = RtAudio::WINDOWS_DS;
212 #else
213 		RtAudio::Api PREFERRED_API = RtAudio::UNSPECIFIED;
214 #endif
215 		*actual_channels = requested_channels;
216 
217 		// First try with preferred API.
218 		result = create_rtaudio( PREFERRED_API, *actual_channels, frequency );
219 
220 		if( !result )
221 		{
222 			// If the preferred API fails, try other APIs that are available.
223 			std::vector<RtAudio::Api> apis;
224 			RtAudio::getCompiledApi( apis );
225 			for( size_t i = 0; i < apis.size(); i++ )
226 			{
227 				if( apis[i] == PREFERRED_API || apis[i] == RtAudio::RTAUDIO_DUMMY )
228 				{
229 					continue;
230 				}
231 				result = create_rtaudio( apis[i], *actual_channels, frequency );
232 				if( result )
233 				{
234 					break;
235 				}
236 			}
237 		}
238 
239 		if( !result && *actual_channels != 2 )
240 		{
241 			// If surround has failed for all APIs, try stereo.
242 			*actual_channels = 2;
243 			mlt_log_info( getConsumer(), "Unable to open %d channels. Try %d channels\n", requested_channels, *actual_channels );
244 			std::vector<RtAudio::Api> apis;
245 			RtAudio::getCompiledApi( apis );
246 			for( size_t i = 0; i < apis.size(); i++ )
247 			{
248 				if( apis[i] == RtAudio::RTAUDIO_DUMMY )
249 				{
250 					continue;
251 				}
252 				result = create_rtaudio( apis[i], *actual_channels, frequency );
253 				if( result )
254 				{
255 					break;
256 				}
257 			}
258 		}
259 
260 		return result;
261 	}
262 
263 	bool open( const char* arg )
264 	{
265 		// Create the queue
266 		queue = mlt_deque_init( );
267 
268 		// get a handle on properties
269 		mlt_properties properties = MLT_CONSUMER_PROPERTIES( &consumer );
270 
271 		// Set the default volume
272 		mlt_properties_set_double( properties, "volume", 1.0 );
273 
274 		// This is the initialisation of the consumer
275 		pthread_mutex_init( &audio_mutex, NULL );
276 		pthread_cond_init( &audio_cond, NULL);
277 		pthread_mutex_init( &video_mutex, NULL );
278 		pthread_cond_init( &video_cond, NULL);
279 
280 		// Default scaler (for now we'll use nearest)
281 		mlt_properties_set( properties, "rescale", "nearest" );
282 		mlt_properties_set( properties, "deinterlace_method", "onefield" );
283 
284 		// Default buffer for low latency
285 		mlt_properties_set_int( properties, "buffer", 1 );
286 
287 		// Default audio buffer
288 		mlt_properties_set_int( properties, "audio_buffer", 1024 );
289 
290 		// Set the resource to the device name arg
291 		mlt_properties_set( properties, "resource", arg );
292 
293 		// Ensure we don't join on a non-running object
294 		joined = 1;
295 
296 		// Initialize the refresh handler
297 		pthread_cond_init( &refresh_cond, NULL );
298 		pthread_mutex_init( &refresh_mutex, NULL );
299 		mlt_events_listen( properties, this, "property-changed", ( mlt_listener )consumer_refresh_cb );
300 
301 		return true;
302 	}
303 
304 	int start()
305 	{
306 		if ( !running )
307 		{
308 			stop();
309 			running = 1;
310 			joined = 0;
311 			pthread_create( &thread, NULL, consumer_thread_proxy, this );
312 		}
313 
314 		return 0;
315 	}
316 
317 	int stop()
318 	{
319 		if ( running && !joined )
320 		{
321 			// Kill the thread and clean up
322 			joined = 1;
323 			running = 0;
324 
325 			// Unlatch the consumer thread
326 			pthread_mutex_lock( &refresh_mutex );
327 			pthread_cond_broadcast( &refresh_cond );
328 			pthread_mutex_unlock( &refresh_mutex );
329 
330 			// Cleanup the main thread
331 			pthread_join( thread, NULL );
332 
333 			// Unlatch the video thread
334 			pthread_mutex_lock( &video_mutex );
335 			pthread_cond_broadcast( &video_cond );
336 			pthread_mutex_unlock( &video_mutex );
337 
338 			// Unlatch the audio callback
339 			pthread_mutex_lock( &audio_mutex );
340 			pthread_cond_broadcast( &audio_cond );
341 			pthread_mutex_unlock( &audio_mutex );
342 
343 			if ( rt && rt->isStreamOpen() )
344 			try {
345 				// Stop the stream
346 				rt->stopStream();
347 			}
348 #ifdef RTERROR_H
349 			catch ( RtError& e ) {
350 #else
351 			catch ( RtAudioError& e ) {
352 #endif
353 				mlt_log_error( getConsumer(), "%s\n", e.getMessage().c_str() );
354 			}
355 			delete rt;
356 			rt = NULL;
357 		}
358 
359 		return 0;
360 	}
361 
362 	void purge()
363 	{
364 		if ( running )
365 		{
366 			pthread_mutex_lock( &video_mutex );
367 			mlt_frame frame = MLT_FRAME( mlt_deque_peek_back( queue ) );
368 			// When playing rewind or fast forward then we need to keep one
369 			// frame in the queue to prevent playback stalling.
370 			double speed = frame? mlt_properties_get_double( MLT_FRAME_PROPERTIES(frame), "_speed" ) : 0;
371 			int n = ( speed == 0.0 || speed == 1.0 ) ? 0 : 1;
372 			while ( mlt_deque_count( queue ) > n )
373 				mlt_frame_close( MLT_FRAME( mlt_deque_pop_back( queue ) ) );
374 			is_purge = true;
375 			pthread_cond_broadcast( &video_cond );
376 			pthread_mutex_unlock( &video_mutex );
377 		}
378 	}
379 
380 	void consumer_thread()
381 	{
382 		// Get the properties
383 		mlt_properties consumer_props = MLT_CONSUMER_PROPERTIES( getConsumer() );
384 
385 		// Video thread
386 		pthread_t thread;
387 
388 		// internal initialization
389 		int init_audio = 1;
390 		int init_video = 1;
391 		mlt_frame frame = NULL;
392 		mlt_properties properties = NULL;
393 		int64_t duration = 0;
394 		int64_t playtime = mlt_properties_get_int(consumer_props, "video_delay") * 1000;
395 		struct timespec tm = { 0, 100000 };
396 	//	int last_position = -1;
397 
398 		pthread_mutex_lock( &refresh_mutex );
399 		refresh_count = 0;
400 		pthread_mutex_unlock( &refresh_mutex );
401 
402 		// Loop until told not to
403 		while ( running )
404 		{
405 			// Get a frame from the attached producer
406 			frame = mlt_consumer_rt_frame( getConsumer() );
407 
408 			// Ensure that we have a frame
409 			if ( frame )
410 			{
411 				// Get the frame properties
412 				properties =  MLT_FRAME_PROPERTIES( frame );
413 
414 				// Get the speed of the frame
415 				double speed = mlt_properties_get_double( properties, "_speed" );
416 
417 				// Get refresh request for the current frame
418 				int refresh = mlt_properties_get_int( consumer_props, "refresh" );
419 
420 				// Clear refresh
421 				mlt_events_block( consumer_props, consumer_props );
422 				mlt_properties_set_int( consumer_props, "refresh", 0 );
423 				mlt_events_unblock( consumer_props, consumer_props );
424 
425 				// Play audio
426 				init_audio = play_audio( frame, init_audio, &duration );
427 
428 				// Determine the start time now
429 				if ( playing && init_video )
430 				{
431 					// Create the video thread
432 					pthread_create( &thread, NULL, video_thread_proxy, this );
433 
434 					// Video doesn't need to be initialised any more
435 					init_video = 0;
436 				}
437 
438 				// Set playtime for this frame in microseconds
439 				mlt_properties_set_int64( properties, "playtime", playtime );
440 
441 				while ( running && speed != 0 && mlt_deque_count( queue ) > 15 )
442 					nanosleep( &tm, NULL );
443 
444 				// Push this frame to the back of the video queue
445 				if ( running && speed )
446 				{
447 					pthread_mutex_lock( &video_mutex );
448 					if ( is_purge && speed == 1.0 )
449 					{
450 						mlt_frame_close( frame );
451 						is_purge = false;
452 					}
453 					else
454 					{
455 						mlt_deque_push_back( queue, frame );
456 						pthread_cond_broadcast( &video_cond );
457 					}
458 					pthread_mutex_unlock( &video_mutex );
459 
460 					// Calculate the next playtime
461 					playtime += duration;
462 				}
463 				else if ( running )
464 				{
465 					pthread_mutex_lock( &refresh_mutex );
466 					if ( refresh == 0 && refresh_count <= 0 )
467 					{
468 						play_video( frame );
469 						pthread_cond_wait( &refresh_cond, &refresh_mutex );
470 					}
471 					mlt_frame_close( frame );
472 					refresh_count --;
473 					pthread_mutex_unlock( &refresh_mutex );
474 				}
475 				else
476 				{
477 					mlt_frame_close( frame );
478 					frame = NULL;
479 				}
480 
481 				// Optimisation to reduce latency
482 				if ( frame && speed == 1.0 )
483 				{
484 					// TODO: disabled due to misbehavior on parallel-consumer
485 	//				if ( last_position != -1 && last_position + 1 != mlt_frame_get_position( frame ) )
486 	//					mlt_consumer_purge( consumer );
487 	//				last_position = mlt_frame_get_position( frame );
488 				}
489 				else if (speed == 0.0)
490 				{
491 					mlt_consumer_purge( getConsumer() );
492 	//				last_position = -1;
493 				}
494 			}
495 		}
496 
497 		// Kill the video thread
498 		if ( init_video == 0 )
499 		{
500 			pthread_mutex_lock( &video_mutex );
501 			pthread_cond_broadcast( &video_cond );
502 			pthread_mutex_unlock( &video_mutex );
503 			pthread_join( thread, NULL );
504 		}
505 
506 		while( mlt_deque_count( queue ) )
507 			mlt_frame_close( (mlt_frame) mlt_deque_pop_back( queue ) );
508 
509 		audio_avail = 0;
510 	}
511 
512 	int callback( int16_t *outbuf, int16_t *inbuf,
513 		unsigned int samples, double streamTime, RtAudioStreamStatus status )
514 	{
515 		mlt_properties properties = MLT_CONSUMER_PROPERTIES( getConsumer() );
516 		double volume = mlt_properties_get_double( properties, "volume" );
517 		int len = mlt_audio_format_size( mlt_audio_s16, samples, out_channels );
518 
519 		pthread_mutex_lock( &audio_mutex );
520 
521 		// Block until audio received
522 		while ( running && len > audio_avail )
523 			pthread_cond_wait( &audio_cond, &audio_mutex );
524 
525 		if ( audio_avail >= len )
526 		{
527 			// Place in the audio buffer
528 			memcpy( outbuf, audio_buffer, len );
529 
530 			// Remove len from the audio available
531 			audio_avail -= len;
532 
533 			// Remove the samples
534 			memmove( audio_buffer, audio_buffer + len, audio_avail );
535 		}
536 		else
537 		{
538 			// Just to be safe, wipe the stream first
539 			memset( outbuf, 0, len );
540 
541 			// Copy what we have
542 			memcpy( outbuf, audio_buffer, audio_avail );
543 
544 			// No audio left
545 			audio_avail = 0;
546 		}
547 
548 		if ( volume != 1.0 )
549 		{
550 			int16_t *p = outbuf;
551 			int i = samples * out_channels + 1;
552 			while ( --i )
553 				*p++ *= volume;
554 		}
555 
556 		// We're definitely playing now
557 		playing = 1;
558 
559 		pthread_cond_broadcast( &audio_cond );
560 		pthread_mutex_unlock( &audio_mutex );
561 
562 		return 0;
563 	}
564 
565 	int play_audio( mlt_frame frame, int init_audio, int64_t *duration )
566 	{
567 		// Get the properties of this consumer
568 		mlt_properties properties = MLT_CONSUMER_PROPERTIES( getConsumer() );
569 		mlt_audio_format afmt = mlt_audio_s16;
570 
571 		// Set the preferred params of the test card signal
572 		int channels = mlt_properties_get_int( properties, "channels" );
573 		int frequency = mlt_properties_get_int( properties, "frequency" );
574 		int scrub = mlt_properties_get_int( properties, "scrub_audio" );
575 		static int counter = 0;
576 		int samples = mlt_audio_calculate_frame_samples( mlt_properties_get_double( properties, "fps" ), frequency, counter++ );
577 		int16_t *pcm;
578 
579 		mlt_frame_get_audio( frame, (void**) &pcm, &afmt, &frequency, &channels, &samples );
580 		*duration = 1000000LL * samples / frequency;
581 
582 		if ( mlt_properties_get_int( properties, "audio_off" ) )
583 		{
584 			playing = 1;
585 			return init_audio;
586 		}
587 
588 		if ( init_audio == 1 )
589 		{
590 			if( find_and_create_rtaudio( channels, frequency, &out_channels ) )
591 			{
592 				init_audio = 0;
593 				playing = 1;
594 			}
595 			else
596 			{
597 				rt = NULL;
598 				mlt_log_error( getConsumer(), "Unable to initialize RtAudio\n" );
599 				init_audio = 2;
600 			}
601 		}
602 
603 		if ( init_audio == 0 )
604 		{
605 			mlt_properties properties = MLT_FRAME_PROPERTIES( frame );
606 			int samples_copied = 0;
607 			int dst_stride = out_channels * sizeof( *pcm );
608 
609 			pthread_mutex_lock( &audio_mutex );
610 
611 			while ( running && samples_copied < samples )
612 			{
613 				int sample_space = ( sizeof( audio_buffer ) - audio_avail ) / dst_stride;
614 
615 				while ( running && sample_space == 0 )
616 				{
617 					pthread_cond_wait( &audio_cond, &audio_mutex );
618 					sample_space = ( sizeof( audio_buffer ) - audio_avail ) / dst_stride;
619 				}
620 				if ( running )
621 				{
622 					int samples_to_copy = samples - samples_copied;
623 					if ( samples_to_copy > sample_space )
624 					{
625 						samples_to_copy = sample_space;
626 					}
627 					int dst_bytes = samples_to_copy * dst_stride;
628 
629 					if ( scrub || mlt_properties_get_double( properties, "_speed" ) == 1 )
630 					{
631 						if ( channels == out_channels )
632 						{
633 							memcpy( &audio_buffer[ audio_avail ], pcm, dst_bytes );
634 							pcm += samples_to_copy * channels;
635 						}
636 						else
637 						{
638 							int16_t *dest = (int16_t*) &audio_buffer[ audio_avail ];
639 							int i = samples_to_copy + 1;
640 							while ( --i )
641 							{
642 								memcpy( dest, pcm, dst_stride );
643 								pcm += channels;
644 								dest += out_channels;
645 							}
646 						}
647 					}
648 					else
649 					{
650 						memset( &audio_buffer[ audio_avail ], 0, dst_bytes );
651 						pcm += samples_to_copy * channels;
652 					}
653 					audio_avail += dst_bytes;
654 					samples_copied += samples_to_copy;
655 				}
656 				pthread_cond_broadcast( &audio_cond );
657 			}
658 			pthread_mutex_unlock( &audio_mutex );
659 		}
660 
661 		return init_audio;
662 	}
663 
664 	int play_video( mlt_frame frame )
665 	{
666 		// Get the properties of this consumer
667 		mlt_properties properties = MLT_CONSUMER_PROPERTIES( getConsumer() );
668 		if ( running && !mlt_consumer_is_stopped( getConsumer() ) ) {
669 			mlt_events_fire( properties, "consumer-frame-show", mlt_event_data_from_frame(frame) );
670 		}
671 
672 		return 0;
673 	}
674 
675 	void video_thread()
676 	{
677 		// Obtain time of thread start
678 		struct timeval now;
679 		int64_t start = 0;
680 		int64_t elapsed = 0;
681 		struct timespec tm;
682 		mlt_frame next = NULL;
683 		mlt_properties consumerProperties = MLT_CONSUMER_PROPERTIES( getConsumer() );
684 		double speed = 0;
685 
686 		// Get real time flag
687 		int real_time = mlt_properties_get_int( consumerProperties, "real_time" );
688 
689 		// Get the current time
690 		gettimeofday( &now, NULL );
691 
692 		// Determine start time
693 		start = ( int64_t )now.tv_sec * 1000000 + now.tv_usec;
694 
695 		while ( running )
696 		{
697 			// Pop the next frame
698 			pthread_mutex_lock( &video_mutex );
699 			next = (mlt_frame) mlt_deque_pop_front( queue );
700 			while ( next == NULL && running )
701 			{
702 				pthread_cond_wait( &video_cond, &video_mutex );
703 				next = (mlt_frame) mlt_deque_pop_front( queue );
704 			}
705 			pthread_mutex_unlock( &video_mutex );
706 
707 			if ( !running || next == NULL ) break;
708 
709 			// Get the properties
710 			mlt_properties properties =  MLT_FRAME_PROPERTIES( next );
711 
712 			// Get the speed of the frame
713 			speed = mlt_properties_get_double( properties, "_speed" );
714 
715 			// Get the current time
716 			gettimeofday( &now, NULL );
717 
718 			// Get the elapsed time
719 			elapsed = ( ( int64_t )now.tv_sec * 1000000 + now.tv_usec ) - start;
720 
721 			// See if we have to delay the display of the current frame
722 			if ( mlt_properties_get_int( properties, "rendered" ) == 1 && running )
723 			{
724 				// Obtain the scheduled playout time in microseconds
725 				int64_t scheduled = mlt_properties_get_int64( properties, "playtime" );
726 
727 				// Determine the difference between the elapsed time and the scheduled playout time
728 				int64_t difference = scheduled - elapsed;
729 
730 				// Smooth playback a bit
731 				if ( real_time && ( difference > 20000 && speed == 1.0 ) )
732 				{
733 					tm.tv_sec = difference / 1000000;
734 					tm.tv_nsec = ( difference % 1000000 ) * 1000;
735 					nanosleep( &tm, NULL );
736 				}
737 
738 				// Show current frame if not too old
739 				if ( !real_time || ( difference > -10000 || speed != 1.0 || mlt_deque_count( queue ) < 2 ) )
740 					play_video( next );
741 
742 				// If the queue is empty, recalculate start to allow build up again
743 				if ( real_time && ( mlt_deque_count( queue ) == 0 && speed == 1.0 ) )
744 				{
745 					gettimeofday( &now, NULL );
746 					start = ( ( int64_t )now.tv_sec * 1000000 + now.tv_usec ) - scheduled + 20000;
747 					start += mlt_properties_get_int(consumerProperties, "video_delay") * 1000;
748 				}
749 			}
750 
751 			// This frame can now be closed
752 			mlt_frame_close( next );
753 			next = NULL;
754 		}
755 
756 		if ( next != NULL )
757 			mlt_frame_close( next );
758 
759 		mlt_consumer_stopped( getConsumer() );
760 	}
761 
762 };
763 
764 static void consumer_refresh_cb( mlt_consumer sdl, mlt_consumer consumer, mlt_event_data event_data )
765 {
766 	const char *name = mlt_event_data_to_string(event_data);
767 	if ( name && !strcmp( name, "refresh" ) )
768 	{
769 		RtAudioConsumer* rtaudio = (RtAudioConsumer*) consumer->child;
770 		pthread_mutex_lock( &rtaudio->refresh_mutex );
771 		rtaudio->refresh_count = rtaudio->refresh_count <= 0 ? 1 : rtaudio->refresh_count + 1;
772 		pthread_cond_broadcast( &rtaudio->refresh_cond );
773 		pthread_mutex_unlock( &rtaudio->refresh_mutex );
774 	}
775 }
776 
777 static int rtaudio_callback( void *outputBuffer, void *inputBuffer,
778 	unsigned int nFrames, double streamTime, RtAudioStreamStatus status, void *userData )
779 {
780 	RtAudioConsumer* rtaudio = (RtAudioConsumer*) userData;
781 	return rtaudio->callback( (int16_t*) outputBuffer, (int16_t*) inputBuffer, nFrames, streamTime, status );
782 }
783 
784 static void *consumer_thread_proxy( void *arg )
785 {
786 	RtAudioConsumer* rtaudio = (RtAudioConsumer*) arg;
787 	rtaudio->consumer_thread();
788 	return NULL;
789 }
790 
791 static void *video_thread_proxy( void *arg )
792 {
793 	RtAudioConsumer* rtaudio = (RtAudioConsumer*) arg;
794 	rtaudio->video_thread();
795 	return NULL;
796 }
797 
798 /** Start the consumer.
799  */
800 
801 static int start( mlt_consumer consumer )
802 {
803 	RtAudioConsumer* rtaudio = (RtAudioConsumer*) consumer->child;
804 	return rtaudio->start();
805 }
806 
807 /** Stop the consumer.
808  */
809 
810 static int stop( mlt_consumer consumer )
811 {
812 	RtAudioConsumer* rtaudio = (RtAudioConsumer*) consumer->child;
813 	return rtaudio->stop();
814 }
815 
816 /** Determine if the consumer is stopped.
817  */
818 
819 static int is_stopped( mlt_consumer consumer )
820 {
821 	RtAudioConsumer* rtaudio = (RtAudioConsumer*) consumer->child;
822 	return !rtaudio->running;
823 }
824 
825 static void purge( mlt_consumer consumer )
826 {
827 	RtAudioConsumer* rtaudio = (RtAudioConsumer*) consumer->child;
828 	rtaudio->purge();
829 }
830 
831 /** Close the consumer.
832  */
833 
834 static void close( mlt_consumer consumer )
835 {
836 	// Stop the consumer
837 	mlt_consumer_stop( consumer );
838 
839 	// Close the parent
840 	consumer->close = NULL;
841 	mlt_consumer_close( consumer );
842 
843 	// Free the memory
844 	delete (RtAudioConsumer*) consumer->child;
845 }
846 
847 extern "C" {
848 
849 mlt_consumer consumer_rtaudio_init( mlt_profile profile, mlt_service_type type, const char *id, char *arg )
850 {
851 	// Allocate the consumer
852 	RtAudioConsumer* rtaudio = new RtAudioConsumer();
853 	mlt_consumer consumer = NULL;
854 
855 	// If allocated
856 	if ( rtaudio && !mlt_consumer_init( rtaudio->getConsumer(), rtaudio, profile ) )
857 	{
858 		// If initialises without error
859 		if ( rtaudio->open( arg? arg : getenv( "AUDIODEV" ) ) )
860 		{
861 			// Setup callbacks
862 			consumer = rtaudio->getConsumer();
863 			consumer->close = close;
864 			consumer->start = start;
865 			consumer->stop = stop;
866 			consumer->is_stopped = is_stopped;
867 			consumer->purge = purge;
868 		}
869 		else
870 		{
871 			mlt_consumer_close( rtaudio->getConsumer() );
872 			delete rtaudio;
873 		}
874 	}
875 
876 	// Return consumer
877 	return consumer;
878 }
879 
880 static mlt_properties metadata( mlt_service_type type, const char *id, void *data )
881 {
882 	char file[ PATH_MAX ];
883 	const char *service_type = "consumer";
884 	snprintf( file, PATH_MAX, "%s/rtaudio/%s_%s.yml", mlt_environment( "MLT_DATA" ), service_type, id );
885 	return mlt_properties_parse_yaml( file );
886 }
887 
888 MLT_REPOSITORY
889 {
890 	MLT_REGISTER( mlt_service_consumer_type, "rtaudio", consumer_rtaudio_init );
891 	MLT_REGISTER_METADATA( mlt_service_consumer_type, "rtaudio", metadata, NULL );
892 }
893 
894 } // extern C
895