1import dbus
2import constants as cs
3
4from servicetest import EventPattern
5from file_transfer_helper import SendFileTest, ReceiveFileTest, \
6    FileTransferTest, exec_file_transfer_test
7
8from config import JINGLE_FILE_TRANSFER_ENABLED
9
10if not JINGLE_FILE_TRANSFER_ENABLED:
11    print "NOTE: built with --disable-file-transfer or --disable-voip"
12    raise SystemExit(77)
13
14class ReceiveFileDecline(ReceiveFileTest):
15
16    def __init__(self, file, address_type, access_control, access_control_param):
17        FileTransferTest.__init__(self, file, address_type, access_control,
18                                  access_control_param)
19        self._actions = [self.connect, self.set_ft_caps, None,
20
21                         self.wait_for_ft_caps, None,
22
23                         self.check_new_channel, self.close_and_check, self.done]
24
25    def close_and_check(self):
26        self.channel.Close()
27
28        state_event, event, _ = self.q.expect_many(
29            EventPattern('dbus-signal', signal='FileTransferStateChanged',
30                         path=self.channel.object_path),
31            EventPattern('stream-iq', stream=self.stream,
32                         iq_type='set', query_name='session'),
33            EventPattern('dbus-signal', signal='Closed',
34                         path=self.channel.object_path))
35
36        state, reason = state_event.args
37        assert state == cs.FT_STATE_CANCELLED
38        assert reason == cs.FT_STATE_CHANGE_REASON_LOCAL_STOPPED
39
40        while event.query.getAttribute('type') != 'terminate':
41            event = self.q.expect('stream-iq', stream=self.stream,
42                                  iq_type='set', query_name='session')
43
44
45
46class SendFileDeclined (SendFileTest):
47    def __init__(self, file, address_type,
48                 access_control, acces_control_param):
49        FileTransferTest.__init__(self, file, address_type,
50                                  access_control, acces_control_param)
51
52        self._actions = [self.connect, self.set_ft_caps,
53                         self.check_ft_available, None,
54
55                         self.wait_for_ft_caps, None,
56
57                         self.request_ft_channel, self.provide_file, None,
58
59                         self.check_declined, self.close_channel, self.done]
60
61    def check_declined(self):
62        state_event = self.q.expect('dbus-signal',
63                                    signal='FileTransferStateChanged',
64                                    path=self.channel.object_path)
65
66        state, reason = state_event.args
67        assert state == cs.FT_STATE_CANCELLED
68        assert reason == cs.FT_STATE_CHANGE_REASON_REMOTE_STOPPED
69
70        transferred = self.ft_props.Get(cs.CHANNEL_TYPE_FILE_TRANSFER,
71                                        'TransferredBytes')
72        # no byte has been transferred as the file was declined
73        assert transferred == 0
74
75        # try to provide the file
76        try:
77            self.provide_file()
78        except dbus.DBusException, e:
79            assert e.get_dbus_name() == cs.NOT_AVAILABLE
80        else:
81            assert False
82
83
84if __name__ == '__main__':
85    exec_file_transfer_test(SendFileDeclined, ReceiveFileDecline)
86