1 /*  $Id: netschedule_api.cpp 599189 2019-12-20 15:57:19Z sadyrovr $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author:  Anatoliy Kuznetsov, Maxim Didenko, Dmitry Kazimirov
27  *
28  * File Description:
29  *   Implementation of NetSchedule API.
30  *
31  */
32 
33 #include <ncbi_pch.hpp>
34 
35 #include "netschedule_api_impl.hpp"
36 
37 #include <connect/ncbi_socket.hpp>
38 #include <connect/ncbi_conn_exception.hpp>
39 #include <connect/ncbi_userhost.hpp>
40 
41 #include <corelib/ncbi_system.hpp>
42 #include <corelib/plugin_manager_impl.hpp>
43 
44 #include <array>
45 #include <memory>
46 #include <stdio.h>
47 
48 
49 #define COMPATIBLE_NETSCHEDULE_VERSION "4.10.0"
50 
51 
52 BEGIN_NCBI_SCOPE
53 
54 namespace grid {
55 namespace netschedule {
56 namespace limits {
57 
ThrowIllegalChar(const string & name,const string & value,char c)58 void ThrowIllegalChar(const string& name, const string& value, char c)
59 {
60     NCBI_THROW_FMT(CConfigException, eInvalidParameter,
61             "Invalid character '" << NStr::PrintableString(CTempString(&c, 1)) <<
62             "' in the " << name << " \"" << NStr::PrintableString(value) << "\".");
63 }
64 
65 }
66 }
67 }
68 
69 using namespace grid::netschedule;
70 
SNetScheduleNotificationThread(SNetScheduleAPIImpl * ns_api)71 SNetScheduleNotificationThread::SNetScheduleNotificationThread(
72         SNetScheduleAPIImpl* ns_api) :
73     m_API(ns_api),
74     m_StopThread(false)
75 {
76 }
77 
SNetScheduleNotificationReceiver()78 SNetScheduleNotificationReceiver::SNetScheduleNotificationReceiver()
79 {
80     STimeout rto;
81     rto.sec = rto.usec = 0;
82     socket.SetDataLogging(TServConn_ConnDataLogging::GetDefault() ? eOn : eOff);
83     socket.SetTimeout(eIO_Read, &rto);
84 
85     EIO_Status status = socket.Bind(0);
86     if (status != eIO_Success) {
87         NCBI_THROW_FMT(CException, eUnknown,
88             "Could not bind a UDP socket: " << IO_StatusStr(status));
89     }
90 
91     port = socket.GetLocalPort(eNH_HostByteOrder);
92 }
93 
CmdAppendPortAndTimeout(string * cmd,unsigned remaining_seconds)94 void SNetScheduleNotificationThread::CmdAppendPortAndTimeout(
95         string* cmd, unsigned remaining_seconds)
96 {
97     if (remaining_seconds > 0) {
98         *cmd += " port=";
99         *cmd += NStr::UIntToString(GetPort());
100 
101         *cmd += " timeout=";
102         *cmd += NStr::UIntToString(remaining_seconds);
103     }
104 }
105 
CNetScheduleNotificationHandler()106 CNetScheduleNotificationHandler::CNetScheduleNotificationHandler()
107 {
108 }
109 
s_CreateCUrlArgs(const string & output)110 CUrlArgs s_CreateCUrlArgs(const string& output)
111 try {
112     return CUrlArgs(output);
113 }
114 catch (CUrlParserException&) {
115     return CUrlArgs();
116 }
117 
SNetScheduleOutputParser(const string & output)118 SNetScheduleOutputParser::SNetScheduleOutputParser(const string& output) :
119         CUrlArgs(s_CreateCUrlArgs(output))
120 {
121 }
122 
operator ()(const string & param) const123 const string& SNetScheduleOutputParser::operator()(const string& param) const
124 {
125     auto it = FindFirst(param);
126     return it != GetArgs().end() ? it->value : kEmptyStr;
127 }
128 
g_ParseNSOutput(const string & attr_string,const char * const * attr_names,string * attr_values,size_t attr_count)129 int g_ParseNSOutput(const string& attr_string, const char* const* attr_names,
130         string* attr_values, size_t attr_count)
131 {
132     try {
133         CUrlArgs attr_parser(attr_string);
134         const CUrlArgs::TArgs& attr_list = attr_parser.GetArgs();
135 
136         int found_attrs = 0;
137 
138         CUrlArgs::const_iterator attr_it;
139 
140         do {
141             if ((attr_it = attr_parser.FindFirst(*attr_names)) !=
142                     attr_list.end()) {
143                 *attr_values = attr_it->value;
144                 ++found_attrs;
145             }
146             ++attr_names;
147             ++attr_values;
148         } while (--attr_count > 0);
149 
150         return found_attrs;
151     }
152     catch (CUrlParserException&) {
153     }
154 
155     return -1;
156 }
157 
158 SNetScheduleNotificationThread::ENotificationType
CheckNotification(string * ns_node)159         SNetScheduleNotificationThread::CheckNotification(string* ns_node)
160 {
161     _ASSERT(ns_node);
162 
163     SNetScheduleOutputParser parser(m_Receiver.message);
164 
165     if (parser("queue") != m_API->m_Queue) return eNT_Unknown;
166 
167     *ns_node = parser("ns_node");
168 
169     const auto reason = parser("reason");
170 
171     if (reason.empty())
172         return eNT_GetNotification;
173     else if (NStr::CompareCase(reason, CTempString("get", 3)) == 0)
174         return eNT_GetNotification;
175     else if (NStr::CompareCase(reason, CTempString("read", 4)) == 0)
176         return eNT_ReadNotification;
177     else
178         return eNT_Unknown;
179 }
180 
InterruptWait()181 void SServerNotifications::InterruptWait()
182 {
183     CFastMutexGuard guard(m_Mutex);
184 
185     if (m_Interrupted)
186         m_NotificationSemaphore.TryWait();
187     else {
188         m_Interrupted = true;
189         if (!m_ReadyServers.empty())
190             m_NotificationSemaphore.TryWait();
191     }
192 
193     m_NotificationSemaphore.Post();
194 }
195 
RegisterServer(const string & ns_node)196 void SServerNotifications::RegisterServer(const string& ns_node)
197 {
198     CFastMutexGuard guard(m_Mutex);
199 
200     if (!m_ReadyServers.empty())
201         m_Interrupted = false;
202     else {
203         x_ClearInterruptFlag();
204         m_NotificationSemaphore.Post();
205     }
206 
207     m_ReadyServers.insert(ns_node);
208 }
209 
GetNextNotification(string * ns_node)210 bool SServerNotifications::GetNextNotification(string* ns_node)
211 {
212     CFastMutexGuard guard(m_Mutex);
213 
214     x_ClearInterruptFlag();
215 
216     if (m_ReadyServers.empty())
217         return false;
218 
219     TReadyServers::iterator next_server = m_ReadyServers.begin();
220     *ns_node = *next_server;
221     m_ReadyServers.erase(next_server);
222 
223     if (m_ReadyServers.empty())
224         // Make sure the notification semaphore count is reset to zero.
225         m_NotificationSemaphore.TryWait();
226 
227     return true;
228 }
229 
Main()230 void* SNetScheduleNotificationThread::Main()
231 {
232     SetCurrentThreadName(
233             (CNcbiApplication::Instance()->GetProgramDisplayName() +
234                     "_nt").c_str());
235 
236     static const STimeout two_seconds = {2, 0};
237 
238     string server_host;
239 
240     while (!m_StopThread)
241         if (m_Receiver.socket.Wait(&two_seconds) == eIO_Success) {
242             if (m_StopThread)
243                 break;
244 
245             if (m_Receiver(&server_host)) {
246                 string ns_node;
247 
248                 ENotificationType notif_type = CheckNotification(&ns_node);
249 
250                 switch (notif_type) {
251                 case eNT_GetNotification:
252                     m_GetNotifications.RegisterServer(ns_node);
253                     break;
254                 case eNT_ReadNotification:
255                     m_ReadNotifications.RegisterServer(ns_node);
256                     break;
257                 default:
258                     break;
259                 }
260             }
261         }
262     return NULL;
263 }
264 
operator ()(string * server_host)265 bool SNetScheduleNotificationReceiver::operator()(string* server_host)
266 {
267     array<char, 64 * 1024> buffer; // Enough to hold any UDP
268     size_t msg_len;
269 
270     if (socket.Recv(buffer.data(), buffer.size(), &msg_len,
271             server_host, NULL) != eIO_Success)
272         return false;
273 
274     _ASSERT(buffer.size() > msg_len);
275     buffer[msg_len] = '\0'; // Make it null-terminated in case it's not
276     message.assign(buffer.data()); // Ignore everything after the first null character
277 
278     return true;
279 }
280 
ReceiveNotification(string * server_host)281 bool CNetScheduleNotificationHandler::ReceiveNotification(string* server_host)
282 {
283     return m_Receiver(server_host);
284 }
285 
WaitForNotification(const CDeadline & deadline,string * server_host)286 bool CNetScheduleNotificationHandler::WaitForNotification(
287         const CDeadline& deadline, string* server_host)
288 {
289     STimeout timeout;
290 
291     for (;;) {
292         deadline.GetRemainingTime().Get(&timeout.sec, &timeout.usec);
293 
294         if (timeout.sec == 0 && timeout.usec == 0)
295             return false;
296 
297         switch (m_Receiver.socket.Wait(&timeout)) {
298         case eIO_Timeout:
299             return false;
300 
301         case eIO_Success:
302             if (ReceiveNotification(server_host))
303                 return true;
304             /* FALL THROUGH */
305 
306         default:
307             break;
308         }
309     }
310 
311     return false;
312 }
313 
PrintPortNumber()314 void CNetScheduleNotificationHandler::PrintPortNumber()
315 {
316     printf("Using UDP port %hu\n", GetPort());
317 }
318 
AllocNotificationThread()319 void SNetScheduleAPIImpl::AllocNotificationThread()
320 {
321     CFastMutexGuard guard(m_NotificationThreadMutex);
322 
323     if (m_NotificationThread == NULL)
324         m_NotificationThread = new SNetScheduleNotificationThread(this);
325 }
326 
StartNotificationThread()327 void SNetScheduleAPIImpl::StartNotificationThread()
328 {
329     if (m_NotificationThreadStartStopCounter.Add(1) == 1)
330         m_NotificationThread->Run();
331 }
332 
~SNetScheduleAPIImpl()333 SNetScheduleAPIImpl::~SNetScheduleAPIImpl()
334 {
335     if (m_NotificationThreadStartStopCounter.Add(-1) == 0) {
336         CFastMutexGuard guard(m_NotificationThreadMutex);
337 
338         if (m_NotificationThread != NULL) {
339             m_NotificationThread->m_StopThread = true;
340             CDatagramSocket().Send("INTERRUPT", sizeof("INTERRUPT"),
341                     "127.0.0.1", m_NotificationThread->m_Receiver.port);
342             m_NotificationThread->Join();
343         }
344     }
345 }
346 
x_ClearNode()347 void SNetScheduleAPIImpl::x_ClearNode()
348 {
349     string cmd("CLRN");
350     g_AppendClientIPSessionIDHitID(cmd);
351 
352     for (CNetServiceIterator it =
353             m_Service.Iterate(CNetService::eIncludePenalized); it; ++it) {
354         CNetServer server = *it;
355 
356         try {
357             CNetServer::SExecResult exec_result;
358             server->ConnectAndExec(cmd, false, exec_result);
359         } catch (CNetSrvConnException& e) {
360             if (m_Service.IsLoadBalanced()) {
361                 ERR_POST(server->m_ServerInPool->m_Address.AsString() <<
362                     ": " << e.what());
363             }
364         } catch (CNetServiceException& e) {
365             if (e.GetErrCode() != CNetServiceException::eCommunicationError)
366                 throw;
367             else {
368                 ERR_POST(server->m_ServerInPool->m_Address.AsString() <<
369                     ": " << e.what());
370             }
371         }
372     }
373 }
374 
s_GetSection(bool ns_conf)375 CTempString s_GetSection(bool ns_conf)
376 {
377     return ns_conf ? "netschedule_conf_from_netschedule" : "netcache_conf_from_netschedule";
378 }
379 
CNetScheduleConfigLoader(CSynRegistry & registry,SRegSynonyms & sections,bool ns_conf)380 CNetScheduleConfigLoader::CNetScheduleConfigLoader(CSynRegistry& registry, SRegSynonyms& sections, bool ns_conf) :
381     m_Registry(registry), m_Sections(sections), m_NsConf(ns_conf), m_Mode(eImplicit)
382 {
383     sections.Insert(s_GetSection(m_NsConf));
384 
385     const auto param = "load_config_from_ns";
386 
387     if (m_Registry.Has(m_Sections, param)) {
388         m_Mode = m_Registry.Get(m_Sections, param, true) ? eExplicit : eOff;
389     }
390 }
391 
Transform(const string & prefix,string & name) const392 bool CNetScheduleConfigLoader::Transform(const string& prefix, string& name) const
393 {
394     if (m_NsConf) {
395         // If it's "service to queue" special case (we do not know queue name)
396         if (name == "queue_name") return true;
397 
398         // Queue parameter "timeout" determines the initial TTL of a submitted job.
399         // Since "timeout" is too generic, replaced it with "job_ttl" on client side.
400         if (name == "timeout") {
401             name = "job_ttl";
402             return true;
403         }
404     }
405 
406     // Do not load client_name from server
407     if (name == "client_name") return false;
408 
409     // Only params starting with provided prefix are used
410     if (NStr::StartsWith(name, prefix)) {
411         name.erase(0, prefix.size());
412         return true;
413     }
414 
415     return false;
416 }
417 
operator ()(SNetScheduleAPIImpl * impl)418 bool CNetScheduleConfigLoader::operator()(SNetScheduleAPIImpl* impl)
419 {
420     _ASSERT(impl);
421 
422     if (m_Mode == eOff) return false;
423 
424     // Turn off any subsequent attempts
425     const auto mode = m_Mode;
426     m_Mode = eOff;
427 
428     // Errors could happen when we try to load config from servers that either
429     // do not support "GETP2" command (introduced in 4.16.9)
430     // or, do not support "QINF2 service=name" command (introduced in 4.17.0)
431     // or, do not have "service to queue" mapping set
432     // or, are not actually NetSchedule servers but worker nodes
433     // or, are currently not reachable (behind some firewall)
434     // and need cross connectivity which is usually enabled later
435     //
436     // This guard is set to suppress errors and avoid retries if config loading is not enabled explicitly
437     const auto retry_mode = mode == eImplicit ?
438         SNetServiceImpl::SRetry::eNoRetryNoErrors :
439         SNetServiceImpl::SRetry::eDefault;
440     auto retry_guard = impl->m_Service->CreateRetryGuard(retry_mode);
441 
442     CNetScheduleAPI::TQueueParams queue_params;
443 
444     try {
445         impl->GetQueueParams(kEmptyStr, queue_params);
446     }
447     catch (...) {
448         if (mode == eExplicit) throw;
449         return false;
450     }
451 
452     CRef<CMemoryRegistry> mem_registry(new CMemoryRegistry);
453     const string prefix = m_NsConf ? "ns." : "nc.";
454     const string section = s_GetSection(m_NsConf);
455 
456     for (auto& param : queue_params) {
457         auto name = param.first;
458 
459         if (Transform(prefix, name)) {
460             mem_registry->Set(section, name, param.second);
461         }
462     }
463 
464     if (mem_registry->Empty()) return false;
465 
466     m_Registry.Add(mem_registry.GetObject());
467     return true;
468 }
469 
MakeAuthString()470 string SNetScheduleAPIImpl::MakeAuthString()
471 {
472     string auth(m_Service->MakeAuthString());
473 
474     const CVersionAPI* version = nullptr;
475     string name;
476 
477     {{
478         if (CNcbiApplicationGuard app = CNcbiApplication::InstanceGuard()) {
479             version = &app->GetFullVersion();
480             name = app->GetProgramDisplayName();
481         }
482     }}
483 
484     if (version && m_ProgramVersion.empty()) {
485         m_ProgramVersion += name;
486         auto package_name = version->GetPackageName();
487 
488         if (!package_name.empty()) {
489             m_ProgramVersion += ": ";
490             m_ProgramVersion += package_name;
491             m_ProgramVersion += ' ';
492             m_ProgramVersion += version->GetPackageVersion().Print();
493             m_ProgramVersion += " built on ";
494             m_ProgramVersion += version->GetBuildInfo().date;
495         }
496     }
497 
498     if (!m_ProgramVersion.empty()) {
499         auth += " prog=\"";
500         auth += m_ProgramVersion;
501         auth += '\"';
502     }
503 
504     switch (m_ClientType) {
505     case CNetScheduleAPI::eCT_Admin:
506         auth += " client_type=\"admin\"";
507         break;
508 
509     case CNetScheduleAPI::eCT_Submitter:
510         auth += " client_type=\"submitter\"";
511         break;
512 
513     case CNetScheduleAPI::eCT_WorkerNode:
514         auth += " client_type=\"worker node\"";
515         break;
516 
517     case CNetScheduleAPI::eCT_Reader:
518         auth += " client_type=\"reader\"";
519 
520     default: /* eCT_Auto */
521         break;
522     }
523 
524     if (!m_ClientNode.empty()) {
525         auth += " client_node=\"";
526         auth += m_ClientNode;
527         auth += '\"';
528     }
529 
530     if (!m_ClientSession.empty()) {
531         auth += " client_session=\"";
532         auth += m_ClientSession;
533         auth += '\"';
534     }
535 
536     if (version) {
537         auth += " client_version=\"";
538         auth += version->GetVersionInfo().Print();
539         auth += '\"';
540     }
541 
542     ITERATE(SNetScheduleAPIImpl::TAuthParams, it, m_AuthParams) {
543         auth += it->second;
544     }
545 
546     auth += " ns_compat_ver=\"" COMPATIBLE_NETSCHEDULE_VERSION "\""
547         "\r\n";
548 
549     auth += m_Queue;
550 
551     // Make the auth token look like a command to be able to
552     // check for potential authentication/initialization errors
553     // like the "queue not found" error.
554     if (m_Mode & fNonWnCompatible) {
555         auth += "\r\nVERSION";
556     }
557 
558     return auth;
559 }
560 
GetPropCreator() const561 INetServerConnectionListener::TPropCreator CNetScheduleServerListener::GetPropCreator() const
562 {
563     return [] { return new SNetScheduleServerProperties; };
564 }
565 
Clone()566 INetServerConnectionListener* CNetScheduleServerListener::Clone()
567 {
568     return new CNetScheduleServerListener(*this);
569 }
570 
Init(CSynRegistry & registry,SRegSynonyms & sections)571 void SNetScheduleAPIImpl::Init(CSynRegistry& registry, SRegSynonyms& sections)
572 {
573     SetDiagUserAndHost();
574 
575     m_RetryOnException = registry.Get(sections, "enforce_retry_policy", false);
576 
577     if (!m_Queue.empty()) limits::Check<limits::SQueueName>(m_Queue);
578 
579     const string& user(GetDiagContext().GetUsername());
580     m_ClientNode =
581         m_Service->GetClientName() + "::" +
582         (user.empty() ? kEmptyStr : user + '@') +
583         GetDiagContext().GetHost();
584 
585     CNetScheduleConfigLoader loader(registry, sections);
586 
587     bool affinities_initialized = false;
588 
589     // There are two phases of Init in case we need to load config from server
590     // 1) Setup as much as possible and try to get config from server
591     // 2) Setup everything using received config from server
592     do {
593         if (m_Queue.empty()) {
594             m_Queue = registry.Get(sections, "queue_name", "");
595             if (!m_Queue.empty()) limits::Check<limits::SQueueName>(m_Queue);
596         }
597 
598         m_UseEmbeddedStorage =   registry.Get(sections, { "use_embedded_storage", "use_embedded_input" }, true);
599         m_JobGroup =             registry.Get(sections, "job_group", "");
600         m_JobTtl =               registry.Get(sections, "job_ttl", 0);
601         m_ClientNode =           registry.Get(sections, "client_node", m_ClientNode);
602         GetListener()->Scope() = registry.Get(sections, "scope", "");
603 
604         if (!affinities_initialized && registry.Get(sections, "use_affinities", false)) {
605             affinities_initialized = true;
606             InitAffinities(registry, sections);
607         }
608 
609         if (!m_ClientNode.empty()) {
610             m_ClientSession =
611                 NStr::NumericToString(CDiagContext::GetPID()) + '@' +
612                 NStr::NumericToString(GetFastLocalTime().GetTimeT()) + ':' +
613                 GetDiagContext().GetStringUID();
614         }
615 
616         GetListener()->SetAuthString(MakeAuthString());
617 
618         // If not working in WN compatible mode
619         if (!(m_Mode & fConfigLoading)) break;
620     } while (loader(this));
621 }
622 
OnConnected(CNetServerConnection & connection)623 void CNetScheduleServerListener::OnConnected(CNetServerConnection& connection)
624 {
625     if (m_NonWn) {
626         string version_info(connection.Exec(m_Auth, false));
627 
628         CNetServerInfo server_info(new SNetServerInfoImpl(version_info));
629 
630         string attr_name, attr_value;
631         string ns_node, ns_session;
632         CVersionInfo version;
633 
634         while (server_info.GetNextAttribute(attr_name, attr_value))
635             if (attr_name == "ns_node")
636                 ns_node = attr_value;
637             else if (attr_name == "ns_session")
638                 ns_session = attr_value;
639             else if (attr_name == "server_version")
640                 version = CVersionInfo(attr_value);
641 
642         // Usually, all attributes come together, so no need to check version
643         if (!ns_node.empty() && !ns_session.empty()) {
644             auto server_props = connection->m_Server->Get<SNetScheduleServerProperties>();
645 
646             // Version cannot change without session, so no need to compare, too
647             if (server_props->ns_node != ns_node ||
648                     server_props->ns_session != ns_session) {
649                 CFastMutexGuard guard(m_SharedData->m_ServerByNodeMutex);
650                 server_props->ns_node = ns_node;
651                 server_props->ns_session = ns_session;
652                 server_props->version = version;
653                 m_SharedData->m_ServerByNode[ns_node] = connection->m_Server->m_ServerInPool;
654                 server_props->affs_synced = false;
655             }
656         }
657 
658         if (!m_Scope.empty()) {
659             string cmd("SETSCOPE " + m_Scope);
660             g_AppendClientIPSessionIDHitID(cmd);
661             connection.Exec(cmd, false);
662         }
663     } else
664         connection->WriteLine(m_Auth);
665 }
666 
OnErrorImpl(const string & err_msg,CNetServer & server)667 void CNetScheduleServerListener::OnErrorImpl(
668     const string& err_msg, CNetServer& server)
669 {
670     string code;
671     string msg;
672 
673     if (!NStr::SplitInTwo(err_msg, ":", code, msg)) {
674         if (err_msg == "Job not found") {
675             NCBI_THROW(CNetScheduleException, eJobNotFound, err_msg);
676         }
677         code = err_msg;
678     }
679 
680     // Map code into numeric value
681     CException::TErrCode err_code = CNetScheduleExceptionMap::GetCode(code);
682 
683     switch (err_code) {
684     case CException::eInvalid:
685         NCBI_THROW(CNetServiceException, eCommunicationError, err_msg);
686 
687     case CNetScheduleException::eGroupNotFound:
688     case CNetScheduleException::eAffinityNotFound:
689     case CNetScheduleException::eDuplicateName:
690         // Convert these errors into warnings.
691         OnWarning(msg, server);
692         break;
693 
694     case CNetScheduleException::eJobNotFound:
695         NCBI_THROW(CNetScheduleException, eJobNotFound, "Job not found");
696 
697     default:
698         NCBI_THROW(CNetScheduleException, EErrCode(err_code), !msg.empty() ?
699                 msg : CNetScheduleException::GetErrCodeDescription(err_code));
700     }
701 }
702 
OnWarningImpl(const string & warn_msg,CNetServer & server)703 void CNetScheduleServerListener::OnWarningImpl(const string& warn_msg,
704         CNetServer& server)
705 {
706         ERR_POST(Warning << server->m_ServerInPool->m_Address.AsString() <<
707                 ": " << warn_msg);
708 }
709 
710 const char* const kNetScheduleAPIDriverName = "netschedule_api";
711 
SNetScheduleAPIImpl(CSynRegistryBuilder registry_builder,const string & section,const string & service_name,const string & client_name,const string & queue_name,bool wn,bool try_config)712 SNetScheduleAPIImpl::SNetScheduleAPIImpl(CSynRegistryBuilder registry_builder, const string& section,
713         const string& service_name, const string& client_name,
714         const string& queue_name, bool wn, bool try_config) :
715     m_Mode(GetMode(wn, try_config)),
716     m_SharedData(new SNetScheduleSharedData),
717     m_Queue(queue_name)
718 {
719     SRegSynonyms sections{ section, kNetScheduleAPIDriverName };
720     m_Service = SNetServiceImpl::Create("NetScheduleAPI", service_name, client_name,
721             new CNetScheduleServerListener(m_Mode & fNonWnCompatible, m_SharedData),
722             registry_builder, sections);
723     Init(registry_builder, sections);
724 }
725 
SNetScheduleAPIImpl(SNetServerInPool * server,SNetScheduleAPIImpl * parent)726 SNetScheduleAPIImpl::SNetScheduleAPIImpl(
727         SNetServerInPool* server, SNetScheduleAPIImpl* parent) :
728     m_Mode(parent->m_Mode),
729     m_SharedData(parent->m_SharedData),
730     m_RetryOnException(parent->m_RetryOnException),
731     m_Service(SNetServiceImpl::Clone(server, parent->m_Service)),
732     m_Queue(parent->m_Queue),
733     m_ProgramVersion(parent->m_ProgramVersion),
734     m_ClientNode(parent->m_ClientNode),
735     m_ClientSession(parent->m_ClientSession),
736     m_AffinityPreference(parent->m_AffinityPreference),
737     m_UseEmbeddedStorage(parent->m_UseEmbeddedStorage)
738 {
739 }
740 
CNetScheduleAPI(CNetScheduleAPI::EAppRegistry,const string & conf_section)741 CNetScheduleAPI::CNetScheduleAPI(CNetScheduleAPI::EAppRegistry /*use_app_reg*/,
742         const string& conf_section /* = kEmptyStr */) :
743     m_Impl(new SNetScheduleAPIImpl(nullptr, conf_section))
744 {
745 }
746 
CNetScheduleAPI(const IRegistry & reg,const string & conf_section)747 CNetScheduleAPI::CNetScheduleAPI(const IRegistry& reg,
748         const string& conf_section) :
749     m_Impl(new SNetScheduleAPIImpl(reg, conf_section))
750 {
751 }
752 
CNetScheduleAPI(CConfig * conf,const string & conf_section)753 CNetScheduleAPI::CNetScheduleAPI(CConfig* conf, const string& conf_section) :
754     m_Impl(new SNetScheduleAPIImpl(conf, conf_section))
755 {
756 }
757 
CNetScheduleAPI(const string & service_name,const string & client_name,const string & queue_name)758 CNetScheduleAPI::CNetScheduleAPI(const string& service_name,
759         const string& client_name, const string& queue_name) :
760     m_Impl(new SNetScheduleAPIImpl(nullptr, kEmptyStr, service_name, client_name, queue_name))
761 {
762 }
763 
SetProgramVersion(const string & pv)764 void CNetScheduleAPI::SetProgramVersion(const string& pv)
765 {
766     m_Impl->m_ProgramVersion = pv;
767 
768     m_Impl->UpdateAuthString();
769 }
770 
GetProgramVersion() const771 const string& CNetScheduleAPI::GetProgramVersion() const
772 {
773     return m_Impl->m_ProgramVersion;
774 }
775 
GetQueueName() const776 const string& CNetScheduleAPI::GetQueueName() const
777 {
778     return m_Impl->m_Queue;
779 }
780 
StatusToString(EJobStatus status)781 string CNetScheduleAPI::StatusToString(EJobStatus status)
782 {
783     switch(status) {
784     case eJobNotFound: return "NotFound";
785     case ePending:     return "Pending";
786     case eRunning:     return "Running";
787     case eCanceled:    return "Canceled";
788     case eFailed:      return "Failed";
789     case eDone:        return "Done";
790     case eReading:     return "Reading";
791     case eConfirmed:   return "Confirmed";
792     case eReadFailed:  return "ReadFailed";
793     case eDeleted:     return "Deleted";
794 
795     default: _ASSERT(0);
796     }
797     return kEmptyStr;
798 }
799 
800 CNetScheduleAPI::EJobStatus
StringToStatus(const CTempString & status_str)801 CNetScheduleAPI::StringToStatus(const CTempString& status_str)
802 {
803     if (NStr::CompareNocase(status_str, "Pending") == 0)
804         return ePending;
805     if (NStr::CompareNocase(status_str, "Running") == 0)
806         return eRunning;
807     if (NStr::CompareNocase(status_str, "Canceled") == 0)
808         return eCanceled;
809     if (NStr::CompareNocase(status_str, "Failed") == 0)
810         return eFailed;
811     if (NStr::CompareNocase(status_str, "Done") == 0)
812         return eDone;
813     if (NStr::CompareNocase(status_str, "Reading") == 0)
814         return eReading;
815     if (NStr::CompareNocase(status_str, "Confirmed") == 0)
816         return eConfirmed;
817     if (NStr::CompareNocase(status_str, "ReadFailed") == 0)
818         return eReadFailed;
819     if (NStr::CompareNocase(status_str, "Deleted") == 0)
820         return eDeleted;
821 
822     return eJobNotFound;
823 }
824 
825 #define EXTRACT_WARNING_TYPE(warning_type) \
826     if (NStr::StartsWith(warn_msg, "e" #warning_type ":")) { \
827         warn_msg.erase(0, sizeof("e" #warning_type ":") - 1); \
828         return eWarn##warning_type; \
829     }
830 
831 CNetScheduleAPI::ENetScheduleWarningType
ExtractWarningType(string & warn_msg)832         CNetScheduleAPI::ExtractWarningType(string& warn_msg)
833 {
834     EXTRACT_WARNING_TYPE(AffinityNotFound);
835     EXTRACT_WARNING_TYPE(AffinityNotPreferred);
836     EXTRACT_WARNING_TYPE(AffinityAlreadyPreferred);
837     EXTRACT_WARNING_TYPE(GroupNotFound);
838     EXTRACT_WARNING_TYPE(JobNotFound);
839     EXTRACT_WARNING_TYPE(JobAlreadyCanceled);
840     EXTRACT_WARNING_TYPE(JobAlreadyDone);
841     EXTRACT_WARNING_TYPE(JobAlreadyFailed);
842     EXTRACT_WARNING_TYPE(JobPassportOnlyMatch);
843     EXTRACT_WARNING_TYPE(NoParametersChanged);
844     EXTRACT_WARNING_TYPE(ConfigFileNotChanged);
845     EXTRACT_WARNING_TYPE(AlertNotFound);
846     EXTRACT_WARNING_TYPE(AlertAlreadyAcknowledged);
847     EXTRACT_WARNING_TYPE(SubmitsDisabledForServer);
848     EXTRACT_WARNING_TYPE(QueueAlreadyPaused);
849     EXTRACT_WARNING_TYPE(QueueNotPaused);
850     EXTRACT_WARNING_TYPE(CommandObsolete);
851     EXTRACT_WARNING_TYPE(JobNotRead);
852     return eWarnUnknown;
853 }
854 
855 #define WARNING_TYPE_TO_STRING(warning_type) \
856     case eWarn##warning_type: \
857         return #warning_type;
858 
WarningTypeToString(CNetScheduleAPI::ENetScheduleWarningType warning_type)859 const char* CNetScheduleAPI::WarningTypeToString(
860         CNetScheduleAPI::ENetScheduleWarningType warning_type)
861 {
862     switch (warning_type) {
863     WARNING_TYPE_TO_STRING(AffinityNotFound);
864     WARNING_TYPE_TO_STRING(AffinityNotPreferred);
865     WARNING_TYPE_TO_STRING(AffinityAlreadyPreferred);
866     WARNING_TYPE_TO_STRING(GroupNotFound);
867     WARNING_TYPE_TO_STRING(JobNotFound);
868     WARNING_TYPE_TO_STRING(JobAlreadyCanceled);
869     WARNING_TYPE_TO_STRING(JobAlreadyDone);
870     WARNING_TYPE_TO_STRING(JobAlreadyFailed);
871     WARNING_TYPE_TO_STRING(JobPassportOnlyMatch);
872     WARNING_TYPE_TO_STRING(NoParametersChanged);
873     WARNING_TYPE_TO_STRING(ConfigFileNotChanged);
874     WARNING_TYPE_TO_STRING(AlertNotFound);
875     WARNING_TYPE_TO_STRING(AlertAlreadyAcknowledged);
876     WARNING_TYPE_TO_STRING(SubmitsDisabledForServer);
877     WARNING_TYPE_TO_STRING(QueueAlreadyPaused);
878     WARNING_TYPE_TO_STRING(QueueNotPaused);
879     WARNING_TYPE_TO_STRING(CommandObsolete);
880     WARNING_TYPE_TO_STRING(JobNotRead);
881     default:
882         return "eWarnUnknown";
883     }
884 }
885 
GetSubmitter()886 CNetScheduleSubmitter CNetScheduleAPI::GetSubmitter()
887 {
888     return new SNetScheduleSubmitterImpl(m_Impl);
889 }
890 
GetExecutor()891 CNetScheduleExecutor CNetScheduleAPI::GetExecutor()
892 {
893     return new SNetScheduleExecutorImpl(m_Impl);
894 }
895 
GetJobReader(const string & group,const string & affinity)896 CNetScheduleJobReader CNetScheduleAPI::GetJobReader(const string& group,
897         const string& affinity)
898 {
899     m_Impl->AllocNotificationThread();
900     return new SNetScheduleJobReaderImpl(m_Impl, group, affinity);
901 }
902 
GetAdmin()903 CNetScheduleAdmin CNetScheduleAPI::GetAdmin()
904 {
905     return new SNetScheduleAdminImpl(m_Impl);
906 }
907 
GetService()908 CNetService CNetScheduleAPI::GetService()
909 {
910     return m_Impl->m_Service;
911 }
912 
s_SetJobExpTime(time_t * job_exptime,const string & time_str)913 static void s_SetJobExpTime(time_t* job_exptime, const string& time_str)
914 {
915     if (job_exptime != NULL)
916         *job_exptime = (time_t) NStr::StringToUInt8(time_str,
917                 NStr::fConvErr_NoThrow);
918 }
919 
s_SetPauseMode(ENetScheduleQueuePauseMode * pause_mode,const string & mode_str)920 static void s_SetPauseMode(ENetScheduleQueuePauseMode* pause_mode,
921         const string& mode_str)
922 {
923     if (pause_mode != NULL)
924         *pause_mode = mode_str.empty() ? eNSQ_NoPause :
925                 mode_str == "pullback" ? eNSQ_WithPullback :
926                         eNSQ_WithoutPullback;
927 }
928 
GetJobDetails(CNetScheduleJob & job,time_t * job_exptime,ENetScheduleQueuePauseMode * pause_mode)929 CNetScheduleAPI::EJobStatus CNetScheduleAPI::GetJobDetails(
930         CNetScheduleJob& job,
931         time_t* job_exptime,
932         ENetScheduleQueuePauseMode* pause_mode)
933 {
934     string cmd("STATUS2 " + job.job_id);
935     g_AppendClientIPSessionIDHitID(cmd);
936     cmd += " need_progress_msg=1";
937     auto response = m_Impl->ExecOnJobServer(job, cmd);
938 
939     SNetScheduleOutputParser parser(response);
940 
941     const auto status = StringToStatus(parser("job_status"));
942 
943     s_SetJobExpTime(job_exptime, parser("job_exptime"));
944     s_SetPauseMode(pause_mode, parser("pause"));
945 
946     switch (status) {
947     case ePending:
948     case eRunning:
949     case eCanceled:
950     case eFailed:
951     case eDone:
952     case eReading:
953     case eConfirmed:
954     case eReadFailed:
955         job.input = parser("input");
956         job.output = parser("output");
957         job.ret_code = NStr::StringToInt(parser("ret_code"), NStr::fConvErr_NoThrow);
958         job.error_msg = parser("err_msg");
959         break;
960 
961     default:
962         job.input.erase();
963         job.ret_code = 0;
964         job.output.erase();
965         job.error_msg.erase();
966     }
967 
968     job.affinity.erase();
969     job.mask = CNetScheduleAPI::eEmptyMask;
970     job.progress_msg = parser("msg");
971 
972     return status;
973 }
974 
GetJobStatus(string cmd,const CNetScheduleJob & job,time_t * job_exptime,ENetScheduleQueuePauseMode * pause_mode)975 CNetScheduleAPI::EJobStatus SNetScheduleAPIImpl::GetJobStatus(string cmd,
976         const CNetScheduleJob& job, time_t* job_exptime,
977         ENetScheduleQueuePauseMode* pause_mode)
978 {
979     string response;
980 
981     try {
982         cmd += ' ';
983         cmd += job.job_id;
984         g_AppendClientIPSessionIDHitID(cmd);
985         response = ExecOnJobServer(job, cmd);
986     }
987     catch (CNetScheduleException& e) {
988         if (e.GetErrCode() != CNetScheduleException::eJobNotFound)
989             throw;
990 
991         if (job_exptime != NULL)
992             *job_exptime = 0;
993 
994         return CNetScheduleAPI::eJobNotFound;
995     }
996 
997     SNetScheduleOutputParser parser(response);
998 
999     s_SetJobExpTime(job_exptime, parser("job_exptime"));
1000     s_SetPauseMode(pause_mode, parser("pause"));
1001 
1002     return CNetScheduleAPI::StringToStatus(parser("job_status"));
1003 }
1004 
GetServerByNode(const string & ns_node,CNetServer * server)1005 bool SNetScheduleAPIImpl::GetServerByNode(const string& ns_node,
1006         CNetServer* server)
1007 {
1008     SNetServerInPool* known_server; /* NCBI_FAKE_WARNING */
1009 
1010     {{
1011         CFastMutexGuard guard(m_SharedData->m_ServerByNodeMutex);
1012 
1013         auto server_props_it = m_SharedData->m_ServerByNode.find(ns_node);
1014 
1015         if (server_props_it == m_SharedData->m_ServerByNode.end())
1016             return false;
1017 
1018         known_server = server_props_it->second;
1019     }}
1020 
1021     *server = new SNetServerImpl(m_Service,
1022             m_Service->m_ServerPool->ReturnServer(known_server));
1023 
1024     return true;
1025 }
1026 
1027 const CNetScheduleAPI::SServerParams&
operator ()(CNetService & service,const string & queue)1028 SNetScheduleAPIImpl::SServerParamsSync::operator()(CNetService& service, const string& queue)
1029 {
1030     CFastMutexGuard g(m_FastMutex);
1031 
1032     if (m_AskCount-- > 0) return m_ServerParams;
1033 
1034     m_AskCount = kAskMaxCount;
1035 
1036     m_ServerParams.max_input_size = kNetScheduleMaxDBDataSize;
1037     m_ServerParams.max_output_size = kNetScheduleMaxDBDataSize;
1038 
1039     string cmd("QINF2 " + queue);
1040     g_AppendClientIPSessionIDHitID(cmd);
1041 
1042     CUrlArgs url_parser(service.FindServerAndExec(cmd, false).response);
1043 
1044     enum {
1045         eMaxInputSize,
1046         eMaxOutputSize,
1047         eNumberOfSizeParams
1048     };
1049 
1050     int field_bits = 0;
1051 
1052     ITERATE(CUrlArgs::TArgs, field, url_parser.GetArgs()) {
1053         if (field->name[0] == 'm') {
1054             if (field->name == "max_input_size") {
1055                 field_bits |= (1 << eMaxInputSize);
1056                 m_ServerParams.max_input_size =
1057                         NStr::StringToInt(field->value);
1058             } else if (field->name == "max_output_size") {
1059                 field_bits |= (1 << eMaxOutputSize);
1060                 m_ServerParams.max_output_size =
1061                         NStr::StringToInt(field->value);
1062             }
1063         }
1064         if (field_bits == (1 << eNumberOfSizeParams) - 1)
1065             break;
1066     }
1067 
1068     return m_ServerParams;
1069 }
1070 
GetServerParams()1071 const CNetScheduleAPI::SServerParams& CNetScheduleAPI::GetServerParams()
1072 {
1073     return m_Impl->GetServerParams();
1074 }
1075 
GetQueueParams(const string & queue_name,TQueueParams & queue_params)1076 void SNetScheduleAPIImpl::GetQueueParams(
1077         const string& queue_name, TQueueParams& queue_params)
1078 {
1079     string cmd("QINF2 ");
1080 
1081     if (!queue_name.empty()) {
1082         limits::Check<limits::SQueueName>(queue_name);
1083 
1084         cmd += queue_name;
1085     } else if (!m_Queue.empty()) {
1086         cmd += m_Queue;
1087     } else {
1088         cmd += "service=" + m_Service->m_ServiceName;
1089     }
1090 
1091     g_AppendClientIPSessionIDHitID(cmd);
1092 
1093     CUrlArgs url_parser(m_Service.FindServerAndExec(cmd, false).response);
1094 
1095     ITERATE(CUrlArgs::TArgs, field, url_parser.GetArgs()) {
1096         queue_params[field->name] = field->value;
1097     }
1098 }
1099 
GetQueueParams(const string & queue_name,TQueueParams & queue_params)1100 void CNetScheduleAPI::GetQueueParams(
1101         const string& queue_name, TQueueParams& queue_params)
1102 {
1103     return m_Impl->GetQueueParams(queue_name, queue_params);
1104 }
1105 
GetQueueParams(TQueueParams & queue_params)1106 void SNetScheduleAPIImpl::GetQueueParams(TQueueParams& queue_params)
1107 {
1108     string cmd("GETP2");
1109     g_AppendClientIPSessionIDHitID(cmd);
1110 
1111     CUrlArgs url_parser(m_Service.FindServerAndExec(cmd,
1112             false).response);
1113 
1114     ITERATE(CUrlArgs::TArgs, field, url_parser.GetArgs()) {
1115         queue_params[field->name] = field->value;
1116     }
1117 }
1118 
GetQueueParams(TQueueParams & queue_params)1119 void CNetScheduleAPI::GetQueueParams(TQueueParams& queue_params)
1120 {
1121     return m_Impl->GetQueueParams(queue_params);
1122 }
1123 
GetProgressMsg(CNetScheduleJob & job)1124 void CNetScheduleAPI::GetProgressMsg(CNetScheduleJob& job)
1125 {
1126     string cmd("MGET " + job.job_id);
1127     g_AppendClientIPSessionIDHitID(cmd);
1128     auto response = m_Impl->ExecOnJobServer(job, cmd);
1129     job.progress_msg = NStr::ParseEscapes(response);
1130 }
1131 
SetClientNode(const string & client_node)1132 void CNetScheduleAPI::SetClientNode(const string& client_node)
1133 {
1134     // Cannot add this to limits::SClientNode due to CNetScheduleAPIExt allowing reset to empty values
1135     if (client_node.empty()) {
1136         NCBI_THROW_FMT(CConfigException, eParameterMissing,
1137                 "'" << limits::SClientNode::Name() << "' cannot be empty");
1138     }
1139 
1140     limits::Check<limits::SClientNode>(client_node);
1141 
1142     m_Impl->m_ClientNode = client_node;
1143 
1144     m_Impl->UpdateAuthString();
1145 }
1146 
SetClientSession(const string & client_session)1147 void CNetScheduleAPI::SetClientSession(const string& client_session)
1148 {
1149     // Cannot add this to limits::SClientSession due to CNetScheduleAPIExt allowing reset to empty values
1150     if (client_session.empty()) {
1151         NCBI_THROW_FMT(CConfigException, eParameterMissing,
1152                 "'" << limits::SClientSession::Name() << "' cannot be empty");
1153     }
1154 
1155     limits::Check<limits::SClientSession>(client_session);
1156 
1157     m_Impl->m_ClientSession = client_session;
1158 
1159     m_Impl->UpdateAuthString();
1160 }
1161 
UpdateAuthString()1162 void SNetScheduleAPIImpl::UpdateAuthString()
1163 {
1164     m_Service->m_ServerPool->ResetServerConnections();
1165 
1166     GetListener()->SetAuthString(MakeAuthString());
1167 }
1168 
SetClientType(CNetScheduleAPI::EClientType client_type)1169 void CNetScheduleAPI::SetClientType(CNetScheduleAPI::EClientType client_type)
1170 {
1171     m_Impl->m_ClientType = client_type;
1172 
1173     m_Impl->UpdateAuthString();
1174 }
1175 
UseOldStyleAuth()1176 void SNetScheduleAPIImpl::UseOldStyleAuth()
1177 {
1178     m_Service->m_ServerPool->m_UseOldStyleAuth = true;
1179 
1180     UpdateAuthString();
1181 }
1182 
SetAuthParam(const string & param_name,const string & param_value)1183 void SNetScheduleAPIImpl::SetAuthParam(const string& param_name,
1184         const string& param_value)
1185 {
1186     if (!param_value.empty()) {
1187         string auth_param(' ' + param_name);
1188         auth_param += "=\"";
1189         auth_param += NStr::PrintableString(param_value);
1190         auth_param += '"';
1191         m_AuthParams[param_name] = auth_param;
1192     } else
1193         m_AuthParams.erase(param_name);
1194 
1195     UpdateAuthString();
1196 }
1197 
InitAffinities(CSynRegistry & registry,const SRegSynonyms & sections)1198 void SNetScheduleAPIImpl::InitAffinities(CSynRegistry& registry, const SRegSynonyms& sections)
1199 {
1200     const bool claim_new_affinities = registry.Get(sections, "claim_new_affinities", false);
1201     const bool process_any_job =      registry.Get(sections, "process_any_job", false);
1202     const string affinity_list =      registry.Get(sections, "affinity_list", kEmptyStr);
1203     const string affinity_ladder =    registry.Get(sections, "affinity_ladder", kEmptyStr);
1204 
1205     if (affinity_ladder.empty()) {
1206 
1207         if (claim_new_affinities) {
1208             m_AffinityPreference = CNetScheduleExecutor::eClaimNewPreferredAffs;
1209 
1210         } else if (process_any_job) {
1211             m_AffinityPreference = CNetScheduleExecutor::ePreferredAffsOrAnyJob;
1212 
1213         } else {
1214             m_AffinityPreference = CNetScheduleExecutor::ePreferredAffinities;
1215         }
1216 
1217         if (affinity_list.empty()) return;
1218 
1219         NStr::Split(affinity_list, ", ", m_AffinityList,
1220                 NStr::fSplit_MergeDelimiters | NStr::fSplit_Truncate);
1221 
1222         for (auto& affinity : m_AffinityList) {
1223             limits::Check<limits::SAffinity>(affinity);
1224         }
1225 
1226         return;
1227     }
1228 
1229     // Sanity checks
1230     if (claim_new_affinities) {
1231         NCBI_THROW(CConfigException, eInvalidParameter,
1232                 "'affinity_ladder' cannot be used with 'claim_new_affinities'");
1233     }
1234     if (!affinity_list.empty()) {
1235         NCBI_THROW(CConfigException, eInvalidParameter,
1236                 "'affinity_ladder' cannot be used with 'affinity_list'");
1237     }
1238 
1239     if (!process_any_job) {
1240         m_AffinityPreference = CNetScheduleExecutor::eExplicitAffinitiesOnly;
1241     }
1242 
1243     list<CTempString> affinities;
1244     NStr::Split(affinity_ladder, ", ", affinities,
1245             NStr::fSplit_MergeDelimiters | NStr::fSplit_Truncate);
1246 
1247     if (affinities.empty()) return;
1248 
1249     string affinity_step;
1250 
1251     for (auto& affinity : affinities) {
1252         limits::Check<limits::SAffinity>(affinity);
1253 
1254         if (!affinity_step.empty()) affinity_step += ',';
1255         affinity_step += affinity;
1256         m_AffinityLadder.emplace_back(affinity, affinity_step);
1257     }
1258 }
1259 
1260 ///////////////////////////////////////////////////////////////////////////////
1261 
1262 /// @internal
1263 class CNetScheduleAPICF : public IClassFactory<SNetScheduleAPIImpl>
1264 {
1265 public:
1266 
1267     typedef SNetScheduleAPIImpl TDriver;
1268     typedef SNetScheduleAPIImpl IFace;
1269     typedef IFace TInterface;
1270     typedef IClassFactory<SNetScheduleAPIImpl> TParent;
1271     typedef TParent::SDriverInfo TDriverInfo;
1272     typedef TParent::TDriverList TDriverList;
1273 
1274     /// Construction
1275     ///
1276     /// @param driver_name
1277     ///   Driver name string
1278     /// @param patch_level
1279     ///   Patch level implemented by the driver.
1280     ///   By default corresponds to interface patch level.
CNetScheduleAPICF(const string & driver_name=kNetScheduleAPIDriverName,int patch_level=-1)1281     CNetScheduleAPICF(const string& driver_name = kNetScheduleAPIDriverName,
1282                       int patch_level = -1)
1283         : m_DriverVersionInfo
1284         (ncbi::CInterfaceVersion<IFace>::eMajor,
1285          ncbi::CInterfaceVersion<IFace>::eMinor,
1286          patch_level >= 0 ?
1287             patch_level : ncbi::CInterfaceVersion<IFace>::ePatchLevel),
1288           m_DriverName(driver_name)
1289     {
1290         _ASSERT(!m_DriverName.empty());
1291     }
1292 
1293     /// Create instance of TDriver
1294     virtual TInterface*
CreateInstance(const string & driver=kEmptyStr,CVersionInfo version=NCBI_INTERFACE_VERSION (IFace),const TPluginManagerParamTree * params=0) const1295     CreateInstance(const string& driver  = kEmptyStr,
1296                    CVersionInfo version = NCBI_INTERFACE_VERSION(IFace),
1297                    const TPluginManagerParamTree* params = 0) const
1298     {
1299         if (params && (driver.empty() || driver == m_DriverName) &&
1300                 version.Match(NCBI_INTERFACE_VERSION(IFace)) !=
1301                     CVersionInfo::eNonCompatible) {
1302             CConfig config(params);
1303             return new SNetScheduleAPIImpl(&config, m_DriverName);
1304         }
1305         return NULL;
1306     }
1307 
GetDriverVersions(TDriverList & info_list) const1308     void GetDriverVersions(TDriverList& info_list) const
1309     {
1310         info_list.push_back(TDriverInfo(m_DriverName, m_DriverVersionInfo));
1311     }
1312 
1313 protected:
1314     CVersionInfo  m_DriverVersionInfo;
1315     string        m_DriverName;
1316 };
1317 
1318 
NCBI_EntryPoint_xnetscheduleapi(CPluginManager<SNetScheduleAPIImpl>::TDriverInfoList & info_list,CPluginManager<SNetScheduleAPIImpl>::EEntryPointRequest method)1319 void NCBI_XCONNECT_EXPORT NCBI_EntryPoint_xnetscheduleapi(
1320      CPluginManager<SNetScheduleAPIImpl>::TDriverInfoList&   info_list,
1321      CPluginManager<SNetScheduleAPIImpl>::EEntryPointRequest method)
1322 {
1323        CHostEntryPointImpl<CNetScheduleAPICF>::
1324            NCBI_EntryPointImpl(info_list, method);
1325 
1326 }
1327 
1328 
AddToClientNode(const string & data)1329 void CNetScheduleAPIExt::AddToClientNode(const string& data)
1330 {
1331     string& client_node(m_Impl->m_ClientNode);
1332     client_node += ':';
1333     client_node += data;
1334     UpdateAuthString();
1335 }
1336 
UpdateAuthString()1337 void CNetScheduleAPIExt::UpdateAuthString()
1338 {
1339     m_Impl->UpdateAuthString();
1340 }
1341 
UseOldStyleAuth()1342 void CNetScheduleAPIExt::UseOldStyleAuth()
1343 {
1344     m_Impl->UseOldStyleAuth();
1345 }
1346 
GetCompoundIDPool()1347 CCompoundIDPool CNetScheduleAPIExt::GetCompoundIDPool()
1348 {
1349     return m_Impl->m_CompoundIDPool;
1350 }
1351 
GetServer(CNetServer::TInstance server)1352 CNetScheduleAPI CNetScheduleAPIExt::GetServer(CNetServer::TInstance server)
1353 {
1354     return new SNetScheduleAPIImpl(server->m_ServerInPool, m_Impl);
1355 }
1356 
ReSetClientNode(const string & client_node)1357 void CNetScheduleAPIExt::ReSetClientNode(const string& client_node)
1358 {
1359     limits::Check<limits::SClientNode>(client_node);
1360     m_Impl->m_ClientNode = client_node;
1361     m_Impl->UpdateAuthString();
1362 }
1363 
ReSetClientSession(const string & client_session)1364 void CNetScheduleAPIExt::ReSetClientSession(const string& client_session)
1365 {
1366     limits::Check<limits::SClientSession>(client_session);
1367     m_Impl->m_ClientSession = client_session;
1368     m_Impl->UpdateAuthString();
1369 }
1370 
1371 CNetScheduleAPI::TInstance
CreateWnCompat(const string & service_name,const string & client_name)1372 CNetScheduleAPIExt::CreateWnCompat(const string& service_name,
1373         const string& client_name)
1374 {
1375     return new SNetScheduleAPIImpl(nullptr, kEmptyStr, service_name, client_name, kEmptyStr, true, false);
1376 }
1377 
1378 CNetScheduleAPI::TInstance
CreateNoCfgLoad(const string & service_name,const string & client_name,const string & queue_name)1379 CNetScheduleAPIExt::CreateNoCfgLoad(const string& service_name,
1380         const string& client_name, const string& queue_name)
1381 {
1382     return new SNetScheduleAPIImpl(nullptr, kEmptyStr, service_name, client_name, queue_name, false, false);
1383 }
1384 
1385 
1386 END_NCBI_SCOPE
1387