1 /* $Id: ns_clients_registry.cpp 543877 2017-08-15 13:30:30Z satskyse $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Sergey Satskiy
27 *
28 * File Description:
29 * NetSchedule clients registry
30 *
31 */
32
33 #include <ncbi_pch.hpp>
34 #include <corelib/request_ctx.hpp>
35
36 #include "ns_clients_registry.hpp"
37 #include "ns_affinity.hpp"
38 #include "ns_handler.hpp"
39
40
41 BEGIN_NCBI_SCOPE
42
43
CNSClientsRegistry()44 CNSClientsRegistry::CNSClientsRegistry() :
45 m_LastID(0), m_BlacklistTimeout(), m_ReadBlacklistTimeout()
46 {}
47
48
SetRegistries(CNSAffinityRegistry * aff_registry,CNSNotificationList * notif_registry)49 void CNSClientsRegistry::SetRegistries(CNSAffinityRegistry * aff_registry,
50 CNSNotificationList * notif_registry)
51 {
52 m_AffRegistry = aff_registry;
53 m_NotifRegistry = notif_registry;
54 }
55
56
57 // Called before any command is issued by the client
58 void
Touch(CNSClientId & client,TNSBitVector & running_jobs,TNSBitVector & reading_jobs,bool & client_was_found,bool & session_was_reset,string & old_session,bool & had_wn_pref_affs,bool & had_reader_pref_affs)59 CNSClientsRegistry::Touch(CNSClientId & client,
60 TNSBitVector & running_jobs,
61 TNSBitVector & reading_jobs,
62 bool & client_was_found,
63 bool & session_was_reset,
64 string & old_session,
65 bool & had_wn_pref_affs,
66 bool & had_reader_pref_affs)
67 {
68 client_was_found = false;
69 session_was_reset = false;
70 old_session.clear();
71 had_wn_pref_affs = false;
72 had_reader_pref_affs = false;
73
74 // Check if it is an old-style client
75 if (!client.IsComplete())
76 return;
77
78 map< string, CNSClient >::iterator known_client;
79 CMutexGuard guard(m_Lock);
80
81 known_client = m_Clients.find(client.GetNode());
82 if (known_client == m_Clients.end()) {
83 // The client is not known yet
84 CNSClient new_ns_client(client, &m_BlacklistTimeout,
85 &m_ReadBlacklistTimeout);
86 unsigned int client_id = x_GetNextID();
87
88 new_ns_client.SetID(client_id);
89 client.SetID(client_id);
90 m_Clients[ client.GetNode() ] = new_ns_client;
91 m_RegisteredClients.set_bit(client_id);
92 return;
93 }
94
95 client_was_found = true;
96
97 // The client has connected before
98 client.SetID(known_client->second.GetID());
99 old_session = known_client->second.GetSession();
100 if (client.GetSession() != old_session) {
101 session_was_reset = true;
102 ClearClient(client, running_jobs, reading_jobs,
103 client_was_found, old_session, had_wn_pref_affs,
104 had_reader_pref_affs);
105 }
106
107 known_client->second.Touch(client);
108
109 // The 'reset' client type must not reset the collected types when the next
110 // command is issued
111 if (client.GetType() == eClaimedReset)
112 client.SetClientType(eClaimedAutodetect);
113 }
114
115
RegisterSocketWriteError(const CNSClientId & client)116 void CNSClientsRegistry::RegisterSocketWriteError(const CNSClientId & client)
117 {
118 // Check if it is an old-style client
119 if (!client.IsComplete())
120 return;
121
122 CMutexGuard guard(m_Lock);
123 map< string, CNSClient >::iterator cl = m_Clients.find(client.GetNode());
124 if (cl != m_Clients.end())
125 cl->second.RegisterSocketWriteError();
126 }
127
128
SetLastScope(const CNSClientId & client)129 void CNSClientsRegistry::SetLastScope(const CNSClientId & client)
130 {
131 // Check if it is an old-style client
132 if (!client.IsComplete())
133 return;
134
135 CMutexGuard guard(m_Lock);
136 map< string, CNSClient >::iterator cl = m_Clients.find(client.GetNode());
137 if (cl != m_Clients.end())
138 cl->second.SetLastScope(client.GetScope());
139 }
140
141
142 void
AppendType(const CNSClientId & client,unsigned int type_to_append)143 CNSClientsRegistry::AppendType(const CNSClientId & client,
144 unsigned int type_to_append)
145 {
146 // Check if it is an old-style client
147 if (!client.IsComplete())
148 return;
149
150 CMutexGuard guard(m_Lock);
151 map< string, CNSClient >::iterator cl = m_Clients.find(client.GetNode());
152 if (cl != m_Clients.end())
153 cl->second.AppendType(type_to_append);
154 }
155
156
157 void
GCBlacklistedJobs(const CJobStatusTracker & tracker,ECommandGroup cmd_group)158 CNSClientsRegistry::GCBlacklistedJobs(const CJobStatusTracker & tracker,
159 ECommandGroup cmd_group)
160 {
161 CMutexGuard guard(m_Lock);
162 map< string, CNSClient >::iterator k = m_Clients.begin();
163
164 for ( ; k != m_Clients.end(); ++k)
165 k->second.GCBlacklistedJobs(tracker, cmd_group);
166 }
167
168
169 int
SetClientData(const CNSClientId & client,const string & data,int data_version)170 CNSClientsRegistry::SetClientData(const CNSClientId & client,
171 const string & data, int data_version)
172 {
173 // Check if it is an old-style client
174 if (!client.IsComplete())
175 NCBI_THROW(CNetScheduleException, eInvalidParameter,
176 "only non-anonymous clients may set their data");
177
178 CMutexGuard guard(m_Lock);
179 map< string,
180 CNSClient >::iterator found = m_Clients.find(client.GetNode());
181
182 if (found == m_Clients.end())
183 NCBI_THROW(CNetScheduleException, eInternalError,
184 "Cannot find client '" + client.GetNode() +
185 "' to set client data");
186
187 return found->second.SetClientData(data, data_version);
188 }
189
190
MarkAsAdmin(const CNSClientId & client)191 void CNSClientsRegistry::MarkAsAdmin(const CNSClientId & client)
192 {
193 // Check if it is an old-style client
194 if (!client.IsComplete())
195 return;
196
197 CMutexGuard guard(m_Lock);
198 map< string,
199 CNSClient >::iterator admin = m_Clients.find(client.GetNode());
200
201 if (admin != m_Clients.end())
202 admin->second.MarkAsAdmin();
203 }
204
205
206 // Updates the submitter job.
207 // No need to check session id, it's done in Touch()
AddToSubmitted(const CNSClientId & client,size_t count)208 void CNSClientsRegistry::AddToSubmitted(const CNSClientId & client,
209 size_t count)
210 {
211 // Check if it is an old-style client
212 if (!client.IsComplete())
213 return;
214
215 CMutexGuard guard(m_Lock);
216 map< string,
217 CNSClient >::iterator submitter = m_Clients.find(client.GetNode());
218
219 if (submitter != m_Clients.end())
220 submitter->second.RegisterSubmittedJobs(count);
221 }
222
223
224 // Updates the client jobs list.
225 // No need to check session id, it's done in Touch()
RegisterJob(const CNSClientId & client,unsigned int job_id,ECommandGroup cmd_group)226 void CNSClientsRegistry::RegisterJob(const CNSClientId & client,
227 unsigned int job_id,
228 ECommandGroup cmd_group)
229 {
230 // Check if it is an old-style client
231 if (!client.IsComplete())
232 return;
233
234 CMutexGuard guard(m_Lock);
235 map< string,
236 CNSClient >::iterator cl = m_Clients.find(client.GetNode());
237
238 if (cl == m_Clients.end())
239 NCBI_THROW(CNetScheduleException, eInternalError,
240 "Cannot find client '" + client.GetNode() +
241 "' to register a job");
242
243 cl->second.RegisterJob(job_id, cmd_group);
244 }
245
246
247 // Updates the client blacklisted jobs list.
248 // No need to check session id, it's done in Touch()
RegisterBlacklistedJob(const CNSClientId & client,unsigned int job_id,ECommandGroup cmd_group)249 void CNSClientsRegistry::RegisterBlacklistedJob(const CNSClientId & client,
250 unsigned int job_id,
251 ECommandGroup cmd_group)
252 {
253 // Check if it is an old-style client
254 if (!client.IsComplete())
255 return;
256
257 CMutexGuard guard(m_Lock);
258 map< string,
259 CNSClient >::iterator cl = m_Clients.find(client.GetNode());
260
261 if (cl == m_Clients.end())
262 NCBI_THROW(CNetScheduleException, eInternalError,
263 "Cannot find client '" + client.GetNode() +
264 "' to set blacklisted job");
265
266 cl->second.RegisterBlacklistedJob(job_id, cmd_group);
267 }
268
269
270 // Updates the client jobs list.
271 // No need to check session id, it's done in Touch()
UnregisterJob(const CNSClientId & client,unsigned int job_id,ECommandGroup cmd_group)272 void CNSClientsRegistry::UnregisterJob(const CNSClientId & client,
273 unsigned int job_id,
274 ECommandGroup cmd_group)
275 {
276 // Check if it is an old-style client
277 if (!client.IsComplete())
278 return;
279
280 CMutexGuard guard(m_Lock);
281 map< string,
282 CNSClient >::iterator cl = m_Clients.find(client.GetNode());
283
284 if (cl != m_Clients.end())
285 cl->second.UnregisterJob(job_id, cmd_group);
286 }
287
288
289 // Updates the client jobs list.
290 // Used when a job is timed out. At this moment there is no information about
291 // the node identifier so all the clients are checked.
UnregisterJob(unsigned int job_id,ECommandGroup cmd_group)292 void CNSClientsRegistry::UnregisterJob(unsigned int job_id,
293 ECommandGroup cmd_group)
294 {
295 // The container itself is not changed.
296 // The changes are in what the element holds.
297 CMutexGuard guard(m_Lock);
298 map< string, CNSClient >::iterator k = m_Clients.begin();
299
300 for ( ; k != m_Clients.end(); ++k)
301 k->second.UnregisterJob(job_id, cmd_group);
302 }
303
304
305 // Moves the client job into a blacklist.
306 // No need to check session id, it's done in Touch()
MoveJobToBlacklist(const CNSClientId & client,unsigned int job_id,ECommandGroup cmd_group)307 void CNSClientsRegistry::MoveJobToBlacklist(const CNSClientId & client,
308 unsigned int job_id,
309 ECommandGroup cmd_group)
310 {
311 // Check if it is an old-style client
312 if (!client.IsComplete())
313 return;
314
315 CMutexGuard guard(m_Lock);
316 map< string,
317 CNSClient >::iterator cl = m_Clients.find(client.GetNode());
318
319 if (cl != m_Clients.end())
320 cl->second.MoveJobToBlacklist(job_id, cmd_group);
321 }
322
323
324 // Updates the client job list and its blacklist.
325 // Used when the job is timed out and rescheduled as well as when
326 // RDRB/RETURN or FRED/FPUT are received
MoveJobToBlacklist(unsigned int job_id,ECommandGroup cmd_group)327 void CNSClientsRegistry::MoveJobToBlacklist(unsigned int job_id,
328 ECommandGroup cmd_group)
329 {
330 // The container itself is not changed.
331 // The changes are in what the element holds.
332 CMutexGuard guard(m_Lock);
333 map< string, CNSClient >::iterator k = m_Clients.begin();
334
335 for ( ; k != m_Clients.end(); ++k)
336 if (k->second.MoveJobToBlacklist(job_id, cmd_group))
337 break;
338 }
339
340
341
342 // Used in the following situations:
343 // - CLRN
344 // - new session
345 // - GC for a client (it is not strictly required because it may happened only
346 // for inactive clients, i.e. timeouted or after CLRN
347 void
ClearClient(const CNSClientId & client,TNSBitVector & running_jobs,TNSBitVector & reading_jobs,bool & client_was_found,string & old_session,bool & had_wn_pref_affs,bool & had_reader_pref_affs)348 CNSClientsRegistry::ClearClient(const CNSClientId & client,
349 TNSBitVector & running_jobs,
350 TNSBitVector & reading_jobs,
351 bool & client_was_found,
352 string & old_session,
353 bool & had_wn_pref_affs,
354 bool & had_reader_pref_affs)
355 {
356 client_was_found = false;
357
358 // Check if it is an old-style client
359 if (client.IsComplete())
360 ClearClient(client.GetNode(), running_jobs, reading_jobs,
361 client_was_found, old_session, had_wn_pref_affs,
362 had_reader_pref_affs);
363 }
364
365
366 void
ClearClient(const string & node_name,TNSBitVector & running_jobs,TNSBitVector & reading_jobs,bool & client_was_found,string & old_session,bool & had_wn_pref_affs,bool & had_reader_pref_affs)367 CNSClientsRegistry::ClearClient(const string & node_name,
368 TNSBitVector & running_jobs,
369 TNSBitVector & reading_jobs,
370 bool & client_was_found,
371 string & old_session,
372 bool & had_wn_pref_affs,
373 bool & had_reader_pref_affs)
374 {
375 client_was_found = false;
376
377 CMutexGuard guard(m_Lock);
378 map< string, CNSClient >::iterator cl = m_Clients.find(node_name);
379
380 if (cl != m_Clients.end()) {
381 client_was_found = true;
382 old_session = cl->second.GetSession();
383 cl->second.SetSession("");
384 cl->second.SetState(CNSClient::eQuit);
385 cl->second.SetClaimedType(eClaimedNotProvided);
386 x_ClearClient(node_name, cl->second, running_jobs,
387 had_wn_pref_affs, eGet);
388 x_ClearClient(node_name, cl->second, reading_jobs,
389 had_reader_pref_affs, eRead);
390 }
391 }
392
393
394 // Must be called under a lock
395 // When a client inactivity timeout is detected there is no need to touch the
396 // running or reading jobs. The only things to be done are:
397 // - cancel waiting
398 // - reset pref affinities
399 // - set reset affinity flag
400 // - log the event
401 void
ClearOnTimeout(CNSClient & client,const string & client_node,bool is_log,ECommandGroup cmd_group)402 CNSClientsRegistry::ClearOnTimeout(CNSClient & client,
403 const string & client_node,
404 bool is_log,
405 ECommandGroup cmd_group)
406 {
407 // Deregister preferred affinities
408 bool had_pref_affs = client.HasPreferredAffinities(cmd_group);
409 if (had_pref_affs) {
410 m_AffRegistry->RemoveClientFromAffinities(
411 client.GetID(),
412 client.GetPreferredAffinities(cmd_group),
413 cmd_group);
414 client.ClearPreferredAffinities(cmd_group);
415
416 if (is_log) {
417 string aff_part = "get";
418 if (cmd_group == eRead)
419 aff_part = "read";
420
421 CRef<CRequestContext> ctx;
422 ctx.Reset(new CRequestContext());
423 ctx->SetRequestID();
424 GetDiagContext().SetRequestContext(ctx);
425 GetDiagContext().PrintRequestStart()
426 .Print("_type", "client_watch")
427 .Print("client_node", client_node)
428 .Print("client_session", client.GetSession())
429 .Print(aff_part + "_preferred_affinities_reset",
430 "yes");
431 ctx->SetRequestStatus(CNetScheduleHandler::eStatus_OK);
432 GetDiagContext().PrintRequestStop();
433 }
434
435 client.SetAffinityReset(true, cmd_group);
436 }
437
438 CancelWaiting(client, cmd_group);
439 if (had_pref_affs)
440 x_BuildAffinities(cmd_group);
441 }
442
443
444 string
PrintClientsList(const CQueue * queue,size_t batch_size,bool verbose) const445 CNSClientsRegistry::PrintClientsList(const CQueue * queue,
446 size_t batch_size,
447 bool verbose) const
448 {
449 TNSBitVector batch;
450 string result;
451
452 TNSBitVector registered_clients = GetRegisteredClients();
453 TNSBitVector::enumerator en(registered_clients.first());
454
455 while (en.valid()) {
456 batch.set_bit(*en);
457 ++en;
458
459 if (batch.count() >= batch_size) {
460 result += x_PrintSelected(batch, queue, verbose) + "\n";
461 batch.clear();
462 }
463 }
464
465 if (batch.count() > 0)
466 result += x_PrintSelected(batch, queue, verbose) + "\n";
467 return result;
468 }
469
470
471 string
x_PrintSelected(const TNSBitVector & batch,const CQueue * queue,bool verbose) const472 CNSClientsRegistry::x_PrintSelected(const TNSBitVector & batch,
473 const CQueue * queue,
474 bool verbose) const
475 {
476 string buffer;
477 size_t printed = 0;
478
479 CMutexGuard guard(m_Lock);
480 map< string, CNSClient >::const_iterator k = m_Clients.begin();
481
482 for ( ; k != m_Clients.end(); ++k) {
483 if (batch.get_bit(k->second.GetID())) {
484 buffer += k->second.Print(k->first, queue, *m_AffRegistry,
485 m_GCWNodeClients, m_GCReaderClients,
486 verbose);
487 ++printed;
488 if (printed >= batch.count())
489 break;
490 }
491 }
492
493 return buffer;
494 }
495
496
497 void
SetNodeWaiting(const CNSClientId & client,unsigned short port,const TNSBitVector & aff_ids,ECommandGroup cmd_group)498 CNSClientsRegistry::SetNodeWaiting(const CNSClientId & client,
499 unsigned short port,
500 const TNSBitVector & aff_ids,
501 ECommandGroup cmd_group)
502 {
503 // Check if it is an old-style client
504 if (!client.IsComplete())
505 return;
506
507 CMutexGuard guard(m_Lock);
508 map< string,
509 CNSClient >::iterator node = m_Clients.find(client.GetNode());
510
511 if (node == m_Clients.end())
512 NCBI_THROW(CNetScheduleException, eInternalError,
513 "Cannot find client '" + client.GetNode() +
514 "' to set waiting attributes");
515
516 node->second.SetWaitPort(port, cmd_group);
517 node->second.SetWaitAffinities(aff_ids, cmd_group);
518 m_AffRegistry->SetWaitClientForAffinities(node->second.GetID(), aff_ids,
519 cmd_group);
520 }
521
522
523 // Handles the following cases:
524 // - GET2/READ with waiting has been interrupted by another GET2/READ
525 // - CWGET/CWREAD received
526 // - GET2/READ wait timeout is over
527 // - notification timeout is over
CancelWaiting(CNSClient & client,ECommandGroup cmd_group,bool touch_notif_registry)528 bool CNSClientsRegistry::CancelWaiting(CNSClient & client,
529 ECommandGroup cmd_group,
530 bool touch_notif_registry)
531 {
532 bool ret_val = false;
533 // Deregister wait affinities if so
534 if (client.HasWaitAffinities(cmd_group))
535 m_AffRegistry->RemoveWaitClientFromAffinities(
536 client.GetID(),
537 client.GetWaitAffinities(cmd_group),
538 cmd_group);
539
540 // Remove from notifications if wait port is non-zero
541 unsigned short port = client.GetWaitPort(cmd_group);
542 if (port != 0) {
543 ret_val = true;
544
545 // One of the cases is when a notification timeout is over.
546 // In this case it is detected in the notification registry and the
547 // corresponding record is already deleted.
548 if (touch_notif_registry)
549 m_NotifRegistry->UnregisterListener(client.GetPeerAddress(), port,
550 cmd_group);
551 }
552
553 // Clear affinities and port
554 client.CancelWaiting(cmd_group);
555 return ret_val;
556 }
557
558
559 bool
CancelWaiting(const CNSClientId & client,ECommandGroup cmd_group)560 CNSClientsRegistry::CancelWaiting(const CNSClientId & client,
561 ECommandGroup cmd_group)
562 {
563 // Check if it is an old-style client
564 if (client.IsComplete())
565 return CancelWaiting(client.GetNode(), cmd_group);
566 return false;
567 }
568
569
570 bool
CancelWaiting(const string & node_name,ECommandGroup cmd_group,bool touch_notif_registry)571 CNSClientsRegistry::CancelWaiting(const string & node_name,
572 ECommandGroup cmd_group,
573 bool touch_notif_registry)
574 {
575 if (node_name.empty())
576 return false;
577
578 CMutexGuard guard(m_Lock);
579 map< string, CNSClient >::iterator cl = m_Clients.find(node_name);
580
581 if (cl != m_Clients.end())
582 return CancelWaiting(cl->second, cmd_group, touch_notif_registry);
583 return false;
584 }
585
586
587 void
SubtractBlacklistedJobs(const CNSClientId & client,ECommandGroup cmd_group,TNSBitVector & bv) const588 CNSClientsRegistry::SubtractBlacklistedJobs(
589 const CNSClientId & client,
590 ECommandGroup cmd_group,
591 TNSBitVector & bv) const
592 {
593 if (client.IsComplete())
594 SubtractBlacklistedJobs(client.GetNode(), cmd_group, bv);
595 }
596
597
598 void
SubtractBlacklistedJobs(const string & client_node,ECommandGroup cmd_group,TNSBitVector & bv) const599 CNSClientsRegistry::SubtractBlacklistedJobs(
600 const string & client_node,
601 ECommandGroup cmd_group,
602 TNSBitVector & bv) const
603 {
604 CMutexGuard guard(m_Lock);
605 map< string,
606 CNSClient >::const_iterator found = m_Clients.find(client_node);
607
608 if (found != m_Clients.end())
609 found->second.SubtractBlacklistedJobs(cmd_group, bv);
610 }
611
612
613 void
GetScopes(const string & client_node,string & scope,string & virtual_scope)614 CNSClientsRegistry::GetScopes(const string & client_node,
615 string & scope, string & virtual_scope)
616 {
617 scope.clear();
618 virtual_scope.clear();
619
620 CMutexGuard guard(m_Lock);
621 map< string,
622 CNSClient >::const_iterator found = m_Clients.find(client_node);
623
624 if (found != m_Clients.end()) {
625 scope = found->second.GetLastScope();
626 virtual_scope = found->second.GetVirtualScope(client_node);
627 }
628 }
629
630
631 void
AddBlacklistedJobs(const CNSClientId & client,ECommandGroup cmd_group,TNSBitVector & bv) const632 CNSClientsRegistry::AddBlacklistedJobs(
633 const CNSClientId & client,
634 ECommandGroup cmd_group,
635 TNSBitVector & bv) const
636 {
637 if (client.IsComplete())
638 AddBlacklistedJobs(client.GetNode(), cmd_group, bv);
639 }
640
641
642 void
AddBlacklistedJobs(const string & client_node,ECommandGroup cmd_group,TNSBitVector & bv) const643 CNSClientsRegistry::AddBlacklistedJobs(
644 const string & client_node,
645 ECommandGroup cmd_group,
646 TNSBitVector & bv) const
647 {
648 CMutexGuard guard(m_Lock);
649 map< string,
650 CNSClient >::const_iterator found = m_Clients.find(client_node);
651
652 if (found != m_Clients.end())
653 found->second.AddBlacklistedJobs(cmd_group, bv);
654 }
655
656
657 TNSBitVector
GetPreferredAffinities(const CNSClientId & client,ECommandGroup cmd_group) const658 CNSClientsRegistry::GetPreferredAffinities(const CNSClientId & client,
659 ECommandGroup cmd_group) const
660 {
661 if (!client.IsComplete())
662 return kEmptyBitVector;
663 return GetPreferredAffinities(client.GetNode(), cmd_group);
664 }
665
666
667 TNSBitVector
GetPreferredAffinities(const string & node,ECommandGroup cmd_group) const668 CNSClientsRegistry::GetPreferredAffinities(const string & node,
669 ECommandGroup cmd_group) const
670 {
671 if (node.empty())
672 return kEmptyBitVector;
673
674 CMutexGuard guard(m_Lock);
675 map< string, CNSClient >::const_iterator found = m_Clients.find(node);
676
677 if (found == m_Clients.end())
678 return kEmptyBitVector;
679 return found->second.GetPreferredAffinities(cmd_group);
680 }
681
682
683 TNSBitVector
GetAllPreferredAffinities(ECommandGroup cmd_group) const684 CNSClientsRegistry::GetAllPreferredAffinities(ECommandGroup cmd_group) const
685 {
686 CMutexGuard guard(m_Lock);
687
688 if (cmd_group == eGet)
689 return m_WNodeAffinities;
690 return m_ReaderAffinities;
691 }
692
693
694 TNSBitVector
GetWaitAffinities(const CNSClientId & client,ECommandGroup cmd_group) const695 CNSClientsRegistry::GetWaitAffinities(const CNSClientId & client,
696 ECommandGroup cmd_group) const
697 {
698 if (!client.IsComplete())
699 return kEmptyBitVector;
700 return GetWaitAffinities(client.GetNode(), cmd_group);
701 }
702
703
704 TNSBitVector
GetWaitAffinities(const string & node,ECommandGroup cmd_group) const705 CNSClientsRegistry::GetWaitAffinities(const string & node,
706 ECommandGroup cmd_group) const
707 {
708 if (node.empty())
709 return kEmptyBitVector;
710
711 CMutexGuard guard(m_Lock);
712 map< string, CNSClient >::const_iterator found = m_Clients.find(node);
713
714 if (found == m_Clients.end())
715 return kEmptyBitVector;
716 return found->second.GetWaitAffinities(cmd_group);
717 }
718
719
GetRegisteredClients(void) const720 TNSBitVector CNSClientsRegistry::GetRegisteredClients(void) const
721 {
722 CMutexGuard guard(m_Lock);
723 return m_RegisteredClients;
724 }
725
726
727 void
UpdatePreferredAffinities(const CNSClientId & client,const TNSBitVector & aff_to_add,const TNSBitVector & aff_to_del,ECommandGroup cmd_group)728 CNSClientsRegistry::UpdatePreferredAffinities(const CNSClientId & client,
729 const TNSBitVector & aff_to_add,
730 const TNSBitVector & aff_to_del,
731 ECommandGroup cmd_group)
732 {
733 if (!client.IsComplete())
734 return;
735
736 CMutexGuard guard(m_Lock);
737 map< string,
738 CNSClient >::iterator found = m_Clients.find(client.GetNode());
739
740 if (found == m_Clients.end())
741 NCBI_THROW(CNetScheduleException, eInternalError,
742 "Cannot find client '" + client.GetNode() +
743 "' to update preferred affinities");
744
745 m_AffRegistry->AddClientToAffinities(client.GetID(), aff_to_add, cmd_group);
746 m_AffRegistry->RemoveClientFromAffinities(client.GetID(), aff_to_del,
747 cmd_group);
748
749 found->second.AddPreferredAffinities(aff_to_add, cmd_group);
750 found->second.RemovePreferredAffinities(aff_to_del, cmd_group);
751
752 // Update the union bit vector with WN/reader affinities
753 if (aff_to_del.any())
754 x_BuildAffinities(cmd_group);
755 else {
756 if (aff_to_add.any()) {
757 if (cmd_group == eGet)
758 m_WNodeAffinities |= aff_to_add;
759 else
760 m_ReaderAffinities |= aff_to_add;
761 }
762 }
763 }
764
765
766 bool
UpdatePreferredAffinities(const CNSClientId & client,unsigned int aff_to_add,unsigned int aff_to_del,ECommandGroup cmd_group)767 CNSClientsRegistry::UpdatePreferredAffinities(const CNSClientId & client,
768 unsigned int aff_to_add,
769 unsigned int aff_to_del,
770 ECommandGroup cmd_group)
771 {
772 if (aff_to_add + aff_to_del == 0)
773 return false;
774
775 if (!client.IsComplete())
776 return false;
777
778 bool aff_added = false;
779 CMutexGuard guard(m_Lock);
780 map< string,
781 CNSClient >::iterator found = m_Clients.find(client.GetNode());
782
783 if (found == m_Clients.end())
784 NCBI_THROW(CNetScheduleException, eInternalError,
785 "Cannot find client '" + client.GetNode() +
786 "' to update preferred affinities");
787
788 if (aff_to_add != 0) {
789 m_AffRegistry->AddClientToAffinity(client.GetID(), aff_to_add,
790 cmd_group);
791 aff_added = found->second.AddPreferredAffinity(aff_to_add,
792 cmd_group);
793 }
794
795 if (aff_to_del != 0) {
796 m_AffRegistry->RemoveClientFromAffinities(client.GetID(), aff_to_del,
797 cmd_group);
798 found->second.RemovePreferredAffinity(aff_to_del, cmd_group);
799 }
800
801 // Update the union bit vector with WN/reader affinities
802 if (aff_to_del != 0)
803 x_BuildAffinities(cmd_group);
804 else {
805 if (aff_added) {
806 if (cmd_group == eGet)
807 m_WNodeAffinities.set_bit(aff_to_add);
808 else
809 m_ReaderAffinities.set_bit(aff_to_add);
810 }
811 }
812 return aff_added;
813 }
814
815
816 void
SetPreferredAffinities(const CNSClientId & client,const TNSBitVector & aff_to_set,ECommandGroup cmd_group)817 CNSClientsRegistry::SetPreferredAffinities(const CNSClientId & client,
818 const TNSBitVector & aff_to_set,
819 ECommandGroup cmd_group)
820 {
821 if (!client.IsComplete())
822 return;
823
824 string client_name = client.GetNode();
825 CMutexGuard guard(m_Lock);
826 map< string,
827 CNSClient >::iterator found = m_Clients.find(client_name);
828
829 if (found == m_Clients.end())
830 NCBI_THROW(CNetScheduleException, eInternalError,
831 "Cannot find client '" + client.GetNode() +
832 "' to update preferred affinities");
833
834 TNSBitVector curr_affs = found->second.GetPreferredAffinities(cmd_group);
835 TNSBitVector aff_to_add = aff_to_set - curr_affs;
836 TNSBitVector aff_to_del = curr_affs - aff_to_set;
837
838 if (aff_to_add.any())
839 m_AffRegistry->AddClientToAffinities(client.GetID(), aff_to_add,
840 cmd_group);
841
842 if (aff_to_del.any())
843 m_AffRegistry->RemoveClientFromAffinities(client.GetID(), aff_to_del,
844 cmd_group);
845
846 found->second.SetPreferredAffinities(aff_to_set, cmd_group);
847
848 // Update the union bit vector with WN/reader affinities
849 if (aff_to_del.any())
850 x_BuildAffinities(cmd_group);
851 else {
852 if (aff_to_add.any()) {
853 if (cmd_group == eGet)
854 m_WNodeAffinities |= aff_to_add;
855 else
856 m_ReaderAffinities |= aff_to_add;
857 }
858 }
859
860 // Remove the client from the GC collected, because it affects affinities
861 // only which are re-set here anyway
862 if (cmd_group == eGet)
863 m_GCWNodeClients.erase(client_name);
864 else
865 m_GCReaderClients.erase(client_name);
866 }
867
868
869 bool
IsRequestedAffinity(const string & name,const TNSBitVector & aff,bool use_preferred,ECommandGroup cmd_group) const870 CNSClientsRegistry::IsRequestedAffinity(
871 const string & name,
872 const TNSBitVector & aff,
873 bool use_preferred,
874 ECommandGroup cmd_group) const
875 {
876 if (name.empty())
877 return false;
878
879 CMutexGuard guard(m_Lock);
880 map< string,
881 CNSClient >::const_iterator node = m_Clients.find(name);
882
883 if (node == m_Clients.end())
884 return false;
885
886 return node->second.IsRequestedAffinity(aff, use_preferred, cmd_group);
887 }
888
889
890 bool
IsPreferredByAny(unsigned int aff_id,ECommandGroup cmd_group) const891 CNSClientsRegistry::IsPreferredByAny(unsigned int aff_id,
892 ECommandGroup cmd_group) const
893 {
894 CMutexGuard guard(m_Lock);
895 if (cmd_group == eGet)
896 return m_WNodeAffinities.get_bit(aff_id);
897 return m_ReaderAffinities.get_bit(aff_id);
898 }
899
900
901 bool
GetAffinityReset(const CNSClientId & client,ECommandGroup cmd_group) const902 CNSClientsRegistry::GetAffinityReset(const CNSClientId & client,
903 ECommandGroup cmd_group) const
904 {
905 if (!client.IsComplete())
906 return false;
907
908 CMutexGuard guard(m_Lock);
909 map< string, CNSClient >::const_iterator found =
910 m_Clients.find(client.GetNode());
911
912 if (found == m_Clients.end())
913 return false;
914
915 return found->second.GetAffinityReset(cmd_group);
916 }
917
918
GetNodeName(unsigned int id) const919 string CNSClientsRegistry::GetNodeName(unsigned int id) const
920 {
921 CMutexGuard guard(m_Lock);
922 map< string, CNSClient >::const_iterator k = m_Clients.begin();
923
924 for ( ; k != m_Clients.end(); ++k )
925 if (k->second.GetID() == id)
926 return k->first;
927 return "";
928 }
929
930
931 // true if the node could be made stale
932 bool
x_CouldBeStale(const CNSPreciseTime & current_time,const CNSPreciseTime & timeout,const CNSClient & client,ECommandGroup cmd_group)933 CNSClientsRegistry::x_CouldBeStale(const CNSPreciseTime & current_time,
934 const CNSPreciseTime & timeout,
935 const CNSClient & client,
936 ECommandGroup cmd_group)
937 {
938 if (current_time - client.GetLastAccess() <= timeout)
939 return false;
940
941 // The client may wait on GET2/READ longer than the configured timeout.
942 // Take the longest timeout of two if so.
943 unsigned short wait_port = client.GetWaitPort(cmd_group);
944 unsigned int wait_address = client.GetPeerAddress();
945 if (wait_port != 0) {
946 CNSPreciseTime get_lifetime = m_NotifRegistry->
947 GetPassiveNotificationLifetime(wait_address,
948 wait_port,
949 cmd_group);
950 if (current_time <= get_lifetime)
951 return false;
952 }
953 return true;
954 }
955
956
957 void
StaleNodes(const CNSPreciseTime & current_time,const CNSPreciseTime & wn_timeout,const CNSPreciseTime & reader_timeout,bool is_log)958 CNSClientsRegistry::StaleNodes(const CNSPreciseTime & current_time,
959 const CNSPreciseTime & wn_timeout,
960 const CNSPreciseTime & reader_timeout,
961 bool is_log)
962 {
963 // Checks if any of the worker nodes are inactive for too long
964 CNSClient::ENSClientState state;
965 unsigned int type;
966 CMutexGuard guard(m_Lock);
967 map< string, CNSClient >::iterator k = m_Clients.begin();
968
969 for ( ; k != m_Clients.end(); ++k ) {
970 state = k->second.GetState();
971 if (state == CNSClient::eQuit || state == CNSClient::eWNAndReaderStale)
972 continue;
973
974 type = k->second.GetType();
975 if ((type & (CNSClient::eWorkerNode | CNSClient::eReader)) == 0)
976 continue;
977
978 if ((type & CNSClient::eWorkerNode) != 0 &&
979 state != CNSClient::eWNStale) {
980 // This an active WN which should be checked
981 if (x_CouldBeStale(current_time, wn_timeout,
982 k->second, eGet)) {
983 ClearOnTimeout(k->second, k->first, is_log, eGet);
984 if (state == CNSClient::eActive)
985 k->second.SetState(CNSClient::eWNStale);
986 else
987 k->second.SetState(CNSClient::eWNAndReaderStale);
988 }
989 }
990
991 if ((type & CNSClient::eReader) != 0 &&
992 state != CNSClient::eReaderStale) {
993 // This an active reader which should be checked
994 if (x_CouldBeStale(current_time, reader_timeout,
995 k->second, eRead)) {
996 ClearOnTimeout(k->second, k->first, is_log, eRead);
997 if (state == CNSClient::eActive)
998 k->second.SetState(CNSClient::eReaderStale);
999 else
1000 k->second.SetState(CNSClient::eWNAndReaderStale);
1001 }
1002 }
1003 }
1004 }
1005
1006
Purge(const CNSPreciseTime & current_time,const CNSPreciseTime & timeout_worker_node,unsigned int min_worker_nodes,const CNSPreciseTime & timeout_admin,unsigned int min_admins,const CNSPreciseTime & timeout_submitter,unsigned int min_submitters,const CNSPreciseTime & timeout_reader,unsigned int min_readers,const CNSPreciseTime & timeout_unknown,unsigned int min_unknowns,bool is_log)1007 void CNSClientsRegistry::Purge(const CNSPreciseTime & current_time,
1008 const CNSPreciseTime & timeout_worker_node,
1009 unsigned int min_worker_nodes,
1010 const CNSPreciseTime & timeout_admin,
1011 unsigned int min_admins,
1012 const CNSPreciseTime & timeout_submitter,
1013 unsigned int min_submitters,
1014 const CNSPreciseTime & timeout_reader,
1015 unsigned int min_readers,
1016 const CNSPreciseTime & timeout_unknown,
1017 unsigned int min_unknowns,
1018 bool is_log)
1019 {
1020 CMutexGuard guard(m_Lock);
1021
1022 x_PurgeWNodesAndReaders(current_time, timeout_worker_node, min_worker_nodes,
1023 timeout_reader, min_readers, is_log);
1024 x_PurgeAdmins(current_time, timeout_admin, min_admins, is_log);
1025 x_PurgeSubmitters(current_time, timeout_submitter, min_submitters, is_log);
1026 x_PurgeUnknowns(current_time, timeout_unknown, min_unknowns, is_log);
1027 }
1028
1029
x_GetNextID(void)1030 unsigned int CNSClientsRegistry::x_GetNextID(void)
1031 {
1032 CFastMutexGuard guard(m_LastIDLock);
1033
1034 // 0 is an invalid value, so make sure the ID != 0
1035 ++m_LastID;
1036 if (m_LastID == 0)
1037 m_LastID = 1;
1038 return m_LastID;
1039 }
1040
1041
1042 // Must be called under the lock
x_BuildAffinities(ECommandGroup cmd_group)1043 void CNSClientsRegistry::x_BuildAffinities(ECommandGroup cmd_group)
1044 {
1045 if (cmd_group == eRead) {
1046 m_ReaderAffinities.clear();
1047 for (map< string, CNSClient >::const_iterator
1048 k = m_Clients.begin(); k != m_Clients.end(); ++k )
1049 if (k->second.GetType() & CNSClient::eReader)
1050 m_ReaderAffinities |= k->second.GetPreferredAffinities(eRead);
1051 } else {
1052 m_WNodeAffinities.clear();
1053 for (map< string, CNSClient >::const_iterator
1054 k = m_Clients.begin(); k != m_Clients.end(); ++k )
1055 if (k->second.GetType() & CNSClient::eWorkerNode)
1056 m_WNodeAffinities |= k->second.GetPreferredAffinities(eGet);
1057 }
1058 }
1059
1060
1061 bool
WasGarbageCollected(const CNSClientId & client,ECommandGroup cmd_group) const1062 CNSClientsRegistry::WasGarbageCollected(const CNSClientId & client,
1063 ECommandGroup cmd_group) const
1064 {
1065 if (!client.IsComplete())
1066 return false;
1067
1068 CMutexGuard guard(m_Lock);
1069 if (cmd_group == eGet)
1070 return m_GCWNodeClients.find(client.GetNode()) !=
1071 m_GCWNodeClients.end();
1072 return m_GCReaderClients.find(client.GetNode()) !=
1073 m_GCReaderClients.end();
1074 }
1075
1076
1077 // Must be called under the lock
1078 void
x_ClearClient(const string & node_name,CNSClient & client,TNSBitVector & jobs,bool & had_pref_affs,ECommandGroup cmd_group)1079 CNSClientsRegistry::x_ClearClient(const string & node_name,
1080 CNSClient & client,
1081 TNSBitVector & jobs,
1082 bool & had_pref_affs,
1083 ECommandGroup cmd_group)
1084 {
1085 jobs = client.GetJobs(cmd_group);
1086 client.ClearJobs(cmd_group);
1087
1088 // Deregister preferred affinities
1089 had_pref_affs = client.HasPreferredAffinities(cmd_group);
1090 if (had_pref_affs) {
1091 m_AffRegistry->RemoveClientFromAffinities(
1092 client.GetID(),
1093 client.GetPreferredAffinities(cmd_group),
1094 cmd_group);
1095 client.ClearPreferredAffinities(cmd_group);
1096 }
1097
1098 CancelWaiting(client, cmd_group);
1099 if (had_pref_affs)
1100 x_BuildAffinities(cmd_group);
1101
1102 // Client has gone. No need to remember it if it was
1103 // garbage collected previously.
1104 if (cmd_group == eGet)
1105 m_GCWNodeClients.erase(node_name);
1106 else
1107 m_GCReaderClients.erase(node_name);
1108 }
1109
1110
1111 // Compares last access time of two clients basing on their identifiers
1112 class AgeFunctor
1113 {
1114 public:
AgeFunctor(map<string,CNSClient> & clients)1115 AgeFunctor(map< string, CNSClient > & clients) :
1116 m_Clients(clients)
1117 {}
1118
operator ()(const string & lhs,const string & rhs)1119 bool operator () (const string & lhs, const string & rhs)
1120 {
1121 return m_Clients[ lhs ].GetLastAccess() <
1122 m_Clients[ rhs ].GetLastAccess();
1123 }
1124
1125 private:
1126 map< string, CNSClient > & m_Clients;
1127 };
1128
1129
1130 // Must be called under the lock
1131 void
x_PurgeWNodesAndReaders(const CNSPreciseTime & current_time,const CNSPreciseTime & timeout_worker_node,unsigned int min_worker_nodes,const CNSPreciseTime & timeout_reader,unsigned int min_readers,bool is_log)1132 CNSClientsRegistry::x_PurgeWNodesAndReaders(
1133 const CNSPreciseTime & current_time,
1134 const CNSPreciseTime & timeout_worker_node,
1135 unsigned int min_worker_nodes,
1136 const CNSPreciseTime & timeout_reader,
1137 unsigned int min_readers,
1138 bool is_log)
1139 {
1140 map< string, CNSClient >::iterator k = m_Clients.begin();
1141 list< string > inactive_wns;
1142 list< string > inactive_readers;
1143 unsigned int total_wn_count = 0;
1144 unsigned int total_reader_count = 0;
1145 unsigned int type;
1146 CNSClient::ENSClientState state;
1147
1148 // Count total WNs and readers as well as detect purge candidates
1149 for ( ; k != m_Clients.end(); ++k ) {
1150 type = k->second.GetType();
1151
1152 if ((type & CNSClient::eWorkerNode) != 0)
1153 ++total_wn_count;
1154 else if ((type & CNSClient::eReader) != 0)
1155 ++total_reader_count;
1156 else
1157 continue;
1158
1159 state = k->second.GetState();
1160 if (state == CNSClient::eActive)
1161 continue;
1162
1163 if ((type & (CNSClient::eWorkerNode | CNSClient::eReader)) ==
1164 CNSClient::eWorkerNode) {
1165 // The client is exclusively a worker node
1166 if (state != CNSClient::eWNStale &&
1167 state != CNSClient::eWNAndReaderStale &&
1168 state != CNSClient::eQuit)
1169 continue;
1170
1171 // Test if the wn timeout is over
1172 if (current_time - k->second.GetLastAccess() >
1173 timeout_worker_node)
1174 inactive_wns.push_back(k->first);
1175
1176 } else if ((type & (CNSClient::eWorkerNode | CNSClient::eReader)) ==
1177 CNSClient::eReader) {
1178 // The client is exclusively a reader
1179 if (state != CNSClient::eReaderStale &&
1180 state != CNSClient::eWNAndReaderStale &&
1181 state != CNSClient::eQuit)
1182 continue;
1183
1184 // Test if the reader timeout is over
1185 if (current_time - k->second.GetLastAccess() >
1186 timeout_reader)
1187 inactive_readers.push_back(k->first);
1188
1189 } else {
1190 // The client is both a worker node and a reader
1191 if (state != CNSClient::eWNAndReaderStale &&
1192 state != CNSClient::eQuit)
1193 continue;
1194
1195 // Test bothe WN and reader timeouts
1196 if (current_time - k->second.GetLastAccess() >
1197 timeout_worker_node &&
1198 current_time - k->second.GetLastAccess() >
1199 timeout_reader) {
1200 inactive_wns.push_back(k->first);
1201 inactive_readers.push_back(k->first);
1202 }
1203 }
1204 }
1205
1206 // Deal with worker nodes first
1207 if (total_wn_count > min_worker_nodes && ! inactive_wns.empty()) {
1208
1209 // Calculate the number of records to be deleted
1210 unsigned int active_count = total_wn_count - inactive_wns.size();
1211 unsigned int remove_count = 0;
1212
1213 if (active_count >= min_worker_nodes)
1214 remove_count = inactive_wns.size();
1215 else
1216 remove_count = total_wn_count - min_worker_nodes;
1217
1218 // Sort the inactive ones to delete the oldest
1219 inactive_wns.sort(AgeFunctor(m_Clients));
1220
1221 // Delete the oldest
1222 for (list<string>::iterator j = inactive_wns.begin();
1223 j != inactive_wns.end() && remove_count > 0; ++j, --remove_count) {
1224
1225 // The affinity reset flag is NOT set if:
1226 // - the client issued CLEAR
1227 // - the client did not have preferred affinities
1228 // It makes sense to memo the client only if it had preferred
1229 // affinities.
1230 if (m_Clients[*j].GetAffinityReset(eGet))
1231 m_GCWNodeClients.insert(*j);
1232 if (m_Clients[*j].GetAffinityReset(eRead))
1233 m_GCReaderClients.insert(*j);
1234
1235 if (m_GCWNodeClients.size() > 100000)
1236 ERR_POST("Garbage collected worker node list exceeds 100000 "
1237 "records. There are currently " <<
1238 m_GCWNodeClients.size() << " records.");
1239 if (m_GCReaderClients.size() > 100000)
1240 ERR_POST("Garbage collected reader list exceeds 100000 "
1241 "records. There are currently " <<
1242 m_GCReaderClients.size() << " records.");
1243
1244 list<string>::iterator found = find(inactive_readers.begin(),
1245 inactive_readers.end(), *j);
1246 if (found != inactive_readers.end()) {
1247 // That was a reader too, so adjust the readers info
1248 inactive_readers.erase(found);
1249 --total_reader_count;
1250 }
1251
1252 m_Clients.erase(*j);
1253 }
1254 }
1255
1256 // Here: deal with readers consideraing that if it was a worker node too
1257 // then the client must not be removed.
1258 if (total_reader_count > min_readers && ! inactive_readers.empty()) {
1259
1260 // Calculate the number of records to be deleted
1261 unsigned int active_count = total_reader_count -
1262 inactive_readers.size();
1263 unsigned int remove_count = 0;
1264
1265 if (active_count >= min_readers)
1266 remove_count = inactive_readers.size();
1267 else
1268 remove_count = total_reader_count - min_readers;
1269
1270 // Sort the inactive ones to delete the oldest
1271 inactive_readers.sort(AgeFunctor(m_Clients));
1272
1273 // Delete the oldest
1274 for (list<string>::iterator j = inactive_readers.begin();
1275 j != inactive_readers.end() && remove_count > 0; ++j) {
1276
1277 if (find(inactive_wns.begin(), inactive_wns.end(), *j) !=
1278 inactive_wns.end())
1279 continue; // keep the reader because of WN limitations
1280
1281 // The affinity reset flag is NOT set if:
1282 // - the client issued CLEAR
1283 // - the client did not have preferred affinities
1284 // It makes sense to memo the client only if it had preferred
1285 // affinities.
1286 if (m_Clients[*j].GetAffinityReset(eRead))
1287 m_GCReaderClients.insert(*j);
1288
1289
1290 if (m_GCReaderClients.size() > 100000)
1291 ERR_POST("Garbage collected reader list exceeds 100000 "
1292 "records. There are currently " <<
1293 m_GCReaderClients.size() << " records.");
1294
1295 m_Clients.erase(*j);
1296 --remove_count;
1297 }
1298 }
1299 }
1300
1301
1302 // Must be called under the lock
1303 void
x_PurgeAdmins(const CNSPreciseTime & current_time,const CNSPreciseTime & timeout_admin,unsigned int min_admins,bool is_log)1304 CNSClientsRegistry::x_PurgeAdmins(const CNSPreciseTime & current_time,
1305 const CNSPreciseTime & timeout_admin,
1306 unsigned int min_admins,
1307 bool is_log)
1308 {
1309 x_PurgeInactiveClients(current_time, timeout_admin, min_admins,
1310 CNSClient::eAdmin, is_log);
1311 }
1312
1313
1314 // Must be called under the lock
1315 void
x_PurgeSubmitters(const CNSPreciseTime & current_time,const CNSPreciseTime & timeout_submitter,unsigned int min_submitters,bool is_log)1316 CNSClientsRegistry::x_PurgeSubmitters(const CNSPreciseTime & current_time,
1317 const CNSPreciseTime & timeout_submitter,
1318 unsigned int min_submitters,
1319 bool is_log)
1320 {
1321 x_PurgeInactiveClients(current_time, timeout_submitter, min_submitters,
1322 CNSClient::eSubmitter, is_log);
1323 }
1324
1325
1326 // Must be called under the lock
1327 void
x_PurgeUnknowns(const CNSPreciseTime & current_time,const CNSPreciseTime & timeout_unknown,unsigned int min_unknowns,bool is_log)1328 CNSClientsRegistry::x_PurgeUnknowns(const CNSPreciseTime & current_time,
1329 const CNSPreciseTime & timeout_unknown,
1330 unsigned int min_unknowns,
1331 bool is_log)
1332 {
1333 x_PurgeInactiveClients(current_time, timeout_unknown, min_unknowns,
1334 0, is_log);
1335 }
1336
1337
1338 void
x_PurgeInactiveClients(const CNSPreciseTime & current_time,const CNSPreciseTime & timeout,unsigned int min_clients,unsigned int client_type,bool is_log)1339 CNSClientsRegistry::x_PurgeInactiveClients(const CNSPreciseTime & current_time,
1340 const CNSPreciseTime & timeout,
1341 unsigned int min_clients,
1342 unsigned int client_type,
1343 bool is_log)
1344 {
1345 map< string, CNSClient >::iterator k = m_Clients.begin();
1346 list< string > inactive;
1347 unsigned int inactive_count = 0;
1348 unsigned int total_count = 0; // of given type
1349 unsigned int type;
1350
1351 // Count active and inactive admins
1352 for ( ; k != m_Clients.end(); ++k ) {
1353 type = k->second.GetType();
1354
1355 if (client_type == 0) {
1356 if (type != 0)
1357 continue;
1358 } else {
1359 if ((type & client_type) == 0)
1360 continue;
1361 // Worker nodes and readers are handled separately
1362 if (type & CNSClient::eWorkerNode || type & CNSClient::eReader)
1363 continue;
1364 }
1365
1366 ++total_count;
1367 if (current_time - k->second.GetLastAccess() > timeout) {
1368 ++inactive_count;
1369 inactive.push_back(k->first);
1370 }
1371 }
1372
1373 if (total_count <= min_clients || inactive_count == 0)
1374 return;
1375
1376 // Calculate the number of records to be deleted
1377 unsigned int active_count = total_count - inactive_count;
1378 unsigned int remove_count = 0;
1379 if (active_count >= min_clients)
1380 remove_count = inactive_count;
1381 else
1382 remove_count = total_count - min_clients;
1383
1384 // Sort the inactive ones to delete the oldest
1385 inactive.sort(AgeFunctor(m_Clients));
1386
1387 // Delete the oldest records
1388 for (list<string>::iterator j = inactive.begin();
1389 j != inactive.end() && remove_count > 0; ++j, --remove_count) {
1390 m_Clients.erase(*j);
1391 }
1392 }
1393
1394 END_NCBI_SCOPE
1395
1396