1 /* $Id: distribution_conf.cpp 633628 2021-06-22 18:24:03Z ivanov $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Denis Vakatov, Pavel Ivanov, Sergey Satskiy
27 *
28 * File Description: Data structures and API to support blobs mirroring.
29 *
30 */
31
32
33 #include "nc_pch.hpp"
34
35 #include <corelib/ncbireg.hpp>
36 #include <util/checksum.hpp>
37 #include <util/random_gen.hpp>
38
39 #include "task_server.hpp"
40
41 #include "netcached.hpp"
42 #include "distribution_conf.hpp"
43 #include "peer_control.hpp"
44 #include <random>
45
46 BEGIN_NCBI_SCOPE
47
48
49 struct SSrvGroupInfo
50 {
51 Uint8 srv_id;
52 string grp;
53
SSrvGroupInfoSSrvGroupInfo54 SSrvGroupInfo(Uint8 srv, const string& group)
55 : srv_id(srv), grp(group)
56 {}
57 };
58
59 typedef vector<SSrvGroupInfo> TSrvGroupsList;
60 typedef map<Uint2, TSrvGroupsList> TSrvGroupsMap;
61
62 typedef map<Uint2, TServersList> TSlot2SrvMap;
63 typedef map<Uint8, vector<Uint2> > TSrv2SlotMap;
64
65 struct SSrvMirrorInfo
66 {
67 TNCPeerList s_Peers;
68 TSlot2SrvMap s_RawSlot2Servers;
69 TSrvGroupsMap s_Slot2Servers;
70 TSrv2SlotMap s_CommonSlots;
71 TSrv2SlotMap s_Servers2Slots;
72 // ID is defined by IP and port, so this map might be incorrect
73 // but we also check the current slot number
74 // while it is possible to handle same slot on different instances of NC
75 // on the same machine (same IP), it does not make sense and should be avoided
76 map<Uint4, Uint8> s_IpToId;
77 // server credentials
78 map<Uint8, Uint8> s_SrvTrust;
79 };
80 static SSrvMirrorInfo* s_MirrorConf = NULL;
81 #if 0
82 static CMiniMutex s_MirrorConf;
83 static TNCPeerList s_Peers;
84 static TSlot2SrvMap s_RawSlot2Servers;
85 static TSrvGroupsMap s_Slot2Servers;
86 static TSrv2SlotMap s_CommonSlots;
87 #endif
88 static Uint2 s_MaxSlotNumber = 0;
89
90 static vector<Uint2> s_SelfSlots;
91 static Uint2 s_CntSlotBuckets = 0;
92 static Uint2 s_CntTimeBuckets = 0;
93 static Uint4 s_SlotRndShare = numeric_limits<Uint4>::max();
94 static Uint4 s_TimeRndShare = numeric_limits<Uint4>::max();
95 static Uint8 s_SelfID = 0;
96 static Uint8 s_SelfTrust = 0;
97 static Uint4 s_SelfIP = 0;
98 static string s_SelfGroup;
99 static string s_SelfName;
100 static CMiniMutex s_KeyRndLock;
101 static CRandom s_KeyRnd(CRandom::TValue(time(NULL)));
102 static string s_SelfHostIP;
103 static CAtomicCounter s_BlobId;
104 static Uint4 s_SelfAlias = 0;
105 static string s_MirroringSizeFile;
106 static string s_PeriodicLogFile;
107 static string s_CopyDelayLogFile;
108 static FILE* s_CopyDelayLog = NULL;
109 static Uint1 s_CntActiveSyncs = 4;
110 static Uint1 s_MaxSyncsOneServer = 2;
111 static Uint4 s_TaskPrioritySync = 10;
112 static Uint2 s_MaxPeerTotalConns = 100;
113 static Uint2 s_MaxPeerBGConns = 50;
114 static Uint2 s_CntErrorsToThrottle = 10;
115 static Uint2 s_CntThrottlesToIpchange = 10;
116 static Uint8 s_PeerThrottlePeriod = 10 * kUSecsPerSecond;
117 static Uint1 s_PeerTimeout = 2;
118 static Uint1 s_BlobListTimeout = 10;
119 static Uint8 s_SmallBlobBoundary = 65535;
120 static Uint2 s_MaxMirrorQueueSize = 10000;
121 static string s_SyncLogFileName;
122 static Uint4 s_MaxSlotLogEvents = 0;
123 static Uint4 s_CleanLogReserve = 0;
124 static Uint4 s_MaxCleanLogBatch = 0;
125 static Uint8 s_MinForcedCleanPeriod = 0;
126 static Uint4 s_CleanAttemptInterval = 0;
127 static Uint8 s_PeriodicSyncInterval = 0;
128 //static Uint8 s_PeriodicSyncHeadTime;
129 //static Uint8 s_PeriodicSyncTailTime;
130 static Uint8 s_PeriodicSyncTimeout = 0;
131 static Uint8 s_FailedSyncRetryDelay = 0;
132 static Uint8 s_NetworkErrorTimeout = 0;
133 static Uint8 s_MaxBlobSizeSync = 0;
134 static bool s_WarnBlobSizeSync = true;
135 static bool s_BlobUpdateHotline = true;
136 static bool s_SlotByRawkey = false;
137
138 static const char* kNCReg_NCPoolSection = "mirror";
139 static string kNCReg_NCServerPrefix = "server_";
140 static string kNCReg_NCServerSlotsPrefix = "srv_slots_";
141 static string kNCReg_NCServerTrustPrefix = "srv_trust_";
142
143
144 bool
Initialize(Uint2 control_port)145 CNCDistributionConf::Initialize(Uint2 control_port)
146 {
147 string log_pfx("Bad configuration: ");
148 const CNcbiRegistry& reg = CTaskServer::GetConfRegistry();
149
150 Uint4 self_host = CTaskServer::GetIPByHost(CTaskServer::GetHostName());
151 s_SelfHostIP = CTaskServer::IPToString(self_host);
152 s_SelfIP = self_host;
153 s_SelfID = (Uint8(self_host) << 32) + control_port;
154 s_SelfAlias = CNCDistributionConf::CreateHostAlias(self_host, control_port);
155 s_BlobId.Set(0);
156
157 string err_message;
158 if (!InitMirrorConfig(reg, err_message)) {
159 SRV_LOG(Critical, log_pfx << err_message);
160 return false;
161 }
162
163 if (s_MaxSlotNumber <= 1) {
164 s_MaxSlotNumber = 1;
165 s_SlotRndShare = numeric_limits<Uint4>::max();
166 }
167 else {
168 s_SlotRndShare = numeric_limits<Uint4>::max() / s_MaxSlotNumber + 1;
169 }
170
171 s_MirroringSizeFile = reg.Get(kNCReg_NCPoolSection, "mirroring_log_file");
172 s_PeriodicLogFile = reg.Get(kNCReg_NCPoolSection, "periodic_log_file");
173
174 s_CopyDelayLogFile = reg.Get(kNCReg_NCPoolSection, "copy_delay_log_file");
175 if (!s_CopyDelayLogFile.empty()) {
176 s_CopyDelayLog = fopen(s_CopyDelayLogFile.c_str(), "a");
177 }
178
179 try {
180 s_CntSlotBuckets = reg.GetInt(kNCReg_NCPoolSection, "cnt_slot_buckets", 10);
181 if (numeric_limits<Uint2>::max() / s_CntSlotBuckets < s_MaxSlotNumber) {
182 SRV_LOG(Critical, log_pfx << "too many buckets per slot ("
183 << s_CntSlotBuckets << ") with given number of slots ("
184 << s_MaxSlotNumber << ").");
185 return false;
186 }
187 s_CntTimeBuckets = s_CntSlotBuckets * s_MaxSlotNumber;
188 s_TimeRndShare = s_SlotRndShare / s_CntSlotBuckets + 1;
189 s_CntActiveSyncs = reg.GetInt(kNCReg_NCPoolSection, "max_active_syncs", 4);
190 s_MaxSyncsOneServer = reg.GetInt(kNCReg_NCPoolSection, "max_syncs_one_server", 2);
191 s_TaskPrioritySync = reg.GetInt(kNCReg_NCPoolSection, "task_priority_sync", 10);
192 s_MaxPeerTotalConns = reg.GetInt(kNCReg_NCPoolSection, "max_peer_connections", 100);
193 s_MaxPeerBGConns = reg.GetInt(kNCReg_NCPoolSection, "max_peer_bg_connections", 50);
194 s_CntErrorsToThrottle = reg.GetInt(kNCReg_NCPoolSection, "peer_errors_for_throttle", 10);
195 s_CntThrottlesToIpchange = reg.GetInt(kNCReg_NCPoolSection, "peer_throttles_for_ip_change", 10);
196 s_PeerThrottlePeriod = reg.GetInt(kNCReg_NCPoolSection, "peer_throttle_period", 10);
197 s_PeerThrottlePeriod *= kUSecsPerSecond;
198 s_PeerTimeout = reg.GetInt(kNCReg_NCPoolSection, "peer_communication_timeout", 2);
199 s_BlobListTimeout = reg.GetInt(kNCReg_NCPoolSection, "peer_blob_list_timeout", 10);
200 s_SmallBlobBoundary = reg.GetInt(kNCReg_NCPoolSection, "small_blob_max_size", 100);
201 s_SmallBlobBoundary *= 1000;
202 s_MaxMirrorQueueSize = reg.GetInt(kNCReg_NCPoolSection, "max_instant_queue_size", 10000);
203
204 s_SyncLogFileName = reg.GetString(kNCReg_NCPoolSection, "sync_log_file", "./cache/sync_events.log");
205 s_MaxSlotLogEvents = reg.GetInt(kNCReg_NCPoolSection, "max_slot_log_records", 100000);
206 if (s_MaxSlotLogEvents < 10)
207 s_MaxSlotLogEvents = 10;
208 s_CleanLogReserve = reg.GetInt(kNCReg_NCPoolSection, "clean_slot_log_reserve", 1000);
209 if (s_CleanLogReserve >= s_MaxSlotLogEvents)
210 s_CleanLogReserve = s_MaxSlotLogEvents - 1;
211 s_MaxCleanLogBatch = reg.GetInt(kNCReg_NCPoolSection, "max_clean_log_batch", 10000);
212 s_MinForcedCleanPeriod = reg.GetInt(kNCReg_NCPoolSection, "min_forced_clean_log_period", 10);
213 s_MinForcedCleanPeriod *= kUSecsPerSecond;
214 s_CleanAttemptInterval = reg.GetInt(kNCReg_NCPoolSection, "clean_log_attempt_interval", 1);
215 s_PeriodicSyncInterval = reg.GetInt(kNCReg_NCPoolSection, "deferred_sync_interval", 10);
216 s_PeriodicSyncInterval *= kUSecsPerSecond;
217 s_PeriodicSyncTimeout = reg.GetInt(kNCReg_NCPoolSection, "deferred_sync_timeout", 10);
218 s_PeriodicSyncTimeout *= kUSecsPerSecond;
219 s_FailedSyncRetryDelay = reg.GetInt(kNCReg_NCPoolSection, "failed_sync_retry_delay", 1);
220 s_FailedSyncRetryDelay *= kUSecsPerSecond;
221 s_NetworkErrorTimeout = reg.GetInt(kNCReg_NCPoolSection, "network_error_timeout", 300);
222 s_NetworkErrorTimeout *= kUSecsPerSecond;
223 s_MaxBlobSizeSync = NStr::StringToUInt8_DataSize(reg.GetString(
224 kNCReg_NCPoolSection, "max_blob_size_sync", "1 GB"));
225 s_WarnBlobSizeSync = reg.GetBool( kNCReg_NCPoolSection, "warn_blob_size_sync", true);
226 if (s_MaxBlobSizeSync == 0) {
227 s_WarnBlobSizeSync = false;
228 }
229 s_BlobUpdateHotline = reg.GetBool( kNCReg_NCPoolSection, "blob_update_hotline", true);
230 s_SlotByRawkey = reg.GetBool( kNCReg_NCPoolSection, "slot_calculation_by_key_only", false);
231
232 if (s_WarnBlobSizeSync && s_SmallBlobBoundary > s_MaxBlobSizeSync) {
233 SRV_LOG(Critical, log_pfx << "small_blob_max_size ("
234 << s_SmallBlobBoundary << ") is greater than max_blob_size_sync ("
235 << s_MaxBlobSizeSync << ").");
236 return false;
237 }
238 }
239 catch (CStringException& ex) {
240 SRV_LOG(Critical, log_pfx << ex);
241 return false;
242 }
243 return true;
244 }
245
246 bool
InitMirrorConfig(const CNcbiRegistry & reg,string & err_message)247 CNCDistributionConf::InitMirrorConfig(const CNcbiRegistry& reg, string& err_message)
248 {
249 SSrvMirrorInfo* mirrorCfg = new SSrvMirrorInfo;
250 SSrvMirrorInfo* prevMirrorCfg = ACCESS_ONCE(s_MirrorConf);
251 bool isReconf = prevMirrorCfg != NULL;
252 vector<Uint2> self_slots;
253 mirrorCfg->s_IpToId[s_SelfIP] = s_SelfID;
254
255 string reg_value;
256 bool found_self = false;
257 for (int srv_idx = 0; ; ++srv_idx) {
258 string value_name = kNCReg_NCServerPrefix + NStr::NumericToString(srv_idx);
259 reg_value = reg.Get(kNCReg_NCPoolSection, value_name.c_str());
260 if (reg_value.empty())
261 break;
262
263 string srv_name = reg_value;
264 list<CTempString> srv_fields;
265 ncbi_NStr_Split(reg_value, ":", srv_fields);
266 if (srv_fields.size() != 3) {
267 err_message = srv_name + ": Invalid peer server specification";
268 goto do_error;
269 }
270 list<CTempString>::const_iterator it_fields = srv_fields.begin();
271 string grp_name = *it_fields;
272 ++it_fields;
273 string host_str = *it_fields;
274 Uint4 host = CTaskServer::GetIPByHost(host_str);
275 ++it_fields;
276 string port_str = *it_fields;
277 Uint2 port = NStr::StringToUInt(port_str, NStr::fConvErr_NoThrow);
278 if (host == 0 || port == 0) {
279 err_message = srv_name + ": Host not found";
280 goto do_error;
281 }
282
283 Uint8 trustlevel = 0;
284 value_name = kNCReg_NCServerTrustPrefix + NStr::NumericToString(srv_idx);
285 reg_value = reg.Get(kNCReg_NCPoolSection, value_name.c_str());
286 if (!reg_value.empty()) {
287 trustlevel = NStr::StringToUInt8(reg_value, NStr::fConvErr_NoThrow);
288 }
289
290 Uint8 srv_id = (Uint8(host) << 32) + port;
291 string peer_str = host_str + ":" + port_str;
292 if (srv_id == s_SelfID) {
293 if (found_self) {
294 err_message = srv_name + ": Host described twice";
295 goto do_error;
296 }
297 found_self = true;
298 s_SelfTrust = trustlevel;
299 if (isReconf) {
300 if (s_SelfGroup != grp_name || s_SelfName != peer_str) {
301 err_message = srv_name + ": Changes in self description prohibited (group or name)";
302 goto do_error;
303 }
304 } else {
305 s_SelfGroup = grp_name;
306 s_SelfName = peer_str;
307 }
308 }
309 else {
310 if (mirrorCfg->s_Peers.find(srv_id) != mirrorCfg->s_Peers.end()) {
311 err_message = srv_name + + ": Described twice";
312 goto do_error;
313 }
314 mirrorCfg->s_Peers[srv_id] = peer_str;
315 mirrorCfg->s_SrvTrust[srv_id] = trustlevel;
316 }
317
318 // There must be corresponding description of slots
319 value_name = kNCReg_NCServerSlotsPrefix + NStr::NumericToString(srv_idx);
320 reg_value = reg.Get(kNCReg_NCPoolSection, value_name.c_str());
321 if (reg_value.empty()) {
322 err_message = srv_name + ": No slots for server";
323 goto do_error;
324 }
325
326 list<string> values;
327 ncbi_NStr_Split(reg_value, ",", values);
328 ITERATE(list<string>, it, values) {
329 Uint2 slot = NStr::StringToUInt(*it,
330 NStr::fConvErr_NoThrow | NStr::fAllowLeadingSpaces | NStr::fAllowTrailingSpaces);
331 if (slot == 0) {
332 err_message = srv_name + ": Bad slot number: " + string(*it);
333 goto do_error;
334 }
335 TServersList& srvs = mirrorCfg->s_RawSlot2Servers[slot];
336 if (find(srvs.begin(), srvs.end(), srv_id) != srvs.end()) {
337 err_message = srv_name + ": Slot listed twice: " + string(*it);
338 goto do_error;
339 }
340 if (srv_id == s_SelfID) {
341 vector<Uint2>& slots = isReconf ? self_slots : s_SelfSlots;
342 if (find(slots.begin(), slots.end(), slot) != slots.end()) {
343 err_message = srv_name + ": Slot listed twice: " + string(*it);
344 goto do_error;
345 }
346 slots.push_back(slot);
347 } else {
348 srvs.push_back(srv_id);
349 mirrorCfg->s_Slot2Servers[slot].push_back(SSrvGroupInfo(srv_id, grp_name));
350 mirrorCfg->s_Servers2Slots[srv_id].push_back(slot);
351 }
352 if (isReconf) {
353 if (slot > s_MaxSlotNumber) {
354 err_message = srv_name + ": Slot numbers cannot exceed " + NStr::NumericToString(s_MaxSlotNumber);
355 goto do_error;
356 }
357 } else {
358 s_MaxSlotNumber = max(slot, s_MaxSlotNumber);
359 }
360 }
361 }
362 if (!found_self) {
363 if (mirrorCfg->s_Peers.size() != 0) {
364 err_message = CTaskServer::GetHostName() + ": Self description not found";
365 goto do_error;
366 }
367 if (!isReconf) {
368 s_SelfSlots.push_back(1);
369 s_SelfGroup = "grp1";
370 }
371 }
372 if (isReconf && !self_slots.empty()) {
373 if (!std::equal(s_SelfSlots.begin(), s_SelfSlots.end(), self_slots.begin())) {
374 err_message = s_SelfName + ": Changes in self description prohibited (slots)";
375 goto do_error;
376 }
377 }
378
379 ITERATE(TNCPeerList, it_peer, mirrorCfg->s_Peers) {
380 Uint8 srv_id = it_peer->first;
381 vector<Uint2>& common_slots = mirrorCfg->s_CommonSlots[srv_id];
382 ITERATE(TSlot2SrvMap, it_slot, mirrorCfg->s_RawSlot2Servers) {
383 Uint2 slot = it_slot->first;
384 if (find(s_SelfSlots.begin(), s_SelfSlots.end(), slot) == s_SelfSlots.end())
385 continue;
386 const TServersList& srvs = it_slot->second;
387 if (find(srvs.begin(), srvs.end(), srv_id) != srvs.end()) {
388 common_slots.push_back(it_slot->first);
389 Uint4 ip = (srv_id >> 32);
390 if (ip != s_SelfIP) {
391 mirrorCfg->s_IpToId[ Uint4(srv_id >> 32)] = srv_id;
392 }
393 }
394 }
395 }
396
397 if (AtomicCAS(s_MirrorConf, prevMirrorCfg, mirrorCfg)) {
398 // pre-create all peers
399 const TNCPeerList& peers = CNCDistributionConf::GetPeers();
400 ITERATE(TNCPeerList, p, peers) {
401 CNCPeerControl* peer = CNCPeerControl::Peer(p->first);
402 peer->SetHostProtocol(0);
403 peer->SetTrustLevel(s_MirrorConf->s_SrvTrust[p->first]);
404 }
405 return true;
406 }
407 do_error:
408 delete mirrorCfg;
409 return false;
410 }
411
412 bool
ReConfig(const CNcbiRegistry & new_reg,string & err_message)413 CNCDistributionConf::ReConfig(const CNcbiRegistry& new_reg, string& err_message)
414 // we only add or remove peer servers, nothing else
415 {
416 if (!InitMirrorConfig(new_reg, err_message)) {
417 return false;
418 }
419 // modify old registry to remember the changes
420 CNcbiRegistry& old_reg = CTaskServer::SetConfRegistry();
421 // remove old peers
422 string value_name, value;
423 size_t srv_idx;
424 for (srv_idx = 0; ; ++srv_idx) {
425 value_name = kNCReg_NCServerPrefix + NStr::NumericToString(srv_idx);
426 value = old_reg.Get(kNCReg_NCPoolSection, value_name);
427 if (value.empty()) {
428 break;
429 }
430 old_reg.Set(kNCReg_NCPoolSection, value_name, kEmptyStr, CNcbiRegistry::fPersistent);
431 value_name = kNCReg_NCServerSlotsPrefix + NStr::NumericToString(srv_idx);
432 old_reg.Set(kNCReg_NCPoolSection, value_name, kEmptyStr, CNcbiRegistry::fPersistent);
433 }
434 // add new ones
435 for (srv_idx = 0; ; ++srv_idx) {
436 value_name = kNCReg_NCServerPrefix + NStr::NumericToString(srv_idx);
437 value = new_reg.Get(kNCReg_NCPoolSection, value_name);
438 if (value.empty())
439 break;
440 old_reg.Set(kNCReg_NCPoolSection, value_name, value, CNcbiRegistry::fPersistent);
441 value_name = kNCReg_NCServerSlotsPrefix + NStr::NumericToString(srv_idx);
442 value = new_reg.Get(kNCReg_NCPoolSection, value_name);
443 old_reg.Set(kNCReg_NCPoolSection, value_name, value, CNcbiRegistry::fPersistent);
444 }
445
446 #if 0
447 // compare (I am not sure though that slot lists should be identical)
448 map<string, set<Uint2> >::const_iterator pn, po;
449 for(pn = new_peers.begin(); pn != new_peers.end(); ++pn) {
450 po = old_peers.find(pn->first);
451 if (po != old_peers.end()) {
452 if (pn->second.size() != po->second.size() ||
453 !std::equal(pn->second.begin(), pn->second.end(), po->second.begin())) {
454 err_message = pn->first + ": Slot lists differ in old and new configurations";
455 return false;
456 }
457 }
458 }
459 for(po = old_peers.begin(); po != old_peers.end(); ++po) {
460 pn = new_peers.find(po->first);
461 if (pn != new_peers.end()) {
462 if (po->second.size() != pn->second.size() ||
463 !std::equal(po->second.begin(), po->second.end(), pn->second.begin())) {
464 err_message = po->first + ": Slot lists differ in old and new configurations";
465 return false;
466 }
467 }
468 }
469
470 // remove common peers
471 // here I assume common peers have identical slot lists
472 for(pn = new_peers.begin(); pn != new_peers.end(); ++pn) {
473 po = old_peers.find(pn->first);
474 if (po != old_peers.end()) {
475 old_peers.erase(po);
476 }
477 }
478 for(po = old_peers.begin(); po != old_peers.end(); ++po) {
479 pn = new_peers.find(po->first);
480 if (pn != new_peers.end()) {
481 new_peers.erase(pn);
482 }
483 }
484 #endif
485 return true;
486 }
487
488 void
Finalize(void)489 CNCDistributionConf::Finalize(void)
490 {
491 if (s_CopyDelayLog)
492 fclose(s_CopyDelayLog);
493 }
494
WriteSetup(CSrvSocketTask & task)495 void CNCDistributionConf::WriteSetup(CSrvSocketTask& task)
496 {
497 string is("\": "),iss("\": \""), eol(",\n\""), str("_str"), eos("\"");
498
499 //self
500 task.WriteText(eol).WriteText(kNCReg_NCServerPrefix).WriteNumber(0).WriteText(iss);
501 task.WriteText(s_SelfName).WriteText("\"");
502 task.WriteText(eol).WriteText(kNCReg_NCServerPrefix).WriteText("id_").WriteNumber(0).WriteText(is);
503 task.WriteNumber(s_SelfID);
504 task.WriteText(eol).WriteText(kNCReg_NCServerTrustPrefix).WriteNumber(0).WriteText(is).WriteNumber(s_SelfTrust);
505 task.WriteText(eol).WriteText(kNCReg_NCServerSlotsPrefix).WriteNumber(0).WriteText(is).WriteText("[");
506 ITERATE( vector<Uint2>, s, s_SelfSlots) {
507 if (s != s_SelfSlots.begin()) {
508 task.WriteText(",");
509 }
510 task.WriteNumber(*s);
511 }
512 task.WriteText("]");
513
514 // peers
515 int idx=1;
516 SSrvMirrorInfo* mirrorConf = ACCESS_ONCE(s_MirrorConf);
517 ITERATE( TNCPeerList, p, mirrorConf->s_Peers) {
518 task.WriteText(eol).WriteText(kNCReg_NCServerPrefix).WriteNumber(idx).WriteText(iss);
519 task.WriteText(p->second).WriteText("\"");
520 task.WriteText(eol).WriteText(kNCReg_NCServerPrefix).WriteText("id_").WriteNumber(idx).WriteText(is);
521 task.WriteNumber(p->first);
522 task.WriteText(eol).WriteText(kNCReg_NCServerTrustPrefix).WriteNumber(idx).WriteText(is);
523 task.WriteNumber(CNCPeerControl::Peer(p->first)->GetRawTrustLevel());
524
525 Uint8 srv_id = p->first;
526 vector<Uint2> slots;
527 // find slots servers by this peer
528 ITERATE(TSlot2SrvMap, it_slot, mirrorConf->s_RawSlot2Servers) {
529 Uint2 slot = it_slot->first;
530 const TServersList& srvs = it_slot->second;
531 if (find(srvs.begin(), srvs.end(), srv_id) != srvs.end()) {
532 slots.push_back(slot);
533 }
534 }
535 task.WriteText(eol).WriteText(kNCReg_NCServerSlotsPrefix).WriteNumber(idx).WriteText(is).WriteText("[");
536 ITERATE( vector<Uint2>, s, slots) {
537 if (s != slots.begin()) {
538 task.WriteText(",");
539 }
540 task.WriteNumber(*s);
541 }
542 task.WriteText("]");
543 ++idx;
544 }
545
546 task.WriteText(eol).WriteText("mirroring_log_file" ).WriteText(iss).WriteText(s_MirroringSizeFile).WriteText(eos);
547 task.WriteText(eol).WriteText("periodic_log_file" ).WriteText(iss).WriteText(s_PeriodicLogFile ).WriteText(eos);
548 task.WriteText(eol).WriteText("copy_delay_log_file").WriteText(iss).WriteText(s_CopyDelayLogFile ).WriteText(eos);
549 task.WriteText(eol).WriteText("sync_log_file" ).WriteText(iss).WriteText(s_SyncLogFileName ).WriteText(eos);
550
551 task.WriteText(eol).WriteText("cnt_slot_buckets" ).WriteText(is).WriteNumber(s_CntSlotBuckets);
552 task.WriteText(eol).WriteText("max_active_syncs" ).WriteText(is).WriteNumber(s_CntActiveSyncs);
553 task.WriteText(eol).WriteText("max_syncs_one_server" ).WriteText(is).WriteNumber(s_MaxSyncsOneServer);
554 task.WriteText(eol).WriteText("task_priority_sync" ).WriteText(is).WriteNumber(s_TaskPrioritySync);
555 task.WriteText(eol).WriteText("max_peer_connections" ).WriteText(is).WriteNumber(s_MaxPeerTotalConns);
556 task.WriteText(eol).WriteText("max_peer_bg_connections" ).WriteText(is).WriteNumber(s_MaxPeerBGConns);
557 task.WriteText(eol).WriteText("peer_errors_for_throttle" ).WriteText(is).WriteNumber(s_CntErrorsToThrottle);
558 task.WriteText(eol).WriteText("peer_throttles_for_ip_change").WriteText(is).WriteNumber(s_CntThrottlesToIpchange);
559 task.WriteText(eol).WriteText("peer_throttle_period" ).WriteText(is).WriteNumber(s_PeerThrottlePeriod/kUSecsPerSecond);
560 task.WriteText(eol).WriteText("peer_communication_timeout" ).WriteText(is).WriteNumber(s_PeerTimeout);
561 task.WriteText(eol).WriteText("peer_blob_list_timeout" ).WriteText(is).WriteNumber(s_BlobListTimeout);
562 task.WriteText(eol).WriteText("small_blob_max_size" ).WriteText(is).WriteNumber(s_SmallBlobBoundary/1000);
563 task.WriteText(eol).WriteText("max_instant_queue_size" ).WriteText(is).WriteNumber(s_MaxMirrorQueueSize);
564 task.WriteText(eol).WriteText("max_slot_log_records" ).WriteText(is).WriteNumber(s_MaxSlotLogEvents);
565 task.WriteText(eol).WriteText("clean_slot_log_reserve" ).WriteText(is).WriteNumber(s_CleanLogReserve);
566 task.WriteText(eol).WriteText("max_clean_log_batch" ).WriteText(is).WriteNumber(s_MaxCleanLogBatch);
567 task.WriteText(eol).WriteText("min_forced_clean_log_period").WriteText(is).WriteNumber(s_MinForcedCleanPeriod/kUSecsPerSecond);
568 task.WriteText(eol).WriteText("clean_log_attempt_interval" ).WriteText(is).WriteNumber(s_CleanAttemptInterval);
569 task.WriteText(eol).WriteText("deferred_sync_interval" ).WriteText(is).WriteNumber(s_PeriodicSyncInterval/kUSecsPerSecond);
570 task.WriteText(eol).WriteText("deferred_sync_timeout" ).WriteText(is).WriteNumber(s_PeriodicSyncTimeout/ kUSecsPerSecond);
571 task.WriteText(eol).WriteText("failed_sync_retry_delay" ).WriteText(is).WriteNumber(s_FailedSyncRetryDelay/kUSecsPerSecond);
572 task.WriteText(eol).WriteText("network_error_timeout" ).WriteText(is).WriteNumber(s_NetworkErrorTimeout/ kUSecsPerSecond);
573 task.WriteText(eol).WriteText("max_blob_size_sync").WriteText(str).WriteText(iss)
574 .WriteText(NStr::UInt8ToString_DataSize( s_MaxBlobSizeSync)).WriteText(eos);
575 task.WriteText(eol).WriteText("max_blob_size_sync").WriteText(is ).WriteNumber( s_MaxBlobSizeSync);
576 task.WriteText(eol).WriteText("warn_blob_size_sync").WriteText(is ).WriteBool(s_WarnBlobSizeSync);
577 task.WriteText(eol).WriteText("blob_update_hotline").WriteText(is ).WriteBool(s_BlobUpdateHotline);
578 task.WriteText(eol).WriteText("slot_calculation_by_rawkey").WriteText(is ).WriteBool(s_SlotByRawkey);
579 }
580
WriteEnvInfo(CSrvSocketTask & task)581 void CNCDistributionConf::WriteEnvInfo(CSrvSocketTask& task)
582 {
583 string is("\": "),iss("\": \""), eol(",\n\""), str("_str"), eos("\"");
584 task.WriteText(eol).WriteText("mirroring_log_file" ).WriteText(iss).WriteText(s_MirroringSizeFile).WriteText(eos);
585 task.WriteText(eol).WriteText("periodic_log_file" ).WriteText(iss).WriteText(s_PeriodicLogFile ).WriteText(eos);
586 task.WriteText(eol).WriteText("copy_delay_log_file").WriteText(iss).WriteText(s_CopyDelayLogFile ).WriteText(eos);
587 task.WriteText(eol).WriteText("sync_log_file" ).WriteText(iss).WriteText(s_SyncLogFileName ).WriteText(eos);
588 }
589
590 size_t
CountServersForSlot(Uint2 slot)591 CNCDistributionConf::CountServersForSlot(Uint2 slot)
592 {
593 return s_MirrorConf->s_Slot2Servers[slot].size();
594 }
595
596 void
GetServersForSlot(Uint2 slot,TServersList & lst)597 CNCDistributionConf::GetServersForSlot(Uint2 slot, TServersList& lst)
598 {
599 TSrvGroupsList srvs = s_MirrorConf->s_Slot2Servers[slot];
600 random_device rd;
601 mt19937 mt(rd());
602 shuffle(srvs.begin(), srvs.end(), mt);
603 lst.clear();
604 for (size_t i = 0; i < srvs.size(); ++i) {
605 if (srvs[i].grp == s_SelfGroup)
606 lst.push_back(srvs[i].srv_id);
607 }
608 for (size_t i = 0; i < srvs.size(); ++i) {
609 if (srvs[i].grp != s_SelfGroup)
610 lst.push_back(srvs[i].srv_id);
611 }
612 }
613
614 const TServersList&
GetRawServersForSlot(Uint2 slot)615 CNCDistributionConf::GetRawServersForSlot(Uint2 slot)
616 {
617 return s_MirrorConf->s_RawSlot2Servers[slot];
618 }
619 const vector<Uint2>&
GetSlotsForServer(Uint8 srv_id)620 CNCDistributionConf::GetSlotsForServer(Uint8 srv_id)
621 {
622 return s_MirrorConf->s_Servers2Slots[srv_id];
623 }
GetMaxSlotNumber(void)624 Uint2 CNCDistributionConf::GetMaxSlotNumber(void) {
625 return s_MaxSlotNumber;
626 }
AddServerSlots(set<Uint2> & slots,Uint8 srv_id)627 void CNCDistributionConf::AddServerSlots(set<Uint2>& slots, Uint8 srv_id)
628 {
629 const vector<Uint2>& srv_slots(
630 (srv_id == s_SelfID || srv_id == 0) ? s_SelfSlots : s_MirrorConf->s_Servers2Slots[srv_id]);
631 for( const Uint2 s : srv_slots) {
632 slots.insert(s);
633 }
634 }
635
636 const vector<Uint2>&
GetCommonSlots(Uint8 server)637 CNCDistributionConf::GetCommonSlots(Uint8 server)
638 {
639 return s_MirrorConf->s_CommonSlots[server];
640 }
641
642 bool
HasCommonSlots(Uint8 server)643 CNCDistributionConf::HasCommonSlots(Uint8 server)
644 {
645 return !s_MirrorConf->s_CommonSlots[server].empty();
646 }
647
648 Uint8
GetSelfID(void)649 CNCDistributionConf::GetSelfID(void)
650 {
651 return s_SelfID;
652 }
653
GetSelfTrustLevel(void)654 Uint8 CNCDistributionConf::GetSelfTrustLevel(void)
655 {
656 return s_SelfTrust & 0xF;
657 }
658
659 const TNCPeerList&
GetPeers(void)660 CNCDistributionConf::GetPeers(void)
661 {
662 return s_MirrorConf->s_Peers;
663 }
664
665 bool
HasPeers(void)666 CNCDistributionConf::HasPeers(void)
667 {
668 return !s_MirrorConf->s_Peers.empty();
669 }
670
671 string
GetPeerNameOrEmpty(Uint8 srv_id)672 CNCDistributionConf::GetPeerNameOrEmpty(Uint8 srv_id)
673 {
674 string name;
675 if (srv_id == s_SelfID) {
676 name = s_SelfName;
677 }
678 else {
679 const TNCPeerList& peers = CNCDistributionConf::GetPeers();
680 if (peers.find(srv_id) != peers.end()) {
681 name = peers.find(srv_id)->second;
682 }
683 }
684 return name;
685 }
686
687 string
GetPeerName(Uint8 srv_id)688 CNCDistributionConf::GetPeerName(Uint8 srv_id)
689 {
690 string name(GetPeerNameOrEmpty(srv_id));
691 if (name.empty()) {
692 name = CNCPeerControl::GetPeerNameOrEmpty(srv_id);
693 if (name.empty()) {
694 name = "unknown_server";
695 }
696 }
697 return name;
698 }
699
700 string
GetFullPeerName(Uint8 srv_id)701 CNCDistributionConf::GetFullPeerName(Uint8 srv_id)
702 {
703 string name( GetPeerName(srv_id));
704 name += " (";
705 name += NStr::UInt8ToString(srv_id);
706 name += ") ";
707 return name;
708 }
709
710
711 void
GetPeerServers(TServersList & lst)712 CNCDistributionConf::GetPeerServers(TServersList& lst)
713 {
714 lst.clear();
715 const TNCPeerList& peers = CNCDistributionConf::GetPeers();
716 ITERATE(TNCPeerList, it_peer, peers) {
717 if (GetSelfID() != it_peer->first) {
718 lst.push_back(it_peer->first);
719 }
720 }
721 }
722
723 void
GenerateBlobKey(Uint2 local_port,string & key,Uint2 & slot,Uint2 & time_bucket,unsigned int ver)724 CNCDistributionConf::GenerateBlobKey(Uint2 local_port,
725 string& key, Uint2& slot, Uint2& time_bucket,
726 unsigned int ver)
727 {
728 s_KeyRndLock.Lock();
729 Uint4 rnd_num = s_KeyRnd.GetRand();
730 s_KeyRndLock.Unlock();
731
732 Uint2 cnt_pieces = Uint2(s_SelfSlots.size());
733 Uint4 piece_share = (CRandom::GetMax() + 1) / cnt_pieces + 1;
734 Uint2 index = rnd_num / piece_share;
735 rnd_num -= index * piece_share;
736 slot = s_SelfSlots[index];
737 Uint4 remain = rnd_num % s_SlotRndShare;
738 Uint4 key_rnd = (slot - 1) * s_SlotRndShare + remain;
739 time_bucket = min(
740 Uint2((slot - 1) * s_CntSlotBuckets + remain / s_TimeRndShare + 1),
741 CNCDistributionConf::GetCntTimeBuckets());
742 CNetCacheKey::GenerateBlobKey(&key,
743 static_cast<Uint4>(s_BlobId.Add(1)),
744 s_SelfHostIP, local_port, ver, key_rnd);
745 }
746
747 bool
GetSlotByKey(const string & key,Uint2 & slot,Uint2 & time_bucket)748 CNCDistributionConf::GetSlotByKey(const string& key, Uint2& slot, Uint2& time_bucket)
749 {
750 if (key[0] == '\1')
751 // NetCache-generated key
752 return GetSlotByNetCacheKey(key, slot, time_bucket);
753 else {
754 // ICache key provided by client
755 GetSlotByICacheKey(CNCBlobKeyLight(key), slot, time_bucket);
756 return true;
757 }
758 }
759
760 bool
GetSlotByNetCacheKey(const string & key,Uint2 & slot,Uint2 & time_bucket)761 CNCDistributionConf::GetSlotByNetCacheKey(const string& key,
762 Uint2& slot, Uint2& time_bucket)
763 {
764 size_t ind = 0;
765 unsigned key_rnd = 0;
766 #if 0
767 #define SKIP_UNDERSCORE(key, ind) \
768 ind = key.find('_', ind + 1); \
769 if (ind == string::npos) \
770 return false;
771
772 SKIP_UNDERSCORE(key, ind); // version
773 SKIP_UNDERSCORE(key, ind); // id
774 SKIP_UNDERSCORE(key, ind); // host
775 SKIP_UNDERSCORE(key, ind); // port
776 SKIP_UNDERSCORE(key, ind); // time
777 SKIP_UNDERSCORE(key, ind); // random
778 #else
779 ind = key.rfind('_');
780 if (ind == string::npos) {
781 goto on_error;
782 }
783 #endif
784 ++ind;
785 key_rnd = NStr::StringToUInt(
786 CTempString(&key[ind], key.size() - ind),
787 NStr::fConvErr_NoThrow | NStr::fAllowTrailingSymbols);
788 if (key_rnd == 0 && errno != 0) {
789 goto on_error;
790 }
791 GetSlotByRnd(key_rnd, slot, time_bucket);
792 return true;
793
794 on_error:
795 CNCBlobKeyLight kl(key);
796 CNetCacheKey tmp;
797 if (!CNetCacheKey::ParseBlobKey(kl.RawKey().data(), kl.RawKey().size(), &tmp)) {
798 SRV_LOG(Critical, "CNetCacheKey failed to parse blob key: " << key);
799 return false;
800 }
801 SRV_LOG(Error, "NetCache failed to parse blob key: " << key);
802 GetSlotByRnd(tmp.GetRandomPart(), slot, time_bucket);
803 return true;
804 }
805
806 void
GetSlotByICacheKey(const CNCBlobKeyLight & key,Uint2 & slot,Uint2 & time_bucket)807 CNCDistributionConf::GetSlotByICacheKey(const CNCBlobKeyLight& key,
808 Uint2& slot, Uint2& time_bucket)
809 {
810 CChecksum crc32(CChecksum::eCRC32);
811
812 if (s_SlotByRawkey) {
813 crc32.AddChars(key.RawKey().data(), key.RawKey().size());
814 } else {
815 crc32.AddChars(key.PackedKey().data(), key.PackedKey().size());
816 }
817
818 GetSlotByRnd(crc32.GetChecksum(), slot, time_bucket);
819 }
820
821 void
GetSlotByRnd(Uint4 key_rnd,Uint2 & slot,Uint2 & time_bucket)822 CNCDistributionConf::GetSlotByRnd(Uint4 key_rnd,
823 Uint2& slot, Uint2& time_bucket)
824 {
825 // Slot numbers are 1-based
826 slot = Uint2(key_rnd / s_SlotRndShare) + 1;
827 time_bucket = min(
828 Uint2((slot - 1) * s_CntSlotBuckets + key_rnd % s_SlotRndShare / s_TimeRndShare + 1),
829 CNCDistributionConf::GetCntTimeBuckets());
830 }
831
GetSrvIdByIP(Uint4 ip)832 Uint8 CNCDistributionConf::GetSrvIdByIP(Uint4 ip)
833 {
834 SSrvMirrorInfo* mirrorConf = ACCESS_ONCE(s_MirrorConf);
835 map<Uint4, Uint8>::const_iterator i = mirrorConf->s_IpToId.find(ip);
836 if (i != mirrorConf->s_IpToId.end()) {
837 return i->second;
838 }
839 return 0;
840 }
841
842 Uint8
GetMainSrvId(const CNCBlobKey & key)843 CNCDistributionConf::GetMainSrvId(const CNCBlobKey& key)
844 {
845 return key.IsICacheKey() ? 0 : GetSrvIdByIP( GetMainSrvIP(key));
846 }
847
848
849 bool
IsThisServerKey(const string & packed_key)850 CNCDistributionConf::IsThisServerKey(const string& packed_key)
851 {
852 CNCBlobKeyLight kl(packed_key);
853 CNCBlobKey blob_key;
854 blob_key.Assign(kl.Cache(), kl.RawKey(), kl.SubKey());
855 return CNCDistributionConf::IsThisServerKey(blob_key);
856 }
857
858 bool
IsThisServerKey(const CNCBlobKey & key)859 CNCDistributionConf::IsThisServerKey(const CNCBlobKey& key)
860 {
861 if (key.IsICacheKey()) {
862 return true;
863 }
864 return (key.KeyVersion() == 3) ?
865 (s_SelfAlias == key.GetHostPortCRC32()) :
866 (s_SelfHostIP == key.GetHost());
867 }
868
869 Uint4
GetMainSrvIP(const CNCBlobKey & key)870 CNCDistributionConf::GetMainSrvIP(const CNCBlobKey& key)
871 {
872 try {
873 if (key.KeyVersion() == 3) {
874 Uint4 alias = key.GetHostPortCRC32();
875 return (alias == s_SelfAlias) ? s_SelfIP :
876 CNCPeerControl::FindIPbyAlias(alias);
877 }
878 const string& host_str(key.GetHost());
879 if (s_SelfHostIP == host_str) {
880 return s_SelfIP;
881 }
882 Uint4 ip = CNCPeerControl::FindIPbyName(host_str);
883 if (ip != 0) {
884 return ip;
885 }
886 return CTaskServer::GetIPByHost(host_str);
887 }
888 catch (...) {
889 return 0;
890 }
891 }
892
893 Uint4
CreateHostAlias(Uint4 ip,Uint4 port)894 CNCDistributionConf::CreateHostAlias(Uint4 ip, Uint4 port)
895 {
896 string ip_str(CTaskServer::IPToString(ip));
897 return CNetCacheKey::CalculateChecksum(ip_str, port);
898 }
899
900 bool
IsServedLocally(Uint2 slot)901 CNCDistributionConf::IsServedLocally(Uint2 slot)
902 {
903 return find(s_SelfSlots.begin(), s_SelfSlots.end(), slot) != s_SelfSlots.end();
904 }
905
906 Uint2
GetCntSlotBuckets(void)907 CNCDistributionConf::GetCntSlotBuckets(void)
908 {
909 return s_CntSlotBuckets;
910 }
911
912 Uint2
GetCntTimeBuckets(void)913 CNCDistributionConf::GetCntTimeBuckets(void)
914 {
915 return s_CntTimeBuckets;
916 }
917
918 const vector<Uint2>&
GetSelfSlots(void)919 CNCDistributionConf::GetSelfSlots(void)
920 {
921 return s_SelfSlots;
922 }
923
924 const string&
GetMirroringSizeFile(void)925 CNCDistributionConf::GetMirroringSizeFile(void)
926 {
927 return s_MirroringSizeFile;
928 }
929
930 const string&
GetPeriodicLogFile(void)931 CNCDistributionConf::GetPeriodicLogFile(void)
932 {
933 return s_PeriodicLogFile;
934 }
935
936 Uint1
GetCntActiveSyncs(void)937 CNCDistributionConf::GetCntActiveSyncs(void)
938 {
939 return s_CntActiveSyncs;
940 }
941
942 Uint1
GetMaxSyncsOneServer(void)943 CNCDistributionConf::GetMaxSyncsOneServer(void)
944 {
945 return s_MaxSyncsOneServer;
946 }
947
948 Uint4
GetSyncPriority(void)949 CNCDistributionConf::GetSyncPriority(void)
950 {
951 return CNCServer::IsInitiallySynced()? s_TaskPrioritySync : GetDefaultTaskPriority();
952 }
953
954 Uint2
GetMaxPeerTotalConns(void)955 CNCDistributionConf::GetMaxPeerTotalConns(void)
956 {
957 return s_MaxPeerTotalConns;
958 }
959
960 Uint2
GetMaxPeerBGConns(void)961 CNCDistributionConf::GetMaxPeerBGConns(void)
962 {
963 return s_MaxPeerBGConns;
964 }
965
966 Uint2
GetCntErrorsToThrottle(void)967 CNCDistributionConf::GetCntErrorsToThrottle(void)
968 {
969 return s_CntErrorsToThrottle;
970 }
971
972 Uint2
GetCntThrottlesToIpchange(void)973 CNCDistributionConf::GetCntThrottlesToIpchange(void)
974 {
975 return s_CntThrottlesToIpchange;
976 }
977
978 Uint8
GetPeerThrottlePeriod(void)979 CNCDistributionConf::GetPeerThrottlePeriod(void)
980 {
981 return s_PeerThrottlePeriod;
982 }
983
984 Uint1
GetPeerTimeout(void)985 CNCDistributionConf::GetPeerTimeout(void)
986 {
987 return s_PeerTimeout;
988 }
989
990 Uint1
GetBlobListTimeout(void)991 CNCDistributionConf::GetBlobListTimeout(void)
992 {
993 return s_BlobListTimeout;
994 }
995
996 Uint8
GetSmallBlobBoundary(void)997 CNCDistributionConf::GetSmallBlobBoundary(void)
998 {
999 return s_SmallBlobBoundary;
1000 }
1001
1002 Uint2
GetMaxMirrorQueueSize(void)1003 CNCDistributionConf::GetMaxMirrorQueueSize(void)
1004 {
1005 return s_MaxMirrorQueueSize;
1006 }
1007
1008 const string&
GetSyncLogFileName(void)1009 CNCDistributionConf::GetSyncLogFileName(void)
1010 {
1011 return s_SyncLogFileName;
1012 }
1013
1014 Uint4
GetMaxSlotLogEvents(void)1015 CNCDistributionConf::GetMaxSlotLogEvents(void)
1016 {
1017 return s_MaxSlotLogEvents;
1018 }
1019
1020 Uint4
GetCleanLogReserve(void)1021 CNCDistributionConf::GetCleanLogReserve(void)
1022 {
1023 return s_CleanLogReserve;
1024 }
1025
1026 Uint4
GetMaxCleanLogBatch(void)1027 CNCDistributionConf::GetMaxCleanLogBatch(void)
1028 {
1029 return s_MaxCleanLogBatch;
1030 }
1031
1032 Uint8
GetMinForcedCleanPeriod(void)1033 CNCDistributionConf::GetMinForcedCleanPeriod(void)
1034 {
1035 return s_MinForcedCleanPeriod;
1036 }
1037
1038 Uint4
GetCleanAttemptInterval(void)1039 CNCDistributionConf::GetCleanAttemptInterval(void)
1040 {
1041 return s_CleanAttemptInterval;
1042 }
1043
1044 Uint8
GetPeriodicSyncInterval(void)1045 CNCDistributionConf::GetPeriodicSyncInterval(void)
1046 {
1047 return s_PeriodicSyncInterval;
1048 }
1049
1050 Uint8
GetPeriodicSyncHeadTime(void)1051 CNCDistributionConf::GetPeriodicSyncHeadTime(void)
1052 {
1053 //return s_PeriodicSyncHeadTime;
1054 return 0;
1055 }
1056
1057 Uint8
GetPeriodicSyncTailTime(void)1058 CNCDistributionConf::GetPeriodicSyncTailTime(void)
1059 {
1060 //return s_PeriodicSyncTailTime;
1061 return 0;
1062 }
1063
1064 Uint8
GetPeriodicSyncTimeout(void)1065 CNCDistributionConf::GetPeriodicSyncTimeout(void)
1066 {
1067 return s_PeriodicSyncTimeout;
1068 }
1069
1070 Uint8
GetFailedSyncRetryDelay(void)1071 CNCDistributionConf::GetFailedSyncRetryDelay(void)
1072 {
1073 return s_FailedSyncRetryDelay;
1074 }
1075
1076 Uint8
GetNetworkErrorTimeout(void)1077 CNCDistributionConf::GetNetworkErrorTimeout(void)
1078 {
1079 return s_NetworkErrorTimeout;
1080 }
1081 Uint8
GetMaxBlobSizeSync(void)1082 CNCDistributionConf::GetMaxBlobSizeSync(void)
1083 {
1084 return s_MaxBlobSizeSync;
1085 }
1086 bool
GetWarnBlobSizeSync(void)1087 CNCDistributionConf::GetWarnBlobSizeSync(void)
1088 {
1089 return s_WarnBlobSizeSync;
1090 }
1091 bool
GetBlobUpdateHotline(void)1092 CNCDistributionConf::GetBlobUpdateHotline(void)
1093 {
1094 return s_BlobUpdateHotline;
1095 }
1096
1097 void
PrintBlobCopyStat(Uint8 create_time,Uint8 create_server,Uint8 write_server)1098 CNCDistributionConf::PrintBlobCopyStat(Uint8 create_time, Uint8 create_server, Uint8 write_server)
1099 {
1100 if (s_CopyDelayLog) {
1101 Uint8 cur_time = CSrvTime::Current().AsUSec();
1102 fprintf(s_CopyDelayLog,
1103 "%" NCBI_UINT8_FORMAT_SPEC ",%" NCBI_UINT8_FORMAT_SPEC
1104 ",%" NCBI_UINT8_FORMAT_SPEC ",%" NCBI_UINT8_FORMAT_SPEC "\n",
1105 cur_time, create_server, write_server, cur_time - create_time);
1106 }
1107 }
1108
1109
1110 END_NCBI_SCOPE
1111