#!/usr/bin/python3 -OO
# Copyright 2007-2021 The SABnzbd-Team <team@sabnzbd.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

"""
sabnzbd.assembler - threaded assembly/decoding of files
"""

import os
import queue
import logging
import re
from threading import Thread
from time import sleep
import hashlib
from typing import Tuple, Optional, List

import sabnzbd
from sabnzbd.misc import get_all_passwords, match_str
from sabnzbd.filesystem import (
    set_permissions,
    clip_path,
    has_win_device,
    diskspace,
    get_filename,
    has_unwanted_extension,
)
from sabnzbd.constants import Status, GIGI, MAX_ASSEMBLER_QUEUE
import sabnzbd.cfg as cfg
from sabnzbd.nzbstuff import NzbObject, NzbFile
import sabnzbd.downloader
import sabnzbd.par2file as par2file
import sabnzbd.utils.rarfile as rarfile


class Assembler(Thread):
    """Thread that writes decoded articles to disk and inspects finished files.

    Work arrives through `self.queue` as `(nzo, nzf, file_done)` tuples:
    - with an `nzf`: (part of) that file is written to disk and, once the file
      is done, RAR/par2 inspection and the rating filter are applied;
    - without an `nzf`: the job is removed from the NzbQueue and handed to the
      PostProcessor;
    - with `nzo` set to None: the thread shuts down.
    """

    def __init__(self):
        super().__init__()
        self.queue: queue.Queue[Tuple[Optional[NzbObject], Optional[NzbFile], Optional[bool]]] = queue.Queue()

    def stop(self):
        """Queue the all-None sentinel that makes `run()` leave its loop"""
        self.queue.put((None, None, None))

    def process(self, nzo: NzbObject, nzf: Optional[NzbFile] = None, file_done: Optional[bool] = None):
        """Queue a job, and optionally one of its files, for the assembler thread"""
        self.queue.put((nzo, nzf, file_done))

    def queue_full(self):
        """Return True when the number of queued items reached MAX_ASSEMBLER_QUEUE"""
        return self.queue.qsize() >= MAX_ASSEMBLER_QUEUE

    def run(self):
        """Consume the queue until the stop-sentinel arrives or a fatal error occurs"""
        while True:
            # Set NzbObject and NzbFile objects to None so references
            # from this thread do not keep the objects alive (see #1628)
            nzo = nzf = None
            nzo, nzf, file_done = self.queue.get()
            if not nzo:
                logging.info("Shutting down")
                break

            if nzf:
                # Check if enough disk space is free after each file is done
                # If not enough space left, pause downloader and send email
                if file_done and not sabnzbd.Downloader.paused:
                    freespace = diskspace(force=True)
                    full_dir = None
                    required_space = (cfg.download_free.get_float() + nzf.bytes) / GIGI
                    if freespace["download_dir"][1] < required_space:
                        full_dir = "download_dir"

                    # Enough space in download_dir, check complete_dir
                    complete_free = cfg.complete_free.get_float()
                    if complete_free > 0 and not full_dir:
                        required_space = 0
                        if cfg.direct_unpack():
                            required_space = (complete_free + nzo.bytes_downloaded) / GIGI
                        else:
                            # Continue downloading until 95% complete before checking
                            if nzo.bytes_tried > (nzo.bytes - nzo.bytes_par2) * 0.95:
                                required_space = (complete_free + nzo.bytes) / GIGI

                        if required_space and freespace["complete_dir"][1] < required_space:
                            full_dir = "complete_dir"

                    if full_dir:
                        logging.warning(T("Too little diskspace forcing PAUSE"))
                        # Pause downloader, but don't save, since the disk is almost full!
                        sabnzbd.Downloader.pause()
                        if cfg.fulldisk_autoresume():
                            sabnzbd.Scheduler.plan_diskspace_resume(full_dir, required_space)
                        sabnzbd.emailer.diskfull_mail()

                # Prepare filepath
                filepath = nzf.prepare_filepath()

                if filepath:
                    logging.debug("Decoding part of %s", filepath)
                    try:
                        self.assemble(nzf, file_done)
                    except IOError as err:
                        # If job was deleted or in active post-processing, ignore error
                        if not nzo.deleted and not nzo.is_gone() and not nzo.pp_active:
                            # 28 == disk full => pause downloader
                            if err.errno == 28:
                                logging.error(T("Disk full! Forcing Pause"))
                            else:
                                logging.error(T("Disk error on creating file %s"), clip_path(filepath))
                            # Log traceback
                            logging.info("Traceback: ", exc_info=True)
                            # Pause without saving
                            sabnzbd.Downloader.pause()
                        continue
                    except Exception:
                        # Any other failure is unexpected: log it and end the thread.
                        # SystemExit/KeyboardInterrupt are deliberately not swallowed here.
                        logging.error(T("Fatal error in Assembler"), exc_info=True)
                        break

                    # Continue after partly written data
                    if not file_done:
                        continue

                    # Clean-up admin data
                    logging.info("Decoding finished %s", filepath)
                    nzf.remove_admin()

                    # Do rar-related processing
                    if rarfile.is_rarfile(filepath):
                        # Encryption and unwanted extension detection
                        rar_encrypted, unwanted_file = check_encrypted_and_unwanted_files(nzo, filepath)
                        if rar_encrypted:
                            if cfg.pause_on_pwrar() == 1:
                                logging.warning(
                                    T(
                                        'Paused job "%s" because of encrypted RAR file (if supplied, all passwords were tried)'
                                    ),
                                    nzo.final_name,
                                )
                                nzo.pause()
                            else:
                                logging.warning(
                                    T(
                                        'Aborted job "%s" because of encrypted RAR file (if supplied, all passwords were tried)'
                                    ),
                                    nzo.final_name,
                                )
                                nzo.fail_msg = T("Aborted, encryption detected")
                                sabnzbd.NzbQueue.end_job(nzo)

                        if unwanted_file:
                            # Don't repeat the warning after a user override of an unwanted extension pause
                            if nzo.unwanted_ext == 0:
                                logging.warning(
                                    T('In "%s" unwanted extension in RAR file. Unwanted file is %s '),
                                    nzo.final_name,
                                    unwanted_file,
                                )
                            logging.debug(T("Unwanted extension is in rar file %s"), filepath)
                            if cfg.action_on_unwanted_extensions() == 1 and nzo.unwanted_ext == 0:
                                logging.debug("Unwanted extension ... pausing")
                                nzo.unwanted_ext = 1
                                nzo.pause()
                            if cfg.action_on_unwanted_extensions() == 2:
                                logging.debug("Unwanted extension ... aborting")
                                nzo.fail_msg = T("Aborted, unwanted extension detected")
                                sabnzbd.NzbQueue.end_job(nzo)

                        # Add to direct unpack
                        nzo.add_to_direct_unpacker(nzf)

                    elif par2file.is_parfile(filepath):
                        # Parse par2 files, cloaked or not
                        nzo.handle_par2(nzf, filepath)

                    filter_output, reason = nzo_filtered_by_rating(nzo)
                    if filter_output == 1:
                        logging.warning(
                            T('Paused job "%s" because of rating (%s)'),
                            nzo.final_name,
                            reason,
                        )
                        nzo.pause()
                    elif filter_output == 2:
                        logging.warning(
                            T('Aborted job "%s" because of rating (%s)'),
                            nzo.final_name,
                            reason,
                        )
                        nzo.fail_msg = T("Aborted, rating filter matched (%s)") % reason
                        sabnzbd.NzbQueue.end_job(nzo)

            else:
                # No file supplied: take the job out of the queue and start post-processing
                sabnzbd.NzbQueue.remove(nzo.nzo_id, cleanup=False)
                sabnzbd.PostProcessor.process(nzo)

    @staticmethod
    def assemble(nzf: NzbFile, file_done: bool):
        """Assemble a NZF from its table of articles
        1) Partial write: write what we have
        2) Nothing written before: write all

        Data is appended to `nzf.filepath` and fed into the running md5 of the file.
        When `file_done` is set, undecoded articles are skipped (missing pieces) and
        afterwards the permissions are set and the final digest stored in `nzf.md5sum`.
        Errors from opening/writing the file are not caught here.
        """
        # New hash-object needed?
        if not nzf.md5:
            nzf.md5 = hashlib.md5()

        with open(nzf.filepath, "ab") as fout:
            for article in nzf.decodetable:
                # Break if deleted during writing
                if nzf.nzo.status is Status.DELETED:
                    break

                # Skip already written articles
                if article.on_disk:
                    continue

                # Write all decoded articles
                if article.decoded:
                    data = sabnzbd.ArticleCache.load_article(article)
                    # Could be empty in case nzo was deleted
                    if data:
                        fout.write(data)
                        nzf.md5.update(data)
                        article.on_disk = True
                    else:
                        logging.info("No data found when trying to write %s", article)
                else:
                    # If the article was not decoded but the file
                    # is done, it is just a missing piece, so keep writing
                    if file_done:
                        continue
                    else:
                        # We reach an article that was not decoded
                        break

        # Final steps
        if file_done:
            set_permissions(nzf.filepath)
            nzf.md5sum = nzf.md5.digest()


def file_has_articles(nzf: NzbFile):
    """Do a quick check to see if any articles are present for this file.
    Destructive: only to be used to differentiate between unknown encoding and no articles.
    """
    has = False
    # Every article is loaded, also after the first hit (see "Destructive" above)
    for article in nzf.decodetable:
        sleep(0.01)
        data = sabnzbd.ArticleCache.load_article(article)
        if data:
            has = True
    return has


# NOTE(review): "|" binds weaker than concatenation, so "\W+" only applies to "sub" and the
# "(?![a-z])" look-ahead only to "subtitles"; "subs", "subpack" and "subtitle" match anywhere.
# Presumably "\W+(?:sub|subs|subpack|subtitle|subtitles)(?![a-z])" was intended - confirm before
# changing, since it alters which jobs are flagged as cloaked.
RE_SUBS = re.compile(r"\W+sub|subs|subpack|subtitle|subtitles(?![a-z])", re.I)
SAFE_EXTS = (".mkv", ".mp4", ".avi", ".wmv", ".mpg", ".webm")


def is_cloaked(nzo: NzbObject, path: str, names: List[str]) -> bool:
    """Return True if this is likely to be a cloaked encrypted post

    `path` is the RAR file on disk, `names` the file names listed inside it.
    On a hit `nzo.encrypted` is set to 1; the warning is only logged when it was still 0.
    """
    fname = os.path.splitext(get_filename(path.lower()))[0]
    for name in names:
        name = get_filename(name.lower())
        name, ext = os.path.splitext(name)
        if (
            ext == ".rar"
            and fname.startswith(name)
            and (len(fname) - len(name)) < 8
            and len(names) < 3
            and not RE_SUBS.search(fname)
        ):
            # Only warn once
            if nzo.encrypted == 0:
                logging.warning(
                    T('Job "%s" is probably encrypted due to RAR with same name inside this RAR'), nzo.final_name
                )
                nzo.encrypted = 1
            return True
        elif "password" in name and ext not in SAFE_EXTS:
            # Only warn once
            if nzo.encrypted == 0:
                logging.warning(T('Job "%s" is probably encrypted: "password" in filename "%s"'), nzo.final_name, name)
                nzo.encrypted = 1
            return True
    return False


def check_encrypted_and_unwanted_files(nzo: NzbObject, filepath: str) -> Tuple[bool, Optional[str]]:
    """Combines check for unwanted and encrypted files to save on CPU and IO

    Returns `(encrypted, unwanted)`: whether the RAR is considered encrypted without a
    working password, and the name of the last file inside it that has an unwanted
    extension (None if there is none). Errors during inspection are logged, never raised.
    """
    encrypted = False
    unwanted = None

    if (cfg.unwanted_extensions() and cfg.action_on_unwanted_extensions()) or (
        nzo.encrypted == 0 and cfg.pause_on_pwrar()
    ):
        # These checks should not break the assembler
        try:
            # Rarfile freezes on Windows special names, so don't try those!
            if sabnzbd.WIN32 and has_win_device(filepath):
                return encrypted, unwanted

            # Is it even a rarfile?
            if rarfile.is_rarfile(filepath):
                # Open the rar
                rarfile.UNRAR_TOOL = sabnzbd.newsunpack.RAR_COMMAND
                zf = rarfile.RarFile(filepath, single_file_check=True)

                # The finally-clause makes sure the archive is also closed on
                # the early return below and when one of the checks raises
                try:
                    # Check for encryption
                    if (
                        nzo.encrypted == 0
                        and cfg.pause_on_pwrar()
                        and (zf.needs_password() or is_cloaked(nzo, filepath, zf.namelist()))
                    ):
                        # Load all passwords
                        passwords = get_all_passwords(nzo)

                        # Cloaked job?
                        if is_cloaked(nzo, filepath, zf.namelist()):
                            encrypted = True
                        elif not passwords:
                            # Only error when no password was set
                            nzo.encrypted = 1
                            encrypted = True
                        else:
                            # Lets test if any of the password work
                            password_hit = False

                            for password in passwords:
                                if password:
                                    logging.info('Trying password "%s" on job "%s"', password, nzo.final_name)
                                    try:
                                        zf.setpassword(password)
                                    except rarfile.Error:
                                        # On weird passwords the setpassword() will fail
                                        # but the actual testrar() will work
                                        pass
                                    try:
                                        zf.testrar()
                                        password_hit = password
                                        break
                                    except rarfile.RarWrongPassword:
                                        # This one really didn't work
                                        pass
                                    except rarfile.RarCRCError as e:
                                        # CRC errors can be thrown for wrong password or
                                        # missing the next volume (with correct password)
                                        if match_str(str(e), ("cannot find volume", "unexpected end of archive")):
                                            # We assume this one worked!
                                            password_hit = password
                                            break
                                        # Otherwise: this one didn't work
                                    except Exception:
                                        # All the other errors we skip, they might be fixable in post-proc.
                                        # For example starting from the wrong volume, or damaged files
                                        # This will cause the check to be performed again for the next rar, might
                                        # be disk-intensive! Could be removed later and just accept the password.
                                        return encrypted, unwanted

                            # Did any work?
                            if password_hit:
                                # We always trust the user's input
                                if not nzo.password:
                                    nzo.password = password_hit
                                # Don't check other files
                                logging.info('Password "%s" matches for job "%s"', password_hit, nzo.final_name)
                                nzo.encrypted = -1
                                encrypted = False
                            else:
                                # Encrypted and none of them worked
                                nzo.encrypted = 1
                                encrypted = True

                    # Check for unwanted extensions
                    if cfg.unwanted_extensions() and cfg.action_on_unwanted_extensions():
                        for somefile in zf.namelist():
                            logging.debug("File contains: %s", somefile)
                            if has_unwanted_extension(somefile):
                                logging.debug("Unwanted file %s", somefile)
                                unwanted = somefile
                finally:
                    zf.close()
                    del zf
        except Exception:
            logging.info("Error during inspection of RAR-file %s", filepath)
            logging.debug("Traceback: ", exc_info=True)

    return encrypted, unwanted


def nzo_filtered_by_rating(nzo: NzbObject) -> Tuple[int, str]:
    """Apply the rating filters to a job

    Returns `(2, reason)` when an abort-filter matches, `(1, reason)` when a
    pause-filter matches and `(0, "")` otherwise. Nothing is checked when ratings or
    rating-filters are disabled, when `nzo.rating_filtered` is 2 or higher, or when no
    rating is known for the job. Sets `nzo.rating_filtered` to 1 once a rating was found.
    """
    if cfg.rating_enable() and cfg.rating_filter_enable() and (nzo.rating_filtered < 2):
        rating = sabnzbd.Rating.get_rating_by_nzo(nzo.nzo_id)
        if rating is not None:
            nzo.rating_filtered = 1
            # Abort-filters take precedence over pause-filters
            reason = rating_filtered(rating, nzo.filename.lower(), True)
            if reason is not None:
                return 2, reason
            reason = rating_filtered(rating, nzo.filename.lower(), False)
            if reason is not None:
                return 1, reason
    return 0, ""


def rating_filtered(rating, filename, abort):
    """Return the translated name of the first rating filter that matches, else None

    `rating` supplies the avg_* values that are compared, `filename` is searched for the
    configured keywords (the keywords are lower-cased, so pass a lower-case name) and
    `abort` selects the abort-settings (True) or the pause-settings (False).
    Filters are tried in this order: video, audio, spam, passworded, downvoted, keywords.
    """

    def check_keyword(keyword):
        clean_keyword = keyword.strip().lower()
        return (len(clean_keyword) > 0) and (clean_keyword in filename)

    audio = cfg.rating_filter_abort_audio() if abort else cfg.rating_filter_pause_audio()
    video = cfg.rating_filter_abort_video() if abort else cfg.rating_filter_pause_video()
    spam = cfg.rating_filter_abort_spam() if abort else cfg.rating_filter_pause_spam()
    spam_confirm = cfg.rating_filter_abort_spam_confirm() if abort else cfg.rating_filter_pause_spam_confirm()
    encrypted = cfg.rating_filter_abort_encrypted() if abort else cfg.rating_filter_pause_encrypted()
    encrypted_confirm = (
        cfg.rating_filter_abort_encrypted_confirm() if abort else cfg.rating_filter_pause_encrypted_confirm()
    )
    downvoted = cfg.rating_filter_abort_downvoted() if abort else cfg.rating_filter_pause_downvoted()
    keywords = cfg.rating_filter_abort_keywords() if abort else cfg.rating_filter_pause_keywords()
    if (video > 0) and (rating.avg_video > 0) and (rating.avg_video <= video):
        return T("video")
    if (audio > 0) and (rating.avg_audio > 0) and (rating.avg_audio <= audio):
        return T("audio")
    # The spam filter looks at the spam indicators only; this used to test
    # avg_encrypted_confirm, which reported confirmed-encrypted jobs as spam
    if (spam and ((rating.avg_spam_cnt > 0) or rating.avg_spam_confirm)) or (
        spam_confirm and rating.avg_spam_confirm
    ):
        return T("spam")
    if (encrypted and ((rating.avg_encrypted_cnt > 0) or rating.avg_encrypted_confirm)) or (
        encrypted_confirm and rating.avg_encrypted_confirm
    ):
        return T("passworded")
    if downvoted and (rating.avg_vote_up < rating.avg_vote_down):
        return T("downvoted")
    if any(check_keyword(k) for k in keywords.split(",")):
        return T("keywords")
    return None