1 /* $Id: sync_log.cpp 504689 2016-06-17 13:05:41Z gouriano $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Denis Vakatov, Pavel Ivanov, Sergey Satskiy
27 *
28 * File Description: API to support blobs synchronization
29 *
30 */
31
32 #include "nc_pch.hpp"
33
34 #include <corelib/ncbidbg.hpp>
35
36 #include "netcached.hpp"
37 #include "sync_log.hpp"
38 #include "distribution_conf.hpp"
39 #include "task_server.hpp"
40
41
42
43 BEGIN_NCBI_SCOPE
44
45
46 struct SSlotData
47 {
48 CMiniMutex lock;
49 TSyncEvents events;
50 Uint8 rec_number;
51
SSlotDataSSlotData52 SSlotData(void) : rec_number(0)
53 {}
54
SSlotDataSSlotData55 SSlotData(const SSlotData& other) : rec_number(0)
56 {
57 if (other.rec_number != 0 || !other.events.empty()) {
58 SRV_FATAL("Invalid state");
59 }
60 }
61 };
62
63
64 struct SSrvSyncedData
65 {
66 Uint8 local_rec_no;
67 Uint8 remote_rec_no;
68
SSrvSyncedDataSSrvSyncedData69 SSrvSyncedData(void) : local_rec_no(0), remote_rec_no(0)
70 {}
71 };
72
73
74 static CMiniMutex s_GlobalLock;
75 typedef map<Uint2, SSlotData> TLog;
76 static TLog s_Log;
77 typedef map<Uint2, SSrvSyncedData> TSrvSyncedMap;
78 typedef map<Uint8, TSrvSyncedMap> TSyncedRecsMap;
79 static TSyncedRecsMap s_SyncedData;
80 static CAtomicCounter s_TotalRecords;
81 static Uint8 s_LastWrittenRecord;
82
83
84 // File IO supporting structures
85 static const size_t kMaxKeyLength = 1024;
86
87 struct SFixedPart
88 {
89 Uint8 rec_no; //< Local event sequential number.
90 ENCSyncEvent event_type; //< Event type (write, remove, prolong).
91 Uint2 slot; //< Key slot number.
92 Uint8 orig_time; //< Timestamp of the event when
93 //< it originated by client.
94 Uint8 orig_server; //< The server where event has
95 //< been originated.
96 Uint8 orig_rec_no; //< Record number on the host where the
97 //< event was originated.
98 Uint8 local_time; //< Timestamp when the record was
99 //< recorded locally.
100 };
101
102 struct SFileServRecord
103 {
104 Uint8 key_server;
105 Uint2 key_slot;
106 Uint8 local_rec_no;
107 Uint8 remote_rec_no;
108 };
109
110
111 // Reads the beginning of the file where the information about last synced
112 // records per pair server <--> slot is saved.
113 // Returns true if this information is read successfully.
114 static bool
s_ReadHeader(FILE * file)115 s_ReadHeader(FILE* file)
116 {
117 size_t count = 0;
118
119 // Read the number of pairs server <--> slot
120 if (fread(&count, sizeof(count), 1, file) != 1) {
121 SRV_LOG(Critical, "Cannot read the number of saved pairs(server <--> slot) "
122 "with the last synced record numbers. Invalid file?");
123 return false;
124 }
125
126 for (; count > 0; --count) {
127 SFileServRecord record;
128
129 if (fread(&record, sizeof(record), 1, file) != 1) {
130 SRV_LOG(Critical, "Cannot read last synced record numbers. Invalid file?");
131 return false;
132 }
133 SSrvSyncedData& sync_data = s_SyncedData[record.key_server]
134 [record.key_slot];
135 sync_data.local_rec_no = record.local_rec_no;
136 sync_data.remote_rec_no = record.remote_rec_no;
137 }
138
139 const TNCPeerList& peers = CNCDistributionConf::GetPeers();
140 ERASE_ITERATE(TSyncedRecsMap, it_srv, s_SyncedData) {
141 bool valid = false;
142 if (peers.find(it_srv->first) != peers.end()) {
143 const vector<Uint2>& slots = CNCDistributionConf::GetCommonSlots(it_srv->first);
144 if (!slots.empty()) {
145 valid = true;
146 TSrvSyncedMap& srv_map = it_srv->second;
147 ERASE_ITERATE(TSrvSyncedMap, it_slot, srv_map) {
148 if (find(slots.begin(), slots.end(), it_slot->first)
149 == slots.end())
150 {
151 srv_map.erase(it_slot);
152 }
153 }
154 }
155 }
156 if (!valid)
157 s_SyncedData.erase(it_srv);
158 }
159
160 return true;
161 }
162
163
164 // Reads and saves a single saved event record.
165 // Returns true if the event was read successfully.
166 static bool
s_ReadRecord(FILE * file)167 s_ReadRecord(FILE* file)
168 {
169 // Read the fixed part
170 SFixedPart record;
171 if (fread(&record, sizeof(record), 1, file) != 1)
172 return false;
173
174 // Read the key size
175 size_t key_size;
176 if (fread(&key_size, sizeof(key_size), 1, file) != 1)
177 return false;
178
179 // Read the key
180 char key[kMaxKeyLength];
181 if (fread(key, key_size, 1, file) != 1)
182 return false;
183
184 // Insert the corresponding record
185 SNCSyncEvent* new_record = new SNCSyncEvent;
186
187 new_record->rec_no = record.rec_no;
188 new_record->event_type = record.event_type;
189 new_record->orig_time = record.orig_time;
190 new_record->orig_server = record.orig_server;
191 new_record->orig_rec_no = record.orig_rec_no;
192 new_record->local_time = record.local_time;
193 new_record->key = CTempString(key, key_size);
194
195 s_Log[record.slot].events.push_back(new_record);
196 return true;
197 }
198
199
200 // Writes the beginning of the file where the information about last synced
201 // records per pair server <--> slot is saved.
202 // Returns true if this information is written successfully.
203 static bool
s_WriteHeader(FILE * file)204 s_WriteHeader(FILE* file)
205 {
206 size_t count = 0;
207 ITERATE(TSyncedRecsMap, it_srv, s_SyncedData) {
208 count += it_srv->second.size();
209 }
210
211 if (fwrite(&count, sizeof(count), 1, file) != 1)
212 return false;
213
214 ITERATE(TSyncedRecsMap, it_srv, s_SyncedData) {
215 ITERATE(TSrvSyncedMap, it_slot, it_srv->second) {
216 SFileServRecord record;
217 record.key_server = it_srv->first;
218 record.key_slot = it_slot->first;
219 record.local_rec_no = it_slot->second.local_rec_no;
220 record.remote_rec_no = it_slot->second.remote_rec_no;
221
222 if (fwrite(&record, sizeof(record), 1, file) != 1)
223 return false;
224 }
225 }
226 return true;
227 }
228
229
230 // Writes a single event record.
231 // Returns true if the event is written successfully.
232 static bool
s_WriteRecord(FILE * file,Uint2 slot,const SNCSyncEvent * event)233 s_WriteRecord(FILE* file, Uint2 slot, const SNCSyncEvent* event)
234 {
235 SFixedPart record;
236 memset(&record, 0, sizeof(record));
237 record.rec_no = event->rec_no;
238 record.event_type = event->event_type;
239 record.slot = slot;
240 record.orig_time = event->orig_time;
241 record.orig_server = event->orig_server;
242 record.orig_rec_no = event->orig_rec_no;
243 record.local_time = event->local_time;
244
245 // Write fixed size fields
246 if (fwrite(&record, sizeof(record), 1, file) != 1)
247 return false;
248
249 // Write the key size
250 size_t key_size = event->key.PackedKey().size();
251 if (fwrite(&key_size, sizeof(key_size), 1, file) != 1)
252 return false;
253
254 // Write the key
255 if (fwrite(event->key.PackedKey().data(), key_size, 1, file) != 1)
256 return false;
257
258 return true;
259 }
260
261 static inline SSlotData&
s_GetSlotData(Uint2 slot)262 s_GetSlotData(Uint2 slot)
263 {
264 CMiniMutexGuard guard(s_GlobalLock);
265 return s_Log[slot];
266 }
267
268
269 // Provides the minimum record number till which the synchronization is done
270 // with all the servers
271 static Uint8
s_GetMinLocalSyncedRecordNo(Uint2 slot,const SSlotData & data)272 s_GetMinLocalSyncedRecordNo(Uint2 slot, const SSlotData& data)
273 {
274 CMiniMutexGuard guard(s_GlobalLock);
275
276 Uint8 min_rec_no = (data.events.empty()? s_LastWrittenRecord
277 : data.events.front()->rec_no);
278 Uint8 result = s_LastWrittenRecord;
279 NON_CONST_ITERATE(TSyncedRecsMap, it_srv, s_SyncedData) {
280 SSrvSyncedData& sync_data = it_srv->second[slot];
281 if (sync_data.local_rec_no >= min_rec_no
282 && sync_data.local_rec_no < result)
283 {
284 result = sync_data.local_rec_no;
285 }
286 }
287 return result;
288 }
289
290
291 // The prolong event is found in the src interval
292 static void
s_ProcessProlong(const SBlobEvent & src_event,const SBlobEvent & other_event,TSyncEvents * diff)293 s_ProcessProlong(const SBlobEvent& src_event,
294 const SBlobEvent& other_event,
295 TSyncEvents* diff)
296 {
297 if (other_event.prolong_event != NULL) {
298 if (other_event.prolong_event->orig_server == src_event.prolong_event->orig_server
299 && other_event.prolong_event->orig_rec_no == src_event.prolong_event->orig_rec_no)
300 {
301 return;
302 }
303
304 if (other_event.prolong_event->isOlder(*src_event.prolong_event)
305 && ((src_event.wr_or_rm_event == NULL)
306 || (src_event.wr_or_rm_event != NULL
307 && other_event.wr_or_rm_event != NULL
308 && other_event.wr_or_rm_event->orig_server == src_event.wr_or_rm_event->orig_server
309 && other_event.wr_or_rm_event->orig_rec_no == src_event.wr_or_rm_event->orig_rec_no)
310 || (src_event.wr_or_rm_event != NULL
311 && other_event.wr_or_rm_event != NULL
312 && src_event.wr_or_rm_event->isOlder(*other_event.wr_or_rm_event))))
313 {
314 diff->push_back(src_event.prolong_event);
315 }
316 }
317 else if (other_event.wr_or_rm_event != NULL) {
318 if (other_event.wr_or_rm_event->event_type == eSyncWrite) {
319 if (other_event.wr_or_rm_event->isOlder(*src_event.prolong_event)
320 && ((src_event.wr_or_rm_event == NULL)
321 || (src_event.wr_or_rm_event != NULL
322 && other_event.wr_or_rm_event->orig_server == src_event.wr_or_rm_event->orig_server
323 && other_event.wr_or_rm_event->orig_rec_no == src_event.wr_or_rm_event->orig_rec_no)
324 || (src_event.wr_or_rm_event != NULL
325 && src_event.wr_or_rm_event->isOlder(*other_event.wr_or_rm_event))))
326 {
327 diff->push_back(src_event.prolong_event);
328 }
329 }
330 }
331 }
332
333 static void
s_ProcessWrite(SNCSyncEvent * src_event,const SBlobEvent & other_event,TSyncEvents * diff)334 s_ProcessWrite(SNCSyncEvent* src_event,
335 const SBlobEvent& other_event,
336 TSyncEvents* diff)
337 {
338 if (other_event.wr_or_rm_event != NULL) {
339 // If there is write or remove it does not matter if there was
340 // prolong or not
341 if ((other_event.wr_or_rm_event->orig_server != src_event->orig_server
342 || other_event.wr_or_rm_event->orig_rec_no != src_event->orig_rec_no)
343 && other_event.wr_or_rm_event->isOlder(*src_event))
344 {
345 // Take the most recent event
346 diff->push_back(src_event);
347 return;
348 }
349 }
350 else {
351 // This is lone prolong
352 diff->push_back(src_event);
353 }
354 }
355
356
357 static bool
s_SpecialFind(const TReducedSyncEvents & container,TReducedSyncEvents::const_iterator & current_iterator,const string & key)358 s_SpecialFind(const TReducedSyncEvents& container,
359 TReducedSyncEvents::const_iterator& current_iterator,
360 const string& key)
361 {
362 // It is known that the container is sorted by keys, so
363 for (; current_iterator != container.end(); ++current_iterator) {
364 if (current_iterator->first == key)
365 return true;
366 if (current_iterator->first > key)
367 return false;
368 }
369 return false;
370 }
371
372
373 // Here is how the decision is made:
374 // | User RM | WR | Prolong <-- found in src interval
375 // ----------------------------------------------------------
376 // User RM | none | if WR later | none
377 // | | => take WR |
378 // WR | if RM later | if WRsrc later | if PR later
379 // | => take RM | => take WRsrc | => take PR
380 // Prolong | * | ** | if PRsrc later
381 // | | | => take PRsrc
382 // ^- found in other
383 //
384 // * - if it was a lonely prolong => take User RM
385 // if there was WRother => take User RM if it is later than
386 // WRother
387 // ** - if it was a lonely prolong => take WR
388 // if there was WRother => take WRsrc if it is later
389 static void
s_CompareEvents(const TReducedSyncEvents & src,Uint8 start_rec_no,Uint8 now,const TReducedSyncEvents & other,Uint8 * synced_rec_no,TSyncEvents * diff)390 s_CompareEvents(const TReducedSyncEvents& src,
391 Uint8 start_rec_no,
392 Uint8 now,
393 const TReducedSyncEvents& other,
394 Uint8* synced_rec_no,
395 TSyncEvents* diff)
396 {
397 TReducedSyncEvents::const_iterator k = src.begin();
398 TReducedSyncEvents::const_iterator src_end = src.end();
399 Uint8 max_rec_no = 0;
400 Uint8 time_limit = now - CNCDistributionConf::GetPeriodicSyncHeadTime();
401 TReducedSyncEvents::const_iterator other_event = other.begin();
402
403 for (; k != src_end; ++k) {
404 Uint8 op_rec_no = k->second.getMaxRecNoWithinTimeLimit(time_limit);
405 if (op_rec_no <= start_rec_no)
406 continue;
407
408 // Update the synced record #
409 if (op_rec_no > max_rec_no)
410 max_rec_no = op_rec_no;
411
412 if (s_SpecialFind(other, other_event, k->first) == false) {
413 // No operations found with this blob - take the ops
414 if (k->second.wr_or_rm_event != NULL
415 && (k->second.wr_or_rm_event->rec_no > start_rec_no))
416 {
417 diff->push_back(k->second.wr_or_rm_event);
418 }
419 else if (k->second.prolong_event != NULL
420 && (k->second.prolong_event->local_time < time_limit))
421 {
422 diff->push_back(k->second.prolong_event);
423 }
424 continue;
425 }
426
427 // Found on the other side, let's make the decision
428 if (k->second.wr_or_rm_event != NULL
429 && (k->second.wr_or_rm_event->rec_no > start_rec_no))
430 {
431 s_ProcessWrite(k->second.wr_or_rm_event, other_event->second, diff);
432 }
433 if (k->second.prolong_event != NULL
434 && (k->second.prolong_event->local_time < time_limit))
435 {
436 s_ProcessProlong(k->second, other_event->second, diff);
437 }
438 }
439
440 if (max_rec_no != 0)
441 *synced_rec_no = max_rec_no;
442 else
443 *synced_rec_no = start_rec_no;
444 }
445
446
447 // This call is guaranteed to be done once in one thread only.
448 // So there is no need to bother about thread safety.
449 void
Initialize(bool need_read_saved,Uint8 start_log_rec_no)450 CNCSyncLog::Initialize(bool need_read_saved, Uint8 start_log_rec_no)
451 {
452 s_TotalRecords.Set(0);
453 s_LastWrittenRecord = 0;
454
455 string file_name = CNCDistributionConf::GetSyncLogFileName();
456 if (!need_read_saved || file_name.empty()) {
457 // No need to load the log from the file.
458 // Start from the given record number.
459 s_LastWrittenRecord = start_log_rec_no;
460 return;
461 }
462
463 // Need to load the log from the file and identify the record number to
464 // start with.
465 FILE* log_file = fopen(file_name.c_str(), "r");
466
467 if (!log_file) {
468 SRV_LOG(Warning, "Cannot open file: " << file_name);
469
470 // Could not read the file and therefore could not identify the
471 // start record number. So use the given
472 s_LastWrittenRecord = start_log_rec_no;
473 return;
474 }
475
476 // Read the server records
477 if (!s_ReadHeader(log_file)) {
478 // Error occurred, the file is broken
479 s_SyncedData.clear();
480 s_LastWrittenRecord = start_log_rec_no;
481 return;
482 }
483
484 // Read the log records
485 Uint4 recs_count = 0;
486 while (s_ReadRecord(log_file))
487 ++recs_count;
488 s_TotalRecords.Set(recs_count);
489
490 // Check for the errors
491 if (!feof(log_file)) {
492 SRV_LOG(Critical, "Cannot read records from " << file_name
493 << ". Invalid file?");
494
495 // Error occurred, the file is broken:
496 // Reset the counter, clean the log container, set the start number.
497 s_LastWrittenRecord = start_log_rec_no;
498 s_TotalRecords.Set(0);
499 s_SyncedData.clear();
500 s_Log.clear();
501
502 fclose(log_file);
503 return;
504 }
505 fclose(log_file);
506
507 // Set the last written record number
508 NON_CONST_ITERATE(TLog, slot, s_Log) {
509 SSlotData& data = slot->second;
510 data.rec_number = data.events.size();
511 if (data.rec_number != 0) {
512 Uint8 last_rec_no = data.events.back()->rec_no;
513 if (last_rec_no > s_LastWrittenRecord)
514 s_LastWrittenRecord = last_rec_no;
515 }
516 }
517 }
518
519 // This call is guaranteed to be done once in one thread only.
520 // So there is no need to bother about thread safety.
521 bool
Finalize(void)522 CNCSyncLog::Finalize(void)
523 {
524 const string& file_name = CNCDistributionConf::GetSyncLogFileName();
525 if (file_name.empty()) {
526 return false;
527 }
528 FILE* log_file = fopen(file_name.c_str(), "w");
529
530 if (!log_file)
531 return false;
532
533 // Avoid saving excessive data
534 ITERATE(TLog, it, s_Log) {
535 Clean(it->first);
536 }
537
538 // Save the last synced records
539 if (!s_WriteHeader(log_file)) {
540 fclose(log_file);
541 return false;
542 }
543
544 // Save the log entries
545 ITERATE(TLog, it_slot, s_Log) {
546 const TSyncEvents& events = it_slot->second.events;
547 ITERATE(TSyncEvents, item, events) {
548 if (!s_WriteRecord(log_file, it_slot->first, (*item))) {
549 fclose(log_file);
550 return false;
551 }
552 }
553 }
554 fclose(log_file);
555 return true;
556 }
557
558 Uint8
AddEvent(Uint2 slot,SNCSyncEvent * event)559 CNCSyncLog::AddEvent(Uint2 slot, SNCSyncEvent* event)
560 {
561 Uint2 real_slot = 0;
562 Uint2 time_bucket = 0;
563 if (!CNCDistributionConf::GetSlotByKey(
564 event->key.PackedKey(), real_slot, time_bucket) || real_slot != slot) {
565 SRV_FATAL("Slot verification failed, blob key: " << event->key.PackedKey() <<
566 ", expected slot: " << slot << ", calculated slot: " << real_slot);
567 }
568
569 SSlotData& data = s_GetSlotData(slot);
570 CMiniMutexGuard guard(data.lock);
571
572 event->local_time = CSrvTime::Current().AsUSec();
573 s_GlobalLock.Lock();
574 event->rec_no = ++s_LastWrittenRecord;
575 s_GlobalLock.Unlock();
576 // Avoid race condition:
577 // - user blob comes
578 // - event is written
579 // - sync started in another thread while orig_rec_no is updated
580 if (event->orig_server == CNCDistributionConf::GetSelfID())
581 event->orig_rec_no = event->rec_no;
582
583 data.events.push_back(event);
584 ++data.rec_number;
585 s_TotalRecords.Add(1);
586 return event->rec_no;
587 }
588
589 void
GetLastSyncedRecNo(Uint8 server,Uint2 slot,Uint8 * local_synced_rec_no,Uint8 * remote_synced_rec_no)590 CNCSyncLog::GetLastSyncedRecNo(Uint8 server,
591 Uint2 slot,
592 Uint8* local_synced_rec_no,
593 Uint8* remote_synced_rec_no)
594 {
595 CMiniMutexGuard guard(s_GlobalLock);
596
597 SSrvSyncedData& sync_data = s_SyncedData[server][slot];
598 *local_synced_rec_no = sync_data.local_rec_no;
599 *remote_synced_rec_no = sync_data.remote_rec_no;
600 }
601
602 void
SetLastSyncRecNo(Uint8 server,Uint2 slot,Uint8 local_synced_rec_no,Uint8 remote_synced_rec_no)603 CNCSyncLog::SetLastSyncRecNo(Uint8 server,
604 Uint2 slot,
605 Uint8 local_synced_rec_no,
606 Uint8 remote_synced_rec_no)
607 {
608 CMiniMutexGuard guard(s_GlobalLock);
609
610 SSrvSyncedData& sync_data = s_SyncedData[server][slot];
611 sync_data.local_rec_no = local_synced_rec_no;
612 sync_data.remote_rec_no = remote_synced_rec_no;
613 }
614
615 Uint8
GetCurrentRecNo(Uint2 slot)616 CNCSyncLog::GetCurrentRecNo(Uint2 slot)
617 {
618 SSlotData& data = s_GetSlotData(slot);
619 CMiniMutexGuard guard(data.lock);
620 if (!data.events.empty())
621 return data.events.back()->rec_no;
622 else
623 return s_LastWrittenRecord;
624 }
625
626 Uint8
GetLastRecNo(void)627 CNCSyncLog::GetLastRecNo(void)
628 {
629 return s_LastWrittenRecord;
630 }
631
632 bool
GetEventsList(Uint8 server,Uint2 slot,Uint8 * local_start_rec_no,Uint8 * remote_start_rec_no,TReducedSyncEvents * events)633 CNCSyncLog::GetEventsList(Uint8 server,
634 Uint2 slot,
635 Uint8* local_start_rec_no,
636 Uint8* remote_start_rec_no,
637 TReducedSyncEvents* events)
638 {
639 {{
640 CMiniMutexGuard guard(s_GlobalLock);
641 SSrvSyncedData& sync_data = s_SyncedData[server][slot];
642 if (sync_data.local_rec_no > *local_start_rec_no)
643 *local_start_rec_no = sync_data.local_rec_no;
644 else
645 sync_data.local_rec_no = *local_start_rec_no;
646 if (sync_data.remote_rec_no > *remote_start_rec_no)
647 *remote_start_rec_no = sync_data.remote_rec_no;
648 else
649 sync_data.remote_rec_no = *remote_start_rec_no;
650 }}
651
652 SSlotData& data = s_GetSlotData(slot);
653 CMiniMutexGuard guard(data.lock);
654
655 // Check the presence of the local record
656 if (data.events.empty()
657 || *local_start_rec_no < data.events.front()->rec_no
658 || s_LastWrittenRecord < *local_start_rec_no)
659 {
660 return false; // The required record is not available,
661 // all the blobs will be exchanged
662 }
663
664 //Uint8 time_limit = 0;
665 //bool time_limit_set = false;
666 // Walk the container from the end with copying all the records which
667 // matched: rec_no > local_rec_no or timestamp within 10 sec
668 NON_CONST_REVERSE_ITERATE(TSyncEvents, record, data.events) {
669 SNCSyncEvent* evt = *record;
670 // local_time is used here because the records are ordered by
671 // local_time but not the origin time
672 //if (time_limit_set && evt->local_time < time_limit)
673 // break;
674 if (/*!time_limit_set && */evt->rec_no < *local_start_rec_no) {
675 //time_limit_set = true;
676 //time_limit = evt->local_time
677 // - CNCDistributionConf::GetPeriodicSyncTailTime();
678 break;
679 }
680
681 // Now make the decision if the record should be memorized
682 // | rm | wr | pr <-- was saved
683 // ---------------------------------------------
684 // rm (any) | impossible | nothing | impossible
685 // wr | nothing | nothing | memorize
686 // pr | nothing | nothing | nothing
687 // ^- found
688 SBlobEvent& blob_event = (*events)[evt->key.PackedKey()];
689 switch (evt->event_type) {
690 case eSyncUpdate:
691 case eSyncRemove:
692 break;
693 case eSyncWrite:
694 if (!blob_event.wr_or_rm_event)
695 blob_event.wr_or_rm_event = evt;
696 break;
697 case eSyncProlong:
698 if (!blob_event.wr_or_rm_event && !blob_event.prolong_event)
699 blob_event.prolong_event = evt;
700 break;
701 }
702 }
703 return true;
704 }
705
706
707 bool
GetSyncOperations(Uint8 server,Uint2 slot,Uint8 local_start_rec_no,Uint8 remote_start_rec_no,const TReducedSyncEvents & remote_events,TSyncEvents * events_to_get,TSyncEvents * events_to_send,Uint8 * local_synced_rec_no,Uint8 * remote_synced_rec_no)708 CNCSyncLog::GetSyncOperations(Uint8 server,
709 Uint2 slot,
710 Uint8 local_start_rec_no,
711 Uint8 remote_start_rec_no,
712 const TReducedSyncEvents& remote_events,
713 TSyncEvents* events_to_get,
714 TSyncEvents* events_to_send,
715 Uint8* local_synced_rec_no,
716 Uint8* remote_synced_rec_no)
717 {
718 // Get the local events
719 TReducedSyncEvents local_events;
720 if (!GetEventsList(server, slot, &local_start_rec_no,
721 &remote_start_rec_no, &local_events))
722 {
723 return false;
724 }
725
726 Uint8 now = CSrvTime::Current().AsUSec();
727 s_CompareEvents(local_events, local_start_rec_no,
728 now, remote_events,
729 local_synced_rec_no, events_to_send);
730 s_CompareEvents(remote_events, remote_start_rec_no,
731 now, local_events,
732 remote_synced_rec_no, events_to_get);
733 return true;
734 }
735
736
737 Uint8
Clean(Uint2 slot)738 CNCSyncLog::Clean(Uint2 slot)
739 {
740 SSlotData& data = s_GetSlotData(slot);
741 CMiniMutexGuard guard(data.lock);
742
743 Uint4 max_recs = CNCDistributionConf::GetMaxSlotLogEvents();
744 Uint4 clean_to_recs = max_recs - CNCDistributionConf::GetCleanLogReserve();
745 Uint4 max_clean_cnt = CNCDistributionConf::GetMaxCleanLogBatch();
746 Uint4 cleaned_cnt = 0;
747 Uint8 limit = s_GetMinLocalSyncedRecordNo(slot, data);
748 while (cleaned_cnt < max_clean_cnt) {
749 if (data.events.empty() || data.events.front()->rec_no > limit)
750 break; // Records are younger that should be deleted
751 // Delete the record, it is not required any more
752 delete data.events.front();
753 data.events.pop_front();
754 s_TotalRecords.Add(-1);
755 --data.rec_number;
756 ++cleaned_cnt;
757 }
758 if (data.rec_number > max_recs) {
759 while (data.rec_number > clean_to_recs && cleaned_cnt < max_clean_cnt)
760 {
761 delete data.events.front();
762 data.events.pop_front();
763 s_TotalRecords.Add(-1);
764 --data.rec_number;
765 ++cleaned_cnt;
766 }
767 }
768 return cleaned_cnt;
769 }
770
771 Uint8
GetLogSize(void)772 CNCSyncLog::GetLogSize(void)
773 {
774 return s_TotalRecords.Get();
775 }
776
777 Uint8
GetLogSize(Uint2 slot)778 CNCSyncLog::GetLogSize(Uint2 slot)
779 {
780 SSlotData& data = s_GetSlotData(slot);
781 return data.rec_number;
782 }
783
784 bool
IsOverLimit(Uint2 slot)785 CNCSyncLog::IsOverLimit(Uint2 slot)
786 {
787 SSlotData& data = s_GetSlotData(slot);
788 Uint4 max_recs = CNCDistributionConf::GetMaxSlotLogEvents();
789 return data.rec_number > max_recs;
790 }
791
792 END_NCBI_SCOPE
793
794