1 /*
2  * consumer_jack.c -- a JACK audio consumer
3  * Copyright (C) 2011-2020 Meltytech, LLC
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
18  */
19 
20 #include <framework/mlt.h>
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <pthread.h>
25 #include <sys/time.h>
26 #include <unistd.h>
27 #include <jack/jack.h>
28 #include <jack/ringbuffer.h>
29 
/* Capacity of each per-channel JACK ringbuffer, counted in float samples
 * (it is multiplied by sizeof(float) where the ringbuffers are created). */
#define BUFFER_LEN (204800 * 6)

/* Held around the call to jack_activate() in this file.
 * NOTE(review): it has external linkage, presumably so other JACK services
 * can serialise their activation against this one - confirm before making
 * it static. */
pthread_mutex_t g_activate_mutex = PTHREAD_MUTEX_INITIALIZER;
33 
/** This class's definition.
 *
 * Private state of the JACK consumer. The object is allocated zero-filled by
 * the constructor and released by consumer_close().
*/

typedef struct consumer_jack_s *consumer_jack;

struct consumer_jack_s
{
	// Base consumer; its child pointer refers back to this object
	struct mlt_consumer_s parent;
	// JACK client handle, opened in the constructor and closed in consumer_close()
	jack_client_t *jack;
	// Frames handed from the consumer thread to the video thread
	mlt_deque queue;
	// The consumer thread created by consumer_start()
	pthread_t thread;
	// Non-zero when there is no consumer thread left to join
	int joined;
	// Non-zero while the consumer and video threads should keep looping
	int running;
	// Guards the queue hand-over and is paired with video_cond
	pthread_mutex_t video_mutex;
	// Broadcast when a frame is queued or the threads must wake up to exit
	pthread_cond_t video_cond;
	// Set to 1 once jack_activate() has been called; checked before
	// jack_deactivate() and before the video thread is created
	int playing;

	// Broadcast on a "refresh" property change and when stopping
	pthread_cond_t refresh_cond;
	// Guards refresh_count and is paired with refresh_cond
	pthread_mutex_t refresh_mutex;
	// Number of pending refresh requests (may drop below zero; see consumer_thread)
	int refresh_count;
	// Running frame counter passed to mlt_audio_calculate_frame_samples()
	int counter;
	// One ringbuffer per audio channel, filled by the consumer thread and
	// drained by jack_process()
	jack_ringbuffer_t **ringbuffers;
	// One JACK output port per audio channel
	jack_port_t **ports;
};
58 
/** Forward references to static functions.
 *
 * These are declared up front because the constructor installs them as
 * callbacks before their definitions appear.
*/

static int consumer_start( mlt_consumer parent );
static int consumer_stop( mlt_consumer parent );
static int consumer_is_stopped( mlt_consumer parent );
static void consumer_close( mlt_consumer parent );
static void *consumer_thread( void * );
// The first parameter is unused by the handler; only parent and name are read
static void consumer_refresh_cb( mlt_consumer sdl, mlt_consumer parent, char *name );
static int jack_process( jack_nframes_t frames, void * data );
69 
70 /** Constructor
71 */
72 
consumer_jack_init(mlt_profile profile,mlt_service_type type,const char * id,char * arg)73 mlt_consumer consumer_jack_init( mlt_profile profile, mlt_service_type type, const char *id, char *arg )
74 {
75 	// Create the consumer object
76 	consumer_jack self = calloc( 1, sizeof( struct consumer_jack_s ) );
77 
78 	// If no malloc'd and consumer init ok
79 	if ( self != NULL && mlt_consumer_init( &self->parent, self, profile ) == 0 )
80 	{
81 		char name[14];
82 
83 		snprintf( name, sizeof( name ), "mlt%d", getpid() );
84 		if (( self->jack = jack_client_open( name, JackNullOption, NULL ) ))
85 		{
86 			jack_set_process_callback( self->jack, jack_process, self );
87 
88 			// Create the queue
89 			self->queue = mlt_deque_init( );
90 
91 			// Get the parent consumer object
92 			mlt_consumer parent = &self->parent;
93 
94 			// We have stuff to clean up, so override the close method
95 			parent->close = consumer_close;
96 
97 			// get a handle on properties
98 			mlt_service service = MLT_CONSUMER_SERVICE( parent );
99 			mlt_properties properties = MLT_SERVICE_PROPERTIES( service );
100 
101 			// This is the initialisation of the consumer
102 			pthread_mutex_init( &self->video_mutex, NULL );
103 			pthread_cond_init( &self->video_cond, NULL);
104 
105 			// Default scaler (for now we'll use nearest)
106 			mlt_properties_set( properties, "rescale", "nearest" );
107 			mlt_properties_set( properties, "deinterlace_method", "onefield" );
108 
109 			// Default buffer for low latency
110 			mlt_properties_set_int( properties, "buffer", 1 );
111 
112 			// Set frequency from JACK
113 			mlt_properties_set_int( properties, "frequency", (int) jack_get_sample_rate( self->jack ) );
114 
115 			// Set default volume
116 			mlt_properties_set_double( properties, "volume", 1.0 );
117 
118 			// Ensure we don't join on a non-running object
119 			self->joined = 1;
120 
121 			// Allow thread to be started/stopped
122 			parent->start = consumer_start;
123 			parent->stop = consumer_stop;
124 			parent->is_stopped = consumer_is_stopped;
125 
126 			// Initialize the refresh handler
127 			pthread_cond_init( &self->refresh_cond, NULL );
128 			pthread_mutex_init( &self->refresh_mutex, NULL );
129 			mlt_events_listen( MLT_CONSUMER_PROPERTIES( parent ), self, "property-changed", ( mlt_listener )consumer_refresh_cb );
130 
131 			// Return the consumer produced
132 			return parent;
133 		}
134 	}
135 
136 	// malloc or consumer init failed
137 	free( self );
138 
139 	// Indicate failure
140 	return NULL;
141 }
142 
/** Listener for "property-changed" events on the consumer.
 *
 * Changes to any property other than "refresh" are ignored. For "refresh"
 * the pending-refresh counter is bumped (restarting from 1 if it had dropped
 * to zero or below) and anyone waiting on refresh_cond is woken.
 *
 * \param sdl unused
 * \param parent the consumer whose property changed
 * \param name the name of the property that changed
*/
static void consumer_refresh_cb( mlt_consumer sdl, mlt_consumer parent, char *name )
{
	if ( strcmp( name, "refresh" ) != 0 )
		return;

	consumer_jack jc = parent->child;

	pthread_mutex_lock( &jc->refresh_mutex );
	if ( jc->refresh_count <= 0 )
		jc->refresh_count = 1;
	else
		jc->refresh_count = jc->refresh_count + 1;
	pthread_cond_broadcast( &jc->refresh_cond );
	pthread_mutex_unlock( &jc->refresh_mutex );
}
154 
/** Start the consumer thread if it is not already running.
 *
 * \param parent the consumer
 * \return 0 if the consumer is running after the call (including when it
 * already was), otherwise the non-zero error code from pthread_create()
*/
static int consumer_start( mlt_consumer parent )
{
	consumer_jack self = parent->child;
	int error;

	// Already running: nothing to do
	if ( self->running )
		return 0;

	// Kept from the original sequence; consumer_stop() only acts while the
	// running flag is set, so at this point it returns without doing anything
	consumer_stop( parent );

	// The flags must be set before the thread starts, because its loop
	// tests self->running straight away
	self->running = 1;
	self->joined = 0;

	error = pthread_create( &self->thread, NULL, consumer_thread, self );
	if ( error != 0 )
	{
		// No thread exists: restore the "stopped, nothing to join" state so
		// that a later stop does not pthread_join() an invalid handle
		self->running = 0;
		self->joined = 1;
	}

	return error;
}
169 
/** Stop the consumer thread and release the JACK ports and ringbuffers.
 *
 * Does nothing unless the consumer is running and its thread has not been
 * joined yet.
 *
 * \param parent the consumer
 * \return always 0
*/
static int consumer_stop( mlt_consumer parent )
{
	// Get the actual object
	consumer_jack self = parent->child;

	if ( self->running && !self->joined )
	{
		// Kill the thread and clean up
		self->joined = 1;
		self->running = 0;

		// Unlatch the consumer thread (it may be parked waiting for a refresh)
		pthread_mutex_lock( &self->refresh_mutex );
		pthread_cond_broadcast( &self->refresh_cond );
		pthread_mutex_unlock( &self->refresh_mutex );

		// Cleanup the main thread
#ifndef _WIN32
		if ( self->thread )
#endif
			pthread_join( self->thread, NULL );

		// Unlatch the video thread
		pthread_mutex_lock( &self->video_mutex );
		pthread_cond_broadcast( &self->video_cond );
		pthread_mutex_unlock( &self->video_mutex );

		// Cleanup JACK: deactivate first so the process callback no longer
		// touches the buffers that are released below
		if ( self->playing )
			jack_deactivate( self->jack );
		if ( self->ringbuffers )
		{
			int n = mlt_properties_get_int( MLT_CONSUMER_PROPERTIES( parent ), "channels" );

			// "n-- > 0" also terminates immediately for a non-positive count
			while ( n-- > 0 )
			{
				// An entry is NULL when creating the ringbuffer or registering
				// the port failed; neither may be handed to the JACK API
				if ( self->ringbuffers[n] )
					jack_ringbuffer_free( self->ringbuffers[n] );
				if ( self->ports && self->ports[n] )
					jack_port_unregister( self->jack, self->ports[n] );
			}
			mlt_pool_release( self->ringbuffers );
		}
		self->ringbuffers = NULL;
		if ( self->ports )
			mlt_pool_release( self->ports );
		self->ports = NULL;
	}

	return 0;
}
218 
/** Report whether the consumer is stopped.
 *
 * \param parent the consumer
 * \return 1 when the running flag is clear, otherwise 0
*/
static int consumer_is_stopped( mlt_consumer parent )
{
	const struct consumer_jack_s *jc = parent->child;

	return jc->running == 0;
}
224 
/** JACK process callback: move audio from the ringbuffers to the output ports.
 *
 * For every channel up to the consumer's "channels" property, as many bytes
 * as are available (at most one period) are copied from the channel's
 * ringbuffer into the port buffer and the remainder is filled with silence.
 *
 * \param frames number of frames JACK wants for this period
 * \param data the consumer_jack object registered with the callback
 * \return 0 normally, 1 when the port/ringbuffer arrays do not exist
*/
static int jack_process( jack_nframes_t frames, void * data )
{
	consumer_jack self = (consumer_jack) data;
	// NOTE(review): a property lookup inside the realtime callback - confirm
	// that it cannot block
	mlt_properties properties = MLT_CONSUMER_PROPERTIES( &self->parent );
	int channels = mlt_properties_get_int( properties, "channels" );
	size_t jack_size = ( frames * sizeof(float) );
	int i;

	if ( !self->ringbuffers || !self->ports )
		return 1;

	for ( i = 0; i < channels; i++ )
	{
		char *dest;
		size_t copied = 0;

		// A port entry is NULL when its registration failed: nothing to fill
		if ( !self->ports[i] )
			continue;
		dest = jack_port_get_buffer( self->ports[i], frames );
		if ( !dest )
			continue;

		// A missing ringbuffer simply yields a period of silence
		if ( self->ringbuffers[i] )
		{
			size_t ring_size = jack_ringbuffer_read_space( self->ringbuffers[i] );
			copied = jack_ringbuffer_read( self->ringbuffers[i], dest, ring_size < jack_size ? ring_size : jack_size );
		}

		// Underrun: pad the rest of the period with zeros
		if ( copied < jack_size )
			memset( dest + copied, 0, jack_size - copied );
	}

	return 0;
}
249 
/** Create the per-channel ringbuffers and JACK output ports, activate the
 * client and connect each output port to its destination.
 *
 * The destination of channel N (1-based) is the value of the consumer
 * property "out_N" when set, otherwise the N-th physical input port reported
 * by JACK, otherwise "system:playback_N" when JACK reports no physical ports.
 * NOTE(review): the original looked the property up with an uninitialised
 * buffer as the key; "out_N" is assumed here because it matches the port
 * names registered below - confirm against the service documentation.
 *
 * Sets self->playing to 1 once the client has been activated.
 *
 * \param self the consumer
*/
static void initialise_jack_ports( consumer_jack self )
{
	int i;
	char key[20], fallback[30];
	mlt_properties properties = MLT_CONSUMER_PROPERTIES( &self->parent );
	const char **physical = NULL;
	int physical_count = 0;
	int physical_queried = 0;
	jack_ringbuffer_t **rings = NULL;
	jack_port_t **outputs = NULL;

	// Propagate these for the Jack processing callback
	int channels = mlt_properties_get_int( properties, "channels" );

	// Ports and buffers from an earlier call still exist (consumer_stop()
	// clears both pointers when it releases them). Allocating again would
	// leak them and re-register the same port names, so keep what is there.
	if ( self->ringbuffers && self->ports )
	{
		self->playing = 1;
		return;
	}

	// Allocate buffers and ports
	rings = mlt_pool_alloc( sizeof( jack_ringbuffer_t *) * channels );
	outputs = mlt_pool_alloc( sizeof(jack_port_t *) * channels );
	if ( !rings || !outputs )
	{
		if ( rings )
			mlt_pool_release( rings );
		if ( outputs )
			mlt_pool_release( outputs );
		return;
	}

	// Fill both arrays completely BEFORE publishing them and activating the
	// client, so the process callback never sees uninitialised entries.
	// Entries are NULL when creation or registration fails.
	for ( i = 0; i < channels; i++ )
	{
		rings[i] = jack_ringbuffer_create( BUFFER_LEN * sizeof(float) );
		snprintf( key, sizeof( key ), "out_%d", i + 1 );
		outputs[i] = jack_port_register( self->jack, key, JACK_DEFAULT_AUDIO_TYPE,
				JackPortIsOutput | JackPortIsTerminal, 0 );
	}
	self->ports = outputs;
	self->ringbuffers = rings;

	// Start Jack processing - required before connecting ports
	pthread_mutex_lock( &g_activate_mutex );
	if ( jack_activate( self->jack ) != 0 )
		mlt_log_verbose( NULL, "JACK activate failed\n" );
	pthread_mutex_unlock( &g_activate_mutex );
	self->playing = 1;

	// Establish connections
	for ( i = 0; i < channels; i++ )
	{
		const char *source;
		const char *target;

		// Registration of this port failed: there is nothing to connect
		if ( !self->ports[i] )
			continue;
		source = jack_port_name( self->ports[i] );

		snprintf( key, sizeof( key ), "out_%d", i + 1 );
		target = mlt_properties_get( properties, key );
		if ( !target )
		{
			// Query the physical playback ports once and count them; the
			// list is NULL-terminated and may be shorter than "channels"
			if ( !physical_queried )
			{
				physical_queried = 1;
				physical = jack_get_ports( self->jack, NULL, NULL, JackPortIsPhysical | JackPortIsInput );
				if ( physical )
					while ( physical[ physical_count ] )
						physical_count++;
			}
			if ( physical )
			{
				if ( i >= physical_count )
				{
					mlt_log_verbose( NULL, "JACK no physical port for %s\n", source );
					continue;
				}
				target = physical[i];
			}
			else
			{
				snprintf( fallback, sizeof( fallback ), "system:playback_%d", i + 1);
				target = fallback;
			}
		}
		mlt_log_verbose( NULL, "JACK connect %s to %s\n", source, target );
		if ( jack_connect( self->jack, source, target ) != 0 )
			mlt_log_verbose( NULL, "JACK connect %s to %s failed\n", source, target );
	}
	if ( physical )
		jack_free( physical );
}
301 
/** Fetch a frame's audio and queue it for the JACK process callback.
 *
 * Audio is requested as mlt_audio_float and each channel's block of samples
 * is written to that channel's ringbuffer, but only when the ringbuffer has
 * room for the whole block. The volume property is applied first; when the
 * frame speed is 0 and "scrub_audio" is off the audio is muted. Nothing is
 * written for speeds other than 0 and 1.
 *
 * \param self the consumer
 * \param frame the frame to take audio from
 * \param init_audio 1 when the JACK ports still have to be set up
 * \param[out] duration length of this frame's audio in milliseconds
 *             (0 when the frequency is not positive)
 * \return the new init_audio state: 1 while "audio_off" is set, otherwise 0
 *         once the ports have been initialised
*/
static int consumer_play_audio( consumer_jack self, mlt_frame frame, int init_audio, int *duration )
{
	// Get the properties of this consumer
	mlt_properties properties = MLT_CONSUMER_PROPERTIES( &self->parent );
	mlt_audio_format afmt = mlt_audio_float;

	// Set the preferred params of the test card signal
	double speed = mlt_properties_get_double( MLT_FRAME_PROPERTIES(frame), "_speed" );
	int channels = mlt_properties_get_int( properties, "channels" );
	int frequency = mlt_properties_get_int( properties, "frequency" );
	int scrub = mlt_properties_get_int( properties, "scrub_audio" );
	int samples = mlt_audio_calculate_frame_samples( mlt_properties_get_double( properties, "fps" ), frequency, self->counter++ );
	// Stays NULL if no audio is delivered, so it can be tested below
	float *buffer = NULL;

	mlt_frame_get_audio( frame, (void**) &buffer, &afmt, &frequency, &channels, &samples );

	// Guard the division: a non-positive frequency would otherwise divide by zero
	*duration = frequency > 0 ? ( ( samples * 1000 ) / frequency ) : 0;

	if ( mlt_properties_get_int( properties, "audio_off" ) )
	{
		init_audio = 1;
		return init_audio;
	}

	if ( init_audio == 1 )
	{
		self->playing = 0;
		initialise_jack_ports( self );
		init_audio = 0;
	}

	// Only write when there is something valid to write and somewhere to put it
	if ( init_audio == 0 && ( speed == 1.0 || speed == 0.0 )
		&& buffer != NULL && samples > 0 && channels > 0 && self->ringbuffers != NULL )
	{
		int i;
		size_t mlt_size = samples * sizeof(float);
		float volume = mlt_properties_get_double( properties, "volume" );

		// Paused without scrubbing: play silence
		if ( !scrub && speed == 0.0 )
			volume = 0.0;

		if ( volume != 1.0 )
		{
			// The buffer holds samples * channels floats in total
			int total = samples * channels;
			for ( i = 0; i < total; i++ )
				buffer[i] *= volume;
		}

		// Write into output ringbuffer; channel i starts at buffer + i * samples
		for ( i = 0; i < channels; i++ )
		{
			size_t ring_size;

			// Entry is NULL when the ringbuffer could not be created
			if ( !self->ringbuffers[i] )
				continue;
			ring_size = jack_ringbuffer_write_space( self->ringbuffers[i] );
			if ( ring_size >= mlt_size )
				jack_ringbuffer_write( self->ringbuffers[i], (char*)( buffer + i * samples ), mlt_size );
		}
	}

	return init_audio;
}
360 
/** Announce a frame for display.
 *
 * Fires "consumer-frame-show" on the consumer's properties, but only while
 * the running flag is set and mlt_consumer_is_stopped() reports false.
 *
 * \param self the consumer
 * \param frame the frame to show
 * \return always 0
*/
static int consumer_play_video( consumer_jack self, mlt_frame frame )
{
	mlt_properties consumer_props = MLT_CONSUMER_PROPERTIES( &self->parent );

	if ( !self->running )
		return 0;
	if ( mlt_consumer_is_stopped( &self->parent ) )
		return 0;

	mlt_events_fire( consumer_props, "consumer-frame-show", frame, NULL );
	return 0;
}
370 
video_thread(void * arg)371 static void *video_thread( void *arg )
372 {
373 	// Identify the arg
374 	consumer_jack self = arg;
375 
376 	// Obtain time of thread start
377 	struct timeval now;
378 	int64_t start = 0;
379 	int64_t elapsed = 0;
380 	struct timespec tm;
381 	mlt_frame next = NULL;
382 	mlt_properties properties = NULL;
383 	double speed = 0;
384 
385 	// Get real time flag
386 	int real_time = mlt_properties_get_int( MLT_CONSUMER_PROPERTIES( &self->parent ), "real_time" );
387 
388 	// Get the current time
389 	gettimeofday( &now, NULL );
390 
391 	// Determine start time
392 	start = ( int64_t )now.tv_sec * 1000000 + now.tv_usec;
393 
394 	while ( self->running )
395 	{
396 		// Pop the next frame
397 		pthread_mutex_lock( &self->video_mutex );
398 		next = mlt_deque_pop_front( self->queue );
399 		while ( next == NULL && self->running )
400 		{
401 			pthread_cond_wait( &self->video_cond, &self->video_mutex );
402 			next = mlt_deque_pop_front( self->queue );
403 		}
404 		pthread_mutex_unlock( &self->video_mutex );
405 
406 		if ( !self->running || next == NULL ) break;
407 
408 		// Get the properties
409 		properties =  MLT_FRAME_PROPERTIES( next );
410 
411 		// Get the speed of the frame
412 		speed = mlt_properties_get_double( properties, "_speed" );
413 
414 		// Get the current time
415 		gettimeofday( &now, NULL );
416 
417 		// Get the elapsed time
418 		elapsed = ( ( int64_t )now.tv_sec * 1000000 + now.tv_usec ) - start;
419 
420 		// See if we have to delay the display of the current frame
421 		if ( mlt_properties_get_int( properties, "rendered" ) == 1 && self->running )
422 		{
423 			// Obtain the scheduled playout time
424 			int64_t scheduled = mlt_properties_get_int( properties, "playtime" );
425 
426 			// Determine the difference between the elapsed time and the scheduled playout time
427 			int64_t difference = scheduled - elapsed;
428 
429 			// Smooth playback a bit
430 			if ( real_time && ( difference > 20000 && speed == 1.0 ) )
431 			{
432 				tm.tv_sec = difference / 1000000;
433 				tm.tv_nsec = ( difference % 1000000 ) * 500;
434 				nanosleep( &tm, NULL );
435 			}
436 
437 			// Show current frame if not too old
438 			if ( !real_time || ( difference > -10000 || speed != 1.0 || mlt_deque_count( self->queue ) < 2 ) )
439 				consumer_play_video( self, next );
440 
441 			// If the queue is empty, recalculate start to allow build up again
442 			if ( real_time && ( mlt_deque_count( self->queue ) == 0 && speed == 1.0 ) )
443 			{
444 				gettimeofday( &now, NULL );
445 				start = ( ( int64_t )now.tv_sec * 1000000 + now.tv_usec ) - scheduled + 20000;
446 			}
447 		}
448 
449 		// This frame can now be closed
450 		mlt_frame_close( next );
451 		next = NULL;
452 	}
453 
454 	if ( next != NULL )
455 		mlt_frame_close( next );
456 
457 	mlt_consumer_stopped( &self->parent );
458 
459 	return NULL;
460 }
461 
462 /** Threaded wrapper for pipe.
463 */
464 
consumer_thread(void * arg)465 static void *consumer_thread( void *arg )
466 {
467 	// Identify the arg
468 	consumer_jack self = arg;
469 
470 	// Get the consumer
471 	mlt_consumer consumer = &self->parent;
472 
473 	// Get the properties
474 	mlt_properties consumer_props = MLT_CONSUMER_PROPERTIES( consumer );
475 
476 	// Video thread
477 	pthread_t thread;
478 
479 	// internal initialization
480 	int init_audio = 1;
481 	int init_video = 1;
482 	mlt_frame frame = NULL;
483 	mlt_properties properties = NULL;
484 	int duration = 0;
485 	int64_t playtime = 0;
486 	struct timespec tm = { 0, 100000 };
487 //	int last_position = -1;
488 
489 	pthread_mutex_lock( &self->refresh_mutex );
490 	self->refresh_count = 0;
491 	pthread_mutex_unlock( &self->refresh_mutex );
492 
493 	// Loop until told not to
494 	while( self->running )
495 	{
496 		// Get a frame from the attached producer
497 		frame = mlt_consumer_rt_frame( consumer );
498 
499 		// Ensure that we have a frame
500 		if ( frame )
501 		{
502 			// Get the frame properties
503 			properties =  MLT_FRAME_PROPERTIES( frame );
504 
505 			// Get the speed of the frame
506 			double speed = mlt_properties_get_double( properties, "_speed" );
507 
508 			// Get refresh request for the current frame
509 			int refresh = mlt_properties_get_int( consumer_props, "refresh" );
510 
511 			// Clear refresh
512 			mlt_events_block( consumer_props, consumer_props );
513 			mlt_properties_set_int( consumer_props, "refresh", 0 );
514 			mlt_events_unblock( consumer_props, consumer_props );
515 
516 			// Play audio
517 			init_audio = consumer_play_audio( self, frame, init_audio, &duration );
518 
519 			// Determine the start time now
520 			if ( self->playing && init_video )
521 			{
522 				// Create the video thread
523 				pthread_create( &thread, NULL, video_thread, self );
524 
525 				// Video doesn't need to be initialised any more
526 				init_video = 0;
527 			}
528 
529 			// Set playtime for this frame
530 			mlt_properties_set_int( properties, "playtime", playtime );
531 
532 			while ( self->running && speed != 0 && mlt_deque_count( self->queue ) > 15 )
533 				nanosleep( &tm, NULL );
534 
535 			// Push this frame to the back of the video queue
536 			if ( self->running && speed )
537 			{
538 				pthread_mutex_lock( &self->video_mutex );
539 				mlt_deque_push_back( self->queue, frame );
540 				pthread_cond_broadcast( &self->video_cond );
541 				pthread_mutex_unlock( &self->video_mutex );
542 
543 				// Calculate the next playtime
544 				playtime += ( duration * 1000 );
545 			}
546 			else if ( self->running )
547 			{
548 				pthread_mutex_lock( &self->refresh_mutex );
549 				if ( refresh == 0 && self->refresh_count <= 0 )
550 				{
551 					consumer_play_video( self, frame );
552 					pthread_cond_wait( &self->refresh_cond, &self->refresh_mutex );
553 				}
554 				mlt_frame_close( frame );
555 				self->refresh_count --;
556 				pthread_mutex_unlock( &self->refresh_mutex );
557 			}
558 			else
559 			{
560 				mlt_frame_close( frame );
561 				frame = NULL;
562 			}
563 
564 			// Optimisation to reduce latency
565 			if ( frame && speed == 1.0 )
566 			{
567 				// TODO: disabled due to misbehavior on parallel-consumer
568 //				if ( last_position != -1 && last_position + 1 != mlt_frame_get_position( frame ) )
569 //					mlt_consumer_purge( consumer );
570 //				last_position = mlt_frame_get_position( frame );
571 			}
572 			else if (speed == 0.0)
573 			{
574 				mlt_consumer_purge( consumer );
575 //				last_position = -1;
576 			}
577 		}
578 	}
579 
580 	// Kill the video thread
581 	if ( init_video == 0 )
582 	{
583 		pthread_mutex_lock( &self->video_mutex );
584 		pthread_cond_broadcast( &self->video_cond );
585 		pthread_mutex_unlock( &self->video_mutex );
586 		pthread_join( thread, NULL );
587 	}
588 
589 	while( mlt_deque_count( self->queue ) )
590 		mlt_frame_close( mlt_deque_pop_back( self->queue ) );
591 
592 	return NULL;
593 }
594 
595 /** Callback to allow override of the close method.
596 */
597 
consumer_close(mlt_consumer parent)598 static void consumer_close( mlt_consumer parent )
599 {
600 	// Get the actual object
601 	consumer_jack self = parent->child;
602 
603 	// Stop the consumer
604 	mlt_consumer_stop( parent );
605 
606 	// Now clean up the rest
607 	mlt_consumer_close( parent );
608 
609 	// Close the queue
610 	mlt_deque_close( self->queue );
611 
612 	// Destroy mutexes
613 	pthread_mutex_destroy( &self->video_mutex );
614 	pthread_cond_destroy( &self->video_cond );
615 	pthread_mutex_destroy( &self->refresh_mutex );
616 	pthread_cond_destroy( &self->refresh_cond );
617 
618 	// Disconnect from JACK
619 	jack_client_close( self->jack );
620 
621 	// Finally deallocate self
622 	free( self );
623 }
624