#!/usr/bin/python3 -OO
# Copyright 2007-2021 The SABnzbd-Team <team@sabnzbd.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

"""
sabnzbd.database - Database Support
"""

import os
import time
import zlib
import logging
import sys
import threading
import sqlite3
from typing import Union, Dict, Optional, List

import sabnzbd
import sabnzbd.cfg
from sabnzbd.constants import DB_HISTORY_NAME, STAGES, Status
from sabnzbd.bpsmeter import this_week, this_month
from sabnzbd.decorators import synchronized
from sabnzbd.encoding import ubtou, utob
from sabnzbd.misc import int_conv, caller_name, opts_to_pp
from sabnzbd.filesystem import remove_file, clip_path

DB_LOCK = threading.RLock()


def convert_search(search):
    """Convert classic wildcard to SQL wildcard

    `*` and spaces become `%`. A leading `^` anchors the pattern to the start
    of the string, otherwise a trailing `$` anchors it to the end. Without an
    anchor the pattern matches anywhere. An empty or missing `search` gives `%%`.
    """
    if not search:
        # Default value
        search = ""
    else:
        # Allow * for wildcard matching and space
        search = search.replace("*", "%").replace(" ", "%")

    # Allow ^ for start of string and $ for end of string
    if search.startswith("^"):
        search = search.replace("^", "") + "%"
    elif search.endswith("$"):
        search = "%" + search.replace("$", "")
    else:
        search = "%" + search + "%"
    return search


class HistoryDB:
    """Class to access the History database
    Each class-instance will create an access channel that
    can be used in one thread.
    Each thread needs its own class-instance!
    """

    # These class attributes will be accessed directly because
    # they need to be shared by all instances
    db_path = None  # Will contain full path to history database
    done_cleaning = False  # Ensure we only do one Vacuum per session

    @synchronized(DB_LOCK)
    def __init__(self):
        """Determine database path and create connection"""
        self.con = self.c = None
        if not HistoryDB.db_path:
            HistoryDB.db_path = os.path.join(sabnzbd.cfg.admin_dir.get_path(), DB_HISTORY_NAME)
        self.connect()

    def connect(self):
        """Create a connection to the database

        Creates the table when the database file does not exist yet, runs one
        VACUUM per session on an existing file and adds the columns that are
        missing for older `user_version` values.
        """
        create_table = not os.path.exists(HistoryDB.db_path)
        self.con = sqlite3.connect(HistoryDB.db_path)
        self.con.row_factory = sqlite3.Row
        self.c = self.con.cursor()
        if create_table:
            self.create_history_db()
        elif not HistoryDB.done_cleaning:
            # Run VACUUM on sqlite
            # When an object (table, index, or trigger) is dropped from the database, it leaves behind empty space
            # http://www.sqlite.org/lang_vacuum.html
            HistoryDB.done_cleaning = True
            self.execute("VACUUM")

        self.execute("PRAGMA user_version;")
        row = self.c.fetchone()
        if row is None:
            # No version row is available: either execute() replaced a damaged
            # database (the nested connect() already consumed the row) or the
            # query never ran. Do not guess a version, because running the
            # migrations on an up-to-date table triggers the "duplicate column
            # name" recovery in execute(), which deletes the database.
            return
        try:
            version = row["user_version"]
        except IndexError:
            version = 0
        if version < 1:
            # Add any missing columns added since first DB version
            # Use "and" to stop when database has been reset due to corruption
            _ = (
                self.execute("PRAGMA user_version = 1;")
                and self.execute('ALTER TABLE "history" ADD COLUMN series TEXT;')
                and self.execute('ALTER TABLE "history" ADD COLUMN md5sum TEXT;')
            )
        if version < 2:
            # Add any missing columns added since second DB version
            # Use "and" to stop when database has been reset due to corruption
            _ = self.execute("PRAGMA user_version = 2;") and self.execute(
                'ALTER TABLE "history" ADD COLUMN password TEXT;'
            )

    def execute(self, command, args=(), save=False):
        """Wrapper for executing SQL commands

        `args` is only passed on when it is a non-empty tuple.
        `save` commits the transaction after a successful execution.
        Returns True on success, when the database is read-only and when a
        damaged database was replaced by an empty one. Returns False for a
        "duplicate column name" error, for any other SQL failure and when the
        database is still locked after all retries.
        """
        for _ in range(5):
            try:
                if args and isinstance(args, tuple):
                    self.c.execute(command, args)
                else:
                    self.c.execute(command)
                if save:
                    self.con.commit()
                return True
            except Exception as err:
                error = str(err)
                if "is locked" in error:
                    logging.debug("Database locked, wait and retry")
                    time.sleep(0.5)
                    continue
                elif "readonly" in error:
                    logging.error(T("Cannot write to History database, check access rights!"))
                    # Report back success, because there's no recovery possible
                    return True
                elif "not a database" in error or "malformed" in error or "duplicate column name" in error:
                    logging.error(T("Damaged History database, created empty replacement"))
                    logging.info("Traceback: ", exc_info=True)
                    self.close()
                    try:
                        remove_file(HistoryDB.db_path)
                    except Exception:
                        # Best effort, connect() will run on whatever is left
                        logging.debug("Could not remove %s", HistoryDB.db_path, exc_info=True)
                    self.connect()
                    # Return False in case of "duplicate column" error
                    # because the column addition in connect() must be terminated
                    return "duplicate column name" not in error
                else:
                    logging.error(T("SQL Command Failed, see log"))
                    logging.info("SQL: %s", command)
                    logging.info("Arguments: %s", repr(args))
                    logging.info("Traceback: ", exc_info=True)
                    try:
                        self.con.rollback()
                    except Exception:
                        logging.debug("Rollback Failed:", exc_info=True)
                    return False

        # Every attempt found the database locked
        logging.info("Database remained locked, giving up on SQL: %s", command)
        return False

    def create_history_db(self):
        """Create a new (empty) database file"""
        self.execute(
            """
        CREATE TABLE "history" (
            "id" INTEGER PRIMARY KEY,
            "completed" INTEGER NOT NULL,
            "name" TEXT NOT NULL,
            "nzb_name" TEXT NOT NULL,
            "category" TEXT,
            "pp" TEXT,
            "script" TEXT,
            "report" TEXT,
            "url" TEXT,
            "status" TEXT,
            "nzo_id" TEXT,
            "storage" TEXT,
            "path" TEXT,
            "script_log" BLOB,
            "script_line" TEXT,
            "download_time" INTEGER,
            "postproc_time" INTEGER,
            "stage_log" TEXT,
            "downloaded" INTEGER,
            "completeness" INTEGER,
            "fail_message" TEXT,
            "url_info" TEXT,
            "bytes" INTEGER,
            "meta" TEXT,
            "series" TEXT,
            "md5sum" TEXT,
            "password" TEXT
        )
        """
        )
        self.execute("PRAGMA user_version = 2;")

    def close(self):
        """Close database connection, failures are logged and not raised"""
        try:
            self.c.close()
            self.con.close()
        except Exception:
            logging.error(T("Failed to close database, see log"))
            logging.info("Traceback: ", exc_info=True)

    def remove_completed(self, search=None):
        """Remove all completed jobs from the database, optional with `search` pattern"""
        search = convert_search(search)
        logging.info("Removing all completed jobs from history")
        return self.execute(
            """DELETE FROM history WHERE name LIKE ? AND status = ?""", (search, Status.COMPLETED), save=True
        )

    def get_failed_paths(self, search=None):
        """Return list of all storage paths of failed jobs (may contain non-existing or empty paths)"""
        search = convert_search(search)
        fetch_ok = self.execute(
            """SELECT path FROM history WHERE name LIKE ? AND status = ?""", (search, Status.FAILED)
        )
        if fetch_ok:
            return [item["path"] for item in self.c.fetchall()]
        else:
            return []

    def remove_failed(self, search=None):
        """Remove all failed jobs from the database, optional with `search` pattern"""
        search = convert_search(search)
        logging.info("Removing all failed jobs from history")
        return self.execute(
            """DELETE FROM history WHERE name LIKE ? AND status = ?""", (search, Status.FAILED), save=True
        )

    def remove_history(self, jobs=None):
        """Remove all jobs in the list `jobs` (a single nzo_id is also accepted)
        When `jobs` is None all completed jobs are removed, an empty list removes nothing
        """
        if jobs is None:
            self.remove_completed()
        else:
            if not isinstance(jobs, list):
                jobs = [jobs]

            for job in jobs:
                self.execute("""DELETE FROM history WHERE nzo_id = ?""", (job,), save=True)
                logging.info("[%s] Removing job %s from history", caller_name(), job)

    def auto_history_purge(self):
        """Remove history items based on the configured history-retention

        "0" keeps everything, "-1" removes all completed jobs, "<n>d" removes
        completed jobs older than n days and "<n>" keeps only the last n
        completed jobs.
        """
        # Read the setting once, so every check below sees the same value
        retention = sabnzbd.cfg.history_retention()

        if retention == "0":
            return

        if retention == "-1":
            # Delete all completed ones
            return self.remove_completed()

        if "d" in retention:
            # How many days to keep?
            days_to_keep = int_conv(retention.strip()[:-1])
            if days_to_keep > 0:
                # Jobs completed before this timestamp are removed
                cutoff = int(time.time()) - days_to_keep * 86400
                logging.info("Removing completed jobs older than %s days from history", days_to_keep)
                return self.execute(
                    """DELETE FROM history WHERE status = ? AND completed < ?""",
                    (Status.COMPLETED, cutoff),
                    save=True,
                )
        else:
            # How many to keep?
            to_keep = int_conv(retention)
            if to_keep > 0:
                logging.info("Removing all but last %s completed jobs from history", to_keep)
                return self.execute(
                    """DELETE FROM history WHERE status = ? AND id NOT IN (
                        SELECT id FROM history WHERE status = ? ORDER BY completed DESC LIMIT ?
                    )""",
                    (Status.COMPLETED, Status.COMPLETED, to_keep),
                    save=True,
                )

    def add_history_db(self, nzo, storage="", postproc_time=0, script_output="", script_line=""):
        """Add a new job entry to the database"""
        t = build_history_info(nzo, storage, postproc_time, script_output, script_line, series_info=True)

        self.execute(
            """INSERT INTO history (completed, name, nzb_name, category, pp, script, report,
            url, status, nzo_id, storage, path, script_log, script_line, download_time, postproc_time, stage_log,
            downloaded, fail_message, url_info, bytes, series, md5sum, password)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            t,
            save=True,
        )
        logging.info("Added job %s to history", nzo.final_name)

    def fetch_history(
        self,
        start: Optional[int] = None,
        limit: Optional[int] = None,
        search: Optional[str] = None,
        failed_only: int = 0,
        categories: Optional[List[str]] = None,
        nzo_ids: Optional[List[str]] = None,
    ):
        """Return records for specified jobs

        Returns a tuple (items, fetched_items, total_items): the unpacked
        records, how many were fetched and how many match in total.
        `total_items` is -1 when the count query failed.
        """
        command_args = [convert_search(search)]

        post = ""
        if categories:
            categories = ["*" if c == "Default" else c for c in categories]
            post = " AND (CATEGORY = ?"
            post += " OR CATEGORY = ? " * (len(categories) - 1)
            post += ")"
            command_args.extend(categories)
        if nzo_ids:
            post += " AND (NZO_ID = ?"
            post += " OR NZO_ID = ? " * (len(nzo_ids) - 1)
            post += ")"
            command_args.extend(nzo_ids)
        if failed_only:
            post += " AND STATUS = ?"
            command_args.append(Status.FAILED)

        cmd = "SELECT COUNT(*) FROM history WHERE name LIKE ?"
        total_items = -1
        if self.execute(cmd + post, tuple(command_args)):
            total_items = self.c.fetchone()["COUNT(*)"]

        if not start:
            start = 0
        if not limit:
            limit = total_items

        command_args.extend([start, limit])
        cmd = "SELECT * FROM history WHERE name LIKE ?"
        if self.execute(cmd + post + " ORDER BY completed desc LIMIT ?, ?", tuple(command_args)):
            items = self.c.fetchall()
        else:
            items = []

        fetched_items = len(items)

        # Unpack the single line stage log
        # Stage Name is separated by ::: stage lines by ; and stages by \r\n
        items = [unpack_history_info(item) for item in items]

        return items, fetched_items, total_items

    def have_episode(self, series, season, episode):
        """Check whether History contains this series episode (failed jobs are ignored)"""
        if not (series and season and episode):
            return False

        # Normalize separators and collapse the double spaces this can leave behind
        series = series.lower().replace(".", " ").replace("_", " ").replace("  ", " ")
        total = 0
        pattern = "%s/%s/%s" % (series, season, episode)
        if self.execute(
            """SELECT COUNT(*) FROM History WHERE series = ? AND STATUS != ?""", (pattern, Status.FAILED)
        ):
            total = self.c.fetchone()["COUNT(*)"]
        return total > 0

    def have_name_or_md5sum(self, name, md5sum):
        """Check whether this name or md5sum is already in History (failed jobs are ignored)"""
        total = 0
        if self.execute(
            """SELECT COUNT(*) FROM History WHERE ( LOWER(name) = LOWER(?) OR md5sum = ? ) AND STATUS != ?""",
            (name, md5sum, Status.FAILED),
        ):
            total = self.c.fetchone()["COUNT(*)"]
        return total > 0

    def get_history_size(self):
        """Returns the total size of the history and
        amounts downloaded in the last month and week
        """
        # sum() gives NULL when no row matches, report that as 0

        # Total Size of the history
        total = 0
        if self.execute("""SELECT sum(bytes) FROM history"""):
            total = self.c.fetchone()["sum(bytes)"] or 0

        # Amount downloaded this month
        month_timest = int(this_month(time.time()))

        month = 0
        if self.execute("""SELECT sum(bytes) FROM history WHERE completed > ?""", (month_timest,)):
            month = self.c.fetchone()["sum(bytes)"] or 0

        # Amount downloaded this week
        week_timest = int(this_week(time.time()))

        week = 0
        if self.execute("""SELECT sum(bytes) FROM history WHERE completed > ?""", (week_timest,)):
            week = self.c.fetchone()["sum(bytes)"] or 0

        return total, month, week

    def get_script_log(self, nzo_id):
        """Return decompressed log file, empty string when there is none or it cannot be read"""
        data = ""
        t = (nzo_id,)
        if self.execute("""SELECT script_log FROM history WHERE nzo_id = ?""", t):
            row = self.c.fetchone()
            if row is not None and row["script_log"]:
                try:
                    data = ubtou(zlib.decompress(row["script_log"]))
                except Exception:
                    # Best effort: an unreadable log must not break the caller
                    logging.debug("Cannot decompress script log of %s", nzo_id, exc_info=True)
        return data

    def get_name(self, nzo_id):
        """Return name of the job `nzo_id`, empty string when it is not found"""
        t = (nzo_id,)
        name = ""
        if self.execute("""SELECT name FROM history WHERE nzo_id = ?""", t):
            try:
                name = self.c.fetchone()["name"]
            except TypeError:
                # No records found
                pass
        return name

    def get_path(self, nzo_id: str):
        """Return the `incomplete` path of the job `nzo_id` if it is still there, otherwise None"""
        t = (nzo_id,)
        path = ""
        if self.execute("""SELECT path FROM history WHERE nzo_id = ?""", t):
            try:
                path = self.c.fetchone()["path"]
            except TypeError:
                # No records found
                pass
        # The column may hold NULL, which os.path.exists() does not accept
        if path and os.path.exists(path):
            return path
        return None

    def get_other(self, nzo_id):
        """Return additional data for job `nzo_id`: report, url, pp, script and category
        All five are empty strings when the job is not found
        """
        t = (nzo_id,)
        if self.execute("""SELECT * FROM history WHERE nzo_id = ?""", t):
            try:
                item = self.c.fetchone()
                return item["report"], item["url"], item["pp"], item["script"], item["category"]
            except TypeError:
                # No records found
                pass
        return "", "", "", "", ""

    def __enter__(self):
        """For context manager support"""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """For context manager support, closes the connection
        Exceptions raised inside the `with` block are not suppressed
        """
        self.close()


_PP_LOOKUP = {0: "", 1: "R", 2: "U", 3: "D"}


def build_history_info(nzo, workdir_complete="", postproc_time=0, script_output="", script_line="", series_info=False):
    """Collects all the information needed for the database

    Returns a tuple in the column order used by the INSERT statement in
    HistoryDB.add_history_db.
    """
    completed = int(time.time())
    pp = _PP_LOOKUP.get(opts_to_pp(*nzo.repair_opts), "X")

    if script_output:
        # Compress the output of the script
        script_output = sqlite3.Binary(zlib.compress(utob(script_output)))

    download_time = nzo.nzo_info.get("download_time", 0)
    url_info = nzo.nzo_info.get("details", "") or nzo.nzo_info.get("more_info", "")

    # Get the dictionary containing the stages and their unpack process
    # Pack the dictionary up into a single string
    # Stage Name is separated by ::: stage lines by ; and stages by \r\n
    lines = ["%s:::%s" % (key, ";".join(results)) for key, results in nzo.unpack_info.items()]
    stage_log = "\r\n".join(lines)

    # Reuse the old 'report' column to indicate a URL-fetch
    report = "future" if nzo.futuretype else ""

    # Analyze series info only when job is finished
    series = ""
    if series_info:
        seriesname, season, episode, _ = sabnzbd.newsunpack.analyse_show(nzo.final_name)
        if seriesname and season and episode:
            series = "%s/%s/%s" % (seriesname.lower(), season, episode)

    return (
        completed,
        nzo.final_name,
        nzo.filename,
        nzo.cat,
        pp,
        nzo.script,
        report,
        nzo.url,
        nzo.status,
        nzo.nzo_id,
        clip_path(workdir_complete),
        clip_path(nzo.download_path),
        script_output,
        script_line,
        download_time,
        postproc_time,
        stage_log,
        nzo.bytes_downloaded,
        nzo.fail_msg,
        url_info,
        nzo.bytes_downloaded,
        series,
        nzo.md5sum,
        nzo.password,
    )


def unpack_history_info(item: Union[Dict, sqlite3.Row]):
    """Expands the single line stage_log from the DB
    into a python dictionary for use in the history display

    A non-empty `script_log` is replaced by an empty string and a missing
    `action_line` is added as an empty string.
    """
    # Convert result to dictionary
    if isinstance(item, sqlite3.Row):
        item = dict(item)

    # Stage Name is separated by ::: stage lines by ; and stages by \r\n
    lst = item["stage_log"]
    if lst:
        parsed_stage_log = []
        try:
            all_stages_lines = lst.split("\r\n")
        except (AttributeError, TypeError):
            logging.error(T("Invalid stage logging in history for %s"), item["name"])
            logging.debug("Lines: %s", lst)
            all_stages_lines = []

        for stage_lines in all_stages_lines:
            try:
                # Split on the first separator only, so a ::: inside an
                # action line does not discard the whole stage
                key, logs = stage_lines.split(":::", 1)
            except ValueError:
                logging.info('Missing key:::logs "%s"', stage_lines)
                continue
            parsed_stage_log.append({"name": key, "actions": logs.split(";")})

        # Sort it so it is more logical
        parsed_stage_log.sort(key=lambda stage_log: STAGES.get(stage_log["name"], 100))
        item["stage_log"] = parsed_stage_log

    if item["script_log"]:
        item["script_log"] = ""
    # The action line is only available for items in the postproc queue
    if "action_line" not in item:
        item["action_line"] = ""
    return item


def midnight_history_purge():
    """Run the configured history-retention purge on a connection of its own"""
    logging.info("Scheduled history purge")
    with HistoryDB() as history_db:
        history_db.auto_history_purge()