1#!/usr/bin/python3 -OO
2# Copyright 2007-2021 The SABnzbd-Team <team@sabnzbd.org>
3#
4# This program is free software; you can redistribute it and/or
5# modify it under the terms of the GNU General Public License
6# as published by the Free Software Foundation; either version 2
7# of the License, or (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12# GNU General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License
15# along with this program; if not, write to the Free Software
16# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
17
18"""
19sabnzbd.nzbqueue - nzb queue
20"""
21
22import os
23import logging
24import time
25import datetime
26import functools
27from typing import List, Dict, Union, Tuple, Optional
28
29import sabnzbd
30from sabnzbd.nzbstuff import NzbObject, Article
31from sabnzbd.misc import exit_sab, cat_to_opts, int_conv, caller_name, cmp, safe_lower
32from sabnzbd.filesystem import get_admin_path, remove_all, globber_full, remove_file, is_valid_script
33from sabnzbd.nzbparser import process_single_nzb
34from sabnzbd.panic import panic_queue
35from sabnzbd.decorators import NzbQueueLocker
36from sabnzbd.constants import (
37    QUEUE_FILE_NAME,
38    QUEUE_VERSION,
39    FUTURE_Q_FOLDER,
40    JOB_ADMIN,
41    DEFAULT_PRIORITY,
42    LOW_PRIORITY,
43    NORMAL_PRIORITY,
44    HIGH_PRIORITY,
45    FORCE_PRIORITY,
46    REPAIR_PRIORITY,
47    STOP_PRIORITY,
48    VERIFIED_FILE,
49    Status,
50    IGNORED_FOLDERS,
51    QNFO,
52    DIRECT_WRITE_TRIGGER,
53)
54
55import sabnzbd.cfg as cfg
56from sabnzbd.downloader import Server
57from sabnzbd.assembler import file_has_articles
58import sabnzbd.notifier as notifier
59
60
61class NzbQueue:
62    """Singleton NzbQueue"""
63
64    def __init__(self):
65        self.__top_only: bool = cfg.top_only()
66        self.__nzo_list: List[NzbObject] = []
67        self.__nzo_table: Dict[str, NzbObject] = {}
68
69    def read_queue(self, repair):
70        """Read queue from disk, supporting repair modes
71        0 = no repairs
72        1 = use existing queue, add missing "incomplete" folders
73        2 = Discard all queue admin, reconstruct from "incomplete" folders
74        """
75        nzo_ids = []
76        if repair < 2:
77            # Try to process the queue file
78            try:
79                data = sabnzbd.load_admin(QUEUE_FILE_NAME)
80                if data:
81                    queue_vers, nzo_ids, _ = data
82                    if not queue_vers == QUEUE_VERSION:
83                        nzo_ids = []
84                        logging.error(T("Incompatible queuefile found, cannot proceed"))
85                        if not repair:
86                            panic_queue(os.path.join(cfg.admin_dir.get_path(), QUEUE_FILE_NAME))
87                            exit_sab(2)
88            except:
89                nzo_ids = []
90                logging.error(
91                    T("Error loading %s, corrupt file detected"),
92                    os.path.join(cfg.admin_dir.get_path(), QUEUE_FILE_NAME),
93                )
94
95        # First handle jobs in the queue file
96        folders = []
97        for nzo_id in nzo_ids:
98            folder, _id = os.path.split(nzo_id)
99            path = get_admin_path(folder, future=False)
100
101            # Try as normal job
102            nzo = sabnzbd.load_data(_id, path, remove=False)
103            if not nzo:
104                # Try as future job
105                path = get_admin_path(folder, future=True)
106                nzo = sabnzbd.load_data(_id, path)
107            if nzo:
108                self.add(nzo, save=False, quiet=True)
109                folders.append(folder)
110
111        # Scan for any folders in "incomplete" that are not yet in the queue
112        if repair:
113            logging.info("Starting queue repair")
114            self.scan_jobs(not folders)
115            # Handle any lost future jobs
116            for item in globber_full(os.path.join(cfg.admin_dir.get_path(), FUTURE_Q_FOLDER)):
117                path, nzo_id = os.path.split(item)
118                if nzo_id not in self.__nzo_table:
119                    if nzo_id.startswith("SABnzbd_nzo"):
120                        nzo = sabnzbd.load_data(nzo_id, path, remove=True)
121                        if nzo:
122                            self.add(nzo, save=True)
123                    else:
124                        try:
125                            remove_file(item)
126                        except:
127                            pass
128
129    @NzbQueueLocker
130    def scan_jobs(self, all_jobs=False, action=True):
131        """Scan "incomplete" for missing folders,
132        'all' is True: Include active folders
133        'action' is True, do the recovery action
134        returns list of orphaned folders
135        """
136        result = []
137        # Folders from the download queue
138        if all_jobs:
139            registered = []
140        else:
141            registered = [nzo.work_name for nzo in self.__nzo_list]
142
143        # Retryable folders from History
144        items = sabnzbd.api.build_history()[0]
145        # Anything waiting or active or retryable is a known item
146        registered.extend(
147            [
148                os.path.basename(item["path"])
149                for item in items
150                if item["retry"] or item["loaded"] or item["status"] == Status.QUEUED
151            ]
152        )
153
154        # Repair unregistered folders
155        for folder in globber_full(cfg.download_dir.get_path()):
156            name = os.path.basename(folder)
157            if os.path.isdir(folder) and name not in registered and name not in IGNORED_FOLDERS:
158                if action:
159                    logging.info("Repairing job %s", folder)
160                    self.repair_job(folder)
161                result.append(os.path.basename(folder))
162            else:
163                if action:
164                    logging.info("Skipping repair for job %s", folder)
165        return result
166
167    def repair_job(self, repair_folder, new_nzb=None, password=None):
168        """Reconstruct admin for a single job folder, optionally with new NZB"""
169        # Check if folder exists
170        if not repair_folder or not os.path.exists(repair_folder):
171            return None
172
173        name = os.path.basename(repair_folder)
174        admin_path = os.path.join(repair_folder, JOB_ADMIN)
175
176        # If Retry was used and a new NZB was uploaded
177        if getattr(new_nzb, "filename", None):
178            remove_all(admin_path, "*.gz", keep_folder=True)
179            logging.debug("Repair job %s with new NZB (%s)", name, new_nzb.filename)
180            _, nzo_ids = sabnzbd.add_nzbfile(new_nzb, nzbname=name, reuse=repair_folder, password=password)
181        else:
182            # Was this file already post-processed?
183            verified = sabnzbd.load_data(VERIFIED_FILE, admin_path, remove=False)
184            filenames = []
185            if not verified or not all(verified[x] for x in verified):
186                filenames = globber_full(admin_path, "*.gz")
187
188            if filenames:
189                logging.debug("Repair job %s by re-parsing stored NZB", name)
190                _, nzo_ids = sabnzbd.add_nzbfile(filenames[0], nzbname=name, reuse=repair_folder, password=password)
191            else:
192                try:
193                    logging.debug("Repair job %s without stored NZB", name)
194                    nzo = NzbObject(name, nzbname=name, reuse=repair_folder)
195                    nzo.password = password
196                    self.add(nzo)
197                    nzo_ids = [nzo.nzo_id]
198                except:
199                    # NzoObject can throw exceptions if duplicate or unwanted etc
200                    logging.info("Skipping %s due to exception", name, exc_info=True)
201                    nzo_ids = []
202
203        # Return None if we could not add anything
204        if nzo_ids:
205            return nzo_ids[0]
206        return None
207
208    @NzbQueueLocker
209    def send_back(self, old_nzo: NzbObject):
210        """Send back job to queue after successful pre-check"""
211        try:
212            nzb_path = globber_full(old_nzo.admin_path, "*.gz")[0]
213        except:
214            logging.info("Failed to find NZB file after pre-check (%s)", old_nzo.nzo_id)
215            return
216
217        # Store old position and create new NZO
218        old_position = self.__nzo_list.index(old_nzo)
219        res, nzo_ids = process_single_nzb(
220            old_nzo.filename, nzb_path, keep=True, reuse=old_nzo.download_path, nzo_id=old_nzo.nzo_id
221        )
222        if res == 0 and nzo_ids:
223            # Swap to old position
224            new_nzo = self.get_nzo(nzo_ids[0])
225            self.__nzo_list.remove(new_nzo)
226            self.__nzo_list.insert(old_position, new_nzo)
227            # Reset reuse flag to make pause/abort on encryption possible
228            self.__nzo_table[nzo_ids[0]].reuse = None
229
230    @NzbQueueLocker
231    def save(self, save_nzo: Union[NzbObject, None, bool] = None):
232        """Save queue, all nzo's or just the specified one"""
233        logging.info("Saving queue")
234
235        nzo_ids = []
236        # Aggregate nzo_ids and save each nzo
237        for nzo in self.__nzo_list[:]:
238            if not nzo.is_gone():
239                nzo_ids.append(os.path.join(nzo.work_name, nzo.nzo_id))
240                if save_nzo is None or nzo is save_nzo:
241                    if not nzo.futuretype:
242                        # Also includes save_data for NZO
243                        nzo.save_to_disk()
244                    else:
245                        sabnzbd.save_data(nzo, nzo.nzo_id, nzo.admin_path)
246
247        sabnzbd.save_admin((QUEUE_VERSION, nzo_ids, []), QUEUE_FILE_NAME)
248
249    def set_top_only(self, value):
250        self.__top_only = value
251
252    def generate_future(self, msg, pp=None, script=None, cat=None, url=None, priority=DEFAULT_PRIORITY, nzbname=None):
253        """Create and return a placeholder nzo object"""
254        logging.debug("Creating placeholder NZO")
255        future_nzo = NzbObject(
256            filename=msg,
257            pp=pp,
258            script=script,
259            futuretype=True,
260            cat=cat,
261            url=url,
262            priority=priority,
263            nzbname=nzbname,
264            status=Status.GRABBING,
265        )
266        self.add(future_nzo)
267        return future_nzo
268
269    def change_opts(self, nzo_ids: str, pp: int) -> int:
270        result = 0
271        for nzo_id in [item.strip() for item in nzo_ids.split(",")]:
272            if nzo_id in self.__nzo_table:
273                self.__nzo_table[nzo_id].set_pp(pp)
274                result += 1
275        return result
276
277    def change_script(self, nzo_ids: str, script: str) -> int:
278        result = 0
279        if (script is None) or is_valid_script(script):
280            for nzo_id in [item.strip() for item in nzo_ids.split(",")]:
281                if nzo_id in self.__nzo_table:
282                    self.__nzo_table[nzo_id].script = script
283                    logging.info("Set script=%s for job %s", script, self.__nzo_table[nzo_id].final_name)
284                    result += 1
285        return result
286
287    def change_cat(self, nzo_ids: str, cat: str, explicit_priority=None):
288        result = 0
289        for nzo_id in [item.strip() for item in nzo_ids.split(",")]:
290            if nzo_id in self.__nzo_table:
291                nzo = self.__nzo_table[nzo_id]
292                nzo.cat, pp, nzo.script, prio = cat_to_opts(cat)
293                logging.info("Set cat=%s for job %s", cat, nzo.final_name)
294                nzo.set_pp(pp)
295                if explicit_priority is None:
296                    self.set_priority(nzo_id, prio)
297                # Abort any ongoing unpacking if the category changed
298                nzo.abort_direct_unpacker()
299                result += 1
300        return result
301
302    def change_name(self, nzo_id: str, name: str, password: str = None):
303        if nzo_id in self.__nzo_table:
304            nzo = self.__nzo_table[nzo_id]
305            logging.info("Renaming %s to %s", nzo.final_name, name)
306            # Abort any ongoing unpacking if the name changed (dirs change)
307            nzo.abort_direct_unpacker()
308            if not nzo.futuretype:
309                nzo.set_final_name_and_scan_password(name, password)
310            else:
311                # Reset url fetch wait time
312                nzo.url_wait = None
313                nzo.url_tries = 0
314            return True
315        else:
316            return False
317
318    def get_nzo(self, nzo_id) -> Optional[NzbObject]:
319        if nzo_id in self.__nzo_table:
320            return self.__nzo_table[nzo_id]
321        else:
322            return None
323
324    @NzbQueueLocker
325    def add(self, nzo: NzbObject, save=True, quiet=False) -> str:
326        if not nzo.nzo_id:
327            nzo.nzo_id = sabnzbd.get_new_id("nzo", nzo.admin_path, self.__nzo_table)
328
329        # If no files are to be downloaded anymore, send to postproc
330        if not nzo.files and not nzo.futuretype:
331            self.end_job(nzo)
332            return nzo.nzo_id
333
334        # Reset try_lists, markers and evaluate the scheduling settings
335        nzo.reset_try_list()
336        nzo.deleted = False
337        priority = nzo.priority
338        if sabnzbd.Scheduler.analyse(False, priority):
339            nzo.status = Status.PAUSED
340
341        self.__nzo_table[nzo.nzo_id] = nzo
342        if priority > HIGH_PRIORITY:
343            # Top and repair priority items are added to the top of the queue
344            self.__nzo_list.insert(0, nzo)
345        elif priority == LOW_PRIORITY:
346            self.__nzo_list.append(nzo)
347        else:
348            # for high priority we need to add the item at the bottom
349            # of any other high priority items above the normal priority
350            # for normal priority we need to add the item at the bottom
351            # of the normal priority items above the low priority
352            if self.__nzo_list:
353                pos = 0
354                added = False
355                for position in self.__nzo_list:
356                    if position.priority < priority:
357                        self.__nzo_list.insert(pos, nzo)
358                        added = True
359                        break
360                    pos += 1
361                if not added:
362                    # if there are no other items classed as a lower priority
363                    # then it will be added to the bottom of the queue
364                    self.__nzo_list.append(nzo)
365            else:
366                # if the queue is empty then simple append the item to the bottom
367                self.__nzo_list.append(nzo)
368        if save:
369            self.save(nzo)
370
371        if not (quiet or nzo.status == Status.FETCHING):
372            notifier.send_notification(T("NZB added to queue"), nzo.filename, "download", nzo.cat)
373
374        if not quiet and cfg.auto_sort():
375            try:
376                field, direction = cfg.auto_sort().split()
377                self.sort_queue(field, direction)
378            except ValueError:
379                pass
380        return nzo.nzo_id
381
382    @NzbQueueLocker
383    def remove(self, nzo_id: str, cleanup=True, delete_all_data=True):
384        """Remove NZO from queue.
385        It can be added to history directly.
386        Or, we do some clean-up, sometimes leaving some data.
387        """
388        if nzo_id in self.__nzo_table:
389            nzo = self.__nzo_table.pop(nzo_id)
390            logging.info("[%s] Removing job %s", caller_name(), nzo.final_name)
391
392            # Set statuses
393            nzo.deleted = True
394            if cleanup and not nzo.is_gone():
395                nzo.status = Status.DELETED
396            self.__nzo_list.remove(nzo)
397
398            if cleanup:
399                nzo.purge_data(delete_all_data=delete_all_data)
400            self.save(False)
401            return nzo_id
402        return None
403
404    @NzbQueueLocker
405    def remove_multiple(self, nzo_ids: List[str], delete_all_data=True) -> List[str]:
406        removed = []
407        for nzo_id in nzo_ids:
408            if self.remove(nzo_id, delete_all_data=delete_all_data):
409                removed.append(nzo_id)
410
411        # Any files left? Otherwise let's disconnect
412        if self.actives(grabs=False) == 0 and cfg.autodisconnect():
413            # This was the last job, close server connections
414            sabnzbd.Downloader.disconnect()
415
416        return removed
417
418    @NzbQueueLocker
419    def remove_all(self, search: str = "") -> List[str]:
420        """Remove NZO's that match the search-pattern"""
421        nzo_ids = []
422        search = safe_lower(search)
423        for nzo_id, nzo in self.__nzo_table.items():
424            if not search or search in nzo.final_name.lower():
425                nzo_ids.append(nzo_id)
426        return self.remove_multiple(nzo_ids)
427
428    def remove_nzf(self, nzo_id: str, nzf_id: str, force_delete=False) -> List[str]:
429        removed = []
430        if nzo_id in self.__nzo_table:
431            nzo = self.__nzo_table[nzo_id]
432            nzf = nzo.get_nzf_by_id(nzf_id)
433
434            if nzf:
435                removed.append(nzf_id)
436                nzo.abort_direct_unpacker()
437                post_done = nzo.remove_nzf(nzf)
438                if post_done:
439                    if nzo.finished_files:
440                        self.end_job(nzo)
441                    else:
442                        self.remove(nzo_id)
443                elif force_delete:
444                    # Force-remove all trace and update counters
445                    nzo.bytes -= nzf.bytes
446                    nzo.bytes_tried -= nzf.bytes - nzf.bytes_left
447                    if nzf.is_par2 or sabnzbd.par2file.is_parfile(nzf.filename):
448                        nzo.bytes_par2 -= nzf.bytes
449                    del nzo.files_table[nzf_id]
450                    nzo.finished_files.remove(nzf)
451            logging.info("Removed NZFs %s from job %s", removed, nzo.final_name)
452        return removed
453
454    def pause_multiple_nzo(self, nzo_ids: List[str]) -> List[str]:
455        handled = []
456        for nzo_id in nzo_ids:
457            self.pause_nzo(nzo_id)
458            handled.append(nzo_id)
459        return handled
460
461    def pause_nzo(self, nzo_id: str) -> List[str]:
462        handled = []
463        if nzo_id in self.__nzo_table:
464            nzo = self.__nzo_table[nzo_id]
465            nzo.pause()
466            logging.info("Paused nzo: %s", nzo_id)
467            handled.append(nzo_id)
468        return handled
469
470    def resume_multiple_nzo(self, nzo_ids: List[str]) -> List[str]:
471        handled = []
472        for nzo_id in nzo_ids:
473            self.resume_nzo(nzo_id)
474            handled.append(nzo_id)
475        return handled
476
477    @NzbQueueLocker
478    def resume_nzo(self, nzo_id: str) -> List[str]:
479        handled = []
480        if nzo_id in self.__nzo_table:
481            nzo = self.__nzo_table[nzo_id]
482            nzo.resume()
483            logging.info("Resumed nzo: %s", nzo_id)
484            handled.append(nzo_id)
485        return handled
486
487    @NzbQueueLocker
488    def switch(self, item_id_1: str, item_id_2: str) -> Tuple[int, int]:
489        try:
490            # Allow an index as second parameter, easier for some skins
491            i = int(item_id_2)
492            item_id_2 = self.__nzo_list[i].nzo_id
493        except:
494            pass
495        try:
496            nzo1 = self.__nzo_table[item_id_1]
497            nzo2 = self.__nzo_table[item_id_2]
498        except KeyError:
499            # One or both jobs missing
500            return -1, 0
501
502        if nzo1 == nzo2:
503            return -1, 0
504
505        # get the priorities of the two items
506        nzo1_priority = nzo1.priority
507        nzo2_priority = nzo2.priority
508        try:
509            # get the item id of the item below to use in priority changing
510            item_id_3 = self.__nzo_list[i + 1].nzo_id
511            # if there is an item below the id1 and id2 then we need that too
512            # to determine whether to change the priority
513            nzo3 = self.__nzo_table[item_id_3]
514            nzo3_priority = nzo3.priority
515            # if id1 is surrounded by items of a different priority then change it's pririty to match
516            if nzo2_priority != nzo1_priority and nzo3_priority != nzo1_priority or nzo2_priority > nzo1_priority:
517                nzo1.priority = nzo2_priority
518        except:
519            nzo1.priority = nzo2_priority
520        item_id_pos1 = -1
521        item_id_pos2 = -1
522        for i in range(len(self.__nzo_list)):
523            if item_id_1 == self.__nzo_list[i].nzo_id:
524                item_id_pos1 = i
525            elif item_id_2 == self.__nzo_list[i].nzo_id:
526                item_id_pos2 = i
527            if (item_id_pos1 > -1) and (item_id_pos2 > -1):
528                item = self.__nzo_list[item_id_pos1]
529                logging.info(
530                    "Switching job [%s] %s => [%s] %s",
531                    item_id_pos1,
532                    item.final_name,
533                    item_id_pos2,
534                    self.__nzo_list[item_id_pos2].final_name,
535                )
536                del self.__nzo_list[item_id_pos1]
537                self.__nzo_list.insert(item_id_pos2, item)
538                return item_id_pos2, nzo1.priority
539        # If moving failed/no movement took place
540        return -1, nzo1.priority
541
542    @NzbQueueLocker
543    def move_up_bulk(self, nzo_id, nzf_ids, size):
544        if nzo_id in self.__nzo_table:
545            for unused in range(size):
546                self.__nzo_table[nzo_id].move_up_bulk(nzf_ids)
547
548    @NzbQueueLocker
549    def move_top_bulk(self, nzo_id, nzf_ids):
550        if nzo_id in self.__nzo_table:
551            self.__nzo_table[nzo_id].move_top_bulk(nzf_ids)
552
553    @NzbQueueLocker
554    def move_down_bulk(self, nzo_id, nzf_ids, size):
555        if nzo_id in self.__nzo_table:
556            for unused in range(size):
557                self.__nzo_table[nzo_id].move_down_bulk(nzf_ids)
558
559    @NzbQueueLocker
560    def move_bottom_bulk(self, nzo_id, nzf_ids):
561        if nzo_id in self.__nzo_table:
562            self.__nzo_table[nzo_id].move_bottom_bulk(nzf_ids)
563
564    @NzbQueueLocker
565    def sort_by_avg_age(self, reverse=False):
566        logging.info("Sorting by average date... (reversed: %s)", reverse)
567        self.__nzo_list = sort_queue_function(self.__nzo_list, _nzo_date_cmp, reverse)
568
569    @NzbQueueLocker
570    def sort_by_name(self, reverse=False):
571        logging.info("Sorting by name... (reversed: %s)", reverse)
572        self.__nzo_list = sort_queue_function(self.__nzo_list, _nzo_name_cmp, reverse)
573
574    @NzbQueueLocker
575    def sort_by_size(self, reverse=False):
576        logging.info("Sorting by size... (reversed: %s)", reverse)
577        self.__nzo_list = sort_queue_function(self.__nzo_list, _nzo_size_cmp, reverse)
578
579    def sort_queue(self, field, reverse=None):
580        """Sort queue by field: "name", "size" or "avg_age"
581        Direction is specified as "desc"/True or "asc"/False
582        """
583        if isinstance(reverse, str):
584            if reverse.lower() == "desc":
585                reverse = True
586            else:
587                reverse = False
588        if reverse is None:
589            reverse = False
590        if field.lower() == "name":
591            self.sort_by_name(reverse)
592        elif field.lower() == "size" or field.lower() == "bytes":
593            self.sort_by_size(reverse)
594        elif field.lower() == "avg_age":
595            self.sort_by_avg_age(not reverse)
596        else:
597            logging.debug("Sort: %s not recognized", field)
598
599    @NzbQueueLocker
600    def __set_priority(self, nzo_id, priority):
601        """Sets the priority on the nzo and places it in the queue at the appropriate position"""
602        try:
603            priority = int_conv(priority)
604            nzo = self.__nzo_table[nzo_id]
605            nzo_id_pos1 = -1
606            pos = -1
607
608            # If priority == STOP_PRIORITY, then send to queue
609            if priority == STOP_PRIORITY:
610                self.end_job(nzo)
611                return
612
613            # Get the current position in the queue
614            for i in range(len(self.__nzo_list)):
615                if nzo_id == self.__nzo_list[i].nzo_id:
616                    nzo_id_pos1 = i
617                    break
618
619            # Don't change priority and order if priority is the same as asked
620            if priority == self.__nzo_list[nzo_id_pos1].priority:
621                return nzo_id_pos1
622
623            nzo.set_priority(priority)
624            if sabnzbd.Scheduler.analyse(False, priority) and nzo.status in (
625                Status.CHECKING,
626                Status.DOWNLOADING,
627                Status.QUEUED,
628            ):
629                nzo.status = Status.PAUSED
630            elif nzo.status == Status.PAUSED:
631                nzo.status = Status.QUEUED
632            nzo.save_to_disk()
633
634            if nzo_id_pos1 != -1:
635                del self.__nzo_list[nzo_id_pos1]
636                if priority == FORCE_PRIORITY:
637                    # A top priority item (usually a completed download fetching pars)
638                    # is added to the top of the queue
639                    self.__nzo_list.insert(0, nzo)
640                    pos = 0
641                elif priority == LOW_PRIORITY:
642                    pos = len(self.__nzo_list)
643                    self.__nzo_list.append(nzo)
644                else:
645                    # for high priority we need to add the item at the bottom
646                    # of any other high priority items above the normal priority
647                    # for normal priority we need to add the item at the bottom
648                    # of the normal priority items above the low priority
649                    if self.__nzo_list:
650                        p = 0
651                        added = False
652                        for position in self.__nzo_list:
653                            if position.priority < priority:
654                                self.__nzo_list.insert(p, nzo)
655                                pos = p
656                                added = True
657                                break
658                            p += 1
659                        if not added:
660                            # if there are no other items classed as a lower priority
661                            # then it will be added to the bottom of the queue
662                            pos = len(self.__nzo_list)
663                            self.__nzo_list.append(nzo)
664                    else:
665                        # if the queue is empty then simple append the item to the bottom
666                        self.__nzo_list.append(nzo)
667                        pos = 0
668
669            logging.info(
670                "Set priority=%s for job %s => position=%s ", priority, self.__nzo_table[nzo_id].final_name, pos
671            )
672            return pos
673
674        except:
675            return -1
676
677    @NzbQueueLocker
678    def set_priority(self, nzo_ids, priority):
679        try:
680            n = -1
681            for nzo_id in [item.strip() for item in nzo_ids.split(",")]:
682                n = self.__set_priority(nzo_id, priority)
683            return n
684        except:
685            return -1
686
687    @staticmethod
688    def reset_try_lists(article: Article, remove_fetcher_from_trylist: bool = True):
689        """Let article get new fetcher and reset trylists"""
690        if remove_fetcher_from_trylist:
691            article.remove_from_try_list(article.fetcher)
692        article.fetcher = None
693        article.tries = 0
694        article.nzf.reset_try_list()
695        article.nzf.nzo.reset_try_list()
696
697    def has_forced_items(self):
698        """Check if the queue contains any Forced
699        Priority items to download while paused
700        """
701        for nzo in self.__nzo_list:
702            if nzo.priority == FORCE_PRIORITY and nzo.status not in (Status.PAUSED, Status.GRABBING):
703                return True
704        return False
705
706    def get_articles(self, server: Server, servers: List[Server], fetch_limit: int) -> List[Article]:
707        """Get next article for jobs in the queue
708        Not locked for performance, since it only reads the queue
709        """
710        # Pre-calculate propagation delay
711        propagation_delay = float(cfg.propagation_delay() * 60)
712        for nzo in self.__nzo_list:
713            # Not when queue paused and not a forced item
714            if nzo.status not in (Status.PAUSED, Status.GRABBING) or nzo.priority == FORCE_PRIORITY:
715                # Check if past propagation delay, or forced
716                if (
717                    not propagation_delay
718                    or nzo.priority == FORCE_PRIORITY
719                    or (nzo.avg_stamp + propagation_delay) < time.time()
720                ):
721                    if not nzo.server_in_try_list(server):
722                        articles = nzo.get_articles(server, servers, fetch_limit)
723                        if articles:
724                            return articles
725                    # Stop after first job that wasn't paused/propagating/etc
726                    if self.__top_only:
727                        return []
728        return []
729
730    def register_article(self, article: Article, success: bool = True):
731        """Register the articles we tried
732        Not locked for performance, since it only modifies individual NZOs
733        """
734        nzf = article.nzf
735        nzo = nzf.nzo
736
737        if nzf.deleted:
738            logging.debug("Discarding article %s, no longer in queue", article.article)
739            return
740
741        articles_left, file_done, post_done = nzo.remove_article(article, success)
742
743        if nzo.is_gone():
744            logging.debug("Discarding article for file %s, no longer in queue", nzf.filename)
745        else:
746            # Write data if file is done or at trigger time
747            if file_done or (articles_left and (articles_left % DIRECT_WRITE_TRIGGER) == 0):
748                if not nzo.precheck:
749                    # Only start decoding if we have a filename and type
750                    # The type is only set if sabyenc could decode the article
751                    if nzf.filename and nzf.type:
752                        sabnzbd.Assembler.process(nzo, nzf, file_done)
753                    elif nzf.filename.lower().endswith(".par2"):
754                        # Broken par2 file, try to get another one
755                        nzo.promote_par2(nzf)
756                    else:
757                        if file_has_articles(nzf):
758                            logging.warning(T("%s -> Unknown encoding"), nzf.filename)
759
760            # Save bookkeeping in case of crash
761            if file_done and (nzo.next_save is None or time.time() > nzo.next_save):
762                nzo.save_to_disk()
763                sabnzbd.BPSMeter.save()
764                if nzo.save_timeout is None:
765                    nzo.next_save = None
766                else:
767                    nzo.next_save = time.time() + nzo.save_timeout
768
769            # Remove post from Queue
770            if post_done:
771                nzo.set_download_report()
772                self.end_job(nzo)
773
774    @NzbQueueLocker
775    def end_job(self, nzo: NzbObject):
776        """Send NZO to the post-processing queue"""
777        # Notify assembler to call postprocessor
778        if not nzo.deleted:
779            logging.info("[%s] Ending job %s", caller_name(), nzo.final_name)
780            nzo.deleted = True
781            if nzo.precheck:
782                nzo.save_to_disk()
783                # Check result
784                enough, _ = nzo.check_availability_ratio()
785                if enough:
786                    # Enough data present, do real download
787                    self.send_back(nzo)
788                    return
789                else:
790                    # Not enough data, let postprocessor show it as failed
791                    pass
792            sabnzbd.Assembler.process(nzo)
793
794    def actives(self, grabs=True) -> int:
795        """Return amount of non-paused jobs, optionally with 'grabbing' items
796        Not locked for performance, only reads the queue
797        """
798        n = 0
799        for nzo in self.__nzo_list:
800            # Ignore any items that are paused
801            if grabs and nzo.status == Status.GRABBING:
802                n += 1
803            elif nzo.status not in (Status.PAUSED, Status.GRABBING):
804                n += 1
805        return n
806
807    def queue_info(self, search=None, nzo_ids=None, start=0, limit=0):
808        """Return list of queued jobs,
809        optionally filtered by 'search' and 'nzo_ids', and limited by start and limit.
810        Not locked for performance, only reads the queue
811        """
812        if search:
813            search = search.lower()
814        if nzo_ids:
815            nzo_ids = nzo_ids.split(",")
816        bytes_left = 0
817        bytes_total = 0
818        bytes_left_previous_page = 0
819        q_size = 0
820        pnfo_list = []
821        n = 0
822
823        for nzo in self.__nzo_list:
824            if nzo.status not in (Status.PAUSED, Status.CHECKING) or nzo.priority == FORCE_PRIORITY:
825                b_left = nzo.remaining
826                bytes_total += nzo.bytes
827                bytes_left += b_left
828                q_size += 1
829                # We need the number of bytes before the current page
830                if n < start:
831                    bytes_left_previous_page += b_left
832
833            if (not search) or search in nzo.final_name.lower():
834                if (not nzo_ids) or nzo.nzo_id in nzo_ids:
835                    if (not limit) or (start <= n < start + limit):
836                        pnfo_list.append(nzo.gather_info())
837                    n += 1
838
839        if not search and not nzo_ids:
840            n = len(self.__nzo_list)
841        return QNFO(bytes_total, bytes_left, bytes_left_previous_page, pnfo_list, q_size, n)
842
843    def remaining(self):
844        """Return bytes left in the queue by non-paused items
845        Not locked for performance, only reads the queue
846        """
847        bytes_left = 0
848        for nzo in self.__nzo_list:
849            if nzo.status != Status.PAUSED:
850                bytes_left += nzo.remaining
851        return bytes_left
852
853    def is_empty(self):
854        empty = True
855        for nzo in self.__nzo_list:
856            if not nzo.futuretype and nzo.status != Status.PAUSED:
857                empty = False
858                break
859        return empty
860
861    def stop_idle_jobs(self):
862        """Detect jobs that have zero files left and send them to post processing"""
863        # Only check servers that are active
864        nr_servers = len([server for server in sabnzbd.Downloader.servers[:] if server.active])
865        empty = []
866
867        for nzo in self.__nzo_list:
868            if not nzo.futuretype and not nzo.files and nzo.status not in (Status.PAUSED, Status.GRABBING):
869                logging.info("Found idle job %s", nzo.final_name)
870                empty.append(nzo)
871
872            # Stall prevention by checking if all servers are in the trylist
873            # This is a CPU-cheaper alternative to prevent stalling
874            if len(nzo.try_list) >= nr_servers:
875                # Maybe the NZF's need a reset too?
876                for nzf in nzo.files:
877                    if len(nzf.try_list) >= nr_servers:
878                        # We do not want to reset all article trylists, they are good
879                        logging.info("Resetting bad trylist for file %s in job %s", nzf.filename, nzo.final_name)
880                        nzf.reset_try_list()
881
882                # Reset main trylist, minimal performance impact
883                logging.info("Resetting bad trylist for job %s", nzo.final_name)
884                nzo.reset_try_list()
885
886        for nzo in empty:
887            self.end_job(nzo)
888
889    def pause_on_prio(self, priority: int):
890        for nzo in self.__nzo_list:
891            if nzo.priority == priority:
892                nzo.pause()
893
894    @NzbQueueLocker
895    def resume_on_prio(self, priority: int):
896        for nzo in self.__nzo_list:
897            if nzo.priority == priority:
898                # Don't use nzo.resume() to avoid resetting job warning flags
899                nzo.status = Status.QUEUED
900
901    def pause_on_cat(self, cat: str):
902        for nzo in self.__nzo_list:
903            if nzo.cat == cat:
904                nzo.pause()
905
906    @NzbQueueLocker
907    def resume_on_cat(self, cat: str):
908        for nzo in self.__nzo_list:
909            if nzo.cat == cat:
910                # Don't use nzo.resume() to avoid resetting job warning flags
911                nzo.status = Status.QUEUED
912
913    def get_urls(self):
914        """Return list of future-types needing URL"""
915        lst = []
916        for nzo_id in self.__nzo_table:
917            nzo = self.__nzo_table[nzo_id]
918            if nzo.futuretype:
919                url = nzo.url
920                if nzo.futuretype and url.lower().startswith("http"):
921                    lst.append((url, nzo))
922        return lst
923
924    def __repr__(self):
925        return "<NzbQueue>"
926
927
928def _nzo_date_cmp(nzo1: NzbObject, nzo2: NzbObject):
929    avg_date1 = nzo1.avg_date
930    avg_date2 = nzo2.avg_date
931
932    if avg_date1 is None and avg_date2 is None:
933        return 0
934
935    if avg_date1 is None:
936        avg_date1 = datetime.datetime.now()
937    elif avg_date2 is None:
938        avg_date2 = datetime.datetime.now()
939
940    return cmp(avg_date1, avg_date2)
941
942
943def _nzo_name_cmp(nzo1, nzo2):
944    return cmp(nzo1.final_name.lower(), nzo2.final_name.lower())
945
946
947def _nzo_size_cmp(nzo1, nzo2):
948    return cmp(nzo1.bytes, nzo2.bytes)
949
950
951def sort_queue_function(nzo_list: List[NzbObject], method, reverse: bool) -> List[NzbObject]:
952    ultra_high_priority = [nzo for nzo in nzo_list if nzo.priority == REPAIR_PRIORITY]
953    super_high_priority = [nzo for nzo in nzo_list if nzo.priority == FORCE_PRIORITY]
954    high_priority = [nzo for nzo in nzo_list if nzo.priority == HIGH_PRIORITY]
955    normal_priority = [nzo for nzo in nzo_list if nzo.priority == NORMAL_PRIORITY]
956    low_priority = [nzo for nzo in nzo_list if nzo.priority == LOW_PRIORITY]
957
958    ultra_high_priority.sort(key=functools.cmp_to_key(method), reverse=reverse)
959    super_high_priority.sort(key=functools.cmp_to_key(method), reverse=reverse)
960    high_priority.sort(key=functools.cmp_to_key(method), reverse=reverse)
961    normal_priority.sort(key=functools.cmp_to_key(method), reverse=reverse)
962    low_priority.sort(key=functools.cmp_to_key(method), reverse=reverse)
963
964    new_list = ultra_high_priority
965    new_list.extend(super_high_priority)
966    new_list.extend(high_priority)
967    new_list.extend(normal_priority)
968    new_list.extend(low_priority)
969
970    # Make sure any left-over jobs enter the new list
971    for item in nzo_list:
972        if item not in new_list:
973            new_list.append(item)
974
975    return new_list
976