1 #if HAVE_CONFIG_H
2 #   include "config.h"
3 #endif
4 
5 /* $Id: mutex.c,v 1.24.10.1 2006-12-21 23:50:48 manoj Exp $ */
6 #include "armcip.h"
7 #include "copy.h"
8 #include "request.h"
9 #if HAVE_STDIO_H
10 #   include <stdio.h>
11 #endif
12 
13 #define DEBUG 0
14 #define MAX_LOCKS 32768
15 #define SPINMAX 1000
16 
17 #if defined(LAPI) || defined(GM)
18 #  define SERVER_LOCK
19 #endif
20 
21 double _dummy_work_=0.;
22 #ifdef LAPI /*fix to if cmpl handler for a pending unlock runs after destroy*/
23 int mymutexcount;
24 double _dummy_server_work_=0.;
25 #endif
26 static int num_mutexes=0, *tickets;
27 
28 typedef struct {
29        int mutex;
30        int turn;
31        msg_tag_t tag;
32 } waiting_list_t;
33 
34 
35 /* data structure to store info about blocked (waiting) process for mutex */
36 static waiting_list_t* blocked=(waiting_list_t*)0;
37 
38 
39 typedef struct {
40 int* token;
41 int* turn;
42 int* tickets;
43 int count;
44 } mutex_entry_t;
45 
46 void** mutex_mem_ar;
47 mutex_entry_t *glob_mutex;
48 
49 
PARMCI_Create_mutexes(int num)50 int PARMCI_Create_mutexes(int num)
51 {
52 int rc,p, totcount;
53  int *mutex_count;
54 
55 	if (num < 0 || num > MAX_LOCKS) return(FAIL);
56         if(num_mutexes) armci_die("mutexes already created",num_mutexes);
57 
58         if(armci_nproc == 1){  num_mutexes=1; return(0); }
59 
60 	mutex_count = malloc(sizeof(int)*armci_nproc);
61 	dassert(1, mutex_count);
62 
63         /* local memory allocation for mutex arrays*/
64         mutex_mem_ar = (void*) malloc(armci_nproc*sizeof(void*));
65         if(!mutex_mem_ar) armci_die("PARMCI_Create_mutexes: malloc failed",0);
66         glob_mutex = (void*)malloc(armci_nproc*sizeof(mutex_entry_t));
67         if(!glob_mutex){
68 	  free(mutex_count);
69            free(mutex_mem_ar);
70            armci_die("PARMCI_Create_mutexes: malloc 2 failed",0);
71         }
72 
73 
74 /*        bzero(mutex_count,armci_nproc*sizeof(int));*/
75         bzero((char*)mutex_count,sizeof(int)*armci_nproc);
76 
77         /* find out how many mutexes everybody allocated */
78         mutex_count[armci_me]=num;
79         armci_msg_igop(mutex_count, armci_nproc, "+");
80 	for(p=totcount=0; p< armci_nproc; p++)totcount+=mutex_count[p];
81 
82         tickets = calloc(totcount,sizeof(int));
83         if(!tickets) {
84            free(glob_mutex);
85            free(mutex_mem_ar);
86 	   free(mutex_count);
87            return(FAIL2);
88         }
89 
90         /* we need memory for token and turn - 2 ints */
91 	rc = PARMCI_Malloc(mutex_mem_ar,2*num*sizeof(int));
92         if(rc){
93            free(glob_mutex);
94            free(mutex_mem_ar);
95            free(tickets);
96 	   free(mutex_count);
97            return(FAIL3);
98         }
99 
100         if(num)bzero((char*)mutex_mem_ar[armci_me],2*num*sizeof(int));
101 
102         /* setup global mutex array */
103 	for(p=totcount=0; p< armci_nproc; p++){
104            glob_mutex[p].token   = mutex_mem_ar[p];
105            glob_mutex[p].turn    = glob_mutex[p].token + mutex_count[p];
106            glob_mutex[p].count   = mutex_count[p];
107            glob_mutex[p].tickets = tickets + totcount;
108            totcount += mutex_count[p];
109         }
110 
111         num_mutexes= totcount;
112 #ifdef LAPI
113         mymutexcount = num;
114 #endif
115         PARMCI_Barrier();
116 
117         if(DEBUG)
118            fprintf(stderr,"%d created (%d,%d) mutexes\n",armci_me,num,totcount);
119 
120 	free(mutex_count);
121         return(0);
122 }
123 
124 
armci_serv_mutex_create()125 void armci_serv_mutex_create()
126 {
127      int mem = armci_nproc*sizeof(waiting_list_t);
128      blocked = (waiting_list_t*)malloc(mem);
129      if(!blocked) armci_die("armci server:error allocating mutex memory ",0);
130 }
131 
132 
armci_serv_mutex_close()133 void armci_serv_mutex_close()
134 {
135      if(blocked) free(blocked );
136      blocked = (waiting_list_t*)0;
137 }
138 
139 
PARMCI_Destroy_mutexes()140 int PARMCI_Destroy_mutexes()
141 {
142 #ifdef LAPI /*fix to if cmpl handler for a pending unlock runs after destroy*/
143      int proc, mutex, i,factor=0;
144 #endif
145      if(num_mutexes==0)armci_die("armci_destroy_mutexes: not created",0);
146      if(armci_nproc == 1) return(0);
147 
148      armci_msg_barrier();
149 
150 #ifdef LAPI /*fix to if cmpl handler for a pending unlock runs after destroy*/
151      for(proc=0;proc<armci_nproc;proc++){
152           for(mutex=0;mutex<glob_mutex[proc].count;mutex++){
153             _dummy_server_work_ = 0.; /* must be global to fool the compiler */
154             while(!armci_mutex_free(mutex,proc)){
155               for(i=0; i<  SPINMAX *factor; i++) _dummy_server_work_ += 1.;
156 	      factor+=1;
157             }
158 	  }
159      }
160 #endif
161      num_mutexes=0;
162 
163 #    if defined(SERVER_LOCK)
164         armci_serv_mutex_close();
165 #    endif
166 
167      if(glob_mutex[armci_me].count)PARMCI_Free(glob_mutex[armci_me].token);
168 
169      free(tickets);
170      free(glob_mutex);
171      free(mutex_mem_ar);
172 
173      return(0);
174 }
175 
176 
register_in_mutex_queue(int id,int proc)177 static int register_in_mutex_queue(int id, int proc)
178 {
179 int *mutex_entry, ticket;
180 
181     if(glob_mutex[proc].count < id)
182        armci_die2("armci:invalid mutex id",id, glob_mutex[proc].count);
183     mutex_entry = glob_mutex[proc].token  + id;
184     PARMCI_Rmw(ARMCI_FETCH_AND_ADD, &ticket, mutex_entry, 1, proc);
185 
186     return ticket;
187 }
188 
189 
190 /*\ check if mutex is available by comparing turn and token
191  *  can only be called by a process on the same SMP node as proc
192 \*/
armci_mutex_free(int mutex,int proc)193 static int armci_mutex_free(int mutex, int proc)
194 {
195 volatile int *mutex_ticket=glob_mutex[proc].turn + mutex;
196 volatile int *turn = glob_mutex[proc].token  +mutex;
197 
198       /* here we will put code to check if other processes on the node
199        * are waiting for this mutex
200        * lockinfo_node[me].ticket = mutex_ticket;
201        * lockinfo_node[me].mutex  = mutex;
202        */
203 
204        if(*mutex_ticket == *turn) return 1;
205        else return 0;
206 }
207 
208 
209 
armci_generic_lock(int mutex,int proc)210 static void armci_generic_lock(int mutex, int proc)
211 {
212 int i, myturn, factor=0, len=sizeof(int);
213 int  *mutex_ticket, next_in_line;
214 
215       mutex_ticket= glob_mutex[proc].turn + mutex;
216       myturn = register_in_mutex_queue(mutex, proc);
217 
218       /* code to reduce cost of unlocking mutex on the same SMP node goes here
219        * lockinfo_node[me].ticket = mutex_ticket;
220        * lockinfo_node[me].mutex  = mutex;
221        */
222 
223       _dummy_work_ = 0.; /* must be global to fool the compiler */
224       do {
225 
226            PARMCI_Get(mutex_ticket, &next_in_line, len, proc);
227            if(next_in_line > myturn)
228               armci_die2("armci: problem with tickets",myturn,next_in_line);
229 
230            /* apply a linear backoff delay before retrying  */
231            for(i=0; i<  SPINMAX * factor; i++) _dummy_work_ += 1.;
232 
233            factor += 1;
234 
235       }while (myturn != next_in_line);
236 
237       glob_mutex[proc].tickets[mutex] = myturn; /* save ticket value */
238 }
239 
240 
armci_generic_unlock(int mutex,int proc)241 static void armci_generic_unlock(int mutex, int proc)
242 {
243 int *mutex_ticket= glob_mutex[proc].turn + mutex;
244 int *newval = glob_mutex[proc].tickets +mutex;
245 int len=sizeof(int);
246 
247        /* update ticket for next process requesting this mutex */
248        (*newval) ++;
249 
250        /* write new ticket value stored previously in tickets  */
251        PARMCI_Put(newval, mutex_ticket, len, proc);
252        MEM_FENCE;
253 }
254 
255 
256 /*\  Acquire mutex  for "proc"
257  *   -must be executed in hrecv/AM handler thread
258  *   -application thread must use generic_lock routine
259 \*/
armci_server_lock_mutex(int mutex,int proc,msg_tag_t tag)260 int armci_server_lock_mutex(int mutex, int proc, msg_tag_t tag)
261 {
262 int myturn;
263 int *mutex_ticket, next_in_line, len=sizeof(int);
264 int owner = armci_me;
265 
266 
267       if(DEBUG)fprintf(stderr,"SLOCK=%d owner=%d p=%d m=%d\n",
268                        armci_me,owner, proc,mutex);
269 
270       mutex_ticket= glob_mutex[owner].turn + mutex;
271       myturn = register_in_mutex_queue(mutex, owner);
272 
273       armci_copy(mutex_ticket, &next_in_line, len);
274 
275       if(next_in_line > myturn)
276          armci_die2("armci-s: problem with tickets",myturn,next_in_line);
277 
278       if(next_in_line != myturn){
279            if(!blocked)armci_serv_mutex_create();
280            blocked[proc].mutex = mutex;
281            blocked[proc].turn = myturn;
282            blocked[proc].tag  = tag;
283            if(DEBUG) fprintf(stderr,"SLOCK=%d proc=%d blocked (%d,%d)\n",
284                                      armci_me, proc, next_in_line,myturn);
285            return -1;
286 
287       } else {
288 
289            if(DEBUG) fprintf(stderr,"SLOCK=%d proc=%d sending ticket (%d)\n",
290                                                        armci_me, proc, myturn);
291 
292            /* send ticket to requesting node */
293            /* GA_SEND_REPLY(tag, &myturn, sizeof(int), proc); */
294            return (myturn);
295       }
296 }
297 
298 
299 
300 /*\  Release mutex "id" held by proc
301  *   called from hrecv/AM handler AND application thread
302 \*/
armci_server_unlock_mutex(int mutex,int proc,int Ticket,msg_tag_t * ptag)303 int armci_server_unlock_mutex(int mutex, int proc, int Ticket, msg_tag_t* ptag)
304 {
305 #define NOBODY -1
306 int owner = armci_me;
307 int i, p=NOBODY, *mutex_ticket= glob_mutex[owner].turn + mutex;
308 int len=sizeof(int);
309 
310      if(DEBUG) fprintf(stderr,"SUNLOCK=%d node=%d mutex=%d ticket=%d\n",
311                        armci_me,proc,mutex,Ticket);
312 
313      Ticket++;
314      armci_copy(&Ticket, mutex_ticket, len);
315 
316      /* if mutex is free then nobody is reqistered in queue */
317      if(armci_mutex_free(mutex, proc))  return -1;
318 
319      /* search for the next process in queue waiting for this mutex */
320      for(i=0; i< armci_nproc; i++){
321         if(!blocked)break; /* not allocated yet - nobody is waiting */
322         if(DEBUG)fprintf(stderr,"SUNLOCK=%d node=%d list=(%d,%d)\n",
323                           armci_me, i, blocked[i].mutex, blocked[i].turn);
324         if((blocked[i].mutex == mutex) && (blocked[i].turn == Ticket)){
325            p = i;
326            break;
327         }
328      }
329 
330      /* send Ticket to a process waiting for mutex */
331      if(p != NOBODY){
332         if(p == armci_me)armci_die("server_unlock: cannot unlock self",0);
333         else {
334 
335           if(DEBUG)fprintf(stderr,"SUNLOCK=%d node=%d unlock ticket=%d go=%d\n",
336                                          armci_me, proc, Ticket, p);
337 
338           /*   GA_SEND_REPLY(blocked[p].tag, &Ticket, sizeof(int), p); */
339           *ptag = blocked[p].tag;
340           return p;
341 
342         }
343      }
344 
345      return -1; /* nobody is waiting */
346 }
347 
348 
349 
PARMCI_Lock(int mutex,int proc)350 void PARMCI_Lock(int mutex, int proc)
351 {
352 #if defined(SERVER_LOCK)
353 int direct;
354 #endif
355 
356         if(DEBUG)fprintf(stderr,"%d enter lock\n",armci_me);
357         if(armci_nproc == 1) return;
358 
359         if(!num_mutexes) armci_die("armci_lock: create mutexes first",0);
360 
361         if(mutex > glob_mutex[proc].count)
362            armci_die2("armci_lock: mutex not allocated", mutex,
363                       glob_mutex[proc].count);
364 
365 #       if defined(SERVER_LOCK)
366            direct=SAMECLUSNODE(proc);
367            if(!direct)
368               armci_rem_lock(mutex,proc, glob_mutex[proc].tickets + mutex );
369            else
370 #       endif
371               armci_generic_lock(mutex,proc);
372 
373         if(DEBUG)fprintf(stderr,"%d leave lock\n",armci_me);
374 }
375 
376 
377 
PARMCI_Unlock(int mutex,int proc)378 void PARMCI_Unlock(int mutex, int proc)
379 {
380         if(DEBUG)fprintf(stderr,"%d enter unlock\n",armci_me);
381         if(armci_nproc == 1) return;
382 
383         if(!num_mutexes) armci_die("armci_lock: create mutexes first",0);
384 
385         if(mutex > glob_mutex[proc].count)
386            armci_die2("armci_lock: mutex not allocated", mutex,
387                       glob_mutex[proc].count);
388 
389 #       if defined(SERVER_LOCK)
390            if(armci_nclus >1) {
391 	     if(proc != armci_me)
392                armci_rem_unlock(mutex, proc, glob_mutex[proc].tickets[mutex]);
393 	     else {
394 	       int ticket = glob_mutex[proc].tickets[mutex];
395 	       msg_tag_t tag;
396 	       int waiting;
397 
398 	       waiting = armci_server_unlock_mutex(mutex, proc, ticket, &tag);
399 	       if(waiting >-1)
400 		 armci_unlock_waiting_process(tag, waiting, ++ticket);
401 	     }
402 	   }
403 	   else
404 #       endif
405 	     armci_generic_unlock(mutex, proc);
406 
407         if(DEBUG)fprintf(stderr,"%d leave unlock\n",armci_me);
408 }
409