1import unittest
2from unittest.mock import ANY, MagicMock, Mock, call, patch
3
4from streamlink import PluginError
5from streamlink.stream import DASHStream
6from streamlink.stream.dash import DASHStreamWorker
7from streamlink.stream.dash_manifest import MPD
8from tests.resources import text, xml
9
10
11class TestDASHStream(unittest.TestCase):
12    def setUp(self):
13        self.session = MagicMock()
14        self.test_url = "http://test.bar/foo.mpd"
15        self.session.http.get.return_value = Mock(url=self.test_url)
16
17    @patch('streamlink.stream.dash.MPD')
18    def test_parse_manifest_video_only(self, mpdClass):
19        mpdClass.return_value = Mock(periods=[
20            Mock(adaptationSets=[
21                Mock(contentProtection=None,
22                     representations=[
23                         Mock(id=1, mimeType="video/mp4", height=720),
24                         Mock(id=2, mimeType="video/mp4", height=1080)
25                     ])
26            ])
27        ])
28
29        streams = DASHStream.parse_manifest(self.session, self.test_url)
30        mpdClass.assert_called_with(ANY, base_url="http://test.bar", url="http://test.bar/foo.mpd")
31
32        self.assertSequenceEqual(
33            sorted(list(streams.keys())),
34            sorted(["720p", "1080p"])
35        )
36
37    @patch('streamlink.stream.dash.MPD')
38    def test_parse_manifest_audio_only(self, mpdClass):
39        mpdClass.return_value = Mock(periods=[
40            Mock(adaptationSets=[
41                Mock(contentProtection=None,
42                     representations=[
43                         Mock(id=1, mimeType="audio/mp4", bandwidth=128.0, lang='en'),
44                         Mock(id=2, mimeType="audio/mp4", bandwidth=256.0, lang='en')
45                     ])
46            ])
47        ])
48
49        streams = DASHStream.parse_manifest(self.session, self.test_url)
50        mpdClass.assert_called_with(ANY, base_url="http://test.bar", url="http://test.bar/foo.mpd")
51
52        self.assertSequenceEqual(
53            sorted(list(streams.keys())),
54            sorted(["a128k", "a256k"])
55        )
56
57    @patch('streamlink.stream.dash.MPD')
58    def test_parse_manifest_audio_single(self, mpdClass):
59        mpdClass.return_value = Mock(periods=[
60            Mock(adaptationSets=[
61                Mock(contentProtection=None,
62                     representations=[
63                         Mock(id=1, mimeType="video/mp4", height=720),
64                         Mock(id=2, mimeType="video/mp4", height=1080),
65                         Mock(id=3, mimeType="audio/aac", bandwidth=128.0, lang='en')
66                     ])
67            ])
68        ])
69
70        streams = DASHStream.parse_manifest(self.session, self.test_url)
71        mpdClass.assert_called_with(ANY, base_url="http://test.bar", url="http://test.bar/foo.mpd")
72
73        self.assertSequenceEqual(
74            sorted(list(streams.keys())),
75            sorted(["720p", "1080p"])
76        )
77
78    @patch('streamlink.stream.dash.MPD')
79    def test_parse_manifest_audio_multi(self, mpdClass):
80        mpdClass.return_value = Mock(periods=[
81            Mock(adaptationSets=[
82                Mock(contentProtection=None,
83                     representations=[
84                         Mock(id=1, mimeType="video/mp4", height=720),
85                         Mock(id=2, mimeType="video/mp4", height=1080),
86                         Mock(id=3, mimeType="audio/aac", bandwidth=128.0, lang='en'),
87                         Mock(id=4, mimeType="audio/aac", bandwidth=256.0, lang='en')
88                     ])
89            ])
90        ])
91
92        streams = DASHStream.parse_manifest(self.session, self.test_url)
93        mpdClass.assert_called_with(ANY, base_url="http://test.bar", url="http://test.bar/foo.mpd")
94
95        self.assertSequenceEqual(
96            sorted(list(streams.keys())),
97            sorted(["720p+a128k", "1080p+a128k", "720p+a256k", "1080p+a256k"])
98        )
99
100    @patch('streamlink.stream.dash.MPD')
101    def test_parse_manifest_audio_multi_lang(self, mpdClass):
102        mpdClass.return_value = Mock(periods=[
103            Mock(adaptationSets=[
104                Mock(contentProtection=None,
105                     representations=[
106                         Mock(id=1, mimeType="video/mp4", height=720),
107                         Mock(id=2, mimeType="video/mp4", height=1080),
108                         Mock(id=3, mimeType="audio/aac", bandwidth=128.0, lang='en'),
109                         Mock(id=4, mimeType="audio/aac", bandwidth=128.0, lang='es')
110                     ])
111            ])
112        ])
113
114        streams = DASHStream.parse_manifest(self.session, self.test_url)
115        mpdClass.assert_called_with(ANY, base_url="http://test.bar", url="http://test.bar/foo.mpd")
116
117        self.assertSequenceEqual(
118            sorted(list(streams.keys())),
119            sorted(["720p", "1080p"])
120        )
121
122        self.assertEqual(streams["720p"].audio_representation.lang, "en")
123        self.assertEqual(streams["1080p"].audio_representation.lang, "en")
124
125    @patch('streamlink.stream.dash.MPD')
126    def test_parse_manifest_audio_multi_lang_alpha3(self, mpdClass):
127        mpdClass.return_value = Mock(periods=[
128            Mock(adaptationSets=[
129                Mock(contentProtection=None,
130                     representations=[
131                         Mock(id=1, mimeType="video/mp4", height=720),
132                         Mock(id=2, mimeType="video/mp4", height=1080),
133                         Mock(id=3, mimeType="audio/aac", bandwidth=128.0, lang='eng'),
134                         Mock(id=4, mimeType="audio/aac", bandwidth=128.0, lang='spa')
135                     ])
136            ])
137        ])
138
139        streams = DASHStream.parse_manifest(self.session, self.test_url)
140        mpdClass.assert_called_with(ANY, base_url="http://test.bar", url="http://test.bar/foo.mpd")
141
142        self.assertSequenceEqual(
143            sorted(list(streams.keys())),
144            sorted(["720p", "1080p"])
145        )
146
147        self.assertEqual(streams["720p"].audio_representation.lang, "eng")
148        self.assertEqual(streams["1080p"].audio_representation.lang, "eng")
149
150    @patch('streamlink.stream.dash.MPD')
151    def test_parse_manifest_audio_invalid_lang(self, mpdClass):
152        mpdClass.return_value = Mock(periods=[
153            Mock(adaptationSets=[
154                Mock(contentProtection=None,
155                     representations=[
156                         Mock(id=1, mimeType="video/mp4", height=720),
157                         Mock(id=2, mimeType="video/mp4", height=1080),
158                         Mock(id=3, mimeType="audio/aac", bandwidth=128.0, lang='en_no_voice'),
159                     ])
160            ])
161        ])
162
163        streams = DASHStream.parse_manifest(self.session, self.test_url)
164        mpdClass.assert_called_with(ANY, base_url="http://test.bar", url="http://test.bar/foo.mpd")
165
166        self.assertSequenceEqual(
167            sorted(list(streams.keys())),
168            sorted(["720p", "1080p"])
169        )
170
171        self.assertEqual(streams["720p"].audio_representation.lang, "en_no_voice")
172        self.assertEqual(streams["1080p"].audio_representation.lang, "en_no_voice")
173
174    @patch('streamlink.stream.dash.MPD')
175    def test_parse_manifest_audio_multi_lang_locale(self, mpdClass):
176        self.session.localization.language.alpha2 = "es"
177        self.session.localization.explicit = True
178
179        mpdClass.return_value = Mock(periods=[
180            Mock(adaptationSets=[
181                Mock(contentProtection=None,
182                     representations=[
183                         Mock(id=1, mimeType="video/mp4", height=720),
184                         Mock(id=2, mimeType="video/mp4", height=1080),
185                         Mock(id=3, mimeType="audio/aac", bandwidth=128.0, lang='en'),
186                         Mock(id=4, mimeType="audio/aac", bandwidth=128.0, lang='es')
187                     ])
188            ])
189        ])
190
191        streams = DASHStream.parse_manifest(self.session, self.test_url)
192        mpdClass.assert_called_with(ANY, base_url="http://test.bar", url="http://test.bar/foo.mpd")
193
194        self.assertSequenceEqual(
195            sorted(list(streams.keys())),
196            sorted(["720p", "1080p"])
197        )
198
199        self.assertEqual(streams["720p"].audio_representation.lang, "es")
200        self.assertEqual(streams["1080p"].audio_representation.lang, "es")
201
202    @patch('streamlink.stream.dash.MPD')
203    def test_parse_manifest_drm(self, mpdClass):
204        mpdClass.return_value = Mock(periods=[Mock(adaptationSets=[Mock(contentProtection="DRM")])])
205
206        self.assertRaises(PluginError,
207                          DASHStream.parse_manifest,
208                          self.session, self.test_url)
209        mpdClass.assert_called_with(ANY, base_url="http://test.bar", url="http://test.bar/foo.mpd")
210
211    def test_parse_manifest_string(self):
212        with text("dash/test_9.mpd") as mpd_txt:
213            test_manifest = mpd_txt.read()
214
215        streams = DASHStream.parse_manifest(self.session, test_manifest)
216        self.assertSequenceEqual(list(streams.keys()), ['2500k'])
217
218    @patch('streamlink.stream.dash.DASHStreamReader')
219    @patch('streamlink.stream.dash.FFMPEGMuxer')
220    def test_stream_open_video_only(self, muxer, reader):
221        stream = DASHStream(self.session, Mock(), Mock(id=1, mimeType="video/mp4"))
222        open_reader = reader.return_value = Mock()
223
224        stream.open()
225
226        reader.assert_called_with(stream, 1, "video/mp4")
227        open_reader.open.assert_called_with()
228        muxer.assert_not_called()
229
230    @patch('streamlink.stream.dash.DASHStreamReader')
231    @patch('streamlink.stream.dash.FFMPEGMuxer')
232    def test_stream_open_video_audio(self, muxer, reader):
233        stream = DASHStream(self.session, Mock(), Mock(id=1, mimeType="video/mp4"), Mock(id=2, mimeType="audio/mp3", lang='en'))
234        open_reader = reader.return_value = Mock()
235
236        stream.open()
237
238        self.assertSequenceEqual(reader.mock_calls, [call(stream, 1, "video/mp4"),
239                                                     call().open(),
240                                                     call(stream, 2, "audio/mp3"),
241                                                     call().open()])
242        self.assertSequenceEqual(muxer.mock_calls, [call(self.session, open_reader, open_reader, copyts=True),
243                                                    call().open()])
244
245    @patch('streamlink.stream.dash.MPD')
246    def test_segments_number_time(self, mpdClass):
247        with xml("dash/test_9.mpd") as mpd_xml:
248            mpdClass.return_value = MPD(mpd_xml, base_url="http://test.bar", url="http://test.bar/foo.mpd")
249
250            streams = DASHStream.parse_manifest(self.session, self.test_url)
251            mpdClass.assert_called_with(ANY, base_url="http://test.bar", url="http://test.bar/foo.mpd")
252
253            self.assertSequenceEqual(list(streams.keys()), ['2500k'])
254
255    @patch('streamlink.stream.dash.MPD')
256    def test_parse_manifest_with_duplicated_resolutions(self, mpdClass):
257        """
258            Verify the fix for https://github.com/streamlink/streamlink/issues/3365
259        """
260        mpdClass.return_value = Mock(periods=[
261            Mock(adaptationSets=[
262                Mock(contentProtection=None,
263                     representations=[
264                         Mock(id=1, mimeType="video/mp4", height=1080, bandwidth=128.0),
265                         Mock(id=2, mimeType="video/mp4", height=1080, bandwidth=64.0),
266                         Mock(id=3, mimeType="video/mp4", height=1080, bandwidth=32.0),
267                         Mock(id=4, mimeType="video/mp4", height=720),
268                     ])
269            ])
270        ])
271
272        streams = DASHStream.parse_manifest(self.session, self.test_url)
273        mpdClass.assert_called_with(ANY, base_url="http://test.bar", url="http://test.bar/foo.mpd")
274
275        self.assertSequenceEqual(
276            sorted(list(streams.keys())),
277            sorted(["720p", "1080p", "1080p_alt", "1080p_alt2"])
278        )
279
280
281class TestDASHStreamWorker(unittest.TestCase):
282    @patch("streamlink.stream.dash_manifest.time.sleep")
283    @patch('streamlink.stream.dash.MPD')
284    def test_dynamic_reload(self, mpdClass, sleep):
285        reader = MagicMock()
286        worker = DASHStreamWorker(reader)
287        reader.representation_id = 1
288        reader.mime_type = "video/mp4"
289
290        representation = Mock(id=1, mimeType="video/mp4", height=720)
291        segments = [Mock(url="init_segment"), Mock(url="first_segment"), Mock(url="second_segment")]
292        representation.segments.return_value = [segments[0]]
293        mpdClass.return_value = worker.mpd = Mock(dynamic=True,
294                                                  publishTime=1,
295                                                  periods=[
296                                                      Mock(adaptationSets=[
297                                                          Mock(contentProtection=None,
298                                                               representations=[
299                                                                   representation
300                                                               ])
301                                                      ])
302                                                  ])
303        worker.mpd.type = "dynamic"
304        worker.mpd.minimumUpdatePeriod.total_seconds.return_value = 0
305        worker.mpd.periods[0].duration.total_seconds.return_value = 0
306
307        segment_iter = worker.iter_segments()
308
309        representation.segments.return_value = segments[:1]
310        self.assertEqual(next(segment_iter), segments[0])
311        representation.segments.assert_called_with(init=True)
312
313        representation.segments.return_value = segments[1:]
314        self.assertSequenceEqual([next(segment_iter), next(segment_iter)], segments[1:])
315        representation.segments.assert_called_with(init=False)
316
317    @patch("streamlink.stream.dash_manifest.time.sleep")
318    def test_static(self, sleep):
319        reader = MagicMock()
320        worker = DASHStreamWorker(reader)
321        reader.representation_id = 1
322        reader.mime_type = "video/mp4"
323
324        representation = Mock(id=1, mimeType="video/mp4", height=720)
325        segments = [Mock(url="init_segment"), Mock(url="first_segment"), Mock(url="second_segment")]
326        representation.segments.return_value = [segments[0]]
327        worker.mpd = Mock(dynamic=False,
328                          publishTime=1,
329                          periods=[
330                              Mock(adaptationSets=[
331                                  Mock(contentProtection=None,
332                                       representations=[
333                                           representation
334                                       ])
335                              ])
336                          ])
337        worker.mpd.type = "static"
338        worker.mpd.minimumUpdatePeriod.total_seconds.return_value = 0
339        worker.mpd.periods[0].duration.total_seconds.return_value = 0
340
341        representation.segments.return_value = segments
342        self.assertSequenceEqual(list(worker.iter_segments()), segments)
343        representation.segments.assert_called_with(init=True)
344
345    @patch("streamlink.stream.dash_manifest.time.time")
346    @patch("streamlink.stream.dash_manifest.time.sleep")
347    def test_static_refresh_wait(self, sleep, time):
348        """
349            Verify the fix for https://github.com/streamlink/streamlink/issues/2873
350        """
351        time.return_value = 1
352        reader = MagicMock()
353        worker = DASHStreamWorker(reader)
354        reader.representation_id = 1
355        reader.mime_type = "video/mp4"
356
357        representation = Mock(id=1, mimeType="video/mp4", height=720)
358        segments = [Mock(url="init_segment"), Mock(url="first_segment"), Mock(url="second_segment")]
359        representation.segments.return_value = [segments[0]]
360        worker.mpd = Mock(dynamic=False,
361                          publishTime=1,
362                          periods=[
363                              Mock(adaptationSets=[
364                                  Mock(contentProtection=None,
365                                       representations=[
366                                           representation
367                                       ])
368                              ])
369                          ])
370        worker.mpd.type = "static"
371        for duration in (0, 204.32):
372            worker.mpd.minimumUpdatePeriod.total_seconds.return_value = 0
373            worker.mpd.periods[0].duration.total_seconds.return_value = duration
374
375            representation.segments.return_value = segments
376            self.assertSequenceEqual(list(worker.iter_segments()), segments)
377            representation.segments.assert_called_with(init=True)
378            sleep.assert_called_with(5)
379
380    @patch("streamlink.stream.dash_manifest.time.sleep")
381    def test_duplicate_rep_id(self, sleep):
382        representation_vid = Mock(id=1, mimeType="video/mp4", height=720)
383        representation_aud = Mock(id=1, mimeType="audio/aac", lang='en')
384
385        mpd = Mock(dynamic=False,
386                   publishTime=1,
387                   periods=[
388                       Mock(adaptationSets=[
389                           Mock(contentProtection=None,
390                                representations=[
391                                    representation_vid
392                                ]),
393                           Mock(contentProtection=None,
394                                representations=[
395                                    representation_aud
396                                ])
397                       ])
398                   ])
399
400        self.assertEqual(representation_vid, DASHStreamWorker.get_representation(mpd, 1, "video/mp4"))
401        self.assertEqual(representation_aud, DASHStreamWorker.get_representation(mpd, 1, "audio/aac"))
402