1 #if HAVE_CONFIG_H
2 # include "config.h"
3 #endif
4
5 /* $Id: mutex.c,v 1.24.10.1 2006-12-21 23:50:48 manoj Exp $ */
6 #include "armcip.h"
7 #include "copy.h"
8 #include "request.h"
9 #if HAVE_STDIO_H
10 # include <stdio.h>
11 #endif
12
/* Non-zero enables the trace output written to stderr by the routines below. */
#define DEBUG 0
/* Largest number of mutexes a single call to PARMCI_Create_mutexes accepts. */
#define MAX_LOCKS 32768
/* Base length of the busy-wait delay; the spin loops run SPINMAX * factor times. */
#define SPINMAX 1000

/* LAPI and GM builds take the server-assisted lock/unlock code paths. */
#if defined(LAPI) || defined(GM)
#define SERVER_LOCK
#endif
20
/* Accumulator for the busy-wait loop of the lock path; kept global so the
 * compiler cannot drop the delay loop. */
double _dummy_work_ = 0.;

#ifdef LAPI /* fix for a completion handler of a pending unlock that runs after destroy */
/* Number of mutexes this process requested in PARMCI_Create_mutexes. */
int mymutexcount;
/* Accumulator for the busy-wait loop in PARMCI_Destroy_mutexes. */
double _dummy_server_work_ = 0.;
#endif

/* Non-zero once mutexes exist: the summed count over all processes
 * (or 1 in a single-process run). */
static int num_mutexes = 0;
/* Saved ticket values, one slot per mutex of every process. */
static int *tickets;
27
28 typedef struct {
29 int mutex;
30 int turn;
31 msg_tag_t tag;
32 } waiting_list_t;
33
34
35 /* data structure to store info about blocked (waiting) process for mutex */
36 static waiting_list_t* blocked=(waiting_list_t*)0;
37
38
/* Description of the mutexes hosted by one process. */
typedef struct {
    int *token;   /* per-mutex counters incremented by each lock request to draw a ticket */
    int *turn;    /* per-mutex counters holding the ticket that may proceed; follows token[] */
    int *tickets; /* local slots where the caller keeps the ticket it obtained per mutex */
    int  count;   /* number of mutexes hosted by the process */
} mutex_entry_t;
45
46 void** mutex_mem_ar;
47 mutex_entry_t *glob_mutex;
48
49
PARMCI_Create_mutexes(int num)50 int PARMCI_Create_mutexes(int num)
51 {
52 int rc,p, totcount;
53 int *mutex_count;
54
55 if (num < 0 || num > MAX_LOCKS) return(FAIL);
56 if(num_mutexes) armci_die("mutexes already created",num_mutexes);
57
58 if(armci_nproc == 1){ num_mutexes=1; return(0); }
59
60 mutex_count = malloc(sizeof(int)*armci_nproc);
61 dassert(1, mutex_count);
62
63 /* local memory allocation for mutex arrays*/
64 mutex_mem_ar = (void*) malloc(armci_nproc*sizeof(void*));
65 if(!mutex_mem_ar) armci_die("PARMCI_Create_mutexes: malloc failed",0);
66 glob_mutex = (void*)malloc(armci_nproc*sizeof(mutex_entry_t));
67 if(!glob_mutex){
68 free(mutex_count);
69 free(mutex_mem_ar);
70 armci_die("PARMCI_Create_mutexes: malloc 2 failed",0);
71 }
72
73
74 /* bzero(mutex_count,armci_nproc*sizeof(int));*/
75 bzero((char*)mutex_count,sizeof(int)*armci_nproc);
76
77 /* find out how many mutexes everybody allocated */
78 mutex_count[armci_me]=num;
79 armci_msg_igop(mutex_count, armci_nproc, "+");
80 for(p=totcount=0; p< armci_nproc; p++)totcount+=mutex_count[p];
81
82 tickets = calloc(totcount,sizeof(int));
83 if(!tickets) {
84 free(glob_mutex);
85 free(mutex_mem_ar);
86 free(mutex_count);
87 return(FAIL2);
88 }
89
90 /* we need memory for token and turn - 2 ints */
91 rc = PARMCI_Malloc(mutex_mem_ar,2*num*sizeof(int));
92 if(rc){
93 free(glob_mutex);
94 free(mutex_mem_ar);
95 free(tickets);
96 free(mutex_count);
97 return(FAIL3);
98 }
99
100 if(num)bzero((char*)mutex_mem_ar[armci_me],2*num*sizeof(int));
101
102 /* setup global mutex array */
103 for(p=totcount=0; p< armci_nproc; p++){
104 glob_mutex[p].token = mutex_mem_ar[p];
105 glob_mutex[p].turn = glob_mutex[p].token + mutex_count[p];
106 glob_mutex[p].count = mutex_count[p];
107 glob_mutex[p].tickets = tickets + totcount;
108 totcount += mutex_count[p];
109 }
110
111 num_mutexes= totcount;
112 #ifdef LAPI
113 mymutexcount = num;
114 #endif
115 PARMCI_Barrier();
116
117 if(DEBUG)
118 fprintf(stderr,"%d created (%d,%d) mutexes\n",armci_me,num,totcount);
119
120 free(mutex_count);
121 return(0);
122 }
123
124
armci_serv_mutex_create()125 void armci_serv_mutex_create()
126 {
127 int mem = armci_nproc*sizeof(waiting_list_t);
128 blocked = (waiting_list_t*)malloc(mem);
129 if(!blocked) armci_die("armci server:error allocating mutex memory ",0);
130 }
131
132
armci_serv_mutex_close()133 void armci_serv_mutex_close()
134 {
135 if(blocked) free(blocked );
136 blocked = (waiting_list_t*)0;
137 }
138
139
PARMCI_Destroy_mutexes()140 int PARMCI_Destroy_mutexes()
141 {
142 #ifdef LAPI /*fix to if cmpl handler for a pending unlock runs after destroy*/
143 int proc, mutex, i,factor=0;
144 #endif
145 if(num_mutexes==0)armci_die("armci_destroy_mutexes: not created",0);
146 if(armci_nproc == 1) return(0);
147
148 armci_msg_barrier();
149
150 #ifdef LAPI /*fix to if cmpl handler for a pending unlock runs after destroy*/
151 for(proc=0;proc<armci_nproc;proc++){
152 for(mutex=0;mutex<glob_mutex[proc].count;mutex++){
153 _dummy_server_work_ = 0.; /* must be global to fool the compiler */
154 while(!armci_mutex_free(mutex,proc)){
155 for(i=0; i< SPINMAX *factor; i++) _dummy_server_work_ += 1.;
156 factor+=1;
157 }
158 }
159 }
160 #endif
161 num_mutexes=0;
162
163 # if defined(SERVER_LOCK)
164 armci_serv_mutex_close();
165 # endif
166
167 if(glob_mutex[armci_me].count)PARMCI_Free(glob_mutex[armci_me].token);
168
169 free(tickets);
170 free(glob_mutex);
171 free(mutex_mem_ar);
172
173 return(0);
174 }
175
176
register_in_mutex_queue(int id,int proc)177 static int register_in_mutex_queue(int id, int proc)
178 {
179 int *mutex_entry, ticket;
180
181 if(glob_mutex[proc].count < id)
182 armci_die2("armci:invalid mutex id",id, glob_mutex[proc].count);
183 mutex_entry = glob_mutex[proc].token + id;
184 PARMCI_Rmw(ARMCI_FETCH_AND_ADD, &ticket, mutex_entry, 1, proc);
185
186 return ticket;
187 }
188
189
190 /*\ check if mutex is available by comparing turn and token
191 * can only be called by a process on the same SMP node as proc
192 \*/
armci_mutex_free(int mutex,int proc)193 static int armci_mutex_free(int mutex, int proc)
194 {
195 volatile int *mutex_ticket=glob_mutex[proc].turn + mutex;
196 volatile int *turn = glob_mutex[proc].token +mutex;
197
198 /* here we will put code to check if other processes on the node
199 * are waiting for this mutex
200 * lockinfo_node[me].ticket = mutex_ticket;
201 * lockinfo_node[me].mutex = mutex;
202 */
203
204 if(*mutex_ticket == *turn) return 1;
205 else return 0;
206 }
207
208
209
armci_generic_lock(int mutex,int proc)210 static void armci_generic_lock(int mutex, int proc)
211 {
212 int i, myturn, factor=0, len=sizeof(int);
213 int *mutex_ticket, next_in_line;
214
215 mutex_ticket= glob_mutex[proc].turn + mutex;
216 myturn = register_in_mutex_queue(mutex, proc);
217
218 /* code to reduce cost of unlocking mutex on the same SMP node goes here
219 * lockinfo_node[me].ticket = mutex_ticket;
220 * lockinfo_node[me].mutex = mutex;
221 */
222
223 _dummy_work_ = 0.; /* must be global to fool the compiler */
224 do {
225
226 PARMCI_Get(mutex_ticket, &next_in_line, len, proc);
227 if(next_in_line > myturn)
228 armci_die2("armci: problem with tickets",myturn,next_in_line);
229
230 /* apply a linear backoff delay before retrying */
231 for(i=0; i< SPINMAX * factor; i++) _dummy_work_ += 1.;
232
233 factor += 1;
234
235 }while (myturn != next_in_line);
236
237 glob_mutex[proc].tickets[mutex] = myturn; /* save ticket value */
238 }
239
240
armci_generic_unlock(int mutex,int proc)241 static void armci_generic_unlock(int mutex, int proc)
242 {
243 int *mutex_ticket= glob_mutex[proc].turn + mutex;
244 int *newval = glob_mutex[proc].tickets +mutex;
245 int len=sizeof(int);
246
247 /* update ticket for next process requesting this mutex */
248 (*newval) ++;
249
250 /* write new ticket value stored previously in tickets */
251 PARMCI_Put(newval, mutex_ticket, len, proc);
252 MEM_FENCE;
253 }
254
255
256 /*\ Acquire mutex for "proc"
257 * -must be executed in hrecv/AM handler thread
258 * -application thread must use generic_lock routine
259 \*/
armci_server_lock_mutex(int mutex,int proc,msg_tag_t tag)260 int armci_server_lock_mutex(int mutex, int proc, msg_tag_t tag)
261 {
262 int myturn;
263 int *mutex_ticket, next_in_line, len=sizeof(int);
264 int owner = armci_me;
265
266
267 if(DEBUG)fprintf(stderr,"SLOCK=%d owner=%d p=%d m=%d\n",
268 armci_me,owner, proc,mutex);
269
270 mutex_ticket= glob_mutex[owner].turn + mutex;
271 myturn = register_in_mutex_queue(mutex, owner);
272
273 armci_copy(mutex_ticket, &next_in_line, len);
274
275 if(next_in_line > myturn)
276 armci_die2("armci-s: problem with tickets",myturn,next_in_line);
277
278 if(next_in_line != myturn){
279 if(!blocked)armci_serv_mutex_create();
280 blocked[proc].mutex = mutex;
281 blocked[proc].turn = myturn;
282 blocked[proc].tag = tag;
283 if(DEBUG) fprintf(stderr,"SLOCK=%d proc=%d blocked (%d,%d)\n",
284 armci_me, proc, next_in_line,myturn);
285 return -1;
286
287 } else {
288
289 if(DEBUG) fprintf(stderr,"SLOCK=%d proc=%d sending ticket (%d)\n",
290 armci_me, proc, myturn);
291
292 /* send ticket to requesting node */
293 /* GA_SEND_REPLY(tag, &myturn, sizeof(int), proc); */
294 return (myturn);
295 }
296 }
297
298
299
300 /*\ Release mutex "id" held by proc
301 * called from hrecv/AM handler AND application thread
302 \*/
armci_server_unlock_mutex(int mutex,int proc,int Ticket,msg_tag_t * ptag)303 int armci_server_unlock_mutex(int mutex, int proc, int Ticket, msg_tag_t* ptag)
304 {
305 #define NOBODY -1
306 int owner = armci_me;
307 int i, p=NOBODY, *mutex_ticket= glob_mutex[owner].turn + mutex;
308 int len=sizeof(int);
309
310 if(DEBUG) fprintf(stderr,"SUNLOCK=%d node=%d mutex=%d ticket=%d\n",
311 armci_me,proc,mutex,Ticket);
312
313 Ticket++;
314 armci_copy(&Ticket, mutex_ticket, len);
315
316 /* if mutex is free then nobody is reqistered in queue */
317 if(armci_mutex_free(mutex, proc)) return -1;
318
319 /* search for the next process in queue waiting for this mutex */
320 for(i=0; i< armci_nproc; i++){
321 if(!blocked)break; /* not allocated yet - nobody is waiting */
322 if(DEBUG)fprintf(stderr,"SUNLOCK=%d node=%d list=(%d,%d)\n",
323 armci_me, i, blocked[i].mutex, blocked[i].turn);
324 if((blocked[i].mutex == mutex) && (blocked[i].turn == Ticket)){
325 p = i;
326 break;
327 }
328 }
329
330 /* send Ticket to a process waiting for mutex */
331 if(p != NOBODY){
332 if(p == armci_me)armci_die("server_unlock: cannot unlock self",0);
333 else {
334
335 if(DEBUG)fprintf(stderr,"SUNLOCK=%d node=%d unlock ticket=%d go=%d\n",
336 armci_me, proc, Ticket, p);
337
338 /* GA_SEND_REPLY(blocked[p].tag, &Ticket, sizeof(int), p); */
339 *ptag = blocked[p].tag;
340 return p;
341
342 }
343 }
344
345 return -1; /* nobody is waiting */
346 }
347
348
349
PARMCI_Lock(int mutex,int proc)350 void PARMCI_Lock(int mutex, int proc)
351 {
352 #if defined(SERVER_LOCK)
353 int direct;
354 #endif
355
356 if(DEBUG)fprintf(stderr,"%d enter lock\n",armci_me);
357 if(armci_nproc == 1) return;
358
359 if(!num_mutexes) armci_die("armci_lock: create mutexes first",0);
360
361 if(mutex > glob_mutex[proc].count)
362 armci_die2("armci_lock: mutex not allocated", mutex,
363 glob_mutex[proc].count);
364
365 # if defined(SERVER_LOCK)
366 direct=SAMECLUSNODE(proc);
367 if(!direct)
368 armci_rem_lock(mutex,proc, glob_mutex[proc].tickets + mutex );
369 else
370 # endif
371 armci_generic_lock(mutex,proc);
372
373 if(DEBUG)fprintf(stderr,"%d leave lock\n",armci_me);
374 }
375
376
377
PARMCI_Unlock(int mutex,int proc)378 void PARMCI_Unlock(int mutex, int proc)
379 {
380 if(DEBUG)fprintf(stderr,"%d enter unlock\n",armci_me);
381 if(armci_nproc == 1) return;
382
383 if(!num_mutexes) armci_die("armci_lock: create mutexes first",0);
384
385 if(mutex > glob_mutex[proc].count)
386 armci_die2("armci_lock: mutex not allocated", mutex,
387 glob_mutex[proc].count);
388
389 # if defined(SERVER_LOCK)
390 if(armci_nclus >1) {
391 if(proc != armci_me)
392 armci_rem_unlock(mutex, proc, glob_mutex[proc].tickets[mutex]);
393 else {
394 int ticket = glob_mutex[proc].tickets[mutex];
395 msg_tag_t tag;
396 int waiting;
397
398 waiting = armci_server_unlock_mutex(mutex, proc, ticket, &tag);
399 if(waiting >-1)
400 armci_unlock_waiting_process(tag, waiting, ++ticket);
401 }
402 }
403 else
404 # endif
405 armci_generic_unlock(mutex, proc);
406
407 if(DEBUG)fprintf(stderr,"%d leave unlock\n",armci_me);
408 }
409