1 /*  $Id: periodic_sync.cpp 633628 2021-06-22 18:24:03Z ivanov $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Denis Vakatov, Pavel Ivanov, Sergey Satskiy
27  *
28  * File Description: Data structures and API to support blobs mirroring.
29  *
30  */
31 
32 #include "nc_pch.hpp"
33 
34 #include <corelib/request_ctx.hpp>
35 #include <util/random_gen.hpp>
36 
37 #include "netcached.hpp"
38 #include "periodic_sync.hpp"
39 #include "distribution_conf.hpp"
40 #include "sync_log.hpp"
41 #include "peer_control.hpp"
42 #include "active_handler.hpp"
43 #include "nc_storage.hpp"
44 #include "nc_stat.hpp"
45 #include <random>
46 
47 
48 BEGIN_NCBI_SCOPE
49 
50 
51 static TSyncSlotsList   s_SlotsList;
52 static TSyncSlotsMap    s_SlotsMap;
53 static CMiniMutex       s_RndLock;
54 static CRandom          s_Rnd(CRandom::TValue(time(NULL)));
55 
56 typedef vector<CNCActiveSyncControl*> TSyncControls;
57 static TSyncControls s_SyncControls;
58 static CNCLogCleaner* s_LogCleaner;
59 
60 static FILE* s_LogFile = NULL;
61 
62 
/// Randomly permute the elements of @a lst in place.
///
/// Every call seeds its own local Mersenne Twister engine from
/// std::random_device, so no generator state is shared between callers.
template <typename Type> void
s_ShuffleList(std::vector<Type>& lst)
{
    std::mt19937 engine(std::random_device{}());
    std::shuffle(lst.begin(), lst.end(), engine);
}
70 
71 static void
s_ShuffleSrvsLists(void)72 s_ShuffleSrvsLists(void)
73 {
74     TSyncSlotsList::const_iterator sl = s_SlotsList.begin();
75     for ( ; sl != s_SlotsList.end(); ++sl) {
76         (*sl)->lock.Lock();
77         s_ShuffleList<SSyncSlotSrv*>((*sl)->srvs);
78         (*sl)->lock.Unlock();
79     }
80 }
81 
82 
83 static void
s_FindServerSlot(Uint8 server_id,Uint2 slot,SSyncSlotData * & slot_data,SSyncSlotSrv * & slot_srv)84 s_FindServerSlot(Uint8 server_id,
85                  Uint2 slot,
86                  SSyncSlotData*& slot_data,
87                  SSyncSlotSrv*&  slot_srv)
88 {
89     slot_data = NULL;
90     slot_srv = NULL;
91     TSyncSlotsMap::const_iterator it_slot = s_SlotsMap.find(slot);
92     if (it_slot == s_SlotsMap.end())
93         return;
94     slot_data = it_slot->second;
95     slot_data->lock.Lock();
96     TSlotSrvsList srvs = slot_data->srvs;
97     slot_data->lock.Unlock();
98     ITERATE(TSlotSrvsList, it_srv, srvs) {
99         SSyncSlotSrv* this_srv = *it_srv;
100         if (this_srv->peer->GetSrvId() == server_id) {
101             slot_srv = this_srv;
102             return;
103         }
104     }
105 }
106 
/// Try to register the start of a sync with @a slot_srv's peer on
/// @a slot_data's slot.
///
/// @param slot_data   slot record (must not be NULL).
/// @param slot_srv    record of the peer within that slot (must not be NULL).
/// @param is_passive  true for a sync requested by the peer (called from
///                    CNCPeriodicSync::Initiate()); false for a sync started
///                    by our own CNCActiveSyncControl (x_DoPeriodicSync()).
/// @return eProceedWithEvents when the sync is registered as started.  This
///         only means "started"; the caller decides between events and blobs.
///         eServerBusy when a log clean is running or pending on the slot,
///         another sync is already running on the slot, or the peer control
///         refuses an active sync.  eCrossSynced when a sync with this peer
///         is already registered and cannot be superseded.
///
/// Takes slot_data->lock, then slot_srv->lock; both are released on return.
static ESyncInitiateResult
s_StartSync(SSyncSlotData* slot_data, SSyncSlotSrv* slot_srv, bool is_passive)
{
    CMiniMutexGuard g_slot(slot_data->lock);
    // The log cleaner is working on this slot or has asked for it.
    if (slot_data->cleaning  ||  slot_data->clean_required) {
        return eServerBusy;
    }
    // there is a feeling (not knowledge) that more than one sync per slot is not good
    // -- so at most one sync may run on a slot at any time.
#if 1
    if (slot_data->cnt_sync_started != 0) {
        return eServerBusy;
    }
#endif

    CMiniMutexGuard g_srv(slot_srv->lock);
    if (slot_srv->sync_started) {
        // A sync with this peer is already registered.  Only a new passive
        // request may replace an existing passive sync, and only one with no
        // commands in flight; anything else is a cross-sync.
        if (!is_passive  ||  !slot_srv->is_passive  ||  slot_srv->started_cmds != 0) {
            return eCrossSynced;
        }
        // Close the stale passive sync, reporting it as unsuccessful.
        if (slot_srv->cnt_sync_ops != 0) {
            CNCStat::PeerSyncFinished(slot_srv->peer->GetSrvId(), slot_data->slot, slot_srv->cnt_sync_ops, false);
        }
        slot_srv->sync_started = false;
        // NOTE(review): with the "#if 1" check above active, cnt_sync_started
        // is already 0 here, so this decrement can only happen if that check
        // is disabled.
        if (slot_data->cnt_sync_started != 0) {
            --slot_data->cnt_sync_started;
        }
    }

    // The peer control may refuse to start another active sync.
    if (!is_passive  &&  !slot_srv->peer->StartActiveSync()) {
        return eServerBusy;
    }
    slot_srv->sync_started = true;
    slot_srv->is_passive = is_passive;
    slot_srv->cnt_sync_ops = 0;
    slot_srv->last_active_time = CSrvTime::CurSecs();
    // A new id makes requests carrying an older sync_id (Cancel, Commit,
    // SyncCommandFinished, MarkCurSyncByBlobs) no-ops.
    ++slot_srv->cur_sync_id;
    ++slot_data->cnt_sync_started;
    return eProceedWithEvents;
}
146 
147 static void
s_StopSync(SSyncSlotData * slot_data,SSyncSlotSrv * slot_srv,Uint8 next_delay,ESyncResult result,int hint)148 s_StopSync(SSyncSlotData* slot_data, SSyncSlotSrv* slot_srv, Uint8 next_delay, ESyncResult result,  int hint)
149 {
150     slot_srv->peer->RegisterSyncStop(slot_srv->is_passive,
151                                      slot_srv->next_sync_time,
152                                      next_delay);
153 #ifdef _DEBUG
154     slot_srv->peer->RegisterSyncStat(slot_srv->is_passive, slot_srv->is_by_blobs, result, hint);
155 #endif
156     slot_srv->sync_started = false;
157     slot_srv->started_cmds = 0;
158     slot_srv->is_passive = true;
159     slot_srv->result = result;
160     slot_srv->hint = hint;
161     if (slot_data->cnt_sync_started != 0) {
162         --slot_data->cnt_sync_started;
163     }
164 /*
165     else {
166         SRV_LOG(Error, "SyncSlotData broken");
167     }
168 */
169     if (slot_data->cnt_sync_started == 0  &&  slot_data->clean_required)
170         s_LogCleaner->SetRunnable();
171 }
172 
173 static void
s_CancelSync(SSyncSlotData * slot_data,SSyncSlotSrv * slot_srv,Uint8 next_delay,ESyncResult result,int hint)174 s_CancelSync(SSyncSlotData* slot_data, SSyncSlotSrv* slot_srv, Uint8 next_delay, ESyncResult result, int hint)
175 {
176     CNCStat::PeerSyncFinished(slot_srv->peer->GetSrvId(), slot_data->slot, slot_srv->cnt_sync_ops, false);
177     s_StopSync(slot_data, slot_srv, next_delay, result, hint);
178 }
179 
180 static void
s_CommitSync(SSyncSlotData * slot_data,SSyncSlotSrv * slot_srv,int hint)181 s_CommitSync(SSyncSlotData* slot_data, SSyncSlotSrv* slot_srv, int hint)
182 {
183     slot_srv->last_success_time = CSrvTime::CurSecs();
184     CNCStat::PeerSyncFinished(slot_srv->peer->GetSrvId(), slot_data->slot, slot_srv->cnt_sync_ops, true);
185     slot_srv->peer->ConnOk();
186     slot_srv->was_blobs_sync = slot_srv->is_by_blobs;
187     slot_srv->cnt_event_sync = slot_srv->is_by_blobs ? 0 : (slot_srv->cnt_event_sync + 1);
188     if (!slot_srv->made_initial_sync)
189     {
190         slot_srv->made_initial_sync = true;
191         slot_srv->peer->AddInitiallySyncedSlot();
192     }
193     s_StopSync(slot_data, slot_srv, CNCDistributionConf::GetPeriodicSyncInterval(), eSynOK, hint);
194 }
195 
/// Clean the sync log of @a slot, reporting the work as a separate
/// diagnostic request of type "clean" in its own diagnostic context.
///
/// The value returned by CNCSyncLog::Clean() is stored as the request's
/// bytes-written figure.  Presumably this is the amount of log data removed;
/// TODO confirm against CNCSyncLog::Clean().
static void
s_DoCleanLog(CNCLogCleaner* cleaner, Uint2 slot)
{
    cleaner->CreateNewDiagCtx();
    // The temporaries are intentional: each diagnostic message is complete
    // at the end of its own statement.
    CSrvDiagMsg().StartRequest()
                 .PrintParam("_type", "clean")
                 .PrintParam("slot", slot);

    Uint8 cleaned = CNCSyncLog::Clean(slot);
    cleaner->GetDiagCtx()->SetBytesWr(Int8(cleaned));
    CSrvDiagMsg().StopRequest();
    cleaner->ReleaseDiagCtx();
}
209 
/// One step of the sync-log cleaner task.  Each call handles a single slot.
///
/// The task walks s_SlotsList one slot per slice.  At the end of the list it
/// rewinds to the first slot and reschedules itself after
/// CNCDistributionConf::GetCleanAttemptInterval().  For the current slot:
///  - If no sync is running on it, its sync log is cleaned.  `cleaning` is
///    set meanwhile, which makes s_StartSync() refuse new syncs, while the
///    slot lock itself is released during the clean.  A pending forced-clean
///    request is then cleared and its time remembered in m_LastForceTime.
///  - Otherwise a forced clean is requested by setting `clean_required`.
///    This happens only if none is pending, no server still needs its
///    initial sync, the slot's log is over its limit, and at least
///    GetMinForcedCleanPeriod() has passed since the last forced clean.
///    `clean_required` makes new syncs and abortable sync commands on the
///    slot be refused.  s_StopSync() wakes this task once the last running
///    sync on the slot stops.
void
CNCLogCleaner::ExecuteSlice(TSrvThreadNum /* thr_idx */)
{
    if (CTaskServer::IsInShutdown())
        return;

// Clean the operation log used for synchronization between peers,
// i.e. remove records that are already synced.

    if (m_NextSlotIt == s_SlotsList.end()) {
        // Round finished: start over after the clean attempt interval.
        m_NextSlotIt = s_SlotsList.begin();
        RunAfter(CNCDistributionConf::GetCleanAttemptInterval());
        return;
    }

    Uint8 min_period = CNCDistributionConf::GetMinForcedCleanPeriod();

    SSyncSlotData* slot_data = *m_NextSlotIt;
    Uint2 slot = slot_data->slot;
    slot_data->lock.Lock();
// if no sync currently in progress
    if (slot_data->cnt_sync_started == 0) {
        slot_data->cleaning = true;
        // Clean without holding the slot lock; `cleaning` keeps syncs out.
        slot_data->lock.Unlock();
        s_DoCleanLog(this, slot);
        slot_data->lock.Lock();
        slot_data->cleaning = false;
        if (slot_data->clean_required) {
            // A forced clean has been satisfied: remember when.
            slot_data->clean_required = false;
            m_LastForceTime[slot] = CSrvTime::Current().AsUSec();
        }
    }
    // Syncs are running: decide whether to force a clean once they stop.
    else if (!slot_data->clean_required
                &&  !CNCPeerControl::HasServersForInitSync()
                &&  CNCSyncLog::IsOverLimit(slot)
                &&  CSrvTime::Current().AsUSec() - m_LastForceTime[slot] >= min_period)
    {
        slot_data->clean_required = true;
    }
    slot_data->lock.Unlock();

    // Handle the next slot in the next slice.
    ++m_NextSlotIt;
    SetRunnable();
}
254 
/// Create the log cleaner, positioned at the first slot of s_SlotsList.
/// CNCPeriodicSync::Initialize() creates it after s_SlotsList has been
/// filled and shuffled.  The stored iterator relies on s_SlotsList not being
/// modified afterwards: ReConfig() only changes per-slot peer lists.
CNCLogCleaner::CNCLogCleaner(void)
{
#if __NC_TASKS_MONITOR
    m_TaskName = "CNCLogCleaner";
#endif
    m_NextSlotIt = s_SlotsList.begin();
}

/// Nothing to release.
CNCLogCleaner::~CNCLogCleaner(void)
{}
265 
266 
267 
/// Per-slot synchronization record for slot @a slot_: no sync running, no
/// log clean in progress or requested.
SSyncSlotData::SSyncSlotData(Uint2 slot_)
    : slot(slot_),
      cnt_sync_started(0),
      cleaning(false),
      clean_required(false)
{}

/// Per-(slot, peer) synchronization record for peer @a peer_.
/// No sync is running and none has succeeded yet.  last_success_time == 0
/// makes CNCPeriodicSync::Initiate() choose a blob-based sync, and
/// made_initial_sync == false makes x_CheckSlotOurSync() try this peer
/// regardless of its next sync time.
SSyncSlotSrv::SSyncSlotSrv(CNCPeerControl* peer_)
    : peer(peer_),
      sync_started(false),
      is_passive(false),
      is_by_blobs(false),
      was_blobs_sync(false),
      made_initial_sync(false),
      cnt_event_sync(0),
      started_cmds(0),
      next_sync_time(0),
      last_active_time(CSrvTime::Current().Sec()),
      last_success_time(0),
      cur_sync_id(0),
      cnt_sync_ops(0),
      result(eSynOK),
      hint(0)
{}
292 
293 bool
Initialize(void)294 CNCPeriodicSync::Initialize(void)
295 {
296     if (!CNCDistributionConf::GetPeriodicLogFile().empty()) {
297         s_LogFile = fopen(CNCDistributionConf::GetPeriodicLogFile().c_str(), "a");
298     }
299 
300     const vector<Uint2>& slots = CNCDistributionConf::GetSelfSlots();
301     for (Uint2 i = 0; i < slots.size(); ++i) {
302         SSyncSlotData* data = new SSyncSlotData(slots[i]);
303         s_SlotsList.push_back(data);
304         s_SlotsMap[data->slot] = data;
305     }
306     s_ShuffleList<SSyncSlotData*>(s_SlotsList);
307 
308     Uint4 cnt_to_sync = 0;
309     const TNCPeerList& peers = CNCDistributionConf::GetPeers();
310     ITERATE(TNCPeerList, it_peer, peers) {
311         CNCPeerControl* peer = CNCPeerControl::Peer(it_peer->first);
312         const vector<Uint2>& commonSlots = CNCDistributionConf::GetCommonSlots(it_peer->first);
313         ITERATE(vector<Uint2>, it_slot, commonSlots) {
314             SSyncSlotData* slot_data = s_SlotsMap[*it_slot];
315             SSyncSlotSrv* slot_srv = new SSyncSlotSrv(peer);
316             slot_data->srvs.push_back(slot_srv);
317         }
318         peer->SetSlotsForInitSync(Uint2(commonSlots.size()));
319         if (!commonSlots.empty()) {
320             CNCStat::AddSyncServer(it_peer->first);
321             ++cnt_to_sync;
322         }
323     }
324     s_ShuffleSrvsLists();
325     CNCPeerControl::SetServersForInitSync(cnt_to_sync);
326 
327     Uint1 cnt_syncs = CNCDistributionConf::GetCntActiveSyncs();
328     for (Uint1 i = 0; i < cnt_syncs; ++i) {
329         s_SyncControls.push_back(new CNCActiveSyncControl());
330         if (i == 0) {
331             s_SyncControls[0]->SetFirst();
332         }
333         s_SyncControls[i]->SetRunnable();
334     }
335 
336     s_LogCleaner = new CNCLogCleaner();
337     s_LogCleaner->SetRunnable();
338 
339     if (cnt_to_sync == 0)
340         CNCServer::InitialSyncComplete();
341 
342     return true;
343 }
344 
/// Apply a changed peer configuration to the per-slot peer records.
///
/// For every self slot (self slots do not change on reconfiguration):
///  - peer records whose peer is no longer configured, or no longer shares
///    this slot, are removed from the slot's peer list;
///  - configured peers that share this slot but have no record yet get one.
/// Afterwards, every peer seen above that is still configured gets its
/// number of slots for initial sync updated.  The number of peers sharing at
/// least one slot is passed to CNCPeerControl::ReconfServersForInitSync().
///
/// NOTE(review): removed SSyncSlotSrv objects are not deleted here.
/// Presumably an active sync control may still point at one (m_SlotSrv);
/// confirm before adding a delete.
void
CNCPeriodicSync::ReConfig(void)
{
// all peers, old and new
    set<CNCPeerControl*> all_peers;
// map of common slots: peer server id -> slots shared with that peer
    map< Uint8, set<Uint2> > peer_slots;
    const TNCPeerList& peers = CNCDistributionConf::GetPeers();
    ITERATE(TNCPeerList, it_peer, peers) {
        const vector<Uint2>& commonSlots = CNCDistributionConf::GetCommonSlots(it_peer->first);
        peer_slots[it_peer->first].insert(commonSlots.begin(), commonSlots.end());
    }
// self slots do not change
    const vector<Uint2>& self_slots = CNCDistributionConf::GetSelfSlots();
// for each self slot
    ITERATE(vector<Uint2>, it_slot, self_slots) {
        SSyncSlotData* slot_data = s_SlotsMap[*it_slot];
        slot_data->lock.Lock();
        // ids of peers that already had a record in this slot
        set<Uint8> processed;
// verify old peers
        ERASE_ITERATE(TSlotSrvsList, it_srv, slot_data->srvs) {
            Uint8 srv_id = (*it_srv)->peer->GetSrvId();
            all_peers.insert((*it_srv)->peer);
            processed.insert(srv_id);
            map< Uint8, set<Uint2> >::const_iterator p = peer_slots.find(srv_id);
            if (p != peer_slots.end() && p->second.find( *it_slot) != p->second.end()) {
// peer still exists and this slot is still common
                continue;
            }
// peer is removed, or there is no common slots - remove this server from SyncData
            VECTOR_ERASE(it_srv, slot_data->srvs);
        }
// look for new peers
        for (map< Uint8, set<Uint2> >::const_iterator p = peer_slots.begin();
            p != peer_slots.end(); ++p) {
            if (processed.find(p->first) != processed.end()) {
                continue;
            }
            processed.insert(p->first);
            CNCPeerControl* peer = CNCPeerControl::Peer( p->first);
            all_peers.insert(peer);
// if we have common slot with this peer, add it into SyncData
            if (p->second.find( *it_slot) != p->second.end()) {
                SSyncSlotSrv* slot_srv = new SSyncSlotSrv( peer);
                slot_data->srvs.push_back( slot_srv);
            }
        }
        slot_data->lock.Unlock();
    }
// now get/change some counters (peers no longer configured are skipped)
    Uint4 cnt_to_sync = 0;
    ITERATE( set<CNCPeerControl*>, it_peer, all_peers) {
        map< Uint8, set<Uint2> >::const_iterator p = peer_slots.find((*it_peer)->GetSrvId());
        if (p != peer_slots.end()) {
            (*it_peer)->ReconfSlotsForInitSync(Uint2(p->second.size()));
            if (p->second.size() != 0) {
                ++cnt_to_sync;
                CNCStat::AddSyncServer(p->first);
            }
        }
    }
    CNCPeerControl::ReconfServersForInitSync(cnt_to_sync);
}
408 
409 void
ReInitialize(void)410 CNCPeriodicSync::ReInitialize(void)
411 {
412     ITERATE(TSyncControls, c, s_SyncControls) {
413         if (!(*c)->IsStuck()) {
414             return;
415         }
416     }
417     CNCServer::InitialSyncRequired();
418     CNCPeerControl::ResetServersForInitSync();
419     ITERATE(TSyncSlotsList, sl, s_SlotsList) {
420         ITERATE(TSlotSrvsList, srv, (*sl)->srvs) {
421             (*srv)->made_initial_sync = false;
422             (*srv)->peer->ResetSlotsForInitSync();
423         }
424     }
425 }
426 
427 void
Finalize(void)428 CNCPeriodicSync::Finalize(void)
429 {
430     if (s_LogFile)
431         fclose(s_LogFile);
432 }
433 
434 ESyncInitiateResult
Initiate(Uint8 server_id,Uint2 slot,Uint8 * local_start_rec_no,Uint8 * remote_start_rec_no,TReducedSyncEvents * events,Uint8 * sync_id)435 CNCPeriodicSync::Initiate(Uint8  server_id,
436                           Uint2  slot,
437                           Uint8* local_start_rec_no,
438                           Uint8* remote_start_rec_no,
439                           TReducedSyncEvents* events,
440                           Uint8* sync_id)
441 {
442     SSyncSlotData* slot_data;
443     SSyncSlotSrv* slot_srv;
444     s_FindServerSlot(server_id, slot, slot_data, slot_srv);
445     if (slot_srv) {
446         slot_srv->peer->RegisterConnSuccess();
447     }
448     if (slot_srv == NULL) {
449         return eUnknownServer;
450     }
451     if ( slot_data->cnt_sync_started != 0 ||  CTaskServer::IsInShutdown()) {
452         return eServerBusy;
453     }
454 
455     ESyncInitiateResult init_res = s_StartSync(slot_data, slot_srv, true);
456     if (init_res != eProceedWithEvents)
457         return init_res;
458 
459     slot_srv->started_cmds = 1;
460     *sync_id = slot_srv->cur_sync_id;
461     bool records_available = CNCSyncLog::GetEventsList(server_id,
462                                                        slot,
463                                                        local_start_rec_no,
464                                                        remote_start_rec_no,
465                                                        events);
466     if (slot_srv->last_success_time == 0) {
467         slot_srv->is_by_blobs = true;
468         return eProceedWithBlobs;
469     }
470     if (records_available
471 //        ||  (CNCSyncLog::GetLogSize(slot) == 0  &&  slot_srv->was_blobs_sync))
472         ||  (CNCSyncLog::GetLogSize(slot) == 0  &&  slot_srv->cnt_event_sync < 8))
473     {
474         slot_srv->is_by_blobs = false;
475         return eProceedWithEvents;
476     }
477     else {
478         slot_srv->is_by_blobs = true;
479         return eProceedWithBlobs;
480     }
481 }
482 
483 ESyncInitiateResult
CanStartSyncCommand(Uint8 server_id,Uint2 slot,bool can_abort,Uint8 & sync_id)484 CNCPeriodicSync::CanStartSyncCommand(Uint8  server_id,
485                                      Uint2  slot,
486                                      bool   can_abort,
487                                      Uint8& sync_id)
488 {
489     SSyncSlotData* slot_data;
490     SSyncSlotSrv* slot_srv;
491     s_FindServerSlot(server_id, slot, slot_data, slot_srv);
492     if (slot_srv == NULL)
493         return eNetworkError;
494 
495     CMiniMutexGuard g_slot(slot_data->lock);
496     if (slot_data->clean_required  &&  can_abort) {
497         return eServerBusy;
498     }
499 
500     CMiniMutexGuard g_srv(slot_srv->lock);
501     if (!slot_srv->sync_started  ||  !slot_srv->is_passive)
502         return eNetworkError;
503 
504     ++slot_srv->started_cmds;
505     ++slot_srv->cnt_sync_ops;
506     slot_srv->last_active_time = CSrvTime::CurSecs();
507     sync_id = slot_srv->cur_sync_id;
508     return eProceedWithEvents;
509 }
510 
511 void
MarkCurSyncByBlobs(Uint8 server_id,Uint2 slot,Uint8 sync_id)512 CNCPeriodicSync::MarkCurSyncByBlobs(Uint8 server_id, Uint2 slot, Uint8 sync_id)
513 {
514     SSyncSlotData* slot_data;
515     SSyncSlotSrv* slot_srv;
516     s_FindServerSlot(server_id, slot, slot_data, slot_srv);
517 
518     CMiniMutexGuard g_slot(slot_data->lock);
519     CMiniMutexGuard g_srv(slot_srv->lock);
520     if (slot_srv->sync_started  &&  slot_srv->is_passive
521         &&  slot_srv->cur_sync_id == sync_id)
522     {
523         slot_srv->is_by_blobs = true;
524     }
525 }
526 
527 void
SyncCommandFinished(Uint8 server_id,Uint2 slot,Uint8 sync_id)528 CNCPeriodicSync::SyncCommandFinished(Uint8 server_id, Uint2 slot, Uint8 sync_id)
529 {
530     SSyncSlotData* slot_data;
531     SSyncSlotSrv* slot_srv;
532     s_FindServerSlot(server_id, slot, slot_data, slot_srv);
533 
534     CMiniMutexGuard g_slot(slot_data->lock);
535     CMiniMutexGuard g_srv(slot_srv->lock);
536     if (slot_srv->sync_started  &&  slot_srv->is_passive
537         &&  slot_srv->cur_sync_id == sync_id)
538     {
539         if (slot_srv->started_cmds == 0) {
540             SRV_FATAL("SyncSlotData broken");
541         }
542         --slot_srv->started_cmds;
543         slot_srv->last_active_time = CSrvTime::CurSecs();
544     }
545 }
546 
547 void
Cancel(Uint8 server_id,Uint2 slot,Uint8 sync_id)548 CNCPeriodicSync::Cancel(Uint8 server_id, Uint2 slot, Uint8 sync_id)
549 {
550     SSyncSlotData* slot_data;
551     SSyncSlotSrv* slot_srv;
552     s_FindServerSlot(server_id, slot, slot_data, slot_srv);
553 
554     CMiniMutexGuard g_slot(slot_data->lock);
555     CMiniMutexGuard g_srv(slot_srv->lock);
556     if (slot_srv->sync_started  &&  slot_srv->is_passive
557         &&  slot_srv->cur_sync_id == sync_id)
558     {
559         if (slot_srv->started_cmds != 0) {
560             --slot_srv->started_cmds;
561         }
562         --slot_srv->cnt_sync_ops;
563         s_CancelSync(slot_data, slot_srv, 0, eSynAborted, 0);
564     }
565 }
566 
567 void
Commit(Uint8 server_id,Uint2 slot,Uint8 sync_id,Uint8 local_synced_rec_no,Uint8 remote_synced_rec_no)568 CNCPeriodicSync::Commit(Uint8 server_id,
569                         Uint2 slot,
570                         Uint8 sync_id,
571                         Uint8 local_synced_rec_no,
572                         Uint8 remote_synced_rec_no)
573 {
574     CNCSyncLog::SetLastSyncRecNo(server_id, slot,
575                                  local_synced_rec_no,
576                                  remote_synced_rec_no);
577     CNCBlobStorage::SaveMaxSyncLogRecNo();
578 
579     SSyncSlotData* slot_data;
580     SSyncSlotSrv* slot_srv;
581     s_FindServerSlot(server_id, slot, slot_data, slot_srv);
582 
583     CMiniMutexGuard g_slot(slot_data->lock);
584     CMiniMutexGuard g_srv(slot_srv->lock);
585     if (slot_srv->sync_started  &&  slot_srv->is_passive
586         &&  slot_srv->cur_sync_id == sync_id)
587     {
588         --slot_srv->cnt_sync_ops;
589         s_CommitSync(slot_data, slot_srv, 0);
590     }
591 }
592 
593 
/// Create an idle active sync control that starts in state x_StartScanSlots.
/// Most per-pass and per-sync members not set here are (re)initialized
/// before use by x_StartScanSlots(), x_CheckSlotOurSync() and
/// x_DoPeriodicSync().
CNCActiveSyncControl::CNCActiveSyncControl(void)
{
#if __NC_TASKS_MONITOR
    m_TaskName = "CNCActiveSyncControl";
#endif
    SetState(&CNCActiveSyncControl::x_StartScanSlots);
    m_ForceInitSync = false;
    m_Stuck = false;
    m_First = false;
    m_NeedReply = false;
    m_StartTime = 0;
    m_LoopStart = 0;
    m_CntUnfinished = 0;
    m_MyTrust = m_TheirTrust = 0;
}

/// Nothing to release.
CNCActiveSyncControl::~CNCActiveSyncControl(void) {
}
612 
613 
/// First state of a sync pass over all slots.
///
/// On shutdown, returns NULL without rescheduling.
///
/// If the previous pass counted no unfinished syncs (m_CntUnfinished == 0)
/// and CNCBlobStorage::NeedStopWrite() is set, no new syncs are started.
/// Incoming requests are still served in that case.  The task is rescheduled
/// after one periodic sync interval, converted from usec to seconds, and
/// stays in this state.  The first time this happens, and only if storage is
/// not draining and peers are configured, the control is marked stuck and
/// CNCPeriodicSync::ReInitialize() is called.
///
/// Otherwise the "first" control handshakes with every peer.  Then the
/// per-pass state is reset and scanning starts at the first slot
/// (x_CheckSlotOurSync).
CNCActiveSyncControl::State
CNCActiveSyncControl::x_StartScanSlots(void)
{
    if (CTaskServer::IsInShutdown()) {
        return NULL;
    }
    if ( m_CntUnfinished == 0 && CNCBlobStorage::NeedStopWrite()) {
        // in this scenario, I do not start sync, but still accept sync requests from others
        RunAfter(CNCDistributionConf::GetPeriodicSyncInterval() / kUSecsPerSecond);
        if (!m_Stuck && !CNCBlobStorage::IsDraining() &&
            !CNCDistributionConf::GetPeers().empty()) {
            m_Stuck = true;
            CNCPeriodicSync::ReInitialize();
        }
        return NULL;
    }
    if (m_First) {
        const TNCPeerList& peers = CNCDistributionConf::GetPeers();
        ITERATE(TNCPeerList, it_peer, peers) {
            CNCPeerControl::Peer(it_peer->first)->PeerHandshake();
        }
    }
    m_Stuck = false;

    // Per-pass state.
    m_DidSync = false;
    m_MinNextTime = numeric_limits<Uint8>::max();
    m_LoopStart = CSrvTime::Current().AsUSec();
    m_NextSlotIt = s_SlotsList.begin();
    m_VisitedSrv.clear();
    m_CntUnfinished = 0;
    return &CNCActiveSyncControl::x_CheckSlotOurSync;
}
646 
/// Per-slot state: look for a peer to start an active sync with.
///
/// When all slots are done, moves to x_FinishScanSlots.  Otherwise it works
/// on a copy of the current slot's peer list taken under the slot lock.
/// The scan runs only if no sync is running on the slot (or m_ForceInitSync
/// is set) and storage does not require writes to stop.  It picks the first
/// peer not yet visited for this slot whose next sync time has come, or that
/// has not completed its initial sync.  That peer is stored in m_SlotSrv and
/// the machine moves to x_DoPeriodicSync.  Visited peers are remembered in
/// m_VisitedSrv, so when x_DoPeriodicSync cannot start a sync and returns
/// here, the next candidate is tried.  If no peer is picked, the machine
/// moves to x_CheckSlotTheirSync.
///
/// NOTE(review): cnt_sync_started is read without the slot lock; this is only
/// a pre-check, and s_StartSync() re-checks it under the lock.
/// NOTE(review): the disabled "#if 0" branch would not compile if enabled.
/// It calls s_CancelSync() with 3 arguments, but the function takes 5.
CNCActiveSyncControl::State
CNCActiveSyncControl::x_CheckSlotOurSync(void)
{
    if (CTaskServer::IsInShutdown())
        return NULL;
    if (m_NextSlotIt == s_SlotsList.end())
        return &CNCActiveSyncControl::x_FinishScanSlots;

    m_SlotData = *m_NextSlotIt;
    m_SlotData->lock.Lock();
    TSlotSrvsList srvs = m_SlotData->srvs;
    m_SlotData->lock.Unlock();
    if ((m_SlotData->cnt_sync_started == 0  ||  m_ForceInitSync) && !CNCBlobStorage::NeedStopWrite()) {
        ITERATE(TSlotSrvsList, it_srv, srvs) {
            m_SlotSrv = *it_srv;
            if (m_VisitedSrv.find(m_SlotSrv) != m_VisitedSrv.end()) {
                continue;
            }
            // Due when both the slot-level and the peer-level times have passed.
            Uint8 next_time = max(m_SlotSrv->next_sync_time,
                                  m_SlotSrv->peer->GetNextSyncTime());
            m_VisitedSrv.insert(m_SlotSrv);
            if (next_time <= CSrvTime::Current().AsUSec() ||  !m_SlotSrv->made_initial_sync) {
                return &CNCActiveSyncControl::x_DoPeriodicSync;
            }
        }
    }
#if 0
    else {
        ITERATE(TSlotSrvsList, it_srv, srvs) {
            SSyncSlotSrv* slot_srv = *it_srv;
            if (slot_srv->sync_started) {
                if (CSrvTime::CurSecs() - slot_srv->last_active_time
                            > (CNCDistributionConf::GetNetworkErrorTimeout() / kUSecsPerSecond)) {
                    s_CancelSync(m_SlotData, slot_srv, 0);
                    return &CNCActiveSyncControl::x_CheckSlotOurSync;
                } else {
                    ++m_CntUnfinished;
                }
            }
        }
    }
#endif
    return &CNCActiveSyncControl::x_CheckSlotTheirSync;
}
691 
/// Per-slot state: supervise syncs already running on the current slot.
///
/// The slot lock is held throughout, and each peer record's own lock is taken
/// while it is examined.  For every peer record of m_SlotData:
///  - A passive sync with no command in flight and no activity for longer
///    than the periodic sync timeout (configured in usec, compared in
///    seconds) is cancelled and logged as a warning.  The check then
///    restarts for this slot.
///  - Any other running sync is counted in m_CntUnfinished.
///  - For peers with no running sync, m_MinNextTime is lowered to their next
///    sync time.
/// Finally it advances to the next slot, clears m_VisitedSrv and
/// m_NeedReply, and reschedules itself in state x_CheckSlotOurSync.
///
/// NOTE(review): after a cancellation, the restart re-counts syncs already
/// counted in m_CntUnfinished.  In the code shown here that counter is only
/// compared with 0, so the over-count is harmless.
CNCActiveSyncControl::State
CNCActiveSyncControl::x_CheckSlotTheirSync(void)
{
    if (CTaskServer::IsInShutdown())
        return NULL;

    m_SlotData->lock.Lock();
    ITERATE(TSlotSrvsList, it_srv, m_SlotData->srvs) {
        SSyncSlotSrv* slot_srv = *it_srv;
        slot_srv->lock.Lock();
        if (slot_srv->sync_started) {
            if (slot_srv->is_passive && slot_srv->started_cmds == 0 &&
                CSrvTime::CurSecs() - slot_srv->last_active_time
                        > (CNCDistributionConf::GetPeriodicSyncTimeout() / kUSecsPerSecond)) {
                SRV_LOG(Warning, "Periodic sync canceled by timeout");
                s_CancelSync(m_SlotData, slot_srv, 0, eSynAborted, NC_SYNC_HINT);
                slot_srv->lock.Unlock();
                m_SlotData->lock.Unlock();
                return &CNCActiveSyncControl::x_CheckSlotTheirSync;
            } else {
                ++m_CntUnfinished;
            }
        }
        else {
            Uint8 next_time = max(slot_srv->next_sync_time,
                                  slot_srv->peer->GetNextSyncTime());
// calculate next time we need sync
            m_MinNextTime = min(m_MinNextTime, next_time);
        }
        slot_srv->lock.Unlock();
    }
    m_SlotData->lock.Unlock();

    // Move on to the next slot.
    ++m_NextSlotIt;
    m_VisitedSrv.clear();
    m_NeedReply = false;
    SetState(&CNCActiveSyncControl::x_CheckSlotOurSync);
    SetRunnable();
    return NULL;
}
732 
733 CNCActiveSyncControl::State
x_FinishScanSlots(void)734 CNCActiveSyncControl::x_FinishScanSlots(void)
735 {
736     Uint8 sync_interval = CNCDistributionConf::GetPeriodicSyncInterval();
737 // I am not sure using m_ForceInitSync makes sense
738 //    m_ForceInitSync = CNCPeerControl::HasServersForInitSync()  &&  !m_DidSync;
739     Uint8 now = CSrvTime::Current().AsUSec();
740     if (now - m_LoopStart >= sync_interval) {
741         s_ShuffleSrvsLists();
742     }
743 
744     Uint8 wait_time;
745     if (m_MinNextTime > now) {
746         wait_time = m_MinNextTime - now;
747         if (wait_time > sync_interval)
748             wait_time = sync_interval;
749     }
750     else {
751         s_RndLock.Lock();
752         wait_time = s_Rnd.GetRand(0, 10000);
753         s_RndLock.Unlock();
754     }
755 
756     Uint4 timeout_sec  = Uint4(wait_time / kUSecsPerSecond);
757     if (wait_time % kUSecsPerSecond)
758         ++timeout_sec;
759 
760     SetState(&CNCActiveSyncControl::x_StartScanSlots);
761     RunAfter(timeout_sec);
762     return NULL;
763 }
764 
/// Start an active sync with peer m_SlotSrv on m_SlotData's slot.
///
/// If s_StartSync() refuses, it returns to x_CheckSlotOurSync to try the
/// next peer.  Otherwise it:
///  - resets the per-sync state and counters;
///  - opens a diagnostic request of type "sync";
///  - loads the last synced record numbers for this (peer, slot);
///  - sends SyncStart over a background connection to the peer;
///  - waits in x_WaitSyncStarted.
/// Without a background connection, the sync finishes as a network error.
CNCActiveSyncControl::State
CNCActiveSyncControl::x_DoPeriodicSync(void)
{
    ESyncInitiateResult init_res = s_StartSync(m_SlotData, m_SlotSrv, false);
    if (init_res != eProceedWithEvents) {
        return &CNCActiveSyncControl::x_CheckSlotOurSync;
    }

    // Per-sync state.
    m_SrvId = m_SlotSrv->peer->GetSrvId();
    m_MyTrust = CNCDistributionConf::GetSelfTrustLevel();
    m_TheirTrust = m_SlotSrv->peer->GetTrustLevel();
    m_Slot = m_SlotData->slot;
    m_Result = eSynOK;
    m_Hint = NC_SYNC_HINT;
    m_Progress = 0;
    m_SlotSrv->is_by_blobs = false;
    m_StartedCmds = 0;
    m_FinishSyncCalled = false;
    m_NextTask = eSynNoTask;
    m_StartTime = CSrvTime::Current().AsUSec();

    m_ReadOK = m_ReadERR = 0;
    m_WriteOK = m_WriteERR = 0;
    m_ProlongOK = m_ProlongERR = 0;
    m_DelOK = m_DelERR = 0;
    m_NeedReply = false;

    CreateNewDiagCtx();
    CSrvDiagMsg().StartRequest()
                 .PrintParam("_type", "sync")
                 .PrintParam("srv_id", m_SrvId)
                 .PrintParam("slot", m_Slot)
                 .PrintParam("self_id", CNCDistributionConf::GetSelfID());

    CNCSyncLog::GetLastSyncedRecNo(m_SrvId, m_Slot,
                                   &m_LocalStartRecNo, &m_RemoteStartRecNo);

    CNCActiveHandler* conn = m_SlotSrv->peer->GetBGConn();
    if (!conn) {
        m_Result = eSynNetworkError;
        m_Hint = NC_SYNC_HINT;
        return &CNCActiveSyncControl::x_FinishSync;
    }

    // One command (SyncStart) in flight; CmdFinished() is expected to
    // decrement it, which x_WaitSyncStarted waits for.
    m_StartedCmds = 1;
    m_SyncHandlers.clear();
    conn->SyncStart(this, m_LocalStartRecNo, m_RemoteStartRecNo);
    return &CNCActiveSyncControl::x_WaitSyncStarted;
}
814 
815 CNCActiveSyncControl::State
x_WaitSyncStarted(void)816 CNCActiveSyncControl::x_WaitSyncStarted(void)
817 {
818     // wait for sync started
819     // see CmdFinished()
820     if (m_StartedCmds != 0) {
821         return NULL;
822     }
823     if (CTaskServer::IsInShutdown()) {
824         m_Result = eSynAborted;
825         m_Hint = NC_SYNC_HINT;
826     }
827     if (m_Result != eSynOK)
828         return &CNCActiveSyncControl::x_FinishSync;
829 
830     m_NeedReply = true;
831     m_LocalSyncedRecNo = 0;
832     m_RemoteSyncedRecNo = 0;
833     m_SlotSrv->last_active_time = CSrvTime::CurSecs();
834     // depending on the reply
835     if (m_SlotSrv->is_by_blobs)
836         return &CNCActiveSyncControl::x_PrepareSyncByBlobs;
837     else
838         return &CNCActiveSyncControl::x_PrepareSyncByEvents;
839 }
840 
/// Work out the next sync task via x_CalcNextTask(), which presumably sets
/// m_NextTask.  Then move to x_WaitForExecutingTasks, which dispatches tasks
/// according to m_NextTask.
CNCActiveSyncControl::State
CNCActiveSyncControl::x_ExecuteSyncCommands(void)
{
    x_CalcNextTask();
    return &CNCActiveSyncControl::x_WaitForExecutingTasks;
}
847 
848 CNCActiveSyncControl::State
x_ExecuteFinalize(void)849 CNCActiveSyncControl::x_ExecuteFinalize(void)
850 {
851     if (m_FinishSyncCalled) {
852         SRV_FATAL("Command finalized already");
853     }
854     if (!CTaskServer::IsInShutdown()) {
855         if (m_SlotSrv->peer->FinishSync(this)) {
856             m_FinishSyncCalled = true;
857             return &CNCActiveSyncControl::x_WaitForExecutingTasks;
858         } else {
859             RunAfter(1);
860             return NULL;
861         }
862     }
863     m_Result = eSynNetworkError;
864     m_Hint = NC_SYNC_HINT;
865     return &CNCActiveSyncControl::x_FinishSync;
866 }
867 
// Dispatcher state: hands queued sync tasks to background peer connections
// and decides when the sync is over.
//
//  - m_NextTask == eSynNoTask: once no handler is busy, go to x_FinishSync.
//  - m_NextTask == eSynNeedFinalize: once no handler is busy, run the
//    finalize task (commit/cancel, see x_DoFinalize) on one connection.
//  - otherwise: start regular tasks for as long as GetBGConn(true) returns
//    connections and regular tasks remain.
// If the lock is still held at the end (nothing was started), either the
// sync fails with eSynServerBusy (no handler busy at all) or the busy
// handlers' command timeouts are checked and the state re-runs in 1 second.
// When tasks were started (lock released), no reschedule is requested here;
// CmdFinished() calls SetRunnable() when a command completes.
// During shutdown nothing is dispatched and no reschedule is requested.
CNCActiveSyncControl::State
CNCActiveSyncControl::x_WaitForExecutingTasks(void)
{
    if (CTaskServer::IsInShutdown()) {
        return NULL;
    }
    // is_locked tracks whether m_Lock is currently held by this function.
    bool is_locked = false;
    m_Lock.Lock(); is_locked = true;
    if (m_NextTask == eSynNoTask) {
        if (m_SyncHandlers.empty()) {
            m_Lock.Unlock();
            return &CNCActiveSyncControl::x_FinishSync;
        }
    } else if (m_NextTask == eSynNeedFinalize) {
        // Finalize only when every other handler has finished.
        if (m_SyncHandlers.empty()) {
        	CNCActiveHandler* conn = m_SlotSrv->peer->GetBGConn();
            if (conn) {
                m_SyncHandlers.insert(conn);
                SSyncTaskInfo task_info;
                GetNextTask(task_info);
                m_Lock.Unlock(); is_locked = false;
                // The task is executed without holding m_Lock.
                ExecuteSyncTask(task_info, conn);
            }
        }
    } else {
        // NOTE(review): after the first iteration the loop condition reads
        // m_NextTask without m_Lock held, and it is not re-checked after the
        // lock is re-acquired below. A concurrent CmdFinished() may switch
        // m_NextTask to eSynNeedFinalize in between, and GetNextTask() would
        // then hand out the finalize task while other handlers are still
        // registered (the eSynNeedFinalize branch above waits for them).
        // Confirm whether this interleaving is acceptable.
        for (;m_NextTask > eSynNeedFinalize;) {
        	CNCActiveHandler* conn = m_SlotSrv->peer->GetBGConn(true);
            if (!conn) {
                break;
            }
            if (!is_locked) {m_Lock.Lock(); is_locked = true;}
            m_SyncHandlers.insert(conn);
            SSyncTaskInfo task_info;
            GetNextTask(task_info);
            m_Lock.Unlock(); is_locked = false;
            ExecuteSyncTask(task_info, conn);
        }
    }
    if (is_locked) {
        // Nothing could be started and nobody is working: give up.
        if (m_SyncHandlers.empty()) {
            m_Result = eSynServerBusy;
            m_Hint = NC_SYNC_HINT;
            m_Lock.Unlock();
            return &CNCActiveSyncControl::x_FinishSync;
        }
        // Handlers are still busy: check their command timeouts and poll again.
        ITERATE(set<CNCActiveHandler*>, h, m_SyncHandlers) {
            (*h)->CheckCommandTimeout();
        }
        m_Lock.Unlock();
        RunAfter(1);
    }
    return NULL;
}
921 
// Terminal state of one sync session.
//
// Releases the per-session peer data, maps m_Result to the request status,
// prints the per-action counters and closes the diagnostic request, and
// optionally appends one comma-separated line to s_LogFile. It then commits
// (eSynOK) or cancels the sync for the slot/server pair under both the slot
// lock and the slot-server lock. The next state is set explicitly via
// SetState(x_CheckSlotOurSync) + SetRunnable(), so NULL is returned.
CNCActiveSyncControl::State
CNCActiveSyncControl::x_FinishSync(void)
{
    x_CleanSyncObjects();

    // Translate the sync outcome into the request status of the diag context.
    switch (m_Result) {
    case eSynOK:
        CNCBlobStorage::SaveMaxSyncLogRecNo();
        break;
    case eSynAborted:
        GetDiagCtx()->SetRequestStatus(eStatus_SyncAborted);
        break;
    case eSynCrossSynced:
        GetDiagCtx()->SetRequestStatus(eStatus_CrossSync);
        break;
    case eSynServerBusy:
        GetDiagCtx()->SetRequestStatus(eStatus_SyncBusy);
        break;
    case eSynNetworkError:
        GetDiagCtx()->SetRequestStatus(eStatus_PeerError);
        break;
    }

    // Counters accumulated by CmdFinished() during this session.
    CSrvDiagMsg().PrintExtra()
                 .PrintParam("sync", (m_SlotSrv->is_by_blobs? "blobs": "events"))
                 .PrintParam("r_ok", m_ReadOK)
                 .PrintParam("r_err", m_ReadERR)
                 .PrintParam("w_ok", m_WriteOK)
                 .PrintParam("w_err", m_WriteERR)
                 .PrintParam("p_ok", m_ProlongOK)
                 .PrintParam("p_err", m_ProlongERR)
                 .PrintParam("d_ok", m_DelOK)
                 .PrintParam("d_err", m_DelERR);
    CSrvDiagMsg().StopRequest();
    ReleaseDiagCtx();

    // Optional sync log: self id, peer id, slot, start/end/duration (usec),
    // sync kind, result, sync log size, read/write/prolong counters and the
    // global copy-request statistics. The delete counters are not written here.
    if (s_LogFile) {
        Uint8 end_time = CSrvTime::Current().AsUSec();
        Uint8 log_size = CNCSyncLog::GetLogSize();
        fprintf(s_LogFile,
                "%" NCBI_UINT8_FORMAT_SPEC ",%" NCBI_UINT8_FORMAT_SPEC
                ",%u,%" NCBI_UINT8_FORMAT_SPEC ",%" NCBI_UINT8_FORMAT_SPEC
                ",%" NCBI_UINT8_FORMAT_SPEC
                ",%d,%d,%" NCBI_UINT8_FORMAT_SPEC ",%" NCBI_UINT8_FORMAT_SPEC
                ",%" NCBI_UINT8_FORMAT_SPEC ",%" NCBI_UINT8_FORMAT_SPEC
                ",%" NCBI_UINT8_FORMAT_SPEC ",%" NCBI_UINT8_FORMAT_SPEC
                ",%" NCBI_UINT8_FORMAT_SPEC ",%u,%u\n",
                CNCDistributionConf::GetSelfID(), m_SrvId, m_Slot,
                m_StartTime, end_time, end_time - m_StartTime,
                int(m_SlotSrv->is_by_blobs), m_Result, log_size,
                m_ReadOK, m_ReadERR, m_WriteOK, m_WriteERR,
                m_ProlongOK, m_ProlongERR,
                Uint4(CNCPeerControl::sm_TotalCopyRequests.Get()),
                Uint4(CNCPeerControl::sm_CopyReqsRejected.Get()));
        fflush(s_LogFile);
    }

    // Record the outcome for the slot/server pair. The slot lock is taken
    // before the slot-server lock; both are held until the function returns.
    CMiniMutexGuard g_slot(m_SlotData->lock);
    CMiniMutexGuard g_srv(m_SlotSrv->lock);
    if (m_Result == eSynOK) {
        s_CommitSync(m_SlotData, m_SlotSrv, NC_SYNC_HINT);
    } else {
//        m_SlotSrv->peer->RemoveSyncControl(this);
        s_CancelSync(m_SlotData, m_SlotSrv, CNCDistributionConf::GetFailedSyncRetryDelay(), m_Result, m_Hint);
    }
    m_DidSync = m_Result == eSynOK;

    SetState(&CNCActiveSyncControl::x_CheckSlotOurSync);
    SetRunnable();
    return NULL;
}
993 
994 CNCActiveSyncControl::State
x_PrepareSyncByEvents(void)995 CNCActiveSyncControl::x_PrepareSyncByEvents(void)
996 {
997     m_Events2Get.clear();
998     m_Events2Send.clear();
999     m_SlotSrv->last_active_time = CSrvTime::CurSecs();
1000 #if 0
1001     if (CNCSyncLog::GetSyncOperations(m_SrvId, m_Slot,
1002                                       m_LocalStartRecNo,
1003                                       m_RemoteStartRecNo,
1004                                       m_RemoteEvents,
1005                                       &m_Events2Get,
1006                                       &m_Events2Send,
1007                                       &m_LocalSyncedRecNo,
1008                                       &m_RemoteSyncedRecNo)
1009         ||  (CNCSyncLog::GetLogSize(m_Slot) == 0  &&  m_SlotSrv->was_blobs_sync))
1010     {
1011         m_CurGetEvent = m_Events2Get.begin();
1012         m_CurSendEvent = m_Events2Send.begin();
1013         return &CNCActiveSyncControl::x_ExecuteSyncCommands;
1014     }
1015 
1016     // sync by blob list
1017     m_SlotSrv->is_by_blobs = true;
1018     CNCActiveHandler* conn = m_SlotSrv->peer->GetBGConn();
1019     if (!conn) {
1020         m_Result = eSynNetworkError;
1021         m_Hint = NC_SYNC_HINT;
1022         return &CNCActiveSyncControl::x_FinishSync;
1023     }
1024 
1025     // request blob list
1026     m_StartedCmds = 1;
1027     conn->SyncBlobsList(this);
1028     m_SlotSrv->last_active_time = CSrvTime::CurSecs();
1029     return &CNCActiveSyncControl::x_WaitForBlobList;
1030 #else
1031     CNCSyncLog::GetSyncOperations(m_SrvId, m_Slot,
1032                                       m_LocalStartRecNo,
1033                                       m_RemoteStartRecNo,
1034                                       m_RemoteEvents,
1035                                       &m_Events2Get,
1036                                       &m_Events2Send,
1037                                       &m_LocalSyncedRecNo,
1038                                       &m_RemoteSyncedRecNo);
1039     m_CurGetEvent = m_Events2Get.begin();
1040     m_CurSendEvent = m_Events2Send.begin();
1041     return &CNCActiveSyncControl::x_ExecuteSyncCommands;
1042 #endif
1043 }
1044 
1045 CNCActiveSyncControl::State
x_WaitForBlobList(void)1046 CNCActiveSyncControl::x_WaitForBlobList(void)
1047 {
1048     if (m_StartedCmds != 0) {
1049         return NULL;
1050     }
1051     if (CTaskServer::IsInShutdown()) {
1052         m_Result = eSynAborted;
1053         m_Hint = NC_SYNC_HINT;
1054     }
1055     if (m_Result != eSynOK)
1056         return &CNCActiveSyncControl::x_FinishSync;
1057 
1058     return &CNCActiveSyncControl::x_PrepareSyncByBlobs;
1059 }
1060 
1061 CNCActiveSyncControl::State
x_PrepareSyncByBlobs(void)1062 CNCActiveSyncControl::x_PrepareSyncByBlobs(void)
1063 {
1064     m_LocalSyncedRecNo = CNCSyncLog::GetCurrentRecNo(m_Slot);
1065     m_RemoteSyncedRecNo = m_RemoteStartRecNo;
1066     m_SlotSrv->last_active_time = CSrvTime::CurSecs();
1067 
1068     ITERATE(TNCBlobSumList, it, m_LocalBlobs) {
1069         delete it->second;
1070     }
1071     m_LocalBlobs.clear();
1072     CNCBlobStorage::GetFullBlobsList(m_Slot, m_LocalBlobs, NULL);
1073 
1074     m_CurLocalBlob = m_LocalBlobs.begin();
1075     m_CurRemoteBlob = m_RemoteBlobs.begin();
1076     m_SlotSrv->last_active_time = CSrvTime::CurSecs();
1077     return &CNCActiveSyncControl::x_ExecuteSyncCommands;
1078 }
1079 
1080 void
x_CleanSyncObjects(void)1081 CNCActiveSyncControl::x_CleanSyncObjects(void)
1082 {
1083     ITERATE(TNCBlobSumList, it, m_RemoteBlobs) {
1084         delete it->second;
1085     }
1086     m_RemoteBlobs.clear();
1087     m_CurRemoteBlob = m_RemoteBlobs.begin();
1088 
1089 #if 0
1090     ITERATE(TNCBlobSumList, it, m_LocalBlobs) {
1091         delete it->second;
1092     }
1093     m_LocalBlobs.clear();
1094     m_CurLocalBlob = m_LocalBlobs.begin();
1095 #endif
1096 
1097     ITERATE(TReducedSyncEvents, it, m_RemoteEvents) {
1098         delete it->second.wr_or_rm_event;
1099         delete it->second.prolong_event;
1100     }
1101     m_RemoteEvents.clear();
1102 
1103 #if 0
1104     m_Events2Get.clear();
1105     m_Events2Send.clear();
1106     m_CurGetEvent = m_Events2Get.begin();
1107     m_CurSendEvent = m_Events2Send.begin();
1108 #endif
1109 }
1110 
// Advances past the item(s) used by the task currently in m_NextTask, then
// selects the next task and stores it in m_NextTask.
//
// Called from x_ExecuteSyncCommands() for the initial selection and from
// GetNextTask() after a task has been handed out (with m_Lock held by the
// callers of GetNextTask() seen in this chunk).
// Result values of m_NextTask:
//  - eSynNeedFinalize: no more regular tasks, or the sync failed/aborted;
//  - eSynNoTask: the finalize task itself has just been handed out;
//  - otherwise the next event/blob task; its cursor(s) point at the item.
// Tasks rejected by the trust rules near the end are skipped by looping
// again. The switch at the top then steps past them. Note that m_Progress is
// incremented for such skipped tasks too.
void
CNCActiveSyncControl::x_CalcNextTask(void)
{
    for (;;) {
    // Step past the item(s) consumed by the previous task.
    switch (m_NextTask) {
    case eSynEventSend:
        ++m_CurSendEvent;
        break;
    case eSynEventGet:
        ++m_CurGetEvent;
        break;
    case eSynBlobUpdateOur:
    case eSynBlobUpdatePeer:
        ++m_CurLocalBlob;
        ++m_CurRemoteBlob;
        break;
    case eSynBlobSend:
        ++m_CurLocalBlob;
        break;
    case eSynBlobGet:
        ++m_CurRemoteBlob;
        break;
    case eSynNoTask:
        break;
    case eSynNeedFinalize:
        // Finalization was just handed out: nothing is left to do.
        m_NextTask = eSynNoTask;
        return;
    }

    // A pending slot cleanup aborts the sync unless it already failed on the
    // network.
    if (m_SlotData->clean_required  &&  m_Result != eSynNetworkError) {
        m_Result = eSynAborted;
        m_Hint = NC_SYNC_HINT;
    }

    if (m_Result == eSynNetworkError  ||  m_Result == eSynAborted)
        m_NextTask = eSynNeedFinalize;
    else if (!m_SlotSrv->is_by_blobs) {
        // Event-based sync: first push our events, then fetch the peer's.
        if (m_CurSendEvent != m_Events2Send.end()) {
            m_NextTask = eSynEventSend;
            ++m_Progress;
        } else if (m_CurGetEvent != m_Events2Get.end()) {
            m_NextTask = eSynEventGet;
            ++m_Progress;
        } else {
            m_NextTask = eSynNeedFinalize;
        }
    }
    else {
        // Blob-list sync: walk both lists in step by key (the '<' comparison
        // below assumes both lists are ordered by key).
sync_next_key:
        if (m_CurLocalBlob != m_LocalBlobs.end()
            &&  m_CurRemoteBlob != m_RemoteBlobs.end())
        {
            if (m_CurLocalBlob->first == m_CurRemoteBlob->first) {
                if (m_CurLocalBlob->second->isEqual(*m_CurRemoteBlob->second)) {
                    // Equivalent blobs, skip them.
                    ++m_CurLocalBlob;
                    ++m_CurRemoteBlob;
                    ++m_Progress;
                    goto sync_next_key;
                }

                // The same blob key. Test which one is newer.
                if (m_CurLocalBlob->second->isOlder(*m_CurRemoteBlob->second)) {
                    m_NextTask = eSynBlobUpdateOur;
                    ++m_Progress;
                } else {
                    m_NextTask = eSynBlobUpdatePeer;
                    ++m_Progress;
                }
            }
            else if (m_CurLocalBlob->first < m_CurRemoteBlob->first) {
                // Key only present locally; expired local blobs are not sent.
                if (m_CurLocalBlob->second->isExpired()) {
                    ++m_CurLocalBlob;
                    goto sync_next_key;
                }
                m_NextTask = eSynBlobSend;
                ++m_Progress;
            }
            else {
                // Key only present on the peer.
                m_NextTask = eSynBlobGet;
                ++m_Progress;
            }
        }
        // Process the tails of the lists
        else if (m_CurLocalBlob != m_LocalBlobs.end()) {
            if (m_CurLocalBlob->second->isExpired()) {
                ++m_CurLocalBlob;
                goto sync_next_key;
            }
            m_NextTask = eSynBlobSend;
            ++m_Progress;
        } else if (m_CurRemoteBlob != m_RemoteBlobs.end()) {
            m_NextTask = eSynBlobGet;
            ++m_Progress;
        } else {
            m_NextTask = eSynNeedFinalize;
        }
    }
/*
    SEND:
        m_MyTrust >= m_TheirTrust ? SEND : NOOP
    GET:
        m_MyTrust >  m_TheirTrust ? NOOP : GET
        draining ? NOOP (only update existing blobs, never get new ones)
    UPDATE:
        no restrictions
        29jun17: changed to behave same as in SEND/GET
*/
        if (m_NextTask == eSynEventSend || m_NextTask == eSynBlobSend || m_NextTask == eSynBlobUpdatePeer) {
            if (m_MyTrust < m_TheirTrust) {
                continue;
            }
        }
        if (m_NextTask == eSynEventGet || m_NextTask == eSynBlobGet || m_NextTask == eSynBlobUpdateOur) {
            if (m_MyTrust > m_TheirTrust || (CNCBlobStorage::IsDraining() && m_NextTask != eSynBlobUpdateOur)) {
                continue;
            }
        }
        break;
    };

// sanity check
    // Abort if the key of a send/get task does not map to the slot being
    // synced. Update tasks and finalize are not checked here.
    string blob_key;
    switch (m_NextTask) {
    case eSynEventSend:   blob_key = (*m_CurSendEvent)->key.PackedKey();  break;
    case eSynEventGet:    blob_key = (*m_CurGetEvent)->key.PackedKey();   break;
    case eSynBlobSend:    blob_key = m_CurLocalBlob->first;               break;
    case eSynBlobGet:     blob_key = m_CurRemoteBlob->first;              break;
    default:
        break;
    }
    if (!blob_key.empty()) {
        Uint2 slot=0, bucket;
        if (!CNCDistributionConf::GetSlotByKey(blob_key,slot, bucket) || slot != m_Slot) {
            m_Result = eSynAborted;
            m_Hint = NC_SYNC_HINT;
            m_NextTask = eSynNeedFinalize;
        }
    }
}
1251 
1252 void
x_DoEventSend(const SSyncTaskInfo & task_info,CNCActiveHandler * conn)1253 CNCActiveSyncControl::x_DoEventSend(const SSyncTaskInfo& task_info,
1254                                     CNCActiveHandler* conn)
1255 {
1256     SNCSyncEvent* event = *task_info.send_evt;
1257     if (event->blob_size > CNCDistributionConf::GetMaxBlobSizeSync() ||
1258         !conn->GetPeer()->AcceptsBlobKey(event->key)) {
1259         CmdFinished( eSynOK, eSynActionWrite, conn, NC_SYNC_HINT);
1260         conn->Release();
1261         return;
1262     }
1263     switch (event->event_type) {
1264     default:
1265         break;
1266     case eSyncWrite:
1267         conn->SyncSend(this, event);
1268         break;
1269     case eSyncProlong:
1270         conn->SyncProlongPeer(this, event);
1271         break;
1272     }
1273 }
1274 
1275 void
x_DoEventGet(const SSyncTaskInfo & task_info,CNCActiveHandler * conn)1276 CNCActiveSyncControl::x_DoEventGet(const SSyncTaskInfo& task_info,
1277                                    CNCActiveHandler* conn)
1278 {
1279     SNCSyncEvent* event = *task_info.get_evt;
1280     switch (event->event_type) {
1281     default:
1282         break;
1283     case eSyncWrite:
1284         conn->SyncRead(this, event);
1285         break;
1286     case eSyncProlong:
1287         conn->SyncProlongOur(this, event);
1288         break;
1289     }
1290 }
1291 
1292 void
x_DoBlobUpdateOur(const SSyncTaskInfo & task_info,CNCActiveHandler * conn)1293 CNCActiveSyncControl::x_DoBlobUpdateOur(const SSyncTaskInfo& task_info,
1294                                         CNCActiveHandler* conn)
1295 {
1296     string key(task_info.remote_blob->first);
1297     SNCBlobSummary* local_blob = task_info.local_blob->second;
1298     SNCBlobSummary* remote_blob = task_info.remote_blob->second;
1299     if (local_blob->isSameData(*remote_blob))
1300         conn->SyncProlongOur(this, key, *remote_blob);
1301     else
1302         conn->SyncRead(this, key, remote_blob->create_time);
1303 }
1304 
1305 void
x_DoBlobUpdatePeer(const SSyncTaskInfo & task_info,CNCActiveHandler * conn)1306 CNCActiveSyncControl::x_DoBlobUpdatePeer(const SSyncTaskInfo& task_info,
1307                                          CNCActiveHandler* conn)
1308 {
1309     CNCBlobKeyLight key(task_info.remote_blob->first);
1310     if (!conn->GetPeer()->AcceptsBlobKey(key)) {
1311         CmdFinished( eSynOK, eSynActionWrite, conn, NC_SYNC_HINT);
1312         conn->Release();
1313         return;
1314     }
1315     SNCBlobSummary* local_blob = task_info.local_blob->second;
1316     SNCBlobSummary* remote_blob = task_info.remote_blob->second;
1317     if (local_blob->isSameData(*remote_blob))
1318         conn->SyncProlongPeer(this, key, *local_blob);
1319     else
1320         conn->SyncSend(this, key);
1321 }
1322 
1323 void
x_DoBlobSend(const SSyncTaskInfo & task_info,CNCActiveHandler * conn)1324 CNCActiveSyncControl::x_DoBlobSend(const SSyncTaskInfo& task_info,
1325                                    CNCActiveHandler* conn)
1326 {
1327     CNCBlobKeyLight key(task_info.local_blob->first);
1328     if (!conn->GetPeer()->AcceptsBlobKey(key)) {
1329         CmdFinished( eSynOK, eSynActionWrite, conn, NC_SYNC_HINT);
1330         conn->Release();
1331         return;
1332     }
1333     conn->SyncSend(this, key);
1334 }
1335 
1336 void
x_DoBlobGet(const SSyncTaskInfo & task_info,CNCActiveHandler * conn)1337 CNCActiveSyncControl::x_DoBlobGet(const SSyncTaskInfo& task_info,
1338                                   CNCActiveHandler* conn)
1339 {
1340     string key(task_info.remote_blob->first);
1341     Uint8 create_time = task_info.remote_blob->second->create_time;
1342     conn->SyncRead(this, key, create_time);
1343 }
1344 
1345 void
x_DoFinalize(CNCActiveHandler * conn)1346 CNCActiveSyncControl::x_DoFinalize(CNCActiveHandler* conn)
1347 {
1348     if (m_Result == eSynOK) {
1349         CNCSyncLog::SetLastSyncRecNo(m_SrvId, m_Slot,
1350                                      m_LocalSyncedRecNo, m_RemoteSyncedRecNo);
1351         conn->SyncCommit(this, m_LocalSyncedRecNo, m_RemoteSyncedRecNo);
1352     }
1353     else {
1354         conn->SyncCancel(this);
1355     }
1356 }
1357 
// Hands out the task currently stored in m_NextTask and precomputes the next
// one via x_CalcNextTask().
//
// Lock contract (as implemented below): the caller is expected to hold
// m_Lock. The lock is released here ONLY on the "no task" path when is_valid
// is non-NULL. On every other path it is left held for the caller to release.
//
// @param task_info  Receives the task type and a copy of all list cursors.
// @param is_valid   Optional. If NULL, finding no task is a fatal error. If
//                   given, it is set to false (no task) or true (task issued).
// @return true if another regular (non-finalize) task is queued after this one.
bool
CNCActiveSyncControl::GetNextTask(SSyncTaskInfo& task_info, bool* is_valid)
{
//    m_Lock.Lock();
    if (m_NextTask == eSynNoTask) {
        if (is_valid) {
            *is_valid = false;
            task_info.task_type = eSynNoTask;
            // Asymmetric: the caller's lock is released only here.
            m_Lock.Unlock();
            return false;
        } else {
            SRV_FATAL("Invalid state: m_NextTask: " << m_NextTask);
        }
    }
    task_info.task_type = m_NextTask;
    task_info.get_evt = m_CurGetEvent;
    task_info.send_evt = m_CurSendEvent;
    task_info.local_blob = m_CurLocalBlob;
    task_info.remote_blob = m_CurRemoteBlob;
    // Counted until CmdFinished() decrements it; wrap-around is fatal.
    ++m_StartedCmds;
    if (m_StartedCmds == 0) {
        SRV_FATAL("Invalid state: no m_StartedCmds");
    }
    if (m_NextTask != eSynNeedFinalize)
        ++m_SlotSrv->cnt_sync_ops;
    m_SlotSrv->last_active_time = CSrvTime::CurSecs();
    x_CalcNextTask();
    bool has_more = m_NextTask != eSynNeedFinalize  &&  m_NextTask != eSynNoTask;
//    m_Lock.Unlock();
    if (is_valid) {
        *is_valid = true;
    }
    return has_more;
}
1392 
1393 void
ExecuteSyncTask(const SSyncTaskInfo & task_info,CNCActiveHandler * conn)1394 CNCActiveSyncControl::ExecuteSyncTask(const SSyncTaskInfo& task_info,
1395                                       CNCActiveHandler* conn)
1396 {
1397     switch (task_info.task_type) {
1398     case eSynEventSend:
1399         x_DoEventSend(task_info, conn);
1400         break;
1401     case eSynEventGet:
1402         x_DoEventGet(task_info, conn);
1403         break;
1404     case eSynBlobUpdateOur:
1405         x_DoBlobUpdateOur(task_info, conn);
1406         break;
1407     case eSynBlobUpdatePeer:
1408         x_DoBlobUpdatePeer(task_info, conn);
1409         break;
1410     case eSynBlobSend:
1411         x_DoBlobSend(task_info, conn);
1412         break;
1413     case eSynBlobGet:
1414         x_DoBlobGet(task_info, conn);
1415         break;
1416     case eSynNeedFinalize:
1417         x_DoFinalize(conn);
1418         break;
1419     default:
1420         SRV_FATAL("Unsupported task type: " << task_info.task_type);
1421     }
1422 }
1423 
1424 void
CmdFinished(ESyncResult res,ESynActionType action,CNCActiveHandler * conn,int hint)1425 CNCActiveSyncControl::CmdFinished(ESyncResult res, ESynActionType action, CNCActiveHandler* conn, int hint)
1426 {
1427     m_Lock.Lock();
1428     m_SyncHandlers.erase(conn);
1429     --m_StartedCmds;
1430     if (res == eSynOK) {
1431         switch (action) {
1432         case eSynActionRead:
1433             ++m_ReadOK;
1434             break;
1435         case eSynActionWrite:
1436             ++m_WriteOK;
1437             break;
1438         case eSynActionProlong:
1439             ++m_ProlongOK;
1440             break;
1441         case eSynActionRemove:
1442             ++m_DelOK;
1443             break;
1444         case eSynActionNone:
1445             break;
1446         }
1447     }
1448     else {
1449         switch (action) {
1450         case eSynActionRead:
1451             ++m_ReadERR;
1452             break;
1453         case eSynActionWrite:
1454             ++m_WriteERR;
1455             break;
1456         case eSynActionProlong:
1457             ++m_ProlongERR;
1458             break;
1459         case eSynActionRemove:
1460             ++m_DelERR;
1461             break;
1462         case eSynActionNone:
1463             break;
1464         }
1465     }
1466 
1467     if (m_Result == eSynOK) {
1468         if (res == eSynAborted  &&  m_Result != eSynNetworkError) {
1469             m_Result = eSynAborted;
1470             m_Hint = hint;
1471         } else if (res != eSynOK) {
1472             m_Result = res;
1473             m_Hint = hint;
1474         }
1475     }
1476     if (m_Result != eSynOK) {
1477         if (m_NextTask >= eSynNeedFinalize) {
1478             m_NextTask = eSynNeedFinalize;
1479         } else {
1480             m_NextTask = eSynNoTask;
1481         }
1482     }
1483 
1484     SetRunnable();
1485     m_Lock.Unlock();
1486 }
1487 
PrintState(TNCBufferType & task,const CTempString & mask)1488 void CNCActiveSyncControl::PrintState(TNCBufferType& task, const CTempString& mask)
1489 {
1490     Uint2 slot = 0;
1491     bool one_slot = false, is_audit = false;
1492     if (!mask.empty()) {
1493         is_audit = mask == "audit";
1494         if (!is_audit) {
1495             try {
1496                 slot = NStr::StringToUInt(mask);
1497                 one_slot = true;
1498             } catch (...) {
1499             }
1500         }
1501     }
1502 
1503     char buf[50];
1504     string is("\": "), iss("\": \""), eol(",\n\"");
1505 
1506     task.WriteText(",\n\"SyncControls\": [");
1507     ITERATE(TSyncControls, c, s_SyncControls) {
1508         if (c != s_SyncControls.begin()) {
1509             task.WriteText(",");
1510         }
1511         task.WriteText("{");
1512         task.WriteText("\n\"").WriteText("peer"       ).WriteText(iss).WriteText(CNCDistributionConf::GetPeerName( (*c)->m_SrvId)).WriteText("\"");
1513         task.WriteText(eol).WriteText("stuck"         ).WriteText(is ).WriteBool( (*c)->IsStuck());
1514         task.WriteText(eol).WriteText("slot"          ).WriteText(is ).WriteNumber( (*c)->m_Slot);
1515         task.WriteText(eol).WriteText("started_cmds"  ).WriteText(is ).WriteNumber( (*c)->m_StartedCmds);
1516         CSrvTime((*c)->m_StartTime / kUSecsPerSecond).Print(buf, CSrvTime::eFmtJson);
1517         task.WriteText(eol).WriteText("sync_StartTime").WriteText(is ).WriteText( buf);
1518         CSrvTime((*c)->m_LoopStart / kUSecsPerSecond).Print(buf, CSrvTime::eFmtJson);
1519         task.WriteText(eol).WriteText("slot_LoopStart").WriteText(is ).WriteText( buf);
1520         task.WriteText(eol).WriteText("cnt_Unfinished").WriteText(is ).WriteNumber( (*c)->m_CntUnfinished);
1521 
1522         task.WriteText(eol).WriteText("Events2Get_size").WriteText(is ).WriteNumber( (*c)->m_Events2Get.size());
1523         task.WriteText(eol).WriteText("Events2Send_size").WriteText(is ).WriteNumber( (*c)->m_Events2Send.size());
1524         task.WriteText(eol).WriteText("LocalBlobs_size").WriteText(is ).WriteNumber( (*c)->m_LocalBlobs.size());
1525         task.WriteText(eol).WriteText("RemoteBlobs_size").WriteText(is ).WriteNumber( (*c)->m_RemoteBlobs.size());
1526         task.WriteText(eol).WriteText("NextTask").WriteText(is ).WriteNumber( (int)(*c)->m_NextTask);
1527         task.WriteText(eol).WriteText("Result").WriteText(is ).WriteNumber( (int)(*c)->m_Result);
1528         task.WriteText(eol).WriteText("Hint").WriteText(is ).WriteNumber( (int)(*c)->m_Hint);
1529         task.WriteText(eol).WriteText("Progress").WriteText(is ).WriteNumber( (int)(*c)->m_Progress);
1530         task.WriteText(eol).WriteText("FinishSyncCalled").WriteText(is ).WriteBool( (*c)->m_FinishSyncCalled);
1531         task.WriteText(eol).WriteText("NeedReply").WriteText(is ).WriteBool( (*c)->m_NeedReply);
1532 
1533         CSrvTime((*c)->m_LastActive).Print(buf, CSrvTime::eFmtJson);
1534         task.WriteText(eol).WriteText("LastActive").WriteText(is ).WriteText( buf);
1535         task.WriteText(eol).WriteText("cntHandlers").WriteText(is ).WriteNumber( (*c)->m_SyncHandlers.size());
1536         if (is_audit) {
1537             task.WriteText(eol).WriteText("Handlers").WriteText(is );
1538             task.WriteText("[");
1539             bool first = true;
1540             ITERATE(set<CNCActiveHandler*>, h, (*c)->m_SyncHandlers) {
1541                 if (first) {
1542                     first = false;
1543                 } else {
1544                     task.WriteText(",");
1545                 }
1546                 CSrvTime((*h)->m_LastActive).Print(buf, CSrvTime::eFmtJson);
1547                 task.WriteText("\n{\"");
1548                 task.WriteText("LastActive").WriteText(is ).WriteText( buf);
1549 
1550                 task.WriteText(eol).WriteText("m_Client").WriteText(is ).WriteBool( (*h)->m_Client != nullptr);
1551                 task.WriteText(eol).WriteText("m_SyncCtrl").WriteText(is ).WriteBool( (*h)->m_SyncCtrl != nullptr);
1552                 task.WriteText(eol).WriteText("m_Proxy").WriteText(is ).WriteBool( (*h)->m_Proxy != nullptr);
1553 
1554                 task.WriteText(eol).WriteText("m_ProcessingStarted").WriteText(is ).WriteBool( (*h)->m_ProcessingStarted);
1555                 task.WriteText(eol).WriteText("m_CmdStarted").WriteText(is ).WriteBool( (*h)->m_CmdStarted);
1556                 task.WriteText(eol).WriteText("m_GotAnyAnswer").WriteText(is ).WriteBool( (*h)->m_GotAnyAnswer);
1557                 task.WriteText(eol).WriteText("m_GotCmdAnswer").WriteText(is ).WriteBool( (*h)->m_GotCmdAnswer);
1558                 task.WriteText(eol).WriteText("m_GotClientResponse").WriteText(is ).WriteBool( (*h)->m_GotClientResponse);
1559                 task.WriteText(eol).WriteText("m_BlobExists").WriteText(is ).WriteBool( (*h)->m_BlobExists);
1560                 task.WriteText(eol).WriteText("m_CmdSuccess").WriteText(is ).WriteBool( (*h)->m_CmdSuccess);
1561                 task.WriteText(eol).WriteText("m_CmdFromClient").WriteText(is ).WriteBool( (*h)->m_CmdFromClient);
1562                 task.WriteText(eol).WriteText("m_Purge").WriteText(is ).WriteBool( (*h)->m_Purge);
1563 
1564                 task.WriteText(eol).WriteText("m_CmdToSend").WriteText(iss ).WriteText( (*h)->m_CmdToSend).WriteText("\"");;
1565                 task.WriteText(eol).WriteText("m_Response").WriteText(iss ).WriteText( (*h)->m_Response).WriteText("\"");;
1566                 task.WriteText(eol).WriteText("m_ErrMsg").WriteText(iss ).WriteText( (*h)->m_ErrMsg).WriteText("\"");;
1567 
1568                 task.WriteText("}");
1569             }
1570             task.WriteText("]");
1571         }
1572         task.WriteText("\n}");
1573     }
1574     task.WriteText("],\n");
1575 
1576     bool is_first = true;
1577     task.WriteText("\"SlotsList\": [");
1578     ITERATE(TSyncSlotsList, sl, s_SlotsList) {
1579         if (one_slot) {
1580             if ((*sl)->slot != slot) {
1581                 continue;
1582             }
1583         } else {
1584             if ((*sl)->cnt_sync_started == 0) {
1585                 continue;
1586             }
1587         }
1588         if (!is_first) {
1589             task.WriteText(",");
1590         }
1591         is_first = false;
1592         task.WriteText("{");
1593         task.WriteText("\n\"").WriteText("slot"         ).WriteText(is ).WriteNumber( (*sl)->slot);
1594         task.WriteText(eol).WriteText("cnt_sync_started").WriteText(is ).WriteNumber( (*sl)->cnt_sync_started);
1595         task.WriteText(eol).WriteText("cleaning"        ).WriteText(is ).WriteBool(   (*sl)->cleaning);
1596         task.WriteText(eol).WriteText("clean_required"  ).WriteText(is ).WriteBool(   (*sl)->clean_required);
1597         task.WriteText(eol).WriteText("srvs"            ).WriteText(is );
1598         task.WriteText("\n[");
1599         ITERATE(TSlotSrvsList, srv, (*sl)->srvs) {
1600             if (srv != (*sl)->srvs.begin()) {
1601                 task.WriteText(",");
1602             }
1603             task.WriteText("{");
1604             task.WriteText("\n\"").WriteText("peer"          ).WriteText(iss).WriteText(CNCDistributionConf::GetPeerName( (*srv)->peer->GetSrvId())).WriteText("\"");
1605             task.WriteText(eol).WriteText("sync_started"     ).WriteText(is ).WriteBool(   (*srv)->sync_started);
1606             task.WriteText(eol).WriteText("is_passive"       ).WriteText(is ).WriteBool(   (*srv)->is_passive);
1607             task.WriteText(eol).WriteText("is_by_blobs"      ).WriteText(is ).WriteBool(   (*srv)->is_by_blobs);
1608             task.WriteText(eol).WriteText("was_blobs_sync"   ).WriteText(is ).WriteBool(   (*srv)->was_blobs_sync);
1609             task.WriteText(eol).WriteText("cnt_event_sync"   ).WriteText(is ).WriteNumber( (*srv)->cnt_event_sync);
1610             task.WriteText(eol).WriteText("result"           ).WriteText(is ).WriteNumber( (int)(*srv)->result);
1611             task.WriteText(eol).WriteText("hint"             ).WriteText(is ).WriteNumber( (*srv)->hint);
1612             task.WriteText(eol).WriteText("made_initial_sync").WriteText(is ).WriteBool(   (*srv)->made_initial_sync);
1613             task.WriteText(eol).WriteText("started_cmds"     ).WriteText(is ).WriteNumber( (*srv)->started_cmds);
1614             CSrvTime((*srv)->next_sync_time / kUSecsPerSecond).Print(buf, CSrvTime::eFmtJson);
1615             task.WriteText(eol).WriteText("next_sync_time"   ).WriteText(is ).WriteText( buf);
1616             CSrvTime((*srv)->last_active_time).Print(buf, CSrvTime::eFmtJson);
1617             task.WriteText(eol).WriteText("last_active_time" ).WriteText(is ).WriteText( buf);
1618             CSrvTime((*srv)->last_success_time).Print(buf, CSrvTime::eFmtJson);
1619             task.WriteText(eol).WriteText("last_success_time").WriteText(is ).WriteText( buf);
1620             task.WriteText("\n}");
1621         }
1622         task.WriteText("]\n}");
1623     }
1624     task.WriteText("]");
1625 }
1626 
1627 END_NCBI_SCOPE
1628