1 /*  $Id: nc_cmds.cpp 607703 2020-05-06 16:57:56Z sadyrovr $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *   Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors:  Dmitry Kazimirov
27  *
28  * File Description: NetCache-specific commands of the grid_cli application.
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include "grid_cli.hpp"
35 
36 USING_NCBI_SCOPE;
37 
SetUp_NetCache()38 void CGridCommandLineInterfaceApp::SetUp_NetCache()
39 {
40     if (IsOptionSet(eEnableMirroring)) {
41         GetRWConfig().Set("netcache_api", "enable_mirroring", "true");
42     }
43 }
44 
SetUp_NetCacheCmd(bool icache_mode,bool require_version,bool require_service)45 void CGridCommandLineInterfaceApp::SetUp_NetCacheCmd(bool icache_mode,
46         bool require_version, bool require_service)
47 {
48     SetUp_NetCache();
49 
50     if (!icache_mode) {
51         if (!m_Opts.ncid.subkey.empty() || NStr::MatchesMask(m_Opts.ncid.key, "*,*,*")) {
52             NCBI_THROW(CArgException, eNoValue, "both '--" NETCACHE_OPTION "' and '--" CACHE_OPTION "' "
53                 "options are required in icache mode.");
54         }
55 
56         m_APIClass = eNetCacheAPI;
57 
58         if (!IsOptionSet(eNetCache)) {
59             if (m_Opts.ncid.key.empty()) {
60                 if (require_service) {
61                     NCBI_THROW(CArgException, eNoValue, "'--" NETCACHE_OPTION "' "
62                         "option is required when ID is not provided.");
63                 }
64             } else {
65                 // If NetCache service is not provided, use server from blob ID
66                 CNetCacheKey key(m_Opts.ncid.key, CCompoundIDPool());
67 
68                 if (key.GetVersion() != 3) {
69                     m_Opts.nc_service = key.GetHost() + ':' + to_string(key.GetPort());
70                 }
71             }
72         }
73 
74         m_NetCacheAPI = CNetCacheAPI(m_Opts.nc_service,
75                 m_Opts.auth, m_NetScheduleAPI);
76 
77         if (!m_Opts.ncid.key.empty() && IsOptionExplicitlySet(eNetCache)) {
78             CNetCacheKey key(m_Opts.ncid.key, CCompoundIDPool());
79 
80             if (key.GetVersion() != 3) {
81                 if (auto address = SSocketAddress::Parse(m_Opts.nc_service)) {
82                     m_NetCacheAPI.GetService().GetServerPool().StickToServer(move(address));
83                 } else {
84                     NCBI_THROW(CArgException, eInvalidArg,
85                         "When blob ID is given, '--" NETCACHE_OPTION "' "
86                         "must be a host:port server address.");
87                 }
88 
89             }
90         }
91 
92         if (IsOptionSet(eNoServerCheck)) {
93             m_NetCacheAPI.SetDefaultParameters(nc_server_check = eOff);
94         }
95 
96     } else {
97         m_Opts.ncid.Parse(icache_mode, require_version);
98 
99         m_APIClass = eNetICacheClient;
100         m_NetICacheClient = CNetICacheClient(m_Opts.nc_service,
101             m_Opts.cache_name, m_Opts.auth);
102 
103         if (m_Opts.nc_service.empty()) {
104             NCBI_THROW(CArgException, eNoValue, "'--" NETCACHE_OPTION "' "
105                 "option is required in icache mode.");
106         }
107 
108         if (!IsOptionSet(eCompatMode))
109             m_NetICacheClient.SetFlags(ICache::fBestReliability);
110     }
111 }
112 
SetUp_NetCacheAdminCmd(CGridCommandLineInterfaceApp::EAdminCmdSeverity cmd_severity)113 void CGridCommandLineInterfaceApp::SetUp_NetCacheAdminCmd(
114         CGridCommandLineInterfaceApp::EAdminCmdSeverity cmd_severity)
115 {
116     SetUp_NetCache();
117 
118     m_APIClass = eNetCacheAdmin;
119 
120     if (cmd_severity != eReadOnlyAdminCmd &&
121             !IsOptionExplicitlySet(eNetCache)) {
122         NCBI_THROW(CArgException, eNoValue, "'--" NETCACHE_OPTION "' "
123             "must be explicitly specified.");
124     }
125     m_NetCacheAPI = CNetCacheAPI(m_Opts.nc_service, m_Opts.auth);
126     m_NetCacheAdmin = m_NetCacheAPI.GetAdmin();
127 }
128 
PrintBlobMeta(const CNetCacheKey & key)129 void CGridCommandLineInterfaceApp::PrintBlobMeta(const CNetCacheKey& key)
130 {
131     CTime generation_time;
132 
133     generation_time.SetTimeT((time_t) key.GetCreationTime());
134 
135     if (key.GetVersion() != 3)
136         printf("server_address: %s:%hu\n",
137             g_NetService_TryResolveHost(key.GetHost()).c_str(), key.GetPort());
138     else
139         printf("server_address_crc32: 0x%08X\n", key.GetHostPortCRC32());
140 
141     printf("id: %u\nkey_generation_time: %s\nrandom: %u\n",
142         key.GetId(),
143         generation_time.AsString().c_str(),
144         (unsigned) key.GetRandomPart());
145 
146     string service(key.GetServiceName());
147 
148     if (!service.empty())
149         printf("service_name: %s\n", service.c_str());
150 }
151 
AddPart(const string & value)152 bool CGridCommandLineInterfaceApp::SOptions::SNCID::AddPart(
153         const string& value)
154 {
155     switch (++parts) {
156         case 1:
157             // First parameter is always key
158             key = value;
159             return true;
160         case 2:
161             // Second parameter could be subkey
162             // (when only two parameters provided) or version (when three).
163             // Until we get third parameter we consider this as subkey
164             subkey = value;
165             return true;
166         case 3:
167             // Okay, we have got third parameter, move second into version
168             ver = subkey;
169             subkey = value;
170             return true;
171     }
172 
173     return false;
174 }
175 
Parse(bool icache_mode,bool require_version)176 void CGridCommandLineInterfaceApp::SOptions::SNCID::Parse(
177         bool icache_mode, bool require_version)
178 {
179     if (!icache_mode) {
180         if (parts > 1) {
181             NCBI_THROW_FMT(CArgException, eInvalidArg,
182                     "Too many positional parameters.");
183         }
184 
185         return;
186     }
187 
188     if (parts == 1) {
189         vector<string> key_parts;
190 
191         NStr::Split(key, ",", key_parts);
192         if (key_parts.size() != 3) {
193             NCBI_THROW_FMT(CArgException, eInvalidArg,
194                 "Invalid ICache key specification \"" << key << "\" ("
195                 "expected a comma-separated key,version,subkey triplet).");
196         }
197         key = key_parts.front();
198         ver = key_parts[1];
199         subkey = key_parts.back();
200     }
201 
202     if (!ver.empty()) {
203         version = NStr::StringToInt(ver);
204     } else if (require_version) {
205         NCBI_THROW(CArgException, eNoValue,
206             "blob version parameter is missing");
207     }
208 }
209 
PrintServerAddress(CNetServer server)210 void CGridCommandLineInterfaceApp::PrintServerAddress(CNetServer server)
211 {
212     printf("Server: %s\n", server.GetServerAddress().c_str());
213 }
214 
Cmd_BlobInfo()215 int CGridCommandLineInterfaceApp::Cmd_BlobInfo()
216 {
217     SetUp_NetCacheCmd();
218 
219     try {
220         CNetServerMultilineCmdOutput output;
221 
222         if (m_APIClass == eNetCacheAPI) {
223             PrintBlobMeta(CNetCacheKey(m_Opts.ncid.key, m_CompoundIDPool));
224 
225             output = m_NetCacheAPI.GetBlobInfo(m_Opts.ncid.key,
226                     nc_try_all_servers = IsOptionSet(eTryAllServers));
227         } else {
228             CNetServer server_last_used;
229 
230             output = m_NetICacheClient.GetBlobInfo(
231                     m_Opts.ncid.key,
232                     m_Opts.ncid.version,
233                     m_Opts.ncid.subkey,
234                     (nc_try_all_servers = IsOptionSet(eTryAllServers),
235                     nc_server_last_used = &server_last_used));
236 
237             PrintServerAddress(server_last_used);
238         }
239 
240         string line;
241 
242         if (output.ReadLine(line)) {
243             if (!NStr::StartsWith(line, "SIZE="))
244                 printf("%s\n", line.c_str());
245             while (output.ReadLine(line))
246                 printf("%s\n", line.c_str());
247         }
248     }
249     catch (CNetCacheException& e) {
250         if (e.GetErrCode() != CNetCacheException::eServerError)
251             throw;
252 
253         if (m_APIClass == eNetCacheAPI)
254             printf("Size: %lu\n", (unsigned long)
255                 m_NetCacheAPI.GetBlobSize(m_Opts.ncid.key));
256         else {
257             CNetServer server_last_used;
258 
259             size_t blob_size = m_NetICacheClient.GetBlobSize(
260                     m_Opts.ncid.key,
261                     m_Opts.ncid.version,
262                     m_Opts.ncid.subkey,
263                     nc_server_last_used = &server_last_used);
264 
265             PrintServerAddress(server_last_used);
266 
267             printf("Size: %lu\n", (unsigned long) blob_size);
268         }
269     }
270 
271     return 0;
272 }
273 
Cmd_GetBlob()274 int CGridCommandLineInterfaceApp::Cmd_GetBlob()
275 {
276     int reader_select = IsOptionSet(ePassword, OPTION_N(1)) |
277         (m_Opts.offset != 0 || m_Opts.size != 0 ? OPTION_N(0) : 0);
278 
279     SetUp_NetCacheCmd(IsOptionSet(eCache), reader_select);
280 
281     unique_ptr<IReader> reader;
282 
283     if (m_APIClass == eNetCacheAPI) {
284         size_t blob_size = 0;
285         switch (reader_select) {
286         case 0: /* no special case */
287             reader.reset(m_NetCacheAPI.GetReader(
288                 m_Opts.ncid.key,
289                 &blob_size,
290                 (nc_caching_mode = CNetCacheAPI::eCaching_Disable,
291                 nc_try_all_servers = IsOptionSet(eTryAllServers))));
292             break;
293         case OPTION_N(0): /* use offset */
294             reader.reset(m_NetCacheAPI.GetPartReader(
295                 m_Opts.ncid.key,
296                 m_Opts.offset,
297                 m_Opts.size,
298                 &blob_size,
299                 (nc_caching_mode = CNetCacheAPI::eCaching_Disable,
300                 nc_try_all_servers = IsOptionSet(eTryAllServers))));
301             break;
302         case OPTION_N(1): /* use password */
303             reader.reset(m_NetCacheAPI.GetReader(
304                     m_Opts.ncid.key,
305                     &blob_size,
306                     (nc_caching_mode = CNetCacheAPI::eCaching_Disable,
307                     nc_blob_password = m_Opts.password,
308                     nc_try_all_servers = IsOptionSet(eTryAllServers))));
309             break;
310         case OPTION_N(1) | OPTION_N(0): /* use password and offset */
311             reader.reset(m_NetCacheAPI.GetPartReader(
312                     m_Opts.ncid.key,
313                     m_Opts.offset,
314                     m_Opts.size,
315                     &blob_size,
316                     (nc_caching_mode = CNetCacheAPI::eCaching_Disable,
317                     nc_blob_password = m_Opts.password,
318                     nc_try_all_servers = IsOptionSet(eTryAllServers))));
319         }
320     } else {
321         ICache::EBlobVersionValidity validity = ICache::eValid;
322         switch (reader_select) {
323         case 0: /* no special case */
324             reader.reset(m_Opts.ncid.HasVersion() ?
325                 m_NetICacheClient.GetReadStream(
326                     m_Opts.ncid.key,
327                     m_Opts.ncid.version,
328                     m_Opts.ncid.subkey,
329                     NULL,
330                     (nc_caching_mode = CNetCacheAPI::eCaching_Disable,
331                     nc_try_all_servers = IsOptionSet(eTryAllServers))) :
332                 m_NetICacheClient.GetReadStream(
333                     m_Opts.ncid.key,
334                     m_Opts.ncid.subkey,
335                     &m_Opts.ncid.version,
336                     &validity));
337             break;
338         case OPTION_N(0): /* use offset */
339             reader.reset(m_NetICacheClient.GetReadStreamPart(
340                 m_Opts.ncid.key,
341                 m_Opts.ncid.version,
342                 m_Opts.ncid.subkey,
343                 m_Opts.offset,
344                 m_Opts.size,
345                 NULL,
346                 (nc_caching_mode = CNetCacheAPI::eCaching_Disable,
347                 nc_try_all_servers = IsOptionSet(eTryAllServers))));
348             break;
349         case OPTION_N(1): /* use password */
350             reader.reset(m_NetICacheClient.GetReadStream(
351                     m_Opts.ncid.key,
352                     m_Opts.ncid.version,
353                     m_Opts.ncid.subkey,
354                     NULL,
355                     (nc_caching_mode = CNetCacheAPI::eCaching_Disable,
356                     nc_blob_password = m_Opts.password,
357                     nc_try_all_servers = IsOptionSet(eTryAllServers))));
358             break;
359         case OPTION_N(1) | OPTION_N(0): /* use password and offset */
360             reader.reset(m_NetICacheClient.GetReadStreamPart(
361                     m_Opts.ncid.key,
362                     m_Opts.ncid.version,
363                     m_Opts.ncid.subkey,
364                     m_Opts.offset,
365                     m_Opts.size,
366                     NULL,
367                     (nc_caching_mode = CNetCacheAPI::eCaching_Disable,
368                     nc_blob_password = m_Opts.password,
369                     nc_try_all_servers = IsOptionSet(eTryAllServers))));
370         }
371         if (!m_Opts.ncid.HasVersion())
372             NcbiCerr << "Blob version: " <<
373                     m_Opts.ncid.version << NcbiEndl <<
374                 "Blob validity: " << (validity == ICache::eCurrent ?
375                     "current" : "expired") << NcbiEndl;
376     }
377     if (!reader.get()) {
378         NCBI_THROW(CNetCacheException, eBlobNotFound,
379             "Cannot find the requested blob");
380     }
381 
382     char buffer[IO_BUFFER_SIZE];
383     ERW_Result read_result;
384     size_t bytes_read;
385 
386     while ((read_result = reader->Read(buffer,
387             sizeof(buffer), &bytes_read)) == eRW_Success)
388         fwrite(buffer, 1, bytes_read, m_Opts.output_stream);
389 
390     if (read_result != eRW_Eof) {
391         ERR_POST("Error while sending data to the output stream");
392         return 1;
393     }
394 
395     return 0;
396 }
397 
Cmd_PutBlob()398 int CGridCommandLineInterfaceApp::Cmd_PutBlob()
399 {
400     SetUp_NetCacheCmd();
401 
402     unique_ptr<IEmbeddedStreamWriter> writer;
403 
404     // Cannot use a reference here because m_Opts.ncid.key.empty() is
405     // used later to find out whether a blob was given in the
406     // command line.
407     string blob_key = m_Opts.ncid.key;
408 
409     CNetServer server_last_used;
410 
411     if (m_APIClass == eNetCacheAPI) {
412         switch (IsOptionSet(ePassword, 1) | IsOptionSet(eUseCompoundID, 2)) {
413         case 1:
414             writer.reset(m_NetCacheAPI.PutData(&blob_key,
415                     (nc_blob_ttl = m_Opts.ttl,
416                     nc_blob_password = m_Opts.password)));
417             break;
418         case 2:
419             writer.reset(m_NetCacheAPI.PutData(&blob_key,
420                     (nc_blob_ttl = m_Opts.ttl,
421                     nc_use_compound_id = true)));
422             break;
423         case 3:
424             writer.reset(m_NetCacheAPI.PutData(&blob_key,
425                     (nc_blob_ttl = m_Opts.ttl,
426                     nc_blob_password = m_Opts.password,
427                     nc_use_compound_id = true)));
428             break;
429         default:
430             writer.reset(m_NetCacheAPI.PutData(&blob_key,
431                     nc_blob_ttl = m_Opts.ttl));
432         }
433     } else {
434         writer.reset(IsOptionSet(ePassword) ?
435             m_NetICacheClient.GetNetCacheWriter(
436                     m_Opts.ncid.key,
437                     m_Opts.ncid.version,
438                     m_Opts.ncid.subkey,
439                     (nc_blob_ttl = m_Opts.ttl,
440                     nc_blob_password = m_Opts.password,
441                     nc_server_last_used = &server_last_used)) :
442             m_NetICacheClient.GetNetCacheWriter(
443                     m_Opts.ncid.key,
444                     m_Opts.ncid.version,
445                     m_Opts.ncid.subkey,
446                     (nc_blob_ttl = m_Opts.ttl,
447                     nc_server_last_used = &server_last_used)));
448     }
449 
450     if (!writer.get()) {
451         NCBI_USER_THROW("Cannot create blob stream");
452     }
453 
454     if (m_APIClass != eNetCacheAPI &&
455             m_NetICacheClient.GetService().IsLoadBalanced())
456         PrintServerAddress(server_last_used);
457 
458     size_t bytes_written;
459 
460     if (IsOptionSet(eInput)) {
461         if (writer->Write(m_Opts.input.data(), m_Opts.input.length(),
462                 &bytes_written) != eRW_Success ||
463                 bytes_written != m_Opts.input.length())
464             goto ErrorExit;
465     } else {
466         char buffer[IO_BUFFER_SIZE];
467 
468         do {
469             m_Opts.input_stream->read(buffer, sizeof(buffer));
470             if (m_Opts.input_stream->fail() && !m_Opts.input_stream->eof()) {
471                 NCBI_USER_THROW("Error while reading from input stream");
472             }
473             size_t bytes_read = (size_t) m_Opts.input_stream->gcount();
474             if (writer->Write(buffer, bytes_read, &bytes_written) !=
475                     eRW_Success || bytes_written != bytes_read)
476                 goto ErrorExit;
477         } while (!m_Opts.input_stream->eof());
478     }
479 
480     writer->Close();
481 
482     if (m_APIClass == eNetCacheAPI && m_Opts.ncid.key.empty())
483         NcbiCout << blob_key << NcbiEndl;
484 
485     return 0;
486 
487 ErrorExit:
488     NCBI_USER_THROW("Error while writing to NetCache");
489 }
490 
Cmd_RemoveBlob()491 int CGridCommandLineInterfaceApp::Cmd_RemoveBlob()
492 {
493     SetUp_NetCacheCmd();
494 
495     if (m_APIClass == eNetCacheAPI)
496         if (IsOptionSet(ePassword))
497             m_NetCacheAPI.Remove(m_Opts.ncid.key,
498                     nc_blob_password = m_Opts.password);
499         else
500             m_NetCacheAPI.Remove(m_Opts.ncid.key);
501     else {
502         if (IsOptionSet(ePassword))
503             m_NetICacheClient.RemoveBlob(m_Opts.ncid.key,
504                     m_Opts.ncid.version, m_Opts.ncid.subkey,
505                     nc_blob_password = m_Opts.password);
506         else
507             m_NetICacheClient.RemoveBlob(m_Opts.ncid.key,
508                 m_Opts.ncid.version, m_Opts.ncid.subkey);
509     }
510 
511     return 0;
512 }
513 
Cmd_Purge()514 int CGridCommandLineInterfaceApp::Cmd_Purge()
515 {
516     SetUp_NetCacheAdminCmd(eAdminCmdWithSideEffects);
517 
518     m_NetCacheAdmin.Purge(m_Opts.cache_name);
519 
520     return 0;
521 }
522