1 // This file is part of BOINC.
2 // http://boinc.berkeley.edu
3 // Copyright (C) 2014 University of California
4 //
5 // BOINC is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Lesser General Public License
7 // as published by the Free Software Foundation,
8 // either version 3 of the License, or (at your option) any later version.
9 //
10 // BOINC is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
13 // See the GNU Lesser General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with BOINC.  If not, see <http://www.gnu.org/licenses/>.
17 
18 #include "cpp.h"
19 
20 #ifdef _WIN32
21 #include "boinc_win.h"
22 #else
23 #include "config.h"
24 #include <cmath>
25 #endif
26 
27 #ifdef _MSC_VER
28 #define snprintf _snprintf
29 #endif
30 
31 #include "util.h"
32 #include "str_replace.h"
33 
34 #include "client_msgs.h"
35 #include "client_state.h"
36 #include "project.h"
37 #include "result.h"
38 #include "scheduler_op.h"
39 
40 #include "work_fetch.h"
41 
42 #if 0
43 #define WF_DEBUG(x) x
44 #else
45 #define WF_DEBUG(X)
46 #endif
47 
48 using std::vector;
49 
50 RSC_WORK_FETCH rsc_work_fetch[MAX_RSC];
51 WORK_FETCH work_fetch;
52 
53 // does the (NCI) project have a job that's running or uploading?
54 // (don't request another job from NCI project if so)
55 //
has_a_job_in_progress(PROJECT * p)56 static bool has_a_job_in_progress(PROJECT* p) {
57     for (unsigned int j=0; j<gstate.results.size(); j++) {
58         RESULT* rp = gstate.results[j];
59         if (rp->project != p) continue;
60         if (rp->state() < RESULT_FILES_UPLOADED) {
61             return true;
62         }
63     }
64     return false;
65 }
66 
has_coproc_app(PROJECT * p,int rsc_type)67 inline bool has_coproc_app(PROJECT* p, int rsc_type) {
68     unsigned int i;
69     for (i=0; i<gstate.app_versions.size(); i++) {
70         APP_VERSION* avp = gstate.app_versions[i];
71         if (avp->project != p) continue;
72         if (avp->gpu_usage.rsc_type == rsc_type) return true;
73     }
74     return false;
75 }
76 
77 ///////////////  RSC_PROJECT_WORK_FETCH  ///////////////
78 
rr_init()79 void RSC_PROJECT_WORK_FETCH::rr_init() {
80     fetchable_share = 0;
81     n_runnable_jobs = 0;
82     sim_nused = 0;
83     nused_total = 0;
84     deadlines_missed = 0;
85 }
86 
resource_backoff(PROJECT * p,const char * name)87 void RSC_PROJECT_WORK_FETCH::resource_backoff(PROJECT* p, const char* name) {
88     if (backoff_interval) {
89         backoff_interval *= 2;
90         if (backoff_interval > WF_MAX_BACKOFF_INTERVAL) backoff_interval = WF_MAX_BACKOFF_INTERVAL;
91     } else {
92         backoff_interval = WF_MIN_BACKOFF_INTERVAL;
93     }
94     double x = (.5 + drand())*backoff_interval;
95     backoff_time = gstate.now + x;
96     if (log_flags.work_fetch_debug) {
97         msg_printf(p, MSG_INFO,
98             "[work_fetch] backing off %s %.0f sec", name, x
99         );
100     }
101 }
102 
103 // checks for whether we should ask this project for work of this type.
104 // check for backoff must go last, so that if that's the reason
105 // we know that there are no other reasons (for piggyback)
106 //
// Decide whether we may ask project p for work of this resource type.
// Returns 0 if fetch is allowed, else a DONT_FETCH_* reason code.
// The backoff check MUST be performed last so that when it fires,
// we know it's the only obstacle (piggyback_work_request() treats
// DONT_FETCH_BACKED_OFF as non-fatal).
//
int RSC_PROJECT_WORK_FETCH::compute_rsc_project_reason(
    PROJECT *p, int rsc_type
) {
    RSC_WORK_FETCH& rwf = rsc_work_fetch[rsc_type];
    // see whether work fetch for this resource is banned
    // by prefs, config, project, or acct mgr
    //
    if (p->no_rsc_pref[rsc_type]) return DONT_FETCH_PREFS;
    if (p->no_rsc_config[rsc_type]) return DONT_FETCH_CONFIG;
    if (p->no_rsc_apps[rsc_type]) return DONT_FETCH_NO_APPS;
    if (p->no_rsc_ams[rsc_type]) return DONT_FETCH_AMS;
    // a job of this type is deferred by the scheduler; don't pile on more
    if (p->rsc_pwf[rsc_type].has_deferred_job) return DONT_FETCH_DEFER_SCHED;

    // if project has zero resource share,
    // only fetch work if a device is idle
    //
    if (p->resource_share == 0 && rwf.nidle_now == 0) {
        return DONT_FETCH_ZERO_SHARE;
    }

    // if project has excluded GPUs of this type,
    // we need to avoid fetching work just because there's an idle instance
    // or a shortfall;
    // fetching work might not alleviate either of these,
    // and we'd end up fetching unbounded work.
    // At the same time, we want to respect work buf params if possible.
    //
    // Current policy:
    // don't fetch work if remaining time of this project's jobs
    // exceeds work_buf_min * (#usable instances / #instances)
    //
    // TODO: THIS IS FAIRLY CRUDE. Making it smarter would require
    // computing shortfall etc. on a per-project basis
    //
    int nexcl = ncoprocs_excluded;
    // rsc_type 0 is the CPU; exclusions only apply to GPUs
    if (rsc_type && nexcl) {
        int n_not_excluded = rwf.ninstances - nexcl;
        if (n_runnable_jobs >= n_not_excluded
            && queue_est > (gstate.work_buf_min() * n_not_excluded)/rwf.ninstances
        ) {
            return DONT_FETCH_BUFFER_FULL;
        }
    }

    if (anonymous_platform_no_apps) {
        return DONT_FETCH_NO_APPS;
    }

    // this must go last
    //
    if (backoff_time > gstate.now) {
        return DONT_FETCH_BACKED_OFF;
    }
    return 0;
}
162 
163 ///////////////  RSC_WORK_FETCH  ///////////////
164 
copy_request(COPROC & c)165 void RSC_WORK_FETCH::copy_request(COPROC& c) {
166     c.req_secs = req_secs;
167     c.req_instances = req_instances;
168     c.estimated_delay =  req_secs?busy_time_estimator.get_busy_time():0;
169 }
170 
// the given project's per-resource work-fetch state for this resource type
//
RSC_PROJECT_WORK_FETCH& RSC_WORK_FETCH::project_state(PROJECT* p) {
    return p->rsc_pwf[rsc_type];
}
174 
rr_init()175 void RSC_WORK_FETCH::rr_init() {
176     shortfall = 0;
177     nidle_now = 0;
178     sim_nused = 0;
179     total_fetchable_share = 0;
180     deadline_missed_instances = 0;
181     saturated_time = 0;
182     busy_time_estimator.reset();
183     sim_used_instances = 0;
184 }
185 
update_stats(double sim_now,double dt,double buf_end)186 void RSC_WORK_FETCH::update_stats(double sim_now, double dt, double buf_end) {
187     double idle = ninstances - sim_nused;
188     if (idle > 1e-6 && sim_now < buf_end) {
189         double dt2;
190         if (sim_now + dt > buf_end) {
191             dt2 = buf_end - sim_now;
192         } else {
193             dt2 = dt;
194         }
195         shortfall += idle*dt2;
196     }
197     if (idle < 1e-6) {
198         saturated_time = sim_now + dt - gstate.now;
199     }
200 }
201 
// feed an interval (duration, #instances in use) to the busy-time estimator
//
void RSC_WORK_FETCH::update_busy_time(double dur, double nused) {
    busy_time_estimator.update(dur, nused);
}
205 
wacky_dcf(PROJECT * p)206 static bool wacky_dcf(PROJECT* p) {
207     if (p->dont_use_dcf) return false;
208     double dcf = p->duration_correction_factor;
209     return (dcf < 0.02 || dcf > 80.0);
210 }
211 
212 // request this project's share of shortfall and instances.
213 // don't request anything if project is backed off.
214 //
// Compute req_secs/req_instances for asking project p for work
// of this resource type: the project's share of the shortfall,
// and enough instances to cover idle devices and its instance share.
//
void RSC_WORK_FETCH::set_request(PROJECT* p) {

    // if backup project, fetch 1 job per idle instance
    //
    if (p->resource_share == 0) {
        req_instances = nidle_now;
        req_secs = 1;
        return;
    }
    if (cc_config.fetch_minimal_work) {
        // user asked for minimal fetch: 1 sec, one job per instance
        req_instances = ninstances;
        req_secs = 1;
        return;
    }
    RSC_PROJECT_WORK_FETCH& w = project_state(p);
    // instances this project is allowed to use (GPU exclusions)
    double non_excl_inst = ninstances - w.ncoprocs_excluded;
    if (shortfall) {
        if (wacky_dcf(p)) {
            // if project's DCF is too big or small,
            // its completion time estimates are useless; just ask for 1 second
            //
            req_secs = 1;
        } else {
            req_secs = shortfall;
            if (w.ncoprocs_excluded) {
                // scale down by the fraction of instances it can use
                req_secs *= non_excl_inst/ninstances;
            }
        }
    }

    // the project's share of instances, capped by exclusions,
    // minus what it's already using in the simulation
    //
    double instance_share = ninstances*w.fetchable_share;
    if (instance_share > non_excl_inst) {
        instance_share = non_excl_inst;
    }
    instance_share -= w.nused_total;
    // always ask for at least enough to cover currently-idle instances
    req_instances = std::max(nidle_now, instance_share);

    if (log_flags.work_fetch_debug) {
        msg_printf(p, MSG_INFO,
            "[work_fetch] set_request() for %s: ninst %d nused_total %.2f nidle_now %.2f fetch share %.2f req_inst %.2f req_secs %.2f",
            rsc_name_long(rsc_type), ninstances, w.nused_total, nidle_now,
            w.fetchable_share, req_instances, req_secs
        );
    }
    // an instance request with zero seconds would be ignored; ask for 1 sec
    if (req_instances && !req_secs) {
        req_secs = 1;
    }
}
263 
264 // We're fetching work because some instances are starved because
265 // of exclusions.
266 // See how many N of these instances are not excluded for this project.
267 // Ask for N instances and for N*work_buf_min seconds.
268 //
set_request_excluded(PROJECT * p)269 void RSC_WORK_FETCH::set_request_excluded(PROJECT* p) {
270     RSC_PROJECT_WORK_FETCH& pwf = project_state(p);
271 
272     int inst_mask = sim_excluded_instances & pwf.non_excluded_instances;
273     int n = 0;
274     for (int i=0; i<ninstances; i++) {
275         if ((1<<i) & inst_mask) {
276             n++;
277         }
278     }
279     WF_DEBUG(msg_printf(p, MSG_INFO, "set_request_excluded() %d %d %d", sim_excluded_instances, pwf.non_excluded_instances, n));
280     req_instances = n;
281     if (p->resource_share == 0 || cc_config.fetch_minimal_work) {
282         req_secs = 1;
283     } else {
284         req_secs = n*gstate.work_buf_total();
285     }
286 }
287 
print_state(const char * name)288 void RSC_WORK_FETCH::print_state(const char* name) {
289     msg_printf(0, MSG_INFO, "[work_fetch] --- state for %s ---", name);
290     msg_printf(0, MSG_INFO,
291         "[work_fetch] shortfall %.2f nidle %.2f saturated %.2f busy %.2f",
292         shortfall, nidle_now, saturated_time,
293         busy_time_estimator.get_busy_time()
294     );
295 //    msg_printf(0, MSG_INFO, "[work_fetch] sim used inst %d sim excl inst %d",
296 //        sim_used_instances, sim_excluded_instances
297 //    );
298     for (unsigned int i=0; i<gstate.projects.size(); i++) {
299         char buf[256];
300         PROJECT* p = gstate.projects[i];
301         if (p->non_cpu_intensive) continue;
302         RSC_PROJECT_WORK_FETCH& rpwf = project_state(p);
303         double bt = rpwf.backoff_time>gstate.now?rpwf.backoff_time-gstate.now:0;
304         if (bt) {
305             snprintf(buf, sizeof(buf),
306                 " (resource backoff: %.2f, inc %.2f)",
307                 bt, rpwf.backoff_interval
308             );
309         } else {
310             safe_strcpy(buf, "");
311         }
312         msg_printf(p, MSG_INFO,
313             "[work_fetch] share %.3f %s %s",
314             rpwf.fetchable_share,
315             rsc_project_reason_string(rpwf.rsc_project_reason),
316             buf
317         );
318     }
319 }
320 
clear_request()321 void RSC_WORK_FETCH::clear_request() {
322     req_secs = 0;
323     req_instances = 0;
324 }
325 
326 ///////////////  PROJECT_WORK_FETCH  ///////////////
327 
// Decide whether we may ask project p for work at all (any resource).
// Returns 0 if allowed, else a CANT_FETCH_WORK_* reason code.
// The min-RPC-time check must come last: when it's the returned reason,
// callers know it's the only obstacle.
//
int PROJECT_WORK_FETCH::compute_project_reason(PROJECT* p) {
    if (p->non_cpu_intensive) return CANT_FETCH_WORK_NON_CPU_INTENSIVE;
    if (p->suspended_via_gui) return CANT_FETCH_WORK_SUSPENDED_VIA_GUI;
    if (p->master_url_fetch_pending) return CANT_FETCH_WORK_MASTER_URL_FETCH_PENDING;
    if (p->dont_request_more_work) return CANT_FETCH_WORK_DONT_REQUEST_MORE_WORK;
    if (p->some_download_stalled()) return CANT_FETCH_WORK_DOWNLOAD_STALLED;
    if (p->some_result_suspended()) return CANT_FETCH_WORK_RESULT_SUSPENDED;
    if (p->too_many_uploading_results) return CANT_FETCH_WORK_TOO_MANY_UPLOADS;

    // this goes last, so that if this is the reason we know
    // that there are no other reasons
    //
    if (p->min_rpc_time > gstate.now) return CANT_FETCH_WORK_MIN_RPC_TIME;
    return 0;
}
343 
reset(PROJECT * p)344 void PROJECT_WORK_FETCH::reset(PROJECT* p) {
345     for (int i=0; i<coprocs.n_rsc; i++) {
346         p->rsc_pwf[i].reset();
347     }
348 }
349 
rr_init(PROJECT * p)350 void PROJECT_WORK_FETCH::rr_init(PROJECT* p) {
351     project_reason = compute_project_reason(p);
352     n_runnable_jobs = 0;
353 }
354 
// log this project's overall work-fetch state:
// REC, scheduling priority, and whether/why it can't request work
//
void PROJECT_WORK_FETCH::print_state(PROJECT* p) {
    // buf: the status message; buf2: scratch for the reason string,
    // then reused for the min-RPC countdown suffix
    char buf[1024], buf2[1024];
    if (project_reason) {
        snprintf(buf, sizeof(buf),
            "can't request work: %s",
            project_reason_string(p, buf2, sizeof(buf2))
        );
    } else {
        safe_strcpy(buf, "can request work");
    }
    if (p->min_rpc_time > gstate.now) {
        // show how long until the next RPC is allowed
        snprintf(buf2, sizeof(buf2),
            " (%.2f sec)",
            p->min_rpc_time - gstate.now
        );
        safe_strcat(buf, buf2);
    }
    msg_printf(p, MSG_INFO,
        "[work_fetch] REC %.3f prio %.3f %s",
        rec,
        p->sched_priority,
        buf
    );
}
379 
380 ///////////////  WORK_FETCH  ///////////////
381 
rr_init()382 void WORK_FETCH::rr_init() {
383     // compute PROJECT::RSC_PROJECT_WORK_FETCH::has_deferred_job
384     //
385     for (unsigned int i=0; i<gstate.projects.size(); i++) {
386         PROJECT* p = gstate.projects[i];
387         for (int j=0; j<coprocs.n_rsc; j++) {
388             p->rsc_pwf[j].has_deferred_job = false;
389         }
390     }
391     for (unsigned int i=0; i<gstate.results.size(); i++) {
392         RESULT* rp = gstate.results[i];
393         if (rp->schedule_backoff) {
394             if (rp->schedule_backoff > gstate.now) {
395                 int rt = rp->avp->gpu_usage.rsc_type;
396                 rp->project->rsc_pwf[rt].has_deferred_job = true;
397             } else {
398                 rp->schedule_backoff = 0;
399                 gstate.request_schedule_cpus("schedule backoff finished");
400             }
401         }
402     }
403 
404     for (int i=0; i<coprocs.n_rsc; i++) {
405         rsc_work_fetch[i].rr_init();
406     }
407     for (unsigned int i=0; i<gstate.projects.size(); i++) {
408         PROJECT* p = gstate.projects[i];
409         p->pwf.rr_init(p);
410         for (int j=0; j<coprocs.n_rsc; j++) {
411             p->rsc_pwf[j].rr_init();
412         }
413     }
414 }
415 // copy request fields from RSC_WORK_FETCH to COPROCS
416 //
copy_requests()417 void WORK_FETCH::copy_requests() {
418     for (int i=0; i<coprocs.n_rsc; i++) {
419         switch (coproc_type_name_to_num(coprocs.coprocs[i].type)) {
420         case PROC_TYPE_NVIDIA_GPU:
421             rsc_work_fetch[i].copy_request(coprocs.nvidia);
422             break;
423         case PROC_TYPE_AMD_GPU:
424             rsc_work_fetch[i].copy_request(coprocs.ati);
425             break;
426         case PROC_TYPE_INTEL_GPU:
427             rsc_work_fetch[i].copy_request(coprocs.intel_gpu);
428             break;
429         default:
430             rsc_work_fetch[i].copy_request(coprocs.coprocs[i]);
431             break;
432         }
433     }
434 }
435 
print_state()436 void WORK_FETCH::print_state() {
437     msg_printf(0, MSG_INFO, "[work_fetch] ------- start work fetch state -------");
438     msg_printf(0, MSG_INFO, "[work_fetch] target work buffer: %.2f + %.2f sec",
439         gstate.work_buf_min(), gstate.work_buf_additional()
440     );
441     msg_printf(0, MSG_INFO, "[work_fetch] --- project states ---");
442     for (unsigned int i=0; i<gstate.projects.size(); i++) {
443         PROJECT* p = gstate.projects[i];
444         p->pwf.print_state(p);
445     }
446     for (int i=0; i<coprocs.n_rsc; i++) {
447         rsc_work_fetch[i].print_state(rsc_name_long(i));
448     }
449     msg_printf(0, MSG_INFO, "[work_fetch] ------- end work fetch state -------");
450 }
451 
clear_request()452 void WORK_FETCH::clear_request() {
453     for (int i=0; i<coprocs.n_rsc; i++) {
454         rsc_work_fetch[i].clear_request();
455     }
456 }
457 
requested_work()458 bool WORK_FETCH::requested_work() {
459     for (int i=0; i<coprocs.n_rsc; i++) {
460         if (rsc_work_fetch[i].req_secs) return true;
461     }
462     return false;
463 }
464 
465 // We're going to contact this project for reasons other than work fetch
466 // (e.g., to report completed results, or at user request).
467 // Decide if we should "piggyback" a work fetch request.
468 //
void WORK_FETCH::piggyback_work_request(PROJECT* p) {
    WF_DEBUG(msg_printf(p, MSG_INFO, "piggyback_work_request()");)
    clear_request();
    // in minimal-fetch mode, once we've had work we never ask again
    if (cc_config.fetch_minimal_work && gstate.had_or_requested_work) return;
    if (p->non_cpu_intensive) {
        // NCI projects: ask for one CPU job unless one is already in flight
        if (!has_a_job_in_progress(p) && !p->dont_request_more_work) {
            rsc_work_fetch[0].req_secs = 1;
        }
        return;
    }

    // run the RR simulation and compute per-project/per-resource reasons
    setup();

    // bail out unless the project can fetch work;
    // min-RPC-time is OK since we're contacting it anyway
    //
    switch (p->pwf.project_reason) {
    case 0:
    case CANT_FETCH_WORK_MIN_RPC_TIME:
        break;
    default:
        return;
    }

    // if project was updated from manager and config says so,
    // fetch work for a resource even if there are higher-prio projects
    // able to fetch it
    //
    bool check_higher_priority_projects = true;
    if (p->sched_rpc_pending && cc_config.fetch_on_update) {
        check_higher_priority_projects = false;
    }

    // For each resource, scan projects in decreasing priority,
    // seeing if there's one that's higher-priority than this
    // able to fetch work for the resource.
    // If not, and the resource needs topping off, do so
    //
    for (int i=0; i<coprocs.n_rsc; i++) {
        WF_DEBUG(msg_printf(p, MSG_INFO, "piggyback: resource %s", rsc_name_long(i));)
        RSC_WORK_FETCH& rwf = rsc_work_fetch[i];
        // i > 0 are GPU types; skip them if GPUs are currently unusable
        if (i && !gpus_usable) {
            rwf.dont_fetch_reason = DONT_FETCH_GPUS_NOT_USABLE;
            continue;
        }
        RSC_PROJECT_WORK_FETCH& rpwf = rwf.project_state(p);
        // backed-off is tolerable here (we're contacting the project anyway);
        // any other reason disqualifies this resource
        switch (rpwf.rsc_project_reason) {
        case 0:
        case DONT_FETCH_BACKED_OFF:
            break;
        default:
            WF_DEBUG(msg_printf(p, MSG_INFO, "piggyback: can't fetch %s: %s", rsc_name_long(i), rsc_project_reason_string(rpwf.rsc_project_reason));)
            continue;
        }
        // need work if the buffer is low, or if exclusions starve instances
        // this project could feed
        bool buffer_low = (rwf.saturated_time < gstate.work_buf_total());
        bool need_work = buffer_low;
        if (rwf.has_exclusions && rwf.uses_starved_excluded_instances(p)) {
            need_work = true;
        }
        if (!need_work) {
            WF_DEBUG(msg_printf(p, MSG_INFO, "piggyback: don't need %s", rsc_name_long(i));)
            rwf.dont_fetch_reason = DONT_FETCH_BUFFER_FULL;
            continue;
        }
        if (check_higher_priority_projects) {
            // scan down the priority-sorted list; if some strictly
            // higher-priority project could fetch this resource, defer to it
            PROJECT* p2 = NULL;
            for (unsigned int j=0; j<projects_sorted.size(); j++) {
                p2 = projects_sorted[j];
                if (p2 == p) break;
                // equal priority doesn't outrank us
                if (p2->sched_priority == p->sched_priority) continue;
                if (p2->pwf.project_reason) {
                    WF_DEBUG(msg_printf(p, MSG_INFO, "piggyback: %s can't fetch work", p2->project_name);)
                    continue;
                }
                RSC_PROJECT_WORK_FETCH& rpwf2 = rwf.project_state(p2);
                if (!rpwf2.rsc_project_reason) {
                    WF_DEBUG(msg_printf(p, MSG_INFO, "piggyback: better proj %s", p2->project_name);)
                    break;
                }
            }
            // loop ended on someone other than us => we're not highest-prio
            if (p != p2) {
                rwf.dont_fetch_reason = DONT_FETCH_NOT_HIGHEST_PRIO;
                continue;
            }
        }
        WF_DEBUG(msg_printf(p, MSG_INFO, "piggyback: requesting %s", rsc_name_long(i));)
        if (buffer_low) {
            rwf.set_request(p);
        } else {
            // only exclusion starvation; request just for starved instances
            rwf.set_request_excluded(p);
        }
    }
    if (!requested_work()) {
        p->pwf.project_reason = CANT_FETCH_WORK_DONT_NEED;
    }
}
562 
563 // see if there's a fetchable non-CPU-intensive project without work
564 //
non_cpu_intensive_project_needing_work()565 PROJECT* WORK_FETCH::non_cpu_intensive_project_needing_work() {
566     for (unsigned int i=0; i<gstate.projects.size(); i++) {
567         PROJECT* p = gstate.projects[i];
568         if (!p->non_cpu_intensive) continue;
569         if (!p->can_request_work()) continue;
570         if (p->rsc_pwf[0].backoff_time > gstate.now) continue;
571         if (has_a_job_in_progress(p)) continue;
572         clear_request();
573         rsc_work_fetch[0].req_secs = 1;
574         return p;
575     }
576     return 0;
577 }
578 
higher_priority(PROJECT * p1,PROJECT * p2)579 static bool higher_priority(PROJECT *p1, PROJECT *p2) {
580     return (p1->sched_priority > p2->sched_priority);
581 }
582 
583 // return true if there is exclusion starvation
584 // and this project can use the starved instances
585 //
uses_starved_excluded_instances(PROJECT * p)586 bool RSC_WORK_FETCH::uses_starved_excluded_instances(PROJECT* p) {
587     RSC_PROJECT_WORK_FETCH& rpwf = project_state(p);
588     if (!sim_excluded_instances) return false;
589     if ((sim_excluded_instances & rpwf.non_excluded_instances) == 0) {
590         WF_DEBUG(msg_printf(p, MSG_INFO, "skip: excl");)
591         return false;
592     }
593     return true;
594 }
595 
596 // setup for choose_project() and piggyback()
597 //
// Common preparation for choose_project() and piggyback_work_request():
// run the RR simulation, compute per-(project,resource) fetch reasons,
// shares and priorities, and build the priority-sorted project list.
//
void WORK_FETCH::setup() {
    gstate.compute_nuploading_results();

    rr_simulation();

    // Compute rsc_project_reason.
    // Must do this after rr_simulation() because the logic for
    // zero-resource-share projects uses #idle instances
    //
    for (unsigned int i=0; i<gstate.projects.size(); i++) {
        PROJECT* p = gstate.projects[i];
        for (int j=0; j<coprocs.n_rsc; j++) {
            RSC_PROJECT_WORK_FETCH& rpwf = p->rsc_pwf[j];
            rpwf.rsc_project_reason = rpwf.compute_rsc_project_reason(p, j);
        }
    }

    compute_shares();
    project_priority_init(true);
    clear_request();

    // Decrement the priority of projects that have work queued.
    // Specifically, subtract
    // (FLOPs queued for P)/(FLOPs of max queue)
    // which will generally be between 0 and 1.
    // This is a little arbitrary but I can't think of anything better.
    //
    double max_queued_flops = gstate.work_buf_total()*total_peak_flops();
    for (unsigned int i=0; i<gstate.results.size(); i++) {
        RESULT* rp = gstate.results[i];
        PROJECT* p = rp->project;
        p->sched_priority -= rp->estimated_flops_remaining()/max_queued_flops;
    }

    // don't request work from projects w/ > 1000 runnable jobs
    //
    int job_limit = 1000;
    for (unsigned int i=0; i<gstate.projects.size(); i++) {
        PROJECT* p = gstate.projects[i];
        if (p->pwf.n_runnable_jobs > job_limit && !p->pwf.project_reason) {
            p->pwf.project_reason = CANT_FETCH_WORK_TOO_MANY_RUNNABLE;
        }
    }

    // sort projects by decreasing scheduling priority
    projects_sorted = gstate.projects;
    std::sort(
        projects_sorted.begin(),
        projects_sorted.end(),
        higher_priority
    );
    if (log_flags.work_fetch_debug) {
        print_state();
    }
}
652 
653 // Choose a project to fetch work from,
654 // and set the request fields of resource objects.
655 // Set p->sched_rpc_pending; if you decide not to request work
656 // from the project, you must clear this.
657 //
PROJECT* WORK_FETCH::choose_project() {
    PROJECT* p;

    // NCI projects take precedence and need no simulation
    p = non_cpu_intensive_project_needing_work();
    if (p) return p;

    setup();

    for (int i=0; i<coprocs.n_rsc; i++) {
        rsc_work_fetch[i].found_project = NULL;
    }

    // scan projects in order of decreasing priority
    //
    bool found = false;
    for (unsigned int j=0; j<projects_sorted.size(); j++) {
        p = projects_sorted[j];
        WF_DEBUG(msg_printf(p, MSG_INFO, "scanning");)
        if (p->pwf.project_reason) {
            WF_DEBUG(msg_printf(p, MSG_INFO, "skip: cfwr %d", p->pwf.project_reason);)
            continue;
        }

        // For each resource type:
        // - See if we can ask this project for work of that type;
        //   if so set a flag so that lower-priority projects
        //   won't request it
        // - If so, see if work is needed for this type;
        //   if so, set "found_project" flag
        //
        int rsc_index = -1;
        for (int i=0; i<coprocs.n_rsc; i++) {
            // i > 0 are GPU types; skip if GPUs currently unusable
            if (i && !gpus_usable) continue;
            RSC_WORK_FETCH& rwf = rsc_work_fetch[i];
            RSC_PROJECT_WORK_FETCH& rpwf = rwf.project_state(p);
            if (!rpwf.rsc_project_reason) {
                // first eligible project for this resource claims it
                if (!rwf.found_project) {
                    rwf.found_project = p;
                }
                WF_DEBUG(msg_printf(p, MSG_INFO, "can fetch %s", rsc_name_long(i));)
            } else {
                WF_DEBUG(msg_printf(p, MSG_INFO, "can't fetch %s: %s", rsc_name_long(i), rsc_project_reason_string(rpwf.rsc_project_reason));)
                continue;
            }
            if (rwf.saturated_time < gstate.work_buf_min()) {
                WF_DEBUG(msg_printf(p, MSG_INFO, "%s needs work - buffer low", rsc_name_long(i));)
                rsc_index = i;
                break;
            }
            if (rwf.has_exclusions && rwf.uses_starved_excluded_instances(p)) {
                WF_DEBUG(msg_printf(p, MSG_INFO, "%s needs work - excluded instance starved", rsc_name_long(i));)
                rsc_index = i;
                break;
            }
        }

        // If rsc_index is nonzero, it's a resource that this project
        // can ask for work, and which needs work.
        // And this is the highest-priority project having this property.
        // Request work from this resource,
        // and any others for which this is the highest-priority project
        // able to request work
        //
        if (rsc_index >= 0) {
            bool any_request = false;
            for (int i=0; i<coprocs.n_rsc; i++) {
                if (i && !gpus_usable) continue;
                RSC_WORK_FETCH& rwf = rsc_work_fetch[i];
                bool buffer_low;
                WF_DEBUG(msg_printf(p, MSG_INFO, "checking %s", rsc_name_long(i));)
                if (i == rsc_index) {
                    // the triggering resource: work_buf_min threshold
                    buffer_low = (rwf.saturated_time < gstate.work_buf_min());
                } else {
                    // secondary resources: only if we're the claiming
                    // project, and use the full-buffer threshold
                    if (rwf.found_project && rwf.found_project != p) {
                        WF_DEBUG(msg_printf(p, MSG_INFO, "%s not high prio proj", rsc_name_long(i));)
                        continue;
                    }
                    buffer_low = (rwf.saturated_time < gstate.work_buf_total());
                    bool need_work = buffer_low;
                    if (rwf.has_exclusions && rwf.uses_starved_excluded_instances(p)) {
                        need_work = true;
                    }
                    if (!need_work) {
                        WF_DEBUG(msg_printf(p, MSG_INFO, "%s don't need", rsc_name_long(i));)
                        continue;
                    }
                    RSC_PROJECT_WORK_FETCH& rpwf = rwf.project_state(p);
                    int reason = rpwf.rsc_project_reason;
                    switch (reason) {
                    case 0:
                    case DONT_FETCH_BACKED_OFF:
                        // request even if backed off - no reason not to.
                        //
                        break;
                    default:
                        WF_DEBUG(msg_printf(p, MSG_INFO, "%s can't fetch: %s", rsc_name_long(i), rsc_project_reason_string(reason));)
                        continue;
                    }
                }
                if (buffer_low) {
                    rwf.set_request(p);
                    WF_DEBUG(msg_printf(p, MSG_INFO, "%s set_request: %f", rsc_name_long(i), rwf.req_secs);)
                } else {
                    // only exclusion starvation; request for starved instances
                    rwf.set_request_excluded(p);
                    WF_DEBUG(msg_printf(p, MSG_INFO, "%s set_request_excluded: %f", rsc_name_long(i), rwf.req_secs);)
                }
                if (rwf.req_secs > 0) {
                    any_request = true;
                }
            }
            if (any_request) {
                found = true;
                break;
            }
        }
    }

    if (found) {
        // caller must clear this if it decides not to do the RPC
        p->sched_rpc_pending = RPC_REASON_NEED_WORK;
    } else {
        if (log_flags.work_fetch_debug) {
            msg_printf(0, MSG_INFO, "[work_fetch] No project chosen for work fetch");
        }
        p = NULL;
    }

    return p;
}
786 
787 // estimate the amount of CPU and GPU time this task has got
788 // in last dt sec, and add to project totals
789 //
accumulate_inst_sec(ACTIVE_TASK * atp,double dt)790 void WORK_FETCH::accumulate_inst_sec(ACTIVE_TASK* atp, double dt) {
791     APP_VERSION* avp = atp->result->avp;
792     PROJECT* p = atp->result->project;
793     double x = dt*avp->avg_ncpus;
794     p->rsc_pwf[0].secs_this_rec_interval += x;
795     rsc_work_fetch[0].secs_this_rec_interval += x;
796     int rt = avp->gpu_usage.rsc_type;
797     if (rt) {
798         x = dt*avp->gpu_usage.usage;
799         p->rsc_pwf[rt].secs_this_rec_interval += x;
800         rsc_work_fetch[rt].secs_this_rec_interval += x;
801     }
802 }
803 
804 // find total and per-project resource shares for each resource
805 //
compute_shares()806 void WORK_FETCH::compute_shares() {
807     unsigned int i;
808     PROJECT* p;
809     for (i=0; i<gstate.projects.size(); i++) {
810         p = gstate.projects[i];
811         if (p->non_cpu_intensive) continue;
812         if (p->pwf.project_reason) continue;
813         for (int j=0; j<coprocs.n_rsc; j++) {
814             if (!p->rsc_pwf[j].rsc_project_reason) {
815                 rsc_work_fetch[j].total_fetchable_share += p->resource_share;
816             }
817         }
818     }
819     for (i=0; i<gstate.projects.size(); i++) {
820         p = gstate.projects[i];
821         if (p->non_cpu_intensive) continue;
822         if (p->pwf.project_reason) continue;
823         for (int j=0; j<coprocs.n_rsc; j++) {
824             if (!p->rsc_pwf[j].rsc_project_reason) {
825                 p->rsc_pwf[j].fetchable_share = rsc_work_fetch[j].total_fetchable_share?p->resource_share/rsc_work_fetch[j].total_fetchable_share:1;
826             }
827         }
828     }
829 }
830 
request_string(char * buf,int len)831 void WORK_FETCH::request_string(char* buf, int len) {
832     char buf2[256];
833     snprintf(buf, len,
834         "[work_fetch] request: CPU (%.2f sec, %.2f inst)",
835         rsc_work_fetch[0].req_secs, rsc_work_fetch[0].req_instances
836     );
837     for (int i=1; i<coprocs.n_rsc; i++) {
838         snprintf(buf2, sizeof(buf2),
839             " %s (%.2f sec, %.2f inst)",
840             rsc_name_long(i), rsc_work_fetch[i].req_secs, rsc_work_fetch[i].req_instances
841         );
842         strlcat(buf, buf2, len);
843     }
844 }
845 
write_request(FILE * f,PROJECT * p)846 void WORK_FETCH::write_request(FILE* f, PROJECT* p) {
847     double work_req = rsc_work_fetch[0].req_secs;
848 
849     // if project is anonymous platform, set the overall work req
850     // to the max of the requests of resource types for which we have versions.
851     // Otherwise projects with old schedulers won't send us work.
852     // THIS CAN BE REMOVED AT SOME POINT
853     //
854     if (p->anonymous_platform) {
855         for (int i=1; i<coprocs.n_rsc; i++) {
856             if (has_coproc_app(p, i)) {
857                 if (rsc_work_fetch[i].req_secs > work_req) {
858                     work_req = rsc_work_fetch[i].req_secs;
859                 }
860             }
861         }
862     }
863     fprintf(f,
864         "    <work_req_seconds>%f</work_req_seconds>\n"
865         "    <cpu_req_secs>%f</cpu_req_secs>\n"
866         "    <cpu_req_instances>%f</cpu_req_instances>\n"
867         "    <estimated_delay>%f</estimated_delay>\n",
868         work_req,
869         rsc_work_fetch[0].req_secs,
870         rsc_work_fetch[0].req_instances,
871         rsc_work_fetch[0].req_secs?rsc_work_fetch[0].busy_time_estimator.get_busy_time():0
872     );
873     if (log_flags.work_fetch_debug) {
874         char buf[256];
875         request_string(buf, sizeof(buf));
876         msg_printf(p, MSG_INFO, "%s", buf);
877     }
878 }
879 
880 // we just got a scheduler reply with the given jobs; update backoffs
881 //
handle_reply(PROJECT * p,SCHEDULER_REPLY *,vector<RESULT * > new_results)882 void WORK_FETCH::handle_reply(
883     PROJECT* p, SCHEDULER_REPLY*, vector<RESULT*> new_results
884 ) {
885     bool got_work[MAX_RSC];
886     bool requested_work_rsc[MAX_RSC];
887     for (int i=0; i<coprocs.n_rsc; i++) {
888         got_work[i] = false;
889         requested_work_rsc[i] = (rsc_work_fetch[i].req_secs > 0);
890     }
891     for (unsigned int i=0; i<new_results.size(); i++) {
892         RESULT* rp = new_results[i];
893         got_work[rp->avp->gpu_usage.rsc_type] = true;
894     }
895 
896     for (int i=0; i<coprocs.n_rsc; i++) {
897         // back off on a resource type if
898         // - we asked for jobs
899         // - we didn't get any
900         // - we're not currently backed off for that type
901         //   (i.e. don't back off because of a piggyback request)
902         // - the RPC was done for a reason that is automatic
903         //   and potentially frequent
904         //
905         if (requested_work_rsc[i] && !got_work[i]) {
906             if (p->rsc_pwf[i].backoff_time < gstate.now) {
907                 switch (p->sched_rpc_pending) {
908                 case RPC_REASON_RESULTS_DUE:
909                 case RPC_REASON_NEED_WORK:
910                 case RPC_REASON_TRICKLE_UP:
911                     p->rsc_pwf[i].resource_backoff(p, rsc_name_long(i));
912                 }
913             }
914         }
915         // if we did get jobs, clear backoff
916         //
917         if (got_work[i]) {
918             p->rsc_pwf[i].clear_backoff();
919         }
920     }
921     p->pwf.request_if_idle_and_uploading = false;
922 }
923 
924 // set up for initial RPC.
925 // Ask for just 1 job per instance,
926 // since we don't have good runtime estimates yet
927 //
set_initial_work_request(PROJECT * p)928 void WORK_FETCH::set_initial_work_request(PROJECT* p) {
929     clear_request();
930     for (int i=0; i<coprocs.n_rsc; i++) {
931         if (p->resource_share > 0 && !p->dont_request_more_work) {
932             rsc_work_fetch[i].req_secs = 1;
933             if (i) {
934                 RSC_WORK_FETCH& rwf = rsc_work_fetch[i];
935                 if (rwf.ninstances ==  p->rsc_pwf[i].ncoprocs_excluded) {
936                     rsc_work_fetch[i].req_secs = 0;
937                 }
938             }
939         }
940         rsc_work_fetch[i].busy_time_estimator.reset();
941     }
942 }
943 
944 // called once, at client startup
945 //
init()946 void WORK_FETCH::init() {
947     rsc_work_fetch[0].init(0, gstate.ncpus, 1);
948     double cpu_flops = gstate.host_info.p_fpops;
949 
950     // use 20% as a rough estimate of GPU efficiency
951 
952     for (int i=1; i<coprocs.n_rsc; i++) {
953         rsc_work_fetch[i].init(
954             i, coprocs.coprocs[i].count,
955             coprocs.coprocs[i].count*0.2*coprocs.coprocs[i].peak_flops/cpu_flops
956         );
957     }
958 
959     // see what resources anon platform projects can use
960     //
961     unsigned int i, j;
962     for (i=0; i<gstate.projects.size(); i++) {
963         PROJECT* p = gstate.projects[i];
964         if (!p->anonymous_platform) continue;
965         for (int k=0; k<coprocs.n_rsc; k++) {
966             p->rsc_pwf[k].anonymous_platform_no_apps = true;
967         }
968         for (j=0; j<gstate.app_versions.size(); j++) {
969             APP_VERSION* avp = gstate.app_versions[j];
970             if (avp->project != p) continue;
971             p->rsc_pwf[avp->gpu_usage.rsc_type].anonymous_platform_no_apps = false;
972         }
973     }
974 }
975 
976 // clear backoff for app's resource
977 //
clear_backoffs(APP_VERSION & av)978 void WORK_FETCH::clear_backoffs(APP_VERSION& av) {
979     av.project->rsc_pwf[av.gpu_usage.rsc_type].clear_backoff();
980 }
981 
982 ////////////////////////
983 
compute_nuploading_results()984 void CLIENT_STATE::compute_nuploading_results() {
985     unsigned int i;
986 
987     for (i=0; i<projects.size(); i++) {
988         projects[i]->nuploading_results = 0;
989         projects[i]->too_many_uploading_results = false;
990     }
991     for (i=0; i<results.size(); i++) {
992         RESULT* rp = results[i];
993         if (rp->state() == RESULT_FILES_UPLOADING) {
994             rp->project->nuploading_results++;
995         }
996     }
997     int n = gstate.ncpus;
998     for (int j=1; j<coprocs.n_rsc; j++) {
999         if (coprocs.coprocs[j].count > n) {
1000             n = coprocs.coprocs[j].count;
1001         }
1002     }
1003     n *= 2;
1004     for (i=0; i<projects.size(); i++) {
1005         if (projects[i]->nuploading_results > n) {
1006             projects[i]->too_many_uploading_results = true;
1007         }
1008     }
1009 }
1010 
// Returns the estimated total elapsed time of this task.
// Compute this as a weighted average of estimates based on
// 1) the workunit's flops count (static estimate)
// 2) the current elapsed time and fraction done (dynamic estimate)
//
double ACTIVE_TASK::est_dur() {
    // finished (or fraction done overshot): actual elapsed time is the answer
    if (fraction_done >= 1) return elapsed_time;
    double wu_est = result->estimated_runtime();
    if (fraction_done <= 0) {
        if (elapsed_time > 0) {
            // if app is running but hasn't reported fraction done,
            // use the fraction-done guesstimate from ACTIVE_TASK::write_gui()
            //
            // NOTE(review): assumes wu_est > 0; if estimated_runtime()
            // can return 0, fd is 0 here and the division below blows up
            // — confirm upstream invariant
            //
            double fd = 1 - exp(-elapsed_time/wu_est);
            return elapsed_time/fd;
        } else {
            // not started: fall back to the static estimate
            return wu_est;
        }
    }
    // if we've already run longer than the static estimate,
    // that estimate is clearly too low; raise it to the elapsed time
    //
    bool exceeded_wu_est = (elapsed_time > wu_est);
    if (exceeded_wu_est) wu_est = elapsed_time;
    // dynamic estimate: linear extrapolation from fraction done
    // (safe: fraction_done > 0 here)
    //
    double frac_est = fraction_done_elapsed_time / fraction_done;

    // if app says fraction done is accurate, just use it
    // also use it if static estimate has already been exceeded
    //
    if (result->app->fraction_done_exact || exceeded_wu_est) return frac_est;

    // weighting of dynamic estimate is the fraction done
    // i.e. when fraction done is 0.5, weighting is 50/50
    //
    double fd_weight = fraction_done;
    double wu_weight = 1 - fd_weight;
    double x = fd_weight*frac_est + wu_weight*wu_est;
#if 0
    //if (log_flags.rr_simulation) {
        msg_printf(result->project, MSG_INFO,
            "[rr_sim] %s frac_est %f = %f/%f",
            result->name, frac_est, fraction_done_elapsed_time, fraction_done
        );
        msg_printf(result->project, MSG_INFO,
            "[rr_sim] %s dur: %.2f = %.3f*%.2f + %.3f*%.2f",
            result->name, x, fd_weight, frac_est, wu_weight, wu_est
        );
    //}
#endif
    return x;
}
1059 
1060 // the fraction of time BOINC is processing
1061 //
overall_cpu_frac()1062 double CLIENT_STATE::overall_cpu_frac() {
1063     double x = time_stats.on_frac * time_stats.active_frac;
1064     if (x < 0.01) x = 0.01;
1065     if (x > 1) x = 1;
1066     return x;
1067 }
overall_gpu_frac()1068 double CLIENT_STATE::overall_gpu_frac() {
1069     double x = time_stats.on_frac * time_stats.gpu_active_frac;
1070     if (x < 0.01) x = 0.01;
1071     if (x > 1) x = 1;
1072     return x;
1073 }
overall_cpu_and_network_frac()1074 double CLIENT_STATE::overall_cpu_and_network_frac() {
1075     double x = time_stats.on_frac * time_stats.cpu_and_network_available_frac;
1076     if (x < 0.01) x = 0.01;
1077     if (x > 1) x = 1;
1078     return x;
1079 }
1080 
1081 // called when benchmarks change
1082 //
scale_duration_correction_factors(double factor)1083 void CLIENT_STATE::scale_duration_correction_factors(double factor) {
1084     if (factor <= 0) return;
1085     for (unsigned int i=0; i<projects.size(); i++) {
1086         PROJECT* p = projects[i];
1087         if (p->dont_use_dcf) continue;
1088         p->duration_correction_factor *= factor;
1089     }
1090     if (log_flags.dcf_debug) {
1091         msg_printf(NULL, MSG_INFO,
1092             "[dcf] scaling all duration correction factors by %f",
1093             factor
1094         );
1095     }
1096 }
1097 
1098 // Choose a new host CPID.
1099 // If using account manager, do scheduler RPCs
1100 // to all acct-mgr-attached projects to propagate the CPID
1101 //
generate_new_host_cpid()1102 void CLIENT_STATE::generate_new_host_cpid() {
1103     host_info.generate_host_cpid();
1104     for (unsigned int i=0; i<projects.size(); i++) {
1105         if (projects[i]->attached_via_acct_mgr) {
1106             projects[i]->sched_rpc_pending = RPC_REASON_ACCT_MGR_REQ;
1107             projects[i]->set_min_rpc_time(now + 15, "Sending new host CPID");
1108         }
1109     }
1110 }
1111 
rsc_project_reason_string(int reason)1112 const char* rsc_project_reason_string(int reason) {
1113     switch (reason) {
1114     case 0: return "";
1115     case DONT_FETCH_GPUS_NOT_USABLE: return "GPUs not usable";
1116     case DONT_FETCH_PREFS: return "blocked by project preferences";
1117     case DONT_FETCH_CONFIG: return "client configuration";
1118     case DONT_FETCH_NO_APPS: return "no applications";
1119     case DONT_FETCH_AMS: return "account manager prefs";
1120     case DONT_FETCH_ZERO_SHARE: return "zero resource share";
1121     case DONT_FETCH_BUFFER_FULL: return "job cache full";
1122     case DONT_FETCH_NOT_HIGHEST_PRIO: return "not highest priority project";
1123     case DONT_FETCH_BACKED_OFF: return "project is backed off";
1124     case DONT_FETCH_DEFER_SCHED: return "a job is deferred";
1125     }
1126     return "unknown project reason";
1127 }
1128 
project_reason_string(PROJECT * p,char * buf,int len)1129 const char* project_reason_string(PROJECT* p, char* buf, int len) {
1130     switch (p->pwf.project_reason) {
1131     case 0: return "";
1132     case CANT_FETCH_WORK_NON_CPU_INTENSIVE:
1133         return "non CPU intensive";
1134     case CANT_FETCH_WORK_SUSPENDED_VIA_GUI:
1135         return "suspended via Manager";
1136     case CANT_FETCH_WORK_MASTER_URL_FETCH_PENDING:
1137         return "master URL fetch pending";
1138     case CANT_FETCH_WORK_MIN_RPC_TIME:
1139         return "scheduler RPC backoff";
1140     case CANT_FETCH_WORK_DONT_REQUEST_MORE_WORK:
1141         return "\"no new tasks\" requested via Manager";
1142     case CANT_FETCH_WORK_DOWNLOAD_STALLED:
1143         return "some download is stalled";
1144     case CANT_FETCH_WORK_RESULT_SUSPENDED:
1145         return "some task is suspended via Manager";
1146     case CANT_FETCH_WORK_TOO_MANY_UPLOADS:
1147         return "too many uploads in progress";
1148     case CANT_FETCH_WORK_NOT_HIGHEST_PRIORITY:
1149         return "project is not highest priority";
1150     case CANT_FETCH_WORK_TOO_MANY_RUNNABLE:
1151         return "too many runnable tasks";
1152     case CANT_FETCH_WORK_DONT_NEED:
1153         if (coprocs.n_rsc == 1) {
1154             snprintf(buf, len,
1155                 "don't need (%s)",
1156                 rsc_project_reason_string(rsc_work_fetch[0].dont_fetch_reason)
1157             );
1158         } else {
1159             string x;
1160             x = "don't need (";
1161             for (int i=0; i<coprocs.n_rsc; i++) {
1162                 char buf2[256];
1163                 snprintf(buf2, sizeof(buf2),
1164                     "%s: %s",
1165                     rsc_name_long(i),
1166                     rsc_project_reason_string(rsc_work_fetch[i].dont_fetch_reason)
1167                 );
1168                 x += buf2;
1169                 if (i < coprocs.n_rsc-1) {
1170                     x += "; ";
1171                 }
1172             }
1173             x += ")";
1174             strlcpy(buf, x.c_str(), len);
1175         }
1176         return buf;
1177     }
1178     return "unknown reason";
1179 }
1180