1 /*  $Id: netschedule_api_getjob.cpp 607687 2020-05-06 16:16:59Z sadyrovr $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Rafael Sadyrov
27  *
28  * File Description:
29  *   NetSchedule API get/read job implementation.
30  *
31  */
32 
33 #include <ncbi_pch.hpp>
34 
35 #include "grid_worker_impl.hpp"
36 #include "netschedule_api_impl.hpp"
37 #include "netservice_api_impl.hpp"
38 
39 #include "netschedule_api_getjob.hpp"
40 
41 BEGIN_NCBI_SCOPE
42 
43 
44 template <class TImpl> class CAnyAffinityJob;
45 template <class TImpl> class CMostAffinityJob;
46 
47 template <class TImpl>
GetJob(const CDeadline & deadline,CNetScheduleJob & job,CNetScheduleAPI::EJobStatus * job_status,bool any_affinity)48 CNetScheduleGetJob::EResult CNetScheduleGetJobImpl<TImpl>::GetJob(
49         const CDeadline& deadline,
50         CNetScheduleJob& job,
51         CNetScheduleAPI::EJobStatus* job_status,
52         bool any_affinity)
53 {
54     if (any_affinity) {
55         CAnyAffinityJob<TImpl> holder(job, job_status, m_ImmediateActions);
56         return GetJobImpl(deadline, holder);
57     } else {
58         ReturnNotFullyCheckedServers();
59         CMostAffinityJob<TImpl> holder(job, job_status, m_ImmediateActions, m_Impl);
60         return GetJobImpl(deadline, holder);
61     }
62 }
63 
64 typedef list<SSocketAddress> TServers;
65 typedef list<CNetScheduleGetJob::SEntry> TTimeline;
66 typedef TTimeline::iterator TIterator;
67 
68 template <class TImpl>
69 class CAnyAffinityJob
70 {
71 public:
72     CNetScheduleJob& job;
73     CNetScheduleAPI::EJobStatus* job_status;
74 
CAnyAffinityJob(CNetScheduleJob & j,CNetScheduleAPI::EJobStatus * js,TTimeline & timeline)75     CAnyAffinityJob(CNetScheduleJob& j, CNetScheduleAPI::EJobStatus* js,
76             TTimeline& timeline) :
77         job(j), job_status(js), m_Timeline(timeline)
78     {}
79 
Interrupt()80     void Interrupt()                {}
Begin()81     TIterator Begin()               { return m_Timeline.begin(); }
Next(bool)82     TIterator Next(bool)            { return m_Timeline.begin(); }
Affinity() const83     const string& Affinity() const  { return kEmptyStr; }
Done()84     bool Done()                     { return true; }
HasJob() const85     bool HasJob() const             { return false; }
86 
87 private:
88     TTimeline& m_Timeline;
89 };
90 
91 template <class TImpl>
92 class CMostAffinityJob
93 {
94 public:
95     CNetScheduleJob& job;
96     CNetScheduleAPI::EJobStatus* job_status;
97 
CMostAffinityJob(CNetScheduleJob & j,CNetScheduleAPI::EJobStatus * js,TTimeline & timeline,TImpl & get_job_impl)98     CMostAffinityJob(CNetScheduleJob& j, CNetScheduleAPI::EJobStatus* js,
99             TTimeline& timeline, TImpl& get_job_impl) :
100         job(j), job_status(js), m_JobPriority(numeric_limits<size_t>::max()),
101         m_Timeline(timeline), m_Iterator(timeline.end()),
102         m_GetJobImpl(get_job_impl)
103     {
104         _ASSERT(m_GetJobImpl.m_API->m_AffinityLadder.size());
105     }
106 
Interrupt()107     void Interrupt()
108     {
109         if (HasJob()) {
110             m_GetJobImpl.ReturnJob(job);
111             job.Reset();
112         }
113     }
114 
Begin()115     TIterator Begin()
116     {
117         m_Iterator = m_Timeline.end();
118         return m_Timeline.begin();
119     }
120 
Next(bool increment)121     TIterator Next(bool increment)
122     {
123         if (increment) {
124             if (m_Iterator == m_Timeline.end()) {
125                 m_Iterator = m_Timeline.begin();
126             } else {
127                 ++m_Iterator;
128             }
129 
130             // We've already got a job from an entry at m_Iterator + 1
131             // (that is why increment is true), so must not happen
132             _ASSERT(m_Iterator != m_Timeline.end());
133 
134         } else if (m_Iterator == m_Timeline.end()) {
135             return m_Timeline.begin();
136         }
137 
138         TIterator ret = m_Iterator;
139         return ++ret;
140     }
141 
Affinity() const142     const string& Affinity() const
143     {
144         // Must not happen, since otherwise Done() has returned true already
145         _ASSERT(m_JobPriority);
146 
147         CNetScheduleGetJob::TAffinityLadder&
148             affinity_ladder(m_GetJobImpl.m_API->m_AffinityLadder);
149 
150         if (HasJob()) {
151             // Only affinities that are higher that current job's one
152             return affinity_ladder[m_JobPriority - 1].second;
153         } else {
154             // All affinities
155             return affinity_ladder.back().second;
156         }
157     }
158 
Done()159     bool Done()
160     {
161         // Must not happen, since otherwise Done() has returned true already
162         _ASSERT(m_JobPriority);
163 
164         // Return a less-priority job back
165         if (HasJob()) {
166             m_GetJobImpl.ReturnJob(m_PreviousJob);
167         }
168 
169         m_PreviousJob = job;
170 
171         CNetScheduleGetJob::TAffinityLadder&
172             affinity_ladder(m_GetJobImpl.m_API->m_AffinityLadder);
173 
174         size_t priority = min(affinity_ladder.size(), m_JobPriority) - 1;
175 
176         do {
177             if (job.affinity == affinity_ladder[priority].first) {
178                 m_JobPriority = priority;
179 
180                 // Return true, if job has the highest priority (zero)
181                 return !m_JobPriority;
182             }
183         } while (priority-- > 0);
184 
185         // Whether affinities not from the ladder are allowed
186         if (m_GetJobImpl.m_API->m_AffinityPreference ==
187                 CNetScheduleExecutor::eAnyJob) {
188             // Make it the least-priority
189             m_JobPriority = affinity_ladder.size();
190         } else {
191             // Should not happen
192             ERR_POST("Got a job " << job.job_id <<
193                     " with unexpected affinity " << job.affinity);
194             m_JobPriority = numeric_limits<size_t>::max();
195         }
196 
197         return false;
198     }
199 
HasJob() const200     bool HasJob() const
201     {
202         return m_JobPriority < numeric_limits<size_t>::max();
203     }
204 
205 private:
206     size_t m_JobPriority;
207     TTimeline& m_Timeline;
208     TIterator m_Iterator;
209     TImpl& m_GetJobImpl;
210     CNetScheduleJob m_PreviousJob;
211 };
212 
// Make one non-blocking pass over the immediate actions, asking each server
// for a job via m_Impl.CheckEntry() with the affinity list and any-affinity
// flag supplied by the job holder.
//
// Returns:
//   eInterrupt - m_Impl.CheckState() reported eStopped (holder.Interrupt()
//                has been called first);
//   eJob       - holder.Done() accepted the job, or the immediate actions ran
//                out while the holder still holds a job;
//   eAgain     - the immediate actions ran out and no job is held.
// Exceptions other than CNetSrvConnException raised while checking a server
// are rethrown, unless a job is already held (then eJob is returned).
//
// Timeline side effects: a server that returned no job is moved to the end of
// the scheduled actions with a fresh deadline; a server that failed (with any
// exception) is dropped; expired scheduled actions and servers that sent a
// UDP notification are moved to the immediate actions.
template <class TImpl>
template <class TJobHolder>
CNetScheduleGetJob::EResult CNetScheduleGetJobImpl<TImpl>::GetJobImmediately(TJobHolder& holder)
{
    TIterator i = holder.Begin();

    for (;;) {
        EState state = m_Impl.CheckState();

        // Stop requested: give back any held job and leave.
        if (state == eStopped) {
            holder.Interrupt();
            return eInterrupt;
        }

        // Rebuild the timeline from scratch and start a new pass.
        if (state == eRestarted) {
            Restart();
            i = holder.Begin();
            continue;
        }

        // We must check i here to let state be checked before leaving loop
        if (i == m_ImmediateActions.end()) {
            return holder.HasJob() ? eJob : eAgain;
        }

        // Discovery is due: refresh the server list and start a new pass.
        if (*i == m_DiscoveryAction) {
            NextDiscoveryIteration();
            i = holder.Begin();
            continue;
        }

        // Whether to move to the next entry
        // (false means we are already at the next entry due to splice/erase)
        bool increment = false;

        try {
            // Get prioritized affinity list and
            // a flag whether any affinity job is appropriate
            const string& prio_aff_list = holder.Affinity();
            const bool any_affinity = !holder.HasJob();

            if (m_Impl.CheckEntry(*i, prio_aff_list, any_affinity,
                        holder.job, holder.job_status)) {
                if (i == m_ImmediateActions.begin()) {
                    increment = true;
                } else {
                    // We have got a more prioritized job from this server.
                    // Move this server to the top of immediate actions,
                    // so we will have servers ordered (most-to-least)
                    // by affinities of the jobs they have returned last
                    m_ImmediateActions.splice(m_ImmediateActions.begin(),
                            m_ImmediateActions, i);
                }

                // A job has been returned; keep the server in
                // immediate actions because there can be more
                // jobs in the queue.
                if (holder.Done()) {
                    return eJob;
                }
            } else {
                // No job has been returned by this server;
                // query the server later.
                // Remember whether it was asked for any affinity, so that
                // ReturnNotFullyCheckedServers() can re-query it otherwise.
                i->deadline = CDeadline(m_Impl.m_Timeout, 0);
                i->all_affinities_checked = any_affinity;
                m_ScheduledActions.splice(m_ScheduledActions.end(),
                        m_ImmediateActions, i);
            }
        }
        catch (CNetSrvConnException& e) {
            // Because a connection error has occurred, do not
            // put this server back to the timeline.
            m_ImmediateActions.erase(i);
            ERR_POST(Warning << e.GetMsg());
        }
        catch (...) {
            m_ImmediateActions.erase(i);

            // A job obtained earlier is still usable; otherwise propagate.
            if (holder.HasJob()) {
                return eJob;
            }

            throw;
        }

        // Check all servers that have timeout expired
        while (!m_ScheduledActions.empty() &&
                m_ScheduledActions.front().deadline.GetRemainingTime().IsZero()) {
            m_ImmediateActions.splice(m_ImmediateActions.end(),
                    m_ScheduledActions, m_ScheduledActions.begin());
        }

        // Check if there's a notification in the UDP socket.
        while (CNetServer server = m_Impl.ReadNotifications()) {
            MoveToImmediateActions(server);
        }

        i = holder.Next(increment);
    }
}
313 
314 template <class TImpl>
315 template <class TJobHolder>
GetJobImpl(const CDeadline & deadline,TJobHolder & holder)316 CNetScheduleGetJob::EResult CNetScheduleGetJobImpl<TImpl>::GetJobImpl(
317         const CDeadline& deadline, TJobHolder& holder)
318 {
319     for (;;) {
320         EResult ret = GetJobImmediately(holder);
321 
322         if (ret != eAgain) {
323             return ret;
324         }
325 
326         auto entry_has_more_jobs = [&](const SEntry& entry) {
327             return m_Impl.MoreJobs(entry);
328         };
329 
330         // If MoreJobs() returned false for all entries of m_ScheduledActions
331         if (find_if(m_ScheduledActions.begin(), m_ScheduledActions.end(),
332                     entry_has_more_jobs) == m_ScheduledActions.end()) {
333             return eNoJobs;
334         }
335 
336         if (deadline.IsExpired())
337             return eAgain;
338 
339         // At least, the discovery action must be there
340         _ASSERT(!m_ScheduledActions.empty());
341 
342         // There's still time. Wait for notifications and query the servers.
343         CDeadline next_event_time = m_ScheduledActions.front().deadline;
344         bool last_wait = deadline < next_event_time;
345         if (last_wait) next_event_time = deadline;
346 
347         if (CNetServer server = m_Impl.WaitForNotifications(next_event_time)) {
348             do {
349                 MoveToImmediateActions(server);
350             } while ((server = m_Impl.ReadNotifications()));
351         } else if (last_wait) {
352             return eAgain;
353         } else {
354             m_ImmediateActions.splice(m_ImmediateActions.end(),
355                     m_ScheduledActions, m_ScheduledActions.begin());
356         }
357     }
358 }
359 
Filter(TTimeline & timeline,TServers & servers)360 inline void Filter(TTimeline& timeline, TServers& servers)
361 {
362     TTimeline::iterator i = timeline.begin();
363 
364     while (i != timeline.end()) {
365         const SSocketAddress& address(i->server_address);
366         TServers::iterator j = find(servers.begin(), servers.end(), address);
367 
368         // If this server is still valid
369         if (j != servers.end()) {
370             servers.erase(j);
371             ++i;
372         } else {
373             timeline.erase(i++);
374         }
375     }
376 }
377 
378 template <class TImpl>
Restart()379 void CNetScheduleGetJobImpl<TImpl>::Restart()
380 {
381     // Rediscover all servers
382     m_ImmediateActions.clear();
383     m_ScheduledActions.clear();
384     NextDiscoveryIteration();
385 }
386 
387 template <class TImpl>
MoveToImmediateActions(SNetServerImpl * server_impl)388 void CNetScheduleGetJobImpl<TImpl>::MoveToImmediateActions(SNetServerImpl* server_impl)
389 {
390     SEntry entry(server_impl->m_ServerInPool->m_Address);
391 
392     TTimeline::iterator i = find(m_ScheduledActions.begin(),
393             m_ScheduledActions.end(), entry);
394 
395     // Server was postponed, move to immediate
396     if (i != m_ScheduledActions.end()) {
397         m_ImmediateActions.splice(m_ImmediateActions.end(),
398                 m_ScheduledActions, i);
399         return;
400     }
401 
402     TTimeline::iterator j = find(m_ImmediateActions.begin(),
403             m_ImmediateActions.end(), entry);
404 
405     // It's new server, add to immediate
406     if (j == m_ImmediateActions.end()) {
407         m_ImmediateActions.push_back(entry);
408     }
409 }
410 
411 template <class TImpl>
NextDiscoveryIteration()412 void CNetScheduleGetJobImpl<TImpl>::NextDiscoveryIteration()
413 {
414     TServers servers;
415 
416     for (CNetServiceIterator it =
417             m_Impl.m_API.GetService().Iterate(
418                 CNetService::eIncludePenalized); it; ++it) {
419         servers.push_back((*it)->m_ServerInPool->m_Address);
420     }
421 
422     // Keep up to date servers
423     Filter(m_ImmediateActions, servers);
424     Filter(m_ScheduledActions, servers);
425 
426     // Add newly discovered servers
427     for (TServers::const_iterator i = servers.begin();
428             i != servers.end(); ++i) {
429         m_ImmediateActions.push_back(*i);
430     }
431 
432     // Reschedule discovery after timeout
433     m_DiscoveryAction.deadline = CDeadline(m_Impl.m_Timeout, 0);
434     m_ScheduledActions.push_back(m_DiscoveryAction);
435 }
436 
437 template <class TImpl>
ReturnNotFullyCheckedServers()438 void CNetScheduleGetJobImpl<TImpl>::ReturnNotFullyCheckedServers()
439 {
440     // Return back to immediate actions
441     // all servers that have not been checked for all possible affinities
442     TIterator i = m_ScheduledActions.begin();
443 
444     while (i != m_ScheduledActions.end()) {
445         if (i->all_affinities_checked) {
446             ++i;
447         } else {
448             m_ImmediateActions.splice(m_ImmediateActions.end(),
449                     m_ScheduledActions, i++);
450         }
451     }
452 }
453 
454 
// Explicit instantiations: the member template definitions above live in this
// .cpp file, so the implementations that need them are instantiated here.
template class CNetScheduleGetJobImpl<CMainLoopThread::CImpl>;
template class CNetScheduleGetJobImpl<SNetScheduleJobReaderImpl::CImpl>;
457 
458 
459 END_NCBI_SCOPE
460