#!/usr/bin/python3

# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option) any
# later version.  See http://www.gnu.org/copyleft/lgpl.html for the full text
# of the license.

'''Tests for the dbusmock API: mock objects, templates, cleanup, subclassing.'''

__author__ = 'Martin Pitt'
__copyright__ = '(c) 2012 Canonical Ltd.'

import unittest
import sys
import os
import tempfile
import subprocess
import time
import importlib.util
import tracemalloc

import dbus
import dbus.mainloop.glib

from gi.repository import GLib

import dbusmock

tracemalloc.start(25)
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

# "a <heart> b" in py2/3 compatible unicode
UNICODE = b'a\xe2\x99\xa5b'.decode('UTF-8')


class TestAPI(dbusmock.DBusTestCase):
    '''Test dbus-mock API'''

    @classmethod
    def setUpClass(cls):
        '''Start a private session bus and connect to it once for all tests.'''
        cls.start_session_bus()
        cls.dbus_con = cls.get_dbus()

    def setUp(self):
        '''Spawn a fresh mock server whose log goes to a temporary file.'''
        # pylint: disable=consider-using-with
        self.mock_log = tempfile.NamedTemporaryFile()
        # close (and thereby delete) the log file even if the rest of setUp
        # fails; cleanups run after tearDown, so the log stays readable
        # for the whole test
        self.addCleanup(self.mock_log.close)
        self.p_mock = self.spawn_server('org.freedesktop.Test',
                                        '/',
                                        'org.freedesktop.Test.Main',
                                        stdout=self.mock_log)

        self.obj_test = self.dbus_con.get_object('org.freedesktop.Test', '/')
        self.dbus_test = dbus.Interface(self.obj_test, 'org.freedesktop.Test.Main')
        self.dbus_mock = dbus.Interface(self.obj_test, dbusmock.MOCK_IFACE)
        self.dbus_props = dbus.Interface(self.obj_test, dbus.PROPERTIES_IFACE)

    def assertLog(self, regex):
        '''Assert that the mock's log file content matches the bytes regex.'''
        with open(self.mock_log.name, "rb") as f:
            self.assertRegex(f.read(), regex)

    def tearDown(self):
        '''Stop the mock server spawned by setUp.'''
        if self.p_mock.stdout:
            self.p_mock.stdout.close()
        self.p_mock.terminate()
        self.p_mock.wait()

    def test_noarg_noret(self):
        '''no arguments, no return value'''

        self.dbus_mock.AddMethod('', 'Do', '', '', '')
        self.assertEqual(self.dbus_test.Do(), None)

        # check that it's logged correctly
        self.assertLog(b'^[0-9.]+ Do$')

    def test_onearg_noret(self):
        '''one argument, no return value'''

        self.dbus_mock.AddMethod('', 'Do', 's', '', '')
        self.assertEqual(self.dbus_test.Do('Hello'), None)

        # check that it's logged correctly
        self.assertLog(b'^[0-9.]+ Do "Hello"$')

    def test_onearg_ret(self):
        '''one argument, code for return value'''

        self.dbus_mock.AddMethod('', 'Do', 's', 's', 'ret = args[0]')
        self.assertEqual(self.dbus_test.Do('Hello'), 'Hello')

    def test_unicode_str(self):
        '''unicode string roundtrip'''

        self.dbus_mock.AddMethod('', 'Do', 's', 's', 'ret = args[0] * 2')
        self.assertEqual(self.dbus_test.Do(UNICODE), dbus.String(UNICODE * 2))

    def test_twoarg_ret(self):
        '''two arguments, code for return value'''

        self.dbus_mock.AddMethod('', 'Do', 'si', 's', 'ret = args[0] * args[1]')
        self.assertEqual(self.dbus_test.Do('foo', 3), 'foofoofoo')

        # check that it's logged correctly
        self.assertLog(b'^[0-9.]+ Do "foo" 3$')

    def test_array_arg(self):
        '''array argument'''

        self.dbus_mock.AddMethod('', 'Do', 'iaous', '',
                                 f'''assert len(args) == 4
assert args[0] == -1;
assert args[1] == ['/foo']
assert type(args[1]) == dbus.Array
assert type(args[1][0]) == dbus.ObjectPath
assert args[2] == 5
assert args[3] == {repr(UNICODE)}
''')
        self.assertEqual(self.dbus_test.Do(-1, ['/foo'], 5, UNICODE), None)

        # check that it's logged correctly
        self.assertLog(b'^[0-9.]+ Do -1 \\["/foo"\\] 5 "a\\xe2\\x99\\xa5b"$')

    def test_dict_arg(self):
        '''dictionary argument'''

        self.dbus_mock.AddMethod('', 'Do', 'ia{si}u', '',
                                 '''assert len(args) == 3
assert args[0] == -1;
assert args[1] == {'foo': 42}
assert type(args[1]) == dbus.Dictionary
assert args[2] == 5
''')
        self.assertEqual(self.dbus_test.Do(-1, {'foo': 42}, 5), None)

        # check that it's logged correctly
        self.assertLog(b'^[0-9.]+ Do -1 {"foo": 42} 5$')

    def test_methods_on_other_interfaces(self):
        '''methods on other interfaces'''

        self.dbus_mock.AddMethod('org.freedesktop.Test.Other', 'OtherDo', '', '', '')
        self.dbus_mock.AddMethods('org.freedesktop.Test.Other',
                                  [('OtherDo2', '', '', ''),
                                   ('OtherDo3', 'i', 'i', 'ret = args[0]')])

        # should not be on the main interface
        self.assertRaises(dbus.exceptions.DBusException,
                          self.dbus_test.OtherDo)

        # should be on the other interface
        self.assertEqual(self.obj_test.OtherDo(dbus_interface='org.freedesktop.Test.Other'), None)
        self.assertEqual(self.obj_test.OtherDo2(dbus_interface='org.freedesktop.Test.Other'), None)
        self.assertEqual(self.obj_test.OtherDo3(42, dbus_interface='org.freedesktop.Test.Other'), 42)

        # check that it's logged correctly
        self.assertLog(b'^[0-9.]+ OtherDo\n[0-9.]+ OtherDo2\n[0-9.]+ OtherDo3 42$')

    def test_methods_same_name(self):
        '''methods with same name on different interfaces'''

        self.dbus_mock.AddMethod('org.iface1', 'Do', 'i', 'i', 'ret = args[0] + 2')
        self.dbus_mock.AddMethod('org.iface2', 'Do', 'i', 'i', 'ret = args[0] + 3')

        # should not be on the main interface
        self.assertRaises(dbus.exceptions.DBusException,
                          self.dbus_test.Do)

        # should be on the other interface
        self.assertEqual(self.obj_test.Do(10, dbus_interface='org.iface1'), 12)
        self.assertEqual(self.obj_test.Do(11, dbus_interface='org.iface2'), 14)

        # check that it's logged correctly
        self.assertLog(b'^[0-9.]+ Do 10\n[0-9.]+ Do 11$')

        # now add it to the primary interface, too
        self.dbus_mock.AddMethod('', 'Do', 'i', 'i', 'ret = args[0] + 1')
        self.assertEqual(self.obj_test.Do(9, dbus_interface='org.freedesktop.Test.Main'), 10)
        self.assertEqual(self.obj_test.Do(10, dbus_interface='org.iface1'), 12)
        self.assertEqual(self.obj_test.Do(11, dbus_interface='org.iface2'), 14)

    def test_methods_type_mismatch(self):
        '''calling methods with wrong arguments'''

        def check(signature, args, err):
            # (re)define Do() with the given signature, then call it with
            # mismatching arguments and expect a D-Bus error mentioning err
            self.dbus_mock.AddMethod('', 'Do', signature, '', '')
            try:
                self.dbus_test.Do(*args)
                self.fail(f'method call did not raise an error for signature "{signature}" and arguments {args}')
            except dbus.exceptions.DBusException as e:
                self.assertIn(err, str(e))

        # not enough arguments
        check('i', [], 'TypeError: More items found')
        check('is', [1], 'TypeError: More items found')

        # too many arguments
        check('', [1], 'TypeError: Fewer items found')
        check('i', [1, 'hello'], 'TypeError: Fewer items found')

        # type mismatch
        check('u', [-1], 'convert negative value to unsigned')
        check('i', ['hello'], 'TypeError')
        check('i', ['hello'], 'integer')
        check('s', [1], 'TypeError: Expected a string')

    def test_add_object(self):
        '''add a new object'''

        self.dbus_mock.AddObject('/obj1',
                                 'org.freedesktop.Test.Sub',
                                 {
                                     'state': dbus.String('online', variant_level=1),
                                     'cute': dbus.Boolean(True, variant_level=1),
                                 },
                                 [])

        obj1 = self.dbus_con.get_object('org.freedesktop.Test', '/obj1')
        dbus_sub = dbus.Interface(obj1, 'org.freedesktop.Test.Sub')
        dbus_props = dbus.Interface(obj1, dbus.PROPERTIES_IFACE)

        # check properties
        self.assertEqual(dbus_props.Get('org.freedesktop.Test.Sub', 'state'), 'online')
        self.assertEqual(dbus_props.Get('org.freedesktop.Test.Sub', 'cute'), True)
        self.assertEqual(dbus_props.GetAll('org.freedesktop.Test.Sub'),
                         {'state': 'online', 'cute': True})

        # add new method
        obj1.AddMethod('', 'Do', '', 's', 'ret = "hello"',
                       dbus_interface=dbusmock.MOCK_IFACE)
        self.assertEqual(dbus_sub.Do(), 'hello')

    def test_add_object_existing(self):
        '''try to add an existing object'''

        self.dbus_mock.AddObject('/obj1', 'org.freedesktop.Test.Sub', {}, [])

        self.assertRaises(dbus.exceptions.DBusException,
                          self.dbus_mock.AddObject,
                          '/obj1',
                          'org.freedesktop.Test.Sub',
                          {},
                          [])

        # try to add the main object again
        self.assertRaises(dbus.exceptions.DBusException,
                          self.dbus_mock.AddObject,
                          '/',
                          'org.freedesktop.Test.Other',
                          {},
                          [])

    def test_add_object_with_methods(self):
        '''add a new object with methods'''

        self.dbus_mock.AddObject('/obj1',
                                 'org.freedesktop.Test.Sub',
                                 {
                                     'state': dbus.String('online', variant_level=1),
                                     'cute': dbus.Boolean(True, variant_level=1),
                                 },
                                 [
                                     ('Do0', '', 'i', 'ret = 42'),
                                     ('Do1', 'i', 'i', 'ret = 31337'),
                                 ])

        obj1 = self.dbus_con.get_object('org.freedesktop.Test', '/obj1')

        self.assertEqual(obj1.Do0(), 42)
        self.assertEqual(obj1.Do1(1), 31337)
        self.assertRaises(dbus.exceptions.DBusException,
                          obj1.Do2, 31337)

    def test_properties(self):
        '''add and change properties'''

        # no properties by default
        self.assertEqual(self.dbus_props.GetAll('org.freedesktop.Test.Main'), {})

        # no such property
        with self.assertRaises(dbus.exceptions.DBusException) as ctx:
            self.dbus_props.Get('org.freedesktop.Test.Main', 'version')
        self.assertEqual(ctx.exception.get_dbus_name(),
                         'org.freedesktop.Test.Main.UnknownProperty')
        self.assertEqual(ctx.exception.get_dbus_message(),
                         'no such property version')

        self.assertRaises(dbus.exceptions.DBusException,
                          self.dbus_props.Set,
                          'org.freedesktop.Test.Main',
                          'version',
                          dbus.Int32(2, variant_level=1))

        self.dbus_mock.AddProperty('org.freedesktop.Test.Main',
                                   'version',
                                   dbus.Int32(2, variant_level=1))
        # once again on default interface
        self.dbus_mock.AddProperty('',
                                   'connected',
                                   dbus.Boolean(True, variant_level=1))

        self.assertEqual(self.dbus_props.Get('org.freedesktop.Test.Main', 'version'), 2)
        self.assertEqual(self.dbus_props.Get('org.freedesktop.Test.Main', 'connected'), True)

        self.assertEqual(self.dbus_props.GetAll('org.freedesktop.Test.Main'),
                         {'version': 2, 'connected': True})

        with self.assertRaises(dbus.exceptions.DBusException) as ctx:
            self.dbus_props.GetAll('org.freedesktop.Test.Bogus')
        self.assertEqual(ctx.exception.get_dbus_name(),
                         'org.freedesktop.Test.Main.UnknownInterface')
        self.assertEqual(ctx.exception.get_dbus_message(),
                         'no such interface org.freedesktop.Test.Bogus')

        # change property
        self.dbus_props.Set('org.freedesktop.Test.Main', 'version',
                            dbus.Int32(4, variant_level=1))
        self.assertEqual(self.dbus_props.Get('org.freedesktop.Test.Main', 'version'), 4)

        # check that the Get/Set calls get logged
        with open(self.mock_log.name, encoding="UTF-8") as f:
            contents = f.read()
        self.assertRegex(contents, '\n[0-9.]+ Get org.freedesktop.Test.Main.version\n')
        self.assertRegex(contents, '\n[0-9.]+ Get org.freedesktop.Test.Main.connected\n')
        self.assertRegex(contents, '\n[0-9.]+ GetAll org.freedesktop.Test.Main\n')
        self.assertRegex(contents, '\n[0-9.]+ Set org.freedesktop.Test.Main.version 4\n')

        # add property to different interface
        self.dbus_mock.AddProperty('org.freedesktop.Test.Other',
                                   'color',
                                   dbus.String('yellow', variant_level=1))

        self.assertEqual(self.dbus_props.GetAll('org.freedesktop.Test.Main'),
                         {'version': 4, 'connected': True})
        self.assertEqual(self.dbus_props.GetAll('org.freedesktop.Test.Other'),
                         {'color': 'yellow'})
        self.assertEqual(self.dbus_props.Get('org.freedesktop.Test.Other', 'color'),
                         'yellow')

        changed_props = []
        ml = GLib.MainLoop()

        def catch(*args, **kwargs):
            # only PropertiesChanged signals are of interest here
            if kwargs['interface'] != 'org.freedesktop.DBus.Properties':
                return

            self.assertEqual(kwargs['interface'], 'org.freedesktop.DBus.Properties')
            self.assertEqual(kwargs['member'], 'PropertiesChanged')

            [iface, changed, _invalidated] = args
            self.assertEqual(iface, 'org.freedesktop.Test.Main')

            changed_props.append(changed)
            ml.quit()

        match = self.dbus_con.add_signal_receiver(catch,
                                                  interface_keyword='interface',
                                                  path_keyword='path',
                                                  member_keyword='member')

        # change property using mock helper
        self.dbus_mock.UpdateProperties('org.freedesktop.Test.Main', {
            'version': 5,
            'connected': False,
        })

        # ensure that the loop quits even when no signal arrives
        GLib.timeout_add(3000, ml.quit)
        ml.run()

        match.remove()

        self.assertEqual(self.dbus_props.GetAll('org.freedesktop.Test.Main'),
                         {'version': 5, 'connected': False})
        self.assertEqual(changed_props,
                         [{'version': 5, 'connected': False}])

        # test adding properties with the array type
        self.dbus_mock.AddProperty('org.freedesktop.Test.Main',
                                   'array',
                                   dbus.Array(['first'], signature='s'))
        self.assertEqual(self.dbus_props.Get('org.freedesktop.Test.Main', 'array'),
                         ['first'])

        # test updating properties with the array type
        self.dbus_mock.UpdateProperties('org.freedesktop.Test.Main',
                                        {'array': dbus.Array(['second', 'third'],
                                                             signature='s')})
        self.assertEqual(self.dbus_props.Get('org.freedesktop.Test.Main', 'array'),
                         ['second', 'third'])

    def test_introspection_methods(self):
        '''dynamically added methods appear in introspection'''

        dbus_introspect = dbus.Interface(self.obj_test, dbus.INTROSPECTABLE_IFACE)

        xml_empty = dbus_introspect.Introspect()
        self.assertIn('<interface name="org.freedesktop.DBus.Mock">', xml_empty)
        self.assertIn('<method name="AddMethod">', xml_empty)

        self.dbus_mock.AddMethod('', 'Do', 'saiv', 'i', 'ret = 42')

        xml_method = dbus_introspect.Introspect()
        self.assertNotEqual(xml_empty, xml_method)
        self.assertIn('<interface name="org.freedesktop.Test.Main">', xml_method)
        # various Python versions use different name vs. type ordering
        # NOTE(review): the leading whitespace inside these two literals has to
        # match the Introspect() output exactly -- confirm the indentation
        expected1 = '''<method name="Do">
      <arg direction="in" name="arg1" type="s" />
      <arg direction="in" name="arg2" type="ai" />
      <arg direction="in" name="arg3" type="v" />
      <arg direction="out" type="i" />
    </method>'''
        expected2 = '''<method name="Do">
      <arg direction="in" type="s" name="arg1" />
      <arg direction="in" type="ai" name="arg2" />
      <arg direction="in" type="v" name="arg3" />
      <arg direction="out" type="i" />
    </method>'''
        self.assertTrue(expected1 in xml_method or expected2 in xml_method, xml_method)

    # properties in introspection are not supported by dbus-python right now
    def test_introspection_properties(self):
        '''dynamically added properties appear in introspection'''

        self.dbus_mock.AddProperty('', 'Color', 'yellow')
        self.dbus_mock.AddProperty('org.freedesktop.Test.Sub', 'Count', 5)

        xml = self.obj_test.Introspect()

        self.assertIn('<interface name="org.freedesktop.Test.Main">', xml)
        self.assertIn('<interface name="org.freedesktop.Test.Sub">', xml)
        # various Python versions use different attribute ordering
        self.assertTrue('<property access="readwrite" name="Color" type="s" />' in xml or
                        '<property name="Color" type="s" access="readwrite" />' in xml, xml)
        self.assertTrue('<property access="readwrite" name="Count" type="i" />' in xml or
                        '<property name="Count" type="i" access="readwrite" />' in xml, xml)

    def test_objects_map(self):
        '''access global objects map'''

        self.dbus_mock.AddMethod('', 'EnumObjs', '', 'ao', 'ret = objects.keys()')
        self.assertEqual(self.dbus_test.EnumObjs(), ['/'])

        self.dbus_mock.AddObject('/obj1', 'org.freedesktop.Test.Sub', {}, [])
        self.assertEqual(set(self.dbus_test.EnumObjs()), {'/', '/obj1'})

    def test_signals(self):
        '''emitting signals'''

        def do_emit():
            self.dbus_mock.EmitSignal('', 'SigNoArgs', '', [])
            self.dbus_mock.EmitSignal('org.freedesktop.Test.Sub',
                                      'SigTwoArgs',
                                      'su', ['hello', 42])
            self.dbus_mock.EmitSignal('org.freedesktop.Test.Sub',
                                      'SigTypeTest',
                                      'iuvao',
                                      [-42, 42, dbus.String('hello', variant_level=1), ['/a', '/b']])

        caught = []
        ml = GLib.MainLoop()

        def catch(*args, **kwargs):
            if kwargs['interface'].startswith('org.freedesktop.Test'):
                caught.append((args, kwargs))
            if len(caught) == 3:
                # we caught everything there is to catch, don't wait for the
                # timeout
                ml.quit()

        match = self.dbus_con.add_signal_receiver(catch,
                                                  interface_keyword='interface',
                                                  path_keyword='path',
                                                  member_keyword='member')

        GLib.timeout_add(200, do_emit)
        # ensure that the loop quits even when we catch fewer than 3 signals
        GLib.timeout_add(3000, ml.quit)
        ml.run()

        # the connection is shared by all tests of this class, so do not leave
        # our receiver registered on it
        match.remove()

        # fail with a readable message instead of an IndexError below when the
        # loop ended through the timeout
        self.assertEqual(len(caught), 3, f'expected 3 signals, caught {caught}')

        # check SigNoArgs
        self.assertEqual(caught[0][0], ())
        self.assertEqual(caught[0][1]['member'], 'SigNoArgs')
        self.assertEqual(caught[0][1]['path'], '/')
        self.assertEqual(caught[0][1]['interface'], 'org.freedesktop.Test.Main')

        # check SigTwoArgs
        self.assertEqual(caught[1][0], ('hello', 42))
        self.assertEqual(caught[1][1]['member'], 'SigTwoArgs')
        self.assertEqual(caught[1][1]['path'], '/')
        self.assertEqual(caught[1][1]['interface'], 'org.freedesktop.Test.Sub')

        # check data types in SigTypeTest
        self.assertEqual(caught[2][1]['member'], 'SigTypeTest')
        self.assertEqual(caught[2][1]['path'], '/')
        args = caught[2][0]
        self.assertEqual(args[0], -42)
        self.assertEqual(type(args[0]), dbus.Int32)
        self.assertEqual(args[0].variant_level, 0)

        self.assertEqual(args[1], 42)
        self.assertEqual(type(args[1]), dbus.UInt32)
        self.assertEqual(args[1].variant_level, 0)

        self.assertEqual(args[2], 'hello')
        self.assertEqual(type(args[2]), dbus.String)
        self.assertEqual(args[2].variant_level, 1)

        self.assertEqual(args[3], ['/a', '/b'])
        self.assertEqual(type(args[3]), dbus.Array)
        self.assertEqual(args[3].variant_level, 0)
        self.assertEqual(type(args[3][0]), dbus.ObjectPath)
        self.assertEqual(args[3][0].variant_level, 0)

        # check correct logging
        with open(self.mock_log.name, encoding="UTF-8") as f:
            log = f.read()
        self.assertRegex(log, '[0-9.]+ emit org.freedesktop.Test.Main.SigNoArgs\n')
        self.assertRegex(log, '[0-9.]+ emit org.freedesktop.Test.Sub.SigTwoArgs "hello" 42\n')
        self.assertRegex(log, '[0-9.]+ emit org.freedesktop.Test.Sub.SigTypeTest -42 42')
        self.assertRegex(log, r'[0-9.]+ emit org.freedesktop.Test.Sub.SigTypeTest -42 42 "hello" \["/a", "/b"\]\n')

    def test_signals_type_mismatch(self):
        '''emitting signals with wrong arguments'''

        def check(signature, args, err):
            # emit a signal whose arguments do not fit the signature and
            # expect a D-Bus error mentioning err
            try:
                self.dbus_mock.EmitSignal('', 's', signature, args)
                self.fail(f'EmitSignal did not raise an error for signature "{signature}" and arguments {args}')
            except dbus.exceptions.DBusException as e:
                self.assertIn(err, str(e))

        # not enough arguments
        check('i', [], 'TypeError: More items found')
        check('is', [1], 'TypeError: More items found')

        # too many arguments
        check('', [1], 'TypeError: Fewer items found')
        check('i', [1, 'hello'], 'TypeError: Fewer items found')

        # type mismatch
        check('u', [-1], 'convert negative value to unsigned')
        check('i', ['hello'], 'TypeError')
        check('i', ['hello'], 'integer')
        check('s', [1], 'TypeError: Expected a string')

    def test_dbus_get_log(self):
        '''query call logs over D-Bus'''

        self.assertEqual(self.dbus_mock.ClearCalls(), None)
        self.assertEqual(self.dbus_mock.GetCalls(), dbus.Array([]))

        self.dbus_mock.AddMethod('', 'Do', '', '', '')
        self.assertEqual(self.dbus_test.Do(), None)
        mock_log = self.dbus_mock.GetCalls()
        self.assertEqual(len(mock_log), 1)
        self.assertGreater(mock_log[0][0], 10000)  # timestamp
        self.assertEqual(mock_log[0][1], 'Do')
        self.assertEqual(mock_log[0][2], [])

        self.assertEqual(self.dbus_mock.ClearCalls(), None)
        self.assertEqual(self.dbus_mock.GetCalls(), dbus.Array([]))

        self.dbus_mock.AddMethod('', 'Wop', 's', 's', 'ret="hello"')
        self.assertEqual(self.dbus_test.Wop('foo'), 'hello')
        self.assertEqual(self.dbus_test.Wop('bar'), 'hello')
        mock_log = self.dbus_mock.GetCalls()
        self.assertEqual(len(mock_log), 2)
        self.assertGreater(mock_log[0][0], 10000)  # timestamp
        self.assertEqual(mock_log[0][1], 'Wop')
        self.assertEqual(mock_log[0][2], ['foo'])
        self.assertEqual(mock_log[1][1], 'Wop')
        self.assertEqual(mock_log[1][2], ['bar'])

        self.assertEqual(self.dbus_mock.ClearCalls(), None)
        self.assertEqual(self.dbus_mock.GetCalls(), dbus.Array([]))

    def test_dbus_get_method_calls(self):
        '''query method call logs over D-Bus'''

        self.dbus_mock.AddMethod('', 'Do', '', '', '')
        self.assertEqual(self.dbus_test.Do(), None)
        self.assertEqual(self.dbus_test.Do(), None)

        self.dbus_mock.AddMethod('', 'Wop', 's', 's', 'ret="hello"')
        self.assertEqual(self.dbus_test.Wop('foo'), 'hello')
        self.assertEqual(self.dbus_test.Wop('bar'), 'hello')

        mock_calls = self.dbus_mock.GetMethodCalls('Do')
        self.assertEqual(len(mock_calls), 2)
        self.assertEqual(mock_calls[0][1], [])
        self.assertEqual(mock_calls[1][1], [])

        mock_calls = self.dbus_mock.GetMethodCalls('Wop')
        self.assertEqual(len(mock_calls), 2)
        self.assertGreater(mock_calls[0][0], 10000)  # timestamp
        self.assertEqual(mock_calls[0][1], ['foo'])
        self.assertGreater(mock_calls[1][0], 10000)  # timestamp
        self.assertEqual(mock_calls[1][1], ['bar'])

    def test_dbus_method_called(self):
        '''subscribe to MethodCalled signal'''

        loop = GLib.MainLoop()
        caught_signals = []

        def method_called(method, args, **_):
            caught_signals.append((method, args))
            loop.quit()

        self.dbus_mock.AddMethod('', 'Do', 's', '', '')
        match = self.dbus_mock.connect_to_signal('MethodCalled', method_called)
        self.assertEqual(self.dbus_test.Do('foo'), None)

        # ensure that the loop quits even when the signal never arrives
        GLib.timeout_add(5000, loop.quit)
        loop.run()

        # the connection is shared by all tests of this class, so do not leave
        # our handler registered on it
        match.remove()

        self.assertEqual(len(caught_signals), 1)
        method, args = caught_signals[0]
        self.assertEqual(method, 'Do')
        self.assertEqual(len(args), 1)
        self.assertEqual(args[0], 'foo')

    def test_reset(self):
        '''resetting to pristine state'''

        self.dbus_mock.AddMethod('', 'Do', '', '', '')
        self.dbus_mock.AddProperty('', 'propone', True)
        self.dbus_mock.AddProperty('org.Test.Other', 'proptwo', 1)
        self.dbus_mock.AddObject('/obj1', '', {}, [])

        self.dbus_mock.Reset()

        # resets properties and keeps the initial object
        self.assertEqual(self.dbus_props.GetAll(''), {})
        # resets methods
        self.assertRaises(dbus.exceptions.DBusException, self.dbus_test.Do)
        # resets other objects
        obj1 = self.dbus_con.get_object('org.freedesktop.Test', '/obj1')
        self.assertRaises(dbus.exceptions.DBusException, obj1.GetAll, '')


class TestTemplates(dbusmock.DBusTestCase):
    '''Test template API'''

    @classmethod
    def setUpClass(cls):
        '''Start private session and system buses for all template tests.'''
        cls.start_session_bus()
        cls.start_system_bus()

    def test_local(self):
        '''Load a local template *.py file'''

        with tempfile.NamedTemporaryFile(prefix='answer_', suffix='.py') as my_template:
            my_template.write(b'''import dbus
BUS_NAME = 'universe.Ultimate'
MAIN_OBJ = '/'
MAIN_IFACE = 'universe.Ultimate'
SYSTEM_BUS = False

def load(mock, parameters):
    mock.AddMethods(MAIN_IFACE, [('Answer', '', 'i', 'ret = 42')])
''')
            my_template.flush()
            (p_mock, dbus_ultimate) = self.spawn_server_template(
                my_template.name, stdout=subprocess.PIPE)
            # cleanups run in reverse order: close stdout, terminate, wait
            self.addCleanup(p_mock.wait)
            self.addCleanup(p_mock.terminate)
            self.addCleanup(p_mock.stdout.close)

            # ensure that we don't use/write any .pyc files, they are dangerous
            # in a world-writable directory like /tmp
            self.assertFalse(os.path.exists(my_template.name + 'c'))
            self.assertFalse(os.path.exists(importlib.util.cache_from_source(my_template.name)))

            self.assertEqual(dbus_ultimate.Answer(), 42)

            # should appear in introspection
            xml = dbus_ultimate.Introspect()
            self.assertIn('<interface name="universe.Ultimate">', xml)
            self.assertIn('<method name="Answer">', xml)

            # should not have ObjectManager API by default
            self.assertRaises(dbus.exceptions.DBusException,
                              dbus_ultimate.GetManagedObjects)

    def test_static_method(self):
        '''Static method in a template'''

        with tempfile.NamedTemporaryFile(prefix='answer_', suffix='.py') as my_template:
            my_template.write(b'''import dbus
BUS_NAME = 'universe.Ultimate'
MAIN_OBJ = '/'
MAIN_IFACE = 'universe.Ultimate'
SYSTEM_BUS = False

def load(mock, parameters):
    pass

@dbus.service.method(MAIN_IFACE,
                     in_signature='',
                     out_signature='i')
def Answer(self):
    return 42
''')
            my_template.flush()
            (p_mock, dbus_ultimate) = self.spawn_server_template(
                my_template.name, stdout=subprocess.PIPE)
            # cleanups run in reverse order: close stdout, terminate, wait
            self.addCleanup(p_mock.wait)
            self.addCleanup(p_mock.terminate)
            self.addCleanup(p_mock.stdout.close)

            self.assertEqual(dbus_ultimate.Answer(), 42)

            # should appear in introspection
            xml = dbus_ultimate.Introspect()
            self.assertIn('<interface name="universe.Ultimate">', xml)
            self.assertIn('<method name="Answer">', xml)

    def test_local_nonexisting(self):
        '''Loading a nonexisting local template file raises ImportError'''

        self.assertRaises(ImportError, self.spawn_server_template, '/non/existing.py')

    def test_explicit_bus_(self):
        '''Explicitly set the bus for a template that does not specify SYSTEM_BUS'''

        with tempfile.NamedTemporaryFile(prefix='answer_', suffix='.py') as my_template:
            my_template.write(b'''import dbus
BUS_NAME = 'universe.Ultimate'
MAIN_OBJ = '/'
MAIN_IFACE = 'universe.Ultimate'

def load(mock, parameters):
    mock.AddMethods(MAIN_IFACE, [('Answer', '', 'i', 'ret = 42')])
''')
            my_template.flush()
            (p_mock, dbus_ultimate) = self.spawn_server_template(
                my_template.name, stdout=subprocess.PIPE, system_bus=False)
            # cleanups run in reverse order: close stdout, terminate, wait
            self.addCleanup(p_mock.wait)
            self.addCleanup(p_mock.terminate)
            self.addCleanup(p_mock.stdout.close)

            self.wait_for_bus_object('universe.Ultimate', '/')
            self.assertEqual(dbus_ultimate.Answer(), 42)

    def test_override_bus_(self):
        '''Override the bus for a template'''

        with tempfile.NamedTemporaryFile(prefix='answer_', suffix='.py') as my_template:
            my_template.write(b'''import dbus
BUS_NAME = 'universe.Ultimate'
MAIN_OBJ = '/'
MAIN_IFACE = 'universe.Ultimate'
SYSTEM_BUS = True

def load(mock, parameters):
    mock.AddMethods(MAIN_IFACE, [('Answer', '', 'i', 'ret = 42')])
''')
            my_template.flush()
            (p_mock, dbus_ultimate) = self.spawn_server_template(
                my_template.name, stdout=subprocess.PIPE, system_bus=False)
            # cleanups run in reverse order: close stdout, terminate, wait
            self.addCleanup(p_mock.wait)
            self.addCleanup(p_mock.terminate)
            self.addCleanup(p_mock.stdout.close)

            self.wait_for_bus_object('universe.Ultimate', '/')
            self.assertEqual(dbus_ultimate.Answer(), 42)

    def test_object_manager(self):
        '''Template with ObjectManager API'''

        with tempfile.NamedTemporaryFile(prefix='objmgr_', suffix='.py') as my_template:
            my_template.write(b'''import dbus
BUS_NAME = 'org.test.Things'
MAIN_OBJ = '/org/test/Things'
IS_OBJECT_MANAGER = True
SYSTEM_BUS = False

def load(mock, parameters):
    mock.AddObject('/org/test/Things/Thing1', 'org.test.Do', {'name': 'one'}, [])
    mock.AddObject('/org/test/Things/Thing2', 'org.test.Do', {'name': 'two'}, [])
    mock.AddObject('/org/test/Peer', 'org.test.Do', {'name': 'peer'}, [])
''')
            my_template.flush()
            (p_mock, dbus_objmgr) = self.spawn_server_template(
                my_template.name, stdout=subprocess.PIPE)
            # cleanups run in reverse order: close stdout, terminate, wait
            self.addCleanup(p_mock.wait)
            self.addCleanup(p_mock.terminate)
            self.addCleanup(p_mock.stdout.close)

            # should have the two Things, but not the Peer
            self.assertEqual(dbus_objmgr.GetManagedObjects(),
                             {'/org/test/Things/Thing1': {'org.test.Do': {'name': 'one'}},
                              '/org/test/Things/Thing2': {'org.test.Do': {'name': 'two'}}})

            # should appear in introspection
            xml = dbus_objmgr.Introspect()
            self.assertIn('<interface name="org.freedesktop.DBus.ObjectManager">', xml)
            self.assertIn('<method name="GetManagedObjects">', xml)
            self.assertIn('<node name="Thing1" />', xml)
            self.assertIn('<node name="Thing2" />', xml)

    def test_reset(self):
        '''Reset() puts the template back to pristine state'''

        (p_mock, obj_logind) = self.spawn_server_template(
            'logind', stdout=subprocess.PIPE)
        # cleanups run in reverse order: close stdout, terminate, wait
        self.addCleanup(p_mock.wait)
        self.addCleanup(p_mock.terminate)
        self.addCleanup(p_mock.stdout.close)

        # do some property, method, and object changes
        obj_logind.Set('org.freedesktop.login1.Manager', 'IdleAction', 'frob')
        mock_logind = dbus.Interface(obj_logind, dbusmock.MOCK_IFACE)
        mock_logind.AddProperty('org.Test.Other', 'walk', 'silly')
        mock_logind.AddMethod('', 'DoWalk', '', '', '')
        mock_logind.AddObject('/obj1', '', {}, [])

        mock_logind.Reset()

        # keeps the objects from the template
        dbus_con = self.get_dbus(system_bus=True)
        obj_logind = dbus_con.get_object('org.freedesktop.login1',
                                         '/org/freedesktop/login1')
        self.assertEqual(obj_logind.CanSuspend(), 'yes')

        # resets properties
        self.assertRaises(dbus.exceptions.DBusException,
                          obj_logind.GetAll, 'org.Test.Other')
        self.assertEqual(
            obj_logind.Get('org.freedesktop.login1.Manager', 'IdleAction'),
            'ignore')
        # resets methods
        self.assertRaises(dbus.exceptions.DBusException, obj_logind.DoWalk)
        # resets other objects
        obj1 = dbus_con.get_object('org.freedesktop.login1', '/obj1')
        self.assertRaises(dbus.exceptions.DBusException, obj1.GetAll, '')


class TestCleanup(dbusmock.DBusTestCase):
    '''Test cleanup of resources'''

    def test_mock_terminates_with_bus(self):
        '''Spawned mock processes exit when bus goes down'''

        self.start_session_bus()
        p_mock = self.spawn_server('org.freedesktop.Test',
                                   '/',
                                   'org.freedesktop.Test.Main')
        self.stop_dbus(self.session_bus_pid)

        # give the mock 2 seconds to terminate
        timeout = 20
        while timeout > 0:
            if p_mock.poll() is not None:
                break
            timeout -= 1
            time.sleep(0.1)

        if p_mock.poll() is None:
            # clean up manually
            p_mock.terminate()
            p_mock.wait()
            self.fail('mock process did not terminate after 2 seconds')

        self.assertEqual(p_mock.wait(), 0)


class TestSubclass(dbusmock.DBusTestCase):
    '''Test subclassing DBusMockObject'''

    @classmethod
    def setUpClass(cls):
        '''Start a private session bus for all subclassing tests.'''
        cls.start_session_bus()

    def test_ctor(self):
        '''Override DBusMockObject constructor'''

        class MyMock(dbusmock.mockobject.DBusMockObject):
            def __init__(self):
                bus_name = dbus.service.BusName('org.test.MyMock',
                                                dbusmock.testcase.DBusTestCase.get_dbus())
                dbusmock.mockobject.DBusMockObject.__init__(
                    self, bus_name, '/', 'org.test.A', {}, os.devnull)
                self.AddMethod('', 'Ping', '', 'i', 'ret = 42')

        m = MyMock()
        self.assertEqual(m.Ping(), 42)  # pylint: disable=no-member

    def test_none_props(self):
        '''object with None properties argument'''

        class MyMock(dbusmock.mockobject.DBusMockObject):
            def __init__(self):
                bus_name = dbus.service.BusName('org.test.MyMock',
                                                dbusmock.testcase.DBusTestCase.get_dbus())
                dbusmock.mockobject.DBusMockObject.__init__(
                    self, bus_name, '/mymock', 'org.test.MyMockI', None, os.devnull)
                self.AddMethod('', 'Ping', '', 'i', 'ret = 42')

        m = MyMock()
        self.assertEqual(m.Ping(), 42)  # pylint: disable=no-member
        self.assertEqual(m.GetAll('org.test.MyMockI'), {})

        m.AddProperty('org.test.MyMockI', 'blurb', 5)
        self.assertEqual(m.GetAll('org.test.MyMockI'), {'blurb': 5})


if __name__ == '__main__':
    # avoid writing to stderr
    unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout))