1 #ifndef NETCACHE__PERIODIC_SYNC__HPP
2 #define NETCACHE__PERIODIC_SYNC__HPP
3
4 /* $Id: periodic_sync.hpp 542161 2017-07-27 14:39:32Z gouriano $
5 * ===========================================================================
6 *
7 * PUBLIC DOMAIN NOTICE
8 * National Center for Biotechnology Information
9 *
10 * This software/database is a "United States Government Work" under the
11 * terms of the United States Copyright Act. It was written as part of
12 * the author's official duties as a United States Government employee and
13 * thus cannot be copyrighted. This software/database is freely available
14 * to the public for use. The National Library of Medicine and the U.S.
15 * Government have not placed any restriction on its use or reproduction.
16 *
17 * Although all reasonable efforts have been taken to ensure the accuracy
18 * and reliability of the software and data, the NLM and the U.S.
19 * Government do not and cannot warrant the performance or results that
20 * may be obtained by using this software or data. The NLM and the U.S.
21 * Government disclaim all warranties, express or implied, including
22 * warranties of performance, merchantability or fitness for any particular
23 * purpose.
24 *
25 * Please cite the author in any work or product based on this material.
26 *
27 * ===========================================================================
28 *
29 * Authors: Pavel Ivanov
30 *
31 * File Description: Data structures and API to support blobs mirroring.
32 *
33 */
34
35 #include "sync_log.hpp"
36 #include "nc_db_info.hpp"
37 #include "nc_utils.hpp"
38 #include <set>
39
40
41 BEGIN_NCBI_SCOPE
42
43
// Forward declarations of types that this header only refers to by pointer.
class CNCActiveHandler;
class CNCPeerControl;
class SNCCacheData;
47
48
/// Result of CNCPeriodicSync::Initiate() and
/// CNCPeriodicSync::CanStartSyncCommand().
enum ESyncInitiateResult {
    eNetworkError      = 0, ///< Presumably: communication with the peer failed.
    eServerBusy        = 1, ///< Clean is in progress.
    eCrossSynced       = 2, ///< Presumably: both sides tried to start a sync at the same time.
    eProceedWithEvents = 3, ///< The sync can proceed based on the events log.
    eProceedWithBlobs  = 4, ///< The sync can proceed based on the full list of blobs.
    eUnknownServer     = 5  ///< The command came from an unknown server.
};
57
58
59 // Front end for periodic synchronization
60 // The interface is for sync initiated by another peer only!
61 // The initiative from the current peer is coming from
62 // CNCActiveSyncControl.
63 class CNCPeriodicSync
64 {
65 public:
66 static bool Initialize(void);
67 static void ReConfig(void);
68 static void ReInitialize(void);
69 static void Finalize(void);
70
71 // Initiates synchronization which came from a peer netcache.
72 // It can be rejected if another sync is in progress or there are no
73 // records available anymore
74 static ESyncInitiateResult Initiate(Uint8 server_id,
75 Uint2 slot,
76 Uint8* local_start_rec_no,
77 Uint8* remote_start_rec_no,
78 TReducedSyncEvents* events,
79 Uint8* sync_id);
80
81 // Cancels the current sync
82 static void Cancel(Uint8 server_id, Uint2 slot, Uint8 sync_id);
83
84 static ESyncInitiateResult CanStartSyncCommand(Uint8 server_id,
85 Uint2 slot,
86 bool can_abort,
87 Uint8& sync_id);
88 static void MarkCurSyncByBlobs(Uint8 server_id, Uint2 slot, Uint8 sync_id);
89 static void SyncCommandFinished(Uint8 server_id, Uint2 slot, Uint8 sync_id);
90
91 // Completes the current sync transaction
92 static void Commit(Uint8 server_id,
93 Uint2 slot,
94 Uint8 sync_id,
95 Uint8 local_synced_rec_no,
96 Uint8 remote_synced_rec_no);
97 };
98
99
100
/// Current activity on a slot.
enum ESyncStatus
{
    eIdle            = 0, ///< Nothing in progress; vacant for new work.
    eSyncInProgress  = 1, ///< Periodic sync is in progress (events based or blob-list based).
    eCleanInProgress = 2  ///< Events log cleaning is in progress.
};
107
/// Outcome of a sync step, as reported to CNCActiveSyncControl::CmdFinished()
/// and kept in CNCActiveSyncControl::m_Result / SSyncSlotSrv::result.
/// Any value other than eSynOK makes AddStartEvent()/AddStartBlob() refuse
/// further input.
enum ESyncResult {
    eSynOK           = 0, ///< Success.
    eSynCrossSynced  = 1, ///< Presumably: the peer started a sync on the same slot concurrently.
    eSynServerBusy   = 2, ///< Presumably: the peer refused because it is busy.
    eSynNetworkError = 3, ///< Presumably: communication with the peer failed.
    eSynAborted      = 4  ///< The sync was aborted.
};
115
/// Kind of work item stored in SSyncTaskInfo::task_type and
/// CNCActiveSyncControl::m_NextTask.
/// NOTE(review): the handler mapping below is inferred from the x_Do*
/// method names of CNCActiveSyncControl -- confirm in periodic_sync.cpp.
enum ESynTaskType {
    eSynNoTask         = 0, ///< No task available.
    eSynNeedFinalize   = 1, ///< Presumably handled by x_DoFinalize().
    eSynEventSend      = 2, ///< Presumably handled by x_DoEventSend().
    eSynEventGet       = 3, ///< Presumably handled by x_DoEventGet().
    eSynBlobUpdateOur  = 4, ///< Presumably handled by x_DoBlobUpdateOur().
    eSynBlobUpdatePeer = 5, ///< Presumably handled by x_DoBlobUpdatePeer().
    eSynBlobSend       = 6, ///< Presumably handled by x_DoBlobSend().
    eSynBlobGet        = 7  ///< Presumably handled by x_DoBlobGet().
};
126
/// Kind of operation reported through the `action` argument of
/// CNCActiveSyncControl::CmdFinished().
/// NOTE(review): presumably selects which m_*OK / m_*ERR counter is updated.
enum ESynActionType {
    eSynActionNone    = 0, ///< No data operation.
    eSynActionRead    = 1, ///< Read operation.
    eSynActionWrite   = 2, ///< Write operation.
    eSynActionProlong = 3, ///< Prolong operation.
    eSynActionRemove  = 4  ///< Remove operation.
};
134
// Source line number at the point of use, presumably passed as the `hint`
// argument of CNCActiveSyncControl::CmdFinished() to identify the call site.
// Must stay a macro: a function or constant would capture its own line.
#define NC_SYNC_HINT (__LINE__)
136
137 struct SSyncSlotSrv
138 {
139 CMiniMutex lock;
140 CNCPeerControl* peer;
141 bool sync_started;
142 bool is_passive;
143 bool is_by_blobs;
144 bool was_blobs_sync;
145 bool made_initial_sync;
146 Uint4 cnt_event_sync;
147 Uint4 started_cmds;
148 Uint8 next_sync_time;
149 Uint8 last_active_time;
150 Uint8 last_success_time;
151 Uint8 cur_sync_id;
152 Uint8 cnt_sync_ops;
153 ESyncResult result;
154 int hint;
155
156 SSyncSlotSrv(CNCPeerControl* peer);
157 };
158
159 typedef vector<SSyncSlotSrv*> TSlotSrvsList;
160
161
162 struct SSyncSlotData
163 {
164 CMiniMutex lock;
165 TSlotSrvsList srvs;
166 Uint2 slot;
167 Uint1 cnt_sync_started;
168 bool cleaning;
169 bool clean_required;
170
171 SSyncSlotData(Uint2 slot);
172 };
173
174 typedef map<Uint2, SSyncSlotData*> TSyncSlotsMap;
175 typedef vector<SSyncSlotData*> TSyncSlotsList;
176
177
178 typedef TSyncEvents::const_iterator TSyncEventsIt;
179 typedef TNCBlobSumList::const_iterator TBlobsListIt;
180
181 struct SSyncTaskInfo
182 {
183 ESynTaskType task_type;
184 TSyncEventsIt get_evt;
185 TSyncEventsIt send_evt;
186 TBlobsListIt local_blob;
187 TBlobsListIt remote_blob;
188 };
189
190
191 /*
manages synchronization (blob data and metadata) between NC servers
193
194 begin: x_StartScanSlots
195
196 -> x_StartScanSlots: pick first slot, goto x_CheckSlotOurSync
197
198 -> x_CheckSlotOurSync
199 when all slots processed, goto x_FinishScanSlots
200 for a given slot, check if it is time to sync; if yes, goto x_DoPeriodicSync
201 goto x_CheckSlotTheirSync
202
203 -> x_CheckSlotTheirSync
check whether a sync on this slot was started by another server and has not been active for too long
205 if yes, cancel it
206 pick next slot, goto x_CheckSlotOurSync
207
208 -> x_FinishScanSlots
209 calc when to run next time
210 goto x_StartScanSlots
211
212 -> x_DoPeriodicSync:
once again, check that the sync is not being started from both sides at the same time
if so, goto x_CheckSlotTheirSync
215 get CNCActiveHandler which sends start sync (execute SyncStart command)
216 ok ? x_WaitSyncStarted : x_FinishSync
217
218 -> x_WaitSyncStarted
219 wait for sync started (check m_StartedCmds)
220 NCActiveHandler will report command result using CmdFinished() method
221 depending on the reply, goto x_PrepareSyncByBlobs, or goto x_PrepareSyncByEvents
222
223 -> x_PrepareSyncByEvents
224 another server has sent us list of events,
now give this list to CNCSyncLog, which will give us the difference (m_Events2Get, m_Events2Send)
226 if list not empty, goto x_ExecuteSyncCommands
227
if CNCSyncLog cannot sync event lists (e.g., some of our info is lost), request blob list
229 goto x_WaitForBlobList
230
231 -> x_WaitForBlobList
232 once blob list received, goto x_PrepareSyncByBlobs
233
234 -> x_PrepareSyncByBlobs
235 re-fill list of local blobs (in given slot)
236 goto x_ExecuteSyncCommands
237
238 -> x_ExecuteSyncCommands
239 if no more commands, goto x_ExecuteFinalize
240 if network error, goto x_FinishSync
241 goto x_WaitForExecutingTasks
242
243 -> x_ExecuteFinalize
244 on success, finish sync, goto x_WaitForExecutingTasks
245 on error, goto x_FinishSync
246
247 -> x_WaitForExecutingTasks
248 woken up by CmdFinished
249 if all commands are executed ok, and need 'commit', goto x_ExecuteFinalize
250 after commit is done, goto x_FinishSync
251
252 -> x_FinishSync
253 log report
254 CommitSync
255 goto x_CheckSlotTheirSync
256
257 */
258
259 class CNCActiveSyncControl : public CSrvStatesTask<CNCActiveSyncControl>
260 {
261 public:
262 CNCActiveSyncControl(void);
263 virtual ~CNCActiveSyncControl(void);
264
265 Uint2 GetSyncSlot(void);
266 void StartResponse(Uint8 local_rec_no, Uint8 remote_rec_no, bool by_blobs);
267 bool AddStartEvent(SNCSyncEvent* evt);
268 bool AddStartBlob(const string& key, SNCBlobSummary* blob_sum);
269 bool GetNextTask(SSyncTaskInfo& task_info, bool* is_valid = nullptr);
270 void ExecuteSyncTask(const SSyncTaskInfo& task_info, CNCActiveHandler* conn);
271 void CmdFinished(ESyncResult res, ESynActionType action, CNCActiveHandler* conn, int hint);
IsStuck(void) const272 bool IsStuck(void) const {
273 return m_Stuck;
274 }
SetFirst(bool f=true)275 void SetFirst(bool f = true) {
276 m_First = f;
277 }
278 static void PrintState(TNCBufferType& sendBuff, const CTempString& mask);
279
280 private:
281 State x_StartScanSlots(void);
282 State x_CheckSlotOurSync(void);
283 State x_CheckSlotTheirSync(void);
284 State x_FinishScanSlots(void);
285 State x_DoPeriodicSync(void);
286 State x_WaitSyncStarted(void);
287 State x_PrepareSyncByEvents(void);
288 State x_WaitForBlobList(void);
289 State x_PrepareSyncByBlobs(void);
290 State x_ExecuteSyncCommands(void);
291 State x_ExecuteFinalize(void);
292 State x_WaitForExecutingTasks(void);
293 State x_FinishSync(void);
294 void x_CleanSyncObjects(void);
295 void x_CalcNextTask(void);
296 void x_DoEventSend(const SSyncTaskInfo& task_info, CNCActiveHandler* conn);
297 void x_DoEventGet(const SSyncTaskInfo& task_info, CNCActiveHandler* conn);
298 void x_DoBlobUpdateOur(const SSyncTaskInfo& task_info, CNCActiveHandler* conn);
299 void x_DoBlobUpdatePeer(const SSyncTaskInfo& task_info, CNCActiveHandler* conn);
300 void x_DoBlobSend(const SSyncTaskInfo& task_info, CNCActiveHandler* conn);
301 void x_DoBlobGet(const SSyncTaskInfo& task_info, CNCActiveHandler* conn);
302 void x_DoFinalize(CNCActiveHandler* conn);
303
304
305 SSyncSlotData* m_SlotData;
306 SSyncSlotSrv* m_SlotSrv;
307 set<SSyncSlotSrv*> m_VisitedSrv;
308 Uint8 m_SrvId;
309 Uint2 m_Slot;
310 ESyncResult m_Result;
311 int m_Hint;
312 Uint4 m_Progress;
313 CMiniMutex m_Lock;
314 Uint4 m_StartedCmds;
315
316 Uint8 m_LocalStartRecNo;
317 Uint8 m_RemoteStartRecNo;
318 Uint8 m_LocalSyncedRecNo;
319 Uint8 m_RemoteSyncedRecNo;
320 ESynTaskType m_NextTask;
321 TReducedSyncEvents m_RemoteEvents;
322 TSyncEvents m_Events2Get;
323 TSyncEvents m_Events2Send;
324 TSyncEventsIt m_CurGetEvent;
325 TSyncEventsIt m_CurSendEvent;
326 TNCBlobSumList m_LocalBlobs;
327 TNCBlobSumList m_RemoteBlobs;
328 TBlobsListIt m_CurLocalBlob;
329 TBlobsListIt m_CurRemoteBlob;
330 Uint8 m_ReadOK;
331 Uint8 m_ReadERR;
332 Uint8 m_WriteOK;
333 Uint8 m_WriteERR;
334 Uint8 m_ProlongOK;
335 Uint8 m_ProlongERR;
336 Uint8 m_DelOK;
337 Uint8 m_DelERR;
338 bool m_ForceInitSync;
339 bool m_DidSync;
340 bool m_FinishSyncCalled;
341 bool m_Stuck;
342 bool m_First;
343 bool m_NeedReply;
344 Uint8 m_MinNextTime;
345 Uint8 m_LoopStart;
346 TSyncSlotsList::const_iterator m_NextSlotIt;
347 Uint8 m_StartTime;
348 Uint8 m_CntUnfinished;
349 Uint8 m_MyTrust, m_TheirTrust;
350 set<CNCActiveHandler*> m_SyncHandlers;
351 };
352
353
354 class CNCLogCleaner : public CSrvTask
355 {
356 public:
357 CNCLogCleaner(void);
358 virtual ~CNCLogCleaner(void);
359
360 private:
361 virtual void ExecuteSlice(TSrvThreadNum thr_idx);
362
363
364 TSyncSlotsList::const_iterator m_NextSlotIt;
365 map<Uint2, Uint8> m_LastForceTime;
366 };
367
368
369
370
371 //////////////////////////////////////////////////////////////////////////
372 // Inline functions
373 //////////////////////////////////////////////////////////////////////////
374
375 inline Uint2
GetSyncSlot(void)376 CNCActiveSyncControl::GetSyncSlot(void)
377 {
378 return m_Slot;
379 }
380
381 inline void
StartResponse(Uint8 local_rec_no,Uint8 remote_rec_no,bool by_blobs)382 CNCActiveSyncControl::StartResponse(Uint8 local_rec_no,
383 Uint8 remote_rec_no,
384 bool by_blobs)
385 {
386 m_LocalStartRecNo = local_rec_no;
387 m_RemoteStartRecNo = remote_rec_no;
388 m_SlotSrv->is_by_blobs = by_blobs;
389 }
390
391 inline bool
AddStartEvent(SNCSyncEvent * evt)392 CNCActiveSyncControl::AddStartEvent(SNCSyncEvent* evt)
393 {
394 if (m_Result != eSynOK) {
395 return false;
396 }
397 if (evt->event_type == eSyncProlong)
398 m_RemoteEvents[evt->key.PackedKey()].prolong_event = evt;
399 else
400 m_RemoteEvents[evt->key.PackedKey()].wr_or_rm_event = evt;
401 return true;
402 }
403
404 inline bool
AddStartBlob(const string & key,SNCBlobSummary * blob_sum)405 CNCActiveSyncControl::AddStartBlob(const string& key, SNCBlobSummary* blob_sum)
406 {
407 if (m_Result != eSynOK) {
408 return false;
409 }
410 m_RemoteBlobs[key] = blob_sum;
411 return true;
412 }
413
414 END_NCBI_SCOPE
415
416
417 #endif /* NETCACHE__PERIODIC_SYNC__HPP */
418
419