1 /*  $Id: distribution_conf.cpp 633628 2021-06-22 18:24:03Z ivanov $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Denis Vakatov, Pavel Ivanov, Sergey Satskiy
27  *
28  * File Description: Data structures and API to support blobs mirroring.
29  *
30  */
31 
32 
33 #include "nc_pch.hpp"
34 
35 #include <corelib/ncbireg.hpp>
36 #include <util/checksum.hpp>
37 #include <util/random_gen.hpp>
38 
39 #include "task_server.hpp"
40 
41 #include "netcached.hpp"
42 #include "distribution_conf.hpp"
43 #include "peer_control.hpp"
44 #include <random>
45 
46 BEGIN_NCBI_SCOPE
47 
48 
49 struct SSrvGroupInfo
50 {
51     Uint8   srv_id;
52     string  grp;
53 
SSrvGroupInfoSSrvGroupInfo54     SSrvGroupInfo(Uint8 srv, const string& group)
55         : srv_id(srv), grp(group)
56     {}
57 };
58 
59 typedef vector<SSrvGroupInfo>       TSrvGroupsList;
60 typedef map<Uint2, TSrvGroupsList>  TSrvGroupsMap;
61 
62 typedef map<Uint2, TServersList>    TSlot2SrvMap;
63 typedef map<Uint8, vector<Uint2> >  TSrv2SlotMap;
64 
65 struct SSrvMirrorInfo
66 {
67     TNCPeerList   s_Peers;
68     TSlot2SrvMap  s_RawSlot2Servers;
69     TSrvGroupsMap s_Slot2Servers;
70     TSrv2SlotMap  s_CommonSlots;
71     TSrv2SlotMap  s_Servers2Slots;
72 // ID is defined by IP and port, so this map might be incorrect
73 // but we also check the current slot number
74 // while it is possible to handle same slot on different instances of NC
75 // on the same machine (same IP), it does not make sense and should be avoided
76     map<Uint4, Uint8> s_IpToId;
77 // server credentials
78     map<Uint8, Uint8> s_SrvTrust;
79 };
80 static SSrvMirrorInfo* s_MirrorConf = NULL;
81 #if 0
82 static CMiniMutex s_MirrorConf;
83 static TNCPeerList s_Peers;
84 static TSlot2SrvMap s_RawSlot2Servers;
85 static TSrvGroupsMap s_Slot2Servers;
86 static TSrv2SlotMap s_CommonSlots;
87 #endif
88 static Uint2    s_MaxSlotNumber = 0;
89 
90 static vector<Uint2> s_SelfSlots;
91 static Uint2    s_CntSlotBuckets = 0;
92 static Uint2    s_CntTimeBuckets = 0;
93 static Uint4    s_SlotRndShare  = numeric_limits<Uint4>::max();
94 static Uint4    s_TimeRndShare  = numeric_limits<Uint4>::max();
95 static Uint8    s_SelfID        = 0;
96 static Uint8    s_SelfTrust     = 0;
97 static Uint4    s_SelfIP        = 0;
98 static string   s_SelfGroup;
99 static string   s_SelfName;
100 static CMiniMutex s_KeyRndLock;
101 static CRandom  s_KeyRnd(CRandom::TValue(time(NULL)));
102 static string   s_SelfHostIP;
103 static CAtomicCounter s_BlobId;
104 static Uint4    s_SelfAlias = 0;
105 static string   s_MirroringSizeFile;
106 static string   s_PeriodicLogFile;
107 static string   s_CopyDelayLogFile;
108 static FILE*    s_CopyDelayLog = NULL;
109 static Uint1    s_CntActiveSyncs = 4;
110 static Uint1    s_MaxSyncsOneServer = 2;
111 static Uint4    s_TaskPrioritySync = 10;
112 static Uint2    s_MaxPeerTotalConns = 100;
113 static Uint2    s_MaxPeerBGConns = 50;
114 static Uint2    s_CntErrorsToThrottle = 10;
115 static Uint2    s_CntThrottlesToIpchange = 10;
116 static Uint8    s_PeerThrottlePeriod = 10 * kUSecsPerSecond;
117 static Uint1    s_PeerTimeout = 2;
118 static Uint1    s_BlobListTimeout = 10;
119 static Uint8    s_SmallBlobBoundary = 65535;
120 static Uint2    s_MaxMirrorQueueSize = 10000;
121 static string   s_SyncLogFileName;
122 static Uint4    s_MaxSlotLogEvents = 0;
123 static Uint4    s_CleanLogReserve = 0;
124 static Uint4    s_MaxCleanLogBatch = 0;
125 static Uint8    s_MinForcedCleanPeriod = 0;
126 static Uint4    s_CleanAttemptInterval = 0;
127 static Uint8    s_PeriodicSyncInterval = 0;
128 //static Uint8    s_PeriodicSyncHeadTime;
129 //static Uint8    s_PeriodicSyncTailTime;
130 static Uint8    s_PeriodicSyncTimeout = 0;
131 static Uint8    s_FailedSyncRetryDelay = 0;
132 static Uint8    s_NetworkErrorTimeout = 0;
133 static Uint8    s_MaxBlobSizeSync = 0;
134 static bool     s_WarnBlobSizeSync = true;
135 static bool     s_BlobUpdateHotline = true;
136 static bool     s_SlotByRawkey = false;
137 
138 static const char*  kNCReg_NCPoolSection       = "mirror";
139 static string       kNCReg_NCServerPrefix      = "server_";
140 static string       kNCReg_NCServerSlotsPrefix = "srv_slots_";
141 static string       kNCReg_NCServerTrustPrefix = "srv_trust_";
142 
143 
144 bool
Initialize(Uint2 control_port)145 CNCDistributionConf::Initialize(Uint2 control_port)
146 {
147     string log_pfx("Bad configuration: ");
148     const CNcbiRegistry& reg = CTaskServer::GetConfRegistry();
149 
150     Uint4 self_host = CTaskServer::GetIPByHost(CTaskServer::GetHostName());
151     s_SelfHostIP = CTaskServer::IPToString(self_host);
152     s_SelfIP = self_host;
153     s_SelfID = (Uint8(self_host) << 32) + control_port;
154     s_SelfAlias = CNCDistributionConf::CreateHostAlias(self_host, control_port);
155     s_BlobId.Set(0);
156 
157     string err_message;
158     if (!InitMirrorConfig(reg, err_message)) {
159         SRV_LOG(Critical, log_pfx << err_message);
160         return false;
161     }
162 
163     if (s_MaxSlotNumber <= 1) {
164         s_MaxSlotNumber = 1;
165         s_SlotRndShare = numeric_limits<Uint4>::max();
166     }
167     else {
168         s_SlotRndShare = numeric_limits<Uint4>::max() / s_MaxSlotNumber + 1;
169     }
170 
171     s_MirroringSizeFile = reg.Get(kNCReg_NCPoolSection, "mirroring_log_file");
172     s_PeriodicLogFile   = reg.Get(kNCReg_NCPoolSection, "periodic_log_file");
173 
174     s_CopyDelayLogFile = reg.Get(kNCReg_NCPoolSection, "copy_delay_log_file");
175     if (!s_CopyDelayLogFile.empty()) {
176         s_CopyDelayLog = fopen(s_CopyDelayLogFile.c_str(), "a");
177     }
178 
179     try {
180         s_CntSlotBuckets = reg.GetInt(kNCReg_NCPoolSection, "cnt_slot_buckets", 10);
181         if (numeric_limits<Uint2>::max() / s_CntSlotBuckets < s_MaxSlotNumber) {
182             SRV_LOG(Critical, log_pfx << "too many buckets per slot ("
183                               << s_CntSlotBuckets << ") with given number of slots ("
184                               << s_MaxSlotNumber << ").");
185             return false;
186         }
187         s_CntTimeBuckets = s_CntSlotBuckets * s_MaxSlotNumber;
188         s_TimeRndShare = s_SlotRndShare / s_CntSlotBuckets + 1;
189         s_CntActiveSyncs = reg.GetInt(kNCReg_NCPoolSection, "max_active_syncs", 4);
190         s_MaxSyncsOneServer = reg.GetInt(kNCReg_NCPoolSection, "max_syncs_one_server", 2);
191         s_TaskPrioritySync = reg.GetInt(kNCReg_NCPoolSection, "task_priority_sync", 10);
192         s_MaxPeerTotalConns = reg.GetInt(kNCReg_NCPoolSection, "max_peer_connections", 100);
193         s_MaxPeerBGConns = reg.GetInt(kNCReg_NCPoolSection, "max_peer_bg_connections", 50);
194         s_CntErrorsToThrottle = reg.GetInt(kNCReg_NCPoolSection, "peer_errors_for_throttle", 10);
195         s_CntThrottlesToIpchange = reg.GetInt(kNCReg_NCPoolSection, "peer_throttles_for_ip_change", 10);
196         s_PeerThrottlePeriod = reg.GetInt(kNCReg_NCPoolSection, "peer_throttle_period", 10);
197         s_PeerThrottlePeriod *= kUSecsPerSecond;
198         s_PeerTimeout = reg.GetInt(kNCReg_NCPoolSection, "peer_communication_timeout", 2);
199         s_BlobListTimeout = reg.GetInt(kNCReg_NCPoolSection, "peer_blob_list_timeout", 10);
200         s_SmallBlobBoundary = reg.GetInt(kNCReg_NCPoolSection, "small_blob_max_size", 100);
201         s_SmallBlobBoundary *= 1000;
202         s_MaxMirrorQueueSize = reg.GetInt(kNCReg_NCPoolSection, "max_instant_queue_size", 10000);
203 
204         s_SyncLogFileName = reg.GetString(kNCReg_NCPoolSection, "sync_log_file", "./cache/sync_events.log");
205         s_MaxSlotLogEvents = reg.GetInt(kNCReg_NCPoolSection, "max_slot_log_records", 100000);
206         if (s_MaxSlotLogEvents < 10)
207             s_MaxSlotLogEvents = 10;
208         s_CleanLogReserve = reg.GetInt(kNCReg_NCPoolSection, "clean_slot_log_reserve", 1000);
209         if (s_CleanLogReserve >= s_MaxSlotLogEvents)
210             s_CleanLogReserve = s_MaxSlotLogEvents - 1;
211         s_MaxCleanLogBatch = reg.GetInt(kNCReg_NCPoolSection, "max_clean_log_batch", 10000);
212         s_MinForcedCleanPeriod = reg.GetInt(kNCReg_NCPoolSection, "min_forced_clean_log_period", 10);
213         s_MinForcedCleanPeriod *= kUSecsPerSecond;
214         s_CleanAttemptInterval = reg.GetInt(kNCReg_NCPoolSection, "clean_log_attempt_interval", 1);
215         s_PeriodicSyncInterval = reg.GetInt(kNCReg_NCPoolSection, "deferred_sync_interval", 10);
216         s_PeriodicSyncInterval *= kUSecsPerSecond;
217         s_PeriodicSyncTimeout = reg.GetInt(kNCReg_NCPoolSection, "deferred_sync_timeout", 10);
218         s_PeriodicSyncTimeout *= kUSecsPerSecond;
219         s_FailedSyncRetryDelay = reg.GetInt(kNCReg_NCPoolSection, "failed_sync_retry_delay", 1);
220         s_FailedSyncRetryDelay *= kUSecsPerSecond;
221         s_NetworkErrorTimeout = reg.GetInt(kNCReg_NCPoolSection, "network_error_timeout", 300);
222         s_NetworkErrorTimeout *= kUSecsPerSecond;
223         s_MaxBlobSizeSync = NStr::StringToUInt8_DataSize(reg.GetString(
224                            kNCReg_NCPoolSection, "max_blob_size_sync", "1 GB"));
225         s_WarnBlobSizeSync  =  reg.GetBool( kNCReg_NCPoolSection, "warn_blob_size_sync", true);
226         if (s_MaxBlobSizeSync == 0) {
227             s_WarnBlobSizeSync = false;
228         }
229         s_BlobUpdateHotline =  reg.GetBool( kNCReg_NCPoolSection, "blob_update_hotline", true);
230         s_SlotByRawkey = reg.GetBool( kNCReg_NCPoolSection, "slot_calculation_by_key_only", false);
231 
232         if (s_WarnBlobSizeSync && s_SmallBlobBoundary > s_MaxBlobSizeSync) {
233             SRV_LOG(Critical, log_pfx << "small_blob_max_size ("
234                               << s_SmallBlobBoundary << ") is greater than max_blob_size_sync ("
235                               << s_MaxBlobSizeSync << ").");
236             return false;
237         }
238     }
239     catch (CStringException& ex) {
240         SRV_LOG(Critical, log_pfx  << ex);
241         return false;
242     }
243     return true;
244 }
245 
246 bool
InitMirrorConfig(const CNcbiRegistry & reg,string & err_message)247 CNCDistributionConf::InitMirrorConfig(const CNcbiRegistry& reg, string& err_message)
248 {
249     SSrvMirrorInfo* mirrorCfg = new SSrvMirrorInfo;
250     SSrvMirrorInfo* prevMirrorCfg = ACCESS_ONCE(s_MirrorConf);
251     bool isReconf = prevMirrorCfg != NULL;
252     vector<Uint2> self_slots;
253     mirrorCfg->s_IpToId[s_SelfIP] = s_SelfID;
254 
255     string reg_value;
256     bool found_self = false;
257     for (int srv_idx = 0; ; ++srv_idx) {
258         string value_name = kNCReg_NCServerPrefix + NStr::NumericToString(srv_idx);
259         reg_value = reg.Get(kNCReg_NCPoolSection, value_name.c_str());
260         if (reg_value.empty())
261             break;
262 
263         string srv_name = reg_value;
264         list<CTempString> srv_fields;
265         ncbi_NStr_Split(reg_value, ":", srv_fields);
266         if (srv_fields.size() != 3) {
267             err_message = srv_name + ": Invalid peer server specification";
268             goto do_error;
269         }
270         list<CTempString>::const_iterator it_fields = srv_fields.begin();
271         string grp_name = *it_fields;
272         ++it_fields;
273         string host_str = *it_fields;
274         Uint4 host = CTaskServer::GetIPByHost(host_str);
275         ++it_fields;
276         string port_str = *it_fields;
277         Uint2 port = NStr::StringToUInt(port_str, NStr::fConvErr_NoThrow);
278         if (host == 0  ||  port == 0) {
279             err_message = srv_name + ": Host not found";
280             goto do_error;
281         }
282 
283         Uint8 trustlevel = 0;
284         value_name = kNCReg_NCServerTrustPrefix + NStr::NumericToString(srv_idx);
285         reg_value = reg.Get(kNCReg_NCPoolSection, value_name.c_str());
286         if (!reg_value.empty()) {
287             trustlevel = NStr::StringToUInt8(reg_value, NStr::fConvErr_NoThrow);
288         }
289 
290         Uint8 srv_id = (Uint8(host) << 32) + port;
291         string peer_str = host_str + ":" + port_str;
292         if (srv_id == s_SelfID) {
293             if (found_self) {
294                 err_message = srv_name + ": Host described twice";
295                 goto do_error;
296             }
297             found_self = true;
298             s_SelfTrust = trustlevel;
299             if (isReconf) {
300                 if (s_SelfGroup != grp_name || s_SelfName != peer_str) {
301                     err_message = srv_name + ": Changes in self description prohibited (group or name)";
302                     goto do_error;
303                 }
304             } else {
305                 s_SelfGroup = grp_name;
306                 s_SelfName = peer_str;
307             }
308         }
309         else {
310             if (mirrorCfg->s_Peers.find(srv_id) != mirrorCfg->s_Peers.end()) {
311                 err_message = srv_name +  + ": Described twice";
312                 goto do_error;
313             }
314             mirrorCfg->s_Peers[srv_id] = peer_str;
315             mirrorCfg->s_SrvTrust[srv_id] = trustlevel;
316         }
317 
318         // There must be corresponding description of slots
319         value_name = kNCReg_NCServerSlotsPrefix + NStr::NumericToString(srv_idx);
320         reg_value = reg.Get(kNCReg_NCPoolSection, value_name.c_str());
321         if (reg_value.empty()) {
322             err_message = srv_name + ": No slots for server";
323             goto do_error;
324         }
325 
326         list<string> values;
327         ncbi_NStr_Split(reg_value, ",", values);
328         ITERATE(list<string>, it, values) {
329             Uint2 slot = NStr::StringToUInt(*it,
330                 NStr::fConvErr_NoThrow | NStr::fAllowLeadingSpaces | NStr::fAllowTrailingSpaces);
331             if (slot == 0) {
332                 err_message = srv_name + ": Bad slot number: " + string(*it);
333                 goto do_error;
334             }
335             TServersList& srvs = mirrorCfg->s_RawSlot2Servers[slot];
336             if (find(srvs.begin(), srvs.end(), srv_id) != srvs.end()) {
337                 err_message = srv_name + ": Slot listed twice: " + string(*it);
338                 goto do_error;
339             }
340             if (srv_id == s_SelfID) {
341                 vector<Uint2>& slots = isReconf ? self_slots : s_SelfSlots;
342                 if (find(slots.begin(), slots.end(), slot) != slots.end()) {
343                     err_message = srv_name + ": Slot listed twice: " + string(*it);
344                     goto do_error;
345                 }
346                 slots.push_back(slot);
347             } else {
348                 srvs.push_back(srv_id);
349                 mirrorCfg->s_Slot2Servers[slot].push_back(SSrvGroupInfo(srv_id, grp_name));
350                 mirrorCfg->s_Servers2Slots[srv_id].push_back(slot);
351             }
352             if (isReconf) {
353                 if (slot > s_MaxSlotNumber) {
354                     err_message = srv_name + ": Slot numbers cannot exceed " + NStr::NumericToString(s_MaxSlotNumber);
355                     goto do_error;
356                 }
357             } else {
358                 s_MaxSlotNumber = max(slot, s_MaxSlotNumber);
359             }
360         }
361     }
362     if (!found_self) {
363         if (mirrorCfg->s_Peers.size() != 0) {
364             err_message = CTaskServer::GetHostName() + ": Self description not found";
365             goto do_error;
366         }
367         if (!isReconf) {
368             s_SelfSlots.push_back(1);
369             s_SelfGroup = "grp1";
370         }
371     }
372     if (isReconf && !self_slots.empty()) {
373         if (!std::equal(s_SelfSlots.begin(), s_SelfSlots.end(), self_slots.begin())) {
374             err_message = s_SelfName + ": Changes in self description prohibited (slots)";
375             goto do_error;
376         }
377     }
378 
379     ITERATE(TNCPeerList, it_peer, mirrorCfg->s_Peers)  {
380         Uint8 srv_id = it_peer->first;
381         vector<Uint2>& common_slots = mirrorCfg->s_CommonSlots[srv_id];
382         ITERATE(TSlot2SrvMap, it_slot, mirrorCfg->s_RawSlot2Servers) {
383             Uint2 slot = it_slot->first;
384             if (find(s_SelfSlots.begin(), s_SelfSlots.end(), slot) == s_SelfSlots.end())
385                 continue;
386             const TServersList& srvs = it_slot->second;
387             if (find(srvs.begin(), srvs.end(), srv_id) != srvs.end()) {
388                 common_slots.push_back(it_slot->first);
389                 Uint4 ip = (srv_id >> 32);
390                 if (ip != s_SelfIP) {
391                     mirrorCfg->s_IpToId[ Uint4(srv_id >> 32)] = srv_id;
392                 }
393             }
394         }
395     }
396 
397     if (AtomicCAS(s_MirrorConf, prevMirrorCfg, mirrorCfg)) {
398 // pre-create all peers
399         const TNCPeerList& peers = CNCDistributionConf::GetPeers();
400         ITERATE(TNCPeerList, p, peers) {
401             CNCPeerControl* peer = CNCPeerControl::Peer(p->first);
402             peer->SetHostProtocol(0);
403             peer->SetTrustLevel(s_MirrorConf->s_SrvTrust[p->first]);
404         }
405         return true;
406     }
407 do_error:
408     delete mirrorCfg;
409     return false;
410 }
411 
412 bool
ReConfig(const CNcbiRegistry & new_reg,string & err_message)413 CNCDistributionConf::ReConfig(const CNcbiRegistry& new_reg, string& err_message)
414 // we only add or remove peer servers, nothing else
415 {
416     if (!InitMirrorConfig(new_reg, err_message)) {
417         return false;
418     }
419 // modify old registry to remember the changes
420     CNcbiRegistry& old_reg = CTaskServer::SetConfRegistry();
421     // remove old peers
422     string value_name, value;
423     size_t srv_idx;
424     for (srv_idx = 0; ; ++srv_idx) {
425         value_name = kNCReg_NCServerPrefix + NStr::NumericToString(srv_idx);
426         value = old_reg.Get(kNCReg_NCPoolSection, value_name);
427         if (value.empty()) {
428             break;
429         }
430         old_reg.Set(kNCReg_NCPoolSection, value_name, kEmptyStr, CNcbiRegistry::fPersistent);
431         value_name = kNCReg_NCServerSlotsPrefix + NStr::NumericToString(srv_idx);
432         old_reg.Set(kNCReg_NCPoolSection, value_name, kEmptyStr, CNcbiRegistry::fPersistent);
433     }
434     // add new ones
435     for (srv_idx = 0; ; ++srv_idx) {
436         value_name = kNCReg_NCServerPrefix + NStr::NumericToString(srv_idx);
437         value = new_reg.Get(kNCReg_NCPoolSection, value_name);
438         if (value.empty())
439             break;
440         old_reg.Set(kNCReg_NCPoolSection, value_name, value, CNcbiRegistry::fPersistent);
441         value_name = kNCReg_NCServerSlotsPrefix + NStr::NumericToString(srv_idx);
442         value = new_reg.Get(kNCReg_NCPoolSection, value_name);
443         old_reg.Set(kNCReg_NCPoolSection, value_name, value, CNcbiRegistry::fPersistent);
444     }
445 
446 #if 0
447 // compare (I am not sure though that slot lists should be identical)
448     map<string, set<Uint2> >::const_iterator pn, po;
449     for(pn = new_peers.begin(); pn != new_peers.end(); ++pn) {
450         po = old_peers.find(pn->first);
451         if (po != old_peers.end()) {
452             if (pn->second.size() != po->second.size() ||
453                 !std::equal(pn->second.begin(), pn->second.end(), po->second.begin())) {
454                 err_message = pn->first + ": Slot lists differ in old and new configurations";
455                 return false;
456             }
457         }
458     }
459     for(po = old_peers.begin(); po != old_peers.end(); ++po) {
460         pn = new_peers.find(po->first);
461         if (pn != new_peers.end()) {
462             if (po->second.size() != pn->second.size() ||
463                 !std::equal(po->second.begin(), po->second.end(), pn->second.begin())) {
464                 err_message = po->first + ": Slot lists differ in old and new configurations";
465                 return false;
466             }
467         }
468     }
469 
470 // remove common peers
471     // here I assume common peers have identical slot lists
472     for(pn = new_peers.begin(); pn != new_peers.end(); ++pn) {
473         po = old_peers.find(pn->first);
474         if (po != old_peers.end()) {
475             old_peers.erase(po);
476         }
477     }
478     for(po = old_peers.begin(); po != old_peers.end(); ++po) {
479         pn = new_peers.find(po->first);
480         if (pn != new_peers.end()) {
481             new_peers.erase(pn);
482         }
483     }
484 #endif
485     return true;
486 }
487 
488 void
Finalize(void)489 CNCDistributionConf::Finalize(void)
490 {
491     if (s_CopyDelayLog)
492         fclose(s_CopyDelayLog);
493 }
494 
WriteSetup(CSrvSocketTask & task)495 void CNCDistributionConf::WriteSetup(CSrvSocketTask& task)
496 {
497     string is("\": "),iss("\": \""), eol(",\n\""), str("_str"), eos("\"");
498 
499     //self
500     task.WriteText(eol).WriteText(kNCReg_NCServerPrefix).WriteNumber(0).WriteText(iss);
501     task.WriteText(s_SelfName).WriteText("\"");
502     task.WriteText(eol).WriteText(kNCReg_NCServerPrefix).WriteText("id_").WriteNumber(0).WriteText(is);
503     task.WriteNumber(s_SelfID);
504     task.WriteText(eol).WriteText(kNCReg_NCServerTrustPrefix).WriteNumber(0).WriteText(is).WriteNumber(s_SelfTrust);
505     task.WriteText(eol).WriteText(kNCReg_NCServerSlotsPrefix).WriteNumber(0).WriteText(is).WriteText("[");
506     ITERATE( vector<Uint2>, s, s_SelfSlots) {
507         if (s != s_SelfSlots.begin()) {
508             task.WriteText(",");
509         }
510         task.WriteNumber(*s);
511     }
512     task.WriteText("]");
513 
514     // peers
515     int idx=1;
516     SSrvMirrorInfo* mirrorConf = ACCESS_ONCE(s_MirrorConf);
517     ITERATE( TNCPeerList, p, mirrorConf->s_Peers) {
518         task.WriteText(eol).WriteText(kNCReg_NCServerPrefix).WriteNumber(idx).WriteText(iss);
519         task.WriteText(p->second).WriteText("\"");
520         task.WriteText(eol).WriteText(kNCReg_NCServerPrefix).WriteText("id_").WriteNumber(idx).WriteText(is);
521         task.WriteNumber(p->first);
522         task.WriteText(eol).WriteText(kNCReg_NCServerTrustPrefix).WriteNumber(idx).WriteText(is);
523         task.WriteNumber(CNCPeerControl::Peer(p->first)->GetRawTrustLevel());
524 
525         Uint8 srv_id = p->first;
526         vector<Uint2> slots;
527         // find slots servers by this peer
528         ITERATE(TSlot2SrvMap, it_slot, mirrorConf->s_RawSlot2Servers) {
529             Uint2 slot = it_slot->first;
530             const TServersList& srvs = it_slot->second;
531             if (find(srvs.begin(), srvs.end(), srv_id) != srvs.end()) {
532                 slots.push_back(slot);
533             }
534         }
535         task.WriteText(eol).WriteText(kNCReg_NCServerSlotsPrefix).WriteNumber(idx).WriteText(is).WriteText("[");
536         ITERATE( vector<Uint2>, s, slots) {
537             if (s != slots.begin()) {
538                 task.WriteText(",");
539             }
540             task.WriteNumber(*s);
541         }
542         task.WriteText("]");
543         ++idx;
544     }
545 
546     task.WriteText(eol).WriteText("mirroring_log_file" ).WriteText(iss).WriteText(s_MirroringSizeFile).WriteText(eos);
547     task.WriteText(eol).WriteText("periodic_log_file"  ).WriteText(iss).WriteText(s_PeriodicLogFile  ).WriteText(eos);
548     task.WriteText(eol).WriteText("copy_delay_log_file").WriteText(iss).WriteText(s_CopyDelayLogFile ).WriteText(eos);
549     task.WriteText(eol).WriteText("sync_log_file"      ).WriteText(iss).WriteText(s_SyncLogFileName  ).WriteText(eos);
550 
551     task.WriteText(eol).WriteText("cnt_slot_buckets"           ).WriteText(is).WriteNumber(s_CntSlotBuckets);
552     task.WriteText(eol).WriteText("max_active_syncs"           ).WriteText(is).WriteNumber(s_CntActiveSyncs);
553     task.WriteText(eol).WriteText("max_syncs_one_server"       ).WriteText(is).WriteNumber(s_MaxSyncsOneServer);
554     task.WriteText(eol).WriteText("task_priority_sync"         ).WriteText(is).WriteNumber(s_TaskPrioritySync);
555     task.WriteText(eol).WriteText("max_peer_connections"       ).WriteText(is).WriteNumber(s_MaxPeerTotalConns);
556     task.WriteText(eol).WriteText("max_peer_bg_connections"    ).WriteText(is).WriteNumber(s_MaxPeerBGConns);
557     task.WriteText(eol).WriteText("peer_errors_for_throttle"   ).WriteText(is).WriteNumber(s_CntErrorsToThrottle);
558     task.WriteText(eol).WriteText("peer_throttles_for_ip_change").WriteText(is).WriteNumber(s_CntThrottlesToIpchange);
559     task.WriteText(eol).WriteText("peer_throttle_period"       ).WriteText(is).WriteNumber(s_PeerThrottlePeriod/kUSecsPerSecond);
560     task.WriteText(eol).WriteText("peer_communication_timeout" ).WriteText(is).WriteNumber(s_PeerTimeout);
561     task.WriteText(eol).WriteText("peer_blob_list_timeout"     ).WriteText(is).WriteNumber(s_BlobListTimeout);
562     task.WriteText(eol).WriteText("small_blob_max_size"        ).WriteText(is).WriteNumber(s_SmallBlobBoundary/1000);
563     task.WriteText(eol).WriteText("max_instant_queue_size"     ).WriteText(is).WriteNumber(s_MaxMirrorQueueSize);
564     task.WriteText(eol).WriteText("max_slot_log_records"       ).WriteText(is).WriteNumber(s_MaxSlotLogEvents);
565     task.WriteText(eol).WriteText("clean_slot_log_reserve"     ).WriteText(is).WriteNumber(s_CleanLogReserve);
566     task.WriteText(eol).WriteText("max_clean_log_batch"        ).WriteText(is).WriteNumber(s_MaxCleanLogBatch);
567     task.WriteText(eol).WriteText("min_forced_clean_log_period").WriteText(is).WriteNumber(s_MinForcedCleanPeriod/kUSecsPerSecond);
568     task.WriteText(eol).WriteText("clean_log_attempt_interval" ).WriteText(is).WriteNumber(s_CleanAttemptInterval);
569     task.WriteText(eol).WriteText("deferred_sync_interval"     ).WriteText(is).WriteNumber(s_PeriodicSyncInterval/kUSecsPerSecond);
570     task.WriteText(eol).WriteText("deferred_sync_timeout"      ).WriteText(is).WriteNumber(s_PeriodicSyncTimeout/ kUSecsPerSecond);
571     task.WriteText(eol).WriteText("failed_sync_retry_delay"    ).WriteText(is).WriteNumber(s_FailedSyncRetryDelay/kUSecsPerSecond);
572     task.WriteText(eol).WriteText("network_error_timeout"      ).WriteText(is).WriteNumber(s_NetworkErrorTimeout/ kUSecsPerSecond);
573     task.WriteText(eol).WriteText("max_blob_size_sync").WriteText(str).WriteText(iss)
574                                                    .WriteText(NStr::UInt8ToString_DataSize( s_MaxBlobSizeSync)).WriteText(eos);
575     task.WriteText(eol).WriteText("max_blob_size_sync").WriteText(is ).WriteNumber( s_MaxBlobSizeSync);
576     task.WriteText(eol).WriteText("warn_blob_size_sync").WriteText(is ).WriteBool(s_WarnBlobSizeSync);
577     task.WriteText(eol).WriteText("blob_update_hotline").WriteText(is ).WriteBool(s_BlobUpdateHotline);
578     task.WriteText(eol).WriteText("slot_calculation_by_rawkey").WriteText(is ).WriteBool(s_SlotByRawkey);
579 }
580 
WriteEnvInfo(CSrvSocketTask & task)581 void CNCDistributionConf::WriteEnvInfo(CSrvSocketTask& task)
582 {
583     string is("\": "),iss("\": \""), eol(",\n\""), str("_str"), eos("\"");
584     task.WriteText(eol).WriteText("mirroring_log_file" ).WriteText(iss).WriteText(s_MirroringSizeFile).WriteText(eos);
585     task.WriteText(eol).WriteText("periodic_log_file"  ).WriteText(iss).WriteText(s_PeriodicLogFile  ).WriteText(eos);
586     task.WriteText(eol).WriteText("copy_delay_log_file").WriteText(iss).WriteText(s_CopyDelayLogFile ).WriteText(eos);
587     task.WriteText(eol).WriteText("sync_log_file"      ).WriteText(iss).WriteText(s_SyncLogFileName  ).WriteText(eos);
588 }
589 
590 size_t
CountServersForSlot(Uint2 slot)591 CNCDistributionConf::CountServersForSlot(Uint2 slot)
592 {
593     return s_MirrorConf->s_Slot2Servers[slot].size();
594 }
595 
596 void
GetServersForSlot(Uint2 slot,TServersList & lst)597 CNCDistributionConf::GetServersForSlot(Uint2 slot, TServersList& lst)
598 {
599     TSrvGroupsList srvs = s_MirrorConf->s_Slot2Servers[slot];
600     random_device rd;
601     mt19937 mt(rd());
602     shuffle(srvs.begin(), srvs.end(), mt);
603     lst.clear();
604     for (size_t i = 0; i < srvs.size(); ++i) {
605         if (srvs[i].grp == s_SelfGroup)
606             lst.push_back(srvs[i].srv_id);
607     }
608     for (size_t i = 0; i < srvs.size(); ++i) {
609         if (srvs[i].grp != s_SelfGroup)
610             lst.push_back(srvs[i].srv_id);
611     }
612 }
613 
614 const TServersList&
GetRawServersForSlot(Uint2 slot)615 CNCDistributionConf::GetRawServersForSlot(Uint2 slot)
616 {
617     return s_MirrorConf->s_RawSlot2Servers[slot];
618 }
619 const vector<Uint2>&
GetSlotsForServer(Uint8 srv_id)620 CNCDistributionConf::GetSlotsForServer(Uint8 srv_id)
621 {
622     return s_MirrorConf->s_Servers2Slots[srv_id];
623 }
GetMaxSlotNumber(void)624 Uint2 CNCDistributionConf::GetMaxSlotNumber(void) {
625     return s_MaxSlotNumber;
626 }
AddServerSlots(set<Uint2> & slots,Uint8 srv_id)627 void CNCDistributionConf::AddServerSlots(set<Uint2>& slots, Uint8 srv_id)
628 {
629     const vector<Uint2>& srv_slots(
630         (srv_id == s_SelfID || srv_id == 0) ? s_SelfSlots : s_MirrorConf->s_Servers2Slots[srv_id]);
631     for( const Uint2 s : srv_slots) {
632         slots.insert(s);
633     }
634 }
635 
636 const vector<Uint2>&
GetCommonSlots(Uint8 server)637 CNCDistributionConf::GetCommonSlots(Uint8 server)
638 {
639     return s_MirrorConf->s_CommonSlots[server];
640 }
641 
642 bool
HasCommonSlots(Uint8 server)643 CNCDistributionConf::HasCommonSlots(Uint8 server)
644 {
645     return !s_MirrorConf->s_CommonSlots[server].empty();
646 }
647 
648 Uint8
GetSelfID(void)649 CNCDistributionConf::GetSelfID(void)
650 {
651     return s_SelfID;
652 }
653 
GetSelfTrustLevel(void)654 Uint8 CNCDistributionConf::GetSelfTrustLevel(void)
655 {
656     return s_SelfTrust & 0xF;
657 }
658 
659 const TNCPeerList&
GetPeers(void)660 CNCDistributionConf::GetPeers(void)
661 {
662     return s_MirrorConf->s_Peers;
663 }
664 
665 bool
HasPeers(void)666 CNCDistributionConf::HasPeers(void)
667 {
668     return !s_MirrorConf->s_Peers.empty();
669 }
670 
671 string
GetPeerNameOrEmpty(Uint8 srv_id)672 CNCDistributionConf::GetPeerNameOrEmpty(Uint8 srv_id)
673 {
674     string name;
675     if (srv_id == s_SelfID) {
676         name = s_SelfName;
677     }
678     else {
679         const TNCPeerList& peers = CNCDistributionConf::GetPeers();
680         if (peers.find(srv_id) != peers.end()) {
681             name = peers.find(srv_id)->second;
682         }
683     }
684     return name;
685 }
686 
687 string
GetPeerName(Uint8 srv_id)688 CNCDistributionConf::GetPeerName(Uint8 srv_id)
689 {
690     string name(GetPeerNameOrEmpty(srv_id));
691     if (name.empty()) {
692         name = CNCPeerControl::GetPeerNameOrEmpty(srv_id);
693         if (name.empty()) {
694             name = "unknown_server";
695         }
696     }
697     return name;
698 }
699 
700 string
GetFullPeerName(Uint8 srv_id)701 CNCDistributionConf::GetFullPeerName(Uint8 srv_id)
702 {
703     string name( GetPeerName(srv_id));
704     name += " (";
705     name += NStr::UInt8ToString(srv_id);
706     name += ") ";
707     return name;
708 }
709 
710 
711 void
GetPeerServers(TServersList & lst)712 CNCDistributionConf::GetPeerServers(TServersList& lst)
713 {
714     lst.clear();
715     const TNCPeerList& peers = CNCDistributionConf::GetPeers();
716     ITERATE(TNCPeerList, it_peer, peers)  {
717         if (GetSelfID() != it_peer->first) {
718             lst.push_back(it_peer->first);
719         }
720     }
721 }
722 
723 void
GenerateBlobKey(Uint2 local_port,string & key,Uint2 & slot,Uint2 & time_bucket,unsigned int ver)724 CNCDistributionConf::GenerateBlobKey(Uint2 local_port,
725                                      string& key, Uint2& slot, Uint2& time_bucket,
726                                      unsigned int ver)
727 {
728     s_KeyRndLock.Lock();
729     Uint4 rnd_num = s_KeyRnd.GetRand();
730     s_KeyRndLock.Unlock();
731 
732     Uint2 cnt_pieces = Uint2(s_SelfSlots.size());
733     Uint4 piece_share = (CRandom::GetMax() + 1) / cnt_pieces + 1;
734     Uint2 index = rnd_num / piece_share;
735     rnd_num -= index * piece_share;
736     slot = s_SelfSlots[index];
737     Uint4 remain = rnd_num % s_SlotRndShare;
738     Uint4 key_rnd = (slot - 1) * s_SlotRndShare + remain;
739     time_bucket = min(
740         Uint2((slot - 1) * s_CntSlotBuckets + remain / s_TimeRndShare + 1),
741         CNCDistributionConf::GetCntTimeBuckets());
742     CNetCacheKey::GenerateBlobKey(&key,
743                                   static_cast<Uint4>(s_BlobId.Add(1)),
744                                   s_SelfHostIP, local_port, ver, key_rnd);
745 }
746 
747 bool
GetSlotByKey(const string & key,Uint2 & slot,Uint2 & time_bucket)748 CNCDistributionConf::GetSlotByKey(const string& key, Uint2& slot, Uint2& time_bucket)
749 {
750     if (key[0] == '\1')
751         // NetCache-generated key
752         return GetSlotByNetCacheKey(key, slot, time_bucket);
753     else {
754         // ICache key provided by client
755         GetSlotByICacheKey(CNCBlobKeyLight(key), slot, time_bucket);
756         return true;
757     }
758 }
759 
760 bool
GetSlotByNetCacheKey(const string & key,Uint2 & slot,Uint2 & time_bucket)761 CNCDistributionConf::GetSlotByNetCacheKey(const string& key,
762         Uint2& slot, Uint2& time_bucket)
763 {
764     size_t ind = 0;
765     unsigned key_rnd = 0;
766 #if 0
767 #define SKIP_UNDERSCORE(key, ind) \
768     ind = key.find('_', ind + 1); \
769     if (ind == string::npos) \
770         return false;
771 
772     SKIP_UNDERSCORE(key, ind);      // version
773     SKIP_UNDERSCORE(key, ind);      // id
774     SKIP_UNDERSCORE(key, ind);      // host
775     SKIP_UNDERSCORE(key, ind);      // port
776     SKIP_UNDERSCORE(key, ind);      // time
777     SKIP_UNDERSCORE(key, ind);      // random
778 #else
779     ind = key.rfind('_');
780     if (ind == string::npos) {
781         goto on_error;
782     }
783 #endif
784     ++ind;
785     key_rnd = NStr::StringToUInt(
786             CTempString(&key[ind], key.size() - ind),
787             NStr::fConvErr_NoThrow | NStr::fAllowTrailingSymbols);
788     if (key_rnd == 0 && errno != 0) {
789         goto on_error;
790     }
791     GetSlotByRnd(key_rnd, slot, time_bucket);
792     return true;
793 
794 on_error:
795     CNCBlobKeyLight kl(key);
796     CNetCacheKey tmp;
797     if (!CNetCacheKey::ParseBlobKey(kl.RawKey().data(), kl.RawKey().size(), &tmp)) {
798         SRV_LOG(Critical, "CNetCacheKey failed to parse blob key: " << key);
799         return false;
800     }
801     SRV_LOG(Error, "NetCache failed to parse blob key: " << key);
802     GetSlotByRnd(tmp.GetRandomPart(), slot, time_bucket);
803     return true;
804 }
805 
806 void
GetSlotByICacheKey(const CNCBlobKeyLight & key,Uint2 & slot,Uint2 & time_bucket)807 CNCDistributionConf::GetSlotByICacheKey(const CNCBlobKeyLight& key,
808         Uint2& slot, Uint2& time_bucket)
809 {
810     CChecksum crc32(CChecksum::eCRC32);
811 
812     if (s_SlotByRawkey) {
813         crc32.AddChars(key.RawKey().data(), key.RawKey().size());
814     } else {
815         crc32.AddChars(key.PackedKey().data(), key.PackedKey().size());
816     }
817 
818     GetSlotByRnd(crc32.GetChecksum(), slot, time_bucket);
819 }
820 
821 void
GetSlotByRnd(Uint4 key_rnd,Uint2 & slot,Uint2 & time_bucket)822 CNCDistributionConf::GetSlotByRnd(Uint4 key_rnd,
823         Uint2& slot, Uint2& time_bucket)
824 {
825     // Slot numbers are 1-based
826     slot = Uint2(key_rnd / s_SlotRndShare) + 1;
827     time_bucket = min(
828         Uint2((slot - 1) * s_CntSlotBuckets + key_rnd % s_SlotRndShare / s_TimeRndShare + 1),
829         CNCDistributionConf::GetCntTimeBuckets());
830 }
831 
GetSrvIdByIP(Uint4 ip)832 Uint8 CNCDistributionConf::GetSrvIdByIP(Uint4 ip)
833 {
834     SSrvMirrorInfo* mirrorConf = ACCESS_ONCE(s_MirrorConf);
835     map<Uint4, Uint8>::const_iterator i = mirrorConf->s_IpToId.find(ip);
836     if (i != mirrorConf->s_IpToId.end()) {
837         return i->second;
838     }
839     return 0;
840 }
841 
842 Uint8
GetMainSrvId(const CNCBlobKey & key)843 CNCDistributionConf::GetMainSrvId(const CNCBlobKey& key)
844 {
845     return key.IsICacheKey() ? 0 : GetSrvIdByIP( GetMainSrvIP(key));
846 }
847 
848 
849 bool
IsThisServerKey(const string & packed_key)850 CNCDistributionConf::IsThisServerKey(const string& packed_key)
851 {
852     CNCBlobKeyLight kl(packed_key);
853     CNCBlobKey blob_key;
854     blob_key.Assign(kl.Cache(), kl.RawKey(), kl.SubKey());
855     return CNCDistributionConf::IsThisServerKey(blob_key);
856 }
857 
858 bool
IsThisServerKey(const CNCBlobKey & key)859 CNCDistributionConf::IsThisServerKey(const CNCBlobKey& key)
860 {
861     if (key.IsICacheKey()) {
862         return true;
863     }
864     return (key.KeyVersion() == 3) ?
865         (s_SelfAlias  == key.GetHostPortCRC32()) :
866         (s_SelfHostIP == key.GetHost());
867 }
868 
869 Uint4
GetMainSrvIP(const CNCBlobKey & key)870 CNCDistributionConf::GetMainSrvIP(const CNCBlobKey& key)
871 {
872     try {
873         if (key.KeyVersion() == 3) {
874             Uint4 alias = key.GetHostPortCRC32();
875             return (alias == s_SelfAlias) ? s_SelfIP :
876                 CNCPeerControl::FindIPbyAlias(alias);
877         }
878         const string& host_str(key.GetHost());
879         if (s_SelfHostIP == host_str) {
880             return s_SelfIP;
881         }
882         Uint4 ip = CNCPeerControl::FindIPbyName(host_str);
883         if (ip != 0) {
884             return ip;
885         }
886         return CTaskServer::GetIPByHost(host_str);
887     }
888     catch (...) {
889         return 0;
890     }
891 }
892 
893 Uint4
CreateHostAlias(Uint4 ip,Uint4 port)894 CNCDistributionConf::CreateHostAlias(Uint4 ip, Uint4 port)
895 {
896     string ip_str(CTaskServer::IPToString(ip));
897     return CNetCacheKey::CalculateChecksum(ip_str, port);
898 }
899 
900 bool
IsServedLocally(Uint2 slot)901 CNCDistributionConf::IsServedLocally(Uint2 slot)
902 {
903     return find(s_SelfSlots.begin(), s_SelfSlots.end(), slot) != s_SelfSlots.end();
904 }
905 
906 Uint2
GetCntSlotBuckets(void)907 CNCDistributionConf::GetCntSlotBuckets(void)
908 {
909     return s_CntSlotBuckets;
910 }
911 
912 Uint2
GetCntTimeBuckets(void)913 CNCDistributionConf::GetCntTimeBuckets(void)
914 {
915     return s_CntTimeBuckets;
916 }
917 
918 const vector<Uint2>&
GetSelfSlots(void)919 CNCDistributionConf::GetSelfSlots(void)
920 {
921     return s_SelfSlots;
922 }
923 
924 const string&
GetMirroringSizeFile(void)925 CNCDistributionConf::GetMirroringSizeFile(void)
926 {
927     return s_MirroringSizeFile;
928 }
929 
930 const string&
GetPeriodicLogFile(void)931 CNCDistributionConf::GetPeriodicLogFile(void)
932 {
933     return s_PeriodicLogFile;
934 }
935 
936 Uint1
GetCntActiveSyncs(void)937 CNCDistributionConf::GetCntActiveSyncs(void)
938 {
939     return s_CntActiveSyncs;
940 }
941 
942 Uint1
GetMaxSyncsOneServer(void)943 CNCDistributionConf::GetMaxSyncsOneServer(void)
944 {
945     return s_MaxSyncsOneServer;
946 }
947 
948 Uint4
GetSyncPriority(void)949 CNCDistributionConf::GetSyncPriority(void)
950 {
951     return CNCServer::IsInitiallySynced()? s_TaskPrioritySync : GetDefaultTaskPriority();
952 }
953 
954 Uint2
GetMaxPeerTotalConns(void)955 CNCDistributionConf::GetMaxPeerTotalConns(void)
956 {
957     return s_MaxPeerTotalConns;
958 }
959 
960 Uint2
GetMaxPeerBGConns(void)961 CNCDistributionConf::GetMaxPeerBGConns(void)
962 {
963     return s_MaxPeerBGConns;
964 }
965 
966 Uint2
GetCntErrorsToThrottle(void)967 CNCDistributionConf::GetCntErrorsToThrottle(void)
968 {
969     return s_CntErrorsToThrottle;
970 }
971 
972 Uint2
GetCntThrottlesToIpchange(void)973 CNCDistributionConf::GetCntThrottlesToIpchange(void)
974 {
975     return s_CntThrottlesToIpchange;
976 }
977 
978 Uint8
GetPeerThrottlePeriod(void)979 CNCDistributionConf::GetPeerThrottlePeriod(void)
980 {
981     return s_PeerThrottlePeriod;
982 }
983 
984 Uint1
GetPeerTimeout(void)985 CNCDistributionConf::GetPeerTimeout(void)
986 {
987     return s_PeerTimeout;
988 }
989 
990 Uint1
GetBlobListTimeout(void)991 CNCDistributionConf::GetBlobListTimeout(void)
992 {
993     return s_BlobListTimeout;
994 }
995 
996 Uint8
GetSmallBlobBoundary(void)997 CNCDistributionConf::GetSmallBlobBoundary(void)
998 {
999     return s_SmallBlobBoundary;
1000 }
1001 
1002 Uint2
GetMaxMirrorQueueSize(void)1003 CNCDistributionConf::GetMaxMirrorQueueSize(void)
1004 {
1005     return s_MaxMirrorQueueSize;
1006 }
1007 
1008 const string&
GetSyncLogFileName(void)1009 CNCDistributionConf::GetSyncLogFileName(void)
1010 {
1011     return s_SyncLogFileName;
1012 }
1013 
1014 Uint4
GetMaxSlotLogEvents(void)1015 CNCDistributionConf::GetMaxSlotLogEvents(void)
1016 {
1017     return s_MaxSlotLogEvents;
1018 }
1019 
1020 Uint4
GetCleanLogReserve(void)1021 CNCDistributionConf::GetCleanLogReserve(void)
1022 {
1023     return s_CleanLogReserve;
1024 }
1025 
1026 Uint4
GetMaxCleanLogBatch(void)1027 CNCDistributionConf::GetMaxCleanLogBatch(void)
1028 {
1029     return s_MaxCleanLogBatch;
1030 }
1031 
1032 Uint8
GetMinForcedCleanPeriod(void)1033 CNCDistributionConf::GetMinForcedCleanPeriod(void)
1034 {
1035     return s_MinForcedCleanPeriod;
1036 }
1037 
1038 Uint4
GetCleanAttemptInterval(void)1039 CNCDistributionConf::GetCleanAttemptInterval(void)
1040 {
1041     return s_CleanAttemptInterval;
1042 }
1043 
1044 Uint8
GetPeriodicSyncInterval(void)1045 CNCDistributionConf::GetPeriodicSyncInterval(void)
1046 {
1047     return s_PeriodicSyncInterval;
1048 }
1049 
1050 Uint8
GetPeriodicSyncHeadTime(void)1051 CNCDistributionConf::GetPeriodicSyncHeadTime(void)
1052 {
1053     //return s_PeriodicSyncHeadTime;
1054     return 0;
1055 }
1056 
1057 Uint8
GetPeriodicSyncTailTime(void)1058 CNCDistributionConf::GetPeriodicSyncTailTime(void)
1059 {
1060     //return s_PeriodicSyncTailTime;
1061     return 0;
1062 }
1063 
1064 Uint8
GetPeriodicSyncTimeout(void)1065 CNCDistributionConf::GetPeriodicSyncTimeout(void)
1066 {
1067     return s_PeriodicSyncTimeout;
1068 }
1069 
1070 Uint8
GetFailedSyncRetryDelay(void)1071 CNCDistributionConf::GetFailedSyncRetryDelay(void)
1072 {
1073     return s_FailedSyncRetryDelay;
1074 }
1075 
1076 Uint8
GetNetworkErrorTimeout(void)1077 CNCDistributionConf::GetNetworkErrorTimeout(void)
1078 {
1079     return s_NetworkErrorTimeout;
1080 }
1081 Uint8
GetMaxBlobSizeSync(void)1082 CNCDistributionConf::GetMaxBlobSizeSync(void)
1083 {
1084     return s_MaxBlobSizeSync;
1085 }
1086 bool
GetWarnBlobSizeSync(void)1087  CNCDistributionConf::GetWarnBlobSizeSync(void)
1088  {
1089     return s_WarnBlobSizeSync;
1090  }
1091 bool
GetBlobUpdateHotline(void)1092  CNCDistributionConf::GetBlobUpdateHotline(void)
1093  {
1094     return s_BlobUpdateHotline;
1095  }
1096 
1097 void
PrintBlobCopyStat(Uint8 create_time,Uint8 create_server,Uint8 write_server)1098 CNCDistributionConf::PrintBlobCopyStat(Uint8 create_time, Uint8 create_server, Uint8 write_server)
1099 {
1100     if (s_CopyDelayLog) {
1101         Uint8 cur_time = CSrvTime::Current().AsUSec();
1102         fprintf(s_CopyDelayLog,
1103                 "%" NCBI_UINT8_FORMAT_SPEC ",%" NCBI_UINT8_FORMAT_SPEC
1104                 ",%" NCBI_UINT8_FORMAT_SPEC ",%" NCBI_UINT8_FORMAT_SPEC "\n",
1105                 cur_time, create_server, write_server, cur_time - create_time);
1106     }
1107 }
1108 
1109 
1110 END_NCBI_SCOPE
1111