"""Streamlink plugin that resolves playable streams for vk.com video pages."""
import logging
import re
from html import unescape as html_unescape
from urllib.parse import unquote, urlparse

from streamlink.plugin import Plugin
from streamlink.plugin.api import useragents
from streamlink.plugin.api.utils import itertags
from streamlink.stream import HLSStream, HTTPStream
from streamlink.utils import update_scheme

log = logging.getLogger(__name__)


class VK(Plugin):
    """Plugin for vk.com video pages and 'videos' catalog links."""

    # Endpoint that is POSTed to in order to obtain the inline player markup.
    API_URL = 'https://vk.com/al_video.php'

    # Direct video page URL; captures the '<owner>_<id>' pair as ``video_id``.
    _url_re = re.compile(r'''https?://(?:\w+\.)?vk\.com/video
        (?:\?z=video)?(?P<video_id>-?[0-9]*_[0-9]*)
        ''', re.VERBOSE)
    # Catalog URL, which may carry the real video path in its ``z`` parameter.
    _url_catalog_re = re.compile(r"https?://(\w+\.)?vk\.com/videos-?[0-9]*")
    # Picks the quality number out of MP4 URLs such as '...foo.720.mp4'.
    _vod_quality_re = re.compile(r"\.([0-9]*?)\.mp4")

    @classmethod
    def can_handle_url(cls, url):
        """Return True if ``url`` is, or redirects to, a vk.com video page.

        Catalog URLs are first resolved through :meth:`follow_vk_redirect`;
        the result is then checked against the direct video URL pattern.
        """
        if cls._url_catalog_re.match(url) is not None:
            # follow_vk_redirect() always returns a string, so no None check
            # is needed on its result.
            url = cls.follow_vk_redirect(url)
        return cls._url_re.match(url) is not None

    @classmethod
    def follow_vk_redirect(cls, url):
        """Resolve a 'videos' catalog URL to the video URL it points at.

        If the path of ``url`` starts with '/videos' and the query string has
        a ``z`` parameter, the part of its URL-decoded value before the first
        '/' is used as the path of the returned URL (same scheme and host).

        :param url: the URL to inspect
        :return: the resolved video URL, or ``url`` unchanged when it is not
                 a catalog URL or carries no ``z=<value>`` parameter
        """
        parsed_url = urlparse(url)
        if not parsed_url.path.startswith('/videos'):
            return url

        # Parse the query by hand so the value keeps its raw form until
        # unquote() below. partition() copes with a bare 'z' (no '=') and
        # with '=' inside the value, both of which broke a plain split('=').
        target = None
        for pair in parsed_url.query.split('&'):
            key, sep, value = pair.partition('=')
            if key == 'z' and sep:
                # The last occurrence wins when 'z' is repeated.
                target = value

        if target is None:
            # No redirect found in query string,
            # so return the catalog url and fail later
            return url

        true_path = unquote(target).split('/')[0]
        return parsed_url.scheme + '://' + parsed_url.netloc + '/' + true_path

    def _get_streams(self):
        """Find the streams for a vk.com video.

        Posts the video ID to :attr:`API_URL` and scans the response text:
        every ``<iframe>`` with a ``src`` is handed back to the session to be
        resolved, and every ``<source>`` tag is turned into HLS or HTTP
        streams according to its ``type`` attribute.

        :return: a generator of ``(name, stream)`` pairs; yields nothing when
                 the URL does not contain a video ID
        """
        self.session.http.headers.update({'User-Agent': useragents.IPHONE_6})

        # If this is a 'videos' catalog URL
        # with a video ID in the GET request, get that instead
        url = self.follow_vk_redirect(self.url)

        m = self._url_re.match(url)
        if not m:
            log.error('URL is not compatible: {0}'.format(url))
            return

        video_id = m.group('video_id')
        log.debug('video ID: {0}'.format(video_id))

        params = {
            'act': 'show_inline',
            'al': '1',
            'video': video_id,
        }
        res = self.session.http.post(self.API_URL, params=params)

        # Embedded players: let the session resolve the iframe target.
        for tag in itertags(res.text, 'iframe'):
            if tag.attributes.get('src'):
                iframe_url = update_scheme(self.url, tag.attributes['src'])
                log.debug('Found iframe: {0}'.format(iframe_url))
                yield from self.session.streams(iframe_url).items()

        # Backslashes are stripped from the response before looking for
        # <source> tags.
        for tag in itertags(res.text.replace('\\', ''), 'source'):
            src = tag.attributes.get('src')
            if not src:
                # A <source> without a URL cannot be played; skip it rather
                # than fail with a KeyError.
                continue

            mime = tag.attributes.get('type')
            if mime == 'application/vnd.apple.mpegurl':
                video_url = html_unescape(src)
                streams = HLSStream.parse_variant_playlist(self.session,
                                                           video_url)
                if not streams:
                    # No variants were parsed: expose the URL itself.
                    yield 'live', HLSStream(self.session, video_url)
                else:
                    yield from streams.items()
            elif mime == 'video/mp4':
                q = 'vod'
                video_url = src
                quality = self._vod_quality_re.search(video_url)
                if quality:
                    q = '{0}p'.format(quality.group(1))
                yield q, HTTPStream(self.session, video_url)


__plugin__ = VK