1 // This file is part of BOINC.
2 // http://boinc.berkeley.edu
3 // Copyright (C) 2008 University of California
4 //
5 // BOINC is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Lesser General Public License
7 // as published by the Free Software Foundation,
8 // either version 3 of the License, or (at your option) any later version.
9 //
10 // BOINC is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
13 // See the GNU Lesser General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with BOINC. If not, see <http://www.gnu.org/licenses/>.
17
18 // CPU scheduling logic.
19 //
20 // - create an ordered "run list" (make_run_list()).
21 // The ordering is roughly as follows:
22 // - GPU jobs first, then CPU jobs
23 // - for a given resource, jobs in deadline danger first
24 // - jobs from projects with lower recent est. credit first
25 // In principle, the run list could include all runnable jobs.
26 // For efficiency, we stop adding:
27 // - GPU jobs: when all GPU instances used
28 // - CPU jobs: when the # of CPUs allocated to single-thread jobs,
29 // OR the # allocated to multi-thread jobs, exceeds # CPUs
30 // (ensure we have enough single-thread jobs
31 // in case we can't run the multi-thread jobs)
32 // NOTE: RAM usage is not taken into consideration
33 // in the process of building this list.
34 // It's possible that we include a bunch of jobs that can't run
35 // because of memory limits,
36 // even though there are other jobs that could run.
37 // - add running jobs to the list
38 // (in case they haven't finished time slice or checkpointed)
39 // - sort the list according to "more_important()"
40 // - shuffle the list to avoid starving multi-thread jobs
41 //
42 // - scan through the resulting list, running the jobs and preempting
43 // other jobs (enforce_run_list).
44 // Don't run a job if
45 // - its GPUs can't be assigned (possible if need >1 GPU)
46 // - it's a multi-thread job, and CPU usage would be #CPUs+1 or more
// - it's a single-thread job and running it would oversaturate the CPUs
48 // (details depend on whether a MT job is running)
49 // - its memory usage would exceed RAM limits
50 // If there's a running job using a given app version,
51 // unstarted jobs using that app version
52 // are assumed to have the same working set size.
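//
// To make the ordering concrete, here is a hedged illustration
// (hypothetical jobs and counts, not taken from a real host):
// on a 4-CPU host with one GPU, the sorted run list might look like
//   1. a GPU job in deadline danger (EDF)
//   2. a CPU job in deadline danger (EDF)
//   3. a GPU job chosen FIFO from the highest-priority project
//   4. a running CPU job that hasn't finished its time slice
//   5. a 2-CPU (multi-thread) job
//   6. single-thread CPU jobs, in the order they were added to the list
// enforce_run_list() then walks this list, skipping entries that don't
// fit in RAM or that would oversaturate the CPUs.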
53
54 #include "cpp.h"
55
56 #ifdef _WIN32
57 #include "boinc_win.h"
58 #include "sysmon_win.h"
59 #else
60 #include "config.h"
61 #include <string>
62 #include <cstring>
63 #include <list>
64 #endif
65
66
67 #include "coproc.h"
68 #include "error_numbers.h"
69 #include "filesys.h"
70 #include "str_util.h"
71 #include "util.h"
72
73 #include "app.h"
74 #include "app_config.h"
75 #include "client_msgs.h"
76 #include "client_state.h"
77 #include "coproc_sched.h"
78 #include "log_flags.h"
79 #include "project.h"
80 #include "result.h"
81
82
83 using std::vector;
84 using std::list;
85
86 static double rec_sum;
87
88 // used in make_run_list() to keep track of resources used
89 // by jobs tentatively scheduled so far
90 //
91 struct PROC_RESOURCES {
92 int ncpus;
93 double ncpus_used_st; // #CPUs of GPU or single-thread jobs
94 double ncpus_used_mt; // #CPUs of multi-thread jobs
95 COPROCS pr_coprocs;
96
void init() {
98 ncpus = gstate.ncpus;
99 ncpus_used_st = 0;
100 ncpus_used_mt = 0;
101 pr_coprocs.clone(coprocs, false);
102 pr_coprocs.clear_usage();
103 if (have_max_concurrent) {
104 max_concurrent_init();
105 }
106 }
107
108 // should we stop scanning jobs?
109 //
inline bool stop_scan_cpu() {
111 if (ncpus_used_st >= ncpus) return true;
112 if (ncpus_used_mt >= 2*ncpus) return true;
113 // kind of arbitrary, but need to have some limit
114 // in case there are only MT jobs, and lots of them
115 return false;
116 }
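// Example (hypothetical counts): on an 8-CPU host we stop adding jobs
// once single-thread jobs claim 8 CPUs, or once multi-thread jobs
// claim 16 CPUs; either condition ends the scan.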
117
inline bool stop_scan_coproc(int rsc_type) {
119 COPROC& cp = pr_coprocs.coprocs[rsc_type];
120 for (int i=0; i<cp.count; i++) {
121 if (cp.usage[i] < 1) return false;
122 }
123 return true;
124 }
125
126 // should we consider scheduling this job?
// (i.e. add it to the runnable list, not actually run it)
128 //
bool can_schedule(RESULT* rp, ACTIVE_TASK* atp) {
130 if (max_concurrent_exceeded(rp)) return false;
131 if (atp) {
132 // don't schedule if something's pending
133 //
134 switch (atp->task_state()) {
135 case PROCESS_ABORT_PENDING:
136 case PROCESS_QUIT_PENDING:
137 return false;
138 }
139 if (gstate.retry_shmem_time > gstate.now) {
140 if (atp->app_client_shm.shm == NULL) {
141 if (log_flags.cpu_sched_debug) {
142 msg_printf(rp->project, MSG_INFO,
143 "[cpu_sched_debug] waiting for shared mem: %s",
144 rp->name
145 );
146 }
147 atp->needs_shmem = true;
148 return false;
149 }
150 atp->needs_shmem = false;
151 }
152 }
153 if (rp->schedule_backoff > gstate.now) return false;
154 if (rp->uses_gpu()) {
155 if (gpu_suspend_reason) return false;
156 }
157 if (rp->uses_coprocs()) {
158 if (sufficient_coprocs(*rp)) {
159 return true;
160 } else {
161 return false;
162 }
163 } else if (rp->avp->avg_ncpus > 1) {
164 if (ncpus_used_mt == 0) return true;
165 return (ncpus_used_mt + rp->avp->avg_ncpus <= ncpus);
166 } else {
167 return (ncpus_used_st < ncpus);
168 }
169 }
170
171 // we've decided to add this to the runnable list; update bookkeeping
172 //
void schedule(RESULT* rp, ACTIVE_TASK* atp, bool is_edf) {
174 int rt = rp->avp->gpu_usage.rsc_type;
175
176 // see if it's possible this job will be ruled out
177 // when we try to actually run it
// (e.g. it won't fit in RAM, or it uses a GPU type with exclusions).
// If so, don't reserve CPU/GPU for it, to avoid a starvation scenario
180 //
181 bool may_not_run = false;
182 if (atp && atp->too_large) {
183 may_not_run = true;
184 }
185 if (rt && rsc_work_fetch[rt].has_exclusions) {
186 may_not_run = true;
187 }
188
189 if (!may_not_run) {
190 if (rt) {
191 reserve_coprocs(*rp);
192 // don't increment CPU usage.
193 // This may seem odd; the reason is the following scenario:
194 // - this job uses lots of CPU (say, a whole one)
195 // - there's an uncheckpointed GPU job that uses little CPU
196 // - we end up running the uncheckpointed job
197 // - this causes all or part of a CPU to be idle
198 //
199 } else if (rp->avp->avg_ncpus > 1) {
200 ncpus_used_mt += rp->avp->avg_ncpus;
201 } else {
202 ncpus_used_st += rp->avp->avg_ncpus;
203 }
204 }
205 if (log_flags.cpu_sched_debug) {
206 msg_printf(rp->project, MSG_INFO,
207 "[cpu_sched_debug] add to run list: %s (%s, %s) (prio %f)",
208 rp->name,
209 rsc_name_long(rt),
210 is_edf?"EDF":"FIFO",
211 rp->project->sched_priority
212 );
213 }
214
215 adjust_rec_sched(rp);
216 max_concurrent_inc(rp);
217 }
218
bool sufficient_coprocs(RESULT& r) {
220 APP_VERSION& av = *r.avp;
221 int rt = av.gpu_usage.rsc_type;
222 if (!rt) return true;
223 double x = av.gpu_usage.usage;
224 COPROC& cp = pr_coprocs.coprocs[rt];
225 for (int i=0; i<cp.count; i++) {
226 if (gpu_excluded(r.app, cp, i)) continue;
227 double unused = 1 - cp.usage[i];
228 x -= unused;
229 if (x <= 0) return true;
230 }
231 if (log_flags.cpu_sched_debug) {
232 msg_printf(r.project, MSG_INFO,
233 "[cpu_sched_debug] insufficient %s for %s",
234 cp.type, r.name
235 );
236 }
237 return false;
238 }
239
void reserve_coprocs(RESULT& r) {
241 double x;
242 APP_VERSION& av = *r.avp;
243 int rt = av.gpu_usage.rsc_type;
244 COPROC& cp = pr_coprocs.coprocs[rt];
245 x = av.gpu_usage.usage;
246 for (int i=0; i<cp.count; i++) {
247 if (gpu_excluded(r.app, cp, i)) continue;
248 double unused = 1 - cp.usage[i];
249 if (unused >= x) {
250 cp.usage[i] += x;
251 break;
252 } else {
253 cp.usage[i] = 1;
254 x -= unused;
255 }
256 }
257 if (log_flags.cpu_sched_debug) {
258 msg_printf(r.project, MSG_INFO,
259 "[cpu_sched_debug] reserving %f of coproc %s",
260 av.gpu_usage.usage, cp.type
261 );
262 }
263 }
264 };
265
266 bool gpus_usable = true;
267
268 #ifndef SIM
269 // see whether there's been a change in coproc usability;
270 // if so set or clear "coproc_missing" flags and return true.
271 //
bool check_coprocs_usable() {
273 #ifdef _WIN32
274 unsigned int i;
275 bool new_usable = !is_remote_desktop();
276 if (gpus_usable) {
277 if (!new_usable) {
278 gpus_usable = false;
279 for (i=0; i<gstate.results.size(); i++) {
280 RESULT* rp = gstate.results[i];
281 if (rp->avp->gpu_usage.rsc_type) {
282 rp->coproc_missing = true;
283 }
284 }
285 msg_printf(NULL, MSG_INFO,
286 "Remote desktop in use; disabling GPU tasks"
287 );
288 return true;
289 }
290 } else {
291 if (new_usable) {
292 gpus_usable = true;
293 for (i=0; i<gstate.results.size(); i++) {
294 RESULT* rp = gstate.results[i];
295 if (rp->avp->gpu_usage.rsc_type) {
296 rp->coproc_missing = false;
297 }
298 }
299 msg_printf(NULL, MSG_INFO,
300 "Remote desktop not in use; enabling GPU tasks"
301 );
302 return true;
303 }
304 }
305 #endif
306 return false;
307 }
308 #endif
309
310 // return true if the task has finished its time slice
311 // and has checkpointed since the end of the time slice
312 // (called only for scheduled tasks)
313 //
static inline bool finished_time_slice(ACTIVE_TASK* atp) {
315 double time_slice_end = atp->run_interval_start_wall_time + gstate.global_prefs.cpu_scheduling_period();
316 bool running_beyond_sched_period = gstate.now > time_slice_end;
317 bool checkpointed = atp->checkpoint_wall_time > time_slice_end;
318 if (running_beyond_sched_period && !checkpointed) {
319 atp->overdue_checkpoint = true;
320 }
321 return (running_beyond_sched_period && checkpointed);
322 }
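// Worked example (hypothetical times): with a 3600-second scheduling
// period and run_interval_start_wall_time = T, a task examined at
// T + 4000 has run past its slice. If its last checkpoint was at
// T + 3700 (after the slice ended) the function returns true and the
// task may be preempted; if the last checkpoint was at T + 3500 it
// returns false and overdue_checkpoint is set instead.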
323
324 // Choose a "best" runnable CPU job for each project
325 //
326 // Values are returned in project->next_runnable_result
327 // (skip projects for which this is already non-NULL)
328 //
329 // Don't choose results with already_selected == true;
330 // mark chosen results as already_selected.
331 //
332 // The preference order:
333 // 1. results with active tasks that are running
334 // 2. results with active tasks that are preempted (but have a process)
335 // 3. results with active tasks that have no process
336 // 4. results with no active task
337 //
338 // TODO: this is called in a loop over NCPUs, which is silly.
339 // Should call it once, and have it make an ordered list per project.
340 //
void CLIENT_STATE::assign_results_to_projects() {
342 unsigned int i;
343 RESULT* rp;
344 PROJECT* project;
345
346 // scan results with an ACTIVE_TASK
347 //
348 for (i=0; i<active_tasks.active_tasks.size(); i++) {
349 ACTIVE_TASK *atp = active_tasks.active_tasks[i];
350 if (!atp->runnable()) continue;
351 rp = atp->result;
352 if (rp->already_selected) continue;
353 if (rp->uses_coprocs()) continue;
354 if (!rp->runnable()) continue;
355 project = rp->project;
356 if (!project->next_runnable_result) {
357 project->next_runnable_result = rp;
358 continue;
359 }
360
361 // see if this task is "better" than the one currently
362 // selected for this project
363 //
364 ACTIVE_TASK *next_atp = lookup_active_task_by_result(
365 project->next_runnable_result
366 );
367
368 if ((next_atp->task_state() == PROCESS_UNINITIALIZED && atp->process_exists())
369 || (next_atp->scheduler_state == CPU_SCHED_PREEMPTED
370 && atp->scheduler_state == CPU_SCHED_SCHEDULED)
371 ) {
372 project->next_runnable_result = atp->result;
373 }
374 }
375
376 // Now consider results that don't have an active task
377 //
378 for (i=0; i<results.size(); i++) {
379 rp = results[i];
380 if (rp->already_selected) continue;
381 if (rp->uses_coprocs()) continue;
382 if (lookup_active_task_by_result(rp)) continue;
383 if (!rp->runnable()) continue;
384
385 project = rp->project;
386 if (project->next_runnable_result) continue;
387 project->next_runnable_result = rp;
388 }
389
390 // mark selected results, so CPU scheduler won't try to consider
391 // a result more than once
392 //
393 for (i=0; i<projects.size(); i++) {
394 project = projects[i];
395 if (project->next_runnable_result) {
396 project->next_runnable_result->already_selected = true;
397 }
398 }
399 }
400
401 // Among projects with a "next runnable result",
402 // find the project P with the largest priority,
403 // and return its next runnable result
404 //
RESULT* CLIENT_STATE::highest_prio_project_best_result() {
406 PROJECT *best_project = NULL;
407 double best_prio = 0;
408 bool first = true;
409 unsigned int i;
410
411 for (i=0; i<projects.size(); i++) {
412 PROJECT* p = projects[i];
413 if (!p->next_runnable_result) continue;
414 if (p->non_cpu_intensive) continue;
415 if (first || p->sched_priority > best_prio) {
416 first = false;
417 best_project = p;
418 best_prio = p->sched_priority;
419 }
420 }
421 if (!best_project) return NULL;
422
423 RESULT* rp = best_project->next_runnable_result;
424 best_project->next_runnable_result = 0;
425 return rp;
426 }
427
428 // Return a job of the given type according to the following criteria
429 // (desc priority):
430 // - from project with higher priority
431 // - already-started job
432 // - earlier received_time
433 // - lexicographically earlier name
434 //
435 // Give priority to already-started jobs because of the following scenario:
436 // - client gets several jobs in a sched reply and starts downloading files
437 // - a later job finishes downloading and starts
// - an earlier job finishes downloading and preempts it
439 //
RESULT* first_coproc_result(int rsc_type) {
441 unsigned int i;
442 RESULT* best = NULL;
443 double best_prio=0, prio;
444 for (i=0; i<gstate.results.size(); i++) {
445 RESULT* rp = gstate.results[i];
446 if (rp->resource_type() != rsc_type) continue;
447 if (!rp->runnable()) {
448 //msg_printf(rp->project, MSG_INFO, "not runnable: %s", rp->name);
449 continue;
450 }
451 if (rp->non_cpu_intensive()) continue;
452 if (rp->already_selected) continue;
453 prio = rp->project->sched_priority;
454 if (!best) {
455 best = rp;
456 best_prio = prio;
457 continue;
458 }
459
460 if (prio < best_prio) {
461 continue;
462 }
463 if (prio > best_prio) {
464 best = rp;
465 best_prio = prio;
466 continue;
467 }
468
469 bool bs = !best->not_started;
470 bool rs = !rp->not_started;
471 if (rs && !bs) {
472 best = rp;
473 best_prio = prio;
474 continue;
475 }
476 if (!rs && bs) {
477 continue;
478 }
479
// otherwise use "arrived first" order
481 //
482 if (rp->index < best->index) {
483 best = rp;
484 best_prio = prio;
485 }
486 }
487 return best;
488 }
489
490 // Return earliest-deadline result for given resource type;
491 // return only results projected to miss their deadline,
492 // or from projects with extreme DCF
493 //
static RESULT* earliest_deadline_result(int rsc_type) {
495 RESULT *best_result = NULL;
496 ACTIVE_TASK* best_atp = NULL;
497 unsigned int i;
498
499 for (i=0; i<gstate.results.size(); i++) {
500 RESULT* rp = gstate.results[i];
501 if (rp->resource_type() != rsc_type) continue;
502 if (rp->already_selected) continue;
503 if (!rp->runnable()) continue;
504 if (rp->non_cpu_intensive()) continue;
505 PROJECT* p = rp->project;
506
507 // Skip this job if the project's deadline-miss count is zero.
508 // If the project's DCF is > 90 (and we're not ignoring it)
509 // treat all jobs as deadline misses
510 //
511 if (p->dont_use_dcf || p->duration_correction_factor < 90.0) {
512 if (p->rsc_pwf[rsc_type].deadlines_missed_copy <= 0) {
513 continue;
514 }
515 }
516
517 bool new_best = false;
518 if (best_result) {
519 if (rp->report_deadline < best_result->report_deadline) {
520 new_best = true;
521 }
522 } else {
523 new_best = true;
524 }
525 if (new_best) {
526 best_result = rp;
527 best_atp = gstate.lookup_active_task_by_result(rp);
528 continue;
529 }
530 if (rp->report_deadline > best_result->report_deadline) {
531 continue;
532 }
533
534 // If there's a tie, pick the job with the least remaining time
535 // (but don't pick an unstarted job over one that's started)
536 //
537 ACTIVE_TASK* atp = gstate.lookup_active_task_by_result(rp);
538 if (best_atp && !atp) continue;
539 if (rp->estimated_runtime_remaining() < best_result->estimated_runtime_remaining()
540 || (!best_atp && atp)
541 ) {
542 best_result = rp;
543 best_atp = atp;
544 }
545 }
546 if (!best_result) return NULL;
547
548 return best_result;
549 }
550
void CLIENT_STATE::reset_rec_accounting() {
552 unsigned int i;
553 for (i=0; i<projects.size(); i++) {
554 PROJECT* p = projects[i];
555 for (int j=0; j<coprocs.n_rsc; j++) {
556 p->rsc_pwf[j].reset_rec_accounting();
557 }
558 }
559 for (int j=0; j<coprocs.n_rsc; j++) {
560 rsc_work_fetch[j].reset_rec_accounting();
561 }
562 rec_interval_start = now;
563 }
564
565 // update per-project accounting:
566 // - recent estimated credit (EC)
567 // - total CPU and GPU EC
568 // - total CPU and GPU time
569 //
static void update_rec() {
571 double f = gstate.host_info.p_fpops;
572 double on_frac = gstate.global_prefs.cpu_usage_limit / 100;
573
574 for (unsigned int i=0; i<gstate.projects.size(); i++) {
575 PROJECT* p = gstate.projects[i];
576
577 double x = 0;
578 for (int j=0; j<coprocs.n_rsc; j++) {
579 double dt = p->rsc_pwf[j].secs_this_rec_interval * on_frac;
580 double flops = dt * f * rsc_work_fetch[j].relative_speed;
581 x += flops;
582 if (j) {
583 p->gpu_ec += flops*COBBLESTONE_SCALE;
584 p->gpu_time += dt;
585 } else {
586 p->cpu_ec += flops*COBBLESTONE_SCALE;
587 p->cpu_time += dt;
588 }
589 }
590 x *= COBBLESTONE_SCALE;
591 double old = p->pwf.rec;
592
593 // start averages at zero
594 //
595 if (p->pwf.rec_time == 0) {
596 p->pwf.rec_time = gstate.rec_interval_start;
597 }
598
599 update_average(
600 gstate.now,
601 gstate.rec_interval_start,
602 x,
603 cc_config.rec_half_life,
604 p->pwf.rec,
605 p->pwf.rec_time
606 );
607
608 if (log_flags.priority_debug) {
609 double dt = gstate.now - gstate.rec_interval_start;
610 msg_printf(p, MSG_INFO,
611 "[prio] recent est credit: %.2fG in %.2f sec, %f + %f ->%f",
612 x, dt, old, p->pwf.rec-old, p->pwf.rec
613 );
614 }
615 }
616 }
617
static double peak_flops(APP_VERSION* avp) {
619 double f = gstate.host_info.p_fpops;
620 double x = f * avp->avg_ncpus;
621 int rt = avp->gpu_usage.rsc_type;
622 if (rt) {
623 x += f * avp->gpu_usage.usage * rsc_work_fetch[rt].relative_speed;
624 }
625 return x;
626 }
627
double total_peak_flops() {
629 static bool first=true;
630 static double tpf;
631 if (first) {
632 first = false;
633 tpf = gstate.host_info.p_fpops * gstate.ncpus;
634 for (int i=1; i<coprocs.n_rsc; i++) {
635 COPROC& cp = coprocs.coprocs[i];
636 tpf += rsc_work_fetch[i].relative_speed * gstate.host_info.p_fpops * cp.count;
637 }
638 }
639 return tpf;
640 }
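// Example (hypothetical host): with 4 CPUs at p_fpops = 5 GFLOPS and one
// GPU whose relative_speed is 20,
//   total_peak_flops() = 4*5G + 1*20*5G = 120 GFLOPS
// and an app version using 1 CPU + 1 GPU has
//   peak_flops() = 1*5G + 1*20*5G = 105 GFLOPS,
// i.e. 87.5% of the host total.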
641
642 // Initialize project "priorities" based on REC:
643 // compute resource share and REC fractions
644 // among compute-intensive, non-suspended projects
645 //
void project_priority_init(bool for_work_fetch) {
647 double rs_sum = 0;
648 rec_sum = 0;
649 for (unsigned int i=0; i<gstate.projects.size(); i++) {
650 PROJECT* p = gstate.projects[i];
651 if (p->non_cpu_intensive) continue;
652 if (for_work_fetch) {
653 if (!p->can_request_work()) continue;
654 } else {
655 if (!p->runnable(RSC_TYPE_ANY)) continue;
656 }
657 p->pwf.rec_temp = p->pwf.rec;
658 rs_sum += p->resource_share;
659 rec_sum += p->pwf.rec_temp;
660 }
661 if (rec_sum == 0) {
662 rec_sum = 1;
663 }
664 for (unsigned int i=0; i<gstate.projects.size(); i++) {
665 PROJECT* p = gstate.projects[i];
666 if (p->non_cpu_intensive || p->suspended_via_gui || rs_sum==0) {
667 p->resource_share_frac = 0;
668 p->sched_priority = 0;
669 } else {
670 p->resource_share_frac = p->resource_share/rs_sum;
671 p->compute_sched_priority();
672 if (log_flags.priority_debug) {
673 msg_printf(p, MSG_INFO, "[prio] %f rsf %f rt %f rs %f",
674 p->sched_priority, p->resource_share_frac,
675 p->pwf.rec_temp, rec_sum
676 );
677 }
678 }
679 }
680 }
681
void PROJECT::compute_sched_priority() {
683 double rec_frac = pwf.rec_temp/rec_sum;
684
685 // projects with zero resource share are always lower priority
686 // than those with positive resource share
687 //
688 if (resource_share == 0) {
689 sched_priority = -1e3 - rec_frac;
690 } else {
691 sched_priority = - rec_frac/resource_share_frac;
692 }
693 }
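// Worked example (hypothetical projects): if project A has
// resource_share_frac = 0.25 and project B has 0.75, and each has
// rec_frac = 0.5, then
//   A: sched_priority = -0.5/0.25 = -2.0
//   B: sched_priority = -0.5/0.75 = -0.67
// so B, which has received less recent credit relative to its resource
// share, is scheduled first.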
694
695 // called from the scheduler's job-selection loop;
696 // we plan to run this job;
697 // bump the project's temp REC by the estimated credit for 1 hour.
// This encourages a mixture of jobs from different projects.
699 //
void adjust_rec_sched(RESULT* rp) {
701 PROJECT* p = rp->project;
702 p->pwf.rec_temp += peak_flops(rp->avp)/total_peak_flops() * rec_sum/24;
703 p->compute_sched_priority();
704 }
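// For instance (hypothetical numbers), if the job's app version accounts
// for half the host's peak FLOPS, the project's temp REC is bumped by
// rec_sum/48. The division by 24 treats rec_sum as (roughly) a per-day
// rate, so the bump approximates an hour's worth of credit, letting
// other projects' jobs interleave.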
705
706 // make this a variable so simulator can change it
707 //
708 double rec_adjust_period = REC_ADJUST_PERIOD;
709
710 // adjust project REC
711 //
void CLIENT_STATE::adjust_rec() {
713 unsigned int i;
714 double elapsed_time = now - rec_interval_start;
715
716 // If the elapsed time is negative or more than 2*REC_ADJUST_PERIOD
717 // it must be because either
718 // - the system clock was changed.
719 // - the host was suspended for a long time.
720 // In either case, ignore the last period
721 //
722 if (elapsed_time > 2*rec_adjust_period || elapsed_time < 0) {
723 if (log_flags.priority_debug) {
724 msg_printf(NULL, MSG_INFO,
725 "[priority] adjust_rec: elapsed time (%.0f) negative or longer than sched enforce period(%.0f). Ignoring this period.",
726 elapsed_time, rec_adjust_period
727 );
728 }
729 reset_rec_accounting();
730 return;
731 }
732
733 // skip small intervals
734 //
735 if (elapsed_time < 1) {
736 return;
737 }
738
739 // total up how many instance-seconds projects got
740 //
741 for (i=0; i<active_tasks.active_tasks.size(); i++) {
742 ACTIVE_TASK* atp = active_tasks.active_tasks[i];
743 if (atp->scheduler_state != CPU_SCHED_SCHEDULED) continue;
744 PROJECT* p = atp->result->project;
745 if (p->non_cpu_intensive) continue;
746 work_fetch.accumulate_inst_sec(atp, elapsed_time);
747 }
748
749 update_rec();
750
751 reset_rec_accounting();
752 }
753
754
755 // Possibly do job scheduling.
756 // This is called periodically.
757 //
bool CLIENT_STATE::schedule_cpus() {
759 double elapsed_time;
760 static double last_reschedule=0;
761 vector<RESULT*> run_list;
762
763 if (projects.size() == 0) return false;
764 if (results.size() == 0) return false;
765
766 // Reschedule every CPU_SCHED_PERIOD seconds,
767 // or if must_schedule_cpus is set
768 // (meaning a new result is available, or a CPU has been freed).
769 //
770 elapsed_time = now - last_reschedule;
771 if (gstate.clock_change || elapsed_time >= CPU_SCHED_PERIOD) {
772 request_schedule_cpus("periodic CPU scheduling");
773 }
774
775 if (!must_schedule_cpus) return false;
776 last_reschedule = now;
777 must_schedule_cpus = false;
778
779 // NOTE: there's an assumption that REC is adjusted at
780 // least as often as the CPU sched period (see client_state.h).
781 // If you remove the following, make changes accordingly
782 //
783 adjust_rec();
784
785 make_run_list(run_list);
786 return enforce_run_list(run_list);
787 }
788
789 // Mark a job J as a deadline miss if either
790 // - it once ran in EDF, and its project has another job
791 // of the same resource type marked as deadline miss.
792 // This avoids domino-effect preemption
793 // - it was recently marked as a deadline miss by RR sim.
794 // This avoids "thrashing" if a job oscillates between miss and not miss.
795 //
static void promote_once_ran_edf() {
797 for (unsigned int i=0; i<gstate.active_tasks.active_tasks.size(); i++) {
798 ACTIVE_TASK* atp = gstate.active_tasks.active_tasks[i];
799 if (atp->result->rr_sim_misses_deadline) continue;
800 if (atp->once_ran_edf) {
801 RESULT* rp = atp->result;
802 PROJECT* p = rp->project;
803 if (p->deadlines_missed(rp->avp->rsc_type())) {
804 if (log_flags.cpu_sched_debug) {
805 msg_printf(p, MSG_INFO,
806 "[cpu_sched_debug] domino prevention: mark %s as deadline miss",
807 rp->name
808 );
809 }
810 rp->rr_sim_misses_deadline = true;
811 continue;
812 }
813 }
814 if (gstate.now - atp->last_deadline_miss_time < gstate.global_prefs.cpu_scheduling_period()) {
815 if (log_flags.cpu_sched_debug) {
816 RESULT* rp = atp->result;
817 PROJECT* p = rp->project;
818 msg_printf(p, MSG_INFO,
819 "[cpu_sched_debug] thrashing prevention: mark %s as deadline miss",
820 rp->name
821 );
822 }
823 atp->result->rr_sim_misses_deadline = true;
824 }
825 }
826 }
827
void add_coproc_jobs(
    vector<RESULT*>& run_list, int rsc_type, PROC_RESOURCES& proc_rsc
) {
831 ACTIVE_TASK* atp;
832 RESULT* rp;
833
834 #ifdef SIM
835 if (!cpu_sched_rr_only) {
836 #endif
837 // choose coproc jobs from projects with coproc deadline misses
838 //
839 while (!proc_rsc.stop_scan_coproc(rsc_type)) {
840 rp = earliest_deadline_result(rsc_type);
841 if (!rp) break;
842 rp->already_selected = true;
843 atp = gstate.lookup_active_task_by_result(rp);
844 if (!proc_rsc.can_schedule(rp, atp)) continue;
845 proc_rsc.schedule(rp, atp, true);
846 rp->project->rsc_pwf[rsc_type].deadlines_missed_copy--;
847 rp->edf_scheduled = true;
848 run_list.push_back(rp);
849 }
850 #ifdef SIM
851 }
852 #endif
853
854 // then coproc jobs in FIFO order
855 //
856 while (!proc_rsc.stop_scan_coproc(rsc_type)) {
857 rp = first_coproc_result(rsc_type);
858 if (!rp) break;
859 rp->already_selected = true;
860 atp = gstate.lookup_active_task_by_result(rp);
861 if (!proc_rsc.can_schedule(rp, atp)) continue;
862 proc_rsc.schedule(rp, atp, false);
863 run_list.push_back(rp);
864 }
865 }
866
867 // Make an ordered list of jobs to run.
868 //
void CLIENT_STATE::make_run_list(vector<RESULT*>& run_list) {
870 RESULT* rp;
871 PROJECT* p;
872 unsigned int i;
873 PROC_RESOURCES proc_rsc;
874 ACTIVE_TASK* atp;
875
876 if (log_flags.cpu_sched_debug) {
877 msg_printf(0, MSG_INFO, "[cpu_sched_debug] schedule_cpus(): start");
878 }
879
880 proc_rsc.init();
881
882 // do round-robin simulation to find what results miss deadline
883 //
884 rr_simulation();
885 if (log_flags.rr_simulation) {
886 print_deadline_misses();
887 }
888
889 // avoid preemption of jobs that once ran EDF
890 //
891 promote_once_ran_edf();
892
893 // set temporary variables
894 //
895 project_priority_init(false);
896 for (i=0; i<results.size(); i++) {
897 rp = results[i];
898 rp->already_selected = false;
899 rp->edf_scheduled = false;
900 rp->not_started = !rp->computing_done();
901 }
902 for (i=0; i<projects.size(); i++) {
903 p = projects[i];
904 p->next_runnable_result = NULL;
905 for (int j=0; j<coprocs.n_rsc; j++) {
906 p->rsc_pwf[j].deadlines_missed_copy = p->rsc_pwf[j].deadlines_missed;
907 }
908 }
909
910 // compute max working set size for app versions
911 // (max of working sets of currently running jobs)
912 //
913 for (i=0; i<app_versions.size(); i++) {
914 app_versions[i]->max_working_set_size = 0;
915 }
916 for (i=0; i<active_tasks.active_tasks.size(); i++) {
917 atp = active_tasks.active_tasks[i];
918 atp->too_large = false;
919 double w = atp->procinfo.working_set_size_smoothed;
920 APP_VERSION* avp = atp->app_version;
921 if (w > avp->max_working_set_size) {
922 avp->max_working_set_size = w;
923 }
924 atp->result->not_started = false;
925 }
926
927 // first, add GPU jobs
928
929 for (int j=1; j<coprocs.n_rsc; j++) {
930 add_coproc_jobs(run_list, j, proc_rsc);
931 }
932
933 // then add CPU jobs.
934 // Note: the jobs that actually get run are not necessarily
935 // an initial segment of this list;
936 // e.g. a multithread job may not get run because it has
937 // a high-priority single-thread job ahead of it.
938
939 // choose CPU jobs from projects with CPU deadline misses
940 //
941 #ifdef SIM
942 if (!cpu_sched_rr_only) {
943 #endif
944 while (!proc_rsc.stop_scan_cpu()) {
945 rp = earliest_deadline_result(RSC_TYPE_CPU);
946 if (!rp) break;
947 rp->already_selected = true;
948 atp = lookup_active_task_by_result(rp);
949 if (!proc_rsc.can_schedule(rp, atp)) continue;
950 proc_rsc.schedule(rp, atp, true);
951 rp->project->rsc_pwf[0].deadlines_missed_copy--;
952 rp->edf_scheduled = true;
953 run_list.push_back(rp);
954 }
955 #ifdef SIM
956 }
957 #endif
958
959 // Next, choose CPU jobs from highest priority projects
960 //
961 while (!proc_rsc.stop_scan_cpu()) {
962 assign_results_to_projects();
963 rp = highest_prio_project_best_result();
964 if (!rp) break;
965 atp = lookup_active_task_by_result(rp);
966 if (!proc_rsc.can_schedule(rp, atp)) continue;
967 proc_rsc.schedule(rp, atp, false);
968 run_list.push_back(rp);
969 }
970
971 }
972
static inline bool in_run_list(vector<RESULT*>& run_list, ACTIVE_TASK* atp) {
974 for (unsigned int i=0; i<run_list.size(); i++) {
975 if (atp->result == run_list[i]) return true;
976 }
977 return false;
978 }
979
980 #if 0
981 // scan the runnable list, keeping track of CPU usage X.
982 // if find a MT job J, and X < ncpus, move J before all non-MT jobs
983 // But don't promote a MT job ahead of a job in EDF
984 //
985 // This is needed because there may always be a 1-CPU job
986 // in the middle of its time-slice, and MT jobs could starve.
987 //
988 static void promote_multi_thread_jobs(vector<RESULT*>& runnable_jobs) {
989 double cpus_used = 0;
990 vector<RESULT*>::iterator first_non_mt = runnable_jobs.end();
991 vector<RESULT*>::iterator cur = runnable_jobs.begin();
992 while(1) {
993 if (cur == runnable_jobs.end()) break;
994 if (cpus_used >= gstate.ncpus) break;
995 RESULT* rp = *cur;
996 if (rp->rr_sim_misses_deadline) break;
997 double nc = rp->avp->avg_ncpus;
998 if (nc > 1) {
999 if (first_non_mt != runnable_jobs.end()) {
1000 cur = runnable_jobs.erase(cur);
1001 runnable_jobs.insert(first_non_mt, rp);
1002 cpus_used = 0;
1003 first_non_mt = runnable_jobs.end();
1004 cur = runnable_jobs.begin();
1005 continue;
1006 }
1007 } else {
1008 if (first_non_mt == runnable_jobs.end()) {
1009 first_non_mt = cur;
1010 }
1011 }
1012 cpus_used += nc;
1013 cur++;
1014 }
1015 }
1016 #endif
1017
1018 // return true if r0 is more important to run than r1
1019 //
static inline bool more_important(RESULT* r0, RESULT* r1) {
1021 // favor jobs in danger of deadline miss
1022 //
1023 bool miss0 = r0->edf_scheduled;
1024 bool miss1 = r1->edf_scheduled;
1025 if (miss0 && !miss1) return true;
1026 if (!miss0 && miss1) return false;
1027
1028 // favor coproc jobs, so that e.g. if we're RAM-limited
1029 // we'll use the GPU instead of the CPU
1030 //
1031 bool cp0 = r0->uses_coprocs();
1032 bool cp1 = r1->uses_coprocs();
1033 if (cp0 && !cp1) return true;
1034 if (!cp0 && cp1) return false;
1035
1036 // favor jobs in the middle of time slice,
1037 // or that haven't checkpointed since start of time slice
1038 //
1039 bool unfin0 = r0->unfinished_time_slice;
1040 bool unfin1 = r1->unfinished_time_slice;
1041 if (unfin0 && !unfin1) return true;
1042 if (!unfin0 && unfin1) return false;
1043
1044 // for CPU jobs, favor jobs that use more CPUs
1045 //
1046 if (!cp0) {
1047 if (r0->avp->avg_ncpus > r1->avp->avg_ncpus) return true;
1048 if (r1->avp->avg_ncpus > r0->avp->avg_ncpus) return false;
1049 }
1050
1051 // favor jobs selected first by schedule_cpus()
1052 // (e.g., because their project has high sched priority)
1053 //
1054 if (r0->seqno < r1->seqno) return true;
1055 if (r0->seqno > r1->seqno) return false;
1056
1057 // tie breaker
1058 return (r0 < r1);
1059 }
1060
static void print_job_list(vector<RESULT*>& jobs) {
1062 for (unsigned int i=0; i<jobs.size(); i++) {
1063 RESULT* rp = jobs[i];
1064 msg_printf(rp->project, MSG_INFO,
1065 "[cpu_sched_debug] %d: %s (MD: %s; UTS: %s)",
1066 i, rp->name,
1067 rp->edf_scheduled?"yes":"no",
1068 rp->unfinished_time_slice?"yes":"no"
1069 );
1070 }
1071 }
1072
1073 // find running jobs that haven't finished their time slice.
1074 // Mark them as such, and add to list if not already there
1075 //
void CLIENT_STATE::append_unfinished_time_slice(vector<RESULT*> &run_list) {
1077 unsigned int i;
1078 int seqno = (int)run_list.size();
1079
1080 for (i=0; i<active_tasks.active_tasks.size(); i++) {
1081 ACTIVE_TASK* atp = active_tasks.active_tasks[i];
1082 atp->overdue_checkpoint = false;
1083 if (!atp->result->runnable()) continue;
1084 if (atp->result->uses_gpu() && gpu_suspend_reason) continue;
1085 if (atp->result->non_cpu_intensive()) continue;
1086 if (atp->scheduler_state != CPU_SCHED_SCHEDULED) continue;
1087 if (finished_time_slice(atp)) continue;
1088 atp->result->unfinished_time_slice = true;
1089 if (in_run_list(run_list, atp)) continue;
1090 run_list.push_back(atp->result);
1091 atp->result->seqno = seqno;
1092 }
1093 }
1094
1095 // Enforce the CPU schedule.
1096 // Inputs:
1097 // ordered_scheduled_results
1098 // List of tasks that should (ideally) run, set by schedule_cpus().
1099 // Most important tasks (e.g. early deadline) are first.
1100 // The set of tasks that actually run may be different:
1101 // - if a task hasn't checkpointed recently we avoid preempting it
1102 // - we don't run tasks that would exceed working-set limits
1103 // Details:
1104 // Initially, each task's scheduler_state is PREEMPTED or SCHEDULED
1105 // depending on whether or not it is running.
1106 // This function sets each task's next_scheduler_state,
1107 // and at the end it starts/resumes and preempts tasks
1108 // based on scheduler_state and next_scheduler_state.
1109 //
bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {
1111 unsigned int i;
1112 int retval;
1113 double ncpus_used=0;
1114 ACTIVE_TASK* atp;
1115
1116 bool action = false;
1117
1118 if (have_max_concurrent) max_concurrent_init();
1119
1120 #ifndef SIM
1121 // check whether GPUs are usable
1122 //
1123 if (check_coprocs_usable()) {
1124 request_schedule_cpus("GPU usability change");
1125 return true;
1126 }
1127 #endif
1128
1129 if (log_flags.cpu_sched_debug) {
1130 msg_printf(0, MSG_INFO, "[cpu_sched_debug] enforce_run_list(): start");
1131 msg_printf(0, MSG_INFO, "[cpu_sched_debug] preliminary job list:");
1132 print_job_list(run_list);
1133 }
1134
1135 // Set next_scheduler_state to PREEMPT for all tasks
1136 //
1137 for (i=0; i< active_tasks.active_tasks.size(); i++) {
1138 atp = active_tasks.active_tasks[i];
1139 atp->next_scheduler_state = CPU_SCHED_PREEMPTED;
1140 }
1141
1142 for (i=0; i<run_list.size(); i++) {
1143 RESULT* rp = run_list[i];
1144 rp->seqno = i;
1145 rp->unfinished_time_slice = false;
1146 }
1147
1148 // append running jobs not done with time slice to the to-run list
1149 //
1150 append_unfinished_time_slice(run_list);
1151
1152 // sort to-run list by decreasing importance
1153 //
1154 std::sort(
1155 run_list.begin(),
1156 run_list.end(),
1157 more_important
1158 );
1159
1160 #if 0
1161 promote_multi_thread_jobs(run_list);
1162 #endif
1163
1164 if (log_flags.cpu_sched_debug) {
1165 msg_printf(0, MSG_INFO, "[cpu_sched_debug] final job list:");
1166 print_job_list(run_list);
1167 }
1168
1169 double ram_left = available_ram();
1170 double swap_left = (global_prefs.vm_max_used_frac)*host_info.m_swap;
1171
1172 if (log_flags.mem_usage_debug) {
1173 msg_printf(0, MSG_INFO,
1174 "[mem_usage] enforce: available RAM %.2fMB swap %.2fMB",
1175 ram_left/MEGA, swap_left/MEGA
1176 );
1177 }
1178
1179 // schedule non-CPU-intensive tasks,
1180 // and look for backed-off GPU jobs
1181 //
1182 for (i=0; i<results.size(); i++) {
1183 RESULT* rp = results[i];
1184 if (rp->non_cpu_intensive() && rp->runnable()) {
1185 atp = get_task(rp);
1186 if (!atp) {
1187 msg_printf(rp->project, MSG_INTERNAL_ERROR,
1188 "Can't create task for %s", rp->name
1189 );
1190 continue;
1191 }
1192 atp->next_scheduler_state = CPU_SCHED_SCHEDULED;
1193
1194 // don't count RAM usage because it's used sporadically,
1195 // and doing so can starve other jobs
1196 //
1197 //ram_left -= atp->procinfo.working_set_size_smoothed;
1198 swap_left -= atp->procinfo.swap_size;
1199 }
1200 }
1201
1202 // assign coprocessors to coproc jobs,
1203 // and prune those that can't be assigned
1204 //
1205 assign_coprocs(run_list);
1206 bool scheduled_mt = false;
1207
1208 // prune jobs that don't fit in RAM or that exceed CPU usage limits.
1209 // Mark the rest as SCHEDULED
1210 //
1211 for (i=0; i<run_list.size(); i++) {
1212 RESULT* rp = run_list[i];
1213
1214 if (max_concurrent_exceeded(rp)) {
1215 if (log_flags.cpu_sched_debug) {
1216 msg_printf(rp->project, MSG_INFO,
1217 "[cpu_sched_debug] skipping %s; max concurrent limit %d reached",
1218 rp->name, rp->app->max_concurrent
1219 );
1220 }
1221 continue;
1222 }
1223
1224 atp = lookup_active_task_by_result(rp);
1225
1226 // if we're already using all the CPUs,
1227 // don't allow additional CPU jobs;
1228 // allow coproc jobs if the resulting CPU load is at most ncpus+1
1229 //
1230 if (ncpus_used >= ncpus) {
1231 if (rp->uses_coprocs()) {
1232 if (ncpus_used + rp->avp->avg_ncpus > ncpus+1) {
1233 if (log_flags.cpu_sched_debug) {
1234 msg_printf(rp->project, MSG_INFO,
1235 "[cpu_sched_debug] skipping GPU job %s; CPU committed",
1236 rp->name
1237 );
1238 }
1239 continue;
1240 }
1241 } else {
1242 if (log_flags.cpu_sched_debug) {
1243 msg_printf(rp->project, MSG_INFO,
1244 "[cpu_sched_debug] all CPUs used (%.2f >= %d), skipping %s",
1245 ncpus_used, ncpus,
1246 rp->name
1247 );
1248 }
1249 continue;
1250 }
1251 }
1252
1253 #if 0
1254 // Don't overcommit CPUs by > 1 if a MT job is scheduled.
1255 // Skip this check for coproc jobs.
1256 //
1257 if (!rp->uses_coprocs()
1258 && (scheduled_mt || (rp->avp->avg_ncpus > 1))
1259 && (ncpus_used + rp->avp->avg_ncpus > ncpus + 1)
1260 ) {
1261 if (log_flags.cpu_sched_debug) {
1262 msg_printf(rp->project, MSG_INFO,
1263 "[cpu_sched_debug] avoid MT overcommit: skipping %s",
1264 rp->name
1265 );
1266 }
1267 continue;
1268 }
1269 #endif
1270
1271 // skip jobs whose working set is too large to fit in available RAM
1272 //
1273 double wss = 0;
1274 if (atp) {
1275 atp->too_large = false;
1276 wss = atp->procinfo.working_set_size_smoothed;
1277 } else {
1278 wss = rp->avp->max_working_set_size;
1279 }
1280 if (wss == 0) {
1281 wss = rp->wup->rsc_memory_bound;
1282 }
1283 if (wss > ram_left) {
1284 if (atp) {
1285 atp->too_large = true;
1286 }
1287 if (log_flags.cpu_sched_debug || log_flags.mem_usage_debug) {
1288 msg_printf(rp->project, MSG_INFO,
1289 "[cpu_sched_debug] enforce: task %s can't run, too big %.2fMB > %.2fMB",
1290 rp->name, wss/MEGA, ram_left/MEGA
1291 );
1292 }
1293 continue;
1294 }
1295
1296 if (log_flags.cpu_sched_debug) {
1297 msg_printf(rp->project, MSG_INFO,
1298 "[cpu_sched_debug] scheduling %s%s",
1299 rp->name,
1300 rp->edf_scheduled?" (high priority)":""
1301 );
1302 }
1303
1304 // We've decided to run this job; create an ACTIVE_TASK if needed.
1305 //
1306 if (!atp) {
1307 atp = get_task(rp);
1308 }
1309 if (!atp) {
1310 msg_printf(rp->project, MSG_INTERNAL_ERROR,
1311 "Can't create task for %s", rp->name
1312 );
1313 continue;
1314 }
1315
1316 if (rp->avp->avg_ncpus > 1) {
1317 scheduled_mt = true;
1318 }
1319 ncpus_used += rp->avp->avg_ncpus;
1320 atp->next_scheduler_state = CPU_SCHED_SCHEDULED;
1321 ram_left -= wss;
1322 max_concurrent_inc(rp);
1323 }
1324
1325 if (log_flags.cpu_sched_debug && ncpus_used < ncpus) {
1326 msg_printf(0, MSG_INFO, "[cpu_sched_debug] using %.2f out of %d CPUs",
1327 ncpus_used, ncpus
1328 );
1329 if (ncpus_used < ncpus) {
1330 request_work_fetch("CPUs idle");
1331 }
1332 }
1333
1334 bool check_swap = (host_info.m_swap != 0);
// in case we couldn't measure swap on this host
1336
1337 // TODO: enforcement of swap space is broken right now
1338
1339 // preempt tasks as needed, and note whether there are any coproc jobs
1340 // in QUIT_PENDING state (in which case we won't start new coproc jobs)
1341 //
1342 bool coproc_quit_pending = false;
1343 for (i=0; i<active_tasks.active_tasks.size(); i++) {
1344 atp = active_tasks.active_tasks[i];
1345 #if 0
1346 if (log_flags.cpu_sched_debug) {
1347 msg_printf(atp->result->project, MSG_INFO,
1348 "[cpu_sched_debug] %s sched state %d next %d task state %d",
1349 atp->result->name, atp->scheduler_state,
1350 atp->next_scheduler_state, atp->task_state()
1351 );
1352 }
1353 #endif
1354 int preempt_type = REMOVE_MAYBE_SCHED;
1355 switch (atp->next_scheduler_state) {
1356 case CPU_SCHED_PREEMPTED:
1357 switch (atp->task_state()) {
1358 case PROCESS_EXECUTING:
1359 action = true;
1360 if (check_swap && swap_left < 0) {
1361 if (log_flags.mem_usage_debug) {
1362 msg_printf(atp->result->project, MSG_INFO,
1363 "[mem_usage] out of swap space, will preempt by quit"
1364 );
1365 }
1366 preempt_type = REMOVE_ALWAYS;
1367 }
1368 if (atp->too_large) {
1369 if (log_flags.mem_usage_debug) {
1370 msg_printf(atp->result->project, MSG_INFO,
1371 "[mem_usage] job using too much memory, will preempt by quit"
1372 );
1373 }
1374 preempt_type = REMOVE_ALWAYS;
1375 }
1376 atp->preempt(preempt_type);
1377 break;
1378 case PROCESS_SUSPENDED:
1379 // remove from memory GPU jobs that were suspended by CPU throttling
1380 // and are now unscheduled.
1381 //
1382 if (atp->result->uses_gpu()) {
1383 atp->preempt(REMOVE_ALWAYS);
1384 request_schedule_cpus("removed suspended GPU task");
1385 break;
1386 }
1387
1388 // Handle the case where user changes prefs from
1389 // "leave in memory" to "remove from memory";
1390 // need to quit suspended tasks.
1391 //
1392 if (atp->checkpoint_cpu_time && !global_prefs.leave_apps_in_memory) {
1393 atp->preempt(REMOVE_ALWAYS);
1394 }
1395 break;
1396 }
1397 atp->scheduler_state = CPU_SCHED_PREEMPTED;
1398 break;
1399 }
1400 if (atp->result->uses_coprocs() && atp->task_state() == PROCESS_QUIT_PENDING) {
1401 coproc_quit_pending = true;
1402 }
1403 }
1404
1405 bool coproc_start_deferred = false;
1406 for (i=0; i<active_tasks.active_tasks.size(); i++) {
1407 atp = active_tasks.active_tasks[i];
1408 if (atp->next_scheduler_state != CPU_SCHED_SCHEDULED) continue;
1409 int ts = atp->task_state();
1410 if (ts == PROCESS_UNINITIALIZED || ts == PROCESS_SUSPENDED) {
1411 // If there's a quit pending for a coproc job,
1412 // don't start new ones since they may bomb out
1413 // on memory allocation. Instead, trigger a retry
1414 //
1415 if (atp->result->uses_coprocs() && coproc_quit_pending) {
1416 coproc_start_deferred = true;
1417 continue;
1418 }
1419 action = true;
1420
1421 bool first_time;
1422 // GPU tasks can get suspended before they're ever run,
1423 // so the only safe way of telling whether this is the
1424 // first time the app is run is to check
1425 // whether the slot dir is empty
1426 //
1427 #ifdef SIM
1428 first_time = atp->scheduler_state == CPU_SCHED_UNINITIALIZED;
1429 #else
1430 first_time = is_dir_empty(atp->slot_dir);
1431 #endif
1432 retval = atp->resume_or_start(first_time);
1433 if ((retval == ERR_SHMGET) || (retval == ERR_SHMAT)) {
1434 // Assume no additional shared memory segs
1435 // will be available in the next 10 seconds
1436 // (run only tasks which are already attached to shared memory).
1437 //
1438 if (gstate.retry_shmem_time < gstate.now) {
1439 request_schedule_cpus("no more shared memory");
1440 }
1441 gstate.retry_shmem_time = gstate.now + 10.0;
1442 continue;
1443 }
1444 if (retval) {
1445 char err_msg[4096];
1446 sprintf(err_msg, "Couldn't start or resume: %d", retval);
1447 report_result_error(*(atp->result), err_msg);
1448 request_schedule_cpus("start failed");
1449 continue;
1450 }
1451 if (atp->result->rr_sim_misses_deadline) {
1452 atp->once_ran_edf = true;
1453 }
1454 atp->run_interval_start_wall_time = now;
1455 app_started = now;
1456 }
1457 if (log_flags.cpu_sched_status) {
1458 msg_printf(atp->result->project, MSG_INFO,
1459 "[css] running %s (%s)",
1460 atp->result->name, atp->result->resources
1461 );
1462 }
1463 atp->scheduler_state = CPU_SCHED_SCHEDULED;
1464 swap_left -= atp->procinfo.swap_size;
1465
1466 #ifndef SIM
1467 // if we've been in this loop for > 10 secs,
1468 // break out of it and arrange for another schedule()
1469 // so that we don't miss GUI RPCs, heartbeats etc.
1470 //
1471 if (dtime() - now > MAX_STARTUP_TIME) {
1472 if (log_flags.cpu_sched_debug) {
1473 msg_printf(0, MSG_INFO,
1474 "[cpu_sched_debug] app startup took %f secs", dtime() - now
1475 );
1476 }
1477 request_schedule_cpus("slow app startup");
1478 break;
1479 }
1480 #endif
1481
1482 }
1483 if (action) {
1484 set_client_state_dirty("enforce_cpu_schedule");
1485 }
1486 if (log_flags.cpu_sched_debug) {
1487 msg_printf(0, MSG_INFO, "[cpu_sched_debug] enforce_run_list: end");
1488 }
1489 if (coproc_start_deferred) {
1490 if (log_flags.cpu_sched_debug) {
1491 msg_printf(0, MSG_INFO,
1492 "[cpu_sched_debug] coproc quit pending, deferring start"
1493 );
1494 }
1495 request_schedule_cpus("coproc quit retry");
1496 }
1497 return action;
1498 }
1499
1500 // trigger CPU scheduling.
1501 // Called when a result is completed,
1502 // when new results become runnable,
1503 // or when the user performs a UI interaction
1504 // (e.g. suspending or resuming a project or result).
1505 //
void CLIENT_STATE::request_schedule_cpus(const char* where) {
1507 if (log_flags.cpu_sched_debug) {
1508 msg_printf(0, MSG_INFO, "[cpu_sched_debug] Request CPU reschedule: %s", where);
1509 }
1510 must_schedule_cpus = true;
1511 }
1512
1513 // Find the active task for a given result
1514 //
ACTIVE_TASK* CLIENT_STATE::lookup_active_task_by_result(RESULT* rep) {
1516 for (unsigned int i = 0; i < active_tasks.active_tasks.size(); i ++) {
1517 if (active_tasks.active_tasks[i]->result == rep) {
1518 return active_tasks.active_tasks[i];
1519 }
1520 }
1521 return NULL;
1522 }
1523
1524 // find total resource shares of all CPU-intensive projects
1525 //
double CLIENT_STATE::total_resource_share() {
1527 double x = 0;
1528 for (unsigned int i=0; i<projects.size(); i++) {
1529 if (!projects[i]->non_cpu_intensive ) {
1530 x += projects[i]->resource_share;
1531 }
1532 }
1533 return x;
1534 }
1535
1536 // same, but only runnable projects (can use CPU right now)
1537 //
double CLIENT_STATE::runnable_resource_share(int rsc_type) {
1539 double x = 0;
1540 for (unsigned int i=0; i<projects.size(); i++) {
1541 PROJECT* p = projects[i];
1542 if (p->non_cpu_intensive) continue;
1543 if (p->runnable(rsc_type)) {
1544 x += p->resource_share;
1545 }
1546 }
1547 return x;
1548 }
1549
1550 // same, but potentially runnable (could ask for work right now)
1551 //
double CLIENT_STATE::potentially_runnable_resource_share() {
1553 double x = 0;
1554 for (unsigned int i=0; i<projects.size(); i++) {
1555 PROJECT* p = projects[i];
1556 if (p->non_cpu_intensive) continue;
1557 if (p->potentially_runnable()) {
1558 x += p->resource_share;
1559 }
1560 }
1561 return x;
1562 }
1563
1564 // same, but nearly runnable (could be downloading work right now)
1565 //
double CLIENT_STATE::nearly_runnable_resource_share() {
1567 double x = 0;
1568 for (unsigned int i=0; i<projects.size(); i++) {
1569 PROJECT* p = projects[i];
1570 if (p->non_cpu_intensive) continue;
1571 if (p->nearly_runnable()) {
1572 x += p->resource_share;
1573 }
1574 }
1575 return x;
1576 }
1577
1578 // if there's not an active task for the result, make one
1579 //
ACTIVE_TASK* CLIENT_STATE::get_task(RESULT* rp) {
1581 ACTIVE_TASK *atp = lookup_active_task_by_result(rp);
1582 if (!atp) {
1583 atp = new ACTIVE_TASK;
1584 int retval = atp->get_free_slot(rp);
1585 if (retval) {
1586 delete atp;
1587 return NULL;
1588 }
1589 atp->init(rp);
1590 active_tasks.active_tasks.push_back(atp);
1591 }
1592 return atp;
1593 }
1594
1595 // called at startup (after get_host_info())
1596 // and when general prefs have been parsed.
1597 // NOTE: GSTATE.NCPUS MUST BE 1 OR MORE; WE DIVIDE BY IT IN A COUPLE OF PLACES
1598 //
void CLIENT_STATE::set_ncpus() {
1600 int ncpus_old = ncpus;
1601
1602 if (cc_config.ncpus>0) {
1603 ncpus = cc_config.ncpus;
1604 host_info.p_ncpus = ncpus;
1605 } else if (host_info.p_ncpus>0) {
1606 ncpus = host_info.p_ncpus;
1607 } else {
1608 ncpus = 1;
1609 }
1610
1611 if (global_prefs.max_ncpus_pct) {
1612 ncpus = (int)((ncpus * global_prefs.max_ncpus_pct)/100);
1613 if (ncpus == 0) ncpus = 1;
1614 }
1615
1616 if (initialized && ncpus != ncpus_old) {
1617 msg_printf(0, MSG_INFO,
1618 "Number of usable CPUs has changed from %d to %d.",
1619 ncpus_old, ncpus
1620 );
1621 request_schedule_cpus("Number of usable CPUs has changed");
1622 request_work_fetch("Number of usable CPUs has changed");
1623 work_fetch.init();
1624 }
1625 }
1626
1627 // The given result has just completed successfully.
1628 // Update the correction factor used to predict
1629 // completion time for this project's results
1630 //
void PROJECT::update_duration_correction_factor(ACTIVE_TASK* atp) {
1632 if (dont_use_dcf) return;
1633 RESULT* rp = atp->result;
1634 double raw_ratio = atp->elapsed_time/rp->estimated_runtime_uncorrected();
1635 double adj_ratio = atp->elapsed_time/rp->estimated_runtime();
1636 double old_dcf = duration_correction_factor;
1637
1638 // it's OK to overestimate completion time,
1639 // but bad to underestimate it.
1640 // So make it easy for the factor to increase,
1641 // but decrease it with caution
1642 //
1643 if (adj_ratio > 1.1) {
1644 duration_correction_factor = raw_ratio;
1645 } else {
1646 // in particular, don't give much weight to results
1647 // that completed a lot earlier than expected
1648 //
1649 if (adj_ratio < 0.1) {
1650 duration_correction_factor = duration_correction_factor*0.99 + 0.01*raw_ratio;
1651 } else {
1652 duration_correction_factor = duration_correction_factor*0.9 + 0.1*raw_ratio;
1653 }
1654 }
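// Worked example (hypothetical ratios): if adj_ratio = 1.5 (> 1.1),
// the DCF jumps straight to raw_ratio. If adj_ratio = 0.5, the new
// DCF is 0.9*old + 0.1*raw_ratio; and for adj_ratio < 0.1 (a result
// that finished far earlier than expected) the weight on raw_ratio
// drops to 0.01, so a single fast outlier barely moves the estimate.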
1655 // limit to [.01 .. 100]
1656 //
1657 if (duration_correction_factor > 100) duration_correction_factor = 100;
1658 if (duration_correction_factor < 0.01) duration_correction_factor = 0.01;
1659
1660 if (log_flags.dcf_debug) {
1661 msg_printf(this, MSG_INFO,
1662 "[dcf] DCF: %f->%f, raw_ratio %f, adj_ratio %f",
1663 old_dcf, duration_correction_factor, raw_ratio, adj_ratio
1664 );
1665 }
1666 }
1667