1 // This file is part of BOINC.
2 // http://boinc.berkeley.edu
3 // Copyright (C) 2008 University of California
4 //
5 // BOINC is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Lesser General Public License
7 // as published by the Free Software Foundation,
8 // either version 3 of the License, or (at your option) any later version.
9 //
10 // BOINC is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
13 // See the GNU Lesser General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with BOINC.  If not, see <http://www.gnu.org/licenses/>.
17 
18 // High-level logic for communicating with scheduling servers,
19 // and for merging the result of a scheduler RPC into the client state
20 
21 // The scheduler RPC mechanism is in scheduler_op.C
22 
23 #include "cpp.h"
24 
25 #ifdef _WIN32
26 #include "boinc_win.h"
27 #else
28 #include "config.h"
29 #include <cstdio>
30 #include <cmath>
31 #include <ctime>
32 #include <cstring>
33 #include <map>
34 #include <set>
35 #endif
36 
37 #ifdef _MSC_VER
38 #define snprintf _snprintf
39 #endif
40 
41 #include "crypt.h"
42 #include "error_numbers.h"
43 #include "file_names.h"
44 #include "filesys.h"
45 #include "parse.h"
46 #include "str_util.h"
47 #include "str_replace.h"
48 #include "url.h"
49 #include "util.h"
50 
51 #include "client_msgs.h"
52 #include "cs_notice.h"
53 #include "cs_trickle.h"
54 #include "project.h"
55 #include "result.h"
56 #include "scheduler_op.h"
57 #include "sandbox.h"
58 
59 #include "client_state.h"
60 
61 using std::max;
62 using std::vector;
63 using std::string;
64 
65 // quantities like avg CPU time decay by a factor of e every week
66 //
67 #define EXP_DECAY_RATE  (1./(SECONDS_PER_DAY*7))
68 
69 // try to report results this much before their deadline
70 //
71 #define REPORT_DEADLINE_CUSHION ((double)SECONDS_PER_DAY)
72 
73 // report results within this time after completion
74 //
75 #define MAX_REPORT_DELAY    3600
76 
77 #ifndef SIM
78 
79 // Write a scheduler request to a disk file,
80 // to be sent to a scheduling server
81 //
// Write a scheduler RPC request for project p to its request file,
// as an XML <scheduler_request> document.
// The request includes: authenticator and RPC sequence info,
// client version and capabilities, global/project preferences,
// cross-project ID, host/disk/network state, work-fetch amounts,
// completed results, trickle-up messages, sticky files,
// app version descriptions, and (optionally) in-progress job summaries.
// Returns 0 on success, ERR_FOPEN if the request file can't be created.
//
int CLIENT_STATE::make_scheduler_request(PROJECT* p) {
    char buf[1024];
    MIOFILE mf;
    unsigned int i;
    RESULT* rp;

    get_sched_request_filename(*p, buf, sizeof(buf));
    FILE* f = boinc_fopen(buf, "wb");
    if (!f) return ERR_FOPEN;

    // compute this project's fraction of total, runnable, and
    // potentially-runnable resource shares.
    // Fall back to 1 if the denominator is zero
    // (e.g. all projects have zero resource share).
    //
    double trs = total_resource_share();
    double rrs = runnable_resource_share(RSC_TYPE_ANY);
    double prrs = potentially_runnable_resource_share();
    double resource_share_fraction, rrs_fraction, prrs_fraction;
    if (trs) {
        resource_share_fraction = p->resource_share / trs;
    } else {
        resource_share_fraction = 1;
    }
    if (rrs) {
        rrs_fraction = p->resource_share / rrs;
    } else {
        rrs_fraction = 1;
    }
    if (prrs) {
        prrs_fraction = p->resource_share / prrs;
    } else {
        prrs_fraction = 1;
    }

    // if hostid is zero, rpc_seqno better be also
    //
    if (!p->hostid) {
        p->rpc_seqno = 0;
    }

    mf.init_file(f);

    // opening tag and scalar request fields
    //
    fprintf(f,
        "<scheduler_request>\n"
        "    <authenticator>%s</authenticator>\n"
        "    <hostid>%d</hostid>\n"
        "    <rpc_seqno>%d</rpc_seqno>\n"
        "    <core_client_major_version>%d</core_client_major_version>\n"
        "    <core_client_minor_version>%d</core_client_minor_version>\n"
        "    <core_client_release>%d</core_client_release>\n"
        "    <resource_share_fraction>%f</resource_share_fraction>\n"
        "    <rrs_fraction>%f</rrs_fraction>\n"
        "    <prrs_fraction>%f</prrs_fraction>\n"
        "    <duration_correction_factor>%f</duration_correction_factor>\n"
        "    <allow_multiple_clients>%d</allow_multiple_clients>\n"
        "    <sandbox>%d</sandbox>\n"
        "    <dont_send_work>%d</dont_send_work>\n",
        p->authenticator,
        p->hostid,
        p->rpc_seqno,
        core_client_version.major,
        core_client_version.minor,
        core_client_version.release,
        resource_share_fraction,
        rrs_fraction,
        prrs_fraction,
        p->duration_correction_factor,
        cc_config.allow_multiple_clients?1:0,
        g_use_sandbox?1:0,
        p->dont_request_more_work?1:0
    );

    // per-resource work request amounts
    //
    work_fetch.write_request(f, p);

    // write client capabilities
    //
    fprintf(f,
        "    <client_cap_plan_class>1</client_cap_plan_class>\n"
    );

    write_platforms(p, mf);

    // code-signing key we currently have for this project, if any
    //
    if (strlen(p->code_sign_key)) {
        fprintf(f, "    <code_sign_key>\n%s\n</code_sign_key>\n", p->code_sign_key);
    }

    // send working prefs
    //
    fprintf(f, "<working_global_preferences>\n");
    global_prefs.write(mf);
    fprintf(f, "</working_global_preferences>\n");

    // send master global preferences if present and not host-specific
    //
    if (!global_prefs.host_specific && boinc_file_exists(GLOBAL_PREFS_FILE_NAME)) {
        FILE* fprefs = fopen(GLOBAL_PREFS_FILE_NAME, "r");
        if (fprefs) {
            copy_stream(fprefs, f);
            fclose(fprefs);
        }
        PROJECT* pp = lookup_project(global_prefs.source_project);
        if (pp && strlen(pp->email_hash)) {
            fprintf(f,
                "<global_prefs_source_email_hash>%s</global_prefs_source_email_hash>\n",
                pp->email_hash
            );
        }
    }

    // Of the projects with same email hash as this one,
    // send the oldest cross-project ID.
    // Use project URL as tie-breaker.
    //
    PROJECT* winner = p;
    for (i=0; i<projects.size(); i++ ) {
        PROJECT* project = projects[i];
        if (project == p) continue;
        if (strcmp(project->email_hash, p->email_hash)) continue;
        if (project->cpid_time < winner->cpid_time) {
            winner = project;
        } else if (project->cpid_time == winner->cpid_time) {
            if (strcmp(project->master_url, winner->master_url) < 0) {
                winner = project;
            }
        }
    }
    fprintf(f,
        "<cross_project_id>%s</cross_project_id>\n",
        winner->cross_project_id
    );

    // time stats, network stats, and (if a quota period is set)
    // daily transfer history
    //
    time_stats.write(mf, true);
    net_stats.write(mf);
    if (global_prefs.daily_xfer_period_days) {
        daily_xfer_history.write_scheduler_request(
            mf, global_prefs.daily_xfer_period_days
        );
    }

    // update hardware info, and write host info
    //
    host_info.get_host_info(false);
    set_ncpus();
    host_info.write(mf, !cc_config.suppress_net_info, false);

    // get and write disk usage
    //
    get_disk_usages();
    get_disk_shares();
    fprintf(f,
        "    <disk_usage>\n"
        "        <d_boinc_used_total>%f</d_boinc_used_total>\n"
        "        <d_boinc_used_project>%f</d_boinc_used_project>\n"
        "        <d_project_share>%f</d_project_share>\n"
        "    </disk_usage>\n",
        total_disk_usage, p->disk_usage, p->disk_share
    );

    // coprocessor descriptions (only if there's something besides CPU)
    //
    if (coprocs.n_rsc > 1) {
        work_fetch.copy_requests();
        coprocs.write_xml(mf, true);
    }

    // report completed jobs
    // If cc_config.max_tasks_reported caps the number reported,
    // last_reported_index records where we stopped, so the
    // <other_results> section below can include the remainder.
    // NOTE(review): if the cap is hit at index 0, last_reported_index
    // stays 0 and the filter below won't list later ready-to-report
    // results as "other" — confirm this edge case is intended.
    //
    unsigned int last_reported_index = 0;
    p->nresults_returned = 0;
    for (i=0; i<results.size(); i++) {
        rp = results[i];
        if (rp->project == p && rp->ready_to_report) {
            p->nresults_returned++;
            rp->write(mf, true);
        }
        if (cc_config.max_tasks_reported
            && (p->nresults_returned >= cc_config.max_tasks_reported)
        ) {
            last_reported_index = i;
            break;
        }
    }

    read_trickle_files(p, f);

    // report sticky files as needed
    //
    for (i=0; i<file_infos.size(); i++) {
        FILE_INFO* fip = file_infos[i];
        if (fip->project != p) continue;
        if (!fip->sticky) continue;
        fprintf(f,
            "    <file_info>\n"
            "        <name>%s</name>\n"
            "        <nbytes>%f</nbytes>\n"
            "        <status>%d</status>\n"
            "    </file_info>\n",
            fip->name, fip->nbytes, fip->status
        );
    }

    // time-stats and job logs, if the project asked for entries
    // after a given time
    //
    if (p->send_time_stats_log) {
        fprintf(f, "<time_stats_log>\n");
        time_stats.get_log_after(p->send_time_stats_log, mf);
        fprintf(f, "</time_stats_log>\n");
    }
    if (p->send_job_log) {
        fprintf(f, "<job_log>\n");
        job_log_filename(*p, buf, sizeof(buf));
        send_log_after(buf, p->send_job_log, mf);
        fprintf(f, "</job_log>\n");
    }

    // send descriptions of app versions
    // Each gets a per-request index, referenced by <other_result> below.
    //
    fprintf(f, "<app_versions>\n");
    int j=0;
    for (i=0; i<app_versions.size(); i++) {
        APP_VERSION* avp = app_versions[i];
        if (avp->project != p) continue;
        avp->write(mf, false);
        avp->index = j++;
    }
    fprintf(f, "</app_versions>\n");

    // send descriptions of jobs in progress for this project
    // (those not reported above, or not ready to report)
    //
    fprintf(f, "<other_results>\n");
    for (i=0; i<results.size(); i++) {
        rp = results[i];
        if (rp->project != p) continue;
        if ((last_reported_index && (i > last_reported_index)) || !rp->ready_to_report) {
            fprintf(f,
                "    <other_result>\n"
                "        <name>%s</name>\n"
                "        <app_version>%d</app_version>\n",
                rp->name,
                rp->avp->index
            );
            // the following is for backwards compatibility w/ old schedulers
            //
            if (strlen(rp->avp->plan_class)) {
                fprintf(f,
                    "        <plan_class>%s</plan_class>\n",
                    rp->avp->plan_class
                );
            }
            fprintf(f,
                "    </other_result>\n"
            );
        }
    }
    fprintf(f, "</other_results>\n");

    // if requested by project, send summary of all in-progress results
    // (for EDF simulation by scheduler)
    //
    if (p->send_full_workload) {
        fprintf(f, "<in_progress_results>\n");
        for (i=0; i<results.size(); i++) {
            rp = results[i];
            double x = rp->estimated_runtime_remaining();
            if (x == 0) continue;
            // buf gets an optional GPU-usage element, chosen by the
            // app version's GPU resource type
            //
            safe_strcpy(buf, "");
            int rt = rp->avp->gpu_usage.rsc_type;
            if (rt) {
                if (rt == rsc_index(GPU_TYPE_NVIDIA)) {
                    snprintf(buf, sizeof(buf),
                        "        <ncudas>%f</ncudas>\n",
                        rp->avp->gpu_usage.usage
                    );
                } else if (rt == rsc_index(GPU_TYPE_ATI)) {
                    snprintf(buf, sizeof(buf),
                        "        <natis>%f</natis>\n",
                        rp->avp->gpu_usage.usage
                    );
                } else if (rt == rsc_index(GPU_TYPE_INTEL)) {
                    snprintf(buf, sizeof(buf),
                        "        <nintel_gpus>%f</nintel_gpus>\n",
                        rp->avp->gpu_usage.usage
                    );
                }
            }
            fprintf(f,
                "    <ip_result>\n"
                "        <name>%s</name>\n"
                "        <report_deadline>%.0f</report_deadline>\n"
                "        <time_remaining>%.2f</time_remaining>\n"
                "        <avg_ncpus>%f</avg_ncpus>\n"
                "%s"
                "    </ip_result>\n",
                rp->name,
                rp->report_deadline,
                x,
                rp->avp->avg_ncpus,
                buf
            );
        }
        fprintf(f, "</in_progress_results>\n");
    }

    // pass through opaque data supplied by a local file, if present
    //
    FILE* cof = boinc_fopen(CLIENT_OPAQUE_FILENAME, "r");
    if (cof) {
        fprintf(f, "<client_opaque>\n<![CDATA[\n");
        copy_stream(cof, f);
        fprintf(f, "\n]]>\n</client_opaque>\n");
        fclose(cof);
    }

    if (strlen(client_brand)) {
        fprintf(f, "    <client_brand>%s</client_brand>\n", client_brand);
    }

    // opaque data from the account manager, if any
    //
    if (acct_mgr_info.using_am() && !acct_mgr_info.sched_req_opaque.empty()) {
        fprintf(f, "<am_opaque>\n<![CDATA[\n");
        fprintf(f, "%s", acct_mgr_info.sched_req_opaque.c_str());
        fprintf(f, "\n]]>\n</am_opaque>\n");
    }

    fprintf(f, "</scheduler_request>\n");

    fclose(f);
    return 0;
}
397 
398 // the project is uploading, and it started recently
399 //
actively_uploading(PROJECT * p)400 static inline bool actively_uploading(PROJECT* p) {
401     for (unsigned int i=0; i<gstate.file_xfers->file_xfers.size(); i++) {
402         FILE_XFER* fxp = gstate.file_xfers->file_xfers[i];
403         if (fxp->fip->project != p) continue;
404         if (!fxp->is_upload) continue;
405         if (gstate.now - fxp->start_time > WF_UPLOAD_DEFER_INTERVAL) continue;
406         //msg_printf(p, MSG_INFO, "actively uploading");
407         return true;
408     }
409     //msg_printf(p, MSG_INFO, "not actively uploading");
410     return false;
411 }
412 
413 // If there is a request for an idle instance, return true.
414 // Clear other requests
415 //
idle_request()416 static inline bool idle_request() {
417     bool found = false;
418     for (int i=0; i<coprocs.n_rsc; i++) {
419         RSC_WORK_FETCH &rwf = rsc_work_fetch[i];
420         if (rwf.req_instances) {
421             found = true;
422         } else {
423             rwf.req_secs = 0;
424         }
425     }
426     return found;
427 }
428 
429 // Called once/sec.
430 // Initiate scheduler RPC activity if needed and possible
431 //
// Called once/sec.
// Initiate scheduler RPC activity if needed and possible.
// Checks, in priority order:
//   1) an RPC already in progress (poll it);
//   2) pending master-file fetches;
//   3) projects with an explicitly pending RPC (user/project requested);
//   4) projects with pending trickle-up messages;
//   5) projects with overdue results to report;
//   6) work fetch.
// Returns true if an RPC was started or just completed
// (callers use this as "something happened").
//
bool CLIENT_STATE::scheduler_rpc_poll() {
    PROJECT *p;
    // statics: time of last poll, and of last work-fetch consideration
    //
    static double last_time=0;
    static double last_work_fetch_time = 0;
    double elapsed_time;

    // are we currently doing a scheduler RPC?
    // If so, see if it's finished
    //
    if (scheduler_op->state != SCHEDULER_OP_STATE_IDLE) {
        last_time = now;
        scheduler_op->poll();
        return (scheduler_op->state == SCHEDULER_OP_STATE_IDLE);
    }

    if (network_suspended) return false;

    // check only every 5 sec
    // (clock_change forces an immediate check)
    //
    if (!clock_change && now - last_time < SCHEDULER_RPC_POLL_PERIOD) return false;
    last_time = now;

    if (scheduler_op->check_master_fetch_start()) {
        return true;
    }

    // If we haven't run benchmarks yet, don't do a scheduler RPC.
    // We need to know CPU speed to handle app versions
    //
    if (!host_info.p_calculated) return false;

    // check for various reasons to contact particular projects.
    // If we need to contact a project,
    // see if we should ask it for work as well.
    //
    p = next_project_sched_rpc_pending();
    if (p) {
        if (log_flags.sched_op_debug) {
            msg_printf(p, MSG_INFO, "sched RPC pending: %s",
                rpc_reason_string(p->sched_rpc_pending)
            );
        }
        // if the user requested the RPC,
        // clear backoffs to allow work requests
        //
        if (p->sched_rpc_pending == RPC_REASON_USER_REQ) {
            for (int i=0; i<coprocs.n_rsc; i++) {
                p->rsc_pwf[i].clear_backoff();
            }
        }
        work_fetch.piggyback_work_request(p);
        scheduler_op->init_op_project(p, p->sched_rpc_pending);
        return true;
    }
    p = next_project_trickle_up_pending();
    if (p) {
        work_fetch.piggyback_work_request(p);
        scheduler_op->init_op_project(p, RPC_REASON_TRICKLE_UP);
        return true;
    }

    // stuff from here on is checked only once/minute,
    // or if work fetch was requested.
    //

    if (must_check_work_fetch) {
        last_work_fetch_time = 0;
    }
    elapsed_time = now - last_work_fetch_time;
    if (!clock_change && elapsed_time < WORK_FETCH_PERIOD) return false;
    must_check_work_fetch = false;
    last_work_fetch_time = now;

    // check if we should report finished results
    // (report early if network or CPU will be suspended
    // within the next half hour)
    //
    bool suspend_soon = global_prefs.net_times.suspended(now + 1800);
    suspend_soon |= global_prefs.cpu_times.suspended(now + 1800);
    p = find_project_with_overdue_results(suspend_soon);
    if (p) {
        work_fetch.piggyback_work_request(p);
        scheduler_op->init_op_project(p, RPC_REASON_RESULTS_DUE);
        return true;
    }

    // check if we should fetch work (do this last)
    //

    // don't fetch work while suspended,
    // except when only CPU throttling is in effect
    //
    switch (suspend_reason) {
    case 0:
    case SUSPEND_REASON_CPU_THROTTLE:
        break;
    default:
        return false;
    }
    if (cc_config.fetch_minimal_work && had_or_requested_work) {
        return false;
    }

    p = work_fetch.choose_project();
    if (p) {
        // defer the request if this project is actively uploading,
        // unless it allows requests for idle instances while uploading
        //
        if (actively_uploading(p)) {
            bool dont_request = true;
            if (p->pwf.request_if_idle_and_uploading) {
                if (idle_request()) {
                    dont_request = false;
                }
            }
            if (dont_request) {
                if (log_flags.work_fetch_debug) {
                    msg_printf(p, MSG_INFO,
                        "[work_fetch] deferring work fetch; upload active"
                    );
                }
                p->sched_rpc_pending = 0;
                return false;
            }
        }
        scheduler_op->init_op_project(p, RPC_REASON_NEED_WORK);
        return true;
    }
    return false;
}
554 
555 // Handle the reply from a scheduler
556 //
handle_scheduler_reply(PROJECT * project,char * scheduler_url)557 int CLIENT_STATE::handle_scheduler_reply(
558     PROJECT* project, char* scheduler_url
559 ) {
560     SCHEDULER_REPLY sr;
561     FILE* f;
562     int retval;
563     unsigned int i;
564     bool signature_valid, update_global_prefs=false, update_project_prefs=false;
565     char buf[1024], filename[256];
566     string old_gui_urls = project->gui_urls;
567     PROJECT* p2;
568     vector<RESULT*>new_results;
569 
570     project->last_rpc_time = now;
571 
572     if (work_fetch.requested_work()) {
573         had_or_requested_work = true;
574     }
575 
576     get_sched_reply_filename(*project, filename, sizeof(filename));
577 
578     f = fopen(filename, "rb");
579     if (!f) return ERR_FOPEN;
580     retval = sr.parse(f, project);
581     fclose(f);
582     if (retval) return retval;
583 
584     if (log_flags.sched_ops) {
585         if (work_fetch.requested_work()) {
586             snprintf(buf, sizeof(buf), ": got %d new tasks", (int)sr.results.size());
587         } else {
588             safe_strcpy(buf, "");
589         }
590         msg_printf(project, MSG_INFO, "Scheduler request completed%s", buf);
591     }
592     if (log_flags.sched_op_debug) {
593         if (sr.scheduler_version) {
594             msg_printf(project, MSG_INFO,
595                 "[sched_op] Server version %d",
596                 sr.scheduler_version
597             );
598         }
599     }
600 
601     // check that master URL is correct
602     //
603     if (strlen(sr.master_url)) {
604         canonicalize_master_url(sr.master_url, sizeof(sr.master_url));
605         string url1 = sr.master_url;
606         string url2 = project->master_url;
607         downcase_string(url1);
608         downcase_string(url2);
609         if (url1 != url2) {
610             p2 = lookup_project(sr.master_url);
611             if (p2) {
612                 msg_printf(project, MSG_USER_ALERT,
613                     "You are attached to this project twice.  Please remove projects named %s, then add %s",
614                     project->project_name,
615                     sr.master_url
616                 );
617             } else {
618                 msg_printf(project, MSG_USER_ALERT,
619                     _("This project is using an old URL.  When convenient, remove the project, then add %s"),
620                     sr.master_url
621                 );
622             }
623         }
624     }
625 
626     // make sure we don't already have a project of same name
627     //
628     bool dup_name = false;
629     for (i=0; i<projects.size(); i++) {
630         p2 = projects[i];
631         if (project == p2) continue;
632         if (!strcmp(p2->project_name, project->project_name)) {
633             dup_name = true;
634             break;
635         }
636     }
637     if (dup_name) {
638         msg_printf(project, MSG_INFO,
639             "Already attached to a project named %s (possibly with wrong URL)",
640             project->project_name
641         );
642         msg_printf(project, MSG_INFO,
643             "Consider detaching this project, then trying again"
644         );
645     }
646 
647     // show messages from server
648     //
649     bool got_notice = false;
650     for (i=0; i<sr.messages.size(); i++) {
651         USER_MESSAGE& um = sr.messages[i];
652         int prio = MSG_INFO;
653         if (!strcmp(um.priority.c_str(), "notice")) {
654             prio = MSG_SCHEDULER_ALERT;
655             got_notice = true;
656         }
657         msg_printf(project, prio, "%s", um.message.c_str());
658     }
659 
660     // if we requested work and didn't get notices,
661     // clear scheduler notices from this project
662     //
663     if (work_fetch.requested_work() && !got_notice) {
664         notices.remove_notices(project, REMOVE_SCHEDULER_MSG);
665     }
666 
667     if (log_flags.sched_op_debug && sr.request_delay) {
668         msg_printf(project, MSG_INFO,
669             "Project requested delay of %.0f seconds", sr.request_delay
670         );
671     }
672 
673     // if project is down, return error (so that we back off)
674     // and don't do anything else
675     //
676     if (sr.project_is_down) {
677         if (sr.request_delay) {
678             double x = now + sr.request_delay;
679             project->set_min_rpc_time(x, "project requested delay");
680         }
681         return ERR_PROJECT_DOWN;
682     }
683 
684     // if the scheduler reply includes global preferences,
685     // insert extra elements, write to disk, and parse
686     //
687     if (sr.global_prefs_xml) {
688         // ignore prefs if we're using prefs from account mgr
689         // BAM! currently has mixed http, https; trim off
690         char* p = strchr(global_prefs.source_project, '/');
691         char* q = strchr(gstate.acct_mgr_info.master_url, '/');
692         if (gstate.acct_mgr_info.using_am() && p && q && !strcmp(p, q)) {
693             if (log_flags.sched_op_debug) {
694                 msg_printf(project, MSG_INFO,
695                     "ignoring prefs from project; using prefs from AM"
696                 );
697             }
698         } else if (!global_prefs.host_specific || sr.scheduler_version >= 507) {
699             // ignore prefs if we have host-specific prefs
700             // and we're talking to an old scheduler
701             //
702             retval = save_global_prefs(
703                 sr.global_prefs_xml, project->master_url, scheduler_url
704             );
705             if (retval) {
706                 return retval;
707             }
708             update_global_prefs = true;
709         } else {
710             if (log_flags.sched_op_debug) {
711                 msg_printf(project, MSG_INFO,
712                     "ignoring prefs from old server; we have host-specific prefs"
713                 );
714             }
715         }
716     }
717 
718     // see if we have a new venue from this project
719     // (this must go AFTER the above, since otherwise
720     // global_prefs_source_project() is meaningless)
721     //
722     if (strcmp(project->host_venue, sr.host_venue)) {
723         safe_strcpy(project->host_venue, sr.host_venue);
724         msg_printf(project, MSG_INFO, "New computer location: %s", sr.host_venue);
725         update_project_prefs = true;
726 #ifdef USE_NET_PREFS
727         if (project == global_prefs_source_project()) {
728             safe_strcpy(main_host_venue, sr.host_venue);
729             update_global_prefs = true;
730         }
731 #endif
732     }
733 
734     if (update_global_prefs) {
735         read_global_prefs();
736     }
737 
738     // deal with project preferences (should always be there)
739     // If they've changed, write to account file,
740     // then parse to get our venue, and pass to running apps
741     //
742     if (sr.project_prefs_xml) {
743         if (strcmp(project->project_prefs.c_str(), sr.project_prefs_xml)) {
744             project->project_prefs = string(sr.project_prefs_xml);
745             update_project_prefs = true;
746         }
747     }
748 
749     // the account file has GUI URLs and project prefs.
750     // rewrite if either of these has changed
751     //
752     if (project->gui_urls != old_gui_urls || update_project_prefs) {
753         retval = project->write_account_file();
754         if (retval) {
755             msg_printf(project, MSG_INTERNAL_ERROR,
756                 "Can't write account file: %s", boincerror(retval)
757             );
758             return retval;
759         }
760     }
761 
762     if (update_project_prefs) {
763         project->parse_account_file();
764         project->parse_preferences_for_user_files();
765         active_tasks.request_reread_prefs(project);
766     }
767 
768     // show notice if we can't possibly get work from this project.
769     // This must come after parsing project prefs
770     //
771     project->show_no_work_notice();
772 
773     // if the scheduler reply includes a code-signing key,
774     // accept it if we don't already have one from the project.
775     // Otherwise verify its signature, using the key we already have.
776     //
777 
778     if (sr.code_sign_key) {
779         if (!strlen(project->code_sign_key)) {
780             safe_strcpy(project->code_sign_key, sr.code_sign_key);
781         } else {
782             if (sr.code_sign_key_signature) {
783                 retval = check_string_signature2(
784                     sr.code_sign_key, sr.code_sign_key_signature,
785                     project->code_sign_key, signature_valid
786                 );
787                 if (!retval && signature_valid) {
788                     safe_strcpy(project->code_sign_key, sr.code_sign_key);
789                 } else {
790                     msg_printf(project, MSG_INTERNAL_ERROR,
791                         "New code signing key doesn't validate"
792                     );
793                 }
794             } else {
795                 msg_printf(project, MSG_INTERNAL_ERROR,
796                     "Missing code sign key signature"
797                 );
798             }
799         }
800     }
801 
802     // copy new entities to client state
803     //
804     for (i=0; i<sr.apps.size(); i++) {
805         APP* app = lookup_app(project, sr.apps[i].name);
806         if (app) {
807             // update app attributes; they may have changed on server
808             //
809             safe_strcpy(app->user_friendly_name, sr.apps[i].user_friendly_name);
810             app->non_cpu_intensive = sr.apps[i].non_cpu_intensive;
811             app->fraction_done_exact = sr.apps[i].fraction_done_exact;
812         } else {
813             app = new APP;
814             *app = sr.apps[i];
815             retval = link_app(project, app);
816             if (retval) {
817                 msg_printf(project, MSG_INTERNAL_ERROR,
818                     "Can't handle application %s in scheduler reply", app->name
819                 );
820                 delete app;
821             } else {
822                 apps.push_back(app);
823             }
824         }
825     }
826     FILE_INFO* fip;
827     for (i=0; i<sr.file_infos.size(); i++) {
828         fip = lookup_file_info(project, sr.file_infos[i].name);
829         if (fip) {
830             fip->merge_info(sr.file_infos[i]);
831         } else {
832             fip = new FILE_INFO;
833             *fip = sr.file_infos[i];
834             if (fip->sticky_lifetime) {
835                 fip->sticky_expire_time = now + fip->sticky_lifetime;
836             }
837             retval = link_file_info(project, fip);
838             if (retval) {
839                 msg_printf(project, MSG_INTERNAL_ERROR,
840                     "Can't handle file %s in scheduler reply", fip->name
841                 );
842                 delete fip;
843             } else {
844                 file_infos.push_back(fip);
845             }
846         }
847     }
848     for (i=0; i<sr.file_deletes.size(); i++) {
849         fip = lookup_file_info(project, sr.file_deletes[i].c_str());
850         if (fip) {
851             if (log_flags.file_xfer_debug) {
852                 msg_printf(project, MSG_INFO,
853                     "[file_xfer_debug] Got server request to delete file %s",
854                     fip->name
855                 );
856             }
857             fip->sticky = false;
858         }
859     }
860     for (i=0; i<sr.app_versions.size(); i++) {
861         if (project->anonymous_platform) {
862             msg_printf(project, MSG_INTERNAL_ERROR,
863                 "App version returned from anonymous platform project; ignoring"
864             );
865             continue;
866         }
867         APP_VERSION& avpp = sr.app_versions[i];
868         if (strlen(avpp.platform) == 0) {
869             safe_strcpy(avpp.platform, get_primary_platform());
870         } else {
871             if (!is_supported_platform(avpp.platform)) {
872                 msg_printf(project, MSG_INTERNAL_ERROR,
873                     "App version has unsupported platform %s", avpp.platform
874                 );
875                 continue;
876             }
877         }
878         if (avpp.missing_coproc) {
879             msg_printf(project, MSG_INTERNAL_ERROR,
880                 "App version uses non-existent %s GPU",
881                 avpp.missing_coproc_name
882             );
883         }
884         APP* app = lookup_app(project, avpp.app_name);
885         if (!app) {
886             msg_printf(project, MSG_INTERNAL_ERROR,
887                 "Missing app %s", avpp.app_name
888             );
889             continue;
890         }
891         APP_VERSION* avp = lookup_app_version(
892             app, avpp.platform, avpp.version_num, avpp.plan_class
893         );
894         if (avp) {
895             // update app version attributes in case they changed on server
896             //
897             avp->avg_ncpus = avpp.avg_ncpus;
898             avp->max_ncpus = avpp.max_ncpus;
899             avp->flops = avpp.flops;
900             safe_strcpy(avp->cmdline, avpp.cmdline);
901             avp->gpu_usage = avpp.gpu_usage;
902             strlcpy(avp->api_version, avpp.api_version, sizeof(avp->api_version));
903             avp->dont_throttle = avpp.dont_throttle;
904             avp->needs_network = avpp.needs_network;
905 
906             // if we had download failures, clear them
907             //
908             avp->clear_errors();
909 
910             continue;
911         }
912         avp = new APP_VERSION;
913         *avp = avpp;
914         retval = link_app_version(project, avp);
915         if (retval) {
916              delete avp;
917              continue;
918         }
919         app_versions.push_back(avp);
920     }
921     for (i=0; i<sr.workunits.size(); i++) {
922         if (lookup_workunit(project, sr.workunits[i].name)) continue;
923         WORKUNIT* wup = new WORKUNIT;
924         *wup = sr.workunits[i];
925         wup->project = project;
926         retval = link_workunit(project, wup);
927         if (retval) {
928             msg_printf(project, MSG_INTERNAL_ERROR,
929                 "Can't handle task %s in scheduler reply", wup->name
930             );
931             delete wup;
932             continue;
933         }
934         wup->clear_errors();
935         workunits.push_back(wup);
936     }
937     double est_rsc_runtime[MAX_RSC];
938     for (int j=0; j<coprocs.n_rsc; j++) {
939         est_rsc_runtime[j] = 0;
940     }
941     for (i=0; i<sr.results.size(); i++) {
942         RESULT* rp2 = lookup_result(project, sr.results[i].name);
943         if (rp2) {
944             // see if project wants to change the job's deadline
945             //
946             if (sr.results[i].report_deadline != rp2->report_deadline) {
947                 rp2->report_deadline = sr.results[i].report_deadline;
948             } else {
949                 msg_printf(project, MSG_INTERNAL_ERROR,
950                     "Already have task %s\n", sr.results[i].name
951                 );
952             }
953             continue;
954         }
955         RESULT* rp = new RESULT;
956         *rp = sr.results[i];
957         retval = link_result(project, rp);
958         if (retval) {
959             msg_printf(project, MSG_INTERNAL_ERROR,
960                 "Can't handle task %s in scheduler reply", rp->name
961             );
962             delete rp;
963             continue;
964         }
965         if (strlen(rp->platform) == 0) {
966             safe_strcpy(rp->platform, get_primary_platform());
967             rp->version_num = latest_version(rp->wup->app, rp->platform);
968         }
969         rp->avp = lookup_app_version(
970             rp->wup->app, rp->platform, rp->version_num, rp->plan_class
971         );
972         if (!rp->avp) {
973             msg_printf(project, MSG_INTERNAL_ERROR,
974                 "No app version found for app %s platform %s ver %d class %s; discarding %s",
975                 rp->wup->app->name, rp->platform, rp->version_num, rp->plan_class, rp->name
976             );
977             delete rp;
978             continue;
979         }
980         if (rp->avp->missing_coproc) {
981             msg_printf(project, MSG_INTERNAL_ERROR,
982                 "Missing coprocessor for task %s; aborting", rp->name
983             );
984             rp->abort_inactive(EXIT_MISSING_COPROC);
985         } else {
986             rp->set_state(RESULT_NEW, "handle_scheduler_reply");
987             int rt = rp->avp->gpu_usage.rsc_type;
988             if (rt > 0) {
989                 est_rsc_runtime[rt] += rp->estimated_runtime();
990                 gpus_usable = true;
991                     // trigger a check of whether GPU is actually usable
992             } else {
993                 est_rsc_runtime[0] += rp->estimated_runtime();
994             }
995         }
996         rp->wup->version_num = rp->version_num;
997         rp->received_time = now;
998         new_results.push_back(rp);
999         results.push_back(rp);
1000     }
1001     sort_results();
1002 
1003     if (log_flags.sched_op_debug) {
1004         if (sr.results.size()) {
1005             for (int j=0; j<coprocs.n_rsc; j++) {
1006                 msg_printf(project, MSG_INFO,
1007                     "[sched_op] estimated total %s task duration: %.0f seconds",
1008                     rsc_name_long(j),
1009                     est_rsc_runtime[j]/time_stats.availability_frac(j)
1010                 );
1011             }
1012         }
1013     }
1014 
1015     // update records for ack'ed results
1016     //
1017     for (i=0; i<sr.result_acks.size(); i++) {
1018         if (log_flags.sched_op_debug) {
1019             msg_printf(project, MSG_INFO,
1020                 "[sched_op] handle_scheduler_reply(): got ack for task %s\n",
1021                 sr.result_acks[i].name
1022             );
1023         }
1024         RESULT* rp = lookup_result(project, sr.result_acks[i].name);
1025         if (rp) {
1026             rp->got_server_ack = true;
1027         } else {
1028             msg_printf(project, MSG_INTERNAL_ERROR,
1029                 "Got ack for task %s, but can't find it", sr.result_acks[i].name
1030             );
1031         }
1032     }
1033 
1034     // handle result abort requests
1035     //
1036     for (i=0; i<sr.result_abort.size(); i++) {
1037         RESULT* rp = lookup_result(project, sr.result_abort[i].name);
1038         if (rp) {
1039             ACTIVE_TASK* atp = lookup_active_task_by_result(rp);
1040             if (atp) {
1041                 atp->abort_task(EXIT_ABORTED_BY_PROJECT,
1042                     "aborted by project - no longer usable"
1043                 );
1044             } else {
1045                 rp->abort_inactive(EXIT_ABORTED_BY_PROJECT);
1046             }
1047         } else {
1048             msg_printf(project, MSG_INTERNAL_ERROR,
1049                 "Server requested abort of unknown task %s",
1050                 sr.result_abort[i].name
1051             );
1052         }
1053     }
1054     for (i=0; i<sr.result_abort_if_not_started.size(); i++) {
1055         RESULT* rp = lookup_result(project, sr.result_abort_if_not_started[i].name);
1056         if (!rp) {
1057             msg_printf(project, MSG_INTERNAL_ERROR,
1058                 "Server requested conditional abort of unknown task %s",
1059                 sr.result_abort_if_not_started[i].name
1060             );
1061             continue;
1062         }
1063         if (rp->not_started) {
1064             rp->abort_inactive(EXIT_ABORTED_BY_PROJECT);
1065         }
1066     }
1067 
1068     // remove acked trickle files
1069     //
1070     if (sr.message_ack) {
1071         remove_trickle_files(project);
1072     }
1073     if (sr.send_full_workload) {
1074         project->send_full_workload = true;
1075     }
1076     project->dont_use_dcf = sr.dont_use_dcf;
1077     project->send_time_stats_log = sr.send_time_stats_log;
1078     project->send_job_log = sr.send_job_log;
1079     project->trickle_up_pending = false;
1080 
1081     // The project returns a hostid only if it has created a new host record.
1082     // In that case reset RPC seqno
1083     //
1084     if (sr.hostid) {
1085         if (project->hostid) {
1086             // if we already have a host ID for this project,
1087             // we must have sent it a stale seqno,
1088             // which usually means our state file was copied from another host.
1089             // So generate a new host CPID.
1090             //
1091             generate_new_host_cpid();
1092             msg_printf(project, MSG_INFO,
1093                 "Generated new computer cross-project ID: %s",
1094                 host_info.host_cpid
1095             );
1096         }
1097         //msg_printf(project, MSG_INFO, "Changing host ID from %d to %d", project->hostid, sr.hostid);
1098         project->hostid = sr.hostid;
1099         project->rpc_seqno = 0;
1100     }
1101 
1102 #ifdef ENABLE_AUTO_UPDATE
1103     if (sr.auto_update.present) {
1104         if (!sr.auto_update.validate_and_link(project)) {
1105             auto_update = sr.auto_update;
1106         }
1107     }
1108 #endif
1109 
1110     project->project_files = sr.project_files;
1111     project->link_project_files();
1112     project->create_project_file_symlinks();
1113 
1114     if (log_flags.state_debug) {
1115         msg_printf(project, MSG_INFO,
1116             "[state] handle_scheduler_reply(): State after handle_scheduler_reply():"
1117         );
1118         print_summary();
1119     }
1120 
1121     // the following must precede the backoff and request_delay checks,
1122     // since it overrides them
1123     //
1124     if (sr.next_rpc_delay) {
1125         project->next_rpc_time = now + sr.next_rpc_delay;
1126     } else {
1127         project->next_rpc_time = 0;
1128     }
1129 
1130     work_fetch.handle_reply(project, &sr, new_results);
1131 
1132     project->nrpc_failures = 0;
1133     project->min_rpc_time = 0;
1134 
1135     if (sr.request_delay) {
1136         double x = now + sr.request_delay;
1137         project->set_min_rpc_time(x, "requested by project");
1138     }
1139 
1140     if (sr.got_rss_feeds) {
1141         handle_sr_feeds(sr.sr_feeds, project);
1142     }
1143 
1144     update_trickle_up_urls(project, sr.trickle_up_urls);
1145 
1146     // garbage collect in case the project sent us some irrelevant FILE_INFOs;
1147     // avoid starting transfers for them
1148     //
1149     gstate.garbage_collect_always();
1150 
1151     // if the user provided app_config.xml for this project,
1152     // apply it to any app versions we just got
1153     //
1154     project->app_configs.config_app_versions(project, false);
1155 
1156     // make sure we don't set no_rsc_apps[] for all processor types
1157     //
1158     if (!project->anonymous_platform) {
1159         project->check_no_rsc_apps();
1160     }
1161 
1162     return 0;
1163 }
1164 
1165 #endif // SIM
1166 
check_project_timeout()1167 void CLIENT_STATE::check_project_timeout() {
1168     unsigned int i;
1169     for (i=0; i<projects.size(); i++) {
1170         PROJECT* p = projects[i];
1171         if (p->possibly_backed_off && now > p->min_rpc_time) {
1172             p->possibly_backed_off = false;
1173             char buf[256];
1174             snprintf(buf, sizeof(buf), "Backoff ended for %s", p->get_project_name());
1175             request_work_fetch(buf);
1176         }
1177     }
1178 }
1179 
1180 // find a project that needs to have its master file fetched
1181 //
next_project_master_pending()1182 PROJECT* CLIENT_STATE::next_project_master_pending() {
1183     unsigned int i;
1184     PROJECT* p;
1185 
1186     for (i=0; i<projects.size(); i++) {
1187         p = projects[i];
1188         if (p->waiting_until_min_rpc_time()) continue;
1189         if (p->suspended_via_gui) continue;
1190         if (p->master_url_fetch_pending) {
1191             return p;
1192         }
1193     }
1194     return 0;
1195 }
1196 
1197 // find a project for which a scheduler RPC has been requested
1198 // - by user
1199 // - by an account manager
1200 // - by the project
1201 // - because the project was just attached (for verification)
1202 //
next_project_sched_rpc_pending()1203 PROJECT* CLIENT_STATE::next_project_sched_rpc_pending() {
1204     unsigned int i;
1205     PROJECT* p;
1206 
1207     for (i=0; i<projects.size(); i++) {
1208         p = projects[i];
1209         bool honor_backoff = true;
1210         bool honor_suspend = true;
1211 
1212         // is a scheduler-requested RPC due?
1213         //
1214         if (!p->sched_rpc_pending && p->next_rpc_time && p->next_rpc_time<now) {
1215             // don't do it if project is set to no new work
1216             // and has no jobs currently
1217             //
1218             if (!p->dont_request_more_work || p->has_results()) {
1219                 p->sched_rpc_pending = RPC_REASON_PROJECT_REQ;
1220             }
1221         }
1222 
1223         switch (p->sched_rpc_pending) {
1224         case RPC_REASON_USER_REQ:
1225             honor_backoff = false;
1226             honor_suspend = false;
1227             break;
1228         case RPC_REASON_ACCT_MGR_REQ:
1229             // This is critical for acct mgrs, to propagate new host CPIDs
1230             honor_suspend = false;
1231             break;
1232         case RPC_REASON_INIT:
1233             // always do the initial RPC so we can get project name etc.
1234             honor_suspend = false;
1235             break;
1236         case RPC_REASON_PROJECT_REQ:
1237             break;
1238         default:
1239             continue;
1240         }
1241         if (honor_backoff && p->waiting_until_min_rpc_time()) {
1242             continue;
1243         }
1244         if (honor_suspend && p->suspended_via_gui) {
1245             continue;
1246         }
1247         if (p->sched_rpc_pending) {
1248             return p;
1249         }
1250     }
1251     return 0;
1252 }
1253 
next_project_trickle_up_pending()1254 PROJECT* CLIENT_STATE::next_project_trickle_up_pending() {
1255     unsigned int i;
1256     PROJECT* p;
1257 
1258     for (i=0; i<projects.size(); i++) {
1259         p = projects[i];
1260         if (p->waiting_until_min_rpc_time()) continue;
1261         if (p->suspended_via_gui) continue;
1262         if (p->trickle_up_pending) {
1263             return p;
1264         }
1265     }
1266     return 0;
1267 }
1268 
1269 // find a project with finished results that should be reported.
1270 // This means:
1271 //    - we're not backing off contacting the project
1272 //    - no upload for that project is active
1273 //    - the result is ready_to_report (compute done; files uploaded)
1274 //    - we're within a day of the report deadline,
1275 //      or at least a day has elapsed since the result was completed,
1276 //      or we have a sporadic connection
1277 //      or the project is in "don't request more work" state
1278 //      or a network suspend period is coming up soon
1279 //      or the project has > RESULT_REPORT_IF_AT_LEAST_N results ready to report
1280 //
find_project_with_overdue_results(bool network_suspend_soon)1281 PROJECT* CLIENT_STATE::find_project_with_overdue_results(
1282     bool network_suspend_soon
1283 ) {
1284     unsigned int i;
1285     RESULT* r;
1286 
1287     for (i=0; i<projects.size(); i++) {
1288         PROJECT* p = projects[i];
1289         p->n_ready = 0;
1290         p->dont_contact = false;
1291         if (p->waiting_until_min_rpc_time()) p->dont_contact = true;
1292         if (p->suspended_via_gui) p->dont_contact = true;
1293 #ifndef SIM
1294         if (actively_uploading(p)) p->dont_contact = true;
1295 #endif
1296     }
1297 
1298     for (i=0; i<results.size(); i++) {
1299         r = results[i];
1300         if (!r->ready_to_report) continue;
1301 
1302         PROJECT* p = r->project;
1303         if (p->dont_contact) continue;
1304 
1305         if (p->dont_request_more_work) {
1306             return p;
1307         }
1308 
1309         if (r->report_immediately) {
1310             return p;
1311         }
1312 
1313         if (cc_config.report_results_immediately) {
1314             return p;
1315         }
1316 
1317         if (p->report_results_immediately) {
1318             return p;
1319         }
1320 
1321         if (r->app->report_results_immediately) {
1322             return p;
1323         }
1324 
1325         if (net_status.have_sporadic_connection) {
1326             return p;
1327         }
1328 
1329         if (network_suspend_soon) {
1330             return p;
1331         }
1332 
1333         double cushion = std::max(REPORT_DEADLINE_CUSHION, work_buf_min());
1334         if (gstate.now > r->report_deadline - cushion) {
1335             return p;
1336         }
1337 
1338         if (gstate.now > r->completed_time + MAX_REPORT_DELAY) {
1339             return p;
1340         }
1341 
1342         p->n_ready++;
1343         if (p->n_ready >= RESULT_REPORT_IF_AT_LEAST_N) {
1344             return p;
1345         }
1346     }
1347     return 0;
1348 }
1349 
1350 // trigger work fetch
1351 //
request_work_fetch(const char * where)1352 void CLIENT_STATE::request_work_fetch(const char* where) {
1353     if (log_flags.work_fetch_debug) {
1354         msg_printf(0, MSG_INFO, "[work_fetch] Request work fetch: %s", where);
1355     }
1356     must_check_work_fetch = true;
1357 }
1358 
1359