/*
 *
 *  This file is part of MUMPS 4.10.0, built on Tue May 10 12:56:32 UTC 2011
 *
 *
 *  This version of MUMPS is provided to you free of charge. It is public
 *  domain, based on public domain software developed during the Esprit IV
 *  European project PARASOL (1996-1999). Since this first public domain
 *  version in 1999, research and developments have been supported by the
 *  following institutions: CERFACS, CNRS, ENS Lyon, INPT(ENSEEIHT)-IRIT,
 *  INRIA, and University of Bordeaux.
 *
 *  The MUMPS team at the moment of releasing this version includes
 *  Patrick Amestoy, Maurice Bremond, Alfredo Buttari, Abdou Guermouche,
 *  Guillaume Joslin, Jean-Yves L'Excellent, Francois-Henry Rouet, Bora
 *  Ucar and Clement Weisbecker.
 *
 *  We are also grateful to Emmanuel Agullo, Caroline Bousquet, Indranil
 *  Chowdhury, Philippe Combes, Christophe Daniel, Iain Duff, Vincent Espirat,
 *  Aurelia Fevre, Jacko Koster, Stephane Pralet, Chiara Puglisi, Gregoire
 *  Richard, Tzvetomila Slavova, Miroslav Tuma and Christophe Voemel who
 *  have been contributing to this project.
 *
 *  Up-to-date copies of the MUMPS package can be obtained
 *  from the Web pages:
 *  http://mumps.enseeiht.fr/  or  http://graal.ens-lyon.fr/MUMPS
 *
 *
 *   THIS MATERIAL IS PROVIDED AS IS, WITH ABSOLUTELY NO WARRANTY
 *   EXPRESSED OR IMPLIED. ANY USE IS AT YOUR OWN RISK.
 *
 *
 *  User documentation of any code that uses this software can
 *  include this complete notice. You can acknowledge (using
 *  references [1] and [2]) the contribution of this package
 *  in any scientific publication dependent upon the use of the
 *  package. You shall use reasonable endeavours to notify
 *  the authors of the package of this publication.
 *
 *   [1] P. R. Amestoy, I. S. Duff, J. Koster and J.-Y. L'Excellent,
 *   A fully asynchronous multifrontal solver using distributed dynamic
 *   scheduling, SIAM Journal on Matrix Analysis and Applications,
 *   Vol 23, No 1, pp 15-41 (2001).
 *
 *   [2] P. R. Amestoy, A. Guermouche, J.-Y. L'Excellent and
 *   S. Pralet, Hybrid scheduling for the parallel solution of linear
 *   systems. Parallel Computing Vol 32 (2), pp 136-156 (2006).
 *
 */
#include "mumps_io_basic.h"
#include "mumps_io_err.h"
#include "mumps_io_thread.h"
#if ! defined(MUMPS_WIN32) && ! defined(WITHOUT_PTHREAD)
/* Exported global variables */
int io_flag_stop,current_req_num;
pthread_t io_thread,main_thread;
pthread_mutex_t io_mutex;
pthread_cond_t cond_io,cond_nb_free_finished_requests,cond_nb_free_active_requests,cond_stop;
pthread_mutex_t io_mutex_cond;
int int_sem_io,int_sem_nb_free_finished_requests,int_sem_nb_free_active_requests,int_sem_stop;
int with_sem;
struct request_io *io_queue;
int first_active,last_active,nb_active;
int *finished_requests_inode,*finished_requests_id,first_finished_requests,
  last_finished_requests,nb_finished_requests,smallest_request_id;
int mumps_owns_mutex;
int test_request_called_from_mumps;
/* Other global variables */
double inactive_time_io_thread;
int time_flag_io_thread;
struct timeval origin_time_io_thread;
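/*
 * Implementation note (summary of the code below, not authoritative):
 * io_queue is a circular buffer of MAX_IO pending requests delimited by
 * first_active/last_active/nb_active, while finished_requests_id and
 * finished_requests_inode form a second circular buffer (allocated with
 * MAX_IO*2 slots) of completed requests. When with_sem==2, counting
 * semaphores are emulated by integer counters protected by io_mutex_cond
 * plus condition variables (see mumps_wait_sem/mumps_post_sem at the end
 * of this file).
 */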
/**
 * Main function of the io thread when semaphores are used.
 */
void* mumps_async_thread_function_with_sem (void* arg){
  struct request_io *current_io_request;
  int ierr,_sem_stop;
  struct timeval start_time,end_time;
  int ret_code;
  for (;;){
    gettimeofday(&start_time,NULL);
    if(with_sem==2){
      mumps_wait_sem(&int_sem_io,&cond_io);
    }
    /*     sem_wait(&sem_io);  */
    gettimeofday(&end_time,NULL);
    if(time_flag_io_thread){
      inactive_time_io_thread=inactive_time_io_thread+((double)end_time.tv_sec+((double)end_time.tv_usec/1000000))-((double)start_time.tv_sec+((double)start_time.tv_usec/1000000));
    }else{
      inactive_time_io_thread=((double)end_time.tv_sec+((double)end_time.tv_usec/1000000))-((double)origin_time_io_thread.tv_sec+((double)origin_time_io_thread.tv_usec/1000000));
    }
    if(!time_flag_io_thread){
      time_flag_io_thread=1;
    }
    /* Check whether the main thread ordered this I/O thread to stop */
    /*     sem_getvalue(&sem_stop,&_sem_stop); */
    if(with_sem==2){
      mumps_get_sem(&int_sem_stop,&_sem_stop);
    }
    if(_sem_stop==IO_FLAG_STOP){
      /* The thread must stop */
      break; /* Exits the loop. */
    }
    current_io_request=&io_queue[first_active];
    switch(current_io_request->io_type)
      {
      case IO_WRITE:
        ret_code=mumps_io_do_write_block(current_io_request->addr,
                                         current_io_request->size,
                                         &(current_io_request->file_type),
                                         current_io_request->vaddr,
                                         &ierr);
        if(ret_code<0){
          goto end;
        }
        break;
      case IO_READ:
        ret_code=mumps_io_do_read_block(current_io_request->addr,
                                        current_io_request->size,
                                        &(current_io_request->file_type),
                                        current_io_request->vaddr,
                                        &ierr);
        if(ret_code<0){
          goto end;
        }
        break;
      default:
        printf("Error : Mumps_IO : Operation %d is neither READ nor WRITE\n",current_io_request->io_type);
        exit (-3);
      }
    /* Notify that the IO was performed */
    /* Wait until the finished_requests queue can register
       the notification */
    if(with_sem==2){
      mumps_wait_sem(&int_sem_nb_free_finished_requests,&cond_nb_free_finished_requests);
    }
    pthread_mutex_lock(&io_mutex);
    /* Updates active queue bounds */
    /* Registers the notification in the finished_requests queue
       and updates its bounds. */
    finished_requests_id[last_finished_requests]=current_io_request->req_num;
    finished_requests_inode[last_finished_requests]=current_io_request->inode;
    last_finished_requests=(last_finished_requests+1)%(MAX_FINISH_REQ); /* ??? */
    nb_finished_requests++;
    /* Releases the lock : ***UNLOCK*** */
    nb_active--;
    if(first_active<MAX_IO-1){
      first_active++;
    }
    else{
      first_active=0;
    }
    if(with_sem==2){
      mumps_post_sem(&(current_io_request->int_local_cond),&(current_io_request->local_cond));
    }
    pthread_mutex_unlock(&io_mutex);
    /* Finally increases the number of free active requests. */
    /*      sem_post(&sem_nb_free_active_requests); */
    mumps_post_sem(&int_sem_nb_free_active_requests,&cond_nb_free_active_requests);
  }
 end:
  /* The main thread ordered the end of the IO thread (it changed sem_stop).
     We exit. */
  pthread_exit(NULL);
  /* Not reached */
  return NULL;
}
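/*
 * Illustrative request life cycle from the caller's side (sketch only, not
 * part of MUMPS; buffer, size, vaddr and inode values are placeholders):
 *
 *   int strat = IO_ASYNC_TH, inode = 1, type = 0, req = -1, ierr = 0;
 *   mumps_async_write_th(&strat, buffer, (long long)nbytes,
 *                        &inode, &req, &type, (long long)vaddr, &ierr);
 *   ...                              // computation overlapped with the I/O
 *   mumps_wait_request_th(&req);     // returns once the I/O thread is done
 *
 * The I/O thread above consumes the request, performs the read or write,
 * and moves it from the active queue to the finished queue.
 */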
int mumps_test_request_th(int* request_id,int *flag){
  /* Tests if the request "request_id" has finished. It sets the flag  */
  /* argument to 1 if the request has finished (0 otherwise)           */
  int request_pos;
  int i;
  i=mumps_check_error_th();
  if(i!=0){
    return i;
  }
  pthread_mutex_lock(&io_mutex);
  /*  printf("entering test !!! \n"); */
  if(*request_id < smallest_request_id){
    *flag=1;
    /*    exit (-2); */
  }else{
    if(nb_finished_requests==0){
      *flag=0;
    }else{
      request_pos=(first_finished_requests+nb_finished_requests-1)%(MAX_IO*2);
      if(*request_id > finished_requests_id[request_pos]){
        /* The request has not been treated yet since it is not in
           the list of treated requests */
        i=0;
        /* This loop is only for checking (no special treatment is done) */
        while(i<nb_active){
          request_pos=(first_active+i)%(MAX_IO);
          if(io_queue[request_pos].req_num==*request_id){
            break;
          }
          i++;
        }
        if(i==nb_active){
          return mumps_io_error(-91,"Internal error in OOC Management layer (mumps_test_request_th (1))\n");
        }
        *flag=0;
      }else{
        /* The request has already been treated since it is in the list of
           treated requests */
        i=0;
        while(i<nb_finished_requests){
          request_pos=(first_finished_requests+i)%(MAX_IO*2);
          if(finished_requests_id[request_pos]==*request_id){
            break;
          }
          i++;
        }
        if(i==nb_finished_requests){
          return mumps_io_error(-91,"Internal error in OOC Management layer (mumps_test_request_th (2))\n");
        }
        *flag=1;
      }
    }
  }
  /*  printf("I am going to try to clean %d\n",nb_finished_requests);
      pthread_mutex_unlock(&io_mutex);
      sleep (10); */
  mumps_owns_mutex=1;
  /* printf("here nb_finished_requests=%d\n",nb_finished_requests); */
  mumps_clean_finished_queue_th();
  mumps_owns_mutex=0;
  pthread_mutex_unlock(&io_mutex);
  return 0;
}
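/* Blocks (with_sem==2) on the per-request condition variable posted by the
   I/O thread once the request has been processed. Returns immediately if
   the request is no longer in the active queue. */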
int mumps_wait_req_sem_th(int *request_id){
  int i,j;
  j=first_active;
  for(i=0;i<nb_active;i++){
    if(io_queue[j].req_num==*request_id) break;
    j=(j+1)%MAX_IO;
  }
  if(i<nb_active){
    mumps_wait_sem(&(io_queue[j].int_local_cond),&(io_queue[j].local_cond));
  }
  return 0;
}
int mumps_wait_request_th(int *request_id){
  /* Waits for the termination of the request "request_id" */
  int flag=0,ierr;
  if(with_sem!=2){
    while(!flag){
      ierr=mumps_test_request_th(request_id,&flag);
      if(ierr!=0)return ierr;
    }
  }else{
    ierr=mumps_test_request_th(request_id,&flag);
    if(ierr!=0)return ierr;
    if(!flag){
      mumps_wait_req_sem_th(request_id);
      ierr=mumps_test_request_th(request_id,&flag);
      if(ierr!=0)return ierr;
    }
  }
  return 0;
}
int mumps_is_there_finished_request_th(int* flag){
  if(!mumps_owns_mutex) pthread_mutex_lock(&io_mutex);
  if(nb_finished_requests==0){
    *flag=0;
  }else{
    *flag=1;
    /*    printf("finished : %d\n",nb_finished_requests);     */
  }
  if(!mumps_owns_mutex) pthread_mutex_unlock(&io_mutex);
  return 0;
}
int mumps_clean_finished_queue_th(){
  /* Cleans the finished request queue. On exit, the queue is empty. */
  int local_flag;
  int cur_req;
  int loc_owned_mutex=0,ierr;
  if(!mumps_owns_mutex){
    pthread_mutex_lock(&io_mutex);
    mumps_owns_mutex=1;
    loc_owned_mutex=1;
  }
  /* This block of code is designed to avoid deadlocks between
     the two threads */
  mumps_is_there_finished_request_th(&local_flag);
  while(local_flag){
    ierr=mumps_clean_request_th(&cur_req);
    if(ierr!=0){
      return ierr;
    }
    mumps_is_there_finished_request_th(&local_flag);
  }
  if((!mumps_owns_mutex)||(loc_owned_mutex)){
    pthread_mutex_unlock(&io_mutex);
    mumps_owns_mutex=0;
  }
  return 0;
}
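/* Pops the oldest entry of the finished_requests queue into *request_id and
   frees its slot (posting int_sem_nb_free_finished_requests when
   with_sem==2). Requests are treated in arrival order, so the popped entry
   must match smallest_request_id. */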
int mumps_clean_request_th(int* request_id){
  int ierr;
  ierr=mumps_check_error_th();
  if(ierr!=0){
    return ierr;
  }
  if(!mumps_owns_mutex) pthread_mutex_lock(&io_mutex);
  *request_id=finished_requests_id[first_finished_requests];
  if(smallest_request_id!=finished_requests_id[first_finished_requests]){
    return mumps_io_error(-91,"Internal error in OOC Management layer (mumps_clean_request_th)\n");
  }
  finished_requests_id[first_finished_requests]=-9999;
  first_finished_requests=(first_finished_requests+1)%(MAX_FINISH_REQ);
  nb_finished_requests--;
  /* We treat the io requests in their arrival order => we just have to
     increase smallest_request_id */
  smallest_request_id++;
  if(!mumps_owns_mutex) pthread_mutex_unlock(&io_mutex);
  if(with_sem) {
    if(with_sem==2){
      mumps_post_sem(&int_sem_nb_free_finished_requests,&cond_nb_free_finished_requests);
    }
  }
  return 0;
}
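/* Initializes the request queues, mutexes and condition variables, then
   spawns the I/O thread. Only the asynchronous strategy IO_ASYNC_TH with
   the condition-variable based semaphore emulation (with_sem==2) is
   supported here. */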
int mumps_low_level_init_ooc_c_th(int* async, int* ierr){
  int i, ret_code;
  char buf[128]; /* large enough for the error messages built below */
  *ierr=0;
  current_req_num=0;
  with_sem=2;
  first_active=0;
  last_active=0;
  nb_active=0;
  first_finished_requests=0;
  last_finished_requests=0;
  nb_finished_requests=0;
  smallest_request_id=0;
  mumps_owns_mutex=0;
  inactive_time_io_thread=0;
  time_flag_io_thread=0;
  gettimeofday(&origin_time_io_thread,NULL);
  /*  mumps_io_flag_async=*async; */
  if(*async!=IO_ASYNC_TH){
    *ierr = -91;
    sprintf(buf,"Internal error: mumps_low_level_init_ooc_c_th should not be called with strat_IO=%d\n",*async);
    return mumps_io_error(*ierr,buf);
  }
  if(*async){
    pthread_mutex_init(&io_mutex,NULL);
    mumps_io_init_err_lock();
#ifdef WITH_PFUNC
    mumps_io_init_pointers_lock();
#endif
    io_queue=(struct request_io *)malloc(MAX_IO*sizeof(struct request_io));
    if(with_sem==2){
      for(i=0;i<MAX_IO;i++){
        pthread_cond_init(&(io_queue[i].local_cond),NULL);
        io_queue[i].int_local_cond=0;
      }
    }
    finished_requests_id=(int *)malloc(MAX_IO*2*sizeof(int));
    finished_requests_inode=(int *)malloc(MAX_IO*2*sizeof(int));
    for(i=0;i<MAX_IO*2;i++){
      finished_requests_id[i]=-9999;
      finished_requests_inode[i]=-9999;
    }
    if(with_sem){
      switch(with_sem){
      case 2:
        int_sem_io=0;
        int_sem_stop=0;
        int_sem_nb_free_finished_requests=MAX_FINISH_REQ;
        int_sem_nb_free_active_requests=MAX_IO;
        pthread_cond_init(&cond_stop,NULL);
        pthread_cond_init(&cond_io,NULL);
        pthread_cond_init(&cond_nb_free_active_requests,NULL);
        pthread_cond_init(&cond_nb_free_finished_requests,NULL);
        pthread_mutex_init(&io_mutex_cond,NULL);
        break;
      default:
        *ierr = -92;
        sprintf(buf,"Internal error: mumps_low_level_init_ooc_c_th should not be called with strat_IO=%d\n",*async);
        return mumps_io_error(*ierr,buf);
      }
      ret_code=pthread_create(&io_thread,NULL,mumps_async_thread_function_with_sem,NULL);
    }
    if(ret_code!=0){
      errno = ret_code;
      return mumps_io_sys_error(-92,"Unable to create I/O thread");
    }
    main_thread=pthread_self();
  }
  return 0;
}
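/* Registers an asynchronous write request (io_type=0): the block of
   block_size bytes at address_block is queued for the I/O thread, and the
   request number is returned in *request_arg so that the caller can later
   test or wait for its completion. */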
int mumps_async_write_th(const int * strat_IO,
                         void * address_block,
                         long long block_size,
                         int * inode,
                         int * request_arg,
                         int * type,
                         long long vaddr,
                         int * ierr){
  int cur_req;
  *ierr=mumps_check_error_th();
  if(*ierr!=0){
    return *ierr;
  }
  if(with_sem){
    mumps_clean_finished_queue_th();
    if(with_sem==2){
      mumps_wait_sem(&int_sem_nb_free_active_requests,&cond_nb_free_active_requests);
    }
    /*    sem_wait(&sem_nb_free_active_requests); */
    pthread_mutex_lock(&io_mutex);
  }
  if(nb_active<=MAX_IO){
    if(nb_active==0){
      first_active=last_active;
    }
    else{
      last_active=(last_active+1)%MAX_IO;
    }
    cur_req=last_active;
    nb_active++;
    io_queue[cur_req].inode=*inode;
    io_queue[cur_req].req_num=current_req_num;
    io_queue[cur_req].addr=address_block;
    io_queue[cur_req].size=block_size;
    io_queue[cur_req].vaddr=vaddr;
    io_queue[cur_req].io_type=0;
    io_queue[cur_req].file_type=*type;
    if(with_sem==2){
      io_queue[cur_req].int_local_cond=0;
    }
    *request_arg=current_req_num;
    current_req_num++;
  }else{
    *ierr = -91;
    return mumps_io_error(*ierr,"Internal error in OOC Management layer (mumps_async_write_th)\n");
    /*    exit(-3);*/
  }
  pthread_mutex_unlock(&io_mutex);
  if(with_sem){
    /*    sem_post(&sem_io); */
    if(with_sem==2){
      mumps_post_sem(&int_sem_io,&cond_io);
    }
  }
  return 0;
}
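/* Counterpart of mumps_async_write_th for reads: same queuing protocol, but
   the request is registered with io_type=1 so that the I/O thread calls
   mumps_io_do_read_block instead of mumps_io_do_write_block. */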
int mumps_async_read_th(const int * strat_IO,
                        void * address_block,
                        long long block_size,
                        int * inode,
                        int * request_arg,
                        int * type,
                        long long vaddr,
                        int * ierr){
  int cur_req;
  *ierr=mumps_check_error_th();
  if(*ierr!=0){
    return *ierr;
  }
  if(with_sem){
    mumps_clean_finished_queue_th();
    /* end of the block */
    if(with_sem==2){
      mumps_wait_sem(&int_sem_nb_free_active_requests,&cond_nb_free_active_requests);
    }
    /*    sem_wait(&sem_nb_free_active_requests); */
    pthread_mutex_lock(&io_mutex);
  }
  if(nb_active<MAX_IO){
    if(nb_active==0){
      first_active=last_active;
    }else{
      last_active=(last_active+1)%MAX_IO;
      /*        if(last_active<MAX_IO-1){
                cur_req=last_active+1;
                last_active++;
                }else{
                cur_req=0;
                last_active=0;
                }*/
    }
    cur_req=last_active;
    nb_active++;
    io_queue[cur_req].inode=*inode;
    io_queue[cur_req].req_num=current_req_num;
    io_queue[cur_req].addr=address_block;
    io_queue[cur_req].size=block_size;
    io_queue[cur_req].vaddr=vaddr;
    io_queue[cur_req].io_type=1;
    io_queue[cur_req].file_type=*type;
    if(with_sem==2){
      io_queue[cur_req].int_local_cond=0;
    }
    *request_arg=current_req_num;
    current_req_num++;
  }else{
    *ierr = -91;
    return mumps_io_error(*ierr,"Internal error in OOC Management layer (mumps_async_read_th)\n");
  }
  if(with_sem){
    /*    sem_post(&sem_io); */
    if(with_sem==2){
      mumps_post_sem(&int_sem_io,&cond_io);
    }
  }
  pthread_mutex_unlock(&io_mutex);
  return 0;
}
int mumps_clean_io_data_c_th(int *myid){
  int i;
  /* Cleans the thread/io management data */
  if(mumps_io_flag_async){
    /* We can use either signals or mutexes for this step */
    if(with_sem){
      if(with_sem==2){
        mumps_post_sem(&int_sem_stop,&cond_stop);
        mumps_post_sem(&int_sem_io,&cond_io);
      }
    }else{
      pthread_mutex_lock(&io_mutex);
      io_flag_stop=1;
      pthread_mutex_unlock(&io_mutex);
    }
    pthread_join(io_thread,NULL);
    pthread_mutex_destroy(&io_mutex);
    mumps_io_destroy_err_lock();
#ifdef WITH_PFUNC
    mumps_io_destroy_pointers_lock();
#endif
    if(with_sem){
      if(with_sem==2){
        pthread_cond_destroy(&cond_stop);
        pthread_cond_destroy(&cond_io);
        pthread_cond_destroy(&cond_nb_free_active_requests);
        pthread_cond_destroy(&cond_nb_free_finished_requests);
        pthread_mutex_destroy(&io_mutex_cond);
      }
    }
  }
  if(with_sem==2){
    for(i=0;i<MAX_IO;i++){
      pthread_cond_destroy(&(io_queue[i].local_cond));
    }
  }
  free(io_queue);
  free(finished_requests_id);
  free(finished_requests_inode);
  return 0;
}
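/*
 * The three routines below emulate a counting semaphore when with_sem==2:
 * the counter is a plain int protected by io_mutex_cond, and waiters block
 * on the associated condition variable. Minimal usage sketch (names as in
 * this file):
 *
 *   mumps_post_sem(&int_sem_io, &cond_io);  // V: increment, wake waiters
 *   mumps_wait_sem(&int_sem_io, &cond_io);  // P: block until > 0, decrement
 *   mumps_get_sem(&int_sem_io, &value);     // read the current counter
 */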
int mumps_get_sem(void *arg,int *value){
  switch(with_sem){
  case 2:
    pthread_mutex_lock(&io_mutex_cond);
    *value=*((int *)arg);
    pthread_mutex_unlock(&io_mutex_cond);
    break;
  default:
    return mumps_io_error(-91,"Internal error in OOC Management layer (mumps_get_sem)\n");
  }
  return 0;
}
int mumps_wait_sem(void *arg,pthread_cond_t *cond){
  int *tmp_pointer;
  switch(with_sem){
  case 2:
    pthread_mutex_lock(&io_mutex_cond);
    tmp_pointer=(int *)arg;
    while(*tmp_pointer==0){
      pthread_cond_wait(cond, &io_mutex_cond);
    }
    (*tmp_pointer)--;
    pthread_mutex_unlock(&io_mutex_cond);
    break;
  default:
    return mumps_io_error(-91,"Internal error in OOC Management layer (mumps_wait_sem)\n");
  }
  return 0;
}
int mumps_post_sem(void *arg,pthread_cond_t *cond){
  int *tmp_pointer;
  switch(with_sem){
  case 2:
    pthread_mutex_lock(&io_mutex_cond);
    tmp_pointer=(int *)arg;
    (*tmp_pointer)++;
    if(*tmp_pointer==1){
      pthread_cond_broadcast(cond);
    }
    pthread_mutex_unlock(&io_mutex_cond);
    break;
  default:
    return mumps_io_error(-91,"Internal error in OOC Management layer (mumps_post_sem)\n");
  }
  return 0;
}
#endif /* MUMPS_WIN32 && WITHOUT_PTHREAD */