1 /***** Includes *****/
2 #include "config.h"
3 #include <sys/types.h>
4 
5 #include <time.h>
6 #ifdef HAVE_SYS_TIME_H
7 #include <sys/time.h>
8 #endif
9 #ifdef HAVE_SYS_TIMES_H
10 #include <sys/times.h>
11 #endif
12 #include <sys/socket.h>
13 #ifdef HAVE_SYS_SOCKIO_H
14 #include <sys/sockio.h>
15 #endif
16 #ifdef HAVE_SYS_SELECT_H
17 #include <sys/select.h>
18 #endif
19 #ifdef HAVE_SYS_UN_H
20 #include <sys/un.h>
21 #endif
22 #ifdef HAVE_SYS_UIO_H
23 #include <sys/uio.h>
24 #endif
25 #ifdef HAVE_HOSTLIB_H
26 #include "hostLib.h"
27 #endif
28 #ifdef HAVE_STREAMS_UN_H
29 #include <streams/un.h>
30 #endif
31 #include <netinet/in.h>
32 #include <arpa/inet.h>
33 #ifdef HAVE_NETDB_H
34 #include <netdb.h>
35 #endif
36 
37 #include <stdio.h>
38 #include <fcntl.h>
39 #ifdef HAVE_UNISTD_H
40 #include <unistd.h>
41 #endif
42 #undef NDEBUG
43 #include <assert.h>
44 #include <signal.h>
45 #include <stdlib.h>
46 #include <string.h>
47 #include <limits.h>
48 #ifdef HAVE_MEMORY_H
49 #include <memory.h>
50 #endif
51 #include <algorithm>
52 #include <iostream>
53 
54 #include <atl.h>
55 #include "evpath.h"
56 #include "cm_transport.h"
57 #include "udt.h"
58 
59 #ifndef SOCKET_ERROR
60 #define SOCKET_ERROR -1
61 #endif
62 
63 #if defined (__INTEL_COMPILER)
64 #pragma warning (disable: 869)
65 #pragma warning (disable: 310)
66 #pragma warning (disable: 1418)
67 #pragma warning (disable: 180)
68 #pragma warning (disable: 2259)
69 #pragma warning (disable: 177)
70 #endif
71 
72 typedef struct func_list_item
73 {
74     select_list_func func;
75     void *arg1;
76     void *arg2;
77 } FunctionListElement;
78 
79 typedef struct socket_data_map_entry *socket_data_map_ptr;
80 
81 typedef struct udt4_transport_data
82 {
83     CManager cm;
84     char *hostname;
85     int listen_port;
86     attr_list characteristics;
87     CMtrans_services svc;
88     int eid;
89     int listen_sock;
90     int conn_count;
91     socket_data_map_ptr map;
92 } *udt4_transport_data_ptr;
93 
94 typedef struct udt4_connection_data
95 {
96     int remote_IP;
97     int remote_contact_port;
98     UDTSOCKET udtsock;
99     udt4_transport_data_ptr utd;
100     CMConnection conn;
101     CMbuffer read_buffer;
102     int read_buffer_len;
103 } *udt4_conn_data_ptr;
104 
105 struct socket_data_map_entry
106 {
107     UDTSOCKET udtsock;
108     udt4_conn_data_ptr ucd;
109 };
110 
111 #ifdef WSAEWOULDBLOCK
112 #define EWOULDBLOCK WSAEWOULDBLOCK
113 #define EAGAIN WSAEINPROGRESS
114 #define EINTR WSAEINTR
115 #define errno GetLastError()
116 #define read(fd, buf, len) recv(fd, buf, len, 0)
117 #define write(fd, buf, len) send(fd, buf, len, 0)
118 #endif
119 
120 static atom_t CM_FD = -1;
121 static atom_t CM_THIS_CONN_PORT = -1;
122 static atom_t CM_PEER_CONN_PORT = -1;
123 static atom_t CM_PEER_IP = -1;
124 static atom_t CM_PEER_HOSTNAME = -1;
125 static atom_t CM_PEER_LISTEN_PORT = -1;
126 static atom_t CM_TRANSPORT = -1;
127 static atom_t CM_TRANSPORT_RELIABLE = -1;
128 static atom_t CM_IP_PORT = -1;
129 static atom_t CM_IP_HOSTNAME = -1;
130 static atom_t CM_IP_ADDR = -1;
131 
132 #define TIMING_GUARD_START {     struct timeval t0,t1,diff; gettimeofday(&t0, NULL);
133 #define TIMING_GUARD_STOP gettimeofday(&t1, NULL);    timersub(&t1, &t0, &diff); if (diff.tv_sec > 0) fprintf(stderr, "TIME GUARD at %s:%d exceeded, time was was <%ld.%06ld> secs\n", __FILE__, __LINE__, (long)diff.tv_sec, (long)diff.tv_usec);}
134 
135 static int
check_host(char * hostname,void * sin_addr)136 check_host(char *hostname, void *sin_addr)
137 {
138     struct hostent *host_addr;
139     host_addr = gethostbyname(hostname);
140     if (host_addr == NULL) {
141 	struct in_addr addr;
142 	if (inet_aton(hostname, &addr) == 0) {
143 	    /*
144 	     *  not translatable as a hostname or
145 	     * as a dot-style string IP address
146 	     */
147 	    return 0;
148 	}
149 	assert(sizeof(int) == sizeof(struct in_addr));
150 	*((int *) sin_addr) = *((int *) &addr);
151     } else {
152 	memcpy(sin_addr, host_addr->h_addr, host_addr->h_length);
153     }
154     return 1;
155 }
156 
157 static udt4_conn_data_ptr
create_udt4_conn_data(CMtrans_services svc)158 create_udt4_conn_data(CMtrans_services svc)
159 {
160     udt4_conn_data_ptr ucd =
161 	(udt4_conn_data_ptr) svc->
162 	malloc_func(sizeof(struct udt4_connection_data));
163     memset(ucd, 0, sizeof(struct udt4_connection_data));
164     ucd->remote_contact_port = -1;
165     ucd->udtsock = -1;
166     return ucd;
167 }
168 
169 static void udt4_service_epoll(void *void_trans, void *void_conn_sock);
170 
171 static void
add_sock_map_data(udt4_transport_data_ptr udt,UDTSOCKET sock,udt4_conn_data_ptr ucd)172 add_sock_map_data(udt4_transport_data_ptr udt, UDTSOCKET sock,
173 		  udt4_conn_data_ptr ucd)
174 {
175     if (udt->conn_count == 0) {
176 	udt->map = (socket_data_map_entry *) malloc(sizeof(udt->map[0]));
177     } else {
178 	udt->map =
179 	    (socket_data_map_entry *) realloc(udt->map,
180 					      (udt->conn_count +
181 					       1) * sizeof(udt->map[0]));
182     }
183     udt->map[udt->conn_count].udtsock = sock;
184     udt->map[udt->conn_count].ucd = ucd;
185     udt->conn_count++;
186 }
187 
188 extern "C" void
libcmudt4_LTX_shutdown_conn(CMtrans_services svc,udt4_conn_data_ptr ucd)189 libcmudt4_LTX_shutdown_conn(CMtrans_services svc, udt4_conn_data_ptr ucd)
190 {
191     svc->connection_deref(ucd->conn);
192     svc->trace_out(ucd->utd->cm, "UDT4, closing socket %x\n",
193 		   ucd->udtsock);
194     UDT::epoll_remove_usock(ucd->utd->eid, ucd->udtsock);
195     UDT::close(ucd->udtsock);
196     free(ucd);
197 }
198 
199 
200 static int
is_private_192(int IP)201 is_private_192(int IP)
202 {
203     return ((IP & 0xffff0000) == 0xC0A80000);	/* equal 192.168.x.x */
204 }
205 
206 static int
is_private_182(int IP)207 is_private_182(int IP)
208 {
209     return ((IP & 0xffff0000) == 0xB6100000);	/* equal 182.16.x.x */
210 }
211 
212 static int
is_private_10(int IP)213 is_private_10(int IP)
214 {
215     return ((IP & 0xff000000) == 0x0A000000);	/* equal 10.x.x.x */
216 }
217 
218 extern "C" attr_list
219 libcmudt4_LTX_non_blocking_listen(CManager cm, CMtrans_services svc,
220 				  transport_entry trans,
221 				  attr_list listen_info);
222 
223 static int
initiate_conn(CManager cm,CMtrans_services svc,transport_entry trans,attr_list attrs,udt4_conn_data_ptr ucd,attr_list conn_attr_list)224 initiate_conn(CManager cm, CMtrans_services svc, transport_entry trans,
225 	      attr_list attrs, udt4_conn_data_ptr ucd,
226 	      attr_list conn_attr_list)
227 {
228     int sock;
229 
230     int int_port_num;
231     u_short port_num;
232     udt4_transport_data_ptr utd =
233 	(udt4_transport_data_ptr) trans->trans_data;
234     char *host_name;
235     int remote_IP = -1;
236     static int host_ip = 0;
237     union
238     {
239 	struct sockaddr s;
240 	struct sockaddr_in s_I4;
241 	struct sockaddr_in6 s_l6;
242     } sock_addr;
243 
244     if (utd->cm) {
245 	/* assert CM is locked */
246 	assert(CM_LOCKED(svc, utd->cm));
247     }
248     if (!query_attr(attrs, CM_IP_HOSTNAME, /* type pointer */ NULL,
249 		    /* value pointer */ (attr_value *) (long) &host_name)) {
250 	svc->trace_out(cm, "udt4 transport found no IP_HOST attribute");
251 	host_name = NULL;
252     } else {
253 	svc->trace_out(cm, "udt4 transport connect to host %s", host_name);
254     }
255     if (!query_attr(attrs, CM_IP_ADDR, /* type pointer */ NULL,
256 		    /* value pointer */ (attr_value *) (long) &host_ip)) {
257 	svc->trace_out(cm, "udt4 transport found no IP_ADDR attribute");
258 	/* wasn't there */
259 	host_ip = 0;
260     } else {
261 	svc->trace_out(cm, "udt4 transport connect to host_IP %lx",
262 		       host_ip);
263     }
264     if ((host_name == NULL) && (host_ip == 0))
265 	return -1;
266 
267     if (!query_attr(attrs, CM_IP_PORT, /* type pointer */ NULL,
268 		    /* value pointer */
269 		    (attr_value *) (long) &int_port_num)) {
270 	svc->trace_out(cm, "udt4 transport found no IP_PORT attribute");
271 	return -1;
272     } else {
273 	svc->trace_out(cm, "udt4 transport connect to port %d",
274 		       int_port_num);
275     }
276     port_num = int_port_num;
277 
278     /* we should already be listening, but if not, listen */
279     if (utd->listen_port == -1) {
280 	attr_list l =
281 	    libcmudt4_LTX_non_blocking_listen(cm, svc, trans, NULL);
282 	if (l)
283 	    free_attr_list(l);
284     }
285     /* INET socket connection, host_name is the machine name */
286     char ip_str[INET_ADDRSTRLEN];
287 
288     if ((sock = UDT::socket(AF_INET, SOCK_STREAM, 0)) == SOCKET_ERROR) {
289 	svc->trace_out(cm,
290 		       " UDT4 connect FAILURE --> Couldn't create socket");
291 	return -1;
292     }
293     ((struct sockaddr_in *) &sock_addr)->sin_family = AF_INET;
294     if (host_name != NULL) {
295 	if (check_host(host_name, (void *) &sock_addr.s_I4.sin_addr) == 0) {
296 	    if (host_ip == 0) {
297 		svc->trace_out(cm,
298 			       "UDT4 connect FAILURE --> Host not found \"%s\", no IP addr supplied in contact list",
299 			       host_name);
300 	    } else {
301 		svc->trace_out(cm,
302 			       "CMUDT4 --> Host not found \"%s\", Using supplied IP addr %x",
303 			       host_name == NULL ? "(unknown)" : host_name,
304 			       host_ip);
305 		sock_addr.s_I4.sin_addr.s_addr = ntohl(host_ip);
306 	    }
307 	}
308     } else {
309 	sock_addr.s_I4.sin_addr.s_addr = ntohl(host_ip);
310     }
311     sock_addr.s_I4.sin_port = htons(port_num);
312     remote_IP = ntohl(sock_addr.s_I4.sin_addr.s_addr);
313     if (is_private_192(remote_IP)) {
314 	svc->trace_out(cm,
315 		       "Target IP is on a private 192.168.x.x network");
316     }
317     if (is_private_182(remote_IP)) {
318 	svc->trace_out(cm, "Target IP is on a private 182.16.x.x network");
319     }
320     if (is_private_10(remote_IP)) {
321 	svc->trace_out(cm, "Target IP is on a private 10.x.x.x network");
322     }
323     inet_ntop(AF_INET, &sock_addr.s_I4.sin_addr, ip_str, INET_ADDRSTRLEN);
324     svc->trace_out(cm,
325 		   "Attempting udt4 socket connection, host=\"%s\", IP = %s, port %d",
326 		   host_name == 0 ? "(unknown)" : host_name, ip_str,
327 		   ntohs(sock_addr.s_I4.sin_port));
328     if (UDT::
329 	connect(sock, (struct sockaddr *) &sock_addr,
330 		sizeof(sock_addr.s_I4)) == SOCKET_ERROR) {
331 	printf("Errno was %d\n", errno);
332 	svc->trace_out(cm,
333 		       "UDT4 connect FAILURE --> Connect() to IP %s failed",
334 		       ip_str);
335 	UDT::close(sock);
336     }
337     int local_listen_port = htonl(utd->listen_port);
338     if (UDT::send(sock, (char *) &local_listen_port, 4, 0) < 0) {
339 	svc->trace_out(cm, "UDT4 send of listen port failed\n");
340 	return -1;
341     }
342 
343     svc->trace_out(cm, "--> Connection established");
344     ucd->remote_IP = remote_IP;
345     ucd->remote_contact_port = int_port_num;
346     ucd->udtsock = sock;
347     ucd->utd = utd;
348 
349     add_attr(conn_attr_list, CM_FD, Attr_Int4, (attr_value) (long) sock);
350     add_attr(conn_attr_list, CM_THIS_CONN_PORT, Attr_Int4,
351 	     (attr_value) (long) int_port_num);
352     add_attr(conn_attr_list, CM_PEER_IP, Attr_Int4,
353 	     (attr_value) (long) ucd->remote_IP);
354     add_sock_map_data(utd, sock, ucd);
355     int localFD = UDT::usock_getfd(sock);
356     svc->trace_out(cm, "Adding UDT4 UDP socket %d to select list\n",
357 		   localFD);
358     svc->fd_add_select(cm, localFD, udt4_service_epoll, (void *) trans,
359 		       (void *) utd);
360     return sock;
361 }
362 
363 /*
364  * Initiate a socket connection with another data exchange.  If port_num is -1,
365  * establish a unix socket connection (name_str stores the file name of
366  * the waiting socket).  Otherwise, establish an INET socket connection
367  * (name_str stores the machine name).
368  */
369 extern "C" CMConnection
libcmudt4_LTX_initiate_conn(CManager cm,CMtrans_services svc,transport_entry trans,attr_list attrs)370 libcmudt4_LTX_initiate_conn(CManager cm, CMtrans_services svc,
371 			    transport_entry trans, attr_list attrs)
372 {
373     udt4_conn_data_ptr ucd = create_udt4_conn_data(svc);
374     attr_list conn_attr_list = create_attr_list();
375     CMConnection conn;
376     int sock;
377     udt4_transport_data_ptr utd =
378 	(udt4_transport_data_ptr) trans->trans_data;
379 
380     if (utd->cm) {
381 	/* assert CM is locked */
382 	assert(CM_LOCKED(svc, utd->cm));
383     }
384     if ((sock =
385 	 initiate_conn(cm, svc, trans, attrs, ucd, conn_attr_list)) < 0)
386 	return NULL;
387 
388     add_attr(conn_attr_list, CM_PEER_LISTEN_PORT, Attr_Int4,
389 	     (attr_value) (long) ucd->remote_contact_port);
390     conn = svc->connection_create(trans, ucd, conn_attr_list);
391     ucd->utd = utd;
392     ucd->conn = conn;
393 
394     svc->trace_out(cm,
395 		   "Cmudt4 Adding trans->data_available as action on fd %d",
396 		   sock);
397     add_sock_map_data(utd, sock, ucd);
398     UDT::epoll_add_usock(utd->eid, sock, NULL);
399 
400     free_attr_list(conn_attr_list);
401     /* dump_sockinfo("initiate ", sock); */
402     svc->connection_addref(conn);	/* one ref count went to select
403 					 * (and CM), the other to the
404 					 * user */
405     return conn;
406 }
407 
408 /*
409  * Check to see that if we were to attempt to initiate a connection as
410  * indicated by the attribute list, would we be connecting to ourselves?
411  * For udt4, this involves checking to see if the host name is the
412  * same as ours and if the IP_PORT matches the one we are listening on.
413  */
414 extern "C" int
libcmudt4_LTX_self_check(CManager cm,CMtrans_services svc,transport_entry trans,attr_list attrs)415 libcmudt4_LTX_self_check(CManager cm, CMtrans_services svc,
416 			 transport_entry trans, attr_list attrs)
417 {
418 
419     udt4_transport_data_ptr utd =
420 	(udt4_transport_data_ptr) trans->trans_data;
421     int host_addr;
422     int int_port_num;
423     char *host_name;
424     char my_host_name[256];
425     static int IP = 0;
426 
427     get_IP_config(my_host_name, sizeof(host_name), &IP, NULL, NULL, NULL,
428 		  NULL, svc->trace_out, (void *) cm);
429 
430     if (IP == 0) {
431 	if (IP == 0)
432 	    IP = INADDR_LOOPBACK;
433     }
434     if (!query_attr(attrs, CM_IP_HOSTNAME, /* type pointer */ NULL,
435 		    /* value pointer */ (attr_value *) (long) &host_name)) {
436 	svc->trace_out(cm,
437 		       "CMself check udt4 transport found no IP_HOST attribute");
438 	host_name = NULL;
439     }
440     if (!query_attr(attrs, CM_IP_ADDR, /* type pointer */ NULL,
441 		    /* value pointer */ (attr_value *) (long) &host_addr)) {
442 	svc->trace_out(cm,
443 		       "CMself check udt4 transport found no IP_ADDR attribute");
444 	if (host_name == NULL)
445 	    return 0;
446 	host_addr = 0;
447     }
448     if (!query_attr(attrs, CM_IP_PORT, /* type pointer */ NULL,
449 		    /* value pointer */
450 		    (attr_value *) (long) &int_port_num)) {
451 	svc->trace_out(cm,
452 		       "CMself check udt4 transport found no IP_PORT attribute");
453 	return 0;
454     }
455     if (host_name && (strcmp(host_name, my_host_name) != 0)) {
456 	svc->trace_out(cm, "CMself check - Hostnames don't match");
457 	return 0;
458     }
459     if (host_addr && (IP != host_addr)) {
460 	svc->trace_out(cm,
461 		       "CMself check - Host IP addrs don't match, %lx, %lx",
462 		       IP, host_addr);
463 	return 0;
464     }
465     if (int_port_num != utd->listen_port) {
466 	svc->trace_out(cm, "CMself check - Ports don't match, %d, %d",
467 		       int_port_num, utd->listen_port);
468 	return 0;
469     }
470     svc->trace_out(cm, "CMself check returning TRUE");
471     return 1;
472 }
473 
474 extern "C" int
libcmudt4_LTX_connection_eq(CManager cm,CMtrans_services svc,transport_entry trans,attr_list attrs,udt4_conn_data_ptr ucd)475 libcmudt4_LTX_connection_eq(CManager cm, CMtrans_services svc,
476 			    transport_entry trans, attr_list attrs,
477 			    udt4_conn_data_ptr ucd)
478 {
479 
480     int int_port_num;
481     int requested_IP = -1;
482     char *host_name = NULL;
483 
484     if (!query_attr(attrs, CM_IP_HOSTNAME, /* type pointer */ NULL,
485 		    /* value pointer */ (attr_value *) (long) &host_name)) {
486 	svc->trace_out(cm, "udt4 transport found no IP_HOST attribute");
487     }
488     if (!query_attr(attrs, CM_IP_PORT, /* type pointer */ NULL,
489 		    /* value pointer */
490 		    (attr_value *) (long) &int_port_num)) {
491 	svc->trace_out(cm,
492 		       "Conn Eq udt4 transport found no IP_PORT attribute");
493 	return 0;
494     }
495     if (!query_attr(attrs, CM_IP_ADDR, /* type pointer */ NULL,
496 		    /* value pointer */
497 		    (attr_value *) (long) &requested_IP)) {
498 	svc->trace_out(cm, "udt4 transport found no IP_ADDR attribute");
499     }
500     if (requested_IP == -1) {
501 	check_host(host_name, (void *) &requested_IP);
502 	requested_IP = ntohl(requested_IP);
503 	svc->trace_out(cm, "IP translation for hostname %s is %x",
504 		       host_name, requested_IP);
505     }
506 
507     svc->trace_out(cm, "Socket Conn_eq comparing IP/ports %x/%d and %x/%d",
508 		   ucd->remote_IP, ucd->remote_contact_port,
509 		   requested_IP, int_port_num);
510     if ((ucd->remote_IP == requested_IP) &&
511 	(ucd->remote_contact_port == int_port_num)) {
512 	svc->trace_out(cm, "Socket Conn_eq returning TRUE");
513 	return 1;
514     }
515     svc->trace_out(cm, "Socket Conn_eq returning FALSE");
516     return 0;
517 }
518 
519 static attr_list
build_listen_attrs(CManager cm,CMtrans_services svc,udt4_transport_data_ptr sd,attr_list listen_info,int int_port_num)520 build_listen_attrs(CManager cm, CMtrans_services svc,
521 		   udt4_transport_data_ptr sd, attr_list listen_info,
522 		   int int_port_num)
523 {
524     char host_name[256];
525     attr_list ret_list;
526     int IP;
527     int use_hostname = 0;
528 
529     svc->trace_out(cm, "CMUdt4 listen succeeded on port %d", int_port_num);
530     get_IP_config(host_name, sizeof(host_name), &IP, NULL, NULL,
531 		  &use_hostname, listen_info, svc->trace_out, (void *) cm);
532 
533     ret_list = create_attr_list();
534 
535     if (sd) {
536 	sd->hostname = strdup(host_name);
537 	sd->listen_port = int_port_num;
538     }
539     if ((IP != 0) && !use_hostname) {
540 	add_attr(ret_list, CM_IP_ADDR, Attr_Int4, (attr_value) (long) IP);
541     }
542     if ((getenv("CMUseHostname") != NULL) || use_hostname) {
543 	add_attr(ret_list, CM_IP_HOSTNAME, Attr_String,
544 		 (attr_value) strdup(host_name));
545     } else if (IP == 0) {
546 	add_int_attr(ret_list, CM_IP_ADDR, INADDR_LOOPBACK);
547     }
548     add_attr(ret_list, CM_IP_PORT, Attr_Int4,
549 	     (attr_value) (long) int_port_num);
550 
551     add_attr(ret_list, CM_TRANSPORT, Attr_String,
552 	     (attr_value) strdup("udt4"));
553     return ret_list;
554 }
555 
556 /*
557  * Create an IP socket for connection from other CMs
558  */
559 extern "C" attr_list
libcmudt4_LTX_non_blocking_listen(CManager cm,CMtrans_services svc,transport_entry trans,attr_list listen_info)560 libcmudt4_LTX_non_blocking_listen(CManager cm, CMtrans_services svc,
561 				  transport_entry trans,
562 				  attr_list listen_info)
563 {
564     udt4_transport_data_ptr utd =
565 	(udt4_transport_data_ptr) trans->trans_data;
566     int length;
567     struct sockaddr_in sock_addr;
568     int sock_opt_val = 1;
569     int conn_sock;
570     int attr_port_num = 0;
571     u_short port_num = 0;
572     int port_range_low, port_range_high;
573     int use_hostname = 0;
574     int int_port_num;
575     int IP;
576     char host_name[256];
577 
578     if (utd->listen_port != -1) {
579 	/* we're already listening */
580 	if (port_num == 0) {
581 	    /* not requesting a specific port, return what we have */
582 	    return build_listen_attrs(cm, svc, NULL, listen_info,
583 				      utd->listen_port);
584 	} else {
585 	    printf
586 		("CMlisten_specific() requesting a specific port follows other Udt4 operation which initiated listen at another port.  Only one listen allowed, second listen fails.\n");
587 	    return NULL;
588 	}
589     }
590 
591     conn_sock = UDT::socket(AF_INET, SOCK_STREAM, 0);
592 
593     if (conn_sock == SOCKET_ERROR) {
594 	fprintf(stderr, "Cannot open INET socket\n");
595 	return NULL;
596     }
597     if (utd->cm) {
598 	/* assert CM is locked */
599 	assert(CM_LOCKED(svc, utd->cm));
600     }
601     /*
602      *  Check to see if a bind to a specific port was requested
603      */
604     if (listen_info != NULL
605 	&& !query_attr(listen_info, CM_IP_PORT,
606 		       NULL, (attr_value *) (long) &attr_port_num)) {
607 	port_num = 0;
608     } else {
609 	if (attr_port_num > USHRT_MAX || attr_port_num < 0) {
610 	    fprintf(stderr, "Requested port number %d is invalid\n",
611 		    attr_port_num);
612 	    return NULL;
613 	}
614 	port_num = attr_port_num;
615     }
616 
617     svc->trace_out(cm, "UDT4 begin listen, requested port %d",
618 		   attr_port_num);
619     get_IP_config(host_name, sizeof(host_name), &IP, &port_range_low,
620 		  &port_range_high, &use_hostname, listen_info,
621 		  svc->trace_out, (void *) cm);
622 
623     sock_addr.sin_family = AF_INET;
624     sock_addr.sin_addr.s_addr = INADDR_ANY;
625     sock_addr.sin_port = htons(port_num);
626 
627     if (sock_addr.sin_port != 0) {
628 	/* specific port requested. set REUSEADDR, REUSEPORT because
629 	 * previous server might have died badly */
630 	if (UDT::
631 	    setsockopt(conn_sock, SOL_SOCKET, UDT_REUSEADDR,
632 		       (char *) &sock_opt_val,
633 		       sizeof(sock_opt_val)) != 0) {
634 	    fprintf(stderr,
635 		    "Failed to set REUSEADDR on INET socket before bind\n");
636 	    perror("setsockopt(SO_REUSEADDR) failed");
637 	    return NULL;
638 	}
639 	svc->trace_out(cm, "UDT4 trying to bind selected port %d",
640 		       port_num);
641 	if (UDT::
642 	    bind(conn_sock, (struct sockaddr *) &sock_addr,
643 		 sizeof sock_addr) == SOCKET_ERROR) {
644 	    fprintf(stderr, "Cannot bind INET socket\n");
645 	    return NULL;
646 	}
647 	/* begin listening for conns */
648 	if (UDT::listen(conn_sock, FD_SETSIZE)) {
649 	    fprintf(stderr, "listen failed %s\n",
650 		    UDT::getlasterror().getErrorMessage());
651 	    return NULL;
652 	}
653     } else if (port_range_high == -1) {
654 	svc->trace_out(cm, "UDT4 trying to bind to any available port");
655 	sock_addr.sin_port = 0;
656 	if (UDT::
657 	    bind(conn_sock, (struct sockaddr *) &sock_addr,
658 		 sizeof sock_addr) == SOCKET_ERROR) {
659 	    fprintf(stderr, "Cannot bind INET socket\n");
660 	    return NULL;
661 	}
662 	/* begin listening for conns */
663 	if (UDT::listen(conn_sock, FD_SETSIZE)) {
664 	    fprintf(stderr, "listen failed %s\n",
665 		    UDT::getlasterror().getErrorMessage());
666 	    return NULL;
667 	}
668 
669     } else {
670 	long seedval = time(NULL) + getpid();
671 	/* port num is free.  Constrain to range to standards */
672 	int size = port_range_high - port_range_low;
673 	int tries = 30;
674 	int result = SOCKET_ERROR;
675 	srand48(seedval);
676 	while (tries > 0) {
677 	    int target = port_range_low + size * drand48();
678 	    sock_addr.sin_port = htons(target);
679 	    int_port_num = target;
680 	    svc->trace_out(cm, "UDT4 trying to bind port %d", target);
681 	    result = UDT::bind(conn_sock, (struct sockaddr *) &sock_addr,
682 			       sizeof sock_addr);
683 	    tries--;
684 	    /* begin listening for conns */
685 	    result = UDT::listen(conn_sock, FD_SETSIZE);
686 	    if (result != UDT::ERROR) {
687 		tries = 0;
688 	    } else {
689 		/*
690 		 *  For some reason, UDT4 gives conflicting port errors
691 		 *  on listen rather than bind.  You can't bind a socket
692 		 *  more than once, so on error we have to close the old
693 		 *  and start with a new UDT::socket.
694 		 */
695 		UDT::close(conn_sock);
696 		conn_sock = UDT::socket(AF_INET, SOCK_STREAM, 0);
697 	    }
698 
699 	    if (tries % 5 == 4) {
700 		/* try reseeding in case we're in sync with another
701 		 * process */
702 		srand48(time(NULL) + getpid());
703 	    }
704 	    if (tries == 20) {
705 		/* damn, tried a lot, increase the range (This might
706 		 * violate specified range) */
707 		size *= 10;
708 	    }
709 	    if (tries == 10) {
710 		/* damn, tried a lot more, increase the range (This might
711 		 * violate specified range) */
712 		size *= 10;
713 	    }
714 	}
715 	if (result == SOCKET_ERROR) {
716 	    fprintf(stderr, "Cannot bind INET socket\n");
717 	    return NULL;
718 	}
719     }
720     length = sizeof sock_addr;
721     if (UDT::
722 	getsockname(conn_sock, (struct sockaddr *) &sock_addr,
723 		    &length) < 0) {
724 	fprintf(stderr, "Cannot get socket name\n");
725 	return NULL;
726     }
727     /* set the port num as one we can be contacted at */
728 
729     UDT::epoll_add_usock(utd->eid, conn_sock, NULL);
730     int localFD = UDT::usock_getfd(conn_sock);
731     svc->trace_out(cm, "Adding UDT4 UDP socket %d to select list\n",
732 		   localFD);
733     svc->fd_add_select(cm, localFD, udt4_service_epoll, (void *) trans,
734 		       (void *) utd);
735     utd->listen_sock = conn_sock;
736 
737     {
738 	attr_list ret_list;
739 
740 	svc->trace_out(cm, "UDT4 listen succeeded on port %d, fd %d",
741 		       int_port_num, conn_sock);
742 	ret_list = create_attr_list();
743 
744 	utd->hostname = strdup(host_name);
745 	utd->listen_port = int_port_num;
746 	add_attr(ret_list, CM_TRANSPORT, Attr_String,
747 		 (attr_value) strdup("udt4"));
748 	if ((IP != 0) && (!use_hostname)) {
749 	    add_attr(ret_list, CM_IP_ADDR, Attr_Int4,
750 		     (attr_value) (long) IP);
751 	}
752 	if ((getenv("Cmudt4UseHostname") != NULL) || use_hostname) {
753 	    add_attr(ret_list, CM_IP_HOSTNAME, Attr_String,
754 		     (attr_value) strdup(host_name));
755 	} else if (IP == 0) {
756 	    add_attr(ret_list, CM_IP_ADDR, Attr_Int4,
757 		     (attr_value) INADDR_LOOPBACK);
758 	}
759 	add_attr(ret_list, CM_IP_PORT, Attr_Int4,
760 		 (attr_value) (long) int_port_num);
761 
762 	return ret_list;
763     }
764 }
765 
766 #ifdef NEED_IOVEC_DEFINE
767 struct iovec
768 {
769     void *iov_base;
770     int iov_len;
771 };
772 
773 #endif
774 
775 
776 #ifndef IOV_MAX
777 /* this is not defined in some places where it should be.  Conservative. */
778 #define IOV_MAX 16
779 #endif
780 
781 
782 extern "C" int
libcmudt4_LTX_writev_func(CMtrans_services svc,udt4_conn_data_ptr ucd,void * iovs,int iovcnt,attr_list attrs)783 libcmudt4_LTX_writev_func(CMtrans_services svc, udt4_conn_data_ptr ucd,
784 			  void *iovs, int iovcnt, attr_list attrs)
785 {
786     int left = 0;
787     struct iovec *iov = (struct iovec *) iovs;
788     /* sum lengths */
789     for (int i = 0; i < iovcnt; i++)
790 	left += iov[i].iov_len;
791 
792     svc->trace_out(ucd->utd->cm, "UDT4 writev of %d bytes on socket %d",
793 		   left, ucd->udtsock);
794     /*
795      * stupid implementation of writev first
796      *
797      * we'll copy all the data into a single block, then send it with one send()
798      */
799     char *buffer = (char *) malloc(left), *tmp;
800     tmp = buffer;
801     for (int i = 0; i < iovcnt; i++) {
802 	memcpy(tmp, iov[i].iov_base, iov[i].iov_len);
803 	tmp += iov[i].iov_len;
804     }
805     tmp = buffer;
806     while (left > 0) {
807 	int sent = UDT::send(ucd->udtsock, tmp, left, 0);
808 
809 	if (sent < 0) {
810 	    svc->trace_out(ucd->utd->cm,
811 			   "UDT4 send failed, error was %s\n",
812 			   UDT::getlasterror().getErrorMessage());
813 	    return 0;
814 	}
815 	left -= sent;
816 	tmp += sent;
817     }
818     free(buffer);
819     return iovcnt;
820 }
821 
822 static void
free_udt4_data(CManager cm,void * utdv)823 free_udt4_data(CManager cm, void *utdv)
824 {
825     udt4_transport_data_ptr utd = (udt4_transport_data_ptr) utdv;
826     CMtrans_services svc = utd->svc;
827     if (utd->hostname != NULL)
828 	svc->free_func(utd->hostname);
829     svc->free_func(utd);
830 }
831 
832 extern "C" int
libcmudt4_LTX_read_to_buffer_func(CMtrans_services svc,udt4_conn_data_ptr ucd,void * buffer,int requested_len,int non_blocking)833 libcmudt4_LTX_read_to_buffer_func(CMtrans_services svc,
834 				  udt4_conn_data_ptr ucd, void *buffer,
835 				  int requested_len, int non_blocking)
836 {
837     int left, iget;
838     int rcv_size, var_size = sizeof(rcv_size);
839     svc->trace_out(ucd->utd->cm,
840 		   "CMUDT4 read of %d bytes on fd %d, non_block %d",
841 		   requested_len, ucd->udtsock, non_blocking);
842     UDT::getsockopt(ucd->udtsock, 0, UDT_RCVDATA, &rcv_size, &var_size);
843     if (non_blocking && (rcv_size == 0)) {
844 	svc->trace_out(ucd->utd->cm,
845 		       "CMUDT4 socket state is %d, rcv_size = %d, non_blocking read would block, returning 0\n",
846 		       UDT::getsockstate(ucd->udtsock), rcv_size);
847 	return 0;
848     }
849 
850     if (UDT::ERROR ==
851 	(iget =
852 	 UDT::recv(ucd->udtsock, (char *) buffer, requested_len, 0))) {
853 	svc->trace_out(ucd->utd->cm, "CMUDT4 recv failed, error was %s\n",
854 		       UDT::getlasterror().getErrorMessage());;
855     }
856     if ((iget == -1) || (iget == 0)) {
857 	int lerrno = errno;
858 	if ((lerrno != EWOULDBLOCK) &&
859 	    (lerrno != EAGAIN) && (lerrno != EINTR)) {
860 	    /* serious error */
861 	    svc->trace_out(ucd->utd->cm,
862 			   "CMUDT4 iget was -1, errno is %d, returning 0 for read",
863 			   lerrno);
864 	    return -1;
865 	} else {
866 	    if (non_blocking) {
867 		svc->trace_out(ucd->utd->cm,
868 			       "CMUDT4 iget was -1, would block, errno is %d",
869 			       lerrno);
870 		return 0;
871 	    }
872 	    return -1;
873 	}
874     }
875     left = requested_len - iget;
876     while (left > 0) {
877 	int lerrno;
878 	if (UDT::ERROR ==
879 	    (iget =
880 	     UDT::recv(ucd->udtsock,
881 		       ((char *) buffer) + requested_len - left, left,
882 		       0))) {
883 	    svc->trace_out(ucd->utd->cm, "recv failed, message was %s\n",
884 			   UDT::getlasterror().getErrorMessage());
885 	}
886 	lerrno = errno;
887 	if (iget == -1) {
888 	    if ((lerrno != EWOULDBLOCK) &&
889 		(lerrno != EAGAIN) && (lerrno != EINTR)) {
890 		/* serious error */
891 		svc->trace_out(ucd->utd->cm,
892 			       "Cmudt4 iget was -1, errno is %d, returning %d for read",
893 			       lerrno, requested_len - left);
894 		return (requested_len - left);
895 	    } else {
896 		iget = 0;
897 	    }
898 	} else if (iget == 0) {
899 	    svc->trace_out(ucd->utd->cm,
900 			   "Cmudt4 iget was 0, errno is %d, returning %d for read",
901 			   lerrno, requested_len - left);
902 	    return requested_len - left;	/* end of file */
903 	}
904 	left -= iget;
905     }
906     return requested_len;
907 }
908 
909 static void
udt4_service_epoll(void * void_trans,void * not_used)910 udt4_service_epoll(void *void_trans, void *not_used)
911 {
912     transport_entry trans = (transport_entry) void_trans;
913     udt4_transport_data_ptr utd =
914 	(udt4_transport_data_ptr) trans->trans_data;
915     CManager cm = utd->cm;
916     CMtrans_services svc = utd->svc;
917     std::set < UDTSOCKET > readfds;
918     if (!(CM_LOCKED(svc, utd->cm))) {
919 	printf("UTD4 service network, CManager not locked\n");
920     }
921 
922     int ret = UDT::epoll_wait(utd->eid, &readfds, NULL, 50);
923     svc->trace_out(utd->cm, "Epoll_wait returned %d\n", ret);
924     for (std::set < UDTSOCKET >::iterator i = readfds.begin();
925 	 i != readfds.end(); ++i) {
926 	if (*i == utd->listen_sock) {
927 	    sockaddr_storage clientaddr;
928 	    attr_list conn_attr_list;
929 	    struct sockaddr_in remote;
930 	    int remote_len = sizeof(remote);
931 	    int addrlen = sizeof(clientaddr);
932 	    UDTSOCKET new_sock =
933 		UDT::accept(*i, (sockaddr *) & clientaddr, &addrlen);
934 	    udt4_conn_data_ptr ucd = create_udt4_conn_data(svc);
935 	    ucd->udtsock = new_sock;
936 	    ucd->utd = utd;
937 	    conn_attr_list = create_attr_list();
938 	    CMConnection conn =
939 		svc->connection_create(trans, ucd, conn_attr_list);
940 	    ucd->conn = conn;
941 	    UDT::getpeername(ucd->udtsock, (struct sockaddr *) &remote,
942 			     &remote_len);
943 	    add_int_attr(conn_attr_list, CM_PEER_IP,
944 			 ntohl(remote.sin_addr.s_addr));
945 	    ucd->remote_IP = ntohl(remote.sin_addr.s_addr);	/* remote_IP
946 								 * is in
947 								 * host
948 								 * byte
949 								 * order */
950 	    if (UDT::
951 		recv(ucd->udtsock, (char *) &ucd->remote_contact_port, 4,
952 		     0) < 0) {
953 		svc->trace_out(utd->cm,
954 			       "Remote host dropped connection without data");
955 		return;
956 	    } else {
957 		ucd->remote_contact_port = ntohl(ucd->remote_contact_port);
958 	    }
959 	    add_attr(conn_attr_list, CM_PEER_LISTEN_PORT, Attr_Int4,
960 		     (attr_value) (long) ucd->remote_contact_port);
961 	    struct in_addr addr;
962 	    addr.s_addr = htonl(ucd->remote_IP);
963 	    svc->trace_out(trans->cm,
964 			   "Remote host (IP %s) is listening at port %d\n",
965 			   inet_ntoa(addr), ucd->remote_contact_port);
966 	    free_attr_list(conn_attr_list);
967 	    add_sock_map_data(utd, new_sock, ucd);
968 	    UDT::epoll_add_usock(utd->eid, new_sock);
969 	} else {
970 	    // int rcv_size = 1024000;
971 	    // int var_size = sizeof(int), rs;
972 	    udt4_conn_data_ptr ucd = NULL;
973 	    for (int j = 0; j < utd->conn_count; j++) {
974 		if (utd->map[j].udtsock == *i) {
975 		    ucd = utd->map[j].ucd;
976 		}
977 	    }
978 	    if (ucd == NULL) {
979 		printf
980 		    ("Internal consistency error, conn data for sock %x not found\n",
981 		     *i);
982 	    }
983 	    svc->trace_out(cm, "Calling data available on connection %p\n",
984 			   ucd->conn);
985 	    (trans->data_available) (trans, ucd->conn);
986 	}
987     }
988 }
989 
990 extern "C" void *
libcmudt4_LTX_initialize(CManager cm,CMtrans_services svc,transport_entry trans)991 libcmudt4_LTX_initialize(CManager cm, CMtrans_services svc,
992 			 transport_entry trans)
993 {
994     static int atom_init = 0;
995 
996     (void) trans;
997     udt4_transport_data_ptr udt4_data;
998     svc->trace_out(cm, "Initialize UDP4 transport built in %s",
999 		   EVPATH_MODULE_BUILD_DIR);
1000 
1001     if (atom_init == 0) {
1002 	CM_IP_HOSTNAME = attr_atom_from_string("IP_HOST");
1003 	CM_IP_PORT = attr_atom_from_string("IP_PORT");
1004 	CM_IP_ADDR = attr_atom_from_string("IP_ADDR");
1005 	CM_FD = attr_atom_from_string("CONNECTION_FILE_DESCRIPTOR");
1006 	CM_THIS_CONN_PORT = attr_atom_from_string("THIS_CONN_PORT");
1007 	CM_PEER_CONN_PORT = attr_atom_from_string("PEER_CONN_PORT");
1008 	CM_TRANSPORT = attr_atom_from_string("CM_TRANSPORT");
1009 	CM_PEER_IP = attr_atom_from_string("PEER_IP");
1010 	CM_PEER_HOSTNAME = attr_atom_from_string("PEER_HOSTNAME");
1011 	CM_PEER_LISTEN_PORT = attr_atom_from_string("PEER_LISTEN_PORT");
1012 	CM_TRANSPORT_RELIABLE =
1013 	    attr_atom_from_string("CM_TRANSPORT_RELIABLE");
1014 	atom_init++;
1015     }
1016     UDT::startup();
1017     udt4_data =
1018 	(udt4_transport_data_ptr) svc->
1019 	malloc_func(sizeof(struct udt4_transport_data));
1020     memset(udt4_data, 0, sizeof(struct udt4_transport_data));
1021     udt4_data->cm = cm;
1022     udt4_data->hostname = NULL;
1023     udt4_data->listen_port = -1;
1024     udt4_data->svc = svc;
1025     udt4_data->characteristics = create_attr_list();
1026     add_int_attr(udt4_data->characteristics, CM_TRANSPORT_RELIABLE, 1);
1027     svc->add_shutdown_task(cm, free_udt4_data, (void *) udt4_data,
1028 			   FREE_TASK);
1029     udt4_data->eid = UDT::epoll_create();
1030     int localFD = UDT::epoll_getfd(udt4_data->eid);
1031     svc->trace_out(cm, "Adding FD %d to select list", localFD);
1032 
1033     svc->fd_add_select(cm, localFD, udt4_service_epoll,
1034 		       (void *) trans, (void *) udt4_data);
1035     // svc->add_periodic_task(cm, 1, 0, (CMPollFunc) udt4_poll_epoll,
1036     // (void *)trans);
1037     return (void *) udt4_data;
1038 }
1039 
1040 extern "C" attr_list
libcmudt4_LTX_get_transport_characteristics(transport_entry trans,CMtrans_services svc,void * vutd)1041 libcmudt4_LTX_get_transport_characteristics(transport_entry trans,
1042 					    CMtrans_services svc,
1043 					    void *vutd)
1044 {
1045     struct udt4_transport_data *utd = (struct udt4_transport_data *) vutd;
1046     return utd->characteristics;
1047 }
1048 
1049 extern "C" void *
libcmudt4_LTX_read_block_func(CMtrans_services svc,udt4_conn_data_ptr conn_data,int * actual_len,int * offset_ptr)1050 libcmudt4_LTX_read_block_func(CMtrans_services svc,
1051 			      udt4_conn_data_ptr conn_data,
1052 			      int *actual_len, int *offset_ptr)
1053 {
1054     CMbuffer cb;
1055 
1056     if (conn_data->read_buffer_len == -1)
1057 	return NULL;
1058 
1059     *actual_len = conn_data->read_buffer_len;
1060     *offset_ptr = 0;
1061     cb = conn_data->read_buffer;
1062     conn_data->read_buffer_len = 0;
1063     conn_data->read_buffer = NULL;
1064     return cb;
1065 }
1066 
1067 extern "C" transport_entry
cmudt4_add_static_transport(CManager cm,CMtrans_services svc)1068 cmudt4_add_static_transport(CManager cm, CMtrans_services svc)
1069 {
1070     transport_entry transport;
1071     transport =
1072 	(transport_entry) svc->malloc_func(sizeof(struct _transport_item));
1073     memset(transport, 0, sizeof(*transport));
1074     transport->trans_name = strdup("udt4");
1075     transport->cm = cm;
1076     transport->transport_init =
1077 	(CMTransport_func) libcmudt4_LTX_initialize;
1078     transport->listen =
1079 	(CMTransport_listen_func) libcmudt4_LTX_non_blocking_listen;
1080     transport->initiate_conn =
1081 	(CMTransport_conn_func) libcmudt4_LTX_initiate_conn;
1082     transport->self_check =
1083 	(CMTransport_self_check_func) libcmudt4_LTX_self_check;
1084     transport->connection_eq =
1085 	(CMTransport_connection_eq_func) libcmudt4_LTX_connection_eq;
1086     transport->shutdown_conn =
1087 	(CMTransport_shutdown_conn_func) libcmudt4_LTX_shutdown_conn;
1088     transport->read_block_func = (CMTransport_read_block_func) NULL;
1089     transport->read_to_buffer_func =
1090 	(CMTransport_read_to_buffer_func)
1091 	libcmudt4_LTX_read_to_buffer_func;
1092     transport->writev_func =
1093 	(CMTransport_writev_func) libcmudt4_LTX_writev_func;
1094     transport->NBwritev_func = (CMTransport_writev_func) NULL;
1095 
1096     transport->set_write_notify = (CMTransport_set_write_notify_func) NULL;
1097     transport->get_transport_characteristics =
1098 	(CMTransport_get_transport_characteristics)
1099 	libcmudt4_LTX_get_transport_characteristics;
1100     if (transport->transport_init) {
1101 	transport->trans_data =
1102 	    transport->transport_init(cm, svc, transport);
1103     }
1104     return transport;
1105 }
1106