1 #if HAVE_CONFIG_H
2 #   include "config.h"
3 #endif
4 
5 /* $Id: mutex.c,v 1.24.10.1 2006-12-21 23:50:48 manoj Exp $ */
6 #include "armcip.h"
7 #include "copy.h"
8 #include "request.h"
9 #include <stdio.h>
10 
/* Non-zero enables tracing of mutex operations on stderr. */
#define DEBUG 0
/* Largest number of mutexes a single process may request. */
#define MAX_LOCKS 32768
/* Unit (in loop iterations) of the linear backoff used while spinning. */
#define SPINMAX 1000

/* LAPI and GM builds enable the SERVER_LOCK code paths. */
#if defined(LAPI) || defined(GM)
#  define SERVER_LOCK
#endif

/* Accumulator touched by the backoff loop; global to fool the compiler. */
double _dummy_work_ = 0.;

#ifdef LAPI /*fix to if cmpl handler for a pending unlock runs after destroy*/
/* Number of mutexes this process contributed at creation time. */
int mymutexcount;
/* Accumulator touched by the drain loop in the destroy routine. */
double _dummy_server_work_ = 0.;
#endif

/* Total number of mutexes currently created (0 when none exist). */
static int num_mutexes = 0;
/* Local array with one saved ticket value per mutex. */
static int *tickets;
25 
26 typedef struct {
27        int mutex;
28        int turn;
29        msg_tag_t tag;
30 } waiting_list_t;
31 
32 
33 /* data structure to store info about blocked (waiting) process for mutex */
34 static waiting_list_t* blocked=(waiting_list_t*)0;
35 
36 
/* Description of the mutexes hosted by one process. */
typedef struct {
    int *token;     /* one counter per mutex; incremented by fetch-and-add to draw a ticket */
    int *turn;      /* one counter per mutex; the ticket value currently being served */
    int *tickets;   /* slice of the local ticket array where drawn tickets are saved */
    int count;      /* number of mutexes hosted by the process */
} mutex_entry_t;

/* Per-process base addresses filled in by PARMCI_Malloc. */
void **mutex_mem_ar;
/* One mutex_entry_t per process, indexed by rank. */
mutex_entry_t *glob_mutex;
46 
47 
PARMCI_Create_mutexes(int num)48 int PARMCI_Create_mutexes(int num)
49 {
50 int rc,p, totcount;
51 int *mutex_count = (int*)armci_internal_buffer;
52 
53   if((sizeof(int)*armci_nproc) > armci_getbufsize()){
54     mutex_count = (double *)malloc(sizeof(int)*armci_nproc);
55   }
56 	if (num < 0 || num > MAX_LOCKS) return(FAIL);
57   if(num_mutexes) armci_die("mutexes already created",num_mutexes);
58 
59   if(armci_nproc == 1){  num_mutexes=1; return(0); }
60 
61   /* local memory allocation for mutex arrays*/
62   mutex_mem_ar = (void*) malloc(armci_nproc*sizeof(void*));
63   if(!mutex_mem_ar) armci_die("PARMCI_Create_mutexes: malloc failed",0);
64     glob_mutex = (void*)malloc(armci_nproc*sizeof(mutex_entry_t));
65   if(!glob_mutex){
66     free(mutex_mem_ar);
67     armci_die("PARMCI_Create_mutexes: malloc 2 failed",0);
68   }
69 
70 
71 /*        bzero(mutex_count,armci_nproc*sizeof(int));*/
72   bzero((char*)mutex_count,sizeof(int)*armci_nproc);
73 
74         /* find out how many mutexes everybody allocated */
75         mutex_count[armci_me]=num;
76         armci_msg_igop(mutex_count, armci_nproc, "+");
77 	for(p=totcount=0; p< armci_nproc; p++)totcount+=mutex_count[p];
78 
79         tickets = calloc(totcount,sizeof(int));
80         if(!tickets) {
81            free(glob_mutex);
82            free(mutex_mem_ar);
83            return(FAIL2);
84         }
85 
86         /* we need memory for token and turn - 2 ints */
87 	rc = PARMCI_Malloc(mutex_mem_ar,2*num*sizeof(int));
88         if(rc){
89            free(glob_mutex);
90            free(mutex_mem_ar);
91            free(tickets);
92            return(FAIL3);
93         }
94 
95         if(num)bzero((char*)mutex_mem_ar[armci_me],2*num*sizeof(int));
96 
97         /* setup global mutex array */
98 	for(p=totcount=0; p< armci_nproc; p++){
99            glob_mutex[p].token   = mutex_mem_ar[p];
100            glob_mutex[p].turn    = glob_mutex[p].token + mutex_count[p];
101            glob_mutex[p].count   = mutex_count[p];
102            glob_mutex[p].tickets = tickets + totcount;
103            totcount += mutex_count[p];
104         }
105 
106         num_mutexes= totcount;
107 #ifdef LAPI
108         mymutexcount = num;
109 #endif
110         PARMCI_Barrier();
111 
112         if(DEBUG)
113            fprintf(stderr,"%d created (%d,%d) mutexes\n",armci_me,num,totcount);
114 
115         return(0);
116 }
117 
118 
armci_serv_mutex_create()119 void armci_serv_mutex_create()
120 {
121      int mem = armci_nproc*sizeof(waiting_list_t);
122      blocked = (waiting_list_t*)malloc(mem);
123      if(!blocked) armci_die("armci server:error allocating mutex memory ",0);
124 }
125 
126 
armci_serv_mutex_close()127 void armci_serv_mutex_close()
128 {
129      if(blocked) free(blocked );
130      blocked = (waiting_list_t*)0;
131 }
132 
133 
PARMCI_Destroy_mutexes()134 int PARMCI_Destroy_mutexes()
135 {
136 #ifdef LAPI /*fix to if cmpl handler for a pending unlock runs after destroy*/
137      int proc, mutex, i,factor=0;
138 #endif
139      if(num_mutexes==0)armci_die("armci_destroy_mutexes: not created",0);
140      if(armci_nproc == 1) return(0);
141 
142      armci_msg_barrier();
143 
144 #ifdef LAPI /*fix to if cmpl handler for a pending unlock runs after destroy*/
145      for(proc=0;proc<armci_nproc;proc++){
146           for(mutex=0;mutex<glob_mutex[proc].count;mutex++){
147             _dummy_server_work_ = 0.; /* must be global to fool the compiler */
148             while(!armci_mutex_free(mutex,proc)){
149               for(i=0; i<  SPINMAX *factor; i++) _dummy_server_work_ += 1.;
150 	      factor+=1;
151             }
152 	  }
153      }
154 #endif
155      num_mutexes=0;
156 
157 #    if defined(SERVER_LOCK)
158         armci_serv_mutex_close();
159 #    endif
160 
161      // RMO: Forcing PARMCI_Free to be a collective operation
162      // if(glob_mutex[armci_me].count)
163      PARMCI_Free(glob_mutex[armci_me].token);
164 
165      free(tickets);
166      free(glob_mutex);
167      free(mutex_mem_ar);
168 
169      return(0);
170 }
171 
172 
register_in_mutex_queue(int id,int proc)173 static int register_in_mutex_queue(int id, int proc)
174 {
175 int *mutex_entry, ticket;
176 
177     if(glob_mutex[proc].count < id)
178        armci_die2("armci:invalid mutex id",id, glob_mutex[proc].count);
179     mutex_entry = glob_mutex[proc].token  + id;
180     PARMCI_Rmw(ARMCI_FETCH_AND_ADD, &ticket, mutex_entry, 1, proc);
181 
182     return ticket;
183 }
184 
185 
186 /*\ check if mutex is available by comparing turn and token
187  *  can only be called by a process on the same SMP node as proc
188 \*/
armci_mutex_free(int mutex,int proc)189 static int armci_mutex_free(int mutex, int proc)
190 {
191 volatile int *mutex_ticket=glob_mutex[proc].turn + mutex;
192 volatile int *turn = glob_mutex[proc].token  +mutex;
193 
194       /* here we will put code to check if other processes on the node
195        * are waiting for this mutex
196        * lockinfo_node[me].ticket = mutex_ticket;
197        * lockinfo_node[me].mutex  = mutex;
198        */
199 
200        if(*mutex_ticket == *turn) return 1;
201        else return 0;
202 }
203 
204 
205 
armci_generic_lock(int mutex,int proc)206 static void armci_generic_lock(int mutex, int proc)
207 {
208 int i, myturn, factor=0, len=sizeof(int);
209 int  *mutex_ticket, next_in_line;
210 
211       mutex_ticket= glob_mutex[proc].turn + mutex;
212       myturn = register_in_mutex_queue(mutex, proc);
213 
214       /* code to reduce cost of unlocking mutex on the same SMP node goes here
215        * lockinfo_node[me].ticket = mutex_ticket;
216        * lockinfo_node[me].mutex  = mutex;
217        */
218 
219       _dummy_work_ = 0.; /* must be global to fool the compiler */
220       do {
221 
222            PARMCI_Get(mutex_ticket, &next_in_line, len, proc);
223            if(next_in_line > myturn)
224               armci_die2("armci: problem with tickets",myturn,next_in_line);
225 
226            /* apply a linear backoff delay before retrying  */
227            for(i=0; i<  SPINMAX * factor; i++) _dummy_work_ += 1.;
228 
229            factor += 1;
230 
231       }while (myturn != next_in_line);
232 
233       glob_mutex[proc].tickets[mutex] = myturn; /* save ticket value */
234 }
235 
236 
armci_generic_unlock(int mutex,int proc)237 static void armci_generic_unlock(int mutex, int proc)
238 {
239 int *mutex_ticket= glob_mutex[proc].turn + mutex;
240 int *newval = glob_mutex[proc].tickets +mutex;
241 int len=sizeof(int);
242 
243        /* update ticket for next process requesting this mutex */
244        (*newval) ++;
245 
246        /* write new ticket value stored previously in tickets  */
247        PARMCI_Put(newval, mutex_ticket, len, proc);
248        MEM_FENCE;
249 }
250 
251 
252 /*\  Acquire mutex  for "proc"
253  *   -must be executed in hrecv/AM handler thread
254  *   -application thread must use generic_lock routine
255 \*/
armci_server_lock_mutex(int mutex,int proc,msg_tag_t tag)256 int armci_server_lock_mutex(int mutex, int proc, msg_tag_t tag)
257 {
258 int myturn;
259 int *mutex_ticket, next_in_line, len=sizeof(int);
260 int owner = armci_me;
261 
262 
263       if(DEBUG)fprintf(stderr,"SLOCK=%d owner=%d p=%d m=%d\n",
264                        armci_me,owner, proc,mutex);
265 
266       mutex_ticket= glob_mutex[owner].turn + mutex;
267       myturn = register_in_mutex_queue(mutex, owner);
268 
269       armci_copy(mutex_ticket, &next_in_line, len);
270 
271       if(next_in_line > myturn)
272          armci_die2("armci-s: problem with tickets",myturn,next_in_line);
273 
274       if(next_in_line != myturn){
275            if(!blocked)armci_serv_mutex_create();
276            blocked[proc].mutex = mutex;
277            blocked[proc].turn = myturn;
278            blocked[proc].tag  = tag;
279            if(DEBUG) fprintf(stderr,"SLOCK=%d proc=%d blocked (%d,%d)\n",
280                                      armci_me, proc, next_in_line,myturn);
281            return -1;
282 
283       } else {
284 
285            if(DEBUG) fprintf(stderr,"SLOCK=%d proc=%d sending ticket (%d)\n",
286                                                        armci_me, proc, myturn);
287 
288            /* send ticket to requesting node */
289            /* GA_SEND_REPLY(tag, &myturn, sizeof(int), proc); */
290            return (myturn);
291       }
292 }
293 
294 
295 
296 /*\  Release mutex "id" held by proc
297  *   called from hrecv/AM handler AND application thread
298 \*/
armci_server_unlock_mutex(int mutex,int proc,int Ticket,msg_tag_t * ptag)299 int armci_server_unlock_mutex(int mutex, int proc, int Ticket, msg_tag_t* ptag)
300 {
301 #define NOBODY -1
302 int owner = armci_me;
303 int i, p=NOBODY, *mutex_ticket= glob_mutex[owner].turn + mutex;
304 int len=sizeof(int);
305 
306      if(DEBUG) fprintf(stderr,"SUNLOCK=%d node=%d mutex=%d ticket=%d\n",
307                        armci_me,proc,mutex,Ticket);
308 
309      Ticket++;
310      armci_copy(&Ticket, mutex_ticket, len);
311 
312      /* if mutex is free then nobody is reqistered in queue */
313      if(armci_mutex_free(mutex, proc))  return -1;
314 
315      /* search for the next process in queue waiting for this mutex */
316      for(i=0; i< armci_nproc; i++){
317         if(!blocked)break; /* not allocated yet - nobody is waiting */
318         if(DEBUG)fprintf(stderr,"SUNLOCK=%d node=%d list=(%d,%d)\n",
319                           armci_me, i, blocked[i].mutex, blocked[i].turn);
320         if((blocked[i].mutex == mutex) && (blocked[i].turn == Ticket)){
321            p = i;
322            break;
323         }
324      }
325 
326      /* send Ticket to a process waiting for mutex */
327      if(p != NOBODY){
328         if(p == armci_me)armci_die("server_unlock: cannot unlock self",0);
329         else {
330 
331           if(DEBUG)fprintf(stderr,"SUNLOCK=%d node=%d unlock ticket=%d go=%d\n",
332                                          armci_me, proc, Ticket, p);
333 
334           /*   GA_SEND_REPLY(blocked[p].tag, &Ticket, sizeof(int), p); */
335           *ptag = blocked[p].tag;
336           return p;
337 
338         }
339      }
340 
341      return -1; /* nobody is waiting */
342 }
343 
344 
345 
PARMCI_Lock(int mutex,int proc)346 void PARMCI_Lock(int mutex, int proc)
347 {
348 #if defined(SERVER_LOCK)
349 int direct;
350 #endif
351 
352         if(DEBUG)fprintf(stderr,"%d enter lock\n",armci_me);
353 
354         if(!num_mutexes) armci_die("armci_lock: create mutexes first",0);
355 
356         if(mutex > glob_mutex[proc].count)
357            armci_die2("armci_lock: mutex not allocated", mutex,
358                       glob_mutex[proc].count);
359 
360         if(armci_nproc == 1) return;
361 
362 #       if defined(SERVER_LOCK)
363            direct=SAMECLUSNODE(proc);
364            if(!direct)
365               armci_rem_lock(mutex,proc, glob_mutex[proc].tickets + mutex );
366            else
367 #       endif
368               armci_generic_lock(mutex,proc);
369 
370         if(DEBUG)fprintf(stderr,"%d leave lock\n",armci_me);
371 }
372 
373 
374 
PARMCI_Unlock(int mutex,int proc)375 void PARMCI_Unlock(int mutex, int proc)
376 {
377         if(DEBUG)fprintf(stderr,"%d enter unlock\n",armci_me);
378 
379         if(!num_mutexes) armci_die("armci_lock: create mutexes first",0);
380 
381         if(mutex > glob_mutex[proc].count)
382            armci_die2("armci_lock: mutex not allocated", mutex,
383                       glob_mutex[proc].count);
384 
385         if(armci_nproc == 1) return;
386 
387 #       if defined(SERVER_LOCK)
388            if(armci_nclus >1) {
389 	     if(proc != armci_me)
390                armci_rem_unlock(mutex, proc, glob_mutex[proc].tickets[mutex]);
391 	     else {
392 	       int ticket = glob_mutex[proc].tickets[mutex];
393 	       msg_tag_t tag;
394 	       int waiting;
395 
396 	       waiting = armci_server_unlock_mutex(mutex, proc, ticket, &tag);
397 	       if(waiting >-1)
398 		 armci_unlock_waiting_process(tag, waiting, ++ticket);
399 	     }
400 	   }
401 	   else
402 #       endif
403 	     armci_generic_unlock(mutex, proc);
404 
405         if(DEBUG)fprintf(stderr,"%d leave unlock\n",armci_me);
406 }
407