1 /*  $Id: test_netschedule_stress.cpp 574016 2018-11-05 16:55:15Z sadyrovr $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors:  Anatoliy Kuznetsov
27  *
28  * File Description:  NetSchedule stress test (used for performance tuning)
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 #include <corelib/ncbiapp.hpp>
34 #include <corelib/ncbiargs.hpp>
35 #include <corelib/ncbienv.hpp>
36 #include <corelib/ncbireg.hpp>
37 #include <corelib/ncbi_system.hpp>
38 #include <corelib/ncbimisc.hpp>
39 
40 #include <connect/services/netschedule_api.hpp>
41 #include <connect/services/grid_app_version_info.hpp>
42 
43 #include <connect/ncbi_socket.hpp>
44 #include <connect/ncbi_core_cxx.hpp>
45 #include <connect/ncbi_types.h>
46 
47 #define GRID_APP_NAME "test_netschedule_stress"
48 
49 USING_NCBI_SCOPE;
50 
51 
52 ///////////////////////////////////////////////////////////////////////
53 
54 /// Test application
55 ///
56 /// @internal
57 ///
58 class CTestNetScheduleStress : public CNcbiApplication
59 {
60 public:
61     void Init(void);
62     int Run(void);
63 };
64 
65 
66 
Init(void)67 void CTestNetScheduleStress::Init(void)
68 {
69     CONNECT_Init();
70     SetDiagPostFlag(eDPF_Trace);
71     SetDiagPostLevel(eDiag_Info);
72 
73     // Setup command line arguments and parameters
74 
75     // Create command-line argument descriptions class
76     unique_ptr<CArgDescriptions> arg_desc(new CArgDescriptions);
77 
78     // Specify USAGE context
79     arg_desc->SetUsageContext(GetArguments().GetProgramBasename(),
80                               "NetSchedule stress test prog='test 1.2'");
81 
82     arg_desc->AddKey("service",
83                      "service_name",
84                      "NetSchedule service name (format: host:port or servcie_name).",
85                      CArgDescriptions::eString);
86 
87     arg_desc->AddKey("queue",
88                      "queue_name",
89                      "NetSchedule queue name (like: noname).",
90                      CArgDescriptions::eString);
91 
92 
93     arg_desc->AddOptionalKey("jobs",
94                              "jobs",
95                              "Number of jobs to submit",
96                              CArgDescriptions::eInteger);
97 
98     arg_desc->AddDefaultKey("batch", "batch",
99                             "Test batch submit",
100                             CArgDescriptions::eBoolean,
101                             "true",
102                             0);
103 
104     // Setup arg.descriptions for this application
105     SetupArgDescriptions(arg_desc.release());
106 }
107 
108 
TestBatchSubmit(const string & service,const string & queue_name,unsigned jcount)109 void TestBatchSubmit(const string& service,
110                      const string& queue_name, unsigned jcount)
111 {
112     CNetScheduleAPI cl(service, kEmptyStr, queue_name);
113     cl.SetProgramVersion("test 1.0.0");
114 
115     typedef vector<CNetScheduleJob> TJobs;
116     TJobs jobs;
117 
118     for (unsigned i = 0; i < jcount; ++i) {
119         CNetScheduleJob job("HELLO BSUBMIT", "affinity",
120                             CNetScheduleAPI::eExclusiveJob);
121         jobs.push_back(job);
122     }
123 
124     {{
125     NcbiCout << "Submit " << jobs.size() << " jobs..." << NcbiEndl;
126 
127     CNetScheduleSubmitter submitter = cl.GetSubmitter();
128     CStopWatch sw(CStopWatch::eStart);
129     submitter.SubmitJobBatch(jobs);
130 
131     NcbiCout << NcbiEndl << "Done." << NcbiEndl;
132     double elapsed = sw.Elapsed();
133     double rate = (double)jobs.size() / elapsed;
134 
135     NcbiCout.setf(IOS_BASE::fixed, IOS_BASE::floatfield);
136     NcbiCout << "Elapsed: "  << elapsed << " sec." << NcbiEndl;
137     NcbiCout << "Rate: "     << rate    << " jobs/sec." << NcbiEndl;
138     }}
139 
140     ITERATE(TJobs, it, jobs) {
141         _ASSERT(!it->job_id.empty());
142     }
143 
144     NcbiCout << "Get Jobs with permanent connection... " << NcbiEndl;
145 
146     // Fetch it back
147 
148     //    cl.SetCommunicationTimeout().sec = 20;
149 
150     {{
151     unsigned cnt = 0;
152     CNetScheduleJob job("INPUT");
153     job.output = "DONE";
154 
155     CStopWatch sw(CStopWatch::eStart);
156     CNetScheduleExecutor executor = cl.GetExecutor();
157 
158     for (;1;++cnt) {
159         job.output = "DONE";
160         job.ret_code = 0;
161         if (!executor.GetJob(job))
162             break;
163         executor.PutResult(job);
164     }
165     NcbiCout << NcbiEndl << "Done." << NcbiEndl;
166     double elapsed = sw.Elapsed();
167     double rate = cnt / elapsed;
168 
169     NcbiCout.setf(IOS_BASE::fixed, IOS_BASE::floatfield);
170     NcbiCout << "Jobs processed: " << cnt         << NcbiEndl;
171     NcbiCout << "Elapsed: "        << elapsed     << " sec."      << NcbiEndl;
172     NcbiCout << "Rate: "           << rate        << " jobs/sec." << NcbiEndl;
173     }}
174 }
175 
176 
Run(void)177 int CTestNetScheduleStress::Run(void)
178 {
179     const CArgs& args = GetArgs();
180     const string&  service  = args["service"].AsString();
181     const string&  queue = args["queue"].AsString();
182     bool batch = args["batch"].AsBoolean();
183 
184     unsigned jcount = 10000;
185     if (args["jobs"]) {
186         jcount = args["jobs"].AsInteger();
187     }
188 
189     //    TestRunTimeout(host, port, queue_name);
190 //    TestNetwork(host, port, queue_name);
191 
192     if (batch) {
193         TestBatchSubmit(service, queue, jcount);
194         return 0;
195     }
196 
197     CNetScheduleAPI::EJobStatus status;
198     CNetScheduleAPI cl(service, "stress_test", queue);
199     cl.SetProgramVersion("test wn 1.0.1");
200 
201     cl.GetAdmin().PrintServerVersion(NcbiCout);
202 
203     //        SleepSec(40);
204     //        return 0;
205     string input = "Hello " + queue;
206 
207     CNetScheduleSubmitter submitter = cl.GetSubmitter();
208     CNetScheduleExecutor executor = cl.GetExecutor();
209 
210     CNetScheduleJob job(input);
211     job.progress_msg = "pmsg";
212     submitter.SubmitJob(job);
213     NcbiCout << job.job_id << NcbiEndl;
214     submitter.GetProgressMsg(job);
215     _ASSERT(job.progress_msg == "pmsg");
216 
217     // test progress message
218     job.progress_msg = "progress report message";
219     executor.PutProgressMsg(job);
220 
221     string pmsg = job.progress_msg;
222     job.progress_msg = "";
223     submitter.GetProgressMsg(job);
224     NcbiCout << pmsg << NcbiEndl;
225     _ASSERT(pmsg == job.progress_msg);
226 
227 
228     job.error_msg = "test error\r\nmessage";
229     executor.PutFailure(job);
230     status = cl.GetJobDetails(job);
231 
232     //    _ASSERT(status == CNetScheduleAPI::eFailed);
233     //    NcbiCout << err_msg << NcbiEndl;
234     //    _ASSERT(err_msg == err);
235 
236     //> ?????????? How should it really work??????????????
237     if (status != CNetScheduleAPI::eFailed) {
238         NcbiCerr << "Job " << job.job_id << " succeeded!" << NcbiEndl;
239     } else {
240         NcbiCout << job.error_msg << NcbiEndl;
241         /*
242         if (job.error_msg != err) {
243             NcbiCerr << "Incorrect error message: " << job.error_msg << NcbiEndl;
244         }
245         */
246     }
247     //< ?????????? How should it really work??????????????
248 
249     submitter.CancelJob(job.job_id);
250 
251     _VERIFY(executor.GetJobStatus(job) == CNetScheduleAPI::eCanceled);
252 
253     vector<string> jobs;
254     jobs.reserve(jcount);
255 
256     {{
257     CStopWatch sw(CStopWatch::eStart);
258 
259     NcbiCout << "Submit " << jcount << " jobs..." << NcbiEndl;
260 
261     for (unsigned i = 0; i < jcount; ++i) {
262         CNetScheduleJob job2(input);
263         submitter.SubmitJob(job2);
264         jobs.push_back( job2.job_id );
265         if (i % 1000 == 0) {
266             NcbiCout << "." << flush;
267         }
268     }
269     NcbiCout << NcbiEndl << "Done." << NcbiEndl;
270     double elapsed = sw.Elapsed();
271     double avg = elapsed / jcount;
272 
273     NcbiCout.setf(IOS_BASE::fixed, IOS_BASE::floatfield);
274     NcbiCout << "Elapsed: "  << elapsed << " sec." << NcbiEndl;
275     NcbiCout << "Avg time: " << avg << " sec." << NcbiEndl;
276     }}
277 
278 
279 
280     NcbiCout << NcbiEndl << "Waiting..." << NcbiEndl;
281     SleepMilliSec(40 * 1000);
282     NcbiCout << NcbiEndl << "Ok." << NcbiEndl;
283 
284     {{
285     NcbiCout << NcbiEndl
286              << "GetStatus " << jobs.size() << " jobs..." << NcbiEndl;
287 
288     CStopWatch sw(CStopWatch::eStart);
289 
290     CNetScheduleJob job3;
291     unsigned i = 0;
292     NON_CONST_ITERATE(vector<string>, it, jobs) {
293         job3.job_id = *it;
294         /*status =*/ executor.GetJobStatus(job3);
295         //status = cl.GetStatus(jk, &ret_code, &output);
296         if (i++ % 1000 == 0) {
297             NcbiCout << "." << flush;
298         }
299     }
300 
301     double elapsed = sw.Elapsed();
302     if (jcount) {
303         double avg = elapsed / (double)jcount;
304 
305         NcbiCout.setf(IOS_BASE::fixed, IOS_BASE::floatfield);
306         NcbiCout << "Elapsed :"  << elapsed << " sec." << NcbiEndl;
307         NcbiCout << "Avg time :" << avg << " sec." << NcbiEndl;
308     }
309     }}
310 
311 
312 
313     NcbiCout << NcbiEndl << "Waiting..." << NcbiEndl;
314     SleepMilliSec(40 * 1000);
315     NcbiCout << NcbiEndl << "Ok." << NcbiEndl;
316 
317     vector<string> jobs_returned;
318     jobs_returned.reserve(jcount);
319 
320     {{
321     NcbiCout << NcbiEndl << "Take-Return jobs..." << NcbiEndl;
322     CStopWatch sw(CStopWatch::eStart);
323 
324     unsigned cnt = 0;
325     for (; cnt < jcount/2; ++cnt) {
326         CNetScheduleJob job4;
327         if (!executor.GetJob(job4, 60))
328             break;
329         executor.ReturnJob(job4);
330         jobs_returned.push_back(job4.job_id);
331     }
332     NcbiCout << "Returned " << cnt << " jobs." << NcbiEndl;
333 
334     double elapsed = sw.Elapsed();
335     if (cnt) {
336         double avg = elapsed / cnt;
337 
338         NcbiCout.setf(IOS_BASE::fixed, IOS_BASE::floatfield);
339         NcbiCout << "Jobs processed: " << cnt << NcbiEndl;
340         NcbiCout << "Elapsed: "  << elapsed << " sec." << NcbiEndl;
341         NcbiCout << "Avg time: " << avg << " sec." << NcbiEndl;
342     }
343 
344 
345 
346 
347     NcbiCout << NcbiEndl << "Waiting..." << NcbiEndl;
348     SleepMilliSec(40 * 1000);
349     NcbiCout << NcbiEndl << "Ok." << NcbiEndl;
350 
351     }}
352 
353     vector<string> jobs_processed;
354     jobs_processed.reserve(jcount);
355 
356     {{
357     NcbiCout << NcbiEndl << "Processing..." << NcbiEndl;
358     SleepMilliSec(8 * 1000);
359     CStopWatch sw(CStopWatch::eStart);
360 
361     unsigned cnt = 0;
362 
363     for (; 1; ++cnt) {
364         CNetScheduleJob job5;
365 
366         if (!executor.GetJob(job5))
367             break;
368 
369         jobs_processed.push_back(job5.job_id);
370 
371         job5.output = "DONE " + queue;
372         executor.PutResult(job5);
373     }
374     double elapsed = sw.Elapsed();
375 
376     if (cnt) {
377         double avg = elapsed / cnt;
378 
379         NcbiCout.setf(IOS_BASE::fixed, IOS_BASE::floatfield);
380         NcbiCout << "Jobs processed: " << cnt << NcbiEndl;
381         NcbiCout << "Elapsed: "  << elapsed << " sec." << NcbiEndl;
382         NcbiCout << "Avg time: " << avg << " sec." << NcbiEndl;
383     }
384 
385     }}
386 
387 
388     {{
389     NcbiCout << NcbiEndl << "Check returned jobs..." << NcbiEndl;
390     SleepMilliSec(5 * 1000);
391     CStopWatch sw(CStopWatch::eStart);
392 
393     NON_CONST_ITERATE(vector<string>, it, jobs_returned) {
394         const string& jk = *it;
395         status = submitter.GetJobStatus(jk);
396         switch (status) {
397         case CNetScheduleAPI::ePending:
398             NcbiCout << "Job pending: " << jk << NcbiEndl;
399             break;
400         default:
401             break;
402         }
403     }
404 
405     }}
406 
407     NcbiCout << NcbiEndl << "Done." << NcbiEndl;
408     return 0;
409 }
410 
411 
main(int argc,const char * argv[])412 int main(int argc, const char* argv[])
413 {
414     GRID_APP_CHECK_VERSION_ARGS();
415 
416     return CTestNetScheduleStress().AppMain(argc, argv);
417 }
418