1 /* $Id: netschedule_api_getjob.cpp 607687 2020-05-06 16:16:59Z sadyrovr $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Rafael Sadyrov
27 *
28 * File Description:
29 * NetSchedule API get/read job implementation.
30 *
31 */
32
33 #include <ncbi_pch.hpp>
34
35 #include "grid_worker_impl.hpp"
36 #include "netschedule_api_impl.hpp"
37 #include "netservice_api_impl.hpp"
38
39 #include "netschedule_api_getjob.hpp"
40
41 BEGIN_NCBI_SCOPE
42
43
44 template <class TImpl> class CAnyAffinityJob;
45 template <class TImpl> class CMostAffinityJob;
46
47 template <class TImpl>
GetJob(const CDeadline & deadline,CNetScheduleJob & job,CNetScheduleAPI::EJobStatus * job_status,bool any_affinity)48 CNetScheduleGetJob::EResult CNetScheduleGetJobImpl<TImpl>::GetJob(
49 const CDeadline& deadline,
50 CNetScheduleJob& job,
51 CNetScheduleAPI::EJobStatus* job_status,
52 bool any_affinity)
53 {
54 if (any_affinity) {
55 CAnyAffinityJob<TImpl> holder(job, job_status, m_ImmediateActions);
56 return GetJobImpl(deadline, holder);
57 } else {
58 ReturnNotFullyCheckedServers();
59 CMostAffinityJob<TImpl> holder(job, job_status, m_ImmediateActions, m_Impl);
60 return GetJobImpl(deadline, holder);
61 }
62 }
63
64 typedef list<SSocketAddress> TServers;
65 typedef list<CNetScheduleGetJob::SEntry> TTimeline;
66 typedef TTimeline::iterator TIterator;
67
68 template <class TImpl>
69 class CAnyAffinityJob
70 {
71 public:
72 CNetScheduleJob& job;
73 CNetScheduleAPI::EJobStatus* job_status;
74
CAnyAffinityJob(CNetScheduleJob & j,CNetScheduleAPI::EJobStatus * js,TTimeline & timeline)75 CAnyAffinityJob(CNetScheduleJob& j, CNetScheduleAPI::EJobStatus* js,
76 TTimeline& timeline) :
77 job(j), job_status(js), m_Timeline(timeline)
78 {}
79
Interrupt()80 void Interrupt() {}
Begin()81 TIterator Begin() { return m_Timeline.begin(); }
Next(bool)82 TIterator Next(bool) { return m_Timeline.begin(); }
Affinity() const83 const string& Affinity() const { return kEmptyStr; }
Done()84 bool Done() { return true; }
HasJob() const85 bool HasJob() const { return false; }
86
87 private:
88 TTimeline& m_Timeline;
89 };
90
// Job holder used when the job with the most preferred affinity is wanted.
//
// Priorities come from m_GetJobImpl.m_API->m_AffinityLadder: index 0 is the
// most preferred. Each ladder entry pairs an affinity (.first) with an
// affinity list (.second) that is sent to servers. Judging by Affinity(),
// .second at index k lists the affinities with priority k or better, and
// back().second lists all of them.
//
// The holder keeps the best job found so far and keeps querying servers for
// better ones; a superseded job is returned to its server.
// m_JobPriority is the ladder index of the held job:
//   numeric_limits<size_t>::max() -- no job held (see HasJob()),
//   m_AffinityLadder.size()       -- held job's affinity is not in the ladder,
//   0                             -- best possible job; Done() returns true.
//
// m_Iterator marks the last timeline entry known to have given a job.
// Entries from the front up to it are servers ordered (most-to-least) by
// the affinity of the job they returned last. m_Timeline.end() stands for
// "before the front".
template <class TImpl>
class CMostAffinityJob
{
public:
    // Holds the most recently obtained job.
    CNetScheduleJob& job;
    CNetScheduleAPI::EJobStatus* job_status;

    CMostAffinityJob(CNetScheduleJob& j, CNetScheduleAPI::EJobStatus* js,
            TTimeline& timeline, TImpl& get_job_impl) :
        job(j), job_status(js), m_JobPriority(numeric_limits<size_t>::max()),
        m_Timeline(timeline), m_Iterator(timeline.end()),
        m_GetJobImpl(get_job_impl)
    {
        // Affinity() and Done() index the ladder, so it must not be empty
        _ASSERT(m_GetJobImpl.m_API->m_AffinityLadder.size());
    }

    // Called when the search is stopped: gives the held job (if any)
    // back to its server and clears it, so the caller does not receive it.
    void Interrupt()
    {
        if (HasJob()) {
            m_GetJobImpl.ReturnJob(job);
            job.Reset();
        }
    }

    // Starts a new pass from the front of the timeline, forgetting
    // which entries have given jobs.
    TIterator Begin()
    {
        m_Iterator = m_Timeline.end();
        return m_Timeline.begin();
    }

    // Returns the entry to query next, i.e. the one after m_Iterator.
    // increment == true: the entry just queried (at m_Iterator + 1) gave a
    // job and stayed in place, so m_Iterator moves onto it.
    // increment == false: that entry has already left its position (spliced
    // or erased), so m_Iterator + 1 is already the next entry.
    TIterator Next(bool increment)
    {
        if (increment) {
            if (m_Iterator == m_Timeline.end()) {
                m_Iterator = m_Timeline.begin();
            } else {
                ++m_Iterator;
            }

            // We've already got a job from an entry at m_Iterator + 1
            // (that is why increment is true), so must not happen
            _ASSERT(m_Iterator != m_Timeline.end());

        } else if (m_Iterator == m_Timeline.end()) {
            return m_Timeline.begin();
        }

        TIterator ret = m_Iterator;
        return ++ret;
    }

    // Affinity list to request from the next server: only affinities
    // better than the held job's, or all of them when no job is held.
    const string& Affinity() const
    {
        // Must not happen, since otherwise Done() has returned true already
        _ASSERT(m_JobPriority);

        CNetScheduleGetJob::TAffinityLadder&
            affinity_ladder(m_GetJobImpl.m_API->m_AffinityLadder);

        if (HasJob()) {
            // Only affinities that are higher than current job's one
            return affinity_ladder[m_JobPriority - 1].second;
        } else {
            // All affinities
            return affinity_ladder.back().second;
        }
    }

    // Accounts for a job that has just been stored in 'job'.
    // Returns the previously held job (if any) to its server and records the
    // new job's priority. Returns true when no better job is possible
    // (priority 0), false when the search should go on.
    bool Done()
    {
        // Must not happen, since otherwise Done() has returned true already
        _ASSERT(m_JobPriority);

        // Return a less-priority job back. The new job was requested with
        // Affinity(), which lists only better affinities while a job is
        // held, so the previous one is returned without comparing.
        if (HasJob()) {
            m_GetJobImpl.ReturnJob(m_PreviousJob);
        }

        // Keep a copy: once a better job arrives, 'job' is overwritten
        // and this copy is what gets returned above
        m_PreviousJob = job;

        CNetScheduleGetJob::TAffinityLadder&
            affinity_ladder(m_GetJobImpl.m_API->m_AffinityLadder);

        // Search from the held job's level (or the ladder bottom when no
        // job / a non-ladder job is held) up to the top, index 0
        size_t priority = min(affinity_ladder.size(), m_JobPriority) - 1;

        do {
            if (job.affinity == affinity_ladder[priority].first) {
                m_JobPriority = priority;

                // Return true, if job has the highest priority (zero)
                return !m_JobPriority;
            }
        } while (priority-- > 0);

        // Whether affinities not from the ladder are allowed
        if (m_GetJobImpl.m_API->m_AffinityPreference ==
                CNetScheduleExecutor::eAnyJob) {
            // Make it the least-priority
            m_JobPriority = affinity_ladder.size();
        } else {
            // Should not happen
            // NOTE(review): here the new job stays in 'job' but HasJob()
            // becomes false, so Interrupt() will not return it to the
            // server -- confirm this is acceptable.
            ERR_POST("Got a job " << job.job_id <<
                    " with unexpected affinity " << job.affinity);
            m_JobPriority = numeric_limits<size_t>::max();
        }

        return false;
    }

    // Whether a job is currently held (see m_JobPriority).
    bool HasJob() const
    {
        return m_JobPriority < numeric_limits<size_t>::max();
    }

private:
    size_t m_JobPriority;
    TTimeline& m_Timeline;
    TIterator m_Iterator;
    TImpl& m_GetJobImpl;
    CNetScheduleJob m_PreviousJob;
};
212
213 template <class TImpl>
214 template <class TJobHolder>
GetJobImmediately(TJobHolder & holder)215 CNetScheduleGetJob::EResult CNetScheduleGetJobImpl<TImpl>::GetJobImmediately(TJobHolder& holder)
216 {
217 TIterator i = holder.Begin();
218
219 for (;;) {
220 EState state = m_Impl.CheckState();
221
222 if (state == eStopped) {
223 holder.Interrupt();
224 return eInterrupt;
225 }
226
227 if (state == eRestarted) {
228 Restart();
229 i = holder.Begin();
230 continue;
231 }
232
233 // We must check i here to let state be checked before leaving loop
234 if (i == m_ImmediateActions.end()) {
235 return holder.HasJob() ? eJob : eAgain;
236 }
237
238 if (*i == m_DiscoveryAction) {
239 NextDiscoveryIteration();
240 i = holder.Begin();
241 continue;
242 }
243
244 // Whether to move to the next entry
245 // (false means we are already at the next entry due to splice/erase)
246 bool increment = false;
247
248 try {
249 // Get prioritized affinity list and
250 // a flag whether any affinity job is appropriate
251 const string& prio_aff_list = holder.Affinity();
252 const bool any_affinity = !holder.HasJob();
253
254 if (m_Impl.CheckEntry(*i, prio_aff_list, any_affinity,
255 holder.job, holder.job_status)) {
256 if (i == m_ImmediateActions.begin()) {
257 increment = true;
258 } else {
259 // We have got a more prioritized job from this server.
260 // Move this server to the top of immediate actions,
261 // so we will have servers ordered (most-to-least)
262 // by affinities of the jobs they have returned last
263 m_ImmediateActions.splice(m_ImmediateActions.begin(),
264 m_ImmediateActions, i);
265 }
266
267 // A job has been returned; keep the server in
268 // immediate actions because there can be more
269 // jobs in the queue.
270 if (holder.Done()) {
271 return eJob;
272 }
273 } else {
274 // No job has been returned by this server;
275 // query the server later.
276 i->deadline = CDeadline(m_Impl.m_Timeout, 0);
277 i->all_affinities_checked = any_affinity;
278 m_ScheduledActions.splice(m_ScheduledActions.end(),
279 m_ImmediateActions, i);
280 }
281 }
282 catch (CNetSrvConnException& e) {
283 // Because a connection error has occurred, do not
284 // put this server back to the timeline.
285 m_ImmediateActions.erase(i);
286 ERR_POST(Warning << e.GetMsg());
287 }
288 catch (...) {
289 m_ImmediateActions.erase(i);
290
291 if (holder.HasJob()) {
292 return eJob;
293 }
294
295 throw;
296 }
297
298 // Check all servers that have timeout expired
299 while (!m_ScheduledActions.empty() &&
300 m_ScheduledActions.front().deadline.GetRemainingTime().IsZero()) {
301 m_ImmediateActions.splice(m_ImmediateActions.end(),
302 m_ScheduledActions, m_ScheduledActions.begin());
303 }
304
305 // Check if there's a notification in the UDP socket.
306 while (CNetServer server = m_Impl.ReadNotifications()) {
307 MoveToImmediateActions(server);
308 }
309
310 i = holder.Next(increment);
311 }
312 }
313
314 template <class TImpl>
315 template <class TJobHolder>
GetJobImpl(const CDeadline & deadline,TJobHolder & holder)316 CNetScheduleGetJob::EResult CNetScheduleGetJobImpl<TImpl>::GetJobImpl(
317 const CDeadline& deadline, TJobHolder& holder)
318 {
319 for (;;) {
320 EResult ret = GetJobImmediately(holder);
321
322 if (ret != eAgain) {
323 return ret;
324 }
325
326 auto entry_has_more_jobs = [&](const SEntry& entry) {
327 return m_Impl.MoreJobs(entry);
328 };
329
330 // If MoreJobs() returned false for all entries of m_ScheduledActions
331 if (find_if(m_ScheduledActions.begin(), m_ScheduledActions.end(),
332 entry_has_more_jobs) == m_ScheduledActions.end()) {
333 return eNoJobs;
334 }
335
336 if (deadline.IsExpired())
337 return eAgain;
338
339 // At least, the discovery action must be there
340 _ASSERT(!m_ScheduledActions.empty());
341
342 // There's still time. Wait for notifications and query the servers.
343 CDeadline next_event_time = m_ScheduledActions.front().deadline;
344 bool last_wait = deadline < next_event_time;
345 if (last_wait) next_event_time = deadline;
346
347 if (CNetServer server = m_Impl.WaitForNotifications(next_event_time)) {
348 do {
349 MoveToImmediateActions(server);
350 } while ((server = m_Impl.ReadNotifications()));
351 } else if (last_wait) {
352 return eAgain;
353 } else {
354 m_ImmediateActions.splice(m_ImmediateActions.end(),
355 m_ScheduledActions, m_ScheduledActions.begin());
356 }
357 }
358 }
359
Filter(TTimeline & timeline,TServers & servers)360 inline void Filter(TTimeline& timeline, TServers& servers)
361 {
362 TTimeline::iterator i = timeline.begin();
363
364 while (i != timeline.end()) {
365 const SSocketAddress& address(i->server_address);
366 TServers::iterator j = find(servers.begin(), servers.end(), address);
367
368 // If this server is still valid
369 if (j != servers.end()) {
370 servers.erase(j);
371 ++i;
372 } else {
373 timeline.erase(i++);
374 }
375 }
376 }
377
378 template <class TImpl>
Restart()379 void CNetScheduleGetJobImpl<TImpl>::Restart()
380 {
381 // Rediscover all servers
382 m_ImmediateActions.clear();
383 m_ScheduledActions.clear();
384 NextDiscoveryIteration();
385 }
386
387 template <class TImpl>
MoveToImmediateActions(SNetServerImpl * server_impl)388 void CNetScheduleGetJobImpl<TImpl>::MoveToImmediateActions(SNetServerImpl* server_impl)
389 {
390 SEntry entry(server_impl->m_ServerInPool->m_Address);
391
392 TTimeline::iterator i = find(m_ScheduledActions.begin(),
393 m_ScheduledActions.end(), entry);
394
395 // Server was postponed, move to immediate
396 if (i != m_ScheduledActions.end()) {
397 m_ImmediateActions.splice(m_ImmediateActions.end(),
398 m_ScheduledActions, i);
399 return;
400 }
401
402 TTimeline::iterator j = find(m_ImmediateActions.begin(),
403 m_ImmediateActions.end(), entry);
404
405 // It's new server, add to immediate
406 if (j == m_ImmediateActions.end()) {
407 m_ImmediateActions.push_back(entry);
408 }
409 }
410
411 template <class TImpl>
NextDiscoveryIteration()412 void CNetScheduleGetJobImpl<TImpl>::NextDiscoveryIteration()
413 {
414 TServers servers;
415
416 for (CNetServiceIterator it =
417 m_Impl.m_API.GetService().Iterate(
418 CNetService::eIncludePenalized); it; ++it) {
419 servers.push_back((*it)->m_ServerInPool->m_Address);
420 }
421
422 // Keep up to date servers
423 Filter(m_ImmediateActions, servers);
424 Filter(m_ScheduledActions, servers);
425
426 // Add newly discovered servers
427 for (TServers::const_iterator i = servers.begin();
428 i != servers.end(); ++i) {
429 m_ImmediateActions.push_back(*i);
430 }
431
432 // Reschedule discovery after timeout
433 m_DiscoveryAction.deadline = CDeadline(m_Impl.m_Timeout, 0);
434 m_ScheduledActions.push_back(m_DiscoveryAction);
435 }
436
437 template <class TImpl>
ReturnNotFullyCheckedServers()438 void CNetScheduleGetJobImpl<TImpl>::ReturnNotFullyCheckedServers()
439 {
440 // Return back to immediate actions
441 // all servers that have not been checked for all possible affinities
442 TIterator i = m_ScheduledActions.begin();
443
444 while (i != m_ScheduledActions.end()) {
445 if (i->all_affinities_checked) {
446 ++i;
447 } else {
448 m_ImmediateActions.splice(m_ImmediateActions.end(),
449 m_ScheduledActions, i++);
450 }
451 }
452 }
453
454
455 template class CNetScheduleGetJobImpl<CMainLoopThread::CImpl>;
456 template class CNetScheduleGetJobImpl<SNetScheduleJobReaderImpl::CImpl>;
457
458
459 END_NCBI_SCOPE
460