1 /* $Id: nc_storage.cpp 593132 2019-09-12 15:33:23Z gouriano $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Pavel Ivanov
27 */
28
29 #include "nc_pch.hpp"
30
31 #include <corelib/ncbireg.hpp>
32 #include <corelib/ncbifile.hpp>
33 #include <corelib/request_ctx.hpp>
34 #include <corelib/ncbi_process.hpp>
35
36 #include "netcached.hpp"
37 #include "nc_storage.hpp"
38 #include "storage_types.hpp"
39 #include "nc_db_files.hpp"
40 #include "distribution_conf.hpp"
41 #include "nc_storage_blob.hpp"
42 #include "sync_log.hpp"
43 #include "nc_stat.hpp"
44 #include "logging.hpp"
45 #include "peer_control.hpp"
46
47
48 #ifdef NCBI_OS_LINUX
49 # include <sys/types.h>
50 # include <sys/stat.h>
51 # include <fcntl.h>
52 # include <sys/mman.h>
53 #endif
54
55 #define __NC_CACHEDATA_ALL_MONITOR 0
56 // uses Boost intrusive rbtree to hold SNCCacheData (versus std::set)
57 #define __NC_CACHEDATA_INTR_SET 1
58
59 BEGIN_NCBI_SCOPE
60
61
// Components of storage file names. The per-type suffixes are all empty:
// file types are recognized by their signature (see s_OpenIndexDB), not by
// their names. The pointers themselves are const so they cannot be
// re-targeted by accident.
static const char* const kNCStorage_DBFileExt        = ".db";
static const char* const kNCStorage_MetaFileSuffix   = "";
static const char* const kNCStorage_DataFileSuffix   = "";
static const char* const kNCStorage_MapsFileSuffix   = "";
static const char* const kNCStorage_IndexFileSuffix  = ".index";
static const char* const kNCStorage_StartedFileName  = "__ncbi_netcache_started__";

// Registry section and parameter names read by s_ReadStorageParams() and
// s_ReadVariableParams().
static const char* const kNCStorage_RegSection        = "storage";
static const char* const kNCStorage_PathParam         = "path";
static const char* const kNCStorage_FilePrefixParam   = "prefix";
static const char* const kNCStorage_GuardNameParam    = "guard_file_name";
static const char* const kNCStorage_FileSizeParam     = "each_file_size";
static const char* const kNCStorage_GarbagePctParam   = "max_garbage_pct";
static const char* const kNCStorage_MinDBSizeParam    = "min_storage_size";
static const char* const kNCStorage_MoveLifeParam     = "min_lifetime_to_move";
static const char* const kNCStorage_FailedMoveParam   = "failed_move_delay";
static const char* const kNCStorage_GCBatchParam      = "gc_batch_size";
static const char* const kNCStorage_FlushTimeParam    = "sync_time_period";
static const char* const kNCStorage_ExtraGCOnParam    = "db_limit_del_old_on";
static const char* const kNCStorage_ExtraGCOffParam   = "db_limit_del_old_off";
static const char* const kNCStorage_StopWriteOnParam  = "db_limit_stop_write_on";
static const char* const kNCStorage_StopWriteOffParam = "db_limit_stop_write_off";
static const char* const kNCStorage_MinFreeDiskParam  = "disk_free_limit";
static const char* const kNCStorage_DiskCriticalParam = "critical_disk_free_limit";
static const char* const kNCStorage_MinRecNoSaveParam = "min_rec_no_save_period";
static const char* const kNCStorage_FailedWriteSize   = "failed_write_blob_key_count";
static const char* const kNCStorage_MaxBlobSizeStore  = "max_blob_size_store";
static const char* const kNCStorage_WbMemRelease      = "task_priority_wb_memrelease";
90
91
92 // storage file type signatures
93 static const Uint8 kMetaSignature = NCBI_CONST_UINT8(0xeed5be66cdafbfa3);
94 static const Uint8 kDataSignature = NCBI_CONST_UINT8(0xaf9bedf24cfa05ed);
95 static const Uint8 kMapsSignature = NCBI_CONST_UINT8(0xba6efd7b61fdff6c);
96 static const Uint1 kSignatureSize = sizeof(kMetaSignature);
97
98
/// Page size assumed for all memory mappings made in this file (bytes).
static const size_t kMemPageSize = 4096;
/// Clears the in-page offset bits: (x + kMemPageSize - 1) & kMemPageAlignMask
/// rounds x up to a whole number of pages.
static const size_t kMemPageAlignMask = ~(kMemPageSize - size_t(1));
104
105
/// Reason why writing into the storage is (or is about to be) stopped.
/// Names suggest: warning level, DB size limit, low disk space and critically
/// low disk space -- NOTE(review): confirm against the code that sets
/// s_IsStopWrite.
enum EStopCause {
    eNoStop           = 0,
    eStopWarning      = 1,
    eStopDBSize       = 2,
    eStopDiskSpace    = 3,
    eStopDiskCritical = 4
};
113
114
115 #if __NC_CACHEDATA_INTR_SET
116 struct SCacheDeadCompare
117 {
operator ()SCacheDeadCompare118 bool operator() (const SNCCacheData& x, const SNCCacheData& y) const
119 {
120 return x.saved_dead_time < y.saved_dead_time;
121 }
122 };
123
124 struct SCacheKeyCompare
125 {
operator ()SCacheKeyCompare126 bool operator() (const SNCCacheData& x, const SNCCacheData& y) const
127 {
128 return x.key < y.key;
129 }
operator ()SCacheKeyCompare130 bool operator() (const string& key, const SNCCacheData& y) const
131 {
132 return key < y.key;
133 }
operator ()SCacheKeyCompare134 bool operator() (const SNCCacheData& x, const string& key) const
135 {
136 return x.key < key;
137 }
138 };
139
140 typedef intr::rbtree<SNCCacheData,
141 intr::base_hook<TTimeTableHook>,
142 intr::constant_time_size<false>,
143 intr::compare<SCacheDeadCompare> > TTimeTableMap;
144 typedef intr::rbtree<SNCCacheData,
145 intr::base_hook<TKeyMapHook>,
146 intr::constant_time_size<true>,
147 intr::compare<SCacheKeyCompare> > TKeyMap;
148 #else // __NC_CACHEDATA_INTR_SET
149 struct SCacheDeadCompare
150 {
operator ()SCacheDeadCompare151 bool operator() (const SNCCacheData* x, const SNCCacheData* y) const
152 {
153 return (x->saved_dead_time != y->saved_dead_time) ?
154 (x->saved_dead_time < y->saved_dead_time) : (x->key < y->key);
155 }
156 };
157 struct SCacheKeyCompare
158 {
operator ()SCacheKeyCompare159 bool operator() (const SNCCacheData* x, const SNCCacheData* y) const
160 {
161 return x->key < y->key;
162 }
163 };
164 typedef std::set<SNCCacheData*, SCacheDeadCompare> TTimeTableMap;
165 typedef std::set<SNCCacheData*, SCacheKeyCompare> TKeyMap;
166
167 #endif // __NC_CACHEDATA_INTR_SET
168
169 struct SBucketCache
170 {
171 CMiniMutex lock;
172 TKeyMap key_map;
173 };
174 typedef map<Uint2, SBucketCache*> TBucketCacheMap;
175
176 struct STimeTable
177 {
178 CMiniMutex lock;
179 TTimeTableMap time_map;
180 };
181 typedef map<Uint2, STimeTable*> TTimeBuckets;
182
#if __NC_CACHEDATA_ALL_MONITOR
/// Per-bucket set of all SNCCacheData objects, kept only when
/// __NC_CACHEDATA_ALL_MONITOR is enabled.
struct SAllCacheTable
{
    CMiniMutex         lock;
    set<SNCCacheData*> all_cache_set;
};
using TAllCacheBuckets = map<Uint2, SAllCacheTable*>;
#endif  // __NC_CACHEDATA_ALL_MONITOR
191
#if __NC_CACHEDATA_MONITOR
// Registry of SNCCacheData objects, filled by x_Register() and emptied by
// x_Revoke(); compiled only when __NC_CACHEDATA_MONITOR is enabled.
static CMiniMutex         s_AllCacheLock;
static set<SNCCacheData*> s_AllCacheSet;

/// Adds this object to s_AllCacheSet under s_AllCacheLock.
void
SNCCacheData::x_Register(void)
{
    s_AllCacheLock.Lock();
    s_AllCacheSet.insert(this);
    s_AllCacheLock.Unlock();
}

/// Removes this object from s_AllCacheSet under s_AllCacheLock.
void
SNCCacheData::x_Revoke(void)
{
    s_AllCacheLock.Lock();
    s_AllCacheSet.erase(this);
    s_AllCacheLock.Unlock();
}
#endif  // __NC_CACHEDATA_MONITOR
208
209
210 /// Directory for all database files of the storage
211 static string s_Path;
212 /// Name of the storage
213 static string s_Prefix;
214 /// Number of blobs treated by GC and by caching mechanism in one batch
215 static int s_GCBatchSize = 0;
216 static int s_FlushTimePeriod = 0;
217 static int s_MaxGarbagePct = 0;
218 static int s_MinMoveLife = 0;
219 static int s_FailedMoveDelay = 0;
220 static Int8 s_MinDBSize = 0;
221 /// Name of guard file excluding several instances to run on the same
222 /// database.
223 static string s_GuardName;
224 /// Lock for guard file representing this instance running.
225 /// It will hold exclusive lock all the time while NetCache is
226 /// working, so that other instance of NetCache on the same database will
227 /// be unable to start.
228 static auto_ptr<CFileLock> s_GuardLock;
229
230 /// manages access to s_IndexDB
231 static CMiniMutex s_IndexLock;
232 /// Index database file
233 static auto_ptr<CNCDBIndexFile> s_IndexDB;
234
235 /// Read-write lock to work with s_DBFiles
236 static CMiniMutex s_DBFilesLock;
237 /// List of all database parts in the storage
238 static TNCDBFilesMap* s_DBFiles = nullptr;
239
240 static Uint4 s_LastFileId = 0;
241
242 /// manages access to s_AllWritings
243 static CMiniMutex s_NextWriteLock;
244 static ENCDBFileType const s_AllFileTypes[]
245 = {eDBFileMeta, eDBFileData, eDBFileMaps
246 /*, eDBFileMoveMeta, eDBFileMoveData, eDBFileMoveMaps*/};
247 static size_t const s_CntAllFiles = sizeof(s_AllFileTypes) / sizeof(s_AllFileTypes[0]);
248 static SWritingInfo s_AllWritings[s_CntAllFiles];
249
250 static Uint4 s_NewFileSize = 0;
251 static TTimeBuckets s_TimeTables;
252 static Int8 s_CurBlobsCnt = 0;
253 static Int8 s_CurKeysCnt = 0;
254 static bool s_Draining = false;
255 static bool s_AbandonDB = false;
256 /// Current size of storage database. Kept here for printing statistics.
257 volatile static Int8 s_CurDBSize = 0;
258 volatile static Int8 s_GarbageSize = 0;
259 static int s_LastFlushTime = 0;
260 /// Internal cache of blobs identification information sorted to be able
261 /// to search by key, subkey and version.
262 static TBucketCacheMap s_BucketsCache;
263
264 #if __NC_CACHEDATA_ALL_MONITOR
265 static TAllCacheBuckets s_AllCache;
266 #endif
267
268 static EStopCause s_IsStopWrite = eNoStop;
269 static bool s_CleanStart = false;
270 static bool s_NeedSaveLogRecNo = false;
271 static bool s_NeedSavePurgeData = false;
272 static int s_WarnLimitOnPct = 0;
273 static int s_WarnLimitOffPct = 0;
274 static int s_MinRecNoSavePeriod = 0;
275 static int s_LastRecNoSaveTime = 0;
276 static CAtomicCounter s_BlobCounter;
277 static Int8 s_ExtraGCOnSize = 0;
278 static Int8 s_ExtraGCOffSize = 0;
279 static Int8 s_StopWriteOnSize = 0;
280 static Int8 s_StopWriteOffSize = 0;
281 static Int8 s_DiskFreeLimit = 0;
282 static Int8 s_DiskCritical = 0;
283 static Uint8 s_MaxBlobSizeStore = 0;
284 static CNewFileCreator* s_NewFileCreator = nullptr;
285 static CDiskFlusher* s_DiskFlusher = nullptr;
286 static CRecNoSaver* s_RecNoSaver = nullptr;
287 static CSpaceShrinker* s_SpaceShrinker = nullptr;
288 static CExpiredCleaner* s_ExpiredCleaner = nullptr;
289 Uint4 s_TaskPriorityWbMemRelease = 10;
290
291
292
/// Logs a Fatal "database is corrupted" message followed by @a msg (any
/// stream expression) and aborts the process. The do/while(0) wrapper makes
/// the macro behave as a single statement.
#define DB_CORRUPTED(msg)                                                    \
    do {                                                                     \
        SRV_LOG(Fatal, "Database is corrupted. " << msg                      \
                << " Bug, faulty disk or somebody writes to database?");     \
        abort();                                                             \
    } while (0)
300
FindBlob(Uint2 bucket,const string & mask,Uint8 cr_time_hi)301 string CNCBlobStorage::FindBlob(Uint2 bucket, const string& mask, Uint8 cr_time_hi)
302 {
303 string found;
304 Uint8 cur_time = CSrvTime::Current().Sec();
305 TBucketCacheMap::const_iterator bkt = s_BucketsCache.find(bucket);
306 if (bkt != s_BucketsCache.end()) {
307 SNCCacheData search_mask;
308 search_mask.key = mask;
309
310 SBucketCache* cache = bkt->second;
311 cache->lock.Lock();
312 #if __NC_CACHEDATA_INTR_SET
313 TKeyMap::iterator lb = cache->key_map.lower_bound(search_mask);
314 for ( ; lb != cache->key_map.end(); ++lb) {
315 if (strncmp(search_mask.key.data(), lb->key.data(), search_mask.key.size())== 0) {
316 if (lb->expire <= cur_time) {
317 continue;
318 }
319 if (lb->create_time <= cr_time_hi) {
320 found = lb->key;
321 break;
322 }
323 } else {
324 break;
325 }
326 }
327 #else
328 TKeyMap::iterator lb = cache->key_map.lower_bound(&search_mask);
329 for ( ; lb != cache->key_map.end(); ++lb) {
330 if (strncmp(search_mask.key.data(), (*lb)->key.data(), search_mask.key.size())== 0) {
331 if ((*lb)->expire <= cur_time) {
332 continue;
333 }
334 if ((*lb)->create_time <= cr_time_hi) {
335 found = (*lb)->key;
336 break;
337 }
338 } else {
339 break;
340 }
341 }
342 #endif
343 cache->lock.Unlock();
344 }
345 return found;
346 }
347
348 void
GetBList(const string & mask,auto_ptr<TNCBufferType> & buffer,SNCBlobFilter * filters,const string & sep)349 CNCBlobStorage::GetBList(const string& mask, auto_ptr<TNCBufferType>& buffer, SNCBlobFilter* filters, const string& sep)
350 {
351 if (mask.empty()) {
352 return;
353 }
354 SNCCacheData search_mask;
355 const char* mb = mask.data();
356 const char* me = mask.data() + mask.size() - 1;
357 bool show_exp = false;
358 while (*mb == '\"' || *mb == '\'' || *mb == '*') {
359 show_exp = true;
360 ++mb;
361 }
362 while (me > mb && *me == '\1' && *(me-1) == '\1') {
363 --me;
364 }
365 if (me == mb && *me == '\1') {
366 --me;
367 }
368 search_mask.key.assign(mb, me+1-mb);
369
370 Uint8 cur_time = CSrvTime::Current().Sec();
371
372 Uint8 cr_time_lo = ((filters->cr_ago_lt != 0) ? (cur_time - filters->cr_ago_lt) : filters->cr_epoch_ge) * kUSecsPerSecond;
373 Uint8 cr_time_hi = ((filters->cr_ago_ge != 0) ? (cur_time - filters->cr_ago_ge) : filters->cr_epoch_lt) * kUSecsPerSecond;
374 int expire_lo = ((filters->exp_now_ge != 0) ? (cur_time + filters->exp_now_ge) : filters->exp_epoch_ge);
375 int expire_hi = ((filters->exp_now_lt != 0) ? (cur_time + filters->exp_now_lt) : filters->exp_epoch_lt);
376 int vexpire_lo = ((filters->vexp_now_ge != 0) ? (cur_time + filters->vexp_now_ge) : filters->vexp_epoch_ge);
377 int vexpire_hi = ((filters->vexp_now_lt != 0) ? (cur_time + filters->vexp_now_lt) : filters->vexp_epoch_lt);
378 Uint8 size_lo = filters->size_ge;
379 Uint8 size_hi = filters->size_lt;
380 Uint8 create_server = filters->cr_srv;
381 bool extra = cr_time_lo != 0 || cr_time_hi != 0 ||
382 expire_lo != 0 || expire_hi != 0 ||
383 vexpire_lo != 0 || vexpire_hi != 0 ||
384 size_lo != 0 || size_hi != 0 || create_server != 0;
385
386 ITERATE( TBucketCacheMap, bkt, s_BucketsCache) {
387 SBucketCache* cache = bkt->second;
388 cache->lock.Lock();
389 #if __NC_CACHEDATA_INTR_SET
390 TKeyMap::iterator lb = cache->key_map.lower_bound(search_mask);
391 for ( ; lb != cache->key_map.end(); ++lb) {
392 // SNCCacheData& d = *lb;
393 if (strncmp(search_mask.key.data(), lb->key.data(), search_mask.key.size())== 0) {
394 if (!show_exp && lb->expire <= cur_time) {
395 continue;
396 }
397 if (!extra || (
398 (lb->create_time >= cr_time_lo && (cr_time_hi == 0 || lb->create_time < cr_time_hi)) &&
399 (lb->expire >= expire_lo && ( expire_hi == 0 || lb->expire < expire_hi)) &&
400 (lb->ver_expire >= vexpire_lo && (vexpire_hi == 0 || lb->ver_expire < vexpire_hi)) &&
401 (lb->size >= size_lo && ( size_hi == 0 || lb->size < size_hi)) &&
402 (create_server == 0 || lb->create_server == create_server))) {
403
404 string bkey( NStr::Replace(lb->key,"\1",sep));
405 buffer->append(bkey.data(), bkey.size());
406 if (extra) {
407 if (cr_time_lo != 0 || cr_time_hi != 0) {
408 buffer->WriteText(sep).WriteText("cr_time=").WriteNumber(lb->create_time/kUSecsPerSecond);
409 }
410 if (expire_lo != 0 || expire_hi != 0) {
411 buffer->WriteText(sep).WriteText("exp=").WriteNumber(lb->expire);
412 }
413 if (vexpire_lo != 0 || vexpire_hi != 0) {
414 buffer->WriteText(sep).WriteText("ver_dead=").WriteNumber(lb->ver_expire);
415 }
416 if (create_server != 0) {
417 buffer->WriteText(sep).WriteText("cr_srv=").WriteNumber(lb->create_server);
418 }
419 if (size_lo != 0 || size_hi != 0) {
420 buffer->WriteText(sep).WriteText("size=").WriteNumber(lb->size);
421 }
422 }
423 buffer->append("\n",1);
424 }
425 continue;
426 }
427 break;
428 }
429 #else
430 TKeyMap::iterator lb = cache->key_map.lower_bound(&search_mask);
431 for ( ; lb != cache->key_map.end(); ++lb) {
432 if (strncmp(search_mask.key.data(), (*lb)->key.data(), search_mask.key.size())== 0) {
433 if (!show_exp && (*lb)->expire <= cur_time) {
434 continue;
435 }
436 if (!extra || (
437 ((*lb)->create_time >= cr_time_lo && (cr_time_hi == 0 || (*lb)->create_time <= cr_time_hi)) &&
438 ((*lb)->expire >= expire_lo && ( expire_hi == 0 || (*lb)->expire <= expire_hi)) &&
439 ((*lb)->ver_expire >= vexpire_lo && (vexpire_hi == 0 || (*lb)->ver_expire <= vexpire_hi)) &&
440 ((*lb)->size >= size_lo && ( size_hi == 0 || (*lb)->size <= size_hi)) &&
441 (create_server == 0 || (*lb)->create_server == create_server))) {
442
443 string bkey( NStr::Replace((*lb)->key,"\1",","));
444 buffer->append(bkey.data(), bkey.size());
445 if (extra) {
446 if (cr_time_lo != 0 || cr_time_hi != 0) {
447 buffer->WriteText(",cr_time=").WriteNumber((*lb)->create_time/kUSecsPerSecond);
448 }
449 if (expire_lo != 0 || expire_hi != 0) {
450 buffer->WriteText(",exp=").WriteNumber((*lb)->expire);
451 }
452 if (vexpire_lo != 0 || vexpire_hi != 0) {
453 buffer->WriteText(",ver_dead=").WriteNumber((*lb)->ver_expire);
454 }
455 if (create_server != 0) {
456 buffer->WriteText(",cr_srv=").WriteNumber((*lb)->create_server);
457 }
458 if (size_lo != 0 || size_hi != 0) {
459 buffer->WriteText(",size=").WriteNumber((*lb)->size);
460 }
461 }
462 buffer->append("\n",1);
463 }
464 continue;
465 }
466 break;
467 }
468 #endif
469 cache->lock.Unlock();
470 }
471
472 }
473
474
475 static inline char*
s_MapFile(TFileHandle fd,size_t file_size)476 s_MapFile(TFileHandle fd, size_t file_size)
477 {
478 char* mem_ptr = NULL;
479 #ifdef NCBI_OS_LINUX
480 file_size = (file_size + kMemPageSize - 1) & kMemPageAlignMask;
481 mem_ptr = (char*)mmap(NULL, file_size, PROT_READ | PROT_WRITE,
482 MAP_SHARED, fd, 0);
483 if (mem_ptr == MAP_FAILED) {
484 SRV_LOG(Critical, "Cannot map file into memory, errno=" << errno);
485 return NULL;
486 }
487 #endif
488 return mem_ptr;
489 }
490
/// Releases a mapping created by s_MapFile(). @a file_size should be the
/// size passed to s_MapFile() so that the same page-rounded range is
/// released. A NULL pointer is ignored; munmap failures are logged instead
/// of being silently dropped. No-op on non-Linux builds.
static inline void
s_UnmapFile(char* mem_ptr, size_t file_size)
{
#ifdef NCBI_OS_LINUX
    if (mem_ptr == NULL) {
        return;
    }
    file_size = (file_size + kMemPageSize - 1) & kMemPageAlignMask;
    if (munmap(mem_ptr, file_size) != 0) {
        SRV_LOG(Critical, "Cannot unmap file from memory, errno=" << errno);
    }
#endif
}
499
/// Would pin the given memory range in RAM with mlock(). Intentionally a
/// no-op: mlock is not allowed on NCBI servers (limit 32 KB). The former
/// implementation is kept below, compiled out.
static inline void
s_LockFileMem(const void* mem_ptr, size_t mem_size)
{
    (void)mem_ptr;
    (void)mem_size;
#if 0
    if (mlock(mem_ptr, mem_size) != 0) {
        SRV_LOG(Critical, "mlock finished with error, errno=" << errno);
    }
#endif
}
512
513
514
515 static bool
s_EnsureDirExist(const string & dir_name)516 s_EnsureDirExist(const string& dir_name)
517 {
518 CDir dir(dir_name);
519 if (!dir.Exists()) {
520 return dir.Create();
521 }
522 return true;
523 }
524
525 /// Read from registry only those parameters that can be changed on the
526 /// fly, without re-starting the application.
527 static bool
s_ReadVariableParams(const CNcbiRegistry & reg)528 s_ReadVariableParams(const CNcbiRegistry& reg)
529 {
530 s_GCBatchSize = Uint2(reg.GetInt(kNCStorage_RegSection, kNCStorage_GCBatchParam, 500));
531
532 string str = reg.GetString(kNCStorage_RegSection, kNCStorage_FileSizeParam, "100 MB");
533 s_NewFileSize = Uint4(NStr::StringToUInt8_DataSize(str));
534 s_MaxGarbagePct = reg.GetInt(kNCStorage_RegSection, kNCStorage_GarbagePctParam, 20);
535 str = reg.GetString(kNCStorage_RegSection, kNCStorage_MinDBSizeParam, "1 GB");
536 s_MinDBSize = NStr::StringToUInt8_DataSize(str);
537 s_MinMoveLife = reg.GetInt(kNCStorage_RegSection, kNCStorage_MoveLifeParam, 1000);
538 s_FailedMoveDelay = reg.GetInt(kNCStorage_RegSection, kNCStorage_FailedMoveParam, 10);
539
540 s_MinRecNoSavePeriod = reg.GetInt(kNCStorage_RegSection, kNCStorage_MinRecNoSaveParam, 30);
541 s_FlushTimePeriod = reg.GetInt(kNCStorage_RegSection, kNCStorage_FlushTimeParam, 0);
542
543 s_ExtraGCOnSize = NStr::StringToUInt8_DataSize(reg.GetString(
544 kNCStorage_RegSection, kNCStorage_ExtraGCOnParam, "0"));
545 s_ExtraGCOffSize = NStr::StringToUInt8_DataSize(reg.GetString(
546 kNCStorage_RegSection, kNCStorage_ExtraGCOffParam, "0"));
547 s_StopWriteOnSize = NStr::StringToUInt8_DataSize(reg.GetString(
548 kNCStorage_RegSection, kNCStorage_StopWriteOnParam, "0"));
549 s_StopWriteOffSize = NStr::StringToUInt8_DataSize(reg.GetString(
550 kNCStorage_RegSection, kNCStorage_StopWriteOffParam, "0"));
551 s_DiskFreeLimit = NStr::StringToUInt8_DataSize(reg.GetString(
552 kNCStorage_RegSection, kNCStorage_MinFreeDiskParam, "5 GB"));
553 s_DiskCritical = NStr::StringToUInt8_DataSize(reg.GetString(
554 kNCStorage_RegSection, kNCStorage_DiskCriticalParam, "1 GB"));
555 s_MaxBlobSizeStore = NStr::StringToUInt8_DataSize(reg.GetString(
556 kNCStorage_RegSection, kNCStorage_MaxBlobSizeStore, "1 GB"));
557 if (s_MaxBlobSizeStore > kNCLargestBlobSize) {
558 SRV_LOG(Error, "Parameter " << kNCStorage_MaxBlobSizeStore << " is too large."
559 << " Changing it to " << kNCLargestBlobSize);
560 s_MaxBlobSizeStore = kNCLargestBlobSize;
561 }
562
563 int warn_pct = reg.GetInt(kNCStorage_RegSection, "db_limit_percentage_alert", 65);
564 if (warn_pct <= 0 || warn_pct >= 100) {
565 SRV_LOG(Error, "Parameter db_limit_percentage_alert has wrong value "
566 << warn_pct << ". Assuming it's 65.");
567 warn_pct = 65;
568 }
569 s_WarnLimitOnPct = warn_pct;
570 warn_pct = reg.GetInt(kNCStorage_RegSection, "db_limit_percentage_alert_delta", 5);
571 if (warn_pct <= 0 || warn_pct >= s_WarnLimitOnPct) {
572 SRV_LOG(Error, "Parameter db_limit_percentage_alert_delta has wrong value "
573 << warn_pct << ". Assuming it's 5.");
574 warn_pct = 5;
575 }
576 s_WarnLimitOffPct = s_WarnLimitOnPct - warn_pct;
577
578 SetWBSoftSizeLimit(NStr::StringToUInt8_DataSize(reg.GetString(
579 kNCStorage_RegSection, "write_back_soft_size_limit", "3 GB")));
580 SetWBHardSizeLimit(NStr::StringToUInt8_DataSize(reg.GetString(
581 kNCStorage_RegSection, "write_back_hard_size_limit", "4 GB")));
582
583 int to2 = reg.GetInt(kNCStorage_RegSection, "write_back_timeout", 1000);
584 int to1 = reg.GetInt(kNCStorage_RegSection, "write_back_timeout_startup", to2);
585 SetWBWriteTimeout( CNCServer::IsInitiallySynced() ? to2 : to1, to2);
586 SetWBFailedWriteDelay(reg.GetInt(kNCStorage_RegSection, "write_back_failed_delay", 2));
587 s_TaskPriorityWbMemRelease = reg.GetInt(kNCStorage_RegSection, kNCStorage_WbMemRelease, 10);
588
589 int failed_write = reg.GetInt(kNCStorage_RegSection, kNCStorage_FailedWriteSize, 0);
590 CNCBlobAccessor::SetFailedWriteCount((Uint4)failed_write);
591 return true;
592 }
593
594 /// Read all storage parameters from registry
595 static bool
s_ReadStorageParams(void)596 s_ReadStorageParams(void)
597 {
598 const CNcbiRegistry& reg = CTaskServer::GetConfRegistry();
599
600 s_Path = reg.GetString(kNCStorage_RegSection, kNCStorage_PathParam, "./cache");
601 s_Prefix = reg.GetString(kNCStorage_RegSection, kNCStorage_FilePrefixParam, "nccache");
602 if (s_Path.empty() || s_Prefix.empty()) {
603 SRV_LOG(Critical, "Incorrect parameters for " << kNCStorage_PathParam
604 << " and " << kNCStorage_FilePrefixParam
605 << " in the section '" << kNCStorage_RegSection << "': '"
606 << s_Path << "' and '" << s_Prefix << "'");
607 return false;
608 }
609 if (!s_EnsureDirExist(s_Path)) {
610 SRV_LOG(Critical, "Cannot create directory " << s_Path);
611 return false;
612 }
613 s_GuardName = reg.Get(kNCStorage_RegSection, kNCStorage_GuardNameParam);
614 if (s_GuardName.empty()) {
615 s_GuardName = CDirEntry::MakePath(s_Path,
616 kNCStorage_StartedFileName,
617 s_Prefix);
618 }
619 try {
620 return s_ReadVariableParams(reg);
621 }
622 catch (CStringException& ex) {
623 SRV_LOG(Critical, "Bad configuration: " << ex);
624 return false;
625 }
626 }
627
628 /// Make name of the index file in the storage
629 static string
s_GetIndexFileName(void)630 s_GetIndexFileName(void)
631 {
632 string file_name(s_Prefix);
633 file_name += kNCStorage_IndexFileSuffix;
634 file_name = CDirEntry::MakePath(s_Path, file_name, kNCStorage_DBFileExt);
635 return CDirEntry::CreateAbsolutePath(file_name);
636 }
637
638 /// Make name of file with meta-information in given database part
639 static string
s_GetFileName(Uint4 file_id,ENCDBFileType file_type)640 s_GetFileName(Uint4 file_id, ENCDBFileType file_type)
641 {
642 string file_name(s_Prefix);
643 switch (file_type) {
644 case eDBFileMeta:
645 file_name += kNCStorage_MetaFileSuffix;
646 break;
647 case eDBFileData:
648 file_name += kNCStorage_DataFileSuffix;
649 break;
650 case eDBFileMaps:
651 file_name += kNCStorage_MapsFileSuffix;
652 break;
653 default:
654 SRV_FATAL("Unsupported file type: " << file_type);
655 }
656 file_name += NStr::UIntToString(file_id);
657 file_name = CDirEntry::MakePath(s_Path, file_name, kNCStorage_DBFileExt);
658 return CDirEntry::CreateAbsolutePath(file_name);
659 }
660
661 /// Make sure that current storage database is used only with one instance
662 /// of NetCache. Lock specially named file exclusively and hold the lock
663 /// for all time while process is working. This file also is used for
664 /// determining if previous instance of NetCache was closed gracefully.
665 /// If another instance of NetCache using the database method will throw
666 /// an exception.
667 ///
668 /// @return
669 /// TRUE if guard file existed and was unlocked meaning that previous
670 /// instance of NetCache was terminated inappropriately. FALSE if file
671 /// didn't exist so that this storage instance is a clean start.
672 static bool
s_LockInstanceGuard(void)673 s_LockInstanceGuard(void)
674 {
675 s_CleanStart = !CFile(s_GuardName).Exists();
676
677 try {
678 if (s_CleanStart) {
679 // Just create the file with read and write permissions
680 CFileWriter tmp_writer(s_GuardName, CFileWriter::eOpenAlways);
681 CFile guard_file(s_GuardName);
682 CFile::TMode user_mode, grp_mode, oth_mode;
683
684 guard_file.GetMode(&user_mode, &grp_mode, &oth_mode);
685 user_mode |= CFile::fRead;
686 guard_file.SetMode(user_mode, grp_mode, oth_mode);
687 }
688
689 s_GuardLock.reset(new CFileLock(s_GuardName,
690 CFileLock::fDefault,
691 CFileLock::eExclusive));
692 if (!s_CleanStart && CFile(s_GuardName).GetLength() == 0)
693 s_CleanStart = true;
694 if (!s_CleanStart) {
695 CNCAlerts::Register(CNCAlerts::eStartAfterCrash,
696 "InstanceGuard file " + s_GuardName + " was present on startup");
697 INFO("NetCache wasn't finished cleanly in previous run. "
698 "Will try to work with storage as is.");
699 }
700
701 CFileIO writer;
702 writer.SetFileHandle(s_GuardLock->GetFileHandle());
703 writer.SetFileSize(0, CFileIO::eBegin);
704 string pid(NStr::UInt8ToString(CCurrentProcess::GetPid()));
705 writer.Write(pid.c_str(), pid.size());
706 }
707 catch (CFileErrnoException& ex) {
708 SRV_LOG(Critical, "Can't lock the database (other instance is running?): " << ex);
709 return false;
710 }
711 return true;
712 }
713
714 /// Unlock and delete specially named file used to ensure only one
715 /// instance working with database.
716 /// Small race exists here now which cannot be resolved for Windows
717 /// without re-writing of the mechanism using low-level functions. Race
718 /// can appear like this: if another instance will start right when this
719 /// instance stops then other instance can be thinking that this instance
720 /// terminated incorrectly.
721 static void
s_UnlockInstanceGuard(void)722 s_UnlockInstanceGuard(void)
723 {
724 if (s_GuardLock.get()) {
725 s_GuardLock.reset();
726 if (!s_AbandonDB) {
727 CFile(s_GuardName).Remove();
728 }
729 }
730 }
731
732 /// Open and read index database file
733 static bool
s_OpenIndexDB(void)734 s_OpenIndexDB(void)
735 {
736 string index_name = s_GetIndexFileName();
737 for (int i = 0; i < 2; ++i) {
738 try {
739 s_IndexDB.reset(new CNCDBIndexFile(index_name));
740 s_IndexDB->CreateDatabase();
741 s_IndexDB->GetAllDBFiles(s_DBFiles);
742 ERASE_ITERATE(TNCDBFilesMap, it, (*s_DBFiles)) {
743 CSrvRef<SNCDBFileInfo> info = it->second;
744 Int8 file_size = -1;
745 #ifdef NCBI_OS_LINUX
746 info->fd = open(info->file_name.c_str(), O_RDWR | O_NOATIME);
747 if (info->fd == -1) {
748 SRV_LOG(Critical, "Cannot open storage file, errno=" << errno);
749 delete_file:
750 try {
751 s_IndexDB->DeleteDBFile(info->file_id);
752 }
753 catch (CSQLITE_Exception& ex) {
754 SRV_LOG(Critical, "Error cleaning index file: " << ex);
755 }
756 s_DBFiles->erase(it);
757 continue;
758 }
759 file_size = CFile(info->file_name).GetLength();
760 if (file_size == -1) {
761 SRV_LOG(Critical, "Cannot read file size (errno=" << errno
762 << "). File " << info->file_name
763 << " will be deleted.");
764 goto delete_file;
765 }
766 info->file_size = Uint4(file_size);
767 info->file_map = s_MapFile(info->fd, info->file_size);
768 if (!info->file_map)
769 goto delete_file;
770 if (*(Uint8*)info->file_map == kMetaSignature) {
771 info->file_type = eDBFileMeta;
772 info->type_index = eFileIndexMeta;
773 }
774 else if (*(Uint8*)info->file_map == kDataSignature) {
775 info->file_type = eDBFileData;
776 info->type_index = eFileIndexData;
777 }
778 else if (*(Uint8*)info->file_map == kMapsSignature) {
779 info->file_type = eDBFileMaps;
780 info->type_index = eFileIndexMaps;
781 }
782 else {
783 SRV_LOG(Critical, "Unknown file signature: " << *(Uint8*)info->file_map);
784 goto delete_file;
785 }
786 info->index_head = (SFileIndexRec*)(info->file_map + file_size);
787 --info->index_head;
788 #endif
789 }
790 return true;
791 }
792 catch (CSQLITE_Exception& ex) {
793 s_IndexDB.reset();
794 SRV_LOG(Error, "Index file is broken, reinitializing storage. " << ex);
795 CFile(index_name).Remove();
796 }
797 }
798 SRV_LOG(Critical, "Cannot open or create index file for the storage.");
799 return false;
800 }
801
802 /// Reinitialize database cleaning all data from it.
803 /// Only database is cleaned, internal cache is left intact.
804 static void
s_CleanDatabase(void)805 s_CleanDatabase(void)
806 {
807 CNCAlerts::Register(CNCAlerts::eStorageReinit, "Data storage was reinitialized");
808 INFO("Reinitializing storage " << s_Prefix << " at " << s_Path);
809
810 s_DBFiles->clear();
811 s_IndexDB->DeleteAllDBFiles();
812 }
813
814 /// Check if database need re-initialization depending on different
815 /// parameters and state of the guard file protecting from several
816 /// instances running on the same database.
817 static bool
s_ReinitializeStorage(void)818 s_ReinitializeStorage(void)
819 {
820 try {
821 s_CleanDatabase();
822 return true;
823 }
824 catch (CSQLITE_Exception& ex) {
825 SRV_LOG(Error, "Error in soft reinitialization, trying harder. " << ex);
826 s_IndexDB.reset();
827 CFile(s_GetIndexFileName()).Remove();
828 return s_OpenIndexDB();
829 }
830 }
831
832 static inline bool
s_IsIndexDeleted(SNCDBFileInfo * file_info,SFileIndexRec * ind_rec)833 s_IsIndexDeleted(SNCDBFileInfo* file_info, SFileIndexRec* ind_rec)
834 {
835 Uint4 rec_num = Uint4(file_info->index_head - ind_rec);
836 return ind_rec->next_num == rec_num || ind_rec->prev_num == rec_num;
837 }
838
839 static SFileIndexRec*
s_GetIndOrDeleted(SNCDBFileInfo * file_info,Uint4 rec_num)840 s_GetIndOrDeleted(SNCDBFileInfo* file_info, Uint4 rec_num)
841 {
842 SFileIndexRec* ind_rec = file_info->index_head - rec_num;
843 char* min_ptr = file_info->file_map + kSignatureSize;
844 if ((char*)ind_rec < min_ptr || ind_rec >= file_info->index_head) {
845 DB_CORRUPTED("Bad record number requested, rec_num=" << rec_num
846 << " in file " << file_info->file_name
847 << ". It produces pointer " << (void*)ind_rec
848 << " which is not in the range between " << (void*)min_ptr
849 << " and " << (void*)file_info->index_head << ".");
850 }
851 Uint4 next_num = ACCESS_ONCE(ind_rec->next_num);
852 if ((next_num != 0 && next_num < rec_num) || ind_rec->prev_num > rec_num)
853 {
854 DB_CORRUPTED("Index record " << rec_num
855 << " in file " << file_info->file_name
856 << " contains bad next_num " << next_num
857 << " and/or prev_num " << ind_rec->prev_num << ".");
858 }
859 return ind_rec;
860 }
861
862 static SFileIndexRec*
s_GetIndexRec(SNCDBFileInfo * file_info,Uint4 rec_num)863 s_GetIndexRec(SNCDBFileInfo* file_info, Uint4 rec_num)
864 {
865 SFileIndexRec* ind_rec = s_GetIndOrDeleted(file_info, rec_num);
866 if (s_IsIndexDeleted(file_info, ind_rec)) {
867 DB_CORRUPTED("Index record " << rec_num
868 << " in file " << file_info->file_name
869 << " has been deleted.");
870 }
871 return ind_rec;
872 }
873
874 static SFileIndexRec*
s_GetIndexRecTry(SNCDBFileInfo * file_info,Uint4 rec_num)875 s_GetIndexRecTry(SNCDBFileInfo* file_info, Uint4 rec_num)
876 {
877 SFileIndexRec* ind_rec = s_GetIndOrDeleted(file_info, rec_num);
878 if (s_IsIndexDeleted(file_info, ind_rec)) {
879 return nullptr;
880 }
881 return ind_rec;
882 }
883
884 static void
s_DeleteIndexRecNoLock(SNCDBFileInfo * file_info,SFileIndexRec * ind_rec)885 s_DeleteIndexRecNoLock(SNCDBFileInfo* file_info, SFileIndexRec* ind_rec)
886 {
887 SFileIndexRec* prev_rec;
888 SFileIndexRec* next_rec;
889 if (ind_rec->prev_num == 0) {
890 prev_rec = file_info->index_head;
891 } else {
892 prev_rec = s_GetIndexRec(file_info, ind_rec->prev_num);
893 }
894 if (ind_rec->next_num == 0) {
895 next_rec = file_info->index_head;
896 } else {
897 next_rec = s_GetIndexRec(file_info, ind_rec->next_num);
898 }
899 // These should be in the exactly this order to prevent unrecoverable
900 // corruption.
901 ACCESS_ONCE(prev_rec->next_num) = ind_rec->next_num;
902 ACCESS_ONCE(next_rec->prev_num) = ind_rec->prev_num;
903 ind_rec->next_num = ind_rec->prev_num = Uint4(file_info->index_head - ind_rec);
904 }
905
906 static inline void
s_DeleteIndexRec(SNCDBFileInfo * file_info,SFileIndexRec * ind_rec)907 s_DeleteIndexRec(SNCDBFileInfo* file_info, SFileIndexRec* ind_rec)
908 {
909 file_info->info_lock.Lock();
910 s_DeleteIndexRecNoLock(file_info, ind_rec);
911 file_info->info_lock.Unlock();
912 }
913
914 static void
s_MoveRecToGarbageNoLock(SNCDBFileInfo * file_info,SFileIndexRec * ind_rec)915 s_MoveRecToGarbageNoLock(SNCDBFileInfo* file_info, SFileIndexRec* ind_rec)
916 {
917 Uint4 size = ind_rec->rec_size;
918 if (size & 7)
919 size += 8 - (size & 7);
920 size += sizeof(SFileIndexRec);
921
922 s_DeleteIndexRecNoLock(file_info, ind_rec);
923 if (file_info->used_size < size) {
924 file_info->used_size = 0;
925 } else {
926 file_info->used_size -= size;
927 }
928 file_info->garb_size += size;
929 if (file_info->garb_size > file_info->file_size) {
930 file_info->garb_size = file_info->file_size;
931 }
932 if (file_info->index_head->next_num == 0 && file_info->used_size != 0) {
933 SRV_FATAL("DB file info broken");
934 }
935
936 AtomicAdd(s_GarbageSize,size);
937 }
938
939 static void
s_MoveRecToGarbage(SNCDBFileInfo * file_info,SFileIndexRec * ind_rec)940 s_MoveRecToGarbage(SNCDBFileInfo* file_info, SFileIndexRec* ind_rec)
941 {
942 file_info->info_lock.Lock();
943 s_MoveRecToGarbageNoLock(file_info, ind_rec);
944 file_info->info_lock.Unlock();
945 }
946
947
948 static CSrvRef<SNCDBFileInfo>
s_GetDBFileNoLock(Uint4 file_id)949 s_GetDBFileNoLock(Uint4 file_id)
950 {
951 TNCDBFilesMap::const_iterator it = s_DBFiles->find(file_id);
952 if (it == s_DBFiles->end()) {
953 DB_CORRUPTED("Cannot find file id: " << file_id);
954 }
955 return it->second;
956 }
957
958 static CSrvRef<SNCDBFileInfo>
s_GetDBFile(Uint4 file_id)959 s_GetDBFile(Uint4 file_id)
960 {
961 s_DBFilesLock.Lock();
962 TNCDBFilesMap::const_iterator it = s_DBFiles->find(file_id);
963 if (it == s_DBFiles->end()) {
964 DB_CORRUPTED("Cannot find file id: " << file_id);
965 }
966 CSrvRef<SNCDBFileInfo> file_info = it->second;
967 s_DBFilesLock.Unlock();
968 return file_info;
969 }
970
971 static CSrvRef<SNCDBFileInfo>
s_GetDBFileTry(Uint4 file_id)972 s_GetDBFileTry(Uint4 file_id)
973 {
974 s_DBFilesLock.Lock();
975 TNCDBFilesMap::const_iterator it = s_DBFiles->find(file_id);
976 CSrvRef<SNCDBFileInfo> file_info = (it != s_DBFiles->end()) ? it->second : CSrvRef<SNCDBFileInfo>(nullptr);
977 s_DBFilesLock.Unlock();
978 #ifdef _DEBUG
979 if (file_info.IsNull()) {
980 CNCAlerts::Register(CNCAlerts::eDebugDbFileNotFound, NStr::NumericToString(file_id));
981 }
982 #endif
983 return file_info;
984 }
985
986 static inline Uint1
s_CalcMapDepthImpl(Uint8 size,Uint4 chunk_size,Uint2 map_size)987 s_CalcMapDepthImpl(Uint8 size, Uint4 chunk_size, Uint2 map_size)
988 {
989 Uint1 map_depth = 0;
990 Uint8 cnt_chunks = (size + chunk_size - 1) / chunk_size;
991 while (cnt_chunks > 1 && map_depth <= kNCMaxBlobMapsDepth) {
992 ++map_depth;
993 cnt_chunks = (cnt_chunks + map_size - 1) / map_size;
994 }
995 return map_depth;
996 }
997
998 static Uint1
s_CalcMapDepth(Uint8 size,Uint4 chunk_size,Uint2 map_size)999 s_CalcMapDepth(Uint8 size, Uint4 chunk_size, Uint2 map_size)
1000 {
1001 Uint1 map_depth = s_CalcMapDepthImpl(size, chunk_size, map_size);
1002 if (map_depth > kNCMaxBlobMapsDepth) {
1003 DB_CORRUPTED("Size parameters are resulted in bad map_depth"
1004 << ", size=" << size
1005 << ", chunk_size=" << chunk_size
1006 << ", map_size=" << map_size);
1007 }
1008 return map_depth;
1009 }
1010
1011 static inline Uint2
s_CalcCntMapDowns(Uint4 rec_size)1012 s_CalcCntMapDowns(Uint4 rec_size)
1013 {
1014 SFileChunkMapRec map_rec;
1015 return Uint2((SNCDataCoord*)((char*)&map_rec + rec_size) - map_rec.down_coords);
1016 }
1017
1018 static inline Uint4
s_CalcChunkDataSize(Uint4 rec_size)1019 s_CalcChunkDataSize(Uint4 rec_size)
1020 {
1021 SFileChunkDataRec data_rec;
1022 return Uint4((Uint1*)((char*)&data_rec + rec_size) - data_rec.chunk_data);
1023 }
1024
1025 static inline Uint4
s_CalcMetaRecSize(Uint2 key_size)1026 s_CalcMetaRecSize(Uint2 key_size)
1027 {
1028 SFileMetaRec meta_rec;
1029 return Uint4((char*)&meta_rec.key_data[key_size] - (char*)&meta_rec);
1030 }
1031
1032 static inline Uint4
s_CalcMapRecSize(Uint2 cnt_downs)1033 s_CalcMapRecSize(Uint2 cnt_downs)
1034 {
1035 SFileChunkMapRec map_rec;
1036 return Uint4((char*)&map_rec.down_coords[cnt_downs] - (char*)&map_rec);
1037 }
1038
/// Size in bytes of a data record carrying data_size payload bytes.
static inline Uint4
s_CalcChunkRecSize(size_t data_size)
{
    SFileChunkDataRec probe;
    const char* rec_start = (const char*)&probe;
    const char* payload_end = (const char*)(probe.chunk_data + data_size);
    return Uint4(payload_end - rec_start);
}
1045
1046 static char*
s_CalcRecordAddress(SNCDBFileInfo * file_info,SFileIndexRec * ind_rec)1047 s_CalcRecordAddress(SNCDBFileInfo* file_info, SFileIndexRec* ind_rec)
1048 {
1049 char* rec_ptr = file_info->file_map + ind_rec->offset;
1050 char* rec_end = rec_ptr + ind_rec->rec_size;
1051 char* min_ptr = file_info->file_map + kSignatureSize;
1052 if (rec_ptr < min_ptr || rec_ptr >= (char*)ind_rec
1053 || rec_end < rec_ptr || rec_end > (char*)ind_rec
1054 || (file_info->index_head - ind_rec == 1 && (char*)rec_ptr != min_ptr))
1055 {
1056 DB_CORRUPTED("Index record " << (file_info->index_head - ind_rec)
1057 << " in file " << file_info->file_name
1058 << " has wrong offset " << ind_rec->offset
1059 << " and/or size " << ind_rec->rec_size
1060 << ". It results in address " << (void*)rec_ptr
1061 << " that doesn't fit between " << (void*)min_ptr
1062 << " and " << (void*)ind_rec << ".");
1063 }
1064 return rec_ptr;
1065 }
1066
1067 static SFileMetaRec*
s_CalcMetaAddress(SNCDBFileInfo * file_info,SFileIndexRec * ind_rec)1068 s_CalcMetaAddress(SNCDBFileInfo* file_info, SFileIndexRec* ind_rec)
1069 {
1070 if (ind_rec->rec_type != eFileRecMeta) {
1071 DB_CORRUPTED("Index record " << (file_info->index_head - ind_rec)
1072 << " in file " << file_info->file_name
1073 << " has wrong type " << int(ind_rec->rec_type)
1074 << " when should be " << int(eFileRecMeta) << ".");
1075 }
1076 SFileMetaRec* meta_rec = (SFileMetaRec*)s_CalcRecordAddress(file_info, ind_rec);
1077 Uint4 min_size = sizeof(SFileMetaRec) + 1;
1078 if (meta_rec->has_password)
1079 min_size += 16;
1080 if (ind_rec->rec_size < min_size) {
1081 DB_CORRUPTED("Index record " << (file_info->index_head - ind_rec)
1082 << " in file " << file_info->file_name
1083 << " has wrong rec_size " << ind_rec->rec_size
1084 << " for meta record."
1085 << " It's less than minimum " << min_size << ".");
1086 }
1087 return meta_rec;
1088 }
1089
1090 static SFileChunkMapRec*
s_CalcMapAddress(SNCDBFileInfo * file_info,SFileIndexRec * ind_rec)1091 s_CalcMapAddress(SNCDBFileInfo* file_info, SFileIndexRec* ind_rec)
1092 {
1093 if (ind_rec->rec_type != eFileRecChunkMap) {
1094 DB_CORRUPTED("Index record " << (file_info->index_head - ind_rec)
1095 << " in file " << file_info->file_name
1096 << " has wrong type " << int(ind_rec->rec_type)
1097 << " when should be " << int(eFileRecChunkMap) << ".");
1098 }
1099 char* rec_ptr = s_CalcRecordAddress(file_info, ind_rec);
1100 Uint4 min_size = sizeof(SFileChunkMapRec);
1101 if (ind_rec->rec_size < min_size) {
1102 DB_CORRUPTED("Index record " << (file_info->index_head - ind_rec)
1103 << " in file " << file_info->file_name
1104 << " has wrong rec_size " << ind_rec->rec_size
1105 << " for map record."
1106 << " It's less than minimum " << min_size << ".");
1107 }
1108 return (SFileChunkMapRec*)rec_ptr;
1109 }
1110
1111 static SFileChunkDataRec*
s_CalcChunkAddress(SNCDBFileInfo * file_info,SFileIndexRec * ind_rec)1112 s_CalcChunkAddress(SNCDBFileInfo* file_info, SFileIndexRec* ind_rec)
1113 {
1114 if (ind_rec->rec_type != eFileRecChunkData) {
1115 DB_CORRUPTED("Index record " << (file_info->index_head - ind_rec)
1116 << " in file " << file_info->file_name
1117 << " has wrong type " << int(ind_rec->rec_type)
1118 << " when should be " << int(eFileRecChunkData) << ".");
1119 }
1120 char* rec_ptr = s_CalcRecordAddress(file_info, ind_rec);
1121 Uint4 min_size = sizeof(SFileChunkDataRec);
1122 if (ind_rec->rec_size < min_size) {
1123 DB_CORRUPTED("Index record " << (file_info->index_head - ind_rec)
1124 << " in file " << file_info->file_name
1125 << " has wrong rec_size " << ind_rec->rec_size
1126 << " for data record."
1127 << " It's less than minimum " << min_size << ".");
1128 }
1129 return (SFileChunkDataRec*)rec_ptr;
1130 }
1131
/// Create and initialize a new database file and make it the next file to
/// be used for writing records of the given type.
///
/// Steps: pick the next file id (wrapping from kNCMaxDBFileId to 1), create
/// and size the file, map it, initialize its index head, register it in the
/// index database, write the type signature, and publish it in s_DBFiles and
/// as s_AllWritings[type_idx].next_file.
///
/// @param type_idx
///   Index into s_AllFileTypes / s_AllWritings selecting the file type
///   (meta, data or maps).
/// @param need_lock
///   When true, s_DBFilesLock is taken around the insertion into s_DBFiles.
///   Presumably false is passed by callers that already hold that lock.
/// @return
///   true on success. false if the file could not be created, truncated,
///   mapped or registered in the index database.
static bool
s_CreateNewFile(size_t type_idx, bool need_lock)
{
    // No mutex is used for s_LastFileId: apart from initialization it is
    // changed only here. NOTE(review): this assumes s_CreateNewFile() is only
    // ever called from one thread; confirm with its callers.
    Uint4 file_id;
    if (s_LastFileId == kNCMaxDBFileId)
        file_id = 1;
    else
        file_id = s_LastFileId + 1;
    // NOTE(review): after wraparound the id may still belong to an existing
    // file in s_DBFiles, and O_TRUNC below would wipe it. Verify that callers
    // guarantee this cannot happen.
    //size_t true_type_idx = type_idx - eFileIndexMoveShift;
    size_t true_type_idx = type_idx;
    string file_name = s_GetFileName(file_id, s_AllFileTypes[true_type_idx]);
    SNCDBFileInfo* file_info = new SNCDBFileInfo();
    file_info->file_id = file_id;
    file_info->file_name = file_name;
    file_info->create_time = CSrvTime::CurSecs();
    file_info->file_size = s_NewFileSize;
    file_info->file_type = s_AllFileTypes[true_type_idx];
    file_info->type_index = EDBFileIndex(true_type_idx);

    // On the failure paths below, deleting file_info runs ~SNCDBFileInfo(),
    // which unmaps the file, closes fd and unlinks file_name.
#ifdef NCBI_OS_LINUX
    file_info->fd = open(file_name.c_str(),
                         O_RDWR | O_CREAT | O_TRUNC | O_NOATIME,
                         S_IRUSR | S_IWUSR);
    if (file_info->fd == -1) {
        SRV_LOG(Critical, "Cannot create new storage file, errno=" << errno);
        delete file_info;
        return false;
    }
    int trunc_res = ftruncate(file_info->fd, s_NewFileSize);
    if (trunc_res != 0) {
        SRV_LOG(Critical, "Cannot truncate new file, errno=" << errno);
        delete file_info;
        return false;
    }
    file_info->file_map = s_MapFile(file_info->fd, s_NewFileSize);
    if (!file_info->file_map) {
        delete file_info;
        return false;
    }
#endif

    // The index occupies the end of the file and grows downwards: the head
    // entry is the last SFileIndexRec slot, and record N is at index_head - N.
    // An empty list has both links of the head set to 0.
    file_info->index_head = (SFileIndexRec*)(file_info->file_map + file_info->file_size);
    --file_info->index_head;
    file_info->index_head->next_num = file_info->index_head->prev_num = 0;

    s_IndexLock.Lock();
    try {
        s_IndexDB->NewDBFile(file_id, file_name);
    }
    catch (CSQLITE_Exception& ex) {
        s_IndexLock.Unlock();
        SRV_LOG(Critical, "Error while adding new storage file: " << ex);
        delete file_info;
        return false;
    }
    s_IndexLock.Unlock();

    s_LastFileId = file_id;
    // The first 8 bytes of every file hold a signature identifying its type.
    switch (true_type_idx) {
    case eFileIndexMeta:
        *(Uint8*)file_info->file_map = kMetaSignature;
        break;
    case eFileIndexData:
        *(Uint8*)file_info->file_map = kDataSignature;
        break;
    case eFileIndexMaps:
        *(Uint8*)file_info->file_map = kMapsSignature;
        break;
    default:
        SRV_FATAL("Unsupported file type: " << true_type_idx);
    }

    if (need_lock) {
        s_DBFilesLock.Lock();
    }
    (*s_DBFiles)[file_id] = file_info;
    s_NextWriteLock.Lock();
    s_AllWritings[type_idx].next_file = file_info;
    s_NextWriteLock.Unlock();
    if (need_lock) {
        s_DBFilesLock.Unlock();
    }
    // Only the signature and the index head count as occupied space so far.
    AtomicAdd(s_CurDBSize, kSignatureSize + sizeof(SFileIndexRec));

    return true;
}
1222
1223 static void
s_DeleteDBFile(const CSrvRef<SNCDBFileInfo> & file_info,bool need_lock)1224 s_DeleteDBFile(const CSrvRef<SNCDBFileInfo>& file_info, bool need_lock)
1225 {
1226 s_IndexLock.Lock();
1227 try {
1228 s_IndexDB->DeleteDBFile(file_info->file_id);
1229 }
1230 catch (CSQLITE_Exception& ex) {
1231 SRV_LOG(Critical, "Index database does not delete rows: " << ex);
1232 }
1233 s_IndexLock.Unlock();
1234
1235 if (need_lock) {
1236 s_DBFilesLock.Lock();
1237 }
1238 SRV_LOG(Info, "Deleted file id: " << file_info->file_id);
1239 s_DBFiles->erase(file_info->file_id);
1240 if (need_lock) {
1241 s_DBFilesLock.Unlock();
1242 }
1243
1244 AtomicSub(s_CurDBSize, file_info->file_size);
1245 AtomicSub(s_GarbageSize, file_info->garb_size);
1246
1247 }
1248
SNCDBFileInfo(void)1249 SNCDBFileInfo::SNCDBFileInfo(void)
1250 : file_map(NULL),
1251 file_id(0),
1252 file_size(0),
1253 garb_size(0),
1254 used_size(0),
1255 index_head(NULL),
1256 is_releasing(false),
1257 fd(0),
1258 create_time(0),
1259 next_shrink_time(0)
1260 {
1261 cnt_unfinished.Set(0);
1262 }
1263
/// Release the file: unmap it if mapped. On Linux, also close the descriptor
/// (if one was opened) and delete the file from disk. Failures are logged as
/// Critical, except a missing file on unlink.
SNCDBFileInfo::~SNCDBFileInfo(void)
{
    if (file_map != nullptr)
        s_UnmapFile(file_map, file_size);
#ifdef NCBI_OS_LINUX
    // Both 0 (initial value) and -1 (failed open) mean "no descriptor".
    const bool have_fd = (fd != 0 && fd != -1);
    if (have_fd && close(fd) != 0) {
        SRV_LOG(Critical, "Error closing file " << file_name
                          << ", errno=" << errno);
    }
    // The backing file is always removed; ENOENT is not treated as an error.
    if (unlink(file_name.c_str()) != 0 && errno != ENOENT) {
        SRV_LOG(Critical, "Error deleting file " << file_name
                          << ", errno=" << errno);
    }
#endif
}
1282
/// Initialize the storage.
///
/// Steps: reset the per-type writing slots, read storage parameters, take
/// the instance guard and open the index DB. Reinitialize storage when asked
/// to, or when s_CleanStart is not set. Create per-time-bucket caches, set
/// s_LastFileId to the most recently created known file, and start the
/// background tasks.
/// @return false if any preparatory step fails.
bool
CNCBlobStorage::Initialize(bool do_reinit)
{
    for (size_t idx = 0; idx < s_CntAllFiles; ++idx) {
        auto& writing = s_AllWritings[idx];
        writing.cur_file = writing.next_file = NULL;
    }
    s_DBFiles = new TNCDBFilesMap();

    if (!s_ReadStorageParams() || !s_LockInstanceGuard())
        return false;

    // Without a clean start the storage is always reinitialized.
    if (!s_CleanStart)
        do_reinit = true;
    if (!s_OpenIndexDB())
        return false;
    if (do_reinit) {
        if (!s_ReinitializeStorage())
            return false;
        s_CleanStart = false;
    }

    for (Uint2 bucket = 1; bucket <= CNCDistributionConf::GetCntTimeBuckets(); ++bucket) {
        s_BucketsCache[bucket] = new SBucketCache();
        s_TimeTables[bucket] = new STimeTable();
#if __NC_CACHEDATA_ALL_MONITOR
        s_AllCache[bucket] = new SAllCacheTable();
#endif
    }

    s_BlobCounter.Set(0);
    // Continue numbering after the newest file. On equal create_time, the
    // file that comes later in map order wins.
    int newest_time = 0;
    ITERATE(TNCDBFilesMap, it, (*s_DBFiles)) {
        CSrvRef<SNCDBFileInfo> info = it->second;
        if (info->create_time >= newest_time) {
            newest_time = info->create_time;
            s_LastFileId = info->file_id;
        }
    }

    s_NewFileCreator = new CNewFileCreator();
    s_DiskFlusher = new CDiskFlusher();
    s_RecNoSaver = new CRecNoSaver();
    s_SpaceShrinker = new CSpaceShrinker();
    s_ExpiredCleaner = new CExpiredCleaner();
    (new CBlobCacher())->SetRunnable();

    return true;
}
1337
1338 void
Finalize(void)1339 CNCBlobStorage::Finalize(void)
1340 {
1341 s_IndexDB.reset();
1342
1343 s_UnlockInstanceGuard();
1344 }
1345
/// Re-read the runtime-adjustable storage parameters from new_reg.
/// The error message argument is not filled in.
bool CNCBlobStorage::ReConfig(const CNcbiRegistry& new_reg, string& /*err_message*/)
{
    const bool applied = s_ReadVariableParams(new_reg);
    return applied;
}
1350
/// Write the storage configuration to task as a sequence of JSON members.
/// Each member starts with ",\n\"", so the caller provides the surrounding
/// object and the first member.
/// Size-like settings are written twice: once under "<name>_str" as a
/// human-readable data-size string, and once under "<name>" as a raw number.
void CNCBlobStorage::WriteSetup(CSrvSocketTask& task)
{
    // JSON separators: is = after a key with a numeric value, iss = after a
    // key with a string value, eol = start of the next key, str = suffix for
    // the readable-size keys, eos = closing quote of a string value.
    string is("\": "),iss("\": \""), eol(",\n\""), str("_str"), eos("\"");
    task.WriteText(eol).WriteText(kNCStorage_PathParam       ).WriteText(iss).WriteText(   s_Path).WriteText(eos);
    task.WriteText(eol).WriteText(kNCStorage_FilePrefixParam ).WriteText(iss).WriteText(   s_Prefix).WriteText(eos);
    task.WriteText(eol).WriteText(kNCStorage_GuardNameParam  ).WriteText(iss).WriteText(   s_GuardName).WriteText(eos);
    task.WriteText(eol).WriteText(kNCStorage_GCBatchParam    ).WriteText(is ).WriteNumber( s_GCBatchSize);
    task.WriteText(eol).WriteText(kNCStorage_FileSizeParam   ).WriteText(str).WriteText(iss)
                       .WriteText(NStr::UInt8ToString_DataSize( s_NewFileSize)).WriteText(eos);
    task.WriteText(eol).WriteText(kNCStorage_FileSizeParam   ).WriteText(is ).WriteNumber( s_NewFileSize);
    task.WriteText(eol).WriteText(kNCStorage_GarbagePctParam ).WriteText(is ).WriteNumber( s_MaxGarbagePct);
    task.WriteText(eol).WriteText(kNCStorage_MinDBSizeParam  ).WriteText(str).WriteText(iss)
                       .WriteText(NStr::UInt8ToString_DataSize( s_MinDBSize)).WriteText(eos);
    task.WriteText(eol).WriteText(kNCStorage_MinDBSizeParam  ).WriteText(is ).WriteNumber( s_MinDBSize);
    task.WriteText(eol).WriteText(kNCStorage_MoveLifeParam   ).WriteText(is ).WriteNumber( s_MinMoveLife);
    task.WriteText(eol).WriteText(kNCStorage_FailedMoveParam ).WriteText(is ).WriteNumber( s_FailedMoveDelay);
    task.WriteText(eol).WriteText(kNCStorage_MinRecNoSaveParam).WriteText(is ).WriteNumber( s_MinRecNoSavePeriod);
    task.WriteText(eol).WriteText(kNCStorage_FlushTimeParam  ).WriteText(is ).WriteNumber( s_FlushTimePeriod);
    task.WriteText(eol).WriteText(kNCStorage_ExtraGCOnParam  ).WriteText(is ).WriteNumber( s_ExtraGCOnSize);
    task.WriteText(eol).WriteText(kNCStorage_ExtraGCOffParam ).WriteText(is ).WriteNumber( s_ExtraGCOffSize);
    task.WriteText(eol).WriteText(kNCStorage_StopWriteOnParam ).WriteText(is ).WriteNumber( s_StopWriteOnSize);
    task.WriteText(eol).WriteText(kNCStorage_StopWriteOffParam).WriteText(is ).WriteNumber( s_StopWriteOffSize);
    task.WriteText(eol).WriteText(kNCStorage_MinFreeDiskParam ).WriteText(str).WriteText(iss)
                       .WriteText(NStr::UInt8ToString_DataSize( s_DiskFreeLimit)).WriteText(eos);
    task.WriteText(eol).WriteText(kNCStorage_MinFreeDiskParam ).WriteText(is ).WriteNumber( s_DiskFreeLimit);
    task.WriteText(eol).WriteText(kNCStorage_DiskCriticalParam).WriteText(str).WriteText(iss)
                       .WriteText(NStr::UInt8ToString_DataSize( s_DiskCritical)).WriteText(eos);
    task.WriteText(eol).WriteText(kNCStorage_DiskCriticalParam).WriteText(is ).WriteNumber( s_DiskCritical);
    task.WriteText(eol).WriteText(kNCStorage_MaxBlobSizeStore).WriteText(str).WriteText(iss)
                       .WriteText(NStr::UInt8ToString_DataSize( s_MaxBlobSizeStore)).WriteText(eos);
    task.WriteText(eol).WriteText(kNCStorage_MaxBlobSizeStore).WriteText(is ).WriteNumber( s_MaxBlobSizeStore);
    task.WriteText(eol).WriteText("db_limit_percentage_alert" ).WriteText(is ).WriteNumber( s_WarnLimitOnPct);
    task.WriteText(eol).WriteText("db_limit_percentage_alert_delta").WriteText(is).WriteNumber(s_WarnLimitOffPct);
    task.WriteText(eol).WriteText("write_back_soft_size_limit").WriteText(str).WriteText(iss)
                       .WriteText(NStr::UInt8ToString_DataSize( GetWBSoftSizeLimit())).WriteText(eos);
    task.WriteText(eol).WriteText("write_back_soft_size_limit").WriteText(is ).WriteNumber( GetWBSoftSizeLimit());
    task.WriteText(eol).WriteText("write_back_hard_size_limit").WriteText(str).WriteText(iss)
                       .WriteText(NStr::UInt8ToString_DataSize( GetWBHardSizeLimit())).WriteText(eos);
    task.WriteText(eol).WriteText("write_back_hard_size_limit").WriteText(is ).WriteNumber( GetWBHardSizeLimit());
    task.WriteText(eol).WriteText("write_back_timeout"        ).WriteText(is ).WriteNumber( GetWBWriteTimeout());
    task.WriteText(eol).WriteText("write_back_failed_delay"   ).WriteText(is ).WriteNumber( GetWBFailedWriteDelay());
    task.WriteText(eol).WriteText(kNCStorage_WbMemRelease).WriteText(is).WriteNumber(s_TaskPriorityWbMemRelease);
    task.WriteText(eol).WriteText(kNCStorage_FailedWriteSize ).WriteText(is ).WriteNumber( CNCBlobAccessor::GetFailedWriteCount());
}
1395
/// Write storage environment facts as JSON members, each starting with
/// ",\n\"". Covers: storage path, guard name, index file, number of DB
/// files, DB size, garbage size, free disk space and the stop-write flag.
void CNCBlobStorage::WriteEnvInfo(CSrvSocketTask& task)
{
    const string key_start(",\n\"");
    const string num_sep("\": ");
    const string str_sep("\": \"");
    const string str_end("\"");

    task.WriteText(key_start).WriteText("storagepath").WriteText(str_sep)
        .WriteText(s_Path).WriteText(str_end);
    task.WriteText(key_start).WriteText(kNCStorage_GuardNameParam).WriteText(str_sep)
        .WriteText(s_GuardName).WriteText(str_end);
    task.WriteText(key_start).WriteText("DBindex").WriteText(str_sep)
        .WriteText(s_GetIndexFileName()).WriteText(str_end);
    task.WriteText(key_start).WriteText("DBfiles_count").WriteText(num_sep)
        .WriteNumber(CNCBlobStorage::GetNDBFiles());
    task.WriteText(key_start).WriteText("DBsize").WriteText(str_sep)
        .WriteText(NStr::UInt8ToString_DataSize(s_CurDBSize)).WriteText(str_end);
    task.WriteText(key_start).WriteText("DBgarbage").WriteText(str_sep)
        .WriteText(NStr::UInt8ToString_DataSize(s_GarbageSize)).WriteText(str_end);
    task.WriteText(key_start).WriteText("diskfreespace").WriteText(str_sep)
        .WriteText(NStr::UInt8ToString_DataSize(GetDiskFree())).WriteText(str_end);
    task.WriteText(key_start).WriteText("IsStopWrite").WriteText(num_sep)
        .WriteNumber((int)s_IsStopWrite);
}
1408
/// Write blob statistics as JSON members, each starting with ",\n\"".
///
/// Output: blob/key counters, total size of cached blobs, number of
/// cache-map and time-table entries, purge count, the largest dead_time seen
/// ("expiration"), and the number of cached blobs per DB file. File id 0 is
/// reported as "InMemory"; ids not found in s_DBFiles are reported as
/// "Unknown_<id>".
void CNCBlobStorage::WriteBlobStat(CSrvSocketTask& task)
{
    int expire = 0;
    Uint8 size = 0;
    Uint8 cache_count = 0, timetable_count = 0;
    map<Uint4, Uint8> blob_per_file;

    ITERATE( TBucketCacheMap, bkt, s_BucketsCache) {
        SBucketCache* cache = bkt->second;
        cache->lock.Lock();
        ITERATE(TKeyMap, it, cache->key_map) {
            ++cache_count;
#if __NC_CACHEDATA_INTR_SET
            size += it->size;
            expire = max( expire, it->dead_time);
            ++blob_per_file[it->coord.file_id];
#else
            size += (*it)->size;
            expire = max( expire, (*it)->dead_time);
            ++blob_per_file[(*it)->coord.file_id];
#endif
        }
        cache->lock.Unlock();
    }
    ITERATE(TTimeBuckets, tt, s_TimeTables) {
        STimeTable* table = tt->second;
        table->lock.Lock();
        timetable_count += table->time_map.size();
        table->lock.Unlock();
    }

    string is("\": "),iss("\": \""), eol(",\n\""), eos("\"");
    task.WriteText(eol).WriteText("CurBlobsCnt").WriteText( is).WriteNumber( s_CurBlobsCnt);
    task.WriteText(eol).WriteText("CurKeysCnt").WriteText( is).WriteNumber( s_CurKeysCnt);
    task.WriteText(eol).WriteText("Size").WriteText(iss).WriteText(NStr::UInt8ToString_DataSize(size)).WriteText(eos);
    task.WriteText(eol).WriteText("InCache_count").WriteText( is).WriteNumber( cache_count);
    task.WriteText(eol).WriteText("TimeTable_count").WriteText( is).WriteNumber( timetable_count);
    task.WriteText(eol).WriteText("Purge_count").WriteText( is).WriteNumber( CNCBlobAccessor::GetPurgeCount());

#if __NC_CACHEDATA_ALL_MONITOR
    size_t ncaches_count = 0;
    ITERATE(TAllCacheBuckets, tt, s_AllCache) {
        SAllCacheTable* table = tt->second;
        table->lock.Lock();
        ncaches_count += table->all_cache_set.size();
        table->lock.Unlock();
    }
    task.WriteText(eol).WriteText("caches_count").WriteText( is).WriteNumber( ncaches_count);
#endif
#if __NC_CACHEDATA_MONITOR
    s_AllCacheLock.Lock();
    size_t all_caches_count = s_AllCacheSet.size();
    s_AllCacheLock.Unlock();
    task.WriteText(eol).WriteText("all_caches_count").WriteText( is).WriteNumber( all_caches_count);
#endif

    char buf[50];
    CSrvTime( expire).Print( buf, CSrvTime::eFmtJson);
    task.WriteText(eol).WriteText("expiration" ).WriteText(is).WriteText(buf);
    map<Uint4, Uint8>::const_iterator b = blob_per_file.begin();
    for ( ; b != blob_per_file.end(); ++b) {
        task.WriteText(eol);
        if (b->first == 0) {
            // s_CreateNewFile() never assigns id 0, so such blobs have no DB
            // file. Skip the lookup: it would take s_DBFilesLock and, in
            // _DEBUG builds, register a spurious "file not found" alert.
            task.WriteText("InMemory");
        } else {
            CSrvRef<SNCDBFileInfo> file_info = s_GetDBFileTry(b->first);
            if (file_info.IsNull()) {
                task.WriteText("Unknown_").WriteNumber(b->first);
            } else {
                task.WriteText(file_info->file_name);
            }
        }
        task.WriteText( is).WriteNumber( b->second);
    }
}
1484
/// Append a JSON member "blobs" to sendBuff, listing every cached blob whose
/// cache name equals mask (or all blobs when mask is empty).
/// Each entry has the form {"cache:key:subkey": [size, create_time (seconds),
/// create_server, create_id, dead_time, expire, ver_expire, file_id, rec_num,
/// saved_dead_time, time_bucket, map_size, chunk_size]}.
void CNCBlobStorage::WriteBlobList(TNCBufferType& sendBuff, const CTempString& mask)
{
    const string entry_start("\n{\"");
    const string field_sep(",");

    sendBuff.reserve_mem(s_CurBlobsCnt * 250);
    sendBuff.append(",\n\"", 3).append("blobs",5).append("\": [",4);

    // Append ",<value>" to the output buffer.
    auto append_field = [&sendBuff, &field_sep](const string& value) {
        sendBuff.append(field_sep.data(), field_sep.size()).append(value.data(), value.size());
    };

    bool need_comma = false;
    ITERATE( TBucketCacheMap, bkt, s_BucketsCache) {
        SBucketCache* cache = bkt->second;
        cache->lock.Lock();

        ITERATE(TKeyMap, it, cache->key_map) {
#if __NC_CACHEDATA_INTR_SET
            const SNCCacheData& data = *it;
#else
            const SNCCacheData& data = **it;
#endif
            CNCBlobKeyLight key(data.key);
            if (!mask.empty() && mask != key.Cache()) {
                continue;
            }

            if (need_comma) {
                sendBuff.append(",", 1);
            }
            need_comma = true;

            sendBuff.append(entry_start.data(), entry_start.size());
            sendBuff.append(key.Cache().data(), key.Cache().size()).append(":", 1);
            sendBuff.append(key.RawKey().data(), key.RawKey().size()).append(":", 1);
            sendBuff.append(key.SubKey().data(), key.SubKey().size()).append("\": [",4);

            const string first_field = NStr::NumericToString(data.size);
            sendBuff.append(first_field.data(), first_field.size());
            append_field(NStr::NumericToString(data.create_time / kUSecsPerSecond));
            append_field(NStr::NumericToString(data.create_server));
            append_field(NStr::NumericToString(data.create_id));
            append_field(NStr::NumericToString(data.dead_time));
            append_field(NStr::NumericToString(data.expire));
            append_field(NStr::NumericToString(data.ver_expire));
            append_field(NStr::NumericToString(data.coord.file_id));
            append_field(NStr::NumericToString(data.coord.rec_num));
            append_field(NStr::NumericToString(data.saved_dead_time));
            append_field(NStr::NumericToString(data.time_bucket));
            append_field(NStr::NumericToString(data.map_size));
            append_field(NStr::NumericToString(data.chunk_size));
            sendBuff.append("]}", 2);
        }
        cache->lock.Unlock();
    }
    sendBuff.append("\n]",2);
}
1540
WriteDbInfo(TNCBufferType & task,const CTempString & mask)1541 void CNCBlobStorage::WriteDbInfo(TNCBufferType& task, const CTempString& mask)
1542 {
1543 Uint4 id = 0;
1544 bool use_mask = false, is_id = false, is_audit = false;
1545 if (!mask.empty()) {
1546 is_audit = mask == "audit";
1547 if (!is_audit) {
1548 use_mask = true;
1549 try {
1550 id = NStr::StringToUInt(mask);
1551 is_id = true;
1552 } catch (...) {
1553 }
1554 }
1555 }
1556
1557 string is("\": "),iss("\": \""), eol(",\n\""), str("_str"), eos("\"");
1558 task.WriteText(eol).WriteText("storagepath" ).WriteText(iss).WriteText( s_Path).WriteText(eos);
1559 task.WriteText(eol).WriteText("DBfiles\": [\n");
1560 int now = CSrvTime::CurSecs();
1561 s_DBFilesLock.Lock();
1562 bool is_first = true;
1563 for (TNCDBFilesMap::iterator it = s_DBFiles->begin(); it != s_DBFiles->end(); ++it) {
1564 if (use_mask) {
1565 if (is_id && it->second->file_id != id) {
1566 continue;
1567 }
1568 if (NStr::FindNoCase(it->second->file_name, mask) == NPOS) {
1569 continue;
1570 }
1571 }
1572 if (!is_first) {
1573 task.WriteText(",");
1574 }
1575 is_first = false;
1576 task.WriteText("{\n");
1577 task.WriteText("\"").WriteText(it->second->file_name).WriteText("\": {");
1578 task.WriteText("\n\"").WriteText("file_type").WriteText( is).WriteNumber( (int)(it->second->file_type));
1579 task.WriteText(eol).WriteText("file_id").WriteText( is).WriteNumber( it->second->file_id);
1580 task.WriteText(eol).WriteText("file_size").WriteText( is).WriteNumber( it->second->file_size);
1581 task.WriteText(eol).WriteText("garb_size").WriteText( is).WriteNumber( it->second->garb_size);
1582 task.WriteText(eol).WriteText("used_size").WriteText( is).WriteNumber( it->second->used_size);
1583
1584 if (is_audit) {
1585 CSrvRef<SNCDBFileInfo>& file_info = it->second;
1586 Uint4 expired_count = 0, null_cache_count = 0, wrong_cache_count = 0;
1587 map<int, int> rec_alerts;
1588 map<int, int> rec_count;
1589 set<int> file_ref;
1590 set<const SNCCacheData*> all_cache_data;
1591
1592 ITERATE( TBucketCacheMap, bkt, s_BucketsCache) {
1593 SBucketCache* cache = bkt->second;
1594 cache->lock.Lock();
1595 ITERATE(TKeyMap, it, cache->key_map) {
1596 #if __NC_CACHEDATA_INTR_SET
1597 all_cache_data.insert(&(*it));
1598 #else
1599 all_cache_data.insert(*it);
1600 #endif
1601 }
1602 cache->lock.Unlock();
1603 }
1604
1605 // see x_PreCacheRecNums
1606 file_info->info_lock.Lock();
1607 SFileIndexRec* ind_rec = file_info->index_head;
1608 Uint4 prev_rec_num = 0;
1609 char* min_ptr = file_info->file_map + kSignatureSize;
1610 while (ind_rec->next_num != 0) {
1611 Uint4 rec_num = ind_rec->next_num;
1612 if (rec_num <= prev_rec_num) {
1613 ++rec_alerts[0];
1614 ind_rec->next_num = 0;
1615 continue;
1616 }
1617 SFileIndexRec* next_ind = file_info->index_head - rec_num;
1618 if ((char*)next_ind < min_ptr || next_ind >= ind_rec) {
1619 ++rec_alerts[1];
1620 ind_rec->next_num = 0;
1621 continue;
1622 }
1623 char* next_rec_start = file_info->file_map + next_ind->offset;
1624 if (next_rec_start < min_ptr || next_rec_start > (char*)next_ind
1625 || (rec_num == 1 && next_rec_start != min_ptr)) {
1626 ++rec_alerts[2];
1627 ind_rec->next_num = next_ind->next_num;
1628 continue;
1629 }
1630 char* next_rec_end;
1631 next_rec_end = next_rec_start + next_ind->rec_size;
1632 if (next_rec_end < next_rec_start || next_rec_end > (char*)next_ind) {
1633 ++rec_alerts[3];
1634 ind_rec->next_num = next_ind->next_num;
1635 continue;
1636 }
1637 if (next_ind->prev_num != prev_rec_num) {
1638 next_ind->prev_num = prev_rec_num;
1639 ++rec_alerts[4];
1640 }
1641
1642 switch (next_ind->rec_type) {
1643 case eFileRecChunkData:
1644 if (file_info->file_type != eDBFileData) {
1645 ++rec_alerts[5];
1646 ind_rec->next_num = next_ind->next_num;
1647 continue;
1648 }
1649 if (next_ind->rec_size < sizeof(SFileChunkDataRec)) {
1650 ++rec_alerts[6];
1651 ind_rec->next_num = next_ind->next_num;
1652 continue;
1653 }
1654 break;
1655 case eFileRecChunkMap:
1656 if (file_info->file_type != eDBFileMaps) {
1657 ++rec_alerts[7];
1658 ind_rec->next_num = next_ind->next_num;
1659 continue;
1660 }
1661 if (next_ind->rec_size < sizeof(SFileChunkMapRec)) {
1662 ++rec_alerts[8];
1663 ind_rec->next_num = next_ind->next_num;
1664 continue;
1665 }
1666 break;
1667 case eFileRecMeta:
1668 if (file_info->file_type != eDBFileMeta) {
1669 ++rec_alerts[9];
1670 ind_rec->next_num = next_ind->next_num;
1671 continue;
1672 }
1673 SFileMetaRec* meta_rec;
1674 meta_rec = (SFileMetaRec*)next_rec_start;
1675 Uint4 min_rec_size;
1676 // Minimum key length is 2, so we are adding 1 below
1677 min_rec_size = sizeof(SFileChunkMapRec) + 1;
1678 if (meta_rec->has_password)
1679 min_rec_size += 16;
1680 if (next_ind->rec_size < min_rec_size) {
1681 ++rec_alerts[10];
1682 ind_rec->next_num = next_ind->next_num;
1683 continue;
1684 }
1685 break;
1686 default:
1687 ++rec_alerts[11];
1688 ind_rec->next_num = next_ind->next_num;
1689 continue;
1690 }
1691
1692 ++rec_count[next_ind->rec_type];
1693 file_ref.insert( next_ind->chain_coord.file_id);
1694 if (next_ind->rec_type == eFileRecMeta) {
1695 SFileMetaRec* meta_rec = s_CalcMetaAddress(file_info, next_ind);
1696 if (meta_rec->dead_time < now) {
1697 ++expired_count;
1698 }
1699 }
1700 if (next_ind->cache_data == nullptr) {
1701 ++null_cache_count;
1702 } else if (all_cache_data.find(next_ind->cache_data) == all_cache_data.end()) {
1703 ++wrong_cache_count;
1704 }
1705
1706 min_ptr = next_rec_end;
1707 ind_rec = next_ind;
1708 prev_rec_num = rec_num;
1709 }
1710 file_info->info_lock.Unlock();
1711
1712 for(map<int, int>::const_iterator r = rec_count.begin(); r != rec_count.end(); ++r) {
1713 task.WriteText(eol).WriteText("records_type_").WriteNumber(r->first).WriteText(": ").WriteNumber(r->second);
1714 }
1715 for(map<int, int>::const_iterator r = rec_alerts.begin(); r != rec_alerts.end(); ++r) {
1716 task.WriteText(eol).WriteText("alert_type_").WriteNumber(r->first).WriteText(": ").WriteNumber(r->second);
1717 }
1718 if (file_info->file_type == eDBFileMeta) {
1719 task.WriteText(eol).WriteText("rec_expired").WriteText( is).WriteNumber( expired_count);
1720 }
1721 task.WriteText(eol).WriteText("null_cache_count").WriteText( is).WriteNumber(null_cache_count);
1722 task.WriteText(eol).WriteText("wrong_cache_count").WriteText( is).WriteNumber(wrong_cache_count);
1723 task.WriteText(eol).WriteText("ref_files").WriteText( is).WriteText("[");
1724 bool first = true;
1725 for(set<int>::const_iterator f = file_ref.begin(); f != file_ref.end(); ++f) {
1726 task.WriteText(first ? " " : ", ").WriteNumber(*f);
1727 first = false;
1728 }
1729 task.WriteText("]");
1730 } // is_audit
1731
1732 task.WriteText(eol).WriteText("is_releasing").WriteText( is).WriteBool( it->second->is_releasing);
1733 char buf[50];
1734 CSrvTime(it->second->create_time).Print(buf, CSrvTime::eFmtJson);
1735 task.WriteText(eol).WriteText("create_time").WriteText( is).WriteText( buf);
1736 CSrvTime(it->second->next_shrink_time).Print(buf, CSrvTime::eFmtJson);
1737 task.WriteText(eol).WriteText("next_shrink_time").WriteText( is).WriteText( buf);
1738 task.WriteText(eol).WriteText("cnt_unfinished").WriteText( is).WriteNumber( it->second->cnt_unfinished.Get());
1739 task.WriteText("}\n}");
1740 }
1741 s_DBFilesLock.Unlock();
1742 task.WriteText("]");
1743 }
1744
1745 string
PrintablePassword(const string & pass)1746 CNCBlobStorage::PrintablePassword(const string& pass)
1747 {
1748 static const char digits[] = {'0', '1', '2', '3', '4', '5', '6', '7',
1749 '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
1750
1751 string result(32, '\0');
1752 for (int i = 0, j = 0; i < 16; ++i) {
1753 Uint1 c = Uint1(pass[i]);
1754 result[j++] = digits[c >> 4];
1755 result[j++] = digits[c & 0xF];
1756 }
1757 return result;
1758 }
1759
1760 static SBucketCache*
s_GetBucketCache(Uint2 bucket)1761 s_GetBucketCache(Uint2 bucket)
1762 {
1763 TBucketCacheMap::const_iterator it = s_BucketsCache.find(bucket);
1764 if (it == s_BucketsCache.end()) {
1765 SRV_FATAL("Unexpected state");
1766 }
1767 return it->second;
1768 }
1769
/// Look up the SNCCacheData entry for @p key in the key map of time bucket
/// @p time_bucket, optionally creating it.
///
/// @param time_bucket  Bucket whose cache (see s_GetBucketCache) is searched.
/// @param key          Blob key.
/// @param need_create  If true and the key is absent, a new entry is
///                     allocated, inserted into the bucket's key map and
///                     counted in s_CurKeysCnt.
/// @return The entry with one extra reference already taken (the caller must
///         balance it with CNCBlobStorage::ReleaseCacheData), or NULL when
///         the key is absent and @p need_create is false.
static SNCCacheData*
s_GetKeyCacheData(Uint2 time_bucket, const string& key, bool need_create)
{
    SBucketCache* cache = s_GetBucketCache(time_bucket);
    SNCCacheData* data = NULL;
    cache->lock.Lock();
#if __NC_CACHEDATA_INTR_SET
    // Intrusive-set variant: check/commit insertion, so a new entry is only
    // allocated when the key really is missing.
    if (need_create) {
        TKeyMap::insert_commit_data commit_data;
        pair<TKeyMap::iterator, bool> ins_res
            = cache->key_map.insert_unique_check(key, SCacheKeyCompare(), commit_data);
        if (ins_res.second) {
            data = new SNCCacheData();
            data->key = key;
            data->time_bucket = time_bucket;
            cache->key_map.insert_unique_commit(*data, commit_data);
            AtomicAdd(s_CurKeysCnt, 1);

#if __NC_CACHEDATA_ALL_MONITOR
            SAllCacheTable* table = s_AllCache[time_bucket];
            table->lock.Lock();
            table->all_cache_set.insert(data);
            table->lock.Unlock();
#endif
        }
        else {
            data = &*ins_res.first;
        }
    }
    else {
        TKeyMap::iterator it = cache->key_map.find(key, SCacheKeyCompare());
        if (it == cache->key_map.end()) {
            data = NULL;
        }
        else {
            data = &*it;
        }
    }
#else // __NC_CACHEDATA_INTR_SET
    // Pointer-set variant: a temporary entry carrying only the key serves
    // as the search probe.
    SNCCacheData search_mask;
    search_mask.key = key;
    TKeyMap::iterator it = cache->key_map.find( &search_mask);
    if (it != cache->key_map.end()) {
        data = *it;
#ifdef _DEBUG
        // Debug builds verify that the entry belongs to this bucket.
        if (data->time_bucket != time_bucket) {
            abort();
        }
#endif
    } else if (need_create) {
        data = new SNCCacheData();
        data->key = key;
        data->time_bucket = time_bucket;
        cache->key_map.insert(data);
        AtomicAdd(s_CurKeysCnt, 1);

#if __NC_CACHEDATA_ALL_MONITOR
        SAllCacheTable* table = s_AllCache[time_bucket];
        table->lock.Lock();
        table->all_cache_set.insert(data);
        table->lock.Unlock();
#endif
    }
#endif //__NC_CACHEDATA_INTR_SET
    // Take the reference while still holding the bucket lock:
    // ReleaseCacheData re-checks ref_cnt under this same lock before
    // erasing, so the entry cannot be removed between lookup and use.
    if (data) {
        CNCBlobStorage::ReferenceCacheData(data);
    }
    cache->lock.Unlock();
    return data;
}
1840
1841 void
ReferenceCacheData(SNCCacheData * data)1842 CNCBlobStorage::ReferenceCacheData(SNCCacheData* data)
1843 {
1844 data->ref_cnt.Add(1);
1845 }
1846
/// Drop one reference on @p data.
///
/// When the last reference is released and the entry no longer points at
/// stored data (coord is empty), it is erased from its time bucket's key
/// map, s_CurKeysCnt is decremented and data->CallRCU() is invoked
/// (presumably for deferred, RCU-style destruction; confirm in the header).
void
CNCBlobStorage::ReleaseCacheData(SNCCacheData* data)
{
    // Unbalanced release: nothing to drop (only reported in debug builds).
    if (data->ref_cnt.Get() == 0) {
#ifdef _DEBUG
        CNCAlerts::Register(CNCAlerts::eDebugReleaseCacheData1, "ref_cnt is 0");
#endif
        return;
    }
    // Other holders remain (Add appears to return the updated count).
    if (data->ref_cnt.Add(-1) != 0) {
        return;
    }

    Uint2 time_bucket = data->time_bucket;
    TBucketCacheMap::const_iterator it = s_BucketsCache.find(time_bucket);
    if (it == s_BucketsCache.end()) {
        return;
    }
    SBucketCache* cache = it->second;
    cache->lock.Lock();

#ifdef _DEBUG
    // An entry that still points at stored data must have a dead time.
    if (!data->coord.empty() && data->dead_time == 0) {
        abort();
    }
#endif

    // Re-check under the bucket lock. s_GetKeyCacheData may have taken a
    // new reference meanwhile. The entry may still own stored data. In the
    // intrusive-set build it may already be unlinked from the key map.
    if (data->ref_cnt.Get() != 0 || !data->coord.empty()
#if 1
#if __NC_CACHEDATA_INTR_SET
        || !((TKeyMapHook*)data)->is_linked()
#endif
#endif
       )
    {
        cache->lock.Unlock();
        return;
    }
#if __NC_CACHEDATA_INTR_SET
    size_t n = cache->key_map.erase(*data);
#else
    size_t n = cache->key_map.erase(data);
#endif
    cache->lock.Unlock();

#if __NC_CACHEDATA_ALL_MONITOR
    SAllCacheTable* table = s_AllCache[time_bucket];
    table->lock.Lock();
    table->all_cache_set.erase(data);
    table->lock.Unlock();
#endif

    // Only the caller that actually removed the entry retires it.
    if (n != 0) {
        AtomicSub(s_CurKeysCnt, 1);
        data->CallRCU();
    }
#ifdef _DEBUG
    else {
        CNCAlerts::Register(CNCAlerts::eDebugReleaseCacheData2, "nothing erased in key_map");
    }
#endif
}
1909
1910 static void
s_InitializeAccessor(CNCBlobAccessor * acessor)1911 s_InitializeAccessor(CNCBlobAccessor* acessor)
1912 {
1913 const string& key = acessor->GetBlobKey();
1914 Uint2 time_bucket = acessor->GetTimeBucket();
1915 bool need_create = acessor->GetAccessType() == eNCCreate
1916 || acessor->GetAccessType() == eNCCopyCreate;
1917 SNCCacheData* data = s_GetKeyCacheData(time_bucket, key, need_create);
1918 acessor->Initialize(data);
1919 if (data) {
1920 CNCBlobStorage::ReleaseCacheData(data);
1921 }
1922 }
1923
1924 CNCBlobAccessor*
GetBlobAccess(ENCAccessType access,const string & key,const string & password,Uint2 time_bucket)1925 CNCBlobStorage::GetBlobAccess(ENCAccessType access,
1926 const string& key,
1927 const string& password,
1928 Uint2 time_bucket)
1929 {
1930 CNCBlobAccessor* accessor = new CNCBlobAccessor();
1931 accessor->Prepare(key, password, time_bucket, access);
1932 s_InitializeAccessor(accessor);
1933 return accessor;
1934 }
1935
1936 static inline void
s_SwitchToNextFile(SWritingInfo & w_info)1937 s_SwitchToNextFile(SWritingInfo& w_info)
1938 {
1939 w_info.cur_file = w_info.next_file;
1940 w_info.next_file = NULL;
1941 w_info.next_rec_num = 1;
1942 w_info.next_offset = kSignatureSize;
1943 w_info.left_file_size = w_info.cur_file->file_size
1944 - (kSignatureSize + sizeof(SFileIndexRec));
1945 }
1946
/// Reserve space for a new record in the current writing file of kind
/// @p file_index, and link a fresh index record for it at the tail of that
/// file's record list.
///
/// @param file_index  Which writing stream (s_AllWritings slot) to use.
/// @param rec_size    Payload size. The data area consumed is rec_size
///                    rounded up to a multiple of 8, plus one SFileIndexRec.
/// @param coord       [out] file_id / rec_num of the new record.
/// @param file_info   [out] File that holds the record.
/// @param ind_rec     [out] Initialized index record: rec_type eFileRecNone,
///                    no cache_data, cleared chain_coord.
/// @return false when there is no current file, or when the record does not
///         fit in it and no next file has been prepared yet.
///
/// On success file_info->cnt_unfinished has been incremented. Callers
/// decrement it once the record contents have been written.
static bool
s_GetNextWriteCoord(EDBFileIndex file_index,
                    Uint4 rec_size,
                    SNCDataCoord& coord,
                    CSrvRef<SNCDBFileInfo>& file_info,
                    SFileIndexRec*& ind_rec)
{
    // Round the data area up to an 8-byte boundary.
    Uint4 true_rec_size = rec_size;
    if (true_rec_size & 7)
        true_rec_size += 8 - (true_rec_size & 7);
    Uint4 reserve_size = true_rec_size + sizeof(SFileIndexRec);

    bool need_signal_switch = false;
    // s_NextWriteLock serializes space allocation across all writers.
    s_NextWriteLock.Lock();
    SWritingInfo& w_info = s_AllWritings[file_index];

    if (w_info.cur_file == NULL) {
        s_NextWriteLock.Unlock();
        return false;
    }
    if (w_info.left_file_size < reserve_size && w_info.next_file == NULL) {
        s_NextWriteLock.Unlock();
        return false;
    }
    if (w_info.left_file_size < reserve_size) {
        // Current file is full: account its unused tail as garbage, then
        // switch to the prepared next file.
        AtomicAdd(s_CurDBSize, w_info.left_file_size);
        AtomicAdd(s_GarbageSize,w_info.left_file_size);
        w_info.cur_file->info_lock.Lock();
        w_info.cur_file->garb_size += w_info.left_file_size;
        if (w_info.cur_file->used_size
                + w_info.cur_file->garb_size >= w_info.cur_file->file_size)
        {
            SRV_FATAL("WritingInfo broken");
        }
        w_info.cur_file->info_lock.Unlock();

        // Index records grow downward from index_head; hand the final index
        // area (last record up to index_head) to s_LockFileMem.
        Uint4 last_rec_num = w_info.next_rec_num - 1;
        SFileIndexRec* last_ind = w_info.cur_file->index_head - last_rec_num;
        s_LockFileMem(last_ind, (last_rec_num + 1) * sizeof(SFileIndexRec));

        s_SwitchToNextFile(w_info);
        need_signal_switch = true;
        // NOTE(review): there is no re-check that the fresh file has
        // reserve_size bytes left. If a record could exceed a new file's
        // capacity, left_file_size (if unsigned) would wrap below.
    }
    file_info = w_info.cur_file;
    coord.file_id = file_info->file_id;
    coord.rec_num = w_info.next_rec_num++;
    ind_rec = file_info->index_head - coord.rec_num;
    ind_rec->offset = w_info.next_offset;
    ind_rec->rec_size = rec_size;
    ind_rec->rec_type = eFileRecNone;
    ind_rec->cache_data = NULL;
    ind_rec->chain_coord.clear();

    w_info.next_offset += true_rec_size;
    w_info.left_file_size -= reserve_size;
    AtomicAdd(s_CurDBSize, reserve_size);

    // Marks the record as being written (see the doc comment above).
    file_info->cnt_unfinished.Add(1);

    file_info->info_lock.Lock();

    // Append to the doubly-linked list of index records. index_head->prev_num
    // holds the current tail, whose next_num must still be 0.
    file_info->used_size += reserve_size;
    SFileIndexRec* index_head = file_info->index_head;
    Uint4 prev_rec_num = index_head->prev_num;
    SFileIndexRec* prev_ind_rec = index_head - prev_rec_num;
    if (prev_ind_rec->next_num != 0) {
        SRV_FATAL("File index broken");
    }
    ind_rec->prev_num = prev_rec_num;
    ind_rec->next_num = 0;
    prev_ind_rec->next_num = coord.rec_num;
    index_head->prev_num = coord.rec_num;

    file_info->info_lock.Unlock();
    s_NextWriteLock.Unlock();

    // Wake the new-file creator outside the locks (presumably so that it
    // prepares the following next file).
    if (need_signal_switch)
        s_NewFileCreator->SetRunnable();

    return true;
}
2028
2029 bool
ReadBlobInfo(SNCBlobVerData * ver_data)2030 CNCBlobStorage::ReadBlobInfo(SNCBlobVerData* ver_data)
2031 {
2032 if (ver_data->coord.empty()) {
2033 #ifdef _DEBUG
2034 CNCAlerts::Register(CNCAlerts::eDebugReadBlobInfoFailed0, "ReadBlobInfo: coords empty");
2035 #endif
2036 return false;
2037 }
2038 CSrvRef<SNCDBFileInfo> file_info = s_GetDBFileTry(ver_data->coord.file_id);
2039 if (file_info.IsNull()) {
2040 #ifdef _DEBUG
2041 CNCAlerts::Register(CNCAlerts::eDebugReadBlobInfoFailed1, "ReadBlobInfo: file_info");
2042 #endif
2043 return false;
2044 }
2045 SFileIndexRec* ind_rec = s_GetIndexRecTry(file_info, ver_data->coord.rec_num);
2046 if (!ind_rec) {
2047 #ifdef _DEBUG
2048 CNCAlerts::Register(CNCAlerts::eDebugReadBlobInfoFailed2, "ReadBlobInfo: ind_rec");
2049 #endif
2050 return false;
2051 }
2052 if (ind_rec->cache_data != ver_data->ver_manager->GetCacheData()) {
2053 DB_CORRUPTED("Index record " << ver_data->coord.rec_num
2054 << " in file " << file_info->file_name
2055 << " has wrong cache_data pointer " << ind_rec->cache_data
2056 << " when should be " << ver_data->ver_manager->GetCacheData() << ".");
2057 }
2058 SFileMetaRec* meta_rec = s_CalcMetaAddress(file_info, ind_rec);
2059 ver_data->create_time = meta_rec->create_time;
2060 ver_data->ttl = meta_rec->ttl;
2061 ver_data->expire = meta_rec->expire;
2062 ver_data->dead_time = meta_rec->dead_time;
2063 ver_data->size = meta_rec->size;
2064 if (meta_rec->has_password)
2065 ver_data->password.assign(meta_rec->key_data, 16);
2066 else
2067 ver_data->password.clear();
2068 ver_data->blob_ver = meta_rec->blob_ver;
2069 ver_data->ver_ttl = meta_rec->ver_ttl;
2070 ver_data->ver_expire = meta_rec->ver_expire;
2071 ver_data->create_id = meta_rec->create_id;
2072 ver_data->create_server = meta_rec->create_server;
2073 ver_data->data_coord = ind_rec->chain_coord;
2074
2075 ver_data->map_depth = s_CalcMapDepth(ver_data->size,
2076 ver_data->chunk_size,
2077 ver_data->map_size);
2078 return true;
2079 }
2080
2081 static bool
s_UpdateUpCoords(SFileChunkMapRec * map_rec,SFileIndexRec * map_ind,SNCDataCoord coord)2082 s_UpdateUpCoords(SFileChunkMapRec* map_rec,
2083 SFileIndexRec* map_ind,
2084 SNCDataCoord coord)
2085 {
2086 Uint2 cnt_downs = s_CalcCntMapDowns(map_ind->rec_size);
2087 for (Uint2 i = 0; i < cnt_downs; ++i) {
2088 SNCDataCoord down_coord = map_rec->down_coords[i];
2089 CSrvRef<SNCDBFileInfo> file_info = s_GetDBFileTry(down_coord.file_id);
2090 if (file_info.IsNull()) {
2091 #ifdef _DEBUG
2092 CNCAlerts::Register(CNCAlerts::eDebugUpdateUpCoords1, "UpdateUpCoords: file_info");
2093 #endif
2094 return false;
2095 }
2096 SFileIndexRec* ind_rec = s_GetIndexRecTry(file_info, down_coord.rec_num);
2097 if (!ind_rec) {
2098 #ifdef _DEBUG
2099 CNCAlerts::Register(CNCAlerts::eDebugUpdateUpCoords2, "UpdateUpCoords: ind_rec");
2100 #endif
2101 return false;
2102 }
2103 ind_rec->chain_coord = coord;
2104 }
2105 return true;
2106 }
2107
2108 static bool
s_SaveOneMapImpl(SNCChunkMapInfo * save_map,SNCChunkMapInfo * up_map,Uint2 cnt_downs,Uint2 map_size,Uint1 map_depth,SNCCacheData * cache_data)2109 s_SaveOneMapImpl(SNCChunkMapInfo* save_map,
2110 SNCChunkMapInfo* up_map,
2111 Uint2 cnt_downs,
2112 Uint2 map_size,
2113 Uint1 map_depth,
2114 SNCCacheData* cache_data)
2115 {
2116 SNCDataCoord map_coord;
2117 CSrvRef<SNCDBFileInfo> map_file;
2118 SFileIndexRec* map_ind;
2119 Uint4 rec_size = s_CalcMapRecSize(cnt_downs);
2120 if (!s_GetNextWriteCoord(eFileIndexMaps, rec_size, map_coord, map_file, map_ind)) {
2121 #ifdef _DEBUG
2122 CNCAlerts::Register(CNCAlerts::eDebugSaveOneMapImpl1,"s_GetNextWriteCoord failed");
2123 #endif
2124 return false;
2125 }
2126
2127 map_ind->rec_type = eFileRecChunkMap;
2128 map_ind->cache_data = cache_data;
2129 SFileChunkMapRec* map_rec = s_CalcMapAddress(map_file, map_ind);
2130 map_rec->map_idx = save_map->map_idx;
2131 map_rec->map_depth = map_depth;
2132 size_t coords_size = cnt_downs * sizeof(map_rec->down_coords[0]);
2133 memcpy(map_rec->down_coords, save_map->coords, coords_size);
2134 #if 0
2135 cout << "s_SaveOneMapImpl: "
2136 << map_coord.file_id << "[" << map_coord.rec_num << "] ="
2137 << " cnt_downs=" << cnt_downs
2138 << " map_idx=" << map_rec->map_idx
2139 << " map_depth=" << (int)(map_rec->map_depth)
2140 << " down_coords=";
2141 for (Uint2 c=0; c<cnt_downs; ++c) {
2142 cout << " " << map_rec->down_coords[c].file_id
2143 << "[" << map_rec->down_coords[c].rec_num
2144 << "]";
2145 }
2146 cout << endl;
2147 #endif
2148
2149 up_map->coords[save_map->map_idx] = map_coord;
2150 ++save_map->map_idx;
2151 memset(save_map->coords, 0, map_size * sizeof(save_map->coords[0]));
2152 if (!s_UpdateUpCoords(map_rec, map_ind, map_coord)) {
2153 #ifdef _DEBUG
2154 CNCAlerts::Register(CNCAlerts::eDebugSaveOneMapImpl2,"s_UpdateUpCoords failed");
2155 #endif
2156 return false;
2157 }
2158
2159 map_file->cnt_unfinished.Add(-1);
2160
2161 return true;
2162 }
2163
/// Flush the current level-0 chunk map maps[0] (holding @p cnt_downs
/// coordinates) into a new map record registered in maps[1]. Then, while
/// the blob needs more than one map level, keep flushing upper levels.
/// A level is flushed when it is full (map_idx == map_size), or, with
/// @p save_all_deps set, when it already held entries (see the loop
/// condition). Each flushed level-N map is written with depth N+1.
///
/// @return false as soon as one of the map writes fails.
static bool
s_SaveChunkMap(SNCBlobVerData* ver_data,
               SNCCacheData* cache_data,
               SNCChunkMapInfo** maps,
               Uint2 cnt_downs,
               bool save_all_deps)
{
    if (!s_SaveOneMapImpl(maps[0], maps[1], cnt_downs, ver_data->map_size, 1, cache_data)) {
        return false;
    }

    Uint1 cur_level = 0;
// changed 11may17
#if 0
// prev ver
    while (cur_level < kNCMaxBlobMapsDepth
           &&  (maps[cur_level]->map_idx == ver_data->map_size
                ||  (maps[cur_level]->map_idx > 1  &&  save_all_deps)))
    {
        cnt_downs = maps[cur_level]->map_idx;
        ++cur_level;
        if (!s_SaveOneMapImpl(maps[cur_level], maps[cur_level + 1], cnt_downs,
                              ver_data->map_size, cur_level + 1, cache_data))
        {
            return false;
        }
    }
#else
// new ver
    // Number of map levels the whole blob needs. Upper levels are only
    // flushed when it is greater than 1.
    Uint1 depth = s_CalcMapDepthImpl(ver_data->size,
                                     ver_data->chunk_size,
                                     ver_data->map_size);
    // prev_idx is the map_idx of the level about to be flushed, captured
    // before s_SaveOneMapImpl advances it.
    Uint2 prev_idx = maps[cur_level]->map_idx;
    // (cur_level+1) < kNCMaxBlobMapsDepth keeps maps[cur_level + 1] within
    // maps[0..kNCMaxBlobMapsDepth] after the increment below.
    while (depth > 1 && (cur_level+1) < kNCMaxBlobMapsDepth
           &&  (maps[cur_level]->map_idx == ver_data->map_size
                ||  (prev_idx > 0  &&  save_all_deps)))
    {
        cnt_downs = maps[cur_level]->map_idx;
        ++cur_level;
        prev_idx = maps[cur_level]->map_idx;
        if (!s_SaveOneMapImpl(maps[cur_level], maps[cur_level + 1], cnt_downs,
                              ver_data->map_size, cur_level + 1, cache_data))
        {
            return false;
        }
    }
#endif

    return true;
}
2214
2215 bool
WriteBlobInfo(const string & blob_key,SNCBlobVerData * ver_data,SNCChunkMaps * maps,Uint8 cnt_chunks,SNCCacheData * cache_data)2216 CNCBlobStorage::WriteBlobInfo(const string& blob_key,
2217 SNCBlobVerData* ver_data,
2218 SNCChunkMaps* maps,
2219 Uint8 cnt_chunks,
2220 SNCCacheData* cache_data)
2221 {
2222 SNCDataCoord old_coord = ver_data->coord;
2223 SNCDataCoord down_coord = ver_data->data_coord;
2224 if (old_coord.empty()) {
2225 if (ver_data->size > ver_data->chunk_size) {
2226 Uint2 last_chunks_cnt = Uint2((cnt_chunks - 1) % ver_data->map_size) + 1;
2227 if (!s_SaveChunkMap(ver_data, cache_data, maps->maps, last_chunks_cnt, true)) {
2228 #ifdef _DEBUG
2229 CNCAlerts::Register(CNCAlerts::eDebugWriteBlobInfo1, "WriteBlobInfo: s_SaveChunkMap");
2230 #endif
2231 return false;
2232 }
2233
2234 for (Uint1 i = 1; i <= kNCMaxBlobMapsDepth; ++i) {
2235 if (!maps->maps[i]->coords[0].empty()) {
2236 down_coord = maps->maps[i]->coords[0];
2237 maps->maps[i]->coords[0].clear();
2238 ver_data->map_depth = i;
2239 break;
2240 }
2241 }
2242 if (down_coord.empty()) {
2243 SRV_FATAL("Blob coords broken");
2244 }
2245 }
2246 else if (ver_data->size != 0) {
2247 down_coord = maps->maps[0]->coords[0];
2248 maps->maps[0]->coords[0].clear();
2249 ver_data->map_depth = 0;
2250 }
2251 }
2252
2253 Uint2 key_size = Uint2(blob_key.size());
2254 if (!ver_data->password.empty())
2255 key_size += 16;
2256 Uint4 rec_size = s_CalcMetaRecSize(key_size);
2257 SNCDataCoord meta_coord;
2258 CSrvRef<SNCDBFileInfo> meta_file;
2259 SFileIndexRec* meta_ind;
2260 if (!s_GetNextWriteCoord(eFileIndexMeta, rec_size, meta_coord, meta_file, meta_ind)) {
2261 #ifdef _DEBUG
2262 CNCAlerts::Register(CNCAlerts::eDebugWriteBlobInfo2, "WriteBlobInfo: s_GetNextWriteCoord");
2263 #endif
2264 return false;
2265 }
2266
2267 meta_ind->rec_type = eFileRecMeta;
2268 meta_ind->cache_data = cache_data;
2269 meta_ind->chain_coord = down_coord;
2270 #if 0
2271 cout << "WriteBlobInfo:" << " coord = " << meta_ind->chain_coord.file_id
2272 << "[" << meta_ind->chain_coord.rec_num << "]" << endl;
2273 #endif
2274
2275 SFileMetaRec* meta_rec = s_CalcMetaAddress(meta_file, meta_ind);
2276 meta_rec->size = ver_data->size;
2277 meta_rec->create_time = ver_data->create_time;
2278 meta_rec->create_server = ver_data->create_server;
2279 meta_rec->create_id = ver_data->create_id;
2280 meta_rec->dead_time = ver_data->dead_time;
2281 meta_rec->ttl = ver_data->ttl;
2282 meta_rec->expire = ver_data->expire;
2283 meta_rec->blob_ver = ver_data->blob_ver;
2284 meta_rec->ver_ttl = ver_data->ver_ttl;
2285 meta_rec->ver_expire = ver_data->ver_expire;
2286 meta_rec->map_size = ver_data->map_size;
2287 meta_rec->chunk_size = ver_data->chunk_size;
2288 char* key_data = meta_rec->key_data;
2289 if (ver_data->password.empty()) {
2290 meta_rec->has_password = 0;
2291 }
2292 else {
2293 meta_rec->has_password = 1;
2294 memcpy(key_data, ver_data->password.data(), 16);
2295 key_data += 16;
2296 }
2297 memcpy(key_data, blob_key.data(), blob_key.size());
2298
2299 ver_data->coord = meta_coord;
2300 ver_data->data_coord = down_coord;
2301
2302 if (!down_coord.empty()) {
2303 CSrvRef<SNCDBFileInfo> down_file = s_GetDBFileTry(down_coord.file_id);
2304 if (down_file.IsNull()) {
2305 #ifdef _DEBUG
2306 CNCAlerts::Register(CNCAlerts::eDebugWriteBlobInfo3, "WriteBlobInfo: down_file");
2307 #endif
2308 return false;
2309 }
2310 SFileIndexRec* down_ind = s_GetIndexRecTry(down_file, down_coord.rec_num);
2311 if (!down_ind) {
2312 #ifdef _DEBUG
2313 CNCAlerts::Register(CNCAlerts::eDebugWriteBlobInfo4, "WriteBlobInfo: down_ind");
2314 #endif
2315 return false;
2316 }
2317 down_ind->chain_coord = meta_coord;
2318 }
2319
2320 if (!old_coord.empty()) {
2321 CSrvRef<SNCDBFileInfo> old_file = s_GetDBFileTry(old_coord.file_id);
2322 if (old_file.NotNull()) {
2323 SFileIndexRec* old_ind = s_GetIndexRecTry(old_file, old_coord.rec_num);
2324 if (old_ind) {
2325 // Check that meta-data wasn't corrupted.
2326 s_CalcMetaAddress(old_file, old_ind);
2327 s_MoveRecToGarbage(old_file, old_ind);
2328 }
2329 }
2330 }
2331
2332 meta_file->cnt_unfinished.Add(-1);
2333
2334 return true;
2335 }
2336
2337 static void
s_MoveDataToGarbage(SNCDataCoord coord,Uint1 map_depth,SNCDataCoord up_coord,bool need_lock)2338 s_MoveDataToGarbage(SNCDataCoord coord, Uint1 map_depth, SNCDataCoord up_coord, bool need_lock)
2339 {
2340 if (coord.empty()) {
2341 return;
2342 }
2343 CSrvRef<SNCDBFileInfo> file_info = need_lock ? s_GetDBFileTry(coord.file_id) : s_GetDBFileNoLock(coord.file_id);
2344 if (file_info.IsNull()) {
2345 return;
2346 }
2347 SFileIndexRec* ind_rec = s_GetIndexRecTry(file_info, coord.rec_num);
2348 if (!ind_rec) {
2349 return;
2350 }
2351 if (ind_rec->chain_coord != up_coord) {
2352 DB_CORRUPTED("Index record " << coord.rec_num
2353 << " in file " << file_info->file_name
2354 << " has wrong up_coord " << ind_rec->chain_coord
2355 << " when should be " << up_coord << ".");
2356 }
2357 if (ind_rec->rec_type == eFileRecChunkData) {
2358 s_CalcChunkAddress(file_info, ind_rec);
2359 s_MoveRecToGarbage(file_info, ind_rec);
2360 }
2361 else if (ind_rec->rec_type == eFileRecChunkMap) {
2362 Uint2 cnt_downs = s_CalcCntMapDowns(ind_rec->rec_size);
2363 if (cnt_downs != 0) {
2364 SFileChunkMapRec* map_rec = s_CalcMapAddress(file_info, ind_rec);
2365 for (Uint2 i = 0; i < cnt_downs; ++i) {
2366 s_MoveDataToGarbage(map_rec->down_coords[i], map_depth - 1, coord, need_lock);
2367 }
2368 }
2369 s_MoveRecToGarbage(file_info, ind_rec);
2370 }
2371 }
2372
2373 void
DeleteBlobInfo(const SNCBlobVerData * ver_data,SNCChunkMaps * maps)2374 CNCBlobStorage::DeleteBlobInfo(const SNCBlobVerData* ver_data,
2375 SNCChunkMaps* maps)
2376 {
2377 if (!ver_data->coord.empty()) {
2378 CSrvRef<SNCDBFileInfo> meta_file = s_GetDBFileTry(ver_data->coord.file_id);
2379 if (meta_file.NotNull()) {
2380 SFileIndexRec* meta_ind = s_GetIndexRecTry(meta_file, ver_data->coord.rec_num);
2381 if (meta_ind) {
2382 s_CalcMetaAddress(meta_file, meta_ind);
2383 if (ver_data->size != 0) {
2384 s_MoveDataToGarbage(ver_data->data_coord,
2385 ver_data->map_depth,
2386 ver_data->coord, true);
2387 }
2388 s_MoveRecToGarbage(meta_file, meta_ind);
2389 }
2390 }
2391 }
2392 if (maps) {
2393 SNCDataCoord empty_coord;
2394 empty_coord.clear();
2395 for (Uint1 depth = 0; depth <= kNCMaxBlobMapsDepth; ++depth) {
2396 Uint2 idx = 0;
2397 while (idx < ver_data->map_size
2398 && !maps->maps[depth]->coords[idx].empty())
2399 {
2400 s_MoveDataToGarbage(maps->maps[depth]->coords[idx], depth, empty_coord, true);
2401 ++idx;
2402 }
2403 }
2404 }
2405 }
2406
2407 static bool
s_ReadMapInfo(SNCDataCoord map_coord,SNCChunkMapInfo * map_info,Uint1 map_depth)2408 s_ReadMapInfo(SNCDataCoord map_coord,
2409 SNCChunkMapInfo* map_info,
2410 Uint1 map_depth/*,
2411 SNCDataCoord up_coord*/)
2412 {
2413 CSrvRef<SNCDBFileInfo> map_file = s_GetDBFileTry(map_coord.file_id);
2414 if (map_file.IsNull()) {
2415 #ifdef _DEBUG
2416 CNCAlerts::Register(CNCAlerts::eDebugReadMapInfo1, "ReadMapInfo: map_file");
2417 #endif
2418 return false;
2419 }
2420 SFileIndexRec* map_ind = s_GetIndexRecTry(map_file, map_coord.rec_num);
2421 if (!map_ind) {
2422 #ifdef _DEBUG
2423 CNCAlerts::Register(CNCAlerts::eDebugReadMapInfo2, "ReadMapInfo: map_inf");
2424 #endif
2425 return false;
2426 }
2427 /*
2428 This is an incorrect check because it can race with
2429 CNCBlobVerManager::x_UpdateVersion
2430
2431 if (map_ind->chain_coord != up_coord) {
2432 DB_CORRUPTED("Index record " << map_coord.rec_num
2433 << " in file " << map_file->file_name
2434 << " has wrong up_coord " << map_ind->chain_coord
2435 << " when should be " << up_coord << ".");
2436 }
2437 */
2438 SFileChunkMapRec* map_rec = s_CalcMapAddress(map_file, map_ind);
2439 if (map_rec->map_idx != map_info->map_idx || map_rec->map_depth != map_depth)
2440 {
2441 DB_CORRUPTED("Map at coord " << map_coord
2442 << " has wrong index " << map_rec->map_idx
2443 << " and/or depth " << map_rec->map_depth
2444 << " when it should be " << map_info->map_idx
2445 << " and " << map_depth << ".");
2446 }
2447 memcpy(map_info->coords, map_rec->down_coords,
2448 s_CalcCntMapDowns(map_ind->rec_size) * sizeof(map_rec->down_coords[0]));
2449 return true;
2450 }
2451
2452 bool
ReadChunkData(SNCBlobVerData * ver_data,SNCChunkMaps * maps,Uint8 chunk_num,char * & buffer,Uint4 & buf_size)2453 CNCBlobStorage::ReadChunkData(SNCBlobVerData* ver_data,
2454 SNCChunkMaps* maps,
2455 Uint8 chunk_num,
2456 char*& buffer,
2457 Uint4& buf_size)
2458 {
2459 Uint2 map_idx[kNCMaxBlobMapsDepth] = {0};
2460 Uint1 cur_index = 0;
2461 Uint8 level_num = chunk_num;
2462 do {
2463 map_idx[cur_index] = Uint2(level_num % ver_data->map_size);
2464 level_num /= ver_data->map_size;
2465 ++cur_index;
2466 }
2467 while (level_num != 0);
2468
2469 SNCDataCoord map_coord/*, up_coord*/;
2470 for (Uint1 depth = ver_data->map_depth; depth > 0; ) {
2471 Uint2 this_index = map_idx[depth];
2472 --depth;
2473 SNCChunkMapInfo* this_map = maps->maps[depth];
2474 if (this_map->map_idx == this_index
2475 && !this_map->coords[0].empty()
2476 && this_map->map_counter == ver_data->map_move_counter)
2477 {
2478 continue;
2479 }
2480
2481 this_map->map_counter = ver_data->map_move_counter;
2482 this_map->map_idx = this_index;
2483
2484 if (depth + 1 == ver_data->map_depth) {
2485 //up_coord = ver_data->coord;
2486 map_coord = ver_data->data_coord;
2487 }
2488 else {
2489 /*if (depth + 2 == ver_data->map_depth)
2490 up_coord = ver_data->data_coord;
2491 else
2492 up_coord = maps->maps[depth + 2]->coords[map_idx[depth + 2]];*/
2493 map_coord = maps->maps[depth + 1]->coords[this_index];
2494 }
2495
2496 if (!s_ReadMapInfo(map_coord, this_map, depth + 1/*, up_coord*/)) {
2497 #ifdef _DEBUG
2498 CNCAlerts::Register(CNCAlerts::eDebugReadChunkData1, "ReadChunkData: ReadMapInfo");
2499 #endif
2500 return false;
2501 }
2502
2503 while (depth > 0) {
2504 this_index = map_idx[depth];
2505 --depth;
2506 this_map = maps->maps[depth];
2507 this_map->map_counter = ver_data->map_move_counter;
2508 this_map->map_idx = this_index;
2509 //up_coord = map_coord;
2510 map_coord = maps->maps[depth + 1]->coords[this_index];
2511 if (!s_ReadMapInfo(map_coord, this_map, depth + 1/*, up_coord*/)) {
2512 #ifdef _DEBUG
2513 CNCAlerts::Register(CNCAlerts::eDebugReadChunkData2, "ReadChunkData: ReadMapInfo");
2514 #endif
2515 return false;
2516 }
2517 }
2518 }
2519
2520 SNCDataCoord chunk_coord;
2521 if (ver_data->map_depth == 0) {
2522 if (chunk_num != 0) {
2523 SRV_FATAL("Blob coords broken");
2524 }
2525 //up_coord = ver_data->coord;
2526 chunk_coord = ver_data->data_coord;
2527 }
2528 else {
2529 /*if (ver_data->map_depth == 1)
2530 up_coord = ver_data->data_coord;
2531 else
2532 up_coord = maps->maps[1]->coords[map_idx[1]];*/
2533 chunk_coord = maps->maps[0]->coords[map_idx[0]];
2534 }
2535
2536 CSrvRef<SNCDBFileInfo> data_file = s_GetDBFileTry(chunk_coord.file_id);
2537 if (data_file.IsNull()) {
2538 #ifdef _DEBUG
2539 CNCAlerts::Register(CNCAlerts::eDebugReadChunkData3, "ReadChunkData: data_file");
2540 #endif
2541 return false;
2542 }
2543 SFileIndexRec* data_ind = s_GetIndexRecTry(data_file, chunk_coord.rec_num);
2544 if (!data_ind) {
2545 #ifdef _DEBUG
2546 CNCAlerts::Register(CNCAlerts::eDebugReadChunkData4, "ReadChunkData: data_inf");
2547 #endif
2548 return false;
2549 }
2550 /*
2551 This is an incorrect check because it can race with
2552 CNCBlobVerManager::x_UpdateVersion
2553
2554 if (data_ind->chain_coord != up_coord) {
2555 DB_CORRUPTED("Index record " << chunk_coord.rec_num
2556 << " in file " << data_file->file_name
2557 << " has wrong up_coord " << data_ind->chain_coord
2558 << " when should be " << up_coord << ".");
2559 }
2560 */
2561 SFileChunkDataRec* data_rec = s_CalcChunkAddress(data_file, data_ind);
2562 if (data_rec->chunk_num != chunk_num || data_rec->chunk_idx != map_idx[0])
2563 {
2564 SRV_LOG(Critical, "File " << data_file->file_name
2565 << " in chunk record " << chunk_coord.rec_num
2566 << " has wrong chunk number " << data_rec->chunk_num
2567 << " and/or chunk index " << data_rec->chunk_idx
2568 << ". Deleting blob.");
2569 return false;
2570 }
2571
2572 buf_size = s_CalcChunkDataSize(data_ind->rec_size);
2573 buffer = (char*)data_rec->chunk_data;
2574
2575 return true;
2576 }
2577
/// Store chunk @p chunk_num of the blob (@p buf_size bytes copied from
/// @p buffer) in a new data record, and remember its coordinate in the
/// level-0 chunk map maps->maps[0].
///
/// The chunk may belong to a different level-0 map than the one currently
/// being filled (map_idx[1] differs). In that case the filled maps are first
/// flushed with s_SaveChunkMap, and each level's map_idx is moved to the
/// new path.
///
/// @return Pointer to the copied chunk bytes inside the new data record, or
///         NULL when chunk_num needs more than kNCMaxBlobMapsDepth map
///         levels, the maps could not be saved, or no write position was
///         available.
char*
CNCBlobStorage::WriteChunkData(SNCBlobVerData* ver_data,
                               SNCChunkMaps* maps,
                               SNCCacheData* cache_data,
                               Uint8 chunk_num,
                               char* buffer,
                               Uint4 buf_size)
{
    // Decompose chunk_num into base-map_size digits: map_idx[0] is the slot
    // in the level-0 map, map_idx[1] the level-0 map's slot in level 1, ...
    Uint2 map_idx[kNCMaxBlobMapsDepth] = {0};
    Uint1 cur_index = 0;
    Uint8 level_num = chunk_num;
    do {
        map_idx[cur_index] = Uint2(level_num % ver_data->map_size);
        level_num /= ver_data->map_size;
        ++cur_index;
    }
    while (level_num != 0  &&  cur_index < kNCMaxBlobMapsDepth);
    if (level_num != 0  &&  cur_index >= kNCMaxBlobMapsDepth) {
        SRV_LOG(Critical, "Chunk number " << chunk_num
                << " exceeded maximum map depth " << kNCMaxBlobMapsDepth
                << " with map_size=" << ver_data->map_size << ".");
#ifdef _DEBUG
CNCAlerts::Register(CNCAlerts::eDebugWriteChunkData1,"chunk number");
#endif
        return NULL;
    }

    // The chunk starts a new level-0 map: flush the filled maps first and
    // move every level to the new path.
    if (map_idx[1] != maps->maps[0]->map_idx) {
        if (!s_SaveChunkMap(ver_data, cache_data, maps->maps, ver_data->map_size, false))
            return NULL;
        for (Uint1 i = 0; i < kNCMaxBlobMapsDepth - 1; ++i)
            maps->maps[i]->map_idx = map_idx[i + 1];
    }

    SNCDataCoord data_coord;
    CSrvRef<SNCDBFileInfo> data_file;
    SFileIndexRec* data_ind;
    Uint4 rec_size = s_CalcChunkRecSize(buf_size);
    if (!s_GetNextWriteCoord(eFileIndexData, rec_size, data_coord, data_file, data_ind)) {
#ifdef _DEBUG
CNCAlerts::Register(CNCAlerts::eDebugWriteChunkData2,"s_GetNextWriteCoord");
#endif
        return NULL;
    }

    data_ind->rec_type = eFileRecChunkData;
    data_ind->cache_data = cache_data;
    SFileChunkDataRec* data_rec = s_CalcChunkAddress(data_file, data_ind);
    data_rec->chunk_num = chunk_num;
    data_rec->chunk_idx = map_idx[0];
    memcpy(data_rec->chunk_data, buffer, buf_size);

    maps->maps[0]->coords[map_idx[0]] = data_coord;

    // Record is fully written: balance s_GetNextWriteCoord's increment.
    data_file->cnt_unfinished.Add(-1);

    return (char*)data_rec->chunk_data;
}
2636
2637 void
ChangeCacheDeadTime(SNCCacheData * cache_data)2638 CNCBlobStorage::ChangeCacheDeadTime(SNCCacheData* cache_data)
2639 {
2640 STimeTable* table = s_TimeTables[cache_data->time_bucket];
2641 table->lock.Lock();
2642 if (cache_data->saved_dead_time != 0) {
2643 #if __NC_CACHEDATA_INTR_SET
2644 table->time_map.erase(table->time_map.iterator_to(*cache_data));
2645 #else
2646 table->time_map.erase(cache_data);
2647 #endif
2648 AtomicSub(s_CurBlobsCnt, 1);
2649 if (CNCBlobStorage::IsDraining() && s_CurBlobsCnt <= 0) {
2650 CTaskServer::RequestShutdown(eSrvSlowShutdown);
2651 }
2652 }
2653 cache_data->saved_dead_time = cache_data->dead_time;
2654 if (cache_data->saved_dead_time != 0) {
2655 #if __NC_CACHEDATA_INTR_SET
2656 table->time_map.insert_equal(*cache_data);
2657 #else
2658 table->time_map.insert(cache_data);
2659 #endif
2660 AtomicAdd(s_CurBlobsCnt, 1);
2661 }
2662 table->lock.Unlock();
2663 }
2664
2665 Uint8
GetMaxSyncLogRecNo(void)2666 CNCBlobStorage::GetMaxSyncLogRecNo(void)
2667 {
2668 Uint8 result = 0;
2669 s_IndexLock.Lock();
2670 try {
2671 result = s_IndexDB->GetMaxSyncLogRecNo();
2672 }
2673 catch (CSQLITE_Exception& ex) {
2674 SRV_LOG(Critical, "Cannot read max_sync_log_rec_no: " << ex);
2675 }
2676 s_IndexLock.Unlock();
2677 return result;
2678 }
2679
2680 void
SaveMaxSyncLogRecNo(void)2681 CNCBlobStorage::SaveMaxSyncLogRecNo(void)
2682 {
2683 s_NeedSaveLogRecNo = true;
2684 }
2685
2686 string
GetPurgeData(void)2687 CNCBlobStorage::GetPurgeData(void)
2688 {
2689 string result;
2690 s_IndexLock.Lock();
2691 try {
2692 result = s_IndexDB->GetPurgeData();
2693 }
2694 catch (CSQLITE_Exception&) {
2695 }
2696 s_IndexLock.Unlock();
2697 return result;
2698 }
2699
SavePurgeData(void)2700 void CNCBlobStorage::SavePurgeData(void)
2701 {
2702 s_NeedSavePurgeData = true;
2703 s_RecNoSaver->SetRunnable();
2704 }
2705
2706 Uint8
GetMaxBlobSizeStore(void)2707 CNCBlobStorage::GetMaxBlobSizeStore(void)
2708 {
2709 return s_MaxBlobSizeStore;
2710 }
2711
2712 Int8
GetDiskFree(void)2713 CNCBlobStorage::GetDiskFree(void)
2714 {
2715 try {
2716 return CFileUtil::GetFreeDiskSpace(s_Path);
2717 }
2718 catch (CFileErrnoException& ex) {
2719 SRV_LOG(Critical, "Cannot read free disk space: " << ex);
2720 return 0;
2721 }
2722 }
2723
2724 Int8
GetAllowedDBSize(Int8 free_space)2725 CNCBlobStorage::GetAllowedDBSize(Int8 free_space)
2726 {
2727 Int8 total_space = s_CurDBSize + free_space;
2728 Int8 allowed_db_size = total_space;
2729
2730 if (total_space < s_DiskCritical)
2731 allowed_db_size = 16;
2732 else
2733 allowed_db_size = min(allowed_db_size, total_space - s_DiskCritical);
2734 if (total_space < s_DiskFreeLimit)
2735 allowed_db_size = 16;
2736 else
2737 allowed_db_size = min(allowed_db_size, total_space - s_DiskFreeLimit);
2738 if (s_StopWriteOnSize != 0 && s_StopWriteOnSize < allowed_db_size)
2739 allowed_db_size = s_StopWriteOnSize;
2740
2741 return allowed_db_size;
2742 }
2743
2744 Int8
GetDBSize(void)2745 CNCBlobStorage::GetDBSize(void)
2746 {
2747 return s_CurDBSize;
2748 }
2749
2750 size_t
GetNDBFiles(void)2751 CNCBlobStorage::GetNDBFiles(void)
2752 {
2753 return s_DBFiles->size();
2754 }
2755
2756 bool
IsCleanStart(void)2757 CNCBlobStorage::IsCleanStart(void)
2758 {
2759 return s_CleanStart;
2760 }
2761
2762 bool
NeedStopWrite(void)2763 CNCBlobStorage::NeedStopWrite(void)
2764 {
2765 return s_IsStopWrite != eNoStop && s_IsStopWrite != eStopWarning;
2766 }
2767
2768 bool
AcceptWritesFromPeers(void)2769 CNCBlobStorage::AcceptWritesFromPeers(void)
2770 {
2771 return s_IsStopWrite != eStopDiskCritical;
2772 }
2773
2774 void
SetDraining(bool draining)2775 CNCBlobStorage::SetDraining(bool draining)
2776 {
2777 s_Draining = draining;
2778 if (draining) {
2779 if (s_CurBlobsCnt <= 0) {
2780 CTaskServer::RequestShutdown(eSrvSlowShutdown);
2781 }
2782 }
2783 }
2784
AbandonDB(void)2785 void CNCBlobStorage::AbandonDB(void)
2786 {
2787 s_AbandonDB = true;
2788 }
2789
IsAbandoned(void)2790 bool CNCBlobStorage::IsAbandoned(void)
2791 {
2792 return s_AbandonDB;
2793 }
2794
2795 bool
IsDraining(void)2796 CNCBlobStorage::IsDraining(void)
2797 {
2798 return s_Draining;
2799 }
2800
2801 bool
IsDBSizeAlert(void)2802 CNCBlobStorage::IsDBSizeAlert(void)
2803 {
2804 return s_IsStopWrite != eNoStop;
2805 }
2806
2807 Uint4
GetNewBlobId(void)2808 CNCBlobStorage::GetNewBlobId(void)
2809 {
2810 return Uint4(s_BlobCounter.Add(1));
2811 }
2812
2813 int
GetLatestBlobExpire(void)2814 CNCBlobStorage::GetLatestBlobExpire(void)
2815 {
2816 int res = CSrvTime::CurSecs();
2817 ITERATE( TBucketCacheMap, bkt, s_BucketsCache) {
2818 SBucketCache* cache = bkt->second;
2819 cache->lock.Lock();
2820 ITERATE(TKeyMap, it, cache->key_map) {
2821 #if __NC_CACHEDATA_INTR_SET
2822 res = max( res, it->dead_time);
2823 #else
2824 res = max( res, (*it)->dead_time);
2825 #endif
2826 }
2827 cache->lock.Unlock();
2828 }
2829 return res;
2830 }
2831
2832 void
GetFullBlobsList(Uint2 slot,TNCBlobSumList & blobs_lst,const CNCPeerControl * peer)2833 CNCBlobStorage::GetFullBlobsList(Uint2 slot, TNCBlobSumList& blobs_lst, const CNCPeerControl* peer)
2834 {
2835 blobs_lst.clear();
2836 Uint2 slot_buckets = CNCDistributionConf::GetCntSlotBuckets();
2837 Uint2 bucket_num = (slot - 1) * slot_buckets + 1;
2838 for (Uint2 i = 0; i < slot_buckets; ++i, ++bucket_num) {
2839 SBucketCache* cache = s_GetBucketCache(bucket_num);
2840
2841 cache->lock.Lock();
2842 Uint8 cnt_blobs = cache->key_map.size();
2843 void* big_block = malloc(size_t(cnt_blobs * sizeof(SNCTempBlobInfo)));
2844 if (!big_block) {
2845 cache->lock.Unlock();
2846 return;
2847 }
2848 SNCTempBlobInfo* info_ptr = (SNCTempBlobInfo*)big_block;
2849
2850 ITERATE(TKeyMap, it, cache->key_map) {
2851 #if __NC_CACHEDATA_INTR_SET
2852 new (info_ptr) SNCTempBlobInfo(*it);
2853 #else
2854 new (info_ptr) SNCTempBlobInfo(**it);
2855 #endif
2856 ++info_ptr;
2857 }
2858 cache->lock.Unlock();
2859
2860 info_ptr = (SNCTempBlobInfo*)big_block;
2861 for (Uint8 i = 0; i < cnt_blobs; ++i) {
2862 Uint2 key_slot = 0, key_bucket = 0;
2863 if (!CNCDistributionConf::GetSlotByKey(info_ptr->key, key_slot, key_bucket) ||
2864 key_slot != slot /*|| key_bucket != bucket_num*/) {
2865 SRV_FATAL("Slot verification failed, blob key: " << info_ptr->key <<
2866 ", expected slot: " << slot << ", calculated slot: " << key_slot);
2867 }
2868 if (key_bucket != bucket_num) {
2869 SRV_LOG(Critical, "Slot verification failed, blob key: " << info_ptr->key <<
2870 ", slot: " << slot <<
2871 ", expected bucket: " << bucket_num << ", calculated bucket: " << key_bucket);
2872 }
2873
2874 if (info_ptr->size > CNCDistributionConf::GetMaxBlobSizeSync()) {
2875 if (CNCDistributionConf::IsThisServerKey(info_ptr->key)) {
2876 continue;
2877 }
2878 }
2879
2880 if (peer && !peer->AcceptsBlobKey(info_ptr->key)) {
2881 continue;
2882 }
2883 SNCBlobSummary* blob_sum = new SNCBlobSummary();
2884 blob_sum->size = info_ptr->size;
2885 blob_sum->create_id = info_ptr->create_id;
2886 blob_sum->create_server = info_ptr->create_server;
2887 blob_sum->create_time = info_ptr->create_time;
2888 blob_sum->dead_time = info_ptr->dead_time;
2889 blob_sum->expire = info_ptr->expire;
2890 blob_sum->ver_expire = info_ptr->ver_expire;
2891 blobs_lst[info_ptr->key] = blob_sum;
2892
2893 info_ptr->~SNCTempBlobInfo();
2894 ++info_ptr;
2895 }
2896
2897 free(big_block);
2898 }
2899 }
2900
2901 void
MeasureDB(SNCStateStat & state)2902 CNCBlobStorage::MeasureDB(SNCStateStat& state)
2903 {
2904 state.db_files = Uint4(CNCBlobStorage::GetNDBFiles());
2905 state.db_size = s_CurDBSize;
2906 state.db_garb = s_GarbageSize;
2907 state.cnt_blobs = s_CurBlobsCnt;
2908 state.cnt_keys = s_CurKeysCnt;
2909 state.min_dead_time = numeric_limits<int>::max();
2910 ITERATE(TTimeBuckets, it, s_TimeTables) {
2911 STimeTable* table = it->second;
2912 table->lock.Lock();
2913 if (!table->time_map.empty()) {
2914 #if __NC_CACHEDATA_INTR_SET
2915 state.min_dead_time = min(state.min_dead_time,
2916 table->time_map.begin()->dead_time);
2917 #else
2918 state.min_dead_time = min(state.min_dead_time,
2919 (*table->time_map.begin())->dead_time);
2920 #endif
2921 }
2922 table->lock.Unlock();
2923 }
2924 if (state.min_dead_time == numeric_limits<int>::max())
2925 state.min_dead_time = 0;
2926 }
2927
2928 void
x_DeleteIndexes(SNCDataCoord map_coord,Uint1 map_depth)2929 CBlobCacher::x_DeleteIndexes(SNCDataCoord map_coord, Uint1 map_depth)
2930 {
2931 CSrvRef<SNCDBFileInfo> file_info = s_GetDBFileNoLock(map_coord.file_id);
2932 SFileIndexRec* map_ind = s_GetIndexRec(file_info, map_coord.rec_num);
2933 s_DeleteIndexRec(file_info, map_ind);
2934 if (map_depth != 0) {
2935 Uint2 cnt_downs = s_CalcCntMapDowns(map_ind->rec_size);
2936 if (cnt_downs != 0) {
2937 SFileChunkMapRec* map_rec = s_CalcMapAddress(file_info, map_ind);
2938 for (Uint2 i = 0; i < cnt_downs; ++i) {
2939 x_DeleteIndexes(map_rec->down_coords[i], map_depth - 1);
2940 }
2941 }
2942 }
2943 }
2944
2945 bool
x_CacheMapRecs(SNCDataCoord map_coord,Uint1 map_depth,SNCDataCoord up_coord,Uint2 up_index,SNCCacheData * cache_data,Uint8 cnt_chunks,Uint8 & chunk_num,map<Uint4,Uint4> & sizes_map)2946 CBlobCacher::x_CacheMapRecs(SNCDataCoord map_coord,
2947 Uint1 map_depth,
2948 SNCDataCoord up_coord,
2949 Uint2 up_index,
2950 SNCCacheData* cache_data,
2951 Uint8 cnt_chunks,
2952 Uint8& chunk_num,
2953 map<Uint4, Uint4>& sizes_map)
2954 {
2955 TNCDBFilesMap::const_iterator it_file = s_DBFiles->find(map_coord.file_id);
2956 if (it_file == s_DBFiles->end()) {
2957 SRV_LOG(Critical, "Cannot find file for the coord " << map_coord
2958 << " which is referenced by blob " << cache_data->key
2959 << ". Deleting blob.");
2960 return false;
2961 }
2962
2963 SNCDBFileInfo* file_info = it_file->second.GetNCPointerOrNull();
2964 TRecNumsSet& recs_set = m_RecsMap[map_coord.file_id];
2965 TRecNumsSet::iterator it_recs = recs_set.find(map_coord.rec_num);
2966 if (it_recs == recs_set.end()) {
2967 SRV_LOG(Critical, "Blob " << cache_data->key
2968 << " references record with coord " << map_coord
2969 << ", but its index wasn't in live chain. Deleting blob.");
2970 return false;
2971 }
2972
2973 SFileIndexRec* map_ind = s_GetIndexRec(file_info, map_coord.rec_num);
2974 if (map_ind->chain_coord != up_coord) {
2975 SRV_LOG(Critical, "Up_coord for map/data in blob " << cache_data->key
2976 << " is incorrect: " << map_ind->chain_coord
2977 << " when should be " << up_coord
2978 << ". Correcting it.");
2979 map_ind->chain_coord = up_coord;
2980 }
2981 map_ind->cache_data = cache_data;
2982
2983 Uint4 rec_size = map_ind->rec_size;
2984 if (rec_size & 7)
2985 rec_size += 8 - (rec_size & 7);
2986 sizes_map[map_coord.file_id] += rec_size + sizeof(SFileIndexRec);
2987
2988 if (map_depth == 0) {
2989 if (map_ind->rec_type != eFileRecChunkData) {
2990 SRV_LOG(Critical, "Blob " << cache_data->key
2991 << " with size " << cache_data->size
2992 << " references record with coord " << map_coord
2993 << " that has type " << int(map_ind->rec_type)
2994 << " when it should be " << int(eFileRecChunkData)
2995 << ". Deleting blob.");
2996 return false;
2997 }
2998 ++chunk_num;
2999 if (chunk_num > cnt_chunks) {
3000 SRV_LOG(Critical, "Blob " << cache_data->key
3001 << " with size " << cache_data->size
3002 << " has too many data chunks, should be " << cnt_chunks
3003 << ". Deleting blob.");
3004 return false;
3005 }
3006 Uint4 data_size = s_CalcChunkDataSize(map_ind->rec_size);
3007 Uint4 need_size;
3008 if (chunk_num < cnt_chunks)
3009 need_size = cache_data->chunk_size;
3010 else
3011 need_size = (cache_data->size - 1) % cache_data->chunk_size + 1;
3012 if (data_size != need_size) {
3013 SRV_LOG(Critical, "Blob " << cache_data->key
3014 << " with size " << cache_data->size
3015 << " references data record with coord " << map_coord
3016 << " that has data size " << data_size
3017 << " when it should be " << need_size
3018 << ". Deleting blob.");
3019 return false;
3020 }
3021 }
3022 else {
3023 if (map_ind->rec_type != eFileRecChunkMap) {
3024 SRV_LOG(Critical, "Blob " << cache_data->key
3025 << " with size " << cache_data->size
3026 << " references record with coord " << map_coord
3027 << " at map_depth=" << map_depth
3028 << ". Record has type " << int(map_ind->rec_type)
3029 << " when it should be " << int(eFileRecChunkMap)
3030 << ". Deleting blob.");
3031 return false;
3032 }
3033 SFileChunkMapRec* map_rec = s_CalcMapAddress(file_info, map_ind);
3034 if (map_rec->map_idx != up_index || map_rec->map_depth != map_depth)
3035 {
3036 SRV_LOG(Critical, "Blob " << cache_data->key
3037 << " with size " << cache_data->size
3038 << " references map with coord " << map_coord
3039 << " that has wrong index " << map_rec->map_idx
3040 << " and/or map depth " << map_rec->map_depth
3041 << ", they should be " << up_index
3042 << " and " << map_depth
3043 << ". Deleting blob.");
3044 return false;
3045 }
3046 Uint2 cnt_downs = s_CalcCntMapDowns(map_ind->rec_size);
3047 for (Uint2 i = 0; i < cnt_downs; ++i) {
3048 if (!x_CacheMapRecs(map_rec->down_coords[i], map_depth - 1,
3049 map_coord, i, cache_data, cnt_chunks,
3050 chunk_num, sizes_map))
3051 {
3052 for (Uint2 j = 0; j < i; ++j) {
3053 x_DeleteIndexes(map_rec->down_coords[j], map_depth - 1);
3054 }
3055 return false;
3056 }
3057 }
3058 if (chunk_num < cnt_chunks && cnt_downs != cache_data->map_size) {
3059 SRV_LOG(Critical, "Blob " << cache_data->key
3060 << " with size " << cache_data->size
3061 << " references map record with coord " << map_coord
3062 << " that has " << cnt_downs
3063 << " children when it should be " << cache_data->map_size
3064 << " because accumulated chunk_num=" << chunk_num
3065 << " is less than total " << cnt_chunks
3066 << ". Deleting blob.");
3067 return false;
3068 }
3069 }
3070
3071 recs_set.erase(it_recs);
3072 return true;
3073 }
3074
3075 bool
x_CacheMetaRec(SNCDBFileInfo * file_info,SFileIndexRec * ind_rec,SNCDataCoord coord)3076 CBlobCacher::x_CacheMetaRec(SNCDBFileInfo* file_info,
3077 SFileIndexRec* ind_rec,
3078 SNCDataCoord coord)
3079 {
3080 SFileMetaRec* meta_rec = s_CalcMetaAddress(file_info, ind_rec);
3081 if (meta_rec->has_password > 1 || meta_rec->dead_time < meta_rec->expire
3082 || meta_rec->dead_time < meta_rec->ver_expire
3083 || meta_rec->chunk_size == 0 || meta_rec->map_size == 0)
3084 {
3085 SRV_LOG(Critical, "Meta record in file " << file_info->file_name
3086 << " at offset " << ind_rec->offset << " was corrupted."
3087 << " passwd=" << meta_rec->has_password
3088 << ", expire=" << meta_rec->expire
3089 << ", ver_expire=" << meta_rec->ver_expire
3090 << ", dead=" << meta_rec->dead_time
3091 << ", chunk_size=" << meta_rec->chunk_size
3092 << ", map_size=" << meta_rec->map_size
3093 << ". Deleting it.");
3094 return false;
3095 }
3096 if (meta_rec->dead_time <= CSrvTime::CurSecs())
3097 return false;
3098
3099 char* key_data = meta_rec->key_data;
3100 char* key_end = (char*)meta_rec + ind_rec->rec_size;
3101 if (meta_rec->has_password)
3102 key_data += 16;
3103 string key(key_data, key_end - key_data);
3104 int cnt = 0;
3105 ITERATE(string, it, key) {
3106 if (*it == '\1')
3107 ++cnt;
3108 }
3109 if (cnt != 2) {
3110 SRV_LOG(Critical, "Meta record in file " << file_info->file_name
3111 << " at offset " << ind_rec->offset << " was corrupted."
3112 << " key=" << key
3113 << ", cnt_special=" << cnt
3114 << ". Deleting it.");
3115 return false;
3116 }
3117 Uint2 slot = 0, time_bucket = 0;
3118 if (!CNCDistributionConf::GetSlotByKey(key, slot, time_bucket)) {
3119 SRV_LOG(Critical, "Could not extract slot number from key: " << key);
3120 return false;
3121 }
3122
3123 SNCCacheData* cache_data = new SNCCacheData();
3124 cache_data->key = key;
3125 cache_data->coord = coord;
3126 cache_data->time_bucket = time_bucket;
3127 cache_data->size = meta_rec->size;
3128 cache_data->chunk_size = meta_rec->chunk_size;
3129 cache_data->map_size = meta_rec->map_size;
3130 cache_data->create_time = meta_rec->create_time;
3131 cache_data->create_server = meta_rec->create_server;
3132 cache_data->create_id = meta_rec->create_id;
3133 cache_data->dead_time = meta_rec->dead_time;
3134 cache_data->saved_dead_time = meta_rec->dead_time;
3135 cache_data->expire = meta_rec->expire;
3136 cache_data->ver_expire = meta_rec->ver_expire;
3137
3138 if (meta_rec->size == 0) {
3139 if (!ind_rec->chain_coord.empty()) {
3140 SRV_LOG(Critical, "Index record " << (file_info->index_head - ind_rec)
3141 << " in file " << file_info->file_name
3142 << " was corrupted. size=0 but chain_coord="
3143 << ind_rec->chain_coord << ". Ignoring that.");
3144 ind_rec->chain_coord.clear();
3145 }
3146 }
3147 else {
3148 Uint1 map_depth = s_CalcMapDepthImpl(meta_rec->size,
3149 meta_rec->chunk_size,
3150 meta_rec->map_size);
3151 if (map_depth > kNCMaxBlobMapsDepth) {
3152 SRV_LOG(Critical, "Meta record in file " << file_info->file_name
3153 << " at offset " << ind_rec->offset << " was corrupted."
3154 << ", size=" << meta_rec->size
3155 << ", chunk_size=" << meta_rec->chunk_size
3156 << ", map_size=" << meta_rec->map_size
3157 << " (map depth is more than " << kNCMaxBlobMapsDepth
3158 << "). Deleting it.");
3159 return false;
3160 }
3161 Uint8 cnt_chunks = (meta_rec->size - 1) / meta_rec->chunk_size + 1;
3162 Uint8 chunk_num = 0;
3163 typedef map<Uint4, Uint4> TSizesMap;
3164 TSizesMap sizes_map;
3165 if (!x_CacheMapRecs(ind_rec->chain_coord, map_depth, coord, 0, cache_data,
3166 cnt_chunks, chunk_num, sizes_map))
3167 {
3168 delete cache_data;
3169 return false;
3170 }
3171 ITERATE(TSizesMap, it, sizes_map) {
3172 CSrvRef<SNCDBFileInfo> info = s_GetDBFileNoLock(it->first);
3173 info->used_size += it->second;
3174 if (info->garb_size < it->second) {
3175 SRV_FATAL("Blob coords broken");
3176 }
3177 info->garb_size -= it->second;
3178 AtomicSub(s_GarbageSize, it->second);
3179 }
3180 }
3181 ind_rec->cache_data = cache_data;
3182 Uint4 rec_size = ind_rec->rec_size;
3183 if (rec_size & 7)
3184 rec_size += 8 - (rec_size & 7);
3185 rec_size += sizeof(SFileIndexRec);
3186 file_info->used_size += rec_size;
3187 if (file_info->garb_size < rec_size) {
3188 SRV_FATAL("Blob coords broken");
3189 }
3190 file_info->garb_size -= rec_size;
3191 AtomicSub(s_GarbageSize, rec_size);
3192
3193 TBucketCacheMap::iterator it_bucket = s_BucketsCache.lower_bound(time_bucket);
3194 SBucketCache* bucket_cache;
3195 if (it_bucket == s_BucketsCache.end() || it_bucket->first != time_bucket) {
3196 bucket_cache = new SBucketCache();
3197 s_BucketsCache.insert(it_bucket, TBucketCacheMap::value_type(time_bucket, bucket_cache));
3198 }
3199 else {
3200 bucket_cache = it_bucket->second;
3201 }
3202 STimeTable* time_table = s_TimeTables[time_bucket];
3203 #if __NC_CACHEDATA_INTR_SET
3204 TKeyMap::insert_commit_data commit_data;
3205 pair<TKeyMap::iterator, bool> ins_res =
3206 bucket_cache->key_map.insert_unique_check(key, SCacheKeyCompare(), commit_data);
3207 if (ins_res.second) {
3208 bucket_cache->key_map.insert_unique_commit(*cache_data, commit_data);
3209 ++s_CurKeysCnt;
3210 }
3211 #else
3212 TKeyMap::iterator ins_res = bucket_cache->key_map.find(cache_data);
3213 if (ins_res == bucket_cache->key_map.end()) {
3214 bucket_cache->key_map.insert(cache_data);
3215 ++s_CurKeysCnt;
3216 }
3217 #endif
3218 else {
3219 #if __NC_CACHEDATA_INTR_SET
3220 SNCCacheData* old_data = &*ins_res.first;
3221 bucket_cache->key_map.erase(ins_res.first);
3222 bucket_cache->key_map.insert_equal(*cache_data);
3223 #else
3224 SNCCacheData* old_data = *ins_res;
3225 bucket_cache->key_map.erase(old_data);
3226 bucket_cache->key_map.insert(cache_data);
3227 #endif
3228 #if __NC_CACHEDATA_ALL_MONITOR
3229 s_AllCache[time_bucket]->all_cache_set.erase(old_data);
3230 #endif
3231 --s_CurBlobsCnt;
3232 #if __NC_CACHEDATA_INTR_SET
3233 time_table->time_map.erase(time_table->time_map.iterator_to(*old_data));
3234 #else
3235 time_table->time_map.erase(old_data);
3236 #endif
3237 #ifdef _DEBUG
3238 if (old_data->Get_ver_mgr() || old_data->ref_cnt.Get() != 0) {
3239 abort();
3240 }
3241 #endif
3242 if (!old_data->coord.empty()) {
3243 CSrvRef<SNCDBFileInfo> old_file = s_GetDBFileNoLock(old_data->coord.file_id);
3244 SFileIndexRec* old_ind = s_GetIndexRec(old_file, old_data->coord.rec_num);
3245 SFileMetaRec* old_rec = s_CalcMetaAddress(old_file, old_ind);
3246 if (old_rec->size != 0 && old_ind->chain_coord != ind_rec->chain_coord && !old_ind->chain_coord.empty()) {
3247 Uint1 map_depth = s_CalcMapDepth(old_rec->size,
3248 old_rec->chunk_size,
3249 old_rec->map_size);
3250 s_MoveDataToGarbage(old_ind->chain_coord, map_depth, old_data->coord, false);
3251 }
3252 s_MoveRecToGarbage(old_file, old_ind);
3253 old_data->coord.clear();
3254 }
3255 delete old_data;
3256 }
3257 #if __NC_CACHEDATA_INTR_SET
3258 time_table->time_map.insert_equal(*cache_data);
3259 #else
3260 time_table->time_map.insert(cache_data);
3261 #endif
3262 #if __NC_CACHEDATA_ALL_MONITOR
3263 s_AllCache[time_bucket]->all_cache_set.insert(cache_data);
3264 #endif
3265 ++s_CurBlobsCnt;
3266
3267 return true;
3268 }
3269
3270 CBlobCacher::State
x_StartCreateFiles(void)3271 CBlobCacher::x_StartCreateFiles(void)
3272 {
3273 m_CurCreatePass = 0;
3274 m_CurCreateFile = 0;
3275 return &CBlobCacher::x_CreateInitialFile;
3276 }
3277
3278 CBlobCacher::State
x_CreateInitialFile(void)3279 CBlobCacher::x_CreateInitialFile(void)
3280 {
3281 if (CTaskServer::IsInShutdown())
3282 return &CBlobCacher::x_CancelCaching;
3283 if (m_CurCreatePass == 1 && m_CurCreateFile >= s_CntAllFiles)
3284 return &CBlobCacher::x_StartCacheBlobs;
3285
3286 if (m_CurCreateFile < s_CntAllFiles) {
3287 if (!s_CreateNewFile(m_CurCreateFile, false))
3288 return &CBlobCacher::x_DelFileAndRetryCreate;
3289
3290 m_NewFileIds.insert(s_AllWritings[m_CurCreateFile].next_file->file_id);
3291 ++m_CurCreateFile;
3292 }
3293 else {
3294 s_NextWriteLock.Lock();
3295 for (size_t i = 0; i < s_CntAllFiles; ++i) {
3296 s_SwitchToNextFile(s_AllWritings[i]);
3297 }
3298 s_NextWriteLock.Unlock();
3299 m_CurCreateFile = 0;
3300 ++m_CurCreatePass;
3301 }
3302
3303 // Yield execution to other tasks
3304 SetRunnable();
3305 return NULL;
3306 }
3307
3308 CBlobCacher::State
x_DelFileAndRetryCreate(void)3309 CBlobCacher::x_DelFileAndRetryCreate(void)
3310 {
3311 if (s_DBFiles->empty()) {
3312 SRV_LOG(Critical, "Cannot create initial database files.");
3313 GetDiagCtx()->SetRequestStatus(eStatus_ServerError);
3314 CTaskServer::RequestShutdown(eSrvFastShutdown);
3315 return &CBlobCacher::x_CancelCaching;
3316 }
3317
3318 CSrvRef<SNCDBFileInfo> file_to_del;
3319 ITERATE(TNCDBFilesMap, it, (*s_DBFiles)) {
3320 CSrvRef<SNCDBFileInfo> file_info = it->second;
3321 if (file_info->file_type == eDBFileData) {
3322 file_to_del = file_info;
3323 break;
3324 }
3325 }
3326 if (!file_to_del)
3327 file_to_del = s_DBFiles->begin()->second;
3328 s_DeleteDBFile(file_to_del, false);
3329
3330 // Yield execution to other tasks
3331 SetState(&CBlobCacher::x_CreateInitialFile);
3332 // file creation failed; probably, makes sense to wait longer
3333 // instead of failing again and again, like thousands times per sec
3334 // SetRunnable();
3335 RunAfter(1);
3336
3337 return NULL;
3338 }
3339
3340 CBlobCacher::State
x_PreCacheRecNums(void)3341 CBlobCacher::x_PreCacheRecNums(void)
3342 {
3343 if (CTaskServer::IsInShutdown())
3344 return &CBlobCacher::x_CancelCaching;
3345 if (m_CurFile == s_DBFiles->end())
3346 return &CBlobCacher::x_StartCreateFiles;
3347
3348 SNCDBFileInfo* const file_info = m_CurFile->second.GetNCPointerOrNull();
3349 TRecNumsSet& recs_set = m_RecsMap[file_info->file_id];
3350
3351 AtomicAdd(s_CurDBSize, file_info->file_size);
3352 // Non-garbage is left the same as in x_CreateNewFile
3353 Uint4 garb_size = file_info->file_size
3354 - (kSignatureSize + sizeof(SFileIndexRec));
3355 file_info->garb_size += garb_size;
3356 AtomicAdd(s_GarbageSize,garb_size);
3357
3358 SFileIndexRec* ind_rec = file_info->index_head;
3359 Uint4 prev_rec_num = 0;
3360 char* min_ptr = file_info->file_map + kSignatureSize;
3361 while (!CTaskServer::IsInShutdown() && ind_rec->next_num != 0) {
3362 Uint4 rec_num = ind_rec->next_num;
3363 if (rec_num <= prev_rec_num) {
3364 SRV_LOG(Critical, "File " << file_info->file_name
3365 << " contains wrong next_num=" << rec_num
3366 << " (it's not greater than current " << prev_rec_num
3367 << "). Won't cache anything else from this file.");
3368 goto wrap_index_and_return;
3369 }
3370 SFileIndexRec* next_ind;
3371 next_ind = file_info->index_head - rec_num;
3372 if ((char*)next_ind < min_ptr || next_ind >= ind_rec) {
3373 SRV_LOG(Critical, "File " << file_info->file_name
3374 << " contains wrong next_num=" << rec_num
3375 << " in index record " << (file_info->index_head - ind_rec)
3376 << ". It produces pointer " << (void*)next_ind
3377 << " which is not in the range between " << (void*)min_ptr
3378 << " and " << (void*)ind_rec
3379 << ". Won't cache anything else from this file.");
3380 goto wrap_index_and_return;
3381 }
3382 char* next_rec_start = file_info->file_map + next_ind->offset;
3383 if (next_rec_start < min_ptr || next_rec_start > (char*)next_ind
3384 || (rec_num == 1 && next_rec_start != min_ptr))
3385 {
3386 SRV_LOG(Critical, "File " << file_info->file_name
3387 << " contains wrong offset=" << ind_rec->offset
3388 << " in index record " << rec_num
3389 << ", resulting ptr " << (void*)next_rec_start
3390 << " which is not in the range " << (void*)min_ptr
3391 << " and " << (void*)next_ind
3392 << ". This record will be ignored.");
3393 goto ignore_rec_and_continue;
3394 }
3395 char* next_rec_end;
3396 next_rec_end = next_rec_start + next_ind->rec_size;
3397 if (next_rec_end < next_rec_start || next_rec_end > (char*)next_ind) {
3398 SRV_LOG(Critical, "File " << file_info->file_name
3399 << " contains wrong rec_size=" << next_ind->rec_size
3400 << " for offset " << next_ind->offset
3401 << " in index record " << rec_num
3402 << " (resulting end ptr " << (void*)next_rec_end
3403 << " is greater than index record " << (void*)next_ind
3404 << "). This record will be ignored.");
3405 goto ignore_rec_and_continue;
3406 }
3407 if (next_ind->prev_num != prev_rec_num)
3408 next_ind->prev_num = prev_rec_num;
3409 switch (next_ind->rec_type) {
3410 case eFileRecChunkData:
3411 if (file_info->file_type != eDBFileData) {
3412 // This is not an error. It can be a result of failed move
3413 // attempt.
3414 goto ignore_rec_and_continue;
3415 }
3416 if (next_ind->rec_size < sizeof(SFileChunkDataRec)) {
3417 SRV_LOG(Critical, "File " << file_info->file_name
3418 << " contains wrong rec_size=" << next_ind->rec_size
3419 << " for offset " << next_ind->offset
3420 << " in index record " << rec_num
3421 << ", rec_type=" << int(next_ind->rec_type)
3422 << ", min_size=" << sizeof(SFileChunkDataRec)
3423 << ". This record will be ignored.");
3424 goto ignore_rec_and_continue;
3425 }
3426 break;
3427 case eFileRecChunkMap:
3428 if (file_info->file_type != eDBFileMaps)
3429 goto bad_rec_type;
3430 if (next_ind->rec_size < sizeof(SFileChunkMapRec)) {
3431 SRV_LOG(Critical, "File " << file_info->file_name
3432 << " contains wrong rec_size=" << next_ind->rec_size
3433 << " for offset " << next_ind->offset
3434 << " in index record " << rec_num
3435 << ", rec_type=" << int(next_ind->rec_type)
3436 << ", min_size=" << sizeof(SFileChunkMapRec)
3437 << ". This record will be ignored.");
3438 goto ignore_rec_and_continue;
3439 }
3440 break;
3441 case eFileRecMeta:
3442 if (file_info->file_type != eDBFileMeta)
3443 goto bad_rec_type;
3444 SFileMetaRec* meta_rec;
3445 meta_rec = (SFileMetaRec*)next_rec_start;
3446 Uint4 min_rec_size;
3447 // Minimum key length is 2, so we are adding 1 below
3448 min_rec_size = sizeof(SFileChunkMapRec) + 1;
3449 if (meta_rec->has_password)
3450 min_rec_size += 16;
3451 if (next_ind->rec_size < min_rec_size) {
3452 SRV_LOG(Critical, "File " << file_info->file_name
3453 << " contains wrong rec_size=" << next_ind->rec_size
3454 << " for offset " << next_ind->offset
3455 << " in index record " << rec_num
3456 << ", rec_type=" << int(next_ind->rec_type)
3457 << ", min_size=" << min_rec_size
3458 << ". This record will be ignored.");
3459 goto ignore_rec_and_continue;
3460 }
3461 break;
3462 default:
3463 bad_rec_type:
3464 SRV_LOG(Critical, "File " << file_info->file_name
3465 << " with type " << int(file_info->file_type)
3466 << " contains wrong rec_type=" << next_ind->rec_type
3467 << " in index record " << rec_num
3468 << "). This record will be ignored.");
3469 goto ignore_rec_and_continue;
3470 }
3471
3472 recs_set.insert(rec_num);
3473 min_ptr = next_rec_end;
3474 ind_rec = next_ind;
3475 prev_rec_num = rec_num;
3476 continue;
3477
3478 ignore_rec_and_continue:
3479 ind_rec->next_num = next_ind->next_num;
3480 }
3481 goto finalize_and_return;
3482
3483 wrap_index_and_return:
3484 ind_rec->next_num = 0;
3485
3486 finalize_and_return:
3487 s_LockFileMem(ind_rec, (prev_rec_num + 1) * sizeof(SFileIndexRec));
3488 // to next file
3489 ++m_CurFile;
3490 SetRunnable();
3491 return NULL;
3492 }
3493
3494 CBlobCacher::State
x_StartCaching(void)3495 CBlobCacher::x_StartCaching(void)
3496 {
3497 CreateNewDiagCtx();
3498 CSrvDiagMsg().StartRequest().PrintParam("_type", "caching");
3499
3500 s_DBFilesLock.Lock();
3501 m_CurFile = s_DBFiles->begin();
3502 return &CBlobCacher::x_PreCacheRecNums;
3503 }
3504
3505 CBlobCacher::State
x_CancelCaching(void)3506 CBlobCacher::x_CancelCaching(void)
3507 {
3508 s_DBFilesLock.Unlock();
3509 if (GetDiagCtx()->GetRequestStatus() == eStatus_OK)
3510 GetDiagCtx()->SetRequestStatus(eStatus_ShuttingDown);
3511 CSrvDiagMsg().StopRequest();
3512 ReleaseDiagCtx();
3513
3514 Terminate();
3515 return NULL;
3516 }
3517
3518 CBlobCacher::State
x_FinishCaching(void)3519 CBlobCacher::x_FinishCaching(void)
3520 {
3521 s_DBFilesLock.Unlock();
3522 CSrvDiagMsg().StopRequest();
3523 ReleaseDiagCtx();
3524
3525 s_DiskFlusher->SetRunnable();
3526 s_RecNoSaver->SetRunnable();
3527 s_SpaceShrinker->SetRunnable();
3528 s_ExpiredCleaner->SetRunnable();
3529
3530 CNCServer::CachingCompleted();
3531
3532 Terminate();
3533 return NULL;
3534 }
3535
3536 CBlobCacher::State
x_StartCacheBlobs(void)3537 CBlobCacher::x_StartCacheBlobs(void)
3538 {
3539 m_CurFile = s_DBFiles->begin();
3540 return &CBlobCacher::x_CacheNextFile;
3541 }
3542
3543 CBlobCacher::State
x_CacheNextFile(void)3544 CBlobCacher::x_CacheNextFile(void)
3545 {
3546 try_next_file:
3547 if (CTaskServer::IsInShutdown())
3548 return &CBlobCacher::x_CancelCaching;
3549 if (m_CurFile == s_DBFiles->end())
3550 return &CBlobCacher::x_CleanOrphanRecs;
3551
3552 SNCDBFileInfo* file_info = m_CurFile->second.GetNCPointerOrNull();
3553 if (file_info->file_type != eDBFileMeta
3554 || m_NewFileIds.find(file_info->file_id) != m_NewFileIds.end())
3555 {
3556 ++m_CurFile;
3557 goto try_next_file;
3558 }
3559 m_CurRecsSet = &m_RecsMap[file_info->file_id];
3560 m_CurRecIt = m_CurRecsSet->begin();
3561 return &CBlobCacher::x_CacheNextRecord;
3562 }
3563
3564 CBlobCacher::State
x_CacheNextRecord(void)3565 CBlobCacher::x_CacheNextRecord(void)
3566 {
3567 if (CTaskServer::IsInShutdown())
3568 return &CBlobCacher::x_CancelCaching;
3569 if (m_CurRecIt == m_CurRecsSet->end()) {
3570 m_CurRecsSet->clear();
3571 ++m_CurFile;
3572 return &CBlobCacher::x_CacheNextFile;
3573 }
3574
3575 Uint4 rec_num = *m_CurRecIt;
3576 SNCDBFileInfo* file_info = m_CurFile->second.GetNCPointerOrNull();
3577 SNCDataCoord coord;
3578 coord.file_id = file_info->file_id;
3579 coord.rec_num = rec_num;
3580 SFileIndexRec* ind_rec = s_GetIndexRec(file_info, rec_num);
3581 if (!x_CacheMetaRec(file_info, ind_rec, coord))
3582 s_DeleteIndexRec(file_info, ind_rec);
3583
3584 ++m_CurRecIt;
3585 SetRunnable();
3586 return NULL;
3587 }
3588
3589 CBlobCacher::State
x_CleanOrphanRecs(void)3590 CBlobCacher::x_CleanOrphanRecs(void)
3591 {
3592 ITERATE(TFileRecsMap, it_id, m_RecsMap) {
3593 Uint4 file_id = it_id->first;
3594 const TRecNumsSet& recs_set = it_id->second;
3595 TNCDBFilesMap::iterator it_file = s_DBFiles->find(file_id);
3596 if (it_file == s_DBFiles->end()) {
3597 // File could be deleted when we tried to create initial files.
3598 continue;
3599 }
3600 SNCDBFileInfo* file_info = it_file->second;
3601 TRecNumsSet::iterator it_rec = recs_set.begin();
3602 for (; it_rec != recs_set.end(); ++it_rec) {
3603 SFileIndexRec* ind_rec = s_GetIndexRec(file_info, *it_rec);
3604 s_DeleteIndexRec(file_info, ind_rec);
3605 }
3606 }
3607
3608 return &CBlobCacher::x_FinishCaching;
3609 }
3610
CBlobCacher(void)3611 CBlobCacher::CBlobCacher(void)
3612 {
3613 #if __NC_TASKS_MONITOR
3614 m_TaskName = "CBlobCacher";
3615 #endif
3616 SetState(&CBlobCacher::x_StartCaching);
3617 }
3618
~CBlobCacher(void)3619 CBlobCacher::~CBlobCacher(void)
3620 {}
3621
3622
3623 void
CheckDiskSpace(void)3624 CNCBlobStorage::CheckDiskSpace(void)
3625 {
3626 Int8 free_space = GetDiskFree();
3627 Int8 allowed_db_size = GetAllowedDBSize(free_space);
3628 Int8 cur_db_size = s_CurDBSize;
3629
3630 if (s_IsStopWrite == eNoStop
3631 && cur_db_size * 100 >= allowed_db_size * s_WarnLimitOnPct)
3632 {
3633 string msg("Current db size is " + g_ToSizeStr(cur_db_size) +
3634 ", allowed db size is " + g_ToSizeStr(allowed_db_size) +
3635 " (" + g_ToSmartStr(cur_db_size * 100 / allowed_db_size) + "%)");
3636 CNCAlerts::Register(CNCAlerts::eDatabaseTooLarge, msg);
3637 ERR_POST(Critical << "ALERT! Database is too large. " << msg);
3638 s_IsStopWrite = eStopWarning;
3639 Logging_DiskSpaceAlert();
3640 }
3641
3642 if (s_IsStopWrite == eStopWarning) {
3643 if (s_StopWriteOnSize != 0 && cur_db_size >= s_StopWriteOnSize) {
3644 s_IsStopWrite = eStopDBSize;
3645 string msg("Current db size is " + g_ToSizeStr(cur_db_size) +
3646 ", stopwrite size is " + g_ToSizeStr(s_StopWriteOnSize) +
3647 " (" + g_ToSmartStr(cur_db_size * 100 / s_StopWriteOnSize) + "%)");
3648 CNCAlerts::Register(CNCAlerts::eDatabaseOverLimit, msg);
3649 ERR_POST(Critical << "Database size exceeded its limit. "
3650 << msg << ". Will no longer accept any writes from clients.");
3651 }
3652 }
3653 else if (s_IsStopWrite == eStopDBSize && cur_db_size <= s_StopWriteOffSize)
3654 {
3655 s_IsStopWrite = eStopWarning;
3656 }
3657 if (free_space <= s_DiskCritical) {
3658 if (s_IsStopWrite < eStopDiskCritical) {
3659 s_IsStopWrite = eStopDiskCritical;
3660 string msg("free " + g_ToSizeStr(free_space) +
3661 ", limit " + g_ToSizeStr(s_DiskCritical));
3662 CNCAlerts::Register(CNCAlerts::eDiskSpaceCritical, msg);
3663 ERR_POST(Critical << "Free disk space is below CRITICAL threshold: "
3664 << msg << ". Will no longer accept any writes.");
3665 }
3666 }
3667 else if (free_space <= s_DiskFreeLimit) {
3668 if (s_IsStopWrite < eStopDiskSpace) {
3669 s_IsStopWrite = eStopDiskSpace;
3670 string msg("free " + g_ToSizeStr(free_space) +
3671 ", limit " + g_ToSizeStr(s_DiskFreeLimit));
3672 CNCAlerts::Register(CNCAlerts::eDiskSpaceLow, msg);
3673 ERR_POST(Critical << "Free disk space is below threshold: "
3674 << msg << ". Will no longer accept any writes from clients.");
3675 }
3676 }
3677 else if (s_IsStopWrite == eStopDiskSpace || s_IsStopWrite == eStopDiskCritical)
3678 {
3679 s_IsStopWrite = eStopWarning;
3680 }
3681
3682 if (s_IsStopWrite == eStopWarning
3683 && cur_db_size * 100 < allowed_db_size * s_WarnLimitOffPct)
3684 {
3685 string msg("Current db size is " + g_ToSizeStr(cur_db_size) +
3686 ", allowed db size is " + g_ToSizeStr(allowed_db_size) +
3687 " (" + g_ToSmartStr(cur_db_size * 100 / allowed_db_size) + "%)");
3688 CNCAlerts::Register(CNCAlerts::eDiskSpaceNormal, msg);
3689 ERR_POST(Critical << "OK. Database is back to normal size. " << msg);
3690 s_IsStopWrite = eNoStop;
3691 }
3692 }
3693
3694
// Processes one entry of the batch collected by x_CleanNextBucket:
//  - logs the expiration;
//  - detaches the blob's storage from m_CacheDatas[m_CurDelData] (coord
//    cleared, dead_time set to 0);
//  - moves its meta record and chunk chain to garbage.
// Reschedules itself for the next entry. When the batch is exhausted it
// returns to x_CleanNextBucket. The bucket index is advanced only if the batch
// was not full (m_BatchSize < s_GCBatchSize), presumably because a full batch
// means the bucket may still hold more expired entries.
CExpiredCleaner::State
CExpiredCleaner::x_DeleteNextData(void)
{
    // NOTE(review): this shutdown path does not release the references that
    // x_CleanNextBucket took for entries still in m_CacheDatas.
    if (CTaskServer::IsInShutdown())
        return NULL;

    if (m_CurDelData >= m_CacheDatas.size()) {
        if (m_BatchSize < s_GCBatchSize)
            ++m_CurBucket;
        m_CacheDatas.clear();
        return &CExpiredCleaner::x_CleanNextBucket;
    }

    SNCCacheData* cache_data = m_CacheDatas[m_CurDelData];

    cache_data->lock.Lock();
    // Log the expiration as its own request, with the blob's key components.
    {
        CSrvDiagMsg diag_msg;
        GetDiagCtx()->SetRequestID();
        diag_msg.StartRequest().PrintParam("_type", "expiration");
        CNCBlobKeyLight key(cache_data->key);
        if (key.IsICacheKey()) {
            diag_msg.PrintParam("cache",key.Cache()).PrintParam("key",key.RawKey()).PrintParam("subkey",key.SubKey());
        } else {
            diag_msg.PrintParam("key",key.RawKey());
        }
        diag_msg.Flush();
        diag_msg.StopRequest();
    }
    // Remember what is needed to free the storage, then detach it from
    // cache_data while the lock is held.
    SNCDataCoord coord = cache_data->coord;
    Uint8 size = cache_data->size;
    Uint4 chunk_size = cache_data->chunk_size;
    Uint2 map_size = cache_data->map_size;
    cache_data->coord.clear();
    cache_data->dead_time = 0;
    CNCBlobVerManager* mgr = cache_data->Get_ver_mgr();
    if (mgr) {
        // Keep the manager alive for DeleteDeadVersion()/Release() below,
        // which run after the lock is dropped.
        mgr->ObtainReference();
    } else {
        CNCBlobStorage::ChangeCacheDeadTime(cache_data);
    }

    // Free the meta record and, if it has one, the chain of map/data
    // records linked from it. The map depth is derived from the blob's size
    // parameters.
    if (!coord.empty()) {
        CSrvRef<SNCDBFileInfo> meta_file = s_GetDBFileTry(coord.file_id);
        if (meta_file.NotNull()) {
            SFileIndexRec* meta_ind = s_GetIndexRecTry(meta_file, coord.rec_num);
            if (meta_ind) {
                s_CalcMetaAddress(meta_file, meta_ind);
                if (!meta_ind->chain_coord.empty()) {
                    Uint1 map_depth = s_CalcMapDepth(size, chunk_size, map_size);
                    s_MoveDataToGarbage(meta_ind->chain_coord, map_depth, coord, true);
                }
                s_MoveRecToGarbage(meta_file, meta_ind);
            }
#ifdef _DEBUG
            else {
                CNCAlerts::Register(CNCAlerts::eDebugDeleteNextData2, "DeleteNextData: meta_ind");
            }
#endif
        }
#ifdef _DEBUG
        else {
            CNCAlerts::Register(CNCAlerts::eDebugDeleteNextData1, "DeleteNextData: meta_file");
        }
#endif
    }
    cache_data->lock.Unlock();

    if (mgr) {
        mgr->DeleteDeadVersion(m_NextDead);
        mgr->Release();
    }
    // Drop the reference x_CleanNextBucket took when it collected the entry.
    CNCBlobStorage::ReleaseCacheData(cache_data);

    ++m_CurDelData;
    SetRunnable();
    return NULL;
}
3773
x_DeleteData(SNCCacheData * cache_data)3774 void CExpiredCleaner::x_DeleteData(SNCCacheData* cache_data)
3775 {
3776 // cache_data is locked by caller
3777 SNCDataCoord coord = cache_data->coord;
3778 Uint8 size = cache_data->size;
3779 Uint4 chunk_size = cache_data->chunk_size;
3780 Uint2 map_size = cache_data->map_size;
3781 cache_data->coord.clear();
3782 if (!coord.empty()) {
3783 #ifdef _DEBUG
3784 CNCAlerts::Register(CNCAlerts::eDebugDeleteVersionData, "x_DeleteData");
3785 #endif
3786 CSrvRef<SNCDBFileInfo> meta_file = s_GetDBFileTry(coord.file_id);
3787 if (meta_file.NotNull()) {
3788 SFileIndexRec* meta_ind = s_GetIndexRecTry(meta_file, coord.rec_num);
3789 if (meta_ind) {
3790 s_CalcMetaAddress(meta_file, meta_ind);
3791 if (!meta_ind->chain_coord.empty()) {
3792 Uint1 map_depth = s_CalcMapDepth(size, chunk_size, map_size);
3793 s_MoveDataToGarbage(meta_ind->chain_coord, map_depth, coord, true);
3794 }
3795 s_MoveRecToGarbage(meta_file, meta_ind);
3796 }
3797 #ifdef _DEBUG
3798 else {
3799 CNCAlerts::Register(CNCAlerts::eDebugDeleteNextData2, "DeleteNextData: meta_ind");
3800 }
3801 #endif
3802 }
3803 #ifdef _DEBUG
3804 else {
3805 CNCAlerts::Register(CNCAlerts::eDebugDeleteNextData1, "DeleteNextData: meta_file");
3806 }
3807 #endif
3808 }
3809 }
3810
3811 CExpiredCleaner::State
x_StartSession(void)3812 CExpiredCleaner::x_StartSession(void)
3813 {
3814 m_StartTime = CSrvTime::CurSecs();
3815 m_NextDead = m_StartTime;
3816 if (m_DoExtraGC) {
3817 if (s_CurDBSize <= s_ExtraGCOffSize) {
3818 m_DoExtraGC = false;
3819 }
3820 else {
3821 ++m_ExtraGCTime;
3822 m_NextDead += m_ExtraGCTime;
3823 }
3824 }
3825 else if (s_ExtraGCOnSize != 0
3826 && s_CurDBSize >= s_ExtraGCOnSize)
3827 {
3828 m_DoExtraGC = true;
3829 m_ExtraGCTime = 1;
3830 m_NextDead += m_ExtraGCTime;
3831 }
3832
3833 m_CurBucket = 1;
3834 CreateNewDiagCtx();
3835 return &CExpiredCleaner::x_CleanNextBucket;
3836 }
3837
// Scans the time table of bucket m_CurBucket under its lock and collects into
// m_CacheDatas the entries whose saved_dead_time is before m_NextDead.
// The table is ordered by dead time; saved_dead_time is the key, as the
// re-insertion below shows.
//  - At most s_GCBatchSize entries are examined per call.
//  - An entry whose dead_time changed since insertion is re-inserted with the
//    new value instead of being collected.
//  - An entry with dead_time == 0 is left untouched.
//  - Every collected entry gets a reference, which x_DeleteNextData releases.
// Moves to the next bucket when nothing was examined; otherwise continues
// with x_DeleteNextData.
CExpiredCleaner::State
CExpiredCleaner::x_CleanNextBucket(void)
{
    if (CTaskServer::IsInShutdown())
        return NULL;
    // if all buckets clean, finish
    if (m_CurBucket > CNCDistributionConf::GetCntTimeBuckets())
        return &CExpiredCleaner::x_FinishSession;

    m_BatchSize = 0;
    // s_TimeTables has blob info sorted by dead_time
    STimeTable* table = s_TimeTables[m_CurBucket];
    TTimeTableMap& time_map = table->time_map;
    table->lock.Lock();
    TTimeTableMap::iterator it = time_map.begin();
    // Last entry that stayed in place. Re-inserting the current entry
    // invalidates `it`, so the walk resumes right after this one.
    SNCCacheData* last_data = NULL;
    // m_BatchSize counts every examined entry, including re-inserted ones
    // (`continue` still runs the increment).
    for (; it != time_map.end(); ++m_BatchSize) {
        if (m_BatchSize >= s_GCBatchSize)
            break;

#if __NC_CACHEDATA_INTR_SET
        SNCCacheData* cache_data = &*it;
#else
        SNCCacheData* cache_data = *it;
#endif
        // Ordered by saved_dead_time: nothing further is due yet.
        if (cache_data->saved_dead_time >= m_NextDead)
            break;

        int dead_time = ACCESS_ONCE(cache_data->dead_time);
        if (dead_time != 0) {
            // dead_time has changed, put blob back
            if (dead_time != cache_data->saved_dead_time) {
#if __NC_CACHEDATA_INTR_SET
                time_map.erase(time_map.iterator_to(*cache_data));
                cache_data->saved_dead_time = dead_time;
                time_map.insert_equal(*cache_data);
                if (last_data) {
                    it = time_map.iterator_to(*last_data);
                    ++it;
                }
                else
                    it = time_map.begin();
#else
                time_map.erase(cache_data);
                cache_data->saved_dead_time = dead_time;
                time_map.insert(cache_data);
                if (last_data) {
                    it = time_map.find(last_data);
                    ++it;
                } else {
                    it = time_map.begin();
                }
#endif
                continue;
            }
            // increment ref counter
            CNCBlobStorage::ReferenceCacheData(cache_data);
            m_CacheDatas.push_back(cache_data);
        }
        last_data = cache_data;
        ++it;
    }
    table->lock.Unlock();

    if (m_BatchSize == 0) {
        // goto next bucket
        ++m_CurBucket;
        SetRunnable();
        return NULL;
    }

    m_CurDelData = 0;
    return &CExpiredCleaner::x_DeleteNextData;
}
3912
3913 CExpiredCleaner::State
x_FinishSession(void)3914 CExpiredCleaner::x_FinishSession(void)
3915 {
3916 ReleaseDiagCtx();
3917 SetState(&CExpiredCleaner::x_StartSession);
3918 if (!CTaskServer::IsInShutdown()) {
3919 if (CSrvTime::CurSecs() == m_StartTime)
3920 RunAfter(1);
3921 else
3922 SetRunnable();
3923 }
3924 return NULL;
3925 }
3926
CExpiredCleaner(void)3927 CExpiredCleaner::CExpiredCleaner(void)
3928 : m_DoExtraGC(false)
3929 {
3930 #if __NC_TASKS_MONITOR
3931 m_TaskName = "CExpiredCleaner";
3932 #endif
3933 SetState(&CExpiredCleaner::x_StartSession);
3934 }
3935
~CExpiredCleaner(void)3936 CExpiredCleaner::~CExpiredCleaner(void)
3937 {}
3938
3939
// Moves one live record of m_MaxFile (index m_IndRec, number m_RecNum) to a
// new write location. The location comes from s_GetNextWriteCoord() for the
// same file type index. The record's neighbours are then re-pointed to it:
//  - meta record: the blob's coord (m_CurVer->coord or m_CacheData->coord)
//    and the chain link of the record it links down to;
//  - chunk map: the up links of all its down records (s_UpdateUpCoords), plus
//    the parent's link to it;
//  - chunk data: the parent's link to it, plus the chunk pointer held in
//    m_CurVer.
// Locking depends on the path:
//  - m_CurVer set (version-manager path, validated by x_CheckCurVersion):
//    m_CacheData->lock is not taken.
//  - m_CurVer not set: the lock is held from the final re-checks until the
//    links are updated.
// If the move is abandoned, the new slot is "wiped", i.e. turned into garbage.
// Some of those cases also set m_Failed, which ends the pass. Always
// continues with x_FinishMoveRecord.
CSpaceShrinker::State
CSpaceShrinker::x_MoveRecord(void)
{
    CSrvRef<SNCDBFileInfo> chain_file;
    SFileIndexRec* chain_ind = NULL;
    SFileChunkMapRec* map_rec = NULL;
    SFileChunkMapRec* up_map = NULL;
    Uint2 up_index = Uint2(-1);

    SNCDataCoord old_coord;
    old_coord.file_id = m_MaxFile->file_id;
    old_coord.rec_num = m_RecNum;

    SNCDataCoord new_coord;
    CSrvRef<SNCDBFileInfo> new_file;
    SFileIndexRec* new_ind;
    if (!s_GetNextWriteCoord(m_MaxFile->type_index,
                             m_IndRec->rec_size, new_coord, new_file, new_ind))
    {
#ifdef _DEBUG
        CNCAlerts::Register(CNCAlerts::eDebugMoveRecord0,"s_GetNextWriteCoord");
#endif
        m_Failed = true;
        return &CSpaceShrinker::x_FinishMoveRecord;
    }

    // Copy the record payload verbatim; the links are fixed up below.
    memcpy(new_file->file_map + new_ind->offset,
           m_MaxFile->file_map + m_IndRec->offset,
           m_IndRec->rec_size);

    new_ind->cache_data = m_CacheData;
    new_ind->rec_type = m_IndRec->rec_type;
    SNCDataCoord chain_coord = m_IndRec->chain_coord;
    new_ind->chain_coord = chain_coord;

    if (!chain_coord.empty()) {
        // The linked record (down link for a meta record, up link for map and
        // data records) must still exist, and its file must have
        // cnt_unfinished == 0. Otherwise this record is not moved.
        chain_file = s_GetDBFileTry(chain_coord.file_id);
        if (chain_file.IsNull()) {
            goto wipe_new_record;
        }
        chain_ind = s_GetIndOrDeleted(chain_file, chain_coord.rec_num);
        if (s_IsIndexDeleted(chain_file, chain_ind)
            || chain_file->cnt_unfinished.Get() != 0)
        {
            goto wipe_new_record;
        }

        // Checks below are not just checks for corruption but also
        // an opportunity to fault in all database pages that will be necessary
        // to make all changes. This way we minimize the time that cache_data
        // lock will be held.
        switch (m_IndRec->rec_type) {
        case eFileRecMeta:
            if (chain_ind->chain_coord != old_coord
                && !s_IsIndexDeleted(m_MaxFile, m_IndRec))
            {
                DB_CORRUPTED("Meta with coord " << old_coord
                             << " links down to record with coord " << new_ind->chain_coord
                             << " but it has up coord " << chain_ind->chain_coord);
            }
            break;
        case eFileRecChunkMap:
            map_rec = s_CalcMapAddress(new_file, new_ind);
            up_index = map_rec->map_idx;
            // Declared and assigned separately: the case label below must not
            // jump over an initialization.
            Uint2 cnt_downs;
            cnt_downs = s_CalcCntMapDowns(new_ind->rec_size);
            for (Uint2 i = 0; i < cnt_downs; ++i) {
                SNCDataCoord down_coord = map_rec->down_coords[i];
                CSrvRef<SNCDBFileInfo> down_file = s_GetDBFileTry(down_coord.file_id);
                if (down_file.IsNull()) {
                    goto wipe_new_record;
                }
                SFileIndexRec* down_ind = s_GetIndOrDeleted(down_file, down_coord.rec_num);
                if (s_IsIndexDeleted(down_file, down_ind))
                    goto wipe_new_record;
                if (down_ind->chain_coord != old_coord) {
                    DB_CORRUPTED("Map with coord " << old_coord
                                 << " links down to record with coord " << down_coord
                                 << " at index " << i
                                 << " but it has up coord " << down_ind->chain_coord);
                }
            }
            goto check_up_index;

        case eFileRecChunkData:
            // Declared without initializer for the same reason as cnt_downs.
            SFileChunkDataRec* data_rec;
            data_rec = s_CalcChunkAddress(new_file, new_ind);
            up_index = data_rec->chunk_idx;
check_up_index:
            // Verify that the parent (map or meta) links back to old_coord.
            if (chain_ind->rec_type == eFileRecChunkMap) {
                up_map = s_CalcMapAddress(chain_file, chain_ind);
                if (up_map->down_coords[up_index] != old_coord) {
                    DB_CORRUPTED("Record with coord " << old_coord
                                 << " links up to map with coord " << new_ind->chain_coord
                                 << " but at the index " << up_index
                                 << " it has coord " << up_map->down_coords[up_index]);
                }
            }
            else if (chain_ind->chain_coord != old_coord) {
                DB_CORRUPTED("Record with coord " << old_coord
                             << " links up to meta with coord " << new_ind->chain_coord
                             << " but it has down coord " << chain_ind->chain_coord);
            }
            break;
        }
    }

    if (!m_CurVer) {
        // No approved current version: take the blob lock and re-check that
        // nothing changed since the checks above.
        m_CacheData->lock.Lock();
        if (m_CacheData->Get_ver_mgr()
            || m_IndRec->chain_coord != chain_coord
            || (!chain_coord.empty() && s_IsIndexDeleted(chain_file, chain_ind)
                && !s_IsIndexDeleted(m_MaxFile, m_IndRec)))
        {
            m_Failed = true;
            goto unlock_and_wipe;
        }
        // Not an error, just nothing to move: the coord was cleared, the blob
        // dies too soon, or the old record is already deleted.
        if (m_CacheData->coord.empty()
            || m_CacheData->dead_time <= CSrvTime::CurSecs() + s_MinMoveLife
            || s_IsIndexDeleted(m_MaxFile, m_IndRec))
        {
            goto unlock_and_wipe;
        }
    }

    // NOTE(review): for map/data records, map_rec, up_map and chain_ind are
    // only set when chain_coord is non-empty. An empty chain_coord here would
    // dereference NULL below. Presumably such records always have an up link;
    // confirm.
    switch (m_IndRec->rec_type) {
    case eFileRecMeta:
#ifdef _DEBUG
        CNCAlerts::Register(CNCAlerts::eDebugMoveRecord1,"eFileRecMeta");
#endif
        if (m_CurVer) {
            if (m_CurVer->coord != old_coord) {
                // If coord for the version changed under us it's a bug.
                DB_CORRUPTED("Coord " << old_coord << " of the current version for blob "
                             << m_CacheData->key << " changed while move was in progress.");
            }
            m_CurVer->coord = new_coord;
        }
        else {
            if (m_CacheData->coord != old_coord) {
                // If there is no VerManager and still coord is something different
                // it's a bug.
                DB_CORRUPTED("Meta record with coord " << old_coord
                             << " points to cache_data with key " << m_CacheData->key
                             << " which has different coord " << m_CacheData->coord << ".");
            }
            m_CacheData->coord = new_coord;
        }
        if (chain_ind)
            chain_ind->chain_coord = new_coord;
        break;
    case eFileRecChunkMap:
#ifdef _DEBUG
        CNCAlerts::Register(CNCAlerts::eDebugMoveRecord2,"eFileRecChunkMap");
#endif
        if (!s_UpdateUpCoords(map_rec, new_ind, new_coord)) {
            // Only the no-version path holds the blob lock.
            if (!m_CurVer) {
                goto unlock_and_wipe;
            } else {
                goto wipe_new_record;
            }
        }
        if (m_CurVer)
            ++m_CurVer->map_move_counter;
        goto update_up_map;

    case eFileRecChunkData:
#ifdef _DEBUG
        CNCAlerts::Register(CNCAlerts::eDebugMoveRecord3,"eFileRecChunkData");
#endif
        if (m_CurVer) {
            SFileChunkDataRec* new_data = s_CalcChunkAddress(new_file, new_ind);
            m_CurVer->chunks[new_data->chunk_num] = (char*)new_data->chunk_data;
        }
update_up_map:
        // Re-point the parent: a map slot, or the meta record's chain link.
        if (up_map) {
            up_map->down_coords[up_index] = new_coord;
        }
        else {
            chain_ind->chain_coord = new_coord;
            if (m_CurVer)
                m_CurVer->data_coord = new_coord;
        }
        break;
    }

    if (m_CurVer) {
        if (m_MovingMeta) {
            s_MoveRecToGarbage(m_MaxFile, m_IndRec);
        }
        else {
            // Release of the old map/data record is deferred through
            // CallRCU(). Presumably readers may still use pointers into it;
            // confirm.
            CMovedRecDeleter* deleter = new CMovedRecDeleter(m_MaxFile, m_IndRec);
            deleter->CallRCU();
        }
    }
    else {
        m_CacheData->lock.Unlock();
        s_MoveRecToGarbage(m_MaxFile, m_IndRec);
    }
    // NOTE(review): presumably balances an increment done by
    // s_GetNextWriteCoord(); confirm.
    new_file->cnt_unfinished.Add(-1);

    ++m_CntMoved;
    m_SizeMoved += m_IndRec->rec_size + sizeof(SFileIndexRec);

    return &CSpaceShrinker::x_FinishMoveRecord;

unlock_and_wipe:
    m_CacheData->lock.Unlock();
wipe_new_record:
    // Turn the unused new slot into garbage with no links.
    new_ind->rec_type = eFileRecChunkData;
    new_ind->chain_coord.clear();
    s_MoveRecToGarbage(new_file, new_ind);
    new_file->cnt_unfinished.Add(-1);

    return &CSpaceShrinker::x_FinishMoveRecord;
}
4156
// Starts a shrink session. It walks all DB files under s_DBFilesLock and
// skips any file that is a write target of s_AllWritings or has
// cnt_unfinished != 0. For the remaining files:
//  - a file with used_size == 0 is queued for deletion in m_FilesToDel;
//  - if the database needs compaction (need_move), the file with the highest
//    garb_size / file_size ratio whose next_shrink_time has come becomes
//    m_MaxFile;
//  - used/garbage sizes of files still marked is_releasing are summed.
// m_MaxFile may be dropped again at the end when moving is not worth it.
// Continues with x_DeleteNextFile.
// Lock order: s_DBFilesLock -> s_NextWriteLock, and s_DBFilesLock -> info_lock.
CSpaceShrinker::State
CSpaceShrinker::x_PrepareToShrink(void)
{
    if (CTaskServer::IsInShutdown())
        return NULL;

    int cur_time = CSrvTime::CurSecs();
    // Moving records pays off only when the database is at least s_MinDBSize
    // and garbage is above s_MaxGarbagePct percent of it.
    bool need_move = s_CurDBSize >= s_MinDBSize
                     && s_GarbageSize * 100 > s_CurDBSize * s_MaxGarbagePct;
    m_MaxFile = NULL;

    double max_pct = 0;
    // Sizes of files that x_FinishMoves marked is_releasing after a pass and
    // whose next_shrink_time has not come yet.
    Uint8 total_rel_used = 0;
    Uint8 total_rel_garb = 0;

    s_DBFilesLock.Lock();
    ITERATE(TNCDBFilesMap, it_file, (*s_DBFiles)) {
        SNCDBFileInfo* this_file = it_file->second.GetNCPointerOrNull();
        s_NextWriteLock.Lock();
        bool is_current = false;
        for (size_t i = 0; i < s_CntAllFiles; ++i) {
            if (this_file == s_AllWritings[i].cur_file
                || this_file == s_AllWritings[i].next_file)
            {
                is_current = true;
                break;
            }
        }
        s_NextWriteLock.Unlock();
        if (is_current || this_file->cnt_unfinished.Get() != 0)
            continue;

        if (this_file->used_size == 0) {
            m_FilesToDel.push_back(SrvRef(this_file));
        }
        else if (need_move) {
            if (cur_time >= this_file->next_shrink_time) {
                this_file->info_lock.Lock();
                this_file->is_releasing = false;
                if (this_file->garb_size + this_file->used_size != 0) {
#if 0
                    double this_pct = double(this_file->garb_size)
                                      / (this_file->garb_size + this_file->used_size);
#else
                    // Garbage share relative to the whole file size.
                    double this_pct = double(this_file->garb_size) / double(this_file->file_size);
#endif
                    if (this_pct > max_pct) {
                        max_pct = this_pct;
                        m_MaxFile = this_file;
                    }
                }
                this_file->info_lock.Unlock();
            }
            else if (this_file->is_releasing) {
                this_file->info_lock.Lock();
                total_rel_used += this_file->used_size;
                total_rel_garb += this_file->garb_size;
                this_file->info_lock.Unlock();
            }
        }
    }
    s_DBFilesLock.Unlock();

    // Unless the best candidate is at least 90% garbage, skip moving in two
    // cases:
    //  - discounting files that are still releasing brings the garbage share
    //    within s_MaxGarbagePct;
    //  - the candidate itself is not above s_MaxGarbagePct.
    // NOTE(review): the Uint8 subtractions wrap if the totals ever exceed the
    // global counters; confirm that cannot happen.
    if (max_pct < 0.9) {
        Uint8 proj_garbage = s_GarbageSize - total_rel_garb;
        Uint8 proj_size = s_CurDBSize - (total_rel_garb + total_rel_used);
        if (need_move
            && (proj_garbage * 100 <= proj_size * s_MaxGarbagePct
                || max_pct * 100 <= s_MaxGarbagePct))
        {
            m_MaxFile = NULL;
        }
    }

    m_CurDelFile = m_FilesToDel.begin();
    SetState(&CSpaceShrinker::x_DeleteNextFile);
    SetRunnable();
    return NULL;
}
4236
4237 CSpaceShrinker::State
x_DeleteNextFile(void)4238 CSpaceShrinker::x_DeleteNextFile(void)
4239 {
4240 if (CTaskServer::IsInShutdown())
4241 return NULL;
4242
4243 if (m_CurDelFile == m_FilesToDel.end()) {
4244 m_FilesToDel.clear();
4245 return &CSpaceShrinker::x_StartMoves;
4246 }
4247
4248 #ifdef _DEBUG
4249 CNCAlerts::Register(CNCAlerts::eDebugDeleteFile,"x_DeleteNextFile");
4250 #endif
4251 s_DeleteDBFile(*m_CurDelFile, true);
4252 m_CurDelFile->Reset();
4253 ++m_CurDelFile;
4254 SetRunnable();
4255 return NULL;
4256 }
4257
4258 CSpaceShrinker::State
x_StartMoves(void)4259 CSpaceShrinker::x_StartMoves(void)
4260 {
4261 m_StartTime = CSrvTime::CurSecs();
4262 /*
4263 bool need_move = s_CurDBSize >= s_MinDBSize
4264 && s_GarbageSize * 100 > s_CurDBSize * s_MaxGarbagePct;
4265 */
4266 if (!m_MaxFile /*|| !need_move*/)
4267 return &CSpaceShrinker::x_FinishSession;
4268
4269 CreateNewDiagCtx();
4270 CSrvDiagMsg().StartRequest()
4271 .PrintParam("_type", "move")
4272 .PrintParam("file_id", m_MaxFile->file_id);
4273
4274 m_Failed = false;
4275 m_PrevRecNum = 0;
4276 m_LastAlive = 0;
4277 m_CntProcessed = 0;
4278 m_CntMoved = 0;
4279 m_SizeMoved = 0;
4280
4281 return &CSpaceShrinker::x_MoveNextRecord;
4282 }
4283
4284 CSpaceShrinker::State
x_MoveNextRecord(void)4285 CSpaceShrinker::x_MoveNextRecord(void)
4286 {
4287 if (CTaskServer::IsInShutdown())
4288 return &CSpaceShrinker::x_FinishMoves;
4289
4290 int cur_time = CSrvTime::CurSecs();
4291 m_MaxFile->info_lock.Lock();
4292 m_RecNum = m_LastAlive;
4293 m_IndRec = m_MaxFile->index_head - m_RecNum;
4294 if (m_RecNum != 0 && s_IsIndexDeleted(m_MaxFile, m_IndRec)) {
4295 m_LastAlive = m_RecNum = 0;
4296 m_IndRec = m_MaxFile->index_head;
4297 }
4298 do {
4299 if (m_RecNum > m_PrevRecNum)
4300 ++m_CntProcessed;
4301 if (m_IndRec->next_num == 0) {
4302 if (m_MaxFile->index_head->next_num == 0 && m_MaxFile->used_size != 0) {
4303 SRV_FATAL("Blob coords index broken");
4304 }
4305 m_IndRec = NULL;
4306 break;
4307 }
4308 m_LastAlive = m_RecNum;
4309 m_RecNum = m_IndRec->next_num;
4310 m_IndRec = m_MaxFile->index_head - m_RecNum;
4311 }
4312 while (m_RecNum <= m_PrevRecNum
4313 #if !__NC_CACHEDATA_ALL_MONITOR
4314 || m_IndRec->cache_data->dead_time <= cur_time + s_MinMoveLife
4315 #endif
4316 );
4317 m_MaxFile->info_lock.Unlock();
4318 if (!m_IndRec)
4319 return &CSpaceShrinker::x_FinishMoves;
4320 if (s_IsIndexDeleted(m_MaxFile, m_IndRec))
4321 return &CSpaceShrinker::x_FinishMoveRecord;
4322
4323 m_CacheData = m_IndRec->cache_data;
4324
4325 #if __NC_CACHEDATA_ALL_MONITOR
4326 bool found = false;
4327 SAllCacheTable* table = nullptr;
4328 {
4329 vector<SAllCacheTable*> v_all;
4330 v_all.reserve(s_AllCache.size());
4331 ITERATE(TAllCacheBuckets, tt, s_AllCache) {
4332 v_all.push_back(tt->second);
4333 }
4334 random_shuffle(v_all.begin(), v_all.end());
4335 ITERATE(vector<SAllCacheTable*>, tt, v_all) {
4336 table = *tt;
4337 table->lock.Lock();
4338 found = table->all_cache_set.find(m_CacheData) != table->all_cache_set.end();
4339 table->lock.Unlock();
4340 if (found) {
4341 break;
4342 }
4343 }
4344 }
4345 if (found) {
4346 if (m_CacheData->dead_time == 0) {
4347 table->lock.Lock();
4348 table->all_cache_set.erase(m_CacheData);
4349 table->lock.Unlock();
4350 // found = false;
4351 #ifdef _DEBUG
4352 CNCAlerts::Register(CNCAlerts::eDebugWrongCacheFound2, "CSpaceShrinker::x_MoveNextRecord");
4353 #endif
4354 }
4355 }
4356 if (!found) {
4357 #ifdef _DEBUG
4358 CNCAlerts::Register(CNCAlerts::eDebugWrongCacheFound1, "CSpaceShrinker::x_MoveNextRecord");
4359 #endif
4360 m_CacheData = nullptr;
4361 m_MaxFile->info_lock.Lock();
4362 if (!s_IsIndexDeleted(m_MaxFile, m_IndRec)) {
4363 s_MoveRecToGarbageNoLock(m_MaxFile, m_IndRec);
4364 }
4365 m_MaxFile->info_lock.Unlock();
4366 return &CSpaceShrinker::x_FinishMoveRecord;
4367 }
4368 #endif // __NC_CACHEDATA_ALL_MONITOR
4369
4370 m_CacheData->lock.Lock();
4371 bool need_move = m_CacheData->dead_time > cur_time + s_MinMoveLife;
4372 if (need_move) {
4373 CNCBlobStorage::ReferenceCacheData(m_CacheData);
4374 m_VerMgr = m_CacheData->Get_ver_mgr();
4375 if (m_VerMgr) {
4376 m_VerMgr->ObtainReference();
4377 m_CacheData->lock.Unlock();
4378
4379 m_VerMgr->RequestCurVersion(this);
4380 return &CSpaceShrinker::x_CheckCurVersion;
4381 }
4382 }
4383 m_CacheData->lock.Unlock();
4384 if (!need_move) {
4385 return &CSpaceShrinker::x_FinishMoves;
4386 }
4387
4388 if (s_IsIndexDeleted(m_MaxFile, m_IndRec)) {
4389 CNCBlobStorage::ReleaseCacheData(m_CacheData);
4390 m_CacheData = nullptr;
4391 SetRunnable();
4392 return NULL;
4393 }
4394 return &CSpaceShrinker::x_MoveRecord;
4395 }
4396
4397 SNCDataCoord
x_FindMetaCoord(SNCDataCoord coord,Uint1 max_map_depth)4398 CSpaceShrinker::x_FindMetaCoord(SNCDataCoord coord, Uint1 max_map_depth)
4399 {
4400 if (max_map_depth == 0 || coord.empty())
4401 return coord;
4402
4403 CSrvRef<SNCDBFileInfo> file_info = s_GetDBFileTry(coord.file_id);
4404 if (file_info.IsNull()) {
4405 coord.clear();
4406 return coord;
4407 }
4408 SFileIndexRec* ind_rec = s_GetIndOrDeleted(file_info, coord.rec_num);
4409 if (s_IsIndexDeleted(file_info, ind_rec) || ind_rec->rec_type == eFileRecMeta)
4410 return coord;
4411
4412 return x_FindMetaCoord(ind_rec->chain_coord, max_map_depth - 1);
4413 }
4414
4415 CSpaceShrinker::State
x_CheckCurVersion(void)4416 CSpaceShrinker::x_CheckCurVersion(void)
4417 {
4418 if (!IsTransFinished())
4419 return NULL;
4420 if (CTaskServer::IsInShutdown())
4421 return &CSpaceShrinker::x_FinishMoveRecord;
4422
4423 if (s_IsIndexDeleted(m_MaxFile, m_IndRec))
4424 return &CSpaceShrinker::x_FinishMoveRecord;
4425
4426 m_CurVer = m_VerMgr->GetCurVersion();
4427 if (!m_CurVer)
4428 return &CSpaceShrinker::x_FinishMoveRecord;
4429
4430 if (m_IndRec->rec_type == eFileRecChunkData) {
4431 SFileChunkDataRec* data_rec = s_CalcChunkAddress(m_MaxFile, m_IndRec);
4432 Uint8 chunk_num = data_rec->chunk_num;
4433 char* cur_chunk_ptr = NULL;
4434 if (m_CurVer->chunks.size() > chunk_num)
4435 cur_chunk_ptr = ACCESS_ONCE(m_CurVer->chunks[chunk_num]);
4436 if (cur_chunk_ptr) {
4437 if (cur_chunk_ptr != (char*)data_rec->chunk_data)
4438 return &CSpaceShrinker::x_FinishMoveRecord;
4439 if (m_CurVer->coord.empty()) {
4440 m_Failed = true;
4441 return &CSpaceShrinker::x_FinishMoveRecord;
4442 }
4443 }
4444 else {
4445 SNCDataCoord meta_coord = x_FindMetaCoord(m_IndRec->chain_coord,
4446 m_CurVer->map_depth);
4447 if (meta_coord.empty() || meta_coord != m_CurVer->coord)
4448 return &CSpaceShrinker::x_FinishMoveRecord;
4449 }
4450 }
4451 else if (m_IndRec->rec_type == eFileRecChunkMap) {
4452 if (m_CurVer->map_depth == 0)
4453 return &CSpaceShrinker::x_FinishMoveRecord;
4454 SNCDataCoord meta_coord = x_FindMetaCoord(m_IndRec->chain_coord,
4455 m_CurVer->map_depth - 1);
4456 if (meta_coord.empty() && m_CurVer->coord.empty()) {
4457 m_Failed = true;
4458 return &CSpaceShrinker::x_FinishMoveRecord;
4459 }
4460 if (meta_coord.empty() || meta_coord != m_CurVer->coord)
4461 return &CSpaceShrinker::x_FinishMoveRecord;
4462 }
4463 else if (m_CurVer->coord.file_id != m_MaxFile->file_id
4464 || m_CurVer->coord.rec_num != m_RecNum)
4465 {
4466 return &CSpaceShrinker::x_FinishMoveRecord;
4467 }
4468 else if (m_CurVer->move_or_rewrite
4469 || !AtomicCAS(m_CurVer->move_or_rewrite, false, true))
4470 {
4471 m_Failed = true;
4472 return &CSpaceShrinker::x_FinishMoveRecord;
4473 }
4474
4475 m_MovingMeta = m_IndRec->rec_type == eFileRecMeta;
4476 if (m_CurVer->dead_time < CSrvTime::CurSecs() + s_MinMoveLife)
4477 return &CSpaceShrinker::x_FinishMoveRecord;
4478 else
4479 return &CSpaceShrinker::x_MoveRecord;
4480 }
4481
4482 CSpaceShrinker::State
x_FinishMoveRecord(void)4483 CSpaceShrinker::x_FinishMoveRecord(void)
4484 {
4485 if (m_CacheData) {
4486 CNCBlobStorage::ReleaseCacheData(m_CacheData);
4487 m_CacheData = nullptr;
4488 }
4489 if (m_VerMgr) {
4490 if (m_CurVer) {
4491 if (m_MovingMeta)
4492 m_CurVer->move_or_rewrite = false;
4493 m_CurVer.Reset();
4494 }
4495 m_VerMgr->Release();
4496 m_VerMgr = NULL;
4497 }
4498
4499 ++m_CntProcessed;
4500 m_PrevRecNum = m_RecNum;
4501
4502 if (m_Failed || CTaskServer::IsInShutdown())
4503 SetState(&CSpaceShrinker::x_FinishMoves);
4504 else
4505 SetState(&CSpaceShrinker::x_MoveNextRecord);
4506 SetRunnable();
4507 return NULL;
4508 }
4509
4510 CSpaceShrinker::State
x_FinishMoves(void)4511 CSpaceShrinker::x_FinishMoves(void)
4512 {
4513 if (!m_Failed) {
4514 m_MaxFile->is_releasing = true;
4515 if (m_MaxFile->used_size == 0)
4516 s_DeleteDBFile(m_MaxFile, true);
4517 else if (m_CntProcessed == 0) {
4518 SRV_LOG(Warning, "Didn't find anything to process in the file");
4519 m_MaxFile->next_shrink_time = CSrvTime::CurSecs() + max(s_MinMoveLife, 300);
4520 }
4521 else
4522 m_MaxFile->next_shrink_time = CSrvTime::CurSecs() + s_MinMoveLife;
4523 }
4524 else {
4525 GetDiagCtx()->SetRequestStatus(eStatus_CmdAborted);
4526 m_MaxFile->next_shrink_time = CSrvTime::CurSecs() + s_FailedMoveDelay;
4527 }
4528 m_MaxFile.Reset();
4529
4530 CSrvDiagMsg().PrintExtra()
4531 .PrintParam("cnt_processed", m_CntProcessed)
4532 .PrintParam("cnt_moved", m_CntMoved)
4533 .PrintParam("size_moved", m_SizeMoved);
4534 CSrvDiagMsg().StopRequest();
4535 ReleaseDiagCtx();
4536 CNCStat::DBFileCleaned(!m_Failed, m_CntProcessed, m_CntMoved, m_SizeMoved);
4537
4538 return &CSpaceShrinker::x_FinishSession;
4539 }
4540
4541 CSpaceShrinker::State
x_FinishSession(void)4542 CSpaceShrinker::x_FinishSession(void)
4543 {
4544 SetState(&CSpaceShrinker::x_PrepareToShrink);
4545 if (!CTaskServer::IsInShutdown()) {
4546 if (CSrvTime::CurSecs() != m_StartTime)
4547 SetRunnable();
4548 else
4549 RunAfter(1);
4550 }
4551 return NULL;
4552 }
4553
CSpaceShrinker(void)4554 CSpaceShrinker::CSpaceShrinker(void)
4555 {
4556 #if __NC_TASKS_MONITOR
4557 m_TaskName = "CSpaceShrinker";
4558 #endif
4559 SetState(&CSpaceShrinker::x_PrepareToShrink);
4560 }
4561
~CSpaceShrinker(void)4562 CSpaceShrinker::~CSpaceShrinker(void)
4563 {}
4564
4565
CMovedRecDeleter(SNCDBFileInfo * file_info,SFileIndexRec * ind_rec)4566 CMovedRecDeleter::CMovedRecDeleter(SNCDBFileInfo* file_info, SFileIndexRec* ind_rec)
4567 : m_FileInfo(file_info),
4568 m_IndRec(ind_rec)
4569 {}
4570
~CMovedRecDeleter(void)4571 CMovedRecDeleter::~CMovedRecDeleter(void)
4572 {}
4573
4574 void
ExecuteRCU(void)4575 CMovedRecDeleter::ExecuteRCU(void)
4576 {
4577 s_MoveRecToGarbage(m_FileInfo, m_IndRec);
4578 delete this;
4579 }
4580
4581
CRecNoSaver(void)4582 CRecNoSaver::CRecNoSaver(void)
4583 {
4584 #if __NC_TASKS_MONITOR
4585 m_TaskName = "CRecNoSaver";
4586 #endif
4587 }
4588
~CRecNoSaver(void)4589 CRecNoSaver::~CRecNoSaver(void)
4590 {}
4591
4592 void
ExecuteSlice(TSrvThreadNum)4593 CRecNoSaver::ExecuteSlice(TSrvThreadNum /* thr_num */)
4594 {
4595
4596 if (s_NeedSavePurgeData) {
4597 s_NeedSavePurgeData = false;
4598 s_IndexLock.Lock();
4599 try {
4600 string forget = CNCBlobAccessor::GetPurgeData(';');
4601 INFO("Updated Purge data: " << forget);
4602 s_IndexDB->UpdatePurgeData(forget);
4603 }
4604 catch (CSQLITE_Exception&) {
4605 }
4606 s_IndexLock.Unlock();
4607 }
4608
4609 // max record number used in sync logs
4610 int cur_time = CSrvTime::CurSecs();
4611 int next_save = s_LastRecNoSaveTime + s_MinRecNoSavePeriod;
4612 if (!s_NeedSaveLogRecNo)
4613 next_save = cur_time + s_MinRecNoSavePeriod;
4614 else if (CTaskServer::IsInShutdown())
4615 next_save = cur_time;
4616
4617 if (cur_time < next_save) {
4618 if (!CTaskServer::IsInShutdown())
4619 RunAfter(next_save - cur_time);
4620 return;
4621 }
4622
4623 s_NeedSaveLogRecNo = false;
4624 Uint8 log_rec_no = CNCSyncLog::GetLastRecNo();
4625 s_IndexLock.Lock();
4626 try {
4627 s_IndexDB->SetMaxSyncLogRecNo(log_rec_no);
4628 }
4629 catch (CSQLITE_Exception& ex) {
4630 SRV_LOG(Critical, "Cannot save sync log record number: " << ex);
4631 }
4632 s_IndexLock.Unlock();
4633
4634 s_LastRecNoSaveTime = cur_time;
4635 RunAfter(s_MinRecNoSavePeriod);
4636 }
4637
4638
CDiskFlusher(void)4639 CDiskFlusher::CDiskFlusher(void)
4640 {
4641 #if __NC_TASKS_MONITOR
4642 m_TaskName = "CDiskFlusher";
4643 #endif
4644 SetState(&CDiskFlusher::x_CheckFlushTime);
4645 }
4646
~CDiskFlusher(void)4647 CDiskFlusher::~CDiskFlusher(void)
4648 {}
4649
4650 CDiskFlusher::State
x_CheckFlushTime(void)4651 CDiskFlusher::x_CheckFlushTime(void)
4652 {
4653 if (!s_FlushTimePeriod || CTaskServer::IsInShutdown())
4654 return NULL;
4655
4656 int cur_time = CSrvTime::CurSecs();
4657 if (cur_time < s_LastFlushTime + s_FlushTimePeriod) {
4658 RunAfter(s_LastFlushTime + s_FlushTimePeriod - cur_time);
4659 return NULL;
4660 }
4661
4662 m_LastId = 0;
4663 return &CDiskFlusher::x_FlushNextFile;
4664 }
4665
4666 CDiskFlusher::State
x_FlushNextFile(void)4667 CDiskFlusher::x_FlushNextFile(void)
4668 {
4669 if (CTaskServer::IsInShutdown())
4670 return NULL;
4671
4672 s_DBFilesLock.Lock();
4673 TNCDBFilesMap::iterator file_it = s_DBFiles->upper_bound(m_LastId);
4674 if (file_it == s_DBFiles->end()) {
4675 s_DBFilesLock.Unlock();
4676 s_LastFlushTime = CSrvTime::CurSecs();
4677 return &CDiskFlusher::x_CheckFlushTime;
4678 }
4679 CSrvRef<SNCDBFileInfo> file_info = file_it->second;
4680 s_DBFilesLock.Unlock();
4681
4682 #ifdef NCBI_OS_LINUX
4683 if (msync(file_info->file_map, file_info->file_size, MS_SYNC)) {
4684 SRV_LOG(Critical, "Unable to sync file" << file_info->file_name
4685 << ", errno=" << errno);
4686 }
4687 #endif
4688
4689 m_LastId = file_info->file_id;
4690 SetRunnable();
4691 return NULL;
4692 }
4693
4694
CNewFileCreator(void)4695 CNewFileCreator::CNewFileCreator(void)
4696 {
4697 #if __NC_TASKS_MONITOR
4698 m_TaskName = "CNewFileCreator";
4699 #endif
4700 }
4701
~CNewFileCreator(void)4702 CNewFileCreator::~CNewFileCreator(void)
4703 {}
4704
4705 void
ExecuteSlice(TSrvThreadNum)4706 CNewFileCreator::ExecuteSlice(TSrvThreadNum /* thr_num */)
4707 {
4708 // create new (next) db file
4709 for (size_t i = 0; i < s_CntAllFiles; ++i) {
4710 bool need_new = false;
4711 s_NextWriteLock.Lock();
4712 need_new = s_AllWritings[i].next_file == NULL;
4713 s_NextWriteLock.Unlock();
4714 if (need_new) {
4715 s_CreateNewFile(i, true);
4716 SetRunnable();
4717 break;
4718 }
4719 }
4720 }
4721
4722
4723 void
ExecuteRCU(void)4724 SNCCacheData::ExecuteRCU(void)
4725 {
4726 delete this;
4727 }
4728
4729 END_NCBI_SCOPE
4730