1 // This file is part of BOINC.
2 // http://boinc.berkeley.edu
3 // Copyright (C) 2016 University of California
4 //
5 // BOINC is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Lesser General Public License
7 // as published by the Free Software Foundation,
8 // either version 3 of the License, or (at your option) any later version.
9 //
10 // BOINC is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
13 // See the GNU Lesser General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with BOINC.  If not, see <http://www.gnu.org/licenses/>.
17 
18 // scheduler code related to sending jobs.
19 // NOTE: there should be nothing here specific to particular
20 // scheduling policies (array scan, score-based, locality)
21 
22 #include "config.h"
23 #include <vector>
24 #include <list>
25 #include <string>
26 #include <ctime>
27 #include <cstdio>
28 #include <cstring>
29 #include <stdlib.h>
30 #include <sys/time.h>
31 #include <unistd.h>
32 
33 #include "error_numbers.h"
34 #include "parse.h"
35 #include "util.h"
36 #include "str_util.h"
37 #include "synch.h"
38 
39 #include "credit.h"
40 #include "hr.h"
41 #include "sched_array.h"
42 #include "sched_assign.h"
43 #include "sched_config.h"
44 #include "sched_customize.h"
45 #include "sched_hr.h"
46 #include "sched_locality.h"
47 #include "sched_main.h"
48 #include "sched_msgs.h"
49 #include "sched_nci.h"
50 #include "sched_shmem.h"
51 #include "sched_score.h"
52 #include "sched_timezone.h"
53 #include "sched_types.h"
54 #include "sched_util.h"
55 #include "sched_version.h"
56 
57 #include "sched_send.h"
58 
59 #ifdef _USING_FCGI_
60 #include "boinc_fcgi.h"
61 #endif
62 
// If the host sends us an impossible RAM size, use this instead.
// get_mem_sizes() substitutes it when host.m_nbytes is <= 0.
// NOTE(review): presumably in bytes (i.e. 64 MB), since it replaces
// m_nbytes - confirm.
//
const double DEFAULT_RAM_SIZE = 64000000;

// NOTE(review): not referenced in this part of the file;
// presumably an index used when building per-app user messages - confirm.
//
int selected_app_message_index=0;
68 
file_present_on_host(const char * name)69 static inline bool file_present_on_host(const char* name) {
70     for (unsigned i=0; i<g_request->file_infos.size(); i++) {
71         FILE_INFO& fi = g_request->file_infos[i];
72         if (!strstr(name, fi.name)) {
73             return true;
74         }
75     }
76     return false;
77 }
78 
79 // return the number of sticky files present on host, used by job
80 //
nfiles_on_host(WORKUNIT & wu)81 int nfiles_on_host(WORKUNIT& wu) {
82     MIOFILE mf;
83     mf.init_buf_read(wu.xml_doc);
84     XML_PARSER xp(&mf);
85     int n=0;
86     while (!xp.get_tag()) {
87         if (xp.match_tag("file_info")) {
88             FILE_INFO fi;
89             int retval = fi.parse(xp);
90             if (retval) continue;
91             if (!fi.sticky) continue;
92             if (file_present_on_host(fi.name)) {
93                 n++;
94             }
95         }
96     }
97     return n;
98 }
99 
100 // we're going to send the client this job,
101 // and the app uses locality scheduling lite.
102 // Add the job's sticky files to the list of files present on host.
103 //
add_job_files_to_host(WORKUNIT & wu)104 void add_job_files_to_host(WORKUNIT& wu) {
105     MIOFILE mf;
106     mf.init_buf_read(wu.xml_doc);
107     XML_PARSER xp(&mf);
108     while (!xp.get_tag()) {
109         if (xp.match_tag("file_info")) {
110             FILE_INFO fi;
111             int retval = fi.parse(xp);
112             if (retval) continue;
113             if (!fi.sticky) continue;
114             if (!file_present_on_host(fi.name)) {
115                 if (config.debug_send) {
116                     log_messages.printf(MSG_NORMAL,
117                         "[send] Adding file %s to host file list\n", fi.name
118                     );
119                 }
120                 g_request->file_infos.push_back(fi);
121             }
122         }
123     }
124 }
125 
// Lower and upper bounds on a number of requested seconds;
// the upper bound is 28 days' worth of SECONDS_IN_DAY.
// NOTE(review): not referenced in this part of the file;
// presumably used to clamp the client's work request - confirm.
//
const double MIN_REQ_SECS = 0;
const double MAX_REQ_SECS = (28*SECONDS_IN_DAY);
128 
129 // compute effective_ncpus;
130 // get limits on:
131 // # jobs per day
132 // # jobs per RPC
133 // # jobs in progress
134 //
get_job_limits()135 void WORK_REQ::get_job_limits() {
136     int ninstances[NPROC_TYPES];
137     int i;
138 
139     memset(ninstances, 0, sizeof(ninstances));
140     int n;
141     n = g_reply->host.p_ncpus;
142     if (g_request->global_prefs.max_ncpus_pct && g_request->global_prefs.max_ncpus_pct < 100) {
143         n = (int)((n*g_request->global_prefs.max_ncpus_pct)/100.);
144     }
145     if (n > config.max_ncpus) n = config.max_ncpus;
146     if (n < 1) n = 1;
147     if (n > MAX_CPUS) n = MAX_CPUS;
148     if (project_prefs.max_cpus) {
149         if (n > project_prefs.max_cpus) {
150             n = project_prefs.max_cpus;
151         }
152     }
153     ninstances[PROC_TYPE_CPU] = n;
154     effective_ncpus = n;
155 
156     effective_ngpus = 0;
157     for (i=1; i<g_request->coprocs.n_rsc; i++) {
158         COPROC& cp = g_request->coprocs.coprocs[i];
159         int proc_type = coproc_type_name_to_num(cp.type);
160         if (proc_type < 0) continue;
161         n = cp.count;
162         if (n > MAX_GPUS) n = MAX_GPUS;
163         ninstances[proc_type] = n;
164         effective_ngpus += n;
165     }
166 
167     int mult = effective_ncpus + config.gpu_multiplier * effective_ngpus;
168     if (config.non_cpu_intensive) {
169         mult = 1;
170         ninstances[0] = 1;
171         if (effective_ngpus) effective_ngpus = 1;
172     }
173 
174     if (config.max_wus_to_send) {
175         g_wreq->max_jobs_per_rpc = mult * config.max_wus_to_send;
176     } else {
177         g_wreq->max_jobs_per_rpc = 999999;
178     }
179 
180     if (config.debug_quota) {
181         log_messages.printf(MSG_NORMAL,
182             "[quota] effective ncpus %d ngpus %d\n",
183             effective_ncpus, effective_ngpus
184         );
185     }
186     config.max_jobs_in_progress.reset(ninstances);
187 }
188 
find_user_friendly_name(int appid)189 const char* find_user_friendly_name(int appid) {
190     APP* app = ssp->lookup_app(appid);
191     if (app) return app->user_friendly_name;
192     return "deprecated application";
193 }
194 
update_quota(DB_HOST_APP_VERSION & hav)195 static void update_quota(DB_HOST_APP_VERSION& hav) {
196     if (config.daily_result_quota) {
197         if (hav.max_jobs_per_day == 0) {
198             hav.max_jobs_per_day = config.daily_result_quota;
199             if (config.debug_quota) {
200                 log_messages.printf(MSG_NORMAL,
201                     "[quota] [HAV#%lu] Initializing max_results_day to %d\n",
202                     hav.app_version_id,
203                     config.daily_result_quota
204                 );
205             }
206         }
207     }
208 
209     if (g_request->last_rpc_dayofyear != g_request->current_rpc_dayofyear) {
210         if (config.debug_quota) {
211             log_messages.printf(MSG_NORMAL,
212                 "[quota] [HOST#%lu] [HAV#%lu] Resetting n_jobs_today\n",
213                 g_reply->host.id, hav.app_version_id
214             );
215         }
216         hav.n_jobs_today = 0;
217     }
218 }
219 
220 // see how much RAM we can use on this machine
221 //
get_mem_sizes()222 static inline void get_mem_sizes() {
223     g_wreq->ram = g_reply->host.m_nbytes;
224     if (g_wreq->ram <= 0) g_wreq->ram = DEFAULT_RAM_SIZE;
225     g_wreq->usable_ram = g_wreq->ram;
226     double busy_frac = g_request->global_prefs.ram_max_used_busy_frac;
227     double idle_frac = g_request->global_prefs.ram_max_used_idle_frac;
228     double frac = 1;
229     if (busy_frac>0 && idle_frac>0) {
230         frac = std::max(busy_frac, idle_frac);
231         if (frac > 1) frac = 1;
232         g_wreq->usable_ram *= frac;
233     }
234 }
235 
236 // Decide whether or not this app version is 'reliable'
237 // An app version is reliable if the following conditions are true
238 // (for those that are set in the config file)
239 // 1) The host average turnaround is less than a threshold
240 // 2) consecutive_valid is above a threshold
241 // 3) The host results per day is equal to the max value
242 //
get_reliability_version(HOST_APP_VERSION & hav,double multiplier)243 void get_reliability_version(HOST_APP_VERSION& hav, double multiplier) {
244     if (hav.turnaround.n > MIN_HOST_SAMPLES && config.reliable_max_avg_turnaround) {
245 
246         if (hav.turnaround.get_avg() > config.reliable_max_avg_turnaround*multiplier) {
247             if (config.debug_send) {
248                 log_messages.printf(MSG_NORMAL,
249                     "[send] [AV#%lu] not reliable; avg turnaround: %.3f > %.3f hrs\n",
250                     hav.app_version_id,
251                     hav.turnaround.get_avg()/3600,
252                     config.reliable_max_avg_turnaround*multiplier/3600
253                 );
254             }
255             hav.reliable = false;
256             return;
257         }
258     }
259     if (hav.consecutive_valid < CONS_VALID_RELIABLE) {
260         if (config.debug_send) {
261             log_messages.printf(MSG_NORMAL,
262                 "[send] [AV#%lu] not reliable; cons valid %d < %d\n",
263                 hav.app_version_id,
264                 hav.consecutive_valid, CONS_VALID_RELIABLE
265             );
266         }
267         hav.reliable = false;
268         return;
269     }
270     if (config.daily_result_quota) {
271         if (hav.max_jobs_per_day < config.daily_result_quota) {
272             if (config.debug_send) {
273                 log_messages.printf(MSG_NORMAL,
274                     "[send] [AV#%lu] not reliable; max_jobs_per_day %d<%d\n",
275                     hav.app_version_id,
276                     hav.max_jobs_per_day,
277                     config.daily_result_quota
278                 );
279             }
280             hav.reliable = false;
281             return;
282         }
283     }
284     hav.reliable = true;
285     if (config.debug_send) {
286         log_messages.printf(MSG_NORMAL,
287             "[send] [HOST#%lu] app version %lu is reliable\n",
288             g_reply->host.id, hav.app_version_id
289         );
290     }
291     g_wreq->has_reliable_version = true;
292 }
293 
294 // decide whether do unreplicated jobs with this app version
295 //
set_trust(DB_HOST_APP_VERSION & hav)296 static void set_trust(DB_HOST_APP_VERSION& hav) {
297     hav.trusted = false;
298     if (hav.consecutive_valid < CONS_VALID_UNREPLICATED) {
299         if (config.debug_send) {
300             log_messages.printf(MSG_NORMAL,
301                 "[send] set_trust: cons valid %d < %d, don't use single replication\n",
302                 hav.consecutive_valid, CONS_VALID_UNREPLICATED
303             );
304         }
305         return;
306     }
307     double x = 1./hav.consecutive_valid;
308     if (drand() > x) hav.trusted = true;
309     if (config.debug_send) {
310         log_messages.printf(MSG_NORMAL,
311             "[send] set_trust: random choice for cons valid %d: %s\n",
312             hav.consecutive_valid, hav.trusted?"yes":"no"
313         );
314     }
315 }
316 
get_reliability_and_trust()317 static void get_reliability_and_trust() {
318     // Platforms other than Windows, Linux and Intel Macs need a
319     // larger set of computers to be marked reliable
320     //
321     double multiplier = 1.0;
322     if (strstr(g_reply->host.os_name,"Windows")
323         || strstr(g_reply->host.os_name,"Linux")
324         || (strstr(g_reply->host.os_name,"Darwin")
325             && !(strstr(g_reply->host.p_vendor,"Power Macintosh"))
326     )) {
327         multiplier = 1.0;
328     } else {
329         multiplier = 1.8;
330     }
331 
332     for (unsigned int i=0; i<g_wreq->host_app_versions.size(); i++) {
333         DB_HOST_APP_VERSION& hav = g_wreq->host_app_versions[i];
334         get_reliability_version(hav, multiplier);
335         set_trust(hav);
336     }
337 }
338 
339 // Compute the max additional disk usage we can impose on the host.
340 // Depending on the client version, it can either send us
341 // - d_total and d_free (pre 4 oct 2005)
342 // - the above plus d_boinc_used_total and d_boinc_used_project
343 //
max_allowable_disk()344 double max_allowable_disk() {
345     HOST host = g_request->host;
346     GLOBAL_PREFS prefs = g_request->global_prefs;
347     double x1, x2, x3, x;
348 
349     // defaults are from config.xml
350     // if not there these are used:
351     // -default_max_used_gb= 100
352     // -default_max_used_pct = 50
353     // -default_min_free_gb = .001
354     //
355     if (prefs.disk_max_used_gb == 0) {
356        prefs.disk_max_used_gb = config.default_disk_max_used_gb;
357     }
358     if (prefs.disk_max_used_pct == 0) {
359        prefs.disk_max_used_pct = config.default_disk_max_used_pct;
360     }
361     if (prefs.disk_min_free_gb < config.default_disk_min_free_gb) {
362        prefs.disk_min_free_gb = config.default_disk_min_free_gb;
363     }
364 
365     // no defaults for total/free disk space (host.d_total, d_free)
366     // if they're zero, client will get no work.
367     //
368 
369     if (host.d_boinc_used_total) {
370         // The post 4 oct 2005 case.
371         // Compute the max allowable additional disk usage based on prefs
372         //
373         x1 = prefs.disk_max_used_gb*GIGA - host.d_boinc_used_total;
374         x2 = host.d_total * prefs.disk_max_used_pct / 100.0
375             - host.d_boinc_used_total;
376         x3 = host.d_free - prefs.disk_min_free_gb*GIGA;      // may be negative
377         x = std::min(x1, std::min(x2, x3));
378 
379         // see which bound is the most stringent
380         //
381         if (x==x1) {
382             g_reply->disk_limits.max_used = x;
383         } else if (x==x2) {
384             g_reply->disk_limits.max_frac = x;
385         } else {
386             g_reply->disk_limits.min_free = x;
387         }
388     } else {
389         // here we don't know how much space BOINC is using.
390         // so we're kinda screwed.
391         // All we can do is assume that BOINC is using zero space.
392         // We can't honor the max_used for max_used_pct preferences.
393         // We can only honor the min_free pref.
394         //
395         x = host.d_free - prefs.disk_min_free_gb*GIGA;      // may be negative
396         g_reply->disk_limits.min_free = x;
397         x1 = x2 = x3 = 0;
398     }
399 
400     if (x < 0) {
401         if (config.debug_send) {
402             log_messages.printf(MSG_NORMAL,
403                 "[send] No disk space available: disk_max_used_gb %.2fGB disk_max_used_pct %.2f disk_min_free_gb %.2fGB\n",
404                 prefs.disk_max_used_gb,
405                 prefs.disk_max_used_pct,
406                 prefs.disk_min_free_gb
407             );
408             log_messages.printf(MSG_NORMAL,
409                 "[send] No disk space available: host.d_total %.2fGB host.d_free %.2fGB host.d_boinc_used_total %.2fGB\n",
410                 host.d_total/GIGA,
411                 host.d_free/GIGA,
412                 host.d_boinc_used_total/GIGA
413             );
414             log_messages.printf(MSG_NORMAL,
415                 "[send] No disk space available: x1 %.2fGB x2 %.2fGB x3 %.2fGB x %.2fGB\n",
416                 x1/GIGA, x2/GIGA, x3/GIGA, x/GIGA
417             );
418         }
419         g_wreq->disk.set_insufficient(-x);
420         x = 0;
421     }
422     return x;
423 }
424 
estimate_duration_unscaled(WORKUNIT & wu,BEST_APP_VERSION & bav)425 static double estimate_duration_unscaled(WORKUNIT& wu, BEST_APP_VERSION& bav) {
426     double rsc_fpops_est = wu.rsc_fpops_est;
427     if (rsc_fpops_est <= 0) rsc_fpops_est = 1e12;
428     return rsc_fpops_est/bav.host_usage.projected_flops;
429 }
430 
431 // Compute cpu_available_frac and gpu_available_frac.
432 // These are based on client-supplied data, so do sanity checks
433 //
434 #define FRAC_MIN 0.1
clamp_frac(double & frac,const char * name)435 static inline void clamp_frac(double& frac, const char* name) {
436     if (frac > 1) {
437         if (config.debug_send) {
438             log_messages.printf(MSG_NORMAL,
439                 "[send] %s=%f; setting to 1\n", name, frac
440             );
441         }
442         frac = 1;
443     } else if (frac < FRAC_MIN) {
444         if (config.debug_send) {
445             log_messages.printf(MSG_NORMAL,
446                 "[send] %s=%f; setting to %f\n", name, frac, FRAC_MIN
447             );
448         }
449         frac = .01;
450     }
451 }
452 
get_available_fracs()453 static inline void get_available_fracs() {
454     if (g_request->core_client_version<=41900) {
455         g_wreq->cpu_available_frac = g_reply->host.on_frac;
456         g_wreq->gpu_available_frac = g_reply->host.on_frac; // irrelevant
457     } else {
458         g_wreq->cpu_available_frac = g_reply->host.active_frac * g_reply->host.on_frac;
459         g_wreq->gpu_available_frac = g_reply->host.gpu_active_frac * g_reply->host.on_frac;
460     }
461     clamp_frac(g_wreq->cpu_available_frac, "CPU available fraction");
462     clamp_frac(g_wreq->gpu_available_frac, "GPU available fraction");
463 }
464 
available_frac(BEST_APP_VERSION & bav)465 double available_frac(BEST_APP_VERSION& bav) {
466     if (bav.host_usage.uses_gpu()) {
467         return g_wreq->gpu_available_frac;
468     } else {
469         return g_wreq->cpu_available_frac;
470     }
471 }
472 
473 // estimate the amount of real time to complete this WU,
474 // taking into account active_frac etc.
475 // Note: don't factor in resource_share_fraction.
476 // The core client doesn't necessarily round-robin across all projects.
477 //
estimate_duration(WORKUNIT & wu,BEST_APP_VERSION & bav)478 double estimate_duration(WORKUNIT& wu, BEST_APP_VERSION& bav) {
479     double edu = estimate_duration_unscaled(wu, bav);
480     double ed = edu/available_frac(bav);
481     if (config.debug_send_job) {
482         log_messages.printf(MSG_NORMAL,
483             "[send_job] est. duration for WU %lu: unscaled %.2f scaled %.2f\n",
484             wu.id, edu, ed
485         );
486     }
487     return ed;
488 }
489 
update_n_jobs_today()490 void update_n_jobs_today() {
491     for (unsigned int i=0; i<g_wreq->host_app_versions.size(); i++) {
492         DB_HOST_APP_VERSION& hav = g_wreq->host_app_versions[i];
493         update_quota(hav);
494     }
495 }
496 
update_estimated_delay(BEST_APP_VERSION & bav,double dt)497 static inline void update_estimated_delay(BEST_APP_VERSION& bav, double dt) {
498     int pt = bav.host_usage.proc_type;
499     if (pt == PROC_TYPE_CPU) {
500         g_request->cpu_estimated_delay += dt*bav.host_usage.avg_ncpus/g_request->host.p_ncpus;
501     } else {
502         COPROC* cp = g_request->coprocs.proc_type_to_coproc(pt);
503         cp->estimated_delay += dt*bav.host_usage.gpu_usage/cp->count;
504     }
505 }
506 
507 // insert "text" right after "after" in the given buffer
508 //
insert_after(char * buffer,const char * after,const char * text)509 static int insert_after(char* buffer, const char* after, const char* text) {
510     char* p;
511     char temp[BLOB_SIZE];
512 
513     if (strlen(buffer) + strlen(text) >= BLOB_SIZE-1) {
514         log_messages.printf(MSG_CRITICAL,
515             "insert_after: overflow: %d %d\n",
516             (int)strlen(buffer),
517             (int)strlen(text)
518         );
519         return ERR_BUFFER_OVERFLOW;
520     }
521     p = strstr(buffer, after);
522     if (!p) {
523         log_messages.printf(MSG_CRITICAL,
524             "insert_after: %s not found in %s\n", after, buffer
525         );
526         return ERR_XML_PARSE;
527     }
528     p += strlen(after);
529     // coverity[fixed_size_dest]
530     strcpy(temp, p);
531     strcpy(p, text);
532     strcat(p, temp);
533     return 0;
534 }
535 
536 // add elements to WU's xml_doc,
537 // in preparation for sending it to a client
538 //
insert_wu_tags(WORKUNIT & wu,APP & app)539 static int insert_wu_tags(WORKUNIT& wu, APP& app) {
540     char buf[BLOB_SIZE];
541 
542     sprintf(buf,
543         "    <rsc_fpops_est>%f</rsc_fpops_est>\n"
544         "    <rsc_fpops_bound>%f</rsc_fpops_bound>\n"
545         "    <rsc_memory_bound>%f</rsc_memory_bound>\n"
546         "    <rsc_disk_bound>%f</rsc_disk_bound>\n"
547         "    <name>%s</name>\n"
548         "    <app_name>%s</app_name>\n",
549         wu.rsc_fpops_est,
550         wu.rsc_fpops_bound,
551         wu.rsc_memory_bound,
552         wu.rsc_disk_bound,
553         wu.name,
554         app.name
555     );
556     return insert_after(wu.xml_doc, "<workunit>\n", buf);
557 }
558 
559 // Add the given workunit, app, and app version to a reply.
560 //
add_wu_to_reply(WORKUNIT & wu,SCHEDULER_REPLY &,APP * app,BEST_APP_VERSION * bavp)561 static int add_wu_to_reply(
562     WORKUNIT& wu, SCHEDULER_REPLY&, APP* app, BEST_APP_VERSION* bavp
563 ) {
564     int retval;
565     WORKUNIT wu2, wu3;
566 
567     APP_VERSION* avp = bavp->avp;
568 
569     // add the app, app_version, and workunit to the reply,
570     // but only if they aren't already there
571     //
572     if (avp) {
573         APP_VERSION av2=*avp, *avp2=&av2;
574 
575         if (strlen(config.replace_download_url_by_timezone)) {
576             process_av_timezone(avp, av2);
577         }
578 
579         g_reply->insert_app_unique(*app);
580         av2.bavp = bavp;
581         g_reply->insert_app_version_unique(*avp2);
582         if (config.debug_send) {
583             log_messages.printf(MSG_NORMAL,
584                 "[send] Sending app_version %s %lu %d %s; projected %.2f GFLOPS\n",
585                 app->name,
586                 avp2->platformid, avp2->version_num, avp2->plan_class,
587                 bavp->host_usage.projected_flops/1e9
588             );
589         }
590     }
591 
592     // modify the WU's xml_doc; add <name>, <rsc_*> etc.
593     //
594     wu2 = wu;       // make copy since we're going to modify its XML field
595 
596     // check if plan class specified memory usage
597     //
598     if (bavp->host_usage.mem_usage) {
599         wu2.rsc_memory_bound = bavp->host_usage.mem_usage;
600     }
601 
602     // adjust FPOPS figures for anonymous platform
603     //
604     if (bavp->cavp) {
605         wu2.rsc_fpops_est *= bavp->cavp->rsc_fpops_scale;
606         wu2.rsc_fpops_bound *= bavp->cavp->rsc_fpops_scale;
607     }
608     retval = insert_wu_tags(wu2, *app);
609     if (retval) {
610         log_messages.printf(MSG_CRITICAL,
611             "insert_wu_tags failed: %s\n", boincerror(retval)
612         );
613         return retval;
614     }
615     wu3 = wu2;
616     if (strlen(config.replace_download_url_by_timezone)) {
617         process_wu_timezone(wu2, wu3);
618     }
619 
620     g_reply->insert_workunit_unique(wu3);
621 
622     // switch to tighter policy for estimating delay
623     //
624     return 0;
625 }
626 
627 // add <name> tags to result's xml_doc_in
628 //
insert_name_tags(RESULT & result,WORKUNIT const & wu)629 static int insert_name_tags(RESULT& result, WORKUNIT const& wu) {
630     char buf[256];
631     int retval;
632 
633     sprintf(buf, "<name>%s</name>\n", result.name);
634     retval = insert_after(result.xml_doc_in, "<result>\n", buf);
635     if (retval) return retval;
636     sprintf(buf, "<wu_name>%s</wu_name>\n", wu.name);
637     retval = insert_after(result.xml_doc_in, "<result>\n", buf);
638     if (retval) return retval;
639     return 0;
640 }
641 
insert_deadline_tag(RESULT & result)642 static int insert_deadline_tag(RESULT& result) {
643     char buf[256];
644     sprintf(buf, "<report_deadline>%d</report_deadline>\n", result.report_deadline);
645     int retval = insert_after(result.xml_doc_in, "<result>\n", buf);
646     if (retval) return retval;
647     return 0;
648 }
649 
650 // update workunit fields when send an instance of it:
651 // - transition time
652 // - app_version_id, if app uses homogeneous app version
653 // - hr_class, if we're using HR
654 //
655 // In the latter two cases, the update is conditional on the field
656 // fields either being zero or the desired value.
657 // Some other scheduler instance might have updated it since we read the WU,
658 // and the transitioner might have set it to zero.
659 //
update_wu_on_send(WORKUNIT wu,time_t x,APP & app,BEST_APP_VERSION & bav)660 int update_wu_on_send(WORKUNIT wu, time_t x, APP& app, BEST_APP_VERSION& bav) {
661     DB_WORKUNIT dbwu;
662     char buf[256], buf2[256], where_clause[256];
663     int retval;
664 
665     dbwu.id = wu.id;
666 
667     // SQL note: can't use min() here
668     //
669     sprintf(buf,
670         "transition_time=if(transition_time<%d, transition_time, %d)",
671         (int)x, (int)x
672     );
673     strcpy(where_clause, "");
674     if (app.homogeneous_app_version) {
675         sprintf(buf2, ", app_version_id=%lu", bav.avp->id);
676         strcat(buf, buf2);
677         sprintf(where_clause,
678             "(app_version_id=0 or app_version_id=%lu)", bav.avp->id
679         );
680     }
681     if (app_hr_type(app)) {
682         int host_hr_class = hr_class(g_request->host, app_hr_type(app));
683         sprintf(buf2, ", hr_class=%d", host_hr_class);
684         strcat(buf, buf2);
685         if (strlen(where_clause)) {
686             strcat(where_clause, " and ");
687         }
688         sprintf(buf2, "(hr_class=0 or hr_class=%d)", host_hr_class);
689         strcat(where_clause, buf2);
690     }
691     retval = dbwu.update_field(buf, strlen(where_clause)?where_clause:NULL);
692     if (retval) return retval;
693     if (boinc_db.affected_rows() != 1) {
694         return ERR_DB_NOT_FOUND;
695     }
696     return 0;
697 }
698 
699 // return true iff a result for same WU is already being sent
700 //
wu_already_in_reply(WORKUNIT & wu)701 bool wu_already_in_reply(WORKUNIT& wu) {
702     unsigned int i;
703     for (i=0; i<g_reply->results.size(); i++) {
704         if (wu.id == g_reply->results[i].workunitid) {
705             return true;
706         }
707     }
708     return false;
709 }
710 
lock_sema()711 void lock_sema() {
712     lock_semaphore(sema_key);
713 }
714 
unlock_sema()715 void unlock_sema() {
716     unlock_semaphore(sema_key);
717 }
718 
have_apps(int pt)719 static inline bool have_apps(int pt) {
720     if (g_wreq->anonymous_platform) {
721         return g_wreq->client_has_apps_for_proc_type[pt];
722     } else {
723         return ssp->have_apps_for_proc_type[pt];
724     }
725 }
726 
727 // return true if additional work is needed,
728 // and there's disk space left,
729 // and we haven't exceeded result per RPC limit,
730 // and we haven't exceeded results per day limit
731 //
work_needed(bool locality_sched)732 bool work_needed(bool locality_sched) {
733     if (locality_sched) {
734         // if we've failed to send a result because of a transient condition,
735         // return false to preserve invariant
736         //
737         if (g_wreq->disk.insufficient) {
738             if (config.debug_send) {
739                 log_messages.printf(MSG_NORMAL,
740                     "[send] stopping work search - insufficient disk space\n"
741                 );
742             }
743             return false;
744         }
745         if (g_wreq->speed.insufficient) {
746             if (config.debug_send) {
747                 log_messages.printf(MSG_NORMAL,
748                     "[send] stopping work search - host too slow\n"
749                 );
750             }
751             return false;
752         }
753         if (g_wreq->mem.insufficient) {
754             if (config.debug_send) {
755                 log_messages.printf(MSG_NORMAL,
756                     "[send] stopping work search - insufficient memory\n"
757                 );
758             }
759             return false;
760         }
761         if (g_wreq->no_allowed_apps_available) {
762             if (config.debug_send) {
763                 log_messages.printf(MSG_NORMAL,
764                     "[send] stopping work search - no locality app selected\n"
765                 );
766             }
767             return false;
768         }
769     }
770 
771     // see if we've reached limits on in-progress jobs
772     //
773     bool some_type_allowed = false;
774 
775     for (int i=0; i<NPROC_TYPES; i++) {
776         if (!have_apps(i)) continue;
777 
778         // enforce project prefs limit on # of jobs in progress
779         //
780         bool proj_pref_exceeded = false;
781         int mj = g_wreq->project_prefs.max_jobs_in_progress;
782         if (mj) {
783             if (config.max_jobs_in_progress.project_limits.total.njobs >= mj) {
784                 proj_pref_exceeded = true;
785             }
786         }
787 
788         if (proj_pref_exceeded || config.max_jobs_in_progress.exceeded(NULL, i)) {
789             if (config.debug_quota) {
790                 log_messages.printf(MSG_NORMAL,
791                     "[quota] reached limit on %s jobs in progress\n",
792                     proc_type_name(i)
793                 );
794                 config.max_jobs_in_progress.print_log();
795             }
796             g_wreq->clear_req(i);
797             g_wreq->max_jobs_on_host_proc_type_exceeded[i] = true;
798         } else {
799             some_type_allowed = true;
800         }
801     }
802 
803     if (!some_type_allowed) {
804         if (config.debug_send) {
805             log_messages.printf(MSG_NORMAL,
806                 "[send] in-progress job limit exceeded\n"
807             );
808         }
809         g_wreq->max_jobs_on_host_exceeded = true;
810         return false;
811     }
812 
813     // see if we've reached max jobs per RPC
814     //
815     if (g_wreq->njobs_sent >= g_wreq->max_jobs_per_rpc) {
816         if (config.debug_quota) {
817             log_messages.printf(MSG_NORMAL,
818                 "[quota] stopping work search - njobs %d >= max_jobs_per_rpc %d\n",
819                 g_wreq->njobs_sent, g_wreq->max_jobs_per_rpc
820             );
821         }
822         return false;
823     }
824 
825 #if 0
826     if (config.debug_send) {
827         char buf[256], buf2[256];
828         strcpy(buf, "");
829         for (int i=0; i<NPROC_TYPES; i++) {
830             sprintf(buf2, " %s (%.2f, %.2f)",
831                 proc_type_name(i),
832                 g_wreq->req_secs[i],
833                 g_wreq->req_instances[i]
834             );
835             strcat(buf, buf2);
836         }
837         log_messages.printf(MSG_NORMAL,
838             "[send] work_needed: spec req %d sec to fill %.2f; %s\n",
839             g_wreq->rsc_spec_request,
840             g_wreq->seconds_to_fill,
841             buf
842         );
843     }
844 #endif
845     if (g_wreq->rsc_spec_request) {
846         for (int i=0; i<NPROC_TYPES; i++) {
847             if (g_wreq->need_proc_type(i) && have_apps(i)) {
848                 return true;
849             }
850         }
851     } else {
852         if (g_wreq->seconds_to_fill > 0) {
853             return true;
854         }
855     }
856     if (config.debug_send) {
857         log_messages.printf(MSG_NORMAL, "[send] don't need more work\n");
858     }
859     return false;
860 }
861 
862 // return the app version ID, or -2/-3/-4 if anonymous platform
863 //
get_app_version_id(BEST_APP_VERSION * bavp)864 inline static DB_ID_TYPE get_app_version_id(BEST_APP_VERSION* bavp) {
865     if (bavp->avp) {
866         return bavp->avp->id;
867     } else {
868         return bavp->cavp->host_usage.resource_type();
869     }
870 }
871 
add_result_to_reply(SCHED_DB_RESULT & result,WORKUNIT & wu,BEST_APP_VERSION * bavp,bool locality_scheduling)872 int add_result_to_reply(
873     SCHED_DB_RESULT& result,
874     WORKUNIT& wu,
875     BEST_APP_VERSION* bavp,
876     bool locality_scheduling
877 ) {
878     int retval;
879     bool resent_result = false;
880     APP* app = ssp->lookup_app(wu.appid);
881 
882     result.hostid = g_reply->host.id;
883     result.userid = g_reply->user.id;
884     result.sent_time = time(0);
885     result.report_deadline = result.sent_time + wu.delay_bound;
886     result.flops_estimate = bavp->host_usage.peak_flops;
887     result.app_version_id = get_app_version_id(bavp);
888 
889     // update WU DB record.
890     // This can fail in normal operation
891     // (other scheduler already updated hr_class or app_version_id)
892     // so do it before updating the result.
893     //
894     retval = update_wu_on_send(
895         wu, result.report_deadline + config.report_grace_period, *app, *bavp
896     );
897     if (retval == ERR_DB_NOT_FOUND) {
898         log_messages.printf(MSG_NORMAL,
899             "add_result_to_reply: WU already sent to other HR class or app version\n"
900         );
901         return retval;
902     } else if (retval) {
903         log_messages.printf(MSG_CRITICAL,
904             "add_result_to_reply: WU update failed: %d\n",
905             retval
906         );
907         return retval;
908     }
909 
910     // update result DB record.
911     // This can also fail in normal operation.
912     // In this case, in principle we should undo
913     // the changes we just made to the WU (or use a transaction)
914     // but I don't think it actually matters.
915     //
916     int old_server_state = result.server_state;
917 
918     if (result.server_state != RESULT_SERVER_STATE_IN_PROGRESS) {
919         // We're sending this result for the first time
920         //
921         result.server_state = RESULT_SERVER_STATE_IN_PROGRESS;
922     } else {
923         // Result was already sent to this host but was lost,
924         // so we're resending it.
925         //
926         resent_result = true;
927 
928         if (config.debug_send) {
929             log_messages.printf(MSG_NORMAL,
930                 "[send] [RESULT#%lu] [HOST#%lu] (resend lost work)\n",
931                 result.id, g_reply->host.id
932             );
933         }
934     }
935     retval = result.mark_as_sent(old_server_state, config.report_grace_period);
936     if (retval == ERR_DB_NOT_FOUND) {
937         log_messages.printf(MSG_CRITICAL,
938             "[RESULT#%lu] [HOST#%lu]: CAN'T SEND, already sent to another host\n",
939             result.id, g_reply->host.id
940         );
941     } else if (retval) {
942         log_messages.printf(MSG_CRITICAL,
943             "add_result_to_reply: can't update result: %s\n", boincerror(retval)
944         );
945     }
946     if (retval) return retval;
947 
948     // done with DB updates.
949     //
950 
951     retval = add_wu_to_reply(wu, *g_reply, app, bavp);
952     if (retval) return retval;
953 
954     // Adjust available disk space.
955     // In the locality scheduling locality case,
956     // reduce the available space by less than the workunit rsc_disk_bound,
957     // if the host already has the file or the file was not already sent.
958     //
959     if (!locality_scheduling || decrement_disk_space_locality(wu)) {
960         g_wreq->disk_available -= wu.rsc_disk_bound;
961     }
962 
963     double est_dur = estimate_duration(wu, *bavp);
964     if (config.debug_send) {
965         double max_time = wu.rsc_fpops_bound / bavp->host_usage.projected_flops;
966         char buf1[64],buf2[64];
967         secs_to_hmsf(est_dur, buf1);
968         secs_to_hmsf(max_time, buf2);
969         log_messages.printf(MSG_NORMAL,
970             "[send] [HOST#%lu] sending [RESULT#%lu %s] (est. dur. %.2fs (%s)) (max time %.2fs (%s))\n",
971             g_reply->host.id, result.id, result.name, est_dur, buf1, max_time, buf2
972         );
973     }
974 
975     // The following overwrites the result's xml_doc field.
976     // But that's OK cuz we're done with DB updates
977     //
978     retval = insert_name_tags(result, wu);
979     if (retval) {
980         log_messages.printf(MSG_CRITICAL,
981             "add_result_to_reply: can't insert name tags: %d\n",
982             retval
983         );
984         return retval;
985     }
986     retval = insert_deadline_tag(result);
987     if (retval) {
988         log_messages.printf(MSG_CRITICAL,
989             "add_result_to_reply: can't insert deadline tag: %s\n", boincerror(retval)
990         );
991         return retval;
992     }
993     result.bav = *bavp;
994     g_reply->insert_result(result);
995     if (g_wreq->rsc_spec_request) {
996         int pt = bavp->host_usage.proc_type;
997         if (pt == PROC_TYPE_CPU) {
998             g_wreq->req_secs[PROC_TYPE_CPU] -= est_dur;
999             g_wreq->req_instances[PROC_TYPE_CPU] -= bavp->host_usage.avg_ncpus;
1000         } else {
1001             g_wreq->req_secs[pt] -= est_dur;
1002             g_wreq->req_instances[pt] -= bavp->host_usage.gpu_usage;
1003         }
1004     } else {
1005         g_wreq->seconds_to_fill -= est_dur;
1006     }
1007     update_estimated_delay(*bavp, est_dur);
1008     g_wreq->njobs_sent++;
1009     config.max_jobs_in_progress.register_job(app, bavp->host_usage.proc_type);
1010     if (!resent_result) {
1011         DB_HOST_APP_VERSION* havp = bavp->host_app_version();
1012         if (havp) {
1013             havp->n_jobs_today++;
1014         }
1015     }
1016 
1017     // add this result to workload for simulation
1018     //
1019     if (config.workload_sim && g_request->have_other_results_list) {
1020         IP_RESULT ipr ("", time(0)+wu.delay_bound, est_dur);
1021         g_request->ip_results.push_back(ipr);
1022     }
1023 
1024     // mark job as done if debugging flag is set;
1025     // this is used by sched_driver.C (performance testing)
1026     //
1027     if (mark_jobs_done) {
1028         DB_WORKUNIT dbwu;
1029         char buf[256];
1030         sprintf(buf,
1031             "server_state=%d outcome=%d",
1032             RESULT_SERVER_STATE_OVER, RESULT_OUTCOME_SUCCESS
1033         );
1034         result.update_field(buf);
1035 
1036         dbwu.id = wu.id;
1037         sprintf(buf, "transition_time=%ld", time(0));
1038         dbwu.update_field(buf);
1039 
1040     }
1041 
1042     // If we're sending an unreplicated job to an untrusted host,
1043     // mark it as replicated
1044     //
1045     if (wu.target_nresults == 1 && app->target_nresults > 1) {
1046         if (bavp->trusted) {
1047             if (config.debug_send) {
1048                 log_messages.printf(MSG_NORMAL,
1049                     "[send] [WU#%lu] using trusted app version, not replicating\n", wu.id
1050                 );
1051             }
1052         } else {
1053             DB_WORKUNIT dbwu;
1054             char buf[256];
1055             sprintf(buf,
1056                 "target_nresults=%d, min_quorum=%d, transition_time=%ld",
1057                 app->target_nresults, app->target_nresults, time(0)
1058             );
1059             dbwu.id = wu.id;
1060             if (config.debug_send) {
1061                 log_messages.printf(MSG_NORMAL,
1062                     "[send] [WU#%lu] sending to untrusted host, replicating\n", wu.id
1063                 );
1064             }
1065             retval = dbwu.update_field(buf);
1066             if (retval) {
1067                 log_messages.printf(MSG_CRITICAL,
1068                     "WU update failed: %s", boincerror(retval)
1069                 );
1070             }
1071         }
1072     }
1073 
1074     // if the app uses locality scheduling lite,
1075     // add the job's files to the list of those on host
1076     //
1077     if (app->locality_scheduling == LOCALITY_SCHED_LITE) {
1078         add_job_files_to_host(wu);
1079     }
1080 
1081     return 0;
1082 }
1083 
1084 // Send high-priority messages about things the user can change easily
1085 // (namely the driver version)
1086 // and low-priority messages about things that can't easily be changed,
1087 // but which may be interfering with getting tasks or latest apps
1088 //
send_gpu_property_messages(GPU_REQUIREMENTS & req,double ram,int version,const char * rsc_name)1089 static void send_gpu_property_messages(
1090     GPU_REQUIREMENTS& req, double ram, int version, const char* rsc_name
1091 ) {
1092     char buf[256];
1093     if (ram < req.min_ram) {
1094         sprintf(buf,
1095             "A minimum of %d MB (preferably %d MB) of video RAM is needed to process tasks using your computer's %s",
1096             (int) (req.min_ram/MEGA),
1097             (int) (req.opt_ram/MEGA),
1098             rsc_name
1099         );
1100         g_reply->insert_message(buf, "low");
1101     } else {
1102         if (version) {
1103             if (version < req.min_driver_version) {
1104                 sprintf(buf,
1105                     "%s: %s",
1106                     rsc_name,
1107                     _("Upgrade to the latest driver to process tasks using your computer's GPU")
1108                 );
1109                 g_reply->insert_message(buf, "notice");
1110             } else if (version < req.opt_driver_version) {
1111                 sprintf(buf,
1112                     "%s: %s",
1113                     rsc_name,
1114                     _("Upgrade to the latest driver to use all of this project's GPU applications")
1115                 );
1116                 g_reply->insert_message(buf, "low");
1117             }
1118         }
1119     }
1120 }
1121 
1122 // send messages complaining about lack of GPU or the properties of GPUs
1123 //
send_gpu_messages()1124 void send_gpu_messages() {
1125     // Mac client with GPU but too-old client
1126     //
1127     if (g_request->coprocs.nvidia.count
1128         && ssp->have_apps_for_proc_type[PROC_TYPE_NVIDIA_GPU]
1129         && strstr(g_request->host.os_name, "Darwin")
1130         && g_request->core_client_version < 61028
1131     ) {
1132         g_reply->insert_message(
1133             _("A newer version of BOINC is needed to use your NVIDIA GPU; please upgrade to the current version"),
1134             "notice"
1135         );
1136     }
1137 
1138     // GPU-only project, client lacks GPU
1139     //
1140     bool usable_gpu = false;
1141     bool have_gpu_apps = false;
1142     for (int i=1; i<NPROC_TYPES; i++) {
1143         if (ssp->have_apps_for_proc_type[i]) {
1144             have_gpu_apps = true;
1145             COPROC* cp = g_request->coprocs.proc_type_to_coproc(i);
1146             if (cp && cp->count) {
1147                 usable_gpu = true;
1148             }
1149         }
1150     }
1151     if (!ssp->have_apps_for_proc_type[PROC_TYPE_CPU]
1152         && have_gpu_apps
1153         && !usable_gpu
1154     ) {
1155         char buf[256];
1156         strcpy(buf, "");
1157         for (int i=1; i<NPROC_TYPES; i++) {
1158             if (ssp->have_apps_for_proc_type[i]) {
1159                 if (strlen(buf)) {
1160                     strcat(buf, " or ");
1161                 }
1162                 strcat(buf, proc_type_name(i));
1163             }
1164         }
1165         char msg[1024];
1166         sprintf(msg,
1167             _("An %s GPU is required to run tasks for this project"),
1168             buf
1169         );
1170         g_reply->insert_message(msg, "notice");
1171     }
1172 
1173     if (g_request->coprocs.nvidia.count && ssp->have_apps_for_proc_type[PROC_TYPE_NVIDIA_GPU]) {
1174         send_gpu_property_messages(gpu_requirements[PROC_TYPE_NVIDIA_GPU],
1175             g_request->coprocs.nvidia.prop.totalGlobalMem,
1176             g_request->coprocs.nvidia.display_driver_version,
1177             proc_type_name(PROC_TYPE_NVIDIA_GPU)
1178         );
1179     }
1180     if (g_request->coprocs.ati.count && ssp->have_apps_for_proc_type[PROC_TYPE_AMD_GPU]) {
1181         send_gpu_property_messages(gpu_requirements[PROC_TYPE_AMD_GPU],
1182             g_request->coprocs.ati.attribs.localRAM*MEGA,
1183             g_request->coprocs.ati.version_num,
1184             proc_type_name(PROC_TYPE_AMD_GPU)
1185         );
1186     }
1187     if (g_request->coprocs.intel_gpu.count && ssp->have_apps_for_proc_type[PROC_TYPE_INTEL_GPU]) {
1188         send_gpu_property_messages(gpu_requirements[PROC_TYPE_INTEL_GPU],
1189             g_request->coprocs.intel_gpu.opencl_prop.global_mem_size,
1190             0,
1191             proc_type_name(PROC_TYPE_INTEL_GPU)
1192         );
1193     }
1194 
1195 }
1196 
1197 // send messages to user about why jobs were or weren't sent,
1198 // recommendations for GPU driver upgrades, etc.
1199 //
send_user_messages()1200 static void send_user_messages() {
1201     char buf[512];
1202     unsigned int i;
1203     int j;
1204 
1205     // GPU messages aren't relevant if anonymous platform
1206     //
1207     if (!g_wreq->anonymous_platform) {
1208         send_gpu_messages();
1209     }
1210 
1211     // If work was sent from apps the user did not select, explain.
1212     // NOTE: this will have to be done differently with matchmaker scheduling
1213     //
1214     if (!config.locality_scheduling && !config.locality_scheduler_fraction && config.sched_old) {
1215         if (g_wreq->njobs_sent && !g_wreq->user_apps_only) {
1216             g_reply->insert_message(
1217                 "No tasks are available for the applications you have selected",
1218                 "low"
1219             );
1220 
1221             // Inform the user about applications with no work
1222             //
1223             for (i=0; i<g_wreq->project_prefs.selected_apps.size(); i++) {
1224                 if (!g_wreq->project_prefs.selected_apps[i].work_available) {
1225                     APP* app = ssp->lookup_app(g_wreq->project_prefs.selected_apps[i].appid);
1226                     // don't write message if the app is deprecated
1227                     //
1228                     if (app) {
1229                         char explanation[256];
1230                         sprintf(explanation,
1231                             "No tasks are available for %s",
1232                             find_user_friendly_name(g_wreq->project_prefs.selected_apps[i].appid)
1233                         );
1234                         g_reply->insert_message( explanation, "low");
1235                     }
1236                 }
1237             }
1238 
1239             // Tell the user about applications they didn't qualify for
1240             //
1241             for (j=0; j<selected_app_message_index; j++){
1242                 g_reply->insert_message(g_wreq->no_work_messages.at(j));
1243             }
1244             g_reply->insert_message(
1245                 "Your preferences allow tasks from applications other than those selected",
1246                 "low"
1247             );
1248             g_reply->insert_message(
1249                 "Sending tasks from other applications", "low"
1250             );
1251         }
1252     }
1253 
1254     // if client asked for work and we're not sending any, explain why
1255     //
1256     if (g_wreq->njobs_sent == 0 && g_request->work_req_seconds) {
1257         g_reply->set_delay(DELAY_NO_WORK_TEMP);
1258         g_reply->insert_message("No tasks sent", "low");
1259 
1260         // Tell the user about applications with no work
1261         //
1262         for (i=0; i<g_wreq->project_prefs.selected_apps.size(); i++) {
1263             if (!g_wreq->project_prefs.selected_apps[i].work_available) {
1264                 APP* app = ssp->lookup_app(g_wreq->project_prefs.selected_apps[i].appid);
1265                 // don't write message if the app is deprecated
1266                 if (app != NULL) {
1267                     sprintf(buf, "No tasks are available for %s",
1268                         find_user_friendly_name(
1269                             g_wreq->project_prefs.selected_apps[i].appid
1270                         )
1271                     );
1272                     g_reply->insert_message(buf, "low");
1273                 }
1274             }
1275         }
1276 
1277         for (i=0; i<g_wreq->no_work_messages.size(); i++){
1278             g_reply->insert_message(g_wreq->no_work_messages.at(i));
1279         }
1280 
1281         if (g_wreq->no_allowed_apps_available) {
1282             g_reply->insert_message(
1283                 _("No tasks are available for the applications you have selected."),
1284                 "low"
1285             );
1286         }
1287         if (g_wreq->speed.insufficient) {
1288             if (g_request->core_client_version>41900) {
1289                 sprintf(buf,
1290                     "Tasks won't finish in time: BOINC runs %.1f%% of the time; computation is enabled %.1f%% of that",
1291                     100*g_reply->host.on_frac, 100*g_reply->host.active_frac
1292                 );
1293             } else {
1294                 sprintf(buf,
1295                     "Tasks won't finish in time: Computer available %.1f%% of the time",
1296                     100*g_reply->host.on_frac
1297                 );
1298             }
1299             g_reply->insert_message(buf, "low");
1300         }
1301         if (g_wreq->hr_reject_temp) {
1302             g_reply->insert_message(
1303                 "Tasks are committed to other platforms",
1304                 "low"
1305             );
1306         }
1307         if (g_wreq->hr_reject_perm) {
1308             g_reply->insert_message(
1309                 _("Your computer type is not supported by this project"),
1310                 "notice"
1311             );
1312         }
1313         if (g_wreq->outdated_client) {
1314             g_reply->insert_message(
1315                 _("Newer BOINC version required; please install current version"),
1316                 "notice"
1317             );
1318             g_reply->set_delay(DELAY_NO_WORK_PERM);
1319             log_messages.printf(MSG_NORMAL,
1320                 "Not sending tasks because newer client version required\n"
1321             );
1322         }
1323         for (i=0; i<NPROC_TYPES; i++) {
1324             if (g_wreq->project_prefs.dont_use_proc_type[i] && ssp->have_apps_for_proc_type[i]) {
1325                 sprintf(buf,
1326                     _("Tasks for %s are available, but your preferences are set to not accept them"),
1327                     proc_type_name(i)
1328                 );
1329                 g_reply->insert_message(buf, "low");
1330             }
1331         }
1332         DB_HOST_APP_VERSION* havp = quota_exceeded_version();
1333         if (havp) {
1334             sprintf(buf, "This computer has finished a daily quota of %d tasks",
1335                 havp->max_jobs_per_day
1336             );
1337             g_reply->insert_message(buf, "low");
1338             if (config.debug_quota) {
1339                 log_messages.printf(MSG_NORMAL,
1340                     "[quota] Daily quota %d exceeded for app version %lu\n",
1341                     havp->max_jobs_per_day, havp->app_version_id
1342                 );
1343             }
1344             g_reply->set_delay(DELAY_NO_WORK_CACHE);
1345         }
1346         if (g_wreq->max_jobs_exceeded()) {
1347             sprintf(buf, "This computer has reached a limit on tasks in progress");
1348             g_reply->insert_message(buf, "low");
1349             g_reply->set_delay(DELAY_NO_WORK_CACHE);
1350         }
1351     }
1352 }
1353 
clamp_req_sec(double x)1354 static double clamp_req_sec(double x) {
1355     if (x < MIN_REQ_SECS) return MIN_REQ_SECS;
1356     if (x > MAX_REQ_SECS) return MAX_REQ_SECS;
1357     return x;
1358 }
1359 
1360 // prepare to send jobs, both resent and new;
1361 // decipher request type, fill in WORK_REQ
1362 //
send_work_setup()1363 void send_work_setup() {
1364     unsigned int i;
1365 
1366     g_wreq->seconds_to_fill = clamp_req_sec(g_request->work_req_seconds);
1367     g_wreq->req_secs[PROC_TYPE_CPU] = clamp_req_sec(g_request->cpu_req_secs);
1368     g_wreq->req_instances[PROC_TYPE_CPU] = g_request->cpu_req_instances;
1369     g_wreq->anonymous_platform = is_anonymous(g_request->platforms.list[0]);
1370 
1371     // decide on attributes of HOST_APP_VERSIONS
1372     //
1373     get_reliability_and_trust();
1374 
1375     // parse project preferences (e.g. no GPUs)
1376     //
1377     g_wreq->project_prefs.parse();
1378 
1379     if (g_wreq->anonymous_platform) {
1380         estimate_flops_anon_platform();
1381 
1382         for (i=0; i<NPROC_TYPES; i++) {
1383             g_wreq->client_has_apps_for_proc_type[i] = false;
1384         }
1385         for (i=0; i<g_request->client_app_versions.size(); i++) {
1386             CLIENT_APP_VERSION& cav = g_request->client_app_versions[i];
1387             int pt = cav.host_usage.proc_type;
1388             g_wreq->client_has_apps_for_proc_type[pt] = true;
1389         }
1390     }
1391     for (i=1; i<NPROC_TYPES; i++) {
1392         gpu_requirements[i].clear();
1393     }
1394 
1395     g_wreq->disk_available = max_allowable_disk();
1396     get_mem_sizes();
1397     get_available_fracs();
1398     g_wreq->get_job_limits();
1399 
1400     // do sanity checking on GPU scheduling parameters
1401     //
1402     for (i=1; i<NPROC_TYPES; i++) {
1403         COPROC* cp = g_request->coprocs.proc_type_to_coproc(i);
1404         if (cp && cp->count) {
1405             g_wreq->req_secs[i] = clamp_req_sec(cp->req_secs);
1406             g_wreq->req_instances[i] = cp->req_instances;
1407             if (cp->estimated_delay < 0) {
1408                 cp->estimated_delay = g_request->cpu_estimated_delay;
1409             }
1410         }
1411     }
1412     g_wreq->rsc_spec_request = false;
1413     for (i=0; i<NPROC_TYPES; i++) {
1414         if (g_wreq->req_secs[i]) {
1415             g_wreq->rsc_spec_request = true;
1416             break;
1417         }
1418     }
1419 
1420     for (i=0; i<g_request->other_results.size(); i++) {
1421         OTHER_RESULT& r = g_request->other_results[i];
1422         APP* app = NULL;
1423         int proc_type = PROC_TYPE_CPU;
1424         bool have_cav = false;
1425         if (r.app_version >= 0
1426             && r.app_version < (int)g_request->client_app_versions.size()
1427         ) {
1428             CLIENT_APP_VERSION& cav = g_request->client_app_versions[r.app_version];
1429             app = cav.app;
1430             if (app) {
1431                 have_cav = true;
1432                 proc_type = cav.host_usage.proc_type;
1433             }
1434         }
1435         if (!have_cav) {
1436             if (r.have_plan_class) {
1437                 proc_type = plan_class_to_proc_type(r.plan_class);
1438             }
1439         }
1440         config.max_jobs_in_progress.register_job(app, proc_type);
1441     }
1442 
1443     // print details of request to log
1444     //
1445     if (config.debug_quota) {
1446         log_messages.printf(MSG_NORMAL,
1447             "[quota] max jobs per RPC: %d\n", g_wreq->max_jobs_per_rpc
1448         );
1449         config.max_jobs_in_progress.print_log();
1450     }
1451     if (config.debug_send) {
1452         log_messages.printf(MSG_NORMAL,
1453             "[send] %s old scheduling; %s EDF sim\n",
1454             config.sched_old?"Using":"Not using",
1455             config.workload_sim?"Using":"Not using"
1456         );
1457         log_messages.printf(MSG_NORMAL,
1458             "[send] CPU: req %.2f sec, %.2f instances; est delay %.2f\n",
1459             g_wreq->req_secs[PROC_TYPE_CPU],
1460             g_wreq->req_instances[PROC_TYPE_CPU],
1461             g_request->cpu_estimated_delay
1462         );
1463         for (i=1; i<NPROC_TYPES; i++) {
1464             COPROC* cp = g_request->coprocs.proc_type_to_coproc(i);
1465             if (cp && cp->count) {
1466                 log_messages.printf(MSG_NORMAL,
1467                     "[send] %s: req %.2f sec, %.2f instances; est delay %.2f\n",
1468                     proc_type_name(i),
1469                     g_wreq->req_secs[i],
1470                     g_wreq->req_instances[i],
1471                     cp->estimated_delay
1472                 );
1473             }
1474         }
1475         log_messages.printf(MSG_NORMAL,
1476             "[send] work_req_seconds: %.2f secs\n",
1477             g_wreq->seconds_to_fill
1478         );
1479         log_messages.printf(MSG_NORMAL,
1480             "[send] available disk %.2f GB, work_buf_min %d\n",
1481             g_wreq->disk_available/GIGA,
1482             (int)g_request->global_prefs.work_buf_min()
1483         );
1484         log_messages.printf(MSG_NORMAL,
1485             "[send] on_frac %f active_frac %f gpu_active_frac %f\n",
1486             g_reply->host.on_frac,
1487             g_reply->host.active_frac,
1488             g_reply->host.gpu_active_frac
1489         );
1490         if (g_wreq->anonymous_platform) {
1491             log_messages.printf(MSG_NORMAL,
1492                 "[send] Anonymous platform app versions:\n"
1493             );
1494             for (i=0; i<g_request->client_app_versions.size(); i++) {
1495                 CLIENT_APP_VERSION& cav = g_request->client_app_versions[i];
1496                 char buf[256];
1497                 strcpy(buf, "");
1498                 int pt = cav.host_usage.proc_type;
1499                 if (pt) {
1500                     sprintf(buf, " %.2f %s GPU",
1501                         cav.host_usage.gpu_usage,
1502                         proc_type_name(pt)
1503                     );
1504                 }
1505 
1506                 log_messages.printf(MSG_NORMAL,
1507                     "   app: %s version %d cpus %.2f%s flops %fG\n",
1508                     cav.app_name,
1509                     cav.version_num,
1510                     cav.host_usage.avg_ncpus,
1511                     buf,
1512                     cav.host_usage.projected_flops/1e9
1513                 );
1514             }
1515         }
1516 #if 0
1517         log_messages.printf(MSG_NORMAL,
1518             "[send] p_vm_extensions_disabled: %s\n",
1519             g_request->host.p_vm_extensions_disabled?"yes":"no"
1520         );
1521 #endif
1522         log_messages.printf(MSG_NORMAL,
1523             "[send] CPU features: %s\n", g_request->host.p_features
1524         );
1525     }
1526 }
1527 
1528 // If a record is not in DB, create it.
1529 //
update_host_app_versions(vector<SCHED_DB_RESULT> & results,int hostid)1530 int update_host_app_versions(vector<SCHED_DB_RESULT>& results, int hostid) {
1531     vector<DB_HOST_APP_VERSION> new_havs;
1532     unsigned int i, j;
1533     int retval;
1534 
1535     for (i=0; i<results.size(); i++) {
1536         RESULT& r = results[i];
1537         int gavid = generalized_app_version_id(r.app_version_id, r.appid);
1538         DB_HOST_APP_VERSION* havp = gavid_to_havp(gavid);
1539         if (!havp) {
1540             bool found = false;
1541             for (j=0; j<new_havs.size(); j++) {
1542                 DB_HOST_APP_VERSION& hav = new_havs[j];
1543                 if (hav.app_version_id == gavid) {
1544                     found = true;
1545                     hav.n_jobs_today++;
1546                 }
1547             }
1548             if (!found) {
1549                 DB_HOST_APP_VERSION hav;
1550                 hav.clear();
1551                 hav.host_id = hostid;
1552                 hav.app_version_id = gavid;
1553                 hav.n_jobs_today = 1;
1554                 new_havs.push_back(hav);
1555             }
1556         }
1557     }
1558 
1559     // create new records
1560     //
1561     for (i=0; i<new_havs.size(); i++) {
1562         DB_HOST_APP_VERSION& hav = new_havs[i];
1563 
1564         retval = hav.insert();
1565         if (retval) {
1566             log_messages.printf(MSG_CRITICAL,
1567                 "hav.insert(): %s\n", boincerror(retval)
1568             );
1569         } else {
1570             if (config.debug_credit) {
1571                 log_messages.printf(MSG_NORMAL,
1572                     "[credit] created host_app_version record (%lu, %lu)\n",
1573                     hav.host_id, hav.app_version_id
1574                 );
1575             }
1576         }
1577     }
1578     return 0;
1579 }
1580 
send_work()1581 void send_work() {
1582     int retval;
1583 
1584     if (all_apps_use_hr && hr_unknown_platform(g_request->host)) {
1585         log_messages.printf(MSG_NORMAL,
1586             "Not sending work because unknown HR class\n"
1587         );
1588         g_wreq->hr_reject_perm = true;
1589         return;
1590     }
1591 
1592     if (config.enable_assignment) {
1593         if (send_targeted_jobs()) {
1594             if (config.debug_assignment) {
1595                 log_messages.printf(MSG_NORMAL,
1596                     "[assign] [HOST#%lu] sent assigned jobs\n", g_reply->host.id
1597                 );
1598             }
1599             goto done;
1600         }
1601     }
1602 
1603     if (config.enable_assignment_multi) {
1604         if (send_broadcast_jobs()) {
1605             if (config.debug_assignment) {
1606                 log_messages.printf(MSG_NORMAL,
1607                     "[assign] [HOST#%lu] sent assigned jobs\n", g_reply->host.id
1608                 );
1609             }
1610             goto done;
1611         }
1612     }
1613 
1614     if (config.workload_sim && g_request->have_other_results_list) {
1615         init_ip_results(
1616             g_request->global_prefs.work_buf_min(),
1617             g_wreq->effective_ncpus, g_request->ip_results
1618         );
1619     }
1620 
1621     // send non-CPU-intensive jobs if needed
1622     //
1623     if (ssp->have_nci_app) {
1624         send_nci();
1625     }
1626 
1627     if (!work_needed(false)) {
1628         goto done;
1629     }
1630 
1631     if (config.locality_scheduler_fraction > 0) {
1632         if (drand() < config.locality_scheduler_fraction) {
1633             if (config.debug_locality) {
1634                 log_messages.printf(MSG_NORMAL,
1635                     "[mixed] sending locality work first\n"
1636                 );
1637             }
1638             send_work_locality();
1639 
1640             // save 'insufficient' flags from the first scheduler
1641             bool disk_insufficient  = g_wreq->disk.insufficient;
1642             bool speed_insufficient = g_wreq->speed.insufficient;
1643             bool mem_insufficient   = g_wreq->mem.insufficient;
1644             bool no_allowed_apps_available = g_wreq->no_allowed_apps_available;
1645 
1646             // reset 'insufficient' flags for the second scheduler
1647             g_wreq->disk.insufficient = false;
1648             g_wreq->speed.insufficient = false;
1649             g_wreq->mem.insufficient = false;
1650             g_wreq->no_allowed_apps_available = false;
1651 
1652             if (config.debug_locality) {
1653                 log_messages.printf(MSG_NORMAL,
1654                     "[mixed] sending non-locality work second\n"
1655                 );
1656             }
1657             send_work_old();
1658 
1659             // recombine the 'insufficient' flags from the two schedulers
1660             g_wreq->disk.insufficient  = g_wreq->disk.insufficient && disk_insufficient;
1661             g_wreq->speed.insufficient = g_wreq->speed.insufficient && speed_insufficient;
1662             g_wreq->mem.insufficient   = g_wreq->mem.insufficient && mem_insufficient;
1663             g_wreq->no_allowed_apps_available = g_wreq->no_allowed_apps_available && no_allowed_apps_available;
1664 
1665         } else {
1666             if (config.debug_locality) {
1667                 log_messages.printf(MSG_NORMAL,
1668                     "[mixed] sending non-locality work first\n"
1669                 );
1670             }
1671 
1672             // save 'insufficient' flags from the first scheduler
1673             bool disk_insufficient  = g_wreq->disk.insufficient;
1674             bool speed_insufficient = g_wreq->speed.insufficient;
1675             bool mem_insufficient   = g_wreq->mem.insufficient;
1676             bool no_allowed_apps_available = g_wreq->no_allowed_apps_available;
1677 
1678             // reset 'insufficient' flags for the second scheduler
1679             g_wreq->disk.insufficient = false;
1680             g_wreq->speed.insufficient = false;
1681             g_wreq->mem.insufficient = false;
1682             g_wreq->no_allowed_apps_available = false;
1683 
1684             send_work_old();
1685             if (config.debug_locality) {
1686                 log_messages.printf(MSG_NORMAL,
1687                     "[mixed] sending locality work second\n"
1688                 );
1689             }
1690             send_work_locality();
1691 
1692             // recombine the 'insufficient' flags from the two schedulers
1693             g_wreq->disk.insufficient  = g_wreq->disk.insufficient && disk_insufficient;
1694             g_wreq->speed.insufficient = g_wreq->speed.insufficient && speed_insufficient;
1695             g_wreq->mem.insufficient   = g_wreq->mem.insufficient && mem_insufficient;
1696             g_wreq->no_allowed_apps_available = g_wreq->no_allowed_apps_available && no_allowed_apps_available;
1697 
1698         }
1699     } else if (config.locality_scheduling) {
1700         send_work_locality();
1701     } else if (config.sched_old) {
1702         send_work_old();
1703     } else {
1704         send_work_score();
1705     }
1706 
1707 done:
1708     retval = update_host_app_versions(g_reply->results, g_reply->host.id);
1709     if (retval) {
1710         log_messages.printf(MSG_CRITICAL,
1711             "update_host_app_versions() failed: %s\n", boincerror(retval)
1712         );
1713     }
1714     send_user_messages();
1715 }
1716 
// Version-control ID string for this file
// (presumably expanded by the revision control system's keyword substitution)
//
const char* BOINC_RCSID_32dcd335e7 = "$Id$";
1718