1# coding: UTF-8
2'''Mock D-Bus objects for test suites.'''
3
4# This program is free software; you can redistribute it and/or modify it under
5# the terms of the GNU Lesser General Public License as published by the Free
6# Software Foundation; either version 3 of the License, or (at your option) any
7# later version.  See http://www.gnu.org/copyleft/lgpl.html for the full text
8# of the license.
9
10__author__ = 'Martin Pitt'
11__copyright__ = '(c) 2012 Canonical Ltd.'
12
13import copy
14import importlib
15import importlib.util
16import os
17import sys
18import time
19import types
20from typing import Optional, Dict, Any, List, Tuple, Sequence, KeysView
21from xml.etree import ElementTree
22
23import dbus
24import dbus.service
25
26# we do not use this ourselves, but mock methods often want to use this
27os  # pyflakes pylint: disable=pointless-statement
28
# Global registry of mock objects: D-Bus object path -> DBusMockObject.
# Method code snippets run by DBusMockObject.mock_method() can read it, and
# templates access it through get_objects()/get_object().
objects: Dict[str, 'DBusMockObject'] = {}

# Interface on which the mock control API (AddObject, AddMethod, ...) is
# exported
MOCK_IFACE = 'org.freedesktop.DBus.Mock'
# Interface used for GetManagedObjects and the InterfacesAdded/
# InterfacesRemoved signals
OBJECT_MANAGER_IFACE = 'org.freedesktop.DBus.ObjectManager'


# property name -> value
PropsType = Dict[str, Any]
# This alias is used for two different 4-tuples:
#  - entries of DBusMockObject.methods:
#    (in_signature, out_signature, code, dbus_wrapper_fn); note that the last
#    element stored there is the wrapped callable, not a string
#  - method descriptions passed to AddObject()/AddMethods():
#    (name, in_signature, out_signature, code)
MethodType = Tuple[str, str, str, str]
# (timestamp, method_name, call_args)
CallLogType = Tuple[int, str, Sequence[Any]]
41
42
def load_module(name: str):
    '''Load a mock template Python module.

    name: Either the path to an existing "*.py" file, which gets loaded as a
          custom template, or the name of a template module in
          dbusmock/templates/.

    Returns the loaded module object.

    Raises ImportError if the template cannot be located or loaded.
    '''
    if os.path.exists(name) and os.path.splitext(name)[1] == '.py':
        mod_name = os.path.splitext(os.path.basename(name))[0]
        spec = importlib.util.spec_from_file_location(mod_name, name)
        # Do not use assert for this check: it is stripped under -O, and
        # callers such as AddTemplate() handle ImportError.
        if spec is None:
            raise ImportError(f'cannot load template from {name}', path=name)
        mod = importlib.util.module_from_spec(spec)
        with open(name, encoding="UTF-8") as f:
            source = f.read()
        # Compile with the real file name so that tracebacks from template
        # code point to the template file instead of "<string>".
        code = compile(source, name, 'exec')
        exec(code, mod.__dict__, mod.__dict__)  # pylint: disable=exec-used
        return mod

    return importlib.import_module('dbusmock.templates.' + name)
55
56
57def _format_args(args):
58    '''Format a D-Bus argument tuple into an appropriate logging string'''
59
60    def format_arg(a):
61        if isinstance(a, dbus.Boolean):
62            return str(bool(a))
63        if isinstance(a, (dbus.Byte, int)):
64            return str(int(a))
65        if isinstance(a, str):
66            return '"' + str(a) + '"'
67        if isinstance(a, list):
68            return '[' + ', '.join([format_arg(x) for x in a]) + ']'
69        if isinstance(a, dict):
70            fmta = '{'
71            first = True
72            for k, v in a.items():
73                if first:
74                    first = False
75                else:
76                    fmta += ', '
77                fmta += format_arg(k) + ': ' + format_arg(v)
78            return fmta + '}'
79
80        # fallback
81        return repr(a)
82
83    s = ''
84    for a in args:
85        if s:
86            s += ' '
87        s += format_arg(a)
88    if s:
89        s = ' ' + s
90    return s
91
92
def _wrap_in_dbus_variant(value):
    '''Return a copy of value with variant_level=1

    dbus.String values, plain Python strings and values whose exact type is
    one of the dbus types listed below are re-created with variant_level=1.
    dbus.Array values are returned unchanged.

    Raises dbus.exceptions.DBusException for any other type.
    '''
    # numeric types: conjugate() yields the plain Python number
    numeric_types = (
        dbus.types.Boolean,
        dbus.types.Byte,
        dbus.types.Double,
        dbus.types.Int16,
        dbus.types.Int32,
        dbus.types.Int64,
        dbus.types.UInt16,
        dbus.types.UInt32,
        dbus.types.UInt64,
    )
    # string/bytes based types: these have no conjugate(), so they are
    # re-created from the value itself
    string_like_types = (
        dbus.types.ByteArray,
        dbus.types.ObjectPath,
        dbus.types.Signature,
    )
    # containers which carry a signature that has to be preserved
    container_types = (
        dbus.types.Dictionary,
        dbus.types.Struct,
    )

    if isinstance(value, dbus.String):
        return dbus.String(str(value), variant_level=1)
    if isinstance(value, dbus.types.Array):
        return value

    # exact type match (not isinstance), so that subclasses are not silently
    # converted
    value_type = type(value)
    if value_type in numeric_types:
        return value_type(value.conjugate(), variant_level=1)
    if value_type in string_like_types:
        return value_type(value, variant_level=1)
    if value_type in container_types:
        return value_type(value, signature=value.signature, variant_level=1)

    if isinstance(value, str):
        return dbus.String(value, variant_level=1)
    raise dbus.exceptions.DBusException(f'could not wrap type {type(value)}')
121
122
class DBusMockObject(dbus.service.Object):  # pylint: disable=too-many-instance-attributes
    '''Mock D-Bus object

    This can be configured to have arbitrary methods (including code execution)
    and properties via methods on the org.freedesktop.DBus.Mock interface, so
    that you can control the mock from any programming language.
    '''

    def __init__(self, bus_name: str, path: str, interface: str, props: Optional[PropsType],
                 logfile: Optional[str] = None, is_object_manager: bool = False) -> None:
        '''Create a new DBusMockObject

        bus_name: A dbus.service.BusName instance where the object will be put on
        path: D-Bus object path
        interface: Primary D-Bus interface name of this object (where
                   properties and methods will be put on)
        props: A property_name (string) → property (Variant) map with initial
               properties on "interface"; None is treated like an empty map
        logfile: When given, method calls will be logged into that file name;
                 if None, logging will be written to stdout. Note that you can
                 also query the called methods over D-Bus with GetCalls() and
                 GetMethodCalls().
        is_object_manager: If True, the GetManagedObjects method will
                           automatically be implemented on the object, returning
                           all objects which have this one’s path as a prefix of
                           theirs. Note that the InterfacesAdded and
                           InterfacesRemoved signals will not be automatically
                           emitted.
        '''
        dbus.service.Object.__init__(self, bus_name, path)

        self.bus_name = bus_name
        self.path = path
        self.interface = interface
        self.is_object_manager = is_object_manager
        self.object_manager: Optional[DBusMockObject] = None

        # template name and parameters of the last AddTemplate() call, for
        # re-instantiation in Reset()
        self._template: Optional[str] = None
        self._template_parameters: Optional[PropsType] = None

        # pylint: disable=consider-using-with
        self.logfile = open(logfile, 'wb') if logfile else None
        # objects created through AddObject() share the log file of their
        # creator and must not close it
        self.is_logfile_owner = True
        self.call_log: List[CallLogType] = []

        if props is None:
            props = {}

        self._reset(props)

    def __del__(self) -> None:
        # __init__ may have failed before these attributes were set, so do
        # not assume that they exist
        logfile = getattr(self, 'logfile', None)
        if logfile and getattr(self, 'is_logfile_owner', False):
            logfile.close()

    def _set_up_object_manager(self) -> None:
        '''Set up this mock object as a D-Bus ObjectManager.

        This adds a GetManagedObjects method which returns the properties of
        all objects below this object's path.
        '''
        # condition (as code) which selects the managed paths from the global
        # "objects" map
        if self.path == '/':
            cond = 'k != \'/\''
        else:
            cond = f'k.startswith(\'{self.path}/\')'

        self.AddMethod(OBJECT_MANAGER_IFACE,
                       'GetManagedObjects', '', 'a{oa{sa{sv}}}',
                       'ret = {dbus.ObjectPath(k): objects[k].props ' +
                       '  for k in objects.keys() if ' + cond + '}')
        self.object_manager = self

    def _reset(self, props: PropsType) -> None:
        '''(Re-)initialize properties and methods of this object

        props: initial properties on the primary interface
        '''
        # interface -> name -> value
        self.props = {self.interface: props}

        # interface -> name -> (in_signature, out_signature, code, dbus_wrapper_fn)
        self.methods: Dict[str, Dict[str, MethodType]] = {self.interface: {}}

        if self.is_object_manager:
            self._set_up_object_manager()

    @dbus.service.method(dbus.PROPERTIES_IFACE,
                         in_signature='ss', out_signature='v')
    def Get(self, interface_name: str, property_name: str) -> Any:
        '''Standard D-Bus API for getting a property value

        An empty interface_name refers to the object's primary interface.
        Raises a DBusException if the interface or the property is unknown.
        '''

        self.log(f'Get {interface_name}.{property_name}')

        if not interface_name:
            interface_name = self.interface
        try:
            return self.GetAll(interface_name)[property_name]
        except KeyError as e:
            raise dbus.exceptions.DBusException(
                'no such property ' + property_name,
                name=self.interface + '.UnknownProperty') from e

    @dbus.service.method(dbus.PROPERTIES_IFACE,
                         in_signature='s', out_signature='a{sv}')
    def GetAll(self, interface_name: str, *_, **__) -> PropsType:
        '''Standard D-Bus API for getting all property values

        An empty interface_name refers to the object's primary interface.
        Raises a DBusException if the interface is unknown.
        '''

        self.log('GetAll ' + interface_name)

        if not interface_name:
            interface_name = self.interface
        try:
            return self.props[interface_name]
        except KeyError as e:
            raise dbus.exceptions.DBusException(
                'no such interface ' + interface_name,
                name=self.interface + '.UnknownInterface') from e

    @dbus.service.method(dbus.PROPERTIES_IFACE,
                         in_signature='ssv', out_signature='')
    def Set(self, interface_name: str, property_name: str, value: Any, *_, **__) -> None:
        '''Standard D-Bus API for setting a property value

        An empty interface_name refers to the object's primary interface.
        Only existing properties can be set; a PropertiesChanged signal is
        emitted for the changed property. Raises a DBusException if the
        interface or the property is unknown.
        '''

        self.log(f'Set {interface_name}.{property_name}{_format_args((value,))}')

        # same convenience as in Get() and GetAll()
        if not interface_name:
            interface_name = self.interface

        try:
            iface_props = self.props[interface_name]
        except KeyError as e:
            raise dbus.exceptions.DBusException(
                'no such interface ' + interface_name,
                name=self.interface + '.UnknownInterface') from e

        if property_name not in iface_props:
            raise dbus.exceptions.DBusException(
                'no such property ' + property_name,
                name=self.interface + '.UnknownProperty')

        iface_props[property_name] = value

        self.EmitSignal(dbus.PROPERTIES_IFACE,
                        'PropertiesChanged',
                        'sa{sv}as',
                        [interface_name,
                         dbus.Dictionary({property_name: value}, signature='sv'),
                         dbus.Array([], signature='s')])

    @dbus.service.method(MOCK_IFACE,
                         in_signature='ssa{sv}a(ssss)',
                         out_signature='')
    def AddObject(self, path: str, interface: str, properties: PropsType, methods: List[MethodType]) -> None:
        '''Add a new D-Bus object to the mock

        path: D-Bus object path
        interface: Primary D-Bus interface name of this object (where
                   properties and methods will be put on)
        properties: A property_name (string) → value map with initial
                    properties on "interface"
        methods: An array of 4-tuples (name, in_sig, out_sig, code) describing
                 methods to add to "interface"; see AddMethod() for details of
                 the tuple values

        If this is a D-Bus ObjectManager instance, the InterfacesAdded signal
        will *not* be emitted for the object automatically; it must be emitted
        manually if desired. This is because AddInterface may be called after
        AddObject, but before the InterfacesAdded signal should be emitted.

        Raises a DBusException if an object with that path already exists.

        Example:
        dbus_proxy.AddObject('/com/example/Foo/Manager',
                             'com.example.Foo.Control',
                             {
                                 'state': dbus.String('online', variant_level=1),
                             },
                             [
                                 ('Start', '', '', ''),
                                 ('EchoInt', 'i', 'i', 'ret = args[0]'),
                                 ('GetClients', '', 'ao', 'ret = ["/com/example/Foo/Client1"]'),
                             ])
        '''
        if path in objects:
            raise dbus.exceptions.DBusException(f'object {path} already exists', name='org.freedesktop.DBus.Mock.NameError')

        obj = DBusMockObject(self.bus_name,
                             path,
                             interface,
                             properties)
        # make sure created objects inherit the log file stream
        obj.logfile = self.logfile
        obj.object_manager = self.object_manager
        obj.is_logfile_owner = False
        obj.AddMethods(interface, methods)

        objects[path] = obj

    @dbus.service.method(MOCK_IFACE,
                         in_signature='s',
                         out_signature='')
    def RemoveObject(self, path: str) -> None:  # pylint: disable=no-self-use
        '''Remove a D-Bus object from the mock

        As with AddObject, this will *not* emit the InterfacesRemoved signal if
        it’s an ObjectManager instance.

        Raises a DBusException if there is no object with that path.
        '''
        # keep the try body limited to the lookup, so that a KeyError from
        # elsewhere is not misreported as a missing object
        try:
            obj = objects[path]
        except KeyError as e:
            raise dbus.exceptions.DBusException(
                f'object {path} does not exist',
                name='org.freedesktop.DBus.Mock.NameError') from e

        obj.remove_from_connection()
        del objects[path]

    @dbus.service.method(MOCK_IFACE,
                         in_signature='', out_signature='')
    def Reset(self) -> None:
        '''Reset the mock object state.

        Remove all mock objects from the bus and tidy up so the state is as if
        python-dbusmock had just been restarted. If the mock object was
        originally created with a template (from the command line, the Python
        API or by calling AddTemplate over D-Bus), it will be
        re-instantiated with that template.
        '''
        # Clear other existing objects.
        for obj_name, obj in objects.items():
            if obj_name != self.path:
                obj.remove_from_connection()
        objects.clear()

        # Reinitialise our state. Carefully remove new methods from our dict;
        # they do not actually exist if they are a statically defined
        # template function
        for method_name in self.methods[self.interface]:
            try:
                delattr(self.__class__, method_name)
            except AttributeError:
                pass

        self._reset({})

        if self._template is not None:
            self.AddTemplate(self._template, self._template_parameters)

        objects[self.path] = self

    @dbus.service.method(MOCK_IFACE,
                         in_signature='sssss',
                         out_signature='')
    def AddMethod(self, interface: str, name: str, in_sig: str, out_sig: str, code: str) -> None:
        '''Add a method to this object

        interface: D-Bus interface to add this to. For convenience you can
                   specify '' here to add the method to the object's main
                   interface (as specified on construction).
        name: Name of the method
        in_sig: Signature of input arguments; for example "ias" for a method
                that takes an int32 and a string array as arguments; see
                http://dbus.freedesktop.org/doc/dbus-specification.html#message-protocol-signatures
        out_sig: Signature of output arguments; for example "s" for a method
                 that returns a string; use '' for methods that do not return
                 anything.
        code: Python 3 code to run in the method call; you have access to the
              arguments through the "args" list, and can set the return value
              by assigning a value to the "ret" variable. You can also read the
              global "objects" variable, which is a dictionary mapping object
              paths to DBusMockObject instances.

              For keeping state across method calls, you are free to use normal
              Python members of the "self" object, which will be persistent for
              the whole mock's life time. E. g. you can have a method with
              "self.my_state = True", and another method that returns it with
              "ret = self.my_state".

              When specifying '', the method will not do anything (except
              logging) and return None.
        '''
        # pylint: disable=protected-access

        if not interface:
            interface = self.interface
        n_args = len(dbus.Signature(in_sig))

        # we need to have separate methods for dbus-python, so clone
        # mock_method(); using message_keyword with this dynamic approach fails
        # because inspect cannot handle those, so pass on interface and method
        # name as first positional arguments
        method = lambda self, *args, **kwargs: DBusMockObject.mock_method(
            self, interface, name, in_sig, *args, **kwargs)

        # we cannot specify in_signature here, as that trips over a consistency
        # check in dbus-python; we need to set it manually instead
        dbus_method = dbus.service.method(interface,
                                          out_signature=out_sig)(method)
        dbus_method.__name__ = str(name)
        dbus_method._dbus_in_signature = in_sig
        dbus_method._dbus_args = [f'arg{i}' for i in range(1, n_args + 1)]

        # for convenience, add mocked methods on the primary interface as
        # callable methods
        if interface == self.interface:
            setattr(self.__class__, name, dbus_method)

        self.methods.setdefault(interface, {})[str(name)] = (in_sig, out_sig, code, dbus_method)

    @dbus.service.method(MOCK_IFACE,
                         in_signature='sa(ssss)',
                         out_signature='')
    def AddMethods(self, interface: str, methods: List[MethodType]) -> None:
        '''Add several methods to this object

        interface: D-Bus interface to add this to. For convenience you can
                   specify '' here to add the method to the object's main
                   interface (as specified on construction).
        methods: list of 4-tuples (name, in_sig, out_sig, code) describing one
                 method each. See AddMethod() for details of the tuple values.
        '''
        for method in methods:
            self.AddMethod(interface, *method)

    def _set_property(self, interface, name, value):
        '''Store a property value, creating the interface entry if necessary'''
        # copy.copy removes one level of variant-ness, which means that the
        # types get exported in introspection data correctly, but we can't do
        # this for container types.
        if not isinstance(value, (dbus.Dictionary, dbus.Array)):
            value = copy.copy(value)

        self.props.setdefault(interface, {})[name] = value

    @dbus.service.method(MOCK_IFACE,
                         in_signature='sa{sv}',
                         out_signature='')
    def UpdateProperties(self, interface: str, properties: PropsType) -> None:
        '''Update properties on this object and send a PropertiesChanged signal

        interface: D-Bus interface to update this to. For convenience you can
                   specify '' here to add the property to the object's main
                   interface (as specified on construction).
        properties: A property_name (string) → value map

        Raises a DBusException if one of the properties does not exist or
        cannot be wrapped into a variant; in that case no property is changed
        and no signal is emitted.
        '''
        # resolve the interface up front, so that the emitted signal carries
        # the real interface name even for an empty properties map
        if not interface:
            interface = self.interface

        # validate and convert everything before touching any state, to avoid
        # half-applied updates
        known_props = self.props.get(interface, {})
        for name in properties:
            if name not in known_props:
                raise dbus.exceptions.DBusException(f'property {name} not found', name=interface + '.NoSuchProperty')

        changed_props = {name: _wrap_in_dbus_variant(value)
                         for name, value in properties.items()}

        for name, value in properties.items():
            self._set_property(interface, name, value)

        self.EmitSignal(dbus.PROPERTIES_IFACE, 'PropertiesChanged', 'sa{sv}as', [
            interface, changed_props, []])

    @dbus.service.method(MOCK_IFACE,
                         in_signature='ssv',
                         out_signature='')
    def AddProperty(self, interface: str, name: str, value: Any) -> None:
        '''Add property to this object

        interface: D-Bus interface to add this to. For convenience you can
                   specify '' here to add the property to the object's main
                   interface (as specified on construction).
        name: Property name.
        value: Property value.

        Raises a DBusException if the property already exists.
        '''
        if not interface:
            interface = self.interface
        if name in self.props.get(interface, {}):
            raise dbus.exceptions.DBusException(f'property {name} already exists', name=self.interface + '.PropertyExists')

        self._set_property(interface, name, value)

    @dbus.service.method(MOCK_IFACE,
                         in_signature='sa{sv}',
                         out_signature='')
    def AddProperties(self, interface: str, properties: PropsType) -> None:
        '''Add several properties to this object

        interface: D-Bus interface to add this to. For convenience you can
                   specify '' here to add the property to the object's main
                   interface (as specified on construction).
        properties: A property_name (string) → value map
        '''
        for k, v in properties.items():
            self.AddProperty(interface, k, v)

    @dbus.service.method(MOCK_IFACE,
                         in_signature='sa{sv}',
                         out_signature='')
    def AddTemplate(self, template: str, parameters: PropsType) -> None:
        '''Load a template into the mock.

        python-dbusmock ships a set of standard mocks for common system
        services such as UPower and NetworkManager. With these the actual tests
        become a lot simpler, as they only have to set up the particular
        properties for the tests, and not the skeleton of common properties,
        interfaces, and methods.

        template: Name of the template to load or the full path to a *.py file
                  for custom templates. See "pydoc dbusmock.templates" for a
                  list of available templates from python-dbusmock package, and
                  "pydoc dbusmock.templates.NAME" for documentation about
                  template NAME.
        parameters: A parameter (string) → value (variant) map, for
                    parameterizing templates. Each template can define their
                    own, see documentation of that particular template for
                    details.

        Raises a DBusException if the template cannot be imported.
        '''
        try:
            module = load_module(template)
        except ImportError as e:
            raise dbus.exceptions.DBusException(f'Cannot add template {template}: {str(e)}',
                                                name='org.freedesktop.DBus.Mock.TemplateError') from e

        # If the template specifies this is an ObjectManager, set that up
        if getattr(module, 'IS_OBJECT_MANAGER', False):
            self._set_up_object_manager()

        # pick out all D-Bus service methods and add them to our interface
        for symbol in dir(module):
            # pylint: disable=protected-access
            fn = getattr(module, symbol)
            # deliberately check dir() instead of hasattr(), so that objects
            # which synthesize attributes on access are not picked up
            fn_attrs = dir(fn)
            if '_dbus_interface' not in fn_attrs:
                continue
            if '_dbus_is_signal' in fn_attrs and fn._dbus_is_signal:
                continue

            # for dbus-python compatibility, add methods as callables
            setattr(self.__class__, symbol, fn)
            self.methods.setdefault(fn._dbus_interface, {})[str(symbol)] = (
                fn._dbus_in_signature,
                fn._dbus_out_signature, '', fn
            )

        if parameters is None:
            parameters = {}

        module.load(self, parameters)
        # save the given template and parameters for re-instantiation on
        # Reset()
        self._template = template
        self._template_parameters = parameters

    @dbus.service.method(MOCK_IFACE,
                         in_signature='sssav',
                         out_signature='')
    def EmitSignal(self, interface: str, name: str, signature: str, args: List[Any]) -> None:
        '''Emit a signal from the object.

        interface: D-Bus interface to send the signal from. For convenience you
                   can specify '' here to add the method to the object's main
                   interface (as specified on construction).
        name: Name of the signal
        signature: Signature of input arguments; for example "ias" for a signal
                that takes an int32 and a string array as arguments; see
                http://dbus.freedesktop.org/doc/dbus-specification.html#message-protocol-signatures
        args: variant array with signal arguments; must match order and type in
              "signature"

        Raises TypeError if arguments are given for an empty signature.
        '''
        # pylint: disable=protected-access
        if not interface:
            interface = self.interface

        # convert types of arguments according to signature, using
        # MethodCallMessage.append(); this will also provide type/length
        # checks, except for the case of an empty signature
        if signature == '' and len(args) > 0:
            raise TypeError('Fewer items found in D-Bus signature than in Python arguments')
        m = dbus.connection.MethodCallMessage('a.b', '/a', 'a.b', 'a')
        m.append(signature=signature, *args)
        args = m.get_args_list()

        # build a one-off signal function with the requested name and
        # signature; its body only logs the emission
        fn = lambda self, *args: self.log(f'emit {interface}.{name}{_format_args(args)}')
        fn.__name__ = str(name)
        dbus_fn = dbus.service.signal(interface)(fn)
        dbus_fn._dbus_signature = signature
        dbus_fn._dbus_args = [f'arg{i}' for i in range(1, len(args) + 1)]

        dbus_fn(self, *args)

    @dbus.service.method(MOCK_IFACE,
                         in_signature='',
                         out_signature='a(tsav)')
    def GetCalls(self) -> List[CallLogType]:
        '''List all the logged calls since the last call to ClearCalls().

        Return a list of (timestamp, method_name, args_list) tuples.
        '''
        return self.call_log

    @dbus.service.method(MOCK_IFACE,
                         in_signature='s',
                         out_signature='a(tav)')
    def GetMethodCalls(self, method: str) -> List[Tuple[int, Sequence[Any]]]:
        '''List all the logged calls of a particular method.

        Return a list of (timestamp, args_list) tuples.
        '''
        return [(row[0], row[2]) for row in self.call_log if row[1] == method]

    @dbus.service.method(MOCK_IFACE,
                         in_signature='',
                         out_signature='')
    def ClearCalls(self) -> None:
        '''Empty the log of mock call signatures.'''

        self.call_log = []

    @dbus.service.signal(MOCK_IFACE, signature='sav')
    def MethodCalled(self, name, args):
        '''Signal emitted for every called mock method.

        This is emitted for all mock method calls.  This can be used to confirm
        that a particular method was called with particular arguments, as an
        alternative to reading the mock's log or GetCalls().
        '''

    def object_manager_emit_added(self, path: str) -> None:
        '''Emit ObjectManager.InterfacesAdded signal

        path: path of an object in the global "objects" map. Nothing is
              emitted if this object has no object manager.
        '''

        if self.object_manager is not None:
            self.object_manager.EmitSignal(OBJECT_MANAGER_IFACE, 'InterfacesAdded',
                                           'oa{sa{sv}}', [dbus.ObjectPath(path),
                                                          objects[path].props])

    def object_manager_emit_removed(self, path: str) -> None:
        '''Emit ObjectManager.InterfacesRemoved signal

        path: path of an object in the global "objects" map. Nothing is
              emitted if this object has no object manager.
        '''

        if self.object_manager is not None:
            # the signal carries the interface names only ("as"), i. e. the
            # keys of the interface -> properties map
            interface_names = list(objects[path].props)
            self.object_manager.EmitSignal(OBJECT_MANAGER_IFACE, 'InterfacesRemoved',
                                           'oas', [dbus.ObjectPath(path),
                                                   interface_names])

    def mock_method(self, interface: str, dbus_method: str, in_signature: str, *args, **_) -> Any:
        '''Master mock method.

        This gets "instantiated" in AddMethod(). Execute the code snippet of
        the method and return the "ret" variable if it was set.

        The call is logged, appended to the call log, and announced with the
        MethodCalled signal before the code runs.
        '''
        # convert types of arguments according to signature, using
        # MethodCallMessage.append(); this will also provide type/length
        # checks, except for the case of an empty signature
        if in_signature == '' and len(args) > 0:
            raise TypeError('Fewer items found in D-Bus signature than in Python arguments')
        m = dbus.connection.MethodCallMessage('a.b', '/a', 'a.b', 'a')
        m.append(signature=in_signature, *args)
        args = m.get_args_list()

        self.log(dbus_method + _format_args(args))
        self.call_log.append((int(time.time()), str(dbus_method), args))
        self.MethodCalled(dbus_method, args)

        # The code may be a Python 3 string to interpret, or may be a function
        # object (if AddMethod was called from within Python itself, rather than
        # over D-Bus).
        code = self.methods[interface][dbus_method][2]
        if code and isinstance(code, types.FunctionType):
            return code(self, *args)
        if code:
            # run the snippet with a copy of our locals (self, args, ...) and
            # pick up "ret" from there afterwards
            loc = locals().copy()
            exec(code, globals(), loc)  # pylint: disable=exec-used
            if 'ret' in loc:
                return loc['ret']

        return None

    def log(self, msg: str) -> None:
        '''Log a message, prefixed with a timestamp.

        If a log file was specified in the constructor, it is written there,
        otherwise it goes to stdout.
        '''
        if self.logfile:
            fd = self.logfile.fileno()
        else:
            fd = sys.stdout.fileno()

        os.write(fd, f'{time.time():.3f} {msg}\n'.encode('UTF-8'))

    @dbus.service.method(dbus.INTROSPECTABLE_IFACE,
                         in_signature='',
                         out_signature='s',
                         path_keyword='object_path',
                         connection_keyword='connection')
    def Introspect(self, object_path: str, connection: dbus.connection.Connection) -> str:
        '''Return XML description of this object's interfaces, methods and signals.

        This wraps dbus-python's Introspect() method to include the dynamic
        methods and properties.
        '''
        # _dbus_class_table is an indirect private member of dbus.service.Object that pylint fails to see
        # pylint: disable=no-member

        # temporarily add our dynamic methods
        cls = self.__class__.__module__ + '.' + self.__class__.__name__
        orig_interfaces = self._dbus_class_table[cls]

        # Copy the per-interface dicts as well: with a shallow copy, dynamic
        # methods on an interface which already exists in the class table
        # would be written into the shared original and leak into all other
        # objects.
        mock_interfaces = {iface: dict(methods) for iface, methods in orig_interfaces.items()}
        for iface, methods in self.methods.items():
            for method, impl in methods.items():
                mock_interfaces.setdefault(iface, {})[method] = impl[3]
        self._dbus_class_table[cls] = mock_interfaces

        try:
            xml = dbus.service.Object.Introspect(self, object_path, connection)
        finally:
            # restore original class table, also if introspection fails
            self._dbus_class_table[cls] = orig_interfaces

        tree = ElementTree.fromstring(xml)

        for name, name_props in self.props.items():
            # We might have properties for new interfaces we don't know about
            # yet. Try to find an existing <interface> node named after our
            # interface to append to, and create one if we can't.
            interface = tree.find(f".//interface[@name='{name}']")
            if interface is None:
                interface = ElementTree.Element("interface", {"name": name})
                tree.append(interface)

            for prop, val in name_props.items():
                if val is None:
                    # can't guess type from None, skip
                    continue
                elem = ElementTree.Element("property", {
                    "name": prop,
                    # We don't store the signature anywhere, so guess it.
                    "type": dbus.lowlevel.Message.guess_signature(val),
                    "access": "readwrite"})

                interface.append(elem)

        return ElementTree.tostring(tree, encoding='utf8', method='xml').decode('utf8')
745
746
# Overwrite dbus-python's _method_lookup(), as that offers no way to have the
# same method name on different interfaces. Keep a reference to the original
# implementation, which _dbusmock_method_lookup() falls back to for methods
# that are not found in an object's "methods" map.
orig_method_lookup = dbus.service._method_lookup  # pylint: disable=protected-access
750
751
752def _dbusmock_method_lookup(obj, method_name, dbus_interface):
753    try:
754        m = obj.methods[dbus_interface or obj.interface][method_name]
755        return (m[3], m[3])
756    except KeyError:
757        return orig_method_lookup(obj, method_name, dbus_interface)
758
759
# Install the replacement lookup; this happens as a side effect of importing
# this module and affects every dbus.service.Object in the process.
dbus.service._method_lookup = _dbusmock_method_lookup  # pylint: disable=protected-access
761
762
763#
764# Helper API for templates
765#
766
767
def get_objects() -> KeysView[str]:
    '''Return a view of the object paths of all registered mock objects'''
    registered_paths = objects.keys()
    return registered_paths
772
773
def get_object(path) -> DBusMockObject:
    '''Return the mock object registered for an object path

    Raises KeyError if no object is registered for that path.
    '''
    mock_object = objects[path]
    return mock_object
778