1 /*
2 *
3 * This file is part of MUMPS 4.10.0, built on Tue May 10 12:56:32 UTC 2011
4 *
5 *
6 * This version of MUMPS is provided to you free of charge. It is public
7 * domain, based on public domain software developed during the Esprit IV
8 * European project PARASOL (1996-1999). Since this first public domain
9 * version in 1999, research and developments have been supported by the
10 * following institutions: CERFACS, CNRS, ENS Lyon, INPT(ENSEEIHT)-IRIT,
11 * INRIA, and University of Bordeaux.
12 *
13 * The MUMPS team at the moment of releasing this version includes
14 * Patrick Amestoy, Maurice Bremond, Alfredo Buttari, Abdou Guermouche,
15 * Guillaume Joslin, Jean-Yves L'Excellent, Francois-Henry Rouet, Bora
16 * Ucar and Clement Weisbecker.
17 *
18 * We are also grateful to Emmanuel Agullo, Caroline Bousquet, Indranil
19 * Chowdhury, Philippe Combes, Christophe Daniel, Iain Duff, Vincent Espirat,
20 * Aurelia Fevre, Jacko Koster, Stephane Pralet, Chiara Puglisi, Gregoire
21 * Richard, Tzvetomila Slavova, Miroslav Tuma and Christophe Voemel who
22 * have been contributing to this project.
23 *
24 * Up-to-date copies of the MUMPS package can be obtained
25 * from the Web pages:
26 * http://mumps.enseeiht.fr/ or http://graal.ens-lyon.fr/MUMPS
27 *
28 *
29 * THIS MATERIAL IS PROVIDED AS IS, WITH ABSOLUTELY NO WARRANTY
30 * EXPRESSED OR IMPLIED. ANY USE IS AT YOUR OWN RISK.
31 *
32 *
33 * User documentation of any code that uses this software can
34 * include this complete notice. You can acknowledge (using
35 * references [1] and [2]) the contribution of this package
36 * in any scientific publication dependent upon the use of the
37 * package. You shall use reasonable endeavours to notify
38 * the authors of the package of this publication.
39 *
40 * [1] P. R. Amestoy, I. S. Duff, J. Koster and J.-Y. L'Excellent,
41 * A fully asynchronous multifrontal solver using distributed dynamic
42 * scheduling, SIAM Journal of Matrix Analysis and Applications,
43 * Vol 23, No 1, pp 15-41 (2001).
44 *
45 * [2] P. R. Amestoy and A. Guermouche and J.-Y. L'Excellent and
46 * S. Pralet, Hybrid scheduling for the parallel solution of linear
47 * systems. Parallel Computing Vol 32 (2), pp 136-156 (2006).
48 *
49 */
50 #include "mumps_io_basic.h"
51 #include "mumps_io_err.h"
52 #include "mumps_io_thread.h"
53 #if ! defined(MUMPS_WIN32) && ! defined(WITHOUT_PTHREAD)
/* Exported global variables */
/* io_flag_stop: set by the main thread to request I/O-thread shutdown
   (non-semaphore mode); current_req_num: next request id to allocate. */
int io_flag_stop,current_req_num;
/* Thread identifiers of the I/O slave thread and of the main thread. */
pthread_t io_thread,main_thread;
/* Protects the active/finished request queues below. */
pthread_mutex_t io_mutex;
/* Condition variables emulating counting semaphores (with_sem==2):
   work available, free slots in the finished queue, free slots in the
   active queue, and the stop order. */
pthread_cond_t cond_io,cond_nb_free_finished_requests,cond_nb_free_active_requests,cond_stop;
/* Protects the int_sem_* counters used with the conditions above. */
pthread_mutex_t io_mutex_cond;
/* Integer counters backing the emulated semaphores. */
int int_sem_io,int_sem_nb_free_finished_requests,int_sem_nb_free_active_requests,int_sem_stop;
/* Synchronization strategy selector (2 = condition-variable semaphores). */
int with_sem;
/* Circular queue of pending I/O requests (MAX_IO slots). */
struct request_io *io_queue;
/* Bounds and population count of the active-request circular queue. */
int first_active,last_active,nb_active;
/* Circular queue (MAX_IO*2 slots) of completed requests, plus the id of
   the oldest request that may still be outstanding. */
int *finished_requests_inode,*finished_requests_id,first_finished_requests,
  last_finished_requests,nb_finished_requests,smallest_request_id;
/* Nonzero while the current thread already holds io_mutex (re-entry guard). */
int mumps_owns_mutex;
int test_request_called_from_mumps;
/* Other global variables */
/* Cumulative time (seconds) the I/O thread spent waiting for work. */
double inactive_time_io_thread;
/* Becomes 1 after the first wait has been timed. */
int time_flag_io_thread;
/* Timestamp taken at initialization, used for the first idle-time sample. */
struct timeval origin_time_io_thread;
72 /**
73 * Main function of the io thread when semaphores are used.
74 */
mumps_async_thread_function_with_sem(void * arg)75 void* mumps_async_thread_function_with_sem (void* arg){
76 struct request_io *current_io_request;
77 int ierr,_sem_stop;
78 struct timeval start_time,end_time;
79 int ret_code;
80 for (;;){
81 gettimeofday(&start_time,NULL);
82 if(with_sem==2){
83 mumps_wait_sem(&int_sem_io,&cond_io);
84 }
85 /* sem_wait(&sem_io); */
86 gettimeofday(&end_time,NULL);
87 if(time_flag_io_thread){
88 inactive_time_io_thread=inactive_time_io_thread+((double)end_time.tv_sec+((double)end_time.tv_usec/1000000))-((double)start_time.tv_sec+((double)start_time.tv_usec/1000000));
89 }else{
90 inactive_time_io_thread=((double)end_time.tv_sec+((double)end_time.tv_usec/1000000))-((double)origin_time_io_thread.tv_sec+((double)origin_time_io_thread.tv_usec/1000000));
91 }
92 if(!time_flag_io_thread){
93 time_flag_io_thread=1;
94 }
95 /* Check if the main thread ordered to stop this slave thread */
96 /* sem_getvalue(&sem_stop,&_sem_stop); */
97 if(with_sem==2){
98 mumps_get_sem(&int_sem_stop,&_sem_stop);
99 }
100 if(_sem_stop==IO_FLAG_STOP){
101 /* The thread must stop */
102 break; /* Breaks the while loop. */
103 }
104 current_io_request=&io_queue[first_active];
105 switch(current_io_request->io_type)
106 {
107 case IO_WRITE:
108 ret_code=mumps_io_do_write_block(current_io_request->addr,
109 current_io_request->size,
110 &(current_io_request->file_type),
111 current_io_request->vaddr,
112 &ierr);
113 if(ret_code<0){
114 goto end;
115 }
116 break;
117 case IO_READ:
118 ret_code=mumps_io_do_read_block(current_io_request->addr,
119 current_io_request->size,
120 &(current_io_request->file_type),
121 current_io_request->vaddr,
122 &ierr);
123 if(ret_code<0){
124 goto end;
125 }
126 break;
127 default:
128 printf("Error : Mumps_IO : Operation %d is neither READ nor WRITE\n",current_io_request->io_type);
129 exit (-3);
130 }
131 /* Notify that the IO was performed */
132 /* Wait that finished_requests queue could register
133 the notification */
134 if(with_sem==2){
135 mumps_wait_sem(&int_sem_nb_free_finished_requests,&cond_nb_free_finished_requests);
136 }
137 pthread_mutex_lock(&io_mutex);
138 /* Updates active queue bounds */
139 /* Register the notification in finished_requests queue
140 and updates its bounds. */
141 finished_requests_id[last_finished_requests]=current_io_request->req_num;
142 finished_requests_inode[last_finished_requests]=current_io_request->inode;
143 last_finished_requests=(last_finished_requests+1)%(MAX_FINISH_REQ); /* ??? */
144 nb_finished_requests++;
145 /* Realeases the lock : ***UNLOCK*** */
146 nb_active--;
147 if(first_active<MAX_IO-1){
148 first_active++;
149 }
150 else{
151 first_active=0;
152 }
153 if(with_sem==2){
154 mumps_post_sem(&(current_io_request->int_local_cond),&(current_io_request->local_cond));
155 }
156 pthread_mutex_unlock(&io_mutex);
157 /* Finally increases the number of free active requests.*/
158 /* sem_post(&sem_nb_free_active_requests); */
159 mumps_post_sem(&int_sem_nb_free_active_requests,&cond_nb_free_active_requests);
160 }
161 end:
162 /* The main thread ordered the end of the IO thread (it changed sem_stop).
163 We exit. */
164 pthread_exit(NULL);
165 /* Not reached */
166 return NULL;
167 }
mumps_test_request_th(int * request_id,int * flag)168 int mumps_test_request_th(int* request_id,int *flag){
169 /* Tests if the request "request_id" has finished. It sets the flag */
170 /* argument to 1 if the request has finished (0 otherwise) */
171 int request_pos;
172 int i;
173 i=mumps_check_error_th();
174 if(i!=0){
175 return i;
176 }
177 pthread_mutex_lock(&io_mutex);
178 /* printf("entering test !!! \n"); */
179 if(*request_id < smallest_request_id){
180 *flag=1;
181 /* exit (-2); */
182 }else{
183 if(nb_finished_requests==0){
184 *flag=0;
185 }else{
186 request_pos=(first_finished_requests+nb_finished_requests-1)%(MAX_IO*2);
187 if(*request_id > finished_requests_id[request_pos]){
188 /*the request has not been treated yet since it is not in
189 the list of treated requests*/
190 i=0;
191 /*this loop is only for checking (no special treatment is done*/
192 while(i<nb_active){
193 request_pos=(first_active+i)%(MAX_IO);
194 if(io_queue[request_pos].req_num==*request_id){
195 break;
196 }
197 i++;
198 }
199 if(i==nb_active){
200 return mumps_io_error(-91,"Internal error in OOC Management layer (mumps_test_request_th (1))\n");
201 }
202 *flag=0;
203 }else{
204 /*the request has been treated yet since it is the list of
205 treated requests*/
206 i=0;
207 while(i<nb_finished_requests){
208 request_pos=(first_finished_requests+i)%(MAX_IO*2);
209 if(finished_requests_id[request_pos]==*request_id){
210 break;
211 }
212 i++;
213 }
214 if(i==nb_finished_requests){
215 return mumps_io_error(-91,"Internal error in OOC Management layer (mumps_test_request_th (2))\n");
216 }
217 *flag=1;
218 }
219 }
220 }
221 /* printf("je vais essayer de nettoyer %d\n",nb_finished_requests);
222 pthread_mutex_unlock(&io_mutex);
223 sleep (10); */
224 mumps_owns_mutex=1;
225 /* printf("ici nb_finished_requests=%d\n",nb_finished_requests);*/
226 mumps_clean_finished_queue_th();
227 mumps_owns_mutex=0;
228 pthread_mutex_unlock(&io_mutex);
229 return 0;
230 }
mumps_wait_req_sem_th(int * request_id)231 int mumps_wait_req_sem_th(int *request_id){
232 int i,j;
233 j=first_active;
234 for(i=0;i<nb_active;i++){
235 if(io_queue[j].req_num==*request_id) break;
236 j=(j+1)%MAX_IO;
237 }
238 if(i<nb_active){
239 mumps_wait_sem(&(io_queue[j].int_local_cond),&(io_queue[j].local_cond));
240 }
241 return 0;
242 }
mumps_wait_request_th(int * request_id)243 int mumps_wait_request_th(int *request_id){
244 /* waits for the termination of the request "request_id" */
245 int flag=0,ierr;
246 if(with_sem!=2){
247 while(!flag){
248 ierr=mumps_test_request_th(request_id,&flag);
249 if(ierr!=0)return ierr;
250 }
251 }else{
252 ierr=mumps_test_request_th(request_id,&flag);
253 if(ierr!=0)return ierr;
254 if(!flag){
255 mumps_wait_req_sem_th(request_id);
256 ierr=mumps_test_request_th(request_id,&flag);
257 if(ierr!=0)return ierr;
258 }
259 }
260 return 0;
261 }
mumps_is_there_finished_request_th(int * flag)262 int mumps_is_there_finished_request_th(int* flag){
263 if(!mumps_owns_mutex) pthread_mutex_lock(&io_mutex);
264 if(nb_finished_requests==0){
265 *flag=0;
266 }else{
267 *flag=1;
268 /* printf("finished : %d\n",nb_finished_requests); */
269 }
270 if(!mumps_owns_mutex) pthread_mutex_unlock(&io_mutex);
271 return 0;
272 }
mumps_clean_finished_queue_th()273 int mumps_clean_finished_queue_th(){
274 /* Cleans the finished request queue. On exit, the queue is empty.*/
275 int local_flag;
276 int cur_req;
277 int loc_owned_mutex=0,ierr;
278 if(!mumps_owns_mutex){
279 pthread_mutex_lock(&io_mutex);
280 mumps_owns_mutex=1;
281 loc_owned_mutex=1;
282 }
283 /* this block of code is designed for avoiding deadlocks between
284 the two threads*/
285 mumps_is_there_finished_request_th(&local_flag);
286 while(local_flag){
287 ierr=mumps_clean_request_th(&cur_req);
288 if(ierr!=0){
289 return ierr;
290 }
291 mumps_is_there_finished_request_th(&local_flag);
292 }
293 if((!mumps_owns_mutex)||(loc_owned_mutex)){
294 pthread_mutex_unlock(&io_mutex);
295 mumps_owns_mutex=0;
296 }
297 return 0;
298 }
mumps_clean_request_th(int * request_id)299 int mumps_clean_request_th(int* request_id){
300 int ierr;
301 ierr=mumps_check_error_th();
302 if(ierr!=0){
303 return ierr;
304 }
305 if(!mumps_owns_mutex)pthread_mutex_lock(&io_mutex);
306 *request_id=finished_requests_id[first_finished_requests];
307 if(smallest_request_id!=finished_requests_id[first_finished_requests]){
308 return mumps_io_error(-91,"Internal error in OOC Management layer (mumps_clean_request_th)\n");
309 }
310 finished_requests_id[first_finished_requests]=-9999;
311 first_finished_requests=(first_finished_requests+1)%(MAX_FINISH_REQ);
312 nb_finished_requests--;
313 /*we treat the io requests in their arrival order => we just have to
314 increase smallest_request_id*/
315 smallest_request_id++;
316 if(!mumps_owns_mutex) pthread_mutex_unlock(&io_mutex);
317 if(with_sem) {
318 if(with_sem==2){
319 mumps_post_sem(&int_sem_nb_free_finished_requests,&cond_nb_free_finished_requests);
320 }
321 }
322 return 0;
323 }
mumps_low_level_init_ooc_c_th(int * async,int * ierr)324 int mumps_low_level_init_ooc_c_th(int* async, int* ierr){
325 int i, ret_code;
326 char buf[64];
327 /* Computes the number of files needed. Uses ceil value. */
328 *ierr=0;
329 current_req_num=0;
330 with_sem=2;
331 first_active=0;
332 last_active=0;
333 nb_active=0;
334 first_finished_requests=0;
335 last_finished_requests=0;
336 nb_finished_requests=0;
337 smallest_request_id=0;
338 mumps_owns_mutex=0;
339 inactive_time_io_thread=0;
340 time_flag_io_thread=0;
341 gettimeofday(&origin_time_io_thread,NULL);
342 /* mumps_io_flag_async=*async; */
343 if(*async!=IO_ASYNC_TH){
344 *ierr = -91;
345 sprintf(buf,"Internal error: mumps_low_level_init_ooc_c_th should not to be called with strat_IO=%d\n",*async);
346 return mumps_io_error(*ierr,buf);
347 }
348 if(*async){
349 pthread_mutex_init(&io_mutex,NULL);
350 mumps_io_init_err_lock();
351 #ifdef WITH_PFUNC
352 mumps_io_init_pointers_lock();
353 #endif
354 io_queue=(struct request_io *)malloc(MAX_IO*sizeof(struct request_io));
355 if(with_sem==2){
356 for(i=0;i<MAX_IO;i++){
357 pthread_cond_init(&(io_queue[i].local_cond),NULL);
358 io_queue[i].int_local_cond=0;
359 }
360 }
361 finished_requests_id=(int *)malloc(MAX_IO*2*sizeof(int));
362 finished_requests_inode=(int *)malloc(MAX_IO*2*sizeof(int));
363 for(i=0;i<MAX_IO*2;i++){
364 finished_requests_id[i]=-9999;
365 finished_requests_inode[i]=-9999;
366 }
367 if(with_sem){
368 switch(with_sem){
369 case 2:
370 int_sem_io=0;
371 int_sem_stop=0;
372 int_sem_nb_free_finished_requests=MAX_FINISH_REQ;
373 int_sem_nb_free_active_requests=MAX_IO;
374 pthread_cond_init(&cond_stop,NULL);
375 pthread_cond_init(&cond_io,NULL);
376 pthread_cond_init(&cond_nb_free_active_requests,NULL);
377 pthread_cond_init(&cond_nb_free_finished_requests,NULL);
378 pthread_mutex_init(&io_mutex_cond,NULL);
379 break;
380 default:
381 *ierr = -92;
382 sprintf(buf,"Internal error: mumps_low_level_init_ooc_c_th should not to be called with strat_IO=%d\n",*async);
383 return mumps_io_error(*ierr,buf);
384 }
385 ret_code=pthread_create(&io_thread,NULL,mumps_async_thread_function_with_sem,NULL);
386 }
387 if(ret_code!=0){
388 errno = ret_code;
389 return mumps_io_sys_error(-92,"Unable to create I/O thread");
390 }
391 main_thread=pthread_self();
392 }
393 return 0;
394 }
mumps_async_write_th(const int * strat_IO,void * address_block,long long block_size,int * inode,int * request_arg,int * type,long long vaddr,int * ierr)395 int mumps_async_write_th(const int * strat_IO,
396 void * address_block,
397 long long block_size,
398 int * inode,
399 int * request_arg,
400 int * type,
401 long long vaddr,
402 int * ierr){
403 int cur_req;
404 *ierr=mumps_check_error_th();
405 if(*ierr!=0){
406 return *ierr;
407 }
408 if(with_sem){
409 mumps_clean_finished_queue_th();
410 if(with_sem==2){
411 mumps_wait_sem(&int_sem_nb_free_active_requests,&cond_nb_free_active_requests);
412 }
413 /* sem_wait(&sem_nb_free_active_requests); */
414 pthread_mutex_lock(&io_mutex);
415 }
416 if(nb_active<=MAX_IO){
417 if(nb_active==0){
418 first_active=last_active;
419 }
420 else{
421 last_active=(last_active+1)%MAX_IO;
422 }
423 cur_req=last_active;
424 nb_active++;
425 io_queue[cur_req].inode=*inode;
426 io_queue[cur_req].req_num=current_req_num;
427 io_queue[cur_req].addr=address_block;
428 io_queue[cur_req].size=block_size;
429 io_queue[cur_req].vaddr=vaddr;
430 io_queue[cur_req].io_type=0;
431 io_queue[cur_req].file_type=*type;
432 if(with_sem==2){
433 io_queue[cur_req].int_local_cond=0;
434 }
435 *request_arg=current_req_num;
436 current_req_num++;
437 }else{
438 *ierr = -91;
439 return mumps_io_error(*ierr,"Internal error in OOC Management layer (mumps_async_write_th)\n");
440 /* exit(-3);*/
441 }
442 pthread_mutex_unlock(&io_mutex);
443 if(with_sem){
444 /* sem_post(&sem_io); */
445 if(with_sem==2){
446 mumps_post_sem(&int_sem_io,&cond_io);
447 }
448 }
449 return 0;
450 }
mumps_async_read_th(const int * strat_IO,void * address_block,long long block_size,int * inode,int * request_arg,int * type,long long vaddr,int * ierr)451 int mumps_async_read_th(const int * strat_IO,
452 void * address_block,
453 long long block_size,
454 int * inode,
455 int * request_arg,
456 int * type,
457 long long vaddr,
458 int * ierr){
459 int cur_req;
460 *ierr=mumps_check_error_th();
461 if(*ierr!=0){
462 return *ierr;
463 }
464 if(with_sem){
465 mumps_clean_finished_queue_th();
466 /* end of the block*/
467 if(with_sem==2){
468 mumps_wait_sem(&int_sem_nb_free_active_requests,&cond_nb_free_active_requests);
469 }
470 /* sem_wait(&sem_nb_free_active_requests); */
471 pthread_mutex_lock(&io_mutex);
472 }
473 if(nb_active<MAX_IO){
474 if(nb_active==0){
475 first_active=last_active;
476 }else{
477 last_active=(last_active+1)%MAX_IO;
478 /* if(last_active<MAX_IO-1){
479 cur_req=last_active+1;
480 last_active++;
481 }else{
482 cur_req=0;
483 last_active=0;
484 }*/
485 }
486 cur_req=last_active;
487 nb_active++;
488 io_queue[cur_req].inode=*inode;
489 io_queue[cur_req].req_num=current_req_num;
490 io_queue[cur_req].addr=address_block;
491 io_queue[cur_req].size=block_size;
492 io_queue[cur_req].vaddr=vaddr;
493 io_queue[cur_req].io_type=1;
494 io_queue[cur_req].file_type=*type;
495 if(with_sem==2){
496 io_queue[cur_req].int_local_cond=0;
497 }
498 *request_arg=current_req_num;
499 current_req_num++;
500 }else{
501 *ierr = -91;
502 return mumps_io_error(*ierr,"Internal error in OOC Management layer (mumps_async_read_th)\n");
503 }
504 if(with_sem){
505 /* sem_post(&sem_io); */
506 if(with_sem==2){
507 mumps_post_sem(&int_sem_io,&cond_io);
508 }
509 }
510 pthread_mutex_unlock(&io_mutex);
511 return 0;
512 }
mumps_clean_io_data_c_th(int * myid)513 int mumps_clean_io_data_c_th(int *myid){
514 int i;
515 /* cleans the thread/io management data*/
516 if(mumps_io_flag_async){
517 /*we can use either signals or mutexes for this step */
518 if(with_sem){
519 if(with_sem==2){
520 mumps_post_sem(&int_sem_stop,&cond_stop);
521 mumps_post_sem(&int_sem_io,&cond_io);
522 }
523 }else{
524 pthread_mutex_lock(&io_mutex);
525 io_flag_stop=1;
526 pthread_mutex_unlock(&io_mutex);
527 }
528 pthread_join(io_thread,NULL);
529 pthread_mutex_destroy(&io_mutex);
530 mumps_io_destroy_err_lock();
531 #ifdef WITH_PFUNC
532 mumps_io_destroy_pointers_lock();
533 #endif
534 if(with_sem){
535 if(with_sem==2){
536 pthread_cond_destroy(&cond_stop);
537 pthread_cond_destroy(&cond_io);
538 pthread_cond_destroy(&cond_nb_free_active_requests);
539 pthread_cond_destroy(&cond_nb_free_finished_requests);
540 pthread_mutex_destroy(&io_mutex_cond);
541 }
542 }
543 }
544 if(with_sem==2){
545 for(i=0;i<MAX_IO;i++){
546 pthread_cond_destroy(&(io_queue[i].local_cond));
547 }
548 }
549 free(io_queue);
550 free(finished_requests_id);
551 free(finished_requests_inode);
552 return 0;
553 }
mumps_get_sem(void * arg,int * value)554 int mumps_get_sem(void *arg,int *value){
555 switch(with_sem){
556 case 2:
557 pthread_mutex_lock(&io_mutex_cond);
558 *value=*((int *)arg);
559 pthread_mutex_unlock(&io_mutex_cond);
560 break;
561 default:
562 return mumps_io_error(-91,"Internal error in OOC Management layer (mumps__get_sem)\n");
563 }
564 return 0;
565 }
mumps_wait_sem(void * arg,pthread_cond_t * cond)566 int mumps_wait_sem(void *arg,pthread_cond_t *cond){
567 int *tmp_pointer;
568 switch(with_sem){
569 case 2:
570 pthread_mutex_lock(&io_mutex_cond);
571 tmp_pointer=(int *)arg;
572 while(*tmp_pointer==0){
573 pthread_cond_wait(cond, &io_mutex_cond);
574 }
575 (*tmp_pointer)--;
576 pthread_mutex_unlock(&io_mutex_cond);
577 break;
578 default:
579 return mumps_io_error(-91,"Internal error in OOC Management layer (mumps_wait_sem)\n");
580 }
581 return 0;
582 }
mumps_post_sem(void * arg,pthread_cond_t * cond)583 int mumps_post_sem(void *arg,pthread_cond_t *cond){
584 int *tmp_pointer;
585 switch(with_sem){
586 case 2:
587 pthread_mutex_lock(&io_mutex_cond);
588 tmp_pointer=(int *)arg;
589 (*tmp_pointer)++;
590 if(*tmp_pointer==1){
591 pthread_cond_broadcast(cond);
592 }
593 pthread_mutex_unlock(&io_mutex_cond);
594 break;
595 default:
596 return mumps_io_error(-91,"Internal error in OOC Management layer (mumps_post_sem)\n");
597 }
598 return 0;
599 }
600 #endif /* MUMPS_WIN32 && WITHOUT_PTHREAD */
601