1 /*  $Id: test_netschedule_client.cpp 591546 2019-08-16 16:59:06Z vasilche $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors:  Anatoliy Kuznetsov, Dmitry Kazimirov
27  *
28  * File Description:  NetSchedule client test
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include "test_netschedule_data.hpp"
35 
36 #include <connect/services/netschedule_api.hpp>
37 #include <connect/services/grid_app_version_info.hpp>
38 
39 #include <connect/ncbi_core_cxx.hpp>
40 
41 #include <util/random_gen.hpp>
42 #include <corelib/ncbiapp.hpp>
43 
44 #include <set>
45 
46 #define GRID_APP_NAME "test_netschedule_client"
47 
48 USING_NCBI_SCOPE;
49 
50 
51 ///////////////////////////////////////////////////////////////////////
52 
/// Random number generator shared by the helper functions in this file
/// (s_SeedTokens(), s_GenInput() and s_GetRandomToken()).
static CRandom s_Random;

/// Service functions

/// Number of entries in s_Tokens; assigned by s_SeedTokens().
static int s_NumTokens;
/// Centre of the token length range that s_SeedTokens() passes to the
/// random generator.
static const int s_AvgTokenLength = 10;
/// Half-width of the token length range: s_SeedTokens() requests lengths
/// between s_AvgTokenLength - s_DTokenLength and
/// s_AvgTokenLength + s_DTokenLength.
static const int s_DTokenLength   = 2;
/// Table of heap-allocated, NUL-terminated tokens; allocated and filled by
/// s_SeedTokens(), read by s_GetRandomToken().
static char **s_Tokens;
/// Alphabet the token characters are picked from.  The array is initialized
/// from a string literal, so it also stores the terminating NUL and
/// sizeof(s_TokenLetters) is the number of letters plus one.
static char s_TokenLetters[] = "0123456789"
                               "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
                               "abcdefghijklmnopqrstuvwxyz";
64 
s_SeedTokens(int num_aff_tokens)65 static void s_SeedTokens(int num_aff_tokens)
66 {
67     s_NumTokens = num_aff_tokens;
68     s_Tokens = new char* [s_NumTokens];
69     for (int n = 0; n < s_NumTokens; ++n) {
70         int len = s_Random.GetRand(s_AvgTokenLength - s_DTokenLength,
71                                    s_AvgTokenLength + s_DTokenLength);
72         s_Tokens[n] = new char[len+1];
73         for (int k = 0; k < len; ++k) {
74             s_Tokens[n][k] =
75                 s_TokenLetters[s_Random.GetRand(0, sizeof(s_TokenLetters) - 1)];
76         }
77         s_Tokens[n][len] = 0;
78     }
79 }
80 
81 
s_GenInput(int input_length)82 static string s_GenInput(int input_length)
83 {
84     string input;
85     input.reserve(input_length);
86     s_Random.SetSeed(CRandom::TValue(time(0)));
87     for (int n = 0; n < input_length; ++n) {
88         input.append(1, (char)s_Random.GetRand(0, 255));
89     }
90     return input;
91 }
92 
93 
s_GetRandomToken()94 static const char *s_GetRandomToken()
95 {
96     int n = s_Random.GetRand(0, s_NumTokens - 1);
97     return s_Tokens[n];
98 }
99 
100 
101 /// Test application
102 ///
103 /// @internal
104 ///
105 class CTestNetScheduleClient : public CNcbiApplication
106 {
107 public:
108     void Init(void);
109     int Run(void);
110 };
111 
112 
113 
Init(void)114 void CTestNetScheduleClient::Init(void)
115 {
116     InitOutputBuffer();
117 
118     CONNECT_Init();
119     SetDiagPostFlag(eDPF_Trace);
120     SetDiagPostLevel(eDiag_Info);
121 
122     // Setup command line arguments and parameters
123 
124     // Create command-line argument descriptions class
125     unique_ptr<CArgDescriptions> arg_desc(new CArgDescriptions);
126 
127     // Specify USAGE context
128     arg_desc->SetUsageContext(GetArguments().GetProgramBasename(),
129                               "NetSchedule client");
130 
131     arg_desc->AddKey("service", "ServiceName",
132         "NetSchedule service name (format: host:port or servcie_name)",
133         CArgDescriptions::eString);
134 
135     arg_desc->AddKey("queue", "QueueName",
136         "NetSchedule queue name",
137         CArgDescriptions::eString);
138 
139     arg_desc->AddOptionalKey("ilen", "InputLength", "Average input length",
140                              CArgDescriptions::eInteger);
141 
142     arg_desc->AddOptionalKey("maxruntime", "MaxRunTime",
143             "Maximum run time of this test", CArgDescriptions::eInteger);
144 
145     arg_desc->AddOptionalKey("input", "InputString", "Input string",
146                              CArgDescriptions::eString);
147 
148     arg_desc->AddOptionalKey("jobs", "jobs", "Number of jobs to submit",
149                              CArgDescriptions::eInteger);
150 
151     arg_desc->AddOptionalKey("naff", "AffinityTokens",
152         "Number of different affinities", CArgDescriptions::eInteger);
153 
154 
155     // Setup arg.descriptions for this application
156     SetupArgDescriptions(arg_desc.release());
157 }
158 
159 
Run(void)160 int CTestNetScheduleClient::Run(void)
161 {
162     const CArgs&   args = GetArgs();
163     const string&  service  = args["service"].AsString();
164     const string&  queue_name = args["queue"].AsString();
165 
166     unsigned jcount = 1000;
167     if (args["jobs"]) {
168         jcount = args["jobs"].AsInteger();
169         if (jcount == 0) {
170             fputs("The 'jobs' parameter cannot be zero.\n", stderr);
171             return 1;
172         }
173     }
174 
175     unsigned naff = 17;
176     if (args["naff"])
177         naff = args["naff"].AsInteger();
178     s_SeedTokens(naff);
179 
180     unsigned input_length = 0;
181     if (args["ilen"])
182         input_length = args["ilen"].AsInteger();
183 
184     unsigned maximum_runtime = 60;
185     if (args["maxruntime"])
186         maximum_runtime = args["maxruntime"].AsInteger();
187 
188     CNetScheduleAPI::EJobStatus status;
189     CNetScheduleAPI cl(service, "client_test", queue_name);
190 
191     STimeout comm_timeout;
192     comm_timeout.sec  = 1200;
193     comm_timeout.usec = 0;
194     cl.GetService().GetServerPool().SetCommunicationTimeout(comm_timeout);
195 
196     CNetScheduleSubmitter submitter = cl.GetSubmitter();
197 
198     string input;
199     if (args["input"]) {
200         input = args["input"].AsString();
201     } else if (args["ilen"]) {
202         input = s_GenInput(input_length);
203     } else {
204         input = "Hello " + queue_name;
205     }
206 
207     if (0)
208     {{
209 
210     NcbiCout << "SubmitAndWait..." << NcbiEndl;
211     unsigned wait_time = 30;
212     CNetScheduleJob j1(input);
213     status = submitter.SubmitJobAndWait(j1, wait_time);
214     if (status == CNetScheduleAPI::eDone) {
215         NcbiCout << j1.job_id << " done." << NcbiEndl;
216     } else {
217         NcbiCout << j1.job_id << " is not done in "
218                  << wait_time << " seconds." << NcbiEndl;
219     }
220     NcbiCout << "SubmitAndWait...done." << NcbiEndl;
221     }}
222 
223     set<string> submitted_job_ids;
224 
225     CStopWatch sw(CStopWatch::eStart);
226 
227     {{
228     NcbiCout << "Batch submit " << jcount << " jobs..." << NcbiEndl;
229 
230     unsigned batch_size = 1000;
231 
232     for (unsigned i = 0; i < jcount; i += batch_size) {
233         vector<CNetScheduleJob> jobs;
234         if (jcount - i < batch_size)
235             batch_size = jcount - i;
236         for (unsigned j = 0; j < batch_size; ++j) {
237             CNetScheduleJob job(input);
238             job.affinity = s_GetRandomToken();
239             jobs.push_back(job);
240         }
241         submitter.SubmitJobBatch(jobs);
242         ITERATE(vector<CNetScheduleJob>, job, jobs) {
243             submitted_job_ids.insert(job->job_id);
244         }
245         NcbiCout << "." << flush;
246     }
247 
248     _ASSERT(submitted_job_ids.size() == jcount);
249 
250     NcbiCout << NcbiEndl << "Done." << NcbiEndl;
251     double elapsed = sw.Elapsed();
252 
253     NcbiCout.setf(IOS_BASE::fixed, IOS_BASE::floatfield);
254     NcbiCout << "Avg time: " << (elapsed / jcount) << " sec, "
255              << (jcount/elapsed) << " jobs/sec" << NcbiEndl;
256     }}
257 
258 
259     // Waiting for jobs to be done
260 
261     NcbiCout << "Waiting for " << jcount << " jobs..." << NcbiEndl;
262 
263     for (;;) {
264         string job_id;
265         string auth_token;
266         CNetScheduleAPI::EJobStatus job_status;
267 
268         while (submitter.Read(&job_id, &auth_token, &job_status)) {
269             submitted_job_ids.erase(job_id);
270             submitter.ReadConfirm(job_id, auth_token);
271         }
272 
273         if (submitted_job_ids.empty())
274             break;
275 
276         bool all_deliberately_failed = true;
277 
278         CNetScheduleJob job;
279 
280         ITERATE(set<string>, it, submitted_job_ids) {
281             job.job_id = *it;
282             if (cl.GetJobDetails(job) != CNetScheduleAPI::eFailed ||
283                     job.error_msg != "DELIBERATE_FAILURE") {
284                 all_deliberately_failed = false;
285                 break;
286             }
287         }
288 
289         if (all_deliberately_failed)
290             break;
291 
292         if (sw.Elapsed() > double(maximum_runtime)) {
293             fprintf(stderr, "The test has exceeded its maximum "
294                     "run time of %u seconds.\nUse '-maxruntime' "
295                     "to override.\n", maximum_runtime);
296             return 3;
297         }
298 
299         NcbiCout << submitted_job_ids.size() << " jobs to go" << NcbiEndl;
300         SleepMilliSec(2000);
301     }
302 
303     NcbiCout << "Done. Wall-clock time: " << sw.Elapsed() << NcbiEndl;
304     if (!submitted_job_ids.empty())
305         NcbiCout << submitted_job_ids.size() <<
306             " jobs were deliberately failed by the worker node(s)." << NcbiEndl;
307 
308     return 0;
309 }
310 
311 
main(int argc,const char * argv[])312 int main(int argc, const char* argv[])
313 {
314     GRID_APP_CHECK_VERSION_ARGS();
315 
316     return CTestNetScheduleClient().AppMain(argc, argv);
317 }
318