1#!/usr/bin/python3 -OO
2# Copyright 2007-2021 The SABnzbd-Team <team@sabnzbd.org>
3#
4# This program is free software; you can redistribute it and/or
5# modify it under the terms of the GNU General Public License
6# as published by the Free Software Foundation; either version 2
7# of the License, or (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12# GNU General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License
15# along with this program; if not, write to the Free Software
16# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
17
18"""
19sabnzbd.database - Database Support
20"""
21
22import os
23import time
24import zlib
25import logging
26import sys
27import threading
28import sqlite3
29from typing import Union, Dict, Optional, List
30
31import sabnzbd
32import sabnzbd.cfg
33from sabnzbd.constants import DB_HISTORY_NAME, STAGES, Status
34from sabnzbd.bpsmeter import this_week, this_month
35from sabnzbd.decorators import synchronized
36from sabnzbd.encoding import ubtou, utob
37from sabnzbd.misc import int_conv, caller_name, opts_to_pp
38from sabnzbd.filesystem import remove_file, clip_path
39
# Re-entrant lock that is handed to the @synchronized decorator of HistoryDB.__init__
DB_LOCK = threading.RLock()
41
42
def convert_search(search):
    """Convert classic wildcard to SQL wildcard

    Translates a user supplied search term into a pattern for SQL `LIKE`:
    - `*` and spaces become `%`
    - a leading `^` anchors the pattern to the start of the string
    - a trailing `$` anchors the pattern to the end of the string
    - every side that is not anchored gets a `%` wildcard

    Both anchors can be combined, `^name$` results in an exact match.
    Only the anchor characters themselves are removed, a `^` or `$`
    elsewhere in the term is kept as a literal character.
    An empty search (or None) results in `%%`, which matches everything.
    """
    if not search:
        # Default value, matches everything
        return "%%"

    # Allow * for wildcard matching and space
    search = search.replace("*", "%").replace(" ", "%")

    # Allow ^ for start of string and $ for end of string
    anchored_start = search.startswith("^")
    if anchored_start:
        search = search[1:]
    anchored_end = search.endswith("$")
    if anchored_end:
        search = search[:-1]

    prefix = "" if anchored_start else "%"
    suffix = "" if anchored_end else "%"
    return prefix + search + suffix
62
63
class HistoryDB:
    """Class to access the History database
    Each class-instance will create an access channel that
    can be used in one thread.
    Each thread needs its own class-instance!

    Instances can be used as context manager, the connection
    is closed when the `with` block is left.
    """

    # These class attributes will be accessed directly because
    # they need to be shared by all instances
    db_path = None  # Will contain full path to history database
    done_cleaning = False  # Ensure we only do one Vacuum per session

    @synchronized(DB_LOCK)
    def __init__(self):
        """Determine database path and create connection"""
        self.con = self.c = None
        if not HistoryDB.db_path:
            HistoryDB.db_path = os.path.join(sabnzbd.cfg.admin_dir.get_path(), DB_HISTORY_NAME)
        self.connect()

    def connect(self):
        """Create a connection to the database

        Creates the history table when the database file does not exist yet,
        runs one VACUUM per session on an existing file and adds the columns
        that were introduced after the `user_version` stored in the file.
        """
        create_table = not os.path.exists(HistoryDB.db_path)
        self.con = sqlite3.connect(HistoryDB.db_path)
        self.con.row_factory = sqlite3.Row
        self.c = self.con.cursor()
        if create_table:
            self.create_history_db()
        elif not HistoryDB.done_cleaning:
            # Run VACUUM on sqlite
            # When an object (table, index, or trigger) is dropped from the database, it leaves behind empty space
            # http://www.sqlite.org/lang_vacuum.html
            HistoryDB.done_cleaning = True
            self.execute("VACUUM")

        self.execute("PRAGMA user_version;")
        row = self.c.fetchone()
        if row is None:
            # No result available: either the query could not be run, or execute()
            # replaced a damaged database through a nested connect() call, which
            # already consumed the result and handled the schema version.
            return
        try:
            version = row["user_version"]
        except IndexError:
            version = 0
        if version < 1:
            # Add any missing columns added since first DB version
            # Use "and" to stop when database has been reset due to corruption
            _ = (
                self.execute("PRAGMA user_version = 1;")
                and self.execute('ALTER TABLE "history" ADD COLUMN series TEXT;')
                and self.execute('ALTER TABLE "history" ADD COLUMN md5sum TEXT;')
            )
        if version < 2:
            # Add any missing columns added since second DB version
            # Use "and" to stop when database has been reset due to corruption
            _ = self.execute("PRAGMA user_version = 2;") and self.execute(
                'ALTER TABLE "history" ADD COLUMN password TEXT;'
            )

    def execute(self, command, args=(), save=False):
        """Wrapper for executing SQL commands

        `command` is the SQL statement, `args` a tuple with the values for its
        `?` placeholders (anything that is not a non-empty tuple is ignored)
        and `save` requests a commit after the statement was executed.

        Returns True when the statement was executed. True is also returned when
        the database is read-only and when a damaged database was replaced by an
        empty one, although the statement was not executed in those cases.
        Returns False when a "duplicate column name" error triggered the replacement,
        for any other failure and when the database stays locked during all retries.
        """
        for _ in range(5):
            try:
                if args and isinstance(args, tuple):
                    self.c.execute(command, args)
                else:
                    self.c.execute(command)
                if save:
                    self.con.commit()
                return True
            except Exception as err:
                error = str(err)

                if "is locked" in error:
                    logging.debug("Database locked, wait and retry")
                    time.sleep(0.5)
                    continue

                if "readonly" in error:
                    logging.error(T("Cannot write to History database, check access rights!"))
                    # Report back success, because there's no recovery possible
                    return True

                if "not a database" in error or "malformed" in error or "duplicate column name" in error:
                    logging.error(T("Damaged History database, created empty replacement"))
                    logging.info("Traceback: ", exc_info=True)
                    self.close()
                    try:
                        remove_file(HistoryDB.db_path)
                    except Exception:
                        # Best effort, connect() will continue with whatever is left on disk
                        logging.debug("Could not remove damaged History database", exc_info=True)
                    self.connect()
                    # Return False in case of "duplicate column" error
                    # because the column addition in connect() must be terminated
                    return "duplicate column name" not in error

                logging.error(T("SQL Command Failed, see log"))
                logging.info("SQL: %s", command)
                logging.info("Arguments: %s", repr(args))
                logging.info("Traceback: ", exc_info=True)
                try:
                    self.con.rollback()
                except Exception:
                    logging.debug("Rollback Failed:", exc_info=True)
                return False

        # Database was still locked after the last retry
        return False

    def create_history_db(self):
        """Create a new (empty) database file"""
        self.execute(
            """
        CREATE TABLE "history" (
            "id" INTEGER PRIMARY KEY,
            "completed" INTEGER NOT NULL,
            "name" TEXT NOT NULL,
            "nzb_name" TEXT NOT NULL,
            "category" TEXT,
            "pp" TEXT,
            "script" TEXT,
            "report" TEXT,
            "url" TEXT,
            "status" TEXT,
            "nzo_id" TEXT,
            "storage" TEXT,
            "path" TEXT,
            "script_log" BLOB,
            "script_line" TEXT,
            "download_time" INTEGER,
            "postproc_time" INTEGER,
            "stage_log" TEXT,
            "downloaded" INTEGER,
            "completeness" INTEGER,
            "fail_message" TEXT,
            "url_info" TEXT,
            "bytes" INTEGER,
            "meta" TEXT,
            "series" TEXT,
            "md5sum" TEXT,
            "password" TEXT
        )
        """
        )
        # Matches the latest version that connect() upgrades to
        self.execute("PRAGMA user_version = 2;")

    def close(self):
        """Close database connection, failures are only logged"""
        try:
            self.c.close()
            self.con.close()
        except Exception:
            logging.error(T("Failed to close database, see log"))
            logging.info("Traceback: ", exc_info=True)

    def remove_completed(self, search=None):
        """Remove all completed jobs from the database, optional with `search` pattern"""
        search = convert_search(search)
        logging.info("Removing all completed jobs from history")
        return self.execute(
            """DELETE FROM history WHERE name LIKE ? AND status = ?""", (search, Status.COMPLETED), save=True
        )

    def get_failed_paths(self, search=None):
        """Return list of all storage paths of failed jobs (may contain non-existing or empty paths)"""
        search = convert_search(search)
        fetch_ok = self.execute(
            """SELECT path FROM history WHERE name LIKE ? AND status = ?""", (search, Status.FAILED)
        )
        if fetch_ok:
            return [item["path"] for item in self.c.fetchall()]
        else:
            return []

    def remove_failed(self, search=None):
        """Remove all failed jobs from the database, optional with `search` pattern"""
        search = convert_search(search)
        logging.info("Removing all failed jobs from history")
        return self.execute(
            """DELETE FROM history WHERE name LIKE ? AND status = ?""", (search, Status.FAILED), save=True
        )

    def remove_history(self, jobs=None):
        """Remove all jobs in the list `jobs` (or the single job `jobs`),
        leaving out `jobs` (None) will remove all completed jobs.
        An empty list removes nothing.
        """
        if jobs is None:
            self.remove_completed()
        else:
            if not isinstance(jobs, list):
                jobs = [jobs]

            for job in jobs:
                self.execute("""DELETE FROM history WHERE nzo_id = ?""", (job,), save=True)
                logging.info("[%s] Removing job %s from history", caller_name(), job)

    def auto_history_purge(self):
        """Remove history items based on the configured history-retention

        Handled values of the setting:
        - "0": nothing is removed
        - "-1": all completed jobs are removed
        - "<number>d": completed jobs older than that number of days are removed
        - "<number>": only that number of most recently completed jobs is kept
        """
        retention = sabnzbd.cfg.history_retention()
        if retention == "0":
            return

        if retention == "-1":
            # Delete all completed ones
            self.remove_completed()

        if "d" in retention:
            # How many days to keep?
            days_to_keep = int_conv(retention.strip()[:-1])
            seconds_to_keep = int(time.time()) - days_to_keep * 86400
            if days_to_keep > 0:
                logging.info("Removing completed jobs older than %s days from history", days_to_keep)
                return self.execute(
                    """DELETE FROM history WHERE status = ? AND completed < ?""",
                    (Status.COMPLETED, seconds_to_keep),
                    save=True,
                )
        else:
            # How many to keep?
            to_keep = int_conv(retention)
            if to_keep > 0:
                logging.info("Removing all but last %s completed jobs from history", to_keep)
                return self.execute(
                    """DELETE FROM history WHERE status = ? AND id NOT IN (
                        SELECT id FROM history WHERE status = ? ORDER BY completed DESC LIMIT ?
                    )""",
                    (Status.COMPLETED, Status.COMPLETED, to_keep),
                    save=True,
                )

    def add_history_db(self, nzo, storage="", postproc_time=0, script_output="", script_line=""):
        """Add a new job entry to the database"""
        # The tuple is ordered like the column list of the INSERT statement
        t = build_history_info(nzo, storage, postproc_time, script_output, script_line, series_info=True)

        self.execute(
            """INSERT INTO history (completed, name, nzb_name, category, pp, script, report,
            url, status, nzo_id, storage, path, script_log, script_line, download_time, postproc_time, stage_log,
            downloaded, fail_message, url_info, bytes, series, md5sum, password)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            t,
            save=True,
        )
        logging.info("Added job %s to history", nzo.final_name)

    def fetch_history(
        self,
        start: Optional[int] = None,
        limit: Optional[int] = None,
        search: Optional[str] = None,
        failed_only: int = 0,
        categories: Optional[List[str]] = None,
        nzo_ids: Optional[List[str]] = None,
    ):
        """Return records for specified jobs

        The jobs can be filtered on name (`search`), `categories`, `nzo_ids` and
        on failed jobs only. `start` and `limit` select a page of the result,
        which is ordered on completion time (newest first).

        Returns a tuple of: list of unpacked items, number of items in that list
        and the total number of jobs matching the filter (-1 if counting failed).
        """
        command_args = [convert_search(search)]

        # Build the extra conditions, all values are passed as parameters
        post = ""
        if categories:
            # The category "Default" is looked up as "*"
            categories = ["*" if c == "Default" else c for c in categories]
            post = " AND (CATEGORY = ?"
            post += " OR CATEGORY = ? " * (len(categories) - 1)
            post += ")"
            command_args.extend(categories)
        if nzo_ids:
            post += " AND (NZO_ID = ?"
            post += " OR NZO_ID = ? " * (len(nzo_ids) - 1)
            post += ")"
            command_args.extend(nzo_ids)
        if failed_only:
            post += " AND STATUS = ?"
            command_args.append(Status.FAILED)

        cmd = "SELECT COUNT(*) FROM history WHERE name LIKE ?"
        total_items = -1
        if self.execute(cmd + post, tuple(command_args)):
            total_items = self.c.fetchone()["COUNT(*)"]

        if not start:
            start = 0
        if not limit:
            # No limit requested, use the number of matches
            limit = total_items

        command_args.extend([start, limit])
        cmd = "SELECT * FROM history WHERE name LIKE ?"
        if self.execute(cmd + post + " ORDER BY completed desc LIMIT ?, ?", tuple(command_args)):
            items = self.c.fetchall()
        else:
            items = []

        fetched_items = len(items)

        # Unpack the single line stage log
        # Stage Name is separated by ::: stage lines by ; and stages by \r\n
        items = [unpack_history_info(item) for item in items]

        return items, fetched_items, total_items

    def have_episode(self, series, season, episode):
        """Check whether History contains this series episode (failed jobs are ignored)"""
        if not (series and season and episode):
            # Incomplete information (including None) can never match
            return False

        series = series.lower().replace(".", " ").replace("_", " ").replace("  ", " ")
        pattern = "%s/%s/%s" % (series, season, episode)
        total = 0
        if self.execute(
            """SELECT COUNT(*) FROM History WHERE series = ? AND STATUS != ?""", (pattern, Status.FAILED)
        ):
            total = self.c.fetchone()["COUNT(*)"]
        return total > 0

    def have_name_or_md5sum(self, name, md5sum):
        """Check whether this name or md5sum is already in History (failed jobs are ignored)"""
        total = 0
        if self.execute(
            """SELECT COUNT(*) FROM History WHERE ( LOWER(name) = LOWER(?) OR md5sum = ? ) AND STATUS != ?""",
            (name, md5sum, Status.FAILED),
        ):
            total = self.c.fetchone()["COUNT(*)"]
        return total > 0

    def get_history_size(self):
        """Returns the total size of the history and
        amounts downloaded in the last month and week

        The values are the plain results of the SQL sum() and
        can therefore be None when no jobs were selected.
        """
        # Total Size of the history
        total = 0
        if self.execute("""SELECT sum(bytes) FROM history"""):
            total = self.c.fetchone()["sum(bytes)"]

        # Amount downloaded this month
        month_timest = int(this_month(time.time()))

        month = 0
        if self.execute("""SELECT sum(bytes) FROM history WHERE completed > ?""", (month_timest,)):
            month = self.c.fetchone()["sum(bytes)"]

        # Amount downloaded this week
        week_timest = int(this_week(time.time()))

        week = 0
        if self.execute("""SELECT sum(bytes) FROM history WHERE completed > ?""", (week_timest,)):
            week = self.c.fetchone()["sum(bytes)"]

        return total, month, week

    def get_script_log(self, nzo_id):
        """Return decompressed log file, empty string when it is not available"""
        data = ""
        t = (nzo_id,)
        if self.execute("""SELECT script_log FROM history WHERE nzo_id = ?""", t):
            try:
                data = ubtou(zlib.decompress(self.c.fetchone()["script_log"]))
            except Exception:
                # No record found, no log stored or data that cannot be unpacked
                pass
        return data

    def get_name(self, nzo_id):
        """Return name of the job `nzo_id`, empty string when it is not found"""
        t = (nzo_id,)
        name = ""
        if self.execute("""SELECT name FROM history WHERE nzo_id = ?""", t):
            try:
                name = self.c.fetchone()["name"]
            except TypeError:
                # No records found
                pass
        return name

    def get_path(self, nzo_id: str) -> Optional[str]:
        """Return the `incomplete` path of the job `nzo_id` if it is still there, otherwise None"""
        t = (nzo_id,)
        path = ""
        if self.execute("""SELECT path FROM history WHERE nzo_id = ?""", t):
            try:
                path = self.c.fetchone()["path"]
            except TypeError:
                # No records found
                pass
        # The column may hold NULL, which os.path.exists() does not accept
        if path and os.path.exists(path):
            return path
        return None

    def get_other(self, nzo_id):
        """Return additional data for job `nzo_id`:
        report, url, pp, script and category (empty strings when the job is not found)
        """
        t = (nzo_id,)
        if self.execute("""SELECT * FROM history WHERE nzo_id = ?""", t):
            try:
                item = self.c.fetchone()
                return item["report"], item["url"], item["pp"], item["script"], item["category"]
            except TypeError:
                # No records found
                pass
        return "", "", "", "", ""

    def __enter__(self):
        """For context manager support"""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """For context manager support, always close the connection.
        An exception raised inside the `with` block is not suppressed.
        """
        self.close()
451
452
# Translates the result of opts_to_pp() into the letter that is stored in the "pp" column,
# build_history_info() uses "X" for results that are not listed here
_PP_LOOKUP = {0: "", 1: "R", 2: "U", 3: "D"}
454
455
def build_history_info(nzo, workdir_complete="", postproc_time=0, script_output="", script_line="", series_info=False):
    """Gather all values of a job that are stored in the history

    Returns a tuple, ordered like the column list of the
    INSERT statement in HistoryDB.add_history_db
    """
    completed = int(time.time())
    pp = _PP_LOOKUP.get(opts_to_pp(*nzo.repair_opts), "X")

    # The output of the script is stored compressed
    if script_output:
        script_output = sqlite3.Binary(zlib.compress(utob(script_output)))

    job_info = nzo.nzo_info
    download_time = job_info.get("download_time", 0)
    url_info = job_info.get("details", "") or job_info.get("more_info", "")

    # Flatten the dictionary with the stages and their unpack process into a single string
    # Stage Name is separated by ::: stage lines by ; and stages by \r\n
    stage_log = "\r\n".join("%s:::%s" % (stage, ";".join(actions)) for stage, actions in nzo.unpack_info.items())

    # Reuse the old 'report' column to indicate a URL-fetch
    if nzo.futuretype:
        report = "future"
    else:
        report = ""

    # Analyze series info only when job is finished
    series = ""
    if series_info:
        show_name, season, episode, _ = sabnzbd.newsunpack.analyse_show(nzo.final_name)
        if show_name and season and episode:
            series = "%s/%s/%s" % (show_name.lower(), season, episode)

    history_values = (
        completed,
        nzo.final_name,
        nzo.filename,
        nzo.cat,
        pp,
        nzo.script,
        report,
        nzo.url,
        nzo.status,
        nzo.nzo_id,
        clip_path(workdir_complete),
        clip_path(nzo.download_path),
        script_output,
        script_line,
        download_time,
        postproc_time,
        stage_log,
        # Stored in the "downloaded" column
        nzo.bytes_downloaded,
        nzo.fail_msg,
        url_info,
        # Stored once more, in the "bytes" column
        nzo.bytes_downloaded,
        series,
        nzo.md5sum,
        nzo.password,
    )
    return history_values
512
513
def unpack_history_info(item: Union[Dict, sqlite3.Row]):
    """Expands the single line stage_log from the DB
    into a python dictionary for use in the history display

    `item` is a history record. A sqlite3.Row is converted to a dictionary first,
    a dictionary is modified in place. The returned dictionary has:
    - `stage_log` replaced by a list of {"name": ..., "actions": [...]} dictionaries,
      sorted on the order defined in STAGES (only when the stored value is not empty)
    - a non-empty `script_log` replaced by an empty string
    - `action_line` added as empty string when it was not present
    """
    # Convert result to dictionary
    if isinstance(item, sqlite3.Row):
        item = dict(item)

    # Stage Name is separated by ::: stage lines by ; and stages by \r\n
    raw_stage_log = item["stage_log"]
    if raw_stage_log:
        try:
            all_stages_lines = raw_stage_log.split("\r\n")
        except (AttributeError, TypeError):
            # Stored value is not a string
            logging.error(T("Invalid stage logging in history for %s"), item["name"])
            logging.debug("Lines: %s", raw_stage_log)
            all_stages_lines = []

        parsed_stage_log = []
        for stage_lines in all_stages_lines:
            # Only the first ::: separates the name from the logs,
            # so log text that contains ::: itself does not invalidate the stage
            key, separator, logs = stage_lines.partition(":::")
            if not separator:
                logging.info('Missing key:::logs "%s"', stage_lines)
                continue
            parsed_stage_log.append({"name": key, "actions": logs.split(";")})

        # Sort it so it is more logical, unknown stages go last
        parsed_stage_log.sort(key=lambda stage_log: STAGES.get(stage_log["name"], 100))
        item["stage_log"] = parsed_stage_log

    # The stored script log is not passed on, HistoryDB.get_script_log retrieves it separately
    if item["script_log"]:
        item["script_log"] = ""
    # The action line is only available for items in the postproc queue
    if "action_line" not in item:
        item["action_line"] = ""
    return item
557
558
def midnight_history_purge():
    """Apply the configured history-retention, using a database connection of its own"""
    logging.info("Scheduled history purge")
    with HistoryDB() as purge_db:
        purge_db.auto_history_purge()
563