1 #if HAVE_CONFIG_H
2 #   include "config.h"
3 #endif
4 
5 /* $Id: openib.c,v 1.4.2.9 2007-10-18 06:08:03 d3h325 Exp $
6  *
7  * File organized as follows
8  */
9 #ifndef _GNU_SOURCE
10 #define _GNU_SOURCE
11 #endif
12 
13 #if HAVE_STDIO_H
14 #   include <stdio.h>
15 #endif
16 #if HAVE_STRINGS_H
17 #   include <strings.h>
18 #endif
19 #if HAVE_ASSERT_H
20 #   include <assert.h>
21 #endif
22 #if HAVE_UNISTD_H
23 #   include <unistd.h>
24 #endif
25 #if HAVE_STRING_H
26 #   include <string.h>
27 #endif
28 #include <mpi.h>
29 
30 #include "cbuf.h"
31 #include "armcip.h"
32 #include "copy.h"
33 #include "request.h"
34 #include "armci-vapi.h"
35 #include "iterator.h"
/* Per-area trace switches; armci_recv_complete/armci_send_complete select
 * DEBUG_SERVER or DEBUG_CLN depending on SERVER_CONTEXT. */
#define DEBUG_INIT     0
#define DEBUG_FINALIZE 0
#define DEBUG_SERVER   0
#define DEBUG_CLN      0
#define TIME_INIT      0

#define VAPIDEV_NAME "InfiniHost0"
#define INVAL_HNDL   0xFFFFFFFF
#define RNR_TIMER    12

/* Debug macros used to tune what is being tested -- mostly openib calls.
 * They are OR-ed together and passed as the enable argument of dassertp. */
#define DBG_INIT 1
#define DBG_POLL 1
#define DBG_ALL  1

/* NOTE(review): presumably the values stored in armci_connect_t.state --
 * confirm against the connection setup code. */
#define QP_INACTIVE 5
#define QP_REQ_SENT 2
#define QP_ACK_RCVD 3
#define QP_ACTIVE   4

/* Queue-pair limits: assigned in armci_init_nic and consumed by
 * armci_create_qp when filling the QP capabilities. */
u_int32_t armci_max_num_sg_ent;   /* scatter/gather entries per work request */
u_int32_t armci_max_qp_ous_swr;   /* outstanding send work requests per QP */
u_int32_t armci_max_qp_ous_rwr;   /* outstanding recv work requests per QP */
58 
/* One connection record: the queue pair plus the addressing data kept
 * alongside it. */
typedef struct {
   struct ibv_qp *qp;   /* queue pair for this connection */
   uint32_t sqpnum;     /* we need to exchange qp nums, arr for that */
   uint16_t lid;
   uint16_t state;
   void *next;
} armci_connect_t;

armci_connect_t *CLN_con;
armci_connect_t *SRV_con;

/* relevant rqp num arrs, to connect to svr and client */
static uint32_t *SRV_rqpnums;
static uint32_t *CLN_rqpnums;

/* temporary buf used during connection setup */
static uint32_t *CLN_rqpnumtmpbuf = NULL;
70 /*\
71  * datastrucure for infinihost NIC
72 \*/
73 typedef struct {
74   uint16_t *lid_arr;                /*we need to exchange lids, arr for that*/
75   struct ibv_context *handle;       /*device context/handle*/
76   int maxtransfersize;
77   struct ibv_device_attr attr;      /*device properties*/
78   struct ibv_port_attr hca_port;    /*mostly for getting lid*/
79   uint8_t active_port;
80   struct ibv_pd *ptag;              /*protection tag*/
81   const char *vendor;
82   struct ibv_cq *scq;               /*send completion queue*/
83   struct ibv_cq *rcq;               /*recv completion queue*/
84   struct ibv_comp_channel *sch;     /*send completion channel*/
85   struct ibv_comp_channel *rch;     /*recv completion channel*/
86   void *scq_cntx;                   /*send context for completion queue*/
87   void *rcq_cntx;                   /*recv context for completion queue*/
88   int scv;                          /*send completion vector*/
89   int rcv;                          /*recv completion vector*/
90 } vapi_nic_t;
91 
92 typedef struct {
93   armci_vapi_memhndl_t *prem_handle; /*address server to store memory handle*/
94   armci_vapi_memhndl_t handle;
95 }ack_t;
96 
97 armci_vapi_memhndl_t *CLN_handle;
98 armci_vapi_memhndl_t serv_memhandle, client_memhandle;
99 armci_vapi_memhndl_t *handle_array;
100 armci_vapi_memhndl_t *pinned_handle;
101 
102 static vapi_nic_t nic_arr[3];
103 static vapi_nic_t *SRV_nic= nic_arr;
104 static vapi_nic_t *CLN_nic= nic_arr+1;
105 static int armci_server_terminating;
106 
107 #define NONE -1
108 static int armci_ack_proc=NONE;
109 
110 static int armci_vapi_server_ready;
111 static int armci_vapi_server_stage1=0;
112 static int armci_vapi_client_stage1=0;
113 static int armci_vapi_server_stage2=0;
114 static int armci_vapi_client_ready;
115 int _s=-1,_c=-1;
116 static int armci_vapi_max_inline_size=-1;
117 #define CLIENT_STAMP 101
118 #define SERV_STAMP 99
119 #define MAX_PROC_INLINE_SIZE 2048
120 
121 static char * client_tail;
122 static char * serv_tail;
123 static ack_t *SRV_ack;
124 
125 #if defined(PEND_BUFS)
126 typedef immbuf_t vapibuf_t;
127 typedef pendbuf_t vapibuf_pend_t;
128 #else
129 typedef struct {
130     struct ibv_recv_wr  dscr;
131     struct ibv_sge      sg_entry;
132     char buf[CBUF_DLEN];
133 } vapibuf_t;
134 #endif
135 
136 typedef struct {
137     struct ibv_send_wr  snd_dscr;
138     struct ibv_sge      ssg_entry;
139     struct ibv_recv_wr  rcv_dscr;
140     struct ibv_sge      rsg_entry;
141   char buf[VBUF_DLEN];
142 } vapibuf_ext_t;
143 
144 typedef struct {
145     struct ibv_send_wr  rmw_dscr;
146     struct ibv_sge      rmw_entry;
147 } vapirmw_t;
148 
149 unsigned int armci_use_odcm = 0;
150 unsigned int armci_use_lazy_break = 0;
151 unsigned int armci_use_apm = 0;
152 unsigned int armci_use_apm_test = 0;
153 unsigned int armci_use_srq = 0;
154 unsigned int armci_use_snft = 0;
155 unsigned int armci_use_affinity = 0;
156 
157 unsigned int armci_srq_size = 4096;
158 
159 pthread_t armci_async_thread[4];
160 
161 void async_thread_hca_events(void *ctx);
162 void* async_thread_ud_events(void *ctx);
163 
164 void init_apm_lock(void);
165 #if 0
166 static struct ibv_srq *create_srq(vapi_nic_t *nic);
167 #endif
168 
169 void setup_ud_channel(void);
170 
171 void process_recv_completion_from_server(armci_ud_rank *h, cbuf *v);
172 void process_recv_completion_from_client(armci_ud_rank *h, cbuf *v);
173 
174 struct ibv_srq      *CLN_srq_hndl;
175 struct ibv_srq      *SRV_srq_hndl;
176 void post_recv(void);
177 struct Remote_Buf
178 {
179     char            **buf;
180     uint32_t        *qp_num;
181     uint16_t        *lid;
182     uint32_t        *rkey;
183 };
184 
185 struct HCA
186 {
187     struct ibv_device   *ib_dev;
188     struct ibv_context  *context;
189     struct ibv_pd       *pd;
190     struct ibv_cq       *cq;
191     struct ibv_srq      *srq_hndl;
192     struct ibv_comp_channel *comp_channel;
193 };
194 
195 struct RC_Conn
196 {
197     struct ibv_qp       **qp;
198     uint16_t            *lid;
199     int                 *status;
200     uint32_t            *qp_num;
201     struct ibv_ah       **ud_ah;
202 };
203 
204 struct Remote_Buf rbuf;
205 
206 struct RC_Conn      conn;
207 
208 struct HCA          hca;
209 
210 void handle_network_fault(struct ibv_wc *pdscr);
211 
212 int process_recv_completion_from_client_flag;
213 
214 int total_active_conn_to_server, total_active_conn_to_client, total_breaks;
215 
216 void check_state_of_ib_connection(int a, int b);
217 
218 static vapibuf_t **serv_buf_arr;
219 #if !defined(PEND_BUFS)
220 /*These are typically used as spare buffers for communication. Since
221   we do not wait on completion anymore, we need to ensure things work
222   fine when these have in-flight messages. Disabled for now.*/
223 static vapibuf_t *spare_serv_buf, *spare_serv_bufptr;
224 static vapibuf_ext_t *serv_buf;
225 #endif
226 
227 static vapirmw_t rmw[64];
228 
229 static int *flag_arr; /* flag indicates its receiving scatter data */
230 #define SERV 2
231 #define CLN 1
232 
233 #define MAX_DESCR 2
234 typedef struct {
235     int avail;
236     struct ibv_qp *qp;
237     struct ibv_recv_wr *descr;
238 } descr_pool_t;
239 
240 static int* _gtmparr;
241 static void* test_ptr;
242 static int test_stride_arr[1];
243 static int test_count[2];
244 static int test_stride_levels;
245 char *MessageRcvBuffer;
246 
247 extern void armci_util_wait_int(volatile int *,int,int);
248 void armci_send_data_to_client(int proc, void *buf,int bytes,void *dbuf);
249 void armci_server_register_region(void *,long,ARMCI_MEMHDL_T *);
250 static descr_pool_t serv_descr_pool = {MAX_DESCR,NULL,NULL};
251 static descr_pool_t client_descr_pool = {MAX_DESCR,NULL,NULL};
252 
253 /**Buffer (long[1] used to set msginfo->tag.ack_ptr in
254    client-side. See usage in SERVER_SEND_ACK macro*/
255 static long *ack_buf;
256 
/* Address of the payload that follows a request_header_t located at buf.
 * The argument is parenthesized so that compound expressions such as
 * GET_DATA_PTR(cond ? a : b) are cast and offset as a whole. */
#define GET_DATA_PTR(buf) ((char *)(buf) + sizeof(request_header_t))

/* The BUF_TO_* macros step back from a data buffer pointer to the
 * armci_vapi_field_t placed immediately before it and yield the address of
 * one of its members. */
#define BUF_TO_SDESCR(buf) ((struct ibv_send_wr *)(&((armci_vapi_field_t *)((char *)(buf) - sizeof(armci_vapi_field_t)))->sdscr))

#define BUF_TO_RDESCR(buf) ((struct ibv_recv_wr *)(&((armci_vapi_field_t *)((char *)(buf) - sizeof(armci_vapi_field_t)))->rdscr))

#define BUF_TO_SSGLST(buf) ((struct ibv_sge *)(&((armci_vapi_field_t *)((char *)(buf) - sizeof(armci_vapi_field_t)))->ssg_entry))

#define BUF_TO_RSGLST(buf) ((struct ibv_sge *)(&((armci_vapi_field_t *)((char *)(buf) - sizeof(armci_vapi_field_t)))->rsg_entry))

/* Recover the enclosing vapibuf_ext_t from a pointer to its buf member by
 * stepping back over the two descriptors and two scatter/gather entries.
 * Both the argument and the whole expansion are parenthesized so the result
 * can be used safely inside larger expressions. */
#define BUF_TO_ECBUF(buf) ((vapibuf_ext_t *)((char *)(buf) - (sizeof(struct ibv_send_wr)+sizeof(struct ibv_recv_wr)+2*sizeof(struct ibv_sge))))

/* Send the stamped ack_buf (one long) to client p.  The macro reads
 * msginfo from the scope where it is expanded; it is not a parameter. */
#define SERVER_SEND_ACK(p) do {            \
    assert(*ack_buf == ARMCI_STAMP);       \
    assert((p)>=0);                        \
    armci_send_data_to_client((p),ack_buf, \
      sizeof(long),msginfo->tag.ack_ptr);  \
  } while(0)

/* Thin argument-reordering wrappers.  They deliberately keep the plain
 * brace form of the original so that existing call sites written with or
 * without a trailing semicolon continue to compile. */
#define SERVER_SEND_DATA(_SS_proc,_SS_src,_SS_dst,_SS_size) {armci_send_data_to_client(_SS_proc,_SS_src,_SS_size,_SS_dst);}
#define SERVER_GET_DATA(_SG_proc,_SG_src,_SG_dst,_SG_size) {armci_get_data_from_client(_SG_proc,_SG_src,_SG_size,_SG_dst);}
279 
280 
/*
 * Descriptors carry unique IDs so that the wait-on-descriptor routines can
 * complete a descriptor and know where it came from.  Each family owns a
 * half-open range [DSCRID_x, DSCRID_x_END).
 *
 * Every *_END macro is fully parenthesized so that it behaves as a single
 * value inside any expression (e.g. 2*DSCRID_SCATGAT_END).
 */

#define NUMOFBUFFERS (MAX_BUFS+MAX_SMALL_BUFS)
#define DSCRID_FROMBUFS 1
#define DSCRID_FROMBUFS_END (DSCRID_FROMBUFS+NUMOFBUFFERS)

#define DSCRID_NBDSCR 10000
#define DSCRID_NBDSCR_END (DSCRID_NBDSCR+MAX_PENDING)

#define DSCRID_SCATGAT 20000
#define DSCRID_SCATGAT_END (DSCRID_SCATGAT+MAX_PENDING)

#define DSCRID_RMW 30000
#define DSCRID_RMW_END (DSCRID_RMW+9999)

#if defined(PEND_BUFS)
#define DSCRID_PENDBUF (40000)
#define DSCRID_PENDBUF_END (DSCRID_PENDBUF + 2*PENDING_BUF_NUM+1)

#define DSCRID_IMMBUF_RECV     (200000)
#define DSCRID_IMMBUF_RECV_END (600000)

#define DSCRID_IMMBUF_RESP     (600000)
#define DSCRID_IMMBUF_RESP_END (1000000)
#endif
308 
309 extern double MPI_Wtime();
310 static double inittime0=0,inittime1=0,inittime2=0,inittime4=0;
311 
312 static int mark_buf_send_complete[NUMOFBUFFERS+1];
313 static sr_descr_t armci_vapi_client_nbsdscr_array[MAX_PENDING];
314 static sr_descr_t armci_vapi_client_nbrdscr_array[MAX_PENDING];
315 static sr_descr_t armci_vapi_serv_nbsdscr_array[MAX_PENDING];
316 static sr_descr_t armci_vapi_serv_nbrdscr_array[MAX_PENDING];
317 
318 void armci_server_transport_cleanup();
319 /********************FUNCTIONS TO CHECK OPENIB RETURN STATUS*******************/
armci_check_status(int debug,int rc,char * msg)320 void armci_check_status(int debug, int rc,char *msg)
321 {
322   dassertp(debug,rc==0,("%d: %s, rc=%d\n",armci_me,msg,rc));
323 /*     if(debug)printf("%d:%s, rc = %d\n", armci_me,msg, rc); */
324 /*     if(rc!=0)armci_die(msg,rc); */
325 }
326 
/*
 * Placeholder with the same shape as armci_check_status: it accepts a debug
 * flag, a return value and a label and intentionally does nothing.
 */
void armci_vapi_check_return(int debug, int ret, const char *ss)
{
    /* No-op; the casts only mark the parameters as deliberately unused. */
    (void)debug;
    (void)ret;
    (void)ss;
}
330 
armci_vapi_print_dscr_info(struct ibv_send_wr * sr,struct ibv_recv_wr * rr)331 void armci_vapi_print_dscr_info(struct ibv_send_wr *sr, struct ibv_recv_wr *rr)
332 {
333 int i;
334     if(rr){
335        printf("\n%d:print_dscr rr id=%ld sg_lst_len=%d",
336               armci_me, rr->wr_id, rr->num_sge);
337        for (i = 0; i < rr->num_sge; i++) {
338          printf("\n\t:sg_entry=%d addr=%lu len=%d",
339                 i, rr->sg_list[i].addr, rr->sg_list[i].length);
340        }
341        fflush(stdout);
342     }
343     if(sr){
344        printf("\n%d:print_dscr sr id=%lu opcode=%d sg_lst_len=%d",
345               armci_me, sr->wr_id, sr->opcode, sr->num_sge);
346        for (i = 0; i < sr->num_sge; i++) {
347          printf("\n\t:sg_entry=%d addr=%lu len=%d",
348                 i, sr->sg_list[i].addr, sr->sg_list[i].length);
349        }
350        fflush(stdout);
351     }
352 }
353 
354 /*****************END FUNCTIONS TO CHECK VAPI RETURN STATUS********************/
355 
armci_recv_complete(struct ibv_recv_wr * rcv_dscr,char * from,int numofrecvs)356 void armci_recv_complete(struct ibv_recv_wr *rcv_dscr,
357         char *from, int numofrecvs)
358 {
359 int rc=0;
360 struct ibv_wc pdscr1;
361 struct ibv_wc *pdscr = &pdscr1;
362 sr_descr_t *rdscr_arr;
363 vapi_nic_t *nic;
364 int debug,i;
365 
366     if(SERVER_CONTEXT){
367        rdscr_arr = armci_vapi_serv_nbrdscr_array;
368        nic=CLN_nic;
369        debug = DEBUG_SERVER;
370     }
371     else{
372        rdscr_arr = armci_vapi_client_nbrdscr_array;
373        nic=SRV_nic;
374        debug = DEBUG_CLN;
375     }
376     if(debug){
377        printf("\n%d%s:recv_complete called from %s id=%ld\n",armci_me,
378                ((SERVER_CONTEXT)?"(s)":" "),from,rcv_dscr->wr_id);fflush(stdout);
379     }
380     for(i=0;i<numofrecvs;i++){
381     do{
382       while(rc == 0) {
383          rc = ibv_poll_cq(nic->rcq, 1, pdscr);
384       }
385       dassertp(DBG_POLL|DBG_ALL,rc>=0,
386 	       ("%d: rc=%d id=%d status=%d (%d/%d)\n",
387 		armci_me,rc,pdscr->wr_id,pdscr->status,i,numofrecvs));
388       dassert1(1,pdscr->status==IBV_WC_SUCCESS,pdscr->status);
389        if(debug){
390          if(pdscr->wr_id >= DSCRID_SCATGAT && pdscr->wr_id < DSCRID_SCATGAT_END)
391            printf("\n%d:recv from %s complete id=%lu num=%d",armci_me,
392              from,pdscr->wr_id,rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofrecvs);
393        }
394        if(pdscr->wr_id >= DSCRID_SCATGAT && pdscr->wr_id < DSCRID_SCATGAT_END){
395          rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofrecvs--;
396          if(rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofrecvs==0)
397            rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].tag=0;
398        }
399        else if(pdscr->wr_id == (DSCRID_SCATGAT + MAX_PENDING)){
400                /*this was from a blocking call, do nothing*/
401          continue;
402        }
403        else {
404          armci_die("\nclient should be posting only one kind of recv",armci_me);
405        }
406        rc = 0;
407    }while(pdscr->wr_id!=rcv_dscr->wr_id);
408    rc = 0;
409    }
410 
411 }
412 
armci_vapi_set_mark_buf_send_complete(int id)413 void armci_vapi_set_mark_buf_send_complete(int id)
414 {
415     mark_buf_send_complete[id]=0;
416 }
417 
armci_send_complete(struct ibv_send_wr * snd_dscr,char * from,int numoftimes)418 void armci_send_complete(struct ibv_send_wr *snd_dscr, char *from,int numoftimes)
419 {
420     int rc=0;
421     struct ibv_wc pdscr1;
422     struct ibv_wc *pdscr = &pdscr1;
423     sr_descr_t *sdscr_arr;
424     vapi_nic_t *nic;
425     int debug,i;
426 
427     pdscr1.status = IBV_WC_SUCCESS;
428     /*  bzero(&pdscr1, sizeof(pdscr1)); */
429     /* printf("%d: Waiting for send with wr_id=%d to complete\n", armci_me, snd_dscr->wr_id); */
430     /* fflush(stdout); */
431 
432     if(SERVER_CONTEXT){
433         sdscr_arr = armci_vapi_serv_nbsdscr_array;
434         nic=CLN_nic;
435         debug = DEBUG_SERVER;
436     }
437     else{
438         sdscr_arr = armci_vapi_client_nbsdscr_array;
439         nic=SRV_nic;
440         debug = DEBUG_CLN;
441     }
442 
443     if(debug) {
444         printf("\n%d%s:send_complete called from %s id=%ld nt=%d\n",armci_me,
445                 ((SERVER_CONTEXT)?"(s)":" "),from,snd_dscr->wr_id,numoftimes);
446         fflush(stdout);
447     }
448     for(i=0;i<numoftimes;i++){
449         do{
450             while(rc == 0){
451 #if defined(PEND_BUFS)
452                 if(SERVER_CONTEXT)
453                     rc = ibv_poll_cq(nic->rcq,1,pdscr);
454                 else
455 #endif
456                     rc = ibv_poll_cq(nic->scq,1, pdscr);
457             }
458             dassertp(DBG_POLL|DBG_ALL,rc>=0,
459                     ("%d:rc=%d status=%d id=%d (%d/%d)",armci_me,
460                      rc,pdscr->status,(int)pdscr->wr_id,i,numoftimes));
461             dassert1(1,pdscr->status==IBV_WC_SUCCESS,pdscr->status);
462             /*       printf("%d: Obtained completion of wr_id=%d\n", armci_me, pdscr->wr_id); */
463             /*       fflush(stdout); */
464             if(SERVER_CONTEXT){
465                 if(debug)printf("%d:completed id %lu i=%d\n",armci_me,pdscr->wr_id,i);
466                 if(pdscr->wr_id >=DSCRID_SCATGAT && pdscr->wr_id < DSCRID_SCATGAT_END){
467                     sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends--;
468                     if(sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends==0)
469                         sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].tag=0;
470                 }
471                 else if(pdscr->wr_id >=armci_nproc && pdscr->wr_id < 2*armci_nproc){
472                     /*its coming from send_data_to_client just return*/
473                 }
474 #if defined(PEND_BUFS)
475                 else if(pdscr->wr_id >= DSCRID_IMMBUF_RESP && pdscr->wr_id>DSCRID_IMMBUF_RESP_END) {
476                     /*send from server to client completed*/
477                 }
478 #endif
479                 else armci_die("server send complete got weird id",pdscr->wr_id);
480             }
481             else{
482                 if(debug)printf("%d:completed id %lu i=%d\n",armci_me,pdscr->wr_id,i);
483                 if(pdscr->wr_id >=DSCRID_FROMBUFS && pdscr->wr_id < DSCRID_FROMBUFS_END) {
484                     /*	   printf("%d: marking send buffer %d as complete\n", armci_me, pdscr->wr_id);*/
485                     mark_buf_send_complete[pdscr->wr_id]=1;
486                 }
487                 else if(pdscr->wr_id >=DSCRID_NBDSCR && pdscr->wr_id < DSCRID_NBDSCR_END){
488                     sdscr_arr[pdscr->wr_id-DSCRID_NBDSCR].numofsends--;
489                     if(sdscr_arr[pdscr->wr_id-DSCRID_NBDSCR].numofsends==0)
490                         sdscr_arr[pdscr->wr_id-DSCRID_NBDSCR].tag=0;
491                 }
492                 else if(pdscr->wr_id >=DSCRID_SCATGAT && pdscr->wr_id < DSCRID_SCATGAT_END){
493                     sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends--;
494                     if(sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends==0)
495                         sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].tag=0;
496                 }
497                 else if(pdscr->wr_id == (DSCRID_SCATGAT + MAX_PENDING)){
498                     /* 	   printf("%d: completed a blocking scatgat descriptor\n", armci_me); */
499                     /*this was from a blocking call, do nothing*/
500                     continue;
501                 }
502                 else armci_die("client send complete got weird id",pdscr->wr_id);
503             }
504             rc = 0;
505         }while(pdscr->wr_id!=snd_dscr->wr_id);
506         rc = 0;
507     }
508 }
509 
510 
/*
 * Wait for every outstanding receive of one non-blocking descriptor.
 *
 * tag  - tag to look up when dscr is NULL
 * from - accepted for symmetry with the send variant; not used here
 * dscr - descriptor to complete, or NULL to search the current context's
 *        receive ring for the first slot whose tag equals `tag`
 *
 * Returns silently when dscr is NULL and no slot carries the tag.
 */
void armci_dscrlist_recv_complete(int tag, char* from,sr_descr_t *dscr)
{
    sr_descr_t *target = dscr;

    if (target == NULL) {
        sr_descr_t *ring;
        int slot = 0;

        if (SERVER_CONTEXT)
            ring = armci_vapi_serv_nbrdscr_array;
        else
            ring = armci_vapi_client_nbrdscr_array;

        while (slot < MAX_PENDING && ring[slot].tag != tag)
            slot++;

        if (slot == MAX_PENDING)
            return;             /* nothing pending under this tag */
        target = &ring[slot];
    }

    armci_recv_complete(&(target->rdescr), "(s)list_send_complete",
                        target->numofrecvs);
}
536 
537 
/*
 * Wait for every outstanding send of one non-blocking descriptor.
 *
 * tag  - tag to look up when dscr is NULL
 * from - caller label; not used here
 * dscr - descriptor to complete, or NULL to search the current context's
 *        send ring for the first slot whose tag equals `tag`
 *
 * Returns silently when dscr is NULL and no slot carries the tag.
 */
void armci_dscrlist_send_complete(int tag,char *from, sr_descr_t *dscr)
{
    sr_descr_t *target = dscr;

    if (target == NULL) {
        sr_descr_t *ring;
        int slot = 0;

        if (SERVER_CONTEXT)
            ring = armci_vapi_serv_nbsdscr_array;
        else
            ring = armci_vapi_client_nbsdscr_array;

        while (slot < MAX_PENDING && ring[slot].tag != tag)
            slot++;

        if (slot == MAX_PENDING)
            return;             /* nothing pending under this tag */
        target = &ring[slot];
    }

    armci_send_complete(&(target->sdescr), "dscrlist_send_complete",
                        target->numofsends);
}
563 
/*
 * Complete a client-side non-blocking call.
 *
 * dscr - descriptor of the call
 * tag  - tag the caller expects; nothing happens if dscr->tag differs
 * op   - GET or PUT; other values take the lock and release it again
 *
 * A GET waits on receives when the descriptor is scatter/gather (issg) and
 * on sends otherwise; a PUT always waits on sends.  The wait is skipped
 * when the relevant counter is already zero.  The work runs under
 * armci_user_threads.net_lock.
 */
void armci_client_nbcall_complete(sr_descr_t *dscr, int tag, int op)
{
    if (dscr->tag == tag) {
        THREAD_LOCK(armci_user_threads.net_lock);

        if (op == GET) {
            if (dscr->issg) {
                if (dscr->numofrecvs > 0) {
                    armci_dscrlist_recv_complete(tag,
                            "armci_client_nbcall_complete recv", dscr);
                }
            }
            else if (dscr->numofsends > 0) {
                armci_dscrlist_send_complete(tag,
                        "armci_client_nbcall_complete send", dscr);
            }
        }
        if (op == PUT && dscr->numofsends > 0) {
            armci_dscrlist_send_complete(tag,
                    "armci_client_nbcall_complete send", dscr);
        }

        THREAD_UNLOCK(armci_user_threads.net_lock);
    }
}
591 
592 
593 static int cur_serv_pend_descr;
594 static int cur_client_pend_descr;
595 
armci_vapi_get_next_rdescr(int nbtag,int sg)596 sr_descr_t *armci_vapi_get_next_rdescr(int nbtag,int sg)
597 {
598 static int serverthreadavail=-1; /*client thread can't touch this*/
599 static int clientthreadavail=-1; /*server thread can't touch this*/
600 int avail,newavail;
601 sr_descr_t *retdscr,*rdscr_arr;
602 
603     if(SERVER_CONTEXT){
604        rdscr_arr = armci_vapi_serv_nbrdscr_array;
605        avail = serverthreadavail;
606        /*printf("\n%d:serv thread avail=%d",armci_me,serverthreadavail);*/
607     }
608     else{
609        rdscr_arr = armci_vapi_client_nbrdscr_array;
610        avail = clientthreadavail;
611     }
612     if(avail==-1){
613        int i;
614        for(i=0;i<MAX_PENDING;i++){
615          rdscr_arr[i].tag=0;
616          bzero(&rdscr_arr[i].rdescr,sizeof(struct ibv_recv_wr));
617          if(sg)
618            rdscr_arr[i].rdescr.wr_id = DSCRID_SCATGAT + i;
619          else
620            rdscr_arr[i].rdescr.wr_id = DSCRID_NBDSCR + i;
621        }
622        avail=0;
623     }
624 
625     if(rdscr_arr[avail].tag!=0){
626        armci_dscrlist_recv_complete(rdscr_arr[avail].tag,
627                          "armci_vapi_get_next_rdescr",&rdscr_arr[avail]);
628     }
629 
630     rdscr_arr[avail].tag=nbtag;
631     rdscr_arr[avail].issg=sg;
632     retdscr= (rdscr_arr+avail);
633 
634     memset(&retdscr->rdescr,0,sizeof(struct ibv_recv_wr));
635 
636     if(sg)
637        retdscr->rdescr.wr_id = DSCRID_SCATGAT + avail;
638     else{
639        retdscr->rdescr.wr_id = DSCRID_NBDSCR + avail;
640        retdscr->numofrecvs=1;
641     }
642 
643     newavail = (avail+1)%MAX_PENDING;
644 
645     if(SERVER_CONTEXT){
646       cur_serv_pend_descr = avail;
647       serverthreadavail=newavail;
648     }
649     else{
650       cur_client_pend_descr = avail;
651       clientthreadavail=newavail;
652     }
653 
654     return(retdscr);
655 
656 }
657 
armci_vapi_get_next_sdescr(int nbtag,int sg)658 sr_descr_t *armci_vapi_get_next_sdescr(int nbtag,int sg)
659 {
660     static int serverthreadavail=-1; /*client thread can't touch this*/
661     static int clientthreadavail=-1; /*server thread can't touch this*/
662     int avail,newavail;
663     sr_descr_t *retdscr,*sdscr_arr;
664 
665     if(SERVER_CONTEXT){
666         sdscr_arr = armci_vapi_serv_nbsdscr_array;
667         avail = serverthreadavail;
668     }
669     else{
670         sdscr_arr = armci_vapi_client_nbsdscr_array;
671         avail = clientthreadavail;
672     }
673 
674     if(avail==-1){ /*first call*/
675         int i;
676         for(i=0;i<MAX_PENDING;i++){
677             sdscr_arr[i].tag=0;
678             bzero(&sdscr_arr[i].sdescr,sizeof(struct ibv_send_wr));
679             if(sg)
680                 sdscr_arr[i].sdescr.wr_id = DSCRID_SCATGAT+i;
681             else
682                 sdscr_arr[i].sdescr.wr_id = DSCRID_NBDSCR + i;
683         }
684         avail=0;
685     }
686 
687     if(sdscr_arr[avail].tag!=0){
688         armci_dscrlist_send_complete(sdscr_arr[avail].tag,
689                 "armci_vapi_get_next_sdescr",&sdscr_arr[avail]);
690     }
691 
692     sdscr_arr[avail].tag=nbtag;
693     sdscr_arr[avail].issg=sg;
694     retdscr= (sdscr_arr+avail);
695 
696     memset(&retdscr->sdescr,0,sizeof(struct ibv_recv_wr));
697 
698     if(sg)
699         retdscr->sdescr.wr_id = DSCRID_SCATGAT + avail;
700     else{
701         retdscr->sdescr.wr_id = DSCRID_NBDSCR + avail;
702         retdscr->numofsends=1;
703     }
704 
705     newavail = (avail+1)%MAX_PENDING;
706 
707     if(SERVER_CONTEXT){
708         cur_serv_pend_descr = avail;
709         serverthreadavail=newavail;
710     }
711     else{
712         cur_client_pend_descr = avail;
713         clientthreadavail=newavail;
714     }
715     return(retdscr);
716 }
717 
armci_wait_for_server()718 void armci_wait_for_server()
719 {
720     armci_server_terminating = 1;
721 }
722 
723 
724 /* ibv_create_qp does not use separate structure to return properties,
725    seems it is all inside ibv_qp */
armci_create_qp(vapi_nic_t * nic,struct ibv_qp ** qp)726 static void armci_create_qp(vapi_nic_t *nic, struct ibv_qp **qp)
727 {
728     struct ibv_qp_init_attr initattr;
729 
730     bzero(&initattr, sizeof(struct ibv_qp_init_attr));
731 
732     *qp = NULL;
733 
734     initattr.cap.max_send_wr = armci_max_qp_ous_swr;
735     initattr.cap.max_recv_wr = armci_max_qp_ous_rwr;
736     initattr.cap.max_recv_sge = armci_max_num_sg_ent;
737     initattr.cap.max_send_sge = armci_max_num_sg_ent;
738 #if defined(PEND_BUFS)
739     if(nic==CLN_nic) {
740         initattr.send_cq = nic->rcq;
741         initattr.recv_cq = nic->rcq;
742     }
743     else
744 #endif
745     {
746         initattr.send_cq = nic->scq;
747         initattr.recv_cq = nic->rcq;
748     }
749     initattr.qp_type = IBV_QPT_RC;
750 
751     *qp = ibv_create_qp(nic->ptag, &initattr);
752     dassert(1,*qp!=NULL);
753 
754     /* The value of inline size should be dependent on number of processes
755      * */
756     if (armci_nproc >= MAX_PROC_INLINE_SIZE) {
757         armci_vapi_max_inline_size = -1;
758     }
759     else {
760         armci_vapi_max_inline_size = initattr.cap.max_inline_data;
761     }
762 }
763 
int armci_openib_sl;
int armci_openib_server_poll;

/*
 * Load the OpenIB runtime settings from the environment.
 *
 * ARMCI_OPENIB_USE_SL      -> armci_openib_sl
 * ARMCI_OPENIB_SERVER_POLL -> armci_openib_server_poll
 *
 * Each value is parsed with atoi; a variable that is not set yields 0, so
 * server polling stays disabled by default.
 */
void armci_openib_env_init()
{
    const char *setting;

    setting = getenv("ARMCI_OPENIB_USE_SL");
    armci_openib_sl = (setting != NULL) ? atoi(setting) : 0;

    /* Don't enable server polling by default */
    setting = getenv("ARMCI_OPENIB_SERVER_POLL");
    armci_openib_server_poll = (setting != NULL) ? atoi(setting) : 0;
}
785 
/*
 * Open an InfiniBand device and fill in `nic`.
 *
 * nic         - NIC record to initialise; it is zeroed first
 * scq_entries - non-zero to create a send completion channel and queue
 * rcq_entries - non-zero to create a recv completion channel and queue
 *               (both values act only as on/off switches; the queue depths
 *               are the fixed 16000 and 32768 below)
 *
 * The device is the one named by ARMCI_OPENIB_DEVICE, or the first device
 * in the list when that variable is unset.  The first of ports 1..2 found
 * in the ACTIVE state becomes active_port and supplies the local LID.
 * When called for SRV_nic the OpenIB environment settings are loaded
 * first.  The global QP limits (armci_max_*) and armci_use_odcm are
 * (re)assigned on every call and clamped to the device attributes.
 *
 * The device list, the opened device context and the protection domain are
 * now verified with dassert before use; previously a NULL from any of them
 * was dereferenced or passed on to later verbs calls.
 */
static void armci_init_nic(vapi_nic_t *nic, int scq_entries, int
        rcq_entries)
{
    int rc, ndevs = 0, i;
    struct ibv_device **devs=NULL;
    char *runtime_devname;
    char *value;
    int device_found = 0, device_id = 0;
    int down_port_count_check = 0;

    if (nic == SRV_nic) {
        /* Initialize OpenIB runtime variables only once*/
        armci_openib_env_init();
    }

    bzero(nic,sizeof(vapi_nic_t));
    nic->lid_arr = (uint16_t *)calloc(armci_nproc,sizeof(uint16_t));
    dassert(1,nic->lid_arr!=NULL);

    devs = ibv_get_device_list(&ndevs);
    /* devs[device_id] is indexed below: require a non-empty list */
    dassert(1,devs!=NULL);
    dassert(1,ndevs>0);

    runtime_devname = getenv("ARMCI_OPENIB_DEVICE");

    if (runtime_devname) {
        for (i = 0; i < ndevs; i++) {
            if (!strncmp(ibv_get_device_name(devs[i]), runtime_devname, 32)) {
                device_found = 1;
                device_id = i;
                break;
            }
        }
    }
    else {
        device_id = 0;
        device_found = 1;
    }

    assert(device_found);
    nic->handle = ibv_open_device(devs[device_id]);
    dassert(1,nic->handle!=NULL);

    nic->maxtransfersize = MAX_RDMA_SIZE;

    nic->vendor = ibv_get_device_name(devs[device_id]);

    rc = ibv_query_device(nic->handle, &nic->attr);
    assert(0 == rc);

    for (i = 1; i <= 2; i++) {
        rc = ibv_query_port(nic->handle, (uint8_t)i, &nic->hca_port);
        assert(0 == rc);
        if (IBV_PORT_ACTIVE == nic->hca_port.state) {
            nic->active_port = i;
            break;
        }
        else {
            down_port_count_check++;
        }
    }

    /* At least one of the two probed ports must be active */
    assert(down_port_count_check != 2);

    /*save the lid for doing a global exchange later */
    nic->lid_arr[armci_me] = nic->hca_port.lid;

    /*allocate tag (protection domain) */
    nic->ptag = ibv_alloc_pd(nic->handle);
    dassert(1,nic->ptag!=NULL);

    /* properties of scq and rcq required for the cq number, this also needs
     * to be globally exchanged
     */
    nic->scv = 1;
    nic->rcv = 2;
    nic->scq = nic->rcq = NULL;

    if(scq_entries) {
        nic->sch = ibv_create_comp_channel(nic->handle);
        nic->scq = ibv_create_cq(nic->handle, 16000,
                nic->scq_cntx,nic->sch, 0);
    }

    if(rcq_entries) {
        nic->rch = ibv_create_comp_channel(nic->handle);
        nic->rcq = ibv_create_cq(nic->handle, 32768,
                nic->rcq_cntx,nic->rch, 0);
    }

    ibv_free_device_list(devs);

    armci_max_num_sg_ent = 29;
    armci_max_qp_ous_swr = 100;
    armci_max_qp_ous_rwr = 50;

    if ((value = getenv("ARMCI_USE_ODCM")) != NULL){
        armci_use_odcm = atoi(value);
    } else {
        armci_use_odcm = 0;
    }

    armci_use_lazy_break = 0;

    /* clamp the requested limits to what the device reports */
    if(armci_max_qp_ous_rwr + armci_max_qp_ous_swr>nic->attr.max_qp_wr){
        armci_max_qp_ous_swr = nic->attr.max_qp_wr/16;
        armci_max_qp_ous_rwr = nic->attr.max_qp_wr - armci_max_qp_ous_swr;
    }
    if(armci_max_num_sg_ent >= nic->attr.max_sge){
        armci_max_num_sg_ent = nic->attr.max_sge - 1;
    }

}
898 
899 
armci_setaffinity(char * cpu_mapping)900 void armci_setaffinity(char *cpu_mapping) {
901     long N_CPUs_online;
902     cpu_set_t affinity_mask;
903     unsigned long affinity_mask_len = sizeof(affinity_mask);
904     char *tp;
905     char *cp;
906     char tp_str[8];
907     int cpu, i, j;
908 
909 
910     if (!armci_use_affinity)
911         return;
912 
913     /*Get number of CPU on machine */
914     if ((N_CPUs_online = sysconf(_SC_NPROCESSORS_ONLN)) < 1) {
915         perror("sysconf");
916     }
917 
918     if (cpu_mapping) {
919         tp = cpu_mapping;
920         j = 0;
921         while (*tp != '\0') {
922             i = 0;
923             cp = tp;
924             while (*cp != '\0' && *cp != ',' && *cp != ':') {
925                 cp++;
926                 i++;
927             }
928             strncpy(tp_str, tp, i);
929             tp_str[i] = '\0';
930             cpu = atoi(tp_str);
931 
932             if (j == armci_me - armci_master) {
933                 CPU_ZERO(&affinity_mask);
934                 CPU_SET(cpu, &affinity_mask);
935                 if (sched_setaffinity(0,
936                     affinity_mask_len, &affinity_mask)<0 ) {
937                     perror("sched_setaffinity");
938                 }
939                 break;
940             }
941             tp = cp + 1;
942             j++;
943         }
944 
945         free(cpu_mapping);
946     } else {
947         CPU_ZERO(&affinity_mask);
948         CPU_SET(((armci_me) - armci_master) %N_CPUs_online, &affinity_mask);
949 
950         if (sched_setaffinity(0,affinity_mask_len,&affinity_mask)<0 ) {
951             perror("sched_setaffinity");
952         }
953     }
954 }
955 
956 /****************MEMORY ALLOCATION REGISTRATION DEREGISTRATION****************/
957 static char * serv_malloc_buf_base;
958 #if ARMCI_ENABLE_GPC_CALLS
959 extern gpc_buf_t *gpc_req;
960 #endif
armci_server_alloc_bufs()961 void armci_server_alloc_bufs()
962 {
963     int bytes, total, extra =sizeof(struct ibv_recv_wr)*MAX_DESCR+SIXTYFOUR;
964     int mhsize = armci_nproc*sizeof(armci_vapi_memhndl_t); /* ack */
965     char *tmp, *tmp0;
966     int i;
967 #if defined(PEND_BUFS)
968     int clients = (IMM_BUF_NUM+1)*armci_nproc;
969 #else
970     int clients = armci_nproc;
971 #endif
972 
973     /* allocate memory for the recv buffers-must be alligned on 64byte bnd */
974     /* note we add extra one to repost it for the client we are received req */
975     bytes = (clients+1)*sizeof(vapibuf_t)+sizeof(vapibuf_ext_t) + extra+ mhsize
976 #if ARMCI_ENABLE_GPC_CALLS
977       + MAX_GPC_REQ * sizeof(gpc_buf_t)
978 #endif
979 #if defined(PEND_BUFS)
980       + (clients+1)*IMM_BUF_LEN
981       + PENDING_BUF_NUM*(sizeof(vapibuf_pend_t)+PENDING_BUF_LEN)
982 #endif
983       + sizeof(long)
984       + 7*SIXTYFOUR;
985     total = bytes + SIXTYFOUR;
986     if(total%4096!=0)
987        total = total - (total%4096) + 4096;
988     tmp0=tmp = malloc(total);
989     serv_malloc_buf_base = tmp0;
990 
991     dassert1(1,tmp!=NULL,(int)total);
992     /* stamp the last byte */
993     serv_tail= tmp + bytes+SIXTYFOUR-1;
994     *serv_tail=SERV_STAMP;
995     /* allocate memory for client memory handle to support put response
996      *         in dynamic memory registration protocols */
997     CLN_handle = (armci_vapi_memhndl_t*)tmp;
998     memset(CLN_handle,0,mhsize); /* set it to zero */
999     tmp += mhsize;
1000 
1001 #if ARMCI_ENABLE_GPC_CALLS
1002     /* gpc_req memory*/
1003     tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
1004     gpc_req = (gpc_buf_t *)tmp;
1005     tmp += MAX_GPC_REQ * sizeof(gpc_buf_t);
1006 #endif
1007 
1008     /* setup descriptor memory */
1009     tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
1010     serv_descr_pool.descr= (struct ibv_recv_wr *)(tmp);
1011     tmp += extra;
1012 
1013     /* setup ack buffer*/
1014     tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
1015     ack_buf = (long *)(tmp);
1016     *ack_buf=ARMCI_STAMP;
1017     tmp += sizeof(long);
1018 
1019     /* setup buffer pointers */
1020     tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
1021     serv_buf_arr = (vapibuf_t **)malloc(sizeof(vapibuf_t*)*clients);
1022     for(i=0;i<clients;i++){
1023       serv_buf_arr[i] = (vapibuf_t*)(tmp) + i;
1024     }
1025     tmp = (char *)(serv_buf_arr[0]+clients);
1026 
1027 #if defined(PEND_BUFS)
1028     /*setup buffers in immediate buffers*/
1029     tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
1030     for(i=0; i<clients; i++) {
1031       serv_buf_arr[i]->buf = tmp + i*IMM_BUF_LEN;
1032     }
1033     tmp += clients*IMM_BUF_LEN;
1034 
1035     /*setup pending buffers*/
1036     tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
1037     serv_pendbuf_arr = (vapibuf_pend_t *)(tmp);
1038     tmp=(char *)(serv_pendbuf_arr+PENDING_BUF_NUM);
1039     tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
1040     for(i=0; i<PENDING_BUF_NUM; i++) {
1041       serv_pendbuf_arr[i].buf = tmp+i*PENDING_BUF_LEN;
1042       assert(serv_pendbuf_arr[i].buf != NULL);
1043     }
1044     tmp += PENDING_BUF_NUM*PENDING_BUF_LEN;
1045     MessageRcvBuffer = NULL;
1046 #else
1047     tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
1048     spare_serv_buf = (vapibuf_t *)tmp; /* spare buffer is at the end */
1049     spare_serv_bufptr = spare_serv_buf;    /* save the pointer for later */
1050     serv_buf =(vapibuf_ext_t*)(spare_serv_buf+1);
1051     tmp = (char *)(serv_buf+1);
1052 
1053     MessageRcvBuffer = serv_buf->buf;
1054 #endif
1055 
1056    flag_arr = (int *)malloc(sizeof(int)*armci_nproc);
1057    for (i =0; i<armci_nproc; i++) flag_arr[i] = 9999;
1058 
1059     if(DEBUG_SERVER){
1060       printf("\n%d(s):registering mem %p %dbytes ptag=%p handle=%p\n",
1061              armci_me, tmp0,total,(void*)CLN_nic->ptag,(void*)CLN_nic->handle);fflush(stdout);
1062     }
1063 
1064     serv_memhandle.memhndl = ibv_reg_mr(CLN_nic->ptag, tmp0, total,
1065                                         IBV_ACCESS_LOCAL_WRITE |
1066                                         IBV_ACCESS_REMOTE_WRITE |
1067                                         IBV_ACCESS_REMOTE_READ);
1068     dassert1(1,serv_memhandle.memhndl!=NULL,total);
1069     serv_memhandle.lkey=serv_memhandle.memhndl->lkey;
1070     serv_memhandle.rkey=serv_memhandle.memhndl->rkey;
1071 
1072     /* exchange address of ack/memhandle flag on servers */
1073     if(DEBUG_SERVER){
1074        printf("%d(s):registered mem %p %dbytes mhandle=%p mharr starts%p\n",
1075               armci_me, tmp0, total, (void*)serv_memhandle.memhndl,(void*)CLN_handle);
1076        fflush(stdout);
1077     }
1078 }
1079 
1080 static char * client_malloc_buf_base;
armci_vapi_client_mem_alloc(int size)1081 char * armci_vapi_client_mem_alloc(int size)
1082 {
1083     int mod, total;
1084     int extra = MAX_DESCR*sizeof(struct ibv_recv_wr)+SIXTYFOUR;
1085     char *tmp,*tmp0;
1086 
1087     /*we use the size passed by the armci_init_bufs routine instead of bytes*/
1088 
1089     total = size + extra + 2*SIXTYFOUR;
1090 
1091     if(total%4096!=0)
1092        total = total - (total%4096) + 4096;
1093     tmp0  = tmp = malloc(total);
1094     dassert1(1,tmp!=NULL,total);
1095     client_malloc_buf_base = tmp;
1096 #if 0
1097     /*SK: could this lead to a problem at ibv_reg_mr() because of unfixed 'total'?*/
1098     if(ALIGN64ADD(tmp0))tmp0+=ALIGN64ADD(tmp0);
1099 #endif
1100     /* stamp the last byte */
1101     client_tail= tmp + extra+ size +2*SIXTYFOUR-1;
1102     *client_tail=CLIENT_STAMP;
1103 
1104     /* we also have a place to store memhandle for zero-copy get */
1105     pinned_handle =(armci_vapi_memhndl_t *) (tmp + extra+ size +SIXTYFOUR-16);
1106 
1107     mod = ((ssize_t)tmp)%SIXTYFOUR;
1108     client_descr_pool.descr= (struct ibv_recv_wr*)(tmp+SIXTYFOUR-mod);
1109     tmp += extra;
1110 
1111     client_memhandle.memhndl = ibv_reg_mr(SRV_nic->ptag, tmp0, total,
1112                                           IBV_ACCESS_LOCAL_WRITE |
1113                                           IBV_ACCESS_REMOTE_WRITE |
1114                                           IBV_ACCESS_REMOTE_READ);
1115     dassert(1,client_memhandle.memhndl!=NULL);
1116 
1117     client_memhandle.lkey = client_memhandle.memhndl->lkey;
1118     client_memhandle.rkey = client_memhandle.memhndl->rkey;
1119     handle_array[armci_me].lkey = client_memhandle.lkey;
1120     handle_array[armci_me].rkey = client_memhandle.rkey;
1121 
1122     handle_array[armci_me].memhndl = client_memhandle.memhndl;
1123 
1124     if(DEBUG_INIT){
1125        printf("%d: registered client memory %p %dsize tmp=%p \n",
1126                armci_me,tmp0, total, tmp);
1127        fflush(stdout);
1128     }
1129     /*now that we have the handle array, we get every body elses RDMA handle*/
1130     total = (sizeof(armci_vapi_memhndl_t)*armci_nproc)/sizeof(int);
1131     armci_msg_gop_scope(SCOPE_ALL,handle_array,total,"+",ARMCI_INT);
1132 
1133     return(tmp);
1134 }
1135 
1136 
/*
 * Register [ptr, ptr+bytes) with the server NIC (CLN_nic) for local write,
 * remote write and remote read, and fill *memhdl with the resulting memory
 * region handle and its lkey/rkey.  *memhdl is zeroed first.
 * Registration failure trips dassert().
 */
void armci_server_register_region(void *ptr,long bytes, ARMCI_MEMHDL_T *memhdl)
{
    const int access = IBV_ACCESS_LOCAL_WRITE
                     | IBV_ACCESS_REMOTE_WRITE
                     | IBV_ACCESS_REMOTE_READ;

    memset(memhdl, 0, sizeof(ARMCI_MEMHDL_T));

    memhdl->memhndl = ibv_reg_mr(CLN_nic->ptag, ptr, bytes, access);
    dassert(1,memhdl->memhndl!=NULL);

    /* cache the keys next to the handle */
    memhdl->rkey = memhdl->memhndl->rkey;
    memhdl->lkey = memhdl->memhndl->lkey;

    if(DEBUG_SERVER){
       printf("\n%d(s):registered lkey=%d rkey=%d ptr=%p end=%p %p\n",armci_me,
               memhdl->lkey,memhdl->rkey,ptr,(char *)ptr+bytes,(void*)memhdl);
       fflush(stdout);
    }
}
1156 
/*
 * Register [ptr, ptr+bytes) with the client NIC (SRV_nic) for local write,
 * remote write and remote read, storing the memory region handle and its
 * lkey/rkey in *memhdl.  Registration failure trips dassert().
 * Always returns 1.
 */
int armci_pin_contig_hndl(void *ptr, size_t bytes, ARMCI_MEMHDL_T *memhdl)
{
    const int access = IBV_ACCESS_LOCAL_WRITE
                     | IBV_ACCESS_REMOTE_WRITE
                     | IBV_ACCESS_REMOTE_READ;

    memhdl->memhndl = ibv_reg_mr(SRV_nic->ptag, ptr, bytes, access);
    dassert(1,memhdl->memhndl!=NULL);

    memhdl->rkey = memhdl->memhndl->rkey;
    memhdl->lkey = memhdl->memhndl->lkey;

    /* debug trace, compiled in but permanently disabled */
    if(0){
       printf("\n%d:registered lkey=%d rkey=%d ptr=%p end=%p\n",armci_me,
               memhdl->lkey,memhdl->rkey,ptr,(char *)ptr+bytes);fflush(stdout);
    }

    return 1;
}
1172 
1173 #if 1
/*
 * Deregister the memory region held in mh->memhndl.
 * A non-zero result from ibv_dereg_mr() trips dassert1() and is also passed
 * to armci_vapi_check_return().
 */
void armci_network_client_deregister_memory(ARMCI_MEMHDL_T *mh)
{
    int status = ibv_dereg_mr(mh->memhndl);

    dassert1(1,status==0,status);
    armci_vapi_check_return(DEBUG_FINALIZE,status,
                        "armci_network_client_deregister_memory:deregister_mr");
}
/*
 * Server-side counterpart of armci_network_client_deregister_memory().
 *
 * Currently a no-op: the function returns before touching mh, so the
 * region stays registered.  The reason is not recorded in this file; the
 * intended implementation is kept below, compiled out, for reference.
 */
void armci_network_server_deregister_memory(ARMCI_MEMHDL_T *mh)
{
    (void)mh;   /* deliberately unused while deregistration is disabled */

#if 0
    {
        int rc;
        printf("\n%d:deregister ptr=%p",armci_me,(void*)mh);fflush(stdout);
        rc = ibv_dereg_mr(mh->memhndl);
        dassert1(1,rc==0,rc);
        armci_vapi_check_return(DEBUG_FINALIZE,rc,
                        "armci_network_server_deregister_memory:deregister_mr");
    }
#endif
}
1192 #else
1193 #   define armci_network_client_deregister_memory(mh)           \
1194            armci_vapi_check_return(DEBUG_FINALIZE,              \
1195                                    ibv_dereg_mr(mh->memhndl),   \
1196                                    "armci_network_client_deregister_memory:deregister_mr")
1197 #   define armci_network_server_deregister_memory(mh)           \
1198            armci_vapi_check_return(DEBUG_FINALIZE,              \
1199                                    ibv_dereg_mr(mh->memhndl),   \
1200                                    "armci_network_server_deregister_memory:deregister_mr")
1201 #endif
1202 
armci_set_serv_mh()1203 void armci_set_serv_mh()
1204 {
1205 int s, ratio = sizeof(ack_t)/sizeof(int);
1206     /* first collect addrresses on all masters */
1207     if(armci_me == armci_master){
1208        SRV_ack[armci_clus_me].prem_handle=CLN_handle;
1209        SRV_ack[armci_clus_me].handle =serv_memhandle;
1210        armci_msg_gop_scope(SCOPE_MASTERS,SRV_ack,ratio*armci_nclus,"+",
1211                            ARMCI_INT);
1212     }
1213     /* next master broadcasts the addresses within its node */
1214     armci_msg_bcast_scope(SCOPE_NODE,SRV_ack,armci_nclus*sizeof(ack_t),
1215                           armci_master);
1216 
1217     /* Finally save address corresponding to my id on each server */
1218     for(s=0; s< armci_nclus; s++){
1219        SRV_ack[s].prem_handle += armci_me;
1220     }
1221 
1222 }
1223 /**********END MEMORY ALLOCATION REGISTRATION AND DEREGISTRATION**************/
1224 
1225 /*\
1226  * init_connections, client_connect_to_servers -- client code
1227  * server_initial_connection, all_data_server -- server code
1228 \*/
armci_init_connections()1229 void armci_init_connections()
1230 {
1231     int c,s;
1232     int sz;
1233     uint32_t *tmpbuf;
1234     int *tmparr;
1235     if(TIME_INIT)inittime0 = MPI_Wtime();
1236 
1237 #if defined(PEND_BUFS)
1238     armci_pbuf_init_buffer_env();
1239 #endif
1240 
1241     armci_setaffinity(NULL);
1242 
1243     /* initialize nic connection for qp numbers and lid's */
1244     armci_init_nic(SRV_nic,1,1);
1245     for(c=0; c < NUMOFBUFFERS+1; c++) {
1246         mark_buf_send_complete[c]=1;
1247     }
1248     _gtmparr = (int *)calloc(armci_nproc,sizeof(int));
1249 
1250     /*qp_numbers and lids need to be exchanged globally*/
1251     tmparr = (int *)calloc(armci_nproc,sizeof(int));
1252     tmparr[armci_me] = SRV_nic->lid_arr[armci_me];
1253     sz = armci_nproc;
1254     armci_msg_gop_scope(SCOPE_ALL,tmparr,sz,"+",ARMCI_INT);
1255     for(c=0;c<armci_nproc;c++){
1256         SRV_nic->lid_arr[c]=tmparr[c];
1257         tmparr[c]=0;
1258     }
1259     /*SRV_con is for client to connect to servers */
1260     SRV_con=(armci_connect_t *)malloc(sizeof(armci_connect_t)*armci_nclus);
1261     dassert1(1,SRV_con!=NULL,sizeof(armci_connect_t)*armci_nclus);
1262     bzero(SRV_con,sizeof(armci_connect_t)*armci_nclus);
1263 
1264     CLN_con=(armci_connect_t*)malloc(sizeof(armci_connect_t)*armci_nproc);
1265     dassert1(1,CLN_con!=NULL,sizeof(armci_connect_t)*armci_nproc);
1266     bzero(CLN_con,sizeof(armci_connect_t)*armci_nproc);
1267 
1268     /*every client creates a qp with every server other than the one on itself*/
1269     SRV_rqpnums = (uint32_t*)malloc(sizeof(uint32_t)*armci_nproc);
1270     dassert(1,SRV_rqpnums);
1271     tmpbuf = (uint32_t*)calloc(armci_nproc,sizeof(uint32_t));
1272     dassert(1,tmpbuf);
1273 
1274     sz = armci_nproc*(sizeof(uint32_t)/sizeof(int));
1275     armci_vapi_max_inline_size = 0;
1276 
1277 
1278     if (!armci_use_odcm) {
1279         for(s = 0; s < armci_nclus; s++){
1280             armci_connect_t *con = SRV_con + s;
1281             {
1282                 armci_create_qp(SRV_nic,&con->qp);
1283                 con->sqpnum  = con->qp->qp_num;
1284                 tmpbuf[armci_clus_info[s].master] = con->qp->qp_num;
1285                 con->lid = SRV_nic->lid_arr[s];
1286             }
1287         }
1288         MPI_Alltoall(tmpbuf,sizeof(uint32_t),MPI_CHAR,SRV_rqpnums,
1289                 sizeof(uint32_t),MPI_CHAR,ARMCI_COMM_WORLD);
1290         free(tmpbuf);
1291 
1292         if(armci_me != armci_master) {
1293             free(SRV_rqpnums);
1294             SRV_rqpnums = NULL;
1295         }
1296     }
1297     else {
1298         for(s = 0; s < armci_nclus; s++){
1299             armci_connect_t *con = SRV_con + s;
1300             con->state = QP_INACTIVE;
1301         }
1302     }
1303 
1304     SRV_ack = (ack_t*)calloc(armci_nclus, sizeof(ack_t));
1305     dassert1(1,SRV_ack!=NULL,armci_nclus*sizeof(ack_t));
1306 
1307     handle_array = (armci_vapi_memhndl_t *)calloc(sizeof(armci_vapi_memhndl_t),
1308             armci_nproc);
1309     dassert1(1,handle_array!=NULL,sizeof(armci_vapi_memhndl_t)*armci_nproc);
1310 
1311     if (armci_use_odcm) {
1312         setup_ud_channel();
1313     }
1314 }
1315 
vapi_connect_client()1316 static void vapi_connect_client()
1317 {
1318     int i, sz=0, c, rc;
1319     struct ibv_qp_attr qp_attr;
1320     enum ibv_qp_attr_mask qp_attr_mask;
1321 
1322     if (TIME_INIT) inittime0 = MPI_Wtime();
1323     if (armci_me == armci_master)
1324         armci_util_wait_int(&armci_vapi_server_stage1, 1, 10);
1325     if (TIME_INIT) printf("\n%d:wait for server to get to stage 1 time for "
1326                           "vapi_connect_client is %f",
1327                           armci_me, (inittime1 = MPI_Wtime()) - inittime0);
1328     sz = armci_nproc;
1329     if (armci_me == armci_master) {
1330        armci_msg_gop_scope(SCOPE_MASTERS, _gtmparr, sz, "+", ARMCI_INT);
1331        for (c=0; c<armci_nproc; c++) {
1332          CLN_nic->lid_arr[c] = _gtmparr[c];
1333          _gtmparr[c] = 0;
1334        }
1335        if (DEBUG_CLN) {
1336          printf("\n%d(svc): mylid = %d",armci_me,CLN_nic->lid_arr[armci_me]);
1337          fflush(stdout);
1338        }
1339     }
1340 
1341     armci_vapi_client_stage1 = 1;
1342 
1343     /* allocate and initialize connection structs */
1344     sz = armci_nproc*sizeof(uint32_t)/sizeof(int);
1345 
1346     if (armci_me == armci_master)
1347        armci_util_wait_int(&armci_vapi_server_stage2, 1, 10);
1348 #if 0
1349     for (c = 0; c < armci_nproc; c++){
1350        armci_connect_t *con = CLN_con + c;
1351        if (armci_me != armci_master) {
1352          char *ptrr;
1353          int extra;
1354          ptrr = malloc(8 + sizeof(uint32_t) * armci_nproc);
1355          extra = ALIGNLONGADD(ptrr);
1356          ptrr = ptrr + extra;
1357          con->rqpnum = (uint32_t *)ptrr;
1358          bzero(con->rqpnum, sizeof(uint32_t) * armci_nproc);
1359        }
1360        armci_msg_gop_scope(SCOPE_ALL, con->rqpnum, sz, "+", ARMCI_INT);
1361     }
1362 #else
1363     CLN_rqpnums = (uint32_t*)malloc(sizeof(uint32_t)*armci_nproc);
1364 
1365     if (!armci_use_odcm) {
1366         if(armci_me != armci_master) {
1367             /*just has junk*/
1368             CLN_rqpnumtmpbuf = (uint32_t*)malloc(sizeof(uint32_t)*armci_nproc);
1369         }
1370         dassert(1, CLN_rqpnumtmpbuf);
1371         MPI_Alltoall(CLN_rqpnumtmpbuf, sizeof(uint32_t), MPI_CHAR,
1372                 CLN_rqpnums, sizeof(uint32_t), MPI_CHAR, ARMCI_COMM_WORLD);
1373         free(CLN_rqpnumtmpbuf);
1374         CLN_rqpnumtmpbuf=NULL;
1375 #endif
1376 
1377         if (TIME_INIT) printf("\n%d:wait for server tog et to stage 2 time for "
1378                 "vapi_connect_client is %f",
1379                 armci_me, (inittime2 = MPI_Wtime()) - inittime1);
1380         /*armci_set_serv_mh();*/
1381 
1382         if (DEBUG_CLN) {
1383             printf("%d:all connections ready\n", armci_me);
1384             fflush(stdout);
1385         }
1386 
1387         /* For sanity */
1388         memset(&qp_attr, 0, sizeof qp_attr);
1389         /* Modifying  QP to INIT */
1390         qp_attr_mask = IBV_QP_STATE
1391             | IBV_QP_PKEY_INDEX
1392             | IBV_QP_PORT
1393             | IBV_QP_ACCESS_FLAGS;
1394 
1395         qp_attr.qp_state = IBV_QPS_INIT;
1396         qp_attr.pkey_index = DEFAULT_PKEY_IX;
1397         qp_attr.port_num = SRV_nic->active_port;
1398         qp_attr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;
1399 
1400         /* start from from server on my_node -1 */
1401         for (i = 0; i < armci_nclus; i++) {
1402             armci_connect_t *con;
1403             con = SRV_con + i;
1404             rc = ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask);
1405             dassertp(1,!rc,("%d: client RST->INIT i=%d rc=%d\n",armci_me,i,rc));
1406         }
1407 
1408         if (TIME_INIT) printf("\n%d:to init time for vapi_connect_client is %f",
1409                 armci_me, (inittime1 = MPI_Wtime()) - inittime2);
1410         qp_attr_mask = IBV_QP_STATE
1411             | IBV_QP_MAX_DEST_RD_ATOMIC
1412             | IBV_QP_PATH_MTU
1413             | IBV_QP_RQ_PSN
1414             | IBV_QP_MIN_RNR_TIMER;
1415         memset(&qp_attr, 0, sizeof qp_attr);
1416 
1417         qp_attr.qp_state        = IBV_QPS_RTR;
1418         qp_attr.max_dest_rd_atomic   = 4;
1419         qp_attr.path_mtu        = IBV_MTU_1024;
1420         qp_attr.rq_psn          = 0;
1421         qp_attr.min_rnr_timer   = RNR_TIMER;
1422 
1423         /* AV: Adding the service level parameter */
1424         qp_attr.ah_attr.sl      = armci_openib_sl;
1425 
1426         for (i = 0; i < armci_nclus; i++) {
1427             armci_connect_t *con;
1428 #if 0
1429             armci_connect_t *conS;
1430 #endif
1431             con = SRV_con + i;
1432 #if 0
1433             conS = CLN_con + armci_me;
1434 #endif
1435             qp_attr_mask |= IBV_QP_AV | IBV_QP_DEST_QPN;
1436 #if 0
1437             qp_attr.dest_qp_num = conS->rqpnum[armci_clus_info[i].master];
1438 #else
1439             qp_attr.dest_qp_num = CLN_rqpnums[armci_clus_info[i].master];
1440 #endif
1441             qp_attr.ah_attr.dlid = SRV_nic->lid_arr[armci_clus_info[i].master];
1442             qp_attr.ah_attr.port_num = SRV_nic->active_port;
1443 
1444             qp_attr.ah_attr.sl = armci_openib_sl;
1445 
1446             rc = ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask);
1447             dassertp(1,!rc,("%d: INIT->RTR client i=%d rc=%d\n",armci_me,i,rc));
1448         }
1449     }
1450     /*to to to RTS, other side must be in RTR*/
1451 
1452     armci_msg_barrier();
1453     if (TIME_INIT) printf("\n%d:init to rtr time for vapi_connect_client is %f",
1454                           armci_me, (inittime2 = MPI_Wtime()) - inittime1);
1455     armci_vapi_client_ready=1;
1456 
1457     if (!armci_use_odcm) {
1458 
1459         qp_attr_mask = IBV_QP_STATE
1460             | IBV_QP_SQ_PSN
1461             | IBV_QP_TIMEOUT
1462             | IBV_QP_RETRY_CNT
1463             | IBV_QP_RNR_RETRY
1464             | IBV_QP_MAX_QP_RD_ATOMIC;
1465 
1466         memset(&qp_attr, 0, sizeof qp_attr);
1467 
1468         qp_attr.qp_state            = IBV_QPS_RTS;
1469         qp_attr.sq_psn              = 0;
1470         qp_attr.timeout             = 18;
1471         qp_attr.retry_cnt           = 7;
1472         qp_attr.rnr_retry           = 7;
1473         qp_attr.max_rd_atomic  = 4;
1474 
1475         for (i = 0; i < armci_nclus; i++){
1476             armci_connect_t *con;
1477             con = SRV_con + i;
1478             rc = ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask);
1479             dassertp(1,!rc,("%d: client RTR->RTS i=%d rc=%d\n",armci_me,i,rc));
1480         }
1481         if (TIME_INIT) printf("\n%d:rtr to rts time for vapi_connect_client is %f",
1482                 armci_me, (inittime1 = MPI_Wtime()) - inittime2);
1483         free(CLN_rqpnums);
1484         CLN_rqpnums=NULL;
1485     }
1486 
1487 }
1488 
1489 
armci_client_connect_to_servers()1490 void armci_client_connect_to_servers()
1491 {
1492     extern void armci_util_wait_int(volatile int *,int,int);
1493     if (TIME_INIT) inittime0 = MPI_Wtime();
1494     _armci_buf_init();
1495 
1496     vapi_connect_client();
1497     if (armci_me == armci_master)
1498        armci_util_wait_int(&armci_vapi_server_ready,1,10);
1499     armci_msg_barrier();
1500     if (DEBUG_CLN && armci_me == armci_master) {
1501        printf("\n%d:server_ready=%d\n",armci_me,armci_vapi_server_ready);
1502        fflush(stdout);
1503     }
1504     if (TIME_INIT) printf("\n%d:time for client_connect_to_s is %f",
1505                           armci_me,MPI_Wtime()-inittime0);
1506 }
1507 
1508 
/*
 * Prepare a receive work request with a single scatter/gather entry.
 *
 * rd:       work request to fill (cleared first, wr_id left at 0)
 * sg_entry: scatter/gather element that rd will reference
 * buf, len: receive buffer and its length
 * mhandle:  supplies the lkey for buf
 */
void armci_init_vapibuf_recv(struct ibv_recv_wr *rd, struct ibv_sge *sg_entry,
                             char *buf, int len, armci_vapi_memhndl_t *mhandle)
{
     /* start from a clean descriptor */
     memset(rd, 0, sizeof(*rd));
     rd->wr_id   = 0;
     rd->next    = NULL;
     rd->sg_list = sg_entry;
     rd->num_sge = 1;

     /* describe the buffer */
     sg_entry->addr   = (uint64_t)buf;
     sg_entry->length = len;
     sg_entry->lkey   = mhandle->lkey;
}
1522 
1523 
/*
 * Prepare a signaled IBV_WR_SEND work request with a single scatter/gather
 * entry.  Only the fields listed below are written; the rest of *sd is
 * left as the caller provided it.
 *
 * sd:       work request to fill
 * sg_entry: scatter/gather element that sd will reference
 * buf, len: send buffer and its length
 * mhandle:  supplies the lkey for buf
 */
void armci_init_vapibuf_send(struct ibv_send_wr *sd, struct ibv_sge *sg_entry,
                             char *buf, int len, armci_vapi_memhndl_t *mhandle)
{
     /* describe the buffer */
     sg_entry->addr   = (uint64_t)buf;
     sg_entry->length = len;
     sg_entry->lkey   = mhandle->lkey;

     /* hook it into the work request */
     sd->next       = NULL;
     sd->sg_list    = sg_entry;
     sd->num_sge    = 1;
     sd->opcode     = IBV_WR_SEND;
     sd->send_flags = IBV_SEND_SIGNALED;
}
1537 
1538 
/*
 * Prepare a signaled IBV_WR_RDMA_WRITE work request that copies len bytes
 * from lbuf to the remote address rbuf.
 *
 * lhandle / rhandle may be NULL; in that case the corresponding lkey /
 * rkey field is left untouched.
 *
 * NOTE: sd->wr is a union, sd->wr.ud might conflict with sd->wr.rdma.
 */
static void armci_init_cbuf_srdma(struct ibv_send_wr *sd, struct ibv_sge *sg_entry,
                                  char *lbuf, char *rbuf, int len,
                                  armci_vapi_memhndl_t *lhandle,
                                  armci_vapi_memhndl_t *rhandle)
{
     /* request header */
     sd->next       = NULL;
     sd->opcode     = IBV_WR_RDMA_WRITE;
     sd->send_flags = IBV_SEND_SIGNALED;
     sd->sg_list    = sg_entry;
     sd->num_sge    = 1;

     /* remote side */
     if (rhandle) {
        sd->wr.rdma.rkey = rhandle->rkey;
     }
     sd->wr.rdma.remote_addr = (uint64_t)rbuf;

     /* local side */
     if (lhandle) {
        sg_entry->lkey = lhandle->lkey;
     }
     sg_entry->addr   = (uint64_t)lbuf;
     sg_entry->length = len;
}
1557 
1558 
/*
 * Prepare a signaled IBV_WR_RDMA_READ work request that fetches len bytes
 * from the remote address rbuf into lbuf.
 *
 * lhandle / rhandle may be NULL; in that case the corresponding lkey /
 * rkey field is left untouched.
 */
static void armci_init_cbuf_rrdma(struct ibv_send_wr *sd, struct ibv_sge
        *sg_entry, char *lbuf, char *rbuf, int len, armci_vapi_memhndl_t
        *lhandle, armci_vapi_memhndl_t *rhandle)
{
     /* request header */
     sd->next       = NULL;
     sd->opcode     = IBV_WR_RDMA_READ;
     sd->send_flags = IBV_SEND_SIGNALED;
     sd->sg_list    = sg_entry;
     sd->num_sge    = 1;

     /* sd->wr is a union and wr.ud might conflict with wr.rdma, so the ud
      * field has to be cleared BEFORE the rdma fields are written */
     sd->wr.ud.remote_qkey = 0;
     if (rhandle) {
        sd->wr.rdma.rkey = rhandle->rkey;
     }
     sd->wr.rdma.remote_addr = (uint64_t)rbuf;

     /* local side */
     if (lhandle) {
        sg_entry->lkey = lhandle->lkey;
     }
     sg_entry->addr   = (uint64_t)lbuf;
     sg_entry->length = len;
}
1577 
1578 
armci_server_initial_connection()1579 void armci_server_initial_connection()
1580 {
1581   int c, rc, i, j;
1582     struct ibv_qp_attr qp_attr;
1583     enum ibv_qp_attr_mask qp_attr_mask;
1584     struct ibv_recv_wr *bad_wr;
1585 
1586     if (TIME_INIT)
1587         inittime0 = MPI_Wtime();
1588 
1589     if (DEBUG_SERVER) {
1590         printf("in server after fork %d (%d)\n",armci_me,getpid());
1591         fflush(stdout);
1592     }
1593 
1594 #if defined(PEND_BUFS) && !defined(SERVER_THREAD)
1595     armci_pbuf_init_buffer_env();
1596 #endif
1597     armci_init_nic(CLN_nic,1,1);
1598     if (!armci_openib_server_poll) {
1599 	/*
1600 	 * Start a notify event request immediately after creation so
1601 	 * nothing is missed.
1602 	 */
1603 	rc = ibv_req_notify_cq(CLN_nic->rcq, 0);
1604 	dassert1(1,rc==0,rc);
1605     }
1606 
1607     _gtmparr[armci_me] = CLN_nic->lid_arr[armci_me];
1608     armci_vapi_server_stage1 = 1;
1609     armci_util_wait_int(&armci_vapi_client_stage1, 1, 10);
1610 
1611     CLN_rqpnumtmpbuf = (uint32_t*)malloc(sizeof(uint32_t)*armci_nproc);
1612     dassert(1, CLN_rqpnumtmpbuf);
1613 
1614     if (!armci_use_odcm) {
1615         for (c = 0; c < armci_nproc; c++) {
1616             armci_connect_t *con = CLN_con + c;
1617             armci_create_qp(CLN_nic, &con->qp);
1618             con->sqpnum = con->qp->qp_num;
1619             con->lid    = CLN_nic->lid_arr[c];
1620             CLN_rqpnumtmpbuf[c] = con->qp->qp_num;
1621         }
1622     }
1623     else {
1624         for (c = 0; c < armci_nproc; c++) {
1625             armci_connect_t *con = CLN_con + c;
1626             con->state = QP_INACTIVE;
1627         }
1628     }
1629 
1630     armci_vapi_server_stage2 = 1;
1631 
1632     if (!armci_use_odcm) {
1633     qp_attr_mask = IBV_QP_STATE
1634                  | IBV_QP_PKEY_INDEX
1635                  | IBV_QP_PORT
1636                  | IBV_QP_ACCESS_FLAGS;
1637 
1638     memset(&qp_attr, 0, sizeof qp_attr);
1639     qp_attr.qp_state        = IBV_QPS_INIT;
1640     qp_attr.pkey_index      = DEFAULT_PKEY_IX;
1641     qp_attr.port_num        = CLN_nic->active_port;
1642     qp_attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE |
1643         IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;
1644 
1645     for (c = 0; c < armci_nproc; c++) {
1646        armci_connect_t *con = CLN_con + c;
1647        rc = ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask);
1648        dassertp(1,!rc,("%d: RTS->INIT server c=%d rc=%d\n",armci_me,c,rc));
1649     }
1650 
1651     memset(&qp_attr, 0, sizeof qp_attr);
1652     qp_attr_mask = IBV_QP_STATE
1653                  | IBV_QP_MAX_DEST_RD_ATOMIC
1654                  | IBV_QP_PATH_MTU
1655                  | IBV_QP_RQ_PSN
1656                  | IBV_QP_MIN_RNR_TIMER;
1657     qp_attr.qp_state           = IBV_QPS_RTR;
1658     qp_attr.path_mtu           = IBV_MTU_1024;
1659     qp_attr.max_dest_rd_atomic = 4;
1660     qp_attr.min_rnr_timer      = RNR_TIMER;
1661     qp_attr.rq_psn             = 0;
1662 
1663     for(c = 0; c < armci_nproc; c++) {
1664        armci_connect_t *con = CLN_con + c;
1665        qp_attr_mask |= IBV_QP_DEST_QPN | IBV_QP_AV;
1666        qp_attr.dest_qp_num  = SRV_rqpnums[c];
1667        qp_attr.ah_attr.dlid = SRV_nic->lid_arr[c];
1668        qp_attr.ah_attr.port_num = CLN_nic->active_port;
1669 
1670        rc = ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask);
1671        dassertp(1,!rc,("%d: INIT->RTR server cln=%d rc=%d\n",armci_me,c,rc));
1672     }
1673     }
1674     armci_util_wait_int(&armci_vapi_client_ready,1,10);
1675     memset(&qp_attr, 0, sizeof qp_attr);
1676 
1677     if (!armci_use_odcm) {
1678     qp_attr_mask = IBV_QP_STATE
1679                  | IBV_QP_SQ_PSN
1680                  | IBV_QP_TIMEOUT
1681                  | IBV_QP_RETRY_CNT
1682                  | IBV_QP_RNR_RETRY
1683                  | IBV_QP_MAX_QP_RD_ATOMIC;
1684 
1685     qp_attr.qp_state            = IBV_QPS_RTS;
1686     qp_attr.sq_psn              = 0;
1687     qp_attr.timeout             = 18;
1688     qp_attr.retry_cnt           = 7;
1689     qp_attr.rnr_retry           = 7;
1690     qp_attr.max_rd_atomic  = 4;
1691 
1692     for (c = 0; c < armci_nproc; c++) {
1693        armci_connect_t *con = CLN_con + c;
1694        rc = ibv_modify_qp(con->qp, &qp_attr,qp_attr_mask);
1695        dassertp(1,!rc,("%d: server RTR->RTS cln=%d rc=%d\n",armci_me,c,rc));
1696     }
1697     free(SRV_rqpnums);
1698     SRV_rqpnums = NULL;
1699     }
1700 
1701     armci_server_alloc_bufs();
1702 
1703     if (!armci_use_odcm) {
1704     /* setup descriptors and post nonblocking receives */
1705 #if defined(PEND_BUFS)
1706     assert(armci_nproc*(IMM_BUF_NUM+1)<DSCRID_IMMBUF_RECV_END-DSCRID_IMMBUF_RECV);
1707     for(i =  0; i < armci_nproc; i++) {
1708       for(j=0; j<IMM_BUF_NUM+1; j++) {
1709 	vapibuf_t *cbuf;
1710 	cbuf = serv_buf_arr[i*(IMM_BUF_NUM+1)+j];
1711 	armci_init_vapibuf_recv(&cbuf->dscr, &cbuf->sg_entry, cbuf->buf,
1712 				IMM_BUF_LEN, &serv_memhandle);
1713 	/* we use index of the buffer to identify the buffer, this index is
1714 	 * returned with a call to ibv_poll_cq inside the ibv_wr */
1715 	cbuf->dscr.wr_id = i*(IMM_BUF_NUM+1)+j + DSCRID_IMMBUF_RECV;
1716 	if (DEBUG_SERVER) {
1717 	  printf("\n%d(s):posted rr with lkey=%d",armci_me,cbuf->sg_entry.lkey);
1718 	  fflush(stdout);
1719 	}
1720 	rc = ibv_post_recv((CLN_con+i)->qp, &cbuf->dscr, &bad_wr);
1721 	dassert1(1,rc==0,rc);
1722       }
1723     }
1724 #else
1725     for(i =  0; i < armci_nproc; i++) {
1726       vapibuf_t *cbuf;
1727       cbuf = serv_buf_arr[i];
1728       armci_init_vapibuf_recv(&cbuf->dscr, &cbuf->sg_entry, cbuf->buf,
1729 			      CBUF_DLEN, &serv_memhandle);
1730       /* we use index of the buffer to identify the buffer, this index is
1731        * returned with a call to ibv_poll_cq inside the ibv_wr */
1732       cbuf->dscr.wr_id = i+armci_nproc;
1733       if (DEBUG_SERVER) {
1734 	printf("\n%d(s):posted rr with lkey=%d",armci_me,cbuf->sg_entry.lkey);
1735 	fflush(stdout);
1736       }
1737       rc = ibv_post_recv((CLN_con+i)->qp, &cbuf->dscr, &bad_wr);
1738       dassert1(1,rc==0,rc);
1739     }
1740 #endif
1741     }
1742     if (TIME_INIT) printf("\n%d:post time for server_initial_conn is %f",
1743                           armci_me, MPI_Wtime() - inittime4);
1744 
1745     armci_vapi_server_ready=1;
1746 
1747     if (DEBUG_SERVER) {
1748        printf("%d: server connected to all clients\n",armci_me); fflush(stdout);
1749     }
1750     if (TIME_INIT) printf("\n%d:time for server_initial_conn is %f",
1751                           armci_me, MPI_Wtime() - inittime0);
1752 }
1753 
/**
 * Release the verbs objects referenced by a NIC descriptor.
 *
 * The objects are released in this order: the scq completion queue, the sch
 * completion channel, the rcq completion queue, the rch completion channel
 * and finally the device context (nic->handle). After every call the return
 * code is asserted to be zero and then handed to armci_vapi_check_return().
 *
 * @param nic NIC descriptor whose scq/sch/rcq/rch/handle members are released
 */
static void armci_finalize_nic(vapi_nic_t *nic)
{
    int ret;

    /* scq and its channel */
    ret = ibv_destroy_cq(nic->scq);
    dassert1(1,ret==0,ret);
    armci_vapi_check_return(DEBUG_FINALIZE, ret,
                            "armci_finalize_nic:destroy_scq");
    ret = ibv_destroy_comp_channel(nic->sch);
    dassert1(1,ret==0,ret);
    armci_vapi_check_return(DEBUG_FINALIZE, ret,
                            "armci_finalize_nic:destroy_sch");

    /* rcq and its channel */
    ret = ibv_destroy_cq(nic->rcq);
    dassert1(1,ret==0,ret);
    armci_vapi_check_return(DEBUG_FINALIZE, ret,
                            "armci_finalize_nic:destroy_rcq");
    ret = ibv_destroy_comp_channel(nic->rch);
    dassert1(1,ret==0,ret);
    armci_vapi_check_return(DEBUG_FINALIZE, ret,
                            "armci_finalize_nic:destroy_rch");

    /* the device context goes last */
    ret = ibv_close_device(nic->handle);
    dassert1(1,ret==0,ret);
    armci_vapi_check_return(DEBUG_FINALIZE, ret,
                            "armci_finalize_nic:release_hca");
}
1780 
1781 
armci_server_transport_cleanup()1782 void armci_server_transport_cleanup()
1783 {
1784     int s;
1785     int rc;
1786 
1787     /*first we have empty send/recv queues TBD*/
1788     if(serv_malloc_buf_base){
1789         rc = ibv_dereg_mr(serv_memhandle.memhndl);
1790 	dassert1(1,rc==0,rc);
1791         armci_vapi_check_return(DEBUG_FINALIZE,rc,
1792                                 "armci_server_transport_cleanup:deregister_mr");
1793        /*now free it*/
1794        free(serv_malloc_buf_base);
1795     }
1796     /*now deregister all my regions from regionskk.c*/
1797     armci_server_region_destroy();
1798     if (CLN_con) {
1799         for (s = 0; s < armci_nproc; s++) {
1800             armci_connect_t *con = CLN_con + s;
1801             if (con->qp) {
1802                 rc = ibv_destroy_qp(con->qp);
1803                 armci_vapi_check_return(DEBUG_FINALIZE,rc,
1804                                         "armci_server_transport_cleanup:destroy_qp");
1805             }
1806 #if 0
1807             free(con->rqpnum);
1808 #endif
1809         }
1810         free(CLN_con);
1811     }
1812     armci_finalize_nic(CLN_nic);
1813 }
1814 
armci_transport_cleanup()1815 void armci_transport_cleanup()
1816 {
1817     int s;
1818     int rc;
1819 
1820     /*first deregister buffers memory */
1821     if (client_malloc_buf_base) {
1822         rc = ibv_dereg_mr(client_memhandle.memhndl);
1823 	dassert1(1,rc==0,rc);
1824         armci_vapi_check_return(DEBUG_FINALIZE,rc,"armci_client_transport_cleanup:deregister_mr");
1825         /*now free it*/
1826         free(client_malloc_buf_base);
1827     }
1828     /*now deregister all my regions from regions.c*/
1829     armci_region_destroy();
1830     if (SRV_con) {
1831         for (s = 0; s < armci_nclus; s++) {
1832             armci_connect_t *con = SRV_con + s;
1833             if (con->qp) {
1834                 rc = ibv_destroy_qp(con->qp);
1835 		dassert1(1,rc==0,rc);
1836                 armci_vapi_check_return(DEBUG_FINALIZE,rc,"armci_client_transport_cleanup:destroy_qp");
1837             }
1838 #if 0
1839             free(con->rqpnum);
1840 #endif
1841         }
1842         free(SRV_con);
1843     }
1844     armci_finalize_nic(SRV_nic);
1845 }
1846 
1847 /** Post an immediate buffer back for the client to send.
1848  */
_armci_pendbuf_post_immbuf(vapibuf_t * cbuf,int to)1849 static void _armci_pendbuf_post_immbuf(vapibuf_t *cbuf, int to) {
1850   int rc;
1851   struct ibv_recv_wr *bad_wr;
1852 #if defined(PEND_BUFS)
1853   assert(cbuf->dscr.wr_id == cbuf-serv_buf_arr[0]+DSCRID_IMMBUF_RECV);
1854 #endif
1855   rc = ibv_post_recv((CLN_con+to)->qp, &(cbuf->dscr), &bad_wr);
1856   dassert1(1,rc==0,rc);
1857 }
1858 
/* Map the work-request id of a posted receive back to the index of its
 * buffer in serv_buf_arr. The argument is parenthesized in both variants so
 * that an expression argument keeps its own precedence. */
#if defined(PEND_BUFS)
#define DSCRID_TO_IMMBUFID(x) ((x)-DSCRID_IMMBUF_RECV)
#else
#define DSCRID_TO_IMMBUFID(x) ((x)-armci_nproc)
#endif
1864 
1865 #if defined(PEND_BUFS)
1866 
/**
 * Placeholder for MessageRcvBuffer in PEND_BUFS builds.
 *
 * Not meant to be called: it reports the problem through armci_die(),
 * passing `proc` as the code.
 *
 * @param proc value handed to armci_die()
 * @return NULL (only reached if armci_die() returns)
 */
char *armci_openib_get_msg_rcv_buf(int proc)
{
  armci_die("PEND_BUFS in OPENIB: MessageRcvBuffer not available. Should use the in-place buffers to receive data", proc);

  return NULL;
}
1875 
1876 /** Check that the data is in a server allocated buffer. This is
1877  *   guaranteed to be pinned. Ideally, this should always be true. Any
1878  *   operation that request alternative support will have to fix this
1879  *   function and possibly @armci_openib_get_msg_rcv_buf().
1880  * @param br IN Buffer pointer being checked
1881  * @return 1 if it is a server-allocated buffer. 0 otherwise.
1882  */
armci_data_in_serv_buf(void * br)1883 int armci_data_in_serv_buf(void *br)
1884 {
1885   if(br>=(void *)serv_malloc_buf_base && br<(void *)serv_tail)
1886     return 1;
1887   if(DEBUG_SERVER) {
1888     printf("%d:: serv_bufs=%p<->%p. br=%p out of range\n",
1889 	   armci_me, serv_malloc_buf_base, serv_tail, br);
1890     fflush(stdout);
1891   }
1892   return 0;
1893 }
1894 
/* Work-request ids of pending buffers: buffer k owns the two consecutive ids
 * DSCRID_PENDBUF+2k (put) and DSCRID_PENDBUF+2k+1 (get). */
#define PBUF_BUFID_TO_PUT_WRID(_pbufid) (DSCRID_PENDBUF+(_pbufid)*2)
#define PBUF_BUFID_TO_GET_WRID(_pbufid) (PBUF_BUFID_TO_PUT_WRID(_pbufid)+1)
#define PBUF_WRID_TO_PBUFID(_id) (((_id)-DSCRID_PENDBUF)/2)
#define PBUF_IS_GET_WRID(_id) (((_id)-DSCRID_PENDBUF)&1)
#define PBUF_IS_PUT_WRID(_id) (!PBUF_IS_GET_WRID(_id))
1900 
1901 /**Complete processing this immediate buffer. Parameters is void *,
1902  * since vapibuf_t*|immbuf_t* is not available in armci-vapi.h
1903  */
armci_complete_immbuf(void * buf)1904 void armci_complete_immbuf(void *buf) {
1905   vapibuf_t *cbuf = (vapibuf_t*)buf;
1906   request_header_t *msginfo=(request_header_t*)cbuf->buf;
1907 
1908 #if SRI_CORRECT
1909 #error
1910   cbuf->send_pending = 0;
1911 #else
1912     _armci_pendbuf_post_immbuf(cbuf,msginfo->from);
1913 #endif
1914   armci_data_server(cbuf);
1915   if(msginfo->operation==PUT || ARMCI_ACC(msginfo->operation)) {
1916     SERVER_SEND_ACK(msginfo->from);
1917   }
1918 #if SRI_CORRECT
1919   if(!cbuf->send_pending) {
1920     _armci_pendbuf_post_immbuf(cbuf,msginfo->from);
1921   }
1922 #endif
1923 }
1924 
1925 /**Complete processing this pending buffer. Parameters is void *,
1926  * since vapibuf_t*|immbuf_t* is not available in armci-vapi.h. Note
1927  * that the pending buffer may not yet be available for reuse. This
1928  * will depend on the state of the pending buffer (which might have to
1929  * wait for a communication innitiated by armci_data_server() to
1930  * complete.
1931  */
armci_complete_pendbuf(void * buf)1932 void armci_complete_pendbuf(void *buf) {
1933   vapibuf_pend_t *pbuf = (vapibuf_pend_t *)buf;
1934   request_header_t *msginfo=(request_header_t*)pbuf->buf;
1935 
1936   assert(pbuf->vbuf);
1937 #if SRI_CORRECT
1938   pbuf->cbuf->send_pending=0;
1939 #else
1940   _armci_pendbuf_post_immbuf(pbuf->vbuf,msginfo->from);
1941 #endif
1942   armci_data_server(pbuf);
1943   if(msginfo->operation==PUT || ARMCI_ACC(msginfo->operation)) {
1944     SERVER_SEND_ACK(msginfo->from);
1945   }
1946 #if SRI_CORRECT
1947 #error
1948  assert(!pbuf->cbuf->send_pending);
1949   _armci_pendbuf_post_immbuf(pbuf->cbuf,msginfo->from);
1950 #endif
1951 }
1952 
/* Forward declarations of the two RDMA helpers used by the pending-buffer
 * entry points below; both are defined further down in this file. */
void _armci_get_data_from_client(int proc, struct ibv_send_wr *sdscr,
                                 int dscrid, struct ibv_sge *ssg_entry,
                                 void *rbuf, void *lbuf, int bytes);

void _armci_send_data_to_client_pbuf(int proc, struct ibv_send_wr *sdscr,
                                     int dscrid, struct ibv_sge *ssg_entry,
                                     void *rbuf, void *lbuf, int bytes);
1959 
no_srv_copy_nsegs_ulimit()1960 int no_srv_copy_nsegs_ulimit() {
1961   return armci_max_qp_ous_swr*armci_max_num_sg_ent/10;
1962 }
1963 
1964 /** Initiate a get operation to progress a pending buffer.
1965  * @param msginfo Request header for any additional processing
1966  * @param src Pointer to src of data (remote for GET)
1967  * @param dst Pointer to dst
1968  * @param bytes #bytes to transfer
1969  * @param proc proc to transfer from(for get)/to(for put)
1970  * @param pbufid Index of pending buffer
1971  */
armci_pbuf_start_get(void * msg_info,void * src,void * dst,int bytes,int proc,int pbufid)1972 void armci_pbuf_start_get(void *msg_info, void *src, void *dst,
1973 			  int bytes, int proc, int pbufid) {
1974   struct ibv_send_wr sdscr;
1975   struct ibv_sge sg_entry;
1976   int wrid = PBUF_BUFID_TO_GET_WRID(pbufid);
1977   request_header_t *msginfo=(request_header_t *)msg_info;
1978   void armci_server_rdma_contig_to_strided(char *src_ptr, int proc,
1979 					   char *dst_ptr,
1980 					   int dst_stride_arr[],
1981 					   int seg_count[],
1982 					   int stride_levels,
1983 					   request_header_t *msginfo);
1984 
1985 
1986 #if defined(PUT_NO_SRV_COPY)
1987   if(msginfo->operation==PUT && msginfo->format==STRIDED
1988      && !msginfo->pinned && src==msginfo->tag.data_ptr)   {
1989     char *loc_ptr, *rem_ptr;
1990     int stride_levels, *count;
1991     int *loc_stride_arr;
1992     char *dscr = (char *)(msginfo+1);
1993     ARMCI_MEMHDL_T *mhloc=NULL;
1994     int nsegs, i;
1995 
1996     /* unpack descriptor record */
1997     loc_ptr = *(void**)dscr;           dscr += sizeof(void*);
1998     stride_levels = *(int*)dscr;       dscr += sizeof(int);
1999     loc_stride_arr = (int*)dscr;       dscr += stride_levels*sizeof(int);
2000     count = (int*)dscr;
2001 
2002     rem_ptr = msginfo->tag.data_ptr;
2003 
2004     nsegs = 1;
2005     for(i=0; i<stride_levels; i++)
2006       nsegs *= count[i+1];
2007 
2008     dassert(1,proc==msginfo->from);
2009     if(nsegs<no_srv_copy_nsegs_ulimit() &&
2010        get_armci_region_local_hndl(loc_ptr,armci_clus_id(armci_me),&mhloc)) {
2011 /*       printf("%d(s): direct rdma from client buffers to server-side memory\n",armci_me); */
2012 /*       fflush(stdout); */
2013 
2014       armci_server_rdma_contig_to_strided(rem_ptr, proc,
2015 					  loc_ptr,loc_stride_arr,
2016 					  count, stride_levels,
2017 					  msginfo);
2018     return;
2019    }
2020   }
2021 #endif
2022 /*   printf("%d(s): rdma from client buffers to pending buffers\n",armci_me); */
2023 /*   fflush(stdout);   */
2024   _armci_get_data_from_client(proc,&sdscr,wrid,&sg_entry,src,dst,bytes);
2025 }
2026 
2027 /** Initiate a put operation to progress a pending buffer.
2028  * @param src Pointer to src of data (local for PUT)
2029  * @param dst Pointer to dst
2030  * @param bytes #bytes to transfer
2031  * @param proc proc to transfer from(for get)/to(for put)
2032  * @param pbufid Index of pending buffer
2033  */
armci_pbuf_start_put(void * src,void * dst,int bytes,int proc,int pbufid)2034 void armci_pbuf_start_put(void *src, void *dst, int bytes, int proc,
2035 			  int pbufid) {
2036   struct ibv_send_wr sdscr;
2037   struct ibv_sge sg_entry;
2038   int wrid = PBUF_BUFID_TO_PUT_WRID(pbufid);
2039 
2040   _armci_send_data_to_client_pbuf(proc,&sdscr,wrid,&sg_entry,src,dst,bytes);
2041 }
2042 
2043 /**
2044   * function to get data from remote client called by data
2045   * server. Note that this is only called for pending buffers.
2046   * @param proc IN the id of remote client
2047   * @param sdscr IN/OUT Descriptor to be used to post the get
2048   * @param dscrid IN ID to be used for the descriptor
2049   * @param ssg_entry IN Scatter/gather list
2050   * @param rbuf IN the remote buffer to get from
2051   * @param lbuf IN local buf to get the data into, this is the queue buffer for SERVER_QUEUE path
2052   * @param bytes IN the size of get
2053   * @see SERVER_QUEUE
2054   * @see armci_send_data_to_client
2055   */
_armci_get_data_from_client(int proc,struct ibv_send_wr * sdscr,int dscrid,struct ibv_sge * ssg_entry,void * rbuf,void * lbuf,int bytes)2056 /*static*/ void _armci_get_data_from_client(int proc, struct ibv_send_wr *sdscr,
2057 				int dscrid, struct ibv_sge *ssg_entry,
2058 				void *rbuf, void *lbuf, int bytes)
2059 {
2060     int rc = 0;
2061 
2062     if(DEBUG_SERVER){
2063        printf("\n%d(s):sending data to client %d at %p flag = %p bytes=%d\n",
2064                armci_me,
2065                proc,lbuf,(char *)lbuf+bytes-sizeof(int),bytes);fflush(stdout);
2066     }
2067 
2068     memset(sdscr,0,sizeof(struct ibv_send_wr));
2069     armci_init_cbuf_rrdma(sdscr,ssg_entry,lbuf,rbuf,bytes,
2070                           &serv_memhandle,(handle_array+proc));
2071 
2072     if(DEBUG_SERVER){
2073        printf("\n%d(s):handle_array[%d]=%p lbuf=%p flag=%p bytes=%d\n",armci_me,
2074               proc,(void*)&handle_array[proc],(char *)lbuf,
2075               (char *)lbuf+bytes-sizeof(int),bytes);
2076        fflush(stdout);
2077     }
2078 
2079     assert(sizeof(request_header_t)+bytes<PENDING_BUF_LEN);
2080 
2081     sdscr->wr_id = dscrid;
2082     struct ibv_send_wr *bad_wr;
2083     rc = ibv_post_send((CLN_con+proc)->qp, sdscr, &bad_wr);
2084     dassert1(1,rc==0,rc);
2085 }
2086 
_armci_send_data_to_client_pbuf(int proc,struct ibv_send_wr * sdscr,int dscrid,struct ibv_sge * ssg_entry,void * rbuf,void * lbuf,int bytes)2087 void _armci_send_data_to_client_pbuf(int proc, struct ibv_send_wr *sdscr,
2088 				     int dscrid, struct ibv_sge *ssg_entry,
2089 				     void *rbuf, void *lbuf, int bytes)  {
2090     int rc = 0;
2091 
2092     if(DEBUG_SERVER) {
2093        printf("\n%d(s):sending data to client %d at %p flag = %p bytes=%d\n",
2094                armci_me,
2095                proc,rbuf,(char *)rbuf+bytes-sizeof(int),bytes);fflush(stdout);
2096     }
2097     memset(sdscr,0,sizeof(struct ibv_send_wr));
2098     armci_init_cbuf_srdma(sdscr,ssg_entry,lbuf,rbuf,bytes,
2099                           &serv_memhandle,(handle_array+proc));
2100     if(DEBUG_SERVER){
2101        printf("\n%d(s):handle_array[%d]=%p dbuf=%p flag=%p bytes=%d\n",armci_me,
2102               proc,(void*)&handle_array[proc],(char *)rbuf,
2103               (char *)rbuf+bytes-sizeof(int),bytes);
2104        fflush(stdout);
2105     }
2106     sdscr->wr_id = dscrid;
2107     struct ibv_send_wr *bad_wr;
2108     rc = ibv_post_send((CLN_con+proc)->qp, sdscr, &bad_wr);
2109     dassert1(1,rc==0,rc);
2110 }
2111 #endif
2112 
2113 #define DATA_SERVER_YIELD_CPU
armci_call_data_server()2114 void armci_call_data_server()
2115 {
2116 int rc = 0;
2117 int rc1 = 0;
2118 vapibuf_t *cbuf,*cbufs;
2119 request_header_t *msginfo,*msg;
2120 int c,i;
2121 static int mytag=1;
2122 #ifdef CHANGE_SERVER_AFFINITY
2123 int rrr,serverwcount=0;
2124 #else
2125 #ifdef DATA_SERVER_YIELD_CPU_
2126 int serverwcount=0;
2127 #endif
2128 #endif
2129 
2130 #ifdef CHANGE_SERVER_AFFINITY
2131 cpu_set_t mycpuid,new_mask;
2132 char str[CPU_SETSIZE];
2133 char cid[8];
2134 extern char * cpuset_to_cstr(cpu_set_t *mask, char *str);
2135 int nslave=armci_clus_info[armci_clus_me].nslave;
2136     rrr=sched_getaffinity(0, sizeof(mycpuid), &mycpuid);
2137 #endif
2138 
2139 #if ARMCI_ENABLE_GPC_CALLS
2140     unblock_thread_signal(GPC_COMPLETION_SIGNAL);
2141 #endif
2142 #if defined(PEND_BUFS)
2143     armci_pendbuf_init();
2144 #endif
2145 
2146     for (;;) {
2147       struct ibv_wc *pdscr=NULL;
2148       struct ibv_wc pdscr1;
2149       pdscr = &pdscr1;
2150       pdscr->status = IBV_WC_SUCCESS;
2151       rc = 0;
2152 #ifdef CHANGE_SERVER_AFFINITY
2153       static int ccc;
2154       serverwcount++;
2155       if(serverwcount==100){
2156         serverwcount=0;
2157         ccc=(ccc+1)%nslave;
2158         sprintf (cid, "%d", ccc);
2159         rrr = cstr_to_cpuset(&new_mask,cid);
2160         if (sched_setaffinity(0, sizeof (new_mask), &new_mask)) {
2161           perror("sched_setaffinity");
2162           printf("failed to set pid %d's affinity.\n", getpid());
2163         }
2164         rrr=sched_getaffinity(0, sizeof(mycpuid), &mycpuid);
2165         if(rrr)perror("sched_getaffinity");
2166       }
2167 #else
2168 #ifdef DATA_SERVER_YIELD_CPU_
2169       serverwcount++;
2170       if(serverwcount==50){
2171         serverwcount=0;usleep(1);
2172       }
2173 #endif
2174 #endif
2175 
2176 #if ARMCI_ENABLE_GPC_CALLS
2177       block_thread_signal(GPC_COMPLETION_SIGNAL);
2178 #endif
2179       bzero(pdscr, sizeof(*pdscr));
2180       do {
2181         rc = ibv_poll_cq(CLN_nic->rcq, 1, pdscr);
2182 	if (armci_server_terminating) {
2183 	  /* server is interrupted when clients terminate connections */
2184 	  armci_server_transport_cleanup();
2185 	  sleep(1);
2186 	  _exit(0);
2187 	}
2188 	if (rc == 0 && !armci_openib_server_poll) {
2189 	  /* wait for a notify event */
2190           rc1 = ibv_get_cq_event(CLN_nic->rch,&CLN_nic->rcq,&CLN_nic->rcq_cntx);
2191           dassert1(1,rc1==0,rc1);
2192           ibv_ack_cq_events(CLN_nic->rcq, 1);
2193 	  /* re-arm notify event */
2194           rc1 = ibv_req_notify_cq(CLN_nic->rcq, 0);
2195           dassert1(1,rc1==0,rc1);
2196 	  /* note: an event receive does not guarantee an actual completion */
2197 	  continue;
2198 	}
2199       } while (rc == 0);
2200 
2201       if(DEBUG_SERVER) {
2202         printf("\n%d:pdscr=%p %p %d %d %d %d\n",armci_me,(void*)pdscr,(void*)&pdscr1,
2203                            pdscr->status,pdscr->opcode,pdscr->vendor_err,
2204                            pdscr->src_qp);
2205         fflush(stdout);
2206       }
2207       dassertp(1,rc>=0,("%d: rc=%d id=%d status=%d",
2208 			armci_me,rc,(int)pdscr->wr_id,pdscr->status));
2209       dassert1(1,pdscr->status==IBV_WC_SUCCESS,pdscr->status);
2210 
2211        if (DEBUG_SERVER) {
2212          printf("%d(s) : NEW MESSAGE bytelen %d \n",armci_me,pdscr->byte_len);
2213          printf("%d(s) : NEW MESSAGE id is %ld \n",armci_me,pdscr->wr_id);
2214          fflush(stdout);
2215        }
2216 #if defined(PEND_BUFS)
2217       if(pdscr->wr_id>=DSCRID_IMMBUF_RESP && pdscr->wr_id<DSCRID_IMMBUF_RESP_END) {
2218 /* 	fprintf(stderr, "%d(s) : Got server response msg completion\n", armci_me); */
2219 #if SRI_CORRECT
2220 	int id = pdscr->wr_id - DSCRID_IMMBUF_RESP;
2221 	if(id>=0 && id<armci_nproc*(IMM_BUF_NUM+1)) {
2222 	  int dest = id/(IMM_BUF_NUM+1);
2223 	  dassert(1,serv_buf_arr[id]->send_pending==1);
2224 	  serv_buf_arr[id]->send_pending = 0;
2225 	  _armci_pendbuf_post_immbuf(serv_buf_arr[id],dest);
2226 	}
2227 #endif
2228 	continue;
2229       }
2230        if (pdscr->wr_id>=DSCRID_PENDBUF && pdscr->wr_id<DSCRID_PENDBUF_END) {
2231 	 int pbufid = PBUF_WRID_TO_PBUFID(pdscr->wr_id);
2232 /* 	 printf("%d(s) : Progressing pending msg (something completed) pbufid=%d id=%ld byte_len=%d status=%d\n", armci_me, pbufid,pdscr->wr_id,pdscr->byte_len,done_status); */
2233 /* 	 fflush(stdout); */
2234 	 if(PBUF_IS_GET_WRID(pdscr->wr_id))
2235 	   armci_pendbuf_done_get(pbufid);
2236 	 else if(PBUF_IS_PUT_WRID(pdscr->wr_id))
2237 	   armci_pendbuf_done_put(pbufid);
2238 	 else
2239 	   armci_die("Pending buffer op completed. But not PUT or GET!",pdscr->wr_id);
2240 	 continue;
2241        }
2242 #endif
2243        if (pdscr->wr_id >= DSCRID_SCATGAT && pdscr->wr_id < DSCRID_SCATGAT_END) {
2244 #if defined(PEND_BUFS)
2245 	 sr_descr_t *sdscr_arr;
2246 #else
2247 	 sr_descr_t *rdscr_arr;
2248 #endif
2249          if (DEBUG_SERVER) {
2250            printf("%d(s) : received SCATGAT DATA id = %ld, length = %d\n",
2251                   armci_me,pdscr->wr_id, pdscr->byte_len);
2252            fflush(stdout);
2253 	 }
2254 #if defined(PEND_BUFS)
2255 	 sdscr_arr = armci_vapi_serv_nbsdscr_array;
2256 	 assert(sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends>0);
2257 	 sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends--;
2258 	 if(sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends==0)
2259 	     sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].tag=0;
2260 #else
2261 	 rdscr_arr = armci_vapi_serv_nbrdscr_array;
2262 	 rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofrecvs--;
2263 	 if(rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofrecvs==0)
2264 	   rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].tag=0;
2265 #endif
2266          continue;
2267        }
2268 
2269 #if defined(PEND_BUFS)
2270        assert(pdscr->wr_id>=DSCRID_IMMBUF_RECV && pdscr->wr_id<DSCRID_IMMBUF_RECV_END);
2271 #endif
2272        cbuf = serv_buf_arr[DSCRID_TO_IMMBUFID(pdscr->wr_id)];
2273        assert(cbuf->dscr.wr_id == pdscr->wr_id);
2274 
2275        msginfo = (request_header_t*)cbuf->buf;
2276        armci_ack_proc = c = msginfo->from;
2277 
2278        if (DEBUG_SERVER) {
2279          printf("%d(s) : request id is %ld operation is %d, length is %d from=%d cbuf->dscr.wr_id=%d\n",
2280 		armci_me,pdscr->wr_id,msginfo->operation,pdscr->byte_len,msginfo->from, (int)cbuf->dscr.wr_id);
2281          fflush(stdout);
2282        }
2283 
2284 #if defined(PEND_BUFS)
2285        cbufs = cbuf;
2286        armci_init_vapibuf_recv(&cbufs->dscr, &cbufs->sg_entry,cbufs->buf,
2287 			       IMM_BUF_LEN, &serv_memhandle);
2288        cbufs->dscr.wr_id = pdscr->wr_id;
2289 #else
2290        cbufs = serv_buf_arr[pdscr->wr_id - armci_nproc] = spare_serv_buf;
2291        armci_init_vapibuf_recv(&cbufs->dscr, &cbufs->sg_entry,cbufs->buf,
2292 			       CBUF_DLEN, &serv_memhandle);
2293        cbufs->dscr.wr_id = c + armci_nproc;
2294 
2295        spare_serv_buf = cbuf;
2296 #endif
2297 
2298        if(DEBUG_SERVER) {
2299 	 printf("%d(s):Came out of poll id=%ld\n",armci_me,pdscr->wr_id);
2300 	 fflush(stdout);
2301        }
2302 
2303        if(msginfo->operation == PUT &&msginfo->pinned == 1){
2304 	 int found, num;
2305 	 int stride_arr[MAX_STRIDE_LEVEL]; /*should be MAX_STRIDE_LEVELS*/
2306 	 int count[MAX_STRIDE_LEVEL];
2307 	 void *dest_ptr;
2308 	 int stride_levels;
2309 	 ARMCI_MEMHDL_T *loc_memhandle;
2310 	 void armci_post_scatter(void *,int *,int *,int, armci_vapi_memhndl_t *,int,int,int,sr_descr_t **);
2311 
2312 	 /*unpack decsriptor_record : should call a function instead */
2313 	 msg = msginfo + 1;
2314 	 test_ptr = dest_ptr = *(void**)msg;
2315 	 msg = (request_header_t *) ((char*)msg + sizeof(void*));
2316 	 test_stride_levels=stride_levels = *(int*)msg;
2317 	 msg = (request_header_t *) ((char*)msg + sizeof(int));
2318 	 for(i =0; i<stride_levels; i++){
2319 	   test_stride_arr[i] = stride_arr[i] = *(int*)msg;
2320 	   msg = (request_header_t*) ((int*)msg + 1);
2321 	 }
2322 	 for(i=0; i<stride_levels+1; i++){
2323 	   test_count[i] = count[i] = *(int*)msg;
2324 	   msg = (request_header_t*) ((int*)msg + 1);
2325 	 }
2326 
2327 	 if (DEBUG_SERVER) {
2328 	   printf(" server:the dest_ptr is %p\n", dest_ptr);
2329 	   for(i =0; i<stride_levels; i++)
2330 	     printf("stride_arr[i] is %d,value of count[i] is %d\n",
2331 		    stride_arr[i], count[i]);
2332 	   printf("the value of stride_levels is %d\n", stride_levels);
2333 	   fflush(stdout);
2334 	 }
2335 
2336 	 found =get_armci_region_local_hndl(dest_ptr,armci_me, &loc_memhandle);
2337 	 dassertp(1,found!=0,("%d:SERVER : local region not found id=%d",
2338 			      armci_me,pdscr->wr_id));
2339 
2340 	 if(DEBUG_SERVER) {
2341 	   printf("%d(s) : about to call armci_post_scatter\n",armci_me);
2342 	   fflush(stdout);
2343 	 }
2344 
2345 	 armci_post_scatter(dest_ptr, stride_arr, count, stride_levels,
2346 			    loc_memhandle,msginfo->from, mytag, SERV,NULL );
2347 
2348 	 mytag = (mytag+1)%(MAX_PENDING);
2349 	 if(mytag==0)mytag=1;
2350 
2351 	 if(DEBUG_SERVER) {
2352 	   printf("%d(s) : finished posting %d scatter\n",armci_me,num);
2353 	   fflush(stdout);
2354 	 }
2355 	 _armci_pendbuf_post_immbuf(cbufs, msginfo->from);
2356 	 SERVER_SEND_ACK(msginfo->from);
2357        }
2358        else if(msginfo->operation == REGISTER){
2359 	 if (DEBUG_SERVER) {
2360             printf("%d(s) : Register_op id is %d, comp_dscr_id is  %ld\n",
2361                      armci_me,msginfo->operation,pdscr->wr_id);
2362             fflush(stdout);
2363           }
2364 
2365           armci_server_register_region(*((void **)(msginfo+1)),
2366                            *((long *)((char *)(msginfo+1)+sizeof(void *))),
2367                            (ARMCI_MEMHDL_T *)(msginfo->tag.data_ptr));
2368 	  _armci_pendbuf_post_immbuf(cbufs, msginfo->from);
2369           *(long *)(msginfo->tag.ack_ptr) = ARMCI_STAMP;
2370           continue;
2371        }
2372        else {
2373          if(DEBUG_SERVER) {
2374 	   printf("%d(s) : request is %ld about to call armci_data_server\n",
2375 		  armci_me, pdscr->wr_id);
2376 	   fflush(stdout);
2377          }
2378 #if defined(PEND_BUFS)
2379 	 armci_pendbuf_service_req(cbuf);
2380 #else
2381 	 _armci_pendbuf_post_immbuf(cbufs, msginfo->from);
2382 	 armci_data_server(cbuf);
2383 
2384 	 if((msginfo->operation == PUT) || ARMCI_ACC(msginfo->operation)) {
2385 	   /* for operations that do not send data back we can send ACK now */
2386 	   SERVER_SEND_ACK(msginfo->from);
2387 	   if(DEBUG_SERVER){
2388 	     printf("%d(s) : posted ack\n\n",armci_me);
2389 	     fflush(stdout);
2390 	   }
2391 	 }
2392 #endif
2393        }
2394        if (0) {
2395 	 printf("%d(s):Done processed request\n\n",armci_me);
2396 	 fflush(stdout);
2397        }
2398 
2399 #if ARMCI_ENABLE_GPC_CALLS
2400        unblock_thread_signal(GPC_COMPLETION_SIGNAL);
2401 #endif
2402    }/* end of for */
2403 }
2404 
2405 
/**
 * Block until the communication attached to a request buffer has finished.
 *
 * Returns at once for a GET whose BUF_INFO_T tag is set. Otherwise, when
 * `snd` is set and the send is not yet marked complete, it is completed via
 * armci_send_complete(). When `rcv` is set the function then spins:
 *  - PUT / accumulate: until the ack word in the header equals ARMCI_STAMP
 *    (no wait for a bypass+pinned+strided PUT), then clears the ack word;
 *  - anything else: while the last int of the data area still equals
 *    ARMCI_STAMP and the ack word does not.
 *
 * @param field descriptor area of the buffer; a BUF_INFO_T precedes it and
 *              the request header follows it
 * @param snd   non-zero to complete the send
 * @param rcv   non-zero to wait for the response
 * @param to    not used here
 * @param op    operation code of the request
 */
void armci_vapi_complete_buf(armci_vapi_field_t *field,int snd,int rcv,int to,int op)
{
  BUF_INFO_T *info = (BUF_INFO_T *)((char *)field-sizeof(BUF_INFO_T));

  if(info->tag && op==GET)return;

  if(snd){
    struct ibv_send_wr *snd_dscr = &(field->sdscr);
    if(mark_buf_send_complete[snd_dscr->wr_id]==0)
      armci_send_complete(snd_dscr,"armci_vapi_complete_buf",1);
  }

  if(!rcv) return;

  {
    request_header_t *msginfo = (request_header_t *)(field+1);
    long *flag = (long *)&msginfo->tag.ack;
    int loop = 0;

    if(op==PUT || ARMCI_ACC(op)){
      const int skip_wait = msginfo->bypass && msginfo->pinned &&
                            msginfo->format == STRIDED && op == PUT;
      if(!skip_wait){
        while(armci_util_long_getval(flag) != ARMCI_STAMP) {
          loop = (loop+1)%100000;
        }
      }
      *flag = 0L;
    }
    else{
      /*SK: I think we get here only for GET with result directly
        going to client's pinned memory. (info.tag==0 && op==GET)*/
      int *last = (int *)((char *)msginfo+msginfo->datalen-sizeof(int));

      while(armci_util_int_getval(last) == ARMCI_STAMP &&
            armci_util_long_getval(flag)  != ARMCI_STAMP){
        loop = (loop+1)%100000;
        if(loop==0 && DEBUG_CLN){
          printf("%d: client last(%p)=%d flag(%p)=%ld off=%d\n",
                 armci_me,(void*)last,*last,(void*)flag,*flag,msginfo->datalen);
          fflush(stdout);
        }
      }
    }
  }
}
2462 
/**
 * Non-blocking counterpart of armci_vapi_complete_buf(): stores in *retval
 * whether the communication attached to a request buffer has finished.
 *
 * *retval is first set to 0 and stays 0 when
 *  - the request is a GET whose BUF_INFO_T tag is set, or
 *  - `snd` is set and the send is not yet marked complete, or
 *  - `rcv` is zero.
 * NOTE(review): in the last case the result is 0 even if the send has
 * completed - confirm that callers never test send-only buffers.
 *
 * With `rcv` set, *retval becomes 1 when
 *  - PUT / accumulate: the request is a bypass+pinned+strided PUT, or the
 *    ack word in the header equals ARMCI_STAMP;
 *  - anything else: the last int of the data area no longer equals
 *    ARMCI_STAMP, or the ack word equals ARMCI_STAMP.
 *
 * @param field  descriptor area of the buffer; a BUF_INFO_T precedes it and
 *               the request header follows it
 * @param snd    non-zero to take the send into account
 * @param rcv    non-zero to take the response into account
 * @param to     not used here
 * @param op     operation code of the request
 * @param retval OUT 1 if finished, 0 otherwise
 */
void armci_vapi_test_buf(armci_vapi_field_t *field,int snd,int rcv,int to,int op, int *retval)
{
  BUF_INFO_T *info = (BUF_INFO_T *)((char *)field-sizeof(BUF_INFO_T));

  *retval = 0;

  if(info->tag && op==GET) return;

  /* send still in flight */
  if(snd && mark_buf_send_complete[field->sdscr.wr_id]==0) return;

  if(!rcv) return;

  {
    request_header_t *msginfo = (request_header_t *)(field+1);
    long *flag = (long *)&msginfo->tag.ack;

    if(op==PUT || ARMCI_ACC(op)){
      if((msginfo->bypass && msginfo->pinned && msginfo->format == STRIDED &&
          op == PUT)
         || armci_util_long_getval(flag) == ARMCI_STAMP)
        *retval = 1;
    }
    else{
      /*SK: I think we get here only for GET with result directly
        going to client's pinned memory. (info.tag==0 && op==GET)*/
      int *last = (int *)((char *)msginfo+msginfo->datalen-sizeof(int));

      if(armci_util_int_getval(last) != ARMCI_STAMP ||
         armci_util_long_getval(flag)  == ARMCI_STAMP)
        *retval = 1;
    }
  }
}
2511 
2512 
armci_vapi_post_send(int isclient,int con_offset,struct ibv_send_wr * snd_dscr,char * from)2513 static inline void armci_vapi_post_send(int isclient,int con_offset,
2514                                         struct ibv_send_wr *snd_dscr,char *from)
2515 {
2516     int rc = 0;
2517     /*vapi_nic_t *nic;*/
2518     armci_connect_t *con;
2519     int total = 0;
2520 
2521     if(!isclient){
2522        /*nic = CLN_nic;*/
2523        con = CLN_con+con_offset;
2524     }
2525     else{
2526        /*nic = SRV_nic;*/
2527        con = SRV_con+con_offset;
2528     }
2529 
2530     if(DEBUG_CLN){
2531        printf("vapi_post_send: snd_dscr->num_sge=%d, snd_dscr->sg_list->length=%d\n",
2532               snd_dscr->num_sge, snd_dscr->sg_list->length);
2533        fflush(stdout);
2534     }
2535 
2536 
2537     /* find the total length of all the segments */
2538     total = snd_dscr->sg_list->length * snd_dscr->num_sge;
2539     if(DEBUG_CLN){
2540        printf("%d(c) : total is %d\t, max_size is %d\n",armci_me,total,
2541                     armci_vapi_max_inline_size);
2542     }
2543 
2544     struct ibv_send_wr *bad_wr;
2545     if (total > armci_vapi_max_inline_size) {
2546         rc = ibv_post_send(con->qp, snd_dscr, &bad_wr);
2547     } else {
2548         rc = ibv_post_send(con->qp, snd_dscr, &bad_wr);
2549         /* no corresponding call, using ibv_post_send
2550        rc = EVAPI_post_inline_sr(nic->handle,con->qp,snd_dscr);*/
2551     }
2552     dassert1(1,rc==0,rc);
2553 }
2554 
/** Send a request message to the server that serves process `proc`.
 *
 * `buf` starts with a request_header_t; descriptor and data follow it.
 * The routine, while holding armci_user_threads.net_lock:
 *   - for a non-pinned GET, writes ARMCI_STAMP into the last int of the
 *     area following the header (and, with GET_STRIDED_COPY_PIPELINED, at
 *     regular intervals inside it for STRIDED requests);
 *   - for an ACK, writes ARMCI_STAMP+1 into the first two ints after the
 *     header;
 *   - with PEND_BUFS, decides whether the message goes out as an immediate
 *     message (tag.imm_msg) and reduces `bytes` when it does not;
 *   - fills in tag.data_ptr / tag.data_len, clears tag.ack and points
 *     tag.ack_ptr at it;
 *   - initialises the send descriptor stored with the buffer and posts it
 *     through armci_vapi_post_send().
 *
 * @param proc  destination process; armci_clus_id(proc) selects the server
 * @param buf   request buffer beginning with a request_header_t
 * @param bytes number of bytes to send (may be lowered when PEND_BUFS is
 *              defined)
 * @return always 0
 */
int armci_send_req_msg(int proc, void *buf, int bytes)
{
  int cluster = armci_clus_id(proc);
    request_header_t *msginfo = (request_header_t *)buf;
    struct ibv_send_wr *snd_dscr;
    struct ibv_sge *ssg_lst;

    /* held until after the request has been posted */
    THREAD_LOCK(armci_user_threads.net_lock);

    check_state_of_ib_connection(proc, 0);

    /* send descriptor and scatter/gather entry are looked up from the
       buffer address via the BUF_TO_* macros */
    snd_dscr = BUF_TO_SDESCR((char *)buf);
    ssg_lst  = BUF_TO_SSGLST((char *)buf);

    /*Stamp end of buffers as needed*/
    if(msginfo->operation == GET && !msginfo->pinned) {
      const int dscrlen = msginfo->dscrlen;
      const int datalen = msginfo->datalen;
      int *last;
      /* NOTE(review): sizeof(int) makes the right-hand side unsigned, so
         the comparison is unsigned. armci_ReadFromDirect applies the same
         test (on its len argument) to find the stamp; keep both in step. */
      if(dscrlen < (datalen - sizeof(int)))
	last = (int*)(((char*)(msginfo+1))+(datalen-sizeof(int)));
      else
	last = (int*)(((char*)(msginfo+1))+(dscrlen+datalen-sizeof(int)));
      *last = ARMCI_STAMP;
#ifdef GET_STRIDED_COPY_PIPELINED
      if(msginfo->format == STRIDED) {
	/* one stamp every GET_STRIDED_COPY_PIPELINED_SIZE bytes, counted
	   in ints, between the end of the descriptor and `last` */
	const int ssize = GET_STRIDED_COPY_PIPELINED_SIZE/sizeof(int);
	int *sfirst = (int*)(dscrlen+(char*)(msginfo+1))+ssize; /*stamping
							    can start here*/
	int *slast = last, *ptr;
	for(ptr=sfirst; ptr<slast; ptr+=ssize) {
	  *ptr = ARMCI_STAMP;
	}
      }
#endif
    }
    if(msginfo->operation == ACK) {
      /* the two ints right after the header get ARMCI_STAMP+1 */
      *(int *)(msginfo +1) = ARMCI_STAMP+1;
      *(((int *)(msginfo +1))+1) = ARMCI_STAMP+1;
    }


#if defined(PEND_BUFS)
    /* PUT/accumulate larger than an immediate buffer: send only header and
       descriptor (capped at IMM_BUF_LEN) and mark it non-immediate */
    if((msginfo->operation==PUT || ARMCI_ACC(msginfo->operation))
       && bytes > IMM_BUF_LEN) {
      msginfo->tag.imm_msg=0;
      assert(sizeof(request_header_t)<IMM_BUF_LEN); /*sanity check*/
      bytes = ARMCI_MIN(bytes-msginfo->datalen, IMM_BUF_LEN);
      assert(bytes==IMM_BUF_LEN||(bytes==sizeof(*msginfo)+msginfo->dscrlen));
    }
    /* GET whose header+descriptor+data does not fit below IMM_BUF_LEN:
       likewise non-immediate, header and descriptor only */
    else if(msginfo->operation==GET
	    && !(msginfo->datalen+sizeof(request_header_t)+msginfo->dscrlen<IMM_BUF_LEN)) {
      assert(sizeof(request_header_t) < IMM_BUF_LEN);
      msginfo->tag.imm_msg=0;
      bytes = ARMCI_MIN(sizeof(request_header_t)+msginfo->dscrlen, IMM_BUF_LEN);
    }
#if defined(PUT_NO_SRV_COPY) && 0 /*SK:disabled. Imm msgs are sent inline
				    for latency reasons*/
    else if(msginfo->operation==PUT && !msginfo->pinned && msginfo->format==STRIDED && msginfo->tag.data_len>=2048) {
      msginfo->tag.imm_msg = 0;
      assert(sizeof(request_header_t)<IMM_BUF_LEN); /*sanity check*/
      bytes = ARMCI_MIN(bytes-msginfo->datalen, IMM_BUF_LEN);
      assert(bytes==IMM_BUF_LEN||(bytes==sizeof(*msginfo)+msginfo->dscrlen));
    }
#endif
    else{
      /* everything else is sent as an immediate message */
      msginfo->tag.imm_msg=1;
    }
/*    printf("%d: send_req: op=%d bytes=%d data_len=%d imm=%d\n",*/
/*	   armci_me, msginfo->operation, bytes, msginfo->datalen,msginfo->tag.imm_msg);*/
/*    fflush(stdout);*/
    if(bytes<0 || bytes>IMM_BUF_LEN) {
      printf("%d(pid=%d): Trying to send too large a mesg. op=%d bytes=%d(max=%d) to=%d\n", armci_me, getpid(),msginfo->operation,bytes,IMM_BUF_LEN, proc);
      fflush(stdout);
      /* suspends the process until a signal arrives -- presumably so a
         debugger can be attached; the asserts below then fail */
      pause();
      assert(bytes>=0);
      assert(bytes <= IMM_BUF_LEN);
    }
    _armci_buf_ensure_pend_outstanding_op_per_node(buf,cluster);
/*     printf("%d: send_req. ensured pend os per node. to=%d op=%d\n", armci_me, msginfo->to,msginfo->operation); */
/*     fflush(stdout); */
#else
    _armci_buf_ensure_one_outstanding_op_per_node(buf,cluster);
#endif

    /* record in the tag where the payload (or reply area) lives */
    if(msginfo->operation == PUT || ARMCI_ACC(msginfo->operation)){
#if defined(PEND_BUFS)
      if(!msginfo->tag.imm_msg){
        /* data sits right after the descriptor in this buffer */
        msginfo->tag.data_ptr = (char *)(msginfo+1)+msginfo->dscrlen;
        msginfo->tag.data_len = msginfo->datalen;
      }
      else
	msginfo->tag.data_ptr = NULL;
#else
      {
          msginfo->tag.data_ptr = (void *)&msginfo->tag.ack;
      }
#endif
    }
    else {
       /* same dscrlen/datalen test as used for stamping above */
       if(msginfo->operation == GET && !msginfo->bypass && msginfo->dscrlen
                       >= (msginfo->datalen-sizeof(int)))
         msginfo->tag.data_ptr = (char *)(msginfo+1)+msginfo->dscrlen;
       else
         msginfo->tag.data_ptr = GET_DATA_PTR(buf);
    }

    /*this has to be reset so that we can wait on it
      see ReadFromDirect*/
    msginfo->tag.ack = 0;
    msginfo->tag.ack_ptr = &(msginfo->tag.ack);

    if(DEBUG_CLN){
       printf("%d:the ack_ptr is initialised to %p, ack->value is %ld\n",
                 armci_me,msginfo->tag.ack_ptr,msginfo->tag.ack);fflush(stdout);
    }

    /* describe the first `bytes` bytes of buf in the send descriptor */
    armci_init_vapibuf_send(snd_dscr, ssg_lst,buf,
                            bytes, &client_memhandle);

/*    printf("%d: Sending req wr_id=%d to=%d\n",armci_me,snd_dscr->wr_id,proc);*/
/*    fflush(stdout);*/
    armci_vapi_post_send(1,cluster,snd_dscr,"send_req_msg:post_send");

    THREAD_UNLOCK(armci_user_threads.net_lock);

    if(DEBUG_CLN){
       printf("%d:client sent REQ=%d %d bytes serv=%d qp=%p id =%ld lkey=%d\n",
               armci_me,msginfo->operation,bytes,cluster,
               (void*)(SRV_con+cluster)->qp,snd_dscr->wr_id,ssg_lst->lkey);
       fflush(stdout);
    }
    return(0);
}
2691 
2692 
2693 /*\
2694  *  client waits for first phase ack before posting gather desr
2695 \*/
armci_wait_ack(char * buffer)2696 void armci_wait_ack(char *buffer)
2697 {
2698    long *flag;
2699    request_header_t *msginfo = (request_header_t *)(buffer);
2700    flag = (long*)&msginfo->tag.ack;
2701 
2702    while(armci_util_long_getval(flag) != ARMCI_STAMP);
2703    flag = 0;
2704 }
2705 
2706 
2707 
2708 
/*\ RDMA put of a contiguous buffer
 *
 *  Takes a send descriptor for tag `nbtag`, sets it up for an RDMA write of
 *  `len` bytes from src_buf to dst_buf using the given memory handles, and
 *  posts it to the server of process `p`. For a blocking call (nbtag == 0)
 *  the routine also waits for the send to complete; otherwise the
 *  descriptor is handed back through *contextptr.
\*/
void armci_client_direct_send(int p,void *src_buf, void *dst_buf, int len,void** contextptr,int nbtag,ARMCI_MEMHDL_T *lochdl,ARMCI_MEMHDL_T *remhdl)
{
    const int server = armci_clus_id(p);
    sr_descr_t *desc;
    struct ibv_send_wr *wr;

    check_state_of_ib_connection(p, 0);

    THREAD_LOCK(armci_user_threads.net_lock);

    /* the descriptor returned here already carries its id */
    desc = armci_vapi_get_next_sdescr(nbtag,0);
    wr = &(desc->sdescr);
    if(nbtag) {
       *contextptr = desc;
    }

    armci_init_cbuf_srdma(wr,desc->sg_entry,src_buf,dst_buf,len,lochdl,remhdl);
    armci_vapi_post_send(1,server,wr,"client_direct_send:post_send");

    /* releasing and re-taking the lock gives waiting threads a chance to
       post; it is not needed for correctness */
#if 1
    THREAD_UNLOCK(armci_user_threads.net_lock);
    THREAD_LOCK(armci_user_threads.net_lock);
#endif

    if(!nbtag) {
       armci_send_complete(wr,"armci_client_direct_send",1);
    }

    THREAD_UNLOCK(armci_user_threads.net_lock);
}
2740 
2741 /*\ RDMA get
2742 \*/
armci_client_direct_get(int p,void * src_buf,void * dst_buf,int len,void ** cptr,int nbtag,ARMCI_MEMHDL_T * lochdl,ARMCI_MEMHDL_T * remhdl)2743 void armci_client_direct_get(int p, void *src_buf, void *dst_buf, int len,
2744                              void** cptr,int nbtag,ARMCI_MEMHDL_T *lochdl,
2745                              ARMCI_MEMHDL_T *remhdl)
2746 {
2747 int rc = 0;
2748 sr_descr_t *dirdscr;
2749 int clus = armci_clus_id(p);
2750     check_state_of_ib_connection(p, 0);
2751 struct ibv_send_wr *bad_wr;
2752 
2753     THREAD_LOCK(armci_user_threads.net_lock);
2754 
2755     /*ID for the desr that comes from get_next_descr is already set*/
2756     dirdscr = armci_vapi_get_next_sdescr(nbtag,0);
2757     if(nbtag)*cptr = dirdscr;
2758 
2759     if(DEBUG_CLN){
2760       printf("\n%d: in direct get lkey=%d rkey=%d\n",armci_me,lochdl->lkey,
2761                remhdl->rkey);fflush(stdout);
2762     }
2763 
2764     armci_init_cbuf_rrdma(&dirdscr->sdescr,dirdscr->sg_entry,dst_buf,src_buf,
2765                           len,lochdl,remhdl);
2766     rc = ibv_post_send((SRV_con+clus)->qp, &(dirdscr->sdescr), &bad_wr);
2767     dassert1(1,rc==0,rc);
2768 
2769     /* unlock/lock to ensure fairness: allows others thread post before
2770        waiting for completion */
2771     /*VT?check to see if this should be UNLOCK followed by lock*/
2772 #if 1
2773     THREAD_UNLOCK(armci_user_threads.net_lock);
2774     THREAD_LOCK(armci_user_threads.net_lock);
2775 #endif
2776 
2777     if(!nbtag){
2778        armci_send_complete(&(dirdscr->sdescr),"armci_client_direct_get",1);
2779     }
2780 
2781     THREAD_UNLOCK(armci_user_threads.net_lock);
2782 }
2783 
2784 #define WQE_LIST_LENGTH 32
2785 #define WQE_LIST_COUNT  1
2786 
/** Direct strided RDMA (put or get) with remote processor memory. Assumes
 *  that (and is invoked only when) the source buffers in user memory are
 *  pinned as well.
 * @param operation PUT/GET
 * @param proc Destination process
 * @param src_ptr Source pointer for data
 * @param src_stride_arr Strides on the source array
 * @param dst_ptr Destination pointer to start writing to
 * @param dst_stride_arr Strides on the destination array
 * @param seg_count[stride_levels+1] #els in each stride
 * level. seg_count[0] is contiguous bytes
 * @param stride_levels Number of stride levels
 * @param cptr OUT Pointer to store the descriptor to wait on for completion
 * @param nbtag IN Non-blocking tag (non-blocking op if nbtag!=0)
 * @param lochdl IN Local memory handle/key (registered memory stuff)
 * @param remhdl IN Remote memory handle/key
 *
 */
2802 #if 0
/* Disabled variant (inside #if 0, never compiled). It walks the strided
 * region with an explicit idx[] counter and precomputed post count; the
 * #else branch holds the implementation that is actually built. */
void armci_client_direct_rdma_strided(int operation, int proc,
				      char *src_ptr, int src_stride_arr[],
				      char *dst_ptr, int dst_stride_arr[],
				      int seg_count[],
				      int stride_levels,
				      void **cptr, int nbtag,
				      ARMCI_MEMHDL_T *lochdl,
				      ARMCI_MEMHDL_T *remhdl) {

  int rc;
  sr_descr_t *dirdscr;
  const int clus = armci_clus_id(proc);
  struct ibv_send_wr *bad_wr;
  struct ibv_send_wr sdscr[WQE_LIST_COUNT][WQE_LIST_LENGTH];
  struct ibv_sge     sg_entry[WQE_LIST_COUNT][WQE_LIST_LENGTH];
  int busy[WQE_LIST_COUNT], wait_count[WQE_LIST_COUNT],clst;
  int i, j, c, numposts;
  int idx[MAX_STRIDE_LEVEL];

  THREAD_LOCK(armci_user_threads.net_lock);

  assert(stride_levels >= 0);
  assert(stride_levels<=MAX_STRIDE_LEVEL);
  /*ID for the descriptor that comes from get_next_sdescr is already set*/
  dirdscr = armci_vapi_get_next_sdescr(nbtag,0);
  if(nbtag)*cptr = dirdscr;
  assert(dirdscr->tag == nbtag);

  if(DEBUG_CLN) {
    printf("\n%d: in direct rdma strided id=%d lkey=%ld rkey=%ld\n",
	   armci_me,dirdscr->sdescr.wr_id,lochdl->lkey,remhdl->rkey);fflush(stdout);
  }

  for(c=0; c<WQE_LIST_COUNT; c++) {
    busy[c]=0;
  }
  /*initialize fixed values for descriptors*/
  bzero(sdscr, WQE_LIST_COUNT*WQE_LIST_LENGTH*sizeof(struct ibv_send_wr));
  bzero(sg_entry, WQE_LIST_COUNT*WQE_LIST_LENGTH*sizeof(struct ibv_sge));
  for(j=0; j<WQE_LIST_COUNT; j++) {
    for(i=0; i<WQE_LIST_LENGTH; i++) {
      if(operation == PUT)
	armci_init_cbuf_srdma(&sdscr[j][i],&sg_entry[j][i],NULL,NULL,seg_count[0],lochdl,remhdl);
      else if(operation == GET)
	armci_init_cbuf_rrdma(&sdscr[j][i],&sg_entry[j][i],NULL,NULL,seg_count[0],lochdl,remhdl);
      else
	armci_die("rdma_strided: unsupported operation",operation);
      sdscr[j][i].wr_id = dirdscr->sdescr.wr_id;
      sdscr[j][i].send_flags = 0; /*non-signalled*/
      if(i<WQE_LIST_LENGTH-1)
	sdscr[j][i].next = &sdscr[j][i+1];
    }
  }
  /*post requests in a loop*/
  /* numposts = product of seg_count[1..stride_levels] */
  numposts=1;
  for(i=1; i<=stride_levels; i++) {
    numposts *= seg_count[i];
  }
/*   printf("%d: client rdma op=%d numposts=%d\n",armci_me,operation,numposts); */

  dirdscr->numofsends=0;
  bzero(idx, stride_levels*sizeof(int));
  /* the first batch takes the remainder so that all later batches are full */
  int count = (numposts%WQE_LIST_LENGTH) ? (numposts%WQE_LIST_LENGTH):WQE_LIST_LENGTH;
  assert(count == ARMCI_MIN(count, numposts));
  clst=0;
  for(i=0; i<numposts; ) {
    for(j=i; j<i+count; j++) {
      int src_offset=0, dst_offset=0;
      for(c=0; c<stride_levels; c++) {
	src_offset += idx[c]*src_stride_arr[c];
	dst_offset += idx[c]*dst_stride_arr[c];
      }

/*       armci_client_direct_send(proc,src_ptr+src_offset,  */
/* 			       dst_ptr+dst_offset, seg_count[0], */
/* 			       NULL,0,lochdl,remhdl); */
      if(busy[clst]) {
	assert(wait_count[clst]>0);
	armci_send_complete(&dirdscr->sdescr,"client_direct_rdma_strided",wait_count[clst]);
	dirdscr->numofsends -= wait_count[clst];
	busy[clst]=0;
	wait_count[clst]=0;
      }

      if(operation == PUT) {
	sg_entry[clst][j-i].addr        = (uint64_t)(src_ptr + src_offset);
	sdscr[clst][j-i].wr.rdma.remote_addr = (uint64_t)(dst_ptr + dst_offset);
      }
      else if (operation == GET) {
	sg_entry[clst][j-i].addr        = (uint64_t)(dst_ptr + dst_offset);
	sdscr[clst][j-i].wr.rdma.remote_addr = (uint64_t)(src_ptr + src_offset);
      }
      assert(sg_entry[clst][j-i].length == seg_count[0]);

      idx[0] += 1;
      for(c=0;c<stride_levels-1 && idx[c]==seg_count[c+1]; c++) {
	idx[c]=0; idx[c+1]++;
      }
    }
    sdscr[clst][count-1].next=NULL;
    sdscr[clst][count-1].send_flags=IBV_SEND_SIGNALED; /*only the last one*/
    for(c=0; c<count-1; c++) {
      assert(sdscr[clst][c].next == &sdscr[clst][c+1]);
    }
    rc = ibv_post_send(SRV_con[clus].qp, sdscr[clst], &bad_wr);
    dassert1(1,rc==0,rc);
    dirdscr->numofsends += 1;
    wait_count[clst] = 1;
/*     armci_send_complete(&dirdscr->sdescr,"armci_client_direct_rdma_strided",count); */

    if(count < WQE_LIST_LENGTH) {
      sdscr[clst][count-1].next=&sdscr[clst][count]; /*reset it*/
    }
    sdscr[clst][count-1].send_flags=0; /*reset it*/
    i += count;
    count = ARMCI_MIN(WQE_LIST_LENGTH,numposts-i);
    assert(count==0 || count==WQE_LIST_LENGTH);
    clst = (clst+1)%WQE_LIST_COUNT;
  }

  if(!nbtag) {
    armci_send_complete(&dirdscr->sdescr,"armci_client_direct_get",dirdscr->numofsends);
    dirdscr->numofsends = 0;
    dirdscr->tag = 0;
  }
  THREAD_UNLOCK(armci_user_threads.net_lock);
}
2930 #else
/* Strided RDMA put/get between this client and the server of `proc`.
 * Each contiguous segment (seg_count[0] bytes) becomes one work request;
 * work requests are chained into lists of up to WQE_LIST_LENGTH entries and
 * only the last entry of each posted list is signalled. WQE_LIST_COUNT
 * lists are used in rotation: before a list is refilled, the completion of
 * its previous post is awaited. A blocking call (nbtag == 0) waits for all
 * outstanding posts before returning. */
void armci_client_direct_rdma_strided(int operation, int proc,
				      char *src_ptr, int src_stride_arr[],
				      char *dst_ptr, int dst_stride_arr[],
				      int seg_count[],
				      int stride_levels,
				      void **cptr, int nbtag,
				      ARMCI_MEMHDL_T *lochdl,
				      ARMCI_MEMHDL_T *remhdl) {
  struct ibv_send_wr wr_lists[WQE_LIST_COUNT][WQE_LIST_LENGTH];
  struct ibv_sge     sge_lists[WQE_LIST_COUNT][WQE_LIST_LENGTH];
  struct ibv_send_wr *bad_wr;
  stride_info_t src_iter, dst_iter;
  sr_descr_t *desc;
  int posted[WQE_LIST_COUNT];
  int list, slot, filled, k, rc;
  const int clus = armci_clus_id(proc);

  THREAD_LOCK(armci_user_threads.net_lock);

  assert(stride_levels >= 0);
  assert(stride_levels<=MAX_STRIDE_LEVEL);
  /* the descriptor returned here already carries its id */
  desc = armci_vapi_get_next_sdescr(nbtag,0);
  if(nbtag) {
    *cptr = desc;
  }
  assert(desc->tag == nbtag);

  if(DEBUG_CLN) {
    printf("\n%d: in direct rdma strided id=%lu lkey=%u rkey=%u\n",
	   armci_me,desc->sdescr.wr_id,lochdl->lkey,remhdl->rkey);
    fflush(stdout);
  }

  /* fill in everything that is the same for every segment */
  bzero(wr_lists, sizeof(wr_lists));
  bzero(sge_lists, sizeof(sge_lists));
  for(list=0; list<WQE_LIST_COUNT; list++) {
    for(slot=0; slot<WQE_LIST_LENGTH; slot++) {
      struct ibv_send_wr *wr = &wr_lists[list][slot];
      struct ibv_sge *sg = &sge_lists[list][slot];

      if(operation == PUT) {
	armci_init_cbuf_srdma(wr,sg,NULL,NULL,seg_count[0],lochdl,remhdl);
      }
      else if(operation == GET) {
	armci_init_cbuf_rrdma(wr,sg,NULL,NULL,seg_count[0],lochdl,remhdl);
      }
      else {
	armci_die("rdma_strided: unsupported operation",operation);
      }
      wr->wr_id = desc->sdescr.wr_id;
      wr->send_flags = 0; /* unsignalled by default */
      if(slot+1 < WQE_LIST_LENGTH) {
	wr->next = wr + 1; /* chain to the following slot of this list */
      }
    }
  }

  /* walk source and destination segment by segment */
  armci_stride_info_init(&src_iter,src_ptr,stride_levels,src_stride_arr,seg_count);
  armci_stride_info_init(&dst_iter,dst_ptr,stride_levels,dst_stride_arr,seg_count);
  assert(armci_stride_info_size(&src_iter)==armci_stride_info_size(&dst_iter));

  desc->numofsends=0;
  list=0;
  filled=0;
  bzero(posted, sizeof(posted));
  while(armci_stride_info_has_more(&src_iter)) {
    uint64_t saddr, daddr;
    struct ibv_send_wr *tail;

    assert(armci_stride_info_has_more(&dst_iter));
    saddr = (uint64_t)armci_stride_info_seg_ptr(&src_iter);
    daddr = (uint64_t)armci_stride_info_seg_ptr(&dst_iter);
    if(operation == PUT) {
      sge_lists[list][filled].addr = saddr;
      wr_lists[list][filled].wr.rdma.remote_addr = daddr;
    }
    else if (operation == GET) {
      sge_lists[list][filled].addr = daddr;
      wr_lists[list][filled].wr.rdma.remote_addr = saddr;
    }
    assert(sge_lists[list][filled].length == seg_count[0]);

    filled++;
    armci_stride_info_next(&src_iter);
    armci_stride_info_next(&dst_iter);

    /* keep filling until the list is full or the data is exhausted */
    if(filled < WQE_LIST_LENGTH && armci_stride_info_has_more(&src_iter)) {
      continue;
    }

    /* terminate the chain at the last used slot and signal only that one */
    tail = &wr_lists[list][filled-1];
    tail->next = NULL;
    tail->send_flags = IBV_SEND_SIGNALED;
    for(k=0; k+1<filled; k++) {
      assert(wr_lists[list][k].next == &wr_lists[list][k+1]);
    }

    check_state_of_ib_connection(armci_clus_info[clus].master, 0);
    rc = ibv_post_send(SRV_con[clus].qp, wr_lists[list], &bad_wr);
    dassert1(1,rc==0,rc);
    posted[list] = 1;
    desc->numofsends += 1;

    /* restore the slot so the list can be reused */
    if(filled<WQE_LIST_LENGTH) {
      tail->next = tail + 1;
    }
    tail->send_flags = 0;

    filled = 0;
    list = (list+1)%WQE_LIST_COUNT;
    if(posted[list]) {
      /* the list about to be reused must have completed first */
      armci_send_complete(&desc->sdescr,"client_direct_rdma_strided",1);
      desc->numofsends -= 1;
      posted[list]=0;
    }
  }
  armci_stride_info_destroy(&src_iter);
  armci_stride_info_destroy(&dst_iter);

  if(!nbtag) {
    armci_send_complete(&desc->sdescr,"armci_client_direct_get",desc->numofsends);
    desc->numofsends = 0;
    desc->tag = 0;
  }
  THREAD_UNLOCK(armci_user_threads.net_lock);
}
3039 #endif
3040 
3041 #if defined(PEND_BUFS)
/** Find the pending buffer whose storage is the given request.
 * @param msginfo request header; must not be an immediate message
 * @return index into serv_pendbuf_arr of the entry whose buf pointer equals
 *         msginfo, or -1 if none of the PENDING_BUF_NUM entries matches
 */
int armci_server_msginfo_to_pbuf_index(request_header_t *msginfo) {
  int slot;

  assert(!msginfo->tag.imm_msg);
  for(slot = 0; slot < PENDING_BUF_NUM; slot++) {
    if(serv_pendbuf_arr[slot].buf == (char *)msginfo) {
      return slot;
    }
  }
  return -1;
}
3054 
3055 
3056 /** Routine for server to RDMA strided data to the client-side buffers
3057  * (allocated through buffers.c). This is to be used instead of
3058  * copying the data to immediate or pending buffers when possible.
3059  */
3060 #if 0
/* Disabled variant (inside #if 0, never compiled). It posts one work
 * request per segment, chained in lists of WQE_LIST_LENGTH; the #else
 * branch holds the gather-based implementation that is actually built. */
void armci_server_rdma_strided_to_contig(char *src_ptr, int src_stride_arr[],
					 int seg_count[],
					 int stride_levels,
					 char *dst_ptr, int proc,
					 request_header_t *msginfo) {
  int rc, i, j, c, busy[WQE_LIST_COUNT], clst, ctr, wr_id;
  sr_descr_t *dirdscr;
  struct ibv_send_wr *bad_wr, sdscr1;
  struct ibv_send_wr sdscr[WQE_LIST_COUNT][WQE_LIST_LENGTH];
  struct ibv_sge     sg_entry[WQE_LIST_COUNT][WQE_LIST_LENGTH];
  stride_info_t sinfo;
  uint64_t daddr;
  ARMCI_MEMHDL_T *loc_memhdl;
  ARMCI_MEMHDL_T *rem_memhdl = &handle_array[proc];

  THREAD_LOCK(armci_user_threads.net_lock);

  assert(msginfo->operation == GET);
  assert(stride_levels >= 0);
  assert(stride_levels<=MAX_STRIDE_LEVEL);

  if(!get_armci_region_local_hndl(src_ptr,armci_clus_id(armci_me), &loc_memhdl)) {
    armci_die("rdma_strided_to_contig: failed to get local handle\n",0);
  }

  /* work-request id: derived from the pending-buffer index, or from the
     client rank for immediate messages */
  if(!msginfo->tag.imm_msg) {
    int index = armci_server_msginfo_to_pbuf_index(msginfo);
    assert(index>=0);
    wr_id = PBUF_BUFID_TO_PUT_WRID(index);
  }
  else {
    wr_id = DSCRID_IMMBUF_RESP_END-1-proc;
  }
  bzero(&sdscr1, sizeof(sdscr1));
  sdscr1.wr_id = wr_id;

  if(DEBUG_CLN) {
    printf("\n%d: in rdma strided to contig id=%d lkey=%ld rkey=%ld\n",
	   armci_me,wr_id,loc_memhdl->lkey,rem_memhdl->rkey);
    fflush(stdout);
  }

  /*initialize fixed values for descriptors*/
  bzero(sdscr, WQE_LIST_COUNT*WQE_LIST_LENGTH*sizeof(struct ibv_send_wr));
  bzero(sg_entry, WQE_LIST_COUNT*WQE_LIST_LENGTH*sizeof(struct ibv_sge));
  for(j=0; j<WQE_LIST_COUNT; j++) {
    for(i=0; i<WQE_LIST_LENGTH; i++) {
      armci_init_cbuf_srdma(&sdscr[j][i],&sg_entry[j][i],NULL,NULL,seg_count[0],loc_memhdl,rem_memhdl);
      sdscr[j][i].wr_id = wr_id;
      sdscr[j][i].send_flags = 0; /*non-signalled*/
/*       sdscr[j][i].send_flags = IBV_SEND_SIGNALED; /\*signalled*\/ */
      if(i<WQE_LIST_LENGTH-1)
	sdscr[j][i].next = &sdscr[j][i+1];
    }
  }

  /*post requests in a loop*/
  sinfo = armci_stride_info_init(src_ptr,stride_levels,src_stride_arr,seg_count);

  clst=ctr=0;
  bzero(busy, sizeof(int)*WQE_LIST_COUNT);
  daddr = (uint64_t)dst_ptr;
  while(armci_stride_info_has_more(sinfo)) {
    uint64_t saddr = (uint64_t)armci_stride_info_seg_ptr(sinfo);
    sg_entry[clst][ctr].addr = saddr;
    sdscr[clst][ctr].wr.rdma.remote_addr = daddr;
    assert(sg_entry[clst][ctr].length == seg_count[0]);

    ctr+=1;
    daddr += seg_count[0];
    armci_stride_info_next(sinfo);
    if(ctr == WQE_LIST_LENGTH || !armci_stride_info_has_more(sinfo)) {
      sdscr[clst][ctr-1].next=NULL;
      if(!armci_stride_info_has_more(sinfo)) {
	sdscr[clst][ctr-1].send_flags=IBV_SEND_SIGNALED; /*only the last one*/
      }
      for(c=0; c<ctr-1; c++) {
	assert(sdscr[clst][c].next == &sdscr[clst][c+1]);
      }
      rc = ibv_post_send(CLN_con[proc].qp, sdscr[clst], &bad_wr);
      dassert1(1,rc==0,rc);
      busy[clst] = 1;
#if 0
      armci_send_complete(&sdscr1,"serv_rdma_to_contig",ctr);
      busy[clst] = 0;
#endif
      if(ctr<WQE_LIST_LENGTH)
	sdscr[clst][ctr-1].next = &sdscr[clst][ctr];
      sdscr[clst][ctr-1].send_flags = 0;

      ctr=0;
      clst = (clst+1)%WQE_LIST_COUNT;
#if 0
      if(busy[clst]) {
	armci_send_complete(&sdscr1,"client_direct_rdma_strided",1);
	busy[clst]=0;
      }
#endif
    }
  }
  armci_stride_info_destroy(&sinfo);
  assert(proc == msginfo->from);
  THREAD_UNLOCK(armci_user_threads.net_lock);
}
3165 #else
3166 
3167 #define MAX_NUM_SGE 64
3168 
3169 /*same as above, but uses gather rdma writes*/
armci_server_rdma_strided_to_contig(char * src_ptr,int src_stride_arr[],int seg_count[],int stride_levels,char * dst_ptr,int proc,request_header_t * msginfo)3170 void armci_server_rdma_strided_to_contig(char *src_ptr, int src_stride_arr[],
3171 					 int seg_count[],
3172 					 int stride_levels,
3173 					 char *dst_ptr, int proc,
3174 					 request_header_t *msginfo) {
3175   int rc, ctr, wr_id, bytes;
3176   struct ibv_send_wr *bad_wr, sdscr1, sdscr;
3177   struct ibv_sge     sg_entry[MAX_NUM_SGE];
3178   stride_info_t sinfo;
3179   uint64_t daddr;
3180   ARMCI_MEMHDL_T *loc_memhdl;
3181   ARMCI_MEMHDL_T *rem_memhdl = &handle_array[proc];
3182   const int max_num_sge = ARMCI_MIN(MAX_NUM_SGE, armci_max_num_sg_ent);
3183   int numposts=0, numsegs=0;
3184 
3185   THREAD_LOCK(armci_user_threads.net_lock);
3186 
3187   assert(msginfo->operation == GET);
3188   assert(stride_levels >= 0);
3189   assert(stride_levels<=MAX_STRIDE_LEVEL);
3190 
3191   if(!get_armci_region_local_hndl(src_ptr,armci_clus_id(armci_me), &loc_memhdl)) {
3192     armci_die("rdma_strided_to_contig: failed to get local handle\n",0);
3193   }
3194 
3195   if(!msginfo->tag.imm_msg) {
3196     int index = armci_server_msginfo_to_pbuf_index(msginfo);
3197     assert(index>=0);
3198     wr_id = PBUF_BUFID_TO_PUT_WRID(index);
3199   }
3200   else {
3201     wr_id = DSCRID_IMMBUF_RESP_END-1-proc;
3202   }
3203   bzero(&sdscr1, sizeof(sdscr1));
3204   sdscr1.wr_id = wr_id;
3205 
3206   if(DEBUG_CLN) {
3207     printf("\n%d: in rdma strided to contig id=%d lkey=%u rkey=%u\n",
3208 	   armci_me,wr_id,loc_memhdl->lkey,rem_memhdl->rkey);
3209     fflush(stdout);
3210   }
3211 
3212   /*initialize fixed values for descriptors*/
3213   bzero(&sdscr, sizeof(sdscr));
3214   bzero(sg_entry, max_num_sge*sizeof(struct ibv_sge));
3215   armci_init_cbuf_srdma(&sdscr,&sg_entry[0],NULL,NULL,seg_count[0],loc_memhdl,rem_memhdl);
3216   sdscr.send_flags = 0; /*non-signalled*/
3217   sdscr.num_sge    = 0; /*set below in the loop*/
3218   sdscr.wr_id      = wr_id;
3219 
3220   for(ctr=0; ctr<max_num_sge; ctr++) {
3221     sg_entry[ctr].length = seg_count[0];
3222     sg_entry[ctr].lkey = loc_memhdl->lkey;
3223   }
3224 
3225   /*post requests in a loop*/
3226   armci_stride_info_init(&sinfo,src_ptr,stride_levels,src_stride_arr,seg_count);
3227 
3228   numposts = numsegs = 0;
3229   ctr=0;
3230   daddr = (uint64_t)dst_ptr;
3231   bytes=0;
3232   while(armci_stride_info_has_more(&sinfo)) {
3233     sg_entry[ctr].addr = (uint64_t)armci_stride_info_seg_ptr(&sinfo);
3234     assert(sg_entry[ctr].length == seg_count[0]);
3235 
3236     sdscr.num_sge += 1;
3237     bytes += seg_count[0];
3238     ctr+=1;
3239     numsegs += 1;
3240     armci_stride_info_next(&sinfo);
3241     if(ctr == max_num_sge || !armci_stride_info_has_more(&sinfo)) {
3242       sdscr.wr.rdma.remote_addr = daddr;
3243       if(!armci_stride_info_has_more(&sinfo)) {
3244 	sdscr.send_flags=IBV_SEND_SIGNALED; /*only the last one*/
3245       }
3246       else {
3247 	assert(sdscr.send_flags == 0);
3248       }
3249       assert(ctr == sdscr.num_sge);
3250       rc = ibv_post_send(CLN_con[proc].qp, &sdscr, &bad_wr);
3251       dassert1(1,rc==0,rc);
3252 
3253       numposts += 1;
3254       ctr=0;
3255       sdscr.num_sge = 0;
3256       daddr += bytes;
3257       bytes = 0;
3258     }
3259   }
3260 /*   printf("%d(s): scatgat write numposts=%d numsegs=%d\n",armci_me,numposts,numsegs); */
3261   armci_stride_info_destroy(&sinfo);
3262   assert(proc == msginfo->from);
3263   THREAD_UNLOCK(armci_user_threads.net_lock);
3264 }
3265 
3266 /*Directly read data from client buffers into remote memory. Data is
3267   contiguous in client-side. */
armci_server_rdma_contig_to_strided(char * src_ptr,int proc,char * dst_ptr,int dst_stride_arr[],int seg_count[],int stride_levels,request_header_t * msginfo)3268 void armci_server_rdma_contig_to_strided(char *src_ptr, int proc,
3269 					 char *dst_ptr,
3270 					 int dst_stride_arr[],
3271 					 int seg_count[],
3272 					 int stride_levels,
3273 					 request_header_t *msginfo) {
3274   int rc, ctr, wr_id, bytes;
3275   struct ibv_send_wr *bad_wr, sdscr1, sdscr;
3276   struct ibv_sge     sg_entry[MAX_NUM_SGE];
3277   stride_info_t dinfo;
3278   uint64_t saddr;
3279   ARMCI_MEMHDL_T *loc_memhdl;
3280   ARMCI_MEMHDL_T *rem_memhdl = &handle_array[proc];
3281   const int max_num_sge = ARMCI_MIN(MAX_NUM_SGE, armci_max_num_sg_ent);
3282   int numposts=0, numsegs=0;
3283 
3284   THREAD_LOCK(armci_user_threads.net_lock);
3285 
3286   assert(msginfo->operation == PUT);
3287   assert(stride_levels >= 0);
3288   assert(stride_levels<=MAX_STRIDE_LEVEL);
3289 
3290   if(!get_armci_region_local_hndl(dst_ptr,armci_clus_id(armci_me), &loc_memhdl)) {
3291     armci_die("rdma_strided_to_contig: failed to get local handle\n",0);
3292   }
3293 
3294   if(!msginfo->tag.imm_msg) {
3295     int index = armci_server_msginfo_to_pbuf_index(msginfo);
3296     assert(index>=0);
3297     wr_id = PBUF_BUFID_TO_GET_WRID(index);
3298   }
3299   else {
3300     wr_id = DSCRID_IMMBUF_RESP_END-1-proc;
3301   }
3302   bzero(&sdscr1, sizeof(sdscr1));
3303   sdscr1.wr_id = wr_id;
3304 
3305   if(DEBUG_CLN) {
3306     printf("\n%d: in rdma strided to contig id=%d lkey=%u rkey=%u\n",
3307 	   armci_me,wr_id,loc_memhdl->lkey,rem_memhdl->rkey);
3308     fflush(stdout);
3309   }
3310 
3311   /*initialize fixed values for descriptors*/
3312   bzero(&sdscr, sizeof(sdscr));
3313   bzero(sg_entry, max_num_sge*sizeof(struct ibv_sge));
3314   armci_init_cbuf_rrdma(&sdscr,&sg_entry[0],NULL,NULL,seg_count[0],loc_memhdl,rem_memhdl);
3315   sdscr.send_flags = 0; /*non-signalled*/
3316   sdscr.num_sge    = 0; /*set below in the loop*/
3317   sdscr.wr_id      = wr_id;
3318 
3319   for(ctr=0; ctr<max_num_sge; ctr++) {
3320     sg_entry[ctr].length = seg_count[0];
3321     sg_entry[ctr].lkey = loc_memhdl->lkey;
3322   }
3323 
3324   /*post requests in a loop*/
3325   armci_stride_info_init(&dinfo,dst_ptr,stride_levels,dst_stride_arr,seg_count);
3326 
3327   numposts = numsegs = 0;
3328   ctr=0;
3329   saddr = (uint64_t)src_ptr;
3330   bytes=0;
3331   while(armci_stride_info_has_more(&dinfo)) {
3332     sg_entry[ctr].addr = (uint64_t)armci_stride_info_seg_ptr(&dinfo);
3333     assert(sg_entry[ctr].length == seg_count[0]);
3334 
3335     sdscr.num_sge += 1;
3336     bytes += seg_count[0];
3337     ctr+=1;
3338     numsegs += 1;
3339     armci_stride_info_next(&dinfo);
3340     if(ctr == max_num_sge || !armci_stride_info_has_more(&dinfo)) {
3341       sdscr.wr.rdma.remote_addr = saddr;
3342       if(!armci_stride_info_has_more(&dinfo)) {
3343 	sdscr.send_flags=IBV_SEND_SIGNALED; /*only the last one*/
3344       }
3345       else {
3346 	assert(sdscr.send_flags == 0);
3347       }
3348       assert(ctr == sdscr.num_sge);
3349       rc = ibv_post_send(CLN_con[proc].qp, &sdscr, &bad_wr);
3350       dassert1(1,rc==0,rc);
3351 
3352       numposts += 1;
3353       ctr=0;
3354       sdscr.num_sge = 0;
3355       saddr += bytes;
3356       bytes = 0;
3357     }
3358   }
3359 /*   printf("%d(s): scatgat write numposts=%d numsegs=%d\n",armci_me,numposts,numsegs); */
3360   armci_stride_info_destroy(&dinfo);
3361   assert(proc == msginfo->from);
3362   THREAD_UNLOCK(armci_user_threads.net_lock);
3363 }
3364 
3365 #endif
3366 #endif
3367 
/** Wait for the reply to a request sent from the buffer holding `msginfo`
 *  and return a pointer to the reply data.
 *
 * First makes sure the send of the request itself has completed. Unless
 * msginfo->bypass is set, it then busy-waits:
 *   - GET: until the stamp int at the end of the data area no longer reads
 *     ARMCI_STAMP or tag.ack reads ARMCI_STAMP; tag.ack is then cleared.
 *     When dscrlen >= len - sizeof(int) the data (and the stamp) are taken
 *     to start dscrlen bytes further into the buffer.
 *   - REGISTER: until tag.ack reads ARMCI_STAMP.
 *   - anything else: until the int right after `len` data bytes reads
 *     ARMCI_STAMP.
 *
 * @param proc    process the request was sent to
 * @param msginfo header of the request being waited on
 * @param len     number of data bytes expected
 * @return pointer to the start of the reply data
 */
char *armci_ReadFromDirect(int proc, request_header_t *msginfo, int len)
{
    int cluster = armci_clus_id(proc);
    vapibuf_ext_t* ecbuf=BUF_TO_ECBUF(msginfo);
    char *dataptr = GET_DATA_PTR(ecbuf->buf);
    extern void armci_util_wait_int(volatile int *,int,int);
    long *ack;
    int spins = 0;

    if(DEBUG_CLN){
       printf("%d(c):read direct %d qp=%p\n",armci_me,
              len,(void*)&(SRV_con+cluster)->qp);
       fflush(stdout);
    }

    /* the request itself must have left before a reply can be expected */
    if(mark_buf_send_complete[ecbuf->snd_dscr.wr_id]==0) {
       armci_send_complete(&(ecbuf->snd_dscr),"armci_ReadFromDirect",1);
    }

    if(msginfo->bypass) {
       return dataptr;
    }

    ack = &(msginfo->tag.ack);

    if(msginfo->operation==GET){
       int *stamp = (int *)(dataptr+len-sizeof(int));

       /* same dscrlen test as used when the stamp was written */
       if(msginfo->dscrlen >= (len-sizeof(int))){
          stamp = (int *)(dataptr+len+msginfo->dscrlen-sizeof(int));
          dataptr+=msginfo->dscrlen;
       }

       if(DEBUG_CLN){
          printf("\n%d:flagval=%d at ptr=%p ack=%ld dist=%d\n",armci_me,*stamp,
                 (void*)stamp,*ack,len);
          fflush(stdout);
       }

       while(armci_util_int_getval(stamp) == ARMCI_STAMP &&
             armci_util_long_getval(ack)  != ARMCI_STAMP){
          spins = (spins+1)%100000;
          if(spins==0 && DEBUG_CLN){
             printf("%d: client last(%p)=%d flag(%p)=%ld off=%d\n",
                    armci_me,(void*)stamp,*stamp,(void*)ack,*ack,msginfo->datalen);
             fflush(stdout);
          }
       }
       *ack = 0L;
       return dataptr;
    }

    if(msginfo->operation == REGISTER){
       while(armci_util_long_getval(ack)  != ARMCI_STAMP){
          spins = (spins+1)%100000;
          if(spins==0 && DEBUG_CLN){
             printf("%d: client flag(%p)=%ld off=%d\n",
                    armci_me,(void*)ack,*ack,msginfo->datalen);
             fflush(stdout);
          }
       }
       return dataptr;
    }

    {
       /* the stamp sits immediately after the data */
       int *flg = (int *)(dataptr+len);
       while(armci_util_int_getval(flg) != ARMCI_STAMP){
          spins = (spins+1)%100000;
          if(spins==0 && DEBUG_CLN){
             printf("%d: client waiting (%p)=%d off=%d\n",
                    armci_me,(void*)flg,*flg,len);
             fflush(stdout);
          }
       }
    }
    return dataptr;
}
3443 
3444 
3445 #ifdef GET_STRIDED_COPY_PIPELINED
/**
 * Same as armci_ReadFromDirect, except that for GET requests it may return
 * as soon as a further segment of the response is detected, instead of
 * waiting for the whole response. Segments are identified by stamp words
 * (presumably written by armci_send_req_msg() -- confirm against that
 * function). The return value is the starting pointer of the buffer
 * containing the data.
 *
 * @param proc       IN  Read data corresponding to an earlier request to this
 *                       proc (only used for debug output here)
 * @param msginfo    IN  The request for which we are reading now
 * @param len        IN  Number of bytes in the total response
 * @param bytes_done IN/OUT On entry, number of bytes of the response already
 *                       consumed by the caller (read to locate the next stamp
 *                       slot); on return, number of bytes of the response
 *                       known to have arrived. Set to len once the whole
 *                       response is complete.
 * @return Starting pointer to the buffer containing the data
 */
char *armci_ReadFromDirectSegment(int proc, request_header_t *msginfo, int len, int *bytes_done) {
  int cluster = armci_clus_id(proc);
  vapibuf_ext_t* ecbuf=BUF_TO_ECBUF(msginfo);
  char *dataptr = GET_DATA_PTR(ecbuf->buf);
  extern void armci_util_wait_int(volatile int *,int,int);

  if(DEBUG_CLN){ printf("%d(c):read direct %d qp=%p\n",armci_me,
			len,(void*)&(SRV_con+cluster)->qp); fflush(stdout);
  }

  /* complete the send of the request itself if that has not happened yet */
  if(mark_buf_send_complete[ecbuf->snd_dscr.wr_id]==0)
    armci_send_complete(&(ecbuf->snd_dscr),"armci_ReadFromDirect",1);

  if(!msginfo->bypass){
    long *flag;
    int *last;
    int loop = 0;
    flag = &(msginfo->tag.ack);
    if(msginfo->operation==GET){
      /* 'last' is the final int of the expected response */
      last = (int *)(dataptr+len-sizeof(int));
      if(msginfo->dscrlen >= (len-sizeof(int))){
	/* response is placed after the descriptor in this case */
	last = (int *)(dataptr+len+msginfo->dscrlen-sizeof(int));
	dataptr+=msginfo->dscrlen;
      }

      if(DEBUG_CLN){
	printf("\n%d:flagval=%d at ptr=%p ack=%ld dist=%d\n",armci_me,*last,
	       (void*)last,*flag,len);fflush(stdout);
      }

      /* spin until the tail stamp has been overwritten or the ack flag is set */
      while(armci_util_int_getval(last) == ARMCI_STAMP &&
	    armci_util_long_getval(flag)  != ARMCI_STAMP){
	loop++;
	loop %=100000;
	if(loop==0){
	  if(DEBUG_CLN){
	    printf("%d: client last(%p)=%d flag(%p)=%ld off=%d\n",
		   armci_me,(void*)last,*last,(void*)flag,*flag,msginfo->datalen);
	    fflush(stdout);
	  }
	}

	/* While the full response is still outstanding, check whether the
	 * next intermediate stamp slot past *bytes_done has been overwritten;
	 * if so, report the partial progress and return early. */
	{
	  /* distance between stamp slots, in ints */
	  int ssize = GET_STRIDED_COPY_PIPELINED_SIZE/sizeof(int);
	  /* first slot where stamping can start: one segment past the
	   * position that follows the header and descriptor */
	  int *sfirst = (int*)(msginfo->dscrlen+(char*)(msginfo+1))+ssize;
	  int *slast = last;
	  /* offset (in ints, multiple of ssize) from sfirst to the slot
	   * following the position given by *bytes_done */
	  int off = (((int *)(dataptr+*bytes_done)-sfirst+ssize)/ssize)*ssize;
	  int *ptr = sfirst+off;
	  dassert(1,off>=0);
	  dassert(1,(char *)sfirst>dataptr);
	  dassert(1,(char *)ptr>dataptr);
	  if(ptr<=slast && armci_util_int_getval(ptr)!=ARMCI_STAMP) {
	    *bytes_done = ((char*)ptr)-dataptr;
	    return dataptr;
	  }
	}
      }
      /* whole response is in: reset the ack flag and report everything done */
      *flag = 0L;
      *bytes_done = len;
      return dataptr;
    }
    else if(msginfo->operation == REGISTER){
      /* REGISTER replies are signalled through the ack flag only */
      while(armci_util_long_getval(flag)  != ARMCI_STAMP){
	loop++;
	loop %=100000;
	if(loop==0){
	  if(DEBUG_CLN){
	    printf("%d: client flag(%p)=%ld off=%d\n",
		   armci_me,(void*)flag,*flag,msginfo->datalen);
	    fflush(stdout);
	  }
	}
      }
    }
    else{
      /* all other operations: wait for the stamp right after the data */
      int *flg = (int *)(dataptr+len);
      while(armci_util_int_getval(flg) != ARMCI_STAMP){
	loop++;
	loop %=100000;
	if(loop==0){
	  if(DEBUG_CLN){
	    printf("%d: client waiting (%p)=%d off=%d\n",
		   armci_me,(void*)flg,*flg,len);
	    fflush(stdout);
	  }
	}
      }
    }
  }
  /* bypass, REGISTER and non-GET paths always report the full length */
  *bytes_done = len;
  return dataptr;
}
3550 #endif
3551 
/**
 * Server side: post a send work request that transfers a local buffer to a
 * client, using a descriptor prepared by armci_init_cbuf_srdma().
 *
 * With PEND_BUFS defined the request is only posted (its completion is
 * handled elsewhere); without PEND_BUFS the function waits for the send
 * completion before returning.
 *
 * @param proc  IN id of remote client to put to
 * @param buf   IN local buf (has to be registered)
 * @param bytes IN number of bytes to transfer
 * @param dbuf  IN destination address passed to armci_init_cbuf_srdma()
 *              (presumably an address in the client's memory -- confirm)
 */
void armci_send_data_to_client(int proc, void *buf, int bytes,void *dbuf)
{
  int i, rc = 0;
    struct ibv_send_wr *bad_wr;
    struct ibv_send_wr sdscr;
    struct ibv_sge ssg_entry;

    if(DEBUG_SERVER){
       printf("\n%d(s):sending data to client %d at %p flag = %p bytes=%d\n",
               armci_me,
	      proc,dbuf,(char *)dbuf+bytes-sizeof(int),bytes);fflush(stdout);
    }

    /* start from zeroed descriptors, then let the helper fill them in */
    memset(&sdscr,0,sizeof(struct ibv_send_wr));
    memset(&ssg_entry,0,sizeof(ssg_entry));
    armci_init_cbuf_srdma(&sdscr,&ssg_entry,buf,dbuf,bytes,
                          &serv_memhandle,(handle_array+proc));

    if(DEBUG_SERVER){
       printf("\n%d(s):handle_array[%d]=%p dbuf=%p flag=%p bytes=%d\n",armci_me,
              proc,(void*)&handle_array[proc],(char *)dbuf,
              (char *)dbuf+bytes-sizeof(int),bytes);
       fflush(stdout);
    }

#if defined(PEND_BUFS)
    /* look for the server buffer of this client (IMM_BUF_NUM+1 per client)
     * whose first IMM_BUF_LEN bytes contain buf */
    for(i=proc*(IMM_BUF_NUM+1); i<(proc+1)*(IMM_BUF_NUM+1); i++) {
      if((char*)buf>= serv_buf_arr[i]->buf &&
	 (char*)buf<IMM_BUF_LEN+(char*)serv_buf_arr[i]->buf)
	break;
    }

#if SRI_CORRECT
     if(i<(proc+1)*(IMM_BUF_NUM+1)) {
      /*Message from an immediate buffer*/
     assert(serv_buf_arr[i]->send_pending==0);
      serv_buf_arr[i]->send_pending=1;
      sdscr.wr_id = DSCRID_IMMBUF_RESP+i;
    }
    else
#endif
      {
	/* id one past the per-buffer response ids; when SRI_CORRECT is off
	 * this id is used for every response and the search result above is
	 * not used */
	sdscr.wr_id = DSCRID_IMMBUF_RESP+armci_nproc*(IMM_BUF_NUM+1)+1;
      }
/* #endif */

/* #if defined(PEND_BUFS) */
/*     { */
/*       static uint64_t ctr=DSCRID_IMMBUF_RESP; */
/*       sdscr.wr_id = ctr; */
/*       ctr = (ctr+1-DSCRID_IMMBUF_RESP)%(DSCRID_IMMBUF_RESP_END-DSCRID_IMMBUF_RESP)+DSCRID_IMMBUF_RESP; */
/*     } */
#else
    sdscr.wr_id = proc+armci_nproc;
#endif
    rc = ibv_post_send((CLN_con+proc)->qp, &sdscr, &bad_wr);
    dassert1(1,rc==0,rc);

#if !defined(PEND_BUFS)
    /* sdscr lives on the stack: wait for the send to complete before returning */
    armci_send_complete(&sdscr,"armci_send_data_to_client",1);
#endif
}
3618 
/**
 * Server side: send the reply data of a request back to the client.
 *
 * For non-GET operations an ARMCI_STAMP int is appended right after the
 * data and sent along with it. For GET, nothing is appended; instead, if the
 * last int of the data being sent happens to equal ARMCI_STAMP, an explicit
 * ack is sent with SERVER_SEND_ACK (armci_ReadFromDirect on the client waits
 * on both the last data int and the ack flag for GET).
 *
 * @param proc    client process the data is sent to
 * @param msginfo request header; supplies datalen, operation, the client
 *                destination pointer (tag.data_ptr) and the sender (from)
 * @param buf     local buffer holding the data; for non-GET operations it
 *                must have room for one extra int after datalen bytes
 */
void armci_WriteToDirect(int proc, request_header_t* msginfo, void *buf)
{
int bytes;
int *last;
    ARMCI_PR_DBG("enter",0);
    bytes = (int)msginfo->datalen;
    if(DEBUG_SERVER){
      printf("%d(s):write to direct sent %d to %d at %p\n",armci_me,
             bytes,proc,(char *)msginfo->tag.data_ptr);
      fflush(stdout);
    }
    if(msginfo->operation!=GET){
       /* append the completion stamp the client polls for */
       *(int *)((char *)buf+bytes)=ARMCI_STAMP;
       bytes+=sizeof(int);
    }
#if defined(PEND_BUFS)
    if(!msginfo->tag.imm_msg) {
      int i;
/*       fprintf(stderr, "%d:: Not immediate mesg operated on\n", armci_me); */
      assert(msginfo->operation == GET); /*nothing else uses this for now*/
      /**This is a pending buf*/
      vapibuf_pend_t *pbuf=NULL;
      int index;
      /* find the pending buffer whose storage holds this request header */
      for(i = 0; i<PENDING_BUF_NUM; i++) {
	if(serv_pendbuf_arr[i].buf == (char *)msginfo) {
	  pbuf = &serv_pendbuf_arr[i];
	  index = i;
	  break;
	}
      }
      /* index is only valid when pbuf was found; the assert guards that */
      assert(pbuf != NULL);
      assert(sizeof(request_header_t)+msginfo->dscrlen+bytes<PENDING_BUF_LEN);
      _armci_send_data_to_client_pbuf(proc, &pbuf->sdscr,
				      PBUF_BUFID_TO_PUT_WRID(index),
				      &pbuf->sg_entry,
				      msginfo->tag.data_ptr, buf,
				      bytes);
    }
    else
#endif
    {
      armci_send_data_to_client(proc,buf,bytes,msginfo->tag.data_ptr);
    }
    /*if(msginfo->dscrlen >= (bytes-sizeof(int)))
       last = (int*)(((char*)(buf)) + (msginfo->dscrlen+bytes - sizeof(int)));
    else*/
       /* last int of what was just sent */
       last = (int*)(((char*)(buf)) + (bytes - sizeof(int)));

    /* GET data ending in the stamp value: signal completion with an ack */
    if(msginfo->operation==GET && *last == ARMCI_STAMP){
       SERVER_SEND_ACK(msginfo->from);
    }
    armci_ack_proc=NONE;
    ARMCI_PR_DBG("exit",0);
}
3673 
3674 
3675 #if defined(PEND_BUFS)
armci_rcv_req(void * mesg,void * phdr,void * pdescr,void * pdata,int * buflen)3676 void armci_rcv_req(void *mesg,void *phdr,void *pdescr,void *pdata,int *buflen)
3677 {
3678   request_header_t *msginfo = *(request_header_t**)mesg;
3679   *(void **)phdr = msginfo;
3680 
3681   if(msginfo->tag.imm_msg)
3682     *buflen = IMM_BUF_LEN - sizeof(request_header_t) - msginfo->dscrlen;
3683   else
3684     *buflen = PENDING_BUF_LEN - sizeof(request_header_t) - msginfo->dscrlen;
3685 
3686   *(void **)pdata = msginfo->dscrlen + (char *)(msginfo+1);
3687   if(msginfo->bytes)
3688     *(void **)pdescr = msginfo+1;
3689   else
3690     *(void **)pdescr = NULL;
3691 }
3692 #else
armci_rcv_req(void * mesg,void * phdr,void * pdescr,void * pdata,int * buflen)3693 void armci_rcv_req(void *mesg,void *phdr,void *pdescr,void *pdata,int *buflen)
3694 {
3695   vapibuf_t *cbuf = (vapibuf_t*)mesg;
3696   request_header_t *msginfo = (request_header_t *)cbuf->buf;
3697   *(void **)phdr = msginfo;
3698 
3699   ARMCI_PR_DBG("enter",msginfo->operation);
3700   if(DEBUG_SERVER){
3701     printf("%d(server): got %d req (dscrlen=%d datalen=%d) from %d\n",
3702 	   armci_me, msginfo->operation, msginfo->dscrlen,
3703 	   msginfo->datalen, msginfo->from); fflush(stdout);
3704   }
3705 
3706   /* we leave room for msginfo on the client side */
3707   *buflen = MSG_BUFLEN - sizeof(request_header_t);
3708 
3709   if(msginfo->bytes) {
3710     *(void **)pdescr = msginfo+1;
3711     if(msginfo->operation == GET)
3712       *(void **)pdata = MessageRcvBuffer;
3713     else
3714       *(void **)pdata = msginfo->dscrlen + (char*)(msginfo+1);
3715   }else {
3716     *(void**)pdescr = NULL;
3717     *(void**)pdata = MessageRcvBuffer;
3718   }
3719   ARMCI_PR_DBG("exit",msginfo->operation);
3720 }
3721 #endif
3722 
/**
 * Post the receive work request held in pend_dscr->rdescr.
 *
 * @param pend_dscr descriptor whose rdescr member is posted
 * @param proc      peer process; selects CLN_con[proc] when type is SERV,
 *                  otherwise SRV_con[armci_clus_id(proc)]
 * @param type      SERV or CLN
 */
static void posts_scatter_desc(sr_descr_t *pend_dscr,int proc,int type)
{
    int clus = armci_clus_id(proc);
    int verbose = (type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN);
    struct ibv_recv_wr *rwr = &pend_dscr->rdescr;
    struct ibv_recv_wr *bad_wr;
    struct ibv_qp *qp;
    int rc;

    /*armci_vapi_print_dscr_info(NULL,scat_dscr);*/
    if(verbose){
       printf("%d(%d) : inside posts scatter dscr, id is %lu\n",
	      armci_me,type,rwr->wr_id);
       fflush(stdout);
    }

    /* servers receive on the per-client QP, clients on the per-node server QP */
    qp = (type == SERV) ? (CLN_con + proc)->qp : (SRV_con+clus)->qp;
    rc = ibv_post_recv(qp, rwr, &bad_wr);
    dassert1(1,rc==0,rc);

    if(verbose){
       printf("\n%d: list_length is %d, id is %ld\n",
	      armci_me,rwr->num_sge,rwr->wr_id);
       fflush(stdout);
    }
}
3751 
3752 
3753 /*\
3754  *  client calls from request.c
3755  *  server calls from ds-shared.c
3756 \*/
/* descriptor used for blocking scatters (nbtag == 0) */
static sr_descr_t client_blocking_scatter_dscr;
/**
 * Post receive work requests whose scatter lists describe a strided
 * destination region.
 *
 * Each scatter entry covers one contiguous run of count[0] bytes; runs are
 * dest_stride_arr[0] bytes apart within a 2D slab of count[1] runs, and the
 * higher dimensions are walked with the index[]/unit[] arrays. Entries are
 * collected in the descriptor's sg list and posted (via posts_scatter_desc)
 * in chunks of at most armci_max_num_sg_ent entries; the same descriptor is
 * reused for every chunk. The number of posted requests is left in
 * pend_dscr->numofrecvs.
 *
 * @param dest_ptr        start of the destination region
 * @param dest_stride_arr byte strides per level
 * @param count           count[0] = bytes per run, count[i] = repetitions at
 *                        level i
 * @param stride_levels   number of stride levels
 * @param mhandle         memory handle supplying the lkey of every entry
 * @param proc            peer process (forwarded to posts_scatter_desc)
 * @param nbtag           non-zero: use the descriptor returned by
 *                        armci_vapi_get_next_rdescr(nbtag,1); zero: use the
 *                        static blocking descriptor
 * @param type            SERV or CLN
 * @param srd             OUT, may be NULL; receives the descriptor when
 *                        nbtag is non-zero
 *
 * NOTE(review): index[] and unit[] have MAX_STRIDE_LEVEL elements and are
 * indexed with 2..stride_levels, so stride_levels must stay below
 * MAX_STRIDE_LEVEL -- confirm callers guarantee this.
 */
void armci_post_scatter(void *dest_ptr, int dest_stride_arr[], int count[],
     int stride_levels, armci_vapi_memhndl_t *mhandle,
     int proc, int nbtag, int type, sr_descr_t **srd)
{
    int i;
    int total_of_2D = 1;
    int index[MAX_STRIDE_LEVEL], unit[MAX_STRIDE_LEVEL];
    int j,k,y;
    int num_xmit = 0, num_seg, max_seg, rem_seg,vecind;
    char* src, *src1;
    sr_descr_t *pend_dscr;
    struct ibv_sge *scat_sglist;
    struct ibv_recv_wr *scat_dscr;

    if((type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN) ){
       printf("%d(%d)  : inside post_scatter %d\n",armci_me,type,nbtag);
       fflush(stdout);
    }

    /* upper bound on scatter entries per work request */
    max_seg =  armci_max_num_sg_ent;

    THREAD_LOCK(armci_user_threads.net_lock);

    if(nbtag){
       pend_dscr = armci_vapi_get_next_rdescr(nbtag,1);
       if(srd!=NULL)*srd=pend_dscr;
    }
    else{
       pend_dscr = &client_blocking_scatter_dscr;
       pend_dscr->rdescr.wr_id=DSCRID_SCATGAT + MAX_PENDING;
    }

    /*pend_dscr->proc = proc;*/
    pend_dscr->numofrecvs=0;

    scat_dscr = &pend_dscr->rdescr;
    scat_sglist = pend_dscr->sg_entry;
    /* scat_dscr->opcode = VAPI_RECEIVE; no ->opcode in ibv_recv_wr */
    /* scat_dscr->comp_type = VAPI_SIGNALED; no ->comp_type in ibv_recv_wr */
    scat_dscr->sg_list = scat_sglist;
    scat_dscr->num_sge = 0;

    /* total_of_2D = number of 2D slabs = product of count[2..stride_levels];
     * unit[j] = number of slabs after which index[j] advances */
    index[2] = 0; unit[2] = 1;
    if(stride_levels > 1){
       total_of_2D = count[2];
       for(j=3; j<=stride_levels; j++){
         index[j] = 0; unit[j] = unit[j-1]*count[j-1];
         total_of_2D*=count[j];
       }
    }

    /* number of work requests needed and size of the final, partial one */
    num_xmit = total_of_2D*count[1]/max_seg;
    rem_seg = (total_of_2D*count[1])%max_seg;
    if(num_xmit == 0) num_xmit = 1;
    else if(rem_seg!= 0)num_xmit++;


    if ((type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN) ) {
       printf("%d(%d):armci_post_scatter num_xmit = %d\t, rem_seg = %d\n",
               armci_me,type,num_xmit,rem_seg);
       fflush(stdout);
    }

    /* k = index of the request being filled, vecind = entries collected so
     * far for it, num_seg = entries it will hold when full */
    k=0; vecind = 0;
    if(rem_seg!=0 && k==(num_xmit-1))num_seg = rem_seg;
    else num_seg = max_seg;

    y=0;
    for(i=0;i<total_of_2D;i++){
       /* locate the start of slab i from the higher-dimension indices */
       src = (char *)dest_ptr;
       for(j=2;j<=stride_levels;j++){
         src+= index[j]*dest_stride_arr[j-1];
         if(((i+1)%unit[j]) == 0) index[j]++;
         if(index[j] >= count[j]) index[j] =0;
       }
       src1 = src;

       for(j=0; j<count[1]; j++, vecind++){
         /* current request is full: post it before adding this run */
         if(vecind == num_seg) {
           posts_scatter_desc(pend_dscr,proc,type);
           pend_dscr->numofrecvs++;

           /* the previous one has been posted, start off new*/
           scat_dscr->num_sge = 0;
           y = 0; /* reuse the same scatter descriptor */
           vecind=0;k++;
           if(rem_seg!=0 && k==(num_xmit-1))num_seg = rem_seg;
         }
         /* fill the scatter descriptor */
         scat_sglist[y].addr = (uint64_t)src1;
         scat_sglist[y].lkey = mhandle->lkey;
         scat_sglist[y].length = count[0];
         scat_dscr->num_sge++;
         src1 += dest_stride_arr[0];
         y++;

       }

       /* request became full exactly at the end of a slab: post it now */
       if(vecind == num_seg){
         posts_scatter_desc(pend_dscr,proc,type);
         pend_dscr->numofrecvs++;

         /* the previous one has been posted, start off new*/
         scat_dscr->num_sge = 0;
         y =0 ;
         vecind = 0; k++;
         if(rem_seg!=0 && k==(num_xmit-1))num_seg=rem_seg;
         else num_seg = max_seg;
       }

    }

    THREAD_UNLOCK(armci_user_threads.net_lock);

/*     printf("%d(s): num scatters posted=%d\n", armci_me,pend_dscr->numofrecvs); */
    if(!nbtag){
       /*if blocking call wait_for_blocking_scatter to complete*/
    }
    return;
}
3878 
armci_wait_for_blocking_scatter()3879 void armci_wait_for_blocking_scatter()
3880 {
3881 sr_descr_t *pend_dscr=&client_blocking_scatter_dscr;
3882     armci_recv_complete(&pend_dscr->rdescr,"armci_post_scatter",pend_dscr->numofrecvs);
3883 }
3884 
3885 
/*\
 *  function used by armci_post_gather to actually post the gather list
\*/
/**
 * Post the send work request held in pend_dscr->sdescr while holding the
 * network lock.
 *
 * @param pend_dscr descriptor whose sdescr member is posted
 * @param proc      peer process; selects SRV_con[armci_clus_id(proc)] when
 *                  type is CLN, otherwise CLN_con[proc]
 * @param type      SERV or CLN
 */
static void posts_gather_desc(sr_descr_t *pend_dscr,int proc,int type)
{
    int clus = armci_clus_id(proc);
    struct ibv_send_wr *swr;
    struct ibv_send_wr *bad_wr;
    struct ibv_qp *qp;
    int rc = 0;

    THREAD_LOCK(armci_user_threads.net_lock);

    swr = &pend_dscr->sdescr;
    /*armci_vapi_print_dscr_info(gat_dscr,NULL);*/
    if((type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN)){
       printf("%d: type(client=1)=%d inside posts gather dscr, id is %lu\n",
	      armci_me,type,swr->wr_id);
       fflush(stdout);
    }

    /* clients send on the per-node server QP, servers on the per-client QP */
    qp = (type == CLN) ? (SRV_con+clus)->qp : (CLN_con + proc)->qp;
    rc = ibv_post_send(qp, swr, &bad_wr);
    dassert1(1,rc==0,rc);

    THREAD_UNLOCK(armci_user_threads.net_lock);
}
3918 
3919 /*\
3920  *  posts a bunch of gather descriptors
3921 \*/
/* descriptor used for blocking gathers (nbtag == 0) */
static sr_descr_t client_blocking_gather_dscr;
/**
 * Post send work requests whose gather lists describe a strided source
 * region.
 *
 * Each gather entry covers one contiguous run of count[0] bytes; runs are
 * src_stride_arr[0] bytes apart within a 2D slab of count[1] runs, and the
 * higher dimensions are walked with the index[]/unit[] arrays. Entries are
 * collected in the descriptor's sg list and posted (via posts_gather_desc)
 * as signaled IBV_WR_SEND requests in chunks of at most armci_max_num_sg_ent
 * entries; the same descriptor is reused for every chunk. For a blocking
 * call (nbtag == 0) the function waits for all posted sends to complete.
 *
 * @param src_ptr        start of the source region
 * @param src_stride_arr byte strides per level
 * @param count          count[0] = bytes per run, count[i] = repetitions at
 *                       level i
 * @param stride_levels  number of stride levels
 * @param mhandle        memory handle supplying the lkey of every entry
 * @param proc           peer process (forwarded to posts_gather_desc)
 * @param nbtag          non-zero: use the descriptor returned by
 *                       armci_vapi_get_next_sdescr(nbtag,1); zero: use the
 *                       static blocking descriptor
 * @param type           SERV or CLN
 * @param srd            OUT, may be NULL; receives the descriptor when
 *                       nbtag is non-zero
 *
 * NOTE(review): index[] and unit[] have MAX_STRIDE_LEVEL elements and are
 * indexed with 2..stride_levels, so stride_levels must stay below
 * MAX_STRIDE_LEVEL -- confirm callers guarantee this.
 */
void armci_post_gather(void *src_ptr, int src_stride_arr[], int count[],
      int stride_levels, armci_vapi_memhndl_t *mhandle,
      int proc,int nbtag, int type, sr_descr_t **srd)
{
    int i;
    int total_of_2D = 1;
    int index[MAX_STRIDE_LEVEL], unit[MAX_STRIDE_LEVEL];
    int j,k,y;
    char *src, *src1;
    int num_xmit = 0, num_seg, max_seg, rem_seg,vecind;
    sr_descr_t *pend_dscr;

    struct ibv_sge *gat_sglist;
    struct ibv_send_wr *gat_dscr;

    if((type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN)){
      printf("%d(%d)  : inside post_gather\n",armci_me,type);
      fflush(stdout);
    }

    /* upper bound on gather entries per work request */
    max_seg =  armci_max_num_sg_ent;
    if(nbtag){
       pend_dscr = armci_vapi_get_next_sdescr(nbtag,1);
       if(srd!=NULL)*srd=pend_dscr;
    }
    else{
       pend_dscr = &client_blocking_gather_dscr;
       pend_dscr->sdescr.wr_id=DSCRID_SCATGAT + MAX_PENDING;
    }
    pend_dscr->numofsends=0;

    gat_dscr = &pend_dscr->sdescr;
    gat_sglist = pend_dscr->sg_entry;
    gat_dscr->opcode = IBV_WR_SEND;
    gat_dscr->send_flags = IBV_SEND_SIGNALED;
    gat_dscr->sg_list = gat_sglist;
    gat_dscr->num_sge = 0;
/*     gat_dscr->send_flags = 0; */

    /* total_of_2D = number of 2D slabs = product of count[2..stride_levels];
     * unit[j] = number of slabs after which index[j] advances */
    index[2] = 0; unit[2] = 1;
    if(stride_levels > 1){
      total_of_2D = count[2];
      for(j=3; j<=stride_levels; j++){
        index[j] = 0; unit[j] = unit[j-1]*count[j-1];
        total_of_2D*=count[j];
      }
    }

    /* number of work requests needed and size of the final, partial one */
    num_xmit = total_of_2D*count[1]/max_seg;
    rem_seg = (total_of_2D*count[1])%max_seg;
    if(num_xmit == 0) num_xmit = 1;
    else if(rem_seg!= 0)num_xmit++;

    if((type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN) ){
       printf("%d(%d):armci_post_gather total_2D=%d, num_xmit=%d, rem_seg =%d, count[1] = %d\n",armci_me,type,total_of_2D, num_xmit,rem_seg,count[1]);
      fflush(stdout);
    }

    /* k = index of the request being filled, vecind = entries collected so
     * far for it, num_seg = entries it will hold when full */
    k=0; vecind = 0;
    if(rem_seg!=0 && k==(num_xmit-1))num_seg = rem_seg;
    else num_seg = max_seg;

    y=0;
    for(i=0;i<total_of_2D;i++){
       /* locate the start of slab i from the higher-dimension indices */
       src = (char *)src_ptr;
       for(j=2;j<=stride_levels;j++){
         src+= index[j]*src_stride_arr[j-1];
         if(((i+1)%unit[j]) == 0) index[j]++;
         if(index[j] >= count[j]) index[j] =0;
       }
       src1 = src;

       for(j=0; j<count[1]; j++, vecind++){
         /* current request is full: post it before adding this run */
         if(vecind == num_seg){
           posts_gather_desc(pend_dscr,proc,type);
           pend_dscr->numofsends++;

           /* the previous one has been posted, start off new*/
           gat_dscr->num_sge = 0;
           y = 0;
           vecind=0;k++;
           if(rem_seg!=0 && k==(num_xmit-1))num_seg = rem_seg;
         }

         /* fill the gather descriptor */
         gat_sglist[y].addr = (uint64_t)src1;
         gat_sglist[y].lkey = mhandle->lkey;
         gat_sglist[y].length = count[0];
         gat_dscr->num_sge++;
         src1 += src_stride_arr[0];
         y++;

       }

       /* request became full exactly at the end of a slab: post it now */
       if(vecind == num_seg){
         posts_gather_desc(pend_dscr,proc,type);
         pend_dscr->numofsends++;
         if((type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN) ){
           printf("%d(%d)posts_gather_desc done\n",armci_me,type);
           fflush(stdout);
         }

         /* the previous one has been posted, start off new*/
         gat_dscr->num_sge = 0;
         y = 0;
         vecind = 0; k++;
         if(rem_seg!=0 && k==(num_xmit-1))num_seg=rem_seg;
         else num_seg = max_seg;
       }
    }
/*     printf("%d: num gathers posted =%d\n",armci_me,pend_dscr->numofsends); */
    if(!nbtag){
       /*complete here*/
       armci_send_complete(&pend_dscr->sdescr,"armci_post_gather",pend_dscr->numofsends);
    }
    return;
}
4040 /***********************END SCATTER GATHER STUFF******************************/
4041 
4042 
4043 
4044 /***********************SPECIAL SEND/RECV*************************************/
armci_server_direct_send(int dst,char * src_buf,char * dst_buf,int len,uint32_t * lkey,uint32_t * rkey)4045 void armci_server_direct_send(int dst, char *src_buf, char *dst_buf, int len,
4046                               uint32_t *lkey, uint32_t *rkey)
4047 {
4048     int rc = 0;
4049     struct ibv_wc *pdscr=NULL;
4050     struct ibv_wc pdscr1;
4051     struct ibv_send_wr sdscr;
4052     struct ibv_sge ssg_entry;
4053 
4054     pdscr = &pdscr1;
4055 
4056     if(DEBUG_SERVER){
4057        printf("\n%d(s):sending dir data to client %d at %p bytes=%d last=%p\n",
4058                 armci_me,dst,dst_buf,len,(dst_buf+len-4));fflush(stdout);
4059     }
4060 
4061     memset(&sdscr,0,sizeof(struct ibv_send_wr));
4062     armci_init_cbuf_srdma(&sdscr,&ssg_entry,src_buf,dst_buf,len,NULL,NULL);
4063     sdscr.wr.rdma.rkey = *rkey;
4064     ssg_entry.lkey = *lkey;
4065 
4066     sdscr.wr_id = dst+armci_nproc;
4067     struct ibv_send_wr *bad_wr;
4068     rc = ibv_post_send((CLN_con+dst)->qp, &sdscr, &bad_wr);
4069     dassert1(1,rc==0,rc);
4070 
4071     while (rc == 0) {
4072        rc = ibv_poll_cq(CLN_nic->scq, 1, pdscr);
4073     }
4074     dassertp(1,rc>=0,("%d: rc=%d id=%d status=%d\n",
4075 		      armci_me,rc,(int)pdscr->wr_id,pdscr->status));
4076     dassert1(1,pdscr->status==IBV_WC_SUCCESS,pdscr->status);;
4077 }
4078 
4079 
4080 
/**
 * Server side: send contiguous data for a request straight to the
 * requester through armci_server_direct_send(), then send an explicit ack
 * if the last int of the data equals ARMCI_STAMP.
 *
 * @param proc    process id, used only in the "not pinned" diagnostic
 * @param msginfo request header; must be marked pinned. The remote key is
 *                read from the end of the descriptor that follows the header
 * @param src_ptr local source buffer
 * @param rem_ptr destination address on the requester
 * @param bytes   number of bytes to send
 *
 * NOTE(review): lkey is initialised to NULL and never assigned, yet it is
 * dereferenced in the DEBUG_SERVER printf below and inside
 * armci_server_direct_send() (ssg_entry.lkey = *lkey). As written, calling
 * this function dereferences a null pointer. The expression used for rkey
 * subtracts two key sizes from the descriptor end, which looks like the
 * offset of the first of two trailing keys -- the intended location of lkey
 * and rkey in the descriptor must be confirmed against the code that builds
 * the request before this can be fixed.
 */
void armci_send_contig_bypass(int proc, request_header_t *msginfo,
                              void *src_ptr, void *rem_ptr, int bytes)
{
    int *last;
    uint32_t *lkey=NULL;
    uint32_t *rkey;
    int dscrlen = msginfo->dscrlen;

    /* last int of the data to be sent */
    last = (int*)(((char*)(src_ptr)) + (bytes - sizeof(int)));
    dassertp(1,msginfo->pinned,("%d: not pinned proc=%d",armci_me,proc));

    /* key stored two uint32_t's before the end of the descriptor */
    rkey = (uint32_t *)((char *)(msginfo+1)+dscrlen-(sizeof(uint32_t)+sizeof(uint32_t)));

    if(DEBUG_SERVER){
       printf("%d(server): sending data bypass to %d (%p,%p) %d %d\n", armci_me,
               msginfo->from,src_ptr, rem_ptr,*lkey,*rkey);
       fflush(stdout);
    }
    armci_server_direct_send(msginfo->from,src_ptr,rem_ptr,bytes,lkey,rkey);

    /* data ending in the stamp value: signal completion with an ack */
    if(*last == ARMCI_STAMP){
       SERVER_SEND_ACK(msginfo->from);
    }
}
4105 
/**
 * Client side: wait for data delivered directly into user memory.
 *
 * Contiguous case (stride_levels == 0): spin while the last int of the
 * count[0]-byte destination still holds ARMCI_STAMP and the long at the
 * start of the header's tag is not ARMCI_STAMP. For strided requests a
 * message is printed and armci_dscrlist_recv_complete() is called.
 *
 * @param proc          process the data comes from (debug output only)
 * @param msginfo       request header
 * @param ptr           destination buffer
 * @param count         count[0] is the number of bytes expected
 * @param stride_levels number of stride levels of the request
 */
void armci_rcv_strided_data_bypass_both(int proc, request_header_t *msginfo,
                                       void *ptr, int *count, int stride_levels)
{
    int nbytes = msginfo->datalen;
    int spins = 0;

    if(DEBUG_CLN){
       printf("%d:rcv_strided_data_both bypass from %d\n",
                armci_me,  proc);
       fflush(stdout);
    }

    if(stride_levels){
      printf("\n%d:rcv_strided_data called, it should never be called\n",armci_me);
      armci_dscrlist_recv_complete(0,"armci_rcv_strided_data_bypass_both",NULL);
    }
    else {
      int *tail = (int*)(((char*)(ptr)) + (count[0] -sizeof(int)));
      long *ackp = (long *)&msginfo->tag;

      while(armci_util_int_getval(tail) == ARMCI_STAMP &&
            armci_util_long_getval(ackp)  != ARMCI_STAMP){
        spins = (spins+1)%1000000;
        if(spins==0 && DEBUG_CLN){
          printf("%d: client last(%p)=%d ack(%p)=%ld off=%d\n",
                armci_me,(void*)tail,*tail,(void*)ackp,*ackp,(int)((char*)tail - (char*)ptr));
          fflush(stdout);
        }
      }
    }

    if(DEBUG_CLN){
       printf("%d:rcv_strided_data bypass both: %d bytes from %d\n",
                          armci_me, nbytes, proc);
       fflush(stdout);
    }
}
4142 
4143 
armci_pin_memory(void * ptr,int stride_arr[],int count[],int strides)4144 int armci_pin_memory(void *ptr, int stride_arr[], int count[], int strides)
4145 {
4146     fprintf(stderr, "[%d]:armci_pin_memory not implemented\n",armci_me);
4147     fflush(stderr);
4148     return 0;
4149 }
4150 
4151 
armci_client_send_ack(int proc,int n)4152 void armci_client_send_ack(int proc, int n)
4153 {
4154     printf("\n%d:client_send_ack not implemented",armci_me);fflush(stdout);
4155 }
4156 
4157 
/**
 * Placeholder: not implemented for this device. Prints a notice on stdout;
 * all arguments are ignored.
 */
void armci_rcv_strided_data_bypass(int proc, request_header_t* msginfo,
                                   void *ptr, int stride_levels)
{
    (void)proc;
    (void)msginfo;
    (void)ptr;
    (void)stride_levels;

    printf("\n%d:armci_rcv_strided_data_bypass not implemented",armci_me);
    fflush(stdout);
}
4164 
4165 
armci_unpin_memory(void * ptr,int stride_arr[],int count[],int strides)4166 void armci_unpin_memory(void *ptr, int stride_arr[], int count[], int strides)
4167 {
4168     printf("\n%d:armci_unpin_memory not implemented",armci_me);fflush(stdout);
4169 }
4170 
4171 
armcill_server_wait_ack(int proc,int n)4172 int armcill_server_wait_ack(int proc, int n)
4173 {
4174     printf("\n%d:armcill_server_wait_ack not implemented",armci_me);
4175     fflush(stdout);
4176     return(0);
4177 }
4178 
4179 
armcill_server_put(int proc,void * s,void * d,int len)4180 void armcill_server_put(int proc, void* s, void *d, int len)
4181 {
4182     printf("\n%d:armcill_server_put not implemented",armci_me);fflush(stdout);
4183 }
4184 
4185 
4186 /*\
4187  *  initialising the atomic send descriptor
4188 \*/
/**
 * Fill in a send work request and its single scatter/gather entry for a
 * 64-bit remote atomic operation.
 *
 * @param sd      work request to initialise (zeroed first)
 * @param sg      scatter/gather entry describing the local 8-byte buffer
 * @param op      ARMCI_FETCH_AND_ADD_LONG or ARMCI_SWAP_LONG; for any other
 *                value the opcode is left at the zero set by memset
 * @param ploc    local buffer address stored in the sg entry
 * @param prem    remote address the atomic operates on
 * @param extra   operand: the addend for fetch-and-add, the swap value for
 *                swap
 * @param id      not used by this function
 * @param lhandle local memory handle supplying the lkey; may be NULL
 * @param rhandle remote memory handle supplying the rkey; may be NULL
 *
 * NOTE(review): ARMCI_SWAP_LONG is mapped to IBV_WR_ATOMIC_CMP_AND_SWP with
 * only wr.atomic.swap set; compare_add stays 0 from the memset, so the swap
 * is conditional on the compare value 0 -- confirm this is intended.
 * NOTE(review): when lhandle is NULL, sg->lkey is left as the caller
 * provided it.
 */
void armci_init_vapibuf_atomic(struct ibv_send_wr *sd, struct ibv_sge *sg,
                   int op, int*ploc,int *prem, int extra,
                   int id,ARMCI_MEMHDL_T *lhandle,
                   ARMCI_MEMHDL_T *rhandle)
{
    if (1) {
       printf("%d(c) : entered armci_init_vapibuf_atomic\n",armci_me);
       fflush(stdout);
    }
    memset(sd,0,sizeof(struct ibv_send_wr));
    if (op == ARMCI_FETCH_AND_ADD_LONG ) {
       printf("%d(c) :setting opcode for snd dscr to FETCH_AND_ADD\n",armci_me);
       sd->opcode = IBV_WR_ATOMIC_FETCH_AND_ADD;
       sd->wr.atomic.compare_add = (uint64_t)extra;
    } else if(op == ARMCI_SWAP_LONG){
       sd->opcode = IBV_WR_ATOMIC_CMP_AND_SWP;
       sd->wr.atomic.swap = (uint64_t)extra;
    }
    sd->send_flags = IBV_SEND_SIGNALED;
    sg->length = 8; /* 64 bit atomic*/
    printf("--------\n");
    sg->addr= (uint64_t)(void *)ploc;
    if(lhandle) {
       sg->lkey = lhandle->lkey;
    }
    sd->sg_list = sg;
    sd->num_sge = 1;
    sd->wr.atomic.remote_addr = (uint64_t)(void *)prem;
    if(rhandle) {
       sd->wr.atomic.rkey = rhandle->rkey; /* how do we get the remote key  */
    }
    sd->wr_id = DSCRID_RMW + armci_me;

    if(1){
       /* wr_id is a 64-bit unsigned value; convert explicitly to match %ld */
       printf("%d(c) : finished initialising atomic send desc id is %ld,armci_ime = %d\n",armci_me,(long)sd->wr_id,armci_me);
       fflush(stdout);
    }
}
4225 /*\
4226  *   using vapi remote atomic operations
4227 \*/
client_rmw_complete(struct ibv_send_wr * snd_dscr,char * from)4228 void client_rmw_complete(struct ibv_send_wr *snd_dscr, char *from)
4229 {
4230     int rc = 0;
4231     struct ibv_wc pdscr1;
4232     struct ibv_wc *pdscr=&pdscr1;
4233 
4234   printf("%d(c) : inside client_rmw_complete\n",armci_me);
4235   do {
4236       while (rc == 0) {
4237 	rc =  ibv_poll_cq(CLN_nic->scq, 1, pdscr);
4238       }
4239       dassertp(DBG_POLL|DBG_ALL,rc>=0,
4240 	       ("%d: rc=%d id=%d status=%d\n",
4241 		armci_me,rc,pdscr->wr_id,pdscr->status));
4242       dassert1(1,pdscr->status==IBV_WC_SUCCESS,pdscr->status);
4243       rc = 0;
4244     } while(pdscr->wr_id != snd_dscr->wr_id);
4245 }
4246 
4247 
/**
 * Perform a remote atomic read-modify-write: initialise this process's rmw
 * descriptor with armci_init_vapibuf_atomic(), post it on the queue pair of
 * CLN_con[proc] and wait for its completion with client_rmw_complete().
 *
 * @param op      atomic operation code (passed through)
 * @param ploc    local buffer address
 * @param prem    remote address
 * @param extra   operand of the atomic operation
 * @param proc    target process; selects CLN_con[proc]
 * @param lhandle local memory handle
 * @param rhandle remote memory handle
 */
void armci_direct_rmw(int op, int*ploc, int *prem, int extra, int proc,
                      ARMCI_MEMHDL_T *lhandle, ARMCI_MEMHDL_T *rhandle)
{
    armci_connect_t *peer = CLN_con+proc;
    struct ibv_send_wr *wr = &(rmw[armci_me].rmw_dscr);
    struct ibv_sge *sge = &(rmw[armci_me].rmw_entry);
    struct ibv_send_wr *bad_wr;
    int rc = 0;

    printf("%d(c) : about to call armci_init_vapibuf_atomic\n",armci_me);
    fflush(stdout);

    armci_init_vapibuf_atomic(wr, sge, op,ploc,prem,extra,proc,lhandle,rhandle);

    printf("%d(c) : finished armci_init_vapibuf_atomic\n",armci_me);
    fflush(stdout);

    rc = ibv_post_send(peer->qp, wr, &bad_wr);
    dassert1(1,rc==0,rc);

    printf("%d(c) : finished posting desc\n",armci_me);
    fflush(stdout);

    /*armci_send_complete(sd,"send_remote_atomic");*/
    client_rmw_complete(wr,"send_remote_atomic");
}
4287 
/* Pointer to a `struct node`, initially NULL.
 * NOTE(review): presumably the head of a queue; it is not referenced in this
 * part of the file -- confirm its purpose against its users. */
struct node *dto_q = NULL;
4289 
/*
 * Answer a QP_CON_BREAK_FROM_CLIENT message received over the UD channel.
 *
 * h - header of the received message; h->src_rank names the sender.
 * v - cbuf that carried the message; released here.
 *
 * A QP_CON_BREAK_FROM_SERVER message carrying this process's rank and the
 * send QP number of its connection to the sender is posted back on the UD
 * queue pair conn.qp[0].
 */
void process_con_break_from_client(armci_ud_rank *h, cbuf *v)
{
    armci_connect_t *con = &CLN_con[h->src_rank];
    struct ibv_send_wr *send_wr;
    armci_ud_rank *h1;
    cbuf *v1;

    v1 = get_cbuf();
    assert( v1 != NULL);

    /* address the reply to the sender's UD queue pair */
    v1->desc.u.sr.wr.ud.ah = conn.ud_ah[h->src_rank];
    v1->desc.u.sr.wr.ud.remote_qpn = rbuf.qp_num[h->src_rank];
    v1->desc.u.sr.wr.ud.remote_qkey = 0;

    /* fill in the reply header */
    h1 = (armci_ud_rank *)CBUF_BUFFER_START(v1);
    h1->msg_type = QP_CON_BREAK_FROM_SERVER;
    h1->src_rank = armci_me;
    h1->qpnum = con->sqpnum;
    cbuf_init_send(v1, sizeof(armci_ud_rank));

    /* the incoming message is no longer needed; h must not be used below */
    release_cbuf(v);

    if (ibv_post_send(conn.qp[0], &(v1->desc.u.sr), &send_wr) != 0) {
        fprintf(stderr, "Error posting send\n");
    }
}
4317 
4318 
/*
 * Handle a QP_CON_BREAK_FROM_SERVER message: mark the connection to the
 * sender's cluster node (SRV_con[armci_clus_id(h->src_rank)]) as
 * QP_INACTIVE and release the cbuf v that carried the message.
 */
void process_con_break_from_server(armci_ud_rank *h, cbuf *v)
{
    SRV_con[armci_clus_id(h->src_rank)].state = QP_INACTIVE;
    release_cbuf(v);
}
4326 
/*
 * Handle a connection request (QP_CON_REQ) received over the UD channel.
 *
 * h - header of the received message; supplies the requester's rank
 *     (src_rank) and queue pair number (qpnum).
 * v - cbuf that carried the message; released once the reply is built.
 *
 * Steps, in order: destroy a still-active QP for this requester, create a
 * fresh QP and move it INIT -> RTR -> RTS, post the receive descriptors for
 * the requester, send this side's QP number back on the UD queue pair, and
 * mark the connection QP_ACTIVE.  Each QP state transition is checked with
 * dassert1, like the receive postings further down.
 */
void process_recv_completion_from_client(armci_ud_rank *h, cbuf *v)
{

    struct ibv_qp_attr qp_attr;
    enum ibv_qp_attr_mask qp_attr_mask;
    struct ibv_recv_wr *bad_wr;

    int rc;

    static int qp_flag = 1;

    /* first call only: reset the count of active client connections */
    if (qp_flag == 1) {
        total_active_conn_to_client = 0;
        qp_flag = 0;
    }

    total_active_conn_to_client++;

    armci_connect_t *con = CLN_con + h->src_rank;

    /* a QP for this requester is still active: drain and destroy it first */
    if (con->state == QP_ACTIVE) {

        qp_attr_mask = IBV_QP_STATE;

        memset(&qp_attr, 0, sizeof qp_attr);
        qp_attr.qp_state        = IBV_QPS_SQD;

        if (ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask)) {
            fprintf(stdout," Error modifying QP\n");
            fflush(stdout);
        }

        if (ibv_destroy_qp(con->qp)) {
            printf("Error destroying QP\n");
        }

        /* the old connection is gone; undo the increment made above */
        total_active_conn_to_client--;
    }
    armci_create_qp(CLN_nic, &con->qp);
    con->sqpnum = con->qp->qp_num;
    con->lid    = CLN_nic->lid_arr[h->src_rank];
    CLN_rqpnumtmpbuf[h->src_rank] = con->qp->qp_num;

    /* transition to INIT */
    qp_attr_mask = IBV_QP_STATE
        | IBV_QP_PKEY_INDEX
        | IBV_QP_PORT
        | IBV_QP_ACCESS_FLAGS;

    memset(&qp_attr, 0, sizeof qp_attr);
    qp_attr.qp_state        = IBV_QPS_INIT;
    qp_attr.pkey_index      = DEFAULT_PKEY_IX;
    qp_attr.port_num        = CLN_nic->active_port;
    qp_attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE|
        IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;

    rc = ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask);
    dassert1(1,rc==0,rc);

    /* transition to RTR (ready to receive) */
    memset(&qp_attr, 0, sizeof qp_attr);
    qp_attr_mask = IBV_QP_STATE
        | IBV_QP_MAX_DEST_RD_ATOMIC
        | IBV_QP_PATH_MTU
        | IBV_QP_RQ_PSN
        | IBV_QP_MIN_RNR_TIMER;
    qp_attr.qp_state           = IBV_QPS_RTR;
    qp_attr.path_mtu           = IBV_MTU_1024;          /*MTU*/
    qp_attr.max_dest_rd_atomic = 4;
    qp_attr.min_rnr_timer      = RNR_TIMER;
    qp_attr.rq_psn             = 0;

    /* Fill in the information from the header */
    SRV_rqpnums[h->src_rank] = h->qpnum;

    qp_attr_mask |= IBV_QP_DEST_QPN | IBV_QP_AV;
    qp_attr.dest_qp_num  = SRV_rqpnums[h->src_rank];
    qp_attr.ah_attr.dlid = SRV_nic->lid_arr[h->src_rank];
    qp_attr.ah_attr.port_num = CLN_nic->active_port;

    rc = ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask);
    dassert1(1,rc==0,rc);

    /* transition to RTS (ready to send) */
    memset(&qp_attr, 0, sizeof qp_attr);

    qp_attr_mask = IBV_QP_STATE
        | IBV_QP_SQ_PSN
        | IBV_QP_TIMEOUT
        | IBV_QP_RETRY_CNT
        | IBV_QP_RNR_RETRY
        | IBV_QP_MAX_QP_RD_ATOMIC;

    qp_attr.qp_state            = IBV_QPS_RTS;
    qp_attr.sq_psn              = 0;
    qp_attr.timeout             = 18;
    qp_attr.retry_cnt           = 7;
    qp_attr.rnr_retry           = 7;
    qp_attr.max_rd_atomic  = 4;
    rc = ibv_modify_qp(con->qp, &qp_attr,qp_attr_mask);
    dassert1(1,rc==0,rc);

#if defined(PEND_BUFS)
    int j;

    /* one receive per immediate buffer reserved for this requester; the
     * wr_id encodes the buffer's index in serv_buf_arr */
    assert(armci_nproc*(IMM_BUF_NUM+1)<DSCRID_IMMBUF_RECV_END-DSCRID_IMMBUF_RECV);
    for(j=0; j<IMM_BUF_NUM+1; j++) {
        vapibuf_t *cbuf;
        cbuf = serv_buf_arr[h->src_rank*(IMM_BUF_NUM+1)+j];
        armci_init_vapibuf_recv(&cbuf->dscr, &cbuf->sg_entry, cbuf->buf,
                IMM_BUF_LEN, &serv_memhandle);
        cbuf->dscr.wr_id = h->src_rank*(IMM_BUF_NUM+1)+j + DSCRID_IMMBUF_RECV;
        if (DEBUG_SERVER) {
            printf("\n%d(s):posted rr with lkey=%d",armci_me,cbuf->sg_entry.lkey);
            fflush(stdout);
        }
        if (armci_use_srq) {
            rc = ibv_post_srq_recv(CLN_srq_hndl, &cbuf->dscr, &bad_wr);
        }
        else {
            rc = ibv_post_recv((CLN_con + h->src_rank)->qp, &cbuf->dscr, &bad_wr);
        }

        dassert1(1,rc==0,rc);
    }
#else
    int i;
    /* NOTE(review): every iteration posts the same buffer,
     * serv_buf_arr[h->src_rank]; the loop index is not used in the body --
     * confirm that armci_nproc postings of one descriptor are intended. */
    for(i =  0; i < armci_nproc; i++) {
        vapibuf_t *cbuf;
        cbuf = serv_buf_arr[h->src_rank];
        armci_init_vapibuf_recv(&cbuf->dscr, &cbuf->sg_entry, cbuf->buf,
                CBUF_DLEN, &serv_memhandle);
        cbuf->dscr.wr_id = h->src_rank+armci_nproc;
        if (DEBUG_SERVER) {
            printf("\n%d(s):posted rr with lkey=%d",armci_me,cbuf->sg_entry.lkey);
            fflush(stdout);
        }
        if (armci_use_srq) {
            rc = ibv_post_srq_recv(CLN_srq_hndl, &cbuf->dscr, &bad_wr);
        }
        else {
            rc = ibv_post_recv((CLN_con+h->src_rank)->qp, &cbuf->dscr, &bad_wr);
        }
        dassert1(1,rc==0,rc);
    }
#endif

    /* Now send back the information */

    struct cbuf *v1 = get_cbuf();

    assert( v1 != NULL);

    v1->desc.u.sr.wr.ud.remote_qpn = rbuf.qp_num[h->src_rank];
    v1->desc.u.sr.wr.ud.remote_qkey = 0;
    v1->desc.u.sr.wr.ud.ah = conn.ud_ah[h->src_rank];

    armci_ud_rank *h1 = (armci_ud_rank *)CBUF_BUFFER_START(v1);
    h1->src_rank = armci_me;
    h1->qpnum = con->sqpnum;
    /* NOTE(review): literal message type; presumably the value of
     * QP_CON_ACK -- confirm against its definition. */
    h1->msg_type = 2;
    cbuf_init_send(v1, sizeof(armci_ud_rank));

    /* Release the receiving cbuf */
    release_cbuf(v);

    struct ibv_send_wr *send_wr;
    if(ibv_post_send(conn.qp[0], &(v1->desc.u.sr), &send_wr)) {
        fprintf(stderr, "Error posting send\n");
    }

    con->state = QP_ACTIVE;
}
4491 
progress_engine()4492 void progress_engine()
4493 {
4494     int ne;
4495     struct ibv_wc wc;
4496     void *cbuf_addr;
4497     cbuf *v;
4498 
4499     do {
4500         ne = ibv_poll_cq(hca.cq, 1, &wc);
4501     } while (ne < 1);
4502 
4503     /* Okay, got an entry, check for errors */
4504     if(ne < 0) {
4505         fprintf(stderr,"Error Polling CQ\n");
4506     }
4507 
4508     if(wc.status != IBV_WC_SUCCESS) {
4509         fprintf(stderr, "[%d] Failed status %d\n",
4510                 armci_me, wc.status);
4511     }
4512     cbuf_addr = (void *) ((aint_t) wc.wr_id);
4513     assert(cbuf_addr != NULL);
4514     v = (cbuf *)cbuf_addr;
4515     armci_ud_rank *h = (armci_ud_rank *)(CBUF_BUFFER_START(v) + 40);
4516 
4517     if(IBV_WC_SEND == wc.opcode
4518             || IBV_WC_RDMA_WRITE == wc.opcode) {
4519         release_cbuf(v);
4520         /* Do nothing, just release the cbuf */
4521         /* Send Completion */
4522     } else if (IBV_WC_RECV == wc.opcode) {
4523         /* Recv completion */
4524         post_recv();
4525 
4526         /* Check if the message received is from a data server */
4527         assert((h->msg_type == QP_CON_REQ)
4528                 || (h->msg_type == QP_CON_ACK)
4529                 || (h->msg_type == QP_CON_BREAK_FROM_CLIENT)
4530                 || (h->msg_type == QP_CON_BREAK_FROM_SERVER));
4531         if (h->msg_type == QP_CON_REQ) {
4532             process_recv_completion_from_client(h, v);
4533         }
4534         else if (h->msg_type == QP_CON_BREAK_FROM_CLIENT) {
4535             process_con_break_from_client(h, v);
4536         }
4537         else if (h->msg_type == QP_CON_BREAK_FROM_SERVER) {
4538             process_con_break_from_server(h, v);
4539         }
4540         else {
4541             process_recv_completion_from_server(h, v);
4542         }
4543     } else {
4544         fprintf(stderr, "Unknown opcode recv'd\n");
4545     }
4546 
4547 }
4548 
4549 
/*
 * Thread entry point: drives progress_engine() in an endless loop.
 *
 * context - thread argument; not used.
 *
 * The loop has no exit, so the trailing return is never reached.
 */
void* async_thread_ud_events(void *context)
{
    for (;;)
        progress_engine();

    return NULL;
}
4558 
4559 
#if 0
/*
 * Disabled code (compiled out by the enclosing #if 0).
 *
 * Creates a shared receive queue on nic->ptag and returns it; the result of
 * ibv_create_srq() is returned unchecked.  The queue is sized for
 * armci_nproc * (IMM_BUF_NUM + 1) + 200 work requests when PEND_BUFS is
 * defined and armci_nproc + 200 otherwise, with one scatter/gather entry
 * per request.
 */
static struct ibv_srq *create_srq(vapi_nic_t *nic)
{
    struct ibv_srq_init_attr srq_init_attr;
    struct ibv_srq *srq_ptr = NULL;

    memset(&srq_init_attr, 0, sizeof(srq_init_attr));

    srq_init_attr.srq_context = nic->handle;
#ifdef PEND_BUFS
    srq_init_attr.attr.max_wr = armci_nproc * (IMM_BUF_NUM + 1) + 200;
#else
    srq_init_attr.attr.max_wr = armci_nproc + 200;
#endif
    srq_init_attr.attr.max_sge = 1;
    /* The limit value should be ignored during SRQ create */
    srq_init_attr.attr.srq_limit = 30;

    srq_ptr = ibv_create_srq(nic->ptag, &srq_init_attr);
    return srq_ptr;
}
#endif
4582 
init_params(void)4583 int init_params(void)
4584 {
4585     conn.qp = (struct ibv_qp **) malloc(armci_nproc * sizeof(struct ibv_qp *));
4586     conn.lid = (uint16_t *) malloc(armci_nproc * sizeof(int));
4587     conn.qp_num = (uint32_t *) malloc(armci_nproc * sizeof(int));
4588     conn.ud_ah = (struct ibv_ah **) malloc (armci_nproc * sizeof (struct ibv_ah *));
4589     conn.status = (int *) malloc(armci_nproc * sizeof(int));
4590 
4591     memset((void *)conn.status, 0, sizeof(int) * armci_nproc);
4592 
4593     rbuf.qp_num = (uint32_t *) malloc(armci_nproc * sizeof(int));
4594     rbuf.lid = (uint16_t *) malloc(armci_nproc * sizeof(int));
4595     rbuf.rkey = (uint32_t *) malloc(armci_nproc * sizeof(int));
4596     rbuf.buf = (char **) malloc(armci_nproc * sizeof(char *));
4597 
4598     assert(conn.qp && conn.lid && rbuf.qp_num && rbuf.lid && rbuf.rkey && rbuf.buf);
4599     return 0;
4600 }
4601 
4602 
open_hca(void)4603 int open_hca(void)
4604 {
4605     struct ibv_device **dev_list;
4606 
4607     int num_hcas;
4608     dev_list = ibv_get_device_list(&num_hcas);
4609 
4610 
4611     hca.ib_dev = dev_list[0];
4612 
4613     hca.context = ibv_open_device(hca.ib_dev);
4614 
4615     if(!hca.context) {
4616         fprintf(stderr,"Couldn't get context %s\n",
4617                 ibv_get_device_name(hca.ib_dev));
4618         return 1;
4619     }
4620 
4621     hca.pd = ibv_alloc_pd(hca.context);
4622 
4623     if(!hca.pd) {
4624         fprintf(stderr,"Couldn't get pd %s\n",
4625                 ibv_get_device_name(hca.ib_dev));
4626         return 1;
4627     }
4628     return 0;
4629 }
4630 
create_cq(void)4631 int create_cq(void)
4632 {
4633     hca.cq = ibv_create_cq(hca.context, 16000, NULL,
4634                     NULL, 0);
4635     if(!hca.cq) {
4636         fprintf(stderr, "Couldn't create CQ\n");
4637         return 1;
4638     }
4639     if (armci_me == armci_master) {
4640         pthread_create(&armci_async_thread[1], NULL,
4641                 &async_thread_ud_events, (void *) hca.context);
4642     }
4643 
4644     return 0;
4645 }
4646 
get_lid(void)4647 int get_lid(void)
4648 {
4649     struct ibv_port_attr port_attr[2];
4650     int i, j;
4651 
4652     for (j = 1; j <= 1; j++) {
4653         if (!ibv_query_port(hca.context, j, &port_attr[j - 1])
4654                 && (port_attr[j - 1].state == IBV_PORT_ACTIVE)) {
4655             for (i = 0; i < armci_nproc; i++)
4656                 conn.lid[i] = port_attr[j - 1].lid;
4657             return 0;
4658         }
4659     }
4660 
4661     return 1;
4662 }
4663 
exch_addr(void)4664 int exch_addr(void)
4665 {
4666     MPI_Allgather((void *)conn.qp_num, sizeof(uint32_t), MPI_BYTE,
4667             (void *)rbuf.qp_num, sizeof(uint32_t), MPI_BYTE, ARMCI_COMM_WORLD);
4668     MPI_Alltoall((void *)conn.lid, sizeof(uint16_t), MPI_BYTE,
4669             (void *)rbuf.lid, sizeof(uint16_t), MPI_BYTE, ARMCI_COMM_WORLD);
4670 
4671 #ifdef DEBUG
4672     for (i = 0; i < nprocs; i++) {
4673         if (me == i)
4674             continue;
4675         fprintf(stdout,"[%d] Remote QP %d, Remote LID %u, Rkey %u, Lkey %u\n"
4676                 " LBuf %p, RBuf %p\n",
4677                 me, rbuf.qp_num[i], rbuf.lid[i], rbuf.rkey[i], lbuf.mr->lkey,
4678                 lbuf.buf, rbuf.buf[i]);
4679         fflush(stdout);
4680     }
4681 #endif
4682 
4683     return 0;
4684 }
4685 
create_qp(void)4686 int create_qp(void)
4687 {
4688     struct ibv_qp_attr qp_attr;
4689 
4690     memset(&qp_attr, 0, sizeof qp_attr);
4691 
4692     struct ibv_qp_init_attr attr = {
4693         .send_cq = hca.cq,
4694         .recv_cq = hca.cq,
4695         .cap     = {
4696             .max_send_wr  = 8192,
4697             .max_recv_wr  = 8192,
4698             .max_send_sge = 1,
4699             .max_recv_sge = 1,
4700             .max_inline_data = 0
4701         },
4702         .qp_type = IBV_QPT_UD
4703     };
4704 
4705     conn.qp[0] = ibv_create_qp(hca.pd, &attr);
4706     if(!conn.qp[0]) {
4707         fprintf(stderr,"Couldn't create QP\n");
4708         return 1;
4709     }
4710 
4711     conn.qp_num[0] = conn.qp[0]->qp_num;
4712     qp_attr.qp_state = IBV_QPS_INIT;
4713     qp_attr.pkey_index = 0;
4714     qp_attr.port_num   = 1;
4715     qp_attr.qkey = 0;
4716 
4717     if(ibv_modify_qp(conn.qp[0], &qp_attr,
4718                 IBV_QP_STATE              |
4719                 IBV_QP_PKEY_INDEX         |
4720                 IBV_QP_PORT               |
4721                 IBV_QP_QKEY)) {
4722         fprintf(stderr,"Could not modify QP to INIT\n");
4723         return 1;
4724     }
4725 #ifdef DEBUG
4726     fprintf(stdout,"[%d] Created QP %d, LID %d\n", me,
4727             conn.qp_num[0], conn.lid[0]);
4728     fflush(stdout);
4729 #endif
4730 
4731     return 0;
4732 }
armci_register_memory(void * buf,int len)4733 struct ibv_mr* armci_register_memory(void* buf, int len)
4734 {
4735     return (ibv_reg_mr(hca.pd, buf, len,
4736                 IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE |
4737                 IBV_ACCESS_REMOTE_READ));
4738 }
4739 
connect_qp(void)4740 int connect_qp(void)
4741 {
4742     struct ibv_qp_attr attr;
4743     int i;
4744     memset(&attr, 0 , sizeof attr);
4745 
4746     attr.qp_state       = IBV_QPS_RTR;
4747     if (ibv_modify_qp(conn.qp[0], &attr,
4748                 IBV_QP_STATE)) {
4749         fprintf(stderr, "Failed to modify QP to RTR\n");
4750         return 1;
4751     }
4752 
4753     attr.qp_state       = IBV_QPS_RTS;
4754     attr.sq_psn         = 0;
4755     if (ibv_modify_qp(conn.qp[0], &attr,
4756                 IBV_QP_STATE              |
4757                 IBV_QP_SQ_PSN)) {
4758         fprintf(stderr, "Failed to modify QP to RTS\n");
4759         return 1;
4760     }
4761     for (i = 0; i < armci_nproc; i++) {
4762         struct ibv_ah_attr ah_attr;
4763         memset(&ah_attr, 0, sizeof(ah_attr));
4764         ah_attr.is_global = 0;
4765         ah_attr.dlid = rbuf.lid[i];
4766         ah_attr.sl = 0;
4767         ah_attr.src_path_bits = 0;
4768         ah_attr.port_num = 1;
4769 
4770         conn.ud_ah[i] = ibv_create_ah(hca.pd, &ah_attr);
4771 
4772         if (!conn.ud_ah[i]) {
4773             fprintf(stderr, "Error creating address handles\n");
4774         }
4775     }
4776 
4777     return 0;
4778 }
4779 
/* Number of UD receives that post_buffers() posts up front. */
int total_ud_recv_buffers = 4096;
4781 
4782 
post_recv()4783 void post_recv()
4784 {
4785     cbuf *v = get_cbuf();
4786     cbuf_init_recv(v, CBUF_BUFFER_SIZE);
4787     struct ibv_recv_wr *bad_wr;
4788 
4789     /* set id to be the global_rank */
4790     v->grank = -1;
4791 
4792     if(ibv_post_recv(conn.qp[0], &(v->desc.u.rr), &bad_wr)) {
4793         fprintf(stderr," Error posting UD recv\n");
4794         fflush(stderr);
4795     }
4796 }
4797 
4798 
post_buffers()4799 int post_buffers()
4800 {
4801     init_cbuf_lock();
4802     allocate_cbufs(8192);
4803     int i;
4804     for (i = 0; i < total_ud_recv_buffers; i++) {
4805         post_recv();
4806     }
4807     return 0;
4808 }
4809 
test_connectivity(void)4810 void test_connectivity(void)
4811 {
4812     int i;
4813     struct ibv_send_wr *bad_wr;
4814 
4815     cbuf *v = NULL;
4816     for (i = 0; i < armci_nproc;i++) {
4817         if (armci_me == i)
4818             continue;
4819 
4820         int j;
4821 
4822         for (j = 0 ; j < 2; j++) {
4823             v = get_cbuf();
4824             assert(v != NULL);
4825             v->desc.u.sr.wr.ud.remote_qpn = rbuf.qp_num[i];
4826             v->desc.u.sr.wr.ud.remote_qkey = 0;
4827             v->desc.u.sr.wr.ud.ah = conn.ud_ah[i];
4828 
4829             armci_ud_rank *h = (armci_ud_rank *)CBUF_BUFFER_START(v);
4830             h->src_rank = armci_me;
4831             cbuf_init_send(v, sizeof(armci_ud_rank));
4832             if(ibv_post_send(conn.qp[0], &(v->desc.u.sr), &bad_wr)) {
4833                 fprintf(stderr, "Error posting send\n");
4834             }
4835         }
4836     }
4837 }
4838 
#if 0
/* Disabled placeholder (compiled out): empty body, no return statement. */
int recycle_dead_qp()
{
}
#endif
4844 
/*
 * Network fault hook; currently does nothing.
 *
 * pdscr - work completion passed by the caller; not used.
 *
 * The only statement, a call to recycle_dead_qp(), is compiled out.
 */
void handle_network_fault(struct ibv_wc *pdscr)
{
#if 0
    recycle_dead_qp();
#endif
}
4851 
4852 
setup_ud_channel()4853 void setup_ud_channel()
4854 {
4855     if(init_params()) {
4856     }
4857 
4858     if(open_hca()) {
4859     }
4860 
4861     if(create_cq()) {
4862     }
4863 
4864     if(get_lid()) {
4865     }
4866 
4867     if(create_qp()) {
4868     }
4869 
4870     if (exch_addr()) {
4871     }
4872 
4873     if(connect_qp()) {
4874     }
4875 
4876     MPI_Barrier(ARMCI_COMM_WORLD);
4877 
4878     if(post_buffers()) {
4879     }
4880 
4881     MPI_Barrier(ARMCI_COMM_WORLD);
4882 }
4883 
4884 
/*
 * Handle the reply to a connection request sent by this process.
 *
 * h - header of the received message; h->src_rank is the replying process
 *     and h->qpnum the queue pair number it created for this connection.
 * v - cbuf that carried the message; released here.
 *
 * The queue pair of the connection to the replier's cluster node
 * (SRV_con[armci_clus_id(h->src_rank)]) is moved INIT -> RTR -> RTS, each
 * transition being asserted, and the connection is then marked QP_ACTIVE.
 */
void process_recv_completion_from_server(armci_ud_rank *h, cbuf *v)
{
    armci_connect_t *con = &SRV_con[armci_clus_id(h->src_rank)];
    struct ibv_qp_attr qp_attr;
    enum ibv_qp_attr_mask qp_attr_mask;
    int rc;

    /* step 1: INIT */
    memset(&qp_attr, 0, sizeof qp_attr);
    qp_attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE|
        IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;
    qp_attr.port_num        = SRV_nic->active_port;
    qp_attr.pkey_index      = DEFAULT_PKEY_IX;
    qp_attr.qp_state        = IBV_QPS_INIT;

    qp_attr_mask = IBV_QP_STATE | IBV_QP_PKEY_INDEX
        | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS;

    rc = ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask);
    assert(0 == rc);

    /* step 2: RTR, pointing the queue pair at the replier */
    memset(&qp_attr, 0, sizeof qp_attr);
    qp_attr.rq_psn             = 0;
    qp_attr.min_rnr_timer      = RNR_TIMER;
    qp_attr.max_dest_rd_atomic = 4;
    qp_attr.path_mtu           = IBV_MTU_1024;
    qp_attr.qp_state           = IBV_QPS_RTR;

    /* remember the remote queue pair number taken from the header */
    CLN_rqpnums[h->src_rank] = h->qpnum;

    qp_attr.dest_qp_num      = CLN_rqpnums[h->src_rank];
    qp_attr.ah_attr.dlid     = SRV_nic->lid_arr[h->src_rank];
    qp_attr.ah_attr.port_num = SRV_nic->active_port;

    qp_attr_mask = IBV_QP_STATE | IBV_QP_MAX_DEST_RD_ATOMIC
        | IBV_QP_PATH_MTU | IBV_QP_RQ_PSN | IBV_QP_MIN_RNR_TIMER
        | IBV_QP_DEST_QPN | IBV_QP_AV;

    rc = ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask);
    assert(0 == rc);

    /* step 3: RTS */
    memset(&qp_attr, 0, sizeof qp_attr);
    qp_attr.max_rd_atomic = 4;
    qp_attr.rnr_retry     = 7;
    qp_attr.retry_cnt     = 7;
    qp_attr.timeout       = 18;
    qp_attr.sq_psn        = 0;
    qp_attr.qp_state      = IBV_QPS_RTS;

    qp_attr_mask = IBV_QP_STATE | IBV_QP_SQ_PSN | IBV_QP_TIMEOUT
        | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_MAX_QP_RD_ATOMIC;

    rc = ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask);
    assert(0 == rc);

    /* the header lives inside v, so h must not be used past this point */
    release_cbuf(v);

    con->state = QP_ACTIVE;
}
4954 
/*
 * Node type with a `head` pointer and an untyped `next` link.
 * NOTE(review): presumably a queue of connections.  `struct armci_connect_t`
 * names a struct tag, whereas armci_connect_t is declared near the top of
 * this file as a typedef of an untagged struct, so `head` points to a
 * distinct, incomplete type -- confirm the intent.
 */
struct con_q_t {
    struct armci_connect_t *head;
    void *next;
};

/* NOTE(review): not referenced in this part of the file. */
struct con_q_t *con_q;

/* NOTE(review): not referenced in this part of the file;
 * get_max_client_to_server_conn() uses a literal 2 as its lower bound. */
#define MAX_CLIENT_TO_SERVER_CONN 2
4963 
4964 
/* Stub: always returns NULL. */
armci_connect_t * dequeue_conn()
{
    return NULL;
}
4969 
get_max_client_to_server_conn()4970 int get_max_client_to_server_conn()
4971 {
4972     static int env_read_flag = 0;
4973 
4974     static int max_client_to_server_conn = 0;
4975     if (!env_read_flag) {
4976         char *value;
4977         if ((value = getenv("ARMCI_MAX_CLIENT_TO_SERVER_FACTOR")) != NULL){
4978             max_client_to_server_conn = armci_nclus/atoi(value);
4979             /* We need a minimum of 4 connections */
4980             if (max_client_to_server_conn <= 1)
4981                 max_client_to_server_conn = 2;
4982 
4983             if (armci_me == 0) {
4984                 fprintf(stdout, "max_client_to_server_conn[%d]\n",
4985                         max_client_to_server_conn);
4986             }
4987         } else {
4988             max_client_to_server_conn = armci_nclus / 2;
4989             if (max_client_to_server_conn <= 1)
4990                 max_client_to_server_conn = 2;
4991         }
4992     }
4993 
4994     return max_client_to_server_conn;
4995 
4996 }
4997 
/*
 * Stub: always returns 0.
 * The caller break_a_connection_if_needed() uses the result as a process
 * rank and treats -1 as "nothing to break"; this stub never returns -1.
 */
int get_the_victim_connection()
{
    return 0;
}
5002 
/*
 * Tear down one existing connection to a server when the number of active
 * connections has reached the configured limit.
 *
 * Does nothing unless armci_use_lazy_break is set.  Acts only when
 * total_active_conn_to_server >= get_max_client_to_server_conn() and this
 * process is not armci_master.  Asserts that it is not running in server
 * context.
 */
void break_a_connection_if_needed()
{
    assert(!SERVER_CONTEXT);

    if (!armci_use_lazy_break)
        return;

    int max_client_to_server_conn = get_max_client_to_server_conn();

    if ((total_active_conn_to_server >=
                max_client_to_server_conn) && (armci_me != armci_master)) {

        /* Pick a victim whose node is not the local one and whose connection
         * is currently active.
         * NOTE(review): get_the_victim_connection() currently always returns
         * 0, so this loop cannot end if process 0 is on the local node or
         * its connection is not active. */
        int proc, clus_id;
        do {
            proc = get_the_victim_connection();
            /* Not enough on the queue */
            if (proc == -1)
                return;
            clus_id = armci_clus_id(proc);
        } while ((armci_clus_me == clus_id) ||
                ((SRV_con + clus_id)->state != QP_ACTIVE));

        /* wait/fence calls issued before the queue pair is touched */
        ARMCI_WaitAll();

        ARMCI_Fence(proc);

        struct ibv_qp_attr qp_attr;
        enum ibv_qp_attr_mask qp_attr_mask;

        armci_connect_t *con = SRV_con + clus_id;

        dassert(1, con->state == QP_ACTIVE);
        qp_attr_mask = IBV_QP_STATE;

        /* move the queue pair to the SQD state before asking the peer to
         * break the connection */
        memset(&qp_attr, 0, sizeof qp_attr);
        qp_attr.qp_state        = IBV_QPS_SQD;

        if (ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask)) {
            fprintf(stdout," Error modifying QP\n");
            fflush(stdout);
        }

        /* send QP_CON_BREAK_FROM_CLIENT over the UD channel to the master
         * process of the victim's node */
        cbuf *v = get_cbuf();
        assert(v != NULL);
        v->desc.u.sr.wr.ud.remote_qpn =
            rbuf.qp_num[armci_clus_info[clus_id].master];
        v->desc.u.sr.wr.ud.remote_qkey = 0;
        v->desc.u.sr.wr.ud.ah = conn.ud_ah[armci_clus_info[clus_id].master];

        armci_ud_rank *h = (armci_ud_rank *)CBUF_BUFFER_START(v);
        h->src_rank = armci_me;
        h->qpnum = con->sqpnum;

        h->msg_type = QP_CON_BREAK_FROM_CLIENT;

        struct ibv_send_wr *bad_wr;

        cbuf_init_send(v, sizeof(armci_ud_rank));

        if(ibv_post_send(conn.qp[0], &(v->desc.u.sr), &bad_wr)) {
            printf("Error posting send\n");
        }


        /* Wait for the reply: process_con_break_from_server(), reached
         * through progress_engine(), sets the state to QP_INACTIVE.  The
         * inner test is always true here because this branch is entered
         * only when armci_me != armci_master. */
        while (con->state != QP_INACTIVE) {
            if (armci_me != armci_master)
                progress_engine();
        }
        if (ibv_destroy_qp(con->qp)) {
            printf("Error destroying QP\n");
        }
    }
}
5075 
5076 
/*
 * Make sure a connection to the node hosting process `proc` is active,
 * establishing it on demand.
 *
 * proc  - target process; its node is found with armci_clus_id().
 * force - when non-zero, skip the attempt to break an existing connection
 *         before creating the new one.
 *
 * Returns immediately when on-demand connection management
 * (armci_use_odcm) is off.  Asserts that it is not running in server
 * context.
 */
void check_state_of_ib_connection(int proc, int force)
{
    int clus_id;
    armci_connect_t *con;

    /* return if ARMCI does not use on demand connection management */
    if (!armci_use_odcm)
        return;

    static int flag = 1;

    /* first call only: reset the connection counters */
    if (flag == 1) {
        total_active_conn_to_server = 0;
        total_breaks = 0;
        flag = 0;
    }
    /* Check clus id */
    clus_id = armci_clus_id(proc);

    con = SRV_con + clus_id;

    assert(!SERVER_CONTEXT);

    if (con->state != QP_ACTIVE) {

        if (!force)
            break_a_connection_if_needed();

        total_active_conn_to_server++;

        /* create the local queue pair for this connection */
        armci_create_qp(SRV_nic, &con->qp);
        con->sqpnum  = con->qp->qp_num;
        con->lid = SRV_nic->lid_arr[clus_id];

        /* send QP_CON_REQ, carrying this process's rank and queue pair
         * number, over the UD channel to the master process of the node */
        cbuf *v = get_cbuf();
        assert(v != NULL);
        v->desc.u.sr.wr.ud.remote_qpn =
            rbuf.qp_num[armci_clus_info[clus_id].master];
        v->desc.u.sr.wr.ud.remote_qkey = 0;
        v->desc.u.sr.wr.ud.ah = conn.ud_ah[armci_clus_info[clus_id].master];

        armci_ud_rank *h = (armci_ud_rank *)CBUF_BUFFER_START(v);
        h->src_rank = armci_me;
        h->qpnum = con->sqpnum;

        h->msg_type = QP_CON_REQ;

        struct ibv_send_wr *bad_wr;

        cbuf_init_send(v, sizeof(armci_ud_rank));

        if(ibv_post_send(conn.qp[0], &(v->desc.u.sr), &bad_wr)) {
            printf("Error posting send\n");
        }

        /* Wait until process_recv_completion_from_server() marks the
         * connection QP_ACTIVE.  Processes other than armci_master drive
         * progress_engine() themselves; armci_master only sleeps, since
         * create_cq() starts a thread on that process which runs
         * progress_engine() continuously. */
        if (armci_me != armci_master) {
            while (con->state != QP_ACTIVE) {
                progress_engine();
            }
        } else {
            while (con->state != QP_ACTIVE) {
                usleep(1);
            }
        }
    }

}
5144 
5145 
5146