1 /* $Id: pcomp_subs2.c $ */
2
3 /* copyright (c) 1996, 1997, 1998, 1999, 2014 by William R. Pearson and
4 The Rector & Visitors of the University of Virginia */
5
6 /* Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12 Unless required by applicable law or agreed to in writing,
13 software distributed under this License is distributed on an "AS
14 IS" BASIS, WITHOUT WRRANTIES OR CONDITIONS OF ANY KIND, either
15 express or implied. See the License for the specific language
16 governing permissions and limitations under the License.
17 */
18
19 /* modified to do more initialization of work_info here, rather than
20 in main() */
21
22 /* this file provides the same functions for PCOMPLIB as pthr_subs2.c does for COMP_THR */
23
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <string.h>
27 #include <time.h>
28 #include <sys/types.h>
29 #ifdef UNIX
30 #include <unistd.h>
31 #endif
32 #include <signal.h>
33
34 #include "defs.h"
35 #include "structs.h" /* mngmsg, libstruct */
36 #include "param.h" /* pstruct, thr_str, buf_head, rstruct */
37 #include "thr_buf_structs.h"
38
39 #ifdef MPI_SRC
40 #include "mpi.h"
41 #endif
42
43 #include "msg.h"
44 #include "pcomp_bufs.h"
45 #define XTERNAL
46 #include "uascii.h"
47 #undef XTERNAL
48 #include "pthr_subs.h"
49
50 #ifdef DEBUG
51 unsigned long adler32(unsigned long, const unsigned char *, unsigned int);
52 #endif
53
54 static int next_worker_idx, num_workers_idle;
55 extern int g_worker;
56
57 /* used for debugging */
58 /*
59 int check_seq_range(unsigned char *aa1b, int n1, int nsq, char *);
60 */
61
62 /* start the workers, nworkers == number of workers, not nodes */
63 void
init_thr(int nworkers,char * info_lib_range_p,const struct mngmsg * m_msp,struct pstruct * ppst,unsigned char * aa0,struct mng_thr * m_bufi_p)64 init_thr(int nworkers, char *info_lib_range_p, const struct mngmsg *m_msp, struct pstruct *ppst,
65 unsigned char *aa0, struct mng_thr *m_bufi_p)
66 {
67 #ifdef MPI_SRC
68 MPI_Status mpi_status;
69 int int_msg_b[4]; /* general purpose buffer for integers */
70 #endif
71 int node, snode;
72
73 /* start the worker processes */
74
75 if (work_q == NULL) {
76 if ((work_q=(int *)calloc(nworkers, sizeof(int)))==NULL) {
77 fprintf(stderr, " cannot allocate work_q[%d] structure\n",
78 nworkers);
79 exit(1);
80 }
81 else {max_worker_q = nworkers;}
82 }
83 num_workers_idle = 0;
84
85 /* setup thread buffer info */
86 if (aa0 == NULL) {
87 int_msg_b[0] = int_msg_b[1] = int_msg_b[2] = 0;
88 }
89 else {
90 int_msg_b[0] = nworkers;
91 int_msg_b[2] = m_bufi_p->max_chain_seqs;
92 int_msg_b[3] = m_bufi_p->seq_buf_size;
93 }
94
95 /* send thread info */
96 for (node=FIRSTNODE; node < nworkers+FIRSTNODE; node++) {
97
98 MPI_Send(int_msg_b, 4, MPI_INT, node, STARTTYPE0, MPI_COMM_WORLD);
99
100 if (aa0 == NULL) { continue;}
101
102 /* send mngmsg */
103 MPI_Send((void *)m_msp, sizeof(struct mngmsg), MPI_BYTE, node,
104 STARTTYPE1, MPI_COMM_WORLD);
105
106 MPI_Send(ppst, sizeof(struct pstruct), MPI_BYTE, node,
107 STARTTYPE2, MPI_COMM_WORLD);
108
109 /* send the rest of the pieces of pam[2] */
110 MPI_Send(&ppst->pam2[0][0][0],m_msp->pamd1*m_msp->pamd2,MPI_INT,node,STARTTYPE3,
111 MPI_COMM_WORLD);
112 MPI_Send(&ppst->pam2[1][0][0],m_msp->pamd1*m_msp->pamd2,MPI_INT,node,STARTTYPE3,
113 MPI_COMM_WORLD);
114
115 /* send pascii (only for fasty/tfasty */
116 MPI_Send(pascii, sizeof(aascii), MPI_BYTE, node, STARTTYPE4, MPI_COMM_WORLD);
117 }
118
119 if (aa0 == NULL) {
120 /* all done */
121 free(work_q);
122 return;
123 }
124
125 /* wait for returned status results */
126 while (num_workers_idle < max_worker_q) {
127 MPI_Recv(&node, 1, MPI_INT, MPI_ANY_SOURCE,MSEQTYPE0,
128 MPI_COMM_WORLD, &mpi_status);
129 snode= mpi_status.MPI_SOURCE;
130 if (snode == FIRSTNODE) {
131 MPI_Recv(info_lib_range_p, MAX_FN, MPI_BYTE, snode,MSEQTYPE0,
132 MPI_COMM_WORLD, &mpi_status);
133 }
134
135 if (snode != node) {
136 fprintf(stderr, " initial node mismatch [%d!=%d]\n",node, snode);
137 }
138 worker_buf[snode-FIRSTNODE]->hdr.have_data = 0;
139 worker_buf[snode-FIRSTNODE]->hdr.have_results = 0;
140 worker_buf[snode-FIRSTNODE]->hdr.worker_idx = snode;
141 work_q[num_workers_idle++] = snode;
142 }
143 next_worker_idx = 0;
144
145 /* send query sequence info to workers */
146 for (node=FIRSTNODE; node < nworkers+FIRSTNODE; node++) {
147 /* send thread buffer info */
148 int_msg_b[0] = m_msp->n0;
149 int_msg_b[1] = m_msp->nm0;
150 MPI_Send(int_msg_b, 2, MPI_INT, node, QSEQTYPE0, MPI_COMM_WORLD);
151 MPI_Send(aa0, m_msp->n0+1, MPI_BYTE, node, QSEQTYPE1, MPI_COMM_WORLD);
152 if (m_msp->ann_flg && m_msp->aa0a) {
153 MPI_Send(m_msp->aa0a, m_msp->n0+2, MPI_BYTE, node, QSEQTYPE1, MPI_COMM_WORLD);
154 }
155 }
156 }
157
158 /* get_rbuf() provides buffers containing sequences to the main
159 program. max_work_q buffers are available, with each
160 buffer tied to a worker.
161
162 As the main program runs, it calls get_rbuf() to get a worker
163 buffer (reader buffers are not used with PCOMPLIB), fills it with
164 sequences, and sends it to a worker with put_rbuf().
165
166 At the same time, the worker programs call get_wbuf(), to get a
167 filled worker buffer sent by put_rbuf(), takes the sequences from
168 the buffer and does the comparisons, and sends the results back to
169 the manager by calling put_wbuf().
170 */
171
172 /* wait for results from any worker */
173 struct buf_head *
next_work_result(int * snode)174 next_work_result(int *snode) {
175 int this_node, buf2_cnt;
176 int int_msg_b[4]; /* general purpose int buffer */
177 int i;
178 struct buf2_hdr_s buf2_head;
179 struct buf_head *this_buf_p, tmp_buf_head;
180 struct seq_record *seq_b_save;
181 struct mseq_record *mseq_b_save;
182 unsigned char *aa1b_start_save;
183 struct a_res_str *new_ares_p, *prev_ares_p;
184 #ifdef MPI_SRC
185 MPI_Status mpi_status;
186 #endif
187
188 /* wait for a returned result */
189 MPI_Recv(&tmp_buf_head, sizeof(struct buf_head), MPI_BYTE, MPI_ANY_SOURCE,RES_TYPE0,
190 MPI_COMM_WORLD, &mpi_status);
191 this_node = mpi_status.MPI_SOURCE;
192 buf2_cnt = tmp_buf_head.hdr.buf2_cnt;
193
194 #ifdef DEBUG
195 /*
196 fprintf(stderr," %d: %d results\n", this_node, buf2_cnt);
197 */
198 #endif
199
200 this_buf_p = worker_buf[this_node-FIRSTNODE];
201 /* move things selectively to avoid over-writing pointers to res, a_res arrays */
202
203 aa1b_start_save = this_buf_p->hdr.aa1b_start;
204 seq_b_save = this_buf_p->hdr.seq_b;
205 mseq_b_save = this_buf_p->hdr.mseq_b;
206
207 memcpy(&this_buf_p->hdr,&tmp_buf_head.hdr,sizeof(struct buf2_hdr_s));
208
209 this_buf_p->hdr.aa1b_start = aa1b_start_save;
210 this_buf_p->hdr.seq_b = seq_b_save;
211 this_buf_p->hdr.mseq_b =mseq_b_save;
212
213 memcpy(&this_buf_p->s_cnt_info,&tmp_buf_head.s_cnt_info,sizeof(struct score_count_s));
214
215 if (this_buf_p->hdr.have_results) {
216 if (this_buf_p->hdr.buf2_type & (BUF2_DOWORK + BUF2_DOSHUF + BUF2_DOOPT)) {
217 MPI_Recv(this_buf_p->buf2_res, sizeof(struct buf2_res_s)*buf2_cnt,
218 MPI_BYTE, this_node, RES_TYPE1, MPI_COMM_WORLD, &mpi_status);
219 /*
220 for (i=0; i < buf2_cnt; i++) {
221 if (this_buf_p->buf2_res[i].rst.score[2] > 200) {
222 fprintf(stderr, "HS[%d:%d,%d]: %d (%d:%d)\n",i,this_node, tmp_buf_head.hdr.worker_idx, this_buf_p->buf2_res[i].rst.score[2],
223 this_buf_p->buf2_data[i].seq->index,this_buf_p->buf2_data[i].seq->n1);
224 }
225 }
226 */
227 }
228
229 if (this_buf_p->hdr.buf2_type & BUF2_DOALIGN) {
230 /* (1) get a message that has "have_ares"
231 (2) allocate space for each a_res and receive it individually
232 (3) reset the ->next pointers for the a_res chain
233 */
234
235 for (i = 0; i < buf2_cnt; i++) {
236 MPI_Recv(int_msg_b, 1, MPI_INT, this_node, ALN_TYPE0, MPI_COMM_WORLD, &mpi_status);
237 this_buf_p->buf2_ares[i].have_ares = int_msg_b[0];
238 this_buf_p->buf2_ares[i].a_res = NULL; /* pre-initialize */
239
240 if (this_buf_p->buf2_ares[i].have_ares) {
241 /* allocate space to receive it */
242 if ((new_ares_p = (struct a_res_str *)calloc(1,sizeof(struct a_res_str)))==NULL) {
243 fprintf(stderr, "cannot allocate a_res from %d\n",this_node);
244 exit(1);
245 }
246 /* save the head of the ares_chain */
247 this_buf_p->buf2_ares[i].a_res = new_ares_p;
248
249 /* get the first a_res */
250 MPI_Recv(new_ares_p, sizeof(struct a_res_str), MPI_BYTE, this_node,
251 ALN_TYPE1, MPI_COMM_WORLD, &mpi_status);
252 /* get the associated res[nres] */
253 if ((new_ares_p->res = (int *)calloc(new_ares_p->nres,sizeof(int)))==NULL) {
254 fprintf(stderr, "cannot allocate res for a_res from %d\n",this_node);
255 exit(1);
256 }
257 MPI_Recv(new_ares_p->res, new_ares_p->nres, MPI_INT, this_node,
258 ALN_TYPE2, MPI_COMM_WORLD, &mpi_status);
259
260 /* now get alignment encodings if available */
261 if (new_ares_p->aln_code) {
262 if ((new_ares_p->aln_code = (char *)calloc(new_ares_p->aln_code_n+1,sizeof(char)))==NULL) {
263 fprintf(stderr, "cannot allocate aln_code for a_res from %d\n",this_node);
264 exit(1);
265 }
266 MPI_Recv(new_ares_p->aln_code, new_ares_p->aln_code_n+1, MPI_BYTE, this_node,
267 ALN_TYPE3, MPI_COMM_WORLD, &mpi_status);
268 }
269 if (new_ares_p->ann_code) {
270 if ((new_ares_p->ann_code = (char *)calloc(new_ares_p->ann_code_n+1,sizeof(char)))==NULL) {
271 fprintf(stderr, "cannot allocate ann_code for a_res from %d\n",this_node);
272 exit(1);
273 }
274 MPI_Recv(new_ares_p->ann_code, new_ares_p->ann_code_n+1, MPI_BYTE, this_node,
275 ALN_TYPE3, MPI_COMM_WORLD, &mpi_status);
276
277 }
278
279 while (new_ares_p->next) { /* while the chain continues */
280 prev_ares_p = new_ares_p; /* save pointer to previous a_res to fix prev_ares->next */
281 if ((new_ares_p = (struct a_res_str *)calloc(1,sizeof(struct a_res_str)))==NULL) {
282 fprintf(stderr, "cannot allocate a_res from %d\n",this_node);
283 exit(1);
284 }
285 prev_ares_p->next = new_ares_p;
286 MPI_Recv(new_ares_p, sizeof(struct a_res_str), MPI_BYTE, this_node,
287 ALN_TYPE1, MPI_COMM_WORLD, &mpi_status);
288 if ((new_ares_p->res = (int *)calloc(new_ares_p->nres,sizeof(int)))==NULL) {
289 fprintf(stderr, "cannot allocate res for a_res from %d\n",this_node);
290 exit(1);
291 }
292 MPI_Recv(new_ares_p->res, new_ares_p->nres, MPI_INT, this_node,
293 ALN_TYPE2, MPI_COMM_WORLD, &mpi_status);
294 /* now get alignment encodings if available */
295 if (new_ares_p->aln_code) {
296 if ((new_ares_p->aln_code = (char *)calloc(new_ares_p->aln_code_n+1,sizeof(char)))==NULL) {
297 fprintf(stderr, "cannot allocate aln_code for a_res from %d\n",this_node);
298 exit(1);
299 }
300 MPI_Recv(new_ares_p->aln_code, new_ares_p->aln_code_n+1, MPI_BYTE, this_node,
301 ALN_TYPE3, MPI_COMM_WORLD, &mpi_status);
302 }
303 if (new_ares_p->ann_code) {
304 if ((new_ares_p->ann_code = (char *)calloc(new_ares_p->ann_code_n+1,sizeof(char)))==NULL) {
305 fprintf(stderr, "cannot allocate ann_code for a_res from %d\n",this_node);
306 exit(1);
307 }
308 MPI_Recv(new_ares_p->ann_code, new_ares_p->ann_code_n+1, MPI_BYTE, this_node,
309 ALN_TYPE3, MPI_COMM_WORLD, &mpi_status);
310
311 }
312 } /* finished with the ares_chain */
313 } /* done with have_ares */
314 else {
315 #ifdef DEBUG
316 fprintf(stderr, " getting alignment with no have_ares[%d]: %d/%d",
317 this_buf_p->hdr.worker_idx,i,this_buf_p->buf2_ares[i].best_idx);
318 #endif
319 }
320 } /* done with buf2_ares[buf2_cnt] */
321 } /* done with BUF_DOALIGN */
322 } /* done with have_results */
323 *snode = this_node;
324 return this_buf_p;
325 }
326
327 /* wait until a worker/buffer is available */
get_rbuf(struct buf_head ** cur_buf,int max_work_buf)328 void get_rbuf(struct buf_head **cur_buf, int max_work_buf)
329 {
330 int node, snode;
331 int i_msg_b[2], nresults;
332 struct buf_head *this_buf_p;
333
334 #ifdef MPI_SRC
335 MPI_Status mpi_status;
336 #endif
337
338 if (num_workers_idle == 0) {
339 this_buf_p = next_work_result(&snode);
340
341 work_q[next_worker_idx] = snode;
342 num_workers_idle++;
343 }
344 else {
345 this_buf_p = worker_buf[work_q[next_worker_idx]-FIRSTNODE];
346 }
347
348 *cur_buf = this_buf_p;
349
350 /* update worker queue */
351 next_worker_idx = (next_worker_idx+1)%(max_work_buf);
352 }
353
354 /* put_rbuf() takes a buffer filled with sequences to be compared
355 sends it to a worker */
356
put_rbuf(struct buf_head * cur_buf,int max_work_buf)357 void put_rbuf(struct buf_head *cur_buf, int max_work_buf)
358 {
359 #ifdef MPI_SRC
360 MPI_Status mpi_status;
361 #endif
362 struct seq_record *cur_seq_p, *tmp_seq_p;
363 int i, j, snode, buf2_cnt, seqr_cnt;
364 int cur_aa1b_size, max_aa1b_size;
365
366 /* do not send msg if no data */
367 if (!cur_buf->hdr.have_data || !(cur_buf->hdr.buf2_cnt > 0)) {return;}
368
369 /* here, since we have a buffer, we have a worker, just send the info */
370 snode = cur_buf->hdr.worker_idx;
371 buf2_cnt = cur_buf->hdr.buf2_cnt;
372 seqr_cnt = cur_buf->hdr.seqr_cnt;
373 max_aa1b_size = cur_buf->hdr.aa1b_size;
374
375 #ifdef DEBUG
376 /* fprintf(stderr," sending %d/%d seqs to %d\n", buf2_cnt, seqr_cnt, snode); */
377 #endif
378 /* send header */
379 MPI_Send(&cur_buf->hdr, sizeof(struct buf2_hdr_s), MPI_BYTE, snode,
380 MSEQTYPE0, MPI_COMM_WORLD);
381
382 /* send data */
383 MPI_Send(cur_buf->buf2_data, sizeof(struct buf2_data_s)*buf2_cnt,
384 MPI_BYTE, snode, MSEQTYPE1, MPI_COMM_WORLD);
385
386 /* before sending sequence records, we need to check to see if we
387 need to transfer to a continuous location (or send lots of short
388 records) */
389
390 #ifdef DEBUG
391 cur_aa1b_size = 0;
392 for (i=0; i < buf2_cnt; i++) {
393 cur_seq_p = cur_buf->buf2_data[i].seq;
394 if (!cur_buf->buf2_data[i].seq_dup) {
395 cur_aa1b_size += cur_seq_p->n1+1;
396 }
397 if (check_seq_range(cur_seq_p->aa1b, cur_seq_p->n1, 50, "put_rbuf()")) {
398 fprintf(stderr, "[put_rbuf] range error at: %d\n", i);
399 }
400 }
401
402 if (cur_aa1b_size != cur_buf->hdr.aa1b_used) {
403 fprintf(stderr,"[put_rbuf:%d] aa1b_used size mismatch: %d != %d\n",
404 snode, cur_aa1b_size, cur_buf->hdr.aa1b_used);
405 }
406 #endif
407
408 if (cur_buf->hdr.seq_record_continuous) {
409 /* send sequence records associated with data in one message */
410 MPI_Send(cur_buf->hdr.seq_b, sizeof(struct seq_record)*seqr_cnt,
411 MPI_BYTE, snode, MSEQTYPE2, MPI_COMM_WORLD);
412 MPI_Send(cur_buf->hdr.aa1b_start, cur_buf->hdr.aa1b_used+1,
413 MPI_BYTE, snode, MSEQTYPE3, MPI_COMM_WORLD);
414 }
415 else {
416 /* send individual sequence records */
417 cur_aa1b_size = 0;
418 for (i=0; i < buf2_cnt; i++) {
419 cur_seq_p = cur_buf->buf2_data[i].seq;
420 if (!cur_buf->buf2_data[i].seq_dup) { /* don't send sequence if its a duplicate */
421 MPI_Send(cur_seq_p, sizeof(struct seq_record),
422 MPI_BYTE, snode, MSEQTYPE4, MPI_COMM_WORLD);
423 MPI_Send(cur_seq_p->aa1b, cur_seq_p->n1+1,
424 MPI_BYTE, snode, MSEQTYPE5, MPI_COMM_WORLD);
425 }
426 }
427 }
428
429 /* reduce the number of idle workers */
430 num_workers_idle--;
431 }
432
433 /* wait_rbuf() -- wait for the worker threads to finish with the
434 current sequence buffers.
435 */
wait_rbuf(int used_reader_bufs)436 void wait_rbuf(int used_reader_bufs) {
437 int snode;
438
439 while (num_workers_idle < max_worker_q) {
440 next_work_result(&snode);
441 num_workers_idle++;
442 }
443
444 /* all workers are idle, re-initialize work_q */
445 for (snode = 0; snode < max_worker_q; snode++) {
446 work_q[snode] = snode + FIRSTNODE;
447 }
448 }
449
rbuf_done(int nthreads)450 void rbuf_done(int nthreads)
451 {
452 #ifdef MPI_SRC
453 MPI_Status mpi_status;
454 #endif
455 int status, i;
456
457 /* use a dummy buf_head to send buf2_cnt=0, stop_work=1 */
458 struct buf2_hdr_s tmp_buf2_hdr;
459
460 tmp_buf2_hdr.stop_work = 1;
461 tmp_buf2_hdr.buf2_cnt = 0;
462
463 /* send a message to all the workers waiting for get_wbuf()
464 to quit
465 */
466
467 for (i=FIRSTNODE; i < nthreads+FIRSTNODE; i++) {
468 MPI_Send(&tmp_buf2_hdr, sizeof(struct buf2_hdr_s), MPI_BYTE, i,
469 MSEQTYPE0, MPI_COMM_WORLD);
470 }
471 }
472
473 /* get_wbuf() -- called in workers
474 get a buffer full of sequences to be compared from the main program
475
476 this function should follow put_rbuf() message for message
477
478 In the PCOMPLIB version, there is no queue of buffers to be read,
479 but we must have space to put the messages in as we receive them,
480 and we must fix the pointers in the seq_records
481 */
482
get_wbuf(struct buf_head ** cur_buf,int max_work_buf)483 int get_wbuf(struct buf_head **cur_buf, int max_work_buf)
484 {
485 #ifdef MPI_SRC
486 MPI_Status mpi_status;
487 #endif
488
489 /* we need to preserve some sequence pointer information so it is not
490 over-written by the messages */
491
492 struct seq_record *seq_base, *cur_seq_p, *prev_seq_p, *old_host_seq_p, *host_seq_p;
493 struct buf2_data_s *cur_buf2_dp;
494 unsigned char *aa1b_start_save, *old_aa1b_start, *cur_aa1b;
495 unsigned char *host_aa1b, *old_host_aa1b;
496 int buf2_cnt, i, j, cur_n1, seqr_cnt;
497 int max_aa1b_size, aa1b_size_save;
498 int cur_aa1b_size;
499 int snode;
500
501 snode = (*cur_buf)->hdr.worker_idx;
502 seq_base = (*cur_buf)->hdr.seq_b;
503 aa1b_start_save = (*cur_buf)->hdr.aa1b_start;
504 max_aa1b_size = aa1b_size_save = (*cur_buf)->hdr.aa1b_size;
505
506 /* put invalid bytes in aa1b to check for transmission errors */
507 memset(aa1b_start_save, 127, aa1b_size_save);
508
509 MPI_Recv(&(*cur_buf)->hdr, sizeof(struct buf2_hdr_s), MPI_BYTE, 0,
510 MSEQTYPE0, MPI_COMM_WORLD, &mpi_status);
511
512 buf2_cnt = (*cur_buf)->hdr.buf2_cnt;
513 seqr_cnt = (*cur_buf)->hdr.seqr_cnt;
514
515 if (buf2_cnt <= 0 || (*cur_buf)->hdr.stop_work) { return 0; }
516
517 /* get the buf2_data array, which has seq_dup and ->seq records */
518 MPI_Recv((*cur_buf)->buf2_data, sizeof(struct buf2_data_s)*buf2_cnt,
519 MPI_BYTE, 0, MSEQTYPE1, MPI_COMM_WORLD, &mpi_status);
520
521 #ifdef DEBUG
522 /* fprintf(stderr,"[%d/get_wbuf] receiving %d/%d sequences\n",snode, buf2_cnt, seqr_cnt); */
523 #endif
524
525 /* get seq_records (but not mseq_records, don't need them) */
526 if ((*cur_buf)->hdr.seq_record_continuous) {
527 MPI_Recv(seq_base, sizeof(struct seq_record)*seqr_cnt,
528 MPI_BYTE, 0, MSEQTYPE2, MPI_COMM_WORLD, &mpi_status);
529
530 /* now get the sequence data */
531 MPI_Recv(aa1b_start_save, (*cur_buf)->hdr.aa1b_used+1,
532 MPI_BYTE, 0, MSEQTYPE3, MPI_COMM_WORLD, &mpi_status);
533
534 /* map the seq records back into buf2_data */
535 /* must check for duplicate sequence records, initialize buf2_data[i]->seq
536 AND seq.aa1b in the same pass */
537
538 cur_buf2_dp = (*cur_buf)->buf2_data;
539 cur_seq_p = prev_seq_p = seq_base;
540
541 cur_aa1b = aa1b_start_save;
542 cur_aa1b_size = 0;
543
544 for (i=0; i < buf2_cnt; i++, cur_buf2_dp++) {
545 if (!cur_buf2_dp->seq_dup) { /* not a duplicate */
546 cur_seq_p->aa1b = cur_aa1b;
547 cur_aa1b += cur_seq_p->n1 + 1;
548 cur_aa1b_size += cur_seq_p->n1 + 1;
549 cur_buf2_dp->seq = cur_seq_p++;
550 }
551 else { /* duplicate */
552 cur_buf2_dp->seq = prev_seq_p; /* point to the previous value */
553 prev_seq_p = cur_seq_p;
554 }
555 }
556
557 if (cur_aa1b_size != (*cur_buf)->hdr.aa1b_used) {
558 fprintf(stderr, "[%d] incorrect cur_aa1b_size: %d != %d [%d]\n",
559 snode, cur_aa1b_size, (*cur_buf)->hdr.aa1b_used);
560 }
561 }
562 else { /* not continuous, get seq_records one at a time */
563 cur_seq_p = seq_base;
564 cur_aa1b = aa1b_start_save;
565 cur_buf2_dp = (*cur_buf)->buf2_data;
566 cur_aa1b_size = 0;
567 for (i=0; i < buf2_cnt; i++) {
568 /* get a seq record */
569 if (!(*cur_buf)->buf2_data[i].seq_dup) { /* not a duplicate, so get it */
570 MPI_Recv(cur_seq_p, sizeof(struct seq_record),
571 MPI_BYTE, 0, MSEQTYPE4, MPI_COMM_WORLD, &mpi_status);
572 /* get the sequence itself */
573 prev_seq_p = cur_seq_p;
574 cur_n1 = cur_seq_p->n1;
575 cur_aa1b_size += cur_n1+1;
576 if (cur_aa1b_size >= max_aa1b_size) {
577 fprintf(stderr,"[get_wbuf:%d] -- receive buffer too small %d > %d\n",
578 (*cur_buf)->hdr.worker_idx, cur_aa1b_size, max_aa1b_size);
579 exit(1);
580 }
581
582 MPI_Recv(cur_aa1b, cur_n1+1, MPI_BYTE, 0, MSEQTYPE5, MPI_COMM_WORLD, &mpi_status);
583 cur_seq_p->aa1b = cur_aa1b;
584 #ifdef DEBUG
585 if (cur_seq_p->adler32_crc != adler32(1L,cur_aa1b,cur_n1)) {
586 fprintf(stderr," [get_wbuf:%d] -- adler32 mismatch; n1: %d\n",
587 (*cur_buf)->hdr.worker_idx, cur_n1);
588 }
589 #endif
590
591 cur_buf2_dp->seq = cur_seq_p++;
592 cur_aa1b += cur_n1+1;
593 }
594 else { /* its a duplicate, so point to the original version */
595 cur_buf2_dp->seq = prev_seq_p;
596 }
597 cur_buf2_dp++;
598 }
599 }
600
601 /* restore the seq_b, aa1b_start that were over-written */
602 (*cur_buf)->hdr.seq_b = seq_base;
603 (*cur_buf)->hdr.aa1b_start = aa1b_start_save;
604 (*cur_buf)->hdr.aa1b_size = aa1b_size_save;
605
606 /*
607 for (i=0; i < buf2_cnt; i++) {
608 cur_seq_p = (*cur_buf)->buf2_data[i].seq;
609 if (check_seq_range(cur_seq_p->aa1b, cur_seq_p->n1, 50, "get_wbuf()")) {
610 fprintf(stderr, "[%d] (get_wbuf) range error at: %d/%d (seqr_cnt: %d)\n",
611 (*cur_buf)->hdr.worker_idx, i, buf2_cnt, seqr_cnt);
612 }
613 }
614 */
615
616 return 1;
617 }
618
619 /* put_wbuf() -- called in workers
620
621 In the PCOMPLIB version, there is no queue of buffers to be read,
622 so just send the buffer to the manager
623 */
put_wbuf(struct buf_head * cur_buf,int max_work_buf)624 void put_wbuf(struct buf_head *cur_buf, int max_work_buf)
625 {
626 int int_msg_b[4], i;
627 struct buf2_ares_s *buf2_ares_p;
628 struct a_res_str *cur_ares_p, *next_ares_p;
629 #ifdef MPI_SRC
630 MPI_Status mpi_status;
631 #endif
632
633 MPI_Send(&cur_buf->hdr, sizeof(struct buf_head), MPI_BYTE, 0,
634 RES_TYPE0, MPI_COMM_WORLD);
635
636 if (!cur_buf->hdr.have_results) { return;}
637
638 /* have buf2_res type results */
639 if (cur_buf->hdr.buf2_type & (BUF2_DOWORK + BUF2_DOSHUF+BUF2_DOOPT)) {
640 MPI_Send(cur_buf->buf2_res, sizeof(struct buf2_res_s)*cur_buf->hdr.buf2_cnt, MPI_BYTE, 0,
641 RES_TYPE1, MPI_COMM_WORLD);
642 }
643
644 /* have buf2_ares type results */
645 if (cur_buf->hdr.buf2_type & BUF2_DOALIGN) {
646 /* buf2_ares does not have much useful information, except have_ares and a chain of *a_res pointers.
647 so we need to:
648 (1) send have_ares
649 (2) send each part of the a_res chain individually
650 */
651
652 buf2_ares_p = cur_buf->buf2_ares;
653 for (i=0; i < cur_buf->hdr.buf2_cnt; i++) {
654 int_msg_b[0] = buf2_ares_p->have_ares;
655 MPI_Send(int_msg_b, 1, MPI_INT, 0, ALN_TYPE0, MPI_COMM_WORLD);
656 if (buf2_ares_p->have_ares) {
657 /* (a) send the first one */
658 for (cur_ares_p = buf2_ares_p->a_res; cur_ares_p; cur_ares_p = cur_ares_p->next) {
659 MPI_Send(cur_ares_p, sizeof(struct a_res_str), MPI_BYTE, 0, ALN_TYPE1, MPI_COMM_WORLD);
660 MPI_Send(cur_ares_p->res, cur_ares_p->nres ,MPI_INT, 0, ALN_TYPE2, MPI_COMM_WORLD);
661 if (cur_ares_p->aln_code) {
662 MPI_Send(cur_ares_p->aln_code, cur_ares_p->aln_code_n+1 ,MPI_BYTE, 0, ALN_TYPE3, MPI_COMM_WORLD);
663 }
664 if (cur_ares_p->ann_code) {
665 MPI_Send(cur_ares_p->ann_code, cur_ares_p->ann_code_n+1 ,MPI_BYTE, 0, ALN_TYPE3, MPI_COMM_WORLD);
666 }
667 } /* done with a_res chain */
668
669 /* free the chain */
670 cur_ares_p = buf2_ares_p->a_res;
671 while (cur_ares_p) {
672 if (cur_ares_p->aln_code) free(cur_ares_p->aln_code);
673 if (cur_ares_p->ann_code) free(cur_ares_p->ann_code);
674 if ((buf2_ares_p->have_ares & 0x1) && cur_ares_p->res) free(cur_ares_p->res);
675 next_ares_p = cur_ares_p->next;
676 free(cur_ares_p);
677 cur_ares_p = next_ares_p;
678 }
679 buf2_ares_p->a_res = NULL;
680 } /* done with have_ares */
681 buf2_ares_p->have_ares = 0; /* must be zero-ed out for later use */
682 buf2_ares_p++;
683 } /* done with buf2_ares[buf2_cnt] */
684 } /* done with BUF2_DOALIGN */
685 } /* done with put_wbuf() */
686