1 /* $Id: nc_storage_blob.cpp 577955 2019-01-10 14:14:00Z gouriano $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Pavel Ivanov
27 */
28
#include "nc_pch.hpp"
#include <corelib/request_ctx.hpp>

#include "netcached.hpp"
#include "distribution_conf.hpp"
#include "nc_storage_blob.hpp"
#include "nc_storage.hpp"
#include "storage_types.hpp"
#include "nc_stat.hpp"
#include <cstddef>
#include <set>
39
40 BEGIN_NCBI_SCOPE
41
42 struct SWriteBackData
43 {
44 CMiniMutex lock;
45 size_t cur_size;
46 size_t releasable_size;
47 size_t releasing_size;
48 vector<SNCBlobVerData*> *to_add_list;
49 vector<SNCBlobVerData*> *to_del_list;
50
51 SWriteBackData(void);
52 };
53
54 extern Uint4 s_TaskPriorityWbMemRelease;
55 static size_t s_WBSoftSizeLimit = NCBI_CONST_UINT8(2000000000);
56 static size_t s_WBHardSizeLimit = NCBI_CONST_UINT8(3000000000);
57 static int s_WBWriteTimeout = 1000;
58 static int s_WBWriteTimeout2 = 1000;
59 static Uint2 s_WBFailedWriteDelay = 2;
60
61 static ssize_t s_WBCurSize = 0;
62 static ssize_t s_WBReleasableSize = 0;
63 static ssize_t s_WBReleasingSize = 0;
64 static SWriteBackData* s_WBData = NULL;
65 static CWriteBackControl* s_WBControl = NULL;
66 static TVerDataMap* s_VersMap = NULL;
67
68 static CMiniMutex s_ConsListLock;
69 static Uint4 s_CntConsumers = 0;
70 static TSrvConsList s_ConsList;
71 vector<SNCBlobVerData*> s_WBToAddList;
72 vector<SNCBlobVerData*> s_WBToDelList;
73 /*
74 static Uint8 s_CntMgrs = 0;
75 static Uint8 s_CntVers = 0;
76 */
77 typedef map<string, Uint8> TForgets;
78 static TForgets s_Forgets;
79 static TForgets s_ForgetKeys;
80 static Uint8 s_LatestPurge = 0;
81 static CBulkCleaner* s_BulkCleaner = NULL;
82
83 // for PutFailed/PutSucceeded
84 static bool s_FailMonitor = false;
85 static CMiniMutex s_FailedListLock;
86 static int s_FailedReserve=0;
87 static CAtomicCounter s_FailedCounter;
88 static set<string> s_FailedKeys;
89
90 static Uint8 s_AnotherServerMain = 0;
91
92 static Uint8 s_BlobSync = 0;
93 static Uint8 s_BlobSyncTDiff = 0;
94 static Uint8 s_BlobSyncMaxTDiff = 0;
95
96 static Uint8 s_BlobNotify = 0;
97 static Uint8 s_BlobNotifyTDiff = 0;
98 static Uint8 s_BlobNotifyMaxTDiff = 0;
99
100
101 static const size_t kVerManagerSize = sizeof(CNCBlobVerManager)
102 + sizeof(CCurVerReader);
103 static const size_t kDefChunkMapsSize
104 = sizeof(SNCChunkMaps)
105 + (kNCMaxBlobMapsDepth + 1)
106 * (sizeof(SNCChunkMapInfo)
107 + (kNCMaxChunksInMap - 1) * sizeof(SNCDataCoord));
108
109
110
111
112 static bool
s_IsCurVerOlder(const SNCBlobVerData * cur_ver,const SNCBlobVerData * new_ver)113 s_IsCurVerOlder(const SNCBlobVerData* cur_ver, const SNCBlobVerData* new_ver)
114 {
115 bool res = true;
116 if (cur_ver) {
117 if (cur_ver->create_time != new_ver->create_time) {
118 res = cur_ver->create_time < new_ver->create_time;
119 } else if (cur_ver->create_server != new_ver->create_server) {
120 res = cur_ver->create_server < new_ver->create_server;
121 } else if (cur_ver->create_id != new_ver->create_id) {
122 res = cur_ver->create_id < new_ver->create_id;
123 } else if (cur_ver->expire != new_ver->expire) {
124 res = cur_ver->expire < new_ver->expire;
125 } else if (cur_ver->ver_expire != new_ver->ver_expire) {
126 res = cur_ver->ver_expire < new_ver->ver_expire;
127 } else {
128 res = cur_ver->dead_time < new_ver->dead_time;
129 }
130 }
131 return res;
132 }
133
GetWBSoftSizeLimit(void)134 Uint8 GetWBSoftSizeLimit(void) {
135 return s_WBSoftSizeLimit;
136 }
GetWBHardSizeLimit(void)137 Uint8 GetWBHardSizeLimit(void) {
138 return s_WBHardSizeLimit;
139 }
GetWBWriteTimeout(void)140 int GetWBWriteTimeout(void) {
141 return s_WBWriteTimeout;
142 }
GetWBFailedWriteDelay(void)143 int GetWBFailedWriteDelay(void) {
144 return s_WBFailedWriteDelay;
145 }
146
147 void
SetWBSoftSizeLimit(Uint8 limit)148 SetWBSoftSizeLimit(Uint8 limit)
149 {
150 s_WBSoftSizeLimit = size_t(limit);
151 }
152
153 void
SetWBHardSizeLimit(Uint8 limit)154 SetWBHardSizeLimit(Uint8 limit)
155 {
156 s_WBHardSizeLimit = max(size_t(limit), s_WBSoftSizeLimit + 1000000000);
157 }
158
159 void
SetWBWriteTimeout(int timeout1,int timeout2)160 SetWBWriteTimeout(int timeout1, int timeout2)
161 {
162 s_WBWriteTimeout = timeout1;
163 s_WBWriteTimeout2 = timeout2;
164 }
165 void
SetWBInitialSyncComplete(void)166 SetWBInitialSyncComplete(void)
167 {
168 s_WBWriteTimeout = s_WBWriteTimeout2;
169 }
170
171 void
SetWBFailedWriteDelay(int delay)172 SetWBFailedWriteDelay(int delay)
173 {
174 s_WBFailedWriteDelay = Uint2(delay);
175 }
176
177 static inline SWriteBackData*
s_GetWBData(void)178 s_GetWBData(void)
179 {
180 return &s_WBData[CTaskServer::GetCurThreadNum()];
181 }
182
183 static inline size_t
s_CalcVerDataSize(SNCBlobVerData * ver_data)184 s_CalcVerDataSize(SNCBlobVerData* ver_data)
185 {
186 return sizeof(*ver_data) + ver_data->chunks.capacity() * sizeof(char*);
187 }
188
189 static size_t
s_CalcChunkMapsSize(Uint2 map_size)190 s_CalcChunkMapsSize(Uint2 map_size)
191 {
192 if (map_size == kNCMaxChunksInMap)
193 return kDefChunkMapsSize;
194 else {
195 return sizeof(SNCChunkMaps)
196 + (kNCMaxBlobMapsDepth + 1)
197 * (sizeof(SNCChunkMapInfo) + (map_size - 1) * sizeof(SNCDataCoord));
198 }
199 }
200
201 static void
s_AddCurrentMem(size_t mem_size)202 s_AddCurrentMem(size_t mem_size)
203 {
204 SWriteBackData* wb_data = s_GetWBData();
205 wb_data->lock.Lock();
206 wb_data->cur_size += mem_size;
207 wb_data->lock.Unlock();
208 }
209
210 static void
s_SubCurrentMem(size_t mem_size)211 s_SubCurrentMem(size_t mem_size)
212 {
213 SWriteBackData* wb_data = s_GetWBData();
214 wb_data->lock.Lock();
215 wb_data->cur_size -= mem_size;
216 wb_data->lock.Unlock();
217 }
218
219 static void
s_AddReleasableMem(SNCBlobVerData * ver_data,size_t add_releasable,size_t sub_releasing)220 s_AddReleasableMem(SNCBlobVerData* ver_data,
221 size_t add_releasable,
222 size_t sub_releasing)
223 {
224 SWriteBackData* wb_data = s_GetWBData();
225 wb_data->lock.Lock();
226 wb_data->releasable_size += add_releasable;
227 wb_data->releasing_size -= sub_releasing;
228 if (ver_data) {
229 wb_data->to_add_list->push_back(ver_data);
230 }
231 wb_data->lock.Unlock();
232 }
233
234 static void
s_SubReleasableMem(size_t mem_size)235 s_SubReleasableMem(size_t mem_size)
236 {
237 SWriteBackData* wb_data = s_GetWBData();
238 wb_data->lock.Lock();
239 wb_data->releasable_size -= mem_size;
240 wb_data->lock.Unlock();
241 }
242
243 static void
s_AddReleasingMem(size_t add_releasing,size_t sub_releasable)244 s_AddReleasingMem(size_t add_releasing, size_t sub_releasable)
245 {
246 SWriteBackData* wb_data = s_GetWBData();
247 wb_data->lock.Lock();
248 wb_data->releasing_size += add_releasing;
249 wb_data->releasable_size -= sub_releasable;
250 wb_data->lock.Unlock();
251 }
252
253 static void
s_SubReleasingMem(size_t mem_size)254 s_SubReleasingMem(size_t mem_size)
255 {
256 SWriteBackData* wb_data = s_GetWBData();
257 wb_data->lock.Lock();
258 wb_data->releasing_size -= mem_size;
259 wb_data->lock.Unlock();
260 }
261
262 static void
s_ScheduleVerDelete(SNCBlobVerData * ver_data)263 s_ScheduleVerDelete(SNCBlobVerData* ver_data)
264 {
265 SWriteBackData* wb_data = s_GetWBData();
266 wb_data->lock.Lock();
267 wb_data->to_del_list->push_back(ver_data);
268 ver_data->delete_scheduled = true;
269 wb_data->lock.Unlock();
270 }
271
272 static char*
s_AllocWriteBackMem(Uint4 mem_size,CSrvTransConsumer * consumer)273 s_AllocWriteBackMem(Uint4 mem_size, CSrvTransConsumer* consumer)
274 {
275 char* mem = NULL;
276 if (s_WBCurSize <= s_WBReleasingSize
277 || (size_t)(s_WBCurSize - s_WBReleasingSize) + mem_size < s_WBHardSizeLimit
278 || s_WBReleasableSize < (ssize_t)mem_size
279 || CTaskServer::IsInShutdown())
280 {
281 mem = (char*)malloc(mem_size);
282 if (mem) {
283 s_AddCurrentMem(mem_size);
284 }
285 }
286 else {
287 s_ConsListLock.Lock();
288 s_ConsList.push_front(*consumer);
289 consumer->m_TransFinished = false;
290 ++s_CntConsumers;
291 s_ConsListLock.Unlock();
292 }
293 return mem;
294 }
295
296 static void
s_NotifyConsumers(void)297 s_NotifyConsumers(void)
298 {
299 TSrvConsList cons_list;
300
301 s_ConsListLock.Lock();
302 s_ConsList.swap(cons_list);
303 s_ConsListLock.Unlock();
304
305 while (!cons_list.empty()) {
306 CSrvTransConsumer* consumer = &cons_list.front();
307 cons_list.pop_front();
308 consumer->m_TransFinished = true;
309 consumer->SetRunnable();
310 }
311 }
312
313 static char*
s_ReallocWriteBackMem(char * mem,Uint4 old_size,Uint4 new_size)314 s_ReallocWriteBackMem(char* mem, Uint4 old_size, Uint4 new_size)
315 {
316 Uint4 diff = old_size - new_size;
317 if (diff != 0) {
318 mem = (char*)realloc(mem, new_size);
319 s_SubCurrentMem(diff);
320 }
321 return mem;
322 }
323
324 static void
s_FreeWriteBackMem(char * mem,Uint4 mem_size,Uint4 sub_releasing)325 s_FreeWriteBackMem(char* mem, Uint4 mem_size, Uint4 sub_releasing)
326 {
327 free(mem);
328
329 SWriteBackData* wb_data = s_GetWBData();
330 wb_data->lock.Lock();
331 wb_data->cur_size -= mem_size;
332 wb_data->releasing_size -= sub_releasing;
333 wb_data->lock.Unlock();
334 }
335
336 static void
s_ReleaseMemory(size_t soft_limit)337 s_ReleaseMemory(size_t soft_limit)
338 {
339 if (s_WBCurSize - s_WBReleasingSize <= (ssize_t)soft_limit)
340 return;
341
342 size_t to_free = s_WBCurSize - s_WBReleasingSize - soft_limit;
343 size_t freed = 0;
344 while (freed < to_free && !s_VersMap->empty()) {
345 TVerDataMap::iterator it = s_VersMap->begin();
346 SNCBlobVerData* ver_data = &*it;
347 s_VersMap->erase(it);
348
349 if (ver_data->releasable_mem == 0)
350 continue;
351
352 int access_time = ACCESS_ONCE(ver_data->last_access_time);
353 if (access_time != ver_data->saved_access_time) {
354 ver_data->saved_access_time = access_time;
355 s_VersMap->insert_equal(*ver_data);
356 continue;
357 }
358 freed += ver_data->RequestMemRelease();
359 }
360 s_WBReleasingSize += freed;
361 s_WBReleasableSize -= freed;
362 }
363
364 static void
s_TransferVerList(vector<SNCBlobVerData * > & from_list,vector<SNCBlobVerData * > & to_list)365 s_TransferVerList(vector<SNCBlobVerData*>& from_list,
366 vector<SNCBlobVerData*>& to_list)
367 {
368 if (from_list.size() == 0)
369 return;
370
371 size_t prev_size = to_list.size();
372 to_list.resize(prev_size + from_list.size(), NULL);
373 memcpy(&to_list[prev_size], &from_list[0],
374 from_list.size() * sizeof(SNCBlobVerData*));
375 from_list.resize(0);
376 }
377
378 static void
s_CollectWBData(SWriteBackData * wb_data)379 s_CollectWBData(SWriteBackData* wb_data)
380 {
381 vector<SNCBlobVerData*> *next_add = new vector<SNCBlobVerData*>;
382 vector<SNCBlobVerData*> *next_del = new vector<SNCBlobVerData*>;
383 vector<SNCBlobVerData*> *prev_add = nullptr, *prev_del = nullptr;
384
385 wb_data->lock.Lock();
386 s_WBCurSize += wb_data->cur_size;
387 wb_data->cur_size = 0;
388 s_WBReleasableSize += wb_data->releasable_size;
389 wb_data->releasable_size = 0;
390 s_WBReleasingSize += wb_data->releasing_size;
391 wb_data->releasing_size = 0;
392
393 if (next_add) {
394 prev_add = wb_data->to_add_list;
395 wb_data->to_add_list = next_add;
396 } else {
397 s_TransferVerList(*(wb_data->to_add_list), s_WBToAddList);
398 }
399 if (next_del) {
400 prev_del = wb_data->to_del_list;
401 wb_data->to_del_list = next_del;
402 } else {
403 s_TransferVerList(*(wb_data->to_del_list), s_WBToDelList);
404 }
405
406 wb_data->lock.Unlock();
407
408 if (prev_add) {
409 s_TransferVerList(*prev_add, s_WBToAddList);
410 delete prev_add;
411 }
412 if (prev_del) {
413 s_TransferVerList(*prev_del, s_WBToDelList);
414 delete prev_del;
415 }
416 }
417
418 static void
s_ProcessWBAddDel(Uint4 was_del_size)419 s_ProcessWBAddDel(Uint4 was_del_size)
420 {
421 // add new verdata into map (s_VersMap) sorted by
422 // last seen access time (ver_data->saved_access_time)
423 for (Uint4 i = 0; i < s_WBToAddList.size(); ++i) {
424 SNCBlobVerData* ver_data = s_WBToAddList[i];
425 if (!ver_data->is_linked() && ver_data->releasable_mem != 0
426 && !ver_data->delete_scheduled)
427 {
428 ver_data->saved_access_time = ver_data->last_access_time;
429 s_VersMap->insert_equal(*ver_data);
430 }
431 }
432 s_WBToAddList.clear();
433 // remove from map and delete those about to be deleted
434 // (SNCBlobVerData request deletion)
435 for (Uint4 i = 0; i < was_del_size; ++i) {
436 SNCBlobVerData* ver_data = s_WBToDelList[i];
437 if (ver_data->is_linked()) {
438 s_VersMap->erase(s_VersMap->iterator_to(*ver_data));
439 }
440 s_WBReleasingSize -= ver_data->meta_mem;
441 s_WBCurSize -= ver_data->meta_mem;
442 ver_data->Terminate();
443 }
444 Uint4 new_size = Uint4(s_WBToDelList.size() - was_del_size);
445 if (was_del_size != 0 && new_size != 0) {
446 memmove(&s_WBToDelList[0], &s_WBToDelList[was_del_size],
447 new_size * sizeof(SNCBlobVerData*));
448 }
449 s_WBToDelList.resize(new_size, NULL);
450 }
451
452
453
SWriteBackData(void)454 SWriteBackData::SWriteBackData(void)
455 : cur_size(0),
456 releasable_size(0),
457 releasing_size(0)
458 {
459 to_add_list = new vector<SNCBlobVerData*>;
460 to_del_list = new vector<SNCBlobVerData*>;
461 }
462
463
CWriteBackControl(void)464 CWriteBackControl::CWriteBackControl(void)
465 {
466 #if __NC_TASKS_MONITOR
467 m_TaskName = "CWriteBackControl";
468 #endif
469 }
470
~CWriteBackControl(void)471 CWriteBackControl::~CWriteBackControl(void)
472 {}
473
474 void
Initialize(void)475 CWriteBackControl::Initialize(void)
476 {
477 s_VersMap = new TVerDataMap();
478 s_WBData = new SWriteBackData[CTaskServer::GetMaxRunningThreads()];
479 s_WBControl = new CWriteBackControl();
480 s_WBControl->SetRunnable();
481
482 s_BulkCleaner = new CBulkCleaner;
483 s_BulkCleaner->SetRunnable();
484 }
485
486 void
ExecuteSlice(TSrvThreadNum)487 CWriteBackControl::ExecuteSlice(TSrvThreadNum /* thr_num */)
488 {
489 // monitor write-back cache statistics (where blobs are stored before being saved on disk)
490
491 if (CTaskServer::IsInShutdown())
492 return;
493
494 ssize_t was_cur_size = s_WBCurSize;
495 Uint4 was_del_size = Uint4(s_WBToDelList.size());
496
497 Uint4 cnt_datas = CTaskServer::GetMaxRunningThreads();
498 for (Uint4 i = 0; i < cnt_datas; ++i) {
499 // collect stat from all threads
500 s_CollectWBData(&s_WBData[i]);
501 }
502
503 // delete SNCBlobVerData
504 s_ProcessWBAddDel(was_del_size);
505
506 size_t cur_size_change = 0;
507 if (s_WBCurSize > was_cur_size)
508 cur_size_change = s_WBCurSize - was_cur_size;
509 size_t soft_limit = s_WBSoftSizeLimit;
510 if (cur_size_change < soft_limit)
511 soft_limit -= cur_size_change;
512 Uint4 cnt_cons = ACCESS_ONCE(s_CntConsumers);
513 if (soft_limit > cnt_cons * kNCMaxBlobChunkSize)
514 soft_limit -= cnt_cons * kNCMaxBlobChunkSize;
515
516 s_ReleaseMemory(soft_limit);
517 s_NotifyConsumers();
518
519 RunAfter(1);
520 }
521
522 void
ReadState(SNCStateStat & state)523 CWriteBackControl::ReadState(SNCStateStat& state)
524 {
525 state.wb_size = (ssize_t(s_WBCurSize) > 0? s_WBCurSize: 0);
526 state.wb_releasable = (ssize_t(s_WBReleasableSize) > 0? s_WBReleasableSize: 0);
527 state.wb_releasing = (ssize_t(s_WBReleasingSize) > 0? s_WBReleasingSize: 0);
528
529 state.cnt_another_server_main = s_AnotherServerMain;
530 Uint8 prev = s_BlobSync;
531 if (prev != 0) {
532 state.avg_tdiff_blobcopy = s_BlobSyncTDiff / prev;
533 state.max_tdiff_blobcopy = s_BlobSyncMaxTDiff;
534 } else {
535 state.avg_tdiff_blobcopy = 0;
536 state.max_tdiff_blobcopy = 0;
537 }
538 prev = s_BlobNotify;
539 if (prev != 0) {
540 state.avg_tdiff_blobnotify = s_BlobNotifyTDiff / prev;
541 state.max_tdiff_blobnotify = s_BlobNotifyMaxTDiff;
542 } else {
543 state.avg_tdiff_blobnotify = 0;
544 state.max_tdiff_blobnotify = 0;
545 }
546 }
547
AnotherServerMain(void)548 void CWriteBackControl::AnotherServerMain(void)
549 {
550 AtomicAdd( s_AnotherServerMain, 1);
551 }
552
StartSyncBlob(Uint8 create_time)553 void CWriteBackControl::StartSyncBlob(Uint8 create_time)
554 {
555 if (!CNCServer::IsInitiallySynced()) {
556 return;
557 }
558 Uint8 tdiff = CSrvTime::Current().AsUSec() - create_time;
559 Uint8 prev = s_BlobSync;
560 Uint8 prevdiff = s_BlobSyncTDiff;
561 AtomicAdd(s_BlobSync, 1);
562 AtomicAdd(s_BlobSyncTDiff, tdiff);
563 if (prev > s_BlobSync || prevdiff > s_BlobSyncTDiff) {
564 s_BlobSync = 0;
565 s_BlobSyncTDiff = 0;
566 s_BlobSyncMaxTDiff = 0;
567 }
568 if (s_BlobSyncMaxTDiff < tdiff) {
569 s_BlobSyncMaxTDiff = tdiff;
570 }
571 }
572
573 void
RecordNotifyUpdateBlob(Uint8 update_received)574 CWriteBackControl::RecordNotifyUpdateBlob(Uint8 update_received)
575 {
576 if (!CNCServer::IsInitiallySynced()) {
577 return;
578 }
579 Uint8 tdiff = CSrvTime::Current().AsUSec() - update_received;
580 Uint8 prev = s_BlobNotify;
581 Uint8 prevdiff = s_BlobNotifyTDiff;
582 AtomicAdd(s_BlobNotify, 1);
583 AtomicAdd(s_BlobNotifyTDiff, tdiff);
584 if (prev > s_BlobNotify || prevdiff > s_BlobNotifyTDiff) {
585 s_BlobNotify = 0;
586 s_BlobNotifyTDiff = 0;
587 s_BlobNotifyMaxTDiff = 0;
588 }
589 if (s_BlobNotifyMaxTDiff < tdiff) {
590 s_BlobNotifyMaxTDiff = tdiff;
591 }
592 }
593
594 void
ResetStatCounters(void)595 CWriteBackControl::ResetStatCounters(void)
596 {
597 s_BlobSync = 0;
598 s_BlobSyncTDiff = 0;
599 s_BlobSyncMaxTDiff = 0;
600 }
601
602
603 void
x_DeleteCurVersion(void)604 CNCBlobVerManager::x_DeleteCurVersion(void)
605 {
606 m_CacheData->coord.clear();
607 m_CacheData->dead_time = 0;
608 CNCBlobStorage::ChangeCacheDeadTime(m_CacheData);
609 m_CacheData->expire = 0;
610 if (m_CurVersion) {
611 m_CurVersion->SetNotCurrent();
612 m_CurVersion.Reset();
613 }
614 }
615
616 void
DeleteVersion(const SNCBlobVerData * ver_data)617 CNCBlobVerManager::DeleteVersion(const SNCBlobVerData* ver_data)
618 {
619 m_CacheData->lock.Lock();
620 _ASSERT(m_CacheData->Get_ver_mgr() == this);
621 if (m_CurVersion == ver_data)
622 x_DeleteCurVersion();
623 m_CacheData->lock.Unlock();
624 }
625
626 void
DeleteDeadVersion(int)627 CNCBlobVerManager::DeleteDeadVersion(int /*cut_time*/)
628 {
629 m_CacheData->lock.Lock();
630 _ASSERT(m_CacheData->Get_ver_mgr() == this);
631 #if 0
632 CSrvRef<SNCBlobVerData> cur_ver(m_CurVersion);
633 if (m_CurVersion) {
634 if (m_CurVersion->dead_time <= cut_time)
635 x_DeleteCurVersion();
636 }
637 else if (!m_CacheData->coord.empty() && m_CacheData->dead_time <= cut_time) {
638 x_DeleteCurVersion();
639 }
640 #else
641 x_DeleteCurVersion();
642 #endif
643 m_CacheData->lock.Unlock();
644 }
645
CNCBlobVerManager(Uint2 time_bucket,const string & key,SNCCacheData * cache_data)646 CNCBlobVerManager::CNCBlobVerManager(Uint2 time_bucket,
647 const string& key,
648 SNCCacheData* cache_data)
649 : m_TimeBucket(time_bucket),
650 m_NeedReleaseMem(false),
651 m_NeedAbort(false),
652 m_CacheData(cache_data),
653 m_CurVerReader(new CCurVerReader(this)),
654 m_Key(key)
655 {
656 CNCBlobStorage::ReferenceCacheData(m_CacheData);
657 // m_CacheData->ver_mgr = this;
658 if (!AtomicCAS(cache_data->ver_mgr, nullptr, this)) {
659 #ifdef _DEBUG
660 CNCAlerts::Register(CNCAlerts::eDebugCacheFailedMgrAttach,"CNCBlobVerManager ctor");
661 #endif
662 }
663 //AtomicAdd(s_CntMgrs, 1);
664 //Uint8 cnt = AtomicAdd(s_CntMgrs, 1);
665 //INFO("CNCBlobVerManager, cnt=" << cnt);
666 #if __NC_TASKS_MONITOR
667 m_TaskName = "CNCBlobVerManager";
668 #endif
669 SetPriority(s_TaskPriorityWbMemRelease);
670 }
671
~CNCBlobVerManager(void)672 CNCBlobVerManager::~CNCBlobVerManager(void)
673 {
674 //AtomicSub(s_CntMgrs, 1);
675 //Uint8 cnt = AtomicSub(s_CntMgrs, 1);
676 //INFO("~CNCBlobVerManager, cnt=" << cnt);
677 }
678
679 void
ObtainReference(void)680 CNCBlobVerManager::ObtainReference(void)
681 {
682 AddReference();
683 if (ReferencedOnlyOnce()) {
684 m_NeedReleaseMem = false;
685 if (m_CurVersion)
686 m_CurVersion->SetNonReleasable();
687 }
688 }
689
690 CNCBlobVerManager*
Get(Uint2 time_bucket,const string & key,SNCCacheData * cache_data,bool for_new_version)691 CNCBlobVerManager::Get(Uint2 time_bucket,
692 const string& key,
693 SNCCacheData* cache_data,
694 bool for_new_version)
695 {
696 cache_data->lock.Lock();
697 CNCBlobVerManager* mgr = cache_data->Get_ver_mgr();
698 if (mgr) {
699 mgr->ObtainReference();
700 }
701 else if (for_new_version || !cache_data->coord.empty()) {
702 mgr = new CNCBlobVerManager(time_bucket, key, cache_data);
703 mgr->AddReference();
704 s_AddCurrentMem(kVerManagerSize);
705 }
706 cache_data->lock.Unlock();
707
708 return mgr;
709 }
710
711 void
Release(void)712 CNCBlobVerManager::Release(void)
713 {
714 SNCCacheData* cache_data = m_CacheData;
715 cache_data->lock.Lock();
716 // DeleteThis below should be executed under the lock
717 RemoveReference();
718 cache_data->lock.Unlock();
719 }
720
721 void
DeleteThis(void)722 CNCBlobVerManager::DeleteThis(void)
723 {
724 if (m_CurVersion)
725 m_CurVersion->SetReleasable();
726 SetRunnable();
727 }
728
729 void
x_ReleaseMgr(void)730 CNCBlobVerManager::x_ReleaseMgr(void)
731 {
732 SNCCacheData* cache_data = m_CacheData;
733 m_CacheData = nullptr;
734 if (cache_data) {
735 // cache_data->ver_mgr = NULL;
736 if (!AtomicCAS(cache_data->ver_mgr, this, nullptr)) {
737 #ifdef _DEBUG
738 CNCAlerts::Register(CNCAlerts::eDebugCacheFailedMgrDetach,"x_ReleaseMgr");
739 if (cache_data->ver_mgr != nullptr) {
740 CNCAlerts::Register(CNCAlerts::eDebugCacheWrongMgr,"x_ReleaseMgr");
741 }
742 #endif
743 }
744 if (m_CurVersion) {
745 if (!m_NeedAbort || m_Key == cache_data->key) {
746 if (!cache_data->coord.empty() && cache_data->coord != m_CurVersion->coord) {
747 #ifdef _DEBUG
748 CNCAlerts::Register(CNCAlerts::eDebugCacheDeleted1,"x_ReleaseMgr");
749 #endif
750 CExpiredCleaner::x_DeleteData(cache_data);
751 }
752 cache_data->coord = m_CurVersion->coord;
753 }
754 m_CurVersion.Reset();
755 }
756 else {
757 s_SubCurrentMem(kVerManagerSize);
758 }
759
760 if (cache_data->dead_time == 0 && !cache_data->coord.empty()) {
761 #ifdef _DEBUG
762 CNCAlerts::Register(CNCAlerts::eDebugCacheDeleted2,"x_ReleaseMgr");
763 #endif
764 CExpiredCleaner::x_DeleteData(cache_data);
765 }
766 cache_data->lock.Unlock();
767 if (!m_NeedAbort) {
768 CNCBlobStorage::ReleaseCacheData(cache_data);
769 }
770 } else {
771 s_SubCurrentMem(kVerManagerSize);
772 }
773 if (!Referenced()) {
774 m_CurVerReader->Terminate();
775 m_CurVerReader = nullptr;
776 Terminate();
777 }
778 }
779
780 void
ExecuteSlice(TSrvThreadNum)781 CNCBlobVerManager::ExecuteSlice(TSrvThreadNum /* thr_num */)
782 {
783 // CNCBlobAccessor has a reference to this one.
784 // for each blob key there is only one CNCBlobVerManager.
785
786 CSrvRef<SNCBlobVerData> cur_ver;
787
788 m_CacheData->lock.Lock();
789
790 if (m_CacheData->Get_ver_mgr() != this) {
791 m_NeedAbort = true;
792 #ifdef _DEBUG
793 CNCAlerts::Register(CNCAlerts::eDebugCacheWrong,"CNCBlobVerManager::ExecuteSlice");
794 #endif
795 x_ReleaseMgr();
796 return;
797 }
798
799 cur_ver = m_CurVersion;
800 if (!cur_ver && !Referenced()) {
801 x_ReleaseMgr();
802 return;
803 }
804 if (!cur_ver) {
805 m_CacheData->lock.Unlock();
806 return;
807 }
808
809 // initially, blob is in memory
810 // check if it is time to be saved onto disk
811 int cur_time = CSrvTime::CurSecs();
812 int write_time = ACCESS_ONCE(cur_ver->need_write_time);
813
814 #if 1
815 if (cur_time + 60 >= cur_ver->dead_time) {
816 write_time = 0;
817 }
818 #endif
819
820 if (write_time != 0) {
821 m_CacheData->lock.Unlock();
822 if (write_time <= cur_time || CTaskServer::IsInShutdown()) {
823 if (!CNCBlobStorage::IsAbandoned()) {
824 CNCBlobStorage::ReferenceCacheData(m_CacheData);
825 cur_ver->RequestDataWrite();
826 }
827 } else {
828 RunAfter(write_time - cur_time);
829 }
830 return;
831 }
832
833 // if requested, remove metadata from memory
834 if (m_NeedReleaseMem && !Referenced()) {
835 x_ReleaseMgr();
836 return;
837 }
838 m_NeedReleaseMem = false;
839
840 m_CacheData->lock.Unlock();
841 }
842
RevokeDataWrite()843 void CNCBlobVerManager::RevokeDataWrite()
844 {
845 CNCBlobStorage::ReleaseCacheData(m_CacheData);
846 }
847
DataWritten(void)848 void CNCBlobVerManager::DataWritten(void)
849 {
850 if (m_CurVersion) {
851 m_CacheData->lock.Lock();
852 if (!m_CacheData->coord.empty() && m_CacheData->coord != m_CurVersion->coord) {
853 #ifdef _DEBUG
854 CNCAlerts::Register(CNCAlerts::eDebugCacheDeleted3,"DataWritten");
855 #endif
856 CExpiredCleaner::x_DeleteData(m_CacheData);
857 }
858 m_CacheData->coord = m_CurVersion->coord;
859 if (m_CacheData->dead_time == 0 && !m_CacheData->coord.empty()) {
860 #ifdef _DEBUG
861 CNCAlerts::Register(CNCAlerts::eDebugCacheDeleted4,"DataWritten");
862 #endif
863 CExpiredCleaner::x_DeleteData(m_CacheData);
864 }
865 m_CacheData->lock.Unlock();
866 CNCBlobStorage::ReleaseCacheData(m_CacheData);
867 }
868 }
869
870 void
RequestMemRelease(void)871 CNCBlobVerManager::RequestMemRelease(void)
872 {
873 m_CacheData->lock.Lock();
874 if (!Referenced()) {
875 m_NeedReleaseMem = true;
876 SetRunnable();
877 }
878 m_CacheData->lock.Unlock();
879 }
880
881 CSrvRef<SNCBlobVerData>
GetCurVersion(void)882 CNCBlobVerManager::GetCurVersion(void)
883 {
884 CSrvRef<SNCBlobVerData> to_del_ver;
885 CSrvRef<SNCBlobVerData> cur_ver;
886
887 m_CacheData->lock.Lock();
888 _ASSERT(m_CacheData->Get_ver_mgr() == this);
889 if (m_CurVersion
890 && m_CurVersion->dead_time <= CSrvTime::CurSecs())
891 {
892 to_del_ver = m_CurVersion;
893 x_DeleteCurVersion();
894 }
895 cur_ver = m_CurVersion;
896 m_CacheData->lock.Unlock();
897
898 return cur_ver;
899 }
900
901 CSrvRef<SNCBlobVerData>
CreateNewVersion(void)902 CNCBlobVerManager::CreateNewVersion(void)
903 {
904 SNCBlobVerData* data = new SNCBlobVerData(this);
905 // data->manager = this;
906 data->coord.clear();
907 data->data_coord.clear();
908 data->create_time = 0;
909 data->expire = 0;
910 data->dead_time = 0;
911 data->size = 0;
912 data->blob_ver = 0;
913 data->chunk_size = kNCMaxBlobChunkSize;
914 data->map_size = kNCMaxChunksInMap;
915 data->meta_mem = s_CalcVerDataSize(data);
916 s_AddCurrentMem(data->meta_mem);
917 return SrvRef(data);
918 }
919
920 void
FinalizeWriting(SNCBlobVerData * ver_data)921 CNCBlobVerManager::FinalizeWriting(SNCBlobVerData* ver_data)
922 {
923 CSrvRef<SNCBlobVerData> old_ver(ver_data);
924
925 m_CacheData->lock.Lock();
926 _ASSERT(m_CacheData->Get_ver_mgr() == this);
927 if (ver_data->dead_time > CSrvTime::CurSecs()
928 && s_IsCurVerOlder(m_CurVersion, ver_data))
929 {
930 old_ver.Swap(m_CurVersion);
931 m_CacheData->coord = m_CurVersion->coord;
932 m_CacheData->dead_time = m_CurVersion->dead_time;
933 if (m_CacheData->saved_dead_time != m_CacheData->dead_time) {
934 CNCBlobStorage::ChangeCacheDeadTime(m_CacheData);
935 }
936 m_CacheData->create_id = m_CurVersion->create_id;
937 m_CacheData->create_server = m_CurVersion->create_server;
938 m_CacheData->create_time = m_CurVersion->create_time;
939 m_CacheData->expire = m_CurVersion->expire;
940 m_CacheData->ver_expire = m_CurVersion->ver_expire;
941 m_CacheData->size = m_CurVersion->size;
942 m_CacheData->chunk_size = m_CurVersion->chunk_size;
943 m_CacheData->map_size = m_CurVersion->map_size;
944
945 m_CurVersion->meta_has_changed = true;
946 m_CurVersion->last_access_time = CSrvTime::CurSecs();
947 m_CurVersion->need_write_time = m_CurVersion->last_access_time
948 + s_WBWriteTimeout;
949 if (old_ver)
950 old_ver->SetNotCurrent();
951 m_CurVersion->SetCurrent();
952
953 SetRunnable();
954 }
955 m_CacheData->lock.Unlock();
956
957 old_ver.Reset();
958 }
959
960 void
DeadTimeChanged(SNCBlobVerData * ver_data)961 CNCBlobVerManager::DeadTimeChanged(SNCBlobVerData* ver_data)
962 {
963 m_CacheData->lock.Lock();
964 if (m_CurVersion == ver_data) {
965 m_CacheData->dead_time = ver_data->dead_time;
966 m_CacheData->expire = ver_data->expire;
967 m_CacheData->ver_expire = ver_data->ver_expire;
968 m_CurVersion->last_access_time = CSrvTime::CurSecs();
969 m_CurVersion->need_write_time = m_CurVersion->last_access_time
970 + s_WBWriteTimeout;
971 m_CurVersion->meta_has_changed = true;
972
973 SetRunnable();
974 }
975 m_CacheData->lock.Unlock();
976 }
977
978
CCurVerReader(CNCBlobVerManager * mgr)979 CCurVerReader::CCurVerReader(CNCBlobVerManager* mgr)
980 : m_VerMgr(mgr)
981 {
982 #if __NC_TASKS_MONITOR
983 m_TaskName = "CCurVerReader";
984 #endif
985 }
986
~CCurVerReader(void)987 CCurVerReader::~CCurVerReader(void)
988 {}
989
990 void
ExecuteSlice(TSrvThreadNum thr_num)991 CCurVerReader::ExecuteSlice(TSrvThreadNum thr_num)
992 {
993 if (IsTransStateFinal())
994 return;
995
996 // read current blob metadata from db
997
998 SNCCacheData* cache_data = m_VerMgr->m_CacheData;
999 if (cache_data->coord.empty()) {
1000 FinishTransition();
1001 return;
1002 }
1003
1004 SNCBlobVerData* ver_data = new SNCBlobVerData(m_VerMgr);
1005 // ver_data->manager = m_VerMgr;
1006 ver_data->coord = cache_data->coord;
1007 ver_data->create_time = cache_data->create_time;
1008 ver_data->size = cache_data->size;
1009 ver_data->chunk_size = cache_data->chunk_size;
1010 ver_data->map_size = cache_data->map_size;
1011 ver_data->is_cur_version = true;
1012 ver_data->last_access_time = CSrvTime::CurSecs();
1013 if (ver_data->size != 0) {
1014 ver_data->cnt_chunks = (ver_data->size - 1) / ver_data->chunk_size + 1;
1015 ver_data->chunks.resize(ver_data->cnt_chunks, NULL);
1016 }
1017 ver_data->cur_chunk_num = ver_data->cnt_chunks;
1018 ver_data->meta_mem = s_CalcVerDataSize(ver_data);
1019 s_AddCurrentMem(ver_data->meta_mem);
1020 ver_data->meta_mem += kVerManagerSize;
1021 m_VerMgr->m_CurVersion = ver_data;
1022 if (!CNCBlobStorage::ReadBlobInfo(ver_data)) {
1023 SRV_LOG(Error, "Problem reading meta-information about blob "
1024 << CNCBlobKeyLight(m_VerMgr->m_Key).KeyForLogs());
1025 CSrvRef<SNCBlobVerData> cur_ver(ver_data);
1026 m_VerMgr->DeleteVersion(ver_data);
1027 }
1028
1029 FinishTransition();
1030 }
1031
1032
SNCChunkMaps(Uint2 map_size)1033 SNCChunkMaps::SNCChunkMaps(Uint2 map_size)
1034 {
1035 size_t mem_size = (char*)maps[0]->coords - (char*)maps[0]
1036 + map_size * sizeof(maps[0]->coords[0]);
1037 for (Uint1 i = 0; i <= kNCMaxBlobMapsDepth; ++i) {
1038 maps[i] = (SNCChunkMapInfo*)calloc(mem_size, 1);
1039 }
1040 }
1041
~SNCChunkMaps(void)1042 SNCChunkMaps::~SNCChunkMaps(void)
1043 {
1044 for (Uint1 i = 0; i <= kNCMaxBlobMapsDepth; ++i) {
1045 free(maps[i]);
1046 }
1047 }
1048
1049
CWBMemDeleter(char * mem,Uint4 mem_size)1050 CWBMemDeleter::CWBMemDeleter(char* mem, Uint4 mem_size)
1051 : m_Mem(mem),
1052 m_MemSize(mem_size)
1053 {}
1054
~CWBMemDeleter(void)1055 CWBMemDeleter::~CWBMemDeleter(void)
1056 {}
1057
1058 void
ExecuteRCU(void)1059 CWBMemDeleter::ExecuteRCU(void)
1060 {
1061 s_FreeWriteBackMem(m_Mem, m_MemSize, m_MemSize);
1062 delete this;
1063 }
1064
1065
SNCBlobVerData(CNCBlobVerManager * mgr)1066 SNCBlobVerData::SNCBlobVerData(CNCBlobVerManager* mgr)
1067 : size(0),
1068 create_time(0),
1069 create_server(0),
1070 create_id(0),
1071 updated_on_server(0),
1072 updated_at_time(0),
1073 update_received(0),
1074 ttl(0),
1075 expire(0),
1076 dead_time(0),
1077 blob_ver(0),
1078 ver_ttl(0),
1079 ver_expire(0),
1080 chunk_size(0),
1081 map_size(0),
1082 map_depth(0),
1083 has_error(false),
1084 is_cur_version(false),
1085 meta_has_changed(false),
1086 move_or_rewrite(false),
1087 is_releasable(false),
1088 request_data_write(false),
1089 need_write_all(false),
1090 need_stop_write(false),
1091 need_mem_release(false),
1092 delete_scheduled(false),
1093 map_move_counter(0),
1094 last_access_time(0),
1095 need_write_time(0),
1096 cnt_chunks(0),
1097 cur_chunk_num(0),
1098 chunk_maps(NULL),
1099 ver_manager(mgr),
1100 saved_access_time(0),
1101 meta_mem(0),
1102 data_mem(0),
1103 releasable_mem(0),
1104 releasing_mem(0)
1105 {
1106 //AtomicAdd(s_CntVers, 1);
1107 //Uint8 cnt = AtomicAdd(s_CntVers, 1);
1108 //INFO("SNCBlobVerData, cnt=" << cnt);
1109 #if __NC_TASKS_MONITOR
1110 m_TaskName = "SNCBlobVerData";
1111 #endif
1112 SetPriority(s_TaskPriorityWbMemRelease);
1113 }
1114
~SNCBlobVerData(void)1115 SNCBlobVerData::~SNCBlobVerData(void)
1116 {
1117 if (chunk_maps) {
1118 SRV_FATAL("chunk_maps not released");
1119 }
1120
1121 //AtomicSub(s_CntVers, 1);
1122 //Uint8 cnt = AtomicSub(s_CntVers, 1);
1123 //INFO("~SNCBlobVerData, cnt=" << cnt);
1124 }
1125
1126 void
DeleteThis(void)1127 SNCBlobVerData::DeleteThis(void)
1128 {
1129 wb_mem_lock.Lock();
1130 if (!is_cur_version) {
1131 s_AddReleasingMem(releasable_mem + meta_mem, releasable_mem);
1132 releasing_mem += releasable_mem + meta_mem;
1133 releasable_mem = 0;
1134 need_mem_release = true;
1135 }
1136 else if (releasable_mem != 0) {
1137 s_AddReleasingMem(releasable_mem, releasable_mem);
1138 releasing_mem += releasable_mem;
1139 releasable_mem = 0;
1140 }
1141 ver_manager = NULL;
1142 wb_mem_lock.Unlock();
1143
1144 SetRunnable();
1145 }
1146
1147 void
RequestDataWrite(void)1148 SNCBlobVerData::RequestDataWrite(void)
1149 {
1150 wb_mem_lock.Lock();
1151 request_data_write = true;
1152 need_write_all = true;
1153 need_write_time = 0;
1154 size_t mem_size = releasable_mem;
1155 if (is_releasable)
1156 mem_size -= meta_mem;
1157 s_AddReleasingMem(mem_size, mem_size);
1158 releasing_mem += mem_size;
1159 releasable_mem -= mem_size;
1160 wb_mem_lock.Unlock();
1161
1162 SetRunnable();
1163 }
1164
1165 size_t
RequestMemRelease(void)1166 SNCBlobVerData::RequestMemRelease(void)
1167 {
1168 wb_mem_lock.Lock();
1169 size_t mem_size = releasable_mem;
1170 releasing_mem += mem_size;
1171 releasable_mem = 0;
1172 need_write_time = 0;
1173 need_mem_release = true;
1174 wb_mem_lock.Unlock();
1175
1176 SetRunnable();
1177 return mem_size;
1178 }
1179
1180 void
SetNotCurrent(void)1181 SNCBlobVerData::SetNotCurrent(void)
1182 {
1183 wb_mem_lock.Lock();
1184 if (request_data_write) {
1185 #ifdef _DEBUG
1186 CNCAlerts::Register(CNCAlerts::eDebugDeleteSNCBlobVerData,"SetNotCurrent");
1187 #endif
1188 request_data_write = false;
1189 ver_manager->RevokeDataWrite();
1190 }
1191 is_cur_version = false;
1192 need_stop_write = true;
1193 meta_mem -= kVerManagerSize;
1194 wb_mem_lock.Unlock();
1195
1196 SetRunnable();
1197 }
1198
1199 void
SetCurrent(void)1200 SNCBlobVerData::SetCurrent(void)
1201 {
1202 wb_mem_lock.Lock();
1203 is_cur_version = true;
1204 meta_mem += kVerManagerSize;
1205 wb_mem_lock.Unlock();
1206 }
1207
1208 void
SetReleasable(void)1209 SNCBlobVerData::SetReleasable(void)
1210 {
1211 last_access_time = CSrvTime::CurSecs();
1212
1213 wb_mem_lock.Lock();
1214 is_releasable = true;
1215 releasable_mem += meta_mem;
1216 s_AddReleasableMem(this, meta_mem, 0);
1217 wb_mem_lock.Unlock();
1218 }
1219
1220 void
SetNonReleasable(void)1221 SNCBlobVerData::SetNonReleasable(void)
1222 {
1223 last_access_time = CSrvTime::CurSecs();
1224
1225 wb_mem_lock.Lock();
1226 is_releasable = false;
1227 if (releasable_mem != 0) {
1228 if (releasable_mem < meta_mem) {
1229 SRV_FATAL("blob ver data broken");
1230 }
1231 releasable_mem -= meta_mem;
1232 s_SubReleasableMem(meta_mem);
1233 }
1234 else {
1235 if (releasing_mem < meta_mem) {
1236 SRV_FATAL("blob ver data broken");
1237 }
1238 releasing_mem -= meta_mem;
1239 need_mem_release = false;
1240 s_SubReleasingMem(meta_mem);
1241 }
1242 wb_mem_lock.Unlock();
1243 }
1244
1245 void
x_FreeChunkMaps(void)1246 SNCBlobVerData::x_FreeChunkMaps(void)
1247 {
1248 if (chunk_maps) {
1249 s_SubCurrentMem(s_CalcChunkMapsSize(map_size));
1250 delete chunk_maps;
1251 chunk_maps = NULL;
1252 }
1253 }
1254
1255 bool
x_WriteBlobInfo(void)1256 SNCBlobVerData::x_WriteBlobInfo(void)
1257 {
1258 if (!meta_has_changed)
1259 return true;
1260
1261 CNCBlobVerManager* mgr = ACCESS_ONCE(ver_manager);
1262 if (!mgr) {
1263 need_stop_write = true;
1264 return true;
1265 }
1266
1267 if (move_or_rewrite || !AtomicCAS(move_or_rewrite, false, true)) {
1268 need_write_all = true;
1269 return true;
1270 }
1271
1272 meta_has_changed = false;
1273 bool new_write = coord.empty();
1274 if (!CNCBlobStorage::WriteBlobInfo(mgr->GetKey(), this, chunk_maps, cnt_chunks, mgr->GetCacheData()))
1275 {
1276 #ifdef _DEBUG
1277 CNCAlerts::Register(CNCAlerts::eDebugWriteBlobInfoFailed,"x_WriteBlobInfo");
1278 #endif
1279 meta_has_changed = true;
1280 move_or_rewrite = false;
1281 need_write_all = true;
1282 RunAfter(s_WBFailedWriteDelay);
1283 return false;
1284 }
1285 if (new_write) {
1286 CNCStat::DiskBlobWrite(size);
1287 }
1288 x_FreeChunkMaps();
1289
1290 move_or_rewrite = false;
1291 return true;
1292 }
1293
1294 bool
x_WriteCurChunk(char * write_mem,Uint4 write_size)1295 SNCBlobVerData::x_WriteCurChunk(char* write_mem, Uint4 write_size)
1296 {
1297 if (!chunk_maps) {
1298 chunk_maps = new SNCChunkMaps(map_size);
1299 s_AddCurrentMem(s_CalcChunkMapsSize(map_size));
1300 }
1301 CNCBlobVerManager* mgr = ACCESS_ONCE(ver_manager);
1302 if (!mgr) {
1303 need_stop_write = true;
1304 return true;
1305 }
1306 char* new_mem = CNCBlobStorage::WriteChunkData(
1307 this, chunk_maps, mgr->GetCacheData(),
1308 cur_chunk_num, write_mem, write_size);
1309 if (!new_mem) {
1310 RunAfter(s_WBFailedWriteDelay);
1311 return false;
1312 }
1313 CNCStat::DiskDataWrite(write_size);
1314
1315 wb_mem_lock.Lock();
1316 chunks[cur_chunk_num] = new_mem;
1317 ++cur_chunk_num;
1318 if (data_mem < write_size) {
1319 SRV_FATAL("blob ver data broken");
1320 }
1321 data_mem -= write_size;
1322 if (releasing_mem != 0) {
1323 if (releasing_mem < write_size) {
1324 SRV_FATAL("blob ver data broken");
1325 }
1326 releasing_mem -= write_size;
1327 }
1328 else {
1329 if (releasable_mem < write_size) {
1330 SRV_FATAL("blob ver data broken");
1331 }
1332 releasable_mem -= write_size;
1333 // memory will be subtracted from global releasing after call to
1334 // deleter below
1335 s_AddReleasingMem(write_size, write_size);
1336 }
1337 wb_mem_lock.Unlock();
1338
1339 CWBMemDeleter* deleter = new CWBMemDeleter(write_mem, write_size);
1340 deleter->CallRCU();
1341
1342 return true;
1343 }
1344
1345 bool
x_ExecuteWriteAll(void)1346 SNCBlobVerData::x_ExecuteWriteAll(void)
1347 {
1348 wb_mem_lock.Lock();
1349
1350 if (need_stop_write) {
1351 need_write_all = false;
1352 need_stop_write = false;
1353 if (!need_mem_release && releasing_mem != 0) {
1354 s_AddReleasableMem(this, releasing_mem, releasing_mem);
1355 releasable_mem += releasing_mem;
1356 releasing_mem = 0;
1357 }
1358 wb_mem_lock.Unlock();
1359 return false;
1360 }
1361 else if (cur_chunk_num == cnt_chunks) {
1362 need_write_all = false;
1363 wb_mem_lock.Unlock();
1364
1365 if (x_WriteBlobInfo())
1366 SetRunnable();
1367 return true;
1368 }
1369 char* write_mem = chunks[cur_chunk_num];
1370 Uint4 write_size = chunk_size;
1371 if (cur_chunk_num == cnt_chunks - 1)
1372 write_size = Uint4(min(size - (cnt_chunks - 1) * chunk_size, Uint8(chunk_size)));
1373 wb_mem_lock.Unlock();
1374
1375 if (x_WriteCurChunk(write_mem, write_size))
1376 SetRunnable();
1377 return true;
1378 }
1379
1380 void
x_DeleteVersion(void)1381 SNCBlobVerData::x_DeleteVersion(void)
1382 {
1383 CNCBlobStorage::DeleteBlobInfo(this, chunk_maps);
1384 coord.clear();
1385 x_FreeChunkMaps();
1386 if (cur_chunk_num < cnt_chunks) {
1387 for (Uint8 num = cur_chunk_num; num < cnt_chunks - 1; ++num) {
1388 s_FreeWriteBackMem(chunks[num], chunk_size, chunk_size);
1389 if (releasing_mem < chunk_size) {
1390 SRV_FATAL("blob ver data broken");
1391 }
1392 releasing_mem -= chunk_size;
1393 }
1394 Uint4 last_size = Uint4(min(size - (cnt_chunks - 1) * chunk_size, Uint8(chunk_size)));
1395 s_FreeWriteBackMem(chunks[cnt_chunks - 1], last_size, last_size);
1396 if (releasing_mem < last_size) {
1397 SRV_FATAL("blob ver data broken");
1398 }
1399 releasing_mem -= last_size;
1400 cur_chunk_num = cnt_chunks;
1401 }
1402 }
1403
1404 void
ExecuteSlice(TSrvThreadNum)1405 SNCBlobVerData::ExecuteSlice(TSrvThreadNum /* thr_num */)
1406 {
1407 // writes blob data and metadata into db
1408 if (need_write_all && x_ExecuteWriteAll())
1409 return;
1410
1411 // remove blob from memory
1412 wb_mem_lock.Lock();
1413 CNCBlobVerManager* mgr = ACCESS_ONCE(ver_manager);
1414 if (mgr) {
1415 if (request_data_write) {
1416 request_data_write = false;
1417 mgr->DataWritten();
1418 }
1419 if (need_mem_release) {
1420 // if still has something to write, request that
1421 if (data_mem != 0 || (meta_has_changed && is_cur_version)) {
1422 need_write_all = true;
1423 #ifdef _DEBUG
1424 CNCAlerts::Register(CNCAlerts::eDebugExtraWrite,"SNCBlobVerData::ExecuteSlice");
1425 #endif
1426 wb_mem_lock.Unlock();
1427 SetRunnable();
1428 return;
1429 }
1430 if (is_cur_version) {
1431 wb_mem_lock.Unlock();
1432 mgr->RequestMemRelease();
1433 return;
1434 }
1435 need_mem_release = false;
1436 }
1437 wb_mem_lock.Unlock();
1438 }
1439 else {
1440 wb_mem_lock.Unlock();
1441
1442 if (!is_cur_version)
1443 x_DeleteVersion();
1444 #if 0
1445 if (releasable_mem != 0 || releasing_mem != meta_mem) {
1446 SRV_FATAL("blob ver data broken");
1447 }
1448 #endif
1449 if (!delete_scheduled)
1450 s_ScheduleVerDelete(this);
1451 }
1452 }
1453
1454 void
AddChunkMem(char * mem,Uint4 mem_size)1455 SNCBlobVerData::AddChunkMem(char* mem, Uint4 mem_size)
1456 {
1457 wb_mem_lock.Lock();
1458 size_t old_meta = s_CalcVerDataSize(this);
1459 chunks.push_back(mem);
1460 ++cnt_chunks;
1461 size_t add_meta_size = s_CalcVerDataSize(this) - old_meta;
1462 if (add_meta_size != 0) {
1463 s_AddCurrentMem(add_meta_size);
1464 meta_mem += add_meta_size;
1465 }
1466 data_mem += mem_size;
1467 releasable_mem += mem_size;
1468 s_AddReleasableMem(this, mem_size, 0);
1469 wb_mem_lock.Unlock();
1470 }
1471
1472
1473 //static Uint8 s_CntAccs = 0;
1474
CNCBlobAccessor(void)1475 CNCBlobAccessor::CNCBlobAccessor(void)
1476 : m_ChunkMaps(NULL),
1477 m_MetaInfoReady(false),
1478 m_WriteMemRequested(false),
1479 m_Buffer(NULL)
1480 {
1481 #if __NC_TASKS_MONITOR
1482 m_TaskName = "CNCBlobAccessor";
1483 #endif
1484 //Uint8 cnt = AtomicAdd(s_CntAccs, 1);
1485 //INFO("CNCBlobAccessor, cnt=" << cnt);
1486 }
1487
~CNCBlobAccessor(void)1488 CNCBlobAccessor::~CNCBlobAccessor(void)
1489 {
1490 if (m_ChunkMaps) {
1491 SRV_FATAL("blob accessor broken");
1492 }
1493
1494 //Uint8 cnt = AtomicSub(s_CntAccs, 1);
1495 //INFO("~CNCBlobAccessor, cnt=" << cnt);
1496 }
1497
1498 void
Prepare(const string & key,const string & password,Uint2 time_bucket,ENCAccessType access_type)1499 CNCBlobAccessor::Prepare(const string& key,
1500 const string& password,
1501 Uint2 time_bucket,
1502 ENCAccessType access_type)
1503 {
1504 m_BlobKey = key;
1505 m_Password = password;
1506 m_TimeBucket = time_bucket;
1507 m_AccessType = access_type;
1508 m_HasError = false;
1509 m_VerManager = NULL;
1510 m_CurChunk = 0;
1511 m_ChunkPos = 0;
1512 m_SizeRead = 0;
1513 }
1514
1515 void
Initialize(SNCCacheData * cache_data)1516 CNCBlobAccessor::Initialize(SNCCacheData* cache_data)
1517 {
1518 if (cache_data) {
1519 bool new_version = m_AccessType == eNCCreate
1520 || m_AccessType == eNCCopyCreate;
1521 m_VerManager = CNCBlobVerManager::Get(m_TimeBucket, cache_data->key,
1522 cache_data, new_version);
1523 }
1524 }
1525
1526 void
Deinitialize(void)1527 CNCBlobAccessor::Deinitialize(void)
1528 {
1529 switch (m_AccessType) {
1530 case eNCReadData:
1531 if (m_ChunkMaps) {
1532 s_SubCurrentMem(s_CalcChunkMapsSize(m_CurData->map_size));
1533 delete m_ChunkMaps;
1534 m_ChunkMaps = NULL;
1535 }
1536 break;
1537 case eNCCreate:
1538 case eNCCopyCreate:
1539 if (m_NewData && m_Buffer) {
1540 s_FreeWriteBackMem(m_Buffer, m_NewData->chunk_size, 0);
1541 m_Buffer = NULL;
1542 }
1543 break;
1544 default:
1545 break;
1546 }
1547
1548 m_NewData.Reset();
1549 m_CurData.Reset();
1550 if (m_VerManager) {
1551 m_VerManager->Release();
1552 m_VerManager = NULL;
1553 }
1554 }
1555
1556 void
Release(void)1557 CNCBlobAccessor::Release(void)
1558 {
1559 Deinitialize();
1560 Terminate();
1561 }
1562
1563 void
RequestMetaInfo(CSrvTask * owner)1564 CNCBlobAccessor::RequestMetaInfo(CSrvTask* owner)
1565 {
1566 m_Owner = owner;
1567 if (m_VerManager) {
1568 m_VerManager->RequestCurVersion(this);
1569 }
1570 else {
1571 m_MetaInfoReady = true;
1572 }
1573 }
1574
1575 void
ExecuteSlice(TSrvThreadNum)1576 CNCBlobAccessor::ExecuteSlice(TSrvThreadNum /* thr_num */)
1577 {
1578 if (!IsTransFinished())
1579 return;
1580
1581 if (!m_MetaInfoReady) {
1582 // read blob metadata from db
1583 m_CurData = m_VerManager->GetCurVersion();
1584 m_MetaInfoReady = true;
1585 m_Owner->SetRunnable();
1586 }
1587 else if (m_WriteMemRequested) {
1588 // client sends data, we need memory
1589 // if no memory, wait a bit, then try again
1590 m_Buffer = s_AllocWriteBackMem(m_NewData->chunk_size, this);
1591 if (m_Buffer) {
1592 m_WriteMemRequested = false;
1593 m_Owner->SetRunnable();
1594 }
1595 }
1596 }
1597
1598 void
x_DelCorruptedVersion(void)1599 CNCBlobAccessor::x_DelCorruptedVersion(void)
1600 {
1601 SRV_LOG(Critical, "Database information about blob "
1602 << CNCBlobKeyLight(m_BlobKey).KeyForLogs()
1603 << " is corrupted. Blob will be deleted");
1604 m_VerManager->DeleteVersion(m_CurData);
1605 m_CurData->has_error = true;
1606 m_HasError = true;
1607 }
1608
1609 Uint4
GetReadMemSize(void)1610 CNCBlobAccessor::GetReadMemSize(void)
1611 {
1612 if (m_CurData->has_error) {
1613 m_HasError = true;
1614 return 0;
1615 }
1616 if (GetPosition() >= m_CurData->size) {
1617 SRV_FATAL("blob accessor broken");
1618 }
1619 if (m_Buffer) {
1620 if (m_ChunkPos < m_ChunkSize) {
1621 m_Buffer = m_CurData->chunks[m_CurChunk];
1622 return m_ChunkSize - m_ChunkPos;
1623 }
1624 ++m_CurChunk;
1625 m_ChunkPos = 0;
1626 }
1627
1628 Uint8 need_size = m_CurData->size - GetPosition() + m_ChunkPos;
1629 if (need_size > m_CurData->chunk_size)
1630 need_size = m_CurData->chunk_size;
1631
1632 m_Buffer = ACCESS_ONCE(m_CurData->chunks[m_CurChunk]);
1633 if (m_Buffer) {
1634 m_ChunkSize = Uint4(need_size);
1635 return m_ChunkSize - m_ChunkPos;
1636 }
1637
1638 if (!m_ChunkMaps) {
1639 m_ChunkMaps = new SNCChunkMaps(m_CurData->map_size);
1640 s_AddCurrentMem(s_CalcChunkMapsSize(m_CurData->map_size));
1641 }
1642 if (!CNCBlobStorage::ReadChunkData(m_CurData, m_ChunkMaps, m_CurChunk,
1643 m_Buffer, m_ChunkSize))
1644 {
1645 x_DelCorruptedVersion();
1646 return 0;
1647 }
1648 if (m_ChunkSize != need_size) {
1649 x_DelCorruptedVersion();
1650 return 0;
1651 }
1652
1653 ACCESS_ONCE(m_CurData->chunks[m_CurChunk]) = m_Buffer;
1654 return m_ChunkSize - m_ChunkPos;
1655 }
1656
1657 void
MoveReadPos(Uint4 move_size)1658 CNCBlobAccessor::MoveReadPos(Uint4 move_size)
1659 {
1660 m_ChunkPos += move_size;
1661 m_SizeRead += move_size;
1662 if (m_CurData->cur_chunk_num > m_CurChunk
1663 && m_Buffer == m_CurData->chunks[m_CurChunk])
1664 {
1665 CNCStat::DiskDataRead(move_size);
1666 }
1667 }
1668
1669 void
x_CreateNewData(void)1670 CNCBlobAccessor::x_CreateNewData(void)
1671 {
1672 if (!m_NewData) {
1673 m_NewData = m_VerManager->CreateNewVersion();
1674 m_NewData->password = m_Password;
1675 }
1676 }
1677
1678 size_t
GetWriteMemSize(void)1679 CNCBlobAccessor::GetWriteMemSize(void)
1680 {
1681 x_CreateNewData();
1682 if (m_WriteMemRequested)
1683 return 0;
1684 if (m_Buffer && m_ChunkPos < m_NewData->chunk_size)
1685 return m_NewData->chunk_size - m_ChunkPos;
1686
1687 if (m_Buffer) {
1688 if (m_ChunkPos != m_NewData->chunk_size) {
1689 SRV_FATAL("blob accessor broken");
1690 }
1691 m_NewData->AddChunkMem(m_Buffer, m_NewData->chunk_size);
1692 m_Buffer = NULL;
1693 ++m_CurChunk;
1694 m_ChunkPos = 0;
1695 }
1696 m_WriteMemRequested = true;
1697 m_Buffer = s_AllocWriteBackMem(m_NewData->chunk_size, this);
1698 if (!m_Buffer)
1699 return 0;
1700
1701 m_WriteMemRequested = false;
1702 return m_NewData->chunk_size - m_ChunkPos;
1703 }
1704
1705 void
Finalize(void)1706 CNCBlobAccessor::Finalize(void)
1707 {
1708 if (m_ChunkPos != 0) {
1709 m_Buffer = s_ReallocWriteBackMem(m_Buffer, m_NewData->chunk_size, m_ChunkPos);
1710 m_NewData->AddChunkMem(m_Buffer, m_ChunkPos);
1711 m_Buffer = NULL;
1712 }
1713 m_VerManager->FinalizeWriting(m_NewData);
1714 if (m_CurData.NotNull() && m_CurData->update_received != 0) {
1715 CWriteBackControl::RecordNotifyUpdateBlob(m_CurData->update_received);
1716 }
1717 m_CurData = m_NewData;
1718 }
1719
1720 bool
ReplaceBlobInfo(const SNCBlobVerData & new_info)1721 CNCBlobAccessor::ReplaceBlobInfo(const SNCBlobVerData& new_info)
1722 {
1723 if (!s_IsCurVerOlder(m_CurData, &new_info)) {
1724 return false;
1725 }
1726
1727 x_CreateNewData();
1728 m_NewData->create_time = new_info.create_time;
1729 m_NewData->ttl = new_info.ttl;
1730 m_NewData->dead_time = new_info.dead_time;
1731 m_NewData->expire = new_info.expire;
1732 m_NewData->password = new_info.password;
1733 m_NewData->blob_ver = new_info.blob_ver;
1734 m_NewData->ver_ttl = new_info.ver_ttl;
1735 m_NewData->ver_expire = new_info.ver_expire;
1736 m_NewData->create_server = new_info.create_server;
1737 m_NewData->create_id = new_info.create_id;
1738 return true;
1739 }
1740
1741 string
GetCurPassword(void) const1742 CNCBlobAccessor::GetCurPassword(void) const
1743 {
1744 if (m_CurData->password.empty())
1745 return m_CurData->password;
1746 else
1747 return CNCBlobStorage::PrintablePassword(m_CurData->password);
1748 }
1749
GetPurgeCount()1750 Uint8 CNCBlobAccessor::GetPurgeCount()
1751 {
1752 return s_Forgets.size() + s_ForgetKeys.size();
1753 }
1754
1755 bool
IsPurged(const CNCBlobKeyLight & nc_key) const1756 CNCBlobAccessor::IsPurged(const CNCBlobKeyLight& nc_key) const
1757 {
1758 if (nc_key.Cache().empty()) {
1759 return false;
1760 }
1761 Uint8 cr_time = GetCurBlobCreateTime();
1762 if (cr_time == 0 || cr_time > s_LatestPurge) {
1763 return false;
1764 }
1765 string key(nc_key.Cache());
1766 bool res = false;
1767 s_ConsListLock.Lock();
1768 for (int t=0; !res && t<2; ++t) {
1769 key.append(1,'\1');
1770 TForgets::const_iterator i = s_ForgetKeys.find(key);
1771 if (i != s_ForgetKeys.end()) {
1772 res = cr_time <= i->second;
1773 }
1774 key.append(nc_key.RawKey());
1775 }
1776 s_ConsListLock.Unlock();
1777 return res;
1778 }
1779
1780 bool
Purge(const CNCBlobKeyLight & nc_key,Uint8 when)1781 CNCBlobAccessor::Purge(const CNCBlobKeyLight& nc_key, Uint8 when)
1782 {
1783 bool res=false;
1784 Uint8 lifespan = 9U * 24U * 3600U * (Uint8)kUSecsPerSecond; // 9 days
1785 Uint8 now = CSrvTime::Current().AsUSec();
1786 Uint8 longago = now > lifespan ? (now-lifespan) : 0;
1787 s_LatestPurge = max(s_LatestPurge, when);
1788
1789 s_ConsListLock.Lock();
1790 ERASE_ITERATE( TForgets, f, s_Forgets) {
1791 if (f->second < longago) {
1792 s_Forgets.erase(f);
1793 res=true;
1794 }
1795 }
1796 ERASE_ITERATE( TForgets, f, s_ForgetKeys) {
1797 if (f->second < longago) {
1798 s_ForgetKeys.erase(f);
1799 res=true;
1800 }
1801 }
1802 if (when != 0) {
1803 string key(nc_key.Cache());
1804 if (nc_key.RawKey().empty()) {
1805 TForgets::iterator i = s_Forgets.find(key);
1806 if (i == s_Forgets.end()) {
1807 s_Forgets[key] = when;
1808 res=true;
1809 } else if (i->second < when) {
1810 i->second = when;
1811 res=true;
1812 }
1813 key.append(1,'\1');
1814 } else {
1815 key.append(1,'\1').append(nc_key.RawKey()).append(1,'\1');
1816 }
1817 TForgets::iterator i = s_ForgetKeys.find(key);
1818 if (i == s_ForgetKeys.end()) {
1819 s_ForgetKeys[key] = when;
1820 res=true;
1821 } else if (i->second < when) {
1822 i->second = when;
1823 res=true;
1824 }
1825 }
1826 s_ConsListLock.Unlock();
1827 return res;
1828 }
1829
GetPurgeData(char separator)1830 string CNCBlobAccessor::GetPurgeData(char separator)
1831 {
1832 string res;
1833 Purge(CNCBlobKeyLight(),0);
1834 s_ConsListLock.Lock();
1835 ITERATE( TForgets, f, s_Forgets) {
1836 res += NStr::NumericToString(f->second);
1837 res += ' ';
1838 res += f->first;
1839 res += separator;
1840 }
1841 s_ConsListLock.Unlock();
1842 return res;
1843 }
1844
UpdatePurgeData(const string & data,char separator)1845 bool CNCBlobAccessor::UpdatePurgeData(const string& data, char separator)
1846 {
1847 bool res=false, error=false;;
1848 const char *begin = data.data();
1849 const char *end = begin + data.size();
1850 if (end != begin) {
1851 s_ConsListLock.Lock();
1852 while (end != begin) {
1853 Uint8 when = NStr::StringToUInt8(begin,
1854 NStr::fConvErr_NoThrow | NStr::fAllowTrailingSymbols);
1855 if (when == 0) {
1856 error=true;
1857 break;
1858 }
1859 const char *blank = strchr(begin,' ');
1860 if (!blank) {
1861 error=true;
1862 break;
1863 }
1864 const char *eol = strchr(blank,separator);
1865 if (!eol) {
1866 error=true;
1867 break;
1868 }
1869 string cache(blank+1, eol-blank-1);
1870 TForgets::const_iterator i = s_Forgets.find(cache);
1871 if (i == s_Forgets.end() || i->second < when) {
1872 s_Forgets[cache] = when;
1873 res = true;
1874 }
1875 begin = eol + 1;
1876 }
1877 s_ConsListLock.Unlock();
1878 }
1879 if (error) {
1880 SRV_LOG(Error, "Invalid PURGE data: " << data);
1881 }
1882 return res;
1883 }
1884
SetFailedWriteCount(int failed_write)1885 void CNCBlobAccessor::SetFailedWriteCount(int failed_write)
1886 {
1887 s_FailedListLock.Lock();
1888 s_FailedReserve = failed_write - s_FailedKeys.size();
1889 s_FailMonitor = failed_write > 0;
1890 s_FailedListLock.Unlock();
1891 }
GetFailedWriteCount(void)1892 int CNCBlobAccessor::GetFailedWriteCount(void)
1893 {
1894 return s_FailedReserve;
1895 }
PutFailed(const string & blob_key)1896 void CNCBlobAccessor::PutFailed(const string& blob_key)
1897 {
1898 if (s_FailMonitor) {
1899 s_FailedListLock.Lock();
1900 if (s_FailedReserve > 0) {
1901 s_FailedKeys.insert(blob_key);
1902 --s_FailedReserve;
1903 s_FailedCounter.Add(1);
1904 }
1905 s_FailedListLock.Unlock();
1906 }
1907 }
PutSucceeded(const string & blob_key)1908 void CNCBlobAccessor::PutSucceeded(const string& blob_key)
1909 {
1910 if (s_FailMonitor && s_FailedCounter.Get() != 0) {
1911 s_FailedListLock.Lock();
1912 if (s_FailedKeys.erase(blob_key)) {
1913 ++s_FailedReserve;
1914 s_FailedCounter.Add(-1);
1915 }
1916 s_FailedListLock.Unlock();
1917 }
1918 }
HasPutSucceeded(const string & blob_key)1919 bool CNCBlobAccessor::HasPutSucceeded(const string& blob_key)
1920 {
1921 bool b = false;
1922 if (s_FailMonitor && s_FailedCounter.Get() != 0) {
1923 s_FailedListLock.Lock();
1924 b = s_FailedKeys.find(blob_key) != s_FailedKeys.end();
1925 s_FailedListLock.Unlock();
1926 }
1927 return !b;
1928 }
1929
1930 /////////////////////////////////////////////////////////////////////////////
CBulkCleaner(void)1931 CBulkCleaner::CBulkCleaner(void)
1932 {
1933 SetState(&CBulkCleaner::x_StartSession);
1934 }
1935
~CBulkCleaner(void)1936 CBulkCleaner::~CBulkCleaner(void)
1937 {
1938 }
1939
1940 CBulkCleaner::State
x_StartSession(void)1941 CBulkCleaner::x_StartSession(void)
1942 {
1943 if (CTaskServer::IsInShutdown()) {
1944 return NULL;
1945 }
1946 m_CurBucket = 1;
1947 m_CrTime = 0;
1948 m_BlobAccess = NULL;
1949 s_ConsListLock.Lock();
1950 while (!s_ForgetKeys.empty()) {
1951 TForgets::const_iterator i = s_ForgetKeys.begin();
1952 m_Filter = i->first;
1953 m_CrTime = i->second;
1954 if (m_CrTime != 0) {
1955 break;
1956 }
1957 s_ForgetKeys.erase(i);
1958 }
1959 s_ConsListLock.Unlock();
1960 if (m_CrTime != 0) {
1961 CreateNewDiagCtx();
1962 return &CBulkCleaner::x_FindNext;
1963 }
1964 RunAfter(10);
1965 return NULL;
1966 }
1967
1968 CBulkCleaner::State
x_FindNext(void)1969 CBulkCleaner::x_FindNext(void)
1970 {
1971 if (CTaskServer::IsInShutdown()) {
1972 return NULL;
1973 }
1974 for (; m_CurBucket <= CNCDistributionConf::GetCntTimeBuckets(); ++m_CurBucket) {
1975 m_Key = CNCBlobStorage::FindBlob(m_CurBucket, m_Filter, m_CrTime);
1976 if (!m_Key.empty()) {
1977 return &CBulkCleaner::x_RequestBlobAccess;
1978 }
1979 }
1980 return &CBulkCleaner::x_FinishSession;
1981 }
1982
1983 CBulkCleaner::State
x_RequestBlobAccess(void)1984 CBulkCleaner::x_RequestBlobAccess(void)
1985 {
1986 CNCBlobKeyLight nc_key(m_Key);
1987 Uint2 slot, bkt;
1988 if (nc_key.IsICacheKey()) {
1989 CNCDistributionConf::GetSlotByICacheKey(nc_key, slot, bkt);
1990 } else {
1991 // we should never be here
1992 ++m_CurBucket;
1993 return &CBulkCleaner::x_FindNext;
1994 }
1995 m_BlobAccess = CNCBlobStorage::GetBlobAccess( eNCCreate, nc_key.PackedKey(), "", bkt);
1996 m_BlobAccess->RequestMetaInfo(this);
1997 return &CBulkCleaner::x_RemoveBlob;
1998 }
1999
2000 CBulkCleaner::State
x_RemoveBlob(void)2001 CBulkCleaner::x_RemoveBlob(void)
2002 {
2003 if (CTaskServer::IsInShutdown()) {
2004 return NULL;
2005 }
2006 if (!m_BlobAccess->IsMetaInfoReady()) {
2007 return NULL;
2008 }
2009
2010 // see
2011 // CNCMessageHandler::x_DoCmd_Remove
2012 if ((!m_BlobAccess || !m_BlobAccess->IsBlobExists() || m_BlobAccess->IsCurBlobExpired()))
2013 {
2014 m_BlobAccess->Release();
2015 m_BlobAccess = NULL;
2016 return &CBulkCleaner::x_FindNext;;
2017 }
2018
2019 m_BlobAccess->SetBlobTTL(m_BlobAccess->GetCurBlobTTL());
2020 m_BlobAccess->SetBlobVersion(0);
2021 int expire = CSrvTime::CurSecs() - 1;
2022 unsigned int ttl = m_BlobAccess->GetNewBlobTTL();
2023 m_BlobAccess->SetNewBlobExpire(expire, expire + ttl + 1);
2024
2025 // see
2026 // CNCMessageHandler::x_FinishReadingBlob
2027 CSrvTime cur_srv_time = CSrvTime::Current();
2028 Uint8 cur_time = cur_srv_time.AsUSec();
2029 int cur_secs = int(cur_srv_time.Sec());
2030 m_BlobAccess->SetBlobCreateTime(cur_time);
2031 if (m_BlobAccess->GetNewBlobExpire() == 0)
2032 m_BlobAccess->SetNewBlobExpire(cur_secs + m_BlobAccess->GetNewBlobTTL());
2033 m_BlobAccess->SetNewVerExpire(cur_secs + m_BlobAccess->GetNewVersionTTL());
2034 m_BlobAccess->SetCreateServer(CNCDistributionConf::GetSelfID(),
2035 CNCBlobStorage::GetNewBlobId());
2036
2037 return &CBulkCleaner::x_Finalize;
2038 }
2039
2040 CBulkCleaner::State
x_Finalize(void)2041 CBulkCleaner::x_Finalize(void)
2042 {
2043 if (m_BlobAccess) {
2044 m_BlobAccess->Finalize();
2045 m_BlobAccess->Release();
2046 m_BlobAccess = NULL;
2047
2048 {
2049 CSrvDiagMsg diag_msg;
2050 GetDiagCtx()->SetRequestID();
2051 diag_msg.StartRequest().PrintParam("_type", "bulkrmv");
2052 CNCBlobKeyLight key(m_Key);
2053 diag_msg.PrintParam("cache",key.Cache()).PrintParam("key",key.RawKey()).PrintParam("subkey",key.SubKey());
2054 diag_msg.Flush();
2055 diag_msg.StopRequest();
2056 }
2057 }
2058 return &CBulkCleaner::x_FindNext;
2059 }
2060
2061 CBulkCleaner::State
x_FinishSession(void)2062 CBulkCleaner::x_FinishSession(void)
2063 {
2064 ReleaseDiagCtx();
2065
2066 s_ConsListLock.Lock();
2067 if (!s_ForgetKeys.empty()) {
2068 TForgets::const_iterator i = s_ForgetKeys.begin();
2069 if (m_Filter == i->first && m_CrTime == i->second) {
2070 s_ForgetKeys.erase(i);
2071 }
2072 }
2073 s_ConsListLock.Unlock();
2074
2075 SetState(&CBulkCleaner::x_StartSession);
2076 SetRunnable();
2077 return NULL;
2078 }
2079
2080 END_NCBI_SCOPE
2081