1 /*! \File migrate_mpi.c */
2 /* MPI parts for migrate
3    started November 2000, Seattle
4    Peter Beerli beerli@fsu.edu
5 
6 
7 Copyright 1996-2002 Peter Beerli and Joseph Felsenstein, Seattle WA
8 Copyright 2003-2007 Peter Beerli, Tallahassee FL
9 
10  Permission is hereby granted, free of charge, to any person obtaining
11  a copy of this software and associated documentation files (the
12  "Software"), to deal in the Software without restriction, including
13  without limitation the rights to use, copy, modify, merge, publish,
14  distribute, sublicense, and/or sell copies of the Software, and to
15  permit persons to whom the Software is furnished to do so, subject
16  to the following conditions:
17 
18  The above copyright notice and this permission notice shall be
19  included in all copies or substantial portions of the Software.
20 
21  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
22  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
23  MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
24  IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR
25  ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
26  CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
27  WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
28 
29 
30 $Id: migrate_mpi.c 2170 2013-09-19 12:08:27Z beerli $
31 */
32 #ifdef MPI
33 #include "migration.h"
34 #include "tools.h"
35 #include "sighandler.h"
36 #include "migrate_mpi.h"
37 #include "broyden.h"
38 #include "combroyden.h"
39 #include "gammalike.h"
40 #include "profile.h"
41 #include "pretty.h"
42 #include "options.h"
43 #include "tree.h"
44 #include "world.h"
45 #include "joint-chains.h"
46 #include "data.h"
47 #include "laguerre.h"
48 #include "random.h"
49 #ifdef UEP
50 #include "uep.h"
51 #endif
52 #include "bayes.h"
53 #ifndef WINDOWS
54 #include <unistd.h>
55 #endif
56 /*should go into profile.h*/
57 #define GRIDSIZE 9
58 
59 #ifdef PRETTY
60 extern float page_height;
61 #endif
62 
63 extern int numcpu;
64 
65 extern const MPI_Datatype mpisizeof;
66 
67 extern void run_replicate(long locus,
68                           long replicate,
69                           world_fmt **universe,
70                           option_fmt *options,
71                           data_fmt *data,
72                           tpool_t * heating_pool,
73                           int usize,
74                           long *treefilepos,
75                           long *Gmax);
76 extern void run_locus (world_fmt ** universe, int usize,
77                        option_fmt * options, data_fmt * data,
78                        tpool_t * heating_pool, long maxreplicate,
79                        long locus, long *treefilepos, long *Gmax);
80 
81 void mpi_run_locus(world_fmt ** universe, int usize, option_fmt * options,
82                    data_fmt * data, tpool_t * heating_pool, long maxreplicate,
83                    long locus, long *treefilepos, long *Gmax);
84 void mpi_runreplicates_worker (world_fmt ** universe, int usize,
85                                option_fmt * options, data_fmt * data,
86                                tpool_t * heating_pool,
87                                long *treefilepos, long *Gmax);
88 long pack_databuffer (data_fmt * data, option_fmt * options);
89 void unpack_databuffer (data_fmt * data, option_fmt * options);
90 void pack_allele_data (char **buffer, long *bufsize, long *allocbufsize, data_fmt * data,
91                        long pop, long ind);
92 void pack_sequence_data (char **buffer, long *bufsize, long *allocbufsize, data_fmt * data,
93                          long pop, long ind, long locus);
94 void mpi_gradient_master (nr_fmt * nr, world_fmt * world, int *who);
95 void mpi_resultsmaster (long sendtype, world_fmt * world,
96                          long maxrep,
97                          void (*unpack) (char *buffer, world_fmt * world,
98                                          long locus, long maxrep,
99                                          long numpop));
100 
101 void mpi_results_worker (long bufs, world_fmt * world,
102                          long maxrep,
103                          long (*pack) (MYREAL **buffer, world_fmt * world,
104                                        long locus, long maxrep, long numpop));
105 void assignloci_worker (world_fmt * world, option_fmt *options, long *Gmax);
106 void assign_worker_cleanup (void);
107 void swap_atl (long from, long to, world_fmt * world);
108 long pack_quantile (char **buffer, quantile_fmt quant, long n);
109 void unpack_quantile (char *buffer, quantile_fmt quant, long n);
110 long pack_failed_percentiles (char **buffer, boolean *failed, long n);
111 void unpack_failed_percentiles (char *buffer, boolean *failed, long n);
112 
113 void handle_message(char *rawmessage,int sender, world_fmt *world);
114 void handle_mdim(float *values,long n, int sender, world_fmt * world);
115 void handle_burnin_message(char *rawmessage,int sender, world_fmt * world);
116 
117 void set_filehandle(char *message, world_fmt *world,
118                     void **file, long *msgstart);
119 
120 void mpi_receive_replicate( int sender, int tag, long locus, long replicate, world_fmt * world);
121 
122 long unpack_single_bayes_buffer(MYREAL *buffer, bayes_fmt * bayes, world_fmt * world,long locus);
123 long pack_single_bayes_buffer(MYREAL **buffer, bayes_fmt *bayes, world_fmt *world,long locus);
124 long pack_single_bayes_buffer_part(MYREAL **buffer, bayes_fmt *bayes, world_fmt *world,long locus);
125 
126 void unpack_hist_bayes_buffer(MYREAL *buffer, bayes_fmt *bayes, world_fmt *world, long locus);
127 long pack_hist_bayes_buffer(MYREAL **buffer, bayeshistogram_fmt *hist, world_fmt * world, long startposition);
128 
129 long unpack_BF_buffer(MYREAL *buffer, long start, long locus, world_fmt * world);
130 long unpack_ess_buffer(MYREAL *buffer, long start, world_fmt *world);
131 long pack_BF_buffer(MYREAL **buffer, long start, long locus, world_fmt * world);
132 long pack_ess_buffer(MYREAL **buffer, long start, world_fmt *world);
133 
134 void unpack_sumfile_buffer (MYREAL *buffer, world_fmt * world,
135                             long locus, long maxrep, long numpop);
136 void unpack_single_sumfile_buffer (MYREAL *buffer, timearchive_fmt **ta, world_fmt *world,
137                                    long locus, long replicate, long numpop, long *startz);
138 long
139 pack_sumfile_buffer (MYREAL **buffer, world_fmt * world,
140                      long locus, long maxrep, long numpop);
141 
142 long pack_single_sumfile_buffer(MYREAL **buffer, long z, long *allocbufsize, world_fmt * world,
143                                 long locus, long replicate, long numpop);
144 
145 void mpi_send_replicate(int sender, long locus, long replicate, world_fmt * world);
146 long  mpi_send_stop_mcmc_lociworker(long numcpu, long loci);
147 long  mpi_send_stop_mcmc_replicateworker(long numcpu, long loci);
148 void mpi_send_stop_tag (int worker, world_fmt * world);
149 long  mpi_send_stop_mcmc_worker(long numcpu, long loci, MPI_Comm *comm, MPI_Request *irequests, MPI_Status *istatus, long id);
150 void mpi_send_stop_tag (int worker, world_fmt * world);
151 void send_receive_bayes_params(world_fmt *world, long locus);
152 void handle_replication(int sender,int tag,char *tempstr, world_fmt *world);
153 boolean in_mpistack(int sender, world_fmt *world);
154 
155 #ifdef MPICHECK
156 #include <sys/resource.h>
157 void set_memory_limit(rlim_t softsize,rlim_t maxsize);
158 void check_memory_limit();
159 #endif
160 
161 #ifdef PARALIO
162 boolean my_write_error;
163 #endif
164 
165 
166 ////////////////////////////////////////////////////////////////////////////////
167 ////////////////////////////////////////////////////////////////////////////////
168 ////////////////////////////////////////////////////////////////////////////////
169 ////////////////////////////////////////////////////////////////////////////////
170 
171 ///
172 /// Controls all loci in the MPI implementation, uses a load balancing scheme and
173 /// distributes the work on the waiting nodes
174 void
mpi_runloci_master(long loci,int * who,world_fmt * world,boolean options_readsum,boolean menu)175 mpi_runloci_master (long loci, int *who, world_fmt *world, boolean options_readsum, boolean menu)
176 {
177     boolean done = FALSE;
178     long alldone = 0;
179     int tag;
180     int sender       = 0;
181 
182     long locus;
183     long *twolongs;
184     long locusdone   = -1;
185     long numsent     = 0;
186     long tempstrsize = LINESIZE;
187     long nbase       = loci + 1;
188     long minnodes    = MIN((long) numcpu-1, (long) nbase-1);
189 
190     char *tempstr;
191     char *leadstr;
192     float *temp;
193     long nn = world->numpop2 + 9 + world->bayes->mu + world->options->heated_chains + 2;
194 
195     MPI_Status status;
196     MPI_Status *istatus;
197     MPI_Request *irequests;
198 
199     int ll;
200 #ifdef IPROBE
201     int notwaiting=0;
202 #endif
203     irequests  = (MPI_Request *) mycalloc(minnodes, sizeof(MPI_Request));
204     istatus    = (MPI_Status *) mycalloc(minnodes, sizeof(MPI_Status));
205     twolongs   = (long *) mycalloc(TWO, sizeof(long));
206     tempstr    = (char *) mycalloc(tempstrsize, sizeof(char));
207     leadstr    = (char *) mycalloc(SMALLBUFSIZE, sizeof(char));
208     temp       = (float *) mycalloc(nn, sizeof(float));
209     twolongs[1]= 0;
210     //    	printf("%i> MASTER: minnodes = %li\n+++++++++++++++++++++\n",myID, minnodes);
211     for (locus = 0; locus < minnodes; locus++)
212       {
213 	twolongs[0] = locus;
214 	ll = (int) (locus + 1);
215 	MYMPIISEND(twolongs, TWO, MPI_LONG, ll, ll, comm_world, &irequests[numsent]);
216 	numsent++;
217 	//	printf("%i> MASTER: Locus %li sent to worker n%i twolong[0]=%li\n",myID, locus, (MYINT) locus + 1, twolongs[0]);
218       }
219     // waits until all minodes nodes received their message
220     MYMPIWAITALL(minnodes, irequests, istatus);
221     //printf("%i> MASTER: after initial send of loci\n",myID);
222 
223     locus = 0;
224     world->mpistacknum = 0;
225     while((alldone < world->loci) || (world->mpistacknum < numcpu-1))
226       {
227         done=FALSE;
228         while(!done)
229 	  {
230 	    memset(tempstr,0,sizeof(char)*tempstrsize);
231 #ifdef IPROBE
232 	    if(menu)
233 	      {
234 		MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, comm_world, &notwaiting, &status);
235 		if(!notwaiting)
236 		  {
237 		    sleep(1);
238 		    get_time (tempstr, "%H:%M:%S");
239 		    fprintf(stdout,"%i> master waits for workers -- %s\n",myID, tempstr);
240 		    continue;
241 		  }
242 	      }
243 #endif
244 	    MYMPIRECV (leadstr, SMALLBUFSIZE, MPI_CHAR, (MYINT) MPI_ANY_SOURCE,
245 		       (MYINT) MPI_ANY_TAG, comm_world, &status);
246 	    sender = status.MPI_SOURCE;
247 	    tag = status.MPI_TAG;
248 	    //if(tempstr[0] != 'M')
249 	      //printff("%i> MASTER: received from n%i with tag %i: %s\n",myID,sender,tag,tempstr);
250 	    switch(leadstr[0])
251 	      {
252 	      case 'M':
253 		tempstrsize = atol(leadstr+1);
254 		tempstr = (char*) realloc(tempstr,sizeof(char)*(tempstrsize+1));
255 		memset(tempstr,0,sizeof(char)*(tempstrsize+1));
256 		MYMPIRECV (tempstr, tempstrsize, MPI_CHAR, sender, tag,
257 			   comm_world, &status);
258 		//		printf("%i> MASTER: message from n%i: %s\n",myID,sender,tempstr);
259 		handle_message(tempstr,sender, world);
260 		break;
261 	      case 'Z': // reading/writing of the raw bayes posterior data , also used to guide
262 		// multiple replicates on when to start sampling
263 		MYMPIRECV (temp, nn, MPI_FLOAT, sender, tag,
264 			   comm_world, &status);
265 		handle_mdim(temp,nn,sender, world);
266 		break;
267 	      case 'B': //burnin stopping rule
268 		tempstrsize = atol(leadstr+1);
269 		tempstr = (char*) realloc(tempstr,sizeof(char)*(tempstrsize+1));
270 		memset(tempstr,0,sizeof(char)*(tempstrsize+1));
271 		MYMPIRECV (tempstr, tempstrsize, MPI_CHAR, sender, tag,
272 			   comm_world, &status);
273 		//printf("%i> MASTER: burn-in message from n%i: %s\n",myID,sender,tempstr);
274 		handle_burnin_message(tempstr,sender-BURNTAG, world);
275 		break;
276 	      case 'R':
277 		//ignore first character and translate into locusnumber
278                 locusdone = atol(leadstr+1);
279 		//if negative this means locus had no data at all -> ignored
280 		if(locusdone<0)
281 		  {
282 		    locusdone = -locusdone;
283 		    world->data->skiploci[locusdone]=TRUE;
284 		    //printf("MASTER received a skiplocus %li\n",locusdone);
285 		  }
286 		done=TRUE;
287 		++alldone;
288 		//printff("%i> MASTER: locus %li, %li of %li finished **************************\n",myID, locusdone,alldone,world->loci);
289                 break;
290 	      case 'N': // need a replicator, this message is sent from a
291 		// locus-node to the master for distribution among nodes that
292 		// are waiting for work, the master send will delegate a
293 		// replicate and tell also the replicator
294 		// where it needs to send the final result.
295 #ifdef DEBUG_MPI
296 	        printf("%i> MASTER: received replicator request from n%i using string %s\n",myID, sender, tempstr);
297 #endif
298 		// add the mpistack_request list
299 		handle_replication(sender,tag,leadstr,world);
300 		break;
301 	      case 'G':
302 		// node has free time and could do replicator work
303 #ifdef DEBUG_MPI
304 	        printf("%i> MASTER: accepts replicator n%i at stack position %li\n",myID, sender,world->mpistacknum);
305 #endif
306 		world->mpistack[world->mpistacknum] = sender;
307 		world->mpistacknum += 1;
308 		if(alldone>=world->loci && (world->mpistacknum >= (numcpu-1)) && world->mpistack_requestnum==0)
309 		  {
310 #ifdef DEBUG_MPI
311 		    printf("%i> MASTER: reached mpistacknum=%li\n",myID,world->mpistacknum);
312 #endif
313 		    done=TRUE;
314 		    continue;
315 		  }
316 		while(world->mpistacknum > 0 && world->mpistack_requestnum > 0)
317 		  {
318 		    world->mpistack_requestnum -= 1;
319 		    sender = world->mpistack_request[world->mpistack_requestnum].sender;
320 		    tag = world->mpistack_request[world->mpistack_requestnum].tag;
321 		    handle_replication(sender,tag,
322 		      world->mpistack_request[world->mpistack_requestnum].tempstr,world);
323 		  }
324 		break;
325 	      default: /* never go here under normal run condition */
326 #ifdef DEBUG_MPI
327                 fprintf(stderr,"%i> message=@%s@\n@%i@> sender=@%i@ tag=@%i@\n",
328 			myID,leadstr, myID,status.MPI_SOURCE,status.MPI_TAG);
329 		done=TRUE;
330 #else
331                 MPI_Finalize();
332                 error("DIED because of wrong message from worker");
333 #endif
334                 break;
335 	      }
336 	  }
337         who[locusdone] = sender;
338 	// if not done with loci send another locus-work to sender (a node 1..numcpu)
339         if (numsent < loci)
340 	  {
341             twolongs[0]=numsent;
342             MYMPISEND (twolongs, TWO, MPI_LONG, (MYINT) sender, (MYINT) numsent + 1, comm_world);
343             numsent++;
344 	//printff("%i> MASTER: ANOTHER locus %li sent to worker n%i %i\n",myID, numsent, (MYINT) numsent, (MYINT) numsent + 1);
345 	  }
346         else
347 	  {
348             twolongs[0] = 0;
349 	    //tell workers to stop waiting for new loci
350 	    if(!in_mpistack(sender, world))
351 	      {
352 		MYMPISEND (twolongs, TWO, MPI_LONG, (MYINT) sender, (MYINT) 0, comm_world);
353 	      }
354 	  }
355 	//printff("%03i> available workers: %li\n     requests pending:      %li\n     total workers=%i\n     loci done=%li\n",myID, world->mpistacknum,world->mpistack_requestnum, numcpu-1, locus+1);
356 	locus++;
357       }
358 //printf("%03i> available workers: %li\n     requests pending:      %li\n     total workers=%i\n     loci done=%li\n   alldone=%li\n",myID, world->mpistacknum,world->mpistack_requestnum, numcpu-1, locus+1,alldone);
359     // stop loci and/or replicate worker that had never the chance to work on a locus
360     // or replicate, but are still listening
361     for(locus=0;locus < world->mpistacknum;locus++)
362       {
363 #ifdef DEBUG_MPI
364 	fprintf(stdout,"%i> sent kill to node %i\n",myID, world->mpistack[locus]);
365 #endif
366 	mpi_send_stop_tag(world->mpistack[locus], world);
367       }
368     myfree(twolongs);
369     myfree(istatus);
370     myfree(irequests);
371     myfree(tempstr);
372     myfree(leadstr);
373     myfree(temp);
374     //printf("%i> before barrier\n",myID);
375     //MYMPIBARRIER(comm_world);
376     //printf("%i> leaving mpi_runloci_function()\n", myID);
377 }
378 
379 
380 ///
381 /// worker nodes execute this function and serve the master
mpi_runloci_worker(world_fmt ** universe,int usize,option_fmt * options,data_fmt * data,tpool_t * heating_pool,long maxreplicate,long * treefilepos,long * Gmax)382 void mpi_runloci_worker (world_fmt ** universe, int usize,
383                     option_fmt * options, data_fmt * data,
384                     tpool_t * heating_pool, long maxreplicate,
385                     long *treefilepos, long *Gmax)
386 {
387     boolean done = FALSE;
388 
389     char *rawmessage;
390 
391     long locus;
392     long *twolongs;
393     long rawmsgsize    = 0;
394 #ifdef MPIREPLICANT
395     long nbase         = data->loci+1;
396 #endif
397 
398     MPI_Status status;
399 
400     twolongs   = (long *) mycalloc(TWO, sizeof(long));
401     rawmessage = (char *) mycalloc(SMALLBUFSIZE,sizeof(char));
402 #ifdef MPIREPLICANT
403     if(myID < nbase)
404       {
405 #endif
406         while (!done)
407 	  {
408             MYMPIRECV (twolongs, TWO, MPI_LONG, MASTER, MPI_ANY_TAG,
409 		       comm_world, &status);
410             locus = twolongs[0];
411             if (status.MPI_TAG != 0) //stop condition
412 	      {
413 		//printf("%i> WORKER: received locus %li (twolongs[0]=%li) from MASTER\n",myID, locus, twolongs[0]);
414 #ifdef MPIREPLICANT
415                 mpi_run_locus(universe, usize, options, data,
416                               heating_pool, maxreplicate, locus, treefilepos, Gmax);
417 #else
418                 run_locus (universe, usize, options, data,
419                            heating_pool, maxreplicate, locus, treefilepos, Gmax);
420 #endif
421 		if(universe[0]->data->skiploci[locus]==FALSE)
422 		  rawmsgsize = 1 + sprintf(rawmessage,"R%li",locus);
423 		else
424 		  rawmsgsize = 1 + sprintf(rawmessage,"R-%li",locus);
425                 MYMPISEND (rawmessage, SMALLBUFSIZE, MPI_CHAR, (MYINT) MASTER,
426 			   (MYINT) (locus + ONE), comm_world);
427                 /* we want to know what locus we worked for
428                    - to control the work sent by master
429                    - to use in setup_parameter0() [combroyden2.c] */
430                 universe[0]->who[locidone++] = locus;
431 	      }
432             else
433 	      {
434                 done = TRUE;
435 		//printf("%i> LOCUSWORKER finished\n",myID);
436 		mpi_runreplicates_worker (universe, usize, options,  data, heating_pool, treefilepos, Gmax);
437 		//		printf("%i> LOCUSWORKER and then REPLICANT: finished\n",myID);
438 	      }
439 	  }
440 #ifdef MPIREPLICANT
441       }
442     else
443       {
444         mpi_runreplicates_worker (universe, usize, options,  data, heating_pool, treefilepos, Gmax);
445       }
446 #endif
447     myfree(twolongs);
448     myfree(rawmessage);
449     //printf("%i> WORKER done\n",myID);
450     //   printf("%i> before barrier\n",myID);
451     //MYMPIBARRIER(comm_world);
452 }
453 
454 #ifdef MPIREPLICANT
455 ///
456 /// attempt to improve MPIruns with replicates
457 /// each locus is responsible for replication farming and and reporting back to master
458 /// master <- locus-master <- locus-replicate-worker
459 /// generate genealogies for a single locus
460 /// \callgraph
461 void
mpi_run_locus(world_fmt ** universe,int usize,option_fmt * options,data_fmt * data,tpool_t * heating_pool,long maxreplicate,long locus,long * treefilepos,long * Gmax)462 mpi_run_locus(world_fmt ** universe, int usize, option_fmt * options,
463           data_fmt * data, tpool_t * heating_pool, long maxreplicate,
464           long locus, long *treefilepos, long *Gmax)
465 {
466   //world_fmt *world=universe[0];
467     boolean done=FALSE;
468 
469     char *tempstr;
470     //    const long tempstrsize = SMALLBUFSIZE;
471     int tag;
472     int *who;
473     int minnodes;                         // number of replicate-worker nodes [changed meaning!]
474     int sender           = 0;
475     int nbase            = data->loci;    //number of loci-worker nodes
476     int senderlocus      = -1;
477     long replicate;
478     long i;
479     long *temp;
480     long numsent         = 1;//
481     long senderreplicate = 0;
482 
483     //    MPI_Request irequest;
484     MPI_Request *irequests = NULL;
485 
486     MPI_Status status;
487     MPI_Status *istatus    = NULL;
488 
489     temp    = (long *) mycalloc(TWO, sizeof(long));
490     tempstr = (char *) mycalloc(SMALLBUFSIZE,sizeof(char));
491     who     = (int *) mycalloc(maxreplicate,sizeof(int));
492 
493 
494     if(maxreplicate>1)
495       {
496 	// number of nodes available for replicate workers,
497 	// numcpu - nbase - 1 [for masternode] and maxreplicate-1
498 	// [locus-worker is doing one replicate itself]  are limiting
499 	//minnodes =  maxreplicate; //MAX(0,MIN(maxreplicate-1,numcpu-nbase-1))+1;
500 	minnodes =  MAX(0,MIN(numcpu-nbase-1,maxreplicate-1))+1;
501 	irequests = (MPI_Request *) mycalloc(minnodes+1,sizeof(MPI_Request));
502 	istatus   = (MPI_Status *) mycalloc(minnodes+1,sizeof(MPI_Status));
503 	temp[0] = locus;
504 	// replicate 1 to maxreplicate should be worked on by other nodes
505 	// minnodes is the number of nodes free for work on replicates alone
506 	// so with 3 replicates to work on and 3 total nodes and a single locus we need
507 	// 1 master, 1 locus-worker and have 1 replicate-worker, the locus-worker and
508 	// the replicate worker will both work on replicates, the locus-worker sends
509 	// off 1 request (for the replicate worker), because we start the locus worker
510 	// with replicate 0, the loop for the replicate workers starts at one
511 	// and to address this shift the MIN(..,minnodes+1) addresses that
512 	for (replicate = 1; replicate < minnodes; replicate++)//Cesky Krumlov 2013 replicate=1 replaced
513 	  {
514             temp[1] = replicate;
515 	    sprintf(tempstr,"N%i %li %li", myID, locus, replicate);
516 	    //fprintf(stdout,"%i> request a node from n%i for locus %li and replicate %li\n",myID, MASTER, locus, replicate);
517             MYMPIISEND (tempstr, SMALLBUFSIZE, MPI_CHAR,
518 			(MYINT) MASTER, (MYINT) (locus + 1 + REPTAG), comm_world, &irequests[numsent]);
519             numsent++;   // counter of how many replicates are sent off-node
520 	  }
521         // this should be asynchronous so that the myRepID-master can do some work, too.
522 	//Cesky Krumlov 2013
523 	run_replicate(locus, 0, universe, options, data,
524 		      heating_pool, usize,
525 		      treefilepos, Gmax);
526         //Cesky Krumlov 2013
527         //Cesky Krumlov 2013
528         //Cesky Krumlov 2013
529 	who[0] = myID;
530         MYMPIWAITALL(numsent,irequests, istatus); // wait for all replicators to finish
531         // set replicate counter to 1 because locus-worker itself finished first replicate
532         replicate=1;//Cesky Krumlov 2013 was 1
533         //Cesky Krumlov 2013numsent++;
534 
535         done = FALSE;
536         while(!done)
537 	  {
538             // done=TRUE means that
539             // no replicator worker is available yet
540             // the loci-worker has to do all the work if numsent==1
541 // Cesky Krumlov 2013	    printf("%i> accepted/trials: numsent=%li rep %li [%li,%li,%li,%li]/[%li,%li,%li,%li]\n",
542 // Cesky Krumlov 2013		   myID, numsent, replicate, world->accept_archive[0],world->accept_archive[1],world->accept_archive[2],world->accept_archive[3],world->accept_archive[4],world->trials_archive[0],world->trials_archive[1],world->trials_archive[2],world->trials_archive[3],world->trials_archive[4]);
543             if(numsent==1)
544 	      {
545                 run_replicate(locus, replicate, universe, options, data,
546                               heating_pool, usize,
547                               treefilepos, Gmax);
548                 who[replicate] = myID;
549                 replicate++;
550                 if(replicate >= maxreplicate)
551 		  done=TRUE;
552 	      }
553             else
554 	      {
555                 memset(irequests,0,sizeof(int)*minnodes);
556                 memset(istatus,0,sizeof(int)*minnodes);
557                 MYMPIRECV (tempstr, SMALLBUFSIZE, MPI_CHAR, MPI_ANY_SOURCE,
558 			   (MYINT)(locus+1+ REPTAG), comm_world, &status);
559                 sender = status.MPI_SOURCE;  // repID of the replicator that did the work
560                 tag = status.MPI_TAG;        // tag is working locus + replicator tag
561                 senderlocus = tag-REPTAG;    // locus that was worked on by sender
562                 // test so that we can be sure that we got things from a valid replicator worker
563                 if(sender == myID)
564 		  {
565                     fprintf(stdout,"%i, %i> DIE\nDIE\nDIE\nDIE\nDIE\nDIE\nDIE\nDIE\nDIE\n",myID, myRepID);
566                     error("tried to send a replicate to myself using MPI -- this is not allowed\n");
567 		  }
568                 // test whether the locus is the same between locus-worker and replicator
569                 if(senderlocus-1 != locus)
570 		  warning("%i> !!!!!!!!!!!!! got wrong locus from worker myRepID=%i (my locus %i != its locus %i )\n",
571                           myID, sender, locus, senderlocus-1);
572                 // receive only messages that are prefixed with 'R' and exit on all others
573                 if(tempstr[0]=='R')
574 		  {
575                     //ignore first character and translate into repnumber
576                     senderreplicate = atol(tempstr+1);
577 		  }
578                 else
579 		  {
580 		    fprintf(stderr,"%i> message=%s\n%i> sender=%i tag=%i\n",myID,tempstr, myID,
581 			    status.MPI_SOURCE,status.MPI_TAG);
582                     error("DIED because of wrong message from worker");
583 		  }
584 		// record sender , this record should be filled at the end of this function
585                 who[senderreplicate] = sender;
586 		//fprintf(stdout,"%i> senderreplicate=%li\n%i> sender=%i tag=%i\n",myID,senderreplicate, myID,
587 		//	    status.MPI_SOURCE,status.MPI_TAG);
588                 mpi_receive_replicate(sender, tag, locus, senderreplicate, universe[0]);
589                 replicate++;
590                 if(replicate >= maxreplicate)
591 		  {
592                     done=TRUE;
593 		  }
594                 temp[0] = locus;
595                 if (numsent < maxreplicate) //at least one set was worked by the locus-worker
596 		  {
597                     temp[1] = numsent;
598 		    sprintf(tempstr,"N%i %li %li", myID, locus, numsent);
599 		    //fprintf(stdout,"%i> request a node from n%i for locus %li and replicate %li\n",myID, MASTER, locus, numsent);
600 		    MYMPIISEND (tempstr, SMALLBUFSIZE, MPI_CHAR,
601 				(MYINT) MASTER, (MYINT) (locus + 1 + REPTAG), comm_world, &irequest);
602                     numsent++;
603                     MYMPIWAITALL(ONE, &irequest, &status);
604 		  }
605 	      }
606 	  }
607       }
608     else
609       { /* no replicates */
610         run_replicate(locus, 0, universe, options, data,
611                       heating_pool, usize,
612                       treefilepos, Gmax);
613       }
614     myfree(temp);
615     myfree(tempstr);
616     myfree(who);
617     if(maxreplicate>1 && (numcpu-1) > universe[0]->loci)
618       {
619 	const long hc = universe[0]->options->heated_chains;
620 	long t;
621 	if(universe[0]->options->heating)
622 	  {
623 	    printf("CORRECTING\n");
624 	    for(t=0; t < hc; t++)
625 	      {
626 		universe[0]->bf[locus * hc + t] /= maxreplicate;
627 	      }
628 	  }
629 	myfree(irequests);
630 	myfree(istatus);
631       }
632 #ifdef UEP
633     if (options->uep)
634       show_uep_store(universe[0]);
635 #endif
636     if (options->bayes_infer)
637       {
638 	if (options->replicate && options->replicatenum > 0)
639 	  {
640 	    (universe[0])->repkind = MULTIPLERUN;
641 	  }
642 	// @@@@@@@@@@CHECK@@@@@@@@@@@@@
643 	if(!universe[0]->options->has_bayesmdimfile)
644 	  calculate_credibility_interval(universe[0], locus);
645       }
646     else
647       {
648 	if (options->replicate && options->replicatenum > 0)
649 	  {
650 	    (universe[0])->repkind = MULTIPLERUN;
651 #ifdef LONGSUM
652 	    change_longsum_times(EARTH);
653 #endif /*LONGSUM*/
654 	    // over multiple replicates if present or single locus
655 	    //printf("ML over multiple replicates or single locus\n");
656 	    (void) estimateParameter(options->replicatenum, *Gmax, universe[0], options,
657 				     (universe[0])->cov[locus], options->lchains, /*type*/ 'l',
658 				     SINGLELOCUS, (universe[0])->repkind);
659 	  }
660       }
661 
662     // cleanup
663     if (options->heating)
664     {
665         for (i = 0; i < options->heated_chains; i++)
666         {
667 	  //free_tree(universe[i]->root, universe[i]);
668         }
669     }
670     else
671     {
672       //      free_tree(universe[0]->root, universe[0]);
673     }
674     bayes_reset(universe[0]);
675 }
676 
677 
678 ///
679 /// run replicates on replicate-worker nodes
680 /// this function is called in main() and will work on any preset locus and any preset replicate
681 /// the calling function is responsible for the correct assignment to locus and replicate.
682 void
mpi_runreplicates_worker(world_fmt ** universe,int usize,option_fmt * options,data_fmt * data,tpool_t * heating_pool,long * treefilepos,long * Gmax)683 mpi_runreplicates_worker (world_fmt ** universe, int usize,
684                     option_fmt * options, data_fmt * data,
685                     tpool_t * heating_pool,
686                     long *treefilepos, long *Gmax)
687 {
688     boolean done = FALSE;
689     char *rawmessage;
690     int sender;
691     long nng;
692     long *temp;
693     long replicate;
694     long locus;
695     long rawmsgsize = 0;
696     char *ready;
697     MPI_Status status;
698     ready = mycalloc(SMALLBUFSIZE,sizeof(char));
699     temp = (long *) mycalloc(3, sizeof(long));
700     rawmessage = (char *) mycalloc(STRSIZE,sizeof(char));
701     sprintf(ready,"G%i",myID);
702     while (!done)
703       {
704 	// 	printf("%i> REPLICANT:  send ready message \"%s\" to n%i\n", myID, ready, MASTER);
705 	MYMPISEND (ready, SMALLBUFSIZE, MPI_CHAR, (MYINT) MASTER, myID, comm_world);
706         MYMPIRECV (temp, 3, MPI_LONG, (MYINT) MASTER, MPI_ANY_TAG, comm_world, &status);
707         sender = (int) temp[0];
708         locus = temp[1];
709         replicate = temp[2];
710 	//printf("%i> REPLICANT:  received locus %li and replicate %li from n%i via n%i \n", myID, locus+1, replicate+1, sender, status.MPI_SOURCE);
711         if (status.MPI_TAG != 0) //stop condition
712           {
713 	    nng=universe[0]->numpop2 + universe[0]->loci * universe[0]->bayes->mu + 1;
714 	    memset(universe[0]->accept_archive,0,sizeof(long)*2*nng);//Cesky Krumlov 2013 resets also trials_archive
715 
716             run_replicate(locus, replicate, universe, options, data, heating_pool, usize,treefilepos, Gmax);
717 	    //printf("%i> after runreplicate in\n",myID);
718             rawmsgsize = 1 + sprintf(rawmessage,"R%li ",replicate);
719             MYMPISEND (rawmessage, rawmsgsize, MPI_CHAR, (MYINT) sender,
720 		       (MYINT) (locus+1+ REPTAG), comm_world);
721             mpi_send_replicate(sender, locus, replicate, universe[0]);
722 	    //printf("%i> after mpi_send_replicate()\n",myID);
723 	    if(universe[0]->options->bayes_infer)
724 	      {
725 		universe[0]->bayes->numparams = 0;
726 	      }
727           }
728         else
729           {
730             done = TRUE;
731 	    //printf("%i> REPLICANT received KILL signal.\n",myID);
732           }
733       }
734     myfree(ready);
735     myfree(temp);
736     myfree(rawmessage);
737     //    printf("%i> REPLICANT: all replication work is done\n",myID);
738 }
739 
740 //---------end replication in MPI
741 void
assign_worker_cleanup(void)742 assign_worker_cleanup (void)
743 {
744     boolean done = FALSE;
745 
746     char *rawmessage;
747 
748     int sender;
749 
750     long *temp;
751     char *ready;
752     MPI_Status status;
753     ready = mycalloc(SMALLBUFSIZE,sizeof(char));
754     temp = (long *) mycalloc(3, sizeof(long));
755     rawmessage = (char *) mycalloc(STRSIZE,sizeof(char));
756     sprintf(ready,"G%i",myID);
757     while (!done)
758       {
759  	//printf("%i> REPLICANT:  send ready message \"%s\" to n%i\n", myID, ready, MASTER);
760 	MYMPISEND (ready, SMALLBUFSIZE, MPI_CHAR, (MYINT) MASTER, myID, comm_world);
761         MYMPIRECV (temp, 3, MPI_LONG, (MYINT) MASTER, MPI_ANY_TAG, comm_world, &status);
762         sender = (int) temp[0];
763         if (status.MPI_TAG != 0) //stop condition
764           {
765 	    printf("%i> received real message but do not know what to do with this\n",myID);
766           }
767         else
768           {
769             done = TRUE;
770           }
771       }
772     myfree(ready);
773     myfree(temp);
774     myfree(rawmessage);
775 }
776 
777 //---------end replication in MPI
778 #endif /*MPIREPLICANT*/
779 
780 
781 ///
782 /// orchestrates the likelihood calculation
783 MYREAL
mpi_likelihood_master(MYREAL * param,MYREAL * lparam,world_fmt * world,nr_fmt * nr,helper_fmt * helper,int * who)784 mpi_likelihood_master (MYREAL *param, MYREAL *lparam,
785                        world_fmt * world, nr_fmt * nr,
786                        helper_fmt * helper, int *who)
787 {
788   int tag;
789 
790   long locus;
791   long worker;
792   long sender;
793   long addon    = 1;
794   long numelem  = world->numpop2 + (world->options->gamma ? 1 : 0);
795   long numelem2 = numelem * 2;
796 
797   MYREAL logres = 0.0;
798   MYREAL *temp;
799   MYREAL *tmp;
800 
801   MPI_Status status;
802 
803   MYREAL * locusweight = world->data->locusweight; //invariant loci treatment
804 
805   doublevec1d(&tmp,world->loci);
806   doublevec1d(&temp,numelem2+2);
807 
808   temp[0] = MIGMPI_LIKE;
809   memcpy (temp + 1, param, numelem * sizeof (MYREAL));
810   memcpy (temp + 1 + numelem, lparam, numelem * sizeof (MYREAL));
811 
812   memset (nr->locilikes, 0, sizeof (MYREAL) * world->loci);
813 
814   //  if(world->loci==1)
815   //  addon=1;
816   //else
817   //addon=0;
818 
819   for (worker = 1; worker < MIN (world->loci + addon, numcpu); worker++)
820     {
821       MYMPISEND (temp, (MYINT) numelem2+2, mpisizeof, (MYINT) worker, (MYINT) worker, comm_world);
822     }
823   for (worker = 1; worker < MIN (world->loci + addon, numcpu); worker++)
824     {
825       MYMPIRECV (tmp, (MYINT) world->loci, mpisizeof, MPI_ANY_SOURCE,
826 		 MPI_ANY_TAG, comm_world, &status);
827       sender = status.MPI_SOURCE;
828       tag = status.MPI_TAG;
829       // the worker send a vector of values
830       // e.g. (0,0,0,-12,0,-34,0,0,0), these are all loci
831       // of which most of them were not evaluated
832       // the loop updates the master copy of locilikes
833       for (locus = 0; locus < world->loci; locus++)
834         {
835 	  nr->locilikes[locus] += tmp[locus];
836         }
837     }
838   for (locus = 0; locus < world->loci; locus++)
839     {
840       logres += locusweight[locus] * nr->locilikes[locus];//invariant loci treatment
841     }
842   nr->llike = logres;
843   myfree(temp);
844   myfree(tmp);
845   return logres;
846 }
847 
848 
849 ///
850 /// workers calculate likelihood and wait for master to receive work orders
851 void
mpi_likelihood_worker(world_fmt * world,helper_fmt * helper,long rep)852 mpi_likelihood_worker (world_fmt * world, helper_fmt * helper, long rep)
853 {
854     long locus;
855     long ww;
856 
857     nr_fmt *nr = helper->nr;
858 
859     MYREAL *param = helper->expxv;
860     MYREAL *lparam = helper->xv;
861     MYREAL *mu_rates = world->options->mu_rates;
862 
863     memset (nr->locilikes, 0, sizeof (MYREAL) * world->loci);
864 
865     if (world->options->gamma)
866       {
867         if (lparam[nr->numpop2] > LNMAXPARAM)
868 	  {
869             lparam[nr->numpop2] = LNMAXPARAM;
870 	  }
871         initgammacat (nr->categs, EXP (lparam[nr->numpop2]),1./* EXP (lparam[0])*/,
872                       nr->rate, nr->probcat);
873       }
874 
875     for (ww = 0; ww < locidone; ww++)
876       {
877         locus = nr->world->who[ww];
878         if (!world->options->gamma)
879 	  {
880             nr->locilikes[locus] =
881 	      calc_locus_like (nr, param, lparam, locus) + mu_rates[locus];
882 	  }
883         else
884 	  {
885             helper->locus = locus;
886             nr->locilikes[locus] = gamma_locus_like (nr,param,lparam,helper->weight,locus);
887 	  }
888       }
889 }
890 
891 ///
892 /// setup start parameters
893 void
mpi_startparam_master(world_fmt * world)894 mpi_startparam_master(world_fmt * world)
895 {
896     int tag;
897     int numreceived = 0;
898 
899     long i;
900     long sender;
901     long workerloci = 0;
902 
903     MYREAL  *tmp;
904 
905     MPI_Status status;
906 
907     tmp = (MYREAL*) mycalloc(world->numpop2+1,sizeof(MYREAL));
908 
909     while (numreceived < world->loci)
910       {
911         MYMPIRECV (tmp, world->numpop2+1, mpisizeof, MPI_ANY_SOURCE,
912                    MPI_ANY_TAG, comm_world, &status);
913         sender = status.MPI_SOURCE;
914         tag = status.MPI_TAG;
915         workerloci=tmp[0];
916         for(i=0; i<world->numpop2; i++)
917 	  {
918 	    world->param0[i] += tmp[i+1];
919 	  }
920         numreceived+=workerloci;
921       }
922     for(i=0; i<world->numpop2; i++)
923       world->param0[i] /= world->loci;
924 
925     myfree(tmp);
926 }
927 
928 ///
929 /// set start parameters for workers
930 void
mpi_startparam_worker(world_fmt * world)931 mpi_startparam_worker (world_fmt * world)
932 {
933     long ww;
934     long repstart;
935     long repstop;
936     long r;
937     long i;
938     long locus;
939 
940     MYREAL *tmp;
941 
942     if(locidone>0)
943       {
944 	tmp = (MYREAL*) mycalloc(world->numpop2+1,sizeof(MYREAL));
945         set_replicates (world, world->repkind, world->options->replicatenum,
946                         &repstart, &repstop);
947         tmp[0]=(MYREAL)locidone;
948         for (ww = 0; ww < locidone; ww++)
949 	  {
950             locus = world->who[ww];
951             for (r = repstart; r < repstop; r++)
952 	      {
953                 for(i=0; i < world->numpop2; i++)
954 		  tmp[i+1] += world->atl[r][locus].param[i];
955 	      }
956 	  }
957         for(i=1; i < world->numpop2+1; i++)
958 	  {
959 	    tmp[i] /= locidone * (repstop-repstart);
960 	  }
961 	MYMPISEND (tmp, world->numpop2+1, mpisizeof, MASTER, myID, comm_world);
962 	myfree(tmp);
963       }
964 }
965 
966 
967 ///
968 /// orchestrates max(gmax) over all nodes
969 void
mpi_gmax_master(world_fmt * world,long * Gmax)970 mpi_gmax_master (world_fmt * world, long *Gmax)
971 {
972     int tag;
973     int sender;
974     int numreceived = 0;
975 
976     long tmp;
977 
978     MPI_Status status;
979 
980     *Gmax = 0.;
981 
982     MYMPIBCAST (Gmax, ONE, MPI_LONG, MASTER, comm_world);
983 
984     while (numreceived < MIN(world->loci, numcpu - 1))
985       {
986 	MYMPIRECV (&tmp, ONE, MPI_LONG, MPI_ANY_SOURCE,
987 		   MPI_ANY_TAG, comm_world, &status);
988 	sender = status.MPI_SOURCE;
989 	tag = status.MPI_TAG;
990         if (*Gmax < tmp)
991 	  {
992             *Gmax = tmp;
993 	  }
994         numreceived++;
995       }
996     //  do we need this barrier really?
997     MYMPIBARRIER(comm_world);
998 }
999 
1000 ///
1001 /// returns the gmax values to the master to evaluate max(gmax)
1002 void
mpi_gmax_worker(world_fmt * world)1003 mpi_gmax_worker (world_fmt * world)
1004 {
1005     long ww;
1006     long repstart;
1007     long repstop;
1008     long r;
1009     long locus;
1010     long Gmax = 1;
1011 
1012     MYMPIBCAST (&Gmax, ONE, MPI_LONG, MASTER, comm_world);
1013 #ifdef DEBUG_MPI
1014     printf("%i> locidone=%i\n",myID,locidone);
1015 #endif
1016     if(locidone>0)
1017       {
1018 	set_replicates (world, world->repkind, world->options->replicatenum,
1019 			&repstart, &repstop);
1020 
1021 	for (ww = 0; ww < locidone; ww++)
1022 	  {
1023 	    locus = world->who[ww];
1024 	    for (r = repstart; r < repstop; r++)
1025 	      {
1026 		if (Gmax < world->atl[r][locus].T)
1027 		  Gmax = world->atl[r][locus].T;
1028 	      }
1029 	  }
1030 	MYMPISEND (&Gmax, ONE, MPI_LONG, MASTER, myID, comm_world);
1031       }
1032     //  do we need this barrier really?
1033     MYMPIBARRIER(comm_world);
1034 }
1035 
1036 ///
1037 /// first worker (myID=1, myRepId=0) will send stop-message to replication nodes
1038 /// the comm_worker group's master is the first worker who has ID=0 in this group
1039 /// as results we send messages to id=1..x in the comm_worker group, do not mix this
1040 /// with the id in comm_world that include the master (id=0 there).
mpi_send_stop_mcmc_worker(long numcpu,long loci,MPI_Comm * comm,MPI_Request * irequests,MPI_Status * istatus,long id)1041 long  mpi_send_stop_mcmc_worker(long numcpu, long loci, MPI_Comm *comm,
1042 				     MPI_Request *irequests, MPI_Status *istatus, long id)
1043 {
1044     long twolongs[2];
1045     long *temp;
1046     long receiver;
1047     long sent      = 0;
1048     long xx        = (id==0) ? 0 : 1;
1049 
1050     temp = (long *) mycalloc(TWO, sizeof(long));
1051     twolongs[0]=0;
1052     twolongs[1]=0;
1053 
1054     for(receiver=loci+1-xx; receiver< numcpu-xx; receiver++)
1055       {
1056 	MYMPIISEND (temp, TWO, MPI_LONG, (MYINT) receiver, 0, *comm, &irequests[sent]);
1057         sent++;
1058       }
1059     if(sent>0)
1060       {
1061 	MYMPIWAITALL(sent,irequests, istatus); // wait for all replicators to finish
1062       }
1063     myfree(temp);
1064     return sent;
1065 }
1066 
1067 ///
1068 /// first worker (myID=1, myRepId=0) will send stop-message to replication nodes
1069 /// the comm_worker group's master is the first worker who has ID=0 in this group
1070 /// as results we send messages to id=1..x in the comm_worker group, do not mix this
1071 /// with the id in comm_world that include the master (id=0 there).
mpi_send_stop_mcmc_lociworker(long numcpu,long loci)1072 long  mpi_send_stop_mcmc_lociworker(long numcpu, long loci)
1073 {
1074 
1075   long receiver;
1076   long *temp;
1077   long xx       = (myID==0) ? 0 : 1;
1078   long sent     = 0;
1079   long minnodes = MIN(numcpu,loci -1);
1080 
1081 
1082   MPI_Request *irequests;
1083   MPI_Status *istatus;
1084   irequests = (MPI_Request *) mycalloc(minnodes+1,sizeof(MPI_Request));
1085   istatus = (MPI_Status *) mycalloc(minnodes+1,sizeof(MPI_Status));
1086 
1087   temp = (long *) mycalloc(TWO, sizeof(long));
1088 
1089   for(receiver=loci+1-xx; receiver< numcpu-xx; receiver++)
1090     {
1091            error("disable because testing why openmpi breaks");
1092       //      MYMPIISEND (temp, TWO, MPI_LONG, (MYINT) receiver, 0, comm_workers, &irequests[sent]);
1093       sent++;
1094     }
1095   if(sent>0)
1096     MYMPIWAITALL(sent,irequests, istatus); // wait for all replicators to finish
1097   myfree(temp);
1098   myfree(irequests);
1099   myfree(istatus);
1100   return sent;
1101 }
1102 
1103 ///
1104 /// stops all replicate workers
mpi_send_stop_mcmc_replicateworker(long numcpu,long loci)1105 long  mpi_send_stop_mcmc_replicateworker(long numcpu, long loci)
1106 {
1107 
1108   long *temp;
1109   long receiver;
1110   long sent      = 0;
1111   long xx        = (myID==0) ? 0 : 1;
1112   long minnodes  = labs(numcpu - loci -1);
1113 
1114   MPI_Request *irequests;
1115   MPI_Status *istatus;
1116 
1117   irequests = (MPI_Request *) mycalloc(minnodes+1,sizeof(MPI_Request));
1118   istatus   = (MPI_Status *) mycalloc(minnodes+1,sizeof(MPI_Status));
1119   temp      = (long *) mycalloc(TWO, sizeof(long));
1120 
1121   for(receiver=loci+1-xx; receiver< numcpu-xx; receiver++)
1122     {
1123       error("disable because testing why openmpi breaks");
1124       //     MYMPIISEND (temp, 2, MPI_LONG, (MYINT) receiver, 0, comm_workers, &irequests[sent]);
1125       sent++;
1126     }
1127   if(sent>0)
1128     {
1129       MYMPIWAITALL(sent,irequests, istatus); // wait for all replicators to finish
1130     }
1131   myfree(temp);
1132   myfree(irequests);
1133   myfree(istatus);
1134   return sent;
1135 }
1136 
1137 ///
1138 /// sends a stop signal to all loci-worker
1139 void
mpi_send_stop(world_fmt * world)1140 mpi_send_stop (world_fmt * world)
1141 {
1142   long worker;
1143   long numelem  = world->numpop2 + (world->options->gamma ? 1 : 0);
1144   long numelem2 = 2 * numelem;
1145 
1146   MYREAL *temp;
1147 
1148   temp = (MYREAL *) mycalloc (numelem2+2, sizeof (MYREAL));
1149   temp[0] = MIGMPI_END;
1150 
1151   for (worker = 1; worker < numcpu; worker++)
1152     {
1153       MYMPISEND (temp, (MYINT) numelem2+2, mpisizeof, (MYINT) worker, (MYINT) 0, comm_world); //end of loci
1154     }
1155   myfree(temp);
1156 }
1157 
1158 ///
1159 /// sends a stop signal to a specific worker used in for assignloci_worker
mpi_send_stop_tag(int worker,world_fmt * world)1160 void mpi_send_stop_tag (int worker, world_fmt * world)
1161 {
1162   long *temp;
1163 
1164   temp = (long *) mycalloc (TWO, sizeof (MYREAL));
1165 
1166   temp[0] = 0;
1167   temp[1] = 0;
1168   MYMPISEND (temp, (MYINT) TWO, MPI_LONG, (MYINT) worker, (MYINT) 0, comm_world);
1169 
1170   myfree(temp);
1171 
1172 }
1173 
1174 ///
1175 /// sends a continue with further analysis to the workers
1176 void
mpi_results_stop(void)1177 mpi_results_stop (void)
1178 {
1179     long worker;
1180     long dummy = 0;
1181 
1182     for (worker = 1; worker < numcpu; worker++)
1183       {
1184         MYMPISEND (&dummy, ONE, MPI_LONG, (MYINT) worker, (MYINT) 0, comm_world);
1185       }
1186 }
1187 
1188 ///
1189 /// sends out requests and receives all the locus-gradients from the workers
1190 void
mpi_gradient_master(nr_fmt * nr,world_fmt * world,int * who)1191 mpi_gradient_master (nr_fmt * nr, world_fmt * world, int *who)
1192 {
1193     int tag;
1194 
1195     long locus;
1196     long sender;
1197     long *tempindex;
1198     long addon    = 1;//(world->loci == 1)? 0 : 1;
1199     long numelem  = nr->partsize;
1200     long numelem2 = 2 * numelem;
1201 
1202     MYREAL *temp;
1203 
1204     MPI_Status status;
1205 
1206     temp      = (MYREAL *) mycalloc (numelem2+2, sizeof (MYREAL));
1207     tempindex = (long *) mycalloc (numelem, sizeof (long));
1208     temp[0]   = MIGMPI_GRADIENT;
1209 
1210     memcpy (temp + 1, nr->param, numelem * sizeof (MYREAL));
1211     memcpy (tempindex, nr->indeks, nr->partsize * sizeof (long));
1212     memcpy (temp + 1 + numelem, nr->lparam, numelem * sizeof (MYREAL));
1213     temp[numelem2+1] = nr->profilenum;
1214 
1215     for (locus = 1; locus < MIN (world->loci + addon, numcpu); locus++)
1216       {
1217         MYMPISEND (temp, (MYINT) numelem2+2, mpisizeof, (MYINT) locus, (MYINT) locus, comm_world);
1218         MYMPISEND (tempindex, (MYINT) numelem, MPI_LONG, (MYINT) locus, (MYINT) locus, comm_world);
1219       }
1220 
1221     memset (nr->d, 0, sizeof (MYREAL) * numelem);
1222     for (locus = 1; locus < MIN (world->loci + addon, numcpu); locus++)
1223       {
1224 	copy_and_clear_d (nr);
1225 	MYMPIRECV (nr->d, (MYINT) numelem, mpisizeof, MPI_ANY_SOURCE,
1226 		   MPI_ANY_TAG, comm_world, &status);
1227 	add_back_d (nr);
1228 	sender = status.MPI_SOURCE;
1229 	tag = status.MPI_TAG;
1230       }
1231     myfree(temp);
1232     myfree(tempindex);
1233 }
1234 
1235 ///
1236 /// worker returns a gradient to master
1237 void
mpi_gradient_worker(helper_fmt * helper,nr_fmt * nr,timearchive_fmt ** tyme)1238 mpi_gradient_worker (helper_fmt * helper, nr_fmt * nr,
1239                      timearchive_fmt ** tyme)
1240 {
1241   long ww, locus;
1242 
1243   memset (nr->d, 0, sizeof (MYREAL) * nr->partsize);
1244   for (ww = 0; ww < locidone; ww++)
1245     {
1246       locus = nr->world->who[ww];
1247       if(!nr->world->options->gamma)
1248         {
1249 	  copy_and_clear_d (nr);
1250 	  simple_loci_derivatives (nr->d, nr, tyme, locus);
1251 	  add_back_d (nr);
1252         }
1253       else
1254         {
1255 	  gamma_locus_derivative (helper, locus);
1256         }
1257     }
1258 }
1259 
1260 ///
1261 /// provides calculations for the master. calculates likelihoods, gradients, but also
1262 /// delivers results, migration histories, bayesian results and more.
1263 /// workers stay a long time in this function and anwer requests from the master
1264 void
mpi_maximize_worker(world_fmt * world,option_fmt * options,long kind,long rep)1265 mpi_maximize_worker (world_fmt * world, option_fmt *options, long kind, long rep)
1266 {
1267   boolean done = FALSE;
1268   long locus;
1269   long repstart;
1270   long repstop;
1271   long Gmax;
1272   long numelem  =  world->numpop2 + (world->options->gamma ?  1 : 0) ;
1273   long numelem2 = numelem * 2;
1274   MYREAL *temp;
1275   MPI_Status status;
1276   nr_fmt *nr;
1277   helper_fmt helper;
1278   //char tempfilename[STRSIZE];
1279   temp = (MYREAL *) mycalloc (numelem2 + 2, sizeof (MYREAL));
1280   helper.xv = (MYREAL *) mycalloc (numelem2, sizeof (MYREAL));
1281   helper.expxv = (MYREAL *) mycalloc (numelem2, sizeof (MYREAL));
1282   helper.analystype = SINGLELOCUS; //this may be problematic byt seems only
1283   // to be used here for decision on gamma or not
1284   nr = (nr_fmt *) mycalloc (1, sizeof (nr_fmt));
1285   set_replicates (world, world->repkind, rep, &repstart, &repstop);
1286   which_calc_like (world->repkind);
1287   if(!world->options->bayes_infer)
1288     {
1289       MYMPIBCAST (&Gmax, 1, MPI_LONG, MASTER, comm_world);
1290       create_nr (nr, world, Gmax, 0, world->loci, world->repkind, repstart);
1291       SETUPPARAM0 (world, nr, world->repkind,
1292 		   repstart, repstop, world->loci, kind, TRUE);
1293     }
1294   else
1295     {
1296       Gmax=1;
1297       create_nr (nr, world, Gmax, 0, world->loci, world->repkind, repstart);
1298     }
1299   while (!done)
1300     {
1301       MYMPIRECV (temp, numelem2+2, mpisizeof, MASTER, MPI_ANY_TAG,
1302 		 comm_world, &status);
1303       locus = world->locus = status.MPI_TAG - 1;
1304       switch ((long) temp[0])
1305         {
1306         case MIGMPI_LIKE: // returns a log(likelihood) to the master
1307 	  memset (nr->locilikes, 0, sizeof (MYREAL) * (world->loci+1));
1308 	  memcpy (helper.expxv, temp + 1, sizeof (MYREAL) * numelem);
1309 	  memcpy (helper.xv, temp + 1 + numelem,
1310 		  sizeof (MYREAL) * numelem);
1311 	  fill_helper (&helper, helper.expxv, helper.xv, world, nr);
1312 	  mpi_likelihood_worker (world, &helper, rep);
1313 	  MYMPISEND (nr->locilikes, (MYINT) world->loci, mpisizeof, (MYINT) MASTER,
1314 		     (MYINT) (locus + 1), comm_world);
1315 	  break;
1316         case MIGMPI_GRADIENT: // returns a gradient to the master
1317 	  memcpy (nr->param, temp + 1, sizeof (MYREAL) * (numelem - 1));
1318 	  memcpy (nr->lparam, temp + 1 + numelem,
1319 		  sizeof (MYREAL) * numelem);
1320 	  fill_helper (&helper, nr->param, nr->lparam, world, nr);
1321 	  nr->profilenum = temp[numelem2 + 1];
1322 	  MYMPIRECV (nr->indeks, (MYINT) numelem, MPI_LONG, MASTER, (MYINT) (locus+1),
1323 		     comm_world, &status);
1324 	  mpi_gradient_worker (&helper, nr, world->atl);
1325 	  MYMPISEND (nr->d, (MYINT) numelem, mpisizeof, (MYINT) MASTER, (MYINT) (locus + 1),
1326 		     comm_world);
1327 	  break;
1328         case MIGMPI_RESULT: // returns the results (stats for the loci worked on
1329 	  mpi_results_worker ((long) temp[0], world, repstop, pack_result_buffer);
1330 	  break;
1331 	  //        case MIGMPI_PLOTPLANE: // returns the results (stats for the loci worked on
1332 	  //mpi_results_worker ((long) temp[0], world, repstop, pack_plotplane_buffer);
1333 	  //break;
1334         case MIGMPI_SUMFILE: // returns the tree summary statistic to the master
1335 	  mpi_results_worker ((long) temp[0], world, repstop, pack_sumfile_buffer);
1336 	  break;
1337         case MIGMPI_TREESPACE: // returns the treefile to the master
1338 	  //	  printf("%i> print treespace to master\n",myID);
1339 	  mpi_results_worker ((long) temp[0], world, repstop, pack_treespace_buffer);
1340 	  break;
1341         case MIGMPI_MIGHIST: // returns the histogram of events (top parts + obsolete parts)
1342 	  mpi_results_worker ((long) temp[0], world, repstop, pack_mighist_buffer);
1343 	  break;
1344         case MIGMPI_SKYLINE: // returns the histogram of events
1345 	  //fprintf(stdout,"%i-------------------\n",myID);
1346 	  //	  debug_skyline(world,"mpiresultsworker-skyline");
1347 	  mpi_results_worker ((long) temp[0], world, repstop, pack_skyline_buffer);
1348 	  //fprintf(stdout,"%i-------------------\n",myID);
1349 	  break;
1350         case MIGMPI_BAYESHIST: // returns the histogram of the parameters
1351 	  //printf("%i> send bayeshist\n",myID);
1352 	  mpi_results_worker ((long) temp[0], world, repstop, pack_bayes_buffer);
1353 	  break;
1354 #ifdef PARALIO
1355 	case MIGMPI_PARALIO:
1356 	  fprintf(stdout,"%i> bayesmdimfile closing\n", myID, options->bayesmdimfilename);
1357 	  MPI_File_close(&world->mpi_bayesmdimfile);
1358 	  sprintf(tempfilename,"%s",options->bayesmdimfilename);
1359 	  MYMPISEND (tempfilename, STRSIZE, MPI_CHAR, (MYINT) MASTER, (MYINT) myID, comm_world);
1360 	  break;
1361 #endif
1362         case MIGMPI_END:
1363 	  done = TRUE;
1364 	  break;
1365         default:
1366 	  fprintf (stdout, "%i> Do not understand task --> exit \n", myID);
1367 	  MPI_Finalize();
1368 	  exit (0);
1369         }
1370     }
1371   myfree(temp);
1372   myfree(helper.xv);
1373   myfree(helper.expxv);
1374   destroy_nr (nr, world);
1375 }
1376 
1377 ///
1378 /// sends out the options to all workers
1379 void
broadcast_options_master(option_fmt * options,data_fmt * data)1380 broadcast_options_master (option_fmt * options, data_fmt *data)
1381 {
1382   long allocbufsize = LONGLINESIZE;
1383   long bufsize;
1384   char *buffer;
1385 #if DEBUG
1386   char nowstr[LINESIZE];
1387   get_time (nowstr, "%c");
1388   printf("Master started option broadcast at %s\n", nowstr);
1389 #endif
1390 
1391   //  char *buffermu;
1392   buffer = (char *) mycalloc (allocbufsize, sizeof (char));
1393   bufsize = save_options_buffer (&buffer, &allocbufsize, options, data);
1394   MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1395   MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1396   myfree(buffer);
1397 #if DEBUG
1398   get_time (nowstr, "%c");
1399   printf("master finished broadcast of options %s\n",nowstr);
1400 #endif
1401 }
1402 
1403 ///
1404 /// receives the options
1405 void
broadcast_options_worker(option_fmt * options)1406 broadcast_options_worker (option_fmt * options)
1407 {
1408   //  long i;
1409   //  char *temp;
1410   long bufsize;
1411   char *buffer, *sbuffer;
1412   MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1413   buffer = (char *) mycalloc (bufsize + 1, sizeof (char));
1414   sbuffer = buffer;
1415   MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1416   read_options_worker (&buffer, options);
1417   myfree(sbuffer);
1418 #if DEBUG
1419   char nowstr[LINESIZE];
1420   get_time (nowstr, "%c");
1421   printf("%i > worker finished option broadcast receive at %s\n",myID, nowstr);
1422 #endif
1423 
1424 }
1425 
1426 
1427 ///
1428 /// broadcasts the data to all nodes
1429 void
broadcast_data_master(data_fmt * data,option_fmt * options)1430 broadcast_data_master (data_fmt * data, option_fmt * options)
1431 {
1432   long bufsize;
1433   long allocbuffermusize = 1;
1434   char *buffermu;
1435 #if DEBUG
1436   char nowstr[LINESIZE];
1437   get_time (nowstr, "%c");
1438   printf("Master broadcasts data at %s\n",nowstr);
1439 #endif
1440   bufsize = pack_databuffer (data, options);
1441   if(options->murates)
1442     {
1443       buffermu = (char *) mycalloc (1, sizeof (char));
1444       bufsize = save_mu_rates_buffer(&buffermu,&allocbuffermusize, options);
1445       MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1446       MYMPIBCAST (buffermu, bufsize, MPI_CHAR, MASTER, comm_world);
1447       myfree(buffermu);
1448     }
1449 #ifdef DEBUG
1450   get_time (nowstr, "%c");
1451   printf("Master finished broadcasts of data at %s\n",nowstr);
1452 #endif
1453 }
1454 
1455 ///
1456 /// receives the data from the master
1457 void
broadcast_data_worker(data_fmt * data,option_fmt * options)1458 broadcast_data_worker (data_fmt * data, option_fmt * options)
1459 {
1460   long bufsize=1;
1461   char *buffer;
1462   char *buffermu, *sbuffermu;
1463   char *temp;
1464   long i;
1465   //MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1466   buffer = (char *) mycalloc (bufsize, sizeof (char));
1467   //MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1468   unpack_databuffer (data, options);
1469   myfree(buffer);
1470   if(options->murates)
1471     {
1472       MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1473       buffermu = (char *) mycalloc (bufsize + 1, sizeof (char));
1474       sbuffermu = buffermu;
1475       MYMPIBCAST (buffermu, bufsize, MPI_CHAR, MASTER, comm_world);
1476       //fprintf(stdout,"%i> received muratebuffer:>%s<\n",myID, buffermu);
1477       temp=strsep(&buffermu,"\n");
1478       options->muloci = atoi(temp);
1479       if(options->mu_rates==NULL)
1480 	options->mu_rates = (MYREAL *) mycalloc(1+options->muloci, sizeof(MYREAL));
1481       for(i=0; i< options->muloci; i++)
1482 	{
1483 	  temp = strsep(&buffermu,"\n");
1484 	  options->mu_rates[i] = atof(temp);
1485 	}
1486       myfree(sbuffermu);
1487     }
1488 #ifdef DEBUG
1489   char nowstr[LINESIZE];
1490   get_time (nowstr, "%c");
1491   printf("%i > worker finished data broadcast receive at %s\n",myID, nowstr);
1492 #endif
1493 
1494 }
1495 
1496 ///
1497 /// pack the data for shipment to the workers
1498 long
pack_databuffer(data_fmt * data,option_fmt * options)1499 pack_databuffer (data_fmt * data, option_fmt * options)
1500 {
1501   long locus, pop, ind;
1502   long bufsize = 0;
1503   long biggest;
1504 #ifdef UEP
1505   long sumtips, i;
1506 #endif
1507   char *buffer;
1508   long allocfpsize=LONGLINESIZE;
1509 
1510   buffer = (char *) mycalloc (allocfpsize,sizeof (char));
1511   bufsize = sprintf (buffer, "%c %li %li %li\n", options->datatype, (long) data->hasghost,
1512 			  data->numpop, data->loci);
1513   // send header info
1514   bufsize += 1;
1515   MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1516   MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1517   // send sites and loci information
1518   bufsize=0;
1519   for (locus = 0; locus < data->loci; locus++)
1520     {
1521       bufsize += sprintf (buffer+bufsize, "%li\n", data->seq[0]->sites[locus]);
1522       if (bufsize >= allocfpsize-100)
1523 	{
1524 	  allocfpsize += LONGLINESIZE;
1525 	  buffer = (char *) myrealloc (buffer, sizeof (char) * allocfpsize);
1526 	}
1527     }
1528   if (bufsize >= allocfpsize-100)
1529     {
1530       allocfpsize += LONGLINESIZE;
1531       buffer = (char *) myrealloc (buffer, sizeof (char) * allocfpsize);
1532     }
1533   bufsize += sprintf (buffer + bufsize, "%li %f\n", data->seq[0]->addon, data->seq[0]->fracchange);
1534   bufsize += 1;
1535   MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1536   MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1537 
1538 
1539   // send population data, send locus data, then one individual at a time
1540   for (pop = 0; pop < data->numpop; pop++)
1541     {
1542       //send each population name
1543       if (allocfpsize < LINESIZE)
1544 	{
1545 	  allocfpsize += LINESIZE;
1546 	  buffer = (char *) myrealloc (buffer, sizeof (char) * allocfpsize);
1547 	}
1548       bufsize = sprintf (buffer, "%s\n", data->popnames[pop]);//set to start
1549       bufsize += 1;
1550       MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1551       MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1552       biggest = 0;
1553       bufsize = 0;
1554       // send number of ind for each locus per population
1555       for(locus=0; locus<data->loci; locus++)
1556         {
1557 	  if (bufsize >= allocfpsize-100)
1558 	    {
1559 	      allocfpsize += LONGLINESIZE;
1560 	      buffer = (char *) myrealloc (buffer, sizeof (char) * allocfpsize);
1561 	    }
1562 	  bufsize += sprintf (buffer + bufsize, "%li %li\n", data->numind[pop][locus],data->numalleles[pop][locus]);
1563 	  if(biggest < data->numind[pop][locus])
1564 	    biggest = data->numind[pop][locus];
1565 	}
1566       bufsize += 1;
1567       MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1568       MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1569       bufsize = 0;
1570       // send individuals
1571       if (!strchr (SEQUENCETYPES, options->datatype))
1572         {
1573 	  for (ind = 0; ind < biggest; ind++)
1574             {
1575 	      // assume that buffer is always larger than nmlength
1576 	      // resetting bufsize to start!
1577 	      bufsize = sprintf (buffer, "%*s\n",  (int) options->nmlength, data->indnames[pop][ind][0]);
1578 	      pack_allele_data (&buffer, &bufsize, &allocfpsize, data, pop, ind);
1579 	      bufsize += 1;
1580 	      MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1581 	      MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1582             }
1583         }
1584       else
1585         {
1586 	  bufsize = 0;
1587 	  for(locus=0;locus<data->loci; ++locus)
1588             {
1589 	      for (ind = 0; ind < data->numind[pop][locus]; ind++)
1590                 {
1591 		  // assume that buffer is always larger than nmlength
1592 		  // resetting bufsize to start!
1593 		  bufsize = sprintf (buffer, "%*s\n", (int) options->nmlength,
1594 				     data->indnames[pop][ind][locus]);
1595 		  pack_sequence_data (&buffer, &bufsize, &allocfpsize, data, pop, ind, locus);
1596 		  bufsize += 1;
1597 		  MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1598 		  MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1599                 }
1600             }
1601         }
1602     }
1603   // send geofile
1604   if (options->geo)
1605     {
1606       bufsize = 0;
1607       for (pop = 0; pop < data->numpop * data->numpop; pop++)
1608         {
1609 	  if (bufsize >= allocfpsize-100)
1610 	    {
1611 	      allocfpsize += LONGLINESIZE;
1612 	      buffer = (char *) myrealloc (buffer, sizeof (char) * allocfpsize);
1613 	    }
1614 	  bufsize += sprintf (buffer+ bufsize, "%f %f\n", data->geo[pop], data->lgeo[pop]);
1615         }
1616       bufsize += 1;
1617       MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1618       MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1619     }
1620   // send randomsubset data
1621   bufsize = 0;
1622   for (pop = 0; pop < data->numpop; pop++)
1623     {
1624       for(locus=0;locus<data->loci;locus++)
1625 	{
1626 	  for(ind=0; ind < data->numind[pop][locus]; ind++)
1627 	    {
1628 	      if (bufsize >= allocfpsize-100)
1629 		{
1630 		  allocfpsize += LONGLINESIZE;
1631 		  buffer = (char *) myrealloc (buffer, sizeof (char) * allocfpsize);
1632 		}
1633 	      bufsize += sprintf (buffer+ bufsize, "%li\n", data->shuffled[pop][locus][ind]);
1634 	    }
1635         }
1636     }
1637   bufsize += 1;
1638   MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1639   MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1640 
1641   // tipdate file
1642   if (options->has_datefile)
1643     {
1644       bufsize = 0;
1645       for(locus=0;locus<data->loci;locus++)
1646 	{
1647 	  for (pop = 0; pop < data->numpop; pop++)
1648 	    {
1649 	      for(ind=0; ind < data->numind[pop][locus]; ind++)
1650 		{
1651 		  if (bufsize >= allocfpsize-200)
1652 		    {
1653 		      allocfpsize += LONGLINESIZE;
1654 		      buffer = (char *) myrealloc (buffer, sizeof (char) * allocfpsize);
1655 		    }
1656 		  bufsize += sprintf (buffer+bufsize, "%s %20.20f\n", data->sampledates[pop][locus][ind].name, data->sampledates[pop][locus][ind].date);
1657 		}
1658 	    }
1659 	}
1660       // assumes buffer is big enough
1661       bufsize += sprintf (buffer + bufsize, "%20.20f\n", data->maxsampledate);
1662       //      printf("%i> pack_data maxsampledate=%f %s\n",myID, data->maxsampledate, fp);
1663       //printf("%i> pack_data:%s\n""""""""""""""""""""""""\n\n\n",myID, buffer);
1664       bufsize += 1;
1665       MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1666       MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1667     }
1668 
1669   // uepfile
1670 #ifdef UEP
1671   if (options->uep)
1672     {
1673       sumtips = 0;
1674       bufsize = 0;
1675       for (pop = 0; pop < data->numpop; ++pop)
1676 	sumtips += data->numind[pop][0];//Assumes UEP is matched by locus 1
1677       bufsize += sprintf (buffer+ bufsize, "%li %li\n", sumtips, data->uepsites);
1678       if (strchr (SEQUENCETYPES, options->datatype))
1679         {
1680 	  for (pop = 0; sumtips; pop++)
1681             {
1682 	      for (i = 0; i < data->uepsites; i++)
1683                 {
1684 		  if (bufsize >= allocfpsize-100)
1685 		    {
1686 		      allocfpsize += LONGLINESIZE;
1687 		      buffer = (char *) myrealloc (buffer, sizeof (char) * allocfpsize);
1688 		    }
1689 		  bufsize += sprintf (buffer + bufsize, "%i\n", data->uep[pop][i]);
1690                 }
1691             }
1692         }
1693       else
1694         {
1695 	  for (pop = 0; sumtips; pop++)
1696             {
1697 	      for (i = 0; i < data->uepsites; i++)
1698                 {
1699 		  if (bufsize >= allocfpsize-100)
1700 		    {
1701 		      allocfpsize += LONGLINESIZE;
1702 		      buffer = (char *) myrealloc (buffer, sizeof (char) * allocfpsize);
1703 		    }
1704 		  bufsize += sprintf (buffer + bufsize, "%i %i\n", data->uep[pop][i],
1705 					  data->uep[pop + sumtips][i]);
1706                 }
1707             }
1708         }
1709       bufsize += 1;
1710       MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1711       MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1712     }
1713 
1714 #endif
1715   myfree(buffer);
1716   return 0;
1717 }
1718 
1719 
1720 void
pack_allele_data(char ** buffer,long * bufsize,long * allocbufsize,data_fmt * data,long pop,long ind)1721 pack_allele_data (char **buffer, long *bufsize, long *allocbufsize, data_fmt * data, long pop,
1722                   long ind)
1723 {
1724   //char fp[LONGLINESIZE];
1725     long locus;
1726     for (locus = 0; locus < data->loci; locus++)
1727     {
1728       if (*bufsize > *allocbufsize-100)
1729 	{
1730 	  *allocbufsize += *bufsize;
1731 	  *buffer = (char*) myrealloc(*buffer, *allocbufsize * sizeof(char));
1732 	}
1733         *bufsize += sprintf (*buffer + *bufsize, "%s %s\n", data->yy[pop][ind][locus][0],
1734                                  data->yy[pop][ind][locus][1]);
1735     }
1736 }
1737 
1738 void
pack_sequence_data(char ** buffer,long * bufsize,long * allocbufsize,data_fmt * data,long pop,long ind,long locus)1739 pack_sequence_data (char **buffer, long *bufsize, long *allocbufsize, data_fmt * data, long pop,
1740                     long ind, long locus)
1741 {
1742   //char *fp;
1743     //  long locus;
1744     //  fp = mycalloc (1, sizeof (char));
1745     // for (locus = 0; locus < data->loci; locus++)
1746     //   {
1747     if (*bufsize > *allocbufsize-data->seq[0]->sites[locus])
1748       {
1749 	*allocbufsize += *bufsize;
1750 	*buffer = (char*) myrealloc(*buffer, *allocbufsize * sizeof(char));
1751       }
1752     *bufsize += sprintf (*buffer + *bufsize, "%s\n", data->yy[pop][ind][locus][0]);
1753     //   }
1754 }
1755 
1756 // this function and get_data() do not mix well!
1757 void
unpack_databuffer(data_fmt * data,option_fmt * options)1758 unpack_databuffer (data_fmt * data, option_fmt * options)
1759 {
1760   long len;
1761     long locus, pop, ind, i=0;
1762     long biggest;
1763 #ifdef UEP
1764     long sumtips;
1765 #endif
1766     char *buffer;
1767     char *buf;
1768     char *input;
1769     long inputsize = LONGLINESIZE;
1770     char *name;
1771     long hasghost;
1772     long bufsize;
1773     long allocbufsize=LONGLINESIZE;
1774     buffer = (char *)   mycalloc(allocbufsize,sizeof(char));
1775     input = (char *) mycalloc (LONGLINESIZE, sizeof (char));
1776     name = (char *)  mycalloc (LONGLINESIZE, sizeof (char));
1777     // receive header info
1778     MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1779     if (bufsize > allocbufsize)
1780       {
1781 	allocbufsize = bufsize;
1782 	buffer = (char*) myrealloc(buffer, allocbufsize * sizeof(char));
1783       }
1784     MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1785     buf = buffer;
1786     sgets_safe (&input, &inputsize, &buf);
1787     sscanf (input, "%c%li%li%li", &options->datatype, &hasghost, &data->numpop,
1788             &data->loci);
1789     //printf("datatype=%c numpop=%li loci=%li\n",options->datatype,data->numpop,data->loci);
1790     data->hasghost = (boolean) hasghost;
1791     init_data_structure1 (&data, options);
1792     // receive site and loci information
1793     MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1794     if (bufsize > allocbufsize)
1795       {
1796 	allocbufsize = bufsize;
1797 	buffer = (char*) myrealloc(buffer, allocbufsize * sizeof(char));
1798       }
1799     MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1800     buf = buffer;
1801     for (locus = 0; locus < data->loci; locus++)
1802     {
1803         sgets_safe (&input, &inputsize, &buf);
1804         sscanf (input, "%li", &data->seq[0]->sites[locus]);
1805     }
1806     sgets_safe (&input, &inputsize, &buf);
1807 #ifdef USE_MYREAL_FLOAT
1808     sscanf (input, "%li%f", &data->seq[0]->addon, &data->seq[0]->fracchange);
1809 #else
1810     sscanf (input, "%li%lf", &data->seq[0]->addon, &data->seq[0]->fracchange);
1811 #endif
1812     // population data
1813     for (pop = 0; pop < data->numpop; pop++)
1814     {
1815       // receive each population name
1816       MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1817       if (bufsize > allocbufsize)
1818 	{
1819 	  allocbufsize = bufsize;
1820 	  buffer = (char*) myrealloc(buffer, allocbufsize * sizeof(char));
1821 	}
1822       MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1823       buf = buffer;
1824       sgets_safe (&input, &inputsize, &buf);
1825       sscanf (input, "%s", data->popnames[pop]);
1826       biggest=0;
1827       // receive ind numbers for each locus
1828       MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1829       if (bufsize > allocbufsize)
1830 	{
1831 	  allocbufsize = bufsize;
1832 	  buffer = (char*) myrealloc(buffer, allocbufsize * sizeof(char));
1833 	}
1834       MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1835       buf = buffer;
1836       for(locus=0; locus<data->loci; locus++)
1837         {
1838 	  sgets_safe (&input, &inputsize, &buf);
1839 	  sscanf (input, "%li %li", &data->numind[pop][locus],&data->numalleles[pop][locus]);
1840 	  if(biggest<data->numind[pop][locus])
1841 	    biggest = data->numind[pop][locus];
1842         }
1843       init_data_structure2 (&data, options, pop);
1844       // receive individual data per pop
1845         if (!strchr (SEQUENCETYPES, options->datatype))
1846         {
1847             for (ind = 0; ind < biggest; ind++)
1848             {
1849 	      MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1850 	      if (bufsize > allocbufsize)
1851 		{
1852 		  allocbufsize = bufsize;
1853 		  buffer = (char*) myrealloc(buffer, allocbufsize * sizeof(char));
1854 		}
1855 	      MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1856 	      buf = buffer;
1857 	      sgets_safe (&input, &inputsize, &buf);
1858 	      sscanf (input, "%s", data->indnames[pop][ind][0]);
1859 	      for (locus = 0; locus < data->loci; locus++)
1860                 {
1861 		  sgets_safe (&input, &inputsize, &buf);
1862 		  sscanf (input, "%s %s", data->yy[pop][ind][locus][0],
1863 			  data->yy[pop][ind][locus][1]);
1864                 }
1865             }
1866         }
1867         else
1868         {
1869             for (locus = 0; locus < data->loci; locus++)
1870             {
1871                 for (ind = 0; ind < data->numind[pop][locus]; ind++)
1872                 {
1873 		  MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1874 		  if (bufsize > allocbufsize)
1875 		    {
1876 		      allocbufsize = bufsize;
1877 		      buffer = (char*) myrealloc(buffer, allocbufsize * sizeof(char));
1878 		    }
1879 		  MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1880 		  buf = buffer;
1881 		  sgets_safe (&input, &inputsize, &buf);
1882 		  sprintf(data->indnames[pop][ind][locus],"%-*s", (int) options->nmlength, input);
1883 		  input =(char *) myrealloc (input, sizeof (char) * (1 + data->seq[0]->sites[locus]));
1884 		  sgets (input, 10 + data->seq[0]->sites[locus], &buf);
1885 		  len = data->seq[0]->sites[locus];
1886 		  sprintf(data->yy[pop][ind][locus][0],"%-*s", (int) len,input);
1887                 }
1888             }
1889         }
1890     }
1891     // geofile
1892     data->geo =
1893         (MYREAL *) mycalloc (1, sizeof (MYREAL) * data->numpop * data->numpop);
1894     data->lgeo =
1895         (MYREAL *) mycalloc (1, sizeof (MYREAL) * data->numpop * data->numpop);
1896     if (!options->geo)
1897     {
1898         for (i = 0; i < data->numpop * data->numpop; i++)
1899             data->geo[i] = 1.0;
1900     }
1901     else
1902     {
1903       // receive geofile
1904       MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1905       if (bufsize > allocbufsize)
1906 	{
1907 	  allocbufsize = bufsize;
1908 	  buffer = (char*) myrealloc(buffer, allocbufsize * sizeof(char));
1909 	}
1910       MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1911       buf = buffer;
1912       for (pop = 0; pop < data->numpop * data->numpop; pop++)
1913         {
1914 	  sgets_safe (&input, &inputsize, &buf);
1915 #ifdef USE_MYREAL_FLOAT
1916 	  sscanf (input, "%f%f", &data->geo[pop], &data->lgeo[pop]);
1917 #else
1918 	  sscanf (input, "%lf%lf", &data->geo[pop], &data->lgeo[pop]);
1919 #endif
1920         }
1921     }
1922 
1923   // receive randomsubset
1924     data->shuffled = (long ***) mycalloc(data->numpop,sizeof(long **));
1925     MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1926     if (bufsize > allocbufsize)
1927       {
1928 	allocbufsize = bufsize;
1929 	buffer = (char*) myrealloc(buffer, allocbufsize * sizeof(char));
1930       }
1931     MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1932     buf = buffer;
1933     for (pop = 0; pop < data->numpop; pop++)
1934     {
1935       data->shuffled[pop] = (long **) mycalloc(data->loci,sizeof(long *));
1936       for(locus=0;locus<data->loci;locus++)
1937 	{
1938 	  data->shuffled[pop][locus] = (long *) mycalloc(data->numind[pop][locus],sizeof(long));
1939 	  for(ind=0; ind < data->numind[pop][locus]; ind++)
1940 	    {
1941 	      sgets_safe (&input, &inputsize, &buf);
1942 	      sscanf (input, "%li", &data->shuffled[pop][locus][ind]);
1943 	    }
1944 	}
1945     }
1946 
1947     //receive date data
1948     data->sampledates = (tipdate_fmt ***) mycalloc (data->numpop, sizeof (tipdate_fmt **));
1949     data->sampledates[0] = (tipdate_fmt **) mycalloc (data->numpop * data->loci, sizeof (tipdate_fmt *));
1950     for (pop = 1; pop < data->numpop; pop++)
1951       {
1952 	data->sampledates[pop] = data->sampledates[0] + data->loci * pop;
1953       }
1954     for(locus=0;locus<data->loci;locus++)
1955       {
1956 	for(pop=0;pop < data->numpop; pop++)
1957 	  {
1958 	    data->sampledates[pop][locus] = (tipdate_fmt*) mycalloc(data->numind[pop][locus],sizeof(tipdate_fmt));
1959 	  }
1960       }
1961     data->maxsampledate = 0.0;
1962     if (options->has_datefile)
1963       {
1964 	MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
1965 	if (bufsize > allocbufsize)
1966 	  {
1967 	    allocbufsize = bufsize;
1968 	    buffer = (char*) myrealloc(buffer, allocbufsize * sizeof(char));
1969 	  }
1970 	MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
1971 	buf = buffer;
1972 	for(locus=0;locus<data->loci;locus++)
1973 	  {
1974 	    for (pop = 0; pop < data->numpop; pop++)
1975 	      {
1976 		for(ind=0; ind < data->numind[pop][locus]; ind++)
1977 		  {
1978 		    sgets_safe (&input, &inputsize, &buf);
1979 		    //printf("%i> %s\n",myID, input);
1980 #ifdef USE_MYREAL_FLOAT
1981 		    sscanf (input, "%s %f", name, &data->sampledates[pop][locus][ind].date);
1982 #else
1983 		    sscanf (input, "%s %lf", name, &data->sampledates[pop][locus][ind].date);
1984 
1985 		    data->sampledates[pop][locus][ind].name = (char *) mycalloc(strlen(name)+1,sizeof(char));
1986 		    strcpy(data->sampledates[pop][locus][ind].name, name);
1987 		  }
1988 	      }
1989 	  }
1990 	sgets_safe (&input, &inputsize, &buf);
1991 #ifdef USE_MYREAL_FLOAT
1992 	sscanf(input,"%f",&data->maxsampledate);
1993 #else
1994 	sscanf(input,"%lf",&data->maxsampledate);
1995 #endif
1996 
1997 	//	printf("%i> maxsampledate=%f %s\n''''''\n%s\n''''''''''''\n",myID, data->maxsampledate, input, buffer);
1998       }
1999 #endif
2000     // uepfile
2001 #ifdef UEP
2002     if (options->uep)
2003     {
2004 	MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
2005 	if (bufsize > allocbufsize)
2006 	  {
2007 	    allocbufsize = bufsize;
2008 	    buffer = (char*) myrealloc(buffer, allocbufsize * sizeof(char));
2009 	  }
2010 	MYMPIBCAST (buffer, bufsize, MPI_CHAR, MASTER, comm_world);
2011 	buf = buffer;
2012         sgets_safe (&input, &inputsize, &buf);
2013         sscanf (input, "%li%li", &sumtips, &data->uepsites);
2014         data->uep =
2015             (int **) mycalloc (number_genomes (options->datatype) * sumtips,
2016                              sizeof (int *));
2017         if (strchr (SEQUENCETYPES, options->datatype))
2018         {
2019             for (pop = 0; sumtips; pop++)
2020             {
2021                 data->uep[i] = (int *) mycalloc (data->uepsites, sizeof (int));
2022                 for (i = 0; i < data->uepsites; i++)
2023                 {
2024                     sgets_safe (&input, &inputsize, &buf);
2025                     sscanf (input, "%i", &data->uep[pop][i]);
2026                 }
2027             }
2028         }
2029         else
2030         {
2031             for (pop = 0; sumtips; pop++)
2032             {
2033                 data->uep[i] = (int *) mycalloc (data->uepsites, sizeof (int));
2034                 data->uep[i + sumtips] =
2035                     (int *) mycalloc (data->uepsites, sizeof (int));
2036                 for (i = 0; i < data->uepsites; i++)
2037                 {
2038                     sgets_safe (&input, &inputsize, &buf);
2039                     sscanf (input, "%i%i", &data->uep[pop][i],
2040                             &data->uep[pop + sumtips][i]);
2041                 }
2042             }
2043         }
2044 
2045     }
2046 #endif
2047     init_data_structure3 (data,options);
2048 
2049     switch (options->datatype)
2050     {
2051     case 'a':
2052         create_alleles (data, options);
2053         break;
2054     case 'b':
2055         for (pop = 0; pop < data->loci; pop++)
2056             data->maxalleles[pop] = XBROWN_SIZE;
2057         break;
2058     case 'm':
2059         create_alleles (data, options);
2060         for (pop = 0; pop < data->loci; pop++)
2061             data->maxalleles[pop] = options->micro_stepnum;
2062         break;
2063     }
2064     myfree(input);
2065     myfree(name);
2066     myfree(buffer);
2067 }
2068 
2069 #if 0
2070 ///
2071 /// unpacks results from buffer to fill data structures in master for final printout
2072 void
unpack_plotplane_buffer(MYREAL * buffer,world_fmt * world,long locus,long maxrep,long numpop)2073 unpack_plotplane_buffer (MYREAL *buffer, world_fmt * world,
2074                       long locus, long maxrep, long numpop)
2075 {
2076 
2077 }
2078 ///
2079 /// pack plotplane into buffer to ship to master
2080 long
pack_plotplane_buffer(MYREAL ** buffer,world_fmt * world,long locus,long maxrep,long numpop)2081 pack_plotplane_buffer (MYREAL **buffer, world_fmt * world,
2082                     long locus, long maxrep, long numpop)
2083 {
2084 }
2085 #endif
2086 
2087 
2088 ///
2089 /// unpacks results from buffer to fill data structures in master for final printout
2090 void
unpack_result_buffer(MYREAL * buffer,world_fmt * world,long locus,long maxrep,long numpop)2091 unpack_result_buffer (MYREAL *buffer, world_fmt * world,
2092                       long locus, long maxrep, long numpop)
2093 {
2094   long z=0;
2095   long rep;
2096   long pop;
2097   long addon=0;
2098   long numpop2 = world->numpop2;
2099   timearchive_fmt **atl = world->atl;
2100   MYREAL ***apg0 = world->apg0;
2101 
2102   if (maxrep > 1)
2103     addon = 1;
2104 
2105   for (rep = 0; rep < maxrep + addon; rep++)
2106     {
2107         atl[rep][locus].param_like = buffer[z++];
2108         for (pop = 0; pop < 4 * numpop2; pop++)
2109         {
2110             atl[rep][locus].parameters[pop] = buffer[z++];
2111         }
2112 	//fprintf(stderr,"%i> unpacked result locus=%li replicate %li\n",myID,locus, rep);
2113     }
2114     // apg0
2115     for (rep = 0; rep < maxrep; rep++)
2116     {
2117         for (pop = 0; pop < world->options->lsteps; pop++)
2118         {
2119             apg0[rep][locus][pop] = buffer[z++];
2120         }
2121     }
2122   if(world->options->bayes_infer)
2123     {
2124       // BF material
2125       z = unpack_BF_buffer(buffer, z, locus, world);
2126       // ESS material
2127       z = unpack_ess_buffer(buffer, z, world);
2128     }
2129 }
2130 
2131 ///
2132 /// pack results into buffer to ship to master
2133 long
pack_result_buffer(MYREAL ** buffer,world_fmt * world,long locus,long maxrep,long numpop)2134 pack_result_buffer (MYREAL **buffer, world_fmt * world,
2135                     long locus, long maxrep, long numpop)
2136 {
2137   long rep;
2138   long pop;
2139   long bufsize;
2140   long z = 0;
2141   long addon = 0;
2142   long numpop2 = world->numpop2;
2143   long nn = numpop2 + (world->bayes->mu * world->loci);
2144   timearchive_fmt **atl = world->atl;
2145   MYREAL ***apg0 = world->apg0;
2146 
2147   if (maxrep > 1)
2148     addon = 1;
2149 
2150   bufsize = (maxrep+addon) + (maxrep+addon) * 4 * numpop2 + maxrep * world->options->lsteps + \
2151     world->options->heated_chains * world->loci + 5 * world->loci + 1 + nn * 2;
2152   (*buffer) = (MYREAL *) myrealloc (*buffer, sizeof (MYREAL) * bufsize);
2153   memset (*buffer, 0, sizeof (MYREAL) * bufsize);
2154 
2155   for (rep = 0; rep < maxrep + addon; rep++)
2156     {
2157       (*buffer)[z++] = atl[rep][locus].param_like;
2158       for (pop = 0; pop < 4 * numpop2; pop++)
2159 	{
2160 	  (*buffer)[z++] =  atl[rep][locus].parameters[pop];
2161 	}
2162       //      fprintf(stderr,"%i> packed result locus=%li replicate %li\n",myID,locus, rep);
2163     }
2164   // apg0
2165   for (rep = 0; rep < maxrep; rep++)
2166     {
2167       for (pop = 0; pop < world->options->lsteps; pop++)
2168         {
2169 	  (*buffer)[z++] =  apg0[rep][locus][pop];
2170         }
2171     }
2172   if(world->options->bayes_infer)
2173     {
2174       // BF material
2175       if(!world->data->skiploci[locus])
2176 	{
2177 	  //fprintf(stderr,"%i> packed result locus=%li replicate %li\n",myID,locus, rep);
2178 	  z = pack_BF_buffer(buffer, z, locus, world);
2179 	}
2180       // ESS material
2181       z = pack_ess_buffer(buffer, z, world);
2182     }
2183 #ifdef DEBUG_MPI
2184   fprintf(stdout,"DEBUG: %i> z=%li, bufsize=%li\n", myID, z, bufsize);
2185 #endif
2186   if(bufsize >= z)
2187     {
2188       bufsize = z;
2189     }
2190   else
2191     {
2192       fprintf(stderr,"%i> bufsize=%li < z=%li\n",myID,bufsize,z);
2193       error("pack_results tried to stuff to much into buffer in pack_result_buffer()\n");
2194     }
2195   return bufsize;
2196 }
2197 
2198 
2199 
2200 
2201 ///
2202 /// unpacks replicate samples of migration events, adds numbers and part-vectors
2203 /// to the final array per locus, this function is only used with replicates over
2204 /// multiple loci.
2205 void
unpack_mighist_replicate_buffer_old(MYREAL * buffer,world_fmt * world,long locus,long numpop)2206 unpack_mighist_replicate_buffer_old (MYREAL *buffer, world_fmt * world,
2207                        long locus, long numpop)
2208 {
2209   long i;
2210   long j;
2211   long z = 0;
2212   long nummighist;
2213   long nummighistold;
2214   mighistloci_fmt *aa;
2215   aa = &world->mighistloci[locus];
2216   nummighist = (long) buffer[z++];
2217   nummighistold = aa->mighistnum;
2218   aa->mighistnum += nummighist;
2219   if(aa->allocsize <= aa->mighistnum)
2220     {
2221       aa->mighist = (mighist_fmt *) myrealloc (aa->mighist, sizeof (mighist_fmt) *(aa->mighistnum+1));
2222       for(j=aa->allocsize; j<=aa->mighistnum; j++)
2223         {
2224 	  aa->mighist[j].allocsize=1;
2225 	  aa->mighist[j].migeventsize=0;
2226 	  aa->mighist[j].migevents =
2227 	    (migevent_fmt *) mycalloc (1,  sizeof (migevent_fmt) );
2228 	  //printf("%i> first events: alloc=%li size=%li\n", myID, aa->mighist[j].allocsize , aa->mighist[j].migeventsize);
2229         }
2230       aa->allocsize = aa->mighistnum+1;
2231     }
2232   for (j = nummighistold; j < aa->mighistnum; j++)
2233     {
2234       aa->mighist[j].copies = (long) buffer[z++];
2235       aa->mighist[j].weight = (long) buffer[z++];
2236       aa->mighist[j].migeventsize = (long) buffer[z++];
2237       //printf("%i> events: alloc=%li size=%li\n", myID, aa->mighist[j].allocsize , aa->mighist[j].migeventsize);
2238       aa->mighist[j].allocsize = aa->mighist[j].migeventsize + 1;
2239       aa->mighist[j].migevents = (migevent_fmt *) myrealloc (aa->mighist[j].migevents,
2240 							     sizeof (migevent_fmt) *
2241 							     aa->mighist[j].allocsize);
2242       for (i = 0; i < aa->mighist[j].migeventsize; i++)
2243         {
2244 	  aa->mighist[j].migevents[i].age = buffer[z++];
2245 	  aa->mighist[j].migevents[i].from = (long) buffer[z++];
2246 	  aa->mighist[j].migevents[i].to = (long) buffer[z++];
2247 	  aa->mighist[j].migevents[i].sumlines = (long) buffer[z++];
2248         }
2249     }
2250 }
2251 ///
2252 /// unpacks replicate samples of migration events, adds numbers and part-vectors
2253 /// to the final array per locus, this function is only used with replicates over
2254 /// multiple loci.
2255 void
unpack_mighist_replicate_buffer(MYREAL * buffer,world_fmt * world,long locus,long numpop)2256 unpack_mighist_replicate_buffer(MYREAL *buffer, world_fmt * world,
2257                        long locus, long numpop)
2258 {
2259   long i;
2260   long pop;
2261   long z         = 0;
2262   long          oldeventbinnum;
2263   long         * eventbinnum = NULL;
2264   duo         ** eventbins;
2265   mighistloci_fmt *aa;
2266 
2267   aa = &world->mighistloci[locus];
2268   eventbins = aa->migeventbins;
2269   eventbinnum = aa->migeventbinnum;
2270 
2271 
2272   for (pop = (world->options->mighist_all ? 0 : world->numpop);
2273        pop <  world->numpop2; pop++)
2274     {
2275       oldeventbinnum = eventbinnum[pop];
2276       eventbinnum[pop] = (long) buffer[z++];
2277       if(oldeventbinnum < eventbinnum[pop])
2278 	{
2279 	  aa->migeventbins[pop] = (duo *) myrealloc(aa->migeventbins[pop], sizeof(duo) * eventbinnum[pop]);
2280 	  memset(aa->migeventbins[pop][oldeventbinnum], 0, sizeof(duo) * (eventbinnum[pop] - oldeventbinnum));
2281 	}
2282       for (i = 0; i < eventbinnum[pop]; i++)
2283 	{
2284 	  eventbins[pop][i][0] += buffer[z++];
2285 	  eventbins[pop][i][1] += buffer[z++];
2286 	}
2287     }
2288 }
2289 
2290 
2291 ///
2292 /// receive buffer with treespace content and unpack it.
2293 /// Messing with format assignment to fit into the result_worker/master scheme
2294 void
unpack_treespace_buffer(MYREAL * buffer,world_fmt * world,long locus,long maxrep,long numpop)2295 unpack_treespace_buffer (MYREAL *buffer, world_fmt * world,
2296                        long locus, long maxrep, long numpop)
2297 {
2298   MYREAL like = -HUGE;
2299     char *input;
2300     char *sbuf;
2301     char *buf ;
2302     long size  = strlen((char *) buffer)+1;
2303     buf = NULL;
2304     //    printf("%i> in unpack_treespace_buffer() buffer has size %li\n%s\n\n",myID,size, (char*) buffer);
2305     if(size<=1)
2306       {
2307 	return;
2308       }
2309     //input = (char*) mycalloc(LINESIZE,sizeof(char));
2310     //    printf("%i> in unpack_treespace_buffer() after first if\n",myID);
2311     buf = (char *) mycalloc(size+1,sizeof(char));
2312     sbuf = buf;
2313     memcpy(buf, buffer,sizeof(char)*(size));
2314     //printf("%i> in unpack_treespace_buffer() after memcpy\n",myID);
2315     //    printf("%i> in unpack_treespace_buffer()  %s\n%s\n",myID,input,buf);
2316     //    printf("%i> Unpack++++++++++++\nsize=%li\n%s\n++++++++++++\n%s\n++++++++++++++++++++++++++++++++++++++++++++++++++\n",myID, size, buf, (char*) buffer);fflush(stdout);
2317     if(world->options->treeprint==BEST)
2318       {
2319 	input = strsep(&buf,"@");
2320 	//printf("%i> in unpack_treespace_buffer() after strsep %s\n",myID,input);fflush(stdout);
2321 	like = atof(input);
2322 	if(like >= world->besttreelike[locus])
2323 	  {
2324 	    world->besttreelike[locus] = like;
2325 	    //	if(world->treespacenum[locus] < size)
2326 	    //  {
2327 	    //printf("%i> in unpack_treespace_buffer() before treespace realloc  %s\n%s\n",myID,input,buf);
2328 	    size = strlen(buf) + 1;
2329 	    world->treespacealloc[locus] = size;
2330 	    world->treespacenum[locus] = size;
2331 	    world->treespace[locus] = (char *) myrealloc(world->treespace[locus],
2332 							 sizeof(char)*(size+1));
2333 	    strcpy(world->treespace[locus],buf);
2334 	  }
2335       }
2336     else
2337       {
2338 	size = strlen(buf) + 1;
2339 	world->treespacealloc[locus] = size;
2340 	world->treespacenum[locus] = size;
2341 	world->treespace[locus] = (char *) myrealloc(world->treespace[locus],
2342 						     sizeof(char)*(size+1));
2343 	strcpy(world->treespace[locus],buf);
2344       }
2345     myfree(sbuf);
2346 }
2347 
2348 ///
2349 /// pack treespace into buffer
2350 /// ship to master; messing with format assignment because the standard transport buffer
2351 /// is a double
pack_treespace_buffer(MYREAL ** buffer,world_fmt * world,long locus,long maxrep,long numpop)2352 long pack_treespace_buffer (MYREAL **buffer, world_fmt * world,
2353 			    long locus, long maxrep, long numpop)
2354 {
2355   long thissize = strlen(world->treespace[locus])+1;
2356   long thisrealsize;
2357   char *input;
2358   char *ptr2, *ptr3;
2359   long pos = 0;
2360   MYREAL like = -HUGE;
2361   if(world->treespace[locus]==NULL)
2362     return 0;
2363   ptr2 = strstr(world->treespace[locus],"=");
2364   ptr3 = strstr(world->treespace[locus]," ]\n");
2365   pos  = ptr3-ptr2;
2366 
2367   input = mycalloc(LONGLINESIZE,sizeof(char));
2368   if(world->options->treeprint == BEST)
2369     {
2370       sprintf(input,"%-*s", (int) pos, ptr2+1);
2371       like = atof(input);
2372       pos   = sprintf(input,"%f @", like);
2373       //  fprintf(stdout,"%i> in pack_treespace_buffer() filling %li size\n",myID, thissize);
2374       // the master routine for this expects a MYREAL *buffer, but
2375       // the tree is a string
2376       thissize += pos;
2377     }
2378   thisrealsize = (long) (1. + (thissize * sizeof(char) / sizeof(MYREAL)));
2379   (*buffer) = (MYREAL *) myrealloc(*buffer, (1+thisrealsize) * sizeof(MYREAL));
2380   memset(*buffer, 0, sizeof(MYREAL) * (1+thisrealsize));
2381   // the sizeof(char) is NO MISTAKE!
2382   sprintf((char*)(*buffer), "%s%s", input, world->treespace[locus]);
2383   //  if(myID==1)
2384   //  {
2385   //    printf("%i> pack++++++++++++++++\nsize=%li, realsize=%li [char=%li real=%li]\n%s\n++++++++++++++++++++++++++++++++\n",myID, thissize,thisrealsize, sizeof(char), sizeof(MYREAL),(char *)(*buffer));
2386   //  }
2387   return thisrealsize;
2388 }
2389 
2390 
2391 void
unpack_mighist_buffer(MYREAL * buffer,world_fmt * world,long locus,long maxrep,long numpop)2392 unpack_mighist_buffer (MYREAL *buffer, world_fmt * world,
2393                        long locus, long maxrep, long numpop)
2394 {
2395   long i;
2396   long pop;
2397   long z         = 0;
2398   long           oldeventbinnum;
2399   long         * eventbinnum = NULL;
2400   duo         ** eventbins;
2401   mighistloci_fmt *aa;
2402 
2403   aa = &world->mighistloci[locus];
2404   eventbins = aa->migeventbins;
2405   eventbinnum = aa->migeventbinnum;
2406 
2407 
2408   for (pop = (world->options->mighist_all ? 0 : world->numpop);
2409        pop <  world->numpop2; pop++)
2410     {
2411       oldeventbinnum = eventbinnum[pop];
2412       eventbinnum[pop] = (long) buffer[z++];
2413       if(oldeventbinnum < eventbinnum[pop])
2414 	{
2415 	  aa->migeventbins[pop] = (duo *) myrealloc(aa->migeventbins[pop], sizeof(duo) * eventbinnum[pop]);
2416 	  memset(aa->migeventbins[pop][oldeventbinnum], 0, sizeof(duo) * (eventbinnum[pop] - oldeventbinnum));
2417 	}
2418 
2419       for (i = 0; i < eventbinnum[pop]; i++)
2420 	{
2421 	  eventbins[pop][i][0] = buffer[z++];
2422 	  eventbins[pop][i][1] = buffer[z++];
2423 	}
2424     }
2425 }
2426 
2427 
2428 ///
2429 /// pack migration events and coalescence events into  buffer to
2430 /// ship to master
pack_mighist_buffer_old(MYREAL ** buffer,world_fmt * world,long locus,long maxrep,long numpop)2431 long pack_mighist_buffer_old (MYREAL **buffer, world_fmt * world,
2432 			  long locus, long maxrep, long numpop)
2433 {
2434   long i;
2435   long j;
2436   long bufsize = 1;
2437   long z = 0;
2438   mighistloci_fmt *aa;
2439   long thin = 1;
2440   MYREAL *tmp;
2441   aa = &world->mighistloci[locus];
2442   for (j = 0; j < aa->mighistnum; j++)
2443     {
2444       //aa->mighist[j].migeventsize = 1;
2445       bufsize += aa->mighist[j].migeventsize;
2446     }
2447   // trial code to avoid a crash due to not enough memory available for buffer
2448   if((tmp = (MYREAL *) myrealloc ((*buffer), sizeof (MYREAL) * (bufsize+1))) == NULL)
2449     {
2450       do
2451 	{
2452 	  thin *= 10;
2453 	  warning("Recovery mode for the migration events: thinning the events list by a factor of %li",thin);
2454 	  bufsize = 0;
2455 	  for (j = 0; j < aa->mighistnum; j+=thin)
2456 	    {
2457 	      if(j<aa->mighistnum)
2458 		bufsize += aa->mighist[j].migeventsize;
2459 	    }
2460 	  tmp = (MYREAL *) myrealloc ((*buffer), sizeof (MYREAL) * (bufsize+1));
2461 	}
2462       while(tmp == NULL || thin < 10000);
2463     }
2464   (*buffer) = tmp;
2465   memset (*buffer, 0, sizeof (MYREAL) * (bufsize+1));
2466 
2467   (*buffer)[z++] = (MYREAL)(aa->mighistnum/thin);
2468   for (j = 0; j < aa->mighistnum; j+=thin)
2469     {
2470       if(j < aa->mighistnum)
2471 	{
2472 #ifdef DEBUG_MPI
2473 	  printf ("%i> packmighistbuffer: %li %li %li\n", myID, aa->mighist[j].copies,
2474 		  aa->mighist[j].weight, aa->mighist[j].migeventsize);
2475 #endif
2476 	  (*buffer)[z++] = (MYREAL) aa->mighist[j].copies;
2477 	  (*buffer)[z++] = (MYREAL) aa->mighist[j].weight;
2478 	  (*buffer)[z++] = (MYREAL) aa->mighist[j].migeventsize;
2479 
2480 	  for (i = 0; i < aa->mighist[j].migeventsize; i++)
2481 	    {
2482 	      (*buffer)[z++] = aa->mighist[j].migevents[i].age;
2483 	      (*buffer)[z++] = (MYREAL) aa->mighist[j].migevents[i].from;
2484 	      (*buffer)[z++] = (MYREAL) aa->mighist[j].migevents[i].to;
2485 	      (*buffer)[z++] = (MYREAL) aa->mighist[j].migevents[i].sumlines;
2486 	    }
2487 	}
2488     }
2489   if(bufsize < z)
2490     {
2491       fprintf(stderr,"%i> bufsize=%li < z=%li\n",myID,bufsize,z);
2492       error("pack_mighist_buffer() failed\n");
2493     }
2494   return z;
2495 }
2496 ///
2497 /// pack migration events and coalescence events into  buffer to
2498 /// ship to master
pack_mighist_buffer(MYREAL ** buffer,world_fmt * world,long locus,long maxrep,long numpop)2499 long pack_mighist_buffer (MYREAL **buffer, world_fmt * world,
2500 			  long locus, long maxrep, long numpop)
2501 {
2502   long i;
2503   long pop;
2504   long z         = 0;
2505   long bufsize   = 0;
2506   long         * eventbinnum = NULL;
2507   duo         ** eventbins;
2508   mighistloci_fmt *aa;
2509 
2510   aa = &world->mighistloci[locus];
2511   eventbins = aa->migeventbins;
2512   eventbinnum = aa->migeventbinnum;
2513   for (pop = (world->options->mighist_all ? 0 : world->numpop);
2514        pop <  world->numpop2; pop++)
2515     {
2516       bufsize += 1 + 2 * eventbinnum[pop];
2517     }
2518   (*buffer) = (MYREAL *) myrealloc ((*buffer), sizeof (MYREAL) * (bufsize+1));
2519 
2520   for (pop = (world->options->mighist_all ? 0 : world->numpop);
2521        pop <  world->numpop2; pop++)
2522     {
2523       (*buffer)[z++] = (MYREAL)(eventbinnum[pop]);
2524       for (i = 0; i < eventbinnum[pop]; i++)
2525 	{
2526 	  (*buffer)[z++] = (MYREAL) eventbins[pop][i][0];
2527 	  (*buffer)[z++] = (MYREAL) eventbins[pop][i][1];
2528 	}
2529     }
2530   if(bufsize < z)
2531     error("pack_mighist_buffer() has a memory problem");
2532   return z;
2533 }
2534 
2535 ///
2536 /// receive buffer with skyline content and unpack it
2537 void
unpack_skyline_buffer(MYREAL * buffer,world_fmt * world,long locus,long maxrep,long numpop)2538 unpack_skyline_buffer (MYREAL *buffer, world_fmt * world,
2539                        long locus, long maxrep, long numpop)
2540 {
2541   //const   MYREAL invmaxrep = 1./((world->options->replicate
2542   //                       && world->options->replicatenum >
2543   //				  0) ? world->options->replicatenum : 1);
2544   const long np2 = numpop * numpop;
2545   long i;
2546   long j;
2547   long templ;
2548   long allocsize;
2549   long z   = 0;
2550   long *receive_eventbinnum;
2551   MYREAL temp;
2552   mighistloci_fmt *aa;
2553 
2554   receive_eventbinnum = (long *) mycalloc(np2, sizeof(long));
2555 
2556   aa = &world->mighistloci[locus];
2557   // read the number of bins for each parameter
2558   for(j=0; j< world->numpop2; j++)
2559     {
2560       templ = (long) buffer[z++];
2561       receive_eventbinnum[j] = templ;
2562       if(templ >= aa->eventbinnum[j])
2563 	{
2564 	  allocsize = templ + 1;
2565 	  aa->eventbins[j] = (tetra *) myrealloc (aa->eventbins[j], sizeof (tetra) * allocsize);
2566 	  //memset(aa->eventbins[j] + aa->eventbinnum[j],0,sizeof(tetra)*(templ-aa->eventbinnum[j]));
2567 	  for(i=aa->eventbinnum[j];i < allocsize; i++)
2568 	    {
2569 	      aa->eventbins[j][i][0] = 0.F;
2570 	      aa->eventbins[j][i][1] = 0.F;
2571 	      aa->eventbins[j][i][2] = 0.F;
2572 	      aa->eventbins[j][i][3] = 0.F;
2573 	      aa->eventbins[j][i][4] = 0.F;
2574 	      aa->eventbins[j][i][5] = 0.F;
2575 	      aa->eventbins[j][i][6] = 0.F;
2576 	      aa->eventbins[j][i][7] = 0.F;
2577 	      aa->eventbins[j][i][8] = 0.F;
2578 	    }
2579 	  aa->eventbinnum[j] = allocsize;
2580 	  //debug_skyline(world,"increased eventbins in unpack-skyline");
2581 	}
2582     }
2583   // read time width of bins
2584   temp = (MYREAL) buffer[z++];
2585   if(temp - world->options->eventbinsize > EPSILON)
2586     error("problem with bins size transmission in unpack_skyline...");
2587   for(j=0; j< world->numpop2; j++)
2588     {
2589       for (i = 0; i < receive_eventbinnum[j]; i++)
2590 	{
2591 	  aa->eventbins[j][i][0] += (float) buffer[z++];// * invmaxrep;
2592 	  aa->eventbins[j][i][1] += (float) buffer[z++];// * invmaxrep;
2593 	  aa->eventbins[j][i][2] += (float) buffer[z++];// * invmaxrep;
2594 	  aa->eventbins[j][i][3] += (float) buffer[z++];// * invmaxrep;
2595 	  aa->eventbins[j][i][4] += (float) buffer[z++];// * invmaxrep;
2596 	  aa->eventbins[j][i][5] += (float) buffer[z++];// * invmaxrep;
2597 	}
2598     }
2599     //debug_skyline(world,"after unpack_skyline_buffer");
2600     myfree(receive_eventbinnum);
2601 }
2602 
2603 ///
2604 /// pack skyline into  buffer to
2605 /// ship to master
pack_skyline_buffer(MYREAL ** buffer,world_fmt * world,long locus,long maxrep,long numpop)2606 long pack_skyline_buffer (MYREAL **buffer, world_fmt * world,
2607                      long locus, long maxrep, long numpop)
2608 {
2609   long j, i;
2610   long bufsize = world->numpop2 + 1;
2611   long z = 0L;
2612   mighistloci_fmt *aa;
2613 
2614   aa = &world->mighistloci[locus];
2615   //printf("Buffer in pack_skyline()=%f (%p)(%p)\n",(*buffer)[0], (*buffer), buffer);
2616   for (j = 0; j < world->numpop2; j++)
2617     {
2618       //      printf("%i> bufsize=%li eventbinnum=%li\n",myID, bufsize, aa->eventbinnum[j]);
2619       bufsize += 6 * aa->eventbinnum[j];
2620     }
2621   //  fprintf(stdout,"%i> bufsize in pack-skyline-buffer %li for locus %li\n", myID, bufsize, locus);
2622   //myfree(*buffer);
2623   (*buffer) = (MYREAL *) myrealloc ((*buffer), sizeof (MYREAL) * (bufsize));
2624   //(*buffer) = (MYREAL *) mycalloc(bufsize,sizeof (MYREAL));
2625   //  memset (*buffer, 0, sizeof (MYREAL) * (bufsize));
2626   // record how many bins in for each parameter
2627   for (j = 0; j < world->numpop2; j++)
2628     {
2629       (*buffer)[z++] = (MYREAL) aa->eventbinnum[j];
2630     }
2631   // time width of bins
2632   (*buffer)[z++] = (MYREAL) world->options->eventbinsize;
2633   for (j = 0; j < world->numpop2; j++)
2634     {
2635       for (i = 0; i < aa->eventbinnum[j]; i++)
2636 	{
2637 	  (*buffer)[z++] =  (MYREAL) aa->eventbins[j][i][0];
2638 	  (*buffer)[z++] =  (MYREAL) aa->eventbins[j][i][1];
2639 	  (*buffer)[z++] =  (MYREAL) aa->eventbins[j][i][2];
2640 	  (*buffer)[z++] =  (MYREAL) aa->eventbins[j][i][3];
2641 	  (*buffer)[z++] =  (MYREAL) aa->eventbins[j][i][4];
2642 	  (*buffer)[z++] =  (MYREAL) aa->eventbins[j][i][5];
2643 	}
2644     }
2645   if(bufsize < z)
2646     {
2647       fprintf(stderr,"error: bufsize=%li, z=%li\n", bufsize, z);
2648       error("pack_skyline_buffer: bufsize is too small");
2649     }
2650     return z;
2651 }
2652 
2653 
2654 ///
2655 /// unpack bayes parameters to fit into mpi_results_master()
2656 void
unpack_bayes_buffer(MYREAL * buffer,world_fmt * world,long locus,long maxrep,long numpop)2657 unpack_bayes_buffer (MYREAL *buffer, world_fmt * world,
2658                        long locus, long maxrep, long numpop)
2659 {
2660   // this combines the single_bayes and hist_bayes buffer unpacking
2661   unpack_hist_bayes_buffer(buffer, world->bayes, world, locus);
2662 }
2663 
2664 ///
2665 /// pack bayes parameters to fit into mpi_results_worker()
2666 long
pack_bayes_buffer(MYREAL ** buffer,world_fmt * world,long locus,long maxrep,long numpop)2667 pack_bayes_buffer (MYREAL **buffer, world_fmt * world,
2668 		   long locus, long maxrep, long numpop)
2669 {
2670   long i;
2671   long bufsize;
2672   long z;
2673   long sizec       = sizeof(MYREAL);
2674   //  long numbins     = 0;
2675   bayes_fmt *bayes = world->bayes;
2676   long np2         = world->numpop2;
2677   long npp         = np2 + bayes->mu;
2678   bayeshistogram_fmt  *hist;
2679   // memory for pack_single_bayes_buffer_part()
2680   // locus and numparams + accept and trial of genealogy
2681   bufsize =  4;
2682   for(i=0; i < npp; i++)
2683     {
2684       if(bayes->map[i][1] != INVALID)
2685 	  {
2686 	    // per parameter
2687 	    //for each parameter acceptance and trial +2 // for each parameter
2688 	    bufsize += 2;
2689 	  }
2690     }
2691 
2692   // hist_bayes_buffer:
2693   // max buffer memory needed is (npp + 11*npp + (3 * npp * hist->bins[i])
2694   hist = &(world->bayes->histogram[locus]);
2695   for(i=0; i < npp; i++)
2696     {
2697       if(bayes->map[i][1] != INVALID &&	!world->options->has_bayesmdimfile)
2698 	  {
2699 	    //for each parameter
2700 	    //                   bins number          +1
2701 	    //                   datastore           +11
2702 	    //                   bins*3              +3*bins
2703 	    bufsize += 12;
2704 	    bufsize += 3 * hist->bins[i]; //set50, set95, result per bin
2705 	  }
2706     }
2707   bufsize += npp*npp;
2708   // pack_BF_buffer
2709   //   hmscale, hm    +2
2710   //   bf: number of heated chains +world->options->heated_chains
2711   bufsize += 2 + world->options->heated_chains;
2712   // Autoarchive, ESS buffer:     parameters           +2*(numpop2+loci)
2713   //                              genealogy            +2
2714   bufsize += 2*npp + 2;
2715   //test printf("%i> bufsize=%li (npp=%li, numbins=%li)\n",myID, bufsize, npp, numbins);
2716   (*buffer) = (MYREAL *) myrealloc(*buffer, sizeof(MYREAL) * bufsize);
2717   memset (*buffer, 0,sizec * bufsize);
2718 
2719   z = pack_single_bayes_buffer_part(buffer,world->bayes,world,locus);
2720   //printf("%i> z=%li (%li)\n",myID,z, 2 + 2 * (np2+1));
2721   if(!world->options->has_bayesmdimfile)
2722     z = pack_hist_bayes_buffer(buffer, hist, world, z);
2723   // BF material
2724   if(!world->data->skiploci[locus])
2725     {
2726       // fprintf(stderr,"%i> pack_result_buffer(): packed BF result locus=%li replicate %li\n",myID,locus, -1);
2727       z = pack_BF_buffer(buffer, z, locus, world);
2728       if(z > bufsize)
2729 	{
2730 	  fprintf(stderr,"%i> ERROR: allocated bufsize=%li is smaller than used bufsize=%li\n",myID, bufsize, z);
2731 	  error("buffer allocation overflowed");
2732 	}
2733     }
2734   // ESS material
2735   z = pack_ess_buffer(buffer, z, world);
2736   if(z > bufsize)
2737     {
2738       fprintf(stderr,"%i> ERROR: allocated bufsize=%li is smaller than used bufsize=%li\n",myID, bufsize, z);
2739       error("buffer allocation overflowed");
2740     }
2741   return z;
2742 }
2743 
2744 ///
2745 /// unpack the bayes histogram
unpack_hist_bayes_buffer(MYREAL * buffer,bayes_fmt * bayes,world_fmt * world,long locus)2746 void unpack_hist_bayes_buffer(MYREAL *buffer, bayes_fmt *bayes, world_fmt *world, long locus)
2747 {
2748     long                j, i;
2749     long                pa;
2750     long                z = 0;
2751     long                numbins = 0;
2752     long                pnum;
2753     long                tmp1, tmp2;
2754     long                tmplocus;
2755     bayeshistogram_fmt  *hist;
2756     long                total = 0;
2757     long                np2 = world->numpop2;
2758     long                npp = np2 + bayes->mu;
2759     long                npp11 = 11 * npp;
2760     //
2761     // begin unpack_single_bayes_buffer_part
2762     tmplocus = (long) buffer[z++];
2763     pnum = (long) buffer[z++];
2764 #ifdef DEBUG
2765     const MYREAL        rat = numcpu / (world->maxreplicate * world->loci);
2766     fprintf (stdout, "%i> unpack_hist_bayes_buffer() received pnum=%li, rat=%f\n", myID, pnum, rat);fflush(stdout);
2767 #endif
2768     if(tmplocus!=locus)
2769     {
2770         bayes->numparams=0;
2771         locus = tmplocus;
2772     }
2773 
2774 // Cesky Krumlov 2013    printf("%i> original accepted/trials:  [%li,%li,%li,%li,%li]/[%li,%li,%li,%li,%li]\n",
2775 // Cesky Krumlov 2013	   myID, world->accept_archive[0],world->accept_archive[1],world->accept_archive[2],world->accept_archive[3],world->accept_archive[4],world->trials_archive[0],world->trials_archive[1],world->trials_archive[2],world->trials_archive[3],world->trials_archive[4]);
2776 
2777     for (j = 0; j < np2; ++j)
2778     {
2779       if(bayes->map[j][1] != INVALID)
2780 	  {
2781 	    world->accept_archive[j] += (tmp1 = (long) buffer[z++]);//*rat);
2782 	    world->trials_archive[j] += (tmp2 = (long) buffer[z++]);//*rat);
2783 #ifdef DEBUG
2784 	    fprintf (stdout, "%i> received (acc %li) (trial %li) => (sumacc %li) (sumtrial %li)\n", myID, tmp1, tmp2, world->accept_archive[j], world->trials_archive[j]);
2785 #endif
2786 	  }
2787     }
2788     // genealogy
2789     world->accept_archive[j] += (tmp1 = (long) buffer[z++]);//*rat);
2790     world->trials_archive[j] += (tmp2 = (long) buffer[z++]);//*rat);
2791 #ifdef DEBUG
2792 	    fprintf (stdout, "%i> genealogy received (acc %li) (trial %li) => (sumacc %li) (sumtrial %li)\n", myID, tmp1, tmp2, world->accept_archive[j], world->trials_archive[j]);
2793 #endif
2794     // end unpack_single_bayes_buffer
2795     //
2796     if(!world->options->has_bayesmdimfile)
2797       {
2798 	bayes->histogram[locus].datastore = (MYREAL *) myrealloc(bayes->histogram[locus].datastore, sizeof(MYREAL) * (1+npp11));
2799 	hist = &(bayes->histogram[locus]);
2800 	hist->minima = hist->datastore;    // contains minimal values for each parameter
2801         hist->maxima = hist->datastore + npp;    // contains maximal values for each parameter
2802         hist->adjmaxima = hist->datastore + 2*npp;// holds maxima values used in histogram [are smaller than maxima]
2803 	hist->cred50l  = hist->datastore + 3*npp;    // holds 50%-credibility margins (<all lower values>,
2804 	hist->cred50u = hist->datastore + 4*npp;   //<all high values>)
2805 	hist->cred95l = hist->datastore + 5*npp;    // holds 95%-credibility margins (<all lower values>)
2806 	hist->cred95u = hist->datastore + 6*npp;   //<all high values>)
2807 	hist->modes = hist->datastore + 7*npp;    // holds 95%-credibility margins (<all lower values>, <all high values>)
2808 	hist->medians = hist->datastore + 8*npp;
2809 	hist->means = hist->datastore + 9*npp;
2810 	hist->stds = hist->datastore + 10*npp;
2811 
2812 
2813 	for(i = 0; i < npp; ++i)
2814 	  {
2815 	    if(bayes->map[i][1] != INVALID)
2816 	      {
2817 		hist->bins[i] = (long) buffer[z++];
2818 		total += hist->bins[i];
2819 	      }
2820 	  }
2821 	// this steps kills poor memory machines [setting results to floats may help a little]
2822 	hist->results = (float *) mycalloc(total * npp, sizeof(float));
2823 	hist->set95 = (char *) mycalloc(total * npp * 2 + 2, sizeof(char));
2824 	hist->set50 = hist->set95 + (total * npp + 1);
2825 
2826 	for(i = 0; i < npp; ++i)
2827 	  {
2828 	    if(bayes->map[i][1] != INVALID)
2829 	      {
2830 		for(j=0;j<11;j++)
2831 		  hist->datastore[11*i+j] = buffer[z++];
2832 	      }
2833 	  }
2834 	numbins = 0;
2835 	for(pa=0; pa < npp; pa++)
2836 	  {
2837 	    for(i=0;i<hist->bins[pa];i++)
2838 	      {
2839 		hist->set50[numbins + i] = (buffer[z++] < 1.0 ? '0' : '1');
2840 		hist->set95[numbins + i] = (buffer[z++] < 1.0 ? '0' : '1');
2841 		hist->results[numbins + i] = (float) buffer[z++];
2842 	      }
2843 	    numbins += hist->bins[pa];
2844 	    //
2845 	    // CHECK
2846 	    world->bayes->histtotal[locus * npp + pa] = hist->bins[pa];
2847 	    //
2848 	  }
2849 	if(hist->covariance==NULL)
2850 	  {
2851 	    doublevec2d(&hist->covariance,npp,npp);
2852 	  }
2853 	for(i=0;i<npp;i++)
2854 	  {
2855 	    for(j=0;j<npp;j++)
2856 	      {
2857 		hist->covariance[i][j] = buffer[z++];
2858 	      }
2859 	  }
2860       }
2861     // BF material
2862     z = unpack_BF_buffer(buffer, z, locus, world);
2863     // ESS material
2864     z = unpack_ess_buffer(buffer, z, world);
2865 }
2866 
2867 ///
2868 /// Bayes factor material buffer unpacker
unpack_BF_buffer(MYREAL * buffer,long start,long locus,world_fmt * world)2869 long unpack_BF_buffer(MYREAL *buffer, long start, long locus, world_fmt * world)
2870 {
2871   long i;
2872   long z = start;
2873   long hc = world->options->heated_chains;
2874   MYREAL              temp;
2875   MYREAL              *ttemp;
2876   MYREAL              *htemp;
2877 
2878   ttemp = calloc(3 * world->loci + world->options->heated_chains + 2, sizeof(MYREAL));
2879   htemp = ttemp + world->loci;
2880   //atemp = htemp + world->loci;
2881   //
2882   // harmonic mean calculation
2883   temp = buffer[z++]; //hmscale
2884   if(temp < world->hmscale[locus])
2885     {
2886       if(world->hm[locus]>0.0)
2887 	world->hm[locus] *= EXP(temp - world->hmscale[locus]);
2888       world->hmscale[locus] = temp;
2889       //      printf("%i> locus=%li hmscale=%f hm=%f temp=%f\n",myID,locus, world->hmscale[locus], world->hm[locus], temp);
2890     }
2891   htemp[locus] = temp;//hmscale store
2892   world->hm[locus] += EXP(-htemp[locus]+world->hmscale[locus]) * buffer[z++];
2893 
2894   // thermodynamic integration
2895   for(i=0;i < world->options->heated_chains; i++)
2896     {
2897       world->bf[locus * hc + i] +=  buffer[z++];
2898       //printf("%i> ****bf[%li + hc + %li]=%f\n",myID, locus, i, world->bf[locus*hc+i]);
2899     }
2900 
2901   myfree(ttemp);
2902   return z;
2903 }
2904 
unpack_ess_buffer(MYREAL * buffer,long start,world_fmt * world)2905 long unpack_ess_buffer(MYREAL *buffer, long start, world_fmt *world)
2906 {
2907   long pa;
2908   long z = start;
2909   static long n=1;
2910   long np2 = world->numpop2;
2911   long npp = world->numpop2 + ((long) world->bayes->mu) * world->loci;
2912 
2913   //  MYREAL              *atemp;
2914   #ifdef DEBUG
2915   const MYREAL        rat = (MYREAL) numcpu / (world->maxreplicate * world->loci);
2916   fprintf (stdout, "%i> unpack_ess_buffer() rat=%i / (%li * %li) = %f\n", myID, numcpu, world->maxreplicate, world->loci, rat);fflush(stdout);
2917 #endif
2918 
2919 
2920   //printf("\n%i> npp =%li\n",myID,npp);
2921   //printf("%i> mu  =%i\n",myID,(int) world->bayes->mu);
2922   //printf("%i> loci=%li\n",myID,world->loci);
2923   //printf("%i> unpack_ess_buffer ***unpacking BF[0]=%f hm=%f npp=%li in unpack_hist_bayes_buffer()\n", myID, world->bf[0], world->hm[0], npp);
2924   // unpacking autocorrelation and ess buffer
2925   for(pa=0; pa < np2; pa++)
2926     {
2927       if(world->bayes->map[pa][1] != INVALID)
2928 	world->auto_archive[pa] += (buffer[z++] - world->auto_archive[pa])/n;
2929     }
2930   if(world->bayes->mu)
2931     {
2932       for(pa=np2;pa<npp;pa++)
2933 	world->auto_archive[pa] += (buffer[z++] - world->auto_archive[pa])/n;
2934     }
2935   //genealogy
2936   world->auto_archive[pa] += (buffer[z++] - world->auto_archive[pa])/n;
2937   n++;
2938   //    printf("%i>>>>>> autoarchive %f\n",myID, world->auto_archive[0]);
2939   for(pa=0; pa < np2; pa++)
2940     {
2941       if(world->bayes->map[pa][1] != INVALID)
2942 	world->ess_archive[pa] += buffer[z++];// * rat;
2943     }
2944   if(world->bayes->mu)
2945     {
2946       for(pa=np2;pa<npp;pa++)
2947 	world->ess_archive[pa] += buffer[z++];// * rat;
2948     }
2949   //genealogy
2950   world->ess_archive[pa] += buffer[z++];// * rat;
2951   return z;
2952 }
2953 
2954 ///
2955 /// Bayes factor material buffer packer
pack_BF_buffer(MYREAL ** buffer,long start,long locus,world_fmt * world)2956 long pack_BF_buffer(MYREAL **buffer, long start, long locus, world_fmt * world)
2957 {
2958   // buffer memory needs are (2 + #heatedchains)
2959   long i;
2960   long z  = start;
2961   const long hc = world->options->heated_chains;
2962   (*buffer)[z++] = world->hmscale[locus];
2963   (*buffer)[z++] = world->hm[locus];
2964   for(i=0; i < hc; i++)
2965     {
2966       (*buffer)[z++] = world->bf[locus * hc + i];
2967     }
2968   //  printf("%i> locus=%li send hmscale=%f hm=%f\n",myID, locus,world->hmscale[locus],world->hm[locus]);
2969   return z;
2970 }
2971 
2972  /// packing autocorrelation and ess buffer
pack_ess_buffer(MYREAL ** buffer,long start,world_fmt * world)2973 long pack_ess_buffer(MYREAL **buffer, long start, world_fmt *world)
2974 {
2975   // buffer memory needs are (npp + 1) + (npp+1) (numpop2 + mu*loci)
2976   long i;
2977   long z = start;
2978   const long np2 = world->numpop2;
2979   const long npp = np2 + ((long) world->bayes->mu * world->loci);
2980   (*buffer) = (MYREAL *) myrealloc(*buffer, (start + ((npp + 1) + (npp+1) * npp))*sizeof(MYREAL));
2981   //printf("\n%i> npp =%li\n",myID,npp);
2982   //printf("%i> mu  =%li\n",myID,(long)world->bayes->mu);
2983   //printf("%i> loci=%li\n",myID,world->loci);
2984   //printf("%i>> pack_essbuffer: npp=%li autoarchive %f\n",myID, npp, world->auto_archive[0]);
2985   for(i=0;i<np2;i++)
2986     {
2987       if(world->bayes->map[i][1] != INVALID)
2988 	(*buffer)[z++] = world->auto_archive[i];
2989       //printf("%i> auto: %f\n",myID,world->auto_archive[i]);
2990     }
2991   //rates for each locus
2992   if(world->bayes->mu)
2993     {
2994       for(i=np2;i<npp;i++)
2995 	(*buffer)[z++] = world->auto_archive[i];
2996     }
2997   // genealogy
2998   (*buffer)[z++] = world->auto_archive[i];
2999   //printf("%i> auto G: %f\n",myID,world->auto_archive[i]);
3000   for(i=0;i<np2;i++)
3001     {
3002       if(world->bayes->map[i][1] != INVALID)
3003 	(*buffer)[z++] = world->ess_archive[i];
3004       //printf("%i> ess: %f\n",myID,world->ess_archive[i]);
3005     }
3006   if(world->bayes->mu)
3007     {
3008       for(i=np2;i<npp;i++)
3009 	(*buffer)[z++] = world->ess_archive[i];
3010     }
3011   //genealogy
3012   (*buffer)[z++] = world->ess_archive[i];
3013   //printf("%i> ess G: %f\n",myID,world->ess_archive[i]);
3014   return z;
3015 }
3016 
3017 ///
3018 /// pack the bayes histogram
pack_hist_bayes_buffer(MYREAL ** buffer,bayeshistogram_fmt * hist,world_fmt * world,long startposition)3019 long pack_hist_bayes_buffer(MYREAL **buffer, bayeshistogram_fmt *hist, world_fmt * world, long startposition)
3020 {
3021   // buffer memory needed is (npp + 11*npp + (3 * npp * hist->bins[i])
3022   long  j;
3023   long  i;
3024   long  npp     = world->numpop2 + world->bayes->mu;
3025   long  numbins = 0;
3026   long  z       = startposition;
3027   bayes_fmt *bayes = world->bayes;
3028 #ifdef DEBUG_MPI
3029     printf("%i> pack_hist_bayes_buffer: position=%li last value = %f numparams=%li npp=%li\n",myID, startposition, (z > 0) ? (*buffer)[startposition] : -9999., hist->numparam, npp);
3030 #endif
3031 
3032     for(i = 0; i < npp; ++i)
3033       {
3034 	if(bayes->map[i][1] != INVALID)
3035           {
3036 	    (*buffer)[z++] = (MYREAL) hist->bins[i];
3037 	  }
3038       }
3039     //    printf("%i> npp=%li, z=%li\n",myID,npp,z);
3040     // parameter and mu datastore
3041     for(i = 0; i < npp; ++i)
3042       {
3043 	if(bayes->map[i][1] != INVALID)
3044           {
3045 	    for(j=0;j<11;j++)
3046 	      (*buffer)[z++] = (MYREAL) hist->datastore[11*i+j];
3047 	  }
3048       }
3049     numbins = 0;
3050     //printf("%i> npp2+=%li, z=%li\n",myID,npp+npp11,z);
3051     for(i=0; i < npp; i++)
3052       {
3053 	// bins are zero for "c" and "0" parameters
3054 	//	if(bayes->map[i][1] != INVALID)
3055         //  {
3056 	    for(j=0;j<hist->bins[i];j++)
3057 	      {
3058 		(*buffer)[z++] = (MYREAL) (hist->set50[numbins + j]=='1' ? 1.0 : 0.0);
3059 		(*buffer)[z++] = (MYREAL) (hist->set95[numbins + j]=='1' ? 1.0 : 0.0);
3060 		(*buffer)[z++] = (MYREAL) hist->results[numbins + j];
3061 	      }
3062 	    //  }
3063 	numbins += hist->bins[i];
3064       }
3065     // pack covariance matrix
3066     for(i=0;i<npp;i++)
3067       {
3068 	for(j=0;j<npp;j++)
3069 	  {
3070 	    (*buffer)[z++] = hist->covariance[i][j];
3071 	  }
3072       }
3073 #ifdef DEBUG_MPI
3074     printf("%i> pack_hist_bayes_buffer: position=%li numbins=%li, last value = %f\n",myID, z, numbins, (*buffer)[z-1]);
3075 #endif
3076     return z;
3077 }
3078 
3079 
3080 ///
3081 /// unpack bayes parameter buffer, sent from replicant nodes AND lociworker to the master
3082 /// the values will be simply added to the bayes->params, no records of replicates will be done.
unpack_single_bayes_buffer(MYREAL * buffer,bayes_fmt * bayes,world_fmt * world,long locus)3083 long unpack_single_bayes_buffer(MYREAL *buffer,bayes_fmt * bayes, world_fmt * world,long locus)
3084 {
3085   long i, j;
3086   long z = 0 ;
3087   long pnum;
3088   long tmp1, tmp2;
3089   long tmplocus;
3090   long allocparams = world->bayes->allocparams;
3091   //    long oldallocparams = world->bayes->allocparams;
3092   long repstart;
3093   long repstop;
3094   long nn = 2+world->numpop2 + (world->bayes->mu) ;
3095   set_replicates (world, world->repkind, world->options->replicatenum,
3096 		  &repstart, &repstop);
3097   tmplocus = (long) buffer[z++];
3098   pnum = (long) buffer[z++];
3099   //fprintf (stdout, "%i> received locus=%li (oldlocus=%li) pnum=%li\n", myID, tmplocus, locus, pnum);
3100   if(tmplocus!=locus)
3101     world->bayes->numparams=0;
3102 
3103   pnum += world->bayes->numparams;
3104   if(pnum >=world->bayes->allocparams)
3105     {
3106       allocparams = pnum + 1;
3107       world->bayes->params = (MYREAL *) myrealloc(world->bayes->params,sizeof(MYREAL)*allocparams*nn);
3108     }
3109   world->bayes->allocparams = allocparams;
3110   for(i = world->bayes->numparams; i < pnum; ++i)
3111     {
3112       // the first element is log(p(d|g)p(g|param))
3113       (world->bayes->params+(nn*i))[0] = buffer[z++];
3114       // the second element is log(p(d|g))
3115       (world->bayes->params+(nn*i))[1] = buffer[z++];
3116       //fprintf (stdout, "%i> receive params line %li ", myID, i);
3117       for (j = 2; j < nn; ++j)
3118 	  {
3119 	    if(bayes->map[j-2][1] != INVALID)
3120 	      (world->bayes->params+(nn*i))[j] = buffer[z++];
3121 	    //fprintf (stdout, "%f ", (world->bayes->params+(nn*i + 1))[j]);
3122 	  }
3123       //fprintf (stdout, "\n");
3124     }
3125   world->bayes->numparams = pnum;
3126   // acceptance ratios are added to the ones we have already
3127   // parameter acceptances
3128 
3129 // Cesky Krumlov 2013    printf("%i> **original accepted/trials:  [%li,%li,%li,%li,%li]/[%li,%li,%li,%li,%li]\n",
3130 // Cesky Krumlov 2013	   myID, world->accept_archive[0],world->accept_archive[1],world->accept_archive[2],world->accept_archive[3],world->accept_archive[4],world->trials_archive[0],world->trials_archive[1],world->trials_archive[2],world->trials_archive[3],world->trials_archive[4]);
3131 
3132   for (j = 0; j < world->numpop2; ++j)
3133     {
3134       if(bayes->map[j][1] != INVALID)
3135 	{
3136 	  tmp1 = (long) buffer[z++];
3137 	  tmp2 = (long) buffer[z++];
3138 	  //Cesky Krumlov 2013
3139 world->accept_archive[j] += tmp1;
3140 	  //Cesky Krumlov 2013
3141 world->trials_archive[j] += tmp2;
3142 	}
3143     }
3144   // the last acceptance is the one for the genealogies
3145   tmp1 = (long) buffer[z++];
3146   tmp2 = (long) buffer[z++];
3147   //Cesky Krumlov 2013
3148 world->accept_archive[j] += tmp1;
3149   //Cesky Krumlov 2013
3150 world->trials_archive[j] += tmp2;
3151 // Cesky Krumlov 2013 printf("%i> **updated accepted/trials:  [%li,%li,%li,%li,%li]/[%li,%li,%li,%li,%li]\n",
3152 // Cesky Krumlov 2013	   myID, world->accept_archive[0],world->accept_archive[1],world->accept_archive[2],world->accept_archive[3],world->accept_archive[4],world->trials_archive[0],world->trials_archive[1],world->trials_archive[2],world->trials_archive[3],world->trials_archive[4]);
3153 
3154   //printf("%i> unpack_single_bayes_buffer(): buffer counter is at %li",myID, z);
3155   return z;
3156 }
3157 
3158 ///
3159 /// Pack bayes parameter buffer, sent from replicant nodes AND lociworker to the master
3160 /// the values will be simply added to the bayes->params, no records of specific replicates are kept.
pack_single_bayes_buffer(MYREAL ** buffer,bayes_fmt * bayes,world_fmt * world,long locus)3161 long pack_single_bayes_buffer(MYREAL **buffer, bayes_fmt *bayes, world_fmt *world,long locus)
3162 {
3163     long i, j;
3164     long bufsize;
3165     long z = 0;
3166     //    long nump = world->numpop2 + 1;
3167     long nn = 2 + world->numpop2 + (world->bayes->mu) ;
3168     const long nng= world->numpop2 + world->loci * world->bayes->mu + 1;
3169     bufsize = 2 * (world->numpop2 + 1); //acceptance ratio: params + tree
3170     bufsize += 2; // loci + numparams
3171     bufsize += world->bayes->numparams * nn;
3172     //printf("%i> bufsize in pack_single_bayes_buffer()=%li\n",myID,bufsize);
3173     (*buffer) = (MYREAL *) myrealloc(*buffer, bufsize * sizeof(MYREAL));
3174     memset (*buffer, 0, bufsize * sizeof(MYREAL));
3175 #ifdef DEBUG_MPI
3176     fprintf(stdout, "%i>>>>>\n  buffersize=%li, numparams=%li\n>>>>\n",
3177     	    myID,
3178 	    bufsize,
3179 	    world->bayes->numparams
3180 	    );
3181     fflush(stdout);
3182 #endif
3183 
3184     (*buffer)[z++] = (MYREAL) locus;
3185     (*buffer)[z++] = (MYREAL) bayes->numparams;
3186 
3187     for(i = 0; i < world->bayes->numparams; ++i)
3188       {
3189 	//the first and second elements are logprob
3190 	(*buffer)[z++] = (bayes->params+(i*nn))[0];
3191 	(*buffer)[z++] = (bayes->params+(i*nn))[1];
3192 	for (j = 2; j < nn; ++j) //the first and second elements are logprob
3193 	  {
3194 	    if(bayes->map[j-2][1] != INVALID)
3195 	      (*buffer)[z++] = (bayes->params+(i*nn))[j];
3196 	  }
3197       }
3198     // for the parameters
3199     for (j = 0; j < world->numpop2; ++j)
3200       {
3201 	if(bayes->map[j][1] != INVALID)
3202 	  {
3203 	    (*buffer)[z++] =  (MYREAL) world->accept_archive[j];
3204 	    (*buffer)[z++] =  (MYREAL) world->trials_archive[j];
3205 	  }
3206       }
3207     // for the genealogy
3208     (*buffer)[z++] =  (MYREAL) world->accept_archive[j];
3209     (*buffer)[z++] =  (MYREAL) world->trials_archive[j];
3210 
3211     if(bufsize < z)
3212       error("buffer is too small in pack_single_bayes_buffer()\n");
3213     memset(world->accept_archive,0,sizeof(long)*2*nng);//Cesky Krumlov 2013
3214     return z;
3215 }
3216 ///
3217 /// Pack bayes parameter buffer, sent from replicant nodes AND lociworker to the master
3218 /// the acceptance and trial values will be simply added.
3219 
pack_single_bayes_buffer_part(MYREAL ** buffer,bayes_fmt * bayes,world_fmt * world,long locus)3220 long pack_single_bayes_buffer_part(MYREAL **buffer, bayes_fmt *bayes, world_fmt *world,long locus)
3221 {
3222     long j;
3223     long z = 0;
3224     const long nng= world->numpop2 + world->loci * world->bayes->mu + 1;
3225 
3226     (*buffer)[z++] = (MYREAL) locus;
3227     (*buffer)[z++] = (MYREAL) bayes->numparams;
3228 
3229     // parameters
3230     for (j = 0; j < world->numpop2; ++j)
3231     {
3232       if(bayes->map[j][1] != INVALID)
3233 	{
3234 	  (*buffer)[z++] = (MYREAL) world->accept_archive[j];
3235 	  (*buffer)[z++] = (MYREAL)  world->trials_archive[j];
3236 	}
3237     }
3238     // genealogy
3239     (*buffer)[z++] = (MYREAL) world->accept_archive[j];
3240     (*buffer)[z++] = (MYREAL)  world->trials_archive[j];
3241     memset(world->accept_archive,0,sizeof(long)* 2 * nng); //removes accept and trials archive
3242     return z;
3243 }
3244 
3245 ///
3246 /// unpack minimal statistic trees
3247 /// \todo there are differences between unpack_sumfile() and and read_savesum() this needs reconciliation
3248 void
unpack_sumfile_buffer(MYREAL * buffer,world_fmt * world,long locus,long maxrep,long numpop)3249 unpack_sumfile_buffer (MYREAL *buffer, world_fmt * world,
3250                        long locus, long maxrep, long numpop)
3251 {
3252     long replicate;
3253     timearchive_fmt **ta = world->atl;
3254     long z=0;
3255     for (replicate = 0; replicate < maxrep; replicate++)
3256     {
3257       unpack_single_sumfile_buffer (buffer, ta, world, locus, replicate, numpop,&z);
3258     }
3259 }
3260 
3261 
3262 
3263 ///
3264 /// unpack minimal statistic trees for a single replicate
3265 void
unpack_single_sumfile_buffer(MYREAL * buffer,timearchive_fmt ** ta,world_fmt * world,long locus,long replicate,long numpop,long * startz)3266 unpack_single_sumfile_buffer (MYREAL *buffer, timearchive_fmt **ta, world_fmt *world,
3267                               long locus, long replicate, long numpop, long *startz)
3268 {
3269     long i, j;
3270     long z = *startz;
3271     long numpop2plus = numpop * numpop + 2 * numpop;
3272     ta[replicate][locus].T = (long) buffer[z++];
3273     ta[replicate][locus].numpop = (long) buffer[z++];
3274     ta[replicate][locus].sumtips = (long) buffer[z++];
3275     ta[replicate][locus].param_like = buffer[z++];
3276     //fprintf(stderr,"%i> unpack_single_sumfile: start_z=%li, replicate=%li, locus=%li ", myID, *startz, replicate, locus);
3277     world->chainlikes[locus][replicate] = ta[replicate][locus].param_like;
3278     //fprintf(stdout,"%i> got sumfile locus %li and replicate %li (%li)\n",myID,locus,replicate, ta[replicate][locus].allocT);
3279     increase_timearchive (world, locus, ta[replicate][locus].T, world->numpop, replicate);
3280     for (i = 0; i < ta[replicate][locus].T; i++)
3281     {
3282       ta[replicate][locus].tl[i].copies = buffer[z++];
3283       ta[replicate][locus].tl[i].lcopies =  buffer[z++];
3284       for (j = 0; j < numpop2plus; j++)
3285         {
3286 	  ta[replicate][locus].tl[i].data[j] = buffer[z++];
3287         }
3288     }
3289     for (i = 0; i < world->numpop2; i++)
3290     {
3291       ta[replicate][locus].param[i] = buffer[z++];
3292       ta[replicate][locus].param0[i] = buffer[z++];
3293     }
3294     log_param0 (ta[replicate][locus].param0, ta[replicate][locus].lparam0, world->numpop2);
3295     ta[replicate][locus].trials = buffer[z++];
3296     ta[replicate][locus].normd = buffer[z++];
3297     *startz = z;
3298     //fprintf(stderr,"<%i> end_z=%li\n", myID, *startz);
3299 }
3300 
3301 
pack_single_sumfile_buffer(MYREAL ** buffer,long z,long * allocbufsize,world_fmt * world,long locus,long replicate,long numpop)3302 long pack_single_sumfile_buffer(MYREAL **buffer, long z, long *allocbufsize, world_fmt * world,
3303                                 long locus, long replicate, long numpop)
3304 {
3305     long i, j;
3306     long numpop2 = numpop * numpop;
3307     long numpop2plus = numpop2 + 2 * numpop;
3308     timearchive_fmt **ta = world->atl;
3309     long  bufsize = ta[replicate][locus].T;
3310     bufsize *= numpop2plus;
3311     bufsize += 2 * ta[replicate][locus].T;
3312     bufsize += 2 * numpop2;
3313     bufsize += 6;
3314     if(*allocbufsize < bufsize)
3315       {
3316 	(*buffer) = (MYREAL *) myrealloc (*buffer, bufsize * sizeof (MYREAL));
3317 	*allocbufsize = bufsize;
3318       }
3319     (*buffer)[z++] = (MYREAL) ta[replicate][locus].T;
3320     (*buffer)[z++] = (MYREAL) ta[replicate][locus].numpop;
3321     (*buffer)[z++] = (MYREAL) ta[replicate][locus].sumtips;
3322     (*buffer)[z++] = ta[replicate][locus].param_like;
3323 
3324     for (i = 0; i < ta[replicate][locus].T; i++)
3325       {
3326 	(*buffer)[z++] = (MYREAL) ta[replicate][locus].tl[i].copies;
3327 	(*buffer)[z++] = ta[replicate][locus].tl[i].lcopies;
3328 	for (j = 0; j < numpop2plus; j++)
3329 	  {
3330 	    (*buffer)[z++] = ta[replicate][locus].tl[i].data[j];
3331 	  }
3332       }
3333     for (i = 0; i < numpop2; i++)
3334       {
3335 	(*buffer)[z++] = ta[replicate][locus].param[i];
3336 	(*buffer)[z++] = ta[replicate][locus].param0[i];
3337       }
3338     (*buffer)[z++] = (MYREAL) ta[replicate][locus].trials;
3339     (*buffer)[z] = ta[replicate][locus].normd;
3340     z++;
3341     return z;
3342 }
3343 
3344 long
pack_sumfile_buffer(MYREAL ** buffer,world_fmt * world,long locus,long maxrep,long numpop)3345 pack_sumfile_buffer (MYREAL **buffer, world_fmt * world,
3346                      long locus, long maxrep, long numpop)
3347 {
3348   long replicate;
3349   long bufsize = 1;
3350   long allocbufsize = 0;
3351   long z = 0;
3352   long numpop2 = numpop * numpop;
3353   long numpop2plus = numpop2 + 2 * numpop;
3354   timearchive_fmt **ta = world->atl;
3355 
3356   for (replicate = 0; replicate < maxrep; replicate++)
3357     {
3358       bufsize = ta[replicate][locus].T;
3359       bufsize *= numpop2plus;
3360       bufsize += 2 * ta[replicate][locus].T;
3361       bufsize += 2 * numpop2;
3362       bufsize += 6;
3363       allocbufsize += bufsize;
3364       //fprintf(stderr,"%i> bufsize=%li allocbufsize=%li\n", myID, bufsize, allocbufsize);
3365     }
3366   (*buffer) = (MYREAL *) myrealloc ((*buffer), allocbufsize * sizeof (MYREAL));
3367   memset(*buffer, 0, sizeof(MYREAL) * allocbufsize);
3368   for (replicate = 0; replicate < maxrep; replicate++)
3369     {
3370       z = pack_single_sumfile_buffer(buffer, z, &allocbufsize, world, locus, replicate, numpop);
3371       //fprintf(stderr,"%i> PACKED SUMFILE FOR REPLICATE %li z=%li locus=%li allocbufsize=%li\n", myID, replicate, z, locus, allocbufsize);
3372     }
3373     bufsize = z;
3374     if(bufsize > allocbufsize)
3375       {
3376 	fprintf(stderr,"bufsize=%li allocbufsize=%li\n", bufsize, allocbufsize);
3377         error("allocation exceeded in pack_sumfile_buffer");
3378       }
3379     return bufsize;
3380 }
3381 
3382 
3383 ///
3384 /// gather results (sumfiles, results, migrate-histogram, ..) from workers
3385 void
mpi_results_master(long sendtype,world_fmt * world,long maxreplicate,void (* unpack)(MYREAL * buffer,world_fmt * world,long locus,long maxrep,long numpop))3386 mpi_results_master (long sendtype, world_fmt * world, long maxreplicate,
3387                     void (*unpack) (MYREAL *buffer, world_fmt * world,
3388                                     long locus, long maxrep, long numpop))
3389 {
3390 #ifdef DEBUG_MPI
3391   long ii;
3392 #endif
3393     long numpop = world->numpop;
3394     long bufsize = 1;
3395     // maxreplicate > 1 ---> add 1 [this all needs careful checking]
3396     // MIGMPI_SUMFILE -----> 0
3397     // MIGMPI_HIST    -----> 0
3398     // still strange? long addon = (maxreplicate>1) ? 1 : ((sendtype == MIGMPI_SUMFILE) ||  (sendtype == MIGMPI_MIGHIST) )? 0 : ((world->loci == 1) ? 0 : 1) ;
3399     //long addon = (maxreplicate>1) ? 1 : ((world->loci > 1) ? 1 : 0) ;
3400     long addon = 1;
3401     //    boolean done = FALSE;
3402     MYREAL *buffer;
3403     MYREAL *temp;
3404     int worker;
3405     long z, tag, sender;
3406     MPI_Status status;
3407     long numelem = world->numpop2 + (world->options->gamma ? 1 : 0);
3408     long numelem2 = 2 * numelem;
3409 
3410     temp = (MYREAL *) mycalloc (numelem2 + 2, sizeof (MYREAL));
3411     buffer = (MYREAL *) mycalloc (bufsize+1, sizeof (MYREAL));
3412     temp[0] = (MYREAL)sendtype;
3413     temp[1] = (MYREAL) bufsize;
3414     for (worker = 1; worker < MIN (world->loci + addon, numcpu); worker++)
3415     {
3416       //printf("%i> MASTER requests information from n%i for locus %i\n",myID, worker, worker-1);
3417         MYMPISEND (temp, numelem2 + 2, mpisizeof, worker, worker, comm_world);
3418     }
3419     z = 0;
3420     while (z < world->loci)
3421     {
3422         MYMPIRECV (&bufsize, ONE, MPI_LONG, MPI_ANY_SOURCE, MPI_ANY_TAG,
3423                   comm_world, &status);
3424         buffer = (MYREAL *) myrealloc (buffer, sizeof (MYREAL) * (bufsize + 1));
3425         memset (buffer, 0, sizeof (MYREAL) * (bufsize + 1));
3426         sender = status.MPI_SOURCE;
3427         tag = status.MPI_TAG;
3428 #ifdef DEBUG_MPI
3429 	fprintf(stdout, "%i> z=%li worker=%li bufsize=%li -------------------------------------\n",myID, z, sender, bufsize);
3430 #endif
3431         MYMPIRECV (buffer, bufsize, mpisizeof, (MYINT) sender, (MYINT) tag, comm_world, &status);
3432 #ifdef DEBUG_MPI
3433 	fprintf(stdout,"%i>------------------------------------\nbuffer=",myID);
3434 	for(ii=0;ii<bufsize;ii++)
3435 	  fprintf(stdout," %f", buffer[ii]);
3436 	fprintf(stdout,"\n%i>-------------------------------------\n",myID);
3437 #endif
3438         (*unpack) (buffer, world, tag - 1, maxreplicate, numpop);
3439 	//fprintf(stdout,"%i> unpacked bufsize=%li from node %li\n",myID,bufsize,sender); fflush(stdout);
3440         z++;
3441     }
3442     myfree(buffer);
3443     myfree(temp);
3444 }
3445 
3446 void
mpi_results_worker(long bufs,world_fmt * world,long maxrep,long (* pack)(MYREAL ** buffer,world_fmt * world,long locus,long maxrep,long numpop))3447 mpi_results_worker (long bufs, world_fmt * world, long maxrep,
3448                     long (*pack) (MYREAL **buffer, world_fmt * world,
3449                                   long locus, long maxrep, long numpop))
3450 {
3451     long numpop = world->numpop;
3452     long ww, locus;
3453     MYREAL *allbuffer;
3454     long bufsize = 1;
3455     allbuffer = (MYREAL *) mycalloc(1, sizeof(MYREAL));
3456     //fprintf(stdout,"%i> locidone=%i\n",myID, locidone); fflush(stdout);
3457     for (ww = 0; ww < locidone; ww++)
3458     {
3459       locus = world->who[ww];
3460       bufsize = (*pack) (&allbuffer, world, locus, maxrep, numpop);
3461 #ifdef DEBUG_MPI
3462       fprintf(stdout,"%i> locus=%li after pack bufsize=%li\n",myID, locus, bufsize); fflush(stdout);
3463 #endif
3464       MYMPISEND (&bufsize, ONE, MPI_LONG, MASTER, (int) (locus + 1), comm_world);
3465       //fprintf(stdout,"%i> sending results from locus=%li using bufsize=%li to master \n",myID, locus, bufsize); fflush(stdout);
3466       MYMPISEND (allbuffer, bufsize, mpisizeof, MASTER, (int) (locus + 1), comm_world);
3467     }
3468     myfree(allbuffer);
3469 }
3470 
3471 void
mpi_broadcast_results(world_fmt * world,long loci,long (* pack)(MYREAL ** buffer,world_fmt * world,long locus,long maxrep,long numpop),void (* unpack)(MYREAL * buffer,world_fmt * world,long locus,long maxrep,long numpop))3472 mpi_broadcast_results (world_fmt * world, long loci,
3473                        long (*pack) (MYREAL **buffer, world_fmt * world,
3474                                      long locus, long maxrep, long numpop),
3475                        void (*unpack) (MYREAL *buffer, world_fmt * world,
3476                                        long locus, long maxrep, long numpop))
3477 {
3478     long locus;
3479     // long addon = (world->loci == 1) 0 : 1;
3480     long bufsize=1;
3481 #ifdef DEBUG_MPI
3482     char nowstr[STRSIZE];
3483 #endif
3484     MYREAL *allbuffer = NULL;// = &world->buffer;
3485 
3486     long maxreplicate = (world->options->replicate
3487                          && world->options->replicatenum >
3488                          0) ? world->options->replicatenum : 1;
3489     allbuffer = (MYREAL *) mycalloc (1, sizeof (MYREAL));
3490 #ifdef DEBUG_MPI
3491     get_time (nowstr, "%H:%M:%S");
3492     if(world->options->progress)
3493       fprintf(stdout, "%i> Redistributing the data\nResult parts [Time is %s]\n",myID, nowstr);
3494 #endif
3495     for (locus = 0; locus < loci; locus++)
3496     {
3497         if (myID == MASTER)
3498         {
3499             bufsize =(*pack) (&allbuffer, world, locus, maxreplicate,
3500                               world->numpop);
3501             MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
3502             MYMPIBCAST (allbuffer, bufsize, mpisizeof, MASTER, comm_world);
3503 #ifdef DEBUG_MPI
3504             printf("%i> Locus %li results sent\n",myID, locus);
3505 #endif
3506         }
3507         else
3508         {
3509             MYMPIBCAST (&bufsize, 1, MPI_LONG, MASTER, comm_world);
3510             allbuffer = (MYREAL *) myrealloc (allbuffer, sizeof (MYREAL) * bufsize + 1);
3511             MYMPIBCAST (allbuffer, bufsize, mpisizeof, MASTER, comm_world);
3512             (*unpack)(allbuffer, world, locus, maxreplicate,
3513                       world->numpop);
3514 #ifdef DEBUG_MPI
3515             printf("%i> Locus %li results received\n",myID, locus);
3516 #endif
3517         }
3518 	//myfree(allbuffer);
3519 	//allbuffer=NULL;
3520 	//memset (allbuffer, 0, sizeof (char) * bufsize);
3521     }
3522     MYMPIBARRIER(comm_world);
3523     myfree(allbuffer);
3524 }
3525 
3526 // slownet profiler
3527 //
3528 #ifdef SLOWNET
3529 
3530 ///
3531 /// orchestrates and prints the profile likelihoods using MPI
3532 /// when the slownet option for MPI work is chosen
3533 void
mpi_profiles_master(world_fmt * world,long nparam,int * profilewho)3534 mpi_profiles_master (world_fmt * world, long nparam, int *profilewho)
3535 {
3536 
3537   boolean done;
3538   int     tag;
3539   int     sender   = 0;
3540 
3541 #ifdef PRETTY
3542   long    location;
3543 #endif
3544   long    pnum;
3545   long    pdone;
3546   long    bufsize;
3547   long    quantsize;
3548   long    failuresize;
3549   long    tempstrsize=STRSIZE;
3550   long    temp[4];
3551   long    numsent  = 0;
3552 
3553   char   *tempstr;
3554   char   *leadstr;
3555   char  **buffer = &world->buffer;
3556 
3557   FILE   *outfile = world->outfile;
3558 
3559   MPI_Request *irequests;
3560   MPI_Status  status;
3561   MPI_Status  *istatus;
3562   long *profilelist;
3563   long    minnodes;
3564   long newnparam = nparam;
3565   long parameternum;
3566   tempstr = (char*) mycalloc(tempstrsize,sizeof(char));
3567   leadstr = (char*) mycalloc(STRSIZE,sizeof(char));
3568   profilelist = (long*) mycalloc(world->numpop2+1,sizeof(long));
3569   newnparam = choose_profile_parameters (world, profilelist);
3570   minnodes = MIN (newnparam, numcpu - 1);
3571   irequests = (MPI_Request *) mycalloc(minnodes,sizeof(MPI_Request));
3572   istatus = (MPI_Status *) mycalloc(minnodes,sizeof(MPI_Status));
3573 
3574   for (pnum = 0; pnum < minnodes; pnum++)
3575     {
3576       parameternum = profilelist[pnum];
3577       MYMPIISEND (&parameternum, 1, MPI_LONG, (MYINT) (pnum + 1), (MYINT) (pnum + 1), comm_world,&irequests[numsent]);
3578       //fprintf(stdout,"%i>>>>> sent parameter number %li to node %li with tag %li\n",myID,pnum,pnum+1,pnum+1);
3579         numsent++;
3580     }
3581     MYMPIWAITALL(minnodes,irequests, istatus);
3582 
3583     for (pnum = 0; pnum < newnparam; pnum++)
3584     {
3585         done = FALSE;
3586         while(!done)
3587         {
3588             MYMPIRECV (leadstr, STRSIZE, MPI_CHAR, (MYINT) MPI_ANY_SOURCE, (MYINT) MPI_ANY_TAG, comm_world, &status);
3589             sender = status.MPI_SOURCE;
3590             tag = status.MPI_TAG;
3591             switch(leadstr[0])
3592             {
3593             case 'M':
3594 	      tempstrsize = atol(leadstr+1);
3595 	      tempstr = (char*) realloc(tempstr,sizeof(char)*(tempstrsize+1));
3596 	      memset(tempstr,0,sizeof(char)*(tempstrsize+1));
3597 	      //	      fprintf(stdout,"%i> ready to receive %li chars\n",myID,tempstrsize);
3598 	      MYMPIRECV (tempstr, tempstrsize, MPI_CHAR, (MYINT) sender, (MYINT) tag,
3599 			 comm_world, &status);
3600 	      handle_message(tempstr,sender, world);
3601 	      break;
3602             case 'P':
3603 	      //fprintf(stdout,"%i> ready to receive 4 longs\n",myID);
3604                 MYMPIRECV (temp, FOUR, MPI_LONG, (MYINT) sender, (MYINT) MPI_ANY_TAG,
3605                            comm_world, &status);
3606                 pdone = temp[0];
3607                 bufsize = temp[1];
3608                 quantsize = temp[2];
3609                 failuresize = temp[3];
3610                 //fprintf(stdout,"%i> ++++++++++++ bufsize=%li quantsize=%li failuresize=%li from sender %i\n",
3611                 //        myID,bufsize,quantsize,failuresize,sender);
3612                 profilewho[pdone] = sender;
3613                 *buffer =
3614                     (char *) myrealloc (*buffer, sizeof (char) * (bufsize + quantsize + failuresize + 1));
3615                 //memset (*buffer, 0, sizeof (char) * (bufsize + quantsize + failuresize + 1));
3616                 MYMPIRECV (*buffer, bufsize + quantsize + failuresize, MPI_CHAR, (MYINT) sender, (MYINT) tag,
3617                           comm_world, &status);
3618                 //fprintf(stdout,"@%s@\n\n@%s@\n\n\n",*buffer+bufsize,*buffer+bufsize+quantsize);
3619                 //fprintf(stdout,"########################\n# %i> buf %li from %i \n########################\n",
3620                 //myID, (long) (long) strlen(*buffer)+1, sender);
3621                 if(world->options->printprofsummary)
3622                 {
3623                     unpack_quantile ((*buffer) + bufsize, world->quantiles[pdone],
3624                                      GRIDSIZE);
3625                     unpack_failed_percentiles ((*buffer) + bufsize + quantsize, world->percentile_failed[pdone],
3626                                      GRIDSIZE);
3627 //                    fprintf(stdout,"%i>failed=%i %i %i %i %i %i %i\n",myID,world->percentile_failed[pdone][0]
3628   //                          ,world->percentile_failed[pdone][1]
3629     //                        ,world->percentile_failed[pdone][2]
3630       //                      ,world->percentile_failed[pdone][3]
3631         //                    ,world->percentile_failed[pdone][4]
3632           //                  ,world->percentile_failed[pdone][5]
3633             //                ,world->percentile_failed[pdone][6]);
3634                     memset ((*buffer) + bufsize , 0, sizeof (char) * (quantsize + failuresize));
3635                 }
3636                 // print profile table that is in the first part of the buffer
3637                 fprintf (outfile, "%s\n\n", *buffer);
3638 #ifdef PRETTY
3639 		location = strlen(*buffer);
3640 		pdf_print_profile_table(55.0f, &page_height, world->options->profilemethod, *buffer + location + 1, world);
3641 #endif
3642                 done=TRUE;
3643                 break;
3644             default:
3645 	      fprintf(stderr,"%i> message=%s\n%i> sender=%i tag=%i\n",myID,tempstr, myID,status.MPI_SOURCE,status.MPI_TAG);
3646                 MPI_Finalize();
3647                 error("DIED because of wrong message from worker");
3648                 break;
3649             }
3650         }
3651         if (numsent < newnparam)
3652         {
3653 	  parameternum = profilelist[numsent];
3654 	  MYMPISEND (&parameternum, ONE, MPI_LONG, (MYINT) sender, (MYINT) (numsent + 1), comm_world);
3655 	  numsent++;
3656         }
3657         else
3658         {
3659             // stop worker because there is nothing to do anymore
3660             MYMPISEND (&nparam, ONE, MPI_LONG, sender, 0, comm_world); //end of parameter list
3661         }
3662     }
3663     // stop workers that did nothing for profiles
3664     for (sender = MIN (newnparam, numcpu - 1) + 1; sender < numcpu ; sender++)
3665     {
3666         // stop all nodes to wait for profiles
3667         MYMPISEND (&newnparam, ONE, MPI_LONG, sender, 0, comm_world);
3668     }
3669     myfree(tempstr);
3670     myfree(leadstr);
3671     myfree(istatus);
3672     myfree(irequests);
3673     myfree(profilelist);
3674 }
3675 
3676 void
mpi_profiles_worker(world_fmt * world,long * gmaxptr)3677 mpi_profiles_worker (world_fmt * world, long *gmaxptr)
3678 {
3679     boolean done = FALSE;
3680     long pnum;
3681     long temp[4];
3682     char *tempstr;
3683     char *quantilebuffer;
3684     char *failedbuffer;
3685     MPI_Status status;
3686     quantilebuffer = (char *) mycalloc (ONE, sizeof (char));
3687     failedbuffer = (char *) mycalloc (ONE, sizeof (char));
3688     tempstr = (char *) mycalloc (STRSIZE, sizeof (char));
3689     while (!done)
3690     {
3691         //fprintf(stdout,"%i> before receive of parameter number\n",myID);
3692         MYMPIRECV (&pnum, 1, MPI_LONG, (MYINT) MASTER, (MYINT) MPI_ANY_TAG, comm_world, &status);
3693         //fprintf(stdout,"%i> RECEIVED parameter number %li from %i with tag %i\n",myID,pnum,status.MPI_SOURCE,status.MPI_TAG);
3694 
3695         if (status.MPI_TAG != 0) //stop condition
3696         {
3697             // fills world->buffer with profile information
3698             print_profile_likelihood_driver (pnum, world, gmaxptr);
3699             temp[0] = pnum;
3700             temp[1] = (long) strlen (world->buffer);
3701 #ifdef PRETTY
3702 	    // the PDF table is also sent in addition to the formatted ascii table
3703 	    temp[1] = (long) strlen(world->buffer + temp[1] + 1) + temp[1] + 1;
3704 #endif
3705             if(world->options->printprofsummary)
3706             {
3707                 temp[2] = pack_quantile (&quantilebuffer, world->quantiles[pnum], GRIDSIZE);
3708                 world->buffer =     (char *) myrealloc (world->buffer,
3709                                                         sizeof (char) * (temp[1] + temp[2] + 1));
3710                 sprintf(world->buffer + temp[1], "%s", quantilebuffer);
3711                 temp[3] = pack_failed_percentiles (&failedbuffer, world->percentile_failed[pnum], GRIDSIZE);
3712                 world->buffer =     (char *) myrealloc (world->buffer,
3713                                                         sizeof (char) * (temp[1] + temp[2] + temp[3] + 1));
3714                 sprintf(world->buffer+temp[1]+temp[2], "%s", failedbuffer);
3715             }
3716             else
3717             {
3718                 temp[2] = 0;
3719                 temp[3] = 0;
3720             }
3721             sprintf(tempstr,"P%li", temp[1]);
3722             //tempstr[0]='P';
3723             MYMPISEND (tempstr, STRSIZE, MPI_CHAR, (MYINT) MASTER, (MYINT) pnum + 1, comm_world);
3724             MYMPISEND (temp, FOUR, MPI_LONG, (MYINT) MASTER, (MYINT) pnum + 1, comm_world);
3725             MYMPISEND (world->buffer, temp[1] + temp[2] + temp[3], MPI_CHAR, (MYINT) MASTER, (MYINT)
3726                       pnum + 1, comm_world);
3727             world->profilewho[profiledone++] = pnum;
3728         }
3729         else
3730         {
3731             done = TRUE;
3732         }
3733     }
3734     myfree(tempstr);
3735     myfree(quantilebuffer);
3736     myfree(failedbuffer);
3737 }
3738 
3739 long
pack_quantile(char ** buffer,quantile_fmt quant,long n)3740 pack_quantile (char **buffer, quantile_fmt quant, long n)
3741 {
3742     long i;
3743     char fp[LONGLINESIZE];
3744     long bufsize = LONGLINESIZE;
3745     *buffer = (char *) myrealloc (*buffer, sizeof (char) * bufsize);
3746     sprintf (*buffer, "QUANTILEBUFFER:\n %s\n", quant.name);
3747     for (i = 0; i < n; i++)
3748     {
3749         bufsize += 1 + sprintf (fp, "%20.20f\n", quant.param[i]);
3750         *buffer = (char *) myrealloc (*buffer, sizeof (char) * bufsize);
3751         strcat (*buffer, fp);
3752     }
3753     bufsize = (long) strlen(*buffer);
3754     return bufsize;
3755 }
3756 
3757 void
unpack_quantile(char * buffer,quantile_fmt quant,long n)3758 unpack_quantile (char *buffer, quantile_fmt quant, long n)
3759 {
3760     long i;
3761     char *input;
3762     char *buf = buffer;
3763     input = (char*) mycalloc(LONGLINESIZE,sizeof(char));
3764     sgets (input, LONGLINESIZE, &buf);
3765     sgets (input, LONGLINESIZE, &buf);
3766     strcpy (quant.name, input);
3767     for (i = 0; i < n; i++)
3768     {
3769         sgets (input, LONGLINESIZE, &buf);
3770         quant.param[i] = atof (input);
3771     }
3772     myfree(input);
3773 }
3774 
3775 ///
3776 /// pack notice of failure of convergence to the profile likelihood precentiles
3777 /// this assume that n is never bigger than LONGLINESIZE, a safe assumption
3778 /// n is the number of grid points in the profile calculation, currently set to 9
3779 /// (May 19 2004), changing this number will cause large ripple effects. but see
3780 /// under profile_max_precentile()
3781 long
pack_failed_percentiles(char ** buffer,boolean * failed,long n)3782 pack_failed_percentiles (char **buffer, boolean *failed, long n)
3783 {
3784     long i;
3785     char fp[LONGLINESIZE];
3786     long bufsize = n + ONE;
3787     *buffer = (char *) myrealloc (*buffer, sizeof (char) * bufsize);
3788     memset(*buffer,0,sizeof(char)*bufsize);
3789     for (i = 0; i < n; i++)
3790         fp[i] =  failed[i] ? '1' : '0' ;
3791     fp[i]='\0';
3792     strcat (*buffer, fp);
3793     return bufsize;
3794 }
3795 
3796 ///
3797 /// unpack notice of failure of convergence to the profile likelihood precentiles
3798 /// this assume that n is never bigger than LONGLINESIZE, a safe assumption
3799 void
unpack_failed_percentiles(char * buffer,boolean * failed,long n)3800 unpack_failed_percentiles (char *buffer, boolean *failed, long n)
3801 {
3802     long i;
3803     char *input;
3804     char *buf = buffer;
3805     input = (char*) mycalloc(LONGLINESIZE,sizeof(char));
3806     sgets (input, LONGLINESIZE, &buf);
3807     //fprintf(stdout,"@%s@\n",input);
3808     for (i = 0; i < n; i++)
3809     {
3810         failed[i] = (input[i] == '1');
3811     //    fprintf(stdout,"@%i\n",(int) failed[i]);
3812     }
3813     myfree(input);
3814 }
3815 
3816 #endif
3817 
3818 /*
3819 // send the data over all loci/replicates to all nodes
3820 // including the master node, so that all nodes can then
3821 // start calculating profiles [see calc_profiles()]
3822 //
3823 void distribute_locidata(world_fmt *world)
3824 {
3825   char *buffer;
3826   pack_loci_data(world, &buffer);
3827   MPI_allgather(buffer);
3828   unpack_loci_data(buffer, world);
3829   myfree(buffer);
3830 }
3831 
3832 void pack_loci_data(world_fmt *world, char **buffer)
3833 {
3834   long replicates = world->options->repl
3835   *buffer = myrealloc(*buffer,LONGLINESIZE);
3836   hits = sscanf (input, "%li %li %li %li %li", &world->loci, &world->numpop, &world->numpop2, &tmp, &replicates);
3837 }
3838 */
3839 
3840 // necessary for analyzing old sumfiles using MPI
3841 //
3842 // master is reusing  mpi_runloci_master()
3843 void
assignloci_worker(world_fmt * world,option_fmt * options,long * Gmax)3844 assignloci_worker (world_fmt * world, option_fmt *options, long * Gmax)
3845 {
3846     boolean done = FALSE;
3847     long locus;
3848     MPI_Status status;
3849     long * twolongs;
3850     char *locusstring;
3851     twolongs = (long *) mycalloc(TWO,sizeof(long));
3852     locusstring = (char *) mycalloc(SMALLBUFSIZE,sizeof(char));
3853 
3854     world->options->progress=FALSE;
3855     options->progress=FALSE;
3856 
3857     while (!done)
3858     {
3859         MYMPIRECV (twolongs, TWO, MPI_LONG, (MYINT) MASTER, (MYINT) MPI_ANY_TAG,
3860 		   comm_world, &status); //from mpi_runloci_master() around line migrate_mpi.c:163
3861         if (status.MPI_TAG != 0) //stop condition
3862         {
3863 	  locus = twolongs[0];
3864 	  sprintf(locusstring,"R%li",locus);
3865 #ifdef DEBUG_MPI
3866 	  printf("%i>>>>>> received locus %li in assignloci_worker{}\n",myID,locus);
3867 	  swap_atl (locus, locidone, world);
3868 	  printf("%i>>>>>> will send locus %li (%s) in assignloci_worker{}\n",myID,locus,locusstring);
3869 #else
3870 	  swap_atl (locus, locidone, world);
3871 #endif
3872 	  MYMPISEND (locusstring, SMALLBUFSIZE, MPI_CHAR, (MYINT) MASTER, (MYINT) locus + 1, comm_world);
3873 	  /* we want to know what locus we worked for
3874 	     - to control the work sent by master
3875 	     - to use in setup_parameter0() [combroyden2.c] */
3876 	  world->who[locidone++] = locus;
3877 	  //
3878 	  if (options->replicate && options->replicatenum > 0)
3879             {
3880 	      world->locus = locus;
3881 	      world->repkind = MULTIPLERUN;
3882 #ifdef LONGSUM
3883 	      change_longsum_times (world);   //multi run replicates
3884 #endif         /*LONGSUM*/
3885 	      if (!options->bayes_infer)
3886 		{
3887 		  (void) estimateParameter (options->replicatenum, *Gmax, world,
3888 					    options, world->cov[locus], 1, 'l',
3889 					    SINGLELOCUS, world->repkind);
3890 		}
3891             }
3892 
3893 	  //
3894         }
3895         else
3896 	  {
3897             done = TRUE;
3898 	    assign_worker_cleanup();
3899 #ifdef DEBUG_MPI
3900 	    fprintf(stdout,"%i> STOP: received stop from %i\n",myID, status.MPI_SOURCE);
3901 #endif
3902 	  }
3903     }
3904     myfree(locusstring);
3905 }
3906 
3907 void
swap_atl(long from,long to,world_fmt * world)3908 swap_atl (long from, long to, world_fmt * world)
3909 {
3910     long r;
3911     timearchive_fmt *tmp;
3912     for (r = 0; r < world->options->replicatenum; r++)
3913     {
3914         tmp = &world->atl[r][to];
3915         world->atl[r][to] = world->atl[r][from];
3916         world->atl[r][from] = *tmp;
3917     }
3918 }
3919 
3920 
3921 #ifdef SLOWNET
3922 void
setup_parameter0_slowmpi(world_fmt * world,nr_fmt * nr,long repkind,long repstart,long repstop,long loci,long kind,boolean multilocus)3923 setup_parameter0_slowmpi (world_fmt * world, nr_fmt * nr, long repkind,
3924                           long repstart, long repstop, long loci, long kind,
3925                           boolean multilocus)
3926 {
3927     long locus, r;
3928     if (myID != MASTER)
3929     {
3930         if (multilocus)
3931         {
3932             for (locus = 0; locus < loci; locus++)
3933             {
3934                 if (repkind == SINGLECHAIN)
3935                 {
3936                     for (r = repstart; r < repstop; r++)
3937                         create_apg0 (nr->apg0[r][locus], nr,
3938                                      &world->atl[r][locus], locus);
3939                 }
3940                 else
3941                 {
3942 //                    if (kind != PROFILE)
3943 //                    {
3944                         for (r = repstart; r < repstop; r++)
3945                             create_apg0 (nr->apg0[r][locus], nr,
3946                                          &world->atl[r][locus], locus);
3947                         interpolate_like (nr, locus);
3948 //                    }
3949 //                    else
3950 //                    {
3951                         for (r = repstart; r < repstop; r++)
3952                             create_multiapg0 (nr->apg0[r][locus], nr, r, locus);
3953 //                    }
3954                 }
3955             }
3956         }
3957         else   //single locus
3958         {
3959             if (repkind == SINGLECHAIN)
3960             {
3961                 for (r = repstart; r < repstop; r++)
3962                     create_apg0 (nr->apg0[r][world->locus], nr,
3963                                  &world->atl[r][world->locus], world->locus);
3964             }
3965             else
3966             {
3967 //                if (kind != PROFILE)
3968 //                {
3969                     for (r = repstart; r < repstop; r++)
3970                         create_apg0 (nr->apg0[r][world->locus], nr,
3971                                      &world->atl[r][world->locus], world->locus);
3972                     interpolate_like (nr, world->locus);
3973 //                }
3974                 for (r = repstart; r < repstop; r++)
3975                     create_multiapg0 (nr->apg0[r][world->locus], nr, r,
3976                                       world->locus);
3977             }
3978         }
3979     }
3980 }
3981 #endif
3982 
3983 void
setup_parameter0_mpi(world_fmt * world,nr_fmt * nr,long repkind,long repstart,long repstop,long loci,long kind,boolean multilocus)3984 setup_parameter0_mpi (world_fmt * world, nr_fmt * nr, long repkind,
3985                       long repstart, long repstop, long loci, long kind,
3986                       boolean multilocus)
3987 {
3988     long locus, r;
3989     long ll;
3990     if (myID != MASTER)
3991     {
3992         if (multilocus)
3993         {
3994             for (ll = 0; ll < locidone; ll++)
3995             {
3996                 locus = world->locus = world->who[ll];
3997                 if (repkind == SINGLECHAIN)
3998                 {
3999                     for (r = repstart; r < repstop; r++)
4000                         create_apg0 (nr->apg0[r][locus], nr,
4001                                      &world->atl[r][locus], locus);
4002                 }
4003                 else
4004                 {
4005 //                    if (kind != PROFILE)
4006 //                    {
4007                         for (r = repstart; r < repstop; r++)
4008                             create_apg0 (nr->apg0[r][locus], nr,
4009                                          &world->atl[r][locus], locus);
4010                         interpolate_like (nr, locus);
4011 //                    }
4012 //                    else
4013 //                    {
4014                         for (r = repstart; r < repstop; r++)
4015                             create_multiapg0 (nr->apg0[r][locus], nr, r, locus);
4016 //                    }
4017                 }
4018             }
4019         }
4020         else   //single locus
4021         {
4022             if (repkind == SINGLECHAIN)
4023             {
4024                 for (r = repstart; r < repstop; r++)
4025                     create_apg0 (nr->apg0[r][world->locus], nr,
4026                                  &world->atl[r][world->locus], world->locus);
4027             }
4028             else
4029             {
4030 //                if (kind != PROFILE)
4031 //                {
4032                     for (r = repstart; r < repstop; r++)
4033                         create_apg0 (nr->apg0[r][world->locus], nr,
4034                                      &world->atl[r][world->locus], world->locus);
4035                     interpolate_like (nr, world->locus);
4036 //                }
4037                 for (r = repstart; r < repstop; r++)
4038                     create_multiapg0 (nr->apg0[r][world->locus], nr, r,
4039                                       world->locus);
4040             }
4041         }
4042     }
4043 }
4044 
4045 ///
4046 /// checks whether a node is already in the mpistack list
in_mpistack(int sender,world_fmt * world)4047 boolean in_mpistack(int sender, world_fmt *world)
4048 {
4049   long i;
4050   for (i=0;i < world->mpistacknum; i++)
4051     {
4052       if((world->mpistack[i]) == sender)
4053 	return TRUE;
4054     }
4055   return FALSE;
4056 }
4057 
handle_replication(int sender,int tag,char * tempstr,world_fmt * world)4058 void handle_replication(int sender,int tag,char *tempstr, world_fmt *world)
4059 {
4060   //long  pos=0;
4061   long i;
4062   long * temp;
4063   int realsender;
4064   long locus;
4065   long replicate;
4066   int replicator;
4067   //  boolean from_locus_sender = (sender <= world->loci);
4068   //  boolean from_replicator = (sender > world-> loci);
4069   sscanf(tempstr+1,"%i%li%li", &realsender, &locus, &replicate);
4070   //if(from_replicator)
4071   //  warning("these guys should not send to here");
4072   temp = (long *) mycalloc(3, sizeof(long));
4073   temp[0] = sender;
4074   temp[1] = locus;
4075   temp[2] = replicate;
4076   if(world->mpistacknum > 0)
4077     {
4078       world->mpistacknum -= 1;
4079       replicator = world->mpistack[world->mpistacknum];
4080       //printf("%i> checking out replicator:[%4li] %i\n",myID, world->mpistacknum,replicator);
4081       MYMPISEND(temp, 3, MPI_LONG, replicator, (MYINT) tag, comm_world);
4082     }
4083   else
4084     {
4085       if(world->mpistack_requestnum>=world->mpistack_request_numalloc)
4086 	{
4087 
4088 	  world->mpistack_request = (mpirequest_fmt *) myrealloc(world->mpistack_request,
4089 					      sizeof(mpirequest_fmt)*(world->mpistack_requestnum+10));
4090 	  for(i=world->mpistack_request_numalloc; i < world->mpistack_requestnum + 10; i++)
4091 	    {
4092 	      	world->mpistack_request[i].tempstr = (char *) mycalloc(SMALLBUFSIZE,sizeof(char));
4093 	    }
4094 	  world->mpistack_request_numalloc = world->mpistack_requestnum + 10;
4095 	}
4096       strcpy(world->mpistack_request[world->mpistack_requestnum].tempstr, tempstr);
4097       world->mpistack_request[world->mpistack_requestnum].sender = sender;
4098       world->mpistack_request[world->mpistack_requestnum].tag = tag;
4099       world->mpistack_requestnum += 1;
4100       //      printf("%i> MASTER: added request from n%i to request-stack position %li\n",myID, sender, world->mpistack_requestnum);
4101     }
4102   myfree(temp);
4103 }
4104 
handle_mdim(float * values,long n,int sender,world_fmt * world)4105 void handle_mdim(float *values,long n, int sender, world_fmt * world)
4106 {
4107     register long i;
4108     register long z;
4109 
4110     int digits;
4111 #ifdef ZNZ
4112     znzFile file = world->bayesmdimfile;
4113 #else
4114     FILE *file = world->bayesmdimfile;
4115 #endif
4116     register char *temp;
4117 
4118     temp = (char *) mycalloc(n*20+LINESIZE,sizeof(char));
4119     z = sprintf(temp,"%li\t%li\t%li\t%f\t%f\t%f\t%f\t%li\t%f",
4120 	   (long) values[0], (long) values[1]+1,(long) values[2]+1,
4121 	    values[3], values[4], values[5], values[6],(long) values[7], values[8]);
4122     for(i=9;i<n; i++)
4123       {
4124 	//fprintf (stdout, "%f (%li) ",values[i], i);
4125 	if(i > (world->numpop2 +  world->bayes->mu))
4126 	  {
4127 	    z += sprintf(temp+z,"\t%f",values[i]);
4128 	    continue;
4129 	  }
4130 	if(values[i] < SMALLEPSILON)
4131 	  {
4132 	    z += sprintf(temp+z,"\t0");
4133 	  }
4134 	else
4135 	  {
4136 	    digits = (long) log10(values[i]);
4137 	    switch(digits)
4138 	      {
4139 	      case -8:
4140 	      case -6:
4141 	      case -5:
4142 		//		fmt = 10;
4143 		z += sprintf(temp + z,"\t%.10f", values[i]);
4144 		break;
4145 	      case -4:
4146 	      case -3:
4147 		//fmt = 8;
4148 		z += sprintf(temp + z,"\t%.8f", values[i]);
4149 		break;
4150 	      case -2:
4151 	      case -1:
4152 		//		fmt= 5;
4153 		z += sprintf(temp + z,"\t%.5f", values[i]);
4154 		break;
4155 	      case 0:
4156 	      case 1:
4157 		//		fmt = 4;
4158 		z += sprintf(temp + z,"\t%.4f", values[i]);
4159 		break;
4160 	      case 2:
4161 		//		fmt = 2;
4162 		z += sprintf(temp + z,"\t%.2f", values[i]);
4163 		break;
4164 	      case 3:
4165 		//		fmt = 1;
4166 		z += sprintf(temp + z,"\t%.1f", values[i]);
4167 		break;
4168 	      case 4:
4169 	      case 5:
4170 	      case 6:
4171 	      case 7:
4172 	      case 8:
4173 		//		fmt = 0;
4174 		z += sprintf(temp + z,"\t%.0f", values[i]);
4175 		break;
4176 	      default:
4177 		if(digits<-8)
4178 		  {
4179 		    //		    fmt=20;
4180 		    z += sprintf(temp + z,"\t%.20f", values[i]);
4181 		  }
4182 		else
4183 		  {
4184 		    //		    fmt = 5;
4185 		    z += sprintf(temp + z,"\t%.5f", values[i]);
4186 		  }
4187 		break;
4188 	      }
4189 	    //	    fprintf(stdout,"\t%f", values[i]);
4190 	  }
4191       }
4192     //    fprintf (stdout, " [%i] \n", myID);
4193 #ifdef ZNZ
4194     znzprintf(file,"%s\n",temp);
4195 #else
4196     fprintf(file,"%s\n",temp);
4197     fflush(file);
4198 #endif
4199     myfree(temp);
4200     //fprintf(stdout,"\n");
4201     // calculate_parallel_convergence(world, values, size);
4202 }
4203 
handle_message(char * rawmessage,int sender,world_fmt * world)4204 void handle_message(char *rawmessage,int sender, world_fmt * world)
4205 {
4206     char *rawptr;
4207     long  pos=0;
4208     void *file = (void *) stdout;
4209     rawptr = rawmessage;
4210     //fprintf(stderr,"%i> handle_message: %s\n",myID, rawmessage);
4211     set_filehandle(rawmessage, world, &file, &pos);
4212     //fprintf(stderr,"%i> handle_message: pos=%li\n",myID,pos);
4213     //fprintf(stderr,"%i> handle_message: %s\n",myID, rawmessage + pos);
4214     fprintf((FILE *) file,"%s", rawptr + pos);
4215     fflush((FILE *) file);
4216 }
4217 
handle_burnin_message(char * rawmessage,int sender,world_fmt * world)4218 void handle_burnin_message(char *rawmessage,int sender, world_fmt * world)
4219 {
4220   static long       z = 0;
4221   static boolean done = FALSE;
4222   static long    *replicates;
4223   long locus;
4224   long step;
4225   MYREAL var;
4226   MYREAL oldvar;
4227   MYREAL ess;
4228   MYREAL acceptance;
4229   if(!done)
4230     {
4231       replicates = mycalloc(world->loci,sizeof(long));
4232       done = TRUE;
4233     }
4234 #ifdef USE_MYREAL_FLOAT
4235   sscanf(rawmessage,"%li%f%f%f%f%li",&locus, &ess, &acceptance, &var, &oldvar, &step);
4236 #else
4237   sscanf(rawmessage,"%li%lf%lf%lf%lf%li",&locus, &ess, &acceptance, &var, &oldvar, &step);
4238 #endif
4239   replicates[locus] += 1;
4240   if(world->options->verbose)
4241     {
4242       fprintf(stdout,"%i> Burn-in on node %i (locus=%li, repl=%li) stopped at step %li with variance-ratio=%.2f/%.2f=%.3f\n       min(ess)=%f and avg(acceptance)=%f", myID, sender, locus, replicates[locus], step, var, oldvar, var/oldvar, ess, acceptance);
4243     }
4244   world->burnin_stops[z].locus        =  locus;
4245   world->burnin_stops[z].replicate =  replicates[locus];
4246   world->burnin_stops[z].stopstep = step;
4247   world->burnin_stops[z].ess       = ess;
4248   world->burnin_stops[z].accept  = acceptance;
4249   world->burnin_stops[z].variance = var;
4250   world->burnin_stops[z].oldvariance = oldvar;
4251   world->burnin_stops[z].worker = sender;
4252   z++;
4253 #ifdef DEBUG
4254   printf("%i> handle burnin_stop: z=%li maxalloc=maxrep*loci\n",myID,z);
4255 #endif
4256 }
4257 
4258 ///
4259 /// sets up a file database so that the master and worker worker end up writing to the same file
4260 /// the workers send the stuff to the master and the master then figures out (using the db)
4261 /// what file pointer the stuff was intended for.
4262 /// needs globals filedb, and filenum
setup_filehandle_db(FILE * file,world_fmt * world,option_fmt * options,data_fmt * data)4263 void setup_filehandle_db(FILE *file, world_fmt *world, option_fmt *options, data_fmt *data)
4264 {
4265     long filehandle = get_filehandle(file, world, options, data);
4266     filedb[filenum].file = file;
4267     filedb[filenum++].handle = filehandle;
4268 #ifdef DEBUG_MPI
4269     fprintf(stdout,"filedb %li: %p %li\n",filenum, file,filehandle);
4270 #endif
4271 }
4272 
retrieve_filehandle(FILE * file)4273 long retrieve_filehandle(FILE *file)
4274 {
4275     long i=0;
4276     long filehandle = 0;
4277     while(filedb[i].file != file && i<filenum)
4278         i++;
4279     if(i!=filenum)
4280         filehandle = filedb[i].handle;
4281     return filehandle;
4282 }
4283 
get_filehandle(void * vfile,world_fmt * world,option_fmt * options,data_fmt * data)4284 long get_filehandle(void *vfile, world_fmt *world, option_fmt *options, data_fmt *data)
4285 {
4286 #ifdef ZNZ
4287   if(((znzFile) vfile) == world->bayesmdimfile)
4288         return BAYESMDIMFILENUM;
4289     FILE * file = (FILE *) vfile;
4290 #else
4291     FILE * file = (FILE *) vfile;
4292     if(file == world->bayesmdimfile)
4293         return BAYESMDIMFILENUM;
4294 #endif
4295     if(file == stdout)
4296         return STDOUTNUM;
4297     if(file == options->logfile)
4298         return LOGFILENUM;
4299     if(file == world->outfile)
4300         return OUTFILENUM;
4301     if(file == options->aicfile)
4302         return AICFILENUM;
4303     if(file == world->mathfile)
4304         return MATHFILENUM;
4305     if(file == world->mighistfile)
4306         return MIGHISTFILENUM;
4307     if(file == world->skylinefile)
4308         return SKYLINEFILENUM;
4309     if(file == world->bayesfile)
4310         return BAYESFILENUM;
4311     if(file == world->pdfoutfile)
4312         return PDFOUTFILENUM;
4313     if(file == world->treefile)
4314         return TREEFILENUM;
4315     return STDOUTNUM;
4316 }
4317 
get_filehandle2(void * vfile,world_fmt * world)4318 long get_filehandle2(void *vfile, world_fmt *world)
4319 {
4320   FILE *file ;
4321 #ifdef ZNZ
4322   if(((znzFile) vfile) == world->bayesmdimfile)
4323         return BAYESMDIMFILENUM;
4324 #else
4325     if(((FILE *) vfile) == world->bayesmdimfile)
4326         return BAYESMDIMFILENUM;
4327 #endif
4328     file = (FILE*) vfile;
4329     if(file == stdout)
4330         return STDOUTNUM;
4331     if(file == world->options->logfile)
4332         return LOGFILENUM;
4333     if(file == world->outfile)
4334         return OUTFILENUM;
4335     if(file == world->mighistfile)
4336         return MIGHISTFILENUM;
4337     if(file == world->skylinefile)
4338         return SKYLINEFILENUM;
4339     if(file == world->options->aicfile)
4340         return AICFILENUM;
4341     if(file == world->mathfile)
4342         return MATHFILENUM;
4343     if(file == world->bayesfile)
4344         return BAYESFILENUM;
4345     if(file == world->treefile)
4346         return TREEFILENUM;
4347  //   fprintf(stdout,"@@@@@@@@@@@@@@@wrong wrong wrong@@@@@@@@@@@@@@@@\n");
4348     return STDOUTNUM;
4349 }
4350 
set_filehandle(char * message,world_fmt * world,void ** file,long * msgstart)4351 void set_filehandle(char *message, world_fmt *world,
4352                     void **file, long *msgstart)
4353 {
4354     char *temp;
4355     long filenum;
4356     long i = 1;
4357     temp = (char *) mycalloc(10,sizeof(char));
4358     if(message[0] == '\0')
4359       {
4360 	warning("%i> set_filehandle() the message was NULL",myID);
4361       }
4362     temp[0] = message[i];
4363     while(temp[i-1]!=':' && i < 9 && message[i]!='\0')
4364       {
4365 #ifdef DEBUG_MPI
4366 	fprintf(stderr,"%i>>>>>> temp     =%s\n",myID,temp);
4367 	fprintf(stderr,"%i>>>>>> temp[%li]=%c\n",myID,i,temp[i]);
4368 	fprintf(stderr,"%i>>>>>> temp     =%s\n",myID,message);
4369 #endif
4370 	i++;
4371 	temp[i-1] = message[i];
4372       }
4373     *msgstart = i+1;
4374     filenum = atol(temp);
4375     //    fprintf(stdout,"\n@@@@@@@@@@@@@@@@@%li@%s@%li@\n",filenum,temp,i);
4376     myfree(temp);
4377     switch(filenum)
4378       {
4379       case STDOUTNUM:
4380 	{
4381 	  //		fprintf(stdout,"\n");
4382 	  *file = stdout;
4383 	  return;
4384 	}
4385       case LOGFILENUM:
4386 	{
4387 	  //	fprintf(stdout," logfile\n");
4388 	  *file = world->options->logfile;
4389 	  return;
4390 	}
4391       case OUTFILENUM:
4392 	{
4393 	  *file = world->outfile;
4394 	  return ;
4395 	}
4396       case AICFILENUM:
4397 	{
4398 	  *file = world->options->aicfile;
4399 	  return ;
4400 	}
4401       case MATHFILENUM:
4402 	{
4403 	  *file = world->mathfile;
4404 	  return ;
4405 	}
4406       case MIGHISTFILENUM:
4407 	{
4408 	  *file = world->mighistfile;
4409 	  return ;
4410 	}
4411       case SKYLINEFILENUM:
4412 	{
4413 	  *file = world->skylinefile;
4414 	  return ;
4415 	}
4416       case BAYESFILENUM:
4417 	{
4418 	  *file = world->bayesfile;
4419 	  return ;
4420 	}
4421       case BAYESMDIMFILENUM:
4422 	{
4423 	  *file = world->bayesmdimfile;
4424 	  return ;
4425 	}
4426       case TREEFILENUM:
4427 	{
4428 	  *file = world->treefile;
4429 	  return ;
4430 	}
4431       case PDFOUTFILENUM:
4432 	{
4433 	  *file = world->pdfoutfile;
4434 	  return ;
4435 	}
4436       }
4437     *file = stdout;
4438     return;
4439 }
4440 
4441 
4442 void
mpi_fprintf(FILE * file,const char * fmt,...)4443 mpi_fprintf(FILE *file, const char *fmt, ...)
4444 {
4445     char *p1 = NULL;
4446     char *p  = NULL;
4447     va_list ap;
4448     long filehandle = 0;
4449     long bufsize = 0;
4450 
4451     long pallocsize = LINESIZE;
4452     p  = (char *) mycalloc(pallocsize,sizeof(char));
4453     p1 = (char *) mycalloc(STRSIZE,sizeof(char));
4454     if(myID != MASTER)
4455     {
4456         filehandle = retrieve_filehandle(file);
4457         bufsize = sprintf(p, "%c%li:",'M',filehandle);
4458     }
4459     va_start(ap, fmt);
4460     bufsize += vsprintf(p+bufsize, fmt, ap);
4461     if(bufsize >= pallocsize)
4462       error("failed in mpi_printf(): problem with buffer size!");
4463     if(myID != MASTER)
4464     {
4465         sprintf(p1,"M%li",bufsize);
4466         MYMPISEND (p1, STRSIZE, MPI_CHAR, (MYINT) MASTER, (MYINT) myID+PRINTTAG, comm_world);
4467         MYMPISEND (p, bufsize, MPI_CHAR, (MYINT) MASTER, (MYINT) myID+PRINTTAG, comm_world);
4468     }
4469     else
4470       {
4471         fprintf(file,"%s", p);
4472       }
4473     va_end(ap);
4474     myfree(p);
4475     myfree(p1);
4476 }
4477 
4478 void
mpi_fprintf2(FILE * file,long filesize,const char * fmt,...)4479 mpi_fprintf2(FILE *file, long filesize, const char *fmt, ...)
4480 {
4481     char *p1;
4482     char *p;
4483     va_list ap;
4484     long filehandle = 0;
4485     long bufsize = 0;
4486 
4487     long pallocsize = filesize+strlen(fmt)+10;//leave room for "M:number"
4488 
4489     p  = (char *) mycalloc(pallocsize,sizeof(char));
4490     p1 = (char *) mycalloc(STRSIZE,sizeof(char));
4491     if(myID!=MASTER)
4492     {
4493         filehandle = retrieve_filehandle(file);
4494         bufsize = sprintf(p, "%c%li:",'M',filehandle);
4495     }
4496     va_start(ap, fmt);
4497     bufsize += vsprintf(p+bufsize, fmt, ap);
4498     if(bufsize>=pallocsize)
4499       {
4500 	warning("Failing because bufsize=%li >= allocsize=%li\n",bufsize,pallocsize);
4501 	error("failed in mpi_printf2()");
4502       }
4503     if(myID!=MASTER)
4504     {
4505         sprintf(p1,"M%li",bufsize);
4506         MYMPISEND (p1, STRSIZE, MPI_CHAR, (MYINT) MASTER, (MYINT) myID+PRINTTAG, comm_world);
4507         MYMPISEND (p, bufsize, MPI_CHAR, (MYINT) MASTER, (MYINT) myID+PRINTTAG, comm_world);
4508     }
4509     else
4510         fprintf(file,"%s", p);
4511     va_end(ap);
4512     myfree(p);
4513     myfree(p1);
4514 }
4515 
4516 ///
4517 /// sends raw bayesian parameters to master, using label 'Z' to match on the master side
4518 #ifdef PARALIO
mpi_mdim_send(MPI_File * file,char * values,long size)4519 void mpi_mdim_send(MPI_File *file, char *values, long size)
4520 #else
4521 void mpi_mdim_send(float *values, long size)
4522 #endif
4523 {
4524 #ifdef PARALIO
4525   MPI_Status status;
4526   char error_string[LINESIZE];
4527   int length_of_error_string, error_class;
4528 #else
4529     char *p1;
4530     p1 = (char *) mycalloc(SMALLBUFSIZE,sizeof(char));
4531     if(myID!=MASTER)
4532     {
4533 #endif /*with paralio the master writes the header using this service*/
4534 #ifdef PARALIO
4535       my_write_error = MPI_File_write(*file,values,size,MPI_CHAR, &status);
4536       if (my_write_error != MPI_SUCCESS)
4537 	{
4538 	  MPI_Error_class(my_write_error, &error_class);
4539 	  MPI_Error_string(error_class, error_string, &length_of_error_string);
4540 	  printf("%i> %s\n", myID, error_string);
4541 	  MPI_Error_string(my_write_error, error_string, &length_of_error_string);
4542 	  printf("%i> %s\n", myID, error_string);
4543 	  my_write_error = TRUE;
4544 	}
4545 #else
4546       sprintf(p1,"Z%li",size);
4547       MYMPISEND (p1, SMALLBUFSIZE, MPI_CHAR, (MYINT) MASTER, (MYINT) myID+PRINTTAG, comm_world);
4548 #ifdef DEBUG_MPI
4549 	fprintf(stdout,"%i> mdimlast=%f\n",myID,values[size-1]);
4550 #endif
4551 	MYMPISEND (values, size, MPI_FLOAT, (MYINT) MASTER, (MYINT) myID+PRINTTAG, comm_world);
4552 #endif
4553 #ifndef PARALIO /* without paralio the master is not allowed here*/
4554     }
4555     else
4556         error("master sends itself bayesallfile stuff");
4557     myfree(p1);
4558 #endif
4559 }
4560 
4561 
4562 ///
4563 /// assembles the data from a replicant (receives materials from mpi_send_replicant()
4564 ///
4565 void
4566 mpi_receive_replicate(int sender, int tag, long locus, long replicate, world_fmt * world)
4567 {
4568     MYREAL *buffer;
4569     long  bufsize=1;
4570     MPI_Status status;
4571     long z;
4572     MYMPIRECV (&bufsize, ONE, MPI_LONG, (MYINT) sender, (MYINT) tag, comm_world, &status);
4573 #ifdef DEBUG_MPI
4574     fprintf(stdout,"%i> WORKER: mpi_receive_replicate received bufsize=%li from sender=%i with tag=%i\n",myID, bufsize, status.MPI_SOURCE, status.MPI_TAG);
4575     sender = status.MPI_SOURCE;
4576     tag = status.MPI_TAG;
4577     fprintf(stdout,"%i> mpi_receive_replicate received bufsize=%li from sender=%i with tag=%i\n",myID, bufsize, sender, tag);
4578 #endif
4579     buffer = (MYREAL *) mycalloc (bufsize, sizeof (MYREAL));
4580     MYMPIRECV (buffer, bufsize, mpisizeof, (MYINT) sender, (MYINT) tag, comm_world, &status);
4581     //fprintf(stdout,"%i> received bufsize is really %li and bufsize=%li\n",myID,(long) (long) strlen(buffer),bufsize);
4582     if(world->options->bayes_infer)
4583     {
4584         unpack_single_bayes_buffer(buffer,world->bayes,world,locus);
4585     }
4586     else
4587     {
4588       z=0;
4589       unpack_single_sumfile_buffer(buffer, world->atl, world, locus, replicate, world->numpop, &z);
4590       //      fprintf(stdout,"%i> replicate=%li locus=%li\n",myID, replicate, locus);
4591     }
4592     if(world->options->mighist)
4593     {
4594       MYMPIRECV (&bufsize, ONE, MPI_LONG, (MYINT) sender, (MYINT) tag, comm_world, &status);
4595 #ifdef DEBUG_MPI
4596       fprintf(stdout,"%i> mpi_receive_replicate received mighistogram bufsize=%li from sender=%i with tag=%i\n",
4597 	      myID, bufsize, status.MPI_SOURCE, status.MPI_TAG);
4598 #endif
4599       buffer = (MYREAL *) myrealloc (buffer, bufsize * sizeof (MYREAL));
4600       MYMPIRECV (buffer, bufsize, mpisizeof, (MYINT) sender, (MYINT) tag, comm_world, &status);
4601       unpack_mighist_replicate_buffer(buffer, world, locus, world->numpop);
4602     }
4603     if(world->options->mighist && world->options->skyline)
4604     {
4605       MYMPIRECV (&bufsize, ONE, MPI_LONG, (MYINT) sender, (MYINT) tag, comm_world, &status);
4606       buffer = (MYREAL *) myrealloc (buffer, bufsize * sizeof (MYREAL));
4607       MYMPIRECV (buffer, bufsize, mpisizeof, (MYINT) sender, (MYINT) tag, comm_world, &status);
4608       unpack_skyline_buffer(buffer, world, locus, -1, world->numpop);
4609     }
4610   // send best tree if available
4611     //  if(world->options->treeprint == BEST && world->options->treeinmemory == TRUE)
4612   if(world->options->treeinmemory == TRUE)
4613     {
4614       MYMPIRECV (&bufsize, ONE, MPI_LONG, (MYINT) sender, (MYINT) tag, comm_world, &status);
4615       buffer = (MYREAL *) myrealloc (buffer, bufsize * sizeof (MYREAL));
4616       //      printf("%i> mpi_receive_buffer() received treespace buffersize: %li %p\n", myID, bufsize,  buffer);
4617       MYMPIRECV (buffer, bufsize, mpisizeof, (MYINT) sender, (MYINT) tag, comm_world, &status);
4618       unpack_treespace_buffer(buffer, world, locus, -1, world->numpop);
4619     }
4620 
4621     // BF material
4622   if(world->options->bayes_infer)
4623     {
4624       MYMPIRECV (&bufsize, ONE, MPI_LONG, (MYINT) sender, (MYINT) tag, comm_world, &status);
4625       buffer = (MYREAL *) myrealloc (buffer, bufsize * sizeof (MYREAL));
4626       MYMPIRECV (buffer, bufsize, mpisizeof, (MYINT) sender, (MYINT) tag, comm_world, &status);
4627       if(!world->data->skiploci[locus])
4628 	{
4629 	  //printf("%i> mpi_receive_buffer() received BF buffersize: %li %p\n", myID, bufsize,  buffer);
4630 	  bufsize = unpack_BF_buffer(buffer, 0, locus, world);
4631 	}
4632       // ESS material
4633       bufsize = unpack_ess_buffer(buffer, bufsize, world);
4634     }
4635 
4636     myfree(buffer);
4637 }
4638 
4639 ///
4640 /// replicant sends data to sub-master
4641 ///
4642 void
4643 mpi_send_replicate(int sender, long locus,  long replicate, world_fmt * world)
4644 {
4645   long    allocbufsize   = 1;
4646   long    bufsize        = 0;
4647   long    numpop         = world->numpop;
4648   long    numpop2        = numpop * numpop;
4649   //long    numpop2plus    = numpop2 + 2 * numpop;
4650   long    npp            = numpop2 + world->bayes->mu * world->loci;
4651   MYREAL  *buffer        = NULL;
4652   //  timearchive_fmt **ta   = world->atl;
4653   buffer = (MYREAL *) mycalloc (allocbufsize, sizeof (MYREAL));
4654 
4655   if(world->options->bayes_infer)
4656     {
4657       bufsize = pack_single_bayes_buffer(&buffer,world->bayes,world,locus);
4658       allocbufsize = bufsize;
4659     }
4660   else
4661     {
4662       bufsize = pack_single_sumfile_buffer(&buffer, 0, &allocbufsize, world, locus, replicate, world->numpop);
4663     }
4664 
4665   MYMPISEND (&bufsize, ONE, MPI_LONG, (MYINT) sender, (MYINT) (locus+1+ REPTAG), comm_world);
4666   MYMPISEND (buffer, bufsize, mpisizeof, (MYINT) sender, (MYINT) (locus+1 + REPTAG), comm_world);
4667   if(world->options->mighist)
4668     {
4669       bufsize = pack_mighist_buffer(&buffer, world, locus, -1, numpop);
4670       allocbufsize = bufsize;
4671       MYMPISEND (&bufsize, ONE, MPI_LONG, (MYINT) sender, (MYINT) (locus+1+ REPTAG), comm_world);
4672       MYMPISEND (buffer, bufsize, mpisizeof, (MYINT) sender, (MYINT) (locus+1 + REPTAG), comm_world);
4673     }
4674 
4675   if(world->options->mighist && world->options->skyline)
4676     {
4677       bufsize = pack_skyline_buffer(&buffer, world, locus, -1, numpop);
4678       allocbufsize = bufsize;
4679       MYMPISEND (&bufsize, ONE, MPI_LONG, (MYINT) sender, (MYINT) (locus+1+ REPTAG), comm_world);
4680       MYMPISEND (buffer, bufsize, mpisizeof, (MYINT) sender, (MYINT) (locus+1 + REPTAG), comm_world);
4681     }
4682   // send best tree if available
4683   if(/*world->options->treeprint == BEST &&*/ world->options->treeinmemory == TRUE)
4684     {
4685       //      printf("%i> send treespace buffer to %li", myID,locus+1+REPTAG);
4686       bufsize = pack_treespace_buffer(&buffer, world, locus, -1, numpop);
4687       allocbufsize = bufsize;
4688       MYMPISEND (&bufsize, ONE, MPI_LONG, (MYINT) sender, (MYINT) (locus+1+ REPTAG), comm_world);
4689       MYMPISEND (buffer, bufsize, mpisizeof, (MYINT) sender, (MYINT) (locus+1 + REPTAG), comm_world);
4690     }
4691 
4692     // BF material
4693   if(world->options->bayes_infer)
4694     {
4695       bufsize = world->options->heated_chains * world->loci + 5 * world->loci + 20 * (npp+1);
4696       buffer = (MYREAL *) myrealloc (buffer, bufsize *  sizeof (MYREAL));
4697       if(!world->data->skiploci[locus])
4698 	{
4699 	  //fprintf(stderr,"%i> REPLICANT: packed result locus=%li replicate %li\n",myID,locus, replicate);
4700 	  bufsize = pack_BF_buffer(&buffer, 0, locus, world);
4701 	}
4702       // ESS material
4703       bufsize = pack_ess_buffer(&buffer, bufsize, world);
4704       // send BF and ESS material
4705       MYMPISEND (&bufsize, ONE, MPI_LONG, (MYINT) sender, (MYINT) (locus+1+ REPTAG), comm_world);
4706       MYMPISEND (buffer, bufsize, mpisizeof, (MYINT) sender, (MYINT) (locus+1 + REPTAG), comm_world);
4707     }
4708   myfree(buffer);
4709 }
4710 
4711 #ifdef MPICHECK
4712 void set_memory_limit(rlim_t softsize,rlim_t maxsize)
4713 {
4714   struct rlimit r;
4715   r.rlim_cur=softsize;
4716   r.rlim_max=maxsize;
4717   setrlimit(RLIMIT_AS, &r);
4718 }
4719 
4720 void check_memory_limit()
4721 {
4722   struct rlimit r;
4723   getrlimit(RLIMIT_AS, &r);
4724   fprintf(stdout, "%i> current memory/data usage: %f\n",myID, (double) r.rlim_cur);
4725 }
4726 #endif
4727 #endif /* MPI */
4728