1 // This file is part of BOINC.
2 // http://boinc.berkeley.edu
3 // Copyright (C) 2016 University of California
4 //
5 // BOINC is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Lesser General Public License
7 // as published by the Free Software Foundation,
8 // either version 3 of the License, or (at your option) any later version.
9 //
10 // BOINC is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
13 // See the GNU Lesser General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with BOINC. If not, see <http://www.gnu.org/licenses/>.
17
18 // scheduler code related to sending jobs.
19 // NOTE: there should be nothing here specific to particular
20 // scheduling policies (array scan, score-based, locality)
21
22 #include "config.h"
23 #include <vector>
24 #include <list>
25 #include <string>
26 #include <ctime>
27 #include <cstdio>
28 #include <cstring>
29 #include <stdlib.h>
30 #include <sys/time.h>
31 #include <unistd.h>
32
33 #include "error_numbers.h"
34 #include "parse.h"
35 #include "util.h"
36 #include "str_util.h"
37 #include "synch.h"
38
39 #include "credit.h"
40 #include "hr.h"
41 #include "sched_array.h"
42 #include "sched_assign.h"
43 #include "sched_config.h"
44 #include "sched_customize.h"
45 #include "sched_hr.h"
46 #include "sched_locality.h"
47 #include "sched_main.h"
48 #include "sched_msgs.h"
49 #include "sched_nci.h"
50 #include "sched_shmem.h"
51 #include "sched_score.h"
52 #include "sched_timezone.h"
53 #include "sched_types.h"
54 #include "sched_util.h"
55 #include "sched_version.h"
56
57 #include "sched_send.h"
58
59 #ifdef _USING_FCGI_
60 #include "boinc_fcgi.h"
61 #endif
62
63 // if host sends us an impossible RAM size, use this instead
64 //
65 const double DEFAULT_RAM_SIZE = 64000000;
66
// index used when choosing a message about the user's selected apps;
// NOTE(review): incremented/read outside this chunk — confirm at usage sites
int selected_app_message_index=0;
68
file_present_on_host(const char * name)69 static inline bool file_present_on_host(const char* name) {
70 for (unsigned i=0; i<g_request->file_infos.size(); i++) {
71 FILE_INFO& fi = g_request->file_infos[i];
72 if (!strstr(name, fi.name)) {
73 return true;
74 }
75 }
76 return false;
77 }
78
79 // return the number of sticky files present on host, used by job
80 //
nfiles_on_host(WORKUNIT & wu)81 int nfiles_on_host(WORKUNIT& wu) {
82 MIOFILE mf;
83 mf.init_buf_read(wu.xml_doc);
84 XML_PARSER xp(&mf);
85 int n=0;
86 while (!xp.get_tag()) {
87 if (xp.match_tag("file_info")) {
88 FILE_INFO fi;
89 int retval = fi.parse(xp);
90 if (retval) continue;
91 if (!fi.sticky) continue;
92 if (file_present_on_host(fi.name)) {
93 n++;
94 }
95 }
96 }
97 return n;
98 }
99
100 // we're going to send the client this job,
101 // and the app uses locality scheduling lite.
102 // Add the job's sticky files to the list of files present on host.
103 //
add_job_files_to_host(WORKUNIT & wu)104 void add_job_files_to_host(WORKUNIT& wu) {
105 MIOFILE mf;
106 mf.init_buf_read(wu.xml_doc);
107 XML_PARSER xp(&mf);
108 while (!xp.get_tag()) {
109 if (xp.match_tag("file_info")) {
110 FILE_INFO fi;
111 int retval = fi.parse(xp);
112 if (retval) continue;
113 if (!fi.sticky) continue;
114 if (!file_present_on_host(fi.name)) {
115 if (config.debug_send) {
116 log_messages.printf(MSG_NORMAL,
117 "[send] Adding file %s to host file list\n", fi.name
118 );
119 }
120 g_request->file_infos.push_back(fi);
121 }
122 }
123 }
124 }
125
// clamp bounds for the client's requested amount of work, in seconds;
// presumably applied where the request is parsed (outside this chunk)
const double MIN_REQ_SECS = 0;
const double MAX_REQ_SECS = (28*SECONDS_IN_DAY);
128
129 // compute effective_ncpus;
130 // get limits on:
131 // # jobs per day
132 // # jobs per RPC
133 // # jobs in progress
134 //
void WORK_REQ::get_job_limits() {
    // usable instance count per processor type; index PROC_TYPE_CPU = CPUs
    int ninstances[NPROC_TYPES];
    int i;

    memset(ninstances, 0, sizeof(ninstances));
    int n;
    n = g_reply->host.p_ncpus;
    // apply the user's "use at most X% of the CPUs" preference
    if (g_request->global_prefs.max_ncpus_pct && g_request->global_prefs.max_ncpus_pct < 100) {
        n = (int)((n*g_request->global_prefs.max_ncpus_pct)/100.);
    }
    // clamp to server config, a sane minimum of 1, and the protocol limit
    if (n > config.max_ncpus) n = config.max_ncpus;
    if (n < 1) n = 1;
    if (n > MAX_CPUS) n = MAX_CPUS;
    // project-specific preference can further cap CPU count
    if (project_prefs.max_cpus) {
        if (n > project_prefs.max_cpus) {
            n = project_prefs.max_cpus;
        }
    }
    ninstances[PROC_TYPE_CPU] = n;
    effective_ncpus = n;

    effective_ngpus = 0;
    // coproc slot 0 is the CPU; GPU/coproc entries start at 1
    for (i=1; i<g_request->coprocs.n_rsc; i++) {
        COPROC& cp = g_request->coprocs.coprocs[i];
        int proc_type = coproc_type_name_to_num(cp.type);
        if (proc_type < 0) continue;    // unknown coproc type: ignore
        n = cp.count;
        if (n > MAX_GPUS) n = MAX_GPUS;
        ninstances[proc_type] = n;
        effective_ngpus += n;
    }

    // per-RPC job limit scales with instance count (GPUs weighted by
    // gpu_multiplier); non-CPU-intensive projects get one job at a time
    int mult = effective_ncpus + config.gpu_multiplier * effective_ngpus;
    if (config.non_cpu_intensive) {
        mult = 1;
        ninstances[0] = 1;
        if (effective_ngpus) effective_ngpus = 1;
    }

    if (config.max_wus_to_send) {
        g_wreq->max_jobs_per_rpc = mult * config.max_wus_to_send;
    } else {
        g_wreq->max_jobs_per_rpc = 999999;  // effectively unlimited
    }

    if (config.debug_quota) {
        log_messages.printf(MSG_NORMAL,
            "[quota] effective ncpus %d ngpus %d\n",
            effective_ncpus, effective_ngpus
        );
    }
    // seed the in-progress job limit accounting with the instance counts
    config.max_jobs_in_progress.reset(ninstances);
}
188
find_user_friendly_name(int appid)189 const char* find_user_friendly_name(int appid) {
190 APP* app = ssp->lookup_app(appid);
191 if (app) return app->user_friendly_name;
192 return "deprecated application";
193 }
194
update_quota(DB_HOST_APP_VERSION & hav)195 static void update_quota(DB_HOST_APP_VERSION& hav) {
196 if (config.daily_result_quota) {
197 if (hav.max_jobs_per_day == 0) {
198 hav.max_jobs_per_day = config.daily_result_quota;
199 if (config.debug_quota) {
200 log_messages.printf(MSG_NORMAL,
201 "[quota] [HAV#%lu] Initializing max_results_day to %d\n",
202 hav.app_version_id,
203 config.daily_result_quota
204 );
205 }
206 }
207 }
208
209 if (g_request->last_rpc_dayofyear != g_request->current_rpc_dayofyear) {
210 if (config.debug_quota) {
211 log_messages.printf(MSG_NORMAL,
212 "[quota] [HOST#%lu] [HAV#%lu] Resetting n_jobs_today\n",
213 g_reply->host.id, hav.app_version_id
214 );
215 }
216 hav.n_jobs_today = 0;
217 }
218 }
219
220 // see how much RAM we can use on this machine
221 //
get_mem_sizes()222 static inline void get_mem_sizes() {
223 g_wreq->ram = g_reply->host.m_nbytes;
224 if (g_wreq->ram <= 0) g_wreq->ram = DEFAULT_RAM_SIZE;
225 g_wreq->usable_ram = g_wreq->ram;
226 double busy_frac = g_request->global_prefs.ram_max_used_busy_frac;
227 double idle_frac = g_request->global_prefs.ram_max_used_idle_frac;
228 double frac = 1;
229 if (busy_frac>0 && idle_frac>0) {
230 frac = std::max(busy_frac, idle_frac);
231 if (frac > 1) frac = 1;
232 g_wreq->usable_ram *= frac;
233 }
234 }
235
236 // Decide whether or not this app version is 'reliable'
237 // An app version is reliable if the following conditions are true
238 // (for those that are set in the config file)
239 // 1) The host average turnaround is less than a threshold
240 // 2) consecutive_valid is above a threshold
241 // 3) The host results per day is equal to the max value
242 //
void get_reliability_version(HOST_APP_VERSION& hav, double multiplier) {
    // criterion 1: average turnaround must be under the configured
    // threshold (scaled by platform multiplier);
    // only checked once we have enough samples
    if (hav.turnaround.n > MIN_HOST_SAMPLES && config.reliable_max_avg_turnaround) {

        if (hav.turnaround.get_avg() > config.reliable_max_avg_turnaround*multiplier) {
            if (config.debug_send) {
                log_messages.printf(MSG_NORMAL,
                    "[send] [AV#%lu] not reliable; avg turnaround: %.3f > %.3f hrs\n",
                    hav.app_version_id,
                    hav.turnaround.get_avg()/3600,
                    config.reliable_max_avg_turnaround*multiplier/3600
                );
            }
            hav.reliable = false;
            return;
        }
    }
    // criterion 2: enough consecutive valid results
    if (hav.consecutive_valid < CONS_VALID_RELIABLE) {
        if (config.debug_send) {
            log_messages.printf(MSG_NORMAL,
                "[send] [AV#%lu] not reliable; cons valid %d < %d\n",
                hav.app_version_id,
                hav.consecutive_valid, CONS_VALID_RELIABLE
            );
        }
        hav.reliable = false;
        return;
    }
    // criterion 3: daily quota must be at the configured maximum,
    // i.e. the host hasn't had its quota reduced for bad results
    if (config.daily_result_quota) {
        if (hav.max_jobs_per_day < config.daily_result_quota) {
            if (config.debug_send) {
                log_messages.printf(MSG_NORMAL,
                    "[send] [AV#%lu] not reliable; max_jobs_per_day %d<%d\n",
                    hav.app_version_id,
                    hav.max_jobs_per_day,
                    config.daily_result_quota
                );
            }
            hav.reliable = false;
            return;
        }
    }
    // all checks passed
    hav.reliable = true;
    if (config.debug_send) {
        log_messages.printf(MSG_NORMAL,
            "[send] [HOST#%lu] app version %lu is reliable\n",
            g_reply->host.id, hav.app_version_id
        );
    }
    g_wreq->has_reliable_version = true;
}
293
294 // decide whether do unreplicated jobs with this app version
295 //
set_trust(DB_HOST_APP_VERSION & hav)296 static void set_trust(DB_HOST_APP_VERSION& hav) {
297 hav.trusted = false;
298 if (hav.consecutive_valid < CONS_VALID_UNREPLICATED) {
299 if (config.debug_send) {
300 log_messages.printf(MSG_NORMAL,
301 "[send] set_trust: cons valid %d < %d, don't use single replication\n",
302 hav.consecutive_valid, CONS_VALID_UNREPLICATED
303 );
304 }
305 return;
306 }
307 double x = 1./hav.consecutive_valid;
308 if (drand() > x) hav.trusted = true;
309 if (config.debug_send) {
310 log_messages.printf(MSG_NORMAL,
311 "[send] set_trust: random choice for cons valid %d: %s\n",
312 hav.consecutive_valid, hav.trusted?"yes":"no"
313 );
314 }
315 }
316
get_reliability_and_trust()317 static void get_reliability_and_trust() {
318 // Platforms other than Windows, Linux and Intel Macs need a
319 // larger set of computers to be marked reliable
320 //
321 double multiplier = 1.0;
322 if (strstr(g_reply->host.os_name,"Windows")
323 || strstr(g_reply->host.os_name,"Linux")
324 || (strstr(g_reply->host.os_name,"Darwin")
325 && !(strstr(g_reply->host.p_vendor,"Power Macintosh"))
326 )) {
327 multiplier = 1.0;
328 } else {
329 multiplier = 1.8;
330 }
331
332 for (unsigned int i=0; i<g_wreq->host_app_versions.size(); i++) {
333 DB_HOST_APP_VERSION& hav = g_wreq->host_app_versions[i];
334 get_reliability_version(hav, multiplier);
335 set_trust(hav);
336 }
337 }
338
339 // Compute the max additional disk usage we can impose on the host.
340 // Depending on the client version, it can either send us
341 // - d_total and d_free (pre 4 oct 2005)
342 // - the above plus d_boinc_used_total and d_boinc_used_project
343 //
double max_allowable_disk() {
    // local copies: prefs are adjusted with config defaults below,
    // and we don't want to modify the request
    HOST host = g_request->host;
    GLOBAL_PREFS prefs = g_request->global_prefs;
    double x1, x2, x3, x;

    // defaults are from config.xml
    // if not there these are used:
    // -default_max_used_gb= 100
    // -default_max_used_pct = 50
    // -default_min_free_gb = .001
    //
    if (prefs.disk_max_used_gb == 0) {
        prefs.disk_max_used_gb = config.default_disk_max_used_gb;
    }
    if (prefs.disk_max_used_pct == 0) {
        prefs.disk_max_used_pct = config.default_disk_max_used_pct;
    }
    // min_free is raised to at least the configured default
    if (prefs.disk_min_free_gb < config.default_disk_min_free_gb) {
        prefs.disk_min_free_gb = config.default_disk_min_free_gb;
    }

    // no defaults for total/free disk space (host.d_total, d_free)
    // if they're zero, client will get no work.
    //

    if (host.d_boinc_used_total) {
        // The post 4 oct 2005 case.
        // Compute the max allowable additional disk usage based on prefs
        //
        x1 = prefs.disk_max_used_gb*GIGA - host.d_boinc_used_total;
        x2 = host.d_total * prefs.disk_max_used_pct / 100.0
            - host.d_boinc_used_total;
        x3 = host.d_free - prefs.disk_min_free_gb*GIGA; // may be negative
        x = std::min(x1, std::min(x2, x3));

        // see which bound is the most stringent
        // (record it in the reply so the client can tell the user)
        //
        if (x==x1) {
            g_reply->disk_limits.max_used = x;
        } else if (x==x2) {
            g_reply->disk_limits.max_frac = x;
        } else {
            g_reply->disk_limits.min_free = x;
        }
    } else {
        // here we don't know how much space BOINC is using.
        // so we're kinda screwed.
        // All we can do is assume that BOINC is using zero space.
        // We can't honor the max_used for max_used_pct preferences.
        // We can only honor the min_free pref.
        //
        x = host.d_free - prefs.disk_min_free_gb*GIGA; // may be negative
        g_reply->disk_limits.min_free = x;
        x1 = x2 = x3 = 0;   // keep the debug printout below well-defined
    }

    if (x < 0) {
        // no space available: log details and record the shortfall
        if (config.debug_send) {
            log_messages.printf(MSG_NORMAL,
                "[send] No disk space available: disk_max_used_gb %.2fGB disk_max_used_pct %.2f disk_min_free_gb %.2fGB\n",
                prefs.disk_max_used_gb,
                prefs.disk_max_used_pct,
                prefs.disk_min_free_gb
            );
            log_messages.printf(MSG_NORMAL,
                "[send] No disk space available: host.d_total %.2fGB host.d_free %.2fGB host.d_boinc_used_total %.2fGB\n",
                host.d_total/GIGA,
                host.d_free/GIGA,
                host.d_boinc_used_total/GIGA
            );
            log_messages.printf(MSG_NORMAL,
                "[send] No disk space available: x1 %.2fGB x2 %.2fGB x3 %.2fGB x %.2fGB\n",
                x1/GIGA, x2/GIGA, x3/GIGA, x/GIGA
            );
        }
        g_wreq->disk.set_insufficient(-x);
        x = 0;
    }
    return x;
}
424
estimate_duration_unscaled(WORKUNIT & wu,BEST_APP_VERSION & bav)425 static double estimate_duration_unscaled(WORKUNIT& wu, BEST_APP_VERSION& bav) {
426 double rsc_fpops_est = wu.rsc_fpops_est;
427 if (rsc_fpops_est <= 0) rsc_fpops_est = 1e12;
428 return rsc_fpops_est/bav.host_usage.projected_flops;
429 }
430
431 // Compute cpu_available_frac and gpu_available_frac.
432 // These are based on client-supplied data, so do sanity checks
433 //
434 #define FRAC_MIN 0.1
clamp_frac(double & frac,const char * name)435 static inline void clamp_frac(double& frac, const char* name) {
436 if (frac > 1) {
437 if (config.debug_send) {
438 log_messages.printf(MSG_NORMAL,
439 "[send] %s=%f; setting to 1\n", name, frac
440 );
441 }
442 frac = 1;
443 } else if (frac < FRAC_MIN) {
444 if (config.debug_send) {
445 log_messages.printf(MSG_NORMAL,
446 "[send] %s=%f; setting to %f\n", name, frac, FRAC_MIN
447 );
448 }
449 frac = .01;
450 }
451 }
452
get_available_fracs()453 static inline void get_available_fracs() {
454 if (g_request->core_client_version<=41900) {
455 g_wreq->cpu_available_frac = g_reply->host.on_frac;
456 g_wreq->gpu_available_frac = g_reply->host.on_frac; // irrelevant
457 } else {
458 g_wreq->cpu_available_frac = g_reply->host.active_frac * g_reply->host.on_frac;
459 g_wreq->gpu_available_frac = g_reply->host.gpu_active_frac * g_reply->host.on_frac;
460 }
461 clamp_frac(g_wreq->cpu_available_frac, "CPU available fraction");
462 clamp_frac(g_wreq->gpu_available_frac, "GPU available fraction");
463 }
464
available_frac(BEST_APP_VERSION & bav)465 double available_frac(BEST_APP_VERSION& bav) {
466 if (bav.host_usage.uses_gpu()) {
467 return g_wreq->gpu_available_frac;
468 } else {
469 return g_wreq->cpu_available_frac;
470 }
471 }
472
473 // estimate the amount of real time to complete this WU,
474 // taking into account active_frac etc.
475 // Note: don't factor in resource_share_fraction.
476 // The core client doesn't necessarily round-robin across all projects.
477 //
estimate_duration(WORKUNIT & wu,BEST_APP_VERSION & bav)478 double estimate_duration(WORKUNIT& wu, BEST_APP_VERSION& bav) {
479 double edu = estimate_duration_unscaled(wu, bav);
480 double ed = edu/available_frac(bav);
481 if (config.debug_send_job) {
482 log_messages.printf(MSG_NORMAL,
483 "[send_job] est. duration for WU %lu: unscaled %.2f scaled %.2f\n",
484 wu.id, edu, ed
485 );
486 }
487 return ed;
488 }
489
update_n_jobs_today()490 void update_n_jobs_today() {
491 for (unsigned int i=0; i<g_wreq->host_app_versions.size(); i++) {
492 DB_HOST_APP_VERSION& hav = g_wreq->host_app_versions[i];
493 update_quota(hav);
494 }
495 }
496
update_estimated_delay(BEST_APP_VERSION & bav,double dt)497 static inline void update_estimated_delay(BEST_APP_VERSION& bav, double dt) {
498 int pt = bav.host_usage.proc_type;
499 if (pt == PROC_TYPE_CPU) {
500 g_request->cpu_estimated_delay += dt*bav.host_usage.avg_ncpus/g_request->host.p_ncpus;
501 } else {
502 COPROC* cp = g_request->coprocs.proc_type_to_coproc(pt);
503 cp->estimated_delay += dt*bav.host_usage.gpu_usage/cp->count;
504 }
505 }
506
507 // insert "text" right after "after" in the given buffer
508 //
insert_after(char * buffer,const char * after,const char * text)509 static int insert_after(char* buffer, const char* after, const char* text) {
510 char* p;
511 char temp[BLOB_SIZE];
512
513 if (strlen(buffer) + strlen(text) >= BLOB_SIZE-1) {
514 log_messages.printf(MSG_CRITICAL,
515 "insert_after: overflow: %d %d\n",
516 (int)strlen(buffer),
517 (int)strlen(text)
518 );
519 return ERR_BUFFER_OVERFLOW;
520 }
521 p = strstr(buffer, after);
522 if (!p) {
523 log_messages.printf(MSG_CRITICAL,
524 "insert_after: %s not found in %s\n", after, buffer
525 );
526 return ERR_XML_PARSE;
527 }
528 p += strlen(after);
529 // coverity[fixed_size_dest]
530 strcpy(temp, p);
531 strcpy(p, text);
532 strcat(p, temp);
533 return 0;
534 }
535
536 // add elements to WU's xml_doc,
537 // in preparation for sending it to a client
538 //
static int insert_wu_tags(WORKUNIT& wu, APP& app) {
    char buf[BLOB_SIZE];

    // build the resource-estimate and name tags;
    // insert_after() below checks for overflow of the xml_doc buffer
    sprintf(buf,
        "    <rsc_fpops_est>%f</rsc_fpops_est>\n"
        "    <rsc_fpops_bound>%f</rsc_fpops_bound>\n"
        "    <rsc_memory_bound>%f</rsc_memory_bound>\n"
        "    <rsc_disk_bound>%f</rsc_disk_bound>\n"
        "    <name>%s</name>\n"
        "    <app_name>%s</app_name>\n",
        wu.rsc_fpops_est,
        wu.rsc_fpops_bound,
        wu.rsc_memory_bound,
        wu.rsc_disk_bound,
        wu.name,
        app.name
    );
    // place them immediately after the <workunit> open tag
    return insert_after(wu.xml_doc, "<workunit>\n", buf);
}
558
559 // Add the given workunit, app, and app version to a reply.
560 //
// Add the given workunit, app, and app version to a reply.
// (second parameter is unused; kept for signature compatibility)
//
static int add_wu_to_reply(
    WORKUNIT& wu, SCHEDULER_REPLY&, APP* app, BEST_APP_VERSION* bavp
) {
    int retval;
    WORKUNIT wu2, wu3;

    APP_VERSION* avp = bavp->avp;   // NULL for anonymous platform

    // add the app, app_version, and workunit to the reply,
    // but only if they aren't already there
    //
    if (avp) {
        // work on a copy so timezone URL substitution doesn't
        // modify the shared-memory app version
        APP_VERSION av2=*avp, *avp2=&av2;

        if (strlen(config.replace_download_url_by_timezone)) {
            process_av_timezone(avp, av2);
        }

        g_reply->insert_app_unique(*app);
        av2.bavp = bavp;
        g_reply->insert_app_version_unique(*avp2);
        if (config.debug_send) {
            log_messages.printf(MSG_NORMAL,
                "[send] Sending app_version %s %lu %d %s; projected %.2f GFLOPS\n",
                app->name,
                avp2->platformid, avp2->version_num, avp2->plan_class,
                bavp->host_usage.projected_flops/1e9
            );
        }
    }

    // modify the WU's xml_doc; add <name>, <rsc_*> etc.
    //
    wu2 = wu; // make copy since we're going to modify its XML field

    // check if plan class specified memory usage
    //
    if (bavp->host_usage.mem_usage) {
        wu2.rsc_memory_bound = bavp->host_usage.mem_usage;
    }

    // adjust FPOPS figures for anonymous platform
    //
    if (bavp->cavp) {
        wu2.rsc_fpops_est *= bavp->cavp->rsc_fpops_scale;
        wu2.rsc_fpops_bound *= bavp->cavp->rsc_fpops_scale;
    }
    retval = insert_wu_tags(wu2, *app);
    if (retval) {
        log_messages.printf(MSG_CRITICAL,
            "insert_wu_tags failed: %s\n", boincerror(retval)
        );
        return retval;
    }
    // second copy for timezone-based download URL substitution
    wu3 = wu2;
    if (strlen(config.replace_download_url_by_timezone)) {
        process_wu_timezone(wu2, wu3);
    }

    g_reply->insert_workunit_unique(wu3);

    // switch to tighter policy for estimating delay
    //
    return 0;
}
626
627 // add <name> tags to result's xml_doc_in
628 //
insert_name_tags(RESULT & result,WORKUNIT const & wu)629 static int insert_name_tags(RESULT& result, WORKUNIT const& wu) {
630 char buf[256];
631 int retval;
632
633 sprintf(buf, "<name>%s</name>\n", result.name);
634 retval = insert_after(result.xml_doc_in, "<result>\n", buf);
635 if (retval) return retval;
636 sprintf(buf, "<wu_name>%s</wu_name>\n", wu.name);
637 retval = insert_after(result.xml_doc_in, "<result>\n", buf);
638 if (retval) return retval;
639 return 0;
640 }
641
insert_deadline_tag(RESULT & result)642 static int insert_deadline_tag(RESULT& result) {
643 char buf[256];
644 sprintf(buf, "<report_deadline>%d</report_deadline>\n", result.report_deadline);
645 int retval = insert_after(result.xml_doc_in, "<result>\n", buf);
646 if (retval) return retval;
647 return 0;
648 }
649
650 // update workunit fields when send an instance of it:
651 // - transition time
652 // - app_version_id, if app uses homogeneous app version
653 // - hr_class, if we're using HR
654 //
655 // In the latter two cases, the update is conditional on the field
656 // fields either being zero or the desired value.
657 // Some other scheduler instance might have updated it since we read the WU,
658 // and the transitioner might have set it to zero.
659 //
int update_wu_on_send(WORKUNIT wu, time_t x, APP& app, BEST_APP_VERSION& bav) {
    DB_WORKUNIT dbwu;
    char buf[256], buf2[256], where_clause[256];
    int retval;

    dbwu.id = wu.id;

    // SQL note: can't use min() here
    // (emulate it with IF() so transition_time only moves earlier)
    //
    sprintf(buf,
        "transition_time=if(transition_time<%d, transition_time, %d)",
        (int)x, (int)x
    );
    strcpy(where_clause, "");
    // homogeneous app version: pin the WU to this app version,
    // but only if it's unset or already the same (another scheduler
    // instance may have set it meanwhile)
    if (app.homogeneous_app_version) {
        sprintf(buf2, ", app_version_id=%lu", bav.avp->id);
        strcat(buf, buf2);
        sprintf(where_clause,
            "(app_version_id=0 or app_version_id=%lu)", bav.avp->id
        );
    }
    // homogeneous redundancy: same conditional-update pattern for hr_class
    if (app_hr_type(app)) {
        int host_hr_class = hr_class(g_request->host, app_hr_type(app));
        sprintf(buf2, ", hr_class=%d", host_hr_class);
        strcat(buf, buf2);
        if (strlen(where_clause)) {
            strcat(where_clause, " and ");
        }
        sprintf(buf2, "(hr_class=0 or hr_class=%d)", host_hr_class);
        strcat(where_clause, buf2);
    }
    retval = dbwu.update_field(buf, strlen(where_clause)?where_clause:NULL);
    if (retval) return retval;
    // zero rows affected means another scheduler got there first
    // with a different app version / hr_class
    if (boinc_db.affected_rows() != 1) {
        return ERR_DB_NOT_FOUND;
    }
    return 0;
}
698
699 // return true iff a result for same WU is already being sent
700 //
wu_already_in_reply(WORKUNIT & wu)701 bool wu_already_in_reply(WORKUNIT& wu) {
702 unsigned int i;
703 for (i=0; i<g_reply->results.size(); i++) {
704 if (wu.id == g_reply->results[i].workunitid) {
705 return true;
706 }
707 }
708 return false;
709 }
710
// acquire the scheduler's shared-memory semaphore
void lock_sema() {
    lock_semaphore(sema_key);
}
714
// release the scheduler's shared-memory semaphore
void unlock_sema() {
    unlock_semaphore(sema_key);
}
718
have_apps(int pt)719 static inline bool have_apps(int pt) {
720 if (g_wreq->anonymous_platform) {
721 return g_wreq->client_has_apps_for_proc_type[pt];
722 } else {
723 return ssp->have_apps_for_proc_type[pt];
724 }
725 }
726
727 // return true if additional work is needed,
728 // and there's disk space left,
729 // and we haven't exceeded result per RPC limit,
730 // and we haven't exceeded results per day limit
731 //
bool work_needed(bool locality_sched) {
    if (locality_sched) {
        // if we've failed to send a result because of a transient condition,
        // return false to preserve invariant
        //
        if (g_wreq->disk.insufficient) {
            if (config.debug_send) {
                log_messages.printf(MSG_NORMAL,
                    "[send] stopping work search - insufficient disk space\n"
                );
            }
            return false;
        }
        if (g_wreq->speed.insufficient) {
            if (config.debug_send) {
                log_messages.printf(MSG_NORMAL,
                    "[send] stopping work search - host too slow\n"
                );
            }
            return false;
        }
        if (g_wreq->mem.insufficient) {
            if (config.debug_send) {
                log_messages.printf(MSG_NORMAL,
                    "[send] stopping work search - insufficient memory\n"
                );
            }
            return false;
        }
        if (g_wreq->no_allowed_apps_available) {
            if (config.debug_send) {
                log_messages.printf(MSG_NORMAL,
                    "[send] stopping work search - no locality app selected\n"
                );
            }
            return false;
        }
    }

    // see if we've reached limits on in-progress jobs
    // (checked per processor type; a type at its limit has its
    // request cleared so later scans skip it)
    //
    bool some_type_allowed = false;

    for (int i=0; i<NPROC_TYPES; i++) {
        if (!have_apps(i)) continue;

        // enforce project prefs limit on # of jobs in progress
        //
        bool proj_pref_exceeded = false;
        int mj = g_wreq->project_prefs.max_jobs_in_progress;
        if (mj) {
            if (config.max_jobs_in_progress.project_limits.total.njobs >= mj) {
                proj_pref_exceeded = true;
            }
        }

        if (proj_pref_exceeded || config.max_jobs_in_progress.exceeded(NULL, i)) {
            if (config.debug_quota) {
                log_messages.printf(MSG_NORMAL,
                    "[quota] reached limit on %s jobs in progress\n",
                    proc_type_name(i)
                );
                config.max_jobs_in_progress.print_log();
            }
            g_wreq->clear_req(i);
            g_wreq->max_jobs_on_host_proc_type_exceeded[i] = true;
        } else {
            some_type_allowed = true;
        }
    }

    if (!some_type_allowed) {
        if (config.debug_send) {
            log_messages.printf(MSG_NORMAL,
                "[send] in-progress job limit exceeded\n"
            );
        }
        g_wreq->max_jobs_on_host_exceeded = true;
        return false;
    }

    // see if we've reached max jobs per RPC
    //
    if (g_wreq->njobs_sent >= g_wreq->max_jobs_per_rpc) {
        if (config.debug_quota) {
            log_messages.printf(MSG_NORMAL,
                "[quota] stopping work search - njobs %d >= max_jobs_per_rpc %d\n",
                g_wreq->njobs_sent, g_wreq->max_jobs_per_rpc
            );
        }
        return false;
    }

#if 0
    if (config.debug_send) {
        char buf[256], buf2[256];
        strcpy(buf, "");
        for (int i=0; i<NPROC_TYPES; i++) {
            sprintf(buf2, " %s (%.2f, %.2f)",
                proc_type_name(i),
                g_wreq->req_secs[i],
                g_wreq->req_instances[i]
            );
            strcat(buf, buf2);
        }
        log_messages.printf(MSG_NORMAL,
            "[send] work_needed: spec req %d sec to fill %.2f; %s\n",
            g_wreq->rsc_spec_request,
            g_wreq->seconds_to_fill,
            buf
        );
    }
#endif
    // finally: does the client still want work?
    // either a per-resource-type request, or an overall seconds request
    if (g_wreq->rsc_spec_request) {
        for (int i=0; i<NPROC_TYPES; i++) {
            if (g_wreq->need_proc_type(i) && have_apps(i)) {
                return true;
            }
        }
    } else {
        if (g_wreq->seconds_to_fill > 0) {
            return true;
        }
    }
    if (config.debug_send) {
        log_messages.printf(MSG_NORMAL, "[send] don't need more work\n");
    }
    return false;
}
861
862 // return the app version ID, or -2/-3/-4 if anonymous platform
863 //
get_app_version_id(BEST_APP_VERSION * bavp)864 inline static DB_ID_TYPE get_app_version_id(BEST_APP_VERSION* bavp) {
865 if (bavp->avp) {
866 return bavp->avp->id;
867 } else {
868 return bavp->cavp->host_usage.resource_type();
869 }
870 }
871
// Add a result (and its WU, app, app version) to the scheduler reply,
// updating the DB, quotas, and the remaining work request as we go.
// Returns nonzero (without adding) if another scheduler instance
// claimed the WU or result first.
//
int add_result_to_reply(
    SCHED_DB_RESULT& result,
    WORKUNIT& wu,
    BEST_APP_VERSION* bavp,
    bool locality_scheduling
) {
    int retval;
    bool resent_result = false;
    APP* app = ssp->lookup_app(wu.appid);

    result.hostid = g_reply->host.id;
    result.userid = g_reply->user.id;
    result.sent_time = time(0);
    result.report_deadline = result.sent_time + wu.delay_bound;
    result.flops_estimate = bavp->host_usage.peak_flops;
    result.app_version_id = get_app_version_id(bavp);

    // update WU DB record.
    // This can fail in normal operation
    // (other scheduler already updated hr_class or app_version_id)
    // so do it before updating the result.
    //
    retval = update_wu_on_send(
        wu, result.report_deadline + config.report_grace_period, *app, *bavp
    );
    if (retval == ERR_DB_NOT_FOUND) {
        log_messages.printf(MSG_NORMAL,
            "add_result_to_reply: WU already sent to other HR class or app version\n"
        );
        return retval;
    } else if (retval) {
        log_messages.printf(MSG_CRITICAL,
            "add_result_to_reply: WU update failed: %d\n",
            retval
        );
        return retval;
    }

    // update result DB record.
    // This can also fail in normal operation.
    // In this case, in principle we should undo
    // the changes we just made to the WU (or use a transaction)
    // but I don't think it actually matters.
    //
    int old_server_state = result.server_state;

    if (result.server_state != RESULT_SERVER_STATE_IN_PROGRESS) {
        // We're sending this result for the first time
        //
        result.server_state = RESULT_SERVER_STATE_IN_PROGRESS;
    } else {
        // Result was already sent to this host but was lost,
        // so we're resending it.
        //
        resent_result = true;

        if (config.debug_send) {
            log_messages.printf(MSG_NORMAL,
                "[send] [RESULT#%lu] [HOST#%lu] (resend lost work)\n",
                result.id, g_reply->host.id
            );
        }
    }
    retval = result.mark_as_sent(old_server_state, config.report_grace_period);
    if (retval == ERR_DB_NOT_FOUND) {
        log_messages.printf(MSG_CRITICAL,
            "[RESULT#%lu] [HOST#%lu]: CAN'T SEND, already sent to another host\n",
            result.id, g_reply->host.id
        );
    } else if (retval) {
        log_messages.printf(MSG_CRITICAL,
            "add_result_to_reply: can't update result: %s\n", boincerror(retval)
        );
    }
    if (retval) return retval;

    // done with DB updates.
    //

    retval = add_wu_to_reply(wu, *g_reply, app, bavp);
    if (retval) return retval;

    // Adjust available disk space.
    // In the locality scheduling locality case,
    // reduce the available space by less than the workunit rsc_disk_bound,
    // if the host already has the file or the file was not already sent.
    //
    if (!locality_scheduling || decrement_disk_space_locality(wu)) {
        g_wreq->disk_available -= wu.rsc_disk_bound;
    }

    double est_dur = estimate_duration(wu, *bavp);
    if (config.debug_send) {
        double max_time = wu.rsc_fpops_bound / bavp->host_usage.projected_flops;
        char buf1[64],buf2[64];
        secs_to_hmsf(est_dur, buf1);
        secs_to_hmsf(max_time, buf2);
        log_messages.printf(MSG_NORMAL,
            "[send] [HOST#%lu] sending [RESULT#%lu %s] (est. dur. %.2fs (%s)) (max time %.2fs (%s))\n",
            g_reply->host.id, result.id, result.name, est_dur, buf1, max_time, buf2
        );
    }

    // The following overwrites the result's xml_doc field.
    // But that's OK cuz we're done with DB updates
    //
    retval = insert_name_tags(result, wu);
    if (retval) {
        log_messages.printf(MSG_CRITICAL,
            "add_result_to_reply: can't insert name tags: %d\n",
            retval
        );
        return retval;
    }
    retval = insert_deadline_tag(result);
    if (retval) {
        log_messages.printf(MSG_CRITICAL,
            "add_result_to_reply: can't insert deadline tag: %s\n", boincerror(retval)
        );
        return retval;
    }
    result.bav = *bavp;
    g_reply->insert_result(result);

    // debit the client's work request:
    // per-resource-type if it made a spec request, else overall seconds
    if (g_wreq->rsc_spec_request) {
        int pt = bavp->host_usage.proc_type;
        if (pt == PROC_TYPE_CPU) {
            g_wreq->req_secs[PROC_TYPE_CPU] -= est_dur;
            g_wreq->req_instances[PROC_TYPE_CPU] -= bavp->host_usage.avg_ncpus;
        } else {
            g_wreq->req_secs[pt] -= est_dur;
            g_wreq->req_instances[pt] -= bavp->host_usage.gpu_usage;
        }
    } else {
        g_wreq->seconds_to_fill -= est_dur;
    }
    update_estimated_delay(*bavp, est_dur);
    g_wreq->njobs_sent++;
    config.max_jobs_in_progress.register_job(app, bavp->host_usage.proc_type);
    // resends don't count against the daily quota
    if (!resent_result) {
        DB_HOST_APP_VERSION* havp = bavp->host_app_version();
        if (havp) {
            havp->n_jobs_today++;
        }
    }

    // add this result to workload for simulation
    //
    if (config.workload_sim && g_request->have_other_results_list) {
        IP_RESULT ipr ("", time(0)+wu.delay_bound, est_dur);
        g_request->ip_results.push_back(ipr);
    }

    // mark job as done if debugging flag is set;
    // this is used by sched_driver.C (performance testing)
    //
    if (mark_jobs_done) {
        DB_WORKUNIT dbwu;
        char buf[256];
        sprintf(buf,
            "server_state=%d outcome=%d",
            RESULT_SERVER_STATE_OVER, RESULT_OUTCOME_SUCCESS
        );
        result.update_field(buf);

        dbwu.id = wu.id;
        sprintf(buf, "transition_time=%ld", time(0));
        dbwu.update_field(buf);

    }

    // If we're sending an unreplicated job to an untrusted host,
    // mark it as replicated
    //
    if (wu.target_nresults == 1 && app->target_nresults > 1) {
        if (bavp->trusted) {
            if (config.debug_send) {
                log_messages.printf(MSG_NORMAL,
                    "[send] [WU#%lu] using trusted app version, not replicating\n", wu.id
                );
            }
        } else {
            DB_WORKUNIT dbwu;
            char buf[256];
            sprintf(buf,
                "target_nresults=%d, min_quorum=%d, transition_time=%ld",
                app->target_nresults, app->target_nresults, time(0)
            );
            dbwu.id = wu.id;
            if (config.debug_send) {
                log_messages.printf(MSG_NORMAL,
                    "[send] [WU#%lu] sending to untrusted host, replicating\n", wu.id
                );
            }
            retval = dbwu.update_field(buf);
            if (retval) {
                log_messages.printf(MSG_CRITICAL,
                    "WU update failed: %s", boincerror(retval)
                );
            }
        }
    }

    // if the app uses locality scheduling lite,
    // add the job's files to the list of those on host
    //
    if (app->locality_scheduling == LOCALITY_SCHED_LITE) {
        add_job_files_to_host(wu);
    }

    return 0;
}
1083
1084 // Send high-priority messages about things the user can change easily
1085 // (namely the driver version)
1086 // and low-priority messages about things that can't easily be changed,
1087 // but which may be interfering with getting tasks or latest apps
1088 //
send_gpu_property_messages(GPU_REQUIREMENTS & req,double ram,int version,const char * rsc_name)1089 static void send_gpu_property_messages(
1090 GPU_REQUIREMENTS& req, double ram, int version, const char* rsc_name
1091 ) {
1092 char buf[256];
1093 if (ram < req.min_ram) {
1094 sprintf(buf,
1095 "A minimum of %d MB (preferably %d MB) of video RAM is needed to process tasks using your computer's %s",
1096 (int) (req.min_ram/MEGA),
1097 (int) (req.opt_ram/MEGA),
1098 rsc_name
1099 );
1100 g_reply->insert_message(buf, "low");
1101 } else {
1102 if (version) {
1103 if (version < req.min_driver_version) {
1104 sprintf(buf,
1105 "%s: %s",
1106 rsc_name,
1107 _("Upgrade to the latest driver to process tasks using your computer's GPU")
1108 );
1109 g_reply->insert_message(buf, "notice");
1110 } else if (version < req.opt_driver_version) {
1111 sprintf(buf,
1112 "%s: %s",
1113 rsc_name,
1114 _("Upgrade to the latest driver to use all of this project's GPU applications")
1115 );
1116 g_reply->insert_message(buf, "low");
1117 }
1118 }
1119 }
1120 }
1121
1122 // send messages complaining about lack of GPU or the properties of GPUs
1123 //
send_gpu_messages()1124 void send_gpu_messages() {
1125 // Mac client with GPU but too-old client
1126 //
1127 if (g_request->coprocs.nvidia.count
1128 && ssp->have_apps_for_proc_type[PROC_TYPE_NVIDIA_GPU]
1129 && strstr(g_request->host.os_name, "Darwin")
1130 && g_request->core_client_version < 61028
1131 ) {
1132 g_reply->insert_message(
1133 _("A newer version of BOINC is needed to use your NVIDIA GPU; please upgrade to the current version"),
1134 "notice"
1135 );
1136 }
1137
1138 // GPU-only project, client lacks GPU
1139 //
1140 bool usable_gpu = false;
1141 bool have_gpu_apps = false;
1142 for (int i=1; i<NPROC_TYPES; i++) {
1143 if (ssp->have_apps_for_proc_type[i]) {
1144 have_gpu_apps = true;
1145 COPROC* cp = g_request->coprocs.proc_type_to_coproc(i);
1146 if (cp && cp->count) {
1147 usable_gpu = true;
1148 }
1149 }
1150 }
1151 if (!ssp->have_apps_for_proc_type[PROC_TYPE_CPU]
1152 && have_gpu_apps
1153 && !usable_gpu
1154 ) {
1155 char buf[256];
1156 strcpy(buf, "");
1157 for (int i=1; i<NPROC_TYPES; i++) {
1158 if (ssp->have_apps_for_proc_type[i]) {
1159 if (strlen(buf)) {
1160 strcat(buf, " or ");
1161 }
1162 strcat(buf, proc_type_name(i));
1163 }
1164 }
1165 char msg[1024];
1166 sprintf(msg,
1167 _("An %s GPU is required to run tasks for this project"),
1168 buf
1169 );
1170 g_reply->insert_message(msg, "notice");
1171 }
1172
1173 if (g_request->coprocs.nvidia.count && ssp->have_apps_for_proc_type[PROC_TYPE_NVIDIA_GPU]) {
1174 send_gpu_property_messages(gpu_requirements[PROC_TYPE_NVIDIA_GPU],
1175 g_request->coprocs.nvidia.prop.totalGlobalMem,
1176 g_request->coprocs.nvidia.display_driver_version,
1177 proc_type_name(PROC_TYPE_NVIDIA_GPU)
1178 );
1179 }
1180 if (g_request->coprocs.ati.count && ssp->have_apps_for_proc_type[PROC_TYPE_AMD_GPU]) {
1181 send_gpu_property_messages(gpu_requirements[PROC_TYPE_AMD_GPU],
1182 g_request->coprocs.ati.attribs.localRAM*MEGA,
1183 g_request->coprocs.ati.version_num,
1184 proc_type_name(PROC_TYPE_AMD_GPU)
1185 );
1186 }
1187 if (g_request->coprocs.intel_gpu.count && ssp->have_apps_for_proc_type[PROC_TYPE_INTEL_GPU]) {
1188 send_gpu_property_messages(gpu_requirements[PROC_TYPE_INTEL_GPU],
1189 g_request->coprocs.intel_gpu.opencl_prop.global_mem_size,
1190 0,
1191 proc_type_name(PROC_TYPE_INTEL_GPU)
1192 );
1193 }
1194
1195 }
1196
1197 // send messages to user about why jobs were or weren't sent,
1198 // recommendations for GPU driver upgrades, etc.
1199 //
// Send messages to the user explaining why jobs were or weren't sent,
// and set reply delays where appropriate.
// Reads the flags accumulated in g_wreq during job selection;
// appends messages to g_reply.
//
static void send_user_messages() {
    char buf[512];
    unsigned int i;
    int j;

    // GPU messages aren't relevant if anonymous platform
    //
    if (!g_wreq->anonymous_platform) {
        send_gpu_messages();
    }

    // If work was sent from apps the user did not select, explain.
    // NOTE: this will have to be done differently with matchmaker scheduling
    //
    if (!config.locality_scheduling && !config.locality_scheduler_fraction && config.sched_old) {
        if (g_wreq->njobs_sent && !g_wreq->user_apps_only) {
            g_reply->insert_message(
                "No tasks are available for the applications you have selected",
                "low"
            );

            // Inform the user about applications with no work
            //
            for (i=0; i<g_wreq->project_prefs.selected_apps.size(); i++) {
                if (!g_wreq->project_prefs.selected_apps[i].work_available) {
                    APP* app = ssp->lookup_app(g_wreq->project_prefs.selected_apps[i].appid);
                    // don't write message if the app is deprecated
                    //
                    if (app) {
                        char explanation[256];
                        sprintf(explanation,
                            "No tasks are available for %s",
                            find_user_friendly_name(g_wreq->project_prefs.selected_apps[i].appid)
                        );
                        g_reply->insert_message( explanation, "low");
                    }
                }
            }

            // Tell the user about applications they didn't qualify for
            //
            for (j=0; j<selected_app_message_index; j++){
                g_reply->insert_message(g_wreq->no_work_messages.at(j));
            }
            g_reply->insert_message(
                "Your preferences allow tasks from applications other than those selected",
                "low"
            );
            g_reply->insert_message(
                "Sending tasks from other applications", "low"
            );
        }
    }

    // if client asked for work and we're not sending any, explain why
    //
    if (g_wreq->njobs_sent == 0 && g_request->work_req_seconds) {
        g_reply->set_delay(DELAY_NO_WORK_TEMP);
        g_reply->insert_message("No tasks sent", "low");

        // Tell the user about applications with no work
        //
        for (i=0; i<g_wreq->project_prefs.selected_apps.size(); i++) {
            if (!g_wreq->project_prefs.selected_apps[i].work_available) {
                APP* app = ssp->lookup_app(g_wreq->project_prefs.selected_apps[i].appid);
                // don't write message if the app is deprecated
                if (app != NULL) {
                    sprintf(buf, "No tasks are available for %s",
                        find_user_friendly_name(
                            g_wreq->project_prefs.selected_apps[i].appid
                        )
                    );
                    g_reply->insert_message(buf, "low");
                }
            }
        }

        // relay the messages accumulated during job selection
        // (e.g. per-app-version rejection reasons)
        //
        for (i=0; i<g_wreq->no_work_messages.size(); i++){
            g_reply->insert_message(g_wreq->no_work_messages.at(i));
        }

        if (g_wreq->no_allowed_apps_available) {
            g_reply->insert_message(
                _("No tasks are available for the applications you have selected."),
                "low"
            );
        }
        // host too slow to meet deadlines;
        // clients before 4.19.00 don't report active_frac separately
        //
        if (g_wreq->speed.insufficient) {
            if (g_request->core_client_version>41900) {
                sprintf(buf,
                    "Tasks won't finish in time: BOINC runs %.1f%% of the time; computation is enabled %.1f%% of that",
                    100*g_reply->host.on_frac, 100*g_reply->host.active_frac
                );
            } else {
                sprintf(buf,
                    "Tasks won't finish in time: Computer available %.1f%% of the time",
                    100*g_reply->host.on_frac
                );
            }
            g_reply->insert_message(buf, "low");
        }
        // homogeneous redundancy rejections: temporary (committed
        // elsewhere) vs permanent (unsupported platform)
        //
        if (g_wreq->hr_reject_temp) {
            g_reply->insert_message(
                "Tasks are committed to other platforms",
                "low"
            );
        }
        if (g_wreq->hr_reject_perm) {
            g_reply->insert_message(
                _("Your computer type is not supported by this project"),
                "notice"
            );
        }
        if (g_wreq->outdated_client) {
            g_reply->insert_message(
                _("Newer BOINC version required; please install current version"),
                "notice"
            );
            g_reply->set_delay(DELAY_NO_WORK_PERM);
            log_messages.printf(MSG_NORMAL,
                "Not sending tasks because newer client version required\n"
            );
        }
        // user prefs exclude a processor type we have apps for
        //
        for (i=0; i<NPROC_TYPES; i++) {
            if (g_wreq->project_prefs.dont_use_proc_type[i] && ssp->have_apps_for_proc_type[i]) {
                sprintf(buf,
                    _("Tasks for %s are available, but your preferences are set to not accept them"),
                    proc_type_name(i)
                );
                g_reply->insert_message(buf, "low");
            }
        }
        // daily quota exhausted for some app version
        //
        DB_HOST_APP_VERSION* havp = quota_exceeded_version();
        if (havp) {
            sprintf(buf, "This computer has finished a daily quota of %d tasks",
                havp->max_jobs_per_day
            );
            g_reply->insert_message(buf, "low");
            if (config.debug_quota) {
                log_messages.printf(MSG_NORMAL,
                    "[quota] Daily quota %d exceeded for app version %lu\n",
                    havp->max_jobs_per_day, havp->app_version_id
                );
            }
            g_reply->set_delay(DELAY_NO_WORK_CACHE);
        }
        // too many jobs already in progress on this host
        //
        if (g_wreq->max_jobs_exceeded()) {
            sprintf(buf, "This computer has reached a limit on tasks in progress");
            g_reply->insert_message(buf, "low");
            g_reply->set_delay(DELAY_NO_WORK_CACHE);
        }
    }
}
1353
clamp_req_sec(double x)1354 static double clamp_req_sec(double x) {
1355 if (x < MIN_REQ_SECS) return MIN_REQ_SECS;
1356 if (x > MAX_REQ_SECS) return MAX_REQ_SECS;
1357 return x;
1358 }
1359
1360 // prepare to send jobs, both resent and new;
1361 // decipher request type, fill in WORK_REQ
1362 //
// Prepare to send jobs, both resent and new:
// decipher the request type, fill in the WORK_REQ (g_wreq),
// and (if debug flags are set) log the details of the request.
// Called once per scheduler RPC before any jobs are selected.
//
void send_work_setup() {
    unsigned int i;

    // clamp the client's requested durations to server-side limits
    //
    g_wreq->seconds_to_fill = clamp_req_sec(g_request->work_req_seconds);
    g_wreq->req_secs[PROC_TYPE_CPU] = clamp_req_sec(g_request->cpu_req_secs);
    g_wreq->req_instances[PROC_TYPE_CPU] = g_request->cpu_req_instances;
    g_wreq->anonymous_platform = is_anonymous(g_request->platforms.list[0]);

    // decide on attributes of HOST_APP_VERSIONS
    //
    get_reliability_and_trust();

    // parse project preferences (e.g. no GPUs)
    //
    g_wreq->project_prefs.parse();

    if (g_wreq->anonymous_platform) {
        estimate_flops_anon_platform();

        // record which processor types the client's own
        // (anonymous-platform) app versions cover
        //
        for (i=0; i<NPROC_TYPES; i++) {
            g_wreq->client_has_apps_for_proc_type[i] = false;
        }
        for (i=0; i<g_request->client_app_versions.size(); i++) {
            CLIENT_APP_VERSION& cav = g_request->client_app_versions[i];
            int pt = cav.host_usage.proc_type;
            g_wreq->client_has_apps_for_proc_type[pt] = true;
        }
    }
    for (i=1; i<NPROC_TYPES; i++) {
        gpu_requirements[i].clear();
    }

    g_wreq->disk_available = max_allowable_disk();
    get_mem_sizes();
    get_available_fracs();
    g_wreq->get_job_limits();

    // do sanity checking on GPU scheduling parameters
    //
    for (i=1; i<NPROC_TYPES; i++) {
        COPROC* cp = g_request->coprocs.proc_type_to_coproc(i);
        if (cp && cp->count) {
            g_wreq->req_secs[i] = clamp_req_sec(cp->req_secs);
            g_wreq->req_instances[i] = cp->req_instances;
            if (cp->estimated_delay < 0) {
                cp->estimated_delay = g_request->cpu_estimated_delay;
            }
        }
    }
    // a "resource-specific request" is one where the client asked
    // for work for particular processor types
    //
    g_wreq->rsc_spec_request = false;
    for (i=0; i<NPROC_TYPES; i++) {
        if (g_wreq->req_secs[i]) {
            g_wreq->rsc_spec_request = true;
            break;
        }
    }

    // count the client's reported in-progress jobs
    // toward the in-progress job limits
    //
    for (i=0; i<g_request->other_results.size(); i++) {
        OTHER_RESULT& r = g_request->other_results[i];
        APP* app = NULL;
        int proc_type = PROC_TYPE_CPU;
        bool have_cav = false;
        if (r.app_version >= 0
            && r.app_version < (int)g_request->client_app_versions.size()
        ) {
            CLIENT_APP_VERSION& cav = g_request->client_app_versions[r.app_version];
            app = cav.app;
            if (app) {
                have_cav = true;
                proc_type = cav.host_usage.proc_type;
            }
        }
        if (!have_cav) {
            // fall back to deducing the proc type from the plan class
            //
            if (r.have_plan_class) {
                proc_type = plan_class_to_proc_type(r.plan_class);
            }
        }
        config.max_jobs_in_progress.register_job(app, proc_type);
    }

    // print details of request to log
    //
    if (config.debug_quota) {
        log_messages.printf(MSG_NORMAL,
            "[quota] max jobs per RPC: %d\n", g_wreq->max_jobs_per_rpc
        );
        config.max_jobs_in_progress.print_log();
    }
    if (config.debug_send) {
        log_messages.printf(MSG_NORMAL,
            "[send] %s old scheduling; %s EDF sim\n",
            config.sched_old?"Using":"Not using",
            config.workload_sim?"Using":"Not using"
        );
        log_messages.printf(MSG_NORMAL,
            "[send] CPU: req %.2f sec, %.2f instances; est delay %.2f\n",
            g_wreq->req_secs[PROC_TYPE_CPU],
            g_wreq->req_instances[PROC_TYPE_CPU],
            g_request->cpu_estimated_delay
        );
        for (i=1; i<NPROC_TYPES; i++) {
            COPROC* cp = g_request->coprocs.proc_type_to_coproc(i);
            if (cp && cp->count) {
                log_messages.printf(MSG_NORMAL,
                    "[send] %s: req %.2f sec, %.2f instances; est delay %.2f\n",
                    proc_type_name(i),
                    g_wreq->req_secs[i],
                    g_wreq->req_instances[i],
                    cp->estimated_delay
                );
            }
        }
        log_messages.printf(MSG_NORMAL,
            "[send] work_req_seconds: %.2f secs\n",
            g_wreq->seconds_to_fill
        );
        log_messages.printf(MSG_NORMAL,
            "[send] available disk %.2f GB, work_buf_min %d\n",
            g_wreq->disk_available/GIGA,
            (int)g_request->global_prefs.work_buf_min()
        );
        log_messages.printf(MSG_NORMAL,
            "[send] on_frac %f active_frac %f gpu_active_frac %f\n",
            g_reply->host.on_frac,
            g_reply->host.active_frac,
            g_reply->host.gpu_active_frac
        );
        if (g_wreq->anonymous_platform) {
            log_messages.printf(MSG_NORMAL,
                "[send] Anonymous platform app versions:\n"
            );
            for (i=0; i<g_request->client_app_versions.size(); i++) {
                CLIENT_APP_VERSION& cav = g_request->client_app_versions[i];
                char buf[256];
                strcpy(buf, "");
                int pt = cav.host_usage.proc_type;
                if (pt) {
                    // non-CPU version: include GPU usage in the log line
                    sprintf(buf, " %.2f %s GPU",
                        cav.host_usage.gpu_usage,
                        proc_type_name(pt)
                    );
                }

                log_messages.printf(MSG_NORMAL,
                    "   app: %s version %d cpus %.2f%s flops %fG\n",
                    cav.app_name,
                    cav.version_num,
                    cav.host_usage.avg_ncpus,
                    buf,
                    cav.host_usage.projected_flops/1e9
                );
            }
        }
#if 0
        log_messages.printf(MSG_NORMAL,
            "[send] p_vm_extensions_disabled: %s\n",
            g_request->host.p_vm_extensions_disabled?"yes":"no"
        );
#endif
        log_messages.printf(MSG_NORMAL,
            "[send] CPU features: %s\n", g_request->host.p_features
        );
    }
}
1527
1528 // If a record is not in DB, create it.
1529 //
update_host_app_versions(vector<SCHED_DB_RESULT> & results,int hostid)1530 int update_host_app_versions(vector<SCHED_DB_RESULT>& results, int hostid) {
1531 vector<DB_HOST_APP_VERSION> new_havs;
1532 unsigned int i, j;
1533 int retval;
1534
1535 for (i=0; i<results.size(); i++) {
1536 RESULT& r = results[i];
1537 int gavid = generalized_app_version_id(r.app_version_id, r.appid);
1538 DB_HOST_APP_VERSION* havp = gavid_to_havp(gavid);
1539 if (!havp) {
1540 bool found = false;
1541 for (j=0; j<new_havs.size(); j++) {
1542 DB_HOST_APP_VERSION& hav = new_havs[j];
1543 if (hav.app_version_id == gavid) {
1544 found = true;
1545 hav.n_jobs_today++;
1546 }
1547 }
1548 if (!found) {
1549 DB_HOST_APP_VERSION hav;
1550 hav.clear();
1551 hav.host_id = hostid;
1552 hav.app_version_id = gavid;
1553 hav.n_jobs_today = 1;
1554 new_havs.push_back(hav);
1555 }
1556 }
1557 }
1558
1559 // create new records
1560 //
1561 for (i=0; i<new_havs.size(); i++) {
1562 DB_HOST_APP_VERSION& hav = new_havs[i];
1563
1564 retval = hav.insert();
1565 if (retval) {
1566 log_messages.printf(MSG_CRITICAL,
1567 "hav.insert(): %s\n", boincerror(retval)
1568 );
1569 } else {
1570 if (config.debug_credit) {
1571 log_messages.printf(MSG_NORMAL,
1572 "[credit] created host_app_version record (%lu, %lu)\n",
1573 hav.host_id, hav.app_version_id
1574 );
1575 }
1576 }
1577 }
1578 return 0;
1579 }
1580
send_work()1581 void send_work() {
1582 int retval;
1583
1584 if (all_apps_use_hr && hr_unknown_platform(g_request->host)) {
1585 log_messages.printf(MSG_NORMAL,
1586 "Not sending work because unknown HR class\n"
1587 );
1588 g_wreq->hr_reject_perm = true;
1589 return;
1590 }
1591
1592 if (config.enable_assignment) {
1593 if (send_targeted_jobs()) {
1594 if (config.debug_assignment) {
1595 log_messages.printf(MSG_NORMAL,
1596 "[assign] [HOST#%lu] sent assigned jobs\n", g_reply->host.id
1597 );
1598 }
1599 goto done;
1600 }
1601 }
1602
1603 if (config.enable_assignment_multi) {
1604 if (send_broadcast_jobs()) {
1605 if (config.debug_assignment) {
1606 log_messages.printf(MSG_NORMAL,
1607 "[assign] [HOST#%lu] sent assigned jobs\n", g_reply->host.id
1608 );
1609 }
1610 goto done;
1611 }
1612 }
1613
1614 if (config.workload_sim && g_request->have_other_results_list) {
1615 init_ip_results(
1616 g_request->global_prefs.work_buf_min(),
1617 g_wreq->effective_ncpus, g_request->ip_results
1618 );
1619 }
1620
1621 // send non-CPU-intensive jobs if needed
1622 //
1623 if (ssp->have_nci_app) {
1624 send_nci();
1625 }
1626
1627 if (!work_needed(false)) {
1628 goto done;
1629 }
1630
1631 if (config.locality_scheduler_fraction > 0) {
1632 if (drand() < config.locality_scheduler_fraction) {
1633 if (config.debug_locality) {
1634 log_messages.printf(MSG_NORMAL,
1635 "[mixed] sending locality work first\n"
1636 );
1637 }
1638 send_work_locality();
1639
1640 // save 'insufficient' flags from the first scheduler
1641 bool disk_insufficient = g_wreq->disk.insufficient;
1642 bool speed_insufficient = g_wreq->speed.insufficient;
1643 bool mem_insufficient = g_wreq->mem.insufficient;
1644 bool no_allowed_apps_available = g_wreq->no_allowed_apps_available;
1645
1646 // reset 'insufficient' flags for the second scheduler
1647 g_wreq->disk.insufficient = false;
1648 g_wreq->speed.insufficient = false;
1649 g_wreq->mem.insufficient = false;
1650 g_wreq->no_allowed_apps_available = false;
1651
1652 if (config.debug_locality) {
1653 log_messages.printf(MSG_NORMAL,
1654 "[mixed] sending non-locality work second\n"
1655 );
1656 }
1657 send_work_old();
1658
1659 // recombine the 'insufficient' flags from the two schedulers
1660 g_wreq->disk.insufficient = g_wreq->disk.insufficient && disk_insufficient;
1661 g_wreq->speed.insufficient = g_wreq->speed.insufficient && speed_insufficient;
1662 g_wreq->mem.insufficient = g_wreq->mem.insufficient && mem_insufficient;
1663 g_wreq->no_allowed_apps_available = g_wreq->no_allowed_apps_available && no_allowed_apps_available;
1664
1665 } else {
1666 if (config.debug_locality) {
1667 log_messages.printf(MSG_NORMAL,
1668 "[mixed] sending non-locality work first\n"
1669 );
1670 }
1671
1672 // save 'insufficient' flags from the first scheduler
1673 bool disk_insufficient = g_wreq->disk.insufficient;
1674 bool speed_insufficient = g_wreq->speed.insufficient;
1675 bool mem_insufficient = g_wreq->mem.insufficient;
1676 bool no_allowed_apps_available = g_wreq->no_allowed_apps_available;
1677
1678 // reset 'insufficient' flags for the second scheduler
1679 g_wreq->disk.insufficient = false;
1680 g_wreq->speed.insufficient = false;
1681 g_wreq->mem.insufficient = false;
1682 g_wreq->no_allowed_apps_available = false;
1683
1684 send_work_old();
1685 if (config.debug_locality) {
1686 log_messages.printf(MSG_NORMAL,
1687 "[mixed] sending locality work second\n"
1688 );
1689 }
1690 send_work_locality();
1691
1692 // recombine the 'insufficient' flags from the two schedulers
1693 g_wreq->disk.insufficient = g_wreq->disk.insufficient && disk_insufficient;
1694 g_wreq->speed.insufficient = g_wreq->speed.insufficient && speed_insufficient;
1695 g_wreq->mem.insufficient = g_wreq->mem.insufficient && mem_insufficient;
1696 g_wreq->no_allowed_apps_available = g_wreq->no_allowed_apps_available && no_allowed_apps_available;
1697
1698 }
1699 } else if (config.locality_scheduling) {
1700 send_work_locality();
1701 } else if (config.sched_old) {
1702 send_work_old();
1703 } else {
1704 send_work_score();
1705 }
1706
1707 done:
1708 retval = update_host_app_versions(g_reply->results, g_reply->host.id);
1709 if (retval) {
1710 log_messages.printf(MSG_CRITICAL,
1711 "update_host_app_versions() failed: %s\n", boincerror(retval)
1712 );
1713 }
1714 send_user_messages();
1715 }
1716
1717 const char *BOINC_RCSID_32dcd335e7 = "$Id$";
1718