# coding: UTF-8
'''Mock D-Bus objects for test suites.'''

# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option) any
# later version.  See http://www.gnu.org/copyleft/lgpl.html for the full text
# of the license.

__author__ = 'Martin Pitt'
__copyright__ = '(c) 2012 Canonical Ltd.'

import copy
import importlib
import importlib.util
import os
import sys
import time
import types
from typing import Optional, Dict, Any, List, Tuple, Sequence, KeysView
from xml.etree import ElementTree

import dbus
import dbus.service

# we do not use this ourselves, but mock methods often want to use this
os  # pyflakes pylint: disable=pointless-statement

# global path -> DBusMockObject mapping
objects: Dict[str, 'DBusMockObject'] = {}

MOCK_IFACE = 'org.freedesktop.DBus.Mock'
OBJECT_MANAGER_IFACE = 'org.freedesktop.DBus.ObjectManager'


PropsType = Dict[str, Any]
# (in_signature, out_signature, code, dbus_wrapper_fn)
MethodType = Tuple[str, str, str, str]
# (timestamp, method_name, call_args)
CallLogType = Tuple[int, str, Sequence[Any]]


def load_module(name: str):
    '''Load a mock template Python module.

    name: Either the path of an existing *.py file, which is executed into a
          fresh module object, or the name of a module below
          dbusmock.templates, which is imported.

    Returns the loaded module. Raises ImportError if the template cannot be
    loaded.
    '''
    if os.path.exists(name) and os.path.splitext(name)[1] == '.py':
        mod_name = os.path.splitext(os.path.basename(name))[0]
        spec = importlib.util.spec_from_file_location(mod_name, name)
        # raise instead of assert: asserts are stripped under -O, and
        # AddTemplate() turns ImportError into a proper D-Bus error
        if spec is None:
            raise ImportError(f'cannot create a module spec for {name}')
        mod = importlib.util.module_from_spec(spec)
        with open(name, encoding="UTF-8") as f:
            exec(f.read(), mod.__dict__, mod.__dict__)  # pylint: disable=exec-used
        return mod

    return importlib.import_module('dbusmock.templates.' + name)


def _format_args(args):
    '''Format a D-Bus argument tuple into an appropriate logging string

    Returns '' for no arguments, otherwise the space separated formatted
    arguments with a leading space.
    '''

    def format_arg(a):
        # dbus.Boolean must be checked before int
        if isinstance(a, dbus.Boolean):
            return str(bool(a))
        if isinstance(a, (dbus.Byte, int)):
            return str(int(a))
        if isinstance(a, str):
            return '"' + str(a) + '"'
        if isinstance(a, list):
            return '[' + ', '.join([format_arg(x) for x in a]) + ']'
        if isinstance(a, dict):
            items = ', '.join(f'{format_arg(k)}: {format_arg(v)}' for k, v in a.items())
            return '{' + items + '}'

        # fallback
        return repr(a)

    formatted = ' '.join(format_arg(a) for a in args)
    return ' ' + formatted if formatted else ''


def _wrap_in_dbus_variant(value):
    '''Return value as a D-Bus value with variant_level=1

    dbus.types.Array values are returned unchanged. Raises DBusException for
    types which cannot be wrapped.
    '''
    dbus_types = [
        dbus.types.ByteArray,
        dbus.types.Int16,
        dbus.types.ObjectPath,
        dbus.types.Struct,
        dbus.types.UInt64,
        dbus.types.Boolean,
        dbus.types.Dictionary,
        dbus.types.Int32,
        dbus.types.Signature,
        dbus.types.UInt16,
        dbus.types.UnixFd,
        dbus.types.Byte,
        dbus.types.Double,
        dbus.types.Int64,
        dbus.types.String,
        dbus.types.UInt32,
    ]
    if isinstance(value, dbus.String):
        return dbus.String(str(value), variant_level=1)
    if isinstance(value, dbus.types.Array):
        return value
    if type(value) in dbus_types:
        # conjugate() only exists on the numeric types, where it yields the
        # plain number; the other listed types (ObjectPath, Signature, ...)
        # do not have it and are passed to their constructor as they are.
        # NOTE(review): re-wrapping a Struct/Dictionary this way does not
        # carry over an explicit signature -- confirm whether that matters.
        plain = value.conjugate() if hasattr(value, 'conjugate') else value
        return type(value)(plain, variant_level=1)
    if isinstance(value, str):
        return dbus.String(value, variant_level=1)
    raise dbus.exceptions.DBusException(f'could not wrap type {type(value)}')


class DBusMockObject(dbus.service.Object):  # pylint: disable=too-many-instance-attributes
    '''Mock D-Bus object

    This can be configured to have arbitrary methods (including code execution)
    and properties via methods on the org.freedesktop.DBus.Mock interface, so
    that you can control the mock from any programming language.
    '''

    def __init__(self, bus_name: str, path: str, interface: str, props: PropsType,
                 logfile: Optional[str] = None, is_object_manager: bool = False) -> None:
        '''Create a new DBusMockObject

        bus_name: A dbus.service.BusName instance where the object will be put on
        path: D-Bus object path
        interface: Primary D-Bus interface name of this object (where
                   properties and methods will be put on)
        props: A property_name (string) → property (Variant) map with initial
               properties on "interface"
        logfile: When given, method calls will be logged into that file name;
                 if None, logging will be written to stdout. Note that you can
                 also query the called methods over D-Bus with GetCalls() and
                 GetMethodCalls().
        is_object_manager: If True, the GetManagedObjects method will
                           automatically be implemented on the object, returning
                           all objects which have this one’s path as a prefix of
                           theirs. Note that the InterfacesAdded and
                           InterfacesRemoved signals will not be automatically
                           emitted.
        '''
        dbus.service.Object.__init__(self, bus_name, path)

        self.bus_name = bus_name
        self.path = path
        self.interface = interface
        self.is_object_manager = is_object_manager
        self.object_manager: Optional[DBusMockObject] = None

        self._template: Optional[str] = None
        self._template_parameters: Optional[PropsType] = None

        # pylint: disable=consider-using-with
        self.logfile = open(logfile, 'wb') if logfile else None
        self.is_logfile_owner = True
        self.call_log: List[CallLogType] = []

        if props is None:
            props = {}

        self._reset(props)

    def __del__(self) -> None:
        '''Close the log file if this object owns it.'''
        # the attributes do not exist if __init__() failed before setting them
        logfile = getattr(self, 'logfile', None)
        if logfile and getattr(self, 'is_logfile_owner', False):
            logfile.close()

    def _set_up_object_manager(self) -> None:
        '''Set up this mock object as a D-Bus ObjectManager.'''
        if self.path == '/':
            cond = 'k != \'/\''
        else:
            cond = f'k.startswith(\'{self.path}/\')'

        self.AddMethod(OBJECT_MANAGER_IFACE,
                       'GetManagedObjects', '', 'a{oa{sa{sv}}}',
                       'ret = {dbus.ObjectPath(k): objects[k].props ' +
                       '  for k in objects.keys() if ' + cond + '}')
        self.object_manager = self

    def _reset(self, props: PropsType) -> None:
        '''(Re-)initialize properties and methods of this object

        props: Initial properties for the main interface
        '''
        # interface -> name -> value
        self.props = {self.interface: props}

        # interface -> name -> (in_signature, out_signature, code, dbus_wrapper_fn)
        self.methods: Dict[str, Dict[str, MethodType]] = {self.interface: {}}

        if self.is_object_manager:
            self._set_up_object_manager()

    @dbus.service.method(dbus.PROPERTIES_IFACE,
                         in_signature='ss', out_signature='v')
    def Get(self, interface_name: str, property_name: str) -> Any:
        '''Standard D-Bus API for getting a property value'''

        self.log(f'Get {interface_name}.{property_name}')

        if not interface_name:
            interface_name = self.interface
        try:
            return self.GetAll(interface_name)[property_name]
        except KeyError as e:
            raise dbus.exceptions.DBusException(
                'no such property ' + property_name,
                name=self.interface + '.UnknownProperty') from e

    @dbus.service.method(dbus.PROPERTIES_IFACE,
                         in_signature='s', out_signature='a{sv}')
    def GetAll(self, interface_name: str, *_, **__) -> PropsType:
        '''Standard D-Bus API for getting all property values'''

        self.log('GetAll ' + interface_name)

        if not interface_name:
            interface_name = self.interface
        try:
            return self.props[interface_name]
        except KeyError as e:
            raise dbus.exceptions.DBusException(
                'no such interface ' + interface_name,
                name=self.interface + '.UnknownInterface') from e

    @dbus.service.method(dbus.PROPERTIES_IFACE,
                         in_signature='ssv', out_signature='')
    def Set(self, interface_name: str, property_name: str, value: Any, *_, **__) -> None:
        '''Standard D-Bus API for setting a property value

        An empty interface_name refers to the object's main interface, like in
        Get() and GetAll(). Emits PropertiesChanged for the changed property.
        '''

        self.log(f'Set {interface_name}.{property_name}{_format_args((value,))}')

        # consistent with Get() and GetAll()
        if not interface_name:
            interface_name = self.interface

        try:
            iface_props = self.props[interface_name]
        except KeyError as e:
            raise dbus.exceptions.DBusException(
                'no such interface ' + interface_name,
                name=self.interface + '.UnknownInterface') from e

        if property_name not in iface_props:
            raise dbus.exceptions.DBusException(
                'no such property ' + property_name,
                name=self.interface + '.UnknownProperty')

        iface_props[property_name] = value

        self.EmitSignal('org.freedesktop.DBus.Properties',
                        'PropertiesChanged',
                        'sa{sv}as',
                        [interface_name,
                         dbus.Dictionary({property_name: value}, signature='sv'),
                         dbus.Array([], signature='s')
                         ])

    @dbus.service.method(MOCK_IFACE,
                         in_signature='ssa{sv}a(ssss)',
                         out_signature='')
    def AddObject(self, path: str, interface: str, properties: PropsType, methods: List[MethodType]) -> None:
        '''Add a new D-Bus object to the mock

        path: D-Bus object path
        interface: Primary D-Bus interface name of this object (where
                   properties and methods will be put on)
        properties: A property_name (string) → value map with initial
                    properties on "interface"
        methods: An array of 4-tuples (name, in_sig, out_sig, code) describing
                 methods to add to "interface"; see AddMethod() for details of
                 the tuple values

        If this is a D-Bus ObjectManager instance, the InterfacesAdded signal
        will *not* be emitted for the object automatically; it must be emitted
        manually if desired. This is because AddInterface may be called after
        AddObject, but before the InterfacesAdded signal should be emitted.

        Example:
        dbus_proxy.AddObject('/com/example/Foo/Manager',
                             'com.example.Foo.Control',
                             {
                                 'state': dbus.String('online', variant_level=1),
                             },
                             [
                                 ('Start', '', '', ''),
                                 ('EchoInt', 'i', 'i', 'ret = args[0]'),
                                 ('GetClients', '', 'ao', 'ret = ["/com/example/Foo/Client1"]'),
                             ])
        '''
        if path in objects:
            raise dbus.exceptions.DBusException(f'object {path} already exists',
                                                name='org.freedesktop.DBus.Mock.NameError')

        obj = DBusMockObject(self.bus_name,
                             path,
                             interface,
                             properties)
        # make sure created objects inherit the log file stream
        obj.logfile = self.logfile
        obj.object_manager = self.object_manager
        obj.is_logfile_owner = False
        obj.AddMethods(interface, methods)

        objects[path] = obj

    @dbus.service.method(MOCK_IFACE,
                         in_signature='s',
                         out_signature='')
    def RemoveObject(self, path: str) -> None:  # pylint: disable=no-self-use
        '''Remove a D-Bus object from the mock

        As with AddObject, this will *not* emit the InterfacesRemoved signal if
        it’s an ObjectManager instance.
        '''
        try:
            objects[path].remove_from_connection()
            del objects[path]
        except KeyError as e:
            raise dbus.exceptions.DBusException(
                f'object {path} does not exist',
                name='org.freedesktop.DBus.Mock.NameError') from e

    @dbus.service.method(MOCK_IFACE,
                         in_signature='', out_signature='')
    def Reset(self) -> None:
        '''Reset the mock object state.

        Remove all mock objects from the bus and tidy up so the state is as if
        python-dbusmock had just been restarted. If the mock object was
        originally created with a template (from the command line, the Python
        API or by calling AddTemplate over D-Bus), it will be
        re-instantiated with that template.
        '''
        # Clear other existing objects.
        for obj_name, obj in objects.items():
            if obj_name != self.path:
                obj.remove_from_connection()
        objects.clear()

        # Reinitialise our state. Carefully remove new methods from our dict;
        # they don't actually exist if they are a statically defined
        # template function
        for method_name in self.methods[self.interface]:
            try:
                delattr(self.__class__, method_name)
            except AttributeError:
                pass

        self._reset({})

        if self._template is not None:
            self.AddTemplate(self._template, self._template_parameters)

        objects[self.path] = self

    @dbus.service.method(MOCK_IFACE,
                         in_signature='sssss',
                         out_signature='')
    def AddMethod(self, interface: str, name: str, in_sig: str, out_sig: str, code: str) -> None:
        '''Add a method to this object

        interface: D-Bus interface to add this to. For convenience you can
                   specify '' here to add the method to the object's main
                   interface (as specified on construction).
        name: Name of the method
        in_sig: Signature of input arguments; for example "ias" for a method
                that takes an int32 and a string array as arguments; see
                http://dbus.freedesktop.org/doc/dbus-specification.html#message-protocol-signatures
        out_sig: Signature of output arguments; for example "s" for a method
                 that returns a string; use '' for methods that do not return
                 anything.
        code: Python 3 code to run in the method call; you have access to the
              arguments through the "args" list, and can set the return value
              by assigning a value to the "ret" variable. You can also read the
              global "objects" variable, which is a dictionary mapping object
              paths to DBusMockObject instances.

              For keeping state across method calls, you are free to use normal
              Python members of the "self" object, which will be persistent for
              the whole mock's life time. E. g. you can have a method with
              "self.my_state = True", and another method that returns it with
              "ret = self.my_state".

              When specifying '', the method will not do anything (except
              logging) and return None.
        '''
        # pylint: disable=protected-access

        if not interface:
            interface = self.interface
        n_args = len(dbus.Signature(in_sig))

        # we need to have separate methods for dbus-python, so clone
        # mock_method(); using message_keyword with this dynamic approach fails
        # because inspect cannot handle those, so pass on interface and method
        # name as first positional arguments
        method = lambda self, *args, **kwargs: DBusMockObject.mock_method(
            self, interface, name, in_sig, *args, **kwargs)

        # we cannot specify in_signature here, as that trips over a consistency
        # check in dbus-python; we need to set it manually instead
        dbus_method = dbus.service.method(interface,
                                          out_signature=out_sig)(method)
        dbus_method.__name__ = str(name)
        dbus_method._dbus_in_signature = in_sig
        dbus_method._dbus_args = [f'arg{i}' for i in range(1, n_args + 1)]

        # for convenience, add mocked methods on the primary interface as
        # callable methods
        if interface == self.interface:
            setattr(self.__class__, name, dbus_method)

        self.methods.setdefault(interface, {})[str(name)] = (in_sig, out_sig, code, dbus_method)

    @dbus.service.method(MOCK_IFACE,
                         in_signature='sa(ssss)',
                         out_signature='')
    def AddMethods(self, interface: str, methods: List[MethodType]) -> None:
        '''Add several methods to this object

        interface: D-Bus interface to add this to. For convenience you can
                   specify '' here to add the method to the object's main
                   interface (as specified on construction).
        methods: list of 4-tuples (name, in_sig, out_sig, code) describing one
                 method each. See AddMethod() for details of the tuple values.
        '''
        for method in methods:
            self.AddMethod(interface, *method)

    def _set_property(self, interface, name, value):
        '''Store value as property "name" on "interface"

        The per-interface property map is created if it does not exist yet.
        '''
        # copy.copy removes one level of variant-ness, which means that the
        # types get exported in introspection data correctly, but we can't do
        # this for container types.
        if not isinstance(value, (dbus.Dictionary, dbus.Array)):
            value = copy.copy(value)

        self.props.setdefault(interface, {})[name] = value

    @dbus.service.method(MOCK_IFACE,
                         in_signature='sa{sv}',
                         out_signature='')
    def UpdateProperties(self, interface: str, properties: PropsType) -> None:
        '''Update properties on this object and send a PropertiesChanged signal

        interface: D-Bus interface to update this to. For convenience you can
                   specify '' here to add the property to the object's main
                   interface (as specified on construction).
        properties: A property_name (string) → value map

        Raises a DBusException if one of the properties does not exist yet;
        properties processed before the unknown one stay updated.
        '''
        # resolve the default once instead of on every iteration
        if not interface:
            interface = self.interface

        changed_props = {}

        for name, value in properties.items():
            if name not in self.props.get(interface, {}):
                raise dbus.exceptions.DBusException(f'property {name} not found',
                                                    name=interface + '.NoSuchProperty')

            self._set_property(interface, name, value)
            changed_props[name] = _wrap_in_dbus_variant(value)

        self.EmitSignal(dbus.PROPERTIES_IFACE, 'PropertiesChanged', 'sa{sv}as', [
            interface, changed_props, []])

    @dbus.service.method(MOCK_IFACE,
                         in_signature='ssv',
                         out_signature='')
    def AddProperty(self, interface: str, name: str, value: Any) -> None:
        '''Add property to this object

        interface: D-Bus interface to add this to. For convenience you can
                   specify '' here to add the property to the object's main
                   interface (as specified on construction).
        name: Property name.
        value: Property value.

        Raises a DBusException if the property already exists.
        '''
        if not interface:
            interface = self.interface
        if name in self.props.get(interface, {}):
            raise dbus.exceptions.DBusException(f'property {name} already exists',
                                                name=self.interface + '.PropertyExists')

        self._set_property(interface, name, value)

    @dbus.service.method(MOCK_IFACE,
                         in_signature='sa{sv}',
                         out_signature='')
    def AddProperties(self, interface: str, properties: PropsType) -> None:
        '''Add several properties to this object

        interface: D-Bus interface to add this to. For convenience you can
                   specify '' here to add the property to the object's main
                   interface (as specified on construction).
        properties: A property_name (string) → value map
        '''
        for k, v in properties.items():
            self.AddProperty(interface, k, v)

    @dbus.service.method(MOCK_IFACE,
                         in_signature='sa{sv}',
                         out_signature='')
    def AddTemplate(self, template: str, parameters: PropsType) -> None:
        '''Load a template into the mock.

        python-dbusmock ships a set of standard mocks for common system
        services such as UPower and NetworkManager. With these the actual tests
        become a lot simpler, as they only have to set up the particular
        properties for the tests, and not the skeleton of common properties,
        interfaces, and methods.

        template: Name of the template to load or the full path to a *.py file
                  for custom templates. See "pydoc dbusmock.templates" for a
                  list of available templates from python-dbusmock package, and
                  "pydoc dbusmock.templates.NAME" for documentation about
                  template NAME.
        parameters: A parameter (string) → value (variant) map, for
                    parameterizing templates. Each template can define their
                    own, see documentation of that particular template for
                    details.
        '''
        try:
            module = load_module(template)
        except ImportError as e:
            # chain the cause, like all other exception conversions in this file
            raise dbus.exceptions.DBusException(f'Cannot add template {template}: {str(e)}',
                                                name='org.freedesktop.DBus.Mock.TemplateError') from e

        # If the template specifies this is an ObjectManager, set that up
        if hasattr(module, 'IS_OBJECT_MANAGER') and module.IS_OBJECT_MANAGER:
            self._set_up_object_manager()

        # pick out all D-Bus service methods and add them to our interface
        for symbol in dir(module):
            # pylint: disable=protected-access
            fn = getattr(module, symbol)
            if ('_dbus_interface' in dir(fn) and ('_dbus_is_signal' not in dir(fn) or not fn._dbus_is_signal)):
                # for dbus-python compatibility, add methods as callables
                setattr(self.__class__, symbol, fn)
                self.methods.setdefault(fn._dbus_interface, {})[str(symbol)] = (
                    fn._dbus_in_signature,
                    fn._dbus_out_signature, '', fn
                )

        if parameters is None:
            parameters = {}

        module.load(self, parameters)
        # save the given template and parameters for re-instantiation on
        # Reset()
        self._template = template
        self._template_parameters = parameters

    @dbus.service.method(MOCK_IFACE,
                         in_signature='sssav',
                         out_signature='')
    def EmitSignal(self, interface: str, name: str, signature: str, args: List[Any]) -> None:
        '''Emit a signal from the object.

        interface: D-Bus interface to send the signal from. For convenience you
                   can specify '' here to add the method to the object's main
                   interface (as specified on construction).
        name: Name of the signal
        signature: Signature of input arguments; for example "ias" for a signal
                   that takes an int32 and a string array as arguments; see
                   http://dbus.freedesktop.org/doc/dbus-specification.html#message-protocol-signatures
        args: variant array with signal arguments; must match order and type in
              "signature"
        '''
        # pylint: disable=protected-access
        if not interface:
            interface = self.interface

        # convert types of arguments according to signature, using
        # MethodCallMessage.append(); this will also provide type/length
        # checks, except for the case of an empty signature
        if signature == '' and len(args) > 0:
            raise TypeError('Fewer items found in D-Bus signature than in Python arguments')
        m = dbus.connection.MethodCallMessage('a.b', '/a', 'a.b', 'a')
        m.append(*args, signature=signature)
        args = m.get_args_list()

        # the signal function body only logs; the dbus.service.signal wrapper
        # is what sends the signal
        fn = lambda self, *args: self.log(f'emit {interface}.{name}{_format_args(args)}')
        fn.__name__ = str(name)
        dbus_fn = dbus.service.signal(interface)(fn)
        dbus_fn._dbus_signature = signature
        dbus_fn._dbus_args = [f'arg{i}' for i in range(1, len(args) + 1)]

        dbus_fn(self, *args)

    @dbus.service.method(MOCK_IFACE,
                         in_signature='',
                         out_signature='a(tsav)')
    def GetCalls(self) -> List[CallLogType]:
        '''List all the logged calls since the last call to ClearCalls().

        Return a list of (timestamp, method_name, args_list) tuples.
        '''
        return self.call_log

    @dbus.service.method(MOCK_IFACE,
                         in_signature='s',
                         out_signature='a(tav)')
    def GetMethodCalls(self, method: str) -> List[Tuple[int, Sequence[Any]]]:
        '''List all the logged calls of a particular method.

        Return a list of (timestamp, args_list) tuples.
        '''
        return [(row[0], row[2]) for row in self.call_log if row[1] == method]

    @dbus.service.method(MOCK_IFACE,
                         in_signature='',
                         out_signature='')
    def ClearCalls(self) -> None:
        '''Empty the log of mock call signatures.'''

        self.call_log = []

    @dbus.service.signal(MOCK_IFACE, signature='sav')
    def MethodCalled(self, name, args):
        '''Signal emitted for every called mock method.

        This is emitted for all mock method calls.  This can be used to confirm
        that a particular method was called with particular arguments, as an
        alternative to reading the mock's log or GetCalls().
        '''

    def object_manager_emit_added(self, path: str) -> None:
        '''Emit ObjectManager.InterfacesAdded signal'''

        if self.object_manager is not None:
            self.object_manager.EmitSignal(OBJECT_MANAGER_IFACE, 'InterfacesAdded',
                                           'oa{sa{sv}}', [dbus.ObjectPath(path),
                                                          objects[path].props])

    def object_manager_emit_removed(self, path: str) -> None:
        '''Emit ObjectManager.InterfacesRemoved signal'''

        if self.object_manager is not None:
            self.object_manager.EmitSignal(OBJECT_MANAGER_IFACE, 'InterfacesRemoved',
                                           'oas', [dbus.ObjectPath(path),
                                                   objects[path].props])

    def mock_method(self, interface: str, dbus_method: str, in_signature: str, *args, **_) -> Any:
        '''Master mock method.

        This gets "instantiated" in AddMethod(). Execute the code snippet of
        the method and return the "ret" variable if it was set.
        '''
        # print('mock_method', dbus_method, self, in_signature, args, kwargs, file=sys.stderr)

        # convert types of arguments according to signature, using
        # MethodCallMessage.append(); this will also provide type/length
        # checks, except for the case of an empty signature
        if in_signature == '' and len(args) > 0:
            raise TypeError('Fewer items found in D-Bus signature than in Python arguments')
        m = dbus.connection.MethodCallMessage('a.b', '/a', 'a.b', 'a')
        m.append(*args, signature=in_signature)
        args = m.get_args_list()

        self.log(dbus_method + _format_args(args))
        self.call_log.append((int(time.time()), str(dbus_method), args))
        self.MethodCalled(dbus_method, args)

        # The code may be a Python 3 string to interpret, or may be a function
        # object (if AddMethod was called from within Python itself, rather than
        # over D-Bus).
        code = self.methods[interface][dbus_method][2]
        if code and isinstance(code, types.FunctionType):
            return code(self, *args)
        if code:
            # NOTE: this runs code supplied through AddMethod(); that is the
            # documented purpose of the mock. The snippet sees a copy of this
            # method's locals (self, args, ...) and the module globals.
            loc = locals().copy()
            exec(code, globals(), loc)  # pylint: disable=exec-used
            if 'ret' in loc:
                return loc['ret']

        return None

    def log(self, msg: str) -> None:
        '''Log a message, prefixed with a timestamp.

        If a log file was specified in the constructor, it is written there,
        otherwise it goes to stdout.
        '''
        if self.logfile:
            fd = self.logfile.fileno()
        else:
            fd = sys.stdout.fileno()

        os.write(fd, f'{time.time():.3f} {msg}\n'.encode('UTF-8'))

    @dbus.service.method(dbus.INTROSPECTABLE_IFACE,
                         in_signature='',
                         out_signature='s',
                         path_keyword='object_path',
                         connection_keyword='connection')
    def Introspect(self, object_path: str, connection: dbus.connection.Connection) -> str:
        '''Return XML description of this object's interfaces, methods and signals.

        This wraps dbus-python's Introspect() method to include the dynamic
        methods and properties.
        '''
        # _dbus_class_table is an indirect private member of dbus.service.Object that pylint fails to see
        # pylint: disable=no-member

        # temporarily add our dynamic methods
        cls = self.__class__.__module__ + '.' + self.__class__.__name__
        orig_interfaces = self._dbus_class_table[cls]

        # Copy the per-interface tables as well: with a shallow copy, adding a
        # dynamic method to an already existing interface would modify the
        # original table, and restoring the class table would not undo that.
        mock_interfaces = {iface: dict(methods) for iface, methods in orig_interfaces.items()}
        for iface, methods in self.methods.items():
            for method, impl in methods.items():
                mock_interfaces.setdefault(iface, {})[method] = impl[3]
        self._dbus_class_table[cls] = mock_interfaces

        try:
            xml = dbus.service.Object.Introspect(self, object_path, connection)
        finally:
            # restore original class table, also if introspection failed
            self._dbus_class_table[cls] = orig_interfaces

        tree = ElementTree.fromstring(xml)

        for name, name_props in self.props.items():
            # We might have properties for new interfaces we don't know about
            # yet. Try to find an existing <interface> node named after our
            # interface to append to, and create one if we can't.
            interface = tree.find(f".//interface[@name='{name}']")
            if interface is None:
                interface = ElementTree.Element("interface", {"name": name})
                tree.append(interface)

            for prop, val in name_props.items():
                if val is None:
                    # can't guess type from None, skip
                    continue
                elem = ElementTree.Element("property", {
                    "name": prop,
                    # We don't store the signature anywhere, so guess it.
                    "type": dbus.lowlevel.Message.guess_signature(val),
                    "access": "readwrite"})

                interface.append(elem)

        return ElementTree.tostring(tree, encoding='utf8', method='xml').decode('utf8')


# Overwrite dbus-python's _method_lookup(), as that offers no way to have the
# same method name on different interfaces
orig_method_lookup = dbus.service._method_lookup  # pylint: disable=protected-access


def _dbusmock_method_lookup(obj, method_name, dbus_interface):
    '''Look up a method in obj.methods, falling back to dbus-python's lookup

    An empty dbus_interface refers to the object's main interface.
    '''
    try:
        m = obj.methods[dbus_interface or obj.interface][method_name]
        return (m[3], m[3])
    except KeyError:
        return orig_method_lookup(obj, method_name, dbus_interface)


dbus.service._method_lookup = _dbusmock_method_lookup  # pylint: disable=protected-access


#
# Helper API for templates
#


def get_objects() -> KeysView[str]:
    '''Return all existing object paths'''

    return objects.keys()


def get_object(path: str) -> DBusMockObject:
    '''Return object for a given object path

    Raises KeyError if there is no object at that path.
    '''

    return objects[path]