1 #include "utp.h"
2 #include "utp_utils.h"
3 #include <stdio.h>
4 #include <stdlib.h>
5 #include <string.h>
6 #include <assert.h>
7 #include "templates.h"
8
9 #include <errno.h>
#if defined(WIN32) && !defined(ECONNRESET)
// Newer versions of MSVC already define ECONNRESET in errno.h; when it is
// missing, map it onto the Winsock error code instead.
#define ECONNRESET WSAECONNRESET
#endif

#if defined(POSIX)
// Provide the upper-case spelling of the socket address storage type that
// the rest of this file uses.
typedef sockaddr_storage SOCKADDR_STORAGE;
#endif // POSIX
20
// msleep(x): suspend the calling thread for x milliseconds.
#ifdef _WIN32
#define msleep(x) Sleep(x)
#else
#include <unistd.h>
// usleep() takes microseconds. The argument is parenthesized so that an
// expression such as msleep(a + b) is scaled as a whole.
#define msleep(x) usleep((x) * 1000)
#endif
27
// Assertion hook used by this test driver; forwards to the standard assert().
#define utassert assert

// utassert_failmsg(expr, failstmt): when `expr` is false, run `failstmt`
// (typically a diagnostic printf) and then fail an assertion.
//
// `!#expr` negates the stringified expression: a string literal is never
// null, so the negation is always false and the assertion always fires,
// while the expression text still appears in the assert diagnostic.
// The do/while(0) wrapper makes the macro behave as a single statement, so
// it can be used in an unbraced if/else.
#define utassert_failmsg(expr, failstmt) \
    do { \
        if (!(expr)) { \
            failstmt; \
            utassert(!#expr); \
        } \
    } while (0)
30
31 extern uint32 g_current_ms;
32
33 struct utp_socket {
34
35 utp_socket(UTPSocket* s);
36 ~utp_socket();
37
38 void close();
39 size_t write(char const* buf, size_t count);
40 void flush_write();
41
42 static void utp_read(void* socket, const byte* bytes, size_t count);
43 static void on_utp_write(void *socket, byte *bytes, size_t count);
44 static void on_utp_state(void *socket, int state);
45 static void on_utp_error(void *socket, int errcode);
on_utp_overheadutp_socket46 static void on_utp_overhead(void *socket, bool send, size_t count, int type) {}
47
get_rb_sizeutp_socket48 static size_t get_rb_size(void *socket)
49 { return 0; }
50
51 // write buffer
52 size_t _buf_size;
53 char _buffer[10*1024*2];
54
55 size_t _read_bytes;
56 bool _connected;
57 bool _readable;
58 bool _writable;
59 bool _ignore_reset;
60 bool _destroyed;
61
62 UTPSocket* _sock;
63 };
64
65 UTPFunctionTable utp_callbacks = {
66 &utp_socket::utp_read,
67 &utp_socket::on_utp_write,
68 &utp_socket::get_rb_size,
69 &utp_socket::on_utp_state,
70 &utp_socket::on_utp_error,
71 &utp_socket::on_utp_overhead
72 };
73
74 utp_socket* incoming = NULL;
75
76 struct TestUdpOutgoing {
77 int timestamp;
78 SOCKADDR_STORAGE addr;
79 socklen_t addrlen;
80 size_t len;
81 byte mem[1];
82 };
83
84 struct test_manager
85 {
test_managertest_manager86 test_manager() :
87 _receiver(NULL), _loss_counter(0), _loss_every(0), _reorder_counter(0), _reorder_every(0)
88 {
89 _send_buffer.Init();
90 }
drop_one_packet_everytest_manager91 void drop_one_packet_every(int x) { _loss_every = x; }
reorder_one_packet_everytest_manager92 void reorder_one_packet_every(int x) { _reorder_every = x; }
IncomingUTPtest_manager93 void IncomingUTP(UTPSocket* conn)
94 {
95 //printf("\nIn IncomingUTP\n");
96 utassert_failmsg(incoming == NULL, printf("\nincoming expected NULL actual %p\n", incoming));
97 incoming = new utp_socket(conn);
98 incoming->_connected = true;
99 incoming->_writable = true;
100 }
101
102 void Send(const byte *p, size_t len, const struct sockaddr *to, socklen_t tolen);
103 void Flush(uint32 start_time, uint32 max_time);
104 void clear();
bindtest_manager105 void bind(test_manager* receiver)
106 {
107 _receiver = receiver;
108 }
109
~test_managertest_manager110 ~test_manager()
111 {
112 clear();
113 _send_buffer.Free();
114 }
115
116 test_manager* _receiver;
117 int _loss_counter;
118 int _loss_every;
119
120 int _reorder_counter;
121 int _reorder_every;
122
123 Array<TestUdpOutgoing*> _send_buffer;
124 };
125
ComparePacketTimestamp(TestUdpOutgoing * const * lhs,TestUdpOutgoing * const * rhs)126 int ComparePacketTimestamp(TestUdpOutgoing* const* lhs, TestUdpOutgoing* const* rhs)
127 {
128 return (*lhs)->timestamp - (*rhs)->timestamp;
129 }
130
test_incoming_proc(void * userdata,UTPSocket * conn)131 void test_incoming_proc(void *userdata, UTPSocket* conn)
132 {
133 ((test_manager*)userdata)->IncomingUTP(conn);
134 }
135
test_send_to_proc(void * userdata,const byte * p,size_t len,const struct sockaddr * to,socklen_t tolen)136 void test_send_to_proc(void *userdata, const byte *p, size_t len, const struct sockaddr *to, socklen_t tolen)
137 {
138 ((test_manager*)userdata)->Send(p, len, to, tolen);
139 }
140
Flush(uint32 start_time,uint32 max_time)141 void test_manager::Flush(uint32 start_time, uint32 max_time)
142 {
143 //printf("In test_manager::Flush");
144 _send_buffer.Sort(&ComparePacketTimestamp);
145
146 for (size_t i = 0; i < _send_buffer.GetCount(); ++i) {
147 TestUdpOutgoing *uo = _send_buffer[i];
148 // utassert(uo);
149
150 if ((uint32)uo->timestamp > g_current_ms) continue;
151
152 if (_receiver) {
153 // Lookup the right UTP socket that can handle this message
154 UTP_IsIncomingUTP(&test_incoming_proc, &test_send_to_proc, _receiver, uo->mem, uo->len,
155 (const struct sockaddr*)&uo->addr, uo->addrlen);
156 }
157
158 _send_buffer.MoveUpLast(i);
159 --i;
160 free(uo);
161 }
162 }
163
clear()164 void test_manager::clear()
165 {
166 _loss_every = 0;
167 _reorder_every = 0;
168 _loss_counter = 0;
169 _reorder_counter = 0;
170 for(size_t i = 0; i < _send_buffer.GetCount(); i++) {
171 free(_send_buffer[i]);
172 }
173 _send_buffer.Clear();
174 }
175
Send(const byte * p,size_t len,const struct sockaddr * to,socklen_t tolen)176 void test_manager::Send(const byte *p, size_t len, const struct sockaddr *to, socklen_t tolen)
177 {
178 if (_loss_every > 0 && _loss_counter == _loss_every) {
179 _loss_counter = 0;
180 //printf("DROP!\n");
181 return;
182 } else {
183 ++_loss_counter;
184 }
185
186 int delay = 10 + rand() % 30;
187
188 ++_reorder_counter;
189 if (_reorder_counter >= _reorder_every && _reorder_every > 0) {
190 delay = 9;
191 _reorder_counter = 0;
192 }
193
194 TestUdpOutgoing *q = (TestUdpOutgoing*)malloc(sizeof(TestUdpOutgoing) - 1 + len);
195 q->timestamp = g_current_ms + delay;
196 memcpy(&q->addr, to, tolen);
197 q->addrlen = tolen;
198 q->len = len;
199 memcpy(q->mem, p, len);
200 _send_buffer.Append(q);
201 }
202
203 test_manager* send_udp_manager = 0;
204 test_manager* receive_udp_manager = 0;
205
utp_socket(UTPSocket * s)206 utp_socket::utp_socket(UTPSocket* s) :
207 _buf_size(0), _read_bytes(0),
208 _connected(false), _readable(false), _writable(false), _ignore_reset(false),
209 _destroyed(false), _sock(s)
210 {
211 // printf("utp_socket: %x sock: %x\n", this, _sock);
212 utassert(s);
213 UTP_SetCallbacks(_sock, &utp_callbacks, this);
214 }
215
close()216 void utp_socket::close()
217 {
218 // printf("~utp_socket: %x\n", this);
219 UTP_Close(_sock);
220 }
221
~utp_socket()222 utp_socket::~utp_socket()
223 {
224 utassert(_sock == NULL);
225 }
226
utp_read(void * socket,const byte * bytes,size_t count)227 void utp_socket::utp_read(void* socket, const byte* bytes, size_t count)
228 {
229 utp_socket* s = (utp_socket*)socket;
230 //printf("received %d\n", count);
231 s->_read_bytes += count;
232 //printf("utp_socket::read %x sock: %x bytes: %d\n", s, s->_sock, bytes);
233 // TODO: assert the bytes we receive matches the pattern we sent
234 }
235
236 // called when the socket is ready to write count bytes
on_utp_write(void * socket,byte * bytes,size_t count)237 void utp_socket::on_utp_write(void *socket, byte *bytes, size_t count)
238 {
239 utp_socket* s = (utp_socket*)socket;
240 //printf("utp_socket::write %x sock: %x\n", s, s->_sock);
241 // utassert(count <= s->_buf_size);
242 memcpy(bytes, s->_buffer, count);
243 memmove(s->_buffer, s->_buffer+count, s->_buf_size - count);
244 s->_buf_size -= count;
245 //printf("sending %d bytes (%d left)\n", count, s->_buf_size);
246 }
247
flush_write()248 void utp_socket::flush_write()
249 {
250 //printf("utp_socket::flush_write %x sock: %x\n", this, _sock);
251 if (!_writable) return;
252 if (_buf_size == 0) return;
253
254 _writable = UTP_Write(_sock, _buf_size);
255 // if (!_writable) printf("not writable\n");
256 }
257
on_utp_state(void * socket,int state)258 void utp_socket::on_utp_state(void *socket, int state)
259 {
260 utp_socket* s = (utp_socket*)socket;
261 utassert(s->_sock);
262 //printf("utp_socket::state %x sock: %x\n", s, s->_sock);
263 switch(state) {
264 case UTP_STATE_CONNECT:
265 utassert(!s->_destroyed);
266 s->_connected = true;
267 // printf("connected!\n");
268 s->_writable = true;
269 s->flush_write();
270 break;
271 case UTP_STATE_WRITABLE:
272 utassert(s->_connected && !s->_destroyed);
273 s->_writable = true;
274 // printf("writable!\n");
275 s->flush_write();
276 break;
277 case UTP_STATE_DESTROYING:
278 utassert(!s->_destroyed);
279 s->_connected = false;
280 s->_readable = false;
281 s->_writable = false;
282 s->_destroyed = true;
283 s->_sock = NULL;
284 break;
285 case UTP_STATE_EOF:
286 utassert(s->_connected && !s->_destroyed);
287 s->_readable = false;
288 s->_connected = false;
289 break;
290 }
291 }
292
293 bool g_error = false;
294
on_utp_error(void * socket,int errcode)295 void utp_socket::on_utp_error(void *socket, int errcode)
296 {
297 printf("\nUTP ERROR: %d for socket %p\n", errcode, socket);
298 utp_socket* usock = ((utp_socket*)socket);
299 if (!usock->_ignore_reset || errcode != ECONNRESET) {
300 g_error = true;
301 utassert(false);
302 }
303 usock->close();
304 }
305
write(char const * buf,size_t count)306 size_t utp_socket::write(char const* buf, size_t count)
307 {
308 assert(_buf_size <= sizeof(_buffer));
309 size_t free = sizeof(_buffer) - _buf_size;
310 size_t to_write = count < free ? count : free;
311 if (to_write == 0) return 0;
312 memcpy(_buffer + _buf_size, buf, to_write);
313 _buf_size += count;
314 //printf("writing %d bytes to write buffer\n", count);
315 flush_write();
316 return to_write;
317 }
318
tick()319 void tick()
320 {
321 static int tick_counter = 0;
322
323 ++tick_counter;
324 if (tick_counter == 10) {
325 tick_counter = 0;
326 UTP_CheckTimeouts();
327 }
328
329 uint32 start_time = UTP_GetMilliseconds();
330 uint32 max_time = 1000;
331
332 send_udp_manager->Flush(start_time, max_time);
333 receive_udp_manager->Flush(start_time, max_time);
334
335 msleep(5);
336 }
337
// Bit flags selecting the scenario exercised by test_transfer().
enum flags_t {
    use_utp_v1             = 1 << 0,  // set SO_UTPVERSION to 1 instead of 0
    simulate_packetloss    = 1 << 1,  // drop packets periodically in both directions
    simulate_packetreorder = 1 << 2,  // periodically let a packet overtake earlier ones
    heavy_loss             = 1 << 3   // shorter drop intervals; only used with simulate_packetloss
};
344
test_transfer(int flags)345 void test_transfer(int flags)
346 {
347 sockaddr_in sin;
348 memset(&sin, 0, sizeof(sin));
349 sin.sin_family = AF_INET;
350 sin.sin_addr.s_addr = inet_addr("127.0.0.1");
351 sin.sin_port = htons(12345);
352
353 UTPSocket* sock = UTP_Create(&test_send_to_proc, send_udp_manager,
354 (const struct sockaddr*)&sin, sizeof(sin));
355
356 utp_socket* sender = new utp_socket(sock);
357 if (flags & use_utp_v1) {
358 UTP_SetSockopt(sender->_sock, SO_UTPVERSION, 1);
359 } else {
360 UTP_SetSockopt(sender->_sock, SO_UTPVERSION, 0);
361 }
362
363 send_udp_manager->clear();
364 receive_udp_manager->clear();
365
366 if (flags & simulate_packetloss) {
367 send_udp_manager->drop_one_packet_every(33);
368 receive_udp_manager->drop_one_packet_every(47);
369
370 if (flags & heavy_loss) {
371 send_udp_manager->drop_one_packet_every(7);
372 receive_udp_manager->drop_one_packet_every(13);
373 }
374 }
375
376 if (flags & simulate_packetreorder) {
377 send_udp_manager->reorder_one_packet_every(27);
378 receive_udp_manager->reorder_one_packet_every(23);
379 }
380
381 UTP_Connect(sender->_sock);
382
383 for (int i = 0; i < 1500; ++i) {
384 tick();
385 if (sender->_connected && incoming) break;
386 }
387 utassert(incoming);
388 if (!incoming) return;
389 utassert(sender->_connected);
390 if (!sender->_connected) return;
391
392 char buffer[16*1024];
393 for (size_t i = 0; i < sizeof(buffer); ++i) buffer[i] = i & 0xff;
394
395 const size_t send_target = 10 * 16 * 1024;
396
397 size_t written = sender->write(buffer, sizeof(buffer));
398 utassert(written > 0);
399
400 for (int i = 0; i < 20000; ++i) {
401 tick();
402 // utassert(incoming->_read_bytes <= written);
403 // utassert(written <= send_target);
404 if (incoming->_read_bytes == send_target) break;
405 if (written < send_target && sender->_writable)
406 {
407 int offset = written % (16 * 1024);
408 written += sender->write(buffer + offset, 1024 * 16 - offset);
409 // printf("written: %d\n", written);
410 }
411 }
412 utassert_failmsg(incoming->_read_bytes == written, printf("\nread_bytes: %d written: %d\n", incoming->_read_bytes, written));
413
414 sender->close();
415
416 for (int i = 0; i < 1500; ++i) {
417 tick();
418 if (incoming->_connected == false) break;
419 }
420 utassert(incoming->_connected == false);
421
422 incoming->close();
423
424 // we know at this point that the sender sent all the data and the receiver got EOF.
425 // shutdown might be disrupted by dropped packets, so ignore RSTs
426 if (flags & simulate_packetloss) {
427 sender->_ignore_reset = true;
428 incoming->_ignore_reset = true;
429 }
430
431 for (int i = 0; i < 1500; ++i) {
432 tick();
433 if (sender->_destroyed == true) break;
434 }
435 utassert(sender->_destroyed == true);
436
437 for (int i = 0; i < 1500; ++i) {
438 tick();
439 if (incoming->_destroyed == true) break;
440 }
441 utassert(incoming->_destroyed == true);
442
443 delete sender;
444 delete incoming;
445 incoming = NULL;
446 }
447
448 bool wrapping_compare_less(uint32 lhs, uint32 rhs);
449
main()450 int main()
451 {
452 utassert(wrapping_compare_less(0xfffffff0, 0xffffffff) == true);
453 utassert(wrapping_compare_less(0xffffffff, 0xfffffff0) == false);
454 utassert(wrapping_compare_less(0xfff, 0xfffffff0) == false);
455 utassert(wrapping_compare_less(0xfffffff0, 0xfff) == true);
456 utassert(wrapping_compare_less(0x0, 0x1) == true);
457 utassert(wrapping_compare_less(0x1, 0x0) == false);
458 utassert(wrapping_compare_less(0x1, 0x1) == false);
459
460 send_udp_manager = new test_manager;
461 receive_udp_manager = new test_manager;
462 send_udp_manager->bind(receive_udp_manager);
463 receive_udp_manager->bind(send_udp_manager);
464
465 #define _ if (!g_error)
466
467 printf("\nTesting transfer\n");
468 test_transfer(0);
469 _ printf("\nTesting transfer with simulated packet loss\n");
470 _ test_transfer(simulate_packetloss);
471 _ printf("\nTesting transfer with simulated packet loss and reorder\n");
472 _ test_transfer(simulate_packetloss | simulate_packetreorder);
473 _ printf("\nTesting transfer with heavy simulated packet loss and reorder\n");
474 _ test_transfer(simulate_packetloss | simulate_packetreorder | heavy_loss);
475 _ printf("\nTesting transfer with simulated packet reorder\n");
476 _ test_transfer(simulate_packetreorder);
477
478 _ printf("\nTesting transfer using utp v1\n");
479 _ test_transfer(use_utp_v1);
480 _ printf("\nTesting transfer using utp v1 with simulated packet loss\n");
481 _ test_transfer(use_utp_v1 | simulate_packetloss);
482 _ printf("\nTesting transfer using utp v1 with simulated packet loss and reorder\n");
483 _ test_transfer(use_utp_v1 | simulate_packetloss | simulate_packetreorder);
484 _ printf("\nTesting transfer using utp v1 with heavy simulated packet loss and reorder\n");
485 _ test_transfer(use_utp_v1 | simulate_packetloss | simulate_packetreorder | heavy_loss);
486 _ printf("\nTesting transfer using utp v1 with simulated packet reorder\n");
487 _ test_transfer(use_utp_v1 | simulate_packetreorder);
488
489 return 0;
490 }
491
492