1''' 2Tests for dispatcher.py 3''' 4import unittest 5 6import lib 7lib.setup_env() 8 9from mock import Mock 10import sys 11import socket 12 13from gajim.common.socks5 import * 14from gajim.common import jingle_xtls 15 16class fake_sock(Mock): 17 def __init__(self, sockobj): 18 Mock.__init__(self) 19 20 self.sockobj = sockobj 21 22 23 def setup_stream(self): 24 sha1 = self.sockobj._get_sha1_auth() 25 26 self.incoming = [] 27 self.incoming.append(self.sockobj._get_auth_response()) 28 self.incoming.append(self.sockobj._get_request_buff(sha1, 0x00)) 29 self.outgoing = [] 30 self.outgoing.append(self.sockobj._get_auth_buff()) 31 self.outgoing.append(self.sockobj._get_request_buff(sha1)) 32 33 def switch_stream(self): 34 # Roles are reversed, client will be expecting server stream 35 # and server will be expecting client stream 36 37 temp = self.incoming 38 self.incoming = self.outgoing 39 self.outgoing = temp 40 41 def _recv(self, foo): 42 return self.incoming.pop(0) 43 44 def _send(self, data): 45 # This method is surrounded by a try block, 46 # we can't use assert here 47 48 if data != self.outgoing[0]: 49 print('FAILED SENDING TEST') 50 self.outgoing.pop(0) 51 52class fake_idlequeue(Mock): 53 54 def __init__(self): 55 Mock.__init__(self) 56 57 def plug_idle(self, obj, writable=True, readable=True): 58 59 if readable: 60 obj.pollin() 61 if writable: 62 obj.pollout() 63 64class TestSocks5(unittest.TestCase): 65 ''' 66 Test class for Socks5 67 ''' 68 def setUp(self): 69 streamhost = { 'host': None, 70 'port': 1, 71 'initiator' : None, 72 'target' : None} 73 queue = Mock() 74 queue.file_props = {} 75 #self.sockobj = Socks5Receiver(fake_idlequeue(), streamhost, None) 76 self.sockobj = Socks5Sender(fake_idlequeue(), None, 'server', Mock() , 77 None, None, True, file_props={}) 78 sock = fake_sock(self.sockobj) 79 self.sockobj._sock = sock 80 self.sockobj._recv = sock._recv 81 self.sockobj._send = sock._send 82 self.sockobj.state = 1 83 self.sockobj.connected = True 84 self.sockobj.pollend = self._pollend 85 86 # Something that the receiver needs 87 #self.sockobj.file_props['type'] = 'r' 88 89 # Something that the sender needs 90 self.sockobj.file_props = {} 91 self.sockobj.file_props['type'] = 'r' 92 self.sockobj.file_props['paused'] = '' 93 self.sockobj.queue = Mock() 94 self.sockobj.queue.process_result = self._pollend 95 96 def _pollend(self, foo = None, duu = None): 97 # This is a disconnect function 98 sys.exit("end of the road") 99 100 def _check_inout(self): 101 # Check if there isn't anything else to receive or send 102 sock = self.sockobj._sock 103 assert(sock.incoming == []) 104 assert(sock.outgoing == []) 105 106 def test_connection_server(self): 107 return 108 mocksock = self.sockobj._sock 109 mocksock.setup_stream() 110 #self.sockobj._sock.switch_stream() 111 s = socket.socket(2, 1, 6) 112 server = ('127.0.0.1', 28000) 113 114 s.connect(server) 115 116 s.send(mocksock.outgoing.pop(0)) 117 self.assertEqual(s.recv(64), mocksock.incoming.pop(0)) 118 119 s.send(mocksock.outgoing.pop(0)) 120 self.assertEqual(s.recv(64), mocksock.incoming.pop(0)) 121 122 def test_connection_client(self): 123 124 125 mocksock = self.sockobj._sock 126 mocksock.setup_stream() 127 mocksock.switch_stream() 128 s = socket.socket(10, 1, 6) 129 130 131 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 132 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) 133 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 134 135 netadd = ('::', 28000, 0, 0) 136 s.bind(netadd) 137 s.listen(socket.SOMAXCONN) 138 (s, address) = s.accept() 139 140 141 self.assertEqual(s.recv(64), mocksock.incoming.pop(0)) 142 s.send(mocksock.outgoing.pop(0)) 143 144 buff = s.recv(64) 145 inco = mocksock.incoming.pop(0) 146 #self.assertEqual(s.recv(64), mocksock.incoming.pop(0)) 147 s.send(mocksock.outgoing.pop(0)) 148 149 def test_client_negoc(self): 150 return 151 self.sockobj._sock.setup_stream() 152 try: 153 self.sockobj.pollout() 154 except SystemExit: 155 pass 156 157 self._check_inout() 158 159 def test_server_negoc(self): 160 return 161 self.sockobj._sock.setup_stream() 162 self.sockobj._sock.switch_stream() 163 try: 164 self.sockobj.idlequeue.plug_idle(self.sockobj, False, True) 165 except SystemExit: 166 pass 167 self._check_inout() 168 169 170if __name__ == '__main__': 171 172 unittest.main() 173