1 /* $Id: nc_cmds.cpp 607703 2020-05-06 16:57:56Z sadyrovr $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Dmitry Kazimirov
27 *
28 * File Description: NetCache-specific commands of the grid_cli application.
29 *
30 */
31
32 #include <ncbi_pch.hpp>
33
34 #include "grid_cli.hpp"
35
36 USING_NCBI_SCOPE;
37
SetUp_NetCache()38 void CGridCommandLineInterfaceApp::SetUp_NetCache()
39 {
40 if (IsOptionSet(eEnableMirroring)) {
41 GetRWConfig().Set("netcache_api", "enable_mirroring", "true");
42 }
43 }
44
SetUp_NetCacheCmd(bool icache_mode,bool require_version,bool require_service)45 void CGridCommandLineInterfaceApp::SetUp_NetCacheCmd(bool icache_mode,
46 bool require_version, bool require_service)
47 {
48 SetUp_NetCache();
49
50 if (!icache_mode) {
51 if (!m_Opts.ncid.subkey.empty() || NStr::MatchesMask(m_Opts.ncid.key, "*,*,*")) {
52 NCBI_THROW(CArgException, eNoValue, "both '--" NETCACHE_OPTION "' and '--" CACHE_OPTION "' "
53 "options are required in icache mode.");
54 }
55
56 m_APIClass = eNetCacheAPI;
57
58 if (!IsOptionSet(eNetCache)) {
59 if (m_Opts.ncid.key.empty()) {
60 if (require_service) {
61 NCBI_THROW(CArgException, eNoValue, "'--" NETCACHE_OPTION "' "
62 "option is required when ID is not provided.");
63 }
64 } else {
65 // If NetCache service is not provided, use server from blob ID
66 CNetCacheKey key(m_Opts.ncid.key, CCompoundIDPool());
67
68 if (key.GetVersion() != 3) {
69 m_Opts.nc_service = key.GetHost() + ':' + to_string(key.GetPort());
70 }
71 }
72 }
73
74 m_NetCacheAPI = CNetCacheAPI(m_Opts.nc_service,
75 m_Opts.auth, m_NetScheduleAPI);
76
77 if (!m_Opts.ncid.key.empty() && IsOptionExplicitlySet(eNetCache)) {
78 CNetCacheKey key(m_Opts.ncid.key, CCompoundIDPool());
79
80 if (key.GetVersion() != 3) {
81 if (auto address = SSocketAddress::Parse(m_Opts.nc_service)) {
82 m_NetCacheAPI.GetService().GetServerPool().StickToServer(move(address));
83 } else {
84 NCBI_THROW(CArgException, eInvalidArg,
85 "When blob ID is given, '--" NETCACHE_OPTION "' "
86 "must be a host:port server address.");
87 }
88
89 }
90 }
91
92 if (IsOptionSet(eNoServerCheck)) {
93 m_NetCacheAPI.SetDefaultParameters(nc_server_check = eOff);
94 }
95
96 } else {
97 m_Opts.ncid.Parse(icache_mode, require_version);
98
99 m_APIClass = eNetICacheClient;
100 m_NetICacheClient = CNetICacheClient(m_Opts.nc_service,
101 m_Opts.cache_name, m_Opts.auth);
102
103 if (m_Opts.nc_service.empty()) {
104 NCBI_THROW(CArgException, eNoValue, "'--" NETCACHE_OPTION "' "
105 "option is required in icache mode.");
106 }
107
108 if (!IsOptionSet(eCompatMode))
109 m_NetICacheClient.SetFlags(ICache::fBestReliability);
110 }
111 }
112
SetUp_NetCacheAdminCmd(CGridCommandLineInterfaceApp::EAdminCmdSeverity cmd_severity)113 void CGridCommandLineInterfaceApp::SetUp_NetCacheAdminCmd(
114 CGridCommandLineInterfaceApp::EAdminCmdSeverity cmd_severity)
115 {
116 SetUp_NetCache();
117
118 m_APIClass = eNetCacheAdmin;
119
120 if (cmd_severity != eReadOnlyAdminCmd &&
121 !IsOptionExplicitlySet(eNetCache)) {
122 NCBI_THROW(CArgException, eNoValue, "'--" NETCACHE_OPTION "' "
123 "must be explicitly specified.");
124 }
125 m_NetCacheAPI = CNetCacheAPI(m_Opts.nc_service, m_Opts.auth);
126 m_NetCacheAdmin = m_NetCacheAPI.GetAdmin();
127 }
128
PrintBlobMeta(const CNetCacheKey & key)129 void CGridCommandLineInterfaceApp::PrintBlobMeta(const CNetCacheKey& key)
130 {
131 CTime generation_time;
132
133 generation_time.SetTimeT((time_t) key.GetCreationTime());
134
135 if (key.GetVersion() != 3)
136 printf("server_address: %s:%hu\n",
137 g_NetService_TryResolveHost(key.GetHost()).c_str(), key.GetPort());
138 else
139 printf("server_address_crc32: 0x%08X\n", key.GetHostPortCRC32());
140
141 printf("id: %u\nkey_generation_time: %s\nrandom: %u\n",
142 key.GetId(),
143 generation_time.AsString().c_str(),
144 (unsigned) key.GetRandomPart());
145
146 string service(key.GetServiceName());
147
148 if (!service.empty())
149 printf("service_name: %s\n", service.c_str());
150 }
151
AddPart(const string & value)152 bool CGridCommandLineInterfaceApp::SOptions::SNCID::AddPart(
153 const string& value)
154 {
155 switch (++parts) {
156 case 1:
157 // First parameter is always key
158 key = value;
159 return true;
160 case 2:
161 // Second parameter could be subkey
162 // (when only two parameters provided) or version (when three).
163 // Until we get third parameter we consider this as subkey
164 subkey = value;
165 return true;
166 case 3:
167 // Okay, we have got third parameter, move second into version
168 ver = subkey;
169 subkey = value;
170 return true;
171 }
172
173 return false;
174 }
175
Parse(bool icache_mode,bool require_version)176 void CGridCommandLineInterfaceApp::SOptions::SNCID::Parse(
177 bool icache_mode, bool require_version)
178 {
179 if (!icache_mode) {
180 if (parts > 1) {
181 NCBI_THROW_FMT(CArgException, eInvalidArg,
182 "Too many positional parameters.");
183 }
184
185 return;
186 }
187
188 if (parts == 1) {
189 vector<string> key_parts;
190
191 NStr::Split(key, ",", key_parts);
192 if (key_parts.size() != 3) {
193 NCBI_THROW_FMT(CArgException, eInvalidArg,
194 "Invalid ICache key specification \"" << key << "\" ("
195 "expected a comma-separated key,version,subkey triplet).");
196 }
197 key = key_parts.front();
198 ver = key_parts[1];
199 subkey = key_parts.back();
200 }
201
202 if (!ver.empty()) {
203 version = NStr::StringToInt(ver);
204 } else if (require_version) {
205 NCBI_THROW(CArgException, eNoValue,
206 "blob version parameter is missing");
207 }
208 }
209
PrintServerAddress(CNetServer server)210 void CGridCommandLineInterfaceApp::PrintServerAddress(CNetServer server)
211 {
212 printf("Server: %s\n", server.GetServerAddress().c_str());
213 }
214
Cmd_BlobInfo()215 int CGridCommandLineInterfaceApp::Cmd_BlobInfo()
216 {
217 SetUp_NetCacheCmd();
218
219 try {
220 CNetServerMultilineCmdOutput output;
221
222 if (m_APIClass == eNetCacheAPI) {
223 PrintBlobMeta(CNetCacheKey(m_Opts.ncid.key, m_CompoundIDPool));
224
225 output = m_NetCacheAPI.GetBlobInfo(m_Opts.ncid.key,
226 nc_try_all_servers = IsOptionSet(eTryAllServers));
227 } else {
228 CNetServer server_last_used;
229
230 output = m_NetICacheClient.GetBlobInfo(
231 m_Opts.ncid.key,
232 m_Opts.ncid.version,
233 m_Opts.ncid.subkey,
234 (nc_try_all_servers = IsOptionSet(eTryAllServers),
235 nc_server_last_used = &server_last_used));
236
237 PrintServerAddress(server_last_used);
238 }
239
240 string line;
241
242 if (output.ReadLine(line)) {
243 if (!NStr::StartsWith(line, "SIZE="))
244 printf("%s\n", line.c_str());
245 while (output.ReadLine(line))
246 printf("%s\n", line.c_str());
247 }
248 }
249 catch (CNetCacheException& e) {
250 if (e.GetErrCode() != CNetCacheException::eServerError)
251 throw;
252
253 if (m_APIClass == eNetCacheAPI)
254 printf("Size: %lu\n", (unsigned long)
255 m_NetCacheAPI.GetBlobSize(m_Opts.ncid.key));
256 else {
257 CNetServer server_last_used;
258
259 size_t blob_size = m_NetICacheClient.GetBlobSize(
260 m_Opts.ncid.key,
261 m_Opts.ncid.version,
262 m_Opts.ncid.subkey,
263 nc_server_last_used = &server_last_used);
264
265 PrintServerAddress(server_last_used);
266
267 printf("Size: %lu\n", (unsigned long) blob_size);
268 }
269 }
270
271 return 0;
272 }
273
Cmd_GetBlob()274 int CGridCommandLineInterfaceApp::Cmd_GetBlob()
275 {
276 int reader_select = IsOptionSet(ePassword, OPTION_N(1)) |
277 (m_Opts.offset != 0 || m_Opts.size != 0 ? OPTION_N(0) : 0);
278
279 SetUp_NetCacheCmd(IsOptionSet(eCache), reader_select);
280
281 unique_ptr<IReader> reader;
282
283 if (m_APIClass == eNetCacheAPI) {
284 size_t blob_size = 0;
285 switch (reader_select) {
286 case 0: /* no special case */
287 reader.reset(m_NetCacheAPI.GetReader(
288 m_Opts.ncid.key,
289 &blob_size,
290 (nc_caching_mode = CNetCacheAPI::eCaching_Disable,
291 nc_try_all_servers = IsOptionSet(eTryAllServers))));
292 break;
293 case OPTION_N(0): /* use offset */
294 reader.reset(m_NetCacheAPI.GetPartReader(
295 m_Opts.ncid.key,
296 m_Opts.offset,
297 m_Opts.size,
298 &blob_size,
299 (nc_caching_mode = CNetCacheAPI::eCaching_Disable,
300 nc_try_all_servers = IsOptionSet(eTryAllServers))));
301 break;
302 case OPTION_N(1): /* use password */
303 reader.reset(m_NetCacheAPI.GetReader(
304 m_Opts.ncid.key,
305 &blob_size,
306 (nc_caching_mode = CNetCacheAPI::eCaching_Disable,
307 nc_blob_password = m_Opts.password,
308 nc_try_all_servers = IsOptionSet(eTryAllServers))));
309 break;
310 case OPTION_N(1) | OPTION_N(0): /* use password and offset */
311 reader.reset(m_NetCacheAPI.GetPartReader(
312 m_Opts.ncid.key,
313 m_Opts.offset,
314 m_Opts.size,
315 &blob_size,
316 (nc_caching_mode = CNetCacheAPI::eCaching_Disable,
317 nc_blob_password = m_Opts.password,
318 nc_try_all_servers = IsOptionSet(eTryAllServers))));
319 }
320 } else {
321 ICache::EBlobVersionValidity validity = ICache::eValid;
322 switch (reader_select) {
323 case 0: /* no special case */
324 reader.reset(m_Opts.ncid.HasVersion() ?
325 m_NetICacheClient.GetReadStream(
326 m_Opts.ncid.key,
327 m_Opts.ncid.version,
328 m_Opts.ncid.subkey,
329 NULL,
330 (nc_caching_mode = CNetCacheAPI::eCaching_Disable,
331 nc_try_all_servers = IsOptionSet(eTryAllServers))) :
332 m_NetICacheClient.GetReadStream(
333 m_Opts.ncid.key,
334 m_Opts.ncid.subkey,
335 &m_Opts.ncid.version,
336 &validity));
337 break;
338 case OPTION_N(0): /* use offset */
339 reader.reset(m_NetICacheClient.GetReadStreamPart(
340 m_Opts.ncid.key,
341 m_Opts.ncid.version,
342 m_Opts.ncid.subkey,
343 m_Opts.offset,
344 m_Opts.size,
345 NULL,
346 (nc_caching_mode = CNetCacheAPI::eCaching_Disable,
347 nc_try_all_servers = IsOptionSet(eTryAllServers))));
348 break;
349 case OPTION_N(1): /* use password */
350 reader.reset(m_NetICacheClient.GetReadStream(
351 m_Opts.ncid.key,
352 m_Opts.ncid.version,
353 m_Opts.ncid.subkey,
354 NULL,
355 (nc_caching_mode = CNetCacheAPI::eCaching_Disable,
356 nc_blob_password = m_Opts.password,
357 nc_try_all_servers = IsOptionSet(eTryAllServers))));
358 break;
359 case OPTION_N(1) | OPTION_N(0): /* use password and offset */
360 reader.reset(m_NetICacheClient.GetReadStreamPart(
361 m_Opts.ncid.key,
362 m_Opts.ncid.version,
363 m_Opts.ncid.subkey,
364 m_Opts.offset,
365 m_Opts.size,
366 NULL,
367 (nc_caching_mode = CNetCacheAPI::eCaching_Disable,
368 nc_blob_password = m_Opts.password,
369 nc_try_all_servers = IsOptionSet(eTryAllServers))));
370 }
371 if (!m_Opts.ncid.HasVersion())
372 NcbiCerr << "Blob version: " <<
373 m_Opts.ncid.version << NcbiEndl <<
374 "Blob validity: " << (validity == ICache::eCurrent ?
375 "current" : "expired") << NcbiEndl;
376 }
377 if (!reader.get()) {
378 NCBI_THROW(CNetCacheException, eBlobNotFound,
379 "Cannot find the requested blob");
380 }
381
382 char buffer[IO_BUFFER_SIZE];
383 ERW_Result read_result;
384 size_t bytes_read;
385
386 while ((read_result = reader->Read(buffer,
387 sizeof(buffer), &bytes_read)) == eRW_Success)
388 fwrite(buffer, 1, bytes_read, m_Opts.output_stream);
389
390 if (read_result != eRW_Eof) {
391 ERR_POST("Error while sending data to the output stream");
392 return 1;
393 }
394
395 return 0;
396 }
397
Cmd_PutBlob()398 int CGridCommandLineInterfaceApp::Cmd_PutBlob()
399 {
400 SetUp_NetCacheCmd();
401
402 unique_ptr<IEmbeddedStreamWriter> writer;
403
404 // Cannot use a reference here because m_Opts.ncid.key.empty() is
405 // used later to find out whether a blob was given in the
406 // command line.
407 string blob_key = m_Opts.ncid.key;
408
409 CNetServer server_last_used;
410
411 if (m_APIClass == eNetCacheAPI) {
412 switch (IsOptionSet(ePassword, 1) | IsOptionSet(eUseCompoundID, 2)) {
413 case 1:
414 writer.reset(m_NetCacheAPI.PutData(&blob_key,
415 (nc_blob_ttl = m_Opts.ttl,
416 nc_blob_password = m_Opts.password)));
417 break;
418 case 2:
419 writer.reset(m_NetCacheAPI.PutData(&blob_key,
420 (nc_blob_ttl = m_Opts.ttl,
421 nc_use_compound_id = true)));
422 break;
423 case 3:
424 writer.reset(m_NetCacheAPI.PutData(&blob_key,
425 (nc_blob_ttl = m_Opts.ttl,
426 nc_blob_password = m_Opts.password,
427 nc_use_compound_id = true)));
428 break;
429 default:
430 writer.reset(m_NetCacheAPI.PutData(&blob_key,
431 nc_blob_ttl = m_Opts.ttl));
432 }
433 } else {
434 writer.reset(IsOptionSet(ePassword) ?
435 m_NetICacheClient.GetNetCacheWriter(
436 m_Opts.ncid.key,
437 m_Opts.ncid.version,
438 m_Opts.ncid.subkey,
439 (nc_blob_ttl = m_Opts.ttl,
440 nc_blob_password = m_Opts.password,
441 nc_server_last_used = &server_last_used)) :
442 m_NetICacheClient.GetNetCacheWriter(
443 m_Opts.ncid.key,
444 m_Opts.ncid.version,
445 m_Opts.ncid.subkey,
446 (nc_blob_ttl = m_Opts.ttl,
447 nc_server_last_used = &server_last_used)));
448 }
449
450 if (!writer.get()) {
451 NCBI_USER_THROW("Cannot create blob stream");
452 }
453
454 if (m_APIClass != eNetCacheAPI &&
455 m_NetICacheClient.GetService().IsLoadBalanced())
456 PrintServerAddress(server_last_used);
457
458 size_t bytes_written;
459
460 if (IsOptionSet(eInput)) {
461 if (writer->Write(m_Opts.input.data(), m_Opts.input.length(),
462 &bytes_written) != eRW_Success ||
463 bytes_written != m_Opts.input.length())
464 goto ErrorExit;
465 } else {
466 char buffer[IO_BUFFER_SIZE];
467
468 do {
469 m_Opts.input_stream->read(buffer, sizeof(buffer));
470 if (m_Opts.input_stream->fail() && !m_Opts.input_stream->eof()) {
471 NCBI_USER_THROW("Error while reading from input stream");
472 }
473 size_t bytes_read = (size_t) m_Opts.input_stream->gcount();
474 if (writer->Write(buffer, bytes_read, &bytes_written) !=
475 eRW_Success || bytes_written != bytes_read)
476 goto ErrorExit;
477 } while (!m_Opts.input_stream->eof());
478 }
479
480 writer->Close();
481
482 if (m_APIClass == eNetCacheAPI && m_Opts.ncid.key.empty())
483 NcbiCout << blob_key << NcbiEndl;
484
485 return 0;
486
487 ErrorExit:
488 NCBI_USER_THROW("Error while writing to NetCache");
489 }
490
Cmd_RemoveBlob()491 int CGridCommandLineInterfaceApp::Cmd_RemoveBlob()
492 {
493 SetUp_NetCacheCmd();
494
495 if (m_APIClass == eNetCacheAPI)
496 if (IsOptionSet(ePassword))
497 m_NetCacheAPI.Remove(m_Opts.ncid.key,
498 nc_blob_password = m_Opts.password);
499 else
500 m_NetCacheAPI.Remove(m_Opts.ncid.key);
501 else {
502 if (IsOptionSet(ePassword))
503 m_NetICacheClient.RemoveBlob(m_Opts.ncid.key,
504 m_Opts.ncid.version, m_Opts.ncid.subkey,
505 nc_blob_password = m_Opts.password);
506 else
507 m_NetICacheClient.RemoveBlob(m_Opts.ncid.key,
508 m_Opts.ncid.version, m_Opts.ncid.subkey);
509 }
510
511 return 0;
512 }
513
Cmd_Purge()514 int CGridCommandLineInterfaceApp::Cmd_Purge()
515 {
516 SetUp_NetCacheAdminCmd(eAdminCmdWithSideEffects);
517
518 m_NetCacheAdmin.Purge(m_Opts.cache_name);
519
520 return 0;
521 }
522