1 #include "utp.h"
2 #include "utp_utils.h"
3 #include <stdio.h>
4 #include <stdlib.h>
5 #include <string.h>
6 #include <assert.h>
7 #include "templates.h"
8 
9 #include <errno.h>
10 #ifdef WIN32
11 // newer versions of MSVC define these in errno.h
12 #ifndef ECONNRESET
13 #define ECONNRESET WSAECONNRESET
14 #endif
15 #endif
16 
17 #ifdef POSIX
18 typedef sockaddr_storage SOCKADDR_STORAGE;
19 #endif // POSIX
20 
// msleep(x): block the calling thread for x milliseconds.
// The argument is parenthesized in the expansion so that an expression such
// as msleep(a + b) is scaled as a whole rather than only its last operand.
#ifdef _WIN32
#define msleep(x) Sleep(x)
#else
#include <unistd.h>
#define msleep(x) usleep((x) * 1000)
#endif
27 
// utassert(expr): plain assertion used throughout the tests.
#define utassert assert
// utassert_failmsg(expr, failstmt): when expr is false, run failstmt (usually a
// printf with diagnostics) and then fail an assertion that mentions expr.
// The assertion is written as `false && "text"` because a bare string literal
// is a non-null pointer and would therefore always pass. The do/while(0)
// wrapper makes the macro behave like a single statement, so it can be used
// safely as the body of an if/else.
#define utassert_failmsg(expr,failstmt) do { if (!(expr)) { failstmt; utassert(false && #expr); } } while (0)
30 
31 extern uint32 g_current_ms;
32 
33 struct utp_socket {
34 
35 	utp_socket(UTPSocket* s);
36 	~utp_socket();
37 
38 	void close();
39 	size_t write(char const* buf, size_t count);
40 	void flush_write();
41 
42 	static void utp_read(void* socket, const byte* bytes, size_t count);
43 	static void on_utp_write(void *socket, byte *bytes, size_t count);
44 	static void on_utp_state(void *socket, int state);
45 	static void on_utp_error(void *socket, int errcode);
on_utp_overheadutp_socket46 	static void on_utp_overhead(void *socket, bool send, size_t count, int type) {}
47 
get_rb_sizeutp_socket48 	static size_t get_rb_size(void *socket)
49 	{ return 0; }
50 
51 	// write buffer
52 	size_t _buf_size;
53 	char _buffer[10*1024*2];
54 
55 	size_t _read_bytes;
56 	bool _connected;
57 	bool _readable;
58 	bool _writable;
59 	bool _ignore_reset;
60 	bool _destroyed;
61 
62 	UTPSocket* _sock;
63 };
64 
65 UTPFunctionTable utp_callbacks = {
66 	&utp_socket::utp_read,
67 	&utp_socket::on_utp_write,
68 	&utp_socket::get_rb_size,
69 	&utp_socket::on_utp_state,
70 	&utp_socket::on_utp_error,
71 	&utp_socket::on_utp_overhead
72 };
73 
74 utp_socket* incoming = NULL;
75 
76 struct TestUdpOutgoing {
77 	int timestamp;
78 	SOCKADDR_STORAGE addr;
79 	socklen_t addrlen;
80 	size_t len;
81 	byte mem[1];
82 };
83 
84 struct test_manager
85 {
test_managertest_manager86 	test_manager() :
87 		_receiver(NULL), _loss_counter(0), _loss_every(0), _reorder_counter(0), _reorder_every(0)
88 	{
89 		_send_buffer.Init();
90 	}
drop_one_packet_everytest_manager91 	void drop_one_packet_every(int x) { _loss_every = x; }
reorder_one_packet_everytest_manager92 	void reorder_one_packet_every(int x) { _reorder_every = x; }
IncomingUTPtest_manager93 	void IncomingUTP(UTPSocket* conn)
94 	{
95 		//printf("\nIn IncomingUTP\n");
96 		utassert_failmsg(incoming == NULL, printf("\nincoming expected NULL actual %p\n", incoming));
97 		incoming = new utp_socket(conn);
98 		incoming->_connected = true;
99 		incoming->_writable = true;
100 	}
101 
102 	void Send(const byte *p, size_t len, const struct sockaddr *to, socklen_t tolen);
103 	void Flush(uint32 start_time, uint32 max_time);
104 	void clear();
bindtest_manager105 	void bind(test_manager* receiver)
106 	{
107 		_receiver = receiver;
108 	}
109 
~test_managertest_manager110 	~test_manager()
111 	{
112 		clear();
113 		_send_buffer.Free();
114 	}
115 
116 	test_manager* _receiver;
117 	int _loss_counter;
118 	int _loss_every;
119 
120 	int _reorder_counter;
121 	int _reorder_every;
122 
123 	Array<TestUdpOutgoing*> _send_buffer;
124 };
125 
ComparePacketTimestamp(TestUdpOutgoing * const * lhs,TestUdpOutgoing * const * rhs)126 int ComparePacketTimestamp(TestUdpOutgoing* const* lhs, TestUdpOutgoing* const* rhs)
127 {
128 	return (*lhs)->timestamp - (*rhs)->timestamp;
129 }
130 
test_incoming_proc(void * userdata,UTPSocket * conn)131 void test_incoming_proc(void *userdata, UTPSocket* conn)
132 {
133 	((test_manager*)userdata)->IncomingUTP(conn);
134 }
135 
test_send_to_proc(void * userdata,const byte * p,size_t len,const struct sockaddr * to,socklen_t tolen)136 void test_send_to_proc(void *userdata, const byte *p, size_t len, const struct sockaddr *to, socklen_t tolen)
137 {
138 	((test_manager*)userdata)->Send(p, len, to, tolen);
139 }
140 
Flush(uint32 start_time,uint32 max_time)141 void test_manager::Flush(uint32 start_time, uint32 max_time)
142 {
143 	//printf("In test_manager::Flush");
144 	_send_buffer.Sort(&ComparePacketTimestamp);
145 
146 	for (size_t i = 0; i < _send_buffer.GetCount(); ++i) {
147 		TestUdpOutgoing *uo = _send_buffer[i];
148 //		utassert(uo);
149 
150 		if ((uint32)uo->timestamp > g_current_ms) continue;
151 
152 		if (_receiver) {
153 			// Lookup the right UTP socket that can handle this message
154 			UTP_IsIncomingUTP(&test_incoming_proc, &test_send_to_proc, _receiver, uo->mem, uo->len,
155 							  (const struct sockaddr*)&uo->addr, uo->addrlen);
156 		}
157 
158 		_send_buffer.MoveUpLast(i);
159 		--i;
160 		free(uo);
161 	}
162 }
163 
clear()164 void test_manager::clear()
165 {
166 	_loss_every = 0;
167 	_reorder_every = 0;
168 	_loss_counter = 0;
169 	_reorder_counter = 0;
170 	for(size_t i = 0; i < _send_buffer.GetCount(); i++) {
171 		free(_send_buffer[i]);
172 	}
173 	_send_buffer.Clear();
174 }
175 
Send(const byte * p,size_t len,const struct sockaddr * to,socklen_t tolen)176 void test_manager::Send(const byte *p, size_t len, const struct sockaddr *to, socklen_t tolen)
177 {
178 	if (_loss_every > 0 && _loss_counter == _loss_every) {
179 		_loss_counter = 0;
180 		//printf("DROP!\n");
181 		return;
182 	} else {
183 		++_loss_counter;
184 	}
185 
186 	int delay = 10 + rand() % 30;
187 
188 	++_reorder_counter;
189 	if (_reorder_counter >= _reorder_every && _reorder_every > 0) {
190 		delay = 9;
191 		_reorder_counter = 0;
192 	}
193 
194 	TestUdpOutgoing *q = (TestUdpOutgoing*)malloc(sizeof(TestUdpOutgoing) - 1 + len);
195 	q->timestamp = g_current_ms + delay;
196 	memcpy(&q->addr, to, tolen);
197 	q->addrlen = tolen;
198 	q->len = len;
199 	memcpy(q->mem, p, len);
200 	_send_buffer.Append(q);
201 }
202 
203 test_manager* send_udp_manager = 0;
204 test_manager* receive_udp_manager = 0;
205 
utp_socket(UTPSocket * s)206 utp_socket::utp_socket(UTPSocket* s) :
207 	_buf_size(0), _read_bytes(0),
208 	_connected(false), _readable(false), _writable(false), _ignore_reset(false),
209 	_destroyed(false),  _sock(s)
210 {
211 //	printf("utp_socket: %x sock: %x\n", this, _sock);
212 	utassert(s);
213 	UTP_SetCallbacks(_sock, &utp_callbacks, this);
214 }
215 
close()216 void utp_socket::close()
217 {
218 //	printf("~utp_socket: %x\n", this);
219 	UTP_Close(_sock);
220 }
221 
~utp_socket()222 utp_socket::~utp_socket()
223 {
224 	utassert(_sock == NULL);
225 }
226 
utp_read(void * socket,const byte * bytes,size_t count)227 void utp_socket::utp_read(void* socket, const byte* bytes, size_t count)
228 {
229 	utp_socket* s = (utp_socket*)socket;
230 	//printf("received %d\n", count);
231 	s->_read_bytes += count;
232 	//printf("utp_socket::read %x sock: %x bytes: %d\n", s, s->_sock, bytes);
233 // TODO: assert the bytes we receive matches the pattern we sent
234 }
235 
236 // called when the socket is ready to write count bytes
on_utp_write(void * socket,byte * bytes,size_t count)237 void utp_socket::on_utp_write(void *socket, byte *bytes, size_t count)
238 {
239 	utp_socket* s = (utp_socket*)socket;
240 	//printf("utp_socket::write %x sock: %x\n", s, s->_sock);
241 //	utassert(count <= s->_buf_size);
242 	memcpy(bytes, s->_buffer, count);
243 	memmove(s->_buffer, s->_buffer+count, s->_buf_size - count);
244 	s->_buf_size -= count;
245 	//printf("sending %d bytes (%d left)\n", count, s->_buf_size);
246 }
247 
flush_write()248 void utp_socket::flush_write()
249 {
250 	//printf("utp_socket::flush_write %x sock: %x\n", this, _sock);
251 	if (!_writable) return;
252 	if (_buf_size == 0) return;
253 
254 	_writable = UTP_Write(_sock, _buf_size);
255 //	if (!_writable) printf("not writable\n");
256 }
257 
on_utp_state(void * socket,int state)258 void utp_socket::on_utp_state(void *socket, int state)
259 {
260 	utp_socket* s = (utp_socket*)socket;
261 	utassert(s->_sock);
262 	//printf("utp_socket::state %x sock: %x\n", s, s->_sock);
263 	switch(state) {
264 	case UTP_STATE_CONNECT:
265 		utassert(!s->_destroyed);
266 		s->_connected = true;
267 //		printf("connected!\n");
268 		s->_writable = true;
269 		s->flush_write();
270 		break;
271 	case UTP_STATE_WRITABLE:
272 		utassert(s->_connected && !s->_destroyed);
273 		s->_writable = true;
274 //		printf("writable!\n");
275 		s->flush_write();
276 		break;
277 	case UTP_STATE_DESTROYING:
278 		utassert(!s->_destroyed);
279 		s->_connected = false;
280 		s->_readable = false;
281 		s->_writable = false;
282 		s->_destroyed = true;
283 		s->_sock = NULL;
284 		break;
285 	case UTP_STATE_EOF:
286 		utassert(s->_connected && !s->_destroyed);
287 		s->_readable = false;
288 		s->_connected = false;
289 		break;
290 	}
291 }
292 
// Set by utp_socket::on_utp_error when an error is reported that is not
// being ignored; main() stops starting further test runs once it is set.
bool g_error = false;
294 
on_utp_error(void * socket,int errcode)295 void utp_socket::on_utp_error(void *socket, int errcode)
296 {
297 	printf("\nUTP ERROR: %d for socket %p\n", errcode, socket);
298 	utp_socket* usock = ((utp_socket*)socket);
299 	if (!usock->_ignore_reset || errcode != ECONNRESET) {
300 		g_error = true;
301 		utassert(false);
302 	}
303 	usock->close();
304 }
305 
write(char const * buf,size_t count)306 size_t utp_socket::write(char const* buf, size_t count)
307 {
308 	assert(_buf_size <= sizeof(_buffer));
309 	size_t free = sizeof(_buffer) - _buf_size;
310 	size_t to_write = count < free ? count : free;
311 	if (to_write == 0) return 0;
312 	memcpy(_buffer + _buf_size, buf, to_write);
313 	_buf_size += count;
314 	//printf("writing %d bytes to write buffer\n", count);
315 	flush_write();
316 	return to_write;
317 }
318 
tick()319 void tick()
320 {
321 	static int tick_counter = 0;
322 
323 	++tick_counter;
324 	if (tick_counter == 10) {
325 		tick_counter = 0;
326 		UTP_CheckTimeouts();
327 	}
328 
329 	uint32 start_time = UTP_GetMilliseconds();
330 	uint32 max_time = 1000;
331 
332 	send_udp_manager->Flush(start_time, max_time);
333 	receive_udp_manager->Flush(start_time, max_time);
334 
335 	msleep(5);
336 }
337 
// Bit flags selecting the scenario exercised by test_transfer; combine with |.
enum flags_t {
	use_utp_v1             = 1 << 0,  // select protocol version 1 via SO_UTPVERSION
	simulate_packetloss    = 1 << 1,  // drop packets on both links
	simulate_packetreorder = 1 << 2,  // reorder packets on both links
	heavy_loss             = 1 << 3   // with simulate_packetloss: drop far more often
};
344 
test_transfer(int flags)345 void test_transfer(int flags)
346 {
347 	sockaddr_in sin;
348 	memset(&sin, 0, sizeof(sin));
349 	sin.sin_family = AF_INET;
350 	sin.sin_addr.s_addr = inet_addr("127.0.0.1");
351 	sin.sin_port = htons(12345);
352 
353 	UTPSocket* sock = UTP_Create(&test_send_to_proc, send_udp_manager,
354 								 (const struct sockaddr*)&sin, sizeof(sin));
355 
356 	utp_socket* sender = new utp_socket(sock);
357 	if (flags & use_utp_v1) {
358 		UTP_SetSockopt(sender->_sock, SO_UTPVERSION, 1);
359 	} else {
360 		UTP_SetSockopt(sender->_sock, SO_UTPVERSION, 0);
361 	}
362 
363 	send_udp_manager->clear();
364 	receive_udp_manager->clear();
365 
366 	if (flags & simulate_packetloss) {
367 		send_udp_manager->drop_one_packet_every(33);
368 		receive_udp_manager->drop_one_packet_every(47);
369 
370 		if (flags & heavy_loss) {
371 			send_udp_manager->drop_one_packet_every(7);
372 			receive_udp_manager->drop_one_packet_every(13);
373 		}
374 	}
375 
376 	if (flags & simulate_packetreorder) {
377 		send_udp_manager->reorder_one_packet_every(27);
378 		receive_udp_manager->reorder_one_packet_every(23);
379 	}
380 
381 	UTP_Connect(sender->_sock);
382 
383 	for (int i = 0; i < 1500; ++i) {
384 		tick();
385 		if (sender->_connected && incoming) break;
386 	}
387 	utassert(incoming);
388 	if (!incoming) return;
389 	utassert(sender->_connected);
390 	if (!sender->_connected) return;
391 
392 	char buffer[16*1024];
393 	for (size_t i = 0; i < sizeof(buffer); ++i) buffer[i] = i & 0xff;
394 
395 	const size_t send_target = 10 * 16 * 1024;
396 
397 	size_t written = sender->write(buffer, sizeof(buffer));
398 	utassert(written > 0);
399 
400 	for (int i = 0; i < 20000; ++i) {
401 		tick();
402 //		utassert(incoming->_read_bytes <= written);
403 //		utassert(written <= send_target);
404 		if (incoming->_read_bytes == send_target) break;
405 		if (written < send_target && sender->_writable)
406 		{
407 			int offset = written % (16 * 1024);
408 			written += sender->write(buffer + offset, 1024 * 16 - offset);
409 //			printf("written: %d\n", written);
410 		}
411 	}
412 	utassert_failmsg(incoming->_read_bytes == written, printf("\nread_bytes: %d written: %d\n", incoming->_read_bytes, written));
413 
414 	sender->close();
415 
416 	for (int i = 0; i < 1500; ++i) {
417 		tick();
418 		if (incoming->_connected == false) break;
419 	}
420 	utassert(incoming->_connected == false);
421 
422 	incoming->close();
423 
424 	// we know at this point that the sender sent all the data and the receiver got EOF.
425 	// shutdown might be disrupted by dropped packets, so ignore RSTs
426 	if (flags & simulate_packetloss) {
427 		sender->_ignore_reset = true;
428 		incoming->_ignore_reset = true;
429 	}
430 
431 	for (int i = 0; i < 1500; ++i) {
432 		tick();
433 		if (sender->_destroyed == true) break;
434 	}
435 	utassert(sender->_destroyed == true);
436 
437 	for (int i = 0; i < 1500; ++i) {
438 		tick();
439 		if (incoming->_destroyed == true) break;
440 	}
441 	utassert(incoming->_destroyed == true);
442 
443 	delete sender;
444 	delete incoming;
445 	incoming = NULL;
446 }
447 
448 bool wrapping_compare_less(uint32 lhs, uint32 rhs);
449 
main()450 int main()
451 {
452 	utassert(wrapping_compare_less(0xfffffff0, 0xffffffff) == true);
453 	utassert(wrapping_compare_less(0xffffffff, 0xfffffff0) == false);
454 	utassert(wrapping_compare_less(0xfff, 0xfffffff0) == false);
455 	utassert(wrapping_compare_less(0xfffffff0, 0xfff) == true);
456 	utassert(wrapping_compare_less(0x0, 0x1) == true);
457 	utassert(wrapping_compare_less(0x1, 0x0) == false);
458 	utassert(wrapping_compare_less(0x1, 0x1) == false);
459 
460 	send_udp_manager = new test_manager;
461 	receive_udp_manager = new test_manager;
462 	send_udp_manager->bind(receive_udp_manager);
463 	receive_udp_manager->bind(send_udp_manager);
464 
465 #define _ if (!g_error)
466 
467 	printf("\nTesting transfer\n");
468 	test_transfer(0);
469 	_ printf("\nTesting transfer with simulated packet loss\n");
470 	_ test_transfer(simulate_packetloss);
471 	_ printf("\nTesting transfer with simulated packet loss and reorder\n");
472 	_ test_transfer(simulate_packetloss | simulate_packetreorder);
473 	_ printf("\nTesting transfer with heavy simulated packet loss and reorder\n");
474 	_ test_transfer(simulate_packetloss | simulate_packetreorder | heavy_loss);
475 	_ printf("\nTesting transfer with simulated packet reorder\n");
476 	_ test_transfer(simulate_packetreorder);
477 
478 	_ printf("\nTesting transfer using utp v1\n");
479 	_ test_transfer(use_utp_v1);
480 	_ printf("\nTesting transfer using utp v1 with simulated packet loss\n");
481 	_ test_transfer(use_utp_v1 | simulate_packetloss);
482 	_ printf("\nTesting transfer using utp v1 with simulated packet loss and reorder\n");
483 	_ test_transfer(use_utp_v1 | simulate_packetloss | simulate_packetreorder);
484 	_ printf("\nTesting transfer using utp v1 with heavy simulated packet loss and reorder\n");
485 	_ test_transfer(use_utp_v1 | simulate_packetloss | simulate_packetreorder | heavy_loss);
486 	_ printf("\nTesting transfer using utp v1 with simulated packet reorder\n");
487 	_ test_transfer(use_utp_v1 | simulate_packetreorder);
488 
489 	return 0;
490 }
491 
492