1 /*  $Id: peer_control.cpp 545049 2017-08-31 13:38:24Z gouriano $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author: Pavel Ivanov
27  *
28  */
29 
30 #include "nc_pch.hpp"
31 
32 #include <util/random_gen.hpp>
33 
34 #include "netcached.hpp"
35 #include "peer_control.hpp"
36 #include "active_handler.hpp"
37 #include "periodic_sync.hpp"
38 #include "distribution_conf.hpp"
39 #include "nc_storage_blob.hpp"
40 
41 
42 BEGIN_NCBI_SCOPE
43 
44 
45 typedef map<Uint8, CNCPeerControl*> TControlMap;
46 // "almost" not needed, as I pre-create them all on initialization
47 // BUT I can add new peers on RECONF
48 static CMiniMutex s_MapLock;
49 static TControlMap s_Controls;
50 static CAtomicCounter s_SyncOnInit;
51 static CAtomicCounter s_WaitToOpenToClients;
52 static CAtomicCounter s_AbortedSyncClients;
53 
54 static CAtomicCounter s_MirrorQueueSize;
55 static FILE* s_MirrorLogFile = NULL;
56 CAtomicCounter CNCPeerControl::sm_TotalCopyRequests;
57 CAtomicCounter CNCPeerControl::sm_CopyReqsRejected;
58 
59 static CNCPeerShutdown* s_ShutdownListener = NULL;
60 static Uint4 s_ServersToSync = 0;
61 
62 
63 
64 static CMiniMutex s_RndLock;
65 static CRandom s_Rnd(CRandom::TValue(time(NULL)));
66 
67 static void
s_SetNextTime(Uint8 & next_time,Uint8 value,bool add_random)68 s_SetNextTime(Uint8& next_time, Uint8 value, bool add_random)
69 {
70     if (add_random) {
71         s_RndLock.Lock();
72         value += s_Rnd.GetRand(0, kUSecsPerSecond);
73         s_RndLock.Unlock();
74     }
75     if (next_time < value)
76         next_time = value;
77 }
78 
79 
SNCMirrorProlong(ENCSyncEvent typ,Uint2 slot_,const CNCBlobKeyLight & key_,Uint8 rec_no,Uint8 tm,const CNCBlobAccessor * accessor)80 SNCMirrorProlong::SNCMirrorProlong(ENCSyncEvent typ,
81                                    Uint2 slot_,
82                                    const CNCBlobKeyLight& key_,
83                                    Uint8 rec_no,
84                                    Uint8 tm,
85                                    const CNCBlobAccessor* accessor)
86     : SNCMirrorEvent(typ, slot_, key_, rec_no),
87       orig_time(tm)
88 {
89     blob_sum.create_id = accessor->GetCurCreateId();
90     blob_sum.create_server = accessor->GetCurCreateServer();
91     blob_sum.create_time = accessor->GetCurBlobCreateTime();
92     blob_sum.dead_time = accessor->GetCurBlobDeadTime();
93     blob_sum.expire = accessor->GetCurBlobExpire();
94     blob_sum.ver_expire = accessor->GetCurVerExpire();
95     blob_sum.size = accessor->GetCurBlobSize();
96 }
97 
98 
99 bool
Initialize(void)100 CNCPeerControl::Initialize(void)
101 {
102     s_MirrorQueueSize.Set(0);
103     if (!CNCDistributionConf::GetMirroringSizeFile().empty()) {
104         s_MirrorLogFile = fopen(CNCDistributionConf::GetMirroringSizeFile().c_str(), "a");
105     }
106     sm_TotalCopyRequests.Set(0);
107     sm_CopyReqsRejected.Set(0);
108 
109     s_ShutdownListener = new CNCPeerShutdown();
110     CTaskServer::AddShutdownCallback(s_ShutdownListener);
111 
112     s_MapLock.Lock();
113     NON_CONST_ITERATE(TControlMap, it, s_Controls) {
114         it->second->SetRunnable();
115     }
116     s_MapLock.Unlock();
117 
118     return true;
119 }
120 
121 void
Finalize(void)122 CNCPeerControl::Finalize(void)
123 {
124     if (s_MirrorLogFile)
125         fclose(s_MirrorLogFile);
126 }
127 
128 CNCPeerControl*
Peer(Uint8 srv_id)129 CNCPeerControl::Peer(Uint8 srv_id)
130 {
131     CNCPeerControl* ctrl;
132     s_MapLock.Lock();
133     TControlMap::const_iterator it = s_Controls.find(srv_id);
134     if (it == s_Controls.end()) {
135         ctrl = new CNCPeerControl(srv_id);
136         s_Controls[srv_id] = ctrl;
137         // s_ShutdownListener is set during initialization
138         if (s_ShutdownListener)
139             ctrl->SetRunnable();
140     }
141     else {
142         ctrl = it->second;
143     }
144     s_MapLock.Unlock();
145     return ctrl;
146 }
147 
148 void
PeerHandshake(void)149 CNCPeerControl::PeerHandshake(void)
150 {
151 // the answer will come some time in the future
152 // until that, we use backward compatible protocol
153     if (AtomicCAS(m_HostProtocol, 0, 1)) {
154         CNCActiveHandler* conn = GetBGConn();
155         if (conn) {
156             conn->AskPeerVersion();
157         } else {
158             m_HostProtocol = 0;
159         }
160     }
161 }
162 
163 string
GetPeerNameOrEmpty(Uint8 srv_id)164 CNCPeerControl::GetPeerNameOrEmpty(Uint8 srv_id)
165 {
166     CNCPeerControl *ctrl = NULL;
167     s_MapLock.Lock();
168     TControlMap::const_iterator it = s_Controls.find(srv_id);
169     if (it != s_Controls.end()) {
170         ctrl = it->second;
171     }
172     s_MapLock.Unlock();
173     string res;
174     if (ctrl != NULL) {
175         res += ctrl->m_Hostname;
176         res += ':';
177         res += NStr::NumericToString(Uint2(ctrl->m_SrvId));
178     }
179     return res;
180 }
181 
182 
CNCPeerControl(Uint8 srv_id)183 CNCPeerControl::CNCPeerControl(Uint8 srv_id)
184     : m_SrvId(srv_id),
185       m_HostIP( Uint4(m_SrvId >> 32)),
186       m_FirstNWErrTime(0),
187       m_NextSyncTime(0),
188       m_ActiveConns(0),
189       m_BGConns(0),
190       m_SlotsToInitSync(0),
191       m_OrigSlotsToInitSync(0),
192       m_CntActiveSyncs(0),
193       m_CntNWErrors(0),
194       m_CntNWThrottles(0),
195       m_InThrottle(false),
196       m_MaybeThrottle(false),
197       m_HasBGTasks(false),
198       m_InitiallySynced(false)
199 {
200 #if __NC_TASKS_MONITOR
201     m_TaskName = "CNCPeerControl";
202 #endif
203 
204     m_NextTaskSync = m_SyncList.end();
205 
206      // it MUST be "host:port", see CNCDistributionConf::Initialize
207     string hostport( CNCDistributionConf::GetPeerNameOrEmpty(m_SrvId));
208     if (!hostport.empty()) {
209         list<CTempString> srv_fields;
210         ncbi_NStr_Split(hostport, ":", srv_fields);
211         if (srv_fields.size() == 2) {
212             m_Hostname = srv_fields.front();
213         }
214     }
215     m_HostIPname = CTaskServer::IPToString(m_HostIP);
216     m_HostAlias = CNCDistributionConf::CreateHostAlias(m_HostIP, Uint4(m_SrvId));
217     m_HostProtocol = 0;
218     m_TrustLevel = 0;
219 }
220 
221 void
RegisterConnError(void)222 CNCPeerControl::RegisterConnError(void)
223 {
224     CMiniMutexGuard guard(m_ObjLock);
225     if (m_FirstNWErrTime == 0)
226         m_FirstNWErrTime = CSrvTime::Current().AsUSec();
227     m_MaybeThrottle = true;
228     if (++m_CntNWErrors >= CNCDistributionConf::GetCntErrorsToThrottle()) {
229         m_InThrottle = true;
230         m_ThrottleStart = CSrvTime::Current().AsUSec();
231         ++m_CntNWThrottles;
232     }
233     m_HostProtocol = 0;
234     CWriteBackControl::ResetStatCounters();
235 }
236 
237 void
RegisterConnSuccess(void)238 CNCPeerControl::RegisterConnSuccess(void)
239 {
240     bool ask = false;
241     {
242         CMiniMutexGuard guard(m_ObjLock);
243         m_InThrottle = false;
244         m_MaybeThrottle = !m_InitiallySynced;
245         m_FirstNWErrTime = 0;
246         m_CntNWErrors = 0;
247         m_CntNWThrottles = 0;
248         m_ThrottleStart = 0;
249         ask = m_HostProtocol == 0;
250     }
251     if (ask) {
252         PeerHandshake();
253     }
254 }
255 
256 bool
CreateNewSocket(CNCActiveHandler * conn)257 CNCPeerControl::CreateNewSocket(CNCActiveHandler* conn)
258 {
259     if (CTaskServer::IsInHardShutdown())
260         return false;
261     if (m_InThrottle) {
262         m_ObjLock.Lock();
263         if (m_InThrottle) {
264             Uint8 cur_time = CSrvTime::Current().AsUSec();
265             Uint8 period = CNCDistributionConf::GetPeerThrottlePeriod();
266             if (cur_time - m_ThrottleStart <= period) {
267                 m_ObjLock.Unlock();
268                 SRV_LOG(Warning, "Connection to "
269                     << CNCDistributionConf::GetFullPeerName(m_SrvId) << " is throttled");
270                 return false;
271             }
272             if (m_CntNWThrottles >= CNCDistributionConf::GetCntThrottlesToIpchange()) {
273                 Uint4 host = CTaskServer::GetIPByHost(m_Hostname);
274                 if (host != 0 && m_HostIP != host) {
275                     m_HostIP = host;
276                     m_HostIPname = CTaskServer::IPToString(m_HostIP);
277                     m_HostAlias = CNCDistributionConf::CreateHostAlias(m_HostIP, Uint4(m_SrvId));
278                     m_HostProtocol = 0;
279                     CNCAlerts::Register(CNCAlerts::ePeerIpChanged, CNCDistributionConf::GetFullPeerName(m_SrvId));
280                     SRV_LOG(Warning, "IP address change: host "
281                         << CNCDistributionConf::GetFullPeerName(m_SrvId));
282                 }
283                 m_CntNWThrottles = 0;
284             }
285             m_InThrottle = false;
286             m_MaybeThrottle = !m_InitiallySynced;
287             if (m_InitiallySynced)
288                 m_FirstNWErrTime = 0;
289             m_CntNWErrors = 0;
290             m_ThrottleStart = 0;
291         }
292         m_ObjLock.Unlock();
293     }
294 
295     CNCActiveHandler_Proxy* proxy = new CNCActiveHandler_Proxy(conn);
296     if (!proxy->Connect(m_HostIP, Uint2(m_SrvId))) {
297         delete proxy;
298         RegisterConnError();
299         return false;
300     }
301     conn->SetProxy(proxy);
302     return true;
303 }
304 
305 CNCActiveHandler*
x_GetPooledConnImpl(void)306 CNCPeerControl::x_GetPooledConnImpl(void)
307 {
308     if (m_PooledConns.empty()  ||  CTaskServer::IsInHardShutdown())
309         return NULL;
310 
311 // it is important to have it this way, not the other
312     CNCActiveHandler* conn = &m_PooledConns.back();
313     m_PooledConns.pop_back();
314 
315     m_BusyConns.push_back(*conn);
316 
317     conn->m_LastActive = CSrvTime::CurSecs();
318     return conn;
319 }
320 
321 CNCActiveHandler*
GetPooledConn(void)322 CNCPeerControl::GetPooledConn(void)
323 {
324     CMiniMutexGuard guard(m_ObjLock);
325     CNCActiveHandler* conn = x_GetPooledConnImpl();
326     if (conn) {
327         ++m_ActiveConns;
328     }
329     return conn;
330 }
331 
332 inline void
x_UpdateHasTasks(void)333 CNCPeerControl::x_UpdateHasTasks(void)
334 {
335     m_HasBGTasks = !m_SmallMirror.empty()  ||  !m_BigMirror.empty()
336                    ||  !m_SyncList.empty();
337     if (m_HasBGTasks && CTaskServer::IsInShutdown() && m_BusyConns.empty()) {
338 
339 // something went wrong: we still have work to do, but nobody to work on it
340 // but, it is shutdown time...
341 //
342 // the problem has to do with "size == 0" addition in x_AddMirrorEvent, here:
343 //    if (size == 0 && x_ReserveBGConn()) {...}
344 
345         SRV_LOG(Error, "Incomplete jobs on shutdown:"
346             << " m_SmallMirror: " << m_SmallMirror.size()
347             << ", m_BigMirror: " << m_BigMirror.size()
348             << ", m_SyncList: " << m_SyncList.size());
349         m_HasBGTasks = false;
350     }
351 
352 #if 0
353     size_t conn = m_BusyConns.size() + 1;
354     if (conn < (size_t)m_ActiveConns) {
355         m_ActiveConns = conn;
356 #ifdef _DEBUG
357 CNCAlerts::Register(CNCAlerts::eDebugConnAdjusted1, "PutConnToPool");
358 #endif
359     }
360     if (conn < (size_t)m_BGConns) {
361         m_BGConns = conn;
362 #ifdef _DEBUG
363 CNCAlerts::Register(CNCAlerts::eDebugConnAdjusted2, "PutConnToPool");
364 #endif
365     }
366 #endif
367 
368 }
369 
370 bool
x_ReserveBGConn(void)371 CNCPeerControl::x_ReserveBGConn(void)
372 {
373     if (m_ActiveConns >= CNCDistributionConf::GetMaxPeerTotalConns()
374         ||  m_BGConns >= CNCDistributionConf::GetMaxPeerBGConns())
375     {
376         return false;
377     }
378     ++m_ActiveConns;
379     ++m_BGConns;
380     return true;
381 }
382 
383 bool
x_ReserveBGConnNow(void)384 CNCPeerControl::x_ReserveBGConnNow(void)
385 {
386     ++m_ActiveConns;
387     ++m_BGConns;
388     return true;
389 }
390 
391 inline void
x_IncBGConns(void)392 CNCPeerControl::x_IncBGConns(void)
393 {
394     ++m_BGConns;
395 }
396 
397 inline void
x_DecBGConns(void)398 CNCPeerControl::x_DecBGConns(void)
399 {
400     --m_BGConns;
401 }
402 
403 inline void
x_DecBGConns(CNCActiveHandler * conn)404 CNCPeerControl::x_DecBGConns(CNCActiveHandler* conn)
405 {
406     if (!conn  ||  conn->IsReservedForBG()) {
407         x_DecBGConns();
408         if (conn)
409             conn->SetReservedForBG(false);
410     }
411 }
412 
413 inline void
x_DecActiveConns(void)414 CNCPeerControl::x_DecActiveConns(void)
415 {
416     --m_ActiveConns;
417 }
418 
419 inline void
x_UnreserveBGConn(void)420 CNCPeerControl::x_UnreserveBGConn(void)
421 {
422     m_ObjLock.Lock();
423     x_DecBGConns();
424     if(x_DoReleaseConn(NULL)) {
425         m_ObjLock.Unlock();
426     }
427 }
428 
429 CNCActiveHandler*
x_CreateNewConn(bool for_bg)430 CNCPeerControl::x_CreateNewConn(bool for_bg)
431 {
432     CNCActiveHandler* conn = new CNCActiveHandler(m_SrvId, this);
433     conn->SetReservedForBG(for_bg);
434     if (!CreateNewSocket(conn)) {
435         delete conn;
436         conn = NULL;
437     }
438 
439     if (conn) {
440         m_ObjLock.Lock();
441         m_BusyConns.push_back(*conn);
442         m_ObjLock.Unlock();
443         conn->m_LastActive = CSrvTime::CurSecs();
444     }
445 
446     return conn;
447 }
448 
449 bool
x_AssignClientConn(CNCActiveClientHub * hub,CNCActiveHandler * conn)450 CNCPeerControl::x_AssignClientConn(CNCActiveClientHub* hub,
451                                    CNCActiveHandler* conn)
452 {
453     if (!conn)
454         conn = x_GetPooledConnImpl();
455     m_ObjLock.Unlock();
456 
457     if (!conn) {
458         conn = x_CreateNewConn(false);
459         if (!conn) {
460             hub->SetErrMsg(m_InThrottle? "ERR:Connection is throttled"
461                                        : "ERR:Cannot connect to peer");
462             hub->SetStatus(eNCHubError);
463             return false;
464         }
465     }
466     hub->SetHandler(conn);
467     conn->SetClientHub(hub);
468     hub->SetStatus(eNCHubConnReady);
469     return true;
470 }
471 
472 void
AssignClientConn(CNCActiveClientHub * hub)473 CNCPeerControl::AssignClientConn(CNCActiveClientHub* hub)
474 {
475     m_ObjLock.Lock();
476     if (m_ActiveConns >= CNCDistributionConf::GetMaxPeerTotalConns()) {
477         hub->SetStatus(eNCHubWaitForConn);
478         m_Clients.push_back(hub);
479         m_ObjLock.Unlock();
480         return;
481     }
482     ++m_ActiveConns;
483     if (!x_AssignClientConn(hub, NULL)) {
484         m_ObjLock.Lock();
485         if (x_DoReleaseConn(NULL))
486             m_ObjLock.Unlock();
487     }
488 }
489 
490 CNCActiveHandler*
x_GetBGConnImpl(void)491 CNCPeerControl::x_GetBGConnImpl(void)
492 {
493     CNCActiveHandler* conn = x_GetPooledConnImpl();
494     m_ObjLock.Unlock();
495     if (conn) {
496         conn->SetReservedForBG(true);
497     } else {
498         conn = x_CreateNewConn(true);
499     }
500     return conn;
501 }
502 
503 CNCActiveHandler*
GetBGConn(bool silent)504 CNCPeerControl::GetBGConn(bool silent)
505 {
506     m_ObjLock.Lock();
507     if (!x_ReserveBGConn()) {
508         m_ObjLock.Unlock();
509         if(!silent) {
510             SRV_LOG(Warning, "Too many active (" << m_ActiveConns
511                              << ") or background (" << m_BGConns
512                              << ") connections");
513         }
514         return NULL;
515     }
516     CNCActiveHandler* conn = x_GetBGConnImpl(); // m_ObjLock.Unlock
517     if (!conn)
518         x_UnreserveBGConn();
519     return conn;
520 }
521 
522 bool
x_DoReleaseConn(CNCActiveHandler * conn)523 CNCPeerControl::x_DoReleaseConn(CNCActiveHandler* conn)
524 // m_ObjLock is locked on entrance
525 // method returns m_ObjLock.IsLocked state
526 // it is not unlocked here, because sometimes there is something else to do
527 {
528 retry:
529     bool is_locked = true;
530     if (!m_Clients.empty()) {
531         CNCActiveClientHub* hub = m_Clients.front();
532         m_Clients.pop_front();
533         if (!x_AssignClientConn(hub, conn)) {  // m_ObjLock.Unlock
534             // unlocked now; we need to lock to retry
535             m_ObjLock.Lock();
536             goto retry;
537         }
538         is_locked = false;
539     }
540     else if (m_HasBGTasks && conn) {
541         // m_ObjLock is locked
542         if (!m_SmallMirror.empty() || !m_BigMirror.empty()) {
543             SNCMirrorEvent* event;
544             if (!m_SmallMirror.empty()) {
545                 event = m_SmallMirror.front();
546                 m_SmallMirror.pop_front();
547             }
548             else {
549                 event = m_BigMirror.front();
550                 m_BigMirror.pop_front();
551             }
552             s_MirrorQueueSize.Add(-1);
553             x_UpdateHasTasks();
554             conn->SetReservedForBG(true);
555             x_IncBGConns();
556             m_ObjLock.Unlock();
557             is_locked = false;
558             x_ProcessMirrorEvent(conn, event);
559         }
560         else if (!m_SyncList.empty()) {
561             bool is_valid = false;
562             CNCActiveSyncControl* sync_ctrl = nullptr;
563             SSyncTaskInfo task_info;
564             while (!is_valid && !m_SyncList.empty()) {
565                 sync_ctrl = *m_NextTaskSync;
566                 if (!sync_ctrl->GetNextTask(task_info, &is_valid)) {
567                     TNCActiveSyncListIt cur_it = m_NextTaskSync;
568                     ++m_NextTaskSync;
569                     m_SyncList.erase(cur_it);
570                 } else  {
571                     ++m_NextTaskSync;
572                 }
573                 if (m_NextTaskSync == m_SyncList.end()) {
574                     m_NextTaskSync = m_SyncList.begin();
575                 }
576             }
577             x_UpdateHasTasks();
578             if (is_valid) {
579                 conn->SetReservedForBG(true);
580                 x_IncBGConns();
581                 m_ObjLock.Unlock();
582                 is_locked = false;
583                 sync_ctrl->ExecuteSyncTask(task_info, conn);
584             } else {
585                 m_ObjLock.Unlock();
586                 is_locked = false;
587             }
588         }
589         else {
590             m_HasBGTasks = false;
591         }
592     }
593     else {
594         x_DecActiveConns();
595     }
596     return is_locked;
597 }
598 
599 void
PutConnToPool(CNCActiveHandler * conn)600 CNCPeerControl::PutConnToPool(CNCActiveHandler* conn)
601 {
602     m_ObjLock.Lock();
603     x_DecBGConns(conn);
604     if (x_DoReleaseConn(conn)) {
605         m_BusyConns.erase(m_BusyConns.iterator_to(*conn));
606         m_PooledConns.push_back(*conn);
607         m_ObjLock.Unlock();
608     }
609 }
610 
611 void
ReleaseConn(CNCActiveHandler * conn)612 CNCPeerControl::ReleaseConn(CNCActiveHandler* conn)
613 {
614     m_ObjLock.Lock();
615     x_DecBGConns(conn);
616     m_BusyConns.erase(m_BusyConns.iterator_to(*conn));
617     if (x_DoReleaseConn(NULL))
618         m_ObjLock.Unlock();
619 }
620 
621 void
x_DeleteMirrorEvent(SNCMirrorEvent * event)622 CNCPeerControl::x_DeleteMirrorEvent(SNCMirrorEvent* event)
623 {
624     if (event->evt_type == eSyncWrite || event->evt_type == eSyncUpdate || event->evt_type == eSyncRemove)
625         delete event;
626     else if (event->evt_type == eSyncProlong)
627         delete (SNCMirrorProlong*)event;
628     else {
629         SRV_FATAL("Unexpected mirror event type: " << event->evt_type);
630     }
631 }
632 
633 void
x_ProcessUpdateEvent(SNCMirrorEvent * event)634 CNCPeerControl::x_ProcessUpdateEvent(SNCMirrorEvent* event)
635 {
636     m_ObjLock.Lock();
637 //    if (x_ReserveBGConnNow()) {
638     if (x_ReserveBGConn()) {
639         CNCActiveHandler* conn = x_GetBGConnImpl(); // m_ObjLock.Unlock
640         if (conn) {
641             x_ProcessMirrorEvent(conn, event);
642         } else {
643             x_DeleteMirrorEvent(event);
644             x_UnreserveBGConn();
645         }
646     } else {
647         m_ObjLock.Unlock();
648         x_DeleteMirrorEvent(event);
649     }
650 }
651 
652 void
x_ProcessMirrorEvent(CNCActiveHandler * conn,SNCMirrorEvent * event)653 CNCPeerControl::x_ProcessMirrorEvent(CNCActiveHandler* conn, SNCMirrorEvent* event)
654 {
655     if (event->evt_type == eSyncWrite) {
656         conn->CopyPut(NULL, event->key, event->slot, event->orig_rec_no);
657     }
658     else if (event->evt_type == eSyncProlong) {
659         SNCMirrorProlong* prolong = (SNCMirrorProlong*)event;
660         conn->CopyProlong(prolong->key, prolong->slot, prolong->orig_rec_no,
661                           prolong->orig_time, prolong->blob_sum);
662     }
663     else if (event->evt_type == eSyncUpdate) {
664         conn->CopyUpdate(event->key, event->orig_rec_no);
665     }
666     else if (event->evt_type == eSyncRemove) {
667         conn->CopyRemove(event->key, event->orig_rec_no);
668     }
669     else {
670         SRV_FATAL("Unexpected mirror event type: " << event->evt_type);
671     }
672     x_DeleteMirrorEvent(event);
673 }
674 
675 void
x_AddMirrorEvent(SNCMirrorEvent * event,Uint8 size)676 CNCPeerControl::x_AddMirrorEvent(SNCMirrorEvent* event, Uint8 size)
677 {
678     sm_TotalCopyRequests.Add(1);
679 
680     m_ObjLock.Lock();
681 // all blobs (size!=0) go into queue
682 // this reduces response time
683     if ((size == 0 || m_BusyConns.empty()) && x_ReserveBGConn()) {
684         CNCActiveHandler* conn = x_GetBGConnImpl(); // m_ObjLock.Unlock
685         if (conn)
686             x_ProcessMirrorEvent(conn, event);
687         else {
688             x_DeleteMirrorEvent(event);
689             x_UnreserveBGConn();
690         }
691     }
692     else {
693         TNCMirrorQueue* q;
694         if (size <= CNCDistributionConf::GetSmallBlobBoundary())
695             q = &m_SmallMirror;
696         else
697             q = &m_BigMirror;
698         if (q->size() < CNCDistributionConf::GetMaxMirrorQueueSize()) {
699             q->push_back(event);
700             m_HasBGTasks = true;
701             m_ObjLock.Unlock();
702 
703             int queue_size = s_MirrorQueueSize.Add(1);
704             if (s_MirrorLogFile) {
705                 Uint8 cur_time = CSrvTime::Current().AsUSec();
706                 fprintf(s_MirrorLogFile, "%" NCBI_UINT8_FORMAT_SPEC ",%d\n",
707                                          cur_time, queue_size);
708             }
709         }
710         else {
711             m_ObjLock.Unlock();
712             sm_CopyReqsRejected.Add(1);
713             x_DeleteMirrorEvent(event);
714         }
715     }
716 }
717 
718 void
MirrorUpdate(const CNCBlobKeyLight & key,Uint2 slot,Uint8 update_time)719 CNCPeerControl::MirrorUpdate(const CNCBlobKeyLight& key,
720                               Uint2 slot,
721                               Uint8 update_time)
722 {
723     const TServersList& servers = CNCDistributionConf::GetRawServersForSlot(slot);
724     ITERATE(TServersList, it_srv, servers) {
725         Uint8 srv_id = *it_srv;
726         CNCPeerControl* peer = Peer(srv_id);
727         if (CNCDistributionConf::GetSelfTrustLevel() >= peer->GetTrustLevel()
728             && peer->AcceptsSyncUpdate()) {
729             SNCMirrorEvent* event = new SNCMirrorEvent(eSyncUpdate, slot, key, update_time);
730             if (event) {
731                 peer->x_ProcessUpdateEvent(event);
732             }
733         }
734     }
735 }
736 
737 void
MirrorRemove(const CNCBlobKeyLight & key,Uint2 slot,Uint8 update_time)738 CNCPeerControl::MirrorRemove(const CNCBlobKeyLight& key,
739                                   Uint2 slot,
740                                   Uint8 update_time)
741 {
742     const TServersList& servers = CNCDistributionConf::GetRawServersForSlot(slot);
743     ITERATE(TServersList, it_srv, servers) {
744         Uint8 srv_id = *it_srv;
745         CNCPeerControl* peer = Peer(srv_id);
746         if (CNCDistributionConf::GetSelfTrustLevel() >= peer->GetTrustLevel()
747             && peer->AcceptsSyncRemove()) {
748             SNCMirrorEvent* event = new SNCMirrorEvent(eSyncRemove, slot, key, update_time);
749             if (event) {
750                 peer->x_AddMirrorEvent(event, 0);
751             }
752         }
753     }
754 }
755 
756 void
MirrorWrite(const CNCBlobKeyLight & key,Uint2 slot,Uint8 orig_rec_no,Uint8 size,const TServersList & mirrors_done)757 CNCPeerControl::MirrorWrite(const CNCBlobKeyLight& key,
758                           Uint2 slot,
759                           Uint8 orig_rec_no,
760                           Uint8 size, const TServersList& mirrors_done)
761 {
762     set<Uint8> done(mirrors_done.begin(), mirrors_done.end());
763     const TServersList& servers = CNCDistributionConf::GetRawServersForSlot(slot);
764     ITERATE(TServersList, it_srv, servers) {
765         Uint8 srv_id = *it_srv;
766         if (done.find(srv_id) == done.end()) {
767             CNCPeerControl* peer = Peer(srv_id);
768             if (CNCDistributionConf::GetSelfTrustLevel() >= peer->GetTrustLevel()
769                 && peer->AcceptsBlobKey(key)) {
770                 SNCMirrorEvent* event = new SNCMirrorEvent(eSyncWrite, slot, key, orig_rec_no);
771                 peer->x_AddMirrorEvent(event, size);
772             }
773         }
774     }
775 }
776 
777 void
MirrorProlong(const CNCBlobKeyLight & key,Uint2 slot,Uint8 orig_rec_no,Uint8 orig_time,const CNCBlobAccessor * accessor)778 CNCPeerControl::MirrorProlong(const CNCBlobKeyLight& key,
779                             Uint2 slot,
780                             Uint8 orig_rec_no,
781                             Uint8 orig_time,
782                             const CNCBlobAccessor* accessor)
783 {
784     const TServersList& servers = CNCDistributionConf::GetRawServersForSlot(slot);
785     ITERATE(TServersList, it_srv, servers) {
786         Uint8 srv_id = *it_srv;
787         CNCPeerControl* peer = Peer(srv_id);
788         if (CNCDistributionConf::GetSelfTrustLevel() >= peer->GetTrustLevel()
789             && peer->AcceptsBlobKey(key)) {
790             SNCMirrorProlong* event = new SNCMirrorProlong(eSyncProlong, slot, key.PackedKey(),
791                                                    orig_rec_no, orig_time, accessor);
792             peer->x_AddMirrorEvent(event, 0);
793         }
794     }
795 }
796 
797 Uint8
GetMirrorQueueSize(void)798 CNCPeerControl::GetMirrorQueueSize(void)
799 {
800     return s_MirrorQueueSize.Get();
801 }
802 
803 Uint8
GetMirrorQueueSize(Uint8 srv_id)804 CNCPeerControl::GetMirrorQueueSize(Uint8 srv_id)
805 {
806     CNCPeerControl* peer = Peer(srv_id);
807     CMiniMutexGuard guard(peer->m_ObjLock);
808     return peer->m_SmallMirror.size() + peer->m_BigMirror.size();
809 }
810 
811 void
ReadCurState(SNCStateStat & state)812 CNCPeerControl::ReadCurState(SNCStateStat& state)
813 {
814     int active = 0,  bg = 0;
815     s_MapLock.Lock();
816     ITERATE(TControlMap, it_ctrl, s_Controls) {
817         CNCPeerControl* peer = it_ctrl->second;
818         active += peer->m_ActiveConns;
819         bg += peer->m_BGConns;
820     }
821     s_MapLock.Unlock();
822     state.mirror_queue_size = CNCPeerControl::GetMirrorQueueSize();
823     state.peer_active_conns = active;
824     state.peer_bg_conns = bg;
825 }
826 
827 Uint4
FindIPbyAlias(Uint4 alias)828 CNCPeerControl::FindIPbyAlias(Uint4 alias)
829 {
830     Uint4 res = 0;
831     s_MapLock.Lock();
832     ITERATE(TControlMap, it_ctrl, s_Controls) {
833         CNCPeerControl* peer = it_ctrl->second;
834         if (!peer->m_MaybeThrottle && peer->m_HostAlias == alias) {
835             res = peer->m_HostIP;
836             break;
837         }
838     }
839     s_MapLock.Unlock();
840     return res;
841 }
842 
843 Uint4
FindIPbyName(const string & alias)844 CNCPeerControl::FindIPbyName(const string& alias)
845 {
846     Uint4 res = 0;
847     s_MapLock.Lock();
848     ITERATE(TControlMap, it_ctrl, s_Controls) {
849         CNCPeerControl* peer = it_ctrl->second;
850         if (!peer->m_MaybeThrottle && peer->m_HostIPname == alias) {
851             res = peer->m_HostIP;
852             break;
853         }
854     }
855     s_MapLock.Unlock();
856     return res;
857 }
858 
859 bool
HasPeerInThrottle(void)860 CNCPeerControl::HasPeerInThrottle(void)
861 {
862     bool res = false;
863     s_MapLock.Lock();
864     ITERATE(TControlMap, it_ctrl, s_Controls) {
865         if (CNCDistributionConf::HasCommonSlots(it_ctrl->first) &&
866             it_ctrl->second->m_MaybeThrottle) {
867             res = true;
868             break;
869         }
870     }
871     s_MapLock.Unlock();
872     return res;
873 }
874 
PrintState(CSrvSocketTask & task)875 void CNCPeerControl::PrintState(CSrvSocketTask& task)
876 {
877     s_MapLock.Lock();
878     TControlMap ctrl = s_Controls;
879     s_MapLock.Unlock();
880 
881     string is("\": "), iss("\": \""), eol(",\n\""),  qt("\"");
882 
883     task.WriteText(eol).WriteText("peers").WriteText(is).WriteText("\n[");
884     for(TControlMap::const_iterator it = ctrl.begin(); it != ctrl.end(); ++it) {
885         if (it != ctrl.begin()) {
886             task.WriteText(",");
887         }
888         task.WriteText("{\n");
889         CNCPeerControl* peer = it->second;
890         task.WriteText(qt).WriteText("hostname").WriteText(iss).WriteText(
891                     CNCDistributionConf::GetPeerName(peer->GetSrvId())).WriteText(qt);
892         task.WriteText(eol).WriteText("hostIPname").WriteText(iss).WriteText(peer->m_HostIPname).WriteText(qt);
893         task.WriteText(eol).WriteText("hostProtocol").WriteText(is).WriteNumber(peer->m_HostProtocol);
894         task.WriteText(eol).WriteText("healthy").WriteText(is).WriteText(
895                     (peer->m_InThrottle || peer->m_MaybeThrottle) ? "false" : "true");
896         task.WriteText(eol).WriteText("initiallySynced").WriteText(is).WriteText(
897                     peer->m_InitiallySynced ? "true" : "false");
898         task.WriteText(eol).WriteText("origSlotsToInitSync").WriteText(is).WriteNumber(peer->m_OrigSlotsToInitSync);
899         task.WriteText(eol).WriteText("slotsToInitSync").WriteText(is).WriteNumber(peer->m_SlotsToInitSync);
900         task.WriteText(eol).WriteText("cntActiveSyncs").WriteText(is).WriteNumber(peer->m_CntActiveSyncs);
901         task.WriteText(eol).WriteText("cntNWErrors").WriteText(is).WriteNumber(peer->m_CntNWErrors);
902         task.WriteText(eol).WriteText("hasBGTasks").WriteText(is).WriteText(
903                     peer->m_HasBGTasks ? "true" : "false");
904         task.WriteText(eol).WriteText("activeConns").WriteText(is).WriteNumber(peer->m_ActiveConns);
905         task.WriteText(eol).WriteText("bGConns").WriteText(is).WriteNumber(peer->m_BGConns);
906         task.WriteText(eol).WriteText("cntBusyConns").WriteText(is).WriteNumber(peer->m_BusyConns.size());
907         task.WriteText(eol).WriteText("cntPooledConns").WriteText(is).WriteNumber(peer->m_PooledConns.size());
908         task.WriteText("\n}");
909     }
910     task.WriteText("]");
911 }
912 
913 void
SetServersForInitSync(Uint4 cnt_servers)914 CNCPeerControl::SetServersForInitSync(Uint4 cnt_servers)
915 {
916     s_ServersToSync = cnt_servers;
917     s_SyncOnInit.Set(cnt_servers);
918     s_WaitToOpenToClients.Set(cnt_servers);
919     s_AbortedSyncClients.Set(cnt_servers);
920 }
921 
922 void
ResetServersForInitSync(void)923 CNCPeerControl::ResetServersForInitSync(void)
924 {
925     SetServersForInitSync(s_ServersToSync);
926 }
927 
928 void
ReconfServersForInitSync(Uint4 cnt_servers)929 CNCPeerControl::ReconfServersForInitSync(Uint4 cnt_servers)
930 {
931     s_ServersToSync = cnt_servers;
932 }
933 
934 bool
HasServersForInitSync(void)935 CNCPeerControl::HasServersForInitSync(void)
936 {
937     return s_SyncOnInit.Get() != 0;
938 }
939 
940 bool
StartActiveSync(void)941 CNCPeerControl::StartActiveSync(void)
942 {
943     CMiniMutexGuard guard(m_ObjLock);
944     if (m_CntActiveSyncs >= CNCDistributionConf::GetMaxSyncsOneServer()) {
945         return false;
946     }
947     ++m_CntActiveSyncs;
948     return true;
949 }
950 
951 void
x_SrvInitiallySynced(bool succeeded)952 CNCPeerControl::x_SrvInitiallySynced(bool succeeded)
953 {
954     if (!m_InitiallySynced) {
955         INFO("Initial sync: for "
956             << CNCDistributionConf::GetFullPeerName(m_SrvId) << " completed");
957         m_InitiallySynced = true;
958         s_SyncOnInit.Add(-1);
959         CNCStat::InitialSyncDone(m_SrvId, succeeded);
960     }
961 }
962 
963 void
x_SlotsInitiallySynced(Uint2 cnt_slots,bool aborted)964 CNCPeerControl::x_SlotsInitiallySynced(Uint2 cnt_slots, bool aborted)
965 {
966     if (cnt_slots != 0  &&  m_SlotsToInitSync != 0) {
967         bool succeeded = true;
968         if (cnt_slots != 1) {
969             CNCAlerts::Register(CNCAlerts::eSyncFailed, CNCDistributionConf::GetFullPeerName(m_SrvId));
970             INFO("Initial sync: Server "
971                 << CNCDistributionConf::GetFullPeerName(m_SrvId) << " is out of reach (timeout)");
972             succeeded = false;
973         }
974         m_SlotsToInitSync -= cnt_slots;
975         if (m_SlotsToInitSync == 0) {
976             x_SrvInitiallySynced(succeeded);
977             if (aborted && s_AbortedSyncClients.Add(-1) == 0) {
978 #if 1
979                 SRV_LOG(Error, "Initial sync: unable to synchronize with any server");
980 #else
981                 SRV_LOG(Critical, "Initial sync: unable to synchronize with any server");
982                 CTaskServer::RequestShutdown(eSrvSlowShutdown);
983 #endif
984             }
985             if (s_WaitToOpenToClients.Add(-1) == 0)
986                 CNCServer::InitialSyncComplete();
987         }
988     }
989 }
990 
991 void
AddInitiallySyncedSlot(void)992 CNCPeerControl::AddInitiallySyncedSlot(void)
993 {
994     CMiniMutexGuard guard(m_ObjLock);
995     x_SlotsInitiallySynced(1);
996 }
997 
998 void
RegisterSyncStop(bool is_passive,Uint8 & next_sync_time,Uint8 next_sync_delay)999 CNCPeerControl::RegisterSyncStop(bool is_passive,
1000                                  Uint8& next_sync_time,
1001                                  Uint8 next_sync_delay)
1002 {
1003     CMiniMutexGuard guard(m_ObjLock);
1004     Uint8 now = CSrvTime::Current().AsUSec();
1005     Uint8 next_time = now + next_sync_delay;
1006     s_SetNextTime(next_sync_time, next_time, true);
1007     if (m_FirstNWErrTime == 0) {
1008         s_SetNextTime(m_NextSyncTime, now, false);
1009     }
1010     else {
1011         s_SetNextTime(m_NextSyncTime, next_time, true);
1012         if (now - m_FirstNWErrTime >= CNCDistributionConf::GetNetworkErrorTimeout())
1013             x_SlotsInitiallySynced(m_SlotsToInitSync, m_FirstNWErrTime == 1);
1014     }
1015 
1016     if (!is_passive)
1017         --m_CntActiveSyncs;
1018 }
1019 
1020 #ifdef _DEBUG
RegisterSyncStat(bool is_passive,bool is_by_blobs,int result,int hint)1021 void CNCPeerControl::RegisterSyncStat(bool is_passive, bool is_by_blobs, int result,  int hint)
1022 {
1023     size_t key = (is_passive ? 2 : 0) | (is_by_blobs ? 1 : 0);
1024     key <<= 8;
1025     key = key | (result & 0xFF);
1026     key <<= 16;
1027     key = key | (hint & 0xFFFF);
1028     CMiniMutexGuard guard(m_ObjLock);
1029     ++m_SyncStat[key];
1030 }
1031 
PrintSyncStat(CSrvSocketTask & task)1032 void CNCPeerControl::PrintSyncStat(CSrvSocketTask& task)
1033 {
1034     s_MapLock.Lock();
1035     TControlMap ctrl = s_Controls;
1036     s_MapLock.Unlock();
1037 
1038     string is("\": "), iss("\": \""), eol(",\n\""),  qt("\"");
1039 
1040     task.WriteText(eol).WriteText("peers").WriteText(is).WriteText("\n[");
1041     for(TControlMap::const_iterator it = ctrl.begin(); it != ctrl.end(); ++it) {
1042         if (it != ctrl.begin()) {
1043             task.WriteText(",");
1044         }
1045         task.WriteText("{\n");
1046         CNCPeerControl* peer = it->second;
1047         map<size_t, size_t> syncStat;
1048         {
1049             CMiniMutexGuard guard(peer->m_ObjLock);
1050             syncStat = peer->m_SyncStat;
1051         }
1052         task.WriteText(qt).WriteText("hostname").WriteText(iss).WriteText(
1053                     CNCDistributionConf::GetPeerName(peer->GetSrvId())).WriteText(qt);
1054         task.WriteText(eol).WriteText("stat").WriteText(is);
1055 
1056         bool first = true;
1057         for (map<size_t, size_t>::const_iterator i = syncStat.begin(); i != syncStat.end(); ++i) {
1058             if (first) {
1059                 first = false;
1060             } else {
1061                 task.WriteText(",");
1062             }
1063             task.WriteText("[\n");
1064             size_t key = i->first;
1065             size_t hint = key & 0xFFFF;
1066             key >>= 16;
1067             size_t result = key & 0xFF;
1068             key >>= 8;
1069             bool by_blobs = (key & 1) != 0;
1070             bool passive =  (key & 2) != 0;
1071             task.WriteText(qt).WriteText("passive").WriteText(is).WriteBool(passive);
1072             task.WriteText(eol).WriteText("by_blobs").WriteText(is).WriteBool(by_blobs);
1073             task.WriteText(eol).WriteText("result").WriteText(is).WriteNumber(result);
1074             task.WriteText(eol).WriteText("hint").WriteText(is).WriteNumber(hint);
1075             task.WriteText(eol).WriteText("count").WriteText(is).WriteNumber(i->second);
1076             task.WriteText("\n]");
1077         }
1078         task.WriteText("\n}");
1079     }
1080     task.WriteText("]");
1081 }
1082 #endif
1083 
AbortInitialSync(void)1084 void CNCPeerControl::AbortInitialSync(void)
1085 {
1086     m_FirstNWErrTime = 1;
1087 }
1088 
SetHostProtocol(Uint8 ver)1089 void CNCPeerControl::SetHostProtocol(Uint8 ver)
1090 {
1091     m_HostProtocol = ver;
1092     if (ver != 0) {
1093         m_MaybeThrottle = !m_InitiallySynced;
1094     }
1095 }
1096 
1097 bool
AddSyncControl(CNCActiveSyncControl * sync_ctrl)1098 CNCPeerControl::AddSyncControl(CNCActiveSyncControl* sync_ctrl)
1099 {
1100     bool has_more = true;
1101     bool task_added = false;
1102     SSyncTaskInfo task_info;
1103 
1104     m_ObjLock.Lock();
1105     while (has_more  &&  x_ReserveBGConn()) {
1106         CNCActiveHandler* conn = x_GetBGConnImpl(); // m_ObjLock.Unlock
1107         if (!conn) {
1108             x_UnreserveBGConn();
1109             if (!task_added)
1110                 return false;
1111             m_ObjLock.Lock();
1112             break;
1113         }
1114         has_more = sync_ctrl->GetNextTask(task_info);
1115         sync_ctrl->ExecuteSyncTask(task_info, conn);
1116         task_added = true;
1117         m_ObjLock.Lock();
1118     }
1119     if (has_more) {
1120         m_SyncList.push_back(sync_ctrl);
1121         if (m_NextTaskSync == m_SyncList.end())
1122             m_NextTaskSync = m_SyncList.begin();
1123         m_HasBGTasks = true;
1124     }
1125     m_ObjLock.Unlock();
1126 
1127     return true;
1128 }
1129 
1130 void
RemoveSyncControl(CNCActiveSyncControl * sync_ctrl)1131 CNCPeerControl::RemoveSyncControl(CNCActiveSyncControl* sync_ctrl)
1132 {
1133     m_ObjLock.Lock();
1134     ERASE_ITERATE(TNCActiveSyncList, it_sync, m_SyncList) {
1135         CNCActiveSyncControl* ctrl = *it_sync;
1136         if (sync_ctrl == ctrl) {
1137             m_SyncList.erase(it_sync);
1138         }
1139     }
1140     m_ObjLock.Unlock();
1141 }
1142 
1143 bool
FinishSync(CNCActiveSyncControl * sync_ctrl)1144 CNCPeerControl::FinishSync(CNCActiveSyncControl* sync_ctrl)
1145 {
1146     m_ObjLock.Lock();
1147     if (x_ReserveBGConn()) {
1148         CNCActiveHandler* conn = x_GetBGConnImpl(); // m_ObjLock.Unlock
1149         if (!conn) {
1150             x_UnreserveBGConn();
1151             return false;
1152         }
1153 
1154         SSyncTaskInfo task_info;
1155         sync_ctrl->GetNextTask(task_info);
1156         sync_ctrl->ExecuteSyncTask(task_info, conn);
1157     }
1158     else
1159     {
1160         m_SyncList.push_back(sync_ctrl);
1161         if (m_NextTaskSync == m_SyncList.end())
1162             m_NextTaskSync = m_SyncList.begin();
1163         m_HasBGTasks = true;
1164         m_ObjLock.Unlock();
1165     }
1166     return true;
1167 }
1168 
1169 void
ExecuteSlice(TSrvThreadNum)1170 CNCPeerControl::ExecuteSlice(TSrvThreadNum /* thr_num */)
1171 {
1172     if (CTaskServer::IsInShutdown())
1173         return;
1174 
1175 // check for timeouts
1176     m_ObjLock.Lock();
1177 
1178     NON_CONST_ITERATE(TNCPeerConnsList, it, m_BusyConns) {
1179         it->CheckCommandTimeout();
1180     }
1181 
1182     m_ObjLock.Unlock();
1183 
1184     RunAfter(1);
1185 }
1186 
1187 bool
GetReadyForShutdown(void)1188 CNCPeerControl::GetReadyForShutdown(void)
1189 {
1190     bool result = true;
1191 
1192     m_ObjLock.Lock();
1193     if (CTaskServer::IsInHardShutdown()) {
1194         while (!m_Clients.empty()) {
1195             CNCActiveClientHub* hub = m_Clients.front();
1196             m_Clients.pop_front();
1197             hub->SetErrMsg(GetMessageByStatus(eStatus_ShuttingDown));
1198             hub->SetStatus(eNCHubError);
1199             result = false;
1200         }
1201     }
1202     NON_CONST_ITERATE(TNCPeerConnsList, it, m_BusyConns) {
1203         it->CheckCommandTimeout();
1204         result = false;
1205     }
1206     ERASE_ITERATE(TNCActiveSyncList, it_sync, m_SyncList) {
1207         CNCActiveSyncControl* sync_ctrl = *it_sync;
1208         m_SyncList.erase(it_sync);
1209 
1210         SSyncTaskInfo task_info;
1211         bool has_more = sync_ctrl->GetNextTask(task_info);
1212         sync_ctrl->CmdFinished(eSynNetworkError, eSynActionNone, NULL, NC_SYNC_HINT);
1213         if (has_more)
1214             sync_ctrl->GetNextTask(task_info);
1215         result = false;
1216     }
1217     m_SyncList.clear();
1218     m_NextTaskSync = m_SyncList.end();
1219     x_UpdateHasTasks();
1220     if (m_HasBGTasks)
1221         result = false;
1222 
1223     if (result) {
1224         if (CNCStat::GetCntRunningCmds() != 0) {
1225             result = false;
1226         }
1227         else {
1228             while (!m_PooledConns.empty()) {
1229                 CNCActiveHandler* conn = &m_PooledConns.front();
1230                 m_PooledConns.pop_front();
1231                 conn->CloseForShutdown();
1232                 result = false;
1233             }
1234         }
1235     }
1236     m_ObjLock.Unlock();
1237 
1238     return result;
1239 }
1240 
1241 
CNCPeerShutdown(void)1242 CNCPeerShutdown::CNCPeerShutdown(void)
1243 {}
1244 
~CNCPeerShutdown(void)1245 CNCPeerShutdown::~CNCPeerShutdown(void)
1246 {}
1247 
1248 bool
ReadyForShutdown(void)1249 CNCPeerShutdown::ReadyForShutdown(void)
1250 {
1251     bool result = true;
1252     s_MapLock.Lock();
1253     ITERATE(TControlMap, it_ctrl, s_Controls) {
1254         CNCPeerControl* peer = it_ctrl->second;
1255         result &= peer->GetReadyForShutdown();
1256     }
1257     s_MapLock.Unlock();
1258     return result;
1259 }
1260 
1261 END_NCBI_SCOPE
1262