1 #if HAVE_CONFIG_H
2 #   include "config.h"
3 #endif
4 
5 /* $Id: request.c,v 1.74.2.11 2007-10-18 06:09:37 d3h325 Exp $ */
#include "armcip.h"
#include "request.h"
#include "memlock.h"
#include "armci_shmem.h"
#include "copy.h"
#include "gpc.h"
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
14 
15 #define DEBUG_ 0
16 #define DEBUG_MEM 0
17 
18 #if 0
19 #   define MARK_ENTER(func_) { fprintf(stdout, "ENTERING %s\n", func_); fflush(stdout); }
20 #   define MARK_EXIT(func_) { fprintf(stdout, "EXITING %s\n", func_); fflush(stdout); }
21 #else
22 #   define MARK_ENTER(func_)
23 #   define MARK_EXIT(func_)
24 #endif
25 
26 #if 0
27 #   define PRNDBG3(m,a1,a2,a3) \
28                fprintf(stderr,"DBG %d: " m,armci_me,a1,a2,a3);fflush(stderr)
29 #   define PRNDBG(m) PRNDBG3(m,0,0,0)
30 #   define PRNDBG1(m,a1) PRNDBG3(m,a1,0,0)
31 #   define PRNDBG2(m,a1,a2) PRNDBG3(m,a1,a2,0)
32 #else
33 #   define PRNDBG(m)
34 #   define PRNDBG1(m,a1)
35 #   define PRNDBG2(m,a1,a2)
36 #   define PRNDBG3(m,a1,a2,a3)
37 #endif
38 
39 
40 #if !defined(GM) && !defined(VIA) && !defined(LAPI) &&!defined(VAPI)
41   double _armci_rcv_buf[MSG_BUFLEN_DBL];
42   double _armci_snd_buf[MSG_BUFLEN_DBL];
43   char* MessageSndBuffer = (char*)_armci_snd_buf;
44   char* MessageRcvBuffer = (char*)_armci_rcv_buf;
45 #endif
46 
47 
48 #define MAX_EHLEN 248
/* ADDBUF stores one value of `type` at `buf`; GETBUF loads one into `var`.
 * Both then advance `buf` past the value.
 * Each expands to a single do { } while (0) statement so that the store/load
 * and the pointer bump stay together under an unbraced if/else/for. */
#define ADDBUF(buf,type,val) do { *(type*)(buf) = (val); (buf) += sizeof(type); } while (0)
#define GETBUF(buf,type,var) do { (var) = *(type*)(buf); (buf) += sizeof(type); } while (0)
51 
52 #define ALLIGN8(buf){size_t _adr=(size_t)(buf); \
53                     _adr>>=3; _adr<<=3; _adr+=8; (buf) = (char*)_adr; }
54 
55 #ifndef CLN
56 #   define CLN 1
57 #endif
58 #ifndef SERV
59 #   define SERV 2
60 #endif
61 
62 /*******************Routines to handle completion descriptor******************/
63 /*\
 *The following routines fill a completion descriptor and, if necessary,
 *copy the data to its destination based on that completion descriptor.
66  *NOTE, THE FOLLOWING ROUTINES ARE FOR CLIENTS ONLY
67 \*/
68 
69 
70 /*\Routine to complete a vector request, data is in buf and descriptor in dscr
71 \*/
72 extern int armci_direct_vector_get(request_header_t *msginfo , armci_giov_t darr[], int len, int proc);
armci_complete_vector_get(armci_giov_t darr[],int len,void * buf)73 static void armci_complete_vector_get(armci_giov_t darr[],int len,void *buf)
74 {
75 int proc;
76 request_header_t *msginfo = (request_header_t*) buf;
77     proc = msginfo->to;
78 #if defined(USE_SOCKET_VECTOR_API)
79     armci_direct_vector_get(msginfo, darr, len, proc);
80 #else
81     armci_rcv_vector_data(proc, msginfo, darr, len);
82 #endif
83     FREE_SEND_BUFFER(buf);
84 }
85 
86 
87 
88 
89 
90 
91 /*\ Routine called from buffers.c to complete a request for which the buffer was
92  *  used for, so that the buffer can be reused.
93 \*/
armci_complete_req_buf(BUF_INFO_T * info,void * buffer)94 void armci_complete_req_buf(BUF_INFO_T *info, void *buffer)
95 {
96 request_header_t *msginfo = (request_header_t*) buffer;
97     ARMCI_PR_DBG("enter",0);
98     if(info->protocol==0)return;
99     else if(info->protocol==SDSCR_IN_PLACE){
100        char *dscr = info->dscr;
101        void *loc_ptr;
102        int stride_levels;
103        int *loc_stride_arr,*count;
104 
105        loc_ptr = *(void**)dscr;           dscr += sizeof(void*);
106        stride_levels = *(int*)dscr;       dscr += sizeof(int);
107        loc_stride_arr = (int*)dscr;       dscr += stride_levels*sizeof(int);
108        count = (int*)dscr;
109        if(0 || DEBUG_){
110          if(armci_me==0){
111            printf("\n%d:extracted loc_ptr=%p, stridelevels=%d\n",armci_me,
112                   loc_ptr,stride_levels);
113            fflush(stdout);
114          }
115        }
116        armci_rcv_strided_data(msginfo->to, msginfo, msginfo->datalen, loc_ptr,
117                                 stride_levels,loc_stride_arr,count);
118        FREE_SEND_BUFFER(msginfo);
119     }
120     else if(info->protocol==VDSCR_IN_PLACE || info->protocol==VDSCR_IN_PTR){
121        char *dscr;
122        int len,i;
123        if(info->protocol==VDSCR_IN_PLACE){
124                dscr = info->dscr;
125                //printf("\n%d:vdscr in place\n",armci_me);
126        }
127        else {
128                dscr = info->ptr.dscrbuf;
129                //printf("\n%d:vdscr in buf\n",armci_me);
130        }
131        GETBUF(dscr, long ,len);
132        {
133          armci_giov_t *darr;
134          darr = (armci_giov_t *)malloc(sizeof(armci_giov_t)*len);
135          if(!darr)armci_die("malloc in complete_req_buf failed",len);
136          for(i = 0; i< len; i++){
137            int parlen, bytes;
138            GETBUF(dscr, int, parlen);
139            GETBUF(dscr, int, bytes);
140            darr[i].ptr_array_len = parlen;
141            darr[i].bytes = bytes;
142            if(msginfo->operation==GET)darr[i].dst_ptr_array=(void **)dscr;
143            else darr[i].src_ptr_array=(void **)dscr;
144            dscr+=sizeof(void *)*parlen;
145          }
146          if (msginfo->operation==GET) armci_complete_vector_get(darr,len,buffer);
147        }
148     }
149     else
150        armci_die("armci_complete_req_buf,protocol val invalid",info->protocol);
151     ARMCI_PR_DBG("exit",0);
152 }
153 
154 extern long x_net_offset(void *,int);
155 /*\ save a part of strided descriptor needed to complete request
156 
157 rmo: it seems as if save_
158 
159 \*/
armci_save_strided_dscr(char ** bptr,void * rem_ptr,int rem_stride_arr[],int count[],int stride_levels,int is_nb,int proc)160 void armci_save_strided_dscr(char **bptr, void *rem_ptr,int rem_stride_arr[],
161                              int count[], int stride_levels,int is_nb,int proc)
162 {
163 int i;
164 char *bufptr=*bptr;
165 BUF_INFO_T *info=NULL;
166 long network_offset,tmpoffset;
167     ARMCI_PR_DBG("enter",0);
168 
169   # ifdef PORTALS_UNRESOLVED
170     if(!is_nb){
171       network_offset=x_net_offset(rem_ptr,proc);
172       if(DEBUG_){printf("\n%d:rem_ptr=%p offset=%d newrem=%p",armci_me,rem_ptr,network_offset,(char *)rem_ptr+network_offset);fflush(stdout);}
173       rem_ptr = (char *)rem_ptr+network_offset;
174     }
175   # endif
176 
177     if(is_nb){
178        info=BUF_TO_BUFINFO(*bptr);
179        bufptr = (info->dscr);
180     }
181     *(void**)bufptr = rem_ptr;         bufptr += sizeof(void*);
182     *(int*)bufptr = stride_levels;     bufptr += sizeof(int);
183     for(i=0;i<stride_levels;i++)((int*)bufptr)[i] = rem_stride_arr[i];
184     bufptr += stride_levels*sizeof(int);
185     for(i=0;i< stride_levels+1;i++)((int*)bufptr)[i] = count[i];
186     bufptr += (1+stride_levels)*sizeof(int);
187     if((0 || DEBUG_) && is_nb){
188       bufptr = (info->dscr);
189       if(armci_me==0)
190         printf("\n%d:rem_ptr %p=%p stride_levels %d=%d\n",armci_me,
191                 *(void**)bufptr,rem_ptr,
192                 *(int*)(bufptr + sizeof(void*)),stride_levels);
193     }
194     /*remote_strided expects the pointer to point to the end of descr hence..*/
195     if(is_nb)
196        info->protocol=SDSCR_IN_PLACE;
197     else
198        *bptr=bufptr;
199     ARMCI_PR_DBG("exit",0);
200 
201 }
202 
203 
204 /*\ save a part of vector descriptor needed to complete request
205 \*/
armci_save_vector_dscr(char ** bptr,armci_giov_t darr[],int len,int op,int is_nb,int proc)206 void armci_save_vector_dscr(char **bptr,armci_giov_t darr[],int len,
207                             int op,int is_nb, int proc)
208 {
209 int i,size=sizeof(int);
210 BUF_INFO_T *info;
211 char *buf,*bufptr=*bptr;
212 void *rem_ptr;
213 long offst;
214     ARMCI_PR_DBG("enter",0);
215     if(is_nb){
216        for(i=0;i<len;i++){
217          size+=(2*sizeof(int)+darr[i].ptr_array_len * sizeof(void*));
218        }
219        info=BUF_TO_BUFINFO(bufptr);
220        /*if descr fits in available buffer, use it else do malloc */
221        if(size<=UBUF_LEN){
222          buf = info->dscr;
223          info->protocol=VDSCR_IN_PLACE;
224        }
225        else {
226          info->ptr.dscrbuf = (void *)malloc(size);
227          buf = (char *)info->ptr.dscrbuf;
228          info->protocol=VDSCR_IN_PTR;
229        }
230     }
231     else
232        buf=bufptr;
233 
234     ADDBUF(buf,long,len); /* number of sets */
235     for(i=0;i<len;i++){
236         int j;
237         ADDBUF(buf,int,darr[i].ptr_array_len); /* number of elements */
238         ADDBUF(buf,int,darr[i].bytes);         /* sizeof element */
239         if(op==GET) {
240           if(is_nb){
241             rem_ptr = darr[i].dst_ptr_array;
242           }
243           else {
244           # ifdef PORTALS_UNRESOLVED
245             for(j=0;j<darr[i].ptr_array_len;j++){
246               offst=x_net_offset(darr[i].src_ptr_array[j],proc);
247               darr[i].src_ptr_array[j]= (char*)darr[i].src_ptr_array[j]+offst;
248             }
249           # endif
250             rem_ptr = darr[i].src_ptr_array;
251           }
252         }
253         else {
254         # ifdef PORTALS_UNRESOLVED
255           for(j=0;j<darr[i].ptr_array_len;j++){
256             offst=x_net_offset(darr[i].dst_ptr_array[j],proc);
257             darr[i].dst_ptr_array[j]= (char*)darr[i].dst_ptr_array[j]+offst;
258           }
259         # endif
260           rem_ptr = darr[i].dst_ptr_array;
261         }
262         armci_copy(rem_ptr,buf, darr[i].ptr_array_len * sizeof(void*));
263         buf += darr[i].ptr_array_len*sizeof(void*);
264     }
265     if(!is_nb)
266        *bptr=buf;
267     ARMCI_PR_DBG("exit",0);
268 }
269 
270 /*\
271  * If buf==null, set handle->bufid to val, else set it to the id of the buf
272 \*/
armci_set_nbhandle_bufid(armci_ihdl_t nb_handle,char * buf,int val)273 void armci_set_nbhandle_bufid(armci_ihdl_t nb_handle,char *buf,int val)
274 {
275 BUF_INFO_T *info;
276     if(buf){
277        info = BUF_TO_BUFINFO(buf);
278        val = info->bufid;
279     }
280     nb_handle->bufid = val;
281 }
282 
283 /**************End--Routines to handle completion descriptor******************/
284 
285 
286 /*\ send request to server to LOCK MUTEX
287 \*/
armci_rem_lock(int mutex,int proc,int * ticket)288 void armci_rem_lock(int mutex, int proc, int *ticket)
289 {
290 request_header_t *msginfo;
291 int *ibuf;
292 int bufsize = sizeof(request_header_t)+sizeof(int);
293 
294     msginfo = (request_header_t*)GET_SEND_BUFFER(bufsize,LOCK,proc);
295     bzero(msginfo,sizeof(request_header_t));
296 
297     msginfo->datalen = sizeof(int);
298     msginfo->dscrlen = 0;
299     msginfo->from  = armci_me;
300     msginfo->to    = proc;
301     msginfo->operation = LOCK;
302     msginfo->format  = mutex;
303     msginfo->bytes = msginfo->datalen + msginfo->dscrlen;
304 
305     ibuf = (int*)(msginfo+1);
306     *ibuf = mutex;
307 
308     armci_send_req(proc, msginfo, bufsize, 0);
309 
310     /* receive ticket from server */
311     *ticket = *(int*)armci_rcv_data(proc,msginfo,0);
312     FREE_SEND_BUFFER(msginfo);
313 
314     if(DEBUG_)fprintf(stderr,"%d receiving ticket %d\n",armci_me, *ticket);
315 }
316 
317 
318 
319 
/*\ server side of a LOCK request
 *  Reads the mutex number stored right behind the header and tries to lock it
 *  for the requesting process. A ticket is sent back only when
 *  armci_server_lock_mutex returns a non-negative value.
\*/
void armci_server_lock(request_header_t *msginfo)
{
    int requester = msginfo->from;
    int mutex;
    int ticket;

    ARMCI_PR_DBG("enter",0);

    mutex = *(int*)(msginfo+1);

    /* acquire lock on behalf of requesting process */
    ticket = armci_server_lock_mutex(mutex, requester, msginfo->tag);

    if(ticket >= 0){
       /* got lock: reply with the ticket */
       msginfo->datalen = sizeof(int);
       armci_send_data(msginfo, &ticket);
    }

    ARMCI_PR_DBG("exit",0);
}
340 
341 
342 /*\ send request to server to UNLOCK MUTEX
343 \*/
armci_rem_unlock(int mutex,int proc,int ticket)344 void armci_rem_unlock(int mutex, int proc, int ticket)
345 {
346 request_header_t *msginfo;
347 int *ibuf;
348 int bufsize = sizeof(request_header_t)+sizeof(ticket);
349 
350     msginfo = (request_header_t*)GET_SEND_BUFFER(bufsize,UNLOCK,proc);
351     bzero(msginfo,sizeof(request_header_t));
352 
353     msginfo->dscrlen = msginfo->bytes = sizeof(ticket);
354     msginfo->datalen = 0;
355     msginfo->from  = armci_me;
356     msginfo->to    = proc;
357     msginfo->operation = UNLOCK;
358     msginfo->format  = mutex;
359     ibuf = (int*)(msginfo+1);
360     *ibuf = ticket;
361 
362     if(DEBUG_)fprintf(stderr,"%d sending unlock\n",armci_me);
363     armci_send_req(proc, msginfo, bufsize,0);
364 }
365 
366 
367 
368 /*\ server unlocks mutex and passes lock to the next waiting process
369 \*/
armci_server_unlock(request_header_t * msginfo,char * dscr)370 void armci_server_unlock(request_header_t *msginfo, char* dscr)
371 {
372     int ticket = *(int*)dscr;
373     int mutex  = msginfo->format;
374     int proc   = msginfo->to;
375     int waiting;
376 
377     waiting = armci_server_unlock_mutex(mutex,proc,ticket,&msginfo->tag);
378 
379     if(waiting >-1){ /* -1 means that nobody is waiting */
380 
381        ticket++;
382        /* pass ticket to the waiting process */
383        msginfo->from = waiting;
384        msginfo->datalen = sizeof(ticket);
385        armci_send_data(msginfo, &ticket);
386 
387     }
388 }
389 
/*\ send a ticket to a process that is waiting for a mutex
 *  tag    - message tag stored in the reply header
 *  proc   - process recorded in the header's "from" field
 *  ticket - value sent as the single int of data
\*/
void armci_unlock_waiting_process(msg_tag_t tag, int proc, int ticket)
{
request_header_t header;
request_header_t *msginfo = &header;

  /* clear the stack header first, as every other request builder in this
     file does, so that armci_send_data never sees indeterminate fields */
  bzero(msginfo,sizeof(request_header_t));

  msginfo->datalen = sizeof(int);
  msginfo->tag     = tag;
  msginfo->from      = proc;
  msginfo->to    = armci_me;
  armci_send_data(msginfo, &ticket);
}
401 
armci_server_ptr(int id)402 void * armci_server_ptr(int id){
403 char *buf;
404 int bufsize = sizeof(int);
405 request_header_t *msginfo = (request_header_t*)GET_SEND_BUFFER(bufsize,ATTACH,armci_me);
406   bzero(msginfo,sizeof(request_header_t));
407   msginfo->from  = armci_me;
408   msginfo->to    = SERVER_NODE(armci_clus_me);
409   msginfo->dscrlen   = 0;
410   msginfo->datalen = sizeof(int);
411   msginfo->operation =  ATTACH;
412   msginfo->bytes = msginfo->dscrlen+ msginfo->datalen;
413   armci_copy(&id, msginfo +1, sizeof(int));
414   if(DEBUG_MEM){
415     printf("\n%d:attach req:sending id %d \n",armci_me,id);fflush(stdout);
416   }
417   armci_send_req(armci_master, msginfo, bufsize,0);
418   buf= armci_rcv_data(armci_master,msginfo,sizeof(void *));/* receive response */
419   if(DEBUG_MEM){
420     printf("\n%d:attach req:got %p \n",armci_me,buf);fflush(stdout);
421   }
422   FREE_SEND_BUFFER(msginfo);
423   ARMCI_PR_DBG("exit",0);
424   return (void *)buf;
425 
426 }
427 
428 /*\ control message to the server, e.g.: ATTACH to shmem, return ptr etc.
429 \*/
armci_serv_attach_req(void * info,int ilen,long size,void * resp,int rlen)430 void armci_serv_attach_req(void *info, int ilen, long size, void* resp,int rlen)
431 {
432 char *buf;
433     ARMCI_PR_DBG("enter",0);
434 int bufsize = 2*sizeof(request_header_t)+ilen + sizeof(long)+sizeof(rlen);
435 long *idlist=(long *)info;
436 request_header_t *msginfo = (request_header_t*)GET_SEND_BUFFER(bufsize,ATTACH,armci_me);
437     bzero(msginfo,sizeof(request_header_t));
438 
439     msginfo->from  = armci_me;
440     msginfo->to    = SERVER_NODE(armci_clus_me);
441     msginfo->dscrlen   = ilen;
442     msginfo->datalen = sizeof(long)+sizeof(int);
443     msginfo->operation =  ATTACH;
444     msginfo->bytes = msginfo->dscrlen+ msginfo->datalen;
445 
446     armci_copy(info, msginfo +1, ilen);
447     if(DEBUG_MEM){printf("\n%d:sending idlist+1 %d, size %d, idlist[0] %d, idlist[1] %d\n",armci_me,idlist+1,size,idlist[0],idlist[1]);}
448     buf = ((char*)msginfo) + ilen + sizeof(request_header_t);
449     *((long*)buf) =size;
450     *(int*)(buf+ sizeof(long)) = rlen;
451     armci_send_req(armci_master, msginfo, bufsize,0);
452     if(rlen){
453       buf= armci_rcv_data(armci_master, msginfo,rlen);  /* receive response */
454       bcopy(buf, resp, rlen);
455       FREE_SEND_BUFFER(msginfo);
456 
457       if(DEBUG_MEM){printf("%d:client attaching got ptr=%p %d bytes\n",armci_me,buf,rlen);
458          fflush(stdout);
459       }
460     }
461     ARMCI_PR_DBG("exit",0);
462 }
463 
464 
465 /*\ server initializes its copy of the memory lock data structures
466 \*/
server_alloc_memlock(void * ptr_myclus)467 static void server_alloc_memlock(void *ptr_myclus)
468 {
469 int i;
470 
471     /* for protection, set pointers for processes outside local node NULL */
472     memlock_table_array = calloc(armci_nproc,sizeof(void*));
473     if(!memlock_table_array) armci_die("malloc failed for ARMCI lock array",0);
474 
475     /* set pointers for processes on local cluster node
476      * ptr_myclus - corresponds to the master process
477      */
478     for(i=0; i< armci_clus_info[armci_clus_me].nslave; i++){
479         memlock_table_array[armci_master +i] = ((char*)ptr_myclus)
480                 + MAX_SLOTS*sizeof(memlock_t)*i;
481     }
482 
483     /* set pointer to the use flag */
484 #ifdef MEMLOCK_SHMEM_FLAG
485     armci_use_memlock_table = (int*) (MAX_SLOTS*sizeof(memlock_t) +
486                       (char*) memlock_table_array[armci_clus_last]);
487 
488     if(DEBUG_)
489       fprintf(stderr,"server initialized memlock %p\n",armci_use_memlock_table);
490 #endif
491 }
492 
493 
494 static int allocate_memlock=1;
495 
496 /*\ server actions triggered by client request to ATTACH
497 \*/
armci_server_ipc(request_header_t * msginfo,void * descr,void * buffer,int buflen)498 void armci_server_ipc(request_header_t* msginfo, void* descr,
499                       void* buffer, int buflen)
500 {
501 double *ptr;
502 long *idlist = (long*)descr;
503 long size = *(long*)buffer;
504 int rlen = *(int*)(sizeof(long)+(char*)buffer);
505 extern int **_armci_int_mutexes;
506     ARMCI_PR_DBG("enter",0);
507     if(size<0) armci_die("armci_server_ipc: size<0",(int)size);
508     if(DEBUG_MEM)printf("\n%d:got idlist+1 %p, size %d, idlist[0] %d, idlist[1] %d",armci_me,idlist+1,size,idlist[0],idlist[1]);
509     ptr=(double*)Attach_Shared_Region(idlist+1,size,idlist[0]);
510     if(!ptr)armci_die("armci_server_ipc: failed to attach",0);
511     /* provide data server with access to the memory lock data structures */
512     if(allocate_memlock){
513       allocate_memlock = 0;
514       server_alloc_memlock(ptr);
515     }
516     if(_armci_int_mutexes==NULL){
517       printf("unresolved portals external\n");
518       abort();
519     # ifdef PORTALS_UNRESOLVED
520       extern int _armci_server_mutex_ready;
521       extern void *_armci_server_mutex_ptr;
522       if(_armci_server_mutex_ready){
523         _armci_int_mutexes=(int **)_armci_server_mutex_ptr;
524       }
525     # endif
526     }
527    if(size>0)armci_set_mem_offset(ptr);
528 
529    if(msginfo->datalen != sizeof(long)+sizeof(int))
530       armci_die("armci_server_ipc: bad msginfo->datalen ",msginfo->datalen);
531 
532    if(rlen==sizeof(ptr)){
533      msginfo->datalen = rlen;
534      armci_send_data(msginfo, &ptr);
535    }
536    else armci_die("armci_server_ipc: bad rlen",rlen);
537    ARMCI_PR_DBG("exit",0);
538 }
539 
540 
541 /*\ send RMW request to server
542 \*/
armci_rem_rmw(int op,void * ploc,void * prem,int extra,int proc)543 void armci_rem_rmw(int op, void *ploc, void *prem, int extra, int proc)
544 {
545 request_header_t *msginfo;
546 char *buf;
547 void *buffer;
548 int bufsize = sizeof(request_header_t)+sizeof(long)+sizeof(void*);
549 long offst;
550 
551     ARMCI_PR_DBG("enter",0);
552     msginfo = (request_header_t*)GET_SEND_BUFFER(bufsize,op,proc);
553     bzero(msginfo,sizeof(request_header_t));
554 
555     msginfo->dscrlen = sizeof(void*);
556     msginfo->from  = armci_me;
557     msginfo->to    = proc;
558     msginfo->operation = op;
559     msginfo->datalen = sizeof(long);
560   # ifdef PORTALS_UNRESOLVED
561     offst=x_net_offset(prem,proc);
562     prem = ((char *)prem+offst);
563   # endif
564     buf = (char*)(msginfo+1);
565     ADDBUF(buf, void*, prem); /* pointer is shipped as descriptor */
566 
567     /* data field: extra argument in fetch&add and local value in swap */
568     if(op==ARMCI_SWAP){
569        ADDBUF(buf, int, *((int*)ploc));
570     }else if(op==ARMCI_SWAP_LONG) {
571        ADDBUF(buf, long, *((long*)ploc) );
572        msginfo->datalen = sizeof(long);
573     }else {
574        ADDBUF(buf, int, extra);
575     }
576 
577     msginfo->bytes   = msginfo->datalen+msginfo->dscrlen ;
578 
579     if(DEBUG_){
580         printf("%d sending RMW request %d to %d\n",armci_me,op,proc);
581         fflush(stdout);
582     }
583     armci_send_req(proc, msginfo, bufsize,0);
584     buffer = armci_rcv_data(proc,msginfo,0);  /* receive response */
585 
586     if(op==ARMCI_FETCH_AND_ADD || op== ARMCI_SWAP)
587         *(int*)ploc = *(int*)buffer;
588     else
589         *(long*)ploc = *(long*)buffer;
590 
591     FREE_SEND_BUFFER(msginfo);
592     ARMCI_PR_DBG("exit",0);
593 }
594 
595 
596 /*\ server response to RMW
597 \*/
armci_server_rmw(request_header_t * msginfo,void * ptr,void * pextra)598 void armci_server_rmw(request_header_t* msginfo,void* ptr, void* pextra)
599 {
600 long lold;
601 int iold;
602 void *pold=0;
603 int op = msginfo->operation;
604 
605     ARMCI_PR_DBG("enter",0);
606      if(DEBUG_){
607        printf("%d server: executing RMW from %d. op=%d pextra=%p\n",armci_me,msginfo->from, op, pextra);
608         fflush(stdout);
609      }
610      if(msginfo->datalen != sizeof(long))
611           armci_die2("armci_server_rmw: bad datalen=",msginfo->datalen,op);
612 
613      /* for swap operations *pextra has the  value to swap
614       * for fetc&add it carries the increment argument
615       */
616      switch(op){
617      case ARMCI_SWAP:
618         iold = *(int*) pextra;
619      case ARMCI_FETCH_AND_ADD:
620         pold = &iold;
621         break;
622 
623      case ARMCI_SWAP_LONG:
624         lold = *(long*) pextra;
625      case ARMCI_FETCH_AND_ADD_LONG:
626         pold = &lold;
627         break;
628 
629      default:
630           armci_die("armci_server_rmw: bad operation code=",op);
631      }
632 
633      armci_generic_rmw(op, pold, *(int**)ptr, *(int*) pextra, msginfo->to);
634 
635      armci_send_data(msginfo, pold);
636     ARMCI_PR_DBG("exit",0);
637 }
638 
639 extern int armci_direct_vector_snd(request_header_t *msginfo , armci_giov_t darr[], int len, int proc);
640 extern int armci_direct_vector(request_header_t *msginfo , armci_giov_t darr[], int len, int proc);
armci_rem_vector(int op,void * scale,armci_giov_t darr[],int len,int proc,int flag,armci_ihdl_t nb_handle)641 int armci_rem_vector(int op, void *scale, armci_giov_t darr[],int len,int proc,int flag, armci_ihdl_t nb_handle)
642 {
643     char *buf,*buf0;
644     request_header_t *msginfo;
645     int bytes =0, s, slen=0;
646     size_t adr;
647     int bufsize = sizeof(request_header_t);
648     int tag=0;
649 
650     if(nb_handle)tag=nb_handle->tag;
651 
652     /* compute size of the buffer needed */
653     for(s=0; s<len; s++){
654         bytes   += darr[s].ptr_array_len * darr[s].bytes; /* data */
655         bufsize += darr[s].ptr_array_len *sizeof(void*)+2*sizeof(int); /*descr*/
656     }
657 
658     bufsize += bytes + sizeof(long) +2*sizeof(double) +8; /*+scale+allignment*/
659 
660     buf = buf0= GET_SEND_BUFFER(bufsize,op,proc);
661     msginfo = (request_header_t*)buf;
662     bzero(msginfo,sizeof(request_header_t));
663 
664 /*     printf("%d:: rem_vector. len=%d. ptr_len[len-1]=%d bytes[len-1]=%d bufsize=%d\n",  */
665 /* 	   armci_me, len, darr[len-1].ptr_array_len, darr[len-1].bytes,bufsize); */
666 /*     fflush(stdout); */
667 
668 
669     if(nb_handle){
670    /*   INIT_SENDBUF_INFO(nb_handle,buf,op,proc); redundant -- see armci_rem_strided */
671       _armci_buf_set_tag(buf,nb_handle->tag,0);
672       if(nb_handle->bufid == NB_NONE)
673         armci_set_nbhandle_bufid(nb_handle,buf,0);
674     }
675 
676     buf += sizeof(request_header_t);
677 
678     /* fill vector descriptor */
679     armci_save_vector_dscr(&buf,darr,len,op,0,proc);
680 
681     /* align buf for doubles (8-bytes) before copying data */
682     adr = (size_t)buf;
683     adr >>=3;
684     adr <<=3;
685     adr +=8;
686     buf = (char*)adr;
687 
688     msginfo->ehlen = 0;
689 
690     /* fill message header */
691     msginfo->dscrlen = buf - buf0 - sizeof(request_header_t);
692     msginfo->from  = armci_me;
693     msginfo->to    = proc;
694     msginfo->operation  = op;
695     msginfo->format  = VECTOR;
696     msginfo->datalen = bytes;
697 
698     /* put scale for accumulate */
699     switch(op){
700     case ARMCI_ACC_INT:
701                *(int*)buf = *(int*)scale; slen= sizeof(int); break;
702     case ARMCI_ACC_DCP:
703                ((double*)buf)[0] = ((double*)scale)[0];
704                ((double*)buf)[1] = ((double*)scale)[1];
705                slen=2*sizeof(double);break;
706     case ARMCI_ACC_DBL:
707                *(double*)buf = *(double*)scale; slen = sizeof(double); break;
708     case ARMCI_ACC_CPL:
709                ((float*)buf)[0] = ((float*)scale)[0];
710                ((float*)buf)[1] = ((float*)scale)[1];
711                slen=2*sizeof(float);break;
712     case ARMCI_ACC_FLT:
713                *(float*)buf = *(float*)scale; slen = sizeof(float); break;
714     default: slen=0;
715     }
716     buf += slen;
717     msginfo->datalen += slen;
718     msginfo->bytes = msginfo->datalen+msginfo->dscrlen;
719 
720 
721     /* for put and accumulate copy data into buffer */
722     if(op != GET){
723 /*       fprintf(stderr,"sending %lf\n",*(double*)darr[0].src_ptr_array[0]);*/
724        armci_vector_to_buf(darr, len, buf);
725     }
726 
727     armci_send_req(proc, msginfo, bufsize,tag);
728     /*x_buf_send_complete(buf0);*/
729 
730     if(nb_handle && op==GET) armci_save_vector_dscr(&buf0,darr,len,op,1,proc);
731     if(op == GET&& !nb_handle){
732        armci_complete_vector_get(darr,len,msginfo);
733     }
734 
735     return 0;
736 }
737 
738 #define CHUN_ (8*8096)
739 #define CHUN 200000
740 
741 /*\ client version of remote strided operation
742 \*/
armci_rem_strided(int op,void * scale,int proc,void * src_ptr,int src_stride_arr[],void * dst_ptr,int dst_stride_arr[],int count[],int stride_levels,ext_header_t * h,int flag,armci_ihdl_t nb_handle)743 int armci_rem_strided(int op, void* scale, int proc,
744                        void *src_ptr, int src_stride_arr[],
745                        void* dst_ptr, int dst_stride_arr[],
746                        int count[], int stride_levels,
747                        ext_header_t *h, int flag,armci_ihdl_t nb_handle)
748 {
749     char *buf, *buf0;
750     request_header_t *msginfo;
751     int  i, slen=0, bytes;
752     void *rem_ptr;
753     int  *rem_stride_arr;
754     int bufsize = sizeof(request_header_t);
755     int ehlen =0;
756     msg_tag_t msg_tag;
757     int tag=0;
758 
759     /* we send ext header only for last chunk */
760 #if 0
761     if(h)  ehlen = h->len;
762 #else
763     if(h) if(h->last)  ehlen = h->len;
764 #endif
765     if(ehlen>MAX_EHLEN || ehlen <0)
766        armci_die2("armci_rem_strided ehlen out of range",MAX_EHLEN,ehlen);
767     /* calculate size of the buffer needed */
768     for(i=0, bytes=1;i<=stride_levels;i++)bytes*=count[i];
769     bufsize += bytes+sizeof(void*)+2*sizeof(int)*(stride_levels+1) +ehlen
770                +2*sizeof(double) + 16; /* +scale+alignment */
771 
772     if (flag){
773       printf("%d: flag=%d\n",armci_me,flag);
774       if(op==GET)bufsize -=bytes;
775     }
776 
777     buf = buf0= GET_SEND_BUFFER((bufsize),op,proc);
778     msginfo = (request_header_t*)buf;
779     bzero(msginfo,sizeof(request_header_t));
780 
781 
782     if(nb_handle)
783 #ifdef ACC_SMP
784 	 if(!ARMCI_ACC(op))
785 #endif
786     {
787     // printf("%s: non-blocking ops not yet supported\n",Portals_ID());
788     // abort();
789 /*    INIT_SENDBUF_INFO(nb_handle,buf,op,proc); same as _armci_buf_set_tag, why here? */
790      _armci_buf_set_tag(buf,nb_handle->tag,0);
791      if(nb_handle->bufid == NB_NONE)
792         armci_set_nbhandle_bufid(nb_handle,buf,0);
793      tag = nb_handle->tag;
794     }
795 
796     if(op == GET){
797        rem_ptr = src_ptr;
798        rem_stride_arr = src_stride_arr;
799     }else{
800        rem_ptr = dst_ptr;
801        rem_stride_arr = dst_stride_arr;
802     }
803 
804     msginfo->datalen=bytes;
805 
806     /* fill strided descriptor */
807     buf += sizeof(request_header_t);
808     /*this function fills the dscr into buf and also moves the buf ptr to the
809       end of the dscr*/
810     armci_save_strided_dscr(&buf,rem_ptr,rem_stride_arr,count,stride_levels,0,proc);
811 
812     /* align buf for doubles (8-bytes) before copying data */
813     ALLIGN8(buf);
814 
815     /* fill message header */
816     msginfo->from   = armci_me;
817     msginfo->to     = proc;
818     msginfo->format = STRIDED;
819     msginfo->operation  = op;
820 
821     /* put scale for accumulate */
822     switch(op){
823     case ARMCI_ACC_INT:
824                *(int*)buf = *(int*)scale; slen= sizeof(int); break;
825     case ARMCI_ACC_DCP:
826                ((double*)buf)[0] = ((double*)scale)[0];
827                ((double*)buf)[1] = ((double*)scale)[1];
828                slen=2*sizeof(double);break;
829     case ARMCI_ACC_DBL:
830                *(double*)buf = *(double*)scale; slen = sizeof(double); break;
831     case ARMCI_ACC_CPL:
832                ((float*)buf)[0] = ((float*)scale)[0];
833                ((float*)buf)[1] = ((float*)scale)[1];
834                slen=2*sizeof(float);break;
835     case ARMCI_ACC_FLT:
836                *(float*)buf = *(float*)scale; slen = sizeof(float); break;
837     case ARMCI_ACC_LNG:
838                *(long*)buf = *(long*)scale; slen = sizeof(long); break;
839     default: slen=0;
840     }
841 
842     /*
843 	if(ARMCI_ACC(op))printf("%d client len=%d alpha=%lf data=%lf,%lf\n",
844 	     armci_me, buf-(char*)msginfo,((double*)buf)[0],*((double*)src_ptr),             ((double*)buf)[1]);
845     */
846 
847     buf += slen;
848 
849     /**** add extended header *******/
850     if(ehlen){
851        bcopy(h->exthdr,buf,ehlen);
852        i = ehlen%8; ehlen += (8-i); /* make sure buffer is still alligned */
853        buf += ehlen;
854     }
855 
856     msginfo->ehlen  = ehlen;
857     msginfo->dscrlen = buf - buf0 - sizeof(request_header_t);
858     msginfo->bytes = msginfo->datalen+msginfo->dscrlen;
859 
860     if(op == GET){
861     /*
862       if(nb_handle) {
863          printf("%s rem_strided: nb gets not yet available\n",Portals_ID());
864          abort();
865       }
866     */
867       armci_send_req(proc, msginfo, bufsize,tag);
868       armci_save_strided_dscr(&buf0,dst_ptr,dst_stride_arr,count,
869                                  stride_levels,1,proc);
870 
871       if(!nb_handle){
872         armci_rcv_strided_data(proc, msginfo, msginfo->datalen,
873                               dst_ptr, stride_levels, dst_stride_arr, count);
874         FREE_SEND_BUFFER(msginfo);
875       }
876     } else {
877        /* for put and accumulate send data */
878        armci_send_strided(proc,msginfo, buf,
879                           src_ptr, stride_levels, src_stride_arr, count,tag);
880     }
881 
882     return 0;
883 }
884 
885 
/*\ apply the extended header of a request
 *  The extended header occupies the last msginfo->ehlen bytes of the
 *  descriptor; its val field is stored into the int its ptr field points at.
 *  buf and buflen are not used by this routine.
\*/
void armci_process_extheader(request_header_t *msginfo, char *dscr, char* buf, int buflen)
{
   armci_flag_t *exthdr = (armci_flag_t*)(dscr + msginfo->dscrlen - msginfo->ehlen);
   int *target = (int*)(exthdr->ptr);

   *target = exthdr->val;
}
899 
/*\ Server side handler for a strided request (get, put or accumulate).
 *
 *  msginfo - request header; operation, datalen, dscrlen, ehlen and to
 *            are read
 *  dscr    - packed descriptor, laid out as:
 *              void*  local address
 *              int    stride_levels
 *              int    local stride array  [stride_levels]
 *              int    count array         (count[0..stride_levels] is used)
 *            The accumulate scale (if any) is read from the slen bytes that
 *            precede the extended header at the tail of the descriptor.
 *  buf     - data buffer: handed to armci_send_strided_data for GET,
 *            handed to armci_op_strided together with a contiguous stride
 *            array for every other operation
 *  buflen  - only forwarded to armci_process_extheader
 *
 *  Calls armci_die when stride_levels taken from the descriptor is outside
 *  0..MAX_STRIDE_LEVEL (it indexes a fixed-size local array) or when
 *  armci_op_strided returns non-zero.
\*/
void armci_server(request_header_t *msginfo, char *dscr, char* buf, int buflen)
{
    int   buf_stride_arr[MAX_STRIDE_LEVEL+1];
    int  *loc_stride_arr, *count;
    int   stride_levels, slen;
    int   rc, i, proc;
    void *buf_ptr, *loc_ptr;
    void *scale;
    char *dscr_save = dscr; /* descriptor start; dscr is advanced while unpacking */

    ARMCI_PR_DBG("enter",msginfo->datalen);fflush(stdout);

    /* return if using readv/socket for put */
    if(msginfo->operation==PUT && msginfo->datalen==0){
      if(msginfo->ehlen) /* process extra header if available */
         armci_process_extheader(msginfo, dscr, buf, buflen);
      return;
    }

    /* unpack descriptor record */
    loc_ptr = *(void**)dscr;           dscr += sizeof(void*);
    stride_levels = *(int*)dscr;       dscr += sizeof(int);

    /* stride_levels comes from the message: it sizes the walk through the
     * descriptor and indexes buf_stride_arr, so reject out-of-range values */
    if(stride_levels < 0 || stride_levels > MAX_STRIDE_LEVEL)
       armci_die("server_strided: bad stride_levels in descriptor",
                 stride_levels);

    loc_stride_arr = (int*)dscr;       dscr += stride_levels*sizeof(int);
    count = (int*)dscr;

    /* compute stride array for buffer: data in buf is treated as contiguous,
     * so each stride is the product of the counts below that level */
    buf_stride_arr[0]=count[0];
    for(i=0; i< stride_levels; i++)
        buf_stride_arr[i+1]= buf_stride_arr[i]*count[i+1];

    /* size of the accumulate scale stored in the descriptor (0 if none) */
    switch(msginfo->operation){
    case ARMCI_ACC_INT:     slen = sizeof(int); break;
    case ARMCI_ACC_DCP:     slen = 2*sizeof(double); break;
    case ARMCI_ACC_DBL:     slen = sizeof(double); break;
    case ARMCI_ACC_CPL:     slen = 2*sizeof(float); break;
    case ARMCI_ACC_FLT:     slen = sizeof(float); break;
    case ARMCI_ACC_LNG:     slen = sizeof(long); break;
    default:                slen = 0;
    }

    /* scale sits just before the extended header at the descriptor tail */
    scale = dscr_save+ (msginfo->dscrlen - slen -msginfo->ehlen);
/*
    if(ARMCI_ACC(msginfo->operation))
      fprintf(stderr,"%d in server len=%d slen=%d alpha=%lf data=%lf\n",
               armci_me, msginfo->dscrlen, slen, *(double*)scale,*(double*)buf);
*/

    buf_ptr = buf; /*  data in buffer */

    proc = msginfo->to;

    if(msginfo->operation == GET){
       armci_send_strided_data(proc, msginfo, buf,
                               loc_ptr, stride_levels, loc_stride_arr, count);
       /* fprintf(stderr, "GET response sent with tag: %d\n, msginfo->tag",
          msginfo->tag); */
    } else{
       if((rc = armci_op_strided(msginfo->operation, scale, proc,
               buf_ptr, buf_stride_arr, loc_ptr, loc_stride_arr,
               count, stride_levels, 1,NULL)))
               armci_die("server_strided: op from buf failed",rc);
    }

    if(msginfo->ehlen) /* process extra header if available */
         armci_process_extheader(msginfo, dscr_save, buf, buflen);
    ARMCI_PR_DBG("exit",0);
}
968 
969 
armci_server_vector(request_header_t * msginfo,char * dscr,char * buf,int buflen)970 void armci_server_vector( request_header_t *msginfo,
971                           char *dscr, char* buf, int buflen)
972 {
973     int  proc;
974     long  len;
975     void *scale;
976     int  i,s;
977     char *sbuf = buf;
978     if(msginfo->operation==PUT && msginfo->datalen==0)return;/*return if using readv/socket for put*/
979     /* unpack descriptor record */
980     GETBUF(dscr, long ,len);
981 
982     /* get scale for accumulate, adjust buf to point to data */
983     scale = buf;
984     switch(msginfo->operation){
985     case ARMCI_ACC_INT:     buf += sizeof(int); break;
986     case ARMCI_ACC_DCP:     buf += 2*sizeof(double); break;
987     case ARMCI_ACC_DBL:     buf += sizeof(double); break;
988     case ARMCI_ACC_CPL:     buf += 2*sizeof(float); break;
989     case ARMCI_ACC_FLT:     buf += sizeof(float); break;
990     }
991 
992     proc = msginfo->to;
993 
994     /*fprintf(stderr,"scale=%lf\n",*(double*)scale);*/
995     /* execute the operation */
996 
997     switch(msginfo->operation) {
998     case GET:
999 /*        fprintf(stderr, "%d:: Got a vector message!!\n", armci_me); */
1000       if(msginfo->ehlen) {
1001 	armci_die("Unexpected vector message with non-zero ehlen. GPC call?",
1002 		   msginfo->ehlen);
1003       }
1004       else {
1005 	for(i = 0; i< len; i++){
1006 	  int parlen, bytes;
1007 	  void **ptr;
1008 	  GETBUF(dscr, int, parlen);
1009 	  GETBUF(dscr, int, bytes);
1010 	  /*        fprintf(stderr,"len=%d bytes=%d parlen=%d\n",len,bytes,parlen);*/
1011 	  ptr = (void**)dscr; dscr += parlen*sizeof(char*);
1012 	  for(s=0; s< parlen; s++){
1013 	    armci_copy(ptr[s], buf, bytes);
1014 	    buf += bytes;
1015 	  }
1016 	}
1017 /*     fprintf(stderr,"%d:: VECTOR GET. server sending buffer %p datalen=%d\n",armci_me, sbuf, msginfo->datalen); */
1018 	armci_send_data(msginfo, sbuf);
1019       }
1020       break;
1021 
1022     case PUT:
1023 
1024 /*    fprintf(stderr,"received in buffer %lf\n",*(double*)buf);*/
1025       for(i = 0; i< len; i++){
1026         int parlen, bytes;
1027         void **ptr;
1028         GETBUF(dscr, int, parlen);
1029         GETBUF(dscr, int, bytes);
1030         ptr = (void**)dscr; dscr += parlen*sizeof(char*);
1031         for(s=0; s< parlen; s++){
1032 /*
1033           armci_copy(buf, ptr[s], bytes);
1034 */
1035           bcopy(buf, ptr[s], (size_t)bytes);
1036           buf += bytes;
1037         }
1038       }
1039       break;
1040 
1041      default:
1042 
1043       /* this should be accumulate */
1044       if(!ARMCI_ACC(msginfo->operation))
1045                armci_die("v server: wrong op code",msginfo->operation);
1046 
1047 /*      fprintf(stderr,"received first=%lf last =%lf in buffer\n",*/
1048 /*                     *((double*)buf),((double*)buf)[99]);*/
1049 
1050       for(i = 0; i< len; i++){
1051         int parlen, bytes;
1052         void **ptr;
1053         GETBUF(dscr, int, parlen);
1054         GETBUF(dscr, int, bytes);
1055         ptr = (void**)dscr; dscr += parlen*sizeof(char*);
1056         armci_lockmem_scatter(ptr, parlen, bytes, proc);
1057         for(s=0; s< parlen; s++){
1058           armci_acc_2D(msginfo->operation, scale, proc, buf, ptr[s],
1059                        bytes, 1, bytes, bytes, 0);
1060           buf += bytes;
1061         }
1062         ARMCI_UNLOCKMEM(proc);
1063       }
1064     }
1065 }
1066