1import re 2import json 3import math 4import base64 5import logging 6from datetime import datetime, timedelta 7 8from argparse import ArgumentParser 9 10parser = ArgumentParser() 11parser.add_argument('-d', '--duration', choices=('any', 'short', 'medium', 'long')) 12parser.add_argument('-a', '--after') 13parser.add_argument('-l', '--live', nargs="?", const=True) 14parser.add_argument('-c', '--category', nargs=1) 15parser.add_argument('search', nargs='+') 16 17import pafy 18 19from .. import g, c, screen, config, util, content, listview, contentquery 20from ..playlist import Video, Playlist 21from . import command 22from .songlist import plist, paginatesongs 23 24 25ISO8601_TIMEDUR_EX = re.compile(r'PT((\d{1,3})H)?((\d{1,3})M)?((\d{1,2})S)?') 26 27DAYS = dict(day = 1, 28 week = 7, 29 month = 30, 30 year = 365) 31 32 33def _search(progtext, qs=None, msg=None, failmsg=None): 34 """ Perform memoized url fetch, display progtext. """ 35 36 loadmsg = "Searching for '%s%s%s'" % (c.y, progtext, c.w) 37 38 wdata = pafy.call_gdata('search', qs) 39 40 def iter_songs(): 41 wdata2 = wdata 42 while True: 43 for song in get_tracks_from_json(wdata2): 44 yield song 45 46 if not wdata2.get('nextPageToken'): 47 break 48 qs['pageToken'] = wdata2['nextPageToken'] 49 wdata2 = pafy.call_gdata('search', qs) 50 51 # The youtube search api returns a maximum of 500 results 52 length = min(wdata['pageInfo']['totalResults'], 500) 53 slicer = util.IterSlicer(iter_songs(), length) 54 55 paginatesongs(slicer, length=length, msg=msg, failmsg=failmsg, 56 loadmsg=loadmsg) 57 58 59def token(page): 60 """ Returns a page token for a given start index. """ 61 index = (page or 0) * util.getxy().max_results 62 k = index//128 - 1 63 index -= 128 * k 64 f = [8, index] 65 if k > 0 or index > 127: 66 f.append(k+1) 67 f += [16, 0] 68 b64 = base64.b64encode(bytes(f)).decode('utf8') 69 return b64.strip('=') 70 71 72def generate_search_qs(term, match='term', videoDuration='any', after=None, category=None, is_live=False): 73 """ Return query string. """ 74 75 aliases = dict(views='viewCount') 76 qs = { 77 'q': term, 78 'maxResults': 50, 79 'safeSearch': "none", 80 'order': aliases.get(config.ORDER.get, config.ORDER.get), 81 'part': 'id,snippet', 82 'type': 'video', 83 'videoDuration': videoDuration, 84 'key': config.API_KEY.get 85 } 86 87 if after: 88 after = after.lower() 89 qs['publishedAfter'] = '%sZ' % (datetime.utcnow() - timedelta(days=DAYS[after])).isoformat() \ 90 if after in DAYS.keys() else '%s%s' % (after, 'T00:00:00Z' * (len(after) == 10)) 91 92 if match == 'related': 93 qs['relatedToVideoId'] = term 94 del qs['q'] 95 96 if config.SEARCH_MUSIC.get: 97 qs['videoCategoryId'] = 10 98 99 if category: 100 qs['videoCategoryId'] = category 101 102 if is_live: 103 qs['eventType'] = "live" 104 105 return qs 106 107 108def userdata_cached(userterm): 109 """ Check if user name search term found in cache """ 110 userterm = ''.join([t.strip().lower() for t in userterm.split(' ')]) 111 return g.username_query_cache.get(userterm) 112 113 114def cache_userdata(userterm, username, channel_id): 115 """ Cache user name and channel id tuple """ 116 userterm = ''.join([t.strip().lower() for t in userterm.split(' ')]) 117 g.username_query_cache[userterm] = (username, channel_id) 118 util.dbg('Cache data for username search query "{}": {} ({})'.format( 119 userterm, username, channel_id)) 120 121 while len(g.username_query_cache) > 300: 122 g.username_query_cache.popitem(last=False) 123 return (username, channel_id) 124 125 126def channelfromname(user): 127 """ Query channel id from username. """ 128 129 cached = userdata_cached(user) 130 if cached: 131 user, channel_id = cached 132 else: 133 # if the user is looked for by their display name, 134 # we have to sent an additional request to find their 135 # channel id 136 qs = {'part': 'id,snippet', 137 'forUsername': user, 138 'key': config.API_KEY.get} 139 140 try: 141 userinfo = pafy.call_gdata('channels', qs)['items'] 142 if len(userinfo) > 0: 143 snippet = userinfo[0].get('snippet', {}) 144 channel_id = userinfo[0].get('id', user) 145 username = snippet.get('title', user) 146 user = cache_userdata(user, username, channel_id)[0] 147 else: 148 g.message = "User {} not found.".format(c.y + user + c.w) 149 return 150 151 except pafy.GdataError as e: 152 g.message = "Could not retrieve information for user {}\n{}".format( 153 c.y + user + c.w, e) 154 util.dbg('Error during channel request for user {}:\n{}'.format( 155 user, e)) 156 return 157 158 # at this point, we know the channel id associated to a user name 159 return (user, channel_id) 160 161 162@command(r'channels\s+(.+)') 163def channelsearch(q_user): 164 165 qs = {'part': 'id,snippet', 166 'q': q_user, 167 'maxResults': 50, 168 'type': 'channel', 169 'order': "relevance" 170 } 171 172 QueryObj = contentquery.ContentQuery(listview.ListUser, 'search', qs) 173 columns = [ 174 {"name": "idx", "size": 3, "heading": "Num"}, 175 {"name": "name", "size": 30, "heading": "Username"}, 176 {"name": "description", "size": "remaining", "heading": "Description"}, 177 ] 178 179 def run_m(user_id): 180 """ Search ! """ 181 usersearch_id(*(user_id[0])) 182 del g.content 183 184 g.content = listview.ListView(columns, QueryObj, run_m) 185 g.message = "Results for channel search: '%s'" % q_user 186 187 188@command(r'user\s+(.+)', 'user') 189def usersearch(q_user, identify='forUsername'): 190 """ Fetch uploads by a YouTube user. """ 191 192 user, _, term = (x.strip() for x in q_user.partition("/")) 193 if identify == 'forUsername': 194 ret = channelfromname(user) 195 if not ret: # Error 196 return 197 user, channel_id = ret 198 199 else: 200 channel_id = user 201 202 # at this point, we know the channel id associated to a user name 203 usersearch_id(user, channel_id, term) 204 205 206def usersearch_id(user, channel_id, term): 207 """ Performs a search within a user's (i.e. a channel's) uploads 208 for an optional search term with the user (i.e. the channel) 209 identified by its ID """ 210 211 query = generate_search_qs(term) 212 aliases = dict(views='viewCount') # The value of the config item is 'views' not 'viewCount' 213 if config.USER_ORDER.get: 214 query['order'] = aliases.get(config.USER_ORDER.get, 215 config.USER_ORDER.get) 216 query['channelId'] = channel_id 217 218 termuser = tuple([c.y + x + c.w for x in (term, user)]) 219 if term: 220 msg = "Results for {1}{3}{0} (by {2}{4}{0})" 221 progtext = "%s by %s" % termuser 222 failmsg = "No matching results for %s (by %s)" % termuser 223 else: 224 msg = "Video uploads by {2}{4}{0}" 225 progtext = termuser[1] 226 if config.SEARCH_MUSIC: 227 failmsg = """User %s not found or has no videos in the Music category. 228Use 'set search_music False' to show results not in the Music category.""" % termuser[1] 229 else: 230 failmsg = "User %s not found or has no videos." % termuser[1] 231 msg = str(msg).format(c.w, c.y, c.y, term, user) 232 233 _search(progtext, query, msg, failmsg) 234 235 236def related_search(vitem): 237 """ Fetch uploads by a YouTube user. """ 238 query = generate_search_qs(vitem.ytid, match='related') 239 240 if query.get('videoCategoryId'): 241 del query['videoCategoryId'] 242 243 t = vitem.title 244 ttitle = t[:48].strip() + ".." if len(t) > 49 else t 245 246 msg = "Videos related to %s%s%s" % (c.y, ttitle, c.w) 247 failmsg = "Related to %s%s%s not found" % (c.y, vitem.ytid, c.w) 248 _search(ttitle, query, msg, failmsg) 249 250 251# Livestream category search 252@command(r'live\s+(.+)', 'live') 253def livestream_category_search(term): 254 sel_category = g.categories.get(term, None) 255 256 if not sel_category: 257 g.message = ("That is not a valid category. Valid categories are: ") 258 g.message += (", ".join(g.categories.keys())) 259 return 260 261 query = { 262 "part": "id,snippet", 263 "eventType": "live", 264 "maxResults": 50, 265 "type": "video", 266 "videoCategoryId": sel_category 267 } 268 269 query_obj = contentquery.ContentQuery(listview.ListLiveStream, 'search', query) 270 columns = [ 271 {"name": "idx", "size": 3, "heading": "Num"}, 272 {"name": "title", "size": 40, "heading": "Title"}, 273 {"name": "description", "size": "remaining", "heading": "Description"}, 274 ] 275 276 def start_stream(returned): 277 songs = Playlist("Search Results", [Video(*x) for x in returned]) 278 if not config.PLAYER.get or not util.has_exefile(config.PLAYER.get): 279 g.message = "Player not configured! Enter %sset player <player_app> "\ 280 "%s to set a player" % (c.g, c.w) 281 return 282 g.PLAYER_OBJ.play(songs, False, False, False) 283 284 g.content = listview.ListView(columns, query_obj, start_stream) 285 g.message = "Livestreams in category: '%s'" % term 286 287 288# Note: [^./] is to prevent overlap with playlist search command 289@command(r'(?:search|\.|/)\s*([^./].{1,500})', 'search') 290def search(term): 291 """ Perform search. """ 292 try: # TODO make use of unknowns 293 args, unknown = parser.parse_known_args(term.split()) 294 video_duration = args.duration if args.duration else 'any' 295 if args.category: 296 if not args.category[0].isdigit(): 297 args.category = g.categories.get(args.category[0]) 298 else: 299 args.category = "".join(args.category) 300 after = args.after 301 term = ' '.join(args.search) 302 except SystemExit: # <------ argsparse calls exit() 303 g.message = c.b + "Bad syntax. Enter h for help" + c.w 304 return 305 306 if not term or len(term) < 2: 307 g.message = c.r + "Not enough input" + c.w 308 g.content = content.generate_songlist_display() 309 return 310 311 logging.info("search for %s", term) 312 query = generate_search_qs(term, videoDuration=video_duration, after=after, 313 category=args.category, is_live=args.live) 314 315 msg = "Search results for %s%s%s" % (c.y, term, c.w) 316 failmsg = "Found nothing for %s%s%s" % (c.y, term, c.w) 317 _search(term, query, msg, failmsg) 318 319 320@command(r'u(?:ser)?pl\s(.*)', 'userpl', 'upl') 321def user_pls(user): 322 """ Retrieve user playlists. """ 323 return pl_search(user, is_user=True) 324 325 326@command(r'(?:\.\.|\/\/|pls(?:earch)?\s)\s*(.*)', 'plsearch') 327def pl_search(term, page=0, splash=True, is_user=False): 328 """ Search for YouTube playlists. 329 330 term can be query str or dict indicating user playlist search. 331 332 """ 333 if not term or len(term) < 2: 334 g.message = c.r + "Not enough input" + c.w 335 g.content = content.generate_songlist_display() 336 return 337 338 if splash: 339 g.content = content.logo(c.g) 340 prog = "user: " + term if is_user else term 341 g.message = "Searching playlists for %s" % c.y + prog + c.w 342 screen.update() 343 344 if is_user: 345 ret = channelfromname(term) 346 if not ret: # Error 347 return 348 user, channel_id = ret 349 350 else: 351 # playlist search is done with the above url and param type=playlist 352 logging.info("playlist search for %s", prog) 353 qs = generate_search_qs(term) 354 qs['pageToken'] = token(page) 355 qs['type'] = 'playlist' 356 if 'videoCategoryId' in qs: 357 del qs['videoCategoryId'] # Incompatable with type=playlist 358 359 pldata = pafy.call_gdata('search', qs) 360 361 id_list = [i.get('id', {}).get('playlistId') 362 for i in pldata.get('items', ()) 363 if i['id']['kind'] == 'youtube#playlist'] 364 365 result_count = min(pldata['pageInfo']['totalResults'], 500) 366 367 qs = {'part': 'contentDetails,snippet', 368 'maxResults': 50} 369 370 if is_user: 371 if page: 372 qs['pageToken'] = token(page) 373 qs['channelId'] = channel_id 374 else: 375 qs['id'] = ','.join(id_list) 376 377 pldata = pafy.call_gdata('playlists', qs) 378 playlists = get_pl_from_json(pldata)[:util.getxy().max_results] 379 380 if is_user: 381 result_count = pldata['pageInfo']['totalResults'] 382 383 if playlists: 384 g.last_search_query = (pl_search, {"term": term, "is_user": is_user}) 385 g.browse_mode = "ytpl" 386 g.current_page = page 387 g.result_count = result_count 388 g.ytpls = playlists 389 g.message = "Playlist results for %s" % c.y + prog + c.w 390 g.content = content.generate_playlist_display() 391 392 else: 393 g.message = "No playlists found for: %s" % c.y + prog + c.w 394 g.current_page = 0 395 g.content = content.generate_songlist_display(zeromsg=g.message) 396 397 398def get_pl_from_json(pldata): 399 """ Process json playlist data. """ 400 401 try: 402 items = pldata['items'] 403 404 except KeyError: 405 items = [] 406 407 results = [] 408 409 for item in items: 410 snippet = item['snippet'] 411 results.append(dict( 412 link=item["id"], 413 size=item["contentDetails"]["itemCount"], 414 title=snippet["title"], 415 author=snippet["channelTitle"], 416 created=snippet["publishedAt"], 417 updated=snippet['publishedAt'], #XXX Not available in API? 418 description=snippet["description"])) 419 420 return results 421 422 423def get_track_id_from_json(item): 424 """ Try to extract video Id from various response types """ 425 fields = ['contentDetails/videoId', 426 'snippet/resourceId/videoId', 427 'id/videoId', 428 'id'] 429 for field in fields: 430 node = item 431 for p in field.split('/'): 432 if node and isinstance(node, dict): 433 node = node.get(p) 434 if node: 435 return node 436 return '' 437 438 439def get_tracks_from_json(jsons): 440 """ Get search results from API response """ 441 442 items = jsons.get("items") 443 if not items: 444 util.dbg("got unexpected data or no search results") 445 return () 446 447 # fetch detailed information about items from videos API 448 id_list = [get_track_id_from_json(i) 449 for i in items 450 if i['id']['kind'] == 'youtube#video'] 451 452 qs = {'part':'contentDetails,statistics,snippet', 453 'id': ','.join(id_list)} 454 455 wdata = pafy.call_gdata('videos', qs) 456 457 items_vidinfo = wdata.get('items', []) 458 # enhance search results by adding information from videos API response 459 for searchresult, vidinfoitem in zip(items, items_vidinfo): 460 searchresult.update(vidinfoitem) 461 462 # populate list of video objects 463 songs = [] 464 for item in items: 465 466 try: 467 468 ytid = get_track_id_from_json(item) 469 duration = item.get('contentDetails', {}).get('duration') 470 471 if duration: 472 duration = ISO8601_TIMEDUR_EX.findall(duration) 473 if len(duration) > 0: 474 _, hours, _, minutes, _, seconds = duration[0] 475 duration = [seconds, minutes, hours] 476 duration = [int(v) if len(v) > 0 else 0 for v in duration] 477 duration = sum([60**p*v for p, v in enumerate(duration)]) 478 else: 479 duration = 30 480 else: 481 duration = 30 482 483 stats = item.get('statistics', {}) 484 snippet = item.get('snippet', {}) 485 title = snippet.get('title', '').strip() 486 # instantiate video representation in local model 487 cursong = Video(ytid=ytid, title=title, length=duration) 488 likes = int(stats.get('likeCount', 0)) 489 dislikes = int(stats.get('dislikeCount', 0)) 490 #XXX this is a very poor attempt to calculate a rating value 491 rating = 5.*likes/(likes+dislikes) if (likes+dislikes) > 0 else 0 492 category = snippet.get('categoryId') 493 publishedlocaldatetime = util.yt_datetime_local(snippet.get('publishedAt', '')) 494 495 # cache video information in custom global variable store 496 g.meta[ytid] = dict( 497 # tries to get localized title first, fallback to normal title 498 title=snippet.get('localized', 499 {'title':snippet.get('title', 500 '[!!!]')}).get('title', 501 '[!]'), 502 length=str(util.fmt_time(cursong.length)), 503 rating=str('{}'.format(rating))[:4].ljust(4, "0"), 504 uploader=snippet.get('channelId'), 505 uploaderName=snippet.get('channelTitle'), 506 category=category, 507 aspect="custom", #XXX 508 uploaded=publishedlocaldatetime[1], 509 uploadedTime=publishedlocaldatetime[2], 510 likes=str(num_repr(likes)), 511 dislikes=str(num_repr(dislikes)), 512 commentCount=str(num_repr(int(stats.get('commentCount', 0)))), 513 viewCount=str(num_repr(int(stats.get('viewCount', 0))))) 514 515 except Exception as e: 516 517 util.dbg(json.dumps(item, indent=2)) 518 util.dbg('Error during metadata extraction/instantiation of ' + 519 'search result {}\n{}'.format(ytid, e)) 520 521 songs.append(cursong) 522 523 # return video objects 524 return songs 525 526 527def num_repr(num): 528 """ Return up to four digit string representation of a number, eg 2.6m. """ 529 if num <= 9999: 530 return str(num) 531 532 def digit_count(x): 533 """ Return number of digits. """ 534 return int(math.floor(math.log10(x)) + 1) 535 536 digits = digit_count(num) 537 sig = 3 if digits % 3 == 0 else 2 538 rounded = int(round(num, int(sig - digits))) 539 digits = digit_count(rounded) 540 suffix = "_kmBTqXYX"[(digits - 1) // 3] 541 front = 3 if digits % 3 == 0 else digits % 3 542 543 if not front == 1: 544 return str(rounded)[0:front] + suffix 545 546 return str(rounded)[0] + "." + str(rounded)[1] + suffix 547 548 549@command(r'u\s?([\d]{1,4})', 'u') 550def user_more(num): 551 """ Show more videos from user of vid num. """ 552 if g.browse_mode != "normal": 553 g.message = "User uploads must refer to a specific video item" 554 g.message = c.y + g.message + c.w 555 g.content = content.generate_songlist_display() 556 return 557 558 g.current_page = 0 559 item = g.model[int(num) - 1] 560 561 #TODO: Cleaner way of doing this? 562 if item.ytid in g.meta: 563 channel_id = g.meta.get(item.ytid, {}).get('uploader') 564 user = g.meta.get(item.ytid, {}).get('uploaderName') 565 else: 566 paf = util.get_pafy(item) 567 user, channel_id = channelfromname(paf.author) 568 569 usersearch_id(user, channel_id, '') 570 571 572@command(r'r\s?(\d{1,4})', 'r') 573def related(num): 574 """ Show videos related to to vid num. """ 575 if g.browse_mode != "normal": 576 g.message = "Related items must refer to a specific video item" 577 g.message = c.y + g.message + c.w 578 g.content = content.generate_songlist_display() 579 return 580 581 g.current_page = 0 582 item = g.model[int(num) - 1] 583 related_search(item) 584 585 586@command(r'mix\s*(\d{1,4})', 'mix') 587def mix(num): 588 """ Retrieves the YouTube mix for the selected video. """ 589 g.content = g.content or content.generate_songlist_display() 590 if g.browse_mode != "normal": 591 g.message = util.F('mix only videos') 592 else: 593 item = (g.model[int(num) - 1]) 594 if item is None: 595 g.message = util.F('invalid item') 596 return 597 item = util.get_pafy(item) 598 # Mix playlists are made up of 'RD' + video_id 599 try: 600 plist("RD" + item.videoid) 601 except OSError: 602 g.message = util.F('no mix') 603 604 605@command(r'url\s(.*[-_a-zA-Z0-9]{11}.*)', 'url') 606def yt_url(url, print_title=0): 607 """ Acess videos by urls. """ 608 url_list = url.split() 609 610 g.model.songs = [] 611 612 for u in url_list: 613 try: 614 p = util.get_pafy(u) 615 616 except (IOError, ValueError) as e: 617 g.message = c.r + str(e) + c.w 618 g.content = g.content or content.generate_songlist_display( 619 zeromsg=g.message) 620 return 621 622 g.browse_mode = "normal" 623 v = Video(p.videoid, p.title, p.length) 624 g.model.songs.append(v) 625 626 if not g.command_line: 627 g.content = content.generate_songlist_display() 628 629 if print_title: 630 util.xprint(v.title) 631 632 633@command(r'url_file\s(\S+)', 'url_file') 634def yt_url_file(file_name): 635 """ Access a list of urls in a text file """ 636 637 #Open and read the file 638 try: 639 with open(file_name, "r") as fo: 640 output = ' '.join([line.strip() for line in fo if line.strip()]) 641 642 except (IOError): 643 g.message = c.r + 'Error while opening the file, check the validity of the path' + c.w 644 g.content = g.content or content.generate_songlist_display( 645 zeromsg=g.message) 646 return 647 648 #Finally pass the input to yt_url 649 yt_url(output) 650