1 #ifndef NETCACHE__MESSAGE_HANDLER__HPP
2 #define NETCACHE__MESSAGE_HANDLER__HPP
3 /*  $Id: message_handler.hpp 564680 2018-05-31 19:19:16Z gouriano $
4  * ===========================================================================
5  *
6  *                            PUBLIC DOMAIN NOTICE
7  *               National Center for Biotechnology Information
8  *
9  *  This software/database is a "United States Government Work" under the
10  *  terms of the United States Copyright Act.  It was written as part of
11  *  the author's official duties as a United States Government employee and
12  *  thus cannot be copyrighted.  This software/database is freely available
13  *  to the public for use. The National Library of Medicine and the U.S.
14  *  Government have not placed any restriction on its use or reproduction.
15  *
16  *  Although all reasonable efforts have been taken to ensure the accuracy
17  *  and reliability of the software and data, the NLM and the U.S.
18  *  Government do not and cannot warrant the performance or results that
19  *  may be obtained by using this software or data. The NLM and the U.S.
20  *  Government disclaim all warranties, express or implied, including
21  *  warranties of performance, merchantability or fitness for any particular
22  *  purpose.
23  *
24  *  Please cite the author in any work or product based on this material.
25  *
26  * ===========================================================================
27  *
28  * Authors:  Pavel Ivanov
29  *
30  * File Description: Network cache daemon
31  *
32  */
33 
34 #include <connect/services/netservice_protocol_parser.hpp>
35 
36 #include "nc_utils.hpp"
37 
38 
39 BEGIN_NCBI_SCOPE
40 
41 
42 class CNCActiveClientHub;
43 class CNCBlobAccessor;
44 struct SNCSpecificParams;
45 struct SNCBlobVerData;
46 struct SNCBlobSummary;
47 struct SNCBlobFilter;
48 struct SNCSyncEvent;
49 
50 
51 enum ENCCmdFlags {
52     fNoCmdFlags = 0,
53     /// Command needs access to the blob.
54     fNeedsBlobAccess    = 1 <<  0,
55     /// Command needs to generate blob key if it's empty.
56     fCanGenerateKey     = 1 <<  1,
57     /// Command needs finished database caching to be executed locally.
58     fNeedsStorageCache  = 1 <<  2,
59     /// Do not check access password during command execution.
60     fDoNotCheckPassword = 1 <<  3,
61     /// Always execute the command locally without proxying to peers.
62     fDoNotProxyToPeers  = 1 <<  4,
63     /// Command needs to search for the latest version of the blob on all servers.
64     fUsesPeerSearch     = 1 <<  5,
65     /// During the blob search on other servers command needs to know only
66     /// whether this blob exists or not, i.e. first positive response
67     /// of existence should finish the search.
68     fPeerFindExistsOnly = 1 <<  6,
69     /// Command comes from client and needs disk space to execute.
70     fNeedsSpaceAsClient = 1 <<  7,
71     /// Command comes from other NC server and needs disk space to execute.
72     fNeedsSpaceAsPeer   = 1 <<  8,
73     /// Byte order should be swapped when reading length of chunks in blob
74     /// transfer protocol.
75     fSwapLengthBytes    = 1 <<  9,
76     /// Command does not need a confirmation at the end of execution.
77     /// Usually, all commands need a reply,
78     /// but storing blob (STOR) may go without (in 'performance' mode)
79     fNoReplyOnFinish    = 1 << 10,
80     /// There is exact size of the blob transferred to NetCache.
81     fReadExactBlobSize  = 1 << 11,
82     /// Client will send blob data for the command and won't send EOF marker.
83     fSkipBlobEOF        = 1 << 12,
84     /// Command copies sync log event from other server, not creates a new one.
85     fCopyLogEvent       = 1 << 13,
86     /// Command can be executed only by admin client.
87     fNeedsAdminClient   = 1 << 14,
88     /// Command can be executed only after successful execution of SYNC_START.
89     fRunsInStartedSync  = 1 << 15,
90     /// Command should be executed even if server wants to abort
91     /// the synchronization.
92     fProhibitsSyncAbort = 1 << 16,
93     /// Command should be executed with a lower priority.
94     fNeedsLowerPriority = 1 << 17,
95     /// Command shouldn't check the blob version (it doesn't come with
96     /// the command).
97     fNoBlobVersionCheck = 1 << 18,
98     /// Access information about blob shouldn't be printed at the end
99     /// of the command.
100     fNoBlobAccessStats  = 1 << 19,
101     /// Consider synchronization command to be successful even though
102     /// RequestStatus is not eStatus_OK.
103     fSyncCmdSuccessful  = 1 << 20,
104     /// This is PUT2 command and connection that used PUT2 command.
105     fCursedPUT2Cmd      = 1 << 21,
106     /// Command comes from client, not from other NC server.
107     fComesFromClient    = 1 << 22,
108     /// HTTP-related.
109     fIsHttp             = 1 << 23,
110     /// Command needs access to the blob list.
111     fNeedsBlobList     = 1 <<  24,
112 
113 
114     eProxyBlobRead      = fNeedsBlobAccess | fUsesPeerSearch,
115     eClientBlobRead     = eProxyBlobRead   | fComesFromClient,
116     eProxyBlobWrite     = fNeedsBlobAccess | fNeedsSpaceAsClient,
117     eClientBlobWrite    = eProxyBlobWrite  | fComesFromClient,
118     eCopyBlobFromPeer   = fNeedsBlobAccess | fNeedsStorageCache | fDoNotProxyToPeers
119                           | fDoNotCheckPassword | fNeedsAdminClient,
120     eRunsInStartedSync  = fRunsInStartedSync | fNeedsAdminClient | fNeedsLowerPriority
121                           | fNeedsStorageCache,
122     eSyncBlobCmd        = eRunsInStartedSync | fNeedsBlobAccess | fDoNotProxyToPeers
123                           | fDoNotCheckPassword,
124     eBlobPut            =   fNeedsSpaceAsClient | fNeedsSpaceAsPeer
125 };
126 typedef Uint4 TNCCmdFlags;
127 
128 enum ENCUserFlags {
129     fNoUserFlags = 0,
130     /// Command does not update blob expiration time
131     fNoProlong    = 1 <<  0,
132     /// Command does not create blob if it does not exist
133     fNoCreate     = 1 <<  1
134 };
135 typedef Uint4 TNCUserFlags;
136 
137 enum ENCProxyCmd {
138     eProxyNone = 0,
139     eProxyRead = 1,
140     eProxyWrite,
141     eProxyHasBlob,
142     eProxyGetSize,
143     eProxyReadLast,
144     eProxySetValid,
145     eProxyRemove,
146     eProxyGetMeta,
147     eProxyProlong,
148     eProxyGetBList,
149     eProxyGetBList2
150 } NCBI_PACKED_ENUM_END();
151 
152 /// Handler of all NetCache incoming requests.
153 /// Handler written to be reusable object so that if one connection to
154 /// NetCache is closed then handler from it can go to serve for another newly
155 /// opened connection.
156 class CNCMessageHandler : public CSrvSocketTask,
157                           public CSrvStatesTask<CNCMessageHandler>
158 {
159 public:
160     /// Create handler for the given NetCache server object
161     CNCMessageHandler(void);
162     virtual ~CNCMessageHandler(void);
163 
164     bool IsBlobWritingFinished(void);
165 
166     /// Extra information about each NetCache command
167     struct SCommandExtra {
168         /// Which method will process the command
169         State           processor;
170         /// Command name to present to user in statistics
171         const char*     cmd_name;
172         TNCCmdFlags     cmd_flags;
173         /// What access to the blob command should receive
174         ENCAccessType   blob_access;
175         ENCProxyCmd     proxy_cmd;
176     };
177     /// Type definitions for NetCache protocol parser
178     typedef SNSProtoCmdDef<SCommandExtra>      SCommandDef;
179     typedef SNSProtoParsedCmd<SCommandExtra>   SParsedCmd;
180     typedef CNetServProtoParser<SCommandExtra> TProtoParser;
181 
182     /// Command processors
183     State x_DoCmd_Health(void);
184     State x_DoCmd_Shutdown(void);
185     State x_DoCmd_Version(void);
186     State x_DoCmd_GetConfig(void);
187     State x_DoCmd_AckAlert(void);
188     State x_DoCmd_ReConfig(void);
189     State x_DoCmd_GetStat(void);
190     State x_DoCmd_Put(void);
191     State x_DoCmd_Get(void);
192     State x_DoCmd_GetLast(void);
193     State x_DoCmd_SetValid(void);
194     State x_DoCmd_GetSize(void);
195     State x_DoCmd_Prolong(void);
196     State x_DoCmd_HasBlob(void);
197     State x_DoCmd_Remove(void);
198     State x_DoCmd_IC_Store(void);
199     State x_DoCmd_SyncStart(void);
200     State x_DoCmd_SyncBlobsList(void);
201     State x_DoCmd_CopyPut(void);
202     State x_DoCmd_CopyProlong(void);
203     State x_DoCmd_SyncGet(void);
204     State x_DoCmd_SyncProlongInfo(void);
205     State x_DoCmd_SyncCommit(void);
206     State x_DoCmd_SyncCancel(void);
207     State x_DoCmd_GetMeta(void);
208     State x_DoCmd_ProxyMeta(void);
209     State x_DoCmd_CopyUpdate(void);
210     //State x_DoCmd_GetBlobsList(void);
211     /// Universal processor for all commands not implemented now.
212     State x_DoCmd_NotImplemented(void);
213 
214     State x_DoCmd_Purge(void);
215     State x_DoCmd_CopyPurge(void);
216 
217     State x_DoCmd_GetBList(void);
218     State x_DoCmd_GetBListNext(void);
219 
220     State x_FinishCommand(void);
221 
222 private:
223     void x_ResetFlags(void);
224     /// Set additional machine state flag
225     void x_SetFlag  (ENCCmdFlags flag);
226     /// Remove additional machine state flag
227     void x_UnsetFlag(ENCCmdFlags flag);
228     /// Check if additional machine state flag is set
229     bool x_IsFlagSet(ENCCmdFlags flag);
230     bool x_IsUserFlagSet(ENCUserFlags flag);
231     bool x_IsCmdSucceeded(int cmd_status);
232     unsigned int x_GetBlobTTL(void);
233 
234     // State machine implementation
235 
236     State x_SocketOpened(void);
237     State x_CloseCmdAndConn(void);
238     State x_SaveStatsAndClose(void);
239     State x_PrintCmdsCntAndClose(void);
240     /// Read authentication message from client
241     State x_ReadAuthMessage(void);
242     /// Read command and start it if it's available
243     State x_ReadCommand(void);
244     /// Process "waiting" for blob locking. In fact just shift to next state
245     /// if lock is acquired and just return if not.
246     State x_WaitForBlobAccess(void);
247     State x_ReportBlobNotFound(void);
248     State x_CloseOnPeerError(void);
249     /// Read signature in blob transfer protocol
250     State x_ReadBlobSignature(void);
251     /// Read length of chunk in blob transfer protocol
252     State x_ReadBlobChunkLength(void);
253     /// Read chunk data in blob transfer protocol
254     State x_ReadBlobChunk(void);
255     /// Write data from blob to socket
256     State x_WriteBlobData(void);
257     State x_WriteSendBuff(void);
258     State x_WriteSyncStartExtra(void);
259     State x_ProxyToNextPeer(void);
260     State x_SendCmdAsProxy(void);
261     State x_WaitForPeerAnswer(void);
262     State x_WriteInitWriteResponse(void);
263     State x_ReadMetaNextPeer(void);
264     State x_SendGetMetaCmd(void);
265     State x_ReadMetaResults(void);
266     State x_ExecuteOnLatestSrvId(void);
267     State x_PutToNextPeer(void);
268     State x_SendPutToPeerCmd(void);
269     State x_ReadPutResults(void);
270     State x_PurgeToNextPeer(void);
271     State x_SendPurgeToPeerCmd(void);
272     State x_ReadPurgeResults(void);
273 
274     /// Close connection (or at least make CServer believe that we closed it
275     /// by ourselves)
276     void x_CloseConnection(void);
277     /// Assign command parameters returned by protocol parser to handler
278     /// member variables
279     void x_AssignCmdParams(void);
280     /// Print "request_start" message into diagnostics with all parameters of
281     /// the current command.
282     void x_PrintRequestStart(CSrvDiagMsg& diag_msg);
283     /// Start command returned by protocol parser
284     State x_StartCommand(void);
285     /// Command execution is finished, do cleanup work
286     void x_CleanCmdResources(void);
287     /// Start reading blob from socket
288     State x_StartReadingBlob(void);
289     /// Client finished sending blob, do cleanup
290     State x_FinishReadingBlob(void);
291 
292     void x_ProlongBlobDeadTime(unsigned int add_time);
293     void x_ProlongVersionLife(void);
294     void x_WriteFullBlobsList(void);
295     void x_GetCurSlotServers(void);
296 
297     void x_JournalBlobPutResult(int status, const string& blob_key, Uint2 blob_slot);
298     void x_ReportError( EHTTPStatus sts, bool eol = true);
299     void x_ReportError( const string& sts, bool eol = true);
300     CNCMessageHandler& x_ReportOK(const string& sts);
301     void x_LogCmdEvent( const CTempString& evt);
302     void x_LogCmdLog(void);
303 
304     TNCCmdFlags               m_Flags;
305     TNCUserFlags              m_UserFlags;
306     /// NetCache protocol parser
307     TProtoParser              m_Parser;
308     SParsedCmd                m_ParsedCmd;
309     /// Processor for the currently executed NetCache command
310     State                     m_CmdProcessor;
311     ///
312     Uint8                     m_CntCmds;
313     /// Holder of the lock for blob
314     CNCBlobAccessor*          m_BlobAccess;
315     SNCSyncEvent*             m_write_event;
316     /// Length of the current blob chunk to read from socket
317     Uint4                     m_ChunkLen;
318 
319     Uint2                     m_LocalPort;
320     ///
321     TStringMap                m_ClientParams;
322     TStringMap                m_CmdParams;
323     ///
324     const SNCSpecificParams*  m_AppSetup;
325     ///
326     const SNCSpecificParams*  m_BaseAppSetup;
327     ///
328     string                    m_PrevCache;
329     ///
330     const SNCSpecificParams*  m_PrevAppSetup;
331     /// Time when command started execution
332     CSrvTime                  m_CmdStartTime;
333     CSrvTime                  m_CmdPrevTime;
334     vector<string>            m_CmdLog;
335 
336     /// Blob key in current command
337     CNCBlobKey                m_NCBlobKey;
338     /// Blob version in current command
339     int                       m_BlobVersion;
340     /// "Password" to access the blob
341     string                    m_BlobPass;
342     /// Version of the blob key to generate
343     unsigned int              m_KeyVersion;
344     /// Time-to-live value for the blob
345     unsigned int              m_BlobTTL;
346     /// Offset to start blob reading from
347     Uint8                      m_StartPos;
348     /// Exact size of the blob sent by client
349     Uint8                     m_Size;
350     Uint8                     m_BlobSize;
351     SNCBlobVerData*           m_CopyBlobInfo;
352     Uint8                     m_OrigRecNo;
353     Uint8                     m_OrigSrvId;
354     Uint8                     m_OrigTime;
355     Uint8                     m_SrvId;
356     Uint2                     m_Slot;
357     Uint8                     m_LocalRecNo;
358     Uint8                     m_RemoteRecNo;
359     auto_ptr<TNCBufferType>   m_SendBuff;
360     size_t                    m_SendPos;
361     string                    m_RawBlobPass;
362     Uint8                     m_SyncId;
363     Uint2                     m_BlobSlot;
364     Uint2                     m_TimeBucket;
365     Uint1                     m_Quorum;
366     bool                      m_SearchOnRead;
367     bool                      m_ThisServerIsMain;
368     bool                      m_LatestExist;
369     bool                      m_ForceLocal;
370     bool                      m_StatPrev;
371     Uint1                     m_SrvsIndex;
372     int                       m_CmdVersion;
373     Uint8                     m_LatestSrvId;
374     SNCBlobSummary*           m_LatestBlobSum;
375     TServersList              m_CheckSrvs;
376     TServersList              m_MirrorsDone;
377     CNCActiveClientHub*       m_ActiveHub;
378     string                    m_LastPeerError;
379     string                    m_StatType;
380     Uint8                     m_AgeMax;
381     Uint8                     m_AgeCur;
382     SNCBlobFilter*            m_BlobFilter;
383     set<Uint2>                m_SlotsDone;
384 
385     string m_PosponedCmd;
386     enum EHttpMode {
387         eNoHttp = 0,
388         eHttp10 = 1,
389         eHttp11 = 2
390     } m_HttpMode;
391 
x_IsHttpMode(void) const392     bool x_IsHttpMode(void) const {
393         return m_HttpMode != eNoHttp;
394     }
395     void x_WriteHttpResponse(void);
396     void x_WriteHttpHeader(int cmd_status, size_t content_length, bool binary);
397 public:
398     void BeginProxyResponse(const CTempString& response, size_t content_length);
399 };
400 
401 
402 class CNCMsgHandler_Factory : public CSrvSocketFactory
403 {
404 public:
405     CNCMsgHandler_Factory(void);
406     virtual ~CNCMsgHandler_Factory(void);
407 
408     virtual CSrvSocketTask* CreateSocketTask(void);
409 };
410 
411 
412 
413 //////////////////////////////////////////////////////////////////////////
414 // Inline functions
415 //////////////////////////////////////////////////////////////////////////
416 
417 inline bool
IsBlobWritingFinished(void)418 CNCMessageHandler::IsBlobWritingFinished(void)
419 {
420     return m_ChunkLen == 0xFFFFFFFF;
421 }
422 
423 END_NCBI_SCOPE
424 
425 #endif /* NETCACHE__MESSAGE_HANDLER__HPP */
426