1 /* $Id: osg_mapper.cpp 629837 2021-04-22 12:47:49Z ivanov $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Eugene Vasilchenko, Aaron Ucko
27 *
28 * File Description:
29 * PSG to OSG connection service mapper.
30 *
31 * ===========================================================================
32 */
33
34 #include <ncbi_pch.hpp>
35 #include "osg_mapper.hpp"
36
37 #include <corelib/ncbiapp.hpp>
38 #include <connect/ncbi_socket.hpp>
39
40 #include <cmath>
41
42 BEGIN_NCBI_NAMESPACE;
43 BEGIN_NAMESPACE(psg);
44 BEGIN_NAMESPACE(osg);
45
46 static double s_DefaultPositiveWeight;
47 static double s_DefaultNegativeWeight;
48 static double s_DefaultNormalizationInterval;
49 static double s_DefaultDecayRate;
50 static double s_DefaultInitialPenalty;
51 static bool s_DefaultsInitialized;
52
53
COSGServiceMapper(const IRegistry * registry)54 COSGServiceMapper::COSGServiceMapper(const IRegistry* registry)
55 {
56 m_AllServerRatingsTimer.Start();
57 m_Random.Randomize();
58 Configure(registry);
59 }
60
61
~COSGServiceMapper()62 COSGServiceMapper::~COSGServiceMapper()
63 {
64 }
65
66
InitDefaults(IRWRegistry & reg)67 void COSGServiceMapper::InitDefaults(IRWRegistry& reg)
68 {
69 CNcbiApplication* app = CNcbiApplication::Instance();
70 _ASSERT(app);
71 string section = app->GetAppName();
72
73 reg.Set(section, "positive_feedback_weight", "0.01",
74 IRegistry::fNoOverride);
75 reg.Set(section, "negative_feedback_weight", "0.5",
76 IRegistry::fNoOverride);
77 reg.Set(section, "penalty_normalization_interval", "10.0",
78 IRegistry::fNoOverride);
79 reg.Set(section, "penalty_half_life", "3600.0", IRegistry::fNoOverride);
80 reg.Set(section, "initial_penalty", "0.15", IRegistry::fNoOverride);
81
82 s_DefaultPositiveWeight
83 = reg.GetDouble(section, "positive_feedback_weight", 0);
84 s_DefaultNegativeWeight
85 = reg.GetDouble(section, "negative_feedback_weight", 0);
86 s_DefaultNormalizationInterval
87 = reg.GetDouble(section, "penalty_normalization_interval", 0);
88 s_DefaultDecayRate
89 = -M_LN2 / reg.GetDouble(section, "penalty_half_life", 0);
90 s_DefaultInitialPenalty = reg.GetDouble(section, "initial_penalty", 0);
91
92 s_DefaultsInitialized = true;
93 }
94
95
Configure(const IRegistry * registry)96 void COSGServiceMapper::Configure(const IRegistry* registry)
97 {
98 ConfigureFromRegistry(registry);
99
100 CNcbiApplication* app = CNcbiApplication::Instance();
101 _ASSERT(app);
102 string section = app->GetAppName();
103
104 if (registry == NULL) {
105 registry = &app->GetConfig();
106 }
107
108 _ASSERT(s_DefaultsInitialized);
109
110 m_PositiveFeedbackWeight
111 = registry->GetDouble(section, "positive_feedback_weight",
112 s_DefaultPositiveWeight);
113 m_NegativeFeedbackWeight
114 = registry->GetDouble(section, "negative_feedback_weight",
115 s_DefaultNegativeWeight);
116 m_PenaltyNormalizationInterval
117 = registry->GetDouble(section, "penalty_normalization_interval",
118 s_DefaultNormalizationInterval);
119 m_PenaltyDecayRate
120 = -M_LN2 / registry->GetDouble(section, "penalty_half_life",
121 s_DefaultDecayRate);
122 m_InitialPenalty
123 = registry->GetDouble(section, "initial_penalty",
124 s_DefaultInitialPenalty);
125
126 list<string> entries;
127 string msn_section = section + "/main_service_name";
128 registry->EnumerateEntries(msn_section, &entries);
129 for (const auto &it : entries) {
130 m_MainServiceNameMap[it] = registry->Get(msn_section, it);
131 }
132 }
133
134
135 inline
s_EndpointKeyName(TOSGEndpointKey k)136 static string s_EndpointKeyName(TOSGEndpointKey k)
137 {
138 return CSocketAPI::HostPortToString(g_OSG_GetHost(k), g_OSG_GetPort(k));
139 }
140
141
x_GetServer(const string & service,const TTried * tried)142 TSvrRef COSGServiceMapper::x_GetServer(const string& service,
143 const TTried* tried)
144 {
145 TServerRatings ratings;
146 double min_penalty = 1.0;
147 size_t full_count = 0, num_ratings = 0;
148
149 _TRACE("Finding a server for " << service);
150
151 x_NormalizePenalties();
152
153 {{
154 CFastReadGuard LOCK(m_AllServerRatingsLock);
155 TAllServerRatings::const_iterator raw_ratings
156 = m_AllServerRatings.find(service);
157 if (raw_ratings == m_AllServerRatings.end()
158 || raw_ratings->second.empty()) {
159 LOCK.Release();
160 list<string> serv_list;
161 GetServersList(service, &serv_list);
162 } else {
163 double min_untried_penalty = 1.0;
164
165 ITERATE (TServerRatings, it, raw_ratings->second) {
166 bool tried_it
167 = (tried != NULL
168 && (find(tried->begin(), tried->end(), it->first)
169 != tried->end()));
170 if (tried_it || it->second.excluded
171 || it->second.penalty > 0.0) {
172 _ASSERT(it->second.ref.NotEmpty());
173 ratings.insert(*it);
174 ++num_ratings;
175 }
176 if ( !it->second.excluded ) {
177 if (it->second.penalty < min_penalty) {
178 min_penalty = it->second.penalty;
179 }
180 if ( !tried_it
181 && it->second.penalty < min_untried_penalty) {
182 min_untried_penalty = it->second.penalty;
183 }
184 }
185 ++full_count;
186 }
187
188 if (tried != NULL && !tried->empty()) {
189 if (min_untried_penalty < 1.0) {
190 min_penalty = min_untried_penalty;
191 ITERATE (TTried, it, *tried) {
192 TServerRatings::iterator rit = ratings.find(*it);
193 if (rit == ratings.end()) {
194 string msg
195 = FORMAT(s_EndpointKeyName(*it)
196 << " unknown, but listed as tried.\n"
197 << CStackTrace());
198 ERR_POST(Warning << "OSG: " << msg);
199 } else {
200 _TRACE("Skipping " << s_EndpointKeyName(*it)
201 << " (already tried for this request)");
202 rit->second.excluded = true;
203 }
204 }
205 } else {
206 _TRACE("Re-allowing previously tried backends. "
207 "(Out of alternatives.)");
208 }
209 }
210 }
211 }}
212
213 vector<TSvrRef> to_exclude;
214 if ( !ratings.empty() ) {
215 CFastMutexGuard LOCK(m_RandomMutex);
216 CRandom::TValue max_random = CRandom::GetMax();
217 to_exclude.reserve(num_ratings);
218 ITERATE (TServerRatings, it, ratings) {
219 if (it->second.excluded
220 || (m_Random.GetRand() < it->second.penalty * max_random)) {
221 to_exclude.push_back(it->second.ref);
222 _TRACE("Temporarily excluding "
223 << s_EndpointKeyName(it->first) << " (penalty: "
224 << it->second.penalty << ')');
225 } else {
226 _TRACE("Considering " << s_EndpointKeyName(it->first)
227 << " (penalty: " << it->second.penalty << ')');
228 }
229 }
230 if (to_exclude.size() == full_count) {
231 // Apparently excluded everything; rescale and try again.
232 // (Not done right away, as this technique can introduce skew
233 // if somehow starting with an incomplete list.)
234 to_exclude.clear();
235 to_exclude.reserve(full_count);
236 if (min_penalty < 1.0) {
237 // Give the numerator a slight boost to keep roundoff
238 // error from yielding a (slim!) possibility of still
239 // excluding everything.
240 double scale_factor = (max_random + 1.0) / (1.0 - min_penalty);
241 ITERATE (TServerRatings, it, ratings) {
242 double score = (1.0 - it->second.penalty) * scale_factor;
243 if (it->second.excluded || m_Random.GetRand() > score) {
244 to_exclude.push_back(it->second.ref);
245 } else {
246 _TRACE("Reconsidering "
247 << s_EndpointKeyName(it->first) << " (score: "
248 << NStr::UInt8ToString(score,
249 NStr::fWithCommas)
250 << ')');
251 }
252 }
253 }
254 }
255 }
256
257 CFastMutexGuard LOCK(m_DBLBExclusionsMutex);
258 TParent::CleanExcluded(service);
259 ITERATE (vector<TSvrRef>, it, to_exclude) {
260 TParent::Exclude(service, *it);
261 }
262 TSvrRef result = TParent::GetServer(service);
263 TParent::CleanExcluded(service);
264 ITERATE (TServerRatings, it, ratings) {
265 if (it->second.excluded) {
266 TParent::Exclude(service, it->second.ref);
267 }
268 }
269 _TRACE("Returning " << CSocketAPI::ntoa(result->GetHost())
270 << ", expiring "
271 << CTime(result->GetExpireTime()).ToLocalTime().AsString());
272 return result;
273 }
274
275
Exclude(const string & service,const TSvrRef & server)276 void COSGServiceMapper::Exclude(const string& service, const TSvrRef& server)
277 {
278 {{
279 CFastWriteGuard LOCK(m_AllServerRatingsLock);
280 bool was_new = false;
281 SServerRating& rating
282 = x_SetRating(m_AllServerRatings[service], server->GetHost(),
283 server->GetPort(), &was_new, server->GetName());
284 rating.excluded = true;
285 if (was_new) {
286 string msg
287 = "Excluding previously undiscovered " + service + " on "
288 + CSocketAPI::ntoa(server->GetHost()) + '.';
289 ERR_POST(Warning << "OSG: " << msg);
290 }
291 }}
292 {{
293 CFastMutexGuard LOCK(m_DBLBExclusionsMutex);
294 TParent::Exclude(service, server);
295 }}
296 auto msn = m_MainServiceNameMap.find(service);
297 if (msn != m_MainServiceNameMap.end()) {
298 Exclude(msn->second, server);
299 }
300 }
301
302
CleanExcluded(const string & service)303 void COSGServiceMapper::CleanExcluded(const string& service)
304 {
305 {{
306 CFastWriteGuard LOCK(m_AllServerRatingsLock);
307 TServerRatings& ratings = m_AllServerRatings[service];
308 NON_CONST_ITERATE (TServerRatings, it, ratings) {
309 it->second.excluded = false;
310 }
311 }}
312 {{
313 CFastMutexGuard LOCK(m_DBLBExclusionsMutex);
314 TParent::CleanExcluded(service);
315 }}
316 auto msn = m_MainServiceNameMap.find(service);
317 if (msn != m_MainServiceNameMap.end()) {
318 CleanExcluded(msn->second);
319 }
320 }
321
322
GetServersList(const string & service,list<string> * serv_list) const323 void COSGServiceMapper::GetServersList(const string& service,
324 list<string>* serv_list) const
325 {
326 unique_ptr<set<string>> main_set;
327 TServerRatings* main_ratings = nullptr;
328 TParent::GetServersList(service, serv_list);
329 auto msn = m_MainServiceNameMap.find(service);
330 if (msn != m_MainServiceNameMap.end()) {
331 list<string> main_list;
332 TParent::GetServersList(msn->second, &main_list);
333 main_set.reset(new set<string>(main_list.begin(), main_list.end()));
334 }
335
336 CFastWriteGuard LOCK(m_AllServerRatingsLock);
337 TServerRatings& ratings = m_AllServerRatings[service];
338 if (main_set.get() != nullptr) {
339 main_ratings = &m_AllServerRatings[msn->second];
340 }
341 ITERATE (list<string>, it, *serv_list) {
342 Uint4 host;
343 Uint2 port;
344 bool was_new = false;
345 CSocketAPI::StringToHostPort(*it, &host, &port);
346 auto& rating = x_SetRating(ratings, host, port, &was_new);
347 if (was_new) {
348 _TRACE("Discovered " << service << " on " << *it);
349 if (main_ratings != nullptr) {
350 auto& main_rating = x_SetRating(*main_ratings, host, port,
351 &was_new);
352 if (main_set->find(*it) == main_set->end()) {
353 main_rating.penalty = 1.0;
354 main_rating.excluded = true;
355 } else {
356 _TRACE("Discovered " << msn->second << " on " << *it);
357 main_rating = rating;
358 }
359 }
360 }
361 }
362 }
363
364
GetServerOptions(const string & service,TOptions * options)365 void COSGServiceMapper::GetServerOptions(const string& service,
366 TOptions* options)
367 {
368 set<TOSGEndpointKey> main_set;
369 TServerRatings* main_ratings = nullptr;
370 TParent::GetServerOptions(service, options);
371 auto msn = m_MainServiceNameMap.find(service);
372 if (msn != m_MainServiceNameMap.end()) {
373 TOptions main_options;
374 TParent::GetServerOptions(msn->second, &main_options);
375 for (const auto &it : main_options) {
376 main_set.insert(g_OSG_MakeEndpointKey(it->GetHost(),
377 it->GetPort()));
378 }
379 }
380
381 CFastWriteGuard LOCK(m_AllServerRatingsLock);
382 TServerRatings& ratings = m_AllServerRatings[service];
383 if (msn != m_MainServiceNameMap.end()) {
384 main_ratings = &m_AllServerRatings[msn->second];
385 }
386 for (const auto& it : *options) {
387 auto host = it->GetHost();
388 auto port = it->GetPort();
389 bool was_new = false;
390 auto& rating = x_SetRating(ratings, host, port, &was_new);
391 if (was_new) {
392 _TRACE("Discovered " << service << " on "
393 << CSocketAPI::HostPortToString(host, port));
394 if (main_ratings != nullptr) {
395 auto& main_rating = x_SetRating(*main_ratings, host, port,
396 &was_new);
397 auto key = g_OSG_MakeEndpointKey(host, port);
398 if (main_set.find(key) == main_set.end()) {
399 main_rating.penalty = 1.0;
400 main_rating.excluded = true;
401 } else {
402 _TRACE("Discovered " << msn->second << " on "
403 << CSocketAPI::HostPortToString(host, port));
404 main_rating = rating;
405 }
406 }
407 }
408 }
409 }
410
411
AcceptFeedback(const string & service,unsigned int host,unsigned short port,EFeedback feedback)412 void COSGServiceMapper::AcceptFeedback(const string& service,
413 unsigned int host, unsigned short port,
414 EFeedback feedback)
415 {
416 x_NormalizePenalties();
417 CFastWriteGuard LOCK(m_AllServerRatingsLock);
418 bool was_new = false;
419 SServerRating& rating = x_SetRating(m_AllServerRatings[service],
420 host, port, &was_new);
421 double& penalty = rating.penalty;
422 _DEBUG_ARG(double old_penalty = penalty);
423 if (was_new) {
424 string msg = ("Accepting feedback for previously undiscovered "
425 + service + " on " + rating.ref->GetName() + '.');
426 ERR_POST(Warning << "OSG: " << msg);
427 }
428 if (feedback == ePositiveFeedback) {
429 penalty *= (1.0 - m_PositiveFeedbackWeight);
430 } else {
431 penalty = penalty * (1.0 - m_NegativeFeedbackWeight)
432 + m_NegativeFeedbackWeight;
433 }
434 _TRACE(((feedback == ePositiveFeedback)
435 ? "Reducing" : "Increasing")
436 << " penalty for " << service << " on "
437 << CSocketAPI::HostPortToString(host, port)
438 << " from " << old_penalty << " to " << penalty);
439 LOCK.Release();
440 auto msn = m_MainServiceNameMap.find(service);
441 if (msn != m_MainServiceNameMap.end()) {
442 AcceptFeedback(msn->second, host, port, feedback);
443 }
444 }
445
446
x_NormalizePenalties(void)447 void COSGServiceMapper::x_NormalizePenalties(void)
448 {
449 {{
450 CFastReadGuard LOCK(m_AllServerRatingsLock);
451 if (m_AllServerRatingsTimer.Elapsed()
452 < m_PenaltyNormalizationInterval) {
453 return;
454 }
455 }}
456
457 CFastWriteGuard LOCK(m_AllServerRatingsLock);
458 // Recheck, as another thread could have just taken care of it.
459 double elapsed = m_AllServerRatingsTimer.Elapsed();
460 if (elapsed < m_PenaltyNormalizationInterval) {
461 return;
462 }
463
464 double decay = exp(m_PenaltyDecayRate * elapsed);
465 _TRACE("Decaying penalties by " << decay << " after " << elapsed
466 << " s");
467 NON_CONST_ITERATE (TAllServerRatings, it, m_AllServerRatings) {
468 NON_CONST_ITERATE (TServerRatings, it2, it->second) {
469 it2->second.penalty
470 = (it2->second.penalty - m_InitialPenalty) * decay
471 + m_InitialPenalty;
472 }
473 }
474 m_AllServerRatingsTimer.Restart();
475 }
476
477
x_SetRating(TServerRatings & ratings,Uint4 host,Uint2 port,bool * was_new,CTempString name) const478 COSGServiceMapper::SServerRating& COSGServiceMapper::x_SetRating
479 (TServerRatings& ratings, Uint4 host, Uint2 port, bool* was_new,
480 CTempString name) const
481 {
482 TOSGEndpointKey key = g_OSG_MakeEndpointKey(host, port);
483 TServerRatings::iterator it = ratings.lower_bound(key);
484
485 if (it == ratings.end() || it->first != key) {
486 if (was_new != NULL) {
487 *was_new = true;
488 }
489 TServerRatings::value_type node(key, SServerRating());
490 string name_str
491 = (name.empty() ? CSocketAPI::HostPortToString(host, port)
492 : string(name));
493 node.second.ref.Reset(new CDBServer(name_str, host, port, kMax_Auto));
494 node.second.penalty = m_InitialPenalty;
495 node.second.excluded = false;
496 it = ratings.insert(it, node);
497 }
498
499 return it->second;
500 }
501
502
503 END_NAMESPACE(osg);
504 END_NAMESPACE(psg);
505 END_NCBI_NAMESPACE;
506