1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4import os
5import time
6import platform
7import shutil
8import pytest
9import funq
10from funq.client import ApplicationContext, ApplicationConfig
11from funq.errors import FunqError
12
13
# Directory containing this file
FUNQ_DIR = os.path.split(__file__)[0]
# Parent directory of FUNQ_DIR
TESTS_DIR = os.path.split(FUNQ_DIR)[0]
# Parent directory of TESTS_DIR
REPO_DIR = os.path.split(TESTS_DIR)[0]
# Location of the test data (workspaces, projects, server files, ...)
DATA_DIR = os.path.join(TESTS_DIR, 'data')
18
19
def pytest_addoption(parser):
    """
    Register the command line option used to locate the LibrePCB executable
    """
    option_settings = {
        "action": "store",
        "help": "Path to librepcb executable to test",
    }
    parser.addoption("--librepcb-executable", **option_settings)
24
25
class GlobalOptions:
    """
    Plain attribute container passed as ``global_options`` to funq's
    ApplicationConfig.
    """

    def __init__(self):
        self.funq_conf = 'funq.conf'
        self.funq_attach_exe = funq.tools.which('funq')
        self.funq_gkit = 'default'
        # The gkit aliases file is looked up next to the funq client module
        client_module_path = os.path.realpath(funq.client.__file__)
        client_module_dir = os.path.dirname(client_module_path)
        self.funq_gkit_file = os.path.join(client_module_dir, 'aliases-gkits.conf')
32
33
class Application(object):
    """
    Context manager around a funq ApplicationContext for one executable

    Entering the context returns the ``funq`` attribute of the underlying
    ApplicationContext; leaving it removes this object's reference to that
    context.
    """

    def __init__(self, executable, env=None, args=()):
        super(Application, self).__init__()
        aliases_file = os.path.join(FUNQ_DIR, 'aliases')
        config = ApplicationConfig(
            executable=executable,
            args=args,
            cwd=os.getcwd(),
            env=env,
            aliases=aliases_file,
            global_options=GlobalOptions(),
        )
        self._context = ApplicationContext(config)

    def __enter__(self):
        context = self._context
        return context.funq

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Drop our reference to the context.
        # NOTE(review): presumably this lets the context's finalizer stop the
        # application - confirm against the funq implementation.
        delattr(self, '_context')
46
47
class LibrePcbFixture(object):
    """
    Prepares an isolated environment (workspace, project, configuration) in a
    temporary directory and opens the LibrePCB executable under test in it
    """

    def __init__(self, config, tmpdir):
        """
        :param config: pytest config object, queried for '--librepcb-executable'
        :param tmpdir: path (string) of the temporary directory to work in
        :raises Exception: if the executable option was not passed or the
                           given executable does not exist
        """
        super(LibrePcbFixture, self).__init__()
        executable = config.getoption('--librepcb-executable')
        if executable is None:
            # Without this check, os.path.abspath(None) would fail with a
            # TypeError which does not tell the user what to do.
            raise Exception("No executable specified. Please pass it with "
                            "'--librepcb-executable'.")
        self.executable = os.path.abspath(executable)
        if not os.path.exists(self.executable):
            raise Exception("Executable '{}' not found. Please pass it with "
                            "'--librepcb-executable'.".format(self.executable))
        self.tmpdir = tmpdir
        # Init members to default values
        self.workspace_path = os.path.join(self.tmpdir, 'Empty Workspace')
        self.project_path = None
        # Copy test data to temporary directory to avoid modifications in
        # original data. Several instances may be created for the same
        # temporary directory, so only copy if it was not already done
        # (copytree fails if the destination exists).
        if not os.path.exists(self.workspace_path):
            shutil.copytree(os.path.join(DATA_DIR, 'workspaces', 'Empty Workspace'),
                            self.workspace_path)

    def abspath(self, relpath):
        """
        Return the given path joined onto the temporary directory
        """
        return os.path.join(self.tmpdir, relpath)

    def set_workspace(self, path):
        """
        Set the workspace to use; relative paths refer to the temporary directory
        """
        if not os.path.isabs(path):
            path = self.abspath(path)
        self.workspace_path = path

    def add_project(self, project, as_lppz=False):
        """
        Copy a project from the test data into the temporary directory

        :param project: name of the directory below 'projects' in the test data
        :param as_lppz: if True, store the project as a zip archive named
                        '<project>.lppz' instead of copying the directory
        """
        src = os.path.join(DATA_DIR, 'projects', project)
        dst = os.path.join(self.tmpdir, project)
        if as_lppz:
            shutil.make_archive(dst, 'zip', src)
            shutil.move(dst + '.zip', dst + '.lppz')
        else:
            shutil.copytree(src, dst)

    def set_project(self, path):
        """
        Set the project passed as command line argument when opening the
        application; relative paths refer to the temporary directory
        """
        if not os.path.isabs(path):
            path = self.abspath(path)
        self.project_path = path

    def get_workspace_libraries_path(self, subdir=''):
        """
        Return the path 'v0.1/libraries/<subdir>' inside the current workspace
        """
        return os.path.join(self.workspace_path, 'v0.1', 'libraries', subdir)

    def add_local_library_to_workspace(self, path):
        """
        Copy a library directory into the 'local' libraries directory of the
        workspace; relative paths refer to the test data directory
        """
        if not os.path.isabs(path):
            path = os.path.join(DATA_DIR, path)
        dest = self.get_workspace_libraries_path('local')
        dest = os.path.join(dest, os.path.basename(path))
        shutil.copytree(path, dest)

    def open(self):
        """
        Create the configuration file (if needed) and return an Application
        object for the executable, to be used as context manager
        """
        self._create_application_config_file()
        return Application(self.executable, env=self._env(), args=self._args())

    def _create_application_config_file(self):
        """
        Write 'LibrePCB.ini' into the temporary configuration directory,
        pointing to the current workspace (if one is set)
        """
        org_dir = 'LibrePCB.org' if platform.system() == 'Darwin' else 'LibrePCB'
        config_dir = os.path.join(self.tmpdir, 'config', org_dir)
        config_ini = os.path.join(config_dir, 'LibrePCB.ini')
        if not os.path.exists(config_dir):
            os.makedirs(config_dir)
        # Only create config file once per test, so tests can check if settings
        # are stored permanently.
        if not os.path.exists(config_ini):
            with open(config_ini, 'w') as f:
                if self.workspace_path:
                    f.write("[workspaces]\n")
                    f.write("most_recently_used=\"{}\"\n".format(self.workspace_path.replace('\\', '/')))

    def _args(self):
        """
        Return the list of command line arguments for the application
        """
        args = []
        if self.project_path:
            args.append(self.project_path)
        return args

    def _env(self):
        """
        Return the environment variables for the application as a new dict

        The dict is a copy of the current environment with some overrides, so
        the environment of the test process itself is left untouched.
        """
        env = os.environ.copy()
        # Make GUI independent from the system's language
        env['LC_ALL'] = 'C'
        # Override configuration location to make tests independent of existing configs
        env['LIBREPCB_CONFIG_DIR'] = os.path.join(self.tmpdir, 'config')
        # Use a neutral username
        env['USERNAME'] = 'testuser'
        # Force LibrePCB to use Qt-style file dialogs because native dialogs don't work
        env['LIBREPCB_DISABLE_NATIVE_DIALOGS'] = '1'
        return env
130
131
class Helpers(object):
    """
    Collection of polling helpers for GUI tests

    All helpers poll up to 100 times and sleep ``timeout / 100`` seconds
    between two polls.
    """

    @staticmethod
    def wait_for_model_items_count(widget, min_count, max_count=None, timeout=5.0):
        """
        Wait until the model of the widget contains between `min_count` and
        `max_count` items (no upper limit if `max_count` is None)

        :raises Exception: if the item count is still out of range after all polls
        """
        if max_count == 0:
            # First wait a bit to be sure the model is really not populated
            time.sleep(0.1)
        count = None
        for _attempt in range(100):
            count = len(widget.model().items().items)
            enough = min_count <= count
            not_too_many = max_count is None or count <= max_count
            if enough and not_too_many:
                return
            time.sleep(timeout / 100.0)
        object_name = widget.properties().get('objectName')
        raise Exception('Widget "{}" has {} items instead of [{}..{}]!'.format(
            object_name, count, min_count, max_count))

    @staticmethod
    def wait_for_library_scan_complete(app, timeout=10.0):
        """
        Wait until the progress bar in the control panel status bar reported
        more than 10% and got hidden afterwards
        """
        progress_bar = app.widget('controlPanelStatusBarProgressBar', wait_active=False)
        # Wait until scan has started (progress > 10%); continue anyway if
        # this was not observed within the polling period
        for _attempt in range(100):
            if progress_bar.properties()['value'] > 10:
                break
            time.sleep(timeout / 100.0)
        # Wait until scan has finished (progressbar hidden)
        Helpers.wait_until_widget_hidden(progress_bar, timeout=timeout)

    @staticmethod
    def wait_until_widget_hidden(widget, timeout=5.0):
        """
        Wait until the 'visible' property of the widget is False, or the widget
        is no longer registered

        :raises Exception: if the widget is still visible after all polls
        """
        for _attempt in range(100):
            try:
                visible = widget.properties()['visible']
            except FunqError as error:
                if error.classname != 'NotRegisteredObject':
                    raise
                # The widget does not exist anymore, thus it is not visible
                return
            if visible is False:
                return
            time.sleep(timeout / 100.0)
        object_name = widget.properties().get('objectName')
        raise Exception('Widget "{}" is still visible!'.format(object_name))

    @staticmethod
    def wait_for_active_window(funq, widget, timeout=5.0):
        """
        Wait until the given widget is the active widget of type 'window'
        """
        Helpers._wait_for_active_widget(funq, widget, timeout, 'window')

    @staticmethod
    def wait_for_active_dialog(funq, widget, timeout=5.0):
        """
        Wait until the given widget is the active widget of type 'modal'
        """
        Helpers._wait_for_active_widget(funq, widget, timeout, 'modal')

    @staticmethod
    def _wait_for_active_widget(funq, widget, timeout, widget_type):
        """
        Poll the active widget of the given type until its oid equals the oid
        of the given widget

        :raises Exception: if another (or no) widget is active after all polls
        """
        active_widget = None
        for _attempt in range(100):
            active_widget = funq.active_widget(widget_type=widget_type)
            if active_widget is not None and active_widget.oid == widget.oid:
                return
            time.sleep(timeout / 100.0)
        if active_widget:
            properties = active_widget.properties()
        else:
            properties = dict()
        title = properties.get('windowTitle')
        object_name = properties.get('objectName')
        raise Exception('Active widget is "{}" ({})!'.format(title, object_name))
192
193
@pytest.fixture(scope="session")
def librepcb_server():
    """
    Fixture which provides a HTTP server at localhost:50080

    All tests should use this server instead of the official LibrePCB API server
    or GitHub for downloading libraries. The served files are located in the
    'server' subdirectory of the test data directory.

    The server runs in a daemon thread during the whole test session. It is
    shut down and its socket is closed when the session ends.
    """
    import time
    import threading
    import socket
    import socketserver
    import http.server

    class Handler(http.server.SimpleHTTPRequestHandler, object):
        def translate_path(self, path):
            # Take the path resolved by the base class relative to the
            # current directory and re-root it into the test data directory
            path = super(Handler, self).translate_path(path)
            relpath = os.path.relpath(path, os.curdir)
            return os.path.join(DATA_DIR, 'server', relpath)

    httpd = socketserver.TCPServer(("", 50080), Handler, bind_and_activate=False)
    try:
        # Set SO_REUSEADDR option to avoid "port already in use" errors
        httpd.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        httpd.server_bind()
        httpd.server_activate()
    except Exception:
        # Do not leak the socket if binding/listening failed
        httpd.server_close()
        raise
    thread = threading.Thread(target=httpd.serve_forever)
    thread.daemon = True
    thread.start()
    time.sleep(0.2)  # wait a bit to make sure the server is ready
    try:
        yield
    finally:
        # shutdown() blocks until serve_forever() has returned; it is called
        # from this thread, not from the serving thread
        httpd.shutdown()
        httpd.server_close()
        thread.join()
223
224
@pytest.fixture
def create_librepcb(request, tmpdir, librepcb_server):
    """
    Fixture providing a factory to create multiple application instances

    Every call of the returned function creates a new LibrePcbFixture for the
    temporary directory of the current test. The `librepcb_server` fixture is
    requested here but not used in the body.
    """
    def _new_instance():
        pytest_config = request.config
        working_dir = str(tmpdir)
        return LibrePcbFixture(pytest_config, working_dir)

    return _new_instance
233
234
@pytest.fixture
def librepcb(create_librepcb):
    """
    Fixture providing exactly one application instance, created with the
    factory from `create_librepcb`
    """
    instance = create_librepcb()
    yield instance
241
242
@pytest.fixture(scope="session")
def helpers():
    """
    Fixture providing a Helpers object with helper functions for the tests
    """
    helpers_object = Helpers()
    return helpers_object
249