1 /* heating scheme with threads if available
2
3
4 part of migrate
5 Peter Beerli 2000
6
7 Copyright 1996-2002 Peter Beerli and Joseph Felsenstein, Seattle WA
8 Copyright 2003-2008 Peter Beerli, Tallahassee FL
9
10 Permission is hereby granted, free of charge, to any person obtaining a copy
11 of this software and associated documentation files (the "Software"), to deal
12 in the Software without restriction, including without limitation the rights
13 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
14 of the Software, and to permit persons to whom the Software is furnished to do
15 so, subject to the following conditions:
16
17 The above copyright notice and this permission notice shall be included in all copies
18 or substantial portions of the Software.
19
20 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
21 INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
22 PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
23 HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
24 CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE
25 OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
26
27 $Id: heating.c 2067 2012-07-27 20:59:32Z beerli $
28
29 */
30 /*! \file heating.c
31
32 Heating handler for threaded heating
33
34 */
35
36 #include "migration.h"
37 #include "mcmc.h"
38 #include "sighandler.h"
39 #include "random.h"
40 #ifdef mPI
41 #include "migrate_mpi.h"
42 #endif
43
44 extern int myID; //identifier for nodes in MPI context
45
46 #if PTHREADS /* ==================threaded version======== */
47
48 #include <pthread.h>
49 #include "heating.h"
50 #include "bayes.h"
51
52 void fill_tpool (tpool_t tpool, world_fmt ** universe, int universe_size);
53 void tpool_init (tpool_t * tpoolp, int num_worker_threads, int max_queue_size,
54 int do_not_block_when_full);
55 void thread_tree_update (void *thisworld);
56 int tpool_destroy (tpool_t tpool, int finish);
57 void tpool_thread (tpool_t tpool);
58 int tpool_synchronize (tpool_t tpool, int finish);
59 void allocate_thread_workp(long n, tpool_work_t ** workpointers);
60
61 extern void run_one_update(world_fmt *world); //defined in main.c
62
63 tpool_work_t **thread_work_pointers;
64
65 void
tpool_init(tpool_t * tpoolp,int num_worker_threads,int max_queue_size,int do_not_block_when_full)66 tpool_init (tpool_t * tpoolp, int num_worker_threads, int max_queue_size,
67 int do_not_block_when_full)
68 {
69 int i, rtn;
70 tpool_t tpool;
71
72 tpool = (tpool_t) mymalloc (sizeof (struct _tpool_t));
73
74 tpool->num_threads = num_worker_threads;
75 tpool->max_queue_size = max_queue_size;
76 tpool->do_not_block_when_full = do_not_block_when_full;
77 tpool->threads =
78 (pthread_t *) mymalloc (sizeof (pthread_t) * num_worker_threads);
79
80 tpool->cur_queue_size = 0;
81 tpool->queue_head = NULL;
82 tpool->queue_tail = NULL;
83 tpool->queue_closed = 0;
84 tpool->shutdown = 0;
85 tpool->done = 0;
86 if ((rtn = pthread_mutex_init (&(tpool->random_lock), NULL)) != 0)
87 error ("pthread_mutex_init for tpool->random_lock failed");
88 if ((rtn = pthread_mutex_init (&(tpool->queue_lock), NULL)) != 0)
89 error ("pthread_mutex_init for tpool->queue_lock failed");
90 if ((rtn = pthread_cond_init (&(tpool->queue_done), NULL)) != 0)
91 error ("pthread_cond_init for tpool->queue_done failed");
92 if ((rtn = pthread_cond_init (&(tpool->queue_not_empty), NULL)) != 0)
93 error ("pthread_cond_init for tpool->queue_not_empty failed");
94 if ((rtn = pthread_cond_init (&(tpool->queue_not_full), NULL)) != 0)
95 error ("pthread_cond_init for tpool->queue_not_full failed");
96 if ((rtn = pthread_cond_init (&(tpool->queue_empty), NULL)) != 0)
97 error ("pthread_cond_init for tpool->queue_empty failed");
98
99 for (i = 0; i < tpool->num_threads; i++)
100 {
101 if ((rtn =
102 pthread_create (&(tpool->threads[i]), NULL, (void *) tpool_thread,
103 (void *) tpool)) != 0)
104 error ("Thread pool creation failed with pthread_create");
105 }
106 *tpoolp = tpool;
107 thread_work_pointers = (tpool_work_t **) mycalloc(num_worker_threads, sizeof(tpool_work_t *));
108 allocate_thread_workp(num_worker_threads, thread_work_pointers);
109 }
110
111
allocate_thread_workp(long n,tpool_work_t ** workpointers)112 void allocate_thread_workp(long n, tpool_work_t ** workpointers)
113 {
114 // memory savings?
115 long i;
116 for(i=0;i<n;i++)
117 (workpointers)[i] = (tpool_work_t *) mymalloc (sizeof (tpool_work_t));
118 }
119
120 int
tpool_add_work(tpool_t tpool,void * routine,void * arg,long z)121 tpool_add_work (tpool_t tpool, void *routine, void *arg, long z)
122 {
123 tpool_work_t *workp;
124 pthread_mutex_lock (&tpool->queue_lock);
125
126 if ((tpool->cur_queue_size == tpool->max_queue_size)
127 && tpool->do_not_block_when_full)
128 {
129 pthread_mutex_unlock (&tpool->queue_lock);
130 return -1;
131 }
132 while ((tpool->cur_queue_size == tpool->max_queue_size)
133 && (!tpool->shutdown || tpool->queue_closed))
134 {
135 pthread_cond_wait (&tpool->queue_not_full, &tpool->queue_lock);
136 }
137
138 if (tpool->shutdown || tpool->queue_closed)
139 {
140 pthread_mutex_unlock (&tpool->queue_lock);
141 return -1;
142 }
143
144 // workp = (tpool_work_t *) mymalloc (sizeof (tpool_work_t));
145 workp = thread_work_pointers[z];
146 workp->routine = routine;
147 workp->arg = arg;
148 workp->next = NULL;
149 if (tpool->cur_queue_size == 0)
150 {
151 tpool->queue_tail = tpool->queue_head = workp;
152 pthread_cond_broadcast (&tpool->queue_not_empty);
153 }
154 else
155 {
156 (tpool->queue_tail)->next = workp;
157 tpool->queue_tail = workp;
158 }
159 tpool->cur_queue_size++;
160 pthread_mutex_unlock (&tpool->queue_lock);
161 return 1;
162 }
163
164 /// \brief put work for heated chains on to the heated-chain-stack
165 ///
166 /// queue the work needed to be done into the stack of heated chains
167 /// if there is an error the routien will abort with an
168 /// error "filling heating queue failed"
fill_tpool(tpool_t tpool,world_fmt ** universe,int universe_size)169 void fill_tpool (tpool_t tpool, world_fmt ** universe, int universe_size)
170 {
171 int i;
172 for (i = 0; i < universe_size; i++)
173 {
174 if (!tpool_add_work (tpool, thread_tree_update, (void *) universe[i], i))
175 error ("Filling heating queue failed");
176 }
177 }
178
179 void
tpool_thread(tpool_t tpool)180 tpool_thread (tpool_t tpool)
181 {
182 tpool_work_t *myworkp;
183
184 for (;;)
185 {
186 pthread_mutex_lock (&(tpool->queue_lock));
187 while ((tpool->cur_queue_size == 0) && (!tpool->shutdown))
188 {
189 pthread_cond_wait (&(tpool->queue_not_empty), &(tpool->queue_lock));
190 }
191 if (tpool->shutdown)
192 {
193 pthread_mutex_unlock (&tpool->queue_lock);
194 pthread_exit (NULL);
195 }
196 myworkp = tpool->queue_head;
197 tpool->cur_queue_size--;
198 if (tpool->cur_queue_size == 0)
199 tpool->queue_head = tpool->queue_tail = NULL;
200 else
201 tpool->queue_head = myworkp->next;
202
203 if ((!tpool->do_not_block_when_full)
204 && (tpool->cur_queue_size == (tpool->max_queue_size - 1)))
205 pthread_cond_broadcast (&(tpool->queue_not_full));
206
207 if (tpool->cur_queue_size == 0)
208 pthread_cond_signal (&(tpool->queue_empty));
209
210 pthread_mutex_unlock (&tpool->queue_lock);
211 (*(myworkp->routine)) (myworkp->arg);
212 pthread_mutex_lock (&tpool->queue_lock);
213 tpool->done++;
214 // printf("-%i",tpool->done);
215 pthread_cond_signal (&tpool->queue_done);
216 pthread_mutex_unlock (&tpool->queue_lock);
217 //myfree(myworkp);
218 }
219 }
220
221 /// \brief updates the heated tree and records acceptance
222 ///
223 /// updates the tree during threaded heating scheme
thread_tree_update(void * thisworld)224 void thread_tree_update (void *thisworld)
225 {
226 long i;
227 world_fmt * world = (world_fmt *) thisworld;
228 // the version commented out would increase the runtime by the interval
229 // using the heating interval in heated_swap() is not as efficient in thread but would
230 // do the right thing, but just not checking that often.
231 // for (i = 0; i < world->options->heating_interval; i++)
232 //{
233 run_one_update(thisworld);
234 // }
235 }
236
237 ///
238 /// synchronizes the heated chains and controls the locking of chains
239 int
tpool_synchronize(tpool_t tpool,int fullnumber)240 tpool_synchronize (tpool_t tpool, int fullnumber)
241 {
242 int rtn;
243 // printf("\nSync ");
244 if ((rtn = pthread_mutex_lock (&(tpool->queue_lock))) != 0)
245 error ("pthread_mutex_lock failed in tpool_synchronize()");
246 // printf("{%i} ",tpool->done);
247 while (tpool->done < fullnumber)
248 {
249 // printf("(%i [%i]) ",tpool->done, tpool->cur_queue_size);
250 if ((rtn =
251 pthread_cond_wait (&(tpool->queue_done),
252 &(tpool->queue_lock))) != 0)
253 error ("pthread_cond_wait failed in tpool_synchronize()");
254 }
255 tpool->done = 0;
256
257 if ((rtn = pthread_mutex_unlock (&(tpool->queue_lock))) != 0)
258 error ("pthread_mutex_unlock failed in tpool_destroy()");
259 return 0;
260 }
261
262 int
tpool_destroy(tpool_t tpool,int finish)263 tpool_destroy (tpool_t tpool, int finish)
264 {
265 int i, rtn;
266 tpool_work_t *cur_nodep;
267
268 if ((rtn = pthread_mutex_lock (&(tpool->queue_lock))) != 0)
269 error ("pthread_mutex_lock failed in tpool_destroy()");
270
271 if (tpool->queue_closed || tpool->shutdown)
272 {
273 if ((rtn = pthread_mutex_unlock (&(tpool->queue_lock))) != 0)
274 error ("pthread_mutex_unlock failed in tpool_destroy()");
275 return 0;
276 }
277
278 tpool->queue_closed = 1;
279
280 if (finish == 1)
281 {
282 while (tpool->cur_queue_size != 0)
283 {
284 if ((rtn =
285 pthread_cond_wait (&(tpool->queue_empty),
286 &(tpool->queue_lock))) != 0)
287 error ("pthread_cond_wait failed in tpool_destroy()");
288 }
289 }
290 tpool->shutdown = 1;
291
292 if ((rtn = pthread_mutex_unlock (&(tpool->queue_lock))) != 0)
293 error ("pthread_mutex_unlock failed in tpool_destroy()");
294
295 if ((rtn = pthread_cond_broadcast (&(tpool->queue_not_empty))) != 0)
296 error ("pthread_cond_broadcast failed for queue_not_empty()");
297 if ((rtn = pthread_cond_broadcast (&(tpool->queue_not_full))) != 0)
298 error ("pthread_cond_broadcast failed for queue_not_full");
299
300 for (i = 0; i < tpool->num_threads; i++)
301 {
302 if ((rtn = pthread_join (tpool->threads[i], NULL)) != 0)
303 error ("pthread_join failed in tpool_destroy()");
304 }
305
306 myfree(tpool->threads);
307 while (tpool->queue_head != NULL)
308 {
309 cur_nodep = tpool->queue_head->next;
310 tpool->queue_head = tpool->queue_head->next;
311 myfree(cur_nodep);
312 }
313 myfree(tpool);
314 return 0;
315 }
316
317 #else /* ==================NOT threaded version======== */
318
319
320
321 #endif
322
323 #define HEATCHECKINTERVAL 1000
324 #define HEATSWAPLOW 0
325 #define HEATSWAPHIGH 10
adjust_temperatures(world_fmt ** universe,long hchains,long step,long steps)326 void adjust_temperatures(world_fmt ** universe, long hchains, long step, long steps)
327 {
328 long i;
329 const MYREAL bigger = 1.1;
330 const MYREAL smaller = 0.9;
331 const MYREAL corrsum = steps / HEATCHECKINTERVAL;
332 MYREAL *delta;
333 if(step == 0)
334 {
335 for(i=0; i< hchains; i++)
336 {
337 universe[i]->treeswapcount=0;
338 if(steps < HEATCHECKINTERVAL)
339 universe[i]->averageheat = 1./universe[i]->heat;
340 else
341 universe[i]->averageheat = 0.0;
342 }
343 }
344 else
345 {
346 if ( (step % HEATCHECKINTERVAL ) == 0)
347 {
348 universe[0]->averageheat= 1.0;
349 delta = (MYREAL *) mycalloc(hchains,sizeof(MYREAL));
350 // FPRINTF(stdout,"\n%f %li\n",1./universe[0]->heat,universe[0]->treeswapcount);
351 for(i=1; i< hchains; i++)
352 {
353 universe[i]->averageheat += (1./(universe[i]->heat * corrsum ));
354 //FPRINTF(stdout,"%f %li\n",1./universe[i]->heat,universe[i]->treeswapcount);
355 delta[i-1] = 1./universe[i]->heat - 1./universe[i-1]->heat;
356 if(delta[i-1] < EPSILON)
357 delta[i-1] = EPSILON;
358 }
359 for(i=1; i< hchains; i++)
360 {
361 if(universe[i-1]->treeswapcount <= HEATSWAPLOW)
362 delta[i-1] *= smaller;
363 else
364 {
365 if(delta[i-1]<1000 && universe[i-1]->treeswapcount >= HEATSWAPHIGH)
366 delta[i-1] *= bigger;
367 }
368 }
369 universe[0]->heat = 1.;
370 universe[0]->treeswapcount=0;
371 for(i=1; i< hchains; i++)
372 {
373 universe[i]->heat = 1./(1./universe[i-1]->heat + delta[i-1]);
374 universe[i]->treeswapcount=0;
375 }
376 myfree(delta);
377 }
378 }
379 }
380 ///
381 /// adjust temperatures using a lower and upper bound (temperature=1 and temperature=highest)
adjust_temperatures_bounded(world_fmt ** universe,long hchains,long step,long steps)382 void adjust_temperatures_bounded(world_fmt ** universe, long hchains, long step, long steps)
383 {
384 //const MYREAL corrsum = steps / HEATCHECKINTERVAL;
385 long i;
386 long deltasum;
387 MYREAL negheat = 0.0;
388 MYREAL *delta;
389 if(step == 0)
390 {
391 for(i=0; i< hchains; i++)
392 {
393 universe[i]->treeswapcount=0;
394 if(steps < HEATCHECKINTERVAL)
395 universe[i]->averageheat = 1./universe[i]->heat;
396 else
397 universe[i]->averageheat = 0.0;
398 }
399 }
400 else
401 {
402 if ( (step % HEATCHECKINTERVAL ) == 0)
403 {
404 universe[0]->averageheat= 1.0;
405 delta = (MYREAL *) mycalloc(hchains,sizeof(MYREAL));
406 #ifdef DEBUG
407 fprintf(stdout,"\n%i> chain 1: %f %f %li\n",myID, 1./universe[0]->heat,universe[0]->averageheat, universe[0]->treeswapcount);
408 #endif
409 deltasum = 0;
410 for(i=1; i< hchains; i++)
411 {
412 universe[i]->averageheat += HEATCHECKINTERVAL * (1./universe[i]->heat - universe[i]->averageheat) / step;
413 #ifdef DEBUG
414 fprintf(stdout,"%i> chain %li: %f %f %li (step=%li (%f))\n", myID, i+1, 1./universe[i]->heat,universe[i]->averageheat, universe[i]->treeswapcount, step, (MYREAL) HEATCHECKINTERVAL/step);
415 #endif
416 delta[i-1] = (MYREAL) universe[i]->treeswapcount;
417 if(delta[i-1] < 1.0)
418 delta[i-1] = 1.0;
419 deltasum += (long) delta[i-1];
420 }
421
422 universe[0]->heat = 1.;
423 universe[0]->treeswapcount=0;
424 for(i=1; i < hchains-1; i++)
425 {
426 negheat += delta[i-1]/deltasum;
427 universe[i]->heat = 1. - negheat;
428 universe[i]->treeswapcount=0;
429 }
430 myfree(delta);
431 }
432 }
433 }
434
435
reset_weight(world_fmt * world)436 void reset_weight(world_fmt *world)
437 {
438 long i;
439 seqmodel_fmt *seq = world->data->seq[0];
440 long endsite = seq->endsite;
441 long heated_end = (long) (endsite - endsite * 1./world->heat);
442 //MYREAL v;
443 memcpy(seq->aliasweight, seq->savealiasweight, sizeof(long)*endsite);
444 for(i=0;i<heated_end; i++)
445 {
446 //xcode v = RANDINT(0,endsite);
447 seq->aliasweight[0] = 0 ;
448 }
449 }
450