1 #ifndef NETCACHE__ACTIVE_HANDLER__HPP
2 #define NETCACHE__ACTIVE_HANDLER__HPP
3 /* $Id: active_handler.hpp 524617 2017-01-12 17:33:22Z gouriano $
4 * ===========================================================================
5 *
6 * PUBLIC DOMAIN NOTICE
7 * National Center for Biotechnology Information
8 *
9 * This software/database is a "United States Government Work" under the
10 * terms of the United States Copyright Act. It was written as part of
11 * the author's official duties as a United States Government employee and
12 * thus cannot be copyrighted. This software/database is freely available
13 * to the public for use. The National Library of Medicine and the U.S.
14 * Government have not placed any restriction on its use or reproduction.
15 *
16 * Although all reasonable efforts have been taken to ensure the accuracy
17 * and reliability of the software and data, the NLM and the U.S.
18 * Government do not and cannot warrant the performance or results that
19 * may be obtained by using this software or data. The NLM and the U.S.
20 * Government disclaim all warranties, express or implied, including
21 * warranties of performance, merchantability or fitness for any particular
22 * purpose.
23 *
24 * Please cite the author in any work or product based on this material.
25 *
26 * ===========================================================================
27 *
28 * Authors: Pavel Ivanov
29 *
30 * File Description: Network cache daemon
31 *
32 */
33
34
35 #include "nc_utils.hpp"
36 #include "periodic_sync.hpp"
37 #include "distribution_conf.hpp"
38 #include "message_handler.hpp"
39
40
41 BEGIN_NCBI_SCOPE
42
43
// Forward declarations of types that this header only needs by name
// (used through pointers/references or declared as friends below).

// classes
class CNCActiveHandler;
class CNCActiveHandler_Proxy;
class CNCActiveSyncControl;
class CNCBlobAccessor;
class CNCMessageHandler;
class CNCPeerControl;

// plain structs
struct SNCBlobSummary;
struct SNCSyncEvent;
52
53
/// Status values reported by CNCActiveClientHub.
/// A freshly constructed hub starts in eNCHubWaitForConn.
/// The numeric values are spelled out explicitly; they are the same values
/// the compiler would assign implicitly.
enum ENCClientHubStatus {
    eNCHubError         = 0,
    eNCHubWaitForConn   = 1,
    eNCHubConnReady     = 2,
    eNCHubCmdInProgress = 3,
    eNCHubSuccess       = 4,
    eNCHubDataReady     = 5
};
62
63 class CNCActiveClientHub : public CSrvRCUUser
64 {
65 public:
66 static CNCActiveClientHub* Create(Uint8 srv_id, CNCMessageHandler* client);
67
68 ENCClientHubStatus GetStatus(void);
69 const string& GetErrMsg(void);
70 CNCActiveHandler* GetHandler(void);
71 void Release(void);
72 string GetFullPeerName(void);
73
74 public:
75 void SetStatus(ENCClientHubStatus status);
76 void SetErrMsg(const string& msg);
77 void SetHandler(CNCActiveHandler* handler);
78 CNCMessageHandler* GetClient(void);
79
80 private:
81 CNCActiveClientHub(CNCMessageHandler* client);
82
83 virtual void ExecuteRCU(void);
84
85
86 CNCMessageHandler* m_Client;
87 CNCActiveHandler* m_Handler;
88 ENCClientHubStatus m_Status;
89 string m_ErrMsg;
90 };
91
92
93 struct SActiveList_tag;
94 typedef intr::list_base_hook<intr::tag<SActiveList_tag> > TActiveListHook;
95
96
/*
    Sends commands to other instances of NetCache and processes the results.
    Created by CNCPeerControl;
    used by CNCPeerControl, CNCActiveSyncControl and CNCMessageHandler.

    begin: x_InvalidState
    -> x_InvalidState: abort

    The object becomes usable on SetClientHub (the state becomes
    x_WaitClientRelease), or when somebody (the peer control) calls CopyPut,
    CopyProlong or one of the other methods; the handler then sets the proper
    state for itself.

    All processing goes approximately like this:
    somebody calls a method (e.g. ProxyRemove):
        initial preparation;
        create a command, go to x_SendCmdToExecute
            check for errors, send the command, wait for the reply (x_WaitOneLineAnswer)
            check for errors, read the reply
        do the command, go to x_FinishCommand
            clean up resources, x_WaitClientRelease, x_PutSelfToPool (waiting for the next command)
*/
119 class CNCActiveHandler : public CSrvStatesTask<CNCActiveHandler>,
120 public TActiveListHook
121 {
122 public:
123 static void Initialize(void);
124
125 void ClientReleased(void);
126 void Release(void);
127
128 void SearchMeta(CRequestContext* cmd_ctx, const CNCBlobKey& key);
129 bool IsBlobExists(void);
130 const SNCBlobSummary& GetBlobSummary(void);
131
132 void AskPeerVersion(void);
133 void CopyPurge(CRequestContext* cmd_ctx, const CNCBlobKeyLight& key,
134 Uint8 when);
135 /*
136 request meta info, x_WaitForMetaInfo -> x_SendCopyPutCmd ->
137 on error, goto x_FinishCommand
138 create command, goto x_SendCmdToExecute
139 -> x_SendCmdToExecute
140 on error, goto x_CloseCmdAndConn, or x_ConnClosedReplaceable
141 write cmd, goto x_WaitOneLineAnswer -> x_ReadCopyPut
142 -> x_ReadCopyPut
143 on error, goto x_FinishCommand, or x_FakeWritingBlob
144 goto x_WriteBlobData -> x_FinishWritingBlob ->
145 request confirmation -> x_WaitOneLineAnswer
146 */
147 void CopyUpdate(const CNCBlobKeyLight& key, Uint8 create_time);
148 void CopyRemove(const CNCBlobKeyLight& key, Uint8 create_time);
149 void CopyPut(CRequestContext* cmd_ctx,
150 const CNCBlobKeyLight& key,
151 Uint2 slot,
152 Uint8 orig_rec_no);
153 // x_SendCmdToExecute -> x_WaitOneLineAnswer -> x_ReadCopyProlong -> x_FinishCommand
154 void CopyProlong(const CNCBlobKeyLight& key,
155 Uint2 slot,
156 Uint8 orig_rec_no,
157 Uint8 orig_time,
158 const SNCBlobSummary& blob_sum);
159 void ProxyRemove(CRequestContext* cmd_ctx,
160 const CNCBlobKey& key,
161 const string& password,
162 int version,
163 Uint1 quorum);
164 void ProxyHasBlob(CRequestContext* cmd_ctx,
165 const CNCBlobKey& key,
166 const string& password,
167 Uint1 quorum);
168 void ProxyGetSize(CRequestContext* cmd_ctx,
169 const CNCBlobKey& key,
170 const string& password,
171 int version,
172 Uint1 quorum,
173 bool search,
174 bool force_local);
175 void ProxySetValid(CRequestContext* cmd_ctx,
176 const CNCBlobKey& key,
177 const string& password,
178 int version);
179 void ProxyRead(CRequestContext* cmd_ctx,
180 const CNCBlobKey& key,
181 const string& password,
182 int version,
183 Uint8 start_pos,
184 Uint8 size,
185 Uint1 quorum,
186 bool search,
187 bool force_local,
188 Uint8 age);
189 void ProxyReadLast(CRequestContext* cmd_ctx,
190 const CNCBlobKey& key,
191 const string& password,
192 Uint8 start_pos,
193 Uint8 size,
194 Uint1 quorum,
195 bool search,
196 bool force_local,
197 Uint8 age);
198 void ProxyGetMeta(CRequestContext* cmd_ctx,
199 const CNCBlobKey& key,
200 Uint1 quorum,
201 bool force_local,
202 int http);
203 void ProxyWrite(CRequestContext* cmd_ctx,
204 const CNCBlobKey& key,
205 const string& password,
206 int version,
207 Uint4 ttl,
208 Uint1 quorum,
209 TNCUserFlags flags);
210 void ProxyProlong(CRequestContext* cmd_ctx,
211 const CNCBlobKey& key,
212 const string& password,
213 unsigned int add_time,
214 Uint1 quorum,
215 bool search,
216 bool force_local);
217 void ProxyBList(CRequestContext* cmd_ctx,
218 const CNCBlobKey& key,
219 bool force_local,
220 SNCBlobFilter* filters);
221
222 CSrvSocketTask* GetSocket(void);
223 CTempString GetCmdResponse(void);
224 bool GotClientResponse(void);
225
226 void SyncStart(CNCActiveSyncControl* ctrl, Uint8 local_rec_no, Uint8 remote_rec_no);
227 void SyncBlobsList(CNCActiveSyncControl* ctrl);
228 void SyncSend(CNCActiveSyncControl* ctrl, SNCSyncEvent* event);
229 void SyncSend(CNCActiveSyncControl* ctrl, const CNCBlobKeyLight& key);
230 void SyncRead(CNCActiveSyncControl* ctrl, SNCSyncEvent* event);
231 void SyncRead(CNCActiveSyncControl* ctrl,
232 const CNCBlobKeyLight& key,
233 Uint8 create_time);
234 void SyncProlongPeer(CNCActiveSyncControl* ctrl, SNCSyncEvent* event);
235 void SyncProlongPeer(CNCActiveSyncControl* ctrl,
236 const CNCBlobKeyLight& key,
237 const SNCBlobSummary& blob_sum);
238 void SyncProlongOur(CNCActiveSyncControl* ctrl, SNCSyncEvent* event);
239 void SyncProlongOur(CNCActiveSyncControl* ctrl,
240 const string& key,
241 const SNCBlobSummary& blob_sum);
242 void SyncCancel(CNCActiveSyncControl* ctrl);
243 void SyncCommit(CNCActiveSyncControl* ctrl,
244 Uint8 local_rec_no,
245 Uint8 remote_rec_no);
246
GetPeer(void) const247 const CNCPeerControl* GetPeer(void) const {
248 return m_Peer;
249 }
250
251 public:
252 CNCActiveHandler(Uint8 srv_id, CNCPeerControl* peer);
253 virtual ~CNCActiveHandler(void);
254
255 void SetProxy(CNCActiveHandler_Proxy* proxy);
256 void SetClientHub(CNCActiveClientHub* hub);
257 void SetReservedForBG(bool value);
258 bool IsReservedForBG(void);
259
260 void CloseForShutdown(void);
261 void CheckCommandTimeout(void);
262
GetSrvId(void) const263 Uint8 GetSrvId(void) const {
264 return m_SrvId;
265 }
266
267 private:
268 friend class CNCActiveClientHub;
269 friend class CNCActiveHandler_Proxy;
270
271
272 enum ECommands {
273 eSearchMeta = 1,
274 eCopyPut,
275 eCopyProlong,
276 eNeedOnlyConfirm,
277 eReadData,
278 eWriteData,
279 eSyncStart,
280 eSyncBList,
281 eSyncGet,
282 eSyncProlongPeer,
283 eSyncProInfo,
284 ePeerVersion
285 };
286
287
288 State x_MayDeleteThis(void);
289 State x_ReplaceServerConn(void);
290 State x_ConnClosedReplaceable(void);
291 State x_CloseCmdAndConn(void);
292 State x_CloseConn(void);
293 bool x_StartProcessing(void);
294 void x_SetStateAndStartProcessing(State state);
295 State x_SendCmdToExecute(void);
296 void x_StartWritingBlob(void);
297 State x_FinishWritingBlob(void);
298 State x_FakeWritingBlob(void);
299 void x_FinishSyncCmd(ESyncResult result, int hint);
300 void x_CleanCmdResources(void);
301 State x_FinishCommand(void);
302 State x_ProcessPeerError(void);
303 State x_ProcessProtocolError(void);
304 void x_DoCopyPut(void);
305 void x_DoSyncGet(void);
306 void x_SendCopyProlongCmd(const SNCBlobSummary& blob_sum);
307 State x_ReadSizeToRead(void);
308 void x_DoProlongOur(void);
309
310 State x_InvalidState(void);
311 State x_IdleState(void);
312 State x_WaitClientRelease(void);
313 State x_PutSelfToPool(void);
314 State x_ReadFoundMeta(void);
315 State x_SendCopyPutCmd(void);
316 State x_ReadCopyPut(void);
317 State x_ReadCopyProlong(void);
318 State x_ReadConfirm(void);
319 State x_ReadDataPrefix(void);
320 State x_ReadHttpDataPrefix(void);
321 State x_ReadDataForClient(void);
322 State x_ReadWritePrefix(void);
323 State x_FinishBlobFromClient(void);
324 State x_WaitOneLineAnswer(void);
325 State x_WaitForMetaInfo(void);
326 State x_WriteBlobData(void);
327 State x_PrepareSyncProlongCmd(void);
328 State x_ReadSyncStartHeader(void);
329 State x_ReadSyncStartAnswer(void);
330 State x_ReadSyncStartExtra(void);
331 State x_ReadEventsListKeySize(void);
332 State x_ReadEventsListBody(void);
333 State x_ReadBlobsListKeySize(void);
334 State x_ReadBlobsListBody(void);
335 State x_SendSyncGetCmd(void);
336 State x_ReadSyncGetHeader(void);
337 State x_ReadSyncGetAnswer(void);
338 State x_ReadBlobData(void);
339 State x_ReadSyncProInfoAnswer(void);
340 State x_ReadPeerVersion(void);
341 State x_ExecuteProInfoCmd(void);
342
343 void x_SetSlotAndBucketAndVerifySlot(Uint2 slot);
344
345
346 Uint8 m_SrvId;
347 string m_CmdToSend;
348 CTempString m_Response;
349 CNCPeerControl* m_Peer;
350 CNCActiveClientHub* m_Client;
351 CNCActiveSyncControl* m_SyncCtrl;
352 CNCActiveHandler_Proxy* m_Proxy;
353 Uint8 m_CntCmds;
354 CNCBlobKeyLight m_BlobKey;
355 CNCBlobAccessor* m_BlobAccess;
356 SNCBlobSummary m_BlobSum;
357 Uint8 m_OrigTime;
358 Uint8 m_OrigRecNo;
359 Uint8 m_OrigServer;
360 ECommands m_CurCmd;
361 Uint8 m_SizeToRead;
362 Uint4 m_ChunkSize;
363 Uint2 m_BlobSlot;
364 Uint2 m_TimeBucket;
365 ESynActionType m_SyncAction;
366 bool m_ReservedForBG;
367 bool m_ProcessingStarted;
368 bool m_CmdStarted;
369 bool m_GotAnyAnswer;
370 bool m_GotCmdAnswer;
371 bool m_GotClientResponse;
372 bool m_BlobExists;
373 bool m_CmdSuccess;
374 bool m_CmdFromClient;
375 bool m_Purge;
376 Uint2 m_KeySize;
377 string m_ErrMsg;
378 string m_SyncStartExtra;
379 TNCBufferType m_ReadBuf;
380
381 Uint8 m_SizeToWriteReq;
382 Uint8 m_SizeToReadReq;
383 public:
AddSizeWr(Uint8 chunk)384 void AddSizeWr( Uint8 chunk) {
385 m_SizeToWriteReq += chunk;
386 }
GetSizeWr(void) const387 Uint8 GetSizeWr(void) const {
388 return m_SizeToWriteReq;
389 }
GetSizeRd(void) const390 Uint8 GetSizeRd(void) const {
391 return m_SizeToReadReq;
392 }
ResetSizeRdWr(void)393 void ResetSizeRdWr(void) {
394 m_SizeToWriteReq = m_SizeToReadReq = 0;
395 }
396 friend class CNCActiveSyncControl;
397 };
398
399 inline void
x_SetSlotAndBucketAndVerifySlot(Uint2 slot)400 CNCActiveHandler::x_SetSlotAndBucketAndVerifySlot(Uint2 slot)
401 {
402 if (!CNCDistributionConf::GetSlotByKey(
403 m_BlobKey.PackedKey(),m_BlobSlot, m_TimeBucket) || m_BlobSlot != slot) {
404 SRV_FATAL("Slot verification failed, blob key: " << m_BlobKey.PackedKey() <<
405 ", expected slot: " << slot << ", calculated slot: " << m_BlobSlot);
406 }
407 }
408
409 class CNCActiveHandler_Proxy : public CSrvSocketTask
410 {
411 public:
412 CNCActiveHandler_Proxy(CNCActiveHandler* handler);
413 virtual ~CNCActiveHandler_Proxy(void);
414 void SetHandler(CNCActiveHandler* handler);
415 void NeedToProxySocket(void);
416 bool SocketProxyDone(void);
417
418 virtual void ExecuteSlice(TSrvThreadNum thr_num);
419
420 private:
421 CNCActiveHandler_Proxy(CNCActiveHandler_Proxy&);
422 CNCActiveHandler_Proxy& operator= (CNCActiveHandler_Proxy&);
423
424
425 CNCActiveHandler* m_Handler;
426 bool m_NeedToProxy;
427 };
428
429
430
431 //////////////////////////////////////////////////////////////////////////
432 // Inline functions
433 //////////////////////////////////////////////////////////////////////////
434
435 inline
CNCActiveClientHub(CNCMessageHandler * client)436 CNCActiveClientHub::CNCActiveClientHub(CNCMessageHandler* client)
437 : m_Client(client),
438 m_Handler(NULL),
439 m_Status(eNCHubWaitForConn)
440 {}
441
442 inline ENCClientHubStatus
GetStatus(void)443 CNCActiveClientHub::GetStatus(void)
444 {
445 return ACCESS_ONCE(m_Status);
446 }
447
448 inline void
SetErrMsg(const string & msg)449 CNCActiveClientHub::SetErrMsg(const string& msg)
450 {
451 m_ErrMsg = msg;
452 }
453
454 inline const string&
GetErrMsg(void)455 CNCActiveClientHub::GetErrMsg(void)
456 {
457 return m_ErrMsg;
458 }
459
460 inline void
SetHandler(CNCActiveHandler * handler)461 CNCActiveClientHub::SetHandler(CNCActiveHandler* handler)
462 {
463 m_Handler = handler;
464 }
465
466 inline CNCActiveHandler*
GetHandler(void)467 CNCActiveClientHub::GetHandler(void)
468 {
469 return m_Handler;
470 }
471
472 inline CNCMessageHandler*
GetClient(void)473 CNCActiveClientHub::GetClient(void)
474 {
475 return m_Client;
476 }
477
478
479 inline CSrvSocketTask*
GetSocket(void)480 CNCActiveHandler::GetSocket(void)
481 {
482 return m_Proxy;
483 }
484
485 inline CTempString
GetCmdResponse(void)486 CNCActiveHandler::GetCmdResponse(void)
487 {
488 return m_Response;
489 }
490
491 inline bool
GotClientResponse(void)492 CNCActiveHandler::GotClientResponse(void)
493 {
494 return m_GotClientResponse;
495 }
496
497 inline bool
IsBlobExists(void)498 CNCActiveHandler::IsBlobExists(void)
499 {
500 return m_BlobExists;
501 }
502
503 inline const SNCBlobSummary&
GetBlobSummary(void)504 CNCActiveHandler::GetBlobSummary(void)
505 {
506 return m_BlobSum;
507 }
508
509
510 inline void
SetHandler(CNCActiveHandler * handler)511 CNCActiveHandler_Proxy::SetHandler(CNCActiveHandler* handler)
512 {
513 m_Handler = handler;
514 }
515
516 END_NCBI_SCOPE
517
518 #endif /* NETCACHE__ACTIVE_HANDLER__HPP */
519