1#!/usr/bin/python3 -OO
2# Copyright 2007-2021 The SABnzbd-Team <team@sabnzbd.org>
3#
4# This program is free software; you can redistribute it and/or
5# modify it under the terms of the GNU General Public License
6# as published by the Free Software Foundation; either version 2
7# of the License, or (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12# GNU General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License
15# along with this program; if not, write to the Free Software
16# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
17
18"""
19tests.testhelper - Basic helper functions
20"""
21
22import os
23import time
24from http.client import RemoteDisconnected
25import pytest
26from random import choice, randint
27import requests
28from selenium.common.exceptions import WebDriverException
29from selenium.webdriver.common.keys import Keys
30from selenium.webdriver.support.ui import WebDriverWait
31from string import ascii_lowercase, digits
32from unittest import mock
33from urllib3.exceptions import ProtocolError
34import xmltodict
35
36import sabnzbd
37import sabnzbd.cfg as cfg
38from sabnzbd.constants import (
39    DB_HISTORY_NAME,
40    DEF_ADMIN_DIR,
41    DEFAULT_PRIORITY,
42    FORCE_PRIORITY,
43    HIGH_PRIORITY,
44    INTERFACE_PRIORITIES,
45    LOW_PRIORITY,
46    NORMAL_PRIORITY,
47    REPAIR_PRIORITY,
48    Status,
49)
50import sabnzbd.database as db
51from sabnzbd.misc import pp_to_opts
52
53import tests.sabnews
54
# Defaults used by get_url_result() and get_api_result() to reach SABnzbd
SAB_HOST = "127.0.0.1"
# Picked at random from 4200-4299 (both inclusive), once, when this module is imported
SAB_PORT = randint(4200, 4299)
SAB_APIKEY = "apikey"
# Directory that contains this file; the paths below are all derived from it
SAB_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
SAB_CACHE_DIR = os.path.join(SAB_BASE_DIR, "cache")
# create_nzb() resolves its nzb_dir argument relative to this directory
SAB_DATA_DIR = os.path.join(SAB_BASE_DIR, "data")
SAB_INCOMPLETE_DIR = os.path.join(SAB_CACHE_DIR, "Downloads", "incomplete")
SAB_COMPLETE_DIR = os.path.join(SAB_CACHE_DIR, "Downloads", "complete")
# NOTE(review): presumably the address the SABNews test server listens on - confirm in tests.sabnews
SAB_NEWSSERVER_HOST = "127.0.0.1"
SAB_NEWSSERVER_PORT = 8888
65
66
def set_config(settings_dict):
    """Change config-values on the fly, per test

    Args:
        settings_dict: Mapping of option name (an attribute of the ``cfg``
            module) to the value it should have while the decorated
            function runs.

    Returns:
        A decorator. The decorated function runs with the requested values
        set and its return value is passed through. Afterwards every listed
        option is set to its default (not to the value it had before the
        call), also when the decorated function raises.
    """
    import functools

    def set_config_decorator(func):
        # Keep name, docstring and signature of the test visible to tooling
        @functools.wraps(func)
        def wrapper_func(*args, **kwargs):
            try:
                # Setting up as requested
                for item, val in settings_dict.items():
                    getattr(cfg, item).set(val)

                # Perform test
                return func(*args, **kwargs)
            finally:
                # Reset values, also when the test failed, so that overrides
                # can never leak into the tests that run afterwards
                for item in settings_dict:
                    option = getattr(cfg, item)
                    option.set(option.default())

        return wrapper_func

    return set_config_decorator
87
88
def set_platform(platform):
    """Change the platform flags of sabnzbd on the fly, per test

    Args:
        platform: "win32", "darwin" or "linux". Any other value leaves
            ``sabnzbd.WIN32`` and ``sabnzbd.DARWIN`` as they are.

    Returns:
        A decorator. The decorated function runs with ``sabnzbd.WIN32`` and
        ``sabnzbd.DARWIN`` set for the requested platform and its return
        value is passed through. The previous flag values are put back
        afterwards, also when the decorated function raises.
    """
    import functools

    def set_platform_decorator(func):
        # Keep name, docstring and signature of the test visible to tooling
        @functools.wraps(func)
        def wrapper_func(*args, **kwargs):
            # Save original values
            is_windows = sabnzbd.WIN32
            is_darwin = sabnzbd.DARWIN

            try:
                # Set current platform
                if platform == "win32":
                    sabnzbd.WIN32 = True
                    sabnzbd.DARWIN = False
                elif platform == "darwin":
                    sabnzbd.WIN32 = False
                    sabnzbd.DARWIN = True
                elif platform == "linux":
                    sabnzbd.WIN32 = False
                    sabnzbd.DARWIN = False

                # Perform test
                return func(*args, **kwargs)
            finally:
                # Reset values, also when the test failed, so that the fake
                # platform can never leak into the tests that run afterwards
                sabnzbd.WIN32 = is_windows
                sabnzbd.DARWIN = is_darwin

        return wrapper_func

    return set_platform_decorator
121
122
def get_url_result(url="", host=SAB_HOST, port=SAB_PORT):
    """Request a web page (API key sent as query parameter) and return the response text"""
    page_address = "http://%s:%s/%s/" % (host, port, url)
    response = requests.get(page_address, params={"apikey": SAB_APIKEY})
    return response.text
127
128
def get_api_result(mode, host=SAB_HOST, port=SAB_PORT, extra_arguments=None):
    """Build request to the SABnzbd API and return the decoded response

    Args:
        mode: Value sent as the ``mode`` query parameter.
        host: Host to send the request to.
        port: Port to send the request to.
        extra_arguments: Optional mapping of additional query parameters.
            Its entries win over the built-in ``apikey``, ``output`` and
            ``mode`` values, so ``{"output": "xml"}`` switches the format.

    Returns:
        The response text when ``output`` is "text", the result of
        ``xmltodict.parse`` when it is "xml", and the decoded JSON otherwise.
    """
    arguments = {"apikey": SAB_APIKEY, "output": "json", "mode": mode}
    # None instead of a shared mutable {} as default; an explicit None is fine too
    if extra_arguments:
        arguments.update(extra_arguments)
    r = requests.get("http://%s:%s/api" % (host, port), params=arguments)
    if arguments["output"] == "text":
        return r.text
    elif arguments["output"] == "xml":
        return xmltodict.parse(r.text)
    return r.json()
139
140
def create_nzb(nzb_dir, metadata=None):
    """Let SABNews create an NZB for a directory located below SAB_DATA_DIR

    Returns whatever tests.sabnews.create_nzb() returns.
    """
    return tests.sabnews.create_nzb(nzb_dir=os.path.join(SAB_DATA_DIR, nzb_dir), metadata=metadata)
145
146
def create_and_read_nzb(nzbdir):
    """Create NZB, return data and delete file

    Args:
        nzbdir: Directory name that is handed to create_nzb().

    Returns:
        The content of the created NZB-file, read in text mode.
    """
    # Create NZB-file to import
    nzb_path = create_nzb(nzbdir)
    try:
        with open(nzb_path, "r") as nzb_data_fp:
            return nzb_data_fp.read()
    finally:
        # Remove the created NZB-file, also when reading it failed. The file
        # is already closed at this point because the with-block has exited.
        os.remove(nzb_path)
156
157
class FakeHistoryDB(db.HistoryDB):
    """
    History database for tests: the caller decides where the database lives
    (db_path argument) and can fill it with randomly generated entries.
    """

    # Pools the random job properties are drawn from
    category_options = ["catA", "catB", "1234", "يوزنت"]
    distro_names = ["Ubuntu", "デビアン", "Gentoo_Hobby_Edition", "Красная Шляпа"]
    status_options = [
        Status.COMPLETED,
        Status.EXTRACTING,
        Status.FAILED,
        Status.MOVING,
        Status.QUICK_CHECK,
        Status.REPAIRING,
        Status.RUNNING,
        Status.VERIFYING,
    ]

    def __init__(self, db_path):
        # The path is stored on the HistoryDB class itself, not on this instance
        db.HistoryDB.db_path = db_path
        super().__init__()

    def add_fake_history_jobs(self, number_of_entries=1):
        """Add the requested number of randomly generated jobs to the history db"""
        token_chars = ascii_lowercase + digits

        for _ in range(number_of_entries):
            job = mock.Mock()

            # Random ingredients for the names of the job
            distro = choice(self.distro_names)
            token = "".join([choice(token_chars) for _ in range(8)])
            password = choice(["secret", ""])
            # The password, if any, ends up in the filename between double braces
            password_suffix = "{{" + password + "}}" if password else ""

            # Everything build_history_info() needs as input
            job.password = password
            job.final_name = "%s.%s.Linux.ISO-Usenet" % (distro, token)
            job.filename = "%s.%s.Linux-Usenet%s.nzb" % (distro, token, password_suffix)
            job.cat = choice(self.category_options)
            job.script = "placeholder_script"
            job.url = "placeholder_url"

            # Two-step draw: either COMPLETED or one random pick from status_options
            random_status = choice(self.status_options)
            status = choice([Status.COMPLETED, random_status])
            job.status = status
            if status == Status.FAILED:
                job.fail_msg = "¡Fracaso absoluto!"
            else:
                job.fail_msg = ""

            job.nzo_id = "SABnzbd_nzo_%s" % "".join([choice(token_chars) for _ in range(8)])
            job.bytes_downloaded = randint(1024, 1024 ** 4)
            job.md5sum = "".join([choice("abcdef" + digits) for _ in range(32)])
            # for "pp"
            job.repair_opts = pp_to_opts(choice(list(db._PP_LOOKUP)))
            job.nzo_info = {"download_time": randint(1, 10 ** 4)}
            job.unpack_info = {"unpack_info": "placeholder unpack_info line\r\n" * 3}
            # for "report", only True when fetching an URL
            job.futuretype = False

            # Placeholder paths are located next to the database file
            history_dir = os.path.dirname(db.HistoryDB.db_path)
            job.download_path = os.path.join(history_dir, "placeholder_downpath")
            workdir = os.path.join(history_dir, "placeholder_workdir")

            # Let time.time() report a random moment in the past while the job
            # is added, so that the completion times are randomized
            fake_now = time.time() - randint(0, 10 ** 8)
            postproc_time = randint(1, 10 ** 3)
            with mock.patch("time.time", mock.Mock(return_value=fake_now)):
                self.add_history_db(
                    job,
                    storage=workdir,
                    postproc_time=postproc_time,
                    script_output="",
                    script_line="",
                )
221
222
@pytest.mark.usefixtures("run_sabnzbd", "run_sabnews_and_selenium")
class SABnzbdBaseTest:
    """Base class with helpers for browser-driven tests

    Requests the "run_sabnzbd" and "run_sabnews_and_selenium" fixtures.
    The methods use ``self.driver``, which is not assigned in this class.
    NOTE(review): presumably one of the fixtures sets it - confirm in conftest.
    """

    def no_page_crash(self):
        """Assert that the current page is not an error page"""
        # Do a base test if CherryPy did not report an error
        assert "500 Internal Server Error" not in self.driver.title

    def open_page(self, url):
        """Open a page and test for crash"""
        self.driver.get(url)
        self.no_page_crash()

    def scroll_to_top(self):
        """Send Ctrl+Home to the page body and wait 2 seconds"""
        self.driver.find_element_by_tag_name("body").send_keys(Keys.CONTROL + Keys.HOME)
        time.sleep(2)

    def wait_for_ajax(self):
        """Wait until jQuery reports no active requests and the document is complete

        Each of the two conditions is waited for with a timeout of 15 seconds.
        """
        # We catch common nonsense errors from Selenium
        try:
            wait = WebDriverWait(self.driver, 15)
            wait.until(lambda driver_wait: self.driver.execute_script("return jQuery.active") == 0)
            wait.until(lambda driver_wait: self.driver.execute_script("return document.readyState") == "complete")
        except (RemoteDisconnected, ProtocolError):
            pass

    @staticmethod
    def selenium_wrapper(func, *args):
        """Wrapper with retries for more stable Selenium

        Calls ``func(*args)`` up to 3 times, waiting 2 seconds after a failed
        attempt before trying again.

        Returns:
            The result of the first call that does not raise.

        Raises:
            WebDriverException: The error of the last attempt, when all
                attempts failed.
        """
        attempts = 3
        for attempt in range(1, attempts + 1):
            try:
                return func(*args)
            except WebDriverException:
                # Re-raise from inside the handler: the name bound by
                # "except ... as" no longer exists once the handler has ended
                if attempt == attempts:
                    raise
                # Try again in 2 seconds!
                time.sleep(2)
259