1 #ifndef NETCACHE__PEER_CONTROL__HPP
2 #define NETCACHE__PEER_CONTROL__HPP
3 /*  $Id: peer_control.hpp 545049 2017-08-31 13:38:24Z gouriano $
4  * ===========================================================================
5  *
6  *                            PUBLIC DOMAIN NOTICE
7  *               National Center for Biotechnology Information
8  *
9  *  This software/database is a "United States Government Work" under the
10  *  terms of the United States Copyright Act.  It was written as part of
11  *  the author's official duties as a United States Government employee and
12  *  thus cannot be copyrighted.  This software/database is freely available
13  *  to the public for use. The National Library of Medicine and the U.S.
14  *  Government have not placed any restriction on its use or reproduction.
15  *
16  *  Although all reasonable efforts have been taken to ensure the accuracy
17  *  and reliability of the software and data, the NLM and the U.S.
18  *  Government do not and cannot warrant the performance or results that
19  *  may be obtained by using this software or data. The NLM and the U.S.
20  *  Government disclaim all warranties, express or implied, including
21  *  warranties of performance, merchantability or fitness for any particular
22  *  purpose.
23  *
24  *  Please cite the author in any work or product based on this material.
25  *
26  * ===========================================================================
27  *
28  * Authors:  Pavel Ivanov
29  *
30  * File Description: Network cache daemon
31  *
32  */
33 
34 
35 #include "sync_log.hpp"
36 #include "nc_db_info.hpp"
37 #include "nc_stat.hpp"
38 #include <set>
39 
40 
41 BEGIN_NCBI_SCOPE
42 
43 
44 class CNCActiveHandler;
45 class CNCActiveClientHub;
46 class CNCBlobAccessor;
47 class CNCActiveSyncControl;
48 
49 struct SActiveList_tag;
50 typedef intr::list_base_hook<intr::tag<SActiveList_tag> >   TActiveListHook;
51 typedef intr::list<CNCActiveHandler,
52                    intr::base_hook<TActiveListHook>,
53                    intr::constant_time_size<false> >        TNCPeerConnsList;
54 typedef list<CNCActiveClientHub*>   TNCClientHubsList;
55 typedef list<CNCActiveSyncControl*> TNCActiveSyncList;
56 typedef TNCActiveSyncList::iterator TNCActiveSyncListIt;
57 
58 
59 struct SNCMirrorEvent
60 {
61     ENCSyncEvent evt_type;
62     Uint2   slot;
63     CNCBlobKeyLight  key;
64     Uint8   orig_rec_no;
65 
66 
SNCMirrorEventSNCMirrorEvent67     SNCMirrorEvent(ENCSyncEvent typ,
68                    Uint2 slot_,
69                    const CNCBlobKeyLight& key_,
70                    Uint8 rec_no)
71         : evt_type(typ),
72           slot(slot_),
73           key(key_),
74           orig_rec_no(rec_no)
75     {}
76 };
77 
78 struct SNCMirrorProlong : public SNCMirrorEvent
79 {
80     Uint8 orig_time;
81     SNCBlobSummary blob_sum;
82 
83 
84     SNCMirrorProlong(ENCSyncEvent typ,
85                      Uint2 slot_,
86                      const CNCBlobKeyLight& key_,
87                      Uint8 rec_no,
88                      Uint8 tm,
89                      const CNCBlobAccessor* accessor);
90 };
91 
92 typedef list<SNCMirrorEvent*>   TNCMirrorQueue;
93 
94 
95 
96 class CNCPeerControl : public CSrvTask
97 {
98 public:
99     static bool Initialize(void);
100     static void Finalize(void);
101 
102     static Uint4 FindIPbyAlias(Uint4 alias);
103     static Uint4 FindIPbyName(const string& alias);
104     static bool  HasPeerInThrottle(void);
105 
106     static void SetServersForInitSync(Uint4 cnt_servers);
107     static void ResetServersForInitSync(void);
108     static void ReconfServersForInitSync(Uint4 cnt_servers);
109     static bool HasServersForInitSync(void);
110     static void PrintState(CSrvSocketTask& task);
111 
112     static CNCPeerControl* Peer(Uint8 srv_id);
113     static string GetPeerNameOrEmpty(Uint8 srv_id);
114 
115     static void MirrorUpdate(const CNCBlobKeyLight& key,
116                               Uint2 slot,
117                               Uint8 update_time);
118     static void MirrorRemove(const CNCBlobKeyLight& key,
119                               Uint2 slot,
120                               Uint8 update_time);
121     static void MirrorWrite(const CNCBlobKeyLight& key,
122                             Uint2 slot,
123                             Uint8 orig_rec_no,
124                             Uint8 size,
125                             const TServersList& mirrors_done);
126     static void MirrorProlong(const CNCBlobKeyLight& key,
127                               Uint2 slot,
128                               Uint8 orig_rec_no,
129                               Uint8 orig_time,
130                               const CNCBlobAccessor* accessor);
131     static Uint8 GetMirrorQueueSize(void);
132     static Uint8 GetMirrorQueueSize(Uint8 srv_id);
133     static void ReadCurState(SNCStateStat& state);
134 
135     void PeerHandshake(void);
136     void SetSlotsForInitSync(Uint2 cnt_slots);
137     void ResetSlotsForInitSync();
138     void ReconfSlotsForInitSync(Uint2 cnt_slots);
139     void AddInitiallySyncedSlot(void);
140     void RegisterSyncStop(bool is_passive,
141                           Uint8& next_sync_time,
142                           Uint8 next_sync_delay);
143 #ifdef _DEBUG
144     void RegisterSyncStat(bool is_passive, bool is_by_blobs, int result,  int hint);
145     static void PrintSyncStat(CSrvSocketTask& task);
146 #endif
147     Uint8 GetNextSyncTime(void);
148 
149     Uint8 GetSrvId(void) const;
150     CNCActiveHandler* GetBGConn(bool silent = false);
151     bool StartActiveSync(void);
152     bool AddSyncControl(CNCActiveSyncControl* sync_ctrl);
153     void RemoveSyncControl(CNCActiveSyncControl* sync_ctrl);
154     bool FinishSync(CNCActiveSyncControl* sync_ctrl);
155 
156     void RegisterConnError(void);
157     void RegisterConnSuccess(void);
158     void ConnOk(void);
159     void AssignClientConn(CNCActiveClientHub* hub);
160     CNCActiveHandler* GetPooledConn(void);
161     bool CreateNewSocket(CNCActiveHandler* conn);
162     void PutConnToPool(CNCActiveHandler* conn);
163     void ReleaseConn(CNCActiveHandler* conn);
164 
165     bool GetReadyForShutdown(void);
166 
167 
168     static CAtomicCounter   sm_TotalCopyRequests;
169     static CAtomicCounter   sm_CopyReqsRejected;
170 
171     void AbortInitialSync(void);
172     void SetHostProtocol(Uint8 ver);
173     void SetTrustLevel(Uint8 trust);
174     Uint8 GetTrustLevel(void) const;
175     Uint8 GetRawTrustLevel(void) const;
176 
177     bool AcceptsSyncUpdate(void) const;
178     bool AcceptsSyncRemove(void) const;
179     bool AcceptsBlobKey(const CNCBlobKeyLight& key) const;
180     bool AcceptsBList(void) const;
181     bool AcceptsBList2(void) const;
182     bool AcceptsUserFlags(void) const;
183     bool AcceptsPurge2(void) const;
184 
185 private:
186     CNCPeerControl(Uint8 srv_id);
187     CNCPeerControl(const CNCPeerControl&);
188     CNCPeerControl& operator= (const CNCPeerControl&);
189 
190     virtual void ExecuteSlice(TSrvThreadNum thr_num);
191 
192     void x_SrvInitiallySynced(bool succeeded);
193     void x_SlotsInitiallySynced(Uint2 cnt_slots, bool aborted=false);
194     CNCActiveHandler* x_GetPooledConnImpl(void);
195     bool x_ReserveBGConn(void);
196     bool x_ReserveBGConnNow(void);
197     void x_UnreserveBGConn(void);
198     void x_IncBGConns(void);
199     void x_DecBGConns(void);
200     void x_DecBGConns(CNCActiveHandler* conn);
201     void x_DecActiveConns(void);
202     CNCActiveHandler* x_CreateNewConn(bool for_bg);
203     bool x_AssignClientConn(CNCActiveClientHub* hub, CNCActiveHandler* conn);
204     CNCActiveHandler* x_GetBGConnImpl(void);
205     bool x_DoReleaseConn(CNCActiveHandler* conn);
206     void x_DeleteMirrorEvent(SNCMirrorEvent* event);
207     void x_ProcessUpdateEvent(SNCMirrorEvent* event);
208     void x_ProcessMirrorEvent(CNCActiveHandler* conn, SNCMirrorEvent* event);
209     void x_AddMirrorEvent(SNCMirrorEvent* event, Uint8 size);
210     void x_UpdateHasTasks(void);
211 
212 
213     Uint8 m_SrvId;
214     Uint4 m_HostIP;
215     string m_HostIPname;
216     string m_Hostname;
217     Uint4 m_HostAlias;
218     Uint8 m_HostProtocol;
219     Uint8 m_TrustLevel;
220     CMiniMutex m_ObjLock;
221     TNCPeerConnsList m_PooledConns;
222     TNCPeerConnsList m_BusyConns;
223     Uint8 m_FirstNWErrTime;
224     Uint8 m_ThrottleStart;
225     Uint8 m_NextSyncTime;
226     int m_ActiveConns;
227     int m_BGConns;
228     Uint2 m_SlotsToInitSync;
229     Uint2 m_OrigSlotsToInitSync;
230     Uint2 m_CntActiveSyncs;
231     Uint2 m_CntNWErrors;
232     Uint2 m_CntNWThrottles;
233     bool  m_InThrottle, m_MaybeThrottle;
234     bool  m_HasBGTasks;
235     bool  m_InitiallySynced;
236     TNCClientHubsList m_Clients;
237     TNCMirrorQueue m_SmallMirror;
238     TNCMirrorQueue m_BigMirror;
239     TNCActiveSyncList m_SyncList;
240     TNCActiveSyncListIt m_NextTaskSync;
241 #ifdef _DEBUG
242     map<size_t, size_t> m_SyncStat;
243 #endif
244 };
245 
246 
247 class CNCPeerShutdown : public CSrvShutdownCallback
248 {
249 public:
250     CNCPeerShutdown(void);
251     virtual ~CNCPeerShutdown(void);
252 
253     virtual bool ReadyForShutdown(void);
254 };
255 
256 
257 
258 //////////////////////////////////////////////////////////////////////////
259 // Inline functions
260 //////////////////////////////////////////////////////////////////////////
261 
262 inline void
SetSlotsForInitSync(Uint2 cnt_slots)263 CNCPeerControl::SetSlotsForInitSync(Uint2 cnt_slots)
264 {
265     m_OrigSlotsToInitSync = m_SlotsToInitSync = cnt_slots;
266     m_InitiallySynced = m_OrigSlotsToInitSync == 0;
267 }
268 
269 inline void
ResetSlotsForInitSync()270 CNCPeerControl::ResetSlotsForInitSync()
271 {
272     SetSlotsForInitSync(m_OrigSlotsToInitSync);
273 }
274 
275 inline void
ReconfSlotsForInitSync(Uint2 cnt_slots)276 CNCPeerControl::ReconfSlotsForInitSync(Uint2 cnt_slots)
277 {
278     m_OrigSlotsToInitSync = cnt_slots;
279 }
280 
281 inline Uint8
GetSrvId(void) const282 CNCPeerControl::GetSrvId(void) const
283 {
284     return m_SrvId;
285 }
286 
287 inline Uint8
GetNextSyncTime(void)288 CNCPeerControl::GetNextSyncTime(void)
289 {
290     return m_NextSyncTime;
291 }
292 
293 inline bool
AcceptsSyncUpdate(void) const294 CNCPeerControl::AcceptsSyncUpdate(void) const
295 {
296     return m_HostProtocol >= 60700;
297 }
298 inline bool
AcceptsSyncRemove(void) const299 CNCPeerControl::AcceptsSyncRemove(void) const
300 {
301     return m_HostProtocol >= 60806;
302 }
303 inline bool
AcceptsBlobKey(const CNCBlobKeyLight & key) const304 CNCPeerControl::AcceptsBlobKey(const CNCBlobKeyLight& key) const
305 {
306     return m_HostProtocol >= 60700 || key.KeyVersion() < 3;
307 }
308 
309 inline bool
AcceptsBList(void) const310 CNCPeerControl::AcceptsBList(void) const
311 {
312     return m_HostProtocol >= 60900;
313 }
314 
315 inline bool
AcceptsBList2(void) const316 CNCPeerControl::AcceptsBList2(void) const
317 {
318     return m_HostProtocol > 61100;
319 }
320 
321 inline bool
AcceptsUserFlags(void) const322 CNCPeerControl::AcceptsUserFlags(void) const
323 {
324     return m_HostProtocol >= 61100;
325 }
326 
327 inline bool
AcceptsPurge2(void) const328 CNCPeerControl::AcceptsPurge2(void) const
329 {
330     return m_HostProtocol >= 61107;
331 }
332 
333 inline void
ConnOk(void)334 CNCPeerControl::ConnOk(void)
335 {
336     m_MaybeThrottle = false;
337 }
338 inline void
SetTrustLevel(Uint8 trust)339 CNCPeerControl::SetTrustLevel(Uint8 trust) {
340     m_TrustLevel = trust;
341 }
342 inline Uint8
GetTrustLevel(void) const343 CNCPeerControl::GetTrustLevel(void) const {
344     return m_TrustLevel & 0xF;
345 }
346 inline Uint8
GetRawTrustLevel(void) const347 CNCPeerControl::GetRawTrustLevel(void) const
348 {
349     return m_TrustLevel;
350 }
351 
352 
353 END_NCBI_SCOPE
354 
355 #endif /* NETCACHE__PEER_CONTROL__HPP */
356