# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

"""Tests for the GStreamer player backend helpers and preferences."""

import os
import sys
import contextlib

try:
    from gi.repository import Gst
except ImportError:
    Gst = None

from tests import TestCase, skipUnless, get_data_path

# These imports fail when GStreamer is unavailable; every test class that
# needs them is skipped in that case via ``skipUnless(Gst, ...)``.
try:
    from quodlibet.player.gstbe.util import GStreamerSink as Sink
    from quodlibet.player.gstbe.util import parse_gstreamer_taglist
    from quodlibet.player.gstbe.util import find_audio_sink
    from quodlibet.player.gstbe.prefs import GstPlayerPreferences
except ImportError:
    pass

from quodlibet.player import PlayerError
from quodlibet.util import sanitize_tags, is_flatpak, matches_flatpak_runtime
from quodlibet.formats import MusicFile
from quodlibet import config


@contextlib.contextmanager
def ignore_gst_errors():
    """Silence GStreamer debug output for the duration of the block.

    The previous default debug threshold is restored on exit, even when
    the wrapped block raises (e.g. a failing assertion).
    """

    old = Gst.debug_get_default_threshold()
    Gst.debug_set_default_threshold(Gst.DebugLevel.NONE)
    try:
        yield
    finally:
        Gst.debug_set_default_threshold(old)


@skipUnless(Gst, "GStreamer missing")
class TGstPlayerPrefs(TestCase):
    """Smoke test for the GStreamer preferences widget."""

    def setUp(self):
        config.init()

    def tearDown(self):
        config.quit()

    def test_main(self):
        """The preferences widget can be created and destroyed."""

        widget = GstPlayerPreferences(None, True)
        widget.destroy()


@skipUnless(Gst, "GStreamer missing")
class TGStreamerSink(TestCase):
    """Tests for building audio sinks from a pipeline description."""

    def test_simple(self):
        """Each sink available on this system is returned under its name."""

        sinks = ["gconfaudiosink", "alsasink"]
        # Only exercise the sinks GStreamer can actually find here.
        for n in filter(Gst.ElementFactory.find, sinks):
            obj, name = Sink(n)
            self.assertTrue(obj)
            self.assertEqual(name, n)

    def test_invalid(self):
        """An unknown sink name raises PlayerError."""

        with ignore_gst_errors():
            self.assertRaises(PlayerError, Sink, "notarealsink")

    def test_fallback(self):
        """An empty description falls back to the platform default sink."""

        obj, name = Sink("")
        self.assertTrue(obj)
        if os.name == "nt":
            self.assertEqual(name, "directsoundsink")
        else:
            self.assertEqual(name, find_audio_sink()[1])

    def test_append_sink(self):
        """A description without a sink gets the default sink appended."""

        obj, name = Sink("volume")
        self.assertTrue(obj)
        last_element = name.split("!")[-1].strip()
        self.assertEqual(last_element, Sink("")[1])


@skipUnless(Gst, "GStreamer missing")
class TGstreamerTagList(TestCase):
    """Tests for tag list parsing and tag sanitizing."""

    def test_parse(self):
        """parse_gstreamer_taglist converts the supported value types."""

        # gst.TagList can't be filled using pyGtk... so use a dict instead

        tags = {}
        tags["extended-comment"] = "foo=bar"
        self.assertIn("foo", parse_gstreamer_taglist(tags))

        tags["extended-comment"] = ["foo=bar", "bar=foo", "bar=foo2"]
        self.assertIn("foo", parse_gstreamer_taglist(tags))
        self.assertIn("bar", parse_gstreamer_taglist(tags))
        self.assertEqual(parse_gstreamer_taglist(tags)["bar"], "foo\nfoo2")

        # date is abstract, so define our own
        # (might work with pygobject now)
        class Foo:
            def to_iso8601_string(self):
                return "3000-10-2"

        tags["date"] = Foo()
        original_datetime = Gst.DateTime
        Gst.DateTime = Foo
        try:
            self.assertEqual(
                parse_gstreamer_taglist(tags)["date"], "3000-10-2")
        finally:
            # Always undo the monkeypatch so a failure here can't leak the
            # stub class into other tests.
            Gst.DateTime = original_datetime

        tags["foo"] = "äöü"
        parsed = parse_gstreamer_taglist(tags)
        self.assertIsInstance(parsed["foo"], str)
        self.assertIn("äöü", parsed["foo"].split("\n"))

        tags["foo"] = "äöü".encode("utf-8")
        parsed = parse_gstreamer_taglist(tags)
        self.assertIsInstance(parsed["foo"], str)
        self.assertIn("äöü", parsed["foo"].split("\n"))

        tags["bar"] = 1.2
        self.assertEqual(parse_gstreamer_taglist(tags)["bar"], 1.2)

        tags["bar"] = 9
        self.assertEqual(parse_gstreamer_taglist(tags)["bar"], 9)

        tags["bar"] = Gst.TagList()  # some random gst instance
        self.assertIsInstance(parse_gstreamer_taglist(tags)["bar"], str)
        self.assertIn("GstTagList", parse_gstreamer_taglist(tags)["bar"])

    def test_sanitize(self):
        """sanitize_tags renames, normalizes and filters tags."""

        tags = sanitize_tags({"location": "http://foo"})
        self.assertIn("website", tags)

        tags = sanitize_tags({"channel-mode": "joint-stereo"})
        self.assertEqual(tags["channel-mode"], "stereo")

        tags = sanitize_tags({"channel-mode": "dual"})
        self.assertEqual(tags["channel-mode"], "stereo")

        tags = sanitize_tags({"audio-codec": "mp3"})
        self.assertEqual(tags["audio-codec"], "MP3")

        tags = sanitize_tags({"audio-codec": "Advanced Audio Coding"})
        self.assertEqual(tags["audio-codec"], "MPEG-4 AAC")

        tags = sanitize_tags({"audio-codec": "vorbis"})
        self.assertEqual(tags["audio-codec"], "Ogg Vorbis")

        tags = {"a": "http://www.shoutcast.com", "b": "default genre"}
        tags = sanitize_tags(tags)
        self.assertFalse(tags)

        tags = sanitize_tags({"duration": 1000 * 42}, stream=True)
        self.assertEqual(tags["~#length"], 42)
        tags = sanitize_tags({"duration": 1000 * 42})
        self.assertFalse(tags)

        tags = sanitize_tags({"duration": "bla"}, stream=True)
        self.assertEqual(tags["duration"], "bla")

        tags = sanitize_tags({"bitrate": 1000 * 42}, stream=True)
        self.assertEqual(tags["~#bitrate"], 42)
        tags = sanitize_tags({"bitrate": 1000 * 42})
        self.assertFalse(tags)

        tags = sanitize_tags({"bitrate": "bla"})
        self.assertEqual(tags["bitrate"], "bla")

        tags = sanitize_tags({"nominal-bitrate": 1000 * 42})
        self.assertEqual(tags["~#bitrate"], 42)
        tags = sanitize_tags({"nominal-bitrate": 1000 * 42}, stream=True)
        self.assertFalse(tags)

        tags = sanitize_tags({"nominal-bitrate": "bla"})
        self.assertEqual(tags["nominal-bitrate"], "bla")

        tags = {"emphasis": "something"}
        # Checked twice on the same input dict, as in the original test.
        self.assertFalse(sanitize_tags(tags))
        self.assertFalse(sanitize_tags(tags))

        tags = {"title": "something"}
        self.assertFalse(sanitize_tags(tags))
        self.assertTrue(sanitize_tags(tags, stream=True))

        tags = {"artist": "something"}
        self.assertFalse(sanitize_tags(tags))
        self.assertTrue(sanitize_tags(tags, stream=True))

        tags = {"~#foo": 42, "bar": 42, "~#bla": "42"}
        self.assertIn("~#foo", sanitize_tags(tags))
        self.assertIn("~#bar", sanitize_tags(tags))
        self.assertIn("bla", sanitize_tags(tags))

        tags = {}
        tags["extended-comment"] = ["location=1", "website=2", "website=3"]
        tags = parse_gstreamer_taglist(tags)
        websites = sanitize_tags(tags)["website"].split("\n")
        self.assertIn("1", websites)
        self.assertIn("2", websites)
        self.assertIn("3", websites)


@skipUnless(Gst, "GStreamer missing")
@skipUnless(sys.platform == "darwin" or os.name == "nt" or is_flatpak(),
            "no control over gst")
class TGStreamerCodecs(TestCase):
    """Decoding checks, run only on macOS, Windows and Flatpak."""

    def setUp(self):
        config.init()

    def tearDown(self):
        config.quit()

    def _check(self, song):
        """Decode `song` with a uridecodebin pipeline feeding a fakesink.

        Returns None if the pipeline reaches end-of-stream, otherwise the
        GStreamer error message, or "timed out" if no bus message arrives
        within 40 seconds.
        """

        error = None
        # Shared helper restores the debug threshold even if the pipeline
        # setup or the message loop raises.
        with ignore_gst_errors():
            pipeline = Gst.parse_launch(
                "uridecodebin uri=%s ! fakesink" % song("~uri"))
            bus = pipeline.get_bus()
            try:
                pipeline.set_state(Gst.State.PLAYING)
                while True:
                    message = bus.timed_pop(Gst.SECOND * 40)
                    if not message:
                        error = "timed out"
                        break
                    if message.type == Gst.MessageType.ERROR:
                        error = message.parse_error()[0].message
                        break
                    if message.type == Gst.MessageType.EOS:
                        break
            finally:
                # Always shut the pipeline down, whatever the outcome.
                pipeline.set_state(Gst.State.NULL)

        return error

    def test_decode_all(self):
        """Decode all kinds of formats using Gstreamer, to check if
        they all work and to notify us if a plugin is missing on
        platforms where we control the packaging.
        """

        files = [
            "coverart.wv",
            "empty.aac",
            "empty.flac",
            "empty.ogg",
            "empty.opus",
            "silence-44-s.mpc",
            "silence-44-s.sv8.mpc",
            "silence-44-s.tta",
            # "test.mid",
            "test.spc",
            "test.vgm",
            "test.wma",
            "empty.xm",
            "h264_aac.mp4",
            "h265_aac.mp4",
        ]

        if not matches_flatpak_runtime("*org.gnome.*/3.32"):
            # https://gitlab.com/freedesktop-sdk/freedesktop-sdk/issues/809
            files.append("silence-44-s.spx")

        errors = []
        for file_ in files:
            path = get_data_path(file_)
            song = MusicFile(path)
            # Files MusicFile cannot load are skipped, not reported.
            if song is None:
                continue
            error = self._check(song)
            if error:
                errors.append((song("~format"), error))

        if errors:
            raise Exception("Decoding failed %r" % errors)