1 /* $Id: peer_control.cpp 545049 2017-08-31 13:38:24Z gouriano $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Pavel Ivanov
27 *
28 */
29
30 #include "nc_pch.hpp"
31
32 #include <util/random_gen.hpp>
33
34 #include "netcached.hpp"
35 #include "peer_control.hpp"
36 #include "active_handler.hpp"
37 #include "periodic_sync.hpp"
38 #include "distribution_conf.hpp"
39 #include "nc_storage_blob.hpp"
40
41
42 BEGIN_NCBI_SCOPE
43
44
45 typedef map<Uint8, CNCPeerControl*> TControlMap;
46 // "almost" not needed, as I pre-create them all on initialization
47 // BUT I can add new peers on RECONF
48 static CMiniMutex s_MapLock;
49 static TControlMap s_Controls;
50 static CAtomicCounter s_SyncOnInit;
51 static CAtomicCounter s_WaitToOpenToClients;
52 static CAtomicCounter s_AbortedSyncClients;
53
54 static CAtomicCounter s_MirrorQueueSize;
55 static FILE* s_MirrorLogFile = NULL;
56 CAtomicCounter CNCPeerControl::sm_TotalCopyRequests;
57 CAtomicCounter CNCPeerControl::sm_CopyReqsRejected;
58
59 static CNCPeerShutdown* s_ShutdownListener = NULL;
60 static Uint4 s_ServersToSync = 0;
61
62
63
64 static CMiniMutex s_RndLock;
65 static CRandom s_Rnd(CRandom::TValue(time(NULL)));
66
67 static void
s_SetNextTime(Uint8 & next_time,Uint8 value,bool add_random)68 s_SetNextTime(Uint8& next_time, Uint8 value, bool add_random)
69 {
70 if (add_random) {
71 s_RndLock.Lock();
72 value += s_Rnd.GetRand(0, kUSecsPerSecond);
73 s_RndLock.Unlock();
74 }
75 if (next_time < value)
76 next_time = value;
77 }
78
79
SNCMirrorProlong(ENCSyncEvent typ,Uint2 slot_,const CNCBlobKeyLight & key_,Uint8 rec_no,Uint8 tm,const CNCBlobAccessor * accessor)80 SNCMirrorProlong::SNCMirrorProlong(ENCSyncEvent typ,
81 Uint2 slot_,
82 const CNCBlobKeyLight& key_,
83 Uint8 rec_no,
84 Uint8 tm,
85 const CNCBlobAccessor* accessor)
86 : SNCMirrorEvent(typ, slot_, key_, rec_no),
87 orig_time(tm)
88 {
89 blob_sum.create_id = accessor->GetCurCreateId();
90 blob_sum.create_server = accessor->GetCurCreateServer();
91 blob_sum.create_time = accessor->GetCurBlobCreateTime();
92 blob_sum.dead_time = accessor->GetCurBlobDeadTime();
93 blob_sum.expire = accessor->GetCurBlobExpire();
94 blob_sum.ver_expire = accessor->GetCurVerExpire();
95 blob_sum.size = accessor->GetCurBlobSize();
96 }
97
98
99 bool
Initialize(void)100 CNCPeerControl::Initialize(void)
101 {
102 s_MirrorQueueSize.Set(0);
103 if (!CNCDistributionConf::GetMirroringSizeFile().empty()) {
104 s_MirrorLogFile = fopen(CNCDistributionConf::GetMirroringSizeFile().c_str(), "a");
105 }
106 sm_TotalCopyRequests.Set(0);
107 sm_CopyReqsRejected.Set(0);
108
109 s_ShutdownListener = new CNCPeerShutdown();
110 CTaskServer::AddShutdownCallback(s_ShutdownListener);
111
112 s_MapLock.Lock();
113 NON_CONST_ITERATE(TControlMap, it, s_Controls) {
114 it->second->SetRunnable();
115 }
116 s_MapLock.Unlock();
117
118 return true;
119 }
120
121 void
Finalize(void)122 CNCPeerControl::Finalize(void)
123 {
124 if (s_MirrorLogFile)
125 fclose(s_MirrorLogFile);
126 }
127
128 CNCPeerControl*
Peer(Uint8 srv_id)129 CNCPeerControl::Peer(Uint8 srv_id)
130 {
131 CNCPeerControl* ctrl;
132 s_MapLock.Lock();
133 TControlMap::const_iterator it = s_Controls.find(srv_id);
134 if (it == s_Controls.end()) {
135 ctrl = new CNCPeerControl(srv_id);
136 s_Controls[srv_id] = ctrl;
137 // s_ShutdownListener is set during initialization
138 if (s_ShutdownListener)
139 ctrl->SetRunnable();
140 }
141 else {
142 ctrl = it->second;
143 }
144 s_MapLock.Unlock();
145 return ctrl;
146 }
147
148 void
PeerHandshake(void)149 CNCPeerControl::PeerHandshake(void)
150 {
151 // the answer will come some time in the future
152 // until that, we use backward compatible protocol
153 if (AtomicCAS(m_HostProtocol, 0, 1)) {
154 CNCActiveHandler* conn = GetBGConn();
155 if (conn) {
156 conn->AskPeerVersion();
157 } else {
158 m_HostProtocol = 0;
159 }
160 }
161 }
162
163 string
GetPeerNameOrEmpty(Uint8 srv_id)164 CNCPeerControl::GetPeerNameOrEmpty(Uint8 srv_id)
165 {
166 CNCPeerControl *ctrl = NULL;
167 s_MapLock.Lock();
168 TControlMap::const_iterator it = s_Controls.find(srv_id);
169 if (it != s_Controls.end()) {
170 ctrl = it->second;
171 }
172 s_MapLock.Unlock();
173 string res;
174 if (ctrl != NULL) {
175 res += ctrl->m_Hostname;
176 res += ':';
177 res += NStr::NumericToString(Uint2(ctrl->m_SrvId));
178 }
179 return res;
180 }
181
182
CNCPeerControl(Uint8 srv_id)183 CNCPeerControl::CNCPeerControl(Uint8 srv_id)
184 : m_SrvId(srv_id),
185 m_HostIP( Uint4(m_SrvId >> 32)),
186 m_FirstNWErrTime(0),
187 m_NextSyncTime(0),
188 m_ActiveConns(0),
189 m_BGConns(0),
190 m_SlotsToInitSync(0),
191 m_OrigSlotsToInitSync(0),
192 m_CntActiveSyncs(0),
193 m_CntNWErrors(0),
194 m_CntNWThrottles(0),
195 m_InThrottle(false),
196 m_MaybeThrottle(false),
197 m_HasBGTasks(false),
198 m_InitiallySynced(false)
199 {
200 #if __NC_TASKS_MONITOR
201 m_TaskName = "CNCPeerControl";
202 #endif
203
204 m_NextTaskSync = m_SyncList.end();
205
206 // it MUST be "host:port", see CNCDistributionConf::Initialize
207 string hostport( CNCDistributionConf::GetPeerNameOrEmpty(m_SrvId));
208 if (!hostport.empty()) {
209 list<CTempString> srv_fields;
210 ncbi_NStr_Split(hostport, ":", srv_fields);
211 if (srv_fields.size() == 2) {
212 m_Hostname = srv_fields.front();
213 }
214 }
215 m_HostIPname = CTaskServer::IPToString(m_HostIP);
216 m_HostAlias = CNCDistributionConf::CreateHostAlias(m_HostIP, Uint4(m_SrvId));
217 m_HostProtocol = 0;
218 m_TrustLevel = 0;
219 }
220
221 void
RegisterConnError(void)222 CNCPeerControl::RegisterConnError(void)
223 {
224 CMiniMutexGuard guard(m_ObjLock);
225 if (m_FirstNWErrTime == 0)
226 m_FirstNWErrTime = CSrvTime::Current().AsUSec();
227 m_MaybeThrottle = true;
228 if (++m_CntNWErrors >= CNCDistributionConf::GetCntErrorsToThrottle()) {
229 m_InThrottle = true;
230 m_ThrottleStart = CSrvTime::Current().AsUSec();
231 ++m_CntNWThrottles;
232 }
233 m_HostProtocol = 0;
234 CWriteBackControl::ResetStatCounters();
235 }
236
237 void
RegisterConnSuccess(void)238 CNCPeerControl::RegisterConnSuccess(void)
239 {
240 bool ask = false;
241 {
242 CMiniMutexGuard guard(m_ObjLock);
243 m_InThrottle = false;
244 m_MaybeThrottle = !m_InitiallySynced;
245 m_FirstNWErrTime = 0;
246 m_CntNWErrors = 0;
247 m_CntNWThrottles = 0;
248 m_ThrottleStart = 0;
249 ask = m_HostProtocol == 0;
250 }
251 if (ask) {
252 PeerHandshake();
253 }
254 }
255
256 bool
CreateNewSocket(CNCActiveHandler * conn)257 CNCPeerControl::CreateNewSocket(CNCActiveHandler* conn)
258 {
259 if (CTaskServer::IsInHardShutdown())
260 return false;
261 if (m_InThrottle) {
262 m_ObjLock.Lock();
263 if (m_InThrottle) {
264 Uint8 cur_time = CSrvTime::Current().AsUSec();
265 Uint8 period = CNCDistributionConf::GetPeerThrottlePeriod();
266 if (cur_time - m_ThrottleStart <= period) {
267 m_ObjLock.Unlock();
268 SRV_LOG(Warning, "Connection to "
269 << CNCDistributionConf::GetFullPeerName(m_SrvId) << " is throttled");
270 return false;
271 }
272 if (m_CntNWThrottles >= CNCDistributionConf::GetCntThrottlesToIpchange()) {
273 Uint4 host = CTaskServer::GetIPByHost(m_Hostname);
274 if (host != 0 && m_HostIP != host) {
275 m_HostIP = host;
276 m_HostIPname = CTaskServer::IPToString(m_HostIP);
277 m_HostAlias = CNCDistributionConf::CreateHostAlias(m_HostIP, Uint4(m_SrvId));
278 m_HostProtocol = 0;
279 CNCAlerts::Register(CNCAlerts::ePeerIpChanged, CNCDistributionConf::GetFullPeerName(m_SrvId));
280 SRV_LOG(Warning, "IP address change: host "
281 << CNCDistributionConf::GetFullPeerName(m_SrvId));
282 }
283 m_CntNWThrottles = 0;
284 }
285 m_InThrottle = false;
286 m_MaybeThrottle = !m_InitiallySynced;
287 if (m_InitiallySynced)
288 m_FirstNWErrTime = 0;
289 m_CntNWErrors = 0;
290 m_ThrottleStart = 0;
291 }
292 m_ObjLock.Unlock();
293 }
294
295 CNCActiveHandler_Proxy* proxy = new CNCActiveHandler_Proxy(conn);
296 if (!proxy->Connect(m_HostIP, Uint2(m_SrvId))) {
297 delete proxy;
298 RegisterConnError();
299 return false;
300 }
301 conn->SetProxy(proxy);
302 return true;
303 }
304
305 CNCActiveHandler*
x_GetPooledConnImpl(void)306 CNCPeerControl::x_GetPooledConnImpl(void)
307 {
308 if (m_PooledConns.empty() || CTaskServer::IsInHardShutdown())
309 return NULL;
310
311 // it is important to have it this way, not the other
312 CNCActiveHandler* conn = &m_PooledConns.back();
313 m_PooledConns.pop_back();
314
315 m_BusyConns.push_back(*conn);
316
317 conn->m_LastActive = CSrvTime::CurSecs();
318 return conn;
319 }
320
321 CNCActiveHandler*
GetPooledConn(void)322 CNCPeerControl::GetPooledConn(void)
323 {
324 CMiniMutexGuard guard(m_ObjLock);
325 CNCActiveHandler* conn = x_GetPooledConnImpl();
326 if (conn) {
327 ++m_ActiveConns;
328 }
329 return conn;
330 }
331
332 inline void
x_UpdateHasTasks(void)333 CNCPeerControl::x_UpdateHasTasks(void)
334 {
335 m_HasBGTasks = !m_SmallMirror.empty() || !m_BigMirror.empty()
336 || !m_SyncList.empty();
337 if (m_HasBGTasks && CTaskServer::IsInShutdown() && m_BusyConns.empty()) {
338
339 // something went wrong: we still have work to do, but nobody to work on it
340 // but, it is shutdown time...
341 //
342 // the problem has to do with "size == 0" addition in x_AddMirrorEvent, here:
343 // if (size == 0 && x_ReserveBGConn()) {...}
344
345 SRV_LOG(Error, "Incomplete jobs on shutdown:"
346 << " m_SmallMirror: " << m_SmallMirror.size()
347 << ", m_BigMirror: " << m_BigMirror.size()
348 << ", m_SyncList: " << m_SyncList.size());
349 m_HasBGTasks = false;
350 }
351
352 #if 0
353 size_t conn = m_BusyConns.size() + 1;
354 if (conn < (size_t)m_ActiveConns) {
355 m_ActiveConns = conn;
356 #ifdef _DEBUG
357 CNCAlerts::Register(CNCAlerts::eDebugConnAdjusted1, "PutConnToPool");
358 #endif
359 }
360 if (conn < (size_t)m_BGConns) {
361 m_BGConns = conn;
362 #ifdef _DEBUG
363 CNCAlerts::Register(CNCAlerts::eDebugConnAdjusted2, "PutConnToPool");
364 #endif
365 }
366 #endif
367
368 }
369
370 bool
x_ReserveBGConn(void)371 CNCPeerControl::x_ReserveBGConn(void)
372 {
373 if (m_ActiveConns >= CNCDistributionConf::GetMaxPeerTotalConns()
374 || m_BGConns >= CNCDistributionConf::GetMaxPeerBGConns())
375 {
376 return false;
377 }
378 ++m_ActiveConns;
379 ++m_BGConns;
380 return true;
381 }
382
383 bool
x_ReserveBGConnNow(void)384 CNCPeerControl::x_ReserveBGConnNow(void)
385 {
386 ++m_ActiveConns;
387 ++m_BGConns;
388 return true;
389 }
390
391 inline void
x_IncBGConns(void)392 CNCPeerControl::x_IncBGConns(void)
393 {
394 ++m_BGConns;
395 }
396
397 inline void
x_DecBGConns(void)398 CNCPeerControl::x_DecBGConns(void)
399 {
400 --m_BGConns;
401 }
402
403 inline void
x_DecBGConns(CNCActiveHandler * conn)404 CNCPeerControl::x_DecBGConns(CNCActiveHandler* conn)
405 {
406 if (!conn || conn->IsReservedForBG()) {
407 x_DecBGConns();
408 if (conn)
409 conn->SetReservedForBG(false);
410 }
411 }
412
413 inline void
x_DecActiveConns(void)414 CNCPeerControl::x_DecActiveConns(void)
415 {
416 --m_ActiveConns;
417 }
418
419 inline void
x_UnreserveBGConn(void)420 CNCPeerControl::x_UnreserveBGConn(void)
421 {
422 m_ObjLock.Lock();
423 x_DecBGConns();
424 if(x_DoReleaseConn(NULL)) {
425 m_ObjLock.Unlock();
426 }
427 }
428
429 CNCActiveHandler*
x_CreateNewConn(bool for_bg)430 CNCPeerControl::x_CreateNewConn(bool for_bg)
431 {
432 CNCActiveHandler* conn = new CNCActiveHandler(m_SrvId, this);
433 conn->SetReservedForBG(for_bg);
434 if (!CreateNewSocket(conn)) {
435 delete conn;
436 conn = NULL;
437 }
438
439 if (conn) {
440 m_ObjLock.Lock();
441 m_BusyConns.push_back(*conn);
442 m_ObjLock.Unlock();
443 conn->m_LastActive = CSrvTime::CurSecs();
444 }
445
446 return conn;
447 }
448
449 bool
x_AssignClientConn(CNCActiveClientHub * hub,CNCActiveHandler * conn)450 CNCPeerControl::x_AssignClientConn(CNCActiveClientHub* hub,
451 CNCActiveHandler* conn)
452 {
453 if (!conn)
454 conn = x_GetPooledConnImpl();
455 m_ObjLock.Unlock();
456
457 if (!conn) {
458 conn = x_CreateNewConn(false);
459 if (!conn) {
460 hub->SetErrMsg(m_InThrottle? "ERR:Connection is throttled"
461 : "ERR:Cannot connect to peer");
462 hub->SetStatus(eNCHubError);
463 return false;
464 }
465 }
466 hub->SetHandler(conn);
467 conn->SetClientHub(hub);
468 hub->SetStatus(eNCHubConnReady);
469 return true;
470 }
471
472 void
AssignClientConn(CNCActiveClientHub * hub)473 CNCPeerControl::AssignClientConn(CNCActiveClientHub* hub)
474 {
475 m_ObjLock.Lock();
476 if (m_ActiveConns >= CNCDistributionConf::GetMaxPeerTotalConns()) {
477 hub->SetStatus(eNCHubWaitForConn);
478 m_Clients.push_back(hub);
479 m_ObjLock.Unlock();
480 return;
481 }
482 ++m_ActiveConns;
483 if (!x_AssignClientConn(hub, NULL)) {
484 m_ObjLock.Lock();
485 if (x_DoReleaseConn(NULL))
486 m_ObjLock.Unlock();
487 }
488 }
489
490 CNCActiveHandler*
x_GetBGConnImpl(void)491 CNCPeerControl::x_GetBGConnImpl(void)
492 {
493 CNCActiveHandler* conn = x_GetPooledConnImpl();
494 m_ObjLock.Unlock();
495 if (conn) {
496 conn->SetReservedForBG(true);
497 } else {
498 conn = x_CreateNewConn(true);
499 }
500 return conn;
501 }
502
503 CNCActiveHandler*
GetBGConn(bool silent)504 CNCPeerControl::GetBGConn(bool silent)
505 {
506 m_ObjLock.Lock();
507 if (!x_ReserveBGConn()) {
508 m_ObjLock.Unlock();
509 if(!silent) {
510 SRV_LOG(Warning, "Too many active (" << m_ActiveConns
511 << ") or background (" << m_BGConns
512 << ") connections");
513 }
514 return NULL;
515 }
516 CNCActiveHandler* conn = x_GetBGConnImpl(); // m_ObjLock.Unlock
517 if (!conn)
518 x_UnreserveBGConn();
519 return conn;
520 }
521
522 bool
x_DoReleaseConn(CNCActiveHandler * conn)523 CNCPeerControl::x_DoReleaseConn(CNCActiveHandler* conn)
524 // m_ObjLock is locked on entrance
525 // method returns m_ObjLock.IsLocked state
526 // it is not unlocked here, because sometimes there is something else to do
527 {
528 retry:
529 bool is_locked = true;
530 if (!m_Clients.empty()) {
531 CNCActiveClientHub* hub = m_Clients.front();
532 m_Clients.pop_front();
533 if (!x_AssignClientConn(hub, conn)) { // m_ObjLock.Unlock
534 // unlocked now; we need to lock to retry
535 m_ObjLock.Lock();
536 goto retry;
537 }
538 is_locked = false;
539 }
540 else if (m_HasBGTasks && conn) {
541 // m_ObjLock is locked
542 if (!m_SmallMirror.empty() || !m_BigMirror.empty()) {
543 SNCMirrorEvent* event;
544 if (!m_SmallMirror.empty()) {
545 event = m_SmallMirror.front();
546 m_SmallMirror.pop_front();
547 }
548 else {
549 event = m_BigMirror.front();
550 m_BigMirror.pop_front();
551 }
552 s_MirrorQueueSize.Add(-1);
553 x_UpdateHasTasks();
554 conn->SetReservedForBG(true);
555 x_IncBGConns();
556 m_ObjLock.Unlock();
557 is_locked = false;
558 x_ProcessMirrorEvent(conn, event);
559 }
560 else if (!m_SyncList.empty()) {
561 bool is_valid = false;
562 CNCActiveSyncControl* sync_ctrl = nullptr;
563 SSyncTaskInfo task_info;
564 while (!is_valid && !m_SyncList.empty()) {
565 sync_ctrl = *m_NextTaskSync;
566 if (!sync_ctrl->GetNextTask(task_info, &is_valid)) {
567 TNCActiveSyncListIt cur_it = m_NextTaskSync;
568 ++m_NextTaskSync;
569 m_SyncList.erase(cur_it);
570 } else {
571 ++m_NextTaskSync;
572 }
573 if (m_NextTaskSync == m_SyncList.end()) {
574 m_NextTaskSync = m_SyncList.begin();
575 }
576 }
577 x_UpdateHasTasks();
578 if (is_valid) {
579 conn->SetReservedForBG(true);
580 x_IncBGConns();
581 m_ObjLock.Unlock();
582 is_locked = false;
583 sync_ctrl->ExecuteSyncTask(task_info, conn);
584 } else {
585 m_ObjLock.Unlock();
586 is_locked = false;
587 }
588 }
589 else {
590 m_HasBGTasks = false;
591 }
592 }
593 else {
594 x_DecActiveConns();
595 }
596 return is_locked;
597 }
598
599 void
PutConnToPool(CNCActiveHandler * conn)600 CNCPeerControl::PutConnToPool(CNCActiveHandler* conn)
601 {
602 m_ObjLock.Lock();
603 x_DecBGConns(conn);
604 if (x_DoReleaseConn(conn)) {
605 m_BusyConns.erase(m_BusyConns.iterator_to(*conn));
606 m_PooledConns.push_back(*conn);
607 m_ObjLock.Unlock();
608 }
609 }
610
611 void
ReleaseConn(CNCActiveHandler * conn)612 CNCPeerControl::ReleaseConn(CNCActiveHandler* conn)
613 {
614 m_ObjLock.Lock();
615 x_DecBGConns(conn);
616 m_BusyConns.erase(m_BusyConns.iterator_to(*conn));
617 if (x_DoReleaseConn(NULL))
618 m_ObjLock.Unlock();
619 }
620
621 void
x_DeleteMirrorEvent(SNCMirrorEvent * event)622 CNCPeerControl::x_DeleteMirrorEvent(SNCMirrorEvent* event)
623 {
624 if (event->evt_type == eSyncWrite || event->evt_type == eSyncUpdate || event->evt_type == eSyncRemove)
625 delete event;
626 else if (event->evt_type == eSyncProlong)
627 delete (SNCMirrorProlong*)event;
628 else {
629 SRV_FATAL("Unexpected mirror event type: " << event->evt_type);
630 }
631 }
632
633 void
x_ProcessUpdateEvent(SNCMirrorEvent * event)634 CNCPeerControl::x_ProcessUpdateEvent(SNCMirrorEvent* event)
635 {
636 m_ObjLock.Lock();
637 // if (x_ReserveBGConnNow()) {
638 if (x_ReserveBGConn()) {
639 CNCActiveHandler* conn = x_GetBGConnImpl(); // m_ObjLock.Unlock
640 if (conn) {
641 x_ProcessMirrorEvent(conn, event);
642 } else {
643 x_DeleteMirrorEvent(event);
644 x_UnreserveBGConn();
645 }
646 } else {
647 m_ObjLock.Unlock();
648 x_DeleteMirrorEvent(event);
649 }
650 }
651
652 void
x_ProcessMirrorEvent(CNCActiveHandler * conn,SNCMirrorEvent * event)653 CNCPeerControl::x_ProcessMirrorEvent(CNCActiveHandler* conn, SNCMirrorEvent* event)
654 {
655 if (event->evt_type == eSyncWrite) {
656 conn->CopyPut(NULL, event->key, event->slot, event->orig_rec_no);
657 }
658 else if (event->evt_type == eSyncProlong) {
659 SNCMirrorProlong* prolong = (SNCMirrorProlong*)event;
660 conn->CopyProlong(prolong->key, prolong->slot, prolong->orig_rec_no,
661 prolong->orig_time, prolong->blob_sum);
662 }
663 else if (event->evt_type == eSyncUpdate) {
664 conn->CopyUpdate(event->key, event->orig_rec_no);
665 }
666 else if (event->evt_type == eSyncRemove) {
667 conn->CopyRemove(event->key, event->orig_rec_no);
668 }
669 else {
670 SRV_FATAL("Unexpected mirror event type: " << event->evt_type);
671 }
672 x_DeleteMirrorEvent(event);
673 }
674
675 void
x_AddMirrorEvent(SNCMirrorEvent * event,Uint8 size)676 CNCPeerControl::x_AddMirrorEvent(SNCMirrorEvent* event, Uint8 size)
677 {
678 sm_TotalCopyRequests.Add(1);
679
680 m_ObjLock.Lock();
681 // all blobs (size!=0) go into queue
682 // this reduces response time
683 if ((size == 0 || m_BusyConns.empty()) && x_ReserveBGConn()) {
684 CNCActiveHandler* conn = x_GetBGConnImpl(); // m_ObjLock.Unlock
685 if (conn)
686 x_ProcessMirrorEvent(conn, event);
687 else {
688 x_DeleteMirrorEvent(event);
689 x_UnreserveBGConn();
690 }
691 }
692 else {
693 TNCMirrorQueue* q;
694 if (size <= CNCDistributionConf::GetSmallBlobBoundary())
695 q = &m_SmallMirror;
696 else
697 q = &m_BigMirror;
698 if (q->size() < CNCDistributionConf::GetMaxMirrorQueueSize()) {
699 q->push_back(event);
700 m_HasBGTasks = true;
701 m_ObjLock.Unlock();
702
703 int queue_size = s_MirrorQueueSize.Add(1);
704 if (s_MirrorLogFile) {
705 Uint8 cur_time = CSrvTime::Current().AsUSec();
706 fprintf(s_MirrorLogFile, "%" NCBI_UINT8_FORMAT_SPEC ",%d\n",
707 cur_time, queue_size);
708 }
709 }
710 else {
711 m_ObjLock.Unlock();
712 sm_CopyReqsRejected.Add(1);
713 x_DeleteMirrorEvent(event);
714 }
715 }
716 }
717
718 void
MirrorUpdate(const CNCBlobKeyLight & key,Uint2 slot,Uint8 update_time)719 CNCPeerControl::MirrorUpdate(const CNCBlobKeyLight& key,
720 Uint2 slot,
721 Uint8 update_time)
722 {
723 const TServersList& servers = CNCDistributionConf::GetRawServersForSlot(slot);
724 ITERATE(TServersList, it_srv, servers) {
725 Uint8 srv_id = *it_srv;
726 CNCPeerControl* peer = Peer(srv_id);
727 if (CNCDistributionConf::GetSelfTrustLevel() >= peer->GetTrustLevel()
728 && peer->AcceptsSyncUpdate()) {
729 SNCMirrorEvent* event = new SNCMirrorEvent(eSyncUpdate, slot, key, update_time);
730 if (event) {
731 peer->x_ProcessUpdateEvent(event);
732 }
733 }
734 }
735 }
736
737 void
MirrorRemove(const CNCBlobKeyLight & key,Uint2 slot,Uint8 update_time)738 CNCPeerControl::MirrorRemove(const CNCBlobKeyLight& key,
739 Uint2 slot,
740 Uint8 update_time)
741 {
742 const TServersList& servers = CNCDistributionConf::GetRawServersForSlot(slot);
743 ITERATE(TServersList, it_srv, servers) {
744 Uint8 srv_id = *it_srv;
745 CNCPeerControl* peer = Peer(srv_id);
746 if (CNCDistributionConf::GetSelfTrustLevel() >= peer->GetTrustLevel()
747 && peer->AcceptsSyncRemove()) {
748 SNCMirrorEvent* event = new SNCMirrorEvent(eSyncRemove, slot, key, update_time);
749 if (event) {
750 peer->x_AddMirrorEvent(event, 0);
751 }
752 }
753 }
754 }
755
756 void
MirrorWrite(const CNCBlobKeyLight & key,Uint2 slot,Uint8 orig_rec_no,Uint8 size,const TServersList & mirrors_done)757 CNCPeerControl::MirrorWrite(const CNCBlobKeyLight& key,
758 Uint2 slot,
759 Uint8 orig_rec_no,
760 Uint8 size, const TServersList& mirrors_done)
761 {
762 set<Uint8> done(mirrors_done.begin(), mirrors_done.end());
763 const TServersList& servers = CNCDistributionConf::GetRawServersForSlot(slot);
764 ITERATE(TServersList, it_srv, servers) {
765 Uint8 srv_id = *it_srv;
766 if (done.find(srv_id) == done.end()) {
767 CNCPeerControl* peer = Peer(srv_id);
768 if (CNCDistributionConf::GetSelfTrustLevel() >= peer->GetTrustLevel()
769 && peer->AcceptsBlobKey(key)) {
770 SNCMirrorEvent* event = new SNCMirrorEvent(eSyncWrite, slot, key, orig_rec_no);
771 peer->x_AddMirrorEvent(event, size);
772 }
773 }
774 }
775 }
776
777 void
MirrorProlong(const CNCBlobKeyLight & key,Uint2 slot,Uint8 orig_rec_no,Uint8 orig_time,const CNCBlobAccessor * accessor)778 CNCPeerControl::MirrorProlong(const CNCBlobKeyLight& key,
779 Uint2 slot,
780 Uint8 orig_rec_no,
781 Uint8 orig_time,
782 const CNCBlobAccessor* accessor)
783 {
784 const TServersList& servers = CNCDistributionConf::GetRawServersForSlot(slot);
785 ITERATE(TServersList, it_srv, servers) {
786 Uint8 srv_id = *it_srv;
787 CNCPeerControl* peer = Peer(srv_id);
788 if (CNCDistributionConf::GetSelfTrustLevel() >= peer->GetTrustLevel()
789 && peer->AcceptsBlobKey(key)) {
790 SNCMirrorProlong* event = new SNCMirrorProlong(eSyncProlong, slot, key.PackedKey(),
791 orig_rec_no, orig_time, accessor);
792 peer->x_AddMirrorEvent(event, 0);
793 }
794 }
795 }
796
797 Uint8
GetMirrorQueueSize(void)798 CNCPeerControl::GetMirrorQueueSize(void)
799 {
800 return s_MirrorQueueSize.Get();
801 }
802
803 Uint8
GetMirrorQueueSize(Uint8 srv_id)804 CNCPeerControl::GetMirrorQueueSize(Uint8 srv_id)
805 {
806 CNCPeerControl* peer = Peer(srv_id);
807 CMiniMutexGuard guard(peer->m_ObjLock);
808 return peer->m_SmallMirror.size() + peer->m_BigMirror.size();
809 }
810
811 void
ReadCurState(SNCStateStat & state)812 CNCPeerControl::ReadCurState(SNCStateStat& state)
813 {
814 int active = 0, bg = 0;
815 s_MapLock.Lock();
816 ITERATE(TControlMap, it_ctrl, s_Controls) {
817 CNCPeerControl* peer = it_ctrl->second;
818 active += peer->m_ActiveConns;
819 bg += peer->m_BGConns;
820 }
821 s_MapLock.Unlock();
822 state.mirror_queue_size = CNCPeerControl::GetMirrorQueueSize();
823 state.peer_active_conns = active;
824 state.peer_bg_conns = bg;
825 }
826
827 Uint4
FindIPbyAlias(Uint4 alias)828 CNCPeerControl::FindIPbyAlias(Uint4 alias)
829 {
830 Uint4 res = 0;
831 s_MapLock.Lock();
832 ITERATE(TControlMap, it_ctrl, s_Controls) {
833 CNCPeerControl* peer = it_ctrl->second;
834 if (!peer->m_MaybeThrottle && peer->m_HostAlias == alias) {
835 res = peer->m_HostIP;
836 break;
837 }
838 }
839 s_MapLock.Unlock();
840 return res;
841 }
842
843 Uint4
FindIPbyName(const string & alias)844 CNCPeerControl::FindIPbyName(const string& alias)
845 {
846 Uint4 res = 0;
847 s_MapLock.Lock();
848 ITERATE(TControlMap, it_ctrl, s_Controls) {
849 CNCPeerControl* peer = it_ctrl->second;
850 if (!peer->m_MaybeThrottle && peer->m_HostIPname == alias) {
851 res = peer->m_HostIP;
852 break;
853 }
854 }
855 s_MapLock.Unlock();
856 return res;
857 }
858
859 bool
HasPeerInThrottle(void)860 CNCPeerControl::HasPeerInThrottle(void)
861 {
862 bool res = false;
863 s_MapLock.Lock();
864 ITERATE(TControlMap, it_ctrl, s_Controls) {
865 if (CNCDistributionConf::HasCommonSlots(it_ctrl->first) &&
866 it_ctrl->second->m_MaybeThrottle) {
867 res = true;
868 break;
869 }
870 }
871 s_MapLock.Unlock();
872 return res;
873 }
874
PrintState(CSrvSocketTask & task)875 void CNCPeerControl::PrintState(CSrvSocketTask& task)
876 {
877 s_MapLock.Lock();
878 TControlMap ctrl = s_Controls;
879 s_MapLock.Unlock();
880
881 string is("\": "), iss("\": \""), eol(",\n\""), qt("\"");
882
883 task.WriteText(eol).WriteText("peers").WriteText(is).WriteText("\n[");
884 for(TControlMap::const_iterator it = ctrl.begin(); it != ctrl.end(); ++it) {
885 if (it != ctrl.begin()) {
886 task.WriteText(",");
887 }
888 task.WriteText("{\n");
889 CNCPeerControl* peer = it->second;
890 task.WriteText(qt).WriteText("hostname").WriteText(iss).WriteText(
891 CNCDistributionConf::GetPeerName(peer->GetSrvId())).WriteText(qt);
892 task.WriteText(eol).WriteText("hostIPname").WriteText(iss).WriteText(peer->m_HostIPname).WriteText(qt);
893 task.WriteText(eol).WriteText("hostProtocol").WriteText(is).WriteNumber(peer->m_HostProtocol);
894 task.WriteText(eol).WriteText("healthy").WriteText(is).WriteText(
895 (peer->m_InThrottle || peer->m_MaybeThrottle) ? "false" : "true");
896 task.WriteText(eol).WriteText("initiallySynced").WriteText(is).WriteText(
897 peer->m_InitiallySynced ? "true" : "false");
898 task.WriteText(eol).WriteText("origSlotsToInitSync").WriteText(is).WriteNumber(peer->m_OrigSlotsToInitSync);
899 task.WriteText(eol).WriteText("slotsToInitSync").WriteText(is).WriteNumber(peer->m_SlotsToInitSync);
900 task.WriteText(eol).WriteText("cntActiveSyncs").WriteText(is).WriteNumber(peer->m_CntActiveSyncs);
901 task.WriteText(eol).WriteText("cntNWErrors").WriteText(is).WriteNumber(peer->m_CntNWErrors);
902 task.WriteText(eol).WriteText("hasBGTasks").WriteText(is).WriteText(
903 peer->m_HasBGTasks ? "true" : "false");
904 task.WriteText(eol).WriteText("activeConns").WriteText(is).WriteNumber(peer->m_ActiveConns);
905 task.WriteText(eol).WriteText("bGConns").WriteText(is).WriteNumber(peer->m_BGConns);
906 task.WriteText(eol).WriteText("cntBusyConns").WriteText(is).WriteNumber(peer->m_BusyConns.size());
907 task.WriteText(eol).WriteText("cntPooledConns").WriteText(is).WriteNumber(peer->m_PooledConns.size());
908 task.WriteText("\n}");
909 }
910 task.WriteText("]");
911 }
912
913 void
SetServersForInitSync(Uint4 cnt_servers)914 CNCPeerControl::SetServersForInitSync(Uint4 cnt_servers)
915 {
916 s_ServersToSync = cnt_servers;
917 s_SyncOnInit.Set(cnt_servers);
918 s_WaitToOpenToClients.Set(cnt_servers);
919 s_AbortedSyncClients.Set(cnt_servers);
920 }
921
922 void
ResetServersForInitSync(void)923 CNCPeerControl::ResetServersForInitSync(void)
924 {
925 SetServersForInitSync(s_ServersToSync);
926 }
927
928 void
ReconfServersForInitSync(Uint4 cnt_servers)929 CNCPeerControl::ReconfServersForInitSync(Uint4 cnt_servers)
930 {
931 s_ServersToSync = cnt_servers;
932 }
933
934 bool
HasServersForInitSync(void)935 CNCPeerControl::HasServersForInitSync(void)
936 {
937 return s_SyncOnInit.Get() != 0;
938 }
939
940 bool
StartActiveSync(void)941 CNCPeerControl::StartActiveSync(void)
942 {
943 CMiniMutexGuard guard(m_ObjLock);
944 if (m_CntActiveSyncs >= CNCDistributionConf::GetMaxSyncsOneServer()) {
945 return false;
946 }
947 ++m_CntActiveSyncs;
948 return true;
949 }
950
951 void
x_SrvInitiallySynced(bool succeeded)952 CNCPeerControl::x_SrvInitiallySynced(bool succeeded)
953 {
954 if (!m_InitiallySynced) {
955 INFO("Initial sync: for "
956 << CNCDistributionConf::GetFullPeerName(m_SrvId) << " completed");
957 m_InitiallySynced = true;
958 s_SyncOnInit.Add(-1);
959 CNCStat::InitialSyncDone(m_SrvId, succeeded);
960 }
961 }
962
963 void
x_SlotsInitiallySynced(Uint2 cnt_slots,bool aborted)964 CNCPeerControl::x_SlotsInitiallySynced(Uint2 cnt_slots, bool aborted)
965 {
966 if (cnt_slots != 0 && m_SlotsToInitSync != 0) {
967 bool succeeded = true;
968 if (cnt_slots != 1) {
969 CNCAlerts::Register(CNCAlerts::eSyncFailed, CNCDistributionConf::GetFullPeerName(m_SrvId));
970 INFO("Initial sync: Server "
971 << CNCDistributionConf::GetFullPeerName(m_SrvId) << " is out of reach (timeout)");
972 succeeded = false;
973 }
974 m_SlotsToInitSync -= cnt_slots;
975 if (m_SlotsToInitSync == 0) {
976 x_SrvInitiallySynced(succeeded);
977 if (aborted && s_AbortedSyncClients.Add(-1) == 0) {
978 #if 1
979 SRV_LOG(Error, "Initial sync: unable to synchronize with any server");
980 #else
981 SRV_LOG(Critical, "Initial sync: unable to synchronize with any server");
982 CTaskServer::RequestShutdown(eSrvSlowShutdown);
983 #endif
984 }
985 if (s_WaitToOpenToClients.Add(-1) == 0)
986 CNCServer::InitialSyncComplete();
987 }
988 }
989 }
990
991 void
AddInitiallySyncedSlot(void)992 CNCPeerControl::AddInitiallySyncedSlot(void)
993 {
994 CMiniMutexGuard guard(m_ObjLock);
995 x_SlotsInitiallySynced(1);
996 }
997
998 void
RegisterSyncStop(bool is_passive,Uint8 & next_sync_time,Uint8 next_sync_delay)999 CNCPeerControl::RegisterSyncStop(bool is_passive,
1000 Uint8& next_sync_time,
1001 Uint8 next_sync_delay)
1002 {
1003 CMiniMutexGuard guard(m_ObjLock);
1004 Uint8 now = CSrvTime::Current().AsUSec();
1005 Uint8 next_time = now + next_sync_delay;
1006 s_SetNextTime(next_sync_time, next_time, true);
1007 if (m_FirstNWErrTime == 0) {
1008 s_SetNextTime(m_NextSyncTime, now, false);
1009 }
1010 else {
1011 s_SetNextTime(m_NextSyncTime, next_time, true);
1012 if (now - m_FirstNWErrTime >= CNCDistributionConf::GetNetworkErrorTimeout())
1013 x_SlotsInitiallySynced(m_SlotsToInitSync, m_FirstNWErrTime == 1);
1014 }
1015
1016 if (!is_passive)
1017 --m_CntActiveSyncs;
1018 }
1019
1020 #ifdef _DEBUG
RegisterSyncStat(bool is_passive,bool is_by_blobs,int result,int hint)1021 void CNCPeerControl::RegisterSyncStat(bool is_passive, bool is_by_blobs, int result, int hint)
1022 {
1023 size_t key = (is_passive ? 2 : 0) | (is_by_blobs ? 1 : 0);
1024 key <<= 8;
1025 key = key | (result & 0xFF);
1026 key <<= 16;
1027 key = key | (hint & 0xFFFF);
1028 CMiniMutexGuard guard(m_ObjLock);
1029 ++m_SyncStat[key];
1030 }
1031
PrintSyncStat(CSrvSocketTask & task)1032 void CNCPeerControl::PrintSyncStat(CSrvSocketTask& task)
1033 {
1034 s_MapLock.Lock();
1035 TControlMap ctrl = s_Controls;
1036 s_MapLock.Unlock();
1037
1038 string is("\": "), iss("\": \""), eol(",\n\""), qt("\"");
1039
1040 task.WriteText(eol).WriteText("peers").WriteText(is).WriteText("\n[");
1041 for(TControlMap::const_iterator it = ctrl.begin(); it != ctrl.end(); ++it) {
1042 if (it != ctrl.begin()) {
1043 task.WriteText(",");
1044 }
1045 task.WriteText("{\n");
1046 CNCPeerControl* peer = it->second;
1047 map<size_t, size_t> syncStat;
1048 {
1049 CMiniMutexGuard guard(peer->m_ObjLock);
1050 syncStat = peer->m_SyncStat;
1051 }
1052 task.WriteText(qt).WriteText("hostname").WriteText(iss).WriteText(
1053 CNCDistributionConf::GetPeerName(peer->GetSrvId())).WriteText(qt);
1054 task.WriteText(eol).WriteText("stat").WriteText(is);
1055
1056 bool first = true;
1057 for (map<size_t, size_t>::const_iterator i = syncStat.begin(); i != syncStat.end(); ++i) {
1058 if (first) {
1059 first = false;
1060 } else {
1061 task.WriteText(",");
1062 }
1063 task.WriteText("[\n");
1064 size_t key = i->first;
1065 size_t hint = key & 0xFFFF;
1066 key >>= 16;
1067 size_t result = key & 0xFF;
1068 key >>= 8;
1069 bool by_blobs = (key & 1) != 0;
1070 bool passive = (key & 2) != 0;
1071 task.WriteText(qt).WriteText("passive").WriteText(is).WriteBool(passive);
1072 task.WriteText(eol).WriteText("by_blobs").WriteText(is).WriteBool(by_blobs);
1073 task.WriteText(eol).WriteText("result").WriteText(is).WriteNumber(result);
1074 task.WriteText(eol).WriteText("hint").WriteText(is).WriteNumber(hint);
1075 task.WriteText(eol).WriteText("count").WriteText(is).WriteNumber(i->second);
1076 task.WriteText("\n]");
1077 }
1078 task.WriteText("\n}");
1079 }
1080 task.WriteText("]");
1081 }
1082 #endif
1083
AbortInitialSync(void)1084 void CNCPeerControl::AbortInitialSync(void)
1085 {
1086 m_FirstNWErrTime = 1;
1087 }
1088
SetHostProtocol(Uint8 ver)1089 void CNCPeerControl::SetHostProtocol(Uint8 ver)
1090 {
1091 m_HostProtocol = ver;
1092 if (ver != 0) {
1093 m_MaybeThrottle = !m_InitiallySynced;
1094 }
1095 }
1096
1097 bool
AddSyncControl(CNCActiveSyncControl * sync_ctrl)1098 CNCPeerControl::AddSyncControl(CNCActiveSyncControl* sync_ctrl)
1099 {
1100 bool has_more = true;
1101 bool task_added = false;
1102 SSyncTaskInfo task_info;
1103
1104 m_ObjLock.Lock();
1105 while (has_more && x_ReserveBGConn()) {
1106 CNCActiveHandler* conn = x_GetBGConnImpl(); // m_ObjLock.Unlock
1107 if (!conn) {
1108 x_UnreserveBGConn();
1109 if (!task_added)
1110 return false;
1111 m_ObjLock.Lock();
1112 break;
1113 }
1114 has_more = sync_ctrl->GetNextTask(task_info);
1115 sync_ctrl->ExecuteSyncTask(task_info, conn);
1116 task_added = true;
1117 m_ObjLock.Lock();
1118 }
1119 if (has_more) {
1120 m_SyncList.push_back(sync_ctrl);
1121 if (m_NextTaskSync == m_SyncList.end())
1122 m_NextTaskSync = m_SyncList.begin();
1123 m_HasBGTasks = true;
1124 }
1125 m_ObjLock.Unlock();
1126
1127 return true;
1128 }
1129
1130 void
RemoveSyncControl(CNCActiveSyncControl * sync_ctrl)1131 CNCPeerControl::RemoveSyncControl(CNCActiveSyncControl* sync_ctrl)
1132 {
1133 m_ObjLock.Lock();
1134 ERASE_ITERATE(TNCActiveSyncList, it_sync, m_SyncList) {
1135 CNCActiveSyncControl* ctrl = *it_sync;
1136 if (sync_ctrl == ctrl) {
1137 m_SyncList.erase(it_sync);
1138 }
1139 }
1140 m_ObjLock.Unlock();
1141 }
1142
1143 bool
FinishSync(CNCActiveSyncControl * sync_ctrl)1144 CNCPeerControl::FinishSync(CNCActiveSyncControl* sync_ctrl)
1145 {
1146 m_ObjLock.Lock();
1147 if (x_ReserveBGConn()) {
1148 CNCActiveHandler* conn = x_GetBGConnImpl(); // m_ObjLock.Unlock
1149 if (!conn) {
1150 x_UnreserveBGConn();
1151 return false;
1152 }
1153
1154 SSyncTaskInfo task_info;
1155 sync_ctrl->GetNextTask(task_info);
1156 sync_ctrl->ExecuteSyncTask(task_info, conn);
1157 }
1158 else
1159 {
1160 m_SyncList.push_back(sync_ctrl);
1161 if (m_NextTaskSync == m_SyncList.end())
1162 m_NextTaskSync = m_SyncList.begin();
1163 m_HasBGTasks = true;
1164 m_ObjLock.Unlock();
1165 }
1166 return true;
1167 }
1168
1169 void
ExecuteSlice(TSrvThreadNum)1170 CNCPeerControl::ExecuteSlice(TSrvThreadNum /* thr_num */)
1171 {
1172 if (CTaskServer::IsInShutdown())
1173 return;
1174
1175 // check for timeouts
1176 m_ObjLock.Lock();
1177
1178 NON_CONST_ITERATE(TNCPeerConnsList, it, m_BusyConns) {
1179 it->CheckCommandTimeout();
1180 }
1181
1182 m_ObjLock.Unlock();
1183
1184 RunAfter(1);
1185 }
1186
1187 bool
GetReadyForShutdown(void)1188 CNCPeerControl::GetReadyForShutdown(void)
1189 {
1190 bool result = true;
1191
1192 m_ObjLock.Lock();
1193 if (CTaskServer::IsInHardShutdown()) {
1194 while (!m_Clients.empty()) {
1195 CNCActiveClientHub* hub = m_Clients.front();
1196 m_Clients.pop_front();
1197 hub->SetErrMsg(GetMessageByStatus(eStatus_ShuttingDown));
1198 hub->SetStatus(eNCHubError);
1199 result = false;
1200 }
1201 }
1202 NON_CONST_ITERATE(TNCPeerConnsList, it, m_BusyConns) {
1203 it->CheckCommandTimeout();
1204 result = false;
1205 }
1206 ERASE_ITERATE(TNCActiveSyncList, it_sync, m_SyncList) {
1207 CNCActiveSyncControl* sync_ctrl = *it_sync;
1208 m_SyncList.erase(it_sync);
1209
1210 SSyncTaskInfo task_info;
1211 bool has_more = sync_ctrl->GetNextTask(task_info);
1212 sync_ctrl->CmdFinished(eSynNetworkError, eSynActionNone, NULL, NC_SYNC_HINT);
1213 if (has_more)
1214 sync_ctrl->GetNextTask(task_info);
1215 result = false;
1216 }
1217 m_SyncList.clear();
1218 m_NextTaskSync = m_SyncList.end();
1219 x_UpdateHasTasks();
1220 if (m_HasBGTasks)
1221 result = false;
1222
1223 if (result) {
1224 if (CNCStat::GetCntRunningCmds() != 0) {
1225 result = false;
1226 }
1227 else {
1228 while (!m_PooledConns.empty()) {
1229 CNCActiveHandler* conn = &m_PooledConns.front();
1230 m_PooledConns.pop_front();
1231 conn->CloseForShutdown();
1232 result = false;
1233 }
1234 }
1235 }
1236 m_ObjLock.Unlock();
1237
1238 return result;
1239 }
1240
1241
CNCPeerShutdown(void)1242 CNCPeerShutdown::CNCPeerShutdown(void)
1243 {}
1244
~CNCPeerShutdown(void)1245 CNCPeerShutdown::~CNCPeerShutdown(void)
1246 {}
1247
1248 bool
ReadyForShutdown(void)1249 CNCPeerShutdown::ReadyForShutdown(void)
1250 {
1251 bool result = true;
1252 s_MapLock.Lock();
1253 ITERATE(TControlMap, it_ctrl, s_Controls) {
1254 CNCPeerControl* peer = it_ctrl->second;
1255 result &= peer->GetReadyForShutdown();
1256 }
1257 s_MapLock.Unlock();
1258 return result;
1259 }
1260
1261 END_NCBI_SCOPE
1262