1 /*  $Id: nc_storage_blob.cpp 577955 2019-01-10 14:14:00Z gouriano $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author: Pavel Ivanov
27  */
28 
29 #include "nc_pch.hpp"
30 #include <corelib/request_ctx.hpp>
31 
32 #include "netcached.hpp"
33 #include "distribution_conf.hpp"
34 #include "nc_storage_blob.hpp"
35 #include "nc_storage.hpp"
36 #include "storage_types.hpp"
37 #include "nc_stat.hpp"
38 #include <set>
39 
40 BEGIN_NCBI_SCOPE
41 
42 struct SWriteBackData
43 {
44     CMiniMutex lock;
45     size_t cur_size;
46     size_t releasable_size;
47     size_t releasing_size;
48     vector<SNCBlobVerData*> *to_add_list;
49     vector<SNCBlobVerData*> *to_del_list;
50 
51     SWriteBackData(void);
52 };
53 
54 extern Uint4 s_TaskPriorityWbMemRelease;
55 static size_t s_WBSoftSizeLimit = NCBI_CONST_UINT8(2000000000);
56 static size_t s_WBHardSizeLimit = NCBI_CONST_UINT8(3000000000);
57 static int s_WBWriteTimeout = 1000;
58 static int s_WBWriteTimeout2 = 1000;
59 static Uint2 s_WBFailedWriteDelay = 2;
60 
61 static ssize_t s_WBCurSize = 0;
62 static ssize_t s_WBReleasableSize = 0;
63 static ssize_t s_WBReleasingSize = 0;
64 static SWriteBackData* s_WBData = NULL;
65 static CWriteBackControl* s_WBControl = NULL;
66 static TVerDataMap* s_VersMap = NULL;
67 
68 static CMiniMutex s_ConsListLock;
69 static Uint4 s_CntConsumers = 0;
70 static TSrvConsList s_ConsList;
71 vector<SNCBlobVerData*> s_WBToAddList;
72 vector<SNCBlobVerData*> s_WBToDelList;
73 /*
74 static Uint8 s_CntMgrs = 0;
75 static Uint8 s_CntVers = 0;
76 */
77 typedef map<string, Uint8> TForgets;
78 static TForgets s_Forgets;
79 static TForgets s_ForgetKeys;
80 static Uint8 s_LatestPurge = 0;
81 static CBulkCleaner* s_BulkCleaner = NULL;
82 
83 // for PutFailed/PutSucceeded
84 static bool s_FailMonitor = false;
85 static CMiniMutex s_FailedListLock;
86 static int s_FailedReserve=0;
87 static CAtomicCounter s_FailedCounter;
88 static set<string> s_FailedKeys;
89 
90 static Uint8 s_AnotherServerMain = 0;
91 
92 static Uint8 s_BlobSync = 0;
93 static Uint8 s_BlobSyncTDiff = 0;
94 static Uint8 s_BlobSyncMaxTDiff = 0;
95 
96 static Uint8 s_BlobNotify = 0;
97 static Uint8 s_BlobNotifyTDiff = 0;
98 static Uint8 s_BlobNotifyMaxTDiff = 0;
99 
100 
101 static const size_t kVerManagerSize = sizeof(CNCBlobVerManager)
102                                       + sizeof(CCurVerReader);
103 static const size_t kDefChunkMapsSize
104                 = sizeof(SNCChunkMaps)
105                   + (kNCMaxBlobMapsDepth + 1)
106                     * (sizeof(SNCChunkMapInfo)
107                        + (kNCMaxChunksInMap - 1) * sizeof(SNCDataCoord));
108 
109 
110 
111 
112 static bool
s_IsCurVerOlder(const SNCBlobVerData * cur_ver,const SNCBlobVerData * new_ver)113 s_IsCurVerOlder(const SNCBlobVerData* cur_ver, const SNCBlobVerData* new_ver)
114 {
115     bool res = true;
116     if (cur_ver) {
117         if (cur_ver->create_time != new_ver->create_time) {
118             res = cur_ver->create_time < new_ver->create_time;
119         } else if (cur_ver->create_server != new_ver->create_server) {
120             res = cur_ver->create_server < new_ver->create_server;
121         } else if (cur_ver->create_id != new_ver->create_id) {
122             res = cur_ver->create_id < new_ver->create_id;
123         } else if (cur_ver->expire != new_ver->expire) {
124             res = cur_ver->expire < new_ver->expire;
125         } else if (cur_ver->ver_expire != new_ver->ver_expire) {
126             res = cur_ver->ver_expire < new_ver->ver_expire;
127         } else {
128             res = cur_ver->dead_time < new_ver->dead_time;
129         }
130     }
131     return res;
132 }
133 
GetWBSoftSizeLimit(void)134 Uint8 GetWBSoftSizeLimit(void) {
135     return s_WBSoftSizeLimit;
136 }
GetWBHardSizeLimit(void)137 Uint8 GetWBHardSizeLimit(void) {
138     return s_WBHardSizeLimit;
139 }
GetWBWriteTimeout(void)140 int GetWBWriteTimeout(void) {
141     return s_WBWriteTimeout;
142 }
GetWBFailedWriteDelay(void)143 int GetWBFailedWriteDelay(void) {
144     return s_WBFailedWriteDelay;
145 }
146 
147 void
SetWBSoftSizeLimit(Uint8 limit)148 SetWBSoftSizeLimit(Uint8 limit)
149 {
150     s_WBSoftSizeLimit = size_t(limit);
151 }
152 
153 void
SetWBHardSizeLimit(Uint8 limit)154 SetWBHardSizeLimit(Uint8 limit)
155 {
156     s_WBHardSizeLimit = max(size_t(limit), s_WBSoftSizeLimit + 1000000000);
157 }
158 
159 void
SetWBWriteTimeout(int timeout1,int timeout2)160 SetWBWriteTimeout(int timeout1, int timeout2)
161 {
162     s_WBWriteTimeout = timeout1;
163     s_WBWriteTimeout2 = timeout2;
164 }
165 void
SetWBInitialSyncComplete(void)166 SetWBInitialSyncComplete(void)
167 {
168     s_WBWriteTimeout = s_WBWriteTimeout2;
169 }
170 
171 void
SetWBFailedWriteDelay(int delay)172 SetWBFailedWriteDelay(int delay)
173 {
174     s_WBFailedWriteDelay = Uint2(delay);
175 }
176 
177 static inline SWriteBackData*
s_GetWBData(void)178 s_GetWBData(void)
179 {
180     return &s_WBData[CTaskServer::GetCurThreadNum()];
181 }
182 
183 static inline size_t
s_CalcVerDataSize(SNCBlobVerData * ver_data)184 s_CalcVerDataSize(SNCBlobVerData* ver_data)
185 {
186    return sizeof(*ver_data) + ver_data->chunks.capacity() * sizeof(char*);
187 }
188 
189 static size_t
s_CalcChunkMapsSize(Uint2 map_size)190 s_CalcChunkMapsSize(Uint2 map_size)
191 {
192     if (map_size == kNCMaxChunksInMap)
193         return kDefChunkMapsSize;
194     else {
195         return sizeof(SNCChunkMaps)
196                + (kNCMaxBlobMapsDepth + 1)
197                   * (sizeof(SNCChunkMapInfo) + (map_size - 1) * sizeof(SNCDataCoord));
198     }
199 }
200 
201 static void
s_AddCurrentMem(size_t mem_size)202 s_AddCurrentMem(size_t mem_size)
203 {
204     SWriteBackData* wb_data = s_GetWBData();
205     wb_data->lock.Lock();
206     wb_data->cur_size += mem_size;
207     wb_data->lock.Unlock();
208 }
209 
210 static void
s_SubCurrentMem(size_t mem_size)211 s_SubCurrentMem(size_t mem_size)
212 {
213     SWriteBackData* wb_data = s_GetWBData();
214     wb_data->lock.Lock();
215     wb_data->cur_size -= mem_size;
216     wb_data->lock.Unlock();
217 }
218 
219 static void
s_AddReleasableMem(SNCBlobVerData * ver_data,size_t add_releasable,size_t sub_releasing)220 s_AddReleasableMem(SNCBlobVerData* ver_data,
221                    size_t add_releasable,
222                    size_t sub_releasing)
223 {
224     SWriteBackData* wb_data = s_GetWBData();
225     wb_data->lock.Lock();
226     wb_data->releasable_size += add_releasable;
227     wb_data->releasing_size -= sub_releasing;
228     if (ver_data) {
229         wb_data->to_add_list->push_back(ver_data);
230     }
231     wb_data->lock.Unlock();
232 }
233 
234 static void
s_SubReleasableMem(size_t mem_size)235 s_SubReleasableMem(size_t mem_size)
236 {
237     SWriteBackData* wb_data = s_GetWBData();
238     wb_data->lock.Lock();
239     wb_data->releasable_size -= mem_size;
240     wb_data->lock.Unlock();
241 }
242 
243 static void
s_AddReleasingMem(size_t add_releasing,size_t sub_releasable)244 s_AddReleasingMem(size_t add_releasing, size_t sub_releasable)
245 {
246     SWriteBackData* wb_data = s_GetWBData();
247     wb_data->lock.Lock();
248     wb_data->releasing_size += add_releasing;
249     wb_data->releasable_size -= sub_releasable;
250     wb_data->lock.Unlock();
251 }
252 
253 static void
s_SubReleasingMem(size_t mem_size)254 s_SubReleasingMem(size_t mem_size)
255 {
256     SWriteBackData* wb_data = s_GetWBData();
257     wb_data->lock.Lock();
258     wb_data->releasing_size -= mem_size;
259     wb_data->lock.Unlock();
260 }
261 
262 static void
s_ScheduleVerDelete(SNCBlobVerData * ver_data)263 s_ScheduleVerDelete(SNCBlobVerData* ver_data)
264 {
265     SWriteBackData* wb_data = s_GetWBData();
266     wb_data->lock.Lock();
267     wb_data->to_del_list->push_back(ver_data);
268     ver_data->delete_scheduled = true;
269     wb_data->lock.Unlock();
270 }
271 
272 static char*
s_AllocWriteBackMem(Uint4 mem_size,CSrvTransConsumer * consumer)273 s_AllocWriteBackMem(Uint4 mem_size, CSrvTransConsumer* consumer)
274 {
275     char* mem = NULL;
276     if (s_WBCurSize <= s_WBReleasingSize
277         || (size_t)(s_WBCurSize - s_WBReleasingSize) + mem_size < s_WBHardSizeLimit
278         ||  s_WBReleasableSize < (ssize_t)mem_size
279         ||  CTaskServer::IsInShutdown())
280     {
281         mem = (char*)malloc(mem_size);
282         if (mem) {
283             s_AddCurrentMem(mem_size);
284         }
285     }
286     else {
287         s_ConsListLock.Lock();
288         s_ConsList.push_front(*consumer);
289         consumer->m_TransFinished = false;
290         ++s_CntConsumers;
291         s_ConsListLock.Unlock();
292     }
293     return mem;
294 }
295 
296 static void
s_NotifyConsumers(void)297 s_NotifyConsumers(void)
298 {
299     TSrvConsList cons_list;
300 
301     s_ConsListLock.Lock();
302     s_ConsList.swap(cons_list);
303     s_ConsListLock.Unlock();
304 
305     while (!cons_list.empty()) {
306         CSrvTransConsumer* consumer = &cons_list.front();
307         cons_list.pop_front();
308         consumer->m_TransFinished = true;
309         consumer->SetRunnable();
310     }
311 }
312 
313 static char*
s_ReallocWriteBackMem(char * mem,Uint4 old_size,Uint4 new_size)314 s_ReallocWriteBackMem(char* mem, Uint4 old_size, Uint4 new_size)
315 {
316     Uint4 diff = old_size - new_size;
317     if (diff != 0) {
318         mem = (char*)realloc(mem, new_size);
319         s_SubCurrentMem(diff);
320     }
321     return mem;
322 }
323 
324 static void
s_FreeWriteBackMem(char * mem,Uint4 mem_size,Uint4 sub_releasing)325 s_FreeWriteBackMem(char* mem, Uint4 mem_size, Uint4 sub_releasing)
326 {
327     free(mem);
328 
329     SWriteBackData* wb_data = s_GetWBData();
330     wb_data->lock.Lock();
331     wb_data->cur_size -= mem_size;
332     wb_data->releasing_size -= sub_releasing;
333     wb_data->lock.Unlock();
334 }
335 
336 static void
s_ReleaseMemory(size_t soft_limit)337 s_ReleaseMemory(size_t soft_limit)
338 {
339     if (s_WBCurSize - s_WBReleasingSize <= (ssize_t)soft_limit)
340         return;
341 
342     size_t to_free = s_WBCurSize - s_WBReleasingSize - soft_limit;
343     size_t freed = 0;
344     while (freed < to_free  &&  !s_VersMap->empty()) {
345         TVerDataMap::iterator it = s_VersMap->begin();
346         SNCBlobVerData* ver_data = &*it;
347         s_VersMap->erase(it);
348 
349         if (ver_data->releasable_mem == 0)
350             continue;
351 
352         int access_time = ACCESS_ONCE(ver_data->last_access_time);
353         if (access_time != ver_data->saved_access_time) {
354             ver_data->saved_access_time = access_time;
355             s_VersMap->insert_equal(*ver_data);
356             continue;
357         }
358         freed += ver_data->RequestMemRelease();
359     }
360     s_WBReleasingSize += freed;
361     s_WBReleasableSize -= freed;
362 }
363 
364 static void
s_TransferVerList(vector<SNCBlobVerData * > & from_list,vector<SNCBlobVerData * > & to_list)365 s_TransferVerList(vector<SNCBlobVerData*>& from_list,
366                   vector<SNCBlobVerData*>& to_list)
367 {
368     if (from_list.size() == 0)
369         return;
370 
371     size_t prev_size = to_list.size();
372     to_list.resize(prev_size + from_list.size(), NULL);
373     memcpy(&to_list[prev_size], &from_list[0],
374            from_list.size() * sizeof(SNCBlobVerData*));
375     from_list.resize(0);
376 }
377 
378 static void
s_CollectWBData(SWriteBackData * wb_data)379 s_CollectWBData(SWriteBackData* wb_data)
380 {
381     vector<SNCBlobVerData*> *next_add = new vector<SNCBlobVerData*>;
382     vector<SNCBlobVerData*> *next_del = new vector<SNCBlobVerData*>;
383     vector<SNCBlobVerData*> *prev_add = nullptr, *prev_del = nullptr;
384 
385     wb_data->lock.Lock();
386     s_WBCurSize += wb_data->cur_size;
387     wb_data->cur_size = 0;
388     s_WBReleasableSize += wb_data->releasable_size;
389     wb_data->releasable_size = 0;
390     s_WBReleasingSize += wb_data->releasing_size;
391     wb_data->releasing_size = 0;
392 
393     if (next_add) {
394         prev_add = wb_data->to_add_list;
395         wb_data->to_add_list = next_add;
396     } else {
397         s_TransferVerList(*(wb_data->to_add_list), s_WBToAddList);
398     }
399     if (next_del) {
400         prev_del = wb_data->to_del_list;
401         wb_data->to_del_list = next_del;
402     } else {
403         s_TransferVerList(*(wb_data->to_del_list), s_WBToDelList);
404     }
405 
406     wb_data->lock.Unlock();
407 
408     if (prev_add) {
409         s_TransferVerList(*prev_add, s_WBToAddList);
410         delete prev_add;
411     }
412     if (prev_del) {
413         s_TransferVerList(*prev_del, s_WBToDelList);
414         delete prev_del;
415     }
416 }
417 
418 static void
s_ProcessWBAddDel(Uint4 was_del_size)419 s_ProcessWBAddDel(Uint4 was_del_size)
420 {
421 // add new verdata into map (s_VersMap) sorted by
422 // last seen access time (ver_data->saved_access_time)
423     for (Uint4 i = 0; i < s_WBToAddList.size(); ++i) {
424         SNCBlobVerData* ver_data = s_WBToAddList[i];
425         if (!ver_data->is_linked()  &&  ver_data->releasable_mem != 0
426             &&  !ver_data->delete_scheduled)
427         {
428             ver_data->saved_access_time = ver_data->last_access_time;
429             s_VersMap->insert_equal(*ver_data);
430         }
431     }
432     s_WBToAddList.clear();
433 // remove from map and delete those about to be deleted
434 //  (SNCBlobVerData request deletion)
435     for (Uint4 i = 0; i < was_del_size; ++i) {
436         SNCBlobVerData* ver_data = s_WBToDelList[i];
437         if (ver_data->is_linked()) {
438             s_VersMap->erase(s_VersMap->iterator_to(*ver_data));
439         }
440         s_WBReleasingSize -= ver_data->meta_mem;
441         s_WBCurSize -= ver_data->meta_mem;
442         ver_data->Terminate();
443     }
444     Uint4 new_size = Uint4(s_WBToDelList.size() - was_del_size);
445     if (was_del_size != 0  &&  new_size != 0) {
446         memmove(&s_WBToDelList[0], &s_WBToDelList[was_del_size],
447                 new_size * sizeof(SNCBlobVerData*));
448     }
449     s_WBToDelList.resize(new_size, NULL);
450 }
451 
452 
453 
SWriteBackData(void)454 SWriteBackData::SWriteBackData(void)
455     : cur_size(0),
456       releasable_size(0),
457       releasing_size(0)
458 {
459     to_add_list = new vector<SNCBlobVerData*>;
460     to_del_list = new vector<SNCBlobVerData*>;
461 }
462 
463 
CWriteBackControl(void)464 CWriteBackControl::CWriteBackControl(void)
465 {
466 #if __NC_TASKS_MONITOR
467     m_TaskName = "CWriteBackControl";
468 #endif
469 }
470 
~CWriteBackControl(void)471 CWriteBackControl::~CWriteBackControl(void)
472 {}
473 
474 void
Initialize(void)475 CWriteBackControl::Initialize(void)
476 {
477     s_VersMap = new TVerDataMap();
478     s_WBData = new SWriteBackData[CTaskServer::GetMaxRunningThreads()];
479     s_WBControl = new CWriteBackControl();
480     s_WBControl->SetRunnable();
481 
482     s_BulkCleaner = new CBulkCleaner;
483     s_BulkCleaner->SetRunnable();
484 }
485 
486 void
ExecuteSlice(TSrvThreadNum)487 CWriteBackControl::ExecuteSlice(TSrvThreadNum /* thr_num */)
488 {
489 // monitor write-back cache statistics (where blobs are stored before being saved on disk)
490 
491     if (CTaskServer::IsInShutdown())
492         return;
493 
494     ssize_t was_cur_size = s_WBCurSize;
495     Uint4 was_del_size = Uint4(s_WBToDelList.size());
496 
497     Uint4 cnt_datas = CTaskServer::GetMaxRunningThreads();
498     for (Uint4 i = 0; i < cnt_datas; ++i) {
499 // collect stat from all threads
500         s_CollectWBData(&s_WBData[i]);
501     }
502 
503 // delete SNCBlobVerData
504     s_ProcessWBAddDel(was_del_size);
505 
506     size_t cur_size_change = 0;
507     if (s_WBCurSize > was_cur_size)
508         cur_size_change = s_WBCurSize - was_cur_size;
509     size_t soft_limit = s_WBSoftSizeLimit;
510     if (cur_size_change < soft_limit)
511         soft_limit -= cur_size_change;
512     Uint4 cnt_cons = ACCESS_ONCE(s_CntConsumers);
513     if (soft_limit > cnt_cons * kNCMaxBlobChunkSize)
514         soft_limit -= cnt_cons * kNCMaxBlobChunkSize;
515 
516     s_ReleaseMemory(soft_limit);
517     s_NotifyConsumers();
518 
519     RunAfter(1);
520 }
521 
522 void
ReadState(SNCStateStat & state)523 CWriteBackControl::ReadState(SNCStateStat& state)
524 {
525     state.wb_size = (ssize_t(s_WBCurSize) > 0? s_WBCurSize: 0);
526     state.wb_releasable = (ssize_t(s_WBReleasableSize) > 0? s_WBReleasableSize: 0);
527     state.wb_releasing = (ssize_t(s_WBReleasingSize) > 0? s_WBReleasingSize: 0);
528 
529     state.cnt_another_server_main = s_AnotherServerMain;
530     Uint8 prev = s_BlobSync;
531     if (prev != 0) {
532         state.avg_tdiff_blobcopy = s_BlobSyncTDiff / prev;
533         state.max_tdiff_blobcopy = s_BlobSyncMaxTDiff;
534     } else {
535         state.avg_tdiff_blobcopy = 0;
536         state.max_tdiff_blobcopy = 0;
537     }
538     prev = s_BlobNotify;
539     if (prev != 0) {
540         state.avg_tdiff_blobnotify = s_BlobNotifyTDiff / prev;
541         state.max_tdiff_blobnotify = s_BlobNotifyMaxTDiff;
542     } else {
543         state.avg_tdiff_blobnotify = 0;
544         state.max_tdiff_blobnotify = 0;
545     }
546 }
547 
AnotherServerMain(void)548 void CWriteBackControl::AnotherServerMain(void)
549 {
550     AtomicAdd( s_AnotherServerMain, 1);
551 }
552 
StartSyncBlob(Uint8 create_time)553 void CWriteBackControl::StartSyncBlob(Uint8 create_time)
554 {
555     if (!CNCServer::IsInitiallySynced()) {
556         return;
557     }
558     Uint8 tdiff = CSrvTime::Current().AsUSec() - create_time;
559     Uint8 prev = s_BlobSync;
560     Uint8 prevdiff = s_BlobSyncTDiff;
561     AtomicAdd(s_BlobSync, 1);
562     AtomicAdd(s_BlobSyncTDiff, tdiff);
563     if (prev > s_BlobSync || prevdiff > s_BlobSyncTDiff) {
564         s_BlobSync = 0;
565         s_BlobSyncTDiff = 0;
566         s_BlobSyncMaxTDiff = 0;
567     }
568     if (s_BlobSyncMaxTDiff < tdiff) {
569         s_BlobSyncMaxTDiff = tdiff;
570     }
571 }
572 
573 void
RecordNotifyUpdateBlob(Uint8 update_received)574 CWriteBackControl::RecordNotifyUpdateBlob(Uint8 update_received)
575 {
576     if (!CNCServer::IsInitiallySynced()) {
577         return;
578     }
579     Uint8 tdiff = CSrvTime::Current().AsUSec() - update_received;
580     Uint8 prev = s_BlobNotify;
581     Uint8 prevdiff = s_BlobNotifyTDiff;
582     AtomicAdd(s_BlobNotify, 1);
583     AtomicAdd(s_BlobNotifyTDiff, tdiff);
584     if (prev > s_BlobNotify || prevdiff > s_BlobNotifyTDiff) {
585         s_BlobNotify = 0;
586         s_BlobNotifyTDiff = 0;
587         s_BlobNotifyMaxTDiff = 0;
588     }
589     if (s_BlobNotifyMaxTDiff < tdiff) {
590         s_BlobNotifyMaxTDiff = tdiff;
591     }
592 }
593 
594 void
ResetStatCounters(void)595 CWriteBackControl::ResetStatCounters(void)
596 {
597     s_BlobSync = 0;
598     s_BlobSyncTDiff = 0;
599     s_BlobSyncMaxTDiff = 0;
600 }
601 
602 
603 void
x_DeleteCurVersion(void)604 CNCBlobVerManager::x_DeleteCurVersion(void)
605 {
606     m_CacheData->coord.clear();
607     m_CacheData->dead_time = 0;
608     CNCBlobStorage::ChangeCacheDeadTime(m_CacheData);
609     m_CacheData->expire = 0;
610     if (m_CurVersion) {
611         m_CurVersion->SetNotCurrent();
612         m_CurVersion.Reset();
613     }
614 }
615 
616 void
DeleteVersion(const SNCBlobVerData * ver_data)617 CNCBlobVerManager::DeleteVersion(const SNCBlobVerData* ver_data)
618 {
619     m_CacheData->lock.Lock();
620     _ASSERT(m_CacheData->Get_ver_mgr() == this);
621     if (m_CurVersion == ver_data)
622         x_DeleteCurVersion();
623     m_CacheData->lock.Unlock();
624 }
625 
626 void
DeleteDeadVersion(int)627 CNCBlobVerManager::DeleteDeadVersion(int /*cut_time*/)
628 {
629     m_CacheData->lock.Lock();
630     _ASSERT(m_CacheData->Get_ver_mgr() == this);
631 #if 0
632     CSrvRef<SNCBlobVerData> cur_ver(m_CurVersion);
633     if (m_CurVersion) {
634         if (m_CurVersion->dead_time <= cut_time)
635             x_DeleteCurVersion();
636     }
637     else if (!m_CacheData->coord.empty()  &&  m_CacheData->dead_time <= cut_time) {
638         x_DeleteCurVersion();
639     }
640 #else
641     x_DeleteCurVersion();
642 #endif
643     m_CacheData->lock.Unlock();
644 }
645 
CNCBlobVerManager(Uint2 time_bucket,const string & key,SNCCacheData * cache_data)646 CNCBlobVerManager::CNCBlobVerManager(Uint2         time_bucket,
647                                      const string& key,
648                                      SNCCacheData* cache_data)
649     : m_TimeBucket(time_bucket),
650       m_NeedReleaseMem(false),
651       m_NeedAbort(false),
652       m_CacheData(cache_data),
653       m_CurVerReader(new CCurVerReader(this)),
654       m_Key(key)
655 {
656     CNCBlobStorage::ReferenceCacheData(m_CacheData);
657 //    m_CacheData->ver_mgr = this;
658     if (!AtomicCAS(cache_data->ver_mgr, nullptr, this)) {
659 #ifdef _DEBUG
660 CNCAlerts::Register(CNCAlerts::eDebugCacheFailedMgrAttach,"CNCBlobVerManager ctor");
661 #endif
662     }
663     //AtomicAdd(s_CntMgrs, 1);
664     //Uint8 cnt = AtomicAdd(s_CntMgrs, 1);
665     //INFO("CNCBlobVerManager, cnt=" << cnt);
666 #if __NC_TASKS_MONITOR
667     m_TaskName = "CNCBlobVerManager";
668 #endif
669     SetPriority(s_TaskPriorityWbMemRelease);
670 }
671 
~CNCBlobVerManager(void)672 CNCBlobVerManager::~CNCBlobVerManager(void)
673 {
674     //AtomicSub(s_CntMgrs, 1);
675     //Uint8 cnt = AtomicSub(s_CntMgrs, 1);
676     //INFO("~CNCBlobVerManager, cnt=" << cnt);
677 }
678 
679 void
ObtainReference(void)680 CNCBlobVerManager::ObtainReference(void)
681 {
682     AddReference();
683     if (ReferencedOnlyOnce()) {
684         m_NeedReleaseMem = false;
685         if (m_CurVersion)
686             m_CurVersion->SetNonReleasable();
687     }
688 }
689 
690 CNCBlobVerManager*
Get(Uint2 time_bucket,const string & key,SNCCacheData * cache_data,bool for_new_version)691 CNCBlobVerManager::Get(Uint2         time_bucket,
692                        const string& key,
693                        SNCCacheData* cache_data,
694                        bool          for_new_version)
695 {
696     cache_data->lock.Lock();
697     CNCBlobVerManager* mgr = cache_data->Get_ver_mgr();
698     if (mgr) {
699         mgr->ObtainReference();
700     }
701     else if (for_new_version  ||  !cache_data->coord.empty()) {
702         mgr = new CNCBlobVerManager(time_bucket, key, cache_data);
703         mgr->AddReference();
704         s_AddCurrentMem(kVerManagerSize);
705     }
706     cache_data->lock.Unlock();
707 
708     return mgr;
709 }
710 
711 void
Release(void)712 CNCBlobVerManager::Release(void)
713 {
714     SNCCacheData* cache_data = m_CacheData;
715     cache_data->lock.Lock();
716     // DeleteThis below should be executed under the lock
717     RemoveReference();
718     cache_data->lock.Unlock();
719 }
720 
721 void
DeleteThis(void)722 CNCBlobVerManager::DeleteThis(void)
723 {
724     if (m_CurVersion)
725         m_CurVersion->SetReleasable();
726     SetRunnable();
727 }
728 
729 void
x_ReleaseMgr(void)730 CNCBlobVerManager::x_ReleaseMgr(void)
731 {
732     SNCCacheData*  cache_data = m_CacheData;
733     m_CacheData = nullptr;
734     if (cache_data) {
735 //        cache_data->ver_mgr = NULL;
736         if (!AtomicCAS(cache_data->ver_mgr, this, nullptr)) {
737 #ifdef _DEBUG
738 CNCAlerts::Register(CNCAlerts::eDebugCacheFailedMgrDetach,"x_ReleaseMgr");
739 if (cache_data->ver_mgr != nullptr) {
740 CNCAlerts::Register(CNCAlerts::eDebugCacheWrongMgr,"x_ReleaseMgr");
741 }
742 #endif
743         }
744         if (m_CurVersion) {
745             if (!m_NeedAbort || m_Key == cache_data->key) {
746                 if (!cache_data->coord.empty() && cache_data->coord != m_CurVersion->coord) {
747 #ifdef _DEBUG
748 CNCAlerts::Register(CNCAlerts::eDebugCacheDeleted1,"x_ReleaseMgr");
749 #endif
750                     CExpiredCleaner::x_DeleteData(cache_data);
751                 }
752                 cache_data->coord = m_CurVersion->coord;
753             }
754             m_CurVersion.Reset();
755         }
756         else {
757             s_SubCurrentMem(kVerManagerSize);
758         }
759 
760         if (cache_data->dead_time == 0 && !cache_data->coord.empty()) {
761 #ifdef _DEBUG
762 CNCAlerts::Register(CNCAlerts::eDebugCacheDeleted2,"x_ReleaseMgr");
763 #endif
764             CExpiredCleaner::x_DeleteData(cache_data);
765         }
766         cache_data->lock.Unlock();
767         if (!m_NeedAbort) {
768             CNCBlobStorage::ReleaseCacheData(cache_data);
769         }
770     } else {
771         s_SubCurrentMem(kVerManagerSize);
772     }
773     if (!Referenced()) {
774         m_CurVerReader->Terminate();
775         m_CurVerReader = nullptr;
776         Terminate();
777     }
778 }
779 
780 void
ExecuteSlice(TSrvThreadNum)781 CNCBlobVerManager::ExecuteSlice(TSrvThreadNum /* thr_num */)
782 {
783 // CNCBlobAccessor has a reference to this one.
784 // for each blob key there is only one CNCBlobVerManager.
785 
786     CSrvRef<SNCBlobVerData> cur_ver;
787 
788     m_CacheData->lock.Lock();
789 
790     if (m_CacheData->Get_ver_mgr() != this) {
791         m_NeedAbort = true;
792 #ifdef _DEBUG
793 CNCAlerts::Register(CNCAlerts::eDebugCacheWrong,"CNCBlobVerManager::ExecuteSlice");
794 #endif
795         x_ReleaseMgr();
796         return;
797     }
798 
799     cur_ver = m_CurVersion;
800     if (!cur_ver  &&  !Referenced()) {
801         x_ReleaseMgr();
802         return;
803     }
804     if (!cur_ver) {
805         m_CacheData->lock.Unlock();
806         return;
807     }
808 
809 // initially, blob is in memory
810 // check if it is time to be saved onto disk
811     int cur_time = CSrvTime::CurSecs();
812     int write_time = ACCESS_ONCE(cur_ver->need_write_time);
813 
814 #if 1
815     if (cur_time + 60  >= cur_ver->dead_time) {
816         write_time = 0;
817     }
818 #endif
819 
820     if (write_time != 0) {
821         m_CacheData->lock.Unlock();
822         if (write_time <= cur_time  ||  CTaskServer::IsInShutdown()) {
823             if (!CNCBlobStorage::IsAbandoned()) {
824                 CNCBlobStorage::ReferenceCacheData(m_CacheData);
825                 cur_ver->RequestDataWrite();
826             }
827         } else {
828             RunAfter(write_time - cur_time);
829         }
830         return;
831     }
832 
833 // if requested, remove metadata from memory
834     if (m_NeedReleaseMem  &&  !Referenced()) {
835         x_ReleaseMgr();
836         return;
837     }
838     m_NeedReleaseMem = false;
839 
840     m_CacheData->lock.Unlock();
841 }
842 
RevokeDataWrite()843 void CNCBlobVerManager::RevokeDataWrite()
844 {
845     CNCBlobStorage::ReleaseCacheData(m_CacheData);
846 }
847 
DataWritten(void)848 void CNCBlobVerManager::DataWritten(void)
849 {
850     if (m_CurVersion) {
851         m_CacheData->lock.Lock();
852         if (!m_CacheData->coord.empty() && m_CacheData->coord != m_CurVersion->coord) {
853 #ifdef _DEBUG
854 CNCAlerts::Register(CNCAlerts::eDebugCacheDeleted3,"DataWritten");
855 #endif
856             CExpiredCleaner::x_DeleteData(m_CacheData);
857         }
858         m_CacheData->coord = m_CurVersion->coord;
859         if (m_CacheData->dead_time == 0 && !m_CacheData->coord.empty()) {
860 #ifdef _DEBUG
861 CNCAlerts::Register(CNCAlerts::eDebugCacheDeleted4,"DataWritten");
862 #endif
863             CExpiredCleaner::x_DeleteData(m_CacheData);
864         }
865         m_CacheData->lock.Unlock();
866         CNCBlobStorage::ReleaseCacheData(m_CacheData);
867     }
868 }
869 
870 void
RequestMemRelease(void)871 CNCBlobVerManager::RequestMemRelease(void)
872 {
873     m_CacheData->lock.Lock();
874     if (!Referenced()) {
875         m_NeedReleaseMem = true;
876         SetRunnable();
877     }
878     m_CacheData->lock.Unlock();
879 }
880 
881 CSrvRef<SNCBlobVerData>
GetCurVersion(void)882 CNCBlobVerManager::GetCurVersion(void)
883 {
884     CSrvRef<SNCBlobVerData> to_del_ver;
885     CSrvRef<SNCBlobVerData> cur_ver;
886 
887     m_CacheData->lock.Lock();
888     _ASSERT(m_CacheData->Get_ver_mgr() == this);
889     if (m_CurVersion
890         &&  m_CurVersion->dead_time <= CSrvTime::CurSecs())
891     {
892         to_del_ver = m_CurVersion;
893         x_DeleteCurVersion();
894     }
895     cur_ver = m_CurVersion;
896     m_CacheData->lock.Unlock();
897 
898     return cur_ver;
899 }
900 
901 CSrvRef<SNCBlobVerData>
CreateNewVersion(void)902 CNCBlobVerManager::CreateNewVersion(void)
903 {
904     SNCBlobVerData* data = new SNCBlobVerData(this);
905 //    data->manager       = this;
906     data->coord.clear();
907     data->data_coord.clear();
908     data->create_time   = 0;
909     data->expire        = 0;
910     data->dead_time     = 0;
911     data->size          = 0;
912     data->blob_ver      = 0;
913     data->chunk_size    = kNCMaxBlobChunkSize;
914     data->map_size      = kNCMaxChunksInMap;
915     data->meta_mem      = s_CalcVerDataSize(data);
916     s_AddCurrentMem(data->meta_mem);
917     return SrvRef(data);
918 }
919 
920 void
FinalizeWriting(SNCBlobVerData * ver_data)921 CNCBlobVerManager::FinalizeWriting(SNCBlobVerData* ver_data)
922 {
923     CSrvRef<SNCBlobVerData> old_ver(ver_data);
924 
925     m_CacheData->lock.Lock();
926     _ASSERT(m_CacheData->Get_ver_mgr() == this);
927     if (ver_data->dead_time > CSrvTime::CurSecs()
928         &&  s_IsCurVerOlder(m_CurVersion, ver_data))
929     {
930         old_ver.Swap(m_CurVersion);
931         m_CacheData->coord = m_CurVersion->coord;
932         m_CacheData->dead_time = m_CurVersion->dead_time;
933         if (m_CacheData->saved_dead_time != m_CacheData->dead_time) {
934             CNCBlobStorage::ChangeCacheDeadTime(m_CacheData);
935         }
936         m_CacheData->create_id = m_CurVersion->create_id;
937         m_CacheData->create_server = m_CurVersion->create_server;
938         m_CacheData->create_time = m_CurVersion->create_time;
939         m_CacheData->expire = m_CurVersion->expire;
940         m_CacheData->ver_expire = m_CurVersion->ver_expire;
941         m_CacheData->size = m_CurVersion->size;
942         m_CacheData->chunk_size = m_CurVersion->chunk_size;
943         m_CacheData->map_size = m_CurVersion->map_size;
944 
945         m_CurVersion->meta_has_changed = true;
946         m_CurVersion->last_access_time = CSrvTime::CurSecs();
947         m_CurVersion->need_write_time = m_CurVersion->last_access_time
948                                         + s_WBWriteTimeout;
949         if (old_ver)
950             old_ver->SetNotCurrent();
951         m_CurVersion->SetCurrent();
952 
953         SetRunnable();
954     }
955     m_CacheData->lock.Unlock();
956 
957     old_ver.Reset();
958 }
959 
960 void
DeadTimeChanged(SNCBlobVerData * ver_data)961 CNCBlobVerManager::DeadTimeChanged(SNCBlobVerData* ver_data)
962 {
963     m_CacheData->lock.Lock();
964     if (m_CurVersion == ver_data) {
965         m_CacheData->dead_time = ver_data->dead_time;
966         m_CacheData->expire = ver_data->expire;
967         m_CacheData->ver_expire = ver_data->ver_expire;
968         m_CurVersion->last_access_time = CSrvTime::CurSecs();
969         m_CurVersion->need_write_time = m_CurVersion->last_access_time
970                                         + s_WBWriteTimeout;
971         m_CurVersion->meta_has_changed = true;
972 
973         SetRunnable();
974     }
975     m_CacheData->lock.Unlock();
976 }
977 
978 
CCurVerReader(CNCBlobVerManager * mgr)979 CCurVerReader::CCurVerReader(CNCBlobVerManager* mgr)
980     : m_VerMgr(mgr)
981 {
982 #if __NC_TASKS_MONITOR
983     m_TaskName = "CCurVerReader";
984 #endif
985 }
986 
~CCurVerReader(void)987 CCurVerReader::~CCurVerReader(void)
988 {}
989 
990 void
ExecuteSlice(TSrvThreadNum thr_num)991 CCurVerReader::ExecuteSlice(TSrvThreadNum thr_num)
992 {
993     if (IsTransStateFinal())
994         return;
995 
996 // read current blob metadata from db
997 
998     SNCCacheData* cache_data = m_VerMgr->m_CacheData;
999     if (cache_data->coord.empty()) {
1000         FinishTransition();
1001         return;
1002     }
1003 
1004     SNCBlobVerData* ver_data = new SNCBlobVerData(m_VerMgr);
1005 //    ver_data->manager = m_VerMgr;
1006     ver_data->coord = cache_data->coord;
1007     ver_data->create_time = cache_data->create_time;
1008     ver_data->size = cache_data->size;
1009     ver_data->chunk_size = cache_data->chunk_size;
1010     ver_data->map_size = cache_data->map_size;
1011     ver_data->is_cur_version = true;
1012     ver_data->last_access_time = CSrvTime::CurSecs();
1013     if (ver_data->size != 0) {
1014         ver_data->cnt_chunks = (ver_data->size - 1) / ver_data->chunk_size + 1;
1015         ver_data->chunks.resize(ver_data->cnt_chunks, NULL);
1016     }
1017     ver_data->cur_chunk_num = ver_data->cnt_chunks;
1018     ver_data->meta_mem = s_CalcVerDataSize(ver_data);
1019     s_AddCurrentMem(ver_data->meta_mem);
1020     ver_data->meta_mem += kVerManagerSize;
1021     m_VerMgr->m_CurVersion = ver_data;
1022     if (!CNCBlobStorage::ReadBlobInfo(ver_data)) {
1023         SRV_LOG(Error, "Problem reading meta-information about blob "
1024                           << CNCBlobKeyLight(m_VerMgr->m_Key).KeyForLogs());
1025         CSrvRef<SNCBlobVerData> cur_ver(ver_data);
1026         m_VerMgr->DeleteVersion(ver_data);
1027     }
1028 
1029     FinishTransition();
1030 }
1031 
1032 
SNCChunkMaps(Uint2 map_size)1033 SNCChunkMaps::SNCChunkMaps(Uint2 map_size)
1034 {
1035     size_t mem_size = (char*)maps[0]->coords - (char*)maps[0]
1036                       + map_size * sizeof(maps[0]->coords[0]);
1037     for (Uint1 i = 0; i <= kNCMaxBlobMapsDepth; ++i) {
1038         maps[i] = (SNCChunkMapInfo*)calloc(mem_size, 1);
1039     }
1040 }
1041 
~SNCChunkMaps(void)1042 SNCChunkMaps::~SNCChunkMaps(void)
1043 {
1044     for (Uint1 i = 0; i <= kNCMaxBlobMapsDepth; ++i) {
1045         free(maps[i]);
1046     }
1047 }
1048 
1049 
CWBMemDeleter(char * mem,Uint4 mem_size)1050 CWBMemDeleter::CWBMemDeleter(char* mem, Uint4 mem_size)
1051     : m_Mem(mem),
1052       m_MemSize(mem_size)
1053 {}
1054 
~CWBMemDeleter(void)1055 CWBMemDeleter::~CWBMemDeleter(void)
1056 {}
1057 
1058 void
ExecuteRCU(void)1059 CWBMemDeleter::ExecuteRCU(void)
1060 {
1061     s_FreeWriteBackMem(m_Mem, m_MemSize, m_MemSize);
1062     delete this;
1063 }
1064 
1065 
SNCBlobVerData(CNCBlobVerManager * mgr)1066 SNCBlobVerData::SNCBlobVerData(CNCBlobVerManager* mgr)
1067     :   size(0),
1068         create_time(0),
1069         create_server(0),
1070         create_id(0),
1071         updated_on_server(0),
1072         updated_at_time(0),
1073         update_received(0),
1074         ttl(0),
1075         expire(0),
1076         dead_time(0),
1077         blob_ver(0),
1078         ver_ttl(0),
1079         ver_expire(0),
1080         chunk_size(0),
1081         map_size(0),
1082         map_depth(0),
1083         has_error(false),
1084         is_cur_version(false),
1085         meta_has_changed(false),
1086         move_or_rewrite(false),
1087         is_releasable(false),
1088         request_data_write(false),
1089         need_write_all(false),
1090         need_stop_write(false),
1091         need_mem_release(false),
1092         delete_scheduled(false),
1093         map_move_counter(0),
1094         last_access_time(0),
1095         need_write_time(0),
1096         cnt_chunks(0),
1097         cur_chunk_num(0),
1098         chunk_maps(NULL),
1099         ver_manager(mgr),
1100         saved_access_time(0),
1101         meta_mem(0),
1102         data_mem(0),
1103         releasable_mem(0),
1104         releasing_mem(0)
1105 {
1106     //AtomicAdd(s_CntVers, 1);
1107     //Uint8 cnt = AtomicAdd(s_CntVers, 1);
1108     //INFO("SNCBlobVerData, cnt=" << cnt);
1109 #if __NC_TASKS_MONITOR
1110     m_TaskName = "SNCBlobVerData";
1111 #endif
1112     SetPriority(s_TaskPriorityWbMemRelease);
1113 }
1114 
~SNCBlobVerData(void)1115 SNCBlobVerData::~SNCBlobVerData(void)
1116 {
1117     if (chunk_maps) {
1118         SRV_FATAL("chunk_maps not released");
1119     }
1120 
1121     //AtomicSub(s_CntVers, 1);
1122     //Uint8 cnt = AtomicSub(s_CntVers, 1);
1123     //INFO("~SNCBlobVerData, cnt=" << cnt);
1124 }
1125 
1126 void
DeleteThis(void)1127 SNCBlobVerData::DeleteThis(void)
1128 {
1129     wb_mem_lock.Lock();
1130     if (!is_cur_version) {
1131         s_AddReleasingMem(releasable_mem + meta_mem, releasable_mem);
1132         releasing_mem += releasable_mem + meta_mem;
1133         releasable_mem = 0;
1134         need_mem_release = true;
1135     }
1136     else if (releasable_mem != 0) {
1137         s_AddReleasingMem(releasable_mem, releasable_mem);
1138         releasing_mem += releasable_mem;
1139         releasable_mem = 0;
1140     }
1141     ver_manager = NULL;
1142     wb_mem_lock.Unlock();
1143 
1144     SetRunnable();
1145 }
1146 
1147 void
RequestDataWrite(void)1148 SNCBlobVerData::RequestDataWrite(void)
1149 {
1150     wb_mem_lock.Lock();
1151     request_data_write = true;
1152     need_write_all = true;
1153     need_write_time = 0;
1154     size_t mem_size = releasable_mem;
1155     if (is_releasable)
1156         mem_size -= meta_mem;
1157     s_AddReleasingMem(mem_size, mem_size);
1158     releasing_mem += mem_size;
1159     releasable_mem -= mem_size;
1160     wb_mem_lock.Unlock();
1161 
1162     SetRunnable();
1163 }
1164 
1165 size_t
RequestMemRelease(void)1166 SNCBlobVerData::RequestMemRelease(void)
1167 {
1168     wb_mem_lock.Lock();
1169     size_t mem_size = releasable_mem;
1170     releasing_mem += mem_size;
1171     releasable_mem = 0;
1172     need_write_time = 0;
1173     need_mem_release = true;
1174     wb_mem_lock.Unlock();
1175 
1176     SetRunnable();
1177     return mem_size;
1178 }
1179 
1180 void
SetNotCurrent(void)1181 SNCBlobVerData::SetNotCurrent(void)
1182 {
1183     wb_mem_lock.Lock();
1184     if (request_data_write) {
1185 #ifdef _DEBUG
1186 CNCAlerts::Register(CNCAlerts::eDebugDeleteSNCBlobVerData,"SetNotCurrent");
1187 #endif
1188         request_data_write = false;
1189         ver_manager->RevokeDataWrite();
1190     }
1191     is_cur_version = false;
1192     need_stop_write = true;
1193     meta_mem -= kVerManagerSize;
1194     wb_mem_lock.Unlock();
1195 
1196     SetRunnable();
1197 }
1198 
1199 void
SetCurrent(void)1200 SNCBlobVerData::SetCurrent(void)
1201 {
1202     wb_mem_lock.Lock();
1203     is_cur_version = true;
1204     meta_mem += kVerManagerSize;
1205     wb_mem_lock.Unlock();
1206 }
1207 
1208 void
SetReleasable(void)1209 SNCBlobVerData::SetReleasable(void)
1210 {
1211     last_access_time = CSrvTime::CurSecs();
1212 
1213     wb_mem_lock.Lock();
1214     is_releasable = true;
1215     releasable_mem += meta_mem;
1216     s_AddReleasableMem(this, meta_mem, 0);
1217     wb_mem_lock.Unlock();
1218 }
1219 
1220 void
SetNonReleasable(void)1221 SNCBlobVerData::SetNonReleasable(void)
1222 {
1223     last_access_time = CSrvTime::CurSecs();
1224 
1225     wb_mem_lock.Lock();
1226     is_releasable = false;
1227     if (releasable_mem != 0) {
1228         if (releasable_mem < meta_mem) {
1229             SRV_FATAL("blob ver data broken");
1230         }
1231         releasable_mem -= meta_mem;
1232         s_SubReleasableMem(meta_mem);
1233     }
1234     else {
1235         if (releasing_mem < meta_mem) {
1236             SRV_FATAL("blob ver data broken");
1237         }
1238         releasing_mem -= meta_mem;
1239         need_mem_release = false;
1240         s_SubReleasingMem(meta_mem);
1241     }
1242     wb_mem_lock.Unlock();
1243 }
1244 
1245 void
x_FreeChunkMaps(void)1246 SNCBlobVerData::x_FreeChunkMaps(void)
1247 {
1248     if (chunk_maps) {
1249         s_SubCurrentMem(s_CalcChunkMapsSize(map_size));
1250         delete chunk_maps;
1251         chunk_maps = NULL;
1252     }
1253 }
1254 
1255 bool
x_WriteBlobInfo(void)1256 SNCBlobVerData::x_WriteBlobInfo(void)
1257 {
1258     if (!meta_has_changed)
1259         return true;
1260 
1261     CNCBlobVerManager* mgr = ACCESS_ONCE(ver_manager);
1262     if (!mgr) {
1263         need_stop_write = true;
1264         return true;
1265     }
1266 
1267     if (move_or_rewrite  ||  !AtomicCAS(move_or_rewrite, false, true)) {
1268         need_write_all = true;
1269         return true;
1270     }
1271 
1272     meta_has_changed = false;
1273     bool new_write = coord.empty();
1274     if (!CNCBlobStorage::WriteBlobInfo(mgr->GetKey(), this, chunk_maps, cnt_chunks, mgr->GetCacheData()))
1275     {
1276 #ifdef _DEBUG
1277 CNCAlerts::Register(CNCAlerts::eDebugWriteBlobInfoFailed,"x_WriteBlobInfo");
1278 #endif
1279         meta_has_changed = true;
1280         move_or_rewrite = false;
1281         need_write_all = true;
1282         RunAfter(s_WBFailedWriteDelay);
1283         return false;
1284     }
1285     if (new_write) {
1286         CNCStat::DiskBlobWrite(size);
1287     }
1288     x_FreeChunkMaps();
1289 
1290     move_or_rewrite = false;
1291     return true;
1292 }
1293 
1294 bool
x_WriteCurChunk(char * write_mem,Uint4 write_size)1295 SNCBlobVerData::x_WriteCurChunk(char* write_mem, Uint4 write_size)
1296 {
1297     if (!chunk_maps) {
1298         chunk_maps = new SNCChunkMaps(map_size);
1299         s_AddCurrentMem(s_CalcChunkMapsSize(map_size));
1300     }
1301     CNCBlobVerManager* mgr = ACCESS_ONCE(ver_manager);
1302     if (!mgr) {
1303         need_stop_write = true;
1304         return true;
1305     }
1306     char* new_mem = CNCBlobStorage::WriteChunkData(
1307                                         this, chunk_maps, mgr->GetCacheData(),
1308                                         cur_chunk_num, write_mem, write_size);
1309     if (!new_mem) {
1310         RunAfter(s_WBFailedWriteDelay);
1311         return false;
1312     }
1313     CNCStat::DiskDataWrite(write_size);
1314 
1315     wb_mem_lock.Lock();
1316     chunks[cur_chunk_num] = new_mem;
1317     ++cur_chunk_num;
1318     if (data_mem < write_size) {
1319         SRV_FATAL("blob ver data broken");
1320     }
1321     data_mem -= write_size;
1322     if (releasing_mem != 0) {
1323         if (releasing_mem < write_size) {
1324             SRV_FATAL("blob ver data broken");
1325         }
1326         releasing_mem -= write_size;
1327     }
1328     else {
1329         if (releasable_mem < write_size) {
1330             SRV_FATAL("blob ver data broken");
1331         }
1332         releasable_mem -= write_size;
1333         // memory will be subtracted from global releasing after call to
1334         // deleter below
1335         s_AddReleasingMem(write_size, write_size);
1336     }
1337     wb_mem_lock.Unlock();
1338 
1339     CWBMemDeleter* deleter = new CWBMemDeleter(write_mem, write_size);
1340     deleter->CallRCU();
1341 
1342     return true;
1343 }
1344 
1345 bool
x_ExecuteWriteAll(void)1346 SNCBlobVerData::x_ExecuteWriteAll(void)
1347 {
1348     wb_mem_lock.Lock();
1349 
1350     if (need_stop_write) {
1351         need_write_all = false;
1352         need_stop_write = false;
1353         if (!need_mem_release  &&  releasing_mem != 0) {
1354             s_AddReleasableMem(this, releasing_mem, releasing_mem);
1355             releasable_mem += releasing_mem;
1356             releasing_mem = 0;
1357         }
1358         wb_mem_lock.Unlock();
1359         return false;
1360     }
1361     else if (cur_chunk_num == cnt_chunks) {
1362         need_write_all = false;
1363         wb_mem_lock.Unlock();
1364 
1365         if (x_WriteBlobInfo())
1366             SetRunnable();
1367         return true;
1368     }
1369     char* write_mem = chunks[cur_chunk_num];
1370     Uint4 write_size = chunk_size;
1371     if (cur_chunk_num == cnt_chunks - 1)
1372         write_size = Uint4(min(size - (cnt_chunks - 1) * chunk_size, Uint8(chunk_size)));
1373     wb_mem_lock.Unlock();
1374 
1375     if (x_WriteCurChunk(write_mem, write_size))
1376         SetRunnable();
1377     return true;
1378 }
1379 
1380 void
x_DeleteVersion(void)1381 SNCBlobVerData::x_DeleteVersion(void)
1382 {
1383     CNCBlobStorage::DeleteBlobInfo(this, chunk_maps);
1384     coord.clear();
1385     x_FreeChunkMaps();
1386     if (cur_chunk_num < cnt_chunks) {
1387         for (Uint8 num = cur_chunk_num; num < cnt_chunks - 1; ++num) {
1388             s_FreeWriteBackMem(chunks[num], chunk_size, chunk_size);
1389             if (releasing_mem < chunk_size) {
1390                 SRV_FATAL("blob ver data broken");
1391             }
1392             releasing_mem -= chunk_size;
1393         }
1394         Uint4 last_size = Uint4(min(size - (cnt_chunks - 1) * chunk_size, Uint8(chunk_size)));
1395         s_FreeWriteBackMem(chunks[cnt_chunks - 1], last_size, last_size);
1396         if (releasing_mem < last_size) {
1397             SRV_FATAL("blob ver data broken");
1398         }
1399         releasing_mem -= last_size;
1400         cur_chunk_num = cnt_chunks;
1401     }
1402 }
1403 
1404 void
ExecuteSlice(TSrvThreadNum)1405 SNCBlobVerData::ExecuteSlice(TSrvThreadNum /* thr_num */)
1406 {
1407 // writes blob data and metadata into db
1408     if (need_write_all  &&  x_ExecuteWriteAll())
1409         return;
1410 
1411 // remove blob from memory
1412     wb_mem_lock.Lock();
1413     CNCBlobVerManager* mgr = ACCESS_ONCE(ver_manager);
1414     if (mgr) {
1415         if (request_data_write) {
1416             request_data_write = false;
1417             mgr->DataWritten();
1418         }
1419         if (need_mem_release) {
1420             // if still has something to write, request that
1421             if (data_mem != 0  ||  (meta_has_changed  &&  is_cur_version)) {
1422                 need_write_all = true;
1423 #ifdef _DEBUG
1424 CNCAlerts::Register(CNCAlerts::eDebugExtraWrite,"SNCBlobVerData::ExecuteSlice");
1425 #endif
1426                 wb_mem_lock.Unlock();
1427                 SetRunnable();
1428                 return;
1429             }
1430             if (is_cur_version) {
1431                 wb_mem_lock.Unlock();
1432                 mgr->RequestMemRelease();
1433                 return;
1434             }
1435             need_mem_release = false;
1436         }
1437         wb_mem_lock.Unlock();
1438     }
1439     else {
1440         wb_mem_lock.Unlock();
1441 
1442         if (!is_cur_version)
1443             x_DeleteVersion();
1444 #if 0
1445         if (releasable_mem != 0  ||  releasing_mem != meta_mem) {
1446             SRV_FATAL("blob ver data broken");
1447         }
1448 #endif
1449         if (!delete_scheduled)
1450             s_ScheduleVerDelete(this);
1451     }
1452 }
1453 
1454 void
AddChunkMem(char * mem,Uint4 mem_size)1455 SNCBlobVerData::AddChunkMem(char* mem, Uint4 mem_size)
1456 {
1457     wb_mem_lock.Lock();
1458     size_t old_meta = s_CalcVerDataSize(this);
1459     chunks.push_back(mem);
1460     ++cnt_chunks;
1461     size_t add_meta_size = s_CalcVerDataSize(this) - old_meta;
1462     if (add_meta_size != 0) {
1463         s_AddCurrentMem(add_meta_size);
1464         meta_mem += add_meta_size;
1465     }
1466     data_mem += mem_size;
1467     releasable_mem += mem_size;
1468     s_AddReleasableMem(this, mem_size, 0);
1469     wb_mem_lock.Unlock();
1470 }
1471 
1472 
1473 //static Uint8 s_CntAccs = 0;
1474 
CNCBlobAccessor(void)1475 CNCBlobAccessor::CNCBlobAccessor(void)
1476     : m_ChunkMaps(NULL),
1477       m_MetaInfoReady(false),
1478       m_WriteMemRequested(false),
1479       m_Buffer(NULL)
1480 {
1481 #if __NC_TASKS_MONITOR
1482     m_TaskName = "CNCBlobAccessor";
1483 #endif
1484     //Uint8 cnt = AtomicAdd(s_CntAccs, 1);
1485     //INFO("CNCBlobAccessor, cnt=" << cnt);
1486 }
1487 
~CNCBlobAccessor(void)1488 CNCBlobAccessor::~CNCBlobAccessor(void)
1489 {
1490     if (m_ChunkMaps) {
1491         SRV_FATAL("blob accessor broken");
1492     }
1493 
1494     //Uint8 cnt = AtomicSub(s_CntAccs, 1);
1495     //INFO("~CNCBlobAccessor, cnt=" << cnt);
1496 }
1497 
1498 void
Prepare(const string & key,const string & password,Uint2 time_bucket,ENCAccessType access_type)1499 CNCBlobAccessor::Prepare(const string& key,
1500                          const string& password,
1501                          Uint2         time_bucket,
1502                          ENCAccessType access_type)
1503 {
1504     m_BlobKey       = key;
1505     m_Password      = password;
1506     m_TimeBucket    = time_bucket;
1507     m_AccessType    = access_type;
1508     m_HasError      = false;
1509     m_VerManager    = NULL;
1510     m_CurChunk      = 0;
1511     m_ChunkPos      = 0;
1512     m_SizeRead      = 0;
1513 }
1514 
1515 void
Initialize(SNCCacheData * cache_data)1516 CNCBlobAccessor::Initialize(SNCCacheData* cache_data)
1517 {
1518     if (cache_data) {
1519         bool new_version = m_AccessType == eNCCreate
1520                            ||  m_AccessType == eNCCopyCreate;
1521         m_VerManager = CNCBlobVerManager::Get(m_TimeBucket, cache_data->key,
1522                                               cache_data, new_version);
1523     }
1524 }
1525 
1526 void
Deinitialize(void)1527 CNCBlobAccessor::Deinitialize(void)
1528 {
1529     switch (m_AccessType) {
1530     case eNCReadData:
1531         if (m_ChunkMaps) {
1532             s_SubCurrentMem(s_CalcChunkMapsSize(m_CurData->map_size));
1533             delete m_ChunkMaps;
1534             m_ChunkMaps = NULL;
1535         }
1536         break;
1537     case eNCCreate:
1538     case eNCCopyCreate:
1539         if (m_NewData  &&  m_Buffer) {
1540             s_FreeWriteBackMem(m_Buffer, m_NewData->chunk_size, 0);
1541             m_Buffer = NULL;
1542         }
1543         break;
1544     default:
1545         break;
1546     }
1547 
1548     m_NewData.Reset();
1549     m_CurData.Reset();
1550     if (m_VerManager) {
1551         m_VerManager->Release();
1552         m_VerManager = NULL;
1553     }
1554 }
1555 
1556 void
Release(void)1557 CNCBlobAccessor::Release(void)
1558 {
1559     Deinitialize();
1560     Terminate();
1561 }
1562 
1563 void
RequestMetaInfo(CSrvTask * owner)1564 CNCBlobAccessor::RequestMetaInfo(CSrvTask* owner)
1565 {
1566     m_Owner = owner;
1567     if (m_VerManager) {
1568         m_VerManager->RequestCurVersion(this);
1569     }
1570     else {
1571         m_MetaInfoReady = true;
1572     }
1573 }
1574 
1575 void
ExecuteSlice(TSrvThreadNum)1576 CNCBlobAccessor::ExecuteSlice(TSrvThreadNum /* thr_num */)
1577 {
1578     if (!IsTransFinished())
1579         return;
1580 
1581     if (!m_MetaInfoReady) {
1582 // read blob metadata from db
1583         m_CurData = m_VerManager->GetCurVersion();
1584         m_MetaInfoReady = true;
1585         m_Owner->SetRunnable();
1586     }
1587     else if (m_WriteMemRequested) {
1588 // client sends data, we need memory
1589 // if no memory, wait a bit, then try again
1590         m_Buffer = s_AllocWriteBackMem(m_NewData->chunk_size, this);
1591         if (m_Buffer) {
1592             m_WriteMemRequested = false;
1593             m_Owner->SetRunnable();
1594         }
1595     }
1596 }
1597 
1598 void
x_DelCorruptedVersion(void)1599 CNCBlobAccessor::x_DelCorruptedVersion(void)
1600 {
1601     SRV_LOG(Critical, "Database information about blob "
1602                       << CNCBlobKeyLight(m_BlobKey).KeyForLogs()
1603                       << " is corrupted. Blob will be deleted");
1604     m_VerManager->DeleteVersion(m_CurData);
1605     m_CurData->has_error = true;
1606     m_HasError = true;
1607 }
1608 
1609 Uint4
GetReadMemSize(void)1610 CNCBlobAccessor::GetReadMemSize(void)
1611 {
1612     if (m_CurData->has_error) {
1613         m_HasError = true;
1614         return 0;
1615     }
1616     if (GetPosition() >= m_CurData->size) {
1617         SRV_FATAL("blob accessor broken");
1618     }
1619     if (m_Buffer) {
1620         if (m_ChunkPos < m_ChunkSize) {
1621             m_Buffer = m_CurData->chunks[m_CurChunk];
1622             return m_ChunkSize - m_ChunkPos;
1623         }
1624         ++m_CurChunk;
1625         m_ChunkPos = 0;
1626     }
1627 
1628     Uint8 need_size = m_CurData->size - GetPosition() + m_ChunkPos;
1629     if (need_size > m_CurData->chunk_size)
1630         need_size = m_CurData->chunk_size;
1631 
1632     m_Buffer = ACCESS_ONCE(m_CurData->chunks[m_CurChunk]);
1633     if (m_Buffer) {
1634         m_ChunkSize = Uint4(need_size);
1635         return m_ChunkSize - m_ChunkPos;
1636     }
1637 
1638     if (!m_ChunkMaps) {
1639         m_ChunkMaps = new SNCChunkMaps(m_CurData->map_size);
1640         s_AddCurrentMem(s_CalcChunkMapsSize(m_CurData->map_size));
1641     }
1642     if (!CNCBlobStorage::ReadChunkData(m_CurData, m_ChunkMaps, m_CurChunk,
1643                                        m_Buffer, m_ChunkSize))
1644     {
1645         x_DelCorruptedVersion();
1646         return 0;
1647     }
1648     if (m_ChunkSize != need_size) {
1649         x_DelCorruptedVersion();
1650         return 0;
1651     }
1652 
1653     ACCESS_ONCE(m_CurData->chunks[m_CurChunk]) = m_Buffer;
1654     return m_ChunkSize - m_ChunkPos;
1655 }
1656 
1657 void
MoveReadPos(Uint4 move_size)1658 CNCBlobAccessor::MoveReadPos(Uint4 move_size)
1659 {
1660     m_ChunkPos += move_size;
1661     m_SizeRead += move_size;
1662     if (m_CurData->cur_chunk_num > m_CurChunk
1663         &&  m_Buffer == m_CurData->chunks[m_CurChunk])
1664     {
1665         CNCStat::DiskDataRead(move_size);
1666     }
1667 }
1668 
1669 void
x_CreateNewData(void)1670 CNCBlobAccessor::x_CreateNewData(void)
1671 {
1672     if (!m_NewData) {
1673         m_NewData = m_VerManager->CreateNewVersion();
1674         m_NewData->password = m_Password;
1675     }
1676 }
1677 
1678 size_t
GetWriteMemSize(void)1679 CNCBlobAccessor::GetWriteMemSize(void)
1680 {
1681     x_CreateNewData();
1682     if (m_WriteMemRequested)
1683         return 0;
1684     if (m_Buffer  &&  m_ChunkPos < m_NewData->chunk_size)
1685         return m_NewData->chunk_size - m_ChunkPos;
1686 
1687     if (m_Buffer) {
1688         if (m_ChunkPos != m_NewData->chunk_size) {
1689             SRV_FATAL("blob accessor broken");
1690         }
1691         m_NewData->AddChunkMem(m_Buffer, m_NewData->chunk_size);
1692         m_Buffer = NULL;
1693         ++m_CurChunk;
1694         m_ChunkPos = 0;
1695     }
1696     m_WriteMemRequested = true;
1697     m_Buffer = s_AllocWriteBackMem(m_NewData->chunk_size, this);
1698     if (!m_Buffer)
1699         return 0;
1700 
1701     m_WriteMemRequested = false;
1702     return m_NewData->chunk_size - m_ChunkPos;
1703 }
1704 
1705 void
Finalize(void)1706 CNCBlobAccessor::Finalize(void)
1707 {
1708     if (m_ChunkPos != 0) {
1709         m_Buffer = s_ReallocWriteBackMem(m_Buffer, m_NewData->chunk_size, m_ChunkPos);
1710         m_NewData->AddChunkMem(m_Buffer, m_ChunkPos);
1711         m_Buffer = NULL;
1712     }
1713     m_VerManager->FinalizeWriting(m_NewData);
1714     if (m_CurData.NotNull() && m_CurData->update_received != 0) {
1715         CWriteBackControl::RecordNotifyUpdateBlob(m_CurData->update_received);
1716     }
1717     m_CurData = m_NewData;
1718 }
1719 
1720 bool
ReplaceBlobInfo(const SNCBlobVerData & new_info)1721 CNCBlobAccessor::ReplaceBlobInfo(const SNCBlobVerData& new_info)
1722 {
1723     if (!s_IsCurVerOlder(m_CurData, &new_info)) {
1724         return false;
1725     }
1726 
1727     x_CreateNewData();
1728     m_NewData->create_time = new_info.create_time;
1729     m_NewData->ttl = new_info.ttl;
1730     m_NewData->dead_time = new_info.dead_time;
1731     m_NewData->expire = new_info.expire;
1732     m_NewData->password = new_info.password;
1733     m_NewData->blob_ver = new_info.blob_ver;
1734     m_NewData->ver_ttl = new_info.ver_ttl;
1735     m_NewData->ver_expire = new_info.ver_expire;
1736     m_NewData->create_server = new_info.create_server;
1737     m_NewData->create_id = new_info.create_id;
1738     return true;
1739 }
1740 
1741 string
GetCurPassword(void) const1742 CNCBlobAccessor::GetCurPassword(void) const
1743 {
1744     if (m_CurData->password.empty())
1745         return m_CurData->password;
1746     else
1747         return CNCBlobStorage::PrintablePassword(m_CurData->password);
1748 }
1749 
GetPurgeCount()1750 Uint8 CNCBlobAccessor::GetPurgeCount()
1751 {
1752     return  s_Forgets.size() + s_ForgetKeys.size();
1753 }
1754 
1755 bool
IsPurged(const CNCBlobKeyLight & nc_key) const1756 CNCBlobAccessor::IsPurged(const CNCBlobKeyLight& nc_key) const
1757 {
1758     if (nc_key.Cache().empty()) {
1759         return false;
1760     }
1761     Uint8 cr_time = GetCurBlobCreateTime();
1762     if (cr_time == 0 || cr_time > s_LatestPurge) {
1763         return false;
1764     }
1765     string key(nc_key.Cache());
1766     bool res = false;
1767     s_ConsListLock.Lock();
1768     for (int t=0; !res && t<2; ++t) {
1769         key.append(1,'\1');
1770         TForgets::const_iterator i = s_ForgetKeys.find(key);
1771         if (i != s_ForgetKeys.end()) {
1772             res = cr_time <= i->second;
1773         }
1774         key.append(nc_key.RawKey());
1775     }
1776     s_ConsListLock.Unlock();
1777     return res;
1778 }
1779 
1780 bool
Purge(const CNCBlobKeyLight & nc_key,Uint8 when)1781 CNCBlobAccessor::Purge(const CNCBlobKeyLight& nc_key, Uint8 when)
1782 {
1783     bool res=false;
1784     Uint8 lifespan = 9U * 24U * 3600U * (Uint8)kUSecsPerSecond; // 9 days
1785     Uint8 now = CSrvTime::Current().AsUSec();
1786     Uint8 longago = now > lifespan ? (now-lifespan) : 0;
1787     s_LatestPurge = max(s_LatestPurge, when);
1788 
1789     s_ConsListLock.Lock();
1790     ERASE_ITERATE( TForgets, f, s_Forgets) {
1791         if (f->second < longago) {
1792             s_Forgets.erase(f);
1793             res=true;
1794         }
1795     }
1796     ERASE_ITERATE( TForgets, f, s_ForgetKeys) {
1797         if (f->second < longago) {
1798             s_ForgetKeys.erase(f);
1799             res=true;
1800         }
1801     }
1802     if (when != 0) {
1803         string key(nc_key.Cache());
1804         if (nc_key.RawKey().empty()) {
1805             TForgets::iterator i = s_Forgets.find(key);
1806             if (i == s_Forgets.end()) {
1807                 s_Forgets[key] = when;
1808                 res=true;
1809             } else if (i->second < when) {
1810                 i->second = when;
1811                 res=true;
1812             }
1813             key.append(1,'\1');
1814         } else {
1815             key.append(1,'\1').append(nc_key.RawKey()).append(1,'\1');
1816         }
1817         TForgets::iterator i = s_ForgetKeys.find(key);
1818         if (i == s_ForgetKeys.end()) {
1819             s_ForgetKeys[key] = when;
1820             res=true;
1821         } else if (i->second < when) {
1822             i->second = when;
1823             res=true;
1824         }
1825     }
1826     s_ConsListLock.Unlock();
1827     return res;
1828 }
1829 
GetPurgeData(char separator)1830 string CNCBlobAccessor::GetPurgeData(char separator)
1831 {
1832     string res;
1833     Purge(CNCBlobKeyLight(),0);
1834     s_ConsListLock.Lock();
1835     ITERATE( TForgets, f, s_Forgets) {
1836         res += NStr::NumericToString(f->second);
1837         res += ' ';
1838         res += f->first;
1839         res += separator;
1840     }
1841     s_ConsListLock.Unlock();
1842     return res;
1843 }
1844 
UpdatePurgeData(const string & data,char separator)1845 bool CNCBlobAccessor::UpdatePurgeData(const string& data, char separator)
1846 {
1847     bool res=false, error=false;;
1848     const char *begin = data.data();
1849     const char *end = begin + data.size();
1850     if (end != begin) {
1851         s_ConsListLock.Lock();
1852         while (end != begin) {
1853             Uint8 when = NStr::StringToUInt8(begin,
1854                 NStr::fConvErr_NoThrow | NStr::fAllowTrailingSymbols);
1855             if (when == 0) {
1856                 error=true;
1857                 break;
1858             }
1859             const char *blank = strchr(begin,' ');
1860             if (!blank) {
1861                 error=true;
1862                 break;
1863             }
1864             const char *eol = strchr(blank,separator);
1865             if (!eol) {
1866                 error=true;
1867                 break;
1868             }
1869             string cache(blank+1, eol-blank-1);
1870             TForgets::const_iterator i = s_Forgets.find(cache);
1871             if (i == s_Forgets.end() || i->second < when) {
1872                 s_Forgets[cache] = when;
1873                 res = true;
1874             }
1875             begin = eol + 1;
1876         }
1877         s_ConsListLock.Unlock();
1878     }
1879     if (error) {
1880         SRV_LOG(Error, "Invalid PURGE data: " << data);
1881     }
1882     return res;
1883 }
1884 
SetFailedWriteCount(int failed_write)1885 void CNCBlobAccessor::SetFailedWriteCount(int failed_write)
1886 {
1887     s_FailedListLock.Lock();
1888     s_FailedReserve = failed_write - s_FailedKeys.size();
1889     s_FailMonitor = failed_write > 0;
1890     s_FailedListLock.Unlock();
1891 }
GetFailedWriteCount(void)1892 int CNCBlobAccessor::GetFailedWriteCount(void)
1893 {
1894     return s_FailedReserve;
1895 }
PutFailed(const string & blob_key)1896 void CNCBlobAccessor::PutFailed(const string& blob_key)
1897 {
1898     if (s_FailMonitor) {
1899         s_FailedListLock.Lock();
1900         if (s_FailedReserve > 0) {
1901             s_FailedKeys.insert(blob_key);
1902             --s_FailedReserve;
1903             s_FailedCounter.Add(1);
1904         }
1905         s_FailedListLock.Unlock();
1906     }
1907 }
PutSucceeded(const string & blob_key)1908 void CNCBlobAccessor::PutSucceeded(const string& blob_key)
1909 {
1910     if (s_FailMonitor && s_FailedCounter.Get() != 0) {
1911         s_FailedListLock.Lock();
1912         if (s_FailedKeys.erase(blob_key)) {
1913             ++s_FailedReserve;
1914             s_FailedCounter.Add(-1);
1915         }
1916         s_FailedListLock.Unlock();
1917     }
1918 }
HasPutSucceeded(const string & blob_key)1919 bool CNCBlobAccessor::HasPutSucceeded(const string& blob_key)
1920 {
1921     bool b = false;
1922     if (s_FailMonitor && s_FailedCounter.Get() != 0) {
1923         s_FailedListLock.Lock();
1924         b = s_FailedKeys.find(blob_key) != s_FailedKeys.end();
1925         s_FailedListLock.Unlock();
1926     }
1927     return !b;
1928 }
1929 
1930 /////////////////////////////////////////////////////////////////////////////
CBulkCleaner(void)1931 CBulkCleaner::CBulkCleaner(void)
1932 {
1933     SetState(&CBulkCleaner::x_StartSession);
1934 }
1935 
~CBulkCleaner(void)1936 CBulkCleaner::~CBulkCleaner(void)
1937 {
1938 }
1939 
1940 CBulkCleaner::State
x_StartSession(void)1941 CBulkCleaner::x_StartSession(void)
1942 {
1943     if (CTaskServer::IsInShutdown()) {
1944         return NULL;
1945     }
1946     m_CurBucket = 1;
1947     m_CrTime = 0;
1948     m_BlobAccess = NULL;
1949     s_ConsListLock.Lock();
1950     while (!s_ForgetKeys.empty()) {
1951         TForgets::const_iterator i = s_ForgetKeys.begin();
1952         m_Filter = i->first;
1953         m_CrTime = i->second;
1954         if (m_CrTime != 0) {
1955             break;
1956         }
1957         s_ForgetKeys.erase(i);
1958     }
1959     s_ConsListLock.Unlock();
1960     if (m_CrTime != 0) {
1961         CreateNewDiagCtx();
1962         return &CBulkCleaner::x_FindNext;
1963     }
1964     RunAfter(10);
1965     return NULL;
1966 }
1967 
1968 CBulkCleaner::State
x_FindNext(void)1969 CBulkCleaner::x_FindNext(void)
1970 {
1971     if (CTaskServer::IsInShutdown()) {
1972         return NULL;
1973     }
1974     for (; m_CurBucket <= CNCDistributionConf::GetCntTimeBuckets(); ++m_CurBucket) {
1975         m_Key = CNCBlobStorage::FindBlob(m_CurBucket, m_Filter, m_CrTime);
1976         if (!m_Key.empty()) {
1977             return &CBulkCleaner::x_RequestBlobAccess;
1978         }
1979     }
1980     return &CBulkCleaner::x_FinishSession;
1981 }
1982 
1983 CBulkCleaner::State
x_RequestBlobAccess(void)1984 CBulkCleaner::x_RequestBlobAccess(void)
1985 {
1986     CNCBlobKeyLight nc_key(m_Key);
1987     Uint2 slot, bkt;
1988     if (nc_key.IsICacheKey()) {
1989         CNCDistributionConf::GetSlotByICacheKey(nc_key, slot, bkt);
1990     } else {
1991         // we should never be here
1992         ++m_CurBucket;
1993         return &CBulkCleaner::x_FindNext;
1994     }
1995     m_BlobAccess = CNCBlobStorage::GetBlobAccess( eNCCreate, nc_key.PackedKey(), "", bkt);
1996     m_BlobAccess->RequestMetaInfo(this);
1997     return &CBulkCleaner::x_RemoveBlob;
1998 }
1999 
2000 CBulkCleaner::State
x_RemoveBlob(void)2001 CBulkCleaner::x_RemoveBlob(void)
2002 {
2003     if (CTaskServer::IsInShutdown()) {
2004         return NULL;
2005     }
2006     if (!m_BlobAccess->IsMetaInfoReady()) {
2007         return NULL;
2008     }
2009 
2010 // see
2011 // CNCMessageHandler::x_DoCmd_Remove
2012     if ((!m_BlobAccess || !m_BlobAccess->IsBlobExists() || m_BlobAccess->IsCurBlobExpired()))
2013     {
2014         m_BlobAccess->Release();
2015         m_BlobAccess = NULL;
2016         return &CBulkCleaner::x_FindNext;;
2017     }
2018 
2019     m_BlobAccess->SetBlobTTL(m_BlobAccess->GetCurBlobTTL());
2020     m_BlobAccess->SetBlobVersion(0);
2021     int expire = CSrvTime::CurSecs() - 1;
2022     unsigned int ttl = m_BlobAccess->GetNewBlobTTL();
2023     m_BlobAccess->SetNewBlobExpire(expire, expire + ttl + 1);
2024 
2025 // see
2026 // CNCMessageHandler::x_FinishReadingBlob
2027     CSrvTime cur_srv_time = CSrvTime::Current();
2028     Uint8 cur_time = cur_srv_time.AsUSec();
2029     int cur_secs = int(cur_srv_time.Sec());
2030     m_BlobAccess->SetBlobCreateTime(cur_time);
2031     if (m_BlobAccess->GetNewBlobExpire() == 0)
2032         m_BlobAccess->SetNewBlobExpire(cur_secs + m_BlobAccess->GetNewBlobTTL());
2033     m_BlobAccess->SetNewVerExpire(cur_secs + m_BlobAccess->GetNewVersionTTL());
2034     m_BlobAccess->SetCreateServer(CNCDistributionConf::GetSelfID(),
2035                                     CNCBlobStorage::GetNewBlobId());
2036 
2037     return &CBulkCleaner::x_Finalize;
2038 }
2039 
2040 CBulkCleaner::State
x_Finalize(void)2041 CBulkCleaner::x_Finalize(void)
2042 {
2043     if (m_BlobAccess) {
2044         m_BlobAccess->Finalize();
2045         m_BlobAccess->Release();
2046         m_BlobAccess = NULL;
2047 
2048         {
2049             CSrvDiagMsg diag_msg;
2050             GetDiagCtx()->SetRequestID();
2051             diag_msg.StartRequest().PrintParam("_type", "bulkrmv");
2052             CNCBlobKeyLight key(m_Key);
2053             diag_msg.PrintParam("cache",key.Cache()).PrintParam("key",key.RawKey()).PrintParam("subkey",key.SubKey());
2054             diag_msg.Flush();
2055             diag_msg.StopRequest();
2056         }
2057     }
2058     return &CBulkCleaner::x_FindNext;
2059 }
2060 
2061 CBulkCleaner::State
x_FinishSession(void)2062 CBulkCleaner::x_FinishSession(void)
2063 {
2064     ReleaseDiagCtx();
2065 
2066     s_ConsListLock.Lock();
2067     if (!s_ForgetKeys.empty()) {
2068         TForgets::const_iterator i = s_ForgetKeys.begin();
2069         if (m_Filter == i->first && m_CrTime == i->second) {
2070             s_ForgetKeys.erase(i);
2071         }
2072     }
2073     s_ConsListLock.Unlock();
2074 
2075     SetState(&CBulkCleaner::x_StartSession);
2076     SetRunnable();
2077     return NULL;
2078 }
2079 
2080 END_NCBI_SCOPE
2081