1 /* hmmsearch: search profile HMM(s) against a sequence database.
2 *
3 * To do:
4 * - in MPI mode, add a check to make sure ncpus >= 2. If 1, then we
5 * only have a master, no workers. See Infernal commit r3972 on the
6 * same point; and same note in hmmscan.c's to do list.
7 */
8 #include "p7_config.h"
9
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13
14 #include "easel.h"
15 #include "esl_alphabet.h"
16 #include "esl_getopts.h"
17 #include "esl_msa.h"
18 #include "esl_msafile.h"
19 #include "esl_sq.h"
20 #include "esl_sqio.h"
21 #include "esl_stopwatch.h"
22
23 #ifdef HMMER_MPI
24 #include "mpi.h"
25 #include "esl_mpi.h"
26 #endif
27
28 #ifdef HMMER_THREADS
29 #include <unistd.h>
30 #include "esl_threads.h"
31 #include "esl_workqueue.h"
32 #endif
33
34 #include "hmmer.h"
35
36 typedef struct {
37 #ifdef HMMER_THREADS
38 ESL_WORK_QUEUE *queue;
39 #endif
40 P7_BG *bg; /* null model */
41 P7_PIPELINE *pli; /* work pipeline */
42 P7_TOPHITS *th; /* top hit results */
43 P7_OPROFILE *om; /* optimized query profile */
44 } WORKER_INFO;
45
46 #define REPOPTS "-E,-T,--cut_ga,--cut_nc,--cut_tc"
47 #define DOMREPOPTS "--domE,--domT,--cut_ga,--cut_nc,--cut_tc"
48 #define INCOPTS "--incE,--incT,--cut_ga,--cut_nc,--cut_tc"
49 #define INCDOMOPTS "--incdomE,--incdomT,--cut_ga,--cut_nc,--cut_tc"
50 #define THRESHOPTS "-E,-T,--domE,--domT,--incE,--incT,--incdomE,--incdomT,--cut_ga,--cut_nc,--cut_tc"
51
52 #if defined (HMMER_THREADS) && defined (HMMER_MPI)
53 #define CPUOPTS "--mpi"
54 #define MPIOPTS "--cpu"
55 #else
56 #define CPUOPTS NULL
57 #define MPIOPTS NULL
58 #endif
59
60 static ESL_OPTIONS options[] = {
61 /* name type default env range toggles reqs incomp help docgroup*/
62 { "-h", eslARG_NONE, FALSE, NULL, NULL, NULL, NULL, NULL, "show brief help on version and usage", 1 },
63 /* Control of output */
64 { "-o", eslARG_OUTFILE, NULL, NULL, NULL, NULL, NULL, NULL, "direct output to file <f>, not stdout", 2 },
65 { "-A", eslARG_OUTFILE, NULL, NULL, NULL, NULL, NULL, NULL, "save multiple alignment of all hits to file <f>", 2 },
66 { "--tblout", eslARG_OUTFILE, NULL, NULL, NULL, NULL, NULL, NULL, "save parseable table of per-sequence hits to file <f>", 2 },
67 { "--domtblout", eslARG_OUTFILE, NULL, NULL, NULL, NULL, NULL, NULL, "save parseable table of per-domain hits to file <f>", 2 },
68 { "--pfamtblout", eslARG_OUTFILE, NULL, NULL, NULL, NULL, NULL, NULL, "save table of hits and domains to file, in Pfam format <f>", 2 },
69 { "--acc", eslARG_NONE, FALSE, NULL, NULL, NULL, NULL, NULL, "prefer accessions over names in output", 2 },
70 { "--noali", eslARG_NONE, FALSE, NULL, NULL, NULL, NULL, NULL, "don't output alignments, so output is smaller", 2 },
71 { "--notextw", eslARG_NONE, NULL, NULL, NULL, NULL, NULL, "--textw", "unlimit ASCII text output line width", 2 },
72 { "--textw", eslARG_INT, "120", NULL, "n>=120",NULL, NULL, "--notextw", "set max width of ASCII text output lines", 2 },
73 /* Control of reporting thresholds */
74 { "-E", eslARG_REAL, "10.0", NULL, "x>0", NULL, NULL, REPOPTS, "report sequences <= this E-value threshold in output", 4 },
75 { "-T", eslARG_REAL, FALSE, NULL, NULL, NULL, NULL, REPOPTS, "report sequences >= this score threshold in output", 4 },
76 { "--domE", eslARG_REAL, "10.0", NULL, "x>0", NULL, NULL, DOMREPOPTS, "report domains <= this E-value threshold in output", 4 },
77 { "--domT", eslARG_REAL, FALSE, NULL, NULL, NULL, NULL, DOMREPOPTS, "report domains >= this score cutoff in output", 4 },
78 /* Control of inclusion (significance) thresholds */
79 { "--incE", eslARG_REAL, "0.01", NULL, "x>0", NULL, NULL, INCOPTS, "consider sequences <= this E-value threshold as significant", 5 },
80 { "--incT", eslARG_REAL, FALSE, NULL, NULL, NULL, NULL, INCOPTS, "consider sequences >= this score threshold as significant", 5 },
81 { "--incdomE", eslARG_REAL, "0.01", NULL, "x>0", NULL, NULL, INCDOMOPTS, "consider domains <= this E-value threshold as significant", 5 },
82 { "--incdomT", eslARG_REAL, FALSE, NULL, NULL, NULL, NULL, INCDOMOPTS, "consider domains >= this score threshold as significant", 5 },
83 /* Model-specific thresholding for both reporting and inclusion */
84 { "--cut_ga", eslARG_NONE, FALSE, NULL, NULL, NULL, NULL, THRESHOPTS, "use profile's GA gathering cutoffs to set all thresholding", 6 },
85 { "--cut_nc", eslARG_NONE, FALSE, NULL, NULL, NULL, NULL, THRESHOPTS, "use profile's NC noise cutoffs to set all thresholding", 6 },
86 { "--cut_tc", eslARG_NONE, FALSE, NULL, NULL, NULL, NULL, THRESHOPTS, "use profile's TC trusted cutoffs to set all thresholding", 6 },
87 /* Control of acceleration pipeline */
88 { "--max", eslARG_NONE, FALSE, NULL, NULL, NULL, NULL, "--F1,--F2,--F3", "Turn all heuristic filters off (less speed, more power)", 7 },
89 { "--F1", eslARG_REAL, "0.02", NULL, NULL, NULL, NULL, "--max", "Stage 1 (MSV) threshold: promote hits w/ P <= F1", 7 },
90 { "--F2", eslARG_REAL, "1e-3", NULL, NULL, NULL, NULL, "--max", "Stage 2 (Vit) threshold: promote hits w/ P <= F2", 7 },
91 { "--F3", eslARG_REAL, "1e-5", NULL, NULL, NULL, NULL, "--max", "Stage 3 (Fwd) threshold: promote hits w/ P <= F3", 7 },
92 { "--nobias", eslARG_NONE, NULL, NULL, NULL, NULL, NULL, "--max", "turn off composition bias filter", 7 },
93
94 /* Other options */
95 { "--nonull2", eslARG_NONE, NULL, NULL, NULL, NULL, NULL, NULL, "turn off biased composition score corrections", 12 },
96 { "-Z", eslARG_REAL, FALSE, NULL, "x>0", NULL, NULL, NULL, "set # of comparisons done, for E-value calculation", 12 },
97 { "--domZ", eslARG_REAL, FALSE, NULL, "x>0", NULL, NULL, NULL, "set # of significant seqs, for domain E-value calculation", 12 },
98 { "--seed", eslARG_INT, "42", NULL, "n>=0", NULL, NULL, NULL, "set RNG seed to <n> (if 0: one-time arbitrary seed)", 12 },
99 { "--tformat", eslARG_STRING, NULL, NULL, NULL, NULL, NULL, NULL, "assert target <seqfile> is in format <s>: no autodetection", 12 },
100
101 #ifdef HMMER_THREADS
102 { "--cpu", eslARG_INT, p7_NCPU,"HMMER_NCPU","n>=0",NULL, NULL, CPUOPTS, "number of parallel CPU workers to use for multithreads", 12 },
103 #endif
104 #ifdef HMMER_MPI
105 { "--stall", eslARG_NONE, FALSE, NULL, NULL, NULL,"--mpi", NULL, "arrest after start: for debugging MPI under gdb", 12 },
106 { "--mpi", eslARG_NONE, FALSE, NULL, NULL, NULL, NULL, MPIOPTS, "run as an MPI parallel program", 12 },
107 #endif
108
109 /* Restrict search to subset of database - hidden because these flags are
110 * (a) currently for internal use
111 * (b) probably going to change
112 * Doesn't work with MPI
113 */
114 { "--restrictdb_stkey", eslARG_STRING, "0", NULL, NULL, NULL, NULL, NULL, "Search starts at the sequence with name <s> (not with MPI)", 99 },
115 { "--restrictdb_n",eslARG_INT, "-1", NULL, NULL, NULL, NULL, NULL, "Search <j> target sequences (starting at --restrictdb_stkey)", 99 },
116 { "--ssifile", eslARG_STRING, NULL, NULL, NULL, NULL, NULL, NULL, "restrictdb_x values require ssi file. Override default to <s>", 99 },
117
118 { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 },
119 };
120
121 static char usage[] = "[options] <hmmfile> <seqdb>";
122 static char banner[] = "search profile(s) against a sequence database";
123
124 /* struct cfg_s : "Global" application configuration shared by all threads/processes
125 *
126 * This structure is passed to routines within main.c, as a means of semi-encapsulation
127 * of shared data amongst different parallel processes (threads or MPI processes).
128 */
129 struct cfg_s {
130 char *dbfile; /* target sequence database file */
131 char *hmmfile; /* query HMM file */
132
133 int do_mpi; /* TRUE if we're doing MPI parallelization */
134 int nproc; /* how many MPI processes, total */
135 int my_rank; /* who am I, in 0..nproc-1 */
136
137 char *firstseq_key; /* name of the first sequence in the restricted db range */
138 int n_targetseq; /* number of sequences in the restricted range */
139 };
140
141 static int serial_master(ESL_GETOPTS *go, struct cfg_s *cfg);
142 static int serial_loop (WORKER_INFO *info, ESL_SQFILE *dbfp, int n_targetseqs);
143
144 #ifdef HMMER_THREADS
145 #define BLOCK_SIZE 1000
146
147 static int thread_loop(ESL_THREADS *obj, ESL_WORK_QUEUE *queue, ESL_SQFILE *dbfp, int n_targetseqs);
148 static void pipeline_thread(void *arg);
149 #endif
150
151 #ifdef HMMER_MPI
152 static int mpi_master (ESL_GETOPTS *go, struct cfg_s *cfg);
153 static int mpi_worker (ESL_GETOPTS *go, struct cfg_s *cfg);
154 #endif
155
156
157 static int
process_commandline(int argc,char ** argv,ESL_GETOPTS ** ret_go,char ** ret_hmmfile,char ** ret_seqfile)158 process_commandline(int argc, char **argv, ESL_GETOPTS **ret_go, char **ret_hmmfile, char **ret_seqfile)
159 {
160 ESL_GETOPTS *go = esl_getopts_Create(options);
161 int status;
162
163 if (esl_opt_ProcessEnvironment(go) != eslOK) { if (printf("Failed to process environment: %s\n", go->errbuf) < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed"); goto FAILURE; }
164 if (esl_opt_ProcessCmdline(go, argc, argv) != eslOK) { if (printf("Failed to parse command line: %s\n", go->errbuf) < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed"); goto FAILURE; }
165 if (esl_opt_VerifyConfig(go) != eslOK) { if (printf("Failed to parse command line: %s\n", go->errbuf) < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed"); goto FAILURE; }
166
167 /* help format: */
168 if (esl_opt_GetBoolean(go, "-h") == TRUE)
169 {
170 p7_banner(stdout, argv[0], banner);
171 esl_usage(stdout, argv[0], usage);
172 if (puts("\nBasic options:") < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed");
173 esl_opt_DisplayHelp(stdout, go, 1, 2, 80); /* 1= group; 2 = indentation; 80=textwidth*/
174
175 if (puts("\nOptions directing output:") < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed");
176 esl_opt_DisplayHelp(stdout, go, 2, 2, 80);
177
178 if (puts("\nOptions controlling reporting thresholds:") < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed");
179 esl_opt_DisplayHelp(stdout, go, 4, 2, 80);
180
181 if (puts("\nOptions controlling inclusion (significance) thresholds:") < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed");
182 esl_opt_DisplayHelp(stdout, go, 5, 2, 80);
183
184 if (puts("\nOptions controlling model-specific thresholding:") < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed");
185 esl_opt_DisplayHelp(stdout, go, 6, 2, 80);
186
187 if (puts("\nOptions controlling acceleration heuristics:") < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed");
188 esl_opt_DisplayHelp(stdout, go, 7, 2, 80);
189
190 if (puts("\nOther expert options:") < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed");
191 esl_opt_DisplayHelp(stdout, go, 12, 2, 80);
192 exit(0);
193 }
194
195 if (esl_opt_ArgNumber(go) != 2) { if (puts("Incorrect number of command line arguments.") < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed"); goto FAILURE; }
196 if ((*ret_hmmfile = esl_opt_GetArg(go, 1)) == NULL) { if (puts("Failed to get <hmmfile> argument on command line") < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed"); goto FAILURE; }
197 if ((*ret_seqfile = esl_opt_GetArg(go, 2)) == NULL) { if (puts("Failed to get <seqdb> argument on command line") < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed"); goto FAILURE; }
198
199 /* Validate any attempted use of stdin streams */
200 if (strcmp(*ret_hmmfile, "-") == 0 && strcmp(*ret_seqfile, "-") == 0)
201 { if (puts("Either <hmmfile> or <seqdb> may be '-' (to read from stdin), but not both.") < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed"); goto FAILURE; }
202
203 *ret_go = go;
204 return eslOK;
205
206 FAILURE: /* all errors handled here are user errors, so be polite. */
207 esl_usage(stdout, argv[0], usage);
208 if (puts("\nwhere most common options are:") < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed");
209 esl_opt_DisplayHelp(stdout, go, 1, 2, 80); /* 1= group; 2 = indentation; 80=textwidth*/
210 if (printf("\nTo see more help on available options, do %s -h\n\n", argv[0]) < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed");
211 esl_getopts_Destroy(go);
212 exit(1);
213
214 ERROR:
215 if (go) esl_getopts_Destroy(go);
216 exit(status);
217 }
218
219 static int
output_header(FILE * ofp,const ESL_GETOPTS * go,char * hmmfile,char * seqfile)220 output_header(FILE *ofp, const ESL_GETOPTS *go, char *hmmfile, char *seqfile)
221 {
222 p7_banner(ofp, go->argv[0], banner);
223
224 if (fprintf(ofp, "# query HMM file: %s\n", hmmfile) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
225 if (fprintf(ofp, "# target sequence database: %s\n", seqfile) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
226 if (esl_opt_IsUsed(go, "-o") && fprintf(ofp, "# output directed to file: %s\n", esl_opt_GetString(go, "-o")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
227 if (esl_opt_IsUsed(go, "-A") && fprintf(ofp, "# MSA of all hits saved to file: %s\n", esl_opt_GetString(go, "-A")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
228 if (esl_opt_IsUsed(go, "--tblout") && fprintf(ofp, "# per-seq hits tabular output: %s\n", esl_opt_GetString(go, "--tblout")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
229 if (esl_opt_IsUsed(go, "--domtblout") && fprintf(ofp, "# per-dom hits tabular output: %s\n", esl_opt_GetString(go, "--domtblout")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
230 if (esl_opt_IsUsed(go, "--pfamtblout") && fprintf(ofp, "# pfam-style tabular hit output: %s\n", esl_opt_GetString(go, "--pfamtblout")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
231 if (esl_opt_IsUsed(go, "--acc") && fprintf(ofp, "# prefer accessions over names: yes\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
232 if (esl_opt_IsUsed(go, "--noali") && fprintf(ofp, "# show alignments in output: no\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
233 if (esl_opt_IsUsed(go, "--notextw") && fprintf(ofp, "# max ASCII text line length: unlimited\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
234 if (esl_opt_IsUsed(go, "--textw") && fprintf(ofp, "# max ASCII text line length: %d\n", esl_opt_GetInteger(go, "--textw")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
235 if (esl_opt_IsUsed(go, "-E") && fprintf(ofp, "# sequence reporting threshold: E-value <= %g\n", esl_opt_GetReal(go, "-E")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
236 if (esl_opt_IsUsed(go, "-T") && fprintf(ofp, "# sequence reporting threshold: score >= %g\n", esl_opt_GetReal(go, "-T")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
237 if (esl_opt_IsUsed(go, "--domE") && fprintf(ofp, "# domain reporting threshold: E-value <= %g\n", esl_opt_GetReal(go, "--domE")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
238 if (esl_opt_IsUsed(go, "--domT") && fprintf(ofp, "# domain reporting threshold: score >= %g\n", esl_opt_GetReal(go, "--domT")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
239 if (esl_opt_IsUsed(go, "--incE") && fprintf(ofp, "# sequence inclusion threshold: E-value <= %g\n", esl_opt_GetReal(go, "--incE")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
240 if (esl_opt_IsUsed(go, "--incT") && fprintf(ofp, "# sequence inclusion threshold: score >= %g\n", esl_opt_GetReal(go, "--incT")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
241 if (esl_opt_IsUsed(go, "--incdomE") && fprintf(ofp, "# domain inclusion threshold: E-value <= %g\n", esl_opt_GetReal(go, "--incdomE")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
242 if (esl_opt_IsUsed(go, "--incdomT") && fprintf(ofp, "# domain inclusion threshold: score >= %g\n", esl_opt_GetReal(go, "--incdomT")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
243 if (esl_opt_IsUsed(go, "--cut_ga") && fprintf(ofp, "# model-specific thresholding: GA cutoffs\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
244 if (esl_opt_IsUsed(go, "--cut_nc") && fprintf(ofp, "# model-specific thresholding: NC cutoffs\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
245 if (esl_opt_IsUsed(go, "--cut_tc") && fprintf(ofp, "# model-specific thresholding: TC cutoffs\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
246 if (esl_opt_IsUsed(go, "--max") && fprintf(ofp, "# Max sensitivity mode: on [all heuristic filters off]\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
247 if (esl_opt_IsUsed(go, "--F1") && fprintf(ofp, "# MSV filter P threshold: <= %g\n", esl_opt_GetReal(go, "--F1")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
248 if (esl_opt_IsUsed(go, "--F2") && fprintf(ofp, "# Vit filter P threshold: <= %g\n", esl_opt_GetReal(go, "--F2")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
249 if (esl_opt_IsUsed(go, "--F3") && fprintf(ofp, "# Fwd filter P threshold: <= %g\n", esl_opt_GetReal(go, "--F3")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
250 if (esl_opt_IsUsed(go, "--nobias") && fprintf(ofp, "# biased composition HMM filter: off\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
251 if (esl_opt_IsUsed(go, "--restrictdb_stkey") && fprintf(ofp, "# Restrict db to start at seq key: %s\n", esl_opt_GetString(go, "--restrictdb_stkey")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
252 if (esl_opt_IsUsed(go, "--restrictdb_n") && fprintf(ofp, "# Restrict db to # target seqs: %d\n", esl_opt_GetInteger(go, "--restrictdb_n")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
253 if (esl_opt_IsUsed(go, "--ssifile") && fprintf(ofp, "# Override ssi file to: %s\n", esl_opt_GetString(go, "--ssifile")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
254
255 if (esl_opt_IsUsed(go, "--nonull2") && fprintf(ofp, "# null2 bias corrections: off\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
256 if (esl_opt_IsUsed(go, "-Z") && fprintf(ofp, "# sequence search space set to: %.0f\n", esl_opt_GetReal(go, "-Z")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
257 if (esl_opt_IsUsed(go, "--domZ") && fprintf(ofp, "# domain search space set to: %.0f\n", esl_opt_GetReal(go, "--domZ")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
258 if (esl_opt_IsUsed(go, "--seed")) {
259 if (esl_opt_GetInteger(go, "--seed") == 0 && fprintf(ofp, "# random number seed: one-time arbitrary\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
260 else if ( fprintf(ofp, "# random number seed set to: %d\n", esl_opt_GetInteger(go, "--seed")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
261 }
262 if (esl_opt_IsUsed(go, "--tformat") && fprintf(ofp, "# targ <seqfile> format asserted: %s\n", esl_opt_GetString(go, "--tformat")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
263 #ifdef HMMER_THREADS
264 if (esl_opt_IsUsed(go, "--cpu") && fprintf(ofp, "# number of worker threads: %d\n", esl_opt_GetInteger(go, "--cpu")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
265 #endif
266 #ifdef HMMER_MPI
267 if (esl_opt_IsUsed(go, "--mpi") && fprintf(ofp, "# MPI: on\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
268 #endif
269 if (fprintf(ofp, "# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -\n\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
270 return eslOK;
271 }
272
273 int
main(int argc,char ** argv)274 main(int argc, char **argv)
275 {
276 ESL_GETOPTS *go = NULL;
277 struct cfg_s cfg;
278 int status = eslOK;
279
280 impl_Init(); /* processor specific initialization */
281 p7_FLogsumInit(); /* we're going to use table-driven Logsum() approximations at times */
282
283 /* Initialize what we can in the config structure (without knowing the alphabet yet)
284 */
285 cfg.hmmfile = NULL;
286 cfg.dbfile = NULL;
287 cfg.do_mpi = FALSE; /* this gets reset below, if we init MPI */
288 cfg.nproc = 0; /* this gets reset below, if we init MPI */
289 cfg.my_rank = 0; /* this gets reset below, if we init MPI */
290 cfg.firstseq_key = NULL;
291 cfg.n_targetseq = -1;
292
293 process_commandline(argc, argv, &go, &cfg.hmmfile, &cfg.dbfile);
294
295 /* is the range restricted? */
296 if (esl_opt_IsUsed(go, "--restrictdb_stkey") )
297 if ((cfg.firstseq_key = esl_opt_GetString(go, "--restrictdb_stkey")) == NULL) p7_Fail("Failure capturing --restrictdb_stkey\n");
298
299 if (esl_opt_IsUsed(go, "--restrictdb_n") )
300 cfg.n_targetseq = esl_opt_GetInteger(go, "--restrictdb_n");
301
302 if ( cfg.n_targetseq != -1 && cfg.n_targetseq < 1 )
303 p7_Fail("--restrictdb_n must be >= 1\n");
304
305
306 /* Figure out who we are, and send control there:
307 * we might be an MPI master, an MPI worker, or a serial program.
308 */
309 #ifdef HMMER_MPI
310 /* pause the execution of the programs execution until the user has a
311 * chance to attach with a debugger and send a signal to resume execution
312 * i.e. (gdb) signal SIGCONT
313 */
314 if (esl_opt_GetBoolean(go, "--stall")) pause();
315
316 if (esl_opt_GetBoolean(go, "--mpi"))
317 {
318 cfg.do_mpi = TRUE;
319 MPI_Init(&argc, &argv);
320 MPI_Comm_rank(MPI_COMM_WORLD, &(cfg.my_rank));
321 MPI_Comm_size(MPI_COMM_WORLD, &(cfg.nproc));
322
323 if (cfg.my_rank > 0) status = mpi_worker(go, &cfg);
324 else status = mpi_master(go, &cfg);
325
326 MPI_Finalize();
327 }
328 else
329 #endif /*HMMER_MPI*/
330 {
331 status = serial_master(go, &cfg);
332 }
333
334 esl_getopts_Destroy(go);
335
336 return status;
337 }
338
339
340 /* serial_master()
341 * The serial version of hmmsearch.
342 * For each query HMM in <hmmfile> search the database for hits.
343 *
344 * A master can only return if it's successful. All errors are handled
345 * immediately and fatally with p7_Fail(). We also use the
346 * ESL_EXCEPTION and ERROR: mechanisms, but only because we know we're
347 * using a fatal exception handler.
348 */
349 static int
serial_master(ESL_GETOPTS * go,struct cfg_s * cfg)350 serial_master(ESL_GETOPTS *go, struct cfg_s *cfg)
351 {
352 FILE *ofp = stdout; /* results output file (-o) */
353 FILE *afp = NULL; /* alignment output file (-A) */
354 FILE *tblfp = NULL; /* output stream for tabular per-seq (--tblout) */
355 FILE *domtblfp = NULL; /* output stream for tabular per-dom (--domtblout) */
356 FILE *pfamtblfp= NULL; /* output stream for pfam tabular output (--pfamtblout) */
357 P7_HMMFILE *hfp = NULL; /* open input HMM file */
358 ESL_SQFILE *dbfp = NULL; /* open input sequence file */
359 P7_HMM *hmm = NULL; /* one HMM query */
360 ESL_ALPHABET *abc = NULL; /* digital alphabet */
361 int dbfmt = eslSQFILE_UNKNOWN; /* format code for sequence database file */
362 ESL_STOPWATCH *w;
363 int textw = 0;
364 int nquery = 0;
365 int status = eslOK;
366 int hstatus = eslOK;
367 int sstatus = eslOK;
368 int i;
369
370 int ncpus = 0;
371
372 int infocnt = 0;
373 WORKER_INFO *info = NULL;
374 #ifdef HMMER_THREADS
375 ESL_SQ_BLOCK *block = NULL;
376 ESL_THREADS *threadObj= NULL;
377 ESL_WORK_QUEUE *queue = NULL;
378 #endif
379 char errbuf[eslERRBUFSIZE];
380
381 w = esl_stopwatch_Create();
382
383 if (esl_opt_GetBoolean(go, "--notextw")) textw = 0;
384 else textw = esl_opt_GetInteger(go, "--textw");
385
386 if (esl_opt_IsOn(go, "--tformat")) {
387 dbfmt = esl_sqio_EncodeFormat(esl_opt_GetString(go, "--tformat"));
388 if (dbfmt == eslSQFILE_UNKNOWN) p7_Fail("%s is not a recognized sequence database file format\n", esl_opt_GetString(go, "--tformat"));
389 }
390
391 /* Open the target sequence database */
392 status = esl_sqfile_Open(cfg->dbfile, dbfmt, p7_SEQDBENV, &dbfp);
393 if (status == eslENOTFOUND) p7_Fail("Failed to open sequence file %s for reading\n", cfg->dbfile);
394 else if (status == eslEFORMAT) p7_Fail("Sequence file %s is empty or misformatted\n", cfg->dbfile);
395 else if (status == eslEINVAL) p7_Fail("Can't autodetect format of a stdin or .gz seqfile");
396 else if (status != eslOK) p7_Fail("Unexpected error %d opening sequence file %s\n", status, cfg->dbfile);
397
398
399 if (esl_opt_IsUsed(go, "--restrictdb_stkey") || esl_opt_IsUsed(go, "--restrictdb_n")) {
400 if (esl_opt_IsUsed(go, "--ssifile"))
401 esl_sqfile_OpenSSI(dbfp, esl_opt_GetString(go, "--ssifile"));
402 else
403 esl_sqfile_OpenSSI(dbfp, NULL);
404 }
405
406
407
408 /* Open the query profile HMM file */
409 status = p7_hmmfile_OpenE(cfg->hmmfile, NULL, &hfp, errbuf);
410 if (status == eslENOTFOUND) p7_Fail("File existence/permissions problem in trying to open HMM file %s.\n%s\n", cfg->hmmfile, errbuf);
411 else if (status == eslEFORMAT) p7_Fail("File format problem in trying to open HMM file %s.\n%s\n", cfg->hmmfile, errbuf);
412 else if (status != eslOK) p7_Fail("Unexpected error %d in opening HMM file %s.\n%s\n", status, cfg->hmmfile, errbuf);
413
414 /* Open the results output files */
415 if (esl_opt_IsOn(go, "-o")) { if ((ofp = fopen(esl_opt_GetString(go, "-o"), "w")) == NULL) p7_Fail("Failed to open output file %s for writing\n", esl_opt_GetString(go, "-o")); }
416 if (esl_opt_IsOn(go, "-A")) { if ((afp = fopen(esl_opt_GetString(go, "-A"), "w")) == NULL) p7_Fail("Failed to open alignment file %s for writing\n", esl_opt_GetString(go, "-A")); }
417 if (esl_opt_IsOn(go, "--tblout")) { if ((tblfp = fopen(esl_opt_GetString(go, "--tblout"), "w")) == NULL) esl_fatal("Failed to open tabular per-seq output file %s for writing\n", esl_opt_GetString(go, "--tblout")); }
418 if (esl_opt_IsOn(go, "--domtblout")) { if ((domtblfp = fopen(esl_opt_GetString(go, "--domtblout"), "w")) == NULL) esl_fatal("Failed to open tabular per-dom output file %s for writing\n", esl_opt_GetString(go, "--domtblout")); }
419 if (esl_opt_IsOn(go, "--pfamtblout")){ if ((pfamtblfp = fopen(esl_opt_GetString(go, "--pfamtblout"), "w")) == NULL) esl_fatal("Failed to open pfam-style tabular output file %s for writing\n", esl_opt_GetString(go, "--pfamtblout")); }
420
421 #ifdef HMMER_THREADS
422 /* initialize thread data */
423 ncpus = ESL_MIN( esl_opt_GetInteger(go, "--cpu"), esl_threads_GetCPUCount());
424 if (ncpus > 0)
425 {
426 threadObj = esl_threads_Create(&pipeline_thread);
427 queue = esl_workqueue_Create(ncpus * 2);
428 }
429 #endif
430
431 infocnt = (ncpus == 0) ? 1 : ncpus;
432 ESL_ALLOC(info, sizeof(*info) * infocnt);
433
434 /* <abc> is not known 'til first HMM is read. */
435 hstatus = p7_hmmfile_Read(hfp, &abc, &hmm);
436 if (hstatus == eslOK)
437 {
438 /* One-time initializations after alphabet <abc> becomes known */
439 output_header(ofp, go, cfg->hmmfile, cfg->dbfile);
440 esl_sqfile_SetDigital(dbfp, abc); //ReadBlock requires knowledge of the alphabet to decide how best to read blocks
441
442 for (i = 0; i < infocnt; ++i)
443 {
444 info[i].bg = p7_bg_Create(abc);
445 #ifdef HMMER_THREADS
446 info[i].queue = queue;
447 #endif
448 }
449
450 #ifdef HMMER_THREADS
451 for (i = 0; i < ncpus * 2; ++i)
452 {
453 block = esl_sq_CreateDigitalBlock(BLOCK_SIZE, abc);
454 if (block == NULL) esl_fatal("Failed to allocate sequence block");
455
456 status = esl_workqueue_Init(queue, block);
457 if (status != eslOK) esl_fatal("Failed to add block to work queue");
458 }
459 #endif
460 }
461
462 /* Outer loop: over each query HMM in <hmmfile>. */
463 while (hstatus == eslOK)
464 {
465 P7_PROFILE *gm = NULL;
466 P7_OPROFILE *om = NULL; /* optimized query profile */
467
468 nquery++;
469 esl_stopwatch_Start(w);
470
471 /* seqfile may need to be rewound (multiquery mode) */
472 if (nquery > 1)
473 {
474 if (! esl_sqfile_IsRewindable(dbfp))
475 esl_fatal("Target sequence file %s isn't rewindable; can't search it with multiple queries", cfg->dbfile);
476
477 if (! esl_opt_IsUsed(go, "--restrictdb_stkey") )
478 esl_sqfile_Position(dbfp, 0); //only re-set current position to 0 if we're not planning to set it in a moment
479 }
480
481 if ( cfg->firstseq_key != NULL ) { //it's tempting to want to do this once and capture the offset position for future passes, but ncbi files make this non-trivial, so this keeps it general
482 sstatus = esl_sqfile_PositionByKey(dbfp, cfg->firstseq_key);
483 if (sstatus != eslOK)
484 p7_Fail("Failure setting restrictdb_stkey to %d\n", cfg->firstseq_key);
485 }
486
487 if (fprintf(ofp, "Query: %s [M=%d]\n", hmm->name, hmm->M) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
488 if (hmm->acc) { if (fprintf(ofp, "Accession: %s\n", hmm->acc) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed"); }
489 if (hmm->desc) { if (fprintf(ofp, "Description: %s\n", hmm->desc) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed"); }
490
491 /* Convert to an optimized model */
492 gm = p7_profile_Create (hmm->M, abc);
493 om = p7_oprofile_Create(hmm->M, abc);
494 p7_ProfileConfig(hmm, info->bg, gm, 100, p7_LOCAL); /* 100 is a dummy length for now; and MSVFilter requires local mode */
495 p7_oprofile_Convert(gm, om); /* <om> is now p7_LOCAL, multihit */
496
497 for (i = 0; i < infocnt; ++i)
498 {
499 /* Create processing pipeline and hit list */
500 info[i].th = p7_tophits_Create();
501 info[i].om = p7_oprofile_Clone(om);
502 info[i].pli = p7_pipeline_Create(go, om->M, 100, FALSE, p7_SEARCH_SEQS); /* L_hint = 100 is just a dummy for now */
503 status = p7_pli_NewModel(info[i].pli, info[i].om, info[i].bg);
504 if (status == eslEINVAL) p7_Fail(info->pli->errbuf);
505
506 #ifdef HMMER_THREADS
507 if (ncpus > 0) esl_threads_AddThread(threadObj, &info[i]);
508 #endif
509 }
510
511 #ifdef HMMER_THREADS
512 if (ncpus > 0) sstatus = thread_loop(threadObj, queue, dbfp, cfg->n_targetseq);
513 else sstatus = serial_loop(info, dbfp, cfg->n_targetseq);
514 #else
515 sstatus = serial_loop(info, dbfp, cfg->n_targetseq);
516 #endif
517 switch(sstatus)
518 {
519 case eslEFORMAT:
520 esl_fatal("Parse failed (sequence file %s):\n%s\n",
521 dbfp->filename, esl_sqfile_GetErrorBuf(dbfp));
522 break;
523 case eslEOF:
524 /* do nothing */
525 break;
526 default:
527 esl_fatal("Unexpected error %d reading sequence file %s", sstatus, dbfp->filename);
528 }
529
530 /* merge the results of the search results */
531 for (i = 1; i < infocnt; ++i)
532 {
533 p7_tophits_Merge(info[0].th, info[i].th);
534 p7_pipeline_Merge(info[0].pli, info[i].pli);
535
536 p7_pipeline_Destroy(info[i].pli);
537 p7_tophits_Destroy(info[i].th);
538 p7_oprofile_Destroy(info[i].om);
539 }
540
541 /* Print the results. */
542 p7_tophits_SortBySortkey(info->th);
543 p7_tophits_Threshold(info->th, info->pli);
544 p7_tophits_Targets(ofp, info->th, info->pli, textw); if (fprintf(ofp, "\n\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
545 p7_tophits_Domains(ofp, info->th, info->pli, textw); if (fprintf(ofp, "\n\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
546
547 if (tblfp) p7_tophits_TabularTargets(tblfp, hmm->name, hmm->acc, info->th, info->pli, (nquery == 1));
548 if (domtblfp) p7_tophits_TabularDomains(domtblfp, hmm->name, hmm->acc, info->th, info->pli, (nquery == 1));
549 if (pfamtblfp) p7_tophits_TabularXfam(pfamtblfp, hmm->name, hmm->acc, info->th, info->pli);
550
551 esl_stopwatch_Stop(w);
552 p7_pli_Statistics(ofp, info->pli, w);
553 if (fprintf(ofp, "//\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
554
555 /* Output the results in an MSA (-A option) */
556 if (afp) {
557 ESL_MSA *msa = NULL;
558
559 if (p7_tophits_Alignment(info->th, abc, NULL, NULL, 0, p7_ALL_CONSENSUS_COLS, &msa) == eslOK)
560 {
561 esl_msa_SetName (msa, hmm->name, -1);
562 esl_msa_SetAccession(msa, hmm->acc, -1);
563 esl_msa_SetDesc (msa, hmm->desc, -1);
564 esl_msa_FormatAuthor(msa, "hmmsearch (HMMER %s)", HMMER_VERSION);
565
566 if (textw > 0) esl_msafile_Write(afp, msa, eslMSAFILE_STOCKHOLM);
567 else esl_msafile_Write(afp, msa, eslMSAFILE_PFAM);
568
569 if (fprintf(ofp, "# Alignment of %d hits satisfying inclusion thresholds saved to: %s\n", msa->nseq, esl_opt_GetString(go, "-A")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
570 }
571 else { if (fprintf(ofp, "# No hits satisfy inclusion thresholds; no alignment saved\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed"); }
572
573 esl_msa_Destroy(msa);
574 }
575
576 p7_pipeline_Destroy(info->pli);
577 p7_tophits_Destroy(info->th);
578 p7_oprofile_Destroy(info->om);
579 p7_oprofile_Destroy(om);
580 p7_profile_Destroy(gm);
581 p7_hmm_Destroy(hmm);
582
583 hstatus = p7_hmmfile_Read(hfp, &abc, &hmm);
584 } /* end outer loop over query HMMs */
585
586 switch(hstatus) {
587 case eslEOD: p7_Fail("read failed, HMM file %s may be truncated?", cfg->hmmfile); break;
588 case eslEFORMAT: p7_Fail("bad file format in HMM file %s", cfg->hmmfile); break;
589 case eslEINCOMPAT: p7_Fail("HMM file %s contains different alphabets", cfg->hmmfile); break;
590 case eslEOF: /* do nothing. EOF is what we want. */ break;
591 default: p7_Fail("Unexpected error (%d) in reading HMMs from %s", hstatus, cfg->hmmfile);
592 }
593
594
595 /* Terminate outputs... any last words?
596 */
597 if (tblfp) p7_tophits_TabularTail(tblfp, "hmmsearch", p7_SEARCH_SEQS, cfg->hmmfile, cfg->dbfile, go);
598 if (domtblfp) p7_tophits_TabularTail(domtblfp, "hmmsearch", p7_SEARCH_SEQS, cfg->hmmfile, cfg->dbfile, go);
599 if (pfamtblfp) p7_tophits_TabularTail(pfamtblfp,"hmmsearch", p7_SEARCH_SEQS, cfg->hmmfile, cfg->dbfile, go);
600 if (ofp) { if (fprintf(ofp, "[ok]\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed"); }
601
602 /* Cleanup - prepare for exit
603 */
604 for (i = 0; i < infocnt; ++i)
605 p7_bg_Destroy(info[i].bg);
606
607 #ifdef HMMER_THREADS
608 if (ncpus > 0)
609 {
610 esl_workqueue_Reset(queue);
611 while (esl_workqueue_Remove(queue, (void **) &block) == eslOK)
612 esl_sq_DestroyBlock(block);
613 esl_workqueue_Destroy(queue);
614 esl_threads_Destroy(threadObj);
615 }
616 #endif
617
618 free(info);
619 p7_hmmfile_Close(hfp);
620 esl_sqfile_Close(dbfp);
621 esl_alphabet_Destroy(abc);
622 esl_stopwatch_Destroy(w);
623
624 if (ofp != stdout) fclose(ofp);
625 if (afp) fclose(afp);
626 if (tblfp) fclose(tblfp);
627 if (domtblfp) fclose(domtblfp);
628 if (pfamtblfp) fclose(pfamtblfp);
629
630 return eslOK;
631
632 ERROR:
633 return eslFAIL;
634 }
635
636 #ifdef HMMER_MPI
637
638 /* Define common tags used by the MPI master/slave processes */
639 #define HMMER_ERROR_TAG 1
640 #define HMMER_HMM_TAG 2
641 #define HMMER_SEQUENCE_TAG 3
642 #define HMMER_BLOCK_TAG 4
643 #define HMMER_PIPELINE_TAG 5
644 #define HMMER_TOPHITS_TAG 6
645 #define HMMER_HIT_TAG 7
646 #define HMMER_TERMINATING_TAG 8
647 #define HMMER_READY_TAG 9
648
649 /* mpi_failure()
650 * Generate an error message. If the clients rank is not 0, a
651 * message is created with the error message and sent to the
652 * master process for handling.
653 */
654 static void
mpi_failure(char * format,...)655 mpi_failure(char *format, ...)
656 {
657 va_list argp;
658 int status = eslFAIL;
659 int len;
660 int rank;
661 char str[512];
662
663 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
664
665 /* format the error mesg */
666 va_start(argp, format);
667 len = vsnprintf(str, sizeof(str), format, argp);
668 va_end(argp);
669
670 /* make sure the error string is terminated */
671 str[sizeof(str)-1] = '\0';
672
673 /* if the caller is the master, print the results and abort */
674 if (rank == 0)
675 {
676 if (fprintf(stderr, "\nError: ") < 0) exit(eslEWRITE);
677 if (fprintf(stderr, "%s", str) < 0) exit(eslEWRITE);
678 if (fprintf(stderr, "\n") < 0) exit(eslEWRITE);
679 fflush(stderr);
680
681 MPI_Abort(MPI_COMM_WORLD, status);
682 exit(1);
683 }
684 else
685 {
686 MPI_Send(str, len, MPI_CHAR, 0, HMMER_ERROR_TAG, MPI_COMM_WORLD);
687 pause();
688 }
689 }
690
691 #define MAX_BLOCK_SIZE (512*1024)
692
693 typedef struct {
694 uint64_t offset;
695 uint64_t length;
696 uint64_t count;
697 } SEQ_BLOCK;
698
699 typedef struct {
700 int complete;
701 int size;
702 int current;
703 int last;
704 SEQ_BLOCK *blocks;
705 } BLOCK_LIST;
706
707 /* this routine parses the database keeping track of the blocks
708 * offset within the file, number of sequences and the length
709 * of the block. These blocks are passed as work units to the
710 * MPI workers. If multiple hmm's are in the query file, the
711 * blocks are reused without parsing the database a second time.
712 */
next_block(ESL_SQFILE * sqfp,ESL_SQ * sq,BLOCK_LIST * list,SEQ_BLOCK * block,int n_targetseqs)713 int next_block(ESL_SQFILE *sqfp, ESL_SQ *sq, BLOCK_LIST *list, SEQ_BLOCK *block, int n_targetseqs)
714 {
715 int status = eslOK;
716
717 /* if the list has been calculated, use it instead of parsing the database */
718 if (list->complete)
719 {
720 if (list->current == list->last)
721 {
722 block->offset = 0;
723 block->length = 0;
724 block->count = 0;
725
726 status = eslEOF;
727 }
728 else
729 {
730 int inx = list->current++;
731
732 block->offset = list->blocks[inx].offset;
733 block->length = list->blocks[inx].length;
734 block->count = list->blocks[inx].count;
735
736 status = eslOK;
737 }
738
739 return status;
740 }
741
742 block->offset = 0;
743 block->length = 0;
744 block->count = 0;
745
746 esl_sq_Reuse(sq);
747 if (n_targetseqs == 0) status = eslEOF; //this is to handle the end-case of a restrictdb scenario, where no more targets are required, and we want to mark the list as complete
748 while (block->length < MAX_BLOCK_SIZE && (n_targetseqs <0 || block->count < n_targetseqs) && (status = esl_sqio_ReadInfo(sqfp, sq)) == eslOK)
749 {
750 if (block->count == 0) block->offset = sq->roff;
751 block->length = sq->eoff - block->offset + 1;
752 block->count++;
753 esl_sq_Reuse(sq);
754 }
755
756 if (block->count > 0)
757 if (status == eslEOF || block->count == n_targetseqs)
758 status = eslOK;
759 if (status == eslEOF) list->complete = 1;
760
761 /* add the block to the list of known blocks */
762 if (status == eslOK)
763 {
764 int inx;
765
766 if (list->last >= list->size)
767 {
768 void *tmp;
769 list->size += 500;
770 ESL_RALLOC(list->blocks, tmp, sizeof(SEQ_BLOCK) * list->size);
771 }
772
773 inx = list->last++;
774 list->blocks[inx].offset = block->offset;
775 list->blocks[inx].length = block->length;
776 list->blocks[inx].count = block->count;
777 }
778
779 return status;
780
781 ERROR:
782 return eslEMEM;
783 }
784
785 /* mpi_master()
786 * The MPI version of hmmbuild.
787 * Follows standard pattern for a master/worker load-balanced MPI program (J1/78-79).
788 *
789 * A master can only return if it's successful.
790 * Errors in an MPI master come in two classes: recoverable and nonrecoverable.
791 *
792 * Recoverable errors include all worker-side errors, and any
793 * master-side error that do not affect MPI communication. Error
794 * messages from recoverable messages are delayed until we've cleanly
795 * shut down the workers.
796 *
797 * Unrecoverable errors are master-side errors that may affect MPI
798 * communication, meaning we cannot count on being able to reach the
799 * workers and shut them down. Unrecoverable errors result in immediate
800 * p7_Fail()'s, which will cause MPI to shut down the worker processes
801 * uncleanly.
802 */
803 static int
mpi_master(ESL_GETOPTS * go,struct cfg_s * cfg)804 mpi_master(ESL_GETOPTS *go, struct cfg_s *cfg)
805 {
806 FILE *ofp = stdout; /* results output file (-o) */
807 FILE *afp = NULL; /* alignment output file (-A) */
808 FILE *tblfp = NULL; /* output stream for tabular per-seq (--tblout) */
809 FILE *domtblfp = NULL; /* output stream for tabular per-dom (--domtblout) */
810 FILE *pfamtblfp= NULL; /* output stream for pfam-style tabular output (--pfamtblout) */
811 P7_BG *bg = NULL; /* null model */
812 P7_HMMFILE *hfp = NULL; /* open input HMM file */
813 ESL_SQFILE *dbfp = NULL; /* open input sequence file */
814 P7_HMM *hmm = NULL; /* one HMM query */
815 ESL_SQ *dbsq = NULL; /* one target sequence (digital) */
816 ESL_ALPHABET *abc = NULL; /* digital alphabet */
817 int dbfmt = eslSQFILE_UNKNOWN; /* format code for sequence database file */
818 ESL_STOPWATCH *w;
819 int textw = 0;
820 int nquery = 0;
821 int status = eslOK;
822 int hstatus = eslOK;
823 int sstatus = eslOK;
824 int dest;
825
826 char *mpi_buf = NULL; /* buffer used to pack/unpack structures */
827 int mpi_size = 0; /* size of the allocated buffer */
828 BLOCK_LIST *list = NULL;
829 SEQ_BLOCK block;
830
831 int i;
832 int size;
833 MPI_Status mpistatus;
834 char errbuf[eslERRBUFSIZE];
835
836 int n_targets;
837
838 w = esl_stopwatch_Create();
839
840 if (esl_opt_GetBoolean(go, "--notextw")) textw = 0;
841 else textw = esl_opt_GetInteger(go, "--textw");
842
843 if (esl_opt_IsOn(go, "--tformat")) {
844 dbfmt = esl_sqio_EncodeFormat(esl_opt_GetString(go, "--tformat"));
845 if (dbfmt == eslSQFILE_UNKNOWN) mpi_failure("%s is not a recognized sequence database file format\n", esl_opt_GetString(go, "--tformat"));
846 }
847
848 /* Open the target sequence database */
849 status = esl_sqfile_Open(cfg->dbfile, dbfmt, p7_SEQDBENV, &dbfp);
850 if (status == eslENOTFOUND) mpi_failure("Failed to open sequence file %s for reading\n", cfg->dbfile);
851 else if (status == eslEFORMAT) mpi_failure("Sequence file %s is empty or misformatted\n", cfg->dbfile);
852 else if (status == eslEINVAL) mpi_failure("Can't autodetect format of a stdin or .gz seqfile");
853 else if (status != eslOK) mpi_failure("Unexpected error %d opening sequence file %s\n", status, cfg->dbfile);
854
855 if (esl_opt_IsUsed(go, "--restrictdb_stkey") || esl_opt_IsUsed(go, "--restrictdb_n")) {
856 if (esl_opt_IsUsed(go, "--ssifile"))
857 esl_sqfile_OpenSSI(dbfp, esl_opt_GetString(go, "--ssifile"));
858 else
859 esl_sqfile_OpenSSI(dbfp, NULL);
860 }
861
862
863 /* Open the query profile HMM file */
864 status = p7_hmmfile_OpenE(cfg->hmmfile, NULL, &hfp, errbuf);
865 if (status == eslENOTFOUND) mpi_failure("File existence/permissions problem in trying to open HMM file %s.\n%s\n", cfg->hmmfile, errbuf);
866 else if (status == eslEFORMAT) mpi_failure("File format problem in trying to open HMM file %s.\n%s\n", cfg->hmmfile, errbuf);
867 else if (status != eslOK) mpi_failure("Unexpected error %d in opening HMM file %s.\n%s\n", status, cfg->hmmfile, errbuf);
868
869 /* Open the results output files */
870 if (esl_opt_IsOn(go, "-o") && (ofp = fopen(esl_opt_GetString(go, "-o"), "w")) == NULL)
871 mpi_failure("Failed to open output file %s for writing\n", esl_opt_GetString(go, "-o"));
872
873 if (esl_opt_IsOn(go, "-A") && (afp = fopen(esl_opt_GetString(go, "-A"), "w")) == NULL)
874 mpi_failure("Failed to open alignment file %s for writing\n", esl_opt_GetString(go, "-A"));
875
876 if (esl_opt_IsOn(go, "--tblout") && (tblfp = fopen(esl_opt_GetString(go, "--tblout"), "w")) == NULL)
877 mpi_failure("Failed to open tabular per-seq output file %s for writing\n", esl_opt_GetString(go, "--tblout"));
878
879 if (esl_opt_IsOn(go, "--domtblout") && (domtblfp = fopen(esl_opt_GetString(go, "--domtblout"), "w")) == NULL)
880 mpi_failure("Failed to open tabular per-dom output file %s for writing\n", esl_opt_GetString(go, "--domtblout"));
881
882 if (esl_opt_IsOn(go, "--pfamtblout") && (pfamtblfp = fopen(esl_opt_GetString(go, "--pfamtblout"), "w")) == NULL)
883 mpi_failure("Failed to open pfam-style tabular output file %s for writing\n", esl_opt_GetString(go, "--pfamtblout"));
884
885 ESL_ALLOC(list, sizeof(BLOCK_LIST));
886 list->complete = 0;
887 list->size = 0;
888 list->current = 0;
889 list->last = 0;
890 list->blocks = NULL;
891
892 /* <abc> is not known 'til first HMM is read. */
893 hstatus = p7_hmmfile_Read(hfp, &abc, &hmm);
894 if (hstatus == eslOK)
895 {
896 /* One-time initializations after alphabet <abc> becomes known */
897 output_header(ofp, go, cfg->hmmfile, cfg->dbfile);
898 dbsq = esl_sq_CreateDigital(abc);
899 bg = p7_bg_Create(abc);
900 }
901
902
903 if ( cfg->firstseq_key != NULL ) { //it's tempting to want to do this once and capture the offset position for future passes, but ncbi files make this non-trivial, so this keeps it general
904 sstatus = esl_sqfile_PositionByKey(dbfp, cfg->firstseq_key);
905 if (sstatus != eslOK)
906 p7_Fail("Failure setting restrictdb_stkey to %d\n", cfg->firstseq_key);
907 }
908
909 /* Outer loop: over each query HMM in <hmmfile>. */
910 while (hstatus == eslOK)
911 {
912 P7_PROFILE *gm = NULL;
913 P7_OPROFILE *om = NULL; /* optimized query profile */
914 P7_PIPELINE *pli = NULL;
915 P7_TOPHITS *th = NULL;
916 int seq_cnt = 0;
917 nquery++;
918 esl_stopwatch_Start(w);
919
920 n_targets = cfg->n_targetseq;
921
922 /* seqfile may need to be rewound (multiquery mode) */
923 if (nquery > 1) list->current = 0;
924
925 if (fprintf(ofp, "Query: %s [M=%d]\n", hmm->name, hmm->M) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
926 if (hmm->acc) { if (fprintf(ofp, "Accession: %s\n", hmm->acc) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed"); }
927 if (hmm->desc) { if (fprintf(ofp, "Description: %s\n", hmm->desc) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed"); }
928
929 /* Convert to an optimized model */
930 gm = p7_profile_Create (hmm->M, abc);
931 om = p7_oprofile_Create(hmm->M, abc);
932 p7_ProfileConfig(hmm, bg, gm, 100, p7_LOCAL);
933 p7_oprofile_Convert(gm, om);
934
935 /* Create processing pipeline and hit list */
936 th = p7_tophits_Create();
937 pli = p7_pipeline_Create(go, hmm->M, 100, FALSE, p7_SEARCH_SEQS);
938 p7_pli_NewModel(pli, om, bg);
939
940 /* Main loop: */
941 while ((n_targets==-1 || seq_cnt<=n_targets) && (sstatus = next_block(dbfp, dbsq, list, &block, n_targets-seq_cnt)) == eslOK )
942 {
943 seq_cnt += block.count;
944
945 if (MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &mpistatus) != 0)
946 mpi_failure("MPI error %d receiving message from %d\n", mpistatus.MPI_SOURCE);
947
948 MPI_Get_count(&mpistatus, MPI_PACKED, &size);
949 if (mpi_buf == NULL || size > mpi_size) {
950 void *tmp;
951 ESL_RALLOC(mpi_buf, tmp, sizeof(char) * size);
952 mpi_size = size;
953 }
954
955 dest = mpistatus.MPI_SOURCE;
956 MPI_Recv(mpi_buf, size, MPI_PACKED, dest, mpistatus.MPI_TAG, MPI_COMM_WORLD, &mpistatus);
957
958 if (mpistatus.MPI_TAG == HMMER_ERROR_TAG)
959 mpi_failure("MPI client %d raised error:\n%s\n", dest, mpi_buf);
960 if (mpistatus.MPI_TAG != HMMER_READY_TAG)
961 mpi_failure("Unexpected tag %d from %d\n", mpistatus.MPI_TAG, dest);
962
963 MPI_Send(&block, 3, MPI_LONG_LONG_INT, dest, HMMER_BLOCK_TAG, MPI_COMM_WORLD);
964 }
965
966 if (n_targets!=-1 && seq_cnt==n_targets)
967 sstatus = eslEOF;
968
969 switch(sstatus)
970 {
971 case eslEFORMAT:
972 mpi_failure("Parse failed (sequence file %s):\n%s\n", dbfp->filename, esl_sqfile_GetErrorBuf(dbfp));
973 break;
974 case eslEOF:
975 break;
976 default:
977 mpi_failure("Unexpected error %d reading sequence file %s", sstatus, dbfp->filename);
978 }
979
980 block.offset = 0;
981 block.length = 0;
982 block.count = 0;
983
984 /* wait for all workers to finish up their work blocks */
985 for (i = 1; i < cfg->nproc; ++i)
986 {
987 if (MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &mpistatus) != 0)
988 mpi_failure("MPI error %d receiving message from %d\n", mpistatus.MPI_SOURCE);
989
990 MPI_Get_count(&mpistatus, MPI_PACKED, &size);
991 if (mpi_buf == NULL || size > mpi_size) {
992 void *tmp;
993 ESL_RALLOC(mpi_buf, tmp, sizeof(char) * size);
994 mpi_size = size;
995 }
996
997 dest = mpistatus.MPI_SOURCE;
998 MPI_Recv(mpi_buf, size, MPI_PACKED, dest, mpistatus.MPI_TAG, MPI_COMM_WORLD, &mpistatus);
999
1000 if (mpistatus.MPI_TAG == HMMER_ERROR_TAG)
1001 mpi_failure("MPI client %d raised error:\n%s\n", dest, mpi_buf);
1002 if (mpistatus.MPI_TAG != HMMER_READY_TAG)
1003 mpi_failure("Unexpected tag %d from %d\n", mpistatus.MPI_TAG, dest);
1004 }
1005
1006 /* merge the results of the search results */
1007 for (dest = 1; dest < cfg->nproc; ++dest)
1008 {
1009 P7_PIPELINE *mpi_pli = NULL;
1010 P7_TOPHITS *mpi_th = NULL;
1011
1012 /* send an empty block to signal the worker they are done */
1013 MPI_Send(&block, 3, MPI_LONG_LONG_INT, dest, HMMER_BLOCK_TAG, MPI_COMM_WORLD);
1014
1015 /* wait for the results */
1016 if ((status = p7_tophits_MPIRecv(dest, HMMER_TOPHITS_TAG, MPI_COMM_WORLD, &mpi_buf, &mpi_size, &mpi_th)) != eslOK)
1017 mpi_failure("Unexpected error %d receiving tophits from %d", status, dest);
1018
1019 if ((status = p7_pipeline_MPIRecv(dest, HMMER_PIPELINE_TAG, MPI_COMM_WORLD, &mpi_buf, &mpi_size, go, &mpi_pli)) != eslOK)
1020 mpi_failure("Unexpected error %d receiving pipeline from %d", status, dest);
1021
1022 p7_tophits_Merge(th, mpi_th);
1023 p7_pipeline_Merge(pli, mpi_pli);
1024
1025 p7_pipeline_Destroy(mpi_pli);
1026 p7_tophits_Destroy(mpi_th);
1027 }
1028
1029 /* Print the results. */
1030 p7_tophits_SortBySortkey(th);
1031 p7_tophits_Threshold(th, pli);
1032 p7_tophits_Targets(ofp, th, pli, textw); if (fprintf(ofp, "\n\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
1033 p7_tophits_Domains(ofp, th, pli, textw); if (fprintf(ofp, "\n\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
1034
1035 if (tblfp) p7_tophits_TabularTargets(tblfp, hmm->name, hmm->acc, th, pli, (nquery == 1));
1036 if (domtblfp) p7_tophits_TabularDomains(domtblfp, hmm->name, hmm->acc, th, pli, (nquery == 1));
1037 if (pfamtblfp) p7_tophits_TabularXfam(pfamtblfp, hmm->name, hmm->acc, th, pli);
1038
1039 esl_stopwatch_Stop(w);
1040 p7_pli_Statistics(ofp, pli, w);
1041 if (fprintf(ofp, "//\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
1042
1043 /* Output the results in an MSA (-A option) */
1044 if (afp) {
1045 ESL_MSA *msa = NULL;
1046
1047 if (p7_tophits_Alignment(th, abc, NULL, NULL, 0, p7_ALL_CONSENSUS_COLS, &msa) == eslOK)
1048 {
1049 esl_msa_SetName (msa, hmm->name, -1);
1050 esl_msa_SetAccession(msa, hmm->acc, -1);
1051 esl_msa_SetDesc (msa, hmm->desc, -1);
1052 esl_msa_FormatAuthor(msa, "hmmsearch (HMMER %s)", HMMER_VERSION);
1053
1054 if (textw > 0) esl_msafile_Write(afp, msa, eslMSAFILE_STOCKHOLM);
1055 else esl_msafile_Write(afp, msa, eslMSAFILE_PFAM);
1056
1057 if (fprintf(ofp, "# Alignment of %d hits satisfying inclusion thresholds saved to: %s\n", msa->nseq, esl_opt_GetString(go, "-A")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
1058 }
1059 else { if (fprintf(ofp, "# No hits satisfy inclusion thresholds; no alignment saved\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed"); }
1060
1061 esl_msa_Destroy(msa);
1062 }
1063
1064 p7_pipeline_Destroy(pli);
1065 p7_tophits_Destroy(th);
1066 p7_hmm_Destroy(hmm);
1067
1068 hstatus = p7_hmmfile_Read(hfp, &abc, &hmm);
1069 } /* end outer loop over query HMMs */
1070
1071 switch(hstatus) {
1072 case eslEOD: mpi_failure("read failed, HMM file %s may be truncated?", cfg->hmmfile); break;
1073 case eslEFORMAT: mpi_failure("bad file format in HMM file %s", cfg->hmmfile); break;
1074 case eslEINCOMPAT: mpi_failure("HMM file %s contains different alphabets", cfg->hmmfile); break;
1075 case eslEOF: /* EOF is good, that's what we expect here */ break;
1076 default: mpi_failure("Unexpected error (%d) in reading HMMs from %s", hstatus, cfg->hmmfile);
1077 }
1078
1079 /* monitor all the workers to make sure they have ended */
1080 for (i = 1; i < cfg->nproc; ++i)
1081 {
1082 if (MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &mpistatus) != 0)
1083 mpi_failure("MPI error %d receiving message from %d\n", mpistatus.MPI_SOURCE);
1084
1085 MPI_Get_count(&mpistatus, MPI_PACKED, &size);
1086 if (mpi_buf == NULL || size > mpi_size) {
1087 void *tmp;
1088 ESL_RALLOC(mpi_buf, tmp, sizeof(char) * size);
1089 mpi_size = size;
1090 }
1091
1092 dest = mpistatus.MPI_SOURCE;
1093 MPI_Recv(mpi_buf, size, MPI_PACKED, dest, mpistatus.MPI_TAG, MPI_COMM_WORLD, &mpistatus);
1094
1095 if (mpistatus.MPI_TAG == HMMER_ERROR_TAG)
1096 mpi_failure("MPI client %d raised error:\n%s\n", dest, mpi_buf);
1097 if (mpistatus.MPI_TAG != HMMER_TERMINATING_TAG)
1098 mpi_failure("Unexpected tag %d from %d\n", mpistatus.MPI_TAG, dest);
1099 }
1100
1101 /* Terminate outputs... any last words?
1102 */
1103 if (tblfp) p7_tophits_TabularTail(tblfp, "hmmsearch", p7_SEARCH_SEQS, cfg->hmmfile, cfg->dbfile, go);
1104 if (domtblfp) p7_tophits_TabularTail(domtblfp, "hmmsearch", p7_SEARCH_SEQS, cfg->hmmfile, cfg->dbfile, go);
1105 if (pfamtblfp)p7_tophits_TabularTail(pfamtblfp, "hmmsearch", p7_SEARCH_SEQS, cfg->hmmfile, cfg->dbfile, go);
1106 if (ofp) { if (fprintf(ofp, "[ok]\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed"); }
1107
1108 /* Cleanup - prepare for exit
1109 */
1110 free(list);
1111 if (mpi_buf != NULL) free(mpi_buf);
1112
1113 p7_hmmfile_Close(hfp);
1114 esl_sqfile_Close(dbfp);
1115
1116 p7_bg_Destroy(bg);
1117 esl_sq_Destroy(dbsq);
1118 esl_stopwatch_Destroy(w);
1119
1120 if (ofp != stdout) fclose(ofp);
1121 if (afp) fclose(afp);
1122 if (tblfp) fclose(tblfp);
1123 if (domtblfp) fclose(domtblfp);
1124 if (pfamtblfp) fclose(pfamtblfp);
1125
1126 return eslOK;
1127
1128 ERROR:
1129 return eslEMEM;
1130 }
1131
1132
1133 static int
mpi_worker(ESL_GETOPTS * go,struct cfg_s * cfg)1134 mpi_worker(ESL_GETOPTS *go, struct cfg_s *cfg)
1135 {
1136 P7_HMM *hmm = NULL; /* one HMM query */
1137 ESL_SQ *dbsq = NULL; /* one target sequence (digital) */
1138 ESL_ALPHABET *abc = NULL; /* digital alphabet */
1139 P7_BG *bg = NULL; /* null model */
1140 P7_HMMFILE *hfp = NULL; /* open input HMM file */
1141 ESL_SQFILE *dbfp = NULL; /* open input sequence file */
1142 int dbfmt = eslSQFILE_UNKNOWN; /* format code for sequence database file */
1143 ESL_STOPWATCH *w;
1144 int status = eslOK;
1145 int hstatus = eslOK;
1146 int sstatus = eslOK;
1147
1148 char *mpi_buf = NULL; /* buffer used to pack/unpack structures */
1149 int mpi_size = 0; /* size of the allocated buffer */
1150
1151 MPI_Status mpistatus;
1152 char errbuf[eslERRBUFSIZE];
1153
1154 w = esl_stopwatch_Create();
1155
1156 /* Open the target sequence database */
1157 status = esl_sqfile_Open(cfg->dbfile, dbfmt, p7_SEQDBENV, &dbfp);
1158 if (status == eslENOTFOUND) mpi_failure("Failed to open sequence file %s for reading\n", cfg->dbfile);
1159 else if (status == eslEFORMAT) mpi_failure("Sequence file %s is empty or misformatted\n", cfg->dbfile);
1160 else if (status == eslEINVAL) mpi_failure("Can't autodetect format of a stdin or .gz seqfile");
1161 else if (status != eslOK) mpi_failure("Unexpected error %d opening sequence file %s\n", status, cfg->dbfile);
1162
1163 /* Open the query profile HMM file */
1164 status = p7_hmmfile_OpenE(cfg->hmmfile, NULL, &hfp, errbuf);
1165 if (status == eslENOTFOUND) mpi_failure("File existence/permissions problem in trying to open HMM file %s.\n%s\n", cfg->hmmfile, errbuf);
1166 else if (status == eslEFORMAT) mpi_failure("File format problem in trying to open HMM file %s.\n%s\n", cfg->hmmfile, errbuf);
1167 else if (status != eslOK) mpi_failure("Unexpected error %d in opening HMM file %s.\n%s\n", status, cfg->hmmfile, errbuf);
1168
1169 /* <abc> is not known 'til first HMM is read. */
1170 hstatus = p7_hmmfile_Read(hfp, &abc, &hmm);
1171 if (hstatus == eslOK)
1172 {
1173 /* One-time initializations after alphabet <abc> becomes known */
1174 dbsq = esl_sq_CreateDigital(abc);
1175 bg = p7_bg_Create(abc);
1176 esl_sqfile_SetDigital(dbfp, abc);
1177 }
1178
1179 /* Outer loop: over each query HMM in <hmmfile>. */
1180 while (hstatus == eslOK)
1181 {
1182 P7_PROFILE *gm = NULL;
1183 P7_OPROFILE *om = NULL; /* optimized query profile */
1184 P7_PIPELINE *pli = NULL;
1185 P7_TOPHITS *th = NULL;
1186
1187 SEQ_BLOCK block;
1188
1189 esl_stopwatch_Start(w);
1190
1191 status = 0;
1192 MPI_Send(&status, 1, MPI_INT, 0, HMMER_READY_TAG, MPI_COMM_WORLD);
1193
1194 /* Convert to an optimized model */
1195 gm = p7_profile_Create (hmm->M, abc);
1196 om = p7_oprofile_Create(hmm->M, abc);
1197 p7_ProfileConfig(hmm, bg, gm, 100, p7_LOCAL);
1198 p7_oprofile_Convert(gm, om);
1199
1200 th = p7_tophits_Create();
1201 pli = p7_pipeline_Create(go, om->M, 100, FALSE, p7_SEARCH_SEQS); /* L_hint = 100 is just a dummy for now */
1202 p7_pli_NewModel(pli, om, bg);
1203
1204 /* receive a sequence block from the master */
1205 MPI_Recv(&block, 3, MPI_LONG_LONG_INT, 0, HMMER_BLOCK_TAG, MPI_COMM_WORLD, &mpistatus);
1206 while (block.count > 0)
1207 {
1208 uint64_t length = 0;
1209 uint64_t count = block.count;
1210
1211 status = esl_sqfile_Position(dbfp, block.offset);
1212 if (status != eslOK) mpi_failure("Cannot position sequence database to %ld\n", block.offset);
1213
1214 while (count > 0 && (sstatus = esl_sqio_Read(dbfp, dbsq)) == eslOK)
1215 {
1216 length = dbsq->eoff - block.offset + 1;
1217
1218 p7_pli_NewSeq(pli, dbsq);
1219 p7_bg_SetLength(bg, dbsq->n);
1220 p7_oprofile_ReconfigLength(om, dbsq->n);
1221
1222 p7_Pipeline(pli, om, bg, dbsq, NULL, th);
1223
1224 esl_sq_Reuse(dbsq);
1225 p7_pipeline_Reuse(pli);
1226
1227 --count;
1228 }
1229
1230 /* lets do a little bit of sanity checking here to make sure the blocks are the same */
1231 if (count > 0) mpi_failure("Block count mismatch - expected %ld found %ld at offset %ld\n", block.count, block.count - count, block.offset);
1232 if (block.length != length) mpi_failure("Block length mismatch - expected %ld found %ld at offset %ld\n", block.length, length, block.offset);
1233
1234 /* inform the master we need another block of sequences */
1235 status = 0;
1236 MPI_Send(&status, 1, MPI_INT, 0, HMMER_READY_TAG, MPI_COMM_WORLD);
1237
1238 /* wait for the next block of sequences */
1239 MPI_Recv(&block, 3, MPI_LONG_LONG_INT, 0, HMMER_BLOCK_TAG, MPI_COMM_WORLD, &mpistatus);
1240 }
1241
1242 esl_stopwatch_Stop(w);
1243
1244 /* Send the top hits back to the master. */
1245 p7_tophits_MPISend(th, 0, HMMER_TOPHITS_TAG, MPI_COMM_WORLD, &mpi_buf, &mpi_size);
1246 p7_pipeline_MPISend(pli, 0, HMMER_PIPELINE_TAG, MPI_COMM_WORLD, &mpi_buf, &mpi_size);
1247
1248 p7_pipeline_Destroy(pli);
1249 p7_tophits_Destroy(th);
1250 p7_oprofile_Destroy(om);
1251 p7_profile_Destroy(gm);
1252 p7_hmm_Destroy(hmm);
1253
1254 hstatus = p7_hmmfile_Read(hfp, &abc, &hmm);
1255 } /* end outer loop over query HMMs */
1256
1257 switch(hstatus)
1258 {
1259 case eslEOF:
1260 /* do nothing */
1261 break;
1262 case eslEFORMAT:
1263 mpi_failure("bad file format in HMM file %s", cfg->hmmfile);
1264 break;
1265 case eslEINCOMPAT:
1266 mpi_failure("HMM file %s contains different alphabets", cfg->hmmfile);
1267 break;
1268 default:
1269 mpi_failure("Unexpected error (%d) in reading HMMs from %s", hstatus, cfg->hmmfile);
1270 }
1271
1272 status = 0;
1273 MPI_Send(&status, 1, MPI_INT, 0, HMMER_TERMINATING_TAG, MPI_COMM_WORLD);
1274
1275 if (mpi_buf != NULL) free(mpi_buf);
1276
1277 p7_hmmfile_Close(hfp);
1278 esl_sqfile_Close(dbfp);
1279
1280 p7_bg_Destroy(bg);
1281 esl_sq_Destroy(dbsq);
1282 esl_stopwatch_Destroy(w);
1283
1284 return eslOK;
1285 }
1286 #endif /*HMMER_MPI*/
1287
1288 static int
serial_loop(WORKER_INFO * info,ESL_SQFILE * dbfp,int n_targetseqs)1289 serial_loop(WORKER_INFO *info, ESL_SQFILE *dbfp, int n_targetseqs)
1290 {
1291 int sstatus;
1292 ESL_SQ *dbsq = NULL; /* one target sequence (digital) */
1293 int seq_cnt = 0;
1294
1295 dbsq = esl_sq_CreateDigital(info->om->abc);
1296
1297 /* Main loop: */
1298 while ( (n_targetseqs==-1 || seq_cnt<n_targetseqs) && (sstatus = esl_sqio_Read(dbfp, dbsq)) == eslOK)
1299 {
1300 p7_pli_NewSeq(info->pli, dbsq);
1301 p7_bg_SetLength(info->bg, dbsq->n);
1302 p7_oprofile_ReconfigLength(info->om, dbsq->n);
1303
1304 p7_Pipeline(info->pli, info->om, info->bg, dbsq, NULL, info->th);
1305
1306 seq_cnt++;
1307 esl_sq_Reuse(dbsq);
1308 p7_pipeline_Reuse(info->pli);
1309 }
1310
1311 if (n_targetseqs!=-1 && seq_cnt==n_targetseqs)
1312 sstatus = eslEOF;
1313
1314 esl_sq_Destroy(dbsq);
1315
1316 return sstatus;
1317 }
1318
1319 #ifdef HMMER_THREADS
1320 static int
thread_loop(ESL_THREADS * obj,ESL_WORK_QUEUE * queue,ESL_SQFILE * dbfp,int n_targetseqs)1321 thread_loop(ESL_THREADS *obj, ESL_WORK_QUEUE *queue, ESL_SQFILE *dbfp, int n_targetseqs)
1322 {
1323 int status = eslOK;
1324 int sstatus = eslOK;
1325 int eofCount = 0;
1326 ESL_SQ_BLOCK *block;
1327 void *newBlock;
1328
1329 esl_workqueue_Reset(queue);
1330 esl_threads_WaitForStart(obj);
1331
1332 status = esl_workqueue_ReaderUpdate(queue, NULL, &newBlock);
1333 if (status != eslOK) esl_fatal("Work queue reader failed");
1334
1335 /* Main loop: */
1336 while (sstatus == eslOK )
1337 {
1338 block = (ESL_SQ_BLOCK *) newBlock;
1339
1340 if (n_targetseqs == 0)
1341 {
1342 block->count = 0;
1343 sstatus = eslEOF;
1344 } else {
1345 sstatus = esl_sqio_ReadBlock(dbfp, block, -1, n_targetseqs, FALSE);
1346 n_targetseqs -= block->count;
1347 }
1348
1349 if (sstatus == eslEOF)
1350 {
1351 if (eofCount < esl_threads_GetWorkerCount(obj)) sstatus = eslOK;
1352 ++eofCount;
1353 }
1354
1355 if (sstatus == eslOK)
1356 {
1357 status = esl_workqueue_ReaderUpdate(queue, block, &newBlock);
1358 if (status != eslOK) esl_fatal("Work queue reader failed");
1359 }
1360 }
1361
1362 status = esl_workqueue_ReaderUpdate(queue, block, NULL);
1363 if (status != eslOK) esl_fatal("Work queue reader failed");
1364
1365 if (sstatus == eslEOF)
1366 {
1367 /* wait for all the threads to complete */
1368 esl_threads_WaitForFinish(obj);
1369 esl_workqueue_Complete(queue);
1370 }
1371
1372 return sstatus;
1373 }
1374
1375 static void
pipeline_thread(void * arg)1376 pipeline_thread(void *arg)
1377 {
1378 int i;
1379 int status;
1380 int workeridx;
1381 WORKER_INFO *info;
1382 ESL_THREADS *obj;
1383
1384 ESL_SQ_BLOCK *block = NULL;
1385 void *newBlock;
1386
1387 impl_Init();
1388
1389 obj = (ESL_THREADS *) arg;
1390 esl_threads_Started(obj, &workeridx);
1391
1392 info = (WORKER_INFO *) esl_threads_GetData(obj, workeridx);
1393
1394 status = esl_workqueue_WorkerUpdate(info->queue, NULL, &newBlock);
1395 if (status != eslOK) esl_fatal("Work queue worker failed");
1396
1397 /* loop until all blocks have been processed */
1398 block = (ESL_SQ_BLOCK *) newBlock;
1399 while (block->count > 0)
1400 {
1401 /* Main loop: */
1402 for (i = 0; i < block->count; ++i)
1403 {
1404 ESL_SQ *dbsq = block->list + i;
1405
1406 p7_pli_NewSeq(info->pli, dbsq);
1407 p7_bg_SetLength(info->bg, dbsq->n);
1408 p7_oprofile_ReconfigLength(info->om, dbsq->n);
1409
1410 p7_Pipeline(info->pli, info->om, info->bg, dbsq, NULL, info->th);
1411
1412 esl_sq_Reuse(dbsq);
1413 p7_pipeline_Reuse(info->pli);
1414 }
1415
1416 status = esl_workqueue_WorkerUpdate(info->queue, block, &newBlock);
1417 if (status != eslOK) esl_fatal("Work queue worker failed");
1418
1419 block = (ESL_SQ_BLOCK *) newBlock;
1420 }
1421
1422 status = esl_workqueue_WorkerUpdate(info->queue, block, NULL);
1423 if (status != eslOK) esl_fatal("Work queue worker failed");
1424
1425 esl_threads_Finished(obj, workeridx);
1426 return;
1427 }
1428 #endif /* HMMER_THREADS */
1429
1430
1431
1432