1 #ifndef NETCACHE__ACTIVE_HANDLER__HPP
2 #define NETCACHE__ACTIVE_HANDLER__HPP
/*  $Id: active_handler.hpp 524617 2017-01-12 17:33:22Z gouriano $
 * ===========================================================================
 *
 *                            PUBLIC DOMAIN NOTICE
 *               National Center for Biotechnology Information
 *
 *  This software/database is a "United States Government Work" under the
 *  terms of the United States Copyright Act.  It was written as part of
 *  the author's official duties as a United States Government employee and
 *  thus cannot be copyrighted.  This software/database is freely available
 *  to the public for use. The National Library of Medicine and the U.S.
 *  Government have not placed any restriction on its use or reproduction.
 *
 *  Although all reasonable efforts have been taken to ensure the accuracy
 *  and reliability of the software and data, the NLM and the U.S.
 *  Government do not and cannot warrant the performance or results that
 *  may be obtained by using this software or data. The NLM and the U.S.
 *  Government disclaim all warranties, express or implied, including
 *  warranties of performance, merchantability or fitness for any particular
 *  purpose.
 *
 *  Please cite the author in any work or product based on this material.
 *
 * ===========================================================================
 *
 * Authors:  Pavel Ivanov
 *
 * File Description: Network cache daemon
 *
 */
33 
34 
35 #include "nc_utils.hpp"
36 #include "periodic_sync.hpp"
37 #include "distribution_conf.hpp"
38 #include "message_handler.hpp"
39 
40 
41 BEGIN_NCBI_SCOPE
42 
43 
44 class CNCMessageHandler;
45 class CNCActiveHandler;
46 class CNCActiveHandler_Proxy;
47 class CNCPeerControl;
48 class CNCActiveSyncControl;
49 struct SNCBlobSummary;
50 struct SNCSyncEvent;
51 class CNCBlobAccessor;
52 
53 
// Status of a CNCActiveClientHub, as reported by
// CNCActiveClientHub::GetStatus() and changed by
// CNCActiveClientHub::SetStatus().
// A newly constructed hub starts in eNCHubWaitForConn.
// NOTE(review): the exact meaning of each value is defined by the code that
// calls SetStatus(), which is not part of this header.
enum ENCClientHubStatus {
    eNCHubError,
    eNCHubWaitForConn,
    eNCHubConnReady,
    eNCHubCmdInProgress,
    eNCHubSuccess,
    eNCHubDataReady
};
62 
63 class CNCActiveClientHub : public CSrvRCUUser
64 {
65 public:
66     static CNCActiveClientHub* Create(Uint8 srv_id, CNCMessageHandler* client);
67 
68     ENCClientHubStatus GetStatus(void);
69     const string& GetErrMsg(void);
70     CNCActiveHandler* GetHandler(void);
71     void Release(void);
72     string GetFullPeerName(void);
73 
74 public:
75     void SetStatus(ENCClientHubStatus status);
76     void SetErrMsg(const string& msg);
77     void SetHandler(CNCActiveHandler* handler);
78     CNCMessageHandler* GetClient(void);
79 
80 private:
81     CNCActiveClientHub(CNCMessageHandler* client);
82 
83     virtual void ExecuteRCU(void);
84 
85 
86     CNCMessageHandler* m_Client;
87     CNCActiveHandler* m_Handler;
88     ENCClientHubStatus m_Status;
89     string m_ErrMsg;
90 };
91 
92 
93 struct SActiveList_tag;
94 typedef intr::list_base_hook<intr::tag<SActiveList_tag> >   TActiveListHook;
95 
96 
/*
    Sends commands to other instances of NetCache and processes the results.
    Created by CNCPeerControl.
    Used by CNCPeerControl, CNCActiveSyncControl and CNCMessageHandler.

    begin: x_InvalidState
    -> x_InvalidState: abort

    The object becomes usable on SetClientHub (the state becomes
    x_WaitClientRelease), or when somebody (e.g. CNCPeerControl) calls
    CopyPut, CopyProlong or one of the other command methods; the handler
    then sets the proper state for itself.

    All processing goes approximately like this:
    somebody calls a method (e.g. ProxyRemove):
        initial preparation;
        create a command, goto x_SendCmdToExecute
        check for errors, send command, wait for reply (x_WaitOneLineAnswer)
        check for errors, read reply
        do cmd, goto x_FinishCommand
        clean resources, x_WaitClientRelease, x_PutSelfToPool (waiting for next cmd)
*/
119 class CNCActiveHandler : public CSrvStatesTask<CNCActiveHandler>,
120                          public TActiveListHook
121 {
122 public:
123     static void Initialize(void);
124 
125     void ClientReleased(void);
126     void Release(void);
127 
128     void SearchMeta(CRequestContext* cmd_ctx, const CNCBlobKey& key);
129     bool IsBlobExists(void);
130     const SNCBlobSummary& GetBlobSummary(void);
131 
132     void AskPeerVersion(void);
133     void CopyPurge(CRequestContext* cmd_ctx, const CNCBlobKeyLight& key,
134                  Uint8 when);
135     /*
136         request meta info, x_WaitForMetaInfo ->  x_SendCopyPutCmd ->
137         on error, goto  x_FinishCommand
138         create command, goto x_SendCmdToExecute
139         -> x_SendCmdToExecute
140             on error, goto x_CloseCmdAndConn, or  x_ConnClosedReplaceable
141             write cmd, goto  x_WaitOneLineAnswer  -> x_ReadCopyPut
142         -> x_ReadCopyPut
143             on error, goto  x_FinishCommand, or x_FakeWritingBlob
144             goto x_WriteBlobData -> x_FinishWritingBlob ->
145                 request confirmation -> x_WaitOneLineAnswer
146     */
147     void CopyUpdate(const CNCBlobKeyLight& key, Uint8 create_time);
148     void CopyRemove(const CNCBlobKeyLight& key, Uint8 create_time);
149     void CopyPut(CRequestContext* cmd_ctx,
150                  const CNCBlobKeyLight& key,
151                  Uint2 slot,
152                  Uint8 orig_rec_no);
153     // x_SendCmdToExecute ->  x_WaitOneLineAnswer -> x_ReadCopyProlong -> x_FinishCommand
154     void CopyProlong(const CNCBlobKeyLight& key,
155                      Uint2 slot,
156                      Uint8 orig_rec_no,
157                      Uint8 orig_time,
158                      const SNCBlobSummary& blob_sum);
159     void ProxyRemove(CRequestContext* cmd_ctx,
160                      const CNCBlobKey& key,
161                      const string& password,
162                      int version,
163                      Uint1 quorum);
164     void ProxyHasBlob(CRequestContext* cmd_ctx,
165                       const CNCBlobKey& key,
166                       const string& password,
167                       Uint1 quorum);
168     void ProxyGetSize(CRequestContext* cmd_ctx,
169                       const CNCBlobKey& key,
170                       const string& password,
171                       int version,
172                       Uint1 quorum,
173                       bool search,
174                       bool force_local);
175     void ProxySetValid(CRequestContext* cmd_ctx,
176                        const CNCBlobKey& key,
177                        const string& password,
178                        int version);
179     void ProxyRead(CRequestContext* cmd_ctx,
180                    const CNCBlobKey& key,
181                    const string& password,
182                    int version,
183                    Uint8 start_pos,
184                    Uint8 size,
185                    Uint1 quorum,
186                    bool search,
187                    bool force_local,
188                    Uint8 age);
189     void ProxyReadLast(CRequestContext* cmd_ctx,
190                        const CNCBlobKey& key,
191                        const string& password,
192                        Uint8 start_pos,
193                        Uint8 size,
194                        Uint1 quorum,
195                        bool search,
196                        bool force_local,
197                        Uint8 age);
198     void ProxyGetMeta(CRequestContext* cmd_ctx,
199                       const CNCBlobKey& key,
200                       Uint1 quorum,
201                       bool force_local,
202                       int http);
203     void ProxyWrite(CRequestContext* cmd_ctx,
204                     const CNCBlobKey& key,
205                     const string& password,
206                     int version,
207                     Uint4 ttl,
208                     Uint1 quorum,
209                     TNCUserFlags flags);
210     void ProxyProlong(CRequestContext* cmd_ctx,
211                       const CNCBlobKey& key,
212                       const string& password,
213                       unsigned int add_time,
214                       Uint1 quorum,
215                       bool search,
216                       bool force_local);
217     void ProxyBList(CRequestContext* cmd_ctx,
218                     const CNCBlobKey& key,
219                     bool force_local,
220                     SNCBlobFilter* filters);
221 
222     CSrvSocketTask* GetSocket(void);
223     CTempString GetCmdResponse(void);
224     bool GotClientResponse(void);
225 
226     void SyncStart(CNCActiveSyncControl* ctrl, Uint8 local_rec_no, Uint8 remote_rec_no);
227     void SyncBlobsList(CNCActiveSyncControl* ctrl);
228     void SyncSend(CNCActiveSyncControl* ctrl, SNCSyncEvent* event);
229     void SyncSend(CNCActiveSyncControl* ctrl, const CNCBlobKeyLight& key);
230     void SyncRead(CNCActiveSyncControl* ctrl, SNCSyncEvent* event);
231     void SyncRead(CNCActiveSyncControl* ctrl,
232                   const CNCBlobKeyLight& key,
233                   Uint8 create_time);
234     void SyncProlongPeer(CNCActiveSyncControl* ctrl, SNCSyncEvent* event);
235     void SyncProlongPeer(CNCActiveSyncControl* ctrl,
236                          const CNCBlobKeyLight& key,
237                          const SNCBlobSummary& blob_sum);
238     void SyncProlongOur(CNCActiveSyncControl* ctrl, SNCSyncEvent* event);
239     void SyncProlongOur(CNCActiveSyncControl* ctrl,
240                         const string& key,
241                         const SNCBlobSummary& blob_sum);
242     void SyncCancel(CNCActiveSyncControl* ctrl);
243     void SyncCommit(CNCActiveSyncControl* ctrl,
244                     Uint8 local_rec_no,
245                     Uint8 remote_rec_no);
246 
GetPeer(void) const247     const CNCPeerControl* GetPeer(void) const {
248         return m_Peer;
249     }
250 
251 public:
252     CNCActiveHandler(Uint8 srv_id, CNCPeerControl* peer);
253     virtual ~CNCActiveHandler(void);
254 
255     void SetProxy(CNCActiveHandler_Proxy* proxy);
256     void SetClientHub(CNCActiveClientHub* hub);
257     void SetReservedForBG(bool value);
258     bool IsReservedForBG(void);
259 
260     void CloseForShutdown(void);
261     void CheckCommandTimeout(void);
262 
GetSrvId(void) const263     Uint8 GetSrvId(void) const {
264         return  m_SrvId;
265     }
266 
267 private:
268     friend class CNCActiveClientHub;
269     friend class CNCActiveHandler_Proxy;
270 
271 
272     enum ECommands {
273         eSearchMeta = 1,
274         eCopyPut,
275         eCopyProlong,
276         eNeedOnlyConfirm,
277         eReadData,
278         eWriteData,
279         eSyncStart,
280         eSyncBList,
281         eSyncGet,
282         eSyncProlongPeer,
283         eSyncProInfo,
284         ePeerVersion
285     };
286 
287 
288     State x_MayDeleteThis(void);
289     State x_ReplaceServerConn(void);
290     State x_ConnClosedReplaceable(void);
291     State x_CloseCmdAndConn(void);
292     State x_CloseConn(void);
293     bool x_StartProcessing(void);
294     void x_SetStateAndStartProcessing(State state);
295     State x_SendCmdToExecute(void);
296     void x_StartWritingBlob(void);
297     State x_FinishWritingBlob(void);
298     State x_FakeWritingBlob(void);
299     void x_FinishSyncCmd(ESyncResult result, int hint);
300     void x_CleanCmdResources(void);
301     State x_FinishCommand(void);
302     State x_ProcessPeerError(void);
303     State x_ProcessProtocolError(void);
304     void x_DoCopyPut(void);
305     void x_DoSyncGet(void);
306     void x_SendCopyProlongCmd(const SNCBlobSummary& blob_sum);
307     State x_ReadSizeToRead(void);
308     void x_DoProlongOur(void);
309 
310     State x_InvalidState(void);
311     State x_IdleState(void);
312     State x_WaitClientRelease(void);
313     State x_PutSelfToPool(void);
314     State x_ReadFoundMeta(void);
315     State x_SendCopyPutCmd(void);
316     State x_ReadCopyPut(void);
317     State x_ReadCopyProlong(void);
318     State x_ReadConfirm(void);
319     State x_ReadDataPrefix(void);
320     State x_ReadHttpDataPrefix(void);
321     State x_ReadDataForClient(void);
322     State x_ReadWritePrefix(void);
323     State x_FinishBlobFromClient(void);
324     State x_WaitOneLineAnswer(void);
325     State x_WaitForMetaInfo(void);
326     State x_WriteBlobData(void);
327     State x_PrepareSyncProlongCmd(void);
328     State x_ReadSyncStartHeader(void);
329     State x_ReadSyncStartAnswer(void);
330     State x_ReadSyncStartExtra(void);
331     State x_ReadEventsListKeySize(void);
332     State x_ReadEventsListBody(void);
333     State x_ReadBlobsListKeySize(void);
334     State x_ReadBlobsListBody(void);
335     State x_SendSyncGetCmd(void);
336     State x_ReadSyncGetHeader(void);
337     State x_ReadSyncGetAnswer(void);
338     State x_ReadBlobData(void);
339     State x_ReadSyncProInfoAnswer(void);
340     State x_ReadPeerVersion(void);
341     State x_ExecuteProInfoCmd(void);
342 
343     void x_SetSlotAndBucketAndVerifySlot(Uint2 slot);
344 
345 
346     Uint8   m_SrvId;
347     string  m_CmdToSend;
348     CTempString m_Response;
349     CNCPeerControl* m_Peer;
350     CNCActiveClientHub* m_Client;
351     CNCActiveSyncControl* m_SyncCtrl;
352     CNCActiveHandler_Proxy* m_Proxy;
353     Uint8  m_CntCmds;
354     CNCBlobKeyLight m_BlobKey;
355     CNCBlobAccessor* m_BlobAccess;
356     SNCBlobSummary m_BlobSum;
357     Uint8 m_OrigTime;
358     Uint8 m_OrigRecNo;
359     Uint8 m_OrigServer;
360     ECommands m_CurCmd;
361     Uint8 m_SizeToRead;
362     Uint4 m_ChunkSize;
363     Uint2 m_BlobSlot;
364     Uint2 m_TimeBucket;
365     ESynActionType m_SyncAction;
366     bool m_ReservedForBG;
367     bool m_ProcessingStarted;
368     bool m_CmdStarted;
369     bool m_GotAnyAnswer;
370     bool m_GotCmdAnswer;
371     bool m_GotClientResponse;
372     bool m_BlobExists;
373     bool m_CmdSuccess;
374     bool m_CmdFromClient;
375     bool m_Purge;
376     Uint2 m_KeySize;
377     string m_ErrMsg;
378     string m_SyncStartExtra;
379     TNCBufferType m_ReadBuf;
380 
381     Uint8 m_SizeToWriteReq;
382     Uint8 m_SizeToReadReq;
383 public:
AddSizeWr(Uint8 chunk)384     void AddSizeWr( Uint8 chunk) {
385         m_SizeToWriteReq += chunk;
386     }
GetSizeWr(void) const387     Uint8 GetSizeWr(void) const {
388         return m_SizeToWriteReq;
389     }
GetSizeRd(void) const390     Uint8 GetSizeRd(void) const {
391         return m_SizeToReadReq;
392     }
ResetSizeRdWr(void)393     void ResetSizeRdWr(void) {
394         m_SizeToWriteReq = m_SizeToReadReq = 0;
395     }
396     friend class CNCActiveSyncControl;
397 };
398 
399 inline void
x_SetSlotAndBucketAndVerifySlot(Uint2 slot)400 CNCActiveHandler::x_SetSlotAndBucketAndVerifySlot(Uint2 slot)
401 {
402     if (!CNCDistributionConf::GetSlotByKey(
403         m_BlobKey.PackedKey(),m_BlobSlot, m_TimeBucket) || m_BlobSlot != slot) {
404         SRV_FATAL("Slot verification failed, blob key: " << m_BlobKey.PackedKey() <<
405                   ", expected slot: " << slot << ", calculated slot: " << m_BlobSlot);
406     }
407 }
408 
409 class CNCActiveHandler_Proxy : public CSrvSocketTask
410 {
411 public:
412     CNCActiveHandler_Proxy(CNCActiveHandler* handler);
413     virtual ~CNCActiveHandler_Proxy(void);
414     void SetHandler(CNCActiveHandler* handler);
415     void NeedToProxySocket(void);
416     bool SocketProxyDone(void);
417 
418     virtual void ExecuteSlice(TSrvThreadNum thr_num);
419 
420 private:
421     CNCActiveHandler_Proxy(CNCActiveHandler_Proxy&);
422     CNCActiveHandler_Proxy& operator= (CNCActiveHandler_Proxy&);
423 
424 
425     CNCActiveHandler* m_Handler;
426     bool m_NeedToProxy;
427 };
428 
429 
430 
//////////////////////////////////////////////////////////////////////////
// Inline functions
//////////////////////////////////////////////////////////////////////////
434 
435 inline
CNCActiveClientHub(CNCMessageHandler * client)436 CNCActiveClientHub::CNCActiveClientHub(CNCMessageHandler* client)
437     : m_Client(client),
438       m_Handler(NULL),
439       m_Status(eNCHubWaitForConn)
440 {}
441 
442 inline ENCClientHubStatus
GetStatus(void)443 CNCActiveClientHub::GetStatus(void)
444 {
445     return ACCESS_ONCE(m_Status);
446 }
447 
448 inline void
SetErrMsg(const string & msg)449 CNCActiveClientHub::SetErrMsg(const string& msg)
450 {
451     m_ErrMsg = msg;
452 }
453 
454 inline const string&
GetErrMsg(void)455 CNCActiveClientHub::GetErrMsg(void)
456 {
457     return m_ErrMsg;
458 }
459 
460 inline void
SetHandler(CNCActiveHandler * handler)461 CNCActiveClientHub::SetHandler(CNCActiveHandler* handler)
462 {
463     m_Handler = handler;
464 }
465 
466 inline CNCActiveHandler*
GetHandler(void)467 CNCActiveClientHub::GetHandler(void)
468 {
469     return m_Handler;
470 }
471 
472 inline CNCMessageHandler*
GetClient(void)473 CNCActiveClientHub::GetClient(void)
474 {
475     return m_Client;
476 }
477 
478 
479 inline CSrvSocketTask*
GetSocket(void)480 CNCActiveHandler::GetSocket(void)
481 {
482     return m_Proxy;
483 }
484 
485 inline CTempString
GetCmdResponse(void)486 CNCActiveHandler::GetCmdResponse(void)
487 {
488     return m_Response;
489 }
490 
491 inline bool
GotClientResponse(void)492 CNCActiveHandler::GotClientResponse(void)
493 {
494     return m_GotClientResponse;
495 }
496 
497 inline bool
IsBlobExists(void)498 CNCActiveHandler::IsBlobExists(void)
499 {
500     return m_BlobExists;
501 }
502 
503 inline const SNCBlobSummary&
GetBlobSummary(void)504 CNCActiveHandler::GetBlobSummary(void)
505 {
506     return m_BlobSum;
507 }
508 
509 
510 inline void
SetHandler(CNCActiveHandler * handler)511 CNCActiveHandler_Proxy::SetHandler(CNCActiveHandler* handler)
512 {
513     m_Handler = handler;
514 }
515 
516 END_NCBI_SCOPE
517 
518 #endif /* NETCACHE__ACTIVE_HANDLER__HPP */
519