#!/usr/bin/python3 -OO
# Copyright 2007-2021 The SABnzbd-Team <team@sabnzbd.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

"""
sabnzbd.directunpacker

Unpack rar sets with unrar while the job is still downloading: every time a
volume of the current set is finished, the waiting unrar process is told to
continue with it.
"""

import os
import re
import subprocess
import time
import threading
import logging
from typing import Optional

import sabnzbd
import sabnzbd.cfg as cfg
from sabnzbd.misc import int_conv, format_time_string, build_and_run_command
from sabnzbd.filesystem import long_path, remove_all, real_path, remove_file
from sabnzbd.nzbstuff import NzbObject, NzbFile
from sabnzbd.encoding import platform_btou
from sabnzbd.decorators import synchronized
from sabnzbd.newsunpack import EXTRACTFROM_RE, EXTRACTED_RE, rar_volumelist
from sabnzbd.postproc import prepare_extraction_path
from sabnzbd.utils.rarfile import RarFile
from sabnzbd.utils.diskspeed import diskspeedmeasure

# Need a lock to make sure start and stop is handled correctly
# Otherwise we could stop while the thread was still starting
START_STOP_LOCK = threading.RLock()

# All DirectUnpackers that currently have an unrar process running
ACTIVE_UNPACKERS = []

# Matches "<setname>.partNN.rar" (group 3 = NN) and "<setname>.rNN" (group 4 = NN)
RAR_NR = re.compile(r"(.*?)(\.part(\d*)\.rar|\.r(\d*))$", re.IGNORECASE)


class DirectUnpacker(threading.Thread):
    """Thread that feeds the finished rar volumes of one job to an unrar process.

    The unrar process pauses at every volume change; the thread answers the
    "[C]ontinue, [Q]uit" prompt as soon as the next volume is available.
    """

    def __init__(self, nzo: NzbObject):
        """Create the unpacker for `nzo` and register it as `nzo.direct_unpacker`."""
        super().__init__()

        self.nzo: NzbObject = nzo
        # Running unrar process, None when nothing is running
        self.active_instance: Optional[subprocess.Popen] = None
        # Once set, this unpacker will never be (re)started
        self.killed = False
        # Used to wake up the thread when a new file was added or on abort
        self.next_file_lock = threading.Condition(threading.RLock())

        # Result of prepare_extraction_path(), kept for cleanup in abort()
        self.unpack_dir_info = None
        # First volume of the set that is currently being unpacked
        self.rarfile_nzf: Optional[NzbFile] = None
        self.cur_setname = None
        self.cur_volume = 0
        # Highest volume number seen per setname
        self.total_volumes = {}
        self.unpack_time = 0.0

        # setname -> (list of rar volumes, list of extracted files)
        self.success_sets = {}
        # First seen NzbFile of every set that still has to be unpacked
        self.next_sets = []

        # Number of consecutive identical volume prompts, see run()
        self.duplicate_lines = 0

        nzo.direct_unpacker = self

    def stop(self):
        """Intentionally does nothing (presumably present for interface compatibility)."""
        pass

    def save(self):
        """Intentionally does nothing (presumably present for interface compatibility)."""
        pass

    def reset_active(self):
        """Close the pipes of the unrar process and forget the current set."""
        # make sure the process and file handlers are closed nicely:
        try:
            if self.active_instance:
                self.active_instance.stdout.close()
                self.active_instance.stdin.close()
                self.active_instance.wait(timeout=2)
        except Exception:
            # Best effort only, the state below must be reset in any case
            logging.debug("Exception in reset_active()", exc_info=True)

        self.active_instance = None
        self.cur_setname = None
        self.cur_volume = 0
        self.rarfile_nzf = None

    def check_requirements(self):
        """Return True when Direct Unpack may be used for this job right now."""
        if (
            not cfg.direct_unpack()
            or self.killed
            or self.nzo.first_articles
            or not self.nzo.unpack
            or self.nzo.bad_articles
            or sabnzbd.newsunpack.RAR_PROBLEM
        ):
            return False
        return True

    def set_volumes_for_nzo(self):
        """Loop over all files to detect the names"""
        none_counter = 0
        found_counter = 0
        for nzf in self.nzo.files + self.nzo.finished_files:
            nzf.setname, nzf.vol = analyze_rar_filename(nzf.filename)
            # We matched?
            if nzf.setname:
                found_counter += 1
                if nzf.setname not in self.total_volumes:
                    self.total_volumes[nzf.setname] = 0
                self.total_volumes[nzf.setname] = max(self.total_volumes[nzf.setname], nzf.vol)
            else:
                none_counter += 1

        # Too much not found? Obfuscated, ignore results
        if none_counter > found_counter:
            self.total_volumes = {}

    @synchronized(START_STOP_LOCK)
    def add(self, nzf: NzbFile):
        """Add jobs and start instance of DirectUnpack"""
        if not cfg.direct_unpack_tested():
            test_disk_performance()

        # Stop if something is wrong or we shouldn't start yet
        if not self.check_requirements():
            return

        # Is this the first set?
        if not self.cur_setname:
            self.set_volumes_for_nzo()
            self.cur_setname = nzf.setname

        # Analyze updated filenames
        nzf.setname, nzf.vol = analyze_rar_filename(nzf.filename)

        # Are we doing this set?
        if self.cur_setname and self.cur_setname == nzf.setname:
            logging.debug("DirectUnpack queued %s for %s", nzf.filename, self.cur_setname)
            # Is this the first one of the first set?
            if not self.active_instance and not self.is_alive() and self.have_next_volume():
                # Too many runners already?
                if len(ACTIVE_UNPACKERS) >= cfg.direct_unpack_threads():
                    logging.info("Too many DirectUnpackers currently to start %s", self.cur_setname)
                    return

                # Start the unrar command and the loop
                self.create_unrar_instance()
                self.start()
        elif not any(test_nzf.setname == nzf.setname for test_nzf in self.next_sets):
            # Need to store this for the future, only once per set!
            self.next_sets.append(nzf)

        # Wake up the thread to see if this is good to go
        with self.next_file_lock:
            self.next_file_lock.notify()

    def run(self):
        """Read the unrar output and react on it until all sets are done or aborted."""
        # Input and output
        linebuf = b""
        last_volume_linebuf = b""
        unrar_log = []
        rarfiles = []
        extracted = []
        start_time = time.time()

        # Need to read char-by-char because there's no newline after new-disk message
        while True:
            # We need to lock, so we don't crash if unpacker is deleted while we read
            with START_STOP_LOCK:
                if not self.active_instance or not self.active_instance.stdout:
                    break
                char = self.active_instance.stdout.read(1)

            if not char:
                # End of program
                break
            linebuf += char

            # Continue if it's not a space or end of line
            if char not in (b" ", b"\n"):
                continue

            # Handle whole lines
            if char == b"\n":
                # When reaching end-of-line, we can safely convert and add to the log
                linebuf_encoded = platform_btou(linebuf.strip())
                unrar_log.append(linebuf_encoded)
                linebuf = b""

                # Error? Let PP-handle this job
                if any(
                    error_text in linebuf_encoded
                    for error_text in (
                        "ERROR: ",
                        "Cannot create",
                        "in the encrypted file",
                        "CRC failed",
                        "checksum failed",
                        "You need to start extraction from a previous volume",
                        "password is incorrect",
                        "Incorrect password",
                        "Write error",
                        "checksum error",
                        "Cannot open",
                        "start extraction from a previous volume",
                        "Unexpected end of archive",
                    )
                ):
                    # linebuf was already cleared above, so log the decoded line
                    logging.info("Error in DirectUnpack of %s: %s", self.cur_setname, linebuf_encoded)
                    self.abort()

                elif linebuf_encoded.startswith("All OK"):
                    # Did we reach the end?
                    # Stop timer and finish
                    self.unpack_time += time.time() - start_time
                    if self in ACTIVE_UNPACKERS:
                        ACTIVE_UNPACKERS.remove(self)

                    # Add to success
                    rarfile_path = os.path.join(self.nzo.download_path, self.rarfile_nzf.filename)
                    self.success_sets[self.cur_setname] = (
                        rar_volumelist(rarfile_path, self.nzo.password, rarfiles),
                        extracted,
                    )
                    logging.info("DirectUnpack completed for %s", self.cur_setname)
                    self.nzo.set_action_line(T("Direct Unpack"), T("Completed"))

                    # List success in history-info
                    msg = T("Unpacked %s files/folders in %s") % (len(extracted), format_time_string(self.unpack_time))
                    msg = "%s - %s" % (T("Direct Unpack"), msg)
                    self.nzo.set_unpack_info("Unpack", msg, self.cur_setname)

                    # Write current log and clear
                    logging.debug("DirectUnpack Unrar output %s", "\n".join(unrar_log))
                    unrar_log = []
                    rarfiles = []
                    extracted = []

                    # Are there more files left?
                    while self.nzo.files and not self.next_sets:
                        with self.next_file_lock:
                            self.next_file_lock.wait()

                    # Is there another set to do?
                    if self.next_sets:
                        # Start new instance
                        nzf = self.next_sets.pop(0)
                        self.reset_active()
                        self.cur_setname = nzf.setname
                        # Wait for the 1st volume to appear
                        self.wait_for_next_volume()
                        self.create_unrar_instance()
                        start_time = time.time()
                    else:
                        self.killed = True
                        break

                elif linebuf_encoded.startswith("Extracting from"):
                    # List files we used, ignore the line if it has an unexpected format
                    from_match = re.search(EXTRACTFROM_RE, linebuf_encoded)
                    if from_match:
                        filename = from_match.group(1)
                        if filename not in rarfiles:
                            rarfiles.append(filename)
                else:
                    # List files we extracted
                    m = re.search(EXTRACTED_RE, linebuf_encoded)
                    if m:
                        # In case of flat-unpack, UnRar still prints the whole path (?!)
                        unpacked_file = m.group(2)
                        if cfg.flat_unpack():
                            unpacked_file = os.path.basename(unpacked_file)
                        extracted.append(real_path(self.unpack_dir_info[0], unpacked_file))

            if linebuf.endswith(b"[C]ontinue, [Q]uit "):
                # Stop timer
                self.unpack_time += time.time() - start_time

                # Wait for the next one..
                self.wait_for_next_volume()

                # Possible that the instance was deleted while locked
                if not self.killed:
                    # If unrar stopped or is killed somehow, writing will cause a crash
                    try:
                        # Give unrar some time to do its thing
                        self.active_instance.stdin.write(b"C\n")
                        start_time = time.time()
                        time.sleep(0.1)
                    except IOError:
                        self.abort()
                        break

                    # Did we unpack a new volume? Sometimes UnRar hangs on 1 volume
                    if not last_volume_linebuf or last_volume_linebuf != linebuf:
                        # Next volume
                        self.cur_volume += 1
                        self.nzo.set_action_line(T("Direct Unpack"), self.get_formatted_stats())
                        logging.info("DirectUnpacked volume %s for %s", self.cur_volume, self.cur_setname)

                    # If lines did not change and we don't have the next volume, this download is missing files!
                    # In rare occasions we can get stuck forever with repeating lines
                    if last_volume_linebuf == linebuf:
                        if not self.have_next_volume() or self.duplicate_lines > 10:
                            logging.info("DirectUnpack failed due to missing files %s", self.cur_setname)
                            self.abort()
                        else:
                            logging.debug('Duplicate output line detected: "%s"', platform_btou(last_volume_linebuf))
                            self.duplicate_lines += 1
                    else:
                        self.duplicate_lines = 0
                    last_volume_linebuf = linebuf

        # Add last line and write any new output
        if linebuf:
            unrar_log.append(platform_btou(linebuf.strip()))
            logging.debug("DirectUnpack Unrar output %s", "\n".join(unrar_log))

        # Make more space
        self.reset_active()
        if self in ACTIVE_UNPACKERS:
            ACTIVE_UNPACKERS.remove(self)

        # Set the thread to killed so it never gets restarted by accident
        self.killed = True

    def have_next_volume(self):
        """Check if next volume of set is available, start
        from the end of the list where latest completed files are
        Make sure that files are 100% written to disk by checking md5sum

        Returns the matching NzbFile, or False when it is not there (yet).
        """
        for nzf_search in reversed(self.nzo.finished_files):
            if nzf_search.setname == self.cur_setname and nzf_search.vol == (self.cur_volume + 1) and nzf_search.md5sum:
                return nzf_search
        return False

    def wait_for_next_volume(self):
        """Wait for the correct volume to appear
        But stop if it was killed or the NZB is done
        """
        while not self.have_next_volume() and not self.killed and self.nzo.files:
            with self.next_file_lock:
                self.next_file_lock.wait()

    @synchronized(START_STOP_LOCK)
    def create_unrar_instance(self):
        """Start the unrar instance using the user's options"""
        # Generate extraction path and save for post-proc
        if not self.unpack_dir_info:
            try:
                self.unpack_dir_info = prepare_extraction_path(self.nzo)
            except Exception:
                # Prevent fatal crash if directory creation fails
                self.abort()
                return

        # Get the information
        extraction_path, _, _, one_folder, _ = self.unpack_dir_info

        # Set options
        if self.nzo.password:
            password_command = "-p%s" % self.nzo.password
        else:
            password_command = "-p-"

        if one_folder or cfg.flat_unpack():
            action = "e"
        else:
            action = "x"

        # The first NZF
        self.rarfile_nzf = self.have_next_volume()

        # Ignore if maybe this set is not there any more
        # This can happen due to race/timing issues when creating the sets
        if not self.rarfile_nzf:
            return

        # Generate command
        rarfile_path = os.path.join(self.nzo.download_path, self.rarfile_nzf.filename)
        if sabnzbd.WIN32:
            # For Unrar to support long-path, we need to circumvent Python's list2cmdline
            # See: https://github.com/sabnzbd/sabnzbd/issues/1043
            # The -scf forces the output to be UTF8
            command = [
                "%s" % sabnzbd.newsunpack.RAR_COMMAND,
                action,
                "-vp",
                "-idp",
                "-scf",
                "-o+",
                "-ai",
                password_command,
                rarfile_path,
                "%s\\" % long_path(extraction_path),
            ]
        else:
            # Don't use "-ai" (not needed for non-Windows)
            # The -scf forces the output to be UTF8
            command = [
                "%s" % sabnzbd.newsunpack.RAR_COMMAND,
                action,
                "-vp",
                "-idp",
                "-scf",
                "-o+",
                password_command,
                "%s" % rarfile_path,
                "%s/" % extraction_path,
            ]

        if cfg.ignore_unrar_dates():
            command.insert(3, "-tsm-")

        # Let's start from the first one!
        self.cur_volume = 1

        # Need to disable buffer to have direct feedback
        self.active_instance = build_and_run_command(command, flatten_command=True, bufsize=0)

        # Add to runners
        ACTIVE_UNPACKERS.append(self)

        # Doing the first
        logging.info("DirectUnpacked volume %s for %s", self.cur_volume, self.cur_setname)

    @synchronized(START_STOP_LOCK)
    def abort(self):
        """Abort running instance and delete generated files"""
        if not self.killed and self.cur_setname:
            logging.info("Aborting DirectUnpack for %s", self.cur_setname)
            self.killed = True

            # Save reference to the first rarfile, it can be unset (None/False)
            rarfile_nzf = self.rarfile_nzf
            rar_name = rarfile_nzf.filename if rarfile_nzf else None

            # Abort Unrar
            if self.active_instance:
                # First we try to abort gracefully
                try:
                    self.active_instance.stdin.write(b"Q\n")
                    time.sleep(0.2)
                except IOError:
                    pass

                # Now force kill and give it a bit of time
                try:
                    self.active_instance.kill()
                    time.sleep(0.2)
                except AttributeError:
                    # Already killed by the Quit command
                    pass

            # Wake up the thread
            with self.next_file_lock:
                self.next_file_lock.notify()

            # No new sets
            self.next_sets = []
            self.success_sets = {}

            # Remove files
            if self.unpack_dir_info:
                extraction_path, _, _, one_folder, _ = self.unpack_dir_info
                # In case of flat-unpack we need to remove the files manually
                if one_folder:
                    # RarFile can fail for mysterious reasons
                    try:
                        rar_contents = RarFile(
                            os.path.join(self.nzo.download_path, rar_name), single_file_check=True
                        ).filelist()
                        for rm_file in rar_contents:
                            # Flat-unpack, so remove foldername from RarFile output
                            f = os.path.join(extraction_path, os.path.basename(rm_file))
                            remove_file(f)
                    except Exception:
                        # The user will have to remove it themselves
                        # Fall back to the setname so that the logging itself cannot fail
                        logging.info(
                            "Failed to clean Direct Unpack after aborting %s",
                            rar_name or self.cur_setname,
                            exc_info=True,
                        )
                else:
                    # We can just remove the whole path
                    remove_all(extraction_path, recursive=True)
                # Remove dir-info
                self.unpack_dir_info = None

            # Reset settings
            self.reset_active()

    def get_formatted_stats(self):
        """Get percentage or number of rar's done

        Returns "current/total" as string when the total of the current set
        is known, otherwise just the current volume number.
        """
        if self.cur_setname and self.cur_setname in self.total_volumes:
            # This won't work on obfuscated posts
            if self.total_volumes[self.cur_setname] >= self.cur_volume and self.cur_volume:
                return "%02d/%02d" % (self.cur_volume, self.total_volumes[self.cur_setname])
        return self.cur_volume


def analyze_rar_filename(filename):
    """Extract volume number and setname from rar-filenames
    Both ".part01.rar" and ".r01"

    Returns (setname, volume) or (None, None) if it is not a rar-filename.
    """
    m = RAR_NR.search(filename)
    if m:
        if m.group(4):
            # Special since starts with ".rar", ".r00"
            return m.group(1), int_conv(m.group(4)) + 2
        return m.group(1), int_conv(m.group(3))
    else:
        # Detect if first of "rxx" set, ignore case just like RAR_NR does
        if filename.lower().endswith(".rar"):
            return os.path.splitext(filename)[0], 1
    return None, None


def abort_all():
    """Abort all running DirectUnpackers"""
    logging.info("Aborting all DirectUnpackers")
    # Use a copy, the unpackers remove themselves from the list when they stop
    for direct_unpacker in list(ACTIVE_UNPACKERS):
        direct_unpacker.abort()


def test_disk_performance():
    """Test the incomplete-dir performance and enable
    Direct Unpack if good enough (> 40MB/s)
    """
    if diskspeedmeasure(sabnzbd.cfg.download_dir.get_path()) > 40:
        cfg.direct_unpack.set(True)
        logging.warning(
            T("Direct Unpack was automatically enabled.")
            + " "
            + T(
                "Jobs will start unpacking during the downloading to reduce post-processing time. Only works for jobs that do not need repair."
            )
        )
    else:
        logging.info("Direct Unpack was not enabled, incomplete folder disk speed below 40MB/s")
    # Remember that the test was done, so it only runs once
    cfg.direct_unpack_tested.set(True)
    sabnzbd.config.save_config()