1 /* $Id: work_thr2.c $ */
2
3 /* copyright (c) 1996, 1997, 1998, 1999, 2014 by William R. Pearson
4 and The The Rector & Visitors of the University of Virginia */
5
6 /* Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12 Unless required by applicable law or agreed to in writing,
13 software distributed under this License is distributed on an "AS
14 IS" BASIS, WITHOUT WRRANTIES OR CONDITIONS OF ANY KIND, either
15 express or implied. See the License for the specific language
16 governing permissions and limitations under the License.
17 */
18
19 /* work_thr.c - threaded worker */
20
21 /* modified 21-Oct-1998 to work with reverse complement for DNA */
22
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <time.h>
27 #include <sys/types.h>
28 #include <signal.h>
29
30 #include "defs.h" /* various constants */
31 #include "best_stats.h" /* defines beststr */
32 #include "structs.h"
33 #include "param.h" /* pstruct rstruct */
34 #include "thr_buf_structs.h"
35
36 /***************************************/
37 /* thread global variable declarations */
38 /***************************************/
39
40 #ifndef PCOMPLIB
41 #define XTERNAL
42 #include "thr_bufs2.h"
43 #undef XTERNAL
44 #else
45 #include "msg.h"
46 #define XTERNAL
47 #include "uascii.h"
48 #undef XTERNAL
49 #ifdef MPI_SRC
50 #include "mpi.h"
51 #endif
52 #endif
53
54 void alloc_pam (int, int, struct pstruct *);
55 int **alloc_pam2p(int **,int, int);
56 void revcomp(unsigned char *seq, int n, int *c_nt);
57
58 #if defined(WIN32) || !defined(THR_EXIT)
59 void pthread_exit(void *);
60 #define THR_EXIT pthread_exit
61 #else
62 void THR_EXIT(void *);
63 #endif
64
65 #ifdef DEBUG
66 extern struct buf_head *lib_buf2_list;
67 #endif
68
69 /* functions getting/sending buffers to threads (thr_sub.c) */
70 extern void wait_thr(void);
71 extern int get_wbuf(struct buf_head **cur_buf, int max_work_buf);
72 extern void put_wbuf(struct buf_head *cur_buf, int max_work_buf);
73
74 /* dropxx.c functions */
75 #include "drop_func.h"
76
77 extern void *my_srand();
78 extern unsigned int my_nrand(int, void *);
79 extern void qshuffle(unsigned char *aa0, int n0, int nm0, void *);
80 extern void free_pam2p(int **);
81
82 void init_aa0(unsigned char **aa0, int n0, int nm0,
83 unsigned char **aa0s, unsigned char **aa1s,
84 int qframe, int qshuffle_flg, int max_tot,
85 struct pstruct *ppst, void **f_str, void **qf_str,
86 void *my_rand_state);
87
88 extern void
89 buf_do_work(unsigned char **aa0, int n0, struct buf_head *lib_bhead_p,
90 int max_frame, struct pstruct *ppst, void **f_str);
91 extern void
92 buf_qshuf_work(unsigned char *aa0s, int n0, struct buf_head *lib_bhead_p,
93 int max_frame, struct pstruct *ppst, void *qf_str, int score_ix);
94 extern void
95 buf_shuf_work(unsigned char **aa0, int n0, unsigned char *aa1s,
96 struct buf_head *lib_bhead_p, int max_frame, struct pstruct *ppst,
97 void **f_str, int score_ix, void *);
98
99 void
100 buf_do_align(unsigned char **aa0, int n0,
101 struct buf_head *lib_bhead_p,
102 struct pstruct *ppst, const struct mngmsg *my_msp,
103 void **f_str);
104
105 #ifndef PCOMPLIB
106 #define FIRSTNODE 0
107 void
work_thread(struct thr_str * work_info)108 work_thread (struct thr_str *work_info)
109 #else
110 #if defined(TFAST)
111 extern void aainit(int tr_type, int debug);
112 #endif
113
114 int g_worker;
115
116 void work_comp(int my_worker)
117 #endif
118 {
119 struct buf_head *cur_buf, *my_cur_buf;
120 char info_lib_range[MAX_FN];
121 unsigned char *aa1s=NULL;
122 #ifndef PCOMPLIB
123
124 const struct mngmsg *my_msp;
125 int my_worker;
126 #else
127 #ifdef MPI_SRC
128 struct mngmsg *my_msp;
129 MPI_Status mpi_status;
130 int buf_alloc_flag = 0;
131 #endif
132 struct mngmsg my_msg;
133 int int_msg_b[4];
134 struct buf2_data_s *my_buf2_data;
135 struct buf2_res_s *my_buf2_res;
136 struct buf2_ares_s *my_buf2_ares;
137 struct seq_record *my_seq_buf;
138 unsigned char *my_aa1b_buf;
139 #endif
140 int i, j, npam, n0, nm0;
141 int max_work_buf, max_buf2_res, max_chain_seqs, seq_buf_size;
142 void *my_rand_state;
143
144 struct pstruct my_pst, *my_ppst;
145 unsigned char *aa0[6], *aa0s;
146 void *f_str[6], *qf_str;
147
148 my_rand_state=my_srand();
149
150 #ifndef PCOMPLIB
151 my_worker = work_info->worker;
152 max_work_buf = work_info->max_work_buf;
153 wait_thr(); /* wait for start_thread predicate to drop to 0 */
154
155 my_msp = work_info->m_msp;
156 #else /* PCOMPLIB */
157
158 #ifdef DEBUG
159 /* fprintf(stderr,"%d: work_comp started\n",my_worker); */
160 #endif
161 g_worker = my_worker;
162 my_msp = &my_msg;
163
164 #ifdef MPI_SRC
165 pcomp_loop:
166
167 MPI_Recv(int_msg_b,4,MPI_INT,0, STARTTYPE0,MPI_COMM_WORLD,
168 &mpi_status);
169
170 max_work_buf = int_msg_b[0];
171 max_buf2_res = int_msg_b[1];
172 max_chain_seqs = int_msg_b[2];
173 seq_buf_size = int_msg_b[3];
174
175 /* quit the main loop with a message of 0 max_work_buf */
176 if (max_work_buf == 0) { goto pcomp_final;}
177
178 MPI_Recv((void *)my_msp,sizeof(struct mngmsg),MPI_BYTE,0,STARTTYPE1,MPI_COMM_WORLD,
179 &mpi_status);
180
181 MPI_Recv((void *)&my_pst,(int)sizeof(struct pstruct),MPI_BYTE,0,STARTTYPE2,MPI_COMM_WORLD,
182 &mpi_status);
183 my_ppst = &my_pst;
184
185 #endif /* MPI_SRC */
186
187 if (!buf_alloc_flag) {
188 buf_alloc_flag = 1;
189 /* must allocate buffers for data, sequences, results */
190 if ((my_cur_buf = cur_buf = (struct buf_head *)calloc(1,sizeof(struct buf_head)))==NULL) {
191 fprintf(stderr,"cannot allocate buf_head\n");
192 exit(1);
193 }
194
195 /* allocate results array */
196 if ((my_buf2_res = (struct buf2_res_s*)calloc(max_buf2_res+1,sizeof(struct buf2_res_s)))==NULL) {
197 fprintf(stderr,"cannot allocate buf2_data[%d]\n",max_buf2_res);
198 exit(1);
199 }
200 cur_buf->buf2_res = my_buf2_res;
201
202 /* allocate buffers for ares alignment encodings */
203 if ((my_buf2_ares = (struct buf2_ares_s*)calloc(max_buf2_res+1,sizeof(struct buf2_ares_s)))==NULL) {
204 fprintf(stderr,"cannot allocate buf2_data[%d]\n",max_buf2_res);
205 exit(1);
206 }
207 cur_buf->buf2_ares = my_buf2_ares;
208
209 /* allocate buffers for data */
210 if ((my_buf2_data = (struct buf2_data_s*)calloc(max_buf2_res+1,sizeof(struct buf2_data_s)))==NULL) {
211 fprintf(stderr,"cannot allocate buf2_data[%d]\n",max_buf2_res);
212 exit(1);
213 }
214 cur_buf->buf2_data = my_buf2_data;
215
216 /* also must allocate seq_records */
217 if ((my_seq_buf =
218 (struct seq_record *)calloc((size_t)(max_buf2_res+1), sizeof(struct seq_record)))
219 ==NULL) {
220 fprintf(stderr,"%d: cannot allocate seq_record buffer[%d]\n",my_worker,max_buf2_res+1);
221 exit(1);
222 }
223 cur_buf->buf2_data[0].seq = cur_buf->hdr.seq_b = my_seq_buf;
224
225 if ((my_aa1b_buf = (unsigned char *)calloc((size_t)(seq_buf_size+1),sizeof(unsigned char)))
226 ==NULL) {
227 fprintf(stderr,"%d: cannot allocate sequence buffer[%d]\n",my_worker, seq_buf_size);
228 exit(1);
229 }
230 else { /* now associate the my_aa1b_buf with cur_buf */
231 my_aa1b_buf++;
232 cur_buf->hdr.aa1b_start = cur_buf->buf2_data[0].seq->aa1b = my_aa1b_buf;
233 cur_buf->hdr.aa1b_size = seq_buf_size;
234 }
235 }
236 else {
237 cur_buf = my_cur_buf;
238 cur_buf->buf2_data = my_buf2_data;
239 cur_buf->buf2_data[0].seq = cur_buf->hdr.seq_b = my_seq_buf;
240 cur_buf->buf2_res = my_buf2_res;
241 cur_buf->buf2_ares = my_buf2_ares;
242 cur_buf->hdr.aa1b_start = cur_buf->buf2_data[0].seq->aa1b = my_aa1b_buf;
243 cur_buf->hdr.aa1b_size = seq_buf_size;
244 }
245
246 #if defined(TFAST)
247 /* set up translation tables: faatran.c */
248 aainit(my_ppst->tr_type,my_ppst->debug_lib);
249 #endif
250
251 #endif /* PCOMPLIB */
252
253 /* the pam allocation stuff is very different for threaded vs PCOMPLIB,
254 so the code is separate */
255 #if !defined(PCOMPLIB)
256 /* make certain that all but 0 have their own copy of pst */
257 if (my_worker== 0) {
258 my_ppst=work_info->ppst;
259 }
260 else {
261 my_ppst = &my_pst;
262 memcpy(my_ppst,work_info->ppst,sizeof(struct pstruct));
263 /* #else we already have the stuff in my_pst from initialization */
264
265 my_ppst->pam2p[0] = my_ppst->pam2p[1] = NULL;
266
267 alloc_pam(MAXSQ, MAXSQ, my_ppst);
268
269 npam = my_pst.nsqx;
270
271 /* allocate local copy of pam2[][] */
272 for (i=0; i<npam; i++) {
273 for (j=0; j<npam; j++) {
274 my_pst.pam2[0][i][j] = work_info->ppst->pam2[0][i][j];
275 my_pst.pam2[1][i][j] = work_info->ppst->pam2[1][i][j];
276 }
277 }
278 }
279 #endif
280 #if defined(PCOMPLIB) /* PCOMPLIB */
281 my_ppst = &my_pst; /* for all workers */
282 alloc_pam(my_msg.pamd1,my_msg.pamd2,my_ppst);
283 #ifdef MPI_SRC
284 MPI_Recv(&my_pst.pam2[0][0][0],my_msg.pamd1*my_msg.pamd2,MPI_INT,0,
285 STARTTYPE3, MPI_COMM_WORLD,&mpi_status);
286
287 MPI_Recv(&my_pst.pam2[1][0][0],my_msg.pamd1*my_msg.pamd2,MPI_INT,0,
288 STARTTYPE3, MPI_COMM_WORLD,&mpi_status);
289 /* no code for profiles */
290
291 /* get pascii (only for fasty/tfasty */
292 pascii = aascii;
293 MPI_Recv(pascii, sizeof(aascii), MPI_BYTE, 0, STARTTYPE4, MPI_COMM_WORLD, &mpi_status);
294 #endif
295 #endif
296
297 /* fill in info_lib_range */
298 if (my_worker == FIRSTNODE) {
299 /* label library size limits */
300 if (my_ppst->n1_low > 0 && my_ppst->n1_high < BIGNUM) {
301 sprintf(info_lib_range," (range: %d-%d)",my_ppst->n1_low,my_ppst->n1_high);}
302 else if (my_ppst->n1_low > 0) {
303 sprintf(info_lib_range," (range: >%d)",my_ppst->n1_low);}
304 else if (my_ppst->n1_high < BIGNUM) {
305 sprintf(info_lib_range," (range: <%d)",my_ppst->n1_high);}
306 else {
307 info_lib_range[0]='\0';
308 }
309 info_lib_range[sizeof(info_lib_range)-1]='\0';
310 #ifndef PCOMPLIB
311 strncpy(work_info->info_lib_range,info_lib_range,MAX_SSTR);
312 /* this does not work on some architectures */
313 work_info->f_str_ap = &f_str[0];
314 #endif
315 }
316
317 #ifdef PCOMPLIB
318 #ifdef MPI_SRC
319 /* send back sync message */
320 int_msg_b[0]=my_worker;
321 MPI_Send(int_msg_b,1,MPI_INT,0,MSEQTYPE0,MPI_COMM_WORLD);
322 if (my_worker == FIRSTNODE) {
323 MPI_Send(info_lib_range,MAX_FN,MPI_BYTE,0,MSEQTYPE0,MPI_COMM_WORLD);
324 }
325 #endif
326 #endif
327
328 /* do the aa0[] stuff after m_msg/my_pst are initialized, for later
329 inclusion in a loop */
330
331 #ifdef PCOMPLIB
332 #ifdef MPI_SRC
333 MPI_Recv(int_msg_b,2,MPI_INT,0,
334 QSEQTYPE0, MPI_COMM_WORLD, &mpi_status);
335
336 n0 = int_msg_b[0];
337 nm0 = int_msg_b[1];
338 #endif
339 #else /* COMP_THR */
340 n0 = my_msp->n0;
341 nm0 = my_msp->nm0;
342 if (my_worker != FIRSTNODE) {
343 /* if this is a pssm search, allocate local copy of pam2p[][]*/
344 if (work_info->ppst->pam_pssm && work_info->ppst->pam2p[0]) {
345 my_ppst->pam2p[0] = alloc_pam2p(my_ppst->pam2p[0],n0,npam);
346 my_ppst->pam2p[1] = alloc_pam2p(my_ppst->pam2p[1],n0,npam);
347
348 for (i=0; i<n0; i++) {
349 for (j=0; j < npam; j++) {
350 my_pst.pam2p[0][i][j] = work_info->ppst->pam2p[0][i][j];
351 my_pst.pam2p[1][i][j] = work_info->ppst->pam2p[1][i][j];
352 }
353 }
354 }
355 }
356 #endif
357
358 if ((aa0[0]=(unsigned char *)calloc((size_t)n0+2+SEQ_PAD,sizeof(unsigned char)))
359 ==NULL) {
360 fprintf(stderr," cannot allocate aa00[%d] for worker %d\n",
361 n0, my_worker);
362 exit(1);
363 }
364 *aa0[0]='\0';
365 aa0[0]++;
366
367 #ifndef PCOMPLIB
368 memcpy(aa0[0],work_info->aa0,n0+1);
369 #else
370 #ifdef MPI_SRC
371 /* get aa0[0] from host */
372 MPI_Recv(aa0[0],n0+1,MPI_BYTE,0,
373 QSEQTYPE1,MPI_COMM_WORLD, &mpi_status);
374
375 /* also get annotation if available */
376 if (my_msp->ann_flg && my_msp->aa0a != NULL) {
377 if ((my_msp->aa0a = (unsigned char *)calloc(my_msp->n0+2,sizeof(char)))==NULL) {
378 fprintf(stderr, "*** error -- cannot allocate annotation array\n");
379 exit(1);
380 }
381 MPI_Recv(my_msp->aa0a, (my_msp->n0+2)*sizeof(char), MPI_BYTE, 0,
382 QSEQTYPE1, MPI_COMM_WORLD, &mpi_status);
383 }
384 #endif
385 #endif
386
387 init_aa0(aa0, n0, nm0, &aa0s, &aa1s,
388 my_msp->qframe, my_msp->qshuffle, my_msp->max_tot,
389 my_ppst, &f_str[0], &qf_str, my_rand_state);
390
391 /* **************************************************************** */
392 /* main work loop */
393
394 while (get_wbuf(&cur_buf,max_work_buf)) {
395
396 if (cur_buf->hdr.stop_work) break;
397
398 /* exit thread on specific command -- this option is not used
399 for threads - get_wbuf() stops when rbuf_done() sets reader_done==1
400 but it is used for PCOMPLIB
401 */
402
403 if (cur_buf->hdr.buf2_cnt <= 0) { /* buffers can be empty */
404 cur_buf->hdr.have_results = 0;
405 goto res_done;
406 }
407
408 if (cur_buf->hdr.buf2_type & BUF2_DOWORK) {
409
410 buf_do_work(aa0, n0, cur_buf, my_msp->nitt1, my_ppst, f_str);
411
412 if (my_msp->qshuffle) {
413 buf_qshuf_work(aa0s, n0, cur_buf, my_msp->nitt1,
414 my_ppst, qf_str, my_ppst->score_ix);
415 }
416 }
417
418 if (cur_buf->hdr.buf2_type & BUF2_DOSHUF) {
419 buf_shuf_work(aa0, n0, aa1s, cur_buf, my_msp->nitt1,
420 my_ppst, f_str, my_ppst->score_ix, my_rand_state);
421 }
422
423 /*
424 if (cur_buf->hdr.buf2_type & BUF2_DOOPT) {
425 buf_do_opt(aa0, n0, cur_buf, my_ppst, f_str);
426 }
427 */
428
429 if (cur_buf->hdr.buf2_type & BUF2_DOALIGN) {
430 buf_do_align(aa0, n0, cur_buf, my_ppst, my_msp, f_str);
431 }
432 cur_buf->hdr.have_results = 1;
433
434 res_done:
435 cur_buf->hdr.have_data = 0;
436
437 put_wbuf(cur_buf,max_work_buf);
438
439 } /* end main while */
440
441 /* **************************************************************** */
442 /* all done - clean-up */
443
444 close_work(aa0[0], n0, my_ppst, &f_str[0]);
445 free(aa0[0]-1);
446 if (my_msp->qframe == 2) {
447 close_work(aa0[1], n0, my_ppst, &f_str[1]);
448 free(aa0[1]-1);
449 }
450
451 if (my_msp->qshuffle) {
452 close_work(aa0s, n0, my_ppst, &qf_str);
453 free(aa0s-1);
454 }
455
456 free(aa1s-1);
457
458 #ifdef PCOMPLIB
459 if (my_msp->ann_flg && my_msp->aa0a) { free(my_msp->aa0a);}
460 #endif
461
462 if (my_worker) {
463 free(my_pst.pam2[1][0]);
464 free(my_pst.pam2[0][0]);
465 free(my_pst.pam2[1]);
466 free(my_pst.pam2[0]);
467 }
468
469 if (my_worker && my_pst.pam_pssm) {
470 free_pam2p(my_pst.pam2p[0]);
471 free_pam2p(my_pst.pam2p[1]);
472 }
473
474 /* **************************************************************** */
475 /* and exit */
476
477 #ifdef DEBUG
478 /* fprintf(stderr,"worker [%d] done\n",my_worker); */
479 #endif
480
481 #ifndef PCOMPLIB
482 free(my_rand_state);
483 THR_EXIT(&work_info->status);
484 #else
485 /* the PCOMPLIB version loops after a search, waiting for another max_work_buf */
486 /* max_work_buf==0 signals end of queries */
487 goto pcomp_loop;
488
489 pcomp_final:
490 free(my_rand_state);
491 #endif
492 } /* end work_thread */
493