1'''unittest.TestCase convenience methods for DBusMocks'''
2
3# This program is free software; you can redistribute it and/or modify it under
4# the terms of the GNU Lesser General Public License as published by the Free
5# Software Foundation; either version 3 of the License, or (at your option) any
6# later version.  See http://www.gnu.org/copyleft/lgpl.html for the full text
7# of the license.
8
9__author__ = 'Martin Pitt'
10__copyright__ = '(c) 2012 Canonical Ltd.'
11
import errno
import os
import signal
import subprocess
import sys
import tempfile
import time
import unittest
from typing import Any, Dict, Optional, Tuple

import dbus

from dbusmock.mockobject import MOCK_IFACE, OBJECT_MANAGER_IFACE, load_module
25
26
class DBusTestCase(unittest.TestCase):
    '''Base class for D-Bus mock tests.

    This provides some convenience API to start/stop local D-Buses, so that you
    can run a private local session and/or system bus to run mocks on.

    This also provides a spawn_server() class method to run the D-Bus mock
    server in a separate process.
    '''
    # PIDs of the private buses started by start_session_bus() and
    # start_system_bus(). They are always stored on DBusTestCase itself (not
    # on the subclass), so every subclass sees the same values.
    session_bus_pid: Optional[int] = None
    system_bus_pid: Optional[int] = None

    @classmethod
    def start_session_bus(cls) -> None:
        '''Set up a private local session bus

        The bus address is exported as $DBUS_SESSION_BUS_ADDRESS.

        This gets stopped automatically in tearDownClass().
        '''
        (DBusTestCase.session_bus_pid, addr) = cls.start_dbus()
        os.environ['DBUS_SESSION_BUS_ADDRESS'] = addr

    @classmethod
    def start_system_bus(cls) -> None:
        '''Set up a private local system bus

        The bus address is exported as $DBUS_SYSTEM_BUS_ADDRESS.

        This gets stopped automatically in tearDownClass().
        '''
        # create a temporary configuration which makes the fake bus actually
        # appear as type "system"
        with tempfile.NamedTemporaryFile(prefix='dbusmock_cfg') as c:
            c.write(b'''<!DOCTYPE busconfig PUBLIC "-//freedesktop//DTD D-Bus Bus Configuration 1.0//EN"
 "http://www.freedesktop.org/standards/dbus/1.0/busconfig.dtd">
<busconfig>
  <type>system</type>
  <keep_umask/>
  <listen>unix:tmpdir=/tmp</listen>
  <!-- We do not add standard_system_servicedirs (i.e. we have *no* service directory). -->

  <policy context="default">
    <allow send_destination="*" eavesdrop="true"/>
    <allow eavesdrop="true"/>
    <allow own="*"/>
  </policy>
</busconfig>
''')
            # make sure the daemon sees the complete file
            c.flush()
            (DBusTestCase.system_bus_pid, addr) = cls.start_dbus(conf=c.name)
        os.environ['DBUS_SYSTEM_BUS_ADDRESS'] = addr

    @classmethod
    def tearDownClass(cls) -> None:
        '''Stop private session/system buses

        For every bus that was started, stop the daemon, remove the
        corresponding address variable from the environment and reset the
        stored PID.
        '''
        if DBusTestCase.session_bus_pid is not None:
            cls.stop_dbus(DBusTestCase.session_bus_pid)
            # pop() instead of del: a test may already have removed the
            # variable, which must not abort the remaining cleanup
            os.environ.pop('DBUS_SESSION_BUS_ADDRESS', None)
            DBusTestCase.session_bus_pid = None
        if DBusTestCase.system_bus_pid is not None:
            cls.stop_dbus(DBusTestCase.system_bus_pid)
            os.environ.pop('DBUS_SYSTEM_BUS_ADDRESS', None)
            DBusTestCase.system_bus_pid = None

    @classmethod
    def start_dbus(cls, conf: Optional[str] = None) -> Tuple[int, str]:
        '''Start a D-Bus daemon

        If conf is given, it is passed as --config-file; otherwise the daemon
        is started with --session.

        Return (pid, address) pair. Raise AssertionError if dbus-daemon does
        not print exactly two lines, and subprocess.CalledProcessError if it
        exits with a non-zero status.

        Normally you do not need to call this directly. Use start_system_bus()
        and start_session_bus() instead.
        '''
        argv = ['dbus-daemon', '--fork', '--print-address=1', '--print-pid=1']
        if conf:
            argv.append('--config-file=' + conf)
        else:
            argv.append('--session')
        lines = subprocess.check_output(argv, universal_newlines=True).strip().splitlines()
        # raised explicitly (instead of an assert statement) so that the check
        # also happens when running with "python -O"
        if len(lines) != 2:
            raise AssertionError('expected exactly 2 lines of output from dbus-daemon')
        # usually the first line is the address, but be lenient and accept any order
        try:
            return (int(lines[1]), lines[0])
        except ValueError:
            return (int(lines[0]), lines[1])

    @classmethod
    def stop_dbus(cls, pid: int) -> None:
        '''Stop a D-Bus daemon

        Send SIGTERM to the process every 0.1 s (at most 50 times) until it
        does not exist any more; if it is still there after that, send
        SIGKILL. While this runs, SIGTERM is ignored in the calling process;
        the previously installed SIGTERM handler is restored afterwards, also
        if an error occurs.

        Normally you do not need to call this directly. When you use
        start_system_bus() and start_session_bus(), these buses are
        automatically stopped in tearDownClass().
        '''
        previous_handler = signal.signal(signal.SIGTERM, signal.SIG_IGN)
        try:
            for _ in range(50):
                try:
                    os.kill(pid, signal.SIGTERM)
                except OSError as e:
                    if e.errno == errno.ESRCH:
                        # process is gone, we are done
                        break
                    raise
                time.sleep(0.1)
            else:
                sys.stderr.write('ERROR: timed out waiting for bus process to terminate\n')
                try:
                    os.kill(pid, signal.SIGKILL)
                except ProcessLookupError:
                    # it exited between the last SIGTERM and now
                    pass
                else:
                    time.sleep(0.5)
        finally:
            # signal.signal() returns None if the previous handler was not
            # installed from Python; fall back to the default action then
            if previous_handler is None:
                previous_handler = signal.SIG_DFL
            signal.signal(signal.SIGTERM, previous_handler)

    @classmethod
    def get_dbus(cls, system_bus: bool = False) -> 'dbus.bus.BusConnection':
        '''Get dbus.bus.BusConnection() object

        If the bus address variable for the requested bus
        ($DBUS_SYSTEM_BUS_ADDRESS or $DBUS_SESSION_BUS_ADDRESS) is set, connect
        to that address; otherwise fall back to dbus.SystemBus() or
        dbus.SessionBus().

        This is preferable to dbus.SystemBus() and dbus.SessionBus() as those
        do not get along with multiple changing local test buses.
        '''
        if system_bus:
            if os.environ.get('DBUS_SYSTEM_BUS_ADDRESS'):
                return dbus.bus.BusConnection(os.environ['DBUS_SYSTEM_BUS_ADDRESS'])
            return dbus.SystemBus()

        if os.environ.get('DBUS_SESSION_BUS_ADDRESS'):
            return dbus.bus.BusConnection(os.environ['DBUS_SESSION_BUS_ADDRESS'])
        return dbus.SessionBus()

    @classmethod
    def wait_for_bus_object(cls, dest: str, path: str, system_bus: bool = False, timeout: int = 600) -> None:
        '''Wait for an object to appear on D-Bus

        Raise an AssertionError if object does not appear within one minute.
        You can change the timeout with the "timeout" keyword argument which
        specifies deciseconds.

        An object which answers Introspect() with an ".UnknownInterface" error
        counts as present.
        '''
        bus = cls.get_dbus(system_bus)

        last_exc = None
        # we check whether the name is owned first, to avoid race conditions
        # with service activation; once it's owned, wait until we can actually
        # call methods
        while timeout > 0:
            if bus.name_has_owner(dest):
                try:
                    p = dbus.Interface(bus.get_object(dest, path),
                                       dbus_interface=dbus.INTROSPECTABLE_IFACE)
                    p.Introspect()
                    return
                except dbus.exceptions.DBusException as e:
                    last_exc = e
                    if '.UnknownInterface' in str(e):
                        return

            timeout -= 1
            time.sleep(0.1)

        # raised explicitly (instead of an assert statement) so that a timeout
        # is also reported when running with "python -O"
        raise AssertionError(f'timed out waiting for D-Bus object {path}: {last_exc}')

    @staticmethod
    def _terminate_daemon(daemon: subprocess.Popen) -> None:
        '''Terminate and reap a spawned mock daemon after a failed start-up'''
        daemon.terminate()
        try:
            daemon.wait(timeout=5)
        except subprocess.TimeoutExpired:
            daemon.kill()
            daemon.wait()
        if daemon.stdout is not None:
            # only set when the caller asked for a pipe
            daemon.stdout.close()

    @classmethod
    def spawn_server(cls, name: str, path: str, interface: str, system_bus: bool = False,
                     stdout: Optional[int] = None) -> subprocess.Popen:
        '''Run a DBusMockObject instance in a separate process

        The daemon will terminate automatically when the D-Bus that it connects
        to goes down.  If that does not happen (e. g. you test on the actual
        system/session bus), you need to kill it manually.

        This function blocks until the spawned DBusMockObject is ready and
        listening on the bus. If that does not happen in time, the spawned
        process is terminated again and the error is re-raised.

        stdout is passed on to subprocess.Popen().

        Returns the Popen object of the spawned daemon.
        '''
        argv = [sys.executable, '-m', 'dbusmock']
        if system_bus:
            argv.append('--system')
        argv.append(name)
        argv.append(path)
        argv.append(interface)

        # pylint: disable=consider-using-with
        daemon = subprocess.Popen(argv, stdout=stdout)

        # wait for daemon to start up; the caller never gets to see the Popen
        # object on failure, so clean up the process here
        try:
            cls.wait_for_bus_object(name, path, system_bus)
        except BaseException:
            cls._terminate_daemon(daemon)
            raise

        return daemon

    @classmethod
    def spawn_server_template(cls, template: str, parameters: Optional[Dict[str, Any]] = None,
                              stdout: Optional[int] = None,
                              system_bus: Optional[bool] = None
                              ) -> 'Tuple[subprocess.Popen, dbus.proxies.ProxyObject]':
        '''Run a D-Bus mock template instance in a separate process

        This starts a D-Bus mock process and loads the given template with
        (optional) parameters into it. For details about templates see
        dbusmock.DBusMockObject.AddTemplate().

        Usually a template should specify SYSTEM_BUS = False/True to select whether it
        gets loaded on the session or system bus. This can be overridden with the system_bus
        parameter. For templates which don't set SYSTEM_BUS, this parameter has to be set
        (otherwise this raises an AttributeError).

        The daemon will terminate automatically when the D-Bus that it connects
        to goes down.  If that does not happen (e. g. you test on the actual
        system/session bus), you need to kill it manually.

        This function blocks until the spawned DBusMockObject is ready and
        listening on the bus. If loading the template into the spawned daemon
        fails, the daemon is terminated again and the error is re-raised.

        Returns a pair (daemon Popen object, main dbus object).
        '''
        # we need the bus address from the template module
        module = load_module(template)

        is_object_manager = getattr(module, 'IS_OBJECT_MANAGER', False)

        if is_object_manager and not hasattr(module, 'MAIN_IFACE'):
            interface_name = OBJECT_MANAGER_IFACE
        else:
            interface_name = module.MAIN_IFACE

        if system_bus is None:
            system_bus = module.SYSTEM_BUS

        daemon = cls.spawn_server(module.BUS_NAME, module.MAIN_OBJ,
                                  interface_name, system_bus, stdout)

        try:
            bus = cls.get_dbus(system_bus)
            obj = bus.get_object(module.BUS_NAME, module.MAIN_OBJ)
            if not parameters:
                parameters = dbus.Dictionary({}, signature='sv')
            obj.AddTemplate(template, parameters,
                            dbus_interface=MOCK_IFACE)
        except BaseException:
            # do not leave a half-initialized mock process behind
            cls._terminate_daemon(daemon)
            raise

        return (daemon, obj)
257