#!/usr/bin/python3 -OO
# Copyright 2007-2021 The SABnzbd-Team <team@sabnzbd.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

"""
sabnzbd.decoder - article decoder
"""

import logging
import hashlib
import queue
from threading import Thread
from typing import Tuple, List, Optional

import sabnzbd
import sabnzbd.cfg as cfg
from sabnzbd.constants import SABYENC_VERSION_REQUIRED
from sabnzbd.nzbstuff import Article
from sabnzbd.misc import match_str

# Check for correct SABYenc version
SABYENC_VERSION = None
try:
    import sabyenc3

    SABYENC_ENABLED = True
    SABYENC_VERSION = sabyenc3.__version__
    # Require at least the major.minor version to match
    if SABYENC_VERSION[:3] != SABYENC_VERSION_REQUIRED[:3]:
        raise ImportError
except ImportError:
    SABYENC_ENABLED = False
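
# For illustration: the prefix check above only compares the first three
# characters, i.e. the major.minor part for single-digit version numbers.
# For example:
#     >>> "5.4.6"[:3] == "5.4.5"[:3]
#     True
#     >>> "5.3.0"[:3] == "5.4.5"[:3]
#     False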


class CrcError(Exception):
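    """CRC32 check failed; carries the decoded data, as par2 repair may still be possible"""
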
    def __init__(self, needcrc: int, gotcrc: int, data: bytes):
        super().__init__()
        self.needcrc = needcrc
        self.gotcrc = gotcrc
        self.data = data


class BadYenc(Exception):
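    """Raised for prechecks and badly formed articles"""
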
    def __init__(self):
        super().__init__()


class Decoder:
    """Implement a thread-like coordinator for the decoders"""

    def __init__(self):
        logging.debug("Initializing decoders")
        # Initialize queue and servers
        self.decoder_queue = queue.Queue()

        # Initialize decoders
        self.decoder_workers = []
        for _ in range(cfg.num_decoders()):
            self.decoder_workers.append(DecoderWorker(self.decoder_queue))

    def start(self):
        for decoder_worker in self.decoder_workers:
            decoder_worker.start()

    def is_alive(self) -> bool:
        # Check all workers
        for decoder_worker in self.decoder_workers:
            if not decoder_worker.is_alive():
                return False
        return True

    def stop(self):
        # Put one (None, None) sentinel per worker so all decoders stop
        for _ in self.decoder_workers:
            self.decoder_queue.put((None, None))

    def join(self):
        # Wait for all decoders to finish
        for decoder_worker in self.decoder_workers:
            try:
                decoder_worker.join()
            except:
                pass

    def process(self, article: Article, raw_data: List[bytes]):
        # We use the reported article-size, just like sabyenc does
        sabnzbd.ArticleCache.reserve_space(article.bytes)
        self.decoder_queue.put((article, raw_data))

    def queue_full(self) -> bool:
        # Check if the queue size exceeds the limit
        return self.decoder_queue.qsize() >= sabnzbd.ArticleCache.decoder_cache_article_limit


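# A minimal usage sketch of the coordinator (illustrative only; in SABnzbd the
# Decoder is driven by the downloader, which supplies the Article objects and
# the raw article data received from the news server):
#
#     decoder = Decoder()
#     decoder.start()
#     decoder.process(article, raw_data)  # queue one article for decoding
#     decoder.stop()                      # one (None, None) sentinel per worker
#     decoder.join()

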
class DecoderWorker(Thread):
    """The actual workhorse that handles decoding!"""

    def __init__(self, decoder_queue):
        super().__init__()
        logging.debug("Initializing decoder %s", self.name)

        self.decoder_queue: queue.Queue[Tuple[Optional[Article], Optional[List[bytes]]]] = decoder_queue

    def run(self):
        while True:
            # Set Article and NzbObject objects to None so references from this
            # thread do not keep the parent objects alive (see #1628)
            decoded_data = raw_data = article = nzo = None
            article, raw_data = self.decoder_queue.get()
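            # A (None, None) sentinel from Decoder.stop() signals shutdown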
            if not article:
                logging.info("Shutting down decoder %s", self.name)
                break

            nzo = article.nzf.nzo
            art_id = article.article

            # Free the space we reserved in the article-cache
            sabnzbd.ArticleCache.free_reserved_space(article.bytes)

            # Keeping track
            article_success = False

            try:
                if nzo.precheck:
                    raise BadYenc

                if sabnzbd.LOG_ALL:
                    logging.debug("Decoding %s", art_id)

                decoded_data = decode(article, raw_data)
                article_success = True

            except MemoryError:
                logging.warning(T("Decoder failure: Out of memory"))
                logging.info("Decoder-Queue: %d", self.decoder_queue.qsize())
                logging.info("Cache: %d, %d, %d", *sabnzbd.ArticleCache.cache_info())
                logging.info("Traceback: ", exc_info=True)
                sabnzbd.Downloader.pause()

                # This article should be fetched again
                sabnzbd.NzbQueue.reset_try_lists(article)
                continue

            except CrcError as crc_error:
                logging.info("CRC Error in %s", art_id)

                # Continue to the next one if we found a new server
                if search_new_server(article):
                    continue

                # Store the data, maybe par2 can still fix it
                decoded_data = crc_error.data

            except (BadYenc, ValueError):
                # Handles precheck and badly formed articles
                if nzo.precheck and raw_data and raw_data[0].startswith(b"223 "):
                    # STAT was used, so we only get a status code
                    article_success = True
                else:
                    # Examine headers (for precheck) or body (for download)
                    # Look for DMCA clues (while skipping "X-" headers)
                    # Detect potential UUencode
                    for line in raw_data:
                        lline = line.lower()
                        if b"message-id:" in lline:
                            article_success = True
                        if not lline.startswith(b"x-") and match_str(
                            lline, (b"dmca", b"removed", b"cancel", b"blocked")
                        ):
                            article_success = False
                            logging.info("Article removed from server (%s)", art_id)
                            break
                        if lline.find(b"\nbegin ") >= 0:
                            logme = T("UUencode detected, only yEnc encoding is supported [%s]") % nzo.final_name
                            logging.error(logme)
                            nzo.fail_msg = logme
                            sabnzbd.NzbQueue.end_job(nzo)
                            break

                # Pre-check: a proper article was found, so just register it
                if nzo.precheck and article_success and sabnzbd.LOG_ALL:
                    logging.debug("Server %s has article %s", article.fetcher, art_id)
                elif not article_success:
                    # If not pre-check, this must be a bad article
                    if not nzo.precheck:
                        logging.info("Badly formed yEnc article in %s", art_id, exc_info=True)

                    # Continue to the next one if we found a new server
                    if search_new_server(article):
                        continue

            except:
                logging.warning(T("Unknown Error while decoding %s"), art_id)
                logging.info("Traceback: ", exc_info=True)

                # Continue to the next one if we found a new server
                if search_new_server(article):
                    continue

            if decoded_data:
                # If the data needs to be written to disk due to a full cache this will
                # be slow, causing the decoder-queue to fill up and delay the downloader
                sabnzbd.ArticleCache.save_article(article, decoded_data)

            sabnzbd.NzbQueue.register_article(article, article_success)


def decode(article: Article, raw_data: List[bytes]) -> bytes:
    # Let SABYenc do all the heavy lifting
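    # It returns the decoded data, the filename from the yEnc headers, the
    # calculated and expected CRC32 values, and whether those CRCs match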
    decoded_data, yenc_filename, crc, crc_expected, crc_correct = sabyenc3.decode_usenet_chunks(raw_data, article.bytes)

    # Mark as decoded
    article.decoded = True

    # Assume it is yenc
    article.nzf.type = "yenc"

    # Only set the name if it was found and not obfuscated
    if not article.nzf.filename_checked and yenc_filename:
        # Set the md5-of-16k if this is the first article
        if article.lowest_partnum:
            article.nzf.md5of16k = hashlib.md5(decoded_data[:16384]).digest()

        # Try the rename, even if it's not the first article
        # For example when the first article was missing
        article.nzf.nzo.verify_nzf_filename(article.nzf, yenc_filename)

    # CRC check
    if not crc_correct:
        raise CrcError(crc_expected, crc, decoded_data)

    return decoded_data
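
# For reference: yEnc uses the same CRC32 as zlib, so the check performed by
# SABYenc is roughly equivalent to the following sketch (an illustration, not
# the actual sabyenc3 implementation):
#
#     import zlib
#     crc_correct = zlib.crc32(decoded_data) == crc_expected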


def search_new_server(article: Article) -> bool:
    """Shorthand for searching for a new server, or else increasing bad_articles"""
    # Continue to the next one if we found a new server
    if not article.search_new_server():
        # Increase the bad-articles count if no new server was found
        article.nzf.nzo.increase_bad_articles_counter("bad_articles")
        return False
    return True