1 /* hmmsearch: search profile HMM(s) against a sequence database.
2  *
3  * To do:
4  *  - in MPI mode, add a check to make sure ncpus >= 2. If 1, then we
5  *    only have a master, no workers. See Infernal commit r3972 on the
6  *    same point; and same note in hmmscan.c's to do list.
7  */
8 #include "p7_config.h"
9 
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13 
14 #include "easel.h"
15 #include "esl_alphabet.h"
16 #include "esl_getopts.h"
17 #include "esl_msa.h"
18 #include "esl_msafile.h"
19 #include "esl_sq.h"
20 #include "esl_sqio.h"
21 #include "esl_stopwatch.h"
22 
23 #ifdef HMMER_MPI
24 #include "mpi.h"
25 #include "esl_mpi.h"
26 #endif
27 
28 #ifdef HMMER_THREADS
29 #include <unistd.h>
30 #include "esl_threads.h"
31 #include "esl_workqueue.h"
32 #endif
33 
34 #include "hmmer.h"
35 
36 typedef struct {
37 #ifdef HMMER_THREADS
38   ESL_WORK_QUEUE   *queue;
39 #endif
40   P7_BG            *bg;	         /* null model                              */
41   P7_PIPELINE      *pli;         /* work pipeline                           */
42   P7_TOPHITS       *th;          /* top hit results                         */
43   P7_OPROFILE      *om;          /* optimized query profile                 */
44 } WORKER_INFO;
45 
46 #define REPOPTS     "-E,-T,--cut_ga,--cut_nc,--cut_tc"
47 #define DOMREPOPTS  "--domE,--domT,--cut_ga,--cut_nc,--cut_tc"
48 #define INCOPTS     "--incE,--incT,--cut_ga,--cut_nc,--cut_tc"
49 #define INCDOMOPTS  "--incdomE,--incdomT,--cut_ga,--cut_nc,--cut_tc"
50 #define THRESHOPTS  "-E,-T,--domE,--domT,--incE,--incT,--incdomE,--incdomT,--cut_ga,--cut_nc,--cut_tc"
51 
52 #if defined (HMMER_THREADS) && defined (HMMER_MPI)
53 #define CPUOPTS     "--mpi"
54 #define MPIOPTS     "--cpu"
55 #else
56 #define CPUOPTS     NULL
57 #define MPIOPTS     NULL
58 #endif
59 
60 static ESL_OPTIONS options[] = {
61   /* name           type      default  env  range     toggles   reqs   incomp              help                                                      docgroup*/
62   { "-h",           eslARG_NONE,   FALSE, NULL, NULL,    NULL,  NULL,  NULL,            "show brief help on version and usage",                         1 },
63   /* Control of output */
64   { "-o",           eslARG_OUTFILE, NULL, NULL, NULL,    NULL,  NULL,  NULL,            "direct output to file <f>, not stdout",                        2 },
65   { "-A",           eslARG_OUTFILE, NULL, NULL, NULL,    NULL,  NULL,  NULL,            "save multiple alignment of all hits to file <f>",              2 },
66   { "--tblout",     eslARG_OUTFILE, NULL, NULL, NULL,    NULL,  NULL,  NULL,            "save parseable table of per-sequence hits to file <f>",        2 },
67   { "--domtblout",  eslARG_OUTFILE, NULL, NULL, NULL,    NULL,  NULL,  NULL,            "save parseable table of per-domain hits to file <f>",          2 },
68   { "--pfamtblout", eslARG_OUTFILE, NULL, NULL, NULL,    NULL,  NULL,  NULL,            "save table of hits and domains to file, in Pfam format <f>",   2 },
69   { "--acc",        eslARG_NONE,   FALSE, NULL, NULL,    NULL,  NULL,  NULL,            "prefer accessions over names in output",                       2 },
70   { "--noali",      eslARG_NONE,   FALSE, NULL, NULL,    NULL,  NULL,  NULL,            "don't output alignments, so output is smaller",                2 },
71   { "--notextw",    eslARG_NONE,    NULL, NULL, NULL,    NULL,  NULL, "--textw",        "unlimit ASCII text output line width",                         2 },
72   { "--textw",      eslARG_INT,    "120", NULL, "n>=120",NULL,  NULL, "--notextw",      "set max width of ASCII text output lines",                     2 },
73   /* Control of reporting thresholds */
74   { "-E",           eslARG_REAL,  "10.0", NULL, "x>0",   NULL,  NULL,  REPOPTS,         "report sequences <= this E-value threshold in output",         4 },
75   { "-T",           eslARG_REAL,   FALSE, NULL, NULL,    NULL,  NULL,  REPOPTS,         "report sequences >= this score threshold in output",           4 },
76   { "--domE",       eslARG_REAL,  "10.0", NULL, "x>0",   NULL,  NULL,  DOMREPOPTS,      "report domains <= this E-value threshold in output",           4 },
77   { "--domT",       eslARG_REAL,   FALSE, NULL, NULL,    NULL,  NULL,  DOMREPOPTS,      "report domains >= this score cutoff in output",                4 },
78   /* Control of inclusion (significance) thresholds */
79   { "--incE",       eslARG_REAL,  "0.01", NULL, "x>0",   NULL,  NULL,  INCOPTS,         "consider sequences <= this E-value threshold as significant",  5 },
80   { "--incT",       eslARG_REAL,   FALSE, NULL, NULL,    NULL,  NULL,  INCOPTS,         "consider sequences >= this score threshold as significant",    5 },
81   { "--incdomE",    eslARG_REAL,  "0.01", NULL, "x>0",   NULL,  NULL,  INCDOMOPTS,      "consider domains <= this E-value threshold as significant",    5 },
82   { "--incdomT",    eslARG_REAL,   FALSE, NULL, NULL,    NULL,  NULL,  INCDOMOPTS,      "consider domains >= this score threshold as significant",      5 },
83   /* Model-specific thresholding for both reporting and inclusion */
84   { "--cut_ga",     eslARG_NONE,   FALSE, NULL, NULL,    NULL,  NULL,  THRESHOPTS,      "use profile's GA gathering cutoffs to set all thresholding",   6 },
85   { "--cut_nc",     eslARG_NONE,   FALSE, NULL, NULL,    NULL,  NULL,  THRESHOPTS,      "use profile's NC noise cutoffs to set all thresholding",       6 },
86   { "--cut_tc",     eslARG_NONE,   FALSE, NULL, NULL,    NULL,  NULL,  THRESHOPTS,      "use profile's TC trusted cutoffs to set all thresholding",     6 },
87   /* Control of acceleration pipeline */
88   { "--max",        eslARG_NONE,   FALSE, NULL, NULL,    NULL,  NULL, "--F1,--F2,--F3", "Turn all heuristic filters off (less speed, more power)",      7 },
89   { "--F1",         eslARG_REAL,  "0.02", NULL, NULL,    NULL,  NULL, "--max",          "Stage 1 (MSV) threshold: promote hits w/ P <= F1",             7 },
90   { "--F2",         eslARG_REAL,  "1e-3", NULL, NULL,    NULL,  NULL, "--max",          "Stage 2 (Vit) threshold: promote hits w/ P <= F2",             7 },
91   { "--F3",         eslARG_REAL,  "1e-5", NULL, NULL,    NULL,  NULL, "--max",          "Stage 3 (Fwd) threshold: promote hits w/ P <= F3",             7 },
92   { "--nobias",     eslARG_NONE,   NULL,  NULL, NULL,    NULL,  NULL, "--max",          "turn off composition bias filter",                             7 },
93 
94 /* Other options */
95   { "--nonull2",    eslARG_NONE,   NULL,  NULL, NULL,    NULL,  NULL,  NULL,            "turn off biased composition score corrections",               12 },
96   { "-Z",           eslARG_REAL,   FALSE, NULL, "x>0",   NULL,  NULL,  NULL,            "set # of comparisons done, for E-value calculation",          12 },
97   { "--domZ",       eslARG_REAL,   FALSE, NULL, "x>0",   NULL,  NULL,  NULL,            "set # of significant seqs, for domain E-value calculation",   12 },
98   { "--seed",       eslARG_INT,    "42",  NULL, "n>=0",  NULL,  NULL,  NULL,            "set RNG seed to <n> (if 0: one-time arbitrary seed)",         12 },
99   { "--tformat",    eslARG_STRING,  NULL, NULL, NULL,    NULL,  NULL,  NULL,            "assert target <seqfile> is in format <s>: no autodetection",  12 },
100 
101 #ifdef HMMER_THREADS
102   { "--cpu",        eslARG_INT, p7_NCPU,"HMMER_NCPU","n>=0",NULL,  NULL,  CPUOPTS,      "number of parallel CPU workers to use for multithreads",      12 },
103 #endif
104 #ifdef HMMER_MPI
105   { "--stall",      eslARG_NONE,   FALSE, NULL, NULL,    NULL,"--mpi", NULL,            "arrest after start: for debugging MPI under gdb",             12 },
106   { "--mpi",        eslARG_NONE,   FALSE, NULL, NULL,    NULL,  NULL,  MPIOPTS,         "run as an MPI parallel program",                              12 },
107 #endif
108 
109   /* Restrict search to subset of database - hidden because these flags are
110    *   (a) currently for internal use
111    *   (b) probably going to change
112    * Doesn't work with MPI
113    */
114   { "--restrictdb_stkey", eslARG_STRING, "0",  NULL, NULL,    NULL,  NULL,  NULL,       "Search starts at the sequence with name <s> (not with MPI)",     99 },
115   { "--restrictdb_n",eslARG_INT,        "-1",  NULL, NULL,    NULL,  NULL,  NULL,       "Search <j> target sequences (starting at --restrictdb_stkey)",   99 },
116   { "--ssifile",    eslARG_STRING,       NULL, NULL, NULL,    NULL,  NULL,  NULL,       "restrictdb_x values require ssi file. Override default to <s>",  99 },
117 
118   {  0, 0, 0, 0, 0, 0, 0, 0, 0, 0 },
119 };
120 
121 static char usage[]  = "[options] <hmmfile> <seqdb>";
122 static char banner[] = "search profile(s) against a sequence database";
123 
124 /* struct cfg_s : "Global" application configuration shared by all threads/processes
125  *
126  * This structure is passed to routines within main.c, as a means of semi-encapsulation
127  * of shared data amongst different parallel processes (threads or MPI processes).
128  */
129 struct cfg_s {
130   char            *dbfile;            /* target sequence database file                   */
131   char            *hmmfile;           /* query HMM file                                  */
132 
133   int              do_mpi;            /* TRUE if we're doing MPI parallelization         */
134   int              nproc;             /* how many MPI processes, total                   */
135   int              my_rank;           /* who am I, in 0..nproc-1                         */
136 
137   char             *firstseq_key;     /* name of the first sequence in the restricted db range */
138   int              n_targetseq;       /* number of sequences in the restricted range */
139 };
140 
141 static int  serial_master(ESL_GETOPTS *go, struct cfg_s *cfg);
142 static int  serial_loop  (WORKER_INFO *info, ESL_SQFILE *dbfp, int n_targetseqs);
143 
144 #ifdef HMMER_THREADS
145 #define BLOCK_SIZE 1000
146 
147 static int  thread_loop(ESL_THREADS *obj, ESL_WORK_QUEUE *queue, ESL_SQFILE *dbfp, int n_targetseqs);
148 static void pipeline_thread(void *arg);
149 #endif
150 
151 #ifdef HMMER_MPI
152 static int  mpi_master   (ESL_GETOPTS *go, struct cfg_s *cfg);
153 static int  mpi_worker   (ESL_GETOPTS *go, struct cfg_s *cfg);
154 #endif
155 
156 
157 static int
process_commandline(int argc,char ** argv,ESL_GETOPTS ** ret_go,char ** ret_hmmfile,char ** ret_seqfile)158 process_commandline(int argc, char **argv, ESL_GETOPTS **ret_go, char **ret_hmmfile, char **ret_seqfile)
159 {
160   ESL_GETOPTS *go = esl_getopts_Create(options);
161   int          status;
162 
163   if (esl_opt_ProcessEnvironment(go)         != eslOK)  { if (printf("Failed to process environment: %s\n", go->errbuf) < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed"); goto FAILURE; }
164   if (esl_opt_ProcessCmdline(go, argc, argv) != eslOK)  { if (printf("Failed to parse command line: %s\n",  go->errbuf) < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed"); goto FAILURE; }
165   if (esl_opt_VerifyConfig(go)               != eslOK)  { if (printf("Failed to parse command line: %s\n",  go->errbuf) < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed"); goto FAILURE; }
166 
167   /* help format: */
168   if (esl_opt_GetBoolean(go, "-h") == TRUE)
169     {
170       p7_banner(stdout, argv[0], banner);
171       esl_usage(stdout, argv[0], usage);
172       if (puts("\nBasic options:")                                           < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed");
173       esl_opt_DisplayHelp(stdout, go, 1, 2, 80); /* 1= group; 2 = indentation; 80=textwidth*/
174 
175       if (puts("\nOptions directing output:")                                < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed");
176       esl_opt_DisplayHelp(stdout, go, 2, 2, 80);
177 
178       if (puts("\nOptions controlling reporting thresholds:")                < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed");
179       esl_opt_DisplayHelp(stdout, go, 4, 2, 80);
180 
181       if (puts("\nOptions controlling inclusion (significance) thresholds:") < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed");
182       esl_opt_DisplayHelp(stdout, go, 5, 2, 80);
183 
184       if (puts("\nOptions controlling model-specific thresholding:")         < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed");
185       esl_opt_DisplayHelp(stdout, go, 6, 2, 80);
186 
187       if (puts("\nOptions controlling acceleration heuristics:")             < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed");
188       esl_opt_DisplayHelp(stdout, go, 7, 2, 80);
189 
190       if (puts("\nOther expert options:")                                    < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed");
191       esl_opt_DisplayHelp(stdout, go, 12, 2, 80);
192       exit(0);
193     }
194 
195   if (esl_opt_ArgNumber(go)                  != 2)     { if (puts("Incorrect number of command line arguments.")      < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed"); goto FAILURE; }
196   if ((*ret_hmmfile = esl_opt_GetArg(go, 1)) == NULL)  { if (puts("Failed to get <hmmfile> argument on command line") < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed"); goto FAILURE; }
197   if ((*ret_seqfile = esl_opt_GetArg(go, 2)) == NULL)  { if (puts("Failed to get <seqdb> argument on command line")   < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed"); goto FAILURE; }
198 
199   /* Validate any attempted use of stdin streams */
200   if (strcmp(*ret_hmmfile, "-") == 0 && strcmp(*ret_seqfile, "-") == 0)
201     { if (puts("Either <hmmfile> or <seqdb> may be '-' (to read from stdin), but not both.") < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed"); goto FAILURE; }
202 
203   *ret_go = go;
204   return eslOK;
205 
206  FAILURE:  /* all errors handled here are user errors, so be polite.  */
207   esl_usage(stdout, argv[0], usage);
208   if (puts("\nwhere most common options are:")                                 < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed");
209   esl_opt_DisplayHelp(stdout, go, 1, 2, 80); /* 1= group; 2 = indentation; 80=textwidth*/
210   if (printf("\nTo see more help on available options, do %s -h\n\n", argv[0]) < 0) ESL_XEXCEPTION_SYS(eslEWRITE, "write failed");
211   esl_getopts_Destroy(go);
212   exit(1);
213 
214  ERROR:
215   if (go) esl_getopts_Destroy(go);
216   exit(status);
217 }
218 
219 static int
output_header(FILE * ofp,const ESL_GETOPTS * go,char * hmmfile,char * seqfile)220 output_header(FILE *ofp, const ESL_GETOPTS *go, char *hmmfile, char *seqfile)
221 {
222   p7_banner(ofp, go->argv[0], banner);
223 
224   if (fprintf(ofp, "# query HMM file:                  %s\n", hmmfile)                                                                                 < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
225   if (fprintf(ofp, "# target sequence database:        %s\n", seqfile)                                                                                 < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
226   if (esl_opt_IsUsed(go, "-o")           && fprintf(ofp, "# output directed to file:         %s\n",             esl_opt_GetString(go, "-o"))           < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
227   if (esl_opt_IsUsed(go, "-A")           && fprintf(ofp, "# MSA of all hits saved to file:   %s\n",             esl_opt_GetString(go, "-A"))           < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
228   if (esl_opt_IsUsed(go, "--tblout")     && fprintf(ofp, "# per-seq hits tabular output:     %s\n",             esl_opt_GetString(go, "--tblout"))     < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
229   if (esl_opt_IsUsed(go, "--domtblout")  && fprintf(ofp, "# per-dom hits tabular output:     %s\n",             esl_opt_GetString(go, "--domtblout"))  < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
230   if (esl_opt_IsUsed(go, "--pfamtblout") && fprintf(ofp, "# pfam-style tabular hit output:   %s\n",             esl_opt_GetString(go, "--pfamtblout")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
231   if (esl_opt_IsUsed(go, "--acc")        && fprintf(ofp, "# prefer accessions over names:    yes\n")                                                   < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
232   if (esl_opt_IsUsed(go, "--noali")      && fprintf(ofp, "# show alignments in output:       no\n")                                                    < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
233   if (esl_opt_IsUsed(go, "--notextw")    && fprintf(ofp, "# max ASCII text line length:      unlimited\n")                                             < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
234   if (esl_opt_IsUsed(go, "--textw")      && fprintf(ofp, "# max ASCII text line length:      %d\n",             esl_opt_GetInteger(go, "--textw"))     < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
235   if (esl_opt_IsUsed(go, "-E")           && fprintf(ofp, "# sequence reporting threshold:    E-value <= %g\n",  esl_opt_GetReal(go, "-E"))             < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
236   if (esl_opt_IsUsed(go, "-T")           && fprintf(ofp, "# sequence reporting threshold:    score >= %g\n",    esl_opt_GetReal(go, "-T"))             < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
237   if (esl_opt_IsUsed(go, "--domE")       && fprintf(ofp, "# domain reporting threshold:      E-value <= %g\n",  esl_opt_GetReal(go, "--domE"))         < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
238   if (esl_opt_IsUsed(go, "--domT")       && fprintf(ofp, "# domain reporting threshold:      score >= %g\n",    esl_opt_GetReal(go, "--domT"))         < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
239   if (esl_opt_IsUsed(go, "--incE")       && fprintf(ofp, "# sequence inclusion threshold:    E-value <= %g\n",  esl_opt_GetReal(go, "--incE"))         < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
240   if (esl_opt_IsUsed(go, "--incT")       && fprintf(ofp, "# sequence inclusion threshold:    score >= %g\n",    esl_opt_GetReal(go, "--incT"))         < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
241   if (esl_opt_IsUsed(go, "--incdomE")    && fprintf(ofp, "# domain inclusion threshold:      E-value <= %g\n",  esl_opt_GetReal(go, "--incdomE"))      < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
242   if (esl_opt_IsUsed(go, "--incdomT")    && fprintf(ofp, "# domain inclusion threshold:      score >= %g\n",    esl_opt_GetReal(go, "--incdomT"))      < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
243   if (esl_opt_IsUsed(go, "--cut_ga")     && fprintf(ofp, "# model-specific thresholding:     GA cutoffs\n")                                            < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
244   if (esl_opt_IsUsed(go, "--cut_nc")     && fprintf(ofp, "# model-specific thresholding:     NC cutoffs\n")                                            < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
245   if (esl_opt_IsUsed(go, "--cut_tc")     && fprintf(ofp, "# model-specific thresholding:     TC cutoffs\n")                                            < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
246   if (esl_opt_IsUsed(go, "--max")        && fprintf(ofp, "# Max sensitivity mode:            on [all heuristic filters off]\n")                        < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
247   if (esl_opt_IsUsed(go, "--F1")         && fprintf(ofp, "# MSV filter P threshold:       <= %g\n",             esl_opt_GetReal(go, "--F1"))           < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
248   if (esl_opt_IsUsed(go, "--F2")         && fprintf(ofp, "# Vit filter P threshold:       <= %g\n",             esl_opt_GetReal(go, "--F2"))           < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
249   if (esl_opt_IsUsed(go, "--F3")         && fprintf(ofp, "# Fwd filter P threshold:       <= %g\n",             esl_opt_GetReal(go, "--F3"))           < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
250   if (esl_opt_IsUsed(go, "--nobias")     && fprintf(ofp, "# biased composition HMM filter:   off\n")                                                   < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
251   if (esl_opt_IsUsed(go, "--restrictdb_stkey") && fprintf(ofp, "# Restrict db to start at seq key: %s\n",            esl_opt_GetString(go, "--restrictdb_stkey"))  < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
252   if (esl_opt_IsUsed(go, "--restrictdb_n")     && fprintf(ofp, "# Restrict db to # target seqs:    %d\n",            esl_opt_GetInteger(go, "--restrictdb_n")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
253   if (esl_opt_IsUsed(go, "--ssifile")          && fprintf(ofp, "# Override ssi file to:            %s\n",            esl_opt_GetString(go, "--ssifile"))       < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
254 
255   if (esl_opt_IsUsed(go, "--nonull2")    && fprintf(ofp, "# null2 bias corrections:          off\n")                                                   < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
256   if (esl_opt_IsUsed(go, "-Z")           && fprintf(ofp, "# sequence search space set to:    %.0f\n",           esl_opt_GetReal(go, "-Z"))             < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
257   if (esl_opt_IsUsed(go, "--domZ")       && fprintf(ofp, "# domain search space set to:      %.0f\n",           esl_opt_GetReal(go, "--domZ"))         < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
258   if (esl_opt_IsUsed(go, "--seed"))  {
259     if (esl_opt_GetInteger(go, "--seed") == 0 && fprintf(ofp, "# random number seed:              one-time arbitrary\n")                               < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
260     else if (                               fprintf(ofp, "# random number seed set to:       %d\n",             esl_opt_GetInteger(go, "--seed"))      < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
261   }
262   if (esl_opt_IsUsed(go, "--tformat")    && fprintf(ofp, "# targ <seqfile> format asserted:  %s\n",             esl_opt_GetString(go, "--tformat"))    < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
263 #ifdef HMMER_THREADS
264   if (esl_opt_IsUsed(go, "--cpu")        && fprintf(ofp, "# number of worker threads:        %d\n",             esl_opt_GetInteger(go, "--cpu"))       < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
265 #endif
266 #ifdef HMMER_MPI
267   if (esl_opt_IsUsed(go, "--mpi")        && fprintf(ofp, "# MPI:                             on\n")                                                    < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
268 #endif
269   if (fprintf(ofp, "# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -\n\n")                                                    < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
270   return eslOK;
271 }
272 
273 int
main(int argc,char ** argv)274 main(int argc, char **argv)
275 {
276   ESL_GETOPTS     *go       = NULL;
277   struct cfg_s     cfg;
278   int              status   = eslOK;
279 
280   impl_Init();                  /* processor specific initialization */
281   p7_FLogsumInit();		/* we're going to use table-driven Logsum() approximations at times */
282 
283   /* Initialize what we can in the config structure (without knowing the alphabet yet)
284    */
285   cfg.hmmfile    = NULL;
286   cfg.dbfile     = NULL;
287   cfg.do_mpi     = FALSE;	           /* this gets reset below, if we init MPI */
288   cfg.nproc      = 0;		           /* this gets reset below, if we init MPI */
289   cfg.my_rank    = 0;		           /* this gets reset below, if we init MPI */
290   cfg.firstseq_key = NULL;
291   cfg.n_targetseq  = -1;
292 
293   process_commandline(argc, argv, &go, &cfg.hmmfile, &cfg.dbfile);
294 
295 /* is the range restricted? */
296   if (esl_opt_IsUsed(go, "--restrictdb_stkey") )
297     if ((cfg.firstseq_key = esl_opt_GetString(go, "--restrictdb_stkey")) == NULL)  p7_Fail("Failure capturing --restrictdb_stkey\n");
298 
299   if (esl_opt_IsUsed(go, "--restrictdb_n") )
300     cfg.n_targetseq = esl_opt_GetInteger(go, "--restrictdb_n");
301 
302   if ( cfg.n_targetseq != -1 && cfg.n_targetseq < 1 )
303     p7_Fail("--restrictdb_n must be >= 1\n");
304 
305 
306   /* Figure out who we are, and send control there:
307    * we might be an MPI master, an MPI worker, or a serial program.
308    */
309 #ifdef HMMER_MPI
310   /* pause the execution of the programs execution until the user has a
311    * chance to attach with a debugger and send a signal to resume execution
312    * i.e. (gdb) signal SIGCONT
313    */
314   if (esl_opt_GetBoolean(go, "--stall")) pause();
315 
316   if (esl_opt_GetBoolean(go, "--mpi"))
317     {
318       cfg.do_mpi     = TRUE;
319       MPI_Init(&argc, &argv);
320       MPI_Comm_rank(MPI_COMM_WORLD, &(cfg.my_rank));
321       MPI_Comm_size(MPI_COMM_WORLD, &(cfg.nproc));
322 
323       if (cfg.my_rank > 0)  status = mpi_worker(go, &cfg);
324       else 		    status = mpi_master(go, &cfg);
325 
326       MPI_Finalize();
327     }
328   else
329 #endif /*HMMER_MPI*/
330     {
331       status = serial_master(go, &cfg);
332     }
333 
334   esl_getopts_Destroy(go);
335 
336   return status;
337 }
338 
339 
340 /* serial_master()
341  * The serial version of hmmsearch.
342  * For each query HMM in <hmmfile> search the database for hits.
343  *
344  * A master can only return if it's successful. All errors are handled
345  * immediately and fatally with p7_Fail().  We also use the
346  * ESL_EXCEPTION and ERROR: mechanisms, but only because we know we're
347  * using a fatal exception handler.
348  */
349 static int
serial_master(ESL_GETOPTS * go,struct cfg_s * cfg)350 serial_master(ESL_GETOPTS *go, struct cfg_s *cfg)
351 {
352   FILE            *ofp      = stdout;            /* results output file (-o)                        */
353   FILE            *afp      = NULL;              /* alignment output file (-A)                      */
354   FILE            *tblfp    = NULL;              /* output stream for tabular per-seq (--tblout)    */
355   FILE            *domtblfp = NULL;              /* output stream for tabular per-dom (--domtblout) */
356   FILE            *pfamtblfp= NULL;              /* output stream for pfam tabular output (--pfamtblout)    */
357   P7_HMMFILE      *hfp      = NULL;              /* open input HMM file                             */
358   ESL_SQFILE      *dbfp     = NULL;              /* open input sequence file                        */
359   P7_HMM          *hmm      = NULL;              /* one HMM query                                   */
360   ESL_ALPHABET    *abc      = NULL;              /* digital alphabet                                */
361   int              dbfmt    = eslSQFILE_UNKNOWN; /* format code for sequence database file          */
362   ESL_STOPWATCH   *w;
363   int              textw    = 0;
364   int              nquery   = 0;
365   int              status   = eslOK;
366   int              hstatus  = eslOK;
367   int              sstatus  = eslOK;
368   int              i;
369 
370   int              ncpus    = 0;
371 
372   int              infocnt  = 0;
373   WORKER_INFO     *info     = NULL;
374 #ifdef HMMER_THREADS
375   ESL_SQ_BLOCK    *block    = NULL;
376   ESL_THREADS     *threadObj= NULL;
377   ESL_WORK_QUEUE  *queue    = NULL;
378 #endif
379   char             errbuf[eslERRBUFSIZE];
380 
381   w = esl_stopwatch_Create();
382 
383   if (esl_opt_GetBoolean(go, "--notextw")) textw = 0;
384   else                                     textw = esl_opt_GetInteger(go, "--textw");
385 
386   if (esl_opt_IsOn(go, "--tformat")) {
387     dbfmt = esl_sqio_EncodeFormat(esl_opt_GetString(go, "--tformat"));
388     if (dbfmt == eslSQFILE_UNKNOWN) p7_Fail("%s is not a recognized sequence database file format\n", esl_opt_GetString(go, "--tformat"));
389   }
390 
391   /* Open the target sequence database */
392   status = esl_sqfile_Open(cfg->dbfile, dbfmt, p7_SEQDBENV, &dbfp);
393   if      (status == eslENOTFOUND) p7_Fail("Failed to open sequence file %s for reading\n",          cfg->dbfile);
394   else if (status == eslEFORMAT)   p7_Fail("Sequence file %s is empty or misformatted\n",            cfg->dbfile);
395   else if (status == eslEINVAL)    p7_Fail("Can't autodetect format of a stdin or .gz seqfile");
396   else if (status != eslOK)        p7_Fail("Unexpected error %d opening sequence file %s\n", status, cfg->dbfile);
397 
398 
399   if (esl_opt_IsUsed(go, "--restrictdb_stkey") || esl_opt_IsUsed(go, "--restrictdb_n")) {
400     if (esl_opt_IsUsed(go, "--ssifile"))
401       esl_sqfile_OpenSSI(dbfp, esl_opt_GetString(go, "--ssifile"));
402     else
403       esl_sqfile_OpenSSI(dbfp, NULL);
404   }
405 
406 
407 
408   /* Open the query profile HMM file */
409   status = p7_hmmfile_OpenE(cfg->hmmfile, NULL, &hfp, errbuf);
410   if      (status == eslENOTFOUND) p7_Fail("File existence/permissions problem in trying to open HMM file %s.\n%s\n", cfg->hmmfile, errbuf);
411   else if (status == eslEFORMAT)   p7_Fail("File format problem in trying to open HMM file %s.\n%s\n",                cfg->hmmfile, errbuf);
412   else if (status != eslOK)        p7_Fail("Unexpected error %d in opening HMM file %s.\n%s\n",               status, cfg->hmmfile, errbuf);
413 
414   /* Open the results output files */
415   if (esl_opt_IsOn(go, "-o"))          { if ((ofp      = fopen(esl_opt_GetString(go, "-o"), "w")) == NULL) p7_Fail("Failed to open output file %s for writing\n",    esl_opt_GetString(go, "-o")); }
416   if (esl_opt_IsOn(go, "-A"))          { if ((afp      = fopen(esl_opt_GetString(go, "-A"), "w")) == NULL) p7_Fail("Failed to open alignment file %s for writing\n", esl_opt_GetString(go, "-A")); }
417   if (esl_opt_IsOn(go, "--tblout"))    { if ((tblfp    = fopen(esl_opt_GetString(go, "--tblout"),    "w")) == NULL)  esl_fatal("Failed to open tabular per-seq output file %s for writing\n", esl_opt_GetString(go, "--tblout")); }
418   if (esl_opt_IsOn(go, "--domtblout")) { if ((domtblfp = fopen(esl_opt_GetString(go, "--domtblout"), "w")) == NULL)  esl_fatal("Failed to open tabular per-dom output file %s for writing\n", esl_opt_GetString(go, "--domtblout")); }
419   if (esl_opt_IsOn(go, "--pfamtblout")){ if ((pfamtblfp = fopen(esl_opt_GetString(go, "--pfamtblout"), "w")) == NULL)  esl_fatal("Failed to open pfam-style tabular output file %s for writing\n", esl_opt_GetString(go, "--pfamtblout")); }
420 
421 #ifdef HMMER_THREADS
422   /* initialize thread data */
423   ncpus = ESL_MIN( esl_opt_GetInteger(go, "--cpu"), esl_threads_GetCPUCount());
424   if (ncpus > 0)
425     {
426       threadObj = esl_threads_Create(&pipeline_thread);
427       queue = esl_workqueue_Create(ncpus * 2);
428     }
429 #endif
430 
431   infocnt = (ncpus == 0) ? 1 : ncpus;
432   ESL_ALLOC(info, sizeof(*info) * infocnt);
433 
434   /* <abc> is not known 'til first HMM is read. */
435   hstatus = p7_hmmfile_Read(hfp, &abc, &hmm);
436   if (hstatus == eslOK)
437     {
438       /* One-time initializations after alphabet <abc> becomes known */
439       output_header(ofp, go, cfg->hmmfile, cfg->dbfile);
440       esl_sqfile_SetDigital(dbfp, abc); //ReadBlock requires knowledge of the alphabet to decide how best to read blocks
441 
442       for (i = 0; i < infocnt; ++i)
443 	{
444 	  info[i].bg    = p7_bg_Create(abc);
445 #ifdef HMMER_THREADS
446 	  info[i].queue = queue;
447 #endif
448 	}
449 
450 #ifdef HMMER_THREADS
451       for (i = 0; i < ncpus * 2; ++i)
452 	{
453 	  block = esl_sq_CreateDigitalBlock(BLOCK_SIZE, abc);
454 	  if (block == NULL) 	      esl_fatal("Failed to allocate sequence block");
455 
456  	  status = esl_workqueue_Init(queue, block);
457 	  if (status != eslOK)	      esl_fatal("Failed to add block to work queue");
458 	}
459 #endif
460     }
461 
462   /* Outer loop: over each query HMM in <hmmfile>. */
463   while (hstatus == eslOK)
464     {
465       P7_PROFILE      *gm      = NULL;
466       P7_OPROFILE     *om      = NULL;       /* optimized query profile                  */
467 
468       nquery++;
469       esl_stopwatch_Start(w);
470 
471       /* seqfile may need to be rewound (multiquery mode) */
472       if (nquery > 1)
473       {
474         if (! esl_sqfile_IsRewindable(dbfp))
475           esl_fatal("Target sequence file %s isn't rewindable; can't search it with multiple queries", cfg->dbfile);
476 
477         if (! esl_opt_IsUsed(go, "--restrictdb_stkey") )
478           esl_sqfile_Position(dbfp, 0); //only re-set current position to 0 if we're not planning to set it in a moment
479       }
480 
481       if ( cfg->firstseq_key != NULL ) { //it's tempting to want to do this once and capture the offset position for future passes, but ncbi files make this non-trivial, so this keeps it general
482         sstatus = esl_sqfile_PositionByKey(dbfp, cfg->firstseq_key);
483         if (sstatus != eslOK)
484           p7_Fail("Failure setting restrictdb_stkey to %d\n", cfg->firstseq_key);
485       }
486 
487       if (fprintf(ofp, "Query:       %s  [M=%d]\n", hmm->name, hmm->M)  < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
488       if (hmm->acc)  { if (fprintf(ofp, "Accession:   %s\n", hmm->acc)  < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed"); }
489       if (hmm->desc) { if (fprintf(ofp, "Description: %s\n", hmm->desc) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed"); }
490 
491       /* Convert to an optimized model */
492       gm = p7_profile_Create (hmm->M, abc);
493       om = p7_oprofile_Create(hmm->M, abc);
494       p7_ProfileConfig(hmm, info->bg, gm, 100, p7_LOCAL); /* 100 is a dummy length for now; and MSVFilter requires local mode */
495       p7_oprofile_Convert(gm, om);                  /* <om> is now p7_LOCAL, multihit */
496 
497       for (i = 0; i < infocnt; ++i)
498       {
499         /* Create processing pipeline and hit list */
500         info[i].th  = p7_tophits_Create();
501         info[i].om  = p7_oprofile_Clone(om);
502         info[i].pli = p7_pipeline_Create(go, om->M, 100, FALSE, p7_SEARCH_SEQS); /* L_hint = 100 is just a dummy for now */
503         status = p7_pli_NewModel(info[i].pli, info[i].om, info[i].bg);
504         if (status == eslEINVAL) p7_Fail(info->pli->errbuf);
505 
506 #ifdef HMMER_THREADS
507         if (ncpus > 0) esl_threads_AddThread(threadObj, &info[i]);
508 #endif
509       }
510 
511 #ifdef HMMER_THREADS
512       if (ncpus > 0)  sstatus = thread_loop(threadObj, queue, dbfp, cfg->n_targetseq);
513       else            sstatus = serial_loop(info, dbfp, cfg->n_targetseq);
514 #else
515       sstatus = serial_loop(info, dbfp, cfg->n_targetseq);
516 #endif
517       switch(sstatus)
518       {
519       case eslEFORMAT:
520         esl_fatal("Parse failed (sequence file %s):\n%s\n",
521             dbfp->filename, esl_sqfile_GetErrorBuf(dbfp));
522         break;
523       case eslEOF:
524         /* do nothing */
525         break;
526       default:
527         esl_fatal("Unexpected error %d reading sequence file %s", sstatus, dbfp->filename);
528       }
529 
530       /* merge the results of the search results */
531       for (i = 1; i < infocnt; ++i)
532       {
533         p7_tophits_Merge(info[0].th, info[i].th);
534         p7_pipeline_Merge(info[0].pli, info[i].pli);
535 
536         p7_pipeline_Destroy(info[i].pli);
537         p7_tophits_Destroy(info[i].th);
538         p7_oprofile_Destroy(info[i].om);
539       }
540 
541       /* Print the results.  */
542       p7_tophits_SortBySortkey(info->th);
543       p7_tophits_Threshold(info->th, info->pli);
544       p7_tophits_Targets(ofp, info->th, info->pli, textw); if (fprintf(ofp, "\n\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
545       p7_tophits_Domains(ofp, info->th, info->pli, textw); if (fprintf(ofp, "\n\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
546 
547       if (tblfp)     p7_tophits_TabularTargets(tblfp,    hmm->name, hmm->acc, info->th, info->pli, (nquery == 1));
548       if (domtblfp)  p7_tophits_TabularDomains(domtblfp, hmm->name, hmm->acc, info->th, info->pli, (nquery == 1));
549       if (pfamtblfp) p7_tophits_TabularXfam(pfamtblfp, hmm->name, hmm->acc, info->th, info->pli);
550 
551       esl_stopwatch_Stop(w);
552       p7_pli_Statistics(ofp, info->pli, w);
553       if (fprintf(ofp, "//\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
554 
555       /* Output the results in an MSA (-A option) */
556       if (afp) {
557 	ESL_MSA *msa = NULL;
558 
559 	if (p7_tophits_Alignment(info->th, abc, NULL, NULL, 0, p7_ALL_CONSENSUS_COLS, &msa) == eslOK)
560 	  {
561 	    esl_msa_SetName     (msa, hmm->name, -1);
562 	    esl_msa_SetAccession(msa, hmm->acc,  -1);
563 	    esl_msa_SetDesc     (msa, hmm->desc, -1);
564 	    esl_msa_FormatAuthor(msa, "hmmsearch (HMMER %s)", HMMER_VERSION);
565 
566 	    if (textw > 0) esl_msafile_Write(afp, msa, eslMSAFILE_STOCKHOLM);
567 	    else           esl_msafile_Write(afp, msa, eslMSAFILE_PFAM);
568 
569 	    if (fprintf(ofp, "# Alignment of %d hits satisfying inclusion thresholds saved to: %s\n", msa->nseq, esl_opt_GetString(go, "-A")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
570 	  }
571 	else { if (fprintf(ofp, "# No hits satisfy inclusion thresholds; no alignment saved\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed"); }
572 
573 	esl_msa_Destroy(msa);
574       }
575 
576       p7_pipeline_Destroy(info->pli);
577       p7_tophits_Destroy(info->th);
578       p7_oprofile_Destroy(info->om);
579       p7_oprofile_Destroy(om);
580       p7_profile_Destroy(gm);
581       p7_hmm_Destroy(hmm);
582 
583       hstatus = p7_hmmfile_Read(hfp, &abc, &hmm);
584     } /* end outer loop over query HMMs */
585 
586   switch(hstatus) {
587   case eslEOD:       p7_Fail("read failed, HMM file %s may be truncated?", cfg->hmmfile);      break;
588   case eslEFORMAT:   p7_Fail("bad file format in HMM file %s",             cfg->hmmfile);      break;
589   case eslEINCOMPAT: p7_Fail("HMM file %s contains different alphabets",   cfg->hmmfile);      break;
590   case eslEOF:       /* do nothing. EOF is what we want. */                                    break;
591   default:           p7_Fail("Unexpected error (%d) in reading HMMs from %s", hstatus, cfg->hmmfile);
592   }
593 
594 
595   /* Terminate outputs... any last words?
596    */
597   if (tblfp)    p7_tophits_TabularTail(tblfp,    "hmmsearch", p7_SEARCH_SEQS, cfg->hmmfile, cfg->dbfile, go);
598   if (domtblfp) p7_tophits_TabularTail(domtblfp, "hmmsearch", p7_SEARCH_SEQS, cfg->hmmfile, cfg->dbfile, go);
599   if (pfamtblfp) p7_tophits_TabularTail(pfamtblfp,"hmmsearch", p7_SEARCH_SEQS, cfg->hmmfile, cfg->dbfile, go);
600   if (ofp)      { if (fprintf(ofp, "[ok]\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed"); }
601 
602   /* Cleanup - prepare for exit
603    */
604   for (i = 0; i < infocnt; ++i)
605     p7_bg_Destroy(info[i].bg);
606 
607 #ifdef HMMER_THREADS
608   if (ncpus > 0)
609     {
610       esl_workqueue_Reset(queue);
611       while (esl_workqueue_Remove(queue, (void **) &block) == eslOK)
612 	esl_sq_DestroyBlock(block);
613       esl_workqueue_Destroy(queue);
614       esl_threads_Destroy(threadObj);
615     }
616 #endif
617 
618   free(info);
619   p7_hmmfile_Close(hfp);
620   esl_sqfile_Close(dbfp);
621   esl_alphabet_Destroy(abc);
622   esl_stopwatch_Destroy(w);
623 
624   if (ofp != stdout) fclose(ofp);
625   if (afp)           fclose(afp);
626   if (tblfp)         fclose(tblfp);
627   if (domtblfp)      fclose(domtblfp);
628   if (pfamtblfp)     fclose(pfamtblfp);
629 
630   return eslOK;
631 
632  ERROR:
633   return eslFAIL;
634 }
635 
636 #ifdef HMMER_MPI
637 
638 /* Define common tags used by the MPI master/slave processes */
639 #define HMMER_ERROR_TAG          1
640 #define HMMER_HMM_TAG            2
641 #define HMMER_SEQUENCE_TAG       3
642 #define HMMER_BLOCK_TAG          4
643 #define HMMER_PIPELINE_TAG       5
644 #define HMMER_TOPHITS_TAG        6
645 #define HMMER_HIT_TAG            7
646 #define HMMER_TERMINATING_TAG    8
647 #define HMMER_READY_TAG          9
648 
649 /* mpi_failure()
650  * Generate an error message.  If the clients rank is not 0, a
651  * message is created with the error message and sent to the
652  * master process for handling.
653  */
654 static void
mpi_failure(char * format,...)655 mpi_failure(char *format, ...)
656 {
657   va_list  argp;
658   int      status = eslFAIL;
659   int      len;
660   int      rank;
661   char     str[512];
662 
663   MPI_Comm_rank(MPI_COMM_WORLD, &rank);
664 
665   /* format the error mesg */
666   va_start(argp, format);
667   len = vsnprintf(str, sizeof(str), format, argp);
668   va_end(argp);
669 
670   /* make sure the error string is terminated */
671   str[sizeof(str)-1] = '\0';
672 
673   /* if the caller is the master, print the results and abort */
674   if (rank == 0)
675     {
676       if (fprintf(stderr, "\nError: ") < 0) exit(eslEWRITE);
677       if (fprintf(stderr, "%s", str)   < 0) exit(eslEWRITE);
678       if (fprintf(stderr, "\n")        < 0) exit(eslEWRITE);
679       fflush(stderr);
680 
681       MPI_Abort(MPI_COMM_WORLD, status);
682       exit(1);
683     }
684   else
685     {
686       MPI_Send(str, len, MPI_CHAR, 0, HMMER_ERROR_TAG, MPI_COMM_WORLD);
687       pause();
688     }
689 }
690 
691 #define MAX_BLOCK_SIZE (512*1024)
692 
693 typedef struct {
694   uint64_t  offset;
695   uint64_t  length;
696   uint64_t  count;
697 } SEQ_BLOCK;
698 
699 typedef struct {
700   int        complete;
701   int        size;
702   int        current;
703   int        last;
704   SEQ_BLOCK *blocks;
705 } BLOCK_LIST;
706 
707 /* this routine parses the database keeping track of the blocks
708  * offset within the file, number of sequences and the length
709  * of the block.  These blocks are passed as work units to the
710  * MPI workers.  If multiple hmm's are in the query file, the
711  * blocks are reused without parsing the database a second time.
712  */
next_block(ESL_SQFILE * sqfp,ESL_SQ * sq,BLOCK_LIST * list,SEQ_BLOCK * block,int n_targetseqs)713 int next_block(ESL_SQFILE *sqfp, ESL_SQ *sq, BLOCK_LIST *list, SEQ_BLOCK *block, int n_targetseqs)
714 {
715   int      status   = eslOK;
716 
717   /* if the list has been calculated, use it instead of parsing the database */
718   if (list->complete)
719     {
720       if (list->current == list->last)
721       {
722         block->offset = 0;
723         block->length = 0;
724         block->count  = 0;
725 
726         status = eslEOF;
727       }
728       else
729       {
730         int inx = list->current++;
731 
732         block->offset = list->blocks[inx].offset;
733         block->length = list->blocks[inx].length;
734         block->count  = list->blocks[inx].count;
735 
736         status = eslOK;
737       }
738 
739       return status;
740     }
741 
742   block->offset = 0;
743   block->length = 0;
744   block->count = 0;
745 
746   esl_sq_Reuse(sq);
747   if (n_targetseqs == 0) status = eslEOF; //this is to handle the end-case of a restrictdb scenario, where no more targets are required, and we want to mark the list as complete
748   while (block->length < MAX_BLOCK_SIZE && (n_targetseqs <0 || block->count < n_targetseqs) && (status = esl_sqio_ReadInfo(sqfp, sq)) == eslOK)
749     {
750       if (block->count == 0) block->offset = sq->roff;
751       block->length = sq->eoff - block->offset + 1;
752       block->count++;
753       esl_sq_Reuse(sq);
754     }
755 
756   if (block->count > 0)
757     if (status == eslEOF || block->count == n_targetseqs)
758       status = eslOK;
759   if (status == eslEOF) list->complete = 1;
760 
761   /* add the block to the list of known blocks */
762   if (status == eslOK)
763     {
764       int inx;
765 
766       if (list->last >= list->size)
767 	{
768 	  void *tmp;
769 	  list->size += 500;
770 	  ESL_RALLOC(list->blocks, tmp, sizeof(SEQ_BLOCK) * list->size);
771 	}
772 
773       inx = list->last++;
774       list->blocks[inx].offset = block->offset;
775       list->blocks[inx].length = block->length;
776       list->blocks[inx].count  = block->count;
777     }
778 
779   return status;
780 
781  ERROR:
782   return eslEMEM;
783 }
784 
785 /* mpi_master()
786  * The MPI version of hmmbuild.
787  * Follows standard pattern for a master/worker load-balanced MPI program (J1/78-79).
788  *
789  * A master can only return if it's successful.
790  * Errors in an MPI master come in two classes: recoverable and nonrecoverable.
791  *
792  * Recoverable errors include all worker-side errors, and any
793  * master-side error that do not affect MPI communication. Error
794  * messages from recoverable messages are delayed until we've cleanly
795  * shut down the workers.
796  *
797  * Unrecoverable errors are master-side errors that may affect MPI
798  * communication, meaning we cannot count on being able to reach the
799  * workers and shut them down. Unrecoverable errors result in immediate
800  * p7_Fail()'s, which will cause MPI to shut down the worker processes
801  * uncleanly.
802  */
803 static int
mpi_master(ESL_GETOPTS * go,struct cfg_s * cfg)804 mpi_master(ESL_GETOPTS *go, struct cfg_s *cfg)
805 {
806   FILE            *ofp      = stdout;            /* results output file (-o)                        */
807   FILE            *afp      = NULL;              /* alignment output file (-A)                      */
808   FILE            *tblfp    = NULL;              /* output stream for tabular per-seq (--tblout)    */
809   FILE            *domtblfp = NULL;              /* output stream for tabular per-dom (--domtblout) */
810   FILE            *pfamtblfp= NULL;              /* output stream for pfam-style tabular output  (--pfamtblout) */
811   P7_BG           *bg       = NULL;	         /* null model                                      */
812   P7_HMMFILE      *hfp      = NULL;              /* open input HMM file                             */
813   ESL_SQFILE      *dbfp     = NULL;              /* open input sequence file                        */
814   P7_HMM          *hmm      = NULL;              /* one HMM query                                   */
815   ESL_SQ          *dbsq     = NULL;              /* one target sequence (digital)                   */
816   ESL_ALPHABET    *abc      = NULL;              /* digital alphabet                                */
817   int              dbfmt    = eslSQFILE_UNKNOWN; /* format code for sequence database file          */
818   ESL_STOPWATCH   *w;
819   int              textw    = 0;
820   int              nquery   = 0;
821   int              status   = eslOK;
822   int              hstatus  = eslOK;
823   int              sstatus  = eslOK;
824   int              dest;
825 
826   char            *mpi_buf  = NULL;              /* buffer used to pack/unpack structures */
827   int              mpi_size = 0;                 /* size of the allocated buffer */
828   BLOCK_LIST      *list     = NULL;
829   SEQ_BLOCK        block;
830 
831   int              i;
832   int              size;
833   MPI_Status       mpistatus;
834   char             errbuf[eslERRBUFSIZE];
835 
836   int              n_targets;
837 
838   w = esl_stopwatch_Create();
839 
840   if (esl_opt_GetBoolean(go, "--notextw")) textw = 0;
841   else                                     textw = esl_opt_GetInteger(go, "--textw");
842 
843   if (esl_opt_IsOn(go, "--tformat")) {
844     dbfmt = esl_sqio_EncodeFormat(esl_opt_GetString(go, "--tformat"));
845     if (dbfmt == eslSQFILE_UNKNOWN) mpi_failure("%s is not a recognized sequence database file format\n", esl_opt_GetString(go, "--tformat"));
846   }
847 
848   /* Open the target sequence database */
849   status = esl_sqfile_Open(cfg->dbfile, dbfmt, p7_SEQDBENV, &dbfp);
850   if      (status == eslENOTFOUND) mpi_failure("Failed to open sequence file %s for reading\n",          cfg->dbfile);
851   else if (status == eslEFORMAT)   mpi_failure("Sequence file %s is empty or misformatted\n",            cfg->dbfile);
852   else if (status == eslEINVAL)    mpi_failure("Can't autodetect format of a stdin or .gz seqfile");
853   else if (status != eslOK)        mpi_failure("Unexpected error %d opening sequence file %s\n", status, cfg->dbfile);
854 
855   if (esl_opt_IsUsed(go, "--restrictdb_stkey") || esl_opt_IsUsed(go, "--restrictdb_n")) {
856       if (esl_opt_IsUsed(go, "--ssifile"))
857         esl_sqfile_OpenSSI(dbfp, esl_opt_GetString(go, "--ssifile"));
858       else
859         esl_sqfile_OpenSSI(dbfp, NULL);
860   }
861 
862 
863   /* Open the query profile HMM file */
864   status = p7_hmmfile_OpenE(cfg->hmmfile, NULL, &hfp, errbuf);
865   if      (status == eslENOTFOUND) mpi_failure("File existence/permissions problem in trying to open HMM file %s.\n%s\n", cfg->hmmfile, errbuf);
866   else if (status == eslEFORMAT)   mpi_failure("File format problem in trying to open HMM file %s.\n%s\n",                cfg->hmmfile, errbuf);
867   else if (status != eslOK)        mpi_failure("Unexpected error %d in opening HMM file %s.\n%s\n",               status, cfg->hmmfile, errbuf);
868 
869   /* Open the results output files */
870   if (esl_opt_IsOn(go, "-o") && (ofp = fopen(esl_opt_GetString(go, "-o"), "w")) == NULL)
871     mpi_failure("Failed to open output file %s for writing\n",    esl_opt_GetString(go, "-o"));
872 
873   if (esl_opt_IsOn(go, "-A") && (afp = fopen(esl_opt_GetString(go, "-A"), "w")) == NULL)
874     mpi_failure("Failed to open alignment file %s for writing\n", esl_opt_GetString(go, "-A"));
875 
876   if (esl_opt_IsOn(go, "--tblout") && (tblfp = fopen(esl_opt_GetString(go, "--tblout"), "w")) == NULL)
877     mpi_failure("Failed to open tabular per-seq output file %s for writing\n", esl_opt_GetString(go, "--tblout"));
878 
879   if (esl_opt_IsOn(go, "--domtblout") && (domtblfp = fopen(esl_opt_GetString(go, "--domtblout"), "w")) == NULL)
880     mpi_failure("Failed to open tabular per-dom output file %s for writing\n", esl_opt_GetString(go, "--domtblout"));
881 
882   if (esl_opt_IsOn(go, "--pfamtblout") && (pfamtblfp = fopen(esl_opt_GetString(go, "--pfamtblout"), "w")) == NULL)
883     mpi_failure("Failed to open pfam-style tabular output file %s for writing\n", esl_opt_GetString(go, "--pfamtblout"));
884 
885   ESL_ALLOC(list, sizeof(BLOCK_LIST));
886   list->complete = 0;
887   list->size     = 0;
888   list->current  = 0;
889   list->last     = 0;
890   list->blocks   = NULL;
891 
892   /* <abc> is not known 'til first HMM is read. */
893   hstatus = p7_hmmfile_Read(hfp, &abc, &hmm);
894   if (hstatus == eslOK)
895     {
896       /* One-time initializations after alphabet <abc> becomes known */
897       output_header(ofp, go, cfg->hmmfile, cfg->dbfile);
898       dbsq = esl_sq_CreateDigital(abc);
899       bg = p7_bg_Create(abc);
900     }
901 
902 
903   if ( cfg->firstseq_key != NULL ) { //it's tempting to want to do this once and capture the offset position for future passes, but ncbi files make this non-trivial, so this keeps it general
904     sstatus = esl_sqfile_PositionByKey(dbfp, cfg->firstseq_key);
905     if (sstatus != eslOK)
906       p7_Fail("Failure setting restrictdb_stkey to %d\n", cfg->firstseq_key);
907   }
908 
909   /* Outer loop: over each query HMM in <hmmfile>. */
910   while (hstatus == eslOK)
911     {
912       P7_PROFILE      *gm      = NULL;
913       P7_OPROFILE     *om      = NULL;       /* optimized query profile                  */
914       P7_PIPELINE     *pli     = NULL;
915       P7_TOPHITS      *th      = NULL;
916       int              seq_cnt = 0;
917       nquery++;
918       esl_stopwatch_Start(w);
919 
920       n_targets = cfg->n_targetseq;
921 
922       /* seqfile may need to be rewound (multiquery mode) */
923       if (nquery > 1)   list->current = 0;
924 
925       if (fprintf(ofp, "Query:       %s  [M=%d]\n", hmm->name, hmm->M)  < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
926       if (hmm->acc)  { if (fprintf(ofp, "Accession:   %s\n", hmm->acc)  < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed"); }
927       if (hmm->desc) { if (fprintf(ofp, "Description: %s\n", hmm->desc) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed"); }
928 
929       /* Convert to an optimized model */
930       gm = p7_profile_Create (hmm->M, abc);
931       om = p7_oprofile_Create(hmm->M, abc);
932       p7_ProfileConfig(hmm, bg, gm, 100, p7_LOCAL);
933       p7_oprofile_Convert(gm, om);
934 
935       /* Create processing pipeline and hit list */
936       th  = p7_tophits_Create();
937       pli = p7_pipeline_Create(go, hmm->M, 100, FALSE, p7_SEARCH_SEQS);
938       p7_pli_NewModel(pli, om, bg);
939 
940       /* Main loop: */
941       while ((n_targets==-1 || seq_cnt<=n_targets) && (sstatus = next_block(dbfp, dbsq, list, &block, n_targets-seq_cnt)) == eslOK )
942       {
943         seq_cnt += block.count;
944 
945         if (MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &mpistatus) != 0)
946           mpi_failure("MPI error %d receiving message from %d\n", mpistatus.MPI_SOURCE);
947 
948         MPI_Get_count(&mpistatus, MPI_PACKED, &size);
949         if (mpi_buf == NULL || size > mpi_size) {
950           void *tmp;
951           ESL_RALLOC(mpi_buf, tmp, sizeof(char) * size);
952           mpi_size = size;
953         }
954 
955         dest = mpistatus.MPI_SOURCE;
956         MPI_Recv(mpi_buf, size, MPI_PACKED, dest, mpistatus.MPI_TAG, MPI_COMM_WORLD, &mpistatus);
957 
958         if (mpistatus.MPI_TAG == HMMER_ERROR_TAG)
959           mpi_failure("MPI client %d raised error:\n%s\n", dest, mpi_buf);
960         if (mpistatus.MPI_TAG != HMMER_READY_TAG)
961           mpi_failure("Unexpected tag %d from %d\n", mpistatus.MPI_TAG, dest);
962 
963         MPI_Send(&block, 3, MPI_LONG_LONG_INT, dest, HMMER_BLOCK_TAG, MPI_COMM_WORLD);
964       }
965 
966       if (n_targets!=-1 && seq_cnt==n_targets)
967         sstatus = eslEOF;
968 
969       switch(sstatus)
970       {
971       case eslEFORMAT:
972         mpi_failure("Parse failed (sequence file %s):\n%s\n", dbfp->filename, esl_sqfile_GetErrorBuf(dbfp));
973         break;
974       case eslEOF:
975         break;
976       default:
977         mpi_failure("Unexpected error %d reading sequence file %s", sstatus, dbfp->filename);
978       }
979 
980       block.offset = 0;
981       block.length = 0;
982       block.count  = 0;
983 
984       /* wait for all workers to finish up their work blocks */
985       for (i = 1; i < cfg->nproc; ++i)
986 	{
987 	  if (MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &mpistatus) != 0)
988 	    mpi_failure("MPI error %d receiving message from %d\n", mpistatus.MPI_SOURCE);
989 
990 	  MPI_Get_count(&mpistatus, MPI_PACKED, &size);
991 	  if (mpi_buf == NULL || size > mpi_size) {
992 	    void *tmp;
993 	    ESL_RALLOC(mpi_buf, tmp, sizeof(char) * size);
994 	    mpi_size = size;
995 	  }
996 
997 	  dest = mpistatus.MPI_SOURCE;
998 	  MPI_Recv(mpi_buf, size, MPI_PACKED, dest, mpistatus.MPI_TAG, MPI_COMM_WORLD, &mpistatus);
999 
1000 	  if (mpistatus.MPI_TAG == HMMER_ERROR_TAG)
1001 	    mpi_failure("MPI client %d raised error:\n%s\n", dest, mpi_buf);
1002 	  if (mpistatus.MPI_TAG != HMMER_READY_TAG)
1003 	    mpi_failure("Unexpected tag %d from %d\n", mpistatus.MPI_TAG, dest);
1004 	}
1005 
1006       /* merge the results of the search results */
1007       for (dest = 1; dest < cfg->nproc; ++dest)
1008 	{
1009 	  P7_PIPELINE     *mpi_pli   = NULL;
1010 	  P7_TOPHITS      *mpi_th    = NULL;
1011 
1012 	  /* send an empty block to signal the worker they are done */
1013 	  MPI_Send(&block, 3, MPI_LONG_LONG_INT, dest, HMMER_BLOCK_TAG, MPI_COMM_WORLD);
1014 
1015 	  /* wait for the results */
1016 	  if ((status = p7_tophits_MPIRecv(dest, HMMER_TOPHITS_TAG, MPI_COMM_WORLD, &mpi_buf, &mpi_size, &mpi_th)) != eslOK)
1017 	    mpi_failure("Unexpected error %d receiving tophits from %d", status, dest);
1018 
1019 	  if ((status = p7_pipeline_MPIRecv(dest, HMMER_PIPELINE_TAG, MPI_COMM_WORLD, &mpi_buf, &mpi_size, go, &mpi_pli)) != eslOK)
1020 	    mpi_failure("Unexpected error %d receiving pipeline from %d", status, dest);
1021 
1022 	  p7_tophits_Merge(th, mpi_th);
1023 	  p7_pipeline_Merge(pli, mpi_pli);
1024 
1025 	  p7_pipeline_Destroy(mpi_pli);
1026 	  p7_tophits_Destroy(mpi_th);
1027 	}
1028 
1029       /* Print the results.  */
1030       p7_tophits_SortBySortkey(th);
1031       p7_tophits_Threshold(th, pli);
1032       p7_tophits_Targets(ofp, th, pli, textw); if (fprintf(ofp, "\n\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
1033       p7_tophits_Domains(ofp, th, pli, textw); if (fprintf(ofp, "\n\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
1034 
1035       if (tblfp)    p7_tophits_TabularTargets(tblfp,    hmm->name, hmm->acc, th, pli, (nquery == 1));
1036       if (domtblfp) p7_tophits_TabularDomains(domtblfp, hmm->name, hmm->acc, th, pli, (nquery == 1));
1037       if (pfamtblfp) p7_tophits_TabularXfam(pfamtblfp, hmm->name, hmm->acc, th, pli);
1038 
1039       esl_stopwatch_Stop(w);
1040       p7_pli_Statistics(ofp, pli, w);
1041       if (fprintf(ofp, "//\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
1042 
1043       /* Output the results in an MSA (-A option) */
1044       if (afp) {
1045 	ESL_MSA *msa = NULL;
1046 
1047 	if (p7_tophits_Alignment(th, abc, NULL, NULL, 0, p7_ALL_CONSENSUS_COLS, &msa) == eslOK)
1048 	  {
1049 	    esl_msa_SetName     (msa, hmm->name, -1);
1050 	    esl_msa_SetAccession(msa, hmm->acc,  -1);
1051 	    esl_msa_SetDesc     (msa, hmm->desc, -1);
1052 	    esl_msa_FormatAuthor(msa, "hmmsearch (HMMER %s)", HMMER_VERSION);
1053 
1054 	    if (textw > 0) esl_msafile_Write(afp, msa, eslMSAFILE_STOCKHOLM);
1055 	    else           esl_msafile_Write(afp, msa, eslMSAFILE_PFAM);
1056 
1057 	    if (fprintf(ofp, "# Alignment of %d hits satisfying inclusion thresholds saved to: %s\n", msa->nseq, esl_opt_GetString(go, "-A")) < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed");
1058 	  }
1059 	else { if (fprintf(ofp, "# No hits satisfy inclusion thresholds; no alignment saved\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed"); }
1060 
1061 	esl_msa_Destroy(msa);
1062       }
1063 
1064       p7_pipeline_Destroy(pli);
1065       p7_tophits_Destroy(th);
1066       p7_hmm_Destroy(hmm);
1067 
1068       hstatus = p7_hmmfile_Read(hfp, &abc, &hmm);
1069     } /* end outer loop over query HMMs */
1070 
1071   switch(hstatus) {
1072   case eslEOD:       mpi_failure("read failed, HMM file %s may be truncated?", cfg->hmmfile);      break;
1073   case eslEFORMAT:   mpi_failure("bad file format in HMM file %s",             cfg->hmmfile);      break;
1074   case eslEINCOMPAT: mpi_failure("HMM file %s contains different alphabets",   cfg->hmmfile);      break;
1075   case eslEOF:       /* EOF is good, that's what we expect here */                                 break;
1076   default:           mpi_failure("Unexpected error (%d) in reading HMMs from %s", hstatus, cfg->hmmfile);
1077   }
1078 
1079   /* monitor all the workers to make sure they have ended */
1080   for (i = 1; i < cfg->nproc; ++i)
1081     {
1082       if (MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &mpistatus) != 0)
1083 	mpi_failure("MPI error %d receiving message from %d\n", mpistatus.MPI_SOURCE);
1084 
1085       MPI_Get_count(&mpistatus, MPI_PACKED, &size);
1086       if (mpi_buf == NULL || size > mpi_size) {
1087 	void *tmp;
1088 	ESL_RALLOC(mpi_buf, tmp, sizeof(char) * size);
1089 	mpi_size = size;
1090       }
1091 
1092       dest = mpistatus.MPI_SOURCE;
1093       MPI_Recv(mpi_buf, size, MPI_PACKED, dest, mpistatus.MPI_TAG, MPI_COMM_WORLD, &mpistatus);
1094 
1095       if (mpistatus.MPI_TAG == HMMER_ERROR_TAG)
1096 	mpi_failure("MPI client %d raised error:\n%s\n", dest, mpi_buf);
1097       if (mpistatus.MPI_TAG != HMMER_TERMINATING_TAG)
1098 	mpi_failure("Unexpected tag %d from %d\n", mpistatus.MPI_TAG, dest);
1099     }
1100 
1101   /* Terminate outputs... any last words?
1102    */
1103   if (tblfp)    p7_tophits_TabularTail(tblfp,     "hmmsearch", p7_SEARCH_SEQS, cfg->hmmfile, cfg->dbfile, go);
1104   if (domtblfp) p7_tophits_TabularTail(domtblfp,  "hmmsearch", p7_SEARCH_SEQS, cfg->hmmfile, cfg->dbfile, go);
1105   if (pfamtblfp)p7_tophits_TabularTail(pfamtblfp, "hmmsearch", p7_SEARCH_SEQS, cfg->hmmfile, cfg->dbfile, go);
1106   if (ofp)     { if (fprintf(ofp, "[ok]\n") < 0) ESL_EXCEPTION_SYS(eslEWRITE, "write failed"); }
1107 
1108   /* Cleanup - prepare for exit
1109    */
1110   free(list);
1111   if (mpi_buf != NULL) free(mpi_buf);
1112 
1113   p7_hmmfile_Close(hfp);
1114   esl_sqfile_Close(dbfp);
1115 
1116   p7_bg_Destroy(bg);
1117   esl_sq_Destroy(dbsq);
1118   esl_stopwatch_Destroy(w);
1119 
1120   if (ofp != stdout) fclose(ofp);
1121   if (afp)           fclose(afp);
1122   if (tblfp)         fclose(tblfp);
1123   if (domtblfp)      fclose(domtblfp);
1124   if (pfamtblfp)     fclose(pfamtblfp);
1125 
1126   return eslOK;
1127 
1128  ERROR:
1129   return eslEMEM;
1130 }
1131 
1132 
1133 static int
mpi_worker(ESL_GETOPTS * go,struct cfg_s * cfg)1134 mpi_worker(ESL_GETOPTS *go, struct cfg_s *cfg)
1135 {
1136   P7_HMM          *hmm      = NULL;              /* one HMM query                                   */
1137   ESL_SQ          *dbsq     = NULL;              /* one target sequence (digital)                   */
1138   ESL_ALPHABET    *abc      = NULL;              /* digital alphabet                                */
1139   P7_BG           *bg       = NULL;	         /* null model                                      */
1140   P7_HMMFILE      *hfp      = NULL;              /* open input HMM file                             */
1141   ESL_SQFILE      *dbfp     = NULL;              /* open input sequence file                        */
1142   int              dbfmt    = eslSQFILE_UNKNOWN; /* format code for sequence database file          */
1143   ESL_STOPWATCH   *w;
1144   int              status   = eslOK;
1145   int              hstatus  = eslOK;
1146   int              sstatus  = eslOK;
1147 
1148   char            *mpi_buf  = NULL;              /* buffer used to pack/unpack structures           */
1149   int              mpi_size = 0;                 /* size of the allocated buffer                    */
1150 
1151   MPI_Status       mpistatus;
1152   char             errbuf[eslERRBUFSIZE];
1153 
1154   w = esl_stopwatch_Create();
1155 
1156   /* Open the target sequence database */
1157   status = esl_sqfile_Open(cfg->dbfile, dbfmt, p7_SEQDBENV, &dbfp);
1158   if      (status == eslENOTFOUND) mpi_failure("Failed to open sequence file %s for reading\n",          cfg->dbfile);
1159   else if (status == eslEFORMAT)   mpi_failure("Sequence file %s is empty or misformatted\n",            cfg->dbfile);
1160   else if (status == eslEINVAL)    mpi_failure("Can't autodetect format of a stdin or .gz seqfile");
1161   else if (status != eslOK)        mpi_failure("Unexpected error %d opening sequence file %s\n", status, cfg->dbfile);
1162 
1163   /* Open the query profile HMM file */
1164   status = p7_hmmfile_OpenE(cfg->hmmfile, NULL, &hfp, errbuf);
1165   if      (status == eslENOTFOUND) mpi_failure("File existence/permissions problem in trying to open HMM file %s.\n%s\n", cfg->hmmfile, errbuf);
1166   else if (status == eslEFORMAT)   mpi_failure("File format problem in trying to open HMM file %s.\n%s\n",                cfg->hmmfile, errbuf);
1167   else if (status != eslOK)        mpi_failure("Unexpected error %d in opening HMM file %s.\n%s\n",               status, cfg->hmmfile, errbuf);
1168 
1169   /* <abc> is not known 'til first HMM is read. */
1170   hstatus = p7_hmmfile_Read(hfp, &abc, &hmm);
1171   if (hstatus == eslOK)
1172     {
1173       /* One-time initializations after alphabet <abc> becomes known */
1174       dbsq = esl_sq_CreateDigital(abc);
1175       bg = p7_bg_Create(abc);
1176       esl_sqfile_SetDigital(dbfp, abc);
1177     }
1178 
1179   /* Outer loop: over each query HMM in <hmmfile>. */
1180   while (hstatus == eslOK)
1181     {
1182       P7_PROFILE      *gm      = NULL;
1183       P7_OPROFILE     *om      = NULL;       /* optimized query profile                  */
1184       P7_PIPELINE     *pli     = NULL;
1185       P7_TOPHITS      *th      = NULL;
1186 
1187       SEQ_BLOCK        block;
1188 
1189       esl_stopwatch_Start(w);
1190 
1191       status = 0;
1192       MPI_Send(&status, 1, MPI_INT, 0, HMMER_READY_TAG, MPI_COMM_WORLD);
1193 
1194       /* Convert to an optimized model */
1195       gm = p7_profile_Create (hmm->M, abc);
1196       om = p7_oprofile_Create(hmm->M, abc);
1197       p7_ProfileConfig(hmm, bg, gm, 100, p7_LOCAL);
1198       p7_oprofile_Convert(gm, om);
1199 
1200       th  = p7_tophits_Create();
1201       pli = p7_pipeline_Create(go, om->M, 100, FALSE, p7_SEARCH_SEQS); /* L_hint = 100 is just a dummy for now */
1202       p7_pli_NewModel(pli, om, bg);
1203 
1204       /* receive a sequence block from the master */
1205       MPI_Recv(&block, 3, MPI_LONG_LONG_INT, 0, HMMER_BLOCK_TAG, MPI_COMM_WORLD, &mpistatus);
1206       while (block.count > 0)
1207 	{
1208 	  uint64_t length = 0;
1209 	  uint64_t count  = block.count;
1210 
1211 	  status = esl_sqfile_Position(dbfp, block.offset);
1212 	  if (status != eslOK) mpi_failure("Cannot position sequence database to %ld\n", block.offset);
1213 
1214 	  while (count > 0 && (sstatus = esl_sqio_Read(dbfp, dbsq)) == eslOK)
1215 	    {
1216 	      length = dbsq->eoff - block.offset + 1;
1217 
1218 	      p7_pli_NewSeq(pli, dbsq);
1219 	      p7_bg_SetLength(bg, dbsq->n);
1220 	      p7_oprofile_ReconfigLength(om, dbsq->n);
1221 
1222 	      p7_Pipeline(pli, om, bg, dbsq, NULL, th);
1223 
1224 	      esl_sq_Reuse(dbsq);
1225 	      p7_pipeline_Reuse(pli);
1226 
1227 	      --count;
1228 	    }
1229 
1230 	  /* lets do a little bit of sanity checking here to make sure the blocks are the same */
1231 	  if (count > 0)              mpi_failure("Block count mismatch - expected %ld found %ld at offset %ld\n",  block.count,  block.count - count, block.offset);
1232 	  if (block.length != length) mpi_failure("Block length mismatch - expected %ld found %ld at offset %ld\n", block.length, length,              block.offset);
1233 
1234 	  /* inform the master we need another block of sequences */
1235 	  status = 0;
1236 	  MPI_Send(&status, 1, MPI_INT, 0, HMMER_READY_TAG, MPI_COMM_WORLD);
1237 
1238 	  /* wait for the next block of sequences */
1239 	  MPI_Recv(&block, 3, MPI_LONG_LONG_INT, 0, HMMER_BLOCK_TAG, MPI_COMM_WORLD, &mpistatus);
1240 	}
1241 
1242       esl_stopwatch_Stop(w);
1243 
1244       /* Send the top hits back to the master. */
1245       p7_tophits_MPISend(th, 0, HMMER_TOPHITS_TAG, MPI_COMM_WORLD,  &mpi_buf, &mpi_size);
1246       p7_pipeline_MPISend(pli, 0, HMMER_PIPELINE_TAG, MPI_COMM_WORLD,  &mpi_buf, &mpi_size);
1247 
1248       p7_pipeline_Destroy(pli);
1249       p7_tophits_Destroy(th);
1250       p7_oprofile_Destroy(om);
1251       p7_profile_Destroy(gm);
1252       p7_hmm_Destroy(hmm);
1253 
1254       hstatus = p7_hmmfile_Read(hfp, &abc, &hmm);
1255     } /* end outer loop over query HMMs */
1256 
1257   switch(hstatus)
1258     {
1259     case eslEOF:
1260       /* do nothing */
1261       break;
1262     case eslEFORMAT:
1263       mpi_failure("bad file format in HMM file %s", cfg->hmmfile);
1264       break;
1265     case eslEINCOMPAT:
1266       mpi_failure("HMM file %s contains different alphabets", cfg->hmmfile);
1267       break;
1268     default:
1269       mpi_failure("Unexpected error (%d) in reading HMMs from %s", hstatus, cfg->hmmfile);
1270     }
1271 
1272   status = 0;
1273   MPI_Send(&status, 1, MPI_INT, 0, HMMER_TERMINATING_TAG, MPI_COMM_WORLD);
1274 
1275   if (mpi_buf != NULL) free(mpi_buf);
1276 
1277   p7_hmmfile_Close(hfp);
1278   esl_sqfile_Close(dbfp);
1279 
1280   p7_bg_Destroy(bg);
1281   esl_sq_Destroy(dbsq);
1282   esl_stopwatch_Destroy(w);
1283 
1284   return eslOK;
1285 }
1286 #endif /*HMMER_MPI*/
1287 
1288 static int
serial_loop(WORKER_INFO * info,ESL_SQFILE * dbfp,int n_targetseqs)1289 serial_loop(WORKER_INFO *info, ESL_SQFILE *dbfp, int n_targetseqs)
1290 {
1291   int      sstatus;
1292   ESL_SQ   *dbsq     = NULL;   /* one target sequence (digital)  */
1293   int seq_cnt = 0;
1294 
1295   dbsq = esl_sq_CreateDigital(info->om->abc);
1296 
1297   /* Main loop: */
1298   while ( (n_targetseqs==-1 || seq_cnt<n_targetseqs) &&  (sstatus = esl_sqio_Read(dbfp, dbsq)) == eslOK)
1299   {
1300       p7_pli_NewSeq(info->pli, dbsq);
1301       p7_bg_SetLength(info->bg, dbsq->n);
1302       p7_oprofile_ReconfigLength(info->om, dbsq->n);
1303 
1304       p7_Pipeline(info->pli, info->om, info->bg, dbsq, NULL, info->th);
1305 
1306       seq_cnt++;
1307       esl_sq_Reuse(dbsq);
1308       p7_pipeline_Reuse(info->pli);
1309   }
1310 
1311   if (n_targetseqs!=-1 && seq_cnt==n_targetseqs)
1312     sstatus = eslEOF;
1313 
1314   esl_sq_Destroy(dbsq);
1315 
1316   return sstatus;
1317 }
1318 
1319 #ifdef HMMER_THREADS
1320 static int
thread_loop(ESL_THREADS * obj,ESL_WORK_QUEUE * queue,ESL_SQFILE * dbfp,int n_targetseqs)1321 thread_loop(ESL_THREADS *obj, ESL_WORK_QUEUE *queue, ESL_SQFILE *dbfp, int n_targetseqs)
1322 {
1323   int  status  = eslOK;
1324   int  sstatus = eslOK;
1325   int  eofCount = 0;
1326   ESL_SQ_BLOCK *block;
1327   void         *newBlock;
1328 
1329   esl_workqueue_Reset(queue);
1330   esl_threads_WaitForStart(obj);
1331 
1332   status = esl_workqueue_ReaderUpdate(queue, NULL, &newBlock);
1333   if (status != eslOK) esl_fatal("Work queue reader failed");
1334 
1335   /* Main loop: */
1336   while (sstatus == eslOK )
1337     {
1338       block = (ESL_SQ_BLOCK *) newBlock;
1339 
1340       if (n_targetseqs == 0)
1341       {
1342         block->count = 0;
1343         sstatus = eslEOF;
1344       } else {
1345         sstatus = esl_sqio_ReadBlock(dbfp, block, -1, n_targetseqs, FALSE);
1346         n_targetseqs -= block->count;
1347       }
1348 
1349       if (sstatus == eslEOF)
1350       {
1351         if (eofCount < esl_threads_GetWorkerCount(obj)) sstatus = eslOK;
1352         ++eofCount;
1353       }
1354 
1355       if (sstatus == eslOK)
1356       {
1357         status = esl_workqueue_ReaderUpdate(queue, block, &newBlock);
1358         if (status != eslOK) esl_fatal("Work queue reader failed");
1359       }
1360     }
1361 
1362   status = esl_workqueue_ReaderUpdate(queue, block, NULL);
1363   if (status != eslOK) esl_fatal("Work queue reader failed");
1364 
1365   if (sstatus == eslEOF)
1366     {
1367       /* wait for all the threads to complete */
1368       esl_threads_WaitForFinish(obj);
1369       esl_workqueue_Complete(queue);
1370     }
1371 
1372   return sstatus;
1373 }
1374 
1375 static void
pipeline_thread(void * arg)1376 pipeline_thread(void *arg)
1377 {
1378   int i;
1379   int status;
1380   int workeridx;
1381   WORKER_INFO   *info;
1382   ESL_THREADS   *obj;
1383 
1384   ESL_SQ_BLOCK  *block = NULL;
1385   void          *newBlock;
1386 
1387   impl_Init();
1388 
1389   obj = (ESL_THREADS *) arg;
1390   esl_threads_Started(obj, &workeridx);
1391 
1392   info = (WORKER_INFO *) esl_threads_GetData(obj, workeridx);
1393 
1394   status = esl_workqueue_WorkerUpdate(info->queue, NULL, &newBlock);
1395   if (status != eslOK) esl_fatal("Work queue worker failed");
1396 
1397   /* loop until all blocks have been processed */
1398   block = (ESL_SQ_BLOCK *) newBlock;
1399   while (block->count > 0)
1400     {
1401       /* Main loop: */
1402       for (i = 0; i < block->count; ++i)
1403 	{
1404 	  ESL_SQ *dbsq = block->list + i;
1405 
1406 	  p7_pli_NewSeq(info->pli, dbsq);
1407 	  p7_bg_SetLength(info->bg, dbsq->n);
1408 	  p7_oprofile_ReconfigLength(info->om, dbsq->n);
1409 
1410 	  p7_Pipeline(info->pli, info->om, info->bg, dbsq, NULL, info->th);
1411 
1412 	  esl_sq_Reuse(dbsq);
1413 	  p7_pipeline_Reuse(info->pli);
1414 	}
1415 
1416       status = esl_workqueue_WorkerUpdate(info->queue, block, &newBlock);
1417       if (status != eslOK) esl_fatal("Work queue worker failed");
1418 
1419       block = (ESL_SQ_BLOCK *) newBlock;
1420     }
1421 
1422   status = esl_workqueue_WorkerUpdate(info->queue, block, NULL);
1423   if (status != eslOK) esl_fatal("Work queue worker failed");
1424 
1425   esl_threads_Finished(obj, workeridx);
1426   return;
1427 }
1428 #endif   /* HMMER_THREADS */
1429 
1430 
1431 
1432