1 #if HAVE_CONFIG_H
2 # include "config.h"
3 #endif
4
5 /* $Id: openib.c,v 1.4.2.9 2007-10-18 06:08:03 d3h325 Exp $
6 *
7 * File organized as follows
8 */
9 #ifndef _GNU_SOURCE
10 #define _GNU_SOURCE
11 #endif
12
13 #if HAVE_STDIO_H
14 # include <stdio.h>
15 #endif
16 #if HAVE_STRINGS_H
17 # include <strings.h>
18 #endif
19 #if HAVE_ASSERT_H
20 # include <assert.h>
21 #endif
22 #if HAVE_UNISTD_H
23 # include <unistd.h>
24 #endif
25 #if HAVE_STRING_H
26 # include <string.h>
27 #endif
28 #include <mpi.h>
29
30 #include "cbuf.h"
31 #include "armcip.h"
32 #include "copy.h"
33 #include "request.h"
34 #include "armci-vapi.h"
35 #include "iterator.h"
36 #define DEBUG_INIT 0
37 #define DEBUG_FINALIZE 0
38 #define DEBUG_SERVER 0
39 #define DEBUG_CLN 0
40 #define TIME_INIT 0
41 # define VAPIDEV_NAME "InfiniHost0"
42 # define INVAL_HNDL 0xFFFFFFFF
43 #define RNR_TIMER 12
44
45 /*Debug macros used to tune what is being tested -- mostly openib calls*/
46 #define DBG_INIT 1
47 #define DBG_POLL 1
48 #define DBG_ALL 1
49
50 #define QP_INACTIVE 5
51 #define QP_REQ_SENT 2
52 #define QP_ACK_RCVD 3
53 #define QP_ACTIVE 4
54
55 u_int32_t armci_max_num_sg_ent;
56 u_int32_t armci_max_qp_ous_swr;
57 u_int32_t armci_max_qp_ous_rwr;
58
59 typedef struct {
60 struct ibv_qp *qp;
61 uint32_t sqpnum; /*we need to exchng qp nums,arr for that*/
62 uint16_t lid;
63 uint16_t state;
64 void *next;
65 } armci_connect_t;
66
67 armci_connect_t *CLN_con, *SRV_con;
68 static uint32_t *SRV_rqpnums, *CLN_rqpnums; /*relevant rqp num arrs, to connect to svr and client*/
69 static uint32_t *CLN_rqpnumtmpbuf=NULL; /*temporary buf used during connection setup*/
/*\
 * data structure for the InfiniHost NIC
\*/
73 typedef struct {
74 uint16_t *lid_arr; /*we need to exchange lids, arr for that*/
75 struct ibv_context *handle; /*device context/handle*/
76 int maxtransfersize;
77 struct ibv_device_attr attr; /*device properties*/
78 struct ibv_port_attr hca_port; /*mostly for getting lid*/
79 uint8_t active_port;
80 struct ibv_pd *ptag; /*protection tag*/
81 const char *vendor;
82 struct ibv_cq *scq; /*send completion queue*/
83 struct ibv_cq *rcq; /*recv completion queue*/
84 struct ibv_comp_channel *sch; /*send completion channel*/
85 struct ibv_comp_channel *rch; /*recv completion channel*/
86 void *scq_cntx; /*send context for completion queue*/
87 void *rcq_cntx; /*recv context for completion queue*/
88 int scv; /*send completion vector*/
89 int rcv; /*recv completion vector*/
90 } vapi_nic_t;
91
92 typedef struct {
93 armci_vapi_memhndl_t *prem_handle; /*address server to store memory handle*/
94 armci_vapi_memhndl_t handle;
95 }ack_t;
96
97 armci_vapi_memhndl_t *CLN_handle;
98 armci_vapi_memhndl_t serv_memhandle, client_memhandle;
99 armci_vapi_memhndl_t *handle_array;
100 armci_vapi_memhndl_t *pinned_handle;
101
102 static vapi_nic_t nic_arr[3];
103 static vapi_nic_t *SRV_nic= nic_arr;
104 static vapi_nic_t *CLN_nic= nic_arr+1;
105 static int armci_server_terminating;
106
107 #define NONE -1
108 static int armci_ack_proc=NONE;
109
110 static int armci_vapi_server_ready;
111 static int armci_vapi_server_stage1=0;
112 static int armci_vapi_client_stage1=0;
113 static int armci_vapi_server_stage2=0;
114 static int armci_vapi_client_ready;
115 int _s=-1,_c=-1;
116 static int armci_vapi_max_inline_size=-1;
117 #define CLIENT_STAMP 101
118 #define SERV_STAMP 99
119 #define MAX_PROC_INLINE_SIZE 2048
120
121 static char * client_tail;
122 static char * serv_tail;
123 static ack_t *SRV_ack;
124
125 #if defined(PEND_BUFS)
126 typedef immbuf_t vapibuf_t;
127 typedef pendbuf_t vapibuf_pend_t;
128 #else
129 typedef struct {
130 struct ibv_recv_wr dscr;
131 struct ibv_sge sg_entry;
132 char buf[CBUF_DLEN];
133 } vapibuf_t;
134 #endif
135
136 typedef struct {
137 struct ibv_send_wr snd_dscr;
138 struct ibv_sge ssg_entry;
139 struct ibv_recv_wr rcv_dscr;
140 struct ibv_sge rsg_entry;
141 char buf[VBUF_DLEN];
142 } vapibuf_ext_t;
143
144 typedef struct {
145 struct ibv_send_wr rmw_dscr;
146 struct ibv_sge rmw_entry;
147 } vapirmw_t;
148
149 unsigned int armci_use_odcm = 0;
150 unsigned int armci_use_lazy_break = 0;
151 unsigned int armci_use_apm = 0;
152 unsigned int armci_use_apm_test = 0;
153 unsigned int armci_use_srq = 0;
154 unsigned int armci_use_snft = 0;
155 unsigned int armci_use_affinity = 0;
156
157 unsigned int armci_srq_size = 4096;
158
159 pthread_t armci_async_thread[4];
160
161 void async_thread_hca_events(void *ctx);
162 void* async_thread_ud_events(void *ctx);
163
164 void init_apm_lock(void);
165 #if 0
166 static struct ibv_srq *create_srq(vapi_nic_t *nic);
167 #endif
168
169 void setup_ud_channel(void);
170
171 void process_recv_completion_from_server(armci_ud_rank *h, cbuf *v);
172 void process_recv_completion_from_client(armci_ud_rank *h, cbuf *v);
173
174 struct ibv_srq *CLN_srq_hndl;
175 struct ibv_srq *SRV_srq_hndl;
176 void post_recv(void);
177 struct Remote_Buf
178 {
179 char **buf;
180 uint32_t *qp_num;
181 uint16_t *lid;
182 uint32_t *rkey;
183 };
184
185 struct HCA
186 {
187 struct ibv_device *ib_dev;
188 struct ibv_context *context;
189 struct ibv_pd *pd;
190 struct ibv_cq *cq;
191 struct ibv_srq *srq_hndl;
192 struct ibv_comp_channel *comp_channel;
193 };
194
195 struct RC_Conn
196 {
197 struct ibv_qp **qp;
198 uint16_t *lid;
199 int *status;
200 uint32_t *qp_num;
201 struct ibv_ah **ud_ah;
202 };
203
204 struct Remote_Buf rbuf;
205
206 struct RC_Conn conn;
207
208 struct HCA hca;
209
210 void handle_network_fault(struct ibv_wc *pdscr);
211
212 int process_recv_completion_from_client_flag;
213
214 int total_active_conn_to_server, total_active_conn_to_client, total_breaks;
215
216 void check_state_of_ib_connection(int a, int b);
217
218 static vapibuf_t **serv_buf_arr;
219 #if !defined(PEND_BUFS)
220 /*These are typically used as spare buffers for communication. Since
221 we do not wait on completion anymore, we need to ensure things work
222 fine when these have in-flight messages. Disabled for now.*/
223 static vapibuf_t *spare_serv_buf, *spare_serv_bufptr;
224 static vapibuf_ext_t *serv_buf;
225 #endif
226
227 static vapirmw_t rmw[64];
228
229 static int *flag_arr; /* flag indicates its receiving scatter data */
230 #define SERV 2
231 #define CLN 1
232
233 #define MAX_DESCR 2
234 typedef struct {
235 int avail;
236 struct ibv_qp *qp;
237 struct ibv_recv_wr *descr;
238 } descr_pool_t;
239
240 static int* _gtmparr;
241 static void* test_ptr;
242 static int test_stride_arr[1];
243 static int test_count[2];
244 static int test_stride_levels;
245 char *MessageRcvBuffer;
246
247 extern void armci_util_wait_int(volatile int *,int,int);
248 void armci_send_data_to_client(int proc, void *buf,int bytes,void *dbuf);
249 void armci_server_register_region(void *,long,ARMCI_MEMHDL_T *);
250 static descr_pool_t serv_descr_pool = {MAX_DESCR,NULL,NULL};
251 static descr_pool_t client_descr_pool = {MAX_DESCR,NULL,NULL};
252
/** Buffer (long[1]) used to set msginfo->tag.ack_ptr on the client side.
    See its usage in the SERVER_SEND_ACK macro. */
255 static long *ack_buf;
256
/* Address of the first byte that follows a request_header_t placed at buf.
 * The argument is parenthesized so that expressions such as (p + 1) bind
 * to the cast as a whole. */
#define GET_DATA_PTR(buf) (sizeof(request_header_t) + (char*)(buf))

/* The four BUF_TO_* accessors below step back sizeof(armci_vapi_field_t)
 * bytes from buf and return the address of one member of that
 * armci_vapi_field_t. */
#define BUF_TO_SDESCR(buf) ((struct ibv_send_wr *)(&((armci_vapi_field_t *)((char *)(buf) - sizeof(armci_vapi_field_t)))->sdscr))

#define BUF_TO_RDESCR(buf) ((struct ibv_recv_wr *)(&((armci_vapi_field_t *)((char *)(buf) - sizeof(armci_vapi_field_t)))->rdscr))

#define BUF_TO_SSGLST(buf) ((struct ibv_sge *)(&((armci_vapi_field_t *)((char *)(buf) - sizeof(armci_vapi_field_t)))->ssg_entry))

#define BUF_TO_RSGLST(buf) ((struct ibv_sge *)(&((armci_vapi_field_t *)((char *)(buf) - sizeof(armci_vapi_field_t)))->rsg_entry))

/* Steps back over one ibv_send_wr, one ibv_recv_wr and two ibv_sge from buf
 * and yields the result as a vapibuf_ext_t pointer.  The whole expansion is
 * parenthesized so a trailing -> or [] applies to the cast result. */
#define BUF_TO_ECBUF(buf) ((vapibuf_ext_t*)(((char*)(buf)) - (sizeof(struct ibv_send_wr)+sizeof(struct ibv_recv_wr)+2*sizeof(struct ibv_sge))))
268
269 #define SERVER_SEND_ACK(p) do { \
270 assert(*ack_buf == ARMCI_STAMP); \
271 assert((p)>=0); \
272 armci_send_data_to_client((p),ack_buf, \
273 sizeof(long),msginfo->tag.ack_ptr); \
274 } while(0)
275 /* #define SERVER_SEND_ACK(p) {assert(serv_buf!=NULL);assert(msginfo->from==(p));*((long *)serv_buf->buf)=ARMCI_STAMP;armci_send_data_to_client((p),serv_buf->buf,sizeof(long),msginfo->tag.ack_ptr);} */
276
277 #define SERVER_SEND_DATA(_SS_proc,_SS_src,_SS_dst,_SS_size) {armci_send_data_to_client(_SS_proc,_SS_src,_SS_size,_SS_dst);}
278 #define SERVER_GET_DATA(_SG_proc,_SG_src,_SG_dst,_SG_size) {armci_get_data_from_client(_SG_proc,_SG_src,_SG_size,_SG_dst);}
279
280
281 /*\ descriptors will have unique ID's for the wait on descriptor routine to
282 * complete a descriptor and know where it came from
283 \*/
284
285 #define NUMOFBUFFERS (MAX_BUFS+MAX_SMALL_BUFS)
286 #define DSCRID_FROMBUFS 1
287 #define DSCRID_FROMBUFS_END (DSCRID_FROMBUFS+NUMOFBUFFERS)
288
289 #define DSCRID_NBDSCR 10000
290 #define DSCRID_NBDSCR_END (10000+MAX_PENDING)
291
/* Work-request id range used for scatter/gather descriptors.  The END value
 * is parenthesized so it behaves as a single operand in any expression. */
#define DSCRID_SCATGAT 20000
#define DSCRID_SCATGAT_END (20000+MAX_PENDING)

/* Work-request id range used for read-modify-write descriptors. */
#define DSCRID_RMW 30000
#define DSCRID_RMW_END (30000+9999)
297
298 #if defined(PEND_BUFS)
299 #define DSCRID_PENDBUF (40000)
300 #define DSCRID_PENDBUF_END (DSCRID_PENDBUF + 2*PENDING_BUF_NUM+1)
301
302 #define DSCRID_IMMBUF_RECV (200000)
303 #define DSCRID_IMMBUF_RECV_END (600000)
304
305 #define DSCRID_IMMBUF_RESP (600000)
306 #define DSCRID_IMMBUF_RESP_END (1000000)
307 #endif
308
309 extern double MPI_Wtime();
310 static double inittime0=0,inittime1=0,inittime2=0,inittime4=0;
311
312 static int mark_buf_send_complete[NUMOFBUFFERS+1];
313 static sr_descr_t armci_vapi_client_nbsdscr_array[MAX_PENDING];
314 static sr_descr_t armci_vapi_client_nbrdscr_array[MAX_PENDING];
315 static sr_descr_t armci_vapi_serv_nbsdscr_array[MAX_PENDING];
316 static sr_descr_t armci_vapi_serv_nbrdscr_array[MAX_PENDING];
317
318 void armci_server_transport_cleanup();
319 /********************FUNCTIONS TO CHECK OPENIB RETURN STATUS*******************/
armci_check_status(int debug,int rc,char * msg)320 void armci_check_status(int debug, int rc,char *msg)
321 {
322 dassertp(debug,rc==0,("%d: %s, rc=%d\n",armci_me,msg,rc));
323 /* if(debug)printf("%d:%s, rc = %d\n", armci_me,msg, rc); */
324 /* if(rc!=0)armci_die(msg,rc); */
325 }
326
/*
 * Status hook with an intentionally empty body: the three arguments are
 * accepted and ignored.
 */
void armci_vapi_check_return(int debug, int ret, const char *ss)
{
    (void)debug;
    (void)ret;
    (void)ss;
}
330
armci_vapi_print_dscr_info(struct ibv_send_wr * sr,struct ibv_recv_wr * rr)331 void armci_vapi_print_dscr_info(struct ibv_send_wr *sr, struct ibv_recv_wr *rr)
332 {
333 int i;
334 if(rr){
335 printf("\n%d:print_dscr rr id=%ld sg_lst_len=%d",
336 armci_me, rr->wr_id, rr->num_sge);
337 for (i = 0; i < rr->num_sge; i++) {
338 printf("\n\t:sg_entry=%d addr=%lu len=%d",
339 i, rr->sg_list[i].addr, rr->sg_list[i].length);
340 }
341 fflush(stdout);
342 }
343 if(sr){
344 printf("\n%d:print_dscr sr id=%lu opcode=%d sg_lst_len=%d",
345 armci_me, sr->wr_id, sr->opcode, sr->num_sge);
346 for (i = 0; i < sr->num_sge; i++) {
347 printf("\n\t:sg_entry=%d addr=%lu len=%d",
348 i, sr->sg_list[i].addr, sr->sg_list[i].length);
349 }
350 fflush(stdout);
351 }
352 }
353
354 /*****************END FUNCTIONS TO CHECK VAPI RETURN STATUS********************/
355
armci_recv_complete(struct ibv_recv_wr * rcv_dscr,char * from,int numofrecvs)356 void armci_recv_complete(struct ibv_recv_wr *rcv_dscr,
357 char *from, int numofrecvs)
358 {
359 int rc=0;
360 struct ibv_wc pdscr1;
361 struct ibv_wc *pdscr = &pdscr1;
362 sr_descr_t *rdscr_arr;
363 vapi_nic_t *nic;
364 int debug,i;
365
366 if(SERVER_CONTEXT){
367 rdscr_arr = armci_vapi_serv_nbrdscr_array;
368 nic=CLN_nic;
369 debug = DEBUG_SERVER;
370 }
371 else{
372 rdscr_arr = armci_vapi_client_nbrdscr_array;
373 nic=SRV_nic;
374 debug = DEBUG_CLN;
375 }
376 if(debug){
377 printf("\n%d%s:recv_complete called from %s id=%ld\n",armci_me,
378 ((SERVER_CONTEXT)?"(s)":" "),from,rcv_dscr->wr_id);fflush(stdout);
379 }
380 for(i=0;i<numofrecvs;i++){
381 do{
382 while(rc == 0) {
383 rc = ibv_poll_cq(nic->rcq, 1, pdscr);
384 }
385 dassertp(DBG_POLL|DBG_ALL,rc>=0,
386 ("%d: rc=%d id=%d status=%d (%d/%d)\n",
387 armci_me,rc,pdscr->wr_id,pdscr->status,i,numofrecvs));
388 dassert1(1,pdscr->status==IBV_WC_SUCCESS,pdscr->status);
389 if(debug){
390 if(pdscr->wr_id >= DSCRID_SCATGAT && pdscr->wr_id < DSCRID_SCATGAT_END)
391 printf("\n%d:recv from %s complete id=%lu num=%d",armci_me,
392 from,pdscr->wr_id,rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofrecvs);
393 }
394 if(pdscr->wr_id >= DSCRID_SCATGAT && pdscr->wr_id < DSCRID_SCATGAT_END){
395 rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofrecvs--;
396 if(rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofrecvs==0)
397 rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].tag=0;
398 }
399 else if(pdscr->wr_id == (DSCRID_SCATGAT + MAX_PENDING)){
400 /*this was from a blocking call, do nothing*/
401 continue;
402 }
403 else {
404 armci_die("\nclient should be posting only one kind of recv",armci_me);
405 }
406 rc = 0;
407 }while(pdscr->wr_id!=rcv_dscr->wr_id);
408 rc = 0;
409 }
410
411 }
412
armci_vapi_set_mark_buf_send_complete(int id)413 void armci_vapi_set_mark_buf_send_complete(int id)
414 {
415 mark_buf_send_complete[id]=0;
416 }
417
armci_send_complete(struct ibv_send_wr * snd_dscr,char * from,int numoftimes)418 void armci_send_complete(struct ibv_send_wr *snd_dscr, char *from,int numoftimes)
419 {
420 int rc=0;
421 struct ibv_wc pdscr1;
422 struct ibv_wc *pdscr = &pdscr1;
423 sr_descr_t *sdscr_arr;
424 vapi_nic_t *nic;
425 int debug,i;
426
427 pdscr1.status = IBV_WC_SUCCESS;
428 /* bzero(&pdscr1, sizeof(pdscr1)); */
429 /* printf("%d: Waiting for send with wr_id=%d to complete\n", armci_me, snd_dscr->wr_id); */
430 /* fflush(stdout); */
431
432 if(SERVER_CONTEXT){
433 sdscr_arr = armci_vapi_serv_nbsdscr_array;
434 nic=CLN_nic;
435 debug = DEBUG_SERVER;
436 }
437 else{
438 sdscr_arr = armci_vapi_client_nbsdscr_array;
439 nic=SRV_nic;
440 debug = DEBUG_CLN;
441 }
442
443 if(debug) {
444 printf("\n%d%s:send_complete called from %s id=%ld nt=%d\n",armci_me,
445 ((SERVER_CONTEXT)?"(s)":" "),from,snd_dscr->wr_id,numoftimes);
446 fflush(stdout);
447 }
448 for(i=0;i<numoftimes;i++){
449 do{
450 while(rc == 0){
451 #if defined(PEND_BUFS)
452 if(SERVER_CONTEXT)
453 rc = ibv_poll_cq(nic->rcq,1,pdscr);
454 else
455 #endif
456 rc = ibv_poll_cq(nic->scq,1, pdscr);
457 }
458 dassertp(DBG_POLL|DBG_ALL,rc>=0,
459 ("%d:rc=%d status=%d id=%d (%d/%d)",armci_me,
460 rc,pdscr->status,(int)pdscr->wr_id,i,numoftimes));
461 dassert1(1,pdscr->status==IBV_WC_SUCCESS,pdscr->status);
462 /* printf("%d: Obtained completion of wr_id=%d\n", armci_me, pdscr->wr_id); */
463 /* fflush(stdout); */
464 if(SERVER_CONTEXT){
465 if(debug)printf("%d:completed id %lu i=%d\n",armci_me,pdscr->wr_id,i);
466 if(pdscr->wr_id >=DSCRID_SCATGAT && pdscr->wr_id < DSCRID_SCATGAT_END){
467 sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends--;
468 if(sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends==0)
469 sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].tag=0;
470 }
471 else if(pdscr->wr_id >=armci_nproc && pdscr->wr_id < 2*armci_nproc){
472 /*its coming from send_data_to_client just return*/
473 }
474 #if defined(PEND_BUFS)
475 else if(pdscr->wr_id >= DSCRID_IMMBUF_RESP && pdscr->wr_id>DSCRID_IMMBUF_RESP_END) {
476 /*send from server to client completed*/
477 }
478 #endif
479 else armci_die("server send complete got weird id",pdscr->wr_id);
480 }
481 else{
482 if(debug)printf("%d:completed id %lu i=%d\n",armci_me,pdscr->wr_id,i);
483 if(pdscr->wr_id >=DSCRID_FROMBUFS && pdscr->wr_id < DSCRID_FROMBUFS_END) {
484 /* printf("%d: marking send buffer %d as complete\n", armci_me, pdscr->wr_id);*/
485 mark_buf_send_complete[pdscr->wr_id]=1;
486 }
487 else if(pdscr->wr_id >=DSCRID_NBDSCR && pdscr->wr_id < DSCRID_NBDSCR_END){
488 sdscr_arr[pdscr->wr_id-DSCRID_NBDSCR].numofsends--;
489 if(sdscr_arr[pdscr->wr_id-DSCRID_NBDSCR].numofsends==0)
490 sdscr_arr[pdscr->wr_id-DSCRID_NBDSCR].tag=0;
491 }
492 else if(pdscr->wr_id >=DSCRID_SCATGAT && pdscr->wr_id < DSCRID_SCATGAT_END){
493 sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends--;
494 if(sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends==0)
495 sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].tag=0;
496 }
497 else if(pdscr->wr_id == (DSCRID_SCATGAT + MAX_PENDING)){
498 /* printf("%d: completed a blocking scatgat descriptor\n", armci_me); */
499 /*this was from a blocking call, do nothing*/
500 continue;
501 }
502 else armci_die("client send complete got weird id",pdscr->wr_id);
503 }
504 rc = 0;
505 }while(pdscr->wr_id!=snd_dscr->wr_id);
506 rc = 0;
507 }
508 }
509
510
/*
 * Wait for all outstanding receives of one descriptor.
 *
 *   tag  - tag to look up when dscr is NULL
 *   from - not used by this function
 *   dscr - descriptor to complete, or NULL to search the receive table of the
 *          current context (server or client, per SERVER_CONTEXT) for the
 *          first entry whose tag equals tag
 *
 * Returns without doing anything when the lookup finds no entry.  Otherwise
 * armci_recv_complete() is called with the descriptor's numofrecvs.
 */
void armci_dscrlist_recv_complete(int tag, char* from,sr_descr_t *dscr)
{
    sr_descr_t *target = dscr;

    if (target == NULL) {
        sr_descr_t *table;
        int slot = 0;

        if (SERVER_CONTEXT)
            table = armci_vapi_serv_nbrdscr_array;
        else
            table = armci_vapi_client_nbrdscr_array;

        while (slot < MAX_PENDING && table[slot].tag != tag)
            slot++;

        if (slot == MAX_PENDING)
            return;
        target = table + slot;
    }

    armci_recv_complete(&(target->rdescr),"(s)list_send_complete",
                        target->numofrecvs);
}
536
537
/*
 * Wait for all outstanding sends of one descriptor.
 *
 *   tag  - tag to look up when dscr is NULL
 *   from - not used by this function
 *   dscr - descriptor to complete, or NULL to search the send table of the
 *          current context (server or client, per SERVER_CONTEXT) for the
 *          first entry whose tag equals tag
 *
 * Returns without doing anything when the lookup finds no entry.  Otherwise
 * armci_send_complete() is called with the descriptor's numofsends.
 */
void armci_dscrlist_send_complete(int tag,char *from, sr_descr_t *dscr)
{
    sr_descr_t *target = dscr;

    if (target == NULL) {
        sr_descr_t *table;
        int slot = 0;

        if (SERVER_CONTEXT)
            table = armci_vapi_serv_nbsdscr_array;
        else
            table = armci_vapi_client_nbsdscr_array;

        while (slot < MAX_PENDING && table[slot].tag != tag)
            slot++;

        if (slot == MAX_PENDING)
            return;
        target = table + slot;
    }

    armci_send_complete(&(target->sdescr),"dscrlist_send_complete",
                        target->numofsends);
}
563
/*
 * Complete a client non-blocking GET or PUT.
 *
 *   dscr - descriptor of the non-blocking call
 *   tag  - tag the caller expects; nothing happens if dscr->tag differs
 *   op   - GET or PUT; other values only take and release the lock
 *
 * Under armci_user_threads.net_lock:
 *   GET, scatter/gather descriptor : wait for receives if numofrecvs > 0
 *   GET, plain descriptor          : wait for sends    if numofsends > 0
 *   PUT                            : wait for sends    if numofsends > 0
 */
void armci_client_nbcall_complete(sr_descr_t *dscr, int tag, int op)
{
    if (dscr->tag != tag)
        return;

    THREAD_LOCK(armci_user_threads.net_lock);

    if (op == GET) {
        if (!dscr->issg) {
            if (dscr->numofsends > 0)
                armci_dscrlist_send_complete(tag,
                        "armci_client_nbcall_complete send", dscr);
        }
        else if (dscr->numofrecvs > 0) {
            armci_dscrlist_recv_complete(tag,
                    "armci_client_nbcall_complete recv", dscr);
        }
    }

    if (op == PUT && dscr->numofsends > 0) {
        armci_dscrlist_send_complete(tag,
                "armci_client_nbcall_complete send", dscr);
    }

    THREAD_UNLOCK(armci_user_threads.net_lock);
}
591
592
593 static int cur_serv_pend_descr;
594 static int cur_client_pend_descr;
595
armci_vapi_get_next_rdescr(int nbtag,int sg)596 sr_descr_t *armci_vapi_get_next_rdescr(int nbtag,int sg)
597 {
598 static int serverthreadavail=-1; /*client thread can't touch this*/
599 static int clientthreadavail=-1; /*server thread can't touch this*/
600 int avail,newavail;
601 sr_descr_t *retdscr,*rdscr_arr;
602
603 if(SERVER_CONTEXT){
604 rdscr_arr = armci_vapi_serv_nbrdscr_array;
605 avail = serverthreadavail;
606 /*printf("\n%d:serv thread avail=%d",armci_me,serverthreadavail);*/
607 }
608 else{
609 rdscr_arr = armci_vapi_client_nbrdscr_array;
610 avail = clientthreadavail;
611 }
612 if(avail==-1){
613 int i;
614 for(i=0;i<MAX_PENDING;i++){
615 rdscr_arr[i].tag=0;
616 bzero(&rdscr_arr[i].rdescr,sizeof(struct ibv_recv_wr));
617 if(sg)
618 rdscr_arr[i].rdescr.wr_id = DSCRID_SCATGAT + i;
619 else
620 rdscr_arr[i].rdescr.wr_id = DSCRID_NBDSCR + i;
621 }
622 avail=0;
623 }
624
625 if(rdscr_arr[avail].tag!=0){
626 armci_dscrlist_recv_complete(rdscr_arr[avail].tag,
627 "armci_vapi_get_next_rdescr",&rdscr_arr[avail]);
628 }
629
630 rdscr_arr[avail].tag=nbtag;
631 rdscr_arr[avail].issg=sg;
632 retdscr= (rdscr_arr+avail);
633
634 memset(&retdscr->rdescr,0,sizeof(struct ibv_recv_wr));
635
636 if(sg)
637 retdscr->rdescr.wr_id = DSCRID_SCATGAT + avail;
638 else{
639 retdscr->rdescr.wr_id = DSCRID_NBDSCR + avail;
640 retdscr->numofrecvs=1;
641 }
642
643 newavail = (avail+1)%MAX_PENDING;
644
645 if(SERVER_CONTEXT){
646 cur_serv_pend_descr = avail;
647 serverthreadavail=newavail;
648 }
649 else{
650 cur_client_pend_descr = avail;
651 clientthreadavail=newavail;
652 }
653
654 return(retdscr);
655
656 }
657
armci_vapi_get_next_sdescr(int nbtag,int sg)658 sr_descr_t *armci_vapi_get_next_sdescr(int nbtag,int sg)
659 {
660 static int serverthreadavail=-1; /*client thread can't touch this*/
661 static int clientthreadavail=-1; /*server thread can't touch this*/
662 int avail,newavail;
663 sr_descr_t *retdscr,*sdscr_arr;
664
665 if(SERVER_CONTEXT){
666 sdscr_arr = armci_vapi_serv_nbsdscr_array;
667 avail = serverthreadavail;
668 }
669 else{
670 sdscr_arr = armci_vapi_client_nbsdscr_array;
671 avail = clientthreadavail;
672 }
673
674 if(avail==-1){ /*first call*/
675 int i;
676 for(i=0;i<MAX_PENDING;i++){
677 sdscr_arr[i].tag=0;
678 bzero(&sdscr_arr[i].sdescr,sizeof(struct ibv_send_wr));
679 if(sg)
680 sdscr_arr[i].sdescr.wr_id = DSCRID_SCATGAT+i;
681 else
682 sdscr_arr[i].sdescr.wr_id = DSCRID_NBDSCR + i;
683 }
684 avail=0;
685 }
686
687 if(sdscr_arr[avail].tag!=0){
688 armci_dscrlist_send_complete(sdscr_arr[avail].tag,
689 "armci_vapi_get_next_sdescr",&sdscr_arr[avail]);
690 }
691
692 sdscr_arr[avail].tag=nbtag;
693 sdscr_arr[avail].issg=sg;
694 retdscr= (sdscr_arr+avail);
695
696 memset(&retdscr->sdescr,0,sizeof(struct ibv_recv_wr));
697
698 if(sg)
699 retdscr->sdescr.wr_id = DSCRID_SCATGAT + avail;
700 else{
701 retdscr->sdescr.wr_id = DSCRID_NBDSCR + avail;
702 retdscr->numofsends=1;
703 }
704
705 newavail = (avail+1)%MAX_PENDING;
706
707 if(SERVER_CONTEXT){
708 cur_serv_pend_descr = avail;
709 serverthreadavail=newavail;
710 }
711 else{
712 cur_client_pend_descr = avail;
713 clientthreadavail=newavail;
714 }
715 return(retdscr);
716 }
717
armci_wait_for_server()718 void armci_wait_for_server()
719 {
720 armci_server_terminating = 1;
721 }
722
723
724 /* ibv_create_qp does not use separate structure to return properties,
725 seems it is all inside ibv_qp */
/*
 * Create one reliable-connection (IBV_QPT_RC) queue pair on nic.
 *
 *   nic - NIC whose protection domain and completion queues are used
 *   qp  - out: receives the new queue pair; set to NULL before the create
 *         call and asserted non-NULL after it
 *
 * Work-request and scatter/gather limits come from armci_max_qp_ous_swr,
 * armci_max_qp_ous_rwr and armci_max_num_sg_ent.  Receives always complete
 * on nic->rcq.  Sends complete on nic->scq, except that with PEND_BUFS
 * defined the CLN_nic routes sends to nic->rcq as well.
 *
 * Side effect: armci_vapi_max_inline_size becomes -1 when armci_nproc is at
 * least MAX_PROC_INLINE_SIZE, otherwise it takes cap.max_inline_data from
 * the attribute structure after the create call.
 */
static void armci_create_qp(vapi_nic_t *nic, struct ibv_qp **qp)
{
    struct ibv_qp_init_attr initattr;

    *qp = NULL;

    memset(&initattr, 0, sizeof(struct ibv_qp_init_attr));
    initattr.qp_type = IBV_QPT_RC;

    initattr.cap.max_send_wr  = armci_max_qp_ous_swr;
    initattr.cap.max_recv_wr  = armci_max_qp_ous_rwr;
    initattr.cap.max_send_sge = armci_max_num_sg_ent;
    initattr.cap.max_recv_sge = armci_max_num_sg_ent;

    initattr.recv_cq = nic->rcq;
    initattr.send_cq = nic->scq;
#if defined(PEND_BUFS)
    if (nic == CLN_nic) {
        initattr.send_cq = nic->rcq;
    }
#endif

    *qp = ibv_create_qp(nic->ptag, &initattr);
    dassert(1,*qp!=NULL);

    /* The value of inline size should be dependent on number of processes */
    if (armci_nproc < MAX_PROC_INLINE_SIZE) {
        armci_vapi_max_inline_size = initattr.cap.max_inline_data;
    }
    else {
        armci_vapi_max_inline_size = -1;
    }
}
763
int armci_openib_sl;
int armci_openib_server_poll;

/*
 * Load the OpenIB runtime settings from the environment.
 *
 *   ARMCI_OPENIB_USE_SL      -> armci_openib_sl
 *   ARMCI_OPENIB_SERVER_POLL -> armci_openib_server_poll
 *
 * Each variable is converted with atoi(); an unset variable yields 0, so
 * server polling is off unless it is requested explicitly.
 */
void armci_openib_env_init()
{
    const char *setting;

    setting = getenv("ARMCI_OPENIB_USE_SL");
    armci_openib_sl = (setting != NULL) ? atoi(setting) : 0;

    /* Don't enable server polling by default */
    setting = getenv("ARMCI_OPENIB_SERVER_POLL");
    armci_openib_server_poll = (setting != NULL) ? atoi(setting) : 0;
}
785
/*
 * Open an InfiniBand device and fill in *nic.
 *
 *   nic         - NIC record to initialise; it is zeroed first
 *   scq_entries - non-zero to create a send completion channel and queue
 *   rcq_entries - non-zero to create a receive completion channel and queue
 *                 (both values are only used as flags; the queue depths are
 *                 the fixed 16000 / 32768 below)
 *
 * Steps: read the OpenIB environment settings (only when nic is SRV_nic),
 * pick the device named by ARMCI_OPENIB_DEVICE or the first one listed,
 * query device and ports, remember the first active port among ports 1-2
 * and its LID, allocate the protection domain, create the requested
 * completion queues, then derive the global work-request and
 * scatter/gather limits from the device attributes.
 *
 * Device list, device handle, protection domain and completion queues are
 * asserted non-NULL right where they are obtained, so a failure is reported
 * at its origin instead of as a later NULL dereference.
 */
static void armci_init_nic(vapi_nic_t *nic, int scq_entries, int rcq_entries)
{
    int rc, ndevs = 0, i;
    struct ibv_device **devs=NULL;
    char *runtime_devname;
    char *value;
    int device_found = 0, device_id = 0;
    int down_port_count_check = 0;

    if (nic == SRV_nic) {
        /* Initialize OpenIB runtime variables only once*/
        armci_openib_env_init();
    }

    bzero(nic,sizeof(vapi_nic_t));
    nic->lid_arr = (uint16_t *)calloc(armci_nproc,sizeof(uint16_t));
    dassert(1,nic->lid_arr!=NULL);

    devs = ibv_get_device_list(&ndevs);
    /* devs[device_id] is dereferenced below: require a non-empty list */
    dassert(1,devs!=NULL);
    dassert(1,ndevs>0);

    runtime_devname = getenv("ARMCI_OPENIB_DEVICE");

    if (runtime_devname) {
        for (i = 0; i < ndevs; i++) {
            if (!strncmp(ibv_get_device_name(devs[i]), runtime_devname, 32)) {
                device_found = 1;
                device_id = i;
                break;
            }
        }
    }
    else {
        device_id = 0;
        device_found = 1;
    }

    assert(device_found);
    nic->handle = ibv_open_device(devs[device_id]);
    dassert(1,nic->handle!=NULL);

    nic->maxtransfersize = MAX_RDMA_SIZE;

    /* NOTE(review): this pointer is obtained from a device-list entry and the
     * list is released further down -- confirm the name stays valid for as
     * long as nic->vendor is used. */
    nic->vendor = ibv_get_device_name(devs[device_id]);

    rc = ibv_query_device(nic->handle, &nic->attr);
    assert(0 == rc);

    /* use the first of ports 1 and 2 that is active */
    for (i = 1; i <= 2; i++) {
        rc = ibv_query_port(nic->handle, (uint8_t)i, &nic->hca_port);
        assert(0 == rc);
        if (IBV_PORT_ACTIVE == nic->hca_port.state) {
            nic->active_port = i;
            break;
        }
        else {
            down_port_count_check++;
        }
    }

    /* Assert that at least one of the two probed ports was active */
    assert(down_port_count_check != 2);

    /*save the lid for doing a global exchange later */
    nic->lid_arr[armci_me] = nic->hca_port.lid;

    /*allocate tag (protection domain) */
    nic->ptag = ibv_alloc_pd(nic->handle);
    dassert(1,nic->ptag!=NULL);

    /* properties of scq and rcq required for the cq number, this also needs
     * to be globally exchanged
     */
    nic->scv = 1;
    nic->rcv = 2;
    nic->scq = nic->rcq = NULL;

    if(scq_entries) {
        nic->sch = ibv_create_comp_channel(nic->handle);
        nic->scq = ibv_create_cq(nic->handle, 16000,
                                 nic->scq_cntx,nic->sch, 0);
        dassert(1,nic->scq!=NULL);
    }

    if(rcq_entries) {
        nic->rch = ibv_create_comp_channel(nic->handle);
        nic->rcq = ibv_create_cq(nic->handle, 32768,
                                 nic->rcq_cntx,nic->rch, 0);
        dassert(1,nic->rcq!=NULL);
    }

    ibv_free_device_list(devs);

    /* defaults; reduced below if the device cannot support them */
    armci_max_num_sg_ent = 29;
    armci_max_qp_ous_swr = 100;
    armci_max_qp_ous_rwr = 50;

    if ((value = getenv("ARMCI_USE_ODCM")) != NULL){
        armci_use_odcm = atoi(value);
    } else {
        armci_use_odcm = 0;
    }

    armci_use_lazy_break = 0;

    if(armci_max_qp_ous_rwr + armci_max_qp_ous_swr>nic->attr.max_qp_wr){
        armci_max_qp_ous_swr = nic->attr.max_qp_wr/16;
        armci_max_qp_ous_rwr = nic->attr.max_qp_wr - armci_max_qp_ous_swr;
    }
    if(armci_max_num_sg_ent >= nic->attr.max_sge){
        armci_max_num_sg_ent = nic->attr.max_sge - 1;
    }
}
898
899
armci_setaffinity(char * cpu_mapping)900 void armci_setaffinity(char *cpu_mapping) {
901 long N_CPUs_online;
902 cpu_set_t affinity_mask;
903 unsigned long affinity_mask_len = sizeof(affinity_mask);
904 char *tp;
905 char *cp;
906 char tp_str[8];
907 int cpu, i, j;
908
909
910 if (!armci_use_affinity)
911 return;
912
913 /*Get number of CPU on machine */
914 if ((N_CPUs_online = sysconf(_SC_NPROCESSORS_ONLN)) < 1) {
915 perror("sysconf");
916 }
917
918 if (cpu_mapping) {
919 tp = cpu_mapping;
920 j = 0;
921 while (*tp != '\0') {
922 i = 0;
923 cp = tp;
924 while (*cp != '\0' && *cp != ',' && *cp != ':') {
925 cp++;
926 i++;
927 }
928 strncpy(tp_str, tp, i);
929 tp_str[i] = '\0';
930 cpu = atoi(tp_str);
931
932 if (j == armci_me - armci_master) {
933 CPU_ZERO(&affinity_mask);
934 CPU_SET(cpu, &affinity_mask);
935 if (sched_setaffinity(0,
936 affinity_mask_len, &affinity_mask)<0 ) {
937 perror("sched_setaffinity");
938 }
939 break;
940 }
941 tp = cp + 1;
942 j++;
943 }
944
945 free(cpu_mapping);
946 } else {
947 CPU_ZERO(&affinity_mask);
948 CPU_SET(((armci_me) - armci_master) %N_CPUs_online, &affinity_mask);
949
950 if (sched_setaffinity(0,affinity_mask_len,&affinity_mask)<0 ) {
951 perror("sched_setaffinity");
952 }
953 }
954 }
955
956 /****************MEMORY ALLOCATION REGISTRATION DEREGISTRATION****************/
957 static char * serv_malloc_buf_base;
958 #if ARMCI_ENABLE_GPC_CALLS
959 extern gpc_buf_t *gpc_req;
960 #endif
armci_server_alloc_bufs()961 void armci_server_alloc_bufs()
962 {
963 int bytes, total, extra =sizeof(struct ibv_recv_wr)*MAX_DESCR+SIXTYFOUR;
964 int mhsize = armci_nproc*sizeof(armci_vapi_memhndl_t); /* ack */
965 char *tmp, *tmp0;
966 int i;
967 #if defined(PEND_BUFS)
968 int clients = (IMM_BUF_NUM+1)*armci_nproc;
969 #else
970 int clients = armci_nproc;
971 #endif
972
973 /* allocate memory for the recv buffers-must be alligned on 64byte bnd */
974 /* note we add extra one to repost it for the client we are received req */
975 bytes = (clients+1)*sizeof(vapibuf_t)+sizeof(vapibuf_ext_t) + extra+ mhsize
976 #if ARMCI_ENABLE_GPC_CALLS
977 + MAX_GPC_REQ * sizeof(gpc_buf_t)
978 #endif
979 #if defined(PEND_BUFS)
980 + (clients+1)*IMM_BUF_LEN
981 + PENDING_BUF_NUM*(sizeof(vapibuf_pend_t)+PENDING_BUF_LEN)
982 #endif
983 + sizeof(long)
984 + 7*SIXTYFOUR;
985 total = bytes + SIXTYFOUR;
986 if(total%4096!=0)
987 total = total - (total%4096) + 4096;
988 tmp0=tmp = malloc(total);
989 serv_malloc_buf_base = tmp0;
990
991 dassert1(1,tmp!=NULL,(int)total);
992 /* stamp the last byte */
993 serv_tail= tmp + bytes+SIXTYFOUR-1;
994 *serv_tail=SERV_STAMP;
995 /* allocate memory for client memory handle to support put response
996 * in dynamic memory registration protocols */
997 CLN_handle = (armci_vapi_memhndl_t*)tmp;
998 memset(CLN_handle,0,mhsize); /* set it to zero */
999 tmp += mhsize;
1000
1001 #if ARMCI_ENABLE_GPC_CALLS
1002 /* gpc_req memory*/
1003 tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
1004 gpc_req = (gpc_buf_t *)tmp;
1005 tmp += MAX_GPC_REQ * sizeof(gpc_buf_t);
1006 #endif
1007
1008 /* setup descriptor memory */
1009 tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
1010 serv_descr_pool.descr= (struct ibv_recv_wr *)(tmp);
1011 tmp += extra;
1012
1013 /* setup ack buffer*/
1014 tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
1015 ack_buf = (long *)(tmp);
1016 *ack_buf=ARMCI_STAMP;
1017 tmp += sizeof(long);
1018
1019 /* setup buffer pointers */
1020 tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
1021 serv_buf_arr = (vapibuf_t **)malloc(sizeof(vapibuf_t*)*clients);
1022 for(i=0;i<clients;i++){
1023 serv_buf_arr[i] = (vapibuf_t*)(tmp) + i;
1024 }
1025 tmp = (char *)(serv_buf_arr[0]+clients);
1026
1027 #if defined(PEND_BUFS)
1028 /*setup buffers in immediate buffers*/
1029 tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
1030 for(i=0; i<clients; i++) {
1031 serv_buf_arr[i]->buf = tmp + i*IMM_BUF_LEN;
1032 }
1033 tmp += clients*IMM_BUF_LEN;
1034
1035 /*setup pending buffers*/
1036 tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
1037 serv_pendbuf_arr = (vapibuf_pend_t *)(tmp);
1038 tmp=(char *)(serv_pendbuf_arr+PENDING_BUF_NUM);
1039 tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
1040 for(i=0; i<PENDING_BUF_NUM; i++) {
1041 serv_pendbuf_arr[i].buf = tmp+i*PENDING_BUF_LEN;
1042 assert(serv_pendbuf_arr[i].buf != NULL);
1043 }
1044 tmp += PENDING_BUF_NUM*PENDING_BUF_LEN;
1045 MessageRcvBuffer = NULL;
1046 #else
1047 tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
1048 spare_serv_buf = (vapibuf_t *)tmp; /* spare buffer is at the end */
1049 spare_serv_bufptr = spare_serv_buf; /* save the pointer for later */
1050 serv_buf =(vapibuf_ext_t*)(spare_serv_buf+1);
1051 tmp = (char *)(serv_buf+1);
1052
1053 MessageRcvBuffer = serv_buf->buf;
1054 #endif
1055
1056 flag_arr = (int *)malloc(sizeof(int)*armci_nproc);
1057 for (i =0; i<armci_nproc; i++) flag_arr[i] = 9999;
1058
1059 if(DEBUG_SERVER){
1060 printf("\n%d(s):registering mem %p %dbytes ptag=%p handle=%p\n",
1061 armci_me, tmp0,total,(void*)CLN_nic->ptag,(void*)CLN_nic->handle);fflush(stdout);
1062 }
1063
1064 serv_memhandle.memhndl = ibv_reg_mr(CLN_nic->ptag, tmp0, total,
1065 IBV_ACCESS_LOCAL_WRITE |
1066 IBV_ACCESS_REMOTE_WRITE |
1067 IBV_ACCESS_REMOTE_READ);
1068 dassert1(1,serv_memhandle.memhndl!=NULL,total);
1069 serv_memhandle.lkey=serv_memhandle.memhndl->lkey;
1070 serv_memhandle.rkey=serv_memhandle.memhndl->rkey;
1071
1072 /* exchange address of ack/memhandle flag on servers */
1073 if(DEBUG_SERVER){
1074 printf("%d(s):registered mem %p %dbytes mhandle=%p mharr starts%p\n",
1075 armci_me, tmp0, total, (void*)serv_memhandle.memhndl,(void*)CLN_handle);
1076 fflush(stdout);
1077 }
1078 }
1079
1080 static char * client_malloc_buf_base;
armci_vapi_client_mem_alloc(int size)1081 char * armci_vapi_client_mem_alloc(int size)
1082 {
1083 int mod, total;
1084 int extra = MAX_DESCR*sizeof(struct ibv_recv_wr)+SIXTYFOUR;
1085 char *tmp,*tmp0;
1086
1087 /*we use the size passed by the armci_init_bufs routine instead of bytes*/
1088
1089 total = size + extra + 2*SIXTYFOUR;
1090
1091 if(total%4096!=0)
1092 total = total - (total%4096) + 4096;
1093 tmp0 = tmp = malloc(total);
1094 dassert1(1,tmp!=NULL,total);
1095 client_malloc_buf_base = tmp;
1096 #if 0
1097 /*SK: could this lead to a problem at ibv_reg_mr() because of unfixed 'total'?*/
1098 if(ALIGN64ADD(tmp0))tmp0+=ALIGN64ADD(tmp0);
1099 #endif
1100 /* stamp the last byte */
1101 client_tail= tmp + extra+ size +2*SIXTYFOUR-1;
1102 *client_tail=CLIENT_STAMP;
1103
1104 /* we also have a place to store memhandle for zero-copy get */
1105 pinned_handle =(armci_vapi_memhndl_t *) (tmp + extra+ size +SIXTYFOUR-16);
1106
1107 mod = ((ssize_t)tmp)%SIXTYFOUR;
1108 client_descr_pool.descr= (struct ibv_recv_wr*)(tmp+SIXTYFOUR-mod);
1109 tmp += extra;
1110
1111 client_memhandle.memhndl = ibv_reg_mr(SRV_nic->ptag, tmp0, total,
1112 IBV_ACCESS_LOCAL_WRITE |
1113 IBV_ACCESS_REMOTE_WRITE |
1114 IBV_ACCESS_REMOTE_READ);
1115 dassert(1,client_memhandle.memhndl!=NULL);
1116
1117 client_memhandle.lkey = client_memhandle.memhndl->lkey;
1118 client_memhandle.rkey = client_memhandle.memhndl->rkey;
1119 handle_array[armci_me].lkey = client_memhandle.lkey;
1120 handle_array[armci_me].rkey = client_memhandle.rkey;
1121
1122 handle_array[armci_me].memhndl = client_memhandle.memhndl;
1123
1124 if(DEBUG_INIT){
1125 printf("%d: registered client memory %p %dsize tmp=%p \n",
1126 armci_me,tmp0, total, tmp);
1127 fflush(stdout);
1128 }
1129 /*now that we have the handle array, we get every body elses RDMA handle*/
1130 total = (sizeof(armci_vapi_memhndl_t)*armci_nproc)/sizeof(int);
1131 armci_msg_gop_scope(SCOPE_ALL,handle_array,total,"+",ARMCI_INT);
1132
1133 return(tmp);
1134 }
1135
1136
/**
 * Register [ptr, ptr+bytes) with the HCA using the server-side NIC
 * (CLN_nic) protection domain and record the result in *memhdl.
 *
 * @param ptr    start of the region to register
 * @param bytes  length of the region in bytes
 * @param memhdl handle to fill; it is zeroed first, then memhndl/lkey/rkey
 *               are set from the new memory region
 */
void armci_server_register_region(void *ptr,long bytes, ARMCI_MEMHDL_T *memhdl)
{
    const int access = IBV_ACCESS_LOCAL_WRITE |
                       IBV_ACCESS_REMOTE_WRITE |
                       IBV_ACCESS_REMOTE_READ;

    memset(memhdl, 0, sizeof(ARMCI_MEMHDL_T));

    memhdl->memhndl = ibv_reg_mr(CLN_nic->ptag, ptr, bytes, access);
    dassert(1,memhdl->memhndl!=NULL);

    memhdl->lkey = memhdl->memhndl->lkey;
    memhdl->rkey = memhdl->memhndl->rkey;

    if (DEBUG_SERVER) {
        printf("\n%d(s):registered lkey=%d rkey=%d ptr=%p end=%p %p\n",armci_me,
               memhdl->lkey,memhdl->rkey,ptr,(char *)ptr+bytes,(void*)memhdl);
        fflush(stdout);
    }
}
1156
/**
 * Register a contiguous region with the HCA using the client-side NIC
 * (SRV_nic) protection domain.
 *
 * @param ptr    start of the region
 * @param bytes  length of the region in bytes
 * @param memhdl receives the memory region pointer and its lkey/rkey
 * @return always 1 (a failed registration trips the dassert instead)
 */
int armci_pin_contig_hndl(void *ptr, size_t bytes, ARMCI_MEMHDL_T *memhdl)
{
    const int access = IBV_ACCESS_LOCAL_WRITE |
                       IBV_ACCESS_REMOTE_WRITE |
                       IBV_ACCESS_REMOTE_READ;

    memhdl->memhndl = ibv_reg_mr(SRV_nic->ptag, ptr, bytes, access);
    dassert(1,memhdl->memhndl!=NULL);

    memhdl->lkey = memhdl->memhndl->lkey;
    memhdl->rkey = memhdl->memhndl->rkey;

    /* debug trace, compiled in but permanently disabled */
    if (0) {
        printf("\n%d:registered lkey=%d rkey=%d ptr=%p end=%p\n",armci_me,
               memhdl->lkey,memhdl->rkey,ptr,(char *)ptr+bytes);
        fflush(stdout);
    }

    return 1;
}
1172
1173 #if 1
/**
 * Deregister a client-side memory region.
 *
 * @param mh handle whose memhndl is passed to ibv_dereg_mr(); a non-zero
 *           return trips the dassert and is also reported through
 *           armci_vapi_check_return()
 */
void armci_network_client_deregister_memory(ARMCI_MEMHDL_T *mh)
{
    int rc = ibv_dereg_mr(mh->memhndl);

    dassert1(1,rc==0,rc);
    armci_vapi_check_return(DEBUG_FINALIZE,rc,
                            "armci_network_client_deregister_memory:deregister_mr");
}
/**
 * Server-side counterpart of armci_network_client_deregister_memory().
 *
 * As written this function is a no-op: it returns before doing anything, so
 * the trace print and the ibv_dereg_mr() call below are unreachable.
 * NOTE(review): the reason for the early return is not recorded here (the
 * original author's own comment asks "why"); confirm before re-enabling the
 * deregistration.
 *
 * @param mh handle of the region that would be deregistered
 */
void armci_network_server_deregister_memory(ARMCI_MEMHDL_T *mh)
{
    int rc;
    return; /* ??? why ??? */
    /* unreachable from here on */
    printf("\n%d:deregister ptr=%p",armci_me,(void*)mh);fflush(stdout);
    rc = ibv_dereg_mr(mh->memhndl);
    dassert1(1,rc==0,rc);
    armci_vapi_check_return(DEBUG_FINALIZE,rc,
                            "armci_network_server_deregister_memory:deregister_mr");
}
1192 #else
/* Macro fallbacks for the two deregistration routines (only compiled when
 * the function versions above are disabled). The argument is parenthesized
 * so that expressions such as `&handle` expand to `(&handle)->memhndl`
 * rather than `&handle->memhndl`. */
# define armci_network_client_deregister_memory(mh) \
         armci_vapi_check_return(DEBUG_FINALIZE, \
                                 ibv_dereg_mr((mh)->memhndl), \
                                 "armci_network_client_deregister_memory:deregister_mr")
# define armci_network_server_deregister_memory(mh) \
         armci_vapi_check_return(DEBUG_FINALIZE, \
                                 ibv_dereg_mr((mh)->memhndl), \
                                 "armci_network_server_deregister_memory:deregister_mr")
1201 #endif
1202
armci_set_serv_mh()1203 void armci_set_serv_mh()
1204 {
1205 int s, ratio = sizeof(ack_t)/sizeof(int);
1206 /* first collect addrresses on all masters */
1207 if(armci_me == armci_master){
1208 SRV_ack[armci_clus_me].prem_handle=CLN_handle;
1209 SRV_ack[armci_clus_me].handle =serv_memhandle;
1210 armci_msg_gop_scope(SCOPE_MASTERS,SRV_ack,ratio*armci_nclus,"+",
1211 ARMCI_INT);
1212 }
1213 /* next master broadcasts the addresses within its node */
1214 armci_msg_bcast_scope(SCOPE_NODE,SRV_ack,armci_nclus*sizeof(ack_t),
1215 armci_master);
1216
1217 /* Finally save address corresponding to my id on each server */
1218 for(s=0; s< armci_nclus; s++){
1219 SRV_ack[s].prem_handle += armci_me;
1220 }
1221
1222 }
1223 /**********END MEMORY ALLOCATION REGISTRATION AND DEREGISTRATION**************/
1224
1225 /*\
1226 * init_connections, client_connect_to_servers -- client code
1227 * server_initial_connection, all_data_server -- server code
1228 \*/
armci_init_connections()1229 void armci_init_connections()
1230 {
1231 int c,s;
1232 int sz;
1233 uint32_t *tmpbuf;
1234 int *tmparr;
1235 if(TIME_INIT)inittime0 = MPI_Wtime();
1236
1237 #if defined(PEND_BUFS)
1238 armci_pbuf_init_buffer_env();
1239 #endif
1240
1241 armci_setaffinity(NULL);
1242
1243 /* initialize nic connection for qp numbers and lid's */
1244 armci_init_nic(SRV_nic,1,1);
1245 for(c=0; c < NUMOFBUFFERS+1; c++) {
1246 mark_buf_send_complete[c]=1;
1247 }
1248 _gtmparr = (int *)calloc(armci_nproc,sizeof(int));
1249
1250 /*qp_numbers and lids need to be exchanged globally*/
1251 tmparr = (int *)calloc(armci_nproc,sizeof(int));
1252 tmparr[armci_me] = SRV_nic->lid_arr[armci_me];
1253 sz = armci_nproc;
1254 armci_msg_gop_scope(SCOPE_ALL,tmparr,sz,"+",ARMCI_INT);
1255 for(c=0;c<armci_nproc;c++){
1256 SRV_nic->lid_arr[c]=tmparr[c];
1257 tmparr[c]=0;
1258 }
1259 /*SRV_con is for client to connect to servers */
1260 SRV_con=(armci_connect_t *)malloc(sizeof(armci_connect_t)*armci_nclus);
1261 dassert1(1,SRV_con!=NULL,sizeof(armci_connect_t)*armci_nclus);
1262 bzero(SRV_con,sizeof(armci_connect_t)*armci_nclus);
1263
1264 CLN_con=(armci_connect_t*)malloc(sizeof(armci_connect_t)*armci_nproc);
1265 dassert1(1,CLN_con!=NULL,sizeof(armci_connect_t)*armci_nproc);
1266 bzero(CLN_con,sizeof(armci_connect_t)*armci_nproc);
1267
1268 /*every client creates a qp with every server other than the one on itself*/
1269 SRV_rqpnums = (uint32_t*)malloc(sizeof(uint32_t)*armci_nproc);
1270 dassert(1,SRV_rqpnums);
1271 tmpbuf = (uint32_t*)calloc(armci_nproc,sizeof(uint32_t));
1272 dassert(1,tmpbuf);
1273
1274 sz = armci_nproc*(sizeof(uint32_t)/sizeof(int));
1275 armci_vapi_max_inline_size = 0;
1276
1277
1278 if (!armci_use_odcm) {
1279 for(s = 0; s < armci_nclus; s++){
1280 armci_connect_t *con = SRV_con + s;
1281 {
1282 armci_create_qp(SRV_nic,&con->qp);
1283 con->sqpnum = con->qp->qp_num;
1284 tmpbuf[armci_clus_info[s].master] = con->qp->qp_num;
1285 con->lid = SRV_nic->lid_arr[s];
1286 }
1287 }
1288 MPI_Alltoall(tmpbuf,sizeof(uint32_t),MPI_CHAR,SRV_rqpnums,
1289 sizeof(uint32_t),MPI_CHAR,ARMCI_COMM_WORLD);
1290 free(tmpbuf);
1291
1292 if(armci_me != armci_master) {
1293 free(SRV_rqpnums);
1294 SRV_rqpnums = NULL;
1295 }
1296 }
1297 else {
1298 for(s = 0; s < armci_nclus; s++){
1299 armci_connect_t *con = SRV_con + s;
1300 con->state = QP_INACTIVE;
1301 }
1302 }
1303
1304 SRV_ack = (ack_t*)calloc(armci_nclus, sizeof(ack_t));
1305 dassert1(1,SRV_ack!=NULL,armci_nclus*sizeof(ack_t));
1306
1307 handle_array = (armci_vapi_memhndl_t *)calloc(sizeof(armci_vapi_memhndl_t),
1308 armci_nproc);
1309 dassert1(1,handle_array!=NULL,sizeof(armci_vapi_memhndl_t)*armci_nproc);
1310
1311 if (armci_use_odcm) {
1312 setup_ud_channel();
1313 }
1314 }
1315
/**
 * Client half of the client/server connection handshake.
 *
 * Steps, in order:
 *  1. The node master waits for armci_vapi_server_stage1, then the masters
 *     combine _gtmparr (filled by the servers with their LIDs) and copy the
 *     result into CLN_nic->lid_arr. armci_vapi_client_stage1 is then set.
 *  2. The node master waits for armci_vapi_server_stage2. Unless ODCM is in
 *     use, the servers' QP numbers (CLN_rqpnumtmpbuf) are exchanged with
 *     MPI_Alltoall into CLN_rqpnums.
 *  3. Unless ODCM is in use, every QP in SRV_con is moved RESET->INIT and
 *     INIT->RTR.
 *  4. After a barrier, armci_vapi_client_ready is set and (non-ODCM) the QPs
 *     are moved RTR->RTS; CLN_rqpnums is then freed.
 *
 * NOTE(review): the exact semantics of armci_util_wait_int() are not visible
 * here; it is presumably a wait until the flag equals the given value.
 * NOTE(review): the `if (!armci_use_odcm) {` opened inside the `#else`
 * branch below is closed after the INIT->RTR loop, i.e. outside the
 * `#if 0 / #else / #endif` region. Enabling the `#if 0` branch would leave
 * the braces unbalanced.
 * NOTE(review): CLN_rqpnums is allocated unconditionally (and unchecked) but
 * only freed in the non-ODCM path of this function.
 */
static void vapi_connect_client()
{
    int i, sz=0, c, rc;
    struct ibv_qp_attr qp_attr;
    enum ibv_qp_attr_mask qp_attr_mask;

    if (TIME_INIT) inittime0 = MPI_Wtime();
    /* only the node master synchronizes with the server flags */
    if (armci_me == armci_master)
        armci_util_wait_int(&armci_vapi_server_stage1, 1, 10);
    if (TIME_INIT) printf("\n%d:wait for server to get to stage 1 time for "
                          "vapi_connect_client is %f",
                          armci_me, (inittime1 = MPI_Wtime()) - inittime0);
    sz = armci_nproc;
    if (armci_me == armci_master) {
        /* combine the per-server LID contributions across masters, then
         * move them into CLN_nic->lid_arr and clear the scratch array */
        armci_msg_gop_scope(SCOPE_MASTERS, _gtmparr, sz, "+", ARMCI_INT);
        for (c=0; c<armci_nproc; c++) {
            CLN_nic->lid_arr[c] = _gtmparr[c];
            _gtmparr[c] = 0;
        }
        if (DEBUG_CLN) {
            printf("\n%d(svc): mylid = %d",armci_me,CLN_nic->lid_arr[armci_me]);
            fflush(stdout);
        }
    }

    armci_vapi_client_stage1 = 1;

    /* allocate and initialize connection structs */
    /* sz is only consumed by the disabled (#if 0) exchange below */
    sz = armci_nproc*sizeof(uint32_t)/sizeof(int);

    if (armci_me == armci_master)
        armci_util_wait_int(&armci_vapi_server_stage2, 1, 10);
#if 0
    for (c = 0; c < armci_nproc; c++){
        armci_connect_t *con = CLN_con + c;
        if (armci_me != armci_master) {
            char *ptrr;
            int extra;
            ptrr = malloc(8 + sizeof(uint32_t) * armci_nproc);
            extra = ALIGNLONGADD(ptrr);
            ptrr = ptrr + extra;
            con->rqpnum = (uint32_t *)ptrr;
            bzero(con->rqpnum, sizeof(uint32_t) * armci_nproc);
        }
        armci_msg_gop_scope(SCOPE_ALL, con->rqpnum, sz, "+", ARMCI_INT);
    }
#else
    CLN_rqpnums = (uint32_t*)malloc(sizeof(uint32_t)*armci_nproc);

    if (!armci_use_odcm) {
        if(armci_me != armci_master) {
            /* non-masters have no server data to send: the send buffer is
             * allocated only to take part in the all-to-all (just has junk) */
            CLN_rqpnumtmpbuf = (uint32_t*)malloc(sizeof(uint32_t)*armci_nproc);
        }
        dassert(1, CLN_rqpnumtmpbuf);
        /* one uint32_t (sent as bytes) per peer: server QP numbers */
        MPI_Alltoall(CLN_rqpnumtmpbuf, sizeof(uint32_t), MPI_CHAR,
                     CLN_rqpnums, sizeof(uint32_t), MPI_CHAR, ARMCI_COMM_WORLD);
        free(CLN_rqpnumtmpbuf);
        CLN_rqpnumtmpbuf=NULL;
#endif

        if (TIME_INIT) printf("\n%d:wait for server tog et to stage 2 time for "
                              "vapi_connect_client is %f",
                              armci_me, (inittime2 = MPI_Wtime()) - inittime1);
        /*armci_set_serv_mh();*/

        if (DEBUG_CLN) {
            printf("%d:all connections ready\n", armci_me);
            fflush(stdout);
        }

        /* For sanity */
        memset(&qp_attr, 0, sizeof qp_attr);
        /* Modifying QP to INIT */
        qp_attr_mask = IBV_QP_STATE
                     | IBV_QP_PKEY_INDEX
                     | IBV_QP_PORT
                     | IBV_QP_ACCESS_FLAGS;

        qp_attr.qp_state = IBV_QPS_INIT;
        qp_attr.pkey_index = DEFAULT_PKEY_IX;
        qp_attr.port_num = SRV_nic->active_port;
        qp_attr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;

        /* move the QP towards every server (one per cluster node) to INIT */
        for (i = 0; i < armci_nclus; i++) {
            armci_connect_t *con;
            con = SRV_con + i;
            rc = ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask);
            dassertp(1,!rc,("%d: client RST->INIT i=%d rc=%d\n",armci_me,i,rc));
        }

        if (TIME_INIT) printf("\n%d:to init time for vapi_connect_client is %f",
                              armci_me, (inittime1 = MPI_Wtime()) - inittime2);
        /* attributes for the INIT->RTR transition */
        qp_attr_mask = IBV_QP_STATE
                     | IBV_QP_MAX_DEST_RD_ATOMIC
                     | IBV_QP_PATH_MTU
                     | IBV_QP_RQ_PSN
                     | IBV_QP_MIN_RNR_TIMER;
        memset(&qp_attr, 0, sizeof qp_attr);

        qp_attr.qp_state = IBV_QPS_RTR;
        qp_attr.max_dest_rd_atomic = 4;
        qp_attr.path_mtu = IBV_MTU_1024;
        qp_attr.rq_psn = 0;
        qp_attr.min_rnr_timer = RNR_TIMER;

        /* AV: Adding the service level parameter */
        qp_attr.ah_attr.sl = armci_openib_sl;

        for (i = 0; i < armci_nclus; i++) {
            armci_connect_t *con;
#if 0
            armci_connect_t *conS;
#endif
            con = SRV_con + i;
#if 0
            conS = CLN_con + armci_me;
#endif
            /* per-destination fields: address vector and remote QP number */
            qp_attr_mask |= IBV_QP_AV | IBV_QP_DEST_QPN;
#if 0
            qp_attr.dest_qp_num = conS->rqpnum[armci_clus_info[i].master];
#else
            /* QP number received from the master (server) of node i */
            qp_attr.dest_qp_num = CLN_rqpnums[armci_clus_info[i].master];
#endif
            qp_attr.ah_attr.dlid = SRV_nic->lid_arr[armci_clus_info[i].master];
            qp_attr.ah_attr.port_num = SRV_nic->active_port;

            qp_attr.ah_attr.sl = armci_openib_sl;

            rc = ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask);
            dassertp(1,!rc,("%d: INIT->RTR client i=%d rc=%d\n",armci_me,i,rc));
        }
    }
    /* to go to RTS, the other side must be in RTR */

    armci_msg_barrier();
    if (TIME_INIT) printf("\n%d:init to rtr time for vapi_connect_client is %f",
                          armci_me, (inittime2 = MPI_Wtime()) - inittime1);
    /* tell the server that the client side has reached RTR */
    armci_vapi_client_ready=1;

    if (!armci_use_odcm) {

        qp_attr_mask = IBV_QP_STATE
                     | IBV_QP_SQ_PSN
                     | IBV_QP_TIMEOUT
                     | IBV_QP_RETRY_CNT
                     | IBV_QP_RNR_RETRY
                     | IBV_QP_MAX_QP_RD_ATOMIC;

        memset(&qp_attr, 0, sizeof qp_attr);

        qp_attr.qp_state = IBV_QPS_RTS;
        qp_attr.sq_psn = 0;
        qp_attr.timeout = 18;
        qp_attr.retry_cnt = 7;
        qp_attr.rnr_retry = 7;
        qp_attr.max_rd_atomic = 4;

        for (i = 0; i < armci_nclus; i++){
            armci_connect_t *con;
            con = SRV_con + i;
            rc = ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask);
            dassertp(1,!rc,("%d: client RTR->RTS i=%d rc=%d\n",armci_me,i,rc));
        }
        if (TIME_INIT) printf("\n%d:rtr to rts time for vapi_connect_client is %f",
                              armci_me, (inittime1 = MPI_Wtime()) - inittime2);
        /* remote QP numbers are no longer needed once the QPs are in RTS */
        free(CLN_rqpnums);
        CLN_rqpnums=NULL;
    }

}
1488
1489
armci_client_connect_to_servers()1490 void armci_client_connect_to_servers()
1491 {
1492 extern void armci_util_wait_int(volatile int *,int,int);
1493 if (TIME_INIT) inittime0 = MPI_Wtime();
1494 _armci_buf_init();
1495
1496 vapi_connect_client();
1497 if (armci_me == armci_master)
1498 armci_util_wait_int(&armci_vapi_server_ready,1,10);
1499 armci_msg_barrier();
1500 if (DEBUG_CLN && armci_me == armci_master) {
1501 printf("\n%d:server_ready=%d\n",armci_me,armci_vapi_server_ready);
1502 fflush(stdout);
1503 }
1504 if (TIME_INIT) printf("\n%d:time for client_connect_to_s is %f",
1505 armci_me,MPI_Wtime()-inittime0);
1506 }
1507
1508
/**
 * Prepare a receive work request that describes a single buffer.
 *
 * @param rd       receive work request to initialize (zeroed first)
 * @param sg_entry scatter/gather element to fill and attach to rd
 * @param buf      start of the receive buffer
 * @param len      length of the receive buffer in bytes
 * @param mhandle  memory handle supplying the lkey for buf
 */
void armci_init_vapibuf_recv(struct ibv_recv_wr *rd, struct ibv_sge *sg_entry,
                             char *buf, int len, armci_vapi_memhndl_t *mhandle)
{
    /* describe the buffer */
    sg_entry->addr   = (uint64_t)buf;
    sg_entry->length = len;
    sg_entry->lkey   = mhandle->lkey;

    /* single-element, unchained request with a zero id */
    memset(rd, 0, sizeof *rd);
    rd->wr_id   = 0;
    rd->next    = NULL;
    rd->sg_list = sg_entry;
    rd->num_sge = 1;
}
1522
1523
/**
 * Prepare a signaled SEND work request that describes a single buffer.
 * Only the fields set below are touched; the rest of *sd is left as is.
 *
 * @param sd       send work request to set up
 * @param sg_entry scatter/gather element to fill and attach to sd
 * @param buf      start of the buffer to send
 * @param len      number of bytes to send
 * @param mhandle  memory handle supplying the lkey for buf
 */
void armci_init_vapibuf_send(struct ibv_send_wr *sd, struct ibv_sge *sg_entry,
                             char *buf, int len, armci_vapi_memhndl_t *mhandle)
{
    /* describe the buffer */
    sg_entry->addr   = (uint64_t)buf;
    sg_entry->length = len;
    sg_entry->lkey   = mhandle->lkey;

    /* single-element, unchained, signaled send */
    sd->next       = NULL;
    sd->sg_list    = sg_entry;
    sd->num_sge    = 1;
    sd->opcode     = IBV_WR_SEND;
    sd->send_flags = IBV_SEND_SIGNALED;
}
1537
1538
/**
 * Prepare a signaled RDMA WRITE work request for one contiguous segment.
 *
 * @param sd       send work request to set up
 * @param sg_entry scatter/gather element to fill and attach to sd
 * @param lbuf     local source buffer
 * @param rbuf     remote destination address
 * @param len      number of bytes to write
 * @param lhandle  local memory handle; when NULL the lkey already stored in
 *                 sg_entry is left untouched
 * @param rhandle  remote memory handle; when NULL the rkey already stored in
 *                 sd is left untouched
 */
static void armci_init_cbuf_srdma(struct ibv_send_wr *sd, struct ibv_sge *sg_entry,
                                  char *lbuf, char *rbuf, int len,
                                  armci_vapi_memhndl_t *lhandle,
                                  armci_vapi_memhndl_t *rhandle)
{
    /* NOTE: sd->wr is a union, sd->wr.ud might conflict with sd->wr.rdma */
    sd->next       = NULL;
    sd->sg_list    = sg_entry;
    sd->num_sge    = 1;
    sd->opcode     = IBV_WR_RDMA_WRITE;
    sd->send_flags = IBV_SEND_SIGNALED;

    /* remote side */
    if (rhandle != NULL) {
        sd->wr.rdma.rkey = rhandle->rkey;
    }
    sd->wr.rdma.remote_addr = (uint64_t)rbuf;

    /* local side */
    if (lhandle != NULL) {
        sg_entry->lkey = lhandle->lkey;
    }
    sg_entry->addr   = (uint64_t)lbuf;
    sg_entry->length = len;
}
1557
1558
/**
 * Prepare a signaled RDMA READ work request for one contiguous segment.
 *
 * @param sd       send work request to set up
 * @param sg_entry scatter/gather element to fill and attach to sd
 * @param lbuf     local destination buffer
 * @param rbuf     remote source address
 * @param len      number of bytes to read
 * @param lhandle  local memory handle; when NULL the lkey already stored in
 *                 sg_entry is left untouched
 * @param rhandle  remote memory handle; when NULL the rkey already stored in
 *                 sd is left untouched
 */
static void armci_init_cbuf_rrdma(struct ibv_send_wr *sd, struct ibv_sge
                                  *sg_entry, char *lbuf, char *rbuf, int len,
                                  armci_vapi_memhndl_t *lhandle,
                                  armci_vapi_memhndl_t *rhandle)
{
    sd->next       = NULL;
    sd->sg_list    = sg_entry;
    sd->num_sge    = 1;
    sd->opcode     = IBV_WR_RDMA_READ;
    sd->send_flags = IBV_SEND_SIGNALED;

    /* sd->wr is a union, so sd->wr.ud might conflict with sd->wr.rdma:
     * the ud field is cleared first and the rdma fields are written after */
    sd->wr.ud.remote_qkey = 0;
    if (rhandle != NULL) {
        sd->wr.rdma.rkey = rhandle->rkey;
    }
    sd->wr.rdma.remote_addr = (uint64_t)rbuf;

    /* local side */
    if (lhandle != NULL) {
        sg_entry->lkey = lhandle->lkey;
    }
    sg_entry->addr   = (uint64_t)lbuf;
    sg_entry->length = len;
}
1577
1578
/**
 * Server half of the client/server connection handshake.
 *
 * Steps, in order:
 *  1. Initialize the server NIC (CLN_nic); unless polling is selected
 *     (armci_openib_server_poll), arm completion notification on its
 *     receive CQ.
 *  2. Publish this server's LID in _gtmparr, set armci_vapi_server_stage1
 *     and wait for armci_vapi_client_stage1.
 *  3. Allocate CLN_rqpnumtmpbuf and, unless ODCM is in use, create one QP
 *     per client process and record its number there; with ODCM the
 *     connections are only marked QP_INACTIVE. Set armci_vapi_server_stage2.
 *  4. (non-ODCM) Move every QP RESET->INIT and INIT->RTR, using
 *     SRV_rqpnums[c] / SRV_nic->lid_arr[c] as the remote QP number and LID.
 *  5. Wait for armci_vapi_client_ready, then (non-ODCM) move every QP
 *     RTR->RTS and free SRV_rqpnums.
 *  6. Allocate the server buffers and (non-ODCM) post the initial receives,
 *     then set armci_vapi_server_ready.
 *
 * NOTE(review): the exact semantics of armci_util_wait_int() are not visible
 * here; it is presumably a wait until the flag equals the given value.
 * NOTE(review): unlike the client side, no service level (ah_attr.sl) is set
 * for the INIT->RTR transition here; confirm this is intended.
 */
void armci_server_initial_connection()
{
    int c, rc, i, j;
    struct ibv_qp_attr qp_attr;
    enum ibv_qp_attr_mask qp_attr_mask;
    struct ibv_recv_wr *bad_wr;

    if (TIME_INIT)
        inittime0 = MPI_Wtime();

    if (DEBUG_SERVER) {
        printf("in server after fork %d (%d)\n",armci_me,getpid());
        fflush(stdout);
    }

#if defined(PEND_BUFS) && !defined(SERVER_THREAD)
    armci_pbuf_init_buffer_env();
#endif
    armci_init_nic(CLN_nic,1,1);
    if (!armci_openib_server_poll) {
        /*
         * Start a notify event request immediately after creation so
         * nothing is missed.
         */
        rc = ibv_req_notify_cq(CLN_nic->rcq, 0);
        dassert1(1,rc==0,rc);
    }

    /* publish our LID, then hand over to the client side */
    _gtmparr[armci_me] = CLN_nic->lid_arr[armci_me];
    armci_vapi_server_stage1 = 1;
    armci_util_wait_int(&armci_vapi_client_stage1, 1, 10);

    /* send buffer for the QP-number exchange performed by the client side */
    CLN_rqpnumtmpbuf = (uint32_t*)malloc(sizeof(uint32_t)*armci_nproc);
    dassert(1, CLN_rqpnumtmpbuf);

    if (!armci_use_odcm) {
        /* one QP per client process */
        for (c = 0; c < armci_nproc; c++) {
            armci_connect_t *con = CLN_con + c;
            armci_create_qp(CLN_nic, &con->qp);
            con->sqpnum = con->qp->qp_num;
            con->lid = CLN_nic->lid_arr[c];
            CLN_rqpnumtmpbuf[c] = con->qp->qp_num;
        }
    }
    else {
        for (c = 0; c < armci_nproc; c++) {
            armci_connect_t *con = CLN_con + c;
            con->state = QP_INACTIVE;
        }
    }

    armci_vapi_server_stage2 = 1;

    if (!armci_use_odcm) {
        /* RESET -> INIT */
        qp_attr_mask = IBV_QP_STATE
                     | IBV_QP_PKEY_INDEX
                     | IBV_QP_PORT
                     | IBV_QP_ACCESS_FLAGS;

        memset(&qp_attr, 0, sizeof qp_attr);
        qp_attr.qp_state = IBV_QPS_INIT;
        qp_attr.pkey_index = DEFAULT_PKEY_IX;
        qp_attr.port_num = CLN_nic->active_port;
        qp_attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE |
                                  IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;

        for (c = 0; c < armci_nproc; c++) {
            armci_connect_t *con = CLN_con + c;
            rc = ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask);
            /* NOTE(review): the message says "RTS->INIT" although this is
             * the first transition after QP creation; presumably RST->INIT
             * was meant */
            dassertp(1,!rc,("%d: RTS->INIT server c=%d rc=%d\n",armci_me,c,rc));
        }

        /* INIT -> RTR */
        memset(&qp_attr, 0, sizeof qp_attr);
        qp_attr_mask = IBV_QP_STATE
                     | IBV_QP_MAX_DEST_RD_ATOMIC
                     | IBV_QP_PATH_MTU
                     | IBV_QP_RQ_PSN
                     | IBV_QP_MIN_RNR_TIMER;
        qp_attr.qp_state = IBV_QPS_RTR;
        qp_attr.path_mtu = IBV_MTU_1024;
        qp_attr.max_dest_rd_atomic = 4;
        qp_attr.min_rnr_timer = RNR_TIMER;
        qp_attr.rq_psn = 0;

        for(c = 0; c < armci_nproc; c++) {
            armci_connect_t *con = CLN_con + c;
            /* per-destination fields: remote QP number and address vector */
            qp_attr_mask |= IBV_QP_DEST_QPN | IBV_QP_AV;
            qp_attr.dest_qp_num = SRV_rqpnums[c];
            /* the peer is client c, whose LID lives in the client NIC table */
            qp_attr.ah_attr.dlid = SRV_nic->lid_arr[c];
            qp_attr.ah_attr.port_num = CLN_nic->active_port;

            rc = ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask);
            dassertp(1,!rc,("%d: INIT->RTR server cln=%d rc=%d\n",armci_me,c,rc));
        }
    }
    /* the client sets this flag after its barrier following INIT->RTR */
    armci_util_wait_int(&armci_vapi_client_ready,1,10);
    memset(&qp_attr, 0, sizeof qp_attr);

    if (!armci_use_odcm) {
        /* RTR -> RTS */
        qp_attr_mask = IBV_QP_STATE
                     | IBV_QP_SQ_PSN
                     | IBV_QP_TIMEOUT
                     | IBV_QP_RETRY_CNT
                     | IBV_QP_RNR_RETRY
                     | IBV_QP_MAX_QP_RD_ATOMIC;

        qp_attr.qp_state = IBV_QPS_RTS;
        qp_attr.sq_psn = 0;
        qp_attr.timeout = 18;
        qp_attr.retry_cnt = 7;
        qp_attr.rnr_retry = 7;
        qp_attr.max_rd_atomic = 4;

        for (c = 0; c < armci_nproc; c++) {
            armci_connect_t *con = CLN_con + c;
            rc = ibv_modify_qp(con->qp, &qp_attr,qp_attr_mask);
            dassertp(1,!rc,("%d: server RTR->RTS cln=%d rc=%d\n",armci_me,c,rc));
        }
        /* remote QP numbers are no longer needed once the QPs are in RTS */
        free(SRV_rqpnums);
        SRV_rqpnums = NULL;
    }

    armci_server_alloc_bufs();

    if (!armci_use_odcm) {
        /* setup descriptors and post nonblocking receives */
#if defined(PEND_BUFS)
        /* IMM_BUF_NUM+1 immediate buffers per client; their work-request ids
         * must fit in the range reserved for immediate-buffer receives */
        assert(armci_nproc*(IMM_BUF_NUM+1)<DSCRID_IMMBUF_RECV_END-DSCRID_IMMBUF_RECV);
        for(i = 0; i < armci_nproc; i++) {
            for(j=0; j<IMM_BUF_NUM+1; j++) {
                vapibuf_t *cbuf;
                cbuf = serv_buf_arr[i*(IMM_BUF_NUM+1)+j];
                armci_init_vapibuf_recv(&cbuf->dscr, &cbuf->sg_entry, cbuf->buf,
                                        IMM_BUF_LEN, &serv_memhandle);
                /* we use index of the buffer to identify the buffer, this index is
                 * returned with a call to ibv_poll_cq inside the ibv_wr */
                cbuf->dscr.wr_id = i*(IMM_BUF_NUM+1)+j + DSCRID_IMMBUF_RECV;
                if (DEBUG_SERVER) {
                    printf("\n%d(s):posted rr with lkey=%d",armci_me,cbuf->sg_entry.lkey);
                    fflush(stdout);
                }
                rc = ibv_post_recv((CLN_con+i)->qp, &cbuf->dscr, &bad_wr);
                dassert1(1,rc==0,rc);
            }
        }
#else
        /* one receive buffer per client */
        for(i = 0; i < armci_nproc; i++) {
            vapibuf_t *cbuf;
            cbuf = serv_buf_arr[i];
            armci_init_vapibuf_recv(&cbuf->dscr, &cbuf->sg_entry, cbuf->buf,
                                    CBUF_DLEN, &serv_memhandle);
            /* we use index of the buffer to identify the buffer, this index is
             * returned with a call to ibv_poll_cq inside the ibv_wr */
            cbuf->dscr.wr_id = i+armci_nproc;
            if (DEBUG_SERVER) {
                printf("\n%d(s):posted rr with lkey=%d",armci_me,cbuf->sg_entry.lkey);
                fflush(stdout);
            }
            rc = ibv_post_recv((CLN_con+i)->qp, &cbuf->dscr, &bad_wr);
            dassert1(1,rc==0,rc);
        }
#endif
    }
    /* NOTE(review): inittime4 is not assigned anywhere in this function;
     * verify the timing base before relying on this figure */
    if (TIME_INIT) printf("\n%d:post time for server_initial_conn is %f",
                          armci_me, MPI_Wtime() - inittime4);

    /* the client master waits on this flag in armci_client_connect_to_servers */
    armci_vapi_server_ready=1;

    if (DEBUG_SERVER) {
        printf("%d: server connected to all clients\n",armci_me); fflush(stdout);
    }
    if (TIME_INIT) printf("\n%d:time for server_initial_conn is %f",
                          armci_me, MPI_Wtime() - inittime0);
}
1753
/**
 * Release the verbs objects owned by a NIC descriptor, in this order:
 * send CQ, send completion channel, receive CQ, receive completion channel,
 * and finally the device context. Every step is asserted to succeed and is
 * also reported through armci_vapi_check_return().
 *
 * @param nic NIC descriptor whose scq/sch/rcq/rch/handle are released
 */
static void armci_finalize_nic(vapi_nic_t *nic)
{
    int ret = ibv_destroy_cq(nic->scq);

    /* send side: completion queue, then its channel */
    dassert1(1,ret==0,ret);
    armci_vapi_check_return(DEBUG_FINALIZE,ret,"armci_finalize_nic:destroy_scq");

    ret = ibv_destroy_comp_channel(nic->sch);
    dassert1(1,ret==0,ret);
    armci_vapi_check_return(DEBUG_FINALIZE,ret,"armci_finalize_nic:destroy_sch");

    /* receive side: completion queue, then its channel */
    ret = ibv_destroy_cq(nic->rcq);
    dassert1(1,ret==0,ret);
    armci_vapi_check_return(DEBUG_FINALIZE,ret,"armci_finalize_nic:destroy_rcq");

    ret = ibv_destroy_comp_channel(nic->rch);
    dassert1(1,ret==0,ret);
    armci_vapi_check_return(DEBUG_FINALIZE,ret,"armci_finalize_nic:destroy_rch");

    /* last: close the device context */
    ret = ibv_close_device(nic->handle);
    dassert1(1,ret==0,ret);
    armci_vapi_check_return(DEBUG_FINALIZE,ret,"armci_finalize_nic:release_hca");
}
1780
1781
armci_server_transport_cleanup()1782 void armci_server_transport_cleanup()
1783 {
1784 int s;
1785 int rc;
1786
1787 /*first we have empty send/recv queues TBD*/
1788 if(serv_malloc_buf_base){
1789 rc = ibv_dereg_mr(serv_memhandle.memhndl);
1790 dassert1(1,rc==0,rc);
1791 armci_vapi_check_return(DEBUG_FINALIZE,rc,
1792 "armci_server_transport_cleanup:deregister_mr");
1793 /*now free it*/
1794 free(serv_malloc_buf_base);
1795 }
1796 /*now deregister all my regions from regionskk.c*/
1797 armci_server_region_destroy();
1798 if (CLN_con) {
1799 for (s = 0; s < armci_nproc; s++) {
1800 armci_connect_t *con = CLN_con + s;
1801 if (con->qp) {
1802 rc = ibv_destroy_qp(con->qp);
1803 armci_vapi_check_return(DEBUG_FINALIZE,rc,
1804 "armci_server_transport_cleanup:destroy_qp");
1805 }
1806 #if 0
1807 free(con->rqpnum);
1808 #endif
1809 }
1810 free(CLN_con);
1811 }
1812 armci_finalize_nic(CLN_nic);
1813 }
1814
armci_transport_cleanup()1815 void armci_transport_cleanup()
1816 {
1817 int s;
1818 int rc;
1819
1820 /*first deregister buffers memory */
1821 if (client_malloc_buf_base) {
1822 rc = ibv_dereg_mr(client_memhandle.memhndl);
1823 dassert1(1,rc==0,rc);
1824 armci_vapi_check_return(DEBUG_FINALIZE,rc,"armci_client_transport_cleanup:deregister_mr");
1825 /*now free it*/
1826 free(client_malloc_buf_base);
1827 }
1828 /*now deregister all my regions from regions.c*/
1829 armci_region_destroy();
1830 if (SRV_con) {
1831 for (s = 0; s < armci_nclus; s++) {
1832 armci_connect_t *con = SRV_con + s;
1833 if (con->qp) {
1834 rc = ibv_destroy_qp(con->qp);
1835 dassert1(1,rc==0,rc);
1836 armci_vapi_check_return(DEBUG_FINALIZE,rc,"armci_client_transport_cleanup:destroy_qp");
1837 }
1838 #if 0
1839 free(con->rqpnum);
1840 #endif
1841 }
1842 free(SRV_con);
1843 }
1844 armci_finalize_nic(SRV_nic);
1845 }
1846
1847 /** Post an immediate buffer back for the client to send.
1848 */
_armci_pendbuf_post_immbuf(vapibuf_t * cbuf,int to)1849 static void _armci_pendbuf_post_immbuf(vapibuf_t *cbuf, int to) {
1850 int rc;
1851 struct ibv_recv_wr *bad_wr;
1852 #if defined(PEND_BUFS)
1853 assert(cbuf->dscr.wr_id == cbuf-serv_buf_arr[0]+DSCRID_IMMBUF_RECV);
1854 #endif
1855 rc = ibv_post_recv((CLN_con+to)->qp, &(cbuf->dscr), &bad_wr);
1856 dassert1(1,rc==0,rc);
1857 }
1858
/* Map a receive work-request id back to the index of the immediate buffer
 * it was posted for. The argument is parenthesized in both variants so that
 * expression arguments (e.g. `a ? b : c`) expand correctly. */
#if defined(PEND_BUFS)
#define DSCRID_TO_IMMBUFID(x) ((x)-DSCRID_IMMBUF_RECV)
#else
#define DSCRID_TO_IMMBUFID(x) ((x)-armci_nproc)
#endif
1864
1865 #if defined(PEND_BUFS)
1866
/** Stand-in for MessageRcvBuffer when pending buffers are enabled.
 * There is no message receive buffer in this configuration, so the call
 * reports a fatal error through armci_die(); it should not be used.
 * @param proc process id passed on to armci_die()
 * @return NULL
 */
char *armci_openib_get_msg_rcv_buf(int proc)
{
    char *no_buffer = NULL;

    armci_die("PEND_BUFS in OPENIB: MessageRcvBuffer not available. Should use the in-place buffers to receive data", proc);
    return no_buffer;
}
1875
1876 /** Check that the data is in a server allocated buffer. This is
1877 * guaranteed to be pinned. Ideally, this should always be true. Any
1878 * operation that request alternative support will have to fix this
1879 * function and possibly @armci_openib_get_msg_rcv_buf().
1880 * @param br IN Buffer pointer being checked
1881 * @return 1 if it is a server-allocated buffer. 0 otherwise.
1882 */
armci_data_in_serv_buf(void * br)1883 int armci_data_in_serv_buf(void *br)
1884 {
1885 if(br>=(void *)serv_malloc_buf_base && br<(void *)serv_tail)
1886 return 1;
1887 if(DEBUG_SERVER) {
1888 printf("%d:: serv_bufs=%p<->%p. br=%p out of range\n",
1889 armci_me, serv_malloc_buf_base, serv_tail, br);
1890 fflush(stdout);
1891 }
1892 return 0;
1893 }
1894
/* Work-request ids for pending buffers. Each pending buffer owns two
 * consecutive ids starting at DSCRID_PENDBUF: the even offset is used for
 * its PUT request and the odd offset for its GET request. */
/* pending-buffer index -> id of its PUT work request */
#define PBUF_BUFID_TO_PUT_WRID(_pbufid) (DSCRID_PENDBUF+(_pbufid)*2)
/* pending-buffer index -> id of its GET work request */
#define PBUF_BUFID_TO_GET_WRID(_pbufid) (DSCRID_PENDBUF+(_pbufid)*2+1)
/* work-request id (PUT or GET) -> pending-buffer index */
#define PBUF_WRID_TO_PBUFID(_id) (((_id)-DSCRID_PENDBUF)/2)
/* non-zero when the id is a GET id (odd offset) */
#define PBUF_IS_GET_WRID(_id) (((_id)-DSCRID_PENDBUF)&1)
/* non-zero when the id is a PUT id (even offset) */
#define PBUF_IS_PUT_WRID(_id) (!(((_id)-DSCRID_PENDBUF)&1))
1900
/**Complete processing this immediate buffer. The parameter is a void *
 * since vapibuf_t*|immbuf_t* is not available in armci-vapi.h.
 *
 * In the active configuration (SRI_CORRECT not set) the buffer is re-posted
 * as a receive for the sending client first, then the request it holds is
 * handed to armci_data_server(), and an ack is sent for PUT and accumulate
 * operations. The SRI_CORRECT variant is guarded by #error and cannot be
 * built as is.
 *
 * @param buf the immediate buffer (a vapibuf_t *) holding the request
 */
void armci_complete_immbuf(void *buf) {
    vapibuf_t *cbuf = (vapibuf_t*)buf;
    /* the request header sits at the start of the buffer's data area */
    request_header_t *msginfo=(request_header_t*)cbuf->buf;

#if SRI_CORRECT
#error
    cbuf->send_pending = 0;
#else
    /* re-post before the request is processed */
    _armci_pendbuf_post_immbuf(cbuf,msginfo->from);
#endif
    armci_data_server(cbuf);
    /* msginfo is read again only after armci_data_server() has returned */
    if(msginfo->operation==PUT || ARMCI_ACC(msginfo->operation)) {
        SERVER_SEND_ACK(msginfo->from);
    }
#if SRI_CORRECT
    if(!cbuf->send_pending) {
        _armci_pendbuf_post_immbuf(cbuf,msginfo->from);
    }
#endif
}
1924
/**Complete processing this pending buffer. The parameter is a void *
 * since vapibuf_t*|immbuf_t* is not available in armci-vapi.h. Note
 * that the pending buffer may not yet be available for reuse. This
 * will depend on the state of the pending buffer (which might have to
 * wait for a communication initiated by armci_data_server() to
 * complete).
 *
 * In the active configuration (SRI_CORRECT not set) the immediate buffer
 * associated with the pending buffer (pbuf->vbuf) is re-posted as a receive
 * for the sending client first, then the request is handed to
 * armci_data_server(), and an ack is sent for PUT and accumulate
 * operations. The SRI_CORRECT variant is guarded by #error and cannot be
 * built as is.
 *
 * @param buf the pending buffer (a vapibuf_pend_t *) holding the request
 */
void armci_complete_pendbuf(void *buf) {
    vapibuf_pend_t *pbuf = (vapibuf_pend_t *)buf;
    /* the request header sits at the start of the buffer's data area */
    request_header_t *msginfo=(request_header_t*)pbuf->buf;

    /* a pending buffer must have an immediate buffer attached */
    assert(pbuf->vbuf);
#if SRI_CORRECT
    pbuf->cbuf->send_pending=0;
#else
    /* re-post the immediate buffer before the request is processed */
    _armci_pendbuf_post_immbuf(pbuf->vbuf,msginfo->from);
#endif
    armci_data_server(pbuf);
    /* msginfo is read again only after armci_data_server() has returned */
    if(msginfo->operation==PUT || ARMCI_ACC(msginfo->operation)) {
        SERVER_SEND_ACK(msginfo->from);
    }
#if SRI_CORRECT
#error
    assert(!pbuf->cbuf->send_pending);
    _armci_pendbuf_post_immbuf(pbuf->cbuf,msginfo->from);
#endif
}
1952
/* Forward declarations of the two transfer helpers that
 * armci_pbuf_start_get() and armci_pbuf_start_put() call before the
 * helpers are defined. */
void _armci_get_data_from_client(int proc, struct ibv_send_wr *sdscr,
                                 int dscrid, struct ibv_sge *ssg_entry,
                                 void *rbuf, void *lbuf, int bytes) ;
void _armci_send_data_to_client_pbuf(int proc, struct ibv_send_wr *sdscr,
                                     int dscrid, struct ibv_sge *ssg_entry,
                                     void *rbuf, void *lbuf, int bytes);
1959
no_srv_copy_nsegs_ulimit()1960 int no_srv_copy_nsegs_ulimit() {
1961 return armci_max_qp_ous_swr*armci_max_num_sg_ent/10;
1962 }
1963
1964 /** Initiate a get operation to progress a pending buffer.
1965 * @param msginfo Request header for any additional processing
1966 * @param src Pointer to src of data (remote for GET)
1967 * @param dst Pointer to dst
1968 * @param bytes #bytes to transfer
1969 * @param proc proc to transfer from(for get)/to(for put)
1970 * @param pbufid Index of pending buffer
1971 */
armci_pbuf_start_get(void * msg_info,void * src,void * dst,int bytes,int proc,int pbufid)1972 void armci_pbuf_start_get(void *msg_info, void *src, void *dst,
1973 int bytes, int proc, int pbufid) {
1974 struct ibv_send_wr sdscr;
1975 struct ibv_sge sg_entry;
1976 int wrid = PBUF_BUFID_TO_GET_WRID(pbufid);
1977 request_header_t *msginfo=(request_header_t *)msg_info;
1978 void armci_server_rdma_contig_to_strided(char *src_ptr, int proc,
1979 char *dst_ptr,
1980 int dst_stride_arr[],
1981 int seg_count[],
1982 int stride_levels,
1983 request_header_t *msginfo);
1984
1985
1986 #if defined(PUT_NO_SRV_COPY)
1987 if(msginfo->operation==PUT && msginfo->format==STRIDED
1988 && !msginfo->pinned && src==msginfo->tag.data_ptr) {
1989 char *loc_ptr, *rem_ptr;
1990 int stride_levels, *count;
1991 int *loc_stride_arr;
1992 char *dscr = (char *)(msginfo+1);
1993 ARMCI_MEMHDL_T *mhloc=NULL;
1994 int nsegs, i;
1995
1996 /* unpack descriptor record */
1997 loc_ptr = *(void**)dscr; dscr += sizeof(void*);
1998 stride_levels = *(int*)dscr; dscr += sizeof(int);
1999 loc_stride_arr = (int*)dscr; dscr += stride_levels*sizeof(int);
2000 count = (int*)dscr;
2001
2002 rem_ptr = msginfo->tag.data_ptr;
2003
2004 nsegs = 1;
2005 for(i=0; i<stride_levels; i++)
2006 nsegs *= count[i+1];
2007
2008 dassert(1,proc==msginfo->from);
2009 if(nsegs<no_srv_copy_nsegs_ulimit() &&
2010 get_armci_region_local_hndl(loc_ptr,armci_clus_id(armci_me),&mhloc)) {
2011 /* printf("%d(s): direct rdma from client buffers to server-side memory\n",armci_me); */
2012 /* fflush(stdout); */
2013
2014 armci_server_rdma_contig_to_strided(rem_ptr, proc,
2015 loc_ptr,loc_stride_arr,
2016 count, stride_levels,
2017 msginfo);
2018 return;
2019 }
2020 }
2021 #endif
2022 /* printf("%d(s): rdma from client buffers to pending buffers\n",armci_me); */
2023 /* fflush(stdout); */
2024 _armci_get_data_from_client(proc,&sdscr,wrid,&sg_entry,src,dst,bytes);
2025 }
2026
2027 /** Initiate a put operation to progress a pending buffer.
2028 * @param src Pointer to src of data (local for PUT)
2029 * @param dst Pointer to dst
2030 * @param bytes #bytes to transfer
2031 * @param proc proc to transfer from(for get)/to(for put)
2032 * @param pbufid Index of pending buffer
2033 */
armci_pbuf_start_put(void * src,void * dst,int bytes,int proc,int pbufid)2034 void armci_pbuf_start_put(void *src, void *dst, int bytes, int proc,
2035 int pbufid) {
2036 struct ibv_send_wr sdscr;
2037 struct ibv_sge sg_entry;
2038 int wrid = PBUF_BUFID_TO_PUT_WRID(pbufid);
2039
2040 _armci_send_data_to_client_pbuf(proc,&sdscr,wrid,&sg_entry,src,dst,bytes);
2041 }
2042
2043 /**
2044 * function to get data from remote client called by data
2045 * server. Note that this is only called for pending buffers.
2046 * @param proc IN the id of remote client
2047 * @param sdscr IN/OUT Descriptor to be used to post the get
2048 * @param dscrid IN ID to be used for the descriptor
2049 * @param ssg_entry IN Scatter/gather list
2050 * @param rbuf IN the remote buffer to get from
2051 * @param lbuf IN local buf to get the data into, this is the queue buffer for SERVER_QUEUE path
2052 * @param bytes IN the size of get
2053 * @see SERVER_QUEUE
2054 * @see armci_send_data_to_client
2055 */
_armci_get_data_from_client(int proc,struct ibv_send_wr * sdscr,int dscrid,struct ibv_sge * ssg_entry,void * rbuf,void * lbuf,int bytes)2056 /*static*/ void _armci_get_data_from_client(int proc, struct ibv_send_wr *sdscr,
2057 int dscrid, struct ibv_sge *ssg_entry,
2058 void *rbuf, void *lbuf, int bytes)
2059 {
2060 int rc = 0;
2061
2062 if(DEBUG_SERVER){
2063 printf("\n%d(s):sending data to client %d at %p flag = %p bytes=%d\n",
2064 armci_me,
2065 proc,lbuf,(char *)lbuf+bytes-sizeof(int),bytes);fflush(stdout);
2066 }
2067
2068 memset(sdscr,0,sizeof(struct ibv_send_wr));
2069 armci_init_cbuf_rrdma(sdscr,ssg_entry,lbuf,rbuf,bytes,
2070 &serv_memhandle,(handle_array+proc));
2071
2072 if(DEBUG_SERVER){
2073 printf("\n%d(s):handle_array[%d]=%p lbuf=%p flag=%p bytes=%d\n",armci_me,
2074 proc,(void*)&handle_array[proc],(char *)lbuf,
2075 (char *)lbuf+bytes-sizeof(int),bytes);
2076 fflush(stdout);
2077 }
2078
2079 assert(sizeof(request_header_t)+bytes<PENDING_BUF_LEN);
2080
2081 sdscr->wr_id = dscrid;
2082 struct ibv_send_wr *bad_wr;
2083 rc = ibv_post_send((CLN_con+proc)->qp, sdscr, &bad_wr);
2084 dassert1(1,rc==0,rc);
2085 }
2086
_armci_send_data_to_client_pbuf(int proc,struct ibv_send_wr * sdscr,int dscrid,struct ibv_sge * ssg_entry,void * rbuf,void * lbuf,int bytes)2087 void _armci_send_data_to_client_pbuf(int proc, struct ibv_send_wr *sdscr,
2088 int dscrid, struct ibv_sge *ssg_entry,
2089 void *rbuf, void *lbuf, int bytes) {
2090 int rc = 0;
2091
2092 if(DEBUG_SERVER) {
2093 printf("\n%d(s):sending data to client %d at %p flag = %p bytes=%d\n",
2094 armci_me,
2095 proc,rbuf,(char *)rbuf+bytes-sizeof(int),bytes);fflush(stdout);
2096 }
2097 memset(sdscr,0,sizeof(struct ibv_send_wr));
2098 armci_init_cbuf_srdma(sdscr,ssg_entry,lbuf,rbuf,bytes,
2099 &serv_memhandle,(handle_array+proc));
2100 if(DEBUG_SERVER){
2101 printf("\n%d(s):handle_array[%d]=%p dbuf=%p flag=%p bytes=%d\n",armci_me,
2102 proc,(void*)&handle_array[proc],(char *)rbuf,
2103 (char *)rbuf+bytes-sizeof(int),bytes);
2104 fflush(stdout);
2105 }
2106 sdscr->wr_id = dscrid;
2107 struct ibv_send_wr *bad_wr;
2108 rc = ibv_post_send((CLN_con+proc)->qp, sdscr, &bad_wr);
2109 dassert1(1,rc==0,rc);
2110 }
2111 #endif
2112
/* NOTE(review): armci_call_data_server() below guards its periodic usleep()
 * with DATA_SERVER_YIELD_CPU_ (trailing underscore), so defining this name
 * does not enable that code as written. Confirm whether the mismatch is a
 * deliberate way of keeping the yield disabled. */
2113 #define DATA_SERVER_YIELD_CPU
armci_call_data_server()2114 void armci_call_data_server()
2115 {
2116 int rc = 0;
2117 int rc1 = 0;
2118 vapibuf_t *cbuf,*cbufs;
2119 request_header_t *msginfo,*msg;
2120 int c,i;
2121 static int mytag=1;
2122 #ifdef CHANGE_SERVER_AFFINITY
2123 int rrr,serverwcount=0;
2124 #else
2125 #ifdef DATA_SERVER_YIELD_CPU_
2126 int serverwcount=0;
2127 #endif
2128 #endif
2129
2130 #ifdef CHANGE_SERVER_AFFINITY
2131 cpu_set_t mycpuid,new_mask;
2132 char str[CPU_SETSIZE];
2133 char cid[8];
2134 extern char * cpuset_to_cstr(cpu_set_t *mask, char *str);
2135 int nslave=armci_clus_info[armci_clus_me].nslave;
2136 rrr=sched_getaffinity(0, sizeof(mycpuid), &mycpuid);
2137 #endif
2138
2139 #if ARMCI_ENABLE_GPC_CALLS
2140 unblock_thread_signal(GPC_COMPLETION_SIGNAL);
2141 #endif
2142 #if defined(PEND_BUFS)
2143 armci_pendbuf_init();
2144 #endif
2145
2146 for (;;) {
2147 struct ibv_wc *pdscr=NULL;
2148 struct ibv_wc pdscr1;
2149 pdscr = &pdscr1;
2150 pdscr->status = IBV_WC_SUCCESS;
2151 rc = 0;
2152 #ifdef CHANGE_SERVER_AFFINITY
2153 static int ccc;
2154 serverwcount++;
2155 if(serverwcount==100){
2156 serverwcount=0;
2157 ccc=(ccc+1)%nslave;
2158 sprintf (cid, "%d", ccc);
2159 rrr = cstr_to_cpuset(&new_mask,cid);
2160 if (sched_setaffinity(0, sizeof (new_mask), &new_mask)) {
2161 perror("sched_setaffinity");
2162 printf("failed to set pid %d's affinity.\n", getpid());
2163 }
2164 rrr=sched_getaffinity(0, sizeof(mycpuid), &mycpuid);
2165 if(rrr)perror("sched_getaffinity");
2166 }
2167 #else
2168 #ifdef DATA_SERVER_YIELD_CPU_
2169 serverwcount++;
2170 if(serverwcount==50){
2171 serverwcount=0;usleep(1);
2172 }
2173 #endif
2174 #endif
2175
2176 #if ARMCI_ENABLE_GPC_CALLS
2177 block_thread_signal(GPC_COMPLETION_SIGNAL);
2178 #endif
2179 bzero(pdscr, sizeof(*pdscr));
2180 do {
2181 rc = ibv_poll_cq(CLN_nic->rcq, 1, pdscr);
2182 if (armci_server_terminating) {
2183 /* server is interrupted when clients terminate connections */
2184 armci_server_transport_cleanup();
2185 sleep(1);
2186 _exit(0);
2187 }
2188 if (rc == 0 && !armci_openib_server_poll) {
2189 /* wait for a notify event */
2190 rc1 = ibv_get_cq_event(CLN_nic->rch,&CLN_nic->rcq,&CLN_nic->rcq_cntx);
2191 dassert1(1,rc1==0,rc1);
2192 ibv_ack_cq_events(CLN_nic->rcq, 1);
2193 /* re-arm notify event */
2194 rc1 = ibv_req_notify_cq(CLN_nic->rcq, 0);
2195 dassert1(1,rc1==0,rc1);
2196 /* note: an event receive does not guarantee an actual completion */
2197 continue;
2198 }
2199 } while (rc == 0);
2200
2201 if(DEBUG_SERVER) {
2202 printf("\n%d:pdscr=%p %p %d %d %d %d\n",armci_me,(void*)pdscr,(void*)&pdscr1,
2203 pdscr->status,pdscr->opcode,pdscr->vendor_err,
2204 pdscr->src_qp);
2205 fflush(stdout);
2206 }
2207 dassertp(1,rc>=0,("%d: rc=%d id=%d status=%d",
2208 armci_me,rc,(int)pdscr->wr_id,pdscr->status));
2209 dassert1(1,pdscr->status==IBV_WC_SUCCESS,pdscr->status);
2210
2211 if (DEBUG_SERVER) {
2212 printf("%d(s) : NEW MESSAGE bytelen %d \n",armci_me,pdscr->byte_len);
2213 printf("%d(s) : NEW MESSAGE id is %ld \n",armci_me,pdscr->wr_id);
2214 fflush(stdout);
2215 }
2216 #if defined(PEND_BUFS)
2217 if(pdscr->wr_id>=DSCRID_IMMBUF_RESP && pdscr->wr_id<DSCRID_IMMBUF_RESP_END) {
2218 /* fprintf(stderr, "%d(s) : Got server response msg completion\n", armci_me); */
2219 #if SRI_CORRECT
2220 int id = pdscr->wr_id - DSCRID_IMMBUF_RESP;
2221 if(id>=0 && id<armci_nproc*(IMM_BUF_NUM+1)) {
2222 int dest = id/(IMM_BUF_NUM+1);
2223 dassert(1,serv_buf_arr[id]->send_pending==1);
2224 serv_buf_arr[id]->send_pending = 0;
2225 _armci_pendbuf_post_immbuf(serv_buf_arr[id],dest);
2226 }
2227 #endif
2228 continue;
2229 }
2230 if (pdscr->wr_id>=DSCRID_PENDBUF && pdscr->wr_id<DSCRID_PENDBUF_END) {
2231 int pbufid = PBUF_WRID_TO_PBUFID(pdscr->wr_id);
2232 /* printf("%d(s) : Progressing pending msg (something completed) pbufid=%d id=%ld byte_len=%d status=%d\n", armci_me, pbufid,pdscr->wr_id,pdscr->byte_len,done_status); */
2233 /* fflush(stdout); */
2234 if(PBUF_IS_GET_WRID(pdscr->wr_id))
2235 armci_pendbuf_done_get(pbufid);
2236 else if(PBUF_IS_PUT_WRID(pdscr->wr_id))
2237 armci_pendbuf_done_put(pbufid);
2238 else
2239 armci_die("Pending buffer op completed. But not PUT or GET!",pdscr->wr_id);
2240 continue;
2241 }
2242 #endif
2243 if (pdscr->wr_id >= DSCRID_SCATGAT && pdscr->wr_id < DSCRID_SCATGAT_END) {
2244 #if defined(PEND_BUFS)
2245 sr_descr_t *sdscr_arr;
2246 #else
2247 sr_descr_t *rdscr_arr;
2248 #endif
2249 if (DEBUG_SERVER) {
2250 printf("%d(s) : received SCATGAT DATA id = %ld, length = %d\n",
2251 armci_me,pdscr->wr_id, pdscr->byte_len);
2252 fflush(stdout);
2253 }
2254 #if defined(PEND_BUFS)
2255 sdscr_arr = armci_vapi_serv_nbsdscr_array;
2256 assert(sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends>0);
2257 sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends--;
2258 if(sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends==0)
2259 sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].tag=0;
2260 #else
2261 rdscr_arr = armci_vapi_serv_nbrdscr_array;
2262 rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofrecvs--;
2263 if(rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofrecvs==0)
2264 rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].tag=0;
2265 #endif
2266 continue;
2267 }
2268
2269 #if defined(PEND_BUFS)
2270 assert(pdscr->wr_id>=DSCRID_IMMBUF_RECV && pdscr->wr_id<DSCRID_IMMBUF_RECV_END);
2271 #endif
2272 cbuf = serv_buf_arr[DSCRID_TO_IMMBUFID(pdscr->wr_id)];
2273 assert(cbuf->dscr.wr_id == pdscr->wr_id);
2274
2275 msginfo = (request_header_t*)cbuf->buf;
2276 armci_ack_proc = c = msginfo->from;
2277
2278 if (DEBUG_SERVER) {
2279 printf("%d(s) : request id is %ld operation is %d, length is %d from=%d cbuf->dscr.wr_id=%d\n",
2280 armci_me,pdscr->wr_id,msginfo->operation,pdscr->byte_len,msginfo->from, (int)cbuf->dscr.wr_id);
2281 fflush(stdout);
2282 }
2283
2284 #if defined(PEND_BUFS)
2285 cbufs = cbuf;
2286 armci_init_vapibuf_recv(&cbufs->dscr, &cbufs->sg_entry,cbufs->buf,
2287 IMM_BUF_LEN, &serv_memhandle);
2288 cbufs->dscr.wr_id = pdscr->wr_id;
2289 #else
2290 cbufs = serv_buf_arr[pdscr->wr_id - armci_nproc] = spare_serv_buf;
2291 armci_init_vapibuf_recv(&cbufs->dscr, &cbufs->sg_entry,cbufs->buf,
2292 CBUF_DLEN, &serv_memhandle);
2293 cbufs->dscr.wr_id = c + armci_nproc;
2294
2295 spare_serv_buf = cbuf;
2296 #endif
2297
2298 if(DEBUG_SERVER) {
2299 printf("%d(s):Came out of poll id=%ld\n",armci_me,pdscr->wr_id);
2300 fflush(stdout);
2301 }
2302
2303 if(msginfo->operation == PUT &&msginfo->pinned == 1){
2304 int found, num;
2305 int stride_arr[MAX_STRIDE_LEVEL]; /*should be MAX_STRIDE_LEVELS*/
2306 int count[MAX_STRIDE_LEVEL];
2307 void *dest_ptr;
2308 int stride_levels;
2309 ARMCI_MEMHDL_T *loc_memhandle;
2310 void armci_post_scatter(void *,int *,int *,int, armci_vapi_memhndl_t *,int,int,int,sr_descr_t **);
2311
2312 /*unpack decsriptor_record : should call a function instead */
2313 msg = msginfo + 1;
2314 test_ptr = dest_ptr = *(void**)msg;
2315 msg = (request_header_t *) ((char*)msg + sizeof(void*));
2316 test_stride_levels=stride_levels = *(int*)msg;
2317 msg = (request_header_t *) ((char*)msg + sizeof(int));
2318 for(i =0; i<stride_levels; i++){
2319 test_stride_arr[i] = stride_arr[i] = *(int*)msg;
2320 msg = (request_header_t*) ((int*)msg + 1);
2321 }
2322 for(i=0; i<stride_levels+1; i++){
2323 test_count[i] = count[i] = *(int*)msg;
2324 msg = (request_header_t*) ((int*)msg + 1);
2325 }
2326
2327 if (DEBUG_SERVER) {
2328 printf(" server:the dest_ptr is %p\n", dest_ptr);
2329 for(i =0; i<stride_levels; i++)
2330 printf("stride_arr[i] is %d,value of count[i] is %d\n",
2331 stride_arr[i], count[i]);
2332 printf("the value of stride_levels is %d\n", stride_levels);
2333 fflush(stdout);
2334 }
2335
2336 found =get_armci_region_local_hndl(dest_ptr,armci_me, &loc_memhandle);
2337 dassertp(1,found!=0,("%d:SERVER : local region not found id=%d",
2338 armci_me,pdscr->wr_id));
2339
2340 if(DEBUG_SERVER) {
2341 printf("%d(s) : about to call armci_post_scatter\n",armci_me);
2342 fflush(stdout);
2343 }
2344
2345 armci_post_scatter(dest_ptr, stride_arr, count, stride_levels,
2346 loc_memhandle,msginfo->from, mytag, SERV,NULL );
2347
2348 mytag = (mytag+1)%(MAX_PENDING);
2349 if(mytag==0)mytag=1;
2350
2351 if(DEBUG_SERVER) {
2352 printf("%d(s) : finished posting %d scatter\n",armci_me,num);
2353 fflush(stdout);
2354 }
2355 _armci_pendbuf_post_immbuf(cbufs, msginfo->from);
2356 SERVER_SEND_ACK(msginfo->from);
2357 }
2358 else if(msginfo->operation == REGISTER){
2359 if (DEBUG_SERVER) {
2360 printf("%d(s) : Register_op id is %d, comp_dscr_id is %ld\n",
2361 armci_me,msginfo->operation,pdscr->wr_id);
2362 fflush(stdout);
2363 }
2364
2365 armci_server_register_region(*((void **)(msginfo+1)),
2366 *((long *)((char *)(msginfo+1)+sizeof(void *))),
2367 (ARMCI_MEMHDL_T *)(msginfo->tag.data_ptr));
2368 _armci_pendbuf_post_immbuf(cbufs, msginfo->from);
2369 *(long *)(msginfo->tag.ack_ptr) = ARMCI_STAMP;
2370 continue;
2371 }
2372 else {
2373 if(DEBUG_SERVER) {
2374 printf("%d(s) : request is %ld about to call armci_data_server\n",
2375 armci_me, pdscr->wr_id);
2376 fflush(stdout);
2377 }
2378 #if defined(PEND_BUFS)
2379 armci_pendbuf_service_req(cbuf);
2380 #else
2381 _armci_pendbuf_post_immbuf(cbufs, msginfo->from);
2382 armci_data_server(cbuf);
2383
2384 if((msginfo->operation == PUT) || ARMCI_ACC(msginfo->operation)) {
2385 /* for operations that do not send data back we can send ACK now */
2386 SERVER_SEND_ACK(msginfo->from);
2387 if(DEBUG_SERVER){
2388 printf("%d(s) : posted ack\n\n",armci_me);
2389 fflush(stdout);
2390 }
2391 }
2392 #endif
2393 }
2394 if (0) {
2395 printf("%d(s):Done processed request\n\n",armci_me);
2396 fflush(stdout);
2397 }
2398
2399 #if ARMCI_ENABLE_GPC_CALLS
2400 unblock_thread_signal(GPC_COMPLETION_SIGNAL);
2401 #endif
2402 }/* end of for */
2403 }
2404
2405
/** Block until the operation associated with a request buffer has completed.
 *
 * Nothing is done for a GET whose buffer info carries a non-zero tag.
 * Otherwise, if snd is set and the buffer's send is not yet marked complete,
 * the send completion is waited for. If rcv is set, the function then spins:
 *   - PUT/accumulate: until the header's ack flag equals ARMCI_STAMP (the
 *     wait is skipped for a bypass, pinned, strided PUT); the flag is then
 *     reset to 0;
 *   - other operations: while the last int of the data area still holds
 *     ARMCI_STAMP and the ack flag is not ARMCI_STAMP.
 *
 * @param field buffer field; a BUF_INFO_T sits immediately before it and the
 *              request header immediately after it
 * @param snd   non-zero to wait for the send
 * @param rcv   non-zero to wait for the reply/ack
 * @param to    not used by this function
 * @param op    operation code of the request
 */
void armci_vapi_complete_buf(armci_vapi_field_t *field,int snd,int rcv,int to,int op) {
  BUF_INFO_T *info = (BUF_INFO_T *)((char *)field - sizeof(BUF_INFO_T));
  request_header_t *msginfo;
  long *flag;

  if (info->tag && op == GET) {
    return;
  }

  if (snd) {
    struct ibv_send_wr *snd_dscr = &(field->sdscr);
    if (mark_buf_send_complete[snd_dscr->wr_id] == 0) {
      armci_send_complete(snd_dscr, "armci_vapi_complete_buf", 1);
    }
  }

  if (!rcv) {
    return;
  }

  msginfo = (request_header_t *)(field + 1);
  flag = (long *)&msginfo->tag.ack;

  if (op == PUT || ARMCI_ACC(op)) {
    const int no_ack_wait = msginfo->bypass && msginfo->pinned &&
                            msginfo->format == STRIDED && op == PUT;
    if (!no_ack_wait) {
      while (armci_util_long_getval(flag) != ARMCI_STAMP) {
        /* spin until the ack arrives */
      }
    }
    *flag = 0L;
    return;
  }

  /*SK: I think we get here only for GET with result directly
    going to client's pinned memory. (info.tag==0 && op==GET)*/
  {
    int *last = (int *)((char *)msginfo + msginfo->datalen - sizeof(int));
    int spins = 0;

    while (armci_util_int_getval(last) == ARMCI_STAMP &&
           armci_util_long_getval(flag) != ARMCI_STAMP) {
      spins = (spins + 1) % 100000;
      if (spins == 0 && DEBUG_CLN) {
        printf("%d: client last(%p)=%d flag(%p)=%ld off=%d\n",
               armci_me,(void*)last,*last,(void*)flag,*flag,msginfo->datalen);
        fflush(stdout);
      }
    }
  }
}
2462
/** Non-blocking counterpart of armci_vapi_complete_buf().
 *
 * *retval is set to 0 first and changed to 1 only when the rcv-side check
 * finds the operation complete:
 *   - PUT/accumulate: bypass, pinned, strided PUT, or ack flag == ARMCI_STAMP;
 *   - other operations: the last int of the data area no longer holds
 *     ARMCI_STAMP, or the ack flag == ARMCI_STAMP.
 * It stays 0 for a GET whose buffer info has a non-zero tag, when snd is set
 * and the send is not yet marked complete, and when rcv is 0.
 *
 * @param field  buffer field; a BUF_INFO_T sits immediately before it and the
 *               request header immediately after it
 * @param snd    non-zero to require the send to be complete
 * @param rcv    non-zero to test the reply/ack
 * @param to     not used by this function
 * @param op     operation code of the request
 * @param retval OUT 1 if found complete, 0 otherwise
 */
void armci_vapi_test_buf(armci_vapi_field_t *field,int snd,int rcv,int to,int op, int *retval) {
  BUF_INFO_T *info = (BUF_INFO_T *)((char *)field - sizeof(BUF_INFO_T));
  request_header_t *msginfo;
  long *flag;
  int *last;

  *retval = 0;

  if (info->tag && op == GET) {
    return;
  }

  /* send still in flight: report "not complete" */
  if (snd && mark_buf_send_complete[field->sdscr.wr_id] == 0) {
    return;
  }

  if (!rcv) {
    return;
  }

  msginfo = (request_header_t *)(field + 1);
  flag = (long *)&msginfo->tag.ack;

  if (op == PUT || ARMCI_ACC(op)) {
    if ((msginfo->bypass && msginfo->pinned && msginfo->format == STRIDED &&
         op == PUT)
        || armci_util_long_getval(flag) == ARMCI_STAMP) {
      *retval = 1;
    }
    return;
  }

  /*SK: I think we get here only for GET with result directly
    going to client's pinned memory. (info.tag==0 && op==GET)*/
  last = (int *)((char *)msginfo + msginfo->datalen - sizeof(int));
  if (armci_util_int_getval(last) != ARMCI_STAMP ||
      armci_util_long_getval(flag) == ARMCI_STAMP) {
    *retval = 1;
  }
}
2511
2512
armci_vapi_post_send(int isclient,int con_offset,struct ibv_send_wr * snd_dscr,char * from)2513 static inline void armci_vapi_post_send(int isclient,int con_offset,
2514 struct ibv_send_wr *snd_dscr,char *from)
2515 {
2516 int rc = 0;
2517 /*vapi_nic_t *nic;*/
2518 armci_connect_t *con;
2519 int total = 0;
2520
2521 if(!isclient){
2522 /*nic = CLN_nic;*/
2523 con = CLN_con+con_offset;
2524 }
2525 else{
2526 /*nic = SRV_nic;*/
2527 con = SRV_con+con_offset;
2528 }
2529
2530 if(DEBUG_CLN){
2531 printf("vapi_post_send: snd_dscr->num_sge=%d, snd_dscr->sg_list->length=%d\n",
2532 snd_dscr->num_sge, snd_dscr->sg_list->length);
2533 fflush(stdout);
2534 }
2535
2536
2537 /* find the total length of all the segments */
2538 total = snd_dscr->sg_list->length * snd_dscr->num_sge;
2539 if(DEBUG_CLN){
2540 printf("%d(c) : total is %d\t, max_size is %d\n",armci_me,total,
2541 armci_vapi_max_inline_size);
2542 }
2543
2544 struct ibv_send_wr *bad_wr;
2545 if (total > armci_vapi_max_inline_size) {
2546 rc = ibv_post_send(con->qp, snd_dscr, &bad_wr);
2547 } else {
2548 rc = ibv_post_send(con->qp, snd_dscr, &bad_wr);
2549 /* no corresponding call, using ibv_post_send
2550 rc = EVAPI_post_inline_sr(nic->handle,con->qp,snd_dscr);*/
2551 }
2552 dassert1(1,rc==0,rc);
2553 }
2554
2555 /** Send a request message to the data server of the node that hosts proc.
 *
 * Holds armci_user_threads.net_lock from entry until just after the post.
 * Stamps the reply area where a non-pinned GET or an ACK needs it, decides
 * (under PEND_BUFS) whether the request goes out as an immediate message,
 * fills in the header's tag fields, initialises the send descriptor obtained
 * from BUF_TO_SDESCR(buf) and posts it on the server connection selected by
 * the cluster id of proc.
 *
 * @param proc  destination process; armci_clus_id(proc) selects the connection
 * @param buf   request buffer that begins with a request_header_t
 * @param bytes number of bytes to send; under PEND_BUFS it is reduced for
 *              requests that are not sent as immediate messages
 * @return always 0
 */
armci_send_req_msg(int proc,void * buf,int bytes)2557 int armci_send_req_msg(int proc, void *buf, int bytes)
2558 {
2559 int cluster = armci_clus_id(proc);
2560 request_header_t *msginfo = (request_header_t *)buf;
2561 struct ibv_send_wr *snd_dscr;
2562 struct ibv_sge *ssg_lst;
2563
2564 THREAD_LOCK(armci_user_threads.net_lock);
2565
2566 check_state_of_ib_connection(proc, 0);
2567
2568 snd_dscr = BUF_TO_SDESCR((char *)buf);
2569 ssg_lst = BUF_TO_SSGLST((char *)buf);
2570
2571 /*Stamp end of buffers as needed*/
2572 if(msginfo->operation == GET && !msginfo->pinned) {
2573 const int dscrlen = msginfo->dscrlen;
2574 const int datalen = msginfo->datalen;
2575 int *last;
/* NOTE(review): sizeof yields an unsigned type, so (datalen - sizeof(int)) is
 * evaluated unsigned and wraps to a very large value when datalen is smaller
 * than sizeof(int); confirm callers never send such a GET. */
2576 if(dscrlen < (datalen - sizeof(int)))
2577 last = (int*)(((char*)(msginfo+1))+(datalen-sizeof(int)));
2578 else
2579 last = (int*)(((char*)(msginfo+1))+(dscrlen+datalen-sizeof(int)));
2580 *last = ARMCI_STAMP;
2581 #ifdef GET_STRIDED_COPY_PIPELINED
/* additionally stamp one int every GET_STRIDED_COPY_PIPELINED_SIZE bytes
 * between the end of the descriptor and `last` */
2582 if(msginfo->format == STRIDED) {
2583 const int ssize = GET_STRIDED_COPY_PIPELINED_SIZE/sizeof(int);
2584 int *sfirst = (int*)(dscrlen+(char*)(msginfo+1))+ssize; /*stamping
2585 can start here*/
2586 int *slast = last, *ptr;
2587 for(ptr=sfirst; ptr<slast; ptr+=ssize) {
2588 *ptr = ARMCI_STAMP;
2589 }
2590 }
2591 #endif
2592 }
/* for an ACK request the first two ints after the header are set to
 * ARMCI_STAMP+1 */
2593 if(msginfo->operation == ACK) {
2594 *(int *)(msginfo +1) = ARMCI_STAMP+1;
2595 *(((int *)(msginfo +1))+1) = ARMCI_STAMP+1;
2596 }
2597
2598
2599 #if defined(PEND_BUFS)
/* Decide whether the request is an immediate message (tag.imm_msg=1). A PUT or
 * accumulate larger than IMM_BUF_LEN is sent without its data; a GET whose
 * header+descriptor+data does not fit below IMM_BUF_LEN is sent as header and
 * descriptor only. In both cases bytes is capped at IMM_BUF_LEN. */
2600 if((msginfo->operation==PUT || ARMCI_ACC(msginfo->operation))
2601 && bytes > IMM_BUF_LEN) {
2602 msginfo->tag.imm_msg=0;
2603 assert(sizeof(request_header_t)<IMM_BUF_LEN); /*sanity check*/
2604 bytes = ARMCI_MIN(bytes-msginfo->datalen, IMM_BUF_LEN);
2605 assert(bytes==IMM_BUF_LEN||(bytes==sizeof(*msginfo)+msginfo->dscrlen));
2606 }
2607 else if(msginfo->operation==GET
2608 && !(msginfo->datalen+sizeof(request_header_t)+msginfo->dscrlen<IMM_BUF_LEN)) {
2609 assert(sizeof(request_header_t) < IMM_BUF_LEN);
2610 msginfo->tag.imm_msg=0;
2611 bytes = ARMCI_MIN(sizeof(request_header_t)+msginfo->dscrlen, IMM_BUF_LEN);
2612 }
2613 #if defined(PUT_NO_SRV_COPY) && 0 /*SK:disabled. Imm msgs are sent inline
2614 for latency reasons*/
2615 else if(msginfo->operation==PUT && !msginfo->pinned && msginfo->format==STRIDED && msginfo->tag.data_len>=2048) {
2616 msginfo->tag.imm_msg = 0;
2617 assert(sizeof(request_header_t)<IMM_BUF_LEN); /*sanity check*/
2618 bytes = ARMCI_MIN(bytes-msginfo->datalen, IMM_BUF_LEN);
2619 assert(bytes==IMM_BUF_LEN||(bytes==sizeof(*msginfo)+msginfo->dscrlen));
2620 }
2621 #endif
2622 else{
2623 msginfo->tag.imm_msg=1;
2624 }
2625 /* printf("%d: send_req: op=%d bytes=%d data_len=%d imm=%d\n",*/
2626 /* armci_me, msginfo->operation, bytes, msginfo->datalen,msginfo->tag.imm_msg);*/
2627 /* fflush(stdout);*/
/* NOTE(review): on an out-of-range size the process reports it and then blocks
 * in pause() before the asserts run -- presumably so a debugger can be
 * attached; net_lock is still held at that point. */
2628 if(bytes<0 || bytes>IMM_BUF_LEN) {
2629 printf("%d(pid=%d): Trying to send too large a mesg. op=%d bytes=%d(max=%d) to=%d\n", armci_me, getpid(),msginfo->operation,bytes,IMM_BUF_LEN, proc);
2630 fflush(stdout);
2631 pause();
2632 assert(bytes>=0);
2633 assert(bytes <= IMM_BUF_LEN);
2634 }
2635 _armci_buf_ensure_pend_outstanding_op_per_node(buf,cluster);
2636 /* printf("%d: send_req. ensured pend os per node. to=%d op=%d\n", armci_me, msginfo->to,msginfo->operation); */
2637 /* fflush(stdout); */
2638 #else
2639 _armci_buf_ensure_one_outstanding_op_per_node(buf,cluster);
2640 #endif
2641
/* Fill in tag.data_ptr: for PUT/accumulate it points at the data following the
 * descriptor (non-immediate, PEND_BUFS), is NULL (immediate, PEND_BUFS) or
 * points at the ack field (no PEND_BUFS); for other operations it is the
 * location chosen below. */
2642 if(msginfo->operation == PUT || ARMCI_ACC(msginfo->operation)){
2643 #if defined(PEND_BUFS)
2644 if(!msginfo->tag.imm_msg){
2645 msginfo->tag.data_ptr = (char *)(msginfo+1)+msginfo->dscrlen;
2646 msginfo->tag.data_len = msginfo->datalen;
2647 }
2648 else
2649 msginfo->tag.data_ptr = NULL;
2650 #else
2651 {
2652 msginfo->tag.data_ptr = (void *)&msginfo->tag.ack;
2653 }
2654 #endif
2655 }
2656 else {
/* NOTE(review): same unsigned (datalen - sizeof(int)) arithmetic as in the
 * stamping code above. */
2657 if(msginfo->operation == GET && !msginfo->bypass && msginfo->dscrlen
2658 >= (msginfo->datalen-sizeof(int)))
2659 msginfo->tag.data_ptr = (char *)(msginfo+1)+msginfo->dscrlen;
2660 else
2661 msginfo->tag.data_ptr = GET_DATA_PTR(buf);
2662 }
2663
2664 /*this has to be reset so that we can wait on it
2665 see ReadFromDirect*/
2666 msginfo->tag.ack = 0;
2667 msginfo->tag.ack_ptr = &(msginfo->tag.ack);
2668
2669 if(DEBUG_CLN){
2670 printf("%d:the ack_ptr is initialised to %p, ack->value is %ld\n",
2671 armci_me,msginfo->tag.ack_ptr,msginfo->tag.ack);fflush(stdout);
2672 }
2673
/* build the send descriptor for `bytes` bytes of buf and post it on the
 * server connection for this cluster */
2674 armci_init_vapibuf_send(snd_dscr, ssg_lst,buf,
2675 bytes, &client_memhandle);
2676
2677 /* printf("%d: Sending req wr_id=%d to=%d\n",armci_me,snd_dscr->wr_id,proc);*/
2678 /* fflush(stdout);*/
2679 armci_vapi_post_send(1,cluster,snd_dscr,"send_req_msg:post_send");
2680
2681 THREAD_UNLOCK(armci_user_threads.net_lock);
2682
2683 if(DEBUG_CLN){
2684 printf("%d:client sent REQ=%d %d bytes serv=%d qp=%p id =%ld lkey=%d\n",
2685 armci_me,msginfo->operation,bytes,cluster,
2686 (void*)(SRV_con+cluster)->qp,snd_dscr->wr_id,ssg_lst->lkey);
2687 fflush(stdout);
2688 }
2689 return(0);
2690 }
2691
2692
2693 /*\
2694 * client waits for first phase ack before posting gather desr
2695 \*/
armci_wait_ack(char * buffer)2696 void armci_wait_ack(char *buffer)
2697 {
2698 long *flag;
2699 request_header_t *msginfo = (request_header_t *)(buffer);
2700 flag = (long*)&msginfo->tag.ack;
2701
2702 while(armci_util_long_getval(flag) != ARMCI_STAMP);
2703 flag = 0;
2704 }
2705
2706
2707
2708
/** Client-side direct send of a contiguous buffer to the server of process p.
 *
 * Takes a send descriptor from armci_vapi_get_next_sdescr(), initialises it
 * with armci_init_cbuf_srdma() and posts it on the server connection for the
 * cluster of p. For a blocking call (nbtag == 0) the send completion is
 * waited for before returning.
 *
 * @param p          destination process
 * @param src_buf    local source buffer
 * @param dst_buf    remote destination buffer
 * @param len        number of bytes
 * @param contextptr OUT receives the descriptor when nbtag != 0
 * @param nbtag      non-blocking tag (0 means blocking)
 * @param lochdl     local memory handle
 * @param remhdl     remote memory handle
 */
void armci_client_direct_send(int p,void *src_buf, void *dst_buf, int len,void** contextptr,int nbtag,ARMCI_MEMHDL_T *lochdl,ARMCI_MEMHDL_T *remhdl)
{
  const int server = armci_clus_id(p);
  sr_descr_t *dscr;

  check_state_of_ib_connection(p, 0);

  THREAD_LOCK(armci_user_threads.net_lock);

  /* the descriptor returned here already carries its id */
  dscr = armci_vapi_get_next_sdescr(nbtag,0);
  if (nbtag != 0) {
    *contextptr = dscr;
  }

  armci_init_cbuf_srdma(&dscr->sdescr,dscr->sg_entry,src_buf,dst_buf,
                        len,lochdl,remhdl);
  armci_vapi_post_send(1, server, &(dscr->sdescr),
                       "client_direct_send:post_send");

  /* Drop and re-take the lock so other threads waiting on it get a chance
     to post; not required for correctness. */
  THREAD_UNLOCK(armci_user_threads.net_lock);
  THREAD_LOCK(armci_user_threads.net_lock);

  if (nbtag == 0) {
    armci_send_complete(&(dscr->sdescr),"armci_client_direct_send",1);
  }

  THREAD_UNLOCK(armci_user_threads.net_lock);
}
2740
2741 /*\ RDMA get
2742 \*/
armci_client_direct_get(int p,void * src_buf,void * dst_buf,int len,void ** cptr,int nbtag,ARMCI_MEMHDL_T * lochdl,ARMCI_MEMHDL_T * remhdl)2743 void armci_client_direct_get(int p, void *src_buf, void *dst_buf, int len,
2744 void** cptr,int nbtag,ARMCI_MEMHDL_T *lochdl,
2745 ARMCI_MEMHDL_T *remhdl)
2746 {
2747 int rc = 0;
2748 sr_descr_t *dirdscr;
2749 int clus = armci_clus_id(p);
2750 check_state_of_ib_connection(p, 0);
2751 struct ibv_send_wr *bad_wr;
2752
2753 THREAD_LOCK(armci_user_threads.net_lock);
2754
2755 /*ID for the desr that comes from get_next_descr is already set*/
2756 dirdscr = armci_vapi_get_next_sdescr(nbtag,0);
2757 if(nbtag)*cptr = dirdscr;
2758
2759 if(DEBUG_CLN){
2760 printf("\n%d: in direct get lkey=%d rkey=%d\n",armci_me,lochdl->lkey,
2761 remhdl->rkey);fflush(stdout);
2762 }
2763
2764 armci_init_cbuf_rrdma(&dirdscr->sdescr,dirdscr->sg_entry,dst_buf,src_buf,
2765 len,lochdl,remhdl);
2766 rc = ibv_post_send((SRV_con+clus)->qp, &(dirdscr->sdescr), &bad_wr);
2767 dassert1(1,rc==0,rc);
2768
2769 /* unlock/lock to ensure fairness: allows others thread post before
2770 waiting for completion */
2771 /*VT?check to see if this should be UNLOCK followed by lock*/
2772 #if 1
2773 THREAD_UNLOCK(armci_user_threads.net_lock);
2774 THREAD_LOCK(armci_user_threads.net_lock);
2775 #endif
2776
2777 if(!nbtag){
2778 armci_send_complete(&(dirdscr->sdescr),"armci_client_direct_get",1);
2779 }
2780
2781 THREAD_UNLOCK(armci_user_threads.net_lock);
2782 }
2783
/* Dimensions of the on-stack work-request tables declared in
 * armci_client_direct_rdma_strided(): WQE_LIST_COUNT lists, each holding up
 * to WQE_LIST_LENGTH send work requests chained through their `next` field. */
2784 #define WQE_LIST_LENGTH 32
2785 #define WQE_LIST_COUNT 1
2786
2787 /** Direct put into remote processor memory. Assumes that (and invoked
2788 * only when) the source buffers in user memory are pinned as well.
2789 * @param operation PUT/GET
2790 * @param src_ptr Source pointer for data
2791 * @param src_stride_arr Strides on the source array
2792 * @param dst_ptr Destination pointer to start writing to
2793 * @param seg_count[stride_levels+1] Number of elements in each stride
2794 * level. seg_count[0] is the number of contiguous bytes
2795 * @param proc Destination process
2796 * @param cptr OUT Pointer to store the descriptor to wait on for completion
2797 * @param nbtag IN Non-blocking tag (non-blocking op if nbtag!=0)
2798 * @param lochdl IN Local memory handle/key (registered memory stuff)
2799 * @param remhdl IN Remote memory handle/key
2800 *
2801 */
2802 #if 0
/* Disabled (#if 0) variant of armci_client_direct_rdma_strided(); the
 * definition in the #else branch is the one that is compiled.
 *
 * This variant walks the strided region with an explicit index vector
 * (idx[]), fills up to WQE_LIST_LENGTH chained work requests per post and
 * signals only the last request of each chain.
 *
 * NOTE(review): busy[] is cleared at start-up and inside the wait branch but
 * is never set to a non-zero value in this body, so the `if(busy[clst])`
 * branch cannot run as written; wait_count[] is only written after a post. */
2803 void armci_client_direct_rdma_strided(int operation, int proc,
2804 char *src_ptr, int src_stride_arr[],
2805 char *dst_ptr, int dst_stride_arr[],
2806 int seg_count[],
2807 int stride_levels,
2808 void **cptr, int nbtag,
2809 ARMCI_MEMHDL_T *lochdl,
2810 ARMCI_MEMHDL_T *remhdl) {
2811
2812 int rc;
2813 sr_descr_t *dirdscr;
2814 const int clus = armci_clus_id(proc);
2815 struct ibv_send_wr *bad_wr;
2816 struct ibv_send_wr sdscr[WQE_LIST_COUNT][WQE_LIST_LENGTH];
2817 struct ibv_sge sg_entry[WQE_LIST_COUNT][WQE_LIST_LENGTH];
2818 int busy[WQE_LIST_COUNT], wait_count[WQE_LIST_COUNT],clst;
2819 int i, j, c, numposts;
2820 int idx[MAX_STRIDE_LEVEL];
2821
2822 THREAD_LOCK(armci_user_threads.net_lock);
2823
2824 assert(stride_levels >= 0);
2825 assert(stride_levels<=MAX_STRIDE_LEVEL);
2826 /*ID for the desr that comes from get_next_descr is already set*/
2827 dirdscr = armci_vapi_get_next_sdescr(nbtag,0);
2828 if(nbtag)*cptr = dirdscr;
2829 assert(dirdscr->tag == nbtag);
2830
2831 if(DEBUG_CLN) {
2832 printf("\n%d: in direct rdma strided id=%d lkey=%ld rkey=%ld\n",
2833 armci_me,dirdscr->sdescr.wr_id,lochdl->lkey,remhdl->rkey);fflush(stdout);
2834 }
2835
2836 for(c=0; c<WQE_LIST_COUNT; c++) {
2837 busy[c]=0;
2838 }
2839 /*initialize fixed values for descriptors*/
2840 bzero(sdscr, WQE_LIST_COUNT*WQE_LIST_LENGTH*sizeof(struct ibv_send_wr));
2841 bzero(sg_entry, WQE_LIST_COUNT*WQE_LIST_LENGTH*sizeof(struct ibv_sge));
2842 for(j=0; j<WQE_LIST_COUNT; j++) {
2843 for(i=0; i<WQE_LIST_LENGTH; i++) {
2844 if(operation == PUT)
2845 armci_init_cbuf_srdma(&sdscr[j][i],&sg_entry[j][i],NULL,NULL,seg_count[0],lochdl,remhdl);
2846 else if(operation == GET)
2847 armci_init_cbuf_rrdma(&sdscr[j][i],&sg_entry[j][i],NULL,NULL,seg_count[0],lochdl,remhdl);
2848 else
2849 armci_die("rdma_strided: unsupported operation",operation);
2850 sdscr[j][i].wr_id = dirdscr->sdescr.wr_id;
2851 sdscr[j][i].send_flags = 0; /*non-signalled*/
2852 if(i<WQE_LIST_LENGTH-1)
2853 sdscr[j][i].next = &sdscr[j][i+1];
2854 }
2855 }
2856 /*post requests in a loop*/
/* numposts = product of seg_count[1..stride_levels], i.e. one work request
 * per contiguous segment */
2857 numposts=1;
2858 for(i=1; i<=stride_levels; i++) {
2859 numposts *= seg_count[i];
2860 }
2861 /* printf("%d: client rdma op=%d numposts=%d\n",armci_me,operation,numposts); */
2862
2863 dirdscr->numofsends=0;
2864 bzero(idx, stride_levels*sizeof(int));
/* the first chain carries the remainder (numposts % WQE_LIST_LENGTH) so that
 * every later chain is exactly WQE_LIST_LENGTH long */
2865 int count = (numposts%WQE_LIST_LENGTH) ? (numposts%WQE_LIST_LENGTH):WQE_LIST_LENGTH;
2866 assert(count == ARMCI_MIN(count, numposts));
2867 clst=0;
2868 for(i=0; i<numposts; ) {
2869 for(j=i; j<i+count; j++) {
2870 int src_offset=0, dst_offset=0;
2871 for(c=0; c<stride_levels; c++) {
2872 src_offset += idx[c]*src_stride_arr[c];
2873 dst_offset += idx[c]*dst_stride_arr[c];
2874 }
2875
2876 /* armci_client_direct_send(proc,src_ptr+src_offset, */
2877 /* dst_ptr+dst_offset, seg_count[0], */
2878 /* NULL,0,lochdl,remhdl); */
2879 if(busy[clst]) {
2880 assert(wait_count[clst]>0);
2881 armci_send_complete(&dirdscr->sdescr,"client_direct_rdma_strided",wait_count[clst]);
2882 dirdscr->numofsends -= wait_count[clst];
2883 busy[clst]=0;
2884 wait_count[clst]=0;
2885 }
2886
2887 if(operation == PUT) {
2888 sg_entry[clst][j-i].addr = (uint64_t)(src_ptr + src_offset);
2889 sdscr[clst][j-i].wr.rdma.remote_addr = (uint64_t)(dst_ptr + dst_offset);
2890 }
2891 else if (operation == GET) {
2892 sg_entry[clst][j-i].addr = (uint64_t)(dst_ptr + dst_offset);
2893 sdscr[clst][j-i].wr.rdma.remote_addr = (uint64_t)(src_ptr + src_offset);
2894 }
2895 assert(sg_entry[clst][j-i].length == seg_count[0]);
2896
/* advance the index vector to the next segment, carrying into higher levels */
2897 idx[0] += 1;
2898 for(c=0;c<stride_levels-1 && idx[c]==seg_count[c+1]; c++) {
2899 idx[c]=0; idx[c+1]++;
2900 }
2901 }
2902 sdscr[clst][count-1].next=NULL;
2903 sdscr[clst][count-1].send_flags=IBV_SEND_SIGNALED; /*only the last one*/
2904 for(c=0; c<count-1; c++) {
2905 assert(sdscr[clst][c].next == &sdscr[clst][c+1]);
2906 }
2907 rc = ibv_post_send(SRV_con[clus].qp, sdscr[clst], &bad_wr);
2908 dassert1(1,rc==0,rc);
2909 dirdscr->numofsends += 1;
2910 wait_count[clst] = 1;
2911 /* armci_send_complete(&dirdscr->sdescr,"armci_client_direct_rdma_strided",count); */
2912
2913 if(count < WQE_LIST_LENGTH) {
2914 sdscr[clst][count-1].next=&sdscr[clst][count]; /*reset it*/
2915 }
2916 sdscr[clst][count-1].send_flags=0; /*reset it*/
2917 i += count;
2918 count = ARMCI_MIN(WQE_LIST_LENGTH,numposts-i);
2919 assert(count==0 || count==WQE_LIST_LENGTH);
2920 clst = (clst+1)%WQE_LIST_COUNT;
2921 }
2922
/* blocking call: wait for every signalled chain, then release the descriptor */
2923 if(!nbtag) {
2924 armci_send_complete(&dirdscr->sdescr,"armci_client_direct_get",dirdscr->numofsends);
2925 dirdscr->numofsends = 0;
2926 dirdscr->tag = 0;
2927 }
2928 THREAD_UNLOCK(armci_user_threads.net_lock);
2929 }
2930 #else
/**
 * Client-side strided PUT/GET issued directly as RDMA work requests, one
 * work request per contiguous segment of seg_count[0] bytes.
 *
 * Work requests are grouped into WQE_LIST_COUNT reusable lists of up to
 * WQE_LIST_LENGTH entries each. A list is posted when it is full or when the
 * stride iterator is exhausted; only the last entry of a posted list is
 * signaled. Before a list is reused, one completion is awaited if that list
 * is still marked busy.
 *
 * @param operation       PUT or GET; anything else calls armci_die
 * @param proc            remote process; its cluster id selects SRV_con[]
 * @param src_ptr         base of the source region
 * @param src_stride_arr  strides of the source region
 * @param dst_ptr         base of the destination region
 * @param dst_stride_arr  strides of the destination region
 * @param seg_count       segment counts; seg_count[0] is the byte length of
 *                        one contiguous segment
 * @param stride_levels   number of stride levels (0..MAX_STRIDE_LEVEL)
 * @param cptr            OUT, written only when nbtag is non-zero: the
 *                        descriptor tracking the outstanding sends
 * @param nbtag           non-blocking tag; 0 means wait for completion here
 * @param lochdl          memory handle passed to the descriptor initializer
 *                        as the local side
 * @param remhdl          memory handle passed to the descriptor initializer
 *                        as the remote side
 *
 * The whole routine runs under armci_user_threads.net_lock.
 */
void armci_client_direct_rdma_strided(int operation, int proc,
                                      char *src_ptr, int src_stride_arr[],
                                      char *dst_ptr, int dst_stride_arr[],
                                      int seg_count[],
                                      int stride_levels,
                                      void **cptr, int nbtag,
                                      ARMCI_MEMHDL_T *lochdl,
                                      ARMCI_MEMHDL_T *remhdl) {
  int rc, i, j, c, busy[WQE_LIST_COUNT], clst, ctr;
  sr_descr_t *dirdscr;
  const int clus = armci_clus_id(proc);
  struct ibv_send_wr *bad_wr;
  struct ibv_send_wr sdscr[WQE_LIST_COUNT][WQE_LIST_LENGTH];
  struct ibv_sge sg_entry[WQE_LIST_COUNT][WQE_LIST_LENGTH];
  stride_info_t sinfo, dinfo;

  THREAD_LOCK(armci_user_threads.net_lock);

  assert(stride_levels >= 0);
  assert(stride_levels<=MAX_STRIDE_LEVEL);
  /*ID for the desr that comes from get_next_descr is already set*/
  dirdscr = armci_vapi_get_next_sdescr(nbtag,0);
  if(nbtag)*cptr = dirdscr;
  assert(dirdscr->tag == nbtag);

  /* NOTE(review): wr_id is printed with %lu below; if wr_id is a 64-bit
     type this only matches where unsigned long is 64 bits wide - confirm */
  if(DEBUG_CLN) {
    printf("\n%d: in direct rdma strided id=%lu lkey=%u rkey=%u\n",
           armci_me,dirdscr->sdescr.wr_id,lochdl->lkey,remhdl->rkey);fflush(stdout);
  }

  /*initialize fixed values for descriptors*/
  bzero(sdscr, WQE_LIST_COUNT*WQE_LIST_LENGTH*sizeof(struct ibv_send_wr));
  bzero(sg_entry, WQE_LIST_COUNT*WQE_LIST_LENGTH*sizeof(struct ibv_sge));
  for(j=0; j<WQE_LIST_COUNT; j++) {
    for(i=0; i<WQE_LIST_LENGTH; i++) {
      if(operation == PUT)
        armci_init_cbuf_srdma(&sdscr[j][i],&sg_entry[j][i],NULL,NULL,seg_count[0],lochdl,remhdl);
      else if(operation == GET)
        armci_init_cbuf_rrdma(&sdscr[j][i],&sg_entry[j][i],NULL,NULL,seg_count[0],lochdl,remhdl);
      else
        armci_die("rdma_strided: unsupported operation",operation);
      /* every work request shares the id of the tracking descriptor */
      sdscr[j][i].wr_id = dirdscr->sdescr.wr_id;
      sdscr[j][i].send_flags = 0; /*non-signalled*/
      /* pre-link each list; the last entry keeps next == NULL from bzero */
      if(i<WQE_LIST_LENGTH-1)
        sdscr[j][i].next = &sdscr[j][i+1];
    }
  }

  /*post requests in a loop*/
  armci_stride_info_init(&sinfo,src_ptr,stride_levels,src_stride_arr,seg_count);
  armci_stride_info_init(&dinfo,dst_ptr,stride_levels,dst_stride_arr,seg_count);
  assert(armci_stride_info_size(&sinfo)==armci_stride_info_size(&dinfo));

  dirdscr->numofsends=0;
  clst=ctr=0; /* clst: list being filled, ctr: next free slot in that list */
  bzero(busy, sizeof(int)*WQE_LIST_COUNT);
  while(armci_stride_info_has_more(&sinfo)) {
    assert(armci_stride_info_has_more(&dinfo));
    uint64_t saddr = (uint64_t)armci_stride_info_seg_ptr(&sinfo);
    uint64_t daddr = (uint64_t)armci_stride_info_seg_ptr(&dinfo);
    /* PUT: local buffer is the source segment, remote is the destination;
       GET: the roles are swapped */
    if(operation == PUT) {
      sg_entry[clst][ctr].addr = saddr;
      sdscr[clst][ctr].wr.rdma.remote_addr = daddr;
    }
    else if (operation == GET) {
      sg_entry[clst][ctr].addr = daddr;
      sdscr[clst][ctr].wr.rdma.remote_addr = saddr;
    }
    assert(sg_entry[clst][ctr].length == seg_count[0]);

    ctr+=1;
    armci_stride_info_next(&sinfo);
    armci_stride_info_next(&dinfo);
    /* flush when the list is full or this was the final segment */
    if(ctr == WQE_LIST_LENGTH || !armci_stride_info_has_more(&sinfo)) {
      /* terminate the chain at the last filled entry and signal only it */
      sdscr[clst][ctr-1].next=NULL;
      sdscr[clst][ctr-1].send_flags=IBV_SEND_SIGNALED; /*only the last one*/
      for(c=0; c<ctr-1; c++) {
        assert(sdscr[clst][c].next == &sdscr[clst][c+1]);
      }

      check_state_of_ib_connection(armci_clus_info[clus].master, 0);
      rc = ibv_post_send(SRV_con[clus].qp, sdscr[clst], &bad_wr);
      dassert1(1,rc==0,rc);
      busy[clst] = 1;
      dirdscr->numofsends += 1; /* one signaled completion per posted list */
      /* undo the termination so the list can be refilled later */
      if(ctr<WQE_LIST_LENGTH)
        sdscr[clst][ctr-1].next = &sdscr[clst][ctr];
      sdscr[clst][ctr-1].send_flags = 0;

      ctr=0;
      clst = (clst+1)%WQE_LIST_COUNT;
      /* the list about to be reused was posted earlier: wait for one
         completion before overwriting its entries */
      if(busy[clst]) {
        armci_send_complete(&dirdscr->sdescr,"client_direct_rdma_strided",1);
        dirdscr->numofsends -= 1;
        busy[clst]=0;
      }
    }
  }
  armci_stride_info_destroy(&sinfo);
  armci_stride_info_destroy(&dinfo);

  /* blocking call: wait for all remaining completions and release the tag */
  if(!nbtag) {
    armci_send_complete(&dirdscr->sdescr,"armci_client_direct_get",dirdscr->numofsends);
    dirdscr->numofsends = 0;
    dirdscr->tag = 0;
  }
  THREAD_UNLOCK(armci_user_threads.net_lock);
}
3039 #endif
3040
3041 #if defined(PEND_BUFS)
/**
 * Find the server pending buffer that holds the given request header.
 *
 * @param msginfo  request header; must not be an immediate message (asserted)
 * @return index into serv_pendbuf_arr whose buf equals msginfo, or -1 when
 *         none of the PENDING_BUF_NUM slots matches
 */
int armci_server_msginfo_to_pbuf_index(request_header_t *msginfo) {
  int slot = 0;

  assert(!msginfo->tag.imm_msg);
  while(slot < PENDING_BUF_NUM) {
    if(serv_pendbuf_arr[slot].buf == (char *)msginfo)
      return slot;
    slot++;
  }
  return -1;
}
3054
3055
3056 /** Routine for server to RDMA strided data to the client-side buffers
3057 * (allocated through buffers.c). This is to be used instead of
3058 * copying the data to immediate or pending buffers when possible.
3059 */
3060 #if 0
/* Disabled variant (inside #if 0): posts one RDMA work request per strided
 * segment, chained in WQE_LIST_COUNT lists of WQE_LIST_LENGTH entries, with
 * only the very last work request of the whole transfer signaled.
 *
 * NOTE(review): this code passes stride_info_t by value to
 * armci_stride_info_init/has_more/seg_ptr/next, whereas the enabled code in
 * this file passes a pointer - presumably stale; it would need updating
 * before being re-enabled.
 */
void armci_server_rdma_strided_to_contig(char *src_ptr, int src_stride_arr[],
                                         int seg_count[],
                                         int stride_levels,
                                         char *dst_ptr, int proc,
                                         request_header_t *msginfo) {
  int rc, i, j, c, busy[WQE_LIST_COUNT], clst, ctr, wr_id;
  sr_descr_t *dirdscr;
  struct ibv_send_wr *bad_wr, sdscr1;
  struct ibv_send_wr sdscr[WQE_LIST_COUNT][WQE_LIST_LENGTH];
  struct ibv_sge sg_entry[WQE_LIST_COUNT][WQE_LIST_LENGTH];
  stride_info_t sinfo;
  uint64_t daddr;
  ARMCI_MEMHDL_T *loc_memhdl;
  ARMCI_MEMHDL_T *rem_memhdl = &handle_array[proc];

  THREAD_LOCK(armci_user_threads.net_lock);

  assert(msginfo->operation == GET);
  assert(stride_levels >= 0);
  assert(stride_levels<=MAX_STRIDE_LEVEL);

  if(!get_armci_region_local_hndl(src_ptr,armci_clus_id(armci_me), &loc_memhdl)) {
    armci_die("rdma_strided_to_contig: failed to get local handle\n",0);
  }

  /* work request id: derived from the pending-buffer slot, or from proc
     for immediate messages */
  if(!msginfo->tag.imm_msg) {
    int index = armci_server_msginfo_to_pbuf_index(msginfo);
    assert(index>=0);
    wr_id = PBUF_BUFID_TO_PUT_WRID(index);
  }
  else {
    wr_id = DSCRID_IMMBUF_RESP_END-1-proc;
  }
  /* sdscr1 is only referenced by the nested, disabled wait calls below */
  bzero(&sdscr1, sizeof(sdscr1));
  sdscr1.wr_id = wr_id;

  if(DEBUG_CLN) {
    printf("\n%d: in rdma strided to contig id=%d lkey=%ld rkey=%ld\n",
           armci_me,wr_id,loc_memhdl->lkey,rem_memhdl->rkey);
    fflush(stdout);
  }

  /*initialize fixed values for descriptors*/
  bzero(sdscr, WQE_LIST_COUNT*WQE_LIST_LENGTH*sizeof(struct ibv_send_wr));
  bzero(sg_entry, WQE_LIST_COUNT*WQE_LIST_LENGTH*sizeof(struct ibv_sge));
  for(j=0; j<WQE_LIST_COUNT; j++) {
    for(i=0; i<WQE_LIST_LENGTH; i++) {
      armci_init_cbuf_srdma(&sdscr[j][i],&sg_entry[j][i],NULL,NULL,seg_count[0],loc_memhdl,rem_memhdl);
      sdscr[j][i].wr_id = wr_id;
      sdscr[j][i].send_flags = 0; /*non-signalled*/
/*       sdscr[j][i].send_flags = IBV_SEND_SIGNALED; /\*signalled*\/ */
      if(i<WQE_LIST_LENGTH-1)
        sdscr[j][i].next = &sdscr[j][i+1];
    }
  }

  /*post requests in a loop*/
  sinfo = armci_stride_info_init(src_ptr,stride_levels,src_stride_arr,seg_count);

  clst=ctr=0;
  bzero(busy, sizeof(int)*WQE_LIST_COUNT);
  daddr = (uint64_t)dst_ptr;
  while(armci_stride_info_has_more(sinfo)) {
    uint64_t saddr = (uint64_t)armci_stride_info_seg_ptr(sinfo);
    sg_entry[clst][ctr].addr = saddr;
    sdscr[clst][ctr].wr.rdma.remote_addr = daddr;
    assert(sg_entry[clst][ctr].length == seg_count[0]);

    ctr+=1;
    daddr += seg_count[0]; /* destination is contiguous: advance by one segment */
    armci_stride_info_next(sinfo);
    if(ctr == WQE_LIST_LENGTH || !armci_stride_info_has_more(sinfo)) {
      sdscr[clst][ctr-1].next=NULL;
      if(!armci_stride_info_has_more(sinfo)) {
        sdscr[clst][ctr-1].send_flags=IBV_SEND_SIGNALED; /*only the last one*/
      }
      for(c=0; c<ctr-1; c++) {
        assert(sdscr[clst][c].next == &sdscr[clst][c+1]);
      }
      rc = ibv_post_send(CLN_con[proc].qp, sdscr[clst], &bad_wr);
      dassert1(1,rc==0,rc);
      busy[clst] = 1;
#if 0
      armci_send_complete(&sdscr1,"serv_rdma_to_contig",ctr);
      busy[clst] = 0;
#endif
      /* restore the chain link and flag so the list can be reused */
      if(ctr<WQE_LIST_LENGTH)
        sdscr[clst][ctr-1].next = &sdscr[clst][ctr];
      sdscr[clst][ctr-1].send_flags = 0;

      ctr=0;
      clst = (clst+1)%WQE_LIST_COUNT;
#if 0
      if(busy[clst]) {
        armci_send_complete(&sdscr1,"client_direct_rdma_strided",1);
        busy[clst]=0;
      }
#endif
    }
  }
  armci_stride_info_destroy(&sinfo);
  assert(proc == msginfo->from);
  THREAD_UNLOCK(armci_user_threads.net_lock);
}
3165 #else
3166
3167 #define MAX_NUM_SGE 64
3168
3169 /*same as above, but uses gather rdma writes*/
armci_server_rdma_strided_to_contig(char * src_ptr,int src_stride_arr[],int seg_count[],int stride_levels,char * dst_ptr,int proc,request_header_t * msginfo)3170 void armci_server_rdma_strided_to_contig(char *src_ptr, int src_stride_arr[],
3171 int seg_count[],
3172 int stride_levels,
3173 char *dst_ptr, int proc,
3174 request_header_t *msginfo) {
3175 int rc, ctr, wr_id, bytes;
3176 struct ibv_send_wr *bad_wr, sdscr1, sdscr;
3177 struct ibv_sge sg_entry[MAX_NUM_SGE];
3178 stride_info_t sinfo;
3179 uint64_t daddr;
3180 ARMCI_MEMHDL_T *loc_memhdl;
3181 ARMCI_MEMHDL_T *rem_memhdl = &handle_array[proc];
3182 const int max_num_sge = ARMCI_MIN(MAX_NUM_SGE, armci_max_num_sg_ent);
3183 int numposts=0, numsegs=0;
3184
3185 THREAD_LOCK(armci_user_threads.net_lock);
3186
3187 assert(msginfo->operation == GET);
3188 assert(stride_levels >= 0);
3189 assert(stride_levels<=MAX_STRIDE_LEVEL);
3190
3191 if(!get_armci_region_local_hndl(src_ptr,armci_clus_id(armci_me), &loc_memhdl)) {
3192 armci_die("rdma_strided_to_contig: failed to get local handle\n",0);
3193 }
3194
3195 if(!msginfo->tag.imm_msg) {
3196 int index = armci_server_msginfo_to_pbuf_index(msginfo);
3197 assert(index>=0);
3198 wr_id = PBUF_BUFID_TO_PUT_WRID(index);
3199 }
3200 else {
3201 wr_id = DSCRID_IMMBUF_RESP_END-1-proc;
3202 }
3203 bzero(&sdscr1, sizeof(sdscr1));
3204 sdscr1.wr_id = wr_id;
3205
3206 if(DEBUG_CLN) {
3207 printf("\n%d: in rdma strided to contig id=%d lkey=%u rkey=%u\n",
3208 armci_me,wr_id,loc_memhdl->lkey,rem_memhdl->rkey);
3209 fflush(stdout);
3210 }
3211
3212 /*initialize fixed values for descriptors*/
3213 bzero(&sdscr, sizeof(sdscr));
3214 bzero(sg_entry, max_num_sge*sizeof(struct ibv_sge));
3215 armci_init_cbuf_srdma(&sdscr,&sg_entry[0],NULL,NULL,seg_count[0],loc_memhdl,rem_memhdl);
3216 sdscr.send_flags = 0; /*non-signalled*/
3217 sdscr.num_sge = 0; /*set below in the loop*/
3218 sdscr.wr_id = wr_id;
3219
3220 for(ctr=0; ctr<max_num_sge; ctr++) {
3221 sg_entry[ctr].length = seg_count[0];
3222 sg_entry[ctr].lkey = loc_memhdl->lkey;
3223 }
3224
3225 /*post requests in a loop*/
3226 armci_stride_info_init(&sinfo,src_ptr,stride_levels,src_stride_arr,seg_count);
3227
3228 numposts = numsegs = 0;
3229 ctr=0;
3230 daddr = (uint64_t)dst_ptr;
3231 bytes=0;
3232 while(armci_stride_info_has_more(&sinfo)) {
3233 sg_entry[ctr].addr = (uint64_t)armci_stride_info_seg_ptr(&sinfo);
3234 assert(sg_entry[ctr].length == seg_count[0]);
3235
3236 sdscr.num_sge += 1;
3237 bytes += seg_count[0];
3238 ctr+=1;
3239 numsegs += 1;
3240 armci_stride_info_next(&sinfo);
3241 if(ctr == max_num_sge || !armci_stride_info_has_more(&sinfo)) {
3242 sdscr.wr.rdma.remote_addr = daddr;
3243 if(!armci_stride_info_has_more(&sinfo)) {
3244 sdscr.send_flags=IBV_SEND_SIGNALED; /*only the last one*/
3245 }
3246 else {
3247 assert(sdscr.send_flags == 0);
3248 }
3249 assert(ctr == sdscr.num_sge);
3250 rc = ibv_post_send(CLN_con[proc].qp, &sdscr, &bad_wr);
3251 dassert1(1,rc==0,rc);
3252
3253 numposts += 1;
3254 ctr=0;
3255 sdscr.num_sge = 0;
3256 daddr += bytes;
3257 bytes = 0;
3258 }
3259 }
3260 /* printf("%d(s): scatgat write numposts=%d numsegs=%d\n",armci_me,numposts,numsegs); */
3261 armci_stride_info_destroy(&sinfo);
3262 assert(proc == msginfo->from);
3263 THREAD_UNLOCK(armci_user_threads.net_lock);
3264 }
3265
3266 /*Directly read data from client buffers into remote memory. Data is
3267 contiguous in client-side. */
armci_server_rdma_contig_to_strided(char * src_ptr,int proc,char * dst_ptr,int dst_stride_arr[],int seg_count[],int stride_levels,request_header_t * msginfo)3268 void armci_server_rdma_contig_to_strided(char *src_ptr, int proc,
3269 char *dst_ptr,
3270 int dst_stride_arr[],
3271 int seg_count[],
3272 int stride_levels,
3273 request_header_t *msginfo) {
3274 int rc, ctr, wr_id, bytes;
3275 struct ibv_send_wr *bad_wr, sdscr1, sdscr;
3276 struct ibv_sge sg_entry[MAX_NUM_SGE];
3277 stride_info_t dinfo;
3278 uint64_t saddr;
3279 ARMCI_MEMHDL_T *loc_memhdl;
3280 ARMCI_MEMHDL_T *rem_memhdl = &handle_array[proc];
3281 const int max_num_sge = ARMCI_MIN(MAX_NUM_SGE, armci_max_num_sg_ent);
3282 int numposts=0, numsegs=0;
3283
3284 THREAD_LOCK(armci_user_threads.net_lock);
3285
3286 assert(msginfo->operation == PUT);
3287 assert(stride_levels >= 0);
3288 assert(stride_levels<=MAX_STRIDE_LEVEL);
3289
3290 if(!get_armci_region_local_hndl(dst_ptr,armci_clus_id(armci_me), &loc_memhdl)) {
3291 armci_die("rdma_strided_to_contig: failed to get local handle\n",0);
3292 }
3293
3294 if(!msginfo->tag.imm_msg) {
3295 int index = armci_server_msginfo_to_pbuf_index(msginfo);
3296 assert(index>=0);
3297 wr_id = PBUF_BUFID_TO_GET_WRID(index);
3298 }
3299 else {
3300 wr_id = DSCRID_IMMBUF_RESP_END-1-proc;
3301 }
3302 bzero(&sdscr1, sizeof(sdscr1));
3303 sdscr1.wr_id = wr_id;
3304
3305 if(DEBUG_CLN) {
3306 printf("\n%d: in rdma strided to contig id=%d lkey=%u rkey=%u\n",
3307 armci_me,wr_id,loc_memhdl->lkey,rem_memhdl->rkey);
3308 fflush(stdout);
3309 }
3310
3311 /*initialize fixed values for descriptors*/
3312 bzero(&sdscr, sizeof(sdscr));
3313 bzero(sg_entry, max_num_sge*sizeof(struct ibv_sge));
3314 armci_init_cbuf_rrdma(&sdscr,&sg_entry[0],NULL,NULL,seg_count[0],loc_memhdl,rem_memhdl);
3315 sdscr.send_flags = 0; /*non-signalled*/
3316 sdscr.num_sge = 0; /*set below in the loop*/
3317 sdscr.wr_id = wr_id;
3318
3319 for(ctr=0; ctr<max_num_sge; ctr++) {
3320 sg_entry[ctr].length = seg_count[0];
3321 sg_entry[ctr].lkey = loc_memhdl->lkey;
3322 }
3323
3324 /*post requests in a loop*/
3325 armci_stride_info_init(&dinfo,dst_ptr,stride_levels,dst_stride_arr,seg_count);
3326
3327 numposts = numsegs = 0;
3328 ctr=0;
3329 saddr = (uint64_t)src_ptr;
3330 bytes=0;
3331 while(armci_stride_info_has_more(&dinfo)) {
3332 sg_entry[ctr].addr = (uint64_t)armci_stride_info_seg_ptr(&dinfo);
3333 assert(sg_entry[ctr].length == seg_count[0]);
3334
3335 sdscr.num_sge += 1;
3336 bytes += seg_count[0];
3337 ctr+=1;
3338 numsegs += 1;
3339 armci_stride_info_next(&dinfo);
3340 if(ctr == max_num_sge || !armci_stride_info_has_more(&dinfo)) {
3341 sdscr.wr.rdma.remote_addr = saddr;
3342 if(!armci_stride_info_has_more(&dinfo)) {
3343 sdscr.send_flags=IBV_SEND_SIGNALED; /*only the last one*/
3344 }
3345 else {
3346 assert(sdscr.send_flags == 0);
3347 }
3348 assert(ctr == sdscr.num_sge);
3349 rc = ibv_post_send(CLN_con[proc].qp, &sdscr, &bad_wr);
3350 dassert1(1,rc==0,rc);
3351
3352 numposts += 1;
3353 ctr=0;
3354 sdscr.num_sge = 0;
3355 saddr += bytes;
3356 bytes = 0;
3357 }
3358 }
3359 /* printf("%d(s): scatgat write numposts=%d numsegs=%d\n",armci_me,numposts,numsegs); */
3360 armci_stride_info_destroy(&dinfo);
3361 assert(proc == msginfo->from);
3362 THREAD_UNLOCK(armci_user_threads.net_lock);
3363 }
3364
3365 #endif
3366 #endif
3367
/**
 * Client side: wait for the response to an earlier request to arrive in the
 * request buffer and return a pointer to the response data.
 *
 * The send of the request is completed first if it has not been marked
 * complete. Unless msginfo->bypass is set, the routine then spins:
 *  - GET: while the last int of the expected data still holds ARMCI_STAMP
 *    and the ack field is not ARMCI_STAMP; the ack field is then cleared;
 *  - REGISTER: until the ack field equals ARMCI_STAMP;
 *  - otherwise: until the int right after the data equals ARMCI_STAMP.
 *
 * @param proc     process the request was sent to (used for debug output)
 * @param msginfo  header of the request whose response is awaited
 * @param len      number of bytes expected in the response
 * @return pointer to the response data inside the request buffer
 */
char *armci_ReadFromDirect(int proc, request_header_t *msginfo, int len)
{
  int cluster = armci_clus_id(proc);
  vapibuf_ext_t *ecbuf = BUF_TO_ECBUF(msginfo);
  char *dataptr = GET_DATA_PTR(ecbuf->buf);
  extern void armci_util_wait_int(volatile int *,int,int);
  long *ack;
  int spins = 0;

  if(DEBUG_CLN){
    printf("%d(c):read direct %d qp=%p\n",armci_me,
           len,(void*)&(SRV_con+cluster)->qp);
    fflush(stdout);
  }

  if(mark_buf_send_complete[ecbuf->snd_dscr.wr_id]==0)
    armci_send_complete(&(ecbuf->snd_dscr),"armci_ReadFromDirect",1);

  /* bypass requests need no polling at all */
  if(msginfo->bypass)
    return dataptr;

  ack = &(msginfo->tag.ack);
  if(msginfo->operation==GET){
    int *tail = (int *)(dataptr+len-sizeof(int));
    if(msginfo->dscrlen >= (len-sizeof(int))){
      /* data (and its stamp) sit after the descriptor in this case */
      tail = (int *)(dataptr+len+msginfo->dscrlen-sizeof(int));
      dataptr+=msginfo->dscrlen;
    }

    if(DEBUG_CLN){
      printf("\n%d:flagval=%d at ptr=%p ack=%ld dist=%d\n",armci_me,*tail,
             (void*)tail,*ack,len);fflush(stdout);
    }

    while(armci_util_int_getval(tail) == ARMCI_STAMP &&
          armci_util_long_getval(ack) != ARMCI_STAMP){
      spins = (spins+1)%100000;
      if(spins==0 && DEBUG_CLN){
        printf("%d: client last(%p)=%d flag(%p)=%ld off=%d\n",
               armci_me,(void*)tail,*tail,(void*)ack,*ack,msginfo->datalen);
        fflush(stdout);
      }
    }
    *ack = 0L;
  }
  else if(msginfo->operation == REGISTER){
    while(armci_util_long_getval(ack) != ARMCI_STAMP){
      spins = (spins+1)%100000;
      if(spins==0 && DEBUG_CLN){
        printf("%d: client flag(%p)=%ld off=%d\n",
               armci_me,(void*)ack,*ack,msginfo->datalen);
        fflush(stdout);
      }
    }
  }
  else{
    int *stamp = (int *)(dataptr+len);
    while(armci_util_int_getval(stamp) != ARMCI_STAMP){
      spins = (spins+1)%100000;
      if(spins==0 && DEBUG_CLN){
        printf("%d: client waiting (%p)=%d off=%d\n",
               armci_me,(void*)stamp,*stamp,len);
        fflush(stdout);
      }
    }
  }
  return dataptr;
}
3443
3444
3445 #ifdef GET_STRIDED_COPY_PIPELINED
/**Same as armci_ReadFromDirect, except that for GET it reads partial
 * segments (identified by the stamping done in armci_send_req_msg()) and
 * returns. Note that the return value is the starting pointer of the
 * buffer containing the data. It is the same for all the segments
 * read for a message.
 * @param proc IN Read data corresponding to an earlier req to this proc
 * @param msginfo IN The request for which we are reading now
 * @param len IN #bytes in the total response
 * @param bytes_done IN/OUT #bytes of the total response read so far
 * (monotonic); read on entry to decide where to look next, set to len once
 * the whole response is available
 * @return Starting pointer to the buffer containing the data
 */
char *armci_ReadFromDirectSegment(int proc, request_header_t *msginfo, int len, int *bytes_done) {
  int cluster = armci_clus_id(proc);
  vapibuf_ext_t* ecbuf=BUF_TO_ECBUF(msginfo);
  char *dataptr = GET_DATA_PTR(ecbuf->buf);
  extern void armci_util_wait_int(volatile int *,int,int);

  if(DEBUG_CLN){ printf("%d(c):read direct %d qp=%p\n",armci_me,
                        len,(void*)&(SRV_con+cluster)->qp); fflush(stdout);
  }

  /* complete the send of the request if it is not yet marked complete */
  if(mark_buf_send_complete[ecbuf->snd_dscr.wr_id]==0)
    armci_send_complete(&(ecbuf->snd_dscr),"armci_ReadFromDirect",1);

  if(!msginfo->bypass){
    long *flag;
    int *last;
    int loop = 0;
    flag = &(msginfo->tag.ack);
    if(msginfo->operation==GET){
      /* last: the final int of the expected response */
      last = (int *)(dataptr+len-sizeof(int));
      if(msginfo->dscrlen >= (len-sizeof(int))){
        last = (int *)(dataptr+len+msginfo->dscrlen-sizeof(int));
        dataptr+=msginfo->dscrlen;
      }

      if(DEBUG_CLN){
        printf("\n%d:flagval=%d at ptr=%p ack=%ld dist=%d\n",armci_me,*last,
               (void*)last,*flag,len);fflush(stdout);
      }

      /* spin while the tail is still stamped and no ack has arrived */
      while(armci_util_int_getval(last) == ARMCI_STAMP &&
            armci_util_long_getval(flag) != ARMCI_STAMP){
        loop++;
        loop %=100000;
        if(loop==0){
          if(DEBUG_CLN){
            printf("%d: client last(%p)=%d flag(%p)=%ld off=%d\n",
                   armci_me,(void*)last,*last,(void*)flag,*flag,msginfo->datalen);
            fflush(stdout);
          }
        }

        {
          /* ssize: distance between stamps, in ints */
          int ssize = GET_STRIDED_COPY_PIPELINED_SIZE/sizeof(int);
          int *sfirst = (int*)(msginfo->dscrlen+(char*)(msginfo+1))+ssize; /*stamping
                                                                            can start here*/
          int *slast = last;
          /* off: multiple of ssize locating the stamp to probe next, computed
             from the position *bytes_done relative to sfirst */
          int off = (((int *)(dataptr+*bytes_done)-sfirst+ssize)/ssize)*ssize;
          int *ptr = sfirst+off;
          dassert(1,off>=0);
          dassert(1,(char *)sfirst>dataptr);
          dassert(1,(char *)ptr>dataptr);
          /* the probed stamp has been overwritten: report everything before
             it as available and return early */
          if(ptr<=slast && armci_util_int_getval(ptr)!=ARMCI_STAMP) {
            *bytes_done = ((char*)ptr)-dataptr;
            return dataptr;
          }
        }
      }
      /* whole response is in: clear the ack field and report all bytes */
      *flag = 0L;
      *bytes_done = len;
      return dataptr;
    }
    else if(msginfo->operation == REGISTER){
      /* spin until the ack field is stamped */
      while(armci_util_long_getval(flag) != ARMCI_STAMP){
        loop++;
        loop %=100000;
        if(loop==0){
          if(DEBUG_CLN){
            printf("%d: client flag(%p)=%ld off=%d\n",
                   armci_me,(void*)flag,*flag,msginfo->datalen);
            fflush(stdout);
          }
        }
      }
    }
    else{
      /* spin until the int right after the data is stamped */
      int *flg = (int *)(dataptr+len);
      while(armci_util_int_getval(flg) != ARMCI_STAMP){
        loop++;
        loop %=100000;
        if(loop==0){
          if(DEBUG_CLN){
            printf("%d: client waiting (%p)=%d off=%d\n",
                   armci_me,(void*)flg,*flg,len);
            fflush(stdout);
          }
        }
      }
    }
  }
  *bytes_done = len;
  return dataptr;
}
3550 #endif
3551
/**
 * Server side: post a single work request, initialized by
 * armci_init_cbuf_srdma, that sends `bytes` bytes from `buf` to `dbuf` on
 * client `proc`.
 *
 * With PEND_BUFS the request is only posted; its work request id is chosen
 * from the immediate-buffer id range. Without PEND_BUFS the id is
 * proc+armci_nproc and the routine waits for the send to complete.
 *
 * @param proc IN id of remote client to put to
 * @param buf IN local buf (has to be registered)
 * @param bytes IN number of bytes to send
 * @param dbuf IN destination address on the client
 */
void armci_send_data_to_client(int proc, void *buf, int bytes,void *dbuf)
{
  int i, rc = 0;
  struct ibv_send_wr *bad_wr;
  struct ibv_send_wr sdscr;
  struct ibv_sge ssg_entry;

  if(DEBUG_SERVER){
    printf("\n%d(s):sending data to client %d at %p flag = %p bytes=%d\n",
           armci_me,
           proc,dbuf,(char *)dbuf+bytes-sizeof(int),bytes);fflush(stdout);
  }

  memset(&sdscr,0,sizeof(struct ibv_send_wr));
  memset(&ssg_entry,0,sizeof(ssg_entry));
  armci_init_cbuf_srdma(&sdscr,&ssg_entry,buf,dbuf,bytes,
                        &serv_memhandle,(handle_array+proc));

  if(DEBUG_SERVER){
    printf("\n%d(s):handle_array[%d]=%p dbuf=%p flag=%p bytes=%d\n",armci_me,
           proc,(void*)&handle_array[proc],(char *)dbuf,
           (char *)dbuf+bytes-sizeof(int),bytes);
    fflush(stdout);
  }

#if defined(PEND_BUFS)
  /* look for the immediate buffer of this client that contains buf; the
     result (i) is only consumed when SRI_CORRECT is enabled */
  for(i=proc*(IMM_BUF_NUM+1); i<(proc+1)*(IMM_BUF_NUM+1); i++) {
    if((char*)buf>= serv_buf_arr[i]->buf &&
       (char*)buf<IMM_BUF_LEN+(char*)serv_buf_arr[i]->buf)
      break;
  }

#if SRI_CORRECT
  if(i<(proc+1)*(IMM_BUF_NUM+1)) {
    /*Message from an immediate buffer*/
    assert(serv_buf_arr[i]->send_pending==0);
    serv_buf_arr[i]->send_pending=1;
    sdscr.wr_id = DSCRID_IMMBUF_RESP+i;
  }
  else
#endif
  {
    /* id one past the per-buffer ids DSCRID_IMMBUF_RESP+i used above */
    sdscr.wr_id = DSCRID_IMMBUF_RESP+armci_nproc*(IMM_BUF_NUM+1)+1;
  }
/* #endif */

/* #if defined(PEND_BUFS) */
/*   { */
/*     static uint64_t ctr=DSCRID_IMMBUF_RESP; */
/*     sdscr.wr_id = ctr; */
/*     ctr = (ctr+1-DSCRID_IMMBUF_RESP)%(DSCRID_IMMBUF_RESP_END-DSCRID_IMMBUF_RESP)+DSCRID_IMMBUF_RESP; */
/*   } */
#else
  sdscr.wr_id = proc+armci_nproc;
#endif
  rc = ibv_post_send((CLN_con+proc)->qp, &sdscr, &bad_wr);
  dassert1(1,rc==0,rc);

#if !defined(PEND_BUFS)
  /* without pending buffers the send is completed synchronously */
  armci_send_complete(&sdscr,"armci_send_data_to_client",1);
#endif
}
3618
/**
 * Server side: send the response data for a request back to the client.
 *
 * For operations other than GET an ARMCI_STAMP int is appended to the data
 * (and included in the transfer). With PEND_BUFS, a request that is not an
 * immediate message is answered through the descriptor of the pending
 * buffer that holds it; otherwise armci_send_data_to_client is used. For
 * GET, if the last int of the data equals ARMCI_STAMP, an explicit ack is
 * sent as well. armci_ack_proc is reset to NONE.
 *
 * @param proc     client process to respond to
 * @param msginfo  header of the request being answered; datalen gives the
 *                 payload size and tag.data_ptr the client-side destination
 * @param buf      local buffer holding the response data; for non-GET
 *                 operations it must have room for one extra int
 */
void armci_WriteToDirect(int proc, request_header_t* msginfo, void *buf)
{
  int bytes;
  int *last;
  ARMCI_PR_DBG("enter",0);
  bytes = (int)msginfo->datalen;
  if(DEBUG_SERVER){
    printf("%d(s):write to direct sent %d to %d at %p\n",armci_me,
           bytes,proc,(char *)msginfo->tag.data_ptr);
    fflush(stdout);
  }
  if(msginfo->operation!=GET){
    *(int *)((char *)buf+bytes)=ARMCI_STAMP;
    bytes+=sizeof(int);
  }
#if defined(PEND_BUFS)
  if(!msginfo->tag.imm_msg) {
    /**This is a pending buf*/
    vapibuf_pend_t *pbuf;
    int index;

    assert(msginfo->operation == GET); /*nothing else uses this for now*/
    /* reuse the shared lookup instead of repeating the search here; it
       returns -1 when msginfo is not held by any pending buffer */
    index = armci_server_msginfo_to_pbuf_index(msginfo);
    if(index < 0) {
      /* fail explicitly even when asserts are compiled out (armci_die is
         presumably fatal - confirm) */
      armci_die("armci_WriteToDirect: request is not in a pending buffer",proc);
    }
    pbuf = &serv_pendbuf_arr[index];
    assert(sizeof(request_header_t)+msginfo->dscrlen+bytes<PENDING_BUF_LEN);
    _armci_send_data_to_client_pbuf(proc, &pbuf->sdscr,
                                    PBUF_BUFID_TO_PUT_WRID(index),
                                    &pbuf->sg_entry,
                                    msginfo->tag.data_ptr, buf,
                                    bytes);
  }
  else
#endif
  {
    armci_send_data_to_client(proc,buf,bytes,msginfo->tag.data_ptr);
  }
  /*if(msginfo->dscrlen >= (bytes-sizeof(int)))
    last = (int*)(((char*)(buf)) + (msginfo->dscrlen+bytes - sizeof(int)));
  else*/
  last = (int*)(((char*)(buf)) + (bytes - sizeof(int)));

  /* the client cannot tell real data ending in ARMCI_STAMP from an
     untouched stamp, so back it up with an explicit ack */
  if(msginfo->operation==GET && *last == ARMCI_STAMP){
    SERVER_SEND_ACK(msginfo->from);
  }
  armci_ack_proc=NONE;
  ARMCI_PR_DBG("exit",0);
}
3673
3674
3675 #if defined(PEND_BUFS)
armci_rcv_req(void * mesg,void * phdr,void * pdescr,void * pdata,int * buflen)3676 void armci_rcv_req(void *mesg,void *phdr,void *pdescr,void *pdata,int *buflen)
3677 {
3678 request_header_t *msginfo = *(request_header_t**)mesg;
3679 *(void **)phdr = msginfo;
3680
3681 if(msginfo->tag.imm_msg)
3682 *buflen = IMM_BUF_LEN - sizeof(request_header_t) - msginfo->dscrlen;
3683 else
3684 *buflen = PENDING_BUF_LEN - sizeof(request_header_t) - msginfo->dscrlen;
3685
3686 *(void **)pdata = msginfo->dscrlen + (char *)(msginfo+1);
3687 if(msginfo->bytes)
3688 *(void **)pdescr = msginfo+1;
3689 else
3690 *(void **)pdescr = NULL;
3691 }
3692 #else
armci_rcv_req(void * mesg,void * phdr,void * pdescr,void * pdata,int * buflen)3693 void armci_rcv_req(void *mesg,void *phdr,void *pdescr,void *pdata,int *buflen)
3694 {
3695 vapibuf_t *cbuf = (vapibuf_t*)mesg;
3696 request_header_t *msginfo = (request_header_t *)cbuf->buf;
3697 *(void **)phdr = msginfo;
3698
3699 ARMCI_PR_DBG("enter",msginfo->operation);
3700 if(DEBUG_SERVER){
3701 printf("%d(server): got %d req (dscrlen=%d datalen=%d) from %d\n",
3702 armci_me, msginfo->operation, msginfo->dscrlen,
3703 msginfo->datalen, msginfo->from); fflush(stdout);
3704 }
3705
3706 /* we leave room for msginfo on the client side */
3707 *buflen = MSG_BUFLEN - sizeof(request_header_t);
3708
3709 if(msginfo->bytes) {
3710 *(void **)pdescr = msginfo+1;
3711 if(msginfo->operation == GET)
3712 *(void **)pdata = MessageRcvBuffer;
3713 else
3714 *(void **)pdata = msginfo->dscrlen + (char*)(msginfo+1);
3715 }else {
3716 *(void**)pdescr = NULL;
3717 *(void**)pdata = MessageRcvBuffer;
3718 }
3719 ARMCI_PR_DBG("exit",msginfo->operation);
3720 }
3721 #endif
3722
/**
 * Post the receive (scatter) descriptor stored in pend_dscr->rdescr.
 *
 * @param pend_dscr  descriptor whose rdescr is posted
 * @param proc       peer process; for type == SERV it indexes CLN_con[],
 *                   otherwise its cluster id indexes SRV_con[]
 * @param type       SERV posts on the queue pair to client proc; any other
 *                   value posts on the queue pair to proc's server
 *
 * The ibv_post_recv result is checked with dassert1.
 */
static void posts_scatter_desc(sr_descr_t *pend_dscr,int proc,int type)
{
  int rc;
  int cluster = armci_clus_id(proc);
  struct ibv_recv_wr *scat_dscr;
  struct ibv_recv_wr *bad_wr;

  scat_dscr = &pend_dscr->rdescr;

  /*armci_vapi_print_dscr_info(NULL,scat_dscr);*/
  /* wr_id is cast to unsigned long long so the format matches on every
     platform (it was printed with %lu here and %ld below) */
  if((type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN)){
    printf("%d(%d) : inside posts scatter dscr, id is %llu\n",
           armci_me,type,(unsigned long long)scat_dscr->wr_id);
    fflush(stdout);
  }

  if(type == SERV)
    rc = ibv_post_recv((CLN_con + proc)->qp, scat_dscr, &bad_wr);
  else
    rc = ibv_post_recv((SRV_con+cluster)->qp, scat_dscr, &bad_wr);
  dassert1(1,rc==0,rc);

  if((type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN) ) {
    printf("\n%d: list_length is %d, id is %llu\n",
           armci_me,scat_dscr->num_sge,(unsigned long long)scat_dscr->wr_id);
    fflush(stdout);
  }
}
3751
3752
3753 /*\
3754 * client calls from request.c
3755 * server calls from ds-shared.c
3756 \*/
/* descriptor used by armci_post_scatter for blocking (nbtag == 0) calls;
   armci_wait_for_blocking_scatter waits on it */
static sr_descr_t client_blocking_scatter_dscr;

/**
 * Post receive (scatter) descriptors that cover a strided destination
 * region, one scatter element of count[0] bytes per contiguous segment and
 * at most armci_max_num_sg_ent elements per posted descriptor.
 *
 * @param dest_ptr         base of the strided destination region
 * @param dest_stride_arr  strides of the destination region
 * @param count            segment counts; count[0] is the byte length of a
 *                         contiguous segment, count[1] the number of
 *                         segments in one 2D block
 * @param stride_levels    number of stride levels (0..MAX_STRIDE_LEVEL)
 * @param mhandle          memory handle supplying the lkey of the region
 * @param proc             peer process
 * @param nbtag            non-blocking tag; 0 uses the static blocking
 *                         descriptor
 * @param type             SERV or CLN, selects the queue pair
 * @param srd              OUT, optional, written only when nbtag != 0: the
 *                         descriptor that tracks the posted receives
 *
 * This routine only posts; the number of posted descriptors is left in
 * numofrecvs. Blocking callers wait via armci_wait_for_blocking_scatter.
 * NOTE(review): count[1] is read regardless of stride_levels - callers
 * presumably pass stride_levels >= 1; confirm.
 */
void armci_post_scatter(void *dest_ptr, int dest_stride_arr[], int count[],
                        int stride_levels, armci_vapi_memhndl_t *mhandle,
                        int proc, int nbtag, int type, sr_descr_t **srd)
{
  int i;
  int total_of_2D = 1;
  /* indexed up to stride_levels inclusive, hence MAX_STRIDE_LEVEL+1 slots */
  int index[MAX_STRIDE_LEVEL+1], unit[MAX_STRIDE_LEVEL+1];
  int j,k,y;
  int num_xmit = 0, num_seg, max_seg, rem_seg,vecind;
  char* src, *src1;
  sr_descr_t *pend_dscr;
  struct ibv_sge *scat_sglist;
  struct ibv_recv_wr *scat_dscr;

  assert(stride_levels >= 0);
  assert(stride_levels<=MAX_STRIDE_LEVEL);

  if((type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN) ){
    printf("%d(%d) : inside post_scatter %d\n",armci_me,type,nbtag);
    fflush(stdout);
  }

  max_seg = armci_max_num_sg_ent;

  THREAD_LOCK(armci_user_threads.net_lock);

  if(nbtag){
    pend_dscr = armci_vapi_get_next_rdescr(nbtag,1);
    if(srd!=NULL)*srd=pend_dscr;
  }
  else{
    pend_dscr = &client_blocking_scatter_dscr;
    pend_dscr->rdescr.wr_id=DSCRID_SCATGAT + MAX_PENDING;
  }

  /*pend_dscr->proc = proc;*/
  pend_dscr->numofrecvs=0;

  scat_dscr = &pend_dscr->rdescr;
  scat_sglist = pend_dscr->sg_entry;
  /* scat_dscr->opcode = VAPI_RECEIVE; no ->opcode in ibv_recv_wr */
  /* scat_dscr->comp_type = VAPI_SIGNALED; no ->comp_type in ibv_recv_wr */
  scat_dscr->sg_list = scat_sglist;
  scat_dscr->num_sge = 0;

  /* total_of_2D: number of 2D blocks, i.e. product of count[2..levels];
     unit[j]: how many 2D blocks pass before index[j] advances */
  index[2] = 0; unit[2] = 1;
  if(stride_levels > 1){
    total_of_2D = count[2];
    for(j=3; j<=stride_levels; j++){
      index[j] = 0; unit[j] = unit[j-1]*count[j-1];
      total_of_2D*=count[j];
    }
  }

  /* num_xmit descriptors are needed; the last one carries rem_seg
     elements when the segment total is not a multiple of max_seg */
  num_xmit = total_of_2D*count[1]/max_seg;
  rem_seg = (total_of_2D*count[1])%max_seg;
  if(num_xmit == 0) num_xmit = 1;
  else if(rem_seg!= 0)num_xmit++;


  if ((type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN) ) {
    printf("%d(%d):armci_post_scatter num_xmit = %d\t, rem_seg = %d\n",
           armci_me,type,num_xmit,rem_seg);
    fflush(stdout);
  }

  /* k: descriptors posted so far; vecind: elements in the current one;
     num_seg: element count at which the current one is posted */
  k=0; vecind = 0;
  if(rem_seg!=0 && k==(num_xmit-1))num_seg = rem_seg;
  else num_seg = max_seg;

  y=0;
  for(i=0;i<total_of_2D;i++){
    /* locate the start of the i-th 2D block */
    src = (char *)dest_ptr;
    for(j=2;j<=stride_levels;j++){
      src+= index[j]*dest_stride_arr[j-1];
      if(((i+1)%unit[j]) == 0) index[j]++;
      if(index[j] >= count[j]) index[j] =0;
    }
    src1 = src;

    for(j=0; j<count[1]; j++, vecind++){
      if(vecind == num_seg) {
        posts_scatter_desc(pend_dscr,proc,type);
        pend_dscr->numofrecvs++;

        /* the previous one has been posted, start off new*/
        scat_dscr->num_sge = 0;
        y = 0; /* reuse the same scatter descriptor */
        vecind=0;k++;
        if(rem_seg!=0 && k==(num_xmit-1))num_seg = rem_seg;
      }
      /* fill the scatter descriptor */
      scat_sglist[y].addr = (uint64_t)src1;
      scat_sglist[y].lkey = mhandle->lkey;
      scat_sglist[y].length = count[0];
      scat_dscr->num_sge++;
      src1 += dest_stride_arr[0];
      y++;

    }

    /* descriptor filled exactly at the end of a 2D block: post it now */
    if(vecind == num_seg){
      posts_scatter_desc(pend_dscr,proc,type);
      pend_dscr->numofrecvs++;

      /* the previous one has been posted, start off new*/
      scat_dscr->num_sge = 0;
      y =0 ;
      vecind = 0; k++;
      if(rem_seg!=0 && k==(num_xmit-1))num_seg=rem_seg;
      else num_seg = max_seg;
    }

  }

  THREAD_UNLOCK(armci_user_threads.net_lock);

  /* printf("%d(s): num scatters posted=%d\n", armci_me,pend_dscr->numofrecvs); */
  if(!nbtag){
    /*if blocking call wait_for_blocking_scatter to complete*/
  }
  return;
}
3878
armci_wait_for_blocking_scatter()3879 void armci_wait_for_blocking_scatter()
3880 {
3881 sr_descr_t *pend_dscr=&client_blocking_scatter_dscr;
3882 armci_recv_complete(&pend_dscr->rdescr,"armci_post_scatter",pend_dscr->numofrecvs);
3883 }
3884
3885
3886 /*\
 * function used by armci_post_gather to actually post the gather list
3888 \*/
/**
 * Post the send (gather) descriptor stored in pend_dscr->sdescr.
 *
 * @param pend_dscr  descriptor whose sdescr is posted
 * @param proc       peer process; for type == CLN its cluster id indexes
 *                   SRV_con[], otherwise proc indexes CLN_con[]
 * @param type       CLN posts on the queue pair to proc's server; any other
 *                   value posts on the queue pair to client proc
 *
 * Runs under armci_user_threads.net_lock; the ibv_post_send result is
 * checked with dassert1.
 */
static void posts_gather_desc(sr_descr_t *pend_dscr,int proc,int type)
{
  int rc;
  int cluster = armci_clus_id(proc);
  struct ibv_send_wr *gat_dscr;
  struct ibv_send_wr *bad_wr;

  THREAD_LOCK(armci_user_threads.net_lock);

  gat_dscr = &pend_dscr->sdescr;
  /*armci_vapi_print_dscr_info(gat_dscr,NULL);*/
  /* wr_id is cast to unsigned long long so the format matches on every
     platform (it was printed with %lu) */
  if((type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN)){
    printf("%d: type(client=1)=%d inside posts gather dscr, id is %llu\n",
           armci_me,type,(unsigned long long)gat_dscr->wr_id);
    fflush(stdout);
  }

  rc = 0;
  if(type == CLN){
    rc = ibv_post_send((SRV_con+cluster)->qp, gat_dscr, &bad_wr);
  }
  else{
    rc = ibv_post_send((CLN_con + proc)->qp, gat_dscr, &bad_wr);
  }
  dassert1(1,rc==0,rc);

  THREAD_UNLOCK(armci_user_threads.net_lock);

}
3918
/*\
 * Posts a bunch of gather descriptors.
 *
 * Walks a strided source region and sends it as a sequence of IBV_WR_SEND
 * work requests.  Every contiguous run of count[0] bytes becomes one
 * scatter/gather entry; at most armci_max_num_sg_ent entries go into one
 * work request, which is posted through posts_gather_desc as soon as it is
 * full (the last request carries the remainder).
 *
 *   src_ptr        - base address of the source region
 *   src_stride_arr - byte strides; [0] separates consecutive runs, [j-1]
 *                    is applied for level j >= 2
 *   count          - count[0] bytes per run, count[1] runs per 2D plane,
 *                    count[j] extent of level j
 *   stride_levels  - number of stride levels
 *   mhandle        - registration whose lkey is put in every entry
 *   proc, type     - forwarded to posts_gather_desc
 *   nbtag          - non-zero: use the descriptor returned by
 *                    armci_vapi_get_next_sdescr and return it via *srd (when
 *                    srd is not NULL); zero: use the file-static blocking
 *                    descriptor and call armci_send_complete before
 *                    returning
\*/
static sr_descr_t client_blocking_gather_dscr;
void armci_post_gather(void *src_ptr, int src_stride_arr[], int count[],
                       int stride_levels, armci_vapi_memhndl_t *mhandle,
                       int proc,int nbtag, int type, sr_descr_t **srd)
{
    const int verbose = (type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN);
    int index[MAX_STRIDE_LEVEL], unit[MAX_STRIDE_LEVEL];
    int total_of_2D = 1;
    int plane, lvl, row;
    int xmit_no = 0;            /* work requests completed so far        */
    int sge = 0;                /* next free slot in the sg list         */
    int filled = 0;             /* entries added to the current request  */
    int max_seg, rem_seg, num_seg, num_xmit, total_rows;
    sr_descr_t *pend_dscr;
    struct ibv_send_wr *wr;
    struct ibv_sge *sgl;

    if(verbose){
        printf("%d(%d) : inside post_gather\n",armci_me,type);
        fflush(stdout);
    }

    max_seg = armci_max_num_sg_ent;

    if(!nbtag){
        pend_dscr = &client_blocking_gather_dscr;
        pend_dscr->sdescr.wr_id = DSCRID_SCATGAT + MAX_PENDING;
    }
    else{
        pend_dscr = armci_vapi_get_next_sdescr(nbtag,1);
        if(srd != NULL) *srd = pend_dscr;
    }
    pend_dscr->numofsends = 0;

    wr  = &pend_dscr->sdescr;
    sgl = pend_dscr->sg_entry;
    wr->opcode     = IBV_WR_SEND;
    wr->send_flags = IBV_SEND_SIGNALED;
    wr->sg_list    = sgl;
    wr->num_sge    = 0;
    /* wr->send_flags = 0; */

    /* number of 2D planes and, per level, how many planes pass before that
       level's index advances */
    index[2] = 0; unit[2] = 1;
    if(stride_levels > 1){
        total_of_2D = count[2];
        for(lvl=3; lvl<=stride_levels; lvl++){
            index[lvl] = 0;
            unit[lvl]  = unit[lvl-1]*count[lvl-1];
            total_of_2D *= count[lvl];
        }
    }

    total_rows = total_of_2D*count[1];
    num_xmit = total_rows/max_seg;
    rem_seg  = total_rows%max_seg;
    if(num_xmit == 0) num_xmit = 1;
    else if(rem_seg != 0) num_xmit++;

    if(verbose){
        printf("%d(%d):armci_post_gather total_2D=%d, num_xmit=%d, rem_seg =%d, count[1] = %d\n",armci_me,type,total_of_2D, num_xmit,rem_seg,count[1]);
        fflush(stdout);
    }

    /* size of the first request: the remainder if it is also the last one */
    num_seg = (rem_seg!=0 && xmit_no==(num_xmit-1)) ? rem_seg : max_seg;

    for(plane=0; plane<total_of_2D; plane++){
        char *row_ptr = (char *)src_ptr;

        /* locate the start of this plane and advance the level indices */
        for(lvl=2; lvl<=stride_levels; lvl++){
            row_ptr += index[lvl]*src_stride_arr[lvl-1];
            if(((plane+1)%unit[lvl]) == 0) index[lvl]++;
            if(index[lvl] >= count[lvl]) index[lvl] = 0;
        }

        for(row=0; row<count[1]; row++, filled++){
            if(filled == num_seg){
                posts_gather_desc(pend_dscr,proc,type);
                pend_dscr->numofsends++;

                /* the previous one has been posted, start off new */
                wr->num_sge = 0;
                sge = 0;
                filled = 0; xmit_no++;
                if(rem_seg!=0 && xmit_no==(num_xmit-1)) num_seg = rem_seg;
            }

            /* fill the gather descriptor */
            sgl[sge].addr   = (uint64_t)row_ptr;
            sgl[sge].lkey   = mhandle->lkey;
            sgl[sge].length = count[0];
            wr->num_sge++;
            row_ptr += src_stride_arr[0];
            sge++;
        }

        /* the plane ended exactly on a request boundary: flush now */
        if(filled == num_seg){
            posts_gather_desc(pend_dscr,proc,type);
            pend_dscr->numofsends++;
            if(verbose){
                printf("%d(%d)posts_gather_desc done\n",armci_me,type);
                fflush(stdout);
            }

            /* the previous one has been posted, start off new */
            wr->num_sge = 0;
            sge = 0;
            filled = 0; xmit_no++;
            if(rem_seg!=0 && xmit_no==(num_xmit-1)) num_seg = rem_seg;
            else num_seg = max_seg;
        }
    }

    /* printf("%d: num gathers posted =%d\n",armci_me,pend_dscr->numofsends); */
    if(!nbtag){
        /* blocking call: complete here */
        armci_send_complete(&pend_dscr->sdescr,"armci_post_gather",pend_dscr->numofsends);
    }
    return;
}
4040 /***********************END SCATTER GATHER STUFF******************************/
4041
4042
4043
4044 /***********************SPECIAL SEND/RECV*************************************/
armci_server_direct_send(int dst,char * src_buf,char * dst_buf,int len,uint32_t * lkey,uint32_t * rkey)4045 void armci_server_direct_send(int dst, char *src_buf, char *dst_buf, int len,
4046 uint32_t *lkey, uint32_t *rkey)
4047 {
4048 int rc = 0;
4049 struct ibv_wc *pdscr=NULL;
4050 struct ibv_wc pdscr1;
4051 struct ibv_send_wr sdscr;
4052 struct ibv_sge ssg_entry;
4053
4054 pdscr = &pdscr1;
4055
4056 if(DEBUG_SERVER){
4057 printf("\n%d(s):sending dir data to client %d at %p bytes=%d last=%p\n",
4058 armci_me,dst,dst_buf,len,(dst_buf+len-4));fflush(stdout);
4059 }
4060
4061 memset(&sdscr,0,sizeof(struct ibv_send_wr));
4062 armci_init_cbuf_srdma(&sdscr,&ssg_entry,src_buf,dst_buf,len,NULL,NULL);
4063 sdscr.wr.rdma.rkey = *rkey;
4064 ssg_entry.lkey = *lkey;
4065
4066 sdscr.wr_id = dst+armci_nproc;
4067 struct ibv_send_wr *bad_wr;
4068 rc = ibv_post_send((CLN_con+dst)->qp, &sdscr, &bad_wr);
4069 dassert1(1,rc==0,rc);
4070
4071 while (rc == 0) {
4072 rc = ibv_poll_cq(CLN_nic->scq, 1, pdscr);
4073 }
4074 dassertp(1,rc>=0,("%d: rc=%d id=%d status=%d\n",
4075 armci_me,rc,(int)pdscr->wr_id,pdscr->status));
4076 dassert1(1,pdscr->status==IBV_WC_SUCCESS,pdscr->status);;
4077 }
4078
4079
4080
/*\
 * Server side of a contiguous "bypass" transfer: pushes bytes from src_ptr
 * to rem_ptr on the requesting client (msginfo->from) through
 * armci_server_direct_send and, when the last int of the source buffer
 * holds ARMCI_STAMP, follows up with SERVER_SEND_ACK.
 *
 * The remote key is taken from the two uint32_t slots at the end of the
 * descriptor that follows the request header.
 *
 * NOTE(review): lkey is never pointed at anything; it stays NULL, is
 * dereferenced by the debug print below, and is passed to
 * armci_server_direct_send, which reads *lkey.  As written this path cannot
 * complete; confirm where the local key is supposed to come from.
\*/
void armci_send_contig_bypass(int proc, request_header_t *msginfo,
                              void *src_ptr, void *rem_ptr, int bytes)
{
    const int dscrlen = msginfo->dscrlen;
    uint32_t *lkey = NULL;
    uint32_t *rkey;
    int *last;

    /* last int of the outgoing data, inspected once the send is done */
    last = (int*)(((char*)(src_ptr)) + (bytes - sizeof(int)));
    dassertp(1,msginfo->pinned,("%d: not pinned proc=%d",armci_me,proc));

    /* two uint32_t before the end of the descriptor */
    rkey = (uint32_t *)((char *)(msginfo+1)+dscrlen-(sizeof(uint32_t)+sizeof(uint32_t)));

    if(DEBUG_SERVER){
        printf("%d(server): sending data bypass to %d (%p,%p) %d %d\n", armci_me,
               msginfo->from,src_ptr, rem_ptr,*lkey,*rkey);
        fflush(stdout);
    }

    armci_server_direct_send(msginfo->from,src_ptr,rem_ptr,bytes,lkey,rkey);

    if(*last == ARMCI_STAMP){
        SERVER_SEND_ACK(msginfo->from);
    }
}
4105
/*\
 * Client-side wait for a bypass transfer into ptr.
 *
 * With stride_levels == 0 the function busy-waits while the last int of
 * the count[0]-byte buffer still reads ARMCI_STAMP and msginfo->tag (read
 * as a long) has not become ARMCI_STAMP.  Any other stride_levels value is
 * unexpected: a message is printed and armci_dscrlist_recv_complete is
 * called.
\*/
void armci_rcv_strided_data_bypass_both(int proc, request_header_t *msginfo,
                                        void *ptr, int *count, int stride_levels)
{
    int datalen = msginfo->datalen;

    if(DEBUG_CLN){ printf("%d:rcv_strided_data_both bypass from %d\n",
                          armci_me, proc); fflush(stdout);
    }

    if(stride_levels){
        printf("\n%d:rcv_strided_data called, it should never be called\n",armci_me);
        armci_dscrlist_recv_complete(0,"armci_rcv_strided_data_bypass_both",NULL);
    }
    else {
        int  *last  = (int*)(((char*)(ptr)) + (count[0] -sizeof(int)));
        long *ack   = (long *)&msginfo->tag;
        int   spins = 0;

        while(armci_util_int_getval(last) == ARMCI_STAMP &&
              armci_util_long_getval(ack) != ARMCI_STAMP){
            /* report progress once every million polls (debug builds) */
            spins = (spins + 1) % 1000000;
            if(spins == 0 && DEBUG_CLN){
                printf("%d: client last(%p)=%d ack(%p)=%ld off=%d\n",
                       armci_me,(void*)last,*last,(void*)ack,*ack,(int)((char*)last - (char*)ptr));
                fflush(stdout);
            }
        }
    }

    if(DEBUG_CLN){printf("%d:rcv_strided_data bypass both: %d bytes from %d\n",
                         armci_me, datalen, proc); fflush(stdout);
    }
}
4142
4143
armci_pin_memory(void * ptr,int stride_arr[],int count[],int strides)4144 int armci_pin_memory(void *ptr, int stride_arr[], int count[], int strides)
4145 {
4146 fprintf(stderr, "[%d]:armci_pin_memory not implemented\n",armci_me);
4147 fflush(stderr);
4148 return 0;
4149 }
4150
4151
armci_client_send_ack(int proc,int n)4152 void armci_client_send_ack(int proc, int n)
4153 {
4154 printf("\n%d:client_send_ack not implemented",armci_me);fflush(stdout);
4155 }
4156
4157
/*\
 * Placeholder: the arguments are ignored; a "not implemented" notice is
 * written to stdout and flushed.
\*/
void armci_rcv_strided_data_bypass(int proc, request_header_t* msginfo,
                                   void *ptr, int stride_levels)
{
    printf("\n%d:armci_rcv_strided_data_bypass not implemented",armci_me);
    fflush(stdout);
}
4164
4165
armci_unpin_memory(void * ptr,int stride_arr[],int count[],int strides)4166 void armci_unpin_memory(void *ptr, int stride_arr[], int count[], int strides)
4167 {
4168 printf("\n%d:armci_unpin_memory not implemented",armci_me);fflush(stdout);
4169 }
4170
4171
armcill_server_wait_ack(int proc,int n)4172 int armcill_server_wait_ack(int proc, int n)
4173 {
4174 printf("\n%d:armcill_server_wait_ack not implemented",armci_me);
4175 fflush(stdout);
4176 return(0);
4177 }
4178
4179
armcill_server_put(int proc,void * s,void * d,int len)4180 void armcill_server_put(int proc, void* s, void *d, int len)
4181 {
4182 printf("\n%d:armcill_server_put not implemented",armci_me);fflush(stdout);
4183 }
4184
4185
4186 /*\
4187 * initialising the atomic send descriptor
4188 \*/
armci_init_vapibuf_atomic(struct ibv_send_wr * sd,struct ibv_sge * sg,int op,int * ploc,int * prem,int extra,int id,ARMCI_MEMHDL_T * lhandle,ARMCI_MEMHDL_T * rhandle)4189 void armci_init_vapibuf_atomic(struct ibv_send_wr *sd, struct ibv_sge *sg,
4190 int op, int*ploc,int *prem, int extra,
4191 int id,ARMCI_MEMHDL_T *lhandle,
4192 ARMCI_MEMHDL_T *rhandle)
4193 {
4194 if (1) {
4195 printf("%d(c) : entered armci_init_vapibuf_atomic\n",armci_me);
4196 fflush(stdout);
4197 }
4198 memset(sd,0,sizeof(struct ibv_send_wr));
4199 if (op == ARMCI_FETCH_AND_ADD_LONG ) {
4200 printf("%d(c) :setting opcode for snd dscr to FETCH_AND_ADD\n",armci_me);
4201 sd->opcode = IBV_WR_ATOMIC_FETCH_AND_ADD;
4202 sd->wr.atomic.compare_add = (uint64_t)extra;
4203 } else if(op == ARMCI_SWAP_LONG){
4204 sd->opcode = IBV_WR_ATOMIC_CMP_AND_SWP;
4205 sd->wr.atomic.swap = (uint64_t)extra;
4206 }
4207 sd->send_flags = IBV_SEND_SIGNALED;
4208 sg->length = 8; /* 64 bit atomic*/
4209 printf("--------\n");
4210 sg->addr= (uint64_t)(void *)ploc;
4211 if(lhandle)
4212 sg->lkey = lhandle->lkey;
4213 sd->sg_list = sg;
4214 sd->num_sge = 1;
4215 sd->wr.atomic.remote_addr = (uint64_t)(void *)prem;
4216 if(rhandle)
4217 sd->wr.atomic.rkey = rhandle->rkey; /* how do we get the remote key */
4218 sd->wr_id = DSCRID_RMW + armci_me;
4219
4220 if(1){
4221 printf("%d(c) : finished initialising atomic send desc id is %ld,armci_ime = %d\n",armci_me,sd->wr_id,armci_me);
4222 fflush(stdout);
4223 }
4224 }
4225 /*\
4226 * using vapi remote atomic operations
4227 \*/
client_rmw_complete(struct ibv_send_wr * snd_dscr,char * from)4228 void client_rmw_complete(struct ibv_send_wr *snd_dscr, char *from)
4229 {
4230 int rc = 0;
4231 struct ibv_wc pdscr1;
4232 struct ibv_wc *pdscr=&pdscr1;
4233
4234 printf("%d(c) : inside client_rmw_complete\n",armci_me);
4235 do {
4236 while (rc == 0) {
4237 rc = ibv_poll_cq(CLN_nic->scq, 1, pdscr);
4238 }
4239 dassertp(DBG_POLL|DBG_ALL,rc>=0,
4240 ("%d: rc=%d id=%d status=%d\n",
4241 armci_me,rc,pdscr->wr_id,pdscr->status));
4242 dassert1(1,pdscr->status==IBV_WC_SUCCESS,pdscr->status);
4243 rc = 0;
4244 } while(pdscr->wr_id != snd_dscr->wr_id);
4245 }
4246
4247
/*\
 * Performs a remote read-modify-write with an InfiniBand atomic.
 *
 * Uses this process's slot in the rmw table (rmw[armci_me]) for the work
 * request and its sg entry, lets armci_init_vapibuf_atomic fill them in,
 * posts the request on the queue pair of CLN_con[proc] and waits for its
 * completion in client_rmw_complete.  Progress messages are printed
 * unconditionally at every step.
\*/
void armci_direct_rmw(int op, int*ploc, int *prem, int extra, int proc,
                      ARMCI_MEMHDL_T *lhandle, ARMCI_MEMHDL_T *rhandle)
{
    armci_connect_t    *con = CLN_con+proc;
    struct ibv_send_wr *sd  = &(rmw[armci_me].rmw_dscr);
    struct ibv_sge     *sg  = &(rmw[armci_me].rmw_entry);
    struct ibv_send_wr *bad_wr;
    int rc = 0;

    printf("%d(c) : about to call armci_init_vapibuf_atomic\n",armci_me);
    fflush(stdout);

    armci_init_vapibuf_atomic(sd, sg, op,ploc,prem,extra,proc,lhandle,rhandle);

    printf("%d(c) : finished armci_init_vapibuf_atomic\n",armci_me);
    fflush(stdout);

    rc = ibv_post_send(con->qp, sd, &bad_wr);
    dassert1(1,rc==0,rc);

    printf("%d(c) : finished posting desc\n",armci_me);
    fflush(stdout);

    /*armci_send_complete(sd,"send_remote_atomic");*/
    client_rmw_complete(sd,"send_remote_atomic");

    return;
}
4287
4288 struct node *dto_q = NULL;
4289
process_con_break_from_client(armci_ud_rank * h,cbuf * v)4290 void process_con_break_from_client(armci_ud_rank *h, cbuf *v)
4291 {
4292
4293 armci_connect_t *con = CLN_con + h->src_rank;
4294
4295 cbuf *v1 = get_cbuf();
4296
4297 assert( v1 != NULL);
4298
4299 v1->desc.u.sr.wr.ud.remote_qpn = rbuf.qp_num[h->src_rank];
4300 v1->desc.u.sr.wr.ud.remote_qkey = 0;
4301 v1->desc.u.sr.wr.ud.ah = conn.ud_ah[h->src_rank];
4302
4303 armci_ud_rank *h1 = (armci_ud_rank *)CBUF_BUFFER_START(v1);
4304 h1->src_rank = armci_me;
4305 h1->qpnum = con->sqpnum;
4306 h1->msg_type = QP_CON_BREAK_FROM_SERVER;
4307 cbuf_init_send(v1, sizeof(armci_ud_rank));
4308
4309 /* Release the receiving cbuf */
4310 release_cbuf(v);
4311
4312 struct ibv_send_wr *send_wr;
4313 if(ibv_post_send(conn.qp[0], &(v1->desc.u.sr), &send_wr)) {
4314 fprintf(stderr, "Error posting send\n");
4315 }
4316 }
4317
4318
/*\
 * Handles the server's reply to a connection break: marks the connection
 * to the sender's cluster (SRV_con[armci_clus_id(h->src_rank)]) as
 * QP_INACTIVE and releases the cbuf the message arrived in.
\*/
void process_con_break_from_server(armci_ud_rank *h, cbuf *v)
{
    const int cluster = armci_clus_id(h->src_rank);

    (SRV_con + cluster)->state = QP_INACTIVE;
    release_cbuf(v);
}
4326
/*\
 * Handles a connection request from client h->src_rank.
 *
 * Steps, in order:
 *   1. If CLN_con[h->src_rank] is currently QP_ACTIVE, its QP is moved to
 *      SQD and destroyed.
 *   2. A new QP is created with armci_create_qp and taken through
 *      INIT -> RTR -> RTS, using the peer QP number carried in the header.
 *   3. The receive buffers belonging to that client are posted (on the
 *      shared receive queue when armci_use_srq is set, otherwise on the
 *      new QP).
 *   4. A reply carrying this process's rank and the new QP number is sent
 *      back over the UD channel, and the connection is marked QP_ACTIVE.
 *
 *   h - header of the received request
 *   v - cbuf holding the request; released before the reply is posted
 *
 * Each ibv_modify_qp transition of the new QP is checked with dassert1,
 * in line with the checks already made on the receive posts and with the
 * server-side counterpart.
 *
 * NOTE(review): the reply uses the literal message type 2, presumably
 * QP_CON_ACK; confirm against the message-type definitions.
 * NOTE(review): without PEND_BUFS the same receive descriptor
 * (serv_buf_arr[h->src_rank]) is posted armci_nproc times; confirm that
 * this is intended.
\*/
void process_recv_completion_from_client(armci_ud_rank *h, cbuf *v)
{
    struct ibv_qp_attr qp_attr;
    enum ibv_qp_attr_mask qp_attr_mask;
    struct ibv_recv_wr *bad_wr;
    struct ibv_send_wr *bad_send_wr;
    struct cbuf *v1;
    armci_ud_rank *h1;
    armci_connect_t *con;
    int rc;

    static int qp_flag = 1;

    /* the very first request resets the active-connection counter */
    if (qp_flag == 1) {
        total_active_conn_to_client = 0;
        qp_flag = 0;
    }

    total_active_conn_to_client++;

    con = CLN_con + h->src_rank;

    if (con->state == QP_ACTIVE) {
        /* retire the QP that currently serves this client */
        qp_attr_mask = IBV_QP_STATE;

        memset(&qp_attr, 0, sizeof qp_attr);
        qp_attr.qp_state = IBV_QPS_SQD;

        if (ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask)) {
            fprintf(stdout," Error modifying QP\n");
            fflush(stdout);
        }

        if (ibv_destroy_qp(con->qp)) {
            printf("Error destroying QP\n");
        }

        total_active_conn_to_client--;
    }

    armci_create_qp(CLN_nic, &con->qp);
    con->sqpnum = con->qp->qp_num;
    con->lid    = CLN_nic->lid_arr[h->src_rank];
    CLN_rqpnumtmpbuf[h->src_rank] = con->qp->qp_num;

    /* RESET -> INIT */
    qp_attr_mask = IBV_QP_STATE
        | IBV_QP_PKEY_INDEX
        | IBV_QP_PORT
        | IBV_QP_ACCESS_FLAGS;

    memset(&qp_attr, 0, sizeof qp_attr);
    qp_attr.qp_state        = IBV_QPS_INIT;
    qp_attr.pkey_index      = DEFAULT_PKEY_IX;
    qp_attr.port_num        = CLN_nic->active_port;
    qp_attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE|
        IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;

    rc = ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask);
    dassert1(1,rc==0,rc);

    /* INIT -> RTR */
    memset(&qp_attr, 0, sizeof qp_attr);
    qp_attr_mask = IBV_QP_STATE
        | IBV_QP_MAX_DEST_RD_ATOMIC
        | IBV_QP_PATH_MTU
        | IBV_QP_RQ_PSN
        | IBV_QP_MIN_RNR_TIMER;
    qp_attr.qp_state           = IBV_QPS_RTR;
    qp_attr.path_mtu           = IBV_MTU_1024; /*MTU*/
    qp_attr.max_dest_rd_atomic = 4;
    qp_attr.min_rnr_timer      = RNR_TIMER;
    qp_attr.rq_psn             = 0;

    /* Fill in the information from the header */
    SRV_rqpnums[h->src_rank] = h->qpnum;

    qp_attr_mask |= IBV_QP_DEST_QPN | IBV_QP_AV;
    qp_attr.dest_qp_num      = SRV_rqpnums[h->src_rank];
    qp_attr.ah_attr.dlid     = SRV_nic->lid_arr[h->src_rank];
    qp_attr.ah_attr.port_num = CLN_nic->active_port;

    rc = ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask);
    dassert1(1,rc==0,rc);

    /* RTR -> RTS */
    memset(&qp_attr, 0, sizeof qp_attr);

    qp_attr_mask = IBV_QP_STATE
        | IBV_QP_SQ_PSN
        | IBV_QP_TIMEOUT
        | IBV_QP_RETRY_CNT
        | IBV_QP_RNR_RETRY
        | IBV_QP_MAX_QP_RD_ATOMIC;

    qp_attr.qp_state      = IBV_QPS_RTS;
    qp_attr.sq_psn        = 0;
    qp_attr.timeout       = 18;
    qp_attr.retry_cnt     = 7;
    qp_attr.rnr_retry     = 7;
    qp_attr.max_rd_atomic = 4;

    rc = ibv_modify_qp(con->qp, &qp_attr,qp_attr_mask);
    dassert1(1,rc==0,rc);

#if defined(PEND_BUFS)
    {
        int j;

        assert(armci_nproc*(IMM_BUF_NUM+1)<DSCRID_IMMBUF_RECV_END-DSCRID_IMMBUF_RECV);
        /* post the IMM_BUF_NUM+1 buffers reserved for this client */
        for(j=0; j<IMM_BUF_NUM+1; j++) {
            vapibuf_t *vbuf;
            vbuf = serv_buf_arr[h->src_rank*(IMM_BUF_NUM+1)+j];
            armci_init_vapibuf_recv(&vbuf->dscr, &vbuf->sg_entry, vbuf->buf,
                                    IMM_BUF_LEN, &serv_memhandle);
            vbuf->dscr.wr_id = h->src_rank*(IMM_BUF_NUM+1)+j + DSCRID_IMMBUF_RECV;
            if (DEBUG_SERVER) {
                printf("\n%d(s):posted rr with lkey=%d",armci_me,vbuf->sg_entry.lkey);
                fflush(stdout);
            }
            if (armci_use_srq) {
                rc = ibv_post_srq_recv(CLN_srq_hndl, &vbuf->dscr, &bad_wr);
            }
            else {
                rc = ibv_post_recv((CLN_con + h->src_rank)->qp, &vbuf->dscr, &bad_wr);
            }

            dassert1(1,rc==0,rc);
        }
    }
#else
    {
        int i;

        for(i = 0; i < armci_nproc; i++) {
            vapibuf_t *vbuf;
            vbuf = serv_buf_arr[h->src_rank];
            armci_init_vapibuf_recv(&vbuf->dscr, &vbuf->sg_entry, vbuf->buf,
                                    CBUF_DLEN, &serv_memhandle);
            vbuf->dscr.wr_id = h->src_rank+armci_nproc;
            if (DEBUG_SERVER) {
                printf("\n%d(s):posted rr with lkey=%d",armci_me,vbuf->sg_entry.lkey);
                fflush(stdout);
            }
            if (armci_use_srq) {
                rc = ibv_post_srq_recv(CLN_srq_hndl, &vbuf->dscr, &bad_wr);
            }
            else {
                rc = ibv_post_recv((CLN_con+h->src_rank)->qp, &vbuf->dscr, &bad_wr);
            }
            dassert1(1,rc==0,rc);
        }
    }
#endif

    /* Now send back the information */
    v1 = get_cbuf();
    assert( v1 != NULL);

    v1->desc.u.sr.wr.ud.remote_qpn  = rbuf.qp_num[h->src_rank];
    v1->desc.u.sr.wr.ud.remote_qkey = 0;
    v1->desc.u.sr.wr.ud.ah          = conn.ud_ah[h->src_rank];

    h1 = (armci_ud_rank *)CBUF_BUFFER_START(v1);
    h1->src_rank = armci_me;
    h1->qpnum    = con->sqpnum;
    h1->msg_type = 2;
    cbuf_init_send(v1, sizeof(armci_ud_rank));

    /* Release the receiving cbuf */
    release_cbuf(v);

    if(ibv_post_send(conn.qp[0], &(v1->desc.u.sr), &bad_send_wr)) {
        fprintf(stderr, "Error posting send\n");
    }

    con->state = QP_ACTIVE;
}
4491
progress_engine()4492 void progress_engine()
4493 {
4494 int ne;
4495 struct ibv_wc wc;
4496 void *cbuf_addr;
4497 cbuf *v;
4498
4499 do {
4500 ne = ibv_poll_cq(hca.cq, 1, &wc);
4501 } while (ne < 1);
4502
4503 /* Okay, got an entry, check for errors */
4504 if(ne < 0) {
4505 fprintf(stderr,"Error Polling CQ\n");
4506 }
4507
4508 if(wc.status != IBV_WC_SUCCESS) {
4509 fprintf(stderr, "[%d] Failed status %d\n",
4510 armci_me, wc.status);
4511 }
4512 cbuf_addr = (void *) ((aint_t) wc.wr_id);
4513 assert(cbuf_addr != NULL);
4514 v = (cbuf *)cbuf_addr;
4515 armci_ud_rank *h = (armci_ud_rank *)(CBUF_BUFFER_START(v) + 40);
4516
4517 if(IBV_WC_SEND == wc.opcode
4518 || IBV_WC_RDMA_WRITE == wc.opcode) {
4519 release_cbuf(v);
4520 /* Do nothing, just release the cbuf */
4521 /* Send Completion */
4522 } else if (IBV_WC_RECV == wc.opcode) {
4523 /* Recv completion */
4524 post_recv();
4525
4526 /* Check if the message received is from a data server */
4527 assert((h->msg_type == QP_CON_REQ)
4528 || (h->msg_type == QP_CON_ACK)
4529 || (h->msg_type == QP_CON_BREAK_FROM_CLIENT)
4530 || (h->msg_type == QP_CON_BREAK_FROM_SERVER));
4531 if (h->msg_type == QP_CON_REQ) {
4532 process_recv_completion_from_client(h, v);
4533 }
4534 else if (h->msg_type == QP_CON_BREAK_FROM_CLIENT) {
4535 process_con_break_from_client(h, v);
4536 }
4537 else if (h->msg_type == QP_CON_BREAK_FROM_SERVER) {
4538 process_con_break_from_server(h, v);
4539 }
4540 else {
4541 process_recv_completion_from_server(h, v);
4542 }
4543 } else {
4544 fprintf(stderr, "Unknown opcode recv'd\n");
4545 }
4546
4547 }
4548
4549
/*\
 * Thread entry point for UD progress: calls progress_engine in an endless
 * loop.  The context argument is not used and the trailing return is never
 * reached.
\*/
void* async_thread_ud_events(void *context)
{
    for (;;) {
        progress_engine();
    }

    return NULL;
}
4558
4559
#if 0
/*\
 * Disabled code: creates a shared receive queue on nic's protection
 * domain, sized for one entry per process (IMM_BUF_NUM+1 per process with
 * PEND_BUFS) plus 200, with a single sg entry per request.  Returns
 * whatever ibv_create_srq returns.
\*/
static struct ibv_srq *create_srq(vapi_nic_t *nic)
{
    struct ibv_srq_init_attr init;

    memset(&init, 0, sizeof(init));

    init.srq_context  = nic->handle;
#ifdef PEND_BUFS
    init.attr.max_wr  = armci_nproc * (IMM_BUF_NUM + 1) + 200;
#else
    init.attr.max_wr  = armci_nproc + 200;
#endif
    init.attr.max_sge = 1;
    /* The limit value should be ignored during SRQ create */
    init.attr.srq_limit = 30;

    return ibv_create_srq(nic->ptag, &init);
}
#endif
4582
init_params(void)4583 int init_params(void)
4584 {
4585 conn.qp = (struct ibv_qp **) malloc(armci_nproc * sizeof(struct ibv_qp *));
4586 conn.lid = (uint16_t *) malloc(armci_nproc * sizeof(int));
4587 conn.qp_num = (uint32_t *) malloc(armci_nproc * sizeof(int));
4588 conn.ud_ah = (struct ibv_ah **) malloc (armci_nproc * sizeof (struct ibv_ah *));
4589 conn.status = (int *) malloc(armci_nproc * sizeof(int));
4590
4591 memset((void *)conn.status, 0, sizeof(int) * armci_nproc);
4592
4593 rbuf.qp_num = (uint32_t *) malloc(armci_nproc * sizeof(int));
4594 rbuf.lid = (uint16_t *) malloc(armci_nproc * sizeof(int));
4595 rbuf.rkey = (uint32_t *) malloc(armci_nproc * sizeof(int));
4596 rbuf.buf = (char **) malloc(armci_nproc * sizeof(char *));
4597
4598 assert(conn.qp && conn.lid && rbuf.qp_num && rbuf.lid && rbuf.rkey && rbuf.buf);
4599 return 0;
4600 }
4601
4602
open_hca(void)4603 int open_hca(void)
4604 {
4605 struct ibv_device **dev_list;
4606
4607 int num_hcas;
4608 dev_list = ibv_get_device_list(&num_hcas);
4609
4610
4611 hca.ib_dev = dev_list[0];
4612
4613 hca.context = ibv_open_device(hca.ib_dev);
4614
4615 if(!hca.context) {
4616 fprintf(stderr,"Couldn't get context %s\n",
4617 ibv_get_device_name(hca.ib_dev));
4618 return 1;
4619 }
4620
4621 hca.pd = ibv_alloc_pd(hca.context);
4622
4623 if(!hca.pd) {
4624 fprintf(stderr,"Couldn't get pd %s\n",
4625 ibv_get_device_name(hca.ib_dev));
4626 return 1;
4627 }
4628 return 0;
4629 }
4630
create_cq(void)4631 int create_cq(void)
4632 {
4633 hca.cq = ibv_create_cq(hca.context, 16000, NULL,
4634 NULL, 0);
4635 if(!hca.cq) {
4636 fprintf(stderr, "Couldn't create CQ\n");
4637 return 1;
4638 }
4639 if (armci_me == armci_master) {
4640 pthread_create(&armci_async_thread[1], NULL,
4641 &async_thread_ud_events, (void *) hca.context);
4642 }
4643
4644 return 0;
4645 }
4646
get_lid(void)4647 int get_lid(void)
4648 {
4649 struct ibv_port_attr port_attr[2];
4650 int i, j;
4651
4652 for (j = 1; j <= 1; j++) {
4653 if (!ibv_query_port(hca.context, j, &port_attr[j - 1])
4654 && (port_attr[j - 1].state == IBV_PORT_ACTIVE)) {
4655 for (i = 0; i < armci_nproc; i++)
4656 conn.lid[i] = port_attr[j - 1].lid;
4657 return 0;
4658 }
4659 }
4660
4661 return 1;
4662 }
4663
exch_addr(void)4664 int exch_addr(void)
4665 {
4666 MPI_Allgather((void *)conn.qp_num, sizeof(uint32_t), MPI_BYTE,
4667 (void *)rbuf.qp_num, sizeof(uint32_t), MPI_BYTE, ARMCI_COMM_WORLD);
4668 MPI_Alltoall((void *)conn.lid, sizeof(uint16_t), MPI_BYTE,
4669 (void *)rbuf.lid, sizeof(uint16_t), MPI_BYTE, ARMCI_COMM_WORLD);
4670
4671 #ifdef DEBUG
4672 for (i = 0; i < nprocs; i++) {
4673 if (me == i)
4674 continue;
4675 fprintf(stdout,"[%d] Remote QP %d, Remote LID %u, Rkey %u, Lkey %u\n"
4676 " LBuf %p, RBuf %p\n",
4677 me, rbuf.qp_num[i], rbuf.lid[i], rbuf.rkey[i], lbuf.mr->lkey,
4678 lbuf.buf, rbuf.buf[i]);
4679 fflush(stdout);
4680 }
4681 #endif
4682
4683 return 0;
4684 }
4685
create_qp(void)4686 int create_qp(void)
4687 {
4688 struct ibv_qp_attr qp_attr;
4689
4690 memset(&qp_attr, 0, sizeof qp_attr);
4691
4692 struct ibv_qp_init_attr attr = {
4693 .send_cq = hca.cq,
4694 .recv_cq = hca.cq,
4695 .cap = {
4696 .max_send_wr = 8192,
4697 .max_recv_wr = 8192,
4698 .max_send_sge = 1,
4699 .max_recv_sge = 1,
4700 .max_inline_data = 0
4701 },
4702 .qp_type = IBV_QPT_UD
4703 };
4704
4705 conn.qp[0] = ibv_create_qp(hca.pd, &attr);
4706 if(!conn.qp[0]) {
4707 fprintf(stderr,"Couldn't create QP\n");
4708 return 1;
4709 }
4710
4711 conn.qp_num[0] = conn.qp[0]->qp_num;
4712 qp_attr.qp_state = IBV_QPS_INIT;
4713 qp_attr.pkey_index = 0;
4714 qp_attr.port_num = 1;
4715 qp_attr.qkey = 0;
4716
4717 if(ibv_modify_qp(conn.qp[0], &qp_attr,
4718 IBV_QP_STATE |
4719 IBV_QP_PKEY_INDEX |
4720 IBV_QP_PORT |
4721 IBV_QP_QKEY)) {
4722 fprintf(stderr,"Could not modify QP to INIT\n");
4723 return 1;
4724 }
4725 #ifdef DEBUG
4726 fprintf(stdout,"[%d] Created QP %d, LID %d\n", me,
4727 conn.qp_num[0], conn.lid[0]);
4728 fflush(stdout);
4729 #endif
4730
4731 return 0;
4732 }
armci_register_memory(void * buf,int len)4733 struct ibv_mr* armci_register_memory(void* buf, int len)
4734 {
4735 return (ibv_reg_mr(hca.pd, buf, len,
4736 IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE |
4737 IBV_ACCESS_REMOTE_READ));
4738 }
4739
connect_qp(void)4740 int connect_qp(void)
4741 {
4742 struct ibv_qp_attr attr;
4743 int i;
4744 memset(&attr, 0 , sizeof attr);
4745
4746 attr.qp_state = IBV_QPS_RTR;
4747 if (ibv_modify_qp(conn.qp[0], &attr,
4748 IBV_QP_STATE)) {
4749 fprintf(stderr, "Failed to modify QP to RTR\n");
4750 return 1;
4751 }
4752
4753 attr.qp_state = IBV_QPS_RTS;
4754 attr.sq_psn = 0;
4755 if (ibv_modify_qp(conn.qp[0], &attr,
4756 IBV_QP_STATE |
4757 IBV_QP_SQ_PSN)) {
4758 fprintf(stderr, "Failed to modify QP to RTS\n");
4759 return 1;
4760 }
4761 for (i = 0; i < armci_nproc; i++) {
4762 struct ibv_ah_attr ah_attr;
4763 memset(&ah_attr, 0, sizeof(ah_attr));
4764 ah_attr.is_global = 0;
4765 ah_attr.dlid = rbuf.lid[i];
4766 ah_attr.sl = 0;
4767 ah_attr.src_path_bits = 0;
4768 ah_attr.port_num = 1;
4769
4770 conn.ud_ah[i] = ibv_create_ah(hca.pd, &ah_attr);
4771
4772 if (!conn.ud_ah[i]) {
4773 fprintf(stderr, "Error creating address handles\n");
4774 }
4775 }
4776
4777 return 0;
4778 }
4779
4780 int total_ud_recv_buffers = 4096;
4781
4782
post_recv()4783 void post_recv()
4784 {
4785 cbuf *v = get_cbuf();
4786 cbuf_init_recv(v, CBUF_BUFFER_SIZE);
4787 struct ibv_recv_wr *bad_wr;
4788
4789 /* set id to be the global_rank */
4790 v->grank = -1;
4791
4792 if(ibv_post_recv(conn.qp[0], &(v->desc.u.rr), &bad_wr)) {
4793 fprintf(stderr," Error posting UD recv\n");
4794 fflush(stderr);
4795 }
4796 }
4797
4798
post_buffers()4799 int post_buffers()
4800 {
4801 init_cbuf_lock();
4802 allocate_cbufs(8192);
4803 int i;
4804 for (i = 0; i < total_ud_recv_buffers; i++) {
4805 post_recv();
4806 }
4807 return 0;
4808 }
4809
test_connectivity(void)4810 void test_connectivity(void)
4811 {
4812 int i;
4813 struct ibv_send_wr *bad_wr;
4814
4815 cbuf *v = NULL;
4816 for (i = 0; i < armci_nproc;i++) {
4817 if (armci_me == i)
4818 continue;
4819
4820 int j;
4821
4822 for (j = 0 ; j < 2; j++) {
4823 v = get_cbuf();
4824 assert(v != NULL);
4825 v->desc.u.sr.wr.ud.remote_qpn = rbuf.qp_num[i];
4826 v->desc.u.sr.wr.ud.remote_qkey = 0;
4827 v->desc.u.sr.wr.ud.ah = conn.ud_ah[i];
4828
4829 armci_ud_rank *h = (armci_ud_rank *)CBUF_BUFFER_START(v);
4830 h->src_rank = armci_me;
4831 cbuf_init_send(v, sizeof(armci_ud_rank));
4832 if(ibv_post_send(conn.qp[0], &(v->desc.u.sr), &bad_wr)) {
4833 fprintf(stderr, "Error posting send\n");
4834 }
4835 }
4836 }
4837 }
4838
#if 0
/* Disabled placeholder for recycling a failed queue pair. */
int recycle_dead_qp()
{
}
#endif

/*\
 * Hook for reacting to a failed completion.  Currently does nothing: the
 * only action, a call to recycle_dead_qp, is compiled out.  pdscr is not
 * used.
\*/
void handle_network_fault(struct ibv_wc *pdscr)
{
#if 0
    recycle_dead_qp();
#endif
}
4851
4852
setup_ud_channel()4853 void setup_ud_channel()
4854 {
4855 if(init_params()) {
4856 }
4857
4858 if(open_hca()) {
4859 }
4860
4861 if(create_cq()) {
4862 }
4863
4864 if(get_lid()) {
4865 }
4866
4867 if(create_qp()) {
4868 }
4869
4870 if (exch_addr()) {
4871 }
4872
4873 if(connect_qp()) {
4874 }
4875
4876 MPI_Barrier(ARMCI_COMM_WORLD);
4877
4878 if(post_buffers()) {
4879 }
4880
4881 MPI_Barrier(ARMCI_COMM_WORLD);
4882 }
4883
4884
/*\
 * Completes a client-initiated connection once the server has answered.
 *
 * The QP stored in SRV_con[armci_clus_id(h->src_rank)] is taken through
 * INIT -> RTR -> RTS, targeting the queue-pair number carried in the
 * header (also recorded in CLN_rqpnums[h->src_rank]).  Each transition
 * must succeed (assert).  The cbuf is then released and the connection is
 * marked QP_ACTIVE.
 *
 *   h - header of the received reply
 *   v - cbuf holding the reply; released here
\*/
void process_recv_completion_from_server(armci_ud_rank *h, cbuf *v)
{
    armci_connect_t *con = SRV_con + armci_clus_id(h->src_rank);
    struct ibv_qp_attr attr;
    enum ibv_qp_attr_mask mask;
    int rc;

    /* RESET -> INIT */
    memset(&attr, 0, sizeof attr);
    attr.qp_state        = IBV_QPS_INIT;
    attr.pkey_index      = DEFAULT_PKEY_IX;
    attr.port_num        = SRV_nic->active_port;
    attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE|
        IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;
    mask = IBV_QP_STATE
        | IBV_QP_PKEY_INDEX
        | IBV_QP_PORT
        | IBV_QP_ACCESS_FLAGS;

    rc = ibv_modify_qp(con->qp, &attr, mask);
    assert(0 == rc);

    /* INIT -> RTR */
    memset(&attr, 0, sizeof attr);
    attr.qp_state           = IBV_QPS_RTR;
    attr.path_mtu           = IBV_MTU_1024; /*MTU*/
    attr.max_dest_rd_atomic = 4;
    attr.min_rnr_timer      = RNR_TIMER;
    attr.rq_psn             = 0;
    mask = IBV_QP_STATE
        | IBV_QP_MAX_DEST_RD_ATOMIC
        | IBV_QP_PATH_MTU
        | IBV_QP_RQ_PSN
        | IBV_QP_MIN_RNR_TIMER;

    /* Fill in the information from the header */
    CLN_rqpnums[h->src_rank] = h->qpnum;

    mask |= IBV_QP_DEST_QPN | IBV_QP_AV;
    attr.dest_qp_num      = CLN_rqpnums[h->src_rank];
    attr.ah_attr.dlid     = SRV_nic->lid_arr[h->src_rank];
    attr.ah_attr.port_num = SRV_nic->active_port;

    rc = ibv_modify_qp(con->qp, &attr, mask);
    assert(0 == rc);

    /* RTR -> RTS */
    memset(&attr, 0, sizeof attr);
    attr.qp_state      = IBV_QPS_RTS;
    attr.sq_psn        = 0;
    attr.timeout       = 18;
    attr.retry_cnt     = 7;
    attr.rnr_retry     = 7;
    attr.max_rd_atomic = 4;
    mask = IBV_QP_STATE
        | IBV_QP_SQ_PSN
        | IBV_QP_TIMEOUT
        | IBV_QP_RETRY_CNT
        | IBV_QP_RNR_RETRY
        | IBV_QP_MAX_QP_RD_ATOMIC;

    rc = ibv_modify_qp(con->qp, &attr, mask);
    assert(0 == rc);

    release_cbuf(v);

    con->state = QP_ACTIVE;
}
4954
4955 struct con_q_t {
4956 struct armci_connect_t *head;
4957 void *next;
4958 };
4959
4960 struct con_q_t *con_q;
4961
4962 #define MAX_CLIENT_TO_SERVER_CONN 2
4963
4964
dequeue_conn()4965 armci_connect_t * dequeue_conn()
4966 {
4967 return NULL;
4968 }
4969
get_max_client_to_server_conn()4970 int get_max_client_to_server_conn()
4971 {
4972 static int env_read_flag = 0;
4973
4974 static int max_client_to_server_conn = 0;
4975 if (!env_read_flag) {
4976 char *value;
4977 if ((value = getenv("ARMCI_MAX_CLIENT_TO_SERVER_FACTOR")) != NULL){
4978 max_client_to_server_conn = armci_nclus/atoi(value);
4979 /* We need a minimum of 4 connections */
4980 if (max_client_to_server_conn <= 1)
4981 max_client_to_server_conn = 2;
4982
4983 if (armci_me == 0) {
4984 fprintf(stdout, "max_client_to_server_conn[%d]\n",
4985 max_client_to_server_conn);
4986 }
4987 } else {
4988 max_client_to_server_conn = armci_nclus / 2;
4989 if (max_client_to_server_conn <= 1)
4990 max_client_to_server_conn = 2;
4991 }
4992 }
4993
4994 return max_client_to_server_conn;
4995
4996 }
4997
/*\
 * Picks the connection to break.  No selection policy is implemented: the
 * function always returns 0.  (The caller in this file treats the value
 * as a process rank and -1 as "no candidate".)
\*/
int get_the_victim_connection()
{
    const int victim = 0;

    return victim;
}
5002
break_a_connection_if_needed()5003 void break_a_connection_if_needed()
5004 {
5005 assert(!SERVER_CONTEXT);
5006
5007 if (!armci_use_lazy_break)
5008 return;
5009
5010 int max_client_to_server_conn = get_max_client_to_server_conn();
5011
5012 if ((total_active_conn_to_server >=
5013 max_client_to_server_conn) && (armci_me != armci_master)) {
5014
5015 int proc, clus_id;
5016 do {
5017 proc = get_the_victim_connection();
5018 /* Not enough on the queue */
5019 if (proc == -1)
5020 return;
5021 clus_id = armci_clus_id(proc);
5022 } while ((armci_clus_me == clus_id) ||
5023 ((SRV_con + clus_id)->state != QP_ACTIVE));
5024
5025 ARMCI_WaitAll();
5026
5027 ARMCI_Fence(proc);
5028
5029 struct ibv_qp_attr qp_attr;
5030 enum ibv_qp_attr_mask qp_attr_mask;
5031
5032 armci_connect_t *con = SRV_con + clus_id;
5033
5034 dassert(1, con->state == QP_ACTIVE);
5035 qp_attr_mask = IBV_QP_STATE;
5036
5037 memset(&qp_attr, 0, sizeof qp_attr);
5038 qp_attr.qp_state = IBV_QPS_SQD;
5039
5040 if (ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask)) {
5041 fprintf(stdout," Error modifying QP\n");
5042 fflush(stdout);
5043 }
5044 cbuf *v = get_cbuf();
5045 assert(v != NULL);
5046 v->desc.u.sr.wr.ud.remote_qpn =
5047 rbuf.qp_num[armci_clus_info[clus_id].master];
5048 v->desc.u.sr.wr.ud.remote_qkey = 0;
5049 v->desc.u.sr.wr.ud.ah = conn.ud_ah[armci_clus_info[clus_id].master];
5050
5051 armci_ud_rank *h = (armci_ud_rank *)CBUF_BUFFER_START(v);
5052 h->src_rank = armci_me;
5053 h->qpnum = con->sqpnum;
5054
5055 h->msg_type = QP_CON_BREAK_FROM_CLIENT;
5056
5057 struct ibv_send_wr *bad_wr;
5058
5059 cbuf_init_send(v, sizeof(armci_ud_rank));
5060
5061 if(ibv_post_send(conn.qp[0], &(v->desc.u.sr), &bad_wr)) {
5062 printf("Error posting send\n");
5063 }
5064
5065
5066 while (con->state != QP_INACTIVE) {
5067 if (armci_me != armci_master)
5068 progress_engine();
5069 }
5070 if (ibv_destroy_qp(con->qp)) {
5071 printf("Error destroying QP\n");
5072 }
5073 }
5074 }
5075
5076
check_state_of_ib_connection(int proc,int force)5077 void check_state_of_ib_connection(int proc, int force)
5078 {
5079 int clus_id;
5080 armci_connect_t *con;
5081
5082 /* return if ARMCI does not use on demand connection management */
5083 if (!armci_use_odcm)
5084 return;
5085
5086 static int flag = 1;
5087
5088 if (flag == 1) {
5089 total_active_conn_to_server = 0;
5090 total_breaks = 0;
5091 flag = 0;
5092 }
5093 /* Check clus id */
5094 clus_id = armci_clus_id(proc);
5095
5096 con = SRV_con + clus_id;
5097
5098 assert(!SERVER_CONTEXT);
5099
5100 if (con->state != QP_ACTIVE) {
5101
5102 if (!force)
5103 break_a_connection_if_needed();
5104
5105 total_active_conn_to_server++;
5106
5107 armci_create_qp(SRV_nic, &con->qp);
5108 con->sqpnum = con->qp->qp_num;
5109 con->lid = SRV_nic->lid_arr[clus_id];
5110
5111 cbuf *v = get_cbuf();
5112 assert(v != NULL);
5113 v->desc.u.sr.wr.ud.remote_qpn =
5114 rbuf.qp_num[armci_clus_info[clus_id].master];
5115 v->desc.u.sr.wr.ud.remote_qkey = 0;
5116 v->desc.u.sr.wr.ud.ah = conn.ud_ah[armci_clus_info[clus_id].master];
5117
5118 armci_ud_rank *h = (armci_ud_rank *)CBUF_BUFFER_START(v);
5119 h->src_rank = armci_me;
5120 h->qpnum = con->sqpnum;
5121
5122 h->msg_type = QP_CON_REQ;
5123
5124 struct ibv_send_wr *bad_wr;
5125
5126 cbuf_init_send(v, sizeof(armci_ud_rank));
5127
5128 if(ibv_post_send(conn.qp[0], &(v->desc.u.sr), &bad_wr)) {
5129 printf("Error posting send\n");
5130 }
5131
5132 if (armci_me != armci_master) {
5133 while (con->state != QP_ACTIVE) {
5134 progress_engine();
5135 }
5136 } else {
5137 while (con->state != QP_ACTIVE) {
5138 usleep(1);
5139 }
5140 }
5141 }
5142
5143 }
5144
5145
5146