1 /***** Includes *****/
2 #include "config.h"
3 #include <sys/types.h>
4
5 #include <inttypes.h>
6 #include <getopt.h>
7
8
9 #include <time.h>
10 #ifdef HAVE_SYS_TIME_H
11 #include <sys/time.h>
12 #endif
13 #ifdef HAVE_SYS_TIMES_H
14 #include <sys/times.h>
15 #endif
16 #include <sys/socket.h>
17 #ifdef HAVE_SYS_SOCKIO_H
18 #include <sys/sockio.h>
19 #endif
20 #ifdef HAVE_SYS_UN_H
21 #include <sys/un.h>
22 #endif
23 #ifdef HAVE_SYS_UIO_H
24 #include <sys/uio.h>
25 #endif
26 #ifdef HAVE_HOSTLIB_H
27 #include "hostLib.h"
28 #endif
29 #ifdef HAVE_STREAMS_UN_H
30 #include <streams/un.h>
31 #endif
32 #include <netinet/in.h>
33 #include <arpa/inet.h>
34 #ifdef HAVE_NETDB_H
35 #include <netdb.h>
36 #endif
37 #include <stdio.h>
38 #include <fcntl.h>
39 #ifndef HAVE_WINDOWS_H
40 #include <net/if.h>
41 #include <netinet/tcp.h>
42 #include <sys/ioctl.h>
43 #include <errno.h>
44 #endif
45 #ifdef HAVE_UNISTD_H
46 #include <unistd.h>
47 #endif
48 #undef NDEBUG
49 #include <assert.h>
#include <signal.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>
#include <limits.h>
54 #ifdef HAVE_MEMORY_H
55 #include <memory.h>
56 #endif
57 #include <rdma/fabric.h>
58 #include <rdma/fi_endpoint.h>
59 #include <rdma/fi_rma.h>
60 #include <rdma/fi_cm.h>
61 #include <rdma/fi_errno.h>
62 #include <rdma/fi_eq.h>
63
64 #include <atl.h>
65 #include "evpath.h"
66 #include "cm_transport.h"
67 #include "cm_internal.h"
68 #include "ev_select.h"
69
70 #include <stdlib.h>
71
#ifndef SOCKET_ERROR
#define SOCKET_ERROR (-1)
#endif

#define _WITH_IB_
/*
 * Size threshold (bytes) used to decide whether message data is sent inline
 * ("piggybacked") in a control message rather than advertised for RDMA reads.
 * Parenthesized so the macro composes safely in expressions such as
 * `x / PIGGYBACK`.
 */
#define PIGGYBACK (1025 * 10)
78
79 #ifdef _WITH_IB_
80
81
82 #if defined (__INTEL_COMPILER)
83 # pragma warning (disable: 869)
84 # pragma warning (disable: 310)
85 # pragma warning (disable: 1418)
86 # pragma warning (disable: 180)
87 # pragma warning (disable: 2259)
88 # pragma warning (disable: 177)
89 #endif
90
91
// BEGIN from shared.h in fabtests

/* haven't tested with earlier than version 1.2 */
#define FT_FIVERSION FI_VERSION(1,2)

/* Report the error entry pending on a completion queue (defined elsewhere). */
void cq_readerr(struct fid_cq *cq, char *cq_str);


/*
 * Print "<call>(): <line>, <retv> (<error string>)" to stderr.
 * retv is negated before being passed to fi_strerror(), i.e. it is expected
 * to be a negative libfabric error code.  The argument is parenthesized so
 * that an expression argument is cast/negated as a whole.
 */
#define FT_PRINTERR(call, retv) \
    do { fprintf(stderr, call "(): %d, %d (%s)\n", __LINE__, (int) (retv), fi_strerror((int) -(retv))); } while (0)

/* Print a formatted message prefixed with "<file>:<line>: ". */
#define FT_ERR(fmt, ...) \
    do { fprintf(stderr, "%s:%d: " fmt, __FILE__, __LINE__, ##__VA_ARGS__); } while (0)

/* Both arguments may be evaluated twice; do not pass expressions with side effects. */
#ifndef MAX
#define MAX(a,b) (((a)>(b))?(a):(b))
#endif
#ifndef MIN
#define MIN(a,b) (((a)<(b))?(a):(b))
#endif

// END from shared.h in fabtests
110
111
/*
 * Control-message type codes, stored in control_message.type.
 * msg_string[] lists printable names in the same order as the enum values.
 * NOTE(review): an earlier comment said every message type has a queue
 * associated with it; no such per-type queues are visible in this section.
 * NOTE(review): `msg_type` is a non-static file-scope variable of an
 * anonymous enum type (not a typedef) — confirm it is intended.
 */
static char *msg_string[] = {"Request", "Response", "Piggyback"};
enum {msg_request = 0, msg_response = 1, msg_piggyback = 2} msg_type;

/*
 * One remote source buffer to be fetched with fi_read():
 * its address, byte length and MR key (filled from iov_base, iov_len and
 * fi_mr_key() by internal_write_request).
 */
struct remote_entry {
uint64_t remote_addr;
uint32_t length;
uint64_t rkey;
};

/*
 * Body of a msg_request.
 *  length           - total message length the receiver must assemble.
 *  iovcnt           - number of read_list entries actually present.
 *  piggyback_length - bytes of inline data that follow the header.
 *  request_ID       - sender's opaque handle (its rinfo pointer), echoed
 *                     back in the response.
 *  read_list        - variable-length trailer (old "struct hack").  Both
 *                     sender and receiver size the header as
 *                     sizeof(struct control_message) + iovcnt * sizeof(struct remote_entry),
 *                     which includes one spare entry because read_list[1]
 *                     is already counted in sizeof.
 */
struct request
{
uint64_t length;
uint32_t iovcnt;
uint32_t piggyback_length;
uint64_t request_ID;
struct remote_entry read_list[1];
};


/*
 * Body of a msg_response.  max_length is the accepted length, or -1
 * (all bits set once stored in uint64_t) when the receiver failed.
 */
struct response
{
uint64_t max_length;
uint64_t request_ID;
};

/*
 * Body of a msg_piggyback.  total_length covers the header up to body[]
 * plus the payload; body[4] is a placeholder and the payload extends past it.
 */
struct piggyback
{
uint32_t total_length;
uint32_t padding;
char body[4];
};


/* Every control message: a type code followed by the type-specific body. */
struct control_message
{
int type;
union {
struct request req;
struct response resp;
struct piggyback pb;
} u;
};
155
/*
 * Convert between pointers and the 64-bit integers carried in wire
 * messages (e.g. request_ID).  Going through uintptr_t keeps the round trip
 * lossless even where `unsigned long` is narrower than a pointer (LLP64), and
 * uint64_t replaces the BSD-only u_int64_t.
 */
#define ptr_from_int64(p) ((void *)(uintptr_t)(p))
#define int64_from_ptr(p) ((uint64_t)(uintptr_t)(p))
158
159
struct msg_item;

/*
 * Lifecycle of a single RDMA read in the pull thread:
 * UNSUBMITTED (zeroed by calloc) -> SUBMITTED (fi_read posted) ->
 * COMPLETED (FI_READ completion seen) -> ACCOUNTED (counted toward the
 * owning message's completion).
 */
typedef enum pull_status_t {PULL_UNSUBMITTED = 0, PULL_SUBMITTED, PULL_COMPLETED, PULL_ACCOUNTED} pull_status_t;

/* One fi_read to perform: where to read from, where to put it, and its owner. */
struct pull_record {
struct remote_entry remote;
char *dest;
pull_status_t status;
struct msg_item *parent;
};

struct mr_list_item;
struct fabric_connection_data;

/*
 * A received msg_request queued for the pull thread.
 *  buffer/cur_buffer_size - initially the piggybacked prefix; replaced by the
 *                           registered buffer once an MR is assigned.
 *  message_size           - total message length (request.length).
 *  request_ID             - sender's handle, echoed in the response.
 *  mr/mr_rec              - registration for buffer and its cache record
 *                           (mr_rec == NULL means none assigned yet).
 *  pulls[pull_count]      - one record per remote read.
 *  next                   - link in pull_queue or completed_queue.
 */
typedef struct msg_item {
struct fabric_connection_data *conn_data;
char *buffer;
long cur_buffer_size;
long message_size;
uint64_t request_ID;
struct fid_mr *mr;
int pull_count;
struct pull_record *pulls;
struct mr_list_item *mr_rec;
struct msg_item *next;
} *msg_item_p;

struct fabric_client_data;

/*
 * Cache entry for a registered receive buffer.  in_use is set by
 * get_free_mr() and cleared by free_func(); the list is guarded by the
 * owner's pull_queue_mutex.
 */
typedef struct mr_list_item {
struct fabric_client_data *fabd;
long buffer_length;
void *buffer;
struct fid_mr *mr;
int in_use;
struct mr_list_item *next;
} *mr_list;

/* Per-CManager state for this transport. */
typedef struct fabric_client_data {
CManager cm;
CMtrans_services svc;
transport_entry trans;
struct fi_info *hints;

struct fid_fabric *fab;
struct fid_pep *listen_ep;
struct fid_domain *dom;
struct fid_eq *cmeq;

char *hostname;
int listen_port;
/* NOTE(review): the ibv_* and lid/qpn/psn/port/max_sge fields below are not
   referenced in the visible code; they look like leftovers from a verbs
   transport — confirm before removing. */
int lid;
int qpn;
int psn;
int port;
struct ibv_device *ibdev;
struct ibv_context *context;
struct ibv_comp_channel *send_channel;
struct ibv_comp_channel *recv_channel;
struct ibv_pd *pd;
struct ibv_cq *recv_cq;
struct ibv_cq *send_cq;
struct ibv_srq *srq;
int max_sge;

/* Pull scheduling: avail[] is terminated by an entry whose duration is zero. */
struct timeval pull_schedule_base;
struct timeval pull_schedule_period;
CMavail_period_ptr avail;

/* Registered-buffer cache, guarded by pull_queue_mutex. */
mr_list existing_mr_list;

int thread_init;
/* Pending and finished pulls, both guarded by pull_queue_mutex. */
msg_item_p pull_queue;
msg_item_p completed_queue;
pthread_mutex_t pull_queue_mutex;
int thread_should_run;
/* When non-NULL, wake_pull_thread() does not write to wake_write_fd. */
struct fid_wait *send_waitset;
/* CQ wait fds being watched; nfds is the largest registered sfd. */
fd_set readset;
int nfds;
int wake_read_fd;
int wake_write_fd;
/* Connection lookup indexed by CQ wait fd (sfd). */
struct fabric_connection_data **fcd_array_by_sfd;
/* Send CQs to poll (ncqs entries). */
int ncqs;
struct fid **cq_array;
} *fabric_client_data_ptr;


/* Bookkeeping for an outgoing write: local buffers, their MRs and the
   callback invoked when the peer's response arrives. */
typedef struct remote_info
{
int iovcnt;
struct fid_mr **mrlist;
struct iovec *iov;
CMcompletion_notify_func notify_func;
void *notify_client_data;
}rinfo;

/* Per-connection state. */
typedef struct fabric_connection_data {
fabric_client_data_ptr fabd;
/* scq receives both send and RDMA-read completions in the visible code. */
struct fid_cq *rcq, *scq;
struct fid_mr *read_mr;
struct fid_mr *send_mr;
struct fid_ep *conn_ep;
size_t buffer_size;
void *mapped_recv_buf;
/* Registered staging buffer for outgoing control messages. */
char *send_buf;
/* Last fully pulled message and its length. */
CMbuffer read_buf;
int max_credits;
int read_buffer_len;
int read_offset;

char *remote_host;
int remote_IP;
int remote_contact_port;
int fd;
int sfd; /* fd for the scq */
CMConnection conn;
struct ibv_qp *dataqp;
int infocount;
rinfo last_write;
int max_imm_data;
} *fabric_conn_data_ptr;
281
msg_offset()282 static inline int msg_offset()
283 {
284 return ((int) (((char *) (&(((struct control_message *)NULL)->u.pb.body[0]))) - ((char *) NULL)));
285 }
286
/* Forward declarations for helpers defined later in this file. */
static int alloc_cm_res(fabric_client_data_ptr fabd);
static int alloc_ep_res(fabric_conn_data_ptr fcd, struct fi_info *fi);
static int bind_ep_res(fabric_conn_data_ptr fcd);
static void free_ep_res(fabric_conn_data_ptr fcd);
static void
hand_to_pull_thread(CMtrans_services svc, fabric_client_data_ptr fabd,
msg_item_p msg);


/*
 * Attribute atoms used by this transport.  They start at -1 here;
 * presumably they are assigned real atom values during transport
 * initialization outside this section — TODO confirm.
 */
static atom_t CM_FD = -1;
static atom_t CM_THIS_CONN_PORT = -1;
static atom_t CM_PEER_CONN_PORT = -1;
static atom_t CM_PEER_IP = -1;
static atom_t CM_PEER_HOSTNAME = -1;
static atom_t CM_PEER_LISTEN_PORT = -1;
static atom_t CM_NETWORK_POSTFIX = -1;
static atom_t CM_IP_PORT = -1;
static atom_t CM_IP_HOSTNAME = -1;
static atom_t CM_IP_ADDR = -1;
static atom_t CM_IP_INTERFACE = -1;
static atom_t CM_TRANSPORT = -1;
308
309 static int
check_host(hostname,sin_addr)310 check_host(hostname, sin_addr)
311 char *hostname;
312 void *sin_addr;
313 {
314 struct hostent *host_addr;
315 host_addr = gethostbyname(hostname);
316 if (host_addr == NULL) {
317 struct in_addr addr;
318 if (inet_aton(hostname, &addr) == 0) {
319 /*
320 * not translatable as a hostname or
321 * as a dot-style string IP address
322 */
323 return 0;
324 }
325 assert(sizeof(int) == sizeof(struct in_addr));
326 *((int *) sin_addr) = *((int*) &addr);
327 } else {
328 memcpy(sin_addr, host_addr->h_addr, host_addr->h_length);
329 }
330 return 1;
331 }
332
333 static fabric_conn_data_ptr
create_fabric_conn_data(CMtrans_services svc)334 create_fabric_conn_data(CMtrans_services svc)
335 {
336 fabric_conn_data_ptr fabric_conn_data = svc->malloc_func(sizeof(struct fabric_connection_data));
337 memset(fabric_conn_data, 0, sizeof(struct fabric_connection_data));
338 fabric_conn_data->remote_host = NULL;
339 fabric_conn_data->remote_contact_port = -1;
340 fabric_conn_data->fd = 0;
341 fabric_conn_data->read_buf = NULL;
342 fabric_conn_data->read_buffer_len = 0;
343
344 return fabric_conn_data;
345 }
346
347
348 static void
handle_scq_completion(struct fi_cq_data_entry * comp)349 handle_scq_completion(struct fi_cq_data_entry *comp)
350 {
351 if (comp->flags & FI_READ) {
352 struct pull_record *this_pull =
353 (struct pull_record *)comp->op_context;
354 // printf("Got an FI_READ completion\n");
355 this_pull->status = PULL_COMPLETED;
356 }
357 if (comp->flags & FI_SEND) {
358 *(int*)comp->op_context = 1;
359 }
360 }
361
internal_write_piggyback(CMtrans_services svc,fabric_conn_data_ptr fcd,int length,struct iovec * iov,int iovcnt)362 static int internal_write_piggyback(CMtrans_services svc,
363 fabric_conn_data_ptr fcd,
364 int length, struct iovec *iov, int iovcnt)
365 {
366 //this function is only called if length < piggyback
367 struct control_message *msg;
368 char *point;
369 int offset = msg_offset();
370 int i;
371
372 if(length >= PIGGYBACK)
373 {
374 //should never happen
375 return -1;
376 }
377
378 msg = malloc(offset + length);
379 memset(msg, 0, offset+length);
380 msg->type = msg_piggyback;
381 msg->u.pb.total_length = length + offset;
382 point = &msg->u.pb.body[0];
383 svc->trace_out(fcd->fabd->cm, "CMFABRIC sending piggyback msg of length %d,",
384 length);
385
386
387 for (i = 0; i < iovcnt; i++)
388 {
389 memcpy(point, iov[i].iov_base, iov[i].iov_len);
390 point += iov[i].iov_len;
391 }
392
393 {
394
395 memcpy(fcd->send_buf, msg, msg->u.pb.total_length);
396 int ret, sent = 0;
397
398 svc->trace_out(fcd->fabd->cm, "fi_send on conn_ep, length %d, send_buf %p\n", msg->u.pb.total_length, fcd->send_buf);
399 ret = fi_send(fcd->conn_ep, fcd->send_buf, msg->u.pb.total_length, fi_mr_desc(fcd->send_mr), 0, &sent);
400 if (ret) {
401 FT_PRINTERR("fi_send", ret);
402 return ret;
403 }
404
405 /* Read send queue */
406 do {
407 struct fi_cq_data_entry comp;
408 ret = fi_cq_read(fcd->scq, &comp, 1);
409 if (ret < 0 && ret != -FI_EAGAIN) {
410 FT_PRINTERR("fi_cq_read", ret);
411 cq_readerr(fcd->scq, " in internal write piggyback");
412 return ret;
413 }
414 if (ret == 1) handle_scq_completion(&comp);
415 } while (!sent);
416 }
417
418 free(msg);
419 return 0;
420 }
421
422
internal_write_response(CMtrans_services svc,fabric_conn_data_ptr fcd,int length,int64_t request_ID)423 static int internal_write_response(CMtrans_services svc,
424 fabric_conn_data_ptr fcd,
425 int length,
426 int64_t request_ID)
427 {
428 struct control_message msg;
429
430 msg.type = msg_response;
431 msg.u.resp.max_length = length;
432 msg.u.resp.request_ID = request_ID;
433
434 memcpy(fcd->send_buf, &msg, sizeof(msg));
435 int ret, sent = 0;
436
437 svc->trace_out(fcd->fabd->cm, "fi_send for write response\n");
438 ret = fi_send(fcd->conn_ep, fcd->send_buf, sizeof(msg), fi_mr_desc(fcd->send_mr), 0, &sent);
439 if (ret) {
440 FT_PRINTERR("fi_send", ret);
441 return ret;
442 }
443
444 /* Read send queue */
445 do {
446 struct fi_cq_data_entry comp;
447 svc->trace_out(fcd->fabd->cm, "fi_cq_read for send completion in write response\n");
448
449 ret = fi_cq_read(fcd->scq, &comp, 1);
450 if (ret < 0 && ret != -FI_EAGAIN) {
451 FT_PRINTERR("fi_cq_read", ret);
452 cq_readerr(fcd->scq, " in internal write response");
453 return ret;
454 }
455 if (ret == 1) handle_scq_completion(&comp);
456 } while (!sent);
457
458 return 0;
459 }
460
internal_write_request(CMtrans_services svc,fabric_conn_data_ptr fcd,int length,rinfo * request_info)461 static int internal_write_request(CMtrans_services svc,
462 fabric_conn_data_ptr fcd,
463 int length,
464 rinfo *request_info)
465 {
466 struct control_message *msg;
467 int ret, i, piggyback_prefix_vectors;
468 int size = sizeof(*msg) + request_info->iovcnt * sizeof(struct remote_entry);
469 msg = malloc(size);
470
471 msg->type = msg_request;
472 msg->u.req.length = length;
473 msg->u.req.iovcnt = request_info->iovcnt;
474 msg->u.req.request_ID = int64_from_ptr(request_info);
475 msg->u.req.piggyback_length = 0;
476
477 piggyback_prefix_vectors = 0;
478 for (i=0; i < request_info->iovcnt; i++) {
479 if (size + request_info->iov[i].iov_len < PIGGYBACK) {
480 size += request_info->iov[i].iov_len;
481 piggyback_prefix_vectors++;
482 } else {
483 break;
484 }
485 }
486 msg->u.req.iovcnt -= piggyback_prefix_vectors;
487 /* redo size */
488 size = sizeof(*msg) + (request_info->iovcnt-piggyback_prefix_vectors)
489 * sizeof(struct remote_entry);
490
491 for (i=0; i < piggyback_prefix_vectors; i++) {
492 msg = realloc(msg, size + request_info->iov[i].iov_len);
493 memcpy(((char*)msg) + size, request_info->iov[i].iov_base,
494 request_info->iov[i].iov_len);
495 size += request_info->iov[i].iov_len;
496 msg->u.req.piggyback_length += request_info->iov[i].iov_len;
497 }
498 /* handle remaining */
499 for (; i < request_info->iovcnt; i++) {
500 int j = i - piggyback_prefix_vectors;
501 msg->u.req.read_list[j].remote_addr = int64_from_ptr(request_info->iov[i].iov_base);
502 msg->u.req.read_list[j].length = request_info->iov[i].iov_len;
503 msg->u.req.read_list[j].rkey = fi_mr_key(request_info->mrlist[i]);
504 svc->trace_out(fcd->fabd->cm, "Adding source buffer[%d] %lx, len %d, key %lx\n",
505 i, msg->u.req.read_list[j].remote_addr, msg->u.req.read_list[j].length,
506 msg->u.req.read_list[j].rkey);
507 }
508
509 svc->trace_out(fcd->fabd->cm, "Doing internal write request, writing %d bytes", size);
510
511 memcpy(fcd->send_buf, msg, size);
512 int sent = 0;
513 ret = fi_send(fcd->conn_ep, fcd->send_buf, size, fi_mr_desc(fcd->send_mr), 0, &sent);
514 if (ret) {
515 FT_PRINTERR("fi_send", ret);
516 return ret;
517 }
518
519 /* Read send queue */
520 do {
521 struct fi_cq_data_entry comp;
522 ret = fi_cq_read(fcd->scq, &comp, 1);
523 if (ret < 0 && ret != -FI_EAGAIN) {
524 FT_PRINTERR("fi_cq_read", ret);
525 cq_readerr(fcd->scq, " in internal write request");
526 return ret;
527 }
528 if (ret == 1) handle_scq_completion(&comp);
529 } while (!sent);
530 return 0;
531 }
532
handle_response(CMtrans_services svc,fabric_conn_data_ptr fcd,struct control_message * msg)533 static int handle_response(CMtrans_services svc,
534 fabric_conn_data_ptr fcd,
535 struct control_message *msg)
536 {
537
538 //read back response
539 struct response *rep;
540 rinfo *write_request;
541
542 rep = &msg->u.resp;
543 write_request = ptr_from_int64(rep->request_ID);
544
545 if (rep->max_length == -1) {
546 fprintf(stderr, "WRITE FAILED, request %p\n", write_request);
547 return -1;
548 }
549
550 if (write_request->notify_func) {
551 (write_request->notify_func)( write_request->notify_client_data);
552 }
553 svc->wake_any_pending_write(fcd->conn);
554 return 0;
555
556 }
557
558 static mr_list
get_free_mr(fabric_client_data_ptr fabd,long length)559 get_free_mr(fabric_client_data_ptr fabd, long length)
560 {
561 mr_list list, return_item = NULL;
562 pthread_mutex_lock(&fabd->pull_queue_mutex);
563 list = fabd->existing_mr_list;
564 while (list != NULL) {
565 if ((!list->in_use) && (list->buffer_length >= length)) {
566 list->in_use = 1;
567 return_item = list;
568 break;
569 }
570 list = list->next;
571 }
572 pthread_mutex_unlock(&fabd->pull_queue_mutex);
573 return return_item;
574 }
575
576 static void
add_mr_to_list(fabric_client_data_ptr fabd,mr_list mr_rec)577 add_mr_to_list(fabric_client_data_ptr fabd, mr_list mr_rec)
578 {
579 pthread_mutex_lock(&fabd->pull_queue_mutex);
580 mr_rec->fabd = fabd;
581 mr_rec->next = fabd->existing_mr_list;
582 fabd->existing_mr_list = mr_rec;
583 pthread_mutex_unlock(&fabd->pull_queue_mutex);
584 }
585
586 static void
free_func(void * mr_item_v)587 free_func(void *mr_item_v)
588 {
589 fabric_client_data_ptr fabd;
590 mr_list list, mr_item = mr_item_v;
591 fabd = mr_item->fabd;
592 pthread_mutex_lock(&fabd->pull_queue_mutex);
593 list = fabd->existing_mr_list;
594 while (list != NULL) {
595 if (list == mr_item_v) {
596 list->in_use = 0;
597 break;
598 }
599 list = list->next;
600 }
601 if (list == NULL) {
602 fprintf(stderr, "libfabric MR list inconsistency in free_func\n");
603 }
604 pthread_mutex_unlock(&fabd->pull_queue_mutex);
605 }
606
607
608 static void
add_to_pull_queue(CMtrans_services svc,fabric_conn_data_ptr fcd,struct control_message * msg)609 add_to_pull_queue(CMtrans_services svc, fabric_conn_data_ptr fcd,
610 struct control_message *msg)
611 {
612 int i;
613 struct request *req = &msg->u.req;
614 int header_size = sizeof(*msg) + req->iovcnt * sizeof(struct remote_entry);
615
616
617 msg_item_p msg_pull_item = calloc(1, sizeof(struct msg_item));
618 msg_pull_item->message_size = req->length;
619 msg_pull_item->buffer = malloc(req->piggyback_length);
620 msg_pull_item->cur_buffer_size = req->piggyback_length;
621 /* copy portion of msg that was piggybacked */
622 memcpy(msg_pull_item->buffer, (char*)msg + header_size, req->piggyback_length);
623 msg_pull_item->pull_count = req->iovcnt;
624 msg_pull_item->pulls = calloc(req->iovcnt, sizeof(struct pull_record));
625 msg_pull_item->conn_data = fcd;
626 msg_pull_item->request_ID = req->request_ID;
627 for (i = 0; i < msg_pull_item->pull_count; i++) {
628 msg_pull_item->pulls[i].parent = msg_pull_item;
629 msg_pull_item->pulls[i].remote = req->read_list[i];
630 }
631 hand_to_pull_thread(svc, fcd->fabd, msg_pull_item);
632 }
633
634 static int
perform_pull(CMtrans_services svc,fabric_conn_data_ptr fcd,struct control_message * msg)635 perform_pull(CMtrans_services svc, fabric_conn_data_ptr fcd,
636 struct control_message *msg)
637 {
638 int ret;
639 int count;
640 int i;
641 char *ptr;
642 int header_size;
643 struct request *req;
644 void *buffer;
645 struct fid_mr *mr;
646 struct mr_list_item *mr_rec;
647 CMbuffer cb;
648 req = &msg->u.req;
649
650 svc->trace_out(fcd->fabd->cm, "In handle request, len is %ld, iovcnt %d, request_ID %lx, piggyback_length = %d\n",
651 req->length, req->iovcnt, req->request_ID, req->piggyback_length);
652 mr_rec = get_free_mr(fcd->fabd, req->length);
653 if (mr_rec == NULL) {
654 mr_rec = calloc(1, sizeof(*mr_rec));
655 buffer = malloc(req->length);
656 mr_rec->buffer_length = req->length;
657 mr_rec->buffer = buffer;
658 mr_rec->in_use = 1;
659 mr_rec->next = NULL;
660 svc->trace_out(fcd->fabd->cm, "fi_mr_reg, buff %p, size %ld with attrs REMOTE_READ,REMOTE_WRITE, SEND, RECV\n", buffer, req->length);
661 ret = fi_mr_reg(fcd->fabd->dom, buffer, req->length,
662 FI_REMOTE_READ | FI_REMOTE_WRITE | FI_SEND | FI_RECV,
663 0, 0, 0, &mr, NULL);
664 if(ret) {
665 FT_PRINTERR("fi_mr_reg", ret);
666 svc->trace_out(fcd->fabd->cm, "Failed to get memory\n");
667 internal_write_response(svc, fcd, -1, req->request_ID);
668 return ret;
669 }
670
671 mr_rec->mr = mr;
672 add_mr_to_list(fcd->fabd, mr_rec);
673 } else {
674 svc->trace_out(fcd->fabd->cm, "Reusing exiting MR buff %p, size %ld\n", mr_rec->buffer, mr_rec->buffer_length);
675 buffer = mr_rec->buffer;
676 mr = mr_rec->mr;
677 mr_rec->in_use = 1;
678 }
679 cb = svc->create_data_and_link_buffer(fcd->fabd->cm, buffer, req->length);
680 cb->return_callback = free_func;
681 cb->return_callback_data = (void*)mr_rec;
682 header_size = sizeof(*msg) + req->iovcnt * sizeof(struct remote_entry);
683 memcpy(cb->buffer, (char*)msg + header_size, req->piggyback_length);
684
685 ptr = cb->buffer + req->piggyback_length;
686 count = req->iovcnt;
687 for (i=0; i < req->iovcnt; i++) {
688 struct fi_cq_data_entry comp;
689 struct pull_record this_pull;
690 this_pull.status = PULL_SUBMITTED;
691 int call_count = 0;
692 svc->trace_out(fcd->fabd->cm, "fi_read, buffer %p, len %d, (keys.rkey %lx, keys.addr %lx)\n", ptr, req->read_list[i].length, req->read_list[i].rkey, req->read_list[i].remote_addr);
693 ssize_t ret = fi_read(fcd->conn_ep, ptr, req->read_list[i].length, fi_mr_desc(mr),
694 0, req->read_list[i].remote_addr, req->read_list[i].rkey, (void*)&this_pull);
695 ptr += req->read_list[i].length;
696 do {
697 /* Read send queue */
698 ret = fi_cq_read(fcd->scq, &comp, 1);
699 if (ret < 0 && ret != -FI_EAGAIN) {
700 FT_PRINTERR("fi_cq_read", ret);
701 cq_readerr(fcd->scq, "scq");
702 }
703 call_count++;
704 if (ret == 1) handle_scq_completion(&comp);
705 } while (this_pull.status != PULL_COMPLETED);
706
707 if (comp.flags & FI_READ) {
708 count--;
709 }
710 }
711
712
713 while (count != 0) {
714 struct fi_cq_data_entry comp;
715 ret = fi_cq_read(fcd->scq, &comp, 1);
716 if (ret == -FI_EAGAIN) {printf("Eagain\n"); continue;}
717 if (ret < 0 && ret != -FI_EAGAIN) {
718 if (ret == -FI_EAVAIL) {
719 cq_readerr(fcd->scq, "scq");
720 } else {
721 FT_PRINTERR("fi_cq_read", ret);
722 }
723 continue;
724 } else if (ret > 0) {
725 printf("Successful remote read, Completion size is %ld, data %p\n", comp.len, comp.buf);
726 }
727 if (ret == 1) handle_scq_completion(&comp);
728
729 if (comp.flags & FI_READ) {
730 count--;
731 }
732 }
733
734 fcd->read_buf = cb;
735 svc->trace_out(fcd->fabd->cm, "FIrst 16 bytes of receive buffer (len %d) are %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x \n", req->length, ((unsigned char*)cb->buffer)[0], ((unsigned char*)cb->buffer)[1], ((unsigned char*)cb->buffer)[2], ((unsigned char*)cb->buffer)[3], ((unsigned char*)cb->buffer)[4], ((unsigned char*)cb->buffer)[5], ((unsigned char*)cb->buffer)[6], ((unsigned char*)cb->buffer)[7], ((unsigned char*)cb->buffer)[8], ((unsigned char*)cb->buffer)[9], ((unsigned char*)cb->buffer)[10], ((unsigned char*)cb->buffer)[11], ((unsigned char*)cb->buffer)[12], ((unsigned char*)cb->buffer)[13], ((unsigned char*)cb->buffer)[14], ((unsigned char*)cb->buffer)[15]);
736 fcd->read_buffer_len = req->length;
737 svc->trace_out(fcd->fabd->cm, "CMFABRIC handle_request completed");
738 internal_write_response(svc, fcd, req->length, req->request_ID);
739 return 0;
740 }
741
742
743 static void
wake_pull_thread(fabric_client_data_ptr fabd)744 wake_pull_thread(fabric_client_data_ptr fabd)
745 {
746 static char buffer = 'W'; /* doesn't matter what we write */
747 if (!fabd->send_waitset) {
748 if (write(fabd->wake_write_fd, &buffer, 1) != 1) {
749 if (fabd->thread_should_run)
750 printf("Whoops, wake_pull_thread write failed\n");
751 }
752 } else {
753 /* send a cq wake */
754 }
755 }
756
757 static void
hand_to_pull_thread(CMtrans_services svc,fabric_client_data_ptr fabd,msg_item_p msg)758 hand_to_pull_thread(CMtrans_services svc, fabric_client_data_ptr fabd,
759 msg_item_p msg)
760 {
761 struct msg_item *queue = fabd->pull_queue;
762 msg->next = NULL;
763 pthread_mutex_lock(&fabd->pull_queue_mutex);
764 if (fabd->pull_queue == NULL) {
765 fabd->pull_queue = msg;
766 } else {
767 while (queue->next) queue = queue->next;
768 queue->next = msg;
769 }
770 pthread_mutex_unlock(&fabd->pull_queue_mutex);
771 /* wake the thread if he's sleeping */
772 wake_pull_thread(fabd);
773 }
774
775 static void
return_completed_pull(CMtrans_services svc,fabric_client_data_ptr fabd,msg_item_p msg)776 return_completed_pull(CMtrans_services svc, fabric_client_data_ptr fabd,
777 msg_item_p msg)
778 {
779 pthread_mutex_lock(&fabd->pull_queue_mutex);
780 /* find msg in pull queue and take it out */
781 if (fabd->pull_queue == msg) {
782 fabd->pull_queue = msg->next;
783 msg->next = NULL;
784 } else {
785 struct msg_item *queue = fabd->pull_queue;
786 while (queue->next != msg) queue = queue->next;
787 queue->next = msg->next;
788 msg->next = NULL;
789 }
790
791 /* find add msg to completion queue */
792 if (fabd->completed_queue == NULL) {
793 fabd->completed_queue = msg;
794 } else {
795 struct msg_item *queue = fabd->completed_queue;
796 while (queue->next) queue = queue->next;
797 queue->next = msg;
798 }
799 svc->trace_out(fabd->cm, "Returning completed msg to the CM network thread\n");
800 pthread_mutex_unlock(&fabd->pull_queue_mutex);
801 /*
802 * this should wake the CM server thread and we'll check the
803 * pull completion queue in our handler
804 */
805 fabd->svc->wake_comm_thread(fabd->cm);
806
807 }
808
809 static void
kill_thread(fabric_client_data_ptr fabd)810 kill_thread(fabric_client_data_ptr fabd)
811 {
812 fabd->thread_should_run = 0;
813 if (fabd->thread_init) wake_pull_thread(fabd);
814 fabd->thread_init = 0;
815 }
816
817 static void
update_period_base(fabric_client_data_ptr fabd,struct timeval * now)818 update_period_base(fabric_client_data_ptr fabd, struct timeval *now)
819 {
820
821 struct timeval next;
822 /* add period to base until it's greater than now.
823 * Return base before the last add so that base is just less than now.
824 */
825 timeradd(&fabd->pull_schedule_base, &fabd->pull_schedule_period,
826 &next);
827 while(timercmp(&next, now, <)) {
828 fabd->pull_schedule_base = next;
829 timeradd(&fabd->pull_schedule_base, &fabd->pull_schedule_period,
830 &next);
831 }
832 }
833
834 static int
in_pull_period(fabric_client_data_ptr fabd,struct timeval * now)835 in_pull_period(fabric_client_data_ptr fabd, struct timeval *now)
836 {
837 int i = 0;
838 int period_count = 0;
839 struct timeval cur_offset, zero = {0,0};
840 timersub(now, &fabd->pull_schedule_base, &cur_offset);
841 update_period_base(fabd, now);
842 while (timercmp(&fabd->avail[period_count].duration, &zero, !=)) {
843 period_count++;
844 }
845 for (i = 0; i < period_count; i++) {
846 struct timeval end;
847 timeradd(&fabd->avail[i].offset, &fabd->avail[i].duration, &end);
848 if (timercmp(&cur_offset, &end, <) &&
849 timercmp(&cur_offset, &fabd->avail[i].offset, >)) return 1;
850 }
851 return 0;
852 }
853
854 static struct timeval
delay_until_wake(fabric_client_data_ptr fabd,struct timeval * now)855 delay_until_wake(fabric_client_data_ptr fabd, struct timeval *now)
856 {
857 /* calculate the delay until the next pull period opens */
858 int i = 0;
859 int period_count = 0;
860 struct timeval cur_offset, zero = {0,0};
861 timersub(now, &fabd->pull_schedule_base, &cur_offset);
862 update_period_base(fabd, now);
863 while (timercmp(&fabd->avail[period_count].duration, &zero, !=)) {
864 period_count++;
865 }
866 /* the one we want is the next start time past now */
867 for (i = 0; i < period_count; i++) {
868 if (timercmp(&fabd->avail[i].offset, &cur_offset, >)) {
869 struct timeval ret;
870 timersub(&fabd->avail[i].offset, &cur_offset, &ret);
871 return ret;
872 }
873 }
874 /* we must have been past the last offset,
875 return the first offset of the next period */
876 struct timeval ret;
877 timeradd(&fabd->pull_schedule_period, &fabd->avail[0].offset, &ret);
878 timersub(&ret, &cur_offset, &ret);
879 return ret;
880 }
881
882 static void
add_completion_fd(fabric_client_data_ptr fabd,fabric_conn_data_ptr fcd)883 add_completion_fd(fabric_client_data_ptr fabd, fabric_conn_data_ptr fcd)
884 {
885 if (fcd->sfd > fabd->nfds) {
886 fabd->fcd_array_by_sfd = realloc(fabd->fcd_array_by_sfd,
887 sizeof(fabd->fcd_array_by_sfd[0]) * fcd->sfd+1);
888 fabd->nfds = fcd->sfd;
889 }
890 fabd->fcd_array_by_sfd[fcd->sfd] = fcd;
891 fabd->cq_array = realloc(fabd->cq_array,
892 sizeof(fabd->cq_array[0]) * (fabd->ncqs+1));
893 fabd->cq_array[fabd->ncqs] = &fcd->scq->fid;
894 fabd->ncqs++;
895 FD_SET(fcd->sfd, &fabd->readset);
896 }
897
898
899 static void
remove_completion_fd(fabric_client_data_ptr fabd,fabric_conn_data_ptr fcd)900 remove_completion_fd(fabric_client_data_ptr fabd, fabric_conn_data_ptr fcd)
901 {
902 int i;
903 for (i = 0; i < fabd->ncqs; i++) {
904 if (fabd->cq_array[i] == &fcd->scq->fid) {
905 memcpy(&fabd->cq_array[i+1], &fabd->cq_array[i],
906 sizeof(fabd->cq_array[0]) * (fabd->ncqs - i - 1));
907 }
908 }
909 fabd->ncqs--;
910 FD_CLR(fcd->sfd, &fabd->readset);
911 }
912
913 #include <poll.h>
914
915 static int
can_do_something(fabric_client_data_ptr fabd)916 can_do_something(fabric_client_data_ptr fabd)
917 {
918 CMtrans_services svc = fabd->svc;
919 int did_something = 0;
920 int i;
921 msg_item_p pull = fabd->pull_queue;
922 while (pull) {
923 msg_item_p next = pull->next;
924 fabric_conn_data_ptr fcd = pull->conn_data;
925 svc->trace_out(fabd->cm, "Got a pull message with buffer count %d, next %p\n",
926 pull->pull_count, pull->next);
927 if (!pull->mr_rec) {
928 int ret;
929 /* try to get a mr */
930 mr_list mr_rec = get_free_mr(fcd->fabd, pull->message_size);
931 char *buffer = NULL;
932 if (mr_rec == NULL) {
933 if (0 /*!allow_register_new_mr(fcd, pull->message_size)*/) {
934 pull = pull->next;
935 continue;
936 }
937 mr_rec = calloc(1, sizeof(*mr_rec));
938 buffer = malloc(pull->message_size);
939 mr_rec->buffer_length = pull->message_size;
940 mr_rec->buffer = buffer;
941 mr_rec->in_use = 1;
942 mr_rec->next = NULL;
943 svc->trace_out(fcd->fabd->cm, "fi_mr_reg, buff %p, size %ld with attrs REMOTE_READ,REMOTE_WRITE, SEND, RECV\n", buffer, pull->message_size);
944 ret = fi_mr_reg(fcd->fabd->dom, buffer, pull->message_size,
945 FI_REMOTE_READ | FI_REMOTE_WRITE | FI_SEND | FI_RECV,
946 0, 0, 0, &mr_rec->mr, NULL);
947 if(ret) {
948 FT_PRINTERR("fi_mr_reg", ret);
949 svc->trace_out(fcd->fabd->cm, "Failed to get memory\n");
950 internal_write_response(svc, fcd, -1, pull->request_ID);
951 return ret;
952 }
953 pull->mr = mr_rec->mr;
954 pull->mr_rec = mr_rec;
955 add_mr_to_list(fcd->fabd, mr_rec);
956 } else {
957 svc->trace_out(fcd->fabd->cm, "Reusing exiting MR buff %p, size %ld\n", mr_rec->buffer, mr_rec->buffer_length);
958 mr_rec->in_use = 1;
959 pull->mr_rec = mr_rec;
960 buffer = mr_rec->buffer;
961 pull->mr = mr_rec->mr;
962 }
963 /* piggyback part of msg is in old pull->buffer */
964 memcpy(buffer, pull->buffer, pull->cur_buffer_size);
965 free(pull->buffer);
966 pull->buffer = buffer;
967
968 /* setup local buffers, now that we have memory */
969 char *ptr = pull->buffer + pull->cur_buffer_size;
970 pull->cur_buffer_size = pull->message_size;
971 for ( i = 0; i < pull->pull_count; i++) {
972 pull->pulls[i].dest = ptr;
973 ptr += pull->pulls[i].remote.length;
974 }
975 pull->cur_buffer_size = mr_rec->buffer_length;
976 did_something = 1;
977 }
978 /* at this point we should have an MR and mr_rec */
979 for ( i = 0; i < pull->pull_count; i++) {
980 if (pull->pulls[i].status == PULL_UNSUBMITTED) {
981
982 // struct fi_cq_data_entry comp;
983 struct remote_entry *remote = &pull->pulls[i].remote;
984 struct pull_record *this_pull = &pull->pulls[i];
985 if (0 /* can't do more pulls for some reason */) {
986 continue;
987 }
988
989 svc->trace_out(fcd->fabd->cm, "fi_read, buffer %p, len %d, (keys.rkey %lx, keys.addr %lx)\n", this_pull->dest, remote->length, remote->rkey, remote->remote_addr);
990 // struct pollfd poll_list[1];
991 // poll_list[0].fd = fcd->sfd;
992 // poll_list[0].events = POLLIN|POLLPRI;
993 // int pret = poll(&poll_list[0], (unsigned long)1, 0);
994 // printf("Before read poll list ret %d, revents = %x\n", pret, poll_list[0].revents);
995 ssize_t ret = fi_read(fcd->conn_ep, this_pull->dest,
996 remote->length, fi_mr_desc(pull->mr),
997 0, remote->remote_addr, remote->rkey,
998 (void*)this_pull);
999 if (ret) continue;
1000 this_pull->status = PULL_SUBMITTED;
1001 did_something = 1;
1002
1003 add_completion_fd(fabd, fcd);
1004 /* do { */
1005 /* /\* Read send queue *\/ */
1006 /* ret = fi_cq_read(fcd->scq, &comp, 1); */
1007 /* if (ret < 0 && ret != -FI_EAGAIN) { */
1008 /* FT_PRINTERR("fi_cq_read", ret); */
1009 /* cq_readerr(fcd->scq, "scq"); */
1010 /* } */
1011 /* // pret = poll(&poll_list[0], (unsigned long)1, 0); */
1012 /* // printf("AFTER read poll list ret %d, revents = %x\n", pret, poll_list[0].revents); */
1013 /* } while (ret == -FI_EAGAIN); */
1014 /* svc->trace_out(fcd->fabd->cm, "FI_READ Completion op_context is %p, flags %lx, len %ld, buf %p, data %lx\n", */
1015 /* comp.op_context, comp.flags, comp.len, comp.buf, comp.data); */
1016 /* if (ret == 1) { */
1017 /* printf("Immediate completion\n"); */
1018 /* handle_scq_completion(&comp); */
1019 /* } */
1020 }
1021 }
1022 /* if we get to this point, all pulls must have been submitted. */
1023 if (pull->mr_rec) {
1024 int all_done = 1;
1025 for ( i = 0; i < pull->pull_count; i++) {
1026 if (pull->pulls[i].status == PULL_COMPLETED) {
1027 pull->pulls[i].status = PULL_ACCOUNTED;
1028 }
1029 if (pull->pulls[i].status != PULL_ACCOUNTED) {
1030 all_done = 0;
1031 }
1032 }
1033 if (all_done) {
1034 remove_completion_fd(fabd, fcd);
1035 return_completed_pull(svc, fcd->fabd, pull);
1036 }
1037 }
1038 pull = next;
1039 }
1040 if (!did_something) svc->trace_out(fabd->cm, "PULL_THREAD Nothing to do\n");
1041 return did_something;
1042 }
1043
1044 static void
handle_completions(fabric_client_data_ptr fabd,fd_set * readset)1045 handle_completions(fabric_client_data_ptr fabd, fd_set *readset)
1046 {
1047 /* everything left should be scq completions */
1048 int i;
1049 if (readset) {
1050 for (i=0; i < fabd->nfds; i++) {
1051 if (FD_ISSET(i, readset)) {
1052 int ret;
1053 struct fi_cq_data_entry comp;
1054 /* Read send queue */
1055 fabric_conn_data_ptr fcd = fabd->fcd_array_by_sfd[i];
1056 ret = fi_cq_read(fcd->scq, &comp, 1);
1057 if (ret < 0 && ret != -FI_EAGAIN) {
1058 FT_PRINTERR("fi_cq_read", ret);
1059 cq_readerr(fcd->scq, "scq");
1060 }
1061 if (ret == 1) {
1062 handle_scq_completion(&comp);
1063 }
1064 }
1065 }
1066 } else {
1067 for (i=0; i < fabd->ncqs; i++) {
1068 int ret;
1069 struct fi_cq_data_entry comp;
1070 /* Read send queue */
1071 ret = fi_cq_read((struct fid_cq *) fabd->cq_array[i], &comp, 1);
1072 if (ret < 0 && ret != -FI_EAGAIN) {
1073 FT_PRINTERR("fi_cq_read", ret);
1074 cq_readerr((struct fid_cq *)fabd->cq_array[i], "scq");
1075 }
1076 if (ret == 1) {
1077 handle_scq_completion(&comp);
1078 }
1079 }
1080 }
1081 }
1082
1083
1084 static void
pull_thread(fabric_client_data_ptr fabd)1085 pull_thread(fabric_client_data_ptr fabd)
1086 {
1087 // thread_setup(); /* extract wait fd, other? */
1088 // CMtrans_services svc = fabd->svc;
1089 while(fabd->thread_should_run) {
1090 struct timeval now, delay;
1091 fd_set readset;
1092 gettimeofday(&now, NULL);
1093 if (in_pull_period(fabd, &now)) {
1094 while (can_do_something(fabd));
1095 }
1096 /* calculate time to next wake */
1097 /* sleep until something happens, *or* the next pull period begins */
1098 delay = delay_until_wake(fabd, &now);
1099 readset = fabd->readset;
1100 if (fabd->ncqs == 0) {
1101 /* we're just waiting on wake fd */
1102 select(fabd->nfds+1, &readset, NULL, NULL, &delay);
1103 } else if (fi_trywait(fabd->fab, &fabd->cq_array[0], fabd->ncqs) == FI_SUCCESS) {
1104 select(fabd->nfds+1, &readset, NULL, NULL, &delay);
1105 if (FD_ISSET(fabd->wake_read_fd, &readset)) {
1106 /* read and discard wake byte */
1107 char buffer;
1108 if (read(fabd->wake_read_fd, &buffer, 1) != 1) {
1109 perror("wake read failed\n");
1110 }
1111 FD_CLR(fabd->wake_read_fd, &readset);
1112 }
1113 handle_completions(fabd, &readset);
1114 } else {
1115 /* test all CQs */
1116 handle_completions(fabd, NULL);
1117 }
1118 }
1119 // thread_free_resources;
1120 }
1121
1122 /*
1123 * handle a pull request message
1124 */
handle_request(CMtrans_services svc,fabric_conn_data_ptr fcd,struct control_message * msg)1125 static void handle_request(CMtrans_services svc,
1126 fabric_conn_data_ptr fcd,
1127 struct control_message *msg)
1128 {
1129 //handling the request message
1130 if (fcd->fabd->avail) {
1131 add_to_pull_queue(svc, fcd, msg);
1132 wake_pull_thread(fcd->fabd);
1133 } else {
1134 /* immediate pull and handle */
1135 (void) perform_pull(svc, fcd, msg);
1136 }
1137 }
1138
cq_readerr(struct fid_cq * cq,char * cq_str)1139 void cq_readerr(struct fid_cq *cq, char *cq_str)
1140 {
1141 struct fi_cq_err_entry cq_err;
1142 const char *err_str;
1143 int ret;
1144
1145 ret = fi_cq_readerr(cq, &cq_err, 0);
1146 if (ret < 0)
1147 FT_PRINTERR("fi_cq_readerr", ret);
1148
1149 err_str = fi_cq_strerror(cq, cq_err.prov_errno, cq_err.err_data, NULL, 0);
1150 fprintf(stderr, "%s %s (%d)\n", cq_str, err_str, cq_err.prov_errno);
1151 }
1152
1153 void
CMFABRIC_data_available(transport_entry trans,CMConnection conn)1154 CMFABRIC_data_available(transport_entry trans, CMConnection conn)
1155 {
1156 fabric_client_data_ptr fabd = (fabric_client_data_ptr) trans->trans_data;
1157 CMtrans_services svc = fabd->svc;
1158 struct control_message *msg;
1159 fabric_conn_data_ptr fcd;
1160 int ret, call_data_available;
1161 CMbuffer CMbuffer_to_return = NULL;
1162 struct fid **fids = malloc(sizeof(fids[0]));
1163 fcd = (fabric_conn_data_ptr) svc->get_transport_data(conn);
1164 fids[0] = &fcd->rcq->fid;
1165 fabd->trans = trans;
1166
1167 svc->trace_out(fabd->cm, "At the beginning of CMFabric_data_available: ");
1168 ret = fi_trywait(fcd->fabd->fab, fids, 1);
1169 switch (ret) {
1170 case FI_SUCCESS:
1171 svc->trace_out(fabd->cm, "Try wait on rcq returned FI_SUCCESS");
1172 break;
1173 case -FI_EAGAIN:
1174 svc->trace_out(fabd->cm, "Try wait on rcq returned FI_EAGAIN, read cq events");
1175 break;
1176 default:
1177 svc->trace_out(fabd->cm, "Try wait on rcq returned %d", ret);
1178 }
1179 {
1180 struct fi_cq_data_entry comp;
1181 ret = fi_cq_read(fcd->rcq, &comp, 1);
1182 if (ret == -FI_EAGAIN) {
1183 return;
1184 }
1185 if (ret < 0 && ret != -FI_EAGAIN) {
1186 if (ret == -FI_EAVAIL) {
1187 cq_readerr(fcd->rcq, "rcq");
1188 } else {
1189 FT_PRINTERR("fi_cq_read", ret);
1190 return;
1191 }
1192 return;
1193 } else if (ret > 0) {
1194 //
1195 }
1196 svc->trace_out(fabd->cm, "FI_RECV Completion op_context is %p, flags %lx, len %ld, buf %p, data %lx\n",
1197 comp.op_context, comp.flags, comp.len, comp.buf, comp.data);
1198 }
1199
1200 fcd = (fabric_conn_data_ptr) svc->get_transport_data(conn);
1201 msg = (struct control_message *) fcd->mapped_recv_buf;
1202 svc->trace_out(fcd->fabd->cm, "CMFABRIC data available type = %s(%d)",
1203 msg_string[msg->type], msg->type);
1204
1205 call_data_available = 0;
1206 CMbuffer_to_return = NULL;
1207 switch(msg->type) {
1208 case msg_piggyback: {
1209 int offset = msg_offset();
1210 fcd->read_buffer_len = msg->u.pb.total_length - offset;
1211 svc->trace_out(fcd->fabd->cm, "CMFABRIC received piggyback msg of length %d, added to read_buffer",
1212 fcd->read_buffer_len);
1213
1214 fcd->read_buf = fcd->fabd->svc->get_data_buffer(trans->cm, fcd->read_buffer_len);
1215 memcpy(fcd->read_buf->buffer, &msg->u.pb.body[0], fcd->read_buffer_len);
1216 fcd->read_offset = 0;
1217
1218 CMbuffer_to_return = fcd->read_buf;
1219 call_data_available = 1;
1220 break;
1221 }
1222 case msg_response:
1223 handle_response(svc, fcd, msg);
1224 break;
1225 case msg_request:
1226 handle_request(svc, fcd, msg);
1227 call_data_available = 1;
1228 CMbuffer_to_return = fcd->read_buf;
1229 break;
1230 default:
1231 printf("Bad message type %d\n", msg->type);
1232 }
1233 /* post the next receive before relinquishing control */
1234 ret = fi_recv(fcd->conn_ep, fcd->mapped_recv_buf, fcd->buffer_size, fi_mr_desc(fcd->read_mr), 0, fcd->mapped_recv_buf);
1235 if (ret)
1236 FT_PRINTERR("fi_recv", ret);
1237
1238 if (call_data_available) {
1239 trans->data_available(trans, conn);
1240 }
1241 if (CMbuffer_to_return) {
1242 svc->return_data_buffer(trans->cm, CMbuffer_to_return);
1243 }
1244 svc->trace_out(fcd->fabd->cm, "CMFABRIC data_available returning");
1245 }
1246
1247 static int server_connect(fabric_conn_data_ptr fcd);
1248
1249 /*
1250 * Accept socket connection
1251 */
1252 static void
fabric_accept_conn(void * void_trans,void * void_conn_sock)1253 fabric_accept_conn(void *void_trans, void *void_conn_sock)
1254 {
1255 transport_entry trans = (transport_entry) void_trans;
1256 fabric_client_data_ptr fabd = (fabric_client_data_ptr) trans->trans_data;
1257 CMtrans_services svc = fabd->svc;
1258 fabric_conn_data_ptr fcd;
1259 int fd, ret;
1260 struct sockaddr sock_addr;
1261 unsigned int sock_len = sizeof(sock_addr);
1262
1263 CMConnection conn;
1264 attr_list conn_attr_list = NULL;
1265
1266 //ib stuff
1267 fcd = create_fabric_conn_data(svc);
1268 fcd->fabd = fabd;
1269
1270 server_connect(fcd);
1271 //initialize the dataqp that will be used for all RC comms
1272
1273 conn_attr_list = create_attr_list();
1274 conn = svc->connection_create(trans, fcd, conn_attr_list);
1275 fcd->conn = conn;
1276
1277 sock_len = sizeof(sock_addr);
1278 memset(&sock_addr, 0, sock_len);
1279 // getsockname(sock, (struct sockaddr *) &sock_addr, &sock_len);
1280 // int_port_num = ntohs(((struct sockaddr_in *) &sock_addr)->sin_port);
1281 // add_attr(conn_attr_list, CM_THIS_CONN_PORT, Attr_Int4,
1282 // (attr_value) (long)int_port_num);
1283
1284 memset(&sock_addr, 0, sizeof(sock_addr));
1285 sock_len = sizeof(sock_addr);
1286 /* if (getpeername(sock, &sock_addr, &sock_len) == 0) { */
1287 /* int_port_num = ntohs(((struct sockaddr_in *) &sock_addr)->sin_port); */
1288 /* add_attr(conn_attr_list, CM_PEER_CONN_PORT, Attr_Int4, */
1289 /* (attr_value) (long)int_port_num); */
1290 /* fcd->remote_IP = ntohl(((struct sockaddr_in *) &sock_addr)->sin_addr.s_addr); */
1291 /* add_attr(conn_attr_list, CM_PEER_IP, Attr_Int4, */
1292 /* (attr_value) (long)fcd->remote_IP); */
1293 /* if (sock_addr.sa_family == AF_INET) { */
1294 /* struct hostent *host; */
1295 /* struct sockaddr_in *in_sock = (struct sockaddr_in *) &sock_addr; */
1296 /* host = gethostbyaddr((char *) &in_sock->sin_addr, */
1297 /* sizeof(struct in_addr), AF_INET); */
1298 /* if (host != NULL) { */
1299 /* fcd->remote_host = strdup(host->h_name); */
1300 /* add_attr(conn_attr_list, CM_PEER_HOSTNAME, Attr_String, */
1301 /* (attr_value) strdup(host->h_name)); */
1302 /* } */
1303 /* } */
1304 /* } */
1305 if (fcd->remote_host != NULL) {
1306 svc->trace_out(fabd->cm, "Accepted CMFABRIC socket connection from host \"%s\"",
1307 fcd->remote_host);
1308 } else {
1309 svc->trace_out(fabd->cm, "Accepted CMFABRIC socket connection from UNKNOWN host");
1310 }
1311
1312
1313 if ((ret = fi_control (&fcd->rcq->fid, FI_GETWAIT, (void *) &fd))) {
1314 FT_PRINTERR("fi_control(FI_GETWAIT)", ret);
1315 }
1316 add_attr(conn_attr_list, CM_FD, Attr_Int4,
1317 (attr_value) (long)fd);
1318
1319 svc->trace_out(fabd->cm, "Cmfabric Adding trans->data_available as action on fd %d", fd);
1320 svc->fd_add_select(fabd->cm, fd, (select_list_func) CMFABRIC_data_available,
1321 (void *) trans, (void *) conn);
1322
1323 svc->trace_out(fabd->cm, "Falling out of accept conn\n");
1324 free_attr_list(conn_attr_list);
1325 fcd->fd = fd;
1326 if ((ret = fi_control (&fcd->scq->fid, FI_GETWAIT, (void *) &fcd->sfd))) {
1327 FT_PRINTERR("fi_control(FI_GETWAIT)", ret);
1328 }
1329 }
1330
1331 /*
1332 * incoming event on CM eq
1333 */
1334 static void
fabric_service_incoming(void * void_trans,void * void_eq)1335 fabric_service_incoming(void *void_trans, void *void_eq)
1336 {
1337 transport_entry trans = (transport_entry) void_trans;
1338 fabric_client_data_ptr fabd = (fabric_client_data_ptr) trans->trans_data;
1339 struct fi_eq_cm_entry entry;
1340 uint32_t event;
1341 ssize_t rd;
1342
1343 rd = fi_eq_sread(fabd->cmeq, &event, &entry, sizeof entry, -1, FI_PEEK);
1344 if (rd != sizeof entry) {
1345 if (rd == -FI_EAVAIL) {
1346 struct fi_eq_err_entry error = {0};
1347 int rc = fi_eq_readerr(fabd->cmeq, &error, 0);
1348 if (rc) {
1349 char buf[1024];
1350 fprintf(stderr, "error event: %s\n", fi_eq_strerror(fabd->cmeq, error.prov_errno,
1351 error.err_data, buf, 1024));
1352 }
1353 } else {
1354 FT_PRINTERR("fi_eq_sread", rd);
1355 }
1356 return;
1357 }
1358
1359 if (event == FI_CONNREQ) {
1360 fabric_accept_conn(void_trans, void_eq);
1361 } else {
1362 rd = fi_eq_sread(fabd->cmeq, &event, &entry, sizeof entry, -1, 0);
1363 if (event == FI_SHUTDOWN){
1364 fabd->svc->trace_out(fabd->cm, "CMFABRIC got a shutdown event for some conn, who knows which one?\n");
1365 } else {
1366 printf("Unexpected event in service incoming,%s %d\n", fi_tostr(&event, FI_TYPE_EQ_EVENT), event);
1367 }
1368 }
1369 }
1370
1371 extern void
1372 libcmfabric_LTX_shutdown_conn(svc, fcd)
1373 CMtrans_services svc;
1374 fabric_conn_data_ptr fcd;
1375 {
1376 svc->trace_out(fcd->fabd->cm, "CMFABRIC shutdown_conn, removing select %d\n",
1377 fcd->fd);
1378 svc->fd_remove_select(fcd->fabd->cm, fcd->fd);
1379 close(fcd->fd);
1380 //free(fcd->remote_host);
1381 //free(fcd->read_buffer);
1382 if (fcd->last_write.iov) free(fcd->last_write.iov);
1383 free(fcd);
1384 }
1385
1386
client_connect(CManager cm,CMtrans_services svc,transport_entry trans,attr_list attrs,fabric_conn_data_ptr fcd)1387 static int client_connect(CManager cm, CMtrans_services svc, transport_entry trans, attr_list attrs, fabric_conn_data_ptr fcd)
1388 {
1389 fabric_client_data_ptr fabd = fcd->fabd;
1390 struct fi_eq_cm_entry entry;
1391 uint32_t event;
1392 struct fi_info *fi;
1393 ssize_t rd;
1394 int ret, int_port_num;
1395 struct in_addr dest_ip;
1396 char *host_name, *host_rep = NULL;
1397 char *dst_port = NULL;
1398 int i;
1399
1400 /* Get fabric info */
1401 if (!get_int_attr(attrs, CM_IP_ADDR,(int*) & dest_ip.s_addr)) {
1402 svc->trace_out(cm, "CMFABRIC transport found no IP_ADDR attribute");
1403 } else {
1404 host_rep = malloc(16);
1405 dest_ip.s_addr = htonl(dest_ip.s_addr);
1406 sprintf(host_rep, "%s", inet_ntoa(dest_ip));
1407
1408 }
1409 if (!get_int_attr(attrs, CM_IP_PORT, (int*) & int_port_num)) {
1410 svc->trace_out(cm, "CMFABRIC transport found no IP_PORT attribute");
1411 } else {
1412 dst_port = malloc(10);
1413 sprintf(dst_port, "%d", int_port_num);
1414 }
1415 svc->trace_out(fabd->cm, "Connecting to addr, %s, port %s\n", host_rep, dst_port);
1416 if (!get_string_attr(attrs, CM_IP_HOSTNAME, &host_name)) {
1417 svc->trace_out(cm, "CMFABRIC transport found no IP_HOSTNAME attribute");
1418 } else {
1419 host_rep = malloc(strlen(host_name));
1420 for (i = 0; i < (strlen(host_name)/2); i++) {
1421 sscanf(&host_name[i*2], "%2hhx", &host_rep[i]);
1422 }
1423 /* printf("name len is %d\n", (int)strlen(host_name)/2); */
1424 /* for(i = 0; i < strlen(host_name)/2; i++) { */
1425 /* printf("%02x", (unsigned char) host_rep[i]); */
1426 /* } */
1427 /* printf(" done\n"); */
1428 }
1429 ret = fi_getinfo(FT_FIVERSION, host_rep, dst_port, 0, fabd->hints, &fi);
1430 svc->trace_out(cm, "%s return value fi is %s\n", "client", fi_tostr(fi, FI_TYPE_INFO));
1431 if (ret) {
1432 FT_PRINTERR("fi_getinfo", ret);
1433 goto err0;
1434 }
1435
1436 /* Open fabric */
1437 ret = fi_fabric(fi->fabric_attr, &fabd->fab, NULL);
1438 if (ret) {
1439 FT_PRINTERR("fi_fabric", ret);
1440 goto err1;
1441 }
1442
1443 /* Open domain */
1444 ret = fi_domain(fabd->fab, fi, &fabd->dom, NULL);
1445 if (ret) {
1446 FT_PRINTERR("fi_domain", ret);
1447 goto err2;
1448 }
1449
1450 ret = alloc_cm_res(fabd);
1451 if (ret)
1452 goto err4;
1453
1454 ret = alloc_ep_res(fcd, fi);
1455 if (ret)
1456 goto err5;
1457
1458 ret = bind_ep_res(fcd);
1459 if (ret)
1460 goto err6;
1461
1462 /* Connect to server */
1463 ret = fi_connect(fcd->conn_ep, fi->dest_addr, NULL, 0);
1464 if (ret) {
1465 FT_PRINTERR("fi_connect", ret);
1466 goto err6;
1467 }
1468
1469 /* Wait for the connection to be established */
1470 rd = fi_eq_sread(fabd->cmeq, &event, &entry, sizeof entry, -1, 0);
1471 if (rd != sizeof entry) {
1472 if (ret == -FI_EAVAIL) {
1473 struct fi_eq_err_entry error = {0};
1474 int rc = fi_eq_readerr(fabd->cmeq, &error, 0);
1475 if (rc) {
1476 char buf[1024];
1477 fprintf(stderr, "error event: %s\n", fi_eq_strerror(fabd->cmeq, error.prov_errno,
1478 error.err_data, buf, 1024));
1479 }
1480 } else {
1481 FT_PRINTERR("fi_eq_sread", rd);
1482 }
1483 goto err6;
1484 }
1485
1486 if (event != FI_CONNECTED || entry.fid != &fcd->conn_ep->fid) {
1487 FT_ERR("Unexpected CM event %d fid %p (ep %p)\n", event, entry.fid, fcd->conn_ep);
1488 ret = -FI_EOTHER;
1489 goto err6;
1490 }
1491
1492 fi_freeinfo(fi);
1493 return 0;
1494
1495 err6:
1496 free_ep_res(fcd);
1497 err5:
1498 fi_close(&fabd->cmeq->fid);
1499 err4:
1500 fi_close(&fabd->dom->fid);
1501 err2:
1502 fi_close(&fabd->fab->fid);
1503 err1:
1504 fi_freeinfo(fi);
1505 err0:
1506 return ret;
1507 }
1508
1509 static int
initiate_conn(cm,svc,trans,attrs,fcd,conn_attr_list,no_more_redirect)1510 initiate_conn(cm, svc, trans, attrs, fcd, conn_attr_list, no_more_redirect)
1511 CManager cm;
1512 CMtrans_services svc;
1513 transport_entry trans;
1514 attr_list attrs;
1515 fabric_conn_data_ptr fcd;
1516 attr_list conn_attr_list;
1517 int no_more_redirect;
1518 {
1519 int int_port_num;
1520 fabric_client_data_ptr fabd = (fabric_client_data_ptr) trans->trans_data;
1521 char *host_name;
1522 int remote_IP = -1;
1523 static int host_ip = 0;
1524
1525 //fabric stuff
1526
1527 if (!query_attr(attrs, CM_IP_HOSTNAME, /* type pointer */ NULL,
1528 /* value pointer */ (attr_value *)(long) & host_name)) {
1529 svc->trace_out(cm, "CMFABRIC transport found no IP_HOST attribute");
1530 host_name = NULL;
1531 } else {
1532 svc->trace_out(cm, "CMFABRIC transport connect to host %s", host_name);
1533 }
1534 if (!query_attr(attrs, CM_IP_ADDR, /* type pointer */ NULL,
1535 /* value pointer */ (attr_value *)(long) & host_ip)) {
1536 svc->trace_out(cm, "CMFABRIC transport found no IP_ADDR attribute");
1537 /* wasn't there */
1538 host_ip = 0;
1539 } else {
1540 svc->trace_out(cm, "CMFABRIC transport connect to host_IP %lx", host_ip);
1541 }
1542
1543 if (!query_attr(attrs, CM_IP_PORT, /* type pointer */ NULL,
1544 /* value pointer */ (attr_value *)(long) & int_port_num)) {
1545 svc->trace_out(cm, "CMFABRIC transport found no IP_PORT attribute");
1546 // return -1;
1547 } else {
1548 svc->trace_out(cm, "CMFABRIC transport connect to port %d", int_port_num);
1549 }
1550
1551 client_connect(cm, svc, trans, attrs, fcd);
1552
1553
1554 //here we write out the connection port to the other side.
1555 //for sockets thats all thats required. For IB we can use this to exchange information about the
1556 //IB parameters for the other side
1557
1558 svc->trace_out(cm, "--> Connection established");
1559 fcd->remote_host = host_name == NULL ? NULL : strdup(host_name);
1560 fcd->remote_IP = remote_IP;
1561 fcd->remote_contact_port = int_port_num;
1562 fcd->fd = 0;
1563 fcd->fabd = fabd;
1564
1565 memset(&fcd->last_write, 0, sizeof(rinfo));
1566 fcd->infocount = 0;
1567
1568
1569
1570 add_attr(conn_attr_list, CM_THIS_CONN_PORT, Attr_Int4,
1571 (attr_value) (long)int_port_num);
1572 add_attr(conn_attr_list, CM_PEER_IP, Attr_Int4,
1573 (attr_value) (long)fcd->remote_IP);
1574 /* if (getpeername(sock, &sock_addr.s, &sock_len) == 0) {
1575 int_port_num = ntohs(((struct sockaddr_in *) &sock_addr)->sin_port);
1576 add_attr(conn_attr_list, CM_PEER_CONN_PORT, Attr_Int4,
1577 (attr_value) (long)int_port_num);
1578 if (sock_addr.s.sa_family == AF_INET) {
1579 struct hostent *host;
1580 struct sockaddr_in *in_sock = (struct sockaddr_in *) &sock_addr;
1581 host = gethostbyaddr((char *) &in_sock->sin_addr,
1582 sizeof(struct in_addr), AF_INET);
1583 if (host != NULL) {
1584 fcd->remote_host = strdup(host->h_name);
1585 add_attr(conn_attr_list, CM_PEER_HOSTNAME, Attr_String,
1586 (attr_value) strdup(host->h_name));
1587 }
1588 }
1589 }
1590 */
1591 svc->trace_out(fabd->cm, "Falling out of init conn\n");
1592 return 1;
1593 }
1594
1595 /*
1596 * Initiate a socket connection with another data exchange. If port_num is -1,
1597 * establish a unix socket connection (name_str stores the file name of
1598 * the waiting socket). Otherwise, establish an INET socket connection
1599 * (name_str stores the machine name).
1600 */
1601 extern CMConnection
1602 libcmfabric_LTX_initiate_conn(cm, svc, trans, attrs)
1603 CManager cm;
1604 CMtrans_services svc;
1605 transport_entry trans;
1606 attr_list attrs;
1607 {
1608 fabric_conn_data_ptr fcd = create_fabric_conn_data(svc);
1609 attr_list conn_attr_list = create_attr_list();
1610 CMConnection conn;
1611 int fd, ret;
1612
1613 fcd->fabd = trans->trans_data;
1614
1615 if (initiate_conn(cm, svc, trans, attrs, fcd, conn_attr_list, 0) < 0)
1616 return NULL;
1617
1618 add_attr(conn_attr_list, CM_PEER_LISTEN_PORT, Attr_Int4,
1619 (attr_value) (long)fcd->remote_contact_port);
1620 conn = svc->connection_create(trans, fcd, conn_attr_list);
1621 fcd->conn = conn;
1622
1623 if ((ret = fi_control (&fcd->rcq->fid, FI_GETWAIT, (void *) &fd))) {
1624 FT_PRINTERR("fi_control(FI_GETWAIT)", ret);
1625 }
1626 svc->trace_out(cm, "Cmfabric Adding trans->data_available as action on fd %d", fd);
1627 svc->fd_add_select(cm, fd, (select_list_func) CMFABRIC_data_available,
1628 (void *) trans, (void *) conn);
1629
1630 fcd->fd = fd;
1631 if ((ret = fi_control (&fcd->scq->fid, FI_GETWAIT, (void *) &fcd->sfd))) {
1632 FT_PRINTERR("fi_control(FI_GETWAIT)", ret);
1633 }
1634 return conn;
1635 }
1636
1637 /*
1638 * Check to see that if we were to attempt to initiate a connection as
1639 * indicated by the attribute list, would we be connecting to ourselves?
1640 * For sockets, this involves checking to see if the host name is the
1641 * same as ours and if the IP_PORT matches the one we are listening on.
1642 */
1643 extern int
libcmfabric_LTX_self_check(CManager cm,CMtrans_services svc,transport_entry trans,attr_list attrs)1644 libcmfabric_LTX_self_check(CManager cm, CMtrans_services svc, transport_entry trans, attr_list attrs)
1645 {
1646
1647 fabric_client_data_ptr fd = trans->trans_data;
1648 int host_addr;
1649 int int_port_num;
1650 char *host_name;
1651 char my_host_name[256];
1652 static int IP = 0;
1653
1654 get_IP_config(my_host_name, sizeof(host_name), &IP, NULL, NULL, NULL,
1655 NULL, svc->trace_out, (void *)cm);
1656
1657 if (IP == 0) {
1658 if (IP == 0) IP = INADDR_LOOPBACK;
1659 }
1660 if (!query_attr(attrs, CM_IP_HOSTNAME, /* type pointer */ NULL,
1661 /* value pointer */ (attr_value *)(long) & host_name)) {
1662 svc->trace_out(cm, "CMself check CMFABRIC transport found no IP_HOST attribute");
1663 host_name = NULL;
1664 }
1665 if (!query_attr(attrs, CM_IP_ADDR, /* type pointer */ NULL,
1666 /* value pointer */ (attr_value *)(long) & host_addr)) {
1667 svc->trace_out(cm, "CMself check CMFABRIC transport found no IP_ADDR attribute");
1668 if (host_name == NULL) return 0;
1669 host_addr = 0;
1670 }
1671 if (!query_attr(attrs, CM_IP_PORT, /* type pointer */ NULL,
1672 /* value pointer */ (attr_value *)(long) & int_port_num)) {
1673 svc->trace_out(cm, "CMself check CMFABRIC transport found no IP_PORT attribute");
1674 return 0;
1675 }
1676 if (host_name && (strcmp(host_name, my_host_name) != 0)) {
1677 svc->trace_out(cm, "CMself check - Hostnames don't match");
1678 return 0;
1679 }
1680 if (host_addr && (IP != host_addr)) {
1681 svc->trace_out(cm, "CMself check - Host IP addrs don't match, %lx, %lx", IP, host_addr);
1682 return 0;
1683 }
1684 if (int_port_num != fd->listen_port) {
1685 svc->trace_out(cm, "CMself check - Ports don't match, %d, %d", int_port_num, fd->listen_port);
1686 return 0;
1687 }
1688 svc->trace_out(cm, "CMself check returning TRUE");
1689 return 1;
1690 }
1691
1692 extern int
1693 libcmfabric_LTX_connection_eq(cm, svc, trans, attrs, fcd)
1694 CManager cm;
1695 CMtrans_services svc;
1696 transport_entry trans;
1697 attr_list attrs;
1698 fabric_conn_data_ptr fcd;
1699 {
1700
1701 int int_port_num;
1702 int requested_IP = -1;
1703 char *host_name = NULL;
1704
1705 if (!query_attr(attrs, CM_IP_HOSTNAME, /* type pointer */ NULL,
1706 /* value pointer */ (attr_value *)(long) & host_name)) {
1707 svc->trace_out(cm, "CMFABRIC transport found no IP_HOST attribute");
1708 }
1709 if (!query_attr(attrs, CM_IP_PORT, /* type pointer */ NULL,
1710 /* value pointer */ (attr_value *)(long) & int_port_num)) {
1711 svc->trace_out(cm, "Conn Eq CMFABRIC transport found no IP_PORT attribute");
1712 return 0;
1713 }
1714 if (!query_attr(attrs, CM_IP_ADDR, /* type pointer */ NULL,
1715 /* value pointer */ (attr_value *)(long) & requested_IP)) {
1716 svc->trace_out(cm, "CMFABRIC transport found no IP_ADDR attribute");
1717 }
1718 if (requested_IP == -1) {
1719 check_host(host_name, (void *) &requested_IP);
1720 requested_IP = ntohl(requested_IP);
1721 svc->trace_out(cm, "IP translation for hostname %s is %x", host_name,
1722 requested_IP);
1723 }
1724
1725 svc->trace_out(cm, "Socket Conn_eq comparing IP/ports %x/%d and %x/%d",
1726 fcd->remote_IP, fcd->remote_contact_port,
1727 requested_IP, int_port_num);
1728 if ((fcd->remote_IP == requested_IP) &&
1729 (fcd->remote_contact_port == int_port_num)) {
1730 svc->trace_out(cm, "Socket Conn_eq returning TRUE");
1731 return 1;
1732 }
1733 svc->trace_out(cm, "Socket Conn_eq returning FALSE");
1734 return 0;
1735 }
1736
1737
free_lres(fabric_client_data_ptr fd)1738 static void free_lres(fabric_client_data_ptr fd)
1739 {
1740 fi_close(&fd->cmeq->fid);
1741 }
1742
alloc_cm_res(fabric_client_data_ptr fd)1743 static int alloc_cm_res(fabric_client_data_ptr fd)
1744 {
1745 struct fi_eq_attr cm_attr;
1746 int ret;
1747
1748 memset(&cm_attr, 0, sizeof cm_attr);
1749 cm_attr.wait_obj = FI_WAIT_FD;
1750 ret = fi_eq_open(fd->fab, &cm_attr, &fd->cmeq, NULL);
1751 if (ret)
1752 FT_PRINTERR("fi_eq_open", ret);
1753
1754 return ret;
1755 }
1756
free_ep_res(fabric_conn_data_ptr fcd)1757 static void free_ep_res(fabric_conn_data_ptr fcd)
1758 {
1759 fi_close(&fcd->conn_ep->fid);
1760 fi_close(&fcd->send_mr->fid);
1761 fi_close(&fcd->read_mr->fid);
1762 fi_close(&fcd->rcq->fid);
1763 fi_close(&fcd->scq->fid);
1764 }
1765
alloc_ep_res(fabric_conn_data_ptr fcd,struct fi_info * fi)1766 static int alloc_ep_res(fabric_conn_data_ptr fcd, struct fi_info *fi)
1767 {
1768 fabric_client_data_ptr fabd = fcd->fabd;
1769 struct fi_cq_attr cq_attr;
1770 uint64_t access_mode;
1771 int ret;
1772
1773 fcd->buffer_size = PIGGYBACK;
1774
1775 fcd->read_buf = fabd->svc->get_data_buffer(fabd->cm, MAX(fcd->buffer_size, sizeof(uint64_t)));
1776 if (!fcd->read_buf) {
1777 perror("malloc");
1778 return -1;
1779 }
1780 fcd->max_credits = 512;
1781 memset(&cq_attr, 0, sizeof cq_attr);
1782 cq_attr.format = FI_CQ_FORMAT_DATA;
1783 cq_attr.size = fcd->max_credits << 1;
1784 if (fabd->send_waitset) {
1785 cq_attr.wait_obj = FI_WAIT_SET;
1786 cq_attr.wait_set = fabd->send_waitset;
1787 } else {
1788 cq_attr.wait_obj = FI_WAIT_FD;
1789 }
1790 ret = fi_cq_open(fabd->dom, &cq_attr, &fcd->scq, NULL);
1791 if (ret) {
1792 FT_PRINTERR("fi_cq_open, on fcd->scq", ret);
1793 goto err1;
1794 }
1795
1796 struct fi_cq_attr attrs;
1797 memset(&attrs, 0, sizeof(attrs));
1798 attrs.format = FI_CQ_FORMAT_DATA;
1799 attrs.wait_obj = FI_WAIT_FD;
1800 attrs.size = fcd->max_credits << 1;
1801 ret = fi_cq_open(fabd->dom, &cq_attr, &fcd->rcq, NULL);
1802 if (ret) {
1803 FT_PRINTERR("fi_cq_open", ret);
1804 goto err2;
1805 }
1806
1807 access_mode = FI_REMOTE_READ;
1808 access_mode |= FI_RECV | FI_READ | FI_WRITE | FI_REMOTE_WRITE;
1809 fcd->send_buf = malloc(MAX(fcd->buffer_size, sizeof(uint64_t)));
1810 fcd->mapped_recv_buf = malloc(MAX(fcd->buffer_size, sizeof(uint64_t)));
1811 if (!fcd->send_buf) {
1812 perror("malloc");
1813 return -1;
1814 }
1815 ret = fi_mr_reg(fabd->dom, fcd->mapped_recv_buf, MAX(fcd->buffer_size, sizeof(uint64_t)),
1816 access_mode, 0, 0, 0, &fcd->read_mr, NULL);
1817 if (ret) {
1818 FT_PRINTERR("fi_mr_reg", ret);
1819 goto err3;
1820 }
1821
1822 access_mode = FI_REMOTE_WRITE | FI_WRITE;
1823 printf("fi_mr_reg length %lu, send_buf %p\n", MAX(fcd->buffer_size, sizeof(uint64_t)), fcd->send_buf);
1824 ret = fi_mr_reg(fabd->dom, fcd->send_buf, MAX(fcd->buffer_size, sizeof(uint64_t)),
1825 access_mode, 0, 0, 0, &fcd->send_mr, NULL);
1826 if (ret) {
1827 FT_PRINTERR("fi_mr_reg", ret);
1828 goto err3;
1829 }
1830
1831 ret = fi_endpoint(fabd->dom, fi, &fcd->conn_ep, NULL);
1832 if (ret) {
1833 FT_PRINTERR("fi_endpoint", ret);
1834 goto err4;
1835 }
1836
1837 if (!fabd->cmeq) {
1838 ret = alloc_cm_res(fabd);
1839 if (ret)
1840 goto err4;
1841 }
1842
1843 return 0;
1844
1845 err4:
1846 fi_close(&fcd->read_mr->fid);
1847 fi_close(&fcd->send_mr->fid);
1848 err3:
1849 fi_close(&fcd->rcq->fid);
1850 err2:
1851 fi_close(&fcd->scq->fid);
1852 err1:
1853 free(fcd->send_buf);
1854 return ret;
1855 }
1856
bind_ep_res(fabric_conn_data_ptr fcd)1857 static int bind_ep_res(fabric_conn_data_ptr fcd)
1858 {
1859 int ret;
1860
1861 ret = fi_ep_bind(fcd->conn_ep, &fcd->fabd->cmeq->fid, 0);
1862 if (ret) {
1863 FT_PRINTERR("fi_ep_bind", ret);
1864 return ret;
1865 }
1866
1867 ret = fi_ep_bind(fcd->conn_ep, &fcd->scq->fid, FI_SEND);
1868 if (ret) {
1869 FT_PRINTERR("fi_ep_bind", ret);
1870 return ret;
1871 }
1872
1873 ret = fi_ep_bind(fcd->conn_ep, &fcd->rcq->fid, FI_RECV);
1874 if (ret) {
1875 FT_PRINTERR("fi_ep_bind", ret);
1876 return ret;
1877 }
1878
1879 ret = fi_enable(fcd->conn_ep);
1880 if (ret) {
1881 FT_PRINTERR("fi_enable", ret);
1882 return ret;
1883 }
1884
1885 /* Post the first recv buffer */
1886 ret = fi_recv(fcd->conn_ep, fcd->mapped_recv_buf, fcd->buffer_size, fi_mr_desc(fcd->read_mr), 0, fcd->mapped_recv_buf);
1887 if (ret)
1888 FT_PRINTERR("fi_recv", ret);
1889
1890 return ret;
1891 }
1892
1893
/*
 * Provider limits recorded from the fi_info selected in server_listen().
 * The layout and names appear to be adapted from MPICH's OFI netmod global
 * state; only the fields listed here are kept.
 *
 * The instance has internal linkage (static).  As an external symbol named
 * MPIDI_Global it could collide with MPICH's own global of the same name
 * when this transport is loaded into an MPI program.
 */
typedef struct {
    /* Queryable limits */
    uint64_t max_buffered_send;
    uint64_t max_buffered_write;
    uint64_t max_send;
    uint64_t max_write;
    uint64_t max_short_send;
    uint64_t max_mr_key_size;
    int max_windows_bits;
    int max_huge_rma_bits;
    int max_huge_rmas;
    int huge_rma_shift;
    int context_shift;
    size_t iov_limit;
    size_t rma_iov_limit;
} MPIDI_OFI_global_t;

static MPIDI_OFI_global_t MPIDI_Global;
1957
server_listen(fabric_client_data_ptr fd,attr_list listen_info)1958 static int server_listen(fabric_client_data_ptr fd, attr_list listen_info)
1959 {
1960 struct fi_info *fi, *prov_use;
1961 CMtrans_services svc = fd->svc;
1962 int ret;
1963 int attr_port_num;
1964 char *port_str = NULL;
1965
1966 // ret = fi_getinfo(FT_FIVERSION, fd->opts.src_addr, fd->opts.src_port, FI_SOURCE,
1967 // fd->hints, &fi);
1968 if (listen_info != NULL
1969 && !get_int_attr(listen_info, CM_IP_PORT, &attr_port_num)) {
1970 attr_port_num = 0;
1971 } else {
1972 if (attr_port_num > USHRT_MAX || attr_port_num < 0) {
1973 fprintf(stderr, "Requested port number %d is invalid\n", attr_port_num);
1974 return 1;
1975 }
1976 port_str = malloc(10);
1977 sprintf(port_str, "%d", attr_port_num);
1978 }
1979
1980 ret = fi_getinfo(FT_FIVERSION, NULL, port_str, FI_SOURCE, fd->hints, &fi);
1981
1982 if (((struct sockaddr_in *)fi->src_addr)->sin_addr.s_addr == htonl(INADDR_LOOPBACK) &&
1983 (strcmp(fi->fabric_attr->prov_name, "verbs") == 0)) {
1984 char host_name[256];
1985 fi_freeinfo(fi);
1986
1987 if (listen_info) {
1988 listen_info = attr_copy_list(listen_info);
1989 } else {
1990 listen_info = create_attr_list();
1991 }
1992 set_string_attr(listen_info, CM_IP_INTERFACE, strdup("ib"));
1993
1994 svc->trace_out(fd->cm, "CMFabric begin listen, requested port %d", attr_port_num);
1995 get_IP_config(host_name, sizeof(host_name), NULL, NULL, NULL,
1996 NULL, listen_info, svc->trace_out, (void *)fd->cm);
1997 ret = fi_getinfo(FT_FIVERSION, host_name, port_str, FI_SOURCE, fd->hints, &fi);
1998 svc->trace_out(fd->cm, "%s return value fi is %s\n", "server", fi_tostr(fi, FI_TYPE_INFO));
1999 fd->hostname = strdup(host_name);
2000 free_attr_list(listen_info);
2001 } else {
2002 svc->trace_out(fd->cm, "%s return value fi is %s\n", "server", fi_tostr(fi, FI_TYPE_INFO));
2003 }
2004
2005 prov_use = fi;
2006 MPIDI_Global.max_buffered_send = prov_use->tx_attr->inject_size;
2007 MPIDI_Global.max_buffered_write = prov_use->tx_attr->inject_size;
2008 MPIDI_Global.max_send = prov_use->ep_attr->max_msg_size;
2009 MPIDI_Global.max_write = prov_use->ep_attr->max_msg_size;
2010 svc->trace_out(fd->cm, "Max send is %ld, max write is %ld\n", MPIDI_Global.max_send, MPIDI_Global.max_write);
2011 /* MPIDI_Global.iov_limit = MIN(prov_use->tx_attr->iov_limit,MPIDI_OFI_IOV_MAX); */
2012 /* MPIDI_Global.rma_iov_limit = MIN(prov_use->tx_attr->rma_iov_limit,MPIDI_OFI_IOV_MAX); */
2013 MPIDI_Global.max_mr_key_size = prov_use->domain_attr->mr_key_size;
2014
2015 /* if(MPIDI_Global.max_mr_key_size >= 8) { */
2016 /* MPIDI_Global.max_windows_bits = MPIDI_OFI_MAX_WINDOWS_BITS_64; */
2017 /* MPIDI_Global.max_huge_rma_bits = MPIDI_OFI_MAX_HUGE_RMA_BITS_64; */
2018 /* MPIDI_Global.max_huge_rmas = MPIDI_OFI_MAX_HUGE_RMAS_64; */
2019 /* MPIDI_Global.huge_rma_shift = MPIDI_OFI_HUGE_RMA_SHIFT_64; */
2020 /* MPIDI_Global.context_shift = MPIDI_OFI_CONTEXT_SHIFT_64; */
2021 /* } else if(MPIDI_Global.max_mr_key_size >= 4) { */
2022 /* MPIDI_Global.max_windows_bits = MPIDI_OFI_MAX_WINDOWS_BITS_32; */
2023 /* MPIDI_Global.max_huge_rma_bits = MPIDI_OFI_MAX_HUGE_RMA_BITS_32; */
2024 /* MPIDI_Global.max_huge_rmas = MPIDI_OFI_MAX_HUGE_RMAS_32; */
2025 /* MPIDI_Global.huge_rma_shift = MPIDI_OFI_HUGE_RMA_SHIFT_32; */
2026 /* MPIDI_Global.context_shift = MPIDI_OFI_CONTEXT_SHIFT_32; */
2027 /* } else if(MPIDI_Global.max_mr_key_size >= 2) { */
2028 /* MPIDI_Global.max_windows_bits = MPIDI_OFI_MAX_WINDOWS_BITS_16; */
2029 /* MPIDI_Global.max_huge_rma_bits = MPIDI_OFI_MAX_HUGE_RMA_BITS_16; */
2030 /* MPIDI_Global.max_huge_rmas = MPIDI_OFI_MAX_HUGE_RMAS_16; */
2031 /* MPIDI_Global.huge_rma_shift = MPIDI_OFI_HUGE_RMA_SHIFT_16; */
2032 /* MPIDI_Global.context_shift = MPIDI_OFI_CONTEXT_SHIFT_16; */
2033 /* } else { */
2034 /* MPIR_ERR_SETFATALANDJUMP4(mpi_errno, */
2035 /* MPI_ERR_OTHER, */
2036 /* "**ofid_rma_init", */
2037 /* "**ofid_rma_init %s %d %s %s", */
2038 /* __SHORT_FILE__, */
2039 /* __LINE__, */
2040 /* FCNAME, */
2041 /* "Key space too small"); */
2042 /* } */
2043 if (ret) {
2044 FT_PRINTERR("fi_getinfo", ret);
2045 return ret;
2046 }
2047
2048 ret = fi_fabric(fi->fabric_attr, &fd->fab, NULL);
2049
2050 if (ret) {
2051 FT_PRINTERR("fi_fabric", ret);
2052 goto err0;
2053 }
2054
2055 struct fi_wait_attr attrs;
2056 attrs.wait_obj = FI_WAIT_FD;
2057 attrs.flags = 0;
2058 ret = fi_wait_open(fd->fab, &attrs, &fd->send_waitset);
2059 if (ret) {
2060 /* sigh */
2061 fd->send_waitset = NULL;
2062 }
2063
2064 ret = fi_passive_ep(fd->fab, fi, &fd->listen_ep, NULL);
2065 if (ret) {
2066 FT_PRINTERR("fi_passive_ep", ret);
2067 goto err1;
2068 }
2069
2070 ret = alloc_cm_res(fd);
2071 if (ret)
2072 goto err2;
2073
2074 ret = fi_pep_bind(fd->listen_ep, &fd->cmeq->fid, 0);
2075 if (ret) {
2076 FT_PRINTERR("fi_pep_bind", ret);
2077 goto err3;
2078 }
2079
2080
2081 ret = fi_listen(fd->listen_ep);
2082 if (ret) {
2083 FT_PRINTERR("fi_listen", ret);
2084 goto err3;
2085 }
2086
2087 fi_freeinfo(fi);
2088 return 0;
2089 err3:
2090 free_lres(fd);
2091 err2:
2092 fi_close(&fd->listen_ep->fid);
2093 err1:
2094 fi_close(&fd->fab->fid);
2095 err0:
2096 fi_freeinfo(fi);
2097 return ret;
2098 }
2099
server_connect(fabric_conn_data_ptr fcd)2100 static int server_connect(fabric_conn_data_ptr fcd)
2101 {
2102 struct fi_eq_cm_entry entry;
2103 uint32_t event;
2104 struct fi_info *info = NULL;
2105 ssize_t rd;
2106 int ret;
2107 fabric_client_data_ptr fabd = fcd->fabd;
2108
2109 rd = fi_eq_sread(fabd->cmeq, &event, &entry, sizeof entry, -1, 0);
2110 if (rd != sizeof entry) {
2111 if (rd == -FI_EAVAIL) {
2112 struct fi_eq_err_entry error = {0};
2113 int rc = fi_eq_readerr(fabd->cmeq, &error, 0);
2114 if (rc) {
2115 char buf[1024];
2116 fprintf(stderr, "error event: %s\n", fi_eq_strerror(fabd->cmeq, error.prov_errno,
2117 error.err_data, buf, 1024));
2118 }
2119 } else {
2120 FT_PRINTERR("fi_eq_sread", rd);
2121 }
2122 return (int) rd;
2123 }
2124
2125 info = entry.info;
2126 if (event != FI_CONNREQ) {
2127 fprintf(stderr, "Unexpected CM event %d\n", event);
2128 ret = -FI_EOTHER;
2129 goto err1;
2130 }
2131
2132 ret = fi_domain(fabd->fab, info, &fabd->dom, NULL);
2133 if (ret) {
2134 FT_PRINTERR("fi_domain", ret);
2135 goto err1;
2136 }
2137
2138
2139 ret = fi_endpoint(fabd->dom, info, &fcd->conn_ep, NULL);
2140 if (ret) {
2141 FT_PRINTERR("fi_endpoint", -ret);
2142 goto err1;
2143 }
2144
2145 ret = alloc_ep_res(fcd, info);
2146 if (ret)
2147 goto err1;
2148
2149 ret = bind_ep_res(fcd);
2150 if (ret)
2151 goto err3;
2152
2153 ret = fi_accept(fcd->conn_ep, NULL, 0);
2154 if (ret) {
2155 FT_PRINTERR("fi_accept", ret);
2156 goto err3;
2157 }
2158
2159 rd = fi_eq_sread(fabd->cmeq, &event, &entry, sizeof entry, -1, 0);
2160 if (rd != sizeof entry) {
2161 if (ret == -FI_EAVAIL) {
2162 struct fi_eq_err_entry error = {0};
2163 int rc = fi_eq_readerr(fabd->cmeq, &error, 0);
2164 if (rc) {
2165 char buf[1024];
2166 fprintf(stderr, "error event: %s\n", fi_eq_strerror(fabd->cmeq, error.prov_errno,
2167 error.err_data, buf, 1024));
2168 }
2169 } else {
2170 FT_PRINTERR("fi_eq_sread", rd);
2171 }
2172 goto err3;
2173 }
2174
2175 if (event != FI_CONNECTED || entry.fid != &fcd->conn_ep->fid) {
2176 fprintf(stderr, "Unexpected CM event %d fid %p (ep %p)\n",
2177 event, entry.fid, fcd->conn_ep);
2178 ret = -FI_EOTHER;
2179 goto err3;
2180 }
2181
2182 fi_freeinfo(info);
2183 return 0;
2184
2185 err3:
2186 free_ep_res(fcd);
2187 err1:
2188 fi_reject(fabd->listen_ep, info->handle, NULL, 0);
2189 fi_freeinfo(info);
2190 return ret;
2191 }
2192
2193 /*
2194 * Create an passive endpoint to listen for connections
2195 */
2196 extern attr_list
libcmfabric_LTX_non_blocking_listen(CManager cm,CMtrans_services svc,transport_entry trans,attr_list listen_info)2197 libcmfabric_LTX_non_blocking_listen(CManager cm, CMtrans_services svc, transport_entry trans, attr_list listen_info)
2198 {
2199 fabric_client_data_ptr fd = trans->trans_data;
2200 int wait_sock = -1;
2201 int ret, IP, port_num;
2202 size_t addrlen;
2203 struct sockaddr_in local_addr;
2204
2205 if (cm) {
2206 /* assert CM is locked */
2207 assert(CM_LOCKED(svc, cm));
2208 }
2209 int result = server_listen(fd, listen_info);
2210 if (result != 0) {
2211 fprintf(stderr, "Cannot bind INET socket\n");
2212 return NULL;
2213 }
2214
2215 addrlen = sizeof(local_addr);
2216 ret = fi_getname(&fd->listen_ep->fid, (void*)&local_addr, &addrlen);
2217 IP = ntohl(local_addr.sin_addr.s_addr);
2218 port_num = ntohs(local_addr.sin_port);
2219 if (ret) {
2220 FT_PRINTERR("fi_getname", ret);
2221 return NULL;
2222 }
2223
2224 ret = fi_control (&fd->cmeq->fid, FI_GETWAIT, (void *) &wait_sock);
2225 if (ret) {
2226 FT_PRINTERR("fi_control(FI_GETWAIT)", ret);
2227 } else {
2228 svc->trace_out(cm, "Cmfabric Adding fabric_service_incoming as action on fd %d", wait_sock);
2229 svc->fd_add_select(cm, wait_sock, fabric_service_incoming,
2230 (void *) trans, (void *) fd->listen_ep);
2231 }
2232 {
2233 attr_list ret_list;
2234
2235 svc->trace_out(cm, "CMFABRIC listen succeeded on port %d, fd %d",
2236 port_num, wait_sock);
2237 ret_list = create_attr_list();
2238
2239 fd->listen_port = port_num;
2240 add_attr(ret_list, CM_TRANSPORT, Attr_String,
2241 (attr_value) strdup("fabric"));
2242 if ((getenv("CMFabricUseHostname") != NULL) ||
2243 (getenv("CM_NETWORK") != NULL)) {
2244 add_attr(ret_list, CM_IP_HOSTNAME, Attr_String,
2245 (attr_value) strdup(fd->hostname));
2246 } else if (IP == 0) {
2247 add_attr(ret_list, CM_IP_ADDR, Attr_Int4,
2248 (attr_value)INADDR_LOOPBACK);
2249 } else {
2250 add_int_attr(ret_list, CM_IP_ADDR, (int)IP);
2251 }
2252 add_attr(ret_list, CM_IP_PORT, Attr_Int4,
2253 (attr_value) (long)port_num);
2254
2255 return ret_list;
2256 }
2257 }
2258
/*
 * Fallback struct iovec for builds with HAVE_WINDOWS_H (presumably lacking
 * a system <sys/uio.h> definition), or whenever NEED_IOVEC_DEFINE is set.
 * NOTE(review): iov_len is int here rather than POSIX size_t — confirm the
 * length arithmetic in the writev paths is fine with that.
 */
#if defined(HAVE_WINDOWS_H) && !defined(NEED_IOVEC_DEFINE)
#define NEED_IOVEC_DEFINE
#endif

#ifdef NEED_IOVEC_DEFINE
struct iovec {
    void *iov_base;     /* start of the buffer */
    int iov_len;        /* number of bytes in the buffer */
};

#endif
2270
2271 extern void
2272 libcmfabric_LTX_set_write_notify(trans, svc, fcd, enable)
2273 transport_entry trans;
2274 CMtrans_services svc;
2275 fabric_conn_data_ptr fcd;
2276 int enable;
2277 {
2278 if (enable != 0) {
2279 svc->fd_write_select(trans->cm, fcd->fd, (select_list_func) trans->write_possible,
2280 (void *)trans, (void *) fcd->conn);
2281 } else {
2282 /* remove entry */
2283 svc->fd_write_select(trans->cm, fcd->fd, NULL, NULL, NULL);
2284 }
2285 }
2286
2287
2288 extern CMbuffer
libcmfabric_LTX_read_block_func(CMtrans_services svc,fabric_conn_data_ptr fcd,int * len_ptr,int * offset_ptr)2289 libcmfabric_LTX_read_block_func(CMtrans_services svc, fabric_conn_data_ptr fcd, int *len_ptr, int *offset_ptr)
2290 {
2291 *len_ptr = fcd->read_buffer_len;
2292 if (fcd->read_buf) {
2293 CMbuffer tmp = fcd->read_buf;
2294 fcd->read_buf = NULL;
2295 fcd->read_buffer_len = 0;
2296 if (offset_ptr) *offset_ptr = fcd->read_offset;
2297 return tmp;
2298 }
2299 return NULL;
2300 }
2301
2302
/* Fallback for platform headers that do not provide IOV_MAX. */
#ifndef IOV_MAX
/* this is not defined in some places where it should be. Conservative. */
#define IOV_MAX 16
#endif
2307
2308 extern int
libcmfabric_LTX_writev_complete_notify_func(CMtrans_services svc,fabric_conn_data_ptr fcd,void * iovs,int iovcnt,attr_list attrs,CMcompletion_notify_func notify_func,void * notify_client_data)2309 libcmfabric_LTX_writev_complete_notify_func(CMtrans_services svc,
2310 fabric_conn_data_ptr fcd,
2311 void *iovs,
2312 int iovcnt,
2313 attr_list attrs,
2314 CMcompletion_notify_func notify_func,
2315 void *notify_client_data)
2316 {
2317 int fd = fcd->fd;
2318 int left = 0;
2319 int iget = 0;
2320 int i;
2321 int can_reuse_mapping = 0;
2322 struct iovec * iov = (struct iovec*) iovs, *tmp_iov;
2323 rinfo *last_write_request = &fcd->last_write;
2324 struct control_message msg;
2325
2326 for (i = 0; i < iovcnt; i++) {
2327 left += iov[i].iov_len;
2328 }
2329
2330 svc->trace_out(fcd->fabd->cm, "CMFABRIC writev of %d bytes on fd %d",
2331 left, fd);
2332
2333 if (left + msg_offset() < PIGGYBACK)
2334 {
2335 //total size is less than the piggyback size
2336 iget = internal_write_piggyback(svc, fcd, left, iov, iovcnt);
2337 if (notify_func) {
2338 (notify_func)(notify_client_data);
2339 }
2340 if(iget < 0)
2341 {
2342 svc->trace_out(fcd->fabd->cm, "CMFABRIC error in writing piggyback");
2343 return -1;
2344 }
2345 if(iget == 0)
2346 {
2347 return iovcnt;
2348 }
2349 return -1;
2350 }
2351
2352 svc->set_pending_write(fcd->conn);
2353
2354 if (notify_func) {
2355 can_reuse_mapping = 1;
2356 /* OK, we're not going to copy the data */
2357 if (last_write_request->iovcnt == iovcnt) {
2358 int i;
2359 for(i=0; i < last_write_request->iovcnt; i++) {
2360 if ((iov[i].iov_len != last_write_request->iov[i].iov_len) ||
2361 (iov[i].iov_base != last_write_request->iov[i].iov_base)) {
2362 can_reuse_mapping = 0;
2363 svc->trace_out(fcd->fabd->cm, "CMFABRIC already mapped data, doesn't match write, buf %d, %p vs. %p, %d vs. %d",
2364 i, iov[i].iov_base, last_write_request->iov[i].iov_base, iov[i].iov_len, last_write_request->iov[i].iov_len);
2365 break;
2366 }
2367 }
2368 } else {
2369 svc->trace_out(fcd->fabd->cm, "CMFABRIC either no already mapped data, or wrong buffer count");
2370 can_reuse_mapping = 0;
2371 }
2372 } else {
2373 svc->trace_out(fcd->fabd->cm, "CMFABRIC User-owned data with no notify, so no reuse\n");
2374 }
2375 #ifndef DO_DEREG_ON_FINISH
2376 if (last_write_request->iovcnt && !(can_reuse_mapping)) {
2377 for(i = 0; i < last_write_request->iovcnt; i ++)
2378 {
2379 fi_close(&last_write_request->mrlist[i]->fid);
2380 last_write_request->mrlist[i] = NULL;
2381 }
2382
2383 free(last_write_request->mrlist);
2384 free(last_write_request->iov);
2385 last_write_request->iov = NULL;
2386 last_write_request->iov = NULL;
2387 last_write_request->mrlist = NULL;
2388 last_write_request->iovcnt = 0;
2389 }
2390 #endif
2391 if (notify_func == NULL) {
2392 /*
2393 * Semantics are that *MUST* be done with the data when we return,
2394 * so, for now, copy all data
2395 */
2396 tmp_iov = malloc(sizeof(tmp_iov[0]) * iovcnt);
2397 for (i = 0; i < iovcnt; i++) {
2398 tmp_iov[i].iov_len = iov[i].iov_len;
2399 tmp_iov[i].iov_base = malloc(iov[i].iov_len);
2400 memcpy(tmp_iov[i].iov_base, iov[i].iov_base, iov[i].iov_len);
2401 }
2402 } else {
2403 /*
2404 * Cool. The app doesn't need the data back right away.
2405 * We can keep it and tell the upper levels when we're done.
2406 * Don't copy. The reply message will trigger the notification.
2407 */
2408 tmp_iov = iov;
2409 }
2410 memset(&msg, 0, sizeof(msg));
2411
2412
2413 svc->trace_out(fcd->fabd->cm, "Can reuse mapping is %d\n", can_reuse_mapping);
2414 if (!can_reuse_mapping) {
2415 int i;
2416 fcd->last_write.iovcnt = iovcnt;
2417 if (fcd->last_write.iov) free(fcd->last_write.iov);
2418 fcd->last_write.iov = malloc(sizeof(tmp_iov[0]) * iovcnt);
2419 memcpy(fcd->last_write.iov, tmp_iov, sizeof(tmp_iov[0]) * iovcnt);
2420
2421 fcd->last_write.mrlist = malloc(iovcnt * sizeof(fcd->last_write.mrlist[0]));
2422 for (i=0; i < iovcnt; i++) {
2423 int ret;
2424 svc->trace_out(fcd->fabd->cm, "fi_mr_reg %d, addr %p, len %ld, with attrs REMOTE_READ,REMOTE_WRITE, SEND, RECV)\n", i, tmp_iov[i].iov_base, tmp_iov[i].iov_len);
2425 ret = fi_mr_reg(fcd->fabd->dom, tmp_iov[i].iov_base, tmp_iov[i].iov_len,
2426 FI_REMOTE_READ | FI_REMOTE_WRITE | FI_SEND | FI_RECV, 0, 0, 0, &fcd->last_write.mrlist[i], NULL);
2427 if (ret) {
2428 FT_PRINTERR("fi_mr_reg of elements in bulk write", ret);
2429 return -1;
2430 }
2431
2432 }
2433 }
2434 fcd->last_write.notify_func = notify_func;
2435 fcd->last_write.notify_client_data = notify_client_data;
2436
2437 if(fcd->last_write.mrlist == NULL)
2438 {
2439 svc->trace_out(fcd->fabd->cm, "CMFABRIC writev error in registereing memory");
2440 return -1;
2441 }
2442
2443 svc->trace_out(fcd->fabd->cm, "FIrst 16 bytes of send buffer (len %d) are %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x\n", left, ((unsigned char*)iov[0].iov_base)[0], ((unsigned char*)iov[0].iov_base)[1], ((unsigned char*)iov[0].iov_base)[2], ((unsigned char*)iov[0].iov_base)[3], ((unsigned char*)iov[0].iov_base)[4], ((unsigned char*)iov[0].iov_base)[5], ((unsigned char*)iov[0].iov_base)[6], ((unsigned char*)iov[0].iov_base)[7], ((unsigned char*)iov[0].iov_base)[8], ((unsigned char*)iov[0].iov_base)[9], ((unsigned char*)iov[0].iov_base)[10], ((unsigned char*)iov[0].iov_base)[11], ((unsigned char*)iov[0].iov_base)[12], ((unsigned char*)iov[0].iov_base)[13], ((unsigned char*)iov[0].iov_base)[14], ((unsigned char*)iov[0].iov_base)[15]);
2444 iget = internal_write_request(svc, fcd, left, &fcd->last_write);
2445 if(iget < 0)
2446 {
2447 svc->trace_out(fcd->fabd->cm, "CMFABRIC error in writing request");
2448 return -1;
2449 }
2450
2451 return iovcnt;
2452 }
2453
2454 extern int
2455 libcmfabric_LTX_writev_func(svc, fcd, iovs, iovcnt, attrs)
2456 CMtrans_services svc;
2457 fabric_conn_data_ptr fcd;
2458 void *iovs;
2459 int iovcnt;
2460 attr_list attrs;
2461 {
2462 return libcmfabric_LTX_writev_complete_notify_func(svc, fcd, iovs, iovcnt,
2463 attrs, NULL, NULL);
2464 }
2465
2466 static void
free_fabric_data(CManager cm,void * fdv)2467 free_fabric_data(CManager cm, void *fdv)
2468 {
2469 fabric_client_data_ptr fd = (fabric_client_data_ptr) fdv;
2470 CMtrans_services svc = fd->svc;
2471 kill_thread(fd);
2472 if (fd->hostname != NULL)
2473 svc->free_func(fd->hostname);
2474 svc->free_func(fd);
2475 }
2476
2477 static void
check_completed_pull(CManager cm,fabric_client_data_ptr fabd)2478 check_completed_pull(CManager cm, fabric_client_data_ptr fabd)
2479 {
2480 msg_item_p msg;
2481 fabric_conn_data_ptr fcd;
2482 CMtrans_services svc = fabd->svc;
2483 if (!fabd->completed_queue) return;
2484
2485 pthread_mutex_lock(&fabd->pull_queue_mutex);
2486 msg = fabd->completed_queue;
2487 fabd->completed_queue = msg->next;
2488 pthread_mutex_unlock(&fabd->pull_queue_mutex);
2489
2490 svc->acquire_CM_lock(fabd->cm, __FILE__, __LINE__);
2491 /* OK, msg is ours alone */
2492 fcd = msg->conn_data;
2493 free(msg->pulls);
2494 CMbuffer cb = svc->create_data_and_link_buffer(fcd->fabd->cm,
2495 msg->buffer,
2496 msg->message_size);
2497 cb->return_callback = free_func;
2498 cb->return_callback_data = (void*)msg->mr_rec;
2499 fcd->read_buf = cb;
2500 svc->trace_out(fcd->fabd->cm, "FIrst 16 bytes of receive buffer (len %d) are %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x \n", msg->message_size, ((unsigned char*)cb->buffer)[0], ((unsigned char*)cb->buffer)[1], ((unsigned char*)cb->buffer)[2], ((unsigned char*)cb->buffer)[3], ((unsigned char*)cb->buffer)[4], ((unsigned char*)cb->buffer)[5], ((unsigned char*)cb->buffer)[6], ((unsigned char*)cb->buffer)[7], ((unsigned char*)cb->buffer)[8], ((unsigned char*)cb->buffer)[9], ((unsigned char*)cb->buffer)[10], ((unsigned char*)cb->buffer)[11], ((unsigned char*)cb->buffer)[12], ((unsigned char*)cb->buffer)[13], ((unsigned char*)cb->buffer)[14], ((unsigned char*)cb->buffer)[15]);
2501 fcd->read_buffer_len = msg->message_size;
2502 svc->trace_out(fcd->fabd->cm, "CMFABRIC handle_request completed");
2503 internal_write_response(svc, fcd, msg->message_size, msg->request_ID);
2504 fabd->trans->data_available(fabd->trans, fcd->conn);
2505 svc->return_data_buffer(fabd->trans->cm, cb);
2506 svc->drop_CM_lock(fabd->cm, __FILE__, __LINE__);
2507
2508 }
2509
2510 extern
libcmfabric_LTX_install_pull_schedule(CMtrans_services svc,transport_entry trans,struct timeval * base_time,struct timeval * period,CMavail_period_ptr avail)2511 void libcmfabric_LTX_install_pull_schedule(CMtrans_services svc,
2512 transport_entry trans,
2513 struct timeval *base_time,
2514 struct timeval *period,
2515 CMavail_period_ptr avail)
2516 {
2517 fabric_client_data_ptr fabd = trans->trans_data;
2518 fabd->pull_schedule_base = *base_time;
2519 fabd->pull_schedule_period = *period;
2520 CMavail_period_ptr tmp = fabd->avail;
2521 fabd->avail = avail;
2522 free(tmp);
2523 if (!fabd->thread_init) {
2524 svc->trace_out(fabd->cm, "Starting pull thread!\n");
2525 pthread_mutex_init(&fabd->pull_queue_mutex, NULL);
2526 fabd->thread_should_run = 1;
2527 pthread_t thr;
2528 if (!fabd->send_waitset) {
2529 /* no waitset support */
2530 int filedes[2];
2531 if (pipe(filedes) != 0) {
2532 perror("Pipe for wake not created. Wake mechanism inoperative.");
2533 return;
2534 }
2535 fabd->wake_read_fd = filedes[0];
2536 fabd->wake_write_fd = filedes[1];
2537 fabd->nfds = fabd->wake_read_fd;
2538 FD_SET(fabd->wake_read_fd, &fabd->readset);
2539 fabd->fcd_array_by_sfd = malloc(sizeof(char*));
2540 fabd->cq_array = malloc(sizeof(char*));
2541 }
2542 svc->add_poll(fabd->cm, (CMPollFunc) check_completed_pull, fabd);
2543 pthread_create(&thr, NULL, (void*(*)(void*))&pull_thread, fabd);
2544 fabd->thread_init = 1;
2545 }
2546 }
2547
2548 extern void *
libcmfabric_LTX_initialize(CManager cm,CMtrans_services svc)2549 libcmfabric_LTX_initialize(CManager cm, CMtrans_services svc)
2550 {
2551 static int atom_init = 0;
2552
2553 fabric_client_data_ptr fabd;
2554 svc->trace_out(cm, "Initialize CM fabric transport built in %s\n",
2555 EVPATH_MODULE_BUILD_DIR);
2556 if (atom_init == 0) {
2557 CM_IP_HOSTNAME = attr_atom_from_string("IP_HOST");
2558 CM_IP_PORT = attr_atom_from_string("IP_PORT");
2559 CM_IP_ADDR = attr_atom_from_string("IP_ADDR");
2560 CM_IP_INTERFACE = attr_atom_from_string("IP_INTERFACE");
2561 CM_FD = attr_atom_from_string("CONNECTION_FILE_DESCRIPTOR");
2562 CM_THIS_CONN_PORT = attr_atom_from_string("THIS_CONN_PORT");
2563 CM_PEER_CONN_PORT = attr_atom_from_string("PEER_CONN_PORT");
2564 CM_PEER_IP = attr_atom_from_string("PEER_IP");
2565 CM_PEER_HOSTNAME = attr_atom_from_string("PEER_HOSTNAME");
2566 CM_PEER_LISTEN_PORT = attr_atom_from_string("PEER_LISTEN_PORT");
2567 CM_NETWORK_POSTFIX = attr_atom_from_string("CM_NETWORK_POSTFIX");
2568 CM_TRANSPORT = attr_atom_from_string("CM_TRANSPORT");
2569 atom_init++;
2570 }
2571 fabd = svc->malloc_func(sizeof(struct fabric_client_data));
2572 fabd->svc = svc;
2573 memset(fabd, 0, sizeof(struct fabric_client_data));
2574 fabd->cm = cm;
2575 fabd->hostname = NULL;
2576 fabd->listen_port = -1;
2577 fabd->svc = svc;
2578 fabd->port = 1; //need to somehow get proper port here
2579
2580 fabd->psn = lrand48()%256;
2581
2582 fabd->hints = fi_allocinfo();
2583
2584 // fabd->hints->cap = FI_CONTEXT;
2585 fabd->hints->ep_attr->type = FI_EP_MSG;
2586 fabd->hints->caps = FI_MSG | FI_RMA;
2587 // fabd->hints->caps = FI_MSG;
2588 fabd->hints->mode = FI_CONTEXT | FI_LOCAL_MR | FI_RX_CQ_DATA;
2589 fabd->hints->addr_format = FI_SOCKADDR;
2590 // fabd->hints->tx_attr->op_flags = FI_DELIVERY_COMPLETE | FI_COMPLETION;
2591 fabd->hints->tx_attr->op_flags = FI_COMPLETION;
2592
2593 fabd->hints->domain_attr->mr_mode = FI_MR_BASIC;
2594 fabd->hints->domain_attr->threading = FI_THREAD_SAFE;
2595 fabd->hints->domain_attr->control_progress = FI_PROGRESS_AUTO;
2596 fabd->hints->domain_attr->data_progress = FI_PROGRESS_AUTO;
2597 svc->add_shutdown_task(cm, free_fabric_data, (void *) fabd, FREE_TASK);
2598
2599 fabd->wake_read_fd = -1;
2600 EVPATH_FD_ZERO(&fabd->readset);
2601 fabd->nfds = 0;
2602 return (void *) fabd;
2603 }
2604
2605
2606 extern transport_entry
cmfabric_add_static_transport(CManager cm,CMtrans_services svc)2607 cmfabric_add_static_transport(CManager cm, CMtrans_services svc)
2608 {
2609 transport_entry transport;
2610 transport = svc->malloc_func(sizeof(struct _transport_item));
2611 memset(transport, 0, sizeof(*transport));
2612 transport->trans_name = strdup("fabric");
2613 transport->cm = cm;
2614 transport->transport_init = (CMTransport_func)libcmfabric_LTX_initialize;
2615 transport->listen = (CMTransport_listen_func)libcmfabric_LTX_non_blocking_listen;
2616 transport->initiate_conn = (CMConnection(*)())libcmfabric_LTX_initiate_conn;
2617 transport->self_check = (int(*)())libcmfabric_LTX_self_check;
2618 transport->connection_eq = (int(*)())libcmfabric_LTX_connection_eq;
2619 transport->shutdown_conn = (CMTransport_shutdown_conn_func)libcmfabric_LTX_shutdown_conn;
2620 transport->read_block_func = (CMTransport_read_block_func)libcmfabric_LTX_read_block_func;
2621 transport->read_to_buffer_func = (CMTransport_read_to_buffer_func)NULL;
2622 transport->writev_func = (CMTransport_writev_func)libcmfabric_LTX_writev_func;
2623 transport->writev_complete_notify_func = (CMTransport_writev_complete_notify_func)libcmfabric_LTX_writev_complete_notify_func;
2624 transport->install_pull_schedule_func = (CMTransport_install_pull_schedule)libcmfabric_LTX_install_pull_schedule;
2625 transport->get_transport_characteristics = NULL;
2626 if (transport->transport_init) {
2627 transport->trans_data = transport->transport_init(cm, svc, transport);
2628 }
2629 return transport;
2630 }
2631
2632 #endif
2633
2634