1 /* $Id: test_netschedule_node.cpp 574016 2018-11-05 16:55:15Z sadyrovr $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Anatoliy Kuznetsov, Dmitry Kazimirov
27 *
28 * File Description: NetSchedule client test
29 *
30 */
31
32 #include <ncbi_pch.hpp>
33
34 #include <corelib/ncbiapp.hpp>
35 #include <corelib/ncbiargs.hpp>
36 #include <corelib/ncbienv.hpp>
37 #include <corelib/ncbireg.hpp>
38 #include <corelib/ncbi_system.hpp>
39 #include <corelib/ncbimisc.hpp>
40
41 #include <connect/services/netschedule_api.hpp>
42 #include <connect/services/grid_globals.hpp>
43 #include <connect/services/grid_app_version_info.hpp>
44
45 #include <connect/ncbi_socket.hpp>
46 #include <connect/ncbi_types.h>
47
48 #include <util/distribution.hpp>
49 #include <util/random_gen.hpp>
50
51 #include "test_netschedule_data.hpp"
52
53 #define GRID_APP_NAME "test_netschedule_node"
54
55 BEGIN_NCBI_SCOPE
56
57 NCBI_PARAM_DECL(string, output, size_distr);
58 typedef NCBI_PARAM_TYPE(output, size_distr) TParam_SizeDistr;
59
60 NCBI_PARAM_DECL(string, output, time_distr);
61 typedef NCBI_PARAM_TYPE(output, time_distr) TParam_TimeDistr;
62
63 NCBI_PARAM_DECL(double, output, failure_rate);
64 typedef NCBI_PARAM_TYPE(output, failure_rate) TParam_FailureRate;
65
66 static CRandom s_Random;
67
68 /// Test application
69 ///
70 /// @internal
71 ///
72 class CTestNetScheduleNode : public CNcbiApplication
73 {
74 public:
75 void Init(void);
76 int Run(void);
77
78 private:
79 CDiscreteDistributionImpl m_SizeDistr;
80 CDiscreteDistributionImpl m_TimeDistr;
81 };
82
83
84 NCBI_PARAM_DEF_EX(string, output, size_distr, "",
85 eParam_NoThread, NS_OUTPUT_SIZE_DISTR);
86 NCBI_PARAM_DEF_EX(string, output, time_distr, "",
87 eParam_NoThread, NS_OUTPUT_TIME_DISTR);
88 NCBI_PARAM_DEF_EX(double, output, failure_rate, 0.0,
89 eParam_NoThread, NS_OUTPUT_FAILURE_RATE);
90
Init(void)91 void CTestNetScheduleNode::Init(void)
92 {
93 InitOutputBuffer();
94
95 SetDiagPostFlag(eDPF_Trace);
96 SetDiagPostLevel(eDiag_Info);
97
98 // Setup command line arguments and parameters
99
100 // Create command-line argument descriptions class
101 unique_ptr<CArgDescriptions> arg_desc(new CArgDescriptions);
102
103 // Specify USAGE context
104 arg_desc->SetUsageContext(GetArguments().GetProgramBasename(),
105 "NetSchedule node");
106
107 arg_desc->AddPositional("service", "NetSchedule service name.",
108 CArgDescriptions::eString);
109
110 arg_desc->AddPositional("queue", "NetSchedule queue name (like: noname).",
111 CArgDescriptions::eString);
112
113 arg_desc->AddFlag("verbose", "Log progress.");
114
115 // Setup arg.descriptions for this application
116 SetupArgDescriptions(arg_desc.release());
117 }
118
Run(void)119 int CTestNetScheduleNode::Run(void)
120 {
121 m_SizeDistr.InitFromParameter("size_distr",
122 TParam_SizeDistr::GetDefault().c_str(), &s_Random);
123 m_TimeDistr.InitFromParameter("time_distr",
124 TParam_TimeDistr::GetDefault().c_str(), &s_Random);
125
126 const CArgs& args = GetArgs();
127
128 string service = args["service"].AsString();
129 string queue_name = args["queue"].AsString();
130
131 bool verbose = args["verbose"];
132
133 string program_name = GetProgramDisplayName();
134
135 if (verbose) {
136 LOG_POST(Info << program_name << " started");
137 LOG_POST(Info << "Failure rate: " << TParam_FailureRate::GetDefault());
138 }
139
140 CNetScheduleAPI ns_api(service, program_name, queue_name);
141
142 STimeout comm_timeout;
143 comm_timeout.sec = 1200;
144 comm_timeout.usec = 0;
145 ns_api.GetService().GetServerPool().SetCommunicationTimeout(comm_timeout);
146 CNetScheduleExecutor ns_exec = ns_api.GetExecutor();
147
148 string job_key;
149 string input;
150
151 set<string> jobs_processed;
152
153 // The main job processing loop, polls the
154 // queue for available jobs
155 //
156 // There is no payload algorithm here, node just
157 // sleeps and reports the processing result back to server.
158 // Practical application should use NetCache as result storage
159 //
160 // Well behaved node should not constantly poll the queue for
161 // jobs (GetJob()).
162 // When NetSchedule Server reports there are no more jobs: worker node
163 // should take a brake and do not poll for a while...
164 // Or use WaitJob(), it receives notification messages from the
165 // server using stateless UDP protocol.
166 // It is strongly suggested that there is just one program using
167 // specified UDP port on the machine.
168
169 CNetScheduleJob job;
170
171 string expected_input = "Hello " + queue_name;
172
173 bool done = false;
174
175 while (!done) {
176 if (ns_exec.GetJob(job, 180)) {
177 if (job.input == "DIE") {
178 LOG_POST(Info << "Got poison pill, exiting.");
179 done = true;
180 } else if (job.input != expected_input) {
181 LOG_POST(Error << "Unexpected input: " + input);
182 }
183
184 // do no job here, just delay for a little while
185 unsigned delay = m_TimeDistr.GetNextValue();
186 if (delay > 0)
187 SleepMilliSec(delay);
188
189 unsigned output_size = m_SizeDistr.GetNextValue();
190 if (output_size > 0) {
191 if (output_size > MAX_OUTPUT_SIZE)
192 output_size = MAX_OUTPUT_SIZE;
193
194 job.output.assign(output_buffer, output_size);
195 // job.output[output_size - 1] = '\n';
196 } else
197 job.output.erase();
198
199 job.ret_code = 0;
200
201 if (s_Random.GetRand() >= (s_Random.GetMax() + 1) *
202 TParam_FailureRate::GetDefault())
203 ns_exec.PutResult(job);
204 else {
205 job.error_msg = "DELIBERATE_FAILURE";
206 ns_exec.PutFailure(job);
207 }
208
209 if (jobs_processed.find(job.job_id) == jobs_processed.end()) {
210 jobs_processed.insert(job.job_id);
211 if (verbose) {
212 LOG_POST(Info << "Job " << job.job_id << " is done.");
213 }
214 } else {
215 LOG_POST(Error << "Job: " << job.job_id <<
216 " has already been processed.");
217 }
218 }
219 }
220
221 return CGridGlobals::GetInstance().GetExitCode();
222 }
223
224 END_NCBI_SCOPE
225
226
227 USING_NCBI_SCOPE;
228
main(int argc,const char * argv[])229 int main(int argc, const char* argv[])
230 {
231 GRID_APP_CHECK_VERSION_ARGS();
232
233 return CTestNetScheduleNode().AppMain(argc, argv);
234 }
235