1 /* EngineCallbacks.cpp */
2 
3 /* Copyright (C) 2011-2020 Michael Lugmair (Lucio Carreras)
4  *
5  * This file is part of sayonara player
6  *
7  * This program is free software: you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation, either version 3 of the License, or
10  * (at your option) any later version.
11 
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16 
17  * You should have received a copy of the GNU General Public License
18  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
19  */
20 
21 #include "Callbacks.h"
22 #include "Components/Engine/EngineUtils.h"
23 #include "Components/Engine/Engine.h"
24 #include "Components/Engine/Pipeline.h"
25 
26 #include "Utils/Utils.h"
27 #include "Utils/WebAccess/Proxy.h"
28 #include "Utils/Settings/Settings.h"
29 #include "Utils/MetaData/MetaData.h"
30 #include "Utils/Logger/Logger.h"
31 #include "Utils/globals.h"
32 
33 #include <QList>
34 #include <QRegExp>
35 #include <QByteArray>
36 
37 #include <memory>
38 #include <algorithm>
39 #include <vector>
40 
41 #include <gst/gst.h>
42 
43 namespace EngineUtils=::Engine::Utils;
44 namespace Callbacks=::Engine::Callbacks;
45 using ::Engine::Engine;
46 namespace EngineNS=Engine;
47 
48 const char* ClassEngineCallbacks = "Engine Callbacks";
49 
#ifdef Q_OS_WIN
	// NOTE(review): this section still uses the EngineCallbacks:: qualifier and
	// bus_state_changed(), while the rest of the file uses Callbacks:: and
	// busStateChanged() - confirm that it still builds when Q_OS_WIN is defined.

	// Intentionally empty destroy notification.
	void EngineCallbacks::destroy_notify(gpointer data) {}

	// Synchronous bus handler: forwards the message to bus_state_changed() and
	// then always releases and drops it, whatever the handler returned.
	GstBusSyncReply
	EngineCallbacks::bus_message_received(GstBus* bus, GstMessage* msg, gpointer data)
	{
		static_cast<void>(bus_state_changed(bus, msg, data));

		gst_message_unref(msg);
		return GST_BUS_DROP;
	}
#endif
64 
parse_image(GstElement * src,GstTagList * tags,EngineNS::Engine * engine)65 static bool parse_image(GstElement* src, GstTagList* tags, EngineNS::Engine* engine)
66 {
67 	GstSample* sample;
68 	bool success = gst_tag_list_get_sample(tags, GST_TAG_IMAGE, &sample);
69 
70 	if(!success)
71 	{
72 		success = gst_tag_list_get_sample(tags, GST_TAG_PREVIEW_IMAGE, &sample);
73 		if(!success){
74 			return false;
75 		}
76 	}
77 
78 	GstCaps* caps = gst_sample_get_caps(sample);
79 	if(!caps){
80 		gst_sample_unref(sample);
81 		return false;
82 	}
83 
84 	EngineUtils::GStringAutoFree mimetype(gst_caps_to_string(caps));
85 	if(mimetype.data() == nullptr){
86 		gst_sample_unref(sample);
87 		return false;
88 	}
89 
90 	QString mime;
91 	QString fullMime(mimetype.data());
92 
93 	QRegExp re(".*(image/[a-z|A-Z]+).*");
94 	if(re.indexIn(fullMime) >= 0){
95 		mime = re.cap(1);
96 	}
97 
98 	spLog(Log::Develop, "Engine Callbacks") << "Cover in Track: " << fullMime;
99 
100 	GstBuffer* buffer = gst_sample_get_buffer(sample);
101 	if(!buffer){
102 		gst_sample_unref(sample);
103 		return false;
104 	}
105 
106 	gsize size = gst_buffer_get_size(buffer);
107 	if(size == 0){
108 		gst_sample_unref(sample);
109 		return false;
110 	}
111 
112 	gchar* data = new gchar[size];
113 	size = gst_buffer_extract(buffer, 0, data, size);
114 
115 	if(size == 0) {
116 		delete[] data;
117 		gst_sample_unref(sample);
118 		return false;
119 	}
120 
121 	QByteArray arr(data, size);
122 	engine->updateCover(src, arr, mime);
123 
124 	delete[] data;
125 	gst_sample_unref(sample);
126 
127 	return (size > 0);
128 }
129 
130 
131 // check messages from bus
busStateChanged(GstBus * bus,GstMessage * msg,gpointer data)132 gboolean Callbacks::busStateChanged(GstBus* bus, GstMessage* msg, gpointer data)
133 {
134 	static QStringList string_tags
135 	{
136 		GST_TAG_TITLE,
137 		GST_TAG_ARTIST,
138 		GST_TAG_ALBUM,
139 		GST_TAG_ALBUM_ARTIST,
140 		GST_TAG_COMMENT,
141 
142 		GST_TAG_PERFORMER,
143 		GST_TAG_HOMEPAGE,
144 		GST_TAG_DESCRIPTION,
145 		GST_TAG_ORGANIZATION,
146 		GST_TAG_CONTACT,
147 		GST_TAG_SHOW_NAME,
148 		GST_TAG_PUBLISHER
149 	};
150 
151 	Q_UNUSED(bus);
152 
153 	auto* engine = static_cast<Engine*>(data);
154 	if(!engine){
155 		return true;
156 	}
157 
158 	GstMessageType msg_type = GST_MESSAGE_TYPE(msg);
159 	QString	msg_src_name = QString(GST_MESSAGE_SRC_NAME(msg)).toLower();
160 	GstElement*	src = reinterpret_cast<GstElement*>(msg->src);
161 
162 	switch (msg_type)
163 	{
164 		case GST_MESSAGE_EOS:
165 
166 			if(  !msg_src_name.contains("sr_filesink") &&
167 				 !msg_src_name.contains("level_sink") &&
168 				 !msg_src_name.contains("spectrum_sink") &&
169 				 !msg_src_name.contains("pipeline"))
170 			{
171 				spLog(Log::Debug, ClassEngineCallbacks) << "EOF reached: " << msg_src_name;
172 				break;
173 			}
174 
175 			engine->setTrackFinished(src);
176 
177 			break;
178 
179 		case GST_MESSAGE_ELEMENT:
180 			if(msg_src_name.compare("spectrum") == 0){
181 				return spectrumHandler(bus, msg, engine);
182 			}
183 
184 			if(msg_src_name.compare("level") == 0){
185 				return levelHandler(bus, msg, engine);
186 			}
187 
188 			break;
189 
190 		case GST_MESSAGE_SEGMENT_DONE:
191 			spLog(Log::Debug, ClassEngineCallbacks) << "Segment done: " << msg_src_name;
192 			break;
193 
194 		case GST_MESSAGE_TAG:
195 		{
196 			if( msg_src_name.contains("fake") ||
197 			    msg_src_name.contains("lame") ||
198 				!msg_src_name.contains("sink") )
199 			{
200 				break;
201 			}
202 
203 			GstTagList*	tags = nullptr;
204 			gst_message_parse_tag(msg, &tags);
205 			if(!tags){
206 				break;
207 			}
208 
209 			parse_image(src, tags, engine);
210 
211 			bool success;
212 			MetaData md = engine->currentTrack();
213 			bool update_metadata = false;
214 			for(const QString& tag : string_tags)
215 			{
216 				gchar* value=nullptr;
217 				success = gst_tag_list_get_string(tags, tag.toLocal8Bit().constData(), &value);
218 				if(!success) {
219 					continue;
220 				}
221 
222 				update_metadata = true;
223 
224 				if(tag == GST_TAG_TITLE) {
225 					md.setTitle(value);
226 				}
227 
228 				else if(tag == GST_TAG_ARTIST) {
229 					md.setArtist(value);
230 				}
231 
232 				else if(tag == GST_TAG_ALBUM) {
233 					md.setAlbum(value);
234 				}
235 
236 				else if(tag == GST_TAG_ALBUM_ARTIST) {
237 					md.setAlbumArtist(value);
238 				}
239 
240 				else if(tag == GST_TAG_COMMENT) {
241 					md.setComment(value);
242 				}
243 
244 				else {
245 					const gchar* nick = gst_tag_get_nick(tag.toLocal8Bit().constData());
246 
247 					QString sNick = tag;
248 					if(nick && strnlen(nick, 3) > 0) {
249 						sNick = QString::fromLocal8Bit(nick);
250 					}
251 
252 					md.replaceCustomField(tag, Util::stringToFirstUpper(sNick), value);
253 				}
254 
255 				g_free(value);
256 			}
257 
258 			Bitrate bitrate;
259 			success = gst_tag_list_get_uint(tags, GST_TAG_BITRATE, &bitrate);
260 			if(success)
261 			{
262 				engine->updateBitrate((bitrate / 1000) * 1000, src);
263 			}
264 
265 			if(update_metadata) {
266 				engine->updateMetadata(md, src);
267 			}
268 
269 			gst_tag_list_unref(tags);
270 		}
271 
272 		break;
273 
274 		case GST_MESSAGE_STATE_CHANGED:
275 			GstState old_state, new_state, pending_state;
276 
277 			gst_message_parse_state_changed(msg, &old_state, &new_state, &pending_state);
278 //			sp_log(Log::Debug, "Callback") << GST_MESSAGE_SRC_NAME(msg) << ": "
279 //							   << "State changed from "
280 //							   << gst_element_state_get_name(old_state)
281 //							   << " to "
282 //							   << gst_element_state_get_name(new_state)
283 //							   << " pending: "
284 //							   << gst_element_state_get_name(pending_state);
285 
286 			if(!msg_src_name.contains("pipeline", Qt::CaseInsensitive)){
287 				break;
288 			}
289 
290 			if( new_state == GST_STATE_PLAYING ||
291 				new_state == GST_STATE_PAUSED ||
292 				new_state == GST_STATE_READY)
293 			{
294 				engine->setTrackReady(src);
295 			}
296 
297 			break;
298 
299 		case GST_MESSAGE_BUFFERING:
300 
301 			gint percent;
302 
303 			gint avg_in, avg_out;
304 			int64_t buffering_left;
305 
306 			GstBufferingMode mode;
307 
308 			gst_message_parse_buffering(msg, &percent);
309 			gst_message_parse_buffering_stats(msg, &mode, &avg_in, &avg_out, &buffering_left );
310 
311 			spLog(Log::Crazy, "Engine Callback") << "Buffering: " << percent;
312 			spLog(Log::Crazy, "Engine Callback") << "Avg In: " << avg_in << " Avg Out: " << avg_out << " buffering_left: " << buffering_left;
313 
314 			engine->setBufferState(percent, src);
315 			break;
316 
317 		case GST_MESSAGE_DURATION_CHANGED:
318 			engine->updateDuration(src);
319 			break;
320 
321 		case GST_MESSAGE_INFO:
322 			/*gst_message_parse_info(msg, &err, nullptr);*/
323 			break;
324 
325 		case GST_MESSAGE_WARNING:
326 			{
327 				GError*	err;
328 				gst_message_parse_warning(msg, &err, nullptr);
329 				spLog(Log::Warning, ClassEngineCallbacks) << "Engine: GST_MESSAGE_WARNING: " << err->message << ": "
330 					 << GST_MESSAGE_SRC_NAME(msg);
331 				g_error_free(err);
332 			}
333 			break;
334 
335 		case GST_MESSAGE_ERROR:
336 			{
337 				static QString error_msg;
338 				GError*	err;
339 				gst_message_parse_error(msg, &err, nullptr);
340 
341 				QString src_name(GST_MESSAGE_SRC_NAME(msg));
342 
343 				spLog(Log::Error, ClassEngineCallbacks) << "Engine: GST_MESSAGE_ERROR: " << err->message << ": "
344 						 << src_name;
345 
346 				QString new_error_msg = QString(err->message);
347 
348 				if(error_msg != new_error_msg)
349 				{
350 					engine->error(new_error_msg, src_name);
351 					error_msg = new_error_msg;
352 				}
353 
354 				g_error_free(err);
355 			}
356 			break;
357 
358 		case GST_MESSAGE_STREAM_STATUS:
359 			/*{
360 				GstStreamStatusType type;
361 				gst_message_parse_stream_status(msg, &type, NULL);
362 				sp_log(Log::Debug, ClassEngineCallbacks) << "Get stream status " << type;
363 			}*/
364 			break;
365 
366 		default:
367 			break;
368 	}
369 
370 	return true;
371 }
372 
373 
374 // level changed
375 gboolean
levelHandler(GstBus * bus,GstMessage * message,gpointer data)376 Callbacks::levelHandler(GstBus* bus, GstMessage* message, gpointer data)
377 {
378 	Q_UNUSED(bus);
379 
380 	auto* engine = static_cast<Engine*>(data);
381 	if(!engine) {
382 		return true;
383 	}
384 
385 	const GstStructure* structure = gst_message_get_structure(message);
386 	if(!structure) {
387 		spLog(Log::Warning, ClassEngineCallbacks) << "structure is null";
388 		return true;
389 	}
390 
391 	const gchar* name = gst_structure_get_name(structure);
392 	if ( strcmp(name, "level") != 0 ) {
393 		return true;
394 	}
395 
396 	const GValue* peak_value = gst_structure_get_value(structure, "peak");
397 	if(!peak_value) {
398 		return true;
399 	}
400 
401 	auto* rms_arr = static_cast<GValueArray*>(g_value_get_boxed(peak_value));
402 	guint n_peak_elements = rms_arr->n_values;
403 	if(n_peak_elements == 0) {
404 		return true;
405 	}
406 
407 	double channel_values[2];
408 	n_peak_elements = std::min((guint) 2, n_peak_elements);
409 	for(guint i=0; i<n_peak_elements; i++)
410 	{
411 		const GValue* val = rms_arr->values + i;
412 
413 		if(!G_VALUE_HOLDS_DOUBLE(val)) {
414 			spLog(Log::Debug, ClassEngineCallbacks) << "Could not find a double";
415 			break;
416 		}
417 
418 		double d = g_value_get_double(val);
419 		if(d < 0){
420 			channel_values[i] = d;
421 		}
422 	}
423 
424 	if(n_peak_elements >= 2) {
425 		engine->setLevel(channel_values[0], channel_values[1]);
426 	}
427 
428 	else if(n_peak_elements == 1) {
429 		engine->setLevel(channel_values[0], channel_values[0]);
430 	}
431 
432 	return true;
433 }
434 
435 
436 // spectrum changed
437 gboolean
spectrumHandler(GstBus * bus,GstMessage * message,gpointer data)438 Callbacks::spectrumHandler(GstBus* bus, GstMessage* message, gpointer data)
439 {
440 	Q_UNUSED(bus);
441 
442 	static std::vector<float> spectrumValues;
443 
444 	auto* engine = static_cast<Engine*>(data);
445 	if(!engine) {
446 		return true;
447 	}
448 
449 	const GstStructure* structure = gst_message_get_structure(message);
450 	if(!structure) {
451 		return true;
452 	}
453 
454 	const gchar* structure_name = gst_structure_get_name(structure);
455 	if( strcmp(structure_name, "spectrum") != 0 ) {
456 		return true;
457 	}
458 
459 	const GValue* magnitudes = gst_structure_get_value (structure, "magnitude");
460 
461 	int bins = std::max(1, GetSetting(Set::Engine_SpectrumBins));
462 	if(spectrumValues.empty())
463 	{
464 		spectrumValues.resize(bins, 0);
465 	}
466 
467 	for (int i=0; i<bins; ++i)
468 	{
469 		const GValue* mag = gst_value_list_get_value(magnitudes, i);
470 		if(!mag) {
471 			continue;
472 		}
473 
474 		float f = g_value_get_float(mag);
475 		spectrumValues[i] = f;
476 	}
477 
478 	engine->setSpectrum(spectrumValues);
479 
480 	return true;
481 }
482 
483 
484 
positionChanged(gpointer data)485 gboolean Callbacks::positionChanged(gpointer data)
486 {
487 	auto* pipeline = static_cast<Pipeline*>(data);
488 	if(!pipeline){
489 		return false;
490 	}
491 
492 	GstState state = pipeline->state();
493 	if( state != GST_STATE_PLAYING &&
494 		state != GST_STATE_PAUSED &&
495 		state != GST_STATE_READY)
496 	{
497 		return true;
498 	}
499 
500 	pipeline->checkPosition();
501 
502 	return true;
503 }
504 
505 // dynamic linking, important for decodebin
decodebinReady(GstElement * source,GstPad * new_src_pad,gpointer data)506 void Callbacks::decodebinReady(GstElement* source, GstPad* new_src_pad, gpointer data)
507 {
508 	EngineUtils::GStringAutoFree element_name(gst_element_get_name(source));
509 	spLog(Log::Develop, "Callback") << "Source: " << element_name.data();
510 
511 	auto* element = static_cast<GstElement*>(data);
512 	GstPad*	sink_pad = gst_element_get_static_pad(element, "sink");
513 	if(!sink_pad){
514 		return;
515 	}
516 
517 	if(gst_pad_is_linked(sink_pad))
518 	{
519 		gst_object_unref(sink_pad);
520 		return;
521 	}
522 
523 	GstPadLinkReturn pad_link_return = gst_pad_link(new_src_pad, sink_pad);
524 
525 	if(pad_link_return != GST_PAD_LINK_OK)
526 	{
527 		spLog(Log::Error, ClassEngineCallbacks) << "Dynamic pad linking: Cannot link pads";
528 
529 		switch(pad_link_return)
530 		{
531 			case GST_PAD_LINK_WRONG_HIERARCHY:
532 				spLog(Log::Error, ClassEngineCallbacks) << "Cause: Wrong hierarchy";
533 				break;
534 			case GST_PAD_LINK_WAS_LINKED:
535 				spLog(Log::Error, ClassEngineCallbacks) << "Cause: Pad was already linked";
536 				break;
537 			case GST_PAD_LINK_WRONG_DIRECTION:
538 				spLog(Log::Error, ClassEngineCallbacks) << "Cause: Pads have wrong direction";
539 				break;
540 			case GST_PAD_LINK_NOFORMAT:
541 				spLog(Log::Error, ClassEngineCallbacks) << "Cause: Pads have incompatible format";
542 				break;
543 			case GST_PAD_LINK_NOSCHED:
544 				spLog(Log::Error, ClassEngineCallbacks) << "Cause: Pads cannot cooperate scheduling";
545 				break;
546 			case GST_PAD_LINK_REFUSED:
547 			default:
548 				spLog(Log::Error, ClassEngineCallbacks) << "Cause: Refused because of different reason";
549 				break;
550 		}
551 	}
552 
553 	else {
554 		spLog(Log::Develop, "Callbacks") << "Successfully linked " << gst_element_get_name(source) << " with " << gst_element_get_name(element);
555 	}
556 
557 	gst_object_unref(sink_pad);
558 }
559 
560 
561 #define TCP_BUFFER_SIZE 16384
newBuffer(GstElement * sink,gpointer p)562 GstFlowReturn Callbacks::newBuffer(GstElement *sink, gpointer p)
563 {
564 	static char data[TCP_BUFFER_SIZE];
565 
566 	auto* pipeline = static_cast<PipelineExtensions::BroadcastDataReceiver*>(p);
567 	if(!pipeline){
568 		return GST_FLOW_OK;
569 	}
570 
571 	GstSample* sample = gst_app_sink_pull_sample(GST_APP_SINK(sink));
572 	if(!sample) {
573 		return GST_FLOW_OK;
574 	}
575 
576 	GstBuffer* buffer = gst_sample_get_buffer(sample);
577 	if(!buffer) {
578 		gst_sample_unref(sample);
579 		return GST_FLOW_OK;
580 	}
581 
582 	gsize size = gst_buffer_get_size(buffer);
583 	gsize newSize = gst_buffer_extract(buffer, 0, data, size);
584 
585 	QByteArray bytes(data, int(newSize));
586 	pipeline->setRawData(bytes);
587 
588 	gst_sample_unref(sample);
589 
590 	return GST_FLOW_OK;
591 }
592 
593 
is_source_soup(GstElement * source)594 static bool is_source_soup(GstElement* source)
595 {
596 	GstElementFactory* fac = gst_element_get_factory(source);
597 	GType type = gst_element_factory_get_element_type(fac);
598 
599 	const gchar* name = g_type_name(type);
600 	QString src_type(name);
601 
602 	return (src_type.compare("gstsouphttpsrc", Qt::CaseInsensitive) == 0);
603 }
604 
605 
sourceReady(GstURIDecodeBin * bin,GstElement * source,gpointer data)606 void Callbacks::sourceReady(GstURIDecodeBin* bin, GstElement* source, gpointer data)
607 {
608 	Q_UNUSED(bin);
609 	Q_UNUSED(data);
610 
611 	spLog(Log::Develop, "Engine Callback") << "Source ready: is soup? " << is_source_soup(source);
612 	gst_base_src_set_dynamic_size(GST_BASE_SRC(source), false);
613 
614 	if(is_source_soup(source))
615 	{
616 		if(Proxy::active())
617 		{
618 			spLog(Log::Develop, "Engine Callback") << "Will use proxy: " << Proxy::fullUrl();
619 
620 			if(Proxy::hasUsername())
621 			{
622 				spLog(Log::Develop, "Engine Callback") << "Will use proxy username: " << Proxy::username();
623 
624 				EngineUtils::setValues(source,
625 						"proxy-id", Proxy::username().toLocal8Bit().data(),
626 						"proxy-pw", Proxy::password().toLocal8Bit().data());
627 			}
628 		}
629 	}
630 }
631