1 /*  $Id: rpstblastn_app.cpp 632182 2021-05-27 13:23:33Z ivanov $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
 * Authors:  Jason Papadopoulos
27  *
28  */
29 
30 /** @file rpstblastn_app.cpp
31  * RPS TBLASTN command line application
32  */
33 
34 #include <ncbi_pch.hpp>
35 #include <corelib/ncbiapp.hpp>
36 #include <algo/blast/api/local_blast.hpp>
37 #include <algo/blast/api/remote_blast.hpp>
38 #include <algo/blast/blastinput/blast_fasta_input.hpp>
39 #include <algo/blast/blastinput/rpstblastn_args.hpp>
40 #include <algo/blast/api/objmgr_query_data.hpp>
41 #include <algo/blast/format/blast_format.hpp>
42 #include "blast_app_util.hpp"
43 #include "rpstblastn_node.hpp"
44 #include <objtools/blast/seqdb_reader/seqdb.hpp>
45 #include <algo/blast/api/rpsblast_local.hpp>
46 
47 #ifndef SKIP_DOXYGEN_PROCESSING
48 USING_NCBI_SCOPE;
49 USING_SCOPE(blast);
50 USING_SCOPE(objects);
51 #endif
52 
53 class CRPSTBlastnApp : public CNcbiApplication
54 {
55 public:
56     /** @inheritDoc */
CRPSTBlastnApp()57     CRPSTBlastnApp() {
58         CRef<CVersion> version(new CVersion());
59         version->SetVersionInfo(new CBlastVersion());
60         SetFullVersion(version);
61         m_StopWatch.Start();
62         if (m_UsageReport.IsEnabled()) {
63         	m_UsageReport.AddParam(CBlastUsageReport::eVersion, GetVersion().Print());
64         }
65     }
~CRPSTBlastnApp()66     ~CRPSTBlastnApp() {
67     	m_UsageReport.AddParam(CBlastUsageReport::eRunTime, m_StopWatch.Elapsed());
68     }
69 private:
70     /** @inheritDoc */
71     virtual void Init();
72     /** @inheritDoc */
73     virtual int Run();
74 
75     int x_RunMTBySplitDB();
76     int x_RunMTBySplitQuery();
77 
78     /// This application's command line args
79     CRef<CRPSTBlastnAppArgs> m_CmdLineArgs;
80     CBlastUsageReport m_UsageReport;
81     CStopWatch m_StopWatch;
82 };
83 
Init()84 void CRPSTBlastnApp::Init()
85 {
86     // formulate command line arguments
87 
88     m_CmdLineArgs.Reset(new CRPSTBlastnAppArgs());
89 
90     // read the command line
91 
92     HideStdArgs(fHideLogfile | fHideConffile | fHideFullVersion | fHideXmlHelp | fHideDryRun);
93     SetupArgDescriptions(m_CmdLineArgs->SetCommandLine());
94 }
95 
96 
Run(void)97 int CRPSTBlastnApp::Run(void)
98 {
99 	const CArgs& args = GetArgs();
100 	CMTArgs mt_args(args);
101 	if ((mt_args.GetMTMode() == CMTArgs::eSplitByQueries) &&
102 			(mt_args.GetNumThreads() > 1)){
103 		m_UsageReport.AddParam(CBlastUsageReport::eMTMode, args[kArgMTMode].AsInteger());
104 		return x_RunMTBySplitQuery();
105 	}
106 	else {
107 		return x_RunMTBySplitDB();
108 	}
109 }
110 
x_RunMTBySplitDB(void)111 int CRPSTBlastnApp::x_RunMTBySplitDB(void)
112 {
113     int status = BLAST_EXIT_SUCCESS;
114     CBlastAppDiagHandler bah;
115 
116     try {
117         // Allow the fasta reader to complain on invalid sequence input
118         SetDiagPostLevel(eDiag_Warning);
119         SetDiagPostPrefix("rpstblastn");
120         SetDiagHandler(&bah, false);
121 
122         /*** Get the BLAST options ***/
123         const CArgs& args = GetArgs();
124         CRef<CBlastOptionsHandle> opts_hndl;
125         if(RecoverSearchStrategy(args, m_CmdLineArgs)) {
126         	opts_hndl.Reset(&*m_CmdLineArgs->SetOptionsForSavedStrategy(args));
127         }
128         else {
129         	opts_hndl.Reset(&*m_CmdLineArgs->SetOptions(args));
130         }
131         const CBlastOptions& opt = opts_hndl->GetOptions();
132 
133         /*** Initialize the database ***/
134         CRef<CBlastDatabaseArgs> db_args(m_CmdLineArgs->GetBlastDatabaseArgs());
135         CRef<CLocalDbAdapter> db_adapter;
136         CRef<CScope> scope;
137         InitializeSubject(db_args, opts_hndl, m_CmdLineArgs->ExecuteRemotely(),
138                          db_adapter, scope);
139         _ASSERT(db_adapter && scope);
140 
141         /*** Get the query sequence(s) ***/
142         CRef<CQueryOptionsArgs> query_opts =
143             m_CmdLineArgs->GetQueryOptionsArgs();
144         SDataLoaderConfig dlconfig =
145             InitializeQueryDataLoaderConfiguration(query_opts->QueryIsProtein(),
146                                                    db_adapter);
147         CBlastInputSourceConfig iconfig(dlconfig, query_opts->GetStrand(),
148                                      query_opts->UseLowercaseMasks(),
149                                      query_opts->GetParseDeflines(),
150                                      query_opts->GetRange());
151         if(IsIStreamEmpty(m_CmdLineArgs->GetInputStream())){
152            	ERR_POST(Warning << "Query is Empty!");
153            	return BLAST_EXIT_SUCCESS;
154         }
155         CBlastFastaInputSource fasta(m_CmdLineArgs->GetInputStream(), iconfig);
156         CBlastInput input(&fasta, m_CmdLineArgs->GetQueryBatchSize());
157 
158         /*** Get the formatting options ***/
159         CRef<CFormattingArgs> fmt_args(m_CmdLineArgs->GetFormattingArgs());
160         bool isArchiveFormat = fmt_args->ArchiveFormatRequested(args);
161         if(!isArchiveFormat) {
162         	bah.DoNotSaveMessages();
163         }
164         CBlastFormat formatter(opt, *db_adapter,
165                                fmt_args->GetFormattedOutputChoice(),
166                                query_opts->GetParseDeflines(),
167                                m_CmdLineArgs->GetOutputStream(),
168                                fmt_args->GetNumDescriptions(),
169                                fmt_args->GetNumAlignments(),
170                                *scope,
171                                opt.GetMatrixName(),
172                                fmt_args->ShowGis(),
173                                fmt_args->DisplayHtmlOutput(),
174                                opt.GetQueryGeneticCode(),
175                                opt.GetDbGeneticCode(),
176                                opt.GetSumStatisticsMode(),
177                                m_CmdLineArgs->ExecuteRemotely(),
178                                db_adapter->GetFilteringAlgorithm(),
179                                fmt_args->GetCustomOutputFormatSpec(),
180                                false, false, NULL, NULL,
181                                GetCmdlineArgs(GetArguments()));
182 
183         formatter.SetQueryRange(query_opts->GetRange());
184         formatter.SetLineLength(fmt_args->GetLineLength());
185         if(UseXInclude(*fmt_args, args[kArgOutput].AsString())) {
186         	formatter.SetBaseFile(args[kArgOutput].AsString());
187         }
188         formatter.PrintProlog();
189 
190         /*** Process the input ***/
191         for (; !input.End(); formatter.ResetScopeHistory(), QueryBatchCleanup()) {
192 
193             CRef<CBlastQueryVector> query_batch(input.GetNextSeqBatch(*scope));
194             CRef<IQueryFactory> queries(new CObjMgr_QueryFactory(*query_batch));
195 
196             SaveSearchStrategy(args, m_CmdLineArgs, queries, opts_hndl);
197 
198             CRef<CSearchResultSet> results;
199 
200             if (m_CmdLineArgs->ExecuteRemotely()) {
201                 CRef<CRemoteBlast> rmt_blast =
202                     InitializeRemoteBlast(queries, db_args, opts_hndl,
203                           m_CmdLineArgs->ProduceDebugRemoteOutput(),
204                           m_CmdLineArgs->GetClientId());
205                 results = rmt_blast->GetResultSet();
206             } else {
207             	CLocalRPSBlast  local_search (query_batch, db_args->GetDatabaseName(), opts_hndl, args[kArgNumThreads].AsInteger() );
208             	results = local_search.Run();
209             }
210 
211             if (fmt_args->ArchiveFormatRequested(args)) {
212                 formatter.WriteArchive(*queries, *opts_hndl, *results, 0, bah.GetMessages());
213                 bah.ResetMessages();
214             } else {
215                 BlastFormatter_PreFetchSequenceData(*results, scope,
216                 		                            fmt_args->GetFormattedOutputChoice());
217                 ITERATE(CSearchResultSet, result, *results) {
218                     formatter.PrintOneResultSet(**result, query_batch);
219                 }
220             }
221         }
222 
223         formatter.PrintEpilog(opt);
224 
225         if (m_CmdLineArgs->ProduceDebugOutput()) {
226             opts_hndl->GetOptions().DebugDumpText(NcbiCerr, "BLAST options", 1);
227         }
228 
229         LogQueryInfo(m_UsageReport, input);
230         formatter.LogBlastSearchInfo(m_UsageReport);
231     } CATCH_ALL(status)
232     if(!bah.GetMessages().empty()) {
233     	const CArgs & a = GetArgs();
234     	PrintErrorArchive(a, bah.GetMessages());
235     }
236 	m_UsageReport.AddParam(CBlastUsageReport::eNumThreads, (int) m_CmdLineArgs->GetNumThreads());
237     m_UsageReport.AddParam(CBlastUsageReport::eExitStatus, status);
238     return status;
239 }
240 
x_RunMTBySplitQuery(void)241 int CRPSTBlastnApp::x_RunMTBySplitQuery(void)
242 {
243     int status = BLAST_EXIT_SUCCESS;
244     CBlastAppDiagHandler bah;
245     int batch_size = 8000;
246 
247 	char * mt_query_batch_env = getenv("BLAST_MT_QUERY_BATCH_SIZE");
248 	if (mt_query_batch_env) {
249 		batch_size = NStr::StringToInt(mt_query_batch_env);
250 	}
251 	INFO_POST("Batch Size: " << batch_size);
252     // Allow the fasta reader to complain on invalid sequence input
253     SetDiagPostLevel(eDiag_Warning);
254     SetDiagPostPrefix("rpstblastn_mt");
255     SetDiagHandler(&bah, false);
256 
257 	try {
258     	const CArgs& args = GetArgs();
259     	CRef<CBlastOptionsHandle> opts_hndl;
260         if(RecoverSearchStrategy(args, m_CmdLineArgs)) {
261         	opts_hndl.Reset(&*m_CmdLineArgs->SetOptionsForSavedStrategy(args));
262         }
263         else {
264         	opts_hndl.Reset(&*m_CmdLineArgs->SetOptions(args));
265         }
266     	if(IsIStreamEmpty(m_CmdLineArgs->GetInputStream())){
267        		ERR_POST(Warning << "Query is Empty!");
268        		return BLAST_EXIT_SUCCESS;
269     	}
270     	CNcbiOstream & out_stream = m_CmdLineArgs->GetOutputStream();
271     	const int kMaxNumOfThreads =  m_CmdLineArgs->GetNumThreads();
272 		CBlastMasterNode master_node(out_stream, kMaxNumOfThreads);
273    		int chunk_num = 0;
274 
275    		LogRPSBlastOptions(m_UsageReport, opts_hndl->GetOptions());
276    		LogRPSCmdOptions(m_UsageReport, *m_CmdLineArgs);
277    		CBlastNodeInputReader input(m_CmdLineArgs->GetInputStream(), batch_size, 4500);
278 		while (master_node.Processing()) {
279 			if (!input.AtEOF()) {
280 			 	if (!master_node.IsFull()) {
281 			 		int q_index = 0;
282 					string qb;
283 					int num_q = input.GetQueryBatch(qb, q_index);
284 					if (num_q > 0) {
285 						CBlastNodeMailbox * mb(new CBlastNodeMailbox(chunk_num, master_node.GetBuzzer()));
286 						CRPSTBlastnNode * t(new CRPSTBlastnNode(chunk_num, GetArguments(), args, bah, qb, q_index, num_q, mb));
287 						master_node.RegisterNode(t, mb);
288 						chunk_num ++;
289 					}
290 				}
291 			}
292 			else {
293 				master_node.Shutdown();
294 				m_UsageReport.AddParam(CBlastUsageReport::eNumQueries, master_node.GetNumOfQueries());
295 				m_UsageReport.AddParam(CBlastUsageReport::eTotalQueryLength, master_node.GetQueriesLength());
296 				m_UsageReport.AddParam(CBlastUsageReport::eNumErrStatus, master_node.GetNumErrStatus());
297 				m_UsageReport.AddParam(CBlastUsageReport::eNumQueryBatches, chunk_num);
298 			}
299 
300     	}
301 
302 
303 		if(chunk_num < kMaxNumOfThreads){
304 			CheckMTByQueries_QuerySize(opts_hndl->GetOptions().GetProgram(), batch_size);
305 		}
306 	} CATCH_ALL (status)
307 
308     if(!bah.GetMessages().empty()) {
309     	const CArgs & a = GetArgs();
310     	PrintErrorArchive(a, bah.GetMessages());
311     }
312 	m_UsageReport.AddParam(CBlastUsageReport::eNumThreads, (int) m_CmdLineArgs->GetNumThreads());
313     m_UsageReport.AddParam(CBlastUsageReport::eExitStatus, status);
314     return status;
315 }
316 
317 
318 #ifndef SKIP_DOXYGEN_PROCESSING
main(int argc,const char * argv[])319 int main(int argc, const char* argv[] /*, const char* envp[]*/)
320 {
321     return CRPSTBlastnApp().AppMain(argc, argv);
322 }
323 #endif /* SKIP_DOXYGEN_PROCESSING */
324