1# Copyright (C) 2009 Nokia Corporation
2# Copyright (C) 2009 Collabora Ltd.
3#
4# This library is free software; you can redistribute it and/or
5# modify it under the terms of the GNU Lesser General Public
6# License as published by the Free Software Foundation; either
7# version 2.1 of the License, or (at your option) any later version.
8#
9# This library is distributed in the hope that it will be useful, but
10# WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12# Lesser General Public License for more details.
13#
14# You should have received a copy of the GNU Lesser General Public
15# License along with this library; if not, write to the Free Software
16# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
17# 02110-1301 USA
18
19import dbus
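"""Regression test for recovering from an MC (Mission Control) crash.

A connection with two text channels, one of which a client already claims
to be handling, is set up before MC is started. MC must then offer the
other channel to that client, first as an observer and then as a handler.
"""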
22
23import os
24
25import dbus
26import dbus.service
27
28from servicetest import EventPattern, call_async
29from mctest import exec_test, SimulatedConnection, SimulatedClient, \
30        create_fakecm_account, enable_fakecm_account, SimulatedChannel, \
31        expect_client_setup, MC
32import constants as cs
33
# Account identifier shared by the fake accounts service and the
# .mc_connections record. The last component appears to be the normalized
# name 'jc.denton@unatco.int' with '.' escaped as '_2e' and '@' as '_40'.
account_id = 'fakecm/fakeprotocol/jc_2edenton_40unatco_2eint'

def preseed(q, bus, fake_accounts_service):
    """Seed the account and the surviving-connection record before MC starts.

    Registers an enabled fakecm/fakeprotocol account with the fake accounts
    service, then writes one tab-separated line (connection object path,
    connection bus name, account id) to ``$MC_ACCOUNT_DIR/.mc_connections``.
    Presumably MC reads that file on start-up to rediscover connections that
    outlived a previous instance -- verify against MC itself.

    q, bus -- unused here; accepted so the signature mirrors test().
    fake_accounts_service -- object providing update_attributes() and
        update_parameters().

    Raises KeyError if MC_ACCOUNT_DIR is not set in the environment.
    """
    accounts_dir = os.environ['MC_ACCOUNT_DIR']

    try:
        os.mkdir(accounts_dir, 0o700)
    except OSError:
        # Deliberately best-effort: the directory may already exist. Any
        # other problem will surface when the file is opened below.
        pass

    fake_accounts_service.update_attributes(account_id, changed={
        'manager': 'fakecm',
        'protocol': 'fakeprotocol',
        'DisplayName': 'Work account',
        'NormalizedName': 'jc.denton@unatco.int',
        'Enabled': True,
        })
    fake_accounts_service.update_parameters(account_id, untyped={
        'account': 'jc.denton@unatco.int',
        'password': 'ionstorm',
        })

    # Close the file explicitly (via the context manager) so the record is
    # flushed to disk before anything else tries to read it, instead of
    # relying on the garbage collector to close the handle.
    with open(accounts_dir + '/.mc_connections', 'w') as connections_file:
        connections_file.write("%s\t%s\t%s\n" % (
            cs.tp_path_prefix + '/Connection/fakecm/fakeprotocol/jc',
            cs.tp_name_prefix + '.Connection.fakecm.fakeprotocol.jc',
            account_id))
62
def test(q, bus, unused, **kwargs):
    """Start MC next to a pre-existing connection and check channel recovery.

    Two text channels are announced on a simulated connection before MC is
    launched. The simulated client lists one of them in its handled_channels,
    so only the other one is expected in the ObserveChannels and
    HandleChannels calls that follow MC's start-up.

    q, bus -- event queue and bus connection supplied by the test harness.
    unused -- ignored positional argument supplied by the harness.
    kwargs -- must contain 'fake_accounts_service'.
    """
    preseed(q, bus, kwargs['fake_accounts_service'])

    text_filter = dbus.Dictionary({
        cs.CHANNEL + '.TargetHandleType': cs.HT_CONTACT,
        cs.CHANNEL + '.ChannelType': cs.CHANNEL_TYPE_TEXT,
        }, signature='sv')

    conn = SimulatedConnection(q, bus, 'fakecm', 'fakeprotocol',
            'jc', 'jc.denton@unatco.int')
    conn.StatusChanged(cs.CONN_STATUS_CONNECTED, 0)

    def announce_text_channel(target_id):
        # Build the immutable properties of a text channel to target_id,
        # initiated by ourselves, then create and announce the channel.
        props = dbus.Dictionary(text_filter, signature='sv')
        for suffix, value in (
                ('.Interfaces', dbus.Array(signature='s')),
                ('.TargetID', target_id),
                ('.TargetHandle',
                    dbus.UInt32(conn.ensure_handle(cs.HT_CONTACT, target_id))),
                ('.InitiatorHandle', dbus.UInt32(conn.self_handle)),
                ('.InitiatorID', conn.self_ident),
                ('.Requested', True),
                ):
            props[cs.CHANNEL + suffix] = value
        chan = SimulatedChannel(conn, props)
        chan.announce()
        return chan

    unhandled_chan = announce_text_channel('anna.navarre@unatco.int')
    handled_chan = announce_text_channel('gunther.hermann@unatco.int')

    client = SimulatedClient(q, bus, 'Empathy',
            observe=[text_filter], approve=[text_filter],
            handle=[text_filter], bypass_approval=False)
    # The client claims to be handling this channel already.
    client.handled_channels.append(handled_chan.object_path)

    def check_and_acknowledge(event):
        # The call must concern our connection and list the channel that
        # nobody is handling first; then reply with an empty return.
        assert event.args[1] == conn.object_path, event.args
        channels = event.args[2]
        assert channels[0][0] == unhandled_chan.object_path, channels
        q.dbus_return(event.message, signature='')

    # Service-activate MC.
    # We're told about the other channel as an observer...
    mc = MC(q, bus, wait_for_names=False)
    observe_pattern = EventPattern('dbus-method-call',
            path=client.object_path,
            interface=cs.OBSERVER, method='ObserveChannels',
            handled=False)
    (observe_call,) = mc.wait_for_names(observe_pattern)
    check_and_acknowledge(observe_call)

    # ... and as a handler
    handle_call = q.expect('dbus-method-call',
            path=client.object_path,
            interface=cs.HANDLER, method='HandleChannels',
            handled=False)
    check_and_acknowledge(handle_call)
127
if __name__ == '__main__':
    # MC is not preloaded: test() launches it itself, after the account and
    # connection state have been seeded.
    exec_test(
        test,
        {},
        preload_mc=False,
        use_fake_accounts_service=True,
        pass_kwargs=True,
    )
131