1 /***** Includes *****/
2 #include "config.h"
3 #include <sys/types.h>
4
5 #include <time.h>
6 #ifdef HAVE_SYS_TIME_H
7 #include <sys/time.h>
8 #endif
9 #ifdef HAVE_SYS_TIMES_H
10 #include <sys/times.h>
11 #endif
12 #include <sys/socket.h>
13 #ifdef HAVE_SYS_SOCKIO_H
14 #include <sys/sockio.h>
15 #endif
16 #ifdef HAVE_SYS_SELECT_H
17 #include <sys/select.h>
18 #endif
19 #ifdef HAVE_SYS_UN_H
20 #include <sys/un.h>
21 #endif
22 #ifdef HAVE_SYS_UIO_H
23 #include <sys/uio.h>
24 #endif
25 #ifdef HAVE_HOSTLIB_H
26 #include "hostLib.h"
27 #endif
28 #ifdef HAVE_STREAMS_UN_H
29 #include <streams/un.h>
30 #endif
31 #include <netinet/in.h>
32 #include <arpa/inet.h>
33 #ifdef HAVE_NETDB_H
34 #include <netdb.h>
35 #endif
36
37 #include <stdio.h>
38 #include <fcntl.h>
39 #ifdef HAVE_UNISTD_H
40 #include <unistd.h>
41 #endif
42 #undef NDEBUG
43 #include <assert.h>
44 #include <signal.h>
45 #include <stdlib.h>
46 #include <string.h>
47 #include <limits.h>
48 #ifdef HAVE_MEMORY_H
49 #include <memory.h>
50 #endif
51 #include <algorithm>
52 #include <iostream>
53
54 #include <atl.h>
55 #include "evpath.h"
56 #include "cm_transport.h"
57 #include "udt.h"
58
59 #ifndef SOCKET_ERROR
60 #define SOCKET_ERROR -1
61 #endif
62
63 #if defined (__INTEL_COMPILER)
64 #pragma warning (disable: 869)
65 #pragma warning (disable: 310)
66 #pragma warning (disable: 1418)
67 #pragma warning (disable: 180)
68 #pragma warning (disable: 2259)
69 #pragma warning (disable: 177)
70 #endif
71
72 typedef struct func_list_item
73 {
74 select_list_func func;
75 void *arg1;
76 void *arg2;
77 } FunctionListElement;
78
79 typedef struct socket_data_map_entry *socket_data_map_ptr;
80
81 typedef struct udt4_transport_data
82 {
83 CManager cm;
84 char *hostname;
85 int listen_port;
86 attr_list characteristics;
87 CMtrans_services svc;
88 int eid;
89 int listen_sock;
90 int conn_count;
91 socket_data_map_ptr map;
92 } *udt4_transport_data_ptr;
93
94 typedef struct udt4_connection_data
95 {
96 int remote_IP;
97 int remote_contact_port;
98 UDTSOCKET udtsock;
99 udt4_transport_data_ptr utd;
100 CMConnection conn;
101 CMbuffer read_buffer;
102 int read_buffer_len;
103 } *udt4_conn_data_ptr;
104
105 struct socket_data_map_entry
106 {
107 UDTSOCKET udtsock;
108 udt4_conn_data_ptr ucd;
109 };
110
111 #ifdef WSAEWOULDBLOCK
112 #define EWOULDBLOCK WSAEWOULDBLOCK
113 #define EAGAIN WSAEINPROGRESS
114 #define EINTR WSAEINTR
115 #define errno GetLastError()
116 #define read(fd, buf, len) recv(fd, buf, len, 0)
117 #define write(fd, buf, len) send(fd, buf, len, 0)
118 #endif
119
120 static atom_t CM_FD = -1;
121 static atom_t CM_THIS_CONN_PORT = -1;
122 static atom_t CM_PEER_CONN_PORT = -1;
123 static atom_t CM_PEER_IP = -1;
124 static atom_t CM_PEER_HOSTNAME = -1;
125 static atom_t CM_PEER_LISTEN_PORT = -1;
126 static atom_t CM_TRANSPORT = -1;
127 static atom_t CM_TRANSPORT_RELIABLE = -1;
128 static atom_t CM_IP_PORT = -1;
129 static atom_t CM_IP_HOSTNAME = -1;
130 static atom_t CM_IP_ADDR = -1;
131
/*
 * TIMING_GUARD_START opens a scope and records the current time;
 * TIMING_GUARD_STOP closes that scope and writes a warning to stderr when
 * one second or more elapsed in between.  The two macros must be used as a
 * pair within a single function because START opens the brace STOP closes.
 */
#define TIMING_GUARD_START { struct timeval t0,t1,diff; gettimeofday(&t0, NULL);
#define TIMING_GUARD_STOP gettimeofday(&t1, NULL); timersub(&t1, &t0, &diff); if (diff.tv_sec > 0) fprintf(stderr, "TIME GUARD at %s:%d exceeded, time was <%ld.%06ld> secs\n", __FILE__, __LINE__, (long)diff.tv_sec, (long)diff.tv_usec);}
134
/*
 * Translate `hostname` into an IPv4 address.
 *
 * The name is first resolved with gethostbyname(); if that fails it is
 * parsed as a numeric address with inet_aton().
 *
 * hostname - host name or numeric IPv4 string; NULL is rejected.
 * sin_addr - receives the address in network byte order; must have room
 *            for a struct in_addr.  Left untouched on failure.
 *
 * Returns 1 when an address was stored, 0 otherwise.
 */
static int
check_host(char *hostname, void *sin_addr)
{
    struct hostent *host_addr;
    struct in_addr addr;

    if (hostname == NULL || sin_addr == NULL) {
        return 0;
    }

    host_addr = gethostbyname(hostname);
    if (host_addr != NULL && host_addr->h_addrtype == AF_INET &&
        host_addr->h_addr_list[0] != NULL) {
        /* never copy more than the caller's in_addr-sized buffer can hold */
        size_t len = (size_t) host_addr->h_length;
        if (len > sizeof(struct in_addr)) {
            len = sizeof(struct in_addr);
        }
        memcpy(sin_addr, host_addr->h_addr_list[0], len);
        return 1;
    }

    if (inet_aton(hostname, &addr) == 0) {
        /* neither a resolvable name nor a numeric IPv4 address */
        return 0;
    }
    /* memcpy rather than an int* pun: no aliasing or alignment assumptions */
    memcpy(sin_addr, &addr, sizeof(addr));
    return 1;
}
156
157 static udt4_conn_data_ptr
create_udt4_conn_data(CMtrans_services svc)158 create_udt4_conn_data(CMtrans_services svc)
159 {
160 udt4_conn_data_ptr ucd =
161 (udt4_conn_data_ptr) svc->
162 malloc_func(sizeof(struct udt4_connection_data));
163 memset(ucd, 0, sizeof(struct udt4_connection_data));
164 ucd->remote_contact_port = -1;
165 ucd->udtsock = -1;
166 return ucd;
167 }
168
169 static void udt4_service_epoll(void *void_trans, void *void_conn_sock);
170
171 static void
add_sock_map_data(udt4_transport_data_ptr udt,UDTSOCKET sock,udt4_conn_data_ptr ucd)172 add_sock_map_data(udt4_transport_data_ptr udt, UDTSOCKET sock,
173 udt4_conn_data_ptr ucd)
174 {
175 if (udt->conn_count == 0) {
176 udt->map = (socket_data_map_entry *) malloc(sizeof(udt->map[0]));
177 } else {
178 udt->map =
179 (socket_data_map_entry *) realloc(udt->map,
180 (udt->conn_count +
181 1) * sizeof(udt->map[0]));
182 }
183 udt->map[udt->conn_count].udtsock = sock;
184 udt->map[udt->conn_count].ucd = ucd;
185 udt->conn_count++;
186 }
187
188 extern "C" void
libcmudt4_LTX_shutdown_conn(CMtrans_services svc,udt4_conn_data_ptr ucd)189 libcmudt4_LTX_shutdown_conn(CMtrans_services svc, udt4_conn_data_ptr ucd)
190 {
191 svc->connection_deref(ucd->conn);
192 svc->trace_out(ucd->utd->cm, "UDT4, closing socket %x\n",
193 ucd->udtsock);
194 UDT::epoll_remove_usock(ucd->utd->eid, ucd->udtsock);
195 UDT::close(ucd->udtsock);
196 free(ucd);
197 }
198
199
/* Return 1 when IP (host byte order) lies in 192.168.0.0/16, else 0. */
static int
is_private_192(int IP)
{
    const unsigned int top16 = (unsigned int) IP >> 16;

    return top16 == 0xC0A8u;
}
205
/*
 * Return 1 when IP (host byte order) lies in the RFC 1918 private block
 * 172.16.0.0/12 (172.16.0.0 - 172.31.255.255), else 0.
 *
 * The function name is historical: it used to test 182.16.x.x, which is
 * ordinary public address space.  The name is kept so existing callers
 * continue to compile.
 */
static int
is_private_182(int IP)
{
    return (((unsigned int) IP & 0xfff00000u) == 0xAC100000u);
}
211
/* Return 1 when IP (host byte order) lies in 10.0.0.0/8, else 0. */
static int
is_private_10(int IP)
{
    const unsigned int top8 = (unsigned int) IP >> 24;

    return top8 == 0x0Au;
}
217
218 extern "C" attr_list
219 libcmudt4_LTX_non_blocking_listen(CManager cm, CMtrans_services svc,
220 transport_entry trans,
221 attr_list listen_info);
222
223 static int
initiate_conn(CManager cm,CMtrans_services svc,transport_entry trans,attr_list attrs,udt4_conn_data_ptr ucd,attr_list conn_attr_list)224 initiate_conn(CManager cm, CMtrans_services svc, transport_entry trans,
225 attr_list attrs, udt4_conn_data_ptr ucd,
226 attr_list conn_attr_list)
227 {
228 int sock;
229
230 int int_port_num;
231 u_short port_num;
232 udt4_transport_data_ptr utd =
233 (udt4_transport_data_ptr) trans->trans_data;
234 char *host_name;
235 int remote_IP = -1;
236 static int host_ip = 0;
237 union
238 {
239 struct sockaddr s;
240 struct sockaddr_in s_I4;
241 struct sockaddr_in6 s_l6;
242 } sock_addr;
243
244 if (utd->cm) {
245 /* assert CM is locked */
246 assert(CM_LOCKED(svc, utd->cm));
247 }
248 if (!query_attr(attrs, CM_IP_HOSTNAME, /* type pointer */ NULL,
249 /* value pointer */ (attr_value *) (long) &host_name)) {
250 svc->trace_out(cm, "udt4 transport found no IP_HOST attribute");
251 host_name = NULL;
252 } else {
253 svc->trace_out(cm, "udt4 transport connect to host %s", host_name);
254 }
255 if (!query_attr(attrs, CM_IP_ADDR, /* type pointer */ NULL,
256 /* value pointer */ (attr_value *) (long) &host_ip)) {
257 svc->trace_out(cm, "udt4 transport found no IP_ADDR attribute");
258 /* wasn't there */
259 host_ip = 0;
260 } else {
261 svc->trace_out(cm, "udt4 transport connect to host_IP %lx",
262 host_ip);
263 }
264 if ((host_name == NULL) && (host_ip == 0))
265 return -1;
266
267 if (!query_attr(attrs, CM_IP_PORT, /* type pointer */ NULL,
268 /* value pointer */
269 (attr_value *) (long) &int_port_num)) {
270 svc->trace_out(cm, "udt4 transport found no IP_PORT attribute");
271 return -1;
272 } else {
273 svc->trace_out(cm, "udt4 transport connect to port %d",
274 int_port_num);
275 }
276 port_num = int_port_num;
277
278 /* we should already be listening, but if not, listen */
279 if (utd->listen_port == -1) {
280 attr_list l =
281 libcmudt4_LTX_non_blocking_listen(cm, svc, trans, NULL);
282 if (l)
283 free_attr_list(l);
284 }
285 /* INET socket connection, host_name is the machine name */
286 char ip_str[INET_ADDRSTRLEN];
287
288 if ((sock = UDT::socket(AF_INET, SOCK_STREAM, 0)) == SOCKET_ERROR) {
289 svc->trace_out(cm,
290 " UDT4 connect FAILURE --> Couldn't create socket");
291 return -1;
292 }
293 ((struct sockaddr_in *) &sock_addr)->sin_family = AF_INET;
294 if (host_name != NULL) {
295 if (check_host(host_name, (void *) &sock_addr.s_I4.sin_addr) == 0) {
296 if (host_ip == 0) {
297 svc->trace_out(cm,
298 "UDT4 connect FAILURE --> Host not found \"%s\", no IP addr supplied in contact list",
299 host_name);
300 } else {
301 svc->trace_out(cm,
302 "CMUDT4 --> Host not found \"%s\", Using supplied IP addr %x",
303 host_name == NULL ? "(unknown)" : host_name,
304 host_ip);
305 sock_addr.s_I4.sin_addr.s_addr = ntohl(host_ip);
306 }
307 }
308 } else {
309 sock_addr.s_I4.sin_addr.s_addr = ntohl(host_ip);
310 }
311 sock_addr.s_I4.sin_port = htons(port_num);
312 remote_IP = ntohl(sock_addr.s_I4.sin_addr.s_addr);
313 if (is_private_192(remote_IP)) {
314 svc->trace_out(cm,
315 "Target IP is on a private 192.168.x.x network");
316 }
317 if (is_private_182(remote_IP)) {
318 svc->trace_out(cm, "Target IP is on a private 182.16.x.x network");
319 }
320 if (is_private_10(remote_IP)) {
321 svc->trace_out(cm, "Target IP is on a private 10.x.x.x network");
322 }
323 inet_ntop(AF_INET, &sock_addr.s_I4.sin_addr, ip_str, INET_ADDRSTRLEN);
324 svc->trace_out(cm,
325 "Attempting udt4 socket connection, host=\"%s\", IP = %s, port %d",
326 host_name == 0 ? "(unknown)" : host_name, ip_str,
327 ntohs(sock_addr.s_I4.sin_port));
328 if (UDT::
329 connect(sock, (struct sockaddr *) &sock_addr,
330 sizeof(sock_addr.s_I4)) == SOCKET_ERROR) {
331 printf("Errno was %d\n", errno);
332 svc->trace_out(cm,
333 "UDT4 connect FAILURE --> Connect() to IP %s failed",
334 ip_str);
335 UDT::close(sock);
336 }
337 int local_listen_port = htonl(utd->listen_port);
338 if (UDT::send(sock, (char *) &local_listen_port, 4, 0) < 0) {
339 svc->trace_out(cm, "UDT4 send of listen port failed\n");
340 return -1;
341 }
342
343 svc->trace_out(cm, "--> Connection established");
344 ucd->remote_IP = remote_IP;
345 ucd->remote_contact_port = int_port_num;
346 ucd->udtsock = sock;
347 ucd->utd = utd;
348
349 add_attr(conn_attr_list, CM_FD, Attr_Int4, (attr_value) (long) sock);
350 add_attr(conn_attr_list, CM_THIS_CONN_PORT, Attr_Int4,
351 (attr_value) (long) int_port_num);
352 add_attr(conn_attr_list, CM_PEER_IP, Attr_Int4,
353 (attr_value) (long) ucd->remote_IP);
354 add_sock_map_data(utd, sock, ucd);
355 int localFD = UDT::usock_getfd(sock);
356 svc->trace_out(cm, "Adding UDT4 UDP socket %d to select list\n",
357 localFD);
358 svc->fd_add_select(cm, localFD, udt4_service_epoll, (void *) trans,
359 (void *) utd);
360 return sock;
361 }
362
/*
 * Initiate a UDT4 connection to the peer described by the attribute list
 * (IP_HOST and/or IP_ADDR, plus IP_PORT) and wrap it in a CMConnection.
 * Returns NULL if the connection cannot be established.
 */
369 extern "C" CMConnection
libcmudt4_LTX_initiate_conn(CManager cm,CMtrans_services svc,transport_entry trans,attr_list attrs)370 libcmudt4_LTX_initiate_conn(CManager cm, CMtrans_services svc,
371 transport_entry trans, attr_list attrs)
372 {
373 udt4_conn_data_ptr ucd = create_udt4_conn_data(svc);
374 attr_list conn_attr_list = create_attr_list();
375 CMConnection conn;
376 int sock;
377 udt4_transport_data_ptr utd =
378 (udt4_transport_data_ptr) trans->trans_data;
379
380 if (utd->cm) {
381 /* assert CM is locked */
382 assert(CM_LOCKED(svc, utd->cm));
383 }
384 if ((sock =
385 initiate_conn(cm, svc, trans, attrs, ucd, conn_attr_list)) < 0)
386 return NULL;
387
388 add_attr(conn_attr_list, CM_PEER_LISTEN_PORT, Attr_Int4,
389 (attr_value) (long) ucd->remote_contact_port);
390 conn = svc->connection_create(trans, ucd, conn_attr_list);
391 ucd->utd = utd;
392 ucd->conn = conn;
393
394 svc->trace_out(cm,
395 "Cmudt4 Adding trans->data_available as action on fd %d",
396 sock);
397 add_sock_map_data(utd, sock, ucd);
398 UDT::epoll_add_usock(utd->eid, sock, NULL);
399
400 free_attr_list(conn_attr_list);
401 /* dump_sockinfo("initiate ", sock); */
402 svc->connection_addref(conn); /* one ref count went to select
403 * (and CM), the other to the
404 * user */
405 return conn;
406 }
407
408 /*
409 * Check to see that if we were to attempt to initiate a connection as
410 * indicated by the attribute list, would we be connecting to ourselves?
411 * For udt4, this involves checking to see if the host name is the
412 * same as ours and if the IP_PORT matches the one we are listening on.
413 */
414 extern "C" int
libcmudt4_LTX_self_check(CManager cm,CMtrans_services svc,transport_entry trans,attr_list attrs)415 libcmudt4_LTX_self_check(CManager cm, CMtrans_services svc,
416 transport_entry trans, attr_list attrs)
417 {
418
419 udt4_transport_data_ptr utd =
420 (udt4_transport_data_ptr) trans->trans_data;
421 int host_addr;
422 int int_port_num;
423 char *host_name;
424 char my_host_name[256];
425 static int IP = 0;
426
427 get_IP_config(my_host_name, sizeof(host_name), &IP, NULL, NULL, NULL,
428 NULL, svc->trace_out, (void *) cm);
429
430 if (IP == 0) {
431 if (IP == 0)
432 IP = INADDR_LOOPBACK;
433 }
434 if (!query_attr(attrs, CM_IP_HOSTNAME, /* type pointer */ NULL,
435 /* value pointer */ (attr_value *) (long) &host_name)) {
436 svc->trace_out(cm,
437 "CMself check udt4 transport found no IP_HOST attribute");
438 host_name = NULL;
439 }
440 if (!query_attr(attrs, CM_IP_ADDR, /* type pointer */ NULL,
441 /* value pointer */ (attr_value *) (long) &host_addr)) {
442 svc->trace_out(cm,
443 "CMself check udt4 transport found no IP_ADDR attribute");
444 if (host_name == NULL)
445 return 0;
446 host_addr = 0;
447 }
448 if (!query_attr(attrs, CM_IP_PORT, /* type pointer */ NULL,
449 /* value pointer */
450 (attr_value *) (long) &int_port_num)) {
451 svc->trace_out(cm,
452 "CMself check udt4 transport found no IP_PORT attribute");
453 return 0;
454 }
455 if (host_name && (strcmp(host_name, my_host_name) != 0)) {
456 svc->trace_out(cm, "CMself check - Hostnames don't match");
457 return 0;
458 }
459 if (host_addr && (IP != host_addr)) {
460 svc->trace_out(cm,
461 "CMself check - Host IP addrs don't match, %lx, %lx",
462 IP, host_addr);
463 return 0;
464 }
465 if (int_port_num != utd->listen_port) {
466 svc->trace_out(cm, "CMself check - Ports don't match, %d, %d",
467 int_port_num, utd->listen_port);
468 return 0;
469 }
470 svc->trace_out(cm, "CMself check returning TRUE");
471 return 1;
472 }
473
474 extern "C" int
libcmudt4_LTX_connection_eq(CManager cm,CMtrans_services svc,transport_entry trans,attr_list attrs,udt4_conn_data_ptr ucd)475 libcmudt4_LTX_connection_eq(CManager cm, CMtrans_services svc,
476 transport_entry trans, attr_list attrs,
477 udt4_conn_data_ptr ucd)
478 {
479
480 int int_port_num;
481 int requested_IP = -1;
482 char *host_name = NULL;
483
484 if (!query_attr(attrs, CM_IP_HOSTNAME, /* type pointer */ NULL,
485 /* value pointer */ (attr_value *) (long) &host_name)) {
486 svc->trace_out(cm, "udt4 transport found no IP_HOST attribute");
487 }
488 if (!query_attr(attrs, CM_IP_PORT, /* type pointer */ NULL,
489 /* value pointer */
490 (attr_value *) (long) &int_port_num)) {
491 svc->trace_out(cm,
492 "Conn Eq udt4 transport found no IP_PORT attribute");
493 return 0;
494 }
495 if (!query_attr(attrs, CM_IP_ADDR, /* type pointer */ NULL,
496 /* value pointer */
497 (attr_value *) (long) &requested_IP)) {
498 svc->trace_out(cm, "udt4 transport found no IP_ADDR attribute");
499 }
500 if (requested_IP == -1) {
501 check_host(host_name, (void *) &requested_IP);
502 requested_IP = ntohl(requested_IP);
503 svc->trace_out(cm, "IP translation for hostname %s is %x",
504 host_name, requested_IP);
505 }
506
507 svc->trace_out(cm, "Socket Conn_eq comparing IP/ports %x/%d and %x/%d",
508 ucd->remote_IP, ucd->remote_contact_port,
509 requested_IP, int_port_num);
510 if ((ucd->remote_IP == requested_IP) &&
511 (ucd->remote_contact_port == int_port_num)) {
512 svc->trace_out(cm, "Socket Conn_eq returning TRUE");
513 return 1;
514 }
515 svc->trace_out(cm, "Socket Conn_eq returning FALSE");
516 return 0;
517 }
518
519 static attr_list
build_listen_attrs(CManager cm,CMtrans_services svc,udt4_transport_data_ptr sd,attr_list listen_info,int int_port_num)520 build_listen_attrs(CManager cm, CMtrans_services svc,
521 udt4_transport_data_ptr sd, attr_list listen_info,
522 int int_port_num)
523 {
524 char host_name[256];
525 attr_list ret_list;
526 int IP;
527 int use_hostname = 0;
528
529 svc->trace_out(cm, "CMUdt4 listen succeeded on port %d", int_port_num);
530 get_IP_config(host_name, sizeof(host_name), &IP, NULL, NULL,
531 &use_hostname, listen_info, svc->trace_out, (void *) cm);
532
533 ret_list = create_attr_list();
534
535 if (sd) {
536 sd->hostname = strdup(host_name);
537 sd->listen_port = int_port_num;
538 }
539 if ((IP != 0) && !use_hostname) {
540 add_attr(ret_list, CM_IP_ADDR, Attr_Int4, (attr_value) (long) IP);
541 }
542 if ((getenv("CMUseHostname") != NULL) || use_hostname) {
543 add_attr(ret_list, CM_IP_HOSTNAME, Attr_String,
544 (attr_value) strdup(host_name));
545 } else if (IP == 0) {
546 add_int_attr(ret_list, CM_IP_ADDR, INADDR_LOOPBACK);
547 }
548 add_attr(ret_list, CM_IP_PORT, Attr_Int4,
549 (attr_value) (long) int_port_num);
550
551 add_attr(ret_list, CM_TRANSPORT, Attr_String,
552 (attr_value) strdup("udt4"));
553 return ret_list;
554 }
555
556 /*
557 * Create an IP socket for connection from other CMs
558 */
559 extern "C" attr_list
libcmudt4_LTX_non_blocking_listen(CManager cm,CMtrans_services svc,transport_entry trans,attr_list listen_info)560 libcmudt4_LTX_non_blocking_listen(CManager cm, CMtrans_services svc,
561 transport_entry trans,
562 attr_list listen_info)
563 {
564 udt4_transport_data_ptr utd =
565 (udt4_transport_data_ptr) trans->trans_data;
566 int length;
567 struct sockaddr_in sock_addr;
568 int sock_opt_val = 1;
569 int conn_sock;
570 int attr_port_num = 0;
571 u_short port_num = 0;
572 int port_range_low, port_range_high;
573 int use_hostname = 0;
574 int int_port_num;
575 int IP;
576 char host_name[256];
577
578 if (utd->listen_port != -1) {
579 /* we're already listening */
580 if (port_num == 0) {
581 /* not requesting a specific port, return what we have */
582 return build_listen_attrs(cm, svc, NULL, listen_info,
583 utd->listen_port);
584 } else {
585 printf
586 ("CMlisten_specific() requesting a specific port follows other Udt4 operation which initiated listen at another port. Only one listen allowed, second listen fails.\n");
587 return NULL;
588 }
589 }
590
591 conn_sock = UDT::socket(AF_INET, SOCK_STREAM, 0);
592
593 if (conn_sock == SOCKET_ERROR) {
594 fprintf(stderr, "Cannot open INET socket\n");
595 return NULL;
596 }
597 if (utd->cm) {
598 /* assert CM is locked */
599 assert(CM_LOCKED(svc, utd->cm));
600 }
601 /*
602 * Check to see if a bind to a specific port was requested
603 */
604 if (listen_info != NULL
605 && !query_attr(listen_info, CM_IP_PORT,
606 NULL, (attr_value *) (long) &attr_port_num)) {
607 port_num = 0;
608 } else {
609 if (attr_port_num > USHRT_MAX || attr_port_num < 0) {
610 fprintf(stderr, "Requested port number %d is invalid\n",
611 attr_port_num);
612 return NULL;
613 }
614 port_num = attr_port_num;
615 }
616
617 svc->trace_out(cm, "UDT4 begin listen, requested port %d",
618 attr_port_num);
619 get_IP_config(host_name, sizeof(host_name), &IP, &port_range_low,
620 &port_range_high, &use_hostname, listen_info,
621 svc->trace_out, (void *) cm);
622
623 sock_addr.sin_family = AF_INET;
624 sock_addr.sin_addr.s_addr = INADDR_ANY;
625 sock_addr.sin_port = htons(port_num);
626
627 if (sock_addr.sin_port != 0) {
628 /* specific port requested. set REUSEADDR, REUSEPORT because
629 * previous server might have died badly */
630 if (UDT::
631 setsockopt(conn_sock, SOL_SOCKET, UDT_REUSEADDR,
632 (char *) &sock_opt_val,
633 sizeof(sock_opt_val)) != 0) {
634 fprintf(stderr,
635 "Failed to set REUSEADDR on INET socket before bind\n");
636 perror("setsockopt(SO_REUSEADDR) failed");
637 return NULL;
638 }
639 svc->trace_out(cm, "UDT4 trying to bind selected port %d",
640 port_num);
641 if (UDT::
642 bind(conn_sock, (struct sockaddr *) &sock_addr,
643 sizeof sock_addr) == SOCKET_ERROR) {
644 fprintf(stderr, "Cannot bind INET socket\n");
645 return NULL;
646 }
647 /* begin listening for conns */
648 if (UDT::listen(conn_sock, FD_SETSIZE)) {
649 fprintf(stderr, "listen failed %s\n",
650 UDT::getlasterror().getErrorMessage());
651 return NULL;
652 }
653 } else if (port_range_high == -1) {
654 svc->trace_out(cm, "UDT4 trying to bind to any available port");
655 sock_addr.sin_port = 0;
656 if (UDT::
657 bind(conn_sock, (struct sockaddr *) &sock_addr,
658 sizeof sock_addr) == SOCKET_ERROR) {
659 fprintf(stderr, "Cannot bind INET socket\n");
660 return NULL;
661 }
662 /* begin listening for conns */
663 if (UDT::listen(conn_sock, FD_SETSIZE)) {
664 fprintf(stderr, "listen failed %s\n",
665 UDT::getlasterror().getErrorMessage());
666 return NULL;
667 }
668
669 } else {
670 long seedval = time(NULL) + getpid();
671 /* port num is free. Constrain to range to standards */
672 int size = port_range_high - port_range_low;
673 int tries = 30;
674 int result = SOCKET_ERROR;
675 srand48(seedval);
676 while (tries > 0) {
677 int target = port_range_low + size * drand48();
678 sock_addr.sin_port = htons(target);
679 int_port_num = target;
680 svc->trace_out(cm, "UDT4 trying to bind port %d", target);
681 result = UDT::bind(conn_sock, (struct sockaddr *) &sock_addr,
682 sizeof sock_addr);
683 tries--;
684 /* begin listening for conns */
685 result = UDT::listen(conn_sock, FD_SETSIZE);
686 if (result != UDT::ERROR) {
687 tries = 0;
688 } else {
689 /*
690 * For some reason, UDT4 gives conflicting port errors
691 * on listen rather than bind. You can't bind a socket
692 * more than once, so on error we have to close the old
693 * and start with a new UDT::socket.
694 */
695 UDT::close(conn_sock);
696 conn_sock = UDT::socket(AF_INET, SOCK_STREAM, 0);
697 }
698
699 if (tries % 5 == 4) {
700 /* try reseeding in case we're in sync with another
701 * process */
702 srand48(time(NULL) + getpid());
703 }
704 if (tries == 20) {
705 /* damn, tried a lot, increase the range (This might
706 * violate specified range) */
707 size *= 10;
708 }
709 if (tries == 10) {
710 /* damn, tried a lot more, increase the range (This might
711 * violate specified range) */
712 size *= 10;
713 }
714 }
715 if (result == SOCKET_ERROR) {
716 fprintf(stderr, "Cannot bind INET socket\n");
717 return NULL;
718 }
719 }
720 length = sizeof sock_addr;
721 if (UDT::
722 getsockname(conn_sock, (struct sockaddr *) &sock_addr,
723 &length) < 0) {
724 fprintf(stderr, "Cannot get socket name\n");
725 return NULL;
726 }
727 /* set the port num as one we can be contacted at */
728
729 UDT::epoll_add_usock(utd->eid, conn_sock, NULL);
730 int localFD = UDT::usock_getfd(conn_sock);
731 svc->trace_out(cm, "Adding UDT4 UDP socket %d to select list\n",
732 localFD);
733 svc->fd_add_select(cm, localFD, udt4_service_epoll, (void *) trans,
734 (void *) utd);
735 utd->listen_sock = conn_sock;
736
737 {
738 attr_list ret_list;
739
740 svc->trace_out(cm, "UDT4 listen succeeded on port %d, fd %d",
741 int_port_num, conn_sock);
742 ret_list = create_attr_list();
743
744 utd->hostname = strdup(host_name);
745 utd->listen_port = int_port_num;
746 add_attr(ret_list, CM_TRANSPORT, Attr_String,
747 (attr_value) strdup("udt4"));
748 if ((IP != 0) && (!use_hostname)) {
749 add_attr(ret_list, CM_IP_ADDR, Attr_Int4,
750 (attr_value) (long) IP);
751 }
752 if ((getenv("Cmudt4UseHostname") != NULL) || use_hostname) {
753 add_attr(ret_list, CM_IP_HOSTNAME, Attr_String,
754 (attr_value) strdup(host_name));
755 } else if (IP == 0) {
756 add_attr(ret_list, CM_IP_ADDR, Attr_Int4,
757 (attr_value) INADDR_LOOPBACK);
758 }
759 add_attr(ret_list, CM_IP_PORT, Attr_Int4,
760 (attr_value) (long) int_port_num);
761
762 return ret_list;
763 }
764 }
765
766 #ifdef NEED_IOVEC_DEFINE
767 struct iovec
768 {
769 void *iov_base;
770 int iov_len;
771 };
772
773 #endif
774
775
776 #ifndef IOV_MAX
777 /* this is not defined in some places where it should be. Conservative. */
778 #define IOV_MAX 16
779 #endif
780
781
782 extern "C" int
libcmudt4_LTX_writev_func(CMtrans_services svc,udt4_conn_data_ptr ucd,void * iovs,int iovcnt,attr_list attrs)783 libcmudt4_LTX_writev_func(CMtrans_services svc, udt4_conn_data_ptr ucd,
784 void *iovs, int iovcnt, attr_list attrs)
785 {
786 int left = 0;
787 struct iovec *iov = (struct iovec *) iovs;
788 /* sum lengths */
789 for (int i = 0; i < iovcnt; i++)
790 left += iov[i].iov_len;
791
792 svc->trace_out(ucd->utd->cm, "UDT4 writev of %d bytes on socket %d",
793 left, ucd->udtsock);
794 /*
795 * stupid implementation of writev first
796 *
797 * we'll copy all the data into a single block, then send it with one send()
798 */
799 char *buffer = (char *) malloc(left), *tmp;
800 tmp = buffer;
801 for (int i = 0; i < iovcnt; i++) {
802 memcpy(tmp, iov[i].iov_base, iov[i].iov_len);
803 tmp += iov[i].iov_len;
804 }
805 tmp = buffer;
806 while (left > 0) {
807 int sent = UDT::send(ucd->udtsock, tmp, left, 0);
808
809 if (sent < 0) {
810 svc->trace_out(ucd->utd->cm,
811 "UDT4 send failed, error was %s\n",
812 UDT::getlasterror().getErrorMessage());
813 return 0;
814 }
815 left -= sent;
816 tmp += sent;
817 }
818 free(buffer);
819 return iovcnt;
820 }
821
822 static void
free_udt4_data(CManager cm,void * utdv)823 free_udt4_data(CManager cm, void *utdv)
824 {
825 udt4_transport_data_ptr utd = (udt4_transport_data_ptr) utdv;
826 CMtrans_services svc = utd->svc;
827 if (utd->hostname != NULL)
828 svc->free_func(utd->hostname);
829 svc->free_func(utd);
830 }
831
832 extern "C" int
libcmudt4_LTX_read_to_buffer_func(CMtrans_services svc,udt4_conn_data_ptr ucd,void * buffer,int requested_len,int non_blocking)833 libcmudt4_LTX_read_to_buffer_func(CMtrans_services svc,
834 udt4_conn_data_ptr ucd, void *buffer,
835 int requested_len, int non_blocking)
836 {
837 int left, iget;
838 int rcv_size, var_size = sizeof(rcv_size);
839 svc->trace_out(ucd->utd->cm,
840 "CMUDT4 read of %d bytes on fd %d, non_block %d",
841 requested_len, ucd->udtsock, non_blocking);
842 UDT::getsockopt(ucd->udtsock, 0, UDT_RCVDATA, &rcv_size, &var_size);
843 if (non_blocking && (rcv_size == 0)) {
844 svc->trace_out(ucd->utd->cm,
845 "CMUDT4 socket state is %d, rcv_size = %d, non_blocking read would block, returning 0\n",
846 UDT::getsockstate(ucd->udtsock), rcv_size);
847 return 0;
848 }
849
850 if (UDT::ERROR ==
851 (iget =
852 UDT::recv(ucd->udtsock, (char *) buffer, requested_len, 0))) {
853 svc->trace_out(ucd->utd->cm, "CMUDT4 recv failed, error was %s\n",
854 UDT::getlasterror().getErrorMessage());;
855 }
856 if ((iget == -1) || (iget == 0)) {
857 int lerrno = errno;
858 if ((lerrno != EWOULDBLOCK) &&
859 (lerrno != EAGAIN) && (lerrno != EINTR)) {
860 /* serious error */
861 svc->trace_out(ucd->utd->cm,
862 "CMUDT4 iget was -1, errno is %d, returning 0 for read",
863 lerrno);
864 return -1;
865 } else {
866 if (non_blocking) {
867 svc->trace_out(ucd->utd->cm,
868 "CMUDT4 iget was -1, would block, errno is %d",
869 lerrno);
870 return 0;
871 }
872 return -1;
873 }
874 }
875 left = requested_len - iget;
876 while (left > 0) {
877 int lerrno;
878 if (UDT::ERROR ==
879 (iget =
880 UDT::recv(ucd->udtsock,
881 ((char *) buffer) + requested_len - left, left,
882 0))) {
883 svc->trace_out(ucd->utd->cm, "recv failed, message was %s\n",
884 UDT::getlasterror().getErrorMessage());
885 }
886 lerrno = errno;
887 if (iget == -1) {
888 if ((lerrno != EWOULDBLOCK) &&
889 (lerrno != EAGAIN) && (lerrno != EINTR)) {
890 /* serious error */
891 svc->trace_out(ucd->utd->cm,
892 "Cmudt4 iget was -1, errno is %d, returning %d for read",
893 lerrno, requested_len - left);
894 return (requested_len - left);
895 } else {
896 iget = 0;
897 }
898 } else if (iget == 0) {
899 svc->trace_out(ucd->utd->cm,
900 "Cmudt4 iget was 0, errno is %d, returning %d for read",
901 lerrno, requested_len - left);
902 return requested_len - left; /* end of file */
903 }
904 left -= iget;
905 }
906 return requested_len;
907 }
908
909 static void
udt4_service_epoll(void * void_trans,void * not_used)910 udt4_service_epoll(void *void_trans, void *not_used)
911 {
912 transport_entry trans = (transport_entry) void_trans;
913 udt4_transport_data_ptr utd =
914 (udt4_transport_data_ptr) trans->trans_data;
915 CManager cm = utd->cm;
916 CMtrans_services svc = utd->svc;
917 std::set < UDTSOCKET > readfds;
918 if (!(CM_LOCKED(svc, utd->cm))) {
919 printf("UTD4 service network, CManager not locked\n");
920 }
921
922 int ret = UDT::epoll_wait(utd->eid, &readfds, NULL, 50);
923 svc->trace_out(utd->cm, "Epoll_wait returned %d\n", ret);
924 for (std::set < UDTSOCKET >::iterator i = readfds.begin();
925 i != readfds.end(); ++i) {
926 if (*i == utd->listen_sock) {
927 sockaddr_storage clientaddr;
928 attr_list conn_attr_list;
929 struct sockaddr_in remote;
930 int remote_len = sizeof(remote);
931 int addrlen = sizeof(clientaddr);
932 UDTSOCKET new_sock =
933 UDT::accept(*i, (sockaddr *) & clientaddr, &addrlen);
934 udt4_conn_data_ptr ucd = create_udt4_conn_data(svc);
935 ucd->udtsock = new_sock;
936 ucd->utd = utd;
937 conn_attr_list = create_attr_list();
938 CMConnection conn =
939 svc->connection_create(trans, ucd, conn_attr_list);
940 ucd->conn = conn;
941 UDT::getpeername(ucd->udtsock, (struct sockaddr *) &remote,
942 &remote_len);
943 add_int_attr(conn_attr_list, CM_PEER_IP,
944 ntohl(remote.sin_addr.s_addr));
945 ucd->remote_IP = ntohl(remote.sin_addr.s_addr); /* remote_IP
946 * is in
947 * host
948 * byte
949 * order */
950 if (UDT::
951 recv(ucd->udtsock, (char *) &ucd->remote_contact_port, 4,
952 0) < 0) {
953 svc->trace_out(utd->cm,
954 "Remote host dropped connection without data");
955 return;
956 } else {
957 ucd->remote_contact_port = ntohl(ucd->remote_contact_port);
958 }
959 add_attr(conn_attr_list, CM_PEER_LISTEN_PORT, Attr_Int4,
960 (attr_value) (long) ucd->remote_contact_port);
961 struct in_addr addr;
962 addr.s_addr = htonl(ucd->remote_IP);
963 svc->trace_out(trans->cm,
964 "Remote host (IP %s) is listening at port %d\n",
965 inet_ntoa(addr), ucd->remote_contact_port);
966 free_attr_list(conn_attr_list);
967 add_sock_map_data(utd, new_sock, ucd);
968 UDT::epoll_add_usock(utd->eid, new_sock);
969 } else {
970 // int rcv_size = 1024000;
971 // int var_size = sizeof(int), rs;
972 udt4_conn_data_ptr ucd = NULL;
973 for (int j = 0; j < utd->conn_count; j++) {
974 if (utd->map[j].udtsock == *i) {
975 ucd = utd->map[j].ucd;
976 }
977 }
978 if (ucd == NULL) {
979 printf
980 ("Internal consistency error, conn data for sock %x not found\n",
981 *i);
982 }
983 svc->trace_out(cm, "Calling data available on connection %p\n",
984 ucd->conn);
985 (trans->data_available) (trans, ucd->conn);
986 }
987 }
988 }
989
990 extern "C" void *
libcmudt4_LTX_initialize(CManager cm,CMtrans_services svc,transport_entry trans)991 libcmudt4_LTX_initialize(CManager cm, CMtrans_services svc,
992 transport_entry trans)
993 {
994 static int atom_init = 0;
995
996 (void) trans;
997 udt4_transport_data_ptr udt4_data;
998 svc->trace_out(cm, "Initialize UDP4 transport built in %s",
999 EVPATH_MODULE_BUILD_DIR);
1000
1001 if (atom_init == 0) {
1002 CM_IP_HOSTNAME = attr_atom_from_string("IP_HOST");
1003 CM_IP_PORT = attr_atom_from_string("IP_PORT");
1004 CM_IP_ADDR = attr_atom_from_string("IP_ADDR");
1005 CM_FD = attr_atom_from_string("CONNECTION_FILE_DESCRIPTOR");
1006 CM_THIS_CONN_PORT = attr_atom_from_string("THIS_CONN_PORT");
1007 CM_PEER_CONN_PORT = attr_atom_from_string("PEER_CONN_PORT");
1008 CM_TRANSPORT = attr_atom_from_string("CM_TRANSPORT");
1009 CM_PEER_IP = attr_atom_from_string("PEER_IP");
1010 CM_PEER_HOSTNAME = attr_atom_from_string("PEER_HOSTNAME");
1011 CM_PEER_LISTEN_PORT = attr_atom_from_string("PEER_LISTEN_PORT");
1012 CM_TRANSPORT_RELIABLE =
1013 attr_atom_from_string("CM_TRANSPORT_RELIABLE");
1014 atom_init++;
1015 }
1016 UDT::startup();
1017 udt4_data =
1018 (udt4_transport_data_ptr) svc->
1019 malloc_func(sizeof(struct udt4_transport_data));
1020 memset(udt4_data, 0, sizeof(struct udt4_transport_data));
1021 udt4_data->cm = cm;
1022 udt4_data->hostname = NULL;
1023 udt4_data->listen_port = -1;
1024 udt4_data->svc = svc;
1025 udt4_data->characteristics = create_attr_list();
1026 add_int_attr(udt4_data->characteristics, CM_TRANSPORT_RELIABLE, 1);
1027 svc->add_shutdown_task(cm, free_udt4_data, (void *) udt4_data,
1028 FREE_TASK);
1029 udt4_data->eid = UDT::epoll_create();
1030 int localFD = UDT::epoll_getfd(udt4_data->eid);
1031 svc->trace_out(cm, "Adding FD %d to select list", localFD);
1032
1033 svc->fd_add_select(cm, localFD, udt4_service_epoll,
1034 (void *) trans, (void *) udt4_data);
1035 // svc->add_periodic_task(cm, 1, 0, (CMPollFunc) udt4_poll_epoll,
1036 // (void *)trans);
1037 return (void *) udt4_data;
1038 }
1039
1040 extern "C" attr_list
libcmudt4_LTX_get_transport_characteristics(transport_entry trans,CMtrans_services svc,void * vutd)1041 libcmudt4_LTX_get_transport_characteristics(transport_entry trans,
1042 CMtrans_services svc,
1043 void *vutd)
1044 {
1045 struct udt4_transport_data *utd = (struct udt4_transport_data *) vutd;
1046 return utd->characteristics;
1047 }
1048
1049 extern "C" void *
libcmudt4_LTX_read_block_func(CMtrans_services svc,udt4_conn_data_ptr conn_data,int * actual_len,int * offset_ptr)1050 libcmudt4_LTX_read_block_func(CMtrans_services svc,
1051 udt4_conn_data_ptr conn_data,
1052 int *actual_len, int *offset_ptr)
1053 {
1054 CMbuffer cb;
1055
1056 if (conn_data->read_buffer_len == -1)
1057 return NULL;
1058
1059 *actual_len = conn_data->read_buffer_len;
1060 *offset_ptr = 0;
1061 cb = conn_data->read_buffer;
1062 conn_data->read_buffer_len = 0;
1063 conn_data->read_buffer = NULL;
1064 return cb;
1065 }
1066
1067 extern "C" transport_entry
cmudt4_add_static_transport(CManager cm,CMtrans_services svc)1068 cmudt4_add_static_transport(CManager cm, CMtrans_services svc)
1069 {
1070 transport_entry transport;
1071 transport =
1072 (transport_entry) svc->malloc_func(sizeof(struct _transport_item));
1073 memset(transport, 0, sizeof(*transport));
1074 transport->trans_name = strdup("udt4");
1075 transport->cm = cm;
1076 transport->transport_init =
1077 (CMTransport_func) libcmudt4_LTX_initialize;
1078 transport->listen =
1079 (CMTransport_listen_func) libcmudt4_LTX_non_blocking_listen;
1080 transport->initiate_conn =
1081 (CMTransport_conn_func) libcmudt4_LTX_initiate_conn;
1082 transport->self_check =
1083 (CMTransport_self_check_func) libcmudt4_LTX_self_check;
1084 transport->connection_eq =
1085 (CMTransport_connection_eq_func) libcmudt4_LTX_connection_eq;
1086 transport->shutdown_conn =
1087 (CMTransport_shutdown_conn_func) libcmudt4_LTX_shutdown_conn;
1088 transport->read_block_func = (CMTransport_read_block_func) NULL;
1089 transport->read_to_buffer_func =
1090 (CMTransport_read_to_buffer_func)
1091 libcmudt4_LTX_read_to_buffer_func;
1092 transport->writev_func =
1093 (CMTransport_writev_func) libcmudt4_LTX_writev_func;
1094 transport->NBwritev_func = (CMTransport_writev_func) NULL;
1095
1096 transport->set_write_notify = (CMTransport_set_write_notify_func) NULL;
1097 transport->get_transport_characteristics =
1098 (CMTransport_get_transport_characteristics)
1099 libcmudt4_LTX_get_transport_characteristics;
1100 if (transport->transport_init) {
1101 transport->trans_data =
1102 transport->transport_init(cm, svc, transport);
1103 }
1104 return transport;
1105 }
1106