1 /* $Id: periodic_sync.cpp 633628 2021-06-22 18:24:03Z ivanov $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Denis Vakatov, Pavel Ivanov, Sergey Satskiy
27 *
28 * File Description: Data structures and API to support blobs mirroring.
29 *
30 */
31
32 #include "nc_pch.hpp"
33
34 #include <corelib/request_ctx.hpp>
35 #include <util/random_gen.hpp>
36
37 #include "netcached.hpp"
38 #include "periodic_sync.hpp"
39 #include "distribution_conf.hpp"
40 #include "sync_log.hpp"
41 #include "peer_control.hpp"
42 #include "active_handler.hpp"
43 #include "nc_storage.hpp"
44 #include "nc_stat.hpp"
45 #include <random>
46
47
48 BEGIN_NCBI_SCOPE
49
50
51 static TSyncSlotsList s_SlotsList;
52 static TSyncSlotsMap s_SlotsMap;
53 static CMiniMutex s_RndLock;
54 static CRandom s_Rnd(CRandom::TValue(time(NULL)));
55
56 typedef vector<CNCActiveSyncControl*> TSyncControls;
57 static TSyncControls s_SyncControls;
58 static CNCLogCleaner* s_LogCleaner;
59
60 static FILE* s_LogFile = NULL;
61
62
/// Randomly permute the elements of @p lst in place.
///
/// A fresh Mersenne Twister engine, seeded from std::random_device, is built
/// on every call, so no generator state is shared between callers.
template <typename Type> void
s_ShuffleList(vector<Type>& lst)
{
    random_device seed_source;
    const unsigned int seed = seed_source();
    mt19937 engine(seed);
    shuffle(lst.begin(), lst.end(), engine);
}
70
71 static void
s_ShuffleSrvsLists(void)72 s_ShuffleSrvsLists(void)
73 {
74 TSyncSlotsList::const_iterator sl = s_SlotsList.begin();
75 for ( ; sl != s_SlotsList.end(); ++sl) {
76 (*sl)->lock.Lock();
77 s_ShuffleList<SSyncSlotSrv*>((*sl)->srvs);
78 (*sl)->lock.Unlock();
79 }
80 }
81
82
83 static void
s_FindServerSlot(Uint8 server_id,Uint2 slot,SSyncSlotData * & slot_data,SSyncSlotSrv * & slot_srv)84 s_FindServerSlot(Uint8 server_id,
85 Uint2 slot,
86 SSyncSlotData*& slot_data,
87 SSyncSlotSrv*& slot_srv)
88 {
89 slot_data = NULL;
90 slot_srv = NULL;
91 TSyncSlotsMap::const_iterator it_slot = s_SlotsMap.find(slot);
92 if (it_slot == s_SlotsMap.end())
93 return;
94 slot_data = it_slot->second;
95 slot_data->lock.Lock();
96 TSlotSrvsList srvs = slot_data->srvs;
97 slot_data->lock.Unlock();
98 ITERATE(TSlotSrvsList, it_srv, srvs) {
99 SSyncSlotSrv* this_srv = *it_srv;
100 if (this_srv->peer->GetSrvId() == server_id) {
101 slot_srv = this_srv;
102 return;
103 }
104 }
105 }
106
/// Try to register a new sync between this node and the peer @p slot_srv on
/// the slot @p slot_data.
///
/// Acquires slot_data->lock and then slot_srv->lock for the whole check.
///
/// @param slot_data   Slot record.
/// @param slot_srv    Per-peer record within that slot.
/// @param is_passive  true for a sync requested by the peer
///                    (CNCPeriodicSync::Initiate), false for one started by
///                    this node (CNCActiveSyncControl::x_DoPeriodicSync).
/// @return eServerBusy   - the slot is being cleaned or a forced clean is
///                         pending, a sync is already running on the slot, or
///                         (active sync only) peer->StartActiveSync() refused;
///         eCrossSynced  - a sync with this peer is still registered and may
///                         not be replaced;
///         eProceedWithEvents - the sync was registered: sync_started set,
///                         cur_sync_id advanced, cnt_sync_ops reset.
static ESyncInitiateResult
s_StartSync(SSyncSlotData* slot_data, SSyncSlotSrv* slot_srv, bool is_passive)
{
    CMiniMutexGuard g_slot(slot_data->lock);
    // Never start a sync while the log cleaner works on (or has requested) this slot.
    if (slot_data->cleaning || slot_data->clean_required) {
        return eServerBusy;
    }
    // there is a feeling (not knowledge) that more than one sync per slot is not good
#if 1
    // NOTE(review): while this check is enabled, slot_srv->sync_started below can
    // only be true if cnt_sync_started disagrees with the per-server flags,
    // because both are incremented together at the end of this function.
    if (slot_data->cnt_sync_started != 0) {
        return eServerBusy;
    }
#endif

    CMiniMutexGuard g_srv(slot_srv->lock);
    if (slot_srv->sync_started) {
        // Only a passive sync with no command in flight may be replaced by a new
        // passive request from the same peer; anything else is a cross-sync.
        if (!is_passive || !slot_srv->is_passive || slot_srv->started_cmds != 0) {
            return eCrossSynced;
        }
        // Close out the previous passive sync; it is reported as unsuccessful.
        if (slot_srv->cnt_sync_ops != 0) {
            CNCStat::PeerSyncFinished(slot_srv->peer->GetSrvId(), slot_data->slot, slot_srv->cnt_sync_ops, false);
        }
        slot_srv->sync_started = false;
        if (slot_data->cnt_sync_started != 0) {
            --slot_data->cnt_sync_started;
        }
    }

    // An active sync additionally needs the peer's permission.
    if (!is_passive && !slot_srv->peer->StartActiveSync()) {
        return eServerBusy;
    }
    slot_srv->sync_started = true;
    slot_srv->is_passive = is_passive;
    slot_srv->cnt_sync_ops = 0;
    slot_srv->last_active_time = CSrvTime::CurSecs();
    // New id lets later Commit/Cancel/command calls ignore messages for an older sync.
    ++slot_srv->cur_sync_id;
    ++slot_data->cnt_sync_started;
    return eProceedWithEvents;
}
146
147 static void
s_StopSync(SSyncSlotData * slot_data,SSyncSlotSrv * slot_srv,Uint8 next_delay,ESyncResult result,int hint)148 s_StopSync(SSyncSlotData* slot_data, SSyncSlotSrv* slot_srv, Uint8 next_delay, ESyncResult result, int hint)
149 {
150 slot_srv->peer->RegisterSyncStop(slot_srv->is_passive,
151 slot_srv->next_sync_time,
152 next_delay);
153 #ifdef _DEBUG
154 slot_srv->peer->RegisterSyncStat(slot_srv->is_passive, slot_srv->is_by_blobs, result, hint);
155 #endif
156 slot_srv->sync_started = false;
157 slot_srv->started_cmds = 0;
158 slot_srv->is_passive = true;
159 slot_srv->result = result;
160 slot_srv->hint = hint;
161 if (slot_data->cnt_sync_started != 0) {
162 --slot_data->cnt_sync_started;
163 }
164 /*
165 else {
166 SRV_LOG(Error, "SyncSlotData broken");
167 }
168 */
169 if (slot_data->cnt_sync_started == 0 && slot_data->clean_required)
170 s_LogCleaner->SetRunnable();
171 }
172
173 static void
s_CancelSync(SSyncSlotData * slot_data,SSyncSlotSrv * slot_srv,Uint8 next_delay,ESyncResult result,int hint)174 s_CancelSync(SSyncSlotData* slot_data, SSyncSlotSrv* slot_srv, Uint8 next_delay, ESyncResult result, int hint)
175 {
176 CNCStat::PeerSyncFinished(slot_srv->peer->GetSrvId(), slot_data->slot, slot_srv->cnt_sync_ops, false);
177 s_StopSync(slot_data, slot_srv, next_delay, result, hint);
178 }
179
180 static void
s_CommitSync(SSyncSlotData * slot_data,SSyncSlotSrv * slot_srv,int hint)181 s_CommitSync(SSyncSlotData* slot_data, SSyncSlotSrv* slot_srv, int hint)
182 {
183 slot_srv->last_success_time = CSrvTime::CurSecs();
184 CNCStat::PeerSyncFinished(slot_srv->peer->GetSrvId(), slot_data->slot, slot_srv->cnt_sync_ops, true);
185 slot_srv->peer->ConnOk();
186 slot_srv->was_blobs_sync = slot_srv->is_by_blobs;
187 slot_srv->cnt_event_sync = slot_srv->is_by_blobs ? 0 : (slot_srv->cnt_event_sync + 1);
188 if (!slot_srv->made_initial_sync)
189 {
190 slot_srv->made_initial_sync = true;
191 slot_srv->peer->AddInitiallySyncedSlot();
192 }
193 s_StopSync(slot_data, slot_srv, CNCDistributionConf::GetPeriodicSyncInterval(), eSynOK, hint);
194 }
195
196 static void
s_DoCleanLog(CNCLogCleaner * cleaner,Uint2 slot)197 s_DoCleanLog(CNCLogCleaner* cleaner, Uint2 slot)
198 {
199 cleaner->CreateNewDiagCtx();
200 CSrvDiagMsg().StartRequest()
201 .PrintParam("_type", "clean")
202 .PrintParam("slot", slot);
203
204 Uint8 cleaned = CNCSyncLog::Clean(slot);
205 cleaner->GetDiagCtx()->SetBytesWr(Int8(cleaned));
206 CSrvDiagMsg().StopRequest();
207 cleaner->ReleaseDiagCtx();
208 }
209
/// One step of the sync-log cleaner task.
///
/// Each call handles the slot at m_NextSlotIt and immediately reschedules the
/// task. After the last slot it wraps around to the beginning and waits
/// CNCDistributionConf::GetCleanAttemptInterval() before the next pass.
///
/// For the current slot:
///  - if no sync is running, the slot is flagged `cleaning` (s_StartSync()
///    refuses new syncs meanwhile) and its log is cleaned without holding the
///    slot lock. Afterwards, a pending forced-clean request is cleared and its
///    time is stored in m_LastForceTime;
///  - otherwise `clean_required` is set when all of these hold: no servers are
///    waiting for initial sync, the log is over its limit, and at least
///    GetMinForcedCleanPeriod() (compared against microsecond timestamps) has
///    passed since the last forced clean. While the flag is set, s_StartSync()
///    refuses new syncs and s_StopSync() wakes this task once the slot becomes idle.
void
CNCLogCleaner::ExecuteSlice(TSrvThreadNum /* thr_idx */)
{
    if (CTaskServer::IsInShutdown())
        return;

    // Clean the operation log used for synchronization between peers,
    // i.e. remove records that have already been synced.

    if (m_NextSlotIt == s_SlotsList.end()) {
        m_NextSlotIt = s_SlotsList.begin();
        RunAfter(CNCDistributionConf::GetCleanAttemptInterval());
        return;
    }

    Uint8 min_period = CNCDistributionConf::GetMinForcedCleanPeriod();

    SSyncSlotData* slot_data = *m_NextSlotIt;
    Uint2 slot = slot_data->slot;
    slot_data->lock.Lock();
    // if no sync currently in progress
    if (slot_data->cnt_sync_started == 0) {
        slot_data->cleaning = true;
        // Cleaning may take a while; do it without blocking other users of the slot lock.
        slot_data->lock.Unlock();
        s_DoCleanLog(this, slot);
        slot_data->lock.Lock();
        slot_data->cleaning = false;
        if (slot_data->clean_required) {
            slot_data->clean_required = false;
            m_LastForceTime[slot] = CSrvTime::Current().AsUSec();
        }
    }
    else if (!slot_data->clean_required
             && !CNCPeerControl::HasServersForInitSync()
             && CNCSyncLog::IsOverLimit(slot)
             && CSrvTime::Current().AsUSec() - m_LastForceTime[slot] >= min_period)
    {
        // Ask running syncs to drain so the slot can be cleaned on a later pass.
        slot_data->clean_required = true;
    }
    slot_data->lock.Unlock();

    ++m_NextSlotIt;
    SetRunnable();
}
254
CNCLogCleaner(void)255 CNCLogCleaner::CNCLogCleaner(void)
256 {
257 #if __NC_TASKS_MONITOR
258 m_TaskName = "CNCLogCleaner";
259 #endif
260 m_NextSlotIt = s_SlotsList.begin();
261 }
262
~CNCLogCleaner(void)263 CNCLogCleaner::~CNCLogCleaner(void)
264 {}
265
266
267
/// Create the bookkeeping record for one slot served by this node.
/// Initially no syncs are running and log cleaning is neither active nor requested.
SSyncSlotData::SSyncSlotData(Uint2 slot_)
    : slot(slot_),
      cnt_sync_started(0),
      cleaning(false),
      clean_required(false)
{}
274
/// Create the per-peer record for one slot shared with @p peer_.
/// No sync is running, nothing has been synced yet (last_success_time == 0
/// makes CNCPeriodicSync::Initiate() choose a blob-list sync), and the
/// activity clock starts now.
SSyncSlotSrv::SSyncSlotSrv(CNCPeerControl* peer_)
    : peer(peer_),
      sync_started(false),
      is_passive(false),
      is_by_blobs(false),
      was_blobs_sync(false),
      made_initial_sync(false),
      cnt_event_sync(0),
      started_cmds(0),
      next_sync_time(0),
      // NOTE(review): other code updates this with CSrvTime::CurSecs();
      // presumably equivalent to Current().Sec() — confirm.
      last_active_time(CSrvTime::Current().Sec()),
      last_success_time(0),
      cur_sync_id(0),
      cnt_sync_ops(0),
      result(eSynOK),
      hint(0)
{}
292
293 bool
Initialize(void)294 CNCPeriodicSync::Initialize(void)
295 {
296 if (!CNCDistributionConf::GetPeriodicLogFile().empty()) {
297 s_LogFile = fopen(CNCDistributionConf::GetPeriodicLogFile().c_str(), "a");
298 }
299
300 const vector<Uint2>& slots = CNCDistributionConf::GetSelfSlots();
301 for (Uint2 i = 0; i < slots.size(); ++i) {
302 SSyncSlotData* data = new SSyncSlotData(slots[i]);
303 s_SlotsList.push_back(data);
304 s_SlotsMap[data->slot] = data;
305 }
306 s_ShuffleList<SSyncSlotData*>(s_SlotsList);
307
308 Uint4 cnt_to_sync = 0;
309 const TNCPeerList& peers = CNCDistributionConf::GetPeers();
310 ITERATE(TNCPeerList, it_peer, peers) {
311 CNCPeerControl* peer = CNCPeerControl::Peer(it_peer->first);
312 const vector<Uint2>& commonSlots = CNCDistributionConf::GetCommonSlots(it_peer->first);
313 ITERATE(vector<Uint2>, it_slot, commonSlots) {
314 SSyncSlotData* slot_data = s_SlotsMap[*it_slot];
315 SSyncSlotSrv* slot_srv = new SSyncSlotSrv(peer);
316 slot_data->srvs.push_back(slot_srv);
317 }
318 peer->SetSlotsForInitSync(Uint2(commonSlots.size()));
319 if (!commonSlots.empty()) {
320 CNCStat::AddSyncServer(it_peer->first);
321 ++cnt_to_sync;
322 }
323 }
324 s_ShuffleSrvsLists();
325 CNCPeerControl::SetServersForInitSync(cnt_to_sync);
326
327 Uint1 cnt_syncs = CNCDistributionConf::GetCntActiveSyncs();
328 for (Uint1 i = 0; i < cnt_syncs; ++i) {
329 s_SyncControls.push_back(new CNCActiveSyncControl());
330 if (i == 0) {
331 s_SyncControls[0]->SetFirst();
332 }
333 s_SyncControls[i]->SetRunnable();
334 }
335
336 s_LogCleaner = new CNCLogCleaner();
337 s_LogCleaner->SetRunnable();
338
339 if (cnt_to_sync == 0)
340 CNCServer::InitialSyncComplete();
341
342 return true;
343 }
344
/// Apply a changed peer configuration to the per-slot server lists.
///
/// For every slot served by this node (self slots do not change), under the
/// slot lock:
///  - servers that are no longer peers, or no longer share this slot, are
///    removed from the slot's list;
///  - newly configured peers that share the slot get a fresh SSyncSlotSrv.
/// Afterwards the per-peer and global "servers for initial sync" counters are
/// recomputed from the new configuration.
///
/// NOTE(review): removed SSyncSlotSrv objects are not deleted. Other code may
/// still hold raw pointers to them (e.g. an active sync's m_SlotSrv), so they
/// are leaked.
/// NOTE(review): if a removed server had a passive sync running, its share of
/// slot_data->cnt_sync_started is apparently never released. Commit()/Cancel()
/// locate it through s_FindServerSlot(), which no longer returns it, and
/// x_CheckSlotTheirSync() only scans slot_data->srvs — verify.
void
CNCPeriodicSync::ReConfig(void)
{
    // all peers, old and new
    set<CNCPeerControl*> all_peers;
    // map of common slots
    map< Uint8, set<Uint2> > peer_slots;
    const TNCPeerList& peers = CNCDistributionConf::GetPeers();
    ITERATE(TNCPeerList, it_peer, peers) {
        const vector<Uint2>& commonSlots = CNCDistributionConf::GetCommonSlots(it_peer->first);
        peer_slots[it_peer->first].insert(commonSlots.begin(), commonSlots.end());
    }
    // self slots do not change
    const vector<Uint2>& self_slots = CNCDistributionConf::GetSelfSlots();
    // for each self slot
    ITERATE(vector<Uint2>, it_slot, self_slots) {
        SSyncSlotData* slot_data = s_SlotsMap[*it_slot];
        slot_data->lock.Lock();
        // server ids already present in (or just examined for) this slot
        set<Uint8> processed;
        // verify old peers
        ERASE_ITERATE(TSlotSrvsList, it_srv, slot_data->srvs) {
            Uint8 srv_id = (*it_srv)->peer->GetSrvId();
            all_peers.insert((*it_srv)->peer);
            processed.insert(srv_id);
            map< Uint8, set<Uint2> >::const_iterator p = peer_slots.find(srv_id);
            if (p != peer_slots.end() && p->second.find( *it_slot) != p->second.end()) {
                // peer still exists and this slot is still common
                continue;
            }
            // peer is removed, or there is no common slots - remove this server from SyncData
            VECTOR_ERASE(it_srv, slot_data->srvs);
        }
        // look for new peers
        for (map< Uint8, set<Uint2> >::const_iterator p = peer_slots.begin();
             p != peer_slots.end(); ++p) {
            if (processed.find(p->first) != processed.end()) {
                continue;
            }
            processed.insert(p->first);
            CNCPeerControl* peer = CNCPeerControl::Peer( p->first);
            all_peers.insert(peer);
            // if we have common slot with this peer, add it into SyncData
            if (p->second.find( *it_slot) != p->second.end()) {
                SSyncSlotSrv* slot_srv = new SSyncSlotSrv( peer);
                slot_data->srvs.push_back( slot_srv);
            }
        }
        slot_data->lock.Unlock();
    }
    // now get/change some counters
    Uint4 cnt_to_sync = 0;
    ITERATE( set<CNCPeerControl*>, it_peer, all_peers) {
        map< Uint8, set<Uint2> >::const_iterator p = peer_slots.find((*it_peer)->GetSrvId());
        // peers that were dropped from the configuration are skipped here
        if (p != peer_slots.end()) {
            (*it_peer)->ReconfSlotsForInitSync(Uint2(p->second.size()));
            if (p->second.size() != 0) {
                ++cnt_to_sync;
                CNCStat::AddSyncServer(p->first);
            }
        }
    }
    CNCPeerControl::ReconfServersForInitSync(cnt_to_sync);
}
408
409 void
ReInitialize(void)410 CNCPeriodicSync::ReInitialize(void)
411 {
412 ITERATE(TSyncControls, c, s_SyncControls) {
413 if (!(*c)->IsStuck()) {
414 return;
415 }
416 }
417 CNCServer::InitialSyncRequired();
418 CNCPeerControl::ResetServersForInitSync();
419 ITERATE(TSyncSlotsList, sl, s_SlotsList) {
420 ITERATE(TSlotSrvsList, srv, (*sl)->srvs) {
421 (*srv)->made_initial_sync = false;
422 (*srv)->peer->ResetSlotsForInitSync();
423 }
424 }
425 }
426
427 void
Finalize(void)428 CNCPeriodicSync::Finalize(void)
429 {
430 if (s_LogFile)
431 fclose(s_LogFile);
432 }
433
434 ESyncInitiateResult
Initiate(Uint8 server_id,Uint2 slot,Uint8 * local_start_rec_no,Uint8 * remote_start_rec_no,TReducedSyncEvents * events,Uint8 * sync_id)435 CNCPeriodicSync::Initiate(Uint8 server_id,
436 Uint2 slot,
437 Uint8* local_start_rec_no,
438 Uint8* remote_start_rec_no,
439 TReducedSyncEvents* events,
440 Uint8* sync_id)
441 {
442 SSyncSlotData* slot_data;
443 SSyncSlotSrv* slot_srv;
444 s_FindServerSlot(server_id, slot, slot_data, slot_srv);
445 if (slot_srv) {
446 slot_srv->peer->RegisterConnSuccess();
447 }
448 if (slot_srv == NULL) {
449 return eUnknownServer;
450 }
451 if ( slot_data->cnt_sync_started != 0 || CTaskServer::IsInShutdown()) {
452 return eServerBusy;
453 }
454
455 ESyncInitiateResult init_res = s_StartSync(slot_data, slot_srv, true);
456 if (init_res != eProceedWithEvents)
457 return init_res;
458
459 slot_srv->started_cmds = 1;
460 *sync_id = slot_srv->cur_sync_id;
461 bool records_available = CNCSyncLog::GetEventsList(server_id,
462 slot,
463 local_start_rec_no,
464 remote_start_rec_no,
465 events);
466 if (slot_srv->last_success_time == 0) {
467 slot_srv->is_by_blobs = true;
468 return eProceedWithBlobs;
469 }
470 if (records_available
471 // || (CNCSyncLog::GetLogSize(slot) == 0 && slot_srv->was_blobs_sync))
472 || (CNCSyncLog::GetLogSize(slot) == 0 && slot_srv->cnt_event_sync < 8))
473 {
474 slot_srv->is_by_blobs = false;
475 return eProceedWithEvents;
476 }
477 else {
478 slot_srv->is_by_blobs = true;
479 return eProceedWithBlobs;
480 }
481 }
482
483 ESyncInitiateResult
CanStartSyncCommand(Uint8 server_id,Uint2 slot,bool can_abort,Uint8 & sync_id)484 CNCPeriodicSync::CanStartSyncCommand(Uint8 server_id,
485 Uint2 slot,
486 bool can_abort,
487 Uint8& sync_id)
488 {
489 SSyncSlotData* slot_data;
490 SSyncSlotSrv* slot_srv;
491 s_FindServerSlot(server_id, slot, slot_data, slot_srv);
492 if (slot_srv == NULL)
493 return eNetworkError;
494
495 CMiniMutexGuard g_slot(slot_data->lock);
496 if (slot_data->clean_required && can_abort) {
497 return eServerBusy;
498 }
499
500 CMiniMutexGuard g_srv(slot_srv->lock);
501 if (!slot_srv->sync_started || !slot_srv->is_passive)
502 return eNetworkError;
503
504 ++slot_srv->started_cmds;
505 ++slot_srv->cnt_sync_ops;
506 slot_srv->last_active_time = CSrvTime::CurSecs();
507 sync_id = slot_srv->cur_sync_id;
508 return eProceedWithEvents;
509 }
510
511 void
MarkCurSyncByBlobs(Uint8 server_id,Uint2 slot,Uint8 sync_id)512 CNCPeriodicSync::MarkCurSyncByBlobs(Uint8 server_id, Uint2 slot, Uint8 sync_id)
513 {
514 SSyncSlotData* slot_data;
515 SSyncSlotSrv* slot_srv;
516 s_FindServerSlot(server_id, slot, slot_data, slot_srv);
517
518 CMiniMutexGuard g_slot(slot_data->lock);
519 CMiniMutexGuard g_srv(slot_srv->lock);
520 if (slot_srv->sync_started && slot_srv->is_passive
521 && slot_srv->cur_sync_id == sync_id)
522 {
523 slot_srv->is_by_blobs = true;
524 }
525 }
526
527 void
SyncCommandFinished(Uint8 server_id,Uint2 slot,Uint8 sync_id)528 CNCPeriodicSync::SyncCommandFinished(Uint8 server_id, Uint2 slot, Uint8 sync_id)
529 {
530 SSyncSlotData* slot_data;
531 SSyncSlotSrv* slot_srv;
532 s_FindServerSlot(server_id, slot, slot_data, slot_srv);
533
534 CMiniMutexGuard g_slot(slot_data->lock);
535 CMiniMutexGuard g_srv(slot_srv->lock);
536 if (slot_srv->sync_started && slot_srv->is_passive
537 && slot_srv->cur_sync_id == sync_id)
538 {
539 if (slot_srv->started_cmds == 0) {
540 SRV_FATAL("SyncSlotData broken");
541 }
542 --slot_srv->started_cmds;
543 slot_srv->last_active_time = CSrvTime::CurSecs();
544 }
545 }
546
547 void
Cancel(Uint8 server_id,Uint2 slot,Uint8 sync_id)548 CNCPeriodicSync::Cancel(Uint8 server_id, Uint2 slot, Uint8 sync_id)
549 {
550 SSyncSlotData* slot_data;
551 SSyncSlotSrv* slot_srv;
552 s_FindServerSlot(server_id, slot, slot_data, slot_srv);
553
554 CMiniMutexGuard g_slot(slot_data->lock);
555 CMiniMutexGuard g_srv(slot_srv->lock);
556 if (slot_srv->sync_started && slot_srv->is_passive
557 && slot_srv->cur_sync_id == sync_id)
558 {
559 if (slot_srv->started_cmds != 0) {
560 --slot_srv->started_cmds;
561 }
562 --slot_srv->cnt_sync_ops;
563 s_CancelSync(slot_data, slot_srv, 0, eSynAborted, 0);
564 }
565 }
566
567 void
Commit(Uint8 server_id,Uint2 slot,Uint8 sync_id,Uint8 local_synced_rec_no,Uint8 remote_synced_rec_no)568 CNCPeriodicSync::Commit(Uint8 server_id,
569 Uint2 slot,
570 Uint8 sync_id,
571 Uint8 local_synced_rec_no,
572 Uint8 remote_synced_rec_no)
573 {
574 CNCSyncLog::SetLastSyncRecNo(server_id, slot,
575 local_synced_rec_no,
576 remote_synced_rec_no);
577 CNCBlobStorage::SaveMaxSyncLogRecNo();
578
579 SSyncSlotData* slot_data;
580 SSyncSlotSrv* slot_srv;
581 s_FindServerSlot(server_id, slot, slot_data, slot_srv);
582
583 CMiniMutexGuard g_slot(slot_data->lock);
584 CMiniMutexGuard g_srv(slot_srv->lock);
585 if (slot_srv->sync_started && slot_srv->is_passive
586 && slot_srv->cur_sync_id == sync_id)
587 {
588 --slot_srv->cnt_sync_ops;
589 s_CommitSync(slot_data, slot_srv, 0);
590 }
591 }
592
593
/// Create an active-sync control task that starts in x_StartScanSlots.
/// Only the scan-level flags and counters are set here. The per-sync members
/// (m_SrvId, m_Slot, m_Result, ...) appear to be initialized by
/// x_DoPeriodicSync() before use — confirm when adding new state.
CNCActiveSyncControl::CNCActiveSyncControl(void)
{
#if __NC_TASKS_MONITOR
    m_TaskName = "CNCActiveSyncControl";
#endif
    SetState(&CNCActiveSyncControl::x_StartScanSlots);
    m_ForceInitSync = false;
    m_Stuck = false;
    m_First = false;
    m_NeedReply = false;
    m_StartTime = 0;
    m_LoopStart = 0;
    m_CntUnfinished = 0;
    m_MyTrust = m_TheirTrust = 0;
}
609
~CNCActiveSyncControl(void)610 CNCActiveSyncControl::~CNCActiveSyncControl(void) {
611 }
612
613
/// State: begin a new pass over all slots.
///
/// On shutdown the task simply stops (returns NULL). If the previous pass left
/// no unfinished syncs and storage requires writes to stop, no sync is started.
/// Instead the task re-runs after one sync interval. The first time this
/// happens (unless storage is draining or there are no peers), the task marks
/// itself stuck and calls CNCPeriodicSync::ReInitialize(), which acts only once
/// all control tasks are stuck. Otherwise, the first control task handshakes
/// with every peer, the scan state is reset, and the pass continues in
/// x_CheckSlotOurSync.
CNCActiveSyncControl::State
CNCActiveSyncControl::x_StartScanSlots(void)
{
    if (CTaskServer::IsInShutdown()) {
        return NULL;
    }
    // m_CntUnfinished still holds the count from the previous pass here.
    if ( m_CntUnfinished == 0 && CNCBlobStorage::NeedStopWrite()) {
        // in this scenario, I do not start sync, but still accept sync requests from others
        RunAfter(CNCDistributionConf::GetPeriodicSyncInterval() / kUSecsPerSecond);
        if (!m_Stuck && !CNCBlobStorage::IsDraining() &&
            !CNCDistributionConf::GetPeers().empty()) {
            m_Stuck = true;
            CNCPeriodicSync::ReInitialize();
        }
        return NULL;
    }
    if (m_First) {
        const TNCPeerList& peers = CNCDistributionConf::GetPeers();
        ITERATE(TNCPeerList, it_peer, peers) {
            CNCPeerControl::Peer(it_peer->first)->PeerHandshake();
        }
    }
    m_Stuck = false;

    m_DidSync = false;
    m_MinNextTime = numeric_limits<Uint8>::max();
    m_LoopStart = CSrvTime::Current().AsUSec();
    m_NextSlotIt = s_SlotsList.begin();
    m_VisitedSrv.clear();
    m_CntUnfinished = 0;
    return &CNCActiveSyncControl::x_CheckSlotOurSync;
}
646
/// State: for the current slot, look for a peer this task should sync with now.
///
/// Moves to x_DoPeriodicSync with m_SlotSrv set to the first not-yet-visited
/// server whose next sync time (the later of its per-slot and per-peer times)
/// has passed, or that has not made its initial sync. Servers are recorded in
/// m_VisitedSrv, so when a sync could not start, or once one has finished,
/// re-entering this state continues with the next server of the same slot.
/// If no server is due, moves to x_CheckSlotTheirSync. After the last slot,
/// moves to x_FinishScanSlots.
///
/// NOTE(review): cnt_sync_started and the SSyncSlotSrv fields are read here
/// without holding the slot/server locks.
CNCActiveSyncControl::State
CNCActiveSyncControl::x_CheckSlotOurSync(void)
{
    if (CTaskServer::IsInShutdown())
        return NULL;
    if (m_NextSlotIt == s_SlotsList.end())
        return &CNCActiveSyncControl::x_FinishScanSlots;

    m_SlotData = *m_NextSlotIt;
    // Work on a snapshot so the slot lock is not held during the scan.
    m_SlotData->lock.Lock();
    TSlotSrvsList srvs = m_SlotData->srvs;
    m_SlotData->lock.Unlock();
    if ((m_SlotData->cnt_sync_started == 0 || m_ForceInitSync) && !CNCBlobStorage::NeedStopWrite()) {
        ITERATE(TSlotSrvsList, it_srv, srvs) {
            m_SlotSrv = *it_srv;
            if (m_VisitedSrv.find(m_SlotSrv) != m_VisitedSrv.end()) {
                continue;
            }
            Uint8 next_time = max(m_SlotSrv->next_sync_time,
                                  m_SlotSrv->peer->GetNextSyncTime());
            m_VisitedSrv.insert(m_SlotSrv);
            if (next_time <= CSrvTime::Current().AsUSec() || !m_SlotSrv->made_initial_sync) {
                return &CNCActiveSyncControl::x_DoPeriodicSync;
            }
        }
    }
    // NOTE(review): this disabled branch calls s_CancelSync() with three
    // arguments, which no longer matches its signature; it would not compile
    // if re-enabled as is.
#if 0
    else {
        ITERATE(TSlotSrvsList, it_srv, srvs) {
            SSyncSlotSrv* slot_srv = *it_srv;
            if (slot_srv->sync_started) {
                if (CSrvTime::CurSecs() - slot_srv->last_active_time
                        > (CNCDistributionConf::GetNetworkErrorTimeout() / kUSecsPerSecond)) {
                    s_CancelSync(m_SlotData, slot_srv, 0);
                    return &CNCActiveSyncControl::x_CheckSlotOurSync;
                } else {
                    ++m_CntUnfinished;
                }
            }
        }
    }
#endif
    return &CNCActiveSyncControl::x_CheckSlotTheirSync;
}
691
/// State: housekeeping for the current slot's running syncs, then advance to
/// the next slot.
///
/// Under the slot lock (and each server's lock), per server:
///  - a passive sync with no command in flight that has been idle longer than
///    GetPeriodicSyncTimeout() is cancelled, and this state is re-entered to
///    rescan the slot from the beginning;
///  - any other running sync counts toward m_CntUnfinished;
///  - for idle servers, m_MinNextTime tracks the earliest next sync time.
/// Finally moves to the next slot and reschedules in x_CheckSlotOurSync.
///
/// NOTE(review): after a cancellation the rescan may count some servers in
/// m_CntUnfinished twice. x_StartScanSlots() only tests it against zero.
CNCActiveSyncControl::State
CNCActiveSyncControl::x_CheckSlotTheirSync(void)
{
    if (CTaskServer::IsInShutdown())
        return NULL;

    m_SlotData->lock.Lock();
    ITERATE(TSlotSrvsList, it_srv, m_SlotData->srvs) {
        SSyncSlotSrv* slot_srv = *it_srv;
        slot_srv->lock.Lock();
        if (slot_srv->sync_started) {
            if (slot_srv->is_passive && slot_srv->started_cmds == 0 &&
                CSrvTime::CurSecs() - slot_srv->last_active_time
                    > (CNCDistributionConf::GetPeriodicSyncTimeout() / kUSecsPerSecond)) {
                SRV_LOG(Warning, "Periodic sync canceled by timeout");
                s_CancelSync(m_SlotData, slot_srv, 0, eSynAborted, NC_SYNC_HINT);
                slot_srv->lock.Unlock();
                m_SlotData->lock.Unlock();
                return &CNCActiveSyncControl::x_CheckSlotTheirSync;
            } else {
                ++m_CntUnfinished;
            }
        }
        else {
            Uint8 next_time = max(slot_srv->next_sync_time,
                                  slot_srv->peer->GetNextSyncTime());
            // calculate next time we need sync
            m_MinNextTime = min(m_MinNextTime, next_time);
        }
        slot_srv->lock.Unlock();
    }
    m_SlotData->lock.Unlock();

    ++m_NextSlotIt;
    m_VisitedSrv.clear();
    m_NeedReply = false;
    SetState(&CNCActiveSyncControl::x_CheckSlotOurSync);
    SetRunnable();
    return NULL;
}
732
733 CNCActiveSyncControl::State
x_FinishScanSlots(void)734 CNCActiveSyncControl::x_FinishScanSlots(void)
735 {
736 Uint8 sync_interval = CNCDistributionConf::GetPeriodicSyncInterval();
737 // I am not sure using m_ForceInitSync makes sense
738 // m_ForceInitSync = CNCPeerControl::HasServersForInitSync() && !m_DidSync;
739 Uint8 now = CSrvTime::Current().AsUSec();
740 if (now - m_LoopStart >= sync_interval) {
741 s_ShuffleSrvsLists();
742 }
743
744 Uint8 wait_time;
745 if (m_MinNextTime > now) {
746 wait_time = m_MinNextTime - now;
747 if (wait_time > sync_interval)
748 wait_time = sync_interval;
749 }
750 else {
751 s_RndLock.Lock();
752 wait_time = s_Rnd.GetRand(0, 10000);
753 s_RndLock.Unlock();
754 }
755
756 Uint4 timeout_sec = Uint4(wait_time / kUSecsPerSecond);
757 if (wait_time % kUSecsPerSecond)
758 ++timeout_sec;
759
760 SetState(&CNCActiveSyncControl::x_StartScanSlots);
761 RunAfter(timeout_sec);
762 return NULL;
763 }
764
/// State: start an active (initiated by this node) sync of m_SlotData with
/// m_SlotSrv.
///
/// If s_StartSync() refuses, the task returns to x_CheckSlotOurSync to try the
/// next server. Otherwise it:
///  - resets per-sync state and operation counters;
///  - opens a "sync" diag request;
///  - loads the last synced record numbers;
///  - sends SyncStart over a background connection to the peer, then waits in
///    x_WaitSyncStarted.
/// Without a connection, the sync ends in x_FinishSync with eSynNetworkError.
///
/// NOTE(review): m_SlotSrv->is_by_blobs is written here without the server lock.
CNCActiveSyncControl::State
CNCActiveSyncControl::x_DoPeriodicSync(void)
{
    ESyncInitiateResult init_res = s_StartSync(m_SlotData, m_SlotSrv, false);
    if (init_res != eProceedWithEvents) {
        return &CNCActiveSyncControl::x_CheckSlotOurSync;
    }

    m_SrvId = m_SlotSrv->peer->GetSrvId();
    m_MyTrust = CNCDistributionConf::GetSelfTrustLevel();
    m_TheirTrust = m_SlotSrv->peer->GetTrustLevel();
    m_Slot = m_SlotData->slot;
    m_Result = eSynOK;
    m_Hint = NC_SYNC_HINT;
    m_Progress = 0;
    m_SlotSrv->is_by_blobs = false;
    m_StartedCmds = 0;
    m_FinishSyncCalled = false;
    m_NextTask = eSynNoTask;
    m_StartTime = CSrvTime::Current().AsUSec();

    m_ReadOK = m_ReadERR = 0;
    m_WriteOK = m_WriteERR = 0;
    m_ProlongOK = m_ProlongERR = 0;
    m_DelOK = m_DelERR = 0;
    m_NeedReply = false;

    CreateNewDiagCtx();
    CSrvDiagMsg().StartRequest()
                 .PrintParam("_type", "sync")
                 .PrintParam("srv_id", m_SrvId)
                 .PrintParam("slot", m_Slot)
                 .PrintParam("self_id", CNCDistributionConf::GetSelfID());

    CNCSyncLog::GetLastSyncedRecNo(m_SrvId, m_Slot,
                                   &m_LocalStartRecNo, &m_RemoteStartRecNo);

    CNCActiveHandler* conn = m_SlotSrv->peer->GetBGConn();
    if (!conn) {
        m_Result = eSynNetworkError;
        m_Hint = NC_SYNC_HINT;
        return &CNCActiveSyncControl::x_FinishSync;
    }

    // One command (SyncStart) is in flight; x_WaitSyncStarted waits for it.
    m_StartedCmds = 1;
    m_SyncHandlers.clear();
    conn->SyncStart(this, m_LocalStartRecNo, m_RemoteStartRecNo);
    return &CNCActiveSyncControl::x_WaitSyncStarted;
}
814
815 CNCActiveSyncControl::State
x_WaitSyncStarted(void)816 CNCActiveSyncControl::x_WaitSyncStarted(void)
817 {
818 // wait for sync started
819 // see CmdFinished()
820 if (m_StartedCmds != 0) {
821 return NULL;
822 }
823 if (CTaskServer::IsInShutdown()) {
824 m_Result = eSynAborted;
825 m_Hint = NC_SYNC_HINT;
826 }
827 if (m_Result != eSynOK)
828 return &CNCActiveSyncControl::x_FinishSync;
829
830 m_NeedReply = true;
831 m_LocalSyncedRecNo = 0;
832 m_RemoteSyncedRecNo = 0;
833 m_SlotSrv->last_active_time = CSrvTime::CurSecs();
834 // depending on the reply
835 if (m_SlotSrv->is_by_blobs)
836 return &CNCActiveSyncControl::x_PrepareSyncByBlobs;
837 else
838 return &CNCActiveSyncControl::x_PrepareSyncByEvents;
839 }
840
841 CNCActiveSyncControl::State
x_ExecuteSyncCommands(void)842 CNCActiveSyncControl::x_ExecuteSyncCommands(void)
843 {
844 x_CalcNextTask();
845 return &CNCActiveSyncControl::x_WaitForExecutingTasks;
846 }
847
848 CNCActiveSyncControl::State
x_ExecuteFinalize(void)849 CNCActiveSyncControl::x_ExecuteFinalize(void)
850 {
851 if (m_FinishSyncCalled) {
852 SRV_FATAL("Command finalized already");
853 }
854 if (!CTaskServer::IsInShutdown()) {
855 if (m_SlotSrv->peer->FinishSync(this)) {
856 m_FinishSyncCalled = true;
857 return &CNCActiveSyncControl::x_WaitForExecutingTasks;
858 } else {
859 RunAfter(1);
860 return NULL;
861 }
862 }
863 m_Result = eSynNetworkError;
864 m_Hint = NC_SYNC_HINT;
865 return &CNCActiveSyncControl::x_FinishSync;
866 }
867
/// State: dispatch sync tasks to background connections and wait for them.
///
/// Depending on m_NextTask:
///  - eSynNoTask: once no handlers remain, go to x_FinishSync;
///  - eSynNeedFinalize: when no handlers remain, run the finalize task on one
///    background connection;
///  - otherwise: start one task per connection returned by GetBGConn(true)
///    while m_NextTask stays above eSynNeedFinalize.
/// If m_Lock is still held at the end (no task was started in this call):
/// with no active handlers the sync ends with eSynServerBusy; otherwise the
/// handlers' command timeouts are checked and this state re-runs after 1
/// (second, as in x_FinishScanSlots). When a task was started, NULL is returned
/// without RunAfter — presumably the handler's completion reschedules the task.
///
/// NOTE(review): the loop condition reads m_NextTask after m_Lock has been
/// released by the previous iteration.
CNCActiveSyncControl::State
CNCActiveSyncControl::x_WaitForExecutingTasks(void)
{
    if (CTaskServer::IsInShutdown()) {
        return NULL;
    }
    bool is_locked = false;
    m_Lock.Lock(); is_locked = true;
    if (m_NextTask == eSynNoTask) {
        if (m_SyncHandlers.empty()) {
            m_Lock.Unlock();
            return &CNCActiveSyncControl::x_FinishSync;
        }
    } else if (m_NextTask == eSynNeedFinalize) {
        // Finalize only after every outstanding task has completed.
        if (m_SyncHandlers.empty()) {
            CNCActiveHandler* conn = m_SlotSrv->peer->GetBGConn();
            if (conn) {
                m_SyncHandlers.insert(conn);
                SSyncTaskInfo task_info;
                GetNextTask(task_info);
                m_Lock.Unlock(); is_locked = false;
                ExecuteSyncTask(task_info, conn);
            }
        }
    } else {
        for (;m_NextTask > eSynNeedFinalize;) {
            CNCActiveHandler* conn = m_SlotSrv->peer->GetBGConn(true);
            if (!conn) {
                break;
            }
            if (!is_locked) {m_Lock.Lock(); is_locked = true;}
            m_SyncHandlers.insert(conn);
            SSyncTaskInfo task_info;
            GetNextTask(task_info);
            m_Lock.Unlock(); is_locked = false;
            ExecuteSyncTask(task_info, conn);
        }
    }
    if (is_locked) {
        if (m_SyncHandlers.empty()) {
            m_Result = eSynServerBusy;
            m_Hint = NC_SYNC_HINT;
            m_Lock.Unlock();
            return &CNCActiveSyncControl::x_FinishSync;
        }
        ITERATE(set<CNCActiveHandler*>, h, m_SyncHandlers) {
            (*h)->CheckCommandTimeout();
        }
        m_Lock.Unlock();
        RunAfter(1);
    }
    return NULL;
}
921
/// State: finish the current active sync.
///
/// Cleans up sync objects, sets the diag request status from m_Result (on
/// success it saves the max sync-log record number instead), logs the
/// operation counters and closes the diag request, and appends a CSV line to
/// s_LogFile if enabled. Then, under the slot and server locks, it commits
/// (eSynOK) or cancels the sync; a cancel delays the next attempt by
/// GetFailedSyncRetryDelay(). Scanning continues with the current slot in
/// x_CheckSlotOurSync.
CNCActiveSyncControl::State
CNCActiveSyncControl::x_FinishSync(void)
{
    x_CleanSyncObjects();

    switch (m_Result) {
    case eSynOK:
        CNCBlobStorage::SaveMaxSyncLogRecNo();
        break;
    case eSynAborted:
        GetDiagCtx()->SetRequestStatus(eStatus_SyncAborted);
        break;
    case eSynCrossSynced:
        GetDiagCtx()->SetRequestStatus(eStatus_CrossSync);
        break;
    case eSynServerBusy:
        GetDiagCtx()->SetRequestStatus(eStatus_SyncBusy);
        break;
    case eSynNetworkError:
        GetDiagCtx()->SetRequestStatus(eStatus_PeerError);
        break;
    }

    CSrvDiagMsg().PrintExtra()
        .PrintParam("sync", (m_SlotSrv->is_by_blobs? "blobs": "events"))
        .PrintParam("r_ok", m_ReadOK)
        .PrintParam("r_err", m_ReadERR)
        .PrintParam("w_ok", m_WriteOK)
        .PrintParam("w_err", m_WriteERR)
        .PrintParam("p_ok", m_ProlongOK)
        .PrintParam("p_err", m_ProlongERR)
        .PrintParam("d_ok", m_DelOK)
        .PrintParam("d_err", m_DelERR);
    CSrvDiagMsg().StopRequest();
    ReleaseDiagCtx();

    if (s_LogFile) {
        // CSV columns: self id, peer id, slot, start usec, end usec, duration
        // usec, by-blobs flag, result, sync log size, read ok/err, write ok/err,
        // prolong ok/err, total copy requests, rejected copy requests.
        // (Delete counters are not written.)
        Uint8 end_time = CSrvTime::Current().AsUSec();
        Uint8 log_size = CNCSyncLog::GetLogSize();
        fprintf(s_LogFile,
                "%" NCBI_UINT8_FORMAT_SPEC ",%" NCBI_UINT8_FORMAT_SPEC
                ",%u,%" NCBI_UINT8_FORMAT_SPEC ",%" NCBI_UINT8_FORMAT_SPEC
                ",%" NCBI_UINT8_FORMAT_SPEC
                ",%d,%d,%" NCBI_UINT8_FORMAT_SPEC ",%" NCBI_UINT8_FORMAT_SPEC
                ",%" NCBI_UINT8_FORMAT_SPEC ",%" NCBI_UINT8_FORMAT_SPEC
                ",%" NCBI_UINT8_FORMAT_SPEC ",%" NCBI_UINT8_FORMAT_SPEC
                ",%" NCBI_UINT8_FORMAT_SPEC ",%u,%u\n",
                CNCDistributionConf::GetSelfID(), m_SrvId, m_Slot,
                m_StartTime, end_time, end_time - m_StartTime,
                int(m_SlotSrv->is_by_blobs), m_Result, log_size,
                m_ReadOK, m_ReadERR, m_WriteOK, m_WriteERR,
                m_ProlongOK, m_ProlongERR,
                Uint4(CNCPeerControl::sm_TotalCopyRequests.Get()),
                Uint4(CNCPeerControl::sm_CopyReqsRejected.Get()));
        fflush(s_LogFile);
    }

    CMiniMutexGuard g_slot(m_SlotData->lock);
    CMiniMutexGuard g_srv(m_SlotSrv->lock);
    if (m_Result == eSynOK) {
        s_CommitSync(m_SlotData, m_SlotSrv, NC_SYNC_HINT);
    } else {
//        m_SlotSrv->peer->RemoveSyncControl(this);
        s_CancelSync(m_SlotData, m_SlotSrv, CNCDistributionConf::GetFailedSyncRetryDelay(), m_Result, m_Hint);
    }
    m_DidSync = m_Result == eSynOK;

    SetState(&CNCActiveSyncControl::x_CheckSlotOurSync);
    SetRunnable();
    return NULL;
}
993
/// State: build the event-based sync plan and start executing it.
///
/// CNCSyncLog::GetSyncOperations() compares the remote events (m_RemoteEvents)
/// with the local log. It fills m_Events2Get / m_Events2Send and the synced
/// record numbers, and the task then moves to x_ExecuteSyncCommands.
/// The disabled (#if 0) branch keeps an older flow that fell back to a
/// blob-list sync when the event log could not be used. In the active branch,
/// the return value of GetSyncOperations() is ignored.
CNCActiveSyncControl::State
CNCActiveSyncControl::x_PrepareSyncByEvents(void)
{
    m_Events2Get.clear();
    m_Events2Send.clear();
    m_SlotSrv->last_active_time = CSrvTime::CurSecs();
#if 0
    if (CNCSyncLog::GetSyncOperations(m_SrvId, m_Slot,
                                      m_LocalStartRecNo,
                                      m_RemoteStartRecNo,
                                      m_RemoteEvents,
                                      &m_Events2Get,
                                      &m_Events2Send,
                                      &m_LocalSyncedRecNo,
                                      &m_RemoteSyncedRecNo)
        ||  (CNCSyncLog::GetLogSize(m_Slot) == 0 && m_SlotSrv->was_blobs_sync))
    {
        m_CurGetEvent = m_Events2Get.begin();
        m_CurSendEvent = m_Events2Send.begin();
        return &CNCActiveSyncControl::x_ExecuteSyncCommands;
    }

    // sync by blob list
    m_SlotSrv->is_by_blobs = true;
    CNCActiveHandler* conn = m_SlotSrv->peer->GetBGConn();
    if (!conn) {
        m_Result = eSynNetworkError;
        m_Hint = NC_SYNC_HINT;
        return &CNCActiveSyncControl::x_FinishSync;
    }

    // request blob list
    m_StartedCmds = 1;
    conn->SyncBlobsList(this);
    m_SlotSrv->last_active_time = CSrvTime::CurSecs();
    return &CNCActiveSyncControl::x_WaitForBlobList;
#else
    CNCSyncLog::GetSyncOperations(m_SrvId, m_Slot,
                                  m_LocalStartRecNo,
                                  m_RemoteStartRecNo,
                                  m_RemoteEvents,
                                  &m_Events2Get,
                                  &m_Events2Send,
                                  &m_LocalSyncedRecNo,
                                  &m_RemoteSyncedRecNo);
    m_CurGetEvent = m_Events2Get.begin();
    m_CurSendEvent = m_Events2Send.begin();
    return &CNCActiveSyncControl::x_ExecuteSyncCommands;
#endif
}
1044
1045 CNCActiveSyncControl::State
x_WaitForBlobList(void)1046 CNCActiveSyncControl::x_WaitForBlobList(void)
1047 {
1048 if (m_StartedCmds != 0) {
1049 return NULL;
1050 }
1051 if (CTaskServer::IsInShutdown()) {
1052 m_Result = eSynAborted;
1053 m_Hint = NC_SYNC_HINT;
1054 }
1055 if (m_Result != eSynOK)
1056 return &CNCActiveSyncControl::x_FinishSync;
1057
1058 return &CNCActiveSyncControl::x_PrepareSyncByBlobs;
1059 }
1060
1061 CNCActiveSyncControl::State
x_PrepareSyncByBlobs(void)1062 CNCActiveSyncControl::x_PrepareSyncByBlobs(void)
1063 {
1064 m_LocalSyncedRecNo = CNCSyncLog::GetCurrentRecNo(m_Slot);
1065 m_RemoteSyncedRecNo = m_RemoteStartRecNo;
1066 m_SlotSrv->last_active_time = CSrvTime::CurSecs();
1067
1068 ITERATE(TNCBlobSumList, it, m_LocalBlobs) {
1069 delete it->second;
1070 }
1071 m_LocalBlobs.clear();
1072 CNCBlobStorage::GetFullBlobsList(m_Slot, m_LocalBlobs, NULL);
1073
1074 m_CurLocalBlob = m_LocalBlobs.begin();
1075 m_CurRemoteBlob = m_RemoteBlobs.begin();
1076 m_SlotSrv->last_active_time = CSrvTime::CurSecs();
1077 return &CNCActiveSyncControl::x_ExecuteSyncCommands;
1078 }
1079
1080 void
x_CleanSyncObjects(void)1081 CNCActiveSyncControl::x_CleanSyncObjects(void)
1082 {
1083 ITERATE(TNCBlobSumList, it, m_RemoteBlobs) {
1084 delete it->second;
1085 }
1086 m_RemoteBlobs.clear();
1087 m_CurRemoteBlob = m_RemoteBlobs.begin();
1088
1089 #if 0
1090 ITERATE(TNCBlobSumList, it, m_LocalBlobs) {
1091 delete it->second;
1092 }
1093 m_LocalBlobs.clear();
1094 m_CurLocalBlob = m_LocalBlobs.begin();
1095 #endif
1096
1097 ITERATE(TReducedSyncEvents, it, m_RemoteEvents) {
1098 delete it->second.wr_or_rm_event;
1099 delete it->second.prolong_event;
1100 }
1101 m_RemoteEvents.clear();
1102
1103 #if 0
1104 m_Events2Get.clear();
1105 m_Events2Send.clear();
1106 m_CurGetEvent = m_Events2Get.begin();
1107 m_CurSendEvent = m_Events2Send.begin();
1108 #endif
1109 }
1110
// Advance past the task that was just handed out (the current m_NextTask)
// and select the next one, storing it in m_NextTask.
//
// - eSynNeedFinalize is terminal: once it has been handed out, m_NextTask
//   becomes eSynNoTask and nothing else is evaluated.
// - A pending slot clean (clean_required) aborts the sync unless it already
//   failed with a network error; an aborted or network-failed sync goes
//   straight to eSynNeedFinalize.
// - Event-based sync: all events to send, then all events to get, then
//   eSynNeedFinalize.
// - Blob-list sync: a merge walk over m_LocalBlobs and m_RemoteBlobs.
//   NOTE(review): this relies on both lists being ordered by key (the `<`
//   comparison below) - confirm TNCBlobSumList is a key-sorted container.
// - A candidate task may then be rejected by the trust/draining rules; a
//   rejected task is skipped by looping again, which advances its cursor
//   in the switch at the top of the loop.
// - Finally, a sanity check aborts the sync if the key of a selected
//   event/blob send or get task does not map to m_Slot.
// m_Progress is incremented whenever a send/get/update candidate is picked
// (before trust filtering) and for each pair of identical blobs skipped.
void
CNCActiveSyncControl::x_CalcNextTask(void)
{
    for (;;) {
        // Step the cursor(s) consumed by the previously selected task.
        switch (m_NextTask) {
        case eSynEventSend:
            ++m_CurSendEvent;
            break;
        case eSynEventGet:
            ++m_CurGetEvent;
            break;
        case eSynBlobUpdateOur:
        case eSynBlobUpdatePeer:
            ++m_CurLocalBlob;
            ++m_CurRemoteBlob;
            break;
        case eSynBlobSend:
            ++m_CurLocalBlob;
            break;
        case eSynBlobGet:
            ++m_CurRemoteBlob;
            break;
        case eSynNoTask:
            break;
        case eSynNeedFinalize:
            m_NextTask = eSynNoTask;
            return;
        }

        // A requested slot clean aborts the sync (network errors are kept).
        if (m_SlotData->clean_required && m_Result != eSynNetworkError) {
            m_Result = eSynAborted;
            m_Hint = NC_SYNC_HINT;
        }

        if (m_Result == eSynNetworkError || m_Result == eSynAborted)
            m_NextTask = eSynNeedFinalize;
        else if (!m_SlotSrv->is_by_blobs) {
            // Event-based sync: sends first, then gets.
            if (m_CurSendEvent != m_Events2Send.end()) {
                m_NextTask = eSynEventSend;
                ++m_Progress;
            } else if (m_CurGetEvent != m_Events2Get.end()) {
                m_NextTask = eSynEventGet;
                ++m_Progress;
            } else {
                m_NextTask = eSynNeedFinalize;
            }
        }
        else {
            // Blob-list sync: merge walk over local and remote summaries.
sync_next_key:
            if (m_CurLocalBlob != m_LocalBlobs.end()
                && m_CurRemoteBlob != m_RemoteBlobs.end())
            {
                if (m_CurLocalBlob->first == m_CurRemoteBlob->first) {
                    if (m_CurLocalBlob->second->isEqual(*m_CurRemoteBlob->second)) {
                        // Equivalent blobs, skip them.
                        ++m_CurLocalBlob;
                        ++m_CurRemoteBlob;
                        ++m_Progress;
                        goto sync_next_key;
                    }

                    // The same blob key. Test which one is newer.
                    if (m_CurLocalBlob->second->isOlder(*m_CurRemoteBlob->second)) {
                        m_NextTask = eSynBlobUpdateOur;
                        ++m_Progress;
                    } else {
                        m_NextTask = eSynBlobUpdatePeer;
                        ++m_Progress;
                    }
                }
                else if (m_CurLocalBlob->first < m_CurRemoteBlob->first) {
                    // Key present only locally; expired local blobs are not sent.
                    if (m_CurLocalBlob->second->isExpired()) {
                        ++m_CurLocalBlob;
                        goto sync_next_key;
                    }
                    m_NextTask = eSynBlobSend;
                    ++m_Progress;
                }
                else {
                    // Key present only on the peer.
                    m_NextTask = eSynBlobGet;
                    ++m_Progress;
                }
            }
            // Process the tails of the lists
            else if (m_CurLocalBlob != m_LocalBlobs.end()) {
                if (m_CurLocalBlob->second->isExpired()) {
                    ++m_CurLocalBlob;
                    goto sync_next_key;
                }
                m_NextTask = eSynBlobSend;
                ++m_Progress;
            } else if (m_CurRemoteBlob != m_RemoteBlobs.end()) {
                m_NextTask = eSynBlobGet;
                ++m_Progress;
            } else {
                m_NextTask = eSynNeedFinalize;
            }
        }
/*
SEND:
    m_MyTrust >= m_TheirTrust ? SEND : NOOP
GET:
    m_MyTrust > m_TheirTrust ? NOOP : GET
    draining ? NOOP (only update existing blobs, never get new ones)
UPDATE:
    no restrictions
    29jun17: changed to behave same as in SEND/GET
*/
        // Outgoing tasks are skipped when the peer is more trusted than us.
        if (m_NextTask == eSynEventSend || m_NextTask == eSynBlobSend || m_NextTask == eSynBlobUpdatePeer) {
            if (m_MyTrust < m_TheirTrust) {
                continue;
            }
        }
        // Incoming tasks are skipped when we are more trusted than the peer,
        // and (except updates of existing blobs) while draining.
        if (m_NextTask == eSynEventGet || m_NextTask == eSynBlobGet || m_NextTask == eSynBlobUpdateOur) {
            if (m_MyTrust > m_TheirTrust || (CNCBlobStorage::IsDraining() && m_NextTask != eSynBlobUpdateOur)) {
                continue;
            }
        }
        break;
    };

    // sanity check
    // The key of a selected send/get task must belong to the slot being
    // synced; otherwise the sync is aborted and finalized.
    string blob_key;
    switch (m_NextTask) {
    case eSynEventSend: blob_key = (*m_CurSendEvent)->key.PackedKey(); break;
    case eSynEventGet:  blob_key = (*m_CurGetEvent)->key.PackedKey();  break;
    case eSynBlobSend:  blob_key = m_CurLocalBlob->first;  break;
    case eSynBlobGet:   blob_key = m_CurRemoteBlob->first; break;
    default:
        break;
    }
    if (!blob_key.empty()) {
        Uint2 slot=0, bucket;
        if (!CNCDistributionConf::GetSlotByKey(blob_key,slot, bucket) || slot != m_Slot) {
            m_Result = eSynAborted;
            m_Hint = NC_SYNC_HINT;
            m_NextTask = eSynNeedFinalize;
        }
    }
}
1251
1252 void
x_DoEventSend(const SSyncTaskInfo & task_info,CNCActiveHandler * conn)1253 CNCActiveSyncControl::x_DoEventSend(const SSyncTaskInfo& task_info,
1254 CNCActiveHandler* conn)
1255 {
1256 SNCSyncEvent* event = *task_info.send_evt;
1257 if (event->blob_size > CNCDistributionConf::GetMaxBlobSizeSync() ||
1258 !conn->GetPeer()->AcceptsBlobKey(event->key)) {
1259 CmdFinished( eSynOK, eSynActionWrite, conn, NC_SYNC_HINT);
1260 conn->Release();
1261 return;
1262 }
1263 switch (event->event_type) {
1264 default:
1265 break;
1266 case eSyncWrite:
1267 conn->SyncSend(this, event);
1268 break;
1269 case eSyncProlong:
1270 conn->SyncProlongPeer(this, event);
1271 break;
1272 }
1273 }
1274
1275 void
x_DoEventGet(const SSyncTaskInfo & task_info,CNCActiveHandler * conn)1276 CNCActiveSyncControl::x_DoEventGet(const SSyncTaskInfo& task_info,
1277 CNCActiveHandler* conn)
1278 {
1279 SNCSyncEvent* event = *task_info.get_evt;
1280 switch (event->event_type) {
1281 default:
1282 break;
1283 case eSyncWrite:
1284 conn->SyncRead(this, event);
1285 break;
1286 case eSyncProlong:
1287 conn->SyncProlongOur(this, event);
1288 break;
1289 }
1290 }
1291
1292 void
x_DoBlobUpdateOur(const SSyncTaskInfo & task_info,CNCActiveHandler * conn)1293 CNCActiveSyncControl::x_DoBlobUpdateOur(const SSyncTaskInfo& task_info,
1294 CNCActiveHandler* conn)
1295 {
1296 string key(task_info.remote_blob->first);
1297 SNCBlobSummary* local_blob = task_info.local_blob->second;
1298 SNCBlobSummary* remote_blob = task_info.remote_blob->second;
1299 if (local_blob->isSameData(*remote_blob))
1300 conn->SyncProlongOur(this, key, *remote_blob);
1301 else
1302 conn->SyncRead(this, key, remote_blob->create_time);
1303 }
1304
1305 void
x_DoBlobUpdatePeer(const SSyncTaskInfo & task_info,CNCActiveHandler * conn)1306 CNCActiveSyncControl::x_DoBlobUpdatePeer(const SSyncTaskInfo& task_info,
1307 CNCActiveHandler* conn)
1308 {
1309 CNCBlobKeyLight key(task_info.remote_blob->first);
1310 if (!conn->GetPeer()->AcceptsBlobKey(key)) {
1311 CmdFinished( eSynOK, eSynActionWrite, conn, NC_SYNC_HINT);
1312 conn->Release();
1313 return;
1314 }
1315 SNCBlobSummary* local_blob = task_info.local_blob->second;
1316 SNCBlobSummary* remote_blob = task_info.remote_blob->second;
1317 if (local_blob->isSameData(*remote_blob))
1318 conn->SyncProlongPeer(this, key, *local_blob);
1319 else
1320 conn->SyncSend(this, key);
1321 }
1322
1323 void
x_DoBlobSend(const SSyncTaskInfo & task_info,CNCActiveHandler * conn)1324 CNCActiveSyncControl::x_DoBlobSend(const SSyncTaskInfo& task_info,
1325 CNCActiveHandler* conn)
1326 {
1327 CNCBlobKeyLight key(task_info.local_blob->first);
1328 if (!conn->GetPeer()->AcceptsBlobKey(key)) {
1329 CmdFinished( eSynOK, eSynActionWrite, conn, NC_SYNC_HINT);
1330 conn->Release();
1331 return;
1332 }
1333 conn->SyncSend(this, key);
1334 }
1335
1336 void
x_DoBlobGet(const SSyncTaskInfo & task_info,CNCActiveHandler * conn)1337 CNCActiveSyncControl::x_DoBlobGet(const SSyncTaskInfo& task_info,
1338 CNCActiveHandler* conn)
1339 {
1340 string key(task_info.remote_blob->first);
1341 Uint8 create_time = task_info.remote_blob->second->create_time;
1342 conn->SyncRead(this, key, create_time);
1343 }
1344
1345 void
x_DoFinalize(CNCActiveHandler * conn)1346 CNCActiveSyncControl::x_DoFinalize(CNCActiveHandler* conn)
1347 {
1348 if (m_Result == eSynOK) {
1349 CNCSyncLog::SetLastSyncRecNo(m_SrvId, m_Slot,
1350 m_LocalSyncedRecNo, m_RemoteSyncedRecNo);
1351 conn->SyncCommit(this, m_LocalSyncedRecNo, m_RemoteSyncedRecNo);
1352 }
1353 else {
1354 conn->SyncCancel(this);
1355 }
1356 }
1357
1358 bool
GetNextTask(SSyncTaskInfo & task_info,bool * is_valid)1359 CNCActiveSyncControl::GetNextTask(SSyncTaskInfo& task_info, bool* is_valid)
1360 {
1361 // m_Lock.Lock();
1362 if (m_NextTask == eSynNoTask) {
1363 if (is_valid) {
1364 *is_valid = false;
1365 task_info.task_type = eSynNoTask;
1366 m_Lock.Unlock();
1367 return false;
1368 } else {
1369 SRV_FATAL("Invalid state: m_NextTask: " << m_NextTask);
1370 }
1371 }
1372 task_info.task_type = m_NextTask;
1373 task_info.get_evt = m_CurGetEvent;
1374 task_info.send_evt = m_CurSendEvent;
1375 task_info.local_blob = m_CurLocalBlob;
1376 task_info.remote_blob = m_CurRemoteBlob;
1377 ++m_StartedCmds;
1378 if (m_StartedCmds == 0) {
1379 SRV_FATAL("Invalid state: no m_StartedCmds");
1380 }
1381 if (m_NextTask != eSynNeedFinalize)
1382 ++m_SlotSrv->cnt_sync_ops;
1383 m_SlotSrv->last_active_time = CSrvTime::CurSecs();
1384 x_CalcNextTask();
1385 bool has_more = m_NextTask != eSynNeedFinalize && m_NextTask != eSynNoTask;
1386 // m_Lock.Unlock();
1387 if (is_valid) {
1388 *is_valid = true;
1389 }
1390 return has_more;
1391 }
1392
1393 void
ExecuteSyncTask(const SSyncTaskInfo & task_info,CNCActiveHandler * conn)1394 CNCActiveSyncControl::ExecuteSyncTask(const SSyncTaskInfo& task_info,
1395 CNCActiveHandler* conn)
1396 {
1397 switch (task_info.task_type) {
1398 case eSynEventSend:
1399 x_DoEventSend(task_info, conn);
1400 break;
1401 case eSynEventGet:
1402 x_DoEventGet(task_info, conn);
1403 break;
1404 case eSynBlobUpdateOur:
1405 x_DoBlobUpdateOur(task_info, conn);
1406 break;
1407 case eSynBlobUpdatePeer:
1408 x_DoBlobUpdatePeer(task_info, conn);
1409 break;
1410 case eSynBlobSend:
1411 x_DoBlobSend(task_info, conn);
1412 break;
1413 case eSynBlobGet:
1414 x_DoBlobGet(task_info, conn);
1415 break;
1416 case eSynNeedFinalize:
1417 x_DoFinalize(conn);
1418 break;
1419 default:
1420 SRV_FATAL("Unsupported task type: " << task_info.task_type);
1421 }
1422 }
1423
1424 void
CmdFinished(ESyncResult res,ESynActionType action,CNCActiveHandler * conn,int hint)1425 CNCActiveSyncControl::CmdFinished(ESyncResult res, ESynActionType action, CNCActiveHandler* conn, int hint)
1426 {
1427 m_Lock.Lock();
1428 m_SyncHandlers.erase(conn);
1429 --m_StartedCmds;
1430 if (res == eSynOK) {
1431 switch (action) {
1432 case eSynActionRead:
1433 ++m_ReadOK;
1434 break;
1435 case eSynActionWrite:
1436 ++m_WriteOK;
1437 break;
1438 case eSynActionProlong:
1439 ++m_ProlongOK;
1440 break;
1441 case eSynActionRemove:
1442 ++m_DelOK;
1443 break;
1444 case eSynActionNone:
1445 break;
1446 }
1447 }
1448 else {
1449 switch (action) {
1450 case eSynActionRead:
1451 ++m_ReadERR;
1452 break;
1453 case eSynActionWrite:
1454 ++m_WriteERR;
1455 break;
1456 case eSynActionProlong:
1457 ++m_ProlongERR;
1458 break;
1459 case eSynActionRemove:
1460 ++m_DelERR;
1461 break;
1462 case eSynActionNone:
1463 break;
1464 }
1465 }
1466
1467 if (m_Result == eSynOK) {
1468 if (res == eSynAborted && m_Result != eSynNetworkError) {
1469 m_Result = eSynAborted;
1470 m_Hint = hint;
1471 } else if (res != eSynOK) {
1472 m_Result = res;
1473 m_Hint = hint;
1474 }
1475 }
1476 if (m_Result != eSynOK) {
1477 if (m_NextTask >= eSynNeedFinalize) {
1478 m_NextTask = eSynNeedFinalize;
1479 } else {
1480 m_NextTask = eSynNoTask;
1481 }
1482 }
1483
1484 SetRunnable();
1485 m_Lock.Unlock();
1486 }
1487
PrintState(TNCBufferType & task,const CTempString & mask)1488 void CNCActiveSyncControl::PrintState(TNCBufferType& task, const CTempString& mask)
1489 {
1490 Uint2 slot = 0;
1491 bool one_slot = false, is_audit = false;
1492 if (!mask.empty()) {
1493 is_audit = mask == "audit";
1494 if (!is_audit) {
1495 try {
1496 slot = NStr::StringToUInt(mask);
1497 one_slot = true;
1498 } catch (...) {
1499 }
1500 }
1501 }
1502
1503 char buf[50];
1504 string is("\": "), iss("\": \""), eol(",\n\"");
1505
1506 task.WriteText(",\n\"SyncControls\": [");
1507 ITERATE(TSyncControls, c, s_SyncControls) {
1508 if (c != s_SyncControls.begin()) {
1509 task.WriteText(",");
1510 }
1511 task.WriteText("{");
1512 task.WriteText("\n\"").WriteText("peer" ).WriteText(iss).WriteText(CNCDistributionConf::GetPeerName( (*c)->m_SrvId)).WriteText("\"");
1513 task.WriteText(eol).WriteText("stuck" ).WriteText(is ).WriteBool( (*c)->IsStuck());
1514 task.WriteText(eol).WriteText("slot" ).WriteText(is ).WriteNumber( (*c)->m_Slot);
1515 task.WriteText(eol).WriteText("started_cmds" ).WriteText(is ).WriteNumber( (*c)->m_StartedCmds);
1516 CSrvTime((*c)->m_StartTime / kUSecsPerSecond).Print(buf, CSrvTime::eFmtJson);
1517 task.WriteText(eol).WriteText("sync_StartTime").WriteText(is ).WriteText( buf);
1518 CSrvTime((*c)->m_LoopStart / kUSecsPerSecond).Print(buf, CSrvTime::eFmtJson);
1519 task.WriteText(eol).WriteText("slot_LoopStart").WriteText(is ).WriteText( buf);
1520 task.WriteText(eol).WriteText("cnt_Unfinished").WriteText(is ).WriteNumber( (*c)->m_CntUnfinished);
1521
1522 task.WriteText(eol).WriteText("Events2Get_size").WriteText(is ).WriteNumber( (*c)->m_Events2Get.size());
1523 task.WriteText(eol).WriteText("Events2Send_size").WriteText(is ).WriteNumber( (*c)->m_Events2Send.size());
1524 task.WriteText(eol).WriteText("LocalBlobs_size").WriteText(is ).WriteNumber( (*c)->m_LocalBlobs.size());
1525 task.WriteText(eol).WriteText("RemoteBlobs_size").WriteText(is ).WriteNumber( (*c)->m_RemoteBlobs.size());
1526 task.WriteText(eol).WriteText("NextTask").WriteText(is ).WriteNumber( (int)(*c)->m_NextTask);
1527 task.WriteText(eol).WriteText("Result").WriteText(is ).WriteNumber( (int)(*c)->m_Result);
1528 task.WriteText(eol).WriteText("Hint").WriteText(is ).WriteNumber( (int)(*c)->m_Hint);
1529 task.WriteText(eol).WriteText("Progress").WriteText(is ).WriteNumber( (int)(*c)->m_Progress);
1530 task.WriteText(eol).WriteText("FinishSyncCalled").WriteText(is ).WriteBool( (*c)->m_FinishSyncCalled);
1531 task.WriteText(eol).WriteText("NeedReply").WriteText(is ).WriteBool( (*c)->m_NeedReply);
1532
1533 CSrvTime((*c)->m_LastActive).Print(buf, CSrvTime::eFmtJson);
1534 task.WriteText(eol).WriteText("LastActive").WriteText(is ).WriteText( buf);
1535 task.WriteText(eol).WriteText("cntHandlers").WriteText(is ).WriteNumber( (*c)->m_SyncHandlers.size());
1536 if (is_audit) {
1537 task.WriteText(eol).WriteText("Handlers").WriteText(is );
1538 task.WriteText("[");
1539 bool first = true;
1540 ITERATE(set<CNCActiveHandler*>, h, (*c)->m_SyncHandlers) {
1541 if (first) {
1542 first = false;
1543 } else {
1544 task.WriteText(",");
1545 }
1546 CSrvTime((*h)->m_LastActive).Print(buf, CSrvTime::eFmtJson);
1547 task.WriteText("\n{\"");
1548 task.WriteText("LastActive").WriteText(is ).WriteText( buf);
1549
1550 task.WriteText(eol).WriteText("m_Client").WriteText(is ).WriteBool( (*h)->m_Client != nullptr);
1551 task.WriteText(eol).WriteText("m_SyncCtrl").WriteText(is ).WriteBool( (*h)->m_SyncCtrl != nullptr);
1552 task.WriteText(eol).WriteText("m_Proxy").WriteText(is ).WriteBool( (*h)->m_Proxy != nullptr);
1553
1554 task.WriteText(eol).WriteText("m_ProcessingStarted").WriteText(is ).WriteBool( (*h)->m_ProcessingStarted);
1555 task.WriteText(eol).WriteText("m_CmdStarted").WriteText(is ).WriteBool( (*h)->m_CmdStarted);
1556 task.WriteText(eol).WriteText("m_GotAnyAnswer").WriteText(is ).WriteBool( (*h)->m_GotAnyAnswer);
1557 task.WriteText(eol).WriteText("m_GotCmdAnswer").WriteText(is ).WriteBool( (*h)->m_GotCmdAnswer);
1558 task.WriteText(eol).WriteText("m_GotClientResponse").WriteText(is ).WriteBool( (*h)->m_GotClientResponse);
1559 task.WriteText(eol).WriteText("m_BlobExists").WriteText(is ).WriteBool( (*h)->m_BlobExists);
1560 task.WriteText(eol).WriteText("m_CmdSuccess").WriteText(is ).WriteBool( (*h)->m_CmdSuccess);
1561 task.WriteText(eol).WriteText("m_CmdFromClient").WriteText(is ).WriteBool( (*h)->m_CmdFromClient);
1562 task.WriteText(eol).WriteText("m_Purge").WriteText(is ).WriteBool( (*h)->m_Purge);
1563
1564 task.WriteText(eol).WriteText("m_CmdToSend").WriteText(iss ).WriteText( (*h)->m_CmdToSend).WriteText("\"");;
1565 task.WriteText(eol).WriteText("m_Response").WriteText(iss ).WriteText( (*h)->m_Response).WriteText("\"");;
1566 task.WriteText(eol).WriteText("m_ErrMsg").WriteText(iss ).WriteText( (*h)->m_ErrMsg).WriteText("\"");;
1567
1568 task.WriteText("}");
1569 }
1570 task.WriteText("]");
1571 }
1572 task.WriteText("\n}");
1573 }
1574 task.WriteText("],\n");
1575
1576 bool is_first = true;
1577 task.WriteText("\"SlotsList\": [");
1578 ITERATE(TSyncSlotsList, sl, s_SlotsList) {
1579 if (one_slot) {
1580 if ((*sl)->slot != slot) {
1581 continue;
1582 }
1583 } else {
1584 if ((*sl)->cnt_sync_started == 0) {
1585 continue;
1586 }
1587 }
1588 if (!is_first) {
1589 task.WriteText(",");
1590 }
1591 is_first = false;
1592 task.WriteText("{");
1593 task.WriteText("\n\"").WriteText("slot" ).WriteText(is ).WriteNumber( (*sl)->slot);
1594 task.WriteText(eol).WriteText("cnt_sync_started").WriteText(is ).WriteNumber( (*sl)->cnt_sync_started);
1595 task.WriteText(eol).WriteText("cleaning" ).WriteText(is ).WriteBool( (*sl)->cleaning);
1596 task.WriteText(eol).WriteText("clean_required" ).WriteText(is ).WriteBool( (*sl)->clean_required);
1597 task.WriteText(eol).WriteText("srvs" ).WriteText(is );
1598 task.WriteText("\n[");
1599 ITERATE(TSlotSrvsList, srv, (*sl)->srvs) {
1600 if (srv != (*sl)->srvs.begin()) {
1601 task.WriteText(",");
1602 }
1603 task.WriteText("{");
1604 task.WriteText("\n\"").WriteText("peer" ).WriteText(iss).WriteText(CNCDistributionConf::GetPeerName( (*srv)->peer->GetSrvId())).WriteText("\"");
1605 task.WriteText(eol).WriteText("sync_started" ).WriteText(is ).WriteBool( (*srv)->sync_started);
1606 task.WriteText(eol).WriteText("is_passive" ).WriteText(is ).WriteBool( (*srv)->is_passive);
1607 task.WriteText(eol).WriteText("is_by_blobs" ).WriteText(is ).WriteBool( (*srv)->is_by_blobs);
1608 task.WriteText(eol).WriteText("was_blobs_sync" ).WriteText(is ).WriteBool( (*srv)->was_blobs_sync);
1609 task.WriteText(eol).WriteText("cnt_event_sync" ).WriteText(is ).WriteNumber( (*srv)->cnt_event_sync);
1610 task.WriteText(eol).WriteText("result" ).WriteText(is ).WriteNumber( (int)(*srv)->result);
1611 task.WriteText(eol).WriteText("hint" ).WriteText(is ).WriteNumber( (*srv)->hint);
1612 task.WriteText(eol).WriteText("made_initial_sync").WriteText(is ).WriteBool( (*srv)->made_initial_sync);
1613 task.WriteText(eol).WriteText("started_cmds" ).WriteText(is ).WriteNumber( (*srv)->started_cmds);
1614 CSrvTime((*srv)->next_sync_time / kUSecsPerSecond).Print(buf, CSrvTime::eFmtJson);
1615 task.WriteText(eol).WriteText("next_sync_time" ).WriteText(is ).WriteText( buf);
1616 CSrvTime((*srv)->last_active_time).Print(buf, CSrvTime::eFmtJson);
1617 task.WriteText(eol).WriteText("last_active_time" ).WriteText(is ).WriteText( buf);
1618 CSrvTime((*srv)->last_success_time).Print(buf, CSrvTime::eFmtJson);
1619 task.WriteText(eol).WriteText("last_success_time").WriteText(is ).WriteText( buf);
1620 task.WriteText("\n}");
1621 }
1622 task.WriteText("]\n}");
1623 }
1624 task.WriteText("]");
1625 }
1626
1627 END_NCBI_SCOPE
1628