1 /*  $Id: neticache_client.cpp 617404 2020-09-30 19:02:00Z sadyrovr $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author:  Anatoliy Kuznetsov, Dmitry Kazimirov
27  *
28  * File Description:
29  *   Implementation of netcache ICache client.
30  *
31  */
32 
33 #include <ncbi_pch.hpp>
34 
35 #include "netcache_api_impl.hpp"
36 
37 #include <connect/services/neticache_client.hpp>
38 #include <connect/services/error_codes.hpp>
39 #include <connect/services/netcache_api_expt.hpp>
40 #include <connect/services/srv_connections.hpp>
41 
42 #include <connect/ncbi_conn_exception.hpp>
43 
44 #include <util/cache/icache_cf.hpp>
45 #include <util/transmissionrw.hpp>
46 
47 #include <corelib/ncbitime.hpp>
48 #include <corelib/request_ctx.hpp>
49 #include <corelib/plugin_manager_impl.hpp>
50 #include <corelib/ncbi_system.hpp>
51 #include <corelib/ncbi_safe_static.hpp>
52 
53 #include <memory>
54 #include <sstream>
55 
56 #define MAX_ICACHE_CACHE_NAME_LENGTH 64
57 #define MAX_ICACHE_KEY_LENGTH 256
58 #define MAX_ICACHE_SUBKEY_LENGTH 256
59 
60 #define NCBI_USE_ERRCODE_X   ConnServ_NetCache
61 
62 
63 BEGIN_NCBI_SCOPE
64 
65 const char* const kNetICacheDriverName = "netcache";
66 
/// Append an opening double quote and the printable-encoded key to
/// *encoded_key, and return the printable-encoded subkey.
///
/// The key length check is made against the total length of *encoded_key
/// after the append, with one extra character allowed for the quote.
///
/// @throws CNetCacheException (eKeyFormatError) when *encoded_key is longer
///         than 1 + MAX_ICACHE_KEY_LENGTH or the encoded subkey is longer
///         than MAX_ICACHE_SUBKEY_LENGTH.
static string s_CheckKeySubkey(
    const string& key, const string& subkey, string* encoded_key)
{
    *encoded_key += '"';
    *encoded_key += NStr::PrintableString(key);

    string printable_subkey(NStr::PrintableString(subkey));

    const bool key_too_long =
        encoded_key->length() > (1 + MAX_ICACHE_KEY_LENGTH);
    const bool subkey_too_long =
        printable_subkey.length() > MAX_ICACHE_SUBKEY_LENGTH;

    if (key_too_long || subkey_too_long) {
        NCBI_THROW(CNetCacheException, eKeyFormatError,
            "ICache key or subkey is too long");
    }

    return printable_subkey;
}
83 
/// Build a blob ID of the form  "<key>" "<subkey>"  (both parts printable-
/// encoded and double-quoted, separated by a single space).
///
/// @throws CNetCacheException (eKeyFormatError) via s_CheckKeySubkey() when
///         the encoded key or subkey is too long.
static string s_KeySubkeyToBlobID(const string& key, const string& subkey)
{
    string result(kEmptyStr);
    // Size hint based on the unencoded lengths plus quotes and separator.
    result.reserve(1 + key.length() + 3 + subkey.length() + 1);

    // Appends the opening quote and the encoded key to 'result'.
    const string printable_subkey(s_CheckKeySubkey(key, subkey, &result));

    // Closing quote of the key, a space, opening quote of the subkey.
    result += "\" \"";
    result += printable_subkey;
    result += '"';

    return result;
}
99 
/// Build a blob ID of the form  "<key>" <version> "<subkey>"  (key and subkey
/// printable-encoded and double-quoted, version in decimal).
///
/// @throws CNetCacheException (eKeyFormatError) via s_CheckKeySubkey() when
///         the encoded key or subkey is too long.
static string s_KeyVersionSubkeyToBlobID(
    const string& key, int version, const string& subkey)
{
    string result(kEmptyStr);
    // Size hint only: unencoded lengths plus an estimate for the version
    // digits, the quotes and the separators.
    result.reserve(1 + key.length() + 2 +
        int((double)sizeof(version) * 1.5) + 2 + subkey.length() + 1);

    // Appends the opening quote and the encoded key to 'result'.
    const string printable_subkey(s_CheckKeySubkey(key, subkey, &result));

    result += "\" ";
    result += NStr::IntToString(version);
    result += " \"";
    result += printable_subkey;
    result += '"';

    return result;
}
119 
120 static const char s_NetICacheAPIName[] = "NetICacheClient";
121 
122 struct SNetICacheClientImpl : public SNetCacheAPIImpl
123 {
SNetICacheClientImplSNetICacheClientImpl124     SNetICacheClientImpl(CSynRegistryBuilder registry_builder,
125             const string& section,
126             const string& service_name,
127             const string& client_name,
128             const string& cache_name) :
129         m_CacheFlags(ICache::fBestPerformance)
130     {
131         m_DefaultParameters.SetCacheName(cache_name);
132         SRegSynonyms sections{ section, "netcache_api", "netcache_client", kNetICacheDriverName };
133         m_Service = SNetServiceImpl::Create(s_NetICacheAPIName, service_name, client_name,
134                 new CNetCacheServerListener,
135                 registry_builder, sections);
136         Init(registry_builder, sections);
137     }
138 
SNetICacheClientImplSNetICacheClientImpl139     SNetICacheClientImpl(SNetServerInPool* server, SNetICacheClientImpl* parent) :
140         SNetCacheAPIImpl(server, parent),
141         m_CacheFlags(parent->m_CacheFlags)
142     {
143     }
144 
145     CNetServer::SExecResult ChooseServerAndExec(const string& cmd,
146             const string& key,
147             bool multiline_output,
148             const CNetCacheAPIParameters* parameters);
149 
150     string MakeStdCmd(const char* cmd_base, const string& blob_id,
151         const CNetCacheAPIParameters* parameters,
152         const string& injection = kEmptyStr);
153 
154     string ExecStdCmd(const char* cmd_base, const string& key,
155         int version, const string& subkey,
156         const CNetCacheAPIParameters* parameters);
157 
158     virtual CNetServerConnection InitiateWriteCmd(CNetCacheWriter* nc_writer,
159             const CNetCacheAPIParameters* parameters);
160 
161     IReader* ReadCurrentBlobNotOlderThan(
162         const string& key, const string& subkey,
163         size_t* blob_size_ptr,
164         int* version,
165         ICache::EBlobVersionValidity* validity,
166         unsigned max_age, unsigned* actual_age,
167         const CNamedParameterList* optional = NULL);
168 
169     IReader* GetReadStreamPart(const string& key,
170         int version, const string& subkey,
171         size_t offset, size_t part_size,
172         size_t* blob_size_ptr,
173         const CNamedParameterList* optional);
174 
175     void Init(CSynRegistry& registry, const SRegSynonyms& sections);
176 
177     ICache::TFlags m_CacheFlags;
178 };
179 
Init(CSynRegistry & registry,const SRegSynonyms & sections)180 void SNetICacheClientImpl::Init(CSynRegistry& registry, const SRegSynonyms& sections)
181 {
182     SNetCacheAPIImpl::Init(registry, sections);
183 
184     auto cache_name = m_DefaultParameters.GetCacheName();
185 
186     if (cache_name.empty()) cache_name = registry.Get(sections, { "name", "cache_name" }, "default_cache");
187 
188     if (cache_name.length() > MAX_ICACHE_CACHE_NAME_LENGTH) {
189         NCBI_THROW(CNetCacheException, eAuthenticationError, "NetICache: cache name is too long");
190     }
191 
192     m_DefaultParameters.SetCacheName(cache_name);
193     m_DefaultParameters.SetTryAllServers(registry.Get(sections, "try_all_servers", false));
194 }
195 
InitiateWriteCmd(CNetCacheWriter * nc_writer,const CNetCacheAPIParameters * parameters)196 CNetServerConnection SNetICacheClientImpl::InitiateWriteCmd(
197     CNetCacheWriter* nc_writer, const CNetCacheAPIParameters* parameters)
198 {
199     string cmd("IC(" + NStr::PrintableString(parameters->GetCacheName()));
200     cmd.append(") STOR ");
201 
202     cmd.append(NStr::UIntToString(parameters->GetTTL()));
203     cmd.push_back(' ');
204     cmd.append(nc_writer->GetBlobID());
205     if (nc_writer->GetResponseType() == eNetCache_Wait)
206         cmd.append(" confirm=1");
207     AppendClientIPSessionIDPasswordAgeHitID(&cmd, parameters);
208     if (m_FlagsOnWrite) cmd.append(" flags=").append(to_string(m_FlagsOnWrite));
209 
210     return ChooseServerAndExec(cmd, nc_writer->GetKey(),
211             false, parameters).conn;
212 }
213 
CNetICacheClient(EAppRegistry,const string & conf_section)214 CNetICacheClient::CNetICacheClient(EAppRegistry,
215         const string& conf_section) :
216     m_Impl(new SNetICacheClientImpl(NULL, conf_section,
217         kEmptyStr, kEmptyStr, kEmptyStr))
218 {
219 }
220 
CNetICacheClient(const string & host,unsigned short port,const string & cache_name,const string & client_name)221 CNetICacheClient::CNetICacheClient(
222         const string& host,
223         unsigned short port,
224         const string& cache_name,
225         const string& client_name) :
226     m_Impl(new SNetICacheClientImpl(NULL, kEmptyStr,
227         host + ':' + NStr::UIntToString(port),
228         client_name, cache_name))
229 {
230 }
231 
CNetICacheClient(const string & service_name,const string & cache_name,const string & client_name)232 CNetICacheClient::CNetICacheClient(
233         const string& service_name,
234         const string& cache_name,
235         const string& client_name) :
236     m_Impl(new SNetICacheClientImpl(NULL, kEmptyStr,
237         service_name, client_name, cache_name))
238 {
239 }
240 
CNetICacheClient(CConfig * config,const string & driver_name)241 CNetICacheClient::CNetICacheClient(CConfig* config, const string& driver_name) :
242     m_Impl(new SNetICacheClientImpl(config, driver_name,
243         kEmptyStr, kEmptyStr, kEmptyStr))
244 {
245 }
246 
CNetICacheClient(const IRegistry & reg,const string & section)247 CNetICacheClient::CNetICacheClient(const IRegistry& reg, const string& section) :
248     m_Impl(new SNetICacheClientImpl(reg, section,
249         kEmptyStr, kEmptyStr, kEmptyStr))
250 {
251 }
252 
SetCommunicationTimeout(const STimeout & to)253 void CNetICacheClient::SetCommunicationTimeout(const STimeout& to)
254 {
255     m_Impl->m_Service->m_ServerPool.SetCommunicationTimeout(to);
256 }
257 
258 
GetCommunicationTimeout() const259 STimeout CNetICacheClient::GetCommunicationTimeout() const
260 {
261     return m_Impl->m_Service->m_ServerPool.GetCommunicationTimeout();
262 }
263 
264 class SWeightedServiceTraversal : public IServiceTraversal
265 {
266 public:
SWeightedServiceTraversal(CNetService::TInstance service,const string & key)267     SWeightedServiceTraversal(CNetService::TInstance service,
268             const string& key) :
269         m_Service(service),
270         m_Key(key)
271     {
272     }
273 
274     virtual CNetServer BeginIteration();
275     virtual CNetServer NextServer();
276 
277 private:
278     CNetService m_Service;
279     const string& m_Key;
280     CNetServiceIterator m_Iterator;
281 };
282 
BeginIteration()283 CNetServer SWeightedServiceTraversal::BeginIteration()
284 {
285     return *(m_Iterator = m_Service.IterateByWeight(m_Key));
286 }
287 
NextServer()288 CNetServer SWeightedServiceTraversal::NextServer()
289 {
290     return ++m_Iterator ? *m_Iterator : CNetServer();
291 }
292 
ChooseServerAndExec(const string & cmd,const string & key,bool multiline_output,const CNetCacheAPIParameters * parameters)293 CNetServer::SExecResult SNetICacheClientImpl::ChooseServerAndExec(
294         const string& cmd,
295         const string& key,
296         bool multiline_output,
297         const CNetCacheAPIParameters* parameters)
298 {
299     CNetServer selected_server(parameters->GetServerToUse());
300     CNetServer* server_last_used_ptr(parameters->GetServerLastUsedPtr());
301 
302     const auto try_all_servers = parameters->GetTryAllServers();
303     CNetServer::SExecResult exec_result;
304 
305     if (!try_all_servers && selected_server) {
306         exec_result = selected_server.ExecWithRetry(cmd, multiline_output);
307 
308     } else if (selected_server) {
309         ESwitch server_check = eDefault;
310         parameters->GetServerCheck(&server_check);
311 
312         SNetCacheMirrorTraversal mirror_traversal(m_Service,
313                 selected_server, server_check);
314 
315         m_Service->IterateUntilExecOK(cmd, multiline_output, exec_result,
316                 &mirror_traversal, SNetServiceImpl::eIgnoreServerErrors);
317     } else {
318         SWeightedServiceTraversal service_traversal(m_Service, key);
319 
320         m_Service->IterateUntilExecOK(cmd, multiline_output, exec_result,
321                 &service_traversal, try_all_servers ?
322                 SNetServiceImpl::eIgnoreServerErrors : SNetServiceImpl::eRethrowAllServerErrors);
323     }
324 
325     if (server_last_used_ptr != NULL)
326         *server_last_used_ptr = exec_result.conn->m_Server;
327 
328     return exec_result;
329 }
330 
RegisterSession(unsigned)331 void CNetICacheClient::RegisterSession(unsigned)
332 {
333     NCBI_THROW(CNetCacheException, eNotImplemented, "SMR is not implemented");
334 }
335 
336 
UnRegisterSession(unsigned)337 void CNetICacheClient::UnRegisterSession(unsigned)
338 {
339     NCBI_THROW(CNetCacheException, eNotImplemented, "SMU is not implemented");
340 }
341 
342 
GetFlags()343 ICache::TFlags CNetICacheClient::GetFlags()
344 {
345     return m_Impl->m_CacheFlags;
346 }
347 
348 
SetFlags(ICache::TFlags flags)349 void CNetICacheClient::SetFlags(ICache::TFlags flags)
350 {
351     m_Impl->m_CacheFlags = flags;
352 }
353 
354 
SetTimeStampPolicy(TTimeStampFlags,unsigned,unsigned)355 void CNetICacheClient::SetTimeStampPolicy(TTimeStampFlags, unsigned, unsigned)
356 {
357     NCBI_THROW(CNetCacheException, eNotImplemented, "STSP is not implemented");
358 }
359 
360 
GetTimeStampPolicy() const361 ICache::TTimeStampFlags CNetICacheClient::GetTimeStampPolicy() const
362 {
363     NCBI_THROW(CNetCacheException, eNotImplemented, "GTSP is not implemented");
364 }
365 
366 
GetTimeout() const367 int CNetICacheClient::GetTimeout() const
368 {
369     NCBI_THROW(CNetCacheException, eNotImplemented, "GTOU is not implemented");
370 }
371 
372 
IsOpen() const373 bool CNetICacheClient::IsOpen() const
374 {
375     NCBI_THROW(CNetCacheException, eNotImplemented, "ISOP is not implemented");
376 }
377 
378 
SetVersionRetention(EKeepVersions)379 void CNetICacheClient::SetVersionRetention(EKeepVersions)
380 {
381     NCBI_THROW(CNetCacheException, eNotImplemented, "SVRP is not implemented");
382 }
383 
384 
GetVersionRetention() const385 ICache::EKeepVersions CNetICacheClient::GetVersionRetention() const
386 {
387     NCBI_THROW(CNetCacheException, eNotImplemented, "GVRP is not implemented");
388 }
389 
390 
Store(const string & key,int version,const string & subkey,const void * data,size_t size,unsigned int time_to_live,const string &)391 void CNetICacheClient::Store(const string&  key,
392                              int            version,
393                              const string&  subkey,
394                              const void*    data,
395                              size_t         size,
396                              unsigned int   time_to_live,
397                              const string&  /*owner*/)
398 {
399     string blob_id(s_KeyVersionSubkeyToBlobID(key, version, subkey));
400 
401     CNetCacheAPIParameters parameters(&m_Impl->m_DefaultParameters);
402 
403     parameters.SetTTL(time_to_live);
404     parameters.SetCachingMode(CNetCacheAPI::eCaching_Disable);
405 
406     CNetCacheWriter writer(m_Impl, &blob_id, key,
407         m_Impl->m_CacheFlags & ICache::fBestReliability ?
408             eNetCache_Wait : eICache_NoWait, &parameters);
409 
410     writer.WriteBufferAndClose(reinterpret_cast<const char*>(data), size);
411 }
412 
413 
GetSize(const string & key,int version,const string & subkey)414 size_t CNetICacheClient::GetSize(const string&  key,
415                                  int            version,
416                                  const string&  subkey)
417 {
418     return GetBlobSize(key, version, subkey);
419 }
420 
GetBlobSize(const string & key,int version,const string & subkey,const CNamedParameterList * optional)421 size_t CNetICacheClient::GetBlobSize(const string& key,
422         int version, const string& subkey,
423         const CNamedParameterList* optional)
424 {
425     CNetCacheAPIParameters parameters(&m_Impl->m_DefaultParameters);
426 
427     parameters.LoadNamedParameters(optional);
428 
429     return CheckBlobSize(NStr::StringToUInt8(m_Impl->ExecStdCmd(
430             "GSIZ", key, version, subkey, &parameters)));
431 }
432 
433 
GetBlobOwner(const string &,int,const string &,string * owner)434 void CNetICacheClient::GetBlobOwner(const string&, int, const string&,
435         string* owner)
436 {
437     ERR_POST("NetCache command 'GBLW' has been phased out");
438     *owner = kEmptyStr;
439 }
440 
GetReadStreamPart(const string & key,int version,const string & subkey,size_t offset,size_t part_size,size_t * blob_size_ptr,const CNamedParameterList * optional)441 IReader* SNetICacheClientImpl::GetReadStreamPart(
442     const string& key, int version, const string& subkey,
443     size_t offset, size_t part_size,
444     size_t* blob_size_ptr, const CNamedParameterList* optional)
445 {
446     try {
447         string blob_id(s_KeyVersionSubkeyToBlobID(key, version, subkey));
448 
449         CNetCacheAPIParameters parameters(&m_DefaultParameters);
450 
451         parameters.LoadNamedParameters(optional);
452 
453         const char* cmd_name;
454         string cmd;
455 
456         if (offset == 0 && part_size == 0) {
457             cmd_name = "READ";
458             cmd = MakeStdCmd(cmd_name, blob_id, &parameters);
459         } else {
460             cmd_name = "READPART";
461             cmd = MakeStdCmd(cmd_name, blob_id, &parameters,
462                     ' ' + NStr::UInt8ToString((Uint8) offset) +
463                     ' ' + NStr::UInt8ToString((Uint8) part_size));
464         }
465 
466         CNetServer::SExecResult exec_result(
467                 ChooseServerAndExec(cmd, key, false, &parameters));
468 
469         unsigned* actual_age_ptr = parameters.GetActualBlobAgePtr();
470         if (parameters.GetMaxBlobAge() > 0 && actual_age_ptr != NULL)
471             *actual_age_ptr = x_ExtractBlobAge(exec_result, cmd_name);
472 
473         return new CNetCacheReader(this, blob_id,
474             exec_result, blob_size_ptr, &parameters);
475     }
476     catch (CNetCacheBlobTooOldException&) {
477         return NULL;
478     }
479     catch (CNetCacheException& e) {
480         if (e.GetErrCode() != CNetCacheException::eBlobNotFound)
481             throw;
482         return NULL;
483     }
484 }
485 
Read(const string & key,int version,const string & subkey,void * buf,size_t buf_size)486 bool CNetICacheClient::Read(const string& key,
487                             int           version,
488                             const string& subkey,
489                             void*         buf,
490                             size_t        buf_size)
491 {
492     return ReadPart(key, version, subkey, 0, 0, buf, buf_size);
493 }
494 
ReadPart(const string & key,int version,const string & subkey,size_t offset,size_t part_size,void * buf,size_t buf_size)495 bool CNetICacheClient::ReadPart(const string& key,
496     int version,
497     const string& subkey,
498     size_t offset,
499     size_t part_size,
500     void* buf,
501     size_t buf_size)
502 {
503     size_t blob_size;
504 
505     unique_ptr<IReader> rdr(m_Impl->GetReadStreamPart(
506         key, version, subkey, offset, part_size,
507             &blob_size, nc_caching_mode = CNetCacheAPI::eCaching_Disable));
508 
509     if (rdr.get() == NULL)
510         return false;
511 
512     return SNetCacheAPIImpl::ReadBuffer(*rdr, (char*) buf, buf_size,
513         NULL, blob_size) == CNetCacheAPI::eReadComplete;
514 }
515 
516 
GetBlobAccess(const string & key,int version,const string & subkey,SBlobAccessDescr * blob_descr)517 void CNetICacheClient::GetBlobAccess(const string&     key,
518                                      int               version,
519                                      const string&     subkey,
520                                      SBlobAccessDescr* blob_descr)
521 {
522     if (blob_descr->return_current_version) {
523         blob_descr->return_current_version_supported = true;
524         blob_descr->reader.reset(m_Impl->ReadCurrentBlobNotOlderThan(
525                 key,
526                 subkey,
527                 &blob_descr->blob_size,
528                 &blob_descr->current_version,
529                 &blob_descr->current_version_validity,
530                 blob_descr->maximum_age,
531                 &blob_descr->actual_age));
532     } else if (blob_descr->maximum_age > 0) {
533         blob_descr->reader.reset(m_Impl->GetReadStreamPart(key, version, subkey,
534                 0, 0, &blob_descr->blob_size,
535                 (nc_caching_mode = CNetCacheAPI::eCaching_AppDefault,
536                 nc_max_age = blob_descr->maximum_age,
537                 nc_actual_age = &blob_descr->actual_age)));
538     } else {
539         blob_descr->reader.reset(m_Impl->GetReadStreamPart(key, version, subkey,
540                 0, 0, &blob_descr->blob_size,
541                 nc_caching_mode = CNetCacheAPI::eCaching_AppDefault));
542     }
543 
544     if (blob_descr->reader.get() != NULL) {
545         blob_descr->blob_found = true;
546 
547         if (blob_descr->buf && blob_descr->buf_size >= blob_descr->blob_size) {
548             try {
549                 SNetCacheAPIImpl::ReadBuffer(*blob_descr->reader,
550                     blob_descr->buf, blob_descr->buf_size,
551                         NULL, blob_descr->blob_size);
552             }
553             catch (CNetServiceException&) {
554                 blob_descr->reader.reset(NULL);
555                 throw;
556             }
557             blob_descr->reader.reset(NULL);
558         }
559     } else {
560         blob_descr->blob_size = 0;
561         blob_descr->blob_found = false;
562     }
563 }
564 
565 
GetWriteStream(const string & key,int version,const string & subkey,unsigned int time_to_live,const string &)566 IWriter* CNetICacheClient::GetWriteStream(const string&    key,
567                                           int              version,
568                                           const string&    subkey,
569                                           unsigned int     time_to_live,
570                                           const string&    /*owner*/)
571 {
572     return GetNetCacheWriter(key, version, subkey, nc_blob_ttl = time_to_live);
573 }
574 
575 
GetNetCacheWriter(const string & key,int version,const string & subkey,const CNamedParameterList * optional)576 IEmbeddedStreamWriter* CNetICacheClient::GetNetCacheWriter(const string& key,
577         int version, const string& subkey, const CNamedParameterList* optional)
578 {
579     string blob_id(s_KeyVersionSubkeyToBlobID(key, version, subkey));
580 
581     CNetCacheAPIParameters parameters(&m_Impl->m_DefaultParameters);
582 
583     parameters.LoadNamedParameters(optional);
584 
585     return new CNetCacheWriter(m_Impl, &blob_id, key,
586         m_Impl->m_CacheFlags & ICache::fBestReliability ?
587             eNetCache_Wait : eICache_NoWait, &parameters);
588 }
589 
590 
Remove(const string & key,int version,const string & subkey)591 void CNetICacheClient::Remove(const string&    key,
592                               int              version,
593                               const string&    subkey)
594 {
595     RemoveBlob(key, version, subkey);
596 }
597 
RemoveBlob(const string & key,int version,const string & subkey,const CNamedParameterList * optional)598 void CNetICacheClient::RemoveBlob(const string& key,
599         int version, const string& subkey,
600         const CNamedParameterList* optional)
601 {
602     CNetCacheAPIParameters parameters(&m_Impl->m_DefaultParameters);
603 
604     parameters.LoadNamedParameters(optional);
605 
606     m_Impl->ExecStdCmd("REMO", key, version, subkey, &parameters);
607 }
608 
GetAccessTime(const string &,int,const string &)609 time_t CNetICacheClient::GetAccessTime(const string&, int, const string&)
610 {
611     NCBI_THROW(CNetCacheException, eNotImplemented, "GACT is not implemented");
612 }
613 
614 
HasBlobs(const string & key,const string & subkey)615 bool CNetICacheClient::HasBlobs(const string&  key,
616                                 const string&  subkey)
617 {
618     return HasBlob(key, subkey);
619 }
620 
HasBlob(const string & key,const string & subkey,const CNamedParameterList * optional)621 bool CNetICacheClient::HasBlob(const string& key, const string& subkey,
622         const CNamedParameterList* optional)
623 {
624     CNetCacheAPIParameters parameters(&m_Impl->m_DefaultParameters);
625 
626     parameters.LoadNamedParameters(optional);
627 
628     string response(m_Impl->ExecStdCmd("HASB", key, 0, subkey, &parameters));
629     return (response[0] == '1'|| NStr::StartsWith(response, "0, VER="));
630 }
631 
Purge(time_t access_timeout)632 void CNetICacheClient::Purge(time_t access_timeout)
633 {
634     Purge(kEmptyStr, kEmptyStr, access_timeout);
635 }
636 
637 
Purge(const string & key,const string & subkey,time_t access_timeout)638 void CNetICacheClient::Purge(const string&    key,
639                              const string&    subkey,
640                              time_t           access_timeout)
641 {
642     if (access_timeout) {
643         NCBI_THROW(CNetCacheException, eNotImplemented, "Not implemented");
644     }
645 
646     if (!subkey.empty()) {
647         if (key.empty()) {
648             NCBI_THROW(CNetCacheException, eNotImplemented, "Not implemented");
649         }
650 
651         return RemoveBlob(key, 0, subkey);
652     }
653 
654     const auto cmd = m_Impl->MakeStdCmd("PURGE2", "'" + key + "'", &m_Impl->m_DefaultParameters);
655     m_Impl->ChooseServerAndExec(cmd, key, false, &m_Impl->m_DefaultParameters);
656 }
657 
GetReadStream(const string & key,int version,const string & subkey,size_t * blob_size_ptr,const CNamedParameterList * optional)658 IReader* CNetICacheClient::GetReadStream(
659     const string& key,
660     int version,
661     const string& subkey,
662     size_t* blob_size_ptr,
663     const CNamedParameterList* optional)
664 {
665     return GetReadStreamPart(key, version, subkey,
666         0, 0, blob_size_ptr, optional);
667 }
668 
GetReadStreamPart(const string & key,int version,const string & subkey,size_t offset,size_t part_size,size_t * blob_size_ptr,const CNamedParameterList * optional)669 IReader* CNetICacheClient::GetReadStreamPart(
670     const string& key,
671     int version,
672     const string& subkey,
673     size_t offset,
674     size_t part_size,
675     size_t* blob_size_ptr,
676     const CNamedParameterList* optional)
677 {
678     return m_Impl->GetReadStreamPart(key, version, subkey,
679         offset, part_size, blob_size_ptr, optional);
680 }
681 
GetReadStream(const string & key,int version,const string & subkey)682 IReader* CNetICacheClient::GetReadStream(const string&  key,
683                                          int            version,
684                                          const string&  subkey)
685 {
686     return GetReadStream(key, version, subkey, NULL,
687             nc_caching_mode = CNetCacheAPI::eCaching_AppDefault);
688 }
689 
GetReadStream(const string & key,const string & subkey,int * version,size_t * blob_size_ptr,const CNamedParameterList * optional)690 IReader* CNetICacheClientExt::GetReadStream(const string& key,
691         const string& subkey, int* version,
692         size_t* blob_size_ptr, const CNamedParameterList* optional)
693 {
694     ICache::EBlobVersionValidity not_used;
695     return m_Impl->ReadCurrentBlobNotOlderThan(key, subkey, blob_size_ptr,
696             version, &not_used, 0, NULL, optional);
697 }
698 
GetReadStream(const string & key,const string & subkey,int * version,ICache::EBlobVersionValidity * validity)699 IReader* CNetICacheClient::GetReadStream(const string& key,
700         const string& subkey, int* version,
701         ICache::EBlobVersionValidity* validity)
702 {
703     return m_Impl->ReadCurrentBlobNotOlderThan(key, subkey, NULL,
704             version, validity, 0, NULL);
705 }
706 
ReadCurrentBlobNotOlderThan(const string & key,const string & subkey,size_t * blob_size_ptr,int * version,ICache::EBlobVersionValidity * validity,unsigned max_age,unsigned * actual_age,const CNamedParameterList * optional)707 IReader* SNetICacheClientImpl::ReadCurrentBlobNotOlderThan(const string& key,
708         const string& subkey,
709         size_t* blob_size_ptr,
710         int* version,
711         ICache::EBlobVersionValidity* validity,
712         unsigned max_age, unsigned* actual_age,
713         const CNamedParameterList* optional)
714 {
715     try {
716         string blob_id(s_KeySubkeyToBlobID(key, subkey));
717         CNetCacheAPIParameters parameters(&m_DefaultParameters);
718 
719         parameters.LoadNamedParameters(optional);
720 
721         if (max_age != 0) {
722             parameters.SetMaxBlobAge(max_age);
723         }
724 
725         string cmd = MakeStdCmd("READLAST", blob_id, &parameters);
726 
727         CNetServer::SExecResult exec_result(ChooseServerAndExec(cmd,
728                 key, false, &m_DefaultParameters));
729 
730         string::size_type pos = exec_result.response.find("VER=");
731 
732         if (pos == string::npos) {
733             NCBI_THROW(CNetCacheException, eInvalidServerResponse,
734                 "No VER field in READLAST output");
735         }
736 
737         *version = (int) NStr::StringToUInt(
738             exec_result.response.c_str() + pos + sizeof("VER=") - 1,
739             NStr::fAllowTrailingSymbols);
740 
741         pos = exec_result.response.find("VALID=");
742 
743         if (pos == string::npos) {
744             NCBI_THROW(CNetCacheException, eInvalidServerResponse,
745                 "No VALID field in READLAST output");
746         }
747 
748         switch (exec_result.response[pos + sizeof("VALID=") - 1]) {
749         case 't': case 'T': case 'y': case 'Y':
750             *validity = ICache::eCurrent;
751             break;
752         case 'f': case 'F': case 'n': case 'N':
753             *validity = ICache::eExpired;
754             break;
755         default:
756             NCBI_THROW(CNetCacheException, eInvalidServerResponse,
757                 "Invalid format of the VALID field in READLAST output");
758         }
759 
760         if (max_age > 0)
761             *actual_age = x_ExtractBlobAge(exec_result, "READLAST");
762 
763         return new CNetCacheReader(this, blob_id, exec_result,
764                 blob_size_ptr, &m_DefaultParameters);
765     } catch (CNetCacheBlobTooOldException& e) {
766         if (actual_age != NULL)
767             *actual_age = e.GetAge();
768         *version = e.GetVersion();
769 
770         return NULL;
771     }
772     catch (CNetCacheException& e) {
773         if (e.GetErrCode() != CNetCacheException::eBlobNotFound)
774             throw;
775         return NULL;
776     }
777 }
778 
779 class CSetValidWarningSuppressor
780 {
781 public:
CSetValidWarningSuppressor(INetServerConnectionListener * listener,const string & key,const string & subkey,int version)782     CSetValidWarningSuppressor(
783             INetServerConnectionListener* listener,
784             const string& key,
785             const string& subkey,
786             int version) :
787         m_Listener(listener),
788         m_Key(key),
789         m_Subkey(subkey),
790         m_Version(version)
791     {
792         _ASSERT(m_Listener);
793 
794         auto warning_handler = [&](const string& m, CNetServer s) {
795             return OnWarning(m, s);
796         };
797 
798         m_Listener->SetWarningHandler(warning_handler);
799     }
800 
~CSetValidWarningSuppressor()801     ~CSetValidWarningSuppressor()
802     {
803         m_Listener->SetWarningHandler(nullptr);
804     }
805 
806 private:
807     bool OnWarning(const string& warn_msg, CNetServer server);
808 
809     CRef<INetServerConnectionListener> m_Listener;
810     const string m_Key;
811     const string m_Subkey;
812     const int m_Version;
813 };
814 
OnWarning(const string & warn_msg,CNetServer)815 bool CSetValidWarningSuppressor::OnWarning(const string& warn_msg, CNetServer)
816 {
817     SIZE_TYPE ver_pos = NStr::FindCase(warn_msg,
818             CTempString("VER=", sizeof("VER=") - 1));
819 
820     if (ver_pos == NPOS)
821         return false;
822     else {
823         int version = atoi(warn_msg.c_str() + ver_pos + sizeof("VER=") - 1);
824         if (version < m_Version) {
825             ERR_POST("Cache actualization error (key \"" << m_Key <<
826                     "\", subkey \"" << m_Subkey <<
827                     "\"): the cached blob version downgraded from " <<
828                     m_Version << " to " << version);
829         }
830     }
831 
832     return true;
833 }
834 
SetBlobVersionAsCurrent(const string & key,const string & subkey,int version)835 void CNetICacheClient::SetBlobVersionAsCurrent(const string& key,
836         const string& subkey, int version)
837 {
838     CSetValidWarningSuppressor warning_suppressor(m_Impl->m_Service->m_Listener, key, subkey, version);
839 
840     CNetServer::SExecResult exec_result(
841             m_Impl->ChooseServerAndExec(
842                     m_Impl->MakeStdCmd("SETVALID",
843                             s_KeyVersionSubkeyToBlobID(key, version, subkey),
844                             &m_Impl->m_DefaultParameters),
845                     key,
846                     false,
847                     &m_Impl->m_DefaultParameters));
848 
849     if (!exec_result.response.empty()) {
850         ERR_POST("SetBlobVersionAsCurrent(\"" << key << "\", " <<
851             version << ", \"" << subkey << "\"): " << exec_result.response);
852     }
853 }
854 
GetBlobInfo(const string & key,int version,const string & subkey,const CNamedParameterList * optional)855 CNetServerMultilineCmdOutput CNetICacheClient::GetBlobInfo(const string& key,
856         int version, const string& subkey,
857         const CNamedParameterList* optional)
858 {
859     CNetCacheAPIParameters parameters(&m_Impl->m_DefaultParameters);
860 
861     parameters.LoadNamedParameters(optional);
862 
863     CNetServerMultilineCmdOutput output(
864             m_Impl->ChooseServerAndExec(
865                     m_Impl->MakeStdCmd("GETMETA",
866                             s_KeyVersionSubkeyToBlobID(key, version, subkey),
867                             &parameters),
868                     key,
869                     true,
870                     &parameters));
871 
872     output->SetNetCacheCompatMode();
873 
874     return output;
875 }
876 
PrintBlobInfo(const string & key,int version,const string & subkey)877 void CNetICacheClient::PrintBlobInfo(const string& key,
878         int version, const string& subkey)
879 {
880     CNetServerMultilineCmdOutput output(GetBlobInfo(key, version, subkey));
881 
882     string line;
883 
884     if (output.ReadLine(line)) {
885         if (!NStr::StartsWith(line, "SIZE="))
886             NcbiCout << line << NcbiEndl;
887         while (output.ReadLine(line))
888             NcbiCout << line << NcbiEndl;
889     }
890 }
891 
GetService()892 CNetService CNetICacheClient::GetService()
893 {
894     return m_Impl->m_Service;
895 }
896 
/// Compose an ICache command line of the form
///   IC(<cache name>) <cmd_base> <blob_id><injection><client/session fields>
///
/// @param cmd_base
///   Command verb.
/// @param blob_id
///   Already formatted blob identifier.
/// @param parameters
///   Source of the cache name and of the fields appended by
///   AppendClientIPSessionIDPasswordAgeHitID(). Must not be null.
/// @param injection
///   Extra text appended verbatim right after the blob id (may be empty).
string SNetICacheClientImpl::MakeStdCmd(const char* cmd_base,
    const string& blob_id, const CNetCacheAPIParameters* parameters,
    const string& injection)
{
    string cmd("IC(");
    cmd += NStr::PrintableString(parameters->GetCacheName());
    cmd += ") ";
    cmd += cmd_base;
    cmd += ' ';
    cmd += blob_id;
    cmd += injection; // appending an empty string changes nothing

    AppendClientIPSessionIDPasswordAgeHitID(&cmd, parameters);

    return cmd;
}
917 
/// Build a standard command for the given blob, execute it on a server
/// chosen by ChooseServerAndExec() and return the response text.
string SNetICacheClientImpl::ExecStdCmd(const char* cmd_base,
    const string& key, int version, const string& subkey,
    const CNetCacheAPIParameters* parameters)
{
    const string blob_id = s_KeyVersionSubkeyToBlobID(key, version, subkey);
    const string cmd = MakeStdCmd(cmd_base, blob_id, parameters);

    return ChooseServerAndExec(cmd, key, false, parameters).response;
}
930 
GetCacheName(void) const931 string CNetICacheClient::GetCacheName(void) const
932 {
933     return m_Impl->m_DefaultParameters.GetCacheName();
934 }
935 
936 
SameCacheParams(const TCacheParams *) const937 bool CNetICacheClient::SameCacheParams(const TCacheParams*) const
938 {
939     return false;
940 }
941 
GetServer(CNetServer::TInstance server)942 CNetICacheClientExt CNetICacheClientExt::GetServer(CNetServer::TInstance server)
943 {
944     return new SNetICacheClientImpl(server->m_ServerInPool, m_Impl);
945 }
946 
Search(CNetICacheClient::CExpression expression,CNetICacheClient::CFields filter)947 vector<CNetICacheClient::CBlobInfo> CNetICacheClient::Search(
948         CNetICacheClient::CExpression expression,
949         CNetICacheClient::CFields filter)
950 {
951     const auto parameters = &m_Impl->m_DefaultParameters;
952     const auto cache_name = NStr::PrintableString(parameters->GetCacheName());
953     string ids;
954     m_Impl->AppendClientIPSessionIDPasswordAgeHitID(&ids, parameters);
955     ostringstream oss;
956     oss << "IC(" << cache_name << ") BLIST2" << expression + filter << ids;
957 
958     CNetServerMultilineCmdOutput output(
959             m_Impl->ChooseServerAndExec(
960                 oss.str(),
961                 kEmptyStr,
962                 true,
963                 &m_Impl->m_DefaultParameters));
964 
965     output->SetNetCacheCompatMode();
966     string line;
967     vector<CNetICacheClient::CBlobInfo> result;
968 
969     while (output.ReadLine(line) && !line.empty()) {
970         CBlobInfo blob_info;
971         blob_info << line;
972         result.push_back(blob_info);
973     }
974 
975     return result;
976 }
977 
ProlongBlobLifetime(const string & key,const string & subkey,const CTimeout & ttl,const CNamedParameterList * optional)978 void CNetICacheClientExt::ProlongBlobLifetime(const string& key, const string& subkey,
979         const CTimeout& ttl, const CNamedParameterList* optional)
980 {
981     CNetCacheAPIParameters parameters(&m_Impl->m_DefaultParameters);
982     parameters.LoadNamedParameters(optional);
983 
984     string cmd("PROLONG \"");
985     cmd += NStr::PrintableString(parameters.GetCacheName());
986     cmd += "\" \"";
987     cmd += key;
988     cmd += "\" \"";
989     cmd += subkey;
990     cmd += "\" ttl=";
991     cmd += NStr::NumericToString((unsigned)ttl.GetAsDouble());
992 
993     m_Impl->AppendClientIPSessionIDHitID(&cmd);
994     m_Impl->ChooseServerAndExec(cmd, key, false, &m_Impl->m_DefaultParameters);
995 }
996 
GetNetCacheAPI()997 SNetCacheAPIImpl* CNetICacheClientExt::GetNetCacheAPI()
998 {
999     return new SNetCacheAPIImpl(m_Impl.GetObject());
1000 }
1001 
GetNetCacheAPI() const1002 const SNetCacheAPIImpl* CNetICacheClientExt::GetNetCacheAPI() const
1003 {
1004     return new SNetCacheAPIImpl(m_Impl.GetObject());
1005 }
1006 
1007 /// Class factory for NetCache implementation of ICache
1008 ///
1009 /// @internal
1010 ///
1011 class CNetICacheCF : public CICacheCF<CNetICacheClient>
1012 {
1013 public:
1014     typedef CICacheCF<CNetICacheClient> TParent;
1015 
1016 public:
CNetICacheCF()1017     CNetICacheCF() : TParent(kNetICacheDriverName, 0)
1018     {
1019     }
1020 
1021 private:
1022     virtual ICache* x_CreateInstance(
1023                    const string&    driver  = kEmptyStr,
1024                    CVersionInfo     version = NCBI_INTERFACE_VERSION(ICache),
1025                    const TPluginManagerParamTree* params = 0) const;
1026 };
1027 
1028 
x_CreateInstance(const string & driver,CVersionInfo version,const TPluginManagerParamTree * params) const1029 ICache* CNetICacheCF::x_CreateInstance(
1030            const string&                  driver,
1031            CVersionInfo                   version,
1032            const TPluginManagerParamTree* params) const
1033 {
1034     if ((!driver.empty() && driver != m_DriverName) ||
1035         version.Match(NCBI_INTERFACE_VERSION(ICache)) ==
1036             CVersionInfo::eNonCompatible)
1037         return 0;
1038 
1039     if (!params)
1040         return new CNetICacheClient;
1041 
1042     CConfig conf(params);
1043 
1044     return new CNetICacheClient(&conf, driver);
1045 }
1046 
1047 
NCBI_EntryPoint_xcache_netcache(CPluginManager<ICache>::TDriverInfoList & info_list,CPluginManager<ICache>::EEntryPointRequest method)1048 void NCBI_EntryPoint_xcache_netcache(
1049      CPluginManager<ICache>::TDriverInfoList&   info_list,
1050      CPluginManager<ICache>::EEntryPointRequest method)
1051 {
1052     CHostEntryPointImpl<CNetICacheCF>::NCBI_EntryPointImpl(info_list, method);
1053 }
1054 
1055 
Cache_RegisterDriver_NetCache(void)1056 void Cache_RegisterDriver_NetCache(void)
1057 {
1058     RegisterEntryPoint<ICache>( NCBI_EntryPoint_xcache_netcache );
1059 }
1060 
1061 END_NCBI_SCOPE
1062