1# This program is free software; you can redistribute it and/or modify
2# it under the terms of the GNU General Public License as published by
3# the Free Software Foundation; either version 2 of the License, or
4# (at your option) any later version.
5
6import os
7import sys
8import contextlib
9
10try:
11    from gi.repository import Gst
12except ImportError:
13    Gst = None
14
15from tests import TestCase, skipUnless, get_data_path
16
17try:
18    from quodlibet.player.gstbe.util import GStreamerSink as Sink
19    from quodlibet.player.gstbe.util import parse_gstreamer_taglist
20    from quodlibet.player.gstbe.util import find_audio_sink
21    from quodlibet.player.gstbe.prefs import GstPlayerPreferences
22except ImportError:
23    pass
24
25from quodlibet.player import PlayerError
26from quodlibet.util import sanitize_tags, is_flatpak, matches_flatpak_runtime
27from quodlibet.formats import MusicFile
28from quodlibet import config
29
30
@contextlib.contextmanager
def ignore_gst_errors():
    """Silence GStreamer debug output for the duration of a ``with`` block.

    The default debug threshold is set to ``Gst.DebugLevel.NONE`` on entry
    and the previous threshold is put back on exit, including when the
    body of the ``with`` block raises.
    """

    old = Gst.debug_get_default_threshold()
    Gst.debug_set_default_threshold(Gst.DebugLevel.NONE)
    try:
        yield
    finally:
        # Without try/finally an exception raised inside the ``with`` body
        # (e.g. a failing assertion) would skip the restore and leave the
        # threshold at NONE for the rest of the process.
        Gst.debug_set_default_threshold(old)
37
38
@skipUnless(Gst, "GStreamer missing")
class TGstPlayerPrefs(TestCase):
    """Smoke test for the GStreamer backend preferences widget.

    Skipped entirely when the ``Gst`` bindings could not be imported.
    """

    def setUp(self):
        # Every test starts from a freshly initialised config module.
        config.init()

    def tearDown(self):
        # Counterpart of the config.init() call in setUp.
        config.quit()

    def test_main(self):
        # Constructing the widget and destroying it again must not raise.
        GstPlayerPreferences(None, True).destroy()
51
52
@skipUnless(Gst, "GStreamer missing")
class TGStreamerSink(TestCase):
    """Tests for ``GStreamerSink``, which turns a sink description string
    into an ``(object, name)`` pair.

    Skipped entirely when the ``Gst`` bindings could not be imported.
    """

    def test_simple(self):
        # Only sinks whose element factory can be found on this system are
        # exercised; the others are filtered out up front.
        sinks = ["gconfaudiosink", "alsasink"]
        for n in filter(Gst.ElementFactory.find, sinks):
            obj, name = Sink(n)
            self.assertTrue(obj)
            self.assertEqual(name, n)

    def test_invalid(self):
        # An unknown element name has to surface as a PlayerError; the
        # debug output GStreamer may produce while failing is silenced.
        with ignore_gst_errors():
            self.assertRaises(PlayerError, Sink, "notarealsink")

    def test_fallback(self):
        # An empty description must still produce a usable sink.
        obj, name = Sink("")
        self.assertTrue(obj)
        if os.name == "nt":
            expected = "directsoundsink"
        else:
            expected = find_audio_sink()[1]
        self.assertEqual(name, expected)

    def test_append_sink(self):
        # "volume" alone is accepted; the last "!"-separated element of the
        # returned name must equal the name returned for an empty
        # description.
        obj, name = Sink("volume")
        self.assertTrue(obj)
        self.assertEqual(name.split("!")[-1].strip(), Sink("")[1])
78
79
@skipUnless(Gst, "GStreamer missing")
class TGstreamerTagList(TestCase):
    """Tests for ``parse_gstreamer_taglist`` and ``sanitize_tags``.

    Skipped entirely when the ``Gst`` bindings could not be imported.
    """

    def test_parse(self):
        # gst.TagList can't be filled using pyGtk... so use a dict instead
        tags = {}

        # A single "key=value" extended comment becomes its own tag.
        tags["extended-comment"] = u"foo=bar"
        self.assertIn("foo", parse_gstreamer_taglist(tags))

        # Repeated keys in a list of extended comments are joined by "\n".
        tags["extended-comment"] = [u"foo=bar", u"bar=foo", u"bar=foo2"]
        self.assertIn("foo", parse_gstreamer_taglist(tags))
        self.assertIn("bar", parse_gstreamer_taglist(tags))
        self.assertEqual(parse_gstreamer_taglist(tags)["bar"], "foo\nfoo2")

        # date is abstract, so define our own
        # (might work with pygobject now)
        class Foo(object):
            def to_iso8601_string(self):
                return "3000-10-2"

        tags["date"] = Foo()
        original_datetime = Gst.DateTime
        Gst.DateTime = Foo
        try:
            parsed = parse_gstreamer_taglist(tags)
            self.assertEqual(parsed["date"], "3000-10-2")
        finally:
            # Undo the monkeypatch even when the assertion above fails, so
            # the fake class cannot leak into tests that run afterwards.
            Gst.DateTime = original_datetime

        # Text values come back as str whether given as text ...
        tags["foo"] = u"äöü"
        parsed = parse_gstreamer_taglist(tags)
        self.assertTrue(isinstance(parsed["foo"], str))
        self.assertIn(u"äöü", parsed["foo"].split("\n"))

        # ... or as UTF-8 encoded bytes.
        tags["foo"] = u"äöü".encode("utf-8")
        parsed = parse_gstreamer_taglist(tags)
        self.assertTrue(isinstance(parsed["foo"], str))
        self.assertIn(u"äöü", parsed["foo"].split("\n"))

        # Numeric values compare equal to what was put in.
        tags["bar"] = 1.2
        self.assertEqual(parse_gstreamer_taglist(tags)["bar"], 1.2)

        tags["bar"] = 9
        self.assertEqual(parse_gstreamer_taglist(tags)["bar"], 9)

        # Any other object ends up as a str mentioning its type name.
        tags["bar"] = Gst.TagList()  # some random gst instance
        self.assertTrue(
            isinstance(parse_gstreamer_taglist(tags)["bar"], str))
        self.assertIn("GstTagList", parse_gstreamer_taglist(tags)["bar"])

    def test_sanitize(self):
        # "location" is exposed under the "website" key.
        san = sanitize_tags({"location": u"http://foo"})
        self.assertIn("website", san)

        # Channel mode variants collapse to "stereo".
        san = sanitize_tags({"channel-mode": u"joint-stereo"})
        self.assertEqual(san["channel-mode"], "stereo")

        san = sanitize_tags({"channel-mode": u"dual"})
        self.assertEqual(san["channel-mode"], "stereo")

        # Codec names are mapped to their display form.
        san = sanitize_tags({"audio-codec": u"mp3"})
        self.assertEqual(san["audio-codec"], "MP3")

        san = sanitize_tags({"audio-codec": u"Advanced Audio Coding"})
        self.assertEqual(san["audio-codec"], "MPEG-4 AAC")

        san = sanitize_tags({"audio-codec": u"vorbis"})
        self.assertEqual(san["audio-codec"], "Ogg Vorbis")

        # These placeholder values are expected to be dropped completely.
        tags = {"a": u"http://www.shoutcast.com", "b": u"default genre"}
        self.assertFalse(sanitize_tags(tags))

        # Numeric duration: kept as "~#length" (divided by 1000) only when
        # stream=True.
        san = sanitize_tags({"duration": 1000 * 42}, stream=True)
        self.assertEqual(san["~#length"], 42)
        self.assertFalse(sanitize_tags({"duration": 1000 * 42}))

        san = sanitize_tags({"duration": u"bla"}, stream=True)
        self.assertEqual(san["duration"], u"bla")

        # Numeric bitrate: kept as "~#bitrate" only when stream=True ...
        san = sanitize_tags({"bitrate": 1000 * 42}, stream=True)
        self.assertEqual(san["~#bitrate"], 42)
        self.assertFalse(sanitize_tags({"bitrate": 1000 * 42}))

        san = sanitize_tags({"bitrate": u"bla"})
        self.assertEqual(san["bitrate"], u"bla")

        # ... whereas nominal-bitrate behaves the other way round.
        san = sanitize_tags({"nominal-bitrate": 1000 * 42})
        self.assertEqual(san["~#bitrate"], 42)
        self.assertFalse(
            sanitize_tags({"nominal-bitrate": 1000 * 42}, stream=True))

        san = sanitize_tags({"nominal-bitrate": u"bla"})
        self.assertEqual(san["nominal-bitrate"], u"bla")

        # The original asserted the non-stream case twice; the second call
        # now covers stream=True, mirroring the title/artist cases below.
        tags = {"emphasis": u"something"}
        self.assertFalse(sanitize_tags(tags))
        self.assertFalse(sanitize_tags(tags, stream=True))

        # title/artist are only kept when stream=True.
        tags = {"title": u"something"}
        self.assertFalse(sanitize_tags(tags))
        self.assertTrue(sanitize_tags(tags, stream=True))

        tags = {"artist": u"something"}
        self.assertFalse(sanitize_tags(tags))
        self.assertTrue(sanitize_tags(tags, stream=True))

        # The "~#" prefix follows the value type: added for numbers,
        # removed for text.
        tags = {"~#foo": 42, "bar": 42, "~#bla": u"42"}
        self.assertIn("~#foo", sanitize_tags(tags))
        self.assertIn("~#bar", sanitize_tags(tags))
        self.assertIn("bla", sanitize_tags(tags))

        # location and website values end up merged under "website".
        tags = {}
        tags["extended-comment"] = [u"location=1", u"website=2", u"website=3"]
        parsed = parse_gstreamer_taglist(tags)
        websites = sanitize_tags(parsed)["website"].split("\n")
        self.assertIn("1", websites)
        self.assertIn("2", websites)
        self.assertIn("3", websites)
197
198
@skipUnless(Gst, "GStreamer missing")
@skipUnless(sys.platform == "darwin" or os.name == "nt" or is_flatpak(),
            "no control over gst")
class TGStreamerCodecs(TestCase):
    """Decoding tests that only run on macOS, Windows or inside Flatpak,
    and only when the ``Gst`` bindings could be imported.
    """

    def setUp(self):
        config.init()

    def tearDown(self):
        config.quit()

    def _check(self, song):
        """Run `song` through a ``uridecodebin ! fakesink`` pipeline.

        Returns None when end-of-stream is reached, the message of the
        GStreamer error if one is posted on the bus, or ``"timed out"`` if
        no bus message arrives within 40 seconds.

        GStreamer debug output is silenced while the pipeline runs; the
        previous threshold is restored and the pipeline is set back to
        NULL even if something raises along the way.
        """

        old_threshold = Gst.debug_get_default_threshold()
        Gst.debug_set_default_threshold(Gst.DebugLevel.NONE)
        try:
            pipeline = Gst.parse_launch(
                "uridecodebin uri=%s ! fakesink" % song("~uri"))
            bus = pipeline.get_bus()
            error = None
            try:
                pipeline.set_state(Gst.State.PLAYING)
                while True:
                    message = bus.timed_pop(Gst.SECOND * 40)
                    if not message:
                        error = "timed out"
                        break
                    if message.type == Gst.MessageType.ERROR:
                        error = message.parse_error()[0].message
                        break
                    if message.type == Gst.MessageType.EOS:
                        break
            finally:
                pipeline.set_state(Gst.State.NULL)
        finally:
            # Restore on every exit path so that a failing pipeline cannot
            # leave debug output disabled for the rest of the test run.
            Gst.debug_set_default_threshold(old_threshold)

        return error

    def test_decode_all(self):
        """Decode all kinds of formats using Gstreamer, to check if
        they all work and to notify us if a plugin is missing on
        platforms where we control the packaging.
        """

        files = [
            "coverart.wv",
            "empty.aac",
            "empty.flac",
            "empty.ogg",
            "empty.opus",
            "silence-44-s.mpc",
            "silence-44-s.sv8.mpc",
            "silence-44-s.tta",
            # "test.mid",
            "test.spc",
            "test.vgm",
            "test.wma",
            "empty.xm",
            "h264_aac.mp4",
            "h265_aac.mp4"
        ]

        if not matches_flatpak_runtime("*org.gnome.*/3.32"):
            # https://gitlab.com/freedesktop-sdk/freedesktop-sdk/issues/809
            files.append("silence-44-s.spx")

        # Collect every failure first so one report lists all broken
        # formats instead of stopping at the first one.
        errors = []
        for file_ in files:
            song = MusicFile(get_data_path(file_))
            if song is None:
                # MusicFile returned nothing for this file; it is skipped
                # rather than counted as a decoding failure.
                continue
            error = self._check(song)
            if error:
                errors.append((song("~format"), error))

        if errors:
            raise Exception("Decoding failed %r" % errors)
275