1 /*  $Id: sync_log.cpp 504689 2016-06-17 13:05:41Z gouriano $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author: Denis Vakatov, Pavel Ivanov, Sergey Satskiy
27  *
28  * File Description: API to support blobs synchronization
29  *
30  */
31 
32 #include "nc_pch.hpp"
33 
34 #include <corelib/ncbidbg.hpp>
35 
36 #include "netcached.hpp"
37 #include "sync_log.hpp"
38 #include "distribution_conf.hpp"
39 #include "task_server.hpp"
40 
41 
42 
43 BEGIN_NCBI_SCOPE
44 
45 
46 struct SSlotData
47 {
48     CMiniMutex  lock;
49     TSyncEvents events;
50     Uint8       rec_number;
51 
SSlotDataSSlotData52     SSlotData(void) : rec_number(0)
53     {}
54 
SSlotDataSSlotData55     SSlotData(const SSlotData& other) : rec_number(0)
56     {
57         if (other.rec_number != 0  ||  !other.events.empty()) {
58             SRV_FATAL("Invalid state");
59         }
60     }
61 };
62 
63 
64 struct SSrvSyncedData
65 {
66     Uint8 local_rec_no;
67     Uint8 remote_rec_no;
68 
SSrvSyncedDataSSrvSyncedData69     SSrvSyncedData(void) : local_rec_no(0), remote_rec_no(0)
70     {}
71 };
72 
73 
74 static CMiniMutex       s_GlobalLock;
75 typedef map<Uint2, SSlotData> TLog;
76 static TLog             s_Log;
77 typedef map<Uint2, SSrvSyncedData>  TSrvSyncedMap;
78 typedef map<Uint8, TSrvSyncedMap>   TSyncedRecsMap;
79 static TSyncedRecsMap   s_SyncedData;
80 static CAtomicCounter   s_TotalRecords;
81 static Uint8            s_LastWrittenRecord;
82 
83 
84 // File IO supporting structures
85 static const size_t kMaxKeyLength = 1024;
86 
87 struct SFixedPart
88 {
89     Uint8         rec_no;       //< Local event sequential number.
90     ENCSyncEvent  event_type;   //< Event type (write, remove, prolong).
91     Uint2         slot;         //< Key slot number.
92     Uint8         orig_time;    //< Timestamp of the event when
93                                 //< it originated by client.
94     Uint8         orig_server;  //< The server where event has
95                                 //< been originated.
96     Uint8         orig_rec_no;  //< Record number on the host where the
97                                 //< event was originated.
98     Uint8         local_time;   //< Timestamp when the record was
99                                 //< recorded locally.
100 };
101 
102 struct SFileServRecord
103 {
104     Uint8 key_server;
105     Uint2 key_slot;
106     Uint8 local_rec_no;
107     Uint8 remote_rec_no;
108 };
109 
110 
111 // Reads the beginning of the file where the information about last synced
112 // records per pair server <--> slot is saved.
113 // Returns true if this information is read successfully.
114 static bool
s_ReadHeader(FILE * file)115 s_ReadHeader(FILE* file)
116 {
117     size_t count = 0;
118 
119     // Read the number of pairs server <--> slot
120     if (fread(&count, sizeof(count), 1, file) != 1) {
121         SRV_LOG(Critical, "Cannot read the number of saved pairs(server <--> slot) "
122                           "with the last synced record numbers. Invalid file?");
123         return false;
124     }
125 
126     for (; count > 0; --count) {
127         SFileServRecord record;
128 
129         if (fread(&record, sizeof(record), 1, file) != 1) {
130             SRV_LOG(Critical, "Cannot read last synced record numbers. Invalid file?");
131             return false;
132         }
133         SSrvSyncedData& sync_data = s_SyncedData[record.key_server]
134                                                 [record.key_slot];
135         sync_data.local_rec_no = record.local_rec_no;
136         sync_data.remote_rec_no = record.remote_rec_no;
137     }
138 
139     const TNCPeerList& peers = CNCDistributionConf::GetPeers();
140     ERASE_ITERATE(TSyncedRecsMap, it_srv, s_SyncedData) {
141         bool valid = false;
142         if (peers.find(it_srv->first) != peers.end()) {
143             const vector<Uint2>& slots = CNCDistributionConf::GetCommonSlots(it_srv->first);
144             if (!slots.empty()) {
145                 valid = true;
146                 TSrvSyncedMap& srv_map = it_srv->second;
147                 ERASE_ITERATE(TSrvSyncedMap, it_slot, srv_map) {
148                     if (find(slots.begin(), slots.end(), it_slot->first)
149                                                                 == slots.end())
150                     {
151                         srv_map.erase(it_slot);
152                     }
153                 }
154             }
155         }
156         if (!valid)
157             s_SyncedData.erase(it_srv);
158     }
159 
160     return true;
161 }
162 
163 
164 // Reads and saves a single saved event record.
165 // Returns true if the event was read successfully.
166 static bool
s_ReadRecord(FILE * file)167 s_ReadRecord(FILE* file)
168 {
169     // Read the fixed part
170     SFixedPart record;
171     if (fread(&record, sizeof(record), 1, file) != 1)
172         return false;
173 
174     // Read the key size
175     size_t key_size;
176     if (fread(&key_size, sizeof(key_size), 1, file) != 1)
177         return false;
178 
179     // Read the key
180     char key[kMaxKeyLength];
181     if (fread(key, key_size, 1, file) != 1)
182         return false;
183 
184     // Insert the corresponding record
185     SNCSyncEvent* new_record = new SNCSyncEvent;
186 
187     new_record->rec_no      = record.rec_no;
188     new_record->event_type  = record.event_type;
189     new_record->orig_time   = record.orig_time;
190     new_record->orig_server = record.orig_server;
191     new_record->orig_rec_no = record.orig_rec_no;
192     new_record->local_time  = record.local_time;
193     new_record->key         = CTempString(key, key_size);
194 
195     s_Log[record.slot].events.push_back(new_record);
196     return true;
197 }
198 
199 
200 // Writes the beginning of the file where the information about last synced
201 // records per pair server <--> slot is saved.
202 // Returns true if this information is written successfully.
203 static bool
s_WriteHeader(FILE * file)204 s_WriteHeader(FILE* file)
205 {
206     size_t count = 0;
207     ITERATE(TSyncedRecsMap, it_srv, s_SyncedData) {
208         count += it_srv->second.size();
209     }
210 
211     if (fwrite(&count, sizeof(count), 1, file) != 1)
212         return false;
213 
214     ITERATE(TSyncedRecsMap, it_srv, s_SyncedData) {
215         ITERATE(TSrvSyncedMap, it_slot, it_srv->second) {
216             SFileServRecord record;
217             record.key_server = it_srv->first;
218             record.key_slot = it_slot->first;
219             record.local_rec_no = it_slot->second.local_rec_no;
220             record.remote_rec_no = it_slot->second.remote_rec_no;
221 
222             if (fwrite(&record, sizeof(record), 1, file) != 1)
223                 return false;
224         }
225     }
226     return true;
227 }
228 
229 
230 // Writes a single event record.
231 // Returns true if the event is written successfully.
232 static bool
s_WriteRecord(FILE * file,Uint2 slot,const SNCSyncEvent * event)233 s_WriteRecord(FILE* file, Uint2 slot, const SNCSyncEvent* event)
234 {
235     SFixedPart record;
236     memset(&record, 0, sizeof(record));
237     record.rec_no       = event->rec_no;
238     record.event_type   = event->event_type;
239     record.slot         = slot;
240     record.orig_time    = event->orig_time;
241     record.orig_server  = event->orig_server;
242     record.orig_rec_no  = event->orig_rec_no;
243     record.local_time   = event->local_time;
244 
245     // Write fixed size fields
246     if (fwrite(&record, sizeof(record), 1, file) != 1)
247         return false;
248 
249     // Write the key size
250     size_t key_size = event->key.PackedKey().size();
251     if (fwrite(&key_size, sizeof(key_size), 1, file) != 1)
252         return false;
253 
254     // Write the key
255     if (fwrite(event->key.PackedKey().data(), key_size, 1, file) != 1)
256         return false;
257 
258     return true;
259 }
260 
261 static inline SSlotData&
s_GetSlotData(Uint2 slot)262 s_GetSlotData(Uint2 slot)
263 {
264     CMiniMutexGuard guard(s_GlobalLock);
265     return s_Log[slot];
266 }
267 
268 
269 // Provides the minimum record number till which the synchronization is done
270 // with all the servers
271 static Uint8
s_GetMinLocalSyncedRecordNo(Uint2 slot,const SSlotData & data)272 s_GetMinLocalSyncedRecordNo(Uint2 slot, const SSlotData& data)
273 {
274     CMiniMutexGuard guard(s_GlobalLock);
275 
276     Uint8 min_rec_no = (data.events.empty()? s_LastWrittenRecord
277                                            : data.events.front()->rec_no);
278     Uint8 result = s_LastWrittenRecord;
279     NON_CONST_ITERATE(TSyncedRecsMap, it_srv, s_SyncedData) {
280         SSrvSyncedData& sync_data = it_srv->second[slot];
281         if (sync_data.local_rec_no >= min_rec_no
282             &&  sync_data.local_rec_no < result)
283         {
284             result = sync_data.local_rec_no;
285         }
286     }
287     return result;
288 }
289 
290 
291 // The prolong event is found in the src interval
292 static void
s_ProcessProlong(const SBlobEvent & src_event,const SBlobEvent & other_event,TSyncEvents * diff)293 s_ProcessProlong(const SBlobEvent& src_event,
294                  const SBlobEvent& other_event,
295                  TSyncEvents*      diff)
296 {
297     if (other_event.prolong_event != NULL) {
298         if (other_event.prolong_event->orig_server == src_event.prolong_event->orig_server
299             &&  other_event.prolong_event->orig_rec_no == src_event.prolong_event->orig_rec_no)
300         {
301             return;
302         }
303 
304         if (other_event.prolong_event->isOlder(*src_event.prolong_event)
305             &&  ((src_event.wr_or_rm_event == NULL)
306                  ||  (src_event.wr_or_rm_event != NULL
307                       &&  other_event.wr_or_rm_event != NULL
308                       &&  other_event.wr_or_rm_event->orig_server == src_event.wr_or_rm_event->orig_server
309                       &&  other_event.wr_or_rm_event->orig_rec_no == src_event.wr_or_rm_event->orig_rec_no)
310                  ||  (src_event.wr_or_rm_event != NULL
311                       &&  other_event.wr_or_rm_event != NULL
312                       &&  src_event.wr_or_rm_event->isOlder(*other_event.wr_or_rm_event))))
313         {
314             diff->push_back(src_event.prolong_event);
315         }
316     }
317     else if (other_event.wr_or_rm_event != NULL) {
318         if (other_event.wr_or_rm_event->event_type == eSyncWrite) {
319             if (other_event.wr_or_rm_event->isOlder(*src_event.prolong_event)
320                 &&  ((src_event.wr_or_rm_event == NULL)
321                      ||  (src_event.wr_or_rm_event != NULL
322                           &&  other_event.wr_or_rm_event->orig_server == src_event.wr_or_rm_event->orig_server
323                           &&  other_event.wr_or_rm_event->orig_rec_no == src_event.wr_or_rm_event->orig_rec_no)
324                      ||  (src_event.wr_or_rm_event != NULL
325                           &&  src_event.wr_or_rm_event->isOlder(*other_event.wr_or_rm_event))))
326             {
327                 diff->push_back(src_event.prolong_event);
328             }
329         }
330     }
331 }
332 
333 static void
s_ProcessWrite(SNCSyncEvent * src_event,const SBlobEvent & other_event,TSyncEvents * diff)334 s_ProcessWrite(SNCSyncEvent*     src_event,
335                const SBlobEvent& other_event,
336                TSyncEvents*      diff)
337 {
338     if (other_event.wr_or_rm_event != NULL) {
339         // If there is write or remove it does not matter if there was
340         // prolong or not
341         if ((other_event.wr_or_rm_event->orig_server != src_event->orig_server
342              ||  other_event.wr_or_rm_event->orig_rec_no != src_event->orig_rec_no)
343             &&  other_event.wr_or_rm_event->isOlder(*src_event))
344         {
345             // Take the most recent event
346             diff->push_back(src_event);
347             return;
348         }
349     }
350     else {
351         // This is lone prolong
352         diff->push_back(src_event);
353     }
354 }
355 
356 
357 static bool
s_SpecialFind(const TReducedSyncEvents & container,TReducedSyncEvents::const_iterator & current_iterator,const string & key)358 s_SpecialFind(const TReducedSyncEvents& container,
359               TReducedSyncEvents::const_iterator& current_iterator,
360               const string& key)
361 {
362     // It is known that the container is sorted by keys, so
363     for (; current_iterator != container.end(); ++current_iterator) {
364         if (current_iterator->first == key)
365             return true;
366         if (current_iterator->first > key)
367             return false;
368     }
369     return false;
370 }
371 
372 
373 // Here is how the decision is made:
374 //         |  User RM    | WR             | Prolong  <-- found in src interval
375 // ----------------------------------------------------------
376 // User RM | none        | if WR later    | none
377 //         |             | => take WR     |
378 // WR      | if RM later | if WRsrc later | if PR later
379 //         | => take RM  | => take WRsrc  | => take PR
380 // Prolong | *           | **             | if PRsrc later
381 //         |             |                | => take PRsrc
382 // ^- found in other
383 //
384 // * - if it was a lonely prolong => take User RM
385 //     if there was WRother => take User RM if it is later than
386 //     WRother
387 // ** - if it was a lonely prolong => take WR
388 //      if there was WRother => take WRsrc if it is later
389 static void
s_CompareEvents(const TReducedSyncEvents & src,Uint8 start_rec_no,Uint8 now,const TReducedSyncEvents & other,Uint8 * synced_rec_no,TSyncEvents * diff)390 s_CompareEvents(const TReducedSyncEvents& src,
391                 Uint8 start_rec_no,
392                 Uint8 now,
393                 const TReducedSyncEvents& other,
394                 Uint8* synced_rec_no,
395                 TSyncEvents* diff)
396 {
397     TReducedSyncEvents::const_iterator k = src.begin();
398     TReducedSyncEvents::const_iterator src_end = src.end();
399     Uint8 max_rec_no = 0;
400     Uint8 time_limit = now - CNCDistributionConf::GetPeriodicSyncHeadTime();
401     TReducedSyncEvents::const_iterator other_event = other.begin();
402 
403     for (; k != src_end; ++k) {
404         Uint8 op_rec_no = k->second.getMaxRecNoWithinTimeLimit(time_limit);
405         if (op_rec_no <= start_rec_no)
406             continue;
407 
408         // Update the synced record #
409         if (op_rec_no > max_rec_no)
410             max_rec_no = op_rec_no;
411 
412         if (s_SpecialFind(other, other_event, k->first) == false) {
413             // No operations found with this blob - take the ops
414             if (k->second.wr_or_rm_event != NULL
415                 &&  (k->second.wr_or_rm_event->rec_no > start_rec_no))
416             {
417                 diff->push_back(k->second.wr_or_rm_event);
418             }
419             else if (k->second.prolong_event != NULL
420                      &&  (k->second.prolong_event->local_time < time_limit))
421             {
422                 diff->push_back(k->second.prolong_event);
423             }
424             continue;
425         }
426 
427         // Found on the other side, let's make the decision
428         if (k->second.wr_or_rm_event != NULL
429             &&  (k->second.wr_or_rm_event->rec_no > start_rec_no))
430         {
431             s_ProcessWrite(k->second.wr_or_rm_event, other_event->second, diff);
432         }
433         if (k->second.prolong_event != NULL
434             &&  (k->second.prolong_event->local_time < time_limit))
435         {
436             s_ProcessProlong(k->second, other_event->second, diff);
437         }
438     }
439 
440     if (max_rec_no != 0)
441         *synced_rec_no = max_rec_no;
442     else
443         *synced_rec_no = start_rec_no;
444 }
445 
446 
447 // This call is guaranteed to be done once in one thread only.
448 // So there is no need to bother about thread safety.
449 void
Initialize(bool need_read_saved,Uint8 start_log_rec_no)450 CNCSyncLog::Initialize(bool need_read_saved, Uint8 start_log_rec_no)
451 {
452     s_TotalRecords.Set(0);
453     s_LastWrittenRecord = 0;
454 
455     string file_name = CNCDistributionConf::GetSyncLogFileName();
456     if (!need_read_saved  ||  file_name.empty()) {
457         // No need to load the log from the file.
458         // Start from the given record number.
459         s_LastWrittenRecord = start_log_rec_no;
460         return;
461     }
462 
463     // Need to load the log from the file and identify the record number to
464     // start with.
465     FILE* log_file = fopen(file_name.c_str(), "r");
466 
467     if (!log_file) {
468         SRV_LOG(Warning, "Cannot open file: " << file_name);
469 
470         // Could not read the file and therefore could not identify the
471         // start record number. So use the given
472         s_LastWrittenRecord = start_log_rec_no;
473         return;
474     }
475 
476     // Read the server records
477     if (!s_ReadHeader(log_file)) {
478         // Error occurred, the file is broken
479         s_SyncedData.clear();
480         s_LastWrittenRecord = start_log_rec_no;
481         return;
482     }
483 
484     // Read the log records
485     Uint4 recs_count = 0;
486     while (s_ReadRecord(log_file))
487         ++recs_count;
488     s_TotalRecords.Set(recs_count);
489 
490     // Check for the errors
491     if (!feof(log_file)) {
492         SRV_LOG(Critical, "Cannot read records from " << file_name
493                           << ". Invalid file?");
494 
495         // Error occurred, the file is broken:
496         // Reset the counter, clean the log container, set the start number.
497         s_LastWrittenRecord = start_log_rec_no;
498         s_TotalRecords.Set(0);
499         s_SyncedData.clear();
500         s_Log.clear();
501 
502         fclose(log_file);
503         return;
504     }
505     fclose(log_file);
506 
507     // Set the last written record number
508     NON_CONST_ITERATE(TLog, slot, s_Log) {
509         SSlotData& data = slot->second;
510         data.rec_number = data.events.size();
511         if (data.rec_number != 0) {
512             Uint8 last_rec_no = data.events.back()->rec_no;
513             if (last_rec_no > s_LastWrittenRecord)
514                 s_LastWrittenRecord = last_rec_no;
515         }
516     }
517 }
518 
519 // This call is guaranteed to be done once in one thread only.
520 // So there is no need to bother about thread safety.
521 bool
Finalize(void)522 CNCSyncLog::Finalize(void)
523 {
524     const string& file_name = CNCDistributionConf::GetSyncLogFileName();
525     if (file_name.empty()) {
526         return false;
527     }
528     FILE* log_file = fopen(file_name.c_str(), "w");
529 
530     if (!log_file)
531         return false;
532 
533     // Avoid saving excessive data
534     ITERATE(TLog, it, s_Log) {
535         Clean(it->first);
536     }
537 
538     // Save the last synced records
539     if (!s_WriteHeader(log_file)) {
540         fclose(log_file);
541         return false;
542     }
543 
544     // Save the log entries
545     ITERATE(TLog, it_slot, s_Log) {
546         const TSyncEvents& events = it_slot->second.events;
547         ITERATE(TSyncEvents, item, events) {
548             if (!s_WriteRecord(log_file, it_slot->first, (*item))) {
549                 fclose(log_file);
550                 return false;
551             }
552         }
553     }
554     fclose(log_file);
555     return true;
556 }
557 
558 Uint8
AddEvent(Uint2 slot,SNCSyncEvent * event)559 CNCSyncLog::AddEvent(Uint2 slot, SNCSyncEvent* event)
560 {
561     Uint2 real_slot = 0;
562     Uint2 time_bucket = 0;
563     if (!CNCDistributionConf::GetSlotByKey(
564         event->key.PackedKey(), real_slot, time_bucket) || real_slot != slot) {
565         SRV_FATAL("Slot verification failed, blob key: " << event->key.PackedKey() <<
566                   ", expected slot: " << slot << ", calculated slot: " << real_slot);
567     }
568 
569     SSlotData& data = s_GetSlotData(slot);
570     CMiniMutexGuard guard(data.lock);
571 
572     event->local_time = CSrvTime::Current().AsUSec();
573     s_GlobalLock.Lock();
574     event->rec_no = ++s_LastWrittenRecord;
575     s_GlobalLock.Unlock();
576     // Avoid race condition:
577     // - user blob comes
578     // - event is written
579     // - sync started in another thread while orig_rec_no is updated
580     if (event->orig_server == CNCDistributionConf::GetSelfID())
581         event->orig_rec_no = event->rec_no;
582 
583     data.events.push_back(event);
584     ++data.rec_number;
585     s_TotalRecords.Add(1);
586     return event->rec_no;
587 }
588 
589 void
GetLastSyncedRecNo(Uint8 server,Uint2 slot,Uint8 * local_synced_rec_no,Uint8 * remote_synced_rec_no)590 CNCSyncLog::GetLastSyncedRecNo(Uint8  server,
591                                Uint2  slot,
592                                Uint8* local_synced_rec_no,
593                                Uint8* remote_synced_rec_no)
594 {
595     CMiniMutexGuard guard(s_GlobalLock);
596 
597     SSrvSyncedData& sync_data = s_SyncedData[server][slot];
598     *local_synced_rec_no = sync_data.local_rec_no;
599     *remote_synced_rec_no = sync_data.remote_rec_no;
600 }
601 
602 void
SetLastSyncRecNo(Uint8 server,Uint2 slot,Uint8 local_synced_rec_no,Uint8 remote_synced_rec_no)603 CNCSyncLog::SetLastSyncRecNo(Uint8 server,
604                              Uint2 slot,
605                              Uint8 local_synced_rec_no,
606                              Uint8 remote_synced_rec_no)
607 {
608     CMiniMutexGuard guard(s_GlobalLock);
609 
610     SSrvSyncedData& sync_data = s_SyncedData[server][slot];
611     sync_data.local_rec_no = local_synced_rec_no;
612     sync_data.remote_rec_no = remote_synced_rec_no;
613 }
614 
615 Uint8
GetCurrentRecNo(Uint2 slot)616 CNCSyncLog::GetCurrentRecNo(Uint2 slot)
617 {
618     SSlotData& data = s_GetSlotData(slot);
619     CMiniMutexGuard guard(data.lock);
620     if (!data.events.empty())
621         return data.events.back()->rec_no;
622     else
623         return s_LastWrittenRecord;
624 }
625 
626 Uint8
GetLastRecNo(void)627 CNCSyncLog::GetLastRecNo(void)
628 {
629     return s_LastWrittenRecord;
630 }
631 
632 bool
GetEventsList(Uint8 server,Uint2 slot,Uint8 * local_start_rec_no,Uint8 * remote_start_rec_no,TReducedSyncEvents * events)633 CNCSyncLog::GetEventsList(Uint8  server,
634                           Uint2  slot,
635                           Uint8* local_start_rec_no,
636                           Uint8* remote_start_rec_no,
637                           TReducedSyncEvents* events)
638 {
639     {{
640         CMiniMutexGuard guard(s_GlobalLock);
641         SSrvSyncedData& sync_data = s_SyncedData[server][slot];
642         if (sync_data.local_rec_no > *local_start_rec_no)
643             *local_start_rec_no = sync_data.local_rec_no;
644         else
645             sync_data.local_rec_no = *local_start_rec_no;
646         if (sync_data.remote_rec_no > *remote_start_rec_no)
647             *remote_start_rec_no = sync_data.remote_rec_no;
648         else
649             sync_data.remote_rec_no = *remote_start_rec_no;
650     }}
651 
652     SSlotData& data = s_GetSlotData(slot);
653     CMiniMutexGuard guard(data.lock);
654 
655     // Check the presence of the local record
656     if (data.events.empty()
657         ||  *local_start_rec_no < data.events.front()->rec_no
658         ||  s_LastWrittenRecord < *local_start_rec_no)
659     {
660         return false;   // The required record is not available,
661                         // all the blobs will be exchanged
662     }
663 
664     //Uint8 time_limit = 0;
665     //bool  time_limit_set = false;
666     // Walk the container from the end with copying all the records which
667     // matched: rec_no > local_rec_no or timestamp within 10 sec
668     NON_CONST_REVERSE_ITERATE(TSyncEvents, record, data.events) {
669         SNCSyncEvent* evt = *record;
670         // local_time is used here because the records are ordered by
671         // local_time but not the origin time
672         //if (time_limit_set  &&  evt->local_time < time_limit)
673         //    break;
674         if (/*!time_limit_set  &&  */evt->rec_no < *local_start_rec_no) {
675             //time_limit_set = true;
676             //time_limit = evt->local_time
677             //             - CNCDistributionConf::GetPeriodicSyncTailTime();
678             break;
679         }
680 
681         // Now make the decision if the record should be memorized
682         //          | rm         | wr       | pr          <-- was saved
683         // ---------------------------------------------
684         // rm (any) | impossible | nothing  | impossible
685         // wr       | nothing    | nothing  | memorize
686         // pr       | nothing    | nothing  | nothing
687         // ^- found
688         SBlobEvent& blob_event = (*events)[evt->key.PackedKey()];
689         switch (evt->event_type) {
690         case eSyncUpdate:
691         case eSyncRemove:
692             break;
693         case eSyncWrite:
694             if (!blob_event.wr_or_rm_event)
695                 blob_event.wr_or_rm_event = evt;
696             break;
697         case eSyncProlong:
698             if (!blob_event.wr_or_rm_event  &&  !blob_event.prolong_event)
699                 blob_event.prolong_event = evt;
700             break;
701         }
702     }
703     return true;
704 }
705 
706 
707 bool
GetSyncOperations(Uint8 server,Uint2 slot,Uint8 local_start_rec_no,Uint8 remote_start_rec_no,const TReducedSyncEvents & remote_events,TSyncEvents * events_to_get,TSyncEvents * events_to_send,Uint8 * local_synced_rec_no,Uint8 * remote_synced_rec_no)708 CNCSyncLog::GetSyncOperations(Uint8 server,
709                               Uint2 slot,
710                               Uint8 local_start_rec_no,
711                               Uint8 remote_start_rec_no,
712                               const TReducedSyncEvents& remote_events,
713                               TSyncEvents* events_to_get,
714                               TSyncEvents* events_to_send,
715                               Uint8* local_synced_rec_no,
716                               Uint8* remote_synced_rec_no)
717 {
718     // Get the local events
719     TReducedSyncEvents local_events;
720     if (!GetEventsList(server, slot, &local_start_rec_no,
721                        &remote_start_rec_no, &local_events))
722     {
723         return false;
724     }
725 
726     Uint8 now = CSrvTime::Current().AsUSec();
727     s_CompareEvents(local_events, local_start_rec_no,
728                     now, remote_events,
729                     local_synced_rec_no, events_to_send);
730     s_CompareEvents(remote_events, remote_start_rec_no,
731                     now, local_events,
732                     remote_synced_rec_no, events_to_get);
733     return true;
734 }
735 
736 
737 Uint8
Clean(Uint2 slot)738 CNCSyncLog::Clean(Uint2 slot)
739 {
740     SSlotData& data = s_GetSlotData(slot);
741     CMiniMutexGuard guard(data.lock);
742 
743     Uint4 max_recs = CNCDistributionConf::GetMaxSlotLogEvents();
744     Uint4 clean_to_recs = max_recs - CNCDistributionConf::GetCleanLogReserve();
745     Uint4 max_clean_cnt = CNCDistributionConf::GetMaxCleanLogBatch();
746     Uint4 cleaned_cnt = 0;
747     Uint8 limit = s_GetMinLocalSyncedRecordNo(slot, data);
748     while (cleaned_cnt < max_clean_cnt) {
749         if (data.events.empty()  ||  data.events.front()->rec_no > limit)
750             break;       // Records are younger that should be deleted
751         // Delete the record, it is not required any more
752         delete data.events.front();
753         data.events.pop_front();
754         s_TotalRecords.Add(-1);
755         --data.rec_number;
756         ++cleaned_cnt;
757     }
758     if (data.rec_number > max_recs) {
759         while (data.rec_number > clean_to_recs  &&  cleaned_cnt < max_clean_cnt)
760         {
761             delete data.events.front();
762             data.events.pop_front();
763             s_TotalRecords.Add(-1);
764             --data.rec_number;
765             ++cleaned_cnt;
766         }
767     }
768     return cleaned_cnt;
769 }
770 
771 Uint8
GetLogSize(void)772 CNCSyncLog::GetLogSize(void)
773 {
774     return s_TotalRecords.Get();
775 }
776 
777 Uint8
GetLogSize(Uint2 slot)778 CNCSyncLog::GetLogSize(Uint2 slot)
779 {
780     SSlotData& data = s_GetSlotData(slot);
781     return data.rec_number;
782 }
783 
784 bool
IsOverLimit(Uint2 slot)785 CNCSyncLog::IsOverLimit(Uint2 slot)
786 {
787     SSlotData& data = s_GetSlotData(slot);
788     Uint4 max_recs = CNCDistributionConf::GetMaxSlotLogEvents();
789     return data.rec_number > max_recs;
790 }
791 
792 END_NCBI_SCOPE
793 
794