// This file is part of BOINC.
// http://boinc.berkeley.edu
// Copyright (C) 2014 University of California
//
// BOINC is free software; you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License
// as published by the Free Software Foundation,
// either version 3 of the License, or (at your option) any later version.
//
// BOINC is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with BOINC. If not, see <http://www.gnu.org/licenses/>.

#include "cpp.h"

#ifdef _WIN32
#include "boinc_win.h"
#else
#include "config.h"
#include <cmath>
#endif

#ifdef _MSC_VER
#define snprintf _snprintf
#endif

#include "util.h"
#include "str_replace.h"

#include "client_msgs.h"
#include "client_state.h"
#include "project.h"
#include "result.h"
#include "scheduler_op.h"

#include "work_fetch.h"

#if 0
#define WF_DEBUG(x) x
#else
#define WF_DEBUG(x)
#endif
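
// WF_DEBUG usage (see calls below):
//     WF_DEBUG(msg_printf(p, MSG_INFO, "scanning");)
// expands to the enclosed statement when the #if above is 1,
// and to nothing in normal builds.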

using std::vector;

RSC_WORK_FETCH rsc_work_fetch[MAX_RSC];
WORK_FETCH work_fetch;

// does the (NCI) project have a job that's running or uploading?
// (if so, don't request another job from that project)
//
static bool has_a_job_in_progress(PROJECT* p) {
    for (unsigned int j=0; j<gstate.results.size(); j++) {
        RESULT* rp = gstate.results[j];
        if (rp->project != p) continue;
        if (rp->state() < RESULT_FILES_UPLOADED) {
            return true;
        }
    }
    return false;
}

inline bool has_coproc_app(PROJECT* p, int rsc_type) {
    unsigned int i;
    for (i=0; i<gstate.app_versions.size(); i++) {
        APP_VERSION* avp = gstate.app_versions[i];
        if (avp->project != p) continue;
        if (avp->gpu_usage.rsc_type == rsc_type) return true;
    }
    return false;
}

/////////////// RSC_PROJECT_WORK_FETCH ///////////////

void RSC_PROJECT_WORK_FETCH::rr_init() {
    fetchable_share = 0;
    n_runnable_jobs = 0;
    sim_nused = 0;
    nused_total = 0;
    deadlines_missed = 0;
}

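// Exponential backoff for a (project, resource type) pair:
// the interval doubles on each failed fetch, starting at
// WF_MIN_BACKOFF_INTERVAL and capped at WF_MAX_BACKOFF_INTERVAL;
// the actual delay is the interval scaled by a random factor
// uniform in [0.5, 1.5).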
void RSC_PROJECT_WORK_FETCH::resource_backoff(PROJECT* p, const char* name) {
    if (backoff_interval) {
        backoff_interval *= 2;
        if (backoff_interval > WF_MAX_BACKOFF_INTERVAL) backoff_interval = WF_MAX_BACKOFF_INTERVAL;
    } else {
        backoff_interval = WF_MIN_BACKOFF_INTERVAL;
    }
    double x = (.5 + drand())*backoff_interval;
    backoff_time = gstate.now + x;
    if (log_flags.work_fetch_debug) {
        msg_printf(p, MSG_INFO,
            "[work_fetch] backing off %s %.0f sec", name, x
        );
    }
}

// check whether we should ask this project for work of this type.
// the backoff check must go last, so that if backoff is the reason
// we know there are no other reasons (needed for piggyback)
//
int RSC_PROJECT_WORK_FETCH::compute_rsc_project_reason(
    PROJECT *p, int rsc_type
) {
    RSC_WORK_FETCH& rwf = rsc_work_fetch[rsc_type];
    // see whether work fetch for this resource is banned
    // by prefs, config, project, or acct mgr
    //
    if (p->no_rsc_pref[rsc_type]) return DONT_FETCH_PREFS;
    if (p->no_rsc_config[rsc_type]) return DONT_FETCH_CONFIG;
    if (p->no_rsc_apps[rsc_type]) return DONT_FETCH_NO_APPS;
    if (p->no_rsc_ams[rsc_type]) return DONT_FETCH_AMS;
    if (p->rsc_pwf[rsc_type].has_deferred_job) return DONT_FETCH_DEFER_SCHED;

    // if project has zero resource share,
    // only fetch work if a device is idle
    //
    if (p->resource_share == 0 && rwf.nidle_now == 0) {
        return DONT_FETCH_ZERO_SHARE;
    }

    // if project has excluded GPUs of this type,
    // we need to avoid fetching work just because there's an idle instance
    // or a shortfall;
    // fetching work might not alleviate either of these,
    // and we'd end up fetching unbounded work.
    // At the same time, we want to respect work buf params if possible.
    //
    // Current policy:
    // don't fetch work if remaining time of this project's jobs
    // exceeds work_buf_min * (#usable instances / #instances)
    //
    // TODO: THIS IS FAIRLY CRUDE. Making it smarter would require
    // computing shortfall etc. on a per-project basis
    //
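    // Example: with 4 instances of which 1 is excluded for this project,
    // skip fetching if the project has at least 3 runnable jobs
    // and more than 3/4 of work_buf_min() queued.
    //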
    int nexcl = ncoprocs_excluded;
    if (rsc_type && nexcl) {
        int n_not_excluded = rwf.ninstances - nexcl;
        if (n_runnable_jobs >= n_not_excluded
            && queue_est > (gstate.work_buf_min() * n_not_excluded)/rwf.ninstances
        ) {
            return DONT_FETCH_BUFFER_FULL;
        }
    }

    if (anonymous_platform_no_apps) {
        return DONT_FETCH_NO_APPS;
    }

    // this must go last
    //
    if (backoff_time > gstate.now) {
        return DONT_FETCH_BACKED_OFF;
    }
    return 0;
}

/////////////// RSC_WORK_FETCH ///////////////

void RSC_WORK_FETCH::copy_request(COPROC& c) {
    c.req_secs = req_secs;
    c.req_instances = req_instances;
    c.estimated_delay = req_secs ? busy_time_estimator.get_busy_time() : 0;
}

RSC_PROJECT_WORK_FETCH& RSC_WORK_FETCH::project_state(PROJECT* p) {
    return p->rsc_pwf[rsc_type];
}

void RSC_WORK_FETCH::rr_init() {
    shortfall = 0;
    nidle_now = 0;
    sim_nused = 0;
    total_fetchable_share = 0;
    deadline_missed_instances = 0;
    saturated_time = 0;
    busy_time_estimator.reset();
    sim_used_instances = 0;
}

void RSC_WORK_FETCH::update_stats(double sim_now, double dt, double buf_end) {
    double idle = ninstances - sim_nused;
    if (idle > 1e-6 && sim_now < buf_end) {
        double dt2;
        if (sim_now + dt > buf_end) {
            dt2 = buf_end - sim_now;
        } else {
            dt2 = dt;
        }
        shortfall += idle*dt2;
    }
    if (idle < 1e-6) {
        saturated_time = sim_now + dt - gstate.now;
    }
}

void RSC_WORK_FETCH::update_busy_time(double dur, double nused) {
    busy_time_estimator.update(dur, nused);
}

static bool wacky_dcf(PROJECT* p) {
    if (p->dont_use_dcf) return false;
    double dcf = p->duration_correction_factor;
    // a DCF far from 1 in either direction means the project's
    // runtime estimates have proven wildly wrong; don't trust them
    return (dcf < 0.02 || dcf > 80.0);
}

// request this project's share of shortfall and instances.
// don't request anything if project is backed off.
//
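// Example of the arithmetic below: with 4 instances, fetchable_share 0.5,
// and 1.2 instance-equivalents of this project's jobs already counted,
// instance_share = 4*0.5 - 1.2 = 0.8, so we request
// max(nidle_now, 0.8) instances.
//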
void RSC_WORK_FETCH::set_request(PROJECT* p) {
    // if backup project, fetch 1 job per idle instance
    //
    if (p->resource_share == 0) {
        req_instances = nidle_now;
        req_secs = 1;
        return;
    }
    if (cc_config.fetch_minimal_work) {
        req_instances = ninstances;
        req_secs = 1;
        return;
    }
    RSC_PROJECT_WORK_FETCH& w = project_state(p);
    double non_excl_inst = ninstances - w.ncoprocs_excluded;
    if (shortfall) {
        if (wacky_dcf(p)) {
            // if project's DCF is too big or small,
            // its completion time estimates are useless; just ask for 1 second
            //
            req_secs = 1;
        } else {
            req_secs = shortfall;
            if (w.ncoprocs_excluded) {
                req_secs *= non_excl_inst/ninstances;
            }
        }
    }

    double instance_share = ninstances*w.fetchable_share;
    if (instance_share > non_excl_inst) {
        instance_share = non_excl_inst;
    }
    instance_share -= w.nused_total;
    req_instances = std::max(nidle_now, instance_share);

    if (log_flags.work_fetch_debug) {
        msg_printf(p, MSG_INFO,
            "[work_fetch] set_request() for %s: ninst %d nused_total %.2f nidle_now %.2f fetch share %.2f req_inst %.2f req_secs %.2f",
            rsc_name_long(rsc_type), ninstances, w.nused_total, nidle_now,
            w.fetchable_share, req_instances, req_secs
        );
    }
    if (req_instances && !req_secs) {
        req_secs = 1;
    }
}

// We're fetching work because some instances are starved due to exclusions.
// Find the number N of these instances that are not excluded for this project.
// Ask for N instances and N*work_buf_total() seconds.
//
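// Example: if starved instances 0 and 2 are usable by this project
// (inst_mask = 0b0101 below), then N = 2: request 2 instances
// and 2*work_buf_total() seconds.
//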
void RSC_WORK_FETCH::set_request_excluded(PROJECT* p) {
    RSC_PROJECT_WORK_FETCH& pwf = project_state(p);

    int inst_mask = sim_excluded_instances & pwf.non_excluded_instances;
    int n = 0;
    for (int i=0; i<ninstances; i++) {
        if ((1<<i) & inst_mask) {
            n++;
        }
    }
    WF_DEBUG(msg_printf(p, MSG_INFO, "set_request_excluded() %d %d %d", sim_excluded_instances, pwf.non_excluded_instances, n));
    req_instances = n;
    if (p->resource_share == 0 || cc_config.fetch_minimal_work) {
        req_secs = 1;
    } else {
        req_secs = n*gstate.work_buf_total();
    }
}

void RSC_WORK_FETCH::print_state(const char* name) {
    msg_printf(0, MSG_INFO, "[work_fetch] --- state for %s ---", name);
    msg_printf(0, MSG_INFO,
        "[work_fetch] shortfall %.2f nidle %.2f saturated %.2f busy %.2f",
        shortfall, nidle_now, saturated_time,
        busy_time_estimator.get_busy_time()
    );
    // msg_printf(0, MSG_INFO, "[work_fetch] sim used inst %d sim excl inst %d",
    //     sim_used_instances, sim_excluded_instances
    // );
    for (unsigned int i=0; i<gstate.projects.size(); i++) {
        char buf[256];
        PROJECT* p = gstate.projects[i];
        if (p->non_cpu_intensive) continue;
        RSC_PROJECT_WORK_FETCH& rpwf = project_state(p);
        double bt = rpwf.backoff_time>gstate.now ? rpwf.backoff_time-gstate.now : 0;
        if (bt) {
            snprintf(buf, sizeof(buf),
                " (resource backoff: %.2f, inc %.2f)",
                bt, rpwf.backoff_interval
            );
        } else {
            safe_strcpy(buf, "");
        }
        msg_printf(p, MSG_INFO,
            "[work_fetch] share %.3f %s %s",
            rpwf.fetchable_share,
            rsc_project_reason_string(rpwf.rsc_project_reason),
            buf
        );
    }
}

void RSC_WORK_FETCH::clear_request() {
    req_secs = 0;
    req_instances = 0;
}

/////////////// PROJECT_WORK_FETCH ///////////////

int PROJECT_WORK_FETCH::compute_project_reason(PROJECT* p) {
    if (p->non_cpu_intensive) return CANT_FETCH_WORK_NON_CPU_INTENSIVE;
    if (p->suspended_via_gui) return CANT_FETCH_WORK_SUSPENDED_VIA_GUI;
    if (p->master_url_fetch_pending) return CANT_FETCH_WORK_MASTER_URL_FETCH_PENDING;
    if (p->dont_request_more_work) return CANT_FETCH_WORK_DONT_REQUEST_MORE_WORK;
    if (p->some_download_stalled()) return CANT_FETCH_WORK_DOWNLOAD_STALLED;
    if (p->some_result_suspended()) return CANT_FETCH_WORK_RESULT_SUSPENDED;
    if (p->too_many_uploading_results) return CANT_FETCH_WORK_TOO_MANY_UPLOADS;

    // this goes last, so that if this is the reason we know
    // that there are no other reasons
    //
    if (p->min_rpc_time > gstate.now) return CANT_FETCH_WORK_MIN_RPC_TIME;
    return 0;
}

void PROJECT_WORK_FETCH::reset(PROJECT* p) {
    for (int i=0; i<coprocs.n_rsc; i++) {
        p->rsc_pwf[i].reset();
    }
}

void PROJECT_WORK_FETCH::rr_init(PROJECT* p) {
    project_reason = compute_project_reason(p);
    n_runnable_jobs = 0;
}

void PROJECT_WORK_FETCH::print_state(PROJECT* p) {
    char buf[1024], buf2[1024];
    if (project_reason) {
        snprintf(buf, sizeof(buf),
            "can't request work: %s",
            project_reason_string(p, buf2, sizeof(buf2))
        );
    } else {
        safe_strcpy(buf, "can request work");
    }
    if (p->min_rpc_time > gstate.now) {
        snprintf(buf2, sizeof(buf2),
            " (%.2f sec)",
            p->min_rpc_time - gstate.now
        );
        safe_strcat(buf, buf2);
    }
    msg_printf(p, MSG_INFO,
        "[work_fetch] REC %.3f prio %.3f %s",
        rec,
        p->sched_priority,
        buf
    );
}

/////////////// WORK_FETCH ///////////////

void WORK_FETCH::rr_init() {
    // compute PROJECT::RSC_PROJECT_WORK_FETCH::has_deferred_job
    //
    for (unsigned int i=0; i<gstate.projects.size(); i++) {
        PROJECT* p = gstate.projects[i];
        for (int j=0; j<coprocs.n_rsc; j++) {
            p->rsc_pwf[j].has_deferred_job = false;
        }
    }
    for (unsigned int i=0; i<gstate.results.size(); i++) {
        RESULT* rp = gstate.results[i];
        if (rp->schedule_backoff) {
            if (rp->schedule_backoff > gstate.now) {
                int rt = rp->avp->gpu_usage.rsc_type;
                rp->project->rsc_pwf[rt].has_deferred_job = true;
            } else {
                rp->schedule_backoff = 0;
                gstate.request_schedule_cpus("schedule backoff finished");
            }
        }
    }

    for (int i=0; i<coprocs.n_rsc; i++) {
        rsc_work_fetch[i].rr_init();
    }
    for (unsigned int i=0; i<gstate.projects.size(); i++) {
        PROJECT* p = gstate.projects[i];
        p->pwf.rr_init(p);
        for (int j=0; j<coprocs.n_rsc; j++) {
            p->rsc_pwf[j].rr_init();
        }
    }
}

// copy request fields from RSC_WORK_FETCH to COPROCS
//
void WORK_FETCH::copy_requests() {
    for (int i=0; i<coprocs.n_rsc; i++) {
        switch (coproc_type_name_to_num(coprocs.coprocs[i].type)) {
        case PROC_TYPE_NVIDIA_GPU:
            rsc_work_fetch[i].copy_request(coprocs.nvidia);
            break;
        case PROC_TYPE_AMD_GPU:
            rsc_work_fetch[i].copy_request(coprocs.ati);
            break;
        case PROC_TYPE_INTEL_GPU:
            rsc_work_fetch[i].copy_request(coprocs.intel_gpu);
            break;
        default:
            rsc_work_fetch[i].copy_request(coprocs.coprocs[i]);
            break;
        }
    }
}

void WORK_FETCH::print_state() {
    msg_printf(0, MSG_INFO, "[work_fetch] ------- start work fetch state -------");
    msg_printf(0, MSG_INFO, "[work_fetch] target work buffer: %.2f + %.2f sec",
        gstate.work_buf_min(), gstate.work_buf_additional()
    );
    msg_printf(0, MSG_INFO, "[work_fetch] --- project states ---");
    for (unsigned int i=0; i<gstate.projects.size(); i++) {
        PROJECT* p = gstate.projects[i];
        p->pwf.print_state(p);
    }
    for (int i=0; i<coprocs.n_rsc; i++) {
        rsc_work_fetch[i].print_state(rsc_name_long(i));
    }
    msg_printf(0, MSG_INFO, "[work_fetch] ------- end work fetch state -------");
}

void WORK_FETCH::clear_request() {
    for (int i=0; i<coprocs.n_rsc; i++) {
        rsc_work_fetch[i].clear_request();
    }
}

bool WORK_FETCH::requested_work() {
    for (int i=0; i<coprocs.n_rsc; i++) {
        if (rsc_work_fetch[i].req_secs) return true;
    }
    return false;
}

// We're going to contact this project for reasons other than work fetch
// (e.g., to report completed results, or at user request).
// Decide if we should "piggyback" a work fetch request.
//
void WORK_FETCH::piggyback_work_request(PROJECT* p) {
    WF_DEBUG(msg_printf(p, MSG_INFO, "piggyback_work_request()");)
    clear_request();
    if (cc_config.fetch_minimal_work && gstate.had_or_requested_work) return;
    if (p->non_cpu_intensive) {
        if (!has_a_job_in_progress(p) && !p->dont_request_more_work) {
            rsc_work_fetch[0].req_secs = 1;
        }
        return;
    }

    setup();

    switch (p->pwf.project_reason) {
    case 0:
    case CANT_FETCH_WORK_MIN_RPC_TIME:
        break;
    default:
        return;
    }

    // if project was updated from manager and config says so,
    // fetch work for a resource even if there are higher-prio projects
    // able to fetch it
    //
    bool check_higher_priority_projects = true;
    if (p->sched_rpc_pending && cc_config.fetch_on_update) {
        check_higher_priority_projects = false;
    }

    // For each resource, scan projects in decreasing priority,
    // looking for a higher-priority project that is able to fetch
    // work for the resource.
    // If there is none, and the resource needs topping off, do so.
    //
    for (int i=0; i<coprocs.n_rsc; i++) {
        WF_DEBUG(msg_printf(p, MSG_INFO, "piggyback: resource %s", rsc_name_long(i));)
        RSC_WORK_FETCH& rwf = rsc_work_fetch[i];
        if (i && !gpus_usable) {
            rwf.dont_fetch_reason = DONT_FETCH_GPUS_NOT_USABLE;
            continue;
        }
        RSC_PROJECT_WORK_FETCH& rpwf = rwf.project_state(p);
        switch (rpwf.rsc_project_reason) {
        case 0:
        case DONT_FETCH_BACKED_OFF:
            break;
        default:
            WF_DEBUG(msg_printf(p, MSG_INFO, "piggyback: can't fetch %s: %s", rsc_name_long(i), rsc_project_reason_string(rpwf.rsc_project_reason));)
            continue;
        }
        bool buffer_low = (rwf.saturated_time < gstate.work_buf_total());
        bool need_work = buffer_low;
        if (rwf.has_exclusions && rwf.uses_starved_excluded_instances(p)) {
            need_work = true;
        }
        if (!need_work) {
            WF_DEBUG(msg_printf(p, MSG_INFO, "piggyback: don't need %s", rsc_name_long(i));)
            rwf.dont_fetch_reason = DONT_FETCH_BUFFER_FULL;
            continue;
        }
        if (check_higher_priority_projects) {
            PROJECT* p2 = NULL;
            for (unsigned int j=0; j<projects_sorted.size(); j++) {
                p2 = projects_sorted[j];
                if (p2 == p) break;
                if (p2->sched_priority == p->sched_priority) continue;
                if (p2->pwf.project_reason) {
                    WF_DEBUG(msg_printf(p, MSG_INFO, "piggyback: %s can't fetch work", p2->project_name);)
                    continue;
                }
                RSC_PROJECT_WORK_FETCH& rpwf2 = rwf.project_state(p2);
                if (!rpwf2.rsc_project_reason) {
                    WF_DEBUG(msg_printf(p, MSG_INFO, "piggyback: better proj %s", p2->project_name);)
                    break;
                }
            }
            if (p != p2) {
                rwf.dont_fetch_reason = DONT_FETCH_NOT_HIGHEST_PRIO;
                continue;
            }
        }
        WF_DEBUG(msg_printf(p, MSG_INFO, "piggyback: requesting %s", rsc_name_long(i));)
        if (buffer_low) {
            rwf.set_request(p);
        } else {
            rwf.set_request_excluded(p);
        }
    }
    if (!requested_work()) {
        p->pwf.project_reason = CANT_FETCH_WORK_DONT_NEED;
    }
}

// see if there's a fetchable non-CPU-intensive project without work
//
PROJECT* WORK_FETCH::non_cpu_intensive_project_needing_work() {
    for (unsigned int i=0; i<gstate.projects.size(); i++) {
        PROJECT* p = gstate.projects[i];
        if (!p->non_cpu_intensive) continue;
        if (!p->can_request_work()) continue;
        if (p->rsc_pwf[0].backoff_time > gstate.now) continue;
        if (has_a_job_in_progress(p)) continue;
        clear_request();
        rsc_work_fetch[0].req_secs = 1;
        return p;
    }
    return 0;
}

static bool higher_priority(PROJECT *p1, PROJECT *p2) {
    return (p1->sched_priority > p2->sched_priority);
}

// return true if there is exclusion starvation
// and this project can use the starved instances
//
bool RSC_WORK_FETCH::uses_starved_excluded_instances(PROJECT* p) {
    RSC_PROJECT_WORK_FETCH& rpwf = project_state(p);
    if (!sim_excluded_instances) return false;
    if ((sim_excluded_instances & rpwf.non_excluded_instances) == 0) {
        WF_DEBUG(msg_printf(p, MSG_INFO, "skip: excl");)
        return false;
    }
    return true;
}

// setup for choose_project() and piggyback()
//
void WORK_FETCH::setup() {
    gstate.compute_nuploading_results();

    rr_simulation();

    // Compute rsc_project_reason.
    // Must do this after rr_simulation() because the logic for
    // zero-resource-share projects uses #idle instances
    //
    for (unsigned int i=0; i<gstate.projects.size(); i++) {
        PROJECT* p = gstate.projects[i];
        for (int j=0; j<coprocs.n_rsc; j++) {
            RSC_PROJECT_WORK_FETCH& rpwf = p->rsc_pwf[j];
            rpwf.rsc_project_reason = rpwf.compute_rsc_project_reason(p, j);
        }
    }

    compute_shares();
    project_priority_init(true);
    clear_request();

    // Decrement the priority of projects that have work queued.
    // Specifically, subtract
    // (FLOPs queued for P)/(FLOPs of max queue)
    // which will generally be between 0 and 1.
    // This is a little arbitrary but I can't think of anything better.
    //
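    // Example: a project whose queued jobs amount to a quarter of
    // the max-queue FLOPs (work_buf_total() * total_peak_flops())
    // loses 0.25 of priority.
    //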
    double max_queued_flops = gstate.work_buf_total()*total_peak_flops();
    for (unsigned int i=0; i<gstate.results.size(); i++) {
        RESULT* rp = gstate.results[i];
        PROJECT* p = rp->project;
        p->sched_priority -= rp->estimated_flops_remaining()/max_queued_flops;
    }

    // don't request work from projects w/ > 1000 runnable jobs
    //
    int job_limit = 1000;
    for (unsigned int i=0; i<gstate.projects.size(); i++) {
        PROJECT* p = gstate.projects[i];
        if (p->pwf.n_runnable_jobs > job_limit && !p->pwf.project_reason) {
            p->pwf.project_reason = CANT_FETCH_WORK_TOO_MANY_RUNNABLE;
        }
    }

    projects_sorted = gstate.projects;
    std::sort(
        projects_sorted.begin(),
        projects_sorted.end(),
        higher_priority
    );
    if (log_flags.work_fetch_debug) {
        print_state();
    }
}

// Choose a project to fetch work from,
// and set the request fields of resource objects.
// Set p->sched_rpc_pending; if you decide not to request work
// from the project, you must clear this.
//
PROJECT* WORK_FETCH::choose_project() {
    PROJECT* p;

    p = non_cpu_intensive_project_needing_work();
    if (p) return p;

    setup();

    for (int i=0; i<coprocs.n_rsc; i++) {
        rsc_work_fetch[i].found_project = NULL;
    }

    // scan projects in order of decreasing priority
    //
    bool found = false;
    for (unsigned int j=0; j<projects_sorted.size(); j++) {
        p = projects_sorted[j];
        WF_DEBUG(msg_printf(p, MSG_INFO, "scanning");)
        if (p->pwf.project_reason) {
            WF_DEBUG(msg_printf(p, MSG_INFO, "skip: cfwr %d", p->pwf.project_reason);)
            continue;
        }

        // For each resource type:
        // - See if we can ask this project for work of that type;
        //   if so set a flag so that lower-priority projects
        //   won't request it
        // - If so, see if work is needed for this type;
        //   if so, set "found_project" flag
        //
        int rsc_index = -1;
        for (int i=0; i<coprocs.n_rsc; i++) {
            if (i && !gpus_usable) continue;
            RSC_WORK_FETCH& rwf = rsc_work_fetch[i];
            RSC_PROJECT_WORK_FETCH& rpwf = rwf.project_state(p);
            if (!rpwf.rsc_project_reason) {
                if (!rwf.found_project) {
                    rwf.found_project = p;
                }
                WF_DEBUG(msg_printf(p, MSG_INFO, "can fetch %s", rsc_name_long(i));)
            } else {
                WF_DEBUG(msg_printf(p, MSG_INFO, "can't fetch %s: %s", rsc_name_long(i), rsc_project_reason_string(rpwf.rsc_project_reason));)
                continue;
            }
            if (rwf.saturated_time < gstate.work_buf_min()) {
                WF_DEBUG(msg_printf(p, MSG_INFO, "%s needs work - buffer low", rsc_name_long(i));)
                rsc_index = i;
                break;
            }
            if (rwf.has_exclusions && rwf.uses_starved_excluded_instances(p)) {
                WF_DEBUG(msg_printf(p, MSG_INFO, "%s needs work - excluded instance starved", rsc_name_long(i));)
                rsc_index = i;
                break;
            }
        }

        // If rsc_index is non-negative, it's a resource for which this
        // project can ask for work, and which needs work.
        // And this is the highest-priority project having this property.
        // Request work for this resource,
        // and for any others for which this is the highest-priority
        // project able to request work.
        //
        if (rsc_index >= 0) {
            bool any_request = false;
            for (int i=0; i<coprocs.n_rsc; i++) {
                if (i && !gpus_usable) continue;
                RSC_WORK_FETCH& rwf = rsc_work_fetch[i];
                bool buffer_low;
                WF_DEBUG(msg_printf(p, MSG_INFO, "checking %s", rsc_name_long(i));)
                if (i == rsc_index) {
                    buffer_low = (rwf.saturated_time < gstate.work_buf_min());
                } else {
                    if (rwf.found_project && rwf.found_project != p) {
                        WF_DEBUG(msg_printf(p, MSG_INFO, "%s not high prio proj", rsc_name_long(i));)
                        continue;
                    }
                    buffer_low = (rwf.saturated_time < gstate.work_buf_total());
                    bool need_work = buffer_low;
                    if (rwf.has_exclusions && rwf.uses_starved_excluded_instances(p)) {
                        need_work = true;
                    }
                    if (!need_work) {
                        WF_DEBUG(msg_printf(p, MSG_INFO, "%s don't need", rsc_name_long(i));)
                        continue;
                    }
                    RSC_PROJECT_WORK_FETCH& rpwf = rwf.project_state(p);
                    int reason = rpwf.rsc_project_reason;
                    switch (reason) {
                    case 0:
                    case DONT_FETCH_BACKED_OFF:
                        // request even if backed off - no reason not to.
                        //
                        break;
                    default:
                        WF_DEBUG(msg_printf(p, MSG_INFO, "%s can't fetch: %s", rsc_name_long(i), rsc_project_reason_string(reason));)
                        continue;
                    }
                }
                if (buffer_low) {
                    rwf.set_request(p);
                    WF_DEBUG(msg_printf(p, MSG_INFO, "%s set_request: %f", rsc_name_long(i), rwf.req_secs);)
                } else {
                    rwf.set_request_excluded(p);
                    WF_DEBUG(msg_printf(p, MSG_INFO, "%s set_request_excluded: %f", rsc_name_long(i), rwf.req_secs);)
                }
                if (rwf.req_secs > 0) {
                    any_request = true;
                }
            }
            if (any_request) {
                found = true;
                break;
            }
        }
    }

    if (found) {
        p->sched_rpc_pending = RPC_REASON_NEED_WORK;
    } else {
        if (log_flags.work_fetch_debug) {
            msg_printf(0, MSG_INFO, "[work_fetch] No project chosen for work fetch");
        }
        p = NULL;
    }

    return p;
}

// estimate the amount of CPU and GPU time this task has gotten
// in the last dt sec, and add to project totals
//
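// Example: an app version using 0.5 CPUs and 1 GPU, measured over
// dt = 60 sec, adds 30 CPU-seconds and 60 GPU-seconds.
//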
void WORK_FETCH::accumulate_inst_sec(ACTIVE_TASK* atp, double dt) {
    APP_VERSION* avp = atp->result->avp;
    PROJECT* p = atp->result->project;
    double x = dt*avp->avg_ncpus;
    p->rsc_pwf[0].secs_this_rec_interval += x;
    rsc_work_fetch[0].secs_this_rec_interval += x;
    int rt = avp->gpu_usage.rsc_type;
    if (rt) {
        x = dt*avp->gpu_usage.usage;
        p->rsc_pwf[rt].secs_this_rec_interval += x;
        rsc_work_fetch[rt].secs_this_rec_interval += x;
    }
}

// find total and per-project resource shares for each resource
//
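// Example: if two projects with resource shares 100 and 300 can both
// fetch work for a resource, their fetchable_shares are 0.25 and 0.75.
//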
void WORK_FETCH::compute_shares() {
    unsigned int i;
    PROJECT* p;
    for (i=0; i<gstate.projects.size(); i++) {
        p = gstate.projects[i];
        if (p->non_cpu_intensive) continue;
        if (p->pwf.project_reason) continue;
        for (int j=0; j<coprocs.n_rsc; j++) {
            if (!p->rsc_pwf[j].rsc_project_reason) {
                rsc_work_fetch[j].total_fetchable_share += p->resource_share;
            }
        }
    }
    for (i=0; i<gstate.projects.size(); i++) {
        p = gstate.projects[i];
        if (p->non_cpu_intensive) continue;
        if (p->pwf.project_reason) continue;
        for (int j=0; j<coprocs.n_rsc; j++) {
            if (!p->rsc_pwf[j].rsc_project_reason) {
                p->rsc_pwf[j].fetchable_share = rsc_work_fetch[j].total_fetchable_share
                    ? p->resource_share/rsc_work_fetch[j].total_fetchable_share
                    : 1;
            }
        }
    }
}

void WORK_FETCH::request_string(char* buf, int len) {
    char buf2[256];
    snprintf(buf, len,
        "[work_fetch] request: CPU (%.2f sec, %.2f inst)",
        rsc_work_fetch[0].req_secs, rsc_work_fetch[0].req_instances
    );
    for (int i=1; i<coprocs.n_rsc; i++) {
        snprintf(buf2, sizeof(buf2),
            " %s (%.2f sec, %.2f inst)",
            rsc_name_long(i), rsc_work_fetch[i].req_secs, rsc_work_fetch[i].req_instances
        );
        strlcat(buf, buf2, len);
    }
}

void WORK_FETCH::write_request(FILE* f, PROJECT* p) {
    double work_req = rsc_work_fetch[0].req_secs;

    // if project is anonymous platform, set the overall work req
    // to the max of the requests of resource types for which we have versions.
    // Otherwise projects with old schedulers won't send us work.
    // THIS CAN BE REMOVED AT SOME POINT
    //
    if (p->anonymous_platform) {
        for (int i=1; i<coprocs.n_rsc; i++) {
            if (has_coproc_app(p, i)) {
                if (rsc_work_fetch[i].req_secs > work_req) {
                    work_req = rsc_work_fetch[i].req_secs;
                }
            }
        }
    }
    fprintf(f,
        "    <work_req_seconds>%f</work_req_seconds>\n"
        "    <cpu_req_secs>%f</cpu_req_secs>\n"
        "    <cpu_req_instances>%f</cpu_req_instances>\n"
        "    <estimated_delay>%f</estimated_delay>\n",
        work_req,
        rsc_work_fetch[0].req_secs,
        rsc_work_fetch[0].req_instances,
        rsc_work_fetch[0].req_secs ? rsc_work_fetch[0].busy_time_estimator.get_busy_time() : 0
    );
    if (log_flags.work_fetch_debug) {
        char buf[256];
        request_string(buf, sizeof(buf));
        msg_printf(p, MSG_INFO, "%s", buf);
    }
}

// we just got a scheduler reply with the given jobs; update backoffs
//
void WORK_FETCH::handle_reply(
    PROJECT* p, SCHEDULER_REPLY*, vector<RESULT*> new_results
) {
    bool got_work[MAX_RSC];
    bool requested_work_rsc[MAX_RSC];
    for (int i=0; i<coprocs.n_rsc; i++) {
        got_work[i] = false;
        requested_work_rsc[i] = (rsc_work_fetch[i].req_secs > 0);
    }
    for (unsigned int i=0; i<new_results.size(); i++) {
        RESULT* rp = new_results[i];
        got_work[rp->avp->gpu_usage.rsc_type] = true;
    }

    for (int i=0; i<coprocs.n_rsc; i++) {
        // back off on a resource type if
        // - we asked for jobs
        // - we didn't get any
        // - we're not currently backed off for that type
        //   (i.e. don't back off because of a piggyback request)
        // - the RPC was done for a reason that is automatic
        //   and potentially frequent
        //
        if (requested_work_rsc[i] && !got_work[i]) {
            if (p->rsc_pwf[i].backoff_time < gstate.now) {
                switch (p->sched_rpc_pending) {
                case RPC_REASON_RESULTS_DUE:
                case RPC_REASON_NEED_WORK:
                case RPC_REASON_TRICKLE_UP:
                    p->rsc_pwf[i].resource_backoff(p, rsc_name_long(i));
                }
            }
        }
        // if we did get jobs, clear backoff
        //
        if (got_work[i]) {
            p->rsc_pwf[i].clear_backoff();
        }
    }
    p->pwf.request_if_idle_and_uploading = false;
}

// set up for initial RPC.
// Ask for just 1 sec of work per resource,
// since we don't have good runtime estimates yet
//
void WORK_FETCH::set_initial_work_request(PROJECT* p) {
    clear_request();
    for (int i=0; i<coprocs.n_rsc; i++) {
        if (p->resource_share > 0 && !p->dont_request_more_work) {
            rsc_work_fetch[i].req_secs = 1;
            if (i) {
                RSC_WORK_FETCH& rwf = rsc_work_fetch[i];
                if (rwf.ninstances == p->rsc_pwf[i].ncoprocs_excluded) {
                    rsc_work_fetch[i].req_secs = 0;
                }
            }
        }
        rsc_work_fetch[i].busy_time_estimator.reset();
    }
}

// called once, at client startup
//
void WORK_FETCH::init() {
    rsc_work_fetch[0].init(0, gstate.ncpus, 1);
    double cpu_flops = gstate.host_info.p_fpops;

    // use 20% as a rough estimate of GPU efficiency
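    // (so, e.g., a GPU with 50x the CPU's peak FLOPS counts as a
    // 10x-speed resource below)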

    for (int i=1; i<coprocs.n_rsc; i++) {
        rsc_work_fetch[i].init(
            i, coprocs.coprocs[i].count,
            coprocs.coprocs[i].count*0.2*coprocs.coprocs[i].peak_flops/cpu_flops
        );
    }

    // see what resources anon platform projects can use
    //
    unsigned int i, j;
    for (i=0; i<gstate.projects.size(); i++) {
        PROJECT* p = gstate.projects[i];
        if (!p->anonymous_platform) continue;
        for (int k=0; k<coprocs.n_rsc; k++) {
            p->rsc_pwf[k].anonymous_platform_no_apps = true;
        }
        for (j=0; j<gstate.app_versions.size(); j++) {
            APP_VERSION* avp = gstate.app_versions[j];
            if (avp->project != p) continue;
            p->rsc_pwf[avp->gpu_usage.rsc_type].anonymous_platform_no_apps = false;
        }
    }
}

// clear backoff for app's resource
//
void WORK_FETCH::clear_backoffs(APP_VERSION& av) {
    av.project->rsc_pwf[av.gpu_usage.rsc_type].clear_backoff();
}

////////////////////////

void CLIENT_STATE::compute_nuploading_results() {
    unsigned int i;

    for (i=0; i<projects.size(); i++) {
        projects[i]->nuploading_results = 0;
        projects[i]->too_many_uploading_results = false;
    }
    for (i=0; i<results.size(); i++) {
        RESULT* rp = results[i];
        if (rp->state() == RESULT_FILES_UPLOADING) {
            rp->project->nuploading_results++;
        }
    }
    // "too many" means more than twice the largest instance count
    // of any resource
    //
    int n = gstate.ncpus;
    for (int j=1; j<coprocs.n_rsc; j++) {
        if (coprocs.coprocs[j].count > n) {
            n = coprocs.coprocs[j].count;
        }
    }
    n *= 2;
    for (i=0; i<projects.size(); i++) {
        if (projects[i]->nuploading_results > n) {
            projects[i]->too_many_uploading_results = true;
        }
    }
}

// Returns the estimated total elapsed time of this task.
// Compute this as a weighted average of estimates based on
// 1) the workunit's flops count (static estimate)
// 2) the current elapsed time and fraction done (dynamic estimate)
//
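// Example of the weighting below: with fraction_done = 0.25,
// a dynamic estimate of 1000 sec and a static estimate of 4000 sec,
// est_dur() returns 0.25*1000 + 0.75*4000 = 3250 sec.
//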
double ACTIVE_TASK::est_dur() {
    if (fraction_done >= 1) return elapsed_time;
    double wu_est = result->estimated_runtime();
    if (fraction_done <= 0) {
        if (elapsed_time > 0) {
            // if app is running but hasn't reported fraction done,
            // use the fraction-done guesstimate from ACTIVE_TASK::write_gui()
            //
            double fd = 1 - exp(-elapsed_time/wu_est);
            return elapsed_time/fd;
        } else {
            return wu_est;
        }
    }
    bool exceeded_wu_est = (elapsed_time > wu_est);
    if (exceeded_wu_est) wu_est = elapsed_time;
    double frac_est = fraction_done_elapsed_time / fraction_done;

    // if app says fraction done is accurate, just use it
    // also use it if static estimate has already been exceeded
    //
    if (result->app->fraction_done_exact || exceeded_wu_est) return frac_est;

    // weighting of dynamic estimate is the fraction done
    // i.e. when fraction done is 0.5, weighting is 50/50
    //
    double fd_weight = fraction_done;
    double wu_weight = 1 - fd_weight;
    double x = fd_weight*frac_est + wu_weight*wu_est;
#if 0
    //if (log_flags.rr_simulation) {
        msg_printf(result->project, MSG_INFO,
            "[rr_sim] %s frac_est %f = %f/%f",
            result->name, frac_est, fraction_done_elapsed_time, fraction_done
        );
        msg_printf(result->project, MSG_INFO,
            "[rr_sim] %s dur: %.2f = %.3f*%.2f + %.3f*%.2f",
            result->name, x, fd_weight, frac_est, wu_weight, wu_est
        );
    //}
#endif
    return x;
}

// the fraction of time BOINC is processing
//
double CLIENT_STATE::overall_cpu_frac() {
    double x = time_stats.on_frac * time_stats.active_frac;
    if (x < 0.01) x = 0.01;
    if (x > 1) x = 1;
    return x;
}
double CLIENT_STATE::overall_gpu_frac() {
    double x = time_stats.on_frac * time_stats.gpu_active_frac;
    if (x < 0.01) x = 0.01;
    if (x > 1) x = 1;
    return x;
}
double CLIENT_STATE::overall_cpu_and_network_frac() {
    double x = time_stats.on_frac * time_stats.cpu_and_network_available_frac;
    if (x < 0.01) x = 0.01;
    if (x > 1) x = 1;
    return x;
}

// called when benchmarks change
//
void CLIENT_STATE::scale_duration_correction_factors(double factor) {
    if (factor <= 0) return;
    for (unsigned int i=0; i<projects.size(); i++) {
        PROJECT* p = projects[i];
        if (p->dont_use_dcf) continue;
        p->duration_correction_factor *= factor;
    }
    if (log_flags.dcf_debug) {
        msg_printf(NULL, MSG_INFO,
            "[dcf] scaling all duration correction factors by %f",
            factor
        );
    }
}

// Choose a new host CPID.
// If using account manager, do scheduler RPCs
// to all acct-mgr-attached projects to propagate the CPID
//
void CLIENT_STATE::generate_new_host_cpid() {
    host_info.generate_host_cpid();
    for (unsigned int i=0; i<projects.size(); i++) {
        if (projects[i]->attached_via_acct_mgr) {
            projects[i]->sched_rpc_pending = RPC_REASON_ACCT_MGR_REQ;
            projects[i]->set_min_rpc_time(now + 15, "Sending new host CPID");
        }
    }
}

const char* rsc_project_reason_string(int reason) {
    switch (reason) {
    case 0: return "";
    case DONT_FETCH_GPUS_NOT_USABLE: return "GPUs not usable";
    case DONT_FETCH_PREFS: return "blocked by project preferences";
    case DONT_FETCH_CONFIG: return "client configuration";
    case DONT_FETCH_NO_APPS: return "no applications";
    case DONT_FETCH_AMS: return "account manager prefs";
    case DONT_FETCH_ZERO_SHARE: return "zero resource share";
    case DONT_FETCH_BUFFER_FULL: return "job cache full";
    case DONT_FETCH_NOT_HIGHEST_PRIO: return "not highest priority project";
    case DONT_FETCH_BACKED_OFF: return "project is backed off";
    case DONT_FETCH_DEFER_SCHED: return "a job is deferred";
    }
    return "unknown resource reason";
}

const char* project_reason_string(PROJECT* p, char* buf, int len) {
    switch (p->pwf.project_reason) {
    case 0: return "";
    case CANT_FETCH_WORK_NON_CPU_INTENSIVE:
        return "non CPU intensive";
    case CANT_FETCH_WORK_SUSPENDED_VIA_GUI:
        return "suspended via Manager";
    case CANT_FETCH_WORK_MASTER_URL_FETCH_PENDING:
        return "master URL fetch pending";
    case CANT_FETCH_WORK_MIN_RPC_TIME:
        return "scheduler RPC backoff";
    case CANT_FETCH_WORK_DONT_REQUEST_MORE_WORK:
        return "\"no new tasks\" requested via Manager";
    case CANT_FETCH_WORK_DOWNLOAD_STALLED:
        return "some download is stalled";
    case CANT_FETCH_WORK_RESULT_SUSPENDED:
        return "some task is suspended via Manager";
    case CANT_FETCH_WORK_TOO_MANY_UPLOADS:
        return "too many uploads in progress";
    case CANT_FETCH_WORK_NOT_HIGHEST_PRIORITY:
        return "project is not highest priority";
    case CANT_FETCH_WORK_TOO_MANY_RUNNABLE:
        return "too many runnable tasks";
    case CANT_FETCH_WORK_DONT_NEED:
        if (coprocs.n_rsc == 1) {
            snprintf(buf, len,
                "don't need (%s)",
                rsc_project_reason_string(rsc_work_fetch[0].dont_fetch_reason)
            );
        } else {
            std::string x;
            x = "don't need (";
            for (int i=0; i<coprocs.n_rsc; i++) {
                char buf2[256];
                snprintf(buf2, sizeof(buf2),
                    "%s: %s",
                    rsc_name_long(i),
                    rsc_project_reason_string(rsc_work_fetch[i].dont_fetch_reason)
                );
                x += buf2;
                if (i < coprocs.n_rsc-1) {
                    x += "; ";
                }
            }
            x += ")";
            strlcpy(buf, x.c_str(), len);
        }
        return buf;
    }
    return "unknown reason";
}