1'''
2Tests for dispatcher.py
3'''
4import unittest
5
6import lib
7lib.setup_env()
8
9from mock import Mock
10import sys
11import socket
12
13from gajim.common.socks5 import *
14from gajim.common import jingle_xtls
15
16class fake_sock(Mock):
17    def __init__(self, sockobj):
18        Mock.__init__(self)
19
20        self.sockobj = sockobj
21
22
23    def setup_stream(self):
24        sha1 = self.sockobj._get_sha1_auth()
25
26        self.incoming = []
27        self.incoming.append(self.sockobj._get_auth_response())
28        self.incoming.append(self.sockobj._get_request_buff(sha1, 0x00))
29        self.outgoing = []
30        self.outgoing.append(self.sockobj._get_auth_buff())
31        self.outgoing.append(self.sockobj._get_request_buff(sha1))
32
33    def switch_stream(self):
34        # Roles are reversed, client will be expecting server stream
35        # and server will be expecting client stream
36
37        temp = self.incoming
38        self.incoming = self.outgoing
39        self.outgoing = temp
40
41    def _recv(self, foo):
42        return self.incoming.pop(0)
43
44    def _send(self, data):
45        # This method is surrounded by a try block,
46        # we can't use assert here
47
48        if data != self.outgoing[0]:
49            print('FAILED SENDING TEST')
50        self.outgoing.pop(0)
51
52class fake_idlequeue(Mock):
53
54    def __init__(self):
55        Mock.__init__(self)
56
57    def plug_idle(self, obj, writable=True, readable=True):
58
59        if readable:
60            obj.pollin()
61        if writable:
62            obj.pollout()
63
64class TestSocks5(unittest.TestCase):
65    '''
66    Test class for Socks5
67    '''
68    def setUp(self):
69        streamhost = { 'host': None,
70                       'port': 1,
71                       'initiator' : None,
72                       'target' : None}
73        queue = Mock()
74        queue.file_props = {}
75        #self.sockobj = Socks5Receiver(fake_idlequeue(), streamhost, None)
76        self.sockobj = Socks5Sender(fake_idlequeue(), None, 'server', Mock() ,
77                                    None, None, True, file_props={})
78        sock = fake_sock(self.sockobj)
79        self.sockobj._sock = sock
80        self.sockobj._recv = sock._recv
81        self.sockobj._send = sock._send
82        self.sockobj.state = 1
83        self.sockobj.connected = True
84        self.sockobj.pollend = self._pollend
85
86        # Something that the receiver needs
87        #self.sockobj.file_props['type'] = 'r'
88
89        # Something that the sender needs
90        self.sockobj.file_props = {}
91        self.sockobj.file_props['type'] = 'r'
92        self.sockobj.file_props['paused'] = ''
93        self.sockobj.queue = Mock()
94        self.sockobj.queue.process_result = self._pollend
95
96    def _pollend(self, foo = None, duu = None):
97        # This is a disconnect function
98        sys.exit("end of the road")
99
100    def _check_inout(self):
101        # Check if there isn't anything else to receive or send
102        sock = self.sockobj._sock
103        assert(sock.incoming == [])
104        assert(sock.outgoing == [])
105
106    def test_connection_server(self):
107        return
108        mocksock = self.sockobj._sock
109        mocksock.setup_stream()
110        #self.sockobj._sock.switch_stream()
111        s = socket.socket(2, 1, 6)
112        server = ('127.0.0.1', 28000)
113
114        s.connect(server)
115
116        s.send(mocksock.outgoing.pop(0))
117        self.assertEqual(s.recv(64), mocksock.incoming.pop(0))
118
119        s.send(mocksock.outgoing.pop(0))
120        self.assertEqual(s.recv(64), mocksock.incoming.pop(0))
121
122    def test_connection_client(self):
123
124
125        mocksock = self.sockobj._sock
126        mocksock.setup_stream()
127        mocksock.switch_stream()
128        s = socket.socket(10, 1, 6)
129
130
131        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
132        s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
133        s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
134
135        netadd = ('::', 28000, 0, 0)
136        s.bind(netadd)
137        s.listen(socket.SOMAXCONN)
138        (s, address) = s.accept()
139
140
141        self.assertEqual(s.recv(64), mocksock.incoming.pop(0))
142        s.send(mocksock.outgoing.pop(0))
143
144        buff = s.recv(64)
145        inco = mocksock.incoming.pop(0)
146        #self.assertEqual(s.recv(64), mocksock.incoming.pop(0))
147        s.send(mocksock.outgoing.pop(0))
148
149    def test_client_negoc(self):
150        return
151        self.sockobj._sock.setup_stream()
152        try:
153            self.sockobj.pollout()
154        except SystemExit:
155            pass
156
157        self._check_inout()
158
159    def test_server_negoc(self):
160        return
161        self.sockobj._sock.setup_stream()
162        self.sockobj._sock.switch_stream()
163        try:
164            self.sockobj.idlequeue.plug_idle(self.sockobj, False, True)
165        except SystemExit:
166            pass
167        self._check_inout()
168
169
170if __name__ == '__main__':
171
172    unittest.main()
173