1 /*  $Id: ns_clients_registry.cpp 543877 2017-08-15 13:30:30Z satskyse $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors:  Sergey Satskiy
27  *
28  * File Description:
29  *   NetSchedule clients registry
30  *
31  */
32 
33 #include <ncbi_pch.hpp>
34 #include <corelib/request_ctx.hpp>
35 
36 #include "ns_clients_registry.hpp"
37 #include "ns_affinity.hpp"
38 #include "ns_handler.hpp"
39 
40 
41 BEGIN_NCBI_SCOPE
42 
43 
CNSClientsRegistry()44 CNSClientsRegistry::CNSClientsRegistry() :
45     m_LastID(0), m_BlacklistTimeout(), m_ReadBlacklistTimeout()
46 {}
47 
48 
SetRegistries(CNSAffinityRegistry * aff_registry,CNSNotificationList * notif_registry)49 void CNSClientsRegistry::SetRegistries(CNSAffinityRegistry *  aff_registry,
50                                        CNSNotificationList *  notif_registry)
51 {
52     m_AffRegistry = aff_registry;
53     m_NotifRegistry = notif_registry;
54 }
55 
56 
57 // Called before any command is issued by the client
58 void
Touch(CNSClientId & client,TNSBitVector & running_jobs,TNSBitVector & reading_jobs,bool & client_was_found,bool & session_was_reset,string & old_session,bool & had_wn_pref_affs,bool & had_reader_pref_affs)59 CNSClientsRegistry::Touch(CNSClientId &          client,
60                           TNSBitVector &         running_jobs,
61                           TNSBitVector &         reading_jobs,
62                           bool &                 client_was_found,
63                           bool &                 session_was_reset,
64                           string &               old_session,
65                           bool &                 had_wn_pref_affs,
66                           bool &                 had_reader_pref_affs)
67 {
68     client_was_found = false;
69     session_was_reset = false;
70     old_session.clear();
71     had_wn_pref_affs = false;
72     had_reader_pref_affs = false;
73 
74     // Check if it is an old-style client
75     if (!client.IsComplete())
76         return;
77 
78     map< string, CNSClient >::iterator  known_client;
79     CMutexGuard                         guard(m_Lock);
80 
81     known_client = m_Clients.find(client.GetNode());
82     if (known_client == m_Clients.end()) {
83         // The client is not known yet
84         CNSClient       new_ns_client(client, &m_BlacklistTimeout,
85                                               &m_ReadBlacklistTimeout);
86         unsigned int    client_id = x_GetNextID();
87 
88         new_ns_client.SetID(client_id);
89         client.SetID(client_id);
90         m_Clients[ client.GetNode() ] = new_ns_client;
91         m_RegisteredClients.set_bit(client_id);
92         return;
93     }
94 
95     client_was_found = true;
96 
97     // The client has connected before
98     client.SetID(known_client->second.GetID());
99     old_session = known_client->second.GetSession();
100     if (client.GetSession() != old_session) {
101         session_was_reset = true;
102         ClearClient(client, running_jobs, reading_jobs,
103                     client_was_found, old_session, had_wn_pref_affs,
104                     had_reader_pref_affs);
105     }
106 
107     known_client->second.Touch(client);
108 
109     // The 'reset' client type must not reset the collected types when the next
110     // command is issued
111     if (client.GetType() == eClaimedReset)
112         client.SetClientType(eClaimedAutodetect);
113 }
114 
115 
RegisterSocketWriteError(const CNSClientId & client)116 void CNSClientsRegistry::RegisterSocketWriteError(const CNSClientId &  client)
117 {
118     // Check if it is an old-style client
119     if (!client.IsComplete())
120         return;
121 
122     CMutexGuard                         guard(m_Lock);
123     map< string, CNSClient >::iterator  cl = m_Clients.find(client.GetNode());
124     if (cl != m_Clients.end())
125         cl->second.RegisterSocketWriteError();
126 }
127 
128 
SetLastScope(const CNSClientId & client)129 void CNSClientsRegistry::SetLastScope(const CNSClientId &  client)
130 {
131     // Check if it is an old-style client
132     if (!client.IsComplete())
133         return;
134 
135     CMutexGuard                         guard(m_Lock);
136     map< string, CNSClient >::iterator  cl = m_Clients.find(client.GetNode());
137     if (cl != m_Clients.end())
138         cl->second.SetLastScope(client.GetScope());
139 }
140 
141 
142 void
AppendType(const CNSClientId & client,unsigned int type_to_append)143 CNSClientsRegistry::AppendType(const CNSClientId &  client,
144                                unsigned int         type_to_append)
145 {
146     // Check if it is an old-style client
147     if (!client.IsComplete())
148         return;
149 
150     CMutexGuard                         guard(m_Lock);
151     map< string, CNSClient >::iterator  cl = m_Clients.find(client.GetNode());
152     if (cl != m_Clients.end())
153         cl->second.AppendType(type_to_append);
154 }
155 
156 
157 void
GCBlacklistedJobs(const CJobStatusTracker & tracker,ECommandGroup cmd_group)158 CNSClientsRegistry::GCBlacklistedJobs(const CJobStatusTracker &  tracker,
159                                       ECommandGroup              cmd_group)
160 {
161     CMutexGuard                         guard(m_Lock);
162     map< string, CNSClient >::iterator  k = m_Clients.begin();
163 
164     for ( ; k != m_Clients.end(); ++k)
165         k->second.GCBlacklistedJobs(tracker, cmd_group);
166 }
167 
168 
169 int
SetClientData(const CNSClientId & client,const string & data,int data_version)170 CNSClientsRegistry::SetClientData(const CNSClientId &  client,
171                                   const string &  data, int  data_version)
172 {
173     // Check if it is an old-style client
174     if (!client.IsComplete())
175         NCBI_THROW(CNetScheduleException, eInvalidParameter,
176                    "only non-anonymous clients may set their data");
177 
178     CMutexGuard                     guard(m_Lock);
179     map< string,
180          CNSClient >::iterator      found = m_Clients.find(client.GetNode());
181 
182     if (found == m_Clients.end())
183         NCBI_THROW(CNetScheduleException, eInternalError,
184                    "Cannot find client '" + client.GetNode() +
185                    "' to set client data");
186 
187     return found->second.SetClientData(data, data_version);
188 }
189 
190 
MarkAsAdmin(const CNSClientId & client)191 void  CNSClientsRegistry::MarkAsAdmin(const CNSClientId &  client)
192 {
193     // Check if it is an old-style client
194     if (!client.IsComplete())
195         return;
196 
197     CMutexGuard                 guard(m_Lock);
198     map< string,
199          CNSClient >::iterator  admin = m_Clients.find(client.GetNode());
200 
201     if (admin != m_Clients.end())
202         admin->second.MarkAsAdmin();
203 }
204 
205 
206 // Updates the submitter job.
207 // No need to check session id, it's done in Touch()
AddToSubmitted(const CNSClientId & client,size_t count)208 void  CNSClientsRegistry::AddToSubmitted(const CNSClientId &  client,
209                                          size_t               count)
210 {
211     // Check if it is an old-style client
212     if (!client.IsComplete())
213         return;
214 
215     CMutexGuard                 guard(m_Lock);
216     map< string,
217          CNSClient >::iterator  submitter = m_Clients.find(client.GetNode());
218 
219     if (submitter != m_Clients.end())
220         submitter->second.RegisterSubmittedJobs(count);
221 }
222 
223 
224 // Updates the client jobs list.
225 // No need to check session id, it's done in Touch()
RegisterJob(const CNSClientId & client,unsigned int job_id,ECommandGroup cmd_group)226 void  CNSClientsRegistry::RegisterJob(const CNSClientId &  client,
227                                       unsigned int         job_id,
228                                       ECommandGroup        cmd_group)
229 {
230     // Check if it is an old-style client
231     if (!client.IsComplete())
232         return;
233 
234     CMutexGuard                 guard(m_Lock);
235     map< string,
236          CNSClient >::iterator  cl = m_Clients.find(client.GetNode());
237 
238     if (cl == m_Clients.end())
239         NCBI_THROW(CNetScheduleException, eInternalError,
240                    "Cannot find client '" + client.GetNode() +
241                    "' to register a job");
242 
243     cl->second.RegisterJob(job_id, cmd_group);
244 }
245 
246 
247 // Updates the client blacklisted jobs list.
248 // No need to check session id, it's done in Touch()
RegisterBlacklistedJob(const CNSClientId & client,unsigned int job_id,ECommandGroup cmd_group)249 void  CNSClientsRegistry::RegisterBlacklistedJob(const CNSClientId &  client,
250                                                  unsigned int         job_id,
251                                                  ECommandGroup        cmd_group)
252 {
253     // Check if it is an old-style client
254     if (!client.IsComplete())
255         return;
256 
257     CMutexGuard                 guard(m_Lock);
258     map< string,
259          CNSClient >::iterator  cl = m_Clients.find(client.GetNode());
260 
261     if (cl == m_Clients.end())
262         NCBI_THROW(CNetScheduleException, eInternalError,
263                    "Cannot find client '" + client.GetNode() +
264                    "' to set blacklisted job");
265 
266     cl->second.RegisterBlacklistedJob(job_id, cmd_group);
267 }
268 
269 
270 // Updates the client jobs list.
271 // No need to check session id, it's done in Touch()
UnregisterJob(const CNSClientId & client,unsigned int job_id,ECommandGroup cmd_group)272 void  CNSClientsRegistry::UnregisterJob(const CNSClientId &  client,
273                                         unsigned int         job_id,
274                                         ECommandGroup        cmd_group)
275 {
276     // Check if it is an old-style client
277     if (!client.IsComplete())
278         return;
279 
280     CMutexGuard                 guard(m_Lock);
281     map< string,
282          CNSClient >::iterator  cl = m_Clients.find(client.GetNode());
283 
284     if (cl != m_Clients.end())
285         cl->second.UnregisterJob(job_id, cmd_group);
286 }
287 
288 
289 // Updates the client jobs list.
290 // Used when a job is timed out. At this moment there is no information about
291 // the node identifier so all the clients are checked.
UnregisterJob(unsigned int job_id,ECommandGroup cmd_group)292 void  CNSClientsRegistry::UnregisterJob(unsigned int   job_id,
293                                         ECommandGroup  cmd_group)
294 {
295     // The container itself is not changed.
296     // The changes are in what the element holds.
297     CMutexGuard                         guard(m_Lock);
298     map< string, CNSClient >::iterator  k = m_Clients.begin();
299 
300     for ( ; k != m_Clients.end(); ++k)
301         k->second.UnregisterJob(job_id, cmd_group);
302 }
303 
304 
305 // Moves the client job into a blacklist.
306 // No need to check session id, it's done in Touch()
MoveJobToBlacklist(const CNSClientId & client,unsigned int job_id,ECommandGroup cmd_group)307 void  CNSClientsRegistry::MoveJobToBlacklist(const CNSClientId &  client,
308                                              unsigned int         job_id,
309                                              ECommandGroup        cmd_group)
310 {
311     // Check if it is an old-style client
312     if (!client.IsComplete())
313         return;
314 
315     CMutexGuard                 guard(m_Lock);
316     map< string,
317          CNSClient >::iterator  cl = m_Clients.find(client.GetNode());
318 
319     if (cl != m_Clients.end())
320         cl->second.MoveJobToBlacklist(job_id, cmd_group);
321 }
322 
323 
324 // Updates the client job list and its blacklist.
325 // Used when  the job is timed out and rescheduled as well as when
326 // RDRB/RETURN or FRED/FPUT are received
MoveJobToBlacklist(unsigned int job_id,ECommandGroup cmd_group)327 void  CNSClientsRegistry::MoveJobToBlacklist(unsigned int   job_id,
328                                              ECommandGroup  cmd_group)
329 {
330     // The container itself is not changed.
331     // The changes are in what the element holds.
332     CMutexGuard                         guard(m_Lock);
333     map< string, CNSClient >::iterator  k = m_Clients.begin();
334 
335     for ( ; k != m_Clients.end(); ++k)
336         if (k->second.MoveJobToBlacklist(job_id, cmd_group))
337             break;
338 }
339 
340 
341 
342 // Used in the following situations:
343 // - CLRN
344 // - new session
345 // - GC for a client (it is not strictly required because it may happened only
346 //   for inactive clients, i.e. timeouted or after CLRN
347 void
ClearClient(const CNSClientId & client,TNSBitVector & running_jobs,TNSBitVector & reading_jobs,bool & client_was_found,string & old_session,bool & had_wn_pref_affs,bool & had_reader_pref_affs)348 CNSClientsRegistry::ClearClient(const CNSClientId &    client,
349                                 TNSBitVector &         running_jobs,
350                                 TNSBitVector &         reading_jobs,
351                                 bool &                 client_was_found,
352                                 string &               old_session,
353                                 bool &                 had_wn_pref_affs,
354                                 bool &                 had_reader_pref_affs)
355 {
356     client_was_found = false;
357 
358     // Check if it is an old-style client
359     if (client.IsComplete())
360         ClearClient(client.GetNode(), running_jobs, reading_jobs,
361                     client_was_found, old_session, had_wn_pref_affs,
362                     had_reader_pref_affs);
363 }
364 
365 
366 void
ClearClient(const string & node_name,TNSBitVector & running_jobs,TNSBitVector & reading_jobs,bool & client_was_found,string & old_session,bool & had_wn_pref_affs,bool & had_reader_pref_affs)367 CNSClientsRegistry::ClearClient(const string &         node_name,
368                                 TNSBitVector &         running_jobs,
369                                 TNSBitVector &         reading_jobs,
370                                 bool &                 client_was_found,
371                                 string &               old_session,
372                                 bool &                 had_wn_pref_affs,
373                                 bool &                 had_reader_pref_affs)
374 {
375     client_was_found = false;
376 
377     CMutexGuard                         guard(m_Lock);
378     map< string, CNSClient >::iterator  cl = m_Clients.find(node_name);
379 
380     if (cl != m_Clients.end()) {
381         client_was_found = true;
382         old_session = cl->second.GetSession();
383         cl->second.SetSession("");
384         cl->second.SetState(CNSClient::eQuit);
385         cl->second.SetClaimedType(eClaimedNotProvided);
386         x_ClearClient(node_name, cl->second, running_jobs,
387                       had_wn_pref_affs, eGet);
388         x_ClearClient(node_name, cl->second, reading_jobs,
389                       had_reader_pref_affs, eRead);
390     }
391 }
392 
393 
394 // Must be called under a lock
395 // When a client inactivity timeout is detected there is no need to touch the
396 // running or reading jobs. The only things to be done are:
397 // - cancel waiting
398 // - reset pref affinities
399 // - set reset affinity flag
400 // - log the event
401 void
ClearOnTimeout(CNSClient & client,const string & client_node,bool is_log,ECommandGroup cmd_group)402 CNSClientsRegistry::ClearOnTimeout(CNSClient &      client,
403                                    const string &   client_node,
404                                    bool             is_log,
405                                    ECommandGroup    cmd_group)
406 {
407     // Deregister preferred affinities
408     bool    had_pref_affs = client.HasPreferredAffinities(cmd_group);
409     if (had_pref_affs) {
410         m_AffRegistry->RemoveClientFromAffinities(
411                                     client.GetID(),
412                                     client.GetPreferredAffinities(cmd_group),
413                                     cmd_group);
414         client.ClearPreferredAffinities(cmd_group);
415 
416         if (is_log) {
417             string      aff_part = "get";
418             if (cmd_group == eRead)
419                 aff_part = "read";
420 
421             CRef<CRequestContext>   ctx;
422             ctx.Reset(new CRequestContext());
423             ctx->SetRequestID();
424             GetDiagContext().SetRequestContext(ctx);
425             GetDiagContext().PrintRequestStart()
426                             .Print("_type", "client_watch")
427                             .Print("client_node", client_node)
428                             .Print("client_session", client.GetSession())
429                             .Print(aff_part + "_preferred_affinities_reset",
430                                    "yes");
431             ctx->SetRequestStatus(CNetScheduleHandler::eStatus_OK);
432             GetDiagContext().PrintRequestStop();
433         }
434 
435         client.SetAffinityReset(true, cmd_group);
436     }
437 
438     CancelWaiting(client, cmd_group);
439     if (had_pref_affs)
440         x_BuildAffinities(cmd_group);
441 }
442 
443 
444 string
PrintClientsList(const CQueue * queue,size_t batch_size,bool verbose) const445 CNSClientsRegistry::PrintClientsList(const CQueue *               queue,
446                                      size_t                       batch_size,
447                                      bool                         verbose) const
448 {
449     TNSBitVector        batch;
450     string              result;
451 
452     TNSBitVector                registered_clients = GetRegisteredClients();
453     TNSBitVector::enumerator    en(registered_clients.first());
454 
455     while (en.valid()) {
456         batch.set_bit(*en);
457         ++en;
458 
459         if (batch.count() >= batch_size) {
460             result += x_PrintSelected(batch, queue, verbose) + "\n";
461             batch.clear();
462         }
463     }
464 
465     if (batch.count() > 0)
466         result += x_PrintSelected(batch, queue, verbose) + "\n";
467     return result;
468 }
469 
470 
471 string
x_PrintSelected(const TNSBitVector & batch,const CQueue * queue,bool verbose) const472 CNSClientsRegistry::x_PrintSelected(const TNSBitVector &  batch,
473                                     const CQueue *        queue,
474                                     bool                  verbose) const
475 {
476     string      buffer;
477     size_t      printed = 0;
478 
479     CMutexGuard                                 guard(m_Lock);
480     map< string, CNSClient >::const_iterator    k = m_Clients.begin();
481 
482     for ( ; k != m_Clients.end(); ++k) {
483         if (batch.get_bit(k->second.GetID())) {
484             buffer += k->second.Print(k->first, queue, *m_AffRegistry,
485                                       m_GCWNodeClients, m_GCReaderClients,
486                                       verbose);
487             ++printed;
488             if (printed >= batch.count())
489                 break;
490         }
491     }
492 
493     return buffer;
494 }
495 
496 
497 void
SetNodeWaiting(const CNSClientId & client,unsigned short port,const TNSBitVector & aff_ids,ECommandGroup cmd_group)498 CNSClientsRegistry::SetNodeWaiting(const CNSClientId &   client,
499                                    unsigned short        port,
500                                    const TNSBitVector &  aff_ids,
501                                    ECommandGroup         cmd_group)
502 {
503     // Check if it is an old-style client
504     if (!client.IsComplete())
505         return;
506 
507     CMutexGuard                 guard(m_Lock);
508     map< string,
509          CNSClient >::iterator  node = m_Clients.find(client.GetNode());
510 
511     if (node == m_Clients.end())
512         NCBI_THROW(CNetScheduleException, eInternalError,
513                    "Cannot find client '" + client.GetNode() +
514                    "' to set waiting attributes");
515 
516     node->second.SetWaitPort(port, cmd_group);
517     node->second.SetWaitAffinities(aff_ids, cmd_group);
518     m_AffRegistry->SetWaitClientForAffinities(node->second.GetID(), aff_ids,
519                                               cmd_group);
520 }
521 
522 
523 // Handles the following cases:
524 // - GET2/READ with waiting has been interrupted by another GET2/READ
525 // - CWGET/CWREAD received
526 // - GET2/READ wait timeout is over
527 // - notification timeout is over
CancelWaiting(CNSClient & client,ECommandGroup cmd_group,bool touch_notif_registry)528 bool CNSClientsRegistry::CancelWaiting(CNSClient &    client,
529                                        ECommandGroup  cmd_group,
530                                        bool           touch_notif_registry)
531 {
532     bool        ret_val = false;
533     // Deregister wait affinities if so
534     if (client.HasWaitAffinities(cmd_group))
535         m_AffRegistry->RemoveWaitClientFromAffinities(
536                                             client.GetID(),
537                                             client.GetWaitAffinities(cmd_group),
538                                             cmd_group);
539 
540     // Remove from notifications if wait port is non-zero
541     unsigned short  port = client.GetWaitPort(cmd_group);
542     if (port != 0) {
543         ret_val = true;
544 
545         // One of the cases is when a notification timeout is over.
546         // In this case it is detected in the notification registry and the
547         // corresponding record is already deleted.
548         if (touch_notif_registry)
549             m_NotifRegistry->UnregisterListener(client.GetPeerAddress(), port,
550                                                 cmd_group);
551     }
552 
553     // Clear affinities and port
554     client.CancelWaiting(cmd_group);
555     return ret_val;
556 }
557 
558 
559 bool
CancelWaiting(const CNSClientId & client,ECommandGroup cmd_group)560 CNSClientsRegistry::CancelWaiting(const CNSClientId &  client,
561                                   ECommandGroup        cmd_group)
562 {
563     // Check if it is an old-style client
564     if (client.IsComplete())
565         return CancelWaiting(client.GetNode(), cmd_group);
566     return false;
567 }
568 
569 
570 bool
CancelWaiting(const string & node_name,ECommandGroup cmd_group,bool touch_notif_registry)571 CNSClientsRegistry::CancelWaiting(const string &  node_name,
572                                   ECommandGroup   cmd_group,
573                                   bool            touch_notif_registry)
574 {
575     if (node_name.empty())
576         return false;
577 
578     CMutexGuard                         guard(m_Lock);
579     map< string, CNSClient >::iterator  cl = m_Clients.find(node_name);
580 
581     if (cl != m_Clients.end())
582         return CancelWaiting(cl->second, cmd_group, touch_notif_registry);
583     return false;
584 }
585 
586 
587 void
SubtractBlacklistedJobs(const CNSClientId & client,ECommandGroup cmd_group,TNSBitVector & bv) const588 CNSClientsRegistry::SubtractBlacklistedJobs(
589                                 const CNSClientId &  client,
590                                 ECommandGroup        cmd_group,
591                                 TNSBitVector &       bv) const
592 {
593     if (client.IsComplete())
594         SubtractBlacklistedJobs(client.GetNode(), cmd_group, bv);
595 }
596 
597 
598 void
SubtractBlacklistedJobs(const string & client_node,ECommandGroup cmd_group,TNSBitVector & bv) const599 CNSClientsRegistry::SubtractBlacklistedJobs(
600                                 const string &  client_node,
601                                 ECommandGroup   cmd_group,
602                                 TNSBitVector &  bv) const
603 {
604     CMutexGuard                         guard(m_Lock);
605     map< string,
606          CNSClient >::const_iterator    found = m_Clients.find(client_node);
607 
608     if (found != m_Clients.end())
609         found->second.SubtractBlacklistedJobs(cmd_group, bv);
610 }
611 
612 
613 void
GetScopes(const string & client_node,string & scope,string & virtual_scope)614 CNSClientsRegistry::GetScopes(const string &  client_node,
615                               string &  scope, string &  virtual_scope)
616 {
617     scope.clear();
618     virtual_scope.clear();
619 
620     CMutexGuard                         guard(m_Lock);
621     map< string,
622          CNSClient >::const_iterator    found = m_Clients.find(client_node);
623 
624     if (found != m_Clients.end()) {
625         scope = found->second.GetLastScope();
626         virtual_scope = found->second.GetVirtualScope(client_node);
627     }
628 }
629 
630 
631 void
AddBlacklistedJobs(const CNSClientId & client,ECommandGroup cmd_group,TNSBitVector & bv) const632 CNSClientsRegistry::AddBlacklistedJobs(
633                                 const CNSClientId &  client,
634                                 ECommandGroup        cmd_group,
635                                 TNSBitVector &       bv) const
636 {
637     if (client.IsComplete())
638         AddBlacklistedJobs(client.GetNode(), cmd_group, bv);
639 }
640 
641 
642 void
AddBlacklistedJobs(const string & client_node,ECommandGroup cmd_group,TNSBitVector & bv) const643 CNSClientsRegistry::AddBlacklistedJobs(
644                                 const string &  client_node,
645                                 ECommandGroup   cmd_group,
646                                 TNSBitVector &  bv) const
647 {
648     CMutexGuard                         guard(m_Lock);
649     map< string,
650          CNSClient >::const_iterator    found = m_Clients.find(client_node);
651 
652     if (found != m_Clients.end())
653         found->second.AddBlacklistedJobs(cmd_group, bv);
654 }
655 
656 
657 TNSBitVector
GetPreferredAffinities(const CNSClientId & client,ECommandGroup cmd_group) const658 CNSClientsRegistry::GetPreferredAffinities(const CNSClientId &  client,
659                                            ECommandGroup        cmd_group) const
660 {
661     if (!client.IsComplete())
662         return kEmptyBitVector;
663     return GetPreferredAffinities(client.GetNode(), cmd_group);
664 }
665 
666 
667 TNSBitVector
GetPreferredAffinities(const string & node,ECommandGroup cmd_group) const668 CNSClientsRegistry::GetPreferredAffinities(const string &  node,
669                                            ECommandGroup   cmd_group) const
670 {
671     if (node.empty())
672         return kEmptyBitVector;
673 
674     CMutexGuard                                 guard(m_Lock);
675     map< string, CNSClient >::const_iterator    found = m_Clients.find(node);
676 
677     if (found == m_Clients.end())
678         return kEmptyBitVector;
679     return found->second.GetPreferredAffinities(cmd_group);
680 }
681 
682 
683 TNSBitVector
GetAllPreferredAffinities(ECommandGroup cmd_group) const684 CNSClientsRegistry::GetAllPreferredAffinities(ECommandGroup   cmd_group) const
685 {
686     CMutexGuard     guard(m_Lock);
687 
688     if (cmd_group == eGet)
689         return m_WNodeAffinities;
690     return m_ReaderAffinities;
691 }
692 
693 
694 TNSBitVector
GetWaitAffinities(const CNSClientId & client,ECommandGroup cmd_group) const695 CNSClientsRegistry::GetWaitAffinities(const CNSClientId &  client,
696                                       ECommandGroup        cmd_group) const
697 {
698     if (!client.IsComplete())
699         return kEmptyBitVector;
700     return GetWaitAffinities(client.GetNode(), cmd_group);
701 }
702 
703 
704 TNSBitVector
GetWaitAffinities(const string & node,ECommandGroup cmd_group) const705 CNSClientsRegistry::GetWaitAffinities(const string &  node,
706                                       ECommandGroup   cmd_group) const
707 {
708     if (node.empty())
709         return kEmptyBitVector;
710 
711     CMutexGuard                                 guard(m_Lock);
712     map< string, CNSClient >::const_iterator    found = m_Clients.find(node);
713 
714     if (found == m_Clients.end())
715         return kEmptyBitVector;
716     return found->second.GetWaitAffinities(cmd_group);
717 }
718 
719 
GetRegisteredClients(void) const720 TNSBitVector  CNSClientsRegistry::GetRegisteredClients(void) const
721 {
722     CMutexGuard         guard(m_Lock);
723     return m_RegisteredClients;
724 }
725 
726 
727 void
UpdatePreferredAffinities(const CNSClientId & client,const TNSBitVector & aff_to_add,const TNSBitVector & aff_to_del,ECommandGroup cmd_group)728 CNSClientsRegistry::UpdatePreferredAffinities(const CNSClientId &   client,
729                                               const TNSBitVector &  aff_to_add,
730                                               const TNSBitVector &  aff_to_del,
731                                               ECommandGroup         cmd_group)
732 {
733     if (!client.IsComplete())
734         return;
735 
736     CMutexGuard                 guard(m_Lock);
737     map< string,
738          CNSClient >::iterator  found = m_Clients.find(client.GetNode());
739 
740     if (found == m_Clients.end())
741         NCBI_THROW(CNetScheduleException, eInternalError,
742                    "Cannot find client '" + client.GetNode() +
743                    "' to update preferred affinities");
744 
745     m_AffRegistry->AddClientToAffinities(client.GetID(), aff_to_add, cmd_group);
746     m_AffRegistry->RemoveClientFromAffinities(client.GetID(), aff_to_del,
747                                               cmd_group);
748 
749     found->second.AddPreferredAffinities(aff_to_add, cmd_group);
750     found->second.RemovePreferredAffinities(aff_to_del, cmd_group);
751 
752     // Update the union bit vector with WN/reader affinities
753     if (aff_to_del.any())
754         x_BuildAffinities(cmd_group);
755     else {
756         if (aff_to_add.any()) {
757             if (cmd_group == eGet)
758                 m_WNodeAffinities |= aff_to_add;
759             else
760                 m_ReaderAffinities |= aff_to_add;
761         }
762     }
763 }
764 
765 
766 bool
UpdatePreferredAffinities(const CNSClientId & client,unsigned int aff_to_add,unsigned int aff_to_del,ECommandGroup cmd_group)767 CNSClientsRegistry::UpdatePreferredAffinities(const CNSClientId &   client,
768                                               unsigned int          aff_to_add,
769                                               unsigned int          aff_to_del,
770                                               ECommandGroup         cmd_group)
771 {
772     if (aff_to_add + aff_to_del == 0)
773         return false;
774 
775     if (!client.IsComplete())
776         return false;
777 
778     bool                        aff_added = false;
779     CMutexGuard                 guard(m_Lock);
780     map< string,
781          CNSClient >::iterator  found = m_Clients.find(client.GetNode());
782 
783     if (found == m_Clients.end())
784         NCBI_THROW(CNetScheduleException, eInternalError,
785                    "Cannot find client '" + client.GetNode() +
786                    "' to update preferred affinities");
787 
788     if (aff_to_add != 0) {
789         m_AffRegistry->AddClientToAffinity(client.GetID(), aff_to_add,
790                                            cmd_group);
791         aff_added = found->second.AddPreferredAffinity(aff_to_add,
792                                                        cmd_group);
793     }
794 
795     if (aff_to_del != 0) {
796         m_AffRegistry->RemoveClientFromAffinities(client.GetID(), aff_to_del,
797                                               cmd_group);
798         found->second.RemovePreferredAffinity(aff_to_del, cmd_group);
799     }
800 
801     // Update the union bit vector with WN/reader affinities
802     if (aff_to_del != 0)
803         x_BuildAffinities(cmd_group);
804     else {
805         if (aff_added) {
806             if (cmd_group == eGet)
807                 m_WNodeAffinities.set_bit(aff_to_add);
808             else
809                 m_ReaderAffinities.set_bit(aff_to_add);
810         }
811     }
812     return aff_added;
813 }
814 
815 
816 void
SetPreferredAffinities(const CNSClientId & client,const TNSBitVector & aff_to_set,ECommandGroup cmd_group)817 CNSClientsRegistry::SetPreferredAffinities(const CNSClientId &   client,
818                                            const TNSBitVector &  aff_to_set,
819                                            ECommandGroup         cmd_group)
820 {
821     if (!client.IsComplete())
822         return;
823 
824     string                      client_name = client.GetNode();
825     CMutexGuard                 guard(m_Lock);
826     map< string,
827          CNSClient >::iterator  found = m_Clients.find(client_name);
828 
829     if (found == m_Clients.end())
830         NCBI_THROW(CNetScheduleException, eInternalError,
831                    "Cannot find client '" + client.GetNode() +
832                    "' to update preferred affinities");
833 
834     TNSBitVector    curr_affs = found->second.GetPreferredAffinities(cmd_group);
835     TNSBitVector    aff_to_add = aff_to_set - curr_affs;
836     TNSBitVector    aff_to_del = curr_affs - aff_to_set;
837 
838     if (aff_to_add.any())
839         m_AffRegistry->AddClientToAffinities(client.GetID(), aff_to_add,
840                                              cmd_group);
841 
842     if (aff_to_del.any())
843         m_AffRegistry->RemoveClientFromAffinities(client.GetID(), aff_to_del,
844                                               cmd_group);
845 
846     found->second.SetPreferredAffinities(aff_to_set, cmd_group);
847 
848     // Update the union bit vector with WN/reader affinities
849     if (aff_to_del.any())
850         x_BuildAffinities(cmd_group);
851     else {
852         if (aff_to_add.any()) {
853             if (cmd_group == eGet)
854                 m_WNodeAffinities |= aff_to_add;
855             else
856                 m_ReaderAffinities |= aff_to_add;
857         }
858     }
859 
860     // Remove the client from the GC collected, because it affects affinities
861     // only which are re-set here anyway
862     if (cmd_group == eGet)
863         m_GCWNodeClients.erase(client_name);
864     else
865         m_GCReaderClients.erase(client_name);
866 }
867 
868 
869 bool
IsRequestedAffinity(const string & name,const TNSBitVector & aff,bool use_preferred,ECommandGroup cmd_group) const870 CNSClientsRegistry::IsRequestedAffinity(
871                             const string &         name,
872                             const TNSBitVector &   aff,
873                             bool                   use_preferred,
874                             ECommandGroup          cmd_group) const
875 {
876     if (name.empty())
877         return false;
878 
879     CMutexGuard                         guard(m_Lock);
880     map< string,
881          CNSClient >::const_iterator    node = m_Clients.find(name);
882 
883     if (node == m_Clients.end())
884         return false;
885 
886     return node->second.IsRequestedAffinity(aff, use_preferred, cmd_group);
887 }
888 
889 
890 bool
IsPreferredByAny(unsigned int aff_id,ECommandGroup cmd_group) const891 CNSClientsRegistry::IsPreferredByAny(unsigned int   aff_id,
892                                      ECommandGroup  cmd_group) const
893 {
894     CMutexGuard     guard(m_Lock);
895     if (cmd_group == eGet)
896         return m_WNodeAffinities.get_bit(aff_id);
897     return m_ReaderAffinities.get_bit(aff_id);
898 }
899 
900 
901 bool
GetAffinityReset(const CNSClientId & client,ECommandGroup cmd_group) const902 CNSClientsRegistry::GetAffinityReset(const CNSClientId &   client,
903                                      ECommandGroup         cmd_group) const
904 {
905     if (!client.IsComplete())
906         return false;
907 
908     CMutexGuard                                 guard(m_Lock);
909     map< string, CNSClient >::const_iterator    found =
910                                              m_Clients.find(client.GetNode());
911 
912     if (found == m_Clients.end())
913         return false;
914 
915     return found->second.GetAffinityReset(cmd_group);
916 }
917 
918 
GetNodeName(unsigned int id) const919 string  CNSClientsRegistry::GetNodeName(unsigned int  id) const
920 {
921     CMutexGuard                                 guard(m_Lock);
922     map< string, CNSClient >::const_iterator    k = m_Clients.begin();
923 
924     for ( ; k != m_Clients.end(); ++k )
925         if (k->second.GetID() == id)
926             return k->first;
927     return "";
928 }
929 
930 
931 // true if the node could be made stale
932 bool
x_CouldBeStale(const CNSPreciseTime & current_time,const CNSPreciseTime & timeout,const CNSClient & client,ECommandGroup cmd_group)933 CNSClientsRegistry::x_CouldBeStale(const CNSPreciseTime &  current_time,
934                                    const CNSPreciseTime &  timeout,
935                                    const CNSClient &       client,
936                                    ECommandGroup           cmd_group)
937 {
938     if (current_time - client.GetLastAccess() <= timeout)
939         return false;
940 
941     // The client may wait on GET2/READ longer than the configured timeout.
942     // Take the longest timeout of two if so.
943     unsigned short      wait_port = client.GetWaitPort(cmd_group);
944     unsigned int        wait_address = client.GetPeerAddress();
945     if (wait_port != 0) {
946         CNSPreciseTime      get_lifetime = m_NotifRegistry->
947                                 GetPassiveNotificationLifetime(wait_address,
948                                                                wait_port,
949                                                                cmd_group);
950         if (current_time <= get_lifetime)
951             return false;
952     }
953     return true;
954 }
955 
956 
957 void
StaleNodes(const CNSPreciseTime & current_time,const CNSPreciseTime & wn_timeout,const CNSPreciseTime & reader_timeout,bool is_log)958 CNSClientsRegistry::StaleNodes(const CNSPreciseTime &  current_time,
959                                const CNSPreciseTime &  wn_timeout,
960                                const CNSPreciseTime &  reader_timeout,
961                                bool                    is_log)
962 {
963     // Checks if any of the worker nodes are inactive for too long
964     CNSClient::ENSClientState               state;
965     unsigned int                            type;
966     CMutexGuard                             guard(m_Lock);
967     map< string, CNSClient >::iterator      k = m_Clients.begin();
968 
969     for ( ; k != m_Clients.end(); ++k ) {
970         state = k->second.GetState();
971         if (state == CNSClient::eQuit || state == CNSClient::eWNAndReaderStale)
972             continue;
973 
974         type = k->second.GetType();
975         if ((type & (CNSClient::eWorkerNode | CNSClient::eReader)) == 0)
976             continue;
977 
978         if ((type & CNSClient::eWorkerNode) != 0 &&
979             state != CNSClient::eWNStale) {
980             // This an active WN which should be checked
981             if (x_CouldBeStale(current_time, wn_timeout,
982                                k->second, eGet)) {
983                 ClearOnTimeout(k->second, k->first, is_log, eGet);
984                 if (state == CNSClient::eActive)
985                     k->second.SetState(CNSClient::eWNStale);
986                 else
987                     k->second.SetState(CNSClient::eWNAndReaderStale);
988             }
989         }
990 
991         if ((type & CNSClient::eReader) != 0 &&
992             state != CNSClient::eReaderStale) {
993             // This an active reader which should be checked
994             if (x_CouldBeStale(current_time, reader_timeout,
995                                k->second, eRead)) {
996                 ClearOnTimeout(k->second, k->first, is_log, eRead);
997                 if (state == CNSClient::eActive)
998                     k->second.SetState(CNSClient::eReaderStale);
999                 else
1000                     k->second.SetState(CNSClient::eWNAndReaderStale);
1001             }
1002         }
1003     }
1004 }
1005 
1006 
Purge(const CNSPreciseTime & current_time,const CNSPreciseTime & timeout_worker_node,unsigned int min_worker_nodes,const CNSPreciseTime & timeout_admin,unsigned int min_admins,const CNSPreciseTime & timeout_submitter,unsigned int min_submitters,const CNSPreciseTime & timeout_reader,unsigned int min_readers,const CNSPreciseTime & timeout_unknown,unsigned int min_unknowns,bool is_log)1007 void  CNSClientsRegistry::Purge(const CNSPreciseTime &  current_time,
1008                                 const CNSPreciseTime &  timeout_worker_node,
1009                                 unsigned int            min_worker_nodes,
1010                                 const CNSPreciseTime &  timeout_admin,
1011                                 unsigned int            min_admins,
1012                                 const CNSPreciseTime &  timeout_submitter,
1013                                 unsigned int            min_submitters,
1014                                 const CNSPreciseTime &  timeout_reader,
1015                                 unsigned int            min_readers,
1016                                 const CNSPreciseTime &  timeout_unknown,
1017                                 unsigned int            min_unknowns,
1018                                 bool                    is_log)
1019 {
1020     CMutexGuard     guard(m_Lock);
1021 
1022     x_PurgeWNodesAndReaders(current_time, timeout_worker_node, min_worker_nodes,
1023                             timeout_reader, min_readers, is_log);
1024     x_PurgeAdmins(current_time, timeout_admin, min_admins, is_log);
1025     x_PurgeSubmitters(current_time, timeout_submitter, min_submitters, is_log);
1026     x_PurgeUnknowns(current_time, timeout_unknown, min_unknowns, is_log);
1027 }
1028 
1029 
x_GetNextID(void)1030 unsigned int  CNSClientsRegistry::x_GetNextID(void)
1031 {
1032     CFastMutexGuard     guard(m_LastIDLock);
1033 
1034     // 0 is an invalid value, so make sure the ID != 0
1035     ++m_LastID;
1036     if (m_LastID == 0)
1037         m_LastID = 1;
1038     return m_LastID;
1039 }
1040 
1041 
1042 // Must be called under the lock
x_BuildAffinities(ECommandGroup cmd_group)1043 void  CNSClientsRegistry::x_BuildAffinities(ECommandGroup  cmd_group)
1044 {
1045     if (cmd_group == eRead) {
1046         m_ReaderAffinities.clear();
1047         for (map< string, CNSClient >::const_iterator
1048                 k = m_Clients.begin(); k != m_Clients.end(); ++k )
1049             if (k->second.GetType() & CNSClient::eReader)
1050                 m_ReaderAffinities |= k->second.GetPreferredAffinities(eRead);
1051     } else {
1052         m_WNodeAffinities.clear();
1053         for (map< string, CNSClient >::const_iterator
1054                 k = m_Clients.begin(); k != m_Clients.end(); ++k )
1055             if (k->second.GetType() & CNSClient::eWorkerNode)
1056                 m_WNodeAffinities |= k->second.GetPreferredAffinities(eGet);
1057     }
1058 }
1059 
1060 
1061 bool
WasGarbageCollected(const CNSClientId & client,ECommandGroup cmd_group) const1062 CNSClientsRegistry::WasGarbageCollected(const CNSClientId &  client,
1063                                         ECommandGroup        cmd_group) const
1064 {
1065     if (!client.IsComplete())
1066         return false;
1067 
1068     CMutexGuard                                 guard(m_Lock);
1069     if (cmd_group == eGet)
1070         return m_GCWNodeClients.find(client.GetNode()) !=
1071                m_GCWNodeClients.end();
1072     return m_GCReaderClients.find(client.GetNode()) !=
1073            m_GCReaderClients.end();
1074 }
1075 
1076 
1077 // Must be called under the lock
1078 void
x_ClearClient(const string & node_name,CNSClient & client,TNSBitVector & jobs,bool & had_pref_affs,ECommandGroup cmd_group)1079 CNSClientsRegistry::x_ClearClient(const string &     node_name,
1080                                   CNSClient &        client,
1081                                   TNSBitVector &     jobs,
1082                                   bool &             had_pref_affs,
1083                                   ECommandGroup      cmd_group)
1084 {
1085     jobs = client.GetJobs(cmd_group);
1086     client.ClearJobs(cmd_group);
1087 
1088     // Deregister preferred affinities
1089     had_pref_affs = client.HasPreferredAffinities(cmd_group);
1090     if (had_pref_affs) {
1091         m_AffRegistry->RemoveClientFromAffinities(
1092                                     client.GetID(),
1093                                     client.GetPreferredAffinities(cmd_group),
1094                                     cmd_group);
1095         client.ClearPreferredAffinities(cmd_group);
1096     }
1097 
1098     CancelWaiting(client, cmd_group);
1099     if (had_pref_affs)
1100         x_BuildAffinities(cmd_group);
1101 
1102     // Client has gone. No need to remember it if it was
1103     // garbage collected previously.
1104     if (cmd_group == eGet)
1105         m_GCWNodeClients.erase(node_name);
1106     else
1107         m_GCReaderClients.erase(node_name);
1108 }
1109 
1110 
1111 // Compares last access time of two clients basing on their identifiers
1112 class AgeFunctor
1113 {
1114     public:
AgeFunctor(map<string,CNSClient> & clients)1115         AgeFunctor(map< string, CNSClient > &  clients) :
1116             m_Clients(clients)
1117         {}
1118 
operator ()(const string & lhs,const string & rhs)1119         bool operator () (const string &  lhs, const string &  rhs)
1120         {
1121             return m_Clients[ lhs ].GetLastAccess() <
1122                    m_Clients[ rhs ].GetLastAccess();
1123         }
1124 
1125     private:
1126         map< string, CNSClient > &    m_Clients;
1127 };
1128 
1129 
1130 // Must be called under the lock
1131 void
x_PurgeWNodesAndReaders(const CNSPreciseTime & current_time,const CNSPreciseTime & timeout_worker_node,unsigned int min_worker_nodes,const CNSPreciseTime & timeout_reader,unsigned int min_readers,bool is_log)1132 CNSClientsRegistry::x_PurgeWNodesAndReaders(
1133                             const CNSPreciseTime &  current_time,
1134                             const CNSPreciseTime &  timeout_worker_node,
1135                             unsigned int            min_worker_nodes,
1136                             const CNSPreciseTime &  timeout_reader,
1137                             unsigned int            min_readers,
1138                             bool                    is_log)
1139 {
1140     map< string, CNSClient >::iterator  k = m_Clients.begin();
1141     list< string >                      inactive_wns;
1142     list< string >                      inactive_readers;
1143     unsigned int                        total_wn_count = 0;
1144     unsigned int                        total_reader_count = 0;
1145     unsigned int                        type;
1146     CNSClient::ENSClientState           state;
1147 
1148     // Count total WNs and readers as well as detect purge candidates
1149     for ( ; k != m_Clients.end(); ++k ) {
1150         type = k->second.GetType();
1151 
1152         if ((type & CNSClient::eWorkerNode) != 0)
1153             ++total_wn_count;
1154         else if ((type & CNSClient::eReader) != 0)
1155             ++total_reader_count;
1156         else
1157             continue;
1158 
1159         state = k->second.GetState();
1160         if (state == CNSClient::eActive)
1161             continue;
1162 
1163         if ((type & (CNSClient::eWorkerNode | CNSClient::eReader)) ==
1164                 CNSClient::eWorkerNode) {
1165             // The client is exclusively a worker node
1166             if (state != CNSClient::eWNStale &&
1167                 state != CNSClient::eWNAndReaderStale &&
1168                 state != CNSClient::eQuit)
1169                 continue;
1170 
1171             // Test if the wn timeout is over
1172             if (current_time - k->second.GetLastAccess() >
1173                     timeout_worker_node)
1174                 inactive_wns.push_back(k->first);
1175 
1176         } else if ((type & (CNSClient::eWorkerNode | CNSClient::eReader)) ==
1177                 CNSClient::eReader) {
1178             // The client is exclusively a reader
1179             if (state != CNSClient::eReaderStale &&
1180                 state != CNSClient::eWNAndReaderStale &&
1181                 state != CNSClient::eQuit)
1182                 continue;
1183 
1184             // Test if the reader timeout is over
1185             if (current_time - k->second.GetLastAccess() >
1186                     timeout_reader)
1187                 inactive_readers.push_back(k->first);
1188 
1189         } else {
1190             // The client is both a worker node and a reader
1191             if (state != CNSClient::eWNAndReaderStale &&
1192                 state != CNSClient::eQuit)
1193                 continue;
1194 
1195             // Test bothe WN and reader timeouts
1196             if (current_time - k->second.GetLastAccess() >
1197                     timeout_worker_node &&
1198                 current_time - k->second.GetLastAccess() >
1199                     timeout_reader) {
1200                 inactive_wns.push_back(k->first);
1201                 inactive_readers.push_back(k->first);
1202             }
1203         }
1204     }
1205 
1206     // Deal with worker nodes first
1207     if (total_wn_count > min_worker_nodes  &&  ! inactive_wns.empty()) {
1208 
1209         // Calculate the number of records to be deleted
1210         unsigned int    active_count = total_wn_count - inactive_wns.size();
1211         unsigned int    remove_count = 0;
1212 
1213         if (active_count >= min_worker_nodes)
1214             remove_count = inactive_wns.size();
1215         else
1216             remove_count = total_wn_count - min_worker_nodes;
1217 
1218         // Sort the inactive ones to delete the oldest
1219         inactive_wns.sort(AgeFunctor(m_Clients));
1220 
1221         // Delete the oldest
1222         for (list<string>::iterator  j = inactive_wns.begin();
1223              j != inactive_wns.end() && remove_count > 0; ++j, --remove_count) {
1224 
1225             // The affinity reset flag is NOT set if:
1226             // - the client issued CLEAR
1227             // - the client did not have preferred affinities
1228             // It makes sense to memo the client only if it had preferred
1229             // affinities.
1230             if (m_Clients[*j].GetAffinityReset(eGet))
1231                 m_GCWNodeClients.insert(*j);
1232             if (m_Clients[*j].GetAffinityReset(eRead))
1233                 m_GCReaderClients.insert(*j);
1234 
1235             if (m_GCWNodeClients.size() > 100000)
1236                 ERR_POST("Garbage collected worker node list exceeds 100000 "
1237                          "records. There are currently " <<
1238                          m_GCWNodeClients.size() << " records.");
1239             if (m_GCReaderClients.size() > 100000)
1240                 ERR_POST("Garbage collected reader list exceeds 100000 "
1241                          "records. There are currently " <<
1242                          m_GCReaderClients.size() << " records.");
1243 
1244             list<string>::iterator  found = find(inactive_readers.begin(),
1245                                                  inactive_readers.end(), *j);
1246             if (found != inactive_readers.end()) {
1247                 // That was a reader too, so adjust the readers info
1248                 inactive_readers.erase(found);
1249                 --total_reader_count;
1250             }
1251 
1252             m_Clients.erase(*j);
1253         }
1254     }
1255 
1256     // Here: deal with readers consideraing that if it was a worker node too
1257     // then the client must not be removed.
1258     if (total_reader_count > min_readers  &&  ! inactive_readers.empty()) {
1259 
1260         // Calculate the number of records to be deleted
1261         unsigned int    active_count = total_reader_count -
1262                                                     inactive_readers.size();
1263         unsigned int    remove_count = 0;
1264 
1265         if (active_count >= min_readers)
1266             remove_count = inactive_readers.size();
1267         else
1268             remove_count = total_reader_count - min_readers;
1269 
1270         // Sort the inactive ones to delete the oldest
1271         inactive_readers.sort(AgeFunctor(m_Clients));
1272 
1273         // Delete the oldest
1274         for (list<string>::iterator  j = inactive_readers.begin();
1275              j != inactive_readers.end() && remove_count > 0; ++j) {
1276 
1277             if (find(inactive_wns.begin(), inactive_wns.end(), *j) !=
1278                                                             inactive_wns.end())
1279                 continue;   // keep the reader because of WN limitations
1280 
1281             // The affinity reset flag is NOT set if:
1282             // - the client issued CLEAR
1283             // - the client did not have preferred affinities
1284             // It makes sense to memo the client only if it had preferred
1285             // affinities.
1286             if (m_Clients[*j].GetAffinityReset(eRead))
1287                 m_GCReaderClients.insert(*j);
1288 
1289 
1290             if (m_GCReaderClients.size() > 100000)
1291                 ERR_POST("Garbage collected reader list exceeds 100000 "
1292                          "records. There are currently " <<
1293                          m_GCReaderClients.size() << " records.");
1294 
1295             m_Clients.erase(*j);
1296             --remove_count;
1297         }
1298     }
1299 }
1300 
1301 
1302 // Must be called under the lock
1303 void
x_PurgeAdmins(const CNSPreciseTime & current_time,const CNSPreciseTime & timeout_admin,unsigned int min_admins,bool is_log)1304 CNSClientsRegistry::x_PurgeAdmins(const CNSPreciseTime &  current_time,
1305                                   const CNSPreciseTime &  timeout_admin,
1306                                   unsigned int            min_admins,
1307                                   bool                    is_log)
1308 {
1309     x_PurgeInactiveClients(current_time, timeout_admin, min_admins,
1310                            CNSClient::eAdmin, is_log);
1311 }
1312 
1313 
1314 // Must be called under the lock
1315 void
x_PurgeSubmitters(const CNSPreciseTime & current_time,const CNSPreciseTime & timeout_submitter,unsigned int min_submitters,bool is_log)1316 CNSClientsRegistry::x_PurgeSubmitters(const CNSPreciseTime &  current_time,
1317                                       const CNSPreciseTime &  timeout_submitter,
1318                                       unsigned int            min_submitters,
1319                                       bool                    is_log)
1320 {
1321     x_PurgeInactiveClients(current_time, timeout_submitter, min_submitters,
1322                            CNSClient::eSubmitter, is_log);
1323 }
1324 
1325 
1326 // Must be called under the lock
1327 void
x_PurgeUnknowns(const CNSPreciseTime & current_time,const CNSPreciseTime & timeout_unknown,unsigned int min_unknowns,bool is_log)1328 CNSClientsRegistry::x_PurgeUnknowns(const CNSPreciseTime &  current_time,
1329                                     const CNSPreciseTime &  timeout_unknown,
1330                                     unsigned int            min_unknowns,
1331                                     bool                    is_log)
1332 {
1333     x_PurgeInactiveClients(current_time, timeout_unknown, min_unknowns,
1334                            0, is_log);
1335 }
1336 
1337 
1338 void
x_PurgeInactiveClients(const CNSPreciseTime & current_time,const CNSPreciseTime & timeout,unsigned int min_clients,unsigned int client_type,bool is_log)1339 CNSClientsRegistry::x_PurgeInactiveClients(const CNSPreciseTime &  current_time,
1340                                            const CNSPreciseTime &  timeout,
1341                                            unsigned int            min_clients,
1342                                            unsigned int            client_type,
1343                                            bool                    is_log)
1344 {
1345     map< string, CNSClient >::iterator      k = m_Clients.begin();
1346     list< string >                          inactive;
1347     unsigned int                            inactive_count = 0;
1348     unsigned int                            total_count = 0;    // of given type
1349     unsigned int                            type;
1350 
1351     // Count active and inactive admins
1352     for ( ; k != m_Clients.end(); ++k ) {
1353         type = k->second.GetType();
1354 
1355         if (client_type == 0) {
1356             if (type != 0)
1357                 continue;
1358         } else {
1359             if ((type & client_type) == 0)
1360                 continue;
1361             // Worker nodes and readers are handled separately
1362             if (type & CNSClient::eWorkerNode || type & CNSClient::eReader)
1363                 continue;
1364         }
1365 
1366         ++total_count;
1367         if (current_time - k->second.GetLastAccess() > timeout) {
1368             ++inactive_count;
1369             inactive.push_back(k->first);
1370         }
1371     }
1372 
1373     if (total_count <= min_clients || inactive_count == 0)
1374         return;
1375 
1376     // Calculate the number of records to be deleted
1377     unsigned int    active_count = total_count - inactive_count;
1378     unsigned int    remove_count = 0;
1379     if (active_count >= min_clients)
1380         remove_count = inactive_count;
1381     else
1382         remove_count = total_count - min_clients;
1383 
1384     // Sort the inactive ones to delete the oldest
1385     inactive.sort(AgeFunctor(m_Clients));
1386 
1387     // Delete the oldest records
1388     for (list<string>::iterator  j = inactive.begin();
1389          j != inactive.end() && remove_count > 0; ++j, --remove_count) {
1390         m_Clients.erase(*j);
1391     }
1392 }
1393 
1394 END_NCBI_SCOPE
1395 
1396