1 /*  $Id: message_handler.cpp 587630 2019-06-06 17:41:53Z gouriano $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author: Pavel Ivanov
27  *
28  */
29 
30 #include "nc_pch.hpp"
31 
32 #include <corelib/ncbireg.hpp>
33 #include <corelib/ncbifile.hpp>
34 #include <corelib/request_ctx.hpp>
35 #include <corelib/ncbi_bswap.hpp>
36 #include <util/md5.hpp>
37 
38 #include "netcached.hpp"
39 #include "message_handler.hpp"
40 #include "netcache_version.hpp"
41 #include "nc_stat.hpp"
42 #include "peer_control.hpp"
43 #include "distribution_conf.hpp"
44 #include "periodic_sync.hpp"
45 #include "active_handler.hpp"
46 #include "nc_storage.hpp"
47 #include "nc_storage_blob.hpp"
48 #include "logging.hpp"
49 
50 
51 
52 BEGIN_NCBI_SCOPE
53 
54 #if 0  // Debug switch: change to 1 to make LOG_CURRENT_FUNCTION log the address of `this`.
55 #define LOG_CURRENT_FUNCTION SRV_LOG(Warning, "this: " << (void*)this);
56 #else  // Default: LOG_CURRENT_FUNCTION expands to nothing.
57 #define LOG_CURRENT_FUNCTION
58 #endif  // The enabled form references `this` and already ends with ';'.
59 
60 // When set to 1, the server always broadcasts 'blob update' notifications.
61 // This will create an empty blob stub on COPY_UPD if needed.
62 // This takes time, though, and it is not clear that it is required.
63 // Another problem is that it overloads communication channels:
64 // NC fails much more often in CNCPeerControl::x_ReserveBGConn (too many connections).
65 #define USE_ALWAYS_COPY_UPD  0  // 0: COPY_UPD is declared with eNCRead access; 1: with eNCCopyCreate.
66 
67 /// Definition of all NetCache commands
68 ///
69 /// General format of a "NetCache" command is as follows:
70 ///
71 /// CMD param1 param2 ...
72 ///
73 /// Format of "ICache" command is as follows:
74 ///
75 /// IC(cache) CMD param1 param2 ...
76 ///
77 /// Here "IC" is two letters that appear in command literally. "cache" is name
78 /// of the cache where blob is stored; it's mentioned in parameter list as
79 /// first parameter with type eNSPA_ICPrefix. Every command parameter can be
80 /// given as just value or as name=value pair. String parameter values can be
81 /// enclosed in double quotes. If parameter is declared with the flag
82 /// eNSPA_Optional then it can be skipped from the command, in this case its
83 /// default value will be used (if any). Parameter flag eNSPA_Optchain has
84 /// the same meaning, except that if it's not provided then all following
85 /// parameters marked as eNSPA_Optional are assumed to be not provided too.
86 ///
87 /// If command needs some binary data along with it then it's sent split in
88 /// chunks each having 4-byte integer prefix containing the length of the chunk.
89 /// When all data is sent special chunk length 0xFFFFFFFF should be sent
90 /// at the end. Successful response of each command is sent as one line
91 /// starting with "OK:" and then space-separated parameters that need to be
92 /// returned. If response should contain binary data then initial response
93 /// line should have "SIZE=nnn" with the size of binary data to follow.
94 /// Unsuccessful responses to commands always start with "ERR:" and then error
95 /// explanation follows.
96 ///
97 /// Descriptions of commands have
98 ///  - command name as it comes from client;
99 ///  - structure containing
100 ///    * state function processing this command;
101 ///    * command name as it appears in statistics;
102 ///    * flags controlling command behavior;
103 ///    * type of access to blob if needed;
104 ///    * type of proxy command that should be executed if command will need to
105 ///      be proxied to other servers;
106 ///  - set of structures explaining command parameters. Each structure has
107 ///    * parameter name which can be used by client if it passes parameters in
108 ///      name=value form. Also name is used to distinguish parameters in
109 ///      x_AssignCmdParams();
110 ///    * type of parameter - parser makes additional checks to see if given
111 ///      value is applicable for necessary parameter type;
112 ///    * parameter flags.
113 static CNCMessageHandler::SCommandDef s_CommandMap[] = {
114     // "Are you alive?" command. This is old and deprecated command but it's
115     // executed a lot in old ICache clients. All that they need in response is
116     // "OK:".
117     { "A?",
118         {&CNCMessageHandler::x_FinishCommand,
119             "A?", fNoCmdFlags, eNCNone, eProxyNone} },
120     // Requests version of the server.
121     { "VERSION",
122         {&CNCMessageHandler::x_DoCmd_Version,
123             "VERSION", fNoCmdFlags, eNCNone, eProxyNone},
124         {
125           // Client IP for application requesting the info.
126           { "ip",      eNSPT_Str,  fNSPA_Optional },
127           // Session ID for application requesting the info.
128           { "sid",     eNSPT_Str,  eNSPA_Optional },
129           // request Hit ID
130           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
131         } },
132     // Requests some "health" information about the server.
133     { "HEALTH",
134         {&CNCMessageHandler::x_DoCmd_Health,
135             "HEALTH", fNoCmdFlags, eNCNone, eProxyNone},
136         {
137           // Client IP for application requesting the info.
138           { "ip",      eNSPT_Str,  fNSPA_Optional },
139           // Session ID for application requesting the info.
140           { "sid",     eNSPT_Str,  eNSPA_Optional },
141           // request Hit ID
142           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
143         } },
144     // Check if blob exists. Command for "ICache" clients.
145     { "HASB",
146         {&CNCMessageHandler::x_DoCmd_HasBlob,
147             "IC_HASB",
148             eClientBlobRead | fPeerFindExistsOnly | fNoBlobVersionCheck,
149             eNCRead,
150             eProxyHasBlob},
151           // Name of cache for blob.
152         { { "cache",   eNSPT_Id,   eNSPA_ICPrefix },
153           // Blob's key.
154           { "key",     eNSPT_Str,  eNSPA_Required },
155           // Blob's version.
156           { "version", eNSPT_Int,  eNSPA_Required },
157           // Blob's subkey.
158           { "subkey",  eNSPT_Str,  eNSPA_Required },
159           // Quorum to use for this operation.
160           { "qrum",    eNSPT_Int,  eNSPA_Optional },
161           // Client IP for application requesting the info.
162           { "ip",      eNSPT_Str,  fNSPA_Optional },
163           // Session ID for application requesting the info.
164           { "sid",     eNSPT_Str,  eNSPA_Optional },
165           // Password for blob access.
166           { "pass",    eNSPT_Str,  eNSPA_Optional },
167           // request Hit ID
168           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
169         } },
170     // Read blob contents. Command for "ICache" clients.
171     { "READ",
172         {&CNCMessageHandler::x_DoCmd_Get,
173             "IC_READ",
174             eClientBlobRead,
175             eNCReadData,
176             eProxyRead},
177           // Name of cache for blob.
178         { { "cache",   eNSPT_Id,   eNSPA_ICPrefix },
179           // Blob's key.
180           { "key",     eNSPT_Str,  eNSPA_Required },
181           // Blob's version.
182           { "version", eNSPT_Int,  eNSPA_Required },
183           // Blob's subkey.
184           { "subkey",  eNSPT_Str,  eNSPA_Required },
185           // Quorum to use for this operation.
186           { "qrum",    eNSPT_Int,  eNSPA_Optional },
187           // Client IP for application requesting the info.
188           { "ip",      eNSPT_Str,  fNSPA_Optional },
189           // Session ID for application requesting the info.
190           { "sid",     eNSPT_Str,  eNSPA_Optional },
191           // Password for blob access.
192           { "pass",    eNSPT_Str,  eNSPA_Optional },
193           // Max age of blob (returned blob should be younger)
194           { "age",     eNSPT_Int,  eNSPA_Optional },
195           // request Hit ID
196           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
197         } },
198     // Write blob contents. Command for "ICache" clients.
199     { "STOR",
200         {&CNCMessageHandler::x_DoCmd_IC_Store,
201             "IC_STOR",
202             eClientBlobWrite | fNoReplyOnFinish,
203             eNCCreate,
204             eProxyWrite},
205           // Name of cache for blob.
206         { { "cache",   eNSPT_Id,   eNSPA_ICPrefix },
207           // Time-to-live for the blob, 0 means default from server settings.
208           { "ttl",     eNSPT_Int,  eNSPA_Required },
209           // Blob's key.
210           { "key",     eNSPT_Str,  eNSPA_Required },
211           // Blob's version.
212           { "version", eNSPT_Int,  eNSPA_Required },
213           // Blob's subkey.
214           { "subkey",  eNSPT_Str,  eNSPA_Required },
215           // 1 if client wants confirmation after blob has been written
216           // (by default it just assumes that everything is written and moves
217           // further).
218           { "confirm", eNSPT_Int,  eNSPA_Optional },
219           // Quorum to use for this operation.
220           { "qrum",    eNSPT_Int,  eNSPA_Optional },
221           // Client IP for application requesting the info.
222           { "ip",      eNSPT_Str,  fNSPA_Optional },
223           // Session ID for application requesting the info.
224           { "sid",     eNSPT_Str,  eNSPA_Optional },
225           // Password for blob access.
226           { "pass",    eNSPT_Str,  eNSPA_Optional },
227           // request Hit ID
228           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional },
229           // see ENCUserFlags, added in v6.11.0 (CXX-8737)
230           { "flags",  eNSPT_Int,  eNSPA_Optional }
231         } },
232     // Write blob contents. Old and deprecated command which probably is not
233     // used by modern ICache clients anymore. It has the size of the blob right
234     // in the command (so client should know it beforehand) and it doesn't use
235     // "EOF" marker at the end of blob data.
236     { "STRS",
237         {&CNCMessageHandler::x_DoCmd_IC_Store,
238             "IC_STRS",
239             eClientBlobWrite | fReadExactBlobSize | fSkipBlobEOF | fNoReplyOnFinish,
240             eNCCreate,
241             eProxyWrite},
242           // Name of cache for blob.
243         { { "cache",   eNSPT_Id,   eNSPA_ICPrefix },
244           // Time-to-live for the blob, 0 means default from server settings.
245           { "ttl",     eNSPT_Int,  eNSPA_Required },
246           // Size of the blob to be written.
247           { "size",    eNSPT_Int,  eNSPA_Required },
248           // Blob's key.
249           { "key",     eNSPT_Str,  eNSPA_Required },
250           // Blob's version.
251           { "version", eNSPT_Int,  eNSPA_Required },
252           // Blob's subkey.
253           { "subkey",  eNSPT_Str,  eNSPA_Required },
254           // Quorum to use for this operation.
255           { "qrum",    eNSPT_Int,  eNSPA_Optional },
256           // Client IP for application requesting the info.
257           { "ip",      eNSPT_Str,  fNSPA_Optional },
258           // Session ID for application requesting the info.
259           { "sid",     eNSPT_Str,  eNSPA_Optional },
260           // Password for blob access.
261           { "pass",    eNSPT_Str,  eNSPA_Optional },
262           // request Hit ID
263           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
264         } },
265     // Read all or part of contents of the "last" version of the blob.
266     // In response to the command NC sends blob contents, blob version it has
267     // and flag showing if this version can be considered "valid", i.e. if it's
268     // not expired yet.
269     { "READLAST",
270         {&CNCMessageHandler::x_DoCmd_GetLast,
271             "IC_READLAST",
272             eClientBlobRead | fNoBlobVersionCheck,
273             eNCReadData,
274             eProxyReadLast},
275           // Name of cache for blob.
276         { { "cache",   eNSPT_Id,   eNSPA_ICPrefix },
277           // Blob's key.
278           { "key",     eNSPT_Str,  eNSPA_Required },
279           // Blob's subkey.
280           { "subkey",  eNSPT_Str,  eNSPA_Required },
281           // Starting position of the data that needs to be sent.
282           { "start",   eNSPT_Int,  eNSPA_Optional },
283           // Size of the data that needs to be sent.
284           { "size",    eNSPT_Int,  eNSPA_Optional },
285           // Quorum to use for this operation.
286           { "qrum",    eNSPT_Int,  eNSPA_Optional },
287           // Client IP for application requesting the info.
288           { "ip",      eNSPT_Str,  fNSPA_Optional },
289           // Session ID for application requesting the info.
290           { "sid",     eNSPT_Str,  eNSPA_Optional },
291           // Password for blob access.
292           { "pass",    eNSPT_Str,  eNSPA_Optional },
293           // Max age of blob (returned blob should be younger)
294           { "age",     eNSPT_Int,  eNSPA_Optional },
295           // request Hit ID
296           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
297         } },
298     // Mark the given blob version as "valid" and do that only if this version
299     // is still current and wasn't rewritten with another version.
300     { "SETVALID",
301         {&CNCMessageHandler::x_DoCmd_SetValid,
302             "IC_SETVALID",
303             fNeedsBlobAccess,
304             eNCRead,
305             eProxySetValid},
306           // Name of cache for blob.
307         { { "cache",   eNSPT_Id,   eNSPA_ICPrefix },
308           // Blob's key.
309           { "key",     eNSPT_Str,  eNSPA_Required },
310           // Blob's version.
311           { "version", eNSPT_Int,  eNSPA_Required },
312           // Blob's subkey.
313           { "subkey",  eNSPT_Str,  eNSPA_Required },
314           // Quorum to use for this operation.
315           { "qrum",    eNSPT_Int,  eNSPA_Optional },
316           // Client IP for application requesting the info.
317           { "ip",      eNSPT_Str,  fNSPA_Optional },
318           // Session ID for application requesting the info.
319           { "sid",     eNSPT_Str,  eNSPA_Optional },
320           // Password for blob access.
321           { "pass",    eNSPT_Str,  eNSPA_Optional },
322           // request Hit ID
323           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
324         } },
325     // Write blob contents. Command is issued only by other servers while
326     // mirroring just written blobs or processing quorum requirements,
327     // i.e. writing to other servers before answering to client that blob is
328     // written.
329     { "COPY_PUT",
330         {&CNCMessageHandler::x_DoCmd_CopyPut,
331             "COPY_PUT",
332             eCopyBlobFromPeer | fNeedsSpaceAsPeer | fReadExactBlobSize
333                               | fCopyLogEvent,
334             eNCCopyCreate, eProxyNone},
335           // Name of cache for blob (for NC-generated blob keys this will be
336           // empty).
337         { { "cache",   eNSPT_Str,  eNSPA_Required },
338           // Blob's key.
339           { "key",     eNSPT_Str,  eNSPA_Required },
340           // Blob's subkey (for NC-generated blob keys this will be empty).
341           { "subkey",  eNSPT_Str,  eNSPA_Required },
342           // Blob's version (for NC-generated blob keys this will be equal to 0).
343           { "version", eNSPT_Int,  eNSPA_Required },
344           // MD5 checksum of blob's password.
345           { "md5_pass",eNSPT_Str,  eNSPA_Required },
346           // Creation time of the blob (microseconds since epoch).
347           { "cr_time", eNSPT_Int,  eNSPA_Required },
348           // Time-to-live for the blob.
349           { "ttl",     eNSPT_Int,  eNSPA_Required },
350           // Dead-time for the blob (can be greater than expiration time).
351           { "dead",    eNSPT_Int,  eNSPA_Required },
352           // Expiration time for the blob
353           { "exp",     eNSPT_Int,  eNSPA_Required },
354           // Blob size.
355           { "size",    eNSPT_Int,  eNSPA_Required },
356           // Time-to-live for blob's version.
357           { "ver_ttl", eNSPT_Int,  eNSPA_Required },
358           // Blob's version expiration time.
359           { "ver_dead",eNSPT_Int,  eNSPA_Required },
360           // Server_id of the server where blob was created.
361           { "cr_srv",  eNSPT_Int,  eNSPA_Required },
362           // Id of the blob on the server where it was created.
363           { "cr_id",   eNSPT_Int,  eNSPA_Required },
364           // Record number of the event of blob creation in synchronization
365           // logs of the server where blob was created.
366           { "log_rec", eNSPT_Int,  eNSPA_Required },
367           // Version of the command. Field exists for protocol backwards
368           // compatibility with previous versions of NC. In current NC this
369           // version is always 1.
370           { "cmd_ver", eNSPT_Int,  eNSPA_Optional, "0" },
371           // Client IP for application that requested writing the blob.
372           // Parameter is not empty only if command is issued as part of
373           // quorum-related functionality, i.e. before client received
374           // confirmation of blob writing.
375           { "ip",      eNSPT_Str,  fNSPA_Optional },
376           // Session ID for application that requested writing the blob.
377           // Parameter is not empty only if command is issued as part of
378           // quorum-related functionality, i.e. before client received
379           // confirmation of blob writing.
380           { "sid",     eNSPT_Str,  eNSPA_Optional } } },
381     // Prolong blob lifetime. Command is issued only by other servers while
382     // mirroring prolonged blobs.
383     { "COPY_PROLONG",
384         {&CNCMessageHandler::x_DoCmd_CopyProlong,
385             "COPY_PROLONG",
386             eCopyBlobFromPeer,
387             eNCRead, eProxyNone},
388           // Name of cache for blob (for NC-generated blob keys this will be
389           // empty).
390         { { "cache",   eNSPT_Str,  eNSPA_Required },
391           // Blob's key.
392           { "key",     eNSPT_Str,  eNSPA_Required },
393           // Blob's subkey (for NC-generated blob keys this will be empty).
394           { "subkey",  eNSPT_Str,  eNSPA_Required },
395           // Creation time of the blob (microseconds since epoch).
396           { "cr_time", eNSPT_Int,  eNSPA_Required },
397           // Server_id of the server where blob was created.
398           { "cr_srv",  eNSPT_Int,  eNSPA_Required },
399           // Id of the blob on the server where it was created.
400           { "cr_id",   eNSPT_Int,  eNSPA_Required },
401           // Dead-time for the blob (can be greater than expiration time).
402           { "dead",    eNSPT_Int,  eNSPA_Required },
403           // Expiration time for the blob
404           { "exp",     eNSPT_Int,  eNSPA_Required },
405           // Blob's version expiration time.
406           { "ver_dead",eNSPT_Int,  eNSPA_Required },
407           // Time of creation of initial record in synchronization log about
408           // this operation.
409           { "log_time",eNSPT_Int,  fNSPA_Optional },
410           // Server that first made the blob's life prolongation.
411           { "log_srv", eNSPT_Int,  eNSPA_Optional },
412           // Record number of the initial record in synchronization log about
413           // this operation.
414           { "log_rec", eNSPT_Int,  eNSPA_Optional } } },
415     // Write blob contents. Command for "NetCache" clients.
416     { "PUT3",
417         {&CNCMessageHandler::x_DoCmd_Put,
418             "PUT3",
419             eClientBlobWrite | fCanGenerateKey,
420             eNCCreate,
421             eProxyWrite},
422           // Time-to-live for the blob. If not given or 0 then default TTL
423           // is used.
424         { { "ttl",     eNSPT_Int,  eNSPA_Optional },
425           // Key of the blob. If it's not given or empty then new key will be
426           // generated.
427           { "key",     eNSPT_NCID, eNSPA_Optional },
428           // Quorum to use for this operation.
429           { "qrum",    eNSPT_Int,  eNSPA_Optional },
430           // Client IP for application sending the command.
431           { "ip",      eNSPT_Str,  fNSPA_Optional },
432           // Session ID for application sending the command.
433           { "sid",     eNSPT_Str,  eNSPA_Optional },
434           // Password for blob access.
435           { "pass",    eNSPT_Str,  eNSPA_Optional },
436           // request Hit ID
437           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional },
438           // see ENCUserFlags, added in v6.11.0 (CXX-8737)
439           { "flags",  eNSPT_Int,  eNSPA_Optional }
440         } },
441     // Read blob contents. Command for "NetCache" clients.
442     { "GET2",
443         {&CNCMessageHandler::x_DoCmd_Get,
444             "GET2",
445             eClientBlobRead,
446             eNCReadData,
447             eProxyRead},
448           // Key of the blob.
449         { { "key",     eNSPT_NCID, eNSPA_Required },
450           // Not used and not implemented parameter. Exists just for backwards
451           // compatibility with old clients.
452           { "NW",      eNSPT_Id,   eNSPA_Obsolete | fNSPA_Match },
453           // Quorum to use for this operation.
454           { "qrum",    eNSPT_Int,  eNSPA_Optional },
455           // Client IP for application sending the command.
456           { "ip",      eNSPT_Str,  fNSPA_Optional },
457           // Session ID for application sending the command.
458           { "sid",     eNSPT_Str,  eNSPA_Optional },
459           // Password for blob access.
460           { "pass",    eNSPT_Str,  eNSPA_Optional },
461           // Max age of blob (returned blob should be younger)
462           { "age",     eNSPT_Int,  eNSPA_Optional },
463           // request Hit ID
464           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
465         } },
466     // Check if blob exists. Command for "NetCache" clients.
467     { "HASB",
468         {&CNCMessageHandler::x_DoCmd_HasBlob,
469             "HASB",
470             eClientBlobRead | fPeerFindExistsOnly,
471             eNCRead,
472             eProxyHasBlob},
473           // Key of the blob.
474         { { "key",     eNSPT_NCID, eNSPA_Required },
475           // Quorum to use for this operation.
476           { "qrum",    eNSPT_Int,  eNSPA_Optional },
477           // Client IP for application sending the command.
478           { "ip",      eNSPT_Str,  fNSPA_Optional },
479           // Session ID for application sending the command.
480           { "sid",     eNSPT_Str,  eNSPA_Optional },
481           // Password for blob access.
482           { "pass",    eNSPT_Str,  eNSPA_Optional },
483           // request Hit ID
484           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
485         } },
486     // Delete blob. Command for "NetCache" clients.
487     // If the blob doesn't exist command is still considered successful.
488     { "RMV2",
489         {&CNCMessageHandler::x_DoCmd_Remove,
490             "RMV2",
491             fNeedsBlobAccess | fNoBlobAccessStats,
492             eNCCreate,
493             eProxyRemove},
494           // Key of the blob.
495         { { "key",     eNSPT_NCID, eNSPA_Required },
496           // Quorum to use for this operation.
497           { "qrum",    eNSPT_Int,  eNSPA_Optional },
498           // Client IP for application sending the command.
499           { "ip",      eNSPT_Str,  fNSPA_Optional },
500           // Session ID for application sending the command.
501           { "sid",     eNSPT_Str,  eNSPA_Optional },
502           // Password for blob access.
503           { "pass",    eNSPT_Str,  eNSPA_Optional },
504           // request Hit ID
505           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
506         } },
507     // Get size of the blob. Command for "NetCache" clients.
508     { "GSIZ",
509         {&CNCMessageHandler::x_DoCmd_GetSize,
510             "GetSIZe",
511             eClientBlobRead,
512             eNCRead,
513             eProxyGetSize},
514           // Key of the blob.
515         { { "key",     eNSPT_NCID, eNSPA_Required },
516           // Quorum to use for this operation.
517           { "qrum",    eNSPT_Int,  eNSPA_Optional },
518           // Client IP for application sending the command.
519           { "ip",      eNSPT_Str,  fNSPA_Optional },
520           // Session ID for application sending the command.
521           { "sid",     eNSPT_Str,  eNSPA_Optional },
522           // Password for blob access.
523           { "pass",    eNSPT_Str,  eNSPA_Optional },
524           // request Hit ID
525           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
526         } },
527     // Delete blob. Command for "ICache" clients.
528     // If the blob doesn't exist command is still considered successful.
529     { "REMO",
530         {&CNCMessageHandler::x_DoCmd_Remove,
531             "IC_REMOve",
532             fNeedsBlobAccess | fNoBlobAccessStats,
533             eNCCreate,
534             eProxyRemove},
535           // Name of cache for blob.
536         { { "cache",   eNSPT_Id,   eNSPA_ICPrefix },
537           // Blob's key.
538           { "key",     eNSPT_Str,  eNSPA_Required },
539           // Blob's version.
540           { "version", eNSPT_Int,  eNSPA_Required },
541           // Blob's subkey.
542           { "subkey",  eNSPT_Str,  eNSPA_Required },
543           // Quorum to use for this operation.
544           { "qrum",    eNSPT_Int,  eNSPA_Optional },
545           // Client IP for application sending the command.
546           { "ip",      eNSPT_Str,  fNSPA_Optional },
547           // Session ID for application sending the command.
548           { "sid",     eNSPT_Str,  eNSPA_Optional },
549           // Password for blob access.
550           { "pass",    eNSPT_Str,  eNSPA_Optional },
551           // request Hit ID
552           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
553         } },
554     // Get size of the blob. Command for "ICache" clients.
555     { "GSIZ",
556         {&CNCMessageHandler::x_DoCmd_GetSize,
557             "IC_GetSIZe",
558             eClientBlobRead,
559             eNCRead,
560             eProxyGetSize},
561           // Name of cache for blob.
562         { { "cache",   eNSPT_Id,   eNSPA_ICPrefix },
563           // Blob's key.
564           { "key",     eNSPT_Str,  eNSPA_Required },
565           // Blob's version.
566           { "version", eNSPT_Int,  eNSPA_Required },
567           // Blob's subkey.
568           { "subkey",  eNSPT_Str,  eNSPA_Required },
569           // Quorum to use for this operation.
570           { "qrum",    eNSPT_Int,  eNSPA_Optional },
571           // Client IP for application sending the command.
572           { "ip",      eNSPT_Str,  fNSPA_Optional },
573           // Session ID for application sending the command.
574           { "sid",     eNSPT_Str,  eNSPA_Optional },
575           // Password for blob access.
576           { "pass",    eNSPT_Str,  eNSPA_Optional },
577           // request Hit ID
578           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
579         } },
580     // Read part of the blob contents. Command for "NetCache" clients.
581     { "GETPART",
582         {&CNCMessageHandler::x_DoCmd_Get,
583             "GETPART",
584             eClientBlobRead,
585             eNCReadData,
586             eProxyRead},
587           // Key of the blob.
588         { { "key",     eNSPT_NCID, eNSPA_Required },
589           // Starting position of the data that needs to be sent.
590           { "start",   eNSPT_Int,  eNSPA_Required },
591           // Size of the data that needs to be sent.
592           { "size",    eNSPT_Int,  eNSPA_Required },
593           // Quorum to use for this operation.
594           { "qrum",    eNSPT_Int,  eNSPA_Optional },
595           // Client IP for application requesting the info.
596           { "ip",      eNSPT_Str,  fNSPA_Optional },
597           // Session ID for application requesting the info.
598           { "sid",     eNSPT_Str,  eNSPA_Optional },
599           // Password for blob access.
600           { "pass",    eNSPT_Str,  eNSPA_Optional },
601           // Max age of blob (returned blob should be younger)
602           { "age",     eNSPT_Int,  eNSPA_Optional },
603           // request Hit ID
604           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
605         } },
606     // Read part of the blob contents. Command for "ICache" clients.
607     { "READPART",
608         {&CNCMessageHandler::x_DoCmd_Get,
609             "IC_READPART",
610             eClientBlobRead,
611             eNCReadData,
612             eProxyRead},
613           // Name of cache for blob.
614         { { "cache",   eNSPT_Id,   eNSPA_ICPrefix },
615           // Blob's key.
616           { "key",     eNSPT_Str,  eNSPA_Required },
617           // Blob's version.
618           { "version", eNSPT_Int,  eNSPA_Required },
619           // Blob's subkey.
620           { "subkey",  eNSPT_Str,  eNSPA_Required },
621           // Starting position of the data that needs to be sent.
622           { "start",   eNSPT_Int,  eNSPA_Required },
623           // Size of the data that needs to be sent.
624           { "size",    eNSPT_Int,  eNSPA_Required },
625           // Quorum to use for this operation.
626           { "qrum",    eNSPT_Int,  eNSPA_Optional },
627           // Client IP for application requesting the info.
628           { "ip",      eNSPT_Str,  fNSPA_Optional },
629           // Session ID for application requesting the info.
630           { "sid",     eNSPT_Str,  eNSPA_Optional },
631           // Password for blob access.
632           { "pass",    eNSPT_Str,  eNSPA_Optional },
633           // Max age of blob (returned blob should be younger)
634           { "age",     eNSPT_Int,  eNSPA_Optional },
635           // request Hit ID
636           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
637         } },
    // PROXY_META: get meta information about the blob. This command is sent
    // only by other NC servers and is used to determine which server has the
    // latest version of the blob. The response is therefore a single line
    // containing enough information to compare the blob's creation time with
    // the same blob on other servers.
    { "PROXY_META",
        // Handler method, second name string, flag set and two further
        // per-command settings.
        {&CNCMessageHandler::x_DoCmd_ProxyMeta,
            "PROXY_META",
            // fDoNotCheckPassword is set and, accordingly, no "pass" argument
            // is declared in the list below.
            fNeedsBlobAccess | fNeedsStorageCache | fDoNotProxyToPeers
                             | fDoNotCheckPassword,
            eNCRead, eProxyNone},
          // Name of cache for blob (for NC-generated blob keys this will be
          // empty).
        { { "cache",   eNSPT_Str,  eNSPA_Required },
          // Blob's key.
          { "key",     eNSPT_Str,  eNSPA_Required },
          // Blob's subkey (for NC-generated blob keys this will be empty).
          { "subkey",  eNSPT_Str,  eNSPA_Required },
          // Client IP of the application on whose behalf the info is
          // requested.
          { "ip",      eNSPT_Str,  eNSPA_Required },
          // Session ID of the application on whose behalf the info is
          // requested.
          { "sid",     eNSPT_Str,  eNSPA_Required } } },
660 
    // COPY_UPD: another NC server notifies this one that the blob was updated
    // there. The blob data itself will come later (e.g. via COPY_PUT).
    { "COPY_UPD",
        {&CNCMessageHandler::x_DoCmd_CopyUpdate,
            // NOTE(review): the second name string is "UPD", not "COPY_UPD"
            // as the command name above would suggest -- confirm this is
            // intentional.
            "UPD",
            fNoBlobAccessStats  |
            fNeedsBlobAccess | fNeedsStorageCache  | fDoNotProxyToPeers
                             | fDoNotCheckPassword | fNoBlobVersionCheck,
            // The next setting is chosen at compile time: eNCCopyCreate when
            // USE_ALWAYS_COPY_UPD is non-zero, eNCRead otherwise.
#if USE_ALWAYS_COPY_UPD
            eNCCopyCreate
#else
            eNCRead
#endif
             , eProxyNone },
          // Name of cache for blob (for NC-generated blob keys this will be
          // empty).
        { { "cache",   eNSPT_Str,  eNSPA_Required },
          // Blob's key.
          { "key",     eNSPT_Str,  eNSPA_Required },
          // Blob's subkey (for NC-generated blob keys this will be empty).
          { "subkey",  eNSPT_Str,  eNSPA_Required },
          // Time of the update.
          { "cr_time", eNSPT_Int,  eNSPA_Required },
          // Server id of the server where the update was made.
          { "cr_srv",  eNSPT_Int,  eNSPA_Required }
        }
    },
    // PROXY_PUT: write the blob contents. This command is sent only by other
    // NC servers in response to a client's PUT3 (or similar) command when the
    // server to which the client has sent the initial command cannot execute
    // it locally for any reason (the slot is not processed by that server or
    // initial database caching is not completed yet).
    { "PROXY_PUT",
        // Handled by x_DoCmd_Put; fDoNotProxyToPeers is added to the
        // eProxyBlobWrite flag set.
        {&CNCMessageHandler::x_DoCmd_Put,
            "PROXY_PUT",
            eProxyBlobWrite | fDoNotProxyToPeers,
            eNCCreate,
            eProxyWrite},
          // Name of cache for blob (for NC-generated blob keys this will be
          // empty).
        { { "cache",   eNSPT_Str,  eNSPA_Required },
          // Blob's key.
          { "key",     eNSPT_Str,  eNSPA_Required },
          // Blob's subkey (for NC-generated blob keys this will be empty).
          { "subkey",  eNSPT_Str,  eNSPA_Required },
          // Blob's version (for NC-generated blob keys this will be equal to 0).
          { "version", eNSPT_Int,  eNSPA_Required },
          // Time-to-live for the blob.
          { "ttl",     eNSPT_Int,  eNSPA_Required },
          // Quorum to use for this operation.
          { "qrum",    eNSPT_Int,  eNSPA_Required },
          // Client IP for application requesting the info.
          { "ip",      eNSPT_Str,  eNSPA_Required },
          // Session ID for application requesting the info.
          { "sid",     eNSPT_Str,  eNSPA_Required },
          // User flags (see ENCUserFlags); added in v6.11.0 (CXX-8737).
          // Optional, unlike the arguments above.
          { "flags",  eNSPT_Int,  eNSPA_Optional },
          // Password for blob access.
          { "pass",    eNSPT_Str,  eNSPA_Optional }
        } },
    // PROXY_GET: read all or a part of the blob contents. This command is
    // sent only by other NC servers in response to a client's GET2 (or
    // similar) command when the server to which the client has sent the
    // initial command cannot execute it locally for any reason (the slot is
    // not processed by that server, or initial database caching is not
    // completed yet, or it was determined that this server has the latest
    // version of the blob).
    { "PROXY_GET",
        // Handled by x_DoCmd_Get; fDoNotProxyToPeers is added to the
        // eProxyBlobRead flag set.
        {&CNCMessageHandler::x_DoCmd_Get,
            "PROXY_GET",
            eProxyBlobRead | fDoNotProxyToPeers,
            eNCReadData,
            eProxyRead},
          // Name of cache for blob (for NC-generated blob keys this will be
          // empty).
        { { "cache",   eNSPT_Str,  eNSPA_Required },
          // Blob's key.
          { "key",     eNSPT_Str,  eNSPA_Required },
          // Blob's subkey (for NC-generated blob keys this will be empty).
          { "subkey",  eNSPT_Str,  eNSPA_Required },
          // Blob's version (for NC-generated blob keys this will be equal to 0).
          { "version", eNSPT_Int,  eNSPA_Required },
          // Starting position of the data that needs to be sent.
          { "start",   eNSPT_Int,  eNSPA_Required },
          // Size of the data that needs to be sent.
          { "size",    eNSPT_Int,  eNSPA_Required },
          // Quorum to use for this operation.
          { "qrum",    eNSPT_Int,  eNSPA_Required },
          // Value of the flag "search_on_read" to use with this command, i.e.
          // whether this server should search for the blob if it's not found
          // locally.
          { "srch",    eNSPT_Int,  eNSPA_Required },
          // Flag whether local execution of this command should be forced
          // no matter what, i.e. if this flag is set, then in cases when a
          // normal GET2 command would have been proxied to other servers an
          // error should be returned instead.
          { "local",   eNSPT_Int,  eNSPA_Required },
          // Client IP for application requesting the info.
          { "ip",      eNSPT_Str,  eNSPA_Required },
          // Session ID for application requesting the info.
          { "sid",     eNSPT_Str,  eNSPA_Required },
          // Password for blob access.
          { "pass",    eNSPT_Str,  eNSPA_Optional },
          // Max age of blob (returned blob should be younger).
          { "age",     eNSPT_Int,  eNSPA_Optional }
        } },
    // PROXY_HASB: check if the blob exists. This command is sent only by
    // other NC servers in response to a client's HASB command when the server
    // to which the client has sent the initial command cannot execute it
    // locally for any reason (the slot is not processed by that server or
    // initial database caching is not completed yet).
    { "PROXY_HASB",
        // Handled by x_DoCmd_HasBlob; fPeerFindExistsOnly and
        // fDoNotProxyToPeers are added to the eProxyBlobRead flag set.
        {&CNCMessageHandler::x_DoCmd_HasBlob,
            "PROXY_HASB",
            eProxyBlobRead | fPeerFindExistsOnly | fDoNotProxyToPeers,
            eNCRead,
            eProxyHasBlob},
          // Name of cache for blob (for NC-generated blob keys this will be
          // empty).
        { { "cache",   eNSPT_Str,  eNSPA_Required },
          // Blob's key.
          { "key",     eNSPT_Str,  eNSPA_Required },
          // Blob's subkey (for NC-generated blob keys this will be empty).
          { "subkey",  eNSPT_Str,  eNSPA_Required },
          // Quorum to use for this operation.
          { "qrum",    eNSPT_Int,  eNSPA_Required },
          // Client IP for application requesting the info.
          { "ip",      eNSPT_Str,  eNSPA_Required },
          // Session ID for application requesting the info.
          { "sid",     eNSPT_Str,  eNSPA_Required },
          // Password for blob access.
          { "pass",    eNSPT_Str,  eNSPA_Optional } } },
    // PROXY_GSIZ: get the size of the blob. This command is sent only by
    // other NC servers in response to a client's GSIZ command when the server
    // to which the client has sent the initial command cannot execute it
    // locally for any reason (the slot is not processed by that server, or
    // initial database caching is not completed yet, or it was determined
    // that this server has the latest version of the blob).
    { "PROXY_GSIZ",
        // Handled by x_DoCmd_GetSize. Note that the second name string is
        // spelled "PROXY_GetSIZe", unlike the command name above.
        {&CNCMessageHandler::x_DoCmd_GetSize,
            "PROXY_GetSIZe",
            eProxyBlobRead | fDoNotProxyToPeers,
            eNCRead,
            eProxyGetSize},
          // Name of cache for blob (for NC-generated blob keys this will be
          // empty).
        { { "cache",   eNSPT_Str,  eNSPA_Required },
          // Blob's key.
          { "key",     eNSPT_Str,  eNSPA_Required },
          // Blob's subkey (for NC-generated blob keys this will be empty).
          { "subkey",  eNSPT_Str,  eNSPA_Required },
          // Blob's version (for NC-generated blob keys this will be equal to 0).
          { "version", eNSPT_Int,  eNSPA_Required },
          // Quorum to use for this operation.
          { "qrum",    eNSPT_Int,  eNSPA_Required },
          // Value of the flag "search_on_read" to use with this command, i.e.
          // whether this server should search for the blob if it's not found
          // locally.
          { "srch",    eNSPT_Int,  eNSPA_Required },
          // Flag whether local execution of this command should be forced
          // no matter what, i.e. if this flag is set, then in cases when a
          // normal GSIZ command would have been proxied to other servers an
          // error should be returned instead.
          { "local",   eNSPT_Int,  eNSPA_Required },
          // Client IP for application requesting the info.
          { "ip",      eNSPT_Str,  eNSPA_Required },
          // Session ID for application requesting the info.
          { "sid",     eNSPT_Str,  eNSPA_Required },
          // Password for blob access.
          { "pass",    eNSPT_Str,  eNSPA_Optional } } },
    // PROXY_READLAST: read all or a part of the contents of the "last
    // version" of the blob. This command is sent only by other NC servers in
    // response to a client's READLAST command when the server to which the
    // client has sent the initial command cannot execute it locally for any
    // reason (the slot is not processed by that server, or initial database
    // caching is not completed yet, or it was determined that this server has
    // the latest version of the blob).
    { "PROXY_READLAST",
        // Handled by x_DoCmd_GetLast. fNoBlobVersionCheck is set and,
        // accordingly, no "version" argument is declared in the list below.
        {&CNCMessageHandler::x_DoCmd_GetLast,
            "PROXY_READLAST",
            eProxyBlobRead | fNoBlobVersionCheck | fDoNotProxyToPeers,
            eNCReadData,
            eProxyReadLast},
          // Name of cache for blob (for NC-generated blob keys this will be
          // empty).
        { { "cache",   eNSPT_Str,  eNSPA_Required },
          // Blob's key.
          { "key",     eNSPT_Str,  eNSPA_Required },
          // Blob's subkey (for NC-generated blob keys this will be empty).
          { "subkey",  eNSPT_Str,  eNSPA_Required },
          // Starting position of the data that needs to be sent.
          { "start",   eNSPT_Int,  eNSPA_Required },
          // Size of the data that needs to be sent.
          { "size",    eNSPT_Int,  eNSPA_Required },
          // Quorum to use for this operation.
          { "qrum",    eNSPT_Int,  eNSPA_Required },
          // Value of the flag "search_on_read" to use with this command, i.e.
          // whether this server should search for the blob if it's not found
          // locally.
          { "srch",    eNSPT_Int,  eNSPA_Required },
          // Flag whether local execution of this command should be forced
          // no matter what, i.e. if this flag is set, then in cases when a
          // normal READLAST command would have been proxied to other servers
          // an error should be returned instead.
          { "local",   eNSPT_Int,  eNSPA_Required },
          // Client IP for application requesting the info.
          { "ip",      eNSPT_Str,  eNSPA_Required },
          // Session ID for application requesting the info.
          { "sid",     eNSPT_Str,  eNSPA_Required },
          // Password for blob access.
          { "pass",    eNSPT_Str,  eNSPA_Optional },
          // Max age of blob (returned blob should be younger).
          { "age",     eNSPT_Int,  eNSPA_Optional } } },
    // PROXY_SETVALID: mark the "current version" of the blob as "valid".
    // This command is sent only by other NC servers in response to a client's
    // SETVALID command when the server to which the client has sent the
    // initial command cannot execute it locally for any reason (the slot is
    // not processed by that server, or initial database caching is not
    // completed yet).
    { "PROXY_SETVALID",
        // Handled by x_DoCmd_SetValid.
        {&CNCMessageHandler::x_DoCmd_SetValid,
            "PROXY_SETVALID",
            fNeedsBlobAccess | fNeedsStorageCache | fDoNotProxyToPeers,
            eNCRead,
            eProxySetValid},
          // Name of cache for blob (for NC-generated blob keys this will be
          // empty).
        { { "cache",   eNSPT_Str,  eNSPA_Required },
          // Blob's key.
          { "key",     eNSPT_Str,  eNSPA_Required },
          // Blob's subkey (for NC-generated blob keys this will be empty).
          { "subkey",  eNSPT_Str,  eNSPA_Required },
          // Blob's version (for NC-generated blob keys this will be equal to 0).
          { "version", eNSPT_Int,  eNSPA_Required },
          // Client IP for application requesting the info.
          { "ip",      eNSPT_Str,  eNSPA_Required },
          // Session ID for application requesting the info.
          { "sid",     eNSPT_Str,  eNSPA_Required },
          // Password for blob access.
          { "pass",    eNSPT_Str,  eNSPA_Optional } } },
    // PROXY_RMV: remove the blob. This command is sent only by other NC
    // servers in response to a client's RMV2 (or similar) command when the
    // server to which the client has sent the initial command cannot execute
    // it locally for any reason (the slot is not processed by that server, or
    // initial database caching is not completed yet).
    { "PROXY_RMV",
        // Handled by x_DoCmd_Remove. Note that the second name string is
        // spelled "PROXY_ReMoVe", unlike the command name above.
        {&CNCMessageHandler::x_DoCmd_Remove,
            "PROXY_ReMoVe",
            fNeedsBlobAccess | fNoBlobAccessStats | fDoNotProxyToPeers,
            eNCCreate,
            eProxyRemove},
          // Name of cache for blob (for NC-generated blob keys this will be
          // empty).
        { { "cache",   eNSPT_Str,  eNSPA_Required },
          // Blob's key.
          { "key",     eNSPT_Str,  eNSPA_Required },
          // Blob's subkey (for NC-generated blob keys this will be empty).
          { "subkey",  eNSPT_Str,  eNSPA_Required },
          // Blob's version (for NC-generated blob keys this will be equal to 0).
          { "version", eNSPT_Int,  eNSPA_Required },
          // Quorum to use for this operation.
          { "qrum",    eNSPT_Int,  eNSPA_Required },
          // Client IP for application requesting the info.
          { "ip",      eNSPT_Str,  eNSPA_Required },
          // Session ID for application requesting the info.
          { "sid",     eNSPT_Str,  eNSPA_Required },
          // Password for blob access.
          { "pass",    eNSPT_Str,  eNSPA_Optional }
        } },
    // COPY_RMV: remove the blob. This command is sent by other NC servers in
    // certain scenarios only.
    { "COPY_RMV",
        // Handled by x_DoCmd_Remove. The second name string is spelled
        // "COPY_ReMoVe", unlike the command name above.
        {&CNCMessageHandler::x_DoCmd_Remove,
            "COPY_ReMoVe",
            // fDoNotCheckPassword and fNoBlobVersionCheck are set and,
            // accordingly, neither "pass" nor "version" is declared below.
            fNeedsBlobAccess | fNoBlobAccessStats | fDoNotProxyToPeers
                             | fDoNotCheckPassword | fNoBlobVersionCheck | fCopyLogEvent,
            eNCRead,
            eProxyRemove},
          // Name of cache for blob (for NC-generated blob keys this will be
          // empty).
        { { "cache",   eNSPT_Str,  eNSPA_Required },
          // Blob's key.
          { "key",     eNSPT_Str,  eNSPA_Required },
          // Blob's subkey (for NC-generated blob keys this will be empty).
          { "subkey",  eNSPT_Str,  eNSPA_Required },
          // Time of the update.
          { "cr_time", eNSPT_Int, eNSPA_Required },
          // Server id of the requestor.
          { "cr_srv",  eNSPT_Int,  eNSPA_Required }
        }
    },
    // PROXY_GETMETA: read meta information about the blob. This command is
    // sent only by other NC servers in response to a client's GETMETA command
    // when the server to which the client has sent the initial command cannot
    // execute it locally for any reason (the slot is not processed by that
    // server, or initial database caching is not completed yet, or it was
    // determined that this server has the latest version of the blob).
    { "PROXY_GETMETA",
        // Handled by x_DoCmd_GetMeta. fDoNotCheckPassword is set and,
        // accordingly, no "pass" argument is declared in the list below.
        {&CNCMessageHandler::x_DoCmd_GetMeta,
            "PROXY_GETMETA",
            eProxyBlobRead | fDoNotCheckPassword | fDoNotProxyToPeers,
            eNCRead,
            eProxyGetMeta},
          // Name of cache for blob (for NC-generated blob keys this will be
          // empty).
        { { "cache",   eNSPT_Str,  eNSPA_Required },
          // Blob's key.
          { "key",     eNSPT_Str,  eNSPA_Required },
          // Blob's subkey (for NC-generated blob keys this will be empty).
          { "subkey",  eNSPT_Str,  eNSPA_Required },
          // Quorum to use for this operation.
          { "qrum",    eNSPT_Int,  eNSPA_Required },
          // Flag whether local execution of this command should be forced
          // no matter what, i.e. if this flag is set, then in cases when a
          // normal GETMETA command would have been proxied to other servers
          // an error should be returned instead.
          { "local",   eNSPT_Int,  eNSPA_Required },
          // Client IP for application requesting the info.
          { "ip",      eNSPT_Str,  eNSPA_Required },
          // Session ID for application requesting the info.
          { "sid",     eNSPT_Str,  eNSPA_Required },
          // HTTP flag: read input in NC format, write output in HTTP format.
          { "http", eNSPT_Int,  eNSPA_Optional }
        } },
    // SYNC_START: start a periodic synchronization session. The command is
    // sent only by other NC servers when CNCActiveSyncControl in them decides
    // to start synchronization. The response to this command contains the
    // list of events from the sync logs of this server which need to be
    // synchronized. Or, if this server decides that synchronization using
    // blob lists is needed, the first line of the response will contain the
    // word ALL_BLOBS and then the full list of blobs in this slot will be
    // sent.
    { "SYNC_START",
        // Handled by x_DoCmd_SyncStart.
        {&CNCMessageHandler::x_DoCmd_SyncStart,
            "SYNC_START",
          // This command itself does not need space, but the synchronization
          // it starts WILL need space, hence fNeedsSpaceAsPeer.
            fNeedsSpaceAsPeer |
            fNeedsStorageCache | fNeedsLowerPriority | fNeedsAdminClient
            , eNCNone, eProxyNone},
          // Server id of the server starting synchronization.
        { { "srv_id",  eNSPT_Int,  eNSPA_Required },
          // Slot to start synchronization on.
          { "slot",    eNSPT_Int,  eNSPA_Required },
          // Last synchronized record number (in sync log) of _that_ server
          // as _that_ server thinks.
          { "rec_my",  eNSPT_Int,  eNSPA_Required },
          // Last synchronized record number (in sync log) of _this_ server
          // as _that_ server thinks.
          { "rec_your",eNSPT_Int,  eNSPA_Required } } },
    // SYNC_BLIST: get the full list of blobs for the slot. The command is
    // sent only by other NC servers when the sending server decides that
    // synchronization using blob lists is needed. It can be sent only after
    // successful execution of the SYNC_START command.
    { "SYNC_BLIST",
        // Handled by x_DoCmd_SyncBlobsList; uses the eRunsInStartedSync flag
        // set.
        {&CNCMessageHandler::x_DoCmd_SyncBlobsList,
            "SYNC_BLIST",
            eRunsInStartedSync, eNCNone, eProxyNone},
          // Server id of the server managing the synchronization.
        { { "srv_id",  eNSPT_Int,  eNSPA_Required },
          // Slot that synchronization is started on.
          { "slot",    eNSPT_Int,  eNSPA_Required } } },
    // SYNC_PUT: write blob contents. This command is sent only by other NC
    // servers during a synchronization session if some blob was written on
    // that server and the same data didn't make it to this server yet.
    { "SYNC_PUT",
        // Handled by x_DoCmd_CopyPut.
        {&CNCMessageHandler::x_DoCmd_CopyPut,
            "SYNC_PUT",
            eSyncBlobCmd | fNeedsSpaceAsPeer
                         | fReadExactBlobSize | fCopyLogEvent,
            eNCCopyCreate, eProxyNone},
          // Server id of the server managing the synchronization.
        { { "srv_id",  eNSPT_Int,  eNSPA_Required },
          // Slot that synchronization is started on.
          { "slot",    eNSPT_Int,  eNSPA_Required },
          // Name of cache for blob (for NC-generated blob keys this will be
          // empty).
          { "cache",   eNSPT_Str,  eNSPA_Required },
          // Blob's key.
          { "key",     eNSPT_Str,  eNSPA_Required },
          // Blob's subkey (for NC-generated blob keys this will be empty).
          { "subkey",  eNSPT_Str,  eNSPA_Required },
          // Blob's version (for NC-generated blob keys this will be equal to 0).
          { "version", eNSPT_Int,  eNSPA_Required },
          // MD5 checksum of blob's password.
          { "md5_pass",eNSPT_Str,  eNSPA_Required },
          // Creation time of the blob (microseconds since epoch).
          { "cr_time", eNSPT_Int,  eNSPA_Required },
          // Time-to-live for the blob.
          { "ttl",     eNSPT_Int,  eNSPA_Required },
          // Dead-time for the blob (can be greater than expiration time).
          { "dead",    eNSPT_Int,  eNSPA_Required },
          // Expiration time for the blob.
          { "exp",     eNSPT_Int,  eNSPA_Required },
          // Blob size.
          { "size",    eNSPT_Int,  eNSPA_Required },
          // Time-to-live for blob's version.
          { "ver_ttl", eNSPT_Int,  eNSPA_Required },
          // Blob's version expiration time.
          { "ver_dead",eNSPT_Int,  eNSPA_Required },
          // Server id of the server where the blob was created.
          { "cr_srv",  eNSPT_Int,  eNSPA_Required },
          // Id of the blob on the server where it was created.
          { "cr_id",   eNSPT_Int,  eNSPA_Required },
          // Record number of the event of blob creation in synchronization
          // logs of the server where blob was created.
          { "log_rec", eNSPT_Int,  eNSPA_Required },
          // Version of the command. Field exists for protocol backwards
          // compatibility with previous versions of NC. In current NC this
          // version is always 1.
          // This is the only argument here with a fourth initializer ("0");
          // presumably the value used when the argument is omitted -- confirm
          // against the argument struct declaration.
          { "cmd_ver", eNSPT_Int,  eNSPA_Optional, "0" } } },
    // SYNC_PROLONG: prolong the blob's life. This command is sent only by
    // other NC servers during a synchronization session if some blob was
    // prolonged on that server and the same prolongation didn't happen on
    // this server yet.
    { "SYNC_PROLONG",
        // Handled by x_DoCmd_CopyProlong.
        {&CNCMessageHandler::x_DoCmd_CopyProlong,
            "SYNC_PROLONG",
            eSyncBlobCmd,
            eNCRead, eProxyNone},
          // Server id of the server managing the synchronization.
        { { "srv_id",  eNSPT_Int,  eNSPA_Required },
          // Slot that synchronization is started on.
          { "slot",    eNSPT_Int,  eNSPA_Required },
          // Name of cache for blob (for NC-generated blob keys this will be
          // empty).
          { "cache",   eNSPT_Str,  eNSPA_Required },
          // Blob's key.
          { "key",     eNSPT_Str,  eNSPA_Required },
          // Blob's subkey (for NC-generated blob keys this will be empty).
          { "subkey",  eNSPT_Str,  eNSPA_Required },
          // Creation time of the blob (microseconds since epoch).
          { "cr_time", eNSPT_Int,  eNSPA_Required },
          // Server id of the server where the blob was created.
          { "cr_srv",  eNSPT_Int,  eNSPA_Required },
          // Id of the blob on the server where it was created.
          { "cr_id",   eNSPT_Int,  eNSPA_Required },
          // Dead-time for the blob (can be greater than expiration time).
          { "dead",    eNSPT_Int,  eNSPA_Required },
          // Expiration time for the blob.
          { "exp",     eNSPT_Int,  eNSPA_Required },
          // Blob's version expiration time.
          { "ver_dead",eNSPT_Int,  eNSPA_Required },
          // Time of creation of initial record in synchronization log about
          // this operation.
          // NOTE(review): this argument uses fNSPA_Optional whereas the two
          // optional arguments after it use eNSPA_Optional -- confirm the
          // difference is intentional.
          { "log_time",eNSPT_Int,  fNSPA_Optional },
          // Server that first made the blob's life prolongation.
          { "log_srv", eNSPT_Int,  eNSPA_Optional },
          // Record number of the initial record in synchronization log about
          // this operation.
          { "log_rec", eNSPT_Int,  eNSPA_Optional } } },
    // SYNC_GET: read blob contents. This command is sent only by other NC
    // servers during a synchronization session if some blob was written on
    // this server and the same data didn't make it to that server yet.
    { "SYNC_GET",
        // Handled by x_DoCmd_SyncGet.
        {&CNCMessageHandler::x_DoCmd_SyncGet,
            "SYNC_GET",
            eSyncBlobCmd,
            eNCReadData, eProxyNone},
          // Server id of the server managing the synchronization.
        { { "srv_id",  eNSPT_Int,  eNSPA_Required },
          // Slot that synchronization is started on.
          { "slot",    eNSPT_Int,  eNSPA_Required },
          // Name of cache for blob (for NC-generated blob keys this will be
          // empty).
          { "cache",   eNSPT_Str,  eNSPA_Required },
          // Blob's key.
          { "key",     eNSPT_Str,  eNSPA_Required },
          // Blob's subkey (for NC-generated blob keys this will be empty).
          { "subkey",  eNSPT_Str,  eNSPA_Required },
          // Time of creation of initial record in synchronization log about
          // this operation.
          { "log_time",eNSPT_Int,  eNSPA_Required },
          // Creation time of the blob (microseconds since epoch).
          { "cr_time", eNSPT_Int,  eNSPA_Required },
          // Server id of the server where the blob was created.
          { "cr_srv",  eNSPT_Int,  eNSPA_Required },
          // Id of the blob on the server where it was created.
          { "cr_id",   eNSPT_Int,  eNSPA_Required } } },
    // SYNC_PROINFO: get the information necessary to prolong the blob's life.
    // This command is sent only by other NC servers during a synchronization
    // session if some blob was prolonged on this server and the same
    // prolongation didn't happen on that server yet.
    { "SYNC_PROINFO",
        // Handled by x_DoCmd_SyncProlongInfo.
        {&CNCMessageHandler::x_DoCmd_SyncProlongInfo,
            "SYNC_PROINFO",
            eSyncBlobCmd,
            eNCRead, eProxyNone},
          // Server id of the server managing the synchronization.
        { { "srv_id",  eNSPT_Int,  eNSPA_Required },
          // Slot that synchronization is started on.
          { "slot",    eNSPT_Int,  eNSPA_Required },
          // Name of cache for blob (for NC-generated blob keys this will be
          // empty).
          { "cache",   eNSPT_Str,  eNSPA_Required },
          // Blob's key.
          { "key",     eNSPT_Str,  eNSPA_Required },
          // Blob's subkey (for NC-generated blob keys this will be empty).
          { "subkey",  eNSPT_Str,  eNSPA_Required } } },
    // SYNC_COMMIT: "commit" the synchronization session. This command is sent
    // only by other NC servers at the end of a synchronization session when
    // all necessary commands have been executed successfully.
    { "SYNC_COMMIT",
        // Handled by x_DoCmd_SyncCommit; fProhibitsSyncAbort is added to the
        // eRunsInStartedSync flag set.
        {&CNCMessageHandler::x_DoCmd_SyncCommit,
            "SYNC_COMMIT",
            eRunsInStartedSync | fProhibitsSyncAbort, eNCNone, eProxyNone},
          // Server id of the server managing the synchronization.
        { { "srv_id",  eNSPT_Int,  eNSPA_Required },
          // Slot that synchronization is started on.
          { "slot",    eNSPT_Int,  eNSPA_Required },
          // Last synchronized record number (in sync log) of _that_ server.
          { "rec_my",  eNSPT_Int,  eNSPA_Required },
          // Last synchronized record number (in sync log) of _this_ server.
          { "rec_your",eNSPT_Int,  eNSPA_Required } } },
    // SYNC_CANCEL: "cancel" the synchronization session. This command is sent
    // only by other NC servers at the end of a synchronization session when
    // either this server requested or that server decided that the
    // synchronization should be aborted despite the successful execution of
    // all commands. The reason for cancellation could be some server going to
    // shut down, or a requirement to clean sync logs (when synchronization
    // has already been executing for too long).
    // The cancellation doesn't cancel any commands already executed in this
    // synchronization session. It exists only to quickly mark this
    // synchronization as no longer executing so that NC can start
    // synchronization with some other server.
    { "SYNC_CANCEL",
        // Handled by x_DoCmd_SyncCancel; fProhibitsSyncAbort is added to the
        // eRunsInStartedSync flag set.
        {&CNCMessageHandler::x_DoCmd_SyncCancel,
            "SYNC_CANCEL",
            eRunsInStartedSync | fProhibitsSyncAbort, eNCNone, eProxyNone},
          // Server id of the server managing the synchronization.
        { { "srv_id",  eNSPT_Int,  eNSPA_Required },
          // Slot that synchronization is started on.
          { "slot",    eNSPT_Int,  eNSPA_Required } } },
    // GETMETA: get meta information about the blob. Command for "NetCache"
    // clients; the blob is addressed by a single key of type eNSPT_NCID.
    { "GETMETA",
        // Handled by x_DoCmd_GetMeta. fDoNotCheckPassword is set and,
        // accordingly, no "pass" argument is declared in the list below.
        {&CNCMessageHandler::x_DoCmd_GetMeta,
            "GETMETA",
            eClientBlobRead | fDoNotCheckPassword,
            eNCRead,
            eProxyGetMeta},
          // Key of the blob.
        { { "key",     eNSPT_NCID, eNSPA_Required },
          // Flag forcing local execution of the command (without forwarding
          // to other servers and without searching for blob on them).
          { "local",   eNSPT_Int,  eNSPA_Optional },
          // Quorum to use for this operation.
          { "qrum",    eNSPT_Int,  eNSPA_Optional },
          // Client IP for application sending the command.
          // NOTE(review): this argument uses fNSPA_Optional whereas the other
          // optional arguments here use eNSPA_Optional -- confirm the
          // difference is intentional.
          { "ip",      eNSPT_Str,  fNSPA_Optional },
          // Session ID for application sending the command.
          { "sid",     eNSPT_Str,  eNSPA_Optional },
          // Request hit ID.
          { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
        } },
    // GETMETA (ICache form): get meta information about the blob. Command for
    // "ICache" clients. It has the same command name as the "NetCache" form
    // but the second name string is "IC_GETMETA", and the blob is addressed
    // by cache name, key, version and subkey.
    { "GETMETA",
        // Handled by x_DoCmd_GetMeta. fDoNotCheckPassword is set and,
        // accordingly, no "pass" argument is declared in the list below.
        {&CNCMessageHandler::x_DoCmd_GetMeta,
            "IC_GETMETA",
            eClientBlobRead | fDoNotCheckPassword,
            eNCRead,
            eProxyGetMeta},
          // Name of cache for blob. Declared as eNSPA_ICPrefix; presumably
          // this leading argument is what tells the ICache form apart from
          // the NetCache one -- confirm against the command parser.
        { { "cache",   eNSPT_Id,   eNSPA_ICPrefix },
          // Blob's key.
          { "key",     eNSPT_Str,  eNSPA_Required },
          // Blob's version.
          { "version", eNSPT_Int,  eNSPA_Required },
          // Blob's subkey.
          { "subkey",  eNSPT_Str,  eNSPA_Required },
          // Flag forcing local execution of the command (without forwarding
          // to other servers and without searching for blob on them).
          { "local",   eNSPT_Int,  eNSPA_Optional },
          // Quorum to use for this operation.
          { "qrum",    eNSPT_Int,  eNSPA_Optional },
          // Client IP for application sending the command.
          // NOTE(review): this argument uses fNSPA_Optional whereas the other
          // optional arguments here use eNSPA_Optional -- confirm the
          // difference is intentional.
          { "ip",      eNSPT_Str,  fNSPA_Optional },
          // Session ID for application sending the command.
          { "sid",     eNSPT_Str,  eNSPA_Optional },
          // Request hit ID.
          { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
        } },
1233     // Prolong blob's life for a specific number of seconds from current time.
1234     // Command provides a minimum time this blob should be still available for.
1235     // If blob's expiration time was already later than that this command is
1236     // a no-op.
1237     { "PROLONG",
1238         {&CNCMessageHandler::x_DoCmd_Prolong,
1239             "PROLONG",
1240             eClientBlobRead,
1241             eNCRead,
1242             eProxyProlong},
1243           // Name of cache for blob (for NC-generated blob keys this will be
1244           // empty).
1245         { { "cache",   eNSPT_Str,  eNSPA_Required },
1246           // Blob's key.
1247           { "key",     eNSPT_Str,  eNSPA_Required },
1248           // Blob's subkey (for NC-generated blob keys this will be empty).
1249           { "subkey",  eNSPT_Str,  eNSPA_Required },
1250           // Period of time for the blob to be available.
1251           { "ttl",     eNSPT_Int,  eNSPA_Required },
1252           // Quorum to use for this operation.
1253           { "qrum",    eNSPT_Int,  eNSPA_Optional },
1254           // Client IP for application sending the command.
1255           { "ip",      eNSPT_Str,  fNSPA_Optional },
1256           // Session ID for application sending the command.
1257           { "sid",     eNSPT_Str,  eNSPA_Optional },
1258           // Password for blob access.
1259           { "pass",    eNSPT_Str,  eNSPA_Optional },
1260           // request Hit ID
1261           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
1262         } },
1263     // Prolong blob's life for a specific number of seconds from current time.
1264     // This command is sent only by other NC servers in response to client's
1265     // PROLONG command when the server where client have sent initial command
1266     // cannot execute it locally for any reason (slot is not processed by that
1267     // server, or initial database caching is not completed yet, or it was
1268     // determined that this server has latest version of the blob).
1269     { "PROXY_PROLONG",
1270         {&CNCMessageHandler::x_DoCmd_Prolong,
1271             "PROXY_PROLONG",
1272             eProxyBlobRead | fDoNotProxyToPeers,
1273             eNCRead,
1274             eProxyProlong},
1275           // Name of cache for blob (for NC-generated blob keys this will be
1276           // empty).
1277         { { "cache",   eNSPT_Str,  eNSPA_Required },
1278           // Blob's key.
1279           { "key",     eNSPT_Str,  eNSPA_Required },
1280           // Blob's subkey (for NC-generated blob keys this will be empty).
1281           { "subkey",  eNSPT_Str,  eNSPA_Required },
1282           // Period of time for the blob to be available.
1283           { "ttl",     eNSPT_Int,  eNSPA_Required },
1284           // Quorum to use for this operation.
1285           { "qrum",    eNSPT_Int,  eNSPA_Required },
1286           // Flag whether local execution of this command should be forced
1287           // no matter what, i.e. if this flag is set in cases when normal GET2
1288           // command would have been proxied to other servers an error should
1289           // be returned.
1290           { "local",   eNSPT_Int,  eNSPA_Required },
1291           // Value of the flag "search_on_read" to use with this command, i.e.
1292           // whether this server should search the blob if it's not found
1293           // locally.
1294           { "srch",    eNSPT_Int,  eNSPA_Required },
1295           // Client IP for application sending the command.
1296           { "ip",      eNSPT_Str,  eNSPA_Required },
1297           // Session ID for application sending the command.
1298           { "sid",     eNSPT_Str,  eNSPA_Required },
1299           // Password for blob access.
1300           { "pass",    eNSPT_Str,  eNSPA_Optional } } },
1301     /*{ "BLOBSLIST",
1302         {&CNCMessageHandler::x_DoCmd_GetBlobsList,
1303             "BLOBSLIST",
1304             fNeedsStorageCache + fNeedsAdminClient} },*/
1305     // Write blob contents. Deprecated command used now only by old clients.
1306     // This command is the same as PUT3 except it uses connection closing as
1307     // legitimate EOF marker for blob's data.
1308     { "PUT2",
1309         {&CNCMessageHandler::x_DoCmd_Put,
1310             "PUT2",
1311             eClientBlobWrite | fCanGenerateKey | fCursedPUT2Cmd,
1312             eNCCreate,
1313             eProxyWrite},
1314           // Time-to-live for the blob.
1315         { { "ttl",     eNSPT_Int,  eNSPA_Optional },
1316           // Key of the blob (if skipped or empty then new one will be created).
1317           { "key",     eNSPT_NCID, eNSPA_Optional } } },
1318     // Shutdown the server
1319     { "SHUTDOWN",
1320         {&CNCMessageHandler::x_DoCmd_Shutdown,
1321             "SHUTDOWN",
1322             fNeedsAdminClient, eNCNone, eProxyNone},
1323           // Client IP for application sending the command.
1324         { { "ip",      eNSPT_Str,  fNSPA_Optional },
1325           // Session ID for application sending the command.
1326           { "sid",     eNSPT_Str,  eNSPA_Optional },
1327           // drain: wait until all BLOBs are expired, then shutdown
1328           { "drain",    eNSPT_Int,  eNSPA_Optional },
1329           // reset: shutdown and leave database guard on disk (CXX-10401)
1330           { "reset",    eNSPT_Int,  eNSPA_Optional },
1331           // request Hit ID
1332           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
1333         } },
1334     // Get server statistics.
1335     { "GETSTAT",
1336         {&CNCMessageHandler::x_DoCmd_GetStat, "GETSTAT"},
1337           // Flag showing whether current (value is 0) or previous (value is 1)
1338           // statistics period should be shown.
1339         { { "prev",    eNSPT_Int,  fNSPA_Optional, "0" },
1340           // Type of statistics period to show. See top of nc_stat.cpp for
1341           // list of all possible period types.
1342           { "type",    eNSPT_Str,  eNSPA_Optional, "life" },
1343           // Client IP for application sending the command.
1344           { "ip",      eNSPT_Str,  fNSPA_Optional },
1345           // Session ID for application sending the command.
1346           { "sid",     eNSPT_Str,  eNSPA_Optional },
1347           // request Hit ID
1348           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
1349         } },
1350     // Read full ini-file used by NetCache for configuration.
1351     { "GETCONF",
1352         {&CNCMessageHandler::x_DoCmd_GetConfig,      "GETCONF",
1353             fNoCmdFlags, eNCNone, eProxyNone},
1354         {
1355           // Netcached.ini section name
1356           { "section", eNSPT_Str,  eNSPA_Optional },
1357           // when section name is "netcache", setup for this port
1358           { "port",    eNSPT_Str,  eNSPA_Optional },
1359           // when section name is "netcache", setup for this cache
1360           { "cache",   eNSPT_Str,  eNSPA_Optional },
1361           // Client IP for application sending the command.
1362           { "ip",      eNSPT_Str,  fNSPA_Optional },
1363           // Session ID for application sending the command.
1364           { "sid",     eNSPT_Str,  eNSPA_Optional },
1365           // request Hit ID
1366           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
1367         } },
1368     // Get state information,  added in v6.8.6
1369     { "INFO",
1370         {&CNCMessageHandler::x_DoCmd_GetConfig,      "INFO",
1371             fNoCmdFlags, eNCNone, eProxyNone},
1372         {
1373           // Netcached.ini section name
1374           { "section", eNSPT_Str,  fNSPA_Required },
1375           // when section name is "netcache", setup for this port
1376           { "port",    eNSPT_Str,  eNSPA_Optional },
1377           // when section name is "netcache", setup for this cache
1378           { "cache",   eNSPT_Str,  eNSPA_Optional },
1379           // Client IP for application sending the command.
1380           { "ip",      eNSPT_Str,  fNSPA_Optional },
1381           // Session ID for application sending the command.
1382           { "sid",     eNSPT_Str,  eNSPA_Optional },
1383           // request Hit ID
1384           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
1385         } },
1386     // Acknowledge alert,  added in v6.8.6
1387     { "ACKALERT",
1388         {&CNCMessageHandler::x_DoCmd_AckAlert,  "ACKALERT",
1389             fNeedsAdminClient, eNCNone, eProxyNone},
1390         {
1391           // Alert name
1392           { "alert", eNSPT_Str,  fNSPA_Required },
1393           // User name
1394           { "user", eNSPT_Str,  fNSPA_Required },
1395           // Client IP for application sending the command.
1396           { "ip",      eNSPT_Str,  fNSPA_Optional },
1397           // Session ID for application sending the command.
1398           { "sid",     eNSPT_Str,  eNSPA_Optional },
1399           // request Hit ID
1400           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
1401         } },
1402     // Re-read ini-file used by NetCache for configuration.
1403     // Find changes and reconfigure
1404     // Only few changes are supported
1405     { "RECONF",
1406         {&CNCMessageHandler::x_DoCmd_ReConfig,
1407             "RECONF",
1408             fNeedsAdminClient, eNCNone, eProxyNone},
1409         {
1410           // Netcached.ini section name
1411           { "section", eNSPT_Str,  eNSPA_Required },
1412           // Client IP for application sending the command.
1413           { "ip",      eNSPT_Str,  fNSPA_Optional },
1414           // Session ID for application sending the command.
1415           { "sid",     eNSPT_Str,  eNSPA_Optional },
1416           // request Hit ID
1417           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
1418         } },
1419 
1420     { "PURGE",
1421         {&CNCMessageHandler::x_DoCmd_Purge,
1422             "PURGE",
1423             fNoCmdFlags, eNCNone, eProxyNone},
1424         {
1425             // Cache name.
1426             { "cache",     eNSPT_Str, eNSPA_Required },
1427           // Client IP for application sending the command.
1428           { "ip",      eNSPT_Str,  fNSPA_Optional },
1429           // Session ID for application sending the command.
1430           { "sid",     eNSPT_Str,  eNSPA_Optional },
1431           // request Hit ID
1432           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
1433         } },
1434     { "COPY_PURGE",
1435         {&CNCMessageHandler::x_DoCmd_CopyPurge,
1436             "COPY_PURGE", fNoCmdFlags, eNCNone, eProxyNone },
1437           // Cache name.
1438         { { "cache",   eNSPT_Str,  eNSPA_Required },
1439           // forget blobs created earlier than cr_time
1440           { "cr_time", eNSPT_Int,  eNSPA_Required } } },
1441 
1442     // Added in 6.11.7, CXX-8948
1443     { "PURGE2",
1444         {&CNCMessageHandler::x_DoCmd_Purge,
1445             "PURGE",
1446             fNoCmdFlags, eNCNone, eProxyNone},
1447           // Cache name.
1448         { { "cache",   eNSPT_Id,   eNSPA_ICPrefix },
1449           // Blob's key.
1450           { "key",     eNSPT_Str,  eNSPA_Required },
1451           // Client IP for application sending the command.
1452           { "ip",      eNSPT_Str,  fNSPA_Optional },
1453           // Session ID for application sending the command.
1454           { "sid",     eNSPT_Str,  eNSPA_Optional },
1455           // request Hit ID
1456           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
1457         } },
1458     // Added in 6.11.7, CXX-8948
1459     { "COPY_PURGE2",
1460         {&CNCMessageHandler::x_DoCmd_CopyPurge,
1461             "COPY_PURGE", fNoCmdFlags, eNCNone, eProxyNone },
1462           // Cache name.
1463         { { "cache",   eNSPT_Str,  eNSPA_Required },
1464           // Blob's key.
1465           { "key",     eNSPT_Str,  eNSPA_Required },
1466           // forget blobs created earlier than cr_time
1467           { "cr_time", eNSPT_Int,  eNSPA_Required } } },
1468 
1469     // Get list of blobs by mask: "cache,key,*"
1470     // Added in 6.9.0, CXX-6246
1471     { "BLIST",
1472         {&CNCMessageHandler::x_DoCmd_GetBList,
1473             "BLIST",
1474             fComesFromClient |
1475                 fNeedsStorageCache | fNeedsBlobList | fDoNotCheckPassword,
1476             eNCNone, eProxyGetBList},
1477           // Name of cache for blob.
1478         { { "cache",   eNSPT_Id,   eNSPA_ICPrefix },
1479           // Blob's key.
1480           { "key",     eNSPT_Str,  eNSPA_Required },
1481           // Blob's subkey.
1482           { "subkey",  eNSPT_Str,  eNSPA_Optional },
1483           { "local",   eNSPT_Int,  eNSPA_Optional },
1484           // Client IP for application sending the command.
1485           { "ip",      eNSPT_Str,  fNSPA_Optional },
1486           // Session ID for application sending the command.
1487           { "sid",     eNSPT_Str,  eNSPA_Optional },
1488           // request Hit ID
1489           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
1490         } },
1491 // added in v6.11.0 (CXX-8737)
1492     { "BLIST2",
1493         {&CNCMessageHandler::x_DoCmd_GetBList,
1494             "BLIST2",
1495             fComesFromClient |
1496                 fNeedsStorageCache | fNeedsBlobList | fDoNotProxyToPeers | fDoNotCheckPassword,
1497             eNCNone, eProxyGetBList2},
1498           // Name of cache for blob.
1499         { { "cache",   eNSPT_Id,   eNSPA_ICPrefix },
1500           // Blob's key.
1501           { "key",     eNSPT_Str,  eNSPA_Optional },
1502           // Blob's subkey.
1503           { "subkey",  eNSPT_Str,  eNSPA_Optional },
1504           { "local",   eNSPT_Int,  eNSPA_Optional },
1505 
1506           // Created more than N seconds ago
1507           { "fcr_ago_ge", eNSPT_Int,  eNSPA_Optional },
1508           // Created less than N seconds ago
1509           { "fcr_ago_lt", eNSPT_Int,  eNSPA_Optional },
1510           // Created more than N seconds since epoch
1511           { "fcr_epoch_ge", eNSPT_Int,  eNSPA_Optional },
1512           // Created less than N seconds since epoch
1513           { "fcr_epoch_lt", eNSPT_Int,  eNSPA_Optional },
1514           // Blob will expire in more than N seconds from now
1515           { "fexp_now_ge", eNSPT_Int,  eNSPA_Optional },
1516           // Blob will expire in less than N seconds from now
1517           { "fexp_now_lt", eNSPT_Int,  eNSPA_Optional },
1518           // Blob will expire in more than N seconds since epoch
1519           { "fexp_epoch_ge", eNSPT_Int,  eNSPA_Optional },
1520           // Blob will expire in less than N seconds since epoch
1521           { "fexp_epoch_lt", eNSPT_Int,  eNSPA_Optional },
1522           // Version will expire in more than N seconds from now
1523           { "fvexp_now_ge", eNSPT_Int,  eNSPA_Optional },
1524           // Version will expire in less than N seconds from now
1525           { "fvexp_now_lt", eNSPT_Int,  eNSPA_Optional },
1526           // Version will expire in more than N seconds since epoch
1527           { "fvexp_epoch_ge", eNSPT_Int,  eNSPA_Optional },
1528           // Version will expire in less than N seconds since epoch
1529           { "fvexp_epoch_lt", eNSPT_Int,  eNSPA_Optional },
1530           // Server_id of the server where blob was created.
1531           { "fcr_srv",  eNSPT_Int,  eNSPA_Optional },
1532           // blob bigger than this size
1533           { "fsize_ge",  eNSPT_Int,  eNSPA_Optional },
1534           // blob smaller than this size
1535           { "fsize_lt",  eNSPT_Int,  eNSPA_Optional },
1536           // Client IP for application sending the command.
1537           { "ip",      eNSPT_Str,  fNSPA_Optional },
1538           // Session ID for application sending the command.
1539           { "sid",     eNSPT_Str,  eNSPA_Optional },
1540           // request Hit ID
1541           { "ncbi_phid", eNSPT_Str,  eNSPA_Optional }
1542         } },
1543     { "PROXY_BLIST",
1544         {&CNCMessageHandler::x_DoCmd_GetBList,
1545             "PROXY_BLIST",
1546             fNeedsStorageCache | fNeedsBlobList | fDoNotProxyToPeers | fDoNotCheckPassword,
1547             eNCNone, eProxyNone},
1548           // Name of cache for blob.
1549         { { "cache",   eNSPT_Str,  eNSPA_Required },
1550           // Blob's key.
1551           { "key",     eNSPT_Str,  eNSPA_Required },
1552           // Blob's subkey.
1553           { "subkey",  eNSPT_Str,  eNSPA_Required },
1554           { "local",   eNSPT_Int,  eNSPA_Required }
1555         } },
1556 // added in v6.11.0 (CXX-8737)
1557     { "PROXY_BLIST2",
1558         {&CNCMessageHandler::x_DoCmd_GetBList,
1559             "PROXY_BLIST2",
1560             fNeedsStorageCache | fNeedsBlobList | fDoNotProxyToPeers | fDoNotCheckPassword,
1561             eNCNone, eProxyNone},
1562           // Name of cache for blob.
1563         { { "cache",   eNSPT_Str,  eNSPA_Required },
1564           // Blob's key.
1565           { "key",     eNSPT_Str,  eNSPA_Required },
1566           // Blob's subkey.
1567           { "subkey",  eNSPT_Str,  eNSPA_Required },
1568           { "local",   eNSPT_Int,  eNSPA_Required },
1569           // Created more than N seconds ago
1570           { "fcr_ago_ge", eNSPT_Int,  eNSPA_Optional },
1571           // Created less than N seconds ago
1572           { "fcr_ago_lt", eNSPT_Int,  eNSPA_Optional },
1573           // Created more than N seconds since epoch
1574           { "fcr_epoch_ge", eNSPT_Int,  eNSPA_Optional },
1575           // Created less than N seconds since epoch
1576           { "fcr_epoch_lt", eNSPT_Int,  eNSPA_Optional },
1577           // Will expire in more than N seconds from now
1578           { "fexp_now_ge", eNSPT_Int,  eNSPA_Optional },
1579           // Will expire in less than N seconds from now
1580           { "fexp_now_lt", eNSPT_Int,  eNSPA_Optional },
1581           // Will expire in more than N seconds since epoch
1582           { "fexp_epoch_ge", eNSPT_Int,  eNSPA_Optional },
1583           // Will expire in less than N seconds since epoch
1584           { "fexp_epoch_lt", eNSPT_Int,  eNSPA_Optional },
1585           // Will expire in more than N seconds from now
1586           { "fvexp_now_ge", eNSPT_Int,  eNSPA_Optional },
1587           // Will expire in less than N seconds from now
1588           { "fvexp_now_lt", eNSPT_Int,  eNSPA_Optional },
1589           // Will expire in more than N seconds since epoch
1590           { "fvexp_epoch_ge", eNSPT_Int,  eNSPA_Optional },
1591           // Will expire in less than N seconds since epoch
1592           { "fvexp_epoch_lt", eNSPT_Int,  eNSPA_Optional },
1593           // Server_id of the server where blob was created.
1594           { "fcr_srv",  eNSPT_Int,  eNSPA_Optional },
1595           // blob bigger than this size
1596           { "fsize_ge",  eNSPT_Int,  eNSPA_Optional },
1597           // blob smaller than this size
1598           { "fsize_lt",  eNSPT_Int,  eNSPA_Optional }
1599         } },
1600 
1601 // HTTP commands
1602     { "DELETE",
1603         {&CNCMessageHandler::x_DoCmd_Remove,
1604             "DELETE",
1605             fNeedsBlobAccess | fNoBlobAccessStats | fIsHttp,
1606             eNCCreate,
1607             eProxyRemove}
1608     },
1609     { "GET",
1610         {&CNCMessageHandler::x_DoCmd_Get,
1611             "GET",
1612             eClientBlobRead | fIsHttp,
1613             eNCReadData,
1614             eProxyRead}
1615     },
1616     { "HEAD",
1617         {&CNCMessageHandler::x_DoCmd_GetMeta,
1618             "HEAD",
1619             eClientBlobRead | fDoNotCheckPassword | fIsHttp,
1620             eNCRead,
1621             eProxyGetMeta}
1622     },
1623     { "POST",
1624         {&CNCMessageHandler::x_DoCmd_Put,
1625             "POST",
1626             eClientBlobWrite | fCanGenerateKey | fReadExactBlobSize | fSkipBlobEOF | fIsHttp,
1627             eNCCreate,
1628             eProxyWrite}
1629     },
1630     { "PUT",
1631         {&CNCMessageHandler::x_DoCmd_Put,
1632             "PUT",
1633             eClientBlobWrite | fIsHttp | fReadExactBlobSize | fSkipBlobEOF | fIsHttp,
1634             eNCCreate,
1635             eProxyWrite}
1636     },
1637 
1638 
1639     // All commands below are not implemented and mostly old ones not needed
1640     // anymore. One exception is RECONF - it would be nice to have it but it
1641     // needs some thinking on how to implement it.
1642 
1643     { "REINIT",   {&CNCMessageHandler::x_DoCmd_NotImplemented, "REINIT"} },
1644     { "REINIT",   {&CNCMessageHandler::x_DoCmd_NotImplemented, "IC_REINIT"},
1645         { { "cache",   eNSPT_Id,   eNSPA_ICPrefix } } },
1646 //    { "RECONF",   {&CNCMessageHandler::x_DoCmd_NotImplemented, "RECONF"} },
1647     { "LOG",      {&CNCMessageHandler::x_DoCmd_NotImplemented, "LOG"} },
1648     { "STAT",     {&CNCMessageHandler::x_DoCmd_NotImplemented, "STAT"} },
1649     { "MONI",     {&CNCMessageHandler::x_DoCmd_NotImplemented, "MONITOR"} },
1650     { "DROPSTAT", {&CNCMessageHandler::x_DoCmd_NotImplemented, "DROPSTAT"} },
1651     { "GBOW",     {&CNCMessageHandler::x_DoCmd_NotImplemented, "GBOW"} },
1652     { "ISLK",     {&CNCMessageHandler::x_DoCmd_NotImplemented, "ISLK"} },
1653     { "SMR",      {&CNCMessageHandler::x_DoCmd_NotImplemented, "SMR"} },
1654     { "SMU",      {&CNCMessageHandler::x_DoCmd_NotImplemented, "SMU"} },
1655     { "OK",       {&CNCMessageHandler::x_DoCmd_NotImplemented, "OK"} },
1656 // will be used by HTTP
1657 //    { "GET",      {&CNCMessageHandler::x_DoCmd_NotImplemented, "GET"} },
1658 //    { "PUT",      {&CNCMessageHandler::x_DoCmd_NotImplemented, "PUT"} },
1659     { "REMOVE",   {&CNCMessageHandler::x_DoCmd_NotImplemented, "REMOVE"} },
1660     { "STSP",     {&CNCMessageHandler::x_DoCmd_NotImplemented, "IC_STSP"},
1661         { { "cache",   eNSPT_Id,   eNSPA_ICPrefix } } },
1662     { "GTSP",     {&CNCMessageHandler::x_DoCmd_NotImplemented, "IC_GTSP"},
1663         { { "cache",   eNSPT_Id,   eNSPA_ICPrefix } } },
1664     { "SVRP",     {&CNCMessageHandler::x_DoCmd_NotImplemented, "IC_SVRP"},
1665         { { "cache",   eNSPT_Id,   eNSPA_ICPrefix } } },
1666     { "GVRP",     {&CNCMessageHandler::x_DoCmd_NotImplemented, "IC_GVRP"},
1667         { { "cache",   eNSPT_Id,   eNSPA_ICPrefix } } },
1668     { "PRG1",     {&CNCMessageHandler::x_DoCmd_NotImplemented, "IC_PRG1"},
1669         { { "cache",   eNSPT_Id,   eNSPA_ICPrefix } } },
1670     { "REMK",     {&CNCMessageHandler::x_DoCmd_NotImplemented, "IC_REMK"},
1671         { { "cache",   eNSPT_Id,   eNSPA_ICPrefix } } },
1672     { "GBLW",     {&CNCMessageHandler::x_DoCmd_NotImplemented, "IC_GBLW"},
1673         { { "cache",   eNSPT_Id,   eNSPA_ICPrefix } } },
1674     { "ISOP",     {&CNCMessageHandler::x_DoCmd_NotImplemented, "IC_ISOP"},
1675         { { "cache",   eNSPT_Id,   eNSPA_ICPrefix } } },
1676     { "GACT",     {&CNCMessageHandler::x_DoCmd_NotImplemented, "IC_GACT"},
1677         { { "cache",   eNSPT_Id,   eNSPA_ICPrefix } } },
1678     { "GTOU",     {&CNCMessageHandler::x_DoCmd_NotImplemented, "IC_GTOU"},
1679         { { "cache",   eNSPT_Id,   eNSPA_ICPrefix } } },
1680     { NULL }
1681 };
1682 
1683 // List of arguments that can be in client authentication line.
1684 static SNSProtoArgument s_AuthArgs[] = {
1685     { "client", eNSPT_Str, eNSPA_Optional, "Unknown client" },
1686     { "params", eNSPT_Str, eNSPA_Ellipsis },
1687     { NULL }
1688 };
1689 
1690 
1691 
1692 
1693 /////////////////////////////////////////////////////////////////////////////
1694 // CNCMessageHandler implementation
1695 
1696 inline void
x_ResetFlags(void)1697 CNCMessageHandler::x_ResetFlags(void)
1698 {
1699     TNCCmdFlags keep = m_Flags & fNoReplyOnFinish;
1700     m_Flags = fNoCmdFlags | keep;
1701 }
1702 
1703 inline void
x_SetFlag(ENCCmdFlags flag)1704 CNCMessageHandler::x_SetFlag(ENCCmdFlags flag)
1705 {
1706     m_Flags |= flag;
1707 }
1708 
1709 inline void
x_UnsetFlag(ENCCmdFlags flag)1710 CNCMessageHandler::x_UnsetFlag(ENCCmdFlags flag)
1711 {
1712     m_Flags &= ~flag;
1713 }
1714 
1715 inline bool
x_IsFlagSet(ENCCmdFlags flag)1716 CNCMessageHandler::x_IsFlagSet(ENCCmdFlags flag)
1717 {
1718     return (m_Flags & flag) != 0;
1719 }
1720 
1721 inline bool
x_IsUserFlagSet(ENCUserFlags flag)1722 CNCMessageHandler::x_IsUserFlagSet(ENCUserFlags flag)
1723 {
1724     return (m_UserFlags & flag) != 0;
1725 }
1726 
1727 inline bool
x_IsCmdSucceeded(int cmd_status)1728 CNCMessageHandler::x_IsCmdSucceeded(int cmd_status)
1729 {
1730     return cmd_status < 300;
1731 }
1732 
1733 //static Uint8 s_CntHndls = 0;
1734 
CNCMessageHandler(void)1735 CNCMessageHandler::CNCMessageHandler(void)
1736     : m_Flags(0),
1737       m_Parser(s_CommandMap),
1738       m_CmdProcessor(NULL),
1739       m_BlobAccess(NULL),
1740       m_write_event(NULL),
1741       m_ChunkLen(0),
1742       m_SrvsIndex(0),
1743       m_ActiveHub(NULL)
1744 {
1745     LOG_CURRENT_FUNCTION
1746 #if __NC_TASKS_MONITOR
1747     m_TaskName = "CNCMessageHandler";
1748 #endif
1749     m_CopyBlobInfo = new SNCBlobVerData(nullptr);
1750     m_LatestBlobSum = new SNCBlobSummary();
1751     m_BlobFilter = new SNCBlobFilter;
1752     m_CmdLog.reserve(32);
1753 
1754     SetState(&CNCMessageHandler::x_SocketOpened);
1755 
1756     //Uint8 cnt = AtomicAdd(s_CntHndls, 1);
1757     //INFO("CNCMessageHandler, cnt=" << cnt);
1758     m_HttpMode = eNoHttp;
1759 }
1760 
~CNCMessageHandler(void)1761 CNCMessageHandler::~CNCMessageHandler(void)
1762 {
1763     LOG_CURRENT_FUNCTION
1764     delete m_LatestBlobSum;
1765     delete m_CopyBlobInfo;
1766 
1767     //Uint8 cnt = AtomicSub(s_CntHndls, 1);
1768     //INFO("~CNCMessageHandler, cnt=" << cnt);
1769 }
1770 
1771 CNCMessageHandler::State
x_SocketOpened(void)1772 CNCMessageHandler::x_SocketOpened(void)
1773 {
1774     LOG_CURRENT_FUNCTION
1775     m_PrevCache.clear();
1776     m_ClientParams.clear();
1777     m_CntCmds = 0;
1778 
1779     string host;
1780     Uint2 port = 0;
1781     GetPeerAddress(host, port);
1782     m_ClientParams["peer"]  = host;
1783     m_ClientParams["pport"] = NStr::UIntToString(port);
1784     m_LocalPort             = GetLocalPort();
1785     m_ClientParams["port"]  = NStr::UIntToString(m_LocalPort);
1786 
1787     return &CNCMessageHandler::x_ReadAuthMessage;
1788 }
1789 
1790 CNCMessageHandler::State
x_CloseCmdAndConn(void)1791 CNCMessageHandler::x_CloseCmdAndConn(void)
1792 {
1793     LOG_CURRENT_FUNCTION
1794     if (GetDiagCtx()->GetRequestStatus() == eStatus_OK) {
1795         if (HasError()  ||  !CanHaveMoreRead())
1796             GetDiagCtx()->SetRequestStatus(eStatus_PrematureClose);
1797         else if (CTaskServer::IsInShutdown())
1798             GetDiagCtx()->SetRequestStatus(eStatus_ShuttingDown);
1799         else
1800             GetDiagCtx()->SetRequestStatus(eStatus_CmdTimeout);
1801     }
1802     int status = GetDiagCtx()->GetRequestStatus();
1803     if (x_IsFlagSet(eBlobPut) && m_NCBlobKey.IsICacheKey()) {
1804         x_JournalBlobPutResult(status, m_NCBlobKey.PackedKey(), m_BlobSlot);
1805     }
1806     x_CleanCmdResources();
1807     GetDiagCtx()->SetRequestStatus(status);
1808     return &CNCMessageHandler::x_SaveStatsAndClose;
1809 }
1810 
1811 CNCMessageHandler::State
x_SaveStatsAndClose(void)1812 CNCMessageHandler::x_SaveStatsAndClose(void)
1813 {
1814     LOG_CURRENT_FUNCTION
1815     //CNCStat::AddClosedConnection(conn_span, GetDiagCtx()->GetRequestStatus(), m_CntCmds);
1816     CNCStat::ConnClosing(m_CntCmds);
1817     return &CNCMessageHandler::x_PrintCmdsCntAndClose;
1818 }
1819 
1820 CNCMessageHandler::State
x_PrintCmdsCntAndClose(void)1821 CNCMessageHandler::x_PrintCmdsCntAndClose(void)
1822 {
1823     LOG_CURRENT_FUNCTION
1824     CSrvDiagMsg().PrintExtra().PrintParam("cmds_cnt", m_CntCmds);
1825     CloseSocket();
1826     Terminate();
1827     return NULL;
1828 }
1829 
1830 CNCMessageHandler::State
x_WriteInitWriteResponse(void)1831 CNCMessageHandler::x_WriteInitWriteResponse(void)
1832 {
1833     LOG_CURRENT_FUNCTION
1834 check_again:
1835     ENCClientHubStatus status = m_ActiveHub->GetStatus();
1836     if (status == eNCHubError || status == eNCHubSuccess) {
1837         m_LastPeerError = m_ActiveHub->GetErrMsg();
1838         SRV_LOG(Warning, "Error executing command on peer "
1839             << m_ActiveHub->GetFullPeerName() << ", peer says: "
1840             << m_LastPeerError);
1841         m_ActiveHub->Release();
1842         m_ActiveHub = NULL;
1843         return &CNCMessageHandler::x_ProxyToNextPeer;
1844     }
1845     else if (status != eNCHubCmdInProgress) {
1846         SRV_FATAL("Unexpected client status: " << status);
1847     }
1848     if (NeedEarlyClose())
1849         return &CNCMessageHandler::x_CloseCmdAndConn;
1850 
1851     CNCActiveHandler* active = m_ActiveHub->GetHandler();
1852     if (!active->GotClientResponse())
1853         return NULL;
1854     // Intentionally re-reading the status because it could change since our
1855     // previous read at the beginning of the function.
1856     if (m_ActiveHub->GetStatus() != eNCHubCmdInProgress)
1857         goto check_again;
1858 
1859     WriteText(active->GetCmdResponse()).WriteText("\n");
1860     Flush();
1861     if (NeedEarlyClose())
1862         return &CNCMessageHandler::x_CloseCmdAndConn;
1863 
1864     return &CNCMessageHandler::x_ReadBlobSignature;
1865 }
1866 
1867 CNCMessageHandler::State
x_ReadAuthMessage(void)1868 CNCMessageHandler::x_ReadAuthMessage(void)
1869 {
1870     LOG_CURRENT_FUNCTION
1871     if (NeedToClose()  ||  CTaskServer::IsInSoftShutdown()) {
1872         if (CTaskServer::IsInShutdown())
1873             GetDiagCtx()->SetRequestStatus(eStatus_ShuttingDown);
1874         else
1875             GetDiagCtx()->SetRequestStatus(eStatus_CmdTimeout);
1876         return &CNCMessageHandler::x_SaveStatsAndClose;
1877     }
1878 
1879     CTempString auth_line;
1880     if (!ReadLine(&auth_line)) {
1881         if (!HasError()  &&  CanHaveMoreRead())
1882             return NULL;
1883         if (IsReadDataAvailable()) {
1884             GetDiagCtx()->SetRequestStatus(eStatus_PrematureClose);
1885             return &CNCMessageHandler::x_SaveStatsAndClose;
1886         }
1887         else {
1888             GetDiagCtx()->SetRequestStatus(eStatus_CmdTimeout);
1889             return &CNCMessageHandler::x_PrintCmdsCntAndClose;
1890         }
1891     }
1892 
1893     m_HttpMode = eNoHttp;
1894     size_t auth_size = auth_line.size();
1895     if (auth_size > 8 && auth_line[auth_size-8] == 'H') {
1896         if (NStr::strncmp(auth_line.data() + auth_size - 8, "HTTP/1.", 7) == 0) {
1897             if (auth_line[auth_size-1] == '0') {
1898                 m_HttpMode = eHttp10;
1899             } else if (auth_line[auth_size-1] == '1') {
1900                 m_HttpMode = eHttp11;
1901             }
1902         }
1903     }
1904 
1905     if (!x_IsHttpMode()) {
1906         TNSProtoParams params;
1907         try {
1908             m_Parser.ParseArguments(auth_line, s_AuthArgs, &params);
1909         }
1910         catch (CNSProtoParserException& ex) {
1911             SRV_LOG(Error, "Error authenticating client: '"
1912                              << auth_line << "': " << ex);
1913             params["client"] = auth_line;
1914             return &CNCMessageHandler::x_SaveStatsAndClose;
1915         }
1916         ITERATE(TNSProtoParams, it, params) {
1917             m_ClientParams[it->first] = it->second;
1918         }
1919         CSrvDiagMsg diag_msg;
1920         diag_msg.PrintExtra();
1921         ITERATE(TNSProtoParams, it, params) {
1922             diag_msg.PrintParam(it->first, it->second);
1923         }
1924         diag_msg.Flush();
1925     } else {
1926         m_PosponedCmd = auth_line;
1927 //        m_ClientParams["cache"] = "http";
1928 //        m_ClientParams["cache"] = "";
1929     }
1930 
1931     m_BaseAppSetup = m_AppSetup = CNCServer::GetAppSetup(m_ClientParams);
1932     if (m_AppSetup->disable) {
1933         SRV_LOG(Warning, "Disabled client is being disconnected ('"
1934                          << auth_line << "').");
1935         GetDiagCtx()->SetRequestStatus(eStatus_Disabled);
1936         return &CNCMessageHandler::x_SaveStatsAndClose;
1937     }
1938     else {
1939         return &CNCMessageHandler::x_ReadCommand;
1940     }
1941 }
1942 
1943 void
x_AssignCmdParams(void)1944 CNCMessageHandler::x_AssignCmdParams(void)
1945 {
1946     LOG_CURRENT_FUNCTION
1947     CTempString blob_key, blob_subkey;
1948     m_UserFlags = fNoUserFlags;
1949     m_BlobVersion = 0;
1950     m_NCBlobKey.Clear();
1951     m_BlobPass.clear();
1952     m_RawBlobPass.clear();
1953     m_KeyVersion = 1;
1954     m_BlobTTL = 0;
1955     m_StartPos = 0;
1956     m_Size = Uint8(-1);
1957     m_Slot = 0;
1958     m_OrigRecNo = 0;
1959     m_OrigSrvId = 0;
1960     m_OrigTime = 0;
1961     m_Quorum = 1;
1962     m_CmdVersion = 0;
1963     m_ForceLocal = false;
1964     m_AgeMax = m_AgeCur = 0;
1965     m_SlotsDone.clear();
1966     m_CmdParams.clear();
1967     bool quorum_was_set = false;
1968     bool search_was_set = false;
1969 
1970     new (m_LatestBlobSum) SNCBlobSummary();
1971     delete m_CopyBlobInfo;
1972     m_CopyBlobInfo = new SNCBlobVerData(nullptr);
1973     new (m_BlobFilter) SNCBlobFilter();
1974 
1975     CTempString cache_name;
1976 
1977     if (!x_IsHttpMode()) {
1978         m_HttpMode = eNoHttp;
1979         ERASE_ITERATE(TNSProtoParams, it, m_ParsedCmd.params) {
1980             const CTempString& key = it->first;
1981             CTempString& val = it->second;
1982 
1983             switch (key[0]) {
1984             case 'a':
1985                 if (key == "age") {
1986                     m_AgeMax = NStr::StringToUInt8(val);
1987                 }
1988                 break;
1989             case 'c':
1990                 switch (key[1]) {
1991                 case 'a':
1992                     if (key == "cache") {
1993                         cache_name = val;
1994                     }
1995                     break;
1996                 case 'm':
1997                     if (key == "cmd_ver") {
1998                         m_CmdVersion = NStr::StringToUInt(val);
1999                     }
2000                     break;
2001                 case 'o':
2002                     if (key == "confirm") {
2003                         if (val == "1")
2004                             x_UnsetFlag(fNoReplyOnFinish);
2005                         else
2006                             x_SetFlag(fNoReplyOnFinish);
2007                     }
2008                     break;
2009                 case 'r':
2010                     if (key == "cr_time") {
2011                         m_CopyBlobInfo->create_time = NStr::StringToUInt8(val);
2012                     }
2013                     else if (key == "cr_id") {
2014                         m_CopyBlobInfo->create_id = NStr::StringToUInt(val);
2015                     }
2016                     else if (key == "cr_srv") {
2017                         m_CopyBlobInfo->create_server = NStr::StringToUInt8(val);
2018                     }
2019                     break;
2020                 }
2021                 break;
2022             case 'd':
2023                 if (key == "dead") {
2024                     m_CopyBlobInfo->dead_time = NStr::StringToInt(val);
2025                 }
2026                 break;
2027             case 'e':
2028                 if (key == "exp") {
2029                     m_CopyBlobInfo->expire = NStr::StringToInt(val);
2030                 }
2031                 break;
2032             case 'f':
2033                 if (key == "flags") {
2034                     m_UserFlags = NStr::StringToUInt(val);
2035                 } else if (key == "fcr_ago_ge") {
2036                     m_BlobFilter->cr_ago_ge = NStr::StringToUInt8(val);
2037                 } else if (key == "fcr_ago_lt") {
2038                     m_BlobFilter->cr_ago_lt = NStr::StringToUInt8(val);
2039                 } else if (key == "fcr_epoch_ge") {
2040                     m_BlobFilter->cr_epoch_ge = NStr::StringToUInt8(val);
2041                 } else if (key == "fcr_epoch_lt") {
2042                     m_BlobFilter->cr_epoch_lt = NStr::StringToUInt8(val);
2043                 } else if (key == "fexp_now_ge") {
2044                     m_BlobFilter->exp_now_ge = NStr::StringToUInt8(val);
2045                 } else if (key == "fexp_now_lt") {
2046                     m_BlobFilter->exp_now_lt = NStr::StringToUInt8(val);
2047                 } else if (key == "fexp_epoch_ge") {
2048                     m_BlobFilter->exp_epoch_ge = NStr::StringToUInt8(val);
2049                 } else if (key == "fexp_epoch_lt") {
2050                     m_BlobFilter->exp_epoch_lt = NStr::StringToUInt8(val);
2051                 } else if (key == "fvexp_now_ge") {
2052                     m_BlobFilter->vexp_now_ge = NStr::StringToUInt8(val);
2053                 } else if (key == "fvexp_now_lt") {
2054                     m_BlobFilter->vexp_now_lt = NStr::StringToUInt8(val);
2055                 } else if (key == "fvexp_epoch_ge") {
2056                     m_BlobFilter->vexp_epoch_ge = NStr::StringToUInt8(val);
2057                 } else if (key == "fvexp_epoch_lt") {
2058                     m_BlobFilter->vexp_epoch_lt = NStr::StringToUInt8(val);
2059                 } else if (key == "fcr_srv") {
2060                     m_BlobFilter->cr_srv = NStr::StringToUInt8(val);
2061                 } else if (key == "fsize_ge") {
2062                     m_BlobFilter->size_ge = NStr::StringToUInt8(val);
2063                 } else if (key == "fsize_lt") {
2064                     m_BlobFilter->size_lt = NStr::StringToUInt8(val);
2065                 }
2066                 break;
2067             case 'h':
2068                 if (key == "http") {
2069                     m_HttpMode = (EHttpMode)NStr::StringToInt(val);
2070                 }
2071                 break;
2072             case 'i':
2073                 if (key == "ip") {
2074                     if (!val.empty())
2075                         GetDiagCtx()->SetClientIP(val);
2076                     // Erase parameter to not print it in request-start, it will be
2077                     // printed as a part of standard log header.
2078                     m_ParsedCmd.params.erase(it);
2079                 }
2080                 break;
2081             case 'k':
2082                 if (key == "key") {
2083                     blob_key = val;
2084                 }
2085                 break;
2086             case 'l':
2087                 if (key == "log_rec") {
2088                     m_OrigRecNo = NStr::StringToUInt8(val);
2089                 }
2090                 else if (key == "log_srv") {
2091                     m_OrigSrvId = NStr::StringToUInt8(val);
2092                 }
2093                 else if (key == "log_time") {
2094                     m_OrigTime = NStr::StringToUInt8(val);
2095                 }
2096                 else if (key == "local") {
2097                     m_ForceLocal = val == "1";
2098                 }
2099                 break;
2100             case 'm':
2101                 if (key == "md5_pass") {
2102                     m_BlobPass = val;
2103                 }
2104                 break;
2105             case 'n':
2106                 if (key == "ncbi_phid") {
2107                     if (!val.empty()) {
2108                         GetDiagCtx()->SetHitID(val);
2109                     }
2110                 }
2111                 break;
2112             case 'p':
2113                 if (key == "pass") {
2114                     m_RawBlobPass = val;
2115                     CMD5 md5;
2116                     md5.Update(val.data(), val.size());
2117                     unsigned char digest[16];
2118                     md5.Finalize(digest);
2119                     m_BlobPass.assign((char*)digest, 16);
2120                     // Erase parameter to not expose passwords via logs.
2121                     m_ParsedCmd.params.erase(it);
2122                 }
2123                 else if (key == "prev") {
2124                     m_StatPrev = val == "1";
2125                 }
2126                 break;
2127             case 'q':
2128                 if (key == "qrum") {
2129                     m_Quorum = NStr::StringToUInt(val);
2130                     quorum_was_set = true;
2131                 }
2132                 break;
2133             case 'r':
2134                 if (key == "rec_my") {
2135                     m_RemoteRecNo = NStr::StringToUInt8(val);
2136                 }
2137                 else if (key == "rec_your") {
2138                     m_LocalRecNo = NStr::StringToUInt8(val);
2139                 }
2140                 break;
2141             case 's':
2142                 switch (key[1]) {
2143                 case 'i':
2144                     if (key == "sid") {
2145                         if (!val.empty())
2146                             GetDiagCtx()->SetSessionID(NStr::URLDecode(val));
2147                         // Erase parameter to not print it in request-start,
2148                         // it will be printed as a part of standard log header.
2149                         m_ParsedCmd.params.erase(it);
2150                     }
2151                     else if (key == "size") {
2152                         m_Size = Uint8(NStr::StringToInt8(val));
2153                     }
2154                     break;
2155                 case 'l':
2156                     if (key == "slot") {
2157                         m_Slot = Uint2(NStr::StringToUInt(val));
2158                     }
2159                     break;
2160                 case 'r':
2161                     if (key == "srv_id") {
2162                         m_SrvId = NStr::StringToUInt8(val);
2163                     }
2164                     else if (key == "srch") {
2165                         m_SearchOnRead = val != "0";
2166                         search_was_set = true;
2167                     }
2168                     break;
2169                 case 't':
2170                     if (key == "start") {
2171                         m_StartPos = NStr::StringToUInt8(val);
2172                     }
2173                     break;
2174                 case 'u':
2175                     if (key == "subkey") {
2176                         blob_subkey = val;
2177                     }
2178                     break;
2179                 }
2180                 break;
2181             case 't':
2182                 if (key == "ttl") {
2183                     m_BlobTTL = NStr::StringToUInt(val);
2184                 }
2185                 else if (key == "type") {
2186                     m_StatType = val;
2187                 }
2188                 break;
2189             case 'v':
2190                 if (key == "version") {
2191                     m_BlobVersion = NStr::StringToInt(val);
2192                 }
2193                 else if (key == "ver_ttl") {
2194                     m_CopyBlobInfo->ver_ttl = NStr::StringToUInt(val);
2195                 }
2196                 else if (key == "ver_dead") {
2197                     m_CopyBlobInfo->ver_expire = NStr::StringToInt(val);
2198                 }
2199                 break;
2200             default:
2201                 break;
2202             }
2203         }
2204     } else {
2205         m_ClientParams["client"] = "";
2206         cache_name = m_ClientParams["cache"];
2207         if (m_ClientParams.find("key") != m_ClientParams.end()) {
2208             blob_key = m_ClientParams["key"];
2209         }
2210         // parse HTTP header
2211         CTempString cmd_line;
2212         map<string,string> headers;
2213         while (ReadLine(&cmd_line)) {
2214             if (cmd_line.empty()) {
2215                 break;
2216             }
2217             const string content_length("Content-Length:");
2218             const string     user_agent("User-Agent:");
2219             const string  content_range("Range:");
2220             size_t max_pos = cmd_line.size();
2221 
2222             if (NStr::StartsWith(cmd_line, content_length)) {
2223                 size_t pos = content_length.size();
2224                 m_Size = NStr::StringToUInt8(
2225                     CTempString(cmd_line.data() + pos, max_pos - pos),
2226                     NStr::fAllowLeadingSpaces | NStr::fAllowTrailingSpaces);
2227             }
2228             else if (NStr::StartsWith(cmd_line, content_range)) {
2229                 size_t pos = content_range.size();
2230                 const char* begin = cmd_line.data() + pos;
2231                 while (!isdigit(*begin) && pos < max_pos) {
2232                     ++pos; ++begin;
2233                 }
2234                 m_StartPos = 0;
2235                 while (isdigit(*begin) && pos < max_pos) {
2236                     m_StartPos = m_StartPos * 10 + (*begin - '0');
2237                     ++pos; ++begin;
2238                 }
2239                 while (!isdigit(*begin) && pos < max_pos) {
2240                     ++pos; ++begin;
2241                 }
2242                 m_Size = 0;
2243                 while (isdigit(*begin) && pos < max_pos) {
2244                     m_Size = m_Size * 10 + (*begin - '0');
2245                     ++pos; ++begin;
2246                 }
2247                 // byte pos are inclusive
2248                 m_Size = (m_Size >= m_StartPos) ? (m_Size - m_StartPos + 1) : 0;
2249             }
2250             else if (NStr::StartsWith(cmd_line, user_agent)) {
2251                 list<CTempString>   arr;
2252                 size_t pos = user_agent.size();
2253                 NStr::Split( CTempString(cmd_line.data()+pos, max_pos-pos), " ", arr, NStr::fSplit_Tokenize);
2254                 if (!arr.empty()) {
2255                     m_ClientParams["client"] = NStr::URLEncode( arr.front());
2256                 }
2257             }
2258             else {
2259                 string key, value;
2260                 NStr::SplitInTwo(cmd_line, ":", key, value);
2261                 key = NStr::TruncateSpaces(key);
2262                 key = NStr::ToUpper(key);
2263                 value = NStr::TruncateSpaces(value);
2264                 headers[key] = value;
2265             }
2266         }
2267         list<CTempString>   arr;
2268         if (headers.find("NCBI-SID") != headers.end()) {
2269             NStr::Split( headers.at("NCBI-SID"), ", \t", arr, NStr::fSplit_Tokenize);
2270             if (!arr.empty()) {
2271                 m_CmdParams["sid"] = arr.back();
2272             }
2273         }
2274         arr.clear();
2275         if (headers.find("NCBI-PHID") != headers.end()) {
2276             NStr::Split( headers.at("NCBI-PHID"), ", \t", arr, NStr::fSplit_Tokenize);
2277             if (!arr.empty()) {
2278                 m_CmdParams["ncbi_phid"] = arr.back();
2279             }
2280         }
2281         arr.clear();
2282         string client_ip = g_GetClientIP( headers);
2283         if (!client_ip.empty()) {
2284             NStr::Split( client_ip, ", \t", arr, NStr::fSplit_Tokenize);
2285         }
2286         if (!arr.empty()) {
2287             m_CmdParams["ip"] = arr.front();
2288         } else {
2289             m_CmdParams["ip"] = m_ClientParams["peer"];
2290         }
2291     }
2292 
2293     m_NCBlobKey.Assign(cache_name, blob_key, blob_subkey);
2294     if (cache_name.empty()) {
2295         m_AppSetup = m_BaseAppSetup;
2296         m_ClientParams.erase("cache");
2297     }
2298     else if (cache_name == m_PrevCache) {
2299         m_AppSetup = m_PrevAppSetup;
2300     }
2301     else {
2302         m_PrevCache = cache_name;
2303         m_ClientParams["cache"] = cache_name;
2304         m_AppSetup = m_PrevAppSetup = CNCServer::GetAppSetup(m_ClientParams);
2305     }
2306     if (!quorum_was_set)
2307         m_Quorum = m_AppSetup->quorum;
2308     if (m_ForceLocal)
2309         m_Quorum = 1;
2310     if (!search_was_set)
2311         m_SearchOnRead = m_AppSetup->srch_on_read;
2312 }
2313 
2314 void
x_PrintRequestStart(CSrvDiagMsg & diag_msg)2315 CNCMessageHandler::x_PrintRequestStart(CSrvDiagMsg& diag_msg)
2316 {
2317     LOG_CURRENT_FUNCTION
2318     diag_msg.StartRequest();
2319     diag_msg.PrintParam("_type", "cmd");
2320     diag_msg.PrintParam("cmd", m_ParsedCmd.command->cmd);
2321     diag_msg.PrintParam("client", m_ClientParams["client"]);
2322     diag_msg.PrintParam("conn", m_ConnReqId);
2323     diag_msg.PrintParam("phost", m_ClientParams["peer"]);
2324     ITERATE(TNSProtoParams, it, m_ParsedCmd.params) {
2325         diag_msg.PrintParam(it->first, it->second);
2326     }
2327     ITERATE(TStringMap, it, m_CmdParams) {
2328         diag_msg.PrintParam(it->first, it->second);
2329     }
2330     if (!m_BlobPass.empty()) {
2331         diag_msg.PrintParam("pass", CNCBlobStorage::PrintablePassword(m_BlobPass));
2332     }
2333     diag_msg.PrintParam("ncbi_role", CNCServer::GetHostRole());
2334     diag_msg.PrintParam("ncbi_location", CNCServer::GetHostLocation());
2335 }
2336 
// Starts execution of the command stored in m_ParsedCmd and selects the next
// handler state.
//
// The request-start record is prepared first; then a sequence of checks is
// applied in order, each of which may end the command early:
//   - connection must be closed / has an error / cannot deliver more data;
//   - command needs an admin client and the client is neither the admin
//     client nor a peer;
//   - caching is not complete yet and the command needs the storage cache
//     (or is forced to be local);
//   - the application setup selected for this command is disabled;
//   - command runs inside a sync and the sync cannot be started;
//   - command needs disk space as a peer and peer writes are not accepted.
// Commands that need neither blob access nor a blob list go straight to
// m_CmdProcessor.  Otherwise the password policy is checked, the blob key is
// generated or validated, the blob slot is determined, and the command is
// either proxied to a peer (x_ProxyToNextPeer), passed to m_CmdProcessor, or
// blob meta information is requested and x_WaitForBlobAccess is returned.
CNCMessageHandler::State
CNCMessageHandler::x_StartCommand(void)
{
    LOG_CURRENT_FUNCTION
    m_CmdPrevTime = m_CmdStartTime = CSrvTime::Current();
    m_CmdLog.clear();
    CNCStat::CmdStarted(m_ParsedCmd.command->cmd);
    CSrvDiagMsg diag_msg;
    x_PrintRequestStart(diag_msg);

    if (NeedToClose()) {
        diag_msg.Flush();
        x_ResetFlags();
        return &CNCMessageHandler::x_CloseCmdAndConn;
    }
    if (HasError()  ||  !CanHaveMoreRead()) {
        diag_msg.Flush();
        GetDiagCtx()->SetRequestStatus(eStatus_PrematureClose);
        x_ResetFlags();
        return &CNCMessageHandler::x_CloseCmdAndConn;
    }

    // Admin-only command issued by a client that is neither the configured
    // admin client nor a peer: register an alert and refuse.
    if (x_IsFlagSet(fNeedsAdminClient)
        &&  m_ClientParams["client"] != CNCServer::GetAdminClient()
        &&  m_ClientParams["client"] != kNCPeerClientName)
    {
        string msg;
        msg += "command: " + string(m_ParsedCmd.command->cmd);
        msg += ", peer: " + m_ClientParams["peer"];
        msg += ", client: " + m_ClientParams["client"];
        CNCAlerts::Register(CNCAlerts::eAccessDenied, msg);
        diag_msg.Flush();
        x_ReportError(eStatus_NeedAdmin);
        if (!x_IsHttpMode()) {
            x_ResetFlags();
        }
        return &CNCMessageHandler::x_FinishCommand;
    }

    // Caching has not finished yet: commands that need the storage cache, or
    // that are forced to run locally, cannot be served.
    if (!CNCServer::IsCachingComplete()
        &&  (x_IsFlagSet(fNeedsStorageCache)  ||  m_ForceLocal))
    {
        diag_msg.Flush();
        x_ReportError(eStatus_JustStarted);
        if (!x_IsHttpMode()) {
            x_ResetFlags();
        }
        return &CNCMessageHandler::x_FinishCommand;
    }

    if (m_AppSetup->disable) {
        diag_msg.Flush();
        // We'll be here only if generally work for the client is enabled but
        // for current particular cache it is disabled.
        x_ReportError(eStatus_Disabled);
        if (!x_IsHttpMode()) {
            x_ResetFlags();
        }
        return &CNCMessageHandler::x_FinishCommand;
    }

    // Commands that are part of a started synchronization must first be
    // admitted by the periodic sync machinery.
    if (x_IsFlagSet(fRunsInStartedSync)) {
        ESyncInitiateResult start_res = CNCPeriodicSync::CanStartSyncCommand(
                                        m_SrvId, m_Slot,
                                        !x_IsFlagSet(fProhibitsSyncAbort),
                                        m_SyncId);
        if (start_res == eNetworkError) {
            diag_msg.Flush();
            x_ReportError(eStatus_CmdAborted);
            if (!x_IsHttpMode()) {
                x_ResetFlags();
            }
            return &CNCMessageHandler::x_FinishCommand;
        }
        else if (start_res == eServerBusy  ||  CTaskServer::IsInSoftShutdown()) {
            diag_msg.Flush();
            x_ReportOK("OK:SIZE=0, NEED_ABORT1\n");
            GetDiagCtx()->SetRequestStatus(eStatus_SyncAborted);
            // Old NC servers (those which used CNetCacheAPI instead of
            // CNCActiveHandler) always started to write blob data in SYNC_PUT
            // even when we responded to them NEED_ABORT. To avoid breaking the protocol
            // we need to read from them those fake blob writes.
            // needs_fake is computed before the flags are possibly reset
            // below, because it is derived from one of those flags.
            bool needs_fake = x_IsFlagSet(fReadExactBlobSize)  &&  m_CmdVersion == 0;
            if (start_res == eServerBusy)
                x_ResetFlags();
            if (needs_fake)
                return &CNCMessageHandler::x_ReadBlobSignature;
            return &CNCMessageHandler::x_FinishCommand;
        }
    }

    // NOTE(review): unlike every other early exit above, this path reports
    // the error without an explicit diag_msg.Flush() first and without
    // resetting flags -- confirm this is intended.
    if (x_IsFlagSet(fNeedsSpaceAsPeer)
        && !CNCBlobStorage::AcceptWritesFromPeers()) {
        x_ReportError(eStatus_NoDiskSpace);
        return &CNCMessageHandler::x_FinishCommand;
    }

    if (!x_IsFlagSet(fNeedsBlobAccess) && !x_IsFlagSet(fNeedsBlobList)) {
        diag_msg.Flush();
        // if we do not need blob access
        return m_CmdProcessor;
    }

    // Password policy: reject a command without a password when the setup
    // only allows passworded access, and a command with a password when the
    // setup only allows access without one -- unless the command is exempt
    // from the check.
    if (((m_BlobPass.empty()  &&  m_AppSetup->pass_policy == eNCOnlyWithPass)
            ||  (!m_BlobPass.empty()  &&  m_AppSetup->pass_policy == eNCOnlyWithoutPass))
        &&  !x_IsFlagSet(fDoNotCheckPassword))
    {
        diag_msg.Flush();
        GetDiagCtx()->SetRequestStatus(eStatus_NotAllowed);
        // why substr?
        // NOTE(review): substr(4) drops the first four characters of the
        // status message before it is sent to the client; the log line below
        // uses the full message.  The reason is not evident from this code.
        x_ReportError(GetMessageByStatus(eStatus_NotAllowed).substr(4));
        SRV_LOG(Warning, GetMessageByStatus(eStatus_NotAllowed));
        return &CNCMessageHandler::x_FinishCommand;
    }

    // Determine the blob key (generating one if allowed and none was given)
    // and from it the blob slot and time bucket.
    if (x_IsFlagSet(fCanGenerateKey)  &&  m_NCBlobKey.RawKey().empty()) {
        string raw_key;
//to test
//        bool old_ver = (CSrvTime::Current().CurSecs() % 2) != 0;
        // Native clients get key version 1, HTTP clients get version 3.
        bool old_ver = !x_IsHttpMode();

        CNCDistributionConf::GenerateBlobKey(
            !x_IsHttpMode() ? m_LocalPort : Uint4( CNCDistributionConf::GetSelfID()),
            raw_key, m_BlobSlot, m_TimeBucket, old_ver ? 1 : 3);
        m_NCBlobKey.Assign(raw_key);
        diag_msg.PrintParam("key", m_NCBlobKey.RawKey());
        diag_msg.PrintParam("gen_key", "1");
    }
    else if (!m_NCBlobKey.IsValid() && !x_IsFlagSet(fNeedsBlobList)) {
        diag_msg.Flush();
        x_ReportError(eStatus_NotFound);
        SRV_LOG(Warning, "Invalid blob key format: " << m_NCBlobKey.RawKey());
        return &CNCMessageHandler::x_FinishCommand;
    }
    else if (m_NCBlobKey.IsICacheKey()) {
        CNCDistributionConf::GetSlotByICacheKey(m_NCBlobKey, m_BlobSlot, m_TimeBucket);
    } else {
        CNCDistributionConf::GetSlotByRnd(m_NCBlobKey.GetRandomPart(), m_BlobSlot, m_TimeBucket);
    }

    m_BlobSize = 0;
    // Log the computed slot only when the command did not carry its own.
    if (m_Slot == 0) {
        diag_msg.PrintParam("slot", m_BlobSlot);
    }

    // Proxy the command to a peer when this server cannot serve the slot
    // itself (slot not served locally, or caching not complete), proxying is
    // allowed, and at least one other server is known for the slot.
    if ((!CNCDistributionConf::IsServedLocally(m_BlobSlot)
            ||  !CNCServer::IsCachingComplete())
        &&  !x_IsFlagSet(fDoNotProxyToPeers)
        &&  !m_ForceLocal
        && CNCDistributionConf::CountServersForSlot(m_BlobSlot) != 0)
    {
        diag_msg.PrintParam("proxy", "1");
        diag_msg.Flush();
        x_GetCurSlotServers();
        return &CNCMessageHandler::x_ProxyToNextPeer;
    }

    if (!x_IsFlagSet(fNeedsBlobAccess)) {
        diag_msg.Flush();
        // if we do not need blob access
        return m_CmdProcessor;
    }

    // no blob access before caching is done
    if (!CNCServer::IsCachingComplete())
    {
        diag_msg.Flush();
        x_ReportError(eStatus_JustStarted);
        if (!x_IsHttpMode()) {
            x_ResetFlags();
        }
        return &CNCMessageHandler::x_FinishCommand;
    }

    diag_msg.Flush();

    // Until the initial sync has completed, commands that use peer search
    // get their quorum set to 0 (unless forced to be local).
    if (!CNCServer::IsInitiallySynced()  &&  !m_ForceLocal
        &&  x_IsFlagSet(fUsesPeerSearch))
    {
        m_Quorum = 0;
    }
    if (x_IsFlagSet(fNeedsLowerPriority))
        SetPriority(CNCDistributionConf::GetSyncPriority());
    // Not enough disk space for this write: hand it over to another server
    // for the slot if there is one, otherwise fail.
    if ((x_IsFlagSet(fNeedsSpaceAsClient)
            &&  CNCBlobStorage::NeedStopWrite())
        ||  (x_IsFlagSet(fNeedsSpaceAsPeer)
            &&  !CNCBlobStorage::AcceptWritesFromPeers()))
    {
        if (CNCDistributionConf::CountServersForSlot(m_BlobSlot) != 0) {
            x_GetCurSlotServers();
            return &CNCMessageHandler::x_ProxyToNextPeer;
        }
        x_ReportError(eStatus_NoDiskSpace);
        return &CNCMessageHandler::x_FinishCommand;
    }

    // Acquire the blob accessor (with no access rights when the user set
    // fNoCreate) and ask for the blob's meta information.
    x_LogCmdEvent("RequestMetaInfo");
    m_BlobAccess = CNCBlobStorage::GetBlobAccess(
                                    x_IsUserFlagSet(fNoCreate) ? eNCNone : m_ParsedCmd.command->extra.blob_access,
                                    m_NCBlobKey.PackedKey(), m_BlobPass, m_TimeBucket);
    m_BlobAccess->RequestMetaInfo(this);
    return &CNCMessageHandler::x_WaitForBlobAccess;
}
2540 
2541 CNCMessageHandler::State
x_ReadCommand(void)2542 CNCMessageHandler::x_ReadCommand(void)
2543 {
2544     LOG_CURRENT_FUNCTION
2545     if (NeedToClose()  ||  CTaskServer::IsInSoftShutdown()) {
2546         if (CTaskServer::IsInShutdown())
2547             GetDiagCtx()->SetRequestStatus(eStatus_ShuttingDown);
2548         else
2549             GetDiagCtx()->SetRequestStatus(eStatus_CmdTimeout);
2550         return &CNCMessageHandler::x_SaveStatsAndClose;
2551     }
2552 
2553     CTempString cmd_line;
2554     if (x_IsHttpMode() && !m_PosponedCmd.empty()) {
2555         cmd_line = m_PosponedCmd;
2556     } else if (!ReadLine(&cmd_line)) {
2557         if (!HasError()  &&  CanHaveMoreRead())
2558             return NULL;
2559         if (IsReadDataAvailable())
2560             GetDiagCtx()->SetRequestStatus(eStatus_PrematureClose);
2561         return &CNCMessageHandler::x_SaveStatsAndClose;
2562     }
2563 
2564     if (!x_IsHttpMode()) {
2565         try {
2566             m_ParsedCmd = m_Parser.ParseCommand(cmd_line);
2567         }
2568         catch (CNSProtoParserException& ex) {
2569             SRV_LOG(Warning, "Error parsing command: " << ex);
2570             GetDiagCtx()->SetRequestStatus(eStatus_BadCmd);
2571             return &CNCMessageHandler::x_SaveStatsAndClose;
2572         }
2573     } else {
2574         m_ClientParams.erase("key");
2575         list<CTempString> arr;
2576         ncbi_NStr_Split(cmd_line, " ", arr);
2577         bool good = false;
2578         if (arr.size() >= 3) {
2579             CTempString arr_cmd(arr.front());
2580             CTempString arr_uri(*(++arr.begin()));
2581             CTempString arr_key;
2582             if (arr_cmd == "DELETE" ||
2583                 arr_cmd == "GET"    ||
2584                 arr_cmd == "HEAD"   ||
2585                 arr_cmd == "POST"   ||
2586                 arr_cmd == "PUT") {
2587                 {
2588                     // eg, "/"  "/service"
2589                     list<CTempString> uri_parts;
2590                     ncbi_NStr_Split(arr_uri, "/", uri_parts);
2591                     if (uri_parts.size() > 0) {
2592                         if (arr_cmd != "POST") {
2593                             arr_key = uri_parts.back();
2594                             m_ClientParams["key"] = arr_key;
2595                             uri_parts.pop_back();
2596                         }
2597                         string service;
2598                         if (!uri_parts.empty())  {
2599                             service = NStr::Join(uri_parts, "/");
2600                         }
2601                         m_ClientParams["service"] = service;
2602                     }
2603                 }
2604                 m_ClientParams["cache"].clear();
2605 
2606                 try {
2607                     m_ParsedCmd = m_Parser.ParseCommand(cmd_line);
2608                     good = true;
2609                 }
2610                 catch (CNSProtoParserException& ) {
2611                     GetDiagCtx()->SetRequestStatus(eStatus_BadCmd);
2612                 }
2613             } else {
2614                 GetDiagCtx()->SetRequestStatus(eStatus_NoImpl);
2615                 SRV_LOG(Error, "Unrecognized command: " << cmd_line);
2616             }
2617         } else {
2618             SRV_LOG(Error, "Error parsing command: " << cmd_line);
2619             GetDiagCtx()->SetRequestStatus(eStatus_BadCmd);
2620         }
2621         m_PosponedCmd.clear();
2622         if (!good) {
2623             x_WriteHttpResponse();
2624             return &CNCMessageHandler::x_SaveStatsAndClose;
2625         }
2626     }
2627 
2628     const SCommandExtra& cmd_extra = m_ParsedCmd.command->extra;
2629     m_CmdProcessor = cmd_extra.processor;
2630     m_Flags        = cmd_extra.cmd_flags;
2631     CreateNewDiagCtx();
2632     try {
2633         x_AssignCmdParams();
2634     }
2635     catch (CStringException& ex) {
2636         ReleaseDiagCtx();
2637         SRV_LOG(Warning, "Error while parsing command '" << cmd_line
2638                          << "': " << ex);
2639         GetDiagCtx()->SetRequestStatus(eStatus_BadCmd);
2640         return &CNCMessageHandler::x_SaveStatsAndClose;
2641     }
2642     return &CNCMessageHandler::x_StartCommand;
2643 }
2644 
2645 void
x_GetCurSlotServers(void)2646 CNCMessageHandler::x_GetCurSlotServers(void)
2647 {
2648     LOG_CURRENT_FUNCTION
2649     CNCDistributionConf::GetServersForSlot(m_BlobSlot, m_CheckSrvs);
2650     m_SrvsIndex = 0;
2651     Uint4 main_srv_ip = 0;
2652     if (!m_NCBlobKey.IsICacheKey()) {
2653         main_srv_ip = CNCDistributionConf::GetMainSrvIP(m_NCBlobKey);
2654     }
2655     m_ThisServerIsMain = false;
2656     if (main_srv_ip != 0) {
2657         // Note: this check for "main" server for blob assumes that for each
2658         // blob slot only one NetCache instance processing it works on each
2659         // server. It will give false positive results if there are several
2660         // NC instances on the same server processing the same slot. But there's
2661         // no much sense in such setup, so it's pretty safe assumption.
2662         if (Uint4(CNCDistributionConf::GetSelfID() >> 32) == main_srv_ip
2663             // if NeedStopWrite, I cannot be sure that blob on this server is most recent
2664             // and need to ask other servers
2665             && !CNCBlobStorage::NeedStopWrite()
2666             // make sure this slot is served here
2667             && CNCDistributionConf::IsServedLocally(m_BlobSlot)
2668             ) {
2669             //m_ThisServerIsMain = true;
2670             m_ThisServerIsMain =
2671                 CNCBlobAccessor::HasPutSucceeded(m_NCBlobKey.PackedKey());
2672         }
2673         if (!m_ThisServerIsMain) {
2674             CWriteBackControl::AnotherServerMain();
2675             for (size_t i = 0; i < m_CheckSrvs.size(); ++i) {
2676                 if (Uint4(m_CheckSrvs[i] >> 32) == main_srv_ip) {
2677                     Uint8 srv_id = m_CheckSrvs[i];
2678                     m_CheckSrvs.erase(m_CheckSrvs.begin() + i);
2679                     m_CheckSrvs.insert(m_CheckSrvs.begin(), srv_id);
2680                     break;
2681                 }
2682             }
2683         }
2684     }
2685 }
2686 
2687 void
x_JournalBlobPutResult(int status,const string & blob_key,Uint2 blob_slot)2688 CNCMessageHandler::x_JournalBlobPutResult(int status, const string& blob_key, Uint2 blob_slot)
2689 {
2690     if (CNCDistributionConf::CountServersForSlot(blob_slot) != 0) {
2691         if (status == eStatus_PrematureClose && status == eStatus_CmdTimeout) {
2692             CNCBlobAccessor::PutFailed(blob_key);
2693         } else {
2694             CNCBlobAccessor::PutSucceeded(blob_key);
2695         }
2696     }
2697 }
2698 
2699 void
x_ReportError(EHTTPStatus sts,bool eol)2700 CNCMessageHandler::x_ReportError( EHTTPStatus sts, bool eol /*= true*/)
2701 {
2702     GetDiagCtx()->SetRequestStatus(sts);
2703     x_ReportError( GetMessageByStatus(sts), eol);
2704 }
2705 
2706 void
x_ReportError(const string & sts,bool eol)2707 CNCMessageHandler::x_ReportError( const string& sts, bool eol /*= true*/)
2708 {
2709 // in HTTP mode we leave it to x_CleanCmdResources always
2710     if (!x_IsHttpMode()) {
2711         if (!x_IsFlagSet(fNoReplyOnFinish)) {
2712             x_SetFlag(fNoReplyOnFinish);
2713             WriteText(sts);
2714             if (eol) {
2715                 WriteText("\n");
2716             }
2717         }
2718     }
2719 }
2720 
2721 CNCMessageHandler&
x_ReportOK(const string & sts)2722 CNCMessageHandler::x_ReportOK(const string& sts)
2723 {
2724     if (!x_IsHttpMode()) {
2725         if (!x_IsFlagSet(fNoReplyOnFinish)) {
2726             x_SetFlag(fNoReplyOnFinish);
2727             WriteText(sts);
2728         }
2729     }
2730     return *this;
2731 }
2732 
2733 void
x_LogCmdEvent(const CTempString & evt)2734 CNCMessageHandler::x_LogCmdEvent( const CTempString& evt)
2735 {
2736     CSrvTime cmd_now = CSrvTime::Current();
2737     CSrvTime cmd_len = cmd_now;
2738     cmd_len -= m_CmdPrevTime;
2739     CSrvTime cmd_time = cmd_now;
2740     cmd_time -= m_CmdStartTime;
2741     m_CmdPrevTime = cmd_now;
2742     m_CmdLog.push_back( evt + ": " + NStr::NumericToString(cmd_len.AsUSec())  + "us ("
2743                                    + NStr::NumericToString(cmd_time.AsUSec()) + "us)");
2744 }
2745 
x_LogCmdLog(void)2746 void CNCMessageHandler::x_LogCmdLog(void)
2747 {
2748     for( const string& l : m_CmdLog) {
2749         SRV_LOG(Warning, l);
2750     }
2751 }
2752 
// State entered while waiting for the blob's meta information.
// Once m_BlobAccess reports the meta info as ready it checks authorization,
// optionally sends an early update notification to mirrors, and then selects
// the next state: the command processor itself, execution on the server
// recorded in m_LatestSrvId, or polling of peers (x_ReadMetaNextPeer).
// Returns NULL while the meta info is not ready yet (presumably the task is
// re-run when the accessor becomes ready -- confirm against the task driver).
CNCMessageHandler::State
CNCMessageHandler::x_WaitForBlobAccess(void)
{
    LOG_CURRENT_FUNCTION
    if (!m_BlobAccess->IsMetaInfoReady())
        return NULL;
    if (NeedEarlyClose())
        return &CNCMessageHandler::x_CloseCmdAndConn;

    // Password mismatch is an error unless the command opts out of the check.
    if (!m_BlobAccess->IsAuthorized()  &&  !x_IsFlagSet(fDoNotCheckPassword)) {
        x_ReportError(eStatus_BadPassword);
        return &CNCMessageHandler::x_FinishCommand;
    }

// send COPY_UPD notification early, before blob data is received
//  another option is to send it in x_FinishReadingBlob,
//      after m_BlobAccess->Finalize(), before  CNCPeerControl::MirrorWrite
// sending the notification here we assume the risk that the blob will not be received correctly
// and the notification will create confusion only (it will be corrected by periodic sync).
    if (
        (m_Flags & eProxyBlobWrite) == eProxyBlobWrite // request from clients only
        && CNCDistributionConf::GetBlobUpdateHotline()
#if !USE_ALWAYS_COPY_UPD
        // if the blob exists here, it can exist on mirrors as well
        && (m_BlobAccess->IsBlobExists()
        // if blob does not exist here, but it is created on another server, I should notify them
            || !CNCDistributionConf::IsThisServerKey(m_NCBlobKey))
#endif
        ) {
        CNCPeerControl::MirrorUpdate(m_NCBlobKey, m_BlobSlot, CSrvTime::Current().AsUSec());
    }

    // Commands without fUsesPeerSearch go straight to their processor.
    if (!x_IsFlagSet(fUsesPeerSearch)) {
        x_LogCmdEvent("CmdProcessor");
        return m_CmdProcessor;
    }

    // All commands that have fUsesPeerSearch will operate on m_LatestExist and
    // m_LatestBlobSum, so we need to fill it here even if in next "if" we'll go
    // almost directly to m_CmdProcessor.
    bool is_exist = m_BlobAccess->IsBlobExists();
    bool is_valid = m_BlobAccess->IsValid();
    // The local blob counts as "latest" only if it exists and, unless the
    // version check is disabled, has the requested version.
    m_LatestExist = is_exist
                    &&  (x_IsFlagSet(fNoBlobVersionCheck)
                         ||  m_BlobAccess->GetCurBlobVersion() == m_BlobVersion);
    if (x_IsFlagSet(fDoNotProxyToPeers) || m_ForceLocal || !m_SearchOnRead) {
        m_LatestSrvId = CNCDistributionConf::GetSelfID();
    } else {
        // valid + exists -> this server; valid + missing -> main server for
        // the key; not valid -> the server reported by GetValidServer().
        m_LatestSrvId = is_valid ?
            (is_exist ? CNCDistributionConf::GetSelfID() : CNCDistributionConf::GetMainSrvId(m_NCBlobKey)) :
            m_BlobAccess->GetValidServer();
    }
    if (m_LatestSrvId != 0) {
        if (m_LatestExist) {
            // Seed the "latest blob" summary with the local blob's data.
            m_LatestBlobSum->create_time   = m_BlobAccess->GetCurBlobCreateTime();
            m_LatestBlobSum->create_server = m_BlobAccess->GetCurCreateServer();
            m_LatestBlobSum->create_id     = m_BlobAccess->GetCurCreateId();
            m_LatestBlobSum->dead_time     = m_BlobAccess->GetCurBlobDeadTime();
            m_LatestBlobSum->expire        = m_BlobAccess->GetCurBlobExpire();
            m_LatestBlobSum->ver_expire    = m_BlobAccess->GetCurVerExpire();
        }
        // Cases in which no peer polling is done before executing.
        if (x_IsFlagSet(fDoNotProxyToPeers)
            ||  m_ForceLocal
            ||  (m_Quorum == 1  &&  (m_LatestExist  ||  !m_SearchOnRead))
            ||  (m_LatestExist  &&  x_IsFlagSet(fPeerFindExistsOnly)))
        {
            return &CNCMessageHandler::x_ExecuteOnLatestSrvId;
        }
    }

    x_GetCurSlotServers();
    if (m_ThisServerIsMain
        &&  m_AppSetup->fast_on_main
        &&  CNCServer::IsInitiallySynced())
    {
        // NOTE(review): the peer list is dropped only when the local blob is
        // NOT valid -- confirm this is the intended polarity.
        if (!is_valid) {
            m_CheckSrvs.clear();
            m_SrvsIndex = 0;
        }
        return &CNCMessageHandler::x_ExecuteOnLatestSrvId;
    }
    if (!is_valid) {
        // Move the server reported by GetValidServer() (if it is in the list)
        // to the front of the servers to check.
        Uint8 srv_id = m_BlobAccess->GetValidServer();
        if (srv_id) {
            for (size_t i = 0; i < m_CheckSrvs.size(); ++i) {
                if (m_CheckSrvs[i] == srv_id) {
                    m_CheckSrvs.erase(m_CheckSrvs.begin() + i);
                    m_CheckSrvs.insert(m_CheckSrvs.begin(), srv_id);
                    break;
                }
            }
        }
    }

    // An existing local blob already accounts for one quorum member.
    if (m_LatestExist  &&  m_Quorum != 0) {
        --m_Quorum;
    }
    x_LogCmdEvent("ReadMetaNextPeer");
    return &CNCMessageHandler::x_ReadMetaNextPeer;
}
2853 
2854 CNCMessageHandler::State
x_ReportBlobNotFound(void)2855 CNCMessageHandler::x_ReportBlobNotFound(void)
2856 {
2857     LOG_CURRENT_FUNCTION
2858     x_SetFlag(fNoBlobAccessStats);
2859     x_ReportError(eStatus_NotFound, false);
2860     if (!x_IsHttpMode()) {
2861         if (m_AgeMax != 0 && m_BlobAccess->IsBlobExists()) {
2862             WriteText(", AGE=").WriteNumber(m_AgeCur);
2863             WriteText(", VER=").WriteNumber(m_BlobAccess->GetCurBlobVersion());
2864         }
2865         WriteText("\n");
2866     }
2867     return &CNCMessageHandler::x_FinishCommand;
2868 }
2869 
2870 void
x_ProlongBlobDeadTime(unsigned int add_time)2871 CNCMessageHandler::x_ProlongBlobDeadTime(unsigned int add_time)
2872 {
2873     LOG_CURRENT_FUNCTION
2874 
2875     CSrvTime cur_srv_time = CSrvTime::Current();
2876     Uint8 cur_time = cur_srv_time.AsUSec();
2877     int now = int(cur_srv_time.Sec());
2878     int new_expire = now + add_time;
2879     if (new_expire < now) {
2880         return;
2881     }
2882     int old_expire = m_BlobAccess->GetCurBlobExpire();
2883     if (!CNCServer::IsDebugMode() &&
2884         new_expire - old_expire < (int)m_AppSetup->ttl_unit) {
2885         return;
2886     }
2887 
2888     if (m_AppSetup->lifespan_ttl > 0) {
2889         int created = (int)(m_BlobAccess->GetCurBlobCreateTime()/kUSecsPerSecond);
2890         int retire = (int)(created + m_AppSetup->lifespan_ttl);
2891         if (retire > created) {
2892             new_expire = min(new_expire,retire);
2893         }
2894     }
2895 
2896     m_BlobAccess->SetCurBlobExpire(new_expire);
2897     if (m_BlobAccess->GetCurBlobSize() <= CNCDistributionConf::GetMaxBlobSizeSync()) {
2898         SNCSyncEvent* event = new SNCSyncEvent();
2899         event->blob_size = m_BlobAccess->GetCurBlobSize();
2900         event->event_type = eSyncProlong;
2901         event->key = m_NCBlobKey;
2902         event->orig_server = CNCDistributionConf::GetSelfID();
2903         event->orig_time = cur_time;
2904         CNCSyncLog::AddEvent(m_BlobSlot, event);
2905         CNCPeerControl::MirrorProlong(m_NCBlobKey, m_BlobSlot,
2906                                       event->orig_rec_no, cur_time, m_BlobAccess);
2907     }
2908 }
2909 
2910 void
x_ProlongVersionLife(void)2911 CNCMessageHandler::x_ProlongVersionLife(void)
2912 {
2913     LOG_CURRENT_FUNCTION
2914     CSrvTime cur_srv_time = CSrvTime::Current();
2915     Uint8 cur_time = cur_srv_time.AsUSec();
2916     int new_expire = int(cur_srv_time.Sec()) + m_BlobAccess->GetCurVersionTTL();
2917     int old_expire = m_BlobAccess->GetCurVerExpire();
2918     if (!CNCServer::IsDebugMode()  &&  new_expire - old_expire < m_AppSetup->ttl_unit)
2919         return;
2920 
2921     m_BlobAccess->SetCurVerExpire(new_expire);
2922     if (m_BlobAccess->GetCurBlobSize() <= CNCDistributionConf::GetMaxBlobSizeSync()) {
2923         SNCSyncEvent* event = new SNCSyncEvent();
2924         event->blob_size = m_BlobAccess->GetCurBlobSize();
2925         event->event_type = eSyncProlong;
2926         event->key = m_NCBlobKey;
2927         event->orig_server = CNCDistributionConf::GetSelfID();
2928         event->orig_time = cur_time;
2929         CNCSyncLog::AddEvent(m_BlobSlot, event);
2930         CNCPeerControl::MirrorProlong(m_NCBlobKey, m_BlobSlot,
2931                                       event->orig_rec_no, cur_time, m_BlobAccess);
2932     }
2933 }
2934 
// Releases everything held by the command that has just finished:
// collects size statistics from the blob accessor or the active peer hub,
// notifies periodic sync about the command outcome, writes the final reply
// (if none was sent yet), flushes the socket, logs and records statistics,
// and finally releases the diagnostic context.
void
CNCMessageHandler::x_CleanCmdResources(void)
{
    LOG_CURRENT_FUNCTION
    int cmd_status = GetDiagCtx()->GetRequestStatus();
    bool print_size = false;
    Uint8 written_size = 0;
    ENCAccessType access_type = eNCCopyCreate;
    // A write event still stored here was never handed to the sync log.
    if (m_write_event) {
        delete m_write_event;
        m_write_event = NULL;
    }
    if (m_BlobAccess) {
        access_type = m_BlobAccess->GetAccessType();
        if (m_BlobAccess->IsBlobExists()  &&  !x_IsFlagSet(fNoBlobAccessStats)) {
            print_size = true;
            // Reads report the size of the current blob, all other access
            // types the size of the new one.
            if (access_type == eNCRead  ||  access_type == eNCReadData)
                m_BlobSize = m_BlobAccess->GetCurBlobSize();
            else
                m_BlobSize = m_BlobAccess->GetNewBlobSize();
            if (access_type == eNCReadData)
                GetDiagCtx()->SetBytesWr(m_BlobAccess->GetSizeRead());
            else if (access_type == eNCCreate  ||  access_type == eNCCopyCreate)
                GetDiagCtx()->SetBytesRd(m_BlobSize);
        }
        else if (access_type == eNCCreate) {
            // Remembered for the rollback statistics below.
            written_size = m_BlobAccess->GetNewBlobSize();
        }
        m_BlobAccess->Release();
        m_BlobAccess = NULL;
    }
    else if (m_ActiveHub) {
        // The command was proxied: take sizes from the peer connection.
        switch (m_ParsedCmd.command->extra.proxy_cmd) {
        case eProxyRead:
        case eProxyReadLast:
            print_size = true;
            m_BlobSize = m_ActiveHub->GetHandler()->GetSizeRd();
            GetDiagCtx()->SetBytesWr(m_BlobSize);
            break;
        case eProxyWrite:
            print_size = true;
            m_BlobSize = m_ActiveHub->GetHandler()->GetSizeWr();
            GetDiagCtx()->SetBytesRd(m_BlobSize);
            break;
        default:
            break;
        }
        m_ActiveHub->GetHandler()->ResetSizeRdWr();
    }

    // Commands running inside a started sync report their outcome to it.
    if (x_IsFlagSet(fRunsInStartedSync)) {
        if (x_IsCmdSucceeded(cmd_status)  ||  x_IsFlagSet(fSyncCmdSuccessful)) {
            CNCPeriodicSync::SyncCommandFinished(m_SrvId, m_Slot, m_SyncId);
        } else {
            CNCPeriodicSync::Cancel(m_SrvId, m_Slot, m_SyncId);
        }
    }
    // Send the final reply unless one has been sent (or suppressed) already.
    if (!x_IsFlagSet(fNoReplyOnFinish)) {
        if (!x_IsHttpMode()) {
            if (x_IsCmdSucceeded(cmd_status)) {
                WriteText("OK:\n");
            } else {
                WriteText(GetMessageByStatus(EHTTPStatus(cmd_status))).WriteText("\n");
            }
        } else {
            x_WriteHttpResponse();
        }
    }
    Flush();

    if (m_ActiveHub) {
        m_ActiveHub->Release();
        m_ActiveHub = NULL;
    }
    m_CheckSrvs.clear();
    m_SrvsIndex = 0;
    m_ChunkLen = 0;
    m_LastPeerError.clear();

    // Total command duration; long commands get their event log printed.
    CSrvTime cmd_len = CSrvTime::Current();
    cmd_len -= m_CmdStartTime;
    Uint8 len_usec = cmd_len.AsUSec();
    if (IsLongCommand(len_usec)) {
        x_LogCmdLog();
    }

    if (print_size  &&  (x_IsCmdSucceeded(cmd_status)  ||  m_BlobSize != 0))
        CSrvDiagMsg().PrintExtra().PrintParam("blob_size", m_BlobSize);
    CSrvDiagMsg().StopRequest();

//    CSrvTime cmd_len = CSrvTime::Current();
//    cmd_len -= m_CmdStartTime;
//    Uint8 len_usec = cmd_len.AsUSec();
    CNCStat::CmdFinished(m_ParsedCmd.command->cmd, len_usec, cmd_status);
    // Per-blob client statistics are kept only for client-originated commands.
    if (m_Flags & fComesFromClient) {
        if (access_type == eNCCreate) {
            if (print_size  &&  x_IsCmdSucceeded(cmd_status))
                CNCStat::ClientBlobWrite(m_BlobSize, len_usec);
            else
                CNCStat::ClientBlobRollback(written_size);
        }
        else if (access_type == eNCReadData) {
            if (print_size)
                CNCStat::ClientBlobRead(m_BlobSize, len_usec);
        }
    }
    ++m_CntCmds;

    // Restore the default task priority for commands flagged with
    // fNeedsLowerPriority.
    if (x_IsFlagSet(fNeedsLowerPriority))
        SetPriority(GetDefaultTaskPriority());

    m_SendBuff.reset();
    ReleaseDiagCtx();

    m_PosponedCmd.clear();
}
3051 
3052 CNCMessageHandler::State
x_FinishCommand(void)3053 CNCMessageHandler::x_FinishCommand(void)
3054 {
3055     LOG_CURRENT_FUNCTION
3056     int status = GetDiagCtx()->GetRequestStatus();
3057     if (x_IsFlagSet(eBlobPut) && m_NCBlobKey.IsICacheKey()) {
3058         x_JournalBlobPutResult(status, m_NCBlobKey.PackedKey(), m_BlobSlot);
3059     }
3060     if (x_IsFlagSet(fCursedPUT2Cmd))
3061         return &CNCMessageHandler::x_CloseCmdAndConn;
3062 
3063     x_CleanCmdResources();
3064     SetState(&CNCMessageHandler::x_ReadCommand);
3065     SetRunnable();
3066     return NULL;
3067 }
3068 
3069 CNCMessageHandler::State
x_StartReadingBlob(void)3070 CNCMessageHandler::x_StartReadingBlob(void)
3071 {
3072     LOG_CURRENT_FUNCTION
3073     // Flushing the initial response line that client should receive before it
3074     // will start writing blob data.
3075     Flush();
3076     m_BlobSize = 0;
3077     if (NeedEarlyClose())
3078         return &CNCMessageHandler::x_FinishCommand;
3079     else
3080         return &CNCMessageHandler::x_ReadBlobSignature;
3081 }
3082 
3083 CNCMessageHandler::State
x_FinishReadingBlob(void)3084 CNCMessageHandler::x_FinishReadingBlob(void)
3085 {
3086     LOG_CURRENT_FUNCTION
3087     x_LogCmdEvent("FinishReadingBlob");
3088     bool fail = false, keep_conn = !x_IsFlagSet(fNoReplyOnFinish);
3089     string errmsg;
3090     if (x_IsFlagSet(fReadExactBlobSize)  &&  m_BlobSize != m_Size) {
3091         fail = true;
3092         x_ReportError(eStatus_CondFailed);
3093         SRV_LOG(Error, "Wrong data for blob size " << m_Size
3094                         << " (received " << m_BlobSize << " bytes)");
3095     }
3096     else if (m_BlobSize > CNCBlobStorage::GetMaxBlobSizeStore()) {
3097         fail = true;
3098         x_ReportError(eStatus_BlobTooBig);
3099         SRV_LOG(Error, "Blob size exceeds the allowed maximum of "
3100                         << CNCBlobStorage::GetMaxBlobSizeStore()
3101                         << " (received " << m_BlobSize << " bytes)");
3102     }
3103     if (fail) {
3104         if (keep_conn) {
3105             return &CNCMessageHandler::x_FinishCommand;
3106         } else {
3107             return &CNCMessageHandler::x_CloseCmdAndConn;
3108         }
3109     }
3110 
3111     if (!x_IsCmdSucceeded(GetDiagCtx()->GetRequestStatus()))
3112         return &CNCMessageHandler::x_FinishCommand;
3113     if (NeedEarlyClose())
3114         return &CNCMessageHandler::x_CloseCmdAndConn;
3115 
3116     // Fill all new event data but not add it to CNCSyncLog until we execute
3117     // m_BlobAccess->Finalize().
3118     SNCSyncEvent* write_event = new SNCSyncEvent();
3119     write_event->blob_size = m_BlobSize;
3120     write_event->event_type = eSyncWrite;
3121     write_event->key = m_NCBlobKey;
3122     if (x_IsFlagSet(fCopyLogEvent)) {
3123         write_event->orig_time = m_BlobAccess->GetNewBlobCreateTime();
3124         write_event->orig_server = m_BlobAccess->GetNewCreateServer();
3125         write_event->orig_rec_no = m_OrigRecNo;
3126     }
3127     else {
3128         CSrvTime cur_srv_time = CSrvTime::Current();
3129         Uint8 cur_time = cur_srv_time.AsUSec();
3130         int cur_secs = int(cur_srv_time.Sec());
3131         m_BlobAccess->SetBlobCreateTime(cur_time);
3132         if (x_IsUserFlagSet(fNoProlong) && m_BlobAccess->IsBlobExists() && !m_BlobAccess->IsCurBlobExpired()) {
3133             m_BlobAccess->SetNewBlobExpire(m_BlobAccess->GetCurBlobExpire());
3134             m_BlobAccess->SetNewVerExpire(m_BlobAccess->GetCurVerExpire());
3135         } else {
3136             if (m_BlobAccess->GetNewBlobExpire() == 0) {
3137                 m_BlobAccess->SetNewBlobExpire(cur_secs + m_BlobAccess->GetNewBlobTTL());
3138             }
3139             m_BlobAccess->SetNewVerExpire(cur_secs + m_BlobAccess->GetNewVersionTTL());
3140         }
3141         m_BlobAccess->SetCreateServer(CNCDistributionConf::GetSelfID(),
3142                                       CNCBlobStorage::GetNewBlobId());
3143         write_event->orig_server = CNCDistributionConf::GetSelfID();
3144         write_event->orig_time = cur_time;
3145     }
3146 
3147     x_LogCmdEvent("Finalize");
3148     m_BlobAccess->Finalize();
3149     if (m_BlobAccess->HasError()) {
3150         delete write_event;
3151         GetDiagCtx()->SetRequestStatus(eStatus_ServerError);
3152         if (x_IsFlagSet(fNoReplyOnFinish))
3153             return &CNCMessageHandler::x_CloseCmdAndConn;
3154 
3155         x_ReportError("ERR:Error while reading blob");
3156         return &CNCMessageHandler::x_FinishCommand;
3157     }
3158 
3159     m_MirrorsDone.clear();
3160     if (!x_IsFlagSet(fCopyLogEvent)) {
3161         if (m_BlobAccess->GetNewBlobSize() <= CNCDistributionConf::GetMaxBlobSizeSync()) {
3162 #if 0
3163             if (m_BlobAccess->IsCurBlobExpired()) {
3164                 delete write_event;
3165                 CNCPeerControl::MirrorRemove(m_NCBlobKey, m_BlobSlot, m_BlobAccess->GetNewBlobCreateTime()-1);
3166                 return &CNCMessageHandler::x_FinishCommand;
3167             }
3168 #endif
3169             // If fCopyLogEvent is not set then this blob comes from client and
3170             // thus we need to check quorum value before answering to client.
3171             // If fCopyLogEvent is set then this write comes from other server
3172             // and we don't care about quorum in this case.
3173             if (m_Quorum != 1) {
3174                 if (m_Quorum != 0)
3175                     --m_Quorum;
3176                 x_GetCurSlotServers();
3177                 m_write_event = write_event;
3178 // probably, this was made intentionally, but now it looks wrong
3179 // let us respect quorum setting always
3180 //                if (!m_ThisServerIsMain  ||  !m_AppSetup->fast_on_main)
3181                     return &CNCMessageHandler::x_PutToNextPeer;
3182             }
3183             m_OrigRecNo = CNCSyncLog::AddEvent(m_BlobSlot, write_event);
3184             CNCPeerControl::MirrorWrite(m_NCBlobKey, m_BlobSlot,
3185                                         m_OrigRecNo, m_BlobAccess->GetNewBlobSize(), m_MirrorsDone);
3186         } else {
3187             if (CNCDistributionConf::GetWarnBlobSizeSync()) {
3188                 SRV_LOG(Warning, "Received blob is too big and will not be mirrored:"
3189                     << " blob key:"     << m_NCBlobKey.RawKey()
3190                     << " blob size: "   << m_BlobAccess->GetNewBlobSize()
3191                     << " max allowed: " << CNCDistributionConf::GetMaxBlobSizeSync());
3192             }
3193             if (CNCDistributionConf::IsThisServerKey(m_NCBlobKey)) {
3194                 CNCPeerControl::MirrorRemove(m_NCBlobKey, m_BlobSlot, m_BlobAccess->GetNewBlobCreateTime()-1);
3195             } else {
3196                 Uint8 srvId = CNCDistributionConf::GetMainSrvId(m_NCBlobKey);
3197                 if (srvId != 0) {
3198                     m_CheckSrvs.push_back(srvId);
3199                     m_SrvsIndex = 0;
3200                     return &CNCMessageHandler::x_PutToNextPeer;
3201                 }
3202             }
3203             delete write_event;
3204         }
3205     }
3206     else if (m_OrigRecNo != 0) {
3207         CNCSyncLog::AddEvent(m_BlobSlot, write_event);
3208     }
3209     else {
3210         // m_OrigRecNo can be 0 if blob comes from another server as a result
3211         // of synchronization by blob lists. In this case there's no event to
3212         // link to and thus we don't need to add event to our sync log.
3213         delete write_event;
3214     }
3215 
3216     return &CNCMessageHandler::x_FinishCommand;
3217 }
3218 
3219 CNCMessageHandler::State
x_CloseOnPeerError(void)3220 CNCMessageHandler::x_CloseOnPeerError(void)
3221 {
3222     LOG_CURRENT_FUNCTION
3223     SRV_LOG(Warning, "Error executing command on peer "
3224         << m_ActiveHub->GetFullPeerName() << ", peer says: "
3225         << m_ActiveHub->GetErrMsg());
3226     GetDiagCtx()->SetRequestStatus(eStatus_PeerError);
3227     return &CNCMessageHandler::x_CloseCmdAndConn;
3228 }
3229 
3230 CNCMessageHandler::State
x_ReadBlobSignature(void)3231 CNCMessageHandler::x_ReadBlobSignature(void)
3232 {
3233     LOG_CURRENT_FUNCTION
3234     if (x_IsHttpMode()) {
3235         return &CNCMessageHandler::x_ReadBlobChunkLength;
3236     }
3237 
3238     Uint4 sig = 0;
3239     bool has_sig = ReadNumber(&sig);
3240     if (NeedEarlyClose())
3241         return &CNCMessageHandler::x_CloseCmdAndConn;
3242     if (!has_sig)
3243         return NULL;
3244 
3245     x_LogCmdEvent("ReadBlobSignature");
3246     if (sig == 0x04030201) {
3247         x_SetFlag(fSwapLengthBytes);
3248         return &CNCMessageHandler::x_ReadBlobChunkLength;
3249     }
3250     if (sig == 0x01020304) {
3251         x_UnsetFlag(fSwapLengthBytes);
3252         return &CNCMessageHandler::x_ReadBlobChunkLength;
3253     }
3254 
3255     GetDiagCtx()->SetRequestStatus(eStatus_BadCmd);
3256     SRV_LOG(Error, "Cannot determine the byte order. Got: "
3257                    << NStr::UIntToString(sig, 0, 16));
3258     return &CNCMessageHandler::x_CloseCmdAndConn;
3259 }
3260 
// Obtains the length of the next blob data chunk and decides what to do with
// the chunk: read it into the blob (x_ReadBlobChunk), proxy it to the peer
// behind m_ActiveHub, or finish when the end-of-blob marker (0xFFFFFFFF) is
// met. Also enforces the exact-size and maximum-size limits for data coming
// from clients. Returns NULL when the length is not available yet or when
// the handler deliberately postpones its next run.
CNCMessageHandler::State
CNCMessageHandler::x_ReadBlobChunkLength(void)
{
    LOG_CURRENT_FUNCTION
    if (m_ActiveHub) {
        if (ProxyHadError()) {
            // If we were proxying blob data from client to another server and some
            // error occurred protocol doesn't allow us to anything else but
            // close both connections - to client and to other NC server.
            CSrvSocketTask* active_sock = m_ActiveHub->GetHandler()->GetSocket();
            if (active_sock  &&  active_sock->HasError())
                return &CNCMessageHandler::x_CloseOnPeerError;
            else
                return &CNCMessageHandler::x_CloseCmdAndConn;
        }
        ENCClientHubStatus status = m_ActiveHub->GetStatus();
        if (status == eNCHubError)
            return &CNCMessageHandler::x_CloseOnPeerError;
        else if (status != eNCHubCmdInProgress) {
            SRV_FATAL("Unexpected client status: " << status);
        }

        // A non-zero m_ChunkLen here is the chunk that has just been proxied;
        // account for it in the statistics.
        if (m_ChunkLen != 0) {
            CNCStat::ClientDataWrite(m_ChunkLen);
            CNCStat::PeerDataRead(m_ChunkLen);
        }
    }

    bool has_chunklen = true;
    if (x_IsFlagSet(fSkipBlobEOF)  &&  m_BlobSize == m_Size) {
        // Workaround for old STRS
        // (the end-of-blob marker is simulated once all data is received)
        m_ChunkLen = 0xFFFFFFFF;
    }
    else if (x_IsHttpMode()) {
        // In HTTP mode the chunk length is taken from m_Size instead of
        // being read from the socket.
        m_ChunkLen = m_Size;
        has_chunklen = m_ChunkLen != 0;
    }
    else {
        has_chunklen = ReadNumber(&m_ChunkLen);
        // For fCursedPUT2Cmd commands the end of input stands for the end of
        // the blob.
        if (!has_chunklen  &&  !CanHaveMoreRead()  &&  x_IsFlagSet(fCursedPUT2Cmd)) {
            return &CNCMessageHandler::x_FinishReadingBlob;
        }
    }

    if (NeedEarlyClose())
        return &CNCMessageHandler::x_CloseCmdAndConn;
    if (!has_chunklen)
        return NULL;

    if (x_IsFlagSet(fSwapLengthBytes))
        m_ChunkLen = CByteSwap::GetInt4((const unsigned char*)&m_ChunkLen);
    // 0xFFFFFFFF is the end-of-blob marker.
    if (m_ChunkLen == 0xFFFFFFFF) {
        if (m_ActiveHub) {
            // Data transfer is finished, CNCActiveHandler will wait for response
            // from other NC server and wake us up.
            m_ActiveHub->GetHandler()->SetRunnable();
            return &CNCMessageHandler::x_WaitForPeerAnswer;
        }
        return &CNCMessageHandler::x_FinishReadingBlob;
    }

    if (!m_BlobAccess  &&  !m_ActiveHub) {
        // We can be here only when expecting fake start of blob writing from old
        // NC server, but for some reason we got non-EOF chunk length.
        GetDiagCtx()->SetRequestStatus(eStatus_BadCmd);
        SRV_LOG(Critical, "Received non-EOF chunk len from peer "
            << m_SrvId << ": " << m_ChunkLen);
        return &CNCMessageHandler::x_CloseCmdAndConn;
    }

    if (x_IsFlagSet(fComesFromClient)) {
        // The size limit has been exceeded already (see the branch below):
        // no more data is read, the handler only reschedules itself.
        if (m_BlobSize > CNCBlobStorage::GetMaxBlobSizeStore()) {
            RunAfter(1);
            return NULL;
        }
// this is potentially dangerous, because
// if we close connection here, in the middle of transmission,
// client might have no chance (most likely) to receive our error message
// and might want to retry thinking it was bad luck
        if (x_IsFlagSet(fReadExactBlobSize)  &&  (m_BlobSize + m_ChunkLen) > m_Size) {
            x_ReportError(eStatus_CondFailed);
            SRV_LOG(Error, "Too much data for blob size " << m_Size
                            << " (received at least "
                            << (m_BlobSize + m_ChunkLen) << " bytes)");
            return &CNCMessageHandler::x_CloseCmdAndConn;
        }
        if ((m_BlobSize + m_ChunkLen) > CNCBlobStorage::GetMaxBlobSizeStore()) {
            x_ReportError(eStatus_BlobTooBig);
            if (x_IsHttpMode()) {
                x_WriteHttpHeader(eStatus_BlobTooBig, 0, false);
            }
            SRV_LOG(Error, "Blob size exceeds the allowed maximum of "
                            << CNCBlobStorage::GetMaxBlobSizeStore()
                            << " (received " << m_BlobSize
                            << ", next chunk " << m_ChunkLen << " bytes)");
            // I am not going to read it anyway
            // (the chunk is counted so that the size check above fires on the
            // next run)
            m_BlobSize += m_ChunkLen;
            Flush();
            RunAfter(1);
            return NULL;
//            return &CNCMessageHandler::x_CloseCmdAndConn;
        }
    }

    if (m_ActiveHub) {
        // Forward the chunk length to the peer and start proxying the chunk
        // data itself.
        CSrvSocketTask* active_sock = m_ActiveHub->GetHandler()->GetSocket();
        active_sock->WriteData(&m_ChunkLen, sizeof(m_ChunkLen));
        m_ActiveHub->GetHandler()->AddSizeWr(m_ChunkLen);
        StartProxyTo(active_sock, m_ChunkLen);
        if (IsProxyInProgress())
            return NULL;
        else
            return &CNCMessageHandler::x_ReadBlobChunkLength;
    }

    return &CNCMessageHandler::x_ReadBlobChunk;
}
3378 
3379 CNCMessageHandler::State
x_ReadBlobChunk(void)3380 CNCMessageHandler::x_ReadBlobChunk(void)
3381 {
3382     LOG_CURRENT_FUNCTION
3383 // there are too many of them
3384 //    x_LogCmdEvent("ReadBlobChunk");
3385     while (m_ChunkLen != 0) {
3386         Uint4 read_len = Uint4(m_BlobAccess->GetWriteMemSize());
3387         if (m_BlobAccess->HasError()) {
3388             GetDiagCtx()->SetRequestStatus(eStatus_ServerError);
3389             if (!x_IsFlagSet(fNoReplyOnFinish) && !x_IsHttpMode()) {
3390                 x_ReportError("ERR:Server error");
3391                 Flush();
3392             }
3393             SetState(&CNCMessageHandler::x_CloseCmdAndConn);
3394             SetRunnable();
3395             return NULL;
3396         }
3397         if (read_len == 0)
3398             return NULL;
3399         if (read_len > m_ChunkLen)
3400             read_len = m_ChunkLen;
3401 
3402         Uint4 n_read = Uint4(Read(m_BlobAccess->GetWriteMemPtr(), read_len));
3403         if (n_read != 0) {
3404             if (m_Flags & fComesFromClient)
3405                 CNCStat::ClientDataWrite(n_read);
3406             else
3407                 CNCStat::PeerDataWrite(n_read);
3408         }
3409         if (NeedEarlyClose())
3410             return &CNCMessageHandler::x_CloseCmdAndConn;
3411         if (n_read == 0)
3412             return NULL;
3413 
3414         m_BlobAccess->MoveWritePos(n_read);
3415         m_ChunkLen -= n_read;
3416         m_BlobSize += n_read;
3417     }
3418     return &CNCMessageHandler::x_ReadBlobChunkLength;
3419 }
3420 
3421 CNCMessageHandler::State
x_WriteBlobData(void)3422 CNCMessageHandler::x_WriteBlobData(void)
3423 {
3424     LOG_CURRENT_FUNCTION
3425     x_LogCmdEvent("WriteBlobData");
3426     while (m_Size != 0) {
3427         if (m_BlobAccess->GetPosition() == m_BlobAccess->GetCurBlobSize())
3428             return &CNCMessageHandler::x_FinishCommand;
3429 
3430         Uint4 want_read = m_BlobAccess->GetReadMemSize();
3431         if (m_BlobAccess->HasError()) {
3432             GetDiagCtx()->SetRequestStatus(eStatus_ServerError);
3433             return &CNCMessageHandler::x_CloseCmdAndConn;
3434         }
3435         if (m_Size != Uint8(-1)  &&  m_Size < want_read)
3436             want_read = Uint4(m_Size);
3437 
3438         Uint4 n_written = Uint4(Write(m_BlobAccess->GetReadMemPtr(), want_read));
3439 //        x_LogCmdEvent("Write");
3440         if (n_written != 0) {
3441             if (m_Flags & fComesFromClient)
3442                 CNCStat::ClientDataRead(n_written);
3443             else
3444                 CNCStat::PeerDataRead(n_written);
3445             m_BlobAccess->MoveReadPos(n_written);
3446             if (m_Size != Uint8(-1))
3447                 m_Size -= n_written;
3448         }
3449         if (NeedEarlyClose())
3450             return &CNCMessageHandler::x_CloseCmdAndConn;
3451         if (n_written == 0)
3452             return NULL;
3453     }
3454     return &CNCMessageHandler::x_FinishCommand;
3455 }
3456 
3457 CNCMessageHandler::State
x_WriteSendBuff(void)3458 CNCMessageHandler::x_WriteSendBuff(void)
3459 {
3460     LOG_CURRENT_FUNCTION
3461     while (m_SendPos != m_SendBuff->size()) {
3462         size_t n_written = Write(m_SendBuff->data() + m_SendPos,
3463                                  m_SendBuff->size() - m_SendPos);
3464 
3465         if (NeedEarlyClose())
3466             return &CNCMessageHandler::x_CloseCmdAndConn;
3467         if (n_written == 0)
3468             return NULL;
3469 
3470         m_SendPos += n_written;
3471     }
3472     if (strcmp(m_ParsedCmd.command->cmd, "SYNC_START") == 0) {
3473         return &CNCMessageHandler::x_WriteSyncStartExtra;
3474     }
3475     ENCProxyCmd proxy_cmd = m_ParsedCmd.command->extra.proxy_cmd;
3476     if (proxy_cmd == eProxyGetBList2) {
3477         Flush();
3478         return &CNCMessageHandler::x_DoCmd_GetBListNext;
3479     }
3480     return &CNCMessageHandler::x_FinishCommand;
3481 }
3482 
3483 CNCMessageHandler::State
x_WriteSyncStartExtra(void)3484 CNCMessageHandler::x_WriteSyncStartExtra(void)
3485 {
3486     LOG_CURRENT_FUNCTION
3487     WriteText("PURGE:\n");
3488     WriteText(CNCBlobAccessor::GetPurgeData()). WriteText(";\n");
3489     return &CNCMessageHandler::x_FinishCommand;
3490 }
3491 
3492 CNCMessageHandler::State
x_ProxyToNextPeer(void)3493 CNCMessageHandler::x_ProxyToNextPeer(void)
3494 {
3495     LOG_CURRENT_FUNCTION
3496     if (NeedEarlyClose())
3497         return &CNCMessageHandler::x_CloseCmdAndConn;
3498     x_LogCmdEvent("ProxyToNextPeer");
3499     if (m_SrvsIndex < m_CheckSrvs.size()) {
3500         Uint8 srv_id = m_CheckSrvs[m_SrvsIndex++];
3501         if (m_ActiveHub) {
3502             SRV_FATAL("Previous client not released");
3503         }
3504         m_ActiveHub = CNCActiveClientHub::Create(srv_id, this);
3505         return &CNCMessageHandler::x_SendCmdAsProxy;
3506     }
3507 
3508     // Either there's no servers to execute this command on or all servers were
3509     // tried and some error was the result from all of them.
3510     SRV_LOG(Warning, "Got error on all peer servers, LastPeerError is " << m_LastPeerError);
3511     if (m_LastPeerError.empty())
3512         m_LastPeerError = "ERR:Cannot execute command on peer servers";
3513     GetDiagCtx()->SetRequestStatus( GetStatusByMessage(m_LastPeerError, eStatus_PeerError));
3514     x_ReportError(m_LastPeerError);
3515     return &CNCMessageHandler::x_FinishCommand;
3516 }
3517 
3518 CNCMessageHandler::State
x_SendCmdAsProxy(void)3519 CNCMessageHandler::x_SendCmdAsProxy(void)
3520 {
3521     LOG_CURRENT_FUNCTION
3522     ENCClientHubStatus status = m_ActiveHub->GetStatus();
3523     if (status == eNCHubWaitForConn)
3524         return NULL;
3525     x_LogCmdEvent("SendCmdAsProxy");
3526     ENCProxyCmd proxy_cmd = m_ParsedCmd.command->extra.proxy_cmd;
3527     CNCActiveHandler* pHandler = m_ActiveHub->GetHandler();
3528     if (status == eNCHubError ||
3529         status == eNCHubSuccess ||
3530         (proxy_cmd == eProxyGetBList && !pHandler->GetPeer()->AcceptsBList()) ||
3531         (proxy_cmd == eProxyGetBList2 && !pHandler->GetPeer()->AcceptsBList2()) ||
3532         !pHandler->GetPeer()->AcceptsBlobKey(m_NCBlobKey)
3533        ) {
3534         m_LastPeerError = m_ActiveHub->GetErrMsg();
3535         m_ActiveHub->Release();
3536         m_ActiveHub = NULL;
3537         return &CNCMessageHandler::x_ProxyToNextPeer;
3538     }
3539     if (status != eNCHubConnReady) {
3540         SRV_FATAL("Unexpected client status: " << status);
3541     }
3542     if (NeedEarlyClose())
3543         return &CNCMessageHandler::x_CloseCmdAndConn;
3544 
3545 
3546     switch (proxy_cmd) {
3547     case eProxyRead:
3548         pHandler->ProxyRead(GetDiagCtx(), m_NCBlobKey, m_RawBlobPass,
3549                                              m_BlobVersion, m_StartPos, m_Size,
3550                                              m_Quorum, m_SearchOnRead, m_ForceLocal,
3551                                              m_AgeMax);
3552         break;
3553     case eProxyWrite:
3554         pHandler->ProxyWrite(GetDiagCtx(), m_NCBlobKey, m_RawBlobPass,
3555                                               m_BlobVersion, m_BlobTTL, m_Quorum, m_UserFlags);
3556         // The only place that needs to go further to a different state.
3557         return &CNCMessageHandler::x_WriteInitWriteResponse;
3558     case eProxyHasBlob:
3559         pHandler->ProxyHasBlob(GetDiagCtx(), m_NCBlobKey, m_RawBlobPass,
3560                                                 m_Quorum);
3561         break;
3562     case eProxyGetSize:
3563         pHandler->ProxyGetSize(GetDiagCtx(), m_NCBlobKey, m_RawBlobPass,
3564                                                 m_BlobVersion, m_Quorum,
3565                                                 m_SearchOnRead, m_ForceLocal);
3566         break;
3567     case eProxyReadLast:
3568         pHandler->ProxyReadLast(GetDiagCtx(), m_NCBlobKey, m_RawBlobPass,
3569                                                  m_StartPos, m_Size, m_Quorum,
3570                                                  m_SearchOnRead, m_ForceLocal, m_AgeMax);
3571         break;
3572     case eProxySetValid:
3573         pHandler->ProxySetValid(GetDiagCtx(), m_NCBlobKey, m_RawBlobPass,
3574                                                  m_BlobVersion);
3575         break;
3576     case eProxyRemove:
3577         pHandler->ProxyRemove(GetDiagCtx(), m_NCBlobKey, m_RawBlobPass,
3578                                                m_BlobVersion, m_Quorum);
3579         break;
3580     case eProxyGetMeta:
3581         pHandler->ProxyGetMeta(GetDiagCtx(), m_NCBlobKey,
3582                                                 m_Quorum, m_ForceLocal, m_HttpMode);
3583         break;
3584     case eProxyProlong:
3585         pHandler->ProxyProlong(GetDiagCtx(), m_NCBlobKey, m_RawBlobPass,
3586                                                 m_BlobTTL, m_Quorum,
3587                                                 m_SearchOnRead, m_ForceLocal);
3588         break;
3589     case eProxyGetBList:
3590         pHandler->ProxyBList(GetDiagCtx(), m_NCBlobKey, m_ForceLocal, nullptr);
3591         break;
3592     case eProxyGetBList2:
3593         pHandler->ProxyBList(GetDiagCtx(), m_NCBlobKey, m_ForceLocal, m_BlobFilter);
3594         break;
3595     default:
3596         SRV_FATAL("Unsupported command: " << m_ParsedCmd.command->extra.proxy_cmd);
3597     }
3598 
3599     return &CNCMessageHandler::x_WaitForPeerAnswer;
3600 }
3601 
3602 CNCMessageHandler::State
x_WaitForPeerAnswer(void)3603 CNCMessageHandler::x_WaitForPeerAnswer(void)
3604 {
3605     LOG_CURRENT_FUNCTION
3606     if (NeedEarlyClose())
3607         return &CNCMessageHandler::x_CloseCmdAndConn;
3608     ENCClientHubStatus status = m_ActiveHub->GetStatus();
3609     if (status == eNCHubCmdInProgress)
3610         return NULL;
3611 
3612     x_LogCmdEvent("WaitForPeerAnswer");
3613     if (status == eNCHubError) {
3614         if (m_ActiveHub->GetHandler()->GotClientResponse())
3615             return &CNCMessageHandler::x_CloseOnPeerError;
3616 
3617         m_LastPeerError = m_ActiveHub->GetErrMsg();
3618         SRV_LOG(Warning, "Error executing command on peer "
3619             << m_ActiveHub->GetFullPeerName() << ", peer says: "  << m_LastPeerError);
3620         m_ActiveHub->Release();
3621         m_ActiveHub = NULL;
3622         return &CNCMessageHandler::x_ProxyToNextPeer;
3623     }
3624     if (status != eNCHubSuccess) {
3625         SRV_FATAL("Unexpected client status: " << status);
3626     }
3627     ENCProxyCmd proxy_cmd = m_ParsedCmd.command->extra.proxy_cmd;
3628     if (proxy_cmd == eProxyGetBList2) {
3629         return &CNCMessageHandler::x_DoCmd_GetBListNext;
3630     }
3631 
3632     const string& err_msg = m_ActiveHub->GetErrMsg();
3633     EHTTPStatus rst = GetStatusByMessage(err_msg, eStatus_OK);
3634     if (rst != eStatus_OK) {
3635         GetDiagCtx()->SetRequestStatus(rst);
3636     }
3637     if (!err_msg.empty()) {
3638     	x_ReportError(err_msg);
3639     }
3640     return &CNCMessageHandler::x_FinishCommand;
3641 }
3642 
3643 CNCMessageHandler::State
x_ReadMetaNextPeer(void)3644 CNCMessageHandler::x_ReadMetaNextPeer(void)
3645 {
3646     LOG_CURRENT_FUNCTION
3647     if (NeedEarlyClose())
3648         return &CNCMessageHandler::x_CloseCmdAndConn;
3649     if (m_SrvsIndex >= m_CheckSrvs.size())
3650         return &CNCMessageHandler::x_ExecuteOnLatestSrvId;
3651 
3652     Uint8 srv_id = m_CheckSrvs[m_SrvsIndex++];
3653     if (m_ActiveHub) {
3654         SRV_FATAL("Previous client not released");
3655     }
3656     m_ActiveHub = CNCActiveClientHub::Create(srv_id, this);
3657     return &CNCMessageHandler::x_SendGetMetaCmd;
3658 }
3659 
3660 CNCMessageHandler::State
x_SendGetMetaCmd(void)3661 CNCMessageHandler::x_SendGetMetaCmd(void)
3662 {
3663     LOG_CURRENT_FUNCTION
3664     ENCClientHubStatus status = m_ActiveHub->GetStatus();
3665     if (status == eNCHubWaitForConn)
3666         return NULL;
3667     if (status == eNCHubError || status == eNCHubSuccess ||
3668         !m_ActiveHub->GetHandler()->GetPeer()->AcceptsBlobKey(m_NCBlobKey)) {
3669         m_LastPeerError = m_ActiveHub->GetErrMsg();
3670         m_ActiveHub->Release();
3671         m_ActiveHub = NULL;
3672         return &CNCMessageHandler::x_ReadMetaNextPeer;
3673     }
3674     if (status != eNCHubConnReady) {
3675         SRV_FATAL("Unexpected client status: " << status);
3676     }
3677     if (NeedEarlyClose())
3678         return &CNCMessageHandler::x_CloseCmdAndConn;
3679 
3680     m_ActiveHub->GetHandler()->SearchMeta(GetDiagCtx(), m_NCBlobKey);
3681     return &CNCMessageHandler::x_ReadMetaResults;
3682 }
3683 
3684 CNCMessageHandler::State
x_ReadMetaResults(void)3685 CNCMessageHandler::x_ReadMetaResults(void)
3686 {
3687     LOG_CURRENT_FUNCTION
3688     if (NeedEarlyClose())
3689         return &CNCMessageHandler::x_CloseCmdAndConn;
3690     ENCClientHubStatus status = m_ActiveHub->GetStatus();
3691     if (status == eNCHubCmdInProgress)
3692         return NULL;
3693     if (status == eNCHubError)
3694         goto results_processed;
3695     if (status != eNCHubSuccess) {
3696         SRV_FATAL("Unexpected client status: " << status);
3697     }
3698 
3699     CNCActiveHandler* handler;
3700     handler = m_ActiveHub->GetHandler();
3701     const SNCBlobSummary* cur_blob_sum;
3702     cur_blob_sum = &handler->GetBlobSummary();
3703     bool cur_exist;
3704     cur_exist = handler->IsBlobExists();
3705     if (!cur_exist  &&  !x_IsFlagSet(fPeerFindExistsOnly))
3706         goto results_processed;
3707 
3708     if (cur_exist  &&  x_IsFlagSet(fPeerFindExistsOnly)) {
3709         m_LatestExist = true;
3710         m_LatestSrvId = m_CheckSrvs[m_SrvsIndex - 1];
3711         goto meta_search_finished;
3712     }
3713     if (cur_exist  &&  (!m_LatestExist  ||  m_LatestBlobSum->isOlder(*cur_blob_sum)))
3714     {
3715         m_LatestExist = true;
3716         m_LatestSrvId = m_CheckSrvs[m_SrvsIndex - 1];
3717         *m_LatestBlobSum = *cur_blob_sum;
3718     }
3719     if (cur_blob_sum->size > CNCDistributionConf::GetMaxBlobSizeSync()) {
3720         m_Quorum = 1;
3721     }
3722     if (m_Quorum == 1)
3723         goto meta_search_finished;
3724     if (m_Quorum != 0)
3725         --m_Quorum;
3726 
3727 results_processed:
3728 
3729     m_LastPeerError = m_ActiveHub->GetErrMsg();
3730     m_ActiveHub->Release();
3731     m_ActiveHub = NULL;
3732     return &CNCMessageHandler::x_ReadMetaNextPeer;
3733 
3734 meta_search_finished:
3735 
3736     m_LastPeerError = m_ActiveHub->GetErrMsg();
3737     m_ActiveHub->Release();
3738     m_ActiveHub = NULL;
3739     m_CheckSrvs.clear();
3740     m_SrvsIndex = 0;
3741     return &CNCMessageHandler::x_ExecuteOnLatestSrvId;
3742 }
3743 
3744 CNCMessageHandler::State
x_ExecuteOnLatestSrvId(void)3745 CNCMessageHandler::x_ExecuteOnLatestSrvId(void)
3746 {
3747     LOG_CURRENT_FUNCTION
3748     x_LogCmdEvent("ExecuteOnLatestSrvId");
3749     if (m_LatestSrvId == 0) {
3750         m_LatestSrvId = CNCDistributionConf::GetSelfID();
3751     }
3752     if (m_LatestExist) {
3753         // if max age specified, check age
3754         if (m_AgeMax != 0 && m_BlobAccess->IsBlobExists()) {
3755             CSrvTime cur_srv_time = CSrvTime::Current();
3756             unsigned int vttl = m_BlobAccess->GetCurVersionTTL();
3757             Uint8 creation = vttl != 0 ?
3758 // see x_ProlongVersionLife
3759                 (m_LatestBlobSum->ver_expire - vttl) :
3760                 (m_LatestBlobSum->create_time / kUSecsPerSecond);
3761             m_AgeCur =  Uint8(cur_srv_time.Sec()) - creation;
3762             if (m_AgeCur > m_AgeMax) {
3763                 return &CNCMessageHandler::x_ReportBlobNotFound;
3764             }
3765         }
3766     }
3767     if (m_BlobAccess->IsPurged(m_NCBlobKey)) {
3768         if (x_IsFlagSet(fPeerFindExistsOnly)) {
3769             m_LatestExist = false;
3770             return m_CmdProcessor;
3771         }
3772 #if 0
3773         return &CNCMessageHandler::x_ReportBlobNotFound;
3774 #else
3775         x_ReportError( GetMessageByStatus(eStatus_NotFound));
3776         return &CNCMessageHandler::x_DoCmd_Remove;
3777 #endif
3778     }
3779     if (x_IsFlagSet(fPeerFindExistsOnly) && m_LatestSrvId == CNCDistributionConf::GetSelfID()) {
3780         return m_CmdProcessor;
3781     }
3782     if (m_LatestSrvId == CNCDistributionConf::GetSelfID()) {
3783         if (m_LatestExist  && m_BlobAccess->IsBlobExists() && !m_BlobAccess->IsCurBlobExpired()) {
3784             return m_CmdProcessor;
3785         } else {
3786             return &CNCMessageHandler::x_ReportBlobNotFound;
3787         }
3788     }
3789 
3790     CSrvDiagMsg().PrintExtra().PrintParam("proxy", "1");
3791     // Changing parameters that will go to other server: that server have to
3792     // execute command locally, without quorum (quorum equal to 1), and without
3793     // searching on other servers.
3794     m_Quorum = 1;
3795     m_SearchOnRead = false;
3796     m_ForceLocal = true;
3797     m_CheckSrvs.push_back(m_LatestSrvId);
3798     x_LogCmdEvent("ProxyToNextPeer");
3799     return &CNCMessageHandler::x_ProxyToNextPeer;
3800 }
3801 
3802 CNCMessageHandler::State
x_PutToNextPeer(void)3803 CNCMessageHandler::x_PutToNextPeer(void)
3804 {
3805     LOG_CURRENT_FUNCTION
3806     x_LogCmdEvent("PutToNextPeer");
3807     if (m_SrvsIndex >= m_CheckSrvs.size()  ||  NeedEarlyClose())
3808         return &CNCMessageHandler::x_FinishCommand;
3809 
3810     Uint8 srv_id = m_CheckSrvs[m_SrvsIndex++];
3811     if (m_ActiveHub) {
3812         SRV_FATAL("Previous client not released");
3813     }
3814     if (CNCDistributionConf::GetSelfTrustLevel() < CNCPeerControl::Peer(srv_id)->GetTrustLevel()) {
3815         return &CNCMessageHandler::x_PutToNextPeer;
3816     }
3817     m_ActiveHub = CNCActiveClientHub::Create(srv_id, this);
3818     return &CNCMessageHandler::x_SendPutToPeerCmd;
3819 }
3820 
3821 CNCMessageHandler::State
x_SendPutToPeerCmd(void)3822 CNCMessageHandler::x_SendPutToPeerCmd(void)
3823 {
3824     LOG_CURRENT_FUNCTION
3825     ENCClientHubStatus status = m_ActiveHub->GetStatus();
3826     if (status == eNCHubWaitForConn)
3827         return NULL;
3828     if (status == eNCHubError || status == eNCHubSuccess ||
3829         !m_ActiveHub->GetHandler()->GetPeer()->AcceptsBlobKey(m_NCBlobKey)) {
3830         m_MirrorsDone.push_back(m_CheckSrvs[m_SrvsIndex-1]);
3831         m_LastPeerError = m_ActiveHub->GetErrMsg();
3832         m_ActiveHub->Release();
3833         m_ActiveHub = NULL;
3834         return &CNCMessageHandler::x_PutToNextPeer;
3835     }
3836     if (status != eNCHubConnReady) {
3837         SRV_FATAL("Unexpected client status: " << status);
3838     }
3839     if (NeedEarlyClose())
3840         return &CNCMessageHandler::x_FinishCommand;
3841 
3842     m_ActiveHub->GetHandler()->CopyPut(GetDiagCtx(), m_NCBlobKey, m_BlobSlot, m_OrigRecNo);
3843     return &CNCMessageHandler::x_ReadPutResults;
3844 }
3845 
3846 CNCMessageHandler::State
x_ReadPutResults(void)3847 CNCMessageHandler::x_ReadPutResults(void)
3848 {
3849     LOG_CURRENT_FUNCTION
3850     x_LogCmdEvent("ReadPutResults");
3851     if (NeedEarlyClose())
3852         return &CNCMessageHandler::x_FinishCommand;
3853     ENCClientHubStatus status = m_ActiveHub->GetStatus();
3854     if (status == eNCHubCmdInProgress)
3855         return NULL;
3856     if (status == eNCHubError)
3857         goto results_processed;
3858     if (status != eNCHubSuccess) {
3859         SRV_FATAL("Unexpected client status: " << status);
3860     }
3861 
3862     m_MirrorsDone.push_back(m_CheckSrvs[m_SrvsIndex-1]);
3863     if (m_Quorum == 1) {
3864         if (m_BlobAccess->GetNewBlobSize() <= CNCDistributionConf::GetMaxBlobSizeSync()) {
3865             if (m_write_event) {
3866                 m_OrigRecNo = CNCSyncLog::AddEvent(m_BlobSlot, m_write_event);
3867                 m_write_event = NULL;
3868             }
3869             CNCPeerControl::MirrorWrite(m_NCBlobKey, m_BlobSlot,
3870                                         m_OrigRecNo, m_BlobAccess->GetNewBlobSize(),
3871                                         m_MirrorsDone);
3872         }
3873         return &CNCMessageHandler::x_FinishCommand;
3874     }
3875     if (m_Quorum != 0)
3876         --m_Quorum;
3877 
3878 results_processed:
3879     m_ActiveHub->Release();
3880     m_ActiveHub = NULL;
3881     return &CNCMessageHandler::x_PutToNextPeer;
3882 }
3883 
3884 CNCMessageHandler::State
x_PurgeToNextPeer(void)3885 CNCMessageHandler::x_PurgeToNextPeer(void)
3886 {
3887     LOG_CURRENT_FUNCTION
3888     if (m_SrvsIndex >= m_CheckSrvs.size()  ||  NeedEarlyClose())
3889         return &CNCMessageHandler::x_FinishCommand;
3890 
3891     Uint8 srv_id = m_CheckSrvs[m_SrvsIndex++];
3892     if (m_ActiveHub) {
3893         SRV_FATAL("Previous client not released");
3894     }
3895     if (!m_NCBlobKey.RawKey().empty() && !CNCPeerControl::Peer(srv_id)->AcceptsPurge2()) {
3896         return &CNCMessageHandler::x_PurgeToNextPeer;
3897     }
3898     m_ActiveHub = CNCActiveClientHub::Create(srv_id, this);
3899     return &CNCMessageHandler::x_SendPurgeToPeerCmd;
3900 }
3901 
3902 CNCMessageHandler::State
x_SendPurgeToPeerCmd(void)3903 CNCMessageHandler::x_SendPurgeToPeerCmd(void)
3904 {
3905     LOG_CURRENT_FUNCTION
3906     ENCClientHubStatus status = m_ActiveHub->GetStatus();
3907     if (status == eNCHubWaitForConn)
3908         return NULL;
3909     if (status == eNCHubError || status == eNCHubSuccess) {
3910         m_LastPeerError = m_ActiveHub->GetErrMsg();
3911         m_ActiveHub->Release();
3912         m_ActiveHub = NULL;
3913         return &CNCMessageHandler::x_PurgeToNextPeer;
3914     }
3915     if (status != eNCHubConnReady) {
3916         SRV_FATAL("Unexpected client status: " << status);
3917     }
3918     if (NeedEarlyClose())
3919         return &CNCMessageHandler::x_FinishCommand;
3920 
3921     m_ActiveHub->GetHandler()->CopyPurge(GetDiagCtx(), m_NCBlobKey, m_CmdStartTime.AsUSec());
3922     return &CNCMessageHandler::x_ReadPurgeResults;
3923 }
3924 
3925 CNCMessageHandler::State
x_ReadPurgeResults(void)3926 CNCMessageHandler::x_ReadPurgeResults(void)
3927 {
3928     LOG_CURRENT_FUNCTION
3929     if (NeedEarlyClose())
3930         return &CNCMessageHandler::x_FinishCommand;
3931     ENCClientHubStatus status = m_ActiveHub->GetStatus();
3932     if (status == eNCHubCmdInProgress)
3933         return NULL;
3934     if (status == eNCHubError)
3935         goto results_processed;
3936     if (status != eNCHubSuccess) {
3937         SRV_FATAL("Unexpected client status: " << status);
3938     }
3939 
3940 results_processed:
3941     m_ActiveHub->Release();
3942     m_ActiveHub = NULL;
3943     return &CNCMessageHandler::x_PurgeToNextPeer;
3944 }
3945 
3946 
3947 inline unsigned int
x_GetBlobTTL(void)3948 CNCMessageHandler::x_GetBlobTTL(void)
3949 {
3950     LOG_CURRENT_FUNCTION
3951     return m_BlobTTL != 0 ? min(m_BlobTTL,m_AppSetup->max_ttl) : m_AppSetup->blob_ttl;
3952 }
3953 
3954 CNCMessageHandler::State
x_DoCmd_Health(void)3955 CNCMessageHandler::x_DoCmd_Health(void)
3956 {
3957     LOG_CURRENT_FUNCTION
3958     const char* health_coeff = "1";
3959     if (CNCBlobStorage::NeedStopWrite()) {
3960         health_coeff = "0 (does not accept writes)";
3961     } else if (!CNCServer::IsCachingComplete()) {
3962         health_coeff = "0.1 (caching not finished)";
3963     } else if (CNCBlobStorage::IsDraining()) {
3964         health_coeff = "0.2 (draining)";
3965     } else if (!CNCServer::IsInitiallySynced()) {
3966         health_coeff = "0.5 (initial sync not finished)";
3967     } else if (CNCPeerControl::HasPeerInThrottle()) {
3968         health_coeff = "0.8 (some peers unaccessible)";
3969     }
3970     x_ReportOK("OK:HEALTH_COEFF=").WriteText(health_coeff).WriteText("\n");
3971     WriteText("OK:UP_TIME=").WriteNumber(CNCServer::GetUpTime()).WriteText("\n");
3972     WriteText("OK:CACHING_COMPLETE=").WriteText(CNCServer::IsCachingComplete()? "yes": "no").WriteText("\n");
3973     WriteText("OK:INITIALLY_SYNCED=").WriteText(CNCServer::IsInitiallySynced()? "yes": "no").WriteText("\n");
3974     Int8 free_space = CNCBlobStorage::GetDiskFree();
3975     Int8 allowed_size = CNCBlobStorage::GetAllowedDBSize(free_space);
3976     WriteText("OK:DISK_FREE=").WriteNumber(free_space).WriteText("\n");
3977     WriteText("OK:DISK_LIMIT=").WriteNumber(allowed_size).WriteText("\n");
3978     WriteText("OK:DISK_USED=").WriteNumber(CNCBlobStorage::GetDBSize()).WriteText("\n");
3979     WriteText("OK:DISK_LIMIT_ALERT=").WriteText(CNCBlobStorage::IsDBSizeAlert()? "yes": "no").WriteText("\n");
3980     WriteText("OK:N_DB_FILES=").WriteNumber(CNCBlobStorage::GetNDBFiles()).WriteText("\n");
3981     WriteText("OK:COPY_QUEUE_SIZE=").WriteNumber(CNCPeerControl::GetMirrorQueueSize()).WriteText("\n");
3982 
3983     const TNCPeerList& peers = CNCDistributionConf::GetPeers();
3984     ITERATE(TNCPeerList, it_peer, peers) {
3985 //        WriteText("OK:QUEUE_SIZE_").WriteNumber(it_peer->first).WriteText("=").WriteNumber(CNCPeerControl::GetMirrorQueueSize(it_peer->first)).WriteText("\n");
3986         WriteText("OK:QUEUE_SIZE_").WriteText(it_peer->second).WriteText("=").WriteNumber(CNCPeerControl::GetMirrorQueueSize(it_peer->first)).WriteText("\n");
3987     }
3988     WriteText("OK:SYNC_LOG_SIZE=").WriteNumber(CNCSyncLog::GetLogSize()).WriteText("\n");
3989 
3990     WriteText("OK:END\n");
3991     return &CNCMessageHandler::x_FinishCommand;
3992 }
3993 
3994 CNCMessageHandler::State
x_DoCmd_Shutdown(void)3995 CNCMessageHandler::x_DoCmd_Shutdown(void)
3996 {
3997     LOG_CURRENT_FUNCTION
3998     TNSProtoParams& param = m_ParsedCmd.params;
3999     if (param.find("drain") != param.end() && param["drain"] != "0") {
4000         CNCBlobStorage::SetDraining(true);
4001     } else if (param.find("reset") != param.end() && param["reset"] != "0") {
4002         CNCBlobStorage::AbandonDB();
4003         CTaskServer::RequestShutdown(eSrvFastShutdown);
4004     } else {
4005         CTaskServer::RequestShutdown(eSrvSlowShutdown);
4006     }
4007     return &CNCMessageHandler::x_FinishCommand;
4008 }
4009 
4010 CNCMessageHandler::State
x_DoCmd_Version(void)4011 CNCMessageHandler::x_DoCmd_Version(void)
4012 {
4013     LOG_CURRENT_FUNCTION
4014     x_ReportOK("OK:server_version="   NETCACHED_VERSION
4015               "&storage_version="  NETCACHED_STORAGE_VERSION
4016               "&protocol_version=" NETCACHED_PROTOCOL_VERSION
4017               "&build_date=" + NStr::URLEncode(NETCACHED_BUILD_DATE) +
4018               "&mirrored=" + (NStr::BoolToString( CNCDistributionConf::HasPeers())))
4019         .WriteText("\n");
4020     return &CNCMessageHandler::x_FinishCommand;
4021 }
4022 
4023 CNCMessageHandler::State
x_DoCmd_GetConfig(void)4024 CNCMessageHandler::x_DoCmd_GetConfig(void)
4025 {
4026     LOG_CURRENT_FUNCTION
4027     TNSProtoParams& params = m_ParsedCmd.params;
4028     if (params.find("section") != params.end()) {
4029         string section(params["section"]);
4030         WriteText("{\"").WriteText(section).WriteText("\": {\n\"section\": \"");
4031         WriteText(section).WriteText("\"");
4032         if (section == "task_server") {
4033             CTaskServer::WriteSetup(*this);
4034         } else if (section == "netcache") {
4035             TStringMap client;
4036             if (params.find("port") != params.end()) {
4037                 client["port"] = params["port"];
4038             }
4039             if (params.find("cache") != params.end()) {
4040                 client["cache"] = params["cache"];
4041             }
4042             Flush();
4043             m_SendBuff.reset(new TNCBufferType());
4044             CNCServer::WriteAppSetup(*m_SendBuff, client);
4045             m_SendBuff->WriteText("\n}}\nOK:END\n");
4046             x_SetFlag(fNoReplyOnFinish);
4047             m_SendPos = 0;
4048             return &CNCMessageHandler::x_WriteSendBuff;
4049         } else if (section == "storage") {
4050             CNCBlobStorage::WriteSetup(*this);
4051         } else if (section == "mirror") {
4052             CNCDistributionConf::WriteSetup(*this);
4053         } else if (section == "env") {
4054             CNCServer::WriteEnvInfo(*this);
4055             CNCBlobStorage::WriteEnvInfo(*this);
4056             CNCDistributionConf::WriteEnvInfo(*this);
4057         } else if (section == "stat") {
4058             CSrvRef<CNCStat> stat = CNCStat::GetStat("1min", false);
4059             if (stat) {
4060                 stat->PrintState(*this);
4061             }
4062             CNCPeerControl::PrintState(*this);
4063 #ifdef _DEBUG
4064         } else if (section == "syncstat") {
4065             CNCPeerControl::PrintSyncStat(*this);
4066 #endif
4067 #if __NC_TASKS_MONITOR
4068         } else if (section == "tasks") {
4069             CSrvTask::PrintState(*this);
4070 #endif
4071         } else if (section == "allalerts") {
4072             CNCAlerts::Report(*this, true);
4073         } else if (section == "alerts") {
4074             CNCAlerts::Report(*this, false);
4075         } else if (section == "sync") {
4076             CTempString mask = params.find("port") != params.end() ? params.at("port") : CTempString(kEmptyStr);
4077             Flush();
4078             m_SendBuff.reset(new TNCBufferType());
4079             CNCActiveSyncControl::PrintState(*m_SendBuff, mask);
4080             m_SendBuff->WriteText("\n}}\nOK:END\n");
4081             x_SetFlag(fNoReplyOnFinish);
4082             m_SendPos = 0;
4083             return &CNCMessageHandler::x_WriteSendBuff;
4084         } else if (section == "db") {
4085             CTempString mask = params.find("port") != params.end() ? params.at("port") : CTempString(kEmptyStr);
4086             Flush();
4087             m_SendBuff.reset(new TNCBufferType());
4088             CNCBlobStorage::WriteDbInfo(*m_SendBuff, mask);
4089             m_SendBuff->WriteText("\n}}\nOK:END\n");
4090             x_SetFlag(fNoReplyOnFinish);
4091             m_SendPos = 0;
4092             return &CNCMessageHandler::x_WriteSendBuff;
4093         } else if (section == "blobs") {
4094             CNCBlobStorage::WriteBlobStat(*this);
4095         } else if (section == "blist") {
4096             CTempString mask = params.find("port") != params.end() ? params.at("port") : CTempString(kEmptyStr);
4097             Flush();
4098             m_SendBuff.reset(new TNCBufferType());
4099             CNCBlobStorage::WriteBlobList(*m_SendBuff, mask);
4100             m_SendBuff->WriteText("\n}}\nOK:END\n");
4101             x_SetFlag(fNoReplyOnFinish);
4102             m_SendPos = 0;
4103             return &CNCMessageHandler::x_WriteSendBuff;
4104         } else {
4105             WriteText(",\n\"error\": \"Unknown section name, valid names: ");
4106 #if __NC_TASKS_MONITOR
4107             WriteText("task_server, netcache, storage, mirror, alerts, allalerts, env, stat, sync, tasks, blobs, db\"");
4108 #else
4109             WriteText("task_server, netcache, storage, mirror, alerts, allalerts, env, stat, sync, blobs, db\"");
4110 #endif
4111         }
4112         WriteText("\n}}");
4113     } else {
4114         CNcbiOstrstream str;
4115         CTaskServer::GetConfRegistry().Write(str);
4116         string conf = CNcbiOstrstreamToString(str);
4117         WriteText(conf);
4118     }
4119     x_ReportOK("\nOK:END\n");
4120     return &CNCMessageHandler::x_FinishCommand;
4121 }
4122 
4123 CNCMessageHandler::State
x_DoCmd_AckAlert(void)4124 CNCMessageHandler::x_DoCmd_AckAlert(void)
4125 {
4126     LOG_CURRENT_FUNCTION
4127     CNCAlerts::EAlertAckResult res = CNCAlerts::eNotFound;
4128     TNSProtoParams& params = m_ParsedCmd.params;
4129     if (params.find("alert") != params.end()) {
4130         string alert(params["alert"]);
4131         if (params.find("user") != params.end()) {
4132             string user(params["user"]);
4133             res = CNCAlerts::Acknowledge(alert, user);
4134         }
4135     }
4136     if (res == CNCAlerts::eNotFound) {
4137         x_ReportError("ERR:Not found");
4138     } else {
4139         x_ReportOK("OK:END\n");
4140     }
4141     return &CNCMessageHandler::x_FinishCommand;
4142 }
4143 
4144 CNCMessageHandler::State
x_DoCmd_ReConfig(void)4145 CNCMessageHandler::x_DoCmd_ReConfig(void)
4146 {
4147     LOG_CURRENT_FUNCTION
4148     if (!CNCServer::IsInitiallySynced()) {
4149         x_ReportError("ERR:Initial sync not finished");
4150         GetDiagCtx()->SetRequestStatus(eStatus_CondFailed);
4151         return &CNCMessageHandler::x_FinishCommand;
4152     }
4153     TNSProtoParams& params = m_ParsedCmd.params;
4154     string err_message("ERR:Unknown section name");
4155     bool result = false;
4156     if (params.find("section") != params.end()) {
4157         string section(params["section"]);
4158         if (section != "task_server" &&
4159             section != "mirror" &&
4160             section != "storage") {
4161             err_message += ": " + section;
4162             goto done;
4163         }
4164         CNcbiRegistry* new_reg = NULL;
4165         if (!CTaskServer::ReadConfiguration(new_reg))
4166         {
4167             err_message = "ERR:Failed to load registry";
4168             goto done;
4169         }
4170         if (section == "task_server") {
4171             result = CTaskServer::ReConfig(*new_reg, err_message);
4172         } else if (section == "mirror") {
4173             result = CNCDistributionConf::ReConfig(*new_reg, err_message);
4174             if (result) {
4175                 CNCPeriodicSync::ReConfig();
4176             }
4177         }  else if (section == "storage") {
4178             result = CNCBlobStorage::ReConfig(*new_reg, err_message);
4179         }
4180         delete new_reg;
4181     }
4182 done:
4183     if (result) {
4184         x_ReportOK("OK:\n");
4185     } else {
4186         x_ReportError(err_message);
4187     }
4188     return &CNCMessageHandler::x_FinishCommand;
4189 }
4190 
4191 CNCMessageHandler::State
x_DoCmd_GetStat(void)4192 CNCMessageHandler::x_DoCmd_GetStat(void)
4193 {
4194     LOG_CURRENT_FUNCTION
4195     CSrvRef<CNCStat> stat = CNCStat::GetStat(m_StatType, m_StatPrev);
4196     if (!stat) {
4197         x_ReportError("ERR:Unknown statistics type: " + m_StatType);
4198         GetDiagCtx()->SetRequestStatus(eStatus_BadCmd);
4199     }
4200     else {
4201         stat->PrintToSocket(this);
4202         x_ReportOK("OK:END\n");
4203     }
4204     return &CNCMessageHandler::x_FinishCommand;
4205 }
4206 
4207 CNCMessageHandler::State
x_DoCmd_Put(void)4208 CNCMessageHandler::x_DoCmd_Put(void)
4209 {
4210     LOG_CURRENT_FUNCTION
4211     if (x_IsUserFlagSet(fNoCreate) && !(m_BlobAccess->IsBlobExists() && !m_BlobAccess->IsCurBlobExpired())) {
4212         return &CNCMessageHandler::x_ReportBlobNotFound;
4213     }
4214     if (!m_BlobAccess->IsBlobExists()) {
4215         EHTTPStatus sts = eStatus_Created;
4216         if (x_IsHttpMode()) {
4217     // if blob does not exist, verify that it is POST, not PUT
4218             if (!x_IsFlagSet(fCanGenerateKey)) {
4219                 sts = eStatus_NotFound;
4220             }
4221             if (CNCBlobStorage::IsDraining()) {
4222                 sts = eStatus_ServiceUnavailable;
4223             }
4224         } else if (CNCBlobStorage::IsDraining()) {
4225             sts = eStatus_ShuttingDown;
4226         }
4227         if (!x_IsCmdSucceeded(sts)) {
4228             x_ReportError(sts);
4229             return &CNCMessageHandler::x_FinishCommand;
4230         }
4231         GetDiagCtx()->SetRequestStatus(sts);
4232     }
4233 
4234     m_BlobAccess->SetBlobTTL(x_GetBlobTTL());
4235     m_BlobAccess->SetVersionTTL(0);
4236     m_BlobAccess->SetBlobVersion(0);
4237     if (!x_IsHttpMode()) {
4238         // on Put, client expects 'ready' status before sending blob data
4239         WriteText("OK:ID:").WriteText(m_NCBlobKey.RawKey()).WriteText("\n");
4240     } else {
4241         string key(m_NCBlobKey.RawKey());
4242         if (!m_ClientParams["service"].empty()) {
4243             CNetCacheKey::AddExtensions(key, m_ClientParams["service"], 0, 3);
4244         }
4245         CNcbiOstrstream str;
4246         str << ",\"blob_key\":\"" << key << "\"";
4247         m_PosponedCmd += CNcbiOstrstreamToString(str);
4248     }
4249     return &CNCMessageHandler::x_StartReadingBlob;
4250 }
4251 
4252 CNCMessageHandler::State
x_DoCmd_Get(void)4253 CNCMessageHandler::x_DoCmd_Get(void)
4254 {
4255     LOG_CURRENT_FUNCTION
4256     if (m_AppSetup->prolong_on_read)
4257         x_ProlongBlobDeadTime(m_BlobAccess->GetCurBlobTTL());
4258 
4259     Uint8 blob_size = m_BlobAccess->GetCurBlobSize();
4260     if (blob_size < m_StartPos)
4261         blob_size = 0;
4262     else
4263         blob_size -= m_StartPos;
4264     bool range = false;
4265     if (m_Size != Uint8(-1)) {
4266         range = true;
4267         if (m_Size < blob_size)
4268             blob_size = m_Size;
4269         else
4270             m_Size = blob_size;
4271     }
4272 
4273     if (!x_IsHttpMode()) {
4274         GetDiagCtx()->SetRequestStatus(range ? eStatus_PartialContent : eStatus_OK);
4275         x_ReportOK("OK:BLOB found. SIZE=").WriteNumber(blob_size);
4276         if (m_AgeMax != 0) {
4277             WriteText(", AGE=").WriteNumber(m_AgeCur);
4278         }
4279         WriteText("\n");
4280     } else {
4281         // http://greenbytes.de/tech/webdav/rfc2616.html#header.range
4282         x_WriteHttpHeader(range ? eStatus_PartialContent : eStatus_OK, blob_size, true);
4283         x_SetFlag(fNoReplyOnFinish);
4284     }
4285 
4286     if (blob_size == 0)
4287         return &CNCMessageHandler::x_FinishCommand;
4288     if (NeedEarlyClose())
4289         return &CNCMessageHandler::x_CloseCmdAndConn;
4290 
4291     m_BlobAccess->SetPosition(m_StartPos);
4292     return &CNCMessageHandler::x_WriteBlobData;
4293 }
4294 
4295 CNCMessageHandler::State
x_DoCmd_GetLast(void)4296 CNCMessageHandler::x_DoCmd_GetLast(void)
4297 {
4298     LOG_CURRENT_FUNCTION
4299     if (m_AppSetup->prolong_on_read)
4300         x_ProlongBlobDeadTime(m_BlobAccess->GetCurBlobTTL());
4301 
4302     Uint8 blob_size = m_BlobAccess->GetCurBlobSize();
4303     if (blob_size < m_StartPos)
4304         blob_size = 0;
4305     else
4306         blob_size -= m_StartPos;
4307     if (m_Size != Uint8(-1)  &&  m_Size < blob_size)
4308         blob_size = m_Size;
4309 
4310     x_ReportOK("OK:BLOB found. SIZE=").WriteNumber(blob_size);
4311     WriteText(", VER=").WriteNumber(m_BlobAccess->GetCurBlobVersion());
4312     WriteText(", VALID=").WriteText(m_BlobAccess->IsCurVerExpired()? "false": "true");
4313     if (m_AgeMax != 0) {
4314         WriteText(", AGE=").WriteNumber(m_AgeCur);
4315     }
4316     WriteText("\n");
4317 
4318     if (blob_size == 0)
4319         return &CNCMessageHandler::x_FinishCommand;
4320     if (NeedEarlyClose())
4321         return &CNCMessageHandler::x_CloseCmdAndConn;
4322 
4323     m_BlobAccess->SetPosition(m_StartPos);
4324     return &CNCMessageHandler::x_WriteBlobData;
4325 }
4326 
4327 CNCMessageHandler::State
x_DoCmd_SetValid(void)4328 CNCMessageHandler::x_DoCmd_SetValid(void)
4329 {
4330     LOG_CURRENT_FUNCTION
4331     if (!m_BlobAccess->IsBlobExists()  ||  m_BlobAccess->IsCurBlobExpired())
4332         return &CNCMessageHandler::x_ReportBlobNotFound;
4333 
4334     if (m_BlobAccess->GetCurBlobVersion() != m_BlobVersion) {
4335         GetDiagCtx()->SetRequestStatus(eStatus_RaceCond);
4336         x_ReportOK("OK:WARNING:BLOB was changed");
4337         WriteText(", VER=").WriteNumber(m_BlobAccess->GetCurBlobVersion());
4338         WriteText("\n");
4339     }
4340     else {
4341         x_ProlongVersionLife();
4342         x_ReportOK("OK:\n");
4343     }
4344     return &CNCMessageHandler::x_FinishCommand;
4345 }
4346 
4347 CNCMessageHandler::State
x_DoCmd_GetSize(void)4348 CNCMessageHandler::x_DoCmd_GetSize(void)
4349 {
4350     LOG_CURRENT_FUNCTION
4351     if (m_AppSetup->prolong_on_read)
4352         x_ProlongBlobDeadTime(m_BlobAccess->GetCurBlobTTL());
4353 
4354     Uint8 size = m_BlobAccess->GetCurBlobSize();
4355     x_ReportOK("OK:").WriteNumber(size).WriteText("\n");
4356 
4357     return &CNCMessageHandler::x_FinishCommand;
4358 }
4359 
4360 CNCMessageHandler::State
x_DoCmd_Prolong(void)4361 CNCMessageHandler::x_DoCmd_Prolong(void)
4362 {
4363     LOG_CURRENT_FUNCTION
4364     bool ttl_overrun = false;
4365 #if 0
4366     if (m_BlobTTL <= m_BlobAccess->GetCurBlobTTL())
4367         ttl_overrun = false;
4368     else {
4369         m_BlobTTL = m_BlobAccess->GetCurBlobTTL();
4370         ttl_overrun = true;
4371     }
4372 #else
4373     if (m_BlobTTL > m_AppSetup->max_ttl) {
4374         m_BlobTTL = m_AppSetup->max_ttl;
4375         ttl_overrun = true;
4376     }
4377 #endif
4378 
4379     x_ProlongBlobDeadTime(m_BlobTTL);
4380     // Distinguish "PROLONG" vs "PROXY_PROLONG".
4381     if (x_IsFlagSet(fComesFromClient)) {
4382         if (!ttl_overrun) {
4383             x_ReportOK("OK:\n");
4384         } else {
4385             x_ReportOK("OK:WARNING:Capped the requested TTL for '").
4386                     WriteText(m_NCBlobKey.RawKey()).WriteText("' at ").
4387                     WriteNumber(m_BlobTTL).WriteText(" seconds.\n");
4388         }
4389     }
4390     return &CNCMessageHandler::x_FinishCommand;
4391 }
4392 
4393 CNCMessageHandler::State
x_DoCmd_HasBlob(void)4394 CNCMessageHandler::x_DoCmd_HasBlob(void)
4395 {
4396     LOG_CURRENT_FUNCTION
4397     int ver = 0;
4398     bool ver_report = false;
4399     bool exist = m_LatestExist  &&  m_LatestBlobSum->expire > CSrvTime::CurSecs();
4400     if (exist) {
4401         ver = m_BlobAccess->GetCurBlobVersion();
4402         ver_report = m_BlobVersion != ver;
4403         exist = !ver_report;
4404     }
4405     if (!exist) {
4406         GetDiagCtx()->SetRequestStatus(ver_report ? sStatus_BlobVersion : eStatus_NotFound);
4407     }
4408     x_ReportOK("OK:").WriteNumber(exist ? 1 : 0);
4409     if (ver_report) {
4410         WriteText(", VER=").WriteNumber(ver);
4411     }
4412     WriteText("\n");
4413     return &CNCMessageHandler::x_FinishCommand;
4414 }
4415 
4416 CNCMessageHandler::State
x_DoCmd_Remove(void)4417 CNCMessageHandler::x_DoCmd_Remove(void)
4418 {
4419     LOG_CURRENT_FUNCTION
4420     // We delete blob only from client point of view. From our POV we create
4421     // new blob version with expiration time one second in the past. This is
4422     // necessary for proper synchronization with other servers (if we delete
4423     // blob and then will synchronize using blob lists then other server will
4424     // copy the blob back to us). And we can create this new blob version
4425     // safely even when we haven't completed yet the initial synchronization.
4426     if ((!m_BlobAccess || !m_BlobAccess->IsBlobExists() || m_BlobAccess->IsCurBlobExpired())
4427         /*&&  CNCServer::IsInitiallySynced()*/)
4428     {
4429         return &CNCMessageHandler::x_FinishCommand;
4430     }
4431 
4432     bool is_mirrored = CNCDistributionConf::CountServersForSlot(m_BlobSlot) != 0;
4433     bool is_good = CNCServer::IsInitiallySynced() && !CNCPeerControl::HasPeerInThrottle();
4434     unsigned int mirrored_ttl = is_good ? min(Uint4(300), x_GetBlobTTL()) : x_GetBlobTTL();
4435     unsigned int local_ttl = 5;
4436     m_BlobAccess->SetBlobTTL( is_mirrored ? mirrored_ttl : local_ttl);
4437     m_BlobAccess->SetBlobVersion(m_BlobVersion);
4438     int expire = CSrvTime::CurSecs() - 1;
4439     unsigned int ttl = m_BlobAccess->GetNewBlobTTL();
4440     m_BlobAccess->SetNewBlobExpire(expire, expire + ttl + 1);
4441     CNCBlobAccessor::PutSucceeded(m_BlobAccess->GetBlobKey());
4442 
4443 // On COPY_RMV, modify create_time as well, to preserve blobs on peers (this one will be older)
4444     if (m_BlobAccess->GetAccessType() == eNCRead && m_CopyBlobInfo->create_time != 0) {
4445         m_BlobAccess->SetBlobCreateTime(m_CopyBlobInfo->create_time - 1);
4446         m_OrigRecNo = 0;
4447     }
4448     return &CNCMessageHandler::x_FinishReadingBlob;
4449 }
4450 
4451 CNCMessageHandler::State
x_DoCmd_IC_Store(void)4452 CNCMessageHandler::x_DoCmd_IC_Store(void)
4453 {
4454     LOG_CURRENT_FUNCTION
4455     if (x_IsUserFlagSet(fNoCreate) && !(m_BlobAccess->IsBlobExists() && !m_BlobAccess->IsCurBlobExpired())) {
4456         return &CNCMessageHandler::x_ReportBlobNotFound;
4457     }
4458     if (!m_BlobAccess->IsBlobExists()) {
4459         if (CNCBlobStorage::IsDraining()) {
4460             x_ReportError(eStatus_ShuttingDown);
4461             return &CNCMessageHandler::x_FinishCommand;
4462         }
4463         GetDiagCtx()->SetRequestStatus(eStatus_Created);
4464     }
4465     m_BlobAccess->SetBlobTTL(x_GetBlobTTL());
4466     m_BlobAccess->SetVersionTTL(m_AppSetup->ver_ttl);
4467     m_BlobAccess->SetBlobVersion(m_BlobVersion);
4468     // on Store, client expects 'ready' status before sending blob data
4469     WriteText("OK:\n");
4470     if (m_Size == 0) {
4471         return &CNCMessageHandler::x_FinishCommand;
4472     }
4473     return &CNCMessageHandler::x_StartReadingBlob;
4474 }
4475 
4476 void
x_WriteFullBlobsList(void)4477 CNCMessageHandler::x_WriteFullBlobsList(void)
4478 {
4479     LOG_CURRENT_FUNCTION
4480     TNCBlobSumList blobs_list;
4481     CNCBlobStorage::GetFullBlobsList(m_Slot, blobs_list, CNCPeerControl::Peer(m_SrvId));
4482     m_SendBuff.reset(new TNCBufferType());
4483     m_SendBuff->reserve_mem(blobs_list.size() * 200);
4484     NON_CONST_ITERATE(TNCBlobSumList, it_blob, blobs_list) {
4485         if (NeedEarlyClose())
4486             goto error_return;
4487 
4488         const string& key = it_blob->first;
4489         SNCBlobSummary* blob_sum = it_blob->second;
4490         Uint2 key_size = Uint2(key.size());
4491         m_SendBuff->append(&key_size, sizeof(key_size));
4492         m_SendBuff->append(key.data(), key_size);
4493         m_SendBuff->append(&blob_sum->create_time,   sizeof(blob_sum->create_time));
4494         m_SendBuff->append(&blob_sum->create_server, sizeof(blob_sum->create_server));
4495         m_SendBuff->append(&blob_sum->create_id,     sizeof(blob_sum->create_id));
4496         m_SendBuff->append(&blob_sum->dead_time,     sizeof(blob_sum->dead_time));
4497         m_SendBuff->append(&blob_sum->expire,        sizeof(blob_sum->expire));
4498         m_SendBuff->append(&blob_sum->ver_expire,    sizeof(blob_sum->ver_expire));
4499         delete blob_sum;
4500         it_blob->second = NULL;
4501         blob_sum = NULL;
4502     }
4503     return;
4504 
4505 error_return:
4506     ITERATE(TNCBlobSumList, it_blob, blobs_list) {
4507         if (it_blob->second)
4508             delete it_blob->second;
4509     }
4510 }
4511 
4512 CNCMessageHandler::State
x_DoCmd_SyncStart(void)4513 CNCMessageHandler::x_DoCmd_SyncStart(void)
4514 {
4515     LOG_CURRENT_FUNCTION
4516     if (CNCBlobStorage::IsDraining()) {
4517         x_ReportError(eStatus_ShuttingDown);
4518         return &CNCMessageHandler::x_FinishCommand;
4519     }
4520     TReducedSyncEvents sync_events;
4521     ESyncInitiateResult sync_res = CNCPeriodicSync::Initiate(m_SrvId, m_Slot,
4522                                                 &m_LocalRecNo, &m_RemoteRecNo,
4523                                                 &sync_events, &m_SyncId);
4524     if (sync_res == eCrossSynced) {
4525         GetDiagCtx()->SetRequestStatus(eStatus_CrossSync);
4526         x_ReportOK("OK:CROSS_SYNC,SIZE=0\n");
4527         return &CNCMessageHandler::x_FinishCommand;
4528     }
4529     else if (sync_res == eServerBusy) {
4530         GetDiagCtx()->SetRequestStatus(eStatus_SyncBusy);
4531         x_ReportOK("OK:IN_PROGRESS,SIZE=0\n");
4532         return &CNCMessageHandler::x_FinishCommand;
4533     }
4534     else if (sync_res == eUnknownServer) {
4535         GetDiagCtx()->SetRequestStatus(eStatus_NotAllowed);
4536         x_ReportOK("OK:SIZE=0, NEED_ABORT2\n");
4537         if (CNCStat::AddUnknownServer(m_SrvId)) {
4538             SRV_LOG(Warning, "SYNC_START request from unknown server " << m_SrvId);
4539         }
4540         return &CNCMessageHandler::x_FinishCommand;
4541     }
4542 
4543     // Set fRunsInStartedSync flag so that x_CleanCmdResources() could properly call
4544     // CNCPeriodicSync::SyncCommandFinished() or CNCPeriodicSync::Cancel().
4545     x_SetFlag(fRunsInStartedSync);
4546     string result;
4547     if (sync_res == eProceedWithEvents) {
4548         m_SendBuff.reset(new TNCBufferType());
4549         m_SendBuff->reserve_mem(sync_events.size() * 200);
4550         ITERATE(TReducedSyncEvents, it_evt, sync_events) {
4551             if (NeedEarlyClose())
4552                 break;
4553 
4554             const SBlobEvent& blob_evt = it_evt->second;
4555             for (int i = 0; i < 2; ++i) {
4556                 SNCSyncEvent* evt = (i == 0? blob_evt.wr_or_rm_event: blob_evt.prolong_event);
4557                 if (!evt) {
4558                     continue;
4559                 }
4560                 if (evt->blob_size > CNCDistributionConf::GetMaxBlobSizeSync()) {
4561                     if (CNCDistributionConf::IsThisServerKey(evt->key.PackedKey())) {
4562                         continue;
4563                     }
4564                 }
4565                 if (!CNCPeerControl::Peer(m_SrvId)->AcceptsBlobKey(evt->key)) {
4566                     continue;
4567                 }
4568                 Uint2 key_size = Uint2(evt->key.PackedKey().size());
4569                 m_SendBuff->append(&key_size, sizeof(key_size));
4570                 m_SendBuff->append(evt->key.PackedKey().data(), key_size);
4571                 char c = char(evt->event_type);
4572                 m_SendBuff->append(&c, 1);
4573                 m_SendBuff->append(&evt->rec_no,      sizeof(evt->rec_no));
4574                 m_SendBuff->append(&evt->local_time,  sizeof(evt->local_time));
4575                 m_SendBuff->append(&evt->orig_rec_no, sizeof(evt->orig_rec_no));
4576                 m_SendBuff->append(&evt->orig_server, sizeof(evt->orig_server));
4577                 m_SendBuff->append(&evt->orig_time,   sizeof(evt->orig_time));
4578             }
4579         }
4580         GetDiagCtx()->SetRequestStatus(eStatus_SyncEvents);
4581         x_SetFlag(fSyncCmdSuccessful);
4582     }
4583     else {
4584         _ASSERT(sync_res == eProceedWithBlobs);
4585         m_LocalRecNo = CNCSyncLog::GetCurrentRecNo(m_Slot);
4586         x_WriteFullBlobsList();
4587         GetDiagCtx()->SetRequestStatus(eStatus_SyncBList);
4588         x_SetFlag(fSyncCmdSuccessful);
4589         result += "ALL_BLOBS,";
4590     }
4591 
4592     if (NeedEarlyClose())
4593         return &CNCMessageHandler::x_CloseCmdAndConn;
4594 
4595     x_ReportOK("OK:").WriteText(result);
4596     WriteText("SIZE=").WriteNumber(m_SendBuff->size());
4597     WriteText(" ").WriteNumber(m_LocalRecNo);
4598     WriteText(" ").WriteNumber(m_RemoteRecNo);
4599     WriteText("\n");
4600     m_SendPos = 0;
4601     return &CNCMessageHandler::x_WriteSendBuff;
4602 }
4603 
4604 CNCMessageHandler::State
x_DoCmd_SyncBlobsList(void)4605 CNCMessageHandler::x_DoCmd_SyncBlobsList(void)
4606 {
4607     LOG_CURRENT_FUNCTION
4608     CNCPeriodicSync::MarkCurSyncByBlobs(m_SrvId, m_Slot, m_SyncId);
4609     Uint8 rec_no = CNCSyncLog::GetCurrentRecNo(m_Slot);
4610     x_WriteFullBlobsList();
4611 
4612     if (NeedEarlyClose())
4613         return &CNCMessageHandler::x_CloseCmdAndConn;
4614 
4615     x_ReportOK("OK:SIZE=").WriteNumber(m_SendBuff->size());
4616     WriteText(" ").WriteNumber(rec_no);
4617     WriteText("\n");
4618     m_SendPos = 0;
4619     return &CNCMessageHandler::x_WriteSendBuff;
4620 }
4621 
4622 CNCMessageHandler::State
x_DoCmd_CopyPut(void)4623 CNCMessageHandler::x_DoCmd_CopyPut(void)
4624 {
4625     LOG_CURRENT_FUNCTION
4626     if (!m_BlobAccess->IsBlobExists()) {
4627         if (CNCBlobStorage::IsDraining()) {
4628             x_ReportError(eStatus_ShuttingDown);
4629             return &CNCMessageHandler::x_FinishCommand;
4630         }
4631         GetDiagCtx()->SetRequestStatus(eStatus_Created);
4632     }
4633     m_CopyBlobInfo->ttl = m_BlobTTL;
4634     m_CopyBlobInfo->password = m_BlobPass;
4635     m_CopyBlobInfo->blob_ver = m_BlobVersion;
4636     bool need_read_blob = m_BlobAccess->ReplaceBlobInfo(*m_CopyBlobInfo);
4637     if (need_read_blob) {
4638         // while receiving new data, redirect clients to "correct" server
4639         if (m_BlobAccess->IsBlobExists()) {
4640             m_BlobAccess->UpdateMetaInfo(m_CopyBlobInfo->create_server, m_CopyBlobInfo->create_time);
4641         }
4642         WriteText("OK:\n");
4643     }
4644     else {
4645         GetDiagCtx()->SetRequestStatus(eStatus_NewerBlob);
4646         x_SetFlag(fNoBlobAccessStats);
4647         x_SetFlag(fSyncCmdSuccessful);
4648         x_UnsetFlag(fCopyLogEvent);
4649         WriteText("OK:HAVE_NEWER1\n");
4650     }
4651     // Old NC servers (those which used CNetCacheAPI instead of
4652     // CNCActiveHandler) always started to write blob data in SYNC_PUT and
4653     // COPY_PUT even when we responded to them HAVE_NEWER. To avoid breaking
4654     // the protocol we need to read from them those fake blob writes. So for
4655     // old NC servers when we answered HAVE_NEWER we'll go to x_StartReadingBlob,
4656     // for newer ones we'll go to x_FinishCommand.
4657     if (!need_read_blob  &&  m_CmdVersion != 0) {
4658         x_SetFlag(fNoReplyOnFinish);
4659         x_UnsetFlag(fReadExactBlobSize);
4660         return &CNCMessageHandler::x_FinishCommand;
4661     }
4662 
4663     return &CNCMessageHandler::x_StartReadingBlob;
4664 }
4665 
4666 CNCMessageHandler::State
x_DoCmd_CopyProlong(void)4667 CNCMessageHandler::x_DoCmd_CopyProlong(void)
4668 {
4669     LOG_CURRENT_FUNCTION
4670     if (!m_BlobAccess->IsBlobExists()) {
4671         x_SetFlag(fSyncCmdSuccessful);
4672         return &CNCMessageHandler::x_ReportBlobNotFound;
4673     }
4674 
4675     if (m_BlobAccess->GetCurBlobCreateTime() == m_CopyBlobInfo->create_time
4676         &&  m_BlobAccess->GetCurCreateServer() == m_CopyBlobInfo->create_server
4677         &&  m_BlobAccess->GetCurCreateId() == m_CopyBlobInfo->create_id)
4678     {
4679         bool need_event = false;
4680         if (m_BlobAccess->GetCurBlobExpire() < m_CopyBlobInfo->expire) {
4681             m_BlobAccess->SetCurBlobExpire(m_CopyBlobInfo->expire,
4682                                            m_CopyBlobInfo->dead_time);
4683             need_event = true;
4684         }
4685         if (m_BlobAccess->GetCurVerExpire() < m_CopyBlobInfo->ver_expire) {
4686             m_BlobAccess->SetCurVerExpire(m_CopyBlobInfo->ver_expire);
4687             need_event = true;
4688         }
4689 
4690         // m_OrigRecNo can be 0 if prolong happens as a result of synchronization
4691         // using blob lists. In this case there's no event to link to and thus
4692         // no need to create event here.
4693         if (need_event  &&  m_OrigRecNo != 0) {
4694             SNCSyncEvent* event = new SNCSyncEvent();
4695             event->blob_size = m_BlobAccess->GetCurBlobSize();
4696             event->event_type = eSyncProlong;
4697             event->key = m_NCBlobKey;
4698             event->orig_server = m_OrigSrvId;
4699             event->orig_time = m_OrigTime;
4700             event->orig_rec_no = m_OrigRecNo;
4701             CNCSyncLog::AddEvent(m_BlobSlot, event);
4702         }
4703     }
4704     else {
4705         GetDiagCtx()->SetRequestStatus(eStatus_NewerBlob);
4706         x_SetFlag(fSyncCmdSuccessful);
4707     }
4708     return &CNCMessageHandler::x_FinishCommand;
4709 }
4710 
4711 CNCMessageHandler::State
x_DoCmd_SyncGet(void)4712 CNCMessageHandler::x_DoCmd_SyncGet(void)
4713 {
4714     LOG_CURRENT_FUNCTION
4715     if (!m_BlobAccess->IsBlobExists()) {
4716         x_SetFlag(fSyncCmdSuccessful);
4717         return &CNCMessageHandler::x_ReportBlobNotFound;
4718     }
4719 
4720     bool need_send = true;
4721     if (m_OrigTime != m_BlobAccess->GetCurBlobCreateTime()) {
4722         need_send = false;
4723     }
4724     else if (m_BlobAccess->GetCurBlobCreateTime() < m_CopyBlobInfo->create_time) {
4725         need_send = false;
4726     }
4727     else if (m_BlobAccess->GetCurBlobCreateTime() == m_CopyBlobInfo->create_time) {
4728         if (m_BlobAccess->GetCurCreateServer() < m_CopyBlobInfo->create_server) {
4729             need_send = false;
4730         }
4731         else if (m_BlobAccess->GetCurCreateServer() == m_CopyBlobInfo->create_server
4732                  &&  m_BlobAccess->GetCurCreateId() <= m_CopyBlobInfo->create_id)
4733         {
4734             need_send = false;
4735         }
4736     }
4737     if (!need_send) {
4738         GetDiagCtx()->SetRequestStatus(eStatus_NewerBlob);
4739         x_SetFlag(fSyncCmdSuccessful);
4740         x_ReportOK("OK:SIZE=0, HAVE_NEWER\n");
4741         return &CNCMessageHandler::x_FinishCommand;
4742     }
4743 
4744     x_ReportOK("OK:SIZE=").WriteNumber(m_BlobAccess->GetCurBlobSize());
4745     WriteText(" ").WriteNumber(m_BlobAccess->GetCurBlobVersion());
4746     WriteText(" \"").WriteText(m_BlobAccess->GetCurPassword());
4747     WriteText("\" ").WriteNumber(m_BlobAccess->GetCurBlobCreateTime());
4748     WriteText(" ").WriteNumber(Uint4(m_BlobAccess->GetCurBlobTTL()));
4749     WriteText(" ").WriteNumber(m_BlobAccess->GetCurBlobDeadTime());
4750     WriteText(" ").WriteNumber(m_BlobAccess->GetCurBlobExpire());
4751     WriteText(" ").WriteNumber(Uint4(m_BlobAccess->GetCurVersionTTL()));
4752     WriteText(" ").WriteNumber(m_BlobAccess->GetCurVerExpire());
4753     WriteText(" ").WriteNumber(m_BlobAccess->GetCurCreateServer());
4754     WriteText(" ").WriteNumber(m_BlobAccess->GetCurCreateId());
4755     WriteText("\n");
4756 
4757     if (m_BlobAccess->GetCurBlobSize() == 0)
4758         return &CNCMessageHandler::x_FinishCommand;
4759     if (NeedEarlyClose())
4760         return &CNCMessageHandler::x_CloseCmdAndConn;
4761 
4762     m_BlobAccess->SetPosition(0);
4763     return &CNCMessageHandler::x_WriteBlobData;
4764 }
4765 
4766 CNCMessageHandler::State
x_DoCmd_SyncProlongInfo(void)4767 CNCMessageHandler::x_DoCmd_SyncProlongInfo(void)
4768 {
4769     LOG_CURRENT_FUNCTION
4770     if (!m_BlobAccess->IsBlobExists()) {
4771         x_SetFlag(fSyncCmdSuccessful);
4772         return &CNCMessageHandler::x_ReportBlobNotFound;
4773     }
4774 
4775     x_ReportOK("OK:SIZE=0 ");
4776     WriteNumber(m_BlobAccess->GetCurBlobCreateTime()).WriteText(" ");
4777     WriteNumber(m_BlobAccess->GetCurCreateServer()).WriteText(" ");
4778     WriteNumber(m_BlobAccess->GetCurCreateId()).WriteText(" ");
4779     WriteNumber(m_BlobAccess->GetCurBlobDeadTime()).WriteText(" ");
4780     WriteNumber(m_BlobAccess->GetCurBlobExpire()).WriteText(" ");
4781     WriteNumber(m_BlobAccess->GetCurVerExpire());
4782     WriteText("\n");
4783 
4784     return &CNCMessageHandler::x_FinishCommand;
4785 }
4786 
4787 CNCMessageHandler::State
x_DoCmd_SyncCommit(void)4788 CNCMessageHandler::x_DoCmd_SyncCommit(void)
4789 {
4790     LOG_CURRENT_FUNCTION
4791     CNCPeriodicSync::Commit(m_SrvId, m_Slot, m_SyncId, m_LocalRecNo, m_RemoteRecNo);
4792     x_UnsetFlag(fRunsInStartedSync);
4793     return &CNCMessageHandler::x_FinishCommand;
4794 }
4795 
4796 CNCMessageHandler::State
x_DoCmd_SyncCancel(void)4797 CNCMessageHandler::x_DoCmd_SyncCancel(void)
4798 {
4799     LOG_CURRENT_FUNCTION
4800     CNCPeriodicSync::Cancel(m_SrvId, m_Slot, m_SyncId);
4801     x_UnsetFlag(fRunsInStartedSync);
4802     return &CNCMessageHandler::x_FinishCommand;
4803 }
4804 
4805 CNCMessageHandler::State
x_DoCmd_GetMeta(void)4806 CNCMessageHandler::x_DoCmd_GetMeta(void)
4807 {
4808     LOG_CURRENT_FUNCTION
4809     char time_buf[50];
4810     string tmp;
4811 
4812     if (x_IsHttpMode()) {
4813 
4814         CNcbiOstrstream str;
4815 
4816         tmp = m_NCBlobKey.RawKey();
4817         if (!m_ClientParams["service"].empty()) {
4818             CNetCacheKey::AddExtensions(tmp, m_ClientParams["service"], 0, 3);
4819         }
4820         str << ",\"blob_key\":\"" << tmp << "\"";
4821         str << ",\"slot\":" << m_BlobSlot;
4822         Uint8 create_time = m_BlobAccess->GetCurBlobCreateTime();
4823         CSrvTime t;
4824         t.Sec() = time_t(create_time / kUSecsPerSecond);
4825         t.NSec() = (create_time % kUSecsPerSecond) * 1000;
4826         t.Print(time_buf, CSrvTime::eFmtHumanUSecs);
4827         str << ",\"write_time\":\"" << time_buf << "\"";
4828         Uint8 create_server = m_BlobAccess->GetCurCreateServer();
4829 
4830         string hostport( CNCDistributionConf::GetPeerNameOrEmpty(create_server));
4831         str << ",\"control_server\":\"";
4832         if (hostport.empty()) {
4833             str << CTaskServer::GetHostByIP(Uint4(create_server >> 32)) << ":"
4834             << NStr::UIntToString(Uint4(create_server));
4835         } else {
4836             str << hostport;
4837         }
4838         str << "\"";
4839         str << ",\"control_id\":" << m_BlobAccess->GetCurCreateId();
4840         str << ",\"ttl\":" << m_BlobAccess->GetCurBlobTTL();
4841         t.Sec() = m_BlobAccess->GetCurBlobExpire();
4842         t.Print(time_buf, CSrvTime::eFmtHumanSeconds);
4843         str << ",\"expire\":\"" << time_buf << "\"";
4844         str << ",\"size\":" << m_BlobAccess->GetCurBlobSize();
4845         tmp = m_BlobAccess->GetCurPassword();
4846         if (!tmp.empty()) {
4847             tmp = "yes";
4848         }
4849         str << ",\"password\":\"" << tmp << "\"";
4850         str << ",\"version\":" << m_BlobAccess->GetCurBlobVersion();
4851         str << ",\"version_ttl\":" << m_BlobAccess->GetCurVersionTTL();
4852         t.Sec() = m_BlobAccess->GetCurVerExpire();
4853         t.Print(time_buf, CSrvTime::eFmtHumanSeconds);
4854         str << ",\"version_expire\":\"" << time_buf << "\"";
4855         m_PosponedCmd += CNcbiOstrstreamToString(str);
4856 
4857         return &CNCMessageHandler::x_FinishCommand;
4858     }
4859     m_SendBuff.reset(new TNCBufferType());
4860     m_SendBuff->reserve_mem(1024);
4861 
4862     tmp = "OK:Slot: ";
4863     m_SendBuff->append(tmp.data(), tmp.size());
4864     tmp = NStr::UIntToString(m_BlobSlot);
4865     m_SendBuff->append(tmp.data(), tmp.size());
4866 
4867     tmp = "\nOK:Write time: ";
4868     m_SendBuff->append(tmp.data(), tmp.size());
4869     Uint8 create_time = m_BlobAccess->GetCurBlobCreateTime();
4870     CSrvTime t;
4871     t.Sec() = time_t(create_time / kUSecsPerSecond);
4872     t.NSec() = (create_time % kUSecsPerSecond) * 1000;
4873     t.Print(time_buf, CSrvTime::eFmtHumanUSecs);
4874     m_SendBuff->append(time_buf, strlen(time_buf));
4875 
4876     tmp = "\nOK:Control server: ";
4877     m_SendBuff->append(tmp.data(), tmp.size());
4878     Uint8 create_server = m_BlobAccess->GetCurCreateServer();
4879     tmp = CNCDistributionConf::GetPeerNameOrEmpty(create_server);
4880     if (tmp.empty()) {
4881         tmp = CTaskServer::GetHostByIP(Uint4(create_server >> 32));
4882         m_SendBuff->append(tmp.data(), tmp.size());
4883         m_SendBuff->append(":", 1);
4884         tmp = NStr::UIntToString(Uint4(create_server));
4885         m_SendBuff->append(tmp.data(), tmp.size());
4886     } else {
4887         m_SendBuff->append(tmp.data(), tmp.size());
4888     }
4889     tmp = "\nOK:Control id: ";
4890     m_SendBuff->append(tmp.data(), tmp.size());
4891     tmp = NStr::Int8ToString(m_BlobAccess->GetCurCreateId());
4892     m_SendBuff->append(tmp.data(), tmp.size());
4893 
4894     tmp = "\nOK:TTL: ";
4895     m_SendBuff->append(tmp.data(), tmp.size());
4896     tmp = NStr::UIntToString(m_BlobAccess->GetCurBlobTTL());
4897     m_SendBuff->append(tmp.data(), tmp.size());
4898 
4899     tmp = "\nOK:Expire: ";
4900     m_SendBuff->append(tmp.data(), tmp.size());
4901     t.Sec() = m_BlobAccess->GetCurBlobExpire();
4902     t.Print(time_buf, CSrvTime::eFmtHumanSeconds);
4903     m_SendBuff->append(time_buf, strlen(time_buf));
4904 
4905     tmp = "\nOK:Size: ";
4906     m_SendBuff->append(tmp.data(), tmp.size());
4907     tmp = NStr::UInt8ToString(m_BlobAccess->GetCurBlobSize());
4908     m_SendBuff->append(tmp.data(), tmp.size());
4909 
4910     tmp = "\nOK:Password: '";
4911     m_SendBuff->append(tmp.data(), tmp.size());
4912     tmp = m_BlobAccess->GetCurPassword();
4913     if (!tmp.empty()) {
4914         tmp = "yes";
4915         m_SendBuff->append(tmp.data(), tmp.size());
4916     }
4917     m_SendBuff->append("'", 1);
4918 
4919     tmp = "\nOK:Version: ";
4920     m_SendBuff->append(tmp.data(), tmp.size());
4921     tmp = NStr::IntToString(m_BlobAccess->GetCurBlobVersion());
4922     m_SendBuff->append(tmp.data(), tmp.size());
4923 
4924     tmp = "\nOK:Version's TTL: ";
4925     m_SendBuff->append(tmp.data(), tmp.size());
4926     tmp = NStr::UIntToString(m_BlobAccess->GetCurVersionTTL());
4927     m_SendBuff->append(tmp.data(), tmp.size());
4928 
4929     tmp = "\nOK:Version expire: ";
4930     m_SendBuff->append(tmp.data(), tmp.size());
4931     t.Sec() = m_BlobAccess->GetCurVerExpire();
4932     t.Print(time_buf, CSrvTime::eFmtHumanSeconds);
4933     m_SendBuff->append(time_buf, strlen(time_buf));
4934 
4935     tmp = "\nOK:END\n";
4936     m_SendBuff->append(tmp.data(), tmp.size());
4937 
4938 
4939     x_ReportOK("OK:SIZE=").WriteNumber(m_SendBuff->size()).WriteText("\n");
4940     m_SendPos = 0;
4941     return &CNCMessageHandler::x_WriteSendBuff;
4942 }
4943 
4944 CNCMessageHandler::State
x_DoCmd_ProxyMeta(void)4945 CNCMessageHandler::x_DoCmd_ProxyMeta(void)
4946 {
4947     LOG_CURRENT_FUNCTION
4948     if (!m_BlobAccess->IsBlobExists())
4949         return &CNCMessageHandler::x_ReportBlobNotFound;
4950 
4951     x_ReportOK("OK:SIZE=0 ");
4952     WriteNumber(m_BlobAccess->IsValid() ? m_BlobAccess->GetCurBlobCreateTime() : 123).WriteText(" ");
4953     WriteNumber(m_BlobAccess->GetCurCreateServer()).WriteText(" ");
4954     WriteNumber(m_BlobAccess->GetCurCreateId()).WriteText(" ");
4955     WriteNumber(m_BlobAccess->GetCurBlobDeadTime()).WriteText(" ");
4956     WriteNumber(m_BlobAccess->GetCurBlobExpire()).WriteText(" ");
4957     WriteNumber(m_BlobAccess->GetCurVerExpire());
4958     WriteText("\n");
4959 
4960     return &CNCMessageHandler::x_FinishCommand;
4961 }
4962 
4963 CNCMessageHandler::State
x_DoCmd_CopyUpdate(void)4964 CNCMessageHandler::x_DoCmd_CopyUpdate(void)
4965 {
4966     LOG_CURRENT_FUNCTION
4967 #if USE_ALWAYS_COPY_UPD
4968     if (!m_BlobAccess->IsBlobExists()) {
4969 // create zero-length blob just to note that it exists
4970 // it will come soon anyway
4971         m_BlobAccess->SetBlobTTL(x_GetBlobTTL());
4972         m_BlobAccess->SetVersionTTL(0);
4973         m_BlobAccess->SetBlobVersion(0);
4974         m_BlobAccess->GetWriteMemSize();
4975         int cur_secs = int(CSrvTime::Current().Sec());
4976         // this one will be older
4977         m_BlobAccess->SetBlobCreateTime(m_CopyBlobInfo->create_time - 1);
4978         // and with short life
4979         m_BlobAccess->SetNewBlobExpire( cur_secs + 60);
4980         m_BlobAccess->SetNewVerExpire(  cur_secs + 60);
4981         m_BlobAccess->SetCreateServer(CNCDistributionConf::GetSelfID(),
4982                                       CNCBlobStorage::GetNewBlobId());
4983         m_BlobAccess->Finalize();
4984     }
4985 #endif
4986     if (m_BlobAccess->IsBlobExists()) {
4987         m_BlobAccess->UpdateMetaInfo(m_CopyBlobInfo->create_server, m_CopyBlobInfo->create_time);
4988     }
4989     return &CNCMessageHandler::x_FinishCommand;
4990 }
4991 
4992 /*
4993 CNCMessageHandler::State
4994 CNCMessageHandler::x_DoCmd_GetBlobsList(void)
4995 {
4996     const vector<Uint2>& slots = CNCDistributionConf::GetSelfSlots();
4997     string slot_str;
4998     ITERATE(vector<Uint2>, it_slot, slots) {
4999         slot_str += NStr::UIntToString(*it_slot);
5000         slot_str += ",";
5001     }
5002     slot_str.resize(slot_str.size() - 1);
5003     Write("OK:Serving slots: ").Write(slot_str).Write("\n");
5004 
5005     ITERATE(vector<Uint2>, it_slot, slots) {
5006         TNCBlobSumList blobs_lst;
5007         CNCBlobStorage::GetFullBlobsList(*it_slot, blobs_lst);
5008         NON_CONST_ITERATE(TNCBlobSumList, it_blob, blobs_lst) {
5009             if (x_NeedEarlyClose())
5010                 goto error_return;
5011 
5012             const string& raw_key = it_blob->first;
5013             SNCCacheData*& blob_sum = it_blob->second;
5014             string cache_name, key, subkey;
5015             CNCBlobKey::UnpackBlobKey(raw_key, cache_name, key, subkey);
5016             Write("OK:key: ").Write(key);
5017             if (!cache_name.empty())
5018                 Write(", subkey: ").Write(subkey);
5019             Write(", expire: ");
5020             CTime tmp_time(CTime::eEmpty, CTime::eLocal);
5021             tmp_time.SetTimeT(time_t(blob_sum->expire));
5022             Write(tmp_time.AsString("M/D/Y h:m:s"));
5023             Write(", create_time: ");
5024             tmp_time.SetTimeT(time_t(blob_sum->create_time / kNCTimeTicksInSec));
5025             tmp_time.SetMicroSecond(blob_sum->create_time % kNCTimeTicksInSec);
5026             Write(tmp_time.AsString("M/D/Y h:m:s.r"));
5027             Write(", create_id: ").Write(blob_sum->create_id);
5028             Write(", create_server: ");
5029             Write(CTaskServer::GetHostByIP(Uint4(blob_sum->create_server >> 32)));
5030             Write(":").Write(Uint4(blob_sum->create_server));
5031             Write("\n");
5032             delete blob_sum;
5033             blob_sum = NULL;
5034         }
5035         continue;
5036 
5037 error_return:
5038         ITERATE(TNCBlobSumList, it_blob, blobs_lst) {
5039             delete it_blob->second;
5040         }
5041         return &CNCMessageHandler::x_CloseCmdAndConn;
5042     }
5043     Write("OK:END\n");
5044     return &CNCMessageHandler::x_FinishCommand;
5045 }
5046 */
5047 CNCMessageHandler::State
x_DoCmd_NotImplemented(void)5048 CNCMessageHandler::x_DoCmd_NotImplemented(void)
5049 {
5050     LOG_CURRENT_FUNCTION
5051     x_ReportError(eStatus_NoImpl);
5052     return &CNCMessageHandler::x_FinishCommand;
5053 }
5054 
5055 CNCMessageHandler::State
x_DoCmd_Purge(void)5056 CNCMessageHandler::x_DoCmd_Purge(void)
5057 {
5058     LOG_CURRENT_FUNCTION
5059     if (CNCBlobAccessor::Purge( m_NCBlobKey, m_CmdStartTime.AsUSec())) {
5060         CNCBlobStorage::SavePurgeData();
5061     }
5062     CNCDistributionConf::GetPeerServers(m_CheckSrvs);
5063     return &CNCMessageHandler::x_PurgeToNextPeer;
5064 }
5065 
5066 CNCMessageHandler::State
x_DoCmd_CopyPurge(void)5067 CNCMessageHandler::x_DoCmd_CopyPurge(void)
5068 {
5069     LOG_CURRENT_FUNCTION
5070     if (CNCBlobAccessor::Purge( m_NCBlobKey, m_CopyBlobInfo->create_time)) {
5071         CNCBlobStorage::SavePurgeData();
5072     }
5073     return &CNCMessageHandler::x_FinishCommand;
5074 }
5075 
5076 CNCMessageHandler::State
x_DoCmd_GetBList(void)5077 CNCMessageHandler::x_DoCmd_GetBList(void)
5078 {
5079     m_SendBuff.reset(new TNCBufferType());
5080     bool newsep = NStr::Find(m_ParsedCmd.command->cmd, "BLIST2") != NPOS;
5081     CNCBlobStorage::GetBList(m_NCBlobKey.PackedKey(), m_SendBuff,m_BlobFilter, newsep ? "\t" : ",");
5082     CNCDistributionConf::AddServerSlots(m_SlotsDone, 0);
5083     ENCProxyCmd proxy_cmd = m_ParsedCmd.command->extra.proxy_cmd;
5084     if (proxy_cmd != eProxyGetBList2) {
5085         x_ReportOK("OK: SIZE=").WriteNumber(m_SendBuff->size()).WriteText("\n");
5086         Flush();
5087     }
5088     m_SendPos = 0;
5089     return &CNCMessageHandler::x_WriteSendBuff;
5090 }
5091 
5092 CNCMessageHandler::State
x_DoCmd_GetBListNext(void)5093 CNCMessageHandler::x_DoCmd_GetBListNext(void)
5094 {
5095     x_UnsetFlag(fNoReplyOnFinish);
5096     if (!m_CheckSrvs.empty()) {
5097         CNCDistributionConf::AddServerSlots(m_SlotsDone, m_CheckSrvs[m_SrvsIndex-1]);
5098     }
5099     Uint2 slot, slot_max = CNCDistributionConf::GetMaxSlotNumber();
5100     for (slot=1; slot<slot_max; ++slot) {
5101         if (m_SlotsDone.find(slot) == m_SlotsDone.end()) {
5102             m_CheckSrvs = CNCDistributionConf::GetRawServersForSlot(slot);
5103             if (m_CheckSrvs.empty()) {
5104                 x_ReportError(eStatus_CondFailed);
5105                 return &CNCMessageHandler::x_FinishCommand;
5106             }
5107             return &CNCMessageHandler::x_ProxyToNextPeer;
5108         }
5109     }
5110     return &CNCMessageHandler::x_FinishCommand;
5111 }
5112 
5113 void
BeginProxyResponse(const CTempString & response,size_t content_length)5114 CNCMessageHandler::BeginProxyResponse(const CTempString& response, size_t content_length)
5115 {
5116     if (x_IsHttpMode()) {
5117         if (NStr::StartsWith(response, "HTTP")) {
5118             WriteText(response).WriteText("\r\n");
5119         } else {
5120             x_WriteHttpHeader(eStatus_OK, content_length, true);
5121         }
5122         x_SetFlag(fNoReplyOnFinish);
5123         return;
5124     }
5125     ENCProxyCmd proxy_cmd = m_ParsedCmd.command->extra.proxy_cmd;
5126     if (proxy_cmd != eProxyGetBList2) {
5127         x_ReportOK(response).WriteText("\n");
5128     }
5129 }
5130 
5131 void
x_WriteHttpResponse(void)5132 CNCMessageHandler::x_WriteHttpResponse(void)
5133 {
5134     int cmd_status = GetDiagCtx()->GetRequestStatus();
5135     bool succeeded = x_IsCmdSucceeded(cmd_status);
5136     size_t content_length = 0;
5137     string envelope;
5138     if (succeeded && !m_PosponedCmd.empty()) {
5139         CNcbiOstrstream str;
5140         str << "{\"status\":" << cmd_status;
5141         envelope = CNcbiOstrstreamToString(str);
5142         content_length = envelope.size() + m_PosponedCmd.size() + 1;
5143     }
5144     x_WriteHttpHeader(cmd_status, content_length, false);
5145     if (content_length != 0) {
5146         Write(     envelope.data(),      envelope.size());
5147         Write(m_PosponedCmd.data(), m_PosponedCmd.size());
5148         WriteText("}");
5149     }
5150 }
5151 
5152 void
x_WriteHttpHeader(int cmd_status,size_t content_length,bool binary)5153 CNCMessageHandler::x_WriteHttpHeader(int cmd_status, size_t content_length, bool binary)
5154 {
5155     bool succeeded = x_IsCmdSucceeded(cmd_status);
5156     WriteText("HTTP/1.");
5157     WriteText(m_HttpMode == eHttp10 ? "0" : "1");
5158     WriteText(" ").WriteNumber(cmd_status).WriteText(" ");
5159     WriteText(succeeded ? "OK" : "ERR").WriteText("\r\n");
5160     WriteText("Content-Length: ").WriteNumber(content_length).WriteText("\r\n");
5161     if (content_length != 0) {
5162         if (binary) {
5163             WriteText("Content-Type: application/octet-stream\r\n");
5164         } else {
5165             WriteText("Content-Type: application/json\r\n");
5166         }
5167     }
5168     WriteText("\r\n");
5169 }
5170 
5171 
CNCMsgHandler_Factory(void)5172 CNCMsgHandler_Factory::CNCMsgHandler_Factory(void)
5173 {}
5174 
~CNCMsgHandler_Factory(void)5175 CNCMsgHandler_Factory::~CNCMsgHandler_Factory(void)
5176 {}
5177 
5178 CSrvSocketTask*
CreateSocketTask(void)5179 CNCMsgHandler_Factory::CreateSocketTask(void)
5180 {
5181     return new CNCMessageHandler();
5182 }
5183 
5184 END_NCBI_SCOPE
5185