1 /* $Id: neticache_client.cpp 617404 2020-09-30 19:02:00Z sadyrovr $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Anatoliy Kuznetsov, Dmitry Kazimirov
27 *
28 * File Description:
29 * Implementation of netcache ICache client.
30 *
31 */
32
33 #include <ncbi_pch.hpp>
34
35 #include "netcache_api_impl.hpp"
36
37 #include <connect/services/neticache_client.hpp>
38 #include <connect/services/error_codes.hpp>
39 #include <connect/services/netcache_api_expt.hpp>
40 #include <connect/services/srv_connections.hpp>
41
42 #include <connect/ncbi_conn_exception.hpp>
43
44 #include <util/cache/icache_cf.hpp>
45 #include <util/transmissionrw.hpp>
46
47 #include <corelib/ncbitime.hpp>
48 #include <corelib/request_ctx.hpp>
49 #include <corelib/plugin_manager_impl.hpp>
50 #include <corelib/ncbi_system.hpp>
51 #include <corelib/ncbi_safe_static.hpp>
52
53 #include <memory>
54 #include <sstream>
55
56 #define MAX_ICACHE_CACHE_NAME_LENGTH 64
57 #define MAX_ICACHE_KEY_LENGTH 256
58 #define MAX_ICACHE_SUBKEY_LENGTH 256
59
60 #define NCBI_USE_ERRCODE_X ConnServ_NetCache
61
62
63 BEGIN_NCBI_SCOPE
64
65 const char* const kNetICacheDriverName = "netcache";
66
/// Append an opening double quote and the printable-encoded key to
/// *encoded_key, then return the printable-encoded subkey.
///
/// The key length check allows MAX_ICACHE_KEY_LENGTH plus one character
/// for the opening quote, so it is only meaningful when *encoded_key is
/// empty on entry.
///
/// @throws CNetCacheException (eKeyFormatError) if either encoded value
///         exceeds its limit.
static string s_CheckKeySubkey(
    const string& key, const string& subkey, string* encoded_key)
{
    *encoded_key += '"';
    *encoded_key += NStr::PrintableString(key);

    string printable_subkey(NStr::PrintableString(subkey));

    const bool key_too_long =
        encoded_key->length() > (1 + MAX_ICACHE_KEY_LENGTH);
    const bool subkey_too_long =
        printable_subkey.length() > MAX_ICACHE_SUBKEY_LENGTH;

    if (key_too_long || subkey_too_long) {
        NCBI_THROW(CNetCacheException, eKeyFormatError,
            "ICache key or subkey is too long");
    }

    return printable_subkey;
}
83
/// Build a version-less blob ID of the form  "<key>" "<subkey>"  where
/// both parts are printable-encoded and length-checked by
/// s_CheckKeySubkey().
static string s_KeySubkeyToBlobID(const string& key, const string& subkey)
{
    string blob_id;
    // Opening quote + key + closing quote/space/opening quote + subkey +
    // closing quote (sized from the raw, unencoded lengths).
    blob_id.reserve(1 + key.length() + 3 + subkey.length() + 1);

    // Writes the opening quote and the encoded key into blob_id.
    const string printable_subkey(s_CheckKeySubkey(key, subkey, &blob_id));

    blob_id += "\" \"";
    blob_id += printable_subkey;
    blob_id += '"';

    return blob_id;
}
99
/// Build a versioned blob ID of the form  "<key>" <version> "<subkey>"
/// where key and subkey are printable-encoded and length-checked by
/// s_CheckKeySubkey().
static string s_KeyVersionSubkeyToBlobID(
    const string& key, int version, const string& subkey)
{
    // Rough allowance for the decimal form of the version; this is only
    // a capacity hint, the string grows as needed.
    const int version_room =
        static_cast<int>(static_cast<double>(sizeof(version)) * 1.5);

    string blob_id;
    blob_id.reserve(1 + key.length() + 2 +
        version_room + 2 + subkey.length() + 1);

    // Writes the opening quote and the encoded key into blob_id.
    const string printable_subkey(s_CheckKeySubkey(key, subkey, &blob_id));

    blob_id += "\" ";
    blob_id += NStr::IntToString(version);
    blob_id += " \"";
    blob_id += printable_subkey;
    blob_id += '"';

    return blob_id;
}
119
120 static const char s_NetICacheAPIName[] = "NetICacheClient";
121
122 struct SNetICacheClientImpl : public SNetCacheAPIImpl
123 {
SNetICacheClientImplSNetICacheClientImpl124 SNetICacheClientImpl(CSynRegistryBuilder registry_builder,
125 const string& section,
126 const string& service_name,
127 const string& client_name,
128 const string& cache_name) :
129 m_CacheFlags(ICache::fBestPerformance)
130 {
131 m_DefaultParameters.SetCacheName(cache_name);
132 SRegSynonyms sections{ section, "netcache_api", "netcache_client", kNetICacheDriverName };
133 m_Service = SNetServiceImpl::Create(s_NetICacheAPIName, service_name, client_name,
134 new CNetCacheServerListener,
135 registry_builder, sections);
136 Init(registry_builder, sections);
137 }
138
SNetICacheClientImplSNetICacheClientImpl139 SNetICacheClientImpl(SNetServerInPool* server, SNetICacheClientImpl* parent) :
140 SNetCacheAPIImpl(server, parent),
141 m_CacheFlags(parent->m_CacheFlags)
142 {
143 }
144
145 CNetServer::SExecResult ChooseServerAndExec(const string& cmd,
146 const string& key,
147 bool multiline_output,
148 const CNetCacheAPIParameters* parameters);
149
150 string MakeStdCmd(const char* cmd_base, const string& blob_id,
151 const CNetCacheAPIParameters* parameters,
152 const string& injection = kEmptyStr);
153
154 string ExecStdCmd(const char* cmd_base, const string& key,
155 int version, const string& subkey,
156 const CNetCacheAPIParameters* parameters);
157
158 virtual CNetServerConnection InitiateWriteCmd(CNetCacheWriter* nc_writer,
159 const CNetCacheAPIParameters* parameters);
160
161 IReader* ReadCurrentBlobNotOlderThan(
162 const string& key, const string& subkey,
163 size_t* blob_size_ptr,
164 int* version,
165 ICache::EBlobVersionValidity* validity,
166 unsigned max_age, unsigned* actual_age,
167 const CNamedParameterList* optional = NULL);
168
169 IReader* GetReadStreamPart(const string& key,
170 int version, const string& subkey,
171 size_t offset, size_t part_size,
172 size_t* blob_size_ptr,
173 const CNamedParameterList* optional);
174
175 void Init(CSynRegistry& registry, const SRegSynonyms& sections);
176
177 ICache::TFlags m_CacheFlags;
178 };
179
Init(CSynRegistry & registry,const SRegSynonyms & sections)180 void SNetICacheClientImpl::Init(CSynRegistry& registry, const SRegSynonyms& sections)
181 {
182 SNetCacheAPIImpl::Init(registry, sections);
183
184 auto cache_name = m_DefaultParameters.GetCacheName();
185
186 if (cache_name.empty()) cache_name = registry.Get(sections, { "name", "cache_name" }, "default_cache");
187
188 if (cache_name.length() > MAX_ICACHE_CACHE_NAME_LENGTH) {
189 NCBI_THROW(CNetCacheException, eAuthenticationError, "NetICache: cache name is too long");
190 }
191
192 m_DefaultParameters.SetCacheName(cache_name);
193 m_DefaultParameters.SetTryAllServers(registry.Get(sections, "try_all_servers", false));
194 }
195
InitiateWriteCmd(CNetCacheWriter * nc_writer,const CNetCacheAPIParameters * parameters)196 CNetServerConnection SNetICacheClientImpl::InitiateWriteCmd(
197 CNetCacheWriter* nc_writer, const CNetCacheAPIParameters* parameters)
198 {
199 string cmd("IC(" + NStr::PrintableString(parameters->GetCacheName()));
200 cmd.append(") STOR ");
201
202 cmd.append(NStr::UIntToString(parameters->GetTTL()));
203 cmd.push_back(' ');
204 cmd.append(nc_writer->GetBlobID());
205 if (nc_writer->GetResponseType() == eNetCache_Wait)
206 cmd.append(" confirm=1");
207 AppendClientIPSessionIDPasswordAgeHitID(&cmd, parameters);
208 if (m_FlagsOnWrite) cmd.append(" flags=").append(to_string(m_FlagsOnWrite));
209
210 return ChooseServerAndExec(cmd, nc_writer->GetKey(),
211 false, parameters).conn;
212 }
213
CNetICacheClient(EAppRegistry,const string & conf_section)214 CNetICacheClient::CNetICacheClient(EAppRegistry,
215 const string& conf_section) :
216 m_Impl(new SNetICacheClientImpl(NULL, conf_section,
217 kEmptyStr, kEmptyStr, kEmptyStr))
218 {
219 }
220
CNetICacheClient(const string & host,unsigned short port,const string & cache_name,const string & client_name)221 CNetICacheClient::CNetICacheClient(
222 const string& host,
223 unsigned short port,
224 const string& cache_name,
225 const string& client_name) :
226 m_Impl(new SNetICacheClientImpl(NULL, kEmptyStr,
227 host + ':' + NStr::UIntToString(port),
228 client_name, cache_name))
229 {
230 }
231
CNetICacheClient(const string & service_name,const string & cache_name,const string & client_name)232 CNetICacheClient::CNetICacheClient(
233 const string& service_name,
234 const string& cache_name,
235 const string& client_name) :
236 m_Impl(new SNetICacheClientImpl(NULL, kEmptyStr,
237 service_name, client_name, cache_name))
238 {
239 }
240
CNetICacheClient(CConfig * config,const string & driver_name)241 CNetICacheClient::CNetICacheClient(CConfig* config, const string& driver_name) :
242 m_Impl(new SNetICacheClientImpl(config, driver_name,
243 kEmptyStr, kEmptyStr, kEmptyStr))
244 {
245 }
246
CNetICacheClient(const IRegistry & reg,const string & section)247 CNetICacheClient::CNetICacheClient(const IRegistry& reg, const string& section) :
248 m_Impl(new SNetICacheClientImpl(reg, section,
249 kEmptyStr, kEmptyStr, kEmptyStr))
250 {
251 }
252
SetCommunicationTimeout(const STimeout & to)253 void CNetICacheClient::SetCommunicationTimeout(const STimeout& to)
254 {
255 m_Impl->m_Service->m_ServerPool.SetCommunicationTimeout(to);
256 }
257
258
GetCommunicationTimeout() const259 STimeout CNetICacheClient::GetCommunicationTimeout() const
260 {
261 return m_Impl->m_Service->m_ServerPool.GetCommunicationTimeout();
262 }
263
264 class SWeightedServiceTraversal : public IServiceTraversal
265 {
266 public:
SWeightedServiceTraversal(CNetService::TInstance service,const string & key)267 SWeightedServiceTraversal(CNetService::TInstance service,
268 const string& key) :
269 m_Service(service),
270 m_Key(key)
271 {
272 }
273
274 virtual CNetServer BeginIteration();
275 virtual CNetServer NextServer();
276
277 private:
278 CNetService m_Service;
279 const string& m_Key;
280 CNetServiceIterator m_Iterator;
281 };
282
BeginIteration()283 CNetServer SWeightedServiceTraversal::BeginIteration()
284 {
285 return *(m_Iterator = m_Service.IterateByWeight(m_Key));
286 }
287
NextServer()288 CNetServer SWeightedServiceTraversal::NextServer()
289 {
290 return ++m_Iterator ? *m_Iterator : CNetServer();
291 }
292
ChooseServerAndExec(const string & cmd,const string & key,bool multiline_output,const CNetCacheAPIParameters * parameters)293 CNetServer::SExecResult SNetICacheClientImpl::ChooseServerAndExec(
294 const string& cmd,
295 const string& key,
296 bool multiline_output,
297 const CNetCacheAPIParameters* parameters)
298 {
299 CNetServer selected_server(parameters->GetServerToUse());
300 CNetServer* server_last_used_ptr(parameters->GetServerLastUsedPtr());
301
302 const auto try_all_servers = parameters->GetTryAllServers();
303 CNetServer::SExecResult exec_result;
304
305 if (!try_all_servers && selected_server) {
306 exec_result = selected_server.ExecWithRetry(cmd, multiline_output);
307
308 } else if (selected_server) {
309 ESwitch server_check = eDefault;
310 parameters->GetServerCheck(&server_check);
311
312 SNetCacheMirrorTraversal mirror_traversal(m_Service,
313 selected_server, server_check);
314
315 m_Service->IterateUntilExecOK(cmd, multiline_output, exec_result,
316 &mirror_traversal, SNetServiceImpl::eIgnoreServerErrors);
317 } else {
318 SWeightedServiceTraversal service_traversal(m_Service, key);
319
320 m_Service->IterateUntilExecOK(cmd, multiline_output, exec_result,
321 &service_traversal, try_all_servers ?
322 SNetServiceImpl::eIgnoreServerErrors : SNetServiceImpl::eRethrowAllServerErrors);
323 }
324
325 if (server_last_used_ptr != NULL)
326 *server_last_used_ptr = exec_result.conn->m_Server;
327
328 return exec_result;
329 }
330
RegisterSession(unsigned)331 void CNetICacheClient::RegisterSession(unsigned)
332 {
333 NCBI_THROW(CNetCacheException, eNotImplemented, "SMR is not implemented");
334 }
335
336
UnRegisterSession(unsigned)337 void CNetICacheClient::UnRegisterSession(unsigned)
338 {
339 NCBI_THROW(CNetCacheException, eNotImplemented, "SMU is not implemented");
340 }
341
342
GetFlags()343 ICache::TFlags CNetICacheClient::GetFlags()
344 {
345 return m_Impl->m_CacheFlags;
346 }
347
348
SetFlags(ICache::TFlags flags)349 void CNetICacheClient::SetFlags(ICache::TFlags flags)
350 {
351 m_Impl->m_CacheFlags = flags;
352 }
353
354
SetTimeStampPolicy(TTimeStampFlags,unsigned,unsigned)355 void CNetICacheClient::SetTimeStampPolicy(TTimeStampFlags, unsigned, unsigned)
356 {
357 NCBI_THROW(CNetCacheException, eNotImplemented, "STSP is not implemented");
358 }
359
360
GetTimeStampPolicy() const361 ICache::TTimeStampFlags CNetICacheClient::GetTimeStampPolicy() const
362 {
363 NCBI_THROW(CNetCacheException, eNotImplemented, "GTSP is not implemented");
364 }
365
366
GetTimeout() const367 int CNetICacheClient::GetTimeout() const
368 {
369 NCBI_THROW(CNetCacheException, eNotImplemented, "GTOU is not implemented");
370 }
371
372
IsOpen() const373 bool CNetICacheClient::IsOpen() const
374 {
375 NCBI_THROW(CNetCacheException, eNotImplemented, "ISOP is not implemented");
376 }
377
378
SetVersionRetention(EKeepVersions)379 void CNetICacheClient::SetVersionRetention(EKeepVersions)
380 {
381 NCBI_THROW(CNetCacheException, eNotImplemented, "SVRP is not implemented");
382 }
383
384
GetVersionRetention() const385 ICache::EKeepVersions CNetICacheClient::GetVersionRetention() const
386 {
387 NCBI_THROW(CNetCacheException, eNotImplemented, "GVRP is not implemented");
388 }
389
390
Store(const string & key,int version,const string & subkey,const void * data,size_t size,unsigned int time_to_live,const string &)391 void CNetICacheClient::Store(const string& key,
392 int version,
393 const string& subkey,
394 const void* data,
395 size_t size,
396 unsigned int time_to_live,
397 const string& /*owner*/)
398 {
399 string blob_id(s_KeyVersionSubkeyToBlobID(key, version, subkey));
400
401 CNetCacheAPIParameters parameters(&m_Impl->m_DefaultParameters);
402
403 parameters.SetTTL(time_to_live);
404 parameters.SetCachingMode(CNetCacheAPI::eCaching_Disable);
405
406 CNetCacheWriter writer(m_Impl, &blob_id, key,
407 m_Impl->m_CacheFlags & ICache::fBestReliability ?
408 eNetCache_Wait : eICache_NoWait, ¶meters);
409
410 writer.WriteBufferAndClose(reinterpret_cast<const char*>(data), size);
411 }
412
413
GetSize(const string & key,int version,const string & subkey)414 size_t CNetICacheClient::GetSize(const string& key,
415 int version,
416 const string& subkey)
417 {
418 return GetBlobSize(key, version, subkey);
419 }
420
GetBlobSize(const string & key,int version,const string & subkey,const CNamedParameterList * optional)421 size_t CNetICacheClient::GetBlobSize(const string& key,
422 int version, const string& subkey,
423 const CNamedParameterList* optional)
424 {
425 CNetCacheAPIParameters parameters(&m_Impl->m_DefaultParameters);
426
427 parameters.LoadNamedParameters(optional);
428
429 return CheckBlobSize(NStr::StringToUInt8(m_Impl->ExecStdCmd(
430 "GSIZ", key, version, subkey, ¶meters)));
431 }
432
433
GetBlobOwner(const string &,int,const string &,string * owner)434 void CNetICacheClient::GetBlobOwner(const string&, int, const string&,
435 string* owner)
436 {
437 ERR_POST("NetCache command 'GBLW' has been phased out");
438 *owner = kEmptyStr;
439 }
440
GetReadStreamPart(const string & key,int version,const string & subkey,size_t offset,size_t part_size,size_t * blob_size_ptr,const CNamedParameterList * optional)441 IReader* SNetICacheClientImpl::GetReadStreamPart(
442 const string& key, int version, const string& subkey,
443 size_t offset, size_t part_size,
444 size_t* blob_size_ptr, const CNamedParameterList* optional)
445 {
446 try {
447 string blob_id(s_KeyVersionSubkeyToBlobID(key, version, subkey));
448
449 CNetCacheAPIParameters parameters(&m_DefaultParameters);
450
451 parameters.LoadNamedParameters(optional);
452
453 const char* cmd_name;
454 string cmd;
455
456 if (offset == 0 && part_size == 0) {
457 cmd_name = "READ";
458 cmd = MakeStdCmd(cmd_name, blob_id, ¶meters);
459 } else {
460 cmd_name = "READPART";
461 cmd = MakeStdCmd(cmd_name, blob_id, ¶meters,
462 ' ' + NStr::UInt8ToString((Uint8) offset) +
463 ' ' + NStr::UInt8ToString((Uint8) part_size));
464 }
465
466 CNetServer::SExecResult exec_result(
467 ChooseServerAndExec(cmd, key, false, ¶meters));
468
469 unsigned* actual_age_ptr = parameters.GetActualBlobAgePtr();
470 if (parameters.GetMaxBlobAge() > 0 && actual_age_ptr != NULL)
471 *actual_age_ptr = x_ExtractBlobAge(exec_result, cmd_name);
472
473 return new CNetCacheReader(this, blob_id,
474 exec_result, blob_size_ptr, ¶meters);
475 }
476 catch (CNetCacheBlobTooOldException&) {
477 return NULL;
478 }
479 catch (CNetCacheException& e) {
480 if (e.GetErrCode() != CNetCacheException::eBlobNotFound)
481 throw;
482 return NULL;
483 }
484 }
485
Read(const string & key,int version,const string & subkey,void * buf,size_t buf_size)486 bool CNetICacheClient::Read(const string& key,
487 int version,
488 const string& subkey,
489 void* buf,
490 size_t buf_size)
491 {
492 return ReadPart(key, version, subkey, 0, 0, buf, buf_size);
493 }
494
ReadPart(const string & key,int version,const string & subkey,size_t offset,size_t part_size,void * buf,size_t buf_size)495 bool CNetICacheClient::ReadPart(const string& key,
496 int version,
497 const string& subkey,
498 size_t offset,
499 size_t part_size,
500 void* buf,
501 size_t buf_size)
502 {
503 size_t blob_size;
504
505 unique_ptr<IReader> rdr(m_Impl->GetReadStreamPart(
506 key, version, subkey, offset, part_size,
507 &blob_size, nc_caching_mode = CNetCacheAPI::eCaching_Disable));
508
509 if (rdr.get() == NULL)
510 return false;
511
512 return SNetCacheAPIImpl::ReadBuffer(*rdr, (char*) buf, buf_size,
513 NULL, blob_size) == CNetCacheAPI::eReadComplete;
514 }
515
516
GetBlobAccess(const string & key,int version,const string & subkey,SBlobAccessDescr * blob_descr)517 void CNetICacheClient::GetBlobAccess(const string& key,
518 int version,
519 const string& subkey,
520 SBlobAccessDescr* blob_descr)
521 {
522 if (blob_descr->return_current_version) {
523 blob_descr->return_current_version_supported = true;
524 blob_descr->reader.reset(m_Impl->ReadCurrentBlobNotOlderThan(
525 key,
526 subkey,
527 &blob_descr->blob_size,
528 &blob_descr->current_version,
529 &blob_descr->current_version_validity,
530 blob_descr->maximum_age,
531 &blob_descr->actual_age));
532 } else if (blob_descr->maximum_age > 0) {
533 blob_descr->reader.reset(m_Impl->GetReadStreamPart(key, version, subkey,
534 0, 0, &blob_descr->blob_size,
535 (nc_caching_mode = CNetCacheAPI::eCaching_AppDefault,
536 nc_max_age = blob_descr->maximum_age,
537 nc_actual_age = &blob_descr->actual_age)));
538 } else {
539 blob_descr->reader.reset(m_Impl->GetReadStreamPart(key, version, subkey,
540 0, 0, &blob_descr->blob_size,
541 nc_caching_mode = CNetCacheAPI::eCaching_AppDefault));
542 }
543
544 if (blob_descr->reader.get() != NULL) {
545 blob_descr->blob_found = true;
546
547 if (blob_descr->buf && blob_descr->buf_size >= blob_descr->blob_size) {
548 try {
549 SNetCacheAPIImpl::ReadBuffer(*blob_descr->reader,
550 blob_descr->buf, blob_descr->buf_size,
551 NULL, blob_descr->blob_size);
552 }
553 catch (CNetServiceException&) {
554 blob_descr->reader.reset(NULL);
555 throw;
556 }
557 blob_descr->reader.reset(NULL);
558 }
559 } else {
560 blob_descr->blob_size = 0;
561 blob_descr->blob_found = false;
562 }
563 }
564
565
GetWriteStream(const string & key,int version,const string & subkey,unsigned int time_to_live,const string &)566 IWriter* CNetICacheClient::GetWriteStream(const string& key,
567 int version,
568 const string& subkey,
569 unsigned int time_to_live,
570 const string& /*owner*/)
571 {
572 return GetNetCacheWriter(key, version, subkey, nc_blob_ttl = time_to_live);
573 }
574
575
GetNetCacheWriter(const string & key,int version,const string & subkey,const CNamedParameterList * optional)576 IEmbeddedStreamWriter* CNetICacheClient::GetNetCacheWriter(const string& key,
577 int version, const string& subkey, const CNamedParameterList* optional)
578 {
579 string blob_id(s_KeyVersionSubkeyToBlobID(key, version, subkey));
580
581 CNetCacheAPIParameters parameters(&m_Impl->m_DefaultParameters);
582
583 parameters.LoadNamedParameters(optional);
584
585 return new CNetCacheWriter(m_Impl, &blob_id, key,
586 m_Impl->m_CacheFlags & ICache::fBestReliability ?
587 eNetCache_Wait : eICache_NoWait, ¶meters);
588 }
589
590
Remove(const string & key,int version,const string & subkey)591 void CNetICacheClient::Remove(const string& key,
592 int version,
593 const string& subkey)
594 {
595 RemoveBlob(key, version, subkey);
596 }
597
RemoveBlob(const string & key,int version,const string & subkey,const CNamedParameterList * optional)598 void CNetICacheClient::RemoveBlob(const string& key,
599 int version, const string& subkey,
600 const CNamedParameterList* optional)
601 {
602 CNetCacheAPIParameters parameters(&m_Impl->m_DefaultParameters);
603
604 parameters.LoadNamedParameters(optional);
605
606 m_Impl->ExecStdCmd("REMO", key, version, subkey, ¶meters);
607 }
608
GetAccessTime(const string &,int,const string &)609 time_t CNetICacheClient::GetAccessTime(const string&, int, const string&)
610 {
611 NCBI_THROW(CNetCacheException, eNotImplemented, "GACT is not implemented");
612 }
613
614
HasBlobs(const string & key,const string & subkey)615 bool CNetICacheClient::HasBlobs(const string& key,
616 const string& subkey)
617 {
618 return HasBlob(key, subkey);
619 }
620
HasBlob(const string & key,const string & subkey,const CNamedParameterList * optional)621 bool CNetICacheClient::HasBlob(const string& key, const string& subkey,
622 const CNamedParameterList* optional)
623 {
624 CNetCacheAPIParameters parameters(&m_Impl->m_DefaultParameters);
625
626 parameters.LoadNamedParameters(optional);
627
628 string response(m_Impl->ExecStdCmd("HASB", key, 0, subkey, ¶meters));
629 return (response[0] == '1'|| NStr::StartsWith(response, "0, VER="));
630 }
631
Purge(time_t access_timeout)632 void CNetICacheClient::Purge(time_t access_timeout)
633 {
634 Purge(kEmptyStr, kEmptyStr, access_timeout);
635 }
636
637
Purge(const string & key,const string & subkey,time_t access_timeout)638 void CNetICacheClient::Purge(const string& key,
639 const string& subkey,
640 time_t access_timeout)
641 {
642 if (access_timeout) {
643 NCBI_THROW(CNetCacheException, eNotImplemented, "Not implemented");
644 }
645
646 if (!subkey.empty()) {
647 if (key.empty()) {
648 NCBI_THROW(CNetCacheException, eNotImplemented, "Not implemented");
649 }
650
651 return RemoveBlob(key, 0, subkey);
652 }
653
654 const auto cmd = m_Impl->MakeStdCmd("PURGE2", "'" + key + "'", &m_Impl->m_DefaultParameters);
655 m_Impl->ChooseServerAndExec(cmd, key, false, &m_Impl->m_DefaultParameters);
656 }
657
GetReadStream(const string & key,int version,const string & subkey,size_t * blob_size_ptr,const CNamedParameterList * optional)658 IReader* CNetICacheClient::GetReadStream(
659 const string& key,
660 int version,
661 const string& subkey,
662 size_t* blob_size_ptr,
663 const CNamedParameterList* optional)
664 {
665 return GetReadStreamPart(key, version, subkey,
666 0, 0, blob_size_ptr, optional);
667 }
668
GetReadStreamPart(const string & key,int version,const string & subkey,size_t offset,size_t part_size,size_t * blob_size_ptr,const CNamedParameterList * optional)669 IReader* CNetICacheClient::GetReadStreamPart(
670 const string& key,
671 int version,
672 const string& subkey,
673 size_t offset,
674 size_t part_size,
675 size_t* blob_size_ptr,
676 const CNamedParameterList* optional)
677 {
678 return m_Impl->GetReadStreamPart(key, version, subkey,
679 offset, part_size, blob_size_ptr, optional);
680 }
681
GetReadStream(const string & key,int version,const string & subkey)682 IReader* CNetICacheClient::GetReadStream(const string& key,
683 int version,
684 const string& subkey)
685 {
686 return GetReadStream(key, version, subkey, NULL,
687 nc_caching_mode = CNetCacheAPI::eCaching_AppDefault);
688 }
689
GetReadStream(const string & key,const string & subkey,int * version,size_t * blob_size_ptr,const CNamedParameterList * optional)690 IReader* CNetICacheClientExt::GetReadStream(const string& key,
691 const string& subkey, int* version,
692 size_t* blob_size_ptr, const CNamedParameterList* optional)
693 {
694 ICache::EBlobVersionValidity not_used;
695 return m_Impl->ReadCurrentBlobNotOlderThan(key, subkey, blob_size_ptr,
696 version, ¬_used, 0, NULL, optional);
697 }
698
GetReadStream(const string & key,const string & subkey,int * version,ICache::EBlobVersionValidity * validity)699 IReader* CNetICacheClient::GetReadStream(const string& key,
700 const string& subkey, int* version,
701 ICache::EBlobVersionValidity* validity)
702 {
703 return m_Impl->ReadCurrentBlobNotOlderThan(key, subkey, NULL,
704 version, validity, 0, NULL);
705 }
706
ReadCurrentBlobNotOlderThan(const string & key,const string & subkey,size_t * blob_size_ptr,int * version,ICache::EBlobVersionValidity * validity,unsigned max_age,unsigned * actual_age,const CNamedParameterList * optional)707 IReader* SNetICacheClientImpl::ReadCurrentBlobNotOlderThan(const string& key,
708 const string& subkey,
709 size_t* blob_size_ptr,
710 int* version,
711 ICache::EBlobVersionValidity* validity,
712 unsigned max_age, unsigned* actual_age,
713 const CNamedParameterList* optional)
714 {
715 try {
716 string blob_id(s_KeySubkeyToBlobID(key, subkey));
717 CNetCacheAPIParameters parameters(&m_DefaultParameters);
718
719 parameters.LoadNamedParameters(optional);
720
721 if (max_age != 0) {
722 parameters.SetMaxBlobAge(max_age);
723 }
724
725 string cmd = MakeStdCmd("READLAST", blob_id, ¶meters);
726
727 CNetServer::SExecResult exec_result(ChooseServerAndExec(cmd,
728 key, false, &m_DefaultParameters));
729
730 string::size_type pos = exec_result.response.find("VER=");
731
732 if (pos == string::npos) {
733 NCBI_THROW(CNetCacheException, eInvalidServerResponse,
734 "No VER field in READLAST output");
735 }
736
737 *version = (int) NStr::StringToUInt(
738 exec_result.response.c_str() + pos + sizeof("VER=") - 1,
739 NStr::fAllowTrailingSymbols);
740
741 pos = exec_result.response.find("VALID=");
742
743 if (pos == string::npos) {
744 NCBI_THROW(CNetCacheException, eInvalidServerResponse,
745 "No VALID field in READLAST output");
746 }
747
748 switch (exec_result.response[pos + sizeof("VALID=") - 1]) {
749 case 't': case 'T': case 'y': case 'Y':
750 *validity = ICache::eCurrent;
751 break;
752 case 'f': case 'F': case 'n': case 'N':
753 *validity = ICache::eExpired;
754 break;
755 default:
756 NCBI_THROW(CNetCacheException, eInvalidServerResponse,
757 "Invalid format of the VALID field in READLAST output");
758 }
759
760 if (max_age > 0)
761 *actual_age = x_ExtractBlobAge(exec_result, "READLAST");
762
763 return new CNetCacheReader(this, blob_id, exec_result,
764 blob_size_ptr, &m_DefaultParameters);
765 } catch (CNetCacheBlobTooOldException& e) {
766 if (actual_age != NULL)
767 *actual_age = e.GetAge();
768 *version = e.GetVersion();
769
770 return NULL;
771 }
772 catch (CNetCacheException& e) {
773 if (e.GetErrCode() != CNetCacheException::eBlobNotFound)
774 throw;
775 return NULL;
776 }
777 }
778
779 class CSetValidWarningSuppressor
780 {
781 public:
CSetValidWarningSuppressor(INetServerConnectionListener * listener,const string & key,const string & subkey,int version)782 CSetValidWarningSuppressor(
783 INetServerConnectionListener* listener,
784 const string& key,
785 const string& subkey,
786 int version) :
787 m_Listener(listener),
788 m_Key(key),
789 m_Subkey(subkey),
790 m_Version(version)
791 {
792 _ASSERT(m_Listener);
793
794 auto warning_handler = [&](const string& m, CNetServer s) {
795 return OnWarning(m, s);
796 };
797
798 m_Listener->SetWarningHandler(warning_handler);
799 }
800
~CSetValidWarningSuppressor()801 ~CSetValidWarningSuppressor()
802 {
803 m_Listener->SetWarningHandler(nullptr);
804 }
805
806 private:
807 bool OnWarning(const string& warn_msg, CNetServer server);
808
809 CRef<INetServerConnectionListener> m_Listener;
810 const string m_Key;
811 const string m_Subkey;
812 const int m_Version;
813 };
814
OnWarning(const string & warn_msg,CNetServer)815 bool CSetValidWarningSuppressor::OnWarning(const string& warn_msg, CNetServer)
816 {
817 SIZE_TYPE ver_pos = NStr::FindCase(warn_msg,
818 CTempString("VER=", sizeof("VER=") - 1));
819
820 if (ver_pos == NPOS)
821 return false;
822 else {
823 int version = atoi(warn_msg.c_str() + ver_pos + sizeof("VER=") - 1);
824 if (version < m_Version) {
825 ERR_POST("Cache actualization error (key \"" << m_Key <<
826 "\", subkey \"" << m_Subkey <<
827 "\"): the cached blob version downgraded from " <<
828 m_Version << " to " << version);
829 }
830 }
831
832 return true;
833 }
834
SetBlobVersionAsCurrent(const string & key,const string & subkey,int version)835 void CNetICacheClient::SetBlobVersionAsCurrent(const string& key,
836 const string& subkey, int version)
837 {
838 CSetValidWarningSuppressor warning_suppressor(m_Impl->m_Service->m_Listener, key, subkey, version);
839
840 CNetServer::SExecResult exec_result(
841 m_Impl->ChooseServerAndExec(
842 m_Impl->MakeStdCmd("SETVALID",
843 s_KeyVersionSubkeyToBlobID(key, version, subkey),
844 &m_Impl->m_DefaultParameters),
845 key,
846 false,
847 &m_Impl->m_DefaultParameters));
848
849 if (!exec_result.response.empty()) {
850 ERR_POST("SetBlobVersionAsCurrent(\"" << key << "\", " <<
851 version << ", \"" << subkey << "\"): " << exec_result.response);
852 }
853 }
854
GetBlobInfo(const string & key,int version,const string & subkey,const CNamedParameterList * optional)855 CNetServerMultilineCmdOutput CNetICacheClient::GetBlobInfo(const string& key,
856 int version, const string& subkey,
857 const CNamedParameterList* optional)
858 {
859 CNetCacheAPIParameters parameters(&m_Impl->m_DefaultParameters);
860
861 parameters.LoadNamedParameters(optional);
862
863 CNetServerMultilineCmdOutput output(
864 m_Impl->ChooseServerAndExec(
865 m_Impl->MakeStdCmd("GETMETA",
866 s_KeyVersionSubkeyToBlobID(key, version, subkey),
867 ¶meters),
868 key,
869 true,
870 ¶meters));
871
872 output->SetNetCacheCompatMode();
873
874 return output;
875 }
876
PrintBlobInfo(const string & key,int version,const string & subkey)877 void CNetICacheClient::PrintBlobInfo(const string& key,
878 int version, const string& subkey)
879 {
880 CNetServerMultilineCmdOutput output(GetBlobInfo(key, version, subkey));
881
882 string line;
883
884 if (output.ReadLine(line)) {
885 if (!NStr::StartsWith(line, "SIZE="))
886 NcbiCout << line << NcbiEndl;
887 while (output.ReadLine(line))
888 NcbiCout << line << NcbiEndl;
889 }
890 }
891
GetService()892 CNetService CNetICacheClient::GetService()
893 {
894 return m_Impl->m_Service;
895 }
896
/// Compose a standard ICache command:
///   IC(<cache name>) <cmd_base> <blob_id><injection><client attrs>
/// The cache name is printable-encoded; blob_id and injection are
/// appended as given.
string SNetICacheClientImpl::MakeStdCmd(const char* cmd_base,
    const string& blob_id, const CNetCacheAPIParameters* parameters,
    const string& injection)
{
    string cmd("IC(");
    cmd += NStr::PrintableString(parameters->GetCacheName());
    cmd += ") ";
    cmd += cmd_base;
    cmd += ' ';
    cmd += blob_id;
    cmd += injection;  // Appending an empty injection is a no-op.

    AppendClientIPSessionIDPasswordAgeHitID(&cmd, parameters);

    return cmd;
}
917
/// Compose a standard command for the blob identified by
/// key/version/subkey, execute it with single-line output, and return the
/// server response.
string SNetICacheClientImpl::ExecStdCmd(const char* cmd_base,
    const string& key, int version, const string& subkey,
    const CNetCacheAPIParameters* parameters)
{
    const string cmd(MakeStdCmd(cmd_base,
        s_KeyVersionSubkeyToBlobID(key, version, subkey),
        parameters));

    CNetServer::SExecResult exec_result(
        ChooseServerAndExec(cmd, key, false, parameters));

    return exec_result.response;
}
930
GetCacheName(void) const931 string CNetICacheClient::GetCacheName(void) const
932 {
933 return m_Impl->m_DefaultParameters.GetCacheName();
934 }
935
936
SameCacheParams(const TCacheParams *) const937 bool CNetICacheClient::SameCacheParams(const TCacheParams*) const
938 {
939 return false;
940 }
941
GetServer(CNetServer::TInstance server)942 CNetICacheClientExt CNetICacheClientExt::GetServer(CNetServer::TInstance server)
943 {
944 return new SNetICacheClientImpl(server->m_ServerInPool, m_Impl);
945 }
946
Search(CNetICacheClient::CExpression expression,CNetICacheClient::CFields filter)947 vector<CNetICacheClient::CBlobInfo> CNetICacheClient::Search(
948 CNetICacheClient::CExpression expression,
949 CNetICacheClient::CFields filter)
950 {
951 const auto parameters = &m_Impl->m_DefaultParameters;
952 const auto cache_name = NStr::PrintableString(parameters->GetCacheName());
953 string ids;
954 m_Impl->AppendClientIPSessionIDPasswordAgeHitID(&ids, parameters);
955 ostringstream oss;
956 oss << "IC(" << cache_name << ") BLIST2" << expression + filter << ids;
957
958 CNetServerMultilineCmdOutput output(
959 m_Impl->ChooseServerAndExec(
960 oss.str(),
961 kEmptyStr,
962 true,
963 &m_Impl->m_DefaultParameters));
964
965 output->SetNetCacheCompatMode();
966 string line;
967 vector<CNetICacheClient::CBlobInfo> result;
968
969 while (output.ReadLine(line) && !line.empty()) {
970 CBlobInfo blob_info;
971 blob_info << line;
972 result.push_back(blob_info);
973 }
974
975 return result;
976 }
977
ProlongBlobLifetime(const string & key,const string & subkey,const CTimeout & ttl,const CNamedParameterList * optional)978 void CNetICacheClientExt::ProlongBlobLifetime(const string& key, const string& subkey,
979 const CTimeout& ttl, const CNamedParameterList* optional)
980 {
981 CNetCacheAPIParameters parameters(&m_Impl->m_DefaultParameters);
982 parameters.LoadNamedParameters(optional);
983
984 string cmd("PROLONG \"");
985 cmd += NStr::PrintableString(parameters.GetCacheName());
986 cmd += "\" \"";
987 cmd += key;
988 cmd += "\" \"";
989 cmd += subkey;
990 cmd += "\" ttl=";
991 cmd += NStr::NumericToString((unsigned)ttl.GetAsDouble());
992
993 m_Impl->AppendClientIPSessionIDHitID(&cmd);
994 m_Impl->ChooseServerAndExec(cmd, key, false, &m_Impl->m_DefaultParameters);
995 }
996
GetNetCacheAPI()997 SNetCacheAPIImpl* CNetICacheClientExt::GetNetCacheAPI()
998 {
999 return new SNetCacheAPIImpl(m_Impl.GetObject());
1000 }
1001
GetNetCacheAPI() const1002 const SNetCacheAPIImpl* CNetICacheClientExt::GetNetCacheAPI() const
1003 {
1004 return new SNetCacheAPIImpl(m_Impl.GetObject());
1005 }
1006
1007 /// Class factory for NetCache implementation of ICache
1008 ///
1009 /// @internal
1010 ///
1011 class CNetICacheCF : public CICacheCF<CNetICacheClient>
1012 {
1013 public:
1014 typedef CICacheCF<CNetICacheClient> TParent;
1015
1016 public:
CNetICacheCF()1017 CNetICacheCF() : TParent(kNetICacheDriverName, 0)
1018 {
1019 }
1020
1021 private:
1022 virtual ICache* x_CreateInstance(
1023 const string& driver = kEmptyStr,
1024 CVersionInfo version = NCBI_INTERFACE_VERSION(ICache),
1025 const TPluginManagerParamTree* params = 0) const;
1026 };
1027
1028
x_CreateInstance(const string & driver,CVersionInfo version,const TPluginManagerParamTree * params) const1029 ICache* CNetICacheCF::x_CreateInstance(
1030 const string& driver,
1031 CVersionInfo version,
1032 const TPluginManagerParamTree* params) const
1033 {
1034 if ((!driver.empty() && driver != m_DriverName) ||
1035 version.Match(NCBI_INTERFACE_VERSION(ICache)) ==
1036 CVersionInfo::eNonCompatible)
1037 return 0;
1038
1039 if (!params)
1040 return new CNetICacheClient;
1041
1042 CConfig conf(params);
1043
1044 return new CNetICacheClient(&conf, driver);
1045 }
1046
1047
NCBI_EntryPoint_xcache_netcache(CPluginManager<ICache>::TDriverInfoList & info_list,CPluginManager<ICache>::EEntryPointRequest method)1048 void NCBI_EntryPoint_xcache_netcache(
1049 CPluginManager<ICache>::TDriverInfoList& info_list,
1050 CPluginManager<ICache>::EEntryPointRequest method)
1051 {
1052 CHostEntryPointImpl<CNetICacheCF>::NCBI_EntryPointImpl(info_list, method);
1053 }
1054
1055
Cache_RegisterDriver_NetCache(void)1056 void Cache_RegisterDriver_NetCache(void)
1057 {
1058 RegisterEntryPoint<ICache>( NCBI_EntryPoint_xcache_netcache );
1059 }
1060
1061 END_NCBI_SCOPE
1062