1 /* heating scheme with threads if available
2 
3 
4 part of migrate
5 Peter Beerli 2000
6 
7 Copyright 1996-2002 Peter Beerli and Joseph Felsenstein, Seattle WA
8 Copyright 2003-2008 Peter Beerli, Tallahassee FL
9 
10 Permission is hereby granted, free of charge, to any person obtaining a copy
11 of this software and associated documentation files (the "Software"), to deal
12 in the Software without restriction, including without limitation the rights
13 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
14 of the Software, and to permit persons to whom the Software is furnished to do
15 so, subject to the following conditions:
16 
17 The above copyright notice and this permission notice shall be included in all copies
18 or substantial portions of the Software.
19 
20 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
21 INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
22 PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
23 HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
24 CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE
25 OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
26 
27 $Id: heating.c 2067 2012-07-27 20:59:32Z beerli $
28 
29 */
30 /*! \file heating.c
31 
32 Heating handler for threaded heating
33 
34 */
35 
36 #include "migration.h"
37 #include "mcmc.h"
38 #include "sighandler.h"
39 #include "random.h"
40 #ifdef mPI
41 #include "migrate_mpi.h"
42 #endif
43 
44 extern int myID; //identifier for nodes in MPI context
45 
46 #if PTHREADS   /* ==================threaded version======== */
47 
48 #include <pthread.h>
49 #include "heating.h"
50 #include "bayes.h"
51 
52 void fill_tpool (tpool_t tpool, world_fmt ** universe, int universe_size);
53 void tpool_init (tpool_t * tpoolp, int num_worker_threads, int max_queue_size,
54                  int do_not_block_when_full);
55 void thread_tree_update (void *thisworld);
56 int tpool_destroy (tpool_t tpool, int finish);
57 void tpool_thread (tpool_t tpool);
58 int tpool_synchronize (tpool_t tpool, int finish);
59 void allocate_thread_workp(long n, tpool_work_t ** workpointers);
60 
61 extern void run_one_update(world_fmt *world); //defined in main.c
62 
63 tpool_work_t **thread_work_pointers;
64 
65 void
tpool_init(tpool_t * tpoolp,int num_worker_threads,int max_queue_size,int do_not_block_when_full)66 tpool_init (tpool_t * tpoolp, int num_worker_threads, int max_queue_size,
67             int do_not_block_when_full)
68 {
69     int i, rtn;
70     tpool_t tpool;
71 
72     tpool = (tpool_t) mymalloc (sizeof (struct _tpool_t));
73 
74     tpool->num_threads = num_worker_threads;
75     tpool->max_queue_size = max_queue_size;
76     tpool->do_not_block_when_full = do_not_block_when_full;
77     tpool->threads =
78         (pthread_t *) mymalloc (sizeof (pthread_t) * num_worker_threads);
79 
80     tpool->cur_queue_size = 0;
81     tpool->queue_head = NULL;
82     tpool->queue_tail = NULL;
83     tpool->queue_closed = 0;
84     tpool->shutdown = 0;
85     tpool->done = 0;
86     if ((rtn = pthread_mutex_init (&(tpool->random_lock), NULL)) != 0)
87         error ("pthread_mutex_init for tpool->random_lock failed");
88     if ((rtn = pthread_mutex_init (&(tpool->queue_lock), NULL)) != 0)
89         error ("pthread_mutex_init for tpool->queue_lock failed");
90     if ((rtn = pthread_cond_init (&(tpool->queue_done), NULL)) != 0)
91         error ("pthread_cond_init for tpool->queue_done failed");
92     if ((rtn = pthread_cond_init (&(tpool->queue_not_empty), NULL)) != 0)
93         error ("pthread_cond_init for tpool->queue_not_empty failed");
94     if ((rtn = pthread_cond_init (&(tpool->queue_not_full), NULL)) != 0)
95         error ("pthread_cond_init for tpool->queue_not_full failed");
96     if ((rtn = pthread_cond_init (&(tpool->queue_empty), NULL)) != 0)
97         error ("pthread_cond_init for tpool->queue_empty failed");
98 
99     for (i = 0; i < tpool->num_threads; i++)
100     {
101         if ((rtn =
102                     pthread_create (&(tpool->threads[i]), NULL, (void *) tpool_thread,
103                                     (void *) tpool)) != 0)
104             error ("Thread pool creation failed with pthread_create");
105     }
106     *tpoolp = tpool;
107     thread_work_pointers = (tpool_work_t **) mycalloc(num_worker_threads, sizeof(tpool_work_t *));
108     allocate_thread_workp(num_worker_threads, thread_work_pointers);
109 }
110 
111 
allocate_thread_workp(long n,tpool_work_t ** workpointers)112 void allocate_thread_workp(long n, tpool_work_t ** workpointers)
113 {
114     // memory savings?
115   long i;
116   for(i=0;i<n;i++)
117     (workpointers)[i] = (tpool_work_t *) mymalloc (sizeof (tpool_work_t));
118 }
119 
120 int
tpool_add_work(tpool_t tpool,void * routine,void * arg,long z)121 tpool_add_work (tpool_t tpool, void *routine, void *arg, long z)
122 {
123     tpool_work_t *workp;
124     pthread_mutex_lock (&tpool->queue_lock);
125 
126     if ((tpool->cur_queue_size == tpool->max_queue_size)
127             && tpool->do_not_block_when_full)
128     {
129         pthread_mutex_unlock (&tpool->queue_lock);
130         return -1;
131     }
132     while ((tpool->cur_queue_size == tpool->max_queue_size)
133             && (!tpool->shutdown || tpool->queue_closed))
134     {
135         pthread_cond_wait (&tpool->queue_not_full, &tpool->queue_lock);
136     }
137 
138     if (tpool->shutdown || tpool->queue_closed)
139     {
140         pthread_mutex_unlock (&tpool->queue_lock);
141         return -1;
142     }
143 
144     //    workp = (tpool_work_t *) mymalloc (sizeof (tpool_work_t));
145     workp = thread_work_pointers[z];
146     workp->routine = routine;
147     workp->arg = arg;
148     workp->next = NULL;
149     if (tpool->cur_queue_size == 0)
150     {
151         tpool->queue_tail = tpool->queue_head = workp;
152         pthread_cond_broadcast (&tpool->queue_not_empty);
153     }
154     else
155     {
156         (tpool->queue_tail)->next = workp;
157         tpool->queue_tail = workp;
158     }
159     tpool->cur_queue_size++;
160     pthread_mutex_unlock (&tpool->queue_lock);
161     return 1;
162 }
163 
164 /// \brief put work for heated chains on to the heated-chain-stack
165 ///
166 /// queue the work needed to be done into the stack of heated chains
167 /// if there is an error the routien will abort with an
168 /// error "filling heating queue failed"
fill_tpool(tpool_t tpool,world_fmt ** universe,int universe_size)169 void fill_tpool (tpool_t tpool, world_fmt ** universe, int universe_size)
170 {
171     int i;
172     for (i = 0; i < universe_size; i++)
173     {
174       if (!tpool_add_work (tpool, thread_tree_update, (void *) universe[i], i))
175             error ("Filling heating queue failed");
176     }
177 }
178 
179 void
tpool_thread(tpool_t tpool)180 tpool_thread (tpool_t tpool)
181 {
182     tpool_work_t *myworkp;
183 
184     for (;;)
185     {
186         pthread_mutex_lock (&(tpool->queue_lock));
187         while ((tpool->cur_queue_size == 0) && (!tpool->shutdown))
188         {
189             pthread_cond_wait (&(tpool->queue_not_empty), &(tpool->queue_lock));
190         }
191         if (tpool->shutdown)
192         {
193             pthread_mutex_unlock (&tpool->queue_lock);
194             pthread_exit (NULL);
195         }
196         myworkp = tpool->queue_head;
197         tpool->cur_queue_size--;
198         if (tpool->cur_queue_size == 0)
199             tpool->queue_head = tpool->queue_tail = NULL;
200         else
201             tpool->queue_head = myworkp->next;
202 
203         if ((!tpool->do_not_block_when_full)
204                 && (tpool->cur_queue_size == (tpool->max_queue_size - 1)))
205             pthread_cond_broadcast (&(tpool->queue_not_full));
206 
207         if (tpool->cur_queue_size == 0)
208             pthread_cond_signal (&(tpool->queue_empty));
209 
210         pthread_mutex_unlock (&tpool->queue_lock);
211         (*(myworkp->routine)) (myworkp->arg);
212         pthread_mutex_lock (&tpool->queue_lock);
213         tpool->done++;
214         //      printf("-%i",tpool->done);
215         pthread_cond_signal (&tpool->queue_done);
216         pthread_mutex_unlock (&tpool->queue_lock);
217         //myfree(myworkp);
218     }
219 }
220 
221 /// \brief updates the heated tree and records acceptance
222 ///
223 /// updates the tree during threaded heating scheme
thread_tree_update(void * thisworld)224 void thread_tree_update (void *thisworld)
225 {
226     long i;
227     world_fmt * world = (world_fmt *) thisworld;
228     // the version commented out would increase the runtime by the interval
229     // using the heating interval in heated_swap() is not as efficient in thread but would
230     // do the right thing, but just not checking that often.
231     //    for (i = 0; i < world->options->heating_interval; i++)
232     //{
233 		run_one_update(thisworld);
234 		//  }
235 }
236 
237 ///
238 /// synchronizes the heated chains and controls the locking of chains
239 int
tpool_synchronize(tpool_t tpool,int fullnumber)240 tpool_synchronize (tpool_t tpool, int fullnumber)
241 {
242     int rtn;
243     //  printf("\nSync ");
244     if ((rtn = pthread_mutex_lock (&(tpool->queue_lock))) != 0)
245         error ("pthread_mutex_lock failed in tpool_synchronize()");
246     //  printf("{%i} ",tpool->done);
247     while (tpool->done < fullnumber)
248     {
249         //      printf("(%i [%i]) ",tpool->done,   tpool->cur_queue_size);
250         if ((rtn =
251                     pthread_cond_wait (&(tpool->queue_done),
252                                        &(tpool->queue_lock))) != 0)
253             error ("pthread_cond_wait failed in tpool_synchronize()");
254     }
255     tpool->done = 0;
256 
257     if ((rtn = pthread_mutex_unlock (&(tpool->queue_lock))) != 0)
258         error ("pthread_mutex_unlock failed in tpool_destroy()");
259     return 0;
260 }
261 
262 int
tpool_destroy(tpool_t tpool,int finish)263 tpool_destroy (tpool_t tpool, int finish)
264 {
265     int i, rtn;
266     tpool_work_t *cur_nodep;
267 
268     if ((rtn = pthread_mutex_lock (&(tpool->queue_lock))) != 0)
269         error ("pthread_mutex_lock failed in tpool_destroy()");
270 
271     if (tpool->queue_closed || tpool->shutdown)
272     {
273         if ((rtn = pthread_mutex_unlock (&(tpool->queue_lock))) != 0)
274             error ("pthread_mutex_unlock failed in tpool_destroy()");
275         return 0;
276     }
277 
278     tpool->queue_closed = 1;
279 
280     if (finish == 1)
281     {
282         while (tpool->cur_queue_size != 0)
283         {
284             if ((rtn =
285                         pthread_cond_wait (&(tpool->queue_empty),
286                                            &(tpool->queue_lock))) != 0)
287                 error ("pthread_cond_wait failed in tpool_destroy()");
288         }
289     }
290     tpool->shutdown = 1;
291 
292     if ((rtn = pthread_mutex_unlock (&(tpool->queue_lock))) != 0)
293         error ("pthread_mutex_unlock failed in tpool_destroy()");
294 
295     if ((rtn = pthread_cond_broadcast (&(tpool->queue_not_empty))) != 0)
296         error ("pthread_cond_broadcast failed for queue_not_empty()");
297     if ((rtn = pthread_cond_broadcast (&(tpool->queue_not_full))) != 0)
298         error ("pthread_cond_broadcast failed for queue_not_full");
299 
300     for (i = 0; i < tpool->num_threads; i++)
301     {
302         if ((rtn = pthread_join (tpool->threads[i], NULL)) != 0)
303             error ("pthread_join failed in tpool_destroy()");
304     }
305 
306     myfree(tpool->threads);
307     while (tpool->queue_head != NULL)
308     {
309         cur_nodep = tpool->queue_head->next;
310         tpool->queue_head = tpool->queue_head->next;
311         myfree(cur_nodep);
312     }
313     myfree(tpool);
314     return 0;
315 }
316 
317 #else /* ==================NOT threaded version======== */
318 
319 
320 
321 #endif
322 
323 #define HEATCHECKINTERVAL 1000
324 #define HEATSWAPLOW 0
325 #define HEATSWAPHIGH 10
adjust_temperatures(world_fmt ** universe,long hchains,long step,long steps)326 void adjust_temperatures(world_fmt ** universe, long hchains, long step, long steps)
327 {
328     long i;
329     const MYREAL bigger = 1.1;
330     const MYREAL smaller = 0.9;
331     const MYREAL corrsum = steps / HEATCHECKINTERVAL;
332     MYREAL *delta;
333     if(step == 0)
334     {
335         for(i=0; i< hchains; i++)
336         {
337             universe[i]->treeswapcount=0;
338             if(steps < HEATCHECKINTERVAL)
339                 universe[i]->averageheat  = 1./universe[i]->heat;
340             else
341                 universe[i]->averageheat  = 0.0;
342         }
343     }
344     else
345     {
346         if ( (step % HEATCHECKINTERVAL ) == 0)
347         {
348             universe[0]->averageheat= 1.0;
349             delta = (MYREAL *) mycalloc(hchains,sizeof(MYREAL));
350 	    // FPRINTF(stdout,"\n%f %li\n",1./universe[0]->heat,universe[0]->treeswapcount);
351             for(i=1; i< hchains; i++)
352             {
353                 universe[i]->averageheat += (1./(universe[i]->heat * corrsum ));
354 		//FPRINTF(stdout,"%f %li\n",1./universe[i]->heat,universe[i]->treeswapcount);
355                 delta[i-1] = 1./universe[i]->heat - 1./universe[i-1]->heat;
356 		if(delta[i-1] < EPSILON)
357 		  delta[i-1] = EPSILON;
358             }
359             for(i=1; i< hchains; i++)
360             {
361                 if(universe[i-1]->treeswapcount <= HEATSWAPLOW)
362                     delta[i-1] *= smaller;
363                 else
364                 {
365                     if(delta[i-1]<1000 && universe[i-1]->treeswapcount >= HEATSWAPHIGH)
366 			  delta[i-1] *= bigger;
367                 }
368             }
369             universe[0]->heat = 1.;
370             universe[0]->treeswapcount=0;
371 	    for(i=1; i< hchains; i++)
372 	      {
373 		universe[i]->heat = 1./(1./universe[i-1]->heat + delta[i-1]);
374 		universe[i]->treeswapcount=0;
375 	      }
376             myfree(delta);
377         }
378     }
379 }
380 ///
381 /// adjust temperatures using a lower and upper bound (temperature=1 and temperature=highest)
adjust_temperatures_bounded(world_fmt ** universe,long hchains,long step,long steps)382 void adjust_temperatures_bounded(world_fmt ** universe, long hchains, long step, long steps)
383 {
384   //const MYREAL corrsum = steps / HEATCHECKINTERVAL;
385     long i;
386     long deltasum;
387     MYREAL negheat = 0.0;
388     MYREAL *delta;
389     if(step == 0)
390     {
391         for(i=0; i< hchains; i++)
392         {
393             universe[i]->treeswapcount=0;
394             if(steps < HEATCHECKINTERVAL)
395                 universe[i]->averageheat  = 1./universe[i]->heat;
396             else
397                 universe[i]->averageheat  = 0.0;
398         }
399     }
400     else
401     {
402         if ( (step % HEATCHECKINTERVAL ) == 0)
403         {
404             universe[0]->averageheat= 1.0;
405             delta = (MYREAL *) mycalloc(hchains,sizeof(MYREAL));
406 #ifdef DEBUG
407 	    fprintf(stdout,"\n%i> chain 1: %f %f %li\n",myID, 1./universe[0]->heat,universe[0]->averageheat, universe[0]->treeswapcount);
408 #endif
409 	    deltasum = 0;
410             for(i=1; i< hchains; i++)
411             {
412 	      universe[i]->averageheat += HEATCHECKINTERVAL * (1./universe[i]->heat - universe[i]->averageheat) / step;
413 #ifdef DEBUG
414 	      fprintf(stdout,"%i> chain %li: %f %f %li (step=%li (%f))\n", myID, i+1, 1./universe[i]->heat,universe[i]->averageheat, universe[i]->treeswapcount, step, (MYREAL) HEATCHECKINTERVAL/step);
415 #endif
416                 delta[i-1] = (MYREAL) universe[i]->treeswapcount;
417 		if(delta[i-1] < 1.0)
418 		  delta[i-1] = 1.0;
419 		deltasum += (long) delta[i-1];
420             }
421 
422             universe[0]->heat = 1.;
423 	    universe[0]->treeswapcount=0;
424             for(i=1; i < hchains-1; i++)
425             {
426 	      negheat += delta[i-1]/deltasum;
427 	      universe[i]->heat = 1. - negheat;
428 	      universe[i]->treeswapcount=0;
429 	    }
430             myfree(delta);
431         }
432     }
433 }
434 
435 
reset_weight(world_fmt * world)436 void reset_weight(world_fmt *world)
437 {
438   long i;
439   seqmodel_fmt *seq = world->data->seq[0];
440   long endsite = seq->endsite;
441   long heated_end = (long) (endsite - endsite * 1./world->heat);
442   //MYREAL v;
443   memcpy(seq->aliasweight, seq->savealiasweight, sizeof(long)*endsite);
444   for(i=0;i<heated_end; i++)
445     {
446     //xcode   v = RANDINT(0,endsite);
447       seq->aliasweight[0] = 0 ;
448     }
449 }
450