1 /*  $Id: test_netschedule_node.cpp 574016 2018-11-05 16:55:15Z sadyrovr $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors:  Anatoliy Kuznetsov, Dmitry Kazimirov
27  *
28  * File Description:  NetSchedule client test
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include <corelib/ncbiapp.hpp>
35 #include <corelib/ncbiargs.hpp>
36 #include <corelib/ncbienv.hpp>
37 #include <corelib/ncbireg.hpp>
38 #include <corelib/ncbi_system.hpp>
39 #include <corelib/ncbimisc.hpp>
40 
41 #include <connect/services/netschedule_api.hpp>
42 #include <connect/services/grid_globals.hpp>
43 #include <connect/services/grid_app_version_info.hpp>
44 
45 #include <connect/ncbi_socket.hpp>
46 #include <connect/ncbi_types.h>
47 
48 #include <util/distribution.hpp>
49 #include <util/random_gen.hpp>
50 
51 #include "test_netschedule_data.hpp"
52 
53 #define GRID_APP_NAME "test_netschedule_node"
54 
55 BEGIN_NCBI_SCOPE
56 
// Tunable test parameters from the [output] configuration section. Run()
// reads each one through GetDefault(); their default values are defined
// with NCBI_PARAM_DEF_EX further down in this file.

// Discrete-distribution specification for the output size of each job, i.e.
// the number of characters of output_buffer copied into job.output in Run()
// (capped at MAX_OUTPUT_SIZE there).
NCBI_PARAM_DECL(string, output, size_distr);
typedef NCBI_PARAM_TYPE(output, size_distr) TParam_SizeDistr;

// Discrete-distribution specification for the simulated processing delay;
// Run() passes each drawn value to SleepMilliSec(), so it is in milliseconds.
NCBI_PARAM_DECL(string, output, time_distr);
typedef NCBI_PARAM_TYPE(output, time_distr) TParam_TimeDistr;

// Fraction of CRandom's value range for which Run() reports a deliberate
// failure (PutFailure) instead of a result; 0.0 means never fail.
NCBI_PARAM_DECL(double, output, failure_rate);
typedef NCBI_PARAM_TYPE(output, failure_rate) TParam_FailureRate;

// Random generator shared by both distributions (passed to
// InitFromParameter) and by the failure-rate decision in Run().
static CRandom s_Random;
67 
68 /// Test application
69 ///
70 /// @internal
71 ///
72 class CTestNetScheduleNode : public CNcbiApplication
73 {
74 public:
75     void Init(void);
76     int Run(void);
77 
78 private:
79     CDiscreteDistributionImpl m_SizeDistr;
80     CDiscreteDistributionImpl m_TimeDistr;
81 };
82 
83 
// Definitions of the [output] parameters declared above: empty distribution
// specifications and a 0.0 failure rate by default. The last argument of
// each NCBI_PARAM_DEF_EX is presumably the name of an environment variable
// that can override the value (NCBI CParam convention -- confirm).
NCBI_PARAM_DEF_EX(string, output, size_distr, "",
    eParam_NoThread, NS_OUTPUT_SIZE_DISTR);
NCBI_PARAM_DEF_EX(string, output, time_distr, "",
    eParam_NoThread, NS_OUTPUT_TIME_DISTR);
NCBI_PARAM_DEF_EX(double, output, failure_rate, 0.0,
    eParam_NoThread, NS_OUTPUT_FAILURE_RATE);
90 
Init(void)91 void CTestNetScheduleNode::Init(void)
92 {
93     InitOutputBuffer();
94 
95     SetDiagPostFlag(eDPF_Trace);
96     SetDiagPostLevel(eDiag_Info);
97 
98     // Setup command line arguments and parameters
99 
100     // Create command-line argument descriptions class
101     unique_ptr<CArgDescriptions> arg_desc(new CArgDescriptions);
102 
103     // Specify USAGE context
104     arg_desc->SetUsageContext(GetArguments().GetProgramBasename(),
105         "NetSchedule node");
106 
107     arg_desc->AddPositional("service", "NetSchedule service name.",
108         CArgDescriptions::eString);
109 
110     arg_desc->AddPositional("queue", "NetSchedule queue name (like: noname).",
111         CArgDescriptions::eString);
112 
113     arg_desc->AddFlag("verbose", "Log progress.");
114 
115     // Setup arg.descriptions for this application
116     SetupArgDescriptions(arg_desc.release());
117 }
118 
Run(void)119 int CTestNetScheduleNode::Run(void)
120 {
121     m_SizeDistr.InitFromParameter("size_distr",
122         TParam_SizeDistr::GetDefault().c_str(), &s_Random);
123     m_TimeDistr.InitFromParameter("time_distr",
124         TParam_TimeDistr::GetDefault().c_str(), &s_Random);
125 
126     const CArgs& args = GetArgs();
127 
128     string service = args["service"].AsString();
129     string queue_name = args["queue"].AsString();
130 
131     bool verbose = args["verbose"];
132 
133     string program_name = GetProgramDisplayName();
134 
135     if (verbose) {
136         LOG_POST(Info << program_name << " started");
137         LOG_POST(Info << "Failure rate: " << TParam_FailureRate::GetDefault());
138     }
139 
140     CNetScheduleAPI ns_api(service, program_name, queue_name);
141 
142     STimeout comm_timeout;
143     comm_timeout.sec  = 1200;
144     comm_timeout.usec = 0;
145     ns_api.GetService().GetServerPool().SetCommunicationTimeout(comm_timeout);
146     CNetScheduleExecutor ns_exec = ns_api.GetExecutor();
147 
148     string job_key;
149     string input;
150 
151     set<string> jobs_processed;
152 
153     // The main job processing loop, polls the
154     // queue for available jobs
155     //
156     // There is no payload algorithm here, node just
157     // sleeps and reports the processing result back to server.
158     // Practical application should use NetCache as result storage
159     //
160     // Well behaved node should not constantly poll the queue for
161     // jobs (GetJob()).
162     // When NetSchedule Server reports there are no more jobs: worker node
163     // should take a brake and do not poll for a while...
164     // Or use WaitJob(), it receives notification messages from the
165     // server using stateless UDP protocol.
166     // It is strongly suggested that there is just one program using
167     // specified UDP port on the machine.
168 
169     CNetScheduleJob job;
170 
171     string expected_input = "Hello " + queue_name;
172 
173     bool done = false;
174 
175     while (!done) {
176         if (ns_exec.GetJob(job, 180)) {
177             if (job.input == "DIE") {
178                 LOG_POST(Info << "Got poison pill, exiting.");
179                 done = true;
180             } else if (job.input != expected_input) {
181                 LOG_POST(Error << "Unexpected input: " + input);
182             }
183 
184             // do no job here, just delay for a little while
185             unsigned delay = m_TimeDistr.GetNextValue();
186             if (delay > 0)
187                 SleepMilliSec(delay);
188 
189             unsigned output_size = m_SizeDistr.GetNextValue();
190             if (output_size > 0) {
191                 if (output_size > MAX_OUTPUT_SIZE)
192                     output_size = MAX_OUTPUT_SIZE;
193 
194                 job.output.assign(output_buffer, output_size);
195                 // job.output[output_size - 1] = '\n';
196             } else
197                 job.output.erase();
198 
199             job.ret_code = 0;
200 
201             if (s_Random.GetRand() >= (s_Random.GetMax() + 1) *
202                     TParam_FailureRate::GetDefault())
203                 ns_exec.PutResult(job);
204             else {
205                 job.error_msg = "DELIBERATE_FAILURE";
206                 ns_exec.PutFailure(job);
207             }
208 
209             if (jobs_processed.find(job.job_id) == jobs_processed.end()) {
210                 jobs_processed.insert(job.job_id);
211                 if (verbose) {
212                     LOG_POST(Info << "Job " << job.job_id << " is done.");
213                 }
214             } else {
215                 LOG_POST(Error << "Job: " << job.job_id <<
216                     " has already been processed.");
217             }
218         }
219     }
220 
221     return CGridGlobals::GetInstance().GetExitCode();
222 }
223 
224 END_NCBI_SCOPE
225 
226 
227 USING_NCBI_SCOPE;
228 
main(int argc,const char * argv[])229 int main(int argc, const char* argv[])
230 {
231     GRID_APP_CHECK_VERSION_ARGS();
232 
233     return CTestNetScheduleNode().AppMain(argc, argv);
234 }
235