1 // This file is part of BOINC.
2 // http://boinc.berkeley.edu
3 // Copyright (C) 2008 University of California
4 //
5 // BOINC is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Lesser General Public License
7 // as published by the Free Software Foundation,
8 // either version 3 of the License, or (at your option) any later version.
9 //
10 // BOINC is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
13 // See the GNU Lesser General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with BOINC.  If not, see <http://www.gnu.org/licenses/>.
17 
18 // Abstraction of a set of executing applications,
19 // connected to I/O files in various ways.
20 // Shouldn't depend on CLIENT_STATE.
21 
22 #include "cpp.h"
23 
24 #ifdef _WIN32
25 #include "boinc_win.h"
26 #else
27 #include "config.h"
28 #endif
29 
30 #ifndef _WIN32
31 #include <unistd.h>
32 #if HAVE_SYS_WAIT_H
33 #include <sys/wait.h>
34 #endif
35 #if HAVE_SYS_TIME_H
36 #include <sys/time.h>
37 #endif
38 #if HAVE_SYS_RESOURCE_H
39 #include <sys/resource.h>
40 #endif
41 #if HAVE_SYS_TYPES_H
42 #include <sys/types.h>
43 #endif
44 #if HAVE_FCNTL_H
45 #include <fcntl.h>
46 #endif
47 
48 #include <cctype>
49 #include <ctime>
50 #include <cstdio>
51 #include <cmath>
52 #include <cstdlib>
53 #endif
54 
55 #ifdef _MSC_VER
56 #define snprintf _snprintf
57 #endif
58 
59 #include "error_numbers.h"
60 #include "filesys.h"
61 #include "file_names.h"
62 #include "parse.h"
63 #include "shmem.h"
64 #include "str_replace.h"
65 #include "str_util.h"
66 #include "util.h"
67 
68 #include "async_file.h"
69 #include "client_msgs.h"
70 #include "client_state.h"
71 #include "procinfo.h"
72 #include "result.h"
73 #include "sandbox.h"
74 #include "diagnostics.h"
75 
76 #include "app.h"
77 
78 using std::max;
79 using std::min;
80 
// Value of gstate.now at the most recent scan in which
// ACTIVE_TASK_SET::get_memory_usage() found a configured exclusive app
// running; stays 0 until one is seen.
double exclusive_app_running = 0.0;

// Same as above, for the configured exclusive GPU apps.
double exclusive_gpu_app_running = 0.0;

// Not referenced in this part of the file.
// NOTE(review): presumably the reason GPU computing is suspended - confirm.
int gpu_suspend_reason = 0;

// CPU usage of non-BOINC processes over the last sampling interval,
// as a fraction of all CPUs; set by ACTIVE_TASK_SET::get_memory_usage().
double non_boinc_cpu_usage = 0.0;
85 
~ACTIVE_TASK()86 ACTIVE_TASK::~ACTIVE_TASK() {
87 #ifndef SIM
88     if (async_copy) {
89         remove_async_copy(async_copy);
90     }
91 #endif
92 }
93 
ACTIVE_TASK()94 ACTIVE_TASK::ACTIVE_TASK() {
95 #ifdef _WIN32
96     safe_strcpy(shmem_seg_name, "");
97 #else
98     shmem_seg_name = 0;
99 #endif
100     result = NULL;
101     wup = NULL;
102     app_version = NULL;
103     pid = 0;
104 
105     _task_state = PROCESS_UNINITIALIZED;
106     slot = 0;
107     checkpoint_cpu_time = 0;
108     checkpoint_elapsed_time = 0;
109     checkpoint_fraction_done = 0;
110     checkpoint_fraction_done_elapsed_time = 0;
111     current_cpu_time = 0;
112     peak_working_set_size = 0;
113     peak_swap_size = 0;
114     peak_disk_usage = 0;
115     once_ran_edf = false;
116 
117     fraction_done = 0;
118     fraction_done_elapsed_time = 0;
119     first_fraction_done = 0;
120     first_fraction_done_elapsed_time = 0;
121     scheduler_state = CPU_SCHED_UNINITIALIZED;
122     next_scheduler_state = CPU_SCHED_UNINITIALIZED;
123     signal = 0;
124     run_interval_start_wall_time = gstate.now;
125     checkpoint_wall_time = 0;
126     elapsed_time = 0;
127     bytes_sent_episode = 0;
128     bytes_received_episode = 0;
129     bytes_sent = 0;
130     bytes_received = 0;
131     safe_strcpy(slot_dir, "");
132     safe_strcpy(slot_path, "");
133     max_elapsed_time = 0;
134     max_disk_usage = 0;
135     max_mem_usage = 0;
136     have_trickle_down = false;
137     send_upload_file_status = false;
138     too_large = false;
139     needs_shmem = false;
140     want_network = 0;
141     abort_time = 0;
142     premature_exit_count = 0;
143     quit_time = 0;
144     procinfo.clear();
145     procinfo.working_set_size_smoothed = 0;
146 #ifdef _WIN32
147     process_handle = NULL;
148     shm_handle = NULL;
149 #endif
150     premature_exit_count = 0;
151     overdue_checkpoint = false;
152     last_deadline_miss_time = 0;
153     safe_strcpy(web_graphics_url, "");
154     safe_strcpy(remote_desktop_addr, "");
155     async_copy = NULL;
156     finish_file_time = 0;
157 }
158 
process_exists()159 bool ACTIVE_TASK::process_exists() {
160     switch (task_state()) {
161     case PROCESS_EXECUTING:
162     case PROCESS_SUSPENDED:
163     case PROCESS_ABORT_PENDING:
164     case PROCESS_QUIT_PENDING:
165         return true;
166     }
167     return false;
168 }
169 
170 // preempt this task;
171 // called from the CLIENT_STATE::enforce_schedule()
172 // and ACTIVE_TASK_SET::suspend_all()
173 //
preempt(int preempt_type,int reason)174 int ACTIVE_TASK::preempt(int preempt_type, int reason) {
175     bool remove=false;
176 
177     switch (preempt_type) {
178     case REMOVE_NEVER:
179         remove = false;
180         break;
181     case REMOVE_MAYBE_USER:
182     case REMOVE_MAYBE_SCHED:
183         // GPU jobs: always remove from mem, since it's tying up GPU RAM
184         //
185         if (result->uses_gpu()) {
186             remove = true;
187             break;
188         }
189         // if it's never checkpointed, leave in mem
190         //
191         if (checkpoint_elapsed_time == 0) {
192             remove = false;
193             break;
194         }
195         // otherwise obey user prefs
196         //
197         remove = !gstate.global_prefs.leave_apps_in_memory;
198         break;
199     case REMOVE_ALWAYS:
200         remove = true;
201         break;
202     }
203 
204     bool show_msg = log_flags.cpu_sched && reason != SUSPEND_REASON_CPU_THROTTLE;
205     if (remove) {
206         if (show_msg) {
207             msg_printf(result->project, MSG_INFO,
208                 "[cpu_sched] Preempting %s (removed from memory)",
209                 result->name
210             );
211         }
212         return request_exit();
213     } else {
214         if (show_msg) {
215             msg_printf(result->project, MSG_INFO,
216                 "[cpu_sched] Preempting %s (left in memory)",
217                 result->name
218             );
219         }
220         if (task_state() != PROCESS_EXECUTING) return 0;
221         return suspend();
222     }
223     return 0;
224 }
225 
226 #ifndef SIM
227 
228 // called when the task's main process has exited.
229 // delete the shared memory used to communicate with it,
230 // and kill any remaining subsidiary processes.
231 //
cleanup_task()232 void ACTIVE_TASK::cleanup_task() {
233 #ifdef _WIN32
234     if (process_handle) {
235         CloseHandle(process_handle);
236         process_handle = NULL;
237     }
238     // detach from shared mem.
239     // This will destroy shmem seg since we're the last attachment
240     //
241     if (app_client_shm.shm) {
242         detach_shmem(shm_handle, app_client_shm.shm);
243         app_client_shm.shm = NULL;
244     }
245 #else
246     int retval;
247 
248     if (app_client_shm.shm) {
249 #ifndef __EMX__
250         if (app_version->api_version_at_least(6, 0)) {
251             retval = detach_shmem_mmap(app_client_shm.shm, sizeof(SHARED_MEM));
252         } else
253 #endif
254         {
255             retval = detach_shmem(app_client_shm.shm);
256             if (retval) {
257                 msg_printf(wup->project, MSG_INTERNAL_ERROR,
258                     "Couldn't detach shared memory: %s", boincerror(retval)
259                 );
260             }
261             retval = destroy_shmem(shmem_seg_name);
262             if (retval) {
263                 msg_printf(wup->project, MSG_INTERNAL_ERROR,
264                     "Couldn't destroy shared memory: %s", boincerror(retval)
265                 );
266             }
267         }
268         app_client_shm.shm = NULL;
269         gstate.retry_shmem_time = 0;
270     }
271 #endif
272 
273     kill_subsidiary_processes();
274 
275     if (cc_config.exit_after_finish) {
276         gstate.write_state_file();
277         exit(0);
278     }
279 }
280 
init(RESULT * rp)281 int ACTIVE_TASK::init(RESULT* rp) {
282     result = rp;
283     wup = rp->wup;
284     app_version = rp->avp;
285     max_elapsed_time = rp->wup->rsc_fpops_bound/rp->avp->flops;
286     max_disk_usage = rp->wup->rsc_disk_bound;
287     max_mem_usage = rp->wup->rsc_memory_bound;
288     get_slot_dir(slot, slot_dir, sizeof(slot_dir));
289     relative_to_absolute(slot_dir, slot_path);
290     return 0;
291 }
292 #endif
293 
294 // Deallocate memory to prevent unneeded reporting of memory leaks
295 //
free_mem()296 void ACTIVE_TASK_SET::free_mem() {
297     vector<ACTIVE_TASK*>::iterator at_iter;
298     ACTIVE_TASK *at;
299 
300     at_iter = active_tasks.begin();
301     while (at_iter != active_tasks.end()) {
302         at = active_tasks[0];
303         at_iter = active_tasks.erase(at_iter);
304         delete at;
305     }
306 }
307 
308 #ifndef SIM
309 
app_running(PROC_MAP & pm,const char * p)310 bool app_running(PROC_MAP& pm, const char* p) {
311     PROC_MAP::iterator i;
312     for (i=pm.begin(); i!=pm.end(); ++i) {
313         PROCINFO& pi = i->second;
314         //msg_printf(0, MSG_INFO, "running: [%s]", pi.command);
315         if (!strcasecmp(pi.command, p)) {
316             return true;
317         }
318     }
319     return false;
320 }
321 
322 #if 1  // debugging
procinfo_show(PROC_MAP & pm)323 void procinfo_show(PROC_MAP& pm) {
324     PROCINFO pi;
325     pi.clear();
326     PROC_MAP::iterator i;
327     for (i=pm.begin(); i!=pm.end(); ++i) {
328         PROCINFO& p = i->second;
329 
330         msg_printf(NULL, MSG_INFO, "%d %s: boinc? %d low_pri %d (u%f k%f)",
331             p.id, p.command, p.is_boinc_app, p.is_low_priority,
332             p.user_time, p.kernel_time
333         );
334 #ifdef _WIN32
335         if (p.id == 0) continue;
336 #endif
337         if (p.is_boinc_app) continue;
338         if (p.is_low_priority) continue;
339         pi.kernel_time += p.kernel_time;
340         pi.user_time += p.user_time;
341     }
342     msg_printf(NULL, MSG_INFO, "non-boinc: u%f k%f", pi.user_time, pi.kernel_time);
343 }
344 #endif
345 
346 // scan the set of all processes to
347 // 1) get the working-set size of active tasks
348 // 2) see if exclusive apps are running
349 // 3) get CPU time of non-BOINC processes
350 //
get_memory_usage()351 void ACTIVE_TASK_SET::get_memory_usage() {
352     static double last_mem_time=0;
353     unsigned int i;
354     int retval;
355     static bool first = true;
356     static double last_cpu_time;
357     double diff=0;
358 
359     if (!first) {
360         diff = gstate.now - last_mem_time;
361         if (diff < 0 || diff > MEMORY_USAGE_PERIOD + 10) {
362             // user has changed system clock,
363             // or there has been a long system sleep
364             //
365             last_mem_time = gstate.now;
366             return;
367         }
368         if (diff < MEMORY_USAGE_PERIOD) return;
369     }
370 
371     last_mem_time = gstate.now;
372     PROC_MAP pm;
373     retval = procinfo_setup(pm);
374     if (retval) {
375         if (log_flags.mem_usage_debug) {
376             msg_printf(NULL, MSG_INTERNAL_ERROR,
377                 "[mem_usage] procinfo_setup() returned %d", retval
378             );
379         }
380         return;
381     }
382     PROCINFO boinc_total;
383     if (log_flags.mem_usage_debug) {
384         boinc_total.clear();
385         boinc_total.working_set_size_smoothed = 0;
386     }
387     for (i=0; i<active_tasks.size(); i++) {
388         ACTIVE_TASK* atp = active_tasks[i];
389         if (atp->task_state() == PROCESS_UNINITIALIZED) continue;
390         if (atp->pid ==0) continue;
391 
392         // scan all active tasks with a process, even if not scheduled, because
393         // 1) we might have recently suspended a tasks,
394         //    and we still need to count its time
395         // 2) preempted tasks might not actually suspend themselves
396         //    (and we'd count that as non-BOINC CPU usage
397         //    and suspend everything).
398 
399         PROCINFO& pi = atp->procinfo;
400         unsigned long last_page_fault_count = pi.page_fault_count;
401         pi.clear();
402         pi.id = atp->pid;
403         vector<int>* v = NULL;
404         if (atp->other_pids.size()>0) {
405             v = &(atp->other_pids);
406         }
407         procinfo_app(pi, v, pm, atp->app_version->graphics_exec_file);
408         if (atp->app_version->is_vm_app) {
409             // the memory of virtual machine apps is not reported correctly,
410             // at least on Windows.  Use the VM size instead.
411             //
412             pi.working_set_size_smoothed = atp->wup->rsc_memory_bound;
413         } else {
414             pi.working_set_size_smoothed = .5*(pi.working_set_size_smoothed + pi.working_set_size);
415         }
416 
417         if (pi.working_set_size > atp->peak_working_set_size) {
418             atp->peak_working_set_size = pi.working_set_size;
419         }
420         if (pi.swap_size > atp->peak_swap_size) {
421             atp->peak_swap_size = pi.swap_size;
422         }
423 
424         if (!first) {
425             int pf = pi.page_fault_count - last_page_fault_count;
426             pi.page_fault_rate = pf/diff;
427             if (log_flags.mem_usage_debug) {
428                 msg_printf(atp->result->project, MSG_INFO,
429                     "[mem_usage] %s%s: WS %.2fMB, smoothed %.2fMB, swap %.2fMB, %.2f page faults/sec, user CPU %.3f, kernel CPU %.3f",
430                     atp->scheduler_state==CPU_SCHED_SCHEDULED?"":" (not running)",
431                     atp->result->name,
432                     pi.working_set_size/MEGA,
433                     pi.working_set_size_smoothed/MEGA,
434                     pi.swap_size/MEGA,
435                     pi.page_fault_rate,
436                     pi.user_time,
437                     pi.kernel_time
438                 );
439                 boinc_total.working_set_size += pi.working_set_size;
440                 boinc_total.working_set_size_smoothed += pi.working_set_size_smoothed;
441                 boinc_total.swap_size += pi.swap_size;
442                 boinc_total.page_fault_rate += pi.page_fault_rate;
443             }
444         }
445     }
446 
447     if (!first) {
448         if (log_flags.mem_usage_debug) {
449             msg_printf(0, MSG_INFO,
450                 "[mem_usage] BOINC totals: WS %.2fMB, smoothed %.2fMB, swap %.2fMB, %.2f page faults/sec",
451                 boinc_total.working_set_size/MEGA,
452                 boinc_total.working_set_size_smoothed/MEGA,
453                 boinc_total.swap_size/MEGA,
454                 boinc_total.page_fault_rate
455             );
456         }
457     }
458 
459     for (i=0; i<cc_config.exclusive_apps.size(); i++) {
460         if (app_running(pm, cc_config.exclusive_apps[i].c_str())) {
461             if (log_flags.mem_usage_debug) {
462                 msg_printf(NULL, MSG_INFO,
463                     "[mem_usage] exclusive app %s is running", cc_config.exclusive_apps[i].c_str()
464                 );
465             }
466             exclusive_app_running = gstate.now;
467             break;
468         }
469     }
470     for (i=0; i<cc_config.exclusive_gpu_apps.size(); i++) {
471         if (app_running(pm, cc_config.exclusive_gpu_apps[i].c_str())) {
472             if (log_flags.mem_usage_debug) {
473                 msg_printf(NULL, MSG_INFO,
474                     "[mem_usage] exclusive GPU app %s is running", cc_config.exclusive_gpu_apps[i].c_str()
475                 );
476             }
477             exclusive_gpu_app_running = gstate.now;
478             break;
479         }
480     }
481 
482     // get info on non-BOINC processes.
483     // mem usage info is not useful because most OSs don't
484     // move idle processes out of RAM, so physical memory is always full.
485     // Also (at least on Win) page faults are used for various things,
486     // not all of them generate disk I/O,
487     // so they're not useful for detecting paging/thrashing.
488     //
489     PROCINFO pi;
490     procinfo_non_boinc(pi, pm);
491     if (log_flags.mem_usage_debug) {
492         //procinfo_show(pm);
493         msg_printf(NULL, MSG_INFO,
494             "[mem_usage] All others: WS %.2fMB, swap %.2fMB, user %.3fs, kernel %.3fs",
495             pi.working_set_size/MEGA, pi.swap_size/MEGA,
496             pi.user_time, pi.kernel_time
497         );
498     }
499     double new_cpu_time = pi.user_time + pi.kernel_time;
500     if (!first) {
501         non_boinc_cpu_usage = (new_cpu_time - last_cpu_time)/(diff*gstate.host_info.p_ncpus);
502         // processes might have exited in the last 10 sec,
503         // causing this to be negative.
504         if (non_boinc_cpu_usage < 0) non_boinc_cpu_usage = 0;
505         if (log_flags.mem_usage_debug) {
506             msg_printf(NULL, MSG_INFO,
507                 "[mem_usage] non-BOINC CPU usage: %.2f%%", non_boinc_cpu_usage*100
508             );
509         }
510     }
511     last_cpu_time = new_cpu_time;
512     first = false;
513 }
514 
515 #endif
516 
517 // There's a new trickle file.
518 // Move it from slot dir to project dir
519 //
move_trickle_file()520 int ACTIVE_TASK::move_trickle_file() {
521     char new_path[MAXPATHLEN], old_path[MAXPATHLEN];
522     int retval;
523 
524     snprintf(old_path, sizeof(old_path),
525         "%s/trickle_up.xml",
526         slot_dir
527     );
528     snprintf(new_path, sizeof(new_path),
529         "%s/trickle_up_%s_%d.xml",
530         result->project->project_dir(), result->name, (int)time(0)
531     );
532     retval = boinc_rename(old_path, new_path);
533 
534     // if can't move it, remove
535     //
536     if (retval) {
537         delete_project_owned_file(old_path, true);
538         return ERR_RENAME;
539     }
540     return 0;
541 }
542 
543 // size of output files and files in slot dir
544 //
current_disk_usage(double & size)545 int ACTIVE_TASK::current_disk_usage(double& size) {
546     double x;
547     unsigned int i;
548     int retval;
549     FILE_INFO* fip;
550     char path[MAXPATHLEN];
551 
552     retval = dir_size(slot_dir, size);
553     if (retval) return retval;
554     for (i=0; i<result->output_files.size(); i++) {
555         fip = result->output_files[i].file_info;
556         get_pathname(fip, path, sizeof(path));
557         retval = file_size(path, x);
558         if (!retval) size += x;
559     }
560     if (size > peak_disk_usage) {
561         peak_disk_usage = size;
562     }
563     return 0;
564 }
565 
is_slot_in_use(int slot)566 bool ACTIVE_TASK_SET::is_slot_in_use(int slot) {
567     unsigned int i;
568     for (i=0; i<active_tasks.size(); i++) {
569         if (active_tasks[i]->slot == slot) {
570             return true;
571         }
572     }
573     return false;
574 }
575 
is_slot_dir_in_use(char * dir)576 bool ACTIVE_TASK_SET::is_slot_dir_in_use(char* dir) {
577     char path[MAXPATHLEN];
578     unsigned int i;
579     for (i=0; i<active_tasks.size(); i++) {
580         get_slot_dir(active_tasks[i]->slot, path, sizeof(path));
581         if (!strcmp(path, dir)) return true;
582     }
583     return false;
584 }
585 
586 // Get a free slot:
587 // either find an unused an empty slot dir,
588 // or create a new slot dir if needed
589 //
get_free_slot(RESULT * rp)590 int ACTIVE_TASK::get_free_slot(RESULT* rp) {
591 #ifndef SIM
592     int j, retval;
593     char path[MAXPATHLEN];
594 
595     // scan slot numbers: slots/0, slots/1, etc.
596     //
597     for (j=0; ; j++) {
598         // skip slots that are in use by existing jobs
599         //
600         if (gstate.active_tasks.is_slot_in_use(j)) continue;
601 
602         get_slot_dir(j, path, sizeof(path));
603         if (boinc_file_exists(path)) {
604             if (is_dir(path)) {
605                 // If the directory exists, try to clean it out.
606                 // If this succeeds, use it.
607                 //
608                 retval = client_clean_out_dir(path, "get_free_slot()");
609                 if (!retval) break;
610                 if (log_flags.slot_debug) {
611                     msg_printf(rp->project, MSG_INFO,
612                         "[slot] failed to clean out dir: %s",
613                         boincerror(retval)
614                     );
615                 }
616             }
617         } else {
618             // directory doesn't exist - create one
619             //
620             retval = make_slot_dir(j);
621             if (!retval) break;
622         }
623 
624         // paranoia - don't allow unbounded slots
625         //
626         if (j > gstate.ncpus*100) {
627             msg_printf(rp->project, MSG_INTERNAL_ERROR,
628                 "exceeded limit of %d slot directories", gstate.ncpus*100
629             );
630             return ERR_NULL;
631         }
632     }
633     slot = j;
634     if (log_flags.slot_debug) {
635         msg_printf(rp->project, MSG_INFO,
636             "[slot] assigning slot %d to %s", j, rp->name
637         );
638     }
639 #endif
640     return 0;
641 }
642 
slot_taken(int slot)643 bool ACTIVE_TASK_SET::slot_taken(int slot) {
644     unsigned int i;
645     for (i=0; i<active_tasks.size(); i++) {
646         if (active_tasks[i]->slot == slot) return true;
647     }
648     return false;
649 }
650 
651 // <active_task_state> is here for the benefit of 3rd-party software
652 // that reads the client state file
653 //
write(MIOFILE & fout)654 int ACTIVE_TASK::write(MIOFILE& fout) {
655     fout.printf(
656         "<active_task>\n"
657         "    <project_master_url>%s</project_master_url>\n"
658         "    <result_name>%s</result_name>\n"
659         "    <active_task_state>%d</active_task_state>\n"
660         "    <app_version_num>%d</app_version_num>\n"
661         "    <slot>%d</slot>\n"
662         "    <checkpoint_cpu_time>%f</checkpoint_cpu_time>\n"
663         "    <checkpoint_elapsed_time>%f</checkpoint_elapsed_time>\n"
664         "    <checkpoint_fraction_done>%f</checkpoint_fraction_done>\n"
665         "    <checkpoint_fraction_done_elapsed_time>%f</checkpoint_fraction_done_elapsed_time>\n"
666         "    <current_cpu_time>%f</current_cpu_time>\n"
667         "    <once_ran_edf>%d</once_ran_edf>\n"
668         "    <swap_size>%f</swap_size>\n"
669         "    <working_set_size>%f</working_set_size>\n"
670         "    <working_set_size_smoothed>%f</working_set_size_smoothed>\n"
671         "    <page_fault_rate>%f</page_fault_rate>\n"
672         "    <bytes_sent>%f</bytes_sent>\n"
673         "    <bytes_received>%f</bytes_received>\n",
674         result->project->master_url,
675         result->name,
676         task_state(),
677         app_version->version_num,
678         slot,
679         checkpoint_cpu_time,
680         checkpoint_elapsed_time,
681         checkpoint_fraction_done,
682         checkpoint_fraction_done_elapsed_time,
683         current_cpu_time,
684         once_ran_edf?1:0,
685         procinfo.swap_size,
686         procinfo.working_set_size,
687         procinfo.working_set_size_smoothed,
688         procinfo.page_fault_rate,
689         bytes_sent,
690         bytes_received
691     );
692     fout.printf("</active_task>\n");
693     return 0;
694 }
695 
696 #ifndef SIM
697 
write_gui(MIOFILE & fout)698 int ACTIVE_TASK::write_gui(MIOFILE& fout) {
699     // if the app hasn't reported fraction done or reported > 1,
700     // and a minute has elapsed, estimate fraction done in a
701     // way that constantly increases and approaches 1.
702     //
703     double fd = fraction_done;
704     if (((fd<=0)||(fd>1)) && elapsed_time > 60) {
705         double est_time = wup->rsc_fpops_est/app_version->flops;
706         double x = elapsed_time/est_time;
707         fd = 1 - exp(-x);
708     }
709     fout.printf(
710         "<active_task>\n"
711         "    <active_task_state>%d</active_task_state>\n"
712         "    <app_version_num>%d</app_version_num>\n"
713         "    <slot>%d</slot>\n"
714         "    <pid>%d</pid>\n"
715         "    <scheduler_state>%d</scheduler_state>\n"
716         "    <checkpoint_cpu_time>%f</checkpoint_cpu_time>\n"
717         "    <fraction_done>%f</fraction_done>\n"
718         "    <current_cpu_time>%f</current_cpu_time>\n"
719         "    <elapsed_time>%f</elapsed_time>\n"
720         "    <swap_size>%f</swap_size>\n"
721         "    <working_set_size>%f</working_set_size>\n"
722         "    <working_set_size_smoothed>%f</working_set_size_smoothed>\n"
723         "    <page_fault_rate>%f</page_fault_rate>\n"
724         "    <bytes_sent>%f</bytes_sent>\n"
725         "    <bytes_received>%f</bytes_received>\n"
726         "%s"
727         "%s",
728         task_state(),
729         app_version->version_num,
730         slot,
731         pid,
732         scheduler_state,
733         checkpoint_cpu_time,
734         fd,
735         current_cpu_time,
736         elapsed_time,
737         procinfo.swap_size,
738         procinfo.working_set_size,
739         procinfo.working_set_size_smoothed,
740         procinfo.page_fault_rate,
741         bytes_sent,
742         bytes_received,
743         too_large?"   <too_large/>\n":"",
744         needs_shmem?"   <needs_shmem/>\n":""
745     );
746     if (elapsed_time > first_fraction_done_elapsed_time) {
747         fout.printf(
748             "   <progress_rate>%f</progress_rate>\n",
749             (fd - first_fraction_done)/(elapsed_time - first_fraction_done_elapsed_time)
750         );
751     }
752     if (strlen(app_version->graphics_exec_path)) {
753         fout.printf(
754             "   <graphics_exec_path>%s</graphics_exec_path>\n"
755             "   <slot_path>%s</slot_path>\n",
756             app_version->graphics_exec_path,
757             slot_path
758         );
759     }
760     if (strlen(web_graphics_url)) {
761         fout.printf(
762             "   <web_graphics_url>%s</web_graphics_url>\n",
763             web_graphics_url
764         );
765     }
766     if (strlen(remote_desktop_addr)) {
767         fout.printf(
768             "   <remote_desktop_addr>%s</remote_desktop_addr>\n",
769             remote_desktop_addr
770         );
771     }
772     fout.printf("</active_task>\n");
773     return 0;
774 }
775 
776 #endif
777 
parse(XML_PARSER & xp)778 int ACTIVE_TASK::parse(XML_PARSER& xp) {
779     char result_name[256], project_master_url[256];
780     int n, dummy;
781     unsigned int i;
782     PROJECT* project=0;
783     double x;
784 
785     safe_strcpy(result_name, "");
786     safe_strcpy(project_master_url, "");
787 
788     while (!xp.get_tag()) {
789         if (xp.match_tag("/active_task")) {
790             project = gstate.lookup_project(project_master_url);
791             if (!project) {
792                 msg_printf(
793                     NULL, MSG_INTERNAL_ERROR,
794                     "State file error: project %s not found for task\n",
795                     project_master_url
796                 );
797                 return ERR_NULL;
798             }
799             result = gstate.lookup_result(project, result_name);
800             if (!result) {
801                 msg_printf(
802                     project, MSG_INTERNAL_ERROR,
803                     "State file error: result %s not found for task\n",
804                     result_name
805                 );
806                 return ERR_NULL;
807             }
808 
809             // various sanity checks
810             //
811             if (result->got_server_ack
812                 || result->ready_to_report
813                 || result->state() != RESULT_FILES_DOWNLOADED
814             ) {
815                 return ERR_BAD_RESULT_STATE;
816             }
817 
818             wup = result->wup;
819             app_version = gstate.lookup_app_version(
820                 result->app, result->platform, result->version_num,
821                 result->plan_class
822             );
823             if (!app_version) {
824                 msg_printf(
825                     project, MSG_INTERNAL_ERROR,
826                     "State file error: app %s platform %s version %d not found\n",
827                     result->app->name, result->platform, result->version_num
828                 );
829                 return ERR_NULL;
830             }
831 
832             // make sure no two active tasks are in same slot
833             //
834             for (i=0; i<gstate.active_tasks.active_tasks.size(); i++) {
835                 ACTIVE_TASK* atp = gstate.active_tasks.active_tasks[i];
836                 if (atp->slot == slot) {
837                     msg_printf(project, MSG_INTERNAL_ERROR,
838                         "State file error: two tasks in slot %d\n", slot
839                     );
840                     return ERR_BAD_RESULT_STATE;
841                 }
842             }
843 
844             // for 6.2/6.4 transition
845             //
846             if (checkpoint_elapsed_time == 0) {
847                 elapsed_time = checkpoint_cpu_time;
848                 checkpoint_elapsed_time = elapsed_time;
849             }
850 
851             // for 6.12.25-26 transition;
852             // old clients write fraction_done to state file;
853             // new clients don't
854             if (fraction_done && checkpoint_elapsed_time) {
855                 checkpoint_fraction_done = fraction_done;
856                 checkpoint_fraction_done_elapsed_time = checkpoint_elapsed_time;
857                 fraction_done_elapsed_time = checkpoint_elapsed_time;
858             } else {
859                 fraction_done = checkpoint_fraction_done;
860                 fraction_done_elapsed_time = checkpoint_fraction_done_elapsed_time;
861             }
862             return 0;
863         }
864         else if (xp.parse_str("result_name", result_name, sizeof(result_name))) continue;
865         else if (xp.parse_str("project_master_url", project_master_url, sizeof(project_master_url))) continue;
866         else if (xp.parse_int("slot", slot)) continue;
867         else if (xp.parse_int("active_task_state", dummy)) continue;
868         else if (xp.parse_double("checkpoint_cpu_time", checkpoint_cpu_time)) continue;
869         else if (xp.parse_double("checkpoint_elapsed_time", checkpoint_elapsed_time)) continue;
870         else if (xp.parse_double("checkpoint_fraction_done", checkpoint_fraction_done)) continue;
871         else if (xp.parse_double("checkpoint_fraction_done_elapsed_time", checkpoint_fraction_done_elapsed_time)) continue;
872         else if (xp.parse_bool("once_ran_edf", once_ran_edf)) continue;
873         else if (xp.parse_double("fraction_done", fraction_done)) continue;
874             // deprecated - for backwards compat
875         else if (xp.parse_int("app_version_num", n)) continue;
876         else if (xp.parse_double("swap_size",  procinfo.swap_size)) continue;
877         else if (xp.parse_double("working_set_size", procinfo.working_set_size)) continue;
878         else if (xp.parse_double("working_set_size_smoothed", procinfo.working_set_size_smoothed)) continue;
879         else if (xp.parse_double("page_fault_rate", procinfo.page_fault_rate)) continue;
880         else if (xp.parse_double("current_cpu_time", x)) continue;
881         else if (xp.parse_double("bytes_sent", bytes_sent)) continue;
882         else if (xp.parse_double("bytes_received", bytes_received)) continue;
883         else {
884             if (log_flags.unparsed_xml) {
885                 msg_printf(project, MSG_INFO,
886                     "[unparsed_xml] ACTIVE_TASK::parse(): unrecognized %s\n",
887                     xp.parsed_tag
888                 );
889             }
890         }
891     }
892     return ERR_XML_PARSE;
893 }
894 
write(MIOFILE & fout)895 int ACTIVE_TASK_SET::write(MIOFILE& fout) {
896     unsigned int i;
897     int retval;
898 
899     fout.printf("<active_task_set>\n");
900     for (i=0; i<active_tasks.size(); i++) {
901         retval = active_tasks[i]->write(fout);
902         if (retval) return retval;
903     }
904     fout.printf("</active_task_set>\n");
905     return 0;
906 }
907 
parse(XML_PARSER & xp)908 int ACTIVE_TASK_SET::parse(XML_PARSER& xp) {
909     while (!xp.get_tag()) {
910         if (xp.match_tag("/active_task_set")) return 0;
911         else if (xp.match_tag("active_task")) {
912 #ifdef SIM
913             ACTIVE_TASK at;
914             at.parse(xp);
915 #else
916             ACTIVE_TASK* atp = new ACTIVE_TASK;
917             int retval = atp->parse(xp);
918             if (!retval) {
919                 if (slot_taken(atp->slot)) {
920                     msg_printf(atp->result->project, MSG_INTERNAL_ERROR,
921                         "slot %d in use; discarding result %s",
922                         atp->slot, atp->result->name
923                     );
924                     retval = ERR_XML_PARSE;
925                 }
926             }
927             if (!retval) active_tasks.push_back(atp);
928             else delete atp;
929 #endif
930         } else {
931             if (log_flags.unparsed_xml) {
932                 msg_printf(NULL, MSG_INFO,
933                     "[unparsed_xml] ACTIVE_TASK_SET::parse(): unrecognized %s\n", xp.parsed_tag
934                 );
935             }
936         }
937     }
938     return ERR_XML_PARSE;
939 }
940 
941 #ifndef SIM
942 
init(char * n)943 void MSG_QUEUE::init(char* n) {
944     safe_strcpy(name, n);
945     last_block = 0;
946     msgs.clear();
947 }
948 
msg_queue_send(const char * msg,MSG_CHANNEL & channel)949 void MSG_QUEUE::msg_queue_send(const char* msg, MSG_CHANNEL& channel) {
950     if ((msgs.size()==0) && channel.send_msg(msg)) {
951         if (log_flags.app_msg_send) {
952             msg_printf(NULL, MSG_INFO,
953                 "[app_msg_send] sent %s to %s", msg, name
954             );
955         }
956         last_block = 0;
957         return;
958     }
959     if (log_flags.app_msg_send) {
960         msg_printf(NULL, MSG_INFO,
961             "[app_msg_send] deferred %s to %s", msg, name
962         );
963     }
964     msgs.push_back(string(msg));
965     if (!last_block) last_block = gstate.now;
966 }
967 
msg_queue_poll(MSG_CHANNEL & channel)968 void MSG_QUEUE::msg_queue_poll(MSG_CHANNEL& channel) {
969     if (msgs.empty()) return;
970     if (log_flags.app_msg_send) {
971         msg_printf(NULL, MSG_INFO,
972             "[app_msg_send] poll: %d msgs queued for %s:",
973             (int)msgs.size(), name
974         );
975     }
976     if (channel.send_msg(msgs[0].c_str())) {
977         if (log_flags.app_msg_send) {
978             msg_printf(NULL, MSG_INFO,
979                 "[app_msg_send] poll: delayed sent %s", msgs[0].c_str()
980             );
981         }
982         msgs.erase(msgs.begin());
983         last_block = 0;
984     }
985     for (unsigned int i=0; i<msgs.size(); i++) {
986         if (log_flags.app_msg_send) {
987             msg_printf(NULL, MSG_INFO,
988                 "[app_msg_send] poll: deferred: %s", msgs[i].c_str()
989             );
990         }
991     }
992 }
993 
994 // if the last message in the buffer is "msg", remove it and return 1
995 //
msg_queue_purge(const char * msg)996 int MSG_QUEUE::msg_queue_purge(const char* msg) {
997     if (msgs.empty()) return 0;
998     string last_msg = msgs.back();
999     if (log_flags.app_msg_send) {
1000         msg_printf(NULL, MSG_INFO,
1001             "[app_msg_send] purge: wanted %s last msg is %s in %s",
1002             msg, last_msg.c_str(), name
1003         );
1004     }
1005     if (!strcmp(msg, last_msg.c_str())) {
1006         if (log_flags.app_msg_send) {
1007             msg_printf(NULL, MSG_INFO,
1008                 "[app_msg_send] purged %s from %s", msg, name
1009             );
1010         }
1011         msgs.pop_back();
1012         return 1;
1013     }
1014     return 0;
1015 }
1016 
timeout(double diff)1017 bool MSG_QUEUE::timeout(double diff) {
1018     if (!last_block) return false;
1019     if (gstate.now - last_block > diff) {
1020         return true;
1021     }
1022     return false;
1023 }
1024 
1025 #endif
1026 
report_overdue()1027 void ACTIVE_TASK_SET::report_overdue() {
1028     unsigned int i;
1029     ACTIVE_TASK* atp;
1030 
1031     for (i=0; i<active_tasks.size(); i++) {
1032         atp = active_tasks[i];
1033         double diff = (gstate.now - atp->result->report_deadline)/86400;
1034         if (diff > 0) {
1035             msg_printf(atp->result->project, MSG_INFO,
1036                 "Task %s is %.2f days overdue; you may not get credit for it.  Consider aborting it.", atp->result->name, diff
1037             );
1038         }
1039     }
1040 }
1041 
1042 // scan the slot directory, looking for files with names
1043 // of the form boinc_ufr_X.
1044 // Then mark file X as being present (and uploadable)
1045 //
handle_upload_files()1046 int ACTIVE_TASK::handle_upload_files() {
1047     std::string filename;
1048     char buf[MAXPATHLEN], path[MAXPATHLEN];
1049     int retval;
1050 
1051     DirScanner dirscan(slot_dir);
1052     while (dirscan.scan(filename)) {
1053         safe_strcpy(buf, filename.c_str());
1054         if (strstr(buf, UPLOAD_FILE_REQ_PREFIX) == buf) {
1055             char* p = buf+strlen(UPLOAD_FILE_REQ_PREFIX);
1056             FILE_INFO* fip = result->lookup_file_logical(p);
1057             if (fip) {
1058                 get_pathname(fip, path, sizeof(path));
1059                 retval = md5_file(path, fip->md5_cksum, fip->nbytes);
1060                 if (retval) {
1061                     fip->status = retval;
1062                 } else {
1063                     fip->status = FILE_PRESENT;
1064                 }
1065             } else {
1066                 msg_printf(wup->project, MSG_INTERNAL_ERROR,
1067                     "Can't find uploadable file %s", p
1068                 );
1069             }
1070             snprintf(path, sizeof(path), "%s/%s", slot_dir, buf);
1071             delete_project_owned_file(path, true);  // delete the link file
1072         }
1073     }
1074     return 0;
1075 }
1076 
handle_upload_files()1077 void ACTIVE_TASK_SET::handle_upload_files() {
1078     for (unsigned int i=0; i<active_tasks.size(); i++) {
1079         ACTIVE_TASK* atp = active_tasks[i];
1080         atp->handle_upload_files();
1081     }
1082 }
1083 
want_network()1084 bool ACTIVE_TASK_SET::want_network() {
1085     for (unsigned int i=0; i<active_tasks.size(); i++) {
1086         ACTIVE_TASK* atp = active_tasks[i];
1087         if (atp->want_network) return true;
1088     }
1089     return false;
1090 }
1091 
network_available()1092 void ACTIVE_TASK_SET::network_available() {
1093 #ifndef SIM
1094     for (unsigned int i=0; i<active_tasks.size(); i++) {
1095         ACTIVE_TASK* atp = active_tasks[i];
1096         if (atp->want_network) {
1097             atp->send_network_available();
1098         }
1099     }
1100 #endif
1101 }
1102 
upload_notify_app(const FILE_INFO * fip,const FILE_REF * frp)1103 void ACTIVE_TASK::upload_notify_app(const FILE_INFO* fip, const FILE_REF* frp) {
1104     char path[MAXPATHLEN];
1105     snprintf(path, sizeof(path),
1106         "%s/%s%s",
1107         slot_dir, UPLOAD_FILE_STATUS_PREFIX, frp->open_name
1108     );
1109     FILE* f = boinc_fopen(path, "w");
1110     if (!f) return;
1111     fprintf(f, "<status>%d</status>\n", fip->status);
1112     fclose(f);
1113     send_upload_file_status = true;
1114 }
1115 
1116 // a file upload has finished.
1117 // If any running apps are waiting for it, notify them
1118 //
upload_notify_app(FILE_INFO * fip)1119 void ACTIVE_TASK_SET::upload_notify_app(FILE_INFO* fip) {
1120     for (unsigned int i=0; i<active_tasks.size(); i++) {
1121         ACTIVE_TASK* atp = active_tasks[i];
1122         RESULT* rp = atp->result;
1123         FILE_REF* frp = rp->lookup_file(fip);
1124         if (frp) {
1125             atp->upload_notify_app(fip, frp);
1126         }
1127     }
1128 }
1129 
1130 #ifndef SIM
init()1131 void ACTIVE_TASK_SET::init() {
1132     for (unsigned int i=0; i<active_tasks.size(); i++) {
1133         ACTIVE_TASK* atp = active_tasks[i];
1134         atp->init(atp->result);
1135         atp->scheduler_state = CPU_SCHED_PREEMPTED;
1136         atp->read_task_state_file();
1137         atp->current_cpu_time = atp->checkpoint_cpu_time;
1138         atp->elapsed_time = atp->checkpoint_elapsed_time;
1139     }
1140 }
1141 
1142 #endif
1143 
set_task_state(int val,const char * where)1144 void ACTIVE_TASK::set_task_state(int val, const char* where) {
1145     _task_state = val;
1146     if (log_flags.task_debug) {
1147         msg_printf(result->project, MSG_INFO,
1148             "[task] task_state=%s for %s from %s",
1149             active_task_state_string(val), result->name, where
1150         );
1151     }
1152 }
1153 
1154 #ifndef SIM
1155 #ifdef NEW_CPU_THROTTLE
// CPU throttling loop (only built with NEW_CPU_THROTTLE, non-SIM).
// The signature differs per platform; presumably this is the entry
// point of a dedicated thread - confirm against where it is started.
//
// Loops forever.  Each cycle suspends all active tasks with
// SUSPEND_REASON_CPU_THROTTLE, sleeps "off" seconds, unsuspends them
// (unless tasks are suspended for some other reason), then sleeps "on"
// seconds.  "on" and "off" are chosen so that on/(on+off) equals
// cpu_usage_limit/100 and the shorter of the two is 1 second.
//
// client_mutex is held whenever gstate is read or modified,
// and released around every sleep.
// The argument is unused.
//
#ifdef _WIN32
DWORD WINAPI throttler(LPVOID) {
#else
void* throttler(void*) {
#endif

    // Initialize diagnostics framework for this thread
    //
    diagnostics_thread_init();

    while (1) {
        client_mutex.lock();
        // Nothing to throttle if tasks are already suspended,
        // or the limit is above 99%.
        // A limit below 0.005% is also skipped; this keeps on_frac
        // well away from zero in the division below.
        // In these cases, re-check every 10 seconds.
        if (gstate.tasks_suspended
            || gstate.global_prefs.cpu_usage_limit > 99
            || gstate.global_prefs.cpu_usage_limit < 0.005
            ) {
            client_mutex.unlock();
//            ::Sleep((int)(1000*10));  // for Win debugging
            boinc_sleep(10);
            continue;
        }
        // on_frac: fraction of time tasks should run (limit is a percentage)
        double on, off, on_frac = gstate.global_prefs.cpu_usage_limit / 100;
#if 0
// sub-second CPU throttling
// DOESN'T WORK BECAUSE OF 1-SEC API POLL
#define THROTTLE_PERIOD 1.
        on = THROTTLE_PERIOD * on_frac;
        off = THROTTLE_PERIOD - on;
#else
// throttling w/ at least 1 sec between suspend/resume
        // Fix the shorter interval at 1 second and scale the other one
        // so that on/(on+off) == on_frac.
        if (on_frac > .5) {
            off = 1;
            on = on_frac/(1.-on_frac);
        } else {
            on = 1;
            off = (1.-on_frac)/on_frac;
        }
#endif

        // "off" phase: suspend everything, then sleep without the mutex
        gstate.tasks_throttled = true;
        gstate.active_tasks.suspend_all(SUSPEND_REASON_CPU_THROTTLE);
        client_mutex.unlock();
        boinc_sleep(off);
        client_mutex.lock();
        // If tasks became suspended for another reason during the sleep,
        // leave them suspended.
        if (!gstate.tasks_suspended) {
            gstate.active_tasks.unsuspend_all(SUSPEND_REASON_CPU_THROTTLE);
        }
        gstate.tasks_throttled = false;
        client_mutex.unlock();
        // "on" phase: let tasks run
        boinc_sleep(on);
    }
    // not reached: the loop above has no exit
    return 0;
}
1209 #endif
1210 #endif
1211