1import base64
2import hmac
3import logging
4import os.path
5import random
6import re
7import string
8from binascii import unhexlify
9from collections import namedtuple
10from copy import deepcopy
11from hashlib import sha256
12from io import BytesIO
13from math import ceil
14from urllib.parse import parse_qsl, urljoin, urlparse, urlunparse
15
16from streamlink.cache import Cache
17from streamlink.exceptions import PluginError, StreamError
18from streamlink.packages.flashmedia import F4V, F4VError
19from streamlink.packages.flashmedia.box import Box
20from streamlink.packages.flashmedia.tag import ScriptData, TAG_TYPE_SCRIPT, Tag
21from streamlink.stream.flvconcat import FLVTagConcat
22from streamlink.stream.segmented import (SegmentedStreamReader, SegmentedStreamWorker, SegmentedStreamWriter)
23from streamlink.stream.stream import Stream
24from streamlink.stream.wrappers import StreamIOIterWrapper
25from streamlink.utils import absolute_url, swfdecompress
26
log = logging.getLogger(__name__)

# Akamai HD player verification key. It is the HMAC-SHA256 key used when
# building the "pvtoken" query parameter (see HDSStream._pv_params).
AKAMAIHD_PV_KEY = unhexlify(
    b"BD938D5EE6D9F42016F9C56577B6FDCF415FE4B184932B785AB32BCADC9BB592")

# Some streams hosted by Akamai seem to require a hdcore parameter
# to function properly. It is sent as the "hdcore" query parameter.
HDCORE_VERSION = "3.1.0"

# Fragment URL format. The identifier and quality fields are currently always
# filled with empty strings by the stream worker.
FRAGMENT_URL = "{url}{identifier}{quality}Seg{segment}-Frag{fragment}"

# A single downloadable fragment: segment number, fragment number,
# duration in milliseconds and the absolute fragment URL.
Fragment = namedtuple("Fragment", "segment fragment duration url")
43
44
class HDSStreamWriter(SegmentedStreamWriter):
    """Downloads HDS fragments and writes their FLV tags to the reader buffer.

    Fragment payloads (the ``mdat`` box of each F4V fragment) are fed through
    a :class:`FLVTagConcat` so the output is one continuous FLV stream.
    """

    def __init__(self, reader, *args, **kwargs):
        options = reader.stream.session.options
        kwargs["retries"] = options.get("hds-segment-attempts")
        kwargs["threads"] = options.get("hds-segment-threads")
        kwargs["timeout"] = options.get("hds-segment-timeout")
        SegmentedStreamWriter.__init__(self, reader, *args, **kwargs)

        # Prepend the manifest metadata (if any) as an FLV script tag
        duration, tags = None, []
        if self.stream.metadata:
            duration = self.stream.metadata.value.get("duration")
            tags = [Tag(TAG_TYPE_SCRIPT, timestamp=0,
                        data=self.stream.metadata)]

        self.concater = FLVTagConcat(tags=tags,
                                     duration=duration,
                                     flatten_timestamps=True)

    def fetch(self, fragment, retries=None):
        """Request a fragment, retrying on :class:`StreamError`.

        :param fragment: The :data:`Fragment` to download.
        :param retries: Number of attempts; a falsy value means no attempt.
        :returns: The streamed HTTP response, or ``None`` if the writer was
                  closed or every attempt failed.
        """
        while retries and not self.closed:
            # Copy the nested "params" dict as well: the "g" cache buster is
            # dropped for fragment requests only and must not be removed from
            # the stream's shared request params (used by other writer
            # threads and by the bootstrap requests).
            request_params = self.stream.request_params.copy()
            params = dict(request_params.pop("params", None) or {})
            params.pop("g", None)
            try:
                return self.session.http.get(fragment.url,
                                             stream=True,
                                             timeout=self.timeout,
                                             exception=StreamError,
                                             params=params,
                                             **request_params)
            except StreamError as err:
                log.error(f"Failed to open fragment {fragment.segment}-{fragment.fragment}: {err}")
                retries -= 1

        return None

    def write(self, fragment, res, chunk_size=8192):
        """Convert the fragment response body and always release the response."""
        try:
            fd = StreamIOIterWrapper(res.iter_content(chunk_size))
            self.convert_fragment(fragment, fd)
        finally:
            # Parsing stops at the mdat box, so the body may not be fully
            # consumed; close explicitly instead of waiting for GC.
            res.close()

    def convert_fragment(self, fragment, fd):
        """Extract the mdat payload of an F4V fragment and write its FLV tags.

        Parsing errors are logged and the fragment is skipped. An
        "Unknown tag type" error is treated as an encrypted stream and closes
        the writer.
        """
        mdat = None
        try:
            f4v = F4V(fd, raw_payload=True)
            # Fast forward to mdat box
            for box in f4v:
                if box.type == "mdat":
                    mdat = box.payload.data
                    break
        except F4VError as err:
            log.error(f"Failed to parse fragment {fragment.segment}-{fragment.fragment}: {err}")
            return

        if not mdat:
            log.error(f"No MDAT box found in fragment {fragment.segment}-{fragment.fragment}")
            return

        try:
            for chunk in self.concater.iter_chunks(buf=mdat, skip_header=True):
                self.reader.buffer.write(chunk)

                if self.closed:
                    break
            else:
                log.debug(f"Download of fragment {fragment.segment}-{fragment.fragment} complete")
        except OSError as err:
            if "Unknown tag type" in str(err):
                log.error("Unknown tag type found, this stream is probably encrypted")
                self.close()
                return

            log.error(f"Error reading fragment {fragment.segment}-{fragment.fragment}: {err}")
117
118
class HDSStreamWorker(SegmentedStreamWorker):
    """Tracks the HDS bootstrap and yields the fragments to download.

    The bootstrap is either a :class:`Box` or a URL pointing to one. It is
    reloaded periodically, and fragments newly listed in its fragment run
    table are yielded as :data:`Fragment` tuples by :meth:`iter_segments`.
    """

    def __init__(self, *args, **kwargs):
        SegmentedStreamWorker.__init__(self, *args, **kwargs)

        self.bootstrap = self.stream.bootstrap
        self.current_segment = -1
        self.current_fragment = -1
        self.first_fragment = 1
        self.last_fragment = -1
        self.end_fragment = None

        self.bootstrap_minimal_reload_time = 2.0
        self.bootstrap_reload_time = self.bootstrap_minimal_reload_time
        self.invalid_fragments = set()
        self.live_edge = self.session.options.get("hds-live-edge")

        self.update_bootstrap()

    def update_bootstrap(self):
        """(Re)load the bootstrap and refresh the fragment bookkeeping.

        On the first call the starting fragment is chosen. For live streams
        it lies ``hds-live-edge`` seconds behind the newest fragment;
        otherwise it is the first fragment. The reload interval is set to one
        fragment duration, halved for live streams whose bootstrap did not
        change, and never below ``bootstrap_minimal_reload_time``.

        :raises StreamError: if fetching a bootstrap URL fails.
        """
        log.debug("Updating bootstrap")

        if isinstance(self.bootstrap, Box):
            bootstrap = self.bootstrap
        else:
            bootstrap = self.fetch_bootstrap(self.bootstrap)

        self.live = bootstrap.payload.live
        self.profile = bootstrap.payload.profile
        self.timestamp = bootstrap.payload.current_media_time
        self.identifier = bootstrap.payload.movie_identifier
        self.time_scale = bootstrap.payload.time_scale
        self.segmentruntable = bootstrap.payload.segment_run_table_entries[0]
        self.fragmentruntable = bootstrap.payload.fragment_run_table_entries[0]

        self.first_fragment, last_fragment = self.fragment_count()
        fragment_duration = self.fragment_duration(last_fragment)

        if last_fragment != self.last_fragment:
            bootstrap_changed = True
            self.last_fragment = last_fragment
        else:
            bootstrap_changed = False

        if self.current_fragment < 0:
            if self.live:
                current_fragment = last_fragment

                # Less likely to hit edge if we don't start with last fragment,
                # default buffer is 10 sec.
                if fragment_duration > 0:
                    fragment_buffer = ceil(self.live_edge / fragment_duration)
                else:
                    # No usable fragment duration in the run table (e.g. only
                    # discontinuity entries): avoid a division by zero and
                    # start at the newest fragment.
                    fragment_buffer = 1
                current_fragment = max(self.first_fragment,
                                       current_fragment - (fragment_buffer - 1))

                log.debug(f"Live edge buffer {self.live_edge} sec is {fragment_buffer} fragments")

                # Make sure we don't have a duration set when it's a
                # live stream since it will just confuse players anyway.
                self.writer.concater.duration = None
            else:
                current_fragment = self.first_fragment

            self.current_fragment = current_fragment

        log.debug(f"Current timestamp: {self.timestamp / self.time_scale}")
        log.debug(f"Current segment: {self.current_segment}")
        log.debug(f"Current fragment: {self.current_fragment}")
        log.debug(f"First fragment: {self.first_fragment}")
        log.debug(f"Last fragment: {self.last_fragment}")
        log.debug(f"End fragment: {self.end_fragment}")

        self.bootstrap_reload_time = fragment_duration

        if self.live and not bootstrap_changed:
            log.debug("Bootstrap not changed, shortening timer")
            self.bootstrap_reload_time /= 2

        self.bootstrap_reload_time = max(self.bootstrap_reload_time,
                                         self.bootstrap_minimal_reload_time)

    def fetch_bootstrap(self, url):
        """Download and deserialize the bootstrap :class:`Box` at ``url``."""
        res = self.session.http.get(url,
                                    exception=StreamError,
                                    **self.stream.request_params)
        return Box.deserialize(BytesIO(res.content))

    def fragment_url(self, segment, fragment):
        """Build the absolute URL of a fragment from the stream's base URL."""
        url = absolute_url(self.stream.baseurl, self.stream.url)
        return FRAGMENT_URL.format(url=url,
                                   segment=segment,
                                   fragment=fragment,
                                   identifier="",
                                   quality="")

    def fragment_count(self):
        """Return ``(first_fragment, end_fragment)`` from the fragment run table.

        The end fragment is extrapolated from the current media time. Scanning
        stops at an end-of-stream discontinuity (indicator 0), and other
        discontinuity entries are skipped. Both values default to 1.
        """
        table = self.fragmentruntable.payload.fragment_run_entry_table
        first_fragment, end_fragment = None, None

        for fragmentrun in table:
            if fragmentrun.discontinuity_indicator is not None:
                if fragmentrun.discontinuity_indicator == 0:
                    break
                elif fragmentrun.discontinuity_indicator > 0:
                    continue

            if first_fragment is None:
                first_fragment = fragmentrun.first_fragment

            end_fragment = fragmentrun.first_fragment
            run_end = fragmentrun.first_fragment_timestamp + fragmentrun.fragment_duration

            # Fragments past the run's first one are not listed individually;
            # estimate how many fit between the run start and "now".
            if self.timestamp > run_end:
                offset = (self.timestamp - run_end) / fragmentrun.fragment_duration
                end_fragment += int(offset)

        if first_fragment is None:
            first_fragment = 1

        if end_fragment is None:
            end_fragment = 1

        return first_fragment, end_fragment

    def fragment_duration(self, fragment):
        """Return the duration of ``fragment`` (run duration / time scale).

        The result is 0 if no fragment run covers ``fragment``. As a side
        effect, the first fragment of every discontinuity entry is recorded in
        ``invalid_fragments``, and ``end_fragment`` is set when an
        end-of-stream marker (indicator 0) follows another entry.
        """
        fragment_duration = 0
        table = self.fragmentruntable.payload.fragment_run_entry_table
        time_scale = self.fragmentruntable.payload.time_scale

        for i, fragmentrun in enumerate(table):
            if fragmentrun.discontinuity_indicator is not None:
                self.invalid_fragments.add(fragmentrun.first_fragment)

                # Check for the last fragment of the stream
                if fragmentrun.discontinuity_indicator == 0:
                    if i > 0:
                        prev = table[i - 1]
                        self.end_fragment = prev.first_fragment

                    break
                elif fragmentrun.discontinuity_indicator > 0:
                    continue

            if fragment >= fragmentrun.first_fragment:
                fragment_duration = fragmentrun.fragment_duration / time_scale

        return fragment_duration

    def segment_from_fragment(self, fragment):
        """Return the segment number containing ``fragment`` (1 if not found)."""
        table = self.segmentruntable.payload.segment_run_entry_table

        for segment, start, end in self.iter_segment_table(table):
            if start - 1 <= fragment <= end:
                return segment

        return 1

    def iter_segment_table(self, table):
        """Yield ``(segment, start_fragment, end_fragment)`` for each segment run."""
        # If the first segment in the table starts at the beginning we
        # can go from there, otherwise we start from the end and use the
        # total fragment count to figure out where the last segment ends.
        if table[0].first_segment == 1:
            prev_frag = self.first_fragment - 1
            for segmentrun in table:
                start = prev_frag + 1
                end = prev_frag + segmentrun.fragments_per_segment

                yield segmentrun.first_segment, start, end
                prev_frag = end
        else:
            prev_frag = self.last_fragment + 1
            for segmentrun in reversed(table):
                start = prev_frag - segmentrun.fragments_per_segment
                end = prev_frag - 1

                yield segmentrun.first_segment, start, end
                prev_frag = start

    def valid_fragment(self, fragment):
        """Return ``True`` unless ``fragment`` was marked as a discontinuity."""
        return fragment not in self.invalid_fragments

    def iter_segments(self):
        """Yield :data:`Fragment` tuples until closed or the stream ends.

        After every pass over the known fragments, wait for the reload
        interval and refresh the bootstrap. Bootstrap errors are logged and
        do not stop the loop.
        """
        while not self.closed:
            fragments = range(self.current_fragment, self.last_fragment + 1)
            fragments = filter(self.valid_fragment, fragments)

            for fragment in fragments:
                self.current_fragment = fragment + 1
                self.current_segment = self.segment_from_fragment(fragment)

                fragment_duration = int(self.fragment_duration(fragment) * 1000)
                fragment_url = self.fragment_url(self.current_segment, fragment)
                fragment = Fragment(self.current_segment, fragment,
                                    fragment_duration, fragment_url)

                log.debug(f"Adding fragment {fragment.segment}-{fragment.fragment} to queue")
                yield fragment

                # End of stream
                stream_end = self.end_fragment and fragment.fragment >= self.end_fragment
                if self.closed or stream_end:
                    return

            if self.wait(self.bootstrap_reload_time):
                try:
                    self.update_bootstrap()
                except StreamError as err:
                    log.warning(f"Failed to update bootstrap: {err}")
327
328
class HDSStreamReader(SegmentedStreamReader):
    """Segmented reader that pairs the HDS worker with the HDS writer."""

    __worker__ = HDSStreamWorker
    __writer__ = HDSStreamWriter

    def __init__(self, stream, *args, **kwargs):
        super().__init__(stream, *args, **kwargs)
335
336
class HDSStream(Stream):
    """
    Implements the Adobe HTTP Dynamic Streaming protocol

    *Attributes:*

    - :attr:`baseurl` Base URL
    - :attr:`url` Base path of the stream, joined with the base URL when
      fetching fragments
    - :attr:`bootstrap` Either a URL pointing to the bootstrap or a
      bootstrap :class:`Box` object used for initial information about
      the stream
    - :attr:`metadata` Either `None` or a :class:`ScriptData` object
      that contains metadata about the stream, such as height, width and
      bitrate
    - :attr:`request_params` Deep copy of the extra HTTP request keyword
      arguments; any query string of :attr:`url` is moved into
      ``request_params["params"]``
    """

    __shortname__ = "hds"

    def __init__(self, session, baseurl, url, bootstrap, metadata=None,
                 timeout=60, **request_params):
        Stream.__init__(self, session)

        self.baseurl = baseurl
        self.url = url
        self.bootstrap = bootstrap
        self.metadata = metadata
        self.timeout = timeout

        # Deep copy request params to make it mutable
        self.request_params = deepcopy(request_params)

        # Move the query string of the media URL into the request params so
        # fragment URLs can be built from the bare path.
        parsed = urlparse(self.url)
        if parsed.query:
            params = parse_qsl(parsed.query)
            if params:
                if not self.request_params.get("params"):
                    self.request_params["params"] = {}

                self.request_params["params"].update(params)

        self.url = urlunparse(
            (parsed.scheme, parsed.netloc, parsed.path, None, None, None)
        )

    def __repr__(self):
        return ("<HDSStream({0!r}, {1!r}, {2!r},"
                " metadata={3!r}, timeout={4!r})>").format(self.baseurl,
                                                           self.url,
                                                           self.bootstrap,
                                                           self.metadata,
                                                           self.timeout)

    def __json__(self):
        if isinstance(self.bootstrap, Box):
            # b64encode() returns bytes; decode so the value is a plain str
            # that any JSON encoder can serialize.
            bootstrap = base64.b64encode(self.bootstrap.serialize()).decode("ascii")
        else:
            bootstrap = self.bootstrap

        if isinstance(self.metadata, ScriptData):
            metadata = self.metadata.__dict__
        else:
            metadata = self.metadata

        return dict(type=HDSStream.shortname(), baseurl=self.baseurl,
                    url=self.url, bootstrap=bootstrap, metadata=metadata,
                    params=self.request_params.get("params", {}),
                    headers=self.request_params.get("headers", {}))

    def open(self):
        """Create and open an :class:`HDSStreamReader` for this stream."""
        reader = HDSStreamReader(self)
        reader.open()
        return reader

    @classmethod
    def parse_manifest(cls, session, url, timeout=60, pvswf=None, is_akamai=False,
                       **request_params):
        """Parses a HDS manifest and returns its substreams.

        :param url: The URL to the manifest.
        :param timeout: How long to wait for data to be returned from
                        the stream before raising an error.
        :param is_akamai: force adding of the akamai parameters
        :param pvswf: URL of player SWF for Akamai HD player verification.
                      It is also used for child manifests referenced via
                      ``href``.
        :param request_params: Extra keyword arguments for the HTTP requests.
                               They are copied, so the caller's ``params``
                               and ``headers`` dicts are never modified.
        :returns: A dict mapping quality names to :class:`HDSStream` objects.
        :raises OSError: if the manifest cannot be fetched or parsed, or if it
                         requires player verification but ``pvswf`` is not set.
        """
        # private argument, should only be used in recursive calls
        raise_for_drm = request_params.pop("raise_for_drm", False)

        # Work on a private copy: the nested params dict is modified below
        # (hdcore, cache buster, pvtoken) and must not leak back to the caller.
        request_params = deepcopy(request_params)
        request_params["headers"] = request_params.get("headers", {})
        request_params["params"] = request_params.get("params", {})

        # These params are reserved for internal use
        for reserved in ("exception", "stream", "timeout", "url"):
            request_params.pop(reserved, None)

        if "akamaihd" in url or is_akamai:
            request_params["params"]["hdcore"] = HDCORE_VERSION
            request_params["params"]["g"] = cls.cache_buster_string(12)

        res = session.http.get(url, exception=IOError, **request_params)
        manifest = session.http.xml(res, "manifest XML", ignore_ns=True,
                                    exception=IOError)

        if manifest.findtext("drmAdditionalHeader"):
            log.debug(f"Omitting HDS stream protected by DRM: {url}")
            if raise_for_drm:
                raise PluginError("{} is protected by DRM".format(url))
            log.warning("Some or all streams are unavailable as they are protected by DRM")
            return {}

        baseurl = manifest.findtext("baseURL")
        baseheight = manifest.findtext("height")
        bootstraps = {}
        streams = {}

        if not baseurl:
            baseurl = urljoin(url, os.path.dirname(urlparse(url).path))

        if not baseurl.endswith("/"):
            baseurl += "/"

        # Bootstraps are either referenced by URL or embedded as base64 data
        for bootstrap_info in manifest.findall("bootstrapInfo"):
            bootstrap_name = bootstrap_info.attrib.get("id") or "_global"
            bootstrap_url = bootstrap_info.attrib.get("url")

            if bootstrap_url:
                box = absolute_url(baseurl, bootstrap_url)
            else:
                data = base64.b64decode(bytes(bootstrap_info.text, "utf8"))
                box = Box.deserialize(BytesIO(data))

            bootstraps[bootstrap_name] = box

        pvtoken = manifest.findtext("pv-2.0")
        if pvtoken:
            if not pvswf:
                raise OSError("This manifest requires the 'pvswf' parameter "
                              "to verify the SWF")

            params = cls._pv_params(session, pvswf, pvtoken, **request_params)
            request_params["params"].update(params)

        child_drm = False

        for media in manifest.findall("media"):
            media_url = media.attrib.get("url")
            bootstrapid = media.attrib.get("bootstrapInfoId", "_global")
            href = media.attrib.get("href")

            if media_url and bootstrapid:
                bootstrap = bootstraps.get(bootstrapid)

                if not bootstrap:
                    continue

                bitrate = media.attrib.get("bitrate")
                streamid = media.attrib.get("streamId")
                height = media.attrib.get("height")

                if height:
                    quality = height + "p"
                elif bitrate:
                    quality = bitrate + "k"
                elif streamid:
                    quality = streamid
                elif baseheight:
                    quality = baseheight + "p"
                else:
                    quality = "live"

                metadata = media.findtext("metadata")

                if metadata:
                    metadata = base64.b64decode(bytes(metadata, "utf8"))
                    metadata = ScriptData.deserialize(BytesIO(metadata))
                else:
                    metadata = None

                stream = HDSStream(session, baseurl, media_url, bootstrap,
                                   metadata=metadata, timeout=timeout,
                                   **request_params)
                streams[quality] = stream

            elif href:
                child_url = absolute_url(baseurl, href)
                try:
                    # Pass pvswf along: a child manifest may require player
                    # verification too.
                    child_streams = cls.parse_manifest(session, child_url,
                                                       timeout=timeout,
                                                       pvswf=pvswf,
                                                       is_akamai=is_akamai,
                                                       raise_for_drm=True,
                                                       **request_params)
                except PluginError:
                    child_drm = True
                    child_streams = {}

                # Override stream name if bitrate is available in parent
                # manifest but not the child one.
                bitrate = media.attrib.get("bitrate")
                for name, stream in child_streams.items():
                    if bitrate and not re.match(r"^(\d+)k$", name):
                        name = bitrate + "k"

                    streams[name] = stream

        if child_drm:
            log.warning("Some or all streams are unavailable as they are protected by DRM")

        return streams

    @classmethod
    def _pv_params(cls, session, pvswf, pv, **request_params):
        """Returns any parameters needed for Akamai HD player verification.

        Algorithm originally documented by KSV, source:
        http://stream-recorder.com/forum/showpost.php?p=43761&postcount=13

        :param pvswf: URL of the player SWF whose hash is signed.
        :param pv: Text of the manifest's ``pv-2.0`` element, either
                   ``data`` or ``data;hdntl``.
        :returns: A list of ``(name, value)`` query parameter pairs.
        """

        try:
            data, hdntl = pv.split(";")
        except ValueError:
            data = pv
            hdntl = ""

        cache = Cache(filename="stream.json")
        key = "akamaihd-player:" + pvswf
        cached = cache.get(key)

        request_params = deepcopy(request_params)
        headers = request_params.pop("headers", {})
        if cached:
            # Conditional request: a 304 lets us reuse the cached hash
            headers["If-Modified-Since"] = cached["modified"]
        swf = session.http.get(pvswf, headers=headers, **request_params)

        if cached and swf.status_code == 304:  # Server says not modified
            swf_hash = cached["hash"]
        else:
            # Calculate SHA-256 hash of the uncompressed SWF file, base-64
            # encoded
            digest = sha256(swfdecompress(swf.content)).digest()
            swf_hash = base64.b64encode(digest).decode("ascii")
            modified = swf.headers.get("Last-Modified", "")

            # Only save in cache if a valid date is given; an empty value
            # would only produce useless conditional requests later.
            if modified and len(modified) < 40:
                cache.set(key, dict(hash=swf_hash, modified=modified))

        msg = "st=0~exp=9999999999~acl=*~data={0}!{1}".format(data, swf_hash)
        auth = hmac.new(AKAMAIHD_PV_KEY, msg.encode("ascii"), sha256)
        pvtoken = "{0}~hmac={1}".format(msg, auth.hexdigest())

        # The "hdntl" parameter can be accepted as a cookie or passed in the
        # query string, but the "pvtoken" parameter can only be in the query
        # string
        params = [("pvtoken", pvtoken)]
        params.extend(parse_qsl(hdntl, keep_blank_values=True))

        return params

    @staticmethod
    def cache_buster_string(length):
        """Return ``length`` random uppercase ASCII letters (not for security)."""
        return "".join(random.choice(string.ascii_uppercase) for _ in range(length))
605