1 /* This file is part of Clementine.
2    Copyright 2010, David Sansome <me@davidsansome.com>
3 
4    Clementine is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8 
9    Clementine is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with Clementine.  If not, see <http://www.gnu.org/licenses/>.
16 */
17 
18 #include "config.h"
19 
20 #include <limits>
21 
22 #include <QCoreApplication>
23 #include <QDir>
24 #include <QPair>
25 #include <QRegExp>
26 
27 #include "bufferconsumer.h"
28 #include "config.h"
29 #include "gstelementdeleter.h"
30 #include "gstengine.h"
31 #include "gstenginepipeline.h"
32 #include "core/concurrentrun.h"
33 #include "core/logging.h"
34 #include "core/mac_startup.h"
35 #include "core/signalchecker.h"
36 #include "core/utilities.h"
37 #include "internet/core/internetmodel.h"
38 #ifdef HAVE_SPOTIFY
39 #  include "internet/spotify/spotifyserver.h"
40 #  include "internet/spotify/spotifyservice.h"
41 #endif
42 
43 const int GstEnginePipeline::kGstStateTimeoutNanosecs = 10000000;
44 const int GstEnginePipeline::kFaderFudgeMsec = 2000;
45 
46 const int GstEnginePipeline::kEqBandCount = 10;
47 const int GstEnginePipeline::kEqBandFrequencies[] = {
48     60, 170, 310, 600, 1000, 3000, 6000, 12000, 14000, 16000};
49 
50 int GstEnginePipeline::sId = 1;
51 GstElementDeleter* GstEnginePipeline::sElementDeleter = nullptr;
52 
GstEnginePipeline(GstEngine * engine)53 GstEnginePipeline::GstEnginePipeline(GstEngine* engine)
54     : QObject(nullptr),
55       engine_(engine),
56       id_(sId++),
57       valid_(false),
58       sink_(GstEngine::kAutoSink),
59       segment_start_(0),
60       segment_start_received_(false),
61       emit_track_ended_on_stream_start_(false),
62       emit_track_ended_on_time_discontinuity_(false),
63       last_buffer_offset_(0),
64       eq_enabled_(false),
65       eq_preamp_(0),
66       stereo_balance_(0.0f),
67       rg_enabled_(false),
68       rg_mode_(0),
69       rg_preamp_(0.0),
70       rg_compression_(true),
71       buffer_duration_nanosec_(1 * kNsecPerSec),
72       buffer_min_fill_(33),
73       buffering_(false),
74       mono_playback_(false),
75       sample_rate_(GstEngine::kAutoSampleRate),
76       end_offset_nanosec_(-1),
77       next_beginning_offset_nanosec_(-1),
78       next_end_offset_nanosec_(-1),
79       ignore_next_seek_(false),
80       ignore_tags_(false),
81       pipeline_is_initialised_(false),
82       pipeline_is_connected_(false),
83       pending_seek_nanosec_(-1),
84       last_known_position_ns_(0),
85       volume_percent_(100),
86       volume_modifier_(1.0),
87       pipeline_(nullptr),
88       uridecodebin_(nullptr),
89       audiobin_(nullptr),
90       queue_(nullptr),
91       audioconvert_(nullptr),
92       rgvolume_(nullptr),
93       rglimiter_(nullptr),
94       audioconvert2_(nullptr),
95       equalizer_(nullptr),
96       stereo_panorama_(nullptr),
97       volume_(nullptr),
98       audioscale_(nullptr),
99       audiosink_(nullptr) {
100   if (!sElementDeleter) {
101     sElementDeleter = new GstElementDeleter;
102   }
103 
104   for (int i = 0; i < kEqBandCount; ++i) eq_band_gains_ << 0;
105 }
106 
set_output_device(const QString & sink,const QVariant & device)107 void GstEnginePipeline::set_output_device(const QString& sink,
108                                           const QVariant& device) {
109   sink_ = sink;
110   device_ = device;
111 }
112 
set_replaygain(bool enabled,int mode,float preamp,bool compression)113 void GstEnginePipeline::set_replaygain(bool enabled, int mode, float preamp,
114                                        bool compression) {
115   rg_enabled_ = enabled;
116   rg_mode_ = mode;
117   rg_preamp_ = preamp;
118   rg_compression_ = compression;
119 }
120 
set_buffer_duration_nanosec(qint64 buffer_duration_nanosec)121 void GstEnginePipeline::set_buffer_duration_nanosec(
122     qint64 buffer_duration_nanosec) {
123   buffer_duration_nanosec_ = buffer_duration_nanosec;
124 }
125 
set_buffer_min_fill(int percent)126 void GstEnginePipeline::set_buffer_min_fill(int percent) {
127   buffer_min_fill_ = percent;
128 }
129 
set_mono_playback(bool enabled)130 void GstEnginePipeline::set_mono_playback(bool enabled) {
131   mono_playback_ = enabled;
132 }
133 
set_sample_rate(int rate)134 void GstEnginePipeline::set_sample_rate(int rate) { sample_rate_ = rate; }
135 
ReplaceDecodeBin(GstElement * new_bin)136 bool GstEnginePipeline::ReplaceDecodeBin(GstElement* new_bin) {
137   if (!new_bin) return false;
138 
139   // Destroy the old elements if they are set
140   // Note that the caller to this function MUST schedule the old uridecodebin_
141   // for deletion in the main thread.
142   if (uridecodebin_) {
143     gst_bin_remove(GST_BIN(pipeline_), uridecodebin_);
144   }
145 
146   uridecodebin_ = new_bin;
147   segment_start_ = 0;
148   segment_start_received_ = false;
149   pipeline_is_connected_ = false;
150   gst_bin_add(GST_BIN(pipeline_), uridecodebin_);
151 
152   return true;
153 }
154 
ReplaceDecodeBin(const QUrl & url)155 bool GstEnginePipeline::ReplaceDecodeBin(const QUrl& url) {
156   GstElement* new_bin = CreateDecodeBinFromUrl(url);
157   return ReplaceDecodeBin(new_bin);
158 }
159 
CreateDecodeBinFromUrl(const QUrl & url)160 GstElement* GstEnginePipeline::CreateDecodeBinFromUrl(const QUrl& url) {
161   GstElement* new_bin = nullptr;
162 #ifdef HAVE_SPOTIFY
163   if (url.scheme() == "spotify") {
164     new_bin = gst_bin_new("spotify_bin");
165     if (!new_bin) return nullptr;
166 
167     // Create elements
168     GstElement* src = engine_->CreateElement("tcpserversrc", new_bin);
169     if (!src) {
170       gst_object_unref(GST_OBJECT(new_bin));
171       return nullptr;
172     }
173     GstElement* gdp = engine_->CreateElement("gdpdepay", new_bin);
174     if (!gdp) {
175       gst_object_unref(GST_OBJECT(new_bin));
176       return nullptr;
177     }
178 
179     // Pick a port number
180     const int port = Utilities::PickUnusedPort();
181     g_object_set(G_OBJECT(src), "host", "127.0.0.1", nullptr);
182     g_object_set(G_OBJECT(src), "port", port, nullptr);
183 
184     // Link the elements
185     gst_element_link(src, gdp);
186 
187     // Add a ghost pad
188     GstPad* pad = gst_element_get_static_pad(gdp, "src");
189     gst_element_add_pad(GST_ELEMENT(new_bin), gst_ghost_pad_new("src", pad));
190     gst_object_unref(GST_OBJECT(pad));
191 
192     // Tell spotify to start sending data to us.
193     SpotifyServer* spotify_server =
194         InternetModel::Service<SpotifyService>()->server();
195     // Need to schedule this in the spotify server's thread
196     QMetaObject::invokeMethod(
197         spotify_server, "StartPlayback", Qt::QueuedConnection,
198         Q_ARG(QString, url.toString()), Q_ARG(quint16, port));
199   } else {
200 #endif
201     QByteArray uri;
202     if (url.scheme() == "cdda") {
203       QString str = url.toString();
204       str.remove(str.lastIndexOf(QChar('a')), 1);
205       uri = str.toUtf8();
206     } else {
207       uri = url.toEncoded();
208     }
209     new_bin = engine_->CreateElement("uridecodebin");
210     if (!new_bin) return nullptr;
211     g_object_set(G_OBJECT(new_bin), "uri", uri.constData(), nullptr);
212     CHECKED_GCONNECT(G_OBJECT(new_bin), "drained", &SourceDrainedCallback,
213                      this);
214     CHECKED_GCONNECT(G_OBJECT(new_bin), "pad-added", &NewPadCallback, this);
215     CHECKED_GCONNECT(G_OBJECT(new_bin), "notify::source", &SourceSetupCallback,
216                      this);
217 #ifdef HAVE_SPOTIFY
218   }
219 #endif
220 
221   return new_bin;
222 }
223 
CreateDecodeBinFromString(const char * pipeline)224 GstElement* GstEnginePipeline::CreateDecodeBinFromString(const char* pipeline) {
225   GError* error = nullptr;
226   GstElement* bin = gst_parse_bin_from_description(pipeline, TRUE, &error);
227 
228   if (error) {
229     QString message = QString::fromLocal8Bit(error->message);
230     int domain = error->domain;
231     int code = error->code;
232     g_error_free(error);
233 
234     qLog(Warning) << message;
235     emit Error(id(), message, domain, code);
236 
237     return nullptr;
238   } else {
239     return bin;
240   }
241 }
242 
Init()243 bool GstEnginePipeline::Init() {
244   // Here we create all the parts of the gstreamer pipeline - from the source
245   // to the sink.  The parts of the pipeline are split up into bins:
246   //   uri decode bin -> audio bin
247   // The uri decode bin is a gstreamer builtin that automatically picks the
248   // right type of source and decoder for the URI.
249 
250   // The audio bin gets created here and contains:
251   //   queue ! audioconvert ! <caps32>
252   //         ! ( rgvolume ! rglimiter ! audioconvert2 ) ! tee
253   // rgvolume and rglimiter are only created when replaygain is enabled.
254 
255   // After the tee the pipeline splits.  One split is converted to 16-bit int
256   // samples for the scope, the other is kept as float32 and sent to the
257   // speaker.
258   //   tee1 ! probe_queue ! probe_converter ! <caps16> ! probe_sink
259   //   tee2 ! audio_queue ! equalizer_preamp ! equalizer ! volume ! audioscale
260   //        ! convert ! audiosink
261 
262   gst_segment_init(&last_decodebin_segment_, GST_FORMAT_TIME);
263 
264   // Audio bin
265   audiobin_ = gst_bin_new("audiobin");
266   gst_bin_add(GST_BIN(pipeline_), audiobin_);
267 
268   // Create the sink
269   if (!(audiosink_ = engine_->CreateElement(sink_, audiobin_))) return false;
270 
271   if (g_object_class_find_property(G_OBJECT_GET_CLASS(audiosink_), "device") &&
272       !device_.toString().isEmpty()) {
273     switch (device_.type()) {
274       case QVariant::Int:
275         g_object_set(G_OBJECT(audiosink_), "device", device_.toInt(), nullptr);
276         break;
277       case QVariant::LongLong:
278         g_object_set(G_OBJECT(audiosink_), "device", device_.toLongLong(),
279                      nullptr);
280         break;
281       case QVariant::String:
282         g_object_set(G_OBJECT(audiosink_), "device",
283                      device_.toString().toUtf8().constData(), nullptr);
284         break;
285       case QVariant::ByteArray: {
286         g_object_set(G_OBJECT(audiosink_), "device",
287                      device_.toByteArray().constData(), nullptr);
288         break;
289       }
290 
291       default:
292         qLog(Warning) << "Unknown device type" << device_;
293         break;
294     }
295   }
296 
297   // Create all the other elements
298   GstElement* tee, *probe_queue, *probe_converter, *probe_sink, *audio_queue,
299       *convert;
300 
301   queue_ = engine_->CreateElement("queue2", audiobin_);
302   audioconvert_ = engine_->CreateElement("audioconvert", audiobin_);
303   tee = engine_->CreateElement("tee", audiobin_);
304 
305   probe_queue = engine_->CreateElement("queue2", audiobin_);
306   probe_converter = engine_->CreateElement("audioconvert", audiobin_);
307   probe_sink = engine_->CreateElement("fakesink", audiobin_);
308 
309   audio_queue = engine_->CreateElement("queue", audiobin_);
310   equalizer_preamp_ = engine_->CreateElement("volume", audiobin_);
311   equalizer_ = engine_->CreateElement("equalizer-nbands", audiobin_);
312   stereo_panorama_ = engine_->CreateElement("audiopanorama", audiobin_);
313   volume_ = engine_->CreateElement("volume", audiobin_);
314   audioscale_ = engine_->CreateElement("audioresample", audiobin_);
315   convert = engine_->CreateElement("audioconvert", audiobin_);
316 
317   if (!queue_ || !audioconvert_ || !tee || !probe_queue || !probe_converter ||
318       !probe_sink || !audio_queue || !equalizer_preamp_ || !equalizer_ ||
319       !stereo_panorama_ || !volume_ || !audioscale_ || !convert) {
320     return false;
321   }
322 
323   // Create the replaygain elements if it's enabled.  event_probe is the
324   // audioconvert element we attach the probe to, which will change depending
325   // on whether replaygain is enabled.  convert_sink is the element after the
326   // first audioconvert, which again will change.
327   GstElement* event_probe = audioconvert_;
328   GstElement* convert_sink = tee;
329 
330   if (rg_enabled_) {
331     rgvolume_ = engine_->CreateElement("rgvolume", audiobin_);
332     rglimiter_ = engine_->CreateElement("rglimiter", audiobin_);
333     audioconvert2_ = engine_->CreateElement("audioconvert", audiobin_);
334     event_probe = audioconvert2_;
335     convert_sink = rgvolume_;
336 
337     if (!rgvolume_ || !rglimiter_ || !audioconvert2_) {
338       return false;
339     }
340 
341     // Set replaygain settings
342     g_object_set(G_OBJECT(rgvolume_), "album-mode", rg_mode_, nullptr);
343     g_object_set(G_OBJECT(rgvolume_), "pre-amp", double(rg_preamp_), nullptr);
344     g_object_set(G_OBJECT(rglimiter_), "enabled", int(rg_compression_),
345                  nullptr);
346   }
347 
348   // Create a pad on the outside of the audiobin and connect it to the pad of
349   // the first element.
350   GstPad* pad = gst_element_get_static_pad(queue_, "sink");
351   gst_element_add_pad(audiobin_, gst_ghost_pad_new("sink", pad));
352   gst_object_unref(pad);
353 
354   // Add a data probe on the src pad of the audioconvert element for our scope.
355   // We do it here because we want pre-equalized and pre-volume samples
356   // so that our visualization are not be affected by them.
357   pad = gst_element_get_static_pad(event_probe, "src");
358   gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM,
359                     &EventHandoffCallback, this, NULL);
360   gst_object_unref(pad);
361 
362   // Configure the fakesink properly
363   g_object_set(G_OBJECT(probe_sink), "sync", TRUE, nullptr);
364 
365   // Setting the equalizer bands:
366   //
367   // GStreamer's GstIirEqualizerNBands sets up shelve filters for the first and
368   // last bands as corner cases. That was causing the "inverted slider" bug.
369   // As a workaround, we create two dummy bands at both ends of the spectrum.
370   // This causes the actual first and last adjustable bands to be
371   // implemented using band-pass filters.
372 
373   g_object_set(G_OBJECT(equalizer_), "num-bands", 10 + 2, nullptr);
374 
375   // Dummy first band (bandwidth 0, cutting below 20Hz):
376   GstObject* first_band = GST_OBJECT(
377       gst_child_proxy_get_child_by_index(GST_CHILD_PROXY(equalizer_), 0));
378   g_object_set(G_OBJECT(first_band), "freq", 20.0, "bandwidth", 0, "gain", 0.0f,
379                nullptr);
380   g_object_unref(G_OBJECT(first_band));
381 
382   // Dummy last band (bandwidth 0, cutting over 20KHz):
383   GstObject* last_band = GST_OBJECT(gst_child_proxy_get_child_by_index(
384       GST_CHILD_PROXY(equalizer_), kEqBandCount + 1));
385   g_object_set(G_OBJECT(last_band), "freq", 20000.0, "bandwidth", 0, "gain",
386                0.0f, nullptr);
387   g_object_unref(G_OBJECT(last_band));
388 
389   int last_band_frequency = 0;
390   for (int i = 0; i < kEqBandCount; ++i) {
391     const int index_in_eq = i + 1;
392     GstObject* band = GST_OBJECT(gst_child_proxy_get_child_by_index(
393         GST_CHILD_PROXY(equalizer_), index_in_eq));
394 
395     const float frequency = kEqBandFrequencies[i];
396     const float bandwidth = frequency - last_band_frequency;
397     last_band_frequency = frequency;
398 
399     g_object_set(G_OBJECT(band), "freq", frequency, "bandwidth", bandwidth,
400                  "gain", 0.0f, nullptr);
401     g_object_unref(G_OBJECT(band));
402   }
403 
404   // Set the stereo balance.
405   g_object_set(G_OBJECT(stereo_panorama_), "panorama", stereo_balance_,
406                nullptr);
407 
408   // Set the buffer duration.  We set this on this queue instead of the
409   // decode bin (in ReplaceDecodeBin()) because setting it on the decode bin
410   // only affects network sources.
411   // Disable the default buffer and byte limits, so we only buffer based on
412   // time.
413   g_object_set(G_OBJECT(queue_), "max-size-buffers", 0, nullptr);
414   g_object_set(G_OBJECT(queue_), "max-size-bytes", 0, nullptr);
415   g_object_set(G_OBJECT(queue_), "max-size-time", buffer_duration_nanosec_,
416                nullptr);
417   g_object_set(G_OBJECT(queue_), "low-percent", buffer_min_fill_, nullptr);
418 
419   if (buffer_duration_nanosec_ > 0) {
420     g_object_set(G_OBJECT(queue_), "use-buffering", true, nullptr);
421   }
422 
423   g_object_set(G_OBJECT(probe_queue), "max-size-buffers", 0, nullptr);
424   g_object_set(G_OBJECT(probe_queue), "max-size-bytes", 0, nullptr);
425   g_object_set(G_OBJECT(probe_queue), "max-size-time", 0, nullptr);
426 
427   gst_element_link_many(queue_, audioconvert_, convert_sink, nullptr);
428   gst_element_link(probe_converter, probe_sink);
429 
430   // Link the outputs of tee to the queues on each path.
431   pad = gst_element_get_static_pad(probe_queue, "sink");
432   gst_pad_link(gst_element_get_request_pad(tee, "src_%u"), pad);
433   gst_object_unref(pad);
434 
435   pad = gst_element_get_static_pad(audio_queue, "sink");
436   gst_pad_link(gst_element_get_request_pad(tee, "src_%u"), pad);
437   gst_object_unref(pad);
438 
439   // Link replaygain elements if enabled.
440   if (rg_enabled_) {
441     gst_element_link_many(rgvolume_, rglimiter_, audioconvert2_, tee, nullptr);
442   }
443 
444   // Link the analyzer output of the tee and force 16 bit caps
445   GstCaps* caps16 = gst_caps_new_simple("audio/x-raw", "format", G_TYPE_STRING,
446                                         "S16LE", NULL);
447   gst_element_link_filtered(probe_queue, probe_converter, caps16);
448   gst_caps_unref(caps16);
449 
450   gst_element_link_many(audio_queue, equalizer_preamp_, equalizer_,
451                         stereo_panorama_, volume_, audioscale_, convert,
452                         nullptr);
453 
454   // We only limit the media type to raw audio.
455   // Let the audio output of the tee autonegotiate the bit depth and format.
456   GstCaps* caps = gst_caps_new_empty_simple("audio/x-raw");
457 
458   // Add caps for fixed sample rate and mono, but only if requested
459   if (sample_rate_ != GstEngine::kAutoSampleRate && sample_rate_ > 0) {
460     gst_caps_set_simple(caps, "rate", G_TYPE_INT, sample_rate_, nullptr);
461   }
462 
463   if (mono_playback_) {
464     gst_caps_set_simple(caps, "channels", G_TYPE_INT, 1, nullptr);
465   }
466 
467   gst_element_link_filtered(convert, audiosink_, caps);
468   gst_caps_unref(caps);
469 
470   // Add probes and handlers.
471   pad = gst_element_get_static_pad(probe_converter, "src");
472   gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, HandoffCallback, this,
473                     nullptr);
474   gst_object_unref(pad);
475   GstBus* bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline_));
476   gst_bus_set_sync_handler(bus, BusCallbackSync, this, nullptr);
477   bus_cb_id_ = gst_bus_add_watch(bus, BusCallback, this);
478   gst_object_unref(bus);
479 
480   MaybeLinkDecodeToAudio();
481 
482   return true;
483 }
484 
MaybeLinkDecodeToAudio()485 void GstEnginePipeline::MaybeLinkDecodeToAudio() {
486   if (!uridecodebin_ || !audiobin_) return;
487 
488   GstPad* pad = gst_element_get_static_pad(uridecodebin_, "src");
489   if (!pad) return;
490 
491   gst_object_unref(pad);
492   gst_element_link(uridecodebin_, audiobin_);
493 }
494 
InitFromString(const QString & pipeline)495 bool GstEnginePipeline::InitFromString(const QString& pipeline) {
496   pipeline_ = gst_pipeline_new("pipeline");
497 
498   GstElement* new_bin =
499       CreateDecodeBinFromString(pipeline.toLatin1().constData());
500   if (!new_bin) {
501     return false;
502   }
503 
504   if (!ReplaceDecodeBin(new_bin)) {
505     gst_object_unref(GST_OBJECT(new_bin));
506     return false;
507   }
508 
509   if (!Init()) return false;
510   return gst_element_link(new_bin, audiobin_);
511 }
512 
InitFromUrl(const QUrl & url,qint64 end_nanosec)513 bool GstEnginePipeline::InitFromUrl(const QUrl& url, qint64 end_nanosec) {
514   pipeline_ = gst_pipeline_new("pipeline");
515 
516   if (url.scheme() == "cdda" && !url.path().isEmpty()) {
517     // Currently, Gstreamer can't handle input CD devices inside cdda URL. So
518     // we handle them ourself: we extract the track number and re-create an
519     // URL with only cdda:// + the track number (which can be handled by
520     // Gstreamer). We keep the device in mind, and we will set it later using
521     // SourceSetupCallback
522     QStringList path = url.path().split('/');
523     url_ = QUrl(QString("cdda://%1").arg(path.takeLast()));
524     source_device_ = path.join("/");
525   } else {
526     url_ = url;
527   }
528   end_offset_nanosec_ = end_nanosec;
529 
530   // Decode bin
531   if (!ReplaceDecodeBin(url_)) return false;
532 
533   return Init();
534 }
535 
~GstEnginePipeline()536 GstEnginePipeline::~GstEnginePipeline() {
537   if (pipeline_) {
538     GstBus* bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline_));
539     gst_bus_set_sync_handler(bus, nullptr, nullptr, nullptr);
540     gst_object_unref(bus);
541 
542     g_source_remove(bus_cb_id_);
543     gst_element_set_state(pipeline_, GST_STATE_NULL);
544     gst_object_unref(GST_OBJECT(pipeline_));
545   }
546 }
547 
BusCallback(GstBus *,GstMessage * msg,gpointer self)548 gboolean GstEnginePipeline::BusCallback(GstBus*, GstMessage* msg,
549                                         gpointer self) {
550   GstEnginePipeline* instance = reinterpret_cast<GstEnginePipeline*>(self);
551 
552   qLog(Debug) << instance->id() << "bus message" << GST_MESSAGE_TYPE_NAME(msg);
553 
554   switch (GST_MESSAGE_TYPE(msg)) {
555     case GST_MESSAGE_ERROR:
556       instance->ErrorMessageReceived(msg);
557       break;
558 
559     case GST_MESSAGE_TAG:
560       instance->TagMessageReceived(msg);
561       break;
562 
563     case GST_MESSAGE_STATE_CHANGED:
564       instance->StateChangedMessageReceived(msg);
565       break;
566 
567     default:
568       break;
569   }
570 
571   return FALSE;
572 }
573 
BusCallbackSync(GstBus *,GstMessage * msg,gpointer self)574 GstBusSyncReply GstEnginePipeline::BusCallbackSync(GstBus*, GstMessage* msg,
575                                                    gpointer self) {
576   GstEnginePipeline* instance = reinterpret_cast<GstEnginePipeline*>(self);
577 
578   qLog(Debug) << instance->id() << "sync bus message"
579               << GST_MESSAGE_TYPE_NAME(msg);
580 
581   switch (GST_MESSAGE_TYPE(msg)) {
582     case GST_MESSAGE_EOS:
583       emit instance->EndOfStreamReached(instance->id(), false);
584       break;
585 
586     case GST_MESSAGE_TAG:
587       instance->TagMessageReceived(msg);
588       break;
589 
590     case GST_MESSAGE_ERROR:
591       instance->ErrorMessageReceived(msg);
592       break;
593 
594     case GST_MESSAGE_ELEMENT:
595       instance->ElementMessageReceived(msg);
596       break;
597 
598     case GST_MESSAGE_STATE_CHANGED:
599       instance->StateChangedMessageReceived(msg);
600       break;
601 
602     case GST_MESSAGE_BUFFERING:
603       instance->BufferingMessageReceived(msg);
604       break;
605 
606     case GST_MESSAGE_STREAM_STATUS:
607       instance->StreamStatusMessageReceived(msg);
608       break;
609 
610     case GST_MESSAGE_STREAM_START:
611       if (instance->emit_track_ended_on_stream_start_) {
612         qLog(Debug) << "New segment started, EOS will signal on next buffer "
613                        "discontinuity";
614         instance->emit_track_ended_on_stream_start_ = false;
615         instance->emit_track_ended_on_time_discontinuity_ = true;
616       }
617       break;
618 
619     default:
620       break;
621   }
622 
623   return GST_BUS_PASS;
624 }
625 
StreamStatusMessageReceived(GstMessage * msg)626 void GstEnginePipeline::StreamStatusMessageReceived(GstMessage* msg) {
627   GstStreamStatusType type;
628   GstElement* owner;
629   gst_message_parse_stream_status(msg, &type, &owner);
630 
631   if (type == GST_STREAM_STATUS_TYPE_CREATE) {
632     const GValue* val = gst_message_get_stream_status_object(msg);
633     if (G_VALUE_TYPE(val) == GST_TYPE_TASK) {
634       GstTask* task = static_cast<GstTask*>(g_value_get_object(val));
635       gst_task_set_enter_callback(task, &TaskEnterCallback, this, NULL);
636     }
637   }
638 }
639 
TaskEnterCallback(GstTask *,GThread *,gpointer)640 void GstEnginePipeline::TaskEnterCallback(GstTask*, GThread*, gpointer) {
641 // Bump the priority of the thread only on OS X
642 
643 #ifdef Q_OS_DARWIN
644   sched_param param;
645   memset(&param, 0, sizeof(param));
646 
647   param.sched_priority = 99;
648   pthread_setschedparam(pthread_self(), SCHED_RR, &param);
649 #endif
650 }
651 
ElementMessageReceived(GstMessage * msg)652 void GstEnginePipeline::ElementMessageReceived(GstMessage* msg) {
653   const GstStructure* structure = gst_message_get_structure(msg);
654 
655   if (gst_structure_has_name(structure, "redirect")) {
656     const char* uri = gst_structure_get_string(structure, "new-location");
657 
658     // Set the redirect URL.  In mmssrc redirect messages come during the
659     // initial state change to PLAYING, so callers can pick up this URL after
660     // the state change has failed.
661     redirect_url_ = QUrl::fromEncoded(uri);
662   }
663 }
664 
ErrorMessageReceived(GstMessage * msg)665 void GstEnginePipeline::ErrorMessageReceived(GstMessage* msg) {
666   GError* error;
667   gchar* debugs;
668 
669   gst_message_parse_error(msg, &error, &debugs);
670   QString message = QString::fromLocal8Bit(error->message);
671   QString debugstr = QString::fromLocal8Bit(debugs);
672   int domain = error->domain;
673   int code = error->code;
674   g_error_free(error);
675   free(debugs);
676 
677   if (!redirect_url_.isEmpty() &&
678       debugstr.contains(
679           "A redirect message was posted on the bus and should have been "
680           "handled by the application.")) {
681     // mmssrc posts a message on the bus *and* makes an error message when it
682     // wants to do a redirect.  We handle the message, but now we have to
683     // ignore the error too.
684     return;
685   }
686 
687   qLog(Error) << id() << debugstr;
688 
689   emit Error(id(), message, domain, code);
690 }
691 
692 namespace {
693 
694 /*
695  * Streams served by Akamai tend to have a weird tag format embedded.
696  *
697  * Example:
698  * All Things Dance - text="Evolution" song_spot="T" MediaBaseId="0"
699  * itunesTrackId="0" amgTrackId="0" amgArtistId="0" TAID="0" TPID="0"
700  * cartcutId="0"
701  */
ParseAkamaiTag(const QString & tag)702 QPair<QString, QString> ParseAkamaiTag(const QString& tag) {
703   QRegExp re("(.*) - text=\"([^\"]+)");
704   re.indexIn(tag);
705   if (re.capturedTexts().length() >= 3) {
706     return qMakePair(re.cap(1), re.cap(2));
707   }
708   return qMakePair(tag, QString());
709 }
710 
IsAkamaiTag(const QString & tag)711 bool IsAkamaiTag(const QString& tag) { return tag.contains("- text=\""); }
712 }
713 
TagMessageReceived(GstMessage * msg)714 void GstEnginePipeline::TagMessageReceived(GstMessage* msg) {
715   GstTagList* taglist = nullptr;
716   gst_message_parse_tag(msg, &taglist);
717 
718   Engine::SimpleMetaBundle bundle;
719   bundle.title = ParseTag(taglist, GST_TAG_TITLE);
720   if (IsAkamaiTag(bundle.title)) {
721     QPair<QString, QString> artistTitlePair = ParseAkamaiTag(bundle.title);
722     bundle.artist = artistTitlePair.first;
723     bundle.title = artistTitlePair.second;
724   } else {
725     bundle.artist = ParseTag(taglist, GST_TAG_ARTIST);
726     bundle.comment = ParseTag(taglist, GST_TAG_COMMENT);
727     bundle.album = ParseTag(taglist, GST_TAG_ALBUM);
728   }
729 
730   gst_tag_list_free(taglist);
731 
732   if (ignore_tags_) return;
733 
734   if (!bundle.title.isEmpty() || !bundle.artist.isEmpty() ||
735       !bundle.comment.isEmpty() || !bundle.album.isEmpty())
736     emit MetadataFound(id(), bundle);
737 }
738 
ParseTag(GstTagList * list,const char * tag) const739 QString GstEnginePipeline::ParseTag(GstTagList* list, const char* tag) const {
740   gchar* data = nullptr;
741   bool success = gst_tag_list_get_string(list, tag, &data);
742 
743   QString ret;
744   if (success && data) {
745     ret = QString::fromUtf8(data);
746     g_free(data);
747   }
748   return ret.trimmed();
749 }
750 
StateChangedMessageReceived(GstMessage * msg)751 void GstEnginePipeline::StateChangedMessageReceived(GstMessage* msg) {
752   if (msg->src != GST_OBJECT(pipeline_)) {
753     // We only care about state changes of the whole pipeline.
754     return;
755   }
756 
757   GstState old_state, new_state, pending;
758   gst_message_parse_state_changed(msg, &old_state, &new_state, &pending);
759 
760   if (!pipeline_is_initialised_ &&
761       (new_state == GST_STATE_PAUSED || new_state == GST_STATE_PLAYING)) {
762     pipeline_is_initialised_ = true;
763     if (pending_seek_nanosec_ != -1 && pipeline_is_connected_) {
764       QMetaObject::invokeMethod(this, "Seek", Qt::QueuedConnection,
765                                 Q_ARG(qint64, pending_seek_nanosec_));
766     }
767   }
768 
769   if (pipeline_is_initialised_ && new_state != GST_STATE_PAUSED &&
770       new_state != GST_STATE_PLAYING) {
771     pipeline_is_initialised_ = false;
772   }
773 }
774 
BufferingMessageReceived(GstMessage * msg)775 void GstEnginePipeline::BufferingMessageReceived(GstMessage* msg) {
776   // Only handle buffering messages from the queue2 element in audiobin - not
777   // the one that's created automatically by uridecodebin.
778   if (GST_ELEMENT(GST_MESSAGE_SRC(msg)) != queue_) {
779     return;
780   }
781 
782   // If we are loading new next track, we don't have to pause the playback.
783   // The buffering is for the next track and not the current one.
784   if (emit_track_ended_on_stream_start_) {
785     qLog(Debug) << "Buffering next track";
786     return;
787   }
788 
789   int percent = 0;
790   gst_message_parse_buffering(msg, &percent);
791 
792   const GstState current_state = state();
793 
794   if (percent == 0 && current_state == GST_STATE_PLAYING && !buffering_) {
795     buffering_ = true;
796     emit BufferingStarted();
797 
798     SetState(GST_STATE_PAUSED);
799   } else if (percent == 100 && buffering_) {
800     buffering_ = false;
801     emit BufferingFinished();
802 
803     SetState(GST_STATE_PLAYING);
804   } else if (buffering_) {
805     emit BufferingProgress(percent);
806   }
807 }
808 
NewPadCallback(GstElement *,GstPad * pad,gpointer self)809 void GstEnginePipeline::NewPadCallback(GstElement*, GstPad* pad,
810                                        gpointer self) {
811   GstEnginePipeline* instance = reinterpret_cast<GstEnginePipeline*>(self);
812   GstPad* const audiopad =
813       gst_element_get_static_pad(instance->audiobin_, "sink");
814 
815   // Link decodebin's sink pad to audiobin's src pad.
816   if (GST_PAD_IS_LINKED(audiopad)) {
817     qLog(Warning) << instance->id()
818                   << "audiopad is already linked, unlinking old pad";
819     gst_pad_unlink(audiopad, GST_PAD_PEER(audiopad));
820   }
821 
822   gst_pad_link(pad, audiopad);
823   gst_object_unref(audiopad);
824 
825   // Offset the timestamps on all the buffers coming out of the decodebin so
826   // they line up exactly with the end of the last buffer from the old
827   // decodebin.
828   // "Running time" is the time since the last flushing seek.
829   GstClockTime running_time = gst_segment_to_running_time(
830       &instance->last_decodebin_segment_, GST_FORMAT_TIME,
831       instance->last_decodebin_segment_.position);
832   gst_pad_set_offset(pad, running_time);
833 
834   // Add a probe to the pad so we can update last_decodebin_segment_.
835   gst_pad_add_probe(
836       pad, static_cast<GstPadProbeType>(GST_PAD_PROBE_TYPE_BUFFER |
837                                         GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM |
838                                         GST_PAD_PROBE_TYPE_EVENT_FLUSH),
839       DecodebinProbe, instance, nullptr);
840 
841   instance->pipeline_is_connected_ = true;
842   if (instance->pending_seek_nanosec_ != -1 &&
843       instance->pipeline_is_initialised_) {
844     QMetaObject::invokeMethod(instance, "Seek", Qt::QueuedConnection,
845                               Q_ARG(qint64, instance->pending_seek_nanosec_));
846   }
847 }
848 
DecodebinProbe(GstPad * pad,GstPadProbeInfo * info,gpointer data)849 GstPadProbeReturn GstEnginePipeline::DecodebinProbe(GstPad* pad,
850                                                     GstPadProbeInfo* info,
851                                                     gpointer data) {
852   GstEnginePipeline* instance = reinterpret_cast<GstEnginePipeline*>(data);
853   const GstPadProbeType info_type = GST_PAD_PROBE_INFO_TYPE(info);
854 
855   if (info_type & GST_PAD_PROBE_TYPE_BUFFER) {
856     // The decodebin produced a buffer.  Record its end time, so we can offset
857     // the buffers produced by the next decodebin when transitioning to the next
858     // song.
859     GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
860 
861     GstClockTime timestamp = GST_BUFFER_TIMESTAMP(buffer);
862     GstClockTime duration = GST_BUFFER_DURATION(buffer);
863     if (timestamp == GST_CLOCK_TIME_NONE) {
864       timestamp = instance->last_decodebin_segment_.position;
865     }
866 
867     if (duration != GST_CLOCK_TIME_NONE) {
868       timestamp += duration;
869     }
870 
871     instance->last_decodebin_segment_.position = timestamp;
872   } else if (info_type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
873     GstEvent* event = GST_PAD_PROBE_INFO_EVENT(info);
874     GstEventType event_type = GST_EVENT_TYPE(event);
875 
876     if (event_type == GST_EVENT_SEGMENT) {
877       // A new segment started, we need to save this to calculate running time
878       // offsets later.
879       gst_event_copy_segment(event, &instance->last_decodebin_segment_);
880     } else if (event_type == GST_EVENT_FLUSH_START) {
881       // A flushing seek resets the running time to 0, so remove any offset
882       // we set on this pad before.
883       gst_pad_set_offset(pad, 0);
884     }
885   }
886 
887   return GST_PAD_PROBE_OK;
888 }
889 
HandoffCallback(GstPad *,GstPadProbeInfo * info,gpointer self)890 GstPadProbeReturn GstEnginePipeline::HandoffCallback(GstPad*,
891                                                      GstPadProbeInfo* info,
892                                                      gpointer self) {
893   GstEnginePipeline* instance = reinterpret_cast<GstEnginePipeline*>(self);
894   GstBuffer* buf = gst_pad_probe_info_get_buffer(info);
895 
896   QList<BufferConsumer*> consumers;
897   {
898     QMutexLocker l(&instance->buffer_consumers_mutex_);
899     consumers = instance->buffer_consumers_;
900   }
901 
902   for (BufferConsumer* consumer : consumers) {
903     gst_buffer_ref(buf);
904     consumer->ConsumeBuffer(buf, instance->id());
905   }
906 
907   // Calculate the end time of this buffer so we can stop playback if it's
908   // after the end time of this song.
909   if (instance->end_offset_nanosec_ > 0) {
910     quint64 start_time = GST_BUFFER_TIMESTAMP(buf) - instance->segment_start_;
911     quint64 duration = GST_BUFFER_DURATION(buf);
912     quint64 end_time = start_time + duration;
913 
914     if (end_time > instance->end_offset_nanosec_) {
915       if (instance->has_next_valid_url()) {
916         if (instance->next_url_ == instance->url_ &&
917             instance->next_beginning_offset_nanosec_ ==
918                 instance->end_offset_nanosec_) {
919           // The "next" song is actually the next segment of this file - so
920           // cheat and keep on playing, but just tell the Engine we've moved on.
921           instance->end_offset_nanosec_ = instance->next_end_offset_nanosec_;
922           instance->next_url_ = QUrl();
923           instance->next_beginning_offset_nanosec_ = 0;
924           instance->next_end_offset_nanosec_ = 0;
925 
926           // GstEngine will try to seek to the start of the new section, but
927           // we're already there so ignore it.
928           instance->ignore_next_seek_ = true;
929           emit instance->EndOfStreamReached(instance->id(), true);
930         } else {
931           // We have a next song but we can't cheat, so move to it normally.
932           instance->TransitionToNext();
933         }
934       } else {
935         // There's no next song
936         emit instance->EndOfStreamReached(instance->id(), false);
937       }
938     }
939   }
940 
941   if (instance->emit_track_ended_on_time_discontinuity_) {
942     if (GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_DISCONT) ||
943         GST_BUFFER_OFFSET(buf) < instance->last_buffer_offset_ ||
944         !GST_BUFFER_OFFSET_IS_VALID(buf)) {
945       qLog(Debug) << "Buffer discontinuity - emitting EOS";
946       instance->emit_track_ended_on_time_discontinuity_ = false;
947       emit instance->EndOfStreamReached(instance->id(), true);
948     }
949   }
950 
951   instance->last_buffer_offset_ = GST_BUFFER_OFFSET(buf);
952 
953   return GST_PAD_PROBE_OK;
954 }
955 
EventHandoffCallback(GstPad *,GstPadProbeInfo * info,gpointer self)956 GstPadProbeReturn GstEnginePipeline::EventHandoffCallback(GstPad*,
957                                                           GstPadProbeInfo* info,
958                                                           gpointer self) {
959   GstEnginePipeline* instance = reinterpret_cast<GstEnginePipeline*>(self);
960   GstEvent* e = gst_pad_probe_info_get_event(info);
961 
962   qLog(Debug) << instance->id() << "event" << GST_EVENT_TYPE_NAME(e);
963 
964   switch (GST_EVENT_TYPE(e)) {
965     case GST_EVENT_SEGMENT:
966       if (!instance->segment_start_received_) {
967         // The segment start time is used to calculate the proper offset of data
968         // buffers from the start of the stream
969         const GstSegment* segment = nullptr;
970         gst_event_parse_segment(e, &segment);
971         instance->segment_start_ = segment->start;
972         instance->segment_start_received_ = true;
973       }
974       break;
975 
976     default:
977       break;
978   }
979 
980   return GST_PAD_PROBE_OK;
981 }
982 
SourceDrainedCallback(GstURIDecodeBin * bin,gpointer self)983 void GstEnginePipeline::SourceDrainedCallback(GstURIDecodeBin* bin,
984                                               gpointer self) {
985   GstEnginePipeline* instance = reinterpret_cast<GstEnginePipeline*>(self);
986 
987   if (instance->has_next_valid_url() &&
988       // I'm not sure why, but calling this when previous track is a local song
989       // and the next track is a Spotify song is buggy: the Spotify song will
990       // not start or with some offset. So just do nothing here: when the song
991       // finished, EndOfStreamReached/TrackEnded will be emitted anyway so
992       // NextItem will be called.
993       !(instance->url_.scheme() != "spotify" &&
994         instance->next_url_.scheme() == "spotify")) {
995     instance->TransitionToNext();
996   }
997 }
998 
SourceSetupCallback(GstURIDecodeBin * bin,GParamSpec * pspec,gpointer self)999 void GstEnginePipeline::SourceSetupCallback(GstURIDecodeBin* bin,
1000                                             GParamSpec* pspec, gpointer self) {
1001   GstEnginePipeline* instance = reinterpret_cast<GstEnginePipeline*>(self);
1002   GstElement* element;
1003   g_object_get(bin, "source", &element, nullptr);
1004   if (!element) {
1005     return;
1006   }
1007 
1008   if (g_object_class_find_property(G_OBJECT_GET_CLASS(element), "device") &&
1009       !instance->source_device().isEmpty()) {
1010     // Gstreamer is not able to handle device in URL (referring to Gstreamer
1011     // documentation, this might be added in the future). Despite that, for now
1012     // we include device inside URL: we decompose it during Init and set device
1013     // here, when this callback is called.
1014     g_object_set(element, "device",
1015                  instance->source_device().toLocal8Bit().constData(), nullptr);
1016   }
1017 
1018   if (g_object_class_find_property(G_OBJECT_GET_CLASS(element), "user-agent")) {
1019     QString user_agent =
1020         QString("%1 %2").arg(QCoreApplication::applicationName(),
1021                              QCoreApplication::applicationVersion());
1022     g_object_set(element, "user-agent", user_agent.toUtf8().constData(),
1023                  nullptr);
1024 
1025 #ifdef Q_OS_DARWIN
1026     g_object_set(element, "tls-database", instance->engine_->tls_database(),
1027                  nullptr);
1028     g_object_set(element, "ssl-use-system-ca-file", false, nullptr);
1029     g_object_set(element, "ssl-strict", TRUE, nullptr);
1030 #endif
1031   }
1032   g_object_unref(element);
1033 }
1034 
TransitionToNext()1035 void GstEnginePipeline::TransitionToNext() {
1036   GstElement* old_decode_bin = uridecodebin_;
1037 
1038   ignore_tags_ = true;
1039 
1040   if (!ReplaceDecodeBin(next_url_)) {
1041     qLog(Error) << "ReplaceDecodeBin failed with " << next_url_;
1042     return;
1043   }
1044   gst_element_set_state(uridecodebin_, GST_STATE_PLAYING);
1045   MaybeLinkDecodeToAudio();
1046 
1047   url_ = next_url_;
1048   end_offset_nanosec_ = next_end_offset_nanosec_;
1049   next_url_ = QUrl();
1050   next_beginning_offset_nanosec_ = 0;
1051   next_end_offset_nanosec_ = 0;
1052 
1053   // This function gets called when the source has been drained, even if the
1054   // song hasn't finished playing yet.  We'll get a new stream when it really
1055   // does finish, so emit TrackEnded then.
1056   emit_track_ended_on_stream_start_ = true;
1057 
1058   // This has to happen *after* the gst_element_set_state on the new bin to
1059   // fix an occasional race condition deadlock.
1060   sElementDeleter->DeleteElementLater(old_decode_bin);
1061 
1062   ignore_tags_ = false;
1063 }
1064 
position() const1065 qint64 GstEnginePipeline::position() const {
1066   if (pipeline_is_initialised_)
1067     gst_element_query_position(pipeline_, GST_FORMAT_TIME,
1068                                &last_known_position_ns_);
1069 
1070   return last_known_position_ns_;
1071 }
1072 
length() const1073 qint64 GstEnginePipeline::length() const {
1074   gint64 value = 0;
1075   gst_element_query_duration(pipeline_, GST_FORMAT_TIME, &value);
1076 
1077   return value;
1078 }
1079 
state() const1080 GstState GstEnginePipeline::state() const {
1081   GstState s, sp;
1082   if (gst_element_get_state(pipeline_, &s, &sp, kGstStateTimeoutNanosecs) ==
1083       GST_STATE_CHANGE_FAILURE)
1084     return GST_STATE_NULL;
1085 
1086   return s;
1087 }
1088 
SetState(GstState state)1089 QFuture<GstStateChangeReturn> GstEnginePipeline::SetState(GstState state) {
1090 #ifdef HAVE_SPOTIFY
1091   if (url_.scheme() == "spotify" && !buffering_) {
1092     const GstState current_state = this->state();
1093 
1094     if (state == GST_STATE_PAUSED && current_state == GST_STATE_PLAYING) {
1095       SpotifyService* spotify = InternetModel::Service<SpotifyService>();
1096 
1097       // Need to schedule this in the spotify service's thread
1098       QMetaObject::invokeMethod(spotify, "SetPaused", Qt::QueuedConnection,
1099                                 Q_ARG(bool, true));
1100     } else if (state == GST_STATE_PLAYING &&
1101                current_state == GST_STATE_PAUSED) {
1102       SpotifyService* spotify = InternetModel::Service<SpotifyService>();
1103 
1104       // Need to schedule this in the spotify service's thread
1105       QMetaObject::invokeMethod(spotify, "SetPaused", Qt::QueuedConnection,
1106                                 Q_ARG(bool, false));
1107     }
1108   }
1109 #endif
1110   return ConcurrentRun::Run<GstStateChangeReturn, GstElement*, GstState>(
1111       &set_state_threadpool_, &gst_element_set_state, pipeline_, state);
1112 }
1113 
Seek(qint64 nanosec)1114 bool GstEnginePipeline::Seek(qint64 nanosec) {
1115   if (ignore_next_seek_) {
1116     ignore_next_seek_ = false;
1117     return true;
1118   }
1119 
1120   if (!pipeline_is_connected_ || !pipeline_is_initialised_) {
1121     pending_seek_nanosec_ = nanosec;
1122     return true;
1123   }
1124 
1125   pending_seek_nanosec_ = -1;
1126   last_known_position_ns_ = nanosec;
1127   return gst_element_seek_simple(pipeline_, GST_FORMAT_TIME,
1128                                  GST_SEEK_FLAG_FLUSH, nanosec);
1129 }
1130 
SetEqualizerEnabled(bool enabled)1131 void GstEnginePipeline::SetEqualizerEnabled(bool enabled) {
1132   eq_enabled_ = enabled;
1133   UpdateEqualizer();
1134 }
1135 
SetEqualizerParams(int preamp,const QList<int> & band_gains)1136 void GstEnginePipeline::SetEqualizerParams(int preamp,
1137                                            const QList<int>& band_gains) {
1138   eq_preamp_ = preamp;
1139   eq_band_gains_ = band_gains;
1140   UpdateEqualizer();
1141 }
1142 
SetStereoBalance(float value)1143 void GstEnginePipeline::SetStereoBalance(float value) {
1144   stereo_balance_ = value;
1145   UpdateStereoBalance();
1146 }
1147 
UpdateEqualizer()1148 void GstEnginePipeline::UpdateEqualizer() {
1149   // Update band gains
1150   for (int i = 0; i < kEqBandCount; ++i) {
1151     float gain = eq_enabled_ ? eq_band_gains_[i] : 0.0;
1152     if (gain < 0)
1153       gain *= 0.24;
1154     else
1155       gain *= 0.12;
1156 
1157     const int index_in_eq = i + 1;
1158     // Offset because of the first dummy band we created.
1159     GstObject* band = GST_OBJECT(gst_child_proxy_get_child_by_index(
1160         GST_CHILD_PROXY(equalizer_), index_in_eq));
1161     g_object_set(G_OBJECT(band), "gain", gain, nullptr);
1162     g_object_unref(G_OBJECT(band));
1163   }
1164 
1165   // Update preamp
1166   float preamp = 1.0;
1167   if (eq_enabled_)
1168     preamp = float(eq_preamp_ + 100) * 0.01;  // To scale from 0.0 to 2.0
1169 
1170   g_object_set(G_OBJECT(equalizer_preamp_), "volume", preamp, nullptr);
1171 }
1172 
UpdateStereoBalance()1173 void GstEnginePipeline::UpdateStereoBalance() {
1174   if (stereo_panorama_) {
1175     g_object_set(G_OBJECT(stereo_panorama_), "panorama", stereo_balance_,
1176                  nullptr);
1177   }
1178 }
1179 
SetVolume(int percent)1180 void GstEnginePipeline::SetVolume(int percent) {
1181   volume_percent_ = percent;
1182   UpdateVolume();
1183 }
1184 
SetVolumeModifier(qreal mod)1185 void GstEnginePipeline::SetVolumeModifier(qreal mod) {
1186   volume_modifier_ = mod;
1187   UpdateVolume();
1188 }
1189 
UpdateVolume()1190 void GstEnginePipeline::UpdateVolume() {
1191   float vol = double(volume_percent_) * 0.01 * volume_modifier_;
1192   g_object_set(G_OBJECT(volume_), "volume", vol, nullptr);
1193 }
1194 
StartFader(qint64 duration_nanosec,QTimeLine::Direction direction,QTimeLine::CurveShape shape,bool use_fudge_timer)1195 void GstEnginePipeline::StartFader(qint64 duration_nanosec,
1196                                    QTimeLine::Direction direction,
1197                                    QTimeLine::CurveShape shape,
1198                                    bool use_fudge_timer) {
1199   const int duration_msec = duration_nanosec / kNsecPerMsec;
1200 
1201   // If there's already another fader running then start from the same time
1202   // that one was already at.
1203   int start_time = direction == QTimeLine::Forward ? 0 : duration_msec;
1204   if (fader_ && fader_->state() == QTimeLine::Running) {
1205     if (duration_msec == fader_->duration()) {
1206       start_time = fader_->currentTime();
1207     } else {
1208       // Calculate the position in the new fader with the same value from
1209       // the old fader, so no volume jumps appear
1210       qreal time = qreal(duration_msec) *
1211                    (qreal(fader_->currentTime()) / qreal(fader_->duration()));
1212       start_time = qRound(time);
1213     }
1214   }
1215 
1216   fader_.reset(new QTimeLine(duration_msec, this));
1217   connect(fader_.get(), SIGNAL(valueChanged(qreal)),
1218           SLOT(SetVolumeModifier(qreal)));
1219   connect(fader_.get(), SIGNAL(finished()), SLOT(FaderTimelineFinished()));
1220   fader_->setDirection(direction);
1221   fader_->setCurveShape(shape);
1222   fader_->setCurrentTime(start_time);
1223   fader_->resume();
1224 
1225   fader_fudge_timer_.stop();
1226   use_fudge_timer_ = use_fudge_timer;
1227 
1228   SetVolumeModifier(fader_->currentValue());
1229 }
1230 
FaderTimelineFinished()1231 void GstEnginePipeline::FaderTimelineFinished() {
1232   fader_.reset();
1233 
1234   // Wait a little while longer before emitting the finished signal (and
1235   // probably destroying the pipeline) to account for delays in the audio
1236   // server/driver.
1237   if (use_fudge_timer_) {
1238     fader_fudge_timer_.start(kFaderFudgeMsec, this);
1239   } else {
1240     // Even here we cannot emit the signal directly, as it result in a
1241     // stutter when resuming playback. So use a quest small time, so you
1242     // won't notice the difference when resuming playback
1243     // (You get here when the pause fading is active)
1244     fader_fudge_timer_.start(250, this);
1245   }
1246 }
1247 
timerEvent(QTimerEvent * e)1248 void GstEnginePipeline::timerEvent(QTimerEvent* e) {
1249   if (e->timerId() == fader_fudge_timer_.timerId()) {
1250     fader_fudge_timer_.stop();
1251     emit FaderFinished();
1252     return;
1253   }
1254 
1255   QObject::timerEvent(e);
1256 }
1257 
AddBufferConsumer(BufferConsumer * consumer)1258 void GstEnginePipeline::AddBufferConsumer(BufferConsumer* consumer) {
1259   QMutexLocker l(&buffer_consumers_mutex_);
1260   buffer_consumers_ << consumer;
1261 }
1262 
RemoveBufferConsumer(BufferConsumer * consumer)1263 void GstEnginePipeline::RemoveBufferConsumer(BufferConsumer* consumer) {
1264   QMutexLocker l(&buffer_consumers_mutex_);
1265   buffer_consumers_.removeAll(consumer);
1266 }
1267 
RemoveAllBufferConsumers()1268 void GstEnginePipeline::RemoveAllBufferConsumers() {
1269   QMutexLocker l(&buffer_consumers_mutex_);
1270   buffer_consumers_.clear();
1271 }
1272 
SetNextUrl(const QUrl & url,qint64 beginning_nanosec,qint64 end_nanosec)1273 void GstEnginePipeline::SetNextUrl(const QUrl& url, qint64 beginning_nanosec,
1274                                    qint64 end_nanosec) {
1275   next_url_ = url;
1276   next_beginning_offset_nanosec_ = beginning_nanosec;
1277   next_end_offset_nanosec_ = end_nanosec;
1278 }
1279