1 /*  $Id: osg_mapper.cpp 629837 2021-04-22 12:47:49Z ivanov $
2 * ===========================================================================
3 *
4 *                            PUBLIC DOMAIN NOTICE
5 *               National Center for Biotechnology Information
6 *
7 *  This software/database is a "United States Government Work" under the
8 *  terms of the United States Copyright Act.  It was written as part of
9 *  the author's official duties as a United States Government employee and
10 *  thus cannot be copyrighted.  This software/database is freely available
11 *  to the public for use. The National Library of Medicine and the U.S.
12 *  Government have not placed any restriction on its use or reproduction.
13 *
14 *  Although all reasonable efforts have been taken to ensure the accuracy
15 *  and reliability of the software and data, the NLM and the U.S.
16 *  Government do not and cannot warrant the performance or results that
17 *  may be obtained by using this software or data. The NLM and the U.S.
18 *  Government disclaim all warranties, express or implied, including
19 *  warranties of performance, merchantability or fitness for any particular
20 *  purpose.
21 *
22 *  Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author:  Eugene Vasilchenko, Aaron Ucko
27 *
28 * File Description:
29 *   PSG to OSG connection service mapper.
30 *
31 * ===========================================================================
32 */
33 
34 #include <ncbi_pch.hpp>
35 #include "osg_mapper.hpp"
36 
37 #include <corelib/ncbiapp.hpp>
38 #include <connect/ncbi_socket.hpp>
39 
40 #include <cmath>
41 
42 BEGIN_NCBI_NAMESPACE;
43 BEGIN_NAMESPACE(psg);
44 BEGIN_NAMESPACE(osg);
45 
// Process-wide fallback values for the per-instance tunables.  They are
// filled in by COSGServiceMapper::InitDefaults() from the application's
// registry section and passed as defaults by Configure() when reading a
// registry; s_DefaultsInitialized records that InitDefaults() has run
// (Configure() asserts it).
static double s_DefaultPositiveWeight;
static double s_DefaultNegativeWeight;
static double s_DefaultNormalizationInterval;
// NB: holds the derived decay rate (-ln 2 / penalty_half_life), not the
// half-life itself.
static double s_DefaultDecayRate;
static double s_DefaultInitialPenalty;
static bool   s_DefaultsInitialized;
52 
53 
COSGServiceMapper(const IRegistry * registry)54 COSGServiceMapper::COSGServiceMapper(const IRegistry* registry)
55 {
56     m_AllServerRatingsTimer.Start();
57     m_Random.Randomize();
58     Configure(registry);
59 }
60 
61 
~COSGServiceMapper()62 COSGServiceMapper::~COSGServiceMapper()
63 {
64 }
65 
66 
InitDefaults(IRWRegistry & reg)67 void COSGServiceMapper::InitDefaults(IRWRegistry& reg)
68 {
69     CNcbiApplication* app = CNcbiApplication::Instance();
70     _ASSERT(app);
71     string section = app->GetAppName();
72 
73     reg.Set(section, "positive_feedback_weight", "0.01",
74             IRegistry::fNoOverride);
75     reg.Set(section, "negative_feedback_weight", "0.5",
76             IRegistry::fNoOverride);
77     reg.Set(section, "penalty_normalization_interval", "10.0",
78             IRegistry::fNoOverride);
79     reg.Set(section, "penalty_half_life", "3600.0", IRegistry::fNoOverride);
80     reg.Set(section, "initial_penalty", "0.15", IRegistry::fNoOverride);
81 
82     s_DefaultPositiveWeight
83         = reg.GetDouble(section, "positive_feedback_weight", 0);
84     s_DefaultNegativeWeight
85         = reg.GetDouble(section, "negative_feedback_weight", 0);
86     s_DefaultNormalizationInterval
87         = reg.GetDouble(section, "penalty_normalization_interval", 0);
88     s_DefaultDecayRate
89         = -M_LN2 / reg.GetDouble(section, "penalty_half_life", 0);
90     s_DefaultInitialPenalty = reg.GetDouble(section, "initial_penalty", 0);
91 
92     s_DefaultsInitialized = true;
93 }
94 
95 
Configure(const IRegistry * registry)96 void COSGServiceMapper::Configure(const IRegistry* registry)
97 {
98     ConfigureFromRegistry(registry);
99 
100     CNcbiApplication* app = CNcbiApplication::Instance();
101     _ASSERT(app);
102     string section = app->GetAppName();
103 
104     if (registry == NULL) {
105         registry = &app->GetConfig();
106     }
107 
108     _ASSERT(s_DefaultsInitialized);
109 
110     m_PositiveFeedbackWeight
111         = registry->GetDouble(section, "positive_feedback_weight",
112                               s_DefaultPositiveWeight);
113     m_NegativeFeedbackWeight
114         = registry->GetDouble(section, "negative_feedback_weight",
115                               s_DefaultNegativeWeight);
116     m_PenaltyNormalizationInterval
117         = registry->GetDouble(section, "penalty_normalization_interval",
118                               s_DefaultNormalizationInterval);
119     m_PenaltyDecayRate
120         = -M_LN2 / registry->GetDouble(section, "penalty_half_life",
121                                        s_DefaultDecayRate);
122     m_InitialPenalty
123         = registry->GetDouble(section, "initial_penalty",
124                               s_DefaultInitialPenalty);
125 
126     list<string> entries;
127     string msn_section = section + "/main_service_name";
128     registry->EnumerateEntries(msn_section, &entries);
129     for (const auto &it : entries) {
130         m_MainServiceNameMap[it] = registry->Get(msn_section, it);
131     }
132 }
133 
134 
135 inline
s_EndpointKeyName(TOSGEndpointKey k)136 static string s_EndpointKeyName(TOSGEndpointKey k)
137 {
138     return CSocketAPI::HostPortToString(g_OSG_GetHost(k), g_OSG_GetPort(k));
139 }
140 
141 
x_GetServer(const string & service,const TTried * tried)142 TSvrRef COSGServiceMapper::x_GetServer(const string& service,
143                                        const TTried* tried)
144 {
145     TServerRatings ratings;
146     double         min_penalty = 1.0;
147     size_t         full_count = 0, num_ratings = 0;
148 
149     _TRACE("Finding a server for " << service);
150 
151     x_NormalizePenalties();
152 
153     {{
154         CFastReadGuard LOCK(m_AllServerRatingsLock);
155         TAllServerRatings::const_iterator raw_ratings
156             = m_AllServerRatings.find(service);
157         if (raw_ratings == m_AllServerRatings.end()
158             ||  raw_ratings->second.empty()) {
159             LOCK.Release();
160             list<string> serv_list;
161             GetServersList(service, &serv_list);
162         } else {
163             double min_untried_penalty = 1.0;
164 
165             ITERATE (TServerRatings, it, raw_ratings->second) {
166                 bool tried_it
167                     = (tried != NULL
168                        &&  (find(tried->begin(), tried->end(), it->first)
169                             != tried->end()));
170                 if (tried_it  ||  it->second.excluded
171                     ||  it->second.penalty > 0.0) {
172                     _ASSERT(it->second.ref.NotEmpty());
173                     ratings.insert(*it);
174                     ++num_ratings;
175                 }
176                 if ( !it->second.excluded ) {
177                     if (it->second.penalty < min_penalty) {
178                         min_penalty = it->second.penalty;
179                     }
180                     if ( !tried_it
181                         &&  it->second.penalty < min_untried_penalty) {
182                         min_untried_penalty = it->second.penalty;
183                     }
184                 }
185                 ++full_count;
186             }
187 
188             if (tried != NULL  &&  !tried->empty()) {
189                 if (min_untried_penalty < 1.0) {
190                     min_penalty = min_untried_penalty;
191                     ITERATE (TTried, it, *tried) {
192                         TServerRatings::iterator rit = ratings.find(*it);
193                         if (rit == ratings.end()) {
194                             string msg
195                                 = FORMAT(s_EndpointKeyName(*it)
196                                          << " unknown, but listed as tried.\n"
197                                          << CStackTrace());
198                             ERR_POST(Warning << "OSG: " << msg);
199                         } else {
200                             _TRACE("Skipping " << s_EndpointKeyName(*it)
201                                    << " (already tried for this request)");
202                             rit->second.excluded = true;
203                         }
204                     }
205                 } else {
206                     _TRACE("Re-allowing previously tried backends. "
207                            "(Out of alternatives.)");
208                 }
209             }
210         }
211     }}
212 
213     vector<TSvrRef> to_exclude;
214     if ( !ratings.empty() ) {
215         CFastMutexGuard LOCK(m_RandomMutex);
216         CRandom::TValue max_random = CRandom::GetMax();
217         to_exclude.reserve(num_ratings);
218         ITERATE (TServerRatings, it, ratings) {
219             if (it->second.excluded
220                 ||  (m_Random.GetRand() < it->second.penalty * max_random)) {
221                 to_exclude.push_back(it->second.ref);
222                 _TRACE("Temporarily excluding "
223                        << s_EndpointKeyName(it->first) << " (penalty: "
224                        << it->second.penalty << ')');
225             } else {
226                 _TRACE("Considering " << s_EndpointKeyName(it->first)
227                        << " (penalty: " << it->second.penalty << ')');
228             }
229         }
230         if (to_exclude.size() == full_count) {
231             // Apparently excluded everything; rescale and try again.
232             // (Not done right away, as this technique can introduce skew
233             // if somehow starting with an incomplete list.)
234             to_exclude.clear();
235             to_exclude.reserve(full_count);
236             if (min_penalty < 1.0) {
237                 // Give the numerator a slight boost to keep roundoff
238                 // error from yielding a (slim!) possibility of still
239                 // excluding everything.
240                 double scale_factor = (max_random + 1.0) / (1.0 - min_penalty);
241                 ITERATE (TServerRatings, it, ratings) {
242                     double score = (1.0 - it->second.penalty) * scale_factor;
243                     if (it->second.excluded  ||  m_Random.GetRand() > score) {
244                         to_exclude.push_back(it->second.ref);
245                     } else {
246                         _TRACE("Reconsidering "
247                                << s_EndpointKeyName(it->first) << " (score: "
248                                << NStr::UInt8ToString(score,
249                                                       NStr::fWithCommas)
250                                << ')');
251                     }
252                 }
253             }
254         }
255     }
256 
257     CFastMutexGuard LOCK(m_DBLBExclusionsMutex);
258     TParent::CleanExcluded(service);
259     ITERATE (vector<TSvrRef>, it, to_exclude) {
260         TParent::Exclude(service, *it);
261     }
262     TSvrRef result = TParent::GetServer(service);
263     TParent::CleanExcluded(service);
264     ITERATE (TServerRatings, it, ratings) {
265         if (it->second.excluded) {
266             TParent::Exclude(service, it->second.ref);
267         }
268     }
269     _TRACE("Returning " << CSocketAPI::ntoa(result->GetHost())
270            << ", expiring "
271            << CTime(result->GetExpireTime()).ToLocalTime().AsString());
272     return result;
273 }
274 
275 
Exclude(const string & service,const TSvrRef & server)276 void COSGServiceMapper::Exclude(const string& service, const TSvrRef& server)
277 {
278     {{
279         CFastWriteGuard LOCK(m_AllServerRatingsLock);
280         bool was_new = false;
281         SServerRating& rating
282             = x_SetRating(m_AllServerRatings[service], server->GetHost(),
283                           server->GetPort(), &was_new, server->GetName());
284         rating.excluded = true;
285         if (was_new) {
286             string msg
287                 = "Excluding previously undiscovered " + service + " on "
288                 + CSocketAPI::ntoa(server->GetHost()) + '.';
289             ERR_POST(Warning << "OSG: " << msg);
290         }
291     }}
292     {{
293         CFastMutexGuard LOCK(m_DBLBExclusionsMutex);
294         TParent::Exclude(service, server);
295     }}
296     auto msn = m_MainServiceNameMap.find(service);
297     if (msn != m_MainServiceNameMap.end()) {
298         Exclude(msn->second, server);
299     }
300 }
301 
302 
CleanExcluded(const string & service)303 void COSGServiceMapper::CleanExcluded(const string& service)
304 {
305     {{
306         CFastWriteGuard LOCK(m_AllServerRatingsLock);
307         TServerRatings& ratings = m_AllServerRatings[service];
308         NON_CONST_ITERATE (TServerRatings, it, ratings) {
309             it->second.excluded = false;
310         }
311     }}
312     {{
313         CFastMutexGuard LOCK(m_DBLBExclusionsMutex);
314         TParent::CleanExcluded(service);
315     }}
316     auto msn = m_MainServiceNameMap.find(service);
317     if (msn != m_MainServiceNameMap.end()) {
318         CleanExcluded(msn->second);
319     }
320 }
321 
322 
GetServersList(const string & service,list<string> * serv_list) const323 void COSGServiceMapper::GetServersList(const string& service,
324                                        list<string>* serv_list) const
325 {
326     unique_ptr<set<string>> main_set;
327     TServerRatings* main_ratings = nullptr;
328     TParent::GetServersList(service, serv_list);
329     auto msn = m_MainServiceNameMap.find(service);
330     if (msn != m_MainServiceNameMap.end()) {
331         list<string> main_list;
332         TParent::GetServersList(msn->second, &main_list);
333         main_set.reset(new set<string>(main_list.begin(), main_list.end()));
334     }
335 
336     CFastWriteGuard LOCK(m_AllServerRatingsLock);
337     TServerRatings& ratings = m_AllServerRatings[service];
338     if (main_set.get() != nullptr) {
339         main_ratings = &m_AllServerRatings[msn->second];
340     }
341     ITERATE (list<string>, it, *serv_list) {
342         Uint4 host;
343         Uint2 port;
344         bool  was_new = false;
345         CSocketAPI::StringToHostPort(*it, &host, &port);
346         auto& rating = x_SetRating(ratings, host, port, &was_new);
347         if (was_new) {
348             _TRACE("Discovered " << service << " on " << *it);
349             if (main_ratings != nullptr) {
350                 auto& main_rating = x_SetRating(*main_ratings, host, port,
351                                                 &was_new);
352                 if (main_set->find(*it) == main_set->end()) {
353                     main_rating.penalty = 1.0;
354                     main_rating.excluded = true;
355                 } else {
356                     _TRACE("Discovered " << msn->second << " on " << *it);
357                     main_rating = rating;
358                 }
359             }
360         }
361     }
362 }
363 
364 
GetServerOptions(const string & service,TOptions * options)365 void COSGServiceMapper::GetServerOptions(const string& service,
366                                          TOptions* options)
367 {
368     set<TOSGEndpointKey> main_set;
369     TServerRatings* main_ratings = nullptr;
370     TParent::GetServerOptions(service, options);
371     auto msn = m_MainServiceNameMap.find(service);
372     if (msn != m_MainServiceNameMap.end()) {
373         TOptions main_options;
374         TParent::GetServerOptions(msn->second, &main_options);
375         for (const auto &it : main_options) {
376             main_set.insert(g_OSG_MakeEndpointKey(it->GetHost(),
377                                                   it->GetPort()));
378         }
379     }
380 
381     CFastWriteGuard LOCK(m_AllServerRatingsLock);
382     TServerRatings& ratings = m_AllServerRatings[service];
383     if (msn != m_MainServiceNameMap.end()) {
384         main_ratings = &m_AllServerRatings[msn->second];
385     }
386     for (const auto& it : *options) {
387         auto host = it->GetHost();
388         auto port = it->GetPort();
389         bool was_new = false;
390         auto& rating = x_SetRating(ratings, host, port, &was_new);
391         if (was_new) {
392             _TRACE("Discovered " << service << " on "
393                    << CSocketAPI::HostPortToString(host, port));
394             if (main_ratings != nullptr) {
395                 auto& main_rating = x_SetRating(*main_ratings, host, port,
396                                                 &was_new);
397                 auto key = g_OSG_MakeEndpointKey(host, port);
398                 if (main_set.find(key) == main_set.end()) {
399                     main_rating.penalty = 1.0;
400                     main_rating.excluded = true;
401                 } else {
402                     _TRACE("Discovered " << msn->second << " on "
403                            << CSocketAPI::HostPortToString(host, port));
404                     main_rating = rating;
405                 }
406             }
407         }
408     }
409 }
410 
411 
AcceptFeedback(const string & service,unsigned int host,unsigned short port,EFeedback feedback)412 void COSGServiceMapper::AcceptFeedback(const string& service,
413                                        unsigned int host, unsigned short port,
414                                        EFeedback feedback)
415 {
416     x_NormalizePenalties();
417     CFastWriteGuard LOCK(m_AllServerRatingsLock);
418     bool was_new = false;
419     SServerRating& rating = x_SetRating(m_AllServerRatings[service],
420                                         host, port, &was_new);
421     double&        penalty = rating.penalty;
422     _DEBUG_ARG(double old_penalty = penalty);
423     if (was_new) {
424         string msg = ("Accepting feedback for previously undiscovered "
425                       + service + " on " + rating.ref->GetName() + '.');
426         ERR_POST(Warning << "OSG: " << msg);
427     }
428     if (feedback == ePositiveFeedback) {
429         penalty *= (1.0 - m_PositiveFeedbackWeight);
430     } else {
431         penalty = penalty * (1.0 - m_NegativeFeedbackWeight)
432             + m_NegativeFeedbackWeight;
433     }
434     _TRACE(((feedback == ePositiveFeedback)
435             ? "Reducing" : "Increasing")
436            << " penalty for " << service << " on "
437            << CSocketAPI::HostPortToString(host, port)
438            << " from " << old_penalty << " to " << penalty);
439     LOCK.Release();
440     auto msn = m_MainServiceNameMap.find(service);
441     if (msn != m_MainServiceNameMap.end()) {
442         AcceptFeedback(msn->second, host, port, feedback);
443     }
444 }
445 
446 
x_NormalizePenalties(void)447 void COSGServiceMapper::x_NormalizePenalties(void)
448 {
449     {{
450         CFastReadGuard LOCK(m_AllServerRatingsLock);
451         if (m_AllServerRatingsTimer.Elapsed()
452             < m_PenaltyNormalizationInterval) {
453             return;
454         }
455     }}
456 
457     CFastWriteGuard LOCK(m_AllServerRatingsLock);
458     // Recheck, as another thread could have just taken care of it.
459     double elapsed = m_AllServerRatingsTimer.Elapsed();
460     if (elapsed < m_PenaltyNormalizationInterval) {
461         return;
462     }
463 
464     double decay = exp(m_PenaltyDecayRate * elapsed);
465     _TRACE("Decaying penalties by " << decay << " after " << elapsed
466            << " s");
467     NON_CONST_ITERATE (TAllServerRatings, it, m_AllServerRatings) {
468         NON_CONST_ITERATE (TServerRatings, it2, it->second) {
469             it2->second.penalty
470                 = (it2->second.penalty - m_InitialPenalty) * decay
471                 + m_InitialPenalty;
472         }
473     }
474     m_AllServerRatingsTimer.Restart();
475 }
476 
477 
x_SetRating(TServerRatings & ratings,Uint4 host,Uint2 port,bool * was_new,CTempString name) const478 COSGServiceMapper::SServerRating& COSGServiceMapper::x_SetRating
479 (TServerRatings& ratings, Uint4 host, Uint2 port, bool* was_new,
480  CTempString name) const
481 {
482     TOSGEndpointKey          key = g_OSG_MakeEndpointKey(host, port);
483     TServerRatings::iterator it  = ratings.lower_bound(key);
484 
485     if (it == ratings.end()  ||  it->first != key) {
486         if (was_new != NULL) {
487             *was_new = true;
488         }
489         TServerRatings::value_type node(key, SServerRating());
490         string name_str
491             = (name.empty() ? CSocketAPI::HostPortToString(host, port)
492                : string(name));
493         node.second.ref.Reset(new CDBServer(name_str, host, port, kMax_Auto));
494         node.second.penalty  = m_InitialPenalty;
495         node.second.excluded = false;
496         it = ratings.insert(it, node);
497     }
498 
499     return it->second;
500 }
501 
502 
503 END_NAMESPACE(osg);
504 END_NAMESPACE(psg);
505 END_NCBI_NAMESPACE;
506