1 // This file is part of BOINC.
2 // http://boinc.berkeley.edu
3 // Copyright (C) 2008 University of California
4 //
5 // BOINC is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Lesser General Public License
7 // as published by the Free Software Foundation,
8 // either version 3 of the License, or (at your option) any later version.
9 //
10 // BOINC is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
13 // See the GNU Lesser General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with BOINC. If not, see <http://www.gnu.org/licenses/>.
17
18 // Abstraction of a set of executing applications,
19 // connected to I/O files in various ways.
20 // Shouldn't depend on CLIENT_STATE.
21
22 #include "cpp.h"
23
24 #ifdef _WIN32
25 #include "boinc_win.h"
26 #else
27 #include "config.h"
28 #endif
29
30 #ifndef _WIN32
31 #include <unistd.h>
32 #if HAVE_SYS_WAIT_H
33 #include <sys/wait.h>
34 #endif
35 #if HAVE_SYS_TIME_H
36 #include <sys/time.h>
37 #endif
38 #if HAVE_SYS_RESOURCE_H
39 #include <sys/resource.h>
40 #endif
41 #if HAVE_SYS_TYPES_H
42 #include <sys/types.h>
43 #endif
44 #if HAVE_FCNTL_H
45 #include <fcntl.h>
46 #endif
47
48 #include <cctype>
49 #include <ctime>
50 #include <cstdio>
51 #include <cmath>
52 #include <cstdlib>
53 #endif
54
55 #ifdef _MSC_VER
56 #define snprintf _snprintf
57 #endif
58
59 #include "error_numbers.h"
60 #include "filesys.h"
61 #include "file_names.h"
62 #include "parse.h"
63 #include "shmem.h"
64 #include "str_replace.h"
65 #include "str_util.h"
66 #include "util.h"
67
68 #include "async_file.h"
69 #include "client_msgs.h"
70 #include "client_state.h"
71 #include "procinfo.h"
72 #include "result.h"
73 #include "sandbox.h"
74 #include "diagnostics.h"
75
76 #include "app.h"
77
78 using std::max;
79 using std::min;
80
// Time (gstate.now) at which an exclusive app was last seen running
// by ACTIVE_TASK_SET::get_memory_usage(); 0 if none has been seen.
//
double exclusive_app_running = 0.0;

// Same, for exclusive GPU apps.
//
double exclusive_gpu_app_running = 0.0;

// NOTE(review): not written in the visible part of this file;
// presumably the reason GPU computing is suspended - confirm with its users.
//
int gpu_suspend_reason = 0;

// Fraction of total CPU capacity (all CPUs) used by non-BOINC processes
// during the last measurement interval;
// computed by ACTIVE_TASK_SET::get_memory_usage().
//
double non_boinc_cpu_usage = 0.0;
85
~ACTIVE_TASK()86 ACTIVE_TASK::~ACTIVE_TASK() {
87 #ifndef SIM
88 if (async_copy) {
89 remove_async_copy(async_copy);
90 }
91 #endif
92 }
93
ACTIVE_TASK()94 ACTIVE_TASK::ACTIVE_TASK() {
95 #ifdef _WIN32
96 safe_strcpy(shmem_seg_name, "");
97 #else
98 shmem_seg_name = 0;
99 #endif
100 result = NULL;
101 wup = NULL;
102 app_version = NULL;
103 pid = 0;
104
105 _task_state = PROCESS_UNINITIALIZED;
106 slot = 0;
107 checkpoint_cpu_time = 0;
108 checkpoint_elapsed_time = 0;
109 checkpoint_fraction_done = 0;
110 checkpoint_fraction_done_elapsed_time = 0;
111 current_cpu_time = 0;
112 peak_working_set_size = 0;
113 peak_swap_size = 0;
114 peak_disk_usage = 0;
115 once_ran_edf = false;
116
117 fraction_done = 0;
118 fraction_done_elapsed_time = 0;
119 first_fraction_done = 0;
120 first_fraction_done_elapsed_time = 0;
121 scheduler_state = CPU_SCHED_UNINITIALIZED;
122 next_scheduler_state = CPU_SCHED_UNINITIALIZED;
123 signal = 0;
124 run_interval_start_wall_time = gstate.now;
125 checkpoint_wall_time = 0;
126 elapsed_time = 0;
127 bytes_sent_episode = 0;
128 bytes_received_episode = 0;
129 bytes_sent = 0;
130 bytes_received = 0;
131 safe_strcpy(slot_dir, "");
132 safe_strcpy(slot_path, "");
133 max_elapsed_time = 0;
134 max_disk_usage = 0;
135 max_mem_usage = 0;
136 have_trickle_down = false;
137 send_upload_file_status = false;
138 too_large = false;
139 needs_shmem = false;
140 want_network = 0;
141 abort_time = 0;
142 premature_exit_count = 0;
143 quit_time = 0;
144 procinfo.clear();
145 procinfo.working_set_size_smoothed = 0;
146 #ifdef _WIN32
147 process_handle = NULL;
148 shm_handle = NULL;
149 #endif
150 premature_exit_count = 0;
151 overdue_checkpoint = false;
152 last_deadline_miss_time = 0;
153 safe_strcpy(web_graphics_url, "");
154 safe_strcpy(remote_desktop_addr, "");
155 async_copy = NULL;
156 finish_file_time = 0;
157 }
158
process_exists()159 bool ACTIVE_TASK::process_exists() {
160 switch (task_state()) {
161 case PROCESS_EXECUTING:
162 case PROCESS_SUSPENDED:
163 case PROCESS_ABORT_PENDING:
164 case PROCESS_QUIT_PENDING:
165 return true;
166 }
167 return false;
168 }
169
170 // preempt this task;
171 // called from the CLIENT_STATE::enforce_schedule()
172 // and ACTIVE_TASK_SET::suspend_all()
173 //
preempt(int preempt_type,int reason)174 int ACTIVE_TASK::preempt(int preempt_type, int reason) {
175 bool remove=false;
176
177 switch (preempt_type) {
178 case REMOVE_NEVER:
179 remove = false;
180 break;
181 case REMOVE_MAYBE_USER:
182 case REMOVE_MAYBE_SCHED:
183 // GPU jobs: always remove from mem, since it's tying up GPU RAM
184 //
185 if (result->uses_gpu()) {
186 remove = true;
187 break;
188 }
189 // if it's never checkpointed, leave in mem
190 //
191 if (checkpoint_elapsed_time == 0) {
192 remove = false;
193 break;
194 }
195 // otherwise obey user prefs
196 //
197 remove = !gstate.global_prefs.leave_apps_in_memory;
198 break;
199 case REMOVE_ALWAYS:
200 remove = true;
201 break;
202 }
203
204 bool show_msg = log_flags.cpu_sched && reason != SUSPEND_REASON_CPU_THROTTLE;
205 if (remove) {
206 if (show_msg) {
207 msg_printf(result->project, MSG_INFO,
208 "[cpu_sched] Preempting %s (removed from memory)",
209 result->name
210 );
211 }
212 return request_exit();
213 } else {
214 if (show_msg) {
215 msg_printf(result->project, MSG_INFO,
216 "[cpu_sched] Preempting %s (left in memory)",
217 result->name
218 );
219 }
220 if (task_state() != PROCESS_EXECUTING) return 0;
221 return suspend();
222 }
223 return 0;
224 }
225
226 #ifndef SIM
227
228 // called when the task's main process has exited.
229 // delete the shared memory used to communicate with it,
230 // and kill any remaining subsidiary processes.
231 //
cleanup_task()232 void ACTIVE_TASK::cleanup_task() {
233 #ifdef _WIN32
234 if (process_handle) {
235 CloseHandle(process_handle);
236 process_handle = NULL;
237 }
238 // detach from shared mem.
239 // This will destroy shmem seg since we're the last attachment
240 //
241 if (app_client_shm.shm) {
242 detach_shmem(shm_handle, app_client_shm.shm);
243 app_client_shm.shm = NULL;
244 }
245 #else
246 int retval;
247
248 if (app_client_shm.shm) {
249 #ifndef __EMX__
250 if (app_version->api_version_at_least(6, 0)) {
251 retval = detach_shmem_mmap(app_client_shm.shm, sizeof(SHARED_MEM));
252 } else
253 #endif
254 {
255 retval = detach_shmem(app_client_shm.shm);
256 if (retval) {
257 msg_printf(wup->project, MSG_INTERNAL_ERROR,
258 "Couldn't detach shared memory: %s", boincerror(retval)
259 );
260 }
261 retval = destroy_shmem(shmem_seg_name);
262 if (retval) {
263 msg_printf(wup->project, MSG_INTERNAL_ERROR,
264 "Couldn't destroy shared memory: %s", boincerror(retval)
265 );
266 }
267 }
268 app_client_shm.shm = NULL;
269 gstate.retry_shmem_time = 0;
270 }
271 #endif
272
273 kill_subsidiary_processes();
274
275 if (cc_config.exit_after_finish) {
276 gstate.write_state_file();
277 exit(0);
278 }
279 }
280
init(RESULT * rp)281 int ACTIVE_TASK::init(RESULT* rp) {
282 result = rp;
283 wup = rp->wup;
284 app_version = rp->avp;
285 max_elapsed_time = rp->wup->rsc_fpops_bound/rp->avp->flops;
286 max_disk_usage = rp->wup->rsc_disk_bound;
287 max_mem_usage = rp->wup->rsc_memory_bound;
288 get_slot_dir(slot, slot_dir, sizeof(slot_dir));
289 relative_to_absolute(slot_dir, slot_path);
290 return 0;
291 }
292 #endif
293
294 // Deallocate memory to prevent unneeded reporting of memory leaks
295 //
free_mem()296 void ACTIVE_TASK_SET::free_mem() {
297 vector<ACTIVE_TASK*>::iterator at_iter;
298 ACTIVE_TASK *at;
299
300 at_iter = active_tasks.begin();
301 while (at_iter != active_tasks.end()) {
302 at = active_tasks[0];
303 at_iter = active_tasks.erase(at_iter);
304 delete at;
305 }
306 }
307
308 #ifndef SIM
309
app_running(PROC_MAP & pm,const char * p)310 bool app_running(PROC_MAP& pm, const char* p) {
311 PROC_MAP::iterator i;
312 for (i=pm.begin(); i!=pm.end(); ++i) {
313 PROCINFO& pi = i->second;
314 //msg_printf(0, MSG_INFO, "running: [%s]", pi.command);
315 if (!strcasecmp(pi.command, p)) {
316 return true;
317 }
318 }
319 return false;
320 }
321
322 #if 1 // debugging
procinfo_show(PROC_MAP & pm)323 void procinfo_show(PROC_MAP& pm) {
324 PROCINFO pi;
325 pi.clear();
326 PROC_MAP::iterator i;
327 for (i=pm.begin(); i!=pm.end(); ++i) {
328 PROCINFO& p = i->second;
329
330 msg_printf(NULL, MSG_INFO, "%d %s: boinc? %d low_pri %d (u%f k%f)",
331 p.id, p.command, p.is_boinc_app, p.is_low_priority,
332 p.user_time, p.kernel_time
333 );
334 #ifdef _WIN32
335 if (p.id == 0) continue;
336 #endif
337 if (p.is_boinc_app) continue;
338 if (p.is_low_priority) continue;
339 pi.kernel_time += p.kernel_time;
340 pi.user_time += p.user_time;
341 }
342 msg_printf(NULL, MSG_INFO, "non-boinc: u%f k%f", pi.user_time, pi.kernel_time);
343 }
344 #endif
345
346 // scan the set of all processes to
347 // 1) get the working-set size of active tasks
348 // 2) see if exclusive apps are running
349 // 3) get CPU time of non-BOINC processes
350 //
get_memory_usage()351 void ACTIVE_TASK_SET::get_memory_usage() {
352 static double last_mem_time=0;
353 unsigned int i;
354 int retval;
355 static bool first = true;
356 static double last_cpu_time;
357 double diff=0;
358
359 if (!first) {
360 diff = gstate.now - last_mem_time;
361 if (diff < 0 || diff > MEMORY_USAGE_PERIOD + 10) {
362 // user has changed system clock,
363 // or there has been a long system sleep
364 //
365 last_mem_time = gstate.now;
366 return;
367 }
368 if (diff < MEMORY_USAGE_PERIOD) return;
369 }
370
371 last_mem_time = gstate.now;
372 PROC_MAP pm;
373 retval = procinfo_setup(pm);
374 if (retval) {
375 if (log_flags.mem_usage_debug) {
376 msg_printf(NULL, MSG_INTERNAL_ERROR,
377 "[mem_usage] procinfo_setup() returned %d", retval
378 );
379 }
380 return;
381 }
382 PROCINFO boinc_total;
383 if (log_flags.mem_usage_debug) {
384 boinc_total.clear();
385 boinc_total.working_set_size_smoothed = 0;
386 }
387 for (i=0; i<active_tasks.size(); i++) {
388 ACTIVE_TASK* atp = active_tasks[i];
389 if (atp->task_state() == PROCESS_UNINITIALIZED) continue;
390 if (atp->pid ==0) continue;
391
392 // scan all active tasks with a process, even if not scheduled, because
393 // 1) we might have recently suspended a tasks,
394 // and we still need to count its time
395 // 2) preempted tasks might not actually suspend themselves
396 // (and we'd count that as non-BOINC CPU usage
397 // and suspend everything).
398
399 PROCINFO& pi = atp->procinfo;
400 unsigned long last_page_fault_count = pi.page_fault_count;
401 pi.clear();
402 pi.id = atp->pid;
403 vector<int>* v = NULL;
404 if (atp->other_pids.size()>0) {
405 v = &(atp->other_pids);
406 }
407 procinfo_app(pi, v, pm, atp->app_version->graphics_exec_file);
408 if (atp->app_version->is_vm_app) {
409 // the memory of virtual machine apps is not reported correctly,
410 // at least on Windows. Use the VM size instead.
411 //
412 pi.working_set_size_smoothed = atp->wup->rsc_memory_bound;
413 } else {
414 pi.working_set_size_smoothed = .5*(pi.working_set_size_smoothed + pi.working_set_size);
415 }
416
417 if (pi.working_set_size > atp->peak_working_set_size) {
418 atp->peak_working_set_size = pi.working_set_size;
419 }
420 if (pi.swap_size > atp->peak_swap_size) {
421 atp->peak_swap_size = pi.swap_size;
422 }
423
424 if (!first) {
425 int pf = pi.page_fault_count - last_page_fault_count;
426 pi.page_fault_rate = pf/diff;
427 if (log_flags.mem_usage_debug) {
428 msg_printf(atp->result->project, MSG_INFO,
429 "[mem_usage] %s%s: WS %.2fMB, smoothed %.2fMB, swap %.2fMB, %.2f page faults/sec, user CPU %.3f, kernel CPU %.3f",
430 atp->scheduler_state==CPU_SCHED_SCHEDULED?"":" (not running)",
431 atp->result->name,
432 pi.working_set_size/MEGA,
433 pi.working_set_size_smoothed/MEGA,
434 pi.swap_size/MEGA,
435 pi.page_fault_rate,
436 pi.user_time,
437 pi.kernel_time
438 );
439 boinc_total.working_set_size += pi.working_set_size;
440 boinc_total.working_set_size_smoothed += pi.working_set_size_smoothed;
441 boinc_total.swap_size += pi.swap_size;
442 boinc_total.page_fault_rate += pi.page_fault_rate;
443 }
444 }
445 }
446
447 if (!first) {
448 if (log_flags.mem_usage_debug) {
449 msg_printf(0, MSG_INFO,
450 "[mem_usage] BOINC totals: WS %.2fMB, smoothed %.2fMB, swap %.2fMB, %.2f page faults/sec",
451 boinc_total.working_set_size/MEGA,
452 boinc_total.working_set_size_smoothed/MEGA,
453 boinc_total.swap_size/MEGA,
454 boinc_total.page_fault_rate
455 );
456 }
457 }
458
459 for (i=0; i<cc_config.exclusive_apps.size(); i++) {
460 if (app_running(pm, cc_config.exclusive_apps[i].c_str())) {
461 if (log_flags.mem_usage_debug) {
462 msg_printf(NULL, MSG_INFO,
463 "[mem_usage] exclusive app %s is running", cc_config.exclusive_apps[i].c_str()
464 );
465 }
466 exclusive_app_running = gstate.now;
467 break;
468 }
469 }
470 for (i=0; i<cc_config.exclusive_gpu_apps.size(); i++) {
471 if (app_running(pm, cc_config.exclusive_gpu_apps[i].c_str())) {
472 if (log_flags.mem_usage_debug) {
473 msg_printf(NULL, MSG_INFO,
474 "[mem_usage] exclusive GPU app %s is running", cc_config.exclusive_gpu_apps[i].c_str()
475 );
476 }
477 exclusive_gpu_app_running = gstate.now;
478 break;
479 }
480 }
481
482 // get info on non-BOINC processes.
483 // mem usage info is not useful because most OSs don't
484 // move idle processes out of RAM, so physical memory is always full.
485 // Also (at least on Win) page faults are used for various things,
486 // not all of them generate disk I/O,
487 // so they're not useful for detecting paging/thrashing.
488 //
489 PROCINFO pi;
490 procinfo_non_boinc(pi, pm);
491 if (log_flags.mem_usage_debug) {
492 //procinfo_show(pm);
493 msg_printf(NULL, MSG_INFO,
494 "[mem_usage] All others: WS %.2fMB, swap %.2fMB, user %.3fs, kernel %.3fs",
495 pi.working_set_size/MEGA, pi.swap_size/MEGA,
496 pi.user_time, pi.kernel_time
497 );
498 }
499 double new_cpu_time = pi.user_time + pi.kernel_time;
500 if (!first) {
501 non_boinc_cpu_usage = (new_cpu_time - last_cpu_time)/(diff*gstate.host_info.p_ncpus);
502 // processes might have exited in the last 10 sec,
503 // causing this to be negative.
504 if (non_boinc_cpu_usage < 0) non_boinc_cpu_usage = 0;
505 if (log_flags.mem_usage_debug) {
506 msg_printf(NULL, MSG_INFO,
507 "[mem_usage] non-BOINC CPU usage: %.2f%%", non_boinc_cpu_usage*100
508 );
509 }
510 }
511 last_cpu_time = new_cpu_time;
512 first = false;
513 }
514
515 #endif
516
517 // There's a new trickle file.
518 // Move it from slot dir to project dir
519 //
move_trickle_file()520 int ACTIVE_TASK::move_trickle_file() {
521 char new_path[MAXPATHLEN], old_path[MAXPATHLEN];
522 int retval;
523
524 snprintf(old_path, sizeof(old_path),
525 "%s/trickle_up.xml",
526 slot_dir
527 );
528 snprintf(new_path, sizeof(new_path),
529 "%s/trickle_up_%s_%d.xml",
530 result->project->project_dir(), result->name, (int)time(0)
531 );
532 retval = boinc_rename(old_path, new_path);
533
534 // if can't move it, remove
535 //
536 if (retval) {
537 delete_project_owned_file(old_path, true);
538 return ERR_RENAME;
539 }
540 return 0;
541 }
542
543 // size of output files and files in slot dir
544 //
current_disk_usage(double & size)545 int ACTIVE_TASK::current_disk_usage(double& size) {
546 double x;
547 unsigned int i;
548 int retval;
549 FILE_INFO* fip;
550 char path[MAXPATHLEN];
551
552 retval = dir_size(slot_dir, size);
553 if (retval) return retval;
554 for (i=0; i<result->output_files.size(); i++) {
555 fip = result->output_files[i].file_info;
556 get_pathname(fip, path, sizeof(path));
557 retval = file_size(path, x);
558 if (!retval) size += x;
559 }
560 if (size > peak_disk_usage) {
561 peak_disk_usage = size;
562 }
563 return 0;
564 }
565
is_slot_in_use(int slot)566 bool ACTIVE_TASK_SET::is_slot_in_use(int slot) {
567 unsigned int i;
568 for (i=0; i<active_tasks.size(); i++) {
569 if (active_tasks[i]->slot == slot) {
570 return true;
571 }
572 }
573 return false;
574 }
575
is_slot_dir_in_use(char * dir)576 bool ACTIVE_TASK_SET::is_slot_dir_in_use(char* dir) {
577 char path[MAXPATHLEN];
578 unsigned int i;
579 for (i=0; i<active_tasks.size(); i++) {
580 get_slot_dir(active_tasks[i]->slot, path, sizeof(path));
581 if (!strcmp(path, dir)) return true;
582 }
583 return false;
584 }
585
586 // Get a free slot:
587 // either find an unused an empty slot dir,
588 // or create a new slot dir if needed
589 //
get_free_slot(RESULT * rp)590 int ACTIVE_TASK::get_free_slot(RESULT* rp) {
591 #ifndef SIM
592 int j, retval;
593 char path[MAXPATHLEN];
594
595 // scan slot numbers: slots/0, slots/1, etc.
596 //
597 for (j=0; ; j++) {
598 // skip slots that are in use by existing jobs
599 //
600 if (gstate.active_tasks.is_slot_in_use(j)) continue;
601
602 get_slot_dir(j, path, sizeof(path));
603 if (boinc_file_exists(path)) {
604 if (is_dir(path)) {
605 // If the directory exists, try to clean it out.
606 // If this succeeds, use it.
607 //
608 retval = client_clean_out_dir(path, "get_free_slot()");
609 if (!retval) break;
610 if (log_flags.slot_debug) {
611 msg_printf(rp->project, MSG_INFO,
612 "[slot] failed to clean out dir: %s",
613 boincerror(retval)
614 );
615 }
616 }
617 } else {
618 // directory doesn't exist - create one
619 //
620 retval = make_slot_dir(j);
621 if (!retval) break;
622 }
623
624 // paranoia - don't allow unbounded slots
625 //
626 if (j > gstate.ncpus*100) {
627 msg_printf(rp->project, MSG_INTERNAL_ERROR,
628 "exceeded limit of %d slot directories", gstate.ncpus*100
629 );
630 return ERR_NULL;
631 }
632 }
633 slot = j;
634 if (log_flags.slot_debug) {
635 msg_printf(rp->project, MSG_INFO,
636 "[slot] assigning slot %d to %s", j, rp->name
637 );
638 }
639 #endif
640 return 0;
641 }
642
slot_taken(int slot)643 bool ACTIVE_TASK_SET::slot_taken(int slot) {
644 unsigned int i;
645 for (i=0; i<active_tasks.size(); i++) {
646 if (active_tasks[i]->slot == slot) return true;
647 }
648 return false;
649 }
650
651 // <active_task_state> is here for the benefit of 3rd-party software
652 // that reads the client state file
653 //
write(MIOFILE & fout)654 int ACTIVE_TASK::write(MIOFILE& fout) {
655 fout.printf(
656 "<active_task>\n"
657 " <project_master_url>%s</project_master_url>\n"
658 " <result_name>%s</result_name>\n"
659 " <active_task_state>%d</active_task_state>\n"
660 " <app_version_num>%d</app_version_num>\n"
661 " <slot>%d</slot>\n"
662 " <checkpoint_cpu_time>%f</checkpoint_cpu_time>\n"
663 " <checkpoint_elapsed_time>%f</checkpoint_elapsed_time>\n"
664 " <checkpoint_fraction_done>%f</checkpoint_fraction_done>\n"
665 " <checkpoint_fraction_done_elapsed_time>%f</checkpoint_fraction_done_elapsed_time>\n"
666 " <current_cpu_time>%f</current_cpu_time>\n"
667 " <once_ran_edf>%d</once_ran_edf>\n"
668 " <swap_size>%f</swap_size>\n"
669 " <working_set_size>%f</working_set_size>\n"
670 " <working_set_size_smoothed>%f</working_set_size_smoothed>\n"
671 " <page_fault_rate>%f</page_fault_rate>\n"
672 " <bytes_sent>%f</bytes_sent>\n"
673 " <bytes_received>%f</bytes_received>\n",
674 result->project->master_url,
675 result->name,
676 task_state(),
677 app_version->version_num,
678 slot,
679 checkpoint_cpu_time,
680 checkpoint_elapsed_time,
681 checkpoint_fraction_done,
682 checkpoint_fraction_done_elapsed_time,
683 current_cpu_time,
684 once_ran_edf?1:0,
685 procinfo.swap_size,
686 procinfo.working_set_size,
687 procinfo.working_set_size_smoothed,
688 procinfo.page_fault_rate,
689 bytes_sent,
690 bytes_received
691 );
692 fout.printf("</active_task>\n");
693 return 0;
694 }
695
696 #ifndef SIM
697
write_gui(MIOFILE & fout)698 int ACTIVE_TASK::write_gui(MIOFILE& fout) {
699 // if the app hasn't reported fraction done or reported > 1,
700 // and a minute has elapsed, estimate fraction done in a
701 // way that constantly increases and approaches 1.
702 //
703 double fd = fraction_done;
704 if (((fd<=0)||(fd>1)) && elapsed_time > 60) {
705 double est_time = wup->rsc_fpops_est/app_version->flops;
706 double x = elapsed_time/est_time;
707 fd = 1 - exp(-x);
708 }
709 fout.printf(
710 "<active_task>\n"
711 " <active_task_state>%d</active_task_state>\n"
712 " <app_version_num>%d</app_version_num>\n"
713 " <slot>%d</slot>\n"
714 " <pid>%d</pid>\n"
715 " <scheduler_state>%d</scheduler_state>\n"
716 " <checkpoint_cpu_time>%f</checkpoint_cpu_time>\n"
717 " <fraction_done>%f</fraction_done>\n"
718 " <current_cpu_time>%f</current_cpu_time>\n"
719 " <elapsed_time>%f</elapsed_time>\n"
720 " <swap_size>%f</swap_size>\n"
721 " <working_set_size>%f</working_set_size>\n"
722 " <working_set_size_smoothed>%f</working_set_size_smoothed>\n"
723 " <page_fault_rate>%f</page_fault_rate>\n"
724 " <bytes_sent>%f</bytes_sent>\n"
725 " <bytes_received>%f</bytes_received>\n"
726 "%s"
727 "%s",
728 task_state(),
729 app_version->version_num,
730 slot,
731 pid,
732 scheduler_state,
733 checkpoint_cpu_time,
734 fd,
735 current_cpu_time,
736 elapsed_time,
737 procinfo.swap_size,
738 procinfo.working_set_size,
739 procinfo.working_set_size_smoothed,
740 procinfo.page_fault_rate,
741 bytes_sent,
742 bytes_received,
743 too_large?" <too_large/>\n":"",
744 needs_shmem?" <needs_shmem/>\n":""
745 );
746 if (elapsed_time > first_fraction_done_elapsed_time) {
747 fout.printf(
748 " <progress_rate>%f</progress_rate>\n",
749 (fd - first_fraction_done)/(elapsed_time - first_fraction_done_elapsed_time)
750 );
751 }
752 if (strlen(app_version->graphics_exec_path)) {
753 fout.printf(
754 " <graphics_exec_path>%s</graphics_exec_path>\n"
755 " <slot_path>%s</slot_path>\n",
756 app_version->graphics_exec_path,
757 slot_path
758 );
759 }
760 if (strlen(web_graphics_url)) {
761 fout.printf(
762 " <web_graphics_url>%s</web_graphics_url>\n",
763 web_graphics_url
764 );
765 }
766 if (strlen(remote_desktop_addr)) {
767 fout.printf(
768 " <remote_desktop_addr>%s</remote_desktop_addr>\n",
769 remote_desktop_addr
770 );
771 }
772 fout.printf("</active_task>\n");
773 return 0;
774 }
775
776 #endif
777
parse(XML_PARSER & xp)778 int ACTIVE_TASK::parse(XML_PARSER& xp) {
779 char result_name[256], project_master_url[256];
780 int n, dummy;
781 unsigned int i;
782 PROJECT* project=0;
783 double x;
784
785 safe_strcpy(result_name, "");
786 safe_strcpy(project_master_url, "");
787
788 while (!xp.get_tag()) {
789 if (xp.match_tag("/active_task")) {
790 project = gstate.lookup_project(project_master_url);
791 if (!project) {
792 msg_printf(
793 NULL, MSG_INTERNAL_ERROR,
794 "State file error: project %s not found for task\n",
795 project_master_url
796 );
797 return ERR_NULL;
798 }
799 result = gstate.lookup_result(project, result_name);
800 if (!result) {
801 msg_printf(
802 project, MSG_INTERNAL_ERROR,
803 "State file error: result %s not found for task\n",
804 result_name
805 );
806 return ERR_NULL;
807 }
808
809 // various sanity checks
810 //
811 if (result->got_server_ack
812 || result->ready_to_report
813 || result->state() != RESULT_FILES_DOWNLOADED
814 ) {
815 return ERR_BAD_RESULT_STATE;
816 }
817
818 wup = result->wup;
819 app_version = gstate.lookup_app_version(
820 result->app, result->platform, result->version_num,
821 result->plan_class
822 );
823 if (!app_version) {
824 msg_printf(
825 project, MSG_INTERNAL_ERROR,
826 "State file error: app %s platform %s version %d not found\n",
827 result->app->name, result->platform, result->version_num
828 );
829 return ERR_NULL;
830 }
831
832 // make sure no two active tasks are in same slot
833 //
834 for (i=0; i<gstate.active_tasks.active_tasks.size(); i++) {
835 ACTIVE_TASK* atp = gstate.active_tasks.active_tasks[i];
836 if (atp->slot == slot) {
837 msg_printf(project, MSG_INTERNAL_ERROR,
838 "State file error: two tasks in slot %d\n", slot
839 );
840 return ERR_BAD_RESULT_STATE;
841 }
842 }
843
844 // for 6.2/6.4 transition
845 //
846 if (checkpoint_elapsed_time == 0) {
847 elapsed_time = checkpoint_cpu_time;
848 checkpoint_elapsed_time = elapsed_time;
849 }
850
851 // for 6.12.25-26 transition;
852 // old clients write fraction_done to state file;
853 // new clients don't
854 if (fraction_done && checkpoint_elapsed_time) {
855 checkpoint_fraction_done = fraction_done;
856 checkpoint_fraction_done_elapsed_time = checkpoint_elapsed_time;
857 fraction_done_elapsed_time = checkpoint_elapsed_time;
858 } else {
859 fraction_done = checkpoint_fraction_done;
860 fraction_done_elapsed_time = checkpoint_fraction_done_elapsed_time;
861 }
862 return 0;
863 }
864 else if (xp.parse_str("result_name", result_name, sizeof(result_name))) continue;
865 else if (xp.parse_str("project_master_url", project_master_url, sizeof(project_master_url))) continue;
866 else if (xp.parse_int("slot", slot)) continue;
867 else if (xp.parse_int("active_task_state", dummy)) continue;
868 else if (xp.parse_double("checkpoint_cpu_time", checkpoint_cpu_time)) continue;
869 else if (xp.parse_double("checkpoint_elapsed_time", checkpoint_elapsed_time)) continue;
870 else if (xp.parse_double("checkpoint_fraction_done", checkpoint_fraction_done)) continue;
871 else if (xp.parse_double("checkpoint_fraction_done_elapsed_time", checkpoint_fraction_done_elapsed_time)) continue;
872 else if (xp.parse_bool("once_ran_edf", once_ran_edf)) continue;
873 else if (xp.parse_double("fraction_done", fraction_done)) continue;
874 // deprecated - for backwards compat
875 else if (xp.parse_int("app_version_num", n)) continue;
876 else if (xp.parse_double("swap_size", procinfo.swap_size)) continue;
877 else if (xp.parse_double("working_set_size", procinfo.working_set_size)) continue;
878 else if (xp.parse_double("working_set_size_smoothed", procinfo.working_set_size_smoothed)) continue;
879 else if (xp.parse_double("page_fault_rate", procinfo.page_fault_rate)) continue;
880 else if (xp.parse_double("current_cpu_time", x)) continue;
881 else if (xp.parse_double("bytes_sent", bytes_sent)) continue;
882 else if (xp.parse_double("bytes_received", bytes_received)) continue;
883 else {
884 if (log_flags.unparsed_xml) {
885 msg_printf(project, MSG_INFO,
886 "[unparsed_xml] ACTIVE_TASK::parse(): unrecognized %s\n",
887 xp.parsed_tag
888 );
889 }
890 }
891 }
892 return ERR_XML_PARSE;
893 }
894
write(MIOFILE & fout)895 int ACTIVE_TASK_SET::write(MIOFILE& fout) {
896 unsigned int i;
897 int retval;
898
899 fout.printf("<active_task_set>\n");
900 for (i=0; i<active_tasks.size(); i++) {
901 retval = active_tasks[i]->write(fout);
902 if (retval) return retval;
903 }
904 fout.printf("</active_task_set>\n");
905 return 0;
906 }
907
parse(XML_PARSER & xp)908 int ACTIVE_TASK_SET::parse(XML_PARSER& xp) {
909 while (!xp.get_tag()) {
910 if (xp.match_tag("/active_task_set")) return 0;
911 else if (xp.match_tag("active_task")) {
912 #ifdef SIM
913 ACTIVE_TASK at;
914 at.parse(xp);
915 #else
916 ACTIVE_TASK* atp = new ACTIVE_TASK;
917 int retval = atp->parse(xp);
918 if (!retval) {
919 if (slot_taken(atp->slot)) {
920 msg_printf(atp->result->project, MSG_INTERNAL_ERROR,
921 "slot %d in use; discarding result %s",
922 atp->slot, atp->result->name
923 );
924 retval = ERR_XML_PARSE;
925 }
926 }
927 if (!retval) active_tasks.push_back(atp);
928 else delete atp;
929 #endif
930 } else {
931 if (log_flags.unparsed_xml) {
932 msg_printf(NULL, MSG_INFO,
933 "[unparsed_xml] ACTIVE_TASK_SET::parse(): unrecognized %s\n", xp.parsed_tag
934 );
935 }
936 }
937 }
938 return ERR_XML_PARSE;
939 }
940
941 #ifndef SIM
942
init(char * n)943 void MSG_QUEUE::init(char* n) {
944 safe_strcpy(name, n);
945 last_block = 0;
946 msgs.clear();
947 }
948
msg_queue_send(const char * msg,MSG_CHANNEL & channel)949 void MSG_QUEUE::msg_queue_send(const char* msg, MSG_CHANNEL& channel) {
950 if ((msgs.size()==0) && channel.send_msg(msg)) {
951 if (log_flags.app_msg_send) {
952 msg_printf(NULL, MSG_INFO,
953 "[app_msg_send] sent %s to %s", msg, name
954 );
955 }
956 last_block = 0;
957 return;
958 }
959 if (log_flags.app_msg_send) {
960 msg_printf(NULL, MSG_INFO,
961 "[app_msg_send] deferred %s to %s", msg, name
962 );
963 }
964 msgs.push_back(string(msg));
965 if (!last_block) last_block = gstate.now;
966 }
967
msg_queue_poll(MSG_CHANNEL & channel)968 void MSG_QUEUE::msg_queue_poll(MSG_CHANNEL& channel) {
969 if (msgs.empty()) return;
970 if (log_flags.app_msg_send) {
971 msg_printf(NULL, MSG_INFO,
972 "[app_msg_send] poll: %d msgs queued for %s:",
973 (int)msgs.size(), name
974 );
975 }
976 if (channel.send_msg(msgs[0].c_str())) {
977 if (log_flags.app_msg_send) {
978 msg_printf(NULL, MSG_INFO,
979 "[app_msg_send] poll: delayed sent %s", msgs[0].c_str()
980 );
981 }
982 msgs.erase(msgs.begin());
983 last_block = 0;
984 }
985 for (unsigned int i=0; i<msgs.size(); i++) {
986 if (log_flags.app_msg_send) {
987 msg_printf(NULL, MSG_INFO,
988 "[app_msg_send] poll: deferred: %s", msgs[i].c_str()
989 );
990 }
991 }
992 }
993
994 // if the last message in the buffer is "msg", remove it and return 1
995 //
msg_queue_purge(const char * msg)996 int MSG_QUEUE::msg_queue_purge(const char* msg) {
997 if (msgs.empty()) return 0;
998 string last_msg = msgs.back();
999 if (log_flags.app_msg_send) {
1000 msg_printf(NULL, MSG_INFO,
1001 "[app_msg_send] purge: wanted %s last msg is %s in %s",
1002 msg, last_msg.c_str(), name
1003 );
1004 }
1005 if (!strcmp(msg, last_msg.c_str())) {
1006 if (log_flags.app_msg_send) {
1007 msg_printf(NULL, MSG_INFO,
1008 "[app_msg_send] purged %s from %s", msg, name
1009 );
1010 }
1011 msgs.pop_back();
1012 return 1;
1013 }
1014 return 0;
1015 }
1016
timeout(double diff)1017 bool MSG_QUEUE::timeout(double diff) {
1018 if (!last_block) return false;
1019 if (gstate.now - last_block > diff) {
1020 return true;
1021 }
1022 return false;
1023 }
1024
1025 #endif
1026
report_overdue()1027 void ACTIVE_TASK_SET::report_overdue() {
1028 unsigned int i;
1029 ACTIVE_TASK* atp;
1030
1031 for (i=0; i<active_tasks.size(); i++) {
1032 atp = active_tasks[i];
1033 double diff = (gstate.now - atp->result->report_deadline)/86400;
1034 if (diff > 0) {
1035 msg_printf(atp->result->project, MSG_INFO,
1036 "Task %s is %.2f days overdue; you may not get credit for it. Consider aborting it.", atp->result->name, diff
1037 );
1038 }
1039 }
1040 }
1041
1042 // scan the slot directory, looking for files with names
1043 // of the form boinc_ufr_X.
1044 // Then mark file X as being present (and uploadable)
1045 //
handle_upload_files()1046 int ACTIVE_TASK::handle_upload_files() {
1047 std::string filename;
1048 char buf[MAXPATHLEN], path[MAXPATHLEN];
1049 int retval;
1050
1051 DirScanner dirscan(slot_dir);
1052 while (dirscan.scan(filename)) {
1053 safe_strcpy(buf, filename.c_str());
1054 if (strstr(buf, UPLOAD_FILE_REQ_PREFIX) == buf) {
1055 char* p = buf+strlen(UPLOAD_FILE_REQ_PREFIX);
1056 FILE_INFO* fip = result->lookup_file_logical(p);
1057 if (fip) {
1058 get_pathname(fip, path, sizeof(path));
1059 retval = md5_file(path, fip->md5_cksum, fip->nbytes);
1060 if (retval) {
1061 fip->status = retval;
1062 } else {
1063 fip->status = FILE_PRESENT;
1064 }
1065 } else {
1066 msg_printf(wup->project, MSG_INTERNAL_ERROR,
1067 "Can't find uploadable file %s", p
1068 );
1069 }
1070 snprintf(path, sizeof(path), "%s/%s", slot_dir, buf);
1071 delete_project_owned_file(path, true); // delete the link file
1072 }
1073 }
1074 return 0;
1075 }
1076
handle_upload_files()1077 void ACTIVE_TASK_SET::handle_upload_files() {
1078 for (unsigned int i=0; i<active_tasks.size(); i++) {
1079 ACTIVE_TASK* atp = active_tasks[i];
1080 atp->handle_upload_files();
1081 }
1082 }
1083
want_network()1084 bool ACTIVE_TASK_SET::want_network() {
1085 for (unsigned int i=0; i<active_tasks.size(); i++) {
1086 ACTIVE_TASK* atp = active_tasks[i];
1087 if (atp->want_network) return true;
1088 }
1089 return false;
1090 }
1091
network_available()1092 void ACTIVE_TASK_SET::network_available() {
1093 #ifndef SIM
1094 for (unsigned int i=0; i<active_tasks.size(); i++) {
1095 ACTIVE_TASK* atp = active_tasks[i];
1096 if (atp->want_network) {
1097 atp->send_network_available();
1098 }
1099 }
1100 #endif
1101 }
1102
upload_notify_app(const FILE_INFO * fip,const FILE_REF * frp)1103 void ACTIVE_TASK::upload_notify_app(const FILE_INFO* fip, const FILE_REF* frp) {
1104 char path[MAXPATHLEN];
1105 snprintf(path, sizeof(path),
1106 "%s/%s%s",
1107 slot_dir, UPLOAD_FILE_STATUS_PREFIX, frp->open_name
1108 );
1109 FILE* f = boinc_fopen(path, "w");
1110 if (!f) return;
1111 fprintf(f, "<status>%d</status>\n", fip->status);
1112 fclose(f);
1113 send_upload_file_status = true;
1114 }
1115
1116 // a file upload has finished.
1117 // If any running apps are waiting for it, notify them
1118 //
upload_notify_app(FILE_INFO * fip)1119 void ACTIVE_TASK_SET::upload_notify_app(FILE_INFO* fip) {
1120 for (unsigned int i=0; i<active_tasks.size(); i++) {
1121 ACTIVE_TASK* atp = active_tasks[i];
1122 RESULT* rp = atp->result;
1123 FILE_REF* frp = rp->lookup_file(fip);
1124 if (frp) {
1125 atp->upload_notify_app(fip, frp);
1126 }
1127 }
1128 }
1129
1130 #ifndef SIM
init()1131 void ACTIVE_TASK_SET::init() {
1132 for (unsigned int i=0; i<active_tasks.size(); i++) {
1133 ACTIVE_TASK* atp = active_tasks[i];
1134 atp->init(atp->result);
1135 atp->scheduler_state = CPU_SCHED_PREEMPTED;
1136 atp->read_task_state_file();
1137 atp->current_cpu_time = atp->checkpoint_cpu_time;
1138 atp->elapsed_time = atp->checkpoint_elapsed_time;
1139 }
1140 }
1141
1142 #endif
1143
set_task_state(int val,const char * where)1144 void ACTIVE_TASK::set_task_state(int val, const char* where) {
1145 _task_state = val;
1146 if (log_flags.task_debug) {
1147 msg_printf(result->project, MSG_INFO,
1148 "[task] task_state=%s for %s from %s",
1149 active_task_state_string(val), result->name, where
1150 );
1151 }
1152 }
1153
1154 #ifndef SIM
1155 #ifdef NEW_CPU_THROTTLE
1156 #ifdef _WIN32
throttler(LPVOID)1157 DWORD WINAPI throttler(LPVOID) {
1158 #else
1159 void* throttler(void*) {
1160 #endif
1161
1162 // Initialize diagnostics framework for this thread
1163 //
1164 diagnostics_thread_init();
1165
1166 while (1) {
1167 client_mutex.lock();
1168 if (gstate.tasks_suspended
1169 || gstate.global_prefs.cpu_usage_limit > 99
1170 || gstate.global_prefs.cpu_usage_limit < 0.005
1171 ) {
1172 client_mutex.unlock();
1173 // ::Sleep((int)(1000*10)); // for Win debugging
1174 boinc_sleep(10);
1175 continue;
1176 }
1177 double on, off, on_frac = gstate.global_prefs.cpu_usage_limit / 100;
1178 #if 0
1179 // sub-second CPU throttling
1180 // DOESN'T WORK BECAUSE OF 1-SEC API POLL
1181 #define THROTTLE_PERIOD 1.
1182 on = THROTTLE_PERIOD * on_frac;
1183 off = THROTTLE_PERIOD - on;
1184 #else
1185 // throttling w/ at least 1 sec between suspend/resume
1186 if (on_frac > .5) {
1187 off = 1;
1188 on = on_frac/(1.-on_frac);
1189 } else {
1190 on = 1;
1191 off = (1.-on_frac)/on_frac;
1192 }
1193 #endif
1194
1195 gstate.tasks_throttled = true;
1196 gstate.active_tasks.suspend_all(SUSPEND_REASON_CPU_THROTTLE);
1197 client_mutex.unlock();
1198 boinc_sleep(off);
1199 client_mutex.lock();
1200 if (!gstate.tasks_suspended) {
1201 gstate.active_tasks.unsuspend_all(SUSPEND_REASON_CPU_THROTTLE);
1202 }
1203 gstate.tasks_throttled = false;
1204 client_mutex.unlock();
1205 boinc_sleep(on);
1206 }
1207 return 0;
1208 }
1209 #endif
1210 #endif
1211