1#!/usr/bin/python3 -OO 2# Copyright 2007-2021 The SABnzbd-Team <team@sabnzbd.org> 3# 4# This program is free software; you can redistribute it and/or 5# modify it under the terms of the GNU General Public License 6# as published by the Free Software Foundation; either version 2 7# of the License, or (at your option) any later version. 8# 9# This program is distributed in the hope that it will be useful, 10# but WITHOUT ANY WARRANTY; without even the implied warranty of 11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12# GNU General Public License for more details. 13# 14# You should have received a copy of the GNU General Public License 15# along with this program; if not, write to the Free Software 16# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 17 18""" 19tests.testhelper - Basic helper functions 20""" 21 22import os 23import time 24from http.client import RemoteDisconnected 25import pytest 26from random import choice, randint 27import requests 28from selenium.common.exceptions import WebDriverException 29from selenium.webdriver.common.keys import Keys 30from selenium.webdriver.support.ui import WebDriverWait 31from string import ascii_lowercase, digits 32from unittest import mock 33from urllib3.exceptions import ProtocolError 34import xmltodict 35 36import sabnzbd 37import sabnzbd.cfg as cfg 38from sabnzbd.constants import ( 39 DB_HISTORY_NAME, 40 DEF_ADMIN_DIR, 41 DEFAULT_PRIORITY, 42 FORCE_PRIORITY, 43 HIGH_PRIORITY, 44 INTERFACE_PRIORITIES, 45 LOW_PRIORITY, 46 NORMAL_PRIORITY, 47 REPAIR_PRIORITY, 48 Status, 49) 50import sabnzbd.database as db 51from sabnzbd.misc import pp_to_opts 52 53import tests.sabnews 54 55SAB_HOST = "127.0.0.1" 56SAB_PORT = randint(4200, 4299) 57SAB_APIKEY = "apikey" 58SAB_BASE_DIR = os.path.dirname(os.path.abspath(__file__)) 59SAB_CACHE_DIR = os.path.join(SAB_BASE_DIR, "cache") 60SAB_DATA_DIR = os.path.join(SAB_BASE_DIR, "data") 61SAB_INCOMPLETE_DIR = os.path.join(SAB_CACHE_DIR, "Downloads", "incomplete") 62SAB_COMPLETE_DIR = os.path.join(SAB_CACHE_DIR, "Downloads", "complete") 63SAB_NEWSSERVER_HOST = "127.0.0.1" 64SAB_NEWSSERVER_PORT = 8888 65 66 67def set_config(settings_dict): 68 """Change config-values on the fly, per test""" 69 70 def set_config_decorator(func): 71 def wrapper_func(*args, **kwargs): 72 # Setting up as requested 73 for item, val in settings_dict.items(): 74 getattr(cfg, item).set(val) 75 76 # Perform test 77 value = func(*args, **kwargs) 78 79 # Reset values 80 for item in settings_dict: 81 getattr(cfg, item).set(getattr(cfg, item).default()) 82 return value 83 84 return wrapper_func 85 86 return set_config_decorator 87 88 89def set_platform(platform): 90 """Change config-values on the fly, per test""" 91 92 def set_platform_decorator(func): 93 def wrapper_func(*args, **kwargs): 94 # Save original values 95 is_windows = sabnzbd.WIN32 96 is_darwin = sabnzbd.DARWIN 97 98 # Set current platform 99 if platform == "win32": 100 sabnzbd.WIN32 = True 101 sabnzbd.DARWIN = False 102 elif platform == "darwin": 103 sabnzbd.WIN32 = False 104 sabnzbd.DARWIN = True 105 elif platform == "linux": 106 sabnzbd.WIN32 = False 107 sabnzbd.DARWIN = False 108 109 # Perform test 110 value = func(*args, **kwargs) 111 112 # Reset values 113 sabnzbd.WIN32 = is_windows 114 sabnzbd.DARWIN = is_darwin 115 116 return value 117 118 return wrapper_func 119 120 return set_platform_decorator 121 122 123def get_url_result(url="", host=SAB_HOST, port=SAB_PORT): 124 """Do basic request to web page""" 125 arguments = {"apikey": SAB_APIKEY} 126 return requests.get("http://%s:%s/%s/" % (host, port, url), params=arguments).text 127 128 129def get_api_result(mode, host=SAB_HOST, port=SAB_PORT, extra_arguments={}): 130 """Build JSON request to SABnzbd""" 131 arguments = {"apikey": SAB_APIKEY, "output": "json", "mode": mode} 132 arguments.update(extra_arguments) 133 r = requests.get("http://%s:%s/api" % (host, port), params=arguments) 134 if arguments["output"] == "text": 135 return r.text 136 elif arguments["output"] == "xml": 137 return xmltodict.parse(r.text) 138 return r.json() 139 140 141def create_nzb(nzb_dir, metadata=None): 142 """Create NZB from directory using SABNews""" 143 nzb_dir_full = os.path.join(SAB_DATA_DIR, nzb_dir) 144 return tests.sabnews.create_nzb(nzb_dir=nzb_dir_full, metadata=metadata) 145 146 147def create_and_read_nzb(nzbdir): 148 """Create NZB, return data and delete file""" 149 # Create NZB-file to import 150 nzb_path = create_nzb(nzbdir) 151 with open(nzb_path, "r") as nzb_data_fp: 152 nzb_data = nzb_data_fp.read() 153 # Remove the created NZB-file 154 os.remove(nzb_path) 155 return nzb_data 156 157 158class FakeHistoryDB(db.HistoryDB): 159 """ 160 HistoryDB class with added control of the db_path via an argument and the 161 capability to generate history entries. 162 """ 163 164 category_options = ["catA", "catB", "1234", "يوزنت"] 165 distro_names = ["Ubuntu", "デビアン", "Gentoo_Hobby_Edition", "Красная Шляпа"] 166 status_options = [ 167 Status.COMPLETED, 168 Status.EXTRACTING, 169 Status.FAILED, 170 Status.MOVING, 171 Status.QUICK_CHECK, 172 Status.REPAIRING, 173 Status.RUNNING, 174 Status.VERIFYING, 175 ] 176 177 def __init__(self, db_path): 178 db.HistoryDB.db_path = db_path 179 super().__init__() 180 181 def add_fake_history_jobs(self, number_of_entries=1): 182 """Generate a history db with any number of fake entries""" 183 184 for _ in range(0, number_of_entries): 185 nzo = mock.Mock() 186 187 # Mock all input build_history_info() needs 188 distro_choice = choice(self.distro_names) 189 distro_random = "".join(choice(ascii_lowercase + digits) for i in range(8)) 190 nzo.password = choice(["secret", ""]) 191 nzo.final_name = "%s.%s.Linux.ISO-Usenet" % (distro_choice, distro_random) 192 nzo.filename = "%s.%s.Linux-Usenet%s.nzb" % ( 193 (distro_choice, distro_random, "{{" + nzo.password + "}}") 194 if nzo.password 195 else (distro_choice, distro_random, "") 196 ) 197 nzo.cat = choice(self.category_options) 198 nzo.script = "placeholder_script" 199 nzo.url = "placeholder_url" 200 nzo.status = choice([Status.COMPLETED, choice(self.status_options)]) 201 nzo.fail_msg = "¡Fracaso absoluto!" if nzo.status == Status.FAILED else "" 202 nzo.nzo_id = "SABnzbd_nzo_%s" % ("".join(choice(ascii_lowercase + digits) for i in range(8))) 203 nzo.bytes_downloaded = randint(1024, 1024 ** 4) 204 nzo.md5sum = "".join(choice("abcdef" + digits) for i in range(32)) 205 nzo.repair_opts = pp_to_opts(choice(list(db._PP_LOOKUP.keys()))) # for "pp" 206 nzo.nzo_info = {"download_time": randint(1, 10 ** 4)} 207 nzo.unpack_info = {"unpack_info": "placeholder unpack_info line\r\n" * 3} 208 nzo.futuretype = False # for "report", only True when fetching an URL 209 nzo.download_path = os.path.join(os.path.dirname(db.HistoryDB.db_path), "placeholder_downpath") 210 211 # Mock time when calling add_history_db() to randomize completion times 212 almost_time = mock.Mock(return_value=time.time() - randint(0, 10 ** 8)) 213 with mock.patch("time.time", almost_time): 214 self.add_history_db( 215 nzo, 216 storage=os.path.join(os.path.dirname(db.HistoryDB.db_path), "placeholder_workdir"), 217 postproc_time=randint(1, 10 ** 3), 218 script_output="", 219 script_line="", 220 ) 221 222 223@pytest.mark.usefixtures("run_sabnzbd", "run_sabnews_and_selenium") 224class SABnzbdBaseTest: 225 def no_page_crash(self): 226 # Do a base test if CherryPy did not report test 227 assert "500 Internal Server Error" not in self.driver.title 228 229 def open_page(self, url): 230 # Open a page and test for crash 231 self.driver.get(url) 232 self.no_page_crash() 233 234 def scroll_to_top(self): 235 self.driver.find_element_by_tag_name("body").send_keys(Keys.CONTROL + Keys.HOME) 236 time.sleep(2) 237 238 def wait_for_ajax(self): 239 # We catch common nonsense errors from Selenium 240 try: 241 wait = WebDriverWait(self.driver, 15) 242 wait.until(lambda driver_wait: self.driver.execute_script("return jQuery.active") == 0) 243 wait.until(lambda driver_wait: self.driver.execute_script("return document.readyState") == "complete") 244 except (RemoteDisconnected, ProtocolError): 245 pass 246 247 @staticmethod 248 def selenium_wrapper(func, *args): 249 """Wrapper with retries for more stable Selenium""" 250 for i in range(3): 251 try: 252 return func(*args) 253 except WebDriverException as e: 254 # Try again in 2 seconds! 255 time.sleep(2) 256 pass 257 else: 258 raise e 259