1 /***** Includes *****/
2 #include "config.h"
3 #include <sys/types.h>
4 
5 #include <inttypes.h>
6 #include <getopt.h>
7 
8 
9 #include <time.h>
10 #ifdef HAVE_SYS_TIME_H
11 #include <sys/time.h>
12 #endif
13 #ifdef HAVE_SYS_TIMES_H
14 #include <sys/times.h>
15 #endif
16 #include <sys/socket.h>
17 #ifdef HAVE_SYS_SOCKIO_H
18 #include <sys/sockio.h>
19 #endif
20 #ifdef HAVE_SYS_UN_H
21 #include <sys/un.h>
22 #endif
23 #ifdef HAVE_SYS_UIO_H
24 #include <sys/uio.h>
25 #endif
26 #ifdef HAVE_HOSTLIB_H
27 #include "hostLib.h"
28 #endif
29 #ifdef HAVE_STREAMS_UN_H
30 #include <streams/un.h>
31 #endif
32 #include <netinet/in.h>
33 #include <arpa/inet.h>
34 #ifdef HAVE_NETDB_H
35 #include <netdb.h>
36 #endif
37 #include <stdio.h>
38 #include <fcntl.h>
39 #ifndef HAVE_WINDOWS_H
40 #include <net/if.h>
41 #include <netinet/tcp.h>
42 #include <sys/ioctl.h>
43 #include <errno.h>
44 #endif
45 #ifdef HAVE_UNISTD_H
46 #include <unistd.h>
47 #endif
48 #undef NDEBUG
49 #include <assert.h>
50 #include <signal.h>
51 #include <stdlib.h>
52 #include <string.h>
53 #include <limits.h>
54 #ifdef HAVE_MEMORY_H
55 #include <memory.h>
56 #endif
57 #include <rdma/fabric.h>
58 #include <rdma/fi_endpoint.h>
59 #include <rdma/fi_rma.h>
60 #include <rdma/fi_cm.h>
61 #include <rdma/fi_errno.h>
62 #include <rdma/fi_eq.h>
63 
64 #include <atl.h>
65 #include "evpath.h"
66 #include "cm_transport.h"
67 #include "cm_internal.h"
68 #include "ev_select.h"
69 
70 #include <stdlib.h>
71 
72 #ifndef SOCKET_ERROR
73 #define SOCKET_ERROR -1
74 #endif
75 
76 #define _WITH_IB_
/*
 * Size threshold in bytes: data below this size is carried inline
 * ("piggybacked") in a control message.  The value is parenthesized so the
 * product stays a single operand wherever the macro is expanded.
 */
#define PIGGYBACK (1025*10)
78 
79 #ifdef _WITH_IB_
80 
81 
82 #if defined (__INTEL_COMPILER)
83 #  pragma warning (disable: 869)
84 #  pragma warning (disable: 310)
85 #  pragma warning (disable: 1418)
86 #  pragma warning (disable: 180)
87 #  pragma warning (disable: 2259)
88 #  pragma warning (disable: 177)
89 #endif
90 
91 
92 //   BEGIN from shared.h in fabtests
93 
/* Not tested with libfabric API versions earlier than 1.2. */
95 #define FT_FIVERSION FI_VERSION(1,2)
96 
97 void cq_readerr(struct fid_cq *cq, char *cq_str);
98 
99 
/*
 * Report a failed call on stderr: the call name (a string literal), the
 * source line, the return value and the fi_strerror() text for its negation.
 * `retv` is parenthesized so an expression argument is cast and negated as
 * a whole.
 */
#define FT_PRINTERR(call, retv) \
	do { fprintf(stderr, call "(): %d, %d (%s)\n", __LINE__, (int) (retv), fi_strerror((int) -(retv))); } while (0)

/* printf-style error report on stderr, prefixed with file name and line. */
#define FT_ERR(fmt, ...) \
	do { fprintf(stderr, "%s:%d: " fmt, __FILE__, __LINE__, ##__VA_ARGS__); } while (0)

/* NOTE: both macros evaluate their arguments more than once. */
#define MAX(a,b) (((a)>(b))?(a):(b))
#define MIN(a,b) (((a)<(b))?(a):(b))
108 
109 //   END from shared.h in fabtests
110 
111 
112 //all the message types have a queue associated with them
/* Printable names of the control-message kinds, in enum order. */
static char *msg_string[] = {
    "Request",
    "Response",
    "Piggyback"
};

/*
 * Control-message kinds stored in control_message.type.
 * This declaration also defines a file-level object named msg_type.
 */
enum {
    msg_request = 0,
    msg_response = 1,
    msg_piggyback = 2
} msg_type;
115 
/*
 * Description of one remotely readable buffer, as listed in a request.
 * Field order and types are part of the message format; do not rearrange.
 */
struct remote_entry {
    uint64_t remote_addr;   /* buffer address on the sending side */
    uint32_t length;        /* buffer size in bytes */
    uint64_t rkey;          /* fi_mr_key() of the sender's registration */
};
121 
122 struct request
123 {
124     uint64_t length;
125     uint32_t iovcnt;
126     uint32_t piggyback_length;
127     uint64_t request_ID;
128     struct remote_entry  read_list[1];
129 };
130 
131 
/*
 * Body of a msg_response control message.  A max_length of -1 (stored in
 * the unsigned field) is how the receiver reports failure.
 */
struct response {
    uint64_t max_length;    /* length accepted, or -1 on failure */
    uint64_t request_ID;    /* request_ID copied from the request */
};
137 
/*
 * Body of a msg_piggyback control message.  The payload starts at body[0]
 * and runs beyond the four declared bytes; total_length is set by the sender
 * to the payload length plus msg_offset().
 */
struct piggyback {
    uint32_t total_length;
    uint32_t padding;
    char body[4];
};
144 
145 
146 struct control_message
147 {
148     int type;
149     union {
150 	struct request req;
151 	struct response resp;
152 	struct piggyback pb;
153     } u;
154 };
155 
/*
 * Round-trip a pointer through a 64-bit integer.  uintptr_t is used as the
 * intermediate type because unsigned long can be narrower than a pointer,
 * and the expansions are fully parenthesized.
 */
#define ptr_from_int64(p) ((void *)(uintptr_t)(p))
#define int64_from_ptr(p) ((uint64_t)(uintptr_t)(p))
158 
159 
160 struct msg_item;
161 
/* Progress of one remote read, in the order the states are assigned. */
typedef enum pull_status_t {
    PULL_UNSUBMITTED = 0,   /* fi_read not yet issued */
    PULL_SUBMITTED,         /* fi_read issued, completion pending */
    PULL_COMPLETED,         /* FI_READ completion seen */
    PULL_ACCOUNTED
} pull_status_t;
163 
164 struct pull_record {
165     struct remote_entry remote;
166     char *dest;
167     pull_status_t status;
168     struct msg_item *parent;
169 };
170 
171 struct mr_list_item;
172 struct fabric_connection_data;
173 
/*
 * A message waiting to be pulled from its sender; linked into the
 * pull_queue or the completed_queue of the fabric_client_data.
 */
typedef struct msg_item {
    struct fabric_connection_data *conn_data;   /* connection the request came in on */
    char *buffer;               /* piggybacked bytes at first, later the MR buffer */
    long cur_buffer_size;       /* size associated with `buffer` */
    long message_size;          /* length field of the request */
    uint64_t request_ID;        /* request_ID field of the request */
    struct fid_mr *mr;          /* registration used for the reads */
    int pull_count;             /* number of entries in `pulls` */
    struct pull_record *pulls;
    struct mr_list_item *mr_rec;    /* NULL until a buffer has been assigned */
    struct msg_item *next;
} *msg_item_p;
186 
187 struct fabric_client_data;
188 
/*
 * A registered receive buffer kept for reuse.  Items are linked from
 * fabric_client_data.existing_mr_list; in_use is changed under
 * pull_queue_mutex by get_free_mr() and free_func().
 */
typedef struct mr_list_item {
    struct fabric_client_data *fabd;    /* owner, set by add_mr_to_list() */
    long buffer_length;                 /* size of `buffer` in bytes */
    void *buffer;
    struct fid_mr *mr;                  /* registration covering `buffer` */
    int in_use;                         /* non-zero while handed out */
    struct mr_list_item *next;
} *mr_list;
197 
198 typedef struct fabric_client_data {
199     CManager cm;
200     CMtrans_services svc;
201     transport_entry trans;
202     struct fi_info *hints;
203 
204     struct fid_fabric *fab;
205     struct fid_pep *listen_ep;
206     struct fid_domain *dom;
207     struct fid_eq *cmeq;
208 
209     char *hostname;
210     int listen_port;
211     int lid;
212     int qpn;
213     int psn;
214     int port;
215     struct ibv_device *ibdev;
216     struct ibv_context *context;
217     struct ibv_comp_channel *send_channel;
218     struct ibv_comp_channel *recv_channel;
219     struct ibv_pd *pd;
220     struct ibv_cq *recv_cq;
221     struct ibv_cq *send_cq;
222     struct ibv_srq *srq;
223     int max_sge;
224 
225     struct timeval pull_schedule_base;
226     struct timeval pull_schedule_period;
227     CMavail_period_ptr avail;
228 
229     mr_list existing_mr_list;
230 
231     int thread_init;
232     msg_item_p pull_queue;
233     msg_item_p completed_queue;
234     pthread_mutex_t pull_queue_mutex;
235     int thread_should_run;
236     struct fid_wait *send_waitset;
237     fd_set readset;
238     int nfds;
239     int wake_read_fd;
240     int wake_write_fd;
241     struct fabric_connection_data **fcd_array_by_sfd;
242     int ncqs;
243     struct fid **cq_array;
244 } *fabric_client_data_ptr;
245 
246 
247 typedef struct remote_info
248 {
249     int iovcnt;
250     struct fid_mr **mrlist;
251     struct iovec *iov;
252     CMcompletion_notify_func notify_func;
253     void *notify_client_data;
254 }rinfo;
255 
256 typedef struct fabric_connection_data {
257     fabric_client_data_ptr fabd;
258     struct fid_cq *rcq, *scq;
259     struct fid_mr *read_mr;
260     struct fid_mr *send_mr;
261     struct fid_ep *conn_ep;
262     size_t buffer_size;
263     void *mapped_recv_buf;
264     char *send_buf;
265     CMbuffer read_buf;
266     int max_credits;
267     int read_buffer_len;
268     int read_offset;
269 
270     char *remote_host;
271     int remote_IP;
272     int remote_contact_port;
273     int fd;
274     int sfd;   /* fd for the scq */
275     CMConnection conn;
276     struct ibv_qp *dataqp;
277     int infocount;
278     rinfo last_write;
279     int max_imm_data;
280 } *fabric_conn_data_ptr;
281 
msg_offset()282 static inline int msg_offset()
283 {
284 	return ((int) (((char *) (&(((struct control_message *)NULL)->u.pb.body[0]))) - ((char *) NULL)));
285 }
286 
287 static int alloc_cm_res(fabric_client_data_ptr fabd);
288 static int alloc_ep_res(fabric_conn_data_ptr fcd, struct fi_info *fi);
289 static int bind_ep_res(fabric_conn_data_ptr fcd);
290 static void free_ep_res(fabric_conn_data_ptr fcd);
291 static void
292 hand_to_pull_thread(CMtrans_services svc, fabric_client_data_ptr fabd,
293 		    msg_item_p msg);
294 
295 
296 static atom_t CM_FD = -1;
297 static atom_t CM_THIS_CONN_PORT = -1;
298 static atom_t CM_PEER_CONN_PORT = -1;
299 static atom_t CM_PEER_IP = -1;
300 static atom_t CM_PEER_HOSTNAME = -1;
301 static atom_t CM_PEER_LISTEN_PORT = -1;
302 static atom_t CM_NETWORK_POSTFIX = -1;
303 static atom_t CM_IP_PORT = -1;
304 static atom_t CM_IP_HOSTNAME = -1;
305 static atom_t CM_IP_ADDR = -1;
306 static atom_t CM_IP_INTERFACE = -1;
307 static atom_t CM_TRANSPORT = -1;
308 
/*
 * Translate `hostname` into an IPv4 address.
 *
 * hostname  host name or dotted-quad string
 * sin_addr  output; must have room for a struct in_addr
 *
 * Returns 1 when an address was stored in *sin_addr, 0 when the name could
 * be translated neither by gethostbyname() nor by inet_aton().
 */
static int
check_host(char *hostname, void *sin_addr)
{
    struct hostent *host_addr = gethostbyname(hostname);
    struct in_addr addr;

    if (host_addr != NULL) {
        /* never copy more than the in_addr-sized buffer the caller supplies */
        if (host_addr->h_length != (int) sizeof(struct in_addr) ||
            host_addr->h_addr_list[0] == NULL) {
            return 0;
        }
        memcpy(sin_addr, host_addr->h_addr_list[0], sizeof(struct in_addr));
        return 1;
    }

    if (inet_aton(hostname, &addr) == 0) {
        /*
         *  not translatable as a hostname or
         * as a dot-style string IP address
         */
        return 0;
    }
    /* memcpy instead of a store through a cast int pointer */
    memcpy(sin_addr, &addr, sizeof(addr));
    return 1;
}
332 
333 static fabric_conn_data_ptr
create_fabric_conn_data(CMtrans_services svc)334 create_fabric_conn_data(CMtrans_services svc)
335 {
336     fabric_conn_data_ptr fabric_conn_data = svc->malloc_func(sizeof(struct fabric_connection_data));
337     memset(fabric_conn_data, 0, sizeof(struct fabric_connection_data));
338     fabric_conn_data->remote_host = NULL;
339     fabric_conn_data->remote_contact_port = -1;
340     fabric_conn_data->fd = 0;
341     fabric_conn_data->read_buf = NULL;
342     fabric_conn_data->read_buffer_len = 0;
343 
344     return fabric_conn_data;
345 }
346 
347 
348 static void
handle_scq_completion(struct fi_cq_data_entry * comp)349 handle_scq_completion(struct fi_cq_data_entry *comp)
350 {
351     if (comp->flags & FI_READ) {
352 	struct pull_record *this_pull =
353 	    (struct pull_record *)comp->op_context;
354 //	printf("Got an FI_READ completion\n");
355 	this_pull->status = PULL_COMPLETED;
356     }
357     if (comp->flags & FI_SEND) {
358 	*(int*)comp->op_context = 1;
359     }
360 }
361 
/*
 * Send a small message inline in a msg_piggyback control message and wait
 * for the send completion.
 *
 * length   payload size in bytes; must be below PIGGYBACK
 * iov      payload pieces, copied back to back
 * iovcnt   number of entries in iov
 *
 * Returns 0 on success, -1 for an oversized payload or allocation failure,
 * otherwise the negative value returned by fi_send()/fi_cq_read().
 */
static int internal_write_piggyback(CMtrans_services svc,
                                    fabric_conn_data_ptr fcd,
                                    int length, struct iovec *iov, int iovcnt)
{
    //this function is only called if length < piggyback
    struct control_message *msg;
    char *point;
    int offset = msg_offset();
    int total_length;
    int remaining;
    int ret, sent = 0;
    int i;

    if (length < 0 || length >= PIGGYBACK) {
        //should never happen
        return -1;
    }
    total_length = offset + length;

    msg = calloc(1, total_length);
    if (msg == NULL) {
        return -1;
    }
    msg->type = msg_piggyback;
    msg->u.pb.total_length = total_length;
    point = &msg->u.pb.body[0];
    svc->trace_out(fcd->fabd->cm, "CMFABRIC sending piggyback msg of length %d,",
                   length);

    /* never copy more than the `length` bytes that were allocated */
    remaining = length;
    for (i = 0; i < iovcnt; i++) {
        if (iov[i].iov_len > (size_t) remaining) {
            free(msg);
            return -1;
        }
        memcpy(point, iov[i].iov_base, iov[i].iov_len);
        point += iov[i].iov_len;
        remaining -= (int) iov[i].iov_len;
    }

    /* the staged copy is what gets sent, so msg can be released right away */
    memcpy(fcd->send_buf, msg, total_length);
    free(msg);

    svc->trace_out(fcd->fabd->cm, "fi_send on conn_ep, length %d, send_buf %p\n", total_length, fcd->send_buf);
    ret = fi_send(fcd->conn_ep, fcd->send_buf, total_length, fi_mr_desc(fcd->send_mr), 0, &sent);
    if (ret) {
        FT_PRINTERR("fi_send", ret);
        return ret;
    }

    /* Read send queue until the completion handler sets `sent` */
    do {
        struct fi_cq_data_entry comp;

        ret = fi_cq_read(fcd->scq, &comp, 1);
        if (ret < 0 && ret != -FI_EAGAIN) {
            FT_PRINTERR("fi_cq_read", ret);
            cq_readerr(fcd->scq, " in internal write piggyback");
            return ret;
        }
        if (ret == 1) handle_scq_completion(&comp);
    } while (!sent);

    return 0;
}
421 
422 
/*
 * Send a msg_response control message and wait for the send completion.
 *
 * length      value for max_length; callers pass -1 to report failure
 * request_ID  token copied from the request being answered
 *
 * Returns 0 on success, otherwise the negative value returned by
 * fi_send()/fi_cq_read().
 */
static int internal_write_response(CMtrans_services svc,
                                   fabric_conn_data_ptr fcd,
                                   int length,
                                   int64_t request_ID)
{
    struct control_message msg;
    int ret, sent = 0;

    /*
     * sizeof(msg) bytes go on the wire; clear them first so padding and the
     * unused part of the union are not sent uninitialized.
     */
    memset(&msg, 0, sizeof(msg));
    msg.type = msg_response;
    msg.u.resp.max_length = length;
    msg.u.resp.request_ID = request_ID;

    memcpy(fcd->send_buf, &msg, sizeof(msg));

    svc->trace_out(fcd->fabd->cm, "fi_send for write response\n");
    ret = fi_send(fcd->conn_ep, fcd->send_buf, sizeof(msg), fi_mr_desc(fcd->send_mr), 0, &sent);
    if (ret) {
        FT_PRINTERR("fi_send", ret);
        return ret;
    }

    /* Read send queue until the completion handler sets `sent` */
    do {
        struct fi_cq_data_entry comp;

        svc->trace_out(fcd->fabd->cm, "fi_cq_read for send completion in write response\n");

        ret = fi_cq_read(fcd->scq, &comp, 1);
        if (ret < 0 && ret != -FI_EAGAIN) {
            FT_PRINTERR("fi_cq_read", ret);
            cq_readerr(fcd->scq, " in internal write response");
            return ret;
        }
        if (ret == 1) handle_scq_completion(&comp);
    } while (!sent);

    return 0;
}
460 
/*
 * Send a msg_request control message describing the buffers in
 * request_info and wait for the send completion.
 *
 * Leading iov entries are carried inline ("piggybacked") after the header
 * for as long as the running size stays below PIGGYBACK; each remaining
 * entry is advertised in read_list for the peer to read remotely.
 *
 * Returns 0 on success, -1 on allocation failure, otherwise the negative
 * value returned by fi_send()/fi_cq_read().
 */
static int internal_write_request(CMtrans_services svc,
                                  fabric_conn_data_ptr fcd,
                                  int length,
                                  rinfo *request_info)
{
    struct control_message *msg;
    int ret, i, sent = 0;
    int piggyback_prefix_vectors = 0;
    int remote_count;
    int header_size;
    int size;
    size_t piggyback_bytes = 0;
    char *tail;

    /* pass 1: decide how many leading vectors travel inline */
    size = sizeof(*msg) + request_info->iovcnt * sizeof(struct remote_entry);
    for (i = 0; i < request_info->iovcnt; i++) {
        size_t chunk = request_info->iov[i].iov_len;

        if (size + chunk >= PIGGYBACK) {
            break;
        }
        size += chunk;
        piggyback_bytes += chunk;
        piggyback_prefix_vectors++;
    }

    /* final layout: header sized for the remaining entries, then inline data */
    remote_count = request_info->iovcnt - piggyback_prefix_vectors;
    header_size = sizeof(*msg) + remote_count * sizeof(struct remote_entry);
    size = header_size + (int) piggyback_bytes;

    /* zeroed so that padding bytes are not sent uninitialized */
    msg = calloc(1, size);
    if (msg == NULL) {
        return -1;
    }
    msg->type = msg_request;
    msg->u.req.length = length;
    msg->u.req.iovcnt = remote_count;
    msg->u.req.request_ID = int64_from_ptr(request_info);
    msg->u.req.piggyback_length = (uint32_t) piggyback_bytes;

    tail = (char *) msg + header_size;
    for (i = 0; i < piggyback_prefix_vectors; i++) {
        memcpy(tail, request_info->iov[i].iov_base, request_info->iov[i].iov_len);
        tail += request_info->iov[i].iov_len;
    }
    /* handle remaining */
    for (; i < request_info->iovcnt; i++) {
        int j = i - piggyback_prefix_vectors;

        msg->u.req.read_list[j].remote_addr = int64_from_ptr(request_info->iov[i].iov_base);
        msg->u.req.read_list[j].length = request_info->iov[i].iov_len;
        msg->u.req.read_list[j].rkey = fi_mr_key(request_info->mrlist[i]);
        svc->trace_out(fcd->fabd->cm, "Adding source buffer[%d] %lx, len %d, key %lx\n",
                       i, msg->u.req.read_list[j].remote_addr, msg->u.req.read_list[j].length,
                       msg->u.req.read_list[j].rkey);
    }

    svc->trace_out(fcd->fabd->cm, "Doing internal write request, writing %d bytes", size);

    /* the staged copy is what gets sent, so msg is released on every path */
    memcpy(fcd->send_buf, msg, size);
    free(msg);

    ret = fi_send(fcd->conn_ep, fcd->send_buf, size, fi_mr_desc(fcd->send_mr), 0, &sent);
    if (ret) {
        FT_PRINTERR("fi_send", ret);
        return ret;
    }

    /* Read send queue until the completion handler sets `sent` */
    do {
        struct fi_cq_data_entry comp;

        ret = fi_cq_read(fcd->scq, &comp, 1);
        if (ret < 0 && ret != -FI_EAGAIN) {
            FT_PRINTERR("fi_cq_read", ret);
            cq_readerr(fcd->scq, " in internal write request");
            return ret;
        }
        if (ret == 1) handle_scq_completion(&comp);
    } while (!sent);
    return 0;
}
532 
/*
 * Process a msg_response: find the pending write named by request_ID, run
 * its notify callback if it has one, and wake writers blocked on the
 * connection.
 *
 * Returns 0 on success, -1 when the peer reported failure (max_length -1).
 */
static int handle_response(CMtrans_services svc,
                           fabric_conn_data_ptr fcd,
                           struct control_message *msg)
{
    struct response *reply = &msg->u.resp;
    rinfo *pending = ptr_from_int64(reply->request_ID);

    /* -1 stored in the unsigned field is the failure marker */
    if (reply->max_length == (uint64_t) -1) {
        fprintf(stderr, "WRITE FAILED, request %p\n", pending);
        return -1;
    }

    if (pending->notify_func != NULL) {
        pending->notify_func(pending->notify_client_data);
    }
    svc->wake_any_pending_write(fcd->conn);
    return 0;
}
557 
558 static mr_list
get_free_mr(fabric_client_data_ptr fabd,long length)559 get_free_mr(fabric_client_data_ptr fabd, long length)
560 {
561     mr_list list, return_item = NULL;
562     pthread_mutex_lock(&fabd->pull_queue_mutex);
563     list = fabd->existing_mr_list;
564     while (list != NULL) {
565 	if ((!list->in_use) && (list->buffer_length >= length)) {
566 	    list->in_use = 1;
567 	    return_item = list;
568 	    break;
569 	}
570 	list = list->next;
571     }
572     pthread_mutex_unlock(&fabd->pull_queue_mutex);
573     return return_item;
574 }
575 
576 static void
add_mr_to_list(fabric_client_data_ptr fabd,mr_list mr_rec)577 add_mr_to_list(fabric_client_data_ptr fabd, mr_list mr_rec)
578 {
579     pthread_mutex_lock(&fabd->pull_queue_mutex);
580     mr_rec->fabd = fabd;
581     mr_rec->next = fabd->existing_mr_list;
582     fabd->existing_mr_list = mr_rec;
583     pthread_mutex_unlock(&fabd->pull_queue_mutex);
584 }
585 
586 static void
free_func(void * mr_item_v)587 free_func(void *mr_item_v)
588 {
589     fabric_client_data_ptr fabd;
590     mr_list list, mr_item = mr_item_v;
591     fabd = mr_item->fabd;
592     pthread_mutex_lock(&fabd->pull_queue_mutex);
593     list = fabd->existing_mr_list;
594     while (list != NULL) {
595 	if (list ==  mr_item_v) {
596 	    list->in_use = 0;
597 	    break;
598 	}
599 	list = list->next;
600     }
601     if (list == NULL) {
602 	fprintf(stderr, "libfabric MR list inconsistency in free_func\n");
603     }
604     pthread_mutex_unlock(&fabd->pull_queue_mutex);
605 }
606 
607 
608 static void
add_to_pull_queue(CMtrans_services svc,fabric_conn_data_ptr fcd,struct control_message * msg)609 add_to_pull_queue(CMtrans_services svc, fabric_conn_data_ptr fcd,
610 	     struct control_message *msg)
611 {
612     int i;
613     struct request *req = &msg->u.req;
614     int header_size = sizeof(*msg) + req->iovcnt * sizeof(struct remote_entry);
615 
616 
617     msg_item_p msg_pull_item = calloc(1, sizeof(struct msg_item));
618     msg_pull_item->message_size = req->length;
619     msg_pull_item->buffer = malloc(req->piggyback_length);
620     msg_pull_item->cur_buffer_size = req->piggyback_length;
621     /* copy portion of msg that was piggybacked */
622     memcpy(msg_pull_item->buffer, (char*)msg + header_size, req->piggyback_length);
623     msg_pull_item->pull_count = req->iovcnt;
624     msg_pull_item->pulls = calloc(req->iovcnt, sizeof(struct pull_record));
625     msg_pull_item->conn_data = fcd;
626     msg_pull_item->request_ID = req->request_ID;
627     for (i = 0; i < msg_pull_item->pull_count; i++) {
628 	msg_pull_item->pulls[i].parent = msg_pull_item;
629 	msg_pull_item->pulls[i].remote = req->read_list[i];
630     }
631     hand_to_pull_thread(svc, fcd->fabd, msg_pull_item);
632 }
633 
634 static int
perform_pull(CMtrans_services svc,fabric_conn_data_ptr fcd,struct control_message * msg)635 perform_pull(CMtrans_services svc, fabric_conn_data_ptr fcd,
636 	     struct control_message *msg)
637 {
638     int ret;
639     int count;
640     int i;
641     char *ptr;
642     int header_size;
643     struct request *req;
644     void *buffer;
645     struct fid_mr *mr;
646     struct mr_list_item *mr_rec;
647     CMbuffer cb;
648     req = &msg->u.req;
649 
650     svc->trace_out(fcd->fabd->cm, "In handle request, len is %ld, iovcnt %d, request_ID %lx, piggyback_length = %d\n",
651 	   req->length, req->iovcnt, req->request_ID, req->piggyback_length);
652     mr_rec = get_free_mr(fcd->fabd, req->length);
653     if (mr_rec == NULL) {
654 	mr_rec = calloc(1, sizeof(*mr_rec));
655 	buffer = malloc(req->length);
656 	mr_rec->buffer_length = req->length;
657 	mr_rec->buffer = buffer;
658 	mr_rec->in_use = 1;
659 	mr_rec->next = NULL;
660 	svc->trace_out(fcd->fabd->cm, "fi_mr_reg, buff %p, size %ld with attrs REMOTE_READ,REMOTE_WRITE, SEND, RECV\n", buffer, req->length);
661 	ret = fi_mr_reg(fcd->fabd->dom, buffer, req->length,
662 			FI_REMOTE_READ | FI_REMOTE_WRITE | FI_SEND | FI_RECV,
663 			0, 0, 0, &mr, NULL);
664 	if(ret) {
665 	    FT_PRINTERR("fi_mr_reg", ret);
666 	    svc->trace_out(fcd->fabd->cm, "Failed to get memory\n");
667 	    internal_write_response(svc, fcd, -1, req->request_ID);
668 	    return ret;
669 	}
670 
671 	mr_rec->mr = mr;
672 	add_mr_to_list(fcd->fabd, mr_rec);
673     } else {
674 	svc->trace_out(fcd->fabd->cm, "Reusing exiting MR buff %p, size %ld\n", mr_rec->buffer, mr_rec->buffer_length);
675 	buffer = mr_rec->buffer;
676 	mr = mr_rec->mr;
677 	mr_rec->in_use = 1;
678     }
679     cb = svc->create_data_and_link_buffer(fcd->fabd->cm, buffer, req->length);
680     cb->return_callback = free_func;
681     cb->return_callback_data = (void*)mr_rec;
682     header_size = sizeof(*msg) + req->iovcnt * sizeof(struct remote_entry);
683     memcpy(cb->buffer, (char*)msg + header_size, req->piggyback_length);
684 
685     ptr = cb->buffer + req->piggyback_length;
686     count = req->iovcnt;
687     for (i=0; i < req->iovcnt; i++) {
688     	struct fi_cq_data_entry comp;
689 	struct pull_record this_pull;
690 	this_pull.status = PULL_SUBMITTED;
691 	int call_count = 0;
692 	svc->trace_out(fcd->fabd->cm, "fi_read, buffer %p, len %d, (keys.rkey %lx, keys.addr %lx)\n", ptr, req->read_list[i].length, req->read_list[i].rkey, req->read_list[i].remote_addr);
693 	ssize_t ret = fi_read(fcd->conn_ep, ptr, req->read_list[i].length, fi_mr_desc(mr),
694 			      0, req->read_list[i].remote_addr, req->read_list[i].rkey, (void*)&this_pull);
695 	ptr += req->read_list[i].length;
696 	do {
697 	    /* Read send queue */
698 	    ret = fi_cq_read(fcd->scq, &comp, 1);
699 	    if (ret < 0 && ret != -FI_EAGAIN) {
700 		FT_PRINTERR("fi_cq_read", ret);
701 		cq_readerr(fcd->scq, "scq");
702 	    }
703 	    call_count++;
704 	    if (ret == 1) handle_scq_completion(&comp);
705 	} while (this_pull.status != PULL_COMPLETED);
706 
707     	if (comp.flags & FI_READ) {
708     	    count--;
709 	}
710     }
711 
712 
713     while (count != 0) {
714     	struct fi_cq_data_entry comp;
715 	ret = fi_cq_read(fcd->scq, &comp, 1);
716 	if (ret == -FI_EAGAIN) {printf("Eagain\n"); continue;}
717 	if (ret < 0 && ret != -FI_EAGAIN) {
718 	    if (ret == -FI_EAVAIL) {
719 		cq_readerr(fcd->scq, "scq");
720 	    } else {
721 		FT_PRINTERR("fi_cq_read", ret);
722 	    }
723 	    continue;
724 	} else if (ret > 0) {
725 	    printf("Successful remote read, Completion size is %ld, data %p\n", comp.len, comp.buf);
726 	}
727 	if (ret == 1) handle_scq_completion(&comp);
728 
729     	if (comp.flags & FI_READ) {
730     	    count--;
731     	}
732     }
733 
734     fcd->read_buf = cb;
735     svc->trace_out(fcd->fabd->cm, "FIrst 16 bytes of receive buffer (len %d) are %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x \n", req->length, ((unsigned char*)cb->buffer)[0], ((unsigned char*)cb->buffer)[1], ((unsigned char*)cb->buffer)[2], ((unsigned char*)cb->buffer)[3], ((unsigned char*)cb->buffer)[4], ((unsigned char*)cb->buffer)[5], ((unsigned char*)cb->buffer)[6], ((unsigned char*)cb->buffer)[7], ((unsigned char*)cb->buffer)[8], ((unsigned char*)cb->buffer)[9], ((unsigned char*)cb->buffer)[10], ((unsigned char*)cb->buffer)[11], ((unsigned char*)cb->buffer)[12], ((unsigned char*)cb->buffer)[13], ((unsigned char*)cb->buffer)[14], ((unsigned char*)cb->buffer)[15]);
736     fcd->read_buffer_len = req->length;
737     svc->trace_out(fcd->fabd->cm, "CMFABRIC handle_request completed");
738     internal_write_response(svc, fcd, req->length, req->request_ID);
739     return 0;
740 }
741 
742 
743 static void
wake_pull_thread(fabric_client_data_ptr fabd)744 wake_pull_thread(fabric_client_data_ptr fabd)
745 {
746     static char buffer = 'W';  /* doesn't matter what we write */
747     if (!fabd->send_waitset) {
748 	if (write(fabd->wake_write_fd, &buffer, 1) != 1) {
749 	    if (fabd->thread_should_run)
750 		printf("Whoops, wake_pull_thread write failed\n");
751 	}
752     } else {
753 	/* send a cq wake */
754     }
755 }
756 
757 static void
hand_to_pull_thread(CMtrans_services svc,fabric_client_data_ptr fabd,msg_item_p msg)758 hand_to_pull_thread(CMtrans_services svc, fabric_client_data_ptr fabd,
759 		    msg_item_p msg)
760 {
761     struct msg_item *queue = fabd->pull_queue;
762     msg->next = NULL;
763     pthread_mutex_lock(&fabd->pull_queue_mutex);
764     if (fabd->pull_queue == NULL) {
765 	fabd->pull_queue = msg;
766     } else {
767 	while (queue->next) queue = queue->next;
768 	queue->next = msg;
769     }
770     pthread_mutex_unlock(&fabd->pull_queue_mutex);
771     /* wake the thread if he's sleeping */
772     wake_pull_thread(fabd);
773 }
774 
775 static void
return_completed_pull(CMtrans_services svc,fabric_client_data_ptr fabd,msg_item_p msg)776 return_completed_pull(CMtrans_services svc, fabric_client_data_ptr fabd,
777 		      msg_item_p msg)
778 {
779     pthread_mutex_lock(&fabd->pull_queue_mutex);
780     /* find msg in pull queue and take it out */
781     if (fabd->pull_queue == msg) {
782 	fabd->pull_queue = msg->next;
783 	msg->next = NULL;
784     } else {
785 	struct msg_item *queue = fabd->pull_queue;
786 	while (queue->next != msg) queue = queue->next;
787 	queue->next = msg->next;
788 	msg->next = NULL;
789     }
790 
791     /* find add msg to completion queue */
792     if (fabd->completed_queue == NULL) {
793 	fabd->completed_queue = msg;
794     } else {
795 	struct msg_item *queue = fabd->completed_queue;
796 	while (queue->next) queue = queue->next;
797 	queue->next = msg;
798     }
799     svc->trace_out(fabd->cm, "Returning completed msg to the CM network thread\n");
800     pthread_mutex_unlock(&fabd->pull_queue_mutex);
801     /*
802      *  this should wake the CM server thread and we'll check the
803      *  pull completion queue in our handler
804      */
805     fabd->svc->wake_comm_thread(fabd->cm);
806 
807 }
808 
809 static void
kill_thread(fabric_client_data_ptr fabd)810 kill_thread(fabric_client_data_ptr fabd)
811 {
812     fabd->thread_should_run = 0;
813     if (fabd->thread_init) wake_pull_thread(fabd);
814     fabd->thread_init = 0;
815 }
816 
817 static void
update_period_base(fabric_client_data_ptr fabd,struct timeval * now)818 update_period_base(fabric_client_data_ptr fabd, struct timeval *now)
819 {
820 
821     struct timeval next;
822     /* add period to base until it's greater than now.
823      * Return base before the last add so that base is just less than now.
824      */
825     timeradd(&fabd->pull_schedule_base, &fabd->pull_schedule_period,
826 	     &next);
827     while(timercmp(&next, now, <)) {
828 	fabd->pull_schedule_base = next;
829 	timeradd(&fabd->pull_schedule_base, &fabd->pull_schedule_period,
830 		 &next);
831     }
832 }
833 
834 static int
in_pull_period(fabric_client_data_ptr fabd,struct timeval * now)835 in_pull_period(fabric_client_data_ptr fabd, struct timeval *now)
836 {
837     int i = 0;
838     int period_count = 0;
839     struct timeval cur_offset, zero = {0,0};
840     timersub(now, &fabd->pull_schedule_base, &cur_offset);
841     update_period_base(fabd, now);
842     while (timercmp(&fabd->avail[period_count].duration, &zero, !=))  {
843 	period_count++;
844     }
845     for (i = 0; i < period_count; i++) {
846 	struct timeval end;
847 	timeradd(&fabd->avail[i].offset, &fabd->avail[i].duration, &end);
848 	if (timercmp(&cur_offset, &end, <) &&
849 	    timercmp(&cur_offset, &fabd->avail[i].offset, >)) return 1;
850     }
851     return 0;
852 }
853 
854 static struct timeval
delay_until_wake(fabric_client_data_ptr fabd,struct timeval * now)855 delay_until_wake(fabric_client_data_ptr fabd, struct timeval *now)
856 {
857     /* calculate the delay until the next pull period opens */
858     int i = 0;
859     int period_count = 0;
860     struct timeval cur_offset, zero = {0,0};
861     timersub(now, &fabd->pull_schedule_base, &cur_offset);
862     update_period_base(fabd, now);
863     while (timercmp(&fabd->avail[period_count].duration, &zero, !=))  {
864 	period_count++;
865     }
866     /* the one we want is the next start time past now */
867     for (i = 0; i < period_count; i++) {
868 	if (timercmp(&fabd->avail[i].offset, &cur_offset, >)) {
869 	    struct timeval ret;
870 	    timersub(&fabd->avail[i].offset, &cur_offset, &ret);
871 	    return ret;
872 	}
873     }
874     /* we must have been past the last offset,
875        return the first offset of the next period */
876     struct timeval ret;
877     timeradd(&fabd->pull_schedule_period, &fabd->avail[0].offset, &ret);
878     timersub(&ret, &cur_offset, &ret);
879     return ret;
880 }
881 
882 static void
add_completion_fd(fabric_client_data_ptr fabd,fabric_conn_data_ptr fcd)883 add_completion_fd(fabric_client_data_ptr fabd, fabric_conn_data_ptr fcd)
884 {
885     if (fcd->sfd > fabd->nfds) {
886 	fabd->fcd_array_by_sfd = realloc(fabd->fcd_array_by_sfd,
887 					 sizeof(fabd->fcd_array_by_sfd[0]) * fcd->sfd+1);
888 	fabd->nfds = fcd->sfd;
889     }
890     fabd->fcd_array_by_sfd[fcd->sfd] = fcd;
891     fabd->cq_array = realloc(fabd->cq_array,
892 			     sizeof(fabd->cq_array[0]) * (fabd->ncqs+1));
893     fabd->cq_array[fabd->ncqs] = &fcd->scq->fid;
894     fabd->ncqs++;
895     FD_SET(fcd->sfd, &fabd->readset);
896 }
897 
898 
899 static void
remove_completion_fd(fabric_client_data_ptr fabd,fabric_conn_data_ptr fcd)900 remove_completion_fd(fabric_client_data_ptr fabd, fabric_conn_data_ptr fcd)
901 {
902     int i;
903     for (i = 0; i < fabd->ncqs; i++) {
904 	if (fabd->cq_array[i] == &fcd->scq->fid) {
905 	    memcpy(&fabd->cq_array[i+1], &fabd->cq_array[i],
906 		   sizeof(fabd->cq_array[0]) * (fabd->ncqs - i - 1));
907 	}
908     }
909     fabd->ncqs--;
910     FD_CLR(fcd->sfd, &fabd->readset);
911 }
912 
913 #include <poll.h>
914 
915 static int
can_do_something(fabric_client_data_ptr fabd)916 can_do_something(fabric_client_data_ptr fabd)
917 {
918     CMtrans_services svc = fabd->svc;
919     int did_something = 0;
920     int i;
921     msg_item_p pull = fabd->pull_queue;
922     while (pull) {
923 	msg_item_p next = pull->next;
924 	fabric_conn_data_ptr fcd = pull->conn_data;
925 	svc->trace_out(fabd->cm, "Got a pull message with buffer count %d, next %p\n",
926 		       pull->pull_count, pull->next);
927 	if (!pull->mr_rec) {
928 	    int ret;
929 	    /* try to get a mr */
930 	    mr_list mr_rec = get_free_mr(fcd->fabd, pull->message_size);
931 	    char *buffer = NULL;
932 	    if (mr_rec == NULL) {
933 		if (0 /*!allow_register_new_mr(fcd, pull->message_size)*/) {
934 		    pull = pull->next;
935 		    continue;
936 		}
937 		mr_rec = calloc(1, sizeof(*mr_rec));
938 		buffer = malloc(pull->message_size);
939 		mr_rec->buffer_length = pull->message_size;
940 		mr_rec->buffer = buffer;
941 		mr_rec->in_use = 1;
942 		mr_rec->next = NULL;
943 		svc->trace_out(fcd->fabd->cm, "fi_mr_reg, buff %p, size %ld with attrs REMOTE_READ,REMOTE_WRITE, SEND, RECV\n", buffer, pull->message_size);
944 		ret = fi_mr_reg(fcd->fabd->dom, buffer, pull->message_size,
945 				FI_REMOTE_READ | FI_REMOTE_WRITE | FI_SEND | FI_RECV,
946 				0, 0, 0, &mr_rec->mr, NULL);
947 		if(ret) {
948 		    FT_PRINTERR("fi_mr_reg", ret);
949 		    svc->trace_out(fcd->fabd->cm, "Failed to get memory\n");
950 		    internal_write_response(svc, fcd, -1, pull->request_ID);
951 		    return ret;
952 		}
953 		pull->mr = mr_rec->mr;
954 		pull->mr_rec = mr_rec;
955 		add_mr_to_list(fcd->fabd, mr_rec);
956 	    } else {
957 		svc->trace_out(fcd->fabd->cm, "Reusing exiting MR buff %p, size %ld\n", mr_rec->buffer, mr_rec->buffer_length);
958 		mr_rec->in_use = 1;
959 		pull->mr_rec = mr_rec;
960 		buffer = mr_rec->buffer;
961 		pull->mr = mr_rec->mr;
962 	    }
963 	    /* piggyback part of msg is in old pull->buffer */
964 	    memcpy(buffer, pull->buffer, pull->cur_buffer_size);
965 	    free(pull->buffer);
966 	    pull->buffer = buffer;
967 
968 	    /* setup local buffers, now that we have memory */
969 	    char *ptr = pull->buffer + pull->cur_buffer_size;
970 	    pull->cur_buffer_size = pull->message_size;
971 	    for ( i = 0; i < pull->pull_count; i++) {
972 		pull->pulls[i].dest = ptr;
973 		ptr += pull->pulls[i].remote.length;
974 	    }
975 	    pull->cur_buffer_size = mr_rec->buffer_length;
976 	    did_something = 1;
977 	}
978 	/* at this point we should have an MR and mr_rec */
979 	for ( i = 0; i < pull->pull_count; i++) {
980 	    if (pull->pulls[i].status == PULL_UNSUBMITTED) {
981 
982 //		struct fi_cq_data_entry comp;
983 		struct remote_entry *remote = &pull->pulls[i].remote;
984 		struct pull_record *this_pull  = &pull->pulls[i];
985 		if (0 /* can't do more pulls for some reason */) {
986 		    continue;
987 		}
988 
989 		svc->trace_out(fcd->fabd->cm, "fi_read, buffer %p, len %d, (keys.rkey %lx, keys.addr %lx)\n", this_pull->dest, remote->length, remote->rkey, remote->remote_addr);
990 //		struct pollfd poll_list[1];
991 //		poll_list[0].fd = fcd->sfd;
992 //		poll_list[0].events = POLLIN|POLLPRI;
993 //		int pret = poll(&poll_list[0], (unsigned long)1, 0);
994 //		printf("Before read poll list ret %d, revents = %x\n", pret, poll_list[0].revents);
995 		ssize_t ret = fi_read(fcd->conn_ep, this_pull->dest,
996 				      remote->length, fi_mr_desc(pull->mr),
997 				      0, remote->remote_addr, remote->rkey,
998 				      (void*)this_pull);
999 		if (ret) continue;
1000 		this_pull->status = PULL_SUBMITTED;
1001 		did_something = 1;
1002 
1003 		add_completion_fd(fabd, fcd);
1004 /* 		do { */
1005 /* 		    /\* Read send queue *\/ */
1006 /* 		    ret = fi_cq_read(fcd->scq, &comp, 1); */
1007 /* 		    if (ret < 0 && ret != -FI_EAGAIN) { */
1008 /* 			FT_PRINTERR("fi_cq_read", ret); */
1009 /* 			cq_readerr(fcd->scq, "scq"); */
1010 /* 		    } */
1011 /* //		    pret = poll(&poll_list[0], (unsigned long)1, 0); */
1012 /* //		    printf("AFTER read poll list ret %d, revents = %x\n", pret, poll_list[0].revents); */
1013 /* 		} while (ret == -FI_EAGAIN); */
1014 /* 		svc->trace_out(fcd->fabd->cm, "FI_READ Completion op_context is %p, flags %lx, len %ld, buf %p, data %lx\n", */
1015 /* 		       comp.op_context, comp.flags, comp.len, comp.buf, comp.data); */
1016 /* 		if (ret == 1) { */
1017 /* 		    printf("Immediate completion\n"); */
1018 /* 		    handle_scq_completion(&comp); */
1019 /* 		} */
1020 	    }
1021 	}
1022 	/* if we get to this point, all pulls must have been submitted. */
1023 	if (pull->mr_rec) {
1024 	    int all_done = 1;
1025 	    for ( i = 0; i < pull->pull_count; i++) {
1026 		if (pull->pulls[i].status == PULL_COMPLETED) {
1027 		    pull->pulls[i].status = PULL_ACCOUNTED;
1028 		}
1029 		if (pull->pulls[i].status != PULL_ACCOUNTED) {
1030 		    all_done = 0;
1031 		}
1032 	    }
1033 	    if (all_done) {
1034 		remove_completion_fd(fabd, fcd);
1035 		return_completed_pull(svc, fcd->fabd, pull);
1036 	    }
1037 	}
1038 	pull = next;
1039     }
1040     if (!did_something) svc->trace_out(fabd->cm, "PULL_THREAD Nothing to do\n");
1041     return did_something;
1042 }
1043 
1044 static void
handle_completions(fabric_client_data_ptr fabd,fd_set * readset)1045 handle_completions(fabric_client_data_ptr fabd, fd_set *readset)
1046 {
1047     /* everything left should be scq completions */
1048     int i;
1049     if (readset) {
1050 	for (i=0; i < fabd->nfds; i++) {
1051 	    if (FD_ISSET(i, readset)) {
1052 		int ret;
1053 		struct fi_cq_data_entry comp;
1054 		/* Read send queue */
1055 		fabric_conn_data_ptr fcd = fabd->fcd_array_by_sfd[i];
1056 		ret = fi_cq_read(fcd->scq, &comp, 1);
1057 		if (ret < 0 && ret != -FI_EAGAIN) {
1058 		    FT_PRINTERR("fi_cq_read", ret);
1059 		    cq_readerr(fcd->scq, "scq");
1060 		}
1061 		if (ret == 1) {
1062 		    handle_scq_completion(&comp);
1063 		}
1064 	    }
1065 	}
1066     } else {
1067 	for (i=0; i < fabd->ncqs; i++) {
1068 	    int ret;
1069 	    struct fi_cq_data_entry comp;
1070 	    /* Read send queue */
1071 	    ret = fi_cq_read((struct fid_cq *) fabd->cq_array[i], &comp, 1);
1072 	    if (ret < 0 && ret != -FI_EAGAIN) {
1073 		FT_PRINTERR("fi_cq_read", ret);
1074 		cq_readerr((struct fid_cq *)fabd->cq_array[i], "scq");
1075 	    }
1076 	    if (ret == 1) {
1077 		handle_scq_completion(&comp);
1078 	    }
1079 	}
1080     }
1081 }
1082 
1083 
1084 static void
pull_thread(fabric_client_data_ptr fabd)1085 pull_thread(fabric_client_data_ptr fabd)
1086 {
1087 //    thread_setup();  /* extract wait fd, other? */
1088 //    CMtrans_services svc = fabd->svc;
1089     while(fabd->thread_should_run) {
1090 	struct timeval now, delay;
1091 	fd_set readset;
1092 	gettimeofday(&now, NULL);
1093 	if (in_pull_period(fabd, &now)) {
1094 	    while (can_do_something(fabd));
1095         }
1096 	/* calculate time to next wake */
1097 	/* sleep until something happens, *or* the next pull period begins */
1098 	delay = delay_until_wake(fabd, &now);
1099 	readset = fabd->readset;
1100 	if (fabd->ncqs == 0) {
1101 	    /* we're just waiting on wake fd */
1102 	    select(fabd->nfds+1, &readset, NULL, NULL, &delay);
1103 	} else if (fi_trywait(fabd->fab, &fabd->cq_array[0], fabd->ncqs) == FI_SUCCESS) {
1104 	    select(fabd->nfds+1, &readset, NULL, NULL, &delay);
1105 	    if (FD_ISSET(fabd->wake_read_fd, &readset)) {
1106 		/* read and discard wake byte */
1107 		char buffer;
1108 		if (read(fabd->wake_read_fd, &buffer, 1) != 1) {
1109 		    perror("wake read failed\n");
1110 		}
1111 		FD_CLR(fabd->wake_read_fd, &readset);
1112 	    }
1113 	    handle_completions(fabd, &readset);
1114 	} else {
1115 	    /* test all CQs */
1116 	    handle_completions(fabd, NULL);
1117 	}
1118     }
1119 //    thread_free_resources;
1120 }
1121 
1122 /*
1123  * handle a pull request message
1124  */
handle_request(CMtrans_services svc,fabric_conn_data_ptr fcd,struct control_message * msg)1125 static void handle_request(CMtrans_services svc,
1126                            fabric_conn_data_ptr fcd,
1127                            struct control_message *msg)
1128 {
1129     //handling the request message
1130     if (fcd->fabd->avail) {
1131 	add_to_pull_queue(svc, fcd, msg);
1132 	wake_pull_thread(fcd->fabd);
1133     } else {
1134 	/* immediate pull and handle */
1135 	(void) perform_pull(svc, fcd, msg);
1136     }
1137 }
1138 
cq_readerr(struct fid_cq * cq,char * cq_str)1139 void cq_readerr(struct fid_cq *cq, char *cq_str)
1140 {
1141 	struct fi_cq_err_entry cq_err;
1142 	const char *err_str;
1143 	int ret;
1144 
1145 	ret = fi_cq_readerr(cq, &cq_err, 0);
1146 	if (ret < 0)
1147 		FT_PRINTERR("fi_cq_readerr", ret);
1148 
1149 	err_str = fi_cq_strerror(cq, cq_err.prov_errno, cq_err.err_data, NULL, 0);
1150 	fprintf(stderr, "%s %s (%d)\n", cq_str, err_str, cq_err.prov_errno);
1151 }
1152 
1153 void
CMFABRIC_data_available(transport_entry trans,CMConnection conn)1154 CMFABRIC_data_available(transport_entry trans, CMConnection conn)
1155 {
1156 	fabric_client_data_ptr fabd = (fabric_client_data_ptr) trans->trans_data;
1157 	CMtrans_services svc = fabd->svc;
1158 	struct control_message *msg;
1159 	fabric_conn_data_ptr fcd;
1160 	int ret, call_data_available;
1161 	CMbuffer CMbuffer_to_return = NULL;
1162 	struct fid **fids = malloc(sizeof(fids[0]));
1163 	fcd = (fabric_conn_data_ptr) svc->get_transport_data(conn);
1164 	fids[0] = &fcd->rcq->fid;
1165 	fabd->trans = trans;
1166 
1167 	svc->trace_out(fabd->cm, "At the beginning of CMFabric_data_available: ");
1168 	ret = fi_trywait(fcd->fabd->fab, fids, 1);
1169 	switch (ret) {
1170 	case FI_SUCCESS:
1171 	    svc->trace_out(fabd->cm, "Try wait on rcq returned FI_SUCCESS");
1172 	    break;
1173 	case -FI_EAGAIN:
1174 	    svc->trace_out(fabd->cm, "Try wait on rcq returned FI_EAGAIN, read cq events");
1175 	    break;
1176 	default:
1177 	    svc->trace_out(fabd->cm, "Try wait on rcq returned %d", ret);
1178 	}
1179 	{
1180 	    struct fi_cq_data_entry comp;
1181 		ret = fi_cq_read(fcd->rcq, &comp, 1);
1182 		if (ret == -FI_EAGAIN) {
1183 		    return;
1184 		}
1185 		if (ret < 0 && ret != -FI_EAGAIN) {
1186 			if (ret == -FI_EAVAIL) {
1187 				cq_readerr(fcd->rcq, "rcq");
1188 			} else {
1189 				FT_PRINTERR("fi_cq_read", ret);
1190 				return;
1191 			}
1192 			return;
1193 		} else if (ret > 0) {
1194 		    //
1195 		}
1196 		svc->trace_out(fabd->cm, "FI_RECV Completion op_context is %p, flags %lx, len %ld, buf %p, data %lx\n",
1197 		       comp.op_context, comp.flags, comp.len, comp.buf, comp.data);
1198 	}
1199 
1200 	fcd = (fabric_conn_data_ptr) svc->get_transport_data(conn);
1201 	msg = (struct control_message *) fcd->mapped_recv_buf;
1202 	svc->trace_out(fcd->fabd->cm, "CMFABRIC data available type = %s(%d)",
1203 		       msg_string[msg->type], msg->type);
1204 
1205 	call_data_available = 0;
1206 	CMbuffer_to_return = NULL;
1207 	switch(msg->type) {
1208 	case msg_piggyback: {
1209 	    	int offset = msg_offset();
1210 		fcd->read_buffer_len = msg->u.pb.total_length - offset;
1211 		svc->trace_out(fcd->fabd->cm, "CMFABRIC received piggyback msg of length %d, added to read_buffer",
1212 			       fcd->read_buffer_len);
1213 
1214 		fcd->read_buf = fcd->fabd->svc->get_data_buffer(trans->cm, fcd->read_buffer_len);
1215 		memcpy(fcd->read_buf->buffer, &msg->u.pb.body[0], fcd->read_buffer_len);
1216 		fcd->read_offset = 0;
1217 
1218 		CMbuffer_to_return = fcd->read_buf;
1219 		call_data_available = 1;
1220 		break;
1221 	}
1222 	case msg_response:
1223 		handle_response(svc, fcd, msg);
1224 		break;
1225 	case msg_request:
1226 		handle_request(svc, fcd, msg);
1227 		call_data_available = 1;
1228 		CMbuffer_to_return = fcd->read_buf;
1229 		break;
1230 	default:
1231 		printf("Bad message type %d\n", msg->type);
1232 	}
1233 	/* post the next receive before relinquishing control */
1234 	ret = fi_recv(fcd->conn_ep, fcd->mapped_recv_buf, fcd->buffer_size, fi_mr_desc(fcd->read_mr), 0, fcd->mapped_recv_buf);
1235 	if (ret)
1236 		FT_PRINTERR("fi_recv", ret);
1237 
1238 	if (call_data_available) {
1239 	    trans->data_available(trans, conn);
1240 	}
1241 	if (CMbuffer_to_return) {
1242 	    svc->return_data_buffer(trans->cm, CMbuffer_to_return);
1243 	}
1244 	svc->trace_out(fcd->fabd->cm, "CMFABRIC data_available returning");
1245 }
1246 
1247 static int server_connect(fabric_conn_data_ptr fcd);
1248 
1249 /*
1250  * Accept socket connection
1251  */
1252 static void
fabric_accept_conn(void * void_trans,void * void_conn_sock)1253 fabric_accept_conn(void *void_trans, void *void_conn_sock)
1254 {
1255     transport_entry trans = (transport_entry) void_trans;
1256     fabric_client_data_ptr fabd = (fabric_client_data_ptr) trans->trans_data;
1257     CMtrans_services svc = fabd->svc;
1258     fabric_conn_data_ptr fcd;
1259     int fd, ret;
1260     struct sockaddr sock_addr;
1261     unsigned int sock_len = sizeof(sock_addr);
1262 
1263     CMConnection conn;
1264     attr_list conn_attr_list = NULL;
1265 
1266     //ib stuff
1267     fcd = create_fabric_conn_data(svc);
1268     fcd->fabd = fabd;
1269 
1270     server_connect(fcd);
1271     //initialize the dataqp that will be used for all RC comms
1272 
1273     conn_attr_list = create_attr_list();
1274     conn = svc->connection_create(trans, fcd, conn_attr_list);
1275     fcd->conn = conn;
1276 
1277     sock_len = sizeof(sock_addr);
1278     memset(&sock_addr, 0, sock_len);
1279 //    getsockname(sock, (struct sockaddr *) &sock_addr, &sock_len);
1280 //    int_port_num = ntohs(((struct sockaddr_in *) &sock_addr)->sin_port);
1281 //    add_attr(conn_attr_list, CM_THIS_CONN_PORT, Attr_Int4,
1282 //	     (attr_value) (long)int_port_num);
1283 
1284     memset(&sock_addr, 0, sizeof(sock_addr));
1285     sock_len = sizeof(sock_addr);
1286     /* if (getpeername(sock, &sock_addr, &sock_len) == 0) { */
1287     /* 	int_port_num = ntohs(((struct sockaddr_in *) &sock_addr)->sin_port); */
1288     /* 	add_attr(conn_attr_list, CM_PEER_CONN_PORT, Attr_Int4, */
1289     /* 		 (attr_value) (long)int_port_num); */
1290     /* 	fcd->remote_IP = ntohl(((struct sockaddr_in *) &sock_addr)->sin_addr.s_addr); */
1291     /* 	add_attr(conn_attr_list, CM_PEER_IP, Attr_Int4, */
1292     /* 		 (attr_value) (long)fcd->remote_IP); */
1293     /* 	if (sock_addr.sa_family == AF_INET) { */
1294     /* 	    struct hostent *host; */
1295     /* 	    struct sockaddr_in *in_sock = (struct sockaddr_in *) &sock_addr; */
1296     /* 	    host = gethostbyaddr((char *) &in_sock->sin_addr, */
1297     /* 				 sizeof(struct in_addr), AF_INET); */
1298     /* 	    if (host != NULL) { */
1299     /* 		fcd->remote_host = strdup(host->h_name); */
1300     /* 		add_attr(conn_attr_list, CM_PEER_HOSTNAME, Attr_String, */
1301     /* 			 (attr_value) strdup(host->h_name)); */
1302     /* 	    } */
1303     /* 	} */
1304     /* } */
1305     if (fcd->remote_host != NULL) {
1306 	svc->trace_out(fabd->cm, "Accepted CMFABRIC socket connection from host \"%s\"",
1307 		       fcd->remote_host);
1308     } else {
1309 	svc->trace_out(fabd->cm, "Accepted CMFABRIC socket connection from UNKNOWN host");
1310     }
1311 
1312 
1313     if ((ret = fi_control (&fcd->rcq->fid, FI_GETWAIT, (void *) &fd))) {
1314 	FT_PRINTERR("fi_control(FI_GETWAIT)", ret);
1315     }
1316     add_attr(conn_attr_list, CM_FD, Attr_Int4,
1317 	     (attr_value) (long)fd);
1318 
1319     svc->trace_out(fabd->cm, "Cmfabric Adding trans->data_available as action on fd %d", fd);
1320     svc->fd_add_select(fabd->cm, fd, (select_list_func) CMFABRIC_data_available,
1321 		       (void *) trans, (void *) conn);
1322 
1323     svc->trace_out(fabd->cm, "Falling out of accept conn\n");
1324     free_attr_list(conn_attr_list);
1325     fcd->fd = fd;
1326     if ((ret = fi_control (&fcd->scq->fid, FI_GETWAIT, (void *) &fcd->sfd))) {
1327 	FT_PRINTERR("fi_control(FI_GETWAIT)", ret);
1328     }
1329 }
1330 
1331 /*
1332  * incoming event on CM eq
1333  */
1334 static void
fabric_service_incoming(void * void_trans,void * void_eq)1335 fabric_service_incoming(void *void_trans, void *void_eq)
1336 {
1337     transport_entry trans = (transport_entry) void_trans;
1338     fabric_client_data_ptr fabd = (fabric_client_data_ptr) trans->trans_data;
1339     struct fi_eq_cm_entry entry;
1340     uint32_t event;
1341     ssize_t rd;
1342 
1343     rd = fi_eq_sread(fabd->cmeq, &event, &entry, sizeof entry, -1, FI_PEEK);
1344     if (rd != sizeof entry) {
1345 	if (rd == -FI_EAVAIL) {
1346 	    struct fi_eq_err_entry error = {0};
1347 	    int rc = fi_eq_readerr(fabd->cmeq, &error, 0);
1348 	    if (rc) {
1349 		char buf[1024];
1350 		fprintf(stderr, "error event: %s\n", fi_eq_strerror(fabd->cmeq, error.prov_errno,
1351       error.err_data, buf, 1024));
1352 	    }
1353 	} else {
1354 	    FT_PRINTERR("fi_eq_sread", rd);
1355 	}
1356 	return;
1357     }
1358 
1359     if (event == FI_CONNREQ) {
1360 	fabric_accept_conn(void_trans, void_eq);
1361     } else {
1362 	rd = fi_eq_sread(fabd->cmeq, &event, &entry, sizeof entry, -1, 0);
1363 	if (event == FI_SHUTDOWN){
1364 	    fabd->svc->trace_out(fabd->cm, "CMFABRIC got a shutdown event for some conn, who knows which one?\n");
1365 	} else {
1366 	    printf("Unexpected event in service incoming,%s %d\n", fi_tostr(&event, FI_TYPE_EQ_EVENT), event);
1367 	}
1368     }
1369 }
1370 
1371 extern void
1372 libcmfabric_LTX_shutdown_conn(svc, fcd)
1373 	CMtrans_services svc;
1374 fabric_conn_data_ptr fcd;
1375 {
1376 	svc->trace_out(fcd->fabd->cm, "CMFABRIC shutdown_conn, removing select %d\n",
1377 	               fcd->fd);
1378 	svc->fd_remove_select(fcd->fabd->cm, fcd->fd);
1379 	close(fcd->fd);
1380 	//free(fcd->remote_host);
1381 	//free(fcd->read_buffer);
1382 	if (fcd->last_write.iov) free(fcd->last_write.iov);
1383 	free(fcd);
1384 }
1385 
1386 
/*
 * Establish the client side of a fabric connection for `fcd`.
 *
 * The destination is taken from `attrs`: CM_IP_ADDR (rendered as a
 * dotted-quad string) and CM_IP_PORT give the node/service passed to
 * fi_getinfo(); if CM_IP_HOSTNAME is present it is treated as a string of
 * hex digit pairs, decoded to bytes, and used as the node instead.
 *
 * On success the fabric, domain, CM event queue and endpoint resources are
 * open, the endpoint is connected, and 0 is returned.  On failure
 * everything opened here is closed again and a non-zero error code is
 * returned.
 */
static int client_connect(CManager cm, CMtrans_services svc, transport_entry trans, attr_list attrs, fabric_conn_data_ptr fcd)
{
    fabric_client_data_ptr fabd = fcd->fabd;
    struct fi_eq_cm_entry entry;
    uint32_t event;
    struct fi_info *fi = NULL;
    ssize_t rd;
    int ret, int_port_num;
    struct in_addr dest_ip;
    char *host_name = NULL;
    char ip_text[INET_ADDRSTRLEN];	/* dotted quad + NUL */
    char port_text[16];			/* any int in decimal + NUL */
    char *decoded_name = NULL;		/* heap copy of the hex-decoded host name */
    const char *host_rep = NULL;
    const char *dst_port = NULL;

    /* Get fabric info */
    if (!get_int_attr(attrs, CM_IP_ADDR,(int*) & dest_ip.s_addr)) {
	svc->trace_out(cm, "CMFABRIC transport found no IP_ADDR attribute");
    } else {
	dest_ip.s_addr = htonl(dest_ip.s_addr);
	snprintf(ip_text, sizeof ip_text, "%s", inet_ntoa(dest_ip));
	host_rep = ip_text;
    }
    if (!get_int_attr(attrs, CM_IP_PORT, (int*) & int_port_num)) {
	svc->trace_out(cm, "CMFABRIC transport found no IP_PORT attribute");
    } else {
	snprintf(port_text, sizeof port_text, "%d", int_port_num);
	dst_port = port_text;
    }
    /* never hand a NULL pointer to %s */
    svc->trace_out(fabd->cm, "Connecting to addr, %s, port %s\n",
		   host_rep ? host_rep : "(null)", dst_port ? dst_port : "(null)");
    if (!get_string_attr(attrs, CM_IP_HOSTNAME, &host_name)) {
	svc->trace_out(cm, "CMFABRIC transport found no IP_HOSTNAME attribute");
    } else {
	size_t name_len = strlen(host_name);
	size_t nbytes = name_len / 2;
	size_t b;

	/* zero-filled and one byte longer, so the result is always terminated */
	decoded_name = calloc(1, name_len + 1);
	if (decoded_name == NULL) {
	    return -FI_ENOMEM;
	}
	for (b = 0; b < nbytes; b++) {
	    unsigned char byte = 0;
	    if (sscanf(&host_name[b * 2], "%2hhx", &byte) != 1) {
		byte = 0;
	    }
	    decoded_name[b] = (char) byte;
	}
	host_rep = decoded_name;
    }
    ret = fi_getinfo(FT_FIVERSION, host_rep, dst_port, 0, fabd->hints, &fi);
    /* node/service are inputs only; the decoded copy is no longer needed */
    free(decoded_name);
    if (ret) {
	FT_PRINTERR("fi_getinfo", ret);
	goto err0;
    }
    svc->trace_out(cm, "%s return value fi is %s\n", "client", fi_tostr(fi, FI_TYPE_INFO));

    /* Open fabric */
    ret = fi_fabric(fi->fabric_attr, &fabd->fab, NULL);
    if (ret) {
	FT_PRINTERR("fi_fabric", ret);
	goto err1;
    }

    /* Open domain */
    ret = fi_domain(fabd->fab, fi, &fabd->dom, NULL);
    if (ret) {
	FT_PRINTERR("fi_domain", ret);
	goto err2;
    }

    ret = alloc_cm_res(fabd);
    if (ret)
	goto err4;

    ret = alloc_ep_res(fcd, fi);
    if (ret)
	goto err5;

    ret = bind_ep_res(fcd);
    if (ret)
	goto err6;

    /* Connect to server */
    ret = fi_connect(fcd->conn_ep, fi->dest_addr, NULL, 0);
    if (ret) {
	FT_PRINTERR("fi_connect", ret);
	goto err6;
    }

    /* Wait for the connection to be established */
    rd = fi_eq_sread(fabd->cmeq, &event, &entry, sizeof entry, -1, 0);
    if (rd != (ssize_t) sizeof entry) {
	if (rd == -FI_EAVAIL) {
	    struct fi_eq_err_entry error = {0};
	    int rc = fi_eq_readerr(fabd->cmeq, &error, 0);
	    if (rc < 0) {
		FT_PRINTERR("fi_eq_readerr", rc);
	    } else if (rc > 0) {
		char buf[1024];
		fprintf(stderr, "error event: %s\n", fi_eq_strerror(fabd->cmeq, error.prov_errno,
      error.err_data, buf, 1024));
	    }
	} else {
	    FT_PRINTERR("fi_eq_sread", rd);
	}
	/* ret still holds fi_connect's 0 here; report the failure instead */
	ret = (rd < 0) ? (int) rd : -FI_EOTHER;
	goto err6;
    }

    if (event != FI_CONNECTED || entry.fid != &fcd->conn_ep->fid) {
	FT_ERR("Unexpected CM event %d fid %p (ep %p)\n", event, entry.fid, fcd->conn_ep);
	ret = -FI_EOTHER;
	goto err6;
    }

    fi_freeinfo(fi);
    return 0;

err6:
    free_ep_res(fcd);
err5:
    fi_close(&fabd->cmeq->fid);
err4:
    fi_close(&fabd->dom->fid);
err2:
    fi_close(&fabd->fab->fid);
err1:
    fi_freeinfo(fi);
err0:
    return ret;
}
1508 
1509 static int
initiate_conn(cm,svc,trans,attrs,fcd,conn_attr_list,no_more_redirect)1510 initiate_conn(cm, svc, trans, attrs, fcd, conn_attr_list, no_more_redirect)
1511 	CManager cm;
1512 CMtrans_services svc;
1513 transport_entry trans;
1514 attr_list attrs;
1515 fabric_conn_data_ptr fcd;
1516 attr_list conn_attr_list;
1517 int no_more_redirect;
1518 {
1519 	int int_port_num;
1520 	fabric_client_data_ptr fabd = (fabric_client_data_ptr) trans->trans_data;
1521 	char *host_name;
1522 	int remote_IP = -1;
1523 	static int host_ip = 0;
1524 
1525 	//fabric stuff
1526 
1527 	if (!query_attr(attrs, CM_IP_HOSTNAME, /* type pointer */ NULL,
1528 	                /* value pointer */ (attr_value *)(long) & host_name)) {
1529 		svc->trace_out(cm, "CMFABRIC transport found no IP_HOST attribute");
1530 		host_name = NULL;
1531 	} else {
1532 		svc->trace_out(cm, "CMFABRIC transport connect to host %s", host_name);
1533 	}
1534 	if (!query_attr(attrs, CM_IP_ADDR, /* type pointer */ NULL,
1535 	                /* value pointer */ (attr_value *)(long) & host_ip)) {
1536 		svc->trace_out(cm, "CMFABRIC transport found no IP_ADDR attribute");
1537 		/* wasn't there */
1538 		host_ip = 0;
1539 	} else {
1540 		svc->trace_out(cm, "CMFABRIC transport connect to host_IP %lx", host_ip);
1541 	}
1542 
1543 	if (!query_attr(attrs, CM_IP_PORT, /* type pointer */ NULL,
1544 	                /* value pointer */ (attr_value *)(long) & int_port_num)) {
1545 		svc->trace_out(cm, "CMFABRIC transport found no IP_PORT attribute");
1546 //		return -1;
1547 	} else {
1548 		svc->trace_out(cm, "CMFABRIC transport connect to port %d", int_port_num);
1549 	}
1550 
1551 	client_connect(cm, svc, trans, attrs, fcd);
1552 
1553 
1554 //here we write out the connection port to the other side.
1555 //for sockets thats all thats required. For IB we can use this to exchange information about the
1556 //IB parameters for the other side
1557 
1558 	svc->trace_out(cm, "--> Connection established");
1559 	fcd->remote_host = host_name == NULL ? NULL : strdup(host_name);
1560 	fcd->remote_IP = remote_IP;
1561 	fcd->remote_contact_port = int_port_num;
1562 	fcd->fd = 0;
1563 	fcd->fabd = fabd;
1564 
1565 	memset(&fcd->last_write, 0, sizeof(rinfo));
1566 	fcd->infocount = 0;
1567 
1568 
1569 
1570 	add_attr(conn_attr_list, CM_THIS_CONN_PORT, Attr_Int4,
1571 	         (attr_value) (long)int_port_num);
1572 	add_attr(conn_attr_list, CM_PEER_IP, Attr_Int4,
1573 	         (attr_value) (long)fcd->remote_IP);
1574 /*	if (getpeername(sock, &sock_addr.s, &sock_len) == 0) {
1575 		int_port_num = ntohs(((struct sockaddr_in *) &sock_addr)->sin_port);
1576 		add_attr(conn_attr_list, CM_PEER_CONN_PORT, Attr_Int4,
1577 		         (attr_value) (long)int_port_num);
1578 		if (sock_addr.s.sa_family == AF_INET) {
1579 			struct hostent *host;
1580 			struct sockaddr_in *in_sock = (struct sockaddr_in *) &sock_addr;
1581 			host = gethostbyaddr((char *) &in_sock->sin_addr,
1582 			                     sizeof(struct in_addr), AF_INET);
1583 			if (host != NULL) {
1584 				fcd->remote_host = strdup(host->h_name);
1585 				add_attr(conn_attr_list, CM_PEER_HOSTNAME, Attr_String,
1586 				         (attr_value) strdup(host->h_name));
1587 			}
1588 		}
1589 	}
1590 */
1591 	svc->trace_out(fabd->cm, "Falling out of init conn\n");
1592 	return 1;
1593 }
1594 
1595 /*
1596  * Initiate a socket connection with another data exchange.  If port_num is -1,
1597  * establish a unix socket connection (name_str stores the file name of
1598  * the waiting socket).  Otherwise, establish an INET socket connection
1599  * (name_str stores the machine name).
1600  */
1601 extern CMConnection
1602 libcmfabric_LTX_initiate_conn(cm, svc, trans, attrs)
1603 	CManager cm;
1604 CMtrans_services svc;
1605 transport_entry trans;
1606 attr_list attrs;
1607 {
1608     fabric_conn_data_ptr fcd = create_fabric_conn_data(svc);
1609     attr_list conn_attr_list = create_attr_list();
1610     CMConnection conn;
1611     int fd, ret;
1612 
1613     fcd->fabd = trans->trans_data;
1614 
1615     if (initiate_conn(cm, svc, trans, attrs, fcd, conn_attr_list, 0) < 0)
1616 	return NULL;
1617 
1618     add_attr(conn_attr_list, CM_PEER_LISTEN_PORT, Attr_Int4,
1619 	     (attr_value) (long)fcd->remote_contact_port);
1620     conn = svc->connection_create(trans, fcd, conn_attr_list);
1621     fcd->conn = conn;
1622 
1623     if ((ret = fi_control (&fcd->rcq->fid, FI_GETWAIT, (void *) &fd))) {
1624 	FT_PRINTERR("fi_control(FI_GETWAIT)", ret);
1625     }
1626     svc->trace_out(cm, "Cmfabric Adding trans->data_available as action on fd %d", fd);
1627     svc->fd_add_select(cm, fd, (select_list_func) CMFABRIC_data_available,
1628 		       (void *) trans, (void *) conn);
1629 
1630     fcd->fd = fd;
1631     if ((ret = fi_control (&fcd->scq->fid, FI_GETWAIT, (void *) &fcd->sfd))) {
1632 	FT_PRINTERR("fi_control(FI_GETWAIT)", ret);
1633     }
1634     return conn;
1635 }
1636 
1637 /*
1638  * Check to see that if we were to attempt to initiate a connection as
1639  * indicated by the attribute list, would we be connecting to ourselves?
1640  * For sockets, this involves checking to see if the host name is the
1641  * same as ours and if the IP_PORT matches the one we are listening on.
1642  */
1643 extern int
libcmfabric_LTX_self_check(CManager cm,CMtrans_services svc,transport_entry trans,attr_list attrs)1644 libcmfabric_LTX_self_check(CManager cm, CMtrans_services svc, transport_entry trans, attr_list attrs)
1645 {
1646 
1647     fabric_client_data_ptr fd = trans->trans_data;
1648     int host_addr;
1649     int int_port_num;
1650     char *host_name;
1651     char my_host_name[256];
1652     static int IP = 0;
1653 
1654     get_IP_config(my_host_name, sizeof(host_name), &IP, NULL, NULL, NULL,
1655 		  NULL, svc->trace_out, (void *)cm);
1656 
1657     if (IP == 0) {
1658 	if (IP == 0) IP = INADDR_LOOPBACK;
1659     }
1660     if (!query_attr(attrs, CM_IP_HOSTNAME, /* type pointer */ NULL,
1661 		    /* value pointer */ (attr_value *)(long) & host_name)) {
1662 	svc->trace_out(cm, "CMself check CMFABRIC transport found no IP_HOST attribute");
1663 	host_name = NULL;
1664     }
1665     if (!query_attr(attrs, CM_IP_ADDR, /* type pointer */ NULL,
1666 		    /* value pointer */ (attr_value *)(long) & host_addr)) {
1667 	svc->trace_out(cm, "CMself check CMFABRIC transport found no IP_ADDR attribute");
1668 	if (host_name == NULL) return 0;
1669 	host_addr = 0;
1670     }
1671     if (!query_attr(attrs, CM_IP_PORT, /* type pointer */ NULL,
1672 		    /* value pointer */ (attr_value *)(long) & int_port_num)) {
1673 	svc->trace_out(cm, "CMself check CMFABRIC transport found no IP_PORT attribute");
1674 	return 0;
1675     }
1676     if (host_name && (strcmp(host_name, my_host_name) != 0)) {
1677 	svc->trace_out(cm, "CMself check - Hostnames don't match");
1678 	return 0;
1679     }
1680     if (host_addr && (IP != host_addr)) {
1681 	svc->trace_out(cm, "CMself check - Host IP addrs don't match, %lx, %lx", IP, host_addr);
1682 	return 0;
1683     }
1684     if (int_port_num != fd->listen_port) {
1685 	svc->trace_out(cm, "CMself check - Ports don't match, %d, %d", int_port_num, fd->listen_port);
1686 	return 0;
1687     }
1688     svc->trace_out(cm, "CMself check returning TRUE");
1689     return 1;
1690 }
1691 
1692 extern int
1693 libcmfabric_LTX_connection_eq(cm, svc, trans, attrs, fcd)
1694 	CManager cm;
1695 CMtrans_services svc;
1696 transport_entry trans;
1697 attr_list attrs;
1698 fabric_conn_data_ptr fcd;
1699 {
1700 
1701 	int int_port_num;
1702 	int requested_IP = -1;
1703 	char *host_name = NULL;
1704 
1705 	if (!query_attr(attrs, CM_IP_HOSTNAME, /* type pointer */ NULL,
1706 	                /* value pointer */ (attr_value *)(long) & host_name)) {
1707 		svc->trace_out(cm, "CMFABRIC transport found no IP_HOST attribute");
1708 	}
1709 	if (!query_attr(attrs, CM_IP_PORT, /* type pointer */ NULL,
1710 	                /* value pointer */ (attr_value *)(long) & int_port_num)) {
1711 		svc->trace_out(cm, "Conn Eq CMFABRIC transport found no IP_PORT attribute");
1712 		return 0;
1713 	}
1714 	if (!query_attr(attrs, CM_IP_ADDR, /* type pointer */ NULL,
1715 	                /* value pointer */ (attr_value *)(long) & requested_IP)) {
1716 		svc->trace_out(cm, "CMFABRIC transport found no IP_ADDR attribute");
1717 	}
1718 	if (requested_IP == -1) {
1719 		check_host(host_name, (void *) &requested_IP);
1720 		requested_IP = ntohl(requested_IP);
1721 		svc->trace_out(cm, "IP translation for hostname %s is %x", host_name,
1722 		               requested_IP);
1723 	}
1724 
1725 	svc->trace_out(cm, "Socket Conn_eq comparing IP/ports %x/%d and %x/%d",
1726 	               fcd->remote_IP, fcd->remote_contact_port,
1727 	               requested_IP, int_port_num);
1728 	if ((fcd->remote_IP == requested_IP) &&
1729 	    (fcd->remote_contact_port == int_port_num)) {
1730 		svc->trace_out(cm, "Socket Conn_eq returning TRUE");
1731 		return 1;
1732 	}
1733 	svc->trace_out(cm, "Socket Conn_eq returning FALSE");
1734 	return 0;
1735 }
1736 
1737 
/* Close the transport's connection-management event queue. */
static void free_lres(fabric_client_data_ptr fd)
{
	struct fid *cm_eq_fid = &fd->cmeq->fid;

	fi_close(cm_eq_fid);
}
1742 
/*
 * Open the connection-management event queue on fd->fab, with a file
 * descriptor as its wait object, and store it in fd->cmeq.
 *
 * Returns the fi_eq_open() result (0 on success); failures are also
 * reported through FT_PRINTERR.
 */
static int alloc_cm_res(fabric_client_data_ptr fd)
{
	/* all other attributes stay zero */
	struct fi_eq_attr eq_attr = { .wait_obj = FI_WAIT_FD };
	int rc = fi_eq_open(fd->fab, &eq_attr, &fd->cmeq, NULL);

	if (rc != 0) {
		FT_PRINTERR("fi_eq_open", rc);
	}
	return rc;
}
1756 
/*
 * Close the per-connection fabric objects: the endpoint first, then the
 * send and receive memory regions, then the receive and send CQs.
 */
static void free_ep_res(fabric_conn_data_ptr fcd)
{
	/* listed in the order they must be closed */
	struct fid *closing_order[] = {
		&fcd->conn_ep->fid,
		&fcd->send_mr->fid,
		&fcd->read_mr->fid,
		&fcd->rcq->fid,
		&fcd->scq->fid,
	};
	size_t k;

	for (k = 0; k < sizeof closing_order / sizeof closing_order[0]; k++) {
		fi_close(closing_order[k]);
	}
}
1765 
/*
 * Allocate the per-connection fabric resources for `fcd`:
 *   - a CM data buffer (fcd->read_buf),
 *   - the send CQ (bound to fabd->send_waitset when one exists, otherwise
 *     waited on through its own fd) and the receive CQ (always fd-based),
 *   - the send and receive message buffers and their memory registrations,
 *   - the endpoint created from `fi`,
 *   - the CM event queue, if the transport does not have one yet.
 *
 * Returns 0 on success.  On failure everything acquired so far is released
 * and a non-zero error code is returned.
 */
static int alloc_ep_res(fabric_conn_data_ptr fcd, struct fi_info *fi)
{
	fabric_client_data_ptr fabd = fcd->fabd;
	struct fi_cq_attr scq_attr;
	struct fi_cq_attr rcq_attr;
	uint64_t access_mode;
	size_t buf_len;
	int ret;

	fcd->buffer_size = PIGGYBACK;
	buf_len = MAX(fcd->buffer_size, sizeof(uint64_t));

	fcd->read_buf = fabd->svc->get_data_buffer(fabd->cm, buf_len);
	if (!fcd->read_buf) {
		perror("malloc");
		return -1;
	}
	fcd->max_credits = 512;

	/* send CQ */
	memset(&scq_attr, 0, sizeof scq_attr);
	scq_attr.format = FI_CQ_FORMAT_DATA;
	scq_attr.size = fcd->max_credits << 1;
	if (fabd->send_waitset) {
	    scq_attr.wait_obj = FI_WAIT_SET;
	    scq_attr.wait_set = fabd->send_waitset;
	} else {
	    scq_attr.wait_obj = FI_WAIT_FD;
	}
	ret = fi_cq_open(fabd->dom, &scq_attr, &fcd->scq, NULL);
	if (ret) {
		FT_PRINTERR("fi_cq_open, on fcd->scq", ret);
		goto err_read_buf;
	}

	/*
	 * receive CQ: its wait fd is fetched with FI_GETWAIT by the connection
	 * setup code, so it must use FI_WAIT_FD even when the send CQ is
	 * attached to a wait set
	 */
	memset(&rcq_attr, 0, sizeof rcq_attr);
	rcq_attr.format = FI_CQ_FORMAT_DATA;
	rcq_attr.wait_obj = FI_WAIT_FD;
	rcq_attr.size = fcd->max_credits << 1;
	ret = fi_cq_open(fabd->dom, &rcq_attr, &fcd->rcq, NULL);
	if (ret) {
		FT_PRINTERR("fi_cq_open", ret);
		goto err_scq;
	}

	fcd->send_buf = malloc(buf_len);
	fcd->mapped_recv_buf = malloc(buf_len);
	if (!fcd->send_buf || !fcd->mapped_recv_buf) {
		perror("malloc");
		ret = -1;
		goto err_bufs;
	}

	access_mode = FI_REMOTE_READ;
	access_mode |= FI_RECV | FI_READ | FI_WRITE | FI_REMOTE_WRITE;
	ret = fi_mr_reg(fabd->dom, fcd->mapped_recv_buf, buf_len,
			access_mode, 0, 0, 0, &fcd->read_mr, NULL);
	if (ret) {
		FT_PRINTERR("fi_mr_reg", ret);
		goto err_bufs;
	}

	access_mode = FI_REMOTE_WRITE | FI_WRITE;
	printf("fi_mr_reg length %lu, send_buf %p\n", (unsigned long) buf_len, fcd->send_buf);
	ret = fi_mr_reg(fabd->dom, fcd->send_buf, buf_len,
			access_mode, 0, 0, 0, &fcd->send_mr, NULL);
	if (ret) {
		FT_PRINTERR("fi_mr_reg", ret);
		goto err_read_mr;
	}

	ret = fi_endpoint(fabd->dom, fi, &fcd->conn_ep, NULL);
	if (ret) {
		FT_PRINTERR("fi_endpoint", ret);
		goto err_send_mr;
	}

	if (!fabd->cmeq) {
		ret = alloc_cm_res(fabd);
		if (ret)
			goto err_ep;
	}

	return 0;

	/* unwind in reverse order of acquisition */
err_ep:
	fi_close(&fcd->conn_ep->fid);
err_send_mr:
	fi_close(&fcd->send_mr->fid);
err_read_mr:
	fi_close(&fcd->read_mr->fid);
err_bufs:
	free(fcd->mapped_recv_buf);
	fcd->mapped_recv_buf = NULL;
	free(fcd->send_buf);
	fcd->send_buf = NULL;
	fi_close(&fcd->rcq->fid);
err_scq:
	fi_close(&fcd->scq->fid);
err_read_buf:
	fabd->svc->return_data_buffer(fabd->cm, fcd->read_buf);
	fcd->read_buf = NULL;
	return ret;
}
1856 
/*
 * Bind the connection endpoint of `fcd` to the shared CM event queue and to
 * its send and receive completion queues, enable it, and post the first
 * receive into the registered receive buffer.
 *
 * Returns 0 on success, otherwise the error code of the first failing call.
 */
static int bind_ep_res(fabric_conn_data_ptr fcd)
{
    int rc;

    rc = fi_ep_bind(fcd->conn_ep, &fcd->fabd->cmeq->fid, 0);
    if (rc != 0) {
        FT_PRINTERR("fi_ep_bind", rc);
        goto done;
    }

    rc = fi_ep_bind(fcd->conn_ep, &fcd->scq->fid, FI_SEND);
    if (rc != 0) {
        FT_PRINTERR("fi_ep_bind", rc);
        goto done;
    }

    rc = fi_ep_bind(fcd->conn_ep, &fcd->rcq->fid, FI_RECV);
    if (rc != 0) {
        FT_PRINTERR("fi_ep_bind", rc);
        goto done;
    }

    rc = fi_enable(fcd->conn_ep);
    if (rc != 0) {
        FT_PRINTERR("fi_enable", rc);
        goto done;
    }

    /* Post the first recv buffer; the buffer address doubles as the context. */
    rc = fi_recv(fcd->conn_ep, fcd->mapped_recv_buf, fcd->buffer_size,
                 fi_mr_desc(fcd->read_mr), 0, fcd->mapped_recv_buf);
    if (rc != 0) {
        FT_PRINTERR("fi_recv", rc);
    }

done:
    return rc;
}
1892 
1893 
/*
 * Provider limits cached for the transport.  The layout and names appear to
 * be borrowed from another OFI-based code base; only the limit fields are
 * kept here.
 */
typedef struct {
    /* Size limits, in bytes */
    uint64_t        max_buffered_send;
    uint64_t        max_buffered_write;
    uint64_t        max_send;
    uint64_t        max_write;
    uint64_t        max_short_send;
    uint64_t        max_mr_key_size;

    /* Bit-budget style limits */
    int             max_windows_bits;
    int             max_huge_rma_bits;
    int             max_huge_rmas;
    int             huge_rma_shift;
    int             context_shift;

    /* Scatter/gather limits */
    size_t          iov_limit;
    size_t          rma_iov_limit;
} MPIDI_OFI_global_t;

/* Single file-scope instance of the cached limits. */
MPIDI_OFI_global_t       MPIDI_Global;
1957 
/*
 * Set up the passive (listening) side of the transport.
 *
 * Resolves a provider with fi_getinfo() (optionally for the port given by the
 * CM_IP_PORT attribute in `listen_info`), opens the fabric, tries to open a
 * wait set for send completions, creates the passive endpoint, binds it to
 * the CM event queue and starts listening.  If the provider is "verbs" and
 * the resolved source address is the loopback address, the lookup is repeated
 * against the host name obtained from get_IP_config() for the "ib" interface,
 * and that name is stored in fd->hostname.
 *
 * Returns 0 on success, 1 if the requested port is out of range, otherwise
 * the error code of the failing libfabric call.
 */
static int server_listen(fabric_client_data_ptr fd, attr_list listen_info)
{
    struct fi_info *fi = NULL;
    struct fi_info *prov_use;
    struct fi_wait_attr wait_attr;
    CMtrans_services svc = fd->svc;
    char port_buf[16];
    char *port_str = NULL;      /* NULL lets the provider pick the port */
    int attr_port_num = 0;
    int ret;

    if (listen_info != NULL
        && get_int_attr(listen_info, CM_IP_PORT, &attr_port_num)) {
        if (attr_port_num > USHRT_MAX || attr_port_num < 0) {
            fprintf(stderr, "Requested port number %d is invalid\n", attr_port_num);
            return 1;
        }
        snprintf(port_buf, sizeof(port_buf), "%d", attr_port_num);
        port_str = port_buf;
    } else {
        /* No attribute list, or no port attribute in it. */
        attr_port_num = 0;
    }

    ret = fi_getinfo(FT_FIVERSION, NULL, port_str, FI_SOURCE, fd->hints, &fi);
    if (ret) {
        /* fi is not valid on failure, so it must not be inspected. */
        FT_PRINTERR("fi_getinfo", ret);
        return ret;
    }

    /* NOTE(review): the cast assumes an IPv4 sockaddr_in source address. */
    if (fi->src_addr != NULL &&
        ((struct sockaddr_in *)fi->src_addr)->sin_addr.s_addr == htonl(INADDR_LOOPBACK) &&
        (strcmp(fi->fabric_attr->prov_name, "verbs") == 0)) {
        char host_name[256];

        fi_freeinfo(fi);
        fi = NULL;

        /* Work on a private attribute list so the caller's list is untouched. */
        if (listen_info) {
            listen_info = attr_copy_list(listen_info);
        } else {
            listen_info = create_attr_list();
        }
        set_string_attr(listen_info, CM_IP_INTERFACE, strdup("ib"));

        svc->trace_out(fd->cm, "CMFabric begin listen, requested port %d", attr_port_num);
        get_IP_config(host_name, sizeof(host_name), NULL, NULL, NULL,
                      NULL, listen_info, svc->trace_out, (void *)fd->cm);
        ret = fi_getinfo(FT_FIVERSION, host_name, port_str, FI_SOURCE, fd->hints, &fi);
        free_attr_list(listen_info);
        if (ret) {
            FT_PRINTERR("fi_getinfo", ret);
            return ret;
        }
        svc->trace_out(fd->cm, "%s return value fi is %s\n", "server", fi_tostr(fi, FI_TYPE_INFO));
        fd->hostname = strdup(host_name);
    } else {
        svc->trace_out(fd->cm, "%s return value fi is %s\n", "server", fi_tostr(fi, FI_TYPE_INFO));
    }

    /* Cache the provider limits reported for the selected fi_info. */
    prov_use = fi;
    MPIDI_Global.max_buffered_send  = prov_use->tx_attr->inject_size;
    MPIDI_Global.max_buffered_write = prov_use->tx_attr->inject_size;
    MPIDI_Global.max_send           = prov_use->ep_attr->max_msg_size;
    MPIDI_Global.max_write          = prov_use->ep_attr->max_msg_size;
    svc->trace_out(fd->cm, "Max send is %" PRIu64 ", max write is %" PRIu64 "\n",
                   MPIDI_Global.max_send, MPIDI_Global.max_write);
    MPIDI_Global.max_mr_key_size    = prov_use->domain_attr->mr_key_size;

    ret = fi_fabric(fi->fabric_attr, &fd->fab, NULL);
    if (ret) {
        FT_PRINTERR("fi_fabric", ret);
        goto err0;
    }

    memset(&wait_attr, 0, sizeof(wait_attr));
    wait_attr.wait_obj = FI_WAIT_FD;
    wait_attr.flags = 0;
    ret = fi_wait_open(fd->fab, &wait_attr, &fd->send_waitset);
    if (ret) {
        /* Wait sets are optional: run without one if the open fails. */
        fd->send_waitset = NULL;
    }

    ret = fi_passive_ep(fd->fab, fi, &fd->listen_ep, NULL);
    if (ret) {
        FT_PRINTERR("fi_passive_ep", ret);
        goto err1;
    }

    ret = alloc_cm_res(fd);
    if (ret)
        goto err2;

    ret = fi_pep_bind(fd->listen_ep, &fd->cmeq->fid, 0);
    if (ret) {
        FT_PRINTERR("fi_pep_bind", ret);
        goto err3;
    }

    ret = fi_listen(fd->listen_ep);
    if (ret) {
        FT_PRINTERR("fi_listen", ret);
        goto err3;
    }

    fi_freeinfo(fi);
    return 0;

err3:
    free_lres(fd);
err2:
    fi_close(&fd->listen_ep->fid);
err1:
    /* NOTE(review): an open send_waitset is not closed here; confirm whether
     * free_lres() or shutdown code owns it before adding a close. */
    fi_close(&fd->fab->fid);
err0:
    fi_freeinfo(fi);
    return ret;
}
2099 
server_connect(fabric_conn_data_ptr fcd)2100 static int server_connect(fabric_conn_data_ptr fcd)
2101 {
2102 	struct fi_eq_cm_entry entry;
2103 	uint32_t event;
2104 	struct fi_info *info = NULL;
2105 	ssize_t rd;
2106 	int ret;
2107 	fabric_client_data_ptr fabd = fcd->fabd;
2108 
2109 	rd = fi_eq_sread(fabd->cmeq, &event, &entry, sizeof entry, -1, 0);
2110 	if (rd != sizeof entry) {
2111 	    if (rd == -FI_EAVAIL) {
2112 		struct fi_eq_err_entry error = {0};
2113 		int rc = fi_eq_readerr(fabd->cmeq, &error, 0);
2114 		if (rc) {
2115 		    char buf[1024];
2116 		    fprintf(stderr, "error event: %s\n", fi_eq_strerror(fabd->cmeq, error.prov_errno,
2117 									error.err_data, buf, 1024));
2118 		}
2119 	    } else {
2120 		FT_PRINTERR("fi_eq_sread", rd);
2121 	    }
2122 	    return (int) rd;
2123 	}
2124 
2125 	info = entry.info;
2126 	if (event != FI_CONNREQ) {
2127 		fprintf(stderr, "Unexpected CM event %d\n", event);
2128 		ret = -FI_EOTHER;
2129 		goto err1;
2130 	}
2131 
2132 	ret = fi_domain(fabd->fab, info, &fabd->dom, NULL);
2133 	if (ret) {
2134 		FT_PRINTERR("fi_domain", ret);
2135 		goto err1;
2136 	}
2137 
2138 
2139 	ret = fi_endpoint(fabd->dom, info, &fcd->conn_ep, NULL);
2140 	if (ret) {
2141 		FT_PRINTERR("fi_endpoint", -ret);
2142 		goto err1;
2143 	}
2144 
2145 	ret = alloc_ep_res(fcd, info);
2146 	if (ret)
2147 		 goto err1;
2148 
2149 	ret = bind_ep_res(fcd);
2150 	if (ret)
2151 		goto err3;
2152 
2153 	ret = fi_accept(fcd->conn_ep, NULL, 0);
2154 	if (ret) {
2155 		FT_PRINTERR("fi_accept", ret);
2156 		goto err3;
2157 	}
2158 
2159 	rd = fi_eq_sread(fabd->cmeq, &event, &entry, sizeof entry, -1, 0);
2160  	if (rd != sizeof entry) {
2161 	    if (ret == -FI_EAVAIL) {
2162 		struct fi_eq_err_entry error = {0};
2163 		int rc = fi_eq_readerr(fabd->cmeq, &error, 0);
2164 		if (rc) {
2165 		    char buf[1024];
2166 		    fprintf(stderr, "error event: %s\n", fi_eq_strerror(fabd->cmeq, error.prov_errno,
2167 									error.err_data, buf, 1024));
2168 		}
2169 	    } else {
2170 		FT_PRINTERR("fi_eq_sread", rd);
2171 	    }
2172 	    goto err3;
2173  	}
2174 
2175 	if (event != FI_CONNECTED || entry.fid != &fcd->conn_ep->fid) {
2176 		fprintf(stderr, "Unexpected CM event %d fid %p (ep %p)\n",
2177 			event, entry.fid, fcd->conn_ep);
2178  		ret = -FI_EOTHER;
2179  		goto err3;
2180  	}
2181 
2182  	fi_freeinfo(info);
2183  	return 0;
2184 
2185 err3:
2186 	free_ep_res(fcd);
2187 err1:
2188  	fi_reject(fabd->listen_ep, info->handle, NULL, 0);
2189  	fi_freeinfo(info);
2190  	return ret;
2191 }
2192 
2193 /*
2194  * Create an passive endpoint to listen for connections
2195  */
2196 extern attr_list
libcmfabric_LTX_non_blocking_listen(CManager cm,CMtrans_services svc,transport_entry trans,attr_list listen_info)2197 libcmfabric_LTX_non_blocking_listen(CManager cm, CMtrans_services svc, transport_entry trans, attr_list listen_info)
2198 {
2199     fabric_client_data_ptr fd = trans->trans_data;
2200     int wait_sock = -1;
2201     int ret, IP, port_num;
2202     size_t addrlen;
2203     struct sockaddr_in local_addr;
2204 
2205     if (cm) {
2206 	/* assert CM is locked */
2207 	assert(CM_LOCKED(svc, cm));
2208     }
2209     int result = server_listen(fd, listen_info);
2210     if (result != 0) {
2211 	fprintf(stderr, "Cannot bind INET socket\n");
2212 	return NULL;
2213     }
2214 
2215     addrlen = sizeof(local_addr);
2216     ret = fi_getname(&fd->listen_ep->fid, (void*)&local_addr, &addrlen);
2217     IP = ntohl(local_addr.sin_addr.s_addr);
2218     port_num = ntohs(local_addr.sin_port);
2219     if (ret) {
2220 	FT_PRINTERR("fi_getname", ret);
2221 	return NULL;
2222     }
2223 
2224     ret = fi_control (&fd->cmeq->fid, FI_GETWAIT, (void *) &wait_sock);
2225     if (ret) {
2226 	FT_PRINTERR("fi_control(FI_GETWAIT)", ret);
2227     } else {
2228 	svc->trace_out(cm, "Cmfabric Adding fabric_service_incoming as action on fd %d", wait_sock);
2229 	svc->fd_add_select(cm, wait_sock, fabric_service_incoming,
2230 			   (void *) trans, (void *) fd->listen_ep);
2231     }
2232     {
2233 	attr_list ret_list;
2234 
2235 	svc->trace_out(cm, "CMFABRIC listen succeeded on port %d, fd %d",
2236 		       port_num, wait_sock);
2237 	ret_list = create_attr_list();
2238 
2239 	fd->listen_port = port_num;
2240 	add_attr(ret_list, CM_TRANSPORT, Attr_String,
2241 		 (attr_value) strdup("fabric"));
2242 	if ((getenv("CMFabricUseHostname") != NULL) ||
2243 	    (getenv("CM_NETWORK") != NULL)) {
2244 	    add_attr(ret_list, CM_IP_HOSTNAME, Attr_String,
2245 		     (attr_value) strdup(fd->hostname));
2246 	} else if (IP == 0) {
2247 	    add_attr(ret_list, CM_IP_ADDR, Attr_Int4,
2248 		     (attr_value)INADDR_LOOPBACK);
2249 	} else {
2250 	    add_int_attr(ret_list, CM_IP_ADDR, (int)IP);
2251 	}
2252 	add_attr(ret_list, CM_IP_PORT, Attr_Int4,
2253 		 (attr_value) (long)port_num);
2254 
2255 	return ret_list;
2256     }
2257 }
2258 
/* Builds that have HAVE_WINDOWS_H defined always request the local iovec. */
#if defined(HAVE_WINDOWS_H) && !defined(NEED_IOVEC_DEFINE)
#  define NEED_IOVEC_DEFINE
#endif

#if defined(NEED_IOVEC_DEFINE)
/* Local scatter/gather element, used when NEED_IOVEC_DEFINE is set. */
struct iovec {
    void *iov_base;   /* start of the segment */
    int iov_len;      /* segment length in bytes */
};
#endif
2270 
2271 extern void
2272 libcmfabric_LTX_set_write_notify(trans, svc, fcd, enable)
2273 	transport_entry trans;
2274 CMtrans_services svc;
2275 fabric_conn_data_ptr fcd;
2276 int enable;
2277 {
2278 	if (enable != 0) {
2279 		svc->fd_write_select(trans->cm, fcd->fd, (select_list_func) trans->write_possible,
2280 		                     (void *)trans, (void *) fcd->conn);
2281 	} else {
2282 		/* remove entry */
2283 		svc->fd_write_select(trans->cm, fcd->fd, NULL, NULL, NULL);
2284 	}
2285 }
2286 
2287 
2288 extern CMbuffer
libcmfabric_LTX_read_block_func(CMtrans_services svc,fabric_conn_data_ptr fcd,int * len_ptr,int * offset_ptr)2289 libcmfabric_LTX_read_block_func(CMtrans_services svc, fabric_conn_data_ptr fcd, int *len_ptr, int *offset_ptr)
2290 {
2291     *len_ptr = fcd->read_buffer_len;
2292     if (fcd->read_buf) {
2293 	CMbuffer tmp = fcd->read_buf;
2294 	fcd->read_buf = NULL;
2295 	fcd->read_buffer_len = 0;
2296 	if (offset_ptr) *offset_ptr = fcd->read_offset;
2297 	return tmp;
2298     }
2299     return NULL;
2300 }
2301 
2302 
2303 #ifndef IOV_MAX
2304 /* this is not defined in some places where it should be.  Conservative. */
2305 #define IOV_MAX 16
2306 #endif
2307 
2308 extern int
libcmfabric_LTX_writev_complete_notify_func(CMtrans_services svc,fabric_conn_data_ptr fcd,void * iovs,int iovcnt,attr_list attrs,CMcompletion_notify_func notify_func,void * notify_client_data)2309 libcmfabric_LTX_writev_complete_notify_func(CMtrans_services svc,
2310 					fabric_conn_data_ptr fcd,
2311 					void *iovs,
2312 					int iovcnt,
2313 					attr_list attrs,
2314 					CMcompletion_notify_func notify_func,
2315 					void *notify_client_data)
2316 {
2317     	int fd = fcd->fd;
2318 	int left = 0;
2319 	int iget = 0;
2320 	int i;
2321 	int can_reuse_mapping = 0;
2322 	struct iovec * iov = (struct iovec*) iovs, *tmp_iov;
2323 	rinfo *last_write_request = &fcd->last_write;
2324 	struct control_message msg;
2325 
2326 	for (i = 0; i < iovcnt; i++) {
2327 	    left += iov[i].iov_len;
2328 	}
2329 
2330 	svc->trace_out(fcd->fabd->cm, "CMFABRIC writev of %d bytes on fd %d",
2331 	               left, fd);
2332 
2333 	if (left + msg_offset() < PIGGYBACK)
2334 	{
2335 		//total size is less than the piggyback size
2336 		iget = internal_write_piggyback(svc, fcd, left, iov, iovcnt);
2337 		if (notify_func) {
2338 		    (notify_func)(notify_client_data);
2339 		}
2340 		if(iget < 0)
2341 		{
2342 			svc->trace_out(fcd->fabd->cm, "CMFABRIC error in writing piggyback");
2343 			return -1;
2344 		}
2345 		if(iget == 0)
2346 		{
2347 			return iovcnt;
2348 		}
2349 		return -1;
2350 	}
2351 
2352 	svc->set_pending_write(fcd->conn);
2353 
2354 	if (notify_func) {
2355 	    can_reuse_mapping = 1;
2356 	    /* OK, we're not going to copy the data */
2357 	    if (last_write_request->iovcnt == iovcnt) {
2358 		int i;
2359 		for(i=0; i < last_write_request->iovcnt; i++) {
2360 		    if ((iov[i].iov_len != last_write_request->iov[i].iov_len) ||
2361 			(iov[i].iov_base != last_write_request->iov[i].iov_base)) {
2362 			can_reuse_mapping = 0;
2363 			svc->trace_out(fcd->fabd->cm, "CMFABRIC already mapped data, doesn't match write, buf %d, %p vs. %p, %d vs. %d",
2364 				       i, iov[i].iov_base, last_write_request->iov[i].iov_base, iov[i].iov_len, last_write_request->iov[i].iov_len);
2365 		    break;
2366 		    }
2367 		}
2368 	    } else {
2369 		svc->trace_out(fcd->fabd->cm, "CMFABRIC either no already mapped data, or wrong buffer count");
2370 		can_reuse_mapping = 0;
2371 	    }
2372 	} else {
2373 	    svc->trace_out(fcd->fabd->cm, "CMFABRIC User-owned data with no notify, so no reuse\n");
2374 	}
2375 #ifndef DO_DEREG_ON_FINISH
2376 	if (last_write_request->iovcnt && !(can_reuse_mapping)) {
2377 	    for(i = 0; i < last_write_request->iovcnt; i ++)
2378 	    {
2379 		fi_close(&last_write_request->mrlist[i]->fid);
2380 		last_write_request->mrlist[i] = NULL;
2381 	    }
2382 
2383 	    free(last_write_request->mrlist);
2384 	    free(last_write_request->iov);
2385 	    last_write_request->iov = NULL;
2386 	    last_write_request->iov = NULL;
2387 	    last_write_request->mrlist = NULL;
2388 	    last_write_request->iovcnt = 0;
2389 	}
2390 #endif
2391 	if (notify_func == NULL) {
2392 	    /*
2393 	     * Semantics are that *MUST* be done with the data when we return,
2394 	     * so, for now, copy all data
2395 	     */
2396 	    tmp_iov = malloc(sizeof(tmp_iov[0]) * iovcnt);
2397 	    for (i = 0; i < iovcnt; i++) {
2398 		tmp_iov[i].iov_len = iov[i].iov_len;
2399 		tmp_iov[i].iov_base = malloc(iov[i].iov_len);
2400 		memcpy(tmp_iov[i].iov_base, iov[i].iov_base, iov[i].iov_len);
2401 	    }
2402 	} else {
2403 	    /*
2404 	     *  Cool.  The app doesn't need the data back right away.
2405 	     *  We can keep it and tell the upper levels when we're done.
2406 	     *  Don't copy.  The reply message will trigger the notification.
2407 	     */
2408 	    tmp_iov = iov;
2409 	}
2410 	memset(&msg, 0, sizeof(msg));
2411 
2412 
2413 	svc->trace_out(fcd->fabd->cm, "Can reuse mapping is %d\n", can_reuse_mapping);
2414 	if (!can_reuse_mapping) {
2415 	    int i;
2416 	    fcd->last_write.iovcnt = iovcnt;
2417 	    if (fcd->last_write.iov) free(fcd->last_write.iov);
2418 	    fcd->last_write.iov = malloc(sizeof(tmp_iov[0]) * iovcnt);
2419 	    memcpy(fcd->last_write.iov, tmp_iov, sizeof(tmp_iov[0]) * iovcnt);
2420 
2421 	    fcd->last_write.mrlist = malloc(iovcnt * sizeof(fcd->last_write.mrlist[0]));
2422 	    for (i=0; i < iovcnt; i++) {
2423 		int ret;
2424 		svc->trace_out(fcd->fabd->cm, "fi_mr_reg %d, addr %p, len %ld, with attrs REMOTE_READ,REMOTE_WRITE, SEND, RECV)\n", i, tmp_iov[i].iov_base, tmp_iov[i].iov_len);
2425 		ret = fi_mr_reg(fcd->fabd->dom, tmp_iov[i].iov_base, tmp_iov[i].iov_len,
2426 				FI_REMOTE_READ | FI_REMOTE_WRITE | FI_SEND | FI_RECV, 0, 0, 0, &fcd->last_write.mrlist[i], NULL);
2427 		if (ret) {
2428 		    FT_PRINTERR("fi_mr_reg of elements in bulk write", ret);
2429 		    return -1;
2430 		}
2431 
2432 	    }
2433 	}
2434 	fcd->last_write.notify_func = notify_func;
2435 	fcd->last_write.notify_client_data = notify_client_data;
2436 
2437 	if(fcd->last_write.mrlist == NULL)
2438 	{
2439 		svc->trace_out(fcd->fabd->cm, "CMFABRIC writev error in registereing memory");
2440 		return -1;
2441 	}
2442 
2443 	svc->trace_out(fcd->fabd->cm, "FIrst 16 bytes of send buffer (len %d) are %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x\n", left, ((unsigned char*)iov[0].iov_base)[0], ((unsigned char*)iov[0].iov_base)[1], ((unsigned char*)iov[0].iov_base)[2], ((unsigned char*)iov[0].iov_base)[3], ((unsigned char*)iov[0].iov_base)[4], ((unsigned char*)iov[0].iov_base)[5], ((unsigned char*)iov[0].iov_base)[6], ((unsigned char*)iov[0].iov_base)[7], ((unsigned char*)iov[0].iov_base)[8], ((unsigned char*)iov[0].iov_base)[9], ((unsigned char*)iov[0].iov_base)[10], ((unsigned char*)iov[0].iov_base)[11], ((unsigned char*)iov[0].iov_base)[12], ((unsigned char*)iov[0].iov_base)[13], ((unsigned char*)iov[0].iov_base)[14], ((unsigned char*)iov[0].iov_base)[15]);
2444 	iget = internal_write_request(svc, fcd, left, &fcd->last_write);
2445 	if(iget < 0)
2446 	{
2447 		svc->trace_out(fcd->fabd->cm, "CMFABRIC error in writing request");
2448 		return -1;
2449 	}
2450 
2451 	return iovcnt;
2452 }
2453 
2454 extern int
2455 libcmfabric_LTX_writev_func(svc, fcd, iovs, iovcnt, attrs)
2456 CMtrans_services svc;
2457 fabric_conn_data_ptr fcd;
2458 void *iovs;
2459 int iovcnt;
2460 attr_list attrs;
2461 {
2462     return libcmfabric_LTX_writev_complete_notify_func(svc, fcd, iovs, iovcnt,
2463 						   attrs, NULL, NULL);
2464 }
2465 
2466 static void
free_fabric_data(CManager cm,void * fdv)2467 free_fabric_data(CManager cm, void *fdv)
2468 {
2469     fabric_client_data_ptr fd = (fabric_client_data_ptr) fdv;
2470     CMtrans_services svc = fd->svc;
2471     kill_thread(fd);
2472     if (fd->hostname != NULL)
2473 	svc->free_func(fd->hostname);
2474     svc->free_func(fd);
2475 }
2476 
2477 static void
check_completed_pull(CManager cm,fabric_client_data_ptr fabd)2478 check_completed_pull(CManager cm, fabric_client_data_ptr fabd)
2479 {
2480     msg_item_p msg;
2481     fabric_conn_data_ptr fcd;
2482     CMtrans_services svc = fabd->svc;
2483     if (!fabd->completed_queue) return;
2484 
2485     pthread_mutex_lock(&fabd->pull_queue_mutex);
2486     msg = fabd->completed_queue;
2487     fabd->completed_queue = msg->next;
2488     pthread_mutex_unlock(&fabd->pull_queue_mutex);
2489 
2490     svc->acquire_CM_lock(fabd->cm, __FILE__, __LINE__);
2491     /* OK, msg is ours alone */
2492     fcd = msg->conn_data;
2493     free(msg->pulls);
2494     CMbuffer cb = svc->create_data_and_link_buffer(fcd->fabd->cm,
2495 						   msg->buffer,
2496 						   msg->message_size);
2497     cb->return_callback = free_func;
2498     cb->return_callback_data = (void*)msg->mr_rec;
2499     fcd->read_buf = cb;
2500     svc->trace_out(fcd->fabd->cm, "FIrst 16 bytes of receive buffer (len %d) are %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x \n", msg->message_size, ((unsigned char*)cb->buffer)[0], ((unsigned char*)cb->buffer)[1], ((unsigned char*)cb->buffer)[2], ((unsigned char*)cb->buffer)[3], ((unsigned char*)cb->buffer)[4], ((unsigned char*)cb->buffer)[5], ((unsigned char*)cb->buffer)[6], ((unsigned char*)cb->buffer)[7], ((unsigned char*)cb->buffer)[8], ((unsigned char*)cb->buffer)[9], ((unsigned char*)cb->buffer)[10], ((unsigned char*)cb->buffer)[11], ((unsigned char*)cb->buffer)[12], ((unsigned char*)cb->buffer)[13], ((unsigned char*)cb->buffer)[14], ((unsigned char*)cb->buffer)[15]);
2501     fcd->read_buffer_len = msg->message_size;
2502     svc->trace_out(fcd->fabd->cm, "CMFABRIC handle_request completed");
2503     internal_write_response(svc, fcd, msg->message_size, msg->request_ID);
2504     fabd->trans->data_available(fabd->trans, fcd->conn);
2505     svc->return_data_buffer(fabd->trans->cm, cb);
2506     svc->drop_CM_lock(fabd->cm, __FILE__, __LINE__);
2507 
2508 }
2509 
2510 extern
libcmfabric_LTX_install_pull_schedule(CMtrans_services svc,transport_entry trans,struct timeval * base_time,struct timeval * period,CMavail_period_ptr avail)2511 void libcmfabric_LTX_install_pull_schedule(CMtrans_services svc,
2512 					   transport_entry trans,
2513 					   struct timeval *base_time,
2514 					   struct timeval *period,
2515 					   CMavail_period_ptr avail)
2516 {
2517     fabric_client_data_ptr fabd = trans->trans_data;
2518     fabd->pull_schedule_base = *base_time;
2519     fabd->pull_schedule_period = *period;
2520     CMavail_period_ptr tmp = fabd->avail;
2521     fabd->avail = avail;
2522     free(tmp);
2523     if (!fabd->thread_init) {
2524 	svc->trace_out(fabd->cm, "Starting pull thread!\n");
2525 	pthread_mutex_init(&fabd->pull_queue_mutex, NULL);
2526 	fabd->thread_should_run = 1;
2527 	pthread_t thr;
2528 	if (!fabd->send_waitset) {
2529 	    /* no waitset support */
2530 	    int filedes[2];
2531 	    if (pipe(filedes) != 0) {
2532 		perror("Pipe for wake not created.  Wake mechanism inoperative.");
2533 		return;
2534 	    }
2535 	    fabd->wake_read_fd = filedes[0];
2536 	    fabd->wake_write_fd = filedes[1];
2537 	    fabd->nfds = fabd->wake_read_fd;
2538 	    FD_SET(fabd->wake_read_fd, &fabd->readset);
2539 	    fabd->fcd_array_by_sfd = malloc(sizeof(char*));
2540 	    fabd->cq_array = malloc(sizeof(char*));
2541 	}
2542 	svc->add_poll(fabd->cm, (CMPollFunc) check_completed_pull, fabd);
2543 	pthread_create(&thr, NULL, (void*(*)(void*))&pull_thread, fabd);
2544 	fabd->thread_init = 1;
2545     }
2546 }
2547 
2548 extern void *
libcmfabric_LTX_initialize(CManager cm,CMtrans_services svc)2549 libcmfabric_LTX_initialize(CManager cm, CMtrans_services svc)
2550 {
2551     static int atom_init = 0;
2552 
2553     fabric_client_data_ptr fabd;
2554     svc->trace_out(cm, "Initialize CM fabric transport built in %s\n",
2555 		   EVPATH_MODULE_BUILD_DIR);
2556     if (atom_init == 0) {
2557 	CM_IP_HOSTNAME = attr_atom_from_string("IP_HOST");
2558 	CM_IP_PORT = attr_atom_from_string("IP_PORT");
2559 	CM_IP_ADDR = attr_atom_from_string("IP_ADDR");
2560 	CM_IP_INTERFACE = attr_atom_from_string("IP_INTERFACE");
2561 	CM_FD = attr_atom_from_string("CONNECTION_FILE_DESCRIPTOR");
2562 	CM_THIS_CONN_PORT = attr_atom_from_string("THIS_CONN_PORT");
2563 	CM_PEER_CONN_PORT = attr_atom_from_string("PEER_CONN_PORT");
2564 	CM_PEER_IP = attr_atom_from_string("PEER_IP");
2565 	CM_PEER_HOSTNAME = attr_atom_from_string("PEER_HOSTNAME");
2566 	CM_PEER_LISTEN_PORT = attr_atom_from_string("PEER_LISTEN_PORT");
2567 	CM_NETWORK_POSTFIX = attr_atom_from_string("CM_NETWORK_POSTFIX");
2568 	CM_TRANSPORT = attr_atom_from_string("CM_TRANSPORT");
2569 	atom_init++;
2570     }
2571     fabd = svc->malloc_func(sizeof(struct fabric_client_data));
2572     fabd->svc = svc;
2573     memset(fabd, 0, sizeof(struct fabric_client_data));
2574     fabd->cm = cm;
2575     fabd->hostname = NULL;
2576     fabd->listen_port = -1;
2577     fabd->svc = svc;
2578     fabd->port = 1; //need to somehow get proper port here
2579 
2580     fabd->psn = lrand48()%256;
2581 
2582     fabd->hints = fi_allocinfo();
2583 
2584 //	fabd->hints->cap		= FI_CONTEXT;
2585     fabd->hints->ep_attr->type	= FI_EP_MSG;
2586     fabd->hints->caps		= FI_MSG | FI_RMA;
2587 //	fabd->hints->caps		= FI_MSG;
2588     fabd->hints->mode		= FI_CONTEXT | FI_LOCAL_MR | FI_RX_CQ_DATA;
2589     fabd->hints->addr_format	= FI_SOCKADDR;
2590 //	fabd->hints->tx_attr->op_flags  = FI_DELIVERY_COMPLETE | FI_COMPLETION;
2591     fabd->hints->tx_attr->op_flags  = FI_COMPLETION;
2592 
2593     fabd->hints->domain_attr->mr_mode = FI_MR_BASIC;
2594     fabd->hints->domain_attr->threading        =  FI_THREAD_SAFE;
2595     fabd->hints->domain_attr->control_progress =  FI_PROGRESS_AUTO;
2596     fabd->hints->domain_attr->data_progress    =  FI_PROGRESS_AUTO;
2597     svc->add_shutdown_task(cm, free_fabric_data, (void *) fabd, FREE_TASK);
2598 
2599     fabd->wake_read_fd = -1;
2600     EVPATH_FD_ZERO(&fabd->readset);
2601     fabd->nfds = 0;
2602     return (void *) fabd;
2603 }
2604 
2605 
2606 extern transport_entry
cmfabric_add_static_transport(CManager cm,CMtrans_services svc)2607 cmfabric_add_static_transport(CManager cm, CMtrans_services svc)
2608 {
2609     transport_entry transport;
2610     transport = svc->malloc_func(sizeof(struct _transport_item));
2611     memset(transport, 0, sizeof(*transport));
2612     transport->trans_name = strdup("fabric");
2613     transport->cm = cm;
2614     transport->transport_init = (CMTransport_func)libcmfabric_LTX_initialize;
2615     transport->listen = (CMTransport_listen_func)libcmfabric_LTX_non_blocking_listen;
2616     transport->initiate_conn = (CMConnection(*)())libcmfabric_LTX_initiate_conn;
2617     transport->self_check = (int(*)())libcmfabric_LTX_self_check;
2618     transport->connection_eq = (int(*)())libcmfabric_LTX_connection_eq;
2619     transport->shutdown_conn = (CMTransport_shutdown_conn_func)libcmfabric_LTX_shutdown_conn;
2620     transport->read_block_func = (CMTransport_read_block_func)libcmfabric_LTX_read_block_func;
2621     transport->read_to_buffer_func = (CMTransport_read_to_buffer_func)NULL;
2622     transport->writev_func = (CMTransport_writev_func)libcmfabric_LTX_writev_func;
2623     transport->writev_complete_notify_func = (CMTransport_writev_complete_notify_func)libcmfabric_LTX_writev_complete_notify_func;
2624     transport->install_pull_schedule_func = (CMTransport_install_pull_schedule)libcmfabric_LTX_install_pull_schedule;
2625     transport->get_transport_characteristics = NULL;
2626     if (transport->transport_init) {
2627 	transport->trans_data = transport->transport_init(cm, svc, transport);
2628     }
2629     return transport;
2630 }
2631 
2632 #endif
2633 
2634