1#!/usr/bin/python3
2
3# This program is free software; you can redistribute it and/or modify it under
4# the terms of the GNU Lesser General Public License as published by the Free
5# Software Foundation; either version 3 of the License, or (at your option) any
6# later version.  See http://www.gnu.org/copyleft/lgpl.html for the full text
7# of the license.
8
9__author__ = 'Martin Pitt'
10__copyright__ = '(c) 2012 Canonical Ltd.'
11
12import unittest
13import sys
14import os
15import tempfile
16import subprocess
17import time
18import importlib.util
19import tracemalloc
20
21import dbus
22import dbus.mainloop.glib
23
24from gi.repository import GLib
25
26import dbusmock
27
28tracemalloc.start(25)
29dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
30
31# "a <heart> b" in py2/3 compatible unicode
32UNICODE = b'a\xe2\x99\xa5b'.decode('UTF-8')
33
34
35class TestAPI(dbusmock.DBusTestCase):
36    '''Test dbus-mock API'''
37
38    @classmethod
39    def setUpClass(cls):
40        cls.start_session_bus()
41        cls.dbus_con = cls.get_dbus()
42
43    def setUp(self):
44        # pylint: disable=consider-using-with
45        self.mock_log = tempfile.NamedTemporaryFile()
46        self.p_mock = self.spawn_server('org.freedesktop.Test',
47                                        '/',
48                                        'org.freedesktop.Test.Main',
49                                        stdout=self.mock_log)
50
51        self.obj_test = self.dbus_con.get_object('org.freedesktop.Test', '/')
52        self.dbus_test = dbus.Interface(self.obj_test, 'org.freedesktop.Test.Main')
53        self.dbus_mock = dbus.Interface(self.obj_test, dbusmock.MOCK_IFACE)
54        self.dbus_props = dbus.Interface(self.obj_test, dbus.PROPERTIES_IFACE)
55
56    def assertLog(self, regex):
57        with open(self.mock_log.name, "rb") as f:
58            self.assertRegex(f.read(), regex)
59
60    def tearDown(self):
61        if self.p_mock.stdout:
62            self.p_mock.stdout.close()
63        self.p_mock.terminate()
64        self.p_mock.wait()
65
66    def test_noarg_noret(self):
67        '''no arguments, no return value'''
68
69        self.dbus_mock.AddMethod('', 'Do', '', '', '')
70        self.assertEqual(self.dbus_test.Do(), None)
71
72        # check that it's logged correctly
73        self.assertLog(b'^[0-9.]+ Do$')
74
75    def test_onearg_noret(self):
76        '''one argument, no return value'''
77
78        self.dbus_mock.AddMethod('', 'Do', 's', '', '')
79        self.assertEqual(self.dbus_test.Do('Hello'), None)
80
81        # check that it's logged correctly
82        self.assertLog(b'^[0-9.]+ Do "Hello"$')
83
84    def test_onearg_ret(self):
85        '''one argument, code for return value'''
86
87        self.dbus_mock.AddMethod('', 'Do', 's', 's', 'ret = args[0]')
88        self.assertEqual(self.dbus_test.Do('Hello'), 'Hello')
89
90    def test_unicode_str(self):
91        '''unicode string roundtrip'''
92
93        self.dbus_mock.AddMethod('', 'Do', 's', 's', 'ret = args[0] * 2')
94        self.assertEqual(self.dbus_test.Do(UNICODE), dbus.String(UNICODE * 2))
95
96    def test_twoarg_ret(self):
97        '''two arguments, code for return value'''
98
99        self.dbus_mock.AddMethod('', 'Do', 'si', 's', 'ret = args[0] * args[1]')
100        self.assertEqual(self.dbus_test.Do('foo', 3), 'foofoofoo')
101
102        # check that it's logged correctly
103        self.assertLog(b'^[0-9.]+ Do "foo" 3$')
104
105    def test_array_arg(self):
106        '''array argument'''
107
108        self.dbus_mock.AddMethod('', 'Do', 'iaous', '',
109                                 f'''assert len(args) == 4
110assert args[0] == -1;
111assert args[1] == ['/foo']
112assert type(args[1]) == dbus.Array
113assert type(args[1][0]) == dbus.ObjectPath
114assert args[2] == 5
115assert args[3] == {repr(UNICODE)}
116''')
117        self.assertEqual(self.dbus_test.Do(-1, ['/foo'], 5, UNICODE), None)
118
119        # check that it's logged correctly
120        self.assertLog(b'^[0-9.]+ Do -1 \\["/foo"\\] 5 "a\\xe2\\x99\\xa5b"$')
121
122    def test_dict_arg(self):
123        '''dictionary argument'''
124
125        self.dbus_mock.AddMethod('', 'Do', 'ia{si}u', '',
126                                 '''assert len(args) == 3
127assert args[0] == -1;
128assert args[1] == {'foo': 42}
129assert type(args[1]) == dbus.Dictionary
130assert args[2] == 5
131''')
132        self.assertEqual(self.dbus_test.Do(-1, {'foo': 42}, 5), None)
133
134        # check that it's logged correctly
135        self.assertLog(b'^[0-9.]+ Do -1 {"foo": 42} 5$')
136
137    def test_methods_on_other_interfaces(self):
138        '''methods on other interfaces'''
139
140        self.dbus_mock.AddMethod('org.freedesktop.Test.Other', 'OtherDo', '', '', '')
141        self.dbus_mock.AddMethods('org.freedesktop.Test.Other',
142                                  [('OtherDo2', '', '', ''),
143                                   ('OtherDo3', 'i', 'i', 'ret = args[0]')])
144
145        # should not be on the main interface
146        self.assertRaises(dbus.exceptions.DBusException,
147                          self.dbus_test.OtherDo)
148
149        # should be on the other interface
150        self.assertEqual(self.obj_test.OtherDo(dbus_interface='org.freedesktop.Test.Other'), None)
151        self.assertEqual(self.obj_test.OtherDo2(dbus_interface='org.freedesktop.Test.Other'), None)
152        self.assertEqual(self.obj_test.OtherDo3(42, dbus_interface='org.freedesktop.Test.Other'), 42)
153
154        # check that it's logged correctly
155        self.assertLog(b'^[0-9.]+ OtherDo\n[0-9.]+ OtherDo2\n[0-9.]+ OtherDo3 42$')
156
157    def test_methods_same_name(self):
158        '''methods with same name on different interfaces'''
159
160        self.dbus_mock.AddMethod('org.iface1', 'Do', 'i', 'i', 'ret = args[0] + 2')
161        self.dbus_mock.AddMethod('org.iface2', 'Do', 'i', 'i', 'ret = args[0] + 3')
162
163        # should not be on the main interface
164        self.assertRaises(dbus.exceptions.DBusException,
165                          self.dbus_test.Do)
166
167        # should be on the other interface
168        self.assertEqual(self.obj_test.Do(10, dbus_interface='org.iface1'), 12)
169        self.assertEqual(self.obj_test.Do(11, dbus_interface='org.iface2'), 14)
170
171        # check that it's logged correctly
172        self.assertLog(b'^[0-9.]+ Do 10\n[0-9.]+ Do 11$')
173
174        # now add it to the primary interface, too
175        self.dbus_mock.AddMethod('', 'Do', 'i', 'i', 'ret = args[0] + 1')
176        self.assertEqual(self.obj_test.Do(9, dbus_interface='org.freedesktop.Test.Main'), 10)
177        self.assertEqual(self.obj_test.Do(10, dbus_interface='org.iface1'), 12)
178        self.assertEqual(self.obj_test.Do(11, dbus_interface='org.iface2'), 14)
179
180    def test_methods_type_mismatch(self):
181        '''calling methods with wrong arguments'''
182
183        def check(signature, args, err):
184            self.dbus_mock.AddMethod('', 'Do', signature, '', '')
185            try:
186                self.dbus_test.Do(*args)
187                self.fail(f'method call did not raise an error for signature "{signature}" and arguments {args}')
188            except dbus.exceptions.DBusException as e:
189                self.assertIn(err, str(e))
190
191        # not enough arguments
192        check('i', [], 'TypeError: More items found')
193        check('is', [1], 'TypeError: More items found')
194
195        # too many arguments
196        check('', [1], 'TypeError: Fewer items found')
197        check('i', [1, 'hello'], 'TypeError: Fewer items found')
198
199        # type mismatch
200        check('u', [-1], 'convert negative value to unsigned')
201        check('i', ['hello'], 'TypeError')
202        check('i', ['hello'], 'integer')
203        check('s', [1], 'TypeError: Expected a string')
204
205    def test_add_object(self):
206        '''add a new object'''
207
208        self.dbus_mock.AddObject('/obj1',
209                                 'org.freedesktop.Test.Sub',
210                                 {
211                                     'state': dbus.String('online', variant_level=1),
212                                     'cute': dbus.Boolean(True, variant_level=1),
213                                 },
214                                 [])
215
216        obj1 = self.dbus_con.get_object('org.freedesktop.Test', '/obj1')
217        dbus_sub = dbus.Interface(obj1, 'org.freedesktop.Test.Sub')
218        dbus_props = dbus.Interface(obj1, dbus.PROPERTIES_IFACE)
219
220        # check properties
221        self.assertEqual(dbus_props.Get('org.freedesktop.Test.Sub', 'state'), 'online')
222        self.assertEqual(dbus_props.Get('org.freedesktop.Test.Sub', 'cute'), True)
223        self.assertEqual(dbus_props.GetAll('org.freedesktop.Test.Sub'),
224                         {'state': 'online', 'cute': True})
225
226        # add new method
227        obj1.AddMethod('', 'Do', '', 's', 'ret = "hello"',
228                       dbus_interface=dbusmock.MOCK_IFACE)
229        self.assertEqual(dbus_sub.Do(), 'hello')
230
231    def test_add_object_existing(self):
232        '''try to add an existing object'''
233
234        self.dbus_mock.AddObject('/obj1', 'org.freedesktop.Test.Sub', {}, [])
235
236        self.assertRaises(dbus.exceptions.DBusException,
237                          self.dbus_mock.AddObject,
238                          '/obj1',
239                          'org.freedesktop.Test.Sub',
240                          {},
241                          [])
242
243        # try to add the main object again
244        self.assertRaises(dbus.exceptions.DBusException,
245                          self.dbus_mock.AddObject,
246                          '/',
247                          'org.freedesktop.Test.Other',
248                          {},
249                          [])
250
251    def test_add_object_with_methods(self):
252        '''add a new object with methods'''
253
254        self.dbus_mock.AddObject('/obj1',
255                                 'org.freedesktop.Test.Sub',
256                                 {
257                                     'state': dbus.String('online', variant_level=1),
258                                     'cute': dbus.Boolean(True, variant_level=1),
259                                 },
260                                 [
261                                     ('Do0', '', 'i', 'ret = 42'),
262                                     ('Do1', 'i', 'i', 'ret = 31337'),
263                                 ])
264
265        obj1 = self.dbus_con.get_object('org.freedesktop.Test', '/obj1')
266
267        self.assertEqual(obj1.Do0(), 42)
268        self.assertEqual(obj1.Do1(1), 31337)
269        self.assertRaises(dbus.exceptions.DBusException,
270                          obj1.Do2, 31337)
271
272    def test_properties(self):
273        '''add and change properties'''
274
275        # no properties by default
276        self.assertEqual(self.dbus_props.GetAll('org.freedesktop.Test.Main'), {})
277
278        # no such property
279        with self.assertRaises(dbus.exceptions.DBusException) as ctx:
280            self.dbus_props.Get('org.freedesktop.Test.Main', 'version')
281        self.assertEqual(ctx.exception.get_dbus_name(),
282                         'org.freedesktop.Test.Main.UnknownProperty')
283        self.assertEqual(ctx.exception.get_dbus_message(),
284                         'no such property version')
285
286        self.assertRaises(dbus.exceptions.DBusException,
287                          self.dbus_props.Set,
288                          'org.freedesktop.Test.Main',
289                          'version',
290                          dbus.Int32(2, variant_level=1))
291
292        self.dbus_mock.AddProperty('org.freedesktop.Test.Main',
293                                   'version',
294                                   dbus.Int32(2, variant_level=1))
295        # once again on default interface
296        self.dbus_mock.AddProperty('',
297                                   'connected',
298                                   dbus.Boolean(True, variant_level=1))
299
300        self.assertEqual(self.dbus_props.Get('org.freedesktop.Test.Main', 'version'), 2)
301        self.assertEqual(self.dbus_props.Get('org.freedesktop.Test.Main', 'connected'), True)
302
303        self.assertEqual(self.dbus_props.GetAll('org.freedesktop.Test.Main'),
304                         {'version': 2, 'connected': True})
305
306        with self.assertRaises(dbus.exceptions.DBusException) as ctx:
307            self.dbus_props.GetAll('org.freedesktop.Test.Bogus')
308        self.assertEqual(ctx.exception.get_dbus_name(),
309                         'org.freedesktop.Test.Main.UnknownInterface')
310        self.assertEqual(ctx.exception.get_dbus_message(),
311                         'no such interface org.freedesktop.Test.Bogus')
312
313        # change property
314        self.dbus_props.Set('org.freedesktop.Test.Main', 'version',
315                            dbus.Int32(4, variant_level=1))
316        self.assertEqual(self.dbus_props.Get('org.freedesktop.Test.Main', 'version'), 4)
317
318        # check that the Get/Set calls get logged
319        with open(self.mock_log.name, encoding="UTF-8") as f:
320            contents = f.read()
321            self.assertRegex(contents, '\n[0-9.]+ Get org.freedesktop.Test.Main.version\n')
322            self.assertRegex(contents, '\n[0-9.]+ Get org.freedesktop.Test.Main.connected\n')
323            self.assertRegex(contents, '\n[0-9.]+ GetAll org.freedesktop.Test.Main\n')
324            self.assertRegex(contents, '\n[0-9.]+ Set org.freedesktop.Test.Main.version 4\n')
325
326        # add property to different interface
327        self.dbus_mock.AddProperty('org.freedesktop.Test.Other',
328                                   'color',
329                                   dbus.String('yellow', variant_level=1))
330
331        self.assertEqual(self.dbus_props.GetAll('org.freedesktop.Test.Main'),
332                         {'version': 4, 'connected': True})
333        self.assertEqual(self.dbus_props.GetAll('org.freedesktop.Test.Other'),
334                         {'color': 'yellow'})
335        self.assertEqual(self.dbus_props.Get('org.freedesktop.Test.Other', 'color'),
336                         'yellow')
337
338        changed_props = []
339        ml = GLib.MainLoop()
340
341        def catch(*args, **kwargs):
342            if kwargs['interface'] != 'org.freedesktop.DBus.Properties':
343                return
344
345            self.assertEqual(kwargs['interface'], 'org.freedesktop.DBus.Properties')
346            self.assertEqual(kwargs['member'], 'PropertiesChanged')
347
348            [iface, changed, _invalidated] = args
349            self.assertEqual(iface, 'org.freedesktop.Test.Main')
350
351            changed_props.append(changed)
352            ml.quit()
353
354        match = self.dbus_con.add_signal_receiver(catch,
355                                                  interface_keyword='interface',
356                                                  path_keyword='path',
357                                                  member_keyword='member')
358
359        # change property using mock helper
360        self.dbus_mock.UpdateProperties('org.freedesktop.Test.Main', {
361            'version': 5,
362            'connected': False,
363        })
364
365        GLib.timeout_add(3000, ml.quit)
366        ml.run()
367
368        match.remove()
369
370        self.assertEqual(self.dbus_props.GetAll('org.freedesktop.Test.Main'),
371                         {'version': 5, 'connected': False})
372        self.assertEqual(changed_props,
373                         [{'version': 5, 'connected': False}])
374
375        # test adding properties with the array type
376        self.dbus_mock.AddProperty('org.freedesktop.Test.Main',
377                                   'array',
378                                   dbus.Array(['first'], signature='s'))
379        self.assertEqual(self.dbus_props.Get('org.freedesktop.Test.Main', 'array'),
380                         ['first'])
381
382        # test updating properties with the array type
383        self.dbus_mock.UpdateProperties('org.freedesktop.Test.Main',
384                                        {'array': dbus.Array(['second', 'third'],
385                                                             signature='s')})
386        self.assertEqual(self.dbus_props.Get('org.freedesktop.Test.Main', 'array'),
387                         ['second', 'third'])
388
389    def test_introspection_methods(self):
390        '''dynamically added methods appear in introspection'''
391
392        dbus_introspect = dbus.Interface(self.obj_test, dbus.INTROSPECTABLE_IFACE)
393
394        xml_empty = dbus_introspect.Introspect()
395        self.assertIn('<interface name="org.freedesktop.DBus.Mock">', xml_empty)
396        self.assertIn('<method name="AddMethod">', xml_empty)
397
398        self.dbus_mock.AddMethod('', 'Do', 'saiv', 'i', 'ret = 42')
399
400        xml_method = dbus_introspect.Introspect()
401        self.assertNotEqual(xml_empty, xml_method)
402        self.assertIn('<interface name="org.freedesktop.Test.Main">', xml_method)
403        # various Python versions use different name vs. type ordering
404        expected1 = '''<method name="Do">
405      <arg direction="in" name="arg1" type="s" />
406      <arg direction="in" name="arg2" type="ai" />
407      <arg direction="in" name="arg3" type="v" />
408      <arg direction="out" type="i" />
409    </method>'''
410        expected2 = '''<method name="Do">
411      <arg direction="in" type="s" name="arg1" />
412      <arg direction="in" type="ai" name="arg2" />
413      <arg direction="in" type="v" name="arg3" />
414      <arg direction="out" type="i" />
415    </method>'''
416        self.assertTrue(expected1 in xml_method or expected2 in xml_method, xml_method)
417
418    # properties in introspection are not supported by dbus-python right now
419    def test_introspection_properties(self):
420        '''dynamically added properties appear in introspection'''
421
422        self.dbus_mock.AddProperty('', 'Color', 'yellow')
423        self.dbus_mock.AddProperty('org.freedesktop.Test.Sub', 'Count', 5)
424
425        xml = self.obj_test.Introspect()
426
427        self.assertIn('<interface name="org.freedesktop.Test.Main">', xml)
428        self.assertIn('<interface name="org.freedesktop.Test.Sub">', xml)
429        # various Python versions use different attribute ordering
430        self.assertTrue('<property access="readwrite" name="Color" type="s" />' in xml or
431                        '<property name="Color" type="s" access="readwrite" />' in xml, xml)
432        self.assertTrue('<property access="readwrite" name="Count" type="i" />' in xml or
433                        '<property name="Count" type="i" access="readwrite" />' in xml, xml)
434
435    def test_objects_map(self):
436        '''access global objects map'''
437
438        self.dbus_mock.AddMethod('', 'EnumObjs', '', 'ao', 'ret = objects.keys()')
439        self.assertEqual(self.dbus_test.EnumObjs(), ['/'])
440
441        self.dbus_mock.AddObject('/obj1', 'org.freedesktop.Test.Sub', {}, [])
442        self.assertEqual(set(self.dbus_test.EnumObjs()), {'/', '/obj1'})
443
444    def test_signals(self):
445        '''emitting signals'''
446
447        def do_emit():
448            self.dbus_mock.EmitSignal('', 'SigNoArgs', '', [])
449            self.dbus_mock.EmitSignal('org.freedesktop.Test.Sub',
450                                      'SigTwoArgs',
451                                      'su', ['hello', 42])
452            self.dbus_mock.EmitSignal('org.freedesktop.Test.Sub',
453                                      'SigTypeTest',
454                                      'iuvao',
455                                      [-42, 42, dbus.String('hello', variant_level=1), ['/a', '/b']])
456
457        caught = []
458        ml = GLib.MainLoop()
459
460        def catch(*args, **kwargs):
461            if kwargs['interface'].startswith('org.freedesktop.Test'):
462                caught.append((args, kwargs))
463            if len(caught) == 3:
464                # we caught everything there is to catch, don't wait for the
465                # timeout
466                ml.quit()
467
468        self.dbus_con.add_signal_receiver(catch,
469                                          interface_keyword='interface',
470                                          path_keyword='path',
471                                          member_keyword='member')
472
473        GLib.timeout_add(200, do_emit)
474        # ensure that the loop quits even when we catch fewer than 2 signals
475        GLib.timeout_add(3000, ml.quit)
476        ml.run()
477
478        # check SigNoArgs
479        self.assertEqual(caught[0][0], ())
480        self.assertEqual(caught[0][1]['member'], 'SigNoArgs')
481        self.assertEqual(caught[0][1]['path'], '/')
482        self.assertEqual(caught[0][1]['interface'], 'org.freedesktop.Test.Main')
483
484        # check SigTwoArgs
485        self.assertEqual(caught[1][0], ('hello', 42))
486        self.assertEqual(caught[1][1]['member'], 'SigTwoArgs')
487        self.assertEqual(caught[1][1]['path'], '/')
488        self.assertEqual(caught[1][1]['interface'], 'org.freedesktop.Test.Sub')
489
490        # check data types in SigTypeTest
491        self.assertEqual(caught[2][1]['member'], 'SigTypeTest')
492        self.assertEqual(caught[2][1]['path'], '/')
493        args = caught[2][0]
494        self.assertEqual(args[0], -42)
495        self.assertEqual(type(args[0]), dbus.Int32)
496        self.assertEqual(args[0].variant_level, 0)
497
498        self.assertEqual(args[1], 42)
499        self.assertEqual(type(args[1]), dbus.UInt32)
500        self.assertEqual(args[1].variant_level, 0)
501
502        self.assertEqual(args[2], 'hello')
503        self.assertEqual(type(args[2]), dbus.String)
504        self.assertEqual(args[2].variant_level, 1)
505
506        self.assertEqual(args[3], ['/a', '/b'])
507        self.assertEqual(type(args[3]), dbus.Array)
508        self.assertEqual(args[3].variant_level, 0)
509        self.assertEqual(type(args[3][0]), dbus.ObjectPath)
510        self.assertEqual(args[3][0].variant_level, 0)
511
512        # check correct logging
513        with open(self.mock_log.name, encoding="UTF-8") as f:
514            log = f.read()
515        self.assertRegex(log, '[0-9.]+ emit org.freedesktop.Test.Main.SigNoArgs\n')
516        self.assertRegex(log, '[0-9.]+ emit org.freedesktop.Test.Sub.SigTwoArgs "hello" 42\n')
517        self.assertRegex(log, '[0-9.]+ emit org.freedesktop.Test.Sub.SigTypeTest -42 42')
518        self.assertRegex(log, r'[0-9.]+ emit org.freedesktop.Test.Sub.SigTypeTest -42 42 "hello" \["/a", "/b"\]\n')
519
520    def test_signals_type_mismatch(self):
521        '''emitting signals with wrong arguments'''
522
523        def check(signature, args, err):
524            try:
525                self.dbus_mock.EmitSignal('', 's', signature, args)
526                self.fail(f'EmitSignal did not raise an error for signature "{signature}" and arguments {args}')
527            except dbus.exceptions.DBusException as e:
528                self.assertIn(err, str(e))
529
530        # not enough arguments
531        check('i', [], 'TypeError: More items found')
532        check('is', [1], 'TypeError: More items found')
533
534        # too many arguments
535        check('', [1], 'TypeError: Fewer items found')
536        check('i', [1, 'hello'], 'TypeError: Fewer items found')
537
538        # type mismatch
539        check('u', [-1], 'convert negative value to unsigned')
540        check('i', ['hello'], 'TypeError')
541        check('i', ['hello'], 'integer')
542        check('s', [1], 'TypeError: Expected a string')
543
544    def test_dbus_get_log(self):
545        '''query call logs over D-Bus'''
546
547        self.assertEqual(self.dbus_mock.ClearCalls(), None)
548        self.assertEqual(self.dbus_mock.GetCalls(), dbus.Array([]))
549
550        self.dbus_mock.AddMethod('', 'Do', '', '', '')
551        self.assertEqual(self.dbus_test.Do(), None)
552        mock_log = self.dbus_mock.GetCalls()
553        self.assertEqual(len(mock_log), 1)
554        self.assertGreater(mock_log[0][0], 10000)  # timestamp
555        self.assertEqual(mock_log[0][1], 'Do')
556        self.assertEqual(mock_log[0][2], [])
557
558        self.assertEqual(self.dbus_mock.ClearCalls(), None)
559        self.assertEqual(self.dbus_mock.GetCalls(), dbus.Array([]))
560
561        self.dbus_mock.AddMethod('', 'Wop', 's', 's', 'ret="hello"')
562        self.assertEqual(self.dbus_test.Wop('foo'), 'hello')
563        self.assertEqual(self.dbus_test.Wop('bar'), 'hello')
564        mock_log = self.dbus_mock.GetCalls()
565        self.assertEqual(len(mock_log), 2)
566        self.assertGreater(mock_log[0][0], 10000)  # timestamp
567        self.assertEqual(mock_log[0][1], 'Wop')
568        self.assertEqual(mock_log[0][2], ['foo'])
569        self.assertEqual(mock_log[1][1], 'Wop')
570        self.assertEqual(mock_log[1][2], ['bar'])
571
572        self.assertEqual(self.dbus_mock.ClearCalls(), None)
573        self.assertEqual(self.dbus_mock.GetCalls(), dbus.Array([]))
574
575    def test_dbus_get_method_calls(self):
576        '''query method call logs over D-Bus'''
577
578        self.dbus_mock.AddMethod('', 'Do', '', '', '')
579        self.assertEqual(self.dbus_test.Do(), None)
580        self.assertEqual(self.dbus_test.Do(), None)
581
582        self.dbus_mock.AddMethod('', 'Wop', 's', 's', 'ret="hello"')
583        self.assertEqual(self.dbus_test.Wop('foo'), 'hello')
584        self.assertEqual(self.dbus_test.Wop('bar'), 'hello')
585
586        mock_calls = self.dbus_mock.GetMethodCalls('Do')
587        self.assertEqual(len(mock_calls), 2)
588        self.assertEqual(mock_calls[0][1], [])
589        self.assertEqual(mock_calls[1][1], [])
590
591        mock_calls = self.dbus_mock.GetMethodCalls('Wop')
592        self.assertEqual(len(mock_calls), 2)
593        self.assertGreater(mock_calls[0][0], 10000)  # timestamp
594        self.assertEqual(mock_calls[0][1], ['foo'])
595        self.assertGreater(mock_calls[1][0], 10000)  # timestamp
596        self.assertEqual(mock_calls[1][1], ['bar'])
597
598    def test_dbus_method_called(self):
599        '''subscribe to MethodCalled signal'''
600
601        loop = GLib.MainLoop()
602        caught_signals = []
603
604        def method_called(method, args, **_):
605            caught_signals.append((method, args))
606            loop.quit()
607
608        self.dbus_mock.AddMethod('', 'Do', 's', '', '')
609        self.dbus_mock.connect_to_signal('MethodCalled', method_called)
610        self.assertEqual(self.dbus_test.Do('foo'), None)
611
612        GLib.timeout_add(5000, loop.quit)
613        loop.run()
614
615        self.assertEqual(len(caught_signals), 1)
616        method, args = caught_signals[0]
617        self.assertEqual(method, 'Do')
618        self.assertEqual(len(args), 1)
619        self.assertEqual(args[0], 'foo')
620
621    def test_reset(self):
622        '''resetting to pristine state'''
623
624        self.dbus_mock.AddMethod('', 'Do', '', '', '')
625        self.dbus_mock.AddProperty('', 'propone', True)
626        self.dbus_mock.AddProperty('org.Test.Other', 'proptwo', 1)
627        self.dbus_mock.AddObject('/obj1', '', {}, [])
628
629        self.dbus_mock.Reset()
630
631        # resets properties and keeps the initial object
632        self.assertEqual(self.dbus_props.GetAll(''), {})
633        # resets methods
634        self.assertRaises(dbus.exceptions.DBusException, self.dbus_test.Do)
635        # resets other objects
636        obj1 = self.dbus_con.get_object('org.freedesktop.Test', '/obj1')
637        self.assertRaises(dbus.exceptions.DBusException, obj1.GetAll, '')
638
639
640class TestTemplates(dbusmock.DBusTestCase):
641    '''Test template API'''
642
643    @classmethod
644    def setUpClass(cls):
645        cls.start_session_bus()
646        cls.start_system_bus()
647
648    def test_local(self):
649        '''Load a local template *.py file'''
650
651        with tempfile.NamedTemporaryFile(prefix='answer_', suffix='.py') as my_template:
652            my_template.write(b'''import dbus
653BUS_NAME = 'universe.Ultimate'
654MAIN_OBJ = '/'
655MAIN_IFACE = 'universe.Ultimate'
656SYSTEM_BUS = False
657
658def load(mock, parameters):
659    mock.AddMethods(MAIN_IFACE, [('Answer', '', 'i', 'ret = 42')])
660''')
661            my_template.flush()
662            (p_mock, dbus_ultimate) = self.spawn_server_template(
663                my_template.name, stdout=subprocess.PIPE)
664            self.addCleanup(p_mock.wait)
665            self.addCleanup(p_mock.terminate)
666            self.addCleanup(p_mock.stdout.close)
667
668            # ensure that we don't use/write any .pyc files, they are dangerous
669            # in a world-writable directory like /tmp
670            self.assertFalse(os.path.exists(my_template.name + 'c'))
671            self.assertFalse(os.path.exists(importlib.util.cache_from_source(my_template.name)))
672
673        self.assertEqual(dbus_ultimate.Answer(), 42)
674
675        # should appear in introspection
676        xml = dbus_ultimate.Introspect()
677        self.assertIn('<interface name="universe.Ultimate">', xml)
678        self.assertIn('<method name="Answer">', xml)
679
680        # should not have ObjectManager API by default
681        self.assertRaises(dbus.exceptions.DBusException,
682                          dbus_ultimate.GetManagedObjects)
683
684    def test_static_method(self):
685        '''Static method in a template'''
686
687        with tempfile.NamedTemporaryFile(prefix='answer_', suffix='.py') as my_template:
688            my_template.write(b'''import dbus
689BUS_NAME = 'universe.Ultimate'
690MAIN_OBJ = '/'
691MAIN_IFACE = 'universe.Ultimate'
692SYSTEM_BUS = False
693
694def load(mock, parameters):
695    pass
696
697@dbus.service.method(MAIN_IFACE,
698                     in_signature='',
699                     out_signature='i')
700def Answer(self):
701    return 42
702''')
703            my_template.flush()
704            (p_mock, dbus_ultimate) = self.spawn_server_template(
705                my_template.name, stdout=subprocess.PIPE)
706            self.addCleanup(p_mock.wait)
707            self.addCleanup(p_mock.terminate)
708            self.addCleanup(p_mock.stdout.close)
709
710        self.assertEqual(dbus_ultimate.Answer(), 42)
711
712        # should appear in introspection
713        xml = dbus_ultimate.Introspect()
714        self.assertIn('<interface name="universe.Ultimate">', xml)
715        self.assertIn('<method name="Answer">', xml)
716
717    def test_local_nonexisting(self):
718        self.assertRaises(ImportError, self.spawn_server_template, '/non/existing.py')
719
720    def test_explicit_bus_(self):
721        '''Explicitly set the bus for a template that does not specify SYSTEM_BUS'''
722
723        with tempfile.NamedTemporaryFile(prefix='answer_', suffix='.py') as my_template:
724            my_template.write(b'''import dbus
725BUS_NAME = 'universe.Ultimate'
726MAIN_OBJ = '/'
727MAIN_IFACE = 'universe.Ultimate'
728
729def load(mock, parameters):
730    mock.AddMethods(MAIN_IFACE, [('Answer', '', 'i', 'ret = 42')])
731''')
732            my_template.flush()
733            (p_mock, dbus_ultimate) = self.spawn_server_template(
734                my_template.name, stdout=subprocess.PIPE, system_bus=False)
735            self.addCleanup(p_mock.wait)
736            self.addCleanup(p_mock.terminate)
737            self.addCleanup(p_mock.stdout.close)
738
739        self.wait_for_bus_object('universe.Ultimate', '/')
740        self.assertEqual(dbus_ultimate.Answer(), 42)
741
742    def test_override_bus_(self):
743        '''Override the bus for a template'''
744
745        with tempfile.NamedTemporaryFile(prefix='answer_', suffix='.py') as my_template:
746            my_template.write(b'''import dbus
747BUS_NAME = 'universe.Ultimate'
748MAIN_OBJ = '/'
749MAIN_IFACE = 'universe.Ultimate'
750SYSTEM_BUS = True
751
752def load(mock, parameters):
753    mock.AddMethods(MAIN_IFACE, [('Answer', '', 'i', 'ret = 42')])
754''')
755            my_template.flush()
756            (p_mock, dbus_ultimate) = self.spawn_server_template(
757                my_template.name, stdout=subprocess.PIPE, system_bus=False)
758            self.addCleanup(p_mock.wait)
759            self.addCleanup(p_mock.terminate)
760            self.addCleanup(p_mock.stdout.close)
761
762        self.wait_for_bus_object('universe.Ultimate', '/')
763        self.assertEqual(dbus_ultimate.Answer(), 42)
764
765    def test_object_manager(self):
766        '''Template with ObjectManager API'''
767
768        with tempfile.NamedTemporaryFile(prefix='objmgr_', suffix='.py') as my_template:
769            my_template.write(b'''import dbus
770BUS_NAME = 'org.test.Things'
771MAIN_OBJ = '/org/test/Things'
772IS_OBJECT_MANAGER = True
773SYSTEM_BUS = False
774
775def load(mock, parameters):
776    mock.AddObject('/org/test/Things/Thing1', 'org.test.Do', {'name': 'one'}, [])
777    mock.AddObject('/org/test/Things/Thing2', 'org.test.Do', {'name': 'two'}, [])
778    mock.AddObject('/org/test/Peer', 'org.test.Do', {'name': 'peer'}, [])
779''')
780            my_template.flush()
781            (p_mock, dbus_objmgr) = self.spawn_server_template(
782                my_template.name, stdout=subprocess.PIPE)
783            self.addCleanup(p_mock.wait)
784            self.addCleanup(p_mock.terminate)
785            self.addCleanup(p_mock.stdout.close)
786
787        # should have the two Things, but not the Peer
788        self.assertEqual(dbus_objmgr.GetManagedObjects(),
789                         {'/org/test/Things/Thing1': {'org.test.Do': {'name': 'one'}},
790                          '/org/test/Things/Thing2': {'org.test.Do': {'name': 'two'}}})
791
792        # should appear in introspection
793        xml = dbus_objmgr.Introspect()
794        self.assertIn('<interface name="org.freedesktop.DBus.ObjectManager">', xml)
795        self.assertIn('<method name="GetManagedObjects">', xml)
796        self.assertIn('<node name="Thing1" />', xml)
797        self.assertIn('<node name="Thing2" />', xml)
798
799    def test_reset(self):
800        '''Reset() puts the template back to pristine state'''
801
802        (p_mock, obj_logind) = self.spawn_server_template(
803            'logind', stdout=subprocess.PIPE)
804        self.addCleanup(p_mock.wait)
805        self.addCleanup(p_mock.terminate)
806        self.addCleanup(p_mock.stdout.close)
807
808        # do some property, method, and object changes
809        obj_logind.Set('org.freedesktop.login1.Manager', 'IdleAction', 'frob')
810        mock_logind = dbus.Interface(obj_logind, dbusmock.MOCK_IFACE)
811        mock_logind.AddProperty('org.Test.Other', 'walk', 'silly')
812        mock_logind.AddMethod('', 'DoWalk', '', '', '')
813        mock_logind.AddObject('/obj1', '', {}, [])
814
815        mock_logind.Reset()
816
817        # keeps the objects from the template
818        dbus_con = self.get_dbus(system_bus=True)
819        obj_logind = dbus_con.get_object('org.freedesktop.login1',
820                                         '/org/freedesktop/login1')
821        self.assertEqual(obj_logind.CanSuspend(), 'yes')
822
823        # resets properties
824        self.assertRaises(dbus.exceptions.DBusException,
825                          obj_logind.GetAll, 'org.Test.Other')
826        self.assertEqual(
827            obj_logind.Get('org.freedesktop.login1.Manager', 'IdleAction'),
828            'ignore')
829        # resets methods
830        self.assertRaises(dbus.exceptions.DBusException, obj_logind.DoWalk)
831        # resets other objects
832        obj1 = dbus_con.get_object('org.freedesktop.login1', '/obj1')
833        self.assertRaises(dbus.exceptions.DBusException, obj1.GetAll, '')
834
835
836class TestCleanup(dbusmock.DBusTestCase):
837    '''Test cleanup of resources'''
838
839    def test_mock_terminates_with_bus(self):
840        '''Spawned mock processes exit when bus goes down'''
841
842        self.start_session_bus()
843        p_mock = self.spawn_server('org.freedesktop.Test',
844                                   '/',
845                                   'org.freedesktop.Test.Main')
846        self.stop_dbus(self.session_bus_pid)
847
848        # give the mock 2 seconds to terminate
849        timeout = 20
850        while timeout > 0:
851            if p_mock.poll() is not None:
852                break
853            timeout -= 1
854            time.sleep(0.1)
855
856        if p_mock.poll() is None:
857            # clean up manually
858            p_mock.terminate()
859            p_mock.wait()
860            self.fail('mock process did not terminate after 2 seconds')
861
862        self.assertEqual(p_mock.wait(), 0)
863
864
865class TestSubclass(dbusmock.DBusTestCase):
866    '''Test subclassing DBusMockObject'''
867
868    @classmethod
869    def setUpClass(cls):
870        cls.start_session_bus()
871
872    def test_ctor(self):
873        '''Override DBusMockObject constructor'''
874
875        class MyMock(dbusmock.mockobject.DBusMockObject):
876            def __init__(self):
877                bus_name = dbus.service.BusName('org.test.MyMock',
878                                                dbusmock.testcase.DBusTestCase.get_dbus())
879                dbusmock.mockobject.DBusMockObject.__init__(
880                    self, bus_name, '/', 'org.test.A', {}, os.devnull)
881                self.AddMethod('', 'Ping', '', 'i', 'ret = 42')
882
883        m = MyMock()
884        self.assertEqual(m.Ping(), 42)  # pylint: disable=no-member
885
886    def test_none_props(self):
887        '''object with None properties argument'''
888
889        class MyMock(dbusmock.mockobject.DBusMockObject):
890            def __init__(self):
891                bus_name = dbus.service.BusName('org.test.MyMock',
892                                                dbusmock.testcase.DBusTestCase.get_dbus())
893                dbusmock.mockobject.DBusMockObject.__init__(
894                    self, bus_name, '/mymock', 'org.test.MyMockI', None, os.devnull)
895                self.AddMethod('', 'Ping', '', 'i', 'ret = 42')
896
897        m = MyMock()
898        self.assertEqual(m.Ping(), 42)  # pylint: disable=no-member
899        self.assertEqual(m.GetAll('org.test.MyMockI'), {})
900
901        m.AddProperty('org.test.MyMockI', 'blurb', 5)
902        self.assertEqual(m.GetAll('org.test.MyMockI'), {'blurb': 5})
903
904
905if __name__ == '__main__':
906    # avoid writing to stderr
907    unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout))
908