1 #ifndef NETCACHE__MESSAGE_HANDLER__HPP
2 #define NETCACHE__MESSAGE_HANDLER__HPP
3 /* $Id: message_handler.hpp 564680 2018-05-31 19:19:16Z gouriano $
4 * ===========================================================================
5 *
6 * PUBLIC DOMAIN NOTICE
7 * National Center for Biotechnology Information
8 *
9 * This software/database is a "United States Government Work" under the
10 * terms of the United States Copyright Act. It was written as part of
11 * the author's official duties as a United States Government employee and
12 * thus cannot be copyrighted. This software/database is freely available
13 * to the public for use. The National Library of Medicine and the U.S.
14 * Government have not placed any restriction on its use or reproduction.
15 *
16 * Although all reasonable efforts have been taken to ensure the accuracy
17 * and reliability of the software and data, the NLM and the U.S.
18 * Government do not and cannot warrant the performance or results that
19 * may be obtained by using this software or data. The NLM and the U.S.
20 * Government disclaim all warranties, express or implied, including
21 * warranties of performance, merchantability or fitness for any particular
22 * purpose.
23 *
24 * Please cite the author in any work or product based on this material.
25 *
26 * ===========================================================================
27 *
28 * Authors: Pavel Ivanov
29 *
30 * File Description: Network cache daemon
31 *
32 */
33
34 #include <connect/services/netservice_protocol_parser.hpp>
35
36 #include "nc_utils.hpp"
37
38
39 BEGIN_NCBI_SCOPE
40
41
42 class CNCActiveClientHub;
43 class CNCBlobAccessor;
44 struct SNCSpecificParams;
45 struct SNCBlobVerData;
46 struct SNCBlobSummary;
47 struct SNCBlobFilter;
48 struct SNCSyncEvent;
49
50
51 enum ENCCmdFlags {
52 fNoCmdFlags = 0,
53 /// Command needs access to the blob.
54 fNeedsBlobAccess = 1 << 0,
55 /// Command needs to generate blob key if it's empty.
56 fCanGenerateKey = 1 << 1,
57 /// Command needs finished database caching to be executed locally.
58 fNeedsStorageCache = 1 << 2,
59 /// Do not check access password during command execution.
60 fDoNotCheckPassword = 1 << 3,
61 /// Always execute the command locally without proxying to peers.
62 fDoNotProxyToPeers = 1 << 4,
63 /// Command needs to search for the latest version of the blob on all servers.
64 fUsesPeerSearch = 1 << 5,
65 /// During the blob search on other servers command needs to know only
66 /// whether this blob exists or not, i.e. first positive response
67 /// of existence should finish the search.
68 fPeerFindExistsOnly = 1 << 6,
69 /// Command comes from client and needs disk space to execute.
70 fNeedsSpaceAsClient = 1 << 7,
71 /// Command comes from other NC server and needs disk space to execute.
72 fNeedsSpaceAsPeer = 1 << 8,
73 /// Byte order should be swapped when reading length of chunks in blob
74 /// transfer protocol.
75 fSwapLengthBytes = 1 << 9,
76 /// Command does not need a confirmation at the end of execution.
77 /// Usually, all commands need a reply,
78 /// but storing blob (STOR) may go without (in 'performance' mode)
79 fNoReplyOnFinish = 1 << 10,
80 /// There is exact size of the blob transferred to NetCache.
81 fReadExactBlobSize = 1 << 11,
82 /// Client will send blob data for the command and won't send EOF marker.
83 fSkipBlobEOF = 1 << 12,
84 /// Command copies sync log event from other server, not creates a new one.
85 fCopyLogEvent = 1 << 13,
86 /// Command can be executed only by admin client.
87 fNeedsAdminClient = 1 << 14,
88 /// Command can be executed only after successful execution of SYNC_START.
89 fRunsInStartedSync = 1 << 15,
90 /// Command should be executed even if server wants to abort
91 /// the synchronization.
92 fProhibitsSyncAbort = 1 << 16,
93 /// Command should be executed with a lower priority.
94 fNeedsLowerPriority = 1 << 17,
95 /// Command shouldn't check the blob version (it doesn't come with
96 /// the command).
97 fNoBlobVersionCheck = 1 << 18,
98 /// Access information about blob shouldn't be printed at the end
99 /// of the command.
100 fNoBlobAccessStats = 1 << 19,
101 /// Consider synchronization command to be successful even though
102 /// RequestStatus is not eStatus_OK.
103 fSyncCmdSuccessful = 1 << 20,
104 /// This is PUT2 command and connection that used PUT2 command.
105 fCursedPUT2Cmd = 1 << 21,
106 /// Command comes from client, not from other NC server.
107 fComesFromClient = 1 << 22,
108 /// HTTP-related.
109 fIsHttp = 1 << 23,
110 /// Command needs access to the blob list.
111 fNeedsBlobList = 1 << 24,
112
113
114 eProxyBlobRead = fNeedsBlobAccess | fUsesPeerSearch,
115 eClientBlobRead = eProxyBlobRead | fComesFromClient,
116 eProxyBlobWrite = fNeedsBlobAccess | fNeedsSpaceAsClient,
117 eClientBlobWrite = eProxyBlobWrite | fComesFromClient,
118 eCopyBlobFromPeer = fNeedsBlobAccess | fNeedsStorageCache | fDoNotProxyToPeers
119 | fDoNotCheckPassword | fNeedsAdminClient,
120 eRunsInStartedSync = fRunsInStartedSync | fNeedsAdminClient | fNeedsLowerPriority
121 | fNeedsStorageCache,
122 eSyncBlobCmd = eRunsInStartedSync | fNeedsBlobAccess | fDoNotProxyToPeers
123 | fDoNotCheckPassword,
124 eBlobPut = fNeedsSpaceAsClient | fNeedsSpaceAsPeer
125 };
126 typedef Uint4 TNCCmdFlags;
127
128 enum ENCUserFlags {
129 fNoUserFlags = 0,
130 /// Command does not update blob expiration time
131 fNoProlong = 1 << 0,
132 /// Command does not create blob if it does not exist
133 fNoCreate = 1 << 1
134 };
135 typedef Uint4 TNCUserFlags;
136
137 enum ENCProxyCmd {
138 eProxyNone = 0,
139 eProxyRead = 1,
140 eProxyWrite,
141 eProxyHasBlob,
142 eProxyGetSize,
143 eProxyReadLast,
144 eProxySetValid,
145 eProxyRemove,
146 eProxyGetMeta,
147 eProxyProlong,
148 eProxyGetBList,
149 eProxyGetBList2
150 } NCBI_PACKED_ENUM_END();
151
152 /// Handler of all NetCache incoming requests.
153 /// Handler written to be reusable object so that if one connection to
154 /// NetCache is closed then handler from it can go to serve for another newly
155 /// opened connection.
156 class CNCMessageHandler : public CSrvSocketTask,
157 public CSrvStatesTask<CNCMessageHandler>
158 {
159 public:
160 /// Create handler for the given NetCache server object
161 CNCMessageHandler(void);
162 virtual ~CNCMessageHandler(void);
163
164 bool IsBlobWritingFinished(void);
165
166 /// Extra information about each NetCache command
167 struct SCommandExtra {
168 /// Which method will process the command
169 State processor;
170 /// Command name to present to user in statistics
171 const char* cmd_name;
172 TNCCmdFlags cmd_flags;
173 /// What access to the blob command should receive
174 ENCAccessType blob_access;
175 ENCProxyCmd proxy_cmd;
176 };
177 /// Type definitions for NetCache protocol parser
178 typedef SNSProtoCmdDef<SCommandExtra> SCommandDef;
179 typedef SNSProtoParsedCmd<SCommandExtra> SParsedCmd;
180 typedef CNetServProtoParser<SCommandExtra> TProtoParser;
181
182 /// Command processors
183 State x_DoCmd_Health(void);
184 State x_DoCmd_Shutdown(void);
185 State x_DoCmd_Version(void);
186 State x_DoCmd_GetConfig(void);
187 State x_DoCmd_AckAlert(void);
188 State x_DoCmd_ReConfig(void);
189 State x_DoCmd_GetStat(void);
190 State x_DoCmd_Put(void);
191 State x_DoCmd_Get(void);
192 State x_DoCmd_GetLast(void);
193 State x_DoCmd_SetValid(void);
194 State x_DoCmd_GetSize(void);
195 State x_DoCmd_Prolong(void);
196 State x_DoCmd_HasBlob(void);
197 State x_DoCmd_Remove(void);
198 State x_DoCmd_IC_Store(void);
199 State x_DoCmd_SyncStart(void);
200 State x_DoCmd_SyncBlobsList(void);
201 State x_DoCmd_CopyPut(void);
202 State x_DoCmd_CopyProlong(void);
203 State x_DoCmd_SyncGet(void);
204 State x_DoCmd_SyncProlongInfo(void);
205 State x_DoCmd_SyncCommit(void);
206 State x_DoCmd_SyncCancel(void);
207 State x_DoCmd_GetMeta(void);
208 State x_DoCmd_ProxyMeta(void);
209 State x_DoCmd_CopyUpdate(void);
210 //State x_DoCmd_GetBlobsList(void);
211 /// Universal processor for all commands not implemented now.
212 State x_DoCmd_NotImplemented(void);
213
214 State x_DoCmd_Purge(void);
215 State x_DoCmd_CopyPurge(void);
216
217 State x_DoCmd_GetBList(void);
218 State x_DoCmd_GetBListNext(void);
219
220 State x_FinishCommand(void);
221
222 private:
223 void x_ResetFlags(void);
224 /// Set additional machine state flag
225 void x_SetFlag (ENCCmdFlags flag);
226 /// Remove additional machine state flag
227 void x_UnsetFlag(ENCCmdFlags flag);
228 /// Check if additional machine state flag is set
229 bool x_IsFlagSet(ENCCmdFlags flag);
230 bool x_IsUserFlagSet(ENCUserFlags flag);
231 bool x_IsCmdSucceeded(int cmd_status);
232 unsigned int x_GetBlobTTL(void);
233
234 // State machine implementation
235
236 State x_SocketOpened(void);
237 State x_CloseCmdAndConn(void);
238 State x_SaveStatsAndClose(void);
239 State x_PrintCmdsCntAndClose(void);
240 /// Read authentication message from client
241 State x_ReadAuthMessage(void);
242 /// Read command and start it if it's available
243 State x_ReadCommand(void);
244 /// Process "waiting" for blob locking. In fact just shift to next state
245 /// if lock is acquired and just return if not.
246 State x_WaitForBlobAccess(void);
247 State x_ReportBlobNotFound(void);
248 State x_CloseOnPeerError(void);
249 /// Read signature in blob transfer protocol
250 State x_ReadBlobSignature(void);
251 /// Read length of chunk in blob transfer protocol
252 State x_ReadBlobChunkLength(void);
253 /// Read chunk data in blob transfer protocol
254 State x_ReadBlobChunk(void);
255 /// Write data from blob to socket
256 State x_WriteBlobData(void);
257 State x_WriteSendBuff(void);
258 State x_WriteSyncStartExtra(void);
259 State x_ProxyToNextPeer(void);
260 State x_SendCmdAsProxy(void);
261 State x_WaitForPeerAnswer(void);
262 State x_WriteInitWriteResponse(void);
263 State x_ReadMetaNextPeer(void);
264 State x_SendGetMetaCmd(void);
265 State x_ReadMetaResults(void);
266 State x_ExecuteOnLatestSrvId(void);
267 State x_PutToNextPeer(void);
268 State x_SendPutToPeerCmd(void);
269 State x_ReadPutResults(void);
270 State x_PurgeToNextPeer(void);
271 State x_SendPurgeToPeerCmd(void);
272 State x_ReadPurgeResults(void);
273
274 /// Close connection (or at least make CServer believe that we closed it
275 /// by ourselves)
276 void x_CloseConnection(void);
277 /// Assign command parameters returned by protocol parser to handler
278 /// member variables
279 void x_AssignCmdParams(void);
280 /// Print "request_start" message into diagnostics with all parameters of
281 /// the current command.
282 void x_PrintRequestStart(CSrvDiagMsg& diag_msg);
283 /// Start command returned by protocol parser
284 State x_StartCommand(void);
285 /// Command execution is finished, do cleanup work
286 void x_CleanCmdResources(void);
287 /// Start reading blob from socket
288 State x_StartReadingBlob(void);
289 /// Client finished sending blob, do cleanup
290 State x_FinishReadingBlob(void);
291
292 void x_ProlongBlobDeadTime(unsigned int add_time);
293 void x_ProlongVersionLife(void);
294 void x_WriteFullBlobsList(void);
295 void x_GetCurSlotServers(void);
296
297 void x_JournalBlobPutResult(int status, const string& blob_key, Uint2 blob_slot);
298 void x_ReportError( EHTTPStatus sts, bool eol = true);
299 void x_ReportError( const string& sts, bool eol = true);
300 CNCMessageHandler& x_ReportOK(const string& sts);
301 void x_LogCmdEvent( const CTempString& evt);
302 void x_LogCmdLog(void);
303
304 TNCCmdFlags m_Flags;
305 TNCUserFlags m_UserFlags;
306 /// NetCache protocol parser
307 TProtoParser m_Parser;
308 SParsedCmd m_ParsedCmd;
309 /// Processor for the currently executed NetCache command
310 State m_CmdProcessor;
311 ///
312 Uint8 m_CntCmds;
313 /// Holder of the lock for blob
314 CNCBlobAccessor* m_BlobAccess;
315 SNCSyncEvent* m_write_event;
316 /// Length of the current blob chunk to read from socket
317 Uint4 m_ChunkLen;
318
319 Uint2 m_LocalPort;
320 ///
321 TStringMap m_ClientParams;
322 TStringMap m_CmdParams;
323 ///
324 const SNCSpecificParams* m_AppSetup;
325 ///
326 const SNCSpecificParams* m_BaseAppSetup;
327 ///
328 string m_PrevCache;
329 ///
330 const SNCSpecificParams* m_PrevAppSetup;
331 /// Time when command started execution
332 CSrvTime m_CmdStartTime;
333 CSrvTime m_CmdPrevTime;
334 vector<string> m_CmdLog;
335
336 /// Blob key in current command
337 CNCBlobKey m_NCBlobKey;
338 /// Blob version in current command
339 int m_BlobVersion;
340 /// "Password" to access the blob
341 string m_BlobPass;
342 /// Version of the blob key to generate
343 unsigned int m_KeyVersion;
344 /// Time-to-live value for the blob
345 unsigned int m_BlobTTL;
346 /// Offset to start blob reading from
347 Uint8 m_StartPos;
348 /// Exact size of the blob sent by client
349 Uint8 m_Size;
350 Uint8 m_BlobSize;
351 SNCBlobVerData* m_CopyBlobInfo;
352 Uint8 m_OrigRecNo;
353 Uint8 m_OrigSrvId;
354 Uint8 m_OrigTime;
355 Uint8 m_SrvId;
356 Uint2 m_Slot;
357 Uint8 m_LocalRecNo;
358 Uint8 m_RemoteRecNo;
359 auto_ptr<TNCBufferType> m_SendBuff;
360 size_t m_SendPos;
361 string m_RawBlobPass;
362 Uint8 m_SyncId;
363 Uint2 m_BlobSlot;
364 Uint2 m_TimeBucket;
365 Uint1 m_Quorum;
366 bool m_SearchOnRead;
367 bool m_ThisServerIsMain;
368 bool m_LatestExist;
369 bool m_ForceLocal;
370 bool m_StatPrev;
371 Uint1 m_SrvsIndex;
372 int m_CmdVersion;
373 Uint8 m_LatestSrvId;
374 SNCBlobSummary* m_LatestBlobSum;
375 TServersList m_CheckSrvs;
376 TServersList m_MirrorsDone;
377 CNCActiveClientHub* m_ActiveHub;
378 string m_LastPeerError;
379 string m_StatType;
380 Uint8 m_AgeMax;
381 Uint8 m_AgeCur;
382 SNCBlobFilter* m_BlobFilter;
383 set<Uint2> m_SlotsDone;
384
385 string m_PosponedCmd;
386 enum EHttpMode {
387 eNoHttp = 0,
388 eHttp10 = 1,
389 eHttp11 = 2
390 } m_HttpMode;
391
x_IsHttpMode(void) const392 bool x_IsHttpMode(void) const {
393 return m_HttpMode != eNoHttp;
394 }
395 void x_WriteHttpResponse(void);
396 void x_WriteHttpHeader(int cmd_status, size_t content_length, bool binary);
397 public:
398 void BeginProxyResponse(const CTempString& response, size_t content_length);
399 };
400
401
402 class CNCMsgHandler_Factory : public CSrvSocketFactory
403 {
404 public:
405 CNCMsgHandler_Factory(void);
406 virtual ~CNCMsgHandler_Factory(void);
407
408 virtual CSrvSocketTask* CreateSocketTask(void);
409 };
410
411
412
413 //////////////////////////////////////////////////////////////////////////
414 // Inline functions
415 //////////////////////////////////////////////////////////////////////////
416
417 inline bool
IsBlobWritingFinished(void)418 CNCMessageHandler::IsBlobWritingFinished(void)
419 {
420 return m_ChunkLen == 0xFFFFFFFF;
421 }
422
423 END_NCBI_SCOPE
424
425 #endif /* NETCACHE__MESSAGE_HANDLER__HPP */
426