1 /*! \File migrate_mpi.c */
2 /* MPI parts for migrate
3 started November 2000, Seattle
4 Peter Beerli beerli@fsu.edu
5
6
7 Copyright 1996-2002 Peter Beerli and Joseph Felsenstein, Seattle WA
8 Copyright 2003-2007 Peter Beerli, Tallahassee FL
9
10 Permission is hereby granted, free of charge, to any person obtaining
11 a copy of this software and associated documentation files (the
12 "Software"), to deal in the Software without restriction, including
13 without limitation the rights to use, copy, modify, merge, publish,
14 distribute, sublicense, and/or sell copies of the Software, and to
15 permit persons to whom the Software is furnished to do so, subject
16 to the following conditions:
17
18 The above copyright notice and this permission notice shall be
19 included in all copies or substantial portions of the Software.
20
21 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
22 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
23 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
24 IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR
25 ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
26 CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
27 WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
28
29
30 $Id: migrate_mpi.c 2170 2013-09-19 12:08:27Z beerli $
31 */
32 #ifdef MPI
33 #include "migration.h"
34 #include "tools.h"
35 #include "sighandler.h"
36 #include "migrate_mpi.h"
37 #include "broyden.h"
38 #include "combroyden.h"
39 #include "gammalike.h"
40 #include "profile.h"
41 #include "pretty.h"
42 #include "options.h"
43 #include "tree.h"
44 #include "world.h"
45 #include "joint-chains.h"
46 #include "data.h"
47 #include "laguerre.h"
48 #include "random.h"
49 #ifdef UEP
50 #include "uep.h"
51 #endif
52 #include "bayes.h"
53 #ifndef WINDOWS
54 #include <unistd.h>
55 #endif
56 /*should go into profile.h*/
57 #define GRIDSIZE 9
58
59 #ifdef PRETTY
60 extern float page_height;
61 #endif
62
63 extern int numcpu;
64
65 extern const MPI_Datatype mpisizeof;
66
67 extern void run_replicate(long locus,
68 long replicate,
69 world_fmt **universe,
70 option_fmt *options,
71 data_fmt *data,
72 tpool_t * heating_pool,
73 int usize,
74 long *treefilepos,
75 long *Gmax);
76 extern void run_locus (world_fmt ** universe, int usize,
77 option_fmt * options, data_fmt * data,
78 tpool_t * heating_pool, long maxreplicate,
79 long locus, long *treefilepos, long *Gmax);
80
81 void mpi_run_locus(world_fmt ** universe, int usize, option_fmt * options,
82 data_fmt * data, tpool_t * heating_pool, long maxreplicate,
83 long locus, long *treefilepos, long *Gmax);
84 void mpi_runreplicates_worker (world_fmt ** universe, int usize,
85 option_fmt * options, data_fmt * data,
86 tpool_t * heating_pool,
87 long *treefilepos, long *Gmax);
88 long pack_databuffer (data_fmt * data, option_fmt * options);
89 void unpack_databuffer (data_fmt * data, option_fmt * options);
90 void pack_allele_data (char **buffer, long *bufsize, long *allocbufsize, data_fmt * data,
91 long pop, long ind);
92 void pack_sequence_data (char **buffer, long *bufsize, long *allocbufsize, data_fmt * data,
93 long pop, long ind, long locus);
94 void mpi_gradient_master (nr_fmt * nr, world_fmt * world, int *who);
95 void mpi_resultsmaster (long sendtype, world_fmt * world,
96 long maxrep,
97 void (*unpack) (char *buffer, world_fmt * world,
98 long locus, long maxrep,
99 long numpop));
100
101 void mpi_results_worker (long bufs, world_fmt * world,
102 long maxrep,
103 long (*pack) (MYREAL **buffer, world_fmt * world,
104 long locus, long maxrep, long numpop));
105 void assignloci_worker (world_fmt * world, option_fmt *options, long *Gmax);
106 void assign_worker_cleanup (void);
107 void swap_atl (long from, long to, world_fmt * world);
108 long pack_quantile (char **buffer, quantile_fmt quant, long n);
109 void unpack_quantile (char *buffer, quantile_fmt quant, long n);
110 long pack_failed_percentiles (char **buffer, boolean *failed, long n);
111 void unpack_failed_percentiles (char *buffer, boolean *failed, long n);
112
113 void handle_message(char *rawmessage,int sender, world_fmt *world);
114 void handle_mdim(float *values,long n, int sender, world_fmt * world);
115 void handle_burnin_message(char *rawmessage,int sender, world_fmt * world);
116
117 void set_filehandle(char *message, world_fmt *world,
118 void **file, long *msgstart);
119
120 void mpi_receive_replicate( int sender, int tag, long locus, long replicate, world_fmt * world);
121
122 long unpack_single_bayes_buffer(MYREAL *buffer, bayes_fmt * bayes, world_fmt * world,long locus);
123 long pack_single_bayes_buffer(MYREAL **buffer, bayes_fmt *bayes, world_fmt *world,long locus);
124 long pack_single_bayes_buffer_part(MYREAL **buffer, bayes_fmt *bayes, world_fmt *world,long locus);
125
126 void unpack_hist_bayes_buffer(MYREAL *buffer, bayes_fmt *bayes, world_fmt *world, long locus);
127 long pack_hist_bayes_buffer(MYREAL **buffer, bayeshistogram_fmt *hist, world_fmt * world, long startposition);
128
129 long unpack_BF_buffer(MYREAL *buffer, long start, long locus, world_fmt * world);
130 long unpack_ess_buffer(MYREAL *buffer, long start, world_fmt *world);
131 long pack_BF_buffer(MYREAL **buffer, long start, long locus, world_fmt * world);
132 long pack_ess_buffer(MYREAL **buffer, long start, world_fmt *world);
133
134 void unpack_sumfile_buffer (MYREAL *buffer, world_fmt * world,
135 long locus, long maxrep, long numpop);
136 void unpack_single_sumfile_buffer (MYREAL *buffer, timearchive_fmt **ta, world_fmt *world,
137 long locus, long replicate, long numpop, long *startz);
138 long
139 pack_sumfile_buffer (MYREAL **buffer, world_fmt * world,
140 long locus, long maxrep, long numpop);
141
142 long pack_single_sumfile_buffer(MYREAL **buffer, long z, long *allocbufsize, world_fmt * world,
143 long locus, long replicate, long numpop);
144
145 void mpi_send_replicate(int sender, long locus, long replicate, world_fmt * world);
146 long mpi_send_stop_mcmc_lociworker(long numcpu, long loci);
147 long mpi_send_stop_mcmc_replicateworker(long numcpu, long loci);
148 void mpi_send_stop_tag (int worker, world_fmt * world);
149 long mpi_send_stop_mcmc_worker(long numcpu, long loci, MPI_Comm *comm, MPI_Request *irequests, MPI_Status *istatus, long id);
150 void mpi_send_stop_tag (int worker, world_fmt * world);
151 void send_receive_bayes_params(world_fmt *world, long locus);
152 void handle_replication(int sender,int tag,char *tempstr, world_fmt *world);
153 boolean in_mpistack(int sender, world_fmt *world);
154
155 #ifdef MPICHECK
156 #include <sys/resource.h>
157 void set_memory_limit(rlim_t softsize,rlim_t maxsize);
158 void check_memory_limit();
159 #endif
160
161 #ifdef PARALIO
162 boolean my_write_error;
163 #endif
164
165
166 ////////////////////////////////////////////////////////////////////////////////
167 ////////////////////////////////////////////////////////////////////////////////
168 ////////////////////////////////////////////////////////////////////////////////
169 ////////////////////////////////////////////////////////////////////////////////
170
171 ///
172 /// Controls all loci in the MPI implementation, uses a load balancing scheme and
173 /// distributes the work on the waiting nodes
174 void
mpi_runloci_master(long loci,int * who,world_fmt * world,boolean options_readsum,boolean menu)175 mpi_runloci_master (long loci, int *who, world_fmt *world, boolean options_readsum, boolean menu)
176 {
177 boolean done = FALSE;
178 long alldone = 0;
179 int tag;
180 int sender = 0;
181
182 long locus;
183 long *twolongs;
184 long locusdone = -1;
185 long numsent = 0;
186 long tempstrsize = LINESIZE;
187 long nbase = loci + 1;
188 long minnodes = MIN((long) numcpu-1, (long) nbase-1);
189
190 char *tempstr;
191 char *leadstr;
192 float *temp;
193 long nn = world->numpop2 + 9 + world->bayes->mu + world->options->heated_chains + 2;
194
195 MPI_Status status;
196 MPI_Status *istatus;
197 MPI_Request *irequests;
198
199 int ll;
200 #ifdef IPROBE
201 int notwaiting=0;
202 #endif
203 irequests = (MPI_Request *) mycalloc(minnodes, sizeof(MPI_Request));
204 istatus = (MPI_Status *) mycalloc(minnodes, sizeof(MPI_Status));
205 twolongs = (long *) mycalloc(TWO, sizeof(long));
206 tempstr = (char *) mycalloc(tempstrsize, sizeof(char));
207 leadstr = (char *) mycalloc(SMALLBUFSIZE, sizeof(char));
208 temp = (float *) mycalloc(nn, sizeof(float));
209 twolongs[1]= 0;
210 // printf("%i> MASTER: minnodes = %li\n+++++++++++++++++++++\n",myID, minnodes);
211 for (locus = 0; locus < minnodes; locus++)
212 {
213 twolongs[0] = locus;
214 ll = (int) (locus + 1);
215 MYMPIISEND(twolongs, TWO, MPI_LONG, ll, ll, comm_world, &irequests[numsent]);
216 numsent++;
217 // printf("%i> MASTER: Locus %li sent to worker n%i twolong[0]=%li\n",myID, locus, (MYINT) locus + 1, twolongs[0]);
218 }
219 // waits until all minodes nodes received their message
220 MYMPIWAITALL(minnodes, irequests, istatus);
221 //printf("%i> MASTER: after initial send of loci\n",myID);
222
223 locus = 0;
224 world->mpistacknum = 0;
225 while((alldone < world->loci) || (world->mpistacknum < numcpu-1))
226 {
227 done=FALSE;
228 while(!done)
229 {
230 memset(tempstr,0,sizeof(char)*tempstrsize);
231 #ifdef IPROBE
232 if(menu)
233 {
234 MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, comm_world, ¬waiting, &status);
235 if(!notwaiting)
236 {
237 sleep(1);
238 get_time (tempstr, "%H:%M:%S");
239 fprintf(stdout,"%i> master waits for workers -- %s\n",myID, tempstr);
240 continue;
241 }
242 }
243 #endif
244 MYMPIRECV (leadstr, SMALLBUFSIZE, MPI_CHAR, (MYINT) MPI_ANY_SOURCE,
245 (MYINT) MPI_ANY_TAG, comm_world, &status);
246 sender = status.MPI_SOURCE;
247 tag = status.MPI_TAG;
248 //if(tempstr[0] != 'M')
249 //printff("%i> MASTER: received from n%i with tag %i: %s\n",myID,sender,tag,tempstr);
250 switch(leadstr[0])
251 {
252 case 'M':
253 tempstrsize = atol(leadstr+1);
254 tempstr = (char*) realloc(tempstr,sizeof(char)*(tempstrsize+1));
255 memset(tempstr,0,sizeof(char)*(tempstrsize+1));
256 MYMPIRECV (tempstr, tempstrsize, MPI_CHAR, sender, tag,
257 comm_world, &status);
258 // printf("%i> MASTER: message from n%i: %s\n",myID,sender,tempstr);
259 handle_message(tempstr,sender, world);
260 break;
261 case 'Z': // reading/writing of the raw bayes posterior data , also used to guide
262 // multiple replicates on when to start sampling
263 MYMPIRECV (temp, nn, MPI_FLOAT, sender, tag,
264 comm_world, &status);
265 handle_mdim(temp,nn,sender, world);
266 break;
267 case 'B': //burnin stopping rule
268 tempstrsize = atol(leadstr+1);
269 tempstr = (char*) realloc(tempstr,sizeof(char)*(tempstrsize+1));
270 memset(tempstr,0,sizeof(char)*(tempstrsize+1));
271 MYMPIRECV (tempstr, tempstrsize, MPI_CHAR, sender, tag,
272 comm_world, &status);
273 //printf("%i> MASTER: burn-in message from n%i: %s\n",myID,sender,tempstr);
274 handle_burnin_message(tempstr,sender-BURNTAG, world);
275 break;
276 case 'R':
277 //ignore first character and translate into locusnumber
278 locusdone = atol(leadstr+1);
279 //if negative this means locus had no data at all -> ignored
280 if(locusdone<0)
281 {
282 locusdone = -locusdone;
283 world->data->skiploci[locusdone]=TRUE;
284 //printf("MASTER received a skiplocus %li\n",locusdone);
285 }
286 done=TRUE;
287 ++alldone;
288 //printff("%i> MASTER: locus %li, %li of %li finished **************************\n",myID, locusdone,alldone,world->loci);
289 break;
290 case 'N': // need a replicator, this message is sent from a
291 // locus-node to the master for distribution among nodes that
292 // are waiting for work, the master send will delegate a
293 // replicate and tell also the replicator
294 // where it needs to send the final result.
295 #ifdef DEBUG_MPI
296 printf("%i> MASTER: received replicator request from n%i using string %s\n",myID, sender, tempstr);
297 #endif
298 // add the mpistack_request list
299 handle_replication(sender,tag,leadstr,world);
300 break;
301 case 'G':
302 // node has free time and could do replicator work
303 #ifdef DEBUG_MPI
304 printf("%i> MASTER: accepts replicator n%i at stack position %li\n",myID, sender,world->mpistacknum);
305 #endif
306 world->mpistack[world->mpistacknum] = sender;
307 world->mpistacknum += 1;
308 if(alldone>=world->loci && (world->mpistacknum >= (numcpu-1)) && world->mpistack_requestnum==0)
309 {
310 #ifdef DEBUG_MPI
311 printf("%i> MASTER: reached mpistacknum=%li\n",myID,world->mpistacknum);
312 #endif
313 done=TRUE;
314 continue;
315 }
316 while(world->mpistacknum > 0 && world->mpistack_requestnum > 0)
317 {
318 world->mpistack_requestnum -= 1;
319 sender = world->mpistack_request[world->mpistack_requestnum].sender;
320 tag = world->mpistack_request[world->mpistack_requestnum].tag;
321 handle_replication(sender,tag,
322 world->mpistack_request[world->mpistack_requestnum].tempstr,world);
323 }
324 break;
325 default: /* never go here under normal run condition */
326 #ifdef DEBUG_MPI
327 fprintf(stderr,"%i> message=@%s@\n@%i@> sender=@%i@ tag=@%i@\n",
328 myID,leadstr, myID,status.MPI_SOURCE,status.MPI_TAG);
329 done=TRUE;
330 #else
331 MPI_Finalize();
332 error("DIED because of wrong message from worker");
333 #endif
334 break;
335 }
336 }
337 who[locusdone] = sender;
338 // if not done with loci send another locus-work to sender (a node 1..numcpu)
339 if (numsent < loci)
340 {
341 twolongs[0]=numsent;
342 MYMPISEND (twolongs, TWO, MPI_LONG, (MYINT) sender, (MYINT) numsent + 1, comm_world);
343 numsent++;
344 //printff("%i> MASTER: ANOTHER locus %li sent to worker n%i %i\n",myID, numsent, (MYINT) numsent, (MYINT) numsent + 1);
345 }
346 else
347 {
348 twolongs[0] = 0;
349 //tell workers to stop waiting for new loci
350 if(!in_mpistack(sender, world))
351 {
352 MYMPISEND (twolongs, TWO, MPI_LONG, (MYINT) sender, (MYINT) 0, comm_world);
353 }
354 }
355 //printff("%03i> available workers: %li\n requests pending: %li\n total workers=%i\n loci done=%li\n",myID, world->mpistacknum,world->mpistack_requestnum, numcpu-1, locus+1);
356 locus++;
357 }
358 //printf("%03i> available workers: %li\n requests pending: %li\n total workers=%i\n loci done=%li\n alldone=%li\n",myID, world->mpistacknum,world->mpistack_requestnum, numcpu-1, locus+1,alldone);
359 // stop loci and/or replicate worker that had never the chance to work on a locus
360 // or replicate, but are still listening
361 for(locus=0;locus < world->mpistacknum;locus++)
362 {
363 #ifdef DEBUG_MPI
364 fprintf(stdout,"%i> sent kill to node %i\n",myID, world->mpistack[locus]);
365 #endif
366 mpi_send_stop_tag(world->mpistack[locus], world);
367 }
368 myfree(twolongs);
369 myfree(istatus);
370 myfree(irequests);
371 myfree(tempstr);
372 myfree(leadstr);
373 myfree(temp);
374 //printf("%i> before barrier\n",myID);
375 //MYMPIBARRIER(comm_world);
376 //printf("%i> leaving mpi_runloci_function()\n", myID);
377 }
378
379
380 ///
381 /// worker nodes execute this function and serve the master
mpi_runloci_worker(world_fmt ** universe,int usize,option_fmt * options,data_fmt * data,tpool_t * heating_pool,long maxreplicate,long * treefilepos,long * Gmax)382 void mpi_runloci_worker (world_fmt ** universe, int usize,
383 option_fmt * options, data_fmt * data,
384 tpool_t * heating_pool, long maxreplicate,
385 long *treefilepos, long *Gmax)
386 {
387 boolean done = FALSE;
388
389 char *rawmessage;
390
391 long locus;
392 long *twolongs;
393 long rawmsgsize = 0;
394 #ifdef MPIREPLICANT
395 long nbase = data->loci+1;
396 #endif
397
398 MPI_Status status;
399
400 twolongs = (long *) mycalloc(TWO, sizeof(long));
401 rawmessage = (char *) mycalloc(SMALLBUFSIZE,sizeof(char));
402 #ifdef MPIREPLICANT
403 if(myID < nbase)
404 {
405 #endif
406 while (!done)
407 {
408 MYMPIRECV (twolongs, TWO, MPI_LONG, MASTER, MPI_ANY_TAG,
409 comm_world, &status);
410 locus = twolongs[0];
411 if (status.MPI_TAG != 0) //stop condition
412 {
413 //printf("%i> WORKER: received locus %li (twolongs[0]=%li) from MASTER\n",myID, locus, twolongs[0]);
414 #ifdef MPIREPLICANT
415 mpi_run_locus(universe, usize, options, data,
416 heating_pool, maxreplicate, locus, treefilepos, Gmax);
417 #else
418 run_locus (universe, usize, options, data,
419 heating_pool, maxreplicate, locus, treefilepos, Gmax);
420 #endif
421 if(universe[0]->data->skiploci[locus]==FALSE)
422 rawmsgsize = 1 + sprintf(rawmessage,"R%li",locus);
423 else
424 rawmsgsize = 1 + sprintf(rawmessage,"R-%li",locus);
425 MYMPISEND (rawmessage, SMALLBUFSIZE, MPI_CHAR, (MYINT) MASTER,
426 (MYINT) (locus + ONE), comm_world);
427 /* we want to know what locus we worked for
428 - to control the work sent by master
429 - to use in setup_parameter0() [combroyden2.c] */
430 universe[0]->who[locidone++] = locus;
431 }
432 else
433 {
434 done = TRUE;
435 //printf("%i> LOCUSWORKER finished\n",myID);
436 mpi_runreplicates_worker (universe, usize, options, data, heating_pool, treefilepos, Gmax);
437 // printf("%i> LOCUSWORKER and then REPLICANT: finished\n",myID);
438 }
439 }
440 #ifdef MPIREPLICANT
441 }
442 else
443 {
444 mpi_runreplicates_worker (universe, usize, options, data, heating_pool, treefilepos, Gmax);
445 }
446 #endif
447 myfree(twolongs);
448 myfree(rawmessage);
449 //printf("%i> WORKER done\n",myID);
450 // printf("%i> before barrier\n",myID);
451 //MYMPIBARRIER(comm_world);
452 }
453
454 #ifdef MPIREPLICANT
455 ///
456 /// attempt to improve MPIruns with replicates
457 /// each locus is responsible for replication farming and and reporting back to master
458 /// master <- locus-master <- locus-replicate-worker
459 /// generate genealogies for a single locus
460 /// \callgraph
461 void
mpi_run_locus(world_fmt ** universe,int usize,option_fmt * options,data_fmt * data,tpool_t * heating_pool,long maxreplicate,long locus,long * treefilepos,long * Gmax)462 mpi_run_locus(world_fmt ** universe, int usize, option_fmt * options,
463 data_fmt * data, tpool_t * heating_pool, long maxreplicate,
464 long locus, long *treefilepos, long *Gmax)
465 {
466 //world_fmt *world=universe[0];
467 boolean done=FALSE;
468
469 char *tempstr;
470 // const long tempstrsize = SMALLBUFSIZE;
471 int tag;
472 int *who;
473 int minnodes; // number of replicate-worker nodes [changed meaning!]
474 int sender = 0;
475 int nbase = data->loci; //number of loci-worker nodes
476 int senderlocus = -1;
477 long replicate;
478 long i;
479 long *temp;
480 long numsent = 1;//
481 long senderreplicate = 0;
482
483 // MPI_Request irequest;
484 MPI_Request *irequests = NULL;
485
486 MPI_Status status;
487 MPI_Status *istatus = NULL;
488
489 temp = (long *) mycalloc(TWO, sizeof(long));
490 tempstr = (char *) mycalloc(SMALLBUFSIZE,sizeof(char));
491 who = (int *) mycalloc(maxreplicate,sizeof(int));
492
493
494 if(maxreplicate>1)
495 {
496 // number of nodes available for replicate workers,
497 // numcpu - nbase - 1 [for masternode] and maxreplicate-1
498 // [locus-worker is doing one replicate itself] are limiting
499 //minnodes = maxreplicate; //MAX(0,MIN(maxreplicate-1,numcpu-nbase-1))+1;
500 minnodes = MAX(0,MIN(numcpu-nbase-1,maxreplicate-1))+1;
501 irequests = (MPI_Request *) mycalloc(minnodes+1,sizeof(MPI_Request));
502 istatus = (MPI_Status *) mycalloc(minnodes+1,sizeof(MPI_Status));
503 temp[0] = locus;
504 // replicate 1 to maxreplicate should be worked on by other nodes
505 // minnodes is the number of nodes free for work on replicates alone
506 // so with 3 replicates to work on and 3 total nodes and a single locus we need
507 // 1 master, 1 locus-worker and have 1 replicate-worker, the locus-worker and
508 // the replicate worker will both work on replicates, the locus-worker sends
509 // off 1 request (for the replicate worker), because we start the locus worker
510 // with replicate 0, the loop for the replicate workers starts at one
511 // and to address this shift the MIN(..,minnodes+1) addresses that
512 for (replicate = 1; replicate < minnodes; replicate++)//Cesky Krumlov 2013 replicate=1 replaced
513 {
514 temp[1] = replicate;
515 sprintf(tempstr,"N%i %li %li", myID, locus, replicate);
516 //fprintf(stdout,"%i> request a node from n%i for locus %li and replicate %li\n",myID, MASTER, locus, replicate);
517 MYMPIISEND (tempstr, SMALLBUFSIZE, MPI_CHAR,
518 (MYINT) MASTER, (MYINT) (locus + 1 + REPTAG), comm_world, &irequests[numsent]);
519 numsent++; // counter of how many replicates are sent off-node
520 }
521 // this should be asynchronous so that the myRepID-master can do some work, too.
522 //Cesky Krumlov 2013
523 run_replicate(locus, 0, universe, options, data,
524 heating_pool, usize,
525 treefilepos, Gmax);
526 //Cesky Krumlov 2013
527 //Cesky Krumlov 2013
528 //Cesky Krumlov 2013
529 who[0] = myID;
530 MYMPIWAITALL(numsent,irequests, istatus); // wait for all replicators to finish
531 // set replicate counter to 1 because locus-worker itself finished first replicate
532 replicate=1;//Cesky Krumlov 2013 was 1
533 //Cesky Krumlov 2013numsent++;
534
535 done = FALSE;
536 while(!done)
537 {
538 // done=TRUE means that
539 // no replicator worker is available yet
540 // the loci-worker has to do all the work if numsent==1
541 // Cesky Krumlov 2013 printf("%i> accepted/trials: numsent=%li rep %li [%li,%li,%li,%li]/[%li,%li,%li,%li]\n",
542 // Cesky Krumlov 2013 myID, numsent, replicate, world->accept_archive[0],world->accept_archive[1],world->accept_archive[2],world->accept_archive[3],world->accept_archive[4],world->trials_archive[0],world->trials_archive[1],world->trials_archive[2],world->trials_archive[3],world->trials_archive[4]);
543 if(numsent==1)
544 {
545 run_replicate(locus, replicate, universe, options, data,
546 heating_pool, usize,
547 treefilepos, Gmax);
548 who[replicate] = myID;
549 replicate++;
550 if(replicate >= maxreplicate)
551 done=TRUE;
552 }
553 else
554 {
555 memset(irequests,0,sizeof(int)*minnodes);
556 memset(istatus,0,sizeof(int)*minnodes);
557 MYMPIRECV (tempstr, SMALLBUFSIZE, MPI_CHAR, MPI_ANY_SOURCE,
558 (MYINT)(locus+1+ REPTAG), comm_world, &status);
559 sender = status.MPI_SOURCE; // repID of the replicator that did the work
560 tag = status.MPI_TAG; // tag is working locus + replicator tag
561 senderlocus = tag-REPTAG; // locus that was worked on by sender
562 // test so that we can be sure that we got things from a valid replicator worker
563 if(sender == myID)
564 {
565 fprintf(stdout,"%i, %i> DIE\nDIE\nDIE\nDIE\nDIE\nDIE\nDIE\nDIE\nDIE\n",myID, myRepID);
566 error("tried to send a replicate to myself using MPI -- this is not allowed\n");
567 }
568 // test whether the locus is the same between locus-worker and replicator
569 if(senderlocus-1 != locus)
570 warning("%i> !!!!!!!!!!!!! got wrong locus from worker myRepID=%i (my locus %i != its locus %i )\n",
571 myID, sender, locus, senderlocus-1);
572 // receive only messages that are prefixed with 'R' and exit on all others
573 if(tempstr[0]=='R')
574 {
575 //ignore first character and translate into repnumber
576 senderreplicate = atol(tempstr+1);
577 }
578 else
579 {
580 fprintf(stderr,"%i> message=%s\n%i> sender=%i tag=%i\n",myID,tempstr, myID,
581 status.MPI_SOURCE,status.MPI_TAG);
582 error("DIED because of wrong message from worker");
583 }
584 // record sender , this record should be filled at the end of this function
585 who[senderreplicate] = sender;
586 //fprintf(stdout,"%i> senderreplicate=%li\n%i> sender=%i tag=%i\n",myID,senderreplicate, myID,
587 // status.MPI_SOURCE,status.MPI_TAG);
588 mpi_receive_replicate(sender, tag, locus, senderreplicate, universe[0]);
589 replicate++;
590 if(replicate >= maxreplicate)
591 {
592 done=TRUE;
593 }
594 temp[0] = locus;
595 if (numsent < maxreplicate) //at least one set was worked by the locus-worker
596 {
597 temp[1] = numsent;
598 sprintf(tempstr,"N%i %li %li", myID, locus, numsent);
599 //fprintf(stdout,"%i> request a node from n%i for locus %li and replicate %li\n",myID, MASTER, locus, numsent);
600 MYMPIISEND (tempstr, SMALLBUFSIZE, MPI_CHAR,
601 (MYINT) MASTER, (MYINT) (locus + 1 + REPTAG), comm_world, &irequest);
602 numsent++;
603 MYMPIWAITALL(ONE, &irequest, &status);
604 }
605 }
606 }
607 }
608 else
609 { /* no replicates */
610 run_replicate(locus, 0, universe, options, data,
611 heating_pool, usize,
612 treefilepos, Gmax);
613 }
614 myfree(temp);
615 myfree(tempstr);
616 myfree(who);
617 if(maxreplicate>1 && (numcpu-1) > universe[0]->loci)
618 {
619 const long hc = universe[0]->options->heated_chains;
620 long t;
621 if(universe[0]->options->heating)
622 {
623 printf("CORRECTING\n");
624 for(t=0; t < hc; t++)
625 {
626 universe[0]->bf[locus * hc + t] /= maxreplicate;
627 }
628 }
629 myfree(irequests);
630 myfree(istatus);
631 }
632 #ifdef UEP
633 if (options->uep)
634 show_uep_store(universe[0]);
635 #endif
636 if (options->bayes_infer)
637 {
638 if (options->replicate && options->replicatenum > 0)
639 {
640 (universe[0])->repkind = MULTIPLERUN;
641 }
642 // @@@@@@@@@@CHECK@@@@@@@@@@@@@
643 if(!universe[0]->options->has_bayesmdimfile)
644 calculate_credibility_interval(universe[0], locus);
645 }
646 else
647 {
648 if (options->replicate && options->replicatenum > 0)
649 {
650 (universe[0])->repkind = MULTIPLERUN;
651 #ifdef LONGSUM
652 change_longsum_times(EARTH);
653 #endif /*LONGSUM*/
654 // over multiple replicates if present or single locus
655 //printf("ML over multiple replicates or single locus\n");
656 (void) estimateParameter(options->replicatenum, *Gmax, universe[0], options,
657 (universe[0])->cov[locus], options->lchains, /*type*/ 'l',
658 SINGLELOCUS, (universe[0])->repkind);
659 }
660 }
661
662 // cleanup
663 if (options->heating)
664 {
665 for (i = 0; i < options->heated_chains; i++)
666 {
667 //free_tree(universe[i]->root, universe[i]);
668 }
669 }
670 else
671 {
672 // free_tree(universe[0]->root, universe[0]);
673 }
674 bayes_reset(universe[0]);
675 }
676
677
678 ///
679 /// run replicates on replicate-worker nodes
680 /// this function is called in main() and will work on any preset locus and any preset replicate
681 /// the calling function is responsible for the correct assignment to locus and replicate.
682 void
mpi_runreplicates_worker(world_fmt ** universe,int usize,option_fmt * options,data_fmt * data,tpool_t * heating_pool,long * treefilepos,long * Gmax)683 mpi_runreplicates_worker (world_fmt ** universe, int usize,
684 option_fmt * options, data_fmt * data,
685 tpool_t * heating_pool,
686 long *treefilepos, long *Gmax)
687 {
688 boolean done = FALSE;
689 char *rawmessage;
690 int sender;
691 long nng;
692 long *temp;
693 long replicate;
694 long locus;
695 long rawmsgsize = 0;
696 char *ready;
697 MPI_Status status;
698 ready = mycalloc(SMALLBUFSIZE,sizeof(char));
699 temp = (long *) mycalloc(3, sizeof(long));
700 rawmessage = (char *) mycalloc(STRSIZE,sizeof(char));
701 sprintf(ready,"G%i",myID);
702 while (!done)
703 {
704 // printf("%i> REPLICANT: send ready message \"%s\" to n%i\n", myID, ready, MASTER);
705 MYMPISEND (ready, SMALLBUFSIZE, MPI_CHAR, (MYINT) MASTER, myID, comm_world);
706 MYMPIRECV (temp, 3, MPI_LONG, (MYINT) MASTER, MPI_ANY_TAG, comm_world, &status);
707 sender = (int) temp[0];
708 locus = temp[1];
709 replicate = temp[2];
710 //printf("%i> REPLICANT: received locus %li and replicate %li from n%i via n%i \n", myID, locus+1, replicate+1, sender, status.MPI_SOURCE);
711 if (status.MPI_TAG != 0) //stop condition
712 {
713 nng=universe[0]->numpop2 + universe[0]->loci * universe[0]->bayes->mu + 1;
714 memset(universe[0]->accept_archive,0,sizeof(long)*2*nng);//Cesky Krumlov 2013 resets also trials_archive
715
716 run_replicate(locus, replicate, universe, options, data, heating_pool, usize,treefilepos, Gmax);
717 //printf("%i> after runreplicate in\n",myID);
718 rawmsgsize = 1 + sprintf(rawmessage,"R%li ",replicate);
719 MYMPISEND (rawmessage, rawmsgsize, MPI_CHAR, (MYINT) sender,
720 (MYINT) (locus+1+ REPTAG), comm_world);
721 mpi_send_replicate(sender, locus, replicate, universe[0]);
722 //printf("%i> after mpi_send_replicate()\n",myID);
723 if(universe[0]->options->bayes_infer)
724 {
725 universe[0]->bayes->numparams = 0;
726 }
727 }
728 else
729 {
730 done = TRUE;
731 //printf("%i> REPLICANT received KILL signal.\n",myID);
732 }
733 }
734 myfree(ready);
735 myfree(temp);
736 myfree(rawmessage);
737 // printf("%i> REPLICANT: all replication work is done\n",myID);
738 }
739
740 //---------end replication in MPI
741 void
assign_worker_cleanup(void)742 assign_worker_cleanup (void)
743 {
744 boolean done = FALSE;
745
746 char *rawmessage;
747
748 int sender;
749
750 long *temp;
751 char *ready;
752 MPI_Status status;
753 ready = mycalloc(SMALLBUFSIZE,sizeof(char));
754 temp = (long *) mycalloc(3, sizeof(long));
755 rawmessage = (char *) mycalloc(STRSIZE,sizeof(char));
756 sprintf(ready,"G%i",myID);
757 while (!done)
758 {
759 //printf("%i> REPLICANT: send ready message \"%s\" to n%i\n", myID, ready, MASTER);
760 MYMPISEND (ready, SMALLBUFSIZE, MPI_CHAR, (MYINT) MASTER, myID, comm_world);
761 MYMPIRECV (temp, 3, MPI_LONG, (MYINT) MASTER, MPI_ANY_TAG, comm_world, &status);
762 sender = (int) temp[0];
763 if (status.MPI_TAG != 0) //stop condition
764 {
765 printf("%i> received real message but do not know what to do with this\n",myID);
766 }
767 else
768 {
769 done = TRUE;
770 }
771 }
772 myfree(ready);
773 myfree(temp);
774 myfree(rawmessage);
775 }
776
777 //---------end replication in MPI
778 #endif /*MPIREPLICANT*/
779
780
781 ///
782 /// orchestrates the likelihood calculation
783 MYREAL
mpi_likelihood_master(MYREAL * param,MYREAL * lparam,world_fmt * world,nr_fmt * nr,helper_fmt * helper,int * who)784 mpi_likelihood_master (MYREAL *param, MYREAL *lparam,
785 world_fmt * world, nr_fmt * nr,
786 helper_fmt * helper, int *who)
787 {
788 int tag;
789
790 long locus;
791 long worker;
792 long sender;
793 long addon = 1;
794 long numelem = world->numpop2 + (world->options->gamma ? 1 : 0);
795 long numelem2 = numelem * 2;
796
797 MYREAL logres = 0.0;
798 MYREAL *temp;
799 MYREAL *tmp;
800
801 MPI_Status status;
802
803 MYREAL * locusweight = world->data->locusweight; //invariant loci treatment
804
805 doublevec1d(&tmp,world->loci);
806 doublevec1d(&temp,numelem2+2);
807
808 temp[0] = MIGMPI_LIKE;
809 memcpy (temp + 1, param, numelem * sizeof (MYREAL));
810 memcpy (temp + 1 + numelem, lparam, numelem * sizeof (MYREAL));
811
812 memset (nr->locilikes, 0, sizeof (MYREAL) * world->loci);
813
814 // if(world->loci==1)
815 // addon=1;
816 //else
817 //addon=0;
818
819 for (worker = 1; worker < MIN (world->loci + addon, numcpu); worker++)
820 {
821 MYMPISEND (temp, (MYINT) numelem2+2, mpisizeof, (MYINT) worker, (MYINT) worker, comm_world);
822 }
823 for (worker = 1; worker < MIN (world->loci + addon, numcpu); worker++)
824 {
825 MYMPIRECV (tmp, (MYINT) world->loci, mpisizeof, MPI_ANY_SOURCE,
826 MPI_ANY_TAG, comm_world, &status);
827 sender = status.MPI_SOURCE;
828 tag = status.MPI_TAG;
829 // the worker send a vector of values
830 // e.g. (0,0,0,-12,0,-34,0,0,0), these are all loci
831 // of which most of them were not evaluated
832 // the loop updates the master copy of locilikes
833 for (locus = 0; locus < world->loci; locus++)
834 {
835 nr->locilikes[locus] += tmp[locus];
836 }
837 }
838 for (locus = 0; locus < world->loci; locus++)
839 {
840 logres += locusweight[locus] * nr->locilikes[locus];//invariant loci treatment
841 }
842 nr->llike = logres;
843 myfree(temp);
844 myfree(tmp);
845 return logres;
846 }
847
848
849 ///
850 /// workers calculate likelihood and wait for master to receive work orders
851 void
mpi_likelihood_worker(world_fmt * world,helper_fmt * helper,long rep)852 mpi_likelihood_worker (world_fmt * world, helper_fmt * helper, long rep)
853 {
854 long locus;
855 long ww;
856
857 nr_fmt *nr = helper->nr;
858
859 MYREAL *param = helper->expxv;
860 MYREAL *lparam = helper->xv;
861 MYREAL *mu_rates = world->options->mu_rates;
862
863 memset (nr->locilikes, 0, sizeof (MYREAL) * world->loci);
864
865 if (world->options->gamma)
866 {
867 if (lparam[nr->numpop2] > LNMAXPARAM)
868 {
869 lparam[nr->numpop2] = LNMAXPARAM;
870 }
871 initgammacat (nr->categs, EXP (lparam[nr->numpop2]),1./* EXP (lparam[0])*/,
872 nr->rate, nr->probcat);
873 }
874
875 for (ww = 0; ww < locidone; ww++)
876 {
877 locus = nr->world->who[ww];
878 if (!world->options->gamma)
879 {
880 nr->locilikes[locus] =
881 calc_locus_like (nr, param, lparam, locus) + mu_rates[locus];
882 }
883 else
884 {
885 helper->locus = locus;
886 nr->locilikes[locus] = gamma_locus_like (nr,param,lparam,helper->weight,locus);
887 }
888 }
889 }
890
891 ///
892 /// setup start parameters
893 void
mpi_startparam_master(world_fmt * world)894 mpi_startparam_master(world_fmt * world)
895 {
896 int tag;
897 int numreceived = 0;
898
899 long i;
900 long sender;
901 long workerloci = 0;
902
903 MYREAL *tmp;
904
905 MPI_Status status;
906
907 tmp = (MYREAL*) mycalloc(world->numpop2+1,sizeof(MYREAL));
908
909 while (numreceived < world->loci)
910 {
911 MYMPIRECV (tmp, world->numpop2+1, mpisizeof, MPI_ANY_SOURCE,
912 MPI_ANY_TAG, comm_world, &status);
913 sender = status.MPI_SOURCE;
914 tag = status.MPI_TAG;
915 workerloci=tmp[0];
916 for(i=0; i<world->numpop2; i++)
917 {
918 world->param0[i] += tmp[i+1];
919 }
920 numreceived+=workerloci;
921 }
922 for(i=0; i<world->numpop2; i++)
923 world->param0[i] /= world->loci;
924
925 myfree(tmp);
926 }
927
928 ///
929 /// set start parameters for workers
930 void
mpi_startparam_worker(world_fmt * world)931 mpi_startparam_worker (world_fmt * world)
932 {
933 long ww;
934 long repstart;
935 long repstop;
936 long r;
937 long i;
938 long locus;
939
940 MYREAL *tmp;
941
942 if(locidone>0)
943 {
944 tmp = (MYREAL*) mycalloc(world->numpop2+1,sizeof(MYREAL));
945 set_replicates (world, world->repkind, world->options->replicatenum,
946 &repstart, &repstop);
947 tmp[0]=(MYREAL)locidone;
948 for (ww = 0; ww < locidone; ww++)
949 {
950 locus = world->who[ww];
951 for (r = repstart; r < repstop; r++)
952 {
953 for(i=0; i < world->numpop2; i++)
954 tmp[i+1] += world->atl[r][locus].param[i];
955 }
956 }
957 for(i=1; i < world->numpop2+1; i++)
958 {
959 tmp[i] /= locidone * (repstop-repstart);
960 }
961 MYMPISEND (tmp, world->numpop2+1, mpisizeof, MASTER, myID, comm_world);
962 myfree(tmp);
963 }
964 }
965
966
967 ///
968 /// orchestrates max(gmax) over all nodes
969 void
mpi_gmax_master(world_fmt * world,long * Gmax)970 mpi_gmax_master (world_fmt * world, long *Gmax)
971 {
972 int tag;
973 int sender;
974 int numreceived = 0;
975
976 long tmp;
977
978 MPI_Status status;
979
980 *Gmax = 0.;
981
982 MYMPIBCAST (Gmax, ONE, MPI_LONG, MASTER, comm_world);
983
984 while (numreceived < MIN(world->loci, numcpu - 1))
985 {
986 MYMPIRECV (&tmp, ONE, MPI_LONG, MPI_ANY_SOURCE,
987 MPI_ANY_TAG, comm_world, &status);
988 sender = status.MPI_SOURCE;
989 tag = status.MPI_TAG;
990 if (*Gmax < tmp)
991 {
992 *Gmax = tmp;
993 }
994 numreceived++;
995 }
996 // do we need this barrier really?
997 MYMPIBARRIER(comm_world);
998 }
999
1000 ///
1001 /// returns the gmax values to the master to evaluate max(gmax)
1002 void
mpi_gmax_worker(world_fmt * world)1003 mpi_gmax_worker (world_fmt * world)
1004 {
1005 long ww;
1006 long repstart;
1007 long repstop;
1008 long r;
1009 long locus;
1010 long Gmax = 1;
1011
1012 MYMPIBCAST (&Gmax, ONE, MPI_LONG, MASTER, comm_world);
1013 #ifdef DEBUG_MPI
1014 printf("%i> locidone=%i\n",myID,locidone);
1015 #endif
1016 if(locidone>0)
1017 {
1018 set_replicates (world, world->repkind, world->options->replicatenum,
1019 &repstart, &repstop);
1020
1021 for (ww = 0; ww < locidone; ww++)
1022 {
1023 locus = world->who[ww];
1024 for (r = repstart; r < repstop; r++)
1025 {
1026 if (Gmax < world->atl[r][locus].T)
1027 Gmax = world->atl[r][locus].T;
1028 }
1029 }
1030 MYMPISEND (&Gmax, ONE, MPI_LONG, MASTER, myID, comm_world);
1031 }
1032 // do we need this barrier really?
1033 MYMPIBARRIER(comm_world);
1034 }
1035
1036 ///
1037 /// first worker (myID=1, myRepId=0) will send stop-message to replication nodes
1038 /// the comm_worker group's master is the first worker who has ID=0 in this group
1039 /// as results we send messages to id=1..x in the comm_worker group, do not mix this
1040 /// with the id in comm_world that include the master (id=0 there).
mpi_send_stop_mcmc_worker(long numcpu,long loci,MPI_Comm * comm,MPI_Request * irequests,MPI_Status * istatus,long id)1041 long mpi_send_stop_mcmc_worker(long numcpu, long loci, MPI_Comm *comm,
1042 MPI_Request *irequests, MPI_Status *istatus, long id)
1043 {
1044 long twolongs[2];
1045 long *temp;
1046 long receiver;
1047 long sent = 0;
1048 long xx = (id==0) ? 0 : 1;
1049
1050 temp = (long *) mycalloc(TWO, sizeof(long));
1051 twolongs[0]=0;
1052 twolongs[1]=0;
1053
1054 for(receiver=loci+1-xx; receiver< numcpu-xx; receiver++)
1055 {
1056 MYMPIISEND (temp, TWO, MPI_LONG, (MYINT) receiver, 0, *comm, &irequests[sent]);
1057 sent++;
1058 }
1059 if(sent>0)
1060 {
1061 MYMPIWAITALL(sent,irequests, istatus); // wait for all replicators to finish
1062 }
1063 myfree(temp);
1064 return sent;
1065 }
1066
1067 ///
1068 /// first worker (myID=1, myRepId=0) will send stop-message to replication nodes
1069 /// the comm_worker group's master is the first worker who has ID=0 in this group
1070 /// as results we send messages to id=1..x in the comm_worker group, do not mix this
1071 /// with the id in comm_world that include the master (id=0 there).
mpi_send_stop_mcmc_lociworker(long numcpu,long loci)1072 long mpi_send_stop_mcmc_lociworker(long numcpu, long loci)
1073 {
1074
1075 long receiver;
1076 long *temp;
1077 long xx = (myID==0) ? 0 : 1;
1078 long sent = 0;
1079 long minnodes = MIN(numcpu,loci -1);
1080
1081
1082 MPI_Request *irequests;
1083 MPI_Status *istatus;
1084 irequests = (MPI_Request *) mycalloc(minnodes+1,sizeof(MPI_Request));
1085 istatus = (MPI_Status *) mycalloc(minnodes+1,sizeof(MPI_Status));
1086
1087 temp = (long *) mycalloc(TWO, sizeof(long));
1088
1089 for(receiver=loci+1-xx; receiver< numcpu-xx; receiver++)
1090 {
1091 error("disable because testing why openmpi breaks");
1092 // MYMPIISEND (temp, TWO, MPI_LONG, (MYINT) receiver, 0, comm_workers, &irequests[sent]);
1093 sent++;
1094 }
1095 if(sent>0)
1096 MYMPIWAITALL(sent,irequests, istatus); // wait for all replicators to finish
1097 myfree(temp);
1098 myfree(irequests);
1099 myfree(istatus);
1100 return sent;
1101 }
1102
1103 ///
1104 /// stops all replicate workers
mpi_send_stop_mcmc_replicateworker(long numcpu,long loci)1105 long mpi_send_stop_mcmc_replicateworker(long numcpu, long loci)
1106 {
1107
1108 long *temp;
1109 long receiver;
1110 long sent = 0;
1111 long xx = (myID==0) ? 0 : 1;
1112 long minnodes = labs(numcpu - loci -1);
1113
1114 MPI_Request *irequests;
1115 MPI_Status *istatus;
1116
1117 irequests = (MPI_Request *) mycalloc(minnodes+1,sizeof(MPI_Request));
1118 istatus = (MPI_Status *) mycalloc(minnodes+1,sizeof(MPI_Status));
1119 temp = (long *) mycalloc(TWO, sizeof(long));
1120
1121 for(receiver=loci+1-xx; receiver< numcpu-xx; receiver++)
1122 {
1123 error("disable because testing why openmpi breaks");
1124 // MYMPIISEND (temp, 2, MPI_LONG, (MYINT) receiver, 0, comm_workers, &irequests[sent]);
1125 sent++;
1126 }
1127 if(sent>0)
1128 {
1129 MYMPIWAITALL(sent,irequests, istatus); // wait for all replicators to finish
1130 }
1131 myfree(temp);
1132 myfree(irequests);
1133 myfree(istatus);
1134 return sent;
1135 }
1136
1137 ///
1138 /// sends a stop signal to all loci-worker
1139 void
mpi_send_stop(world_fmt * world)1140 mpi_send_stop (world_fmt * world)
1141 {
1142 long worker;
1143 long numelem = world->numpop2 + (world->options->gamma ? 1 : 0);
1144 long numelem2 = 2 * numelem;
1145
1146 MYREAL *temp;
1147
1148 temp = (MYREAL *) mycalloc (numelem2+2, sizeof (MYREAL));
1149 temp[0] = MIGMPI_END;
1150
1151 for (worker = 1; worker < numcpu; worker++)
1152 {
1153 MYMPISEND (temp, (MYINT) numelem2+2, mpisizeof, (MYINT) worker, (MYINT) 0, comm_world); //end of loci
1154 }
1155 myfree(temp);
1156 }
1157
1158 ///
1159 /// sends a stop signal to a specific worker used in for assignloci_worker
mpi_send_stop_tag(int worker,world_fmt * world)1160 void mpi_send_stop_tag (int worker, world_fmt * world)
1161 {
1162 long *temp;
1163
1164 temp = (long *) mycalloc (TWO, sizeof (MYREAL));
1165
1166 temp[0] = 0;
1167 temp[1] = 0;
1168 MYMPISEND (temp, (MYINT) TWO, MPI_LONG, (MYINT) worker, (MYINT) 0, comm_world);
1169
1170 myfree(temp);
1171
1172 }
1173
1174 ///
1175 /// sends a continue with further analysis to the workers
1176 void
mpi_results_stop(void)1177 mpi_results_stop (void)
1178 {
1179 long worker;
1180 long dummy = 0;
1181
1182 for (worker = 1; worker < numcpu; worker++)
1183 {
1184 MYMPISEND (&dummy, ONE, MPI_LONG, (MYINT) worker, (MYINT) 0, comm_world);
1185 }
1186 }
1187
1188 ///
1189 /// sends out requests and receives all the locus-gradients from the workers
1190 void
mpi_gradient_master(nr_fmt * nr,world_fmt * world,int * who)1191 mpi_gradient_master (nr_fmt * nr, world_fmt * world, int *who)
1192 {
1193 int tag;
1194
1195 long locus;
1196 long sender;
1197 long *tempindex;
1198 long addon = 1;//(world->loci == 1)? 0 : 1;
1199 long numelem = nr->partsize;
1200 long numelem2 = 2 * numelem;
1201
1202 MYREAL *temp;
1203
1204 MPI_Status status;
1205
1206 temp = (MYREAL *) mycalloc (numelem2+2, sizeof (MYREAL));
1207 tempindex = (long *) mycalloc (numelem, sizeof (long));
1208 temp[0] = MIGMPI_GRADIENT;
1209
1210 memcpy (temp + 1, nr->param, numelem * sizeof (MYREAL));
1211 memcpy (tempindex, nr->indeks, nr->partsize * sizeof (long));
1212 memcpy (temp + 1 + numelem, nr->lparam, numelem * sizeof (MYREAL));
1213 temp[numelem2+1] = nr->profilenum;
1214
1215 for (locus = 1; locus < MIN (world->loci + addon, numcpu); locus++)
1216 {
1217 MYMPISEND (temp, (MYINT) numelem2+2, mpisizeof, (MYINT) locus, (MYINT) locus, comm_world);
1218 MYMPISEND (tempindex, (MYINT) numelem, MPI_LONG, (MYINT) locus, (MYINT) locus, comm_world);
1219 }
1220
1221 memset (nr->d, 0, sizeof (MYREAL) * numelem);
1222 for (locus = 1; locus < MIN (world->loci + addon, numcpu); locus++)
1223 {
1224 copy_and_clear_d (nr);
1225 MYMPIRECV (nr->d, (MYINT) numelem, mpisizeof, MPI_ANY_SOURCE,
1226 MPI_ANY_TAG, comm_world, &status);
1227 add_back_d (nr);
1228 sender = status.MPI_SOURCE;
1229 tag = status.MPI_TAG;
1230 }
1231 myfree(temp);
1232 myfree(tempindex);
1233 }
1234
1235 ///
1236 /// worker returns a gradient to master
1237 void
mpi_gradient_worker(helper_fmt * helper,nr_fmt * nr,timearchive_fmt ** tyme)1238 mpi_gradient_worker (helper_fmt * helper, nr_fmt * nr,
1239 timearchive_fmt ** tyme)
1240 {
1241 long ww, locus;
1242
1243 memset (nr->d, 0, sizeof (MYREAL) * nr->partsize);
1244 for (ww = 0; ww < locidone; ww++)
1245 {
1246 locus = nr->world->who[ww];
1247 if(!nr->world->options->gamma)
1248 {
1249 copy_and_clear_d (nr);
1250 simple_loci_derivatives (nr->d, nr, tyme, locus);
1251 add_back_d (nr);
1252 }
1253 else
1254 {
1255 gamma_locus_derivative (helper, locus);
1256 }
1257 }
1258 }
1259
1260 ///
1261 /// provides calculations for the master. calculates likelihoods, gradients, but also
1262 /// delivers results, migration histories, bayesian results and more.
1263 /// workers stay a long time in this function and anwer requests from the master
1264 void
mpi_maximize_worker(world_fmt * world,option_fmt * options,long kind,long rep)1265 mpi_maximize_worker (world_fmt * world, option_fmt *options, long kind, long rep)
1266 {
1267 boolean done = FALSE;
1268 long locus;
1269 long repstart;
1270 long repstop;
1271 long Gmax;
1272 long numelem = world->numpop2 + (world->options->gamma ? 1 : 0) ;
1273 long numelem2 = numelem * 2;
1274 MYREAL *temp;
1275 MPI_Status status;
1276 nr_fmt *nr;
1277 helper_fmt helper;
1278 //char tempfilename[STRSIZE];
1279 temp = (MYREAL *) mycalloc (numelem2 + 2, sizeof (MYREAL));
1280 helper.xv = (MYREAL *) mycalloc (numelem2, sizeof (MYREAL));
1281 helper.expxv = (MYREAL *) mycalloc (numelem2, sizeof (MYREAL));
1282 helper.analystype = SINGLELOCUS; //this may be problematic byt seems only
1283 // to be used here for decision on gamma or not
1284 nr = (nr_fmt *) mycalloc (1, sizeof (nr_fmt));
1285 set_replicates (world, world->repkind, rep, &repstart, &repstop);
1286 which_calc_like (world->repkind);
1287 if(!world->options->bayes_infer)
1288 {
1289 MYMPIBCAST (&Gmax, 1, MPI_LONG, MASTER, comm_world);
1290 create_nr (nr, world, Gmax, 0, world->loci, world->repkind, repstart);
1291 SETUPPARAM0 (world, nr, world->repkind,
1292 repstart, repstop, world->loci, kind, TRUE);
1293 }
1294 else
1295 {
1296 Gmax=1;
1297 create_nr (nr, world, Gmax, 0, world->loci, world->repkind, repstart);
1298 }
1299 while (!done)
1300 {
1301 MYMPIRECV (temp, numelem2+2, mpisizeof, MASTER, MPI_ANY_TAG,
1302 comm_world, &status);
1303 locus = world->locus = status.MPI_TAG - 1;
1304 switch ((long) temp[0])
1305 {
1306 case MIGMPI_LIKE: // returns a log(likelihood) to the master
1307 memset (nr->locilikes, 0, sizeof (MYREAL) * (world->loci+1));
1308 memcpy (helper.expxv, temp + 1, sizeof (MYREAL) * numelem);
1309 memcpy (helper.xv, temp + 1 + numelem,
1310 sizeof (MYREAL) * numelem);
1311 fill_helper (&helper, helper.expxv, helper.xv, world, nr);
1312 mpi_likelihood_worker (world, &helper, rep);
1313 MYMPISEND (nr->locilikes, (MYINT) world->loci, mpisizeof, (MYINT) MASTER,
1314 (MYINT) (locus + 1), comm_world);
1315 break;
1316 case MIGMPI_GRADIENT: // returns a gradient to the master
1317 memcpy (nr->param, temp + 1, sizeof (MYREAL) * (numelem - 1));
1318 memcpy (nr->lparam, temp + 1 + numelem,
1319 sizeof (MYREAL) * numelem);
1320 fill_helper (&helper, nr->param, nr->lparam, world, nr);
1321 nr->profilenum = temp[numelem2 + 1];
1322 MYMPIRECV (nr->indeks, (MYINT) numelem, MPI_LONG, MASTER, (MYINT) (locus+1),
1323 comm_world, &status);
1324 mpi_gradient_worker (&helper, nr, world->atl);
1325 MYMPISEND (nr->d, (MYINT) numelem, mpisizeof, (MYINT) MASTER, (MYINT) (locus + 1),
1326 comm_world);
1327 break;
1328 case MIGMPI_RESULT: // returns the results (stats for the loci worked on
1329 mpi_results_worker ((long) temp[0], world, repstop, pack_result_buffer);
1330 break;
1331 // case MIGMPI_PLOTPLANE: // returns the results (stats for the loci worked on
1332 //mpi_results_worker ((long) temp[0], world, repstop, pack_plotplane_buffer);
1333 //break;
1334 case MIGMPI_SUMFILE: // returns the tree summary statistic to the master
1335 mpi_results_worker ((long) temp[0], world, repstop, pack_sumfile_buffer);
1336 break;
1337 case MIGMPI_TREESPACE: // returns the treefile to the master
1338 // printf("%i> print treespace to master\n",myID);
1339 mpi_results_worker ((long) temp[0], world, repstop, pack_treespace_buffer);
1340 break;
1341 case MIGMPI_MIGHIST: // returns the histogram of events (top parts + obsolete parts)
1342 mpi_results_worker ((long) temp[0], world, repstop, pack_mighist_buffer);
1343 break;
1344 case MIGMPI_SKYLINE: // returns the histogram of events
1345 //fprintf(stdout,"%i-------------------\n",myID);
1346 // debug_skyline(world,"mpiresultsworker-skyline");
1347 mpi_results_worker ((long) temp[0], world, repstop, pack_skyline_buffer);
1348 //fprintf(stdout,"%i-------------------\n",myID);
1349 break;
1350 case MIGMPI_BAYESHIST: // returns the histogram of the parameters
1351 //printf("%i> send bayeshist\n",myID);
1352 mpi_results_worker ((long) temp[0], world, repstop, pack_bayes_buffer);
1353 break;
1354 #ifdef PARALIO
1355 case MIGMPI_PARALIO:
1356 fprintf(stdout,"%i> bayesmdimfile closing\n", myID, options->bayesmdimfilename);
1357 MPI_File_close(&world->mpi_bayesmdimfile);
1358 sprintf(tempfilename,"%s",options->bayesmdimfilename);
1359 MYMPISEND (tempfilename, STRSIZE, MPI_CHAR, (MYINT) MASTER, (MYINT) myID, comm_world);
1360 break;
1361 #endif
1362 case MIGMPI_END:
1363 done = TRUE;
1364 break;
1365 default:
1366 fprintf (stdout, "%i> Do not understand task --> exit \n", myID);
1367 MPI_Finalize();
1368 exit (0);
1369 }
1370 }
1371 myfree(temp);
1372 myfree(helper.xv);
1373 myfree(helper.expxv);
1374 destroy_nr (nr, world);
1375 }
1376
1377 ///
1378 /// sends out the options to all workers
1379 void
broadcast_options_master(option_fmt * options,data_fmt * data)1380 broadcast_options_master (option_fmt * options, data_fmt *data)
1381 {
1382 long allocbufsize = LONGLINESIZE;
1383 long bufsize;
1384 char *buffer;
1385 #if DEBUG
1386 char nowstr[LINESIZE];
1387 get_time (nowstr, "%c");
1388 printf("Master started option broadcast at %s\n", nowstr);
1389 #endif
1390
1391 // char *buffermu;
1392 buffer = (char *) mycalloc (allocbufsize, sizeof (char));
1393 bufsize = save_options_buffer (&buffer, &allocbufsize, options, data);
1394 MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1395 MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1396 myfree(buffer);
1397 #if DEBUG
1398 get_time (nowstr, "%c");
1399 printf("master finished broadcast of options %s\n",nowstr);
1400 #endif
1401 }
1402
1403 ///
1404 /// receives the options
1405 void
broadcast_options_worker(option_fmt * options)1406 broadcast_options_worker (option_fmt * options)
1407 {
1408 // long i;
1409 // char *temp;
1410 long bufsize;
1411 char *buffer, *sbuffer;
1412 MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1413 buffer = (char *) mycalloc (bufsize + 1, sizeof (char));
1414 sbuffer = buffer;
1415 MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1416 read_options_worker (&buffer, options);
1417 myfree(sbuffer);
1418 #if DEBUG
1419 char nowstr[LINESIZE];
1420 get_time (nowstr, "%c");
1421 printf("%i > worker finished option broadcast receive at %s\n",myID, nowstr);
1422 #endif
1423
1424 }
1425
1426
1427 ///
1428 /// broadcasts the data to all nodes
1429 void
broadcast_data_master(data_fmt * data,option_fmt * options)1430 broadcast_data_master (data_fmt * data, option_fmt * options)
1431 {
1432 long bufsize;
1433 long allocbuffermusize = 1;
1434 char *buffermu;
1435 #if DEBUG
1436 char nowstr[LINESIZE];
1437 get_time (nowstr, "%c");
1438 printf("Master broadcasts data at %s\n",nowstr);
1439 #endif
1440 bufsize = pack_databuffer (data, options);
1441 if(options->murates)
1442 {
1443 buffermu = (char *) mycalloc (1, sizeof (char));
1444 bufsize = save_mu_rates_buffer(&buffermu,&allocbuffermusize, options);
1445 MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1446 MYMPIBCAST (buffermu, bufsize, MPI_CHAR, MASTER, comm_world);
1447 myfree(buffermu);
1448 }
1449 #ifdef DEBUG
1450 get_time (nowstr, "%c");
1451 printf("Master finished broadcasts of data at %s\n",nowstr);
1452 #endif
1453 }
1454
1455 ///
1456 /// receives the data from the master
1457 void
broadcast_data_worker(data_fmt * data,option_fmt * options)1458 broadcast_data_worker (data_fmt * data, option_fmt * options)
1459 {
1460 long bufsize=1;
1461 char *buffer;
1462 char *buffermu, *sbuffermu;
1463 char *temp;
1464 long i;
1465 //MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1466 buffer = (char *) mycalloc (bufsize, sizeof (char));
1467 //MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1468 unpack_databuffer (data, options);
1469 myfree(buffer);
1470 if(options->murates)
1471 {
1472 MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1473 buffermu = (char *) mycalloc (bufsize + 1, sizeof (char));
1474 sbuffermu = buffermu;
1475 MYMPIBCAST (buffermu, bufsize, MPI_CHAR, MASTER, comm_world);
1476 //fprintf(stdout,"%i> received muratebuffer:>%s<\n",myID, buffermu);
1477 temp=strsep(&buffermu,"\n");
1478 options->muloci = atoi(temp);
1479 if(options->mu_rates==NULL)
1480 options->mu_rates = (MYREAL *) mycalloc(1+options->muloci, sizeof(MYREAL));
1481 for(i=0; i< options->muloci; i++)
1482 {
1483 temp = strsep(&buffermu,"\n");
1484 options->mu_rates[i] = atof(temp);
1485 }
1486 myfree(sbuffermu);
1487 }
1488 #ifdef DEBUG
1489 char nowstr[LINESIZE];
1490 get_time (nowstr, "%c");
1491 printf("%i > worker finished data broadcast receive at %s\n",myID, nowstr);
1492 #endif
1493
1494 }
1495
1496 ///
1497 /// pack the data for shipment to the workers
1498 long
pack_databuffer(data_fmt * data,option_fmt * options)1499 pack_databuffer (data_fmt * data, option_fmt * options)
1500 {
1501 long locus, pop, ind;
1502 long bufsize = 0;
1503 long biggest;
1504 #ifdef UEP
1505 long sumtips, i;
1506 #endif
1507 char *buffer;
1508 long allocfpsize=LONGLINESIZE;
1509
1510 buffer = (char *) mycalloc (allocfpsize,sizeof (char));
1511 bufsize = sprintf (buffer, "%c %li %li %li\n", options->datatype, (long) data->hasghost,
1512 data->numpop, data->loci);
1513 // send header info
1514 bufsize += 1;
1515 MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1516 MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1517 // send sites and loci information
1518 bufsize=0;
1519 for (locus = 0; locus < data->loci; locus++)
1520 {
1521 bufsize += sprintf (buffer+bufsize, "%li\n", data->seq[0]->sites[locus]);
1522 if (bufsize >= allocfpsize-100)
1523 {
1524 allocfpsize += LONGLINESIZE;
1525 buffer = (char *) myrealloc (buffer, sizeof (char) * allocfpsize);
1526 }
1527 }
1528 if (bufsize >= allocfpsize-100)
1529 {
1530 allocfpsize += LONGLINESIZE;
1531 buffer = (char *) myrealloc (buffer, sizeof (char) * allocfpsize);
1532 }
1533 bufsize += sprintf (buffer + bufsize, "%li %f\n", data->seq[0]->addon, data->seq[0]->fracchange);
1534 bufsize += 1;
1535 MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1536 MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1537
1538
1539 // send population data, send locus data, then one individual at a time
1540 for (pop = 0; pop < data->numpop; pop++)
1541 {
1542 //send each population name
1543 if (allocfpsize < LINESIZE)
1544 {
1545 allocfpsize += LINESIZE;
1546 buffer = (char *) myrealloc (buffer, sizeof (char) * allocfpsize);
1547 }
1548 bufsize = sprintf (buffer, "%s\n", data->popnames[pop]);//set to start
1549 bufsize += 1;
1550 MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1551 MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1552 biggest = 0;
1553 bufsize = 0;
1554 // send number of ind for each locus per population
1555 for(locus=0; locus<data->loci; locus++)
1556 {
1557 if (bufsize >= allocfpsize-100)
1558 {
1559 allocfpsize += LONGLINESIZE;
1560 buffer = (char *) myrealloc (buffer, sizeof (char) * allocfpsize);
1561 }
1562 bufsize += sprintf (buffer + bufsize, "%li %li\n", data->numind[pop][locus],data->numalleles[pop][locus]);
1563 if(biggest < data->numind[pop][locus])
1564 biggest = data->numind[pop][locus];
1565 }
1566 bufsize += 1;
1567 MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1568 MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1569 bufsize = 0;
1570 // send individuals
1571 if (!strchr (SEQUENCETYPES, options->datatype))
1572 {
1573 for (ind = 0; ind < biggest; ind++)
1574 {
1575 // assume that buffer is always larger than nmlength
1576 // resetting bufsize to start!
1577 bufsize = sprintf (buffer, "%*s\n", (int) options->nmlength, data->indnames[pop][ind][0]);
1578 pack_allele_data (&buffer, &bufsize, &allocfpsize, data, pop, ind);
1579 bufsize += 1;
1580 MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1581 MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1582 }
1583 }
1584 else
1585 {
1586 bufsize = 0;
1587 for(locus=0;locus<data->loci; ++locus)
1588 {
1589 for (ind = 0; ind < data->numind[pop][locus]; ind++)
1590 {
1591 // assume that buffer is always larger than nmlength
1592 // resetting bufsize to start!
1593 bufsize = sprintf (buffer, "%*s\n", (int) options->nmlength,
1594 data->indnames[pop][ind][locus]);
1595 pack_sequence_data (&buffer, &bufsize, &allocfpsize, data, pop, ind, locus);
1596 bufsize += 1;
1597 MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1598 MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1599 }
1600 }
1601 }
1602 }
1603 // send geofile
1604 if (options->geo)
1605 {
1606 bufsize = 0;
1607 for (pop = 0; pop < data->numpop * data->numpop; pop++)
1608 {
1609 if (bufsize >= allocfpsize-100)
1610 {
1611 allocfpsize += LONGLINESIZE;
1612 buffer = (char *) myrealloc (buffer, sizeof (char) * allocfpsize);
1613 }
1614 bufsize += sprintf (buffer+ bufsize, "%f %f\n", data->geo[pop], data->lgeo[pop]);
1615 }
1616 bufsize += 1;
1617 MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1618 MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1619 }
1620 // send randomsubset data
1621 bufsize = 0;
1622 for (pop = 0; pop < data->numpop; pop++)
1623 {
1624 for(locus=0;locus<data->loci;locus++)
1625 {
1626 for(ind=0; ind < data->numind[pop][locus]; ind++)
1627 {
1628 if (bufsize >= allocfpsize-100)
1629 {
1630 allocfpsize += LONGLINESIZE;
1631 buffer = (char *) myrealloc (buffer, sizeof (char) * allocfpsize);
1632 }
1633 bufsize += sprintf (buffer+ bufsize, "%li\n", data->shuffled[pop][locus][ind]);
1634 }
1635 }
1636 }
1637 bufsize += 1;
1638 MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1639 MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1640
1641 // tipdate file
1642 if (options->has_datefile)
1643 {
1644 bufsize = 0;
1645 for(locus=0;locus<data->loci;locus++)
1646 {
1647 for (pop = 0; pop < data->numpop; pop++)
1648 {
1649 for(ind=0; ind < data->numind[pop][locus]; ind++)
1650 {
1651 if (bufsize >= allocfpsize-200)
1652 {
1653 allocfpsize += LONGLINESIZE;
1654 buffer = (char *) myrealloc (buffer, sizeof (char) * allocfpsize);
1655 }
1656 bufsize += sprintf (buffer+bufsize, "%s %20.20f\n", data->sampledates[pop][locus][ind].name, data->sampledates[pop][locus][ind].date);
1657 }
1658 }
1659 }
1660 // assumes buffer is big enough
1661 bufsize += sprintf (buffer + bufsize, "%20.20f\n", data->maxsampledate);
1662 // printf("%i> pack_data maxsampledate=%f %s\n",myID, data->maxsampledate, fp);
1663 //printf("%i> pack_data:%s\n""""""""""""""""""""""""\n\n\n",myID, buffer);
1664 bufsize += 1;
1665 MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1666 MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1667 }
1668
1669 // uepfile
1670 #ifdef UEP
1671 if (options->uep)
1672 {
1673 sumtips = 0;
1674 bufsize = 0;
1675 for (pop = 0; pop < data->numpop; ++pop)
1676 sumtips += data->numind[pop][0];//Assumes UEP is matched by locus 1
1677 bufsize += sprintf (buffer+ bufsize, "%li %li\n", sumtips, data->uepsites);
1678 if (strchr (SEQUENCETYPES, options->datatype))
1679 {
1680 for (pop = 0; sumtips; pop++)
1681 {
1682 for (i = 0; i < data->uepsites; i++)
1683 {
1684 if (bufsize >= allocfpsize-100)
1685 {
1686 allocfpsize += LONGLINESIZE;
1687 buffer = (char *) myrealloc (buffer, sizeof (char) * allocfpsize);
1688 }
1689 bufsize += sprintf (buffer + bufsize, "%i\n", data->uep[pop][i]);
1690 }
1691 }
1692 }
1693 else
1694 {
1695 for (pop = 0; sumtips; pop++)
1696 {
1697 for (i = 0; i < data->uepsites; i++)
1698 {
1699 if (bufsize >= allocfpsize-100)
1700 {
1701 allocfpsize += LONGLINESIZE;
1702 buffer = (char *) myrealloc (buffer, sizeof (char) * allocfpsize);
1703 }
1704 bufsize += sprintf (buffer + bufsize, "%i %i\n", data->uep[pop][i],
1705 data->uep[pop + sumtips][i]);
1706 }
1707 }
1708 }
1709 bufsize += 1;
1710 MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1711 MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1712 }
1713
1714 #endif
1715 myfree(buffer);
1716 return 0;
1717 }
1718
1719
1720 void
pack_allele_data(char ** buffer,long * bufsize,long * allocbufsize,data_fmt * data,long pop,long ind)1721 pack_allele_data (char **buffer, long *bufsize, long *allocbufsize, data_fmt * data, long pop,
1722 long ind)
1723 {
1724 //char fp[LONGLINESIZE];
1725 long locus;
1726 for (locus = 0; locus < data->loci; locus++)
1727 {
1728 if (*bufsize > *allocbufsize-100)
1729 {
1730 *allocbufsize += *bufsize;
1731 *buffer = (char*) myrealloc(*buffer, *allocbufsize * sizeof(char));
1732 }
1733 *bufsize += sprintf (*buffer + *bufsize, "%s %s\n", data->yy[pop][ind][locus][0],
1734 data->yy[pop][ind][locus][1]);
1735 }
1736 }
1737
1738 void
pack_sequence_data(char ** buffer,long * bufsize,long * allocbufsize,data_fmt * data,long pop,long ind,long locus)1739 pack_sequence_data (char **buffer, long *bufsize, long *allocbufsize, data_fmt * data, long pop,
1740 long ind, long locus)
1741 {
1742 //char *fp;
1743 // long locus;
1744 // fp = mycalloc (1, sizeof (char));
1745 // for (locus = 0; locus < data->loci; locus++)
1746 // {
1747 if (*bufsize > *allocbufsize-data->seq[0]->sites[locus])
1748 {
1749 *allocbufsize += *bufsize;
1750 *buffer = (char*) myrealloc(*buffer, *allocbufsize * sizeof(char));
1751 }
1752 *bufsize += sprintf (*buffer + *bufsize, "%s\n", data->yy[pop][ind][locus][0]);
1753 // }
1754 }
1755
1756 // this function and get_data() do not mix well!
1757 void
unpack_databuffer(data_fmt * data,option_fmt * options)1758 unpack_databuffer (data_fmt * data, option_fmt * options)
1759 {
1760 long len;
1761 long locus, pop, ind, i=0;
1762 long biggest;
1763 #ifdef UEP
1764 long sumtips;
1765 #endif
1766 char *buffer;
1767 char *buf;
1768 char *input;
1769 long inputsize = LONGLINESIZE;
1770 char *name;
1771 long hasghost;
1772 long bufsize;
1773 long allocbufsize=LONGLINESIZE;
1774 buffer = (char *) mycalloc(allocbufsize,sizeof(char));
1775 input = (char *) mycalloc (LONGLINESIZE, sizeof (char));
1776 name = (char *) mycalloc (LONGLINESIZE, sizeof (char));
1777 // receive header info
1778 MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1779 if (bufsize > allocbufsize)
1780 {
1781 allocbufsize = bufsize;
1782 buffer = (char*) myrealloc(buffer, allocbufsize * sizeof(char));
1783 }
1784 MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1785 buf = buffer;
1786 sgets_safe (&input, &inputsize, &buf);
1787 sscanf (input, "%c%li%li%li", &options->datatype, &hasghost, &data->numpop,
1788 &data->loci);
1789 //printf("datatype=%c numpop=%li loci=%li\n",options->datatype,data->numpop,data->loci);
1790 data->hasghost = (boolean) hasghost;
1791 init_data_structure1 (&data, options);
1792 // receive site and loci information
1793 MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1794 if (bufsize > allocbufsize)
1795 {
1796 allocbufsize = bufsize;
1797 buffer = (char*) myrealloc(buffer, allocbufsize * sizeof(char));
1798 }
1799 MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1800 buf = buffer;
1801 for (locus = 0; locus < data->loci; locus++)
1802 {
1803 sgets_safe (&input, &inputsize, &buf);
1804 sscanf (input, "%li", &data->seq[0]->sites[locus]);
1805 }
1806 sgets_safe (&input, &inputsize, &buf);
1807 #ifdef USE_MYREAL_FLOAT
1808 sscanf (input, "%li%f", &data->seq[0]->addon, &data->seq[0]->fracchange);
1809 #else
1810 sscanf (input, "%li%lf", &data->seq[0]->addon, &data->seq[0]->fracchange);
1811 #endif
1812 // population data
1813 for (pop = 0; pop < data->numpop; pop++)
1814 {
1815 // receive each population name
1816 MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1817 if (bufsize > allocbufsize)
1818 {
1819 allocbufsize = bufsize;
1820 buffer = (char*) myrealloc(buffer, allocbufsize * sizeof(char));
1821 }
1822 MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1823 buf = buffer;
1824 sgets_safe (&input, &inputsize, &buf);
1825 sscanf (input, "%s", data->popnames[pop]);
1826 biggest=0;
1827 // receive ind numbers for each locus
1828 MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1829 if (bufsize > allocbufsize)
1830 {
1831 allocbufsize = bufsize;
1832 buffer = (char*) myrealloc(buffer, allocbufsize * sizeof(char));
1833 }
1834 MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1835 buf = buffer;
1836 for(locus=0; locus<data->loci; locus++)
1837 {
1838 sgets_safe (&input, &inputsize, &buf);
1839 sscanf (input, "%li %li", &data->numind[pop][locus],&data->numalleles[pop][locus]);
1840 if(biggest<data->numind[pop][locus])
1841 biggest = data->numind[pop][locus];
1842 }
1843 init_data_structure2 (&data, options, pop);
1844 // receive individual data per pop
1845 if (!strchr (SEQUENCETYPES, options->datatype))
1846 {
1847 for (ind = 0; ind < biggest; ind++)
1848 {
1849 MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1850 if (bufsize > allocbufsize)
1851 {
1852 allocbufsize = bufsize;
1853 buffer = (char*) myrealloc(buffer, allocbufsize * sizeof(char));
1854 }
1855 MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1856 buf = buffer;
1857 sgets_safe (&input, &inputsize, &buf);
1858 sscanf (input, "%s", data->indnames[pop][ind][0]);
1859 for (locus = 0; locus < data->loci; locus++)
1860 {
1861 sgets_safe (&input, &inputsize, &buf);
1862 sscanf (input, "%s %s", data->yy[pop][ind][locus][0],
1863 data->yy[pop][ind][locus][1]);
1864 }
1865 }
1866 }
1867 else
1868 {
1869 for (locus = 0; locus < data->loci; locus++)
1870 {
1871 for (ind = 0; ind < data->numind[pop][locus]; ind++)
1872 {
1873 MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1874 if (bufsize > allocbufsize)
1875 {
1876 allocbufsize = bufsize;
1877 buffer = (char*) myrealloc(buffer, allocbufsize * sizeof(char));
1878 }
1879 MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1880 buf = buffer;
1881 sgets_safe (&input, &inputsize, &buf);
1882 sprintf(data->indnames[pop][ind][locus],"%-*s", (int) options->nmlength, input);
1883 input =(char *) myrealloc (input, sizeof (char) * (1 + data->seq[0]->sites[locus]));
1884 sgets (input, 10 + data->seq[0]->sites[locus], &buf);
1885 len = data->seq[0]->sites[locus];
1886 sprintf(data->yy[pop][ind][locus][0],"%-*s", (int) len,input);
1887 }
1888 }
1889 }
1890 }
1891 // geofile
1892 data->geo =
1893 (MYREAL *) mycalloc (1, sizeof (MYREAL) * data->numpop * data->numpop);
1894 data->lgeo =
1895 (MYREAL *) mycalloc (1, sizeof (MYREAL) * data->numpop * data->numpop);
1896 if (!options->geo)
1897 {
1898 for (i = 0; i < data->numpop * data->numpop; i++)
1899 data->geo[i] = 1.0;
1900 }
1901 else
1902 {
1903 // receive geofile
1904 MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1905 if (bufsize > allocbufsize)
1906 {
1907 allocbufsize = bufsize;
1908 buffer = (char*) myrealloc(buffer, allocbufsize * sizeof(char));
1909 }
1910 MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1911 buf = buffer;
1912 for (pop = 0; pop < data->numpop * data->numpop; pop++)
1913 {
1914 sgets_safe (&input, &inputsize, &buf);
1915 #ifdef USE_MYREAL_FLOAT
1916 sscanf (input, "%f%f", &data->geo[pop], &data->lgeo[pop]);
1917 #else
1918 sscanf (input, "%lf%lf", &data->geo[pop], &data->lgeo[pop]);
1919 #endif
1920 }
1921 }
1922
1923 // receive randomsubset
1924 data->shuffled = (long ***) mycalloc(data->numpop,sizeof(long **));
1925 MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1926 if (bufsize > allocbufsize)
1927 {
1928 allocbufsize = bufsize;
1929 buffer = (char*) myrealloc(buffer, allocbufsize * sizeof(char));
1930 }
1931 MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1932 buf = buffer;
1933 for (pop = 0; pop < data->numpop; pop++)
1934 {
1935 data->shuffled[pop] = (long **) mycalloc(data->loci,sizeof(long *));
1936 for(locus=0;locus<data->loci;locus++)
1937 {
1938 data->shuffled[pop][locus] = (long *) mycalloc(data->numind[pop][locus],sizeof(long));
1939 for(ind=0; ind < data->numind[pop][locus]; ind++)
1940 {
1941 sgets_safe (&input, &inputsize, &buf);
1942 sscanf (input, "%li", &data->shuffled[pop][locus][ind]);
1943 }
1944 }
1945 }
1946
1947 //receive date data
1948 data->sampledates = (tipdate_fmt ***) mycalloc (data->numpop, sizeof (tipdate_fmt **));
1949 data->sampledates[0] = (tipdate_fmt **) mycalloc (data->numpop * data->loci, sizeof (tipdate_fmt *));
1950 for (pop = 1; pop < data->numpop; pop++)
1951 {
1952 data->sampledates[pop] = data->sampledates[0] + data->loci * pop;
1953 }
1954 for(locus=0;locus<data->loci;locus++)
1955 {
1956 for(pop=0;pop < data->numpop; pop++)
1957 {
1958 data->sampledates[pop][locus] = (tipdate_fmt*) mycalloc(data->numind[pop][locus],sizeof(tipdate_fmt));
1959 }
1960 }
1961 data->maxsampledate = 0.0;
1962 if (options->has_datefile)
1963 {
1964 MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1965 if (bufsize > allocbufsize)
1966 {
1967 allocbufsize = bufsize;
1968 buffer = (char*) myrealloc(buffer, allocbufsize * sizeof(char));
1969 }
1970 MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1971 buf = buffer;
1972 for(locus=0;locus<data->loci;locus++)
1973 {
1974 for (pop = 0; pop < data->numpop; pop++)
1975 {
1976 for(ind=0; ind < data->numind[pop][locus]; ind++)
1977 {
1978 sgets_safe (&input, &inputsize, &buf);
1979 //printf("%i> %s\n",myID, input);
1980 #ifdef USE_MYREAL_FLOAT
1981 sscanf (input, "%s %f", name, &data->sampledates[pop][locus][ind].date);
1982 #else
1983 sscanf (input, "%s %lf", name, &data->sampledates[pop][locus][ind].date);
1984
1985 data->sampledates[pop][locus][ind].name = (char *) mycalloc(strlen(name)+1,sizeof(char));
1986 strcpy(data->sampledates[pop][locus][ind].name, name);
1987 }
1988 }
1989 }
1990 sgets_safe (&input, &inputsize, &buf);
1991 #ifdef USE_MYREAL_FLOAT
1992 sscanf(input,"%f",&data->maxsampledate);
1993 #else
1994 sscanf(input,"%lf",&data->maxsampledate);
1995 #endif
1996
1997 // printf("%i> maxsampledate=%f %s\n''''''\n%s\n''''''''''''\n",myID, data->maxsampledate, input, buffer);
1998 }
1999 #endif
2000 // uepfile
2001 #ifdef UEP
2002 if (options->uep)
2003 {
2004 MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
2005 if (bufsize > allocbufsize)
2006 {
2007 allocbufsize = bufsize;
2008 buffer = (char*) myrealloc(buffer, allocbufsize * sizeof(char));
2009 }
2010 MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
2011 buf = buffer;
2012 sgets_safe (&input, &inputsize, &buf);
2013 sscanf (input, "%li%li", &sumtips, &data->uepsites);
2014 data->uep =
2015 (int **) mycalloc (number_genomes (options->datatype) * sumtips,
2016 sizeof (int *));
2017 if (strchr (SEQUENCETYPES, options->datatype))
2018 {
2019 for (pop = 0; sumtips; pop++)
2020 {
2021 data->uep[i] = (int *) mycalloc (data->uepsites, sizeof (int));
2022 for (i = 0; i < data->uepsites; i++)
2023 {
2024 sgets_safe (&input, &inputsize, &buf);
2025 sscanf (input, "%i", &data->uep[pop][i]);
2026 }
2027 }
2028 }
2029 else
2030 {
2031 for (pop = 0; sumtips; pop++)
2032 {
2033 data->uep[i] = (int *) mycalloc (data->uepsites, sizeof (int));
2034 data->uep[i + sumtips] =
2035 (int *) mycalloc (data->uepsites, sizeof (int));
2036 for (i = 0; i < data->uepsites; i++)
2037 {
2038 sgets_safe (&input, &inputsize, &buf);
2039 sscanf (input, "%i%i", &data->uep[pop][i],
2040 &data->uep[pop + sumtips][i]);
2041 }
2042 }
2043 }
2044
2045 }
2046 #endif
2047 init_data_structure3 (data,options);
2048
2049 switch (options->datatype)
2050 {
2051 case 'a':
2052 create_alleles (data, options);
2053 break;
2054 case 'b':
2055 for (pop = 0; pop < data->loci; pop++)
2056 data->maxalleles[pop] = XBROWN_SIZE;
2057 break;
2058 case 'm':
2059 create_alleles (data, options);
2060 for (pop = 0; pop < data->loci; pop++)
2061 data->maxalleles[pop] = options->micro_stepnum;
2062 break;
2063 }
2064 myfree(input);
2065 myfree(name);
2066 myfree(buffer);
2067 }
2068
2069 #if 0
2070 ///
2071 /// unpacks results from buffer to fill data structures in master for final printout
2072 void
unpack_plotplane_buffer(MYREAL * buffer,world_fmt * world,long locus,long maxrep,long numpop)2073 unpack_plotplane_buffer (MYREAL *buffer, world_fmt * world,
2074 long locus, long maxrep, long numpop)
2075 {
2076
2077 }
2078 ///
2079 /// pack plotplane into buffer to ship to master
2080 long
pack_plotplane_buffer(MYREAL ** buffer,world_fmt * world,long locus,long maxrep,long numpop)2081 pack_plotplane_buffer (MYREAL **buffer, world_fmt * world,
2082 long locus, long maxrep, long numpop)
2083 {
2084 }
2085 #endif
2086
2087
2088 ///
2089 /// unpacks results from buffer to fill data structures in master for final printout
2090 void
unpack_result_buffer(MYREAL * buffer,world_fmt * world,long locus,long maxrep,long numpop)2091 unpack_result_buffer (MYREAL *buffer, world_fmt * world,
2092 long locus, long maxrep, long numpop)
2093 {
2094 long z=0;
2095 long rep;
2096 long pop;
2097 long addon=0;
2098 long numpop2 = world->numpop2;
2099 timearchive_fmt **atl = world->atl;
2100 MYREAL ***apg0 = world->apg0;
2101
2102 if (maxrep > 1)
2103 addon = 1;
2104
2105 for (rep = 0; rep < maxrep + addon; rep++)
2106 {
2107 atl[rep][locus].param_like = buffer[z++];
2108 for (pop = 0; pop < 4 * numpop2; pop++)
2109 {
2110 atl[rep][locus].parameters[pop] = buffer[z++];
2111 }
2112 //fprintf(stderr,"%i> unpacked result locus=%li replicate %li\n",myID,locus, rep);
2113 }
2114 // apg0
2115 for (rep = 0; rep < maxrep; rep++)
2116 {
2117 for (pop = 0; pop < world->options->lsteps; pop++)
2118 {
2119 apg0[rep][locus][pop] = buffer[z++];
2120 }
2121 }
2122 if(world->options->bayes_infer)
2123 {
2124 // BF material
2125 z = unpack_BF_buffer(buffer, z, locus, world);
2126 // ESS material
2127 z = unpack_ess_buffer(buffer, z, world);
2128 }
2129 }
2130
2131 ///
2132 /// pack results into buffer to ship to master
2133 long
pack_result_buffer(MYREAL ** buffer,world_fmt * world,long locus,long maxrep,long numpop)2134 pack_result_buffer (MYREAL **buffer, world_fmt * world,
2135 long locus, long maxrep, long numpop)
2136 {
2137 long rep;
2138 long pop;
2139 long bufsize;
2140 long z = 0;
2141 long addon = 0;
2142 long numpop2 = world->numpop2;
2143 long nn = numpop2 + (world->bayes->mu * world->loci);
2144 timearchive_fmt **atl = world->atl;
2145 MYREAL ***apg0 = world->apg0;
2146
2147 if (maxrep > 1)
2148 addon = 1;
2149
2150 bufsize = (maxrep+addon) + (maxrep+addon) * 4 * numpop2 + maxrep * world->options->lsteps + \
2151 world->options->heated_chains * world->loci + 5 * world->loci + 1 + nn * 2;
2152 (*buffer) = (MYREAL *) myrealloc (*buffer, sizeof (MYREAL) * bufsize);
2153 memset (*buffer, 0, sizeof (MYREAL) * bufsize);
2154
2155 for (rep = 0; rep < maxrep + addon; rep++)
2156 {
2157 (*buffer)[z++] = atl[rep][locus].param_like;
2158 for (pop = 0; pop < 4 * numpop2; pop++)
2159 {
2160 (*buffer)[z++] = atl[rep][locus].parameters[pop];
2161 }
2162 // fprintf(stderr,"%i> packed result locus=%li replicate %li\n",myID,locus, rep);
2163 }
2164 // apg0
2165 for (rep = 0; rep < maxrep; rep++)
2166 {
2167 for (pop = 0; pop < world->options->lsteps; pop++)
2168 {
2169 (*buffer)[z++] = apg0[rep][locus][pop];
2170 }
2171 }
2172 if(world->options->bayes_infer)
2173 {
2174 // BF material
2175 if(!world->data->skiploci[locus])
2176 {
2177 //fprintf(stderr,"%i> packed result locus=%li replicate %li\n",myID,locus, rep);
2178 z = pack_BF_buffer(buffer, z, locus, world);
2179 }
2180 // ESS material
2181 z = pack_ess_buffer(buffer, z, world);
2182 }
2183 #ifdef DEBUG_MPI
2184 fprintf(stdout,"DEBUG: %i> z=%li, bufsize=%li\n", myID, z, bufsize);
2185 #endif
2186 if(bufsize >= z)
2187 {
2188 bufsize = z;
2189 }
2190 else
2191 {
2192 fprintf(stderr,"%i> bufsize=%li < z=%li\n",myID,bufsize,z);
2193 error("pack_results tried to stuff to much into buffer in pack_result_buffer()\n");
2194 }
2195 return bufsize;
2196 }
2197
2198
2199
2200
2201 ///
2202 /// unpacks replicate samples of migration events, adds numbers and part-vectors
2203 /// to the final array per locus, this function is only used with replicates over
2204 /// multiple loci.
2205 void
unpack_mighist_replicate_buffer_old(MYREAL * buffer,world_fmt * world,long locus,long numpop)2206 unpack_mighist_replicate_buffer_old (MYREAL *buffer, world_fmt * world,
2207 long locus, long numpop)
2208 {
2209 long i;
2210 long j;
2211 long z = 0;
2212 long nummighist;
2213 long nummighistold;
2214 mighistloci_fmt *aa;
2215 aa = &world->mighistloci[locus];
2216 nummighist = (long) buffer[z++];
2217 nummighistold = aa->mighistnum;
2218 aa->mighistnum += nummighist;
2219 if(aa->allocsize <= aa->mighistnum)
2220 {
2221 aa->mighist = (mighist_fmt *) myrealloc (aa->mighist, sizeof (mighist_fmt) *(aa->mighistnum+1));
2222 for(j=aa->allocsize; j<=aa->mighistnum; j++)
2223 {
2224 aa->mighist[j].allocsize=1;
2225 aa->mighist[j].migeventsize=0;
2226 aa->mighist[j].migevents =
2227 (migevent_fmt *) mycalloc (1, sizeof (migevent_fmt) );
2228 //printf("%i> first events: alloc=%li size=%li\n", myID, aa->mighist[j].allocsize , aa->mighist[j].migeventsize);
2229 }
2230 aa->allocsize = aa->mighistnum+1;
2231 }
2232 for (j = nummighistold; j < aa->mighistnum; j++)
2233 {
2234 aa->mighist[j].copies = (long) buffer[z++];
2235 aa->mighist[j].weight = (long) buffer[z++];
2236 aa->mighist[j].migeventsize = (long) buffer[z++];
2237 //printf("%i> events: alloc=%li size=%li\n", myID, aa->mighist[j].allocsize , aa->mighist[j].migeventsize);
2238 aa->mighist[j].allocsize = aa->mighist[j].migeventsize + 1;
2239 aa->mighist[j].migevents = (migevent_fmt *) myrealloc (aa->mighist[j].migevents,
2240 sizeof (migevent_fmt) *
2241 aa->mighist[j].allocsize);
2242 for (i = 0; i < aa->mighist[j].migeventsize; i++)
2243 {
2244 aa->mighist[j].migevents[i].age = buffer[z++];
2245 aa->mighist[j].migevents[i].from = (long) buffer[z++];
2246 aa->mighist[j].migevents[i].to = (long) buffer[z++];
2247 aa->mighist[j].migevents[i].sumlines = (long) buffer[z++];
2248 }
2249 }
2250 }
2251 ///
2252 /// unpacks replicate samples of migration events, adds numbers and part-vectors
2253 /// to the final array per locus, this function is only used with replicates over
2254 /// multiple loci.
2255 void
unpack_mighist_replicate_buffer(MYREAL * buffer,world_fmt * world,long locus,long numpop)2256 unpack_mighist_replicate_buffer(MYREAL *buffer, world_fmt * world,
2257 long locus, long numpop)
2258 {
2259 long i;
2260 long pop;
2261 long z = 0;
2262 long oldeventbinnum;
2263 long * eventbinnum = NULL;
2264 duo ** eventbins;
2265 mighistloci_fmt *aa;
2266
2267 aa = &world->mighistloci[locus];
2268 eventbins = aa->migeventbins;
2269 eventbinnum = aa->migeventbinnum;
2270
2271
2272 for (pop = (world->options->mighist_all ? 0 : world->numpop);
2273 pop < world->numpop2; pop++)
2274 {
2275 oldeventbinnum = eventbinnum[pop];
2276 eventbinnum[pop] = (long) buffer[z++];
2277 if(oldeventbinnum < eventbinnum[pop])
2278 {
2279 aa->migeventbins[pop] = (duo *) myrealloc(aa->migeventbins[pop], sizeof(duo) * eventbinnum[pop]);
2280 memset(aa->migeventbins[pop][oldeventbinnum], 0, sizeof(duo) * (eventbinnum[pop] - oldeventbinnum));
2281 }
2282 for (i = 0; i < eventbinnum[pop]; i++)
2283 {
2284 eventbins[pop][i][0] += buffer[z++];
2285 eventbins[pop][i][1] += buffer[z++];
2286 }
2287 }
2288 }
2289
2290
2291 ///
2292 /// receive buffer with treespace content and unpack it.
2293 /// Messing with format assignment to fit into the result_worker/master scheme
2294 void
unpack_treespace_buffer(MYREAL * buffer,world_fmt * world,long locus,long maxrep,long numpop)2295 unpack_treespace_buffer (MYREAL *buffer, world_fmt * world,
2296 long locus, long maxrep, long numpop)
2297 {
2298 MYREAL like = -HUGE;
2299 char *input;
2300 char *sbuf;
2301 char *buf ;
2302 long size = strlen((char *) buffer)+1;
2303 buf = NULL;
2304 // printf("%i> in unpack_treespace_buffer() buffer has size %li\n%s\n\n",myID,size, (char*) buffer);
2305 if(size<=1)
2306 {
2307 return;
2308 }
2309 //input = (char*) mycalloc(LINESIZE,sizeof(char));
2310 // printf("%i> in unpack_treespace_buffer() after first if\n",myID);
2311 buf = (char *) mycalloc(size+1,sizeof(char));
2312 sbuf = buf;
2313 memcpy(buf, buffer,sizeof(char)*(size));
2314 //printf("%i> in unpack_treespace_buffer() after memcpy\n",myID);
2315 // printf("%i> in unpack_treespace_buffer() %s\n%s\n",myID,input,buf);
2316 // printf("%i> Unpack++++++++++++\nsize=%li\n%s\n++++++++++++\n%s\n++++++++++++++++++++++++++++++++++++++++++++++++++\n",myID, size, buf, (char*) buffer);fflush(stdout);
2317 if(world->options->treeprint==BEST)
2318 {
2319 input = strsep(&buf,"@");
2320 //printf("%i> in unpack_treespace_buffer() after strsep %s\n",myID,input);fflush(stdout);
2321 like = atof(input);
2322 if(like >= world->besttreelike[locus])
2323 {
2324 world->besttreelike[locus] = like;
2325 // if(world->treespacenum[locus] < size)
2326 // {
2327 //printf("%i> in unpack_treespace_buffer() before treespace realloc %s\n%s\n",myID,input,buf);
2328 size = strlen(buf) + 1;
2329 world->treespacealloc[locus] = size;
2330 world->treespacenum[locus] = size;
2331 world->treespace[locus] = (char *) myrealloc(world->treespace[locus],
2332 sizeof(char)*(size+1));
2333 strcpy(world->treespace[locus],buf);
2334 }
2335 }
2336 else
2337 {
2338 size = strlen(buf) + 1;
2339 world->treespacealloc[locus] = size;
2340 world->treespacenum[locus] = size;
2341 world->treespace[locus] = (char *) myrealloc(world->treespace[locus],
2342 sizeof(char)*(size+1));
2343 strcpy(world->treespace[locus],buf);
2344 }
2345 myfree(sbuf);
2346 }
2347
2348 ///
2349 /// pack treespace into buffer
2350 /// ship to master; messing with format assignment because the standard transport buffer
2351 /// is a double
pack_treespace_buffer(MYREAL ** buffer,world_fmt * world,long locus,long maxrep,long numpop)2352 long pack_treespace_buffer (MYREAL **buffer, world_fmt * world,
2353 long locus, long maxrep, long numpop)
2354 {
2355 long thissize = strlen(world->treespace[locus])+1;
2356 long thisrealsize;
2357 char *input;
2358 char *ptr2, *ptr3;
2359 long pos = 0;
2360 MYREAL like = -HUGE;
2361 if(world->treespace[locus]==NULL)
2362 return 0;
2363 ptr2 = strstr(world->treespace[locus],"=");
2364 ptr3 = strstr(world->treespace[locus]," ]\n");
2365 pos = ptr3-ptr2;
2366
2367 input = mycalloc(LONGLINESIZE,sizeof(char));
2368 if(world->options->treeprint == BEST)
2369 {
2370 sprintf(input,"%-*s", (int) pos, ptr2+1);
2371 like = atof(input);
2372 pos = sprintf(input,"%f @", like);
2373 // fprintf(stdout,"%i> in pack_treespace_buffer() filling %li size\n",myID, thissize);
2374 // the master routine for this expects a MYREAL *buffer, but
2375 // the tree is a string
2376 thissize += pos;
2377 }
2378 thisrealsize = (long) (1. + (thissize * sizeof(char) / sizeof(MYREAL)));
2379 (*buffer) = (MYREAL *) myrealloc(*buffer, (1+thisrealsize) * sizeof(MYREAL));
2380 memset(*buffer, 0, sizeof(MYREAL) * (1+thisrealsize));
2381 // the sizeof(char) is NO MISTAKE!
2382 sprintf((char*)(*buffer), "%s%s", input, world->treespace[locus]);
2383 // if(myID==1)
2384 // {
2385 // printf("%i> pack++++++++++++++++\nsize=%li, realsize=%li [char=%li real=%li]\n%s\n++++++++++++++++++++++++++++++++\n",myID, thissize,thisrealsize, sizeof(char), sizeof(MYREAL),(char *)(*buffer));
2386 // }
2387 return thisrealsize;
2388 }
2389
2390
2391 void
unpack_mighist_buffer(MYREAL * buffer,world_fmt * world,long locus,long maxrep,long numpop)2392 unpack_mighist_buffer (MYREAL *buffer, world_fmt * world,
2393 long locus, long maxrep, long numpop)
2394 {
2395 long i;
2396 long pop;
2397 long z = 0;
2398 long oldeventbinnum;
2399 long * eventbinnum = NULL;
2400 duo ** eventbins;
2401 mighistloci_fmt *aa;
2402
2403 aa = &world->mighistloci[locus];
2404 eventbins = aa->migeventbins;
2405 eventbinnum = aa->migeventbinnum;
2406
2407
2408 for (pop = (world->options->mighist_all ? 0 : world->numpop);
2409 pop < world->numpop2; pop++)
2410 {
2411 oldeventbinnum = eventbinnum[pop];
2412 eventbinnum[pop] = (long) buffer[z++];
2413 if(oldeventbinnum < eventbinnum[pop])
2414 {
2415 aa->migeventbins[pop] = (duo *) myrealloc(aa->migeventbins[pop], sizeof(duo) * eventbinnum[pop]);
2416 memset(aa->migeventbins[pop][oldeventbinnum], 0, sizeof(duo) * (eventbinnum[pop] - oldeventbinnum));
2417 }
2418
2419 for (i = 0; i < eventbinnum[pop]; i++)
2420 {
2421 eventbins[pop][i][0] = buffer[z++];
2422 eventbins[pop][i][1] = buffer[z++];
2423 }
2424 }
2425 }
2426
2427
2428 ///
2429 /// pack migration events and coalescence events into buffer to
2430 /// ship to master
pack_mighist_buffer_old(MYREAL ** buffer,world_fmt * world,long locus,long maxrep,long numpop)2431 long pack_mighist_buffer_old (MYREAL **buffer, world_fmt * world,
2432 long locus, long maxrep, long numpop)
2433 {
2434 long i;
2435 long j;
2436 long bufsize = 1;
2437 long z = 0;
2438 mighistloci_fmt *aa;
2439 long thin = 1;
2440 MYREAL *tmp;
2441 aa = &world->mighistloci[locus];
2442 for (j = 0; j < aa->mighistnum; j++)
2443 {
2444 //aa->mighist[j].migeventsize = 1;
2445 bufsize += aa->mighist[j].migeventsize;
2446 }
2447 // trial code to avoid a crash due to not enough memory available for buffer
2448 if((tmp = (MYREAL *) myrealloc ((*buffer), sizeof (MYREAL) * (bufsize+1))) == NULL)
2449 {
2450 do
2451 {
2452 thin *= 10;
2453 warning("Recovery mode for the migration events: thinning the events list by a factor of %li",thin);
2454 bufsize = 0;
2455 for (j = 0; j < aa->mighistnum; j+=thin)
2456 {
2457 if(j<aa->mighistnum)
2458 bufsize += aa->mighist[j].migeventsize;
2459 }
2460 tmp = (MYREAL *) myrealloc ((*buffer), sizeof (MYREAL) * (bufsize+1));
2461 }
2462 while(tmp == NULL || thin < 10000);
2463 }
2464 (*buffer) = tmp;
2465 memset (*buffer, 0, sizeof (MYREAL) * (bufsize+1));
2466
2467 (*buffer)[z++] = (MYREAL)(aa->mighistnum/thin);
2468 for (j = 0; j < aa->mighistnum; j+=thin)
2469 {
2470 if(j < aa->mighistnum)
2471 {
2472 #ifdef DEBUG_MPI
2473 printf ("%i> packmighistbuffer: %li %li %li\n", myID, aa->mighist[j].copies,
2474 aa->mighist[j].weight, aa->mighist[j].migeventsize);
2475 #endif
2476 (*buffer)[z++] = (MYREAL) aa->mighist[j].copies;
2477 (*buffer)[z++] = (MYREAL) aa->mighist[j].weight;
2478 (*buffer)[z++] = (MYREAL) aa->mighist[j].migeventsize;
2479
2480 for (i = 0; i < aa->mighist[j].migeventsize; i++)
2481 {
2482 (*buffer)[z++] = aa->mighist[j].migevents[i].age;
2483 (*buffer)[z++] = (MYREAL) aa->mighist[j].migevents[i].from;
2484 (*buffer)[z++] = (MYREAL) aa->mighist[j].migevents[i].to;
2485 (*buffer)[z++] = (MYREAL) aa->mighist[j].migevents[i].sumlines;
2486 }
2487 }
2488 }
2489 if(bufsize < z)
2490 {
2491 fprintf(stderr,"%i> bufsize=%li < z=%li\n",myID,bufsize,z);
2492 error("pack_mighist_buffer() failed\n");
2493 }
2494 return z;
2495 }
2496 ///
2497 /// pack migration events and coalescence events into buffer to
2498 /// ship to master
pack_mighist_buffer(MYREAL ** buffer,world_fmt * world,long locus,long maxrep,long numpop)2499 long pack_mighist_buffer (MYREAL **buffer, world_fmt * world,
2500 long locus, long maxrep, long numpop)
2501 {
2502 long i;
2503 long pop;
2504 long z = 0;
2505 long bufsize = 0;
2506 long * eventbinnum = NULL;
2507 duo ** eventbins;
2508 mighistloci_fmt *aa;
2509
2510 aa = &world->mighistloci[locus];
2511 eventbins = aa->migeventbins;
2512 eventbinnum = aa->migeventbinnum;
2513 for (pop = (world->options->mighist_all ? 0 : world->numpop);
2514 pop < world->numpop2; pop++)
2515 {
2516 bufsize += 1 + 2 * eventbinnum[pop];
2517 }
2518 (*buffer) = (MYREAL *) myrealloc ((*buffer), sizeof (MYREAL) * (bufsize+1));
2519
2520 for (pop = (world->options->mighist_all ? 0 : world->numpop);
2521 pop < world->numpop2; pop++)
2522 {
2523 (*buffer)[z++] = (MYREAL)(eventbinnum[pop]);
2524 for (i = 0; i < eventbinnum[pop]; i++)
2525 {
2526 (*buffer)[z++] = (MYREAL) eventbins[pop][i][0];
2527 (*buffer)[z++] = (MYREAL) eventbins[pop][i][1];
2528 }
2529 }
2530 if(bufsize < z)
2531 error("pack_mighist_buffer() has a memory problem");
2532 return z;
2533 }
2534
2535 ///
2536 /// receive buffer with skyline content and unpack it
2537 void
unpack_skyline_buffer(MYREAL * buffer,world_fmt * world,long locus,long maxrep,long numpop)2538 unpack_skyline_buffer (MYREAL *buffer, world_fmt * world,
2539 long locus, long maxrep, long numpop)
2540 {
2541 //const MYREAL invmaxrep = 1./((world->options->replicate
2542 // && world->options->replicatenum >
2543 // 0) ? world->options->replicatenum : 1);
2544 const long np2 = numpop * numpop;
2545 long i;
2546 long j;
2547 long templ;
2548 long allocsize;
2549 long z = 0;
2550 long *receive_eventbinnum;
2551 MYREAL temp;
2552 mighistloci_fmt *aa;
2553
2554 receive_eventbinnum = (long *) mycalloc(np2, sizeof(long));
2555
2556 aa = &world->mighistloci[locus];
2557 // read the number of bins for each parameter
2558 for(j=0; j< world->numpop2; j++)
2559 {
2560 templ = (long) buffer[z++];
2561 receive_eventbinnum[j] = templ;
2562 if(templ >= aa->eventbinnum[j])
2563 {
2564 allocsize = templ + 1;
2565 aa->eventbins[j] = (tetra *) myrealloc (aa->eventbins[j], sizeof (tetra) * allocsize);
2566 //memset(aa->eventbins[j] + aa->eventbinnum[j],0,sizeof(tetra)*(templ-aa->eventbinnum[j]));
2567 for(i=aa->eventbinnum[j];i < allocsize; i++)
2568 {
2569 aa->eventbins[j][i][0] = 0.F;
2570 aa->eventbins[j][i][1] = 0.F;
2571 aa->eventbins[j][i][2] = 0.F;
2572 aa->eventbins[j][i][3] = 0.F;
2573 aa->eventbins[j][i][4] = 0.F;
2574 aa->eventbins[j][i][5] = 0.F;
2575 aa->eventbins[j][i][6] = 0.F;
2576 aa->eventbins[j][i][7] = 0.F;
2577 aa->eventbins[j][i][8] = 0.F;
2578 }
2579 aa->eventbinnum[j] = allocsize;
2580 //debug_skyline(world,"increased eventbins in unpack-skyline");
2581 }
2582 }
2583 // read time width of bins
2584 temp = (MYREAL) buffer[z++];
2585 if(temp - world->options->eventbinsize > EPSILON)
2586 error("problem with bins size transmission in unpack_skyline...");
2587 for(j=0; j< world->numpop2; j++)
2588 {
2589 for (i = 0; i < receive_eventbinnum[j]; i++)
2590 {
2591 aa->eventbins[j][i][0] += (float) buffer[z++];// * invmaxrep;
2592 aa->eventbins[j][i][1] += (float) buffer[z++];// * invmaxrep;
2593 aa->eventbins[j][i][2] += (float) buffer[z++];// * invmaxrep;
2594 aa->eventbins[j][i][3] += (float) buffer[z++];// * invmaxrep;
2595 aa->eventbins[j][i][4] += (float) buffer[z++];// * invmaxrep;
2596 aa->eventbins[j][i][5] += (float) buffer[z++];// * invmaxrep;
2597 }
2598 }
2599 //debug_skyline(world,"after unpack_skyline_buffer");
2600 myfree(receive_eventbinnum);
2601 }
2602
2603 ///
2604 /// pack skyline into buffer to
2605 /// ship to master
pack_skyline_buffer(MYREAL ** buffer,world_fmt * world,long locus,long maxrep,long numpop)2606 long pack_skyline_buffer (MYREAL **buffer, world_fmt * world,
2607 long locus, long maxrep, long numpop)
2608 {
2609 long j, i;
2610 long bufsize = world->numpop2 + 1;
2611 long z = 0L;
2612 mighistloci_fmt *aa;
2613
2614 aa = &world->mighistloci[locus];
2615 //printf("Buffer in pack_skyline()=%f (%p)(%p)\n",(*buffer)[0], (*buffer), buffer);
2616 for (j = 0; j < world->numpop2; j++)
2617 {
2618 // printf("%i> bufsize=%li eventbinnum=%li\n",myID, bufsize, aa->eventbinnum[j]);
2619 bufsize += 6 * aa->eventbinnum[j];
2620 }
2621 // fprintf(stdout,"%i> bufsize in pack-skyline-buffer %li for locus %li\n", myID, bufsize, locus);
2622 //myfree(*buffer);
2623 (*buffer) = (MYREAL *) myrealloc ((*buffer), sizeof (MYREAL) * (bufsize));
2624 //(*buffer) = (MYREAL *) mycalloc(bufsize,sizeof (MYREAL));
2625 // memset (*buffer, 0, sizeof (MYREAL) * (bufsize));
2626 // record how many bins in for each parameter
2627 for (j = 0; j < world->numpop2; j++)
2628 {
2629 (*buffer)[z++] = (MYREAL) aa->eventbinnum[j];
2630 }
2631 // time width of bins
2632 (*buffer)[z++] = (MYREAL) world->options->eventbinsize;
2633 for (j = 0; j < world->numpop2; j++)
2634 {
2635 for (i = 0; i < aa->eventbinnum[j]; i++)
2636 {
2637 (*buffer)[z++] = (MYREAL) aa->eventbins[j][i][0];
2638 (*buffer)[z++] = (MYREAL) aa->eventbins[j][i][1];
2639 (*buffer)[z++] = (MYREAL) aa->eventbins[j][i][2];
2640 (*buffer)[z++] = (MYREAL) aa->eventbins[j][i][3];
2641 (*buffer)[z++] = (MYREAL) aa->eventbins[j][i][4];
2642 (*buffer)[z++] = (MYREAL) aa->eventbins[j][i][5];
2643 }
2644 }
2645 if(bufsize < z)
2646 {
2647 fprintf(stderr,"error: bufsize=%li, z=%li\n", bufsize, z);
2648 error("pack_skyline_buffer: bufsize is too small");
2649 }
2650 return z;
2651 }
2652
2653
2654 ///
2655 /// unpack bayes parameters to fit into mpi_results_master()
2656 void
unpack_bayes_buffer(MYREAL * buffer,world_fmt * world,long locus,long maxrep,long numpop)2657 unpack_bayes_buffer (MYREAL *buffer, world_fmt * world,
2658 long locus, long maxrep, long numpop)
2659 {
2660 // this combines the single_bayes and hist_bayes buffer unpacking
2661 unpack_hist_bayes_buffer(buffer, world->bayes, world, locus);
2662 }
2663
2664 ///
2665 /// pack bayes parameters to fit into mpi_results_worker()
2666 long
pack_bayes_buffer(MYREAL ** buffer,world_fmt * world,long locus,long maxrep,long numpop)2667 pack_bayes_buffer (MYREAL **buffer, world_fmt * world,
2668 long locus, long maxrep, long numpop)
2669 {
2670 long i;
2671 long bufsize;
2672 long z;
2673 long sizec = sizeof(MYREAL);
2674 // long numbins = 0;
2675 bayes_fmt *bayes = world->bayes;
2676 long np2 = world->numpop2;
2677 long npp = np2 + bayes->mu;
2678 bayeshistogram_fmt *hist;
2679 // memory for pack_single_bayes_buffer_part()
2680 // locus and numparams + accept and trial of genealogy
2681 bufsize = 4;
2682 for(i=0; i < npp; i++)
2683 {
2684 if(bayes->map[i][1] != INVALID)
2685 {
2686 // per parameter
2687 //for each parameter acceptance and trial +2 // for each parameter
2688 bufsize += 2;
2689 }
2690 }
2691
2692 // hist_bayes_buffer:
2693 // max buffer memory needed is (npp + 11*npp + (3 * npp * hist->bins[i])
2694 hist = &(world->bayes->histogram[locus]);
2695 for(i=0; i < npp; i++)
2696 {
2697 if(bayes->map[i][1] != INVALID && !world->options->has_bayesmdimfile)
2698 {
2699 //for each parameter
2700 // bins number +1
2701 // datastore +11
2702 // bins*3 +3*bins
2703 bufsize += 12;
2704 bufsize += 3 * hist->bins[i]; //set50, set95, result per bin
2705 }
2706 }
2707 bufsize += npp*npp;
2708 // pack_BF_buffer
2709 // hmscale, hm +2
2710 // bf: number of heated chains +world->options->heated_chains
2711 bufsize += 2 + world->options->heated_chains;
2712 // Autoarchive, ESS buffer: parameters +2*(numpop2+loci)
2713 // genealogy +2
2714 bufsize += 2*npp + 2;
2715 //test printf("%i> bufsize=%li (npp=%li, numbins=%li)\n",myID, bufsize, npp, numbins);
2716 (*buffer) = (MYREAL *) myrealloc(*buffer, sizeof(MYREAL) * bufsize);
2717 memset (*buffer, 0,sizec * bufsize);
2718
2719 z = pack_single_bayes_buffer_part(buffer,world->bayes,world,locus);
2720 //printf("%i> z=%li (%li)\n",myID,z, 2 + 2 * (np2+1));
2721 if(!world->options->has_bayesmdimfile)
2722 z = pack_hist_bayes_buffer(buffer, hist, world, z);
2723 // BF material
2724 if(!world->data->skiploci[locus])
2725 {
2726 // fprintf(stderr,"%i> pack_result_buffer(): packed BF result locus=%li replicate %li\n",myID,locus, -1);
2727 z = pack_BF_buffer(buffer, z, locus, world);
2728 if(z > bufsize)
2729 {
2730 fprintf(stderr,"%i> ERROR: allocated bufsize=%li is smaller than used bufsize=%li\n",myID, bufsize, z);
2731 error("buffer allocation overflowed");
2732 }
2733 }
2734 // ESS material
2735 z = pack_ess_buffer(buffer, z, world);
2736 if(z > bufsize)
2737 {
2738 fprintf(stderr,"%i> ERROR: allocated bufsize=%li is smaller than used bufsize=%li\n",myID, bufsize, z);
2739 error("buffer allocation overflowed");
2740 }
2741 return z;
2742 }
2743
2744 ///
2745 /// unpack the bayes histogram
unpack_hist_bayes_buffer(MYREAL * buffer,bayes_fmt * bayes,world_fmt * world,long locus)2746 void unpack_hist_bayes_buffer(MYREAL *buffer, bayes_fmt *bayes, world_fmt *world, long locus)
2747 {
2748 long j, i;
2749 long pa;
2750 long z = 0;
2751 long numbins = 0;
2752 long pnum;
2753 long tmp1, tmp2;
2754 long tmplocus;
2755 bayeshistogram_fmt *hist;
2756 long total = 0;
2757 long np2 = world->numpop2;
2758 long npp = np2 + bayes->mu;
2759 long npp11 = 11 * npp;
2760 //
2761 // begin unpack_single_bayes_buffer_part
2762 tmplocus = (long) buffer[z++];
2763 pnum = (long) buffer[z++];
2764 #ifdef DEBUG
2765 const MYREAL rat = numcpu / (world->maxreplicate * world->loci);
2766 fprintf (stdout, "%i> unpack_hist_bayes_buffer() received pnum=%li, rat=%f\n", myID, pnum, rat);fflush(stdout);
2767 #endif
2768 if(tmplocus!=locus)
2769 {
2770 bayes->numparams=0;
2771 locus = tmplocus;
2772 }
2773
2774 // Cesky Krumlov 2013 printf("%i> original accepted/trials: [%li,%li,%li,%li,%li]/[%li,%li,%li,%li,%li]\n",
2775 // Cesky Krumlov 2013 myID, world->accept_archive[0],world->accept_archive[1],world->accept_archive[2],world->accept_archive[3],world->accept_archive[4],world->trials_archive[0],world->trials_archive[1],world->trials_archive[2],world->trials_archive[3],world->trials_archive[4]);
2776
2777 for (j = 0; j < np2; ++j)
2778 {
2779 if(bayes->map[j][1] != INVALID)
2780 {
2781 world->accept_archive[j] += (tmp1 = (long) buffer[z++]);//*rat);
2782 world->trials_archive[j] += (tmp2 = (long) buffer[z++]);//*rat);
2783 #ifdef DEBUG
2784 fprintf (stdout, "%i> received (acc %li) (trial %li) => (sumacc %li) (sumtrial %li)\n", myID, tmp1, tmp2, world->accept_archive[j], world->trials_archive[j]);
2785 #endif
2786 }
2787 }
2788 // genealogy
2789 world->accept_archive[j] += (tmp1 = (long) buffer[z++]);//*rat);
2790 world->trials_archive[j] += (tmp2 = (long) buffer[z++]);//*rat);
2791 #ifdef DEBUG
2792 fprintf (stdout, "%i> genealogy received (acc %li) (trial %li) => (sumacc %li) (sumtrial %li)\n", myID, tmp1, tmp2, world->accept_archive[j], world->trials_archive[j]);
2793 #endif
2794 // end unpack_single_bayes_buffer
2795 //
2796 if(!world->options->has_bayesmdimfile)
2797 {
2798 bayes->histogram[locus].datastore = (MYREAL *) myrealloc(bayes->histogram[locus].datastore, sizeof(MYREAL) * (1+npp11));
2799 hist = &(bayes->histogram[locus]);
2800 hist->minima = hist->datastore; // contains minimal values for each parameter
2801 hist->maxima = hist->datastore + npp; // contains maximal values for each parameter
2802 hist->adjmaxima = hist->datastore + 2*npp;// holds maxima values used in histogram [are smaller than maxima]
2803 hist->cred50l = hist->datastore + 3*npp; // holds 50%-credibility margins (<all lower values>,
2804 hist->cred50u = hist->datastore + 4*npp; //<all high values>)
2805 hist->cred95l = hist->datastore + 5*npp; // holds 95%-credibility margins (<all lower values>)
2806 hist->cred95u = hist->datastore + 6*npp; //<all high values>)
2807 hist->modes = hist->datastore + 7*npp; // holds 95%-credibility margins (<all lower values>, <all high values>)
2808 hist->medians = hist->datastore + 8*npp;
2809 hist->means = hist->datastore + 9*npp;
2810 hist->stds = hist->datastore + 10*npp;
2811
2812
2813 for(i = 0; i < npp; ++i)
2814 {
2815 if(bayes->map[i][1] != INVALID)
2816 {
2817 hist->bins[i] = (long) buffer[z++];
2818 total += hist->bins[i];
2819 }
2820 }
2821 // this steps kills poor memory machines [setting results to floats may help a little]
2822 hist->results = (float *) mycalloc(total * npp, sizeof(float));
2823 hist->set95 = (char *) mycalloc(total * npp * 2 + 2, sizeof(char));
2824 hist->set50 = hist->set95 + (total * npp + 1);
2825
2826 for(i = 0; i < npp; ++i)
2827 {
2828 if(bayes->map[i][1] != INVALID)
2829 {
2830 for(j=0;j<11;j++)
2831 hist->datastore[11*i+j] = buffer[z++];
2832 }
2833 }
2834 numbins = 0;
2835 for(pa=0; pa < npp; pa++)
2836 {
2837 for(i=0;i<hist->bins[pa];i++)
2838 {
2839 hist->set50[numbins + i] = (buffer[z++] < 1.0 ? '0' : '1');
2840 hist->set95[numbins + i] = (buffer[z++] < 1.0 ? '0' : '1');
2841 hist->results[numbins + i] = (float) buffer[z++];
2842 }
2843 numbins += hist->bins[pa];
2844 //
2845 // CHECK
2846 world->bayes->histtotal[locus * npp + pa] = hist->bins[pa];
2847 //
2848 }
2849 if(hist->covariance==NULL)
2850 {
2851 doublevec2d(&hist->covariance,npp,npp);
2852 }
2853 for(i=0;i<npp;i++)
2854 {
2855 for(j=0;j<npp;j++)
2856 {
2857 hist->covariance[i][j] = buffer[z++];
2858 }
2859 }
2860 }
2861 // BF material
2862 z = unpack_BF_buffer(buffer, z, locus, world);
2863 // ESS material
2864 z = unpack_ess_buffer(buffer, z, world);
2865 }
2866
2867 ///
2868 /// Bayes factor material buffer unpacker
unpack_BF_buffer(MYREAL * buffer,long start,long locus,world_fmt * world)2869 long unpack_BF_buffer(MYREAL *buffer, long start, long locus, world_fmt * world)
2870 {
2871 long i;
2872 long z = start;
2873 long hc = world->options->heated_chains;
2874 MYREAL temp;
2875 MYREAL *ttemp;
2876 MYREAL *htemp;
2877
2878 ttemp = calloc(3 * world->loci + world->options->heated_chains + 2, sizeof(MYREAL));
2879 htemp = ttemp + world->loci;
2880 //atemp = htemp + world->loci;
2881 //
2882 // harmonic mean calculation
2883 temp = buffer[z++]; //hmscale
2884 if(temp < world->hmscale[locus])
2885 {
2886 if(world->hm[locus]>0.0)
2887 world->hm[locus] *= EXP(temp - world->hmscale[locus]);
2888 world->hmscale[locus] = temp;
2889 // printf("%i> locus=%li hmscale=%f hm=%f temp=%f\n",myID,locus, world->hmscale[locus], world->hm[locus], temp);
2890 }
2891 htemp[locus] = temp;//hmscale store
2892 world->hm[locus] += EXP(-htemp[locus]+world->hmscale[locus]) * buffer[z++];
2893
2894 // thermodynamic integration
2895 for(i=0;i < world->options->heated_chains; i++)
2896 {
2897 world->bf[locus * hc + i] += buffer[z++];
2898 //printf("%i> ****bf[%li + hc + %li]=%f\n",myID, locus, i, world->bf[locus*hc+i]);
2899 }
2900
2901 myfree(ttemp);
2902 return z;
2903 }
2904
unpack_ess_buffer(MYREAL * buffer,long start,world_fmt * world)2905 long unpack_ess_buffer(MYREAL *buffer, long start, world_fmt *world)
2906 {
2907 long pa;
2908 long z = start;
2909 static long n=1;
2910 long np2 = world->numpop2;
2911 long npp = world->numpop2 + ((long) world->bayes->mu) * world->loci;
2912
2913 // MYREAL *atemp;
2914 #ifdef DEBUG
2915 const MYREAL rat = (MYREAL) numcpu / (world->maxreplicate * world->loci);
2916 fprintf (stdout, "%i> unpack_ess_buffer() rat=%i / (%li * %li) = %f\n", myID, numcpu, world->maxreplicate, world->loci, rat);fflush(stdout);
2917 #endif
2918
2919
2920 //printf("\n%i> npp =%li\n",myID,npp);
2921 //printf("%i> mu =%i\n",myID,(int) world->bayes->mu);
2922 //printf("%i> loci=%li\n",myID,world->loci);
2923 //printf("%i> unpack_ess_buffer ***unpacking BF[0]=%f hm=%f npp=%li in unpack_hist_bayes_buffer()\n", myID, world->bf[0], world->hm[0], npp);
2924 // unpacking autocorrelation and ess buffer
2925 for(pa=0; pa < np2; pa++)
2926 {
2927 if(world->bayes->map[pa][1] != INVALID)
2928 world->auto_archive[pa] += (buffer[z++] - world->auto_archive[pa])/n;
2929 }
2930 if(world->bayes->mu)
2931 {
2932 for(pa=np2;pa<npp;pa++)
2933 world->auto_archive[pa] += (buffer[z++] - world->auto_archive[pa])/n;
2934 }
2935 //genealogy
2936 world->auto_archive[pa] += (buffer[z++] - world->auto_archive[pa])/n;
2937 n++;
2938 // printf("%i>>>>>> autoarchive %f\n",myID, world->auto_archive[0]);
2939 for(pa=0; pa < np2; pa++)
2940 {
2941 if(world->bayes->map[pa][1] != INVALID)
2942 world->ess_archive[pa] += buffer[z++];// * rat;
2943 }
2944 if(world->bayes->mu)
2945 {
2946 for(pa=np2;pa<npp;pa++)
2947 world->ess_archive[pa] += buffer[z++];// * rat;
2948 }
2949 //genealogy
2950 world->ess_archive[pa] += buffer[z++];// * rat;
2951 return z;
2952 }
2953
2954 ///
2955 /// Bayes factor material buffer packer
pack_BF_buffer(MYREAL ** buffer,long start,long locus,world_fmt * world)2956 long pack_BF_buffer(MYREAL **buffer, long start, long locus, world_fmt * world)
2957 {
2958 // buffer memory needs are (2 + #heatedchains)
2959 long i;
2960 long z = start;
2961 const long hc = world->options->heated_chains;
2962 (*buffer)[z++] = world->hmscale[locus];
2963 (*buffer)[z++] = world->hm[locus];
2964 for(i=0; i < hc; i++)
2965 {
2966 (*buffer)[z++] = world->bf[locus * hc + i];
2967 }
2968 // printf("%i> locus=%li send hmscale=%f hm=%f\n",myID, locus,world->hmscale[locus],world->hm[locus]);
2969 return z;
2970 }
2971
2972 /// packing autocorrelation and ess buffer
pack_ess_buffer(MYREAL ** buffer,long start,world_fmt * world)2973 long pack_ess_buffer(MYREAL **buffer, long start, world_fmt *world)
2974 {
2975 // buffer memory needs are (npp + 1) + (npp+1) (numpop2 + mu*loci)
2976 long i;
2977 long z = start;
2978 const long np2 = world->numpop2;
2979 const long npp = np2 + ((long) world->bayes->mu * world->loci);
2980 (*buffer) = (MYREAL *) myrealloc(*buffer, (start + ((npp + 1) + (npp+1) * npp))*sizeof(MYREAL));
2981 //printf("\n%i> npp =%li\n",myID,npp);
2982 //printf("%i> mu =%li\n",myID,(long)world->bayes->mu);
2983 //printf("%i> loci=%li\n",myID,world->loci);
2984 //printf("%i>> pack_essbuffer: npp=%li autoarchive %f\n",myID, npp, world->auto_archive[0]);
2985 for(i=0;i<np2;i++)
2986 {
2987 if(world->bayes->map[i][1] != INVALID)
2988 (*buffer)[z++] = world->auto_archive[i];
2989 //printf("%i> auto: %f\n",myID,world->auto_archive[i]);
2990 }
2991 //rates for each locus
2992 if(world->bayes->mu)
2993 {
2994 for(i=np2;i<npp;i++)
2995 (*buffer)[z++] = world->auto_archive[i];
2996 }
2997 // genealogy
2998 (*buffer)[z++] = world->auto_archive[i];
2999 //printf("%i> auto G: %f\n",myID,world->auto_archive[i]);
3000 for(i=0;i<np2;i++)
3001 {
3002 if(world->bayes->map[i][1] != INVALID)
3003 (*buffer)[z++] = world->ess_archive[i];
3004 //printf("%i> ess: %f\n",myID,world->ess_archive[i]);
3005 }
3006 if(world->bayes->mu)
3007 {
3008 for(i=np2;i<npp;i++)
3009 (*buffer)[z++] = world->ess_archive[i];
3010 }
3011 //genealogy
3012 (*buffer)[z++] = world->ess_archive[i];
3013 //printf("%i> ess G: %f\n",myID,world->ess_archive[i]);
3014 return z;
3015 }
3016
3017 ///
3018 /// pack the bayes histogram
pack_hist_bayes_buffer(MYREAL ** buffer,bayeshistogram_fmt * hist,world_fmt * world,long startposition)3019 long pack_hist_bayes_buffer(MYREAL **buffer, bayeshistogram_fmt *hist, world_fmt * world, long startposition)
3020 {
3021 // buffer memory needed is (npp + 11*npp + (3 * npp * hist->bins[i])
3022 long j;
3023 long i;
3024 long npp = world->numpop2 + world->bayes->mu;
3025 long numbins = 0;
3026 long z = startposition;
3027 bayes_fmt *bayes = world->bayes;
3028 #ifdef DEBUG_MPI
3029 printf("%i> pack_hist_bayes_buffer: position=%li last value = %f numparams=%li npp=%li\n",myID, startposition, (z > 0) ? (*buffer)[startposition] : -9999., hist->numparam, npp);
3030 #endif
3031
3032 for(i = 0; i < npp; ++i)
3033 {
3034 if(bayes->map[i][1] != INVALID)
3035 {
3036 (*buffer)[z++] = (MYREAL) hist->bins[i];
3037 }
3038 }
3039 // printf("%i> npp=%li, z=%li\n",myID,npp,z);
3040 // parameter and mu datastore
3041 for(i = 0; i < npp; ++i)
3042 {
3043 if(bayes->map[i][1] != INVALID)
3044 {
3045 for(j=0;j<11;j++)
3046 (*buffer)[z++] = (MYREAL) hist->datastore[11*i+j];
3047 }
3048 }
3049 numbins = 0;
3050 //printf("%i> npp2+=%li, z=%li\n",myID,npp+npp11,z);
3051 for(i=0; i < npp; i++)
3052 {
3053 // bins are zero for "c" and "0" parameters
3054 // if(bayes->map[i][1] != INVALID)
3055 // {
3056 for(j=0;j<hist->bins[i];j++)
3057 {
3058 (*buffer)[z++] = (MYREAL) (hist->set50[numbins + j]=='1' ? 1.0 : 0.0);
3059 (*buffer)[z++] = (MYREAL) (hist->set95[numbins + j]=='1' ? 1.0 : 0.0);
3060 (*buffer)[z++] = (MYREAL) hist->results[numbins + j];
3061 }
3062 // }
3063 numbins += hist->bins[i];
3064 }
3065 // pack covariance matrix
3066 for(i=0;i<npp;i++)
3067 {
3068 for(j=0;j<npp;j++)
3069 {
3070 (*buffer)[z++] = hist->covariance[i][j];
3071 }
3072 }
3073 #ifdef DEBUG_MPI
3074 printf("%i> pack_hist_bayes_buffer: position=%li numbins=%li, last value = %f\n",myID, z, numbins, (*buffer)[z-1]);
3075 #endif
3076 return z;
3077 }
3078
3079
3080 ///
3081 /// unpack bayes parameter buffer, sent from replicant nodes AND lociworker to the master
3082 /// the values will be simply added to the bayes->params, no records of replicates will be done.
unpack_single_bayes_buffer(MYREAL * buffer,bayes_fmt * bayes,world_fmt * world,long locus)3083 long unpack_single_bayes_buffer(MYREAL *buffer,bayes_fmt * bayes, world_fmt * world,long locus)
3084 {
3085 long i, j;
3086 long z = 0 ;
3087 long pnum;
3088 long tmp1, tmp2;
3089 long tmplocus;
3090 long allocparams = world->bayes->allocparams;
3091 // long oldallocparams = world->bayes->allocparams;
3092 long repstart;
3093 long repstop;
3094 long nn = 2+world->numpop2 + (world->bayes->mu) ;
3095 set_replicates (world, world->repkind, world->options->replicatenum,
3096 &repstart, &repstop);
3097 tmplocus = (long) buffer[z++];
3098 pnum = (long) buffer[z++];
3099 //fprintf (stdout, "%i> received locus=%li (oldlocus=%li) pnum=%li\n", myID, tmplocus, locus, pnum);
3100 if(tmplocus!=locus)
3101 world->bayes->numparams=0;
3102
3103 pnum += world->bayes->numparams;
3104 if(pnum >=world->bayes->allocparams)
3105 {
3106 allocparams = pnum + 1;
3107 world->bayes->params = (MYREAL *) myrealloc(world->bayes->params,sizeof(MYREAL)*allocparams*nn);
3108 }
3109 world->bayes->allocparams = allocparams;
3110 for(i = world->bayes->numparams; i < pnum; ++i)
3111 {
3112 // the first element is log(p(d|g)p(g|param))
3113 (world->bayes->params+(nn*i))[0] = buffer[z++];
3114 // the second element is log(p(d|g))
3115 (world->bayes->params+(nn*i))[1] = buffer[z++];
3116 //fprintf (stdout, "%i> receive params line %li ", myID, i);
3117 for (j = 2; j < nn; ++j)
3118 {
3119 if(bayes->map[j-2][1] != INVALID)
3120 (world->bayes->params+(nn*i))[j] = buffer[z++];
3121 //fprintf (stdout, "%f ", (world->bayes->params+(nn*i + 1))[j]);
3122 }
3123 //fprintf (stdout, "\n");
3124 }
3125 world->bayes->numparams = pnum;
3126 // acceptance ratios are added to the ones we have already
3127 // parameter acceptances
3128
3129 // Cesky Krumlov 2013 printf("%i> **original accepted/trials: [%li,%li,%li,%li,%li]/[%li,%li,%li,%li,%li]\n",
3130 // Cesky Krumlov 2013 myID, world->accept_archive[0],world->accept_archive[1],world->accept_archive[2],world->accept_archive[3],world->accept_archive[4],world->trials_archive[0],world->trials_archive[1],world->trials_archive[2],world->trials_archive[3],world->trials_archive[4]);
3131
3132 for (j = 0; j < world->numpop2; ++j)
3133 {
3134 if(bayes->map[j][1] != INVALID)
3135 {
3136 tmp1 = (long) buffer[z++];
3137 tmp2 = (long) buffer[z++];
3138 //Cesky Krumlov 2013
3139 world->accept_archive[j] += tmp1;
3140 //Cesky Krumlov 2013
3141 world->trials_archive[j] += tmp2;
3142 }
3143 }
3144 // the last acceptance is the one for the genealogies
3145 tmp1 = (long) buffer[z++];
3146 tmp2 = (long) buffer[z++];
3147 //Cesky Krumlov 2013
3148 world->accept_archive[j] += tmp1;
3149 //Cesky Krumlov 2013
3150 world->trials_archive[j] += tmp2;
3151 // Cesky Krumlov 2013 printf("%i> **updated accepted/trials: [%li,%li,%li,%li,%li]/[%li,%li,%li,%li,%li]\n",
3152 // Cesky Krumlov 2013 myID, world->accept_archive[0],world->accept_archive[1],world->accept_archive[2],world->accept_archive[3],world->accept_archive[4],world->trials_archive[0],world->trials_archive[1],world->trials_archive[2],world->trials_archive[3],world->trials_archive[4]);
3153
3154 //printf("%i> unpack_single_bayes_buffer(): buffer counter is at %li",myID, z);
3155 return z;
3156 }
3157
3158 ///
3159 /// Pack bayes parameter buffer, sent from replicant nodes AND lociworker to the master
3160 /// the values will be simply added to the bayes->params, no records of specific replicates are kept.
pack_single_bayes_buffer(MYREAL ** buffer,bayes_fmt * bayes,world_fmt * world,long locus)3161 long pack_single_bayes_buffer(MYREAL **buffer, bayes_fmt *bayes, world_fmt *world,long locus)
3162 {
3163 long i, j;
3164 long bufsize;
3165 long z = 0;
3166 // long nump = world->numpop2 + 1;
3167 long nn = 2 + world->numpop2 + (world->bayes->mu) ;
3168 const long nng= world->numpop2 + world->loci * world->bayes->mu + 1;
3169 bufsize = 2 * (world->numpop2 + 1); //acceptance ratio: params + tree
3170 bufsize += 2; // loci + numparams
3171 bufsize += world->bayes->numparams * nn;
3172 //printf("%i> bufsize in pack_single_bayes_buffer()=%li\n",myID,bufsize);
3173 (*buffer) = (MYREAL *) myrealloc(*buffer, bufsize * sizeof(MYREAL));
3174 memset (*buffer, 0, bufsize * sizeof(MYREAL));
3175 #ifdef DEBUG_MPI
3176 fprintf(stdout, "%i>>>>>\n buffersize=%li, numparams=%li\n>>>>\n",
3177 myID,
3178 bufsize,
3179 world->bayes->numparams
3180 );
3181 fflush(stdout);
3182 #endif
3183
3184 (*buffer)[z++] = (MYREAL) locus;
3185 (*buffer)[z++] = (MYREAL) bayes->numparams;
3186
3187 for(i = 0; i < world->bayes->numparams; ++i)
3188 {
3189 //the first and second elements are logprob
3190 (*buffer)[z++] = (bayes->params+(i*nn))[0];
3191 (*buffer)[z++] = (bayes->params+(i*nn))[1];
3192 for (j = 2; j < nn; ++j) //the first and second elements are logprob
3193 {
3194 if(bayes->map[j-2][1] != INVALID)
3195 (*buffer)[z++] = (bayes->params+(i*nn))[j];
3196 }
3197 }
3198 // for the parameters
3199 for (j = 0; j < world->numpop2; ++j)
3200 {
3201 if(bayes->map[j][1] != INVALID)
3202 {
3203 (*buffer)[z++] = (MYREAL) world->accept_archive[j];
3204 (*buffer)[z++] = (MYREAL) world->trials_archive[j];
3205 }
3206 }
3207 // for the genealogy
3208 (*buffer)[z++] = (MYREAL) world->accept_archive[j];
3209 (*buffer)[z++] = (MYREAL) world->trials_archive[j];
3210
3211 if(bufsize < z)
3212 error("buffer is too small in pack_single_bayes_buffer()\n");
3213 memset(world->accept_archive,0,sizeof(long)*2*nng);//Cesky Krumlov 2013
3214 return z;
3215 }
3216 ///
3217 /// Pack bayes parameter buffer, sent from replicant nodes AND lociworker to the master
3218 /// the acceptance and trial values will be simply added.
3219
pack_single_bayes_buffer_part(MYREAL ** buffer,bayes_fmt * bayes,world_fmt * world,long locus)3220 long pack_single_bayes_buffer_part(MYREAL **buffer, bayes_fmt *bayes, world_fmt *world,long locus)
3221 {
3222 long j;
3223 long z = 0;
3224 const long nng= world->numpop2 + world->loci * world->bayes->mu + 1;
3225
3226 (*buffer)[z++] = (MYREAL) locus;
3227 (*buffer)[z++] = (MYREAL) bayes->numparams;
3228
3229 // parameters
3230 for (j = 0; j < world->numpop2; ++j)
3231 {
3232 if(bayes->map[j][1] != INVALID)
3233 {
3234 (*buffer)[z++] = (MYREAL) world->accept_archive[j];
3235 (*buffer)[z++] = (MYREAL) world->trials_archive[j];
3236 }
3237 }
3238 // genealogy
3239 (*buffer)[z++] = (MYREAL) world->accept_archive[j];
3240 (*buffer)[z++] = (MYREAL) world->trials_archive[j];
3241 memset(world->accept_archive,0,sizeof(long)* 2 * nng); //removes accept and trials archive
3242 return z;
3243 }
3244
3245 ///
3246 /// unpack minimal statistic trees
3247 /// \todo there are differences between unpack_sumfile() and and read_savesum() this needs reconciliation
3248 void
unpack_sumfile_buffer(MYREAL * buffer,world_fmt * world,long locus,long maxrep,long numpop)3249 unpack_sumfile_buffer (MYREAL *buffer, world_fmt * world,
3250 long locus, long maxrep, long numpop)
3251 {
3252 long replicate;
3253 timearchive_fmt **ta = world->atl;
3254 long z=0;
3255 for (replicate = 0; replicate < maxrep; replicate++)
3256 {
3257 unpack_single_sumfile_buffer (buffer, ta, world, locus, replicate, numpop,&z);
3258 }
3259 }
3260
3261
3262
3263 ///
3264 /// unpack minimal statistic trees for a single replicate
3265 void
unpack_single_sumfile_buffer(MYREAL * buffer,timearchive_fmt ** ta,world_fmt * world,long locus,long replicate,long numpop,long * startz)3266 unpack_single_sumfile_buffer (MYREAL *buffer, timearchive_fmt **ta, world_fmt *world,
3267 long locus, long replicate, long numpop, long *startz)
3268 {
3269 long i, j;
3270 long z = *startz;
3271 long numpop2plus = numpop * numpop + 2 * numpop;
3272 ta[replicate][locus].T = (long) buffer[z++];
3273 ta[replicate][locus].numpop = (long) buffer[z++];
3274 ta[replicate][locus].sumtips = (long) buffer[z++];
3275 ta[replicate][locus].param_like = buffer[z++];
3276 //fprintf(stderr,"%i> unpack_single_sumfile: start_z=%li, replicate=%li, locus=%li ", myID, *startz, replicate, locus);
3277 world->chainlikes[locus][replicate] = ta[replicate][locus].param_like;
3278 //fprintf(stdout,"%i> got sumfile locus %li and replicate %li (%li)\n",myID,locus,replicate, ta[replicate][locus].allocT);
3279 increase_timearchive (world, locus, ta[replicate][locus].T, world->numpop, replicate);
3280 for (i = 0; i < ta[replicate][locus].T; i++)
3281 {
3282 ta[replicate][locus].tl[i].copies = buffer[z++];
3283 ta[replicate][locus].tl[i].lcopies = buffer[z++];
3284 for (j = 0; j < numpop2plus; j++)
3285 {
3286 ta[replicate][locus].tl[i].data[j] = buffer[z++];
3287 }
3288 }
3289 for (i = 0; i < world->numpop2; i++)
3290 {
3291 ta[replicate][locus].param[i] = buffer[z++];
3292 ta[replicate][locus].param0[i] = buffer[z++];
3293 }
3294 log_param0 (ta[replicate][locus].param0, ta[replicate][locus].lparam0, world->numpop2);
3295 ta[replicate][locus].trials = buffer[z++];
3296 ta[replicate][locus].normd = buffer[z++];
3297 *startz = z;
3298 //fprintf(stderr,"<%i> end_z=%li\n", myID, *startz);
3299 }
3300
3301
pack_single_sumfile_buffer(MYREAL ** buffer,long z,long * allocbufsize,world_fmt * world,long locus,long replicate,long numpop)3302 long pack_single_sumfile_buffer(MYREAL **buffer, long z, long *allocbufsize, world_fmt * world,
3303 long locus, long replicate, long numpop)
3304 {
3305 long i, j;
3306 long numpop2 = numpop * numpop;
3307 long numpop2plus = numpop2 + 2 * numpop;
3308 timearchive_fmt **ta = world->atl;
3309 long bufsize = ta[replicate][locus].T;
3310 bufsize *= numpop2plus;
3311 bufsize += 2 * ta[replicate][locus].T;
3312 bufsize += 2 * numpop2;
3313 bufsize += 6;
3314 if(*allocbufsize < bufsize)
3315 {
3316 (*buffer) = (MYREAL *) myrealloc (*buffer, bufsize * sizeof (MYREAL));
3317 *allocbufsize = bufsize;
3318 }
3319 (*buffer)[z++] = (MYREAL) ta[replicate][locus].T;
3320 (*buffer)[z++] = (MYREAL) ta[replicate][locus].numpop;
3321 (*buffer)[z++] = (MYREAL) ta[replicate][locus].sumtips;
3322 (*buffer)[z++] = ta[replicate][locus].param_like;
3323
3324 for (i = 0; i < ta[replicate][locus].T; i++)
3325 {
3326 (*buffer)[z++] = (MYREAL) ta[replicate][locus].tl[i].copies;
3327 (*buffer)[z++] = ta[replicate][locus].tl[i].lcopies;
3328 for (j = 0; j < numpop2plus; j++)
3329 {
3330 (*buffer)[z++] = ta[replicate][locus].tl[i].data[j];
3331 }
3332 }
3333 for (i = 0; i < numpop2; i++)
3334 {
3335 (*buffer)[z++] = ta[replicate][locus].param[i];
3336 (*buffer)[z++] = ta[replicate][locus].param0[i];
3337 }
3338 (*buffer)[z++] = (MYREAL) ta[replicate][locus].trials;
3339 (*buffer)[z] = ta[replicate][locus].normd;
3340 z++;
3341 return z;
3342 }
3343
3344 long
pack_sumfile_buffer(MYREAL ** buffer,world_fmt * world,long locus,long maxrep,long numpop)3345 pack_sumfile_buffer (MYREAL **buffer, world_fmt * world,
3346 long locus, long maxrep, long numpop)
3347 {
3348 long replicate;
3349 long bufsize = 1;
3350 long allocbufsize = 0;
3351 long z = 0;
3352 long numpop2 = numpop * numpop;
3353 long numpop2plus = numpop2 + 2 * numpop;
3354 timearchive_fmt **ta = world->atl;
3355
3356 for (replicate = 0; replicate < maxrep; replicate++)
3357 {
3358 bufsize = ta[replicate][locus].T;
3359 bufsize *= numpop2plus;
3360 bufsize += 2 * ta[replicate][locus].T;
3361 bufsize += 2 * numpop2;
3362 bufsize += 6;
3363 allocbufsize += bufsize;
3364 //fprintf(stderr,"%i> bufsize=%li allocbufsize=%li\n", myID, bufsize, allocbufsize);
3365 }
3366 (*buffer) = (MYREAL *) myrealloc ((*buffer), allocbufsize * sizeof (MYREAL));
3367 memset(*buffer, 0, sizeof(MYREAL) * allocbufsize);
3368 for (replicate = 0; replicate < maxrep; replicate++)
3369 {
3370 z = pack_single_sumfile_buffer(buffer, z, &allocbufsize, world, locus, replicate, numpop);
3371 //fprintf(stderr,"%i> PACKED SUMFILE FOR REPLICATE %li z=%li locus=%li allocbufsize=%li\n", myID, replicate, z, locus, allocbufsize);
3372 }
3373 bufsize = z;
3374 if(bufsize > allocbufsize)
3375 {
3376 fprintf(stderr,"bufsize=%li allocbufsize=%li\n", bufsize, allocbufsize);
3377 error("allocation exceeded in pack_sumfile_buffer");
3378 }
3379 return bufsize;
3380 }
3381
3382
3383 ///
3384 /// gather results (sumfiles, results, migrate-histogram, ..) from workers
3385 void
mpi_results_master(long sendtype,world_fmt * world,long maxreplicate,void (* unpack)(MYREAL * buffer,world_fmt * world,long locus,long maxrep,long numpop))3386 mpi_results_master (long sendtype, world_fmt * world, long maxreplicate,
3387 void (*unpack) (MYREAL *buffer, world_fmt * world,
3388 long locus, long maxrep, long numpop))
3389 {
3390 #ifdef DEBUG_MPI
3391 long ii;
3392 #endif
3393 long numpop = world->numpop;
3394 long bufsize = 1;
3395 // maxreplicate > 1 ---> add 1 [this all needs careful checking]
3396 // MIGMPI_SUMFILE -----> 0
3397 // MIGMPI_HIST -----> 0
3398 // still strange? long addon = (maxreplicate>1) ? 1 : ((sendtype == MIGMPI_SUMFILE) || (sendtype == MIGMPI_MIGHIST) )? 0 : ((world->loci == 1) ? 0 : 1) ;
3399 //long addon = (maxreplicate>1) ? 1 : ((world->loci > 1) ? 1 : 0) ;
3400 long addon = 1;
3401 // boolean done = FALSE;
3402 MYREAL *buffer;
3403 MYREAL *temp;
3404 int worker;
3405 long z, tag, sender;
3406 MPI_Status status;
3407 long numelem = world->numpop2 + (world->options->gamma ? 1 : 0);
3408 long numelem2 = 2 * numelem;
3409
3410 temp = (MYREAL *) mycalloc (numelem2 + 2, sizeof (MYREAL));
3411 buffer = (MYREAL *) mycalloc (bufsize+1, sizeof (MYREAL));
3412 temp[0] = (MYREAL)sendtype;
3413 temp[1] = (MYREAL) bufsize;
3414 for (worker = 1; worker < MIN (world->loci + addon, numcpu); worker++)
3415 {
3416 //printf("%i> MASTER requests information from n%i for locus %i\n",myID, worker, worker-1);
3417 MYMPISEND (temp, numelem2 + 2, mpisizeof, worker, worker, comm_world);
3418 }
3419 z = 0;
3420 while (z < world->loci)
3421 {
3422 MYMPIRECV (&bufsize, ONE, MPI_LONG, MPI_ANY_SOURCE, MPI_ANY_TAG,
3423 comm_world, &status);
3424 buffer = (MYREAL *) myrealloc (buffer, sizeof (MYREAL) * (bufsize + 1));
3425 memset (buffer, 0, sizeof (MYREAL) * (bufsize + 1));
3426 sender = status.MPI_SOURCE;
3427 tag = status.MPI_TAG;
3428 #ifdef DEBUG_MPI
3429 fprintf(stdout, "%i> z=%li worker=%li bufsize=%li -------------------------------------\n",myID, z, sender, bufsize);
3430 #endif
3431 MYMPIRECV (buffer, bufsize, mpisizeof, (MYINT) sender, (MYINT) tag, comm_world, &status);
3432 #ifdef DEBUG_MPI
3433 fprintf(stdout,"%i>------------------------------------\nbuffer=",myID);
3434 for(ii=0;ii<bufsize;ii++)
3435 fprintf(stdout," %f", buffer[ii]);
3436 fprintf(stdout,"\n%i>-------------------------------------\n",myID);
3437 #endif
3438 (*unpack) (buffer, world, tag - 1, maxreplicate, numpop);
3439 //fprintf(stdout,"%i> unpacked bufsize=%li from node %li\n",myID,bufsize,sender); fflush(stdout);
3440 z++;
3441 }
3442 myfree(buffer);
3443 myfree(temp);
3444 }
3445
3446 void
mpi_results_worker(long bufs,world_fmt * world,long maxrep,long (* pack)(MYREAL ** buffer,world_fmt * world,long locus,long maxrep,long numpop))3447 mpi_results_worker (long bufs, world_fmt * world, long maxrep,
3448 long (*pack) (MYREAL **buffer, world_fmt * world,
3449 long locus, long maxrep, long numpop))
3450 {
3451 long numpop = world->numpop;
3452 long ww, locus;
3453 MYREAL *allbuffer;
3454 long bufsize = 1;
3455 allbuffer = (MYREAL *) mycalloc(1, sizeof(MYREAL));
3456 //fprintf(stdout,"%i> locidone=%i\n",myID, locidone); fflush(stdout);
3457 for (ww = 0; ww < locidone; ww++)
3458 {
3459 locus = world->who[ww];
3460 bufsize = (*pack) (&allbuffer, world, locus, maxrep, numpop);
3461 #ifdef DEBUG_MPI
3462 fprintf(stdout,"%i> locus=%li after pack bufsize=%li\n",myID, locus, bufsize); fflush(stdout);
3463 #endif
3464 MYMPISEND (&bufsize, ONE, MPI_LONG, MASTER, (int) (locus + 1), comm_world);
3465 //fprintf(stdout,"%i> sending results from locus=%li using bufsize=%li to master \n",myID, locus, bufsize); fflush(stdout);
3466 MYMPISEND (allbuffer, bufsize, mpisizeof, MASTER, (int) (locus + 1), comm_world);
3467 }
3468 myfree(allbuffer);
3469 }
3470
3471 void
mpi_broadcast_results(world_fmt * world,long loci,long (* pack)(MYREAL ** buffer,world_fmt * world,long locus,long maxrep,long numpop),void (* unpack)(MYREAL * buffer,world_fmt * world,long locus,long maxrep,long numpop))3472 mpi_broadcast_results (world_fmt * world, long loci,
3473 long (*pack) (MYREAL **buffer, world_fmt * world,
3474 long locus, long maxrep, long numpop),
3475 void (*unpack) (MYREAL *buffer, world_fmt * world,
3476 long locus, long maxrep, long numpop))
3477 {
3478 long locus;
3479 // long addon = (world->loci == 1) 0 : 1;
3480 long bufsize=1;
3481 #ifdef DEBUG_MPI
3482 char nowstr[STRSIZE];
3483 #endif
3484 MYREAL *allbuffer = NULL;// = &world->buffer;
3485
3486 long maxreplicate = (world->options->replicate
3487 && world->options->replicatenum >
3488 0) ? world->options->replicatenum : 1;
3489 allbuffer = (MYREAL *) mycalloc (1, sizeof (MYREAL));
3490 #ifdef DEBUG_MPI
3491 get_time (nowstr, "%H:%M:%S");
3492 if(world->options->progress)
3493 fprintf(stdout, "%i> Redistributing the data\nResult parts [Time is %s]\n",myID, nowstr);
3494 #endif
3495 for (locus = 0; locus < loci; locus++)
3496 {
3497 if (myID == MASTER)
3498 {
3499 bufsize =(*pack) (&allbuffer, world, locus, maxreplicate,
3500 world->numpop);
3501 MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
3502 MYMPIBCAST (allbuffer, bufsize, mpisizeof, MASTER, comm_world);
3503 #ifdef DEBUG_MPI
3504 printf("%i> Locus %li results sent\n",myID, locus);
3505 #endif
3506 }
3507 else
3508 {
3509 MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
3510 allbuffer = (MYREAL *) myrealloc (allbuffer, sizeof (MYREAL) * bufsize + 1);
3511 MYMPIBCAST (allbuffer, bufsize, mpisizeof, MASTER, comm_world);
3512 (*unpack)(allbuffer, world, locus, maxreplicate,
3513 world->numpop);
3514 #ifdef DEBUG_MPI
3515 printf("%i> Locus %li results received\n",myID, locus);
3516 #endif
3517 }
3518 //myfree(allbuffer);
3519 //allbuffer=NULL;
3520 //memset (allbuffer, 0, sizeof (char) * bufsize);
3521 }
3522 MYMPIBARRIER(comm_world);
3523 myfree(allbuffer);
3524 }
3525
3526 // slownet profiler
3527 //
3528 #ifdef SLOWNET
3529
3530 ///
3531 /// orchestrates and prints the profile likelihoods using MPI
3532 /// when the slownet option for MPI work is chosen
3533 void
mpi_profiles_master(world_fmt * world,long nparam,int * profilewho)3534 mpi_profiles_master (world_fmt * world, long nparam, int *profilewho)
3535 {
3536
3537 boolean done;
3538 int tag;
3539 int sender = 0;
3540
3541 #ifdef PRETTY
3542 long location;
3543 #endif
3544 long pnum;
3545 long pdone;
3546 long bufsize;
3547 long quantsize;
3548 long failuresize;
3549 long tempstrsize=STRSIZE;
3550 long temp[4];
3551 long numsent = 0;
3552
3553 char *tempstr;
3554 char *leadstr;
3555 char **buffer = &world->buffer;
3556
3557 FILE *outfile = world->outfile;
3558
3559 MPI_Request *irequests;
3560 MPI_Status status;
3561 MPI_Status *istatus;
3562 long *profilelist;
3563 long minnodes;
3564 long newnparam = nparam;
3565 long parameternum;
3566 tempstr = (char*) mycalloc(tempstrsize,sizeof(char));
3567 leadstr = (char*) mycalloc(STRSIZE,sizeof(char));
3568 profilelist = (long*) mycalloc(world->numpop2+1,sizeof(long));
3569 newnparam = choose_profile_parameters (world, profilelist);
3570 minnodes = MIN (newnparam, numcpu - 1);
3571 irequests = (MPI_Request *) mycalloc(minnodes,sizeof(MPI_Request));
3572 istatus = (MPI_Status *) mycalloc(minnodes,sizeof(MPI_Status));
3573
3574 for (pnum = 0; pnum < minnodes; pnum++)
3575 {
3576 parameternum = profilelist[pnum];
3577 MYMPIISEND (¶meternum, 1, MPI_LONG, (MYINT) (pnum + 1), (MYINT) (pnum + 1), comm_world,&irequests[numsent]);
3578 //fprintf(stdout,"%i>>>>> sent parameter number %li to node %li with tag %li\n",myID,pnum,pnum+1,pnum+1);
3579 numsent++;
3580 }
3581 MYMPIWAITALL(minnodes,irequests, istatus);
3582
3583 for (pnum = 0; pnum < newnparam; pnum++)
3584 {
3585 done = FALSE;
3586 while(!done)
3587 {
3588 MYMPIRECV (leadstr, STRSIZE, MPI_CHAR, (MYINT) MPI_ANY_SOURCE, (MYINT) MPI_ANY_TAG, comm_world, &status);
3589 sender = status.MPI_SOURCE;
3590 tag = status.MPI_TAG;
3591 switch(leadstr[0])
3592 {
3593 case 'M':
3594 tempstrsize = atol(leadstr+1);
3595 tempstr = (char*) realloc(tempstr,sizeof(char)*(tempstrsize+1));
3596 memset(tempstr,0,sizeof(char)*(tempstrsize+1));
3597 // fprintf(stdout,"%i> ready to receive %li chars\n",myID,tempstrsize);
3598 MYMPIRECV (tempstr, tempstrsize, MPI_CHAR, (MYINT) sender, (MYINT) tag,
3599 comm_world, &status);
3600 handle_message(tempstr,sender, world);
3601 break;
3602 case 'P':
3603 //fprintf(stdout,"%i> ready to receive 4 longs\n",myID);
3604 MYMPIRECV (temp, FOUR, MPI_LONG, (MYINT) sender, (MYINT) MPI_ANY_TAG,
3605 comm_world, &status);
3606 pdone = temp[0];
3607 bufsize = temp[1];
3608 quantsize = temp[2];
3609 failuresize = temp[3];
3610 //fprintf(stdout,"%i> ++++++++++++ bufsize=%li quantsize=%li failuresize=%li from sender %i\n",
3611 // myID,bufsize,quantsize,failuresize,sender);
3612 profilewho[pdone] = sender;
3613 *buffer =
3614 (char *) myrealloc (*buffer, sizeof (char) * (bufsize + quantsize + failuresize + 1));
3615 //memset (*buffer, 0, sizeof (char) * (bufsize + quantsize + failuresize + 1));
3616 MYMPIRECV (*buffer, bufsize + quantsize + failuresize, MPI_CHAR, (MYINT) sender, (MYINT) tag,
3617 comm_world, &status);
3618 //fprintf(stdout,"@%s@\n\n@%s@\n\n\n",*buffer+bufsize,*buffer+bufsize+quantsize);
3619 //fprintf(stdout,"########################\n# %i> buf %li from %i \n########################\n",
3620 //myID, (long) (long) strlen(*buffer)+1, sender);
3621 if(world->options->printprofsummary)
3622 {
3623 unpack_quantile ((*buffer) + bufsize, world->quantiles[pdone],
3624 GRIDSIZE);
3625 unpack_failed_percentiles ((*buffer) + bufsize + quantsize, world->percentile_failed[pdone],
3626 GRIDSIZE);
3627 // fprintf(stdout,"%i>failed=%i %i %i %i %i %i %i\n",myID,world->percentile_failed[pdone][0]
3628 // ,world->percentile_failed[pdone][1]
3629 // ,world->percentile_failed[pdone][2]
3630 // ,world->percentile_failed[pdone][3]
3631 // ,world->percentile_failed[pdone][4]
3632 // ,world->percentile_failed[pdone][5]
3633 // ,world->percentile_failed[pdone][6]);
3634 memset ((*buffer) + bufsize , 0, sizeof (char) * (quantsize + failuresize));
3635 }
3636 // print profile table that is in the first part of the buffer
3637 fprintf (outfile, "%s\n\n", *buffer);
3638 #ifdef PRETTY
3639 location = strlen(*buffer);
3640 pdf_print_profile_table(55.0f, &page_height, world->options->profilemethod, *buffer + location + 1, world);
3641 #endif
3642 done=TRUE;
3643 break;
3644 default:
3645 fprintf(stderr,"%i> message=%s\n%i> sender=%i tag=%i\n",myID,tempstr, myID,status.MPI_SOURCE,status.MPI_TAG);
3646 MPI_Finalize();
3647 error("DIED because of wrong message from worker");
3648 break;
3649 }
3650 }
3651 if (numsent < newnparam)
3652 {
3653 parameternum = profilelist[numsent];
3654 MYMPISEND (¶meternum, ONE, MPI_LONG, (MYINT) sender, (MYINT) (numsent + 1), comm_world);
3655 numsent++;
3656 }
3657 else
3658 {
3659 // stop worker because there is nothing to do anymore
3660 MYMPISEND (&nparam, ONE, MPI_LONG, sender, 0, comm_world); //end of parameter list
3661 }
3662 }
3663 // stop workers that did nothing for profiles
3664 for (sender = MIN (newnparam, numcpu - 1) + 1; sender < numcpu ; sender++)
3665 {
3666 // stop all nodes to wait for profiles
3667 MYMPISEND (&newnparam, ONE, MPI_LONG, sender, 0, comm_world);
3668 }
3669 myfree(tempstr);
3670 myfree(leadstr);
3671 myfree(istatus);
3672 myfree(irequests);
3673 myfree(profilelist);
3674 }
3675
3676 void
mpi_profiles_worker(world_fmt * world,long * gmaxptr)3677 mpi_profiles_worker (world_fmt * world, long *gmaxptr)
3678 {
3679 boolean done = FALSE;
3680 long pnum;
3681 long temp[4];
3682 char *tempstr;
3683 char *quantilebuffer;
3684 char *failedbuffer;
3685 MPI_Status status;
3686 quantilebuffer = (char *) mycalloc (ONE, sizeof (char));
3687 failedbuffer = (char *) mycalloc (ONE, sizeof (char));
3688 tempstr = (char *) mycalloc (STRSIZE, sizeof (char));
3689 while (!done)
3690 {
3691 //fprintf(stdout,"%i> before receive of parameter number\n",myID);
3692 MYMPIRECV (&pnum, 1, MPI_LONG, (MYINT) MASTER, (MYINT) MPI_ANY_TAG, comm_world, &status);
3693 //fprintf(stdout,"%i> RECEIVED parameter number %li from %i with tag %i\n",myID,pnum,status.MPI_SOURCE,status.MPI_TAG);
3694
3695 if (status.MPI_TAG != 0) //stop condition
3696 {
3697 // fills world->buffer with profile information
3698 print_profile_likelihood_driver (pnum, world, gmaxptr);
3699 temp[0] = pnum;
3700 temp[1] = (long) strlen (world->buffer);
3701 #ifdef PRETTY
3702 // the PDF table is also sent in addition to the formatted ascii table
3703 temp[1] = (long) strlen(world->buffer + temp[1] + 1) + temp[1] + 1;
3704 #endif
3705 if(world->options->printprofsummary)
3706 {
3707 temp[2] = pack_quantile (&quantilebuffer, world->quantiles[pnum], GRIDSIZE);
3708 world->buffer = (char *) myrealloc (world->buffer,
3709 sizeof (char) * (temp[1] + temp[2] + 1));
3710 sprintf(world->buffer + temp[1], "%s", quantilebuffer);
3711 temp[3] = pack_failed_percentiles (&failedbuffer, world->percentile_failed[pnum], GRIDSIZE);
3712 world->buffer = (char *) myrealloc (world->buffer,
3713 sizeof (char) * (temp[1] + temp[2] + temp[3] + 1));
3714 sprintf(world->buffer+temp[1]+temp[2], "%s", failedbuffer);
3715 }
3716 else
3717 {
3718 temp[2] = 0;
3719 temp[3] = 0;
3720 }
3721 sprintf(tempstr,"P%li", temp[1]);
3722 //tempstr[0]='P';
3723 MYMPISEND (tempstr, STRSIZE, MPI_CHAR, (MYINT) MASTER, (MYINT) pnum + 1, comm_world);
3724 MYMPISEND (temp, FOUR, MPI_LONG, (MYINT) MASTER, (MYINT) pnum + 1, comm_world);
3725 MYMPISEND (world->buffer, temp[1] + temp[2] + temp[3], MPI_CHAR, (MYINT) MASTER, (MYINT)
3726 pnum + 1, comm_world);
3727 world->profilewho[profiledone++] = pnum;
3728 }
3729 else
3730 {
3731 done = TRUE;
3732 }
3733 }
3734 myfree(tempstr);
3735 myfree(quantilebuffer);
3736 myfree(failedbuffer);
3737 }
3738
3739 long
pack_quantile(char ** buffer,quantile_fmt quant,long n)3740 pack_quantile (char **buffer, quantile_fmt quant, long n)
3741 {
3742 long i;
3743 char fp[LONGLINESIZE];
3744 long bufsize = LONGLINESIZE;
3745 *buffer = (char *) myrealloc (*buffer, sizeof (char) * bufsize);
3746 sprintf (*buffer, "QUANTILEBUFFER:\n %s\n", quant.name);
3747 for (i = 0; i < n; i++)
3748 {
3749 bufsize += 1 + sprintf (fp, "%20.20f\n", quant.param[i]);
3750 *buffer = (char *) myrealloc (*buffer, sizeof (char) * bufsize);
3751 strcat (*buffer, fp);
3752 }
3753 bufsize = (long) strlen(*buffer);
3754 return bufsize;
3755 }
3756
3757 void
unpack_quantile(char * buffer,quantile_fmt quant,long n)3758 unpack_quantile (char *buffer, quantile_fmt quant, long n)
3759 {
3760 long i;
3761 char *input;
3762 char *buf = buffer;
3763 input = (char*) mycalloc(LONGLINESIZE,sizeof(char));
3764 sgets (input, LONGLINESIZE, &buf);
3765 sgets (input, LONGLINESIZE, &buf);
3766 strcpy (quant.name, input);
3767 for (i = 0; i < n; i++)
3768 {
3769 sgets (input, LONGLINESIZE, &buf);
3770 quant.param[i] = atof (input);
3771 }
3772 myfree(input);
3773 }
3774
3775 ///
3776 /// pack notice of failure of convergence to the profile likelihood precentiles
3777 /// this assume that n is never bigger than LONGLINESIZE, a safe assumption
3778 /// n is the number of grid points in the profile calculation, currently set to 9
3779 /// (May 19 2004), changing this number will cause large ripple effects. but see
3780 /// under profile_max_precentile()
3781 long
pack_failed_percentiles(char ** buffer,boolean * failed,long n)3782 pack_failed_percentiles (char **buffer, boolean *failed, long n)
3783 {
3784 long i;
3785 char fp[LONGLINESIZE];
3786 long bufsize = n + ONE;
3787 *buffer = (char *) myrealloc (*buffer, sizeof (char) * bufsize);
3788 memset(*buffer,0,sizeof(char)*bufsize);
3789 for (i = 0; i < n; i++)
3790 fp[i] = failed[i] ? '1' : '0' ;
3791 fp[i]='\0';
3792 strcat (*buffer, fp);
3793 return bufsize;
3794 }
3795
3796 ///
3797 /// unpack notice of failure of convergence to the profile likelihood precentiles
3798 /// this assume that n is never bigger than LONGLINESIZE, a safe assumption
3799 void
unpack_failed_percentiles(char * buffer,boolean * failed,long n)3800 unpack_failed_percentiles (char *buffer, boolean *failed, long n)
3801 {
3802 long i;
3803 char *input;
3804 char *buf = buffer;
3805 input = (char*) mycalloc(LONGLINESIZE,sizeof(char));
3806 sgets (input, LONGLINESIZE, &buf);
3807 //fprintf(stdout,"@%s@\n",input);
3808 for (i = 0; i < n; i++)
3809 {
3810 failed[i] = (input[i] == '1');
3811 // fprintf(stdout,"@%i\n",(int) failed[i]);
3812 }
3813 myfree(input);
3814 }
3815
3816 #endif
3817
3818 /*
3819 // send the data over all loci/replicates to all nodes
3820 // including the master node, so that all nodes can then
3821 // start calculating profiles [see calc_profiles()]
3822 //
3823 void distribute_locidata(world_fmt *world)
3824 {
3825 char *buffer;
3826 pack_loci_data(world, &buffer);
3827 MPI_allgather(buffer);
3828 unpack_loci_data(buffer, world);
3829 myfree(buffer);
3830 }
3831
3832 void pack_loci_data(world_fmt *world, char **buffer)
3833 {
3834 long replicates = world->options->repl
3835 *buffer = myrealloc(*buffer,LONGLINESIZE);
3836 hits = sscanf (input, "%li %li %li %li %li", &world->loci, &world->numpop, &world->numpop2, &tmp, &replicates);
3837 }
3838 */
3839
3840 // necessary for analyzing old sumfiles using MPI
3841 //
3842 // master is reusing mpi_runloci_master()
3843 void
assignloci_worker(world_fmt * world,option_fmt * options,long * Gmax)3844 assignloci_worker (world_fmt * world, option_fmt *options, long * Gmax)
3845 {
3846 boolean done = FALSE;
3847 long locus;
3848 MPI_Status status;
3849 long * twolongs;
3850 char *locusstring;
3851 twolongs = (long *) mycalloc(TWO,sizeof(long));
3852 locusstring = (char *) mycalloc(SMALLBUFSIZE,sizeof(char));
3853
3854 world->options->progress=FALSE;
3855 options->progress=FALSE;
3856
3857 while (!done)
3858 {
3859 MYMPIRECV (twolongs, TWO, MPI_LONG, (MYINT) MASTER, (MYINT) MPI_ANY_TAG,
3860 comm_world, &status); //from mpi_runloci_master() around line migrate_mpi.c:163
3861 if (status.MPI_TAG != 0) //stop condition
3862 {
3863 locus = twolongs[0];
3864 sprintf(locusstring,"R%li",locus);
3865 #ifdef DEBUG_MPI
3866 printf("%i>>>>>> received locus %li in assignloci_worker{}\n",myID,locus);
3867 swap_atl (locus, locidone, world);
3868 printf("%i>>>>>> will send locus %li (%s) in assignloci_worker{}\n",myID,locus,locusstring);
3869 #else
3870 swap_atl (locus, locidone, world);
3871 #endif
3872 MYMPISEND (locusstring, SMALLBUFSIZE, MPI_CHAR, (MYINT) MASTER, (MYINT) locus + 1, comm_world);
3873 /* we want to know what locus we worked for
3874 - to control the work sent by master
3875 - to use in setup_parameter0() [combroyden2.c] */
3876 world->who[locidone++] = locus;
3877 //
3878 if (options->replicate && options->replicatenum > 0)
3879 {
3880 world->locus = locus;
3881 world->repkind = MULTIPLERUN;
3882 #ifdef LONGSUM
3883 change_longsum_times (world); //multi run replicates
3884 #endif /*LONGSUM*/
3885 if (!options->bayes_infer)
3886 {
3887 (void) estimateParameter (options->replicatenum, *Gmax, world,
3888 options, world->cov[locus], 1, 'l',
3889 SINGLELOCUS, world->repkind);
3890 }
3891 }
3892
3893 //
3894 }
3895 else
3896 {
3897 done = TRUE;
3898 assign_worker_cleanup();
3899 #ifdef DEBUG_MPI
3900 fprintf(stdout,"%i> STOP: received stop from %i\n",myID, status.MPI_SOURCE);
3901 #endif
3902 }
3903 }
3904 myfree(locusstring);
3905 }
3906
3907 void
swap_atl(long from,long to,world_fmt * world)3908 swap_atl (long from, long to, world_fmt * world)
3909 {
3910 long r;
3911 timearchive_fmt *tmp;
3912 for (r = 0; r < world->options->replicatenum; r++)
3913 {
3914 tmp = &world->atl[r][to];
3915 world->atl[r][to] = world->atl[r][from];
3916 world->atl[r][from] = *tmp;
3917 }
3918 }
3919
3920
3921 #ifdef SLOWNET
3922 void
setup_parameter0_slowmpi(world_fmt * world,nr_fmt * nr,long repkind,long repstart,long repstop,long loci,long kind,boolean multilocus)3923 setup_parameter0_slowmpi (world_fmt * world, nr_fmt * nr, long repkind,
3924 long repstart, long repstop, long loci, long kind,
3925 boolean multilocus)
3926 {
3927 long locus, r;
3928 if (myID != MASTER)
3929 {
3930 if (multilocus)
3931 {
3932 for (locus = 0; locus < loci; locus++)
3933 {
3934 if (repkind == SINGLECHAIN)
3935 {
3936 for (r = repstart; r < repstop; r++)
3937 create_apg0 (nr->apg0[r][locus], nr,
3938 &world->atl[r][locus], locus);
3939 }
3940 else
3941 {
3942 // if (kind != PROFILE)
3943 // {
3944 for (r = repstart; r < repstop; r++)
3945 create_apg0 (nr->apg0[r][locus], nr,
3946 &world->atl[r][locus], locus);
3947 interpolate_like (nr, locus);
3948 // }
3949 // else
3950 // {
3951 for (r = repstart; r < repstop; r++)
3952 create_multiapg0 (nr->apg0[r][locus], nr, r, locus);
3953 // }
3954 }
3955 }
3956 }
3957 else //single locus
3958 {
3959 if (repkind == SINGLECHAIN)
3960 {
3961 for (r = repstart; r < repstop; r++)
3962 create_apg0 (nr->apg0[r][world->locus], nr,
3963 &world->atl[r][world->locus], world->locus);
3964 }
3965 else
3966 {
3967 // if (kind != PROFILE)
3968 // {
3969 for (r = repstart; r < repstop; r++)
3970 create_apg0 (nr->apg0[r][world->locus], nr,
3971 &world->atl[r][world->locus], world->locus);
3972 interpolate_like (nr, world->locus);
3973 // }
3974 for (r = repstart; r < repstop; r++)
3975 create_multiapg0 (nr->apg0[r][world->locus], nr, r,
3976 world->locus);
3977 }
3978 }
3979 }
3980 }
3981 #endif
3982
3983 void
setup_parameter0_mpi(world_fmt * world,nr_fmt * nr,long repkind,long repstart,long repstop,long loci,long kind,boolean multilocus)3984 setup_parameter0_mpi (world_fmt * world, nr_fmt * nr, long repkind,
3985 long repstart, long repstop, long loci, long kind,
3986 boolean multilocus)
3987 {
3988 long locus, r;
3989 long ll;
3990 if (myID != MASTER)
3991 {
3992 if (multilocus)
3993 {
3994 for (ll = 0; ll < locidone; ll++)
3995 {
3996 locus = world->locus = world->who[ll];
3997 if (repkind == SINGLECHAIN)
3998 {
3999 for (r = repstart; r < repstop; r++)
4000 create_apg0 (nr->apg0[r][locus], nr,
4001 &world->atl[r][locus], locus);
4002 }
4003 else
4004 {
4005 // if (kind != PROFILE)
4006 // {
4007 for (r = repstart; r < repstop; r++)
4008 create_apg0 (nr->apg0[r][locus], nr,
4009 &world->atl[r][locus], locus);
4010 interpolate_like (nr, locus);
4011 // }
4012 // else
4013 // {
4014 for (r = repstart; r < repstop; r++)
4015 create_multiapg0 (nr->apg0[r][locus], nr, r, locus);
4016 // }
4017 }
4018 }
4019 }
4020 else //single locus
4021 {
4022 if (repkind == SINGLECHAIN)
4023 {
4024 for (r = repstart; r < repstop; r++)
4025 create_apg0 (nr->apg0[r][world->locus], nr,
4026 &world->atl[r][world->locus], world->locus);
4027 }
4028 else
4029 {
4030 // if (kind != PROFILE)
4031 // {
4032 for (r = repstart; r < repstop; r++)
4033 create_apg0 (nr->apg0[r][world->locus], nr,
4034 &world->atl[r][world->locus], world->locus);
4035 interpolate_like (nr, world->locus);
4036 // }
4037 for (r = repstart; r < repstop; r++)
4038 create_multiapg0 (nr->apg0[r][world->locus], nr, r,
4039 world->locus);
4040 }
4041 }
4042 }
4043 }
4044
4045 ///
4046 /// checks whether a node is already in the mpistack list
in_mpistack(int sender,world_fmt * world)4047 boolean in_mpistack(int sender, world_fmt *world)
4048 {
4049 long i;
4050 for (i=0;i < world->mpistacknum; i++)
4051 {
4052 if((world->mpistack[i]) == sender)
4053 return TRUE;
4054 }
4055 return FALSE;
4056 }
4057
handle_replication(int sender,int tag,char * tempstr,world_fmt * world)4058 void handle_replication(int sender,int tag,char *tempstr, world_fmt *world)
4059 {
4060 //long pos=0;
4061 long i;
4062 long * temp;
4063 int realsender;
4064 long locus;
4065 long replicate;
4066 int replicator;
4067 // boolean from_locus_sender = (sender <= world->loci);
4068 // boolean from_replicator = (sender > world-> loci);
4069 sscanf(tempstr+1,"%i%li%li", &realsender, &locus, &replicate);
4070 //if(from_replicator)
4071 // warning("these guys should not send to here");
4072 temp = (long *) mycalloc(3, sizeof(long));
4073 temp[0] = sender;
4074 temp[1] = locus;
4075 temp[2] = replicate;
4076 if(world->mpistacknum > 0)
4077 {
4078 world->mpistacknum -= 1;
4079 replicator = world->mpistack[world->mpistacknum];
4080 //printf("%i> checking out replicator:[%4li] %i\n",myID, world->mpistacknum,replicator);
4081 MYMPISEND(temp, 3, MPI_LONG, replicator, (MYINT) tag, comm_world);
4082 }
4083 else
4084 {
4085 if(world->mpistack_requestnum>=world->mpistack_request_numalloc)
4086 {
4087
4088 world->mpistack_request = (mpirequest_fmt *) myrealloc(world->mpistack_request,
4089 sizeof(mpirequest_fmt)*(world->mpistack_requestnum+10));
4090 for(i=world->mpistack_request_numalloc; i < world->mpistack_requestnum + 10; i++)
4091 {
4092 world->mpistack_request[i].tempstr = (char *) mycalloc(SMALLBUFSIZE,sizeof(char));
4093 }
4094 world->mpistack_request_numalloc = world->mpistack_requestnum + 10;
4095 }
4096 strcpy(world->mpistack_request[world->mpistack_requestnum].tempstr, tempstr);
4097 world->mpistack_request[world->mpistack_requestnum].sender = sender;
4098 world->mpistack_request[world->mpistack_requestnum].tag = tag;
4099 world->mpistack_requestnum += 1;
4100 // printf("%i> MASTER: added request from n%i to request-stack position %li\n",myID, sender, world->mpistack_requestnum);
4101 }
4102 myfree(temp);
4103 }
4104
handle_mdim(float * values,long n,int sender,world_fmt * world)4105 void handle_mdim(float *values,long n, int sender, world_fmt * world)
4106 {
4107 register long i;
4108 register long z;
4109
4110 int digits;
4111 #ifdef ZNZ
4112 znzFile file = world->bayesmdimfile;
4113 #else
4114 FILE *file = world->bayesmdimfile;
4115 #endif
4116 register char *temp;
4117
4118 temp = (char *) mycalloc(n*20+LINESIZE,sizeof(char));
4119 z = sprintf(temp,"%li\t%li\t%li\t%f\t%f\t%f\t%f\t%li\t%f",
4120 (long) values[0], (long) values[1]+1,(long) values[2]+1,
4121 values[3], values[4], values[5], values[6],(long) values[7], values[8]);
4122 for(i=9;i<n; i++)
4123 {
4124 //fprintf (stdout, "%f (%li) ",values[i], i);
4125 if(i > (world->numpop2 + world->bayes->mu))
4126 {
4127 z += sprintf(temp+z,"\t%f",values[i]);
4128 continue;
4129 }
4130 if(values[i] < SMALLEPSILON)
4131 {
4132 z += sprintf(temp+z,"\t0");
4133 }
4134 else
4135 {
4136 digits = (long) log10(values[i]);
4137 switch(digits)
4138 {
4139 case -8:
4140 case -6:
4141 case -5:
4142 // fmt = 10;
4143 z += sprintf(temp + z,"\t%.10f", values[i]);
4144 break;
4145 case -4:
4146 case -3:
4147 //fmt = 8;
4148 z += sprintf(temp + z,"\t%.8f", values[i]);
4149 break;
4150 case -2:
4151 case -1:
4152 // fmt= 5;
4153 z += sprintf(temp + z,"\t%.5f", values[i]);
4154 break;
4155 case 0:
4156 case 1:
4157 // fmt = 4;
4158 z += sprintf(temp + z,"\t%.4f", values[i]);
4159 break;
4160 case 2:
4161 // fmt = 2;
4162 z += sprintf(temp + z,"\t%.2f", values[i]);
4163 break;
4164 case 3:
4165 // fmt = 1;
4166 z += sprintf(temp + z,"\t%.1f", values[i]);
4167 break;
4168 case 4:
4169 case 5:
4170 case 6:
4171 case 7:
4172 case 8:
4173 // fmt = 0;
4174 z += sprintf(temp + z,"\t%.0f", values[i]);
4175 break;
4176 default:
4177 if(digits<-8)
4178 {
4179 // fmt=20;
4180 z += sprintf(temp + z,"\t%.20f", values[i]);
4181 }
4182 else
4183 {
4184 // fmt = 5;
4185 z += sprintf(temp + z,"\t%.5f", values[i]);
4186 }
4187 break;
4188 }
4189 // fprintf(stdout,"\t%f", values[i]);
4190 }
4191 }
4192 // fprintf (stdout, " [%i] \n", myID);
4193 #ifdef ZNZ
4194 znzprintf(file,"%s\n",temp);
4195 #else
4196 fprintf(file,"%s\n",temp);
4197 fflush(file);
4198 #endif
4199 myfree(temp);
4200 //fprintf(stdout,"\n");
4201 // calculate_parallel_convergence(world, values, size);
4202 }
4203
handle_message(char * rawmessage,int sender,world_fmt * world)4204 void handle_message(char *rawmessage,int sender, world_fmt * world)
4205 {
4206 char *rawptr;
4207 long pos=0;
4208 void *file = (void *) stdout;
4209 rawptr = rawmessage;
4210 //fprintf(stderr,"%i> handle_message: %s\n",myID, rawmessage);
4211 set_filehandle(rawmessage, world, &file, &pos);
4212 //fprintf(stderr,"%i> handle_message: pos=%li\n",myID,pos);
4213 //fprintf(stderr,"%i> handle_message: %s\n",myID, rawmessage + pos);
4214 fprintf((FILE *) file,"%s", rawptr + pos);
4215 fflush((FILE *) file);
4216 }
4217
handle_burnin_message(char * rawmessage,int sender,world_fmt * world)4218 void handle_burnin_message(char *rawmessage,int sender, world_fmt * world)
4219 {
4220 static long z = 0;
4221 static boolean done = FALSE;
4222 static long *replicates;
4223 long locus;
4224 long step;
4225 MYREAL var;
4226 MYREAL oldvar;
4227 MYREAL ess;
4228 MYREAL acceptance;
4229 if(!done)
4230 {
4231 replicates = mycalloc(world->loci,sizeof(long));
4232 done = TRUE;
4233 }
4234 #ifdef USE_MYREAL_FLOAT
4235 sscanf(rawmessage,"%li%f%f%f%f%li",&locus, &ess, &acceptance, &var, &oldvar, &step);
4236 #else
4237 sscanf(rawmessage,"%li%lf%lf%lf%lf%li",&locus, &ess, &acceptance, &var, &oldvar, &step);
4238 #endif
4239 replicates[locus] += 1;
4240 if(world->options->verbose)
4241 {
4242 fprintf(stdout,"%i> Burn-in on node %i (locus=%li, repl=%li) stopped at step %li with variance-ratio=%.2f/%.2f=%.3f\n min(ess)=%f and avg(acceptance)=%f", myID, sender, locus, replicates[locus], step, var, oldvar, var/oldvar, ess, acceptance);
4243 }
4244 world->burnin_stops[z].locus = locus;
4245 world->burnin_stops[z].replicate = replicates[locus];
4246 world->burnin_stops[z].stopstep = step;
4247 world->burnin_stops[z].ess = ess;
4248 world->burnin_stops[z].accept = acceptance;
4249 world->burnin_stops[z].variance = var;
4250 world->burnin_stops[z].oldvariance = oldvar;
4251 world->burnin_stops[z].worker = sender;
4252 z++;
4253 #ifdef DEBUG
4254 printf("%i> handle burnin_stop: z=%li maxalloc=maxrep*loci\n",myID,z);
4255 #endif
4256 }
4257
4258 ///
4259 /// sets up a file database so that the master and worker worker end up writing to the same file
4260 /// the workers send the stuff to the master and the master then figures out (using the db)
4261 /// what file pointer the stuff was intended for.
4262 /// needs globals filedb, and filenum
setup_filehandle_db(FILE * file,world_fmt * world,option_fmt * options,data_fmt * data)4263 void setup_filehandle_db(FILE *file, world_fmt *world, option_fmt *options, data_fmt *data)
4264 {
4265 long filehandle = get_filehandle(file, world, options, data);
4266 filedb[filenum].file = file;
4267 filedb[filenum++].handle = filehandle;
4268 #ifdef DEBUG_MPI
4269 fprintf(stdout,"filedb %li: %p %li\n",filenum, file,filehandle);
4270 #endif
4271 }
4272
retrieve_filehandle(FILE * file)4273 long retrieve_filehandle(FILE *file)
4274 {
4275 long i=0;
4276 long filehandle = 0;
4277 while(filedb[i].file != file && i<filenum)
4278 i++;
4279 if(i!=filenum)
4280 filehandle = filedb[i].handle;
4281 return filehandle;
4282 }
4283
get_filehandle(void * vfile,world_fmt * world,option_fmt * options,data_fmt * data)4284 long get_filehandle(void *vfile, world_fmt *world, option_fmt *options, data_fmt *data)
4285 {
4286 #ifdef ZNZ
4287 if(((znzFile) vfile) == world->bayesmdimfile)
4288 return BAYESMDIMFILENUM;
4289 FILE * file = (FILE *) vfile;
4290 #else
4291 FILE * file = (FILE *) vfile;
4292 if(file == world->bayesmdimfile)
4293 return BAYESMDIMFILENUM;
4294 #endif
4295 if(file == stdout)
4296 return STDOUTNUM;
4297 if(file == options->logfile)
4298 return LOGFILENUM;
4299 if(file == world->outfile)
4300 return OUTFILENUM;
4301 if(file == options->aicfile)
4302 return AICFILENUM;
4303 if(file == world->mathfile)
4304 return MATHFILENUM;
4305 if(file == world->mighistfile)
4306 return MIGHISTFILENUM;
4307 if(file == world->skylinefile)
4308 return SKYLINEFILENUM;
4309 if(file == world->bayesfile)
4310 return BAYESFILENUM;
4311 if(file == world->pdfoutfile)
4312 return PDFOUTFILENUM;
4313 if(file == world->treefile)
4314 return TREEFILENUM;
4315 return STDOUTNUM;
4316 }
4317
get_filehandle2(void * vfile,world_fmt * world)4318 long get_filehandle2(void *vfile, world_fmt *world)
4319 {
4320 FILE *file ;
4321 #ifdef ZNZ
4322 if(((znzFile) vfile) == world->bayesmdimfile)
4323 return BAYESMDIMFILENUM;
4324 #else
4325 if(((FILE *) vfile) == world->bayesmdimfile)
4326 return BAYESMDIMFILENUM;
4327 #endif
4328 file = (FILE*) vfile;
4329 if(file == stdout)
4330 return STDOUTNUM;
4331 if(file == world->options->logfile)
4332 return LOGFILENUM;
4333 if(file == world->outfile)
4334 return OUTFILENUM;
4335 if(file == world->mighistfile)
4336 return MIGHISTFILENUM;
4337 if(file == world->skylinefile)
4338 return SKYLINEFILENUM;
4339 if(file == world->options->aicfile)
4340 return AICFILENUM;
4341 if(file == world->mathfile)
4342 return MATHFILENUM;
4343 if(file == world->bayesfile)
4344 return BAYESFILENUM;
4345 if(file == world->treefile)
4346 return TREEFILENUM;
4347 // fprintf(stdout,"@@@@@@@@@@@@@@@wrong wrong wrong@@@@@@@@@@@@@@@@\n");
4348 return STDOUTNUM;
4349 }
4350
set_filehandle(char * message,world_fmt * world,void ** file,long * msgstart)4351 void set_filehandle(char *message, world_fmt *world,
4352 void **file, long *msgstart)
4353 {
4354 char *temp;
4355 long filenum;
4356 long i = 1;
4357 temp = (char *) mycalloc(10,sizeof(char));
4358 if(message[0] == '\0')
4359 {
4360 warning("%i> set_filehandle() the message was NULL",myID);
4361 }
4362 temp[0] = message[i];
4363 while(temp[i-1]!=':' && i < 9 && message[i]!='\0')
4364 {
4365 #ifdef DEBUG_MPI
4366 fprintf(stderr,"%i>>>>>> temp =%s\n",myID,temp);
4367 fprintf(stderr,"%i>>>>>> temp[%li]=%c\n",myID,i,temp[i]);
4368 fprintf(stderr,"%i>>>>>> temp =%s\n",myID,message);
4369 #endif
4370 i++;
4371 temp[i-1] = message[i];
4372 }
4373 *msgstart = i+1;
4374 filenum = atol(temp);
4375 // fprintf(stdout,"\n@@@@@@@@@@@@@@@@@%li@%s@%li@\n",filenum,temp,i);
4376 myfree(temp);
4377 switch(filenum)
4378 {
4379 case STDOUTNUM:
4380 {
4381 // fprintf(stdout,"\n");
4382 *file = stdout;
4383 return;
4384 }
4385 case LOGFILENUM:
4386 {
4387 // fprintf(stdout," logfile\n");
4388 *file = world->options->logfile;
4389 return;
4390 }
4391 case OUTFILENUM:
4392 {
4393 *file = world->outfile;
4394 return ;
4395 }
4396 case AICFILENUM:
4397 {
4398 *file = world->options->aicfile;
4399 return ;
4400 }
4401 case MATHFILENUM:
4402 {
4403 *file = world->mathfile;
4404 return ;
4405 }
4406 case MIGHISTFILENUM:
4407 {
4408 *file = world->mighistfile;
4409 return ;
4410 }
4411 case SKYLINEFILENUM:
4412 {
4413 *file = world->skylinefile;
4414 return ;
4415 }
4416 case BAYESFILENUM:
4417 {
4418 *file = world->bayesfile;
4419 return ;
4420 }
4421 case BAYESMDIMFILENUM:
4422 {
4423 *file = world->bayesmdimfile;
4424 return ;
4425 }
4426 case TREEFILENUM:
4427 {
4428 *file = world->treefile;
4429 return ;
4430 }
4431 case PDFOUTFILENUM:
4432 {
4433 *file = world->pdfoutfile;
4434 return ;
4435 }
4436 }
4437 *file = stdout;
4438 return;
4439 }
4440
4441
4442 void
mpi_fprintf(FILE * file,const char * fmt,...)4443 mpi_fprintf(FILE *file, const char *fmt, ...)
4444 {
4445 char *p1 = NULL;
4446 char *p = NULL;
4447 va_list ap;
4448 long filehandle = 0;
4449 long bufsize = 0;
4450
4451 long pallocsize = LINESIZE;
4452 p = (char *) mycalloc(pallocsize,sizeof(char));
4453 p1 = (char *) mycalloc(STRSIZE,sizeof(char));
4454 if(myID != MASTER)
4455 {
4456 filehandle = retrieve_filehandle(file);
4457 bufsize = sprintf(p, "%c%li:",'M',filehandle);
4458 }
4459 va_start(ap, fmt);
4460 bufsize += vsprintf(p+bufsize, fmt, ap);
4461 if(bufsize >= pallocsize)
4462 error("failed in mpi_printf(): problem with buffer size!");
4463 if(myID != MASTER)
4464 {
4465 sprintf(p1,"M%li",bufsize);
4466 MYMPISEND (p1, STRSIZE, MPI_CHAR, (MYINT) MASTER, (MYINT) myID+PRINTTAG, comm_world);
4467 MYMPISEND (p, bufsize, MPI_CHAR, (MYINT) MASTER, (MYINT) myID+PRINTTAG, comm_world);
4468 }
4469 else
4470 {
4471 fprintf(file,"%s", p);
4472 }
4473 va_end(ap);
4474 myfree(p);
4475 myfree(p1);
4476 }
4477
4478 void
mpi_fprintf2(FILE * file,long filesize,const char * fmt,...)4479 mpi_fprintf2(FILE *file, long filesize, const char *fmt, ...)
4480 {
4481 char *p1;
4482 char *p;
4483 va_list ap;
4484 long filehandle = 0;
4485 long bufsize = 0;
4486
4487 long pallocsize = filesize+strlen(fmt)+10;//leave room for "M:number"
4488
4489 p = (char *) mycalloc(pallocsize,sizeof(char));
4490 p1 = (char *) mycalloc(STRSIZE,sizeof(char));
4491 if(myID!=MASTER)
4492 {
4493 filehandle = retrieve_filehandle(file);
4494 bufsize = sprintf(p, "%c%li:",'M',filehandle);
4495 }
4496 va_start(ap, fmt);
4497 bufsize += vsprintf(p+bufsize, fmt, ap);
4498 if(bufsize>=pallocsize)
4499 {
4500 warning("Failing because bufsize=%li >= allocsize=%li\n",bufsize,pallocsize);
4501 error("failed in mpi_printf2()");
4502 }
4503 if(myID!=MASTER)
4504 {
4505 sprintf(p1,"M%li",bufsize);
4506 MYMPISEND (p1, STRSIZE, MPI_CHAR, (MYINT) MASTER, (MYINT) myID+PRINTTAG, comm_world);
4507 MYMPISEND (p, bufsize, MPI_CHAR, (MYINT) MASTER, (MYINT) myID+PRINTTAG, comm_world);
4508 }
4509 else
4510 fprintf(file,"%s", p);
4511 va_end(ap);
4512 myfree(p);
4513 myfree(p1);
4514 }
4515
4516 ///
4517 /// sends raw bayesian parameters to master, using label 'Z' to match on the master side
4518 #ifdef PARALIO
mpi_mdim_send(MPI_File * file,char * values,long size)4519 void mpi_mdim_send(MPI_File *file, char *values, long size)
4520 #else
4521 void mpi_mdim_send(float *values, long size)
4522 #endif
4523 {
4524 #ifdef PARALIO
4525 MPI_Status status;
4526 char error_string[LINESIZE];
4527 int length_of_error_string, error_class;
4528 #else
4529 char *p1;
4530 p1 = (char *) mycalloc(SMALLBUFSIZE,sizeof(char));
4531 if(myID!=MASTER)
4532 {
4533 #endif /*with paralio the master writes the header using this service*/
4534 #ifdef PARALIO
4535 my_write_error = MPI_File_write(*file,values,size,MPI_CHAR, &status);
4536 if (my_write_error != MPI_SUCCESS)
4537 {
4538 MPI_Error_class(my_write_error, &error_class);
4539 MPI_Error_string(error_class, error_string, &length_of_error_string);
4540 printf("%i> %s\n", myID, error_string);
4541 MPI_Error_string(my_write_error, error_string, &length_of_error_string);
4542 printf("%i> %s\n", myID, error_string);
4543 my_write_error = TRUE;
4544 }
4545 #else
4546 sprintf(p1,"Z%li",size);
4547 MYMPISEND (p1, SMALLBUFSIZE, MPI_CHAR, (MYINT) MASTER, (MYINT) myID+PRINTTAG, comm_world);
4548 #ifdef DEBUG_MPI
4549 fprintf(stdout,"%i> mdimlast=%f\n",myID,values[size-1]);
4550 #endif
4551 MYMPISEND (values, size, MPI_FLOAT, (MYINT) MASTER, (MYINT) myID+PRINTTAG, comm_world);
4552 #endif
4553 #ifndef PARALIO /* without paralio the master is not allowed here*/
4554 }
4555 else
4556 error("master sends itself bayesallfile stuff");
4557 myfree(p1);
4558 #endif
4559 }
4560
4561
4562 ///
4563 /// assembles the data from a replicant (receives materials from mpi_send_replicant()
4564 ///
4565 void
4566 mpi_receive_replicate(int sender, int tag, long locus, long replicate, world_fmt * world)
4567 {
4568 MYREAL *buffer;
4569 long bufsize=1;
4570 MPI_Status status;
4571 long z;
4572 MYMPIRECV (&bufsize, ONE, MPI_LONG, (MYINT) sender, (MYINT) tag, comm_world, &status);
4573 #ifdef DEBUG_MPI
4574 fprintf(stdout,"%i> WORKER: mpi_receive_replicate received bufsize=%li from sender=%i with tag=%i\n",myID, bufsize, status.MPI_SOURCE, status.MPI_TAG);
4575 sender = status.MPI_SOURCE;
4576 tag = status.MPI_TAG;
4577 fprintf(stdout,"%i> mpi_receive_replicate received bufsize=%li from sender=%i with tag=%i\n",myID, bufsize, sender, tag);
4578 #endif
4579 buffer = (MYREAL *) mycalloc (bufsize, sizeof (MYREAL));
4580 MYMPIRECV (buffer, bufsize, mpisizeof, (MYINT) sender, (MYINT) tag, comm_world, &status);
4581 //fprintf(stdout,"%i> received bufsize is really %li and bufsize=%li\n",myID,(long) (long) strlen(buffer),bufsize);
4582 if(world->options->bayes_infer)
4583 {
4584 unpack_single_bayes_buffer(buffer,world->bayes,world,locus);
4585 }
4586 else
4587 {
4588 z=0;
4589 unpack_single_sumfile_buffer(buffer, world->atl, world, locus, replicate, world->numpop, &z);
4590 // fprintf(stdout,"%i> replicate=%li locus=%li\n",myID, replicate, locus);
4591 }
4592 if(world->options->mighist)
4593 {
4594 MYMPIRECV (&bufsize, ONE, MPI_LONG, (MYINT) sender, (MYINT) tag, comm_world, &status);
4595 #ifdef DEBUG_MPI
4596 fprintf(stdout,"%i> mpi_receive_replicate received mighistogram bufsize=%li from sender=%i with tag=%i\n",
4597 myID, bufsize, status.MPI_SOURCE, status.MPI_TAG);
4598 #endif
4599 buffer = (MYREAL *) myrealloc (buffer, bufsize * sizeof (MYREAL));
4600 MYMPIRECV (buffer, bufsize, mpisizeof, (MYINT) sender, (MYINT) tag, comm_world, &status);
4601 unpack_mighist_replicate_buffer(buffer, world, locus, world->numpop);
4602 }
4603 if(world->options->mighist && world->options->skyline)
4604 {
4605 MYMPIRECV (&bufsize, ONE, MPI_LONG, (MYINT) sender, (MYINT) tag, comm_world, &status);
4606 buffer = (MYREAL *) myrealloc (buffer, bufsize * sizeof (MYREAL));
4607 MYMPIRECV (buffer, bufsize, mpisizeof, (MYINT) sender, (MYINT) tag, comm_world, &status);
4608 unpack_skyline_buffer(buffer, world, locus, -1, world->numpop);
4609 }
4610 // send best tree if available
4611 // if(world->options->treeprint == BEST && world->options->treeinmemory == TRUE)
4612 if(world->options->treeinmemory == TRUE)
4613 {
4614 MYMPIRECV (&bufsize, ONE, MPI_LONG, (MYINT) sender, (MYINT) tag, comm_world, &status);
4615 buffer = (MYREAL *) myrealloc (buffer, bufsize * sizeof (MYREAL));
4616 // printf("%i> mpi_receive_buffer() received treespace buffersize: %li %p\n", myID, bufsize, buffer);
4617 MYMPIRECV (buffer, bufsize, mpisizeof, (MYINT) sender, (MYINT) tag, comm_world, &status);
4618 unpack_treespace_buffer(buffer, world, locus, -1, world->numpop);
4619 }
4620
4621 // BF material
4622 if(world->options->bayes_infer)
4623 {
4624 MYMPIRECV (&bufsize, ONE, MPI_LONG, (MYINT) sender, (MYINT) tag, comm_world, &status);
4625 buffer = (MYREAL *) myrealloc (buffer, bufsize * sizeof (MYREAL));
4626 MYMPIRECV (buffer, bufsize, mpisizeof, (MYINT) sender, (MYINT) tag, comm_world, &status);
4627 if(!world->data->skiploci[locus])
4628 {
4629 //printf("%i> mpi_receive_buffer() received BF buffersize: %li %p\n", myID, bufsize, buffer);
4630 bufsize = unpack_BF_buffer(buffer, 0, locus, world);
4631 }
4632 // ESS material
4633 bufsize = unpack_ess_buffer(buffer, bufsize, world);
4634 }
4635
4636 myfree(buffer);
4637 }
4638
4639 ///
4640 /// replicant sends data to sub-master
4641 ///
4642 void
4643 mpi_send_replicate(int sender, long locus, long replicate, world_fmt * world)
4644 {
4645 long allocbufsize = 1;
4646 long bufsize = 0;
4647 long numpop = world->numpop;
4648 long numpop2 = numpop * numpop;
4649 //long numpop2plus = numpop2 + 2 * numpop;
4650 long npp = numpop2 + world->bayes->mu * world->loci;
4651 MYREAL *buffer = NULL;
4652 // timearchive_fmt **ta = world->atl;
4653 buffer = (MYREAL *) mycalloc (allocbufsize, sizeof (MYREAL));
4654
4655 if(world->options->bayes_infer)
4656 {
4657 bufsize = pack_single_bayes_buffer(&buffer,world->bayes,world,locus);
4658 allocbufsize = bufsize;
4659 }
4660 else
4661 {
4662 bufsize = pack_single_sumfile_buffer(&buffer, 0, &allocbufsize, world, locus, replicate, world->numpop);
4663 }
4664
4665 MYMPISEND (&bufsize, ONE, MPI_LONG, (MYINT) sender, (MYINT) (locus+1+ REPTAG), comm_world);
4666 MYMPISEND (buffer, bufsize, mpisizeof, (MYINT) sender, (MYINT) (locus+1 + REPTAG), comm_world);
4667 if(world->options->mighist)
4668 {
4669 bufsize = pack_mighist_buffer(&buffer, world, locus, -1, numpop);
4670 allocbufsize = bufsize;
4671 MYMPISEND (&bufsize, ONE, MPI_LONG, (MYINT) sender, (MYINT) (locus+1+ REPTAG), comm_world);
4672 MYMPISEND (buffer, bufsize, mpisizeof, (MYINT) sender, (MYINT) (locus+1 + REPTAG), comm_world);
4673 }
4674
4675 if(world->options->mighist && world->options->skyline)
4676 {
4677 bufsize = pack_skyline_buffer(&buffer, world, locus, -1, numpop);
4678 allocbufsize = bufsize;
4679 MYMPISEND (&bufsize, ONE, MPI_LONG, (MYINT) sender, (MYINT) (locus+1+ REPTAG), comm_world);
4680 MYMPISEND (buffer, bufsize, mpisizeof, (MYINT) sender, (MYINT) (locus+1 + REPTAG), comm_world);
4681 }
4682 // send best tree if available
4683 if(/*world->options->treeprint == BEST &&*/ world->options->treeinmemory == TRUE)
4684 {
4685 // printf("%i> send treespace buffer to %li", myID,locus+1+REPTAG);
4686 bufsize = pack_treespace_buffer(&buffer, world, locus, -1, numpop);
4687 allocbufsize = bufsize;
4688 MYMPISEND (&bufsize, ONE, MPI_LONG, (MYINT) sender, (MYINT) (locus+1+ REPTAG), comm_world);
4689 MYMPISEND (buffer, bufsize, mpisizeof, (MYINT) sender, (MYINT) (locus+1 + REPTAG), comm_world);
4690 }
4691
4692 // BF material
4693 if(world->options->bayes_infer)
4694 {
4695 bufsize = world->options->heated_chains * world->loci + 5 * world->loci + 20 * (npp+1);
4696 buffer = (MYREAL *) myrealloc (buffer, bufsize * sizeof (MYREAL));
4697 if(!world->data->skiploci[locus])
4698 {
4699 //fprintf(stderr,"%i> REPLICANT: packed result locus=%li replicate %li\n",myID,locus, replicate);
4700 bufsize = pack_BF_buffer(&buffer, 0, locus, world);
4701 }
4702 // ESS material
4703 bufsize = pack_ess_buffer(&buffer, bufsize, world);
4704 // send BF and ESS material
4705 MYMPISEND (&bufsize, ONE, MPI_LONG, (MYINT) sender, (MYINT) (locus+1+ REPTAG), comm_world);
4706 MYMPISEND (buffer, bufsize, mpisizeof, (MYINT) sender, (MYINT) (locus+1 + REPTAG), comm_world);
4707 }
4708 myfree(buffer);
4709 }
4710
4711 #ifdef MPICHECK
4712 void set_memory_limit(rlim_t softsize,rlim_t maxsize)
4713 {
4714 struct rlimit r;
4715 r.rlim_cur=softsize;
4716 r.rlim_max=maxsize;
4717 setrlimit(RLIMIT_AS, &r);
4718 }
4719
4720 void check_memory_limit()
4721 {
4722 struct rlimit r;
4723 getrlimit(RLIMIT_AS, &r);
4724 fprintf(stdout, "%i> current memory/data usage: %f\n",myID, (double) r.rlim_cur);
4725 }
4726 #endif
4727 #endif /* MPI */
4728