1 /* $Id: netschedule_api.cpp 599189 2019-12-20 15:57:19Z sadyrovr $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Anatoliy Kuznetsov, Maxim Didenko, Dmitry Kazimirov
27 *
28 * File Description:
29 * Implementation of NetSchedule API.
30 *
31 */
32
33 #include <ncbi_pch.hpp>
34
35 #include "netschedule_api_impl.hpp"
36
37 #include <connect/ncbi_socket.hpp>
38 #include <connect/ncbi_conn_exception.hpp>
39 #include <connect/ncbi_userhost.hpp>
40
41 #include <corelib/ncbi_system.hpp>
42 #include <corelib/plugin_manager_impl.hpp>
43
44 #include <array>
45 #include <memory>
46 #include <stdio.h>
47
48
49 #define COMPATIBLE_NETSCHEDULE_VERSION "4.10.0"
50
51
52 BEGIN_NCBI_SCOPE
53
54 namespace grid {
55 namespace netschedule {
56 namespace limits {
57
ThrowIllegalChar(const string & name,const string & value,char c)58 void ThrowIllegalChar(const string& name, const string& value, char c)
59 {
60 NCBI_THROW_FMT(CConfigException, eInvalidParameter,
61 "Invalid character '" << NStr::PrintableString(CTempString(&c, 1)) <<
62 "' in the " << name << " \"" << NStr::PrintableString(value) << "\".");
63 }
64
65 }
66 }
67 }
68
69 using namespace grid::netschedule;
70
SNetScheduleNotificationThread(SNetScheduleAPIImpl * ns_api)71 SNetScheduleNotificationThread::SNetScheduleNotificationThread(
72 SNetScheduleAPIImpl* ns_api) :
73 m_API(ns_api),
74 m_StopThread(false)
75 {
76 }
77
SNetScheduleNotificationReceiver()78 SNetScheduleNotificationReceiver::SNetScheduleNotificationReceiver()
79 {
80 STimeout rto;
81 rto.sec = rto.usec = 0;
82 socket.SetDataLogging(TServConn_ConnDataLogging::GetDefault() ? eOn : eOff);
83 socket.SetTimeout(eIO_Read, &rto);
84
85 EIO_Status status = socket.Bind(0);
86 if (status != eIO_Success) {
87 NCBI_THROW_FMT(CException, eUnknown,
88 "Could not bind a UDP socket: " << IO_StatusStr(status));
89 }
90
91 port = socket.GetLocalPort(eNH_HostByteOrder);
92 }
93
CmdAppendPortAndTimeout(string * cmd,unsigned remaining_seconds)94 void SNetScheduleNotificationThread::CmdAppendPortAndTimeout(
95 string* cmd, unsigned remaining_seconds)
96 {
97 if (remaining_seconds > 0) {
98 *cmd += " port=";
99 *cmd += NStr::UIntToString(GetPort());
100
101 *cmd += " timeout=";
102 *cmd += NStr::UIntToString(remaining_seconds);
103 }
104 }
105
CNetScheduleNotificationHandler()106 CNetScheduleNotificationHandler::CNetScheduleNotificationHandler()
107 {
108 }
109
s_CreateCUrlArgs(const string & output)110 CUrlArgs s_CreateCUrlArgs(const string& output)
111 try {
112 return CUrlArgs(output);
113 }
114 catch (CUrlParserException&) {
115 return CUrlArgs();
116 }
117
SNetScheduleOutputParser(const string & output)118 SNetScheduleOutputParser::SNetScheduleOutputParser(const string& output) :
119 CUrlArgs(s_CreateCUrlArgs(output))
120 {
121 }
122
operator ()(const string & param) const123 const string& SNetScheduleOutputParser::operator()(const string& param) const
124 {
125 auto it = FindFirst(param);
126 return it != GetArgs().end() ? it->value : kEmptyStr;
127 }
128
g_ParseNSOutput(const string & attr_string,const char * const * attr_names,string * attr_values,size_t attr_count)129 int g_ParseNSOutput(const string& attr_string, const char* const* attr_names,
130 string* attr_values, size_t attr_count)
131 {
132 try {
133 CUrlArgs attr_parser(attr_string);
134 const CUrlArgs::TArgs& attr_list = attr_parser.GetArgs();
135
136 int found_attrs = 0;
137
138 CUrlArgs::const_iterator attr_it;
139
140 do {
141 if ((attr_it = attr_parser.FindFirst(*attr_names)) !=
142 attr_list.end()) {
143 *attr_values = attr_it->value;
144 ++found_attrs;
145 }
146 ++attr_names;
147 ++attr_values;
148 } while (--attr_count > 0);
149
150 return found_attrs;
151 }
152 catch (CUrlParserException&) {
153 }
154
155 return -1;
156 }
157
158 SNetScheduleNotificationThread::ENotificationType
CheckNotification(string * ns_node)159 SNetScheduleNotificationThread::CheckNotification(string* ns_node)
160 {
161 _ASSERT(ns_node);
162
163 SNetScheduleOutputParser parser(m_Receiver.message);
164
165 if (parser("queue") != m_API->m_Queue) return eNT_Unknown;
166
167 *ns_node = parser("ns_node");
168
169 const auto reason = parser("reason");
170
171 if (reason.empty())
172 return eNT_GetNotification;
173 else if (NStr::CompareCase(reason, CTempString("get", 3)) == 0)
174 return eNT_GetNotification;
175 else if (NStr::CompareCase(reason, CTempString("read", 4)) == 0)
176 return eNT_ReadNotification;
177 else
178 return eNT_Unknown;
179 }
180
InterruptWait()181 void SServerNotifications::InterruptWait()
182 {
183 CFastMutexGuard guard(m_Mutex);
184
185 if (m_Interrupted)
186 m_NotificationSemaphore.TryWait();
187 else {
188 m_Interrupted = true;
189 if (!m_ReadyServers.empty())
190 m_NotificationSemaphore.TryWait();
191 }
192
193 m_NotificationSemaphore.Post();
194 }
195
RegisterServer(const string & ns_node)196 void SServerNotifications::RegisterServer(const string& ns_node)
197 {
198 CFastMutexGuard guard(m_Mutex);
199
200 if (!m_ReadyServers.empty())
201 m_Interrupted = false;
202 else {
203 x_ClearInterruptFlag();
204 m_NotificationSemaphore.Post();
205 }
206
207 m_ReadyServers.insert(ns_node);
208 }
209
GetNextNotification(string * ns_node)210 bool SServerNotifications::GetNextNotification(string* ns_node)
211 {
212 CFastMutexGuard guard(m_Mutex);
213
214 x_ClearInterruptFlag();
215
216 if (m_ReadyServers.empty())
217 return false;
218
219 TReadyServers::iterator next_server = m_ReadyServers.begin();
220 *ns_node = *next_server;
221 m_ReadyServers.erase(next_server);
222
223 if (m_ReadyServers.empty())
224 // Make sure the notification semaphore count is reset to zero.
225 m_NotificationSemaphore.TryWait();
226
227 return true;
228 }
229
Main()230 void* SNetScheduleNotificationThread::Main()
231 {
232 SetCurrentThreadName(
233 (CNcbiApplication::Instance()->GetProgramDisplayName() +
234 "_nt").c_str());
235
236 static const STimeout two_seconds = {2, 0};
237
238 string server_host;
239
240 while (!m_StopThread)
241 if (m_Receiver.socket.Wait(&two_seconds) == eIO_Success) {
242 if (m_StopThread)
243 break;
244
245 if (m_Receiver(&server_host)) {
246 string ns_node;
247
248 ENotificationType notif_type = CheckNotification(&ns_node);
249
250 switch (notif_type) {
251 case eNT_GetNotification:
252 m_GetNotifications.RegisterServer(ns_node);
253 break;
254 case eNT_ReadNotification:
255 m_ReadNotifications.RegisterServer(ns_node);
256 break;
257 default:
258 break;
259 }
260 }
261 }
262 return NULL;
263 }
264
operator ()(string * server_host)265 bool SNetScheduleNotificationReceiver::operator()(string* server_host)
266 {
267 array<char, 64 * 1024> buffer; // Enough to hold any UDP
268 size_t msg_len;
269
270 if (socket.Recv(buffer.data(), buffer.size(), &msg_len,
271 server_host, NULL) != eIO_Success)
272 return false;
273
274 _ASSERT(buffer.size() > msg_len);
275 buffer[msg_len] = '\0'; // Make it null-terminated in case it's not
276 message.assign(buffer.data()); // Ignore everything after the first null character
277
278 return true;
279 }
280
ReceiveNotification(string * server_host)281 bool CNetScheduleNotificationHandler::ReceiveNotification(string* server_host)
282 {
283 return m_Receiver(server_host);
284 }
285
WaitForNotification(const CDeadline & deadline,string * server_host)286 bool CNetScheduleNotificationHandler::WaitForNotification(
287 const CDeadline& deadline, string* server_host)
288 {
289 STimeout timeout;
290
291 for (;;) {
292 deadline.GetRemainingTime().Get(&timeout.sec, &timeout.usec);
293
294 if (timeout.sec == 0 && timeout.usec == 0)
295 return false;
296
297 switch (m_Receiver.socket.Wait(&timeout)) {
298 case eIO_Timeout:
299 return false;
300
301 case eIO_Success:
302 if (ReceiveNotification(server_host))
303 return true;
304 /* FALL THROUGH */
305
306 default:
307 break;
308 }
309 }
310
311 return false;
312 }
313
PrintPortNumber()314 void CNetScheduleNotificationHandler::PrintPortNumber()
315 {
316 printf("Using UDP port %hu\n", GetPort());
317 }
318
AllocNotificationThread()319 void SNetScheduleAPIImpl::AllocNotificationThread()
320 {
321 CFastMutexGuard guard(m_NotificationThreadMutex);
322
323 if (m_NotificationThread == NULL)
324 m_NotificationThread = new SNetScheduleNotificationThread(this);
325 }
326
StartNotificationThread()327 void SNetScheduleAPIImpl::StartNotificationThread()
328 {
329 if (m_NotificationThreadStartStopCounter.Add(1) == 1)
330 m_NotificationThread->Run();
331 }
332
~SNetScheduleAPIImpl()333 SNetScheduleAPIImpl::~SNetScheduleAPIImpl()
334 {
335 if (m_NotificationThreadStartStopCounter.Add(-1) == 0) {
336 CFastMutexGuard guard(m_NotificationThreadMutex);
337
338 if (m_NotificationThread != NULL) {
339 m_NotificationThread->m_StopThread = true;
340 CDatagramSocket().Send("INTERRUPT", sizeof("INTERRUPT"),
341 "127.0.0.1", m_NotificationThread->m_Receiver.port);
342 m_NotificationThread->Join();
343 }
344 }
345 }
346
x_ClearNode()347 void SNetScheduleAPIImpl::x_ClearNode()
348 {
349 string cmd("CLRN");
350 g_AppendClientIPSessionIDHitID(cmd);
351
352 for (CNetServiceIterator it =
353 m_Service.Iterate(CNetService::eIncludePenalized); it; ++it) {
354 CNetServer server = *it;
355
356 try {
357 CNetServer::SExecResult exec_result;
358 server->ConnectAndExec(cmd, false, exec_result);
359 } catch (CNetSrvConnException& e) {
360 if (m_Service.IsLoadBalanced()) {
361 ERR_POST(server->m_ServerInPool->m_Address.AsString() <<
362 ": " << e.what());
363 }
364 } catch (CNetServiceException& e) {
365 if (e.GetErrCode() != CNetServiceException::eCommunicationError)
366 throw;
367 else {
368 ERR_POST(server->m_ServerInPool->m_Address.AsString() <<
369 ": " << e.what());
370 }
371 }
372 }
373 }
374
s_GetSection(bool ns_conf)375 CTempString s_GetSection(bool ns_conf)
376 {
377 return ns_conf ? "netschedule_conf_from_netschedule" : "netcache_conf_from_netschedule";
378 }
379
CNetScheduleConfigLoader(CSynRegistry & registry,SRegSynonyms & sections,bool ns_conf)380 CNetScheduleConfigLoader::CNetScheduleConfigLoader(CSynRegistry& registry, SRegSynonyms& sections, bool ns_conf) :
381 m_Registry(registry), m_Sections(sections), m_NsConf(ns_conf), m_Mode(eImplicit)
382 {
383 sections.Insert(s_GetSection(m_NsConf));
384
385 const auto param = "load_config_from_ns";
386
387 if (m_Registry.Has(m_Sections, param)) {
388 m_Mode = m_Registry.Get(m_Sections, param, true) ? eExplicit : eOff;
389 }
390 }
391
Transform(const string & prefix,string & name) const392 bool CNetScheduleConfigLoader::Transform(const string& prefix, string& name) const
393 {
394 if (m_NsConf) {
395 // If it's "service to queue" special case (we do not know queue name)
396 if (name == "queue_name") return true;
397
398 // Queue parameter "timeout" determines the initial TTL of a submitted job.
399 // Since "timeout" is too generic, replaced it with "job_ttl" on client side.
400 if (name == "timeout") {
401 name = "job_ttl";
402 return true;
403 }
404 }
405
406 // Do not load client_name from server
407 if (name == "client_name") return false;
408
409 // Only params starting with provided prefix are used
410 if (NStr::StartsWith(name, prefix)) {
411 name.erase(0, prefix.size());
412 return true;
413 }
414
415 return false;
416 }
417
operator ()(SNetScheduleAPIImpl * impl)418 bool CNetScheduleConfigLoader::operator()(SNetScheduleAPIImpl* impl)
419 {
420 _ASSERT(impl);
421
422 if (m_Mode == eOff) return false;
423
424 // Turn off any subsequent attempts
425 const auto mode = m_Mode;
426 m_Mode = eOff;
427
428 // Errors could happen when we try to load config from servers that either
429 // do not support "GETP2" command (introduced in 4.16.9)
430 // or, do not support "QINF2 service=name" command (introduced in 4.17.0)
431 // or, do not have "service to queue" mapping set
432 // or, are not actually NetSchedule servers but worker nodes
433 // or, are currently not reachable (behind some firewall)
434 // and need cross connectivity which is usually enabled later
435 //
436 // This guard is set to suppress errors and avoid retries if config loading is not enabled explicitly
437 const auto retry_mode = mode == eImplicit ?
438 SNetServiceImpl::SRetry::eNoRetryNoErrors :
439 SNetServiceImpl::SRetry::eDefault;
440 auto retry_guard = impl->m_Service->CreateRetryGuard(retry_mode);
441
442 CNetScheduleAPI::TQueueParams queue_params;
443
444 try {
445 impl->GetQueueParams(kEmptyStr, queue_params);
446 }
447 catch (...) {
448 if (mode == eExplicit) throw;
449 return false;
450 }
451
452 CRef<CMemoryRegistry> mem_registry(new CMemoryRegistry);
453 const string prefix = m_NsConf ? "ns." : "nc.";
454 const string section = s_GetSection(m_NsConf);
455
456 for (auto& param : queue_params) {
457 auto name = param.first;
458
459 if (Transform(prefix, name)) {
460 mem_registry->Set(section, name, param.second);
461 }
462 }
463
464 if (mem_registry->Empty()) return false;
465
466 m_Registry.Add(mem_registry.GetObject());
467 return true;
468 }
469
MakeAuthString()470 string SNetScheduleAPIImpl::MakeAuthString()
471 {
472 string auth(m_Service->MakeAuthString());
473
474 const CVersionAPI* version = nullptr;
475 string name;
476
477 {{
478 if (CNcbiApplicationGuard app = CNcbiApplication::InstanceGuard()) {
479 version = &app->GetFullVersion();
480 name = app->GetProgramDisplayName();
481 }
482 }}
483
484 if (version && m_ProgramVersion.empty()) {
485 m_ProgramVersion += name;
486 auto package_name = version->GetPackageName();
487
488 if (!package_name.empty()) {
489 m_ProgramVersion += ": ";
490 m_ProgramVersion += package_name;
491 m_ProgramVersion += ' ';
492 m_ProgramVersion += version->GetPackageVersion().Print();
493 m_ProgramVersion += " built on ";
494 m_ProgramVersion += version->GetBuildInfo().date;
495 }
496 }
497
498 if (!m_ProgramVersion.empty()) {
499 auth += " prog=\"";
500 auth += m_ProgramVersion;
501 auth += '\"';
502 }
503
504 switch (m_ClientType) {
505 case CNetScheduleAPI::eCT_Admin:
506 auth += " client_type=\"admin\"";
507 break;
508
509 case CNetScheduleAPI::eCT_Submitter:
510 auth += " client_type=\"submitter\"";
511 break;
512
513 case CNetScheduleAPI::eCT_WorkerNode:
514 auth += " client_type=\"worker node\"";
515 break;
516
517 case CNetScheduleAPI::eCT_Reader:
518 auth += " client_type=\"reader\"";
519
520 default: /* eCT_Auto */
521 break;
522 }
523
524 if (!m_ClientNode.empty()) {
525 auth += " client_node=\"";
526 auth += m_ClientNode;
527 auth += '\"';
528 }
529
530 if (!m_ClientSession.empty()) {
531 auth += " client_session=\"";
532 auth += m_ClientSession;
533 auth += '\"';
534 }
535
536 if (version) {
537 auth += " client_version=\"";
538 auth += version->GetVersionInfo().Print();
539 auth += '\"';
540 }
541
542 ITERATE(SNetScheduleAPIImpl::TAuthParams, it, m_AuthParams) {
543 auth += it->second;
544 }
545
546 auth += " ns_compat_ver=\"" COMPATIBLE_NETSCHEDULE_VERSION "\""
547 "\r\n";
548
549 auth += m_Queue;
550
551 // Make the auth token look like a command to be able to
552 // check for potential authentication/initialization errors
553 // like the "queue not found" error.
554 if (m_Mode & fNonWnCompatible) {
555 auth += "\r\nVERSION";
556 }
557
558 return auth;
559 }
560
GetPropCreator() const561 INetServerConnectionListener::TPropCreator CNetScheduleServerListener::GetPropCreator() const
562 {
563 return [] { return new SNetScheduleServerProperties; };
564 }
565
Clone()566 INetServerConnectionListener* CNetScheduleServerListener::Clone()
567 {
568 return new CNetScheduleServerListener(*this);
569 }
570
Init(CSynRegistry & registry,SRegSynonyms & sections)571 void SNetScheduleAPIImpl::Init(CSynRegistry& registry, SRegSynonyms& sections)
572 {
573 SetDiagUserAndHost();
574
575 m_RetryOnException = registry.Get(sections, "enforce_retry_policy", false);
576
577 if (!m_Queue.empty()) limits::Check<limits::SQueueName>(m_Queue);
578
579 const string& user(GetDiagContext().GetUsername());
580 m_ClientNode =
581 m_Service->GetClientName() + "::" +
582 (user.empty() ? kEmptyStr : user + '@') +
583 GetDiagContext().GetHost();
584
585 CNetScheduleConfigLoader loader(registry, sections);
586
587 bool affinities_initialized = false;
588
589 // There are two phases of Init in case we need to load config from server
590 // 1) Setup as much as possible and try to get config from server
591 // 2) Setup everything using received config from server
592 do {
593 if (m_Queue.empty()) {
594 m_Queue = registry.Get(sections, "queue_name", "");
595 if (!m_Queue.empty()) limits::Check<limits::SQueueName>(m_Queue);
596 }
597
598 m_UseEmbeddedStorage = registry.Get(sections, { "use_embedded_storage", "use_embedded_input" }, true);
599 m_JobGroup = registry.Get(sections, "job_group", "");
600 m_JobTtl = registry.Get(sections, "job_ttl", 0);
601 m_ClientNode = registry.Get(sections, "client_node", m_ClientNode);
602 GetListener()->Scope() = registry.Get(sections, "scope", "");
603
604 if (!affinities_initialized && registry.Get(sections, "use_affinities", false)) {
605 affinities_initialized = true;
606 InitAffinities(registry, sections);
607 }
608
609 if (!m_ClientNode.empty()) {
610 m_ClientSession =
611 NStr::NumericToString(CDiagContext::GetPID()) + '@' +
612 NStr::NumericToString(GetFastLocalTime().GetTimeT()) + ':' +
613 GetDiagContext().GetStringUID();
614 }
615
616 GetListener()->SetAuthString(MakeAuthString());
617
618 // If not working in WN compatible mode
619 if (!(m_Mode & fConfigLoading)) break;
620 } while (loader(this));
621 }
622
OnConnected(CNetServerConnection & connection)623 void CNetScheduleServerListener::OnConnected(CNetServerConnection& connection)
624 {
625 if (m_NonWn) {
626 string version_info(connection.Exec(m_Auth, false));
627
628 CNetServerInfo server_info(new SNetServerInfoImpl(version_info));
629
630 string attr_name, attr_value;
631 string ns_node, ns_session;
632 CVersionInfo version;
633
634 while (server_info.GetNextAttribute(attr_name, attr_value))
635 if (attr_name == "ns_node")
636 ns_node = attr_value;
637 else if (attr_name == "ns_session")
638 ns_session = attr_value;
639 else if (attr_name == "server_version")
640 version = CVersionInfo(attr_value);
641
642 // Usually, all attributes come together, so no need to check version
643 if (!ns_node.empty() && !ns_session.empty()) {
644 auto server_props = connection->m_Server->Get<SNetScheduleServerProperties>();
645
646 // Version cannot change without session, so no need to compare, too
647 if (server_props->ns_node != ns_node ||
648 server_props->ns_session != ns_session) {
649 CFastMutexGuard guard(m_SharedData->m_ServerByNodeMutex);
650 server_props->ns_node = ns_node;
651 server_props->ns_session = ns_session;
652 server_props->version = version;
653 m_SharedData->m_ServerByNode[ns_node] = connection->m_Server->m_ServerInPool;
654 server_props->affs_synced = false;
655 }
656 }
657
658 if (!m_Scope.empty()) {
659 string cmd("SETSCOPE " + m_Scope);
660 g_AppendClientIPSessionIDHitID(cmd);
661 connection.Exec(cmd, false);
662 }
663 } else
664 connection->WriteLine(m_Auth);
665 }
666
OnErrorImpl(const string & err_msg,CNetServer & server)667 void CNetScheduleServerListener::OnErrorImpl(
668 const string& err_msg, CNetServer& server)
669 {
670 string code;
671 string msg;
672
673 if (!NStr::SplitInTwo(err_msg, ":", code, msg)) {
674 if (err_msg == "Job not found") {
675 NCBI_THROW(CNetScheduleException, eJobNotFound, err_msg);
676 }
677 code = err_msg;
678 }
679
680 // Map code into numeric value
681 CException::TErrCode err_code = CNetScheduleExceptionMap::GetCode(code);
682
683 switch (err_code) {
684 case CException::eInvalid:
685 NCBI_THROW(CNetServiceException, eCommunicationError, err_msg);
686
687 case CNetScheduleException::eGroupNotFound:
688 case CNetScheduleException::eAffinityNotFound:
689 case CNetScheduleException::eDuplicateName:
690 // Convert these errors into warnings.
691 OnWarning(msg, server);
692 break;
693
694 case CNetScheduleException::eJobNotFound:
695 NCBI_THROW(CNetScheduleException, eJobNotFound, "Job not found");
696
697 default:
698 NCBI_THROW(CNetScheduleException, EErrCode(err_code), !msg.empty() ?
699 msg : CNetScheduleException::GetErrCodeDescription(err_code));
700 }
701 }
702
OnWarningImpl(const string & warn_msg,CNetServer & server)703 void CNetScheduleServerListener::OnWarningImpl(const string& warn_msg,
704 CNetServer& server)
705 {
706 ERR_POST(Warning << server->m_ServerInPool->m_Address.AsString() <<
707 ": " << warn_msg);
708 }
709
710 const char* const kNetScheduleAPIDriverName = "netschedule_api";
711
SNetScheduleAPIImpl(CSynRegistryBuilder registry_builder,const string & section,const string & service_name,const string & client_name,const string & queue_name,bool wn,bool try_config)712 SNetScheduleAPIImpl::SNetScheduleAPIImpl(CSynRegistryBuilder registry_builder, const string& section,
713 const string& service_name, const string& client_name,
714 const string& queue_name, bool wn, bool try_config) :
715 m_Mode(GetMode(wn, try_config)),
716 m_SharedData(new SNetScheduleSharedData),
717 m_Queue(queue_name)
718 {
719 SRegSynonyms sections{ section, kNetScheduleAPIDriverName };
720 m_Service = SNetServiceImpl::Create("NetScheduleAPI", service_name, client_name,
721 new CNetScheduleServerListener(m_Mode & fNonWnCompatible, m_SharedData),
722 registry_builder, sections);
723 Init(registry_builder, sections);
724 }
725
SNetScheduleAPIImpl(SNetServerInPool * server,SNetScheduleAPIImpl * parent)726 SNetScheduleAPIImpl::SNetScheduleAPIImpl(
727 SNetServerInPool* server, SNetScheduleAPIImpl* parent) :
728 m_Mode(parent->m_Mode),
729 m_SharedData(parent->m_SharedData),
730 m_RetryOnException(parent->m_RetryOnException),
731 m_Service(SNetServiceImpl::Clone(server, parent->m_Service)),
732 m_Queue(parent->m_Queue),
733 m_ProgramVersion(parent->m_ProgramVersion),
734 m_ClientNode(parent->m_ClientNode),
735 m_ClientSession(parent->m_ClientSession),
736 m_AffinityPreference(parent->m_AffinityPreference),
737 m_UseEmbeddedStorage(parent->m_UseEmbeddedStorage)
738 {
739 }
740
CNetScheduleAPI(CNetScheduleAPI::EAppRegistry,const string & conf_section)741 CNetScheduleAPI::CNetScheduleAPI(CNetScheduleAPI::EAppRegistry /*use_app_reg*/,
742 const string& conf_section /* = kEmptyStr */) :
743 m_Impl(new SNetScheduleAPIImpl(nullptr, conf_section))
744 {
745 }
746
CNetScheduleAPI(const IRegistry & reg,const string & conf_section)747 CNetScheduleAPI::CNetScheduleAPI(const IRegistry& reg,
748 const string& conf_section) :
749 m_Impl(new SNetScheduleAPIImpl(reg, conf_section))
750 {
751 }
752
CNetScheduleAPI(CConfig * conf,const string & conf_section)753 CNetScheduleAPI::CNetScheduleAPI(CConfig* conf, const string& conf_section) :
754 m_Impl(new SNetScheduleAPIImpl(conf, conf_section))
755 {
756 }
757
CNetScheduleAPI(const string & service_name,const string & client_name,const string & queue_name)758 CNetScheduleAPI::CNetScheduleAPI(const string& service_name,
759 const string& client_name, const string& queue_name) :
760 m_Impl(new SNetScheduleAPIImpl(nullptr, kEmptyStr, service_name, client_name, queue_name))
761 {
762 }
763
SetProgramVersion(const string & pv)764 void CNetScheduleAPI::SetProgramVersion(const string& pv)
765 {
766 m_Impl->m_ProgramVersion = pv;
767
768 m_Impl->UpdateAuthString();
769 }
770
GetProgramVersion() const771 const string& CNetScheduleAPI::GetProgramVersion() const
772 {
773 return m_Impl->m_ProgramVersion;
774 }
775
GetQueueName() const776 const string& CNetScheduleAPI::GetQueueName() const
777 {
778 return m_Impl->m_Queue;
779 }
780
StatusToString(EJobStatus status)781 string CNetScheduleAPI::StatusToString(EJobStatus status)
782 {
783 switch(status) {
784 case eJobNotFound: return "NotFound";
785 case ePending: return "Pending";
786 case eRunning: return "Running";
787 case eCanceled: return "Canceled";
788 case eFailed: return "Failed";
789 case eDone: return "Done";
790 case eReading: return "Reading";
791 case eConfirmed: return "Confirmed";
792 case eReadFailed: return "ReadFailed";
793 case eDeleted: return "Deleted";
794
795 default: _ASSERT(0);
796 }
797 return kEmptyStr;
798 }
799
800 CNetScheduleAPI::EJobStatus
StringToStatus(const CTempString & status_str)801 CNetScheduleAPI::StringToStatus(const CTempString& status_str)
802 {
803 if (NStr::CompareNocase(status_str, "Pending") == 0)
804 return ePending;
805 if (NStr::CompareNocase(status_str, "Running") == 0)
806 return eRunning;
807 if (NStr::CompareNocase(status_str, "Canceled") == 0)
808 return eCanceled;
809 if (NStr::CompareNocase(status_str, "Failed") == 0)
810 return eFailed;
811 if (NStr::CompareNocase(status_str, "Done") == 0)
812 return eDone;
813 if (NStr::CompareNocase(status_str, "Reading") == 0)
814 return eReading;
815 if (NStr::CompareNocase(status_str, "Confirmed") == 0)
816 return eConfirmed;
817 if (NStr::CompareNocase(status_str, "ReadFailed") == 0)
818 return eReadFailed;
819 if (NStr::CompareNocase(status_str, "Deleted") == 0)
820 return eDeleted;
821
822 return eJobNotFound;
823 }
824
825 #define EXTRACT_WARNING_TYPE(warning_type) \
826 if (NStr::StartsWith(warn_msg, "e" #warning_type ":")) { \
827 warn_msg.erase(0, sizeof("e" #warning_type ":") - 1); \
828 return eWarn##warning_type; \
829 }
830
831 CNetScheduleAPI::ENetScheduleWarningType
ExtractWarningType(string & warn_msg)832 CNetScheduleAPI::ExtractWarningType(string& warn_msg)
833 {
834 EXTRACT_WARNING_TYPE(AffinityNotFound);
835 EXTRACT_WARNING_TYPE(AffinityNotPreferred);
836 EXTRACT_WARNING_TYPE(AffinityAlreadyPreferred);
837 EXTRACT_WARNING_TYPE(GroupNotFound);
838 EXTRACT_WARNING_TYPE(JobNotFound);
839 EXTRACT_WARNING_TYPE(JobAlreadyCanceled);
840 EXTRACT_WARNING_TYPE(JobAlreadyDone);
841 EXTRACT_WARNING_TYPE(JobAlreadyFailed);
842 EXTRACT_WARNING_TYPE(JobPassportOnlyMatch);
843 EXTRACT_WARNING_TYPE(NoParametersChanged);
844 EXTRACT_WARNING_TYPE(ConfigFileNotChanged);
845 EXTRACT_WARNING_TYPE(AlertNotFound);
846 EXTRACT_WARNING_TYPE(AlertAlreadyAcknowledged);
847 EXTRACT_WARNING_TYPE(SubmitsDisabledForServer);
848 EXTRACT_WARNING_TYPE(QueueAlreadyPaused);
849 EXTRACT_WARNING_TYPE(QueueNotPaused);
850 EXTRACT_WARNING_TYPE(CommandObsolete);
851 EXTRACT_WARNING_TYPE(JobNotRead);
852 return eWarnUnknown;
853 }
854
855 #define WARNING_TYPE_TO_STRING(warning_type) \
856 case eWarn##warning_type: \
857 return #warning_type;
858
WarningTypeToString(CNetScheduleAPI::ENetScheduleWarningType warning_type)859 const char* CNetScheduleAPI::WarningTypeToString(
860 CNetScheduleAPI::ENetScheduleWarningType warning_type)
861 {
862 switch (warning_type) {
863 WARNING_TYPE_TO_STRING(AffinityNotFound);
864 WARNING_TYPE_TO_STRING(AffinityNotPreferred);
865 WARNING_TYPE_TO_STRING(AffinityAlreadyPreferred);
866 WARNING_TYPE_TO_STRING(GroupNotFound);
867 WARNING_TYPE_TO_STRING(JobNotFound);
868 WARNING_TYPE_TO_STRING(JobAlreadyCanceled);
869 WARNING_TYPE_TO_STRING(JobAlreadyDone);
870 WARNING_TYPE_TO_STRING(JobAlreadyFailed);
871 WARNING_TYPE_TO_STRING(JobPassportOnlyMatch);
872 WARNING_TYPE_TO_STRING(NoParametersChanged);
873 WARNING_TYPE_TO_STRING(ConfigFileNotChanged);
874 WARNING_TYPE_TO_STRING(AlertNotFound);
875 WARNING_TYPE_TO_STRING(AlertAlreadyAcknowledged);
876 WARNING_TYPE_TO_STRING(SubmitsDisabledForServer);
877 WARNING_TYPE_TO_STRING(QueueAlreadyPaused);
878 WARNING_TYPE_TO_STRING(QueueNotPaused);
879 WARNING_TYPE_TO_STRING(CommandObsolete);
880 WARNING_TYPE_TO_STRING(JobNotRead);
881 default:
882 return "eWarnUnknown";
883 }
884 }
885
GetSubmitter()886 CNetScheduleSubmitter CNetScheduleAPI::GetSubmitter()
887 {
888 return new SNetScheduleSubmitterImpl(m_Impl);
889 }
890
GetExecutor()891 CNetScheduleExecutor CNetScheduleAPI::GetExecutor()
892 {
893 return new SNetScheduleExecutorImpl(m_Impl);
894 }
895
GetJobReader(const string & group,const string & affinity)896 CNetScheduleJobReader CNetScheduleAPI::GetJobReader(const string& group,
897 const string& affinity)
898 {
899 m_Impl->AllocNotificationThread();
900 return new SNetScheduleJobReaderImpl(m_Impl, group, affinity);
901 }
902
GetAdmin()903 CNetScheduleAdmin CNetScheduleAPI::GetAdmin()
904 {
905 return new SNetScheduleAdminImpl(m_Impl);
906 }
907
GetService()908 CNetService CNetScheduleAPI::GetService()
909 {
910 return m_Impl->m_Service;
911 }
912
s_SetJobExpTime(time_t * job_exptime,const string & time_str)913 static void s_SetJobExpTime(time_t* job_exptime, const string& time_str)
914 {
915 if (job_exptime != NULL)
916 *job_exptime = (time_t) NStr::StringToUInt8(time_str,
917 NStr::fConvErr_NoThrow);
918 }
919
s_SetPauseMode(ENetScheduleQueuePauseMode * pause_mode,const string & mode_str)920 static void s_SetPauseMode(ENetScheduleQueuePauseMode* pause_mode,
921 const string& mode_str)
922 {
923 if (pause_mode != NULL)
924 *pause_mode = mode_str.empty() ? eNSQ_NoPause :
925 mode_str == "pullback" ? eNSQ_WithPullback :
926 eNSQ_WithoutPullback;
927 }
928
GetJobDetails(CNetScheduleJob & job,time_t * job_exptime,ENetScheduleQueuePauseMode * pause_mode)929 CNetScheduleAPI::EJobStatus CNetScheduleAPI::GetJobDetails(
930 CNetScheduleJob& job,
931 time_t* job_exptime,
932 ENetScheduleQueuePauseMode* pause_mode)
933 {
934 string cmd("STATUS2 " + job.job_id);
935 g_AppendClientIPSessionIDHitID(cmd);
936 cmd += " need_progress_msg=1";
937 auto response = m_Impl->ExecOnJobServer(job, cmd);
938
939 SNetScheduleOutputParser parser(response);
940
941 const auto status = StringToStatus(parser("job_status"));
942
943 s_SetJobExpTime(job_exptime, parser("job_exptime"));
944 s_SetPauseMode(pause_mode, parser("pause"));
945
946 switch (status) {
947 case ePending:
948 case eRunning:
949 case eCanceled:
950 case eFailed:
951 case eDone:
952 case eReading:
953 case eConfirmed:
954 case eReadFailed:
955 job.input = parser("input");
956 job.output = parser("output");
957 job.ret_code = NStr::StringToInt(parser("ret_code"), NStr::fConvErr_NoThrow);
958 job.error_msg = parser("err_msg");
959 break;
960
961 default:
962 job.input.erase();
963 job.ret_code = 0;
964 job.output.erase();
965 job.error_msg.erase();
966 }
967
968 job.affinity.erase();
969 job.mask = CNetScheduleAPI::eEmptyMask;
970 job.progress_msg = parser("msg");
971
972 return status;
973 }
974
GetJobStatus(string cmd,const CNetScheduleJob & job,time_t * job_exptime,ENetScheduleQueuePauseMode * pause_mode)975 CNetScheduleAPI::EJobStatus SNetScheduleAPIImpl::GetJobStatus(string cmd,
976 const CNetScheduleJob& job, time_t* job_exptime,
977 ENetScheduleQueuePauseMode* pause_mode)
978 {
979 string response;
980
981 try {
982 cmd += ' ';
983 cmd += job.job_id;
984 g_AppendClientIPSessionIDHitID(cmd);
985 response = ExecOnJobServer(job, cmd);
986 }
987 catch (CNetScheduleException& e) {
988 if (e.GetErrCode() != CNetScheduleException::eJobNotFound)
989 throw;
990
991 if (job_exptime != NULL)
992 *job_exptime = 0;
993
994 return CNetScheduleAPI::eJobNotFound;
995 }
996
997 SNetScheduleOutputParser parser(response);
998
999 s_SetJobExpTime(job_exptime, parser("job_exptime"));
1000 s_SetPauseMode(pause_mode, parser("pause"));
1001
1002 return CNetScheduleAPI::StringToStatus(parser("job_status"));
1003 }
1004
GetServerByNode(const string & ns_node,CNetServer * server)1005 bool SNetScheduleAPIImpl::GetServerByNode(const string& ns_node,
1006 CNetServer* server)
1007 {
1008 SNetServerInPool* known_server; /* NCBI_FAKE_WARNING */
1009
1010 {{
1011 CFastMutexGuard guard(m_SharedData->m_ServerByNodeMutex);
1012
1013 auto server_props_it = m_SharedData->m_ServerByNode.find(ns_node);
1014
1015 if (server_props_it == m_SharedData->m_ServerByNode.end())
1016 return false;
1017
1018 known_server = server_props_it->second;
1019 }}
1020
1021 *server = new SNetServerImpl(m_Service,
1022 m_Service->m_ServerPool->ReturnServer(known_server));
1023
1024 return true;
1025 }
1026
1027 const CNetScheduleAPI::SServerParams&
operator ()(CNetService & service,const string & queue)1028 SNetScheduleAPIImpl::SServerParamsSync::operator()(CNetService& service, const string& queue)
1029 {
1030 CFastMutexGuard g(m_FastMutex);
1031
1032 if (m_AskCount-- > 0) return m_ServerParams;
1033
1034 m_AskCount = kAskMaxCount;
1035
1036 m_ServerParams.max_input_size = kNetScheduleMaxDBDataSize;
1037 m_ServerParams.max_output_size = kNetScheduleMaxDBDataSize;
1038
1039 string cmd("QINF2 " + queue);
1040 g_AppendClientIPSessionIDHitID(cmd);
1041
1042 CUrlArgs url_parser(service.FindServerAndExec(cmd, false).response);
1043
1044 enum {
1045 eMaxInputSize,
1046 eMaxOutputSize,
1047 eNumberOfSizeParams
1048 };
1049
1050 int field_bits = 0;
1051
1052 ITERATE(CUrlArgs::TArgs, field, url_parser.GetArgs()) {
1053 if (field->name[0] == 'm') {
1054 if (field->name == "max_input_size") {
1055 field_bits |= (1 << eMaxInputSize);
1056 m_ServerParams.max_input_size =
1057 NStr::StringToInt(field->value);
1058 } else if (field->name == "max_output_size") {
1059 field_bits |= (1 << eMaxOutputSize);
1060 m_ServerParams.max_output_size =
1061 NStr::StringToInt(field->value);
1062 }
1063 }
1064 if (field_bits == (1 << eNumberOfSizeParams) - 1)
1065 break;
1066 }
1067
1068 return m_ServerParams;
1069 }
1070
GetServerParams()1071 const CNetScheduleAPI::SServerParams& CNetScheduleAPI::GetServerParams()
1072 {
1073 return m_Impl->GetServerParams();
1074 }
1075
GetQueueParams(const string & queue_name,TQueueParams & queue_params)1076 void SNetScheduleAPIImpl::GetQueueParams(
1077 const string& queue_name, TQueueParams& queue_params)
1078 {
1079 string cmd("QINF2 ");
1080
1081 if (!queue_name.empty()) {
1082 limits::Check<limits::SQueueName>(queue_name);
1083
1084 cmd += queue_name;
1085 } else if (!m_Queue.empty()) {
1086 cmd += m_Queue;
1087 } else {
1088 cmd += "service=" + m_Service->m_ServiceName;
1089 }
1090
1091 g_AppendClientIPSessionIDHitID(cmd);
1092
1093 CUrlArgs url_parser(m_Service.FindServerAndExec(cmd, false).response);
1094
1095 ITERATE(CUrlArgs::TArgs, field, url_parser.GetArgs()) {
1096 queue_params[field->name] = field->value;
1097 }
1098 }
1099
GetQueueParams(const string & queue_name,TQueueParams & queue_params)1100 void CNetScheduleAPI::GetQueueParams(
1101 const string& queue_name, TQueueParams& queue_params)
1102 {
1103 return m_Impl->GetQueueParams(queue_name, queue_params);
1104 }
1105
GetQueueParams(TQueueParams & queue_params)1106 void SNetScheduleAPIImpl::GetQueueParams(TQueueParams& queue_params)
1107 {
1108 string cmd("GETP2");
1109 g_AppendClientIPSessionIDHitID(cmd);
1110
1111 CUrlArgs url_parser(m_Service.FindServerAndExec(cmd,
1112 false).response);
1113
1114 ITERATE(CUrlArgs::TArgs, field, url_parser.GetArgs()) {
1115 queue_params[field->name] = field->value;
1116 }
1117 }
1118
GetQueueParams(TQueueParams & queue_params)1119 void CNetScheduleAPI::GetQueueParams(TQueueParams& queue_params)
1120 {
1121 return m_Impl->GetQueueParams(queue_params);
1122 }
1123
GetProgressMsg(CNetScheduleJob & job)1124 void CNetScheduleAPI::GetProgressMsg(CNetScheduleJob& job)
1125 {
1126 string cmd("MGET " + job.job_id);
1127 g_AppendClientIPSessionIDHitID(cmd);
1128 auto response = m_Impl->ExecOnJobServer(job, cmd);
1129 job.progress_msg = NStr::ParseEscapes(response);
1130 }
1131
SetClientNode(const string & client_node)1132 void CNetScheduleAPI::SetClientNode(const string& client_node)
1133 {
1134 // Cannot add this to limits::SClientNode due to CNetScheduleAPIExt allowing reset to empty values
1135 if (client_node.empty()) {
1136 NCBI_THROW_FMT(CConfigException, eParameterMissing,
1137 "'" << limits::SClientNode::Name() << "' cannot be empty");
1138 }
1139
1140 limits::Check<limits::SClientNode>(client_node);
1141
1142 m_Impl->m_ClientNode = client_node;
1143
1144 m_Impl->UpdateAuthString();
1145 }
1146
SetClientSession(const string & client_session)1147 void CNetScheduleAPI::SetClientSession(const string& client_session)
1148 {
1149 // Cannot add this to limits::SClientSession due to CNetScheduleAPIExt allowing reset to empty values
1150 if (client_session.empty()) {
1151 NCBI_THROW_FMT(CConfigException, eParameterMissing,
1152 "'" << limits::SClientSession::Name() << "' cannot be empty");
1153 }
1154
1155 limits::Check<limits::SClientSession>(client_session);
1156
1157 m_Impl->m_ClientSession = client_session;
1158
1159 m_Impl->UpdateAuthString();
1160 }
1161
UpdateAuthString()1162 void SNetScheduleAPIImpl::UpdateAuthString()
1163 {
1164 m_Service->m_ServerPool->ResetServerConnections();
1165
1166 GetListener()->SetAuthString(MakeAuthString());
1167 }
1168
SetClientType(CNetScheduleAPI::EClientType client_type)1169 void CNetScheduleAPI::SetClientType(CNetScheduleAPI::EClientType client_type)
1170 {
1171 m_Impl->m_ClientType = client_type;
1172
1173 m_Impl->UpdateAuthString();
1174 }
1175
UseOldStyleAuth()1176 void SNetScheduleAPIImpl::UseOldStyleAuth()
1177 {
1178 m_Service->m_ServerPool->m_UseOldStyleAuth = true;
1179
1180 UpdateAuthString();
1181 }
1182
SetAuthParam(const string & param_name,const string & param_value)1183 void SNetScheduleAPIImpl::SetAuthParam(const string& param_name,
1184 const string& param_value)
1185 {
1186 if (!param_value.empty()) {
1187 string auth_param(' ' + param_name);
1188 auth_param += "=\"";
1189 auth_param += NStr::PrintableString(param_value);
1190 auth_param += '"';
1191 m_AuthParams[param_name] = auth_param;
1192 } else
1193 m_AuthParams.erase(param_name);
1194
1195 UpdateAuthString();
1196 }
1197
InitAffinities(CSynRegistry & registry,const SRegSynonyms & sections)1198 void SNetScheduleAPIImpl::InitAffinities(CSynRegistry& registry, const SRegSynonyms& sections)
1199 {
1200 const bool claim_new_affinities = registry.Get(sections, "claim_new_affinities", false);
1201 const bool process_any_job = registry.Get(sections, "process_any_job", false);
1202 const string affinity_list = registry.Get(sections, "affinity_list", kEmptyStr);
1203 const string affinity_ladder = registry.Get(sections, "affinity_ladder", kEmptyStr);
1204
1205 if (affinity_ladder.empty()) {
1206
1207 if (claim_new_affinities) {
1208 m_AffinityPreference = CNetScheduleExecutor::eClaimNewPreferredAffs;
1209
1210 } else if (process_any_job) {
1211 m_AffinityPreference = CNetScheduleExecutor::ePreferredAffsOrAnyJob;
1212
1213 } else {
1214 m_AffinityPreference = CNetScheduleExecutor::ePreferredAffinities;
1215 }
1216
1217 if (affinity_list.empty()) return;
1218
1219 NStr::Split(affinity_list, ", ", m_AffinityList,
1220 NStr::fSplit_MergeDelimiters | NStr::fSplit_Truncate);
1221
1222 for (auto& affinity : m_AffinityList) {
1223 limits::Check<limits::SAffinity>(affinity);
1224 }
1225
1226 return;
1227 }
1228
1229 // Sanity checks
1230 if (claim_new_affinities) {
1231 NCBI_THROW(CConfigException, eInvalidParameter,
1232 "'affinity_ladder' cannot be used with 'claim_new_affinities'");
1233 }
1234 if (!affinity_list.empty()) {
1235 NCBI_THROW(CConfigException, eInvalidParameter,
1236 "'affinity_ladder' cannot be used with 'affinity_list'");
1237 }
1238
1239 if (!process_any_job) {
1240 m_AffinityPreference = CNetScheduleExecutor::eExplicitAffinitiesOnly;
1241 }
1242
1243 list<CTempString> affinities;
1244 NStr::Split(affinity_ladder, ", ", affinities,
1245 NStr::fSplit_MergeDelimiters | NStr::fSplit_Truncate);
1246
1247 if (affinities.empty()) return;
1248
1249 string affinity_step;
1250
1251 for (auto& affinity : affinities) {
1252 limits::Check<limits::SAffinity>(affinity);
1253
1254 if (!affinity_step.empty()) affinity_step += ',';
1255 affinity_step += affinity;
1256 m_AffinityLadder.emplace_back(affinity, affinity_step);
1257 }
1258 }
1259
1260 ///////////////////////////////////////////////////////////////////////////////
1261
1262 /// @internal
1263 class CNetScheduleAPICF : public IClassFactory<SNetScheduleAPIImpl>
1264 {
1265 public:
1266
1267 typedef SNetScheduleAPIImpl TDriver;
1268 typedef SNetScheduleAPIImpl IFace;
1269 typedef IFace TInterface;
1270 typedef IClassFactory<SNetScheduleAPIImpl> TParent;
1271 typedef TParent::SDriverInfo TDriverInfo;
1272 typedef TParent::TDriverList TDriverList;
1273
1274 /// Construction
1275 ///
1276 /// @param driver_name
1277 /// Driver name string
1278 /// @param patch_level
1279 /// Patch level implemented by the driver.
1280 /// By default corresponds to interface patch level.
CNetScheduleAPICF(const string & driver_name=kNetScheduleAPIDriverName,int patch_level=-1)1281 CNetScheduleAPICF(const string& driver_name = kNetScheduleAPIDriverName,
1282 int patch_level = -1)
1283 : m_DriverVersionInfo
1284 (ncbi::CInterfaceVersion<IFace>::eMajor,
1285 ncbi::CInterfaceVersion<IFace>::eMinor,
1286 patch_level >= 0 ?
1287 patch_level : ncbi::CInterfaceVersion<IFace>::ePatchLevel),
1288 m_DriverName(driver_name)
1289 {
1290 _ASSERT(!m_DriverName.empty());
1291 }
1292
1293 /// Create instance of TDriver
1294 virtual TInterface*
CreateInstance(const string & driver=kEmptyStr,CVersionInfo version=NCBI_INTERFACE_VERSION (IFace),const TPluginManagerParamTree * params=0) const1295 CreateInstance(const string& driver = kEmptyStr,
1296 CVersionInfo version = NCBI_INTERFACE_VERSION(IFace),
1297 const TPluginManagerParamTree* params = 0) const
1298 {
1299 if (params && (driver.empty() || driver == m_DriverName) &&
1300 version.Match(NCBI_INTERFACE_VERSION(IFace)) !=
1301 CVersionInfo::eNonCompatible) {
1302 CConfig config(params);
1303 return new SNetScheduleAPIImpl(&config, m_DriverName);
1304 }
1305 return NULL;
1306 }
1307
GetDriverVersions(TDriverList & info_list) const1308 void GetDriverVersions(TDriverList& info_list) const
1309 {
1310 info_list.push_back(TDriverInfo(m_DriverName, m_DriverVersionInfo));
1311 }
1312
1313 protected:
1314 CVersionInfo m_DriverVersionInfo;
1315 string m_DriverName;
1316 };
1317
1318
NCBI_EntryPoint_xnetscheduleapi(CPluginManager<SNetScheduleAPIImpl>::TDriverInfoList & info_list,CPluginManager<SNetScheduleAPIImpl>::EEntryPointRequest method)1319 void NCBI_XCONNECT_EXPORT NCBI_EntryPoint_xnetscheduleapi(
1320 CPluginManager<SNetScheduleAPIImpl>::TDriverInfoList& info_list,
1321 CPluginManager<SNetScheduleAPIImpl>::EEntryPointRequest method)
1322 {
1323 CHostEntryPointImpl<CNetScheduleAPICF>::
1324 NCBI_EntryPointImpl(info_list, method);
1325
1326 }
1327
1328
AddToClientNode(const string & data)1329 void CNetScheduleAPIExt::AddToClientNode(const string& data)
1330 {
1331 string& client_node(m_Impl->m_ClientNode);
1332 client_node += ':';
1333 client_node += data;
1334 UpdateAuthString();
1335 }
1336
UpdateAuthString()1337 void CNetScheduleAPIExt::UpdateAuthString()
1338 {
1339 m_Impl->UpdateAuthString();
1340 }
1341
UseOldStyleAuth()1342 void CNetScheduleAPIExt::UseOldStyleAuth()
1343 {
1344 m_Impl->UseOldStyleAuth();
1345 }
1346
GetCompoundIDPool()1347 CCompoundIDPool CNetScheduleAPIExt::GetCompoundIDPool()
1348 {
1349 return m_Impl->m_CompoundIDPool;
1350 }
1351
GetServer(CNetServer::TInstance server)1352 CNetScheduleAPI CNetScheduleAPIExt::GetServer(CNetServer::TInstance server)
1353 {
1354 return new SNetScheduleAPIImpl(server->m_ServerInPool, m_Impl);
1355 }
1356
ReSetClientNode(const string & client_node)1357 void CNetScheduleAPIExt::ReSetClientNode(const string& client_node)
1358 {
1359 limits::Check<limits::SClientNode>(client_node);
1360 m_Impl->m_ClientNode = client_node;
1361 m_Impl->UpdateAuthString();
1362 }
1363
ReSetClientSession(const string & client_session)1364 void CNetScheduleAPIExt::ReSetClientSession(const string& client_session)
1365 {
1366 limits::Check<limits::SClientSession>(client_session);
1367 m_Impl->m_ClientSession = client_session;
1368 m_Impl->UpdateAuthString();
1369 }
1370
1371 CNetScheduleAPI::TInstance
CreateWnCompat(const string & service_name,const string & client_name)1372 CNetScheduleAPIExt::CreateWnCompat(const string& service_name,
1373 const string& client_name)
1374 {
1375 return new SNetScheduleAPIImpl(nullptr, kEmptyStr, service_name, client_name, kEmptyStr, true, false);
1376 }
1377
1378 CNetScheduleAPI::TInstance
CreateNoCfgLoad(const string & service_name,const string & client_name,const string & queue_name)1379 CNetScheduleAPIExt::CreateNoCfgLoad(const string& service_name,
1380 const string& client_name, const string& queue_name)
1381 {
1382 return new SNetScheduleAPIImpl(nullptr, kEmptyStr, service_name, client_name, queue_name, false, false);
1383 }
1384
1385
1386 END_NCBI_SCOPE
1387