1 #if HAVE_CONFIG_H
2 # include "config.h"
3 #endif
4
5 /* $Id: request.c,v 1.74.2.11 2007-10-18 06:09:37 d3h325 Exp $ */
6 #include "armcip.h"
7 #include "request.h"
8 #include "memlock.h"
9 #include "armci_shmem.h"
10 #include "copy.h"
11 #include "gpc.h"
12 #include <stdio.h>
13 #include <signal.h>
14
15 #define DEBUG_ 0
16 #define DEBUG_MEM 0
17
18 #if 0
19 # define MARK_ENTER(func_) { fprintf(stdout, "ENTERING %s\n", func_); fflush(stdout); }
20 # define MARK_EXIT(func_) { fprintf(stdout, "EXITING %s\n", func_); fflush(stdout); }
21 #else
22 # define MARK_ENTER(func_)
23 # define MARK_EXIT(func_)
24 #endif
25
26 #if 0
27 # define PRNDBG3(m,a1,a2,a3) \
28 fprintf(stderr,"DBG %d: " m,armci_me,a1,a2,a3);fflush(stderr)
29 # define PRNDBG(m) PRNDBG3(m,0,0,0)
30 # define PRNDBG1(m,a1) PRNDBG3(m,a1,0,0)
31 # define PRNDBG2(m,a1,a2) PRNDBG3(m,a1,a2,0)
32 #else
33 # define PRNDBG(m)
34 # define PRNDBG1(m,a1)
35 # define PRNDBG2(m,a1,a2)
36 # define PRNDBG3(m,a1,a2,a3)
37 #endif
38
39
40 #if !defined(GM) && !defined(VIA) && !defined(LAPI) &&!defined(VAPI)
41 double _armci_rcv_buf[MSG_BUFLEN_DBL];
42 double _armci_snd_buf[MSG_BUFLEN_DBL];
43 char* MessageSndBuffer = (char*)_armci_snd_buf;
44 char* MessageRcvBuffer = (char*)_armci_rcv_buf;
45 #endif
46
47
/* Largest extended header accepted by armci_rem_strided (bytes). */
#define MAX_EHLEN 248

/* ADDBUF stores val as `type` at buf and advances buf past it;
 * GETBUF loads a `type` from buf into var and advances buf past it.
 * Both are wrapped in do{}while(0) so they behave as a single statement,
 * e.g. as the body of an unbraced if/else. */
#define ADDBUF(buf,type,val) \
    do { *(type*)(buf) = (val); (buf) += sizeof(type); } while(0)
#define GETBUF(buf,type,var) \
    do { (var) = *(type*)(buf); (buf) += sizeof(type); } while(0)

/* Round buf down to a multiple of 8 and then add 8, i.e. move it to the next
 * 8-byte boundary strictly above its current value (an already aligned
 * pointer still advances by 8). */
#define ALLIGN8(buf) \
    do { size_t _adr=(size_t)(buf); \
         _adr>>=3; _adr<<=3; _adr+=8; (buf) = (char*)_adr; } while(0)

#ifndef CLN
#   define CLN 1
#endif
#ifndef SERV
#   define SERV 2
#endif
61
62 /*******************Routines to handle completion descriptor******************/
/*\
 * The following routines fill a completion descriptor and, if necessary,
 * copy the data to its destination based on that descriptor.
 * NOTE: THE FOLLOWING ROUTINES ARE FOR CLIENTS ONLY
\*/
68
69
70 /*\Routine to complete a vector request, data is in buf and descriptor in dscr
71 \*/
72 extern int armci_direct_vector_get(request_header_t *msginfo , armci_giov_t darr[], int len, int proc);
armci_complete_vector_get(armci_giov_t darr[],int len,void * buf)73 static void armci_complete_vector_get(armci_giov_t darr[],int len,void *buf)
74 {
75 int proc;
76 request_header_t *msginfo = (request_header_t*) buf;
77 proc = msginfo->to;
78 #if defined(USE_SOCKET_VECTOR_API)
79 armci_direct_vector_get(msginfo, darr, len, proc);
80 #else
81 armci_rcv_vector_data(proc, msginfo, darr, len);
82 #endif
83 FREE_SEND_BUFFER(buf);
84 }
85
86
87
88
89
90
91 /*\ Routine called from buffers.c to complete a request for which the buffer was
92 * used for, so that the buffer can be reused.
93 \*/
armci_complete_req_buf(BUF_INFO_T * info,void * buffer)94 void armci_complete_req_buf(BUF_INFO_T *info, void *buffer)
95 {
96 request_header_t *msginfo = (request_header_t*) buffer;
97 ARMCI_PR_DBG("enter",0);
98 if(info->protocol==0)return;
99 else if(info->protocol==SDSCR_IN_PLACE){
100 char *dscr = info->dscr;
101 void *loc_ptr;
102 int stride_levels;
103 int *loc_stride_arr,*count;
104
105 loc_ptr = *(void**)dscr; dscr += sizeof(void*);
106 stride_levels = *(int*)dscr; dscr += sizeof(int);
107 loc_stride_arr = (int*)dscr; dscr += stride_levels*sizeof(int);
108 count = (int*)dscr;
109 if(0 || DEBUG_){
110 if(armci_me==0){
111 printf("\n%d:extracted loc_ptr=%p, stridelevels=%d\n",armci_me,
112 loc_ptr,stride_levels);
113 fflush(stdout);
114 }
115 }
116 armci_rcv_strided_data(msginfo->to, msginfo, msginfo->datalen, loc_ptr,
117 stride_levels,loc_stride_arr,count);
118 FREE_SEND_BUFFER(msginfo);
119 }
120 else if(info->protocol==VDSCR_IN_PLACE || info->protocol==VDSCR_IN_PTR){
121 char *dscr;
122 int len,i;
123 if(info->protocol==VDSCR_IN_PLACE){
124 dscr = info->dscr;
125 //printf("\n%d:vdscr in place\n",armci_me);
126 }
127 else {
128 dscr = info->ptr.dscrbuf;
129 //printf("\n%d:vdscr in buf\n",armci_me);
130 }
131 GETBUF(dscr, long ,len);
132 {
133 armci_giov_t *darr;
134 darr = (armci_giov_t *)malloc(sizeof(armci_giov_t)*len);
135 if(!darr)armci_die("malloc in complete_req_buf failed",len);
136 for(i = 0; i< len; i++){
137 int parlen, bytes;
138 GETBUF(dscr, int, parlen);
139 GETBUF(dscr, int, bytes);
140 darr[i].ptr_array_len = parlen;
141 darr[i].bytes = bytes;
142 if(msginfo->operation==GET)darr[i].dst_ptr_array=(void **)dscr;
143 else darr[i].src_ptr_array=(void **)dscr;
144 dscr+=sizeof(void *)*parlen;
145 }
146 if (msginfo->operation==GET) armci_complete_vector_get(darr,len,buffer);
147 }
148 }
149 else
150 armci_die("armci_complete_req_buf,protocol val invalid",info->protocol);
151 ARMCI_PR_DBG("exit",0);
152 }
153
154 extern long x_net_offset(void *,int);
155 /*\ save a part of strided descriptor needed to complete request
156
157 rmo: it seems as if save_
158
159 \*/
armci_save_strided_dscr(char ** bptr,void * rem_ptr,int rem_stride_arr[],int count[],int stride_levels,int is_nb,int proc)160 void armci_save_strided_dscr(char **bptr, void *rem_ptr,int rem_stride_arr[],
161 int count[], int stride_levels,int is_nb,int proc)
162 {
163 int i;
164 char *bufptr=*bptr;
165 BUF_INFO_T *info=NULL;
166 long network_offset,tmpoffset;
167 ARMCI_PR_DBG("enter",0);
168
169 # ifdef PORTALS_UNRESOLVED
170 if(!is_nb){
171 network_offset=x_net_offset(rem_ptr,proc);
172 if(DEBUG_){printf("\n%d:rem_ptr=%p offset=%d newrem=%p",armci_me,rem_ptr,network_offset,(char *)rem_ptr+network_offset);fflush(stdout);}
173 rem_ptr = (char *)rem_ptr+network_offset;
174 }
175 # endif
176
177 if(is_nb){
178 info=BUF_TO_BUFINFO(*bptr);
179 bufptr = (info->dscr);
180 }
181 *(void**)bufptr = rem_ptr; bufptr += sizeof(void*);
182 *(int*)bufptr = stride_levels; bufptr += sizeof(int);
183 for(i=0;i<stride_levels;i++)((int*)bufptr)[i] = rem_stride_arr[i];
184 bufptr += stride_levels*sizeof(int);
185 for(i=0;i< stride_levels+1;i++)((int*)bufptr)[i] = count[i];
186 bufptr += (1+stride_levels)*sizeof(int);
187 if((0 || DEBUG_) && is_nb){
188 bufptr = (info->dscr);
189 if(armci_me==0)
190 printf("\n%d:rem_ptr %p=%p stride_levels %d=%d\n",armci_me,
191 *(void**)bufptr,rem_ptr,
192 *(int*)(bufptr + sizeof(void*)),stride_levels);
193 }
194 /*remote_strided expects the pointer to point to the end of descr hence..*/
195 if(is_nb)
196 info->protocol=SDSCR_IN_PLACE;
197 else
198 *bptr=bufptr;
199 ARMCI_PR_DBG("exit",0);
200
201 }
202
203
204 /*\ save a part of vector descriptor needed to complete request
205 \*/
armci_save_vector_dscr(char ** bptr,armci_giov_t darr[],int len,int op,int is_nb,int proc)206 void armci_save_vector_dscr(char **bptr,armci_giov_t darr[],int len,
207 int op,int is_nb, int proc)
208 {
209 int i,size=sizeof(int);
210 BUF_INFO_T *info;
211 char *buf,*bufptr=*bptr;
212 void *rem_ptr;
213 long offst;
214 ARMCI_PR_DBG("enter",0);
215 if(is_nb){
216 for(i=0;i<len;i++){
217 size+=(2*sizeof(int)+darr[i].ptr_array_len * sizeof(void*));
218 }
219 info=BUF_TO_BUFINFO(bufptr);
220 /*if descr fits in available buffer, use it else do malloc */
221 if(size<=UBUF_LEN){
222 buf = info->dscr;
223 info->protocol=VDSCR_IN_PLACE;
224 }
225 else {
226 info->ptr.dscrbuf = (void *)malloc(size);
227 buf = (char *)info->ptr.dscrbuf;
228 info->protocol=VDSCR_IN_PTR;
229 }
230 }
231 else
232 buf=bufptr;
233
234 ADDBUF(buf,long,len); /* number of sets */
235 for(i=0;i<len;i++){
236 int j;
237 ADDBUF(buf,int,darr[i].ptr_array_len); /* number of elements */
238 ADDBUF(buf,int,darr[i].bytes); /* sizeof element */
239 if(op==GET) {
240 if(is_nb){
241 rem_ptr = darr[i].dst_ptr_array;
242 }
243 else {
244 # ifdef PORTALS_UNRESOLVED
245 for(j=0;j<darr[i].ptr_array_len;j++){
246 offst=x_net_offset(darr[i].src_ptr_array[j],proc);
247 darr[i].src_ptr_array[j]= (char*)darr[i].src_ptr_array[j]+offst;
248 }
249 # endif
250 rem_ptr = darr[i].src_ptr_array;
251 }
252 }
253 else {
254 # ifdef PORTALS_UNRESOLVED
255 for(j=0;j<darr[i].ptr_array_len;j++){
256 offst=x_net_offset(darr[i].dst_ptr_array[j],proc);
257 darr[i].dst_ptr_array[j]= (char*)darr[i].dst_ptr_array[j]+offst;
258 }
259 # endif
260 rem_ptr = darr[i].dst_ptr_array;
261 }
262 armci_copy(rem_ptr,buf, darr[i].ptr_array_len * sizeof(void*));
263 buf += darr[i].ptr_array_len*sizeof(void*);
264 }
265 if(!is_nb)
266 *bptr=buf;
267 ARMCI_PR_DBG("exit",0);
268 }
269
270 /*\
271 * If buf==null, set handle->bufid to val, else set it to the id of the buf
272 \*/
armci_set_nbhandle_bufid(armci_ihdl_t nb_handle,char * buf,int val)273 void armci_set_nbhandle_bufid(armci_ihdl_t nb_handle,char *buf,int val)
274 {
275 BUF_INFO_T *info;
276 if(buf){
277 info = BUF_TO_BUFINFO(buf);
278 val = info->bufid;
279 }
280 nb_handle->bufid = val;
281 }
282
283 /**************End--Routines to handle completion descriptor******************/
284
285
286 /*\ send request to server to LOCK MUTEX
287 \*/
armci_rem_lock(int mutex,int proc,int * ticket)288 void armci_rem_lock(int mutex, int proc, int *ticket)
289 {
290 request_header_t *msginfo;
291 int *ibuf;
292 int bufsize = sizeof(request_header_t)+sizeof(int);
293
294 msginfo = (request_header_t*)GET_SEND_BUFFER(bufsize,LOCK,proc);
295 bzero(msginfo,sizeof(request_header_t));
296
297 msginfo->datalen = sizeof(int);
298 msginfo->dscrlen = 0;
299 msginfo->from = armci_me;
300 msginfo->to = proc;
301 msginfo->operation = LOCK;
302 msginfo->format = mutex;
303 msginfo->bytes = msginfo->datalen + msginfo->dscrlen;
304
305 ibuf = (int*)(msginfo+1);
306 *ibuf = mutex;
307
308 armci_send_req(proc, msginfo, bufsize, 0);
309
310 /* receive ticket from server */
311 *ticket = *(int*)armci_rcv_data(proc,msginfo,0);
312 FREE_SEND_BUFFER(msginfo);
313
314 if(DEBUG_)fprintf(stderr,"%d receiving ticket %d\n",armci_me, *ticket);
315 }
316
317
318
319
/*\ Server side of LOCK: try to acquire the mutex named by the int that
 *  follows the header, on behalf of msginfo->from.  When
 *  armci_server_lock_mutex returns a ticket greater than -1 the ticket is
 *  sent back as a one-int reply; otherwise nothing is sent here.
\*/
void armci_server_lock(request_header_t *msginfo)
{
    int requester = msginfo->from;
    int mutex_id;
    int granted;
    ARMCI_PR_DBG("enter",0);

    mutex_id = *(int*)(msginfo+1);

    /* acquire lock on behalf of requesting process */
    granted = armci_server_lock_mutex(mutex_id, requester, msginfo->tag);

    if(granted >= 0){
        /* got lock */
        msginfo->datalen = sizeof(int);
        armci_send_data(msginfo, &granted);
    }
    ARMCI_PR_DBG("exit",0);
}
340
341
342 /*\ send request to server to UNLOCK MUTEX
343 \*/
armci_rem_unlock(int mutex,int proc,int ticket)344 void armci_rem_unlock(int mutex, int proc, int ticket)
345 {
346 request_header_t *msginfo;
347 int *ibuf;
348 int bufsize = sizeof(request_header_t)+sizeof(ticket);
349
350 msginfo = (request_header_t*)GET_SEND_BUFFER(bufsize,UNLOCK,proc);
351 bzero(msginfo,sizeof(request_header_t));
352
353 msginfo->dscrlen = msginfo->bytes = sizeof(ticket);
354 msginfo->datalen = 0;
355 msginfo->from = armci_me;
356 msginfo->to = proc;
357 msginfo->operation = UNLOCK;
358 msginfo->format = mutex;
359 ibuf = (int*)(msginfo+1);
360 *ibuf = ticket;
361
362 if(DEBUG_)fprintf(stderr,"%d sending unlock\n",armci_me);
363 armci_send_req(proc, msginfo, bufsize,0);
364 }
365
366
367
368 /*\ server unlocks mutex and passes lock to the next waiting process
369 \*/
armci_server_unlock(request_header_t * msginfo,char * dscr)370 void armci_server_unlock(request_header_t *msginfo, char* dscr)
371 {
372 int ticket = *(int*)dscr;
373 int mutex = msginfo->format;
374 int proc = msginfo->to;
375 int waiting;
376
377 waiting = armci_server_unlock_mutex(mutex,proc,ticket,&msginfo->tag);
378
379 if(waiting >-1){ /* -1 means that nobody is waiting */
380
381 ticket++;
382 /* pass ticket to the waiting process */
383 msginfo->from = waiting;
384 msginfo->datalen = sizeof(ticket);
385 armci_send_data(msginfo, &ticket);
386
387 }
388 }
389
/*\ Send `ticket` as a one-int reply to process `proc`, using a header built
 *  on the stack with the given tag.
 *  The header is zeroed first, as every other header in this file is, so the
 *  fields that are not set below reach armci_send_data with a defined value.
\*/
void armci_unlock_waiting_process(msg_tag_t tag, int proc, int ticket)
{
    request_header_t header;
    request_header_t *msginfo = &header;

    bzero(msginfo,sizeof(request_header_t));

    msginfo->datalen = sizeof(int);
    msginfo->tag     = tag;
    msginfo->from    = proc;
    msginfo->to      = armci_me;
    armci_send_data(msginfo, &ticket);
}
401
armci_server_ptr(int id)402 void * armci_server_ptr(int id){
403 char *buf;
404 int bufsize = sizeof(int);
405 request_header_t *msginfo = (request_header_t*)GET_SEND_BUFFER(bufsize,ATTACH,armci_me);
406 bzero(msginfo,sizeof(request_header_t));
407 msginfo->from = armci_me;
408 msginfo->to = SERVER_NODE(armci_clus_me);
409 msginfo->dscrlen = 0;
410 msginfo->datalen = sizeof(int);
411 msginfo->operation = ATTACH;
412 msginfo->bytes = msginfo->dscrlen+ msginfo->datalen;
413 armci_copy(&id, msginfo +1, sizeof(int));
414 if(DEBUG_MEM){
415 printf("\n%d:attach req:sending id %d \n",armci_me,id);fflush(stdout);
416 }
417 armci_send_req(armci_master, msginfo, bufsize,0);
418 buf= armci_rcv_data(armci_master,msginfo,sizeof(void *));/* receive response */
419 if(DEBUG_MEM){
420 printf("\n%d:attach req:got %p \n",armci_me,buf);fflush(stdout);
421 }
422 FREE_SEND_BUFFER(msginfo);
423 ARMCI_PR_DBG("exit",0);
424 return (void *)buf;
425
426 }
427
428 /*\ control message to the server, e.g.: ATTACH to shmem, return ptr etc.
429 \*/
armci_serv_attach_req(void * info,int ilen,long size,void * resp,int rlen)430 void armci_serv_attach_req(void *info, int ilen, long size, void* resp,int rlen)
431 {
432 char *buf;
433 ARMCI_PR_DBG("enter",0);
434 int bufsize = 2*sizeof(request_header_t)+ilen + sizeof(long)+sizeof(rlen);
435 long *idlist=(long *)info;
436 request_header_t *msginfo = (request_header_t*)GET_SEND_BUFFER(bufsize,ATTACH,armci_me);
437 bzero(msginfo,sizeof(request_header_t));
438
439 msginfo->from = armci_me;
440 msginfo->to = SERVER_NODE(armci_clus_me);
441 msginfo->dscrlen = ilen;
442 msginfo->datalen = sizeof(long)+sizeof(int);
443 msginfo->operation = ATTACH;
444 msginfo->bytes = msginfo->dscrlen+ msginfo->datalen;
445
446 armci_copy(info, msginfo +1, ilen);
447 if(DEBUG_MEM){printf("\n%d:sending idlist+1 %d, size %d, idlist[0] %d, idlist[1] %d\n",armci_me,idlist+1,size,idlist[0],idlist[1]);}
448 buf = ((char*)msginfo) + ilen + sizeof(request_header_t);
449 *((long*)buf) =size;
450 *(int*)(buf+ sizeof(long)) = rlen;
451 armci_send_req(armci_master, msginfo, bufsize,0);
452 if(rlen){
453 buf= armci_rcv_data(armci_master, msginfo,rlen); /* receive response */
454 bcopy(buf, resp, rlen);
455 FREE_SEND_BUFFER(msginfo);
456
457 if(DEBUG_MEM){printf("%d:client attaching got ptr=%p %d bytes\n",armci_me,buf,rlen);
458 fflush(stdout);
459 }
460 }
461 ARMCI_PR_DBG("exit",0);
462 }
463
464
465 /*\ server initializes its copy of the memory lock data structures
466 \*/
server_alloc_memlock(void * ptr_myclus)467 static void server_alloc_memlock(void *ptr_myclus)
468 {
469 int i;
470
471 /* for protection, set pointers for processes outside local node NULL */
472 memlock_table_array = calloc(armci_nproc,sizeof(void*));
473 if(!memlock_table_array) armci_die("malloc failed for ARMCI lock array",0);
474
475 /* set pointers for processes on local cluster node
476 * ptr_myclus - corresponds to the master process
477 */
478 for(i=0; i< armci_clus_info[armci_clus_me].nslave; i++){
479 memlock_table_array[armci_master +i] = ((char*)ptr_myclus)
480 + MAX_SLOTS*sizeof(memlock_t)*i;
481 }
482
483 /* set pointer to the use flag */
484 #ifdef MEMLOCK_SHMEM_FLAG
485 armci_use_memlock_table = (int*) (MAX_SLOTS*sizeof(memlock_t) +
486 (char*) memlock_table_array[armci_clus_last]);
487
488 if(DEBUG_)
489 fprintf(stderr,"server initialized memlock %p\n",armci_use_memlock_table);
490 #endif
491 }
492
493
494 static int allocate_memlock=1;
495
496 /*\ server actions triggered by client request to ATTACH
497 \*/
armci_server_ipc(request_header_t * msginfo,void * descr,void * buffer,int buflen)498 void armci_server_ipc(request_header_t* msginfo, void* descr,
499 void* buffer, int buflen)
500 {
501 double *ptr;
502 long *idlist = (long*)descr;
503 long size = *(long*)buffer;
504 int rlen = *(int*)(sizeof(long)+(char*)buffer);
505 extern int **_armci_int_mutexes;
506 ARMCI_PR_DBG("enter",0);
507 if(size<0) armci_die("armci_server_ipc: size<0",(int)size);
508 if(DEBUG_MEM)printf("\n%d:got idlist+1 %p, size %d, idlist[0] %d, idlist[1] %d",armci_me,idlist+1,size,idlist[0],idlist[1]);
509 ptr=(double*)Attach_Shared_Region(idlist+1,size,idlist[0]);
510 if(!ptr)armci_die("armci_server_ipc: failed to attach",0);
511 /* provide data server with access to the memory lock data structures */
512 if(allocate_memlock){
513 allocate_memlock = 0;
514 server_alloc_memlock(ptr);
515 }
516 if(_armci_int_mutexes==NULL){
517 printf("unresolved portals external\n");
518 abort();
519 # ifdef PORTALS_UNRESOLVED
520 extern int _armci_server_mutex_ready;
521 extern void *_armci_server_mutex_ptr;
522 if(_armci_server_mutex_ready){
523 _armci_int_mutexes=(int **)_armci_server_mutex_ptr;
524 }
525 # endif
526 }
527 if(size>0)armci_set_mem_offset(ptr);
528
529 if(msginfo->datalen != sizeof(long)+sizeof(int))
530 armci_die("armci_server_ipc: bad msginfo->datalen ",msginfo->datalen);
531
532 if(rlen==sizeof(ptr)){
533 msginfo->datalen = rlen;
534 armci_send_data(msginfo, &ptr);
535 }
536 else armci_die("armci_server_ipc: bad rlen",rlen);
537 ARMCI_PR_DBG("exit",0);
538 }
539
540
541 /*\ send RMW request to server
542 \*/
armci_rem_rmw(int op,void * ploc,void * prem,int extra,int proc)543 void armci_rem_rmw(int op, void *ploc, void *prem, int extra, int proc)
544 {
545 request_header_t *msginfo;
546 char *buf;
547 void *buffer;
548 int bufsize = sizeof(request_header_t)+sizeof(long)+sizeof(void*);
549 long offst;
550
551 ARMCI_PR_DBG("enter",0);
552 msginfo = (request_header_t*)GET_SEND_BUFFER(bufsize,op,proc);
553 bzero(msginfo,sizeof(request_header_t));
554
555 msginfo->dscrlen = sizeof(void*);
556 msginfo->from = armci_me;
557 msginfo->to = proc;
558 msginfo->operation = op;
559 msginfo->datalen = sizeof(long);
560 # ifdef PORTALS_UNRESOLVED
561 offst=x_net_offset(prem,proc);
562 prem = ((char *)prem+offst);
563 # endif
564 buf = (char*)(msginfo+1);
565 ADDBUF(buf, void*, prem); /* pointer is shipped as descriptor */
566
567 /* data field: extra argument in fetch&add and local value in swap */
568 if(op==ARMCI_SWAP){
569 ADDBUF(buf, int, *((int*)ploc));
570 }else if(op==ARMCI_SWAP_LONG) {
571 ADDBUF(buf, long, *((long*)ploc) );
572 msginfo->datalen = sizeof(long);
573 }else {
574 ADDBUF(buf, int, extra);
575 }
576
577 msginfo->bytes = msginfo->datalen+msginfo->dscrlen ;
578
579 if(DEBUG_){
580 printf("%d sending RMW request %d to %d\n",armci_me,op,proc);
581 fflush(stdout);
582 }
583 armci_send_req(proc, msginfo, bufsize,0);
584 buffer = armci_rcv_data(proc,msginfo,0); /* receive response */
585
586 if(op==ARMCI_FETCH_AND_ADD || op== ARMCI_SWAP)
587 *(int*)ploc = *(int*)buffer;
588 else
589 *(long*)ploc = *(long*)buffer;
590
591 FREE_SEND_BUFFER(msginfo);
592 ARMCI_PR_DBG("exit",0);
593 }
594
595
596 /*\ server response to RMW
597 \*/
armci_server_rmw(request_header_t * msginfo,void * ptr,void * pextra)598 void armci_server_rmw(request_header_t* msginfo,void* ptr, void* pextra)
599 {
600 long lold;
601 int iold;
602 void *pold=0;
603 int op = msginfo->operation;
604
605 ARMCI_PR_DBG("enter",0);
606 if(DEBUG_){
607 printf("%d server: executing RMW from %d. op=%d pextra=%p\n",armci_me,msginfo->from, op, pextra);
608 fflush(stdout);
609 }
610 if(msginfo->datalen != sizeof(long))
611 armci_die2("armci_server_rmw: bad datalen=",msginfo->datalen,op);
612
613 /* for swap operations *pextra has the value to swap
614 * for fetc&add it carries the increment argument
615 */
616 switch(op){
617 case ARMCI_SWAP:
618 iold = *(int*) pextra;
619 case ARMCI_FETCH_AND_ADD:
620 pold = &iold;
621 break;
622
623 case ARMCI_SWAP_LONG:
624 lold = *(long*) pextra;
625 case ARMCI_FETCH_AND_ADD_LONG:
626 pold = &lold;
627 break;
628
629 default:
630 armci_die("armci_server_rmw: bad operation code=",op);
631 }
632
633 armci_generic_rmw(op, pold, *(int**)ptr, *(int*) pextra, msginfo->to);
634
635 armci_send_data(msginfo, pold);
636 ARMCI_PR_DBG("exit",0);
637 }
638
639 extern int armci_direct_vector_snd(request_header_t *msginfo , armci_giov_t darr[], int len, int proc);
640 extern int armci_direct_vector(request_header_t *msginfo , armci_giov_t darr[], int len, int proc);
armci_rem_vector(int op,void * scale,armci_giov_t darr[],int len,int proc,int flag,armci_ihdl_t nb_handle)641 int armci_rem_vector(int op, void *scale, armci_giov_t darr[],int len,int proc,int flag, armci_ihdl_t nb_handle)
642 {
643 char *buf,*buf0;
644 request_header_t *msginfo;
645 int bytes =0, s, slen=0;
646 size_t adr;
647 int bufsize = sizeof(request_header_t);
648 int tag=0;
649
650 if(nb_handle)tag=nb_handle->tag;
651
652 /* compute size of the buffer needed */
653 for(s=0; s<len; s++){
654 bytes += darr[s].ptr_array_len * darr[s].bytes; /* data */
655 bufsize += darr[s].ptr_array_len *sizeof(void*)+2*sizeof(int); /*descr*/
656 }
657
658 bufsize += bytes + sizeof(long) +2*sizeof(double) +8; /*+scale+allignment*/
659
660 buf = buf0= GET_SEND_BUFFER(bufsize,op,proc);
661 msginfo = (request_header_t*)buf;
662 bzero(msginfo,sizeof(request_header_t));
663
664 /* printf("%d:: rem_vector. len=%d. ptr_len[len-1]=%d bytes[len-1]=%d bufsize=%d\n", */
665 /* armci_me, len, darr[len-1].ptr_array_len, darr[len-1].bytes,bufsize); */
666 /* fflush(stdout); */
667
668
669 if(nb_handle){
670 /* INIT_SENDBUF_INFO(nb_handle,buf,op,proc); redundant -- see armci_rem_strided */
671 _armci_buf_set_tag(buf,nb_handle->tag,0);
672 if(nb_handle->bufid == NB_NONE)
673 armci_set_nbhandle_bufid(nb_handle,buf,0);
674 }
675
676 buf += sizeof(request_header_t);
677
678 /* fill vector descriptor */
679 armci_save_vector_dscr(&buf,darr,len,op,0,proc);
680
681 /* align buf for doubles (8-bytes) before copying data */
682 adr = (size_t)buf;
683 adr >>=3;
684 adr <<=3;
685 adr +=8;
686 buf = (char*)adr;
687
688 msginfo->ehlen = 0;
689
690 /* fill message header */
691 msginfo->dscrlen = buf - buf0 - sizeof(request_header_t);
692 msginfo->from = armci_me;
693 msginfo->to = proc;
694 msginfo->operation = op;
695 msginfo->format = VECTOR;
696 msginfo->datalen = bytes;
697
698 /* put scale for accumulate */
699 switch(op){
700 case ARMCI_ACC_INT:
701 *(int*)buf = *(int*)scale; slen= sizeof(int); break;
702 case ARMCI_ACC_DCP:
703 ((double*)buf)[0] = ((double*)scale)[0];
704 ((double*)buf)[1] = ((double*)scale)[1];
705 slen=2*sizeof(double);break;
706 case ARMCI_ACC_DBL:
707 *(double*)buf = *(double*)scale; slen = sizeof(double); break;
708 case ARMCI_ACC_CPL:
709 ((float*)buf)[0] = ((float*)scale)[0];
710 ((float*)buf)[1] = ((float*)scale)[1];
711 slen=2*sizeof(float);break;
712 case ARMCI_ACC_FLT:
713 *(float*)buf = *(float*)scale; slen = sizeof(float); break;
714 default: slen=0;
715 }
716 buf += slen;
717 msginfo->datalen += slen;
718 msginfo->bytes = msginfo->datalen+msginfo->dscrlen;
719
720
721 /* for put and accumulate copy data into buffer */
722 if(op != GET){
723 /* fprintf(stderr,"sending %lf\n",*(double*)darr[0].src_ptr_array[0]);*/
724 armci_vector_to_buf(darr, len, buf);
725 }
726
727 armci_send_req(proc, msginfo, bufsize,tag);
728 /*x_buf_send_complete(buf0);*/
729
730 if(nb_handle && op==GET) armci_save_vector_dscr(&buf0,darr,len,op,1,proc);
731 if(op == GET&& !nb_handle){
732 armci_complete_vector_get(darr,len,msginfo);
733 }
734
735 return 0;
736 }
737
738 #define CHUN_ (8*8096)
739 #define CHUN 200000
740
741 /*\ client version of remote strided operation
742 \*/
armci_rem_strided(int op,void * scale,int proc,void * src_ptr,int src_stride_arr[],void * dst_ptr,int dst_stride_arr[],int count[],int stride_levels,ext_header_t * h,int flag,armci_ihdl_t nb_handle)743 int armci_rem_strided(int op, void* scale, int proc,
744 void *src_ptr, int src_stride_arr[],
745 void* dst_ptr, int dst_stride_arr[],
746 int count[], int stride_levels,
747 ext_header_t *h, int flag,armci_ihdl_t nb_handle)
748 {
749 char *buf, *buf0;
750 request_header_t *msginfo;
751 int i, slen=0, bytes;
752 void *rem_ptr;
753 int *rem_stride_arr;
754 int bufsize = sizeof(request_header_t);
755 int ehlen =0;
756 msg_tag_t msg_tag;
757 int tag=0;
758
759 /* we send ext header only for last chunk */
760 #if 0
761 if(h) ehlen = h->len;
762 #else
763 if(h) if(h->last) ehlen = h->len;
764 #endif
765 if(ehlen>MAX_EHLEN || ehlen <0)
766 armci_die2("armci_rem_strided ehlen out of range",MAX_EHLEN,ehlen);
767 /* calculate size of the buffer needed */
768 for(i=0, bytes=1;i<=stride_levels;i++)bytes*=count[i];
769 bufsize += bytes+sizeof(void*)+2*sizeof(int)*(stride_levels+1) +ehlen
770 +2*sizeof(double) + 16; /* +scale+alignment */
771
772 if (flag){
773 printf("%d: flag=%d\n",armci_me,flag);
774 if(op==GET)bufsize -=bytes;
775 }
776
777 buf = buf0= GET_SEND_BUFFER((bufsize),op,proc);
778 msginfo = (request_header_t*)buf;
779 bzero(msginfo,sizeof(request_header_t));
780
781
782 if(nb_handle)
783 #ifdef ACC_SMP
784 if(!ARMCI_ACC(op))
785 #endif
786 {
787 // printf("%s: non-blocking ops not yet supported\n",Portals_ID());
788 // abort();
789 /* INIT_SENDBUF_INFO(nb_handle,buf,op,proc); same as _armci_buf_set_tag, why here? */
790 _armci_buf_set_tag(buf,nb_handle->tag,0);
791 if(nb_handle->bufid == NB_NONE)
792 armci_set_nbhandle_bufid(nb_handle,buf,0);
793 tag = nb_handle->tag;
794 }
795
796 if(op == GET){
797 rem_ptr = src_ptr;
798 rem_stride_arr = src_stride_arr;
799 }else{
800 rem_ptr = dst_ptr;
801 rem_stride_arr = dst_stride_arr;
802 }
803
804 msginfo->datalen=bytes;
805
806 /* fill strided descriptor */
807 buf += sizeof(request_header_t);
808 /*this function fills the dscr into buf and also moves the buf ptr to the
809 end of the dscr*/
810 armci_save_strided_dscr(&buf,rem_ptr,rem_stride_arr,count,stride_levels,0,proc);
811
812 /* align buf for doubles (8-bytes) before copying data */
813 ALLIGN8(buf);
814
815 /* fill message header */
816 msginfo->from = armci_me;
817 msginfo->to = proc;
818 msginfo->format = STRIDED;
819 msginfo->operation = op;
820
821 /* put scale for accumulate */
822 switch(op){
823 case ARMCI_ACC_INT:
824 *(int*)buf = *(int*)scale; slen= sizeof(int); break;
825 case ARMCI_ACC_DCP:
826 ((double*)buf)[0] = ((double*)scale)[0];
827 ((double*)buf)[1] = ((double*)scale)[1];
828 slen=2*sizeof(double);break;
829 case ARMCI_ACC_DBL:
830 *(double*)buf = *(double*)scale; slen = sizeof(double); break;
831 case ARMCI_ACC_CPL:
832 ((float*)buf)[0] = ((float*)scale)[0];
833 ((float*)buf)[1] = ((float*)scale)[1];
834 slen=2*sizeof(float);break;
835 case ARMCI_ACC_FLT:
836 *(float*)buf = *(float*)scale; slen = sizeof(float); break;
837 case ARMCI_ACC_LNG:
838 *(long*)buf = *(long*)scale; slen = sizeof(long); break;
839 default: slen=0;
840 }
841
842 /*
843 if(ARMCI_ACC(op))printf("%d client len=%d alpha=%lf data=%lf,%lf\n",
844 armci_me, buf-(char*)msginfo,((double*)buf)[0],*((double*)src_ptr), ((double*)buf)[1]);
845 */
846
847 buf += slen;
848
849 /**** add extended header *******/
850 if(ehlen){
851 bcopy(h->exthdr,buf,ehlen);
852 i = ehlen%8; ehlen += (8-i); /* make sure buffer is still alligned */
853 buf += ehlen;
854 }
855
856 msginfo->ehlen = ehlen;
857 msginfo->dscrlen = buf - buf0 - sizeof(request_header_t);
858 msginfo->bytes = msginfo->datalen+msginfo->dscrlen;
859
860 if(op == GET){
861 /*
862 if(nb_handle) {
863 printf("%s rem_strided: nb gets not yet available\n",Portals_ID());
864 abort();
865 }
866 */
867 armci_send_req(proc, msginfo, bufsize,tag);
868 armci_save_strided_dscr(&buf0,dst_ptr,dst_stride_arr,count,
869 stride_levels,1,proc);
870
871 if(!nb_handle){
872 armci_rcv_strided_data(proc, msginfo, msginfo->datalen,
873 dst_ptr, stride_levels, dst_stride_arr, count);
874 FREE_SEND_BUFFER(msginfo);
875 }
876 } else {
877 /* for put and accumulate send data */
878 armci_send_strided(proc,msginfo, buf,
879 src_ptr, stride_levels, src_stride_arr, count,tag);
880 }
881
882 return 0;
883 }
884
885
/*\ Apply the extended header of a request: the header occupies the last
 *  msginfo->ehlen bytes of the descriptor and is read as an armci_flag_t;
 *  its val is stored into the int its ptr refers to.
 *  buf and buflen are not used.
\*/
void armci_process_extheader(request_header_t *msginfo, char *dscr, char* buf, int buflen)
{
    armci_flag_t *eh = (armci_flag_t*)(dscr + msginfo->dscrlen - msginfo->ehlen);
    int *target = (int*)(eh->ptr);

    *target = eh->val;
}
899
/*\ Server side of a strided request.
 *
 *  dscr holds: void *loc_ptr | int stride_levels | int strides[stride_levels]
 *  | int count[stride_levels+1], followed (at the end of the descriptor) by
 *  the accumulate scale and the extended header, if present.
 *    - PUT with datalen == 0 : only the extended header, if any, is processed
 *    - GET                   : the strided data is sent back
 *    - any other op          : executed from buf through armci_op_strided;
 *                              a non-zero result is fatal
 *  The extended header, if any, is processed last.
 *
 *  stride_levels comes from the message and indexes a stack array of
 *  MAX_STRIDE_LEVEL+1 ints, so a value outside 0..MAX_STRIDE_LEVEL is fatal.
\*/
void armci_server(request_header_t *msginfo, char *dscr, char* buf, int buflen)
{
    int buf_stride_arr[MAX_STRIDE_LEVEL+1];
    int *loc_stride_arr,slen;
    int *count, stride_levels;
    void *buf_ptr, *loc_ptr;
    void *scale;
    char *dscr_save = dscr;
    int rc, i,proc;

    ARMCI_PR_DBG("enter",msginfo->datalen);fflush(stdout);
    /*return if using readv/socket for put*/
    if(msginfo->operation==PUT && msginfo->datalen==0){
        if(msginfo->ehlen) /* process extra header if available */
            armci_process_extheader(msginfo, dscr, buf, buflen);
        return;
    }

    /* unpack descriptor record */
    loc_ptr = *(void**)dscr;           dscr += sizeof(void*);
    stride_levels = *(int*)dscr;       dscr += sizeof(int);
    if(stride_levels < 0 || stride_levels > MAX_STRIDE_LEVEL)
        armci_die("armci_server: stride_levels out of range",stride_levels);
    loc_stride_arr = (int*)dscr;       dscr += stride_levels*sizeof(int);
    count = (int*)dscr;

    /* compute stride array for buffer */
    buf_stride_arr[0]=count[0];
    for(i=0; i< stride_levels; i++)
        buf_stride_arr[i+1]= buf_stride_arr[i]*count[i+1];

    /* get scale for accumulate, adjust buf to point to data */
    switch(msginfo->operation){
    case ARMCI_ACC_INT: slen = sizeof(int); break;
    case ARMCI_ACC_DCP: slen = 2*sizeof(double); break;
    case ARMCI_ACC_DBL: slen = sizeof(double); break;
    case ARMCI_ACC_CPL: slen = 2*sizeof(float); break;
    case ARMCI_ACC_FLT: slen = sizeof(float); break;
    case ARMCI_ACC_LNG: slen = sizeof(long); break;
    default: slen=0;
    }

    /* the scale sits right before the extended header at the descriptor end */
    scale = dscr_save+ (msginfo->dscrlen - slen -msginfo->ehlen);

    buf_ptr = buf; /* data in buffer */

    proc = msginfo->to;

    if(msginfo->operation == GET){
        armci_send_strided_data(proc, msginfo, buf,
                                loc_ptr, stride_levels, loc_stride_arr, count);
    } else{
        if((rc = armci_op_strided(msginfo->operation, scale, proc,
                                  buf_ptr, buf_stride_arr, loc_ptr, loc_stride_arr,
                                  count, stride_levels, 1,NULL)))
            armci_die("server_strided: op from buf failed",rc);
    }

    if(msginfo->ehlen) /* process extra header if available */
        armci_process_extheader(msginfo, dscr_save, buf, buflen);
    ARMCI_PR_DBG("exit",0);
}
968
969
/*\ Server side of a vector request.
 *
 *  dscr holds: long nsets, then per set int nptrs | int bytes |
 *  pointer list[nptrs]; buf holds the accumulate scale (if any) followed by
 *  the data.
 *    - PUT with datalen == 0 : nothing to do
 *    - GET  : every listed segment is copied into buf and buf is sent back;
 *             a non-zero ehlen is fatal
 *    - PUT  : buf is scattered to the listed segments
 *    - else : must be an accumulate; each set is applied segment by segment
 *             through armci_acc_2D between armci_lockmem_scatter and
 *             ARMCI_UNLOCKMEM
 *  buflen is not used.
 *
 *  NOTE(review): buf is not advanced past a scale for ARMCI_ACC_LNG here,
 *  although armci_server handles that op - confirm whether vector accumulate
 *  of longs is meant to be supported.
\*/
void armci_server_vector( request_header_t *msginfo,
                          char *dscr, char* buf, int buflen)
{
    int proc;
    long nsets;
    void *scale;
    int set, seg;
    char *reply = buf;

    /*return if using readv/socket for put*/
    if(msginfo->operation==PUT && msginfo->datalen==0)return;

    /* unpack descriptor record */
    GETBUF(dscr, long ,nsets);

    /* get scale for accumulate, adjust buf to point to data */
    scale = buf;
    switch(msginfo->operation){
    case ARMCI_ACC_INT: buf += sizeof(int); break;
    case ARMCI_ACC_DCP: buf += 2*sizeof(double); break;
    case ARMCI_ACC_DBL: buf += sizeof(double); break;
    case ARMCI_ACC_CPL: buf += 2*sizeof(float); break;
    case ARMCI_ACC_FLT: buf += sizeof(float); break;
    default: break;
    }

    proc = msginfo->to;

    /* execute the operation */
    switch(msginfo->operation) {
    case GET:
        if(msginfo->ehlen) {
            armci_die("Unexpected vector message with non-zero ehlen. GPC call?",
                      msginfo->ehlen);
        }
        else {
            for(set = 0; set< nsets; set++){
                int nptrs, seglen;
                void **list;
                GETBUF(dscr, int, nptrs);
                GETBUF(dscr, int, seglen);
                list = (void**)dscr;
                dscr += nptrs*sizeof(char*);
                for(seg=0; seg< nptrs; seg++){
                    armci_copy(list[seg], buf, seglen);
                    buf += seglen;
                }
            }
            armci_send_data(msginfo, reply);
        }
        break;

    case PUT:
        for(set = 0; set< nsets; set++){
            int nptrs, seglen;
            void **list;
            GETBUF(dscr, int, nptrs);
            GETBUF(dscr, int, seglen);
            list = (void**)dscr;
            dscr += nptrs*sizeof(char*);
            for(seg=0; seg< nptrs; seg++){
                bcopy(buf, list[seg], (size_t)seglen);
                buf += seglen;
            }
        }
        break;

    default:
        /* this should be accumulate */
        if(!ARMCI_ACC(msginfo->operation))
            armci_die("v server: wrong op code",msginfo->operation);

        for(set = 0; set< nsets; set++){
            int nptrs, seglen;
            void **list;
            GETBUF(dscr, int, nptrs);
            GETBUF(dscr, int, seglen);
            list = (void**)dscr;
            dscr += nptrs*sizeof(char*);
            armci_lockmem_scatter(list, nptrs, seglen, proc);
            for(seg=0; seg< nptrs; seg++){
                armci_acc_2D(msginfo->operation, scale, proc, buf, list[seg],
                             seglen, 1, seglen, seglen, 0);
                buf += seglen;
            }
            ARMCI_UNLOCKMEM(proc);
        }
    }
}
1066