1 #ifndef NETCACHE__PERIODIC_SYNC__HPP
2 #define NETCACHE__PERIODIC_SYNC__HPP
3 
4 /*  $Id: periodic_sync.hpp 542161 2017-07-27 14:39:32Z gouriano $
5  * ===========================================================================
6  *
7  *                            PUBLIC DOMAIN NOTICE
8  *               National Center for Biotechnology Information
9  *
10  *  This software/database is a "United States Government Work" under the
11  *  terms of the United States Copyright Act.  It was written as part of
12  *  the author's official duties as a United States Government employee and
13  *  thus cannot be copyrighted.  This software/database is freely available
14  *  to the public for use. The National Library of Medicine and the U.S.
15  *  Government have not placed any restriction on its use or reproduction.
16  *
17  *  Although all reasonable efforts have been taken to ensure the accuracy
18  *  and reliability of the software and data, the NLM and the U.S.
19  *  Government do not and cannot warrant the performance or results that
20  *  may be obtained by using this software or data. The NLM and the U.S.
21  *  Government disclaim all warranties, express or implied, including
22  *  warranties of performance, merchantability or fitness for any particular
23  *  purpose.
24  *
25  *  Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Authors: Pavel Ivanov
30  *
31  * File Description: Data structures and API to support blobs mirroring.
32  *
33  */
34 
35 #include "sync_log.hpp"
36 #include "nc_db_info.hpp"
37 #include "nc_utils.hpp"
38 #include <set>
39 
40 
41 BEGIN_NCBI_SCOPE
42 
43 
// Forward declarations. CNCActiveHandler and CNCPeerControl are only used
// through pointers in this header.
// NOTE(review): SNCCacheData is not referenced anywhere in this header;
// files including it may rely on this declaration — check before removing.
class CNCActiveHandler;
class CNCPeerControl;
class SNCCacheData;
47 
48 
/// Result returned by CNCPeriodicSync::Initiate() and
/// CNCPeriodicSync::CanStartSyncCommand().
enum ESyncInitiateResult {
    eNetworkError      = 0,
    eServerBusy        = 1,  ///< Clean (events log cleaning) is in progress
    eCrossSynced       = 2,  ///< NOTE(review): presumably the sync was started from both sides at once — confirm
    eProceedWithEvents = 3,  ///< The sync can proceed based on the events log
    eProceedWithBlobs  = 4,  ///< The sync can proceed based on the full list of blobs
    eUnknownServer     = 5   ///< The command came from an unknown server
};
57 
58 
59 // Front end for periodic synchronization
60 // The interface is for sync initiated by another peer only!
61 // The initiative from the current peer is coming from
62 // CNCActiveSyncControl.
63 class CNCPeriodicSync
64 {
65 public:
66     static bool Initialize(void);
67     static void ReConfig(void);
68     static void ReInitialize(void);
69     static void Finalize(void);
70 
71     // Initiates synchronization which came from a peer netcache.
72     // It can be rejected if another sync is in progress or there are no
73     // records available anymore
74     static ESyncInitiateResult Initiate(Uint8  server_id,
75                                         Uint2  slot,
76                                         Uint8* local_start_rec_no,
77                                         Uint8* remote_start_rec_no,
78                                         TReducedSyncEvents* events,
79                                         Uint8* sync_id);
80 
81     // Cancels the current sync
82     static void Cancel(Uint8 server_id, Uint2 slot, Uint8 sync_id);
83 
84     static ESyncInitiateResult CanStartSyncCommand(Uint8  server_id,
85                                                    Uint2  slot,
86                                                    bool   can_abort,
87                                                    Uint8& sync_id);
88     static void MarkCurSyncByBlobs(Uint8 server_id, Uint2 slot, Uint8 sync_id);
89     static void SyncCommandFinished(Uint8 server_id, Uint2 slot, Uint8 sync_id);
90 
91     // Completes the current sync transaction
92     static void Commit(Uint8 server_id,
93                        Uint2 slot,
94                        Uint8 sync_id,
95                        Uint8 local_synced_rec_no,
96                        Uint8 remote_synced_rec_no);
97 };
98 
99 
100 
/// Activity status; not referenced elsewhere in this header.
enum ESyncStatus
{
    eIdle            = 0,  ///< Nothing is running; vacant for any activity
    eSyncInProgress  = 1,  ///< Periodic sync is in progress (events based or blob lists based)
    eCleanInProgress = 2   ///< Events log cleaning is in progress
};
107 
/// Result of a sync operation. Stored in SSyncSlotSrv::result and
/// CNCActiveSyncControl::m_Result, and passed to
/// CNCActiveSyncControl::CmdFinished().
enum ESyncResult {
    eSynOK           = 0,
    eSynCrossSynced  = 1,
    eSynServerBusy   = 2,
    eSynNetworkError = 3,
    eSynAborted      = 4
};
115 
/// Kind of sync task (SSyncTaskInfo::task_type). Apart from eSynNoTask, the
/// values correspond by name to CNCActiveSyncControl's x_Do* methods
/// (eSynNeedFinalize <-> x_DoFinalize).
enum ESynTaskType {
    eSynNoTask         = 0,
    eSynNeedFinalize   = 1,
    eSynEventSend      = 2,
    eSynEventGet       = 3,
    eSynBlobUpdateOur  = 4,
    eSynBlobUpdatePeer = 5,
    eSynBlobSend       = 6,
    eSynBlobGet        = 7
};
126 
/// Action reported to CNCActiveSyncControl::CmdFinished().
/// NOTE(review): appears to pair with the m_ReadOK/m_ReadERR ... m_DelOK/
/// m_DelERR counters of CNCActiveSyncControl — confirm in the implementation.
enum ESynActionType {
    eSynActionNone    = 0,
    eSynActionRead    = 1,
    eSynActionWrite   = 2,
    eSynActionProlong = 3,
    eSynActionRemove  = 4
};
134 
// Expands to __LINE__, i.e. the source line number at the point where the
// macro is used (which is why it must stay a macro). Presumably passed as
// the `hint` argument (e.g. CNCActiveSyncControl::CmdFinished) to identify
// the reporting call site — confirm against callers.
#define NC_SYNC_HINT  __LINE__
136 
// Sync state of one slot with respect to one peer server (`peer`).
// SSyncSlotData keeps these in its `srvs` list.
// NOTE(review): most field descriptions below are inferred from names and
// the state-machine notes — confirm against the implementation.
struct SSyncSlotSrv
{
    CMiniMutex lock;            // presumably guards the fields below
    CNCPeerControl* peer;       // peer server this entry refers to
    bool    sync_started;       // presumably: a sync with this peer is running
    bool    is_passive;         // presumably: the sync was initiated by the peer
    bool    is_by_blobs;        // blob-list based sync; set by CNCActiveSyncControl::StartResponse()
    bool    was_blobs_sync;
    bool    made_initial_sync;
    Uint4   cnt_event_sync;
    Uint4   started_cmds;
    Uint8   next_sync_time;
    Uint8   last_active_time;   // presumably used to detect syncs inactive for too long
    Uint8   last_success_time;
    Uint8   cur_sync_id;        // presumably the sync_id used by CNCPeriodicSync methods
    Uint8   cnt_sync_ops;
    ESyncResult result;
    int hint;                   // presumably an NC_SYNC_HINT value recorded with `result`

    SSyncSlotSrv(CNCPeerControl* peer);
};
158 
159 typedef vector<SSyncSlotSrv*>  TSlotSrvsList;
160 
161 
// Sync bookkeeping for a single slot: the per-peer SSyncSlotSrv entries plus
// slot-wide counters/flags.
// NOTE(review): field descriptions are partly inferred from names — confirm.
struct SSyncSlotData
{
    CMiniMutex lock;            // presumably guards the fields below
    TSlotSrvsList srvs;         // per-peer sync state for this slot
    Uint2   slot;               // slot number, presumably from the constructor argument
    Uint1   cnt_sync_started;   // presumably the number of running syncs; Uint1, so at most 255
    bool    cleaning;           // presumably: events log cleaning in progress
    bool    clean_required;     // presumably: events log cleaning has been requested

    SSyncSlotData(Uint2 slot);
};
173 
174 typedef map<Uint2, SSyncSlotData*>  TSyncSlotsMap;
175 typedef vector<SSyncSlotData*>      TSyncSlotsList;
176 
177 
178 typedef TSyncEvents::const_iterator     TSyncEventsIt;
179 typedef TNCBlobSumList::const_iterator  TBlobsListIt;
180 
// One unit of sync work: filled in by CNCActiveSyncControl::GetNextTask()
// (which takes it by non-const reference) and passed to
// CNCActiveSyncControl::ExecuteSyncTask().
// NOTE(review): presumably only the iterators relevant to task_type are
// meaningful — confirm.
struct SSyncTaskInfo
{
    ESynTaskType task_type;
    TSyncEventsIt get_evt;      // event to fetch; presumably points into m_Events2Get
    TSyncEventsIt send_evt;     // event to send; presumably points into m_Events2Send
    TBlobsListIt local_blob;    // presumably points into m_LocalBlobs
    TBlobsListIt remote_blob;   // presumably points into m_RemoteBlobs
};
189 
190 
191 /*
    Manages synchronization (blob data and metadata) between NC servers.
193 
194     begin: x_StartScanSlots
195 
196     -> x_StartScanSlots: pick first slot, goto  x_CheckSlotOurSync
197 
198     -> x_CheckSlotOurSync
199         when all slots processed, goto x_FinishScanSlots
200         for a given slot, check if it is time to sync; if yes, goto x_DoPeriodicSync
201         goto x_CheckSlotTheirSync
202 
203     -> x_CheckSlotTheirSync
        check whether a sync on this slot, started by another server, has been inactive for too long
205             if yes, cancel it
206         pick next slot, goto x_CheckSlotOurSync
207 
208     -> x_FinishScanSlots
209             calc when to run next time
210             goto x_StartScanSlots
211 
212     ->  x_DoPeriodicSync:
            check once again that the sync is not being started from both sides at the same time
                if it is, goto x_CheckSlotTheirSync
215             get CNCActiveHandler which sends start sync (execute SyncStart command)
216                 ok ? x_WaitSyncStarted : x_FinishSync
217 
218     -> x_WaitSyncStarted
            wait until the sync has started (check m_StartedCmds)
                CNCActiveHandler reports the command result via the CmdFinished() method
221             depending on the reply, goto x_PrepareSyncByBlobs, or goto x_PrepareSyncByEvents
222 
223     -> x_PrepareSyncByEvents
            the other server has sent us its list of events;
            pass this list to CNCSyncLog, which gives us the difference (m_Events2Get, m_Events2Send)
            if the list is not empty, goto x_ExecuteSyncCommands
227 
            if CNCSyncLog cannot sync the event lists (e.g., some of our info is lost), request the blob list
229                 goto x_WaitForBlobList
230 
231     -> x_WaitForBlobList
232             once blob list received, goto x_PrepareSyncByBlobs
233 
234     -> x_PrepareSyncByBlobs
            re-fill the list of local blobs (in the given slot)
236             goto x_ExecuteSyncCommands
237 
238     -> x_ExecuteSyncCommands
239             if no more commands, goto x_ExecuteFinalize
240             if network error, goto x_FinishSync
241             goto x_WaitForExecutingTasks
242 
243     -> x_ExecuteFinalize
244             on success, finish sync, goto  x_WaitForExecutingTasks
245             on error,  goto x_FinishSync
246 
247     -> x_WaitForExecutingTasks
248             woken up by CmdFinished
249             if all commands are executed ok, and need 'commit', goto x_ExecuteFinalize
250             after commit is done, goto x_FinishSync
251 
252     -> x_FinishSync
253             log report
254             CommitSync
255             goto x_CheckSlotTheirSync
256 
257 */
258 
// Task driving syncs initiated by this server. It is a CSrvStatesTask whose
// states are the x_* methods returning State; their sequence is described
// in the comment block preceding this class.
class CNCActiveSyncControl : public CSrvStatesTask<CNCActiveSyncControl>
{
public:
    CNCActiveSyncControl(void);
    virtual ~CNCActiveSyncControl(void);

    // Returns m_Slot.
    Uint2 GetSyncSlot(void);
    // Records the start reply: starting record numbers on both sides, and
    // whether the sync proceeds by blob lists (stored in *m_SlotSrv).
    void StartResponse(Uint8 local_rec_no, Uint8 remote_rec_no, bool by_blobs);
    // Add one entry of the peer's initial event list / blob list. Both
    // return false and store nothing once m_Result is not eSynOK.
    bool AddStartEvent(SNCSyncEvent* evt);
    bool AddStartBlob(const string& key, SNCBlobSummary* blob_sum);
    // Fills task_info with the next task to run.
    // NOTE(review): meaning of the return value and of *is_valid is defined
    // in the implementation — confirm before relying on it.
    bool GetNextTask(SSyncTaskInfo& task_info, bool* is_valid = nullptr);
    // Runs one task over connection conn; presumably dispatches on
    // task_info.task_type to the matching x_Do* method.
    void ExecuteSyncTask(const SSyncTaskInfo& task_info, CNCActiveHandler* conn);
    // Called by CNCActiveHandler to report a command result (per the state
    // notes above the class); hint is presumably an NC_SYNC_HINT value.
    void CmdFinished(ESyncResult res, ESynActionType action, CNCActiveHandler* conn, int hint);
    bool IsStuck(void) const {
        return m_Stuck;
    }
    void SetFirst(bool f = true) {
        m_First = f;
    }
    // Writes state information into sendBuff.
    // NOTE(review): mask presumably filters the output — confirm.
    static void PrintState(TNCBufferType& sendBuff, const CTempString& mask);

private:
    // States of the state machine described above the class.
    State x_StartScanSlots(void);
    State x_CheckSlotOurSync(void);
    State x_CheckSlotTheirSync(void);
    State x_FinishScanSlots(void);
    State x_DoPeriodicSync(void);
    State x_WaitSyncStarted(void);
    State x_PrepareSyncByEvents(void);
    State x_WaitForBlobList(void);
    State x_PrepareSyncByBlobs(void);
    State x_ExecuteSyncCommands(void);
    State x_ExecuteFinalize(void);
    State x_WaitForExecutingTasks(void);
    State x_FinishSync(void);
    void x_CleanSyncObjects(void);
    void x_CalcNextTask(void);
    // Task executors; names mirror the ESynTaskType values.
    void x_DoEventSend(const SSyncTaskInfo& task_info, CNCActiveHandler* conn);
    void x_DoEventGet(const SSyncTaskInfo& task_info, CNCActiveHandler* conn);
    void x_DoBlobUpdateOur(const SSyncTaskInfo& task_info, CNCActiveHandler* conn);
    void x_DoBlobUpdatePeer(const SSyncTaskInfo& task_info, CNCActiveHandler* conn);
    void x_DoBlobSend(const SSyncTaskInfo& task_info, CNCActiveHandler* conn);
    void x_DoBlobGet(const SSyncTaskInfo& task_info, CNCActiveHandler* conn);
    void x_DoFinalize(CNCActiveHandler* conn);


    // Slot and per-peer records of the current sync.
    SSyncSlotData*  m_SlotData;
    SSyncSlotSrv*   m_SlotSrv;             // is_by_blobs written by StartResponse()
    set<SSyncSlotSrv*> m_VisitedSrv;       // presumably peers already visited in this scan — confirm
    Uint8       m_SrvId;
    Uint2       m_Slot;                    // returned by GetSyncSlot()
    ESyncResult m_Result;                  // AddStartEvent/AddStartBlob reject input unless eSynOK
    int         m_Hint;
    Uint4       m_Progress;
    CMiniMutex  m_Lock;
    Uint4       m_StartedCmds;             // checked by x_WaitSyncStarted (per state notes)

    Uint8 m_LocalStartRecNo;               // set by StartResponse()
    Uint8 m_RemoteStartRecNo;              // set by StartResponse()
    Uint8 m_LocalSyncedRecNo;
    Uint8 m_RemoteSyncedRecNo;
    ESynTaskType m_NextTask;
    TReducedSyncEvents m_RemoteEvents;     // filled by AddStartEvent()
    TSyncEvents    m_Events2Get;           // difference obtained from CNCSyncLog (per state notes)
    TSyncEvents    m_Events2Send;          // difference obtained from CNCSyncLog (per state notes)
    TSyncEventsIt  m_CurGetEvent;
    TSyncEventsIt  m_CurSendEvent;
    TNCBlobSumList m_LocalBlobs;
    TNCBlobSumList m_RemoteBlobs;          // filled by AddStartBlob()
    TBlobsListIt   m_CurLocalBlob;
    TBlobsListIt   m_CurRemoteBlob;
    // Success/error counters; presumably updated per ESynActionType — confirm.
    Uint8   m_ReadOK;
    Uint8   m_ReadERR;
    Uint8   m_WriteOK;
    Uint8   m_WriteERR;
    Uint8   m_ProlongOK;
    Uint8   m_ProlongERR;
    Uint8   m_DelOK;
    Uint8   m_DelERR;
    bool m_ForceInitSync;
    bool m_DidSync;
    bool m_FinishSyncCalled;
    bool m_Stuck;                          // returned by IsStuck()
    bool m_First;                          // set by SetFirst()
    bool m_NeedReply;
    Uint8 m_MinNextTime;
    Uint8 m_LoopStart;
    TSyncSlotsList::const_iterator m_NextSlotIt;
    Uint8 m_StartTime;
    Uint8 m_CntUnfinished;
    Uint8 m_MyTrust, m_TheirTrust;
    set<CNCActiveHandler*> m_SyncHandlers;
};
352 
353 
// Server task (a CSrvTask) whose work is done in ExecuteSlice().
// NOTE(review): judging by its name and the eCleanInProgress /
// SSyncSlotData::cleaning flags, presumably the task that cleans the sync
// events log slot by slot — confirm in the implementation.
class CNCLogCleaner : public CSrvTask
{
public:
    CNCLogCleaner(void);
    virtual ~CNCLogCleaner(void);

private:
    // Presumably runs one slice of work on server thread thr_idx.
    // NOTE(review): appears to override CSrvTask::ExecuteSlice; consider
    // marking it `override` once the base signature is confirmed.
    virtual void ExecuteSlice(TSrvThreadNum thr_idx);


    TSyncSlotsList::const_iterator m_NextSlotIt;  // presumably the next slot to process
    map<Uint2, Uint8> m_LastForceTime;            // keyed by Uint2 (presumably slot); presumably last forced-clean time
};
367 
368 
369 
370 
371 //////////////////////////////////////////////////////////////////////////
372 //  Inline functions
373 //////////////////////////////////////////////////////////////////////////
374 
375 inline Uint2
GetSyncSlot(void)376 CNCActiveSyncControl::GetSyncSlot(void)
377 {
378     return m_Slot;
379 }
380 
381 inline void
StartResponse(Uint8 local_rec_no,Uint8 remote_rec_no,bool by_blobs)382 CNCActiveSyncControl::StartResponse(Uint8 local_rec_no,
383                                     Uint8 remote_rec_no,
384                                     bool by_blobs)
385 {
386     m_LocalStartRecNo = local_rec_no;
387     m_RemoteStartRecNo = remote_rec_no;
388     m_SlotSrv->is_by_blobs = by_blobs;
389 }
390 
391 inline bool
AddStartEvent(SNCSyncEvent * evt)392 CNCActiveSyncControl::AddStartEvent(SNCSyncEvent* evt)
393 {
394     if (m_Result != eSynOK) {
395         return false;
396     }
397     if (evt->event_type == eSyncProlong)
398         m_RemoteEvents[evt->key.PackedKey()].prolong_event = evt;
399     else
400         m_RemoteEvents[evt->key.PackedKey()].wr_or_rm_event = evt;
401     return true;
402 }
403 
404 inline bool
AddStartBlob(const string & key,SNCBlobSummary * blob_sum)405 CNCActiveSyncControl::AddStartBlob(const string& key, SNCBlobSummary* blob_sum)
406 {
407     if (m_Result != eSynOK) {
408         return false;
409     }
410     m_RemoteBlobs[key] = blob_sum;
411     return true;
412 }
413 
414 END_NCBI_SCOPE
415 
416 
417 #endif /* NETCACHE__PERIODIC_SYNC__HPP */
418 
419