'''unittest.TestCase convenience methods for DBusMocks'''

# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option) any
# later version.  See http://www.gnu.org/copyleft/lgpl.html for the full text
# of the license.

__author__ = 'Martin Pitt'
__copyright__ = '(c) 2012 Canonical Ltd.'

import errno
import os
import signal
import subprocess
import sys
import tempfile
import time
import unittest
from typing import Tuple, Dict, Any, Optional

import dbus

from dbusmock.mockobject import MOCK_IFACE, OBJECT_MANAGER_IFACE, load_module


class DBusTestCase(unittest.TestCase):
    '''Base class for D-Bus mock tests.

    This provides some convenience API to start/stop local D-Buses, so that you
    can run a private local session and/or system bus to run mocks on.

    This also provides a spawn_server() class method to run the D-Bus mock
    server in a separate process.
    '''
    # PIDs of the private bus daemons started by start_session_bus() and
    # start_system_bus(). They are stored on DBusTestCase itself (not on the
    # subclass), so they are shared by all subclasses; None means "not started".
    session_bus_pid = None
    system_bus_pid = None

    @classmethod
    def start_session_bus(cls) -> None:
        '''Set up a private local session bus

        The daemon's address is exported as $DBUS_SESSION_BUS_ADDRESS.

        This gets stopped automatically in tearDownClass().
        '''
        (DBusTestCase.session_bus_pid, addr) = cls.start_dbus()
        os.environ['DBUS_SESSION_BUS_ADDRESS'] = addr

    @classmethod
    def start_system_bus(cls) -> None:
        '''Set up a private local system bus

        The daemon's address is exported as $DBUS_SYSTEM_BUS_ADDRESS.

        This gets stopped automatically in tearDownClass().
        '''
        # create a temporary configuration which makes the fake bus actually
        # appear as type "system"
        with tempfile.NamedTemporaryFile(prefix='dbusmock_cfg') as c:
            c.write(b'''<!DOCTYPE busconfig PUBLIC "-//freedesktop//DTD D-Bus Bus Configuration 1.0//EN"
 "http://www.freedesktop.org/standards/dbus/1.0/busconfig.dtd">
<busconfig>
  <type>system</type>
  <keep_umask/>
  <listen>unix:tmpdir=/tmp</listen>
  <!-- We do not add standard_system_servicedirs (i.e. we have *no* service directory). -->

  <policy context="default">
    <allow send_destination="*" eavesdrop="true"/>
    <allow eavesdrop="true"/>
    <allow own="*"/>
  </policy>
</busconfig>
''')
            # the daemon reads the file by name, so the content must be on
            # disk before it gets started
            c.flush()
            (DBusTestCase.system_bus_pid, addr) = cls.start_dbus(conf=c.name)
        os.environ['DBUS_SYSTEM_BUS_ADDRESS'] = addr

    @classmethod
    def tearDownClass(cls) -> None:
        '''Stop private session/system buses'''

        if DBusTestCase.session_bus_pid is not None:
            cls.stop_dbus(DBusTestCase.session_bus_pid)
            # pop() instead of del: a test may already have removed the variable
            os.environ.pop('DBUS_SESSION_BUS_ADDRESS', None)
            DBusTestCase.session_bus_pid = None
        if DBusTestCase.system_bus_pid is not None:
            cls.stop_dbus(DBusTestCase.system_bus_pid)
            os.environ.pop('DBUS_SYSTEM_BUS_ADDRESS', None)
            DBusTestCase.system_bus_pid = None

    @classmethod
    def start_dbus(cls, conf: Optional[str] = None) -> Tuple[int, str]:
        '''Start a D-Bus daemon

        If conf is given, it is passed to dbus-daemon as --config-file;
        otherwise the daemon is started with --session.

        Return (pid, address) pair. Raise subprocess.CalledProcessError if
        dbus-daemon exits with a non-zero status, and AssertionError if its
        output does not consist of exactly two lines.

        Normally you do not need to call this directly. Use start_system_bus()
        and start_session_bus() instead.
        '''
        argv = ['dbus-daemon', '--fork', '--print-address=1', '--print-pid=1']
        if conf:
            argv.append('--config-file=' + conf)
        else:
            argv.append('--session')
        lines = subprocess.check_output(argv, universal_newlines=True).strip().splitlines()
        # explicit raise instead of "assert", so that the check also happens
        # when running with python -O
        if len(lines) != 2:
            raise AssertionError('expected exactly 2 lines of output from dbus-daemon')
        # usually the first line is the address, but be lenient and accept any order
        try:
            return (int(lines[1]), lines[0])
        except ValueError:
            return (int(lines[0]), lines[1])

    @classmethod
    def stop_dbus(cls, pid: int) -> None:
        '''Stop a D-Bus daemon

        Send SIGTERM to the given pid every 0.1 seconds, up to 50 times, until
        the process is gone; if it is still there after that, write an error
        to stderr and send SIGKILL. SIGTERM is ignored in the calling process
        while this runs; the previous SIGTERM disposition is restored
        afterwards, even if signalling the daemon fails.

        Normally you do not need to call this directly. When you use
        start_system_bus() and start_session_bus(), these buses are
        automatically stopped in tearDownClass().
        '''
        previous_handler = signal.signal(signal.SIGTERM, signal.SIG_IGN)
        try:
            for _ in range(50):
                try:
                    os.kill(pid, signal.SIGTERM)
                except OSError as e:
                    # ESRCH: no such process, i. e. the daemon has terminated
                    if e.errno == errno.ESRCH:
                        break
                    raise
                time.sleep(0.1)
            else:
                sys.stderr.write('ERROR: timed out waiting for bus process to terminate\n')
                os.kill(pid, signal.SIGKILL)
                time.sleep(0.5)
        finally:
            # signal.signal() returns None if the previous handler was not
            # installed from Python; that cannot be passed back in, so fall
            # back to the default disposition in that case
            if previous_handler is None:
                previous_handler = signal.SIG_DFL
            signal.signal(signal.SIGTERM, previous_handler)

    @classmethod
    def get_dbus(cls, system_bus: bool = False) -> dbus.Bus:
        '''Get dbus.bus.BusConnection() object

        If the corresponding $DBUS_SYSTEM_BUS_ADDRESS or
        $DBUS_SESSION_BUS_ADDRESS variable is set, connect to that address;
        otherwise fall back to dbus.SystemBus() or dbus.SessionBus().

        This is preferable to dbus.SystemBus() and dbus.SessionBus() as those
        do not get along with multiple changing local test buses.
        '''
        if system_bus:
            if os.environ.get('DBUS_SYSTEM_BUS_ADDRESS'):
                return dbus.bus.BusConnection(os.environ['DBUS_SYSTEM_BUS_ADDRESS'])
            return dbus.SystemBus()

        if os.environ.get('DBUS_SESSION_BUS_ADDRESS'):
            return dbus.bus.BusConnection(os.environ['DBUS_SESSION_BUS_ADDRESS'])
        return dbus.SessionBus()

    @classmethod
    def wait_for_bus_object(cls, dest: str, path: str, system_bus: bool = False, timeout: int = 600) -> None:
        '''Wait for an object to appear on D-Bus

        Raise an AssertionError if the object does not appear within one
        minute. You can change the timeout with the "timeout" keyword argument
        which specifies deciseconds.
        '''
        bus = cls.get_dbus(system_bus)

        last_exc = None
        # we check whether the name is owned first, to avoid race conditions
        # with service activation; once it's owned, wait until we can actually
        # call methods
        while timeout > 0:
            if bus.name_has_owner(dest):
                try:
                    p = dbus.Interface(bus.get_object(dest, path),
                                       dbus_interface=dbus.INTROSPECTABLE_IFACE)
                    p.Introspect()
                    break
                except dbus.exceptions.DBusException as e:
                    last_exc = e
                    # an UnknownInterface error is treated as success as well
                    if '.UnknownInterface' in str(e):
                        break

            timeout -= 1
            time.sleep(0.1)
        else:
            # only reached when the loop ran out of attempts without "break";
            # explicit raise instead of "assert", so that a timeout is also
            # reported when running with python -O
            raise AssertionError(f'timed out waiting for D-Bus object {path}: {last_exc}')

    @classmethod
    def spawn_server(cls, name: str, path: str, interface: str, system_bus: bool = False,
                     stdout: Optional[int] = None) -> subprocess.Popen:
        '''Run a DBusMockObject instance in a separate process

        This runs "python -m dbusmock [--system] name path interface" with the
        current interpreter; stdout is passed on to subprocess.Popen().

        The daemon will terminate automatically when the D-Bus that it connects
        to goes down.  If that does not happen (e. g. you test on the actual
        system/session bus), you need to kill it manually.

        This function blocks until the spawned DBusMockObject is ready and
        listening on the bus. If waiting for it fails, the spawned process is
        terminated and the exception is re-raised.

        Returns the Popen object of the spawned daemon.
        '''
        argv = [sys.executable, '-m', 'dbusmock']
        if system_bus:
            argv.append('--system')
        argv.append(name)
        argv.append(path)
        argv.append(interface)

        # pylint: disable=consider-using-with
        daemon = subprocess.Popen(argv, stdout=stdout)

        # wait for daemon to start up
        try:
            cls.wait_for_bus_object(name, path, system_bus)
        except BaseException:
            # the caller never gets the Popen object in this case, so it could
            # not clean up the process itself
            daemon.terminate()
            daemon.wait()
            raise

        return daemon

    @classmethod
    def spawn_server_template(cls, template: str, parameters: Optional[Dict[str, Any]] = None,
                              stdout: Optional[int] = None,
                              system_bus: Optional[bool] = None) -> Tuple[subprocess.Popen, Any]:
        '''Run a D-Bus mock template instance in a separate process

        This starts a D-Bus mock process and loads the given template with
        (optional) parameters into it. For details about templates see
        dbusmock.DBusMockObject.AddTemplate().

        Usually a template should specify SYSTEM_BUS = False/True to select whether it
        gets loaded on the session or system bus. This can be overridden with the system_bus
        parameter. For templates which don't set SYSTEM_BUS, this parameter has to be set.

        The daemon will terminate automatically when the D-Bus that it connects
        to goes down.  If that does not happen (e. g. you test on the actual
        system/session bus), you need to kill it manually.

        This function blocks until the spawned DBusMockObject is ready and
        listening on the bus. If loading the template into it fails, the
        spawned process is terminated and the exception is re-raised.

        Returns a pair (daemon Popen object, main dbus object).
        '''
        # we need the bus address from the template module
        module = load_module(template)

        is_object_manager = getattr(module, 'IS_OBJECT_MANAGER', False)

        # object manager templates do not need to declare a MAIN_IFACE
        if is_object_manager and not hasattr(module, 'MAIN_IFACE'):
            interface_name = OBJECT_MANAGER_IFACE
        else:
            interface_name = module.MAIN_IFACE

        if system_bus is None:
            system_bus = module.SYSTEM_BUS

        daemon = cls.spawn_server(module.BUS_NAME, module.MAIN_OBJ,
                                  interface_name, system_bus, stdout)

        try:
            bus = cls.get_dbus(system_bus)
            obj = bus.get_object(module.BUS_NAME, module.MAIN_OBJ)
            if not parameters:
                # an empty Python dict carries no type information, so the
                # D-Bus signature is given explicitly
                parameters = dbus.Dictionary({}, signature='sv')
            obj.AddTemplate(template, parameters,
                            dbus_interface=MOCK_IFACE)
        except BaseException:
            # the caller never gets the Popen object in this case, so it could
            # not clean up the process itself
            daemon.terminate()
            daemon.wait()
            raise

        return (daemon, obj)