1 /*
2  * Copyright (C) 1999-2017 Paul Davis <paul@linuxaudiosystems.com>
3  * Copyright (C) 2009-2015 David Robillard <d@drobilla.net>
4  * Copyright (C) 2010-2012 Carl Hetherington <carl@carlh.net>
5  * Copyright (C) 2014-2017 Robin Gareus <robin@gareus.org>
6  * Copyright (C) 2015 GZharun <grygoriiz@wavesglobal.com>
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License as published by
10  * the Free Software Foundation; either version 2 of the License, or
11  * (at your option) any later version.
12  *
13  * This program is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  * GNU General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License along
19  * with this program; if not, write to the Free Software Foundation, Inc.,
20  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21  */
22 
23 #include <errno.h>
24 #include <fcntl.h>
25 #include <unistd.h>
26 
27 #ifndef PLATFORM_WINDOWS
28 #include <poll.h>
29 #endif
30 
31 #include "pbd/error.h"
32 #include "pbd/pthread_utils.h"
33 
34 #include "ardour/butler.h"
35 #include "ardour/debug.h"
36 #include "ardour/disk_io.h"
37 #include "ardour/disk_reader.h"
38 #include "ardour/io.h"
39 #include "ardour/session.h"
40 #include "ardour/track.h"
41 #include "ardour/auditioner.h"
42 
43 #include "pbd/i18n.h"
44 
45 using namespace PBD;
46 
47 namespace ARDOUR {
48 
Butler(Session & s)49 Butler::Butler(Session& s)
50 	: SessionHandleRef (s)
51 	, thread()
52 	, have_thread (false)
53 	, _audio_capture_buffer_size(0)
54 	, _audio_playback_buffer_size(0)
55 	, _midi_buffer_size(0)
56 	, pool_trash(16)
57 	, _xthread (true)
58 {
59 	g_atomic_int_set (&should_do_transport_work, 0);
60 	SessionEvent::pool->set_trash (&pool_trash);
61 
62 	/* catch future changes to parameters */
63 	Config->ParameterChanged.connect_same_thread (*this, boost::bind (&Butler::config_changed, this, _1));
64 }
65 
~Butler()66 Butler::~Butler()
67 {
68 	terminate_thread ();
69 }
70 
71 void
map_parameters()72 Butler::map_parameters ()
73 {
74         /* use any current ones that we care about */
75         boost::function<void (std::string)> ff (boost::bind (&Butler::config_changed, this, _1));
76         Config->map_parameters (ff);
77 }
78 
79 void
config_changed(std::string p)80 Butler::config_changed (std::string p)
81 {
82 	if (p == "playback-buffer-seconds") {
83 		_session.adjust_playback_buffering ();
84 		if (Config->get_buffering_preset() == Custom) {
85 			/* size is in Samples, not bytes */
86 			samplecnt_t audio_playback_buffer_size = (uint32_t) floor (Config->get_audio_playback_buffer_seconds() * _session.sample_rate());
87 			if (_audio_playback_buffer_size != audio_playback_buffer_size) {
88 				_audio_playback_buffer_size = audio_playback_buffer_size;
89 				_session.adjust_playback_buffering ();
90 			}
91 		}
92 	} else if (p == "capture-buffer-seconds") {
93 		if (Config->get_buffering_preset() == Custom) {
94 			/* size is in Samples, not bytes */
95 			samplecnt_t audio_capture_buffer_size = (uint32_t) floor (Config->get_audio_capture_buffer_seconds() * _session.sample_rate());
96 			if (_audio_capture_buffer_size != audio_capture_buffer_size) {
97 				_audio_capture_buffer_size = audio_capture_buffer_size;
98 				_session.adjust_capture_buffering ();
99 			}
100 		}
101 	} else if (p == "buffering-preset") {
102 		DiskIOProcessor::set_buffering_parameters (Config->get_buffering_preset());
103 		samplecnt_t audio_capture_buffer_size = (uint32_t) floor (Config->get_audio_capture_buffer_seconds() * _session.sample_rate());
104 		samplecnt_t audio_playback_buffer_size = (uint32_t) floor (Config->get_audio_playback_buffer_seconds() * _session.sample_rate());
105 		if (_audio_capture_buffer_size != audio_capture_buffer_size) {
106 			_audio_capture_buffer_size = audio_capture_buffer_size;
107 			_session.adjust_capture_buffering ();
108 		}
109 		if (_audio_playback_buffer_size != audio_playback_buffer_size) {
110 			_audio_playback_buffer_size = audio_playback_buffer_size;
111 			_session.adjust_playback_buffering ();
112 		}
113 	}
114 }
115 
116 int
start_thread()117 Butler::start_thread()
118 {
119 	// set up capture and playback buffering
120 	DiskIOProcessor::set_buffering_parameters (Config->get_buffering_preset());
121 
122 	/* size is in Samples, not bytes */
123 	const float rate = (float)_session.sample_rate();
124 	_audio_capture_buffer_size = (uint32_t) floor (Config->get_audio_capture_buffer_seconds() * rate);
125 	_audio_playback_buffer_size = (uint32_t) floor (Config->get_audio_playback_buffer_seconds() * rate);
126 
127 	/* size is in bytes
128 	 * XXX: AudioEngine needs to tell us the MIDI buffer size
129 	 * (i.e. how many MIDI bytes we might see in a cycle)
130 	 */
131 	_midi_buffer_size = (uint32_t) floor (Config->get_midi_track_buffer_seconds() * rate);
132 
133 	should_run = false;
134 
135 	if (pthread_create_and_store ("disk butler", &thread, _thread_work, this)) {
136 		error << _("Session: could not create butler thread") << endmsg;
137 		return -1;
138 	}
139 
140 	//pthread_detach (thread);
141 	have_thread = true;
142 
143 	// we are ready to request buffer adjustments
144 	_session.adjust_capture_buffering ();
145 	_session.adjust_playback_buffering ();
146 
147 	return 0;
148 }
149 
150 void
terminate_thread()151 Butler::terminate_thread ()
152 {
153 	if (have_thread) {
154 		void* status;
155                 DEBUG_TRACE (DEBUG::Butler, string_compose ("%1: ask butler to quit @ %2\n", DEBUG_THREAD_SELF, g_get_monotonic_time()));
156 		queue_request (Request::Quit);
157 		pthread_join (thread, &status);
158 	}
159 }
160 
161 void *
_thread_work(void * arg)162 Butler::_thread_work (void* arg)
163 {
164 	SessionEvent::create_per_thread_pool ("butler events", 4096);
165 	pthread_set_name (X_("butler"));
166 	return ((Butler *) arg)->thread_work ();
167 }
168 
169 void *
thread_work()170 Butler::thread_work ()
171 {
172 	uint32_t err = 0;
173 
174 	bool disk_work_outstanding = false;
175 	RouteList::iterator i;
176 
177 	while (true) {
178 		DEBUG_TRACE (DEBUG::Butler, string_compose ("%1 butler main loop, disk work outstanding ? %2 @ %3\n", DEBUG_THREAD_SELF, disk_work_outstanding, g_get_monotonic_time()));
179 
180 		if(!disk_work_outstanding) {
181 			DEBUG_TRACE (DEBUG::Butler, string_compose ("%1 butler waits for requests @ %2\n", DEBUG_THREAD_SELF, g_get_monotonic_time()));
182 
183 			char msg;
184 			/* empty the pipe of all current requests */
185 			if (_xthread.receive (msg, true) >= 0) {
186 				Request::Type req = (Request::Type) msg;
187 				switch (req) {
188 
189 					case Request::Run:
190 						DEBUG_TRACE (DEBUG::Butler, string_compose ("%1: butler asked to run @ %2\n", DEBUG_THREAD_SELF, g_get_monotonic_time()));
191 						should_run = true;
192 						break;
193 
194 					case Request::Pause:
195 						DEBUG_TRACE (DEBUG::Butler, string_compose ("%1: butler asked to pause @ %2\n", DEBUG_THREAD_SELF, g_get_monotonic_time()));
196 						should_run = false;
197 						break;
198 
199 					case Request::Quit:
200 						DEBUG_TRACE (DEBUG::Butler, string_compose ("%1: butler asked to quit @ %2\n", DEBUG_THREAD_SELF, g_get_monotonic_time()));
201 						return 0;
202 						abort(); /*NOTREACHED*/
203 						break;
204 
205 					default:
206 						break;
207 				}
208 			}
209 		}
210 
211 
212 	  restart:
213 		DEBUG_TRACE (DEBUG::Butler, "at restart for disk work\n");
214 		disk_work_outstanding = false;
215 
216 		if (transport_work_requested()) {
217 			DEBUG_TRACE (DEBUG::Butler, string_compose ("do transport work @ %1\n", g_get_monotonic_time()));
218 			_session.butler_transport_work ();
219 			DEBUG_TRACE (DEBUG::Butler, string_compose ("\ttransport work complete @ %1, twr = %2\n", g_get_monotonic_time(), transport_work_requested()));
220 
221 			if (_session.locate_initiated()) {
222 				/* we have done the "stop" required for a
223 				   locate (DeclickToLocate state in TFSM), but
224 				   once that finishes we're going to do a locate,
225 				   so do not bother with buffer refills at this
226 				   time.
227 				*/
228 				Glib::Threads::Mutex::Lock lm (request_lock);
229 				DEBUG_TRACE (DEBUG::Butler, string_compose ("\tlocate pending, so just pause @ %1 till woken again\n", g_get_monotonic_time()));
230 				paused.signal ();
231 				continue;
232 			}
233 		}
234 
235 		sampleoffset_t audition_seek;
236 		if (should_run && _session.is_auditioning() && (audition_seek = _session.the_auditioner()->seek_sample()) >= 0) {
237 			boost::shared_ptr<Track> tr = boost::dynamic_pointer_cast<Track> (_session.the_auditioner());
238 			DEBUG_TRACE (DEBUG::Butler, "seek the auditioner\n");
239 			tr->seek(audition_seek);
240 			tr->do_refill ();
241 			_session.the_auditioner()->seek_response(audition_seek);
242 		}
243 
244 		boost::shared_ptr<RouteList> rl = _session.get_routes();
245 
246 		RouteList rl_with_auditioner = *rl;
247 		rl_with_auditioner.push_back (_session.the_auditioner());
248 
249 		DEBUG_TRACE (DEBUG::Butler, string_compose ("butler starts refill loop, twr = %1\n", transport_work_requested()));
250 
251 		for (i = rl_with_auditioner.begin(); !transport_work_requested() && should_run && i != rl_with_auditioner.end(); ++i) {
252 
253 			boost::shared_ptr<Track> tr = boost::dynamic_pointer_cast<Track> (*i);
254 
255 			if (!tr) {
256 				continue;
257 			}
258 
259 			boost::shared_ptr<IO> io = tr->input ();
260 
261 			if (io && !io->active()) {
262 				/* don't read inactive tracks */
263 				// DEBUG_TRACE (DEBUG::Butler, string_compose ("butler skips inactive track %1\n", tr->name()));
264 				continue;
265 			}
266 			// DEBUG_TRACE (DEBUG::Butler, string_compose ("butler refills %1, playback load = %2\n", tr->name(), tr->playback_buffer_load()));
267 			switch (tr->do_refill ()) {
268 			case 0:
269 				//DEBUG_TRACE (DEBUG::Butler, string_compose ("\ttrack refill done %1\n", tr->name()));
270 				break;
271 
272 			case 1:
273 				DEBUG_TRACE (DEBUG::Butler, string_compose ("\ttrack refill unfinished %1\n", tr->name()));
274 				disk_work_outstanding = true;
275 				break;
276 
277 			default:
278 				error << string_compose(_("Butler read ahead failure on dstream %1"), (*i)->name()) << endmsg;
279                                 std::cerr << string_compose(_("Butler read ahead failure on dstream %1"), (*i)->name()) << std::endl;
280 				break;
281 			}
282 
283 		}
284 
285 		if (i != rl_with_auditioner.begin() && i != rl_with_auditioner.end()) {
286 			/* we didn't get to all the streams */
287 			disk_work_outstanding = true;
288 		}
289 
290 		if (!err && transport_work_requested()) {
291 			DEBUG_TRACE (DEBUG::Butler, "transport work requested during refill, back to restart\n");
292 			goto restart;
293 		}
294 
295 		disk_work_outstanding = disk_work_outstanding || flush_tracks_to_disk_normal (rl, err);
296 
297 		if (err && _session.actively_recording()) {
298 			/* stop the transport and try to catch as much possible
299 			   captured state as we can.
300 			*/
301 			DEBUG_TRACE (DEBUG::Butler, "error occurred during recording - stop transport\n");
302 			_session.request_stop ();
303 		}
304 
305 		if (!err && transport_work_requested()) {
306 			DEBUG_TRACE (DEBUG::Butler, "transport work requested during flush, back to restart\n");
307 			goto restart;
308 		}
309 
310 		if (!disk_work_outstanding) {
311 			_session.refresh_disk_space ();
312 		}
313 
314 		{
315 			Glib::Threads::Mutex::Lock lm (request_lock);
316 
317 			if (should_run && (disk_work_outstanding || transport_work_requested())) {
318                                 DEBUG_TRACE (DEBUG::Butler, string_compose ("at end, should run %1 disk work %2 transport work %3 ... goto restart\n",
319                                                                             should_run, disk_work_outstanding, transport_work_requested()));
320 				goto restart;
321 			}
322 
323                         DEBUG_TRACE (DEBUG::Butler, string_compose ("%1: butler signals pause @ %2\n", DEBUG_THREAD_SELF, g_get_monotonic_time()));
324 			paused.signal();
325 		}
326 
327 		DEBUG_TRACE (DEBUG::Butler, "butler emptying pool trash\n");
328 		empty_pool_trash ();
329 	}
330 
331 	return (0);
332 }
333 
334 bool
flush_tracks_to_disk_normal(boost::shared_ptr<RouteList> rl,uint32_t & errors)335 Butler::flush_tracks_to_disk_normal (boost::shared_ptr<RouteList> rl, uint32_t& errors)
336 {
337 	bool disk_work_outstanding = false;
338 
339 	for (RouteList::iterator i = rl->begin(); !transport_work_requested() && should_run && i != rl->end(); ++i) {
340 
341 		// cerr << "write behind for " << (*i)->name () << endl;
342 
343 		boost::shared_ptr<Track> tr = boost::dynamic_pointer_cast<Track> (*i);
344 
345 		if (!tr) {
346 			continue;
347 		}
348 
349 		/* note that we still try to flush diskstreams attached to inactive routes
350 		 */
351 
352 		int ret;
353 
354 		// DEBUG_TRACE (DEBUG::Butler, string_compose ("butler flushes track %1 capture load %2\n", tr->name(), tr->capture_buffer_load()));
355 		ret = tr->do_flush (ButlerContext, false);
356 		switch (ret) {
357 		case 0:
358 			//DEBUG_TRACE (DEBUG::Butler, string_compose ("\tflush complete for %1\n", tr->name()));
359 			break;
360 
361 		case 1:
362 			//DEBUG_TRACE (DEBUG::Butler, string_compose ("\tflush not finished for %1\n", tr->name()));
363 			disk_work_outstanding = true;
364 			break;
365 
366 		default:
367 			errors++;
368 			error << string_compose(_("Butler write-behind failure on dstream %1"), (*i)->name()) << endmsg;
369 			std::cerr << string_compose(_("Butler write-behind failure on dstream %1"), (*i)->name()) << std::endl;
370 			/* don't break - try to flush all streams in case they
371 			   are split across disks.
372 			*/
373 		}
374 	}
375 
376 	return disk_work_outstanding;
377 }
378 
379 bool
flush_tracks_to_disk_after_locate(boost::shared_ptr<RouteList> rl,uint32_t & errors)380 Butler::flush_tracks_to_disk_after_locate (boost::shared_ptr<RouteList> rl, uint32_t& errors)
381 {
382 	bool disk_work_outstanding = false;
383 
384 	/* almost the same as the "normal" version except that we do not test
385 	 * for transport_work_requested() and we force flushes.
386 	 */
387 
388 	for (RouteList::iterator i = rl->begin(); i != rl->end(); ++i) {
389 
390 		// cerr << "write behind for " << (*i)->name () << endl;
391 
392 		boost::shared_ptr<Track> tr = boost::dynamic_pointer_cast<Track> (*i);
393 
394 		if (!tr) {
395 			continue;
396 		}
397 
398 		/* note that we still try to flush diskstreams attached to inactive routes
399 		 */
400 
401 		int ret;
402 
403 		DEBUG_TRACE (DEBUG::Butler, string_compose ("butler flushes track %1 capture load %2\n", tr->name(), tr->capture_buffer_load()));
404 		ret = tr->do_flush (ButlerContext, true);
405 		switch (ret) {
406 		case 0:
407 			DEBUG_TRACE (DEBUG::Butler, string_compose ("\tflush complete for %1\n", tr->name()));
408 			break;
409 
410 		case 1:
411 			DEBUG_TRACE (DEBUG::Butler, string_compose ("\tflush not finished for %1\n", tr->name()));
412 			disk_work_outstanding = true;
413 			break;
414 
415 		default:
416 			errors++;
417 			error << string_compose(_("Butler write-behind failure on dstream %1"), (*i)->name()) << endmsg;
418 			std::cerr << string_compose(_("Butler write-behind failure on dstream %1"), (*i)->name()) << std::endl;
419 			/* don't break - try to flush all streams in case they
420 			   are split across disks.
421 			*/
422 		}
423 	}
424 
425 	return disk_work_outstanding;
426 }
427 
428 void
schedule_transport_work()429 Butler::schedule_transport_work ()
430 {
431 	DEBUG_TRACE (DEBUG::Butler, "requesting more transport work\n");
432 	g_atomic_int_inc (&should_do_transport_work);
433 	summon ();
434 }
435 
436 void
queue_request(Request::Type r)437 Butler::queue_request (Request::Type r)
438 {
439 	char c = r;
440 	if (_xthread.deliver (c) != 1) {
441 		/* the x-thread channel is non-blocking
442 		 * write may fail, but we really don't want to wait
443 		 * under normal circumstances.
444 		 *
445 		 * a lost "run" requests under normal RT operation
446 		 * is mostly harmless.
447 		 *
448 		 * TODO if ardour is freehweeling, wait & retry.
449 		 * ditto for Request::Type Quit
450 		 */
451 		assert(1); // we're screwd
452 	}
453 }
454 
455 void
summon()456 Butler::summon ()
457 {
458 	DEBUG_TRACE (DEBUG::Butler, string_compose ("%1: summon butler to run @ %2\n", DEBUG_THREAD_SELF, g_get_monotonic_time()));
459 	queue_request (Request::Run);
460 }
461 
462 void
stop()463 Butler::stop ()
464 {
465 	Glib::Threads::Mutex::Lock lm (request_lock);
466 	DEBUG_TRACE (DEBUG::Butler, string_compose ("%1: asking butler to stop @ %2\n", DEBUG_THREAD_SELF, g_get_monotonic_time()));
467 	queue_request (Request::Pause);
468 	paused.wait(request_lock);
469 }
470 
471 void
wait_until_finished()472 Butler::wait_until_finished ()
473 {
474 	Glib::Threads::Mutex::Lock lm (request_lock);
475 	DEBUG_TRACE (DEBUG::Butler, string_compose ("%1: waiting for butler to finish @ %2\n", DEBUG_THREAD_SELF, g_get_monotonic_time()));
476 	queue_request (Request::Pause);
477 	paused.wait(request_lock);
478 }
479 
480 bool
transport_work_requested() const481 Butler::transport_work_requested () const
482 {
483 	return g_atomic_int_get (&should_do_transport_work);
484 }
485 
486 void
empty_pool_trash()487 Butler::empty_pool_trash ()
488 {
489 	/* look in the trash, deleting empty pools until we come to one that is not empty */
490 
491 	RingBuffer<CrossThreadPool*>::rw_vector vec;
492 	pool_trash.get_read_vector (&vec);
493 
494 	guint deleted = 0;
495 
496 	for (int i = 0; i < 2; ++i) {
497 		for (guint j = 0; j < vec.len[i]; ++j) {
498 			if (vec.buf[i][j]->empty()) {
499 				delete vec.buf[i][j];
500 				++deleted;
501 			} else {
502 				/* found a non-empty pool, so stop deleting */
503 				if (deleted) {
504 					pool_trash.increment_read_idx (deleted);
505 				}
506 				return;
507 			}
508 		}
509 	}
510 
511 	if (deleted) {
512 		pool_trash.increment_read_idx (deleted);
513 	}
514 }
515 
516 void
drop_references()517 Butler::drop_references ()
518 {
519 	std::cerr << "Butler drops pool trash\n";
520 	SessionEvent::pool->set_trash (0);
521 }
522 
523 
524 } // namespace ARDOUR
525