1 /*  $Id: work_thr2.c $ */
2 
/* copyright (c) 1996, 1997, 1998, 1999, 2014 by William R. Pearson
   and The Rector & Visitors of the University of Virginia */
5 
/* Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing,
   software distributed under this License is distributed on an "AS
   IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
   express or implied.  See the License for the specific language
   governing permissions and limitations under the License.
*/
18 
/* work_thr2.c - threaded worker */
20 
21 /* modified 21-Oct-1998 to work with reverse complement for DNA */
22 
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <time.h>
27 #include <sys/types.h>
28 #include <signal.h>
29 
30 #include "defs.h"		/* various constants */
31 #include "best_stats.h"			/* defines beststr */
32 #include "structs.h"
33 #include "param.h"		/* pstruct rstruct */
34 #include "thr_buf_structs.h"
35 
36 /***************************************/
37 /* thread global variable declarations */
38 /***************************************/
39 
40 #ifndef PCOMPLIB
41 #define XTERNAL
42 #include "thr_bufs2.h"
43 #undef XTERNAL
44 #else
45 #include "msg.h"
46 #define XTERNAL
47 #include "uascii.h"
48 #undef XTERNAL
49 #ifdef MPI_SRC
50 #include "mpi.h"
51 #endif
52 #endif
53 
/* scoring-matrix allocation and reverse-complement helpers
   (defined in other source files) */
void alloc_pam(int d1, int d2, struct pstruct *ppst);
int **alloc_pam2p(int **pam2p, int len, int nsq);
void revcomp(unsigned char *seq, int n, int *c_nt);

/* THR_EXIT names the routine a worker calls to terminate itself.
   It maps onto pthread_exit() on WIN32 or whenever the build did not
   supply its own THR_EXIT; otherwise the build-supplied routine is
   simply declared here. */
#if !defined(THR_EXIT) || defined(WIN32)
void pthread_exit(void *value_ptr);
#define THR_EXIT pthread_exit
#else
void THR_EXIT(void *value_ptr);
#endif

/* list of library buffers, only visible in DEBUG builds */
#if defined(DEBUG)
extern struct buf_head *lib_buf2_list;
#endif
68 
69 /* functions getting/sending buffers to threads (thr_sub.c) */
70 extern void wait_thr(void);
71 extern int get_wbuf(struct buf_head **cur_buf, int max_work_buf);
72 extern void put_wbuf(struct buf_head *cur_buf, int max_work_buf);
73 
74 /* dropxx.c functions */
75 #include "drop_func.h"
76 
77 extern void *my_srand();
78 extern unsigned int my_nrand(int, void *);
79 extern void qshuffle(unsigned char *aa0, int n0, int nm0, void *);
80 extern void free_pam2p(int **);
81 
82 void init_aa0(unsigned char **aa0, int n0, int nm0,
83 	      unsigned char **aa0s, unsigned char **aa1s,
84 	      int qframe, int qshuffle_flg, int max_tot,
85 	      struct pstruct *ppst, void **f_str, void **qf_str,
86 	      void *my_rand_state);
87 
88 extern void
89 buf_do_work(unsigned char **aa0, int n0, struct buf_head *lib_bhead_p,
90 	    int max_frame, struct pstruct *ppst, void **f_str);
91 extern void
92 buf_qshuf_work(unsigned char *aa0s, int n0, struct buf_head *lib_bhead_p,
93 	       int max_frame, struct pstruct *ppst, void *qf_str, int score_ix);
94 extern void
95 buf_shuf_work(unsigned char **aa0, int n0, unsigned char *aa1s,
96 	      struct buf_head *lib_bhead_p, int max_frame, struct pstruct *ppst,
97 	      void **f_str, int score_ix, void *);
98 
99 void
100 buf_do_align(unsigned char **aa0,  int n0,
101 	     struct buf_head *lib_bhead_p,
102 	     struct pstruct *ppst, const struct mngmsg *my_msp,
103 	     void **f_str);
104 
105 #ifndef PCOMPLIB
106 #define FIRSTNODE 0
107 void
work_thread(struct thr_str * work_info)108 work_thread (struct thr_str *work_info)
109 #else
110 #if defined(TFAST)
111 extern void aainit(int tr_type, int debug);
112 #endif
113 
114 int g_worker;
115 
116 void work_comp(int my_worker)
117 #endif
118 {
119   struct buf_head *cur_buf, *my_cur_buf;
120   char info_lib_range[MAX_FN];
121   unsigned char *aa1s=NULL;
122 #ifndef PCOMPLIB
123 
124   const struct mngmsg *my_msp;
125   int my_worker;
126 #else
127 #ifdef MPI_SRC
128   struct mngmsg *my_msp;
129   MPI_Status mpi_status;
130   int buf_alloc_flag = 0;
131 #endif
132   struct mngmsg my_msg;
133   int int_msg_b[4];
134   struct buf2_data_s *my_buf2_data;
135   struct buf2_res_s *my_buf2_res;
136   struct buf2_ares_s *my_buf2_ares;
137   struct seq_record *my_seq_buf;
138   unsigned char *my_aa1b_buf;
139 #endif
140   int i, j, npam, n0, nm0;
141   int max_work_buf, max_buf2_res, max_chain_seqs, seq_buf_size;
142   void *my_rand_state;
143 
144   struct pstruct my_pst, *my_ppst;
145   unsigned char *aa0[6], *aa0s;
146   void *f_str[6], *qf_str;
147 
148   my_rand_state=my_srand();
149 
150 #ifndef PCOMPLIB
151   my_worker = work_info->worker;
152   max_work_buf = work_info->max_work_buf;
153   wait_thr();	/* wait for start_thread predicate to drop to  0 */
154 
155   my_msp = work_info->m_msp;
156 #else 	/* PCOMPLIB */
157 
158 #ifdef DEBUG
159 /*  fprintf(stderr,"%d: work_comp started\n",my_worker); */
160 #endif
161   g_worker = my_worker;
162   my_msp = &my_msg;
163 
164 #ifdef MPI_SRC
165  pcomp_loop:
166 
167   MPI_Recv(int_msg_b,4,MPI_INT,0, STARTTYPE0,MPI_COMM_WORLD,
168 	   &mpi_status);
169 
170   max_work_buf = int_msg_b[0];
171   max_buf2_res = int_msg_b[1];
172   max_chain_seqs = int_msg_b[2];
173   seq_buf_size = int_msg_b[3];
174 
175   /* quit the main loop with a message of 0 max_work_buf */
176   if (max_work_buf == 0) { goto pcomp_final;}
177 
178   MPI_Recv((void *)my_msp,sizeof(struct mngmsg),MPI_BYTE,0,STARTTYPE1,MPI_COMM_WORLD,
179 	   &mpi_status);
180 
181   MPI_Recv((void *)&my_pst,(int)sizeof(struct pstruct),MPI_BYTE,0,STARTTYPE2,MPI_COMM_WORLD,
182 	   &mpi_status);
183   my_ppst = &my_pst;
184 
185 #endif	/* MPI_SRC */
186 
187   if (!buf_alloc_flag) {
188     buf_alloc_flag = 1;
189     /* must allocate buffers for data, sequences, results */
190     if ((my_cur_buf = cur_buf = (struct buf_head *)calloc(1,sizeof(struct buf_head)))==NULL) {
191       fprintf(stderr,"cannot allocate buf_head\n");
192       exit(1);
193     }
194 
195     /* allocate results array */
196     if ((my_buf2_res = (struct buf2_res_s*)calloc(max_buf2_res+1,sizeof(struct buf2_res_s)))==NULL) {
197       fprintf(stderr,"cannot allocate buf2_data[%d]\n",max_buf2_res);
198       exit(1);
199     }
200     cur_buf->buf2_res = my_buf2_res;
201 
202     /* allocate buffers for ares alignment encodings */
203     if ((my_buf2_ares = (struct buf2_ares_s*)calloc(max_buf2_res+1,sizeof(struct buf2_ares_s)))==NULL) {
204       fprintf(stderr,"cannot allocate buf2_data[%d]\n",max_buf2_res);
205       exit(1);
206     }
207     cur_buf->buf2_ares = my_buf2_ares;
208 
209     /* allocate buffers for data */
210     if ((my_buf2_data = (struct buf2_data_s*)calloc(max_buf2_res+1,sizeof(struct buf2_data_s)))==NULL) {
211       fprintf(stderr,"cannot allocate buf2_data[%d]\n",max_buf2_res);
212       exit(1);
213     }
214     cur_buf->buf2_data = my_buf2_data;
215 
216     /* also must allocate seq_records */
217     if ((my_seq_buf =
218 	 (struct seq_record *)calloc((size_t)(max_buf2_res+1), sizeof(struct seq_record)))
219         ==NULL) {
220       fprintf(stderr,"%d: cannot allocate seq_record buffer[%d]\n",my_worker,max_buf2_res+1);
221       exit(1);
222     }
223     cur_buf->buf2_data[0].seq = cur_buf->hdr.seq_b = my_seq_buf;
224 
225     if ((my_aa1b_buf = (unsigned char *)calloc((size_t)(seq_buf_size+1),sizeof(unsigned char)))
226         ==NULL) {
227       fprintf(stderr,"%d: cannot allocate sequence buffer[%d]\n",my_worker, seq_buf_size);
228       exit(1);
229     }
230     else {	  /* now associate the my_aa1b_buf with cur_buf */
231       my_aa1b_buf++;
232       cur_buf->hdr.aa1b_start = cur_buf->buf2_data[0].seq->aa1b = my_aa1b_buf;
233       cur_buf->hdr.aa1b_size = seq_buf_size;
234    }
235   }
236   else {
237     cur_buf = my_cur_buf;
238     cur_buf->buf2_data = my_buf2_data;
239     cur_buf->buf2_data[0].seq = cur_buf->hdr.seq_b = my_seq_buf;
240     cur_buf->buf2_res = my_buf2_res;
241     cur_buf->buf2_ares = my_buf2_ares;
242     cur_buf->hdr.aa1b_start = cur_buf->buf2_data[0].seq->aa1b = my_aa1b_buf;
243     cur_buf->hdr.aa1b_size = seq_buf_size;
244   }
245 
246 #if defined(TFAST)
247     /* set up translation tables: faatran.c */
248   aainit(my_ppst->tr_type,my_ppst->debug_lib);
249 #endif
250 
251 #endif	/* PCOMPLIB */
252 
253   /* the pam allocation stuff is very different for threaded vs PCOMPLIB,
254      so the code is separate */
255 #if !defined(PCOMPLIB)
256   /* make certain that all but 0 have their own copy of pst */
257   if (my_worker== 0) {
258     my_ppst=work_info->ppst;
259   }
260   else {
261     my_ppst = &my_pst;
262     memcpy(my_ppst,work_info->ppst,sizeof(struct pstruct));
263     /* #else we already have the stuff in my_pst from initialization */
264 
265     my_ppst->pam2p[0] = my_ppst->pam2p[1] = NULL;
266 
267     alloc_pam(MAXSQ, MAXSQ, my_ppst);
268 
269     npam = my_pst.nsqx;
270 
271     /* allocate local copy of pam2[][] */
272     for (i=0; i<npam; i++) {
273       for (j=0; j<npam; j++) {
274 	my_pst.pam2[0][i][j] = work_info->ppst->pam2[0][i][j];
275 	my_pst.pam2[1][i][j] = work_info->ppst->pam2[1][i][j];
276       }
277     }
278   }
279 #endif
280 #if defined(PCOMPLIB)	/* PCOMPLIB */
281   my_ppst = &my_pst;	/* for all workers */
282   alloc_pam(my_msg.pamd1,my_msg.pamd2,my_ppst);
283 #ifdef MPI_SRC
284   MPI_Recv(&my_pst.pam2[0][0][0],my_msg.pamd1*my_msg.pamd2,MPI_INT,0,
285 	   STARTTYPE3, MPI_COMM_WORLD,&mpi_status);
286 
287   MPI_Recv(&my_pst.pam2[1][0][0],my_msg.pamd1*my_msg.pamd2,MPI_INT,0,
288 	   STARTTYPE3, MPI_COMM_WORLD,&mpi_status);
289   /* no code for profiles */
290 
291   /* get pascii (only for fasty/tfasty */
292   pascii = aascii;
293   MPI_Recv(pascii, sizeof(aascii), MPI_BYTE, 0, STARTTYPE4, MPI_COMM_WORLD, &mpi_status);
294 #endif
295 #endif
296 
297   /* fill in info_lib_range */
298   if (my_worker == FIRSTNODE) {
299     /* label library size limits */
300     if (my_ppst->n1_low > 0 && my_ppst->n1_high < BIGNUM) {
301       sprintf(info_lib_range," (range: %d-%d)",my_ppst->n1_low,my_ppst->n1_high);}
302     else if (my_ppst->n1_low > 0) {
303       sprintf(info_lib_range," (range: >%d)",my_ppst->n1_low);}
304     else if (my_ppst->n1_high < BIGNUM) {
305       sprintf(info_lib_range," (range: <%d)",my_ppst->n1_high);}
306     else {
307       info_lib_range[0]='\0';
308     }
309     info_lib_range[sizeof(info_lib_range)-1]='\0';
310 #ifndef PCOMPLIB
311     strncpy(work_info->info_lib_range,info_lib_range,MAX_SSTR);
312     /* this does not work on some architectures */
313     work_info->f_str_ap = &f_str[0];
314 #endif
315   }
316 
317 #ifdef PCOMPLIB
318 #ifdef MPI_SRC
319   /* send back sync message */
320   int_msg_b[0]=my_worker;
321   MPI_Send(int_msg_b,1,MPI_INT,0,MSEQTYPE0,MPI_COMM_WORLD);
322   if (my_worker == FIRSTNODE) {
323     MPI_Send(info_lib_range,MAX_FN,MPI_BYTE,0,MSEQTYPE0,MPI_COMM_WORLD);
324   }
325 #endif
326 #endif
327 
328   /* do the aa0[] stuff after m_msg/my_pst are initialized, for later
329      inclusion in a loop */
330 
331 #ifdef PCOMPLIB
332 #ifdef MPI_SRC
333   MPI_Recv(int_msg_b,2,MPI_INT,0,
334 	   QSEQTYPE0, MPI_COMM_WORLD, &mpi_status);
335 
336   n0 = int_msg_b[0];
337   nm0 = int_msg_b[1];
338 #endif
339 #else	/* COMP_THR */
340   n0 = my_msp->n0;
341   nm0 = my_msp->nm0;
342   if (my_worker != FIRSTNODE) {
343     /* if this is a pssm search, allocate local copy of pam2p[][]*/
344     if (work_info->ppst->pam_pssm && work_info->ppst->pam2p[0]) {
345       my_ppst->pam2p[0] = alloc_pam2p(my_ppst->pam2p[0],n0,npam);
346       my_ppst->pam2p[1] = alloc_pam2p(my_ppst->pam2p[1],n0,npam);
347 
348       for (i=0; i<n0; i++) {
349 	for (j=0; j < npam; j++) {
350 	  my_pst.pam2p[0][i][j] = work_info->ppst->pam2p[0][i][j];
351 	  my_pst.pam2p[1][i][j] = work_info->ppst->pam2p[1][i][j];
352 	}
353       }
354     }
355   }
356 #endif
357 
358   if ((aa0[0]=(unsigned char *)calloc((size_t)n0+2+SEQ_PAD,sizeof(unsigned char)))
359       ==NULL) {
360     fprintf(stderr," cannot allocate aa00[%d] for worker %d\n",
361 	    n0, my_worker);
362     exit(1);
363   }
364   *aa0[0]='\0';
365   aa0[0]++;
366 
367 #ifndef PCOMPLIB
368   memcpy(aa0[0],work_info->aa0,n0+1);
369 #else
370 #ifdef MPI_SRC
371   /* get aa0[0] from host */
372   MPI_Recv(aa0[0],n0+1,MPI_BYTE,0,
373 	   QSEQTYPE1,MPI_COMM_WORLD, &mpi_status);
374 
375   /* also get annotation if available */
376   if (my_msp->ann_flg && my_msp->aa0a != NULL) {
377     if ((my_msp->aa0a = (unsigned char *)calloc(my_msp->n0+2,sizeof(char)))==NULL) {
378       fprintf(stderr, "*** error -- cannot allocate annotation array\n");
379       exit(1);
380     }
381     MPI_Recv(my_msp->aa0a, (my_msp->n0+2)*sizeof(char), MPI_BYTE, 0,
382 	     QSEQTYPE1, MPI_COMM_WORLD, &mpi_status);
383   }
384 #endif
385 #endif
386 
387   init_aa0(aa0, n0, nm0, &aa0s, &aa1s,
388 	   my_msp->qframe, my_msp->qshuffle, my_msp->max_tot,
389 	   my_ppst,  &f_str[0], &qf_str, my_rand_state);
390 
391 /* **************************************************************** */
392 /* main work loop */
393 
394   while (get_wbuf(&cur_buf,max_work_buf)) {
395 
396     if (cur_buf->hdr.stop_work) break;
397 
398     /* exit thread on specific command -- this option is not used
399        for threads - get_wbuf() stops when rbuf_done() sets reader_done==1
400        but it is used for PCOMPLIB
401     */
402 
403     if (cur_buf->hdr.buf2_cnt <= 0) {	/* buffers can be empty */
404       cur_buf->hdr.have_results = 0;
405       goto res_done;
406     }
407 
408     if (cur_buf->hdr.buf2_type & BUF2_DOWORK) {
409 
410       buf_do_work(aa0, n0, cur_buf, my_msp->nitt1, my_ppst, f_str);
411 
412       if (my_msp->qshuffle) {
413 	buf_qshuf_work(aa0s, n0, cur_buf, my_msp->nitt1,
414 		       my_ppst, qf_str, my_ppst->score_ix);
415       }
416     }
417 
418     if (cur_buf->hdr.buf2_type & BUF2_DOSHUF) {
419       buf_shuf_work(aa0, n0, aa1s,  cur_buf, my_msp->nitt1,
420 		    my_ppst, f_str, my_ppst->score_ix, my_rand_state);
421     }
422 
423     /*
424     if (cur_buf->hdr.buf2_type & BUF2_DOOPT) {
425       buf_do_opt(aa0, n0, cur_buf, my_ppst, f_str);
426     }
427     */
428 
429     if (cur_buf->hdr.buf2_type & BUF2_DOALIGN) {
430       buf_do_align(aa0, n0, cur_buf, my_ppst, my_msp, f_str);
431     }
432     cur_buf->hdr.have_results = 1;
433 
434   res_done:
435     cur_buf->hdr.have_data = 0;
436 
437     put_wbuf(cur_buf,max_work_buf);
438 
439   } /* end main while */
440 
441 /* **************************************************************** */
442 /* all done - clean-up */
443 
444   close_work(aa0[0], n0, my_ppst, &f_str[0]);
445   free(aa0[0]-1);
446   if (my_msp->qframe == 2) {
447     close_work(aa0[1], n0, my_ppst, &f_str[1]);
448     free(aa0[1]-1);
449   }
450 
451   if (my_msp->qshuffle) {
452     close_work(aa0s, n0, my_ppst, &qf_str);
453     free(aa0s-1);
454   }
455 
456   free(aa1s-1);
457 
458 #ifdef PCOMPLIB
459   if (my_msp->ann_flg && my_msp->aa0a) { free(my_msp->aa0a);}
460 #endif
461 
462   if (my_worker) {
463     free(my_pst.pam2[1][0]);
464     free(my_pst.pam2[0][0]);
465     free(my_pst.pam2[1]);
466     free(my_pst.pam2[0]);
467   }
468 
469   if (my_worker && my_pst.pam_pssm) {
470     free_pam2p(my_pst.pam2p[0]);
471     free_pam2p(my_pst.pam2p[1]);
472   }
473 
474 /* **************************************************************** */
475 /* and exit */
476 
477 #ifdef DEBUG
478   /*   fprintf(stderr,"worker [%d] done\n",my_worker); */
479 #endif
480 
481 #ifndef PCOMPLIB
482   free(my_rand_state);
483   THR_EXIT(&work_info->status);
484 #else
485   /* the PCOMPLIB version loops after a search, waiting for another max_work_buf */
486   /* max_work_buf==0 signals end of queries */
487   goto pcomp_loop;
488 
489  pcomp_final:
490   free(my_rand_state);
491 #endif
492 }  /* end work_thread */
493