import base64
import hmac
import logging
import os.path
import random
import re
import string
from binascii import unhexlify
from collections import namedtuple
from copy import deepcopy
from hashlib import sha256
from io import BytesIO
from math import ceil
from urllib.parse import parse_qsl, urljoin, urlparse, urlunparse

from streamlink.cache import Cache
from streamlink.exceptions import PluginError, StreamError
from streamlink.packages.flashmedia import F4V, F4VError
from streamlink.packages.flashmedia.box import Box
from streamlink.packages.flashmedia.tag import ScriptData, TAG_TYPE_SCRIPT, Tag
from streamlink.stream.flvconcat import FLVTagConcat
from streamlink.stream.segmented import (SegmentedStreamReader, SegmentedStreamWorker, SegmentedStreamWriter)
from streamlink.stream.stream import Stream
from streamlink.stream.wrappers import StreamIOIterWrapper
from streamlink.utils import absolute_url, swfdecompress

log = logging.getLogger(__name__)
# Akamai HD player verification key
# Use unhexlify() rather than bytes.fromhex() for compatibility with before
# Python 3. However, in Python 3.2 (not 3.3+), unhexlify only accepts a byte
# string.
AKAMAIHD_PV_KEY = unhexlify(
    b"BD938D5EE6D9F42016F9C56577B6FDCF415FE4B184932B785AB32BCADC9BB592")

# Some streams hosted by Akamai seem to require a hdcore parameter
# to function properly.
HDCORE_VERSION = "3.1.0"

# Fragment URL format
FRAGMENT_URL = "{url}{identifier}{quality}Seg{segment}-Frag{fragment}"

# A single downloadable piece of the stream: segment/fragment indices,
# fragment duration in milliseconds, and the fully-built fragment URL.
Fragment = namedtuple("Fragment", "segment fragment duration url")


class HDSStreamWriter(SegmentedStreamWriter):
    """Downloads F4V fragments and concatenates their FLV payloads
    into the reader's buffer.
    """

    def __init__(self, reader, *args, **kwargs):
        # Pull retry/thread/timeout behavior from the session's HDS options.
        options = reader.stream.session.options
        kwargs["retries"] = options.get("hds-segment-attempts")
        kwargs["threads"] = options.get("hds-segment-threads")
        kwargs["timeout"] = options.get("hds-segment-timeout")
        SegmentedStreamWriter.__init__(self, reader, *args, **kwargs)

        # If the stream carries onMetaData, emit it as a leading script tag
        # and use its duration for the concatenated output.
        duration, tags = None, []
        if self.stream.metadata:
            duration = self.stream.metadata.value.get("duration")
            tags = [Tag(TAG_TYPE_SCRIPT, timestamp=0,
                        data=self.stream.metadata)]

        self.concater = FLVTagConcat(tags=tags,
                                     duration=duration,
                                     flatten_timestamps=True)

    def fetch(self, fragment, retries=None):
        """Request a single fragment, retrying recursively on StreamError.

        Returns the streamed HTTP response, or None when closed or out
        of retries.
        """
        if self.closed or not retries:
            return

        try:
            # Copy so the shared request params are not mutated; the "g"
            # cache-buster param (added by parse_manifest) is dropped for
            # fragment requests.
            request_params = self.stream.request_params.copy()
            params = request_params.pop("params", {})
            params.pop("g", None)
            return self.session.http.get(fragment.url,
                                         stream=True,
                                         timeout=self.timeout,
                                         exception=StreamError,
                                         params=params,
                                         **request_params)
        except StreamError as err:
            log.error(f"Failed to open fragment {fragment.segment}-{fragment.fragment}: {err}")
            return self.fetch(fragment, retries - 1)

    def write(self, fragment, res, chunk_size=8192):
        """Stream the fragment response body and convert it to FLV output."""
        fd = StreamIOIterWrapper(res.iter_content(chunk_size))
        self.convert_fragment(fragment, fd)

    def convert_fragment(self, fragment, fd):
        """Extract the mdat (media data) box from an F4V fragment and feed
        its FLV tags through the concater into the reader buffer.
        """
        mdat = None
        try:
            f4v = F4V(fd, raw_payload=True)
            # Fast forward to mdat box
            for box in f4v:
                if box.type == "mdat":
                    mdat = box.payload.data
                    break
        except F4VError as err:
            log.error(f"Failed to parse fragment {fragment.segment}-{fragment.fragment}: {err}")
            return

        if not mdat:
            log.error(f"No MDAT box found in fragment {fragment.segment}-{fragment.fragment}")
            return

        try:
            for chunk in self.concater.iter_chunks(buf=mdat, skip_header=True):
                self.reader.buffer.write(chunk)

                if self.closed:
                    break
            else:
                # for/else: only log completion when the loop was not
                # interrupted by close().
                log.debug(f"Download of fragment {fragment.segment}-{fragment.fragment} complete")
        except OSError as err:
            # FLVTagConcat signals unparseable tag data via OSError; an
            # unknown tag type usually means DRM-encrypted content.
            if "Unknown tag type" in str(err):
                log.error("Unknown tag type found, this stream is probably encrypted")
                self.close()
                return

            log.error(f"Error reading fragment {fragment.segment}-{fragment.fragment}: {err}")


class HDSStreamWorker(SegmentedStreamWorker):
    """Walks the HDS bootstrap info and yields Fragments for the writer,
    periodically refreshing the bootstrap for live streams.
    """

    def __init__(self, *args, **kwargs):
        SegmentedStreamWorker.__init__(self, *args, **kwargs)

        self.bootstrap = self.stream.bootstrap
        # -1 sentinels mean "not yet initialized by update_bootstrap()".
        self.current_segment = -1
        self.current_fragment = -1
        self.first_fragment = 1
        self.last_fragment = -1
        self.end_fragment = None

        self.bootstrap_minimal_reload_time = 2.0
        self.bootstrap_reload_time = self.bootstrap_minimal_reload_time
        self.invalid_fragments = set()
        self.live_edge = self.session.options.get("hds-live-edge")

        self.update_bootstrap()

    def update_bootstrap(self):
        """Fetch/parse the bootstrap box and update fragment bookkeeping,
        adjusting the reload timer for live streams.
        """
        log.debug("Updating bootstrap")

        # The bootstrap can be given inline as a Box or as a URL to fetch.
        if isinstance(self.bootstrap, Box):
            bootstrap = self.bootstrap
        else:
            bootstrap = self.fetch_bootstrap(self.bootstrap)

        self.live = bootstrap.payload.live
        self.profile = bootstrap.payload.profile
        self.timestamp = bootstrap.payload.current_media_time
        self.identifier = bootstrap.payload.movie_identifier
        self.time_scale = bootstrap.payload.time_scale
        # Only the first run table of each kind is used.
        self.segmentruntable = bootstrap.payload.segment_run_table_entries[0]
        self.fragmentruntable = bootstrap.payload.fragment_run_table_entries[0]

        self.first_fragment, last_fragment = self.fragment_count()
        fragment_duration = self.fragment_duration(last_fragment)

        if last_fragment != self.last_fragment:
            bootstrap_changed = True
            self.last_fragment = last_fragment
        else:
            bootstrap_changed = False

        # Only pick a starting fragment on the first update (sentinel -1).
        if self.current_fragment < 0:
            if self.live:
                current_fragment = last_fragment

                # Less likely to hit edge if we don't start with last fragment,
                # default buffer is 10 sec.
                fragment_buffer = int(ceil(self.live_edge / fragment_duration))
                current_fragment = max(self.first_fragment,
                                       current_fragment - (fragment_buffer - 1))

                log.debug(f"Live edge buffer {self.live_edge} sec is {fragment_buffer} fragments")

                # Make sure we don't have a duration set when it's a
                # live stream since it will just confuse players anyway.
                self.writer.concater.duration = None
            else:
                current_fragment = self.first_fragment

            self.current_fragment = current_fragment

        log.debug(f"Current timestamp: {self.timestamp / self.time_scale}")
        log.debug(f"Current segment: {self.current_segment}")
        log.debug(f"Current fragment: {self.current_fragment}")
        log.debug(f"First fragment: {self.first_fragment}")
        log.debug(f"Last fragment: {self.last_fragment}")
        log.debug(f"End fragment: {self.end_fragment}")

        # Reload roughly once per fragment; halve the interval when a live
        # bootstrap did not advance, but never go below the minimum.
        self.bootstrap_reload_time = fragment_duration

        if self.live and not bootstrap_changed:
            log.debug("Bootstrap not changed, shortening timer")
            self.bootstrap_reload_time /= 2

        self.bootstrap_reload_time = max(self.bootstrap_reload_time,
                                         self.bootstrap_minimal_reload_time)

    def fetch_bootstrap(self, url):
        """Download and deserialize a bootstrap Box from *url*."""
        res = self.session.http.get(url,
                                    exception=StreamError,
                                    **self.stream.request_params)
        return Box.deserialize(BytesIO(res.content))

    def fragment_url(self, segment, fragment):
        """Build the URL for one Seg/Frag pair from the stream's base URL."""
        url = absolute_url(self.stream.baseurl, self.stream.url)
        return FRAGMENT_URL.format(url=url,
                                   segment=segment,
                                   fragment=fragment,
                                   identifier="",
                                   quality="")

    def fragment_count(self):
        """Derive (first_fragment, end_fragment) from the fragment run table.

        For the last run entry, extrapolates past the table using the
        current media time when it exceeds the entry's end timestamp.
        Both values default to 1 when the table yields nothing.
        """
        table = self.fragmentruntable.payload.fragment_run_entry_table
        first_fragment, end_fragment = None, None

        for i, fragmentrun in enumerate(table):
            # Discontinuity entries: 0 marks end of stream, >0 are skipped.
            if fragmentrun.discontinuity_indicator is not None:
                if fragmentrun.discontinuity_indicator == 0:
                    break
                elif fragmentrun.discontinuity_indicator > 0:
                    continue

            if first_fragment is None:
                first_fragment = fragmentrun.first_fragment

            end_fragment = fragmentrun.first_fragment
            fragment_duration = fragmentrun.first_fragment_timestamp + fragmentrun.fragment_duration

            if self.timestamp > fragment_duration:
                offset = (self.timestamp - fragment_duration) / fragmentrun.fragment_duration
                end_fragment += int(offset)

        if first_fragment is None:
            first_fragment = 1

        if end_fragment is None:
            end_fragment = 1

        return first_fragment, end_fragment

    def fragment_duration(self, fragment):
        """Return the duration (seconds) of *fragment* from the run table.

        Side effects: records discontinuity entries in invalid_fragments
        and sets end_fragment when an end-of-stream marker is found.
        """
        fragment_duration = 0
        table = self.fragmentruntable.payload.fragment_run_entry_table
        time_scale = self.fragmentruntable.payload.time_scale

        for i, fragmentrun in enumerate(table):
            if fragmentrun.discontinuity_indicator is not None:
                self.invalid_fragments.add(fragmentrun.first_fragment)

                # Check for the last fragment of the stream
                if fragmentrun.discontinuity_indicator == 0:
                    if i > 0:
                        prev = table[i - 1]
                        self.end_fragment = prev.first_fragment

                    break
                elif fragmentrun.discontinuity_indicator > 0:
                    continue

            # Later entries override earlier ones, so the duration of the
            # last run covering the fragment wins.
            if fragment >= fragmentrun.first_fragment:
                fragment_duration = fragmentrun.fragment_duration / time_scale

        return fragment_duration

    def segment_from_fragment(self, fragment):
        """Map a fragment number to its segment number via the segment
        run table; falls back to segment 1 when no range matches.
        """
        table = self.segmentruntable.payload.segment_run_entry_table

        for segment, start, end in self.iter_segment_table(table):
            if start - 1 <= fragment <= end:
                return segment
        else:
            # for/else: reached only when no range matched above.
            segment = 1

        return segment

    def iter_segment_table(self, table):
        """Yield (segment, start_fragment, end_fragment) for each run entry."""
        # If the first segment in the table starts at the beginning we
        # can go from there, otherwise we start from the end and use the
        # total fragment count to figure out where the last segment ends.
        if table[0].first_segment == 1:
            prev_frag = self.first_fragment - 1
            for segmentrun in table:
                start = prev_frag + 1
                end = prev_frag + segmentrun.fragments_per_segment

                yield segmentrun.first_segment, start, end
                prev_frag = end
        else:
            prev_frag = self.last_fragment + 1
            for segmentrun in reversed(table):
                start = prev_frag - segmentrun.fragments_per_segment
                end = prev_frag - 1

                yield segmentrun.first_segment, start, end
                prev_frag = start

    def valid_fragment(self, fragment):
        """True unless the fragment was flagged as a discontinuity entry."""
        return fragment not in self.invalid_fragments

    def iter_segments(self):
        """Yield Fragments to download, refreshing the bootstrap between
        passes until closed or the end fragment is reached.
        """
        while not self.closed:
            fragments = range(self.current_fragment, self.last_fragment + 1)
            fragments = filter(self.valid_fragment, fragments)

            for fragment in fragments:
                self.current_fragment = fragment + 1
                self.current_segment = self.segment_from_fragment(fragment)

                fragment_duration = int(self.fragment_duration(fragment) * 1000)
                fragment_url = self.fragment_url(self.current_segment, fragment)
                # Rebind the loop variable to the namedtuple that is yielded
                # and also checked for end-of-stream below.
                fragment = Fragment(self.current_segment, fragment,
                                    fragment_duration, fragment_url)

                log.debug(f"Adding fragment {fragment.segment}-{fragment.fragment} to queue")
                yield fragment

                # End of stream
                stream_end = self.end_fragment and fragment.fragment >= self.end_fragment
                if self.closed or stream_end:
                    return

            # wait() returns falsy when the worker was closed meanwhile.
            if self.wait(self.bootstrap_reload_time):
                try:
                    self.update_bootstrap()
                except StreamError as err:
                    log.warning(f"Failed to update bootstrap: {err}")


class HDSStreamReader(SegmentedStreamReader):
    """Segmented stream reader wired to the HDS worker and writer."""

    __worker__ = HDSStreamWorker
    __writer__ = HDSStreamWriter

    def __init__(self, stream, *args, **kwargs):
        SegmentedStreamReader.__init__(self, stream, *args, **kwargs)


class HDSStream(Stream):
    """
    Implements the Adobe HTTP Dynamic Streaming protocol

    *Attributes:*

    - :attr:`baseurl` Base URL
    - :attr:`url` Base path of the stream, joined with the base URL when
      fetching fragments
    - :attr:`bootstrap` Either a URL pointing to the bootstrap or a
      bootstrap :class:`Box` object used for initial information about
      the stream
    - :attr:`metadata` Either `None` or a :class:`ScriptData` object
      that contains metadata about the stream, such as height, width and
      bitrate
    """

    __shortname__ = "hds"

    def __init__(self, session, baseurl, url, bootstrap, metadata=None,
                 timeout=60, **request_params):
        Stream.__init__(self, session)

        self.baseurl = baseurl
        self.url = url
        self.bootstrap = bootstrap
        self.metadata = metadata
        self.timeout = timeout

        # Deep copy request params to make it mutable
        self.request_params = deepcopy(request_params)

        # Move any query string on the stream URL into request params so
        # fragment requests carry them, and strip it from the URL itself.
        parsed = urlparse(self.url)
        if parsed.query:
            params = parse_qsl(parsed.query)
            if params:
                if not self.request_params.get("params"):
                    self.request_params["params"] = {}

                self.request_params["params"].update(params)

            self.url = urlunparse(
                (parsed.scheme, parsed.netloc, parsed.path, None, None, None)
            )

    def __repr__(self):
        return ("<HDSStream({0!r}, {1!r}, {2!r},"
                " metadata={3!r}, timeout={4!r})>").format(self.baseurl,
                                                           self.url,
                                                           self.bootstrap,
                                                           self.metadata,
                                                           self.timeout)

    def __json__(self):
        """Return a dict describing this stream; inline bootstrap boxes are
        serialized to base64, metadata to its attribute dict.
        """
        if isinstance(self.bootstrap, Box):
            bootstrap = base64.b64encode(self.bootstrap.serialize())
        else:
            bootstrap = self.bootstrap

        if isinstance(self.metadata, ScriptData):
            metadata = self.metadata.__dict__
        else:
            metadata = self.metadata

        return dict(type=HDSStream.shortname(), baseurl=self.baseurl,
                    url=self.url, bootstrap=bootstrap, metadata=metadata,
                    params=self.request_params.get("params", {}),
                    headers=self.request_params.get("headers", {}))

    def open(self):
        """Open and return a reader for this stream."""
        reader = HDSStreamReader(self)
        reader.open()
        return reader

    @classmethod
    def parse_manifest(cls, session, url, timeout=60, pvswf=None, is_akamai=False,
                       **request_params):
        """Parses a HDS manifest and returns its substreams.

        :param url: The URL to the manifest.
        :param timeout: How long to wait for data to be returned from
                        the stream before raising an error.
        :param is_akamai: force adding of the akamai parameters
        :param pvswf: URL of player SWF for Akamai HD player verification.

        :returns: dict mapping quality name to :class:`HDSStream`; empty
                  when all streams are DRM protected.
        """
        # private argument, should only be used in recursive calls
        raise_for_drm = request_params.pop("raise_for_drm", False)

        if not request_params:
            request_params = {}

        request_params["headers"] = request_params.get("headers", {})
        request_params["params"] = request_params.get("params", {})

        # These params are reserved for internal use
        request_params.pop("exception", None)
        request_params.pop("stream", None)
        request_params.pop("timeout", None)
        request_params.pop("url", None)

        # Akamai-hosted streams expect hdcore plus a random cache-buster.
        if "akamaihd" in url or is_akamai:
            request_params["params"]["hdcore"] = HDCORE_VERSION
            request_params["params"]["g"] = cls.cache_buster_string(12)

        res = session.http.get(url, exception=IOError, **request_params)
        manifest = session.http.xml(res, "manifest XML", ignore_ns=True,
                                    exception=IOError)

        if manifest.findtext("drmAdditionalHeader"):
            log.debug(f"Omitting HDS stream protected by DRM: {url}")
            if raise_for_drm:
                raise PluginError("{} is protected by DRM".format(url))
            log.warning("Some or all streams are unavailable as they are protected by DRM")
            return {}

        parsed = urlparse(url)
        baseurl = manifest.findtext("baseURL")
        baseheight = manifest.findtext("height")
        bootstraps = {}
        streams = {}

        # Without an explicit <baseURL>, fragments are relative to the
        # manifest's directory.
        if not baseurl:
            baseurl = urljoin(url, os.path.dirname(parsed.path))

        if not baseurl.endswith("/"):
            baseurl += "/"

        for bootstrap in manifest.findall("bootstrapInfo"):
            name = bootstrap.attrib.get("id") or "_global"
            url = bootstrap.attrib.get("url")

            # A bootstrap is either referenced by URL or embedded inline
            # as base64-encoded box data.
            if url:
                box = absolute_url(baseurl, url)
            else:
                data = base64.b64decode(bytes(bootstrap.text, "utf8"))
                box = Box.deserialize(BytesIO(data))

            bootstraps[name] = box

        pvtoken = manifest.findtext("pv-2.0")
        if pvtoken:
            if not pvswf:
                raise OSError("This manifest requires the 'pvswf' parameter "
                              "to verify the SWF")

            params = cls._pv_params(session, pvswf, pvtoken, **request_params)
            request_params["params"].update(params)

        child_drm = False

        for media in manifest.findall("media"):
            url = media.attrib.get("url")
            bootstrapid = media.attrib.get("bootstrapInfoId", "_global")
            href = media.attrib.get("href")

            if url and bootstrapid:
                bootstrap = bootstraps.get(bootstrapid)

                if not bootstrap:
                    continue

                bitrate = media.attrib.get("bitrate")
                streamid = media.attrib.get("streamId")
                height = media.attrib.get("height")

                # Quality name preference: height > bitrate > streamId >
                # manifest-level height > "live".
                if height:
                    quality = height + "p"
                elif bitrate:
                    quality = bitrate + "k"
                elif streamid:
                    quality = streamid
                elif baseheight:
                    quality = baseheight + "p"
                else:
                    quality = "live"

                metadata = media.findtext("metadata")

                if metadata:
                    metadata = base64.b64decode(bytes(metadata, "utf8"))
                    metadata = ScriptData.deserialize(BytesIO(metadata))
                else:
                    metadata = None

                stream = HDSStream(session, baseurl, url, bootstrap,
                                   metadata=metadata, timeout=timeout,
                                   **request_params)
                streams[quality] = stream

            elif href:
                # Multi-level manifest: recurse into the child manifest and
                # collect its streams; DRM in a child is flagged, not fatal.
                url = absolute_url(baseurl, href)
                try:
                    child_streams = cls.parse_manifest(session, url,
                                                       timeout=timeout,
                                                       is_akamai=is_akamai,
                                                       raise_for_drm=True,
                                                       **request_params)
                except PluginError:
                    child_drm = True
                    child_streams = {}

                for name, stream in child_streams.items():
                    # Override stream name if bitrate is available in parent
                    # manifest but not the child one.
                    bitrate = media.attrib.get("bitrate")

                    if bitrate and not re.match(r"^(\d+)k$", name):
                        name = bitrate + "k"

                    streams[name] = stream
        if child_drm:
            log.warning("Some or all streams are unavailable as they are protected by DRM")

        return streams

    @classmethod
    def _pv_params(cls, session, pvswf, pv, **request_params):
        """Returns any parameters needed for Akamai HD player verification.

        Algorithm originally documented by KSV, source:
        http://stream-recorder.com/forum/showpost.php?p=43761&postcount=13

        :returns: list of (name, value) query parameters (pvtoken plus any
                  hdntl parameters).
        """

        # The pv-2.0 value is "data;hdntl..." — hdntl is optional.
        try:
            data, hdntl = pv.split(";")
        except ValueError:
            data = pv
            hdntl = ""

        # SWF hashes are cached keyed by SWF URL to avoid re-downloading.
        cache = Cache(filename="stream.json")
        key = "akamaihd-player:" + pvswf
        cached = cache.get(key)

        request_params = deepcopy(request_params)
        headers = request_params.pop("headers", {})
        if cached:
            headers["If-Modified-Since"] = cached["modified"]
        swf = session.http.get(pvswf, headers=headers, **request_params)

        if cached and swf.status_code == 304:  # Server says not modified
            hash = cached["hash"]
        else:
            # Calculate SHA-256 hash of the uncompressed SWF file, base-64
            # encoded
            hash = sha256()
            hash.update(swfdecompress(swf.content))
            hash = base64.b64encode(hash.digest()).decode("ascii")
            modified = swf.headers.get("Last-Modified", "")

            # Only save in cache if a valid date is given
            if len(modified) < 40:
                cache.set(key, dict(hash=hash, modified=modified))

        msg = "st=0~exp=9999999999~acl=*~data={0}!{1}".format(data, hash)
        auth = hmac.new(AKAMAIHD_PV_KEY, msg.encode("ascii"), sha256)
        pvtoken = "{0}~hmac={1}".format(msg, auth.hexdigest())

        # The "hdntl" parameter can be accepted as a cookie or passed in the
        # query string, but the "pvtoken" parameter can only be in the query
        # string
        params = [("pvtoken", pvtoken)]
        params.extend(parse_qsl(hdntl, keep_blank_values=True))

        return params

    @staticmethod
    def cache_buster_string(length):
        """Return *length* random uppercase letters for the 'g' parameter."""
        return "".join([random.choice(string.ascii_uppercase) for i in range(length)])