1 /*  $Id: nc_storage.cpp 593132 2019-09-12 15:33:23Z gouriano $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author: Pavel Ivanov
27  */
28 
29 #include "nc_pch.hpp"
30 
31 #include <corelib/ncbireg.hpp>
32 #include <corelib/ncbifile.hpp>
33 #include <corelib/request_ctx.hpp>
34 #include <corelib/ncbi_process.hpp>
35 
36 #include "netcached.hpp"
37 #include "nc_storage.hpp"
38 #include "storage_types.hpp"
39 #include "nc_db_files.hpp"
40 #include "distribution_conf.hpp"
41 #include "nc_storage_blob.hpp"
42 #include "sync_log.hpp"
43 #include "nc_stat.hpp"
44 #include "logging.hpp"
45 #include "peer_control.hpp"
46 
47 
48 #ifdef NCBI_OS_LINUX
49 # include <sys/types.h>
50 # include <sys/stat.h>
51 # include <fcntl.h>
52 # include <sys/mman.h>
53 #endif
54 
// When non-zero, compiles in the SAllCacheTable type and the s_AllCache map
// declared further down in this file.
#define __NC_CACHEDATA_ALL_MONITOR 0
// uses Boost intrusive rbtree to hold SNCCacheData (versus std::set)
#define __NC_CACHEDATA_INTR_SET 1
58 
59 BEGIN_NCBI_SCOPE
60 
61 
// Pieces of storage file names. s_GetFileName() and s_GetIndexFileName()
// build names as <prefix><suffix>[<file id>] with the extension below.
static const char* kNCStorage_DBFileExt       = ".db";
// Per-type suffixes are empty, so meta, data and maps files are named alike.
static const char* kNCStorage_MetaFileSuffix  = "";
static const char* kNCStorage_DataFileSuffix  = "";
static const char* kNCStorage_MapsFileSuffix  = "";
static const char* kNCStorage_IndexFileSuffix = ".index";
// Base name of the guard file that s_ReadStorageParams() uses when the
// "guard_file_name" parameter is empty.
static const char* kNCStorage_StartedFileName = "__ncbi_netcache_started__";

// Registry section and parameter names read by s_ReadStorageParams() and
// s_ReadVariableParams().
static const char* kNCStorage_RegSection        = "storage";
static const char* kNCStorage_PathParam         = "path";
static const char* kNCStorage_FilePrefixParam   = "prefix";
static const char* kNCStorage_GuardNameParam    = "guard_file_name";
static const char* kNCStorage_FileSizeParam     = "each_file_size";
static const char* kNCStorage_GarbagePctParam   = "max_garbage_pct";
static const char* kNCStorage_MinDBSizeParam    = "min_storage_size";
static const char* kNCStorage_MoveLifeParam     = "min_lifetime_to_move";
static const char* kNCStorage_FailedMoveParam   = "failed_move_delay";
static const char* kNCStorage_GCBatchParam      = "gc_batch_size";
static const char* kNCStorage_FlushTimeParam    = "sync_time_period";
static const char* kNCStorage_ExtraGCOnParam    = "db_limit_del_old_on";
static const char* kNCStorage_ExtraGCOffParam   = "db_limit_del_old_off";
static const char* kNCStorage_StopWriteOnParam  = "db_limit_stop_write_on";
static const char* kNCStorage_StopWriteOffParam = "db_limit_stop_write_off";
static const char* kNCStorage_MinFreeDiskParam  = "disk_free_limit";
static const char* kNCStorage_DiskCriticalParam = "critical_disk_free_limit";
static const char* kNCStorage_MinRecNoSaveParam = "min_rec_no_save_period";
static const char* kNCStorage_FailedWriteSize   = "failed_write_blob_key_count";
static const char* kNCStorage_MaxBlobSizeStore  = "max_blob_size_store";
static const char* kNCStorage_WbMemRelease      = "task_priority_wb_memrelease";


// storage file type signatures
// s_OpenIndexDB() compares the leading Uint8 of a mapped storage file with
// kMetaSignature and kDataSignature to tell the file type (the
// kMapsSignature check is presumably analogous -- TODO confirm).
static const Uint8 kMetaSignature = NCBI_CONST_UINT8(0xeed5be66cdafbfa3);
static const Uint8 kDataSignature = NCBI_CONST_UINT8(0xaf9bedf24cfa05ed);
static const Uint8 kMapsSignature = NCBI_CONST_UINT8(0xba6efd7b61fdff6c);
// Size in bytes of a signature value (sizeof of a Uint8 constant).
static const Uint1 kSignatureSize = sizeof(kMetaSignature);


/// Size of memory page that is a granularity of all allocations from OS.
static const size_t kMemPageSize  = 4 * 1024;
/// Mask that can move pointer address or memory size to the memory page
/// boundary.
/// s_MapFile() and s_UnmapFile() use it to round a length up to a whole
/// number of pages: (size + kMemPageSize - 1) & kMemPageAlignMask.
static const size_t kMemPageAlignMask = ~(kMemPageSize - 1);
104 
105 
/// Type of the s_IsStopWrite state variable, which starts as eNoStop.
/// NOTE(review): the meaning of each value is suggested only by its name
/// here -- confirm against the code that assigns s_IsStopWrite.
enum EStopCause {
    eNoStop,
    eStopWarning,
    eStopDBSize,
    eStopDiskSpace,
    eStopDiskCritical
};
113 
114 
115 #if __NC_CACHEDATA_INTR_SET
116 struct SCacheDeadCompare
117 {
operator ()SCacheDeadCompare118     bool operator() (const SNCCacheData& x, const SNCCacheData& y) const
119     {
120         return x.saved_dead_time < y.saved_dead_time;
121     }
122 };
123 
124 struct SCacheKeyCompare
125 {
operator ()SCacheKeyCompare126     bool operator() (const SNCCacheData& x, const SNCCacheData& y) const
127     {
128         return x.key < y.key;
129     }
operator ()SCacheKeyCompare130     bool operator() (const string& key, const SNCCacheData& y) const
131     {
132         return key < y.key;
133     }
operator ()SCacheKeyCompare134     bool operator() (const SNCCacheData& x, const string& key) const
135     {
136         return x.key < key;
137     }
138 };
139 
/// Intrusive red-black tree of SNCCacheData linked through TTimeTableHook
/// and ordered by SCacheDeadCompare (saved_dead_time). The element count
/// is not stored (constant_time_size<false>).
typedef intr::rbtree<SNCCacheData,
                     intr::base_hook<TTimeTableHook>,
                     intr::constant_time_size<false>,
                     intr::compare<SCacheDeadCompare> >     TTimeTableMap;
/// Intrusive red-black tree of SNCCacheData linked through TKeyMapHook
/// and ordered by SCacheKeyCompare (key). The element count is stored
/// (constant_time_size<true>).
typedef intr::rbtree<SNCCacheData,
                     intr::base_hook<TKeyMapHook>,
                     intr::constant_time_size<true>,
                     intr::compare<SCacheKeyCompare> >      TKeyMap;
148 #else  // __NC_CACHEDATA_INTR_SET
149 struct SCacheDeadCompare
150 {
operator ()SCacheDeadCompare151     bool operator() (const SNCCacheData* x, const SNCCacheData* y) const
152     {
153         return (x->saved_dead_time != y->saved_dead_time) ?
154             (x->saved_dead_time < y->saved_dead_time) : (x->key < y->key);
155     }
156 };
157 struct SCacheKeyCompare
158 {
operator ()SCacheKeyCompare159     bool operator() (const SNCCacheData* x, const SNCCacheData* y) const
160     {
161         return x->key < y->key;
162     }
163 };
/// std::set based counterparts of the intrusive containers: sets of
/// pointers ordered by SCacheDeadCompare and SCacheKeyCompare respectively.
typedef std::set<SNCCacheData*, SCacheDeadCompare>  TTimeTableMap;
typedef std::set<SNCCacheData*, SCacheKeyCompare>   TKeyMap;
166 
167 #endif  // __NC_CACHEDATA_INTR_SET
168 
/// Key cache of one bucket.
struct SBucketCache
{
    /// Taken by FindBlob() and GetBList() around every walk of key_map
    CMiniMutex   lock;
    /// Records of this bucket ordered by key (see SCacheKeyCompare)
    TKeyMap      key_map;
};
/// Bucket caches indexed by a Uint2 id (FindBlob() looks them up by bucket)
typedef map<Uint2, SBucketCache*> TBucketCacheMap;
175 
/// Container of cache records ordered by saved_dead_time.
struct STimeTable
{
    /// Presumably guards time_map; its use is not visible here -- TODO confirm
    CMiniMutex lock;
    /// Records ordered by SCacheDeadCompare
    TTimeTableMap time_map;
};
/// Time tables indexed by a Uint2 id
typedef map<Uint2, STimeTable*> TTimeBuckets;
182 
183 #if __NC_CACHEDATA_ALL_MONITOR
/// Set of SNCCacheData pointers together with a mutex; compiled only when
/// __NC_CACHEDATA_ALL_MONITOR is non-zero.
struct SAllCacheTable
{
    /// Presumably guards all_cache_set; its use is not visible here -- TODO confirm
    CMiniMutex lock;
    set<SNCCacheData *> all_cache_set;
};
/// Tables indexed by a Uint2 id
typedef map<Uint2, SAllCacheTable*> TAllCacheBuckets;
190 #endif
191 
192 #if __NC_CACHEDATA_MONITOR
193 static CMiniMutex s_AllCacheLock;
194 static set<SNCCacheData *> s_AllCacheSet;
x_Register(void)195 void SNCCacheData::x_Register(void)
196 {
197     s_AllCacheLock.Lock();
198     s_AllCacheSet.insert(this);
199     s_AllCacheLock.Unlock();
200 }
x_Revoke(void)201 void SNCCacheData::x_Revoke(void)
202 {
203     s_AllCacheLock.Lock();
204     s_AllCacheSet.erase(this);
205     s_AllCacheLock.Unlock();
206 }
207 #endif
208 
209 
/// Directory for all database files of the storage
static string s_Path;
/// Name of the storage
static string s_Prefix;
/// Number of blobs treated by GC and by caching mechanism in one batch
static int s_GCBatchSize   = 0;
/// Value of the "sync_time_period" parameter
static int s_FlushTimePeriod = 0;
/// Value of the "max_garbage_pct" parameter
static int s_MaxGarbagePct = 0;
/// Value of the "min_lifetime_to_move" parameter
static int s_MinMoveLife     = 0;
/// Value of the "failed_move_delay" parameter
static int s_FailedMoveDelay = 0;
/// Value of the "min_storage_size" parameter, in bytes
static Int8 s_MinDBSize     = 0;
/// Name of the guard file that prevents several instances from running on
/// the same database.
static string s_GuardName;
/// Lock for guard file representing this instance running.
/// It will hold exclusive lock all the time while NetCache is
/// working, so that other instance of NetCache on the same database will
/// be unable to start.
static auto_ptr<CFileLock> s_GuardLock;

/// manages access to s_IndexDB
static CMiniMutex s_IndexLock;
/// Index database file
static auto_ptr<CNCDBIndexFile> s_IndexDB;

/// Mutex to work with s_DBFiles
static CMiniMutex s_DBFilesLock;
/// List of all database parts in the storage
static TNCDBFilesMap* s_DBFiles = nullptr;

static Uint4 s_LastFileId = 0;

/// manages access to s_AllWritings
static CMiniMutex s_NextWriteLock;
/// File types that have an entry in s_AllWritings
static ENCDBFileType const s_AllFileTypes[]
                    = {eDBFileMeta, eDBFileData, eDBFileMaps
                       /*, eDBFileMoveMeta, eDBFileMoveData, eDBFileMoveMaps*/};
/// Number of elements in s_AllFileTypes
static size_t const s_CntAllFiles = sizeof(s_AllFileTypes) / sizeof(s_AllFileTypes[0]);
/// One SWritingInfo per element of s_AllFileTypes
static SWritingInfo s_AllWritings[s_CntAllFiles];

/// Value of the "each_file_size" parameter, narrowed to Uint4 when read
static Uint4 s_NewFileSize = 0;
static TTimeBuckets s_TimeTables;
static Int8 s_CurBlobsCnt = 0;
static Int8 s_CurKeysCnt = 0;
static bool s_Draining = false;
/// When set, s_UnlockInstanceGuard() leaves the guard file on disk
static bool s_AbandonDB = false;
/// Current size of storage database. Kept here for printing statistics.
// NOTE(review): volatile alone does not make access to these atomic --
// confirm how concurrent updates are synchronized.
volatile static Int8 s_CurDBSize = 0;
volatile static Int8 s_GarbageSize = 0;
static int s_LastFlushTime = 0;
/// Internal cache of blobs identification information sorted to be able
/// to search by key, subkey and version.
static TBucketCacheMap s_BucketsCache;

#if __NC_CACHEDATA_ALL_MONITOR
static TAllCacheBuckets s_AllCache;
#endif

static EStopCause s_IsStopWrite = eNoStop;
/// Set by s_LockInstanceGuard(): true when the guard file was absent or
/// empty at startup
static bool s_CleanStart = false;
static bool s_NeedSaveLogRecNo = false;
static bool s_NeedSavePurgeData = false;
/// Value of "db_limit_percentage_alert"
static int s_WarnLimitOnPct = 0;
/// s_WarnLimitOnPct minus "db_limit_percentage_alert_delta"
static int s_WarnLimitOffPct = 0;
/// Value of the "min_rec_no_save_period" parameter
static int s_MinRecNoSavePeriod = 0;
static int s_LastRecNoSaveTime = 0;
static CAtomicCounter s_BlobCounter;
/// Values of "db_limit_del_old_on" / "db_limit_del_old_off", in bytes
static Int8 s_ExtraGCOnSize = 0;
static Int8 s_ExtraGCOffSize = 0;
/// Values of "db_limit_stop_write_on" / "db_limit_stop_write_off", in bytes
static Int8 s_StopWriteOnSize = 0;
static Int8 s_StopWriteOffSize = 0;
/// Value of "disk_free_limit", in bytes
static Int8 s_DiskFreeLimit = 0;
/// Value of "critical_disk_free_limit", in bytes
static Int8 s_DiskCritical = 0;
/// Value of "max_blob_size_store", capped at kNCLargestBlobSize
static Uint8 s_MaxBlobSizeStore = 0;
static CNewFileCreator* s_NewFileCreator = nullptr;
static CDiskFlusher* s_DiskFlusher = nullptr;
static CRecNoSaver* s_RecNoSaver = nullptr;
static CSpaceShrinker* s_SpaceShrinker = nullptr;
static CExpiredCleaner* s_ExpiredCleaner = nullptr;
/// Value of "task_priority_wb_memrelease". Unlike its neighbours this
/// variable is not static -- presumably it is referenced from another
/// translation unit (TODO confirm).
Uint4 s_TaskPriorityWbMemRelease = 10;
290 
291 
292 
/// Logs a Fatal "Database is corrupted." message extended with @a msg
/// (which is streamed, so it may itself be a chain of << operands) and
/// then calls abort(). The do/while(0) wrapper makes the macro usable as
/// a single statement.
#define DB_CORRUPTED(msg)                                                   \
    do {                                                                    \
        SRV_LOG(Fatal, "Database is corrupted. " << msg                     \
                << " Bug, faulty disk or somebody writes to database?");    \
        abort();                                                            \
    } while (0)                                                             \
/**/
300 
FindBlob(Uint2 bucket,const string & mask,Uint8 cr_time_hi)301 string CNCBlobStorage::FindBlob(Uint2 bucket, const string& mask, Uint8 cr_time_hi)
302 {
303     string found;
304     Uint8 cur_time = CSrvTime::Current().Sec();
305     TBucketCacheMap::const_iterator bkt = s_BucketsCache.find(bucket);
306     if (bkt != s_BucketsCache.end()) {
307         SNCCacheData search_mask;
308         search_mask.key = mask;
309 
310         SBucketCache* cache = bkt->second;
311         cache->lock.Lock();
312 #if __NC_CACHEDATA_INTR_SET
313         TKeyMap::iterator lb = cache->key_map.lower_bound(search_mask);
314         for ( ; lb != cache->key_map.end(); ++lb) {
315             if (strncmp(search_mask.key.data(), lb->key.data(), search_mask.key.size())== 0) {
316                 if (lb->expire <= cur_time) {
317                     continue;
318                 }
319                 if (lb->create_time <= cr_time_hi) {
320                     found = lb->key;
321                     break;
322                 }
323             } else {
324                 break;
325             }
326         }
327 #else
328         TKeyMap::iterator lb = cache->key_map.lower_bound(&search_mask);
329         for ( ; lb != cache->key_map.end(); ++lb) {
330             if (strncmp(search_mask.key.data(), (*lb)->key.data(), search_mask.key.size())== 0) {
331                 if ((*lb)->expire <= cur_time) {
332                     continue;
333                 }
334                 if ((*lb)->create_time <= cr_time_hi) {
335                     found = (*lb)->key;
336                     break;
337                 }
338             } else {
339                 break;
340             }
341         }
342 #endif
343         cache->lock.Unlock();
344     }
345     return found;
346 }
347 
348 void
GetBList(const string & mask,auto_ptr<TNCBufferType> & buffer,SNCBlobFilter * filters,const string & sep)349 CNCBlobStorage::GetBList(const string& mask, auto_ptr<TNCBufferType>& buffer, SNCBlobFilter* filters, const string& sep)
350 {
351     if (mask.empty()) {
352         return;
353     }
354     SNCCacheData search_mask;
355     const char* mb = mask.data();
356     const char* me = mask.data() + mask.size() - 1;
357     bool show_exp = false;
358     while (*mb == '\"' || *mb == '\'' || *mb == '*') {
359         show_exp = true;
360         ++mb;
361     }
362     while (me > mb && *me == '\1' && *(me-1) == '\1') {
363         --me;
364     }
365     if (me == mb && *me == '\1') {
366         --me;
367     }
368     search_mask.key.assign(mb, me+1-mb);
369 
370     Uint8 cur_time = CSrvTime::Current().Sec();
371 
372     Uint8 cr_time_lo = ((filters->cr_ago_lt   != 0) ? (cur_time - filters->cr_ago_lt)   : filters->cr_epoch_ge) * kUSecsPerSecond;
373     Uint8 cr_time_hi = ((filters->cr_ago_ge   != 0) ? (cur_time - filters->cr_ago_ge)   : filters->cr_epoch_lt) * kUSecsPerSecond;
374     int    expire_lo = ((filters->exp_now_ge  != 0) ? (cur_time + filters->exp_now_ge)  : filters->exp_epoch_ge);
375     int    expire_hi = ((filters->exp_now_lt  != 0) ? (cur_time + filters->exp_now_lt)  : filters->exp_epoch_lt);
376     int   vexpire_lo = ((filters->vexp_now_ge != 0) ? (cur_time + filters->vexp_now_ge) : filters->vexp_epoch_ge);
377     int   vexpire_hi = ((filters->vexp_now_lt != 0) ? (cur_time + filters->vexp_now_lt) : filters->vexp_epoch_lt);
378     Uint8    size_lo = filters->size_ge;
379     Uint8    size_hi = filters->size_lt;
380     Uint8 create_server = filters->cr_srv;
381     bool extra =  cr_time_lo != 0 || cr_time_hi != 0 ||
382                    expire_lo != 0 ||  expire_hi != 0 ||
383                   vexpire_lo != 0 || vexpire_hi != 0 ||
384                      size_lo != 0 ||    size_hi != 0 || create_server != 0;
385 
386     ITERATE( TBucketCacheMap, bkt, s_BucketsCache) {
387         SBucketCache* cache = bkt->second;
388         cache->lock.Lock();
389 #if __NC_CACHEDATA_INTR_SET
390         TKeyMap::iterator lb = cache->key_map.lower_bound(search_mask);
391         for ( ; lb != cache->key_map.end(); ++lb) {
392 //            SNCCacheData& d = *lb;
393             if (strncmp(search_mask.key.data(), lb->key.data(), search_mask.key.size())== 0) {
394                 if (!show_exp && lb->expire <= cur_time) {
395                     continue;
396                 }
397                 if (!extra || (
398                     (lb->create_time >= cr_time_lo && (cr_time_hi == 0 || lb->create_time < cr_time_hi)) &&
399                     (lb->expire      >=  expire_lo && ( expire_hi == 0 || lb->expire      <  expire_hi)) &&
400                     (lb->ver_expire  >= vexpire_lo && (vexpire_hi == 0 || lb->ver_expire  < vexpire_hi)) &&
401                     (lb->size        >=    size_lo && (   size_hi == 0 || lb->size        <    size_hi)) &&
402                     (create_server == 0 || lb->create_server == create_server))) {
403 
404                     string bkey( NStr::Replace(lb->key,"\1",sep));
405                     buffer->append(bkey.data(), bkey.size());
406                     if (extra) {
407                         if  (cr_time_lo != 0 || cr_time_hi != 0) {
408                             buffer->WriteText(sep).WriteText("cr_time=").WriteNumber(lb->create_time/kUSecsPerSecond);
409                         }
410                         if  (expire_lo != 0 || expire_hi != 0) {
411                             buffer->WriteText(sep).WriteText("exp=").WriteNumber(lb->expire);
412                         }
413                         if  (vexpire_lo != 0 || vexpire_hi != 0) {
414                             buffer->WriteText(sep).WriteText("ver_dead=").WriteNumber(lb->ver_expire);
415                         }
416                         if  (create_server != 0) {
417                             buffer->WriteText(sep).WriteText("cr_srv=").WriteNumber(lb->create_server);
418                         }
419                         if  (size_lo != 0 || size_hi != 0) {
420                             buffer->WriteText(sep).WriteText("size=").WriteNumber(lb->size);
421                         }
422                     }
423                     buffer->append("\n",1);
424                 }
425                 continue;
426             }
427             break;
428         }
429 #else
430         TKeyMap::iterator lb = cache->key_map.lower_bound(&search_mask);
431         for ( ; lb != cache->key_map.end(); ++lb) {
432             if (strncmp(search_mask.key.data(), (*lb)->key.data(), search_mask.key.size())== 0) {
433                 if (!show_exp && (*lb)->expire <= cur_time) {
434                     continue;
435                 }
436                 if (!extra || (
437                     ((*lb)->create_time >= cr_time_lo && (cr_time_hi == 0 || (*lb)->create_time <= cr_time_hi)) &&
438                     ((*lb)->expire      >=  expire_lo && ( expire_hi == 0 || (*lb)->expire      <=  expire_hi)) &&
439                     ((*lb)->ver_expire  >= vexpire_lo && (vexpire_hi == 0 || (*lb)->ver_expire  <= vexpire_hi)) &&
440                     ((*lb)->size        >=    size_lo && (   size_hi == 0 || (*lb)->size        <=    size_hi)) &&
441                     (create_server == 0 || (*lb)->create_server == create_server))) {
442 
443                     string bkey( NStr::Replace((*lb)->key,"\1",","));
444                     buffer->append(bkey.data(), bkey.size());
445                     if (extra) {
446                         if  (cr_time_lo != 0 || cr_time_hi != 0) {
447                             buffer->WriteText(",cr_time=").WriteNumber((*lb)->create_time/kUSecsPerSecond);
448                         }
449                         if  (expire_lo != 0 || expire_hi != 0) {
450                             buffer->WriteText(",exp=").WriteNumber((*lb)->expire);
451                         }
452                         if  (vexpire_lo != 0 || vexpire_hi != 0) {
453                             buffer->WriteText(",ver_dead=").WriteNumber((*lb)->ver_expire);
454                         }
455                         if  (create_server != 0) {
456                             buffer->WriteText(",cr_srv=").WriteNumber((*lb)->create_server);
457                         }
458                         if  (size_lo != 0 || size_hi != 0) {
459                             buffer->WriteText(",size=").WriteNumber((*lb)->size);
460                         }
461                     }
462                     buffer->append("\n",1);
463                 }
464                 continue;
465             }
466             break;
467         }
468 #endif
469         cache->lock.Unlock();
470     }
471 
472 }
473 
474 
475 static inline char*
s_MapFile(TFileHandle fd,size_t file_size)476 s_MapFile(TFileHandle fd, size_t file_size)
477 {
478     char* mem_ptr = NULL;
479 #ifdef NCBI_OS_LINUX
480     file_size = (file_size + kMemPageSize - 1) & kMemPageAlignMask;
481     mem_ptr = (char*)mmap(NULL, file_size, PROT_READ | PROT_WRITE,
482                           MAP_SHARED, fd, 0);
483     if (mem_ptr == MAP_FAILED) {
484         SRV_LOG(Critical, "Cannot map file into memory, errno=" << errno);
485         return NULL;
486     }
487 #endif
488     return mem_ptr;
489 }
490 
/// Releases a mapping created by s_MapFile(). Does nothing outside Linux.
///
/// @param mem_ptr    start of the mapping
/// @param file_size  file length in bytes; rounded up to whole pages the
///                   same way s_MapFile() does
static inline void
s_UnmapFile(char* mem_ptr, size_t file_size)
{
#ifdef NCBI_OS_LINUX
    const size_t map_len = (file_size + kMemPageSize - 1) & kMemPageAlignMask;
    munmap(mem_ptr, map_len);
#endif
}
499 
/// Placeholder for pinning mapped file memory in RAM.
///
/// The mlock() based implementation is compiled out, so the call currently
/// has no effect.
///
/// @param mem_ptr   start of the memory region
/// @param mem_size  size of the region in bytes
static inline void
s_LockFileMem(const void* mem_ptr, size_t mem_size)
{
// mlock is not allowed on NCBI servers (limit 32 KB)
#if 0
//#ifdef NCBI_OS_LINUX
    if (mlock(mem_ptr, mem_size) != 0) {
        SRV_LOG(Critical, "mlock finished with error, errno=" << errno);
    }
#endif
}
512 
513 
514 
515 static bool
s_EnsureDirExist(const string & dir_name)516 s_EnsureDirExist(const string& dir_name)
517 {
518     CDir dir(dir_name);
519     if (!dir.Exists()) {
520         return dir.Create();
521     }
522     return true;
523 }
524 
525 /// Read from registry only those parameters that can be changed on the
526 /// fly, without re-starting the application.
527 static bool
s_ReadVariableParams(const CNcbiRegistry & reg)528 s_ReadVariableParams(const CNcbiRegistry& reg)
529 {
530     s_GCBatchSize = Uint2(reg.GetInt(kNCStorage_RegSection, kNCStorage_GCBatchParam, 500));
531 
532     string str = reg.GetString(kNCStorage_RegSection, kNCStorage_FileSizeParam, "100 MB");
533     s_NewFileSize = Uint4(NStr::StringToUInt8_DataSize(str));
534     s_MaxGarbagePct = reg.GetInt(kNCStorage_RegSection, kNCStorage_GarbagePctParam, 20);
535     str = reg.GetString(kNCStorage_RegSection, kNCStorage_MinDBSizeParam, "1 GB");
536     s_MinDBSize = NStr::StringToUInt8_DataSize(str);
537     s_MinMoveLife = reg.GetInt(kNCStorage_RegSection, kNCStorage_MoveLifeParam, 1000);
538     s_FailedMoveDelay = reg.GetInt(kNCStorage_RegSection, kNCStorage_FailedMoveParam, 10);
539 
540     s_MinRecNoSavePeriod = reg.GetInt(kNCStorage_RegSection, kNCStorage_MinRecNoSaveParam, 30);
541     s_FlushTimePeriod = reg.GetInt(kNCStorage_RegSection, kNCStorage_FlushTimeParam, 0);
542 
543     s_ExtraGCOnSize  = NStr::StringToUInt8_DataSize(reg.GetString(
544                        kNCStorage_RegSection, kNCStorage_ExtraGCOnParam, "0"));
545     s_ExtraGCOffSize = NStr::StringToUInt8_DataSize(reg.GetString(
546                        kNCStorage_RegSection, kNCStorage_ExtraGCOffParam, "0"));
547     s_StopWriteOnSize  = NStr::StringToUInt8_DataSize(reg.GetString(
548                        kNCStorage_RegSection, kNCStorage_StopWriteOnParam, "0"));
549     s_StopWriteOffSize = NStr::StringToUInt8_DataSize(reg.GetString(
550                        kNCStorage_RegSection, kNCStorage_StopWriteOffParam, "0"));
551     s_DiskFreeLimit  = NStr::StringToUInt8_DataSize(reg.GetString(
552                        kNCStorage_RegSection, kNCStorage_MinFreeDiskParam, "5 GB"));
553     s_DiskCritical   = NStr::StringToUInt8_DataSize(reg.GetString(
554                        kNCStorage_RegSection, kNCStorage_DiskCriticalParam, "1 GB"));
555     s_MaxBlobSizeStore = NStr::StringToUInt8_DataSize(reg.GetString(
556                        kNCStorage_RegSection, kNCStorage_MaxBlobSizeStore, "1 GB"));
557     if (s_MaxBlobSizeStore > kNCLargestBlobSize) {
558         SRV_LOG(Error, "Parameter " << kNCStorage_MaxBlobSizeStore << " is too large."
559                        << " Changing it to " << kNCLargestBlobSize);
560         s_MaxBlobSizeStore = kNCLargestBlobSize;
561     }
562 
563     int warn_pct = reg.GetInt(kNCStorage_RegSection, "db_limit_percentage_alert", 65);
564     if (warn_pct <= 0  ||  warn_pct >= 100) {
565         SRV_LOG(Error, "Parameter db_limit_percentage_alert has wrong value "
566                        << warn_pct << ". Assuming it's 65.");
567         warn_pct = 65;
568     }
569     s_WarnLimitOnPct = warn_pct;
570     warn_pct = reg.GetInt(kNCStorage_RegSection, "db_limit_percentage_alert_delta", 5);
571     if (warn_pct <= 0  ||  warn_pct >= s_WarnLimitOnPct) {
572         SRV_LOG(Error, "Parameter db_limit_percentage_alert_delta has wrong value "
573                        << warn_pct << ". Assuming it's 5.");
574         warn_pct = 5;
575     }
576     s_WarnLimitOffPct = s_WarnLimitOnPct - warn_pct;
577 
578     SetWBSoftSizeLimit(NStr::StringToUInt8_DataSize(reg.GetString(
579                        kNCStorage_RegSection, "write_back_soft_size_limit", "3 GB")));
580     SetWBHardSizeLimit(NStr::StringToUInt8_DataSize(reg.GetString(
581                        kNCStorage_RegSection, "write_back_hard_size_limit", "4 GB")));
582 
583     int to2 = reg.GetInt(kNCStorage_RegSection, "write_back_timeout", 1000);
584     int to1 = reg.GetInt(kNCStorage_RegSection, "write_back_timeout_startup", to2);
585     SetWBWriteTimeout( CNCServer::IsInitiallySynced() ? to2 : to1, to2);
586     SetWBFailedWriteDelay(reg.GetInt(kNCStorage_RegSection, "write_back_failed_delay", 2));
587     s_TaskPriorityWbMemRelease = reg.GetInt(kNCStorage_RegSection, kNCStorage_WbMemRelease, 10);
588 
589     int failed_write = reg.GetInt(kNCStorage_RegSection, kNCStorage_FailedWriteSize, 0);
590     CNCBlobAccessor::SetFailedWriteCount((Uint4)failed_write);
591     return true;
592 }
593 
594 /// Read all storage parameters from registry
595 static bool
s_ReadStorageParams(void)596 s_ReadStorageParams(void)
597 {
598     const CNcbiRegistry& reg = CTaskServer::GetConfRegistry();
599 
600     s_Path = reg.GetString(kNCStorage_RegSection, kNCStorage_PathParam, "./cache");
601     s_Prefix = reg.GetString(kNCStorage_RegSection, kNCStorage_FilePrefixParam, "nccache");
602     if (s_Path.empty()  ||  s_Prefix.empty()) {
603         SRV_LOG(Critical, "Incorrect parameters for " << kNCStorage_PathParam
604                           << " and " << kNCStorage_FilePrefixParam
605                           << " in the section '" << kNCStorage_RegSection << "': '"
606                           << s_Path << "' and '" << s_Prefix << "'");
607         return false;
608     }
609     if (!s_EnsureDirExist(s_Path)) {
610         SRV_LOG(Critical, "Cannot create directory " << s_Path);
611         return false;
612     }
613     s_GuardName = reg.Get(kNCStorage_RegSection, kNCStorage_GuardNameParam);
614     if (s_GuardName.empty()) {
615         s_GuardName = CDirEntry::MakePath(s_Path,
616                                           kNCStorage_StartedFileName,
617                                           s_Prefix);
618     }
619     try {
620         return s_ReadVariableParams(reg);
621     }
622     catch (CStringException& ex) {
623         SRV_LOG(Critical, "Bad configuration: " << ex);
624         return false;
625     }
626 }
627 
628 /// Make name of the index file in the storage
629 static string
s_GetIndexFileName(void)630 s_GetIndexFileName(void)
631 {
632     string file_name(s_Prefix);
633     file_name += kNCStorage_IndexFileSuffix;
634     file_name = CDirEntry::MakePath(s_Path, file_name, kNCStorage_DBFileExt);
635     return CDirEntry::CreateAbsolutePath(file_name);
636 }
637 
638 /// Make name of file with meta-information in given database part
639 static string
s_GetFileName(Uint4 file_id,ENCDBFileType file_type)640 s_GetFileName(Uint4 file_id, ENCDBFileType file_type)
641 {
642     string file_name(s_Prefix);
643     switch (file_type) {
644     case eDBFileMeta:
645         file_name += kNCStorage_MetaFileSuffix;
646         break;
647     case eDBFileData:
648         file_name += kNCStorage_DataFileSuffix;
649         break;
650     case eDBFileMaps:
651         file_name += kNCStorage_MapsFileSuffix;
652         break;
653     default:
654         SRV_FATAL("Unsupported file type: " << file_type);
655     }
656     file_name += NStr::UIntToString(file_id);
657     file_name = CDirEntry::MakePath(s_Path, file_name, kNCStorage_DBFileExt);
658     return CDirEntry::CreateAbsolutePath(file_name);
659 }
660 
661 /// Make sure that current storage database is used only with one instance
662 /// of NetCache. Lock specially named file exclusively and hold the lock
663 /// for all time while process is working. This file also is used for
664 /// determining if previous instance of NetCache was closed gracefully.
665 /// If another instance of NetCache using the database method will throw
666 /// an exception.
667 ///
668 /// @return
669 ///   TRUE if guard file existed and was unlocked meaning that previous
670 ///   instance of NetCache was terminated inappropriately. FALSE if file
671 ///   didn't exist so that this storage instance is a clean start.
672 static bool
s_LockInstanceGuard(void)673 s_LockInstanceGuard(void)
674 {
675     s_CleanStart = !CFile(s_GuardName).Exists();
676 
677     try {
678         if (s_CleanStart) {
679             // Just create the file with read and write permissions
680             CFileWriter tmp_writer(s_GuardName, CFileWriter::eOpenAlways);
681             CFile guard_file(s_GuardName);
682             CFile::TMode user_mode, grp_mode, oth_mode;
683 
684             guard_file.GetMode(&user_mode, &grp_mode, &oth_mode);
685             user_mode |= CFile::fRead;
686             guard_file.SetMode(user_mode, grp_mode, oth_mode);
687         }
688 
689         s_GuardLock.reset(new CFileLock(s_GuardName,
690                                         CFileLock::fDefault,
691                                         CFileLock::eExclusive));
692         if (!s_CleanStart  &&  CFile(s_GuardName).GetLength() == 0)
693             s_CleanStart = true;
694         if (!s_CleanStart) {
695             CNCAlerts::Register(CNCAlerts::eStartAfterCrash,
696                 "InstanceGuard file " + s_GuardName + " was present on startup");
697             INFO("NetCache wasn't finished cleanly in previous run. "
698                  "Will try to work with storage as is.");
699         }
700 
701         CFileIO writer;
702         writer.SetFileHandle(s_GuardLock->GetFileHandle());
703         writer.SetFileSize(0, CFileIO::eBegin);
704         string pid(NStr::UInt8ToString(CCurrentProcess::GetPid()));
705         writer.Write(pid.c_str(), pid.size());
706     }
707     catch (CFileErrnoException& ex) {
708         SRV_LOG(Critical, "Can't lock the database (other instance is running?): " << ex);
709         return false;
710     }
711     return true;
712 }
713 
714 /// Unlock and delete specially named file used to ensure only one
715 /// instance working with database.
716 /// Small race exists here now which cannot be resolved for Windows
717 /// without re-writing of the mechanism using low-level functions. Race
718 /// can appear like this: if another instance will start right when this
719 /// instance stops then other instance can be thinking that this instance
720 /// terminated incorrectly.
721 static void
s_UnlockInstanceGuard(void)722 s_UnlockInstanceGuard(void)
723 {
724     if (s_GuardLock.get()) {
725         s_GuardLock.reset();
726         if (!s_AbandonDB) {
727             CFile(s_GuardName).Remove();
728         }
729     }
730 }
731 
/// Open (creating if needed) the index database and load the list of
/// storage files from it into s_DBFiles.
///
/// Up to two attempts are made: if any step throws CSQLITE_Exception the
/// index object is dropped, the index file is removed from disk and the
/// whole procedure is repeated once more.
///
/// On Linux every listed storage file is additionally opened, mapped into
/// memory and classified by the signature stored in its first Uint8.
/// Files that cannot be opened, sized, mapped or recognized are removed
/// from both the index database and s_DBFiles.
///
/// @return
///   true if the index was opened and processed, false if both attempts
///   failed.
static bool
s_OpenIndexDB(void)
{
    string index_name = s_GetIndexFileName();
    // Second iteration runs only after the index file was deleted below.
    for (int i = 0; i < 2; ++i) {
        try {
            s_IndexDB.reset(new CNCDBIndexFile(index_name));
            s_IndexDB->CreateDatabase();
            s_IndexDB->GetAllDBFiles(s_DBFiles);
            // ERASE_ITERATE is presumably the NCBI macro that allows
            // erasing the current element while iterating - TODO confirm.
            ERASE_ITERATE(TNCDBFilesMap, it, (*s_DBFiles)) {
                CSrvRef<SNCDBFileInfo> info = it->second;
                Int8 file_size = -1;
#ifdef NCBI_OS_LINUX
                info->fd = open(info->file_name.c_str(), O_RDWR | O_NOATIME);
                if (info->fd == -1) {
                    SRV_LOG(Critical, "Cannot open storage file, errno=" << errno);
// Shared cleanup path: every later failure for this file jumps here.
delete_file:
                    try {
                        s_IndexDB->DeleteDBFile(info->file_id);
                    }
                    catch (CSQLITE_Exception& ex) {
                        // Failure to clean the index is logged only; the
                        // file is still dropped from the in-memory map.
                        SRV_LOG(Critical, "Error cleaning index file: " << ex);
                    }
                    s_DBFiles->erase(it);
                    continue;
                }
                file_size = CFile(info->file_name).GetLength();
                if (file_size == -1) {
                    SRV_LOG(Critical, "Cannot read file size (errno=" << errno
                                      << "). File " << info->file_name
                                      << " will be deleted.");
                    goto delete_file;
                }
                // NOTE(review): the size is narrowed to Uint4 here; a file
                // larger than 4 GB would be mapped with a truncated size.
                info->file_size = Uint4(file_size);
                info->file_map = s_MapFile(info->fd, info->file_size);
                if (!info->file_map)
                    goto delete_file;
                // The first Uint8 of the mapping identifies the file kind.
                if (*(Uint8*)info->file_map == kMetaSignature) {
                    info->file_type = eDBFileMeta;
                    info->type_index = eFileIndexMeta;
                }
                else if (*(Uint8*)info->file_map == kDataSignature) {
                    info->file_type = eDBFileData;
                    info->type_index = eFileIndexData;
                }
                else if (*(Uint8*)info->file_map == kMapsSignature) {
                    info->file_type = eDBFileMaps;
                    info->type_index = eFileIndexMaps;
                }
                else {
                    SRV_LOG(Critical, "Unknown file signature: " << *(Uint8*)info->file_map);
                    goto delete_file;
                }
                // index_head is the last SFileIndexRec-sized slot of the file.
                // NOTE(review): no check that the file is at least
                // signature + one index record long - confirm this cannot
                // happen for files listed in the index.
                info->index_head = (SFileIndexRec*)(info->file_map + file_size);
                --info->index_head;
#endif
            }
            return true;
        }
        catch (CSQLITE_Exception& ex) {
            // Index is unusable: discard it together with its file and retry.
            s_IndexDB.reset();
            SRV_LOG(Error, "Index file is broken, reinitializing storage. " << ex);
            CFile(index_name).Remove();
        }
    }
    SRV_LOG(Critical, "Cannot open or create index file for the storage.");
    return false;
}
801 
802 /// Reinitialize database cleaning all data from it.
803 /// Only database is cleaned, internal cache is left intact.
804 static void
s_CleanDatabase(void)805 s_CleanDatabase(void)
806 {
807     CNCAlerts::Register(CNCAlerts::eStorageReinit, "Data storage was reinitialized");
808     INFO("Reinitializing storage " << s_Prefix << " at " << s_Path);
809 
810     s_DBFiles->clear();
811     s_IndexDB->DeleteAllDBFiles();
812 }
813 
814 /// Check if database need re-initialization depending on different
815 /// parameters and state of the guard file protecting from several
816 /// instances running on the same database.
817 static bool
s_ReinitializeStorage(void)818 s_ReinitializeStorage(void)
819 {
820     try {
821         s_CleanDatabase();
822         return true;
823     }
824     catch (CSQLITE_Exception& ex) {
825         SRV_LOG(Error, "Error in soft reinitialization, trying harder. " << ex);
826         s_IndexDB.reset();
827         CFile(s_GetIndexFileName()).Remove();
828         return s_OpenIndexDB();
829     }
830 }
831 
832 static inline bool
s_IsIndexDeleted(SNCDBFileInfo * file_info,SFileIndexRec * ind_rec)833 s_IsIndexDeleted(SNCDBFileInfo* file_info, SFileIndexRec* ind_rec)
834 {
835     Uint4 rec_num = Uint4(file_info->index_head - ind_rec);
836     return ind_rec->next_num == rec_num  ||  ind_rec->prev_num == rec_num;
837 }
838 
839 static SFileIndexRec*
s_GetIndOrDeleted(SNCDBFileInfo * file_info,Uint4 rec_num)840 s_GetIndOrDeleted(SNCDBFileInfo* file_info, Uint4 rec_num)
841 {
842     SFileIndexRec* ind_rec = file_info->index_head - rec_num;
843     char* min_ptr = file_info->file_map + kSignatureSize;
844     if ((char*)ind_rec < min_ptr  ||  ind_rec >= file_info->index_head) {
845         DB_CORRUPTED("Bad record number requested, rec_num=" << rec_num
846                      << " in file " << file_info->file_name
847                      << ". It produces pointer " << (void*)ind_rec
848                      << " which is not in the range between " << (void*)min_ptr
849                      << " and " << (void*)file_info->index_head << ".");
850     }
851     Uint4 next_num = ACCESS_ONCE(ind_rec->next_num);
852     if ((next_num != 0  &&  next_num < rec_num)  ||  ind_rec->prev_num > rec_num)
853     {
854         DB_CORRUPTED("Index record " << rec_num
855                      << " in file " << file_info->file_name
856                      << " contains bad next_num " << next_num
857                      << " and/or prev_num " << ind_rec->prev_num << ".");
858     }
859     return ind_rec;
860 }
861 
862 static SFileIndexRec*
s_GetIndexRec(SNCDBFileInfo * file_info,Uint4 rec_num)863 s_GetIndexRec(SNCDBFileInfo* file_info, Uint4 rec_num)
864 {
865     SFileIndexRec* ind_rec = s_GetIndOrDeleted(file_info, rec_num);
866     if (s_IsIndexDeleted(file_info, ind_rec)) {
867         DB_CORRUPTED("Index record " << rec_num
868                      << " in file " << file_info->file_name
869                      << " has been deleted.");
870     }
871     return ind_rec;
872 }
873 
874 static SFileIndexRec*
s_GetIndexRecTry(SNCDBFileInfo * file_info,Uint4 rec_num)875 s_GetIndexRecTry(SNCDBFileInfo* file_info, Uint4 rec_num)
876 {
877     SFileIndexRec* ind_rec = s_GetIndOrDeleted(file_info, rec_num);
878     if (s_IsIndexDeleted(file_info, ind_rec)) {
879         return nullptr;
880     }
881     return ind_rec;
882 }
883 
884 static void
s_DeleteIndexRecNoLock(SNCDBFileInfo * file_info,SFileIndexRec * ind_rec)885 s_DeleteIndexRecNoLock(SNCDBFileInfo* file_info, SFileIndexRec* ind_rec)
886 {
887     SFileIndexRec* prev_rec;
888     SFileIndexRec* next_rec;
889     if (ind_rec->prev_num == 0) {
890         prev_rec = file_info->index_head;
891     } else {
892         prev_rec = s_GetIndexRec(file_info, ind_rec->prev_num);
893     }
894     if (ind_rec->next_num == 0) {
895         next_rec = file_info->index_head;
896     } else {
897         next_rec = s_GetIndexRec(file_info, ind_rec->next_num);
898     }
899     // These should be in the exactly this order to prevent unrecoverable
900     // corruption.
901     ACCESS_ONCE(prev_rec->next_num) = ind_rec->next_num;
902     ACCESS_ONCE(next_rec->prev_num) = ind_rec->prev_num;
903     ind_rec->next_num = ind_rec->prev_num = Uint4(file_info->index_head - ind_rec);
904 }
905 
906 static inline void
s_DeleteIndexRec(SNCDBFileInfo * file_info,SFileIndexRec * ind_rec)907 s_DeleteIndexRec(SNCDBFileInfo* file_info, SFileIndexRec* ind_rec)
908 {
909     file_info->info_lock.Lock();
910     s_DeleteIndexRecNoLock(file_info, ind_rec);
911     file_info->info_lock.Unlock();
912 }
913 
914 static void
s_MoveRecToGarbageNoLock(SNCDBFileInfo * file_info,SFileIndexRec * ind_rec)915 s_MoveRecToGarbageNoLock(SNCDBFileInfo* file_info, SFileIndexRec* ind_rec)
916 {
917     Uint4 size = ind_rec->rec_size;
918     if (size & 7)
919         size += 8 - (size & 7);
920     size += sizeof(SFileIndexRec);
921 
922     s_DeleteIndexRecNoLock(file_info, ind_rec);
923     if (file_info->used_size < size) {
924         file_info->used_size = 0;
925     } else {
926         file_info->used_size -= size;
927     }
928     file_info->garb_size += size;
929     if (file_info->garb_size > file_info->file_size) {
930         file_info->garb_size = file_info->file_size;
931     }
932     if (file_info->index_head->next_num == 0  &&  file_info->used_size != 0) {
933         SRV_FATAL("DB file info broken");
934     }
935 
936     AtomicAdd(s_GarbageSize,size);
937 }
938 
939 static void
s_MoveRecToGarbage(SNCDBFileInfo * file_info,SFileIndexRec * ind_rec)940 s_MoveRecToGarbage(SNCDBFileInfo* file_info, SFileIndexRec* ind_rec)
941 {
942     file_info->info_lock.Lock();
943     s_MoveRecToGarbageNoLock(file_info, ind_rec);
944     file_info->info_lock.Unlock();
945 }
946 
947 
948 static CSrvRef<SNCDBFileInfo>
s_GetDBFileNoLock(Uint4 file_id)949 s_GetDBFileNoLock(Uint4 file_id)
950 {
951     TNCDBFilesMap::const_iterator it = s_DBFiles->find(file_id);
952     if (it == s_DBFiles->end()) {
953         DB_CORRUPTED("Cannot find file id: " << file_id);
954     }
955     return it->second;
956 }
957 
958 static CSrvRef<SNCDBFileInfo>
s_GetDBFile(Uint4 file_id)959 s_GetDBFile(Uint4 file_id)
960 {
961     s_DBFilesLock.Lock();
962     TNCDBFilesMap::const_iterator it = s_DBFiles->find(file_id);
963     if (it == s_DBFiles->end()) {
964         DB_CORRUPTED("Cannot find file id: " << file_id);
965     }
966     CSrvRef<SNCDBFileInfo> file_info = it->second;
967     s_DBFilesLock.Unlock();
968     return file_info;
969 }
970 
971 static CSrvRef<SNCDBFileInfo>
s_GetDBFileTry(Uint4 file_id)972 s_GetDBFileTry(Uint4 file_id)
973 {
974     s_DBFilesLock.Lock();
975     TNCDBFilesMap::const_iterator it = s_DBFiles->find(file_id);
976     CSrvRef<SNCDBFileInfo> file_info = (it != s_DBFiles->end()) ? it->second : CSrvRef<SNCDBFileInfo>(nullptr);
977     s_DBFilesLock.Unlock();
978 #ifdef _DEBUG
979     if (file_info.IsNull()) {
980         CNCAlerts::Register(CNCAlerts::eDebugDbFileNotFound, NStr::NumericToString(file_id));
981     }
982 #endif
983     return file_info;
984 }
985 
986 static inline Uint1
s_CalcMapDepthImpl(Uint8 size,Uint4 chunk_size,Uint2 map_size)987 s_CalcMapDepthImpl(Uint8 size, Uint4 chunk_size, Uint2 map_size)
988 {
989     Uint1 map_depth = 0;
990     Uint8 cnt_chunks = (size + chunk_size - 1) / chunk_size;
991     while (cnt_chunks > 1  &&  map_depth <= kNCMaxBlobMapsDepth) {
992         ++map_depth;
993         cnt_chunks = (cnt_chunks + map_size - 1) / map_size;
994     }
995     return map_depth;
996 }
997 
998 static Uint1
s_CalcMapDepth(Uint8 size,Uint4 chunk_size,Uint2 map_size)999 s_CalcMapDepth(Uint8 size, Uint4 chunk_size, Uint2 map_size)
1000 {
1001     Uint1 map_depth = s_CalcMapDepthImpl(size, chunk_size, map_size);
1002     if (map_depth > kNCMaxBlobMapsDepth) {
1003         DB_CORRUPTED("Size parameters are resulted in bad map_depth"
1004                      << ", size=" << size
1005                      << ", chunk_size=" << chunk_size
1006                      << ", map_size=" << map_size);
1007     }
1008     return map_depth;
1009 }
1010 
1011 static inline Uint2
s_CalcCntMapDowns(Uint4 rec_size)1012 s_CalcCntMapDowns(Uint4 rec_size)
1013 {
1014     SFileChunkMapRec map_rec;
1015     return Uint2((SNCDataCoord*)((char*)&map_rec + rec_size) - map_rec.down_coords);
1016 }
1017 
1018 static inline Uint4
s_CalcChunkDataSize(Uint4 rec_size)1019 s_CalcChunkDataSize(Uint4 rec_size)
1020 {
1021     SFileChunkDataRec data_rec;
1022     return Uint4((Uint1*)((char*)&data_rec + rec_size) - data_rec.chunk_data);
1023 }
1024 
1025 static inline Uint4
s_CalcMetaRecSize(Uint2 key_size)1026 s_CalcMetaRecSize(Uint2 key_size)
1027 {
1028     SFileMetaRec meta_rec;
1029     return Uint4((char*)&meta_rec.key_data[key_size] - (char*)&meta_rec);
1030 }
1031 
1032 static inline Uint4
s_CalcMapRecSize(Uint2 cnt_downs)1033 s_CalcMapRecSize(Uint2 cnt_downs)
1034 {
1035     SFileChunkMapRec map_rec;
1036     return Uint4((char*)&map_rec.down_coords[cnt_downs] - (char*)&map_rec);
1037 }
1038 
1039 static inline Uint4
s_CalcChunkRecSize(size_t data_size)1040 s_CalcChunkRecSize(size_t data_size)
1041 {
1042     SFileChunkDataRec data_rec;
1043     return Uint4((char*)&data_rec.chunk_data[data_size] - (char*)&data_rec);
1044 }
1045 
1046 static char*
s_CalcRecordAddress(SNCDBFileInfo * file_info,SFileIndexRec * ind_rec)1047 s_CalcRecordAddress(SNCDBFileInfo* file_info, SFileIndexRec* ind_rec)
1048 {
1049     char* rec_ptr = file_info->file_map + ind_rec->offset;
1050     char* rec_end = rec_ptr + ind_rec->rec_size;
1051     char* min_ptr = file_info->file_map + kSignatureSize;
1052     if (rec_ptr < min_ptr  ||  rec_ptr >= (char*)ind_rec
1053         ||  rec_end < rec_ptr  ||  rec_end > (char*)ind_rec
1054         ||  (file_info->index_head - ind_rec == 1  &&  (char*)rec_ptr != min_ptr))
1055     {
1056         DB_CORRUPTED("Index record " << (file_info->index_head - ind_rec)
1057                      << " in file " << file_info->file_name
1058                      << " has wrong offset " << ind_rec->offset
1059                      << " and/or size " << ind_rec->rec_size
1060                      << ". It results in address " << (void*)rec_ptr
1061                      << " that doesn't fit between " << (void*)min_ptr
1062                      << " and " << (void*)ind_rec << ".");
1063     }
1064     return rec_ptr;
1065 }
1066 
1067 static SFileMetaRec*
s_CalcMetaAddress(SNCDBFileInfo * file_info,SFileIndexRec * ind_rec)1068 s_CalcMetaAddress(SNCDBFileInfo* file_info, SFileIndexRec* ind_rec)
1069 {
1070     if (ind_rec->rec_type != eFileRecMeta) {
1071         DB_CORRUPTED("Index record " << (file_info->index_head - ind_rec)
1072                      << " in file " << file_info->file_name
1073                      << " has wrong type " << int(ind_rec->rec_type)
1074                      << " when should be " << int(eFileRecMeta) << ".");
1075     }
1076     SFileMetaRec* meta_rec = (SFileMetaRec*)s_CalcRecordAddress(file_info, ind_rec);
1077     Uint4 min_size = sizeof(SFileMetaRec) + 1;
1078     if (meta_rec->has_password)
1079         min_size += 16;
1080     if (ind_rec->rec_size < min_size) {
1081         DB_CORRUPTED("Index record " << (file_info->index_head - ind_rec)
1082                      << " in file " << file_info->file_name
1083                      << " has wrong rec_size " << ind_rec->rec_size
1084                      << " for meta record."
1085                      << " It's less than minimum " << min_size << ".");
1086     }
1087     return meta_rec;
1088 }
1089 
1090 static SFileChunkMapRec*
s_CalcMapAddress(SNCDBFileInfo * file_info,SFileIndexRec * ind_rec)1091 s_CalcMapAddress(SNCDBFileInfo* file_info, SFileIndexRec* ind_rec)
1092 {
1093     if (ind_rec->rec_type != eFileRecChunkMap) {
1094         DB_CORRUPTED("Index record " << (file_info->index_head - ind_rec)
1095                      << " in file " << file_info->file_name
1096                      << " has wrong type " << int(ind_rec->rec_type)
1097                      << " when should be " << int(eFileRecChunkMap) << ".");
1098     }
1099     char* rec_ptr = s_CalcRecordAddress(file_info, ind_rec);
1100     Uint4 min_size = sizeof(SFileChunkMapRec);
1101     if (ind_rec->rec_size < min_size) {
1102         DB_CORRUPTED("Index record " << (file_info->index_head - ind_rec)
1103                      << " in file " << file_info->file_name
1104                      << " has wrong rec_size " << ind_rec->rec_size
1105                      << " for map record."
1106                      << " It's less than minimum " << min_size << ".");
1107     }
1108     return (SFileChunkMapRec*)rec_ptr;
1109 }
1110 
1111 static SFileChunkDataRec*
s_CalcChunkAddress(SNCDBFileInfo * file_info,SFileIndexRec * ind_rec)1112 s_CalcChunkAddress(SNCDBFileInfo* file_info, SFileIndexRec* ind_rec)
1113 {
1114     if (ind_rec->rec_type != eFileRecChunkData) {
1115         DB_CORRUPTED("Index record " << (file_info->index_head - ind_rec)
1116                      << " in file " << file_info->file_name
1117                      << " has wrong type " << int(ind_rec->rec_type)
1118                      << " when should be " << int(eFileRecChunkData) << ".");
1119     }
1120     char* rec_ptr = s_CalcRecordAddress(file_info, ind_rec);
1121     Uint4 min_size = sizeof(SFileChunkDataRec);
1122     if (ind_rec->rec_size < min_size) {
1123         DB_CORRUPTED("Index record " << (file_info->index_head - ind_rec)
1124                      << " in file " << file_info->file_name
1125                      << " has wrong rec_size " << ind_rec->rec_size
1126                      << " for data record."
1127                      << " It's less than minimum " << min_size << ".");
1128     }
1129     return (SFileChunkDataRec*)rec_ptr;
1130 }
1131 
/// Create and initialize a new storage file of the given type and publish
/// it as the next file to write into for that type.
///
/// Steps: pick the next file id, create/size/map the file (Linux only),
/// set up an empty index list at the end of the file, register the file in
/// the index database, write the type signature at the start of the file,
/// add the file to s_DBFiles and store it in
/// s_AllWritings[type_idx].next_file.
///
/// @param type_idx
///   Index into s_AllFileTypes / s_AllWritings selecting the file type.
/// @param need_lock
///   When true, s_DBFilesLock is taken around the update of s_DBFiles.
/// @return
///   true on success; false if the file could not be created, sized,
///   mapped or registered in the index database.
static bool
s_CreateNewFile(size_t type_idx, bool need_lock)
{
    // No mutex is taken around s_LastFileId here: per the original note it
    // is changed only in this function, which is executed from one thread
    // only (the note named "m_LastBlob", presumably an older name).
    // Ids wrap around to 1 after kNCMaxDBFileId.
    Uint4 file_id;
    if (s_LastFileId == kNCMaxDBFileId)
        file_id = 1;
    else
        file_id = s_LastFileId + 1;
    //size_t true_type_idx = type_idx - eFileIndexMoveShift;
    size_t true_type_idx = type_idx;
    string file_name = s_GetFileName(file_id, s_AllFileTypes[true_type_idx]);
    SNCDBFileInfo* file_info = new SNCDBFileInfo();
    file_info->file_id      = file_id;
    file_info->file_name    = file_name;
    file_info->create_time  = CSrvTime::CurSecs();
    file_info->file_size    = s_NewFileSize;
    file_info->file_type    = s_AllFileTypes[true_type_idx];
    file_info->type_index   = EDBFileIndex(true_type_idx);

#ifdef NCBI_OS_LINUX
    // On every failure below file_info is deleted; the SNCDBFileInfo
    // destructor in this file unmaps, closes and unlinks the file.
    file_info->fd = open(file_name.c_str(),
                         O_RDWR | O_CREAT | O_TRUNC | O_NOATIME,
                         S_IRUSR | S_IWUSR);
    if (file_info->fd == -1) {
        SRV_LOG(Critical, "Cannot create new storage file, errno=" << errno);
        delete file_info;
        return false;
    }
    // Extend the freshly created file to its full size before mapping it.
    int trunc_res = ftruncate(file_info->fd, s_NewFileSize);
    if (trunc_res != 0) {
        SRV_LOG(Critical, "Cannot truncate new file, errno=" << errno);
        delete file_info;
        return false;
    }
    file_info->file_map = s_MapFile(file_info->fd, s_NewFileSize);
    if (!file_info->file_map) {
        delete file_info;
        return false;
    }
#endif

    // NOTE(review): outside NCBI_OS_LINUX file_map is still NULL at this
    // point (set by the constructor), so the code below would dereference
    // an invalid pointer - confirm non-Linux builds are not supported.
    // The head of the index list occupies the last SFileIndexRec-sized slot
    // of the file; zero links denote an empty list.
    file_info->index_head = (SFileIndexRec*)(file_info->file_map + file_info->file_size);
    --file_info->index_head;
    file_info->index_head->next_num = file_info->index_head->prev_num = 0;

    s_IndexLock.Lock();
    try {
        s_IndexDB->NewDBFile(file_id, file_name);
    }
    catch (CSQLITE_Exception& ex) {
        s_IndexLock.Unlock();
        SRV_LOG(Critical, "Error while adding new storage file: " << ex);
        delete file_info;
        return false;
    }
    s_IndexLock.Unlock();

    // The id is considered used only after the index database accepted it.
    s_LastFileId = file_id;
    // Stamp the file with the signature matching its type.
    switch (true_type_idx) {
    case eFileIndexMeta:
        *(Uint8*)file_info->file_map = kMetaSignature;
        break;
    case eFileIndexData:
        *(Uint8*)file_info->file_map = kDataSignature;
        break;
    case eFileIndexMaps:
        *(Uint8*)file_info->file_map = kMapsSignature;
        break;
    default:
        SRV_FATAL("Unsupported file type: " << true_type_idx);
    }

    if (need_lock) {
        s_DBFilesLock.Lock();
    }
    // The raw pointer is handed over to the map here; presumably the map
    // holds CSrvRef values which take over the ownership - TODO confirm.
    (*s_DBFiles)[file_id] = file_info;
    // s_NextWriteLock is taken inside s_DBFilesLock (when need_lock is set).
    s_NextWriteLock.Lock();
    s_AllWritings[type_idx].next_file = file_info;
    s_NextWriteLock.Unlock();
    if (need_lock) {
        s_DBFilesLock.Unlock();
    }
    // A new file accounts for its signature and the index head record.
    AtomicAdd(s_CurDBSize, kSignatureSize + sizeof(SFileIndexRec));

    return true;
}
1222 
1223 static void
s_DeleteDBFile(const CSrvRef<SNCDBFileInfo> & file_info,bool need_lock)1224 s_DeleteDBFile(const CSrvRef<SNCDBFileInfo>& file_info, bool need_lock)
1225 {
1226     s_IndexLock.Lock();
1227     try {
1228         s_IndexDB->DeleteDBFile(file_info->file_id);
1229     }
1230     catch (CSQLITE_Exception& ex) {
1231         SRV_LOG(Critical, "Index database does not delete rows: " << ex);
1232     }
1233     s_IndexLock.Unlock();
1234 
1235     if (need_lock) {
1236         s_DBFilesLock.Lock();
1237     }
1238     SRV_LOG(Info, "Deleted file id: " << file_info->file_id);
1239     s_DBFiles->erase(file_info->file_id);
1240     if (need_lock) {
1241         s_DBFilesLock.Unlock();
1242     }
1243 
1244     AtomicSub(s_CurDBSize,   file_info->file_size);
1245     AtomicSub(s_GarbageSize, file_info->garb_size);
1246 
1247 }
1248 
SNCDBFileInfo(void)1249 SNCDBFileInfo::SNCDBFileInfo(void)
1250     : file_map(NULL),
1251       file_id(0),
1252       file_size(0),
1253       garb_size(0),
1254       used_size(0),
1255       index_head(NULL),
1256       is_releasing(false),
1257       fd(0),
1258       create_time(0),
1259       next_shrink_time(0)
1260 {
1261     cnt_unfinished.Set(0);
1262 }
1263 
~SNCDBFileInfo(void)1264 SNCDBFileInfo::~SNCDBFileInfo(void)
1265 {
1266     if (file_map)
1267         s_UnmapFile(file_map, file_size);
1268 #ifdef NCBI_OS_LINUX
1269     if (fd  &&  fd != -1  &&  close(fd)) {
1270         SRV_LOG(Critical, "Error closing file " << file_name
1271                           << ", errno=" << errno);
1272     }
1273     if (unlink(file_name.c_str())) {
1274 // if file not found, it is not error
1275         if (errno != ENOENT) {
1276             SRV_LOG(Critical, "Error deleting file " << file_name
1277                               << ", errno=" << errno);
1278         }
1279     }
1280 #endif
1281 }
1282 
/// Bring the blob storage up.
///
/// Sequence: reset the per-type writing slots, allocate the map of storage
/// files, read storage parameters, take the instance guard, open the index
/// database, optionally reinitialize the storage, allocate per-bucket
/// tables, determine the last used file id and create the background
/// objects.
///
/// @param do_reinit
///   Request to wipe the storage. It is forced to true when s_CleanStart
///   is false after the instance guard has been taken.
/// @return
///   false if reading parameters, locking the instance guard, opening the
///   index database or reinitializing the storage failed; true otherwise.
bool
CNCBlobStorage::Initialize(bool do_reinit)
{
    for (size_t i = 0; i < s_CntAllFiles; ++i) {
        s_AllWritings[i].cur_file = s_AllWritings[i].next_file = NULL;
    }
    s_DBFiles = new TNCDBFilesMap();

    if (!s_ReadStorageParams())
        return false;

    if (!s_LockInstanceGuard())
        return false;
    // A start that is not clean always leads to reinitialization.
    if (!s_CleanStart) {
        do_reinit = true;
    }
    // NOTE(review): the failure returns below leave the instance guard
    // locked; presumably the caller is expected to call Finalize() - confirm.
    if (!s_OpenIndexDB())
        return false;
    if (do_reinit) {
        if (!s_ReinitializeStorage())
            return false;
        s_CleanStart = false;
    }

    // Per-bucket structures are created for bucket numbers
    // 1..GetCntTimeBuckets() inclusive.
    for (Uint2 i = 1; i <= CNCDistributionConf::GetCntTimeBuckets(); ++i) {
        s_BucketsCache[i] = new SBucketCache();
        s_TimeTables[i] = new STimeTable();
#if __NC_CACHEDATA_ALL_MONITOR
        s_AllCache[i] = new  SAllCacheTable();
#endif
    }

    s_BlobCounter.Set(0);
    // s_LastFileId becomes the id of the file with the greatest
    // create_time; because of ">=" the later map entry wins on ties.
    if (!s_DBFiles->empty()) {
        int max_create_time = 0;
        ITERATE(TNCDBFilesMap, it, (*s_DBFiles)) {
            CSrvRef<SNCDBFileInfo> file_info = it->second;
            if (file_info->create_time >= max_create_time) {
                max_create_time = file_info->create_time;
                s_LastFileId = file_info->file_id;
            }
        }
    }

    // Helper objects are created here; what each of them does is defined
    // elsewhere. Only the blob cacher is explicitly made runnable.
    s_NewFileCreator = new CNewFileCreator();
    s_DiskFlusher = new CDiskFlusher();
    s_RecNoSaver = new CRecNoSaver();
    s_SpaceShrinker = new CSpaceShrinker();
    s_ExpiredCleaner = new CExpiredCleaner();
    CBlobCacher* cacher = new CBlobCacher();
    cacher->SetRunnable();

    return true;
}
1337 
1338 void
Finalize(void)1339 CNCBlobStorage::Finalize(void)
1340 {
1341     s_IndexDB.reset();
1342 
1343     s_UnlockInstanceGuard();
1344 }
1345 
ReConfig(const CNcbiRegistry & new_reg,string &)1346 bool CNCBlobStorage::ReConfig(const CNcbiRegistry& new_reg, string& /*err_message*/)
1347 {
1348     return s_ReadVariableParams(new_reg);
1349 }
1350 
WriteSetup(CSrvSocketTask & task)1351 void CNCBlobStorage::WriteSetup(CSrvSocketTask& task)
1352 {
1353     string is("\": "),iss("\": \""), eol(",\n\""), str("_str"), eos("\"");
1354     task.WriteText(eol).WriteText(kNCStorage_PathParam        ).WriteText(iss).WriteText(   s_Path).WriteText(eos);
1355     task.WriteText(eol).WriteText(kNCStorage_FilePrefixParam  ).WriteText(iss).WriteText(   s_Prefix).WriteText(eos);
1356     task.WriteText(eol).WriteText(kNCStorage_GuardNameParam   ).WriteText(iss).WriteText(   s_GuardName).WriteText(eos);
1357     task.WriteText(eol).WriteText(kNCStorage_GCBatchParam     ).WriteText(is ).WriteNumber( s_GCBatchSize);
1358     task.WriteText(eol).WriteText(kNCStorage_FileSizeParam    ).WriteText(str).WriteText(iss)
1359                                                    .WriteText(NStr::UInt8ToString_DataSize( s_NewFileSize)).WriteText(eos);
1360     task.WriteText(eol).WriteText(kNCStorage_FileSizeParam    ).WriteText(is ).WriteNumber( s_NewFileSize);
1361     task.WriteText(eol).WriteText(kNCStorage_GarbagePctParam  ).WriteText(is ).WriteNumber( s_MaxGarbagePct);
1362     task.WriteText(eol).WriteText(kNCStorage_MinDBSizeParam   ).WriteText(str).WriteText(iss)
1363                                                    .WriteText(NStr::UInt8ToString_DataSize( s_MinDBSize)).WriteText(eos);
1364     task.WriteText(eol).WriteText(kNCStorage_MinDBSizeParam   ).WriteText(is ).WriteNumber( s_MinDBSize);
1365     task.WriteText(eol).WriteText(kNCStorage_MoveLifeParam    ).WriteText(is ).WriteNumber( s_MinMoveLife);
1366     task.WriteText(eol).WriteText(kNCStorage_FailedMoveParam  ).WriteText(is ).WriteNumber( s_FailedMoveDelay);
1367     task.WriteText(eol).WriteText(kNCStorage_MinRecNoSaveParam).WriteText(is ).WriteNumber( s_MinRecNoSavePeriod);
1368     task.WriteText(eol).WriteText(kNCStorage_FlushTimeParam   ).WriteText(is ).WriteNumber( s_FlushTimePeriod);
1369     task.WriteText(eol).WriteText(kNCStorage_ExtraGCOnParam   ).WriteText(is ).WriteNumber( s_ExtraGCOnSize);
1370     task.WriteText(eol).WriteText(kNCStorage_ExtraGCOffParam  ).WriteText(is ).WriteNumber( s_ExtraGCOffSize);
1371     task.WriteText(eol).WriteText(kNCStorage_StopWriteOnParam ).WriteText(is ).WriteNumber( s_StopWriteOnSize);
1372     task.WriteText(eol).WriteText(kNCStorage_StopWriteOffParam).WriteText(is ).WriteNumber( s_StopWriteOffSize);
1373     task.WriteText(eol).WriteText(kNCStorage_MinFreeDiskParam ).WriteText(str).WriteText(iss)
1374                                                    .WriteText(NStr::UInt8ToString_DataSize( s_DiskFreeLimit)).WriteText(eos);
1375     task.WriteText(eol).WriteText(kNCStorage_MinFreeDiskParam ).WriteText(is ).WriteNumber( s_DiskFreeLimit);
1376     task.WriteText(eol).WriteText(kNCStorage_DiskCriticalParam).WriteText(str).WriteText(iss)
1377                                                    .WriteText(NStr::UInt8ToString_DataSize( s_DiskCritical)).WriteText(eos);
1378     task.WriteText(eol).WriteText(kNCStorage_DiskCriticalParam).WriteText(is ).WriteNumber( s_DiskCritical);
1379     task.WriteText(eol).WriteText(kNCStorage_MaxBlobSizeStore).WriteText(str).WriteText(iss)
1380                                                    .WriteText(NStr::UInt8ToString_DataSize( s_MaxBlobSizeStore)).WriteText(eos);
1381     task.WriteText(eol).WriteText(kNCStorage_MaxBlobSizeStore).WriteText(is ).WriteNumber( s_MaxBlobSizeStore);
1382     task.WriteText(eol).WriteText("db_limit_percentage_alert" ).WriteText(is ).WriteNumber( s_WarnLimitOnPct);
1383     task.WriteText(eol).WriteText("db_limit_percentage_alert_delta").WriteText(is).WriteNumber(s_WarnLimitOffPct);
1384     task.WriteText(eol).WriteText("write_back_soft_size_limit").WriteText(str).WriteText(iss)
1385                                                    .WriteText(NStr::UInt8ToString_DataSize( GetWBSoftSizeLimit())).WriteText(eos);
1386     task.WriteText(eol).WriteText("write_back_soft_size_limit").WriteText(is ).WriteNumber( GetWBSoftSizeLimit());
1387     task.WriteText(eol).WriteText("write_back_hard_size_limit").WriteText(str).WriteText(iss)
1388                                                    .WriteText(NStr::UInt8ToString_DataSize( GetWBHardSizeLimit())).WriteText(eos);
1389     task.WriteText(eol).WriteText("write_back_hard_size_limit").WriteText(is ).WriteNumber( GetWBHardSizeLimit());
1390     task.WriteText(eol).WriteText("write_back_timeout"        ).WriteText(is ).WriteNumber( GetWBWriteTimeout());
1391     task.WriteText(eol).WriteText("write_back_failed_delay"   ).WriteText(is ).WriteNumber( GetWBFailedWriteDelay());
1392     task.WriteText(eol).WriteText(kNCStorage_WbMemRelease).WriteText(is).WriteNumber(s_TaskPriorityWbMemRelease);
1393     task.WriteText(eol).WriteText(kNCStorage_FailedWriteSize  ).WriteText(is ).WriteNumber( CNCBlobAccessor::GetFailedWriteCount());
1394 }
1395 
WriteEnvInfo(CSrvSocketTask & task)1396 void CNCBlobStorage::WriteEnvInfo(CSrvSocketTask& task)
1397 {
1398     string is("\": "),iss("\": \""), eol(",\n\""), str("_str"), eos("\"");
1399     task.WriteText(eol).WriteText("storagepath"  ).WriteText(iss).WriteText(   s_Path).WriteText(eos);
1400     task.WriteText(eol).WriteText(kNCStorage_GuardNameParam   ).WriteText(iss).WriteText(   s_GuardName).WriteText(eos);
1401     task.WriteText(eol).WriteText("DBindex"  ).WriteText(iss).WriteText(   s_GetIndexFileName()).WriteText(eos);
1402     task.WriteText(eol).WriteText("DBfiles_count").WriteText( is).WriteNumber( CNCBlobStorage::GetNDBFiles());
1403     task.WriteText(eol).WriteText("DBsize"       ).WriteText(iss).WriteText(NStr::UInt8ToString_DataSize(s_CurDBSize)).WriteText(eos);
1404     task.WriteText(eol).WriteText("DBgarbage"    ).WriteText(iss).WriteText(NStr::UInt8ToString_DataSize(s_GarbageSize)).WriteText(eos);
1405     task.WriteText(eol).WriteText("diskfreespace").WriteText(iss).WriteText(NStr::UInt8ToString_DataSize(GetDiskFree())).WriteText(eos);
1406     task.WriteText(eol).WriteText("IsStopWrite").WriteText( is).WriteNumber( (int)s_IsStopWrite);
1407 }
1408 
WriteBlobStat(CSrvSocketTask & task)1409 void CNCBlobStorage::WriteBlobStat(CSrvSocketTask& task)
1410 {
1411     int expire = 0;
1412     Uint8 size = 0;
1413     Uint8 cache_count = 0, timetable_count = 0;
1414     map<Uint4, Uint8> blob_per_file;
1415 
1416     ITERATE( TBucketCacheMap, bkt, s_BucketsCache) {
1417         SBucketCache* cache = bkt->second;
1418         cache->lock.Lock();
1419         ITERATE(TKeyMap, it, cache->key_map) {
1420             ++cache_count;
1421 #if __NC_CACHEDATA_INTR_SET
1422             size += it->size;
1423             expire = max( expire, it->dead_time);
1424             ++blob_per_file[it->coord.file_id];
1425 #else
1426             size += (*it)->size;
1427             expire = max( expire, (*it)->dead_time);
1428             ++blob_per_file[(*it)->coord.file_id];
1429 #endif
1430         }
1431         cache->lock.Unlock();
1432     }
1433     ITERATE(TTimeBuckets, tt, s_TimeTables) {
1434         STimeTable* table = tt->second;
1435         table->lock.Lock();
1436         timetable_count += table->time_map.size();
1437         table->lock.Unlock();
1438     }
1439 
1440     string is("\": "),iss("\": \""), eol(",\n\""), str("_str"), eos("\"");
1441     task.WriteText(eol).WriteText("CurBlobsCnt").WriteText( is).WriteNumber( s_CurBlobsCnt);
1442     task.WriteText(eol).WriteText("CurKeysCnt").WriteText( is).WriteNumber( s_CurKeysCnt);
1443     task.WriteText(eol).WriteText("Size").WriteText(iss).WriteText(NStr::UInt8ToString_DataSize(size)).WriteText(eos);
1444     task.WriteText(eol).WriteText("InCache_count").WriteText( is).WriteNumber( cache_count);
1445     task.WriteText(eol).WriteText("TimeTable_count").WriteText( is).WriteNumber( timetable_count);
1446     task.WriteText(eol).WriteText("Purge_count").WriteText( is).WriteNumber( CNCBlobAccessor::GetPurgeCount());
1447 
1448 #if __NC_CACHEDATA_ALL_MONITOR
1449     size_t ncaches_count = 0;
1450     ITERATE(TAllCacheBuckets, tt, s_AllCache) {
1451         SAllCacheTable* table = tt->second;
1452         table->lock.Lock();
1453         ncaches_count += table->all_cache_set.size();
1454         table->lock.Unlock();
1455     }
1456     task.WriteText(eol).WriteText("caches_count").WriteText( is).WriteNumber( ncaches_count);
1457 #endif
1458 #if __NC_CACHEDATA_MONITOR
1459     s_AllCacheLock.Lock();
1460     size_t all_caches_count = s_AllCacheSet.size();
1461     s_AllCacheLock.Unlock();
1462     task.WriteText(eol).WriteText("all_caches_count").WriteText( is).WriteNumber( all_caches_count);
1463 #endif
1464 
1465     char buf[50];
1466     CSrvTime( expire).Print( buf, CSrvTime::eFmtJson);
1467     task.WriteText(eol).WriteText("expiration"  ).WriteText(is).WriteText(buf);
1468     map<Uint4, Uint8>::const_iterator b =  blob_per_file.begin();
1469     for ( ; b != blob_per_file.end(); ++b) {
1470         task.WriteText(eol);
1471         CSrvRef<SNCDBFileInfo> file_info = s_GetDBFileTry(b->first);
1472         if (file_info.IsNull()) {
1473             if (b->first != 0) {
1474                 task.WriteText("Unknown_").WriteNumber(b->first);
1475             } else {
1476                 task.WriteText("InMemory");
1477             }
1478         } else {
1479             task.WriteText(file_info->file_name);
1480         }
1481         task.WriteText( is).WriteNumber( b->second);
1482     }
1483 }
1484 
WriteBlobList(TNCBufferType & sendBuff,const CTempString & mask)1485 void CNCBlobStorage::WriteBlobList(TNCBufferType& sendBuff, const CTempString& mask)
1486 {
1487     string eol("\n{\""), more(",");
1488 
1489     sendBuff.reserve_mem(s_CurBlobsCnt * 250);
1490     sendBuff.append(",\n\"", 3).append("blobs",5).append("\": [",4);
1491 
1492     bool is_first = true;
1493 
1494     ITERATE( TBucketCacheMap, bkt, s_BucketsCache) {
1495         SBucketCache* cache = bkt->second;
1496         cache->lock.Lock();
1497 
1498         ITERATE(TKeyMap, it, cache->key_map) {
1499 #if __NC_CACHEDATA_INTR_SET
1500             const SNCCacheData& data = *it;
1501 #else
1502             const SNCCacheData& data = **it;
1503 #endif
1504             CNCBlobKeyLight key(data.key);
1505             if (!mask.empty() && mask != key.Cache()) {
1506                 continue;
1507             }
1508 
1509             if (is_first) {
1510                 is_first = false;
1511             } else {
1512                 sendBuff.append(",", 1);
1513             }
1514 
1515             sendBuff.append(eol.data(), eol.size());
1516             sendBuff.append(key.Cache().data(),  key.Cache().size()).append(":", 1);
1517             sendBuff.append(key.RawKey().data(), key.RawKey().size()).append(":", 1);
1518             sendBuff.append(key.SubKey().data(), key.SubKey().size()).append("\": [",4);
1519             string tmp;
1520             tmp = NStr::NumericToString(data.size);            sendBuff.append(tmp.data(), tmp.size());
1521             tmp = NStr::NumericToString(data.create_time / kUSecsPerSecond);
1522                                                                sendBuff.append(more.data(), more.size()).append(tmp.data(), tmp.size());
1523             tmp = NStr::NumericToString(data.create_server);   sendBuff.append(more.data(), more.size()).append(tmp.data(), tmp.size());
1524             tmp = NStr::NumericToString(data.create_id);       sendBuff.append(more.data(), more.size()).append(tmp.data(), tmp.size());
1525             tmp = NStr::NumericToString(data.dead_time);       sendBuff.append(more.data(), more.size()).append(tmp.data(), tmp.size());
1526             tmp = NStr::NumericToString(data.expire);          sendBuff.append(more.data(), more.size()).append(tmp.data(), tmp.size());
1527             tmp = NStr::NumericToString(data.ver_expire);      sendBuff.append(more.data(), more.size()).append(tmp.data(), tmp.size());
1528             tmp = NStr::NumericToString(data.coord.file_id);   sendBuff.append(more.data(), more.size()).append(tmp.data(), tmp.size());
1529             tmp = NStr::NumericToString(data.coord.rec_num);   sendBuff.append(more.data(), more.size()).append(tmp.data(), tmp.size());
1530             tmp = NStr::NumericToString(data.saved_dead_time); sendBuff.append(more.data(), more.size()).append(tmp.data(), tmp.size());
1531             tmp = NStr::NumericToString(data.time_bucket);     sendBuff.append(more.data(), more.size()).append(tmp.data(), tmp.size());
1532             tmp = NStr::NumericToString(data.map_size);        sendBuff.append(more.data(), more.size()).append(tmp.data(), tmp.size());
1533             tmp = NStr::NumericToString(data.chunk_size);      sendBuff.append(more.data(), more.size()).append(tmp.data(), tmp.size());
1534             sendBuff.append("]}", 2);
1535         }
1536         cache->lock.Unlock();
1537     }
1538     sendBuff.append("\n]",2);
1539 }
1540 
WriteDbInfo(TNCBufferType & task,const CTempString & mask)1541 void CNCBlobStorage::WriteDbInfo(TNCBufferType& task, const CTempString& mask)
1542 {
1543     Uint4 id = 0;
1544     bool use_mask = false, is_id = false, is_audit = false;
1545     if (!mask.empty()) {
1546         is_audit = mask == "audit";
1547         if (!is_audit) {
1548             use_mask = true;
1549             try {
1550                 id = NStr::StringToUInt(mask);
1551                 is_id = true;
1552             } catch (...) {
1553             }
1554         }
1555     }
1556 
1557     string is("\": "),iss("\": \""), eol(",\n\""), str("_str"), eos("\"");
1558     task.WriteText(eol).WriteText("storagepath"  ).WriteText(iss).WriteText(   s_Path).WriteText(eos);
1559     task.WriteText(eol).WriteText("DBfiles\": [\n");
1560     int now = CSrvTime::CurSecs();
1561     s_DBFilesLock.Lock();
1562     bool is_first = true;
1563     for (TNCDBFilesMap::iterator it = s_DBFiles->begin();  it != s_DBFiles->end(); ++it) {
1564         if (use_mask) {
1565             if (is_id && it->second->file_id != id) {
1566                 continue;
1567             }
1568             if (NStr::FindNoCase(it->second->file_name, mask) == NPOS) {
1569                 continue;
1570             }
1571         }
1572         if (!is_first) {
1573             task.WriteText(",");
1574         }
1575         is_first = false;
1576         task.WriteText("{\n");
1577         task.WriteText("\"").WriteText(it->second->file_name).WriteText("\": {");
1578         task.WriteText("\n\"").WriteText("file_type").WriteText( is).WriteNumber( (int)(it->second->file_type));
1579         task.WriteText(eol).WriteText("file_id").WriteText( is).WriteNumber( it->second->file_id);
1580         task.WriteText(eol).WriteText("file_size").WriteText( is).WriteNumber( it->second->file_size);
1581         task.WriteText(eol).WriteText("garb_size").WriteText( is).WriteNumber( it->second->garb_size);
1582         task.WriteText(eol).WriteText("used_size").WriteText( is).WriteNumber( it->second->used_size);
1583 
1584 if (is_audit) {
1585         CSrvRef<SNCDBFileInfo>& file_info = it->second;
1586         Uint4 expired_count = 0, null_cache_count = 0, wrong_cache_count = 0;
1587         map<int, int> rec_alerts;
1588         map<int, int> rec_count;
1589         set<int> file_ref;
1590         set<const SNCCacheData*> all_cache_data;
1591 
1592         ITERATE( TBucketCacheMap, bkt, s_BucketsCache) {
1593             SBucketCache* cache = bkt->second;
1594             cache->lock.Lock();
1595             ITERATE(TKeyMap, it, cache->key_map) {
1596 #if __NC_CACHEDATA_INTR_SET
1597                 all_cache_data.insert(&(*it));
1598 #else
1599                 all_cache_data.insert(*it);
1600 #endif
1601             }
1602             cache->lock.Unlock();
1603         }
1604 
1605 // see x_PreCacheRecNums
1606         file_info->info_lock.Lock();
1607         SFileIndexRec* ind_rec = file_info->index_head;
1608         Uint4 prev_rec_num = 0;
1609         char* min_ptr = file_info->file_map + kSignatureSize;
1610         while (ind_rec->next_num != 0) {
1611             Uint4 rec_num = ind_rec->next_num;
1612             if (rec_num <= prev_rec_num) {
1613                 ++rec_alerts[0];
1614                 ind_rec->next_num = 0;
1615                 continue;
1616             }
1617             SFileIndexRec* next_ind = file_info->index_head - rec_num;
1618             if ((char*)next_ind < min_ptr  ||  next_ind >= ind_rec) {
1619                 ++rec_alerts[1];
1620                 ind_rec->next_num = 0;
1621                 continue;
1622             }
1623             char* next_rec_start = file_info->file_map + next_ind->offset;
1624             if (next_rec_start < min_ptr  ||  next_rec_start > (char*)next_ind
1625                 ||  (rec_num == 1  &&  next_rec_start != min_ptr)) {
1626                 ++rec_alerts[2];
1627                 ind_rec->next_num = next_ind->next_num;
1628                 continue;
1629             }
1630             char* next_rec_end;
1631             next_rec_end = next_rec_start + next_ind->rec_size;
1632             if (next_rec_end < next_rec_start  ||  next_rec_end > (char*)next_ind) {
1633                 ++rec_alerts[3];
1634                 ind_rec->next_num = next_ind->next_num;
1635                 continue;
1636             }
1637             if (next_ind->prev_num != prev_rec_num) {
1638                 next_ind->prev_num = prev_rec_num;
1639                 ++rec_alerts[4];
1640             }
1641 
1642             switch (next_ind->rec_type) {
1643             case eFileRecChunkData:
1644                 if (file_info->file_type != eDBFileData) {
1645                     ++rec_alerts[5];
1646                     ind_rec->next_num = next_ind->next_num;
1647                     continue;
1648                 }
1649                 if (next_ind->rec_size < sizeof(SFileChunkDataRec)) {
1650                     ++rec_alerts[6];
1651                     ind_rec->next_num = next_ind->next_num;
1652                     continue;
1653                 }
1654                 break;
1655             case eFileRecChunkMap:
1656                 if (file_info->file_type != eDBFileMaps) {
1657                     ++rec_alerts[7];
1658                     ind_rec->next_num = next_ind->next_num;
1659                     continue;
1660                 }
1661                 if (next_ind->rec_size < sizeof(SFileChunkMapRec)) {
1662                     ++rec_alerts[8];
1663                     ind_rec->next_num = next_ind->next_num;
1664                     continue;
1665                 }
1666                 break;
1667             case eFileRecMeta:
1668                 if (file_info->file_type != eDBFileMeta) {
1669                     ++rec_alerts[9];
1670                     ind_rec->next_num = next_ind->next_num;
1671                     continue;
1672                 }
1673                 SFileMetaRec* meta_rec;
1674                 meta_rec = (SFileMetaRec*)next_rec_start;
1675                 Uint4 min_rec_size;
1676                 // Minimum key length is 2, so we are adding 1 below
1677                 min_rec_size = sizeof(SFileChunkMapRec) + 1;
1678                 if (meta_rec->has_password)
1679                     min_rec_size += 16;
1680                 if (next_ind->rec_size < min_rec_size) {
1681                     ++rec_alerts[10];
1682                     ind_rec->next_num = next_ind->next_num;
1683                     continue;
1684                 }
1685                 break;
1686             default:
1687                     ++rec_alerts[11];
1688                     ind_rec->next_num = next_ind->next_num;
1689                     continue;
1690             }
1691 
1692             ++rec_count[next_ind->rec_type];
1693             file_ref.insert( next_ind->chain_coord.file_id);
1694             if (next_ind->rec_type == eFileRecMeta) {
1695                 SFileMetaRec* meta_rec = s_CalcMetaAddress(file_info, next_ind);
1696                 if (meta_rec->dead_time < now) {
1697                     ++expired_count;
1698                 }
1699             }
1700             if (next_ind->cache_data == nullptr) {
1701                 ++null_cache_count;
1702             } else if (all_cache_data.find(next_ind->cache_data) == all_cache_data.end()) {
1703                 ++wrong_cache_count;
1704             }
1705 
1706             min_ptr = next_rec_end;
1707             ind_rec = next_ind;
1708             prev_rec_num = rec_num;
1709         }
1710         file_info->info_lock.Unlock();
1711 
1712         for(map<int, int>::const_iterator r = rec_count.begin(); r != rec_count.end(); ++r) {
1713             task.WriteText(eol).WriteText("records_type_").WriteNumber(r->first).WriteText(": ").WriteNumber(r->second);
1714         }
1715         for(map<int, int>::const_iterator r = rec_alerts.begin(); r != rec_alerts.end(); ++r) {
1716             task.WriteText(eol).WriteText("alert_type_").WriteNumber(r->first).WriteText(": ").WriteNumber(r->second);
1717         }
1718         if (file_info->file_type == eDBFileMeta) {
1719             task.WriteText(eol).WriteText("rec_expired").WriteText( is).WriteNumber( expired_count);
1720         }
1721         task.WriteText(eol).WriteText("null_cache_count").WriteText( is).WriteNumber(null_cache_count);
1722         task.WriteText(eol).WriteText("wrong_cache_count").WriteText( is).WriteNumber(wrong_cache_count);
1723         task.WriteText(eol).WriteText("ref_files").WriteText( is).WriteText("[");
1724         bool first = true;
1725         for(set<int>::const_iterator f = file_ref.begin(); f != file_ref.end(); ++f) {
1726             task.WriteText(first ? " " : ", ").WriteNumber(*f);
1727             first = false;
1728         }
1729         task.WriteText("]");
1730 } // is_audit
1731 
1732         task.WriteText(eol).WriteText("is_releasing").WriteText( is).WriteBool( it->second->is_releasing);
1733         char buf[50];
1734         CSrvTime(it->second->create_time).Print(buf, CSrvTime::eFmtJson);
1735         task.WriteText(eol).WriteText("create_time").WriteText( is).WriteText( buf);
1736         CSrvTime(it->second->next_shrink_time).Print(buf, CSrvTime::eFmtJson);
1737         task.WriteText(eol).WriteText("next_shrink_time").WriteText( is).WriteText( buf);
1738         task.WriteText(eol).WriteText("cnt_unfinished").WriteText( is).WriteNumber( it->second->cnt_unfinished.Get());
1739         task.WriteText("}\n}");
1740     }
1741     s_DBFilesLock.Unlock();
1742     task.WriteText("]");
1743 }
1744 
1745 string
PrintablePassword(const string & pass)1746 CNCBlobStorage::PrintablePassword(const string& pass)
1747 {
1748     static const char digits[] = {'0', '1', '2', '3', '4', '5', '6', '7',
1749                                   '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
1750 
1751     string result(32, '\0');
1752     for (int i = 0, j = 0; i < 16; ++i) {
1753         Uint1 c = Uint1(pass[i]);
1754         result[j++] = digits[c >> 4];
1755         result[j++] = digits[c & 0xF];
1756     }
1757     return result;
1758 }
1759 
1760 static SBucketCache*
s_GetBucketCache(Uint2 bucket)1761 s_GetBucketCache(Uint2 bucket)
1762 {
1763     TBucketCacheMap::const_iterator it = s_BucketsCache.find(bucket);
1764     if (it == s_BucketsCache.end()) {
1765         SRV_FATAL("Unexpected state");
1766     }
1767     return it->second;
1768 }
1769 
1770 static SNCCacheData*
s_GetKeyCacheData(Uint2 time_bucket,const string & key,bool need_create)1771 s_GetKeyCacheData(Uint2 time_bucket, const string& key, bool need_create)
1772 {
1773     SBucketCache* cache = s_GetBucketCache(time_bucket);
1774     SNCCacheData* data = NULL;
1775     cache->lock.Lock();
1776 #if __NC_CACHEDATA_INTR_SET
1777     if (need_create) {
1778         TKeyMap::insert_commit_data commit_data;
1779         pair<TKeyMap::iterator, bool> ins_res
1780             = cache->key_map.insert_unique_check(key, SCacheKeyCompare(), commit_data);
1781         if (ins_res.second) {
1782             data = new SNCCacheData();
1783             data->key = key;
1784             data->time_bucket = time_bucket;
1785             cache->key_map.insert_unique_commit(*data, commit_data);
1786             AtomicAdd(s_CurKeysCnt, 1);
1787 
1788 #if __NC_CACHEDATA_ALL_MONITOR
1789             SAllCacheTable* table = s_AllCache[time_bucket];
1790             table->lock.Lock();
1791             table->all_cache_set.insert(data);
1792             table->lock.Unlock();
1793 #endif
1794         }
1795         else {
1796             data = &*ins_res.first;
1797         }
1798     }
1799     else {
1800         TKeyMap::iterator it = cache->key_map.find(key, SCacheKeyCompare());
1801         if (it == cache->key_map.end()) {
1802             data = NULL;
1803         }
1804         else {
1805             data = &*it;
1806         }
1807     }
1808 #else  // __NC_CACHEDATA_INTR_SET
1809     SNCCacheData search_mask;
1810     search_mask.key = key;
1811     TKeyMap::iterator it = cache->key_map.find( &search_mask);
1812     if (it != cache->key_map.end()) {
1813         data = *it;
1814 #ifdef _DEBUG
1815         if (data->time_bucket != time_bucket) {
1816             abort();
1817         }
1818 #endif
1819     } else if (need_create) {
1820         data = new SNCCacheData();
1821         data->key = key;
1822         data->time_bucket = time_bucket;
1823         cache->key_map.insert(data);
1824         AtomicAdd(s_CurKeysCnt, 1);
1825 
1826 #if __NC_CACHEDATA_ALL_MONITOR
1827         SAllCacheTable* table = s_AllCache[time_bucket];
1828         table->lock.Lock();
1829         table->all_cache_set.insert(data);
1830         table->lock.Unlock();
1831 #endif
1832     }
1833 #endif //__NC_CACHEDATA_INTR_SET
1834     if (data) {
1835         CNCBlobStorage::ReferenceCacheData(data);
1836     }
1837     cache->lock.Unlock();
1838     return data;
1839 }
1840 
1841 void
ReferenceCacheData(SNCCacheData * data)1842 CNCBlobStorage::ReferenceCacheData(SNCCacheData* data)
1843 {
1844     data->ref_cnt.Add(1);
1845 }
1846 
1847 void
ReleaseCacheData(SNCCacheData * data)1848 CNCBlobStorage::ReleaseCacheData(SNCCacheData* data)
1849 {
1850     if (data->ref_cnt.Get() == 0) {
1851 #ifdef _DEBUG
1852 CNCAlerts::Register(CNCAlerts::eDebugReleaseCacheData1, "ref_cnt is 0");
1853 #endif
1854         return;
1855     }
1856     if (data->ref_cnt.Add(-1) != 0) {
1857         return;
1858     }
1859 
1860     Uint2 time_bucket = data->time_bucket;
1861     TBucketCacheMap::const_iterator it = s_BucketsCache.find(time_bucket);
1862     if (it == s_BucketsCache.end()) {
1863         return;
1864     }
1865     SBucketCache* cache = it->second;
1866     cache->lock.Lock();
1867 
1868 #ifdef _DEBUG
1869     if (!data->coord.empty() && data->dead_time == 0) {
1870         abort();
1871     }
1872 #endif
1873 
1874     if (data->ref_cnt.Get() != 0  ||  !data->coord.empty()
1875 #if 1
1876 #if __NC_CACHEDATA_INTR_SET
1877         ||  !((TKeyMapHook*)data)->is_linked()
1878 #endif
1879 #endif
1880        )
1881     {
1882         cache->lock.Unlock();
1883         return;
1884     }
1885 #if __NC_CACHEDATA_INTR_SET
1886     size_t n = cache->key_map.erase(*data);
1887 #else
1888     size_t n = cache->key_map.erase(data);
1889 #endif
1890     cache->lock.Unlock();
1891 
1892 #if __NC_CACHEDATA_ALL_MONITOR
1893     SAllCacheTable* table = s_AllCache[time_bucket];
1894     table->lock.Lock();
1895     table->all_cache_set.erase(data);
1896     table->lock.Unlock();
1897 #endif
1898 
1899     if (n != 0) {
1900         AtomicSub(s_CurKeysCnt, 1);
1901         data->CallRCU();
1902     }
1903 #ifdef _DEBUG
1904     else {
1905 CNCAlerts::Register(CNCAlerts::eDebugReleaseCacheData2, "nothing erased in key_map");
1906     }
1907 #endif
1908 }
1909 
1910 static void
s_InitializeAccessor(CNCBlobAccessor * acessor)1911 s_InitializeAccessor(CNCBlobAccessor* acessor)
1912 {
1913     const string& key = acessor->GetBlobKey();
1914     Uint2 time_bucket = acessor->GetTimeBucket();
1915     bool need_create = acessor->GetAccessType() == eNCCreate
1916                        ||  acessor->GetAccessType() == eNCCopyCreate;
1917     SNCCacheData* data = s_GetKeyCacheData(time_bucket, key, need_create);
1918     acessor->Initialize(data);
1919     if (data) {
1920         CNCBlobStorage::ReleaseCacheData(data);
1921     }
1922 }
1923 
1924 CNCBlobAccessor*
GetBlobAccess(ENCAccessType access,const string & key,const string & password,Uint2 time_bucket)1925 CNCBlobStorage::GetBlobAccess(ENCAccessType access,
1926                               const string& key,
1927                               const string& password,
1928                               Uint2         time_bucket)
1929 {
1930     CNCBlobAccessor* accessor = new CNCBlobAccessor();
1931     accessor->Prepare(key, password, time_bucket, access);
1932     s_InitializeAccessor(accessor);
1933     return accessor;
1934 }
1935 
1936 static inline void
s_SwitchToNextFile(SWritingInfo & w_info)1937 s_SwitchToNextFile(SWritingInfo& w_info)
1938 {
1939     w_info.cur_file = w_info.next_file;
1940     w_info.next_file = NULL;
1941     w_info.next_rec_num = 1;
1942     w_info.next_offset = kSignatureSize;
1943     w_info.left_file_size = w_info.cur_file->file_size
1944                              - (kSignatureSize + sizeof(SFileIndexRec));
1945 }
1946 
1947 static bool
s_GetNextWriteCoord(EDBFileIndex file_index,Uint4 rec_size,SNCDataCoord & coord,CSrvRef<SNCDBFileInfo> & file_info,SFileIndexRec * & ind_rec)1948 s_GetNextWriteCoord(EDBFileIndex file_index,
1949                     Uint4 rec_size,
1950                     SNCDataCoord& coord,
1951                     CSrvRef<SNCDBFileInfo>& file_info,
1952                     SFileIndexRec*& ind_rec)
1953 {
1954     Uint4 true_rec_size = rec_size;
1955     if (true_rec_size & 7)
1956         true_rec_size += 8 - (true_rec_size & 7);
1957     Uint4 reserve_size = true_rec_size + sizeof(SFileIndexRec);
1958 
1959     bool need_signal_switch = false;
1960     s_NextWriteLock.Lock();
1961     SWritingInfo& w_info = s_AllWritings[file_index];
1962 
1963     if (w_info.cur_file == NULL) {
1964         s_NextWriteLock.Unlock();
1965         return false;
1966     }
1967     if (w_info.left_file_size < reserve_size  &&  w_info.next_file == NULL) {
1968         s_NextWriteLock.Unlock();
1969         return false;
1970     }
1971     if (w_info.left_file_size < reserve_size) {
1972         AtomicAdd(s_CurDBSize,  w_info.left_file_size);
1973         AtomicAdd(s_GarbageSize,w_info.left_file_size);
1974         w_info.cur_file->info_lock.Lock();
1975         w_info.cur_file->garb_size += w_info.left_file_size;
1976         if (w_info.cur_file->used_size
1977                 + w_info.cur_file->garb_size >= w_info.cur_file->file_size)
1978         {
1979             SRV_FATAL("WritingInfo broken");
1980         }
1981         w_info.cur_file->info_lock.Unlock();
1982 
1983         Uint4 last_rec_num = w_info.next_rec_num - 1;
1984         SFileIndexRec* last_ind = w_info.cur_file->index_head - last_rec_num;
1985         s_LockFileMem(last_ind, (last_rec_num + 1) * sizeof(SFileIndexRec));
1986 
1987         s_SwitchToNextFile(w_info);
1988         need_signal_switch = true;
1989     }
1990     file_info = w_info.cur_file;
1991     coord.file_id = file_info->file_id;
1992     coord.rec_num = w_info.next_rec_num++;
1993     ind_rec = file_info->index_head - coord.rec_num;
1994     ind_rec->offset = w_info.next_offset;
1995     ind_rec->rec_size = rec_size;
1996     ind_rec->rec_type = eFileRecNone;
1997     ind_rec->cache_data = NULL;
1998     ind_rec->chain_coord.clear();
1999 
2000     w_info.next_offset += true_rec_size;
2001     w_info.left_file_size -= reserve_size;
2002     AtomicAdd(s_CurDBSize,  reserve_size);
2003 
2004     file_info->cnt_unfinished.Add(1);
2005 
2006     file_info->info_lock.Lock();
2007 
2008     file_info->used_size += reserve_size;
2009     SFileIndexRec* index_head = file_info->index_head;
2010     Uint4 prev_rec_num = index_head->prev_num;
2011     SFileIndexRec* prev_ind_rec = index_head - prev_rec_num;
2012     if (prev_ind_rec->next_num != 0) {
2013         SRV_FATAL("File index broken");
2014     }
2015     ind_rec->prev_num = prev_rec_num;
2016     ind_rec->next_num = 0;
2017     prev_ind_rec->next_num = coord.rec_num;
2018     index_head->prev_num = coord.rec_num;
2019 
2020     file_info->info_lock.Unlock();
2021     s_NextWriteLock.Unlock();
2022 
2023     if (need_signal_switch)
2024         s_NewFileCreator->SetRunnable();
2025 
2026     return true;
2027 }
2028 
2029 bool
ReadBlobInfo(SNCBlobVerData * ver_data)2030 CNCBlobStorage::ReadBlobInfo(SNCBlobVerData* ver_data)
2031 {
2032     if (ver_data->coord.empty()) {
2033 #ifdef _DEBUG
2034 CNCAlerts::Register(CNCAlerts::eDebugReadBlobInfoFailed0, "ReadBlobInfo: coords empty");
2035 #endif
2036         return false;
2037     }
2038     CSrvRef<SNCDBFileInfo> file_info = s_GetDBFileTry(ver_data->coord.file_id);
2039     if (file_info.IsNull()) {
2040 #ifdef _DEBUG
2041 CNCAlerts::Register(CNCAlerts::eDebugReadBlobInfoFailed1, "ReadBlobInfo: file_info");
2042 #endif
2043         return false;
2044     }
2045     SFileIndexRec* ind_rec = s_GetIndexRecTry(file_info, ver_data->coord.rec_num);
2046     if (!ind_rec) {
2047 #ifdef _DEBUG
2048 CNCAlerts::Register(CNCAlerts::eDebugReadBlobInfoFailed2, "ReadBlobInfo: ind_rec");
2049 #endif
2050         return false;
2051     }
2052     if (ind_rec->cache_data != ver_data->ver_manager->GetCacheData()) {
2053         DB_CORRUPTED("Index record " << ver_data->coord.rec_num
2054                      << " in file " << file_info->file_name
2055                      << " has wrong cache_data pointer " << ind_rec->cache_data
2056                      << " when should be " << ver_data->ver_manager->GetCacheData() << ".");
2057     }
2058     SFileMetaRec* meta_rec = s_CalcMetaAddress(file_info, ind_rec);
2059     ver_data->create_time = meta_rec->create_time;
2060     ver_data->ttl = meta_rec->ttl;
2061     ver_data->expire = meta_rec->expire;
2062     ver_data->dead_time = meta_rec->dead_time;
2063     ver_data->size = meta_rec->size;
2064     if (meta_rec->has_password)
2065         ver_data->password.assign(meta_rec->key_data, 16);
2066     else
2067         ver_data->password.clear();
2068     ver_data->blob_ver = meta_rec->blob_ver;
2069     ver_data->ver_ttl = meta_rec->ver_ttl;
2070     ver_data->ver_expire = meta_rec->ver_expire;
2071     ver_data->create_id = meta_rec->create_id;
2072     ver_data->create_server = meta_rec->create_server;
2073     ver_data->data_coord = ind_rec->chain_coord;
2074 
2075     ver_data->map_depth = s_CalcMapDepth(ver_data->size,
2076                                          ver_data->chunk_size,
2077                                          ver_data->map_size);
2078     return true;
2079 }
2080 
2081 static bool
s_UpdateUpCoords(SFileChunkMapRec * map_rec,SFileIndexRec * map_ind,SNCDataCoord coord)2082 s_UpdateUpCoords(SFileChunkMapRec* map_rec,
2083                  SFileIndexRec* map_ind,
2084                  SNCDataCoord coord)
2085 {
2086     Uint2 cnt_downs = s_CalcCntMapDowns(map_ind->rec_size);
2087     for (Uint2 i = 0; i < cnt_downs; ++i) {
2088         SNCDataCoord down_coord = map_rec->down_coords[i];
2089         CSrvRef<SNCDBFileInfo> file_info = s_GetDBFileTry(down_coord.file_id);
2090         if (file_info.IsNull()) {
2091 #ifdef _DEBUG
2092 CNCAlerts::Register(CNCAlerts::eDebugUpdateUpCoords1, "UpdateUpCoords: file_info");
2093 #endif
2094             return false;
2095         }
2096         SFileIndexRec* ind_rec = s_GetIndexRecTry(file_info, down_coord.rec_num);
2097         if (!ind_rec) {
2098 #ifdef _DEBUG
2099 CNCAlerts::Register(CNCAlerts::eDebugUpdateUpCoords2, "UpdateUpCoords: ind_rec");
2100 #endif
2101             return false;
2102         }
2103         ind_rec->chain_coord = coord;
2104     }
2105     return true;
2106 }
2107 
2108 static bool
s_SaveOneMapImpl(SNCChunkMapInfo * save_map,SNCChunkMapInfo * up_map,Uint2 cnt_downs,Uint2 map_size,Uint1 map_depth,SNCCacheData * cache_data)2109 s_SaveOneMapImpl(SNCChunkMapInfo* save_map,
2110                  SNCChunkMapInfo* up_map,
2111                  Uint2 cnt_downs,
2112                  Uint2 map_size,
2113                  Uint1 map_depth,
2114                  SNCCacheData* cache_data)
2115 {
2116     SNCDataCoord map_coord;
2117     CSrvRef<SNCDBFileInfo> map_file;
2118     SFileIndexRec* map_ind;
2119     Uint4 rec_size = s_CalcMapRecSize(cnt_downs);
2120     if (!s_GetNextWriteCoord(eFileIndexMaps, rec_size, map_coord, map_file, map_ind)) {
2121 #ifdef _DEBUG
2122 CNCAlerts::Register(CNCAlerts::eDebugSaveOneMapImpl1,"s_GetNextWriteCoord failed");
2123 #endif
2124         return false;
2125     }
2126 
2127     map_ind->rec_type = eFileRecChunkMap;
2128     map_ind->cache_data = cache_data;
2129     SFileChunkMapRec* map_rec = s_CalcMapAddress(map_file, map_ind);
2130     map_rec->map_idx = save_map->map_idx;
2131     map_rec->map_depth = map_depth;
2132     size_t coords_size = cnt_downs * sizeof(map_rec->down_coords[0]);
2133     memcpy(map_rec->down_coords, save_map->coords, coords_size);
2134 #if 0
2135 cout << "s_SaveOneMapImpl: "
2136      << map_coord.file_id << "[" << map_coord.rec_num << "] ="
2137      << " cnt_downs=" << cnt_downs
2138      << " map_idx=" << map_rec->map_idx
2139      << " map_depth=" << (int)(map_rec->map_depth)
2140      << " down_coords=";
2141     for (Uint2 c=0; c<cnt_downs; ++c) {
2142         cout << " " << map_rec->down_coords[c].file_id
2143              << "[" << map_rec->down_coords[c].rec_num
2144              << "]";
2145     }
2146     cout << endl;
2147 #endif
2148 
2149     up_map->coords[save_map->map_idx] = map_coord;
2150     ++save_map->map_idx;
2151     memset(save_map->coords, 0, map_size * sizeof(save_map->coords[0]));
2152     if (!s_UpdateUpCoords(map_rec, map_ind, map_coord)) {
2153 #ifdef _DEBUG
2154 CNCAlerts::Register(CNCAlerts::eDebugSaveOneMapImpl2,"s_UpdateUpCoords failed");
2155 #endif
2156         return false;
2157     }
2158 
2159     map_file->cnt_unfinished.Add(-1);
2160 
2161     return true;
2162 }
2163 
2164 static bool
s_SaveChunkMap(SNCBlobVerData * ver_data,SNCCacheData * cache_data,SNCChunkMapInfo ** maps,Uint2 cnt_downs,bool save_all_deps)2165 s_SaveChunkMap(SNCBlobVerData* ver_data,
2166                SNCCacheData* cache_data,
2167                SNCChunkMapInfo** maps,
2168                Uint2 cnt_downs,
2169                bool save_all_deps)
2170 {
2171     if (!s_SaveOneMapImpl(maps[0], maps[1], cnt_downs, ver_data->map_size, 1, cache_data)) {
2172         return false;
2173     }
2174 
2175     Uint1 cur_level = 0;
2176 // changed 11may17
2177 #if 0
2178 // prev ver
2179     while (cur_level < kNCMaxBlobMapsDepth
2180            &&  (maps[cur_level]->map_idx == ver_data->map_size
2181                 ||  (maps[cur_level]->map_idx > 1  &&  save_all_deps)))
2182     {
2183         cnt_downs = maps[cur_level]->map_idx;
2184         ++cur_level;
2185         if (!s_SaveOneMapImpl(maps[cur_level], maps[cur_level + 1], cnt_downs,
2186                               ver_data->map_size, cur_level + 1, cache_data))
2187         {
2188             return false;
2189         }
2190     }
2191 #else
2192 // new ver
2193     Uint1 depth = s_CalcMapDepthImpl(ver_data->size,
2194                                      ver_data->chunk_size,
2195                                      ver_data->map_size);
2196     Uint2 prev_idx = maps[cur_level]->map_idx;
2197     while (depth > 1 && (cur_level+1) < kNCMaxBlobMapsDepth
2198            &&  (maps[cur_level]->map_idx == ver_data->map_size
2199                 ||  (prev_idx > 0  &&  save_all_deps)))
2200     {
2201         cnt_downs = maps[cur_level]->map_idx;
2202         ++cur_level;
2203         prev_idx = maps[cur_level]->map_idx;
2204         if (!s_SaveOneMapImpl(maps[cur_level], maps[cur_level + 1], cnt_downs,
2205                               ver_data->map_size, cur_level + 1, cache_data))
2206         {
2207             return false;
2208         }
2209     }
2210 #endif
2211 
2212     return true;
2213 }
2214 
// Write a new meta record for a blob version and link it with the record
// holding the blob's data (a single chunk record or the top chunk map).
//
// blob_key    - key stored inside the meta record (after the password, if any)
// ver_data    - version being written; its coord, data_coord and map_depth
//               are updated here
// maps        - chunk maps accumulated while the blob's chunks were written
// cnt_chunks  - total number of chunks in the blob
// cache_data  - stored into the index record of the new meta record
//
// Returns false when the chunk maps cannot be saved, when no place for the
// meta record can be obtained, or when the data record to link with cannot
// be found; true otherwise.
bool
CNCBlobStorage::WriteBlobInfo(const string& blob_key,
                              SNCBlobVerData* ver_data,
                              SNCChunkMaps* maps,
                              Uint8 cnt_chunks,
                              SNCCacheData* cache_data)
{
    SNCDataCoord old_coord = ver_data->coord;
    SNCDataCoord down_coord = ver_data->data_coord;
    // ver_data has no meta record coordinate yet, so the coordinate of the
    // data root is taken from the accumulated maps.
    if (old_coord.empty()) {
        if (ver_data->size > ver_data->chunk_size) {
            // Number of chunks sitting in the last, possibly partial, level-0 map.
            Uint2 last_chunks_cnt = Uint2((cnt_chunks - 1) % ver_data->map_size) + 1;
            if (!s_SaveChunkMap(ver_data, cache_data, maps->maps, last_chunks_cnt, true)) {
#ifdef _DEBUG
CNCAlerts::Register(CNCAlerts::eDebugWriteBlobInfo1, "WriteBlobInfo: s_SaveChunkMap");
#endif
                return false;
            }

            // The first level (counting from 1) whose slot 0 is filled gives
            // the data root; its level number becomes the blob's map depth.
            for (Uint1 i = 1; i <= kNCMaxBlobMapsDepth; ++i) {
                if (!maps->maps[i]->coords[0].empty()) {
                    down_coord = maps->maps[i]->coords[0];
                    maps->maps[i]->coords[0].clear();
                    ver_data->map_depth = i;
                    break;
                }
            }
            if (down_coord.empty()) {
                SRV_FATAL("Blob coords broken");
            }
        }
        else if (ver_data->size != 0) {
            // Blob fits into one chunk: the chunk record itself is the root.
            down_coord = maps->maps[0]->coords[0];
            maps->maps[0]->coords[0].clear();
            ver_data->map_depth = 0;
        }
    }

    // A password, when present, takes 16 bytes in front of the key.
    Uint2 key_size = Uint2(blob_key.size());
    if (!ver_data->password.empty())
        key_size += 16;
    Uint4 rec_size = s_CalcMetaRecSize(key_size);
    SNCDataCoord meta_coord;
    CSrvRef<SNCDBFileInfo> meta_file;
    SFileIndexRec* meta_ind;
    if (!s_GetNextWriteCoord(eFileIndexMeta, rec_size, meta_coord, meta_file, meta_ind)) {
#ifdef _DEBUG
CNCAlerts::Register(CNCAlerts::eDebugWriteBlobInfo2, "WriteBlobInfo: s_GetNextWriteCoord");
#endif
        return false;
    }

    // The meta record's chain coordinate points down to the data root.
    meta_ind->rec_type = eFileRecMeta;
    meta_ind->cache_data = cache_data;
    meta_ind->chain_coord = down_coord;
#if 0
cout << "WriteBlobInfo:" << " coord = " << meta_ind->chain_coord.file_id
     << "[" << meta_ind->chain_coord.rec_num << "]" << endl;
#endif

    SFileMetaRec* meta_rec = s_CalcMetaAddress(meta_file, meta_ind);
    meta_rec->size = ver_data->size;
    meta_rec->create_time = ver_data->create_time;
    meta_rec->create_server = ver_data->create_server;
    meta_rec->create_id = ver_data->create_id;
    meta_rec->dead_time = ver_data->dead_time;
    meta_rec->ttl = ver_data->ttl;
    meta_rec->expire = ver_data->expire;
    meta_rec->blob_ver = ver_data->blob_ver;
    meta_rec->ver_ttl = ver_data->ver_ttl;
    meta_rec->ver_expire = ver_data->ver_expire;
    meta_rec->map_size = ver_data->map_size;
    meta_rec->chunk_size = ver_data->chunk_size;
    char* key_data = meta_rec->key_data;
    if (ver_data->password.empty()) {
        meta_rec->has_password = 0;
    }
    else {
        meta_rec->has_password = 1;
        // NOTE(review): assumes password holds at least 16 bytes - confirm
        // against the code that fills ver_data->password.
        memcpy(key_data, ver_data->password.data(), 16);
        key_data += 16;
    }
    memcpy(key_data, blob_key.data(), blob_key.size());

    ver_data->coord = meta_coord;
    ver_data->data_coord = down_coord;

    // Make the data root point back up to the new meta record.
    // NOTE(review): the two early returns below happen after
    // s_GetNextWriteCoord() succeeded and do not reach
    // meta_file->cnt_unfinished.Add(-1) - confirm that this is intended.
    if (!down_coord.empty()) {
        CSrvRef<SNCDBFileInfo> down_file = s_GetDBFileTry(down_coord.file_id);
        if (down_file.IsNull()) {
#ifdef _DEBUG
CNCAlerts::Register(CNCAlerts::eDebugWriteBlobInfo3, "WriteBlobInfo: down_file");
#endif
            return false;
        }
        SFileIndexRec*  down_ind = s_GetIndexRecTry(down_file, down_coord.rec_num);
        if (!down_ind) {
#ifdef _DEBUG
CNCAlerts::Register(CNCAlerts::eDebugWriteBlobInfo4, "WriteBlobInfo: down_ind");
#endif
            return false;
        }
        down_ind->chain_coord = meta_coord;
    }

    // The meta record that ver_data referred to on entry is now obsolete.
    if (!old_coord.empty()) {
        CSrvRef<SNCDBFileInfo> old_file = s_GetDBFileTry(old_coord.file_id);
        if (old_file.NotNull()) {
            SFileIndexRec* old_ind = s_GetIndexRecTry(old_file, old_coord.rec_num);
            if (old_ind) {
                // Check that meta-data wasn't corrupted.
                s_CalcMetaAddress(old_file, old_ind);
                s_MoveRecToGarbage(old_file, old_ind);
            }
        }
    }

    meta_file->cnt_unfinished.Add(-1);

    return true;
}
2336 
2337 static void
s_MoveDataToGarbage(SNCDataCoord coord,Uint1 map_depth,SNCDataCoord up_coord,bool need_lock)2338 s_MoveDataToGarbage(SNCDataCoord coord, Uint1 map_depth, SNCDataCoord up_coord, bool need_lock)
2339 {
2340     if (coord.empty()) {
2341         return;
2342     }
2343     CSrvRef<SNCDBFileInfo> file_info = need_lock ? s_GetDBFileTry(coord.file_id) : s_GetDBFileNoLock(coord.file_id);
2344     if (file_info.IsNull()) {
2345         return;
2346     }
2347     SFileIndexRec* ind_rec = s_GetIndexRecTry(file_info, coord.rec_num);
2348     if (!ind_rec) {
2349         return;
2350     }
2351     if (ind_rec->chain_coord != up_coord) {
2352         DB_CORRUPTED("Index record " << coord.rec_num
2353                      << " in file " << file_info->file_name
2354                      << " has wrong up_coord " << ind_rec->chain_coord
2355                      << " when should be " << up_coord << ".");
2356     }
2357     if (ind_rec->rec_type == eFileRecChunkData) {
2358         s_CalcChunkAddress(file_info, ind_rec);
2359         s_MoveRecToGarbage(file_info, ind_rec);
2360     }
2361     else if (ind_rec->rec_type == eFileRecChunkMap) {
2362         Uint2 cnt_downs = s_CalcCntMapDowns(ind_rec->rec_size);
2363         if (cnt_downs != 0) {
2364             SFileChunkMapRec* map_rec = s_CalcMapAddress(file_info, ind_rec);
2365             for (Uint2 i = 0; i < cnt_downs; ++i) {
2366                 s_MoveDataToGarbage(map_rec->down_coords[i], map_depth - 1, coord, need_lock);
2367             }
2368         }
2369         s_MoveRecToGarbage(file_info, ind_rec);
2370     }
2371 }
2372 
2373 void
DeleteBlobInfo(const SNCBlobVerData * ver_data,SNCChunkMaps * maps)2374 CNCBlobStorage::DeleteBlobInfo(const SNCBlobVerData* ver_data,
2375                                SNCChunkMaps* maps)
2376 {
2377     if (!ver_data->coord.empty()) {
2378         CSrvRef<SNCDBFileInfo> meta_file = s_GetDBFileTry(ver_data->coord.file_id);
2379         if (meta_file.NotNull()) {
2380             SFileIndexRec* meta_ind = s_GetIndexRecTry(meta_file, ver_data->coord.rec_num);
2381             if (meta_ind) {
2382                 s_CalcMetaAddress(meta_file, meta_ind);
2383                 if (ver_data->size != 0) {
2384                     s_MoveDataToGarbage(ver_data->data_coord,
2385                                         ver_data->map_depth,
2386                                         ver_data->coord, true);
2387                 }
2388                 s_MoveRecToGarbage(meta_file, meta_ind);
2389             }
2390         }
2391     }
2392     if (maps) {
2393         SNCDataCoord empty_coord;
2394         empty_coord.clear();
2395         for (Uint1 depth = 0; depth <= kNCMaxBlobMapsDepth; ++depth) {
2396             Uint2 idx = 0;
2397             while (idx < ver_data->map_size
2398                    &&  !maps->maps[depth]->coords[idx].empty())
2399             {
2400                 s_MoveDataToGarbage(maps->maps[depth]->coords[idx], depth, empty_coord, true);
2401                 ++idx;
2402             }
2403         }
2404     }
2405 }
2406 
2407 static bool
s_ReadMapInfo(SNCDataCoord map_coord,SNCChunkMapInfo * map_info,Uint1 map_depth)2408 s_ReadMapInfo(SNCDataCoord map_coord,
2409               SNCChunkMapInfo* map_info,
2410               Uint1 map_depth/*,
2411               SNCDataCoord up_coord*/)
2412 {
2413     CSrvRef<SNCDBFileInfo> map_file = s_GetDBFileTry(map_coord.file_id);
2414     if (map_file.IsNull()) {
2415 #ifdef _DEBUG
2416 CNCAlerts::Register(CNCAlerts::eDebugReadMapInfo1, "ReadMapInfo: map_file");
2417 #endif
2418         return false;
2419     }
2420     SFileIndexRec* map_ind = s_GetIndexRecTry(map_file, map_coord.rec_num);
2421     if (!map_ind) {
2422 #ifdef _DEBUG
2423 CNCAlerts::Register(CNCAlerts::eDebugReadMapInfo2, "ReadMapInfo: map_inf");
2424 #endif
2425         return false;
2426     }
2427     /*
2428     This is an incorrect check because it can race with
2429     CNCBlobVerManager::x_UpdateVersion
2430 
2431     if (map_ind->chain_coord != up_coord) {
2432         DB_CORRUPTED("Index record " << map_coord.rec_num
2433                      << " in file " << map_file->file_name
2434                      << " has wrong up_coord " << map_ind->chain_coord
2435                      << " when should be " << up_coord << ".");
2436     }
2437     */
2438     SFileChunkMapRec* map_rec = s_CalcMapAddress(map_file, map_ind);
2439     if (map_rec->map_idx != map_info->map_idx  ||  map_rec->map_depth != map_depth)
2440     {
2441         DB_CORRUPTED("Map at coord " << map_coord
2442                      << " has wrong index " << map_rec->map_idx
2443                      << " and/or depth " << map_rec->map_depth
2444                      << " when it should be " << map_info->map_idx
2445                      << " and " << map_depth << ".");
2446     }
2447     memcpy(map_info->coords, map_rec->down_coords,
2448            s_CalcCntMapDowns(map_ind->rec_size) * sizeof(map_rec->down_coords[0]));
2449     return true;
2450 }
2451 
2452 bool
ReadChunkData(SNCBlobVerData * ver_data,SNCChunkMaps * maps,Uint8 chunk_num,char * & buffer,Uint4 & buf_size)2453 CNCBlobStorage::ReadChunkData(SNCBlobVerData* ver_data,
2454                               SNCChunkMaps* maps,
2455                               Uint8 chunk_num,
2456                               char*& buffer,
2457                               Uint4& buf_size)
2458 {
2459     Uint2 map_idx[kNCMaxBlobMapsDepth] = {0};
2460     Uint1 cur_index = 0;
2461     Uint8 level_num = chunk_num;
2462     do {
2463         map_idx[cur_index] = Uint2(level_num % ver_data->map_size);
2464         level_num /= ver_data->map_size;
2465         ++cur_index;
2466     }
2467     while (level_num != 0);
2468 
2469     SNCDataCoord map_coord/*, up_coord*/;
2470     for (Uint1 depth = ver_data->map_depth; depth > 0; ) {
2471         Uint2 this_index = map_idx[depth];
2472         --depth;
2473         SNCChunkMapInfo* this_map = maps->maps[depth];
2474         if (this_map->map_idx == this_index
2475             &&  !this_map->coords[0].empty()
2476             &&  this_map->map_counter == ver_data->map_move_counter)
2477         {
2478             continue;
2479         }
2480 
2481         this_map->map_counter = ver_data->map_move_counter;
2482         this_map->map_idx = this_index;
2483 
2484         if (depth + 1 == ver_data->map_depth) {
2485             //up_coord = ver_data->coord;
2486             map_coord = ver_data->data_coord;
2487         }
2488         else {
2489             /*if (depth + 2 == ver_data->map_depth)
2490                 up_coord = ver_data->data_coord;
2491             else
2492                 up_coord = maps->maps[depth + 2]->coords[map_idx[depth + 2]];*/
2493             map_coord = maps->maps[depth + 1]->coords[this_index];
2494         }
2495 
2496         if (!s_ReadMapInfo(map_coord, this_map, depth + 1/*, up_coord*/)) {
2497 #ifdef _DEBUG
2498 CNCAlerts::Register(CNCAlerts::eDebugReadChunkData1, "ReadChunkData: ReadMapInfo");
2499 #endif
2500             return false;
2501         }
2502 
2503         while (depth > 0) {
2504             this_index = map_idx[depth];
2505             --depth;
2506             this_map = maps->maps[depth];
2507             this_map->map_counter = ver_data->map_move_counter;
2508             this_map->map_idx = this_index;
2509             //up_coord = map_coord;
2510             map_coord = maps->maps[depth + 1]->coords[this_index];
2511             if (!s_ReadMapInfo(map_coord, this_map, depth + 1/*, up_coord*/)) {
2512 #ifdef _DEBUG
2513 CNCAlerts::Register(CNCAlerts::eDebugReadChunkData2, "ReadChunkData: ReadMapInfo");
2514 #endif
2515                 return false;
2516             }
2517         }
2518     }
2519 
2520     SNCDataCoord chunk_coord;
2521     if (ver_data->map_depth == 0) {
2522         if (chunk_num != 0) {
2523             SRV_FATAL("Blob coords broken");
2524         }
2525         //up_coord = ver_data->coord;
2526         chunk_coord = ver_data->data_coord;
2527     }
2528     else {
2529         /*if (ver_data->map_depth == 1)
2530             up_coord = ver_data->data_coord;
2531         else
2532             up_coord = maps->maps[1]->coords[map_idx[1]];*/
2533         chunk_coord = maps->maps[0]->coords[map_idx[0]];
2534     }
2535 
2536     CSrvRef<SNCDBFileInfo> data_file = s_GetDBFileTry(chunk_coord.file_id);
2537     if (data_file.IsNull()) {
2538 #ifdef _DEBUG
2539 CNCAlerts::Register(CNCAlerts::eDebugReadChunkData3, "ReadChunkData: data_file");
2540 #endif
2541         return false;
2542     }
2543     SFileIndexRec* data_ind = s_GetIndexRecTry(data_file, chunk_coord.rec_num);
2544     if (!data_ind) {
2545 #ifdef _DEBUG
2546 CNCAlerts::Register(CNCAlerts::eDebugReadChunkData4, "ReadChunkData: data_inf");
2547 #endif
2548         return false;
2549     }
2550     /*
2551     This is an incorrect check because it can race with
2552     CNCBlobVerManager::x_UpdateVersion
2553 
2554     if (data_ind->chain_coord != up_coord) {
2555         DB_CORRUPTED("Index record " << chunk_coord.rec_num
2556                      << " in file " << data_file->file_name
2557                      << " has wrong up_coord " << data_ind->chain_coord
2558                      << " when should be " << up_coord << ".");
2559     }
2560     */
2561     SFileChunkDataRec* data_rec = s_CalcChunkAddress(data_file, data_ind);
2562     if (data_rec->chunk_num != chunk_num  ||  data_rec->chunk_idx != map_idx[0])
2563     {
2564         SRV_LOG(Critical, "File " << data_file->file_name
2565                           << " in chunk record " << chunk_coord.rec_num
2566                           << " has wrong chunk number " << data_rec->chunk_num
2567                           << " and/or chunk index " << data_rec->chunk_idx
2568                           << ". Deleting blob.");
2569         return false;
2570     }
2571 
2572     buf_size = s_CalcChunkDataSize(data_ind->rec_size);
2573     buffer = (char*)data_rec->chunk_data;
2574 
2575     return true;
2576 }
2577 
2578 char*
WriteChunkData(SNCBlobVerData * ver_data,SNCChunkMaps * maps,SNCCacheData * cache_data,Uint8 chunk_num,char * buffer,Uint4 buf_size)2579 CNCBlobStorage::WriteChunkData(SNCBlobVerData* ver_data,
2580                                SNCChunkMaps* maps,
2581                                SNCCacheData* cache_data,
2582                                Uint8 chunk_num,
2583                                char* buffer,
2584                                Uint4 buf_size)
2585 {
2586     Uint2 map_idx[kNCMaxBlobMapsDepth] = {0};
2587     Uint1 cur_index = 0;
2588     Uint8 level_num = chunk_num;
2589     do {
2590         map_idx[cur_index] = Uint2(level_num % ver_data->map_size);
2591         level_num /= ver_data->map_size;
2592         ++cur_index;
2593     }
2594     while (level_num != 0  &&  cur_index < kNCMaxBlobMapsDepth);
2595     if (level_num != 0  &&  cur_index >= kNCMaxBlobMapsDepth) {
2596         SRV_LOG(Critical, "Chunk number " << chunk_num
2597                           << " exceeded maximum map depth " << kNCMaxBlobMapsDepth
2598                           << " with map_size=" << ver_data->map_size << ".");
2599 #ifdef _DEBUG
2600 CNCAlerts::Register(CNCAlerts::eDebugWriteChunkData1,"chunk number");
2601 #endif
2602         return NULL;
2603     }
2604 
2605     if (map_idx[1] != maps->maps[0]->map_idx) {
2606         if (!s_SaveChunkMap(ver_data, cache_data, maps->maps, ver_data->map_size, false))
2607             return NULL;
2608         for (Uint1 i = 0; i < kNCMaxBlobMapsDepth - 1; ++i)
2609             maps->maps[i]->map_idx = map_idx[i + 1];
2610     }
2611 
2612     SNCDataCoord data_coord;
2613     CSrvRef<SNCDBFileInfo> data_file;
2614     SFileIndexRec* data_ind;
2615     Uint4 rec_size = s_CalcChunkRecSize(buf_size);
2616     if (!s_GetNextWriteCoord(eFileIndexData, rec_size, data_coord, data_file, data_ind)) {
2617 #ifdef _DEBUG
2618 CNCAlerts::Register(CNCAlerts::eDebugWriteChunkData2,"s_GetNextWriteCoord");
2619 #endif
2620         return NULL;
2621     }
2622 
2623     data_ind->rec_type = eFileRecChunkData;
2624     data_ind->cache_data = cache_data;
2625     SFileChunkDataRec* data_rec = s_CalcChunkAddress(data_file, data_ind);
2626     data_rec->chunk_num = chunk_num;
2627     data_rec->chunk_idx = map_idx[0];
2628     memcpy(data_rec->chunk_data, buffer, buf_size);
2629 
2630     maps->maps[0]->coords[map_idx[0]] = data_coord;
2631 
2632     data_file->cnt_unfinished.Add(-1);
2633 
2634     return (char*)data_rec->chunk_data;
2635 }
2636 
2637 void
ChangeCacheDeadTime(SNCCacheData * cache_data)2638 CNCBlobStorage::ChangeCacheDeadTime(SNCCacheData* cache_data)
2639 {
2640     STimeTable* table = s_TimeTables[cache_data->time_bucket];
2641     table->lock.Lock();
2642     if (cache_data->saved_dead_time != 0) {
2643 #if __NC_CACHEDATA_INTR_SET
2644         table->time_map.erase(table->time_map.iterator_to(*cache_data));
2645 #else
2646         table->time_map.erase(cache_data);
2647 #endif
2648         AtomicSub(s_CurBlobsCnt, 1);
2649         if (CNCBlobStorage::IsDraining() && s_CurBlobsCnt <= 0) {
2650             CTaskServer::RequestShutdown(eSrvSlowShutdown);
2651         }
2652     }
2653     cache_data->saved_dead_time = cache_data->dead_time;
2654     if (cache_data->saved_dead_time != 0) {
2655 #if __NC_CACHEDATA_INTR_SET
2656         table->time_map.insert_equal(*cache_data);
2657 #else
2658         table->time_map.insert(cache_data);
2659 #endif
2660         AtomicAdd(s_CurBlobsCnt, 1);
2661     }
2662     table->lock.Unlock();
2663 }
2664 
2665 Uint8
GetMaxSyncLogRecNo(void)2666 CNCBlobStorage::GetMaxSyncLogRecNo(void)
2667 {
2668     Uint8 result = 0;
2669     s_IndexLock.Lock();
2670     try {
2671         result = s_IndexDB->GetMaxSyncLogRecNo();
2672     }
2673     catch (CSQLITE_Exception& ex) {
2674         SRV_LOG(Critical, "Cannot read max_sync_log_rec_no: " << ex);
2675     }
2676     s_IndexLock.Unlock();
2677     return result;
2678 }
2679 
2680 void
SaveMaxSyncLogRecNo(void)2681 CNCBlobStorage::SaveMaxSyncLogRecNo(void)
2682 {
2683     s_NeedSaveLogRecNo = true;
2684 }
2685 
2686 string
GetPurgeData(void)2687 CNCBlobStorage::GetPurgeData(void)
2688 {
2689     string result;
2690     s_IndexLock.Lock();
2691     try {
2692         result = s_IndexDB->GetPurgeData();
2693     }
2694     catch (CSQLITE_Exception&) {
2695     }
2696     s_IndexLock.Unlock();
2697     return result;
2698 }
2699 
SavePurgeData(void)2700 void CNCBlobStorage::SavePurgeData(void)
2701 {
2702     s_NeedSavePurgeData = true;
2703     s_RecNoSaver->SetRunnable();
2704 }
2705 
2706 Uint8
GetMaxBlobSizeStore(void)2707 CNCBlobStorage::GetMaxBlobSizeStore(void)
2708 {
2709     return s_MaxBlobSizeStore;
2710 }
2711 
2712 Int8
GetDiskFree(void)2713 CNCBlobStorage::GetDiskFree(void)
2714 {
2715     try {
2716         return CFileUtil::GetFreeDiskSpace(s_Path);
2717     }
2718     catch (CFileErrnoException& ex) {
2719         SRV_LOG(Critical, "Cannot read free disk space: " << ex);
2720         return 0;
2721     }
2722 }
2723 
2724 Int8
GetAllowedDBSize(Int8 free_space)2725 CNCBlobStorage::GetAllowedDBSize(Int8 free_space)
2726 {
2727     Int8 total_space = s_CurDBSize + free_space;
2728     Int8 allowed_db_size = total_space;
2729 
2730     if (total_space < s_DiskCritical)
2731         allowed_db_size = 16;
2732     else
2733         allowed_db_size = min(allowed_db_size, total_space - s_DiskCritical);
2734     if (total_space < s_DiskFreeLimit)
2735         allowed_db_size = 16;
2736     else
2737         allowed_db_size = min(allowed_db_size, total_space - s_DiskFreeLimit);
2738     if (s_StopWriteOnSize != 0  &&  s_StopWriteOnSize < allowed_db_size)
2739         allowed_db_size = s_StopWriteOnSize;
2740 
2741     return allowed_db_size;
2742 }
2743 
2744 Int8
GetDBSize(void)2745 CNCBlobStorage::GetDBSize(void)
2746 {
2747     return s_CurDBSize;
2748 }
2749 
2750 size_t
GetNDBFiles(void)2751 CNCBlobStorage::GetNDBFiles(void)
2752 {
2753     return s_DBFiles->size();
2754 }
2755 
2756 bool
IsCleanStart(void)2757 CNCBlobStorage::IsCleanStart(void)
2758 {
2759     return s_CleanStart;
2760 }
2761 
2762 bool
NeedStopWrite(void)2763 CNCBlobStorage::NeedStopWrite(void)
2764 {
2765     return s_IsStopWrite != eNoStop  &&  s_IsStopWrite != eStopWarning;
2766 }
2767 
2768 bool
AcceptWritesFromPeers(void)2769 CNCBlobStorage::AcceptWritesFromPeers(void)
2770 {
2771     return s_IsStopWrite != eStopDiskCritical;
2772 }
2773 
2774 void
SetDraining(bool draining)2775 CNCBlobStorage::SetDraining(bool draining)
2776 {
2777     s_Draining = draining;
2778     if (draining) {
2779         if (s_CurBlobsCnt <= 0) {
2780             CTaskServer::RequestShutdown(eSrvSlowShutdown);
2781         }
2782     }
2783 }
2784 
AbandonDB(void)2785 void CNCBlobStorage::AbandonDB(void)
2786 {
2787     s_AbandonDB = true;
2788 }
2789 
IsAbandoned(void)2790 bool CNCBlobStorage::IsAbandoned(void)
2791 {
2792     return s_AbandonDB;
2793 }
2794 
2795 bool
IsDraining(void)2796 CNCBlobStorage::IsDraining(void)
2797 {
2798     return s_Draining;
2799 }
2800 
2801 bool
IsDBSizeAlert(void)2802 CNCBlobStorage::IsDBSizeAlert(void)
2803 {
2804     return s_IsStopWrite != eNoStop;
2805 }
2806 
2807 Uint4
GetNewBlobId(void)2808 CNCBlobStorage::GetNewBlobId(void)
2809 {
2810     return Uint4(s_BlobCounter.Add(1));
2811 }
2812 
2813 int
GetLatestBlobExpire(void)2814 CNCBlobStorage::GetLatestBlobExpire(void)
2815 {
2816     int res = CSrvTime::CurSecs();
2817     ITERATE( TBucketCacheMap, bkt, s_BucketsCache) {
2818         SBucketCache* cache = bkt->second;
2819         cache->lock.Lock();
2820         ITERATE(TKeyMap, it, cache->key_map) {
2821 #if __NC_CACHEDATA_INTR_SET
2822             res = max( res, it->dead_time);
2823 #else
2824             res = max( res, (*it)->dead_time);
2825 #endif
2826         }
2827         cache->lock.Unlock();
2828     }
2829     return res;
2830 }
2831 
2832 void
GetFullBlobsList(Uint2 slot,TNCBlobSumList & blobs_lst,const CNCPeerControl * peer)2833 CNCBlobStorage::GetFullBlobsList(Uint2 slot, TNCBlobSumList& blobs_lst, const CNCPeerControl* peer)
2834 {
2835     blobs_lst.clear();
2836     Uint2 slot_buckets = CNCDistributionConf::GetCntSlotBuckets();
2837     Uint2 bucket_num = (slot - 1) * slot_buckets + 1;
2838     for (Uint2 i = 0; i < slot_buckets; ++i, ++bucket_num) {
2839         SBucketCache* cache = s_GetBucketCache(bucket_num);
2840 
2841         cache->lock.Lock();
2842         Uint8 cnt_blobs = cache->key_map.size();
2843         void* big_block = malloc(size_t(cnt_blobs * sizeof(SNCTempBlobInfo)));
2844         if (!big_block) {
2845             cache->lock.Unlock();
2846             return;
2847         }
2848         SNCTempBlobInfo* info_ptr = (SNCTempBlobInfo*)big_block;
2849 
2850         ITERATE(TKeyMap, it, cache->key_map) {
2851 #if __NC_CACHEDATA_INTR_SET
2852             new (info_ptr) SNCTempBlobInfo(*it);
2853 #else
2854             new (info_ptr) SNCTempBlobInfo(**it);
2855 #endif
2856             ++info_ptr;
2857         }
2858         cache->lock.Unlock();
2859 
2860         info_ptr = (SNCTempBlobInfo*)big_block;
2861         for (Uint8 i = 0; i < cnt_blobs; ++i) {
2862             Uint2 key_slot = 0, key_bucket = 0;
2863             if (!CNCDistributionConf::GetSlotByKey(info_ptr->key, key_slot, key_bucket) ||
2864                 key_slot != slot /*|| key_bucket != bucket_num*/) {
2865                 SRV_FATAL("Slot verification failed, blob key: " << info_ptr->key <<
2866                           ", expected slot: " << slot << ", calculated slot: " << key_slot);
2867             }
2868             if (key_bucket != bucket_num) {
2869                 SRV_LOG(Critical, "Slot verification failed, blob key: " << info_ptr->key <<
2870                           ", slot: " << slot <<
2871                           ", expected bucket: " << bucket_num << ", calculated bucket: " << key_bucket);
2872             }
2873 
2874             if (info_ptr->size > CNCDistributionConf::GetMaxBlobSizeSync()) {
2875                 if (CNCDistributionConf::IsThisServerKey(info_ptr->key)) {
2876                     continue;
2877                 }
2878             }
2879 
2880             if (peer && !peer->AcceptsBlobKey(info_ptr->key)) {
2881                 continue;
2882             }
2883             SNCBlobSummary* blob_sum = new SNCBlobSummary();
2884             blob_sum->size           = info_ptr->size;
2885             blob_sum->create_id      = info_ptr->create_id;
2886             blob_sum->create_server  = info_ptr->create_server;
2887             blob_sum->create_time    = info_ptr->create_time;
2888             blob_sum->dead_time      = info_ptr->dead_time;
2889             blob_sum->expire         = info_ptr->expire;
2890             blob_sum->ver_expire     = info_ptr->ver_expire;
2891             blobs_lst[info_ptr->key] = blob_sum;
2892 
2893             info_ptr->~SNCTempBlobInfo();
2894             ++info_ptr;
2895         }
2896 
2897         free(big_block);
2898     }
2899 }
2900 
2901 void
MeasureDB(SNCStateStat & state)2902 CNCBlobStorage::MeasureDB(SNCStateStat& state)
2903 {
2904     state.db_files = Uint4(CNCBlobStorage::GetNDBFiles());
2905     state.db_size = s_CurDBSize;
2906     state.db_garb = s_GarbageSize;
2907     state.cnt_blobs = s_CurBlobsCnt;
2908     state.cnt_keys = s_CurKeysCnt;
2909     state.min_dead_time = numeric_limits<int>::max();
2910     ITERATE(TTimeBuckets, it, s_TimeTables) {
2911         STimeTable* table = it->second;
2912         table->lock.Lock();
2913         if (!table->time_map.empty()) {
2914 #if __NC_CACHEDATA_INTR_SET
2915             state.min_dead_time = min(state.min_dead_time,
2916                                       table->time_map.begin()->dead_time);
2917 #else
2918             state.min_dead_time = min(state.min_dead_time,
2919                                       (*table->time_map.begin())->dead_time);
2920 #endif
2921         }
2922         table->lock.Unlock();
2923     }
2924     if (state.min_dead_time == numeric_limits<int>::max())
2925         state.min_dead_time = 0;
2926 }
2927 
2928 void
x_DeleteIndexes(SNCDataCoord map_coord,Uint1 map_depth)2929 CBlobCacher::x_DeleteIndexes(SNCDataCoord map_coord, Uint1 map_depth)
2930 {
2931     CSrvRef<SNCDBFileInfo> file_info = s_GetDBFileNoLock(map_coord.file_id);
2932     SFileIndexRec* map_ind = s_GetIndexRec(file_info, map_coord.rec_num);
2933     s_DeleteIndexRec(file_info, map_ind);
2934     if (map_depth != 0) {
2935         Uint2 cnt_downs = s_CalcCntMapDowns(map_ind->rec_size);
2936         if (cnt_downs != 0) {
2937             SFileChunkMapRec* map_rec = s_CalcMapAddress(file_info, map_ind);
2938             for (Uint2 i = 0; i < cnt_downs; ++i) {
2939                 x_DeleteIndexes(map_rec->down_coords[i], map_depth - 1);
2940             }
2941         }
2942     }
2943 }
2944 
2945 bool
x_CacheMapRecs(SNCDataCoord map_coord,Uint1 map_depth,SNCDataCoord up_coord,Uint2 up_index,SNCCacheData * cache_data,Uint8 cnt_chunks,Uint8 & chunk_num,map<Uint4,Uint4> & sizes_map)2946 CBlobCacher::x_CacheMapRecs(SNCDataCoord map_coord,
2947                             Uint1 map_depth,
2948                             SNCDataCoord up_coord,
2949                             Uint2 up_index,
2950                             SNCCacheData* cache_data,
2951                             Uint8 cnt_chunks,
2952                             Uint8& chunk_num,
2953                             map<Uint4, Uint4>& sizes_map)
2954 {
2955     TNCDBFilesMap::const_iterator it_file = s_DBFiles->find(map_coord.file_id);
2956     if (it_file == s_DBFiles->end()) {
2957         SRV_LOG(Critical, "Cannot find file for the coord " << map_coord
2958                           << " which is referenced by blob " << cache_data->key
2959                           << ". Deleting blob.");
2960         return false;
2961     }
2962 
2963     SNCDBFileInfo* file_info = it_file->second.GetNCPointerOrNull();
2964     TRecNumsSet& recs_set = m_RecsMap[map_coord.file_id];
2965     TRecNumsSet::iterator it_recs = recs_set.find(map_coord.rec_num);
2966     if (it_recs == recs_set.end()) {
2967         SRV_LOG(Critical, "Blob " << cache_data->key
2968                           << " references record with coord " << map_coord
2969                           << ", but its index wasn't in live chain. Deleting blob.");
2970         return false;
2971     }
2972 
2973     SFileIndexRec* map_ind = s_GetIndexRec(file_info, map_coord.rec_num);
2974     if (map_ind->chain_coord != up_coord) {
2975         SRV_LOG(Critical, "Up_coord for map/data in blob " << cache_data->key
2976                           << " is incorrect: " << map_ind->chain_coord
2977                           << " when should be " << up_coord
2978                           << ". Correcting it.");
2979         map_ind->chain_coord = up_coord;
2980     }
2981     map_ind->cache_data = cache_data;
2982 
2983     Uint4 rec_size = map_ind->rec_size;
2984     if (rec_size & 7)
2985         rec_size += 8 - (rec_size & 7);
2986     sizes_map[map_coord.file_id] += rec_size + sizeof(SFileIndexRec);
2987 
2988     if (map_depth == 0) {
2989         if (map_ind->rec_type != eFileRecChunkData) {
2990             SRV_LOG(Critical, "Blob " << cache_data->key
2991                               << " with size " << cache_data->size
2992                               << " references record with coord " << map_coord
2993                               << " that has type " << int(map_ind->rec_type)
2994                               << " when it should be " << int(eFileRecChunkData)
2995                               << ". Deleting blob.");
2996             return false;
2997         }
2998         ++chunk_num;
2999         if (chunk_num > cnt_chunks) {
3000             SRV_LOG(Critical, "Blob " << cache_data->key
3001                               << " with size " << cache_data->size
3002                               << " has too many data chunks, should be " << cnt_chunks
3003                               << ". Deleting blob.");
3004             return false;
3005         }
3006         Uint4 data_size = s_CalcChunkDataSize(map_ind->rec_size);
3007         Uint4 need_size;
3008         if (chunk_num < cnt_chunks)
3009             need_size = cache_data->chunk_size;
3010         else
3011             need_size = (cache_data->size - 1) % cache_data->chunk_size + 1;
3012         if (data_size != need_size) {
3013             SRV_LOG(Critical, "Blob " << cache_data->key
3014                               << " with size " << cache_data->size
3015                               << " references data record with coord " << map_coord
3016                               << " that has data size " << data_size
3017                               << " when it should be " << need_size
3018                               << ". Deleting blob.");
3019             return false;
3020         }
3021     }
3022     else {
3023         if (map_ind->rec_type != eFileRecChunkMap) {
3024             SRV_LOG(Critical, "Blob " << cache_data->key
3025                               << " with size " << cache_data->size
3026                               << " references record with coord " << map_coord
3027                               << " at map_depth=" << map_depth
3028                               << ". Record has type " << int(map_ind->rec_type)
3029                               << " when it should be " << int(eFileRecChunkMap)
3030                               << ". Deleting blob.");
3031             return false;
3032         }
3033         SFileChunkMapRec* map_rec = s_CalcMapAddress(file_info, map_ind);
3034         if (map_rec->map_idx != up_index  ||  map_rec->map_depth != map_depth)
3035         {
3036             SRV_LOG(Critical, "Blob " << cache_data->key
3037                               << " with size " << cache_data->size
3038                               << " references map with coord " << map_coord
3039                               << " that has wrong index " << map_rec->map_idx
3040                               << " and/or map depth " << map_rec->map_depth
3041                               << ", they should be " << up_index
3042                               << " and " << map_depth
3043                               << ". Deleting blob.");
3044             return false;
3045         }
3046         Uint2 cnt_downs = s_CalcCntMapDowns(map_ind->rec_size);
3047         for (Uint2 i = 0; i < cnt_downs; ++i) {
3048             if (!x_CacheMapRecs(map_rec->down_coords[i], map_depth - 1,
3049                                 map_coord, i, cache_data, cnt_chunks,
3050                                 chunk_num, sizes_map))
3051             {
3052                 for (Uint2 j = 0; j < i; ++j) {
3053                     x_DeleteIndexes(map_rec->down_coords[j], map_depth - 1);
3054                 }
3055                 return false;
3056             }
3057         }
3058         if (chunk_num < cnt_chunks  &&  cnt_downs != cache_data->map_size) {
3059             SRV_LOG(Critical, "Blob " << cache_data->key
3060                               << " with size " << cache_data->size
3061                               << " references map record with coord " << map_coord
3062                               << " that has " << cnt_downs
3063                               << " children when it should be " << cache_data->map_size
3064                               << " because accumulated chunk_num=" << chunk_num
3065                               << " is less than total " << cnt_chunks
3066                               << ". Deleting blob.");
3067             return false;
3068         }
3069     }
3070 
3071     recs_set.erase(it_recs);
3072     return true;
3073 }
3074 
3075 bool
x_CacheMetaRec(SNCDBFileInfo * file_info,SFileIndexRec * ind_rec,SNCDataCoord coord)3076 CBlobCacher::x_CacheMetaRec(SNCDBFileInfo* file_info,
3077                             SFileIndexRec* ind_rec,
3078                             SNCDataCoord coord)
3079 {
3080     SFileMetaRec* meta_rec = s_CalcMetaAddress(file_info, ind_rec);
3081     if (meta_rec->has_password > 1  ||  meta_rec->dead_time < meta_rec->expire
3082         ||  meta_rec->dead_time < meta_rec->ver_expire
3083         ||  meta_rec->chunk_size == 0  ||  meta_rec->map_size == 0)
3084     {
3085         SRV_LOG(Critical, "Meta record in file " << file_info->file_name
3086                           << " at offset " << ind_rec->offset << " was corrupted."
3087                           << " passwd=" << meta_rec->has_password
3088                           << ", expire=" << meta_rec->expire
3089                           << ", ver_expire=" << meta_rec->ver_expire
3090                           << ", dead=" << meta_rec->dead_time
3091                           << ", chunk_size=" << meta_rec->chunk_size
3092                           << ", map_size=" << meta_rec->map_size
3093                           << ". Deleting it.");
3094         return false;
3095     }
3096     if (meta_rec->dead_time <= CSrvTime::CurSecs())
3097         return false;
3098 
3099     char* key_data = meta_rec->key_data;
3100     char* key_end = (char*)meta_rec + ind_rec->rec_size;
3101     if (meta_rec->has_password)
3102         key_data += 16;
3103     string key(key_data, key_end - key_data);
3104     int cnt = 0;
3105     ITERATE(string, it, key) {
3106         if (*it == '\1')
3107             ++cnt;
3108     }
3109     if (cnt != 2) {
3110         SRV_LOG(Critical, "Meta record in file " << file_info->file_name
3111                           << " at offset " << ind_rec->offset << " was corrupted."
3112                           << " key=" << key
3113                           << ", cnt_special=" << cnt
3114                           << ". Deleting it.");
3115         return false;
3116     }
3117     Uint2 slot = 0, time_bucket = 0;
3118     if (!CNCDistributionConf::GetSlotByKey(key, slot, time_bucket)) {
3119         SRV_LOG(Critical, "Could not extract slot number from key: " << key);
3120         return false;
3121     }
3122 
3123     SNCCacheData* cache_data = new SNCCacheData();
3124     cache_data->key = key;
3125     cache_data->coord = coord;
3126     cache_data->time_bucket = time_bucket;
3127     cache_data->size = meta_rec->size;
3128     cache_data->chunk_size = meta_rec->chunk_size;
3129     cache_data->map_size = meta_rec->map_size;
3130     cache_data->create_time = meta_rec->create_time;
3131     cache_data->create_server = meta_rec->create_server;
3132     cache_data->create_id = meta_rec->create_id;
3133     cache_data->dead_time = meta_rec->dead_time;
3134     cache_data->saved_dead_time = meta_rec->dead_time;
3135     cache_data->expire = meta_rec->expire;
3136     cache_data->ver_expire = meta_rec->ver_expire;
3137 
3138     if (meta_rec->size == 0) {
3139         if (!ind_rec->chain_coord.empty()) {
3140             SRV_LOG(Critical, "Index record " << (file_info->index_head - ind_rec)
3141                               << " in file " << file_info->file_name
3142                               << " was corrupted. size=0 but chain_coord="
3143                               << ind_rec->chain_coord << ". Ignoring that.");
3144             ind_rec->chain_coord.clear();
3145         }
3146     }
3147     else {
3148         Uint1 map_depth = s_CalcMapDepthImpl(meta_rec->size,
3149                                              meta_rec->chunk_size,
3150                                              meta_rec->map_size);
3151         if (map_depth > kNCMaxBlobMapsDepth) {
3152             SRV_LOG(Critical, "Meta record in file " << file_info->file_name
3153                               << " at offset " << ind_rec->offset << " was corrupted."
3154                               << ", size=" << meta_rec->size
3155                               << ", chunk_size=" << meta_rec->chunk_size
3156                               << ", map_size=" << meta_rec->map_size
3157                               << " (map depth is more than " << kNCMaxBlobMapsDepth
3158                               << "). Deleting it.");
3159             return false;
3160         }
3161         Uint8 cnt_chunks = (meta_rec->size - 1) / meta_rec->chunk_size + 1;
3162         Uint8 chunk_num = 0;
3163         typedef map<Uint4, Uint4> TSizesMap;
3164         TSizesMap sizes_map;
3165         if (!x_CacheMapRecs(ind_rec->chain_coord, map_depth, coord, 0, cache_data,
3166                             cnt_chunks, chunk_num, sizes_map))
3167         {
3168             delete cache_data;
3169             return false;
3170         }
3171         ITERATE(TSizesMap, it, sizes_map) {
3172             CSrvRef<SNCDBFileInfo> info = s_GetDBFileNoLock(it->first);
3173             info->used_size += it->second;
3174             if (info->garb_size < it->second) {
3175                 SRV_FATAL("Blob coords broken");
3176             }
3177             info->garb_size -= it->second;
3178             AtomicSub(s_GarbageSize, it->second);
3179         }
3180     }
3181     ind_rec->cache_data = cache_data;
3182     Uint4 rec_size = ind_rec->rec_size;
3183     if (rec_size & 7)
3184         rec_size += 8 - (rec_size & 7);
3185     rec_size += sizeof(SFileIndexRec);
3186     file_info->used_size += rec_size;
3187     if (file_info->garb_size < rec_size) {
3188         SRV_FATAL("Blob coords broken");
3189     }
3190     file_info->garb_size -= rec_size;
3191     AtomicSub(s_GarbageSize, rec_size);
3192 
3193     TBucketCacheMap::iterator it_bucket = s_BucketsCache.lower_bound(time_bucket);
3194     SBucketCache* bucket_cache;
3195     if (it_bucket == s_BucketsCache.end()  ||  it_bucket->first != time_bucket) {
3196         bucket_cache = new SBucketCache();
3197         s_BucketsCache.insert(it_bucket, TBucketCacheMap::value_type(time_bucket, bucket_cache));
3198     }
3199     else {
3200         bucket_cache = it_bucket->second;
3201     }
3202     STimeTable* time_table = s_TimeTables[time_bucket];
3203 #if __NC_CACHEDATA_INTR_SET
3204     TKeyMap::insert_commit_data commit_data;
3205     pair<TKeyMap::iterator, bool> ins_res =
3206         bucket_cache->key_map.insert_unique_check(key, SCacheKeyCompare(), commit_data);
3207     if (ins_res.second) {
3208         bucket_cache->key_map.insert_unique_commit(*cache_data, commit_data);
3209         ++s_CurKeysCnt;
3210     }
3211 #else
3212     TKeyMap::iterator ins_res = bucket_cache->key_map.find(cache_data);
3213     if (ins_res == bucket_cache->key_map.end()) {
3214         bucket_cache->key_map.insert(cache_data);
3215         ++s_CurKeysCnt;
3216     }
3217 #endif
3218     else {
3219 #if __NC_CACHEDATA_INTR_SET
3220         SNCCacheData* old_data = &*ins_res.first;
3221         bucket_cache->key_map.erase(ins_res.first);
3222         bucket_cache->key_map.insert_equal(*cache_data);
3223 #else
3224         SNCCacheData* old_data = *ins_res;
3225         bucket_cache->key_map.erase(old_data);
3226         bucket_cache->key_map.insert(cache_data);
3227 #endif
3228 #if __NC_CACHEDATA_ALL_MONITOR
3229         s_AllCache[time_bucket]->all_cache_set.erase(old_data);
3230 #endif
3231         --s_CurBlobsCnt;
3232 #if __NC_CACHEDATA_INTR_SET
3233         time_table->time_map.erase(time_table->time_map.iterator_to(*old_data));
3234 #else
3235         time_table->time_map.erase(old_data);
3236 #endif
3237 #ifdef _DEBUG
3238         if (old_data->Get_ver_mgr() || old_data->ref_cnt.Get() != 0) {
3239             abort();
3240         }
3241 #endif
3242         if (!old_data->coord.empty()) {
3243             CSrvRef<SNCDBFileInfo> old_file = s_GetDBFileNoLock(old_data->coord.file_id);
3244             SFileIndexRec* old_ind = s_GetIndexRec(old_file, old_data->coord.rec_num);
3245             SFileMetaRec* old_rec = s_CalcMetaAddress(old_file, old_ind);
3246             if (old_rec->size != 0  &&  old_ind->chain_coord != ind_rec->chain_coord && !old_ind->chain_coord.empty()) {
3247                 Uint1 map_depth = s_CalcMapDepth(old_rec->size,
3248                                                  old_rec->chunk_size,
3249                                                  old_rec->map_size);
3250                 s_MoveDataToGarbage(old_ind->chain_coord, map_depth, old_data->coord, false);
3251             }
3252             s_MoveRecToGarbage(old_file, old_ind);
3253             old_data->coord.clear();
3254         }
3255         delete old_data;
3256     }
3257 #if __NC_CACHEDATA_INTR_SET
3258     time_table->time_map.insert_equal(*cache_data);
3259 #else
3260     time_table->time_map.insert(cache_data);
3261 #endif
3262 #if __NC_CACHEDATA_ALL_MONITOR
3263     s_AllCache[time_bucket]->all_cache_set.insert(cache_data);
3264 #endif
3265     ++s_CurBlobsCnt;
3266 
3267     return true;
3268 }
3269 
3270 CBlobCacher::State
x_StartCreateFiles(void)3271 CBlobCacher::x_StartCreateFiles(void)
3272 {
3273     m_CurCreatePass = 0;
3274     m_CurCreateFile = 0;
3275     return &CBlobCacher::x_CreateInitialFile;
3276 }
3277 
3278 CBlobCacher::State
x_CreateInitialFile(void)3279 CBlobCacher::x_CreateInitialFile(void)
3280 {
3281     if (CTaskServer::IsInShutdown())
3282         return &CBlobCacher::x_CancelCaching;
3283     if (m_CurCreatePass == 1  &&  m_CurCreateFile >= s_CntAllFiles)
3284         return &CBlobCacher::x_StartCacheBlobs;
3285 
3286     if (m_CurCreateFile < s_CntAllFiles) {
3287         if (!s_CreateNewFile(m_CurCreateFile, false))
3288             return &CBlobCacher::x_DelFileAndRetryCreate;
3289 
3290         m_NewFileIds.insert(s_AllWritings[m_CurCreateFile].next_file->file_id);
3291         ++m_CurCreateFile;
3292     }
3293     else {
3294         s_NextWriteLock.Lock();
3295         for (size_t i = 0; i < s_CntAllFiles; ++i) {
3296             s_SwitchToNextFile(s_AllWritings[i]);
3297         }
3298         s_NextWriteLock.Unlock();
3299         m_CurCreateFile = 0;
3300         ++m_CurCreatePass;
3301     }
3302 
3303     // Yield execution to other tasks
3304     SetRunnable();
3305     return NULL;
3306 }
3307 
3308 CBlobCacher::State
x_DelFileAndRetryCreate(void)3309 CBlobCacher::x_DelFileAndRetryCreate(void)
3310 {
3311     if (s_DBFiles->empty()) {
3312         SRV_LOG(Critical, "Cannot create initial database files.");
3313         GetDiagCtx()->SetRequestStatus(eStatus_ServerError);
3314         CTaskServer::RequestShutdown(eSrvFastShutdown);
3315         return &CBlobCacher::x_CancelCaching;
3316     }
3317 
3318     CSrvRef<SNCDBFileInfo> file_to_del;
3319     ITERATE(TNCDBFilesMap, it, (*s_DBFiles)) {
3320         CSrvRef<SNCDBFileInfo> file_info = it->second;
3321         if (file_info->file_type == eDBFileData) {
3322             file_to_del = file_info;
3323             break;
3324         }
3325     }
3326     if (!file_to_del)
3327         file_to_del = s_DBFiles->begin()->second;
3328     s_DeleteDBFile(file_to_del, false);
3329 
3330     // Yield execution to other tasks
3331     SetState(&CBlobCacher::x_CreateInitialFile);
3332 // file creation failed; probably, makes sense to wait longer
3333 // instead of failing again and again, like thousands times per sec
3334 //    SetRunnable();
3335     RunAfter(1);
3336 
3337     return NULL;
3338 }
3339 
3340 CBlobCacher::State
x_PreCacheRecNums(void)3341 CBlobCacher::x_PreCacheRecNums(void)
3342 {
3343     if (CTaskServer::IsInShutdown())
3344         return &CBlobCacher::x_CancelCaching;
3345     if (m_CurFile == s_DBFiles->end())
3346         return &CBlobCacher::x_StartCreateFiles;
3347 
3348     SNCDBFileInfo* const file_info = m_CurFile->second.GetNCPointerOrNull();
3349     TRecNumsSet& recs_set = m_RecsMap[file_info->file_id];
3350 
3351     AtomicAdd(s_CurDBSize,  file_info->file_size);
3352     // Non-garbage is left the same as in x_CreateNewFile
3353     Uint4 garb_size = file_info->file_size
3354                       - (kSignatureSize + sizeof(SFileIndexRec));
3355     file_info->garb_size += garb_size;
3356     AtomicAdd(s_GarbageSize,garb_size);
3357 
3358     SFileIndexRec* ind_rec = file_info->index_head;
3359     Uint4 prev_rec_num = 0;
3360     char* min_ptr = file_info->file_map + kSignatureSize;
3361     while (!CTaskServer::IsInShutdown()  &&  ind_rec->next_num != 0) {
3362         Uint4 rec_num = ind_rec->next_num;
3363         if (rec_num <= prev_rec_num) {
3364             SRV_LOG(Critical, "File " << file_info->file_name
3365                               << " contains wrong next_num=" << rec_num
3366                               << " (it's not greater than current " << prev_rec_num
3367                               << "). Won't cache anything else from this file.");
3368             goto wrap_index_and_return;
3369         }
3370         SFileIndexRec* next_ind;
3371         next_ind = file_info->index_head - rec_num;
3372         if ((char*)next_ind < min_ptr  ||  next_ind >= ind_rec) {
3373             SRV_LOG(Critical, "File " << file_info->file_name
3374                               << " contains wrong next_num=" << rec_num
3375                               << " in index record " << (file_info->index_head - ind_rec)
3376                               << ". It produces pointer " << (void*)next_ind
3377                               << " which is not in the range between " << (void*)min_ptr
3378                               << " and " << (void*)ind_rec
3379                               << ". Won't cache anything else from this file.");
3380             goto wrap_index_and_return;
3381         }
3382         char* next_rec_start = file_info->file_map + next_ind->offset;
3383         if (next_rec_start < min_ptr  ||  next_rec_start > (char*)next_ind
3384             ||  (rec_num == 1  &&  next_rec_start != min_ptr))
3385         {
3386             SRV_LOG(Critical, "File " << file_info->file_name
3387                               << " contains wrong offset=" << ind_rec->offset
3388                               << " in index record " << rec_num
3389                               << ", resulting ptr " << (void*)next_rec_start
3390                               << " which is not in the range " << (void*)min_ptr
3391                               << " and " << (void*)next_ind
3392                               << ". This record will be ignored.");
3393             goto ignore_rec_and_continue;
3394         }
3395         char* next_rec_end;
3396         next_rec_end = next_rec_start + next_ind->rec_size;
3397         if (next_rec_end < next_rec_start  ||  next_rec_end > (char*)next_ind) {
3398             SRV_LOG(Critical, "File " << file_info->file_name
3399                               << " contains wrong rec_size=" << next_ind->rec_size
3400                               << " for offset " << next_ind->offset
3401                               << " in index record " << rec_num
3402                               << " (resulting end ptr " << (void*)next_rec_end
3403                               << " is greater than index record " << (void*)next_ind
3404                               << "). This record will be ignored.");
3405             goto ignore_rec_and_continue;
3406         }
3407         if (next_ind->prev_num != prev_rec_num)
3408             next_ind->prev_num = prev_rec_num;
3409         switch (next_ind->rec_type) {
3410         case eFileRecChunkData:
3411             if (file_info->file_type != eDBFileData) {
3412                 // This is not an error. It can be a result of failed move
3413                 // attempt.
3414                 goto ignore_rec_and_continue;
3415             }
3416             if (next_ind->rec_size < sizeof(SFileChunkDataRec)) {
3417                 SRV_LOG(Critical, "File " << file_info->file_name
3418                                   << " contains wrong rec_size=" << next_ind->rec_size
3419                                   << " for offset " << next_ind->offset
3420                                   << " in index record " << rec_num
3421                                   << ", rec_type=" << int(next_ind->rec_type)
3422                                   << ", min_size=" << sizeof(SFileChunkDataRec)
3423                                   << ". This record will be ignored.");
3424                 goto ignore_rec_and_continue;
3425             }
3426             break;
3427         case eFileRecChunkMap:
3428             if (file_info->file_type != eDBFileMaps)
3429                 goto bad_rec_type;
3430             if (next_ind->rec_size < sizeof(SFileChunkMapRec)) {
3431                 SRV_LOG(Critical, "File " << file_info->file_name
3432                                   << " contains wrong rec_size=" << next_ind->rec_size
3433                                   << " for offset " << next_ind->offset
3434                                   << " in index record " << rec_num
3435                                   << ", rec_type=" << int(next_ind->rec_type)
3436                                   << ", min_size=" << sizeof(SFileChunkMapRec)
3437                                   << ". This record will be ignored.");
3438                 goto ignore_rec_and_continue;
3439             }
3440             break;
3441         case eFileRecMeta:
3442             if (file_info->file_type != eDBFileMeta)
3443                 goto bad_rec_type;
3444             SFileMetaRec* meta_rec;
3445             meta_rec = (SFileMetaRec*)next_rec_start;
3446             Uint4 min_rec_size;
3447             // Minimum key length is 2, so we are adding 1 below
3448             min_rec_size = sizeof(SFileChunkMapRec) + 1;
3449             if (meta_rec->has_password)
3450                 min_rec_size += 16;
3451             if (next_ind->rec_size < min_rec_size) {
3452                 SRV_LOG(Critical, "File " << file_info->file_name
3453                                   << " contains wrong rec_size=" << next_ind->rec_size
3454                                   << " for offset " << next_ind->offset
3455                                   << " in index record " << rec_num
3456                                   << ", rec_type=" << int(next_ind->rec_type)
3457                                   << ", min_size=" << min_rec_size
3458                                   << ". This record will be ignored.");
3459                 goto ignore_rec_and_continue;
3460             }
3461             break;
3462         default:
3463 bad_rec_type:
3464             SRV_LOG(Critical, "File " << file_info->file_name
3465                               << " with type " << int(file_info->file_type)
3466                               << " contains wrong rec_type=" << next_ind->rec_type
3467                               << " in index record " << rec_num
3468                               << "). This record will be ignored.");
3469             goto ignore_rec_and_continue;
3470         }
3471 
3472         recs_set.insert(rec_num);
3473         min_ptr = next_rec_end;
3474         ind_rec = next_ind;
3475         prev_rec_num = rec_num;
3476         continue;
3477 
3478 ignore_rec_and_continue:
3479         ind_rec->next_num = next_ind->next_num;
3480     }
3481     goto finalize_and_return;
3482 
3483 wrap_index_and_return:
3484     ind_rec->next_num = 0;
3485 
3486 finalize_and_return:
3487     s_LockFileMem(ind_rec, (prev_rec_num + 1) * sizeof(SFileIndexRec));
3488 // to next file
3489     ++m_CurFile;
3490     SetRunnable();
3491     return NULL;
3492 }
3493 
3494 CBlobCacher::State
x_StartCaching(void)3495 CBlobCacher::x_StartCaching(void)
3496 {
3497     CreateNewDiagCtx();
3498     CSrvDiagMsg().StartRequest().PrintParam("_type", "caching");
3499 
3500     s_DBFilesLock.Lock();
3501     m_CurFile = s_DBFiles->begin();
3502     return &CBlobCacher::x_PreCacheRecNums;
3503 }
3504 
3505 CBlobCacher::State
x_CancelCaching(void)3506 CBlobCacher::x_CancelCaching(void)
3507 {
3508     s_DBFilesLock.Unlock();
3509     if (GetDiagCtx()->GetRequestStatus() == eStatus_OK)
3510         GetDiagCtx()->SetRequestStatus(eStatus_ShuttingDown);
3511     CSrvDiagMsg().StopRequest();
3512     ReleaseDiagCtx();
3513 
3514     Terminate();
3515     return NULL;
3516 }
3517 
3518 CBlobCacher::State
x_FinishCaching(void)3519 CBlobCacher::x_FinishCaching(void)
3520 {
3521     s_DBFilesLock.Unlock();
3522     CSrvDiagMsg().StopRequest();
3523     ReleaseDiagCtx();
3524 
3525     s_DiskFlusher->SetRunnable();
3526     s_RecNoSaver->SetRunnable();
3527     s_SpaceShrinker->SetRunnable();
3528     s_ExpiredCleaner->SetRunnable();
3529 
3530     CNCServer::CachingCompleted();
3531 
3532     Terminate();
3533     return NULL;
3534 }
3535 
3536 CBlobCacher::State
x_StartCacheBlobs(void)3537 CBlobCacher::x_StartCacheBlobs(void)
3538 {
3539     m_CurFile = s_DBFiles->begin();
3540     return &CBlobCacher::x_CacheNextFile;
3541 }
3542 
3543 CBlobCacher::State
x_CacheNextFile(void)3544 CBlobCacher::x_CacheNextFile(void)
3545 {
3546 try_next_file:
3547     if (CTaskServer::IsInShutdown())
3548         return &CBlobCacher::x_CancelCaching;
3549     if (m_CurFile == s_DBFiles->end())
3550         return &CBlobCacher::x_CleanOrphanRecs;
3551 
3552     SNCDBFileInfo* file_info = m_CurFile->second.GetNCPointerOrNull();
3553     if (file_info->file_type != eDBFileMeta
3554         ||  m_NewFileIds.find(file_info->file_id) != m_NewFileIds.end())
3555     {
3556         ++m_CurFile;
3557         goto try_next_file;
3558     }
3559     m_CurRecsSet = &m_RecsMap[file_info->file_id];
3560     m_CurRecIt = m_CurRecsSet->begin();
3561     return &CBlobCacher::x_CacheNextRecord;
3562 }
3563 
3564 CBlobCacher::State
x_CacheNextRecord(void)3565 CBlobCacher::x_CacheNextRecord(void)
3566 {
3567     if (CTaskServer::IsInShutdown())
3568         return &CBlobCacher::x_CancelCaching;
3569     if (m_CurRecIt == m_CurRecsSet->end()) {
3570         m_CurRecsSet->clear();
3571         ++m_CurFile;
3572         return &CBlobCacher::x_CacheNextFile;
3573     }
3574 
3575     Uint4 rec_num = *m_CurRecIt;
3576     SNCDBFileInfo* file_info = m_CurFile->second.GetNCPointerOrNull();
3577     SNCDataCoord coord;
3578     coord.file_id = file_info->file_id;
3579     coord.rec_num = rec_num;
3580     SFileIndexRec* ind_rec = s_GetIndexRec(file_info, rec_num);
3581     if (!x_CacheMetaRec(file_info, ind_rec, coord))
3582         s_DeleteIndexRec(file_info, ind_rec);
3583 
3584     ++m_CurRecIt;
3585     SetRunnable();
3586     return NULL;
3587 }
3588 
3589 CBlobCacher::State
x_CleanOrphanRecs(void)3590 CBlobCacher::x_CleanOrphanRecs(void)
3591 {
3592     ITERATE(TFileRecsMap, it_id, m_RecsMap) {
3593         Uint4 file_id = it_id->first;
3594         const TRecNumsSet& recs_set = it_id->second;
3595         TNCDBFilesMap::iterator it_file = s_DBFiles->find(file_id);
3596         if (it_file == s_DBFiles->end()) {
3597             // File could be deleted when we tried to create initial files.
3598             continue;
3599         }
3600         SNCDBFileInfo* file_info = it_file->second;
3601         TRecNumsSet::iterator it_rec = recs_set.begin();
3602         for (; it_rec != recs_set.end(); ++it_rec) {
3603             SFileIndexRec* ind_rec = s_GetIndexRec(file_info, *it_rec);
3604             s_DeleteIndexRec(file_info, ind_rec);
3605         }
3606     }
3607 
3608     return &CBlobCacher::x_FinishCaching;
3609 }
3610 
CBlobCacher(void)3611 CBlobCacher::CBlobCacher(void)
3612 {
3613 #if __NC_TASKS_MONITOR
3614     m_TaskName = "CBlobCacher";
3615 #endif
3616     SetState(&CBlobCacher::x_StartCaching);
3617 }
3618 
~CBlobCacher(void)3619 CBlobCacher::~CBlobCacher(void)
3620 {}
3621 
3622 
3623 void
CheckDiskSpace(void)3624 CNCBlobStorage::CheckDiskSpace(void)
3625 {
3626     Int8 free_space = GetDiskFree();
3627     Int8 allowed_db_size = GetAllowedDBSize(free_space);
3628     Int8 cur_db_size = s_CurDBSize;
3629 
3630     if (s_IsStopWrite == eNoStop
3631         &&  cur_db_size * 100 >= allowed_db_size * s_WarnLimitOnPct)
3632     {
3633         string msg("Current db size is "   + g_ToSizeStr(cur_db_size) +
3634                    ", allowed db size is " + g_ToSizeStr(allowed_db_size) +
3635                    " (" + g_ToSmartStr(cur_db_size * 100 / allowed_db_size) + "%)");
3636         CNCAlerts::Register(CNCAlerts::eDatabaseTooLarge, msg);
3637         ERR_POST(Critical << "ALERT! Database is too large. " << msg);
3638         s_IsStopWrite = eStopWarning;
3639         Logging_DiskSpaceAlert();
3640     }
3641 
3642     if (s_IsStopWrite == eStopWarning) {
3643         if (s_StopWriteOnSize != 0  &&  cur_db_size >= s_StopWriteOnSize) {
3644             s_IsStopWrite = eStopDBSize;
3645             string msg("Current db size is "  + g_ToSizeStr(cur_db_size) +
3646                        ", stopwrite size is " + g_ToSizeStr(s_StopWriteOnSize) +
3647                        " (" + g_ToSmartStr(cur_db_size * 100 / s_StopWriteOnSize) + "%)");
3648             CNCAlerts::Register(CNCAlerts::eDatabaseOverLimit, msg);
3649             ERR_POST(Critical << "Database size exceeded its limit. "
3650                               << msg << ". Will no longer accept any writes from clients.");
3651         }
3652     }
3653     else if (s_IsStopWrite ==  eStopDBSize  &&  cur_db_size <= s_StopWriteOffSize)
3654     {
3655         s_IsStopWrite = eStopWarning;
3656     }
3657     if (free_space <= s_DiskCritical) {
3658         if (s_IsStopWrite < eStopDiskCritical) {
3659             s_IsStopWrite = eStopDiskCritical;
3660             string msg("free "    + g_ToSizeStr(free_space) +
3661                        ", limit " + g_ToSizeStr(s_DiskCritical));
3662             CNCAlerts::Register(CNCAlerts::eDiskSpaceCritical, msg);
3663             ERR_POST(Critical << "Free disk space is below CRITICAL threshold: "
3664                               << msg << ". Will no longer accept any writes.");
3665         }
3666     }
3667     else if (free_space <= s_DiskFreeLimit) {
3668         if (s_IsStopWrite < eStopDiskSpace) {
3669             s_IsStopWrite = eStopDiskSpace;
3670             string msg("free "    + g_ToSizeStr(free_space) +
3671                        ", limit " + g_ToSizeStr(s_DiskFreeLimit));
3672             CNCAlerts::Register(CNCAlerts::eDiskSpaceLow, msg);
3673             ERR_POST(Critical << "Free disk space is below threshold: "
3674                               << msg << ". Will no longer accept any writes from clients.");
3675         }
3676     }
3677     else if (s_IsStopWrite == eStopDiskSpace || s_IsStopWrite == eStopDiskCritical)
3678     {
3679         s_IsStopWrite = eStopWarning;
3680     }
3681 
3682     if (s_IsStopWrite == eStopWarning
3683         &&  cur_db_size * 100 < allowed_db_size * s_WarnLimitOffPct)
3684     {
3685         string msg("Current db size is "   + g_ToSizeStr(cur_db_size) +
3686                    ", allowed db size is " + g_ToSizeStr(allowed_db_size) +
3687                    " (" + g_ToSmartStr(cur_db_size * 100 / allowed_db_size) + "%)");
3688         CNCAlerts::Register(CNCAlerts::eDiskSpaceNormal, msg);
3689         ERR_POST(Critical << "OK. Database is back to normal size. " << msg);
3690         s_IsStopWrite = eNoStop;
3691     }
3692 }
3693 
3694 
3695 CExpiredCleaner::State
x_DeleteNextData(void)3696 CExpiredCleaner::x_DeleteNextData(void)
3697 {
3698     if (CTaskServer::IsInShutdown())
3699         return NULL;
3700 
3701     if (m_CurDelData >= m_CacheDatas.size()) {
3702         if (m_BatchSize < s_GCBatchSize)
3703             ++m_CurBucket;
3704         m_CacheDatas.clear();
3705         return &CExpiredCleaner::x_CleanNextBucket;
3706     }
3707 
3708     SNCCacheData* cache_data = m_CacheDatas[m_CurDelData];
3709 
3710     cache_data->lock.Lock();
3711     {
3712         CSrvDiagMsg diag_msg;
3713         GetDiagCtx()->SetRequestID();
3714         diag_msg.StartRequest().PrintParam("_type", "expiration");
3715         CNCBlobKeyLight key(cache_data->key);
3716         if (key.IsICacheKey()) {
3717             diag_msg.PrintParam("cache",key.Cache()).PrintParam("key",key.RawKey()).PrintParam("subkey",key.SubKey());
3718         } else {
3719             diag_msg.PrintParam("key",key.RawKey());
3720         }
3721         diag_msg.Flush();
3722         diag_msg.StopRequest();
3723     }
3724     SNCDataCoord coord = cache_data->coord;
3725     Uint8 size = cache_data->size;
3726     Uint4 chunk_size = cache_data->chunk_size;
3727     Uint2 map_size = cache_data->map_size;
3728     cache_data->coord.clear();
3729     cache_data->dead_time = 0;
3730     CNCBlobVerManager* mgr = cache_data->Get_ver_mgr();
3731     if (mgr) {
3732         mgr->ObtainReference();
3733     } else {
3734         CNCBlobStorage::ChangeCacheDeadTime(cache_data);
3735     }
3736 
3737     if (!coord.empty()) {
3738         CSrvRef<SNCDBFileInfo> meta_file = s_GetDBFileTry(coord.file_id);
3739         if (meta_file.NotNull()) {
3740             SFileIndexRec* meta_ind = s_GetIndexRecTry(meta_file, coord.rec_num);
3741             if (meta_ind) {
3742                 s_CalcMetaAddress(meta_file, meta_ind);
3743                 if (!meta_ind->chain_coord.empty()) {
3744                     Uint1 map_depth = s_CalcMapDepth(size, chunk_size, map_size);
3745                     s_MoveDataToGarbage(meta_ind->chain_coord, map_depth, coord, true);
3746                 }
3747                 s_MoveRecToGarbage(meta_file, meta_ind);
3748             }
3749 #ifdef _DEBUG
3750             else {
3751 CNCAlerts::Register(CNCAlerts::eDebugDeleteNextData2, "DeleteNextData: meta_ind");
3752             }
3753 #endif
3754         }
3755 #ifdef _DEBUG
3756         else {
3757 CNCAlerts::Register(CNCAlerts::eDebugDeleteNextData1, "DeleteNextData: meta_file");
3758         }
3759 #endif
3760     }
3761     cache_data->lock.Unlock();
3762 
3763     if (mgr) {
3764         mgr->DeleteDeadVersion(m_NextDead);
3765         mgr->Release();
3766     }
3767     CNCBlobStorage::ReleaseCacheData(cache_data);
3768 
3769     ++m_CurDelData;
3770     SetRunnable();
3771     return NULL;
3772 }
3773 
x_DeleteData(SNCCacheData * cache_data)3774 void CExpiredCleaner::x_DeleteData(SNCCacheData* cache_data)
3775 {
3776 // cache_data is locked by caller
3777     SNCDataCoord coord = cache_data->coord;
3778     Uint8 size = cache_data->size;
3779     Uint4 chunk_size = cache_data->chunk_size;
3780     Uint2 map_size = cache_data->map_size;
3781     cache_data->coord.clear();
3782     if (!coord.empty()) {
3783 #ifdef _DEBUG
3784 CNCAlerts::Register(CNCAlerts::eDebugDeleteVersionData, "x_DeleteData");
3785 #endif
3786         CSrvRef<SNCDBFileInfo> meta_file = s_GetDBFileTry(coord.file_id);
3787         if (meta_file.NotNull()) {
3788             SFileIndexRec* meta_ind = s_GetIndexRecTry(meta_file, coord.rec_num);
3789             if (meta_ind) {
3790                 s_CalcMetaAddress(meta_file, meta_ind);
3791                 if (!meta_ind->chain_coord.empty()) {
3792                     Uint1 map_depth = s_CalcMapDepth(size, chunk_size, map_size);
3793                     s_MoveDataToGarbage(meta_ind->chain_coord, map_depth, coord, true);
3794                 }
3795                 s_MoveRecToGarbage(meta_file, meta_ind);
3796             }
3797 #ifdef _DEBUG
3798             else {
3799 CNCAlerts::Register(CNCAlerts::eDebugDeleteNextData2, "DeleteNextData: meta_ind");
3800             }
3801 #endif
3802         }
3803 #ifdef _DEBUG
3804         else {
3805 CNCAlerts::Register(CNCAlerts::eDebugDeleteNextData1, "DeleteNextData: meta_file");
3806         }
3807 #endif
3808     }
3809 }
3810 
3811 CExpiredCleaner::State
x_StartSession(void)3812 CExpiredCleaner::x_StartSession(void)
3813 {
3814     m_StartTime = CSrvTime::CurSecs();
3815     m_NextDead = m_StartTime;
3816     if (m_DoExtraGC) {
3817         if (s_CurDBSize <= s_ExtraGCOffSize) {
3818             m_DoExtraGC = false;
3819         }
3820         else {
3821             ++m_ExtraGCTime;
3822             m_NextDead += m_ExtraGCTime;
3823         }
3824     }
3825     else if (s_ExtraGCOnSize != 0
3826              &&  s_CurDBSize >= s_ExtraGCOnSize)
3827     {
3828         m_DoExtraGC = true;
3829         m_ExtraGCTime = 1;
3830         m_NextDead += m_ExtraGCTime;
3831     }
3832 
3833     m_CurBucket = 1;
3834     CreateNewDiagCtx();
3835     return &CExpiredCleaner::x_CleanNextBucket;
3836 }
3837 
// State handler: collects the next batch of expired blobs from the time
// table of bucket m_CurBucket into m_CacheDatas.
//
// Walks the bucket's time_map from its beginning while holding the table
// lock and stops when s_GCBatchSize entries were examined or an entry whose
// saved_dead_time is not below m_NextDead is met. Entries whose dead_time
// changed since they were put into the map are re-inserted under the new
// value; entries with dead_time == 0 are stepped over without being queued.
//
// Returns x_FinishSession when all buckets are done, x_DeleteNextData when
// at least one entry was examined, NULL otherwise (after advancing to the
// next bucket and re-arming the task, or on shutdown).
CExpiredCleaner::State
CExpiredCleaner::x_CleanNextBucket(void)
{
    if (CTaskServer::IsInShutdown())
        return NULL;
    // if all buckets clean, finish
    if (m_CurBucket > CNCDistributionConf::GetCntTimeBuckets())
        return &CExpiredCleaner::x_FinishSession;

    // m_BatchSize counts examined entries, not only the queued ones
    m_BatchSize = 0;
    // s_TimeTables has blob info sorted by dead_time
    STimeTable* table = s_TimeTables[m_CurBucket];
    TTimeTableMap& time_map = table->time_map;
    table->lock.Lock();
    TTimeTableMap::iterator it = time_map.begin();
    // last entry the loop has stepped over; used to re-position the iterator
    // after the map was modified
    SNCCacheData* last_data = NULL;
    for (; it != time_map.end(); ++m_BatchSize) {
        if (m_BatchSize >= s_GCBatchSize)
            break;

#if __NC_CACHEDATA_INTR_SET
        SNCCacheData* cache_data = &*it;
#else
        SNCCacheData* cache_data = *it;
#endif
        // first entry that is not below the threshold ends the scan
        if (cache_data->saved_dead_time >= m_NextDead)
            break;

        // read dead_time once; this code does not take the blob's own lock
        int dead_time = ACCESS_ONCE(cache_data->dead_time);
        if (dead_time != 0) {
            // dead_time has changed, put blob back
            if (dead_time != cache_data->saved_dead_time) {
#if __NC_CACHEDATA_INTR_SET
                time_map.erase(time_map.iterator_to(*cache_data));
                cache_data->saved_dead_time = dead_time;
                time_map.insert_equal(*cache_data);
                // resume right after the last entry already stepped over
                if (last_data) {
                    it = time_map.iterator_to(*last_data);
                    ++it;
                }
                else
                    it = time_map.begin();
#else
                time_map.erase(cache_data);
                cache_data->saved_dead_time = dead_time;
                time_map.insert(cache_data);
                // resume right after the last entry already stepped over
                if (last_data) {
                    it = time_map.find(last_data);
                    ++it;
                } else {
                    it = time_map.begin();
                }
#endif
                // note: the for-statement still increments m_BatchSize here
                continue;
            }
            // increment ref counter
            CNCBlobStorage::ReferenceCacheData(cache_data);
            m_CacheDatas.push_back(cache_data);
        }
        last_data = cache_data;
        ++it;
    }
    table->lock.Unlock();

    if (m_BatchSize == 0) {
        // goto next bucket
        ++m_CurBucket;
        SetRunnable();
        return NULL;
    }

    m_CurDelData = 0;
    return &CExpiredCleaner::x_DeleteNextData;
}
3912 
3913 CExpiredCleaner::State
x_FinishSession(void)3914 CExpiredCleaner::x_FinishSession(void)
3915 {
3916     ReleaseDiagCtx();
3917     SetState(&CExpiredCleaner::x_StartSession);
3918     if (!CTaskServer::IsInShutdown()) {
3919         if (CSrvTime::CurSecs() == m_StartTime)
3920             RunAfter(1);
3921         else
3922             SetRunnable();
3923     }
3924     return NULL;
3925 }
3926 
CExpiredCleaner(void)3927 CExpiredCleaner::CExpiredCleaner(void)
3928     : m_DoExtraGC(false)
3929 {
3930 #if __NC_TASKS_MONITOR
3931     m_TaskName = "CExpiredCleaner";
3932 #endif
3933     SetState(&CExpiredCleaner::x_StartSession);
3934 }
3935 
~CExpiredCleaner(void)3936 CExpiredCleaner::~CExpiredCleaner(void)
3937 {}
3938 
3939 
// State handler: relocates the record described by m_IndRec / m_RecNum in
// m_MaxFile to a newly reserved slot and re-points every link that referred
// to the old coordinates.
//
// Outline:
//   1. reserve a slot via s_GetNextWriteCoord() and copy the record bytes;
//   2. validate the links around the record (up-link for chunk/map records,
//      down-links for map records, the chain link for meta records);
//   3. when there is no current version (m_CurVer is null) take
//      m_CacheData->lock and re-check that nothing changed meanwhile;
//   4. switch the links over to the new coordinates and retire the old
//      record.
//
// On any bail-out the freshly written copy is sent to garbage instead
// (labels unlock_and_wipe / wipe_new_record). m_Failed is set only on some
// of the bail-out paths.
//
// Always returns x_FinishMoveRecord.
CSpaceShrinker::State
CSpaceShrinker::x_MoveRecord(void)
{
    CSrvRef<SNCDBFileInfo> chain_file;
    SFileIndexRec* chain_ind = NULL;
    SFileChunkMapRec* map_rec = NULL;
    SFileChunkMapRec* up_map = NULL;
    Uint2 up_index = Uint2(-1);

    // coordinates the record has right now
    SNCDataCoord old_coord;
    old_coord.file_id = m_MaxFile->file_id;
    old_coord.rec_num = m_RecNum;

    SNCDataCoord new_coord;
    CSrvRef<SNCDBFileInfo> new_file;
    SFileIndexRec* new_ind;
    if (!s_GetNextWriteCoord(m_MaxFile->type_index,
                             m_IndRec->rec_size, new_coord, new_file, new_ind))
    {
#ifdef _DEBUG
CNCAlerts::Register(CNCAlerts::eDebugMoveRecord0,"s_GetNextWriteCoord");
#endif
        m_Failed = true;
        return &CSpaceShrinker::x_FinishMoveRecord;
    }

    // raw copy of the record payload into the new slot
    memcpy(new_file->file_map + new_ind->offset,
           m_MaxFile->file_map + m_IndRec->offset,
           m_IndRec->rec_size);

    new_ind->cache_data = m_CacheData;
    new_ind->rec_type = m_IndRec->rec_type;
    // snapshot of the chain link; compared against m_IndRec->chain_coord
    // again later under the cache_data lock
    SNCDataCoord chain_coord = m_IndRec->chain_coord;
    new_ind->chain_coord = chain_coord;

    if (!chain_coord.empty()) {
        chain_file = s_GetDBFileTry(chain_coord.file_id);
        if (chain_file.IsNull()) {
            goto wipe_new_record;
        }
        chain_ind = s_GetIndOrDeleted(chain_file, chain_coord.rec_num);
        if (s_IsIndexDeleted(chain_file, chain_ind)
            ||  chain_file->cnt_unfinished.Get() != 0)
        {
            goto wipe_new_record;
        }

        // Checks below are not just checks for corruption but also
        // an opportunity to fault in all database pages that will be necessary
        // to make all changes. This way we minimize the time that cache_data
        // lock will be held.
        switch (m_IndRec->rec_type) {
        case eFileRecMeta:
            // the record a meta links to must link back to this meta
            if (chain_ind->chain_coord != old_coord
                &&  !s_IsIndexDeleted(m_MaxFile, m_IndRec))
            {
                DB_CORRUPTED("Meta with coord " << old_coord
                             << " links down to record with coord " << new_ind->chain_coord
                             << " but it has up coord " << chain_ind->chain_coord);
            }
            break;
        case eFileRecChunkMap:
            map_rec = s_CalcMapAddress(new_file, new_ind);
            up_index = map_rec->map_idx;
            // declared and assigned separately because the
            // eFileRecChunkData label below jumps past this point
            Uint2 cnt_downs;
            cnt_downs = s_CalcCntMapDowns(new_ind->rec_size);
            // every record below this map must link up to this map
            for (Uint2 i = 0; i < cnt_downs; ++i) {
                SNCDataCoord down_coord = map_rec->down_coords[i];
                CSrvRef<SNCDBFileInfo> down_file = s_GetDBFileTry(down_coord.file_id);
                if (down_file.IsNull()) {
                    goto wipe_new_record;
                }
                SFileIndexRec* down_ind = s_GetIndOrDeleted(down_file, down_coord.rec_num);
                if (s_IsIndexDeleted(down_file, down_ind))
                    goto wipe_new_record;
                if (down_ind->chain_coord != old_coord) {
                    DB_CORRUPTED("Map with coord " << old_coord
                                 << " links down to record with coord " << down_coord
                                 << " at index " << i
                                 << " but it has up coord " << down_ind->chain_coord);
                }
            }
            goto check_up_index;

        case eFileRecChunkData:
            SFileChunkDataRec* data_rec;
            data_rec = s_CalcChunkAddress(new_file, new_ind);
            up_index = data_rec->chunk_idx;
        check_up_index:
            // shared by map and chunk records: the parent (a map or the
            // meta) must reference this record
            if (chain_ind->rec_type == eFileRecChunkMap) {
                up_map = s_CalcMapAddress(chain_file, chain_ind);
                if (up_map->down_coords[up_index] != old_coord) {
                    DB_CORRUPTED("Record with coord " << old_coord
                                 << " links up to map with coord " << new_ind->chain_coord
                                 << " but at the index " << up_index
                                 << " it has coord " << up_map->down_coords[up_index]);
                }
            }
            else if (chain_ind->chain_coord != old_coord) {
                DB_CORRUPTED("Record with coord " << old_coord
                             << " links up to meta with coord " << new_ind->chain_coord
                             << " but it has down coord " << chain_ind->chain_coord);
            }
            break;
        }
    }

    if (!m_CurVer) {
        // No current version object: the blob is protected by the
        // cache_data lock only. The lock stays held until either the
        // unlock in the success path below or the unlock_and_wipe label.
        m_CacheData->lock.Lock();
        if (m_CacheData->Get_ver_mgr()
            ||  m_IndRec->chain_coord != chain_coord
            ||  (!chain_coord.empty()  &&  s_IsIndexDeleted(chain_file, chain_ind)
                 &&  !s_IsIndexDeleted(m_MaxFile, m_IndRec)))
        {
            m_Failed = true;
            goto unlock_and_wipe;
        }
        // blob already gone, about to expire or record deleted: give up on
        // this record without marking the whole move as failed
        if (m_CacheData->coord.empty()
            ||  m_CacheData->dead_time <= CSrvTime::CurSecs() + s_MinMoveLife
            ||  s_IsIndexDeleted(m_MaxFile, m_IndRec))
        {
            goto unlock_and_wipe;
        }
    }

    // Switch the links over to new_coord.
    switch (m_IndRec->rec_type) {
    case eFileRecMeta:
#ifdef _DEBUG
CNCAlerts::Register(CNCAlerts::eDebugMoveRecord1,"eFileRecMeta");
#endif
        if (m_CurVer) {
            if (m_CurVer->coord != old_coord) {
                // If coord for the version changed under us it's a bug.
                DB_CORRUPTED("Coord " << old_coord << " of the current version for blob "
                             << m_CacheData->key << " changed while move was in progress.");
            }
            m_CurVer->coord = new_coord;
        }
        else {
            if (m_CacheData->coord != old_coord) {
                // If there is no VerManager and still coord is something different
                // it's a bug.
                DB_CORRUPTED("Meta record with coord " << old_coord
                             << " points to cache_data with key " << m_CacheData->key
                             << " which has different coord " << m_CacheData->coord << ".");
            }
            m_CacheData->coord = new_coord;
        }
        // chain_ind stays NULL when the meta has no chain at all
        if (chain_ind)
            chain_ind->chain_coord = new_coord;
        break;
    case eFileRecChunkMap:
#ifdef _DEBUG
CNCAlerts::Register(CNCAlerts::eDebugMoveRecord2,"eFileRecChunkMap");
#endif
        if (!s_UpdateUpCoords(map_rec, new_ind, new_coord)) {
            // the cache_data lock is held only when there is no m_CurVer
            if (!m_CurVer) {
                goto unlock_and_wipe;
            } else {
                goto wipe_new_record;
            }
        }
        if (m_CurVer)
            ++m_CurVer->map_move_counter;
        goto update_up_map;

    case eFileRecChunkData:
#ifdef _DEBUG
CNCAlerts::Register(CNCAlerts::eDebugMoveRecord3,"eFileRecChunkData");
#endif
        if (m_CurVer) {
            // refresh the version's pointer to this chunk's data
            SFileChunkDataRec* new_data = s_CalcChunkAddress(new_file, new_ind);
            m_CurVer->chunks[new_data->chunk_num] = (char*)new_data->chunk_data;
        }
    update_up_map:
        if (up_map) {
            up_map->down_coords[up_index] = new_coord;
        }
        else {
            // NOTE(review): chain_ind is dereferenced unconditionally here;
            // this presumes map/chunk records always have a non-empty
            // chain_coord - confirm.
            chain_ind->chain_coord = new_coord;
            if (m_CurVer)
                m_CurVer->data_coord = new_coord;
        }
        break;
    }

    // Retire the old record.
    if (m_CurVer) {
        if (m_MovingMeta) {
            s_MoveRecToGarbage(m_MaxFile, m_IndRec);
        }
        else {
            // non-meta records are retired through CMovedRecDeleter's RCU
            // callback rather than immediately
            CMovedRecDeleter* deleter = new CMovedRecDeleter(m_MaxFile, m_IndRec);
            deleter->CallRCU();
        }
    }
    else {
        m_CacheData->lock.Unlock();
        s_MoveRecToGarbage(m_MaxFile, m_IndRec);
    }
    new_file->cnt_unfinished.Add(-1);

    ++m_CntMoved;
    m_SizeMoved += m_IndRec->rec_size + sizeof(SFileIndexRec);

    return &CSpaceShrinker::x_FinishMoveRecord;

unlock_and_wipe:
    m_CacheData->lock.Unlock();
wipe_new_record:
    // Discard the copy: detach it from any chain and retype it as plain
    // chunk data before sending it to garbage.
    new_ind->rec_type = eFileRecChunkData;
    new_ind->chain_coord.clear();
    s_MoveRecToGarbage(new_file, new_ind);
    new_file->cnt_unfinished.Add(-1);

    return &CSpaceShrinker::x_FinishMoveRecord;
}
4156 
4157 CSpaceShrinker::State
x_PrepareToShrink(void)4158 CSpaceShrinker::x_PrepareToShrink(void)
4159 {
4160     if (CTaskServer::IsInShutdown())
4161         return NULL;
4162 
4163     int cur_time = CSrvTime::CurSecs();
4164     bool need_move = s_CurDBSize >= s_MinDBSize
4165                      &&  s_GarbageSize * 100 > s_CurDBSize * s_MaxGarbagePct;
4166     m_MaxFile = NULL;
4167 
4168     double max_pct = 0;
4169     Uint8 total_rel_used = 0;
4170     Uint8 total_rel_garb = 0;
4171 
4172     s_DBFilesLock.Lock();
4173     ITERATE(TNCDBFilesMap, it_file, (*s_DBFiles)) {
4174         SNCDBFileInfo* this_file = it_file->second.GetNCPointerOrNull();
4175         s_NextWriteLock.Lock();
4176         bool is_current = false;
4177         for (size_t i = 0; i < s_CntAllFiles; ++i) {
4178             if (this_file == s_AllWritings[i].cur_file
4179                 ||  this_file == s_AllWritings[i].next_file)
4180             {
4181                 is_current = true;
4182                 break;
4183             }
4184         }
4185         s_NextWriteLock.Unlock();
4186         if (is_current  ||  this_file->cnt_unfinished.Get() != 0)
4187             continue;
4188 
4189         if (this_file->used_size == 0) {
4190             m_FilesToDel.push_back(SrvRef(this_file));
4191         }
4192         else if (need_move) {
4193             if (cur_time >= this_file->next_shrink_time) {
4194                 this_file->info_lock.Lock();
4195                 this_file->is_releasing = false;
4196                 if (this_file->garb_size + this_file->used_size != 0) {
4197 #if 0
4198                     double this_pct = double(this_file->garb_size)
4199                                       / (this_file->garb_size + this_file->used_size);
4200 #else
4201                     double this_pct = double(this_file->garb_size) / double(this_file->file_size);
4202 #endif
4203                     if (this_pct > max_pct) {
4204                         max_pct = this_pct;
4205                         m_MaxFile = this_file;
4206                     }
4207                 }
4208                 this_file->info_lock.Unlock();
4209             }
4210             else if (this_file->is_releasing) {
4211                 this_file->info_lock.Lock();
4212                 total_rel_used += this_file->used_size;
4213                 total_rel_garb += this_file->garb_size;
4214                 this_file->info_lock.Unlock();
4215             }
4216         }
4217     }
4218     s_DBFilesLock.Unlock();
4219 
4220     if (max_pct < 0.9) {
4221         Uint8 proj_garbage = s_GarbageSize - total_rel_garb;
4222         Uint8 proj_size = s_CurDBSize - (total_rel_garb + total_rel_used);
4223         if (need_move
4224             &&  (proj_garbage * 100 <= proj_size * s_MaxGarbagePct
4225                  ||  max_pct * 100 <= s_MaxGarbagePct))
4226         {
4227             m_MaxFile = NULL;
4228         }
4229     }
4230 
4231     m_CurDelFile = m_FilesToDel.begin();
4232     SetState(&CSpaceShrinker::x_DeleteNextFile);
4233     SetRunnable();
4234     return NULL;
4235 }
4236 
4237 CSpaceShrinker::State
x_DeleteNextFile(void)4238 CSpaceShrinker::x_DeleteNextFile(void)
4239 {
4240     if (CTaskServer::IsInShutdown())
4241         return NULL;
4242 
4243     if (m_CurDelFile == m_FilesToDel.end()) {
4244         m_FilesToDel.clear();
4245         return &CSpaceShrinker::x_StartMoves;
4246     }
4247 
4248 #ifdef _DEBUG
4249 CNCAlerts::Register(CNCAlerts::eDebugDeleteFile,"x_DeleteNextFile");
4250 #endif
4251     s_DeleteDBFile(*m_CurDelFile, true);
4252     m_CurDelFile->Reset();
4253     ++m_CurDelFile;
4254     SetRunnable();
4255     return NULL;
4256 }
4257 
4258 CSpaceShrinker::State
x_StartMoves(void)4259 CSpaceShrinker::x_StartMoves(void)
4260 {
4261     m_StartTime = CSrvTime::CurSecs();
4262 /*
4263     bool need_move = s_CurDBSize >= s_MinDBSize
4264                      &&  s_GarbageSize * 100 > s_CurDBSize * s_MaxGarbagePct;
4265 */
4266     if (!m_MaxFile  /*||  !need_move*/)
4267         return &CSpaceShrinker::x_FinishSession;
4268 
4269     CreateNewDiagCtx();
4270     CSrvDiagMsg().StartRequest()
4271                  .PrintParam("_type", "move")
4272                  .PrintParam("file_id", m_MaxFile->file_id);
4273 
4274     m_Failed = false;
4275     m_PrevRecNum = 0;
4276     m_LastAlive = 0;
4277     m_CntProcessed = 0;
4278     m_CntMoved = 0;
4279     m_SizeMoved = 0;
4280 
4281     return &CSpaceShrinker::x_MoveNextRecord;
4282 }
4283 
4284 CSpaceShrinker::State
x_MoveNextRecord(void)4285 CSpaceShrinker::x_MoveNextRecord(void)
4286 {
4287     if (CTaskServer::IsInShutdown())
4288         return &CSpaceShrinker::x_FinishMoves;
4289 
4290     int cur_time = CSrvTime::CurSecs();
4291     m_MaxFile->info_lock.Lock();
4292     m_RecNum = m_LastAlive;
4293     m_IndRec = m_MaxFile->index_head - m_RecNum;
4294     if (m_RecNum != 0  &&  s_IsIndexDeleted(m_MaxFile, m_IndRec)) {
4295         m_LastAlive = m_RecNum = 0;
4296         m_IndRec = m_MaxFile->index_head;
4297     }
4298     do {
4299         if (m_RecNum > m_PrevRecNum)
4300             ++m_CntProcessed;
4301         if (m_IndRec->next_num == 0) {
4302             if (m_MaxFile->index_head->next_num == 0  &&  m_MaxFile->used_size != 0) {
4303                 SRV_FATAL("Blob coords index broken");
4304             }
4305             m_IndRec = NULL;
4306             break;
4307         }
4308         m_LastAlive = m_RecNum;
4309         m_RecNum = m_IndRec->next_num;
4310         m_IndRec = m_MaxFile->index_head - m_RecNum;
4311     }
4312     while (m_RecNum <= m_PrevRecNum
4313 #if !__NC_CACHEDATA_ALL_MONITOR
4314            ||  m_IndRec->cache_data->dead_time <= cur_time + s_MinMoveLife
4315 #endif
4316            );
4317     m_MaxFile->info_lock.Unlock();
4318     if (!m_IndRec)
4319         return &CSpaceShrinker::x_FinishMoves;
4320     if (s_IsIndexDeleted(m_MaxFile, m_IndRec))
4321         return &CSpaceShrinker::x_FinishMoveRecord;
4322 
4323     m_CacheData = m_IndRec->cache_data;
4324 
4325 #if __NC_CACHEDATA_ALL_MONITOR
4326     bool found = false;
4327     SAllCacheTable* table = nullptr;
4328     {
4329         vector<SAllCacheTable*> v_all;
4330         v_all.reserve(s_AllCache.size());
4331         ITERATE(TAllCacheBuckets, tt, s_AllCache) {
4332             v_all.push_back(tt->second);
4333         }
4334         random_shuffle(v_all.begin(), v_all.end());
4335         ITERATE(vector<SAllCacheTable*>, tt, v_all) {
4336             table = *tt;
4337             table->lock.Lock();
4338             found = table->all_cache_set.find(m_CacheData) != table->all_cache_set.end();
4339             table->lock.Unlock();
4340             if (found) {
4341                 break;
4342             }
4343         }
4344     }
4345     if (found) {
4346         if (m_CacheData->dead_time == 0) {
4347             table->lock.Lock();
4348             table->all_cache_set.erase(m_CacheData);
4349             table->lock.Unlock();
4350 //            found = false;
4351 #ifdef _DEBUG
4352 CNCAlerts::Register(CNCAlerts::eDebugWrongCacheFound2, "CSpaceShrinker::x_MoveNextRecord");
4353 #endif
4354         }
4355     }
4356     if (!found) {
4357 #ifdef _DEBUG
4358 CNCAlerts::Register(CNCAlerts::eDebugWrongCacheFound1, "CSpaceShrinker::x_MoveNextRecord");
4359 #endif
4360         m_CacheData = nullptr;
4361         m_MaxFile->info_lock.Lock();
4362         if (!s_IsIndexDeleted(m_MaxFile, m_IndRec)) {
4363             s_MoveRecToGarbageNoLock(m_MaxFile, m_IndRec);
4364         }
4365         m_MaxFile->info_lock.Unlock();
4366         return &CSpaceShrinker::x_FinishMoveRecord;
4367     }
4368 #endif // __NC_CACHEDATA_ALL_MONITOR
4369 
4370     m_CacheData->lock.Lock();
4371     bool need_move = m_CacheData->dead_time > cur_time + s_MinMoveLife;
4372     if (need_move) {
4373         CNCBlobStorage::ReferenceCacheData(m_CacheData);
4374         m_VerMgr = m_CacheData->Get_ver_mgr();
4375         if (m_VerMgr) {
4376             m_VerMgr->ObtainReference();
4377             m_CacheData->lock.Unlock();
4378 
4379             m_VerMgr->RequestCurVersion(this);
4380             return &CSpaceShrinker::x_CheckCurVersion;
4381         }
4382     }
4383     m_CacheData->lock.Unlock();
4384     if (!need_move) {
4385         return &CSpaceShrinker::x_FinishMoves;
4386     }
4387 
4388     if (s_IsIndexDeleted(m_MaxFile, m_IndRec)) {
4389         CNCBlobStorage::ReleaseCacheData(m_CacheData);
4390         m_CacheData = nullptr;
4391         SetRunnable();
4392         return NULL;
4393     }
4394     return &CSpaceShrinker::x_MoveRecord;
4395 }
4396 
4397 SNCDataCoord
x_FindMetaCoord(SNCDataCoord coord,Uint1 max_map_depth)4398 CSpaceShrinker::x_FindMetaCoord(SNCDataCoord coord, Uint1 max_map_depth)
4399 {
4400     if (max_map_depth == 0  ||  coord.empty())
4401         return coord;
4402 
4403     CSrvRef<SNCDBFileInfo> file_info = s_GetDBFileTry(coord.file_id);
4404     if (file_info.IsNull()) {
4405         coord.clear();
4406         return coord;
4407     }
4408     SFileIndexRec* ind_rec = s_GetIndOrDeleted(file_info, coord.rec_num);
4409     if (s_IsIndexDeleted(file_info, ind_rec)  ||  ind_rec->rec_type == eFileRecMeta)
4410         return coord;
4411 
4412     return x_FindMetaCoord(ind_rec->chain_coord, max_map_depth - 1);
4413 }
4414 
4415 CSpaceShrinker::State
x_CheckCurVersion(void)4416 CSpaceShrinker::x_CheckCurVersion(void)
4417 {
4418     if (!IsTransFinished())
4419         return NULL;
4420     if (CTaskServer::IsInShutdown())
4421         return &CSpaceShrinker::x_FinishMoveRecord;
4422 
4423     if (s_IsIndexDeleted(m_MaxFile, m_IndRec))
4424         return &CSpaceShrinker::x_FinishMoveRecord;
4425 
4426     m_CurVer = m_VerMgr->GetCurVersion();
4427     if (!m_CurVer)
4428         return &CSpaceShrinker::x_FinishMoveRecord;
4429 
4430     if (m_IndRec->rec_type == eFileRecChunkData) {
4431         SFileChunkDataRec* data_rec = s_CalcChunkAddress(m_MaxFile, m_IndRec);
4432         Uint8 chunk_num = data_rec->chunk_num;
4433         char* cur_chunk_ptr = NULL;
4434         if (m_CurVer->chunks.size() > chunk_num)
4435             cur_chunk_ptr = ACCESS_ONCE(m_CurVer->chunks[chunk_num]);
4436         if (cur_chunk_ptr) {
4437             if (cur_chunk_ptr != (char*)data_rec->chunk_data)
4438                 return &CSpaceShrinker::x_FinishMoveRecord;
4439             if (m_CurVer->coord.empty()) {
4440                 m_Failed = true;
4441                 return &CSpaceShrinker::x_FinishMoveRecord;
4442             }
4443         }
4444         else {
4445             SNCDataCoord meta_coord = x_FindMetaCoord(m_IndRec->chain_coord,
4446                                                       m_CurVer->map_depth);
4447             if (meta_coord.empty()  ||  meta_coord != m_CurVer->coord)
4448                 return &CSpaceShrinker::x_FinishMoveRecord;
4449         }
4450     }
4451     else if (m_IndRec->rec_type == eFileRecChunkMap) {
4452         if (m_CurVer->map_depth == 0)
4453             return &CSpaceShrinker::x_FinishMoveRecord;
4454         SNCDataCoord meta_coord = x_FindMetaCoord(m_IndRec->chain_coord,
4455                                                   m_CurVer->map_depth - 1);
4456         if (meta_coord.empty()  &&  m_CurVer->coord.empty()) {
4457             m_Failed = true;
4458             return &CSpaceShrinker::x_FinishMoveRecord;
4459         }
4460         if (meta_coord.empty()  ||  meta_coord != m_CurVer->coord)
4461             return &CSpaceShrinker::x_FinishMoveRecord;
4462     }
4463     else if (m_CurVer->coord.file_id != m_MaxFile->file_id
4464              ||  m_CurVer->coord.rec_num != m_RecNum)
4465     {
4466         return &CSpaceShrinker::x_FinishMoveRecord;
4467     }
4468     else if (m_CurVer->move_or_rewrite
4469              ||  !AtomicCAS(m_CurVer->move_or_rewrite, false, true))
4470     {
4471         m_Failed = true;
4472         return &CSpaceShrinker::x_FinishMoveRecord;
4473     }
4474 
4475     m_MovingMeta = m_IndRec->rec_type == eFileRecMeta;
4476     if (m_CurVer->dead_time < CSrvTime::CurSecs() + s_MinMoveLife)
4477         return &CSpaceShrinker::x_FinishMoveRecord;
4478     else
4479         return &CSpaceShrinker::x_MoveRecord;
4480 }
4481 
4482 CSpaceShrinker::State
x_FinishMoveRecord(void)4483 CSpaceShrinker::x_FinishMoveRecord(void)
4484 {
4485     if (m_CacheData) {
4486         CNCBlobStorage::ReleaseCacheData(m_CacheData);
4487         m_CacheData = nullptr;
4488     }
4489     if (m_VerMgr) {
4490         if (m_CurVer) {
4491             if (m_MovingMeta)
4492                 m_CurVer->move_or_rewrite = false;
4493             m_CurVer.Reset();
4494         }
4495         m_VerMgr->Release();
4496         m_VerMgr = NULL;
4497     }
4498 
4499     ++m_CntProcessed;
4500     m_PrevRecNum = m_RecNum;
4501 
4502     if (m_Failed || CTaskServer::IsInShutdown())
4503         SetState(&CSpaceShrinker::x_FinishMoves);
4504     else
4505         SetState(&CSpaceShrinker::x_MoveNextRecord);
4506     SetRunnable();
4507     return NULL;
4508 }
4509 
4510 CSpaceShrinker::State
x_FinishMoves(void)4511 CSpaceShrinker::x_FinishMoves(void)
4512 {
4513     if (!m_Failed) {
4514         m_MaxFile->is_releasing = true;
4515         if (m_MaxFile->used_size == 0)
4516             s_DeleteDBFile(m_MaxFile, true);
4517         else if (m_CntProcessed == 0) {
4518             SRV_LOG(Warning, "Didn't find anything to process in the file");
4519             m_MaxFile->next_shrink_time = CSrvTime::CurSecs() + max(s_MinMoveLife, 300);
4520         }
4521         else
4522             m_MaxFile->next_shrink_time = CSrvTime::CurSecs() + s_MinMoveLife;
4523     }
4524     else {
4525         GetDiagCtx()->SetRequestStatus(eStatus_CmdAborted);
4526         m_MaxFile->next_shrink_time = CSrvTime::CurSecs() + s_FailedMoveDelay;
4527     }
4528     m_MaxFile.Reset();
4529 
4530     CSrvDiagMsg().PrintExtra()
4531                  .PrintParam("cnt_processed", m_CntProcessed)
4532                  .PrintParam("cnt_moved", m_CntMoved)
4533                  .PrintParam("size_moved", m_SizeMoved);
4534     CSrvDiagMsg().StopRequest();
4535     ReleaseDiagCtx();
4536     CNCStat::DBFileCleaned(!m_Failed, m_CntProcessed, m_CntMoved, m_SizeMoved);
4537 
4538     return &CSpaceShrinker::x_FinishSession;
4539 }
4540 
4541 CSpaceShrinker::State
x_FinishSession(void)4542 CSpaceShrinker::x_FinishSession(void)
4543 {
4544     SetState(&CSpaceShrinker::x_PrepareToShrink);
4545     if (!CTaskServer::IsInShutdown()) {
4546         if (CSrvTime::CurSecs() != m_StartTime)
4547             SetRunnable();
4548         else
4549             RunAfter(1);
4550     }
4551     return NULL;
4552 }
4553 
CSpaceShrinker(void)4554 CSpaceShrinker::CSpaceShrinker(void)
4555 {
4556 #if __NC_TASKS_MONITOR
4557     m_TaskName = "CSpaceShrinker";
4558 #endif
4559     SetState(&CSpaceShrinker::x_PrepareToShrink);
4560 }
4561 
~CSpaceShrinker(void)4562 CSpaceShrinker::~CSpaceShrinker(void)
4563 {}
4564 
4565 
CMovedRecDeleter(SNCDBFileInfo * file_info,SFileIndexRec * ind_rec)4566 CMovedRecDeleter::CMovedRecDeleter(SNCDBFileInfo* file_info, SFileIndexRec* ind_rec)
4567     : m_FileInfo(file_info),
4568       m_IndRec(ind_rec)
4569 {}
4570 
~CMovedRecDeleter(void)4571 CMovedRecDeleter::~CMovedRecDeleter(void)
4572 {}
4573 
4574 void
ExecuteRCU(void)4575 CMovedRecDeleter::ExecuteRCU(void)
4576 {
4577     s_MoveRecToGarbage(m_FileInfo, m_IndRec);
4578     delete this;
4579 }
4580 
4581 
CRecNoSaver(void)4582 CRecNoSaver::CRecNoSaver(void)
4583 {
4584 #if __NC_TASKS_MONITOR
4585     m_TaskName = "CRecNoSaver";
4586 #endif
4587 }
4588 
~CRecNoSaver(void)4589 CRecNoSaver::~CRecNoSaver(void)
4590 {}
4591 
4592 void
ExecuteSlice(TSrvThreadNum)4593 CRecNoSaver::ExecuteSlice(TSrvThreadNum /* thr_num */)
4594 {
4595 
4596     if (s_NeedSavePurgeData) {
4597         s_NeedSavePurgeData = false;
4598         s_IndexLock.Lock();
4599         try {
4600             string forget = CNCBlobAccessor::GetPurgeData(';');
4601             INFO("Updated Purge data: " << forget);
4602             s_IndexDB->UpdatePurgeData(forget);
4603         }
4604         catch (CSQLITE_Exception&) {
4605         }
4606         s_IndexLock.Unlock();
4607     }
4608 
4609 // max record number used in sync logs
4610     int cur_time = CSrvTime::CurSecs();
4611     int next_save = s_LastRecNoSaveTime + s_MinRecNoSavePeriod;
4612     if (!s_NeedSaveLogRecNo)
4613         next_save = cur_time + s_MinRecNoSavePeriod;
4614     else if (CTaskServer::IsInShutdown())
4615         next_save = cur_time;
4616 
4617     if (cur_time < next_save) {
4618         if (!CTaskServer::IsInShutdown())
4619             RunAfter(next_save - cur_time);
4620         return;
4621     }
4622 
4623     s_NeedSaveLogRecNo = false;
4624     Uint8 log_rec_no = CNCSyncLog::GetLastRecNo();
4625     s_IndexLock.Lock();
4626     try {
4627         s_IndexDB->SetMaxSyncLogRecNo(log_rec_no);
4628     }
4629     catch (CSQLITE_Exception& ex) {
4630         SRV_LOG(Critical, "Cannot save sync log record number: " << ex);
4631     }
4632     s_IndexLock.Unlock();
4633 
4634     s_LastRecNoSaveTime = cur_time;
4635     RunAfter(s_MinRecNoSavePeriod);
4636 }
4637 
4638 
CDiskFlusher(void)4639 CDiskFlusher::CDiskFlusher(void)
4640 {
4641 #if __NC_TASKS_MONITOR
4642     m_TaskName = "CDiskFlusher";
4643 #endif
4644     SetState(&CDiskFlusher::x_CheckFlushTime);
4645 }
4646 
~CDiskFlusher(void)4647 CDiskFlusher::~CDiskFlusher(void)
4648 {}
4649 
4650 CDiskFlusher::State
x_CheckFlushTime(void)4651 CDiskFlusher::x_CheckFlushTime(void)
4652 {
4653     if (!s_FlushTimePeriod  ||  CTaskServer::IsInShutdown())
4654         return NULL;
4655 
4656     int cur_time = CSrvTime::CurSecs();
4657     if (cur_time < s_LastFlushTime + s_FlushTimePeriod) {
4658         RunAfter(s_LastFlushTime + s_FlushTimePeriod - cur_time);
4659         return NULL;
4660     }
4661 
4662     m_LastId = 0;
4663     return &CDiskFlusher::x_FlushNextFile;
4664 }
4665 
4666 CDiskFlusher::State
x_FlushNextFile(void)4667 CDiskFlusher::x_FlushNextFile(void)
4668 {
4669     if (CTaskServer::IsInShutdown())
4670         return NULL;
4671 
4672     s_DBFilesLock.Lock();
4673     TNCDBFilesMap::iterator file_it = s_DBFiles->upper_bound(m_LastId);
4674     if (file_it == s_DBFiles->end()) {
4675         s_DBFilesLock.Unlock();
4676         s_LastFlushTime = CSrvTime::CurSecs();
4677         return &CDiskFlusher::x_CheckFlushTime;
4678     }
4679     CSrvRef<SNCDBFileInfo> file_info = file_it->second;
4680     s_DBFilesLock.Unlock();
4681 
4682 #ifdef NCBI_OS_LINUX
4683     if (msync(file_info->file_map, file_info->file_size, MS_SYNC)) {
4684         SRV_LOG(Critical, "Unable to sync file" << file_info->file_name
4685                           << ", errno=" << errno);
4686     }
4687 #endif
4688 
4689     m_LastId = file_info->file_id;
4690     SetRunnable();
4691     return NULL;
4692 }
4693 
4694 
CNewFileCreator(void)4695 CNewFileCreator::CNewFileCreator(void)
4696 {
4697 #if __NC_TASKS_MONITOR
4698     m_TaskName = "CNewFileCreator";
4699 #endif
4700 }
4701 
~CNewFileCreator(void)4702 CNewFileCreator::~CNewFileCreator(void)
4703 {}
4704 
4705 void
ExecuteSlice(TSrvThreadNum)4706 CNewFileCreator::ExecuteSlice(TSrvThreadNum /* thr_num */)
4707 {
4708 // create new (next) db file
4709     for (size_t i = 0; i < s_CntAllFiles; ++i) {
4710         bool need_new = false;
4711         s_NextWriteLock.Lock();
4712         need_new = s_AllWritings[i].next_file == NULL;
4713         s_NextWriteLock.Unlock();
4714         if (need_new) {
4715             s_CreateNewFile(i, true);
4716             SetRunnable();
4717             break;
4718         }
4719     }
4720 }
4721 
4722 
4723 void
ExecuteRCU(void)4724 SNCCacheData::ExecuteRCU(void)
4725 {
4726     delete this;
4727 }
4728 
4729 END_NCBI_SCOPE
4730