1 // This file is part of BOINC.
2 // http://boinc.berkeley.edu
3 // Copyright (C) 2008 University of California
4 //
5 // BOINC is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Lesser General Public License
7 // as published by the Free Software Foundation,
8 // either version 3 of the License, or (at your option) any later version.
9 //
10 // BOINC is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
13 // See the GNU Lesser General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with BOINC. If not, see <http://www.gnu.org/licenses/>.
17
18 // High-level logic for communicating with scheduling servers,
19 // and for merging the result of a scheduler RPC into the client state
20
21 // The scheduler RPC mechanism is in scheduler_op.C
22
23 #include "cpp.h"
24
25 #ifdef _WIN32
26 #include "boinc_win.h"
27 #else
28 #include "config.h"
29 #include <cstdio>
30 #include <cmath>
31 #include <ctime>
32 #include <cstring>
33 #include <map>
34 #include <set>
35 #endif
36
37 #ifdef _MSC_VER
38 #define snprintf _snprintf
39 #endif
40
41 #include "crypt.h"
42 #include "error_numbers.h"
43 #include "file_names.h"
44 #include "filesys.h"
45 #include "parse.h"
46 #include "str_util.h"
47 #include "str_replace.h"
48 #include "url.h"
49 #include "util.h"
50
51 #include "client_msgs.h"
52 #include "cs_notice.h"
53 #include "cs_trickle.h"
54 #include "project.h"
55 #include "result.h"
56 #include "scheduler_op.h"
57 #include "sandbox.h"
58
59 #include "client_state.h"
60
61 using std::max;
62 using std::vector;
63 using std::string;
64
65 // quantities like avg CPU time decay by a factor of e every week
66 //
67 #define EXP_DECAY_RATE (1./(SECONDS_PER_DAY*7))
68
69 // try to report results this much before their deadline
70 //
71 #define REPORT_DEADLINE_CUSHION ((double)SECONDS_PER_DAY)
72
73 // report results within this time after completion
74 //
75 #define MAX_REPORT_DELAY 3600
76
77 #ifndef SIM
78
79 // Write a scheduler request to a disk file,
80 // to be sent to a scheduling server
81 //
int CLIENT_STATE::make_scheduler_request(PROJECT* p) {
    char buf[1024];
    MIOFILE mf;
    unsigned int i;
    RESULT* rp;

    get_sched_request_filename(*p, buf, sizeof(buf));
    FILE* f = boinc_fopen(buf, "wb");
    if (!f) return ERR_FOPEN;

    // compute this project's fraction of the total, runnable,
    // and potentially-runnable resource shares.
    // Guard against division by zero; fall back to 1.
    //
    double trs = total_resource_share();
    double rrs = runnable_resource_share(RSC_TYPE_ANY);
    double prrs = potentially_runnable_resource_share();
    double resource_share_fraction, rrs_fraction, prrs_fraction;
    if (trs) {
        resource_share_fraction = p->resource_share / trs;
    } else {
        resource_share_fraction = 1;
    }
    if (rrs) {
        rrs_fraction = p->resource_share / rrs;
    } else {
        rrs_fraction = 1;
    }
    if (prrs) {
        prrs_fraction = p->resource_share / prrs;
    } else {
        prrs_fraction = 1;
    }

    // if hostid is zero, rpc_seqno better be also
    //
    if (!p->hostid) {
        p->rpc_seqno = 0;
    }

    // wrap f in a MIOFILE so helpers that take a MIOFILE
    // write into the same request file
    //
    mf.init_file(f);
    fprintf(f,
        "<scheduler_request>\n"
        " <authenticator>%s</authenticator>\n"
        " <hostid>%d</hostid>\n"
        " <rpc_seqno>%d</rpc_seqno>\n"
        " <core_client_major_version>%d</core_client_major_version>\n"
        " <core_client_minor_version>%d</core_client_minor_version>\n"
        " <core_client_release>%d</core_client_release>\n"
        " <resource_share_fraction>%f</resource_share_fraction>\n"
        " <rrs_fraction>%f</rrs_fraction>\n"
        " <prrs_fraction>%f</prrs_fraction>\n"
        " <duration_correction_factor>%f</duration_correction_factor>\n"
        " <allow_multiple_clients>%d</allow_multiple_clients>\n"
        " <sandbox>%d</sandbox>\n"
        " <dont_send_work>%d</dont_send_work>\n",
        p->authenticator,
        p->hostid,
        p->rpc_seqno,
        core_client_version.major,
        core_client_version.minor,
        core_client_version.release,
        resource_share_fraction,
        rrs_fraction,
        prrs_fraction,
        p->duration_correction_factor,
        cc_config.allow_multiple_clients?1:0,
        g_use_sandbox?1:0,
        p->dont_request_more_work?1:0
    );
    work_fetch.write_request(f, p);

    // write client capabilities
    //
    fprintf(f,
        " <client_cap_plan_class>1</client_cap_plan_class>\n"
    );

    write_platforms(p, mf);

    if (strlen(p->code_sign_key)) {
        fprintf(f, " <code_sign_key>\n%s\n</code_sign_key>\n", p->code_sign_key);
    }

    // send working prefs
    //
    fprintf(f, "<working_global_preferences>\n");
    global_prefs.write(mf);
    fprintf(f, "</working_global_preferences>\n");

    // send master global preferences if present and not host-specific
    //
    if (!global_prefs.host_specific && boinc_file_exists(GLOBAL_PREFS_FILE_NAME)) {
        FILE* fprefs = fopen(GLOBAL_PREFS_FILE_NAME, "r");
        if (fprefs) {
            copy_stream(fprefs, f);
            fclose(fprefs);
        }
        PROJECT* pp = lookup_project(global_prefs.source_project);
        if (pp && strlen(pp->email_hash)) {
            fprintf(f,
                "<global_prefs_source_email_hash>%s</global_prefs_source_email_hash>\n",
                pp->email_hash
            );
        }
    }

    // Of the projects with same email hash as this one,
    // send the oldest cross-project ID.
    // Use project URL as tie-breaker.
    //
    PROJECT* winner = p;
    for (i=0; i<projects.size(); i++ ) {
        PROJECT* project = projects[i];
        if (project == p) continue;
        if (strcmp(project->email_hash, p->email_hash)) continue;
        if (project->cpid_time < winner->cpid_time) {
            winner = project;
        } else if (project->cpid_time == winner->cpid_time) {
            if (strcmp(project->master_url, winner->master_url) < 0) {
                winner = project;
            }
        }
    }
    fprintf(f,
        "<cross_project_id>%s</cross_project_id>\n",
        winner->cross_project_id
    );

    // send time stats, network stats, and (if limited) transfer history
    //
    time_stats.write(mf, true);
    net_stats.write(mf);
    if (global_prefs.daily_xfer_period_days) {
        daily_xfer_history.write_scheduler_request(
            mf, global_prefs.daily_xfer_period_days
        );
    }

    // update hardware info, and write host info
    //
    host_info.get_host_info(false);
    set_ncpus();
    host_info.write(mf, !cc_config.suppress_net_info, false);

    // get and write disk usage
    //
    get_disk_usages();
    get_disk_shares();
    fprintf(f,
        " <disk_usage>\n"
        " <d_boinc_used_total>%f</d_boinc_used_total>\n"
        " <d_boinc_used_project>%f</d_boinc_used_project>\n"
        " <d_project_share>%f</d_project_share>\n"
        " </disk_usage>\n",
        total_disk_usage, p->disk_usage, p->disk_share
    );

    // if the host has coprocessors, send their descriptions
    // (and copy the pending per-resource work requests first)
    //
    if (coprocs.n_rsc > 1) {
        work_fetch.copy_requests();
        coprocs.write_xml(mf, true);
    }

    // report completed jobs
    //
    unsigned int last_reported_index = 0;
    p->nresults_returned = 0;
    for (i=0; i<results.size(); i++) {
        rp = results[i];
        if (rp->project == p && rp->ready_to_report) {
            p->nresults_returned++;
            rp->write(mf, true);
        }
        // stop once we hit the user-configured cap on reported tasks;
        // the rest are listed in <other_results> below and reported later
        //
        if (cc_config.max_tasks_reported
            && (p->nresults_returned >= cc_config.max_tasks_reported)
        ) {
            last_reported_index = i;
            break;
        }
    }

    read_trickle_files(p, f);

    // report sticky files as needed
    //
    for (i=0; i<file_infos.size(); i++) {
        FILE_INFO* fip = file_infos[i];
        if (fip->project != p) continue;
        if (!fip->sticky) continue;
        fprintf(f,
            " <file_info>\n"
            " <name>%s</name>\n"
            " <nbytes>%f</nbytes>\n"
            " <status>%d</status>\n"
            " </file_info>\n",
            fip->name, fip->nbytes, fip->status
        );
    }

    // send time-stats and job logs if the project asked for them
    //
    if (p->send_time_stats_log) {
        fprintf(f, "<time_stats_log>\n");
        time_stats.get_log_after(p->send_time_stats_log, mf);
        fprintf(f, "</time_stats_log>\n");
    }
    if (p->send_job_log) {
        fprintf(f, "<job_log>\n");
        job_log_filename(*p, buf, sizeof(buf));
        send_log_after(buf, p->send_job_log, mf);
        fprintf(f, "</job_log>\n");
    }

    // send descriptions of app versions
    //
    fprintf(f, "<app_versions>\n");
    int j=0;
    for (i=0; i<app_versions.size(); i++) {
        APP_VERSION* avp = app_versions[i];
        if (avp->project != p) continue;
        avp->write(mf, false);
        avp->index = j++;   // index referenced by <other_result> entries below
    }
    fprintf(f, "</app_versions>\n");

    // send descriptions of jobs in progress for this project
    //
    fprintf(f, "<other_results>\n");
    for (i=0; i<results.size(); i++) {
        rp = results[i];
        if (rp->project != p) continue;
        if ((last_reported_index && (i > last_reported_index)) || !rp->ready_to_report) {
            fprintf(f,
                " <other_result>\n"
                " <name>%s</name>\n"
                " <app_version>%d</app_version>\n",
                rp->name,
                rp->avp->index
            );
            // the following is for backwards compatibility w/ old schedulers
            //
            if (strlen(rp->avp->plan_class)) {
                fprintf(f,
                    " <plan_class>%s</plan_class>\n",
                    rp->avp->plan_class
                );
            }
            fprintf(f,
                " </other_result>\n"
            );
        }
    }
    fprintf(f, "</other_results>\n");

    // if requested by project, send summary of all in-progress results
    // (for EDF simulation by scheduler)
    //
    if (p->send_full_workload) {
        fprintf(f, "<in_progress_results>\n");
        for (i=0; i<results.size(); i++) {
            rp = results[i];
            double x = rp->estimated_runtime_remaining();
            if (x == 0) continue;
            safe_strcpy(buf, "");
            int rt = rp->avp->gpu_usage.rsc_type;
            if (rt) {
                // describe GPU usage with the legacy per-vendor tags
                //
                if (rt == rsc_index(GPU_TYPE_NVIDIA)) {
                    snprintf(buf, sizeof(buf),
                        " <ncudas>%f</ncudas>\n",
                        rp->avp->gpu_usage.usage
                    );
                } else if (rt == rsc_index(GPU_TYPE_ATI)) {
                    snprintf(buf, sizeof(buf),
                        " <natis>%f</natis>\n",
                        rp->avp->gpu_usage.usage
                    );
                } else if (rt == rsc_index(GPU_TYPE_INTEL)) {
                    snprintf(buf, sizeof(buf),
                        " <nintel_gpus>%f</nintel_gpus>\n",
                        rp->avp->gpu_usage.usage
                    );
                }
            }
            fprintf(f,
                " <ip_result>\n"
                " <name>%s</name>\n"
                " <report_deadline>%.0f</report_deadline>\n"
                " <time_remaining>%.2f</time_remaining>\n"
                " <avg_ncpus>%f</avg_ncpus>\n"
                "%s"
                " </ip_result>\n",
                rp->name,
                rp->report_deadline,
                x,
                rp->avp->avg_ncpus,
                buf
            );
        }
        fprintf(f, "</in_progress_results>\n");
    }

    // pass along opaque data from the client and the account manager, if any
    //
    FILE* cof = boinc_fopen(CLIENT_OPAQUE_FILENAME, "r");
    if (cof) {
        fprintf(f, "<client_opaque>\n<![CDATA[\n");
        copy_stream(cof, f);
        fprintf(f, "\n]]>\n</client_opaque>\n");
        fclose(cof);
    }

    if (strlen(client_brand)) {
        fprintf(f, " <client_brand>%s</client_brand>\n", client_brand);
    }

    if (acct_mgr_info.using_am() && !acct_mgr_info.sched_req_opaque.empty()) {
        fprintf(f, "<am_opaque>\n<![CDATA[\n");
        fprintf(f, "%s", acct_mgr_info.sched_req_opaque.c_str());
        fprintf(f, "\n]]>\n</am_opaque>\n");
    }

    fprintf(f, "</scheduler_request>\n");

    fclose(f);
    return 0;
}
397
398 // the project is uploading, and it started recently
399 //
actively_uploading(PROJECT * p)400 static inline bool actively_uploading(PROJECT* p) {
401 for (unsigned int i=0; i<gstate.file_xfers->file_xfers.size(); i++) {
402 FILE_XFER* fxp = gstate.file_xfers->file_xfers[i];
403 if (fxp->fip->project != p) continue;
404 if (!fxp->is_upload) continue;
405 if (gstate.now - fxp->start_time > WF_UPLOAD_DEFER_INTERVAL) continue;
406 //msg_printf(p, MSG_INFO, "actively uploading");
407 return true;
408 }
409 //msg_printf(p, MSG_INFO, "not actively uploading");
410 return false;
411 }
412
413 // If there is a request for an idle instance, return true.
414 // Clear other requests
415 //
idle_request()416 static inline bool idle_request() {
417 bool found = false;
418 for (int i=0; i<coprocs.n_rsc; i++) {
419 RSC_WORK_FETCH &rwf = rsc_work_fetch[i];
420 if (rwf.req_instances) {
421 found = true;
422 } else {
423 rwf.req_secs = 0;
424 }
425 }
426 return found;
427 }
428
429 // Called once/sec.
430 // Initiate scheduler RPC activity if needed and possible
431 //
bool CLIENT_STATE::scheduler_rpc_poll() {
    PROJECT *p;
    static double last_time=0;
    static double last_work_fetch_time = 0;
    double elapsed_time;

    // are we currently doing a scheduler RPC?
    // If so, see if it's finished
    //
    if (scheduler_op->state != SCHEDULER_OP_STATE_IDLE) {
        last_time = now;
        scheduler_op->poll();
        return (scheduler_op->state == SCHEDULER_OP_STATE_IDLE);
    }

    if (network_suspended) return false;

    // check only every 5 sec
    // (unless the system clock jumped; then check immediately)
    //
    if (!clock_change && now - last_time < SCHEDULER_RPC_POLL_PERIOD) return false;
    last_time = now;

    if (scheduler_op->check_master_fetch_start()) {
        return true;
    }

    // If we haven't run benchmarks yet, don't do a scheduler RPC.
    // We need to know CPU speed to handle app versions
    //
    if (!host_info.p_calculated) return false;

    // check for various reasons to contact particular projects.
    // If we need to contact a project,
    // see if we should ask it for work as well.
    //
    p = next_project_sched_rpc_pending();
    if (p) {
        if (log_flags.sched_op_debug) {
            msg_printf(p, MSG_INFO, "sched RPC pending: %s",
                rpc_reason_string(p->sched_rpc_pending)
            );
        }
        // if the user requested the RPC,
        // clear backoffs to allow work requests
        //
        if (p->sched_rpc_pending == RPC_REASON_USER_REQ) {
            for (int i=0; i<coprocs.n_rsc; i++) {
                p->rsc_pwf[i].clear_backoff();
            }
        }
        work_fetch.piggyback_work_request(p);
        scheduler_op->init_op_project(p, p->sched_rpc_pending);
        return true;
    }
    p = next_project_trickle_up_pending();
    if (p) {
        work_fetch.piggyback_work_request(p);
        scheduler_op->init_op_project(p, RPC_REASON_TRICKLE_UP);
        return true;
    }

    // stuff from here on is checked only once/minute,
    // or if work fetch was requested.
    //

    if (must_check_work_fetch) {
        last_work_fetch_time = 0;   // force the elapsed-time check below to pass
    }
    elapsed_time = now - last_work_fetch_time;
    if (!clock_change && elapsed_time < WORK_FETCH_PERIOD) return false;
    must_check_work_fetch = false;
    last_work_fetch_time = now;

    // check if we should report finished results
    // (report early if network or computing will be suspended
    // within the next 30 minutes)
    //
    bool suspend_soon = global_prefs.net_times.suspended(now + 1800);
    suspend_soon |= global_prefs.cpu_times.suspended(now + 1800);
    p = find_project_with_overdue_results(suspend_soon);
    if (p) {
        work_fetch.piggyback_work_request(p);
        scheduler_op->init_op_project(p, RPC_REASON_RESULTS_DUE);
        return true;
    }

    // check if we should fetch work (do this last)
    //

    // don't fetch work while computing is suspended,
    // except when suspension is only for CPU throttling
    //
    switch (suspend_reason) {
    case 0:
    case SUSPEND_REASON_CPU_THROTTLE:
        break;
    default:
        return false;
    }
    if (cc_config.fetch_minimal_work && had_or_requested_work) {
        return false;
    }

    p = work_fetch.choose_project();
    if (p) {
        // defer the work request while this project is actively uploading,
        // unless it asked to be contacted anyway when a device is idle
        //
        if (actively_uploading(p)) {
            bool dont_request = true;
            if (p->pwf.request_if_idle_and_uploading) {
                if (idle_request()) {
                    dont_request = false;
                }
            }
            if (dont_request) {
                if (log_flags.work_fetch_debug) {
                    msg_printf(p, MSG_INFO,
                        "[work_fetch] deferring work fetch; upload active"
                    );
                }
                p->sched_rpc_pending = 0;
                return false;
            }
        }
        scheduler_op->init_op_project(p, RPC_REASON_NEED_WORK);
        return true;
    }
    return false;
}
554
555 // Handle the reply from a scheduler
556 //
int CLIENT_STATE::handle_scheduler_reply(
    PROJECT* project, char* scheduler_url
) {
    SCHEDULER_REPLY sr;
    FILE* f;
    int retval;
    unsigned int i;
    bool signature_valid, update_global_prefs=false, update_project_prefs=false;
    char buf[1024], filename[256];
    string old_gui_urls = project->gui_urls;
    PROJECT* p2;
    vector<RESULT*>new_results;

    project->last_rpc_time = now;

    if (work_fetch.requested_work()) {
        had_or_requested_work = true;
    }

    get_sched_reply_filename(*project, filename, sizeof(filename));

    // parse the reply file; on failure return the error
    // (caller will back off the project)
    //
    f = fopen(filename, "rb");
    if (!f) return ERR_FOPEN;
    retval = sr.parse(f, project);
    fclose(f);
    if (retval) return retval;

    if (log_flags.sched_ops) {
        if (work_fetch.requested_work()) {
            snprintf(buf, sizeof(buf), ": got %d new tasks", (int)sr.results.size());
        } else {
            safe_strcpy(buf, "");
        }
        msg_printf(project, MSG_INFO, "Scheduler request completed%s", buf);
    }
    if (log_flags.sched_op_debug) {
        if (sr.scheduler_version) {
            msg_printf(project, MSG_INFO,
                "[sched_op] Server version %d",
                sr.scheduler_version
            );
        }
    }

    // check that master URL is correct
    //
    if (strlen(sr.master_url)) {
        canonicalize_master_url(sr.master_url, sizeof(sr.master_url));
        string url1 = sr.master_url;
        string url2 = project->master_url;
        downcase_string(url1);
        downcase_string(url2);
        if (url1 != url2) {
            p2 = lookup_project(sr.master_url);
            if (p2) {
                msg_printf(project, MSG_USER_ALERT,
                    "You are attached to this project twice. Please remove projects named %s, then add %s",
                    project->project_name,
                    sr.master_url
                );
            } else {
                msg_printf(project, MSG_USER_ALERT,
                    _("This project is using an old URL. When convenient, remove the project, then add %s"),
                    sr.master_url
                );
            }
        }
    }

    // make sure we don't already have a project of same name
    //
    bool dup_name = false;
    for (i=0; i<projects.size(); i++) {
        p2 = projects[i];
        if (project == p2) continue;
        if (!strcmp(p2->project_name, project->project_name)) {
            dup_name = true;
            break;
        }
    }
    if (dup_name) {
        msg_printf(project, MSG_INFO,
            "Already attached to a project named %s (possibly with wrong URL)",
            project->project_name
        );
        msg_printf(project, MSG_INFO,
            "Consider detaching this project, then trying again"
        );
    }

    // show messages from server
    //
    bool got_notice = false;
    for (i=0; i<sr.messages.size(); i++) {
        USER_MESSAGE& um = sr.messages[i];
        int prio = MSG_INFO;
        if (!strcmp(um.priority.c_str(), "notice")) {
            prio = MSG_SCHEDULER_ALERT;
            got_notice = true;
        }
        msg_printf(project, prio, "%s", um.message.c_str());
    }

    // if we requested work and didn't get notices,
    // clear scheduler notices from this project
    //
    if (work_fetch.requested_work() && !got_notice) {
        notices.remove_notices(project, REMOVE_SCHEDULER_MSG);
    }

    if (log_flags.sched_op_debug && sr.request_delay) {
        msg_printf(project, MSG_INFO,
            "Project requested delay of %.0f seconds", sr.request_delay
        );
    }

    // if project is down, return error (so that we back off)
    // and don't do anything else
    //
    if (sr.project_is_down) {
        if (sr.request_delay) {
            double x = now + sr.request_delay;
            project->set_min_rpc_time(x, "project requested delay");
        }
        return ERR_PROJECT_DOWN;
    }

    // if the scheduler reply includes global preferences,
    // insert extra elements, write to disk, and parse
    //
    if (sr.global_prefs_xml) {
        // ignore prefs if we're using prefs from account mgr
        // BAM! currently has mixed http, https; trim off
        // the scheme before comparing the URLs
        char* p = strchr(global_prefs.source_project, '/');
        char* q = strchr(gstate.acct_mgr_info.master_url, '/');
        if (gstate.acct_mgr_info.using_am() && p && q && !strcmp(p, q)) {
            if (log_flags.sched_op_debug) {
                msg_printf(project, MSG_INFO,
                    "ignoring prefs from project; using prefs from AM"
                );
            }
        } else if (!global_prefs.host_specific || sr.scheduler_version >= 507) {
            // ignore prefs if we have host-specific prefs
            // and we're talking to an old scheduler
            //
            retval = save_global_prefs(
                sr.global_prefs_xml, project->master_url, scheduler_url
            );
            if (retval) {
                return retval;
            }
            update_global_prefs = true;
        } else {
            if (log_flags.sched_op_debug) {
                msg_printf(project, MSG_INFO,
                    "ignoring prefs from old server; we have host-specific prefs"
                );
            }
        }
    }

    // see if we have a new venue from this project
    // (this must go AFTER the above, since otherwise
    // global_prefs_source_project() is meaningless)
    //
    if (strcmp(project->host_venue, sr.host_venue)) {
        safe_strcpy(project->host_venue, sr.host_venue);
        msg_printf(project, MSG_INFO, "New computer location: %s", sr.host_venue);
        update_project_prefs = true;
#ifdef USE_NET_PREFS
        if (project == global_prefs_source_project()) {
            safe_strcpy(main_host_venue, sr.host_venue);
            update_global_prefs = true;
        }
#endif
    }

    if (update_global_prefs) {
        read_global_prefs();
    }

    // deal with project preferences (should always be there)
    // If they've changed, write to account file,
    // then parse to get our venue, and pass to running apps
    //
    if (sr.project_prefs_xml) {
        if (strcmp(project->project_prefs.c_str(), sr.project_prefs_xml)) {
            project->project_prefs = string(sr.project_prefs_xml);
            update_project_prefs = true;
        }
    }

    // the account file has GUI URLs and project prefs.
    // rewrite if either of these has changed
    //
    if (project->gui_urls != old_gui_urls || update_project_prefs) {
        retval = project->write_account_file();
        if (retval) {
            msg_printf(project, MSG_INTERNAL_ERROR,
                "Can't write account file: %s", boincerror(retval)
            );
            return retval;
        }
    }

    if (update_project_prefs) {
        project->parse_account_file();
        project->parse_preferences_for_user_files();
        active_tasks.request_reread_prefs(project);
    }

    // show notice if we can't possibly get work from this project.
    // This must come after parsing project prefs
    //
    project->show_no_work_notice();

    // if the scheduler reply includes a code-signing key,
    // accept it if we don't already have one from the project.
    // Otherwise verify its signature, using the key we already have.
    //

    if (sr.code_sign_key) {
        if (!strlen(project->code_sign_key)) {
            safe_strcpy(project->code_sign_key, sr.code_sign_key);
        } else {
            if (sr.code_sign_key_signature) {
                retval = check_string_signature2(
                    sr.code_sign_key, sr.code_sign_key_signature,
                    project->code_sign_key, signature_valid
                );
                if (!retval && signature_valid) {
                    safe_strcpy(project->code_sign_key, sr.code_sign_key);
                } else {
                    msg_printf(project, MSG_INTERNAL_ERROR,
                        "New code signing key doesn't validate"
                    );
                }
            } else {
                msg_printf(project, MSG_INTERNAL_ERROR,
                    "Missing code sign key signature"
                );
            }
        }
    }

    // copy new entities to client state
    //
    for (i=0; i<sr.apps.size(); i++) {
        APP* app = lookup_app(project, sr.apps[i].name);
        if (app) {
            // update app attributes; they may have changed on server
            //
            safe_strcpy(app->user_friendly_name, sr.apps[i].user_friendly_name);
            app->non_cpu_intensive = sr.apps[i].non_cpu_intensive;
            app->fraction_done_exact = sr.apps[i].fraction_done_exact;
        } else {
            app = new APP;
            *app = sr.apps[i];
            retval = link_app(project, app);
            if (retval) {
                msg_printf(project, MSG_INTERNAL_ERROR,
                    "Can't handle application %s in scheduler reply", app->name
                );
                delete app;
            } else {
                apps.push_back(app);
            }
        }
    }
    FILE_INFO* fip;
    for (i=0; i<sr.file_infos.size(); i++) {
        fip = lookup_file_info(project, sr.file_infos[i].name);
        if (fip) {
            fip->merge_info(sr.file_infos[i]);
        } else {
            fip = new FILE_INFO;
            *fip = sr.file_infos[i];
            if (fip->sticky_lifetime) {
                fip->sticky_expire_time = now + fip->sticky_lifetime;
            }
            retval = link_file_info(project, fip);
            if (retval) {
                msg_printf(project, MSG_INTERNAL_ERROR,
                    "Can't handle file %s in scheduler reply", fip->name
                );
                delete fip;
            } else {
                file_infos.push_back(fip);
            }
        }
    }
    // clear the sticky flag of files the server asked us to delete;
    // garbage collection will then remove them
    //
    for (i=0; i<sr.file_deletes.size(); i++) {
        fip = lookup_file_info(project, sr.file_deletes[i].c_str());
        if (fip) {
            if (log_flags.file_xfer_debug) {
                msg_printf(project, MSG_INFO,
                    "[file_xfer_debug] Got server request to delete file %s",
                    fip->name
                );
            }
            fip->sticky = false;
        }
    }
    // install new app versions (or refresh attributes of known ones)
    //
    for (i=0; i<sr.app_versions.size(); i++) {
        if (project->anonymous_platform) {
            msg_printf(project, MSG_INTERNAL_ERROR,
                "App version returned from anonymous platform project; ignoring"
            );
            continue;
        }
        APP_VERSION& avpp = sr.app_versions[i];
        if (strlen(avpp.platform) == 0) {
            safe_strcpy(avpp.platform, get_primary_platform());
        } else {
            if (!is_supported_platform(avpp.platform)) {
                msg_printf(project, MSG_INTERNAL_ERROR,
                    "App version has unsupported platform %s", avpp.platform
                );
                continue;
            }
        }
        if (avpp.missing_coproc) {
            msg_printf(project, MSG_INTERNAL_ERROR,
                "App version uses non-existent %s GPU",
                avpp.missing_coproc_name
            );
        }
        APP* app = lookup_app(project, avpp.app_name);
        if (!app) {
            msg_printf(project, MSG_INTERNAL_ERROR,
                "Missing app %s", avpp.app_name
            );
            continue;
        }
        APP_VERSION* avp = lookup_app_version(
            app, avpp.platform, avpp.version_num, avpp.plan_class
        );
        if (avp) {
            // update app version attributes in case they changed on server
            //
            avp->avg_ncpus = avpp.avg_ncpus;
            avp->max_ncpus = avpp.max_ncpus;
            avp->flops = avpp.flops;
            safe_strcpy(avp->cmdline, avpp.cmdline);
            avp->gpu_usage = avpp.gpu_usage;
            strlcpy(avp->api_version, avpp.api_version, sizeof(avp->api_version));
            avp->dont_throttle = avpp.dont_throttle;
            avp->needs_network = avpp.needs_network;

            // if we had download failures, clear them
            //
            avp->clear_errors();

            continue;
        }
        avp = new APP_VERSION;
        *avp = avpp;
        retval = link_app_version(project, avp);
        if (retval) {
            delete avp;
            continue;
        }
        app_versions.push_back(avp);
    }
    // install new workunits
    //
    for (i=0; i<sr.workunits.size(); i++) {
        if (lookup_workunit(project, sr.workunits[i].name)) continue;
        WORKUNIT* wup = new WORKUNIT;
        *wup = sr.workunits[i];
        wup->project = project;
        retval = link_workunit(project, wup);
        if (retval) {
            msg_printf(project, MSG_INTERNAL_ERROR,
                "Can't handle task %s in scheduler reply", wup->name
            );
            delete wup;
            continue;
        }
        wup->clear_errors();
        workunits.push_back(wup);
    }
    // per-resource-type estimated runtime of new tasks,
    // used only for the debug message below
    //
    double est_rsc_runtime[MAX_RSC];
    for (int j=0; j<coprocs.n_rsc; j++) {
        est_rsc_runtime[j] = 0;
    }
    // install new results
    //
    for (i=0; i<sr.results.size(); i++) {
        RESULT* rp2 = lookup_result(project, sr.results[i].name);
        if (rp2) {
            // see if project wants to change the job's deadline
            //
            if (sr.results[i].report_deadline != rp2->report_deadline) {
                rp2->report_deadline = sr.results[i].report_deadline;
            } else {
                msg_printf(project, MSG_INTERNAL_ERROR,
                    "Already have task %s\n", sr.results[i].name
                );
            }
            continue;
        }
        RESULT* rp = new RESULT;
        *rp = sr.results[i];
        retval = link_result(project, rp);
        if (retval) {
            msg_printf(project, MSG_INTERNAL_ERROR,
                "Can't handle task %s in scheduler reply", rp->name
            );
            delete rp;
            continue;
        }
        if (strlen(rp->platform) == 0) {
            // no platform in reply; fill in from our primary platform
            //
            safe_strcpy(rp->platform, get_primary_platform());
            rp->version_num = latest_version(rp->wup->app, rp->platform);
        }
        rp->avp = lookup_app_version(
            rp->wup->app, rp->platform, rp->version_num, rp->plan_class
        );
        if (!rp->avp) {
            msg_printf(project, MSG_INTERNAL_ERROR,
                "No app version found for app %s platform %s ver %d class %s; discarding %s",
                rp->wup->app->name, rp->platform, rp->version_num, rp->plan_class, rp->name
            );
            delete rp;
            continue;
        }
        if (rp->avp->missing_coproc) {
            msg_printf(project, MSG_INTERNAL_ERROR,
                "Missing coprocessor for task %s; aborting", rp->name
            );
            rp->abort_inactive(EXIT_MISSING_COPROC);
        } else {
            rp->set_state(RESULT_NEW, "handle_scheduler_reply");
            int rt = rp->avp->gpu_usage.rsc_type;
            if (rt > 0) {
                est_rsc_runtime[rt] += rp->estimated_runtime();
                gpus_usable = true;
                // trigger a check of whether GPU is actually usable
            } else {
                est_rsc_runtime[0] += rp->estimated_runtime();
            }
        }
        rp->wup->version_num = rp->version_num;
        rp->received_time = now;
        new_results.push_back(rp);
        results.push_back(rp);
    }
    sort_results();

    if (log_flags.sched_op_debug) {
        if (sr.results.size()) {
            for (int j=0; j<coprocs.n_rsc; j++) {
                msg_printf(project, MSG_INFO,
                    "[sched_op] estimated total %s task duration: %.0f seconds",
                    rsc_name_long(j),
                    est_rsc_runtime[j]/time_stats.availability_frac(j)
                );
            }
        }
    }

    // update records for ack'ed results
    //
    for (i=0; i<sr.result_acks.size(); i++) {
        if (log_flags.sched_op_debug) {
            msg_printf(project, MSG_INFO,
                "[sched_op] handle_scheduler_reply(): got ack for task %s\n",
                sr.result_acks[i].name
            );
        }
        RESULT* rp = lookup_result(project, sr.result_acks[i].name);
        if (rp) {
            rp->got_server_ack = true;
        } else {
            msg_printf(project, MSG_INTERNAL_ERROR,
                "Got ack for task %s, but can't find it", sr.result_acks[i].name
            );
        }
    }

    // handle result abort requests
    //
    for (i=0; i<sr.result_abort.size(); i++) {
        RESULT* rp = lookup_result(project, sr.result_abort[i].name);
        if (rp) {
            ACTIVE_TASK* atp = lookup_active_task_by_result(rp);
            if (atp) {
                atp->abort_task(EXIT_ABORTED_BY_PROJECT,
                    "aborted by project - no longer usable"
                );
            } else {
                rp->abort_inactive(EXIT_ABORTED_BY_PROJECT);
            }
        } else {
            msg_printf(project, MSG_INTERNAL_ERROR,
                "Server requested abort of unknown task %s",
                sr.result_abort[i].name
            );
        }
    }
    // handle conditional aborts (only if the task hasn't started)
    //
    for (i=0; i<sr.result_abort_if_not_started.size(); i++) {
        RESULT* rp = lookup_result(project, sr.result_abort_if_not_started[i].name);
        if (!rp) {
            msg_printf(project, MSG_INTERNAL_ERROR,
                "Server requested conditional abort of unknown task %s",
                sr.result_abort_if_not_started[i].name
            );
            continue;
        }
        if (rp->not_started) {
            rp->abort_inactive(EXIT_ABORTED_BY_PROJECT);
        }
    }

    // remove acked trickle files
    //
    if (sr.message_ack) {
        remove_trickle_files(project);
    }
    if (sr.send_full_workload) {
        project->send_full_workload = true;
    }
    project->dont_use_dcf = sr.dont_use_dcf;
    project->send_time_stats_log = sr.send_time_stats_log;
    project->send_job_log = sr.send_job_log;
    project->trickle_up_pending = false;

    // The project returns a hostid only if it has created a new host record.
    // In that case reset RPC seqno
    //
    if (sr.hostid) {
        if (project->hostid) {
            // if we already have a host ID for this project,
            // we must have sent it a stale seqno,
            // which usually means our state file was copied from another host.
            // So generate a new host CPID.
            //
            generate_new_host_cpid();
            msg_printf(project, MSG_INFO,
                "Generated new computer cross-project ID: %s",
                host_info.host_cpid
            );
        }
        //msg_printf(project, MSG_INFO, "Changing host ID from %d to %d", project->hostid, sr.hostid);
        project->hostid = sr.hostid;
        project->rpc_seqno = 0;
    }

#ifdef ENABLE_AUTO_UPDATE
    if (sr.auto_update.present) {
        if (!sr.auto_update.validate_and_link(project)) {
            auto_update = sr.auto_update;
        }
    }
#endif

    project->project_files = sr.project_files;
    project->link_project_files();
    project->create_project_file_symlinks();

    if (log_flags.state_debug) {
        msg_printf(project, MSG_INFO,
            "[state] handle_scheduler_reply(): State after handle_scheduler_reply():"
        );
        print_summary();
    }

    // the following must precede the backoff and request_delay checks,
    // since it overrides them
    //
    if (sr.next_rpc_delay) {
        project->next_rpc_time = now + sr.next_rpc_delay;
    } else {
        project->next_rpc_time = 0;
    }

    work_fetch.handle_reply(project, &sr, new_results);

    // the RPC succeeded, so clear failure count and backoff
    //
    project->nrpc_failures = 0;
    project->min_rpc_time = 0;

    if (sr.request_delay) {
        double x = now + sr.request_delay;
        project->set_min_rpc_time(x, "requested by project");
    }

    if (sr.got_rss_feeds) {
        handle_sr_feeds(sr.sr_feeds, project);
    }

    update_trickle_up_urls(project, sr.trickle_up_urls);

    // garbage collect in case the project sent us some irrelevant FILE_INFOs;
    // avoid starting transfers for them
    //
    gstate.garbage_collect_always();

    // if the user provided app_config.xml for this project,
    // apply it to any app versions we just got
    //
    project->app_configs.config_app_versions(project, false);

    // make sure we don't set no_rsc_apps[] for all processor types
    //
    if (!project->anonymous_platform) {
        project->check_no_rsc_apps();
    }

    return 0;
}
1164
1165 #endif // SIM
1166
check_project_timeout()1167 void CLIENT_STATE::check_project_timeout() {
1168 unsigned int i;
1169 for (i=0; i<projects.size(); i++) {
1170 PROJECT* p = projects[i];
1171 if (p->possibly_backed_off && now > p->min_rpc_time) {
1172 p->possibly_backed_off = false;
1173 char buf[256];
1174 snprintf(buf, sizeof(buf), "Backoff ended for %s", p->get_project_name());
1175 request_work_fetch(buf);
1176 }
1177 }
1178 }
1179
1180 // find a project that needs to have its master file fetched
1181 //
next_project_master_pending()1182 PROJECT* CLIENT_STATE::next_project_master_pending() {
1183 unsigned int i;
1184 PROJECT* p;
1185
1186 for (i=0; i<projects.size(); i++) {
1187 p = projects[i];
1188 if (p->waiting_until_min_rpc_time()) continue;
1189 if (p->suspended_via_gui) continue;
1190 if (p->master_url_fetch_pending) {
1191 return p;
1192 }
1193 }
1194 return 0;
1195 }
1196
1197 // find a project for which a scheduler RPC has been requested
1198 // - by user
1199 // - by an account manager
1200 // - by the project
1201 // - because the project was just attached (for verification)
1202 //
next_project_sched_rpc_pending()1203 PROJECT* CLIENT_STATE::next_project_sched_rpc_pending() {
1204 unsigned int i;
1205 PROJECT* p;
1206
1207 for (i=0; i<projects.size(); i++) {
1208 p = projects[i];
1209 bool honor_backoff = true;
1210 bool honor_suspend = true;
1211
1212 // is a scheduler-requested RPC due?
1213 //
1214 if (!p->sched_rpc_pending && p->next_rpc_time && p->next_rpc_time<now) {
1215 // don't do it if project is set to no new work
1216 // and has no jobs currently
1217 //
1218 if (!p->dont_request_more_work || p->has_results()) {
1219 p->sched_rpc_pending = RPC_REASON_PROJECT_REQ;
1220 }
1221 }
1222
1223 switch (p->sched_rpc_pending) {
1224 case RPC_REASON_USER_REQ:
1225 honor_backoff = false;
1226 honor_suspend = false;
1227 break;
1228 case RPC_REASON_ACCT_MGR_REQ:
1229 // This is critical for acct mgrs, to propagate new host CPIDs
1230 honor_suspend = false;
1231 break;
1232 case RPC_REASON_INIT:
1233 // always do the initial RPC so we can get project name etc.
1234 honor_suspend = false;
1235 break;
1236 case RPC_REASON_PROJECT_REQ:
1237 break;
1238 default:
1239 continue;
1240 }
1241 if (honor_backoff && p->waiting_until_min_rpc_time()) {
1242 continue;
1243 }
1244 if (honor_suspend && p->suspended_via_gui) {
1245 continue;
1246 }
1247 if (p->sched_rpc_pending) {
1248 return p;
1249 }
1250 }
1251 return 0;
1252 }
1253
next_project_trickle_up_pending()1254 PROJECT* CLIENT_STATE::next_project_trickle_up_pending() {
1255 unsigned int i;
1256 PROJECT* p;
1257
1258 for (i=0; i<projects.size(); i++) {
1259 p = projects[i];
1260 if (p->waiting_until_min_rpc_time()) continue;
1261 if (p->suspended_via_gui) continue;
1262 if (p->trickle_up_pending) {
1263 return p;
1264 }
1265 }
1266 return 0;
1267 }
1268
1269 // find a project with finished results that should be reported.
1270 // This means:
1271 // - we're not backing off contacting the project
1272 // - no upload for that project is active
1273 // - the result is ready_to_report (compute done; files uploaded)
1274 // - we're within a day of the report deadline,
1275 // or at least a day has elapsed since the result was completed,
1276 // or we have a sporadic connection
1277 // or the project is in "don't request more work" state
1278 // or a network suspend period is coming up soon
1279 // or the project has > RESULT_REPORT_IF_AT_LEAST_N results ready to report
1280 //
find_project_with_overdue_results(bool network_suspend_soon)1281 PROJECT* CLIENT_STATE::find_project_with_overdue_results(
1282 bool network_suspend_soon
1283 ) {
1284 unsigned int i;
1285 RESULT* r;
1286
1287 for (i=0; i<projects.size(); i++) {
1288 PROJECT* p = projects[i];
1289 p->n_ready = 0;
1290 p->dont_contact = false;
1291 if (p->waiting_until_min_rpc_time()) p->dont_contact = true;
1292 if (p->suspended_via_gui) p->dont_contact = true;
1293 #ifndef SIM
1294 if (actively_uploading(p)) p->dont_contact = true;
1295 #endif
1296 }
1297
1298 for (i=0; i<results.size(); i++) {
1299 r = results[i];
1300 if (!r->ready_to_report) continue;
1301
1302 PROJECT* p = r->project;
1303 if (p->dont_contact) continue;
1304
1305 if (p->dont_request_more_work) {
1306 return p;
1307 }
1308
1309 if (r->report_immediately) {
1310 return p;
1311 }
1312
1313 if (cc_config.report_results_immediately) {
1314 return p;
1315 }
1316
1317 if (p->report_results_immediately) {
1318 return p;
1319 }
1320
1321 if (r->app->report_results_immediately) {
1322 return p;
1323 }
1324
1325 if (net_status.have_sporadic_connection) {
1326 return p;
1327 }
1328
1329 if (network_suspend_soon) {
1330 return p;
1331 }
1332
1333 double cushion = std::max(REPORT_DEADLINE_CUSHION, work_buf_min());
1334 if (gstate.now > r->report_deadline - cushion) {
1335 return p;
1336 }
1337
1338 if (gstate.now > r->completed_time + MAX_REPORT_DELAY) {
1339 return p;
1340 }
1341
1342 p->n_ready++;
1343 if (p->n_ready >= RESULT_REPORT_IF_AT_LEAST_N) {
1344 return p;
1345 }
1346 }
1347 return 0;
1348 }
1349
1350 // trigger work fetch
1351 //
request_work_fetch(const char * where)1352 void CLIENT_STATE::request_work_fetch(const char* where) {
1353 if (log_flags.work_fetch_debug) {
1354 msg_printf(0, MSG_INFO, "[work_fetch] Request work fetch: %s", where);
1355 }
1356 must_check_work_fetch = true;
1357 }
1358
1359