1 #if HAVE_CONFIG_H
2 # include "config.h"
3 #endif
4
5 /* $Id: mutex.c,v 1.24.10.1 2006-12-21 23:50:48 manoj Exp $ */
6 #include "armcip.h"
7 #include "copy.h"
8 #include "request.h"
9 #include <stdio.h>
10
11 #define DEBUG 0
12 #define MAX_LOCKS 32768
13 #define SPINMAX 1000
14
15 #if defined(LAPI) || defined(GM)
16 # define SERVER_LOCK
17 #endif
18
19 double _dummy_work_=0.;
20 #ifdef LAPI /*fix to if cmpl handler for a pending unlock runs after destroy*/
21 int mymutexcount;
22 double _dummy_server_work_=0.;
23 #endif
24 static int num_mutexes=0, *tickets;
25
26 typedef struct {
27 int mutex;
28 int turn;
29 msg_tag_t tag;
30 } waiting_list_t;
31
32
33 /* data structure to store info about blocked (waiting) process for mutex */
34 static waiting_list_t* blocked=(waiting_list_t*)0;
35
36
37 typedef struct {
38 int* token;
39 int* turn;
40 int* tickets;
41 int count;
42 } mutex_entry_t;
43
44 void** mutex_mem_ar;
45 mutex_entry_t *glob_mutex;
46
47
PARMCI_Create_mutexes(int num)48 int PARMCI_Create_mutexes(int num)
49 {
50 int rc,p, totcount;
51 int *mutex_count = (int*)armci_internal_buffer;
52
53 if((sizeof(int)*armci_nproc) > armci_getbufsize()){
54 mutex_count = (double *)malloc(sizeof(int)*armci_nproc);
55 }
56 if (num < 0 || num > MAX_LOCKS) return(FAIL);
57 if(num_mutexes) armci_die("mutexes already created",num_mutexes);
58
59 if(armci_nproc == 1){ num_mutexes=1; return(0); }
60
61 /* local memory allocation for mutex arrays*/
62 mutex_mem_ar = (void*) malloc(armci_nproc*sizeof(void*));
63 if(!mutex_mem_ar) armci_die("PARMCI_Create_mutexes: malloc failed",0);
64 glob_mutex = (void*)malloc(armci_nproc*sizeof(mutex_entry_t));
65 if(!glob_mutex){
66 free(mutex_mem_ar);
67 armci_die("PARMCI_Create_mutexes: malloc 2 failed",0);
68 }
69
70
71 /* bzero(mutex_count,armci_nproc*sizeof(int));*/
72 bzero((char*)mutex_count,sizeof(int)*armci_nproc);
73
74 /* find out how many mutexes everybody allocated */
75 mutex_count[armci_me]=num;
76 armci_msg_igop(mutex_count, armci_nproc, "+");
77 for(p=totcount=0; p< armci_nproc; p++)totcount+=mutex_count[p];
78
79 tickets = calloc(totcount,sizeof(int));
80 if(!tickets) {
81 free(glob_mutex);
82 free(mutex_mem_ar);
83 return(FAIL2);
84 }
85
86 /* we need memory for token and turn - 2 ints */
87 rc = PARMCI_Malloc(mutex_mem_ar,2*num*sizeof(int));
88 if(rc){
89 free(glob_mutex);
90 free(mutex_mem_ar);
91 free(tickets);
92 return(FAIL3);
93 }
94
95 if(num)bzero((char*)mutex_mem_ar[armci_me],2*num*sizeof(int));
96
97 /* setup global mutex array */
98 for(p=totcount=0; p< armci_nproc; p++){
99 glob_mutex[p].token = mutex_mem_ar[p];
100 glob_mutex[p].turn = glob_mutex[p].token + mutex_count[p];
101 glob_mutex[p].count = mutex_count[p];
102 glob_mutex[p].tickets = tickets + totcount;
103 totcount += mutex_count[p];
104 }
105
106 num_mutexes= totcount;
107 #ifdef LAPI
108 mymutexcount = num;
109 #endif
110 PARMCI_Barrier();
111
112 if(DEBUG)
113 fprintf(stderr,"%d created (%d,%d) mutexes\n",armci_me,num,totcount);
114
115 return(0);
116 }
117
118
armci_serv_mutex_create()119 void armci_serv_mutex_create()
120 {
121 int mem = armci_nproc*sizeof(waiting_list_t);
122 blocked = (waiting_list_t*)malloc(mem);
123 if(!blocked) armci_die("armci server:error allocating mutex memory ",0);
124 }
125
126
armci_serv_mutex_close()127 void armci_serv_mutex_close()
128 {
129 if(blocked) free(blocked );
130 blocked = (waiting_list_t*)0;
131 }
132
133
PARMCI_Destroy_mutexes()134 int PARMCI_Destroy_mutexes()
135 {
136 #ifdef LAPI /*fix to if cmpl handler for a pending unlock runs after destroy*/
137 int proc, mutex, i,factor=0;
138 #endif
139 if(num_mutexes==0)armci_die("armci_destroy_mutexes: not created",0);
140 if(armci_nproc == 1) return(0);
141
142 armci_msg_barrier();
143
144 #ifdef LAPI /*fix to if cmpl handler for a pending unlock runs after destroy*/
145 for(proc=0;proc<armci_nproc;proc++){
146 for(mutex=0;mutex<glob_mutex[proc].count;mutex++){
147 _dummy_server_work_ = 0.; /* must be global to fool the compiler */
148 while(!armci_mutex_free(mutex,proc)){
149 for(i=0; i< SPINMAX *factor; i++) _dummy_server_work_ += 1.;
150 factor+=1;
151 }
152 }
153 }
154 #endif
155 num_mutexes=0;
156
157 # if defined(SERVER_LOCK)
158 armci_serv_mutex_close();
159 # endif
160
161 // RMO: Forcing PARMCI_Free to be a collective operation
162 // if(glob_mutex[armci_me].count)
163 PARMCI_Free(glob_mutex[armci_me].token);
164
165 free(tickets);
166 free(glob_mutex);
167 free(mutex_mem_ar);
168
169 return(0);
170 }
171
172
register_in_mutex_queue(int id,int proc)173 static int register_in_mutex_queue(int id, int proc)
174 {
175 int *mutex_entry, ticket;
176
177 if(glob_mutex[proc].count < id)
178 armci_die2("armci:invalid mutex id",id, glob_mutex[proc].count);
179 mutex_entry = glob_mutex[proc].token + id;
180 PARMCI_Rmw(ARMCI_FETCH_AND_ADD, &ticket, mutex_entry, 1, proc);
181
182 return ticket;
183 }
184
185
186 /*\ check if mutex is available by comparing turn and token
187 * can only be called by a process on the same SMP node as proc
188 \*/
armci_mutex_free(int mutex,int proc)189 static int armci_mutex_free(int mutex, int proc)
190 {
191 volatile int *mutex_ticket=glob_mutex[proc].turn + mutex;
192 volatile int *turn = glob_mutex[proc].token +mutex;
193
194 /* here we will put code to check if other processes on the node
195 * are waiting for this mutex
196 * lockinfo_node[me].ticket = mutex_ticket;
197 * lockinfo_node[me].mutex = mutex;
198 */
199
200 if(*mutex_ticket == *turn) return 1;
201 else return 0;
202 }
203
204
205
armci_generic_lock(int mutex,int proc)206 static void armci_generic_lock(int mutex, int proc)
207 {
208 int i, myturn, factor=0, len=sizeof(int);
209 int *mutex_ticket, next_in_line;
210
211 mutex_ticket= glob_mutex[proc].turn + mutex;
212 myturn = register_in_mutex_queue(mutex, proc);
213
214 /* code to reduce cost of unlocking mutex on the same SMP node goes here
215 * lockinfo_node[me].ticket = mutex_ticket;
216 * lockinfo_node[me].mutex = mutex;
217 */
218
219 _dummy_work_ = 0.; /* must be global to fool the compiler */
220 do {
221
222 PARMCI_Get(mutex_ticket, &next_in_line, len, proc);
223 if(next_in_line > myturn)
224 armci_die2("armci: problem with tickets",myturn,next_in_line);
225
226 /* apply a linear backoff delay before retrying */
227 for(i=0; i< SPINMAX * factor; i++) _dummy_work_ += 1.;
228
229 factor += 1;
230
231 }while (myturn != next_in_line);
232
233 glob_mutex[proc].tickets[mutex] = myturn; /* save ticket value */
234 }
235
236
armci_generic_unlock(int mutex,int proc)237 static void armci_generic_unlock(int mutex, int proc)
238 {
239 int *mutex_ticket= glob_mutex[proc].turn + mutex;
240 int *newval = glob_mutex[proc].tickets +mutex;
241 int len=sizeof(int);
242
243 /* update ticket for next process requesting this mutex */
244 (*newval) ++;
245
246 /* write new ticket value stored previously in tickets */
247 PARMCI_Put(newval, mutex_ticket, len, proc);
248 MEM_FENCE;
249 }
250
251
252 /*\ Acquire mutex for "proc"
253 * -must be executed in hrecv/AM handler thread
254 * -application thread must use generic_lock routine
255 \*/
armci_server_lock_mutex(int mutex,int proc,msg_tag_t tag)256 int armci_server_lock_mutex(int mutex, int proc, msg_tag_t tag)
257 {
258 int myturn;
259 int *mutex_ticket, next_in_line, len=sizeof(int);
260 int owner = armci_me;
261
262
263 if(DEBUG)fprintf(stderr,"SLOCK=%d owner=%d p=%d m=%d\n",
264 armci_me,owner, proc,mutex);
265
266 mutex_ticket= glob_mutex[owner].turn + mutex;
267 myturn = register_in_mutex_queue(mutex, owner);
268
269 armci_copy(mutex_ticket, &next_in_line, len);
270
271 if(next_in_line > myturn)
272 armci_die2("armci-s: problem with tickets",myturn,next_in_line);
273
274 if(next_in_line != myturn){
275 if(!blocked)armci_serv_mutex_create();
276 blocked[proc].mutex = mutex;
277 blocked[proc].turn = myturn;
278 blocked[proc].tag = tag;
279 if(DEBUG) fprintf(stderr,"SLOCK=%d proc=%d blocked (%d,%d)\n",
280 armci_me, proc, next_in_line,myturn);
281 return -1;
282
283 } else {
284
285 if(DEBUG) fprintf(stderr,"SLOCK=%d proc=%d sending ticket (%d)\n",
286 armci_me, proc, myturn);
287
288 /* send ticket to requesting node */
289 /* GA_SEND_REPLY(tag, &myturn, sizeof(int), proc); */
290 return (myturn);
291 }
292 }
293
294
295
296 /*\ Release mutex "id" held by proc
297 * called from hrecv/AM handler AND application thread
298 \*/
armci_server_unlock_mutex(int mutex,int proc,int Ticket,msg_tag_t * ptag)299 int armci_server_unlock_mutex(int mutex, int proc, int Ticket, msg_tag_t* ptag)
300 {
301 #define NOBODY -1
302 int owner = armci_me;
303 int i, p=NOBODY, *mutex_ticket= glob_mutex[owner].turn + mutex;
304 int len=sizeof(int);
305
306 if(DEBUG) fprintf(stderr,"SUNLOCK=%d node=%d mutex=%d ticket=%d\n",
307 armci_me,proc,mutex,Ticket);
308
309 Ticket++;
310 armci_copy(&Ticket, mutex_ticket, len);
311
312 /* if mutex is free then nobody is reqistered in queue */
313 if(armci_mutex_free(mutex, proc)) return -1;
314
315 /* search for the next process in queue waiting for this mutex */
316 for(i=0; i< armci_nproc; i++){
317 if(!blocked)break; /* not allocated yet - nobody is waiting */
318 if(DEBUG)fprintf(stderr,"SUNLOCK=%d node=%d list=(%d,%d)\n",
319 armci_me, i, blocked[i].mutex, blocked[i].turn);
320 if((blocked[i].mutex == mutex) && (blocked[i].turn == Ticket)){
321 p = i;
322 break;
323 }
324 }
325
326 /* send Ticket to a process waiting for mutex */
327 if(p != NOBODY){
328 if(p == armci_me)armci_die("server_unlock: cannot unlock self",0);
329 else {
330
331 if(DEBUG)fprintf(stderr,"SUNLOCK=%d node=%d unlock ticket=%d go=%d\n",
332 armci_me, proc, Ticket, p);
333
334 /* GA_SEND_REPLY(blocked[p].tag, &Ticket, sizeof(int), p); */
335 *ptag = blocked[p].tag;
336 return p;
337
338 }
339 }
340
341 return -1; /* nobody is waiting */
342 }
343
344
345
PARMCI_Lock(int mutex,int proc)346 void PARMCI_Lock(int mutex, int proc)
347 {
348 #if defined(SERVER_LOCK)
349 int direct;
350 #endif
351
352 if(DEBUG)fprintf(stderr,"%d enter lock\n",armci_me);
353
354 if(!num_mutexes) armci_die("armci_lock: create mutexes first",0);
355
356 if(mutex > glob_mutex[proc].count)
357 armci_die2("armci_lock: mutex not allocated", mutex,
358 glob_mutex[proc].count);
359
360 if(armci_nproc == 1) return;
361
362 # if defined(SERVER_LOCK)
363 direct=SAMECLUSNODE(proc);
364 if(!direct)
365 armci_rem_lock(mutex,proc, glob_mutex[proc].tickets + mutex );
366 else
367 # endif
368 armci_generic_lock(mutex,proc);
369
370 if(DEBUG)fprintf(stderr,"%d leave lock\n",armci_me);
371 }
372
373
374
PARMCI_Unlock(int mutex,int proc)375 void PARMCI_Unlock(int mutex, int proc)
376 {
377 if(DEBUG)fprintf(stderr,"%d enter unlock\n",armci_me);
378
379 if(!num_mutexes) armci_die("armci_lock: create mutexes first",0);
380
381 if(mutex > glob_mutex[proc].count)
382 armci_die2("armci_lock: mutex not allocated", mutex,
383 glob_mutex[proc].count);
384
385 if(armci_nproc == 1) return;
386
387 # if defined(SERVER_LOCK)
388 if(armci_nclus >1) {
389 if(proc != armci_me)
390 armci_rem_unlock(mutex, proc, glob_mutex[proc].tickets[mutex]);
391 else {
392 int ticket = glob_mutex[proc].tickets[mutex];
393 msg_tag_t tag;
394 int waiting;
395
396 waiting = armci_server_unlock_mutex(mutex, proc, ticket, &tag);
397 if(waiting >-1)
398 armci_unlock_waiting_process(tag, waiting, ++ticket);
399 }
400 }
401 else
402 # endif
403 armci_generic_unlock(mutex, proc);
404
405 if(DEBUG)fprintf(stderr,"%d leave unlock\n",armci_me);
406 }
407