/* $Id: pcomp_subs2.c $ */

/* copyright (c) 1996, 1997, 1998, 1999, 2014 by William R. Pearson and
   The Rector & Visitors of the University of Virginia */

/* Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing,
   software distributed under this License is distributed on an "AS
   IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
   express or implied.  See the License for the specific language
   governing permissions and limitations under the License.
*/

/* modified to do more initialization of work_info here, rather than
   in main() */

/* this file provides the same functions for PCOMPLIB as pthr_subs2.c does for COMP_THR */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <sys/types.h>
#ifdef UNIX
#include <unistd.h>
#endif
#include <signal.h>

#include "defs.h"
#include "structs.h"		/* mngmsg, libstruct */
#include "param.h"		/* pstruct, thr_str, buf_head, rstruct */
#include "thr_buf_structs.h"

#ifdef MPI_SRC
#include "mpi.h"
#endif

#include "msg.h"
#include "pcomp_bufs.h"
#define XTERNAL
#include "uascii.h"
#undef XTERNAL
#include "pthr_subs.h"
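
/* A quick reference to the MPI message tags as they are used in this
   file (the tag constants are defined in the headers included above;
   workers exchange messages with rank 0, the manager):

   STARTTYPE0..4 -- init_thr(): worker/buffer counts, mngmsg, pstruct,
		    the two pam2[] scoring matrices, pascii[]
   QSEQTYPE0,1   -- init_thr(): query lengths (n0, nm0), query residues
		    (and aa0a annotations, if present)
   MSEQTYPE0     -- worker-ready handshake in init_thr(); buf2_hdr_s
		    header in put_rbuf()/get_wbuf()/rbuf_done()
   MSEQTYPE1..5  -- put_rbuf()/get_wbuf(): buf2_data_s[], seq_record[]'s,
		    and their aa1b residue arrays
   RES_TYPE0,1   -- put_wbuf()/next_work_result(): buf_head with results,
		    buf2_res_s[]
   ALN_TYPE0..3  -- put_wbuf()/next_work_result(): have_ares flag, a_res
		    chain entries, res[], aln_code/ann_code strings
*/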

#ifdef DEBUG
unsigned long adler32(unsigned long, const unsigned char *, unsigned int);
/* used for debugging -- called under DEBUG in put_rbuf() */
int check_seq_range(unsigned char *aa1b, int n1, int nsq, char *);
#endif

static int next_worker_idx, num_workers_idle;
extern int g_worker;

/* start the workers, nworkers == number of workers, not nodes */
void
init_thr(int nworkers, char *info_lib_range_p, const struct mngmsg *m_msp, struct pstruct *ppst,
	 unsigned char *aa0, struct mng_thr *m_bufi_p)
{
#ifdef MPI_SRC
  MPI_Status mpi_status;
  int int_msg_b[4];	/* general purpose buffer for integers */
#endif
  int node, snode;

  /* start the worker processes */

  if (work_q == NULL) {
    if ((work_q=(int *)calloc(nworkers, sizeof(int)))==NULL) {
      fprintf(stderr, " cannot allocate work_q[%d] structure\n",
	      nworkers);
      exit(1);
    }
    else {max_worker_q = nworkers;}
  }
  num_workers_idle = 0;

  /* setup thread buffer info */
  if (aa0 == NULL) {	/* no query -- send zeroed buffer info */
    int_msg_b[0] = int_msg_b[1] = int_msg_b[2] = int_msg_b[3] = 0;
  }
  else {
    int_msg_b[0] = nworkers;
    int_msg_b[2] = m_bufi_p->max_chain_seqs;
    int_msg_b[3] = m_bufi_p->seq_buf_size;
  }

  /* send thread info */
  for (node=FIRSTNODE; node < nworkers+FIRSTNODE; node++) {

    MPI_Send(int_msg_b, 4, MPI_INT, node, STARTTYPE0, MPI_COMM_WORLD);

    if (aa0 == NULL) { continue;}

    /* send mngmsg */
    MPI_Send((void *)m_msp, sizeof(struct mngmsg), MPI_BYTE, node,
	     STARTTYPE1, MPI_COMM_WORLD);

    MPI_Send(ppst, sizeof(struct pstruct), MPI_BYTE, node,
	     STARTTYPE2, MPI_COMM_WORLD);

    /* send the rest of the pieces of pam[2] */
    MPI_Send(&ppst->pam2[0][0][0],m_msp->pamd1*m_msp->pamd2,MPI_INT,node,STARTTYPE3,
	     MPI_COMM_WORLD);
    MPI_Send(&ppst->pam2[1][0][0],m_msp->pamd1*m_msp->pamd2,MPI_INT,node,STARTTYPE3,
	     MPI_COMM_WORLD);

    /* send pascii (only for fasty/tfasty) */
    MPI_Send(pascii, sizeof(aascii), MPI_BYTE, node, STARTTYPE4, MPI_COMM_WORLD);
  }

  if (aa0 == NULL) {
    /* all done */
    free(work_q);
    return;
  }

  /* wait for returned status results */
  while (num_workers_idle < max_worker_q) {
    MPI_Recv(&node, 1, MPI_INT, MPI_ANY_SOURCE, MSEQTYPE0,
	     MPI_COMM_WORLD, &mpi_status);
    snode = mpi_status.MPI_SOURCE;
    if (snode == FIRSTNODE) {
      MPI_Recv(info_lib_range_p, MAX_FN, MPI_BYTE, snode, MSEQTYPE0,
	       MPI_COMM_WORLD, &mpi_status);
    }

    if (snode != node) {
      fprintf(stderr, " initial node mismatch [%d!=%d]\n",node, snode);
    }
    worker_buf[snode-FIRSTNODE]->hdr.have_data = 0;
    worker_buf[snode-FIRSTNODE]->hdr.have_results = 0;
    worker_buf[snode-FIRSTNODE]->hdr.worker_idx = snode;
    work_q[num_workers_idle++] = snode;
  }
  next_worker_idx = 0;

  /* send query sequence info to workers */
  for (node=FIRSTNODE; node < nworkers+FIRSTNODE; node++) {
    /* send thread buffer info */
    int_msg_b[0] = m_msp->n0;
    int_msg_b[1] = m_msp->nm0;
    MPI_Send(int_msg_b, 2, MPI_INT, node, QSEQTYPE0, MPI_COMM_WORLD);
    MPI_Send(aa0, m_msp->n0+1, MPI_BYTE, node, QSEQTYPE1, MPI_COMM_WORLD);
    if (m_msp->ann_flg && m_msp->aa0a) {
      MPI_Send(m_msp->aa0a, m_msp->n0+2, MPI_BYTE, node, QSEQTYPE1, MPI_COMM_WORLD);
    }
  }
}

/* get_rbuf() provides the main program with buffers that will be
   filled with library sequences. max_worker_q buffers are available,
   with each buffer tied to a worker.

   As the main program runs, it calls get_rbuf() to get a worker
   buffer (reader buffers are not used with PCOMPLIB), fills it with
   sequences, and sends it to a worker with put_rbuf().

   At the same time, each worker calls get_wbuf() to get a filled
   worker buffer sent by put_rbuf(), takes the sequences from the
   buffer, does the comparisons, and sends the results back to the
   manager by calling put_wbuf().
*/
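
/* An illustrative sketch (not compiled) of how the manager side is
   expected to use these routines, under the assumption that the caller
   fills cur_buf->hdr and cur_buf->buf2_data[] itself:

	struct buf_head *cur_buf;

	while (more library sequences) {
	  get_rbuf(&cur_buf, max_work_buf);	// wait for an idle worker buffer
	  ... fill cur_buf->buf2_data[], cur_buf->hdr.buf2_cnt, etc. ...
	  put_rbuf(cur_buf, max_work_buf);	// send the buffer to its worker
	}
	wait_rbuf(max_work_buf);		// collect the remaining results

   Results come back through next_work_result(), which get_rbuf() and
   wait_rbuf() call internally.
*/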

/* wait for results from any worker */
struct buf_head *
next_work_result(int *snode) {
  int this_node, buf2_cnt;
  int int_msg_b[4];	/* general purpose int buffer */
  int i;
  struct buf2_hdr_s buf2_head;
  struct buf_head *this_buf_p, tmp_buf_head;
  struct seq_record *seq_b_save;
  struct mseq_record *mseq_b_save;
  unsigned char *aa1b_start_save;
  struct a_res_str *new_ares_p, *prev_ares_p;
#ifdef MPI_SRC
  MPI_Status mpi_status;
#endif

  /* wait for a returned result */
  MPI_Recv(&tmp_buf_head, sizeof(struct buf_head), MPI_BYTE, MPI_ANY_SOURCE, RES_TYPE0,
	   MPI_COMM_WORLD, &mpi_status);
  this_node = mpi_status.MPI_SOURCE;
  buf2_cnt = tmp_buf_head.hdr.buf2_cnt;

#ifdef DEBUG
  /*
  fprintf(stderr," %d: %d results\n", this_node, buf2_cnt);
  */
#endif

  this_buf_p = worker_buf[this_node-FIRSTNODE];
  /* move things selectively to avoid over-writing pointers to res, a_res arrays */

  aa1b_start_save = this_buf_p->hdr.aa1b_start;
  seq_b_save = this_buf_p->hdr.seq_b;
  mseq_b_save = this_buf_p->hdr.mseq_b;

  memcpy(&this_buf_p->hdr,&tmp_buf_head.hdr,sizeof(struct buf2_hdr_s));

  this_buf_p->hdr.aa1b_start = aa1b_start_save;
  this_buf_p->hdr.seq_b = seq_b_save;
  this_buf_p->hdr.mseq_b = mseq_b_save;

  memcpy(&this_buf_p->s_cnt_info,&tmp_buf_head.s_cnt_info,sizeof(struct score_count_s));

  if (this_buf_p->hdr.have_results) {
    if (this_buf_p->hdr.buf2_type & (BUF2_DOWORK + BUF2_DOSHUF + BUF2_DOOPT)) {
      MPI_Recv(this_buf_p->buf2_res, sizeof(struct buf2_res_s)*buf2_cnt,
	       MPI_BYTE, this_node, RES_TYPE1, MPI_COMM_WORLD, &mpi_status);
      /*
      for (i=0; i < buf2_cnt; i++) {
	if (this_buf_p->buf2_res[i].rst.score[2] > 200) {
	  fprintf(stderr, "HS[%d:%d,%d]: %d (%d:%d)\n",i,this_node, tmp_buf_head.hdr.worker_idx, this_buf_p->buf2_res[i].rst.score[2],
		  this_buf_p->buf2_data[i].seq->index,this_buf_p->buf2_data[i].seq->n1);
	}
      }
      */
    }

    if (this_buf_p->hdr.buf2_type & BUF2_DOALIGN) {
      /* (1) get a message that has "have_ares"
	 (2) allocate space for each a_res and receive it individually
	 (3) reset the ->next pointers for the a_res chain
      */

      for (i = 0; i < buf2_cnt; i++) {
	MPI_Recv(int_msg_b, 1, MPI_INT, this_node, ALN_TYPE0, MPI_COMM_WORLD, &mpi_status);
	this_buf_p->buf2_ares[i].have_ares = int_msg_b[0];
	this_buf_p->buf2_ares[i].a_res = NULL;	/* pre-initialize */

	if (this_buf_p->buf2_ares[i].have_ares) {
	  /* allocate space to receive it */
	  if ((new_ares_p = (struct a_res_str *)calloc(1,sizeof(struct a_res_str)))==NULL) {
	    fprintf(stderr, "cannot allocate a_res from %d\n",this_node);
	    exit(1);
	  }
	  /* save the head of the ares_chain */
	  this_buf_p->buf2_ares[i].a_res = new_ares_p;

	  /* get the first a_res */
	  MPI_Recv(new_ares_p, sizeof(struct a_res_str), MPI_BYTE, this_node,
		   ALN_TYPE1, MPI_COMM_WORLD, &mpi_status);
	  /* get the associated res[nres] */
	  if ((new_ares_p->res = (int *)calloc(new_ares_p->nres,sizeof(int)))==NULL) {
	    fprintf(stderr, "cannot allocate res for a_res from %d\n",this_node);
	    exit(1);
	  }
	  MPI_Recv(new_ares_p->res, new_ares_p->nres, MPI_INT, this_node,
		   ALN_TYPE2, MPI_COMM_WORLD, &mpi_status);

	  /* now get alignment encodings if available */
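	  /* the aln_code/ann_code pointer values received from the worker
	     only indicate that an encoding exists in the worker's address
	     space; they are replaced below with locally allocated copies */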
	  if (new_ares_p->aln_code) {
	    if ((new_ares_p->aln_code = (char *)calloc(new_ares_p->aln_code_n+1,sizeof(char)))==NULL) {
	      fprintf(stderr, "cannot allocate aln_code for a_res from %d\n",this_node);
	      exit(1);
	    }
	    MPI_Recv(new_ares_p->aln_code, new_ares_p->aln_code_n+1, MPI_BYTE, this_node,
		     ALN_TYPE3, MPI_COMM_WORLD, &mpi_status);
	  }
	  if (new_ares_p->ann_code) {
	    if ((new_ares_p->ann_code = (char *)calloc(new_ares_p->ann_code_n+1,sizeof(char)))==NULL) {
	      fprintf(stderr, "cannot allocate ann_code for a_res from %d\n",this_node);
	      exit(1);
	    }
	    MPI_Recv(new_ares_p->ann_code, new_ares_p->ann_code_n+1, MPI_BYTE, this_node,
		     ALN_TYPE3, MPI_COMM_WORLD, &mpi_status);
	  }

	  while (new_ares_p->next) {	/* while the chain continues */
	    prev_ares_p = new_ares_p;	/* save pointer to previous a_res to fix prev_ares->next */
	    if ((new_ares_p = (struct a_res_str *)calloc(1,sizeof(struct a_res_str)))==NULL) {
	      fprintf(stderr, "cannot allocate a_res from %d\n",this_node);
	      exit(1);
	    }
	    prev_ares_p->next = new_ares_p;
	    MPI_Recv(new_ares_p, sizeof(struct a_res_str), MPI_BYTE, this_node,
		     ALN_TYPE1, MPI_COMM_WORLD, &mpi_status);
	    if ((new_ares_p->res = (int *)calloc(new_ares_p->nres,sizeof(int)))==NULL) {
	      fprintf(stderr, "cannot allocate res for a_res from %d\n",this_node);
	      exit(1);
	    }
	    MPI_Recv(new_ares_p->res, new_ares_p->nres, MPI_INT, this_node,
		     ALN_TYPE2, MPI_COMM_WORLD, &mpi_status);
	    /* now get alignment encodings if available */
	    if (new_ares_p->aln_code) {
	      if ((new_ares_p->aln_code = (char *)calloc(new_ares_p->aln_code_n+1,sizeof(char)))==NULL) {
		fprintf(stderr, "cannot allocate aln_code for a_res from %d\n",this_node);
		exit(1);
	      }
	      MPI_Recv(new_ares_p->aln_code, new_ares_p->aln_code_n+1, MPI_BYTE, this_node,
		       ALN_TYPE3, MPI_COMM_WORLD, &mpi_status);
	    }
	    if (new_ares_p->ann_code) {
	      if ((new_ares_p->ann_code = (char *)calloc(new_ares_p->ann_code_n+1,sizeof(char)))==NULL) {
		fprintf(stderr, "cannot allocate ann_code for a_res from %d\n",this_node);
		exit(1);
	      }
	      MPI_Recv(new_ares_p->ann_code, new_ares_p->ann_code_n+1, MPI_BYTE, this_node,
		       ALN_TYPE3, MPI_COMM_WORLD, &mpi_status);
	    }
	  }	/* finished with the ares_chain */
	} /* done with have_ares */
	else {
#ifdef DEBUG
	  fprintf(stderr, " getting alignment with no have_ares[%d]: %d/%d\n",
		  this_buf_p->hdr.worker_idx,i,this_buf_p->buf2_ares[i].best_idx);
#endif
	}
      }	/* done with buf2_ares[buf2_cnt] */
    } /* done with BUF2_DOALIGN */
  } /* done with have_results */
  *snode = this_node;
  return this_buf_p;
}

/* wait until a worker/buffer is available */
void get_rbuf(struct buf_head **cur_buf, int max_work_buf)
{
  int node, snode;
  int i_msg_b[2], nresults;
  struct buf_head *this_buf_p;

#ifdef MPI_SRC
  MPI_Status mpi_status;
#endif

  if (num_workers_idle == 0) {
    this_buf_p = next_work_result(&snode);

    work_q[next_worker_idx] = snode;
    num_workers_idle++;
  }
  else {
    this_buf_p = worker_buf[work_q[next_worker_idx]-FIRSTNODE];
  }

  *cur_buf = this_buf_p;

  /* update worker queue */
  next_worker_idx = (next_worker_idx+1)%(max_work_buf);
}

/* put_rbuf() takes a buffer filled with sequences to be compared and
   sends it to a worker */

void put_rbuf(struct buf_head *cur_buf, int max_work_buf)
{
#ifdef MPI_SRC
  MPI_Status mpi_status;
#endif
  struct seq_record *cur_seq_p, *tmp_seq_p;
  int i, j, snode, buf2_cnt, seqr_cnt;
  int cur_aa1b_size, max_aa1b_size;

  /* do not send msg if no data */
  if (!cur_buf->hdr.have_data || !(cur_buf->hdr.buf2_cnt > 0)) {return;}

  /* here, since we have a buffer, we have a worker, just send the info */
  snode = cur_buf->hdr.worker_idx;
  buf2_cnt = cur_buf->hdr.buf2_cnt;
  seqr_cnt = cur_buf->hdr.seqr_cnt;
  max_aa1b_size = cur_buf->hdr.aa1b_size;

#ifdef DEBUG
  /*   fprintf(stderr," sending %d/%d seqs to %d\n", buf2_cnt, seqr_cnt, snode); */
#endif
  /* send header */
  MPI_Send(&cur_buf->hdr, sizeof(struct buf2_hdr_s), MPI_BYTE, snode,
	   MSEQTYPE0, MPI_COMM_WORLD);

  /* send data */
  MPI_Send(cur_buf->buf2_data, sizeof(struct buf2_data_s)*buf2_cnt,
	   MPI_BYTE, snode, MSEQTYPE1, MPI_COMM_WORLD);

  /* before sending sequence records, we need to check to see if we
     need to transfer to a continuous location (or send lots of short
     records) */

#ifdef DEBUG
  cur_aa1b_size = 0;
  for (i=0; i < buf2_cnt; i++) {
    cur_seq_p = cur_buf->buf2_data[i].seq;
    if (!cur_buf->buf2_data[i].seq_dup) {
      cur_aa1b_size += cur_seq_p->n1+1;
    }
    if (check_seq_range(cur_seq_p->aa1b, cur_seq_p->n1, 50, "put_rbuf()")) {
      fprintf(stderr, "[put_rbuf] range error at: %d\n", i);
    }
  }

  if (cur_aa1b_size != cur_buf->hdr.aa1b_used) {
    fprintf(stderr,"[put_rbuf:%d] aa1b_used size mismatch: %d != %d\n",
	    snode, cur_aa1b_size, cur_buf->hdr.aa1b_used);
  }
#endif

  if (cur_buf->hdr.seq_record_continuous) {
    /* send sequence records associated with data in one message */
    MPI_Send(cur_buf->hdr.seq_b, sizeof(struct seq_record)*seqr_cnt,
	     MPI_BYTE, snode, MSEQTYPE2, MPI_COMM_WORLD);
    MPI_Send(cur_buf->hdr.aa1b_start, cur_buf->hdr.aa1b_used+1,
	     MPI_BYTE, snode, MSEQTYPE3, MPI_COMM_WORLD);
  }
  else {
    /* send individual sequence records */
    cur_aa1b_size = 0;
    for (i=0; i < buf2_cnt; i++) {
      cur_seq_p = cur_buf->buf2_data[i].seq;
      if (!cur_buf->buf2_data[i].seq_dup) {	/* don't send the sequence if it's a duplicate */
	MPI_Send(cur_seq_p, sizeof(struct seq_record),
		 MPI_BYTE, snode, MSEQTYPE4, MPI_COMM_WORLD);
	MPI_Send(cur_seq_p->aa1b, cur_seq_p->n1+1,
		 MPI_BYTE, snode, MSEQTYPE5, MPI_COMM_WORLD);
      }
    }
  }

  /* reduce the number of idle workers */
  num_workers_idle--;
}

/* wait_rbuf() -- wait for the worker threads to finish with the
   current sequence buffers.
*/
void wait_rbuf(int used_reader_bufs) {
  int snode;

  while (num_workers_idle < max_worker_q) {
    next_work_result(&snode);
    num_workers_idle++;
  }

  /* all workers are idle, re-initialize work_q */
  for (snode = 0; snode < max_worker_q; snode++) {
    work_q[snode] = snode + FIRSTNODE;
  }
}

void rbuf_done(int nthreads)
{
#ifdef MPI_SRC
  MPI_Status mpi_status;
#endif
  int status, i;

  /* use a dummy buf2_hdr_s to send buf2_cnt=0, stop_work=1 */
  struct buf2_hdr_s tmp_buf2_hdr;

  tmp_buf2_hdr.stop_work = 1;
  tmp_buf2_hdr.buf2_cnt = 0;

  /* send a message to all the workers waiting in get_wbuf(), telling
     them to quit
   */

  for (i=FIRSTNODE; i < nthreads+FIRSTNODE; i++) {
    MPI_Send(&tmp_buf2_hdr, sizeof(struct buf2_hdr_s), MPI_BYTE, i,
	     MSEQTYPE0, MPI_COMM_WORLD);
  }
}

/* get_wbuf() -- called in workers
   get a buffer full of sequences to be compared from the main program

   the MPI_Recv()'s here must match the MPI_Send()'s in put_rbuf(),
   message for message

   In the PCOMPLIB version, there is no queue of buffers to be read,
   but we must have space to put the messages in as we receive them,
   and we must fix the pointers in the seq_records
 */
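
/* An illustrative sketch (not compiled) of the worker-side loop that is
   expected to pair with the manager-side calls above -- the comparison
   step itself is an assumption, not taken from this file:

	struct buf_head *cur_buf = ...;		// this worker's buffer

	while (get_wbuf(&cur_buf, max_work_buf)) {
	  ... compare the query against cur_buf->buf2_data[] ...
	  put_wbuf(cur_buf, max_work_buf);	// return scores/alignments to rank 0
	}
	// get_wbuf() returns 0 when rbuf_done() sends stop_work/buf2_cnt==0
*/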

int get_wbuf(struct buf_head **cur_buf, int max_work_buf)
{
#ifdef MPI_SRC
  MPI_Status mpi_status;
#endif

  /* we need to preserve some sequence pointer information so it is not
     over-written by the messages */

  struct seq_record *seq_base, *cur_seq_p, *prev_seq_p, *old_host_seq_p, *host_seq_p;
  struct buf2_data_s *cur_buf2_dp;
  unsigned char *aa1b_start_save, *old_aa1b_start, *cur_aa1b;
  unsigned char *host_aa1b, *old_host_aa1b;
  int buf2_cnt, i, j, cur_n1, seqr_cnt;
  int max_aa1b_size, aa1b_size_save;
  int cur_aa1b_size;
  int snode;

  snode = (*cur_buf)->hdr.worker_idx;
  seq_base = (*cur_buf)->hdr.seq_b;
  aa1b_start_save = (*cur_buf)->hdr.aa1b_start;
  max_aa1b_size = aa1b_size_save = (*cur_buf)->hdr.aa1b_size;

  /* put invalid bytes in aa1b to check for transmission errors */
  memset(aa1b_start_save, 127, aa1b_size_save);

  MPI_Recv(&(*cur_buf)->hdr, sizeof(struct buf2_hdr_s), MPI_BYTE, 0,
	   MSEQTYPE0, MPI_COMM_WORLD, &mpi_status);

  buf2_cnt = (*cur_buf)->hdr.buf2_cnt;
  seqr_cnt = (*cur_buf)->hdr.seqr_cnt;

  if (buf2_cnt <= 0 || (*cur_buf)->hdr.stop_work) { return 0; }

  /* get the buf2_data array, which has seq_dup and ->seq records */
  MPI_Recv((*cur_buf)->buf2_data, sizeof(struct buf2_data_s)*buf2_cnt,
	   MPI_BYTE, 0, MSEQTYPE1, MPI_COMM_WORLD, &mpi_status);

#ifdef DEBUG
  /*   fprintf(stderr,"[%d/get_wbuf] receiving %d/%d sequences\n",snode, buf2_cnt, seqr_cnt); */
#endif

  /* get seq_records (but not mseq_records, don't need them) */
  if ((*cur_buf)->hdr.seq_record_continuous) {
    MPI_Recv(seq_base, sizeof(struct seq_record)*seqr_cnt,
	     MPI_BYTE, 0, MSEQTYPE2, MPI_COMM_WORLD, &mpi_status);

    /* now get the sequence data */
    MPI_Recv(aa1b_start_save, (*cur_buf)->hdr.aa1b_used+1,
	     MPI_BYTE, 0, MSEQTYPE3, MPI_COMM_WORLD, &mpi_status);

    /* map the seq records back into buf2_data */
    /* must check for duplicate sequence records, initialize buf2_data[i]->seq
       AND seq.aa1b in the same pass */

    cur_buf2_dp = (*cur_buf)->buf2_data;
    cur_seq_p = prev_seq_p = seq_base;

    cur_aa1b = aa1b_start_save;
    cur_aa1b_size = 0;

    for (i=0; i < buf2_cnt; i++, cur_buf2_dp++) {
      if (!cur_buf2_dp->seq_dup) {	/* not a duplicate */
	cur_seq_p->aa1b = cur_aa1b;
	cur_aa1b += cur_seq_p->n1 + 1;
	cur_aa1b_size += cur_seq_p->n1 + 1;
	prev_seq_p = cur_seq_p;		/* remember the last non-duplicate record */
	cur_buf2_dp->seq = cur_seq_p++;
      }
      else {		/* duplicate -- point to the previous non-duplicate record */
	cur_buf2_dp->seq = prev_seq_p;
      }
    }

    if (cur_aa1b_size != (*cur_buf)->hdr.aa1b_used) {
      fprintf(stderr, "[%d] incorrect cur_aa1b_size: %d != %d\n",
	      snode, cur_aa1b_size, (*cur_buf)->hdr.aa1b_used);
    }
  }
  else { /* not continuous, get seq_records one at a time */
    cur_seq_p = seq_base;
    cur_aa1b = aa1b_start_save;
    cur_buf2_dp = (*cur_buf)->buf2_data;
    cur_aa1b_size = 0;
    for (i=0; i < buf2_cnt; i++) {
      /* get a seq record */
      if (!(*cur_buf)->buf2_data[i].seq_dup) {	/* not a duplicate, so get it */
	MPI_Recv(cur_seq_p, sizeof(struct seq_record),
		 MPI_BYTE, 0, MSEQTYPE4, MPI_COMM_WORLD, &mpi_status);
	/* get the sequence itself */
	prev_seq_p = cur_seq_p;
	cur_n1 = cur_seq_p->n1;
	cur_aa1b_size += cur_n1+1;
	if (cur_aa1b_size >= max_aa1b_size) {
	  fprintf(stderr,"[get_wbuf:%d] -- receive buffer too small %d > %d\n",
		  (*cur_buf)->hdr.worker_idx, cur_aa1b_size, max_aa1b_size);
	  exit(1);
	}

	MPI_Recv(cur_aa1b, cur_n1+1, MPI_BYTE, 0, MSEQTYPE5, MPI_COMM_WORLD, &mpi_status);
	cur_seq_p->aa1b = cur_aa1b;
#ifdef DEBUG
	if (cur_seq_p->adler32_crc != adler32(1L,cur_aa1b,cur_n1)) {
	  fprintf(stderr," [get_wbuf:%d] -- adler32 mismatch; n1: %d\n",
		  (*cur_buf)->hdr.worker_idx, cur_n1);
	}
#endif

	cur_buf2_dp->seq = cur_seq_p++;
	cur_aa1b += cur_n1+1;
      }
      else {	/* it's a duplicate, so point to the original version */
	cur_buf2_dp->seq = prev_seq_p;
      }
      cur_buf2_dp++;
    }
  }

  /* restore the seq_b, aa1b_start that were over-written */
  (*cur_buf)->hdr.seq_b = seq_base;
  (*cur_buf)->hdr.aa1b_start = aa1b_start_save;
  (*cur_buf)->hdr.aa1b_size = aa1b_size_save;

  /*
  for (i=0; i < buf2_cnt; i++) {
    cur_seq_p = (*cur_buf)->buf2_data[i].seq;
    if (check_seq_range(cur_seq_p->aa1b, cur_seq_p->n1, 50, "get_wbuf()")) {
      fprintf(stderr, "[%d] (get_wbuf) range error at: %d/%d (seqr_cnt: %d)\n",
	      (*cur_buf)->hdr.worker_idx, i, buf2_cnt, seqr_cnt);
    }
  }
  */

  return 1;
}

/* put_wbuf() -- called in workers

   In the PCOMPLIB version, there is no queue of buffers to be read,
   so just send the buffer to the manager
 */
void put_wbuf(struct buf_head *cur_buf, int max_work_buf)
{
  int int_msg_b[4], i;
  struct buf2_ares_s *buf2_ares_p;
  struct a_res_str *cur_ares_p, *next_ares_p;
#ifdef MPI_SRC
  MPI_Status mpi_status;
#endif

  MPI_Send(&cur_buf->hdr, sizeof(struct buf_head), MPI_BYTE, 0,
	   RES_TYPE0, MPI_COMM_WORLD);

  if (!cur_buf->hdr.have_results) { return;}

  /* have buf2_res type results */
  if (cur_buf->hdr.buf2_type & (BUF2_DOWORK + BUF2_DOSHUF + BUF2_DOOPT)) {
    MPI_Send(cur_buf->buf2_res, sizeof(struct buf2_res_s)*cur_buf->hdr.buf2_cnt, MPI_BYTE, 0,
	     RES_TYPE1, MPI_COMM_WORLD);
  }

  /* have buf2_ares type results */
  if (cur_buf->hdr.buf2_type & BUF2_DOALIGN) {
    /* buf2_ares does not have much useful information, except have_ares and a chain of *a_res pointers.
       so we need to:
       (1) send have_ares
       (2) send each part of the a_res chain individually
    */
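    /* the matching receives for ALN_TYPE0..3 are in next_work_result(), above */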

    buf2_ares_p = cur_buf->buf2_ares;
    for (i=0; i < cur_buf->hdr.buf2_cnt; i++) {
      int_msg_b[0] = buf2_ares_p->have_ares;
      MPI_Send(int_msg_b, 1, MPI_INT, 0, ALN_TYPE0, MPI_COMM_WORLD);
      if (buf2_ares_p->have_ares) {
	/* (a) send each a_res in the chain, with its res[], aln_code, and ann_code */
	for (cur_ares_p = buf2_ares_p->a_res; cur_ares_p; cur_ares_p = cur_ares_p->next) {
	  MPI_Send(cur_ares_p, sizeof(struct a_res_str), MPI_BYTE, 0, ALN_TYPE1, MPI_COMM_WORLD);
	  MPI_Send(cur_ares_p->res, cur_ares_p->nres, MPI_INT, 0, ALN_TYPE2, MPI_COMM_WORLD);
	  if (cur_ares_p->aln_code) {
	    MPI_Send(cur_ares_p->aln_code, cur_ares_p->aln_code_n+1, MPI_BYTE, 0, ALN_TYPE3, MPI_COMM_WORLD);
	  }
	  if (cur_ares_p->ann_code) {
	    MPI_Send(cur_ares_p->ann_code, cur_ares_p->ann_code_n+1, MPI_BYTE, 0, ALN_TYPE3, MPI_COMM_WORLD);
	  }
	} /* done with a_res chain */

	/* (b) free the chain */
	cur_ares_p = buf2_ares_p->a_res;
	while (cur_ares_p) {
	  if (cur_ares_p->aln_code) free(cur_ares_p->aln_code);
	  if (cur_ares_p->ann_code) free(cur_ares_p->ann_code);
	  if ((buf2_ares_p->have_ares & 0x1) && cur_ares_p->res) free(cur_ares_p->res);
	  next_ares_p = cur_ares_p->next;
	  free(cur_ares_p);
	  cur_ares_p = next_ares_p;
	}
	buf2_ares_p->a_res = NULL;
      } /* done with have_ares */
      buf2_ares_p->have_ares = 0;	/* must be zeroed out for later use */
      buf2_ares_p++;
    } /* done with buf2_ares[buf2_cnt] */
  } /* done with BUF2_DOALIGN */
} /* done with put_wbuf() */