1 /* $Id: test_netschedule_stress.cpp 574016 2018-11-05 16:55:15Z sadyrovr $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Anatoliy Kuznetsov
27 *
28 * File Description: NetSchedule stress test (used for performance tuning)
29 *
30 */
31
32 #include <ncbi_pch.hpp>
33 #include <corelib/ncbiapp.hpp>
34 #include <corelib/ncbiargs.hpp>
35 #include <corelib/ncbienv.hpp>
36 #include <corelib/ncbireg.hpp>
37 #include <corelib/ncbi_system.hpp>
38 #include <corelib/ncbimisc.hpp>
39
40 #include <connect/services/netschedule_api.hpp>
41 #include <connect/services/grid_app_version_info.hpp>
42
43 #include <connect/ncbi_socket.hpp>
44 #include <connect/ncbi_core_cxx.hpp>
45 #include <connect/ncbi_types.h>
46
47 #define GRID_APP_NAME "test_netschedule_stress"
48
49 USING_NCBI_SCOPE;
50
51
52 ///////////////////////////////////////////////////////////////////////
53
54 /// Test application
55 ///
56 /// @internal
57 ///
58 class CTestNetScheduleStress : public CNcbiApplication
59 {
60 public:
61 void Init(void);
62 int Run(void);
63 };
64
65
66
Init(void)67 void CTestNetScheduleStress::Init(void)
68 {
69 CONNECT_Init();
70 SetDiagPostFlag(eDPF_Trace);
71 SetDiagPostLevel(eDiag_Info);
72
73 // Setup command line arguments and parameters
74
75 // Create command-line argument descriptions class
76 unique_ptr<CArgDescriptions> arg_desc(new CArgDescriptions);
77
78 // Specify USAGE context
79 arg_desc->SetUsageContext(GetArguments().GetProgramBasename(),
80 "NetSchedule stress test prog='test 1.2'");
81
82 arg_desc->AddKey("service",
83 "service_name",
84 "NetSchedule service name (format: host:port or servcie_name).",
85 CArgDescriptions::eString);
86
87 arg_desc->AddKey("queue",
88 "queue_name",
89 "NetSchedule queue name (like: noname).",
90 CArgDescriptions::eString);
91
92
93 arg_desc->AddOptionalKey("jobs",
94 "jobs",
95 "Number of jobs to submit",
96 CArgDescriptions::eInteger);
97
98 arg_desc->AddDefaultKey("batch", "batch",
99 "Test batch submit",
100 CArgDescriptions::eBoolean,
101 "true",
102 0);
103
104 // Setup arg.descriptions for this application
105 SetupArgDescriptions(arg_desc.release());
106 }
107
108
TestBatchSubmit(const string & service,const string & queue_name,unsigned jcount)109 void TestBatchSubmit(const string& service,
110 const string& queue_name, unsigned jcount)
111 {
112 CNetScheduleAPI cl(service, kEmptyStr, queue_name);
113 cl.SetProgramVersion("test 1.0.0");
114
115 typedef vector<CNetScheduleJob> TJobs;
116 TJobs jobs;
117
118 for (unsigned i = 0; i < jcount; ++i) {
119 CNetScheduleJob job("HELLO BSUBMIT", "affinity",
120 CNetScheduleAPI::eExclusiveJob);
121 jobs.push_back(job);
122 }
123
124 {{
125 NcbiCout << "Submit " << jobs.size() << " jobs..." << NcbiEndl;
126
127 CNetScheduleSubmitter submitter = cl.GetSubmitter();
128 CStopWatch sw(CStopWatch::eStart);
129 submitter.SubmitJobBatch(jobs);
130
131 NcbiCout << NcbiEndl << "Done." << NcbiEndl;
132 double elapsed = sw.Elapsed();
133 double rate = (double)jobs.size() / elapsed;
134
135 NcbiCout.setf(IOS_BASE::fixed, IOS_BASE::floatfield);
136 NcbiCout << "Elapsed: " << elapsed << " sec." << NcbiEndl;
137 NcbiCout << "Rate: " << rate << " jobs/sec." << NcbiEndl;
138 }}
139
140 ITERATE(TJobs, it, jobs) {
141 _ASSERT(!it->job_id.empty());
142 }
143
144 NcbiCout << "Get Jobs with permanent connection... " << NcbiEndl;
145
146 // Fetch it back
147
148 // cl.SetCommunicationTimeout().sec = 20;
149
150 {{
151 unsigned cnt = 0;
152 CNetScheduleJob job("INPUT");
153 job.output = "DONE";
154
155 CStopWatch sw(CStopWatch::eStart);
156 CNetScheduleExecutor executor = cl.GetExecutor();
157
158 for (;1;++cnt) {
159 job.output = "DONE";
160 job.ret_code = 0;
161 if (!executor.GetJob(job))
162 break;
163 executor.PutResult(job);
164 }
165 NcbiCout << NcbiEndl << "Done." << NcbiEndl;
166 double elapsed = sw.Elapsed();
167 double rate = cnt / elapsed;
168
169 NcbiCout.setf(IOS_BASE::fixed, IOS_BASE::floatfield);
170 NcbiCout << "Jobs processed: " << cnt << NcbiEndl;
171 NcbiCout << "Elapsed: " << elapsed << " sec." << NcbiEndl;
172 NcbiCout << "Rate: " << rate << " jobs/sec." << NcbiEndl;
173 }}
174 }
175
176
Run(void)177 int CTestNetScheduleStress::Run(void)
178 {
179 const CArgs& args = GetArgs();
180 const string& service = args["service"].AsString();
181 const string& queue = args["queue"].AsString();
182 bool batch = args["batch"].AsBoolean();
183
184 unsigned jcount = 10000;
185 if (args["jobs"]) {
186 jcount = args["jobs"].AsInteger();
187 }
188
189 // TestRunTimeout(host, port, queue_name);
190 // TestNetwork(host, port, queue_name);
191
192 if (batch) {
193 TestBatchSubmit(service, queue, jcount);
194 return 0;
195 }
196
197 CNetScheduleAPI::EJobStatus status;
198 CNetScheduleAPI cl(service, "stress_test", queue);
199 cl.SetProgramVersion("test wn 1.0.1");
200
201 cl.GetAdmin().PrintServerVersion(NcbiCout);
202
203 // SleepSec(40);
204 // return 0;
205 string input = "Hello " + queue;
206
207 CNetScheduleSubmitter submitter = cl.GetSubmitter();
208 CNetScheduleExecutor executor = cl.GetExecutor();
209
210 CNetScheduleJob job(input);
211 job.progress_msg = "pmsg";
212 submitter.SubmitJob(job);
213 NcbiCout << job.job_id << NcbiEndl;
214 submitter.GetProgressMsg(job);
215 _ASSERT(job.progress_msg == "pmsg");
216
217 // test progress message
218 job.progress_msg = "progress report message";
219 executor.PutProgressMsg(job);
220
221 string pmsg = job.progress_msg;
222 job.progress_msg = "";
223 submitter.GetProgressMsg(job);
224 NcbiCout << pmsg << NcbiEndl;
225 _ASSERT(pmsg == job.progress_msg);
226
227
228 job.error_msg = "test error\r\nmessage";
229 executor.PutFailure(job);
230 status = cl.GetJobDetails(job);
231
232 // _ASSERT(status == CNetScheduleAPI::eFailed);
233 // NcbiCout << err_msg << NcbiEndl;
234 // _ASSERT(err_msg == err);
235
236 //> ?????????? How should it really work??????????????
237 if (status != CNetScheduleAPI::eFailed) {
238 NcbiCerr << "Job " << job.job_id << " succeeded!" << NcbiEndl;
239 } else {
240 NcbiCout << job.error_msg << NcbiEndl;
241 /*
242 if (job.error_msg != err) {
243 NcbiCerr << "Incorrect error message: " << job.error_msg << NcbiEndl;
244 }
245 */
246 }
247 //< ?????????? How should it really work??????????????
248
249 submitter.CancelJob(job.job_id);
250
251 _VERIFY(executor.GetJobStatus(job) == CNetScheduleAPI::eCanceled);
252
253 vector<string> jobs;
254 jobs.reserve(jcount);
255
256 {{
257 CStopWatch sw(CStopWatch::eStart);
258
259 NcbiCout << "Submit " << jcount << " jobs..." << NcbiEndl;
260
261 for (unsigned i = 0; i < jcount; ++i) {
262 CNetScheduleJob job2(input);
263 submitter.SubmitJob(job2);
264 jobs.push_back( job2.job_id );
265 if (i % 1000 == 0) {
266 NcbiCout << "." << flush;
267 }
268 }
269 NcbiCout << NcbiEndl << "Done." << NcbiEndl;
270 double elapsed = sw.Elapsed();
271 double avg = elapsed / jcount;
272
273 NcbiCout.setf(IOS_BASE::fixed, IOS_BASE::floatfield);
274 NcbiCout << "Elapsed: " << elapsed << " sec." << NcbiEndl;
275 NcbiCout << "Avg time: " << avg << " sec." << NcbiEndl;
276 }}
277
278
279
280 NcbiCout << NcbiEndl << "Waiting..." << NcbiEndl;
281 SleepMilliSec(40 * 1000);
282 NcbiCout << NcbiEndl << "Ok." << NcbiEndl;
283
284 {{
285 NcbiCout << NcbiEndl
286 << "GetStatus " << jobs.size() << " jobs..." << NcbiEndl;
287
288 CStopWatch sw(CStopWatch::eStart);
289
290 CNetScheduleJob job3;
291 unsigned i = 0;
292 NON_CONST_ITERATE(vector<string>, it, jobs) {
293 job3.job_id = *it;
294 /*status =*/ executor.GetJobStatus(job3);
295 //status = cl.GetStatus(jk, &ret_code, &output);
296 if (i++ % 1000 == 0) {
297 NcbiCout << "." << flush;
298 }
299 }
300
301 double elapsed = sw.Elapsed();
302 if (jcount) {
303 double avg = elapsed / (double)jcount;
304
305 NcbiCout.setf(IOS_BASE::fixed, IOS_BASE::floatfield);
306 NcbiCout << "Elapsed :" << elapsed << " sec." << NcbiEndl;
307 NcbiCout << "Avg time :" << avg << " sec." << NcbiEndl;
308 }
309 }}
310
311
312
313 NcbiCout << NcbiEndl << "Waiting..." << NcbiEndl;
314 SleepMilliSec(40 * 1000);
315 NcbiCout << NcbiEndl << "Ok." << NcbiEndl;
316
317 vector<string> jobs_returned;
318 jobs_returned.reserve(jcount);
319
320 {{
321 NcbiCout << NcbiEndl << "Take-Return jobs..." << NcbiEndl;
322 CStopWatch sw(CStopWatch::eStart);
323
324 unsigned cnt = 0;
325 for (; cnt < jcount/2; ++cnt) {
326 CNetScheduleJob job4;
327 if (!executor.GetJob(job4, 60))
328 break;
329 executor.ReturnJob(job4);
330 jobs_returned.push_back(job4.job_id);
331 }
332 NcbiCout << "Returned " << cnt << " jobs." << NcbiEndl;
333
334 double elapsed = sw.Elapsed();
335 if (cnt) {
336 double avg = elapsed / cnt;
337
338 NcbiCout.setf(IOS_BASE::fixed, IOS_BASE::floatfield);
339 NcbiCout << "Jobs processed: " << cnt << NcbiEndl;
340 NcbiCout << "Elapsed: " << elapsed << " sec." << NcbiEndl;
341 NcbiCout << "Avg time: " << avg << " sec." << NcbiEndl;
342 }
343
344
345
346
347 NcbiCout << NcbiEndl << "Waiting..." << NcbiEndl;
348 SleepMilliSec(40 * 1000);
349 NcbiCout << NcbiEndl << "Ok." << NcbiEndl;
350
351 }}
352
353 vector<string> jobs_processed;
354 jobs_processed.reserve(jcount);
355
356 {{
357 NcbiCout << NcbiEndl << "Processing..." << NcbiEndl;
358 SleepMilliSec(8 * 1000);
359 CStopWatch sw(CStopWatch::eStart);
360
361 unsigned cnt = 0;
362
363 for (; 1; ++cnt) {
364 CNetScheduleJob job5;
365
366 if (!executor.GetJob(job5))
367 break;
368
369 jobs_processed.push_back(job5.job_id);
370
371 job5.output = "DONE " + queue;
372 executor.PutResult(job5);
373 }
374 double elapsed = sw.Elapsed();
375
376 if (cnt) {
377 double avg = elapsed / cnt;
378
379 NcbiCout.setf(IOS_BASE::fixed, IOS_BASE::floatfield);
380 NcbiCout << "Jobs processed: " << cnt << NcbiEndl;
381 NcbiCout << "Elapsed: " << elapsed << " sec." << NcbiEndl;
382 NcbiCout << "Avg time: " << avg << " sec." << NcbiEndl;
383 }
384
385 }}
386
387
388 {{
389 NcbiCout << NcbiEndl << "Check returned jobs..." << NcbiEndl;
390 SleepMilliSec(5 * 1000);
391 CStopWatch sw(CStopWatch::eStart);
392
393 NON_CONST_ITERATE(vector<string>, it, jobs_returned) {
394 const string& jk = *it;
395 status = submitter.GetJobStatus(jk);
396 switch (status) {
397 case CNetScheduleAPI::ePending:
398 NcbiCout << "Job pending: " << jk << NcbiEndl;
399 break;
400 default:
401 break;
402 }
403 }
404
405 }}
406
407 NcbiCout << NcbiEndl << "Done." << NcbiEndl;
408 return 0;
409 }
410
411
main(int argc,const char * argv[])412 int main(int argc, const char* argv[])
413 {
414 GRID_APP_CHECK_VERSION_ARGS();
415
416 return CTestNetScheduleStress().AppMain(argc, argv);
417 }
418