#!/usr/bin/python3 -OO
# Copyright 2007-2021 The SABnzbd-Team <team@sabnzbd.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

"""
sabnzbd.decoder - article decoder
"""

import logging
import hashlib
import queue
from threading import Thread
from typing import Tuple, List, Optional

import sabnzbd
import sabnzbd.cfg as cfg
from sabnzbd.constants import SABYENC_VERSION_REQUIRED
from sabnzbd.nzbstuff import Article
from sabnzbd.misc import match_str

# Check for correct SABYenc version
SABYENC_VERSION = None
try:
    import sabyenc3

    SABYENC_ENABLED = True
    SABYENC_VERSION = sabyenc3.__version__
    # Verify version to at least match minor version
    if SABYENC_VERSION[:3] != SABYENC_VERSION_REQUIRED[:3]:
        raise ImportError
except ImportError:
    SABYENC_ENABLED = False


class CrcError(Exception):
    """Raised when the yEnc CRC of a decoded article does not match.

    Carries the expected CRC, the CRC that was actually computed, and the
    decoded data itself so the caller can still store it (par2 may repair it).
    """

    def __init__(self, needcrc: int, gotcrc: int, data: bytes):
        super().__init__()
        self.needcrc = needcrc
        self.gotcrc = gotcrc
        self.data = data


class BadYenc(Exception):
    """Raised for badly formed yEnc articles, also used to shortcut prechecks."""

    def __init__(self):
        super().__init__()


class Decoder:
    """Implement thread-like coordinator for the decoders"""

    def __init__(self):
        logging.debug("Initializing decoders")
        # Initialize queue and servers
        self.decoder_queue = queue.Queue()

        # Initialize decoders
        self.decoder_workers = []
        for i in range(cfg.num_decoders()):
            self.decoder_workers.append(DecoderWorker(self.decoder_queue))

    def start(self):
        """Start all decoder worker threads"""
        for decoder_worker in self.decoder_workers:
            decoder_worker.start()

    def is_alive(self) -> bool:
        """Return True only if every worker thread is still alive"""
        # Check all workers
        for decoder_worker in self.decoder_workers:
            if not decoder_worker.is_alive():
                return False
        return True

    def stop(self):
        """Signal all workers to shut down"""
        # Put multiple to stop all decoders
        for _ in self.decoder_workers:
            self.decoder_queue.put((None, None))

    def join(self):
        # Wait for all decoders to finish
        for decoder_worker in self.decoder_workers:
            try:
                decoder_worker.join()
            except Exception:
                # Thread.join can raise (e.g. joining a never-started thread);
                # shutdown should continue regardless. Never use a bare except,
                # which would also swallow SystemExit/KeyboardInterrupt.
                pass

    def process(self, article: Article, raw_data: List[bytes]):
        """Queue raw article data for decoding, reserving cache space up-front"""
        # We use reported article-size, just like sabyenc does
        sabnzbd.ArticleCache.reserve_space(article.bytes)
        self.decoder_queue.put((article, raw_data))

    def queue_full(self) -> bool:
        # Check if the queue size exceeds the limits
        return self.decoder_queue.qsize() >= sabnzbd.ArticleCache.decoder_cache_article_limit


class DecoderWorker(Thread):
    """The actual workhorse that handles decoding!"""

    def __init__(self, decoder_queue):
        super().__init__()
        logging.debug("Initializing decoder %s", self.name)

        self.decoder_queue: queue.Queue[Tuple[Optional[Article], Optional[List[bytes]]]] = decoder_queue

    def run(self):
        """Consume articles from the queue until a (None, None) sentinel arrives"""
        while True:
            # Set Article and NzbObject objects to None so references from this
            # thread do not keep the parent objects alive (see #1628)
            decoded_data = raw_data = article = nzo = None
            article, raw_data = self.decoder_queue.get()
            if not article:
                logging.info("Shutting down decoder %s", self.name)
                break

            nzo = article.nzf.nzo
            art_id = article.article

            # Free space in the decoder-queue
            sabnzbd.ArticleCache.free_reserved_space(article.bytes)

            # Keeping track
            article_success = False

            try:
                if nzo.precheck:
                    raise BadYenc

                if sabnzbd.LOG_ALL:
                    logging.debug("Decoding %s", art_id)

                decoded_data = decode(article, raw_data)
                article_success = True

            except MemoryError:
                logging.warning(T("Decoder failure: Out of memory"))
                logging.info("Decoder-Queue: %d", self.decoder_queue.qsize())
                logging.info("Cache: %d, %d, %d", *sabnzbd.ArticleCache.cache_info())
                logging.info("Traceback: ", exc_info=True)
                sabnzbd.Downloader.pause()

                # This article should be fetched again
                sabnzbd.NzbQueue.reset_try_lists(article)
                continue

            except CrcError as crc_error:
                # Lazy %-args: only formatted when this log level is enabled
                logging.info("CRC Error in %s", art_id)

                # Continue to the next one if we found new server
                if search_new_server(article):
                    continue

                # Store data, maybe par2 can still fix it
                decoded_data = crc_error.data

            except (BadYenc, ValueError):
                # Handles precheck and badly formed articles
                if nzo.precheck and raw_data and raw_data[0].startswith(b"223 "):
                    # STAT was used, so we only get a status code
                    article_success = True
                else:
                    # Examine headers (for precheck) or body (for download)
                    # Look for DMCA clues (while skipping "X-" headers)
                    # Detect potential UUencode
                    for line in raw_data:
                        lline = line.lower()
                        if b"message-id:" in lline:
                            article_success = True
                        if not lline.startswith(b"x-") and match_str(
                            lline, (b"dmca", b"removed", b"cancel", b"blocked")
                        ):
                            article_success = False
                            logging.info("Article removed from server (%s)", art_id)
                            break
                        if lline.find(b"\nbegin ") >= 0:
                            logme = T("UUencode detected, only yEnc encoding is supported [%s]") % nzo.final_name
                            logging.error(logme)
                            nzo.fail_msg = logme
                            sabnzbd.NzbQueue.end_job(nzo)
                            break

                # Pre-check, proper article found so just register
                if nzo.precheck and article_success and sabnzbd.LOG_ALL:
                    logging.debug("Server %s has article %s", article.fetcher, art_id)
                elif not article_success:
                    # If not pre-check, this must be a bad article
                    if not nzo.precheck:
                        logging.info("Badly formed yEnc article in %s", art_id, exc_info=True)

                    # Continue to the next one if we found new server
                    if search_new_server(article):
                        continue

            except Exception:
                # Catch-all so one broken article can never kill the worker
                # thread; narrowed from a bare except so SystemExit and
                # KeyboardInterrupt still propagate.
                logging.warning(T("Unknown Error while decoding %s"), art_id)
                logging.info("Traceback: ", exc_info=True)

                # Continue to the next one if we found new server
                if search_new_server(article):
                    continue

            if decoded_data:
                # If the data needs to be written to disk due to full cache, this will be slow
                # Causing the decoder-queue to fill up and delay the downloader
                sabnzbd.ArticleCache.save_article(article, decoded_data)

            sabnzbd.NzbQueue.register_article(article, article_success)


def decode(article: Article, raw_data: List[bytes]) -> bytes:
    """Decode raw yEnc chunks of one article and return the decoded bytes.

    Raises CrcError when the computed CRC does not match the expected one;
    the decoded data is attached to the exception for possible par2 repair.
    """
    # Let SABYenc do all the heavy lifting
    decoded_data, yenc_filename, crc, crc_expected, crc_correct = sabyenc3.decode_usenet_chunks(raw_data, article.bytes)

    # Mark as decoded
    article.decoded = True

    # Assume it is yenc
    article.nzf.type = "yenc"

    # Only set the name if it was found and not obfuscated
    if not article.nzf.filename_checked and yenc_filename:
        # Set the md5-of-16k if this is the first article
        if article.lowest_partnum:
            article.nzf.md5of16k = hashlib.md5(decoded_data[:16384]).digest()

        # Try the rename, even if it's not the first article
        # For example when the first article was missing
        article.nzf.nzo.verify_nzf_filename(article.nzf, yenc_filename)

    # CRC check
    if not crc_correct:
        raise CrcError(crc_expected, crc, decoded_data)

    return decoded_data


def search_new_server(article: Article) -> bool:
    """Shorthand for searching new server or else increasing bad_articles"""
    # Continue to the next one if we found new server
    if not article.search_new_server():
        # Increase bad articles if no new server was found
        article.nzf.nzo.increase_bad_articles_counter("bad_articles")
        return False
    return True