1 /* $Id: test_netschedule_client.cpp 591546 2019-08-16 16:59:06Z vasilche $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Anatoliy Kuznetsov, Dmitry Kazimirov
27 *
28 * File Description: NetSchedule client test
29 *
30 */
31
32 #include <ncbi_pch.hpp>
33
34 #include "test_netschedule_data.hpp"
35
36 #include <connect/services/netschedule_api.hpp>
37 #include <connect/services/grid_app_version_info.hpp>
38
39 #include <connect/ncbi_core_cxx.hpp>
40
41 #include <util/random_gen.hpp>
42 #include <corelib/ncbiapp.hpp>
43
44 #include <set>
45
46 #define GRID_APP_NAME "test_netschedule_client"
47
48 USING_NCBI_SCOPE;
49
50
51 ///////////////////////////////////////////////////////////////////////
52
53 static CRandom s_Random;
54
55 /// Service functions
56
57 static int s_NumTokens;
58 static const int s_AvgTokenLength = 10;
59 static const int s_DTokenLength = 2;
60 static char **s_Tokens;
61 static char s_TokenLetters[] = "0123456789"
62 "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
63 "abcdefghijklmnopqrstuvwxyz";
64
s_SeedTokens(int num_aff_tokens)65 static void s_SeedTokens(int num_aff_tokens)
66 {
67 s_NumTokens = num_aff_tokens;
68 s_Tokens = new char* [s_NumTokens];
69 for (int n = 0; n < s_NumTokens; ++n) {
70 int len = s_Random.GetRand(s_AvgTokenLength - s_DTokenLength,
71 s_AvgTokenLength + s_DTokenLength);
72 s_Tokens[n] = new char[len+1];
73 for (int k = 0; k < len; ++k) {
74 s_Tokens[n][k] =
75 s_TokenLetters[s_Random.GetRand(0, sizeof(s_TokenLetters) - 1)];
76 }
77 s_Tokens[n][len] = 0;
78 }
79 }
80
81
s_GenInput(int input_length)82 static string s_GenInput(int input_length)
83 {
84 string input;
85 input.reserve(input_length);
86 s_Random.SetSeed(CRandom::TValue(time(0)));
87 for (int n = 0; n < input_length; ++n) {
88 input.append(1, (char)s_Random.GetRand(0, 255));
89 }
90 return input;
91 }
92
93
s_GetRandomToken()94 static const char *s_GetRandomToken()
95 {
96 int n = s_Random.GetRand(0, s_NumTokens - 1);
97 return s_Tokens[n];
98 }
99
100
101 /// Test application
102 ///
103 /// @internal
104 ///
105 class CTestNetScheduleClient : public CNcbiApplication
106 {
107 public:
108 void Init(void);
109 int Run(void);
110 };
111
112
113
Init(void)114 void CTestNetScheduleClient::Init(void)
115 {
116 InitOutputBuffer();
117
118 CONNECT_Init();
119 SetDiagPostFlag(eDPF_Trace);
120 SetDiagPostLevel(eDiag_Info);
121
122 // Setup command line arguments and parameters
123
124 // Create command-line argument descriptions class
125 unique_ptr<CArgDescriptions> arg_desc(new CArgDescriptions);
126
127 // Specify USAGE context
128 arg_desc->SetUsageContext(GetArguments().GetProgramBasename(),
129 "NetSchedule client");
130
131 arg_desc->AddKey("service", "ServiceName",
132 "NetSchedule service name (format: host:port or servcie_name)",
133 CArgDescriptions::eString);
134
135 arg_desc->AddKey("queue", "QueueName",
136 "NetSchedule queue name",
137 CArgDescriptions::eString);
138
139 arg_desc->AddOptionalKey("ilen", "InputLength", "Average input length",
140 CArgDescriptions::eInteger);
141
142 arg_desc->AddOptionalKey("maxruntime", "MaxRunTime",
143 "Maximum run time of this test", CArgDescriptions::eInteger);
144
145 arg_desc->AddOptionalKey("input", "InputString", "Input string",
146 CArgDescriptions::eString);
147
148 arg_desc->AddOptionalKey("jobs", "jobs", "Number of jobs to submit",
149 CArgDescriptions::eInteger);
150
151 arg_desc->AddOptionalKey("naff", "AffinityTokens",
152 "Number of different affinities", CArgDescriptions::eInteger);
153
154
155 // Setup arg.descriptions for this application
156 SetupArgDescriptions(arg_desc.release());
157 }
158
159
Run(void)160 int CTestNetScheduleClient::Run(void)
161 {
162 const CArgs& args = GetArgs();
163 const string& service = args["service"].AsString();
164 const string& queue_name = args["queue"].AsString();
165
166 unsigned jcount = 1000;
167 if (args["jobs"]) {
168 jcount = args["jobs"].AsInteger();
169 if (jcount == 0) {
170 fputs("The 'jobs' parameter cannot be zero.\n", stderr);
171 return 1;
172 }
173 }
174
175 unsigned naff = 17;
176 if (args["naff"])
177 naff = args["naff"].AsInteger();
178 s_SeedTokens(naff);
179
180 unsigned input_length = 0;
181 if (args["ilen"])
182 input_length = args["ilen"].AsInteger();
183
184 unsigned maximum_runtime = 60;
185 if (args["maxruntime"])
186 maximum_runtime = args["maxruntime"].AsInteger();
187
188 CNetScheduleAPI::EJobStatus status;
189 CNetScheduleAPI cl(service, "client_test", queue_name);
190
191 STimeout comm_timeout;
192 comm_timeout.sec = 1200;
193 comm_timeout.usec = 0;
194 cl.GetService().GetServerPool().SetCommunicationTimeout(comm_timeout);
195
196 CNetScheduleSubmitter submitter = cl.GetSubmitter();
197
198 string input;
199 if (args["input"]) {
200 input = args["input"].AsString();
201 } else if (args["ilen"]) {
202 input = s_GenInput(input_length);
203 } else {
204 input = "Hello " + queue_name;
205 }
206
207 if (0)
208 {{
209
210 NcbiCout << "SubmitAndWait..." << NcbiEndl;
211 unsigned wait_time = 30;
212 CNetScheduleJob j1(input);
213 status = submitter.SubmitJobAndWait(j1, wait_time);
214 if (status == CNetScheduleAPI::eDone) {
215 NcbiCout << j1.job_id << " done." << NcbiEndl;
216 } else {
217 NcbiCout << j1.job_id << " is not done in "
218 << wait_time << " seconds." << NcbiEndl;
219 }
220 NcbiCout << "SubmitAndWait...done." << NcbiEndl;
221 }}
222
223 set<string> submitted_job_ids;
224
225 CStopWatch sw(CStopWatch::eStart);
226
227 {{
228 NcbiCout << "Batch submit " << jcount << " jobs..." << NcbiEndl;
229
230 unsigned batch_size = 1000;
231
232 for (unsigned i = 0; i < jcount; i += batch_size) {
233 vector<CNetScheduleJob> jobs;
234 if (jcount - i < batch_size)
235 batch_size = jcount - i;
236 for (unsigned j = 0; j < batch_size; ++j) {
237 CNetScheduleJob job(input);
238 job.affinity = s_GetRandomToken();
239 jobs.push_back(job);
240 }
241 submitter.SubmitJobBatch(jobs);
242 ITERATE(vector<CNetScheduleJob>, job, jobs) {
243 submitted_job_ids.insert(job->job_id);
244 }
245 NcbiCout << "." << flush;
246 }
247
248 _ASSERT(submitted_job_ids.size() == jcount);
249
250 NcbiCout << NcbiEndl << "Done." << NcbiEndl;
251 double elapsed = sw.Elapsed();
252
253 NcbiCout.setf(IOS_BASE::fixed, IOS_BASE::floatfield);
254 NcbiCout << "Avg time: " << (elapsed / jcount) << " sec, "
255 << (jcount/elapsed) << " jobs/sec" << NcbiEndl;
256 }}
257
258
259 // Waiting for jobs to be done
260
261 NcbiCout << "Waiting for " << jcount << " jobs..." << NcbiEndl;
262
263 for (;;) {
264 string job_id;
265 string auth_token;
266 CNetScheduleAPI::EJobStatus job_status;
267
268 while (submitter.Read(&job_id, &auth_token, &job_status)) {
269 submitted_job_ids.erase(job_id);
270 submitter.ReadConfirm(job_id, auth_token);
271 }
272
273 if (submitted_job_ids.empty())
274 break;
275
276 bool all_deliberately_failed = true;
277
278 CNetScheduleJob job;
279
280 ITERATE(set<string>, it, submitted_job_ids) {
281 job.job_id = *it;
282 if (cl.GetJobDetails(job) != CNetScheduleAPI::eFailed ||
283 job.error_msg != "DELIBERATE_FAILURE") {
284 all_deliberately_failed = false;
285 break;
286 }
287 }
288
289 if (all_deliberately_failed)
290 break;
291
292 if (sw.Elapsed() > double(maximum_runtime)) {
293 fprintf(stderr, "The test has exceeded its maximum "
294 "run time of %u seconds.\nUse '-maxruntime' "
295 "to override.\n", maximum_runtime);
296 return 3;
297 }
298
299 NcbiCout << submitted_job_ids.size() << " jobs to go" << NcbiEndl;
300 SleepMilliSec(2000);
301 }
302
303 NcbiCout << "Done. Wall-clock time: " << sw.Elapsed() << NcbiEndl;
304 if (!submitted_job_ids.empty())
305 NcbiCout << submitted_job_ids.size() <<
306 " jobs were deliberately failed by the worker node(s)." << NcbiEndl;
307
308 return 0;
309 }
310
311
main(int argc,const char * argv[])312 int main(int argc, const char* argv[])
313 {
314 GRID_APP_CHECK_VERSION_ARGS();
315
316 return CTestNetScheduleClient().AppMain(argc, argv);
317 }
318