1#!/usr/bin/python3 -OO
2# Copyright 2007-2021 The SABnzbd-Team <team@sabnzbd.org>
3#
4# This program is free software; you can redistribute it and/or
5# modify it under the terms of the GNU General Public License
6# as published by the Free Software Foundation; either version 2
7# of the License, or (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12# GNU General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License
15# along with this program; if not, write to the Free Software
16# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
17
18"""
19sabnzbd.directunpacker
20"""
21
22import os
23import re
24import subprocess
25import time
26import threading
27import logging
28from typing import Optional
29
30import sabnzbd
31import sabnzbd.cfg as cfg
32from sabnzbd.misc import int_conv, format_time_string, build_and_run_command
33from sabnzbd.filesystem import long_path, remove_all, real_path, remove_file
34from sabnzbd.nzbstuff import NzbObject, NzbFile
35from sabnzbd.encoding import platform_btou
36from sabnzbd.decorators import synchronized
37from sabnzbd.newsunpack import EXTRACTFROM_RE, EXTRACTED_RE, rar_volumelist
38from sabnzbd.postproc import prepare_extraction_path
39from sabnzbd.utils.rarfile import RarFile
40from sabnzbd.utils.diskspeed import diskspeedmeasure
41
42# Need a lock to make sure start and stop is handled correctly
43# Otherwise we could stop while the thread was still starting
44START_STOP_LOCK = threading.RLock()
45
46ACTIVE_UNPACKERS = []
47
48RAR_NR = re.compile(r"(.*?)(\.part(\d*).rar|\.r(\d*))$", re.IGNORECASE)
49
50
51class DirectUnpacker(threading.Thread):
52    def __init__(self, nzo: NzbObject):
53        super().__init__()
54
55        self.nzo: NzbObject = nzo
56        self.active_instance: Optional[subprocess.Popen] = None
57        self.killed = False
58        self.next_file_lock = threading.Condition(threading.RLock())
59
60        self.unpack_dir_info = None
61        self.rarfile_nzf: Optional[NzbFile] = None
62        self.cur_setname = None
63        self.cur_volume = 0
64        self.total_volumes = {}
65        self.unpack_time = 0.0
66
67        self.success_sets = {}
68        self.next_sets = []
69
70        self.duplicate_lines = 0
71
72        nzo.direct_unpacker = self
73
74    def stop(self):
75        pass
76
77    def save(self):
78        pass
79
80    def reset_active(self):
81        # make sure the process and file handlers are closed nicely:
82        try:
83            if self.active_instance:
84                self.active_instance.stdout.close()
85                self.active_instance.stdin.close()
86                self.active_instance.wait(timeout=2)
87        except:
88            logging.debug("Exception in reset_active()", exc_info=True)
89            pass
90
91        self.active_instance = None
92        self.cur_setname = None
93        self.cur_volume = 0
94        self.rarfile_nzf = None
95
96    def check_requirements(self):
97        if (
98            not cfg.direct_unpack()
99            or self.killed
100            or self.nzo.first_articles
101            or not self.nzo.unpack
102            or self.nzo.bad_articles
103            or sabnzbd.newsunpack.RAR_PROBLEM
104        ):
105            return False
106        return True
107
108    def set_volumes_for_nzo(self):
109        """Loop over all files to detect the names"""
110        none_counter = 0
111        found_counter = 0
112        for nzf in self.nzo.files + self.nzo.finished_files:
113            nzf.setname, nzf.vol = analyze_rar_filename(nzf.filename)
114            # We matched?
115            if nzf.setname:
116                found_counter += 1
117                if nzf.setname not in self.total_volumes:
118                    self.total_volumes[nzf.setname] = 0
119                self.total_volumes[nzf.setname] = max(self.total_volumes[nzf.setname], nzf.vol)
120            else:
121                none_counter += 1
122
123        # Too much not found? Obfuscated, ignore results
124        if none_counter > found_counter:
125            self.total_volumes = {}
126
127    @synchronized(START_STOP_LOCK)
128    def add(self, nzf: NzbFile):
129        """Add jobs and start instance of DirectUnpack"""
130        if not cfg.direct_unpack_tested():
131            test_disk_performance()
132
133        # Stop if something is wrong or we shouldn't start yet
134        if not self.check_requirements():
135            return
136
137        # Is this the first set?
138        if not self.cur_setname:
139            self.set_volumes_for_nzo()
140            self.cur_setname = nzf.setname
141
142        # Analyze updated filenames
143        nzf.setname, nzf.vol = analyze_rar_filename(nzf.filename)
144
145        # Are we doing this set?
146        if self.cur_setname and self.cur_setname == nzf.setname:
147            logging.debug("DirectUnpack queued %s for %s", nzf.filename, self.cur_setname)
148            # Is this the first one of the first set?
149            if not self.active_instance and not self.is_alive() and self.have_next_volume():
150                # Too many runners already?
151                if len(ACTIVE_UNPACKERS) >= cfg.direct_unpack_threads():
152                    logging.info("Too many DirectUnpackers currently to start %s", self.cur_setname)
153                    return
154
155                # Start the unrar command and the loop
156                self.create_unrar_instance()
157                self.start()
158        elif not any(test_nzf.setname == nzf.setname for test_nzf in self.next_sets):
159            # Need to store this for the future, only once per set!
160            self.next_sets.append(nzf)
161
162        # Wake up the thread to see if this is good to go
163        with self.next_file_lock:
164            self.next_file_lock.notify()
165
166    def run(self):
167        # Input and output
168        linebuf = b""
169        last_volume_linebuf = b""
170        unrar_log = []
171        rarfiles = []
172        extracted = []
173        start_time = time.time()
174
175        # Need to read char-by-char because there's no newline after new-disk message
176        while 1:
177            # We need to lock, so we don't crash if unpacker is deleted while we read
178            with START_STOP_LOCK:
179                if not self.active_instance or not self.active_instance.stdout:
180                    break
181                char = self.active_instance.stdout.read(1)
182
183            if not char:
184                # End of program
185                break
186            linebuf += char
187
188            # Continue if it's not a space or end of line
189            if char not in (b" ", b"\n"):
190                continue
191
192            # Handle whole lines
193            if char == b"\n":
194                # When reaching end-of-line, we can safely convert and add to the log
195                linebuf_encoded = platform_btou(linebuf.strip())
196                unrar_log.append(linebuf_encoded)
197                linebuf = b""
198
199                # Error? Let PP-handle this job
200                if any(
201                    error_text in linebuf_encoded
202                    for error_text in (
203                        "ERROR: ",
204                        "Cannot create",
205                        "in the encrypted file",
206                        "CRC failed",
207                        "checksum failed",
208                        "You need to start extraction from a previous volume",
209                        "password is incorrect",
210                        "Incorrect password",
211                        "Write error",
212                        "checksum error",
213                        "Cannot open",
214                        "start extraction from a previous volume",
215                        "Unexpected end of archive",
216                    )
217                ):
218                    logging.info("Error in DirectUnpack of %s: %s", self.cur_setname, platform_btou(linebuf.strip()))
219                    self.abort()
220
221                elif linebuf_encoded.startswith("All OK"):
222                    # Did we reach the end?
223                    # Stop timer and finish
224                    self.unpack_time += time.time() - start_time
225                    ACTIVE_UNPACKERS.remove(self)
226
227                    # Add to success
228                    rarfile_path = os.path.join(self.nzo.download_path, self.rarfile_nzf.filename)
229                    self.success_sets[self.cur_setname] = (
230                        rar_volumelist(rarfile_path, self.nzo.password, rarfiles),
231                        extracted,
232                    )
233                    logging.info("DirectUnpack completed for %s", self.cur_setname)
234                    self.nzo.set_action_line(T("Direct Unpack"), T("Completed"))
235
236                    # List success in history-info
237                    msg = T("Unpacked %s files/folders in %s") % (len(extracted), format_time_string(self.unpack_time))
238                    msg = "%s - %s" % (T("Direct Unpack"), msg)
239                    self.nzo.set_unpack_info("Unpack", msg, self.cur_setname)
240
241                    # Write current log and clear
242                    logging.debug("DirectUnpack Unrar output %s", "\n".join(unrar_log))
243                    unrar_log = []
244                    rarfiles = []
245                    extracted = []
246
247                    # Are there more files left?
248                    while self.nzo.files and not self.next_sets:
249                        with self.next_file_lock:
250                            self.next_file_lock.wait()
251
252                    # Is there another set to do?
253                    if self.next_sets:
254                        # Start new instance
255                        nzf = self.next_sets.pop(0)
256                        self.reset_active()
257                        self.cur_setname = nzf.setname
258                        # Wait for the 1st volume to appear
259                        self.wait_for_next_volume()
260                        self.create_unrar_instance()
261                        start_time = time.time()
262                    else:
263                        self.killed = True
264                        break
265
266                elif linebuf_encoded.startswith("Extracting from"):
267                    # List files we used
268                    filename = re.search(EXTRACTFROM_RE, linebuf_encoded).group(1)
269                    if filename not in rarfiles:
270                        rarfiles.append(filename)
271                else:
272                    # List files we extracted
273                    m = re.search(EXTRACTED_RE, linebuf_encoded)
274                    if m:
275                        # In case of flat-unpack, UnRar still prints the whole path (?!)
276                        unpacked_file = m.group(2)
277                        if cfg.flat_unpack():
278                            unpacked_file = os.path.basename(unpacked_file)
279                        extracted.append(real_path(self.unpack_dir_info[0], unpacked_file))
280
281            if linebuf.endswith(b"[C]ontinue, [Q]uit "):
282                # Stop timer
283                self.unpack_time += time.time() - start_time
284
285                # Wait for the next one..
286                self.wait_for_next_volume()
287
288                # Possible that the instance was deleted while locked
289                if not self.killed:
290                    # If unrar stopped or is killed somehow, writing will cause a crash
291                    try:
292                        # Give unrar some time to do it's thing
293                        self.active_instance.stdin.write(b"C\n")
294                        start_time = time.time()
295                        time.sleep(0.1)
296                    except IOError:
297                        self.abort()
298                        break
299
300                    # Did we unpack a new volume? Sometimes UnRar hangs on 1 volume
301                    if not last_volume_linebuf or last_volume_linebuf != linebuf:
302                        # Next volume
303                        self.cur_volume += 1
304                        self.nzo.set_action_line(T("Direct Unpack"), self.get_formatted_stats())
305                        logging.info("DirectUnpacked volume %s for %s", self.cur_volume, self.cur_setname)
306
307                    # If lines did not change and we don't have the next volume, this download is missing files!
308                    # In rare occasions we can get stuck forever with repeating lines
309                    if last_volume_linebuf == linebuf:
310                        if not self.have_next_volume() or self.duplicate_lines > 10:
311                            logging.info("DirectUnpack failed due to missing files %s", self.cur_setname)
312                            self.abort()
313                        else:
314                            logging.debug('Duplicate output line detected: "%s"', platform_btou(last_volume_linebuf))
315                            self.duplicate_lines += 1
316                    else:
317                        self.duplicate_lines = 0
318                    last_volume_linebuf = linebuf
319
320        # Add last line and write any new output
321        if linebuf:
322            unrar_log.append(platform_btou(linebuf.strip()))
323            logging.debug("DirectUnpack Unrar output %s", "\n".join(unrar_log))
324
325        # Make more space
326        self.reset_active()
327        if self in ACTIVE_UNPACKERS:
328            ACTIVE_UNPACKERS.remove(self)
329
330        # Set the thread to killed so it never gets restarted by accident
331        self.killed = True
332
333    def have_next_volume(self):
334        """Check if next volume of set is available, start
335        from the end of the list where latest completed files are
336        Make sure that files are 100% written to disk by checking md5sum
337        """
338        for nzf_search in reversed(self.nzo.finished_files):
339            if nzf_search.setname == self.cur_setname and nzf_search.vol == (self.cur_volume + 1) and nzf_search.md5sum:
340                return nzf_search
341        return False
342
343    def wait_for_next_volume(self):
344        """Wait for the correct volume to appear
345        But stop if it was killed or the NZB is done
346        """
347        while not self.have_next_volume() and not self.killed and self.nzo.files:
348            with self.next_file_lock:
349                self.next_file_lock.wait()
350
351    @synchronized(START_STOP_LOCK)
352    def create_unrar_instance(self):
353        """Start the unrar instance using the user's options"""
354        # Generate extraction path and save for post-proc
355        if not self.unpack_dir_info:
356            try:
357                self.unpack_dir_info = prepare_extraction_path(self.nzo)
358            except:
359                # Prevent fatal crash if directory creation fails
360                self.abort()
361                return
362
363        # Get the information
364        extraction_path, _, _, one_folder, _ = self.unpack_dir_info
365
366        # Set options
367        if self.nzo.password:
368            password_command = "-p%s" % self.nzo.password
369        else:
370            password_command = "-p-"
371
372        if one_folder or cfg.flat_unpack():
373            action = "e"
374        else:
375            action = "x"
376
377        # The first NZF
378        self.rarfile_nzf = self.have_next_volume()
379
380        # Ignore if maybe this set is not there any more
381        # This can happen due to race/timing issues when creating the sets
382        if not self.rarfile_nzf:
383            return
384
385        # Generate command
386        rarfile_path = os.path.join(self.nzo.download_path, self.rarfile_nzf.filename)
387        if sabnzbd.WIN32:
388            # For Unrar to support long-path, we need to circumvent Python's list2cmdline
389            # See: https://github.com/sabnzbd/sabnzbd/issues/1043
390            # The -scf forces the output to be UTF8
391            command = [
392                "%s" % sabnzbd.newsunpack.RAR_COMMAND,
393                action,
394                "-vp",
395                "-idp",
396                "-scf",
397                "-o+",
398                "-ai",
399                password_command,
400                rarfile_path,
401                "%s\\" % long_path(extraction_path),
402            ]
403        else:
404            # Don't use "-ai" (not needed for non-Windows)
405            # The -scf forces the output to be UTF8
406            command = [
407                "%s" % sabnzbd.newsunpack.RAR_COMMAND,
408                action,
409                "-vp",
410                "-idp",
411                "-scf",
412                "-o+",
413                password_command,
414                "%s" % rarfile_path,
415                "%s/" % extraction_path,
416            ]
417
418        if cfg.ignore_unrar_dates():
419            command.insert(3, "-tsm-")
420
421        # Let's start from the first one!
422        self.cur_volume = 1
423
424        # Need to disable buffer to have direct feedback
425        self.active_instance = build_and_run_command(command, flatten_command=True, bufsize=0)
426
427        # Add to runners
428        ACTIVE_UNPACKERS.append(self)
429
430        # Doing the first
431        logging.info("DirectUnpacked volume %s for %s", self.cur_volume, self.cur_setname)
432
433    @synchronized(START_STOP_LOCK)
434    def abort(self):
435        """Abort running instance and delete generated files"""
436        if not self.killed and self.cur_setname:
437            logging.info("Aborting DirectUnpack for %s", self.cur_setname)
438            self.killed = True
439
440            # Save reference to the first rarfile
441            rarfile_nzf = self.rarfile_nzf
442
443            # Abort Unrar
444            if self.active_instance:
445                # First we try to abort gracefully
446                try:
447                    self.active_instance.stdin.write(b"Q\n")
448                    time.sleep(0.2)
449                except IOError:
450                    pass
451
452                # Now force kill and give it a bit of time
453                try:
454                    self.active_instance.kill()
455                    time.sleep(0.2)
456                except AttributeError:
457                    # Already killed by the Quit command
458                    pass
459
460            # Wake up the thread
461            with self.next_file_lock:
462                self.next_file_lock.notify()
463
464            # No new sets
465            self.next_sets = []
466            self.success_sets = {}
467
468            # Remove files
469            if self.unpack_dir_info:
470                extraction_path, _, _, one_folder, _ = self.unpack_dir_info
471                # In case of flat-unpack we need to remove the files manually
472                if one_folder:
473                    # RarFile can fail for mysterious reasons
474                    try:
475                        rar_contents = RarFile(
476                            os.path.join(self.nzo.download_path, rarfile_nzf.filename), single_file_check=True
477                        ).filelist()
478                        for rm_file in rar_contents:
479                            # Flat-unpack, so remove foldername from RarFile output
480                            f = os.path.join(extraction_path, os.path.basename(rm_file))
481                            remove_file(f)
482                    except:
483                        # The user will have to remove it themselves
484                        logging.info(
485                            "Failed to clean Direct Unpack after aborting %s", rarfile_nzf.filename, exc_info=True
486                        )
487                else:
488                    # We can just remove the whole path
489                    remove_all(extraction_path, recursive=True)
490                # Remove dir-info
491                self.unpack_dir_info = None
492
493            # Reset settings
494            self.reset_active()
495
496    def get_formatted_stats(self):
497        """Get percentage or number of rar's done"""
498        if self.cur_setname and self.cur_setname in self.total_volumes:
499            # This won't work on obfuscated posts
500            if self.total_volumes[self.cur_setname] >= self.cur_volume and self.cur_volume:
501                return "%02d/%02d" % (self.cur_volume, self.total_volumes[self.cur_setname])
502        return self.cur_volume
503
504
505def analyze_rar_filename(filename):
506    """Extract volume number and setname from rar-filenames
507    Both ".part01.rar" and ".r01"
508    """
509    m = RAR_NR.search(filename)
510    if m:
511        if m.group(4):
512            # Special since starts with ".rar", ".r00"
513            return m.group(1), int_conv(m.group(4)) + 2
514        return m.group(1), int_conv(m.group(3))
515    else:
516        # Detect if first of "rxx" set
517        if filename.endswith(".rar"):
518            return os.path.splitext(filename)[0], 1
519    return None, None
520
521
522def abort_all():
523    """Abort all running DirectUnpackers"""
524    logging.info("Aborting all DirectUnpackers")
525    for direct_unpacker in ACTIVE_UNPACKERS:
526        direct_unpacker.abort()
527
528
529def test_disk_performance():
530    """Test the incomplete-dir performance and enable
531    Direct Unpack if good enough (> 40MB/s)
532    """
533    if diskspeedmeasure(sabnzbd.cfg.download_dir.get_path()) > 40:
534        cfg.direct_unpack.set(True)
535        logging.warning(
536            T("Direct Unpack was automatically enabled.")
537            + " "
538            + T(
539                "Jobs will start unpacking during the downloading to reduce post-processing time. Only works for jobs that do not need repair."
540            )
541        )
542    else:
543        logging.info("Direct Unpack was not enabled, incomplete folder disk speed below 40MB/s")
544    cfg.direct_unpack_tested.set(True)
545    sabnzbd.config.save_config()
546