1 /* $Id: message_handler.cpp 587630 2019-06-06 17:41:53Z gouriano $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Pavel Ivanov
27 *
28 */
29
30 #include "nc_pch.hpp"
31
32 #include <corelib/ncbireg.hpp>
33 #include <corelib/ncbifile.hpp>
34 #include <corelib/request_ctx.hpp>
35 #include <corelib/ncbi_bswap.hpp>
36 #include <util/md5.hpp>
37
38 #include "netcached.hpp"
39 #include "message_handler.hpp"
40 #include "netcache_version.hpp"
41 #include "nc_stat.hpp"
42 #include "peer_control.hpp"
43 #include "distribution_conf.hpp"
44 #include "periodic_sync.hpp"
45 #include "active_handler.hpp"
46 #include "nc_storage.hpp"
47 #include "nc_storage_blob.hpp"
48 #include "logging.hpp"
49
50
51
52 BEGIN_NCBI_SCOPE
53
54 #if 0
55 #define LOG_CURRENT_FUNCTION SRV_LOG(Warning, "this: " << (void*)this);
56 #else
57 #define LOG_CURRENT_FUNCTION
58 #endif
59
60 // when 1, server always broadcasts 'blob update' notifications.
61 // This will create empty blob stub on COPY_UPD if needed
62 // This takes time though, and I am not sure this is required.
63 // Another problem is that it overloads communication channels -
64 // NC fails much more often in CNCPeerControl::x_ReserveBGConn (too many connections)
65 #define USE_ALWAYS_COPY_UPD 0
66
67 /// Definition of all NetCache commands
68 ///
69 /// General format of a "NetCache" command is as follows:
70 ///
71 /// CMD param1 param2 ...
72 ///
73 /// Format of "ICache" command is as follows:
74 ///
75 /// IC(cache) CMD param1 param2 ...
76 ///
77 /// Here "IC" is two letters that appear in command literally. "cache" is name
78 /// of the cache where blob is stored; it's mentioned in parameter list as
79 /// first parameter with type eNSPA_ICPrefix. Every command parameter can be
80 /// given as just value or as name=value pair. String parameter values can be
81 /// enclosed in double quotes. If parameter is declared with the flag
82 /// eNSPA_Optional then it can be skipped from the command, in this case its
83 /// default value will be used (if any). If parameter flag eNSPA_Optchain has
84 /// the same meaning except if it's not provided then all following parameters
85 /// marked as eNSPA_Optional will be assumed not provided too.
86 ///
87 /// If command needs some binary data along with it then it's sent split in
88 /// chunks each having 4-byte integer prefix containing the length of the chunk.
89 /// When all data is sent special chunk length 0xFFFFFFFF should be sent
90 /// at the end. Successful response of each command is sent as one line
91 /// starting with "OK:" and then space-separated parameters that need to be
92 /// returned. If response should contain binary data then initial response
93 /// line should have "SIZE=nnn" with the size of binary data to follow.
94 /// Unsuccessful responses to commands always start with "ERR:" and then error
95 /// explanation follows.
96 ///
97 /// Descriptions of commands have
98 /// - command name as it comes from client;
99 /// - structure containing
100 /// * state function processing this command;
101 /// * command name as it appears in statistics;
102 /// * flags controlling command behavior;
103 /// * type of access to blob if needed;
104 /// * type of proxy command that should be executed if command will need to
105 /// be proxied to other servers;
106 /// - set of structures explaining command parameters. Each structure has
107 /// * parameter name which can be used by client if it passes parameters in
108 /// name=value form. Also name is used to distinguish parameters in
109 /// x_AssignCmdParams();
110 /// * type of parameter - parser makes additional checks to see if given
111 /// value is applicable for necessary parameter type;
112 /// * parameter flags.
113 static CNCMessageHandler::SCommandDef s_CommandMap[] = {
114 // "Are you alive?" command. This is old and deprecated command but it's
115 // executed a lot in old ICache clients. All that they need in response is
116 // "OK:".
117 { "A?",
118 {&CNCMessageHandler::x_FinishCommand,
119 "A?", fNoCmdFlags, eNCNone, eProxyNone} },
120 // Requests version of the server.
121 { "VERSION",
122 {&CNCMessageHandler::x_DoCmd_Version,
123 "VERSION", fNoCmdFlags, eNCNone, eProxyNone},
124 {
125 // Client IP for application requesting the info.
126 { "ip", eNSPT_Str, fNSPA_Optional },
127 // Session ID for application requesting the info.
128 { "sid", eNSPT_Str, eNSPA_Optional },
129 // request Hit ID
130 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
131 } },
132 // Requests some "health" information about the server.
133 { "HEALTH",
134 {&CNCMessageHandler::x_DoCmd_Health,
135 "HEALTH", fNoCmdFlags, eNCNone, eProxyNone},
136 {
137 // Client IP for application requesting the info.
138 { "ip", eNSPT_Str, fNSPA_Optional },
139 // Session ID for application requesting the info.
140 { "sid", eNSPT_Str, eNSPA_Optional },
141 // request Hit ID
142 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
143 } },
144 // Check if blob exists. Command for "ICache" clients.
145 { "HASB",
146 {&CNCMessageHandler::x_DoCmd_HasBlob,
147 "IC_HASB",
148 eClientBlobRead | fPeerFindExistsOnly | fNoBlobVersionCheck,
149 eNCRead,
150 eProxyHasBlob},
151 // Name of cache for blob.
152 { { "cache", eNSPT_Id, eNSPA_ICPrefix },
153 // Blob's key.
154 { "key", eNSPT_Str, eNSPA_Required },
155 // Blob's version.
156 { "version", eNSPT_Int, eNSPA_Required },
157 // Blob's subkey.
158 { "subkey", eNSPT_Str, eNSPA_Required },
159 // Quorum to use for this operation.
160 { "qrum", eNSPT_Int, eNSPA_Optional },
161 // Client IP for application requesting the info.
162 { "ip", eNSPT_Str, fNSPA_Optional },
163 // Session ID for application requesting the info.
164 { "sid", eNSPT_Str, eNSPA_Optional },
165 // Password for blob access.
166 { "pass", eNSPT_Str, eNSPA_Optional },
167 // request Hit ID
168 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
169 } },
170 // Read blob contents. Command for "ICache" clients.
171 { "READ",
172 {&CNCMessageHandler::x_DoCmd_Get,
173 "IC_READ",
174 eClientBlobRead,
175 eNCReadData,
176 eProxyRead},
177 // Name of cache for blob.
178 { { "cache", eNSPT_Id, eNSPA_ICPrefix },
179 // Blob's key.
180 { "key", eNSPT_Str, eNSPA_Required },
181 // Blob's version.
182 { "version", eNSPT_Int, eNSPA_Required },
183 // Blob's subkey.
184 { "subkey", eNSPT_Str, eNSPA_Required },
185 // Quorum to use for this operation.
186 { "qrum", eNSPT_Int, eNSPA_Optional },
187 // Client IP for application requesting the info.
188 { "ip", eNSPT_Str, fNSPA_Optional },
189 // Session ID for application requesting the info.
190 { "sid", eNSPT_Str, eNSPA_Optional },
191 // Password for blob access.
192 { "pass", eNSPT_Str, eNSPA_Optional },
193 // Max age of blob (returned blob should be younger)
194 { "age", eNSPT_Int, eNSPA_Optional },
195 // request Hit ID
196 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
197 } },
198 // Write blob contents. Command for "ICache" clients.
199 { "STOR",
200 {&CNCMessageHandler::x_DoCmd_IC_Store,
201 "IC_STOR",
202 eClientBlobWrite | fNoReplyOnFinish,
203 eNCCreate,
204 eProxyWrite},
205 // Name of cache for blob.
206 { { "cache", eNSPT_Id, eNSPA_ICPrefix },
207 // Time-to-live for the blob, 0 means default from server settings.
208 { "ttl", eNSPT_Int, eNSPA_Required },
209 // Blob's key.
210 { "key", eNSPT_Str, eNSPA_Required },
211 // Blob's version.
212 { "version", eNSPT_Int, eNSPA_Required },
213 // Blob's subkey.
214 { "subkey", eNSPT_Str, eNSPA_Required },
215 // 1 if client wants confirmation after blob has been written
216 // (by default it just assumes that everything is written and moves
217 // further).
218 { "confirm", eNSPT_Int, eNSPA_Optional },
219 // Quorum to use for this operation.
220 { "qrum", eNSPT_Int, eNSPA_Optional },
221 // Client IP for application requesting the info.
222 { "ip", eNSPT_Str, fNSPA_Optional },
223 // Session ID for application requesting the info.
224 { "sid", eNSPT_Str, eNSPA_Optional },
225 // Password for blob access.
226 { "pass", eNSPT_Str, eNSPA_Optional },
227 // request Hit ID
228 { "ncbi_phid", eNSPT_Str, eNSPA_Optional },
229 // see ENCUserFlags, added in v6.11.0 (CXX-8737)
230 { "flags", eNSPT_Int, eNSPA_Optional }
231 } },
232 // Write blob contents. Old and deprecated command which probably is not
233 // used by modern ICache clients anymore. It has the size of the blob right
234 // in the command (so client should know it beforehand) and it doesn't use
235 // "EOF" marker at the end of blob data.
236 { "STRS",
237 {&CNCMessageHandler::x_DoCmd_IC_Store,
238 "IC_STRS",
239 eClientBlobWrite | fReadExactBlobSize | fSkipBlobEOF | fNoReplyOnFinish,
240 eNCCreate,
241 eProxyWrite},
242 // Name of cache for blob.
243 { { "cache", eNSPT_Id, eNSPA_ICPrefix },
244 // Time-to-live for the blob, 0 means default from server settings.
245 { "ttl", eNSPT_Int, eNSPA_Required },
246 // Size of the blob to be written.
247 { "size", eNSPT_Int, eNSPA_Required },
248 // Blob's key.
249 { "key", eNSPT_Str, eNSPA_Required },
250 // Blob's version.
251 { "version", eNSPT_Int, eNSPA_Required },
252 // Blob's subkey.
253 { "subkey", eNSPT_Str, eNSPA_Required },
254 // Quorum to use for this operation.
255 { "qrum", eNSPT_Int, eNSPA_Optional },
256 // Client IP for application requesting the info.
257 { "ip", eNSPT_Str, fNSPA_Optional },
258 // Session ID for application requesting the info.
259 { "sid", eNSPT_Str, eNSPA_Optional },
260 // Password for blob access.
261 { "pass", eNSPT_Str, eNSPA_Optional },
262 // request Hit ID
263 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
264 } },
265 // Read all or part of contents of the "last" version of the blob.
266 // In response to the command NC sends blob contents, blob version it has
267 // and flag showing if this version can be considered "valid", i.e. if it's
268 // not expired yet.
269 { "READLAST",
270 {&CNCMessageHandler::x_DoCmd_GetLast,
271 "IC_READLAST",
272 eClientBlobRead | fNoBlobVersionCheck,
273 eNCReadData,
274 eProxyReadLast},
275 // Name of cache for blob.
276 { { "cache", eNSPT_Id, eNSPA_ICPrefix },
277 // Blob's key.
278 { "key", eNSPT_Str, eNSPA_Required },
279 // Blob's subkey.
280 { "subkey", eNSPT_Str, eNSPA_Required },
281 // Starting position of the data that needs to be sent.
282 { "start", eNSPT_Int, eNSPA_Optional },
283 // Size of the data that needs to be sent.
284 { "size", eNSPT_Int, eNSPA_Optional },
285 // Quorum to use for this operation.
286 { "qrum", eNSPT_Int, eNSPA_Optional },
287 // Client IP for application requesting the info.
288 { "ip", eNSPT_Str, fNSPA_Optional },
289 // Session ID for application requesting the info.
290 { "sid", eNSPT_Str, eNSPA_Optional },
291 // Password for blob access.
292 { "pass", eNSPT_Str, eNSPA_Optional },
293 // Max age of blob (returned blob should be younger)
294 { "age", eNSPT_Int, eNSPA_Optional },
295 // request Hit ID
296 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
297 } },
298 // Mark the given blob version as "valid" and do that only if this version
299 // is still current and wasn't rewritten with another version.
300 { "SETVALID",
301 {&CNCMessageHandler::x_DoCmd_SetValid,
302 "IC_SETVALID",
303 fNeedsBlobAccess,
304 eNCRead,
305 eProxySetValid},
306 // Name of cache for blob.
307 { { "cache", eNSPT_Id, eNSPA_ICPrefix },
308 // Blob's key.
309 { "key", eNSPT_Str, eNSPA_Required },
310 // Blob's version.
311 { "version", eNSPT_Int, eNSPA_Required },
312 // Blob's subkey.
313 { "subkey", eNSPT_Str, eNSPA_Required },
314 // Quorum to use for this operation.
315 { "qrum", eNSPT_Int, eNSPA_Optional },
316 // Client IP for application requesting the info.
317 { "ip", eNSPT_Str, fNSPA_Optional },
318 // Session ID for application requesting the info.
319 { "sid", eNSPT_Str, eNSPA_Optional },
320 // Password for blob access.
321 { "pass", eNSPT_Str, eNSPA_Optional },
322 // request Hit ID
323 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
324 } },
325 // Write blob contents. Command is issued only by other servers while
326 // mirroring just written blobs or processing quorum requirements,
327 // i.e. writing to other servers before answering to client that blob is
328 // written.
329 { "COPY_PUT",
330 {&CNCMessageHandler::x_DoCmd_CopyPut,
331 "COPY_PUT",
332 eCopyBlobFromPeer | fNeedsSpaceAsPeer | fReadExactBlobSize
333 | fCopyLogEvent,
334 eNCCopyCreate, eProxyNone},
335 // Name of cache for blob (for NC-generated blob keys this will be
336 // empty).
337 { { "cache", eNSPT_Str, eNSPA_Required },
338 // Blob's key.
339 { "key", eNSPT_Str, eNSPA_Required },
340 // Blob's subkey (for NC-generated blob keys this will be empty).
341 { "subkey", eNSPT_Str, eNSPA_Required },
342 // Blob's version (for NC-generated blob keys this will be equal to 0).
343 { "version", eNSPT_Int, eNSPA_Required },
344 // MD5 checksum of blob's password.
345 { "md5_pass",eNSPT_Str, eNSPA_Required },
346 // Creation time of the blob (microseconds since epoch).
347 { "cr_time", eNSPT_Int, eNSPA_Required },
348 // Time-to-live for the blob.
349 { "ttl", eNSPT_Int, eNSPA_Required },
350 // Dead-time for the blob (can be greater than expiration time).
351 { "dead", eNSPT_Int, eNSPA_Required },
352 // Expiration time for the blob
353 { "exp", eNSPT_Int, eNSPA_Required },
354 // Blob size.
355 { "size", eNSPT_Int, eNSPA_Required },
356 // Time-to-live for blob's version.
357 { "ver_ttl", eNSPT_Int, eNSPA_Required },
358 // Blob's version expiration time.
359 { "ver_dead",eNSPT_Int, eNSPA_Required },
360 // Server_id of the server where blob was created.
361 { "cr_srv", eNSPT_Int, eNSPA_Required },
362 // Id of the blob on the server where it was created.
363 { "cr_id", eNSPT_Int, eNSPA_Required },
364 // Record number of the event of blob creation in synchronization
365 // logs of the server where blob was created.
366 { "log_rec", eNSPT_Int, eNSPA_Required },
367 // Version of the command. Field exists for protocol backwards
368 // compatibility with previous versions of NC. In current NC this
369 // version is always 1.
370 { "cmd_ver", eNSPT_Int, eNSPA_Optional, "0" },
371 // Client IP for application that requested writing the blob.
372 // Parameter is not empty only if command is issued as part of
373 // quorum-related functionality, i.e. before client received
374 // confirmation of blob writing.
375 { "ip", eNSPT_Str, fNSPA_Optional },
376 // Session ID for application that requested writing the blob.
377 // Parameter is not empty only if command is issued as part of
378 // quorum-related functionality, i.e. before client received
379 // confirmation of blob writing.
380 { "sid", eNSPT_Str, eNSPA_Optional } } },
381 // Prolong blob lifetime. Command is issued only by other servers while
382 // mirroring prolonged blobs.
383 { "COPY_PROLONG",
384 {&CNCMessageHandler::x_DoCmd_CopyProlong,
385 "COPY_PROLONG",
386 eCopyBlobFromPeer,
387 eNCRead, eProxyNone},
388 // Name of cache for blob (for NC-generated blob keys this will be
389 // empty).
390 { { "cache", eNSPT_Str, eNSPA_Required },
391 // Blob's key.
392 { "key", eNSPT_Str, eNSPA_Required },
393 // Blob's subkey (for NC-generated blob keys this will be empty).
394 { "subkey", eNSPT_Str, eNSPA_Required },
395 // Creation time of the blob (microseconds since epoch).
396 { "cr_time", eNSPT_Int, eNSPA_Required },
397 // Server_id of the server where blob was created.
398 { "cr_srv", eNSPT_Int, eNSPA_Required },
399 // Id of the blob on the server where it was created.
400 { "cr_id", eNSPT_Int, eNSPA_Required },
401 // Dead-time for the blob (can be greater than expiration time).
402 { "dead", eNSPT_Int, eNSPA_Required },
403 // Expiration time for the blob
404 { "exp", eNSPT_Int, eNSPA_Required },
405 // Blob's version expiration time.
406 { "ver_dead",eNSPT_Int, eNSPA_Required },
407 // Time of creation of initial record in synchronization log about
408 // this operation.
409 { "log_time",eNSPT_Int, fNSPA_Optional },
410 // Server that first made the blob's life prolongation.
411 { "log_srv", eNSPT_Int, eNSPA_Optional },
412 // Record number of the initial record in synchronization log about
413 // this operation.
414 { "log_rec", eNSPT_Int, eNSPA_Optional } } },
415 // Write blob contents. Command for "NetCache" clients.
416 { "PUT3",
417 {&CNCMessageHandler::x_DoCmd_Put,
418 "PUT3",
419 eClientBlobWrite | fCanGenerateKey,
420 eNCCreate,
421 eProxyWrite},
422 // Time-to-live for the blob. If not given or 0 then default TTL
423 // is used.
424 { { "ttl", eNSPT_Int, eNSPA_Optional },
425 // Key of the blob. If it's not given or empty then new key will be
426 // generated.
427 { "key", eNSPT_NCID, eNSPA_Optional },
428 // Quorum to use for this operation.
429 { "qrum", eNSPT_Int, eNSPA_Optional },
430 // Client IP for application sending the command.
431 { "ip", eNSPT_Str, fNSPA_Optional },
432 // Session ID for application sending the command.
433 { "sid", eNSPT_Str, eNSPA_Optional },
434 // Password for blob access.
435 { "pass", eNSPT_Str, eNSPA_Optional },
436 // request Hit ID
437 { "ncbi_phid", eNSPT_Str, eNSPA_Optional },
438 // see ENCUserFlags, added in v6.11.0 (CXX-8737)
439 { "flags", eNSPT_Int, eNSPA_Optional }
440 } },
441 // Read blob contents. Command for "NetCache" clients.
442 { "GET2",
443 {&CNCMessageHandler::x_DoCmd_Get,
444 "GET2",
445 eClientBlobRead,
446 eNCReadData,
447 eProxyRead},
448 // Key of the blob.
449 { { "key", eNSPT_NCID, eNSPA_Required },
450 // Not used and not implemented parameter. Exists just for backwards
451 // compatibility with old clients.
452 { "NW", eNSPT_Id, eNSPA_Obsolete | fNSPA_Match },
453 // Quorum to use for this operation.
454 { "qrum", eNSPT_Int, eNSPA_Optional },
455 // Client IP for application sending the command.
456 { "ip", eNSPT_Str, fNSPA_Optional },
457 // Session ID for application sending the command.
458 { "sid", eNSPT_Str, eNSPA_Optional },
459 // Password for blob access.
460 { "pass", eNSPT_Str, eNSPA_Optional },
461 // Max age of blob (returned blob should be younger)
462 { "age", eNSPT_Int, eNSPA_Optional },
463 // request Hit ID
464 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
465 } },
466 // Check if blob exists. Command for "NetCache" clients.
467 { "HASB",
468 {&CNCMessageHandler::x_DoCmd_HasBlob,
469 "HASB",
470 eClientBlobRead | fPeerFindExistsOnly,
471 eNCRead,
472 eProxyHasBlob},
473 // Key of the blob.
474 { { "key", eNSPT_NCID, eNSPA_Required },
475 // Quorum to use for this operation.
476 { "qrum", eNSPT_Int, eNSPA_Optional },
477 // Client IP for application sending the command.
478 { "ip", eNSPT_Str, fNSPA_Optional },
479 // Session ID for application sending the command.
480 { "sid", eNSPT_Str, eNSPA_Optional },
481 // Password for blob access.
482 { "pass", eNSPT_Str, eNSPA_Optional },
483 // request Hit ID
484 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
485 } },
486 // Delete blob. Command for "NetCache" clients.
487 // If the blob doesn't exist command is still considered successful.
488 { "RMV2",
489 {&CNCMessageHandler::x_DoCmd_Remove,
490 "RMV2",
491 fNeedsBlobAccess | fNoBlobAccessStats,
492 eNCCreate,
493 eProxyRemove},
494 // Key of the blob.
495 { { "key", eNSPT_NCID, eNSPA_Required },
496 // Quorum to use for this operation.
497 { "qrum", eNSPT_Int, eNSPA_Optional },
498 // Client IP for application sending the command.
499 { "ip", eNSPT_Str, fNSPA_Optional },
500 // Session ID for application sending the command.
501 { "sid", eNSPT_Str, eNSPA_Optional },
502 // Password for blob access.
503 { "pass", eNSPT_Str, eNSPA_Optional },
504 // request Hit ID
505 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
506 } },
507 // Get size of the blob. Command for "NetCache" clients.
508 { "GSIZ",
509 {&CNCMessageHandler::x_DoCmd_GetSize,
510 "GetSIZe",
511 eClientBlobRead,
512 eNCRead,
513 eProxyGetSize},
514 // Key of the blob.
515 { { "key", eNSPT_NCID, eNSPA_Required },
516 // Quorum to use for this operation.
517 { "qrum", eNSPT_Int, eNSPA_Optional },
518 // Client IP for application sending the command.
519 { "ip", eNSPT_Str, fNSPA_Optional },
520 // Session ID for application sending the command.
521 { "sid", eNSPT_Str, eNSPA_Optional },
522 // Password for blob access.
523 { "pass", eNSPT_Str, eNSPA_Optional },
524 // request Hit ID
525 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
526 } },
527 // Delete blob. Command for "ICache" clients.
528 // If the blob doesn't exist command is still considered successful.
529 { "REMO",
530 {&CNCMessageHandler::x_DoCmd_Remove,
531 "IC_REMOve",
532 fNeedsBlobAccess | fNoBlobAccessStats,
533 eNCCreate,
534 eProxyRemove},
535 // Name of cache for blob.
536 { { "cache", eNSPT_Id, eNSPA_ICPrefix },
537 // Blob's key.
538 { "key", eNSPT_Str, eNSPA_Required },
539 // Blob's version.
540 { "version", eNSPT_Int, eNSPA_Required },
541 // Blob's subkey.
542 { "subkey", eNSPT_Str, eNSPA_Required },
543 // Quorum to use for this operation.
544 { "qrum", eNSPT_Int, eNSPA_Optional },
545 // Client IP for application sending the command.
546 { "ip", eNSPT_Str, fNSPA_Optional },
547 // Session ID for application sending the command.
548 { "sid", eNSPT_Str, eNSPA_Optional },
549 // Password for blob access.
550 { "pass", eNSPT_Str, eNSPA_Optional },
551 // request Hit ID
552 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
553 } },
554 // Get size of the blob. Command for "ICache" clients.
555 { "GSIZ",
556 {&CNCMessageHandler::x_DoCmd_GetSize,
557 "IC_GetSIZe",
558 eClientBlobRead,
559 eNCRead,
560 eProxyGetSize},
561 // Name of cache for blob.
562 { { "cache", eNSPT_Id, eNSPA_ICPrefix },
563 // Blob's key.
564 { "key", eNSPT_Str, eNSPA_Required },
565 // Blob's version.
566 { "version", eNSPT_Int, eNSPA_Required },
567 // Blob's subkey.
568 { "subkey", eNSPT_Str, eNSPA_Required },
569 // Quorum to use for this operation.
570 { "qrum", eNSPT_Int, eNSPA_Optional },
571 // Client IP for application sending the command.
572 { "ip", eNSPT_Str, fNSPA_Optional },
573 // Session ID for application sending the command.
574 { "sid", eNSPT_Str, eNSPA_Optional },
575 // Password for blob access.
576 { "pass", eNSPT_Str, eNSPA_Optional },
577 // request Hit ID
578 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
579 } },
580 // Read part of the blob contents. Command for "NetCache" clients.
581 { "GETPART",
582 {&CNCMessageHandler::x_DoCmd_Get,
583 "GETPART",
584 eClientBlobRead,
585 eNCReadData,
586 eProxyRead},
587 // Key of the blob.
588 { { "key", eNSPT_NCID, eNSPA_Required },
589 // Starting position of the data that needs to be sent.
590 { "start", eNSPT_Int, eNSPA_Required },
591 // Size of the data that needs to be sent.
592 { "size", eNSPT_Int, eNSPA_Required },
593 // Quorum to use for this operation.
594 { "qrum", eNSPT_Int, eNSPA_Optional },
595 // Client IP for application requesting the info.
596 { "ip", eNSPT_Str, fNSPA_Optional },
597 // Session ID for application requesting the info.
598 { "sid", eNSPT_Str, eNSPA_Optional },
599 // Password for blob access.
600 { "pass", eNSPT_Str, eNSPA_Optional },
601 // Max age of blob (returned blob should be younger)
602 { "age", eNSPT_Int, eNSPA_Optional },
603 // request Hit ID
604 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
605 } },
606 // Read part of the blob contents. Command for "ICache" clients.
607 { "READPART",
608 {&CNCMessageHandler::x_DoCmd_Get,
609 "IC_READPART",
610 eClientBlobRead,
611 eNCReadData,
612 eProxyRead},
613 // Name of cache for blob.
614 { { "cache", eNSPT_Id, eNSPA_ICPrefix },
615 // Blob's key.
616 { "key", eNSPT_Str, eNSPA_Required },
617 // Blob's version.
618 { "version", eNSPT_Int, eNSPA_Required },
619 // Blob's subkey.
620 { "subkey", eNSPT_Str, eNSPA_Required },
621 // Starting position of the data that needs to be sent.
622 { "start", eNSPT_Int, eNSPA_Required },
623 // Size of the data that needs to be sent.
624 { "size", eNSPT_Int, eNSPA_Required },
625 // Quorum to use for this operation.
626 { "qrum", eNSPT_Int, eNSPA_Optional },
627 // Client IP for application requesting the info.
628 { "ip", eNSPT_Str, fNSPA_Optional },
629 // Session ID for application requesting the info.
630 { "sid", eNSPT_Str, eNSPA_Optional },
631 // Password for blob access.
632 { "pass", eNSPT_Str, eNSPA_Optional },
633 // Max age of blob (returned blob should be younger)
634 { "age", eNSPT_Int, eNSPA_Optional },
635 // request Hit ID
636 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
637 } },
638 // Get meta information about the blob. This command is sent only by other
639 // NC servers. And it's used to determine which server has the latest
640 // version of the blob. Thus response to this command is a line containing
641 // enough information to compare the blob's creation time with the same
642 // blob on other servers.
643 { "PROXY_META",
644 {&CNCMessageHandler::x_DoCmd_ProxyMeta,
645 "PROXY_META",
646 fNeedsBlobAccess | fNeedsStorageCache | fDoNotProxyToPeers
647 | fDoNotCheckPassword,
648 eNCRead, eProxyNone},
649 // Name of cache for blob (for NC-generated blob keys this will be
650 // empty).
651 { { "cache", eNSPT_Str, eNSPA_Required },
652 // Blob's key.
653 { "key", eNSPT_Str, eNSPA_Required },
654 // Blob's subkey (for NC-generated blob keys this will be empty).
655 { "subkey", eNSPT_Str, eNSPA_Required },
656 // Client IP for application on behalf of which the info is requested.
657 { "ip", eNSPT_Str, eNSPA_Required },
658 // Session ID for application on behalf of which the info is requested.
659 { "sid", eNSPT_Str, eNSPA_Required } } },
660
661 // another NC server notifies this one that the blob was updated there
662 // the blob data will come later (eg, via COPY_PUT)
663 { "COPY_UPD",
664 {&CNCMessageHandler::x_DoCmd_CopyUpdate,
665 "UPD",
666 fNoBlobAccessStats |
667 fNeedsBlobAccess | fNeedsStorageCache | fDoNotProxyToPeers
668 | fDoNotCheckPassword | fNoBlobVersionCheck,
669 #if USE_ALWAYS_COPY_UPD
670 eNCCopyCreate
671 #else
672 eNCRead
673 #endif
674 , eProxyNone },
675 // Name of cache for blob (for NC-generated blob keys this will be
676 // empty).
677 { { "cache", eNSPT_Str, eNSPA_Required },
678 // Blob's key.
679 { "key", eNSPT_Str, eNSPA_Required },
680 // Blob's subkey (for NC-generated blob keys this will be empty).
681 { "subkey", eNSPT_Str, eNSPA_Required },
682 // time of the update
683 { "cr_time", eNSPT_Int, eNSPA_Required },
684 // Server_id, where the update was made
685 { "cr_srv", eNSPT_Int, eNSPA_Required }
686 }
687 },
688 // Write the blob contents. This command is sent only by other NC servers
689 // in response to client's PUT3 (or similar) command when the server where
690 // client have sent initial command cannot execute it locally for any
691 // reason (slot is not processed by that server or initial database caching
692 // is not completed yet).
693 { "PROXY_PUT",
694 {&CNCMessageHandler::x_DoCmd_Put,
695 "PROXY_PUT",
696 eProxyBlobWrite | fDoNotProxyToPeers,
697 eNCCreate,
698 eProxyWrite},
699 // Name of cache for blob (for NC-generated blob keys this will be
700 // empty).
701 { { "cache", eNSPT_Str, eNSPA_Required },
702 // Blob's key.
703 { "key", eNSPT_Str, eNSPA_Required },
704 // Blob's subkey (for NC-generated blob keys this will be empty).
705 { "subkey", eNSPT_Str, eNSPA_Required },
706 // Blob's version (for NC-generated blob keys this will be equal to 0).
707 { "version", eNSPT_Int, eNSPA_Required },
708 // Time-to-live for the blob.
709 { "ttl", eNSPT_Int, eNSPA_Required },
710 // Quorum to use for this operation.
711 { "qrum", eNSPT_Int, eNSPA_Required },
712 // Client IP for application requesting the info.
713 { "ip", eNSPT_Str, eNSPA_Required },
714 // Session ID for application requesting the info.
715 { "sid", eNSPT_Str, eNSPA_Required },
716 // see ENCUserFlags, added in v6.11.0 (CXX-8737)
717 { "flags", eNSPT_Int, eNSPA_Optional },
718 // Password for blob access.
719 { "pass", eNSPT_Str, eNSPA_Optional }
720 } },
721 // Read all or a part of the blob contents. This command is sent only
722 // by other NC servers in response to client's GET2 (or similar) command
723 // when the server where client have sent initial command cannot execute it
724 // locally for any reason (slot is not processed by that server, or initial
725 // database caching is not completed yet, or it was determined that this
726 // server has latest version of the blob).
727 { "PROXY_GET",
728 {&CNCMessageHandler::x_DoCmd_Get,
729 "PROXY_GET",
730 eProxyBlobRead | fDoNotProxyToPeers,
731 eNCReadData,
732 eProxyRead},
733 // Name of cache for blob (for NC-generated blob keys this will be
734 // empty).
735 { { "cache", eNSPT_Str, eNSPA_Required },
736 // Blob's key.
737 { "key", eNSPT_Str, eNSPA_Required },
738 // Blob's subkey (for NC-generated blob keys this will be empty).
739 { "subkey", eNSPT_Str, eNSPA_Required },
740 // Blob's version (for NC-generated blob keys this will be equal to 0).
741 { "version", eNSPT_Int, eNSPA_Required },
742 // Starting position of the data that needs to be sent.
743 { "start", eNSPT_Int, eNSPA_Required },
744 // Size of the data that needs to be sent.
745 { "size", eNSPT_Int, eNSPA_Required },
746 // Quorum to use for this operation.
747 { "qrum", eNSPT_Int, eNSPA_Required },
748 // Value of the flag "search_on_read" to use with this command, i.e.
749 // whether this server should search the blob if it's not found
750 // locally.
751 { "srch", eNSPT_Int, eNSPA_Required },
752 // Flag whether local execution of this command should be forced
753 // no matter what, i.e. if this flag is set in cases when normal GET2
754 // command would have been proxied to other servers an error should
755 // be returned.
756 { "local", eNSPT_Int, eNSPA_Required },
757 // Client IP for application requesting the info.
758 { "ip", eNSPT_Str, eNSPA_Required },
759 // Session ID for application requesting the info.
760 { "sid", eNSPT_Str, eNSPA_Required },
761 // Password for blob access.
762 { "pass", eNSPT_Str, eNSPA_Optional },
763 // Max age of blob (returned blob should be younger)
764 { "age", eNSPT_Int, eNSPA_Optional }
765 } },
766 // Check if the blob exists. This command is sent only by other NC servers
767 // in response to client's HASB command when the server where
768 // client have sent initial command cannot execute it locally for any
769 // reason (slot is not processed by that server or initial database caching
770 // is not completed yet).
771 { "PROXY_HASB",
772 {&CNCMessageHandler::x_DoCmd_HasBlob,
773 "PROXY_HASB",
774 eProxyBlobRead | fPeerFindExistsOnly | fDoNotProxyToPeers,
775 eNCRead,
776 eProxyHasBlob},
777 // Name of cache for blob (for NC-generated blob keys this will be
778 // empty).
779 { { "cache", eNSPT_Str, eNSPA_Required },
780 // Blob's key.
781 { "key", eNSPT_Str, eNSPA_Required },
782 // Blob's subkey (for NC-generated blob keys this will be empty).
783 { "subkey", eNSPT_Str, eNSPA_Required },
784 // Quorum to use for this operation.
785 { "qrum", eNSPT_Int, eNSPA_Required },
786 // Client IP for application requesting the info.
787 { "ip", eNSPT_Str, eNSPA_Required },
788 // Session ID for application requesting the info.
789 { "sid", eNSPT_Str, eNSPA_Required },
790 // Password for blob access.
791 { "pass", eNSPT_Str, eNSPA_Optional } } },
792 // Get size of the blob. This command is sent only
793 // by other NC servers in response to client's GSIZ command
794 // when the server where client have sent initial command cannot execute it
795 // locally for any reason (slot is not processed by that server, or initial
796 // database caching is not completed yet, or it was determined that this
797 // server has latest version of the blob).
798 { "PROXY_GSIZ",
799 {&CNCMessageHandler::x_DoCmd_GetSize,
800 "PROXY_GetSIZe",
801 eProxyBlobRead | fDoNotProxyToPeers,
802 eNCRead,
803 eProxyGetSize},
804 // Name of cache for blob (for NC-generated blob keys this will be
805 // empty).
806 { { "cache", eNSPT_Str, eNSPA_Required },
807 // Blob's key.
808 { "key", eNSPT_Str, eNSPA_Required },
809 // Blob's subkey (for NC-generated blob keys this will be empty).
810 { "subkey", eNSPT_Str, eNSPA_Required },
811 // Blob's version (for NC-generated blob keys this will be equal to 0).
812 { "version", eNSPT_Int, eNSPA_Required },
813 // Quorum to use for this operation.
814 { "qrum", eNSPT_Int, eNSPA_Required },
815 // Value of the flag "search_on_read" to use with this command, i.e.
816 // whether this server should search the blob if it's not found
817 // locally.
818 { "srch", eNSPT_Int, eNSPA_Required },
819 // Flag whether local execution of this command should be forced
820 // no matter what, i.e. if this flag is set in cases when normal GET2
821 // command would have been proxied to other servers an error should
822 // be returned.
823 { "local", eNSPT_Int, eNSPA_Required },
824 // Client IP for application requesting the info.
825 { "ip", eNSPT_Str, eNSPA_Required },
826 // Session ID for application requesting the info.
827 { "sid", eNSPT_Str, eNSPA_Required },
828 // Password for blob access.
829 { "pass", eNSPT_Str, eNSPA_Optional } } },
830 // Read all or a part of contents of the "last version" of the blob.
831 // This command is sent only by other NC servers in response to client's
832 // READLAST command when the server where client have sent initial command
833 // cannot execute it locally for any reason (slot is not processed by that
834 // server, or initial database caching is not completed yet, or it was
835 // determined that this server has latest version of the blob).
836 { "PROXY_READLAST",
837 {&CNCMessageHandler::x_DoCmd_GetLast,
838 "PROXY_READLAST",
839 eProxyBlobRead | fNoBlobVersionCheck | fDoNotProxyToPeers,
840 eNCReadData,
841 eProxyReadLast},
842 // Name of cache for blob (for NC-generated blob keys this will be
843 // empty).
844 { { "cache", eNSPT_Str, eNSPA_Required },
845 // Blob's key.
846 { "key", eNSPT_Str, eNSPA_Required },
847 // Blob's subkey (for NC-generated blob keys this will be empty).
848 { "subkey", eNSPT_Str, eNSPA_Required },
849 // Starting position of the data that needs to be sent.
850 { "start", eNSPT_Int, eNSPA_Required },
851 // Size of the data that needs to be sent.
852 { "size", eNSPT_Int, eNSPA_Required },
853 // Quorum to use for this operation.
854 { "qrum", eNSPT_Int, eNSPA_Required },
855 // Value of the flag "search_on_read" to use with this command, i.e.
856 // whether this server should search the blob if it's not found
857 // locally.
858 { "srch", eNSPT_Int, eNSPA_Required },
859 // Flag whether local execution of this command should be forced
860 // no matter what, i.e. if this flag is set in cases when normal
861 // READLAST command would have been proxied to other servers an error
862 // should be returned.
863 { "local", eNSPT_Int, eNSPA_Required },
864 // Client IP for application requesting the info.
865 { "ip", eNSPT_Str, eNSPA_Required },
866 // Session ID for application requesting the info.
867 { "sid", eNSPT_Str, eNSPA_Required },
868 // Password for blob access.
869 { "pass", eNSPT_Str, eNSPA_Optional },
870 // Max age of blob (returned blob should be younger)
871 { "age", eNSPT_Int, eNSPA_Optional } } },
872 // Mark the "current version" of the blob as "valid".
873 // This command is sent only by other NC servers in response to client's
874 // SETVALID command when the server where client have sent initial command
875 // cannot execute it locally for any reason (slot is not processed by that
876 // server, or initial database caching is not completed yet).
877 { "PROXY_SETVALID",
878 {&CNCMessageHandler::x_DoCmd_SetValid,
879 "PROXY_SETVALID",
880 fNeedsBlobAccess | fNeedsStorageCache | fDoNotProxyToPeers,
881 eNCRead,
882 eProxySetValid},
883 // Name of cache for blob (for NC-generated blob keys this will be
884 // empty).
885 { { "cache", eNSPT_Str, eNSPA_Required },
886 // Blob's key.
887 { "key", eNSPT_Str, eNSPA_Required },
888 // Blob's subkey (for NC-generated blob keys this will be empty).
889 { "subkey", eNSPT_Str, eNSPA_Required },
890 // Blob's version (for NC-generated blob keys this will be equal to 0).
891 { "version", eNSPT_Int, eNSPA_Required },
892 // Client IP for application requesting the info.
893 { "ip", eNSPT_Str, eNSPA_Required },
894 // Session ID for application requesting the info.
895 { "sid", eNSPT_Str, eNSPA_Required },
896 // Password for blob access.
897 { "pass", eNSPT_Str, eNSPA_Optional } } },
898 // Remove the blob. This command is sent only by other NC servers in response
899 // to client's RMV2 (or similar) command when the server where client have
900 // sent initial command cannot execute it locally for any reason (slot is not
901 // processed by that server, or initial database caching is not completed yet).
902 { "PROXY_RMV",
903 {&CNCMessageHandler::x_DoCmd_Remove,
904 "PROXY_ReMoVe",
905 fNeedsBlobAccess | fNoBlobAccessStats | fDoNotProxyToPeers,
906 eNCCreate,
907 eProxyRemove},
908 // Name of cache for blob (for NC-generated blob keys this will be
909 // empty).
910 { { "cache", eNSPT_Str, eNSPA_Required },
911 // Blob's key.
912 { "key", eNSPT_Str, eNSPA_Required },
913 // Blob's subkey (for NC-generated blob keys this will be empty).
914 { "subkey", eNSPT_Str, eNSPA_Required },
915 // Blob's version (for NC-generated blob keys this will be equal to 0).
916 { "version", eNSPT_Int, eNSPA_Required },
917 // Quorum to use for this operation.
918 { "qrum", eNSPT_Int, eNSPA_Required },
919 // Client IP for application requesting the info.
920 { "ip", eNSPT_Str, eNSPA_Required },
921 // Session ID for application requesting the info.
922 { "sid", eNSPT_Str, eNSPA_Required },
923 // Password for blob access.
924 { "pass", eNSPT_Str, eNSPA_Optional }
925 } },
926 // Remove the blob. This command is sent by other NC servers in certain scenarios only
927 { "COPY_RMV",
928 {&CNCMessageHandler::x_DoCmd_Remove,
929 "COPY_ReMoVe",
930 fNeedsBlobAccess | fNoBlobAccessStats | fDoNotProxyToPeers
931 | fDoNotCheckPassword | fNoBlobVersionCheck | fCopyLogEvent,
932 eNCRead,
933 eProxyRemove},
934 // Name of cache for blob (for NC-generated blob keys this will be
935 // empty).
936 { { "cache", eNSPT_Str, eNSPA_Required },
937 // Blob's key.
938 { "key", eNSPT_Str, eNSPA_Required },
939 // Blob's subkey (for NC-generated blob keys this will be empty).
940 { "subkey", eNSPT_Str, eNSPA_Required },
941 // time of the update
942 { "cr_time", eNSPT_Int, eNSPA_Required },
943 // Server_id, requestor
944 { "cr_srv", eNSPT_Int, eNSPA_Required }
945 }
946 },
947 // Read meta information about the blob. This command is sent only by other
948 // NC servers in response to client's GETMETA command when the server where
949 // client have sent initial command cannot execute it locally for any reason
950 // (slot is not processed by that server, or initial database caching
951 // is not completed yet, or it was determined that this server has latest
952 // version of the blob).
953 { "PROXY_GETMETA",
954 {&CNCMessageHandler::x_DoCmd_GetMeta,
955 "PROXY_GETMETA",
956 eProxyBlobRead | fDoNotCheckPassword | fDoNotProxyToPeers,
957 eNCRead,
958 eProxyGetMeta},
959 // Name of cache for blob (for NC-generated blob keys this will be
960 // empty).
961 { { "cache", eNSPT_Str, eNSPA_Required },
962 // Blob's key.
963 { "key", eNSPT_Str, eNSPA_Required },
964 // Blob's subkey (for NC-generated blob keys this will be empty).
965 { "subkey", eNSPT_Str, eNSPA_Required },
966 // Quorum to use for this operation.
967 { "qrum", eNSPT_Int, eNSPA_Required },
968 // Flag whether local execution of this command should be forced
969 // no matter what, i.e. if this flag is set in cases when normal
970 // GETMETA command would have been proxied to other servers an error
971 // should be returned.
972 { "local", eNSPT_Int, eNSPA_Required },
973 // Client IP for application requesting the info.
974 { "ip", eNSPT_Str, eNSPA_Required },
975 // Session ID for application requesting the info.
976 { "sid", eNSPT_Str, eNSPA_Required },
977 // HTTP flag: read input in NC format, write output in HTTP one
978 { "http", eNSPT_Int, eNSPA_Optional }
979 } },
980 // Start periodic synchronization session. Command is sent only by other
981 // NC servers when CNCActiveSyncControl in them decides to start
982 // synchronization. Response to this command contains list of events from
983 // sync logs of this server which need to be synchronized. Or if this
984 // server understands that synchronization using blob lists is needed then
985 // first line of response will contain ALL_BLOBS word and then full list
986 // of blobs in this slot will be sent.
987 { "SYNC_START",
988 {&CNCMessageHandler::x_DoCmd_SyncStart,
989 "SYNC_START",
990 // this command does not need space, but, to do sync, we WILL need space
991 fNeedsSpaceAsPeer |
992 fNeedsStorageCache | fNeedsLowerPriority | fNeedsAdminClient
993 , eNCNone, eProxyNone},
994 // Server id of the server starting synchronization.
995 { { "srv_id", eNSPT_Int, eNSPA_Required },
996 // Slot to start synchronization on.
997 { "slot", eNSPT_Int, eNSPA_Required },
998 // Last synchronized record number (in sync log) of _that_ server
999 // as _that_ server thinks.
1000 { "rec_my", eNSPT_Int, eNSPA_Required },
1001 // Last synchronized record number (in sync log) of _this_ server
1002 // as _that_ server thinks.
1003 { "rec_your",eNSPT_Int, eNSPA_Required } } },
1004 // Get full list of blobs for the slot. Command is sent only by other NC
1005 // servers when that server decides that synchronization using blob lists
1006 // is needed. Command can be sent only after successful execution of
1007 // SYNC_START command.
1008 { "SYNC_BLIST",
1009 {&CNCMessageHandler::x_DoCmd_SyncBlobsList,
1010 "SYNC_BLIST",
1011 eRunsInStartedSync, eNCNone, eProxyNone},
1012 // Server id of the server managing the synchronization.
1013 { { "srv_id", eNSPT_Int, eNSPA_Required },
1014 // Slot that synchronization is started on.
1015 { "slot", eNSPT_Int, eNSPA_Required } } },
1016 // Write blob contents. This command is sent only by other NC servers
1017 // during synchronization session if some blob was written on that server
1018 // and the same data didn't make it to this server yet.
1019 { "SYNC_PUT",
1020 {&CNCMessageHandler::x_DoCmd_CopyPut,
1021 "SYNC_PUT",
1022 eSyncBlobCmd | fNeedsSpaceAsPeer
1023 | fReadExactBlobSize | fCopyLogEvent,
1024 eNCCopyCreate, eProxyNone},
1025 // Server id of the server managing the synchronization.
1026 { { "srv_id", eNSPT_Int, eNSPA_Required },
1027 // Slot that synchronization is started on.
1028 { "slot", eNSPT_Int, eNSPA_Required },
1029 // Name of cache for blob (for NC-generated blob keys this will be
1030 // empty).
1031 { "cache", eNSPT_Str, eNSPA_Required },
1032 // Blob's key.
1033 { "key", eNSPT_Str, eNSPA_Required },
1034 // Blob's subkey (for NC-generated blob keys this will be empty).
1035 { "subkey", eNSPT_Str, eNSPA_Required },
1036 // Blob's version (for NC-generated blob keys this will be equal to 0).
1037 { "version", eNSPT_Int, eNSPA_Required },
1038 // MD5 checksum of blob's password.
1039 { "md5_pass",eNSPT_Str, eNSPA_Required },
1040 // Creation time of the blob (microseconds since epoch).
1041 { "cr_time", eNSPT_Int, eNSPA_Required },
1042 // Time-to-live for the blob.
1043 { "ttl", eNSPT_Int, eNSPA_Required },
1044 // Dead-time for the blob (can be greater than expiration time).
1045 { "dead", eNSPT_Int, eNSPA_Required },
1046 // Expiration time for the blob
1047 { "exp", eNSPT_Int, eNSPA_Required },
1048 // Blob size.
1049 { "size", eNSPT_Int, eNSPA_Required },
1050 // Time-to-live for blob's version.
1051 { "ver_ttl", eNSPT_Int, eNSPA_Required },
1052 // Blob's version expiration time.
1053 { "ver_dead",eNSPT_Int, eNSPA_Required },
1054 // Server_id of the server where blob was created.
1055 { "cr_srv", eNSPT_Int, eNSPA_Required },
1056 // Id of the blob on the server where it was created.
1057 { "cr_id", eNSPT_Int, eNSPA_Required },
1058 // Record number of the event of blob creation in synchronization
1059 // logs of the server where blob was created.
1060 { "log_rec", eNSPT_Int, eNSPA_Required },
1061 // Version of the command. Field exists for protocol backwards
1062 // compatibility with previous versions of NC. In current NC this
1063 // version is always 1.
1064 { "cmd_ver", eNSPT_Int, eNSPA_Optional, "0" } } },
1065 // Prolong the blob's life. This command is sent only by other NC servers
1066 // during synchronization session if some blob was prolonged on that server
1067 // and the same prolongation didn't happen on this server yet.
1068 { "SYNC_PROLONG",
1069 {&CNCMessageHandler::x_DoCmd_CopyProlong,
1070 "SYNC_PROLONG",
1071 eSyncBlobCmd,
1072 eNCRead, eProxyNone},
1073 // Server id of the server managing the synchronization.
1074 { { "srv_id", eNSPT_Int, eNSPA_Required },
1075 // Slot that synchronization is started on.
1076 { "slot", eNSPT_Int, eNSPA_Required },
1077 // Name of cache for blob (for NC-generated blob keys this will be
1078 // empty).
1079 { "cache", eNSPT_Str, eNSPA_Required },
1080 // Blob's key.
1081 { "key", eNSPT_Str, eNSPA_Required },
1082 // Blob's subkey (for NC-generated blob keys this will be empty).
1083 { "subkey", eNSPT_Str, eNSPA_Required },
1084 // Creation time of the blob (microseconds since epoch).
1085 { "cr_time", eNSPT_Int, eNSPA_Required },
1086 // Server_id of the server where blob was created.
1087 { "cr_srv", eNSPT_Int, eNSPA_Required },
1088 // Id of the blob on the server where it was created.
1089 { "cr_id", eNSPT_Int, eNSPA_Required },
1090 // Dead-time for the blob (can be greater than expiration time).
1091 { "dead", eNSPT_Int, eNSPA_Required },
1092 // Expiration time for the blob
1093 { "exp", eNSPT_Int, eNSPA_Required },
1094 // Blob's version expiration time.
1095 { "ver_dead",eNSPT_Int, eNSPA_Required },
1096 // Time of creation of initial record in synchronization log about
1097 // this operation.
1098 { "log_time",eNSPT_Int, fNSPA_Optional },
1099 // Server that first made the blob's life prolongation.
1100 { "log_srv", eNSPT_Int, eNSPA_Optional },
1101 // Record number of the initial record in synchronization log about
1102 // this operation.
1103 { "log_rec", eNSPT_Int, eNSPA_Optional } } },
1104 // Read blob contents. This command is sent only by other NC servers
1105 // during synchronization session if some blob was written on this server
1106 // and the same data didn't make it to that server yet.
1107 { "SYNC_GET",
1108 {&CNCMessageHandler::x_DoCmd_SyncGet,
1109 "SYNC_GET",
1110 eSyncBlobCmd,
1111 eNCReadData, eProxyNone},
1112 // Server id of the server managing the synchronization.
1113 { { "srv_id", eNSPT_Int, eNSPA_Required },
1114 // Slot that synchronization is started on.
1115 { "slot", eNSPT_Int, eNSPA_Required },
1116 // Name of cache for blob (for NC-generated blob keys this will be
1117 // empty).
1118 { "cache", eNSPT_Str, eNSPA_Required },
1119 // Blob's key.
1120 { "key", eNSPT_Str, eNSPA_Required },
1121 // Blob's subkey (for NC-generated blob keys this will be empty).
1122 { "subkey", eNSPT_Str, eNSPA_Required },
1123 // Time of creation of initial record in synchronization log about
1124 // this operation.
1125 { "log_time",eNSPT_Int, eNSPA_Required },
1126 // Creation time of the blob (microseconds since epoch).
1127 { "cr_time", eNSPT_Int, eNSPA_Required },
1128 // Server_id of the server where blob was created.
1129 { "cr_srv", eNSPT_Int, eNSPA_Required },
1130 // Id of the blob on the server where it was created.
1131 { "cr_id", eNSPT_Int, eNSPA_Required } } },
1132 // Get information necessary to prolong the blob's life. This command
1133 // is sent only by other NC servers during synchronization session if some
1134 // blob was prolonged on this server and the same prolongation didn't
1135 // happen on that server yet.
1136 { "SYNC_PROINFO",
1137 {&CNCMessageHandler::x_DoCmd_SyncProlongInfo,
1138 "SYNC_PROINFO",
1139 eSyncBlobCmd,
1140 eNCRead, eProxyNone},
1141 // Server id of the server managing the synchronization.
1142 { { "srv_id", eNSPT_Int, eNSPA_Required },
1143 // Slot that synchronization is started on.
1144 { "slot", eNSPT_Int, eNSPA_Required },
1145 // Name of cache for blob (for NC-generated blob keys this will be
1146 // empty).
1147 { "cache", eNSPT_Str, eNSPA_Required },
1148 // Blob's key.
1149 { "key", eNSPT_Str, eNSPA_Required },
1150 // Blob's subkey (for NC-generated blob keys this will be empty).
1151 { "subkey", eNSPT_Str, eNSPA_Required } } },
1152 // "Commit" the synchronization session. This command is sent only by other
1153 // NC servers at the end of synchronization session when all necessary
1154 // commands have been executed successfully.
1155 { "SYNC_COMMIT",
1156 {&CNCMessageHandler::x_DoCmd_SyncCommit,
1157 "SYNC_COMMIT",
1158 eRunsInStartedSync | fProhibitsSyncAbort, eNCNone, eProxyNone},
1159 // Server id of the server managing the synchronization.
1160 { { "srv_id", eNSPT_Int, eNSPA_Required },
1161 // Slot that synchronization is started on.
1162 { "slot", eNSPT_Int, eNSPA_Required },
1163 // Last synchronized record number (in sync log) of _that_ server.
1164 { "rec_my", eNSPT_Int, eNSPA_Required },
1165 // Last synchronized record number (in sync log) of _this_ server.
1166 { "rec_your",eNSPT_Int, eNSPA_Required } } },
1167 // "Cancel" the synchronization session. This command is sent only by other
1168 // NC servers at the end of synchronization session when either this server
1169 // requested or that server decided that synchronization should be aborted
1170 // despite the successful execution of all commands. Reason for
1171 // cancellation could be some server going to shutdown, or requirement to
1172 // clean sync logs (and synchronization already executes for too long).
1173 // The cancellation doesn't cancel any commands already executed in this
1174 // synchronization session. It exists only to quickly mark this
1175 // synchronization as no longer executing so that NC could start
1176 // synchronization with some other server.
1177 { "SYNC_CANCEL",
1178 {&CNCMessageHandler::x_DoCmd_SyncCancel,
1179 "SYNC_CANCEL",
1180 eRunsInStartedSync | fProhibitsSyncAbort, eNCNone, eProxyNone},
1181 // Server id of the server managing the synchronization.
1182 { { "srv_id", eNSPT_Int, eNSPA_Required },
1183 // Slot that synchronization is started on.
1184 { "slot", eNSPT_Int, eNSPA_Required } } },
1185 // Get meta information about the blob. Command for "NetCache" clients.
1186 { "GETMETA",
1187 {&CNCMessageHandler::x_DoCmd_GetMeta,
1188 "GETMETA",
1189 eClientBlobRead | fDoNotCheckPassword,
1190 eNCRead,
1191 eProxyGetMeta},
1192 // Key of the blob
1193 { { "key", eNSPT_NCID, eNSPA_Required },
1194 // Flag forcing local execution of the command (without forwarding
1195 // to other servers and without searching for blob on them).
1196 { "local", eNSPT_Int, eNSPA_Optional },
1197 // Quorum to use for this operation.
1198 { "qrum", eNSPT_Int, eNSPA_Optional },
1199 // Client IP for application sending the command.
1200 { "ip", eNSPT_Str, fNSPA_Optional },
1201 // Session ID for application sending the command.
1202 { "sid", eNSPT_Str, eNSPA_Optional },
1203 // request Hit ID
1204 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
1205 } },
1206 // Get meta information about the blob. Command for "ICache" clients.
1207 { "GETMETA",
1208 {&CNCMessageHandler::x_DoCmd_GetMeta,
1209 "IC_GETMETA",
1210 eClientBlobRead | fDoNotCheckPassword,
1211 eNCRead,
1212 eProxyGetMeta},
1213 // Name of cache for blob.
1214 { { "cache", eNSPT_Id, eNSPA_ICPrefix },
1215 // Blob's key.
1216 { "key", eNSPT_Str, eNSPA_Required },
1217 // Blob's version.
1218 { "version", eNSPT_Int, eNSPA_Required },
1219 // Blob's subkey.
1220 { "subkey", eNSPT_Str, eNSPA_Required },
1221 // Flag forcing local execution of the command (without forwarding
1222 // to other servers and without searching for blob on them).
1223 { "local", eNSPT_Int, eNSPA_Optional },
1224 // Quorum to use for this operation.
1225 { "qrum", eNSPT_Int, eNSPA_Optional },
1226 // Client IP for application sending the command.
1227 { "ip", eNSPT_Str, fNSPA_Optional },
1228 // Session ID for application sending the command.
1229 { "sid", eNSPT_Str, eNSPA_Optional },
1230 // request Hit ID
1231 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
1232 } },
1233 // Prolong blob's life for a specific number of seconds from current time.
1234 // Command provides a minimum time this blob should be still available for.
1235 // If blob's expiration time was already later than that this command is
1236 // a no-op.
1237 { "PROLONG",
1238 {&CNCMessageHandler::x_DoCmd_Prolong,
1239 "PROLONG",
1240 eClientBlobRead,
1241 eNCRead,
1242 eProxyProlong},
1243 // Name of cache for blob (for NC-generated blob keys this will be
1244 // empty).
1245 { { "cache", eNSPT_Str, eNSPA_Required },
1246 // Blob's key.
1247 { "key", eNSPT_Str, eNSPA_Required },
1248 // Blob's subkey (for NC-generated blob keys this will be empty).
1249 { "subkey", eNSPT_Str, eNSPA_Required },
1250 // Period of time for the blob to be available.
1251 { "ttl", eNSPT_Int, eNSPA_Required },
1252 // Quorum to use for this operation.
1253 { "qrum", eNSPT_Int, eNSPA_Optional },
1254 // Client IP for application sending the command.
1255 { "ip", eNSPT_Str, fNSPA_Optional },
1256 // Session ID for application sending the command.
1257 { "sid", eNSPT_Str, eNSPA_Optional },
1258 // Password for blob access.
1259 { "pass", eNSPT_Str, eNSPA_Optional },
1260 // request Hit ID
1261 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
1262 } },
1263 // Prolong blob's life for a specific number of seconds from current time.
1264 // This command is sent only by other NC servers in response to client's
1265 // PROLONG command when the server where client have sent initial command
1266 // cannot execute it locally for any reason (slot is not processed by that
1267 // server, or initial database caching is not completed yet, or it was
1268 // determined that this server has latest version of the blob).
1269 { "PROXY_PROLONG",
1270 {&CNCMessageHandler::x_DoCmd_Prolong,
1271 "PROXY_PROLONG",
1272 eProxyBlobRead | fDoNotProxyToPeers,
1273 eNCRead,
1274 eProxyProlong},
1275 // Name of cache for blob (for NC-generated blob keys this will be
1276 // empty).
1277 { { "cache", eNSPT_Str, eNSPA_Required },
1278 // Blob's key.
1279 { "key", eNSPT_Str, eNSPA_Required },
1280 // Blob's subkey (for NC-generated blob keys this will be empty).
1281 { "subkey", eNSPT_Str, eNSPA_Required },
1282 // Period of time for the blob to be available.
1283 { "ttl", eNSPT_Int, eNSPA_Required },
1284 // Quorum to use for this operation.
1285 { "qrum", eNSPT_Int, eNSPA_Required },
1286 // Flag whether local execution of this command should be forced
1287 // no matter what, i.e. if this flag is set in cases when normal GET2
1288 // command would have been proxied to other servers an error should
1289 // be returned.
1290 { "local", eNSPT_Int, eNSPA_Required },
1291 // Value of the flag "search_on_read" to use with this command, i.e.
1292 // whether this server should search the blob if it's not found
1293 // locally.
1294 { "srch", eNSPT_Int, eNSPA_Required },
1295 // Client IP for application sending the command.
1296 { "ip", eNSPT_Str, eNSPA_Required },
1297 // Session ID for application sending the command.
1298 { "sid", eNSPT_Str, eNSPA_Required },
1299 // Password for blob access.
1300 { "pass", eNSPT_Str, eNSPA_Optional } } },
1301 /*{ "BLOBSLIST",
1302 {&CNCMessageHandler::x_DoCmd_GetBlobsList,
1303 "BLOBSLIST",
1304 fNeedsStorageCache + fNeedsAdminClient} },*/
1305 // Write blob contents. Deprecated command used now only by old clients.
1306 // This command is the same as PUT3 except it uses connection closing as
1307 // legitimate EOF marker for blob's data.
1308 { "PUT2",
1309 {&CNCMessageHandler::x_DoCmd_Put,
1310 "PUT2",
1311 eClientBlobWrite | fCanGenerateKey | fCursedPUT2Cmd,
1312 eNCCreate,
1313 eProxyWrite},
1314 // Time-to-live for the blob.
1315 { { "ttl", eNSPT_Int, eNSPA_Optional },
1316 // Key of the blob (if skipped or empty then new one will be created).
1317 { "key", eNSPT_NCID, eNSPA_Optional } } },
1318 // Shutdown the server
1319 { "SHUTDOWN",
1320 {&CNCMessageHandler::x_DoCmd_Shutdown,
1321 "SHUTDOWN",
1322 fNeedsAdminClient, eNCNone, eProxyNone},
1323 // Client IP for application sending the command.
1324 { { "ip", eNSPT_Str, fNSPA_Optional },
1325 // Session ID for application sending the command.
1326 { "sid", eNSPT_Str, eNSPA_Optional },
1327 // drain: wait until all BLOBs are expired, then shutdown
1328 { "drain", eNSPT_Int, eNSPA_Optional },
1329 // reset: shutdown and leave database guard on disk (CXX-10401)
1330 { "reset", eNSPT_Int, eNSPA_Optional },
1331 // request Hit ID
1332 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
1333 } },
1334 // Get server statistics.
1335 { "GETSTAT",
1336 {&CNCMessageHandler::x_DoCmd_GetStat, "GETSTAT"},
1337 // Flag showing whether current (value is 0) or previous (value is 1)
1338 // statistics period should be shown.
1339 { { "prev", eNSPT_Int, fNSPA_Optional, "0" },
1340 // Type of statistics period to show. See top of nc_stat.cpp for
1341 // list of all possible period types.
1342 { "type", eNSPT_Str, eNSPA_Optional, "life" },
1343 // Client IP for application sending the command.
1344 { "ip", eNSPT_Str, fNSPA_Optional },
1345 // Session ID for application sending the command.
1346 { "sid", eNSPT_Str, eNSPA_Optional },
1347 // request Hit ID
1348 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
1349 } },
1350 // Read full ini-file used by NetCache for configuration.
1351 { "GETCONF",
1352 {&CNCMessageHandler::x_DoCmd_GetConfig, "GETCONF",
1353 fNoCmdFlags, eNCNone, eProxyNone},
1354 {
1355 // Netcached.ini section name
1356 { "section", eNSPT_Str, eNSPA_Optional },
1357 // when section name is "netcache", setup for this port
1358 { "port", eNSPT_Str, eNSPA_Optional },
1359 // when section name is "netcache", setup for this cache
1360 { "cache", eNSPT_Str, eNSPA_Optional },
1361 // Client IP for application sending the command.
1362 { "ip", eNSPT_Str, fNSPA_Optional },
1363 // Session ID for application sending the command.
1364 { "sid", eNSPT_Str, eNSPA_Optional },
1365 // request Hit ID
1366 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
1367 } },
1368 // Get state information, added in v6.8.6
1369 { "INFO",
1370 {&CNCMessageHandler::x_DoCmd_GetConfig, "INFO",
1371 fNoCmdFlags, eNCNone, eProxyNone},
1372 {
1373 // Netcached.ini section name
1374 { "section", eNSPT_Str, fNSPA_Required },
1375 // when section name is "netcache", setup for this port
1376 { "port", eNSPT_Str, eNSPA_Optional },
1377 // when section name is "netcache", setup for this cache
1378 { "cache", eNSPT_Str, eNSPA_Optional },
1379 // Client IP for application sending the command.
1380 { "ip", eNSPT_Str, fNSPA_Optional },
1381 // Session ID for application sending the command.
1382 { "sid", eNSPT_Str, eNSPA_Optional },
1383 // request Hit ID
1384 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
1385 } },
1386 // Acknowledge alert, added in v6.8.6
1387 { "ACKALERT",
1388 {&CNCMessageHandler::x_DoCmd_AckAlert, "ACKALERT",
1389 fNeedsAdminClient, eNCNone, eProxyNone},
1390 {
1391 // Alert name
1392 { "alert", eNSPT_Str, fNSPA_Required },
1393 // User name
1394 { "user", eNSPT_Str, fNSPA_Required },
1395 // Client IP for application sending the command.
1396 { "ip", eNSPT_Str, fNSPA_Optional },
1397 // Session ID for application sending the command.
1398 { "sid", eNSPT_Str, eNSPA_Optional },
1399 // request Hit ID
1400 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
1401 } },
1402 // Re-read ini-file used by NetCache for configuration.
1403 // Find changes and reconfigure
1404 // Only few changes are supported
1405 { "RECONF",
1406 {&CNCMessageHandler::x_DoCmd_ReConfig,
1407 "RECONF",
1408 fNeedsAdminClient, eNCNone, eProxyNone},
1409 {
1410 // Netcached.ini section name
1411 { "section", eNSPT_Str, eNSPA_Required },
1412 // Client IP for application sending the command.
1413 { "ip", eNSPT_Str, fNSPA_Optional },
1414 // Session ID for application sending the command.
1415 { "sid", eNSPT_Str, eNSPA_Optional },
1416 // request Hit ID
1417 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
1418 } },
1419
1420 { "PURGE",
1421 {&CNCMessageHandler::x_DoCmd_Purge,
1422 "PURGE",
1423 fNoCmdFlags, eNCNone, eProxyNone},
1424 {
1425 // Cache name.
1426 { "cache", eNSPT_Str, eNSPA_Required },
1427 // Client IP for application sending the command.
1428 { "ip", eNSPT_Str, fNSPA_Optional },
1429 // Session ID for application sending the command.
1430 { "sid", eNSPT_Str, eNSPA_Optional },
1431 // request Hit ID
1432 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
1433 } },
1434 { "COPY_PURGE",
1435 {&CNCMessageHandler::x_DoCmd_CopyPurge,
1436 "COPY_PURGE", fNoCmdFlags, eNCNone, eProxyNone },
1437 // Cache name.
1438 { { "cache", eNSPT_Str, eNSPA_Required },
1439 // forget blobs created earlier than cr_time
1440 { "cr_time", eNSPT_Int, eNSPA_Required } } },
1441
1442 // Added in 6.11.7, CXX-8948
1443 { "PURGE2",
1444 {&CNCMessageHandler::x_DoCmd_Purge,
1445 "PURGE",
1446 fNoCmdFlags, eNCNone, eProxyNone},
1447 // Cache name.
1448 { { "cache", eNSPT_Id, eNSPA_ICPrefix },
1449 // Blob's key.
1450 { "key", eNSPT_Str, eNSPA_Required },
1451 // Client IP for application sending the command.
1452 { "ip", eNSPT_Str, fNSPA_Optional },
1453 // Session ID for application sending the command.
1454 { "sid", eNSPT_Str, eNSPA_Optional },
1455 // request Hit ID
1456 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
1457 } },
1458 // Added in 6.11.7, CXX-8948
1459 { "COPY_PURGE2",
1460 {&CNCMessageHandler::x_DoCmd_CopyPurge,
1461 "COPY_PURGE", fNoCmdFlags, eNCNone, eProxyNone },
1462 // Cache name.
1463 { { "cache", eNSPT_Str, eNSPA_Required },
1464 // Blob's key.
1465 { "key", eNSPT_Str, eNSPA_Required },
1466 // forget blobs created earlier than cr_time
1467 { "cr_time", eNSPT_Int, eNSPA_Required } } },
1468
1469 // Get list of blobs by mask: "cache,key,*"
1470 // Added in 6.9.0, CXX-6246
1471 { "BLIST",
1472 {&CNCMessageHandler::x_DoCmd_GetBList,
1473 "BLIST",
1474 fComesFromClient |
1475 fNeedsStorageCache | fNeedsBlobList | fDoNotCheckPassword,
1476 eNCNone, eProxyGetBList},
1477 // Name of cache for blob.
1478 { { "cache", eNSPT_Id, eNSPA_ICPrefix },
1479 // Blob's key.
1480 { "key", eNSPT_Str, eNSPA_Required },
1481 // Blob's subkey.
1482 { "subkey", eNSPT_Str, eNSPA_Optional },
1483 { "local", eNSPT_Int, eNSPA_Optional },
1484 // Client IP for application sending the command.
1485 { "ip", eNSPT_Str, fNSPA_Optional },
1486 // Session ID for application sending the command.
1487 { "sid", eNSPT_Str, eNSPA_Optional },
1488 // request Hit ID
1489 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
1490 } },
1491 // added in v6.11.0 (CXX-8737)
1492 { "BLIST2",
1493 {&CNCMessageHandler::x_DoCmd_GetBList,
1494 "BLIST2",
1495 fComesFromClient |
1496 fNeedsStorageCache | fNeedsBlobList | fDoNotProxyToPeers | fDoNotCheckPassword,
1497 eNCNone, eProxyGetBList2},
1498 // Name of cache for blob.
1499 { { "cache", eNSPT_Id, eNSPA_ICPrefix },
1500 // Blob's key.
1501 { "key", eNSPT_Str, eNSPA_Optional },
1502 // Blob's subkey.
1503 { "subkey", eNSPT_Str, eNSPA_Optional },
1504 { "local", eNSPT_Int, eNSPA_Optional },
1505
1506 // Created more than N seconds ago
1507 { "fcr_ago_ge", eNSPT_Int, eNSPA_Optional },
1508 // Created less than N seconds ago
1509 { "fcr_ago_lt", eNSPT_Int, eNSPA_Optional },
1510 // Created more than N seconds since epoch
1511 { "fcr_epoch_ge", eNSPT_Int, eNSPA_Optional },
1512 // Created less than N seconds since epoch
1513 { "fcr_epoch_lt", eNSPT_Int, eNSPA_Optional },
1514 // Blob will expire in more than N seconds from now
1515 { "fexp_now_ge", eNSPT_Int, eNSPA_Optional },
1516 // Blob will expire in less than N seconds from now
1517 { "fexp_now_lt", eNSPT_Int, eNSPA_Optional },
1518 // Blob will expire in more than N seconds since epoch
1519 { "fexp_epoch_ge", eNSPT_Int, eNSPA_Optional },
1520 // Blob will expire in less than N seconds since epoch
1521 { "fexp_epoch_lt", eNSPT_Int, eNSPA_Optional },
1522 // Version will expire in more than N seconds from now
1523 { "fvexp_now_ge", eNSPT_Int, eNSPA_Optional },
1524 // Version will expire in less than N seconds from now
1525 { "fvexp_now_lt", eNSPT_Int, eNSPA_Optional },
1526 // Version will expire in more than N seconds since epoch
1527 { "fvexp_epoch_ge", eNSPT_Int, eNSPA_Optional },
1528 // Version will expire in less than N seconds since epoch
1529 { "fvexp_epoch_lt", eNSPT_Int, eNSPA_Optional },
1530 // Server_id of the server where blob was created.
1531 { "fcr_srv", eNSPT_Int, eNSPA_Optional },
1532 // blob bigger than this size
1533 { "fsize_ge", eNSPT_Int, eNSPA_Optional },
1534 // blob smaller than this size
1535 { "fsize_lt", eNSPT_Int, eNSPA_Optional },
1536 // Client IP for application sending the command.
1537 { "ip", eNSPT_Str, fNSPA_Optional },
1538 // Session ID for application sending the command.
1539 { "sid", eNSPT_Str, eNSPA_Optional },
1540 // request Hit ID
1541 { "ncbi_phid", eNSPT_Str, eNSPA_Optional }
1542 } },
1543 { "PROXY_BLIST",
1544 {&CNCMessageHandler::x_DoCmd_GetBList,
1545 "PROXY_BLIST",
1546 fNeedsStorageCache | fNeedsBlobList | fDoNotProxyToPeers | fDoNotCheckPassword,
1547 eNCNone, eProxyNone},
1548 // Name of cache for blob.
1549 { { "cache", eNSPT_Str, eNSPA_Required },
1550 // Blob's key.
1551 { "key", eNSPT_Str, eNSPA_Required },
1552 // Blob's subkey.
1553 { "subkey", eNSPT_Str, eNSPA_Required },
1554 { "local", eNSPT_Int, eNSPA_Required }
1555 } },
1556 // added in v6.11.0 (CXX-8737)
1557 { "PROXY_BLIST2",
1558 {&CNCMessageHandler::x_DoCmd_GetBList,
1559 "PROXY_BLIST2",
1560 fNeedsStorageCache | fNeedsBlobList | fDoNotProxyToPeers | fDoNotCheckPassword,
1561 eNCNone, eProxyNone},
1562 // Name of cache for blob.
1563 { { "cache", eNSPT_Str, eNSPA_Required },
1564 // Blob's key.
1565 { "key", eNSPT_Str, eNSPA_Required },
1566 // Blob's subkey.
1567 { "subkey", eNSPT_Str, eNSPA_Required },
1568 { "local", eNSPT_Int, eNSPA_Required },
1569 // Created more than N seconds ago
1570 { "fcr_ago_ge", eNSPT_Int, eNSPA_Optional },
1571 // Created less than N seconds ago
1572 { "fcr_ago_lt", eNSPT_Int, eNSPA_Optional },
1573 // Created more than N seconds since epoch
1574 { "fcr_epoch_ge", eNSPT_Int, eNSPA_Optional },
1575 // Created less than N seconds since epoch
1576 { "fcr_epoch_lt", eNSPT_Int, eNSPA_Optional },
1577 // Will expire in more than N seconds from now
1578 { "fexp_now_ge", eNSPT_Int, eNSPA_Optional },
1579 // Will expire in less than N seconds from now
1580 { "fexp_now_lt", eNSPT_Int, eNSPA_Optional },
1581 // Will expire in more than N seconds since epoch
1582 { "fexp_epoch_ge", eNSPT_Int, eNSPA_Optional },
1583 // Will expire in less than N seconds since epoch
1584 { "fexp_epoch_lt", eNSPT_Int, eNSPA_Optional },
1585 // Will expire in more than N seconds from now
1586 { "fvexp_now_ge", eNSPT_Int, eNSPA_Optional },
1587 // Will expire in less than N seconds from now
1588 { "fvexp_now_lt", eNSPT_Int, eNSPA_Optional },
1589 // Will expire in more than N seconds since epoch
1590 { "fvexp_epoch_ge", eNSPT_Int, eNSPA_Optional },
1591 // Will expire in less than N seconds since epoch
1592 { "fvexp_epoch_lt", eNSPT_Int, eNSPA_Optional },
1593 // Server_id of the server where blob was created.
1594 { "fcr_srv", eNSPT_Int, eNSPA_Optional },
1595 // blob bigger than this size
1596 { "fsize_ge", eNSPT_Int, eNSPA_Optional },
1597 // blob smaller than this size
1598 { "fsize_lt", eNSPT_Int, eNSPA_Optional }
1599 } },
1600
1601 // HTTP commands
1602 { "DELETE",
1603 {&CNCMessageHandler::x_DoCmd_Remove,
1604 "DELETE",
1605 fNeedsBlobAccess | fNoBlobAccessStats | fIsHttp,
1606 eNCCreate,
1607 eProxyRemove}
1608 },
1609 { "GET",
1610 {&CNCMessageHandler::x_DoCmd_Get,
1611 "GET",
1612 eClientBlobRead | fIsHttp,
1613 eNCReadData,
1614 eProxyRead}
1615 },
1616 { "HEAD",
1617 {&CNCMessageHandler::x_DoCmd_GetMeta,
1618 "HEAD",
1619 eClientBlobRead | fDoNotCheckPassword | fIsHttp,
1620 eNCRead,
1621 eProxyGetMeta}
1622 },
1623 { "POST",
1624 {&CNCMessageHandler::x_DoCmd_Put,
1625 "POST",
1626 eClientBlobWrite | fCanGenerateKey | fReadExactBlobSize | fSkipBlobEOF | fIsHttp,
1627 eNCCreate,
1628 eProxyWrite}
1629 },
1630 { "PUT",
1631 {&CNCMessageHandler::x_DoCmd_Put,
1632 "PUT",
1633 eClientBlobWrite | fIsHttp | fReadExactBlobSize | fSkipBlobEOF | fIsHttp,
1634 eNCCreate,
1635 eProxyWrite}
1636 },
1637
1638
1639 // All commands below are not implemented and mostly old ones not needed
1640 // anymore. One exception is RECONF - it would be nice to have it but it
1641 // needs some thinking on how to implement it.
1642
1643 { "REINIT", {&CNCMessageHandler::x_DoCmd_NotImplemented, "REINIT"} },
1644 { "REINIT", {&CNCMessageHandler::x_DoCmd_NotImplemented, "IC_REINIT"},
1645 { { "cache", eNSPT_Id, eNSPA_ICPrefix } } },
1646 // { "RECONF", {&CNCMessageHandler::x_DoCmd_NotImplemented, "RECONF"} },
1647 { "LOG", {&CNCMessageHandler::x_DoCmd_NotImplemented, "LOG"} },
1648 { "STAT", {&CNCMessageHandler::x_DoCmd_NotImplemented, "STAT"} },
1649 { "MONI", {&CNCMessageHandler::x_DoCmd_NotImplemented, "MONITOR"} },
1650 { "DROPSTAT", {&CNCMessageHandler::x_DoCmd_NotImplemented, "DROPSTAT"} },
1651 { "GBOW", {&CNCMessageHandler::x_DoCmd_NotImplemented, "GBOW"} },
1652 { "ISLK", {&CNCMessageHandler::x_DoCmd_NotImplemented, "ISLK"} },
1653 { "SMR", {&CNCMessageHandler::x_DoCmd_NotImplemented, "SMR"} },
1654 { "SMU", {&CNCMessageHandler::x_DoCmd_NotImplemented, "SMU"} },
1655 { "OK", {&CNCMessageHandler::x_DoCmd_NotImplemented, "OK"} },
1656 // will be used by HTTP
1657 // { "GET", {&CNCMessageHandler::x_DoCmd_NotImplemented, "GET"} },
1658 // { "PUT", {&CNCMessageHandler::x_DoCmd_NotImplemented, "PUT"} },
1659 { "REMOVE", {&CNCMessageHandler::x_DoCmd_NotImplemented, "REMOVE"} },
1660 { "STSP", {&CNCMessageHandler::x_DoCmd_NotImplemented, "IC_STSP"},
1661 { { "cache", eNSPT_Id, eNSPA_ICPrefix } } },
1662 { "GTSP", {&CNCMessageHandler::x_DoCmd_NotImplemented, "IC_GTSP"},
1663 { { "cache", eNSPT_Id, eNSPA_ICPrefix } } },
1664 { "SVRP", {&CNCMessageHandler::x_DoCmd_NotImplemented, "IC_SVRP"},
1665 { { "cache", eNSPT_Id, eNSPA_ICPrefix } } },
1666 { "GVRP", {&CNCMessageHandler::x_DoCmd_NotImplemented, "IC_GVRP"},
1667 { { "cache", eNSPT_Id, eNSPA_ICPrefix } } },
1668 { "PRG1", {&CNCMessageHandler::x_DoCmd_NotImplemented, "IC_PRG1"},
1669 { { "cache", eNSPT_Id, eNSPA_ICPrefix } } },
1670 { "REMK", {&CNCMessageHandler::x_DoCmd_NotImplemented, "IC_REMK"},
1671 { { "cache", eNSPT_Id, eNSPA_ICPrefix } } },
1672 { "GBLW", {&CNCMessageHandler::x_DoCmd_NotImplemented, "IC_GBLW"},
1673 { { "cache", eNSPT_Id, eNSPA_ICPrefix } } },
1674 { "ISOP", {&CNCMessageHandler::x_DoCmd_NotImplemented, "IC_ISOP"},
1675 { { "cache", eNSPT_Id, eNSPA_ICPrefix } } },
1676 { "GACT", {&CNCMessageHandler::x_DoCmd_NotImplemented, "IC_GACT"},
1677 { { "cache", eNSPT_Id, eNSPA_ICPrefix } } },
1678 { "GTOU", {&CNCMessageHandler::x_DoCmd_NotImplemented, "IC_GTOU"},
1679 { { "cache", eNSPT_Id, eNSPA_ICPrefix } } },
1680 { NULL }
1681 };
1682
1683 // List of arguments that can be in client authentication line.
1684 static SNSProtoArgument s_AuthArgs[] = {
1685 { "client", eNSPT_Str, eNSPA_Optional, "Unknown client" },
1686 { "params", eNSPT_Str, eNSPA_Ellipsis },
1687 { NULL }
1688 };
1689
1690
1691
1692
1693 /////////////////////////////////////////////////////////////////////////////
1694 // CNCMessageHandler implementation
1695
1696 inline void
x_ResetFlags(void)1697 CNCMessageHandler::x_ResetFlags(void)
1698 {
1699 TNCCmdFlags keep = m_Flags & fNoReplyOnFinish;
1700 m_Flags = fNoCmdFlags | keep;
1701 }
1702
1703 inline void
x_SetFlag(ENCCmdFlags flag)1704 CNCMessageHandler::x_SetFlag(ENCCmdFlags flag)
1705 {
1706 m_Flags |= flag;
1707 }
1708
1709 inline void
x_UnsetFlag(ENCCmdFlags flag)1710 CNCMessageHandler::x_UnsetFlag(ENCCmdFlags flag)
1711 {
1712 m_Flags &= ~flag;
1713 }
1714
1715 inline bool
x_IsFlagSet(ENCCmdFlags flag)1716 CNCMessageHandler::x_IsFlagSet(ENCCmdFlags flag)
1717 {
1718 return (m_Flags & flag) != 0;
1719 }
1720
1721 inline bool
x_IsUserFlagSet(ENCUserFlags flag)1722 CNCMessageHandler::x_IsUserFlagSet(ENCUserFlags flag)
1723 {
1724 return (m_UserFlags & flag) != 0;
1725 }
1726
1727 inline bool
x_IsCmdSucceeded(int cmd_status)1728 CNCMessageHandler::x_IsCmdSucceeded(int cmd_status)
1729 {
1730 return cmd_status < 300;
1731 }
1732
1733 //static Uint8 s_CntHndls = 0;
1734
CNCMessageHandler(void)1735 CNCMessageHandler::CNCMessageHandler(void)
1736 : m_Flags(0),
1737 m_Parser(s_CommandMap),
1738 m_CmdProcessor(NULL),
1739 m_BlobAccess(NULL),
1740 m_write_event(NULL),
1741 m_ChunkLen(0),
1742 m_SrvsIndex(0),
1743 m_ActiveHub(NULL)
1744 {
1745 LOG_CURRENT_FUNCTION
1746 #if __NC_TASKS_MONITOR
1747 m_TaskName = "CNCMessageHandler";
1748 #endif
1749 m_CopyBlobInfo = new SNCBlobVerData(nullptr);
1750 m_LatestBlobSum = new SNCBlobSummary();
1751 m_BlobFilter = new SNCBlobFilter;
1752 m_CmdLog.reserve(32);
1753
1754 SetState(&CNCMessageHandler::x_SocketOpened);
1755
1756 //Uint8 cnt = AtomicAdd(s_CntHndls, 1);
1757 //INFO("CNCMessageHandler, cnt=" << cnt);
1758 m_HttpMode = eNoHttp;
1759 }
1760
~CNCMessageHandler(void)1761 CNCMessageHandler::~CNCMessageHandler(void)
1762 {
1763 LOG_CURRENT_FUNCTION
1764 delete m_LatestBlobSum;
1765 delete m_CopyBlobInfo;
1766
1767 //Uint8 cnt = AtomicSub(s_CntHndls, 1);
1768 //INFO("~CNCMessageHandler, cnt=" << cnt);
1769 }
1770
1771 CNCMessageHandler::State
x_SocketOpened(void)1772 CNCMessageHandler::x_SocketOpened(void)
1773 {
1774 LOG_CURRENT_FUNCTION
1775 m_PrevCache.clear();
1776 m_ClientParams.clear();
1777 m_CntCmds = 0;
1778
1779 string host;
1780 Uint2 port = 0;
1781 GetPeerAddress(host, port);
1782 m_ClientParams["peer"] = host;
1783 m_ClientParams["pport"] = NStr::UIntToString(port);
1784 m_LocalPort = GetLocalPort();
1785 m_ClientParams["port"] = NStr::UIntToString(m_LocalPort);
1786
1787 return &CNCMessageHandler::x_ReadAuthMessage;
1788 }
1789
1790 CNCMessageHandler::State
x_CloseCmdAndConn(void)1791 CNCMessageHandler::x_CloseCmdAndConn(void)
1792 {
1793 LOG_CURRENT_FUNCTION
1794 if (GetDiagCtx()->GetRequestStatus() == eStatus_OK) {
1795 if (HasError() || !CanHaveMoreRead())
1796 GetDiagCtx()->SetRequestStatus(eStatus_PrematureClose);
1797 else if (CTaskServer::IsInShutdown())
1798 GetDiagCtx()->SetRequestStatus(eStatus_ShuttingDown);
1799 else
1800 GetDiagCtx()->SetRequestStatus(eStatus_CmdTimeout);
1801 }
1802 int status = GetDiagCtx()->GetRequestStatus();
1803 if (x_IsFlagSet(eBlobPut) && m_NCBlobKey.IsICacheKey()) {
1804 x_JournalBlobPutResult(status, m_NCBlobKey.PackedKey(), m_BlobSlot);
1805 }
1806 x_CleanCmdResources();
1807 GetDiagCtx()->SetRequestStatus(status);
1808 return &CNCMessageHandler::x_SaveStatsAndClose;
1809 }
1810
1811 CNCMessageHandler::State
x_SaveStatsAndClose(void)1812 CNCMessageHandler::x_SaveStatsAndClose(void)
1813 {
1814 LOG_CURRENT_FUNCTION
1815 //CNCStat::AddClosedConnection(conn_span, GetDiagCtx()->GetRequestStatus(), m_CntCmds);
1816 CNCStat::ConnClosing(m_CntCmds);
1817 return &CNCMessageHandler::x_PrintCmdsCntAndClose;
1818 }
1819
1820 CNCMessageHandler::State
x_PrintCmdsCntAndClose(void)1821 CNCMessageHandler::x_PrintCmdsCntAndClose(void)
1822 {
1823 LOG_CURRENT_FUNCTION
1824 CSrvDiagMsg().PrintExtra().PrintParam("cmds_cnt", m_CntCmds);
1825 CloseSocket();
1826 Terminate();
1827 return NULL;
1828 }
1829
1830 CNCMessageHandler::State
x_WriteInitWriteResponse(void)1831 CNCMessageHandler::x_WriteInitWriteResponse(void)
1832 {
1833 LOG_CURRENT_FUNCTION
1834 check_again:
1835 ENCClientHubStatus status = m_ActiveHub->GetStatus();
1836 if (status == eNCHubError || status == eNCHubSuccess) {
1837 m_LastPeerError = m_ActiveHub->GetErrMsg();
1838 SRV_LOG(Warning, "Error executing command on peer "
1839 << m_ActiveHub->GetFullPeerName() << ", peer says: "
1840 << m_LastPeerError);
1841 m_ActiveHub->Release();
1842 m_ActiveHub = NULL;
1843 return &CNCMessageHandler::x_ProxyToNextPeer;
1844 }
1845 else if (status != eNCHubCmdInProgress) {
1846 SRV_FATAL("Unexpected client status: " << status);
1847 }
1848 if (NeedEarlyClose())
1849 return &CNCMessageHandler::x_CloseCmdAndConn;
1850
1851 CNCActiveHandler* active = m_ActiveHub->GetHandler();
1852 if (!active->GotClientResponse())
1853 return NULL;
1854 // Intentionally re-reading the status because it could change since our
1855 // previous read at the beginning of the function.
1856 if (m_ActiveHub->GetStatus() != eNCHubCmdInProgress)
1857 goto check_again;
1858
1859 WriteText(active->GetCmdResponse()).WriteText("\n");
1860 Flush();
1861 if (NeedEarlyClose())
1862 return &CNCMessageHandler::x_CloseCmdAndConn;
1863
1864 return &CNCMessageHandler::x_ReadBlobSignature;
1865 }
1866
1867 CNCMessageHandler::State
x_ReadAuthMessage(void)1868 CNCMessageHandler::x_ReadAuthMessage(void)
1869 {
1870 LOG_CURRENT_FUNCTION
1871 if (NeedToClose() || CTaskServer::IsInSoftShutdown()) {
1872 if (CTaskServer::IsInShutdown())
1873 GetDiagCtx()->SetRequestStatus(eStatus_ShuttingDown);
1874 else
1875 GetDiagCtx()->SetRequestStatus(eStatus_CmdTimeout);
1876 return &CNCMessageHandler::x_SaveStatsAndClose;
1877 }
1878
1879 CTempString auth_line;
1880 if (!ReadLine(&auth_line)) {
1881 if (!HasError() && CanHaveMoreRead())
1882 return NULL;
1883 if (IsReadDataAvailable()) {
1884 GetDiagCtx()->SetRequestStatus(eStatus_PrematureClose);
1885 return &CNCMessageHandler::x_SaveStatsAndClose;
1886 }
1887 else {
1888 GetDiagCtx()->SetRequestStatus(eStatus_CmdTimeout);
1889 return &CNCMessageHandler::x_PrintCmdsCntAndClose;
1890 }
1891 }
1892
1893 m_HttpMode = eNoHttp;
1894 size_t auth_size = auth_line.size();
1895 if (auth_size > 8 && auth_line[auth_size-8] == 'H') {
1896 if (NStr::strncmp(auth_line.data() + auth_size - 8, "HTTP/1.", 7) == 0) {
1897 if (auth_line[auth_size-1] == '0') {
1898 m_HttpMode = eHttp10;
1899 } else if (auth_line[auth_size-1] == '1') {
1900 m_HttpMode = eHttp11;
1901 }
1902 }
1903 }
1904
1905 if (!x_IsHttpMode()) {
1906 TNSProtoParams params;
1907 try {
1908 m_Parser.ParseArguments(auth_line, s_AuthArgs, ¶ms);
1909 }
1910 catch (CNSProtoParserException& ex) {
1911 SRV_LOG(Error, "Error authenticating client: '"
1912 << auth_line << "': " << ex);
1913 params["client"] = auth_line;
1914 return &CNCMessageHandler::x_SaveStatsAndClose;
1915 }
1916 ITERATE(TNSProtoParams, it, params) {
1917 m_ClientParams[it->first] = it->second;
1918 }
1919 CSrvDiagMsg diag_msg;
1920 diag_msg.PrintExtra();
1921 ITERATE(TNSProtoParams, it, params) {
1922 diag_msg.PrintParam(it->first, it->second);
1923 }
1924 diag_msg.Flush();
1925 } else {
1926 m_PosponedCmd = auth_line;
1927 // m_ClientParams["cache"] = "http";
1928 // m_ClientParams["cache"] = "";
1929 }
1930
1931 m_BaseAppSetup = m_AppSetup = CNCServer::GetAppSetup(m_ClientParams);
1932 if (m_AppSetup->disable) {
1933 SRV_LOG(Warning, "Disabled client is being disconnected ('"
1934 << auth_line << "').");
1935 GetDiagCtx()->SetRequestStatus(eStatus_Disabled);
1936 return &CNCMessageHandler::x_SaveStatsAndClose;
1937 }
1938 else {
1939 return &CNCMessageHandler::x_ReadCommand;
1940 }
1941 }
1942
1943 void
x_AssignCmdParams(void)1944 CNCMessageHandler::x_AssignCmdParams(void)
1945 {
1946 LOG_CURRENT_FUNCTION
1947 CTempString blob_key, blob_subkey;
1948 m_UserFlags = fNoUserFlags;
1949 m_BlobVersion = 0;
1950 m_NCBlobKey.Clear();
1951 m_BlobPass.clear();
1952 m_RawBlobPass.clear();
1953 m_KeyVersion = 1;
1954 m_BlobTTL = 0;
1955 m_StartPos = 0;
1956 m_Size = Uint8(-1);
1957 m_Slot = 0;
1958 m_OrigRecNo = 0;
1959 m_OrigSrvId = 0;
1960 m_OrigTime = 0;
1961 m_Quorum = 1;
1962 m_CmdVersion = 0;
1963 m_ForceLocal = false;
1964 m_AgeMax = m_AgeCur = 0;
1965 m_SlotsDone.clear();
1966 m_CmdParams.clear();
1967 bool quorum_was_set = false;
1968 bool search_was_set = false;
1969
1970 new (m_LatestBlobSum) SNCBlobSummary();
1971 delete m_CopyBlobInfo;
1972 m_CopyBlobInfo = new SNCBlobVerData(nullptr);
1973 new (m_BlobFilter) SNCBlobFilter();
1974
1975 CTempString cache_name;
1976
1977 if (!x_IsHttpMode()) {
1978 m_HttpMode = eNoHttp;
1979 ERASE_ITERATE(TNSProtoParams, it, m_ParsedCmd.params) {
1980 const CTempString& key = it->first;
1981 CTempString& val = it->second;
1982
1983 switch (key[0]) {
1984 case 'a':
1985 if (key == "age") {
1986 m_AgeMax = NStr::StringToUInt8(val);
1987 }
1988 break;
1989 case 'c':
1990 switch (key[1]) {
1991 case 'a':
1992 if (key == "cache") {
1993 cache_name = val;
1994 }
1995 break;
1996 case 'm':
1997 if (key == "cmd_ver") {
1998 m_CmdVersion = NStr::StringToUInt(val);
1999 }
2000 break;
2001 case 'o':
2002 if (key == "confirm") {
2003 if (val == "1")
2004 x_UnsetFlag(fNoReplyOnFinish);
2005 else
2006 x_SetFlag(fNoReplyOnFinish);
2007 }
2008 break;
2009 case 'r':
2010 if (key == "cr_time") {
2011 m_CopyBlobInfo->create_time = NStr::StringToUInt8(val);
2012 }
2013 else if (key == "cr_id") {
2014 m_CopyBlobInfo->create_id = NStr::StringToUInt(val);
2015 }
2016 else if (key == "cr_srv") {
2017 m_CopyBlobInfo->create_server = NStr::StringToUInt8(val);
2018 }
2019 break;
2020 }
2021 break;
2022 case 'd':
2023 if (key == "dead") {
2024 m_CopyBlobInfo->dead_time = NStr::StringToInt(val);
2025 }
2026 break;
2027 case 'e':
2028 if (key == "exp") {
2029 m_CopyBlobInfo->expire = NStr::StringToInt(val);
2030 }
2031 break;
2032 case 'f':
2033 if (key == "flags") {
2034 m_UserFlags = NStr::StringToUInt(val);
2035 } else if (key == "fcr_ago_ge") {
2036 m_BlobFilter->cr_ago_ge = NStr::StringToUInt8(val);
2037 } else if (key == "fcr_ago_lt") {
2038 m_BlobFilter->cr_ago_lt = NStr::StringToUInt8(val);
2039 } else if (key == "fcr_epoch_ge") {
2040 m_BlobFilter->cr_epoch_ge = NStr::StringToUInt8(val);
2041 } else if (key == "fcr_epoch_lt") {
2042 m_BlobFilter->cr_epoch_lt = NStr::StringToUInt8(val);
2043 } else if (key == "fexp_now_ge") {
2044 m_BlobFilter->exp_now_ge = NStr::StringToUInt8(val);
2045 } else if (key == "fexp_now_lt") {
2046 m_BlobFilter->exp_now_lt = NStr::StringToUInt8(val);
2047 } else if (key == "fexp_epoch_ge") {
2048 m_BlobFilter->exp_epoch_ge = NStr::StringToUInt8(val);
2049 } else if (key == "fexp_epoch_lt") {
2050 m_BlobFilter->exp_epoch_lt = NStr::StringToUInt8(val);
2051 } else if (key == "fvexp_now_ge") {
2052 m_BlobFilter->vexp_now_ge = NStr::StringToUInt8(val);
2053 } else if (key == "fvexp_now_lt") {
2054 m_BlobFilter->vexp_now_lt = NStr::StringToUInt8(val);
2055 } else if (key == "fvexp_epoch_ge") {
2056 m_BlobFilter->vexp_epoch_ge = NStr::StringToUInt8(val);
2057 } else if (key == "fvexp_epoch_lt") {
2058 m_BlobFilter->vexp_epoch_lt = NStr::StringToUInt8(val);
2059 } else if (key == "fcr_srv") {
2060 m_BlobFilter->cr_srv = NStr::StringToUInt8(val);
2061 } else if (key == "fsize_ge") {
2062 m_BlobFilter->size_ge = NStr::StringToUInt8(val);
2063 } else if (key == "fsize_lt") {
2064 m_BlobFilter->size_lt = NStr::StringToUInt8(val);
2065 }
2066 break;
2067 case 'h':
2068 if (key == "http") {
2069 m_HttpMode = (EHttpMode)NStr::StringToInt(val);
2070 }
2071 break;
2072 case 'i':
2073 if (key == "ip") {
2074 if (!val.empty())
2075 GetDiagCtx()->SetClientIP(val);
2076 // Erase parameter to not print it in request-start, it will be
2077 // printed as a part of standard log header.
2078 m_ParsedCmd.params.erase(it);
2079 }
2080 break;
2081 case 'k':
2082 if (key == "key") {
2083 blob_key = val;
2084 }
2085 break;
2086 case 'l':
2087 if (key == "log_rec") {
2088 m_OrigRecNo = NStr::StringToUInt8(val);
2089 }
2090 else if (key == "log_srv") {
2091 m_OrigSrvId = NStr::StringToUInt8(val);
2092 }
2093 else if (key == "log_time") {
2094 m_OrigTime = NStr::StringToUInt8(val);
2095 }
2096 else if (key == "local") {
2097 m_ForceLocal = val == "1";
2098 }
2099 break;
2100 case 'm':
2101 if (key == "md5_pass") {
2102 m_BlobPass = val;
2103 }
2104 break;
2105 case 'n':
2106 if (key == "ncbi_phid") {
2107 if (!val.empty()) {
2108 GetDiagCtx()->SetHitID(val);
2109 }
2110 }
2111 break;
2112 case 'p':
2113 if (key == "pass") {
2114 m_RawBlobPass = val;
2115 CMD5 md5;
2116 md5.Update(val.data(), val.size());
2117 unsigned char digest[16];
2118 md5.Finalize(digest);
2119 m_BlobPass.assign((char*)digest, 16);
2120 // Erase parameter to not expose passwords via logs.
2121 m_ParsedCmd.params.erase(it);
2122 }
2123 else if (key == "prev") {
2124 m_StatPrev = val == "1";
2125 }
2126 break;
2127 case 'q':
2128 if (key == "qrum") {
2129 m_Quorum = NStr::StringToUInt(val);
2130 quorum_was_set = true;
2131 }
2132 break;
2133 case 'r':
2134 if (key == "rec_my") {
2135 m_RemoteRecNo = NStr::StringToUInt8(val);
2136 }
2137 else if (key == "rec_your") {
2138 m_LocalRecNo = NStr::StringToUInt8(val);
2139 }
2140 break;
2141 case 's':
2142 switch (key[1]) {
2143 case 'i':
2144 if (key == "sid") {
2145 if (!val.empty())
2146 GetDiagCtx()->SetSessionID(NStr::URLDecode(val));
2147 // Erase parameter to not print it in request-start,
2148 // it will be printed as a part of standard log header.
2149 m_ParsedCmd.params.erase(it);
2150 }
2151 else if (key == "size") {
2152 m_Size = Uint8(NStr::StringToInt8(val));
2153 }
2154 break;
2155 case 'l':
2156 if (key == "slot") {
2157 m_Slot = Uint2(NStr::StringToUInt(val));
2158 }
2159 break;
2160 case 'r':
2161 if (key == "srv_id") {
2162 m_SrvId = NStr::StringToUInt8(val);
2163 }
2164 else if (key == "srch") {
2165 m_SearchOnRead = val != "0";
2166 search_was_set = true;
2167 }
2168 break;
2169 case 't':
2170 if (key == "start") {
2171 m_StartPos = NStr::StringToUInt8(val);
2172 }
2173 break;
2174 case 'u':
2175 if (key == "subkey") {
2176 blob_subkey = val;
2177 }
2178 break;
2179 }
2180 break;
2181 case 't':
2182 if (key == "ttl") {
2183 m_BlobTTL = NStr::StringToUInt(val);
2184 }
2185 else if (key == "type") {
2186 m_StatType = val;
2187 }
2188 break;
2189 case 'v':
2190 if (key == "version") {
2191 m_BlobVersion = NStr::StringToInt(val);
2192 }
2193 else if (key == "ver_ttl") {
2194 m_CopyBlobInfo->ver_ttl = NStr::StringToUInt(val);
2195 }
2196 else if (key == "ver_dead") {
2197 m_CopyBlobInfo->ver_expire = NStr::StringToInt(val);
2198 }
2199 break;
2200 default:
2201 break;
2202 }
2203 }
2204 } else {
2205 m_ClientParams["client"] = "";
2206 cache_name = m_ClientParams["cache"];
2207 if (m_ClientParams.find("key") != m_ClientParams.end()) {
2208 blob_key = m_ClientParams["key"];
2209 }
2210 // parse HTTP header
2211 CTempString cmd_line;
2212 map<string,string> headers;
2213 while (ReadLine(&cmd_line)) {
2214 if (cmd_line.empty()) {
2215 break;
2216 }
2217 const string content_length("Content-Length:");
2218 const string user_agent("User-Agent:");
2219 const string content_range("Range:");
2220 size_t max_pos = cmd_line.size();
2221
2222 if (NStr::StartsWith(cmd_line, content_length)) {
2223 size_t pos = content_length.size();
2224 m_Size = NStr::StringToUInt8(
2225 CTempString(cmd_line.data() + pos, max_pos - pos),
2226 NStr::fAllowLeadingSpaces | NStr::fAllowTrailingSpaces);
2227 }
2228 else if (NStr::StartsWith(cmd_line, content_range)) {
2229 size_t pos = content_range.size();
2230 const char* begin = cmd_line.data() + pos;
2231 while (!isdigit(*begin) && pos < max_pos) {
2232 ++pos; ++begin;
2233 }
2234 m_StartPos = 0;
2235 while (isdigit(*begin) && pos < max_pos) {
2236 m_StartPos = m_StartPos * 10 + (*begin - '0');
2237 ++pos; ++begin;
2238 }
2239 while (!isdigit(*begin) && pos < max_pos) {
2240 ++pos; ++begin;
2241 }
2242 m_Size = 0;
2243 while (isdigit(*begin) && pos < max_pos) {
2244 m_Size = m_Size * 10 + (*begin - '0');
2245 ++pos; ++begin;
2246 }
2247 // byte pos are inclusive
2248 m_Size = (m_Size >= m_StartPos) ? (m_Size - m_StartPos + 1) : 0;
2249 }
2250 else if (NStr::StartsWith(cmd_line, user_agent)) {
2251 list<CTempString> arr;
2252 size_t pos = user_agent.size();
2253 NStr::Split( CTempString(cmd_line.data()+pos, max_pos-pos), " ", arr, NStr::fSplit_Tokenize);
2254 if (!arr.empty()) {
2255 m_ClientParams["client"] = NStr::URLEncode( arr.front());
2256 }
2257 }
2258 else {
2259 string key, value;
2260 NStr::SplitInTwo(cmd_line, ":", key, value);
2261 key = NStr::TruncateSpaces(key);
2262 key = NStr::ToUpper(key);
2263 value = NStr::TruncateSpaces(value);
2264 headers[key] = value;
2265 }
2266 }
2267 list<CTempString> arr;
2268 if (headers.find("NCBI-SID") != headers.end()) {
2269 NStr::Split( headers.at("NCBI-SID"), ", \t", arr, NStr::fSplit_Tokenize);
2270 if (!arr.empty()) {
2271 m_CmdParams["sid"] = arr.back();
2272 }
2273 }
2274 arr.clear();
2275 if (headers.find("NCBI-PHID") != headers.end()) {
2276 NStr::Split( headers.at("NCBI-PHID"), ", \t", arr, NStr::fSplit_Tokenize);
2277 if (!arr.empty()) {
2278 m_CmdParams["ncbi_phid"] = arr.back();
2279 }
2280 }
2281 arr.clear();
2282 string client_ip = g_GetClientIP( headers);
2283 if (!client_ip.empty()) {
2284 NStr::Split( client_ip, ", \t", arr, NStr::fSplit_Tokenize);
2285 }
2286 if (!arr.empty()) {
2287 m_CmdParams["ip"] = arr.front();
2288 } else {
2289 m_CmdParams["ip"] = m_ClientParams["peer"];
2290 }
2291 }
2292
2293 m_NCBlobKey.Assign(cache_name, blob_key, blob_subkey);
2294 if (cache_name.empty()) {
2295 m_AppSetup = m_BaseAppSetup;
2296 m_ClientParams.erase("cache");
2297 }
2298 else if (cache_name == m_PrevCache) {
2299 m_AppSetup = m_PrevAppSetup;
2300 }
2301 else {
2302 m_PrevCache = cache_name;
2303 m_ClientParams["cache"] = cache_name;
2304 m_AppSetup = m_PrevAppSetup = CNCServer::GetAppSetup(m_ClientParams);
2305 }
2306 if (!quorum_was_set)
2307 m_Quorum = m_AppSetup->quorum;
2308 if (m_ForceLocal)
2309 m_Quorum = 1;
2310 if (!search_was_set)
2311 m_SearchOnRead = m_AppSetup->srch_on_read;
2312 }
2313
2314 void
x_PrintRequestStart(CSrvDiagMsg & diag_msg)2315 CNCMessageHandler::x_PrintRequestStart(CSrvDiagMsg& diag_msg)
2316 {
2317 LOG_CURRENT_FUNCTION
2318 diag_msg.StartRequest();
2319 diag_msg.PrintParam("_type", "cmd");
2320 diag_msg.PrintParam("cmd", m_ParsedCmd.command->cmd);
2321 diag_msg.PrintParam("client", m_ClientParams["client"]);
2322 diag_msg.PrintParam("conn", m_ConnReqId);
2323 diag_msg.PrintParam("phost", m_ClientParams["peer"]);
2324 ITERATE(TNSProtoParams, it, m_ParsedCmd.params) {
2325 diag_msg.PrintParam(it->first, it->second);
2326 }
2327 ITERATE(TStringMap, it, m_CmdParams) {
2328 diag_msg.PrintParam(it->first, it->second);
2329 }
2330 if (!m_BlobPass.empty()) {
2331 diag_msg.PrintParam("pass", CNCBlobStorage::PrintablePassword(m_BlobPass));
2332 }
2333 diag_msg.PrintParam("ncbi_role", CNCServer::GetHostRole());
2334 diag_msg.PrintParam("ncbi_location", CNCServer::GetHostLocation());
2335 }
2336
2337 CNCMessageHandler::State
x_StartCommand(void)2338 CNCMessageHandler::x_StartCommand(void)
2339 {
2340 LOG_CURRENT_FUNCTION
2341 m_CmdPrevTime = m_CmdStartTime = CSrvTime::Current();
2342 m_CmdLog.clear();
2343 CNCStat::CmdStarted(m_ParsedCmd.command->cmd);
2344 CSrvDiagMsg diag_msg;
2345 x_PrintRequestStart(diag_msg);
2346
2347 if (NeedToClose()) {
2348 diag_msg.Flush();
2349 x_ResetFlags();
2350 return &CNCMessageHandler::x_CloseCmdAndConn;
2351 }
2352 if (HasError() || !CanHaveMoreRead()) {
2353 diag_msg.Flush();
2354 GetDiagCtx()->SetRequestStatus(eStatus_PrematureClose);
2355 x_ResetFlags();
2356 return &CNCMessageHandler::x_CloseCmdAndConn;
2357 }
2358
2359 if (x_IsFlagSet(fNeedsAdminClient)
2360 && m_ClientParams["client"] != CNCServer::GetAdminClient()
2361 && m_ClientParams["client"] != kNCPeerClientName)
2362 {
2363 string msg;
2364 msg += "command: " + string(m_ParsedCmd.command->cmd);
2365 msg += ", peer: " + m_ClientParams["peer"];
2366 msg += ", client: " + m_ClientParams["client"];
2367 CNCAlerts::Register(CNCAlerts::eAccessDenied, msg);
2368 diag_msg.Flush();
2369 x_ReportError(eStatus_NeedAdmin);
2370 if (!x_IsHttpMode()) {
2371 x_ResetFlags();
2372 }
2373 return &CNCMessageHandler::x_FinishCommand;
2374 }
2375
2376 if (!CNCServer::IsCachingComplete()
2377 && (x_IsFlagSet(fNeedsStorageCache) || m_ForceLocal))
2378 {
2379 diag_msg.Flush();
2380 x_ReportError(eStatus_JustStarted);
2381 if (!x_IsHttpMode()) {
2382 x_ResetFlags();
2383 }
2384 return &CNCMessageHandler::x_FinishCommand;
2385 }
2386
2387 if (m_AppSetup->disable) {
2388 diag_msg.Flush();
2389 // We'll be here only if generally work for the client is enabled but
2390 // for current particular cache it is disabled.
2391 x_ReportError(eStatus_Disabled);
2392 if (!x_IsHttpMode()) {
2393 x_ResetFlags();
2394 }
2395 return &CNCMessageHandler::x_FinishCommand;
2396 }
2397
2398 if (x_IsFlagSet(fRunsInStartedSync)) {
2399 ESyncInitiateResult start_res = CNCPeriodicSync::CanStartSyncCommand(
2400 m_SrvId, m_Slot,
2401 !x_IsFlagSet(fProhibitsSyncAbort),
2402 m_SyncId);
2403 if (start_res == eNetworkError) {
2404 diag_msg.Flush();
2405 x_ReportError(eStatus_CmdAborted);
2406 if (!x_IsHttpMode()) {
2407 x_ResetFlags();
2408 }
2409 return &CNCMessageHandler::x_FinishCommand;
2410 }
2411 else if (start_res == eServerBusy || CTaskServer::IsInSoftShutdown()) {
2412 diag_msg.Flush();
2413 x_ReportOK("OK:SIZE=0, NEED_ABORT1\n");
2414 GetDiagCtx()->SetRequestStatus(eStatus_SyncAborted);
2415 // Old NC servers (those which used CNetCacheAPI instead of
2416 // CNCActiveHandler) always started to write blob data in SYNC_PUT
2417 // even when we responded to them NEED_ABORT. To avoid breaking the protocol
2418 // we need to read from them those fake blob writes.
2419 bool needs_fake = x_IsFlagSet(fReadExactBlobSize) && m_CmdVersion == 0;
2420 if (start_res == eServerBusy)
2421 x_ResetFlags();
2422 if (needs_fake)
2423 return &CNCMessageHandler::x_ReadBlobSignature;
2424 return &CNCMessageHandler::x_FinishCommand;
2425 }
2426 }
2427
2428 if (x_IsFlagSet(fNeedsSpaceAsPeer)
2429 && !CNCBlobStorage::AcceptWritesFromPeers()) {
2430 x_ReportError(eStatus_NoDiskSpace);
2431 return &CNCMessageHandler::x_FinishCommand;
2432 }
2433
2434 if (!x_IsFlagSet(fNeedsBlobAccess) && !x_IsFlagSet(fNeedsBlobList)) {
2435 diag_msg.Flush();
2436 // if we do not need blob access
2437 return m_CmdProcessor;
2438 }
2439
2440 if (((m_BlobPass.empty() && m_AppSetup->pass_policy == eNCOnlyWithPass)
2441 || (!m_BlobPass.empty() && m_AppSetup->pass_policy == eNCOnlyWithoutPass))
2442 && !x_IsFlagSet(fDoNotCheckPassword))
2443 {
2444 diag_msg.Flush();
2445 GetDiagCtx()->SetRequestStatus(eStatus_NotAllowed);
2446 // why substr?
2447 x_ReportError(GetMessageByStatus(eStatus_NotAllowed).substr(4));
2448 SRV_LOG(Warning, GetMessageByStatus(eStatus_NotAllowed));
2449 return &CNCMessageHandler::x_FinishCommand;
2450 }
2451
2452 if (x_IsFlagSet(fCanGenerateKey) && m_NCBlobKey.RawKey().empty()) {
2453 string raw_key;
2454 //to test
2455 // bool old_ver = (CSrvTime::Current().CurSecs() % 2) != 0;
2456 bool old_ver = !x_IsHttpMode();
2457
2458 CNCDistributionConf::GenerateBlobKey(
2459 !x_IsHttpMode() ? m_LocalPort : Uint4( CNCDistributionConf::GetSelfID()),
2460 raw_key, m_BlobSlot, m_TimeBucket, old_ver ? 1 : 3);
2461 m_NCBlobKey.Assign(raw_key);
2462 diag_msg.PrintParam("key", m_NCBlobKey.RawKey());
2463 diag_msg.PrintParam("gen_key", "1");
2464 }
2465 else if (!m_NCBlobKey.IsValid() && !x_IsFlagSet(fNeedsBlobList)) {
2466 diag_msg.Flush();
2467 x_ReportError(eStatus_NotFound);
2468 SRV_LOG(Warning, "Invalid blob key format: " << m_NCBlobKey.RawKey());
2469 return &CNCMessageHandler::x_FinishCommand;
2470 }
2471 else if (m_NCBlobKey.IsICacheKey()) {
2472 CNCDistributionConf::GetSlotByICacheKey(m_NCBlobKey, m_BlobSlot, m_TimeBucket);
2473 } else {
2474 CNCDistributionConf::GetSlotByRnd(m_NCBlobKey.GetRandomPart(), m_BlobSlot, m_TimeBucket);
2475 }
2476
2477 m_BlobSize = 0;
2478 if (m_Slot == 0) {
2479 diag_msg.PrintParam("slot", m_BlobSlot);
2480 }
2481
2482 if ((!CNCDistributionConf::IsServedLocally(m_BlobSlot)
2483 || !CNCServer::IsCachingComplete())
2484 && !x_IsFlagSet(fDoNotProxyToPeers)
2485 && !m_ForceLocal
2486 && CNCDistributionConf::CountServersForSlot(m_BlobSlot) != 0)
2487 {
2488 diag_msg.PrintParam("proxy", "1");
2489 diag_msg.Flush();
2490 x_GetCurSlotServers();
2491 return &CNCMessageHandler::x_ProxyToNextPeer;
2492 }
2493
2494 if (!x_IsFlagSet(fNeedsBlobAccess)) {
2495 diag_msg.Flush();
2496 // if we do not need blob access
2497 return m_CmdProcessor;
2498 }
2499
2500 // no blob access before caching is done
2501 if (!CNCServer::IsCachingComplete())
2502 {
2503 diag_msg.Flush();
2504 x_ReportError(eStatus_JustStarted);
2505 if (!x_IsHttpMode()) {
2506 x_ResetFlags();
2507 }
2508 return &CNCMessageHandler::x_FinishCommand;
2509 }
2510
2511 diag_msg.Flush();
2512
2513 if (!CNCServer::IsInitiallySynced() && !m_ForceLocal
2514 && x_IsFlagSet(fUsesPeerSearch))
2515 {
2516 m_Quorum = 0;
2517 }
2518 if (x_IsFlagSet(fNeedsLowerPriority))
2519 SetPriority(CNCDistributionConf::GetSyncPriority());
2520 if ((x_IsFlagSet(fNeedsSpaceAsClient)
2521 && CNCBlobStorage::NeedStopWrite())
2522 || (x_IsFlagSet(fNeedsSpaceAsPeer)
2523 && !CNCBlobStorage::AcceptWritesFromPeers()))
2524 {
2525 if (CNCDistributionConf::CountServersForSlot(m_BlobSlot) != 0) {
2526 x_GetCurSlotServers();
2527 return &CNCMessageHandler::x_ProxyToNextPeer;
2528 }
2529 x_ReportError(eStatus_NoDiskSpace);
2530 return &CNCMessageHandler::x_FinishCommand;
2531 }
2532
2533 x_LogCmdEvent("RequestMetaInfo");
2534 m_BlobAccess = CNCBlobStorage::GetBlobAccess(
2535 x_IsUserFlagSet(fNoCreate) ? eNCNone : m_ParsedCmd.command->extra.blob_access,
2536 m_NCBlobKey.PackedKey(), m_BlobPass, m_TimeBucket);
2537 m_BlobAccess->RequestMetaInfo(this);
2538 return &CNCMessageHandler::x_WaitForBlobAccess;
2539 }
2540
2541 CNCMessageHandler::State
x_ReadCommand(void)2542 CNCMessageHandler::x_ReadCommand(void)
2543 {
2544 LOG_CURRENT_FUNCTION
2545 if (NeedToClose() || CTaskServer::IsInSoftShutdown()) {
2546 if (CTaskServer::IsInShutdown())
2547 GetDiagCtx()->SetRequestStatus(eStatus_ShuttingDown);
2548 else
2549 GetDiagCtx()->SetRequestStatus(eStatus_CmdTimeout);
2550 return &CNCMessageHandler::x_SaveStatsAndClose;
2551 }
2552
2553 CTempString cmd_line;
2554 if (x_IsHttpMode() && !m_PosponedCmd.empty()) {
2555 cmd_line = m_PosponedCmd;
2556 } else if (!ReadLine(&cmd_line)) {
2557 if (!HasError() && CanHaveMoreRead())
2558 return NULL;
2559 if (IsReadDataAvailable())
2560 GetDiagCtx()->SetRequestStatus(eStatus_PrematureClose);
2561 return &CNCMessageHandler::x_SaveStatsAndClose;
2562 }
2563
2564 if (!x_IsHttpMode()) {
2565 try {
2566 m_ParsedCmd = m_Parser.ParseCommand(cmd_line);
2567 }
2568 catch (CNSProtoParserException& ex) {
2569 SRV_LOG(Warning, "Error parsing command: " << ex);
2570 GetDiagCtx()->SetRequestStatus(eStatus_BadCmd);
2571 return &CNCMessageHandler::x_SaveStatsAndClose;
2572 }
2573 } else {
2574 m_ClientParams.erase("key");
2575 list<CTempString> arr;
2576 ncbi_NStr_Split(cmd_line, " ", arr);
2577 bool good = false;
2578 if (arr.size() >= 3) {
2579 CTempString arr_cmd(arr.front());
2580 CTempString arr_uri(*(++arr.begin()));
2581 CTempString arr_key;
2582 if (arr_cmd == "DELETE" ||
2583 arr_cmd == "GET" ||
2584 arr_cmd == "HEAD" ||
2585 arr_cmd == "POST" ||
2586 arr_cmd == "PUT") {
2587 {
2588 // eg, "/" "/service"
2589 list<CTempString> uri_parts;
2590 ncbi_NStr_Split(arr_uri, "/", uri_parts);
2591 if (uri_parts.size() > 0) {
2592 if (arr_cmd != "POST") {
2593 arr_key = uri_parts.back();
2594 m_ClientParams["key"] = arr_key;
2595 uri_parts.pop_back();
2596 }
2597 string service;
2598 if (!uri_parts.empty()) {
2599 service = NStr::Join(uri_parts, "/");
2600 }
2601 m_ClientParams["service"] = service;
2602 }
2603 }
2604 m_ClientParams["cache"].clear();
2605
2606 try {
2607 m_ParsedCmd = m_Parser.ParseCommand(cmd_line);
2608 good = true;
2609 }
2610 catch (CNSProtoParserException& ) {
2611 GetDiagCtx()->SetRequestStatus(eStatus_BadCmd);
2612 }
2613 } else {
2614 GetDiagCtx()->SetRequestStatus(eStatus_NoImpl);
2615 SRV_LOG(Error, "Unrecognized command: " << cmd_line);
2616 }
2617 } else {
2618 SRV_LOG(Error, "Error parsing command: " << cmd_line);
2619 GetDiagCtx()->SetRequestStatus(eStatus_BadCmd);
2620 }
2621 m_PosponedCmd.clear();
2622 if (!good) {
2623 x_WriteHttpResponse();
2624 return &CNCMessageHandler::x_SaveStatsAndClose;
2625 }
2626 }
2627
2628 const SCommandExtra& cmd_extra = m_ParsedCmd.command->extra;
2629 m_CmdProcessor = cmd_extra.processor;
2630 m_Flags = cmd_extra.cmd_flags;
2631 CreateNewDiagCtx();
2632 try {
2633 x_AssignCmdParams();
2634 }
2635 catch (CStringException& ex) {
2636 ReleaseDiagCtx();
2637 SRV_LOG(Warning, "Error while parsing command '" << cmd_line
2638 << "': " << ex);
2639 GetDiagCtx()->SetRequestStatus(eStatus_BadCmd);
2640 return &CNCMessageHandler::x_SaveStatsAndClose;
2641 }
2642 return &CNCMessageHandler::x_StartCommand;
2643 }
2644
2645 void
x_GetCurSlotServers(void)2646 CNCMessageHandler::x_GetCurSlotServers(void)
2647 {
2648 LOG_CURRENT_FUNCTION
2649 CNCDistributionConf::GetServersForSlot(m_BlobSlot, m_CheckSrvs);
2650 m_SrvsIndex = 0;
2651 Uint4 main_srv_ip = 0;
2652 if (!m_NCBlobKey.IsICacheKey()) {
2653 main_srv_ip = CNCDistributionConf::GetMainSrvIP(m_NCBlobKey);
2654 }
2655 m_ThisServerIsMain = false;
2656 if (main_srv_ip != 0) {
2657 // Note: this check for "main" server for blob assumes that for each
2658 // blob slot only one NetCache instance processing it works on each
2659 // server. It will give false positive results if there are several
2660 // NC instances on the same server processing the same slot. But there's
2661 // no much sense in such setup, so it's pretty safe assumption.
2662 if (Uint4(CNCDistributionConf::GetSelfID() >> 32) == main_srv_ip
2663 // if NeedStopWrite, I cannot be sure that blob on this server is most recent
2664 // and need to ask other servers
2665 && !CNCBlobStorage::NeedStopWrite()
2666 // make sure this slot is served here
2667 && CNCDistributionConf::IsServedLocally(m_BlobSlot)
2668 ) {
2669 //m_ThisServerIsMain = true;
2670 m_ThisServerIsMain =
2671 CNCBlobAccessor::HasPutSucceeded(m_NCBlobKey.PackedKey());
2672 }
2673 if (!m_ThisServerIsMain) {
2674 CWriteBackControl::AnotherServerMain();
2675 for (size_t i = 0; i < m_CheckSrvs.size(); ++i) {
2676 if (Uint4(m_CheckSrvs[i] >> 32) == main_srv_ip) {
2677 Uint8 srv_id = m_CheckSrvs[i];
2678 m_CheckSrvs.erase(m_CheckSrvs.begin() + i);
2679 m_CheckSrvs.insert(m_CheckSrvs.begin(), srv_id);
2680 break;
2681 }
2682 }
2683 }
2684 }
2685 }
2686
2687 void
x_JournalBlobPutResult(int status,const string & blob_key,Uint2 blob_slot)2688 CNCMessageHandler::x_JournalBlobPutResult(int status, const string& blob_key, Uint2 blob_slot)
2689 {
2690 if (CNCDistributionConf::CountServersForSlot(blob_slot) != 0) {
2691 if (status == eStatus_PrematureClose && status == eStatus_CmdTimeout) {
2692 CNCBlobAccessor::PutFailed(blob_key);
2693 } else {
2694 CNCBlobAccessor::PutSucceeded(blob_key);
2695 }
2696 }
2697 }
2698
2699 void
x_ReportError(EHTTPStatus sts,bool eol)2700 CNCMessageHandler::x_ReportError( EHTTPStatus sts, bool eol /*= true*/)
2701 {
2702 GetDiagCtx()->SetRequestStatus(sts);
2703 x_ReportError( GetMessageByStatus(sts), eol);
2704 }
2705
2706 void
x_ReportError(const string & sts,bool eol)2707 CNCMessageHandler::x_ReportError( const string& sts, bool eol /*= true*/)
2708 {
2709 // in HTTP mode we leave it to x_CleanCmdResources always
2710 if (!x_IsHttpMode()) {
2711 if (!x_IsFlagSet(fNoReplyOnFinish)) {
2712 x_SetFlag(fNoReplyOnFinish);
2713 WriteText(sts);
2714 if (eol) {
2715 WriteText("\n");
2716 }
2717 }
2718 }
2719 }
2720
2721 CNCMessageHandler&
x_ReportOK(const string & sts)2722 CNCMessageHandler::x_ReportOK(const string& sts)
2723 {
2724 if (!x_IsHttpMode()) {
2725 if (!x_IsFlagSet(fNoReplyOnFinish)) {
2726 x_SetFlag(fNoReplyOnFinish);
2727 WriteText(sts);
2728 }
2729 }
2730 return *this;
2731 }
2732
2733 void
x_LogCmdEvent(const CTempString & evt)2734 CNCMessageHandler::x_LogCmdEvent( const CTempString& evt)
2735 {
2736 CSrvTime cmd_now = CSrvTime::Current();
2737 CSrvTime cmd_len = cmd_now;
2738 cmd_len -= m_CmdPrevTime;
2739 CSrvTime cmd_time = cmd_now;
2740 cmd_time -= m_CmdStartTime;
2741 m_CmdPrevTime = cmd_now;
2742 m_CmdLog.push_back( evt + ": " + NStr::NumericToString(cmd_len.AsUSec()) + "us ("
2743 + NStr::NumericToString(cmd_time.AsUSec()) + "us)");
2744 }
2745
x_LogCmdLog(void)2746 void CNCMessageHandler::x_LogCmdLog(void)
2747 {
2748 for( const string& l : m_CmdLog) {
2749 SRV_LOG(Warning, l);
2750 }
2751 }
2752
2753 CNCMessageHandler::State
x_WaitForBlobAccess(void)2754 CNCMessageHandler::x_WaitForBlobAccess(void)
2755 {
2756 LOG_CURRENT_FUNCTION
2757 if (!m_BlobAccess->IsMetaInfoReady())
2758 return NULL;
2759 if (NeedEarlyClose())
2760 return &CNCMessageHandler::x_CloseCmdAndConn;
2761
2762 if (!m_BlobAccess->IsAuthorized() && !x_IsFlagSet(fDoNotCheckPassword)) {
2763 x_ReportError(eStatus_BadPassword);
2764 return &CNCMessageHandler::x_FinishCommand;
2765 }
2766
2767 // send COPY_UPD notification early, before blob data is received
2768 // another option is to send it in x_FinishReadingBlob,
2769 // after m_BlobAccess->Finalize(), before CNCPeerControl::MirrorWrite
2770 // sending the notification here we assume the risk that the blob will not be received correctly
2771 // and the notification will create confusion only (it will be corrected by periodic sync).
2772 if (
2773 (m_Flags & eProxyBlobWrite) == eProxyBlobWrite // request from clients only
2774 && CNCDistributionConf::GetBlobUpdateHotline()
2775 #if !USE_ALWAYS_COPY_UPD
2776 // if the blob exists here, it can exist on mirrors as well
2777 && (m_BlobAccess->IsBlobExists()
2778 // if blob does not exist here, but it is created on another server, I should notify them
2779 || !CNCDistributionConf::IsThisServerKey(m_NCBlobKey))
2780 #endif
2781 ) {
2782 CNCPeerControl::MirrorUpdate(m_NCBlobKey, m_BlobSlot, CSrvTime::Current().AsUSec());
2783 }
2784
2785 if (!x_IsFlagSet(fUsesPeerSearch)) {
2786 x_LogCmdEvent("CmdProcessor");
2787 return m_CmdProcessor;
2788 }
2789
2790 // All commands that have fUsesPeerSearch will operate on m_LatestExist and
2791 // m_LatestBlobSum, so we need to fill it here even if in next "if" we'll go
2792 // almost directly to m_CmdProcessor.
2793 bool is_exist = m_BlobAccess->IsBlobExists();
2794 bool is_valid = m_BlobAccess->IsValid();
2795 m_LatestExist = is_exist
2796 && (x_IsFlagSet(fNoBlobVersionCheck)
2797 || m_BlobAccess->GetCurBlobVersion() == m_BlobVersion);
2798 if (x_IsFlagSet(fDoNotProxyToPeers) || m_ForceLocal || !m_SearchOnRead) {
2799 m_LatestSrvId = CNCDistributionConf::GetSelfID();
2800 } else {
2801 m_LatestSrvId = is_valid ?
2802 (is_exist ? CNCDistributionConf::GetSelfID() : CNCDistributionConf::GetMainSrvId(m_NCBlobKey)) :
2803 m_BlobAccess->GetValidServer();
2804 }
2805 if (m_LatestSrvId != 0) {
2806 if (m_LatestExist) {
2807 m_LatestBlobSum->create_time = m_BlobAccess->GetCurBlobCreateTime();
2808 m_LatestBlobSum->create_server = m_BlobAccess->GetCurCreateServer();
2809 m_LatestBlobSum->create_id = m_BlobAccess->GetCurCreateId();
2810 m_LatestBlobSum->dead_time = m_BlobAccess->GetCurBlobDeadTime();
2811 m_LatestBlobSum->expire = m_BlobAccess->GetCurBlobExpire();
2812 m_LatestBlobSum->ver_expire = m_BlobAccess->GetCurVerExpire();
2813 }
2814 if (x_IsFlagSet(fDoNotProxyToPeers)
2815 || m_ForceLocal
2816 || (m_Quorum == 1 && (m_LatestExist || !m_SearchOnRead))
2817 || (m_LatestExist && x_IsFlagSet(fPeerFindExistsOnly)))
2818 {
2819 return &CNCMessageHandler::x_ExecuteOnLatestSrvId;
2820 }
2821 }
2822
2823 x_GetCurSlotServers();
2824 if (m_ThisServerIsMain
2825 && m_AppSetup->fast_on_main
2826 && CNCServer::IsInitiallySynced())
2827 {
2828 if (!is_valid) {
2829 m_CheckSrvs.clear();
2830 m_SrvsIndex = 0;
2831 }
2832 return &CNCMessageHandler::x_ExecuteOnLatestSrvId;
2833 }
2834 if (!is_valid) {
2835 Uint8 srv_id = m_BlobAccess->GetValidServer();
2836 if (srv_id) {
2837 for (size_t i = 0; i < m_CheckSrvs.size(); ++i) {
2838 if (m_CheckSrvs[i] == srv_id) {
2839 m_CheckSrvs.erase(m_CheckSrvs.begin() + i);
2840 m_CheckSrvs.insert(m_CheckSrvs.begin(), srv_id);
2841 break;
2842 }
2843 }
2844 }
2845 }
2846
2847 if (m_LatestExist && m_Quorum != 0) {
2848 --m_Quorum;
2849 }
2850 x_LogCmdEvent("ReadMetaNextPeer");
2851 return &CNCMessageHandler::x_ReadMetaNextPeer;
2852 }
2853
2854 CNCMessageHandler::State
x_ReportBlobNotFound(void)2855 CNCMessageHandler::x_ReportBlobNotFound(void)
2856 {
2857 LOG_CURRENT_FUNCTION
2858 x_SetFlag(fNoBlobAccessStats);
2859 x_ReportError(eStatus_NotFound, false);
2860 if (!x_IsHttpMode()) {
2861 if (m_AgeMax != 0 && m_BlobAccess->IsBlobExists()) {
2862 WriteText(", AGE=").WriteNumber(m_AgeCur);
2863 WriteText(", VER=").WriteNumber(m_BlobAccess->GetCurBlobVersion());
2864 }
2865 WriteText("\n");
2866 }
2867 return &CNCMessageHandler::x_FinishCommand;
2868 }
2869
2870 void
x_ProlongBlobDeadTime(unsigned int add_time)2871 CNCMessageHandler::x_ProlongBlobDeadTime(unsigned int add_time)
2872 {
2873 LOG_CURRENT_FUNCTION
2874
2875 CSrvTime cur_srv_time = CSrvTime::Current();
2876 Uint8 cur_time = cur_srv_time.AsUSec();
2877 int now = int(cur_srv_time.Sec());
2878 int new_expire = now + add_time;
2879 if (new_expire < now) {
2880 return;
2881 }
2882 int old_expire = m_BlobAccess->GetCurBlobExpire();
2883 if (!CNCServer::IsDebugMode() &&
2884 new_expire - old_expire < (int)m_AppSetup->ttl_unit) {
2885 return;
2886 }
2887
2888 if (m_AppSetup->lifespan_ttl > 0) {
2889 int created = (int)(m_BlobAccess->GetCurBlobCreateTime()/kUSecsPerSecond);
2890 int retire = (int)(created + m_AppSetup->lifespan_ttl);
2891 if (retire > created) {
2892 new_expire = min(new_expire,retire);
2893 }
2894 }
2895
2896 m_BlobAccess->SetCurBlobExpire(new_expire);
2897 if (m_BlobAccess->GetCurBlobSize() <= CNCDistributionConf::GetMaxBlobSizeSync()) {
2898 SNCSyncEvent* event = new SNCSyncEvent();
2899 event->blob_size = m_BlobAccess->GetCurBlobSize();
2900 event->event_type = eSyncProlong;
2901 event->key = m_NCBlobKey;
2902 event->orig_server = CNCDistributionConf::GetSelfID();
2903 event->orig_time = cur_time;
2904 CNCSyncLog::AddEvent(m_BlobSlot, event);
2905 CNCPeerControl::MirrorProlong(m_NCBlobKey, m_BlobSlot,
2906 event->orig_rec_no, cur_time, m_BlobAccess);
2907 }
2908 }
2909
2910 void
x_ProlongVersionLife(void)2911 CNCMessageHandler::x_ProlongVersionLife(void)
2912 {
2913 LOG_CURRENT_FUNCTION
2914 CSrvTime cur_srv_time = CSrvTime::Current();
2915 Uint8 cur_time = cur_srv_time.AsUSec();
2916 int new_expire = int(cur_srv_time.Sec()) + m_BlobAccess->GetCurVersionTTL();
2917 int old_expire = m_BlobAccess->GetCurVerExpire();
2918 if (!CNCServer::IsDebugMode() && new_expire - old_expire < m_AppSetup->ttl_unit)
2919 return;
2920
2921 m_BlobAccess->SetCurVerExpire(new_expire);
2922 if (m_BlobAccess->GetCurBlobSize() <= CNCDistributionConf::GetMaxBlobSizeSync()) {
2923 SNCSyncEvent* event = new SNCSyncEvent();
2924 event->blob_size = m_BlobAccess->GetCurBlobSize();
2925 event->event_type = eSyncProlong;
2926 event->key = m_NCBlobKey;
2927 event->orig_server = CNCDistributionConf::GetSelfID();
2928 event->orig_time = cur_time;
2929 CNCSyncLog::AddEvent(m_BlobSlot, event);
2930 CNCPeerControl::MirrorProlong(m_NCBlobKey, m_BlobSlot,
2931 event->orig_rec_no, cur_time, m_BlobAccess);
2932 }
2933 }
2934
2935 void
x_CleanCmdResources(void)2936 CNCMessageHandler::x_CleanCmdResources(void)
2937 {
2938 LOG_CURRENT_FUNCTION
2939 int cmd_status = GetDiagCtx()->GetRequestStatus();
2940 bool print_size = false;
2941 Uint8 written_size = 0;
2942 ENCAccessType access_type = eNCCopyCreate;
2943 if (m_write_event) {
2944 delete m_write_event;
2945 m_write_event = NULL;
2946 }
2947 if (m_BlobAccess) {
2948 access_type = m_BlobAccess->GetAccessType();
2949 if (m_BlobAccess->IsBlobExists() && !x_IsFlagSet(fNoBlobAccessStats)) {
2950 print_size = true;
2951 if (access_type == eNCRead || access_type == eNCReadData)
2952 m_BlobSize = m_BlobAccess->GetCurBlobSize();
2953 else
2954 m_BlobSize = m_BlobAccess->GetNewBlobSize();
2955 if (access_type == eNCReadData)
2956 GetDiagCtx()->SetBytesWr(m_BlobAccess->GetSizeRead());
2957 else if (access_type == eNCCreate || access_type == eNCCopyCreate)
2958 GetDiagCtx()->SetBytesRd(m_BlobSize);
2959 }
2960 else if (access_type == eNCCreate) {
2961 written_size = m_BlobAccess->GetNewBlobSize();
2962 }
2963 m_BlobAccess->Release();
2964 m_BlobAccess = NULL;
2965 }
2966 else if (m_ActiveHub) {
2967 switch (m_ParsedCmd.command->extra.proxy_cmd) {
2968 case eProxyRead:
2969 case eProxyReadLast:
2970 print_size = true;
2971 m_BlobSize = m_ActiveHub->GetHandler()->GetSizeRd();
2972 GetDiagCtx()->SetBytesWr(m_BlobSize);
2973 break;
2974 case eProxyWrite:
2975 print_size = true;
2976 m_BlobSize = m_ActiveHub->GetHandler()->GetSizeWr();
2977 GetDiagCtx()->SetBytesRd(m_BlobSize);
2978 break;
2979 default:
2980 break;
2981 }
2982 m_ActiveHub->GetHandler()->ResetSizeRdWr();
2983 }
2984
2985 if (x_IsFlagSet(fRunsInStartedSync)) {
2986 if (x_IsCmdSucceeded(cmd_status) || x_IsFlagSet(fSyncCmdSuccessful)) {
2987 CNCPeriodicSync::SyncCommandFinished(m_SrvId, m_Slot, m_SyncId);
2988 } else {
2989 CNCPeriodicSync::Cancel(m_SrvId, m_Slot, m_SyncId);
2990 }
2991 }
2992 if (!x_IsFlagSet(fNoReplyOnFinish)) {
2993 if (!x_IsHttpMode()) {
2994 if (x_IsCmdSucceeded(cmd_status)) {
2995 WriteText("OK:\n");
2996 } else {
2997 WriteText(GetMessageByStatus(EHTTPStatus(cmd_status))).WriteText("\n");
2998 }
2999 } else {
3000 x_WriteHttpResponse();
3001 }
3002 }
3003 Flush();
3004
3005 if (m_ActiveHub) {
3006 m_ActiveHub->Release();
3007 m_ActiveHub = NULL;
3008 }
3009 m_CheckSrvs.clear();
3010 m_SrvsIndex = 0;
3011 m_ChunkLen = 0;
3012 m_LastPeerError.clear();
3013
3014 CSrvTime cmd_len = CSrvTime::Current();
3015 cmd_len -= m_CmdStartTime;
3016 Uint8 len_usec = cmd_len.AsUSec();
3017 if (IsLongCommand(len_usec)) {
3018 x_LogCmdLog();
3019 }
3020
3021 if (print_size && (x_IsCmdSucceeded(cmd_status) || m_BlobSize != 0))
3022 CSrvDiagMsg().PrintExtra().PrintParam("blob_size", m_BlobSize);
3023 CSrvDiagMsg().StopRequest();
3024
3025 // CSrvTime cmd_len = CSrvTime::Current();
3026 // cmd_len -= m_CmdStartTime;
3027 // Uint8 len_usec = cmd_len.AsUSec();
3028 CNCStat::CmdFinished(m_ParsedCmd.command->cmd, len_usec, cmd_status);
3029 if (m_Flags & fComesFromClient) {
3030 if (access_type == eNCCreate) {
3031 if (print_size && x_IsCmdSucceeded(cmd_status))
3032 CNCStat::ClientBlobWrite(m_BlobSize, len_usec);
3033 else
3034 CNCStat::ClientBlobRollback(written_size);
3035 }
3036 else if (access_type == eNCReadData) {
3037 if (print_size)
3038 CNCStat::ClientBlobRead(m_BlobSize, len_usec);
3039 }
3040 }
3041 ++m_CntCmds;
3042
3043 if (x_IsFlagSet(fNeedsLowerPriority))
3044 SetPriority(GetDefaultTaskPriority());
3045
3046 m_SendBuff.reset();
3047 ReleaseDiagCtx();
3048
3049 m_PosponedCmd.clear();
3050 }
3051
3052 CNCMessageHandler::State
x_FinishCommand(void)3053 CNCMessageHandler::x_FinishCommand(void)
3054 {
3055 LOG_CURRENT_FUNCTION
3056 int status = GetDiagCtx()->GetRequestStatus();
3057 if (x_IsFlagSet(eBlobPut) && m_NCBlobKey.IsICacheKey()) {
3058 x_JournalBlobPutResult(status, m_NCBlobKey.PackedKey(), m_BlobSlot);
3059 }
3060 if (x_IsFlagSet(fCursedPUT2Cmd))
3061 return &CNCMessageHandler::x_CloseCmdAndConn;
3062
3063 x_CleanCmdResources();
3064 SetState(&CNCMessageHandler::x_ReadCommand);
3065 SetRunnable();
3066 return NULL;
3067 }
3068
3069 CNCMessageHandler::State
x_StartReadingBlob(void)3070 CNCMessageHandler::x_StartReadingBlob(void)
3071 {
3072 LOG_CURRENT_FUNCTION
3073 // Flushing the initial response line that client should receive before it
3074 // will start writing blob data.
3075 Flush();
3076 m_BlobSize = 0;
3077 if (NeedEarlyClose())
3078 return &CNCMessageHandler::x_FinishCommand;
3079 else
3080 return &CNCMessageHandler::x_ReadBlobSignature;
3081 }
3082
3083 CNCMessageHandler::State
x_FinishReadingBlob(void)3084 CNCMessageHandler::x_FinishReadingBlob(void)
3085 {
3086 LOG_CURRENT_FUNCTION
3087 x_LogCmdEvent("FinishReadingBlob");
3088 bool fail = false, keep_conn = !x_IsFlagSet(fNoReplyOnFinish);
3089 string errmsg;
3090 if (x_IsFlagSet(fReadExactBlobSize) && m_BlobSize != m_Size) {
3091 fail = true;
3092 x_ReportError(eStatus_CondFailed);
3093 SRV_LOG(Error, "Wrong data for blob size " << m_Size
3094 << " (received " << m_BlobSize << " bytes)");
3095 }
3096 else if (m_BlobSize > CNCBlobStorage::GetMaxBlobSizeStore()) {
3097 fail = true;
3098 x_ReportError(eStatus_BlobTooBig);
3099 SRV_LOG(Error, "Blob size exceeds the allowed maximum of "
3100 << CNCBlobStorage::GetMaxBlobSizeStore()
3101 << " (received " << m_BlobSize << " bytes)");
3102 }
3103 if (fail) {
3104 if (keep_conn) {
3105 return &CNCMessageHandler::x_FinishCommand;
3106 } else {
3107 return &CNCMessageHandler::x_CloseCmdAndConn;
3108 }
3109 }
3110
3111 if (!x_IsCmdSucceeded(GetDiagCtx()->GetRequestStatus()))
3112 return &CNCMessageHandler::x_FinishCommand;
3113 if (NeedEarlyClose())
3114 return &CNCMessageHandler::x_CloseCmdAndConn;
3115
3116 // Fill all new event data but not add it to CNCSyncLog until we execute
3117 // m_BlobAccess->Finalize().
3118 SNCSyncEvent* write_event = new SNCSyncEvent();
3119 write_event->blob_size = m_BlobSize;
3120 write_event->event_type = eSyncWrite;
3121 write_event->key = m_NCBlobKey;
3122 if (x_IsFlagSet(fCopyLogEvent)) {
3123 write_event->orig_time = m_BlobAccess->GetNewBlobCreateTime();
3124 write_event->orig_server = m_BlobAccess->GetNewCreateServer();
3125 write_event->orig_rec_no = m_OrigRecNo;
3126 }
3127 else {
3128 CSrvTime cur_srv_time = CSrvTime::Current();
3129 Uint8 cur_time = cur_srv_time.AsUSec();
3130 int cur_secs = int(cur_srv_time.Sec());
3131 m_BlobAccess->SetBlobCreateTime(cur_time);
3132 if (x_IsUserFlagSet(fNoProlong) && m_BlobAccess->IsBlobExists() && !m_BlobAccess->IsCurBlobExpired()) {
3133 m_BlobAccess->SetNewBlobExpire(m_BlobAccess->GetCurBlobExpire());
3134 m_BlobAccess->SetNewVerExpire(m_BlobAccess->GetCurVerExpire());
3135 } else {
3136 if (m_BlobAccess->GetNewBlobExpire() == 0) {
3137 m_BlobAccess->SetNewBlobExpire(cur_secs + m_BlobAccess->GetNewBlobTTL());
3138 }
3139 m_BlobAccess->SetNewVerExpire(cur_secs + m_BlobAccess->GetNewVersionTTL());
3140 }
3141 m_BlobAccess->SetCreateServer(CNCDistributionConf::GetSelfID(),
3142 CNCBlobStorage::GetNewBlobId());
3143 write_event->orig_server = CNCDistributionConf::GetSelfID();
3144 write_event->orig_time = cur_time;
3145 }
3146
3147 x_LogCmdEvent("Finalize");
3148 m_BlobAccess->Finalize();
3149 if (m_BlobAccess->HasError()) {
3150 delete write_event;
3151 GetDiagCtx()->SetRequestStatus(eStatus_ServerError);
3152 if (x_IsFlagSet(fNoReplyOnFinish))
3153 return &CNCMessageHandler::x_CloseCmdAndConn;
3154
3155 x_ReportError("ERR:Error while reading blob");
3156 return &CNCMessageHandler::x_FinishCommand;
3157 }
3158
3159 m_MirrorsDone.clear();
3160 if (!x_IsFlagSet(fCopyLogEvent)) {
3161 if (m_BlobAccess->GetNewBlobSize() <= CNCDistributionConf::GetMaxBlobSizeSync()) {
3162 #if 0
3163 if (m_BlobAccess->IsCurBlobExpired()) {
3164 delete write_event;
3165 CNCPeerControl::MirrorRemove(m_NCBlobKey, m_BlobSlot, m_BlobAccess->GetNewBlobCreateTime()-1);
3166 return &CNCMessageHandler::x_FinishCommand;
3167 }
3168 #endif
3169 // If fCopyLogEvent is not set then this blob comes from client and
3170 // thus we need to check quorum value before answering to client.
3171 // If fCopyLogEvent is set then this write comes from other server
3172 // and we don't care about quorum in this case.
3173 if (m_Quorum != 1) {
3174 if (m_Quorum != 0)
3175 --m_Quorum;
3176 x_GetCurSlotServers();
3177 m_write_event = write_event;
3178 // probably, this was made intentionally, but now it looks wrong
3179 // let us respect quorum setting always
3180 // if (!m_ThisServerIsMain || !m_AppSetup->fast_on_main)
3181 return &CNCMessageHandler::x_PutToNextPeer;
3182 }
3183 m_OrigRecNo = CNCSyncLog::AddEvent(m_BlobSlot, write_event);
3184 CNCPeerControl::MirrorWrite(m_NCBlobKey, m_BlobSlot,
3185 m_OrigRecNo, m_BlobAccess->GetNewBlobSize(), m_MirrorsDone);
3186 } else {
3187 if (CNCDistributionConf::GetWarnBlobSizeSync()) {
3188 SRV_LOG(Warning, "Received blob is too big and will not be mirrored:"
3189 << " blob key:" << m_NCBlobKey.RawKey()
3190 << " blob size: " << m_BlobAccess->GetNewBlobSize()
3191 << " max allowed: " << CNCDistributionConf::GetMaxBlobSizeSync());
3192 }
3193 if (CNCDistributionConf::IsThisServerKey(m_NCBlobKey)) {
3194 CNCPeerControl::MirrorRemove(m_NCBlobKey, m_BlobSlot, m_BlobAccess->GetNewBlobCreateTime()-1);
3195 } else {
3196 Uint8 srvId = CNCDistributionConf::GetMainSrvId(m_NCBlobKey);
3197 if (srvId != 0) {
3198 m_CheckSrvs.push_back(srvId);
3199 m_SrvsIndex = 0;
3200 return &CNCMessageHandler::x_PutToNextPeer;
3201 }
3202 }
3203 delete write_event;
3204 }
3205 }
3206 else if (m_OrigRecNo != 0) {
3207 CNCSyncLog::AddEvent(m_BlobSlot, write_event);
3208 }
3209 else {
3210 // m_OrigRecNo can be 0 if blob comes from another server as a result
3211 // of synchronization by blob lists. In this case there's no event to
3212 // link to and thus we don't need to add event to our sync log.
3213 delete write_event;
3214 }
3215
3216 return &CNCMessageHandler::x_FinishCommand;
3217 }
3218
3219 CNCMessageHandler::State
x_CloseOnPeerError(void)3220 CNCMessageHandler::x_CloseOnPeerError(void)
3221 {
3222 LOG_CURRENT_FUNCTION
3223 SRV_LOG(Warning, "Error executing command on peer "
3224 << m_ActiveHub->GetFullPeerName() << ", peer says: "
3225 << m_ActiveHub->GetErrMsg());
3226 GetDiagCtx()->SetRequestStatus(eStatus_PeerError);
3227 return &CNCMessageHandler::x_CloseCmdAndConn;
3228 }
3229
3230 CNCMessageHandler::State
x_ReadBlobSignature(void)3231 CNCMessageHandler::x_ReadBlobSignature(void)
3232 {
3233 LOG_CURRENT_FUNCTION
3234 if (x_IsHttpMode()) {
3235 return &CNCMessageHandler::x_ReadBlobChunkLength;
3236 }
3237
3238 Uint4 sig = 0;
3239 bool has_sig = ReadNumber(&sig);
3240 if (NeedEarlyClose())
3241 return &CNCMessageHandler::x_CloseCmdAndConn;
3242 if (!has_sig)
3243 return NULL;
3244
3245 x_LogCmdEvent("ReadBlobSignature");
3246 if (sig == 0x04030201) {
3247 x_SetFlag(fSwapLengthBytes);
3248 return &CNCMessageHandler::x_ReadBlobChunkLength;
3249 }
3250 if (sig == 0x01020304) {
3251 x_UnsetFlag(fSwapLengthBytes);
3252 return &CNCMessageHandler::x_ReadBlobChunkLength;
3253 }
3254
3255 GetDiagCtx()->SetRequestStatus(eStatus_BadCmd);
3256 SRV_LOG(Error, "Cannot determine the byte order. Got: "
3257 << NStr::UIntToString(sig, 0, 16));
3258 return &CNCMessageHandler::x_CloseCmdAndConn;
3259 }
3260
3261 CNCMessageHandler::State
x_ReadBlobChunkLength(void)3262 CNCMessageHandler::x_ReadBlobChunkLength(void)
3263 {
3264 LOG_CURRENT_FUNCTION
3265 if (m_ActiveHub) {
3266 if (ProxyHadError()) {
3267 // If we were proxying blob data from client to another server and some
3268 // error occurred protocol doesn't allow us to anything else but
3269 // close both connections - to client and to other NC server.
3270 CSrvSocketTask* active_sock = m_ActiveHub->GetHandler()->GetSocket();
3271 if (active_sock && active_sock->HasError())
3272 return &CNCMessageHandler::x_CloseOnPeerError;
3273 else
3274 return &CNCMessageHandler::x_CloseCmdAndConn;
3275 }
3276 ENCClientHubStatus status = m_ActiveHub->GetStatus();
3277 if (status == eNCHubError)
3278 return &CNCMessageHandler::x_CloseOnPeerError;
3279 else if (status != eNCHubCmdInProgress) {
3280 SRV_FATAL("Unexpected client status: " << status);
3281 }
3282
3283 if (m_ChunkLen != 0) {
3284 CNCStat::ClientDataWrite(m_ChunkLen);
3285 CNCStat::PeerDataRead(m_ChunkLen);
3286 }
3287 }
3288
3289 bool has_chunklen = true;
3290 if (x_IsFlagSet(fSkipBlobEOF) && m_BlobSize == m_Size) {
3291 // Workaround for old STRS
3292 m_ChunkLen = 0xFFFFFFFF;
3293 }
3294 else if (x_IsHttpMode()) {
3295 m_ChunkLen = m_Size;
3296 has_chunklen = m_ChunkLen != 0;
3297 }
3298 else {
3299 has_chunklen = ReadNumber(&m_ChunkLen);
3300 if (!has_chunklen && !CanHaveMoreRead() && x_IsFlagSet(fCursedPUT2Cmd)) {
3301 return &CNCMessageHandler::x_FinishReadingBlob;
3302 }
3303 }
3304
3305 if (NeedEarlyClose())
3306 return &CNCMessageHandler::x_CloseCmdAndConn;
3307 if (!has_chunklen)
3308 return NULL;
3309
3310 if (x_IsFlagSet(fSwapLengthBytes))
3311 m_ChunkLen = CByteSwap::GetInt4((const unsigned char*)&m_ChunkLen);
3312 if (m_ChunkLen == 0xFFFFFFFF) {
3313 if (m_ActiveHub) {
3314 // Data transfer is finished, CNCActiveHandler will wait for response
3315 // from other NC server and wake us up.
3316 m_ActiveHub->GetHandler()->SetRunnable();
3317 return &CNCMessageHandler::x_WaitForPeerAnswer;
3318 }
3319 return &CNCMessageHandler::x_FinishReadingBlob;
3320 }
3321
3322 if (!m_BlobAccess && !m_ActiveHub) {
3323 // We can be here only when expecting fake start of blob writing from old
3324 // NC server, but for some reason we got non-EOF chunk length.
3325 GetDiagCtx()->SetRequestStatus(eStatus_BadCmd);
3326 SRV_LOG(Critical, "Received non-EOF chunk len from peer "
3327 << m_SrvId << ": " << m_ChunkLen);
3328 return &CNCMessageHandler::x_CloseCmdAndConn;
3329 }
3330
3331 if (x_IsFlagSet(fComesFromClient)) {
3332 if (m_BlobSize > CNCBlobStorage::GetMaxBlobSizeStore()) {
3333 RunAfter(1);
3334 return NULL;
3335 }
3336 // this is potentially dangerous, because
3337 // if we close connection here, in the middle of transmission,
3338 // client might have no chance (most likely) to receive our error message
3339 // and might want to retry thinking it was bad luck
3340 if (x_IsFlagSet(fReadExactBlobSize) && (m_BlobSize + m_ChunkLen) > m_Size) {
3341 x_ReportError(eStatus_CondFailed);
3342 SRV_LOG(Error, "Too much data for blob size " << m_Size
3343 << " (received at least "
3344 << (m_BlobSize + m_ChunkLen) << " bytes)");
3345 return &CNCMessageHandler::x_CloseCmdAndConn;
3346 }
3347 if ((m_BlobSize + m_ChunkLen) > CNCBlobStorage::GetMaxBlobSizeStore()) {
3348 x_ReportError(eStatus_BlobTooBig);
3349 if (x_IsHttpMode()) {
3350 x_WriteHttpHeader(eStatus_BlobTooBig, 0, false);
3351 }
3352 SRV_LOG(Error, "Blob size exceeds the allowed maximum of "
3353 << CNCBlobStorage::GetMaxBlobSizeStore()
3354 << " (received " << m_BlobSize
3355 << ", next chunk " << m_ChunkLen << " bytes)");
3356 // I am not going to read it anyway
3357 m_BlobSize += m_ChunkLen;
3358 Flush();
3359 RunAfter(1);
3360 return NULL;
3361 // return &CNCMessageHandler::x_CloseCmdAndConn;
3362 }
3363 }
3364
3365 if (m_ActiveHub) {
3366 CSrvSocketTask* active_sock = m_ActiveHub->GetHandler()->GetSocket();
3367 active_sock->WriteData(&m_ChunkLen, sizeof(m_ChunkLen));
3368 m_ActiveHub->GetHandler()->AddSizeWr(m_ChunkLen);
3369 StartProxyTo(active_sock, m_ChunkLen);
3370 if (IsProxyInProgress())
3371 return NULL;
3372 else
3373 return &CNCMessageHandler::x_ReadBlobChunkLength;
3374 }
3375
3376 return &CNCMessageHandler::x_ReadBlobChunk;
3377 }
3378
3379 CNCMessageHandler::State
x_ReadBlobChunk(void)3380 CNCMessageHandler::x_ReadBlobChunk(void)
3381 {
3382 LOG_CURRENT_FUNCTION
3383 // there are too many of them
3384 // x_LogCmdEvent("ReadBlobChunk");
3385 while (m_ChunkLen != 0) {
3386 Uint4 read_len = Uint4(m_BlobAccess->GetWriteMemSize());
3387 if (m_BlobAccess->HasError()) {
3388 GetDiagCtx()->SetRequestStatus(eStatus_ServerError);
3389 if (!x_IsFlagSet(fNoReplyOnFinish) && !x_IsHttpMode()) {
3390 x_ReportError("ERR:Server error");
3391 Flush();
3392 }
3393 SetState(&CNCMessageHandler::x_CloseCmdAndConn);
3394 SetRunnable();
3395 return NULL;
3396 }
3397 if (read_len == 0)
3398 return NULL;
3399 if (read_len > m_ChunkLen)
3400 read_len = m_ChunkLen;
3401
3402 Uint4 n_read = Uint4(Read(m_BlobAccess->GetWriteMemPtr(), read_len));
3403 if (n_read != 0) {
3404 if (m_Flags & fComesFromClient)
3405 CNCStat::ClientDataWrite(n_read);
3406 else
3407 CNCStat::PeerDataWrite(n_read);
3408 }
3409 if (NeedEarlyClose())
3410 return &CNCMessageHandler::x_CloseCmdAndConn;
3411 if (n_read == 0)
3412 return NULL;
3413
3414 m_BlobAccess->MoveWritePos(n_read);
3415 m_ChunkLen -= n_read;
3416 m_BlobSize += n_read;
3417 }
3418 return &CNCMessageHandler::x_ReadBlobChunkLength;
3419 }
3420
3421 CNCMessageHandler::State
x_WriteBlobData(void)3422 CNCMessageHandler::x_WriteBlobData(void)
3423 {
3424 LOG_CURRENT_FUNCTION
3425 x_LogCmdEvent("WriteBlobData");
3426 while (m_Size != 0) {
3427 if (m_BlobAccess->GetPosition() == m_BlobAccess->GetCurBlobSize())
3428 return &CNCMessageHandler::x_FinishCommand;
3429
3430 Uint4 want_read = m_BlobAccess->GetReadMemSize();
3431 if (m_BlobAccess->HasError()) {
3432 GetDiagCtx()->SetRequestStatus(eStatus_ServerError);
3433 return &CNCMessageHandler::x_CloseCmdAndConn;
3434 }
3435 if (m_Size != Uint8(-1) && m_Size < want_read)
3436 want_read = Uint4(m_Size);
3437
3438 Uint4 n_written = Uint4(Write(m_BlobAccess->GetReadMemPtr(), want_read));
3439 // x_LogCmdEvent("Write");
3440 if (n_written != 0) {
3441 if (m_Flags & fComesFromClient)
3442 CNCStat::ClientDataRead(n_written);
3443 else
3444 CNCStat::PeerDataRead(n_written);
3445 m_BlobAccess->MoveReadPos(n_written);
3446 if (m_Size != Uint8(-1))
3447 m_Size -= n_written;
3448 }
3449 if (NeedEarlyClose())
3450 return &CNCMessageHandler::x_CloseCmdAndConn;
3451 if (n_written == 0)
3452 return NULL;
3453 }
3454 return &CNCMessageHandler::x_FinishCommand;
3455 }
3456
3457 CNCMessageHandler::State
x_WriteSendBuff(void)3458 CNCMessageHandler::x_WriteSendBuff(void)
3459 {
3460 LOG_CURRENT_FUNCTION
3461 while (m_SendPos != m_SendBuff->size()) {
3462 size_t n_written = Write(m_SendBuff->data() + m_SendPos,
3463 m_SendBuff->size() - m_SendPos);
3464
3465 if (NeedEarlyClose())
3466 return &CNCMessageHandler::x_CloseCmdAndConn;
3467 if (n_written == 0)
3468 return NULL;
3469
3470 m_SendPos += n_written;
3471 }
3472 if (strcmp(m_ParsedCmd.command->cmd, "SYNC_START") == 0) {
3473 return &CNCMessageHandler::x_WriteSyncStartExtra;
3474 }
3475 ENCProxyCmd proxy_cmd = m_ParsedCmd.command->extra.proxy_cmd;
3476 if (proxy_cmd == eProxyGetBList2) {
3477 Flush();
3478 return &CNCMessageHandler::x_DoCmd_GetBListNext;
3479 }
3480 return &CNCMessageHandler::x_FinishCommand;
3481 }
3482
3483 CNCMessageHandler::State
x_WriteSyncStartExtra(void)3484 CNCMessageHandler::x_WriteSyncStartExtra(void)
3485 {
3486 LOG_CURRENT_FUNCTION
3487 WriteText("PURGE:\n");
3488 WriteText(CNCBlobAccessor::GetPurgeData()). WriteText(";\n");
3489 return &CNCMessageHandler::x_FinishCommand;
3490 }
3491
3492 CNCMessageHandler::State
x_ProxyToNextPeer(void)3493 CNCMessageHandler::x_ProxyToNextPeer(void)
3494 {
3495 LOG_CURRENT_FUNCTION
3496 if (NeedEarlyClose())
3497 return &CNCMessageHandler::x_CloseCmdAndConn;
3498 x_LogCmdEvent("ProxyToNextPeer");
3499 if (m_SrvsIndex < m_CheckSrvs.size()) {
3500 Uint8 srv_id = m_CheckSrvs[m_SrvsIndex++];
3501 if (m_ActiveHub) {
3502 SRV_FATAL("Previous client not released");
3503 }
3504 m_ActiveHub = CNCActiveClientHub::Create(srv_id, this);
3505 return &CNCMessageHandler::x_SendCmdAsProxy;
3506 }
3507
3508 // Either there's no servers to execute this command on or all servers were
3509 // tried and some error was the result from all of them.
3510 SRV_LOG(Warning, "Got error on all peer servers, LastPeerError is " << m_LastPeerError);
3511 if (m_LastPeerError.empty())
3512 m_LastPeerError = "ERR:Cannot execute command on peer servers";
3513 GetDiagCtx()->SetRequestStatus( GetStatusByMessage(m_LastPeerError, eStatus_PeerError));
3514 x_ReportError(m_LastPeerError);
3515 return &CNCMessageHandler::x_FinishCommand;
3516 }
3517
3518 CNCMessageHandler::State
x_SendCmdAsProxy(void)3519 CNCMessageHandler::x_SendCmdAsProxy(void)
3520 {
3521 LOG_CURRENT_FUNCTION
3522 ENCClientHubStatus status = m_ActiveHub->GetStatus();
3523 if (status == eNCHubWaitForConn)
3524 return NULL;
3525 x_LogCmdEvent("SendCmdAsProxy");
3526 ENCProxyCmd proxy_cmd = m_ParsedCmd.command->extra.proxy_cmd;
3527 CNCActiveHandler* pHandler = m_ActiveHub->GetHandler();
3528 if (status == eNCHubError ||
3529 status == eNCHubSuccess ||
3530 (proxy_cmd == eProxyGetBList && !pHandler->GetPeer()->AcceptsBList()) ||
3531 (proxy_cmd == eProxyGetBList2 && !pHandler->GetPeer()->AcceptsBList2()) ||
3532 !pHandler->GetPeer()->AcceptsBlobKey(m_NCBlobKey)
3533 ) {
3534 m_LastPeerError = m_ActiveHub->GetErrMsg();
3535 m_ActiveHub->Release();
3536 m_ActiveHub = NULL;
3537 return &CNCMessageHandler::x_ProxyToNextPeer;
3538 }
3539 if (status != eNCHubConnReady) {
3540 SRV_FATAL("Unexpected client status: " << status);
3541 }
3542 if (NeedEarlyClose())
3543 return &CNCMessageHandler::x_CloseCmdAndConn;
3544
3545
3546 switch (proxy_cmd) {
3547 case eProxyRead:
3548 pHandler->ProxyRead(GetDiagCtx(), m_NCBlobKey, m_RawBlobPass,
3549 m_BlobVersion, m_StartPos, m_Size,
3550 m_Quorum, m_SearchOnRead, m_ForceLocal,
3551 m_AgeMax);
3552 break;
3553 case eProxyWrite:
3554 pHandler->ProxyWrite(GetDiagCtx(), m_NCBlobKey, m_RawBlobPass,
3555 m_BlobVersion, m_BlobTTL, m_Quorum, m_UserFlags);
3556 // The only place that needs to go further to a different state.
3557 return &CNCMessageHandler::x_WriteInitWriteResponse;
3558 case eProxyHasBlob:
3559 pHandler->ProxyHasBlob(GetDiagCtx(), m_NCBlobKey, m_RawBlobPass,
3560 m_Quorum);
3561 break;
3562 case eProxyGetSize:
3563 pHandler->ProxyGetSize(GetDiagCtx(), m_NCBlobKey, m_RawBlobPass,
3564 m_BlobVersion, m_Quorum,
3565 m_SearchOnRead, m_ForceLocal);
3566 break;
3567 case eProxyReadLast:
3568 pHandler->ProxyReadLast(GetDiagCtx(), m_NCBlobKey, m_RawBlobPass,
3569 m_StartPos, m_Size, m_Quorum,
3570 m_SearchOnRead, m_ForceLocal, m_AgeMax);
3571 break;
3572 case eProxySetValid:
3573 pHandler->ProxySetValid(GetDiagCtx(), m_NCBlobKey, m_RawBlobPass,
3574 m_BlobVersion);
3575 break;
3576 case eProxyRemove:
3577 pHandler->ProxyRemove(GetDiagCtx(), m_NCBlobKey, m_RawBlobPass,
3578 m_BlobVersion, m_Quorum);
3579 break;
3580 case eProxyGetMeta:
3581 pHandler->ProxyGetMeta(GetDiagCtx(), m_NCBlobKey,
3582 m_Quorum, m_ForceLocal, m_HttpMode);
3583 break;
3584 case eProxyProlong:
3585 pHandler->ProxyProlong(GetDiagCtx(), m_NCBlobKey, m_RawBlobPass,
3586 m_BlobTTL, m_Quorum,
3587 m_SearchOnRead, m_ForceLocal);
3588 break;
3589 case eProxyGetBList:
3590 pHandler->ProxyBList(GetDiagCtx(), m_NCBlobKey, m_ForceLocal, nullptr);
3591 break;
3592 case eProxyGetBList2:
3593 pHandler->ProxyBList(GetDiagCtx(), m_NCBlobKey, m_ForceLocal, m_BlobFilter);
3594 break;
3595 default:
3596 SRV_FATAL("Unsupported command: " << m_ParsedCmd.command->extra.proxy_cmd);
3597 }
3598
3599 return &CNCMessageHandler::x_WaitForPeerAnswer;
3600 }
3601
3602 CNCMessageHandler::State
x_WaitForPeerAnswer(void)3603 CNCMessageHandler::x_WaitForPeerAnswer(void)
3604 {
3605 LOG_CURRENT_FUNCTION
3606 if (NeedEarlyClose())
3607 return &CNCMessageHandler::x_CloseCmdAndConn;
3608 ENCClientHubStatus status = m_ActiveHub->GetStatus();
3609 if (status == eNCHubCmdInProgress)
3610 return NULL;
3611
3612 x_LogCmdEvent("WaitForPeerAnswer");
3613 if (status == eNCHubError) {
3614 if (m_ActiveHub->GetHandler()->GotClientResponse())
3615 return &CNCMessageHandler::x_CloseOnPeerError;
3616
3617 m_LastPeerError = m_ActiveHub->GetErrMsg();
3618 SRV_LOG(Warning, "Error executing command on peer "
3619 << m_ActiveHub->GetFullPeerName() << ", peer says: " << m_LastPeerError);
3620 m_ActiveHub->Release();
3621 m_ActiveHub = NULL;
3622 return &CNCMessageHandler::x_ProxyToNextPeer;
3623 }
3624 if (status != eNCHubSuccess) {
3625 SRV_FATAL("Unexpected client status: " << status);
3626 }
3627 ENCProxyCmd proxy_cmd = m_ParsedCmd.command->extra.proxy_cmd;
3628 if (proxy_cmd == eProxyGetBList2) {
3629 return &CNCMessageHandler::x_DoCmd_GetBListNext;
3630 }
3631
3632 const string& err_msg = m_ActiveHub->GetErrMsg();
3633 EHTTPStatus rst = GetStatusByMessage(err_msg, eStatus_OK);
3634 if (rst != eStatus_OK) {
3635 GetDiagCtx()->SetRequestStatus(rst);
3636 }
3637 if (!err_msg.empty()) {
3638 x_ReportError(err_msg);
3639 }
3640 return &CNCMessageHandler::x_FinishCommand;
3641 }
3642
3643 CNCMessageHandler::State
x_ReadMetaNextPeer(void)3644 CNCMessageHandler::x_ReadMetaNextPeer(void)
3645 {
3646 LOG_CURRENT_FUNCTION
3647 if (NeedEarlyClose())
3648 return &CNCMessageHandler::x_CloseCmdAndConn;
3649 if (m_SrvsIndex >= m_CheckSrvs.size())
3650 return &CNCMessageHandler::x_ExecuteOnLatestSrvId;
3651
3652 Uint8 srv_id = m_CheckSrvs[m_SrvsIndex++];
3653 if (m_ActiveHub) {
3654 SRV_FATAL("Previous client not released");
3655 }
3656 m_ActiveHub = CNCActiveClientHub::Create(srv_id, this);
3657 return &CNCMessageHandler::x_SendGetMetaCmd;
3658 }
3659
3660 CNCMessageHandler::State
x_SendGetMetaCmd(void)3661 CNCMessageHandler::x_SendGetMetaCmd(void)
3662 {
3663 LOG_CURRENT_FUNCTION
3664 ENCClientHubStatus status = m_ActiveHub->GetStatus();
3665 if (status == eNCHubWaitForConn)
3666 return NULL;
3667 if (status == eNCHubError || status == eNCHubSuccess ||
3668 !m_ActiveHub->GetHandler()->GetPeer()->AcceptsBlobKey(m_NCBlobKey)) {
3669 m_LastPeerError = m_ActiveHub->GetErrMsg();
3670 m_ActiveHub->Release();
3671 m_ActiveHub = NULL;
3672 return &CNCMessageHandler::x_ReadMetaNextPeer;
3673 }
3674 if (status != eNCHubConnReady) {
3675 SRV_FATAL("Unexpected client status: " << status);
3676 }
3677 if (NeedEarlyClose())
3678 return &CNCMessageHandler::x_CloseCmdAndConn;
3679
3680 m_ActiveHub->GetHandler()->SearchMeta(GetDiagCtx(), m_NCBlobKey);
3681 return &CNCMessageHandler::x_ReadMetaResults;
3682 }
3683
3684 CNCMessageHandler::State
x_ReadMetaResults(void)3685 CNCMessageHandler::x_ReadMetaResults(void)
3686 {
3687 LOG_CURRENT_FUNCTION
3688 if (NeedEarlyClose())
3689 return &CNCMessageHandler::x_CloseCmdAndConn;
3690 ENCClientHubStatus status = m_ActiveHub->GetStatus();
3691 if (status == eNCHubCmdInProgress)
3692 return NULL;
3693 if (status == eNCHubError)
3694 goto results_processed;
3695 if (status != eNCHubSuccess) {
3696 SRV_FATAL("Unexpected client status: " << status);
3697 }
3698
3699 CNCActiveHandler* handler;
3700 handler = m_ActiveHub->GetHandler();
3701 const SNCBlobSummary* cur_blob_sum;
3702 cur_blob_sum = &handler->GetBlobSummary();
3703 bool cur_exist;
3704 cur_exist = handler->IsBlobExists();
3705 if (!cur_exist && !x_IsFlagSet(fPeerFindExistsOnly))
3706 goto results_processed;
3707
3708 if (cur_exist && x_IsFlagSet(fPeerFindExistsOnly)) {
3709 m_LatestExist = true;
3710 m_LatestSrvId = m_CheckSrvs[m_SrvsIndex - 1];
3711 goto meta_search_finished;
3712 }
3713 if (cur_exist && (!m_LatestExist || m_LatestBlobSum->isOlder(*cur_blob_sum)))
3714 {
3715 m_LatestExist = true;
3716 m_LatestSrvId = m_CheckSrvs[m_SrvsIndex - 1];
3717 *m_LatestBlobSum = *cur_blob_sum;
3718 }
3719 if (cur_blob_sum->size > CNCDistributionConf::GetMaxBlobSizeSync()) {
3720 m_Quorum = 1;
3721 }
3722 if (m_Quorum == 1)
3723 goto meta_search_finished;
3724 if (m_Quorum != 0)
3725 --m_Quorum;
3726
3727 results_processed:
3728
3729 m_LastPeerError = m_ActiveHub->GetErrMsg();
3730 m_ActiveHub->Release();
3731 m_ActiveHub = NULL;
3732 return &CNCMessageHandler::x_ReadMetaNextPeer;
3733
3734 meta_search_finished:
3735
3736 m_LastPeerError = m_ActiveHub->GetErrMsg();
3737 m_ActiveHub->Release();
3738 m_ActiveHub = NULL;
3739 m_CheckSrvs.clear();
3740 m_SrvsIndex = 0;
3741 return &CNCMessageHandler::x_ExecuteOnLatestSrvId;
3742 }
3743
3744 CNCMessageHandler::State
x_ExecuteOnLatestSrvId(void)3745 CNCMessageHandler::x_ExecuteOnLatestSrvId(void)
3746 {
3747 LOG_CURRENT_FUNCTION
3748 x_LogCmdEvent("ExecuteOnLatestSrvId");
3749 if (m_LatestSrvId == 0) {
3750 m_LatestSrvId = CNCDistributionConf::GetSelfID();
3751 }
3752 if (m_LatestExist) {
3753 // if max age specified, check age
3754 if (m_AgeMax != 0 && m_BlobAccess->IsBlobExists()) {
3755 CSrvTime cur_srv_time = CSrvTime::Current();
3756 unsigned int vttl = m_BlobAccess->GetCurVersionTTL();
3757 Uint8 creation = vttl != 0 ?
3758 // see x_ProlongVersionLife
3759 (m_LatestBlobSum->ver_expire - vttl) :
3760 (m_LatestBlobSum->create_time / kUSecsPerSecond);
3761 m_AgeCur = Uint8(cur_srv_time.Sec()) - creation;
3762 if (m_AgeCur > m_AgeMax) {
3763 return &CNCMessageHandler::x_ReportBlobNotFound;
3764 }
3765 }
3766 }
3767 if (m_BlobAccess->IsPurged(m_NCBlobKey)) {
3768 if (x_IsFlagSet(fPeerFindExistsOnly)) {
3769 m_LatestExist = false;
3770 return m_CmdProcessor;
3771 }
3772 #if 0
3773 return &CNCMessageHandler::x_ReportBlobNotFound;
3774 #else
3775 x_ReportError( GetMessageByStatus(eStatus_NotFound));
3776 return &CNCMessageHandler::x_DoCmd_Remove;
3777 #endif
3778 }
3779 if (x_IsFlagSet(fPeerFindExistsOnly) && m_LatestSrvId == CNCDistributionConf::GetSelfID()) {
3780 return m_CmdProcessor;
3781 }
3782 if (m_LatestSrvId == CNCDistributionConf::GetSelfID()) {
3783 if (m_LatestExist && m_BlobAccess->IsBlobExists() && !m_BlobAccess->IsCurBlobExpired()) {
3784 return m_CmdProcessor;
3785 } else {
3786 return &CNCMessageHandler::x_ReportBlobNotFound;
3787 }
3788 }
3789
3790 CSrvDiagMsg().PrintExtra().PrintParam("proxy", "1");
3791 // Changing parameters that will go to other server: that server have to
3792 // execute command locally, without quorum (quorum equal to 1), and without
3793 // searching on other servers.
3794 m_Quorum = 1;
3795 m_SearchOnRead = false;
3796 m_ForceLocal = true;
3797 m_CheckSrvs.push_back(m_LatestSrvId);
3798 x_LogCmdEvent("ProxyToNextPeer");
3799 return &CNCMessageHandler::x_ProxyToNextPeer;
3800 }
3801
3802 CNCMessageHandler::State
x_PutToNextPeer(void)3803 CNCMessageHandler::x_PutToNextPeer(void)
3804 {
3805 LOG_CURRENT_FUNCTION
3806 x_LogCmdEvent("PutToNextPeer");
3807 if (m_SrvsIndex >= m_CheckSrvs.size() || NeedEarlyClose())
3808 return &CNCMessageHandler::x_FinishCommand;
3809
3810 Uint8 srv_id = m_CheckSrvs[m_SrvsIndex++];
3811 if (m_ActiveHub) {
3812 SRV_FATAL("Previous client not released");
3813 }
3814 if (CNCDistributionConf::GetSelfTrustLevel() < CNCPeerControl::Peer(srv_id)->GetTrustLevel()) {
3815 return &CNCMessageHandler::x_PutToNextPeer;
3816 }
3817 m_ActiveHub = CNCActiveClientHub::Create(srv_id, this);
3818 return &CNCMessageHandler::x_SendPutToPeerCmd;
3819 }
3820
3821 CNCMessageHandler::State
x_SendPutToPeerCmd(void)3822 CNCMessageHandler::x_SendPutToPeerCmd(void)
3823 {
3824 LOG_CURRENT_FUNCTION
3825 ENCClientHubStatus status = m_ActiveHub->GetStatus();
3826 if (status == eNCHubWaitForConn)
3827 return NULL;
3828 if (status == eNCHubError || status == eNCHubSuccess ||
3829 !m_ActiveHub->GetHandler()->GetPeer()->AcceptsBlobKey(m_NCBlobKey)) {
3830 m_MirrorsDone.push_back(m_CheckSrvs[m_SrvsIndex-1]);
3831 m_LastPeerError = m_ActiveHub->GetErrMsg();
3832 m_ActiveHub->Release();
3833 m_ActiveHub = NULL;
3834 return &CNCMessageHandler::x_PutToNextPeer;
3835 }
3836 if (status != eNCHubConnReady) {
3837 SRV_FATAL("Unexpected client status: " << status);
3838 }
3839 if (NeedEarlyClose())
3840 return &CNCMessageHandler::x_FinishCommand;
3841
3842 m_ActiveHub->GetHandler()->CopyPut(GetDiagCtx(), m_NCBlobKey, m_BlobSlot, m_OrigRecNo);
3843 return &CNCMessageHandler::x_ReadPutResults;
3844 }
3845
3846 CNCMessageHandler::State
x_ReadPutResults(void)3847 CNCMessageHandler::x_ReadPutResults(void)
3848 {
3849 LOG_CURRENT_FUNCTION
3850 x_LogCmdEvent("ReadPutResults");
3851 if (NeedEarlyClose())
3852 return &CNCMessageHandler::x_FinishCommand;
3853 ENCClientHubStatus status = m_ActiveHub->GetStatus();
3854 if (status == eNCHubCmdInProgress)
3855 return NULL;
3856 if (status == eNCHubError)
3857 goto results_processed;
3858 if (status != eNCHubSuccess) {
3859 SRV_FATAL("Unexpected client status: " << status);
3860 }
3861
3862 m_MirrorsDone.push_back(m_CheckSrvs[m_SrvsIndex-1]);
3863 if (m_Quorum == 1) {
3864 if (m_BlobAccess->GetNewBlobSize() <= CNCDistributionConf::GetMaxBlobSizeSync()) {
3865 if (m_write_event) {
3866 m_OrigRecNo = CNCSyncLog::AddEvent(m_BlobSlot, m_write_event);
3867 m_write_event = NULL;
3868 }
3869 CNCPeerControl::MirrorWrite(m_NCBlobKey, m_BlobSlot,
3870 m_OrigRecNo, m_BlobAccess->GetNewBlobSize(),
3871 m_MirrorsDone);
3872 }
3873 return &CNCMessageHandler::x_FinishCommand;
3874 }
3875 if (m_Quorum != 0)
3876 --m_Quorum;
3877
3878 results_processed:
3879 m_ActiveHub->Release();
3880 m_ActiveHub = NULL;
3881 return &CNCMessageHandler::x_PutToNextPeer;
3882 }
3883
3884 CNCMessageHandler::State
x_PurgeToNextPeer(void)3885 CNCMessageHandler::x_PurgeToNextPeer(void)
3886 {
3887 LOG_CURRENT_FUNCTION
3888 if (m_SrvsIndex >= m_CheckSrvs.size() || NeedEarlyClose())
3889 return &CNCMessageHandler::x_FinishCommand;
3890
3891 Uint8 srv_id = m_CheckSrvs[m_SrvsIndex++];
3892 if (m_ActiveHub) {
3893 SRV_FATAL("Previous client not released");
3894 }
3895 if (!m_NCBlobKey.RawKey().empty() && !CNCPeerControl::Peer(srv_id)->AcceptsPurge2()) {
3896 return &CNCMessageHandler::x_PurgeToNextPeer;
3897 }
3898 m_ActiveHub = CNCActiveClientHub::Create(srv_id, this);
3899 return &CNCMessageHandler::x_SendPurgeToPeerCmd;
3900 }
3901
3902 CNCMessageHandler::State
x_SendPurgeToPeerCmd(void)3903 CNCMessageHandler::x_SendPurgeToPeerCmd(void)
3904 {
3905 LOG_CURRENT_FUNCTION
3906 ENCClientHubStatus status = m_ActiveHub->GetStatus();
3907 if (status == eNCHubWaitForConn)
3908 return NULL;
3909 if (status == eNCHubError || status == eNCHubSuccess) {
3910 m_LastPeerError = m_ActiveHub->GetErrMsg();
3911 m_ActiveHub->Release();
3912 m_ActiveHub = NULL;
3913 return &CNCMessageHandler::x_PurgeToNextPeer;
3914 }
3915 if (status != eNCHubConnReady) {
3916 SRV_FATAL("Unexpected client status: " << status);
3917 }
3918 if (NeedEarlyClose())
3919 return &CNCMessageHandler::x_FinishCommand;
3920
3921 m_ActiveHub->GetHandler()->CopyPurge(GetDiagCtx(), m_NCBlobKey, m_CmdStartTime.AsUSec());
3922 return &CNCMessageHandler::x_ReadPurgeResults;
3923 }
3924
3925 CNCMessageHandler::State
x_ReadPurgeResults(void)3926 CNCMessageHandler::x_ReadPurgeResults(void)
3927 {
3928 LOG_CURRENT_FUNCTION
3929 if (NeedEarlyClose())
3930 return &CNCMessageHandler::x_FinishCommand;
3931 ENCClientHubStatus status = m_ActiveHub->GetStatus();
3932 if (status == eNCHubCmdInProgress)
3933 return NULL;
3934 if (status == eNCHubError)
3935 goto results_processed;
3936 if (status != eNCHubSuccess) {
3937 SRV_FATAL("Unexpected client status: " << status);
3938 }
3939
3940 results_processed:
3941 m_ActiveHub->Release();
3942 m_ActiveHub = NULL;
3943 return &CNCMessageHandler::x_PurgeToNextPeer;
3944 }
3945
3946
3947 inline unsigned int
x_GetBlobTTL(void)3948 CNCMessageHandler::x_GetBlobTTL(void)
3949 {
3950 LOG_CURRENT_FUNCTION
3951 return m_BlobTTL != 0 ? min(m_BlobTTL,m_AppSetup->max_ttl) : m_AppSetup->blob_ttl;
3952 }
3953
3954 CNCMessageHandler::State
x_DoCmd_Health(void)3955 CNCMessageHandler::x_DoCmd_Health(void)
3956 {
3957 LOG_CURRENT_FUNCTION
3958 const char* health_coeff = "1";
3959 if (CNCBlobStorage::NeedStopWrite()) {
3960 health_coeff = "0 (does not accept writes)";
3961 } else if (!CNCServer::IsCachingComplete()) {
3962 health_coeff = "0.1 (caching not finished)";
3963 } else if (CNCBlobStorage::IsDraining()) {
3964 health_coeff = "0.2 (draining)";
3965 } else if (!CNCServer::IsInitiallySynced()) {
3966 health_coeff = "0.5 (initial sync not finished)";
3967 } else if (CNCPeerControl::HasPeerInThrottle()) {
3968 health_coeff = "0.8 (some peers unaccessible)";
3969 }
3970 x_ReportOK("OK:HEALTH_COEFF=").WriteText(health_coeff).WriteText("\n");
3971 WriteText("OK:UP_TIME=").WriteNumber(CNCServer::GetUpTime()).WriteText("\n");
3972 WriteText("OK:CACHING_COMPLETE=").WriteText(CNCServer::IsCachingComplete()? "yes": "no").WriteText("\n");
3973 WriteText("OK:INITIALLY_SYNCED=").WriteText(CNCServer::IsInitiallySynced()? "yes": "no").WriteText("\n");
3974 Int8 free_space = CNCBlobStorage::GetDiskFree();
3975 Int8 allowed_size = CNCBlobStorage::GetAllowedDBSize(free_space);
3976 WriteText("OK:DISK_FREE=").WriteNumber(free_space).WriteText("\n");
3977 WriteText("OK:DISK_LIMIT=").WriteNumber(allowed_size).WriteText("\n");
3978 WriteText("OK:DISK_USED=").WriteNumber(CNCBlobStorage::GetDBSize()).WriteText("\n");
3979 WriteText("OK:DISK_LIMIT_ALERT=").WriteText(CNCBlobStorage::IsDBSizeAlert()? "yes": "no").WriteText("\n");
3980 WriteText("OK:N_DB_FILES=").WriteNumber(CNCBlobStorage::GetNDBFiles()).WriteText("\n");
3981 WriteText("OK:COPY_QUEUE_SIZE=").WriteNumber(CNCPeerControl::GetMirrorQueueSize()).WriteText("\n");
3982
3983 const TNCPeerList& peers = CNCDistributionConf::GetPeers();
3984 ITERATE(TNCPeerList, it_peer, peers) {
3985 // WriteText("OK:QUEUE_SIZE_").WriteNumber(it_peer->first).WriteText("=").WriteNumber(CNCPeerControl::GetMirrorQueueSize(it_peer->first)).WriteText("\n");
3986 WriteText("OK:QUEUE_SIZE_").WriteText(it_peer->second).WriteText("=").WriteNumber(CNCPeerControl::GetMirrorQueueSize(it_peer->first)).WriteText("\n");
3987 }
3988 WriteText("OK:SYNC_LOG_SIZE=").WriteNumber(CNCSyncLog::GetLogSize()).WriteText("\n");
3989
3990 WriteText("OK:END\n");
3991 return &CNCMessageHandler::x_FinishCommand;
3992 }
3993
3994 CNCMessageHandler::State
x_DoCmd_Shutdown(void)3995 CNCMessageHandler::x_DoCmd_Shutdown(void)
3996 {
3997 LOG_CURRENT_FUNCTION
3998 TNSProtoParams& param = m_ParsedCmd.params;
3999 if (param.find("drain") != param.end() && param["drain"] != "0") {
4000 CNCBlobStorage::SetDraining(true);
4001 } else if (param.find("reset") != param.end() && param["reset"] != "0") {
4002 CNCBlobStorage::AbandonDB();
4003 CTaskServer::RequestShutdown(eSrvFastShutdown);
4004 } else {
4005 CTaskServer::RequestShutdown(eSrvSlowShutdown);
4006 }
4007 return &CNCMessageHandler::x_FinishCommand;
4008 }
4009
4010 CNCMessageHandler::State
x_DoCmd_Version(void)4011 CNCMessageHandler::x_DoCmd_Version(void)
4012 {
4013 LOG_CURRENT_FUNCTION
4014 x_ReportOK("OK:server_version=" NETCACHED_VERSION
4015 "&storage_version=" NETCACHED_STORAGE_VERSION
4016 "&protocol_version=" NETCACHED_PROTOCOL_VERSION
4017 "&build_date=" + NStr::URLEncode(NETCACHED_BUILD_DATE) +
4018 "&mirrored=" + (NStr::BoolToString( CNCDistributionConf::HasPeers())))
4019 .WriteText("\n");
4020 return &CNCMessageHandler::x_FinishCommand;
4021 }
4022
4023 CNCMessageHandler::State
x_DoCmd_GetConfig(void)4024 CNCMessageHandler::x_DoCmd_GetConfig(void)
4025 {
4026 LOG_CURRENT_FUNCTION
4027 TNSProtoParams& params = m_ParsedCmd.params;
4028 if (params.find("section") != params.end()) {
4029 string section(params["section"]);
4030 WriteText("{\"").WriteText(section).WriteText("\": {\n\"section\": \"");
4031 WriteText(section).WriteText("\"");
4032 if (section == "task_server") {
4033 CTaskServer::WriteSetup(*this);
4034 } else if (section == "netcache") {
4035 TStringMap client;
4036 if (params.find("port") != params.end()) {
4037 client["port"] = params["port"];
4038 }
4039 if (params.find("cache") != params.end()) {
4040 client["cache"] = params["cache"];
4041 }
4042 Flush();
4043 m_SendBuff.reset(new TNCBufferType());
4044 CNCServer::WriteAppSetup(*m_SendBuff, client);
4045 m_SendBuff->WriteText("\n}}\nOK:END\n");
4046 x_SetFlag(fNoReplyOnFinish);
4047 m_SendPos = 0;
4048 return &CNCMessageHandler::x_WriteSendBuff;
4049 } else if (section == "storage") {
4050 CNCBlobStorage::WriteSetup(*this);
4051 } else if (section == "mirror") {
4052 CNCDistributionConf::WriteSetup(*this);
4053 } else if (section == "env") {
4054 CNCServer::WriteEnvInfo(*this);
4055 CNCBlobStorage::WriteEnvInfo(*this);
4056 CNCDistributionConf::WriteEnvInfo(*this);
4057 } else if (section == "stat") {
4058 CSrvRef<CNCStat> stat = CNCStat::GetStat("1min", false);
4059 if (stat) {
4060 stat->PrintState(*this);
4061 }
4062 CNCPeerControl::PrintState(*this);
4063 #ifdef _DEBUG
4064 } else if (section == "syncstat") {
4065 CNCPeerControl::PrintSyncStat(*this);
4066 #endif
4067 #if __NC_TASKS_MONITOR
4068 } else if (section == "tasks") {
4069 CSrvTask::PrintState(*this);
4070 #endif
4071 } else if (section == "allalerts") {
4072 CNCAlerts::Report(*this, true);
4073 } else if (section == "alerts") {
4074 CNCAlerts::Report(*this, false);
4075 } else if (section == "sync") {
4076 CTempString mask = params.find("port") != params.end() ? params.at("port") : CTempString(kEmptyStr);
4077 Flush();
4078 m_SendBuff.reset(new TNCBufferType());
4079 CNCActiveSyncControl::PrintState(*m_SendBuff, mask);
4080 m_SendBuff->WriteText("\n}}\nOK:END\n");
4081 x_SetFlag(fNoReplyOnFinish);
4082 m_SendPos = 0;
4083 return &CNCMessageHandler::x_WriteSendBuff;
4084 } else if (section == "db") {
4085 CTempString mask = params.find("port") != params.end() ? params.at("port") : CTempString(kEmptyStr);
4086 Flush();
4087 m_SendBuff.reset(new TNCBufferType());
4088 CNCBlobStorage::WriteDbInfo(*m_SendBuff, mask);
4089 m_SendBuff->WriteText("\n}}\nOK:END\n");
4090 x_SetFlag(fNoReplyOnFinish);
4091 m_SendPos = 0;
4092 return &CNCMessageHandler::x_WriteSendBuff;
4093 } else if (section == "blobs") {
4094 CNCBlobStorage::WriteBlobStat(*this);
4095 } else if (section == "blist") {
4096 CTempString mask = params.find("port") != params.end() ? params.at("port") : CTempString(kEmptyStr);
4097 Flush();
4098 m_SendBuff.reset(new TNCBufferType());
4099 CNCBlobStorage::WriteBlobList(*m_SendBuff, mask);
4100 m_SendBuff->WriteText("\n}}\nOK:END\n");
4101 x_SetFlag(fNoReplyOnFinish);
4102 m_SendPos = 0;
4103 return &CNCMessageHandler::x_WriteSendBuff;
4104 } else {
4105 WriteText(",\n\"error\": \"Unknown section name, valid names: ");
4106 #if __NC_TASKS_MONITOR
4107 WriteText("task_server, netcache, storage, mirror, alerts, allalerts, env, stat, sync, tasks, blobs, db\"");
4108 #else
4109 WriteText("task_server, netcache, storage, mirror, alerts, allalerts, env, stat, sync, blobs, db\"");
4110 #endif
4111 }
4112 WriteText("\n}}");
4113 } else {
4114 CNcbiOstrstream str;
4115 CTaskServer::GetConfRegistry().Write(str);
4116 string conf = CNcbiOstrstreamToString(str);
4117 WriteText(conf);
4118 }
4119 x_ReportOK("\nOK:END\n");
4120 return &CNCMessageHandler::x_FinishCommand;
4121 }
4122
4123 CNCMessageHandler::State
x_DoCmd_AckAlert(void)4124 CNCMessageHandler::x_DoCmd_AckAlert(void)
4125 {
4126 LOG_CURRENT_FUNCTION
4127 CNCAlerts::EAlertAckResult res = CNCAlerts::eNotFound;
4128 TNSProtoParams& params = m_ParsedCmd.params;
4129 if (params.find("alert") != params.end()) {
4130 string alert(params["alert"]);
4131 if (params.find("user") != params.end()) {
4132 string user(params["user"]);
4133 res = CNCAlerts::Acknowledge(alert, user);
4134 }
4135 }
4136 if (res == CNCAlerts::eNotFound) {
4137 x_ReportError("ERR:Not found");
4138 } else {
4139 x_ReportOK("OK:END\n");
4140 }
4141 return &CNCMessageHandler::x_FinishCommand;
4142 }
4143
4144 CNCMessageHandler::State
x_DoCmd_ReConfig(void)4145 CNCMessageHandler::x_DoCmd_ReConfig(void)
4146 {
4147 LOG_CURRENT_FUNCTION
4148 if (!CNCServer::IsInitiallySynced()) {
4149 x_ReportError("ERR:Initial sync not finished");
4150 GetDiagCtx()->SetRequestStatus(eStatus_CondFailed);
4151 return &CNCMessageHandler::x_FinishCommand;
4152 }
4153 TNSProtoParams& params = m_ParsedCmd.params;
4154 string err_message("ERR:Unknown section name");
4155 bool result = false;
4156 if (params.find("section") != params.end()) {
4157 string section(params["section"]);
4158 if (section != "task_server" &&
4159 section != "mirror" &&
4160 section != "storage") {
4161 err_message += ": " + section;
4162 goto done;
4163 }
4164 CNcbiRegistry* new_reg = NULL;
4165 if (!CTaskServer::ReadConfiguration(new_reg))
4166 {
4167 err_message = "ERR:Failed to load registry";
4168 goto done;
4169 }
4170 if (section == "task_server") {
4171 result = CTaskServer::ReConfig(*new_reg, err_message);
4172 } else if (section == "mirror") {
4173 result = CNCDistributionConf::ReConfig(*new_reg, err_message);
4174 if (result) {
4175 CNCPeriodicSync::ReConfig();
4176 }
4177 } else if (section == "storage") {
4178 result = CNCBlobStorage::ReConfig(*new_reg, err_message);
4179 }
4180 delete new_reg;
4181 }
4182 done:
4183 if (result) {
4184 x_ReportOK("OK:\n");
4185 } else {
4186 x_ReportError(err_message);
4187 }
4188 return &CNCMessageHandler::x_FinishCommand;
4189 }
4190
4191 CNCMessageHandler::State
x_DoCmd_GetStat(void)4192 CNCMessageHandler::x_DoCmd_GetStat(void)
4193 {
4194 LOG_CURRENT_FUNCTION
4195 CSrvRef<CNCStat> stat = CNCStat::GetStat(m_StatType, m_StatPrev);
4196 if (!stat) {
4197 x_ReportError("ERR:Unknown statistics type: " + m_StatType);
4198 GetDiagCtx()->SetRequestStatus(eStatus_BadCmd);
4199 }
4200 else {
4201 stat->PrintToSocket(this);
4202 x_ReportOK("OK:END\n");
4203 }
4204 return &CNCMessageHandler::x_FinishCommand;
4205 }
4206
4207 CNCMessageHandler::State
x_DoCmd_Put(void)4208 CNCMessageHandler::x_DoCmd_Put(void)
4209 {
4210 LOG_CURRENT_FUNCTION
4211 if (x_IsUserFlagSet(fNoCreate) && !(m_BlobAccess->IsBlobExists() && !m_BlobAccess->IsCurBlobExpired())) {
4212 return &CNCMessageHandler::x_ReportBlobNotFound;
4213 }
4214 if (!m_BlobAccess->IsBlobExists()) {
4215 EHTTPStatus sts = eStatus_Created;
4216 if (x_IsHttpMode()) {
4217 // if blob does not exist, verify that it is POST, not PUT
4218 if (!x_IsFlagSet(fCanGenerateKey)) {
4219 sts = eStatus_NotFound;
4220 }
4221 if (CNCBlobStorage::IsDraining()) {
4222 sts = eStatus_ServiceUnavailable;
4223 }
4224 } else if (CNCBlobStorage::IsDraining()) {
4225 sts = eStatus_ShuttingDown;
4226 }
4227 if (!x_IsCmdSucceeded(sts)) {
4228 x_ReportError(sts);
4229 return &CNCMessageHandler::x_FinishCommand;
4230 }
4231 GetDiagCtx()->SetRequestStatus(sts);
4232 }
4233
4234 m_BlobAccess->SetBlobTTL(x_GetBlobTTL());
4235 m_BlobAccess->SetVersionTTL(0);
4236 m_BlobAccess->SetBlobVersion(0);
4237 if (!x_IsHttpMode()) {
4238 // on Put, client expects 'ready' status before sending blob data
4239 WriteText("OK:ID:").WriteText(m_NCBlobKey.RawKey()).WriteText("\n");
4240 } else {
4241 string key(m_NCBlobKey.RawKey());
4242 if (!m_ClientParams["service"].empty()) {
4243 CNetCacheKey::AddExtensions(key, m_ClientParams["service"], 0, 3);
4244 }
4245 CNcbiOstrstream str;
4246 str << ",\"blob_key\":\"" << key << "\"";
4247 m_PosponedCmd += CNcbiOstrstreamToString(str);
4248 }
4249 return &CNCMessageHandler::x_StartReadingBlob;
4250 }
4251
4252 CNCMessageHandler::State
x_DoCmd_Get(void)4253 CNCMessageHandler::x_DoCmd_Get(void)
4254 {
4255 LOG_CURRENT_FUNCTION
4256 if (m_AppSetup->prolong_on_read)
4257 x_ProlongBlobDeadTime(m_BlobAccess->GetCurBlobTTL());
4258
4259 Uint8 blob_size = m_BlobAccess->GetCurBlobSize();
4260 if (blob_size < m_StartPos)
4261 blob_size = 0;
4262 else
4263 blob_size -= m_StartPos;
4264 bool range = false;
4265 if (m_Size != Uint8(-1)) {
4266 range = true;
4267 if (m_Size < blob_size)
4268 blob_size = m_Size;
4269 else
4270 m_Size = blob_size;
4271 }
4272
4273 if (!x_IsHttpMode()) {
4274 GetDiagCtx()->SetRequestStatus(range ? eStatus_PartialContent : eStatus_OK);
4275 x_ReportOK("OK:BLOB found. SIZE=").WriteNumber(blob_size);
4276 if (m_AgeMax != 0) {
4277 WriteText(", AGE=").WriteNumber(m_AgeCur);
4278 }
4279 WriteText("\n");
4280 } else {
4281 // http://greenbytes.de/tech/webdav/rfc2616.html#header.range
4282 x_WriteHttpHeader(range ? eStatus_PartialContent : eStatus_OK, blob_size, true);
4283 x_SetFlag(fNoReplyOnFinish);
4284 }
4285
4286 if (blob_size == 0)
4287 return &CNCMessageHandler::x_FinishCommand;
4288 if (NeedEarlyClose())
4289 return &CNCMessageHandler::x_CloseCmdAndConn;
4290
4291 m_BlobAccess->SetPosition(m_StartPos);
4292 return &CNCMessageHandler::x_WriteBlobData;
4293 }
4294
4295 CNCMessageHandler::State
x_DoCmd_GetLast(void)4296 CNCMessageHandler::x_DoCmd_GetLast(void)
4297 {
4298 LOG_CURRENT_FUNCTION
4299 if (m_AppSetup->prolong_on_read)
4300 x_ProlongBlobDeadTime(m_BlobAccess->GetCurBlobTTL());
4301
4302 Uint8 blob_size = m_BlobAccess->GetCurBlobSize();
4303 if (blob_size < m_StartPos)
4304 blob_size = 0;
4305 else
4306 blob_size -= m_StartPos;
4307 if (m_Size != Uint8(-1) && m_Size < blob_size)
4308 blob_size = m_Size;
4309
4310 x_ReportOK("OK:BLOB found. SIZE=").WriteNumber(blob_size);
4311 WriteText(", VER=").WriteNumber(m_BlobAccess->GetCurBlobVersion());
4312 WriteText(", VALID=").WriteText(m_BlobAccess->IsCurVerExpired()? "false": "true");
4313 if (m_AgeMax != 0) {
4314 WriteText(", AGE=").WriteNumber(m_AgeCur);
4315 }
4316 WriteText("\n");
4317
4318 if (blob_size == 0)
4319 return &CNCMessageHandler::x_FinishCommand;
4320 if (NeedEarlyClose())
4321 return &CNCMessageHandler::x_CloseCmdAndConn;
4322
4323 m_BlobAccess->SetPosition(m_StartPos);
4324 return &CNCMessageHandler::x_WriteBlobData;
4325 }
4326
4327 CNCMessageHandler::State
x_DoCmd_SetValid(void)4328 CNCMessageHandler::x_DoCmd_SetValid(void)
4329 {
4330 LOG_CURRENT_FUNCTION
4331 if (!m_BlobAccess->IsBlobExists() || m_BlobAccess->IsCurBlobExpired())
4332 return &CNCMessageHandler::x_ReportBlobNotFound;
4333
4334 if (m_BlobAccess->GetCurBlobVersion() != m_BlobVersion) {
4335 GetDiagCtx()->SetRequestStatus(eStatus_RaceCond);
4336 x_ReportOK("OK:WARNING:BLOB was changed");
4337 WriteText(", VER=").WriteNumber(m_BlobAccess->GetCurBlobVersion());
4338 WriteText("\n");
4339 }
4340 else {
4341 x_ProlongVersionLife();
4342 x_ReportOK("OK:\n");
4343 }
4344 return &CNCMessageHandler::x_FinishCommand;
4345 }
4346
4347 CNCMessageHandler::State
x_DoCmd_GetSize(void)4348 CNCMessageHandler::x_DoCmd_GetSize(void)
4349 {
4350 LOG_CURRENT_FUNCTION
4351 if (m_AppSetup->prolong_on_read)
4352 x_ProlongBlobDeadTime(m_BlobAccess->GetCurBlobTTL());
4353
4354 Uint8 size = m_BlobAccess->GetCurBlobSize();
4355 x_ReportOK("OK:").WriteNumber(size).WriteText("\n");
4356
4357 return &CNCMessageHandler::x_FinishCommand;
4358 }
4359
4360 CNCMessageHandler::State
x_DoCmd_Prolong(void)4361 CNCMessageHandler::x_DoCmd_Prolong(void)
4362 {
4363 LOG_CURRENT_FUNCTION
4364 bool ttl_overrun = false;
4365 #if 0
4366 if (m_BlobTTL <= m_BlobAccess->GetCurBlobTTL())
4367 ttl_overrun = false;
4368 else {
4369 m_BlobTTL = m_BlobAccess->GetCurBlobTTL();
4370 ttl_overrun = true;
4371 }
4372 #else
4373 if (m_BlobTTL > m_AppSetup->max_ttl) {
4374 m_BlobTTL = m_AppSetup->max_ttl;
4375 ttl_overrun = true;
4376 }
4377 #endif
4378
4379 x_ProlongBlobDeadTime(m_BlobTTL);
4380 // Distinguish "PROLONG" vs "PROXY_PROLONG".
4381 if (x_IsFlagSet(fComesFromClient)) {
4382 if (!ttl_overrun) {
4383 x_ReportOK("OK:\n");
4384 } else {
4385 x_ReportOK("OK:WARNING:Capped the requested TTL for '").
4386 WriteText(m_NCBlobKey.RawKey()).WriteText("' at ").
4387 WriteNumber(m_BlobTTL).WriteText(" seconds.\n");
4388 }
4389 }
4390 return &CNCMessageHandler::x_FinishCommand;
4391 }
4392
4393 CNCMessageHandler::State
x_DoCmd_HasBlob(void)4394 CNCMessageHandler::x_DoCmd_HasBlob(void)
4395 {
4396 LOG_CURRENT_FUNCTION
4397 int ver = 0;
4398 bool ver_report = false;
4399 bool exist = m_LatestExist && m_LatestBlobSum->expire > CSrvTime::CurSecs();
4400 if (exist) {
4401 ver = m_BlobAccess->GetCurBlobVersion();
4402 ver_report = m_BlobVersion != ver;
4403 exist = !ver_report;
4404 }
4405 if (!exist) {
4406 GetDiagCtx()->SetRequestStatus(ver_report ? sStatus_BlobVersion : eStatus_NotFound);
4407 }
4408 x_ReportOK("OK:").WriteNumber(exist ? 1 : 0);
4409 if (ver_report) {
4410 WriteText(", VER=").WriteNumber(ver);
4411 }
4412 WriteText("\n");
4413 return &CNCMessageHandler::x_FinishCommand;
4414 }
4415
4416 CNCMessageHandler::State
x_DoCmd_Remove(void)4417 CNCMessageHandler::x_DoCmd_Remove(void)
4418 {
4419 LOG_CURRENT_FUNCTION
4420 // We delete blob only from client point of view. From our POV we create
4421 // new blob version with expiration time one second in the past. This is
4422 // necessary for proper synchronization with other servers (if we delete
4423 // blob and then will synchronize using blob lists then other server will
4424 // copy the blob back to us). And we can create this new blob version
4425 // safely even when we haven't completed yet the initial synchronization.
4426 if ((!m_BlobAccess || !m_BlobAccess->IsBlobExists() || m_BlobAccess->IsCurBlobExpired())
4427 /*&& CNCServer::IsInitiallySynced()*/)
4428 {
4429 return &CNCMessageHandler::x_FinishCommand;
4430 }
4431
4432 bool is_mirrored = CNCDistributionConf::CountServersForSlot(m_BlobSlot) != 0;
4433 bool is_good = CNCServer::IsInitiallySynced() && !CNCPeerControl::HasPeerInThrottle();
4434 unsigned int mirrored_ttl = is_good ? min(Uint4(300), x_GetBlobTTL()) : x_GetBlobTTL();
4435 unsigned int local_ttl = 5;
4436 m_BlobAccess->SetBlobTTL( is_mirrored ? mirrored_ttl : local_ttl);
4437 m_BlobAccess->SetBlobVersion(m_BlobVersion);
4438 int expire = CSrvTime::CurSecs() - 1;
4439 unsigned int ttl = m_BlobAccess->GetNewBlobTTL();
4440 m_BlobAccess->SetNewBlobExpire(expire, expire + ttl + 1);
4441 CNCBlobAccessor::PutSucceeded(m_BlobAccess->GetBlobKey());
4442
4443 // On COPY_RMV, modify create_time as well, to preserve blobs on peers (this one will be older)
4444 if (m_BlobAccess->GetAccessType() == eNCRead && m_CopyBlobInfo->create_time != 0) {
4445 m_BlobAccess->SetBlobCreateTime(m_CopyBlobInfo->create_time - 1);
4446 m_OrigRecNo = 0;
4447 }
4448 return &CNCMessageHandler::x_FinishReadingBlob;
4449 }
4450
4451 CNCMessageHandler::State
x_DoCmd_IC_Store(void)4452 CNCMessageHandler::x_DoCmd_IC_Store(void)
4453 {
4454 LOG_CURRENT_FUNCTION
4455 if (x_IsUserFlagSet(fNoCreate) && !(m_BlobAccess->IsBlobExists() && !m_BlobAccess->IsCurBlobExpired())) {
4456 return &CNCMessageHandler::x_ReportBlobNotFound;
4457 }
4458 if (!m_BlobAccess->IsBlobExists()) {
4459 if (CNCBlobStorage::IsDraining()) {
4460 x_ReportError(eStatus_ShuttingDown);
4461 return &CNCMessageHandler::x_FinishCommand;
4462 }
4463 GetDiagCtx()->SetRequestStatus(eStatus_Created);
4464 }
4465 m_BlobAccess->SetBlobTTL(x_GetBlobTTL());
4466 m_BlobAccess->SetVersionTTL(m_AppSetup->ver_ttl);
4467 m_BlobAccess->SetBlobVersion(m_BlobVersion);
4468 // on Store, client expects 'ready' status before sending blob data
4469 WriteText("OK:\n");
4470 if (m_Size == 0) {
4471 return &CNCMessageHandler::x_FinishCommand;
4472 }
4473 return &CNCMessageHandler::x_StartReadingBlob;
4474 }
4475
4476 void
x_WriteFullBlobsList(void)4477 CNCMessageHandler::x_WriteFullBlobsList(void)
4478 {
4479 LOG_CURRENT_FUNCTION
4480 TNCBlobSumList blobs_list;
4481 CNCBlobStorage::GetFullBlobsList(m_Slot, blobs_list, CNCPeerControl::Peer(m_SrvId));
4482 m_SendBuff.reset(new TNCBufferType());
4483 m_SendBuff->reserve_mem(blobs_list.size() * 200);
4484 NON_CONST_ITERATE(TNCBlobSumList, it_blob, blobs_list) {
4485 if (NeedEarlyClose())
4486 goto error_return;
4487
4488 const string& key = it_blob->first;
4489 SNCBlobSummary* blob_sum = it_blob->second;
4490 Uint2 key_size = Uint2(key.size());
4491 m_SendBuff->append(&key_size, sizeof(key_size));
4492 m_SendBuff->append(key.data(), key_size);
4493 m_SendBuff->append(&blob_sum->create_time, sizeof(blob_sum->create_time));
4494 m_SendBuff->append(&blob_sum->create_server, sizeof(blob_sum->create_server));
4495 m_SendBuff->append(&blob_sum->create_id, sizeof(blob_sum->create_id));
4496 m_SendBuff->append(&blob_sum->dead_time, sizeof(blob_sum->dead_time));
4497 m_SendBuff->append(&blob_sum->expire, sizeof(blob_sum->expire));
4498 m_SendBuff->append(&blob_sum->ver_expire, sizeof(blob_sum->ver_expire));
4499 delete blob_sum;
4500 it_blob->second = NULL;
4501 blob_sum = NULL;
4502 }
4503 return;
4504
4505 error_return:
4506 ITERATE(TNCBlobSumList, it_blob, blobs_list) {
4507 if (it_blob->second)
4508 delete it_blob->second;
4509 }
4510 }
4511
4512 CNCMessageHandler::State
x_DoCmd_SyncStart(void)4513 CNCMessageHandler::x_DoCmd_SyncStart(void)
4514 {
4515 LOG_CURRENT_FUNCTION
4516 if (CNCBlobStorage::IsDraining()) {
4517 x_ReportError(eStatus_ShuttingDown);
4518 return &CNCMessageHandler::x_FinishCommand;
4519 }
4520 TReducedSyncEvents sync_events;
4521 ESyncInitiateResult sync_res = CNCPeriodicSync::Initiate(m_SrvId, m_Slot,
4522 &m_LocalRecNo, &m_RemoteRecNo,
4523 &sync_events, &m_SyncId);
4524 if (sync_res == eCrossSynced) {
4525 GetDiagCtx()->SetRequestStatus(eStatus_CrossSync);
4526 x_ReportOK("OK:CROSS_SYNC,SIZE=0\n");
4527 return &CNCMessageHandler::x_FinishCommand;
4528 }
4529 else if (sync_res == eServerBusy) {
4530 GetDiagCtx()->SetRequestStatus(eStatus_SyncBusy);
4531 x_ReportOK("OK:IN_PROGRESS,SIZE=0\n");
4532 return &CNCMessageHandler::x_FinishCommand;
4533 }
4534 else if (sync_res == eUnknownServer) {
4535 GetDiagCtx()->SetRequestStatus(eStatus_NotAllowed);
4536 x_ReportOK("OK:SIZE=0, NEED_ABORT2\n");
4537 if (CNCStat::AddUnknownServer(m_SrvId)) {
4538 SRV_LOG(Warning, "SYNC_START request from unknown server " << m_SrvId);
4539 }
4540 return &CNCMessageHandler::x_FinishCommand;
4541 }
4542
4543 // Set fRunsInStartedSync flag so that x_CleanCmdResources() could properly call
4544 // CNCPeriodicSync::SyncCommandFinished() or CNCPeriodicSync::Cancel().
4545 x_SetFlag(fRunsInStartedSync);
4546 string result;
4547 if (sync_res == eProceedWithEvents) {
4548 m_SendBuff.reset(new TNCBufferType());
4549 m_SendBuff->reserve_mem(sync_events.size() * 200);
4550 ITERATE(TReducedSyncEvents, it_evt, sync_events) {
4551 if (NeedEarlyClose())
4552 break;
4553
4554 const SBlobEvent& blob_evt = it_evt->second;
4555 for (int i = 0; i < 2; ++i) {
4556 SNCSyncEvent* evt = (i == 0? blob_evt.wr_or_rm_event: blob_evt.prolong_event);
4557 if (!evt) {
4558 continue;
4559 }
4560 if (evt->blob_size > CNCDistributionConf::GetMaxBlobSizeSync()) {
4561 if (CNCDistributionConf::IsThisServerKey(evt->key.PackedKey())) {
4562 continue;
4563 }
4564 }
4565 if (!CNCPeerControl::Peer(m_SrvId)->AcceptsBlobKey(evt->key)) {
4566 continue;
4567 }
4568 Uint2 key_size = Uint2(evt->key.PackedKey().size());
4569 m_SendBuff->append(&key_size, sizeof(key_size));
4570 m_SendBuff->append(evt->key.PackedKey().data(), key_size);
4571 char c = char(evt->event_type);
4572 m_SendBuff->append(&c, 1);
4573 m_SendBuff->append(&evt->rec_no, sizeof(evt->rec_no));
4574 m_SendBuff->append(&evt->local_time, sizeof(evt->local_time));
4575 m_SendBuff->append(&evt->orig_rec_no, sizeof(evt->orig_rec_no));
4576 m_SendBuff->append(&evt->orig_server, sizeof(evt->orig_server));
4577 m_SendBuff->append(&evt->orig_time, sizeof(evt->orig_time));
4578 }
4579 }
4580 GetDiagCtx()->SetRequestStatus(eStatus_SyncEvents);
4581 x_SetFlag(fSyncCmdSuccessful);
4582 }
4583 else {
4584 _ASSERT(sync_res == eProceedWithBlobs);
4585 m_LocalRecNo = CNCSyncLog::GetCurrentRecNo(m_Slot);
4586 x_WriteFullBlobsList();
4587 GetDiagCtx()->SetRequestStatus(eStatus_SyncBList);
4588 x_SetFlag(fSyncCmdSuccessful);
4589 result += "ALL_BLOBS,";
4590 }
4591
4592 if (NeedEarlyClose())
4593 return &CNCMessageHandler::x_CloseCmdAndConn;
4594
4595 x_ReportOK("OK:").WriteText(result);
4596 WriteText("SIZE=").WriteNumber(m_SendBuff->size());
4597 WriteText(" ").WriteNumber(m_LocalRecNo);
4598 WriteText(" ").WriteNumber(m_RemoteRecNo);
4599 WriteText("\n");
4600 m_SendPos = 0;
4601 return &CNCMessageHandler::x_WriteSendBuff;
4602 }
4603
4604 CNCMessageHandler::State
x_DoCmd_SyncBlobsList(void)4605 CNCMessageHandler::x_DoCmd_SyncBlobsList(void)
4606 {
4607 LOG_CURRENT_FUNCTION
4608 CNCPeriodicSync::MarkCurSyncByBlobs(m_SrvId, m_Slot, m_SyncId);
4609 Uint8 rec_no = CNCSyncLog::GetCurrentRecNo(m_Slot);
4610 x_WriteFullBlobsList();
4611
4612 if (NeedEarlyClose())
4613 return &CNCMessageHandler::x_CloseCmdAndConn;
4614
4615 x_ReportOK("OK:SIZE=").WriteNumber(m_SendBuff->size());
4616 WriteText(" ").WriteNumber(rec_no);
4617 WriteText("\n");
4618 m_SendPos = 0;
4619 return &CNCMessageHandler::x_WriteSendBuff;
4620 }
4621
4622 CNCMessageHandler::State
x_DoCmd_CopyPut(void)4623 CNCMessageHandler::x_DoCmd_CopyPut(void)
4624 {
4625 LOG_CURRENT_FUNCTION
4626 if (!m_BlobAccess->IsBlobExists()) {
4627 if (CNCBlobStorage::IsDraining()) {
4628 x_ReportError(eStatus_ShuttingDown);
4629 return &CNCMessageHandler::x_FinishCommand;
4630 }
4631 GetDiagCtx()->SetRequestStatus(eStatus_Created);
4632 }
4633 m_CopyBlobInfo->ttl = m_BlobTTL;
4634 m_CopyBlobInfo->password = m_BlobPass;
4635 m_CopyBlobInfo->blob_ver = m_BlobVersion;
4636 bool need_read_blob = m_BlobAccess->ReplaceBlobInfo(*m_CopyBlobInfo);
4637 if (need_read_blob) {
4638 // while receiving new data, redirect clients to "correct" server
4639 if (m_BlobAccess->IsBlobExists()) {
4640 m_BlobAccess->UpdateMetaInfo(m_CopyBlobInfo->create_server, m_CopyBlobInfo->create_time);
4641 }
4642 WriteText("OK:\n");
4643 }
4644 else {
4645 GetDiagCtx()->SetRequestStatus(eStatus_NewerBlob);
4646 x_SetFlag(fNoBlobAccessStats);
4647 x_SetFlag(fSyncCmdSuccessful);
4648 x_UnsetFlag(fCopyLogEvent);
4649 WriteText("OK:HAVE_NEWER1\n");
4650 }
4651 // Old NC servers (those which used CNetCacheAPI instead of
4652 // CNCActiveHandler) always started to write blob data in SYNC_PUT and
4653 // COPY_PUT even when we responded to them HAVE_NEWER. To avoid breaking
4654 // the protocol we need to read from them those fake blob writes. So for
4655 // old NC servers when we answered HAVE_NEWER we'll go to x_StartReadingBlob,
4656 // for newer ones we'll go to x_FinishCommand.
4657 if (!need_read_blob && m_CmdVersion != 0) {
4658 x_SetFlag(fNoReplyOnFinish);
4659 x_UnsetFlag(fReadExactBlobSize);
4660 return &CNCMessageHandler::x_FinishCommand;
4661 }
4662
4663 return &CNCMessageHandler::x_StartReadingBlob;
4664 }
4665
4666 CNCMessageHandler::State
x_DoCmd_CopyProlong(void)4667 CNCMessageHandler::x_DoCmd_CopyProlong(void)
4668 {
4669 LOG_CURRENT_FUNCTION
4670 if (!m_BlobAccess->IsBlobExists()) {
4671 x_SetFlag(fSyncCmdSuccessful);
4672 return &CNCMessageHandler::x_ReportBlobNotFound;
4673 }
4674
4675 if (m_BlobAccess->GetCurBlobCreateTime() == m_CopyBlobInfo->create_time
4676 && m_BlobAccess->GetCurCreateServer() == m_CopyBlobInfo->create_server
4677 && m_BlobAccess->GetCurCreateId() == m_CopyBlobInfo->create_id)
4678 {
4679 bool need_event = false;
4680 if (m_BlobAccess->GetCurBlobExpire() < m_CopyBlobInfo->expire) {
4681 m_BlobAccess->SetCurBlobExpire(m_CopyBlobInfo->expire,
4682 m_CopyBlobInfo->dead_time);
4683 need_event = true;
4684 }
4685 if (m_BlobAccess->GetCurVerExpire() < m_CopyBlobInfo->ver_expire) {
4686 m_BlobAccess->SetCurVerExpire(m_CopyBlobInfo->ver_expire);
4687 need_event = true;
4688 }
4689
4690 // m_OrigRecNo can be 0 if prolong happens as a result of synchronization
4691 // using blob lists. In this case there's no event to link to and thus
4692 // no need to create event here.
4693 if (need_event && m_OrigRecNo != 0) {
4694 SNCSyncEvent* event = new SNCSyncEvent();
4695 event->blob_size = m_BlobAccess->GetCurBlobSize();
4696 event->event_type = eSyncProlong;
4697 event->key = m_NCBlobKey;
4698 event->orig_server = m_OrigSrvId;
4699 event->orig_time = m_OrigTime;
4700 event->orig_rec_no = m_OrigRecNo;
4701 CNCSyncLog::AddEvent(m_BlobSlot, event);
4702 }
4703 }
4704 else {
4705 GetDiagCtx()->SetRequestStatus(eStatus_NewerBlob);
4706 x_SetFlag(fSyncCmdSuccessful);
4707 }
4708 return &CNCMessageHandler::x_FinishCommand;
4709 }
4710
4711 CNCMessageHandler::State
x_DoCmd_SyncGet(void)4712 CNCMessageHandler::x_DoCmd_SyncGet(void)
4713 {
4714 LOG_CURRENT_FUNCTION
4715 if (!m_BlobAccess->IsBlobExists()) {
4716 x_SetFlag(fSyncCmdSuccessful);
4717 return &CNCMessageHandler::x_ReportBlobNotFound;
4718 }
4719
4720 bool need_send = true;
4721 if (m_OrigTime != m_BlobAccess->GetCurBlobCreateTime()) {
4722 need_send = false;
4723 }
4724 else if (m_BlobAccess->GetCurBlobCreateTime() < m_CopyBlobInfo->create_time) {
4725 need_send = false;
4726 }
4727 else if (m_BlobAccess->GetCurBlobCreateTime() == m_CopyBlobInfo->create_time) {
4728 if (m_BlobAccess->GetCurCreateServer() < m_CopyBlobInfo->create_server) {
4729 need_send = false;
4730 }
4731 else if (m_BlobAccess->GetCurCreateServer() == m_CopyBlobInfo->create_server
4732 && m_BlobAccess->GetCurCreateId() <= m_CopyBlobInfo->create_id)
4733 {
4734 need_send = false;
4735 }
4736 }
4737 if (!need_send) {
4738 GetDiagCtx()->SetRequestStatus(eStatus_NewerBlob);
4739 x_SetFlag(fSyncCmdSuccessful);
4740 x_ReportOK("OK:SIZE=0, HAVE_NEWER\n");
4741 return &CNCMessageHandler::x_FinishCommand;
4742 }
4743
4744 x_ReportOK("OK:SIZE=").WriteNumber(m_BlobAccess->GetCurBlobSize());
4745 WriteText(" ").WriteNumber(m_BlobAccess->GetCurBlobVersion());
4746 WriteText(" \"").WriteText(m_BlobAccess->GetCurPassword());
4747 WriteText("\" ").WriteNumber(m_BlobAccess->GetCurBlobCreateTime());
4748 WriteText(" ").WriteNumber(Uint4(m_BlobAccess->GetCurBlobTTL()));
4749 WriteText(" ").WriteNumber(m_BlobAccess->GetCurBlobDeadTime());
4750 WriteText(" ").WriteNumber(m_BlobAccess->GetCurBlobExpire());
4751 WriteText(" ").WriteNumber(Uint4(m_BlobAccess->GetCurVersionTTL()));
4752 WriteText(" ").WriteNumber(m_BlobAccess->GetCurVerExpire());
4753 WriteText(" ").WriteNumber(m_BlobAccess->GetCurCreateServer());
4754 WriteText(" ").WriteNumber(m_BlobAccess->GetCurCreateId());
4755 WriteText("\n");
4756
4757 if (m_BlobAccess->GetCurBlobSize() == 0)
4758 return &CNCMessageHandler::x_FinishCommand;
4759 if (NeedEarlyClose())
4760 return &CNCMessageHandler::x_CloseCmdAndConn;
4761
4762 m_BlobAccess->SetPosition(0);
4763 return &CNCMessageHandler::x_WriteBlobData;
4764 }
4765
4766 CNCMessageHandler::State
x_DoCmd_SyncProlongInfo(void)4767 CNCMessageHandler::x_DoCmd_SyncProlongInfo(void)
4768 {
4769 LOG_CURRENT_FUNCTION
4770 if (!m_BlobAccess->IsBlobExists()) {
4771 x_SetFlag(fSyncCmdSuccessful);
4772 return &CNCMessageHandler::x_ReportBlobNotFound;
4773 }
4774
4775 x_ReportOK("OK:SIZE=0 ");
4776 WriteNumber(m_BlobAccess->GetCurBlobCreateTime()).WriteText(" ");
4777 WriteNumber(m_BlobAccess->GetCurCreateServer()).WriteText(" ");
4778 WriteNumber(m_BlobAccess->GetCurCreateId()).WriteText(" ");
4779 WriteNumber(m_BlobAccess->GetCurBlobDeadTime()).WriteText(" ");
4780 WriteNumber(m_BlobAccess->GetCurBlobExpire()).WriteText(" ");
4781 WriteNumber(m_BlobAccess->GetCurVerExpire());
4782 WriteText("\n");
4783
4784 return &CNCMessageHandler::x_FinishCommand;
4785 }
4786
4787 CNCMessageHandler::State
x_DoCmd_SyncCommit(void)4788 CNCMessageHandler::x_DoCmd_SyncCommit(void)
4789 {
4790 LOG_CURRENT_FUNCTION
4791 CNCPeriodicSync::Commit(m_SrvId, m_Slot, m_SyncId, m_LocalRecNo, m_RemoteRecNo);
4792 x_UnsetFlag(fRunsInStartedSync);
4793 return &CNCMessageHandler::x_FinishCommand;
4794 }
4795
4796 CNCMessageHandler::State
x_DoCmd_SyncCancel(void)4797 CNCMessageHandler::x_DoCmd_SyncCancel(void)
4798 {
4799 LOG_CURRENT_FUNCTION
4800 CNCPeriodicSync::Cancel(m_SrvId, m_Slot, m_SyncId);
4801 x_UnsetFlag(fRunsInStartedSync);
4802 return &CNCMessageHandler::x_FinishCommand;
4803 }
4804
4805 CNCMessageHandler::State
x_DoCmd_GetMeta(void)4806 CNCMessageHandler::x_DoCmd_GetMeta(void)
4807 {
4808 LOG_CURRENT_FUNCTION
4809 char time_buf[50];
4810 string tmp;
4811
4812 if (x_IsHttpMode()) {
4813
4814 CNcbiOstrstream str;
4815
4816 tmp = m_NCBlobKey.RawKey();
4817 if (!m_ClientParams["service"].empty()) {
4818 CNetCacheKey::AddExtensions(tmp, m_ClientParams["service"], 0, 3);
4819 }
4820 str << ",\"blob_key\":\"" << tmp << "\"";
4821 str << ",\"slot\":" << m_BlobSlot;
4822 Uint8 create_time = m_BlobAccess->GetCurBlobCreateTime();
4823 CSrvTime t;
4824 t.Sec() = time_t(create_time / kUSecsPerSecond);
4825 t.NSec() = (create_time % kUSecsPerSecond) * 1000;
4826 t.Print(time_buf, CSrvTime::eFmtHumanUSecs);
4827 str << ",\"write_time\":\"" << time_buf << "\"";
4828 Uint8 create_server = m_BlobAccess->GetCurCreateServer();
4829
4830 string hostport( CNCDistributionConf::GetPeerNameOrEmpty(create_server));
4831 str << ",\"control_server\":\"";
4832 if (hostport.empty()) {
4833 str << CTaskServer::GetHostByIP(Uint4(create_server >> 32)) << ":"
4834 << NStr::UIntToString(Uint4(create_server));
4835 } else {
4836 str << hostport;
4837 }
4838 str << "\"";
4839 str << ",\"control_id\":" << m_BlobAccess->GetCurCreateId();
4840 str << ",\"ttl\":" << m_BlobAccess->GetCurBlobTTL();
4841 t.Sec() = m_BlobAccess->GetCurBlobExpire();
4842 t.Print(time_buf, CSrvTime::eFmtHumanSeconds);
4843 str << ",\"expire\":\"" << time_buf << "\"";
4844 str << ",\"size\":" << m_BlobAccess->GetCurBlobSize();
4845 tmp = m_BlobAccess->GetCurPassword();
4846 if (!tmp.empty()) {
4847 tmp = "yes";
4848 }
4849 str << ",\"password\":\"" << tmp << "\"";
4850 str << ",\"version\":" << m_BlobAccess->GetCurBlobVersion();
4851 str << ",\"version_ttl\":" << m_BlobAccess->GetCurVersionTTL();
4852 t.Sec() = m_BlobAccess->GetCurVerExpire();
4853 t.Print(time_buf, CSrvTime::eFmtHumanSeconds);
4854 str << ",\"version_expire\":\"" << time_buf << "\"";
4855 m_PosponedCmd += CNcbiOstrstreamToString(str);
4856
4857 return &CNCMessageHandler::x_FinishCommand;
4858 }
4859 m_SendBuff.reset(new TNCBufferType());
4860 m_SendBuff->reserve_mem(1024);
4861
4862 tmp = "OK:Slot: ";
4863 m_SendBuff->append(tmp.data(), tmp.size());
4864 tmp = NStr::UIntToString(m_BlobSlot);
4865 m_SendBuff->append(tmp.data(), tmp.size());
4866
4867 tmp = "\nOK:Write time: ";
4868 m_SendBuff->append(tmp.data(), tmp.size());
4869 Uint8 create_time = m_BlobAccess->GetCurBlobCreateTime();
4870 CSrvTime t;
4871 t.Sec() = time_t(create_time / kUSecsPerSecond);
4872 t.NSec() = (create_time % kUSecsPerSecond) * 1000;
4873 t.Print(time_buf, CSrvTime::eFmtHumanUSecs);
4874 m_SendBuff->append(time_buf, strlen(time_buf));
4875
4876 tmp = "\nOK:Control server: ";
4877 m_SendBuff->append(tmp.data(), tmp.size());
4878 Uint8 create_server = m_BlobAccess->GetCurCreateServer();
4879 tmp = CNCDistributionConf::GetPeerNameOrEmpty(create_server);
4880 if (tmp.empty()) {
4881 tmp = CTaskServer::GetHostByIP(Uint4(create_server >> 32));
4882 m_SendBuff->append(tmp.data(), tmp.size());
4883 m_SendBuff->append(":", 1);
4884 tmp = NStr::UIntToString(Uint4(create_server));
4885 m_SendBuff->append(tmp.data(), tmp.size());
4886 } else {
4887 m_SendBuff->append(tmp.data(), tmp.size());
4888 }
4889 tmp = "\nOK:Control id: ";
4890 m_SendBuff->append(tmp.data(), tmp.size());
4891 tmp = NStr::Int8ToString(m_BlobAccess->GetCurCreateId());
4892 m_SendBuff->append(tmp.data(), tmp.size());
4893
4894 tmp = "\nOK:TTL: ";
4895 m_SendBuff->append(tmp.data(), tmp.size());
4896 tmp = NStr::UIntToString(m_BlobAccess->GetCurBlobTTL());
4897 m_SendBuff->append(tmp.data(), tmp.size());
4898
4899 tmp = "\nOK:Expire: ";
4900 m_SendBuff->append(tmp.data(), tmp.size());
4901 t.Sec() = m_BlobAccess->GetCurBlobExpire();
4902 t.Print(time_buf, CSrvTime::eFmtHumanSeconds);
4903 m_SendBuff->append(time_buf, strlen(time_buf));
4904
4905 tmp = "\nOK:Size: ";
4906 m_SendBuff->append(tmp.data(), tmp.size());
4907 tmp = NStr::UInt8ToString(m_BlobAccess->GetCurBlobSize());
4908 m_SendBuff->append(tmp.data(), tmp.size());
4909
4910 tmp = "\nOK:Password: '";
4911 m_SendBuff->append(tmp.data(), tmp.size());
4912 tmp = m_BlobAccess->GetCurPassword();
4913 if (!tmp.empty()) {
4914 tmp = "yes";
4915 m_SendBuff->append(tmp.data(), tmp.size());
4916 }
4917 m_SendBuff->append("'", 1);
4918
4919 tmp = "\nOK:Version: ";
4920 m_SendBuff->append(tmp.data(), tmp.size());
4921 tmp = NStr::IntToString(m_BlobAccess->GetCurBlobVersion());
4922 m_SendBuff->append(tmp.data(), tmp.size());
4923
4924 tmp = "\nOK:Version's TTL: ";
4925 m_SendBuff->append(tmp.data(), tmp.size());
4926 tmp = NStr::UIntToString(m_BlobAccess->GetCurVersionTTL());
4927 m_SendBuff->append(tmp.data(), tmp.size());
4928
4929 tmp = "\nOK:Version expire: ";
4930 m_SendBuff->append(tmp.data(), tmp.size());
4931 t.Sec() = m_BlobAccess->GetCurVerExpire();
4932 t.Print(time_buf, CSrvTime::eFmtHumanSeconds);
4933 m_SendBuff->append(time_buf, strlen(time_buf));
4934
4935 tmp = "\nOK:END\n";
4936 m_SendBuff->append(tmp.data(), tmp.size());
4937
4938
4939 x_ReportOK("OK:SIZE=").WriteNumber(m_SendBuff->size()).WriteText("\n");
4940 m_SendPos = 0;
4941 return &CNCMessageHandler::x_WriteSendBuff;
4942 }
4943
4944 CNCMessageHandler::State
x_DoCmd_ProxyMeta(void)4945 CNCMessageHandler::x_DoCmd_ProxyMeta(void)
4946 {
4947 LOG_CURRENT_FUNCTION
4948 if (!m_BlobAccess->IsBlobExists())
4949 return &CNCMessageHandler::x_ReportBlobNotFound;
4950
4951 x_ReportOK("OK:SIZE=0 ");
4952 WriteNumber(m_BlobAccess->IsValid() ? m_BlobAccess->GetCurBlobCreateTime() : 123).WriteText(" ");
4953 WriteNumber(m_BlobAccess->GetCurCreateServer()).WriteText(" ");
4954 WriteNumber(m_BlobAccess->GetCurCreateId()).WriteText(" ");
4955 WriteNumber(m_BlobAccess->GetCurBlobDeadTime()).WriteText(" ");
4956 WriteNumber(m_BlobAccess->GetCurBlobExpire()).WriteText(" ");
4957 WriteNumber(m_BlobAccess->GetCurVerExpire());
4958 WriteText("\n");
4959
4960 return &CNCMessageHandler::x_FinishCommand;
4961 }
4962
4963 CNCMessageHandler::State
x_DoCmd_CopyUpdate(void)4964 CNCMessageHandler::x_DoCmd_CopyUpdate(void)
4965 {
4966 LOG_CURRENT_FUNCTION
4967 #if USE_ALWAYS_COPY_UPD
4968 if (!m_BlobAccess->IsBlobExists()) {
4969 // create zero-length blob just to note that it exists
4970 // it will come soon anyway
4971 m_BlobAccess->SetBlobTTL(x_GetBlobTTL());
4972 m_BlobAccess->SetVersionTTL(0);
4973 m_BlobAccess->SetBlobVersion(0);
4974 m_BlobAccess->GetWriteMemSize();
4975 int cur_secs = int(CSrvTime::Current().Sec());
4976 // this one will be older
4977 m_BlobAccess->SetBlobCreateTime(m_CopyBlobInfo->create_time - 1);
4978 // and with short life
4979 m_BlobAccess->SetNewBlobExpire( cur_secs + 60);
4980 m_BlobAccess->SetNewVerExpire( cur_secs + 60);
4981 m_BlobAccess->SetCreateServer(CNCDistributionConf::GetSelfID(),
4982 CNCBlobStorage::GetNewBlobId());
4983 m_BlobAccess->Finalize();
4984 }
4985 #endif
4986 if (m_BlobAccess->IsBlobExists()) {
4987 m_BlobAccess->UpdateMetaInfo(m_CopyBlobInfo->create_server, m_CopyBlobInfo->create_time);
4988 }
4989 return &CNCMessageHandler::x_FinishCommand;
4990 }
4991
4992 /*
4993 CNCMessageHandler::State
4994 CNCMessageHandler::x_DoCmd_GetBlobsList(void)
4995 {
4996 const vector<Uint2>& slots = CNCDistributionConf::GetSelfSlots();
4997 string slot_str;
4998 ITERATE(vector<Uint2>, it_slot, slots) {
4999 slot_str += NStr::UIntToString(*it_slot);
5000 slot_str += ",";
5001 }
5002 slot_str.resize(slot_str.size() - 1);
5003 Write("OK:Serving slots: ").Write(slot_str).Write("\n");
5004
5005 ITERATE(vector<Uint2>, it_slot, slots) {
5006 TNCBlobSumList blobs_lst;
5007 CNCBlobStorage::GetFullBlobsList(*it_slot, blobs_lst);
5008 NON_CONST_ITERATE(TNCBlobSumList, it_blob, blobs_lst) {
5009 if (x_NeedEarlyClose())
5010 goto error_return;
5011
5012 const string& raw_key = it_blob->first;
5013 SNCCacheData*& blob_sum = it_blob->second;
5014 string cache_name, key, subkey;
5015 CNCBlobKey::UnpackBlobKey(raw_key, cache_name, key, subkey);
5016 Write("OK:key: ").Write(key);
5017 if (!cache_name.empty())
5018 Write(", subkey: ").Write(subkey);
5019 Write(", expire: ");
5020 CTime tmp_time(CTime::eEmpty, CTime::eLocal);
5021 tmp_time.SetTimeT(time_t(blob_sum->expire));
5022 Write(tmp_time.AsString("M/D/Y h:m:s"));
5023 Write(", create_time: ");
5024 tmp_time.SetTimeT(time_t(blob_sum->create_time / kNCTimeTicksInSec));
5025 tmp_time.SetMicroSecond(blob_sum->create_time % kNCTimeTicksInSec);
5026 Write(tmp_time.AsString("M/D/Y h:m:s.r"));
5027 Write(", create_id: ").Write(blob_sum->create_id);
5028 Write(", create_server: ");
5029 Write(CTaskServer::GetHostByIP(Uint4(blob_sum->create_server >> 32)));
5030 Write(":").Write(Uint4(blob_sum->create_server));
5031 Write("\n");
5032 delete blob_sum;
5033 blob_sum = NULL;
5034 }
5035 continue;
5036
5037 error_return:
5038 ITERATE(TNCBlobSumList, it_blob, blobs_lst) {
5039 delete it_blob->second;
5040 }
5041 return &CNCMessageHandler::x_CloseCmdAndConn;
5042 }
5043 Write("OK:END\n");
5044 return &CNCMessageHandler::x_FinishCommand;
5045 }
5046 */
5047 CNCMessageHandler::State
x_DoCmd_NotImplemented(void)5048 CNCMessageHandler::x_DoCmd_NotImplemented(void)
5049 {
5050 LOG_CURRENT_FUNCTION
5051 x_ReportError(eStatus_NoImpl);
5052 return &CNCMessageHandler::x_FinishCommand;
5053 }
5054
5055 CNCMessageHandler::State
x_DoCmd_Purge(void)5056 CNCMessageHandler::x_DoCmd_Purge(void)
5057 {
5058 LOG_CURRENT_FUNCTION
5059 if (CNCBlobAccessor::Purge( m_NCBlobKey, m_CmdStartTime.AsUSec())) {
5060 CNCBlobStorage::SavePurgeData();
5061 }
5062 CNCDistributionConf::GetPeerServers(m_CheckSrvs);
5063 return &CNCMessageHandler::x_PurgeToNextPeer;
5064 }
5065
5066 CNCMessageHandler::State
x_DoCmd_CopyPurge(void)5067 CNCMessageHandler::x_DoCmd_CopyPurge(void)
5068 {
5069 LOG_CURRENT_FUNCTION
5070 if (CNCBlobAccessor::Purge( m_NCBlobKey, m_CopyBlobInfo->create_time)) {
5071 CNCBlobStorage::SavePurgeData();
5072 }
5073 return &CNCMessageHandler::x_FinishCommand;
5074 }
5075
5076 CNCMessageHandler::State
x_DoCmd_GetBList(void)5077 CNCMessageHandler::x_DoCmd_GetBList(void)
5078 {
5079 m_SendBuff.reset(new TNCBufferType());
5080 bool newsep = NStr::Find(m_ParsedCmd.command->cmd, "BLIST2") != NPOS;
5081 CNCBlobStorage::GetBList(m_NCBlobKey.PackedKey(), m_SendBuff,m_BlobFilter, newsep ? "\t" : ",");
5082 CNCDistributionConf::AddServerSlots(m_SlotsDone, 0);
5083 ENCProxyCmd proxy_cmd = m_ParsedCmd.command->extra.proxy_cmd;
5084 if (proxy_cmd != eProxyGetBList2) {
5085 x_ReportOK("OK: SIZE=").WriteNumber(m_SendBuff->size()).WriteText("\n");
5086 Flush();
5087 }
5088 m_SendPos = 0;
5089 return &CNCMessageHandler::x_WriteSendBuff;
5090 }
5091
5092 CNCMessageHandler::State
x_DoCmd_GetBListNext(void)5093 CNCMessageHandler::x_DoCmd_GetBListNext(void)
5094 {
5095 x_UnsetFlag(fNoReplyOnFinish);
5096 if (!m_CheckSrvs.empty()) {
5097 CNCDistributionConf::AddServerSlots(m_SlotsDone, m_CheckSrvs[m_SrvsIndex-1]);
5098 }
5099 Uint2 slot, slot_max = CNCDistributionConf::GetMaxSlotNumber();
5100 for (slot=1; slot<slot_max; ++slot) {
5101 if (m_SlotsDone.find(slot) == m_SlotsDone.end()) {
5102 m_CheckSrvs = CNCDistributionConf::GetRawServersForSlot(slot);
5103 if (m_CheckSrvs.empty()) {
5104 x_ReportError(eStatus_CondFailed);
5105 return &CNCMessageHandler::x_FinishCommand;
5106 }
5107 return &CNCMessageHandler::x_ProxyToNextPeer;
5108 }
5109 }
5110 return &CNCMessageHandler::x_FinishCommand;
5111 }
5112
5113 void
BeginProxyResponse(const CTempString & response,size_t content_length)5114 CNCMessageHandler::BeginProxyResponse(const CTempString& response, size_t content_length)
5115 {
5116 if (x_IsHttpMode()) {
5117 if (NStr::StartsWith(response, "HTTP")) {
5118 WriteText(response).WriteText("\r\n");
5119 } else {
5120 x_WriteHttpHeader(eStatus_OK, content_length, true);
5121 }
5122 x_SetFlag(fNoReplyOnFinish);
5123 return;
5124 }
5125 ENCProxyCmd proxy_cmd = m_ParsedCmd.command->extra.proxy_cmd;
5126 if (proxy_cmd != eProxyGetBList2) {
5127 x_ReportOK(response).WriteText("\n");
5128 }
5129 }
5130
5131 void
x_WriteHttpResponse(void)5132 CNCMessageHandler::x_WriteHttpResponse(void)
5133 {
5134 int cmd_status = GetDiagCtx()->GetRequestStatus();
5135 bool succeeded = x_IsCmdSucceeded(cmd_status);
5136 size_t content_length = 0;
5137 string envelope;
5138 if (succeeded && !m_PosponedCmd.empty()) {
5139 CNcbiOstrstream str;
5140 str << "{\"status\":" << cmd_status;
5141 envelope = CNcbiOstrstreamToString(str);
5142 content_length = envelope.size() + m_PosponedCmd.size() + 1;
5143 }
5144 x_WriteHttpHeader(cmd_status, content_length, false);
5145 if (content_length != 0) {
5146 Write( envelope.data(), envelope.size());
5147 Write(m_PosponedCmd.data(), m_PosponedCmd.size());
5148 WriteText("}");
5149 }
5150 }
5151
5152 void
x_WriteHttpHeader(int cmd_status,size_t content_length,bool binary)5153 CNCMessageHandler::x_WriteHttpHeader(int cmd_status, size_t content_length, bool binary)
5154 {
5155 bool succeeded = x_IsCmdSucceeded(cmd_status);
5156 WriteText("HTTP/1.");
5157 WriteText(m_HttpMode == eHttp10 ? "0" : "1");
5158 WriteText(" ").WriteNumber(cmd_status).WriteText(" ");
5159 WriteText(succeeded ? "OK" : "ERR").WriteText("\r\n");
5160 WriteText("Content-Length: ").WriteNumber(content_length).WriteText("\r\n");
5161 if (content_length != 0) {
5162 if (binary) {
5163 WriteText("Content-Type: application/octet-stream\r\n");
5164 } else {
5165 WriteText("Content-Type: application/json\r\n");
5166 }
5167 }
5168 WriteText("\r\n");
5169 }
5170
5171
CNCMsgHandler_Factory(void)5172 CNCMsgHandler_Factory::CNCMsgHandler_Factory(void)
5173 {}
5174
~CNCMsgHandler_Factory(void)5175 CNCMsgHandler_Factory::~CNCMsgHandler_Factory(void)
5176 {}
5177
5178 CSrvSocketTask*
CreateSocketTask(void)5179 CNCMsgHandler_Factory::CreateSocketTask(void)
5180 {
5181 return new CNCMessageHandler();
5182 }
5183
5184 END_NCBI_SCOPE
5185