#!/usr/bin/python3 -OO
# Copyright 2007-2021 The SABnzbd-Team <team@sabnzbd.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

"""
sabnzbd.nzbqueue - nzb queue
"""

import os
import logging
import time
import datetime
import functools
from typing import List, Dict, Union, Tuple, Optional

import sabnzbd
from sabnzbd.nzbstuff import NzbObject, Article
from sabnzbd.misc import exit_sab, cat_to_opts, int_conv, caller_name, cmp, safe_lower
from sabnzbd.filesystem import get_admin_path, remove_all, globber_full, remove_file, is_valid_script
from sabnzbd.nzbparser import process_single_nzb
from sabnzbd.panic import panic_queue
from sabnzbd.decorators import NzbQueueLocker
from sabnzbd.constants import (
    QUEUE_FILE_NAME,
    QUEUE_VERSION,
    FUTURE_Q_FOLDER,
    JOB_ADMIN,
    DEFAULT_PRIORITY,
    LOW_PRIORITY,
    NORMAL_PRIORITY,
    HIGH_PRIORITY,
    FORCE_PRIORITY,
    REPAIR_PRIORITY,
    STOP_PRIORITY,
    VERIFIED_FILE,
    Status,
    IGNORED_FOLDERS,
    QNFO,
    DIRECT_WRITE_TRIGGER,
)

import sabnzbd.cfg as cfg
from sabnzbd.downloader import Server
from sabnzbd.assembler import file_has_articles
import sabnzbd.notifier as notifier


class NzbQueue:
    """Singleton NzbQueue

    Keeps the jobs both in an ordered list (queue order) and in a table
    keyed by nzo_id (lookup by id).
    """

    def __init__(self):
        # When set, get_articles() stops after the first eligible job
        self.__top_only: bool = cfg.top_only()
        # Jobs in queue order
        self.__nzo_list: List[NzbObject] = []
        # Same jobs, keyed by nzo_id
        self.__nzo_table: Dict[str, NzbObject] = {}

    def read_queue(self, repair):
        """Read queue from disk, supporting repair modes
        0 = no repairs
        1 = use existing queue, add missing "incomplete" folders
        2 = Discard all queue admin, reconstruct from "incomplete" folders
        """
        nzo_ids = []
        if repair < 2:
            # Try to process the queue file
            try:
                data = sabnzbd.load_admin(QUEUE_FILE_NAME)
                if data:
                    queue_vers, nzo_ids, _ = data
                    if not queue_vers == QUEUE_VERSION:
                        nzo_ids = []
                        logging.error(T("Incompatible queuefile found, cannot proceed"))
                        if not repair:
                            panic_queue(os.path.join(cfg.admin_dir.get_path(), QUEUE_FILE_NAME))
                            exit_sab(2)
            except Exception:
                # Any failure while loading/unpacking is treated as a corrupt queue file
                nzo_ids = []
                logging.error(
                    T("Error loading %s, corrupt file detected"),
                    os.path.join(cfg.admin_dir.get_path(), QUEUE_FILE_NAME),
                )

        # First handle jobs in the queue file
        folders = []
        for nzo_id in nzo_ids:
            # Entries are stored as "<work_name>/<nzo_id>", see save()
            folder, _id = os.path.split(nzo_id)
            path = get_admin_path(folder, future=False)

            # Try as normal job
            nzo = sabnzbd.load_data(_id, path, remove=False)
            if not nzo:
                # Try as future job
                path = get_admin_path(folder, future=True)
                nzo = sabnzbd.load_data(_id, path)
            if nzo:
                self.add(nzo, save=False, quiet=True)
                folders.append(folder)

        # Scan for any folders in "incomplete" that are not yet in the queue
        if repair:
            logging.info("Starting queue repair")
            self.scan_jobs(not folders)
            # Handle any lost future jobs
            for item in globber_full(os.path.join(cfg.admin_dir.get_path(), FUTURE_Q_FOLDER)):
                path, nzo_id = os.path.split(item)
                if nzo_id not in self.__nzo_table:
                    if nzo_id.startswith("SABnzbd_nzo"):
                        nzo = sabnzbd.load_data(nzo_id, path, remove=True)
                        if nzo:
                            self.add(nzo, save=True)
                    else:
                        # Best effort clean-up of unrecognized files, never fatal
                        try:
                            remove_file(item)
                        except Exception:
                            logging.debug("Could not remove %s", item, exc_info=True)

    @NzbQueueLocker
    def scan_jobs(self, all_jobs=False, action=True):
        """Scan "incomplete" for missing folders,
        'all_jobs' is True: folders of jobs that are in the queue are not considered registered
        'action' is True, do the recovery action
        returns list of orphaned folders
        """
        result = []
        # Folders from the download queue (set: only used for membership tests)
        if all_jobs:
            registered = set()
        else:
            registered = {nzo.work_name for nzo in self.__nzo_list}

        # Retryable folders from History
        items = sabnzbd.api.build_history()[0]
        # Anything waiting or active or retryable is a known item
        registered.update(
            os.path.basename(item["path"])
            for item in items
            if item["retry"] or item["loaded"] or item["status"] == Status.QUEUED
        )

        # Repair unregistered folders
        for folder in globber_full(cfg.download_dir.get_path()):
            name = os.path.basename(folder)
            if os.path.isdir(folder) and name not in registered and name not in IGNORED_FOLDERS:
                if action:
                    logging.info("Repairing job %s", folder)
                    self.repair_job(folder)
                result.append(name)
            else:
                if action:
                    logging.info("Skipping repair for job %s", folder)
        return result

    def repair_job(self, repair_folder, new_nzb=None, password=None):
        """Reconstruct admin for a single job folder, optionally with new NZB
        Returns the nzo_id of the (re-)added job or None if nothing could be added
        """
        # Check if folder exists
        if not repair_folder or not os.path.exists(repair_folder):
            return None

        name = os.path.basename(repair_folder)
        admin_path = os.path.join(repair_folder, JOB_ADMIN)

        # If Retry was used and a new NZB was uploaded
        if getattr(new_nzb, "filename", None):
            remove_all(admin_path, "*.gz", keep_folder=True)
            logging.debug("Repair job %s with new NZB (%s)", name, new_nzb.filename)
            _, nzo_ids = sabnzbd.add_nzbfile(new_nzb, nzbname=name, reuse=repair_folder, password=password)
        else:
            # Was this file already post-processed?
            verified = sabnzbd.load_data(VERIFIED_FILE, admin_path, remove=False)
            filenames = []
            # Only look for a stored NZB when not every entry was verified
            if not verified or not all(verified[x] for x in verified):
                filenames = globber_full(admin_path, "*.gz")

            if filenames:
                logging.debug("Repair job %s by re-parsing stored NZB", name)
                _, nzo_ids = sabnzbd.add_nzbfile(filenames[0], nzbname=name, reuse=repair_folder, password=password)
            else:
                try:
                    logging.debug("Repair job %s without stored NZB", name)
                    nzo = NzbObject(name, nzbname=name, reuse=repair_folder)
                    nzo.password = password
                    self.add(nzo)
                    nzo_ids = [nzo.nzo_id]
                except Exception:
                    # NzbObject can throw exceptions if duplicate or unwanted etc
                    logging.info("Skipping %s due to exception", name, exc_info=True)
                    nzo_ids = []

        # Return None if we could not add anything
        if nzo_ids:
            return nzo_ids[0]
        return None

    @NzbQueueLocker
    def send_back(self, old_nzo: NzbObject):
        """Send back job to queue after successful pre-check"""
        try:
            nzb_path = globber_full(old_nzo.admin_path, "*.gz")[0]
        except Exception:
            # Typically an IndexError because no stored NZB was found
            logging.info("Failed to find NZB file after pre-check (%s)", old_nzo.nzo_id)
            return

        # Store old position and create new NZO
        old_position = self.__nzo_list.index(old_nzo)
        res, nzo_ids = process_single_nzb(
            old_nzo.filename, nzb_path, keep=True, reuse=old_nzo.download_path, nzo_id=old_nzo.nzo_id
        )
        if res == 0 and nzo_ids:
            # Swap to old position
            new_nzo = self.get_nzo(nzo_ids[0])
            self.__nzo_list.remove(new_nzo)
            self.__nzo_list.insert(old_position, new_nzo)
            # Reset reuse flag to make pause/abort on encryption possible
            self.__nzo_table[nzo_ids[0]].reuse = None

    @NzbQueueLocker
    def save(self, save_nzo: Union[NzbObject, None, bool] = None):
        """Save queue, all nzo's or just the specified one
        Pass None to save every job, a specific NzbObject to save only that
        job, or False to only write the queue index.
        """
        logging.info("Saving queue")

        nzo_ids = []
        # Aggregate nzo_ids and save each nzo (iterate over a copy of the list)
        for nzo in self.__nzo_list[:]:
            if not nzo.is_gone():
                nzo_ids.append(os.path.join(nzo.work_name, nzo.nzo_id))
                if save_nzo is None or nzo is save_nzo:
                    if not nzo.futuretype:
                        # Also includes save_data for NZO
                        nzo.save_to_disk()
                    else:
                        sabnzbd.save_data(nzo, nzo.nzo_id, nzo.admin_path)

        sabnzbd.save_admin((QUEUE_VERSION, nzo_ids, []), QUEUE_FILE_NAME)

    def set_top_only(self, value):
        """Update the cached 'top only' setting used by get_articles()"""
        self.__top_only = value

    def generate_future(self, msg, pp=None, script=None, cat=None, url=None, priority=DEFAULT_PRIORITY, nzbname=None):
        """Create and return a placeholder nzo object"""
        logging.debug("Creating placeholder NZO")
        future_nzo = NzbObject(
            filename=msg,
            pp=pp,
            script=script,
            futuretype=True,
            cat=cat,
            url=url,
            priority=priority,
            nzbname=nzbname,
            status=Status.GRABBING,
        )
        self.add(future_nzo)
        return future_nzo

    def change_opts(self, nzo_ids: str, pp: int) -> int:
        """Set post-processing option 'pp' on the comma-separated jobs,
        returns the number of jobs that were found and changed
        """
        result = 0
        for nzo_id in [item.strip() for item in nzo_ids.split(",")]:
            if nzo_id in self.__nzo_table:
                self.__nzo_table[nzo_id].set_pp(pp)
                result += 1
        return result

    def change_script(self, nzo_ids: str, script: str) -> int:
        """Set the script on the comma-separated jobs, only when the script
        is None or valid. Returns the number of jobs that were changed
        """
        result = 0
        if (script is None) or is_valid_script(script):
            for nzo_id in [item.strip() for item in nzo_ids.split(",")]:
                if nzo_id in self.__nzo_table:
                    self.__nzo_table[nzo_id].script = script
                    logging.info("Set script=%s for job %s", script, self.__nzo_table[nzo_id].final_name)
                    result += 1
        return result

    def change_cat(self, nzo_ids: str, cat: str, explicit_priority=None):
        """Set the category on the comma-separated jobs and apply the
        category's pp and script. The category's priority is only applied
        when 'explicit_priority' is None. Returns the number of changed jobs
        """
        result = 0
        for nzo_id in [item.strip() for item in nzo_ids.split(",")]:
            if nzo_id in self.__nzo_table:
                nzo = self.__nzo_table[nzo_id]
                nzo.cat, pp, nzo.script, prio = cat_to_opts(cat)
                logging.info("Set cat=%s for job %s", cat, nzo.final_name)
                nzo.set_pp(pp)
                if explicit_priority is None:
                    self.set_priority(nzo_id, prio)
                # Abort any ongoing unpacking if the category changed
                nzo.abort_direct_unpacker()
                result += 1
        return result

    def change_name(self, nzo_id: str, name: str, password: str = None):
        """Rename a job (and scan the name for a password),
        for future-type jobs only the URL fetch counters are reset.
        Returns True if the job was found, otherwise False
        """
        if nzo_id in self.__nzo_table:
            nzo = self.__nzo_table[nzo_id]
            logging.info("Renaming %s to %s", nzo.final_name, name)
            # Abort any ongoing unpacking if the name changed (dirs change)
            nzo.abort_direct_unpacker()
            if not nzo.futuretype:
                nzo.set_final_name_and_scan_password(name, password)
            else:
                # Reset url fetch wait time
                nzo.url_wait = None
                nzo.url_tries = 0
            return True
        else:
            return False

    def get_nzo(self, nzo_id) -> Optional[NzbObject]:
        """Return the job with this nzo_id or None if it is not in the queue"""
        return self.__nzo_table.get(nzo_id)

    @NzbQueueLocker
    def add(self, nzo: NzbObject, save=True, quiet=False) -> str:
        """Add a job to the queue at the position that fits its priority
        'save': write the queue (and this job) to disk afterwards
        'quiet': suppress the notification and the automatic sort
        Returns the nzo_id of the job
        """
        if not nzo.nzo_id:
            nzo.nzo_id = sabnzbd.get_new_id("nzo", nzo.admin_path, self.__nzo_table)

        # If no files are to be downloaded anymore, send to postproc
        if not nzo.files and not nzo.futuretype:
            self.end_job(nzo)
            return nzo.nzo_id

        # Reset try_lists, markers and evaluate the scheduling settings
        nzo.reset_try_list()
        nzo.deleted = False
        priority = nzo.priority
        if sabnzbd.Scheduler.analyse(False, priority):
            nzo.status = Status.PAUSED

        self.__nzo_table[nzo.nzo_id] = nzo
        if priority > HIGH_PRIORITY:
            # Top and repair priority items are added to the top of the queue
            self.__nzo_list.insert(0, nzo)
        elif priority == LOW_PRIORITY:
            self.__nzo_list.append(nzo)
        else:
            # for high priority we need to add the item at the bottom
            # of any other high priority items above the normal priority
            # for normal priority we need to add the item at the bottom
            # of the normal priority items above the low priority
            for pos, queued in enumerate(self.__nzo_list):
                if queued.priority < priority:
                    self.__nzo_list.insert(pos, nzo)
                    break
            else:
                # if the queue is empty or there are no other items classed
                # as a lower priority, add it to the bottom of the queue
                self.__nzo_list.append(nzo)
        if save:
            self.save(nzo)

        if not (quiet or nzo.status == Status.FETCHING):
            notifier.send_notification(T("NZB added to queue"), nzo.filename, "download", nzo.cat)

        if not quiet and cfg.auto_sort():
            try:
                field, direction = cfg.auto_sort().split()
                self.sort_queue(field, direction)
            except ValueError:
                # Setting does not consist of exactly "<field> <direction>"
                pass
        return nzo.nzo_id

    @NzbQueueLocker
    def remove(self, nzo_id: str, cleanup=True, delete_all_data=True):
        """Remove NZO from queue.
        It can be added to history directly.
        Or, we do some clean-up, sometimes leaving some data.
        Returns the nzo_id if the job was found, otherwise None
        """
        if nzo_id in self.__nzo_table:
            nzo = self.__nzo_table.pop(nzo_id)
            logging.info("[%s] Removing job %s", caller_name(), nzo.final_name)

            # Set statuses
            nzo.deleted = True
            if cleanup and not nzo.is_gone():
                nzo.status = Status.DELETED
            self.__nzo_list.remove(nzo)

            if cleanup:
                nzo.purge_data(delete_all_data=delete_all_data)
            # Only update the queue index, do not save individual jobs
            self.save(False)
            return nzo_id
        return None

    @NzbQueueLocker
    def remove_multiple(self, nzo_ids: List[str], delete_all_data=True) -> List[str]:
        """Remove multiple jobs, returns the list of nzo_ids that were actually removed"""
        removed = []
        for nzo_id in nzo_ids:
            if self.remove(nzo_id, delete_all_data=delete_all_data):
                removed.append(nzo_id)

        # Any files left? Otherwise let's disconnect
        if self.actives(grabs=False) == 0 and cfg.autodisconnect():
            # This was the last job, close server connections
            sabnzbd.Downloader.disconnect()

        return removed

    @NzbQueueLocker
    def remove_all(self, search: str = "") -> List[str]:
        """Remove NZO's that match the search-pattern"""
        search = safe_lower(search)
        # Collect the ids first, the table is modified during removal
        nzo_ids = [
            nzo_id for nzo_id, nzo in self.__nzo_table.items() if not search or search in nzo.final_name.lower()
        ]
        return self.remove_multiple(nzo_ids)

    def remove_nzf(self, nzo_id: str, nzf_id: str, force_delete=False) -> List[str]:
        """Remove a single file from a job, ending or removing the job when
        that was the last thing to do. With 'force_delete' the file is also
        removed from the job's bookkeeping when it was not pending anymore.
        Returns a list with the nzf_id if the file was found, otherwise empty
        """
        removed = []
        if nzo_id in self.__nzo_table:
            nzo = self.__nzo_table[nzo_id]
            nzf = nzo.get_nzf_by_id(nzf_id)

            if nzf:
                removed.append(nzf_id)
                nzo.abort_direct_unpacker()
                post_done = nzo.remove_nzf(nzf)
                if post_done:
                    if nzo.finished_files:
                        self.end_job(nzo)
                    else:
                        self.remove(nzo_id)
                elif force_delete:
                    # Force-remove all trace and update counters
                    nzo.bytes -= nzf.bytes
                    nzo.bytes_tried -= nzf.bytes - nzf.bytes_left
                    if nzf.is_par2 or sabnzbd.par2file.is_parfile(nzf.filename):
                        nzo.bytes_par2 -= nzf.bytes
                    del nzo.files_table[nzf_id]
                    nzo.finished_files.remove(nzf)
            logging.info("Removed NZFs %s from job %s", removed, nzo.final_name)
        return removed

    def pause_multiple_nzo(self, nzo_ids: List[str]) -> List[str]:
        """Pause the given jobs
        Note: every supplied nzo_id is returned, also if it is not in the queue
        """
        handled = []
        for nzo_id in nzo_ids:
            self.pause_nzo(nzo_id)
            handled.append(nzo_id)
        return handled

    def pause_nzo(self, nzo_id: str) -> List[str]:
        """Pause a single job, returns a list with the nzo_id if it was found"""
        handled = []
        if nzo_id in self.__nzo_table:
            nzo = self.__nzo_table[nzo_id]
            nzo.pause()
            logging.info("Paused nzo: %s", nzo_id)
            handled.append(nzo_id)
        return handled

    def resume_multiple_nzo(self, nzo_ids: List[str]) -> List[str]:
        """Resume the given jobs
        Note: every supplied nzo_id is returned, also if it is not in the queue
        """
        handled = []
        for nzo_id in nzo_ids:
            self.resume_nzo(nzo_id)
            handled.append(nzo_id)
        return handled

    @NzbQueueLocker
    def resume_nzo(self, nzo_id: str) -> List[str]:
        """Resume a single job, returns a list with the nzo_id if it was found"""
        handled = []
        if nzo_id in self.__nzo_table:
            nzo = self.__nzo_table[nzo_id]
            nzo.resume()
            logging.info("Resumed nzo: %s", nzo_id)
            handled.append(nzo_id)
        return handled

    @NzbQueueLocker
    def switch(self, item_id_1: str, item_id_2: str) -> Tuple[int, int]:
        """Move job 'item_id_1' to the position of job 'item_id_2'
        'item_id_2' may also be a queue index instead of an nzo_id.
        Returns (new position, priority of the moved job), where the
        position is -1 if nothing was moved. If a job is missing or both
        are the same job, (-1, 0) is returned.
        """
        # Allow an index as second parameter, easier for some skins
        index = None
        try:
            index = int(item_id_2)
            item_id_2 = self.__nzo_list[index].nzo_id
        except Exception:
            # Not a (valid) index, treat it as an nzo_id
            pass
        try:
            nzo1 = self.__nzo_table[item_id_1]
            nzo2 = self.__nzo_table[item_id_2]
        except KeyError:
            # One or both jobs missing
            return -1, 0

        if nzo1 == nzo2:
            return -1, 0

        # get the priorities of the two items
        nzo1_priority = nzo1.priority
        nzo2_priority = nzo2.priority

        # Without a usable index the moved job always adopts the target's priority
        adopt_priority = True
        if index is not None:
            try:
                # get the item below to use in priority changing
                # if there is an item below the id1 and id2 then we need that too
                # to determine whether to change the priority
                item_id_3 = self.__nzo_list[index + 1].nzo_id
                nzo3_priority = self.__nzo_table[item_id_3].priority
                # if id1 is surrounded by items of a different priority then change its priority to match
                adopt_priority = (
                    nzo2_priority != nzo1_priority and nzo3_priority != nzo1_priority
                ) or nzo2_priority > nzo1_priority
            except Exception:
                # No item below the target (or it could not be inspected)
                adopt_priority = True
        if adopt_priority:
            nzo1.priority = nzo2_priority

        item_id_pos1 = -1
        item_id_pos2 = -1
        for position, queued in enumerate(self.__nzo_list):
            if item_id_1 == queued.nzo_id:
                item_id_pos1 = position
            elif item_id_2 == queued.nzo_id:
                item_id_pos2 = position
        if (item_id_pos1 > -1) and (item_id_pos2 > -1):
            item = self.__nzo_list[item_id_pos1]
            logging.info(
                "Switching job [%s] %s => [%s] %s",
                item_id_pos1,
                item.final_name,
                item_id_pos2,
                self.__nzo_list[item_id_pos2].final_name,
            )
            del self.__nzo_list[item_id_pos1]
            self.__nzo_list.insert(item_id_pos2, item)
            return item_id_pos2, nzo1.priority
        # If moving failed/no movement took place
        return -1, nzo1.priority

    @NzbQueueLocker
    def move_up_bulk(self, nzo_id, nzf_ids, size):
        """Call the job's move_up_bulk for the given files, 'size' times"""
        if nzo_id in self.__nzo_table:
            for _ in range(size):
                self.__nzo_table[nzo_id].move_up_bulk(nzf_ids)

    @NzbQueueLocker
    def move_top_bulk(self, nzo_id, nzf_ids):
        """Call the job's move_top_bulk for the given files"""
        if nzo_id in self.__nzo_table:
            self.__nzo_table[nzo_id].move_top_bulk(nzf_ids)

    @NzbQueueLocker
    def move_down_bulk(self, nzo_id, nzf_ids, size):
        """Call the job's move_down_bulk for the given files, 'size' times"""
        if nzo_id in self.__nzo_table:
            for _ in range(size):
                self.__nzo_table[nzo_id].move_down_bulk(nzf_ids)

    @NzbQueueLocker
    def move_bottom_bulk(self, nzo_id, nzf_ids):
        """Call the job's move_bottom_bulk for the given files"""
        if nzo_id in self.__nzo_table:
            self.__nzo_table[nzo_id].move_bottom_bulk(nzf_ids)

    @NzbQueueLocker
    def sort_by_avg_age(self, reverse=False):
        """Sort the queue by average date, within each priority level"""
        logging.info("Sorting by average date... (reversed: %s)", reverse)
        self.__nzo_list = sort_queue_function(self.__nzo_list, _nzo_date_cmp, reverse)

    @NzbQueueLocker
    def sort_by_name(self, reverse=False):
        """Sort the queue by name, within each priority level"""
        logging.info("Sorting by name... (reversed: %s)", reverse)
        self.__nzo_list = sort_queue_function(self.__nzo_list, _nzo_name_cmp, reverse)

    @NzbQueueLocker
    def sort_by_size(self, reverse=False):
        """Sort the queue by size, within each priority level"""
        logging.info("Sorting by size... (reversed: %s)", reverse)
        self.__nzo_list = sort_queue_function(self.__nzo_list, _nzo_size_cmp, reverse)

    def sort_queue(self, field, reverse=None):
        """Sort queue by field: "name", "size" or "avg_age"
        Direction is specified as "desc"/True or "asc"/False
        """
        if isinstance(reverse, str):
            # Any string other than "desc" means ascending
            reverse = reverse.lower() == "desc"
        if reverse is None:
            reverse = False

        field = field.lower()
        if field == "name":
            self.sort_by_name(reverse)
        elif field in ("size", "bytes"):
            self.sort_by_size(reverse)
        elif field == "avg_age":
            # Sorting by age is the inverse of sorting by date
            self.sort_by_avg_age(not reverse)
        else:
            logging.debug("Sort: %s not recognized", field)

    @NzbQueueLocker
    def __set_priority(self, nzo_id, priority):
        """Sets the priority on the nzo and places it in the queue at the appropriate position
        Returns the new position, -1 on any failure and None when the job
        was ended because of STOP_PRIORITY
        """
        try:
            priority = int_conv(priority)
            nzo = self.__nzo_table[nzo_id]
            nzo_id_pos1 = -1
            pos = -1

            # If priority == STOP_PRIORITY, then end the job (send to post-processing)
            if priority == STOP_PRIORITY:
                self.end_job(nzo)
                return

            # Get the current position in the queue
            for i, queued in enumerate(self.__nzo_list):
                if nzo_id == queued.nzo_id:
                    nzo_id_pos1 = i
                    break

            # Don't change priority and order if priority is the same as asked
            if priority == self.__nzo_list[nzo_id_pos1].priority:
                return nzo_id_pos1

            nzo.set_priority(priority)
            if sabnzbd.Scheduler.analyse(False, priority) and nzo.status in (
                Status.CHECKING,
                Status.DOWNLOADING,
                Status.QUEUED,
            ):
                nzo.status = Status.PAUSED
            elif nzo.status == Status.PAUSED:
                nzo.status = Status.QUEUED
            nzo.save_to_disk()

            if nzo_id_pos1 != -1:
                del self.__nzo_list[nzo_id_pos1]
            if priority == FORCE_PRIORITY:
                # A top priority item (usually a completed download fetching pars)
                # is added to the top of the queue
                self.__nzo_list.insert(0, nzo)
                pos = 0
            elif priority == LOW_PRIORITY:
                pos = len(self.__nzo_list)
                self.__nzo_list.append(nzo)
            else:
                # for high priority we need to add the item at the bottom
                # of any other high priority items above the normal priority
                # for normal priority we need to add the item at the bottom
                # of the normal priority items above the low priority
                for p, queued in enumerate(self.__nzo_list):
                    if queued.priority < priority:
                        self.__nzo_list.insert(p, nzo)
                        pos = p
                        break
                else:
                    # if the queue is empty or there are no other items classed
                    # as a lower priority, add it to the bottom of the queue
                    pos = len(self.__nzo_list)
                    self.__nzo_list.append(nzo)

            logging.info(
                "Set priority=%s for job %s => position=%s ", priority, self.__nzo_table[nzo_id].final_name, pos
            )
            return pos

        except Exception:
            # Unknown job or any other problem
            return -1

    @NzbQueueLocker
    def set_priority(self, nzo_ids, priority):
        """Set the priority of the comma-separated jobs
        Returns the result for the last job in the list, -1 on failure
        """
        try:
            n = -1
            for nzo_id in [item.strip() for item in nzo_ids.split(",")]:
                n = self.__set_priority(nzo_id, priority)
            return n
        except Exception:
            return -1

    @staticmethod
    def reset_try_lists(article: Article, remove_fetcher_from_trylist: bool = True):
        """Let article get new fetcher and reset trylists"""
        if remove_fetcher_from_trylist:
            article.remove_from_try_list(article.fetcher)
        article.fetcher = None
        article.tries = 0
        article.nzf.reset_try_list()
        article.nzf.nzo.reset_try_list()

    def has_forced_items(self):
        """Check if the queue contains any Forced
        Priority items to download while paused
        """
        return any(
            nzo.priority == FORCE_PRIORITY and nzo.status not in (Status.PAUSED, Status.GRABBING)
            for nzo in self.__nzo_list
        )

    def get_articles(self, server: Server, servers: List[Server], fetch_limit: int) -> List[Article]:
        """Get next article for jobs in the queue
        Not locked for performance, since it only reads the queue
        """
        # Pre-calculate propagation delay
        propagation_delay = float(cfg.propagation_delay() * 60)
        for nzo in self.__nzo_list:
            # Not when queue paused and not a forced item
            if nzo.status not in (Status.PAUSED, Status.GRABBING) or nzo.priority == FORCE_PRIORITY:
                # Check if past propagation delay, or forced
                if (
                    not propagation_delay
                    or nzo.priority == FORCE_PRIORITY
                    or (nzo.avg_stamp + propagation_delay) < time.time()
                ):
                    if not nzo.server_in_try_list(server):
                        articles = nzo.get_articles(server, servers, fetch_limit)
                        if articles:
                            return articles
                    # Stop after first job that wasn't paused/propagating/etc
                    if self.__top_only:
                        return []
        return []

    def register_article(self, article: Article, success: bool = True):
        """Register the articles we tried
        Not locked for performance, since it only modifies individual NZOs
        """
        nzf = article.nzf
        nzo = nzf.nzo

        if nzf.deleted:
            logging.debug("Discarding article %s, no longer in queue", article.article)
            return

        articles_left, file_done, post_done = nzo.remove_article(article, success)

        if nzo.is_gone():
            logging.debug("Discarding article for file %s, no longer in queue", nzf.filename)
        else:
            # Write data if file is done or at trigger time
            if file_done or (articles_left and (articles_left % DIRECT_WRITE_TRIGGER) == 0):
                if not nzo.precheck:
                    # Only start decoding if we have a filename and type
                    # The type is only set if sabyenc could decode the article
                    if nzf.filename and nzf.type:
                        sabnzbd.Assembler.process(nzo, nzf, file_done)
                    elif nzf.filename.lower().endswith(".par2"):
                        # Broken par2 file, try to get another one
                        nzo.promote_par2(nzf)
                    else:
                        if file_has_articles(nzf):
                            logging.warning(T("%s -> Unknown encoding"), nzf.filename)

            # Save bookkeeping in case of crash
            if file_done and (nzo.next_save is None or time.time() > nzo.next_save):
                nzo.save_to_disk()
                sabnzbd.BPSMeter.save()
                if nzo.save_timeout is None:
                    nzo.next_save = None
                else:
                    nzo.next_save = time.time() + nzo.save_timeout

            # Remove post from Queue
            if post_done:
                nzo.set_download_report()
                self.end_job(nzo)

    @NzbQueueLocker
    def end_job(self, nzo: NzbObject):
        """Send NZO to the post-processing queue"""
        # Notify assembler to call postprocessor
        if not nzo.deleted:
            logging.info("[%s] Ending job %s", caller_name(), nzo.final_name)
            nzo.deleted = True
            if nzo.precheck:
                nzo.save_to_disk()
                # Check result
                enough, _ = nzo.check_availability_ratio()
                if enough:
                    # Enough data present, do real download
                    self.send_back(nzo)
                    return
                else:
                    # Not enough data, let postprocessor show it as failed
                    pass
            sabnzbd.Assembler.process(nzo)

    def actives(self, grabs=True) -> int:
        """Return amount of non-paused jobs, optionally with 'grabbing' items
        Not locked for performance, only reads the queue
        """
        n = 0
        for nzo in self.__nzo_list:
            # Ignore any items that are paused
            if grabs and nzo.status == Status.GRABBING:
                n += 1
            elif nzo.status not in (Status.PAUSED, Status.GRABBING):
                n += 1
        return n

    def queue_info(self, search=None, nzo_ids=None, start=0, limit=0):
        """Return list of queued jobs,
        optionally filtered by 'search' and 'nzo_ids', and limited by start and limit.
        Not locked for performance, only reads the queue
        """
        if search:
            search = search.lower()
        if nzo_ids:
            nzo_ids = nzo_ids.split(",")
        bytes_left = 0
        bytes_total = 0
        bytes_left_previous_page = 0
        q_size = 0
        pnfo_list = []
        n = 0

        for nzo in self.__nzo_list:
            if nzo.status not in (Status.PAUSED, Status.CHECKING) or nzo.priority == FORCE_PRIORITY:
                b_left = nzo.remaining
                bytes_total += nzo.bytes
                bytes_left += b_left
                q_size += 1
                # We need the number of bytes before the current page
                if n < start:
                    bytes_left_previous_page += b_left

            if (not search) or search in nzo.final_name.lower():
                if (not nzo_ids) or nzo.nzo_id in nzo_ids:
                    if (not limit) or (start <= n < start + limit):
                        pnfo_list.append(nzo.gather_info())
                    # Count every job that matches the filters, also outside the page
                    n += 1

        if not search and not nzo_ids:
            n = len(self.__nzo_list)
        return QNFO(bytes_total, bytes_left, bytes_left_previous_page, pnfo_list, q_size, n)

    def remaining(self):
        """Return bytes left in the queue by non-paused items
        Not locked for performance, only reads the queue
        """
        return sum(nzo.remaining for nzo in self.__nzo_list if nzo.status != Status.PAUSED)

    def is_empty(self):
        """Return True when the queue has no non-paused regular (non-future) jobs"""
        return not any(not nzo.futuretype and nzo.status != Status.PAUSED for nzo in self.__nzo_list)

    def stop_idle_jobs(self):
        """Detect jobs that have zero files left and send them to post processing"""
        # Only check servers that are active
        nr_servers = len([server for server in sabnzbd.Downloader.servers[:] if server.active])
        empty = []

        for nzo in self.__nzo_list:
            if not nzo.futuretype and not nzo.files and nzo.status not in (Status.PAUSED, Status.GRABBING):
                logging.info("Found idle job %s", nzo.final_name)
                empty.append(nzo)

            # Stall prevention by checking if all servers are in the trylist
            # This is a CPU-cheaper alternative to prevent stalling
            if len(nzo.try_list) >= nr_servers:
                # Maybe the NZF's need a reset too?
                for nzf in nzo.files:
                    if len(nzf.try_list) >= nr_servers:
                        # We do not want to reset all article trylists, they are good
                        logging.info("Resetting bad trylist for file %s in job %s", nzf.filename, nzo.final_name)
                        nzf.reset_try_list()

                # Reset main trylist, minimal performance impact
                logging.info("Resetting bad trylist for job %s", nzo.final_name)
                nzo.reset_try_list()

        # End the jobs only after the scan, so the list is not changed while iterating
        for nzo in empty:
            self.end_job(nzo)

    def pause_on_prio(self, priority: int):
        """Pause all jobs with the given priority"""
        for nzo in self.__nzo_list:
            if nzo.priority == priority:
                nzo.pause()

    @NzbQueueLocker
    def resume_on_prio(self, priority: int):
        """Set all jobs with the given priority back to queued"""
        for nzo in self.__nzo_list:
            if nzo.priority == priority:
                # Don't use nzo.resume() to avoid resetting job warning flags
                nzo.status = Status.QUEUED

    def pause_on_cat(self, cat: str):
        """Pause all jobs in the given category"""
        for nzo in self.__nzo_list:
            if nzo.cat == cat:
                nzo.pause()

    @NzbQueueLocker
    def resume_on_cat(self, cat: str):
        """Set all jobs in the given category back to queued"""
        for nzo in self.__nzo_list:
            if nzo.cat == cat:
                # Don't use nzo.resume() to avoid resetting job warning flags
                nzo.status = Status.QUEUED

    def get_urls(self):
        """Return list of future-types needing URL"""
        lst = []
        for nzo in self.__nzo_table.values():
            if nzo.futuretype:
                url = nzo.url
                # Skip placeholders without a URL instead of failing on them
                if url and url.lower().startswith("http"):
                    lst.append((url, nzo))
        return lst

    def __repr__(self):
        return "<NzbQueue>"


def _nzo_date_cmp(nzo1: NzbObject, nzo2: NzbObject):
    """Compare two jobs on average date, a missing date is treated as "now" """
    avg_date1 = nzo1.avg_date
    avg_date2 = nzo2.avg_date

    if avg_date1 is None and avg_date2 is None:
        return 0

    # At most one of the dates is missing at this point
    if avg_date1 is None:
        avg_date1 = datetime.datetime.now()
    elif avg_date2 is None:
        avg_date2 = datetime.datetime.now()

    return cmp(avg_date1, avg_date2)
def _nzo_name_cmp(nzo1, nzo2):
    """Compare two jobs on their case-insensitive final name"""
    first_name = nzo1.final_name.lower()
    second_name = nzo2.final_name.lower()
    return cmp(first_name, second_name)


def _nzo_size_cmp(nzo1, nzo2):
    """Compare two jobs on their size in bytes"""
    first_size = nzo1.bytes
    second_size = nzo2.bytes
    return cmp(first_size, second_size)


def sort_queue_function(nzo_list: List[NzbObject], method, reverse: bool) -> List[NzbObject]:
    """Return a new list in which the jobs are grouped by priority (repair,
    force, high, normal, low) and sorted within each group using the
    comparison function 'method', optionally reversed.
    Jobs with any other priority keep their relative order at the end.
    """
    priority_levels = (REPAIR_PRIORITY, FORCE_PRIORITY, HIGH_PRIORITY, NORMAL_PRIORITY, LOW_PRIORITY)
    sort_key = functools.cmp_to_key(method)

    # Split the queue per priority level first, highest level first
    buckets = [[nzo for nzo in nzo_list if nzo.priority == level] for level in priority_levels]

    # Sort every level on its own and glue the levels together
    ordered = []
    for bucket in buckets:
        ordered += sorted(bucket, key=sort_key, reverse=reverse)

    # Make sure any left-over jobs enter the new list
    for leftover in nzo_list:
        if leftover not in ordered:
            ordered.append(leftover)

    return ordered