1 // This file is part of BOINC.
2 // http://boinc.berkeley.edu
3 // Copyright (C) 2008 University of California
4 //
5 // BOINC is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Lesser General Public License
7 // as published by the Free Software Foundation,
8 // either version 3 of the License, or (at your option) any later version.
9 //
10 // BOINC is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
13 // See the GNU Lesser General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with BOINC. If not, see <http://www.gnu.org/licenses/>.
17
18 // Feeder: create a shared memory segment containing DB info,
19 // including an array of work items (results/workunits to send).
20 //
21 // Usage: feeder [ options ]
22 // [ -d x ] debug level x
23 // [ --allapps ] interleave results from all applications uniformly
24 // [ --by_batch ] interleave results from all batches uniformly
25 // [ --random_order ] order by "random" field of result
26 // [ --random_order_db ] randomize order with SQL rand(sysdate())
27 // [ --priority_order ] order by decreasing "priority" field of result
28 // [ --priority_asc ] order by increasing "priority" field of result
29 // [ --priority_order_create_time ]
30 // order by priority, then by increasing WU create time
31 // [ --mod n i ] handle only results with (id mod n) == i
32 // [ --wmod n i ] handle only workunits with (id mod n) == i
33 // recommended if using HR with multiple schedulers
34 // [ --sleep_interval x ] sleep x seconds if nothing to do
35 // [ --appids a1{,a2} ] get work only for appids a1,...
36 // (comma-separated list)
37 // [ --purge_stale x ] remove work items from the shared memory segment
//                    that have been there for longer than x minutes
39 // but haven't been assigned
40 //
41 // The feeder tries to keep the work array filled.
42 // It maintains a DB enumerator (DB_WORK_ITEM).
// scan_work_array() scans the work array,
// looking for empty slots and trying to fill them in.
45 // The enumeration may return results already in the array.
46 // So, for each result, we scan the entire array to make sure
47 // it's not there already (can this be streamlined?)
48 //
49 // The length of the enum (max and actual) and the number of empty
50 // slots may differ; either one may be larger.
51 // New jobs may arrive (from the transitioner at any time).
52 // So we use the following policies:
53 //
54 // - Restart the enum at most once during a given array scan
55 // - If a scan doesn't add anything (i.e. array is full, or nothing in DB)
56 // sleep for N seconds
57 // - If an enumerated job was already in the array,
58 // stop the scan and sleep for N seconds
59 // - Otherwise immediately start another scan
60
61 // If --allapps is used:
62 // - there are separate DB enumerators for each app
63 // - the work array is interleaved by application, based on their weights.
64 // slot_to_app[] maps slot (i.e. work array index) to app index.
65 // app_count[] is the number of slots per app
66 // (approximately proportional to its weight)
67
68 // Homogeneous redundancy (HR):
69 // If HR is used, jobs can either be "uncommitted"
70 // (can send to any HR class)
71 // or "committed" (can send only to one HR class).
72 // The feeder tries to maintain a ratio of committed to uncommitted
73 // (generally 50/50) and, of committed jobs, ratios between HR classes
74 // (proportional to the total RAC of hosts in that class).
75 // This is to maximize the likelihood of having work for an average host.
76 //
77 // If you use different HR types between apps, you must use --allapps.
78 // Otherwise we wouldn't know how many slots to reserve for each HR type.
79 //
80 // It's OK to use HR for some apps and not others.
81
82 // Trigger files:
83 // The feeder program periodically checks for two trigger files:
84 //
85 // stop_server: destroy shmem and exit
86 // leave trigger file there (for other daemons)
87 // reread_db: update DB contents in existing shmem
88 // delete trigger file
89
90 // If you get an "Invalid argument" error when trying to run the feeder,
91 // it is likely that you aren't able to allocate enough shared memory.
92 // Either increase the maximum shared memory segment size in the kernel
93 // configuration, or decrease the MAX_PLATFORMS, MAX_APPS
94 // MAX_APP_VERSIONS, and MAX_WU_RESULTS in sched_shmem.h
95
96 #include "config.h"
97 #include <cstdio>
98 #include <cstdlib>
99 #include <cstring>
100 #include <string>
101 #include <ctime>
102 #include <csignal>
103 #include <unistd.h>
104 #include <cmath>
105 #include <sys/types.h>
106 #include <sys/stat.h>
107 #include <sys/param.h>
108 #include <vector>
109 using std::vector;
110
111 #include "boinc_db.h"
112 #include "error_numbers.h"
113 #include "filesys.h"
114 #include "shmem.h"
115 #include "str_util.h"
116 #include "svn_version.h"
117 #include "synch.h"
118 #include "util.h"
119 #include "version.h"
120
121 #include "credit.h"
122 #include "sched_config.h"
123 #include "sched_shmem.h"
124 #include "sched_util.h"
125 #include "sched_msgs.h"
126 #include "hr_info.h"
127 #ifdef GCL_SIMULATOR
128 #include "gcl_simulator.h"
129 #endif
130
// Seconds to sleep when a scan of the work array adds nothing;
// overridden by --sleep_interval.
#define DEFAULT_SLEEP_INTERVAL 5
// Minimum interval between update_av_scales() calls in feeder_loop().
// It is added to dtime(), so presumably seconds.
#define AV_UPDATE_PERIOD 600

// Trigger file that makes the feeder re-read DB tables into shmem
// (see check_reread_trigger()).
#define REREAD_DB_FILENAME "reread_db"

// Per-app enumeration state during one scan of the work array.
// get_job_from_db() advances FIRST_PASS -> SECOND_PASS when it reaches
// the end of the result set, and SECOND_PASS -> OVER the next time,
// so an enum is restarted at most once per scan.
#define ENUM_FIRST_PASS 0
#define ENUM_SECOND_PASS 1
#define ENUM_OVER 2

SCHED_SHMEM* ssp;
    // the shared-memory segment created in main()
key_t sema_key;
    // IPC key for the semaphore created in main()
const char* order_clause="";
    // set by the ordering options (--random_order, --priority_order,
    // --by_batch, ...); passed to DB_WORK_ITEM::enumerate()
char mod_select_clause[256];
    // extra SQL conditions from --mod, --wmod and --appids;
    // used as the select clause for DB enumerations
int sleep_interval = DEFAULT_SLEEP_INTERVAL;
    // seconds to sleep when nothing was done (--sleep_interval)
bool all_apps = false;
    // --allapps: one enumerator per app, slots interleaved by weight
int purge_stale_time = 0;
    // --purge_stale value converted to seconds; 0 means never purge
int num_work_items = MAX_WU_RESULTS;
    // number of work-array slots; config shmem_work_items overrides
int enum_limit = MAX_WU_RESULTS*2;
    // max jobs per DB enumeration; config feeder_query_size overrides

// The following defined if --allapps:
int *enum_sizes;
    // the enum size per app; else not used
int *app_indices;
    // maps slot number to app index, else all zero
int napps;
    // number of apps, else one

HR_INFO hr_info;
    // homogeneous-redundancy slot accounting (see hr_init())
bool using_hr;
    // true iff any app is using HR
bool is_main_feeder = true;
    // false if using --mod or --wmod and this one isn't 0;
    // only the main feeder calls update_av_scales()
163
signal_handler(int)164 void signal_handler(int) {
165 log_messages.printf(MSG_NORMAL, "Signaled by simulator\n");
166 return;
167 }
168
cleanup_shmem()169 void cleanup_shmem() {
170 ssp->ready = false;
171 detach_shmem((void*)ssp);
172 destroy_shmem(config.shmem_key);
173 }
174
check_reread_trigger()175 int check_reread_trigger() {
176 FILE* f;
177 f = fopen(config.project_path(REREAD_DB_FILENAME), "r");
178 if (f) {
179 fclose(f);
180 log_messages.printf(MSG_NORMAL,
181 "Found trigger file %s; re-scanning database tables.\n",
182 REREAD_DB_FILENAME
183 );
184 ssp->init(num_work_items);
185 ssp->scan_tables();
186 ssp->perf_info.get_from_db();
187 int retval = unlink(config.project_path(REREAD_DB_FILENAME));
188 if (retval) {
189 // if we can't remove trigger file, exit to avoid infinite loop
190 //
191 log_messages.printf(MSG_CRITICAL,
192 "Can't unlink trigger file; exiting\n"
193 );
194 }
195 log_messages.printf(MSG_NORMAL,
196 "Done re-scanning: trigger file removed.\n"
197 );
198 }
199 return 0;
200 }
201
202 // Count the # of slots used by HR classes.
203 // This is done at the start of each array scan,
204 // and doesn't reflect slots that have been emptied out by the scheduler
205 //
hr_count_slots()206 void hr_count_slots() {
207 int i, j;
208
209 for (i=1; i<HR_NTYPES; i++) {
210 if (!hr_info.type_being_used[i]) continue;
211 for (j=0; j<hr_nclasses[i]; j++) {
212 hr_info.cur_slots[i][j] = 0;
213 }
214 }
215 for (i=0; i<ssp->max_wu_results; i++) {
216 int app_index = app_indices[i];
217 int hrt = ssp->apps[app_index].homogeneous_redundancy;
218 if (!hrt) continue;
219
220 WU_RESULT& wu_result = ssp->wu_results[i];
221 if (wu_result.state == WR_STATE_PRESENT) {
222 int hrc = wu_result.workunit.hr_class;
223 if (hrc < 0 || hrc >= hr_nclasses[hrt]) {
224 log_messages.printf(MSG_CRITICAL,
225 "HR class %d is out of range\n", hrc
226 );
227 continue;
228 }
229 hr_info.cur_slots[hrt][hrc]++;
230 }
231 }
232 }
233
234 // Enumerate jobs from DB until find one that is not already in the work array.
235 // If find one, return true.
236 // If reach end of enum for second time on this array scan, return false
237 //
get_job_from_db(DB_WORK_ITEM & wi,int app_index,int & enum_phase,int & ncollisions)238 static bool get_job_from_db(
239 DB_WORK_ITEM& wi, // enumerator to get job from
240 int app_index, // if using --allapps, the app index
241 int& enum_phase,
242 int& ncollisions
243 ) {
244 bool collision;
245 int retval, j, enum_size;
246 char select_clause[256];
247
248 if (all_apps) {
249 sprintf(select_clause, "%s and r1.appid=%lu",
250 mod_select_clause, ssp->apps[app_index].id
251 );
252 enum_size = enum_sizes[app_index];
253 } else {
254 safe_strcpy(select_clause, mod_select_clause);
255 enum_size = enum_limit;
256 }
257 int hrt = ssp->apps[app_index].homogeneous_redundancy;
258
259 while (1) {
260 if (hrt && config.hr_allocate_slots) {
261 retval = wi.enumerate_all(enum_size, select_clause);
262 } else {
263 retval = wi.enumerate(enum_size, select_clause, order_clause);
264 }
265 if (retval) {
266 if (retval != ERR_DB_NOT_FOUND) {
267 // If DB server dies, exit;
268 // so /start (run from crontab) will restart us eventually.
269 //
270 log_messages.printf(MSG_CRITICAL,
271 "DB connection lost, exiting\n"
272 );
273 exit(0);
274 }
275
276 // we've reach the end of the result set
277 //
278 switch (enum_phase) {
279 case ENUM_FIRST_PASS:
280 enum_phase = ENUM_SECOND_PASS;
281 ncollisions = 0;
282 // disregard collisions - maybe we'll find new jobs
283 break;
284 case ENUM_SECOND_PASS:
285 enum_phase = ENUM_OVER;
286 return false;
287 }
288 log_messages.printf(MSG_NORMAL,
289 "restarted enumeration for appid %lu\n",
290 ssp->apps[app_index].id
291 );
292 } else {
293 // Check for invalid application ID
294 //
295 if (!ssp->lookup_app(wi.wu.appid)) {
296 #if 0
297 log_messages.printf(MSG_CRITICAL,
298 "result [RESULT#%u] has bad appid %d; clean up your DB!\n",
299 wi.res_id, wi.wu.appid
300 );
301 #endif
302 continue;
303 }
304
305 // if the WU had an error, mark result as DIDNT_NEED
306 //
307 if (wi.wu.error_mask) {
308 char buf[256];
309 DB_RESULT result;
310 result.id = wi.res_id;
311 sprintf(buf, "server_state=%d, outcome=%d",
312 RESULT_SERVER_STATE_OVER,
313 RESULT_OUTCOME_DIDNT_NEED
314 );
315 result.update_field(buf);
316 log_messages.printf(MSG_NORMAL,
317 "[RESULT#%lu] WU had error, marking as DIDNT_NEED\n",
318 wi.res_id
319 );
320 continue;
321 }
322
323 // Check for collision (i.e. this result already is in the array)
324 //
325 collision = false;
326 for (j=0; j<ssp->max_wu_results; j++) {
327 if (ssp->wu_results[j].state != WR_STATE_EMPTY && ssp->wu_results[j].resultid == wi.res_id) {
328 // If the result is already in shared mem,
329 // and another instance of the WU has been sent,
330 // bump the infeasible count to encourage
331 // it to get sent more quickly
332 //
333 if (ssp->wu_results[j].infeasible_count == 0) {
334 if (wi.wu.hr_class > 0) {
335 ssp->wu_results[j].infeasible_count++;
336 }
337 }
338 ncollisions++;
339 collision = true;
340 log_messages.printf(MSG_DEBUG,
341 "result [RESULT#%lu] already in array\n", wi.res_id
342 );
343 break;
344 }
345 }
346 if (collision) {
347 continue;
348 }
349
350 // if using HR, check whether we've exceeded quota for this class
351 //
352 if (hrt && config.hr_allocate_slots) {
353 if (!hr_info.accept(hrt, wi.wu.hr_class)) {
354 log_messages.printf(MSG_DEBUG,
355 "rejecting [RESULT#%lu] because HR class %d/%d over quota\n",
356 wi.res_id, hrt, wi.wu.hr_class
357 );
358 continue;
359 }
360 }
361 return true;
362 }
363 }
364 return false; // never reached
365 }
366
367 // This function decides the interleaving used for --allapps.
368 // Inputs:
369 // n (number of weights)
370 // k (length of vector)
371 // a set of weights w(0)..w(n-1)
372 // Outputs:
373 // a vector v(0)..v(k-1) with values 0..n-1,
374 // where each value occurs with the given weight,
375 // and values are interleaved as much as possible.
376 // a vector count(0)..count(n-1) saying how many times
377 // each value occurs in v
378 //
weighted_interleave(double * weights,int n,int k,int * v,int * count)379 void weighted_interleave(double* weights, int n, int k, int* v, int* count) {
380 double *x = (double*) calloc(n, sizeof(double));
381 int i;
382 for (i=0; i<n; i++) {
383 // make sure apps with no weight get no slots
384 if (weights[i] == 0) {
385 x[i] = 1e-100;
386 }
387 count[i] = 0;
388 }
389 for (i=0; i<k; i++) {
390 int best = 0;
391 for (int j=1; j<n; j++) {
392 if (x[j] > x[best]) {
393 best = j;
394 }
395 }
396 v[i] = best;
397 x[best] -= 1/weights[best];
398 count[best]++;
399 }
400 free(x);
401 }
402
403 // update the job size statistics fields of array entries
404 //
update_job_stats()405 static void update_job_stats() {
406 int i, n=0;
407 double sum=0, sum_sqr=0;
408
409 for (i=0; i<ssp->max_wu_results; i++) {
410 WU_RESULT& wu_result = ssp->wu_results[i];
411 if (wu_result.state != WR_STATE_PRESENT) continue;
412 n++;
413 double e = wu_result.workunit.rsc_fpops_est;
414 sum += e;
415 sum_sqr += e*e;
416 }
417 double mean = 0;
418 double stdev = 1;
419 if (n != 0) {
420 mean = sum/n;
421 stdev = sqrt((sum_sqr - sum*mean)/n);
422 }
423 for (i=0; i<ssp->max_wu_results; i++) {
424 WU_RESULT& wu_result = ssp->wu_results[i];
425 if (wu_result.state != WR_STATE_PRESENT) continue;
426 double e = wu_result.workunit.rsc_fpops_est;
427 double diff = e - mean;
428 wu_result.fpops_size = diff/stdev;
429 }
430 }
431
432 // We're purging this item because it's been in shared mem too long.
433 // In general it will get added again soon.
434 // But if it's committed to an HR class,
435 // it could be because it got sent to a rare host.
436 // Un-commit it by zeroing out the WU's hr class,
437 // and incrementing target_nresults
438 //
purge_stale(WU_RESULT & wu_result)439 static void purge_stale(WU_RESULT& wu_result) {
440 DB_WORKUNIT wu;
441 wu.id = wu_result.workunit.id;
442 if (wu_result.workunit.hr_class) {
443 char buf[256];
444 sprintf(buf,
445 "hr_class=0, target_nresults=target_nresults+1, transition_time=%ld",
446 time(0)
447 );
448 wu.update_field(buf);
449 }
450 }
451
452 // Make one pass through the work array, filling in empty slots.
453 // Return true if we filled in any.
454 //
scan_work_array(vector<DB_WORK_ITEM> & work_items)455 static bool scan_work_array(vector<DB_WORK_ITEM> &work_items) {
456 int i;
457 bool found;
458 int enum_phase[napps];
459 int app_index;
460 int nadditions=0, ncollisions=0;
461
462 for (i=0; i<napps; i++) {
463 if (work_items[i].cursor.active) {
464 enum_phase[i] = ENUM_FIRST_PASS;
465 } else {
466 enum_phase[i] = ENUM_SECOND_PASS;
467 }
468 }
469
470 if (using_hr && config.hr_allocate_slots) {
471 hr_count_slots();
472 }
473
474 for (i=0; i<ssp->max_wu_results; i++) {
475 app_index = app_indices[i];
476
477 DB_WORK_ITEM& wi = work_items[app_index];
478 WU_RESULT& wu_result = ssp->wu_results[i];
479 switch (wu_result.state) {
480 case WR_STATE_PRESENT:
481 if (purge_stale_time && wu_result.time_added_to_shared_memory < (time(0) - purge_stale_time)) {
482 log_messages.printf(MSG_NORMAL,
483 "remove result [RESULT#%lu] from slot %d because it is stale\n",
484 wu_result.resultid, i
485 );
486 purge_stale(wu_result);
487 wu_result.state = WR_STATE_EMPTY;
488 // fall through, refill this array slot
489 } else {
490 break;
491 }
492 case WR_STATE_EMPTY:
493 if (enum_phase[app_index] == ENUM_OVER) continue;
494 found = get_job_from_db(
495 wi, app_index, enum_phase[app_index], ncollisions
496 );
497 if (found) {
498 log_messages.printf(MSG_NORMAL,
499 "adding result [RESULT#%lu] in slot %d\n",
500 wi.res_id, i
501 );
502 wu_result.resultid = wi.res_id;
503 wu_result.res_priority = wi.res_priority;
504 wu_result.res_server_state = wi.res_server_state;
505 wu_result.res_report_deadline = wi.res_report_deadline;
506 wu_result.workunit = wi.wu;
507 wu_result.state = WR_STATE_PRESENT;
508 // If the workunit has already been allocated to a certain
509 // OS then it should be assigned quickly,
510 // so we set its infeasible_count to 1
511 //
512 if (wi.wu.hr_class > 0) {
513 wu_result.infeasible_count = 1;
514 } else {
515 wu_result.infeasible_count = 0;
516 }
517 // set the need_reliable flag if needed
518 //
519 wu_result.need_reliable = false;
520 if (config.reliable_on_priority && wu_result.res_priority >= config.reliable_on_priority) {
521 wu_result.need_reliable = true;
522 }
523 wu_result.time_added_to_shared_memory = time(0);
524 nadditions++;
525 }
526 break;
527 default:
528 // here the state is a PID; see if it's still alive
529 //
530 int pid = wu_result.state;
531 struct stat s;
532 char buf[256];
533 sprintf(buf, "/proc/%d", pid);
534 log_messages.printf(MSG_NORMAL, "checking pid %d\n", pid);
535 if (stat(buf, &s)) {
536 wu_result.state = WR_STATE_PRESENT;
537 log_messages.printf(MSG_NORMAL,
538 "Result reserved by non-existent process PID %d; resetting\n",
539 pid
540 );
541 }
542 }
543 }
544 log_messages.printf(MSG_DEBUG, "Added %d results to array\n", nadditions);
545 if (ncollisions) {
546 log_messages.printf(MSG_DEBUG,
547 "%d results already in array\n", ncollisions
548 );
549 return false;
550 }
551 if (nadditions == 0) {
552 return false;
553 }
554 return true;
555 }
556
feeder_loop()557 void feeder_loop() {
558 vector<DB_WORK_ITEM> work_items;
559 double next_av_update_time=0;
560
561 // may need one enumeration per app; create vector
562 //
563 work_items.resize(napps);
564
565 while (1) {
566 bool action;
567 if (config.dont_send_jobs) {
568 action = false;
569 } else {
570 action = scan_work_array(work_items);
571 }
572 ssp->ready = true;
573 if (!action) {
574 #ifdef GCL_SIMULATOR
575 continue_simulation("feeder");
576 log_messages.printf(MSG_DEBUG, "Waiting for signal\n");
577 signal(SIGUSR2, simulator_signal_handler);
578 pause();
579 #else
580 log_messages.printf(MSG_DEBUG,
581 "No action; sleeping %d sec\n", sleep_interval
582 );
583 daemon_sleep(sleep_interval);
584 #endif
585 } else {
586 if (config.job_size_matching) {
587 update_job_stats();
588 }
589 }
590
591 double now = dtime();
592 if (is_main_feeder && now > next_av_update_time) {
593 int retval = update_av_scales(ssp);
594 if (retval) {
595 log_messages.printf(MSG_CRITICAL,
596 "update_av_scales failed: %s\n", boincerror(retval)
597 );
598 exit(1);
599 }
600 next_av_update_time = now + AV_UPDATE_PERIOD;
601 }
602 fflush(stdout);
603 check_stop_daemons();
604 check_reread_trigger();
605 }
606 }
607
608 // see if we're using HR, and if so initialize the necessary data structures
609 //
hr_init()610 void hr_init() {
611 int i, retval;
612 bool apps_differ = false;
613 bool some_app_uses_hr = false;
614 int hrt, hr_type0 = ssp->apps[0].homogeneous_redundancy;
615
616 using_hr = false;
617
618 for (i=0; i<ssp->napps; i++) {
619 hrt = ssp->apps[i].homogeneous_redundancy;
620 if (hrt <0 || hrt >= HR_NTYPES) {
621 log_messages.printf(MSG_CRITICAL,
622 "HR type %d out of range for app %d\n", hrt, i
623 );
624 exit(1);
625 }
626 if (hrt) some_app_uses_hr = true;
627 if (hrt != hr_type0) apps_differ = true;
628 }
629 if (config.homogeneous_redundancy) {
630 log_messages.printf(MSG_NORMAL,
631 "config HR is %d\n", config.homogeneous_redundancy
632 );
633 hrt = config.homogeneous_redundancy;
634 if (hrt < 0 || hrt >= HR_NTYPES) {
635 log_messages.printf(MSG_CRITICAL,
636 "Main HR type %d out of range\n", hrt
637 );
638 exit(1);
639 }
640 if (some_app_uses_hr) {
641 log_messages.printf(MSG_CRITICAL,
642 "You can specify HR at global or app level, but not both\n"
643 );
644 exit(1);
645 }
646 for (i=0; i<ssp->napps; i++) {
647 ssp->apps[i].homogeneous_redundancy = config.homogeneous_redundancy;
648 ssp->apps[i].weight = 1;
649 }
650
651 } else {
652 if (some_app_uses_hr) {
653 if (apps_differ && !all_apps) {
654 log_messages.printf(MSG_CRITICAL,
655 "You must use --allapps if apps have different HR\n"
656 );
657 exit(1);
658 }
659 } else {
660 return; // HR not being used
661 }
662 }
663 using_hr = true;
664 if (config.hr_allocate_slots) {
665 hr_info.init();
666 retval = hr_info.read_file();
667 if (retval) {
668 log_messages.printf(MSG_CRITICAL,
669 "Can't read HR info file: %s\n", boincerror(retval)
670 );
671 exit(1);
672 }
673
674 // find the weight for each HR type
675 //
676 for (i=0; i<ssp->napps; i++) {
677 hrt = ssp->apps[i].homogeneous_redundancy;
678 hr_info.type_weights[hrt] += ssp->apps[i].weight;
679 hr_info.type_being_used[hrt] = true;
680 }
681
682 // compute the slot allocations for HR classes
683 //
684 hr_info.allocate(ssp->max_wu_results);
685 hr_info.show(stderr);
686 }
687 }
688
689 // write a summary of feeder state to stderr
690 //
show_state(int)691 void show_state(int) {
692 ssp->show(stderr);
693 if (config.hr_allocate_slots) {
694 hr_info.show(stderr);
695 }
696 }
697
show_version()698 void show_version() {
699 log_messages.printf(MSG_NORMAL, "%s\n", SVN_VERSION);
700 }
701
usage(char * name)702 void usage(char *name) {
703 fprintf(stderr,
704 "%s creates a shared memory segment containing DB info,\n"
705 "including an array of work items (results/workunits to send).\n\n"
706 "Usage: %s [OPTION]...\n\n"
707 "Options:\n"
708 " [ -d X | --debug_level X] Set log verbosity to X (1..4)\n"
709 " [ --allapps ] Interleave results from all applications uniformly.\n"
710 " [ --random_order ] order by \"random\" field of result\n"
711 " [ --random_order_db ] randomize order with SQL rand(sysdate())\n"
712 " [ --priority_asc ] order by increasing \"priority\" field of result\n"
713 " [ --priority_order ] order by decreasing \"priority\" field of result\n"
714 " [ --priority_order_create_time ] order by priority, then by increasing WU create time\n"
715 " [ --purge_stale x ] remove work items from the shared memory segment after x secs\n"
716 " that have been there for longer then x minutes\n"
717 " but haven't been assigned\n"
718 " [ --appids a1{,a2} ] get work only for appids a1,... (comma-separated list)\n"
719 " [ --mod n i ] handle only results with (id mod n) == i\n"
720 " [ --wmod n i ] handle only workunits with (id mod n) == i\n"
721 " [ --sleep_interval x ] sleep x seconds if nothing to do\n"
722 " [ -h | --help ] Shows this help text.\n"
723 " [ -v | --version ] Shows version information.\n",
724 name, name
725 );
726 }
727
main(int argc,char ** argv)728 int main(int argc, char** argv) {
729 int i, retval;
730 void* p;
731 char path[MAXPATHLEN], order_buf[1024];
732
733 for (i=1; i<argc; i++) {
734 if (is_arg(argv[i], "d") || is_arg(argv[i], "debug_level")) {
735 if (!argv[++i]) {
736 log_messages.printf(MSG_CRITICAL, "%s requires an argument\n\n", argv[--i]);
737 usage(argv[0]);
738 exit(1);
739 }
740 int dl = atoi(argv[i]);
741 log_messages.set_debug_level(dl);
742 if (dl == 4) g_print_queries = true;
743 } else if (is_arg(argv[i], "random_order")) {
744 order_clause = "order by r1.random ";
745 } else if (is_arg(argv[i], "random_order_db")) {
746 order_clause = "order by rand(sysdate()) ";
747 } else if (is_arg(argv[i], "allapps")) {
748 all_apps = true;
749 } else if (is_arg(argv[i], "priority_asc")) {
750 order_clause = "order by r1.priority asc ";
751 } else if (is_arg(argv[i], "priority_order")) {
752 order_clause = "order by r1.priority desc ";
753 } else if (is_arg(argv[i], "priority_order_create_time")) {
754 order_clause = "order by r1.priority desc, r1.workunitid";
755 } else if (is_arg(argv[i], "by_batch")) {
756 // Evenly distribute work among batches
757 // The 0=1 causes anything before the union statement
758 // to result in an empty set,
759 // and the '#' at the end comments out anthing following our query
760 // This has allowed us to inject a more customizable query
761 //
762 sprintf(order_buf, "and 0=1 union (SELECT r1.id, r1.priority, r1.server_state, r1.report_deadline, workunit.* FROM workunit JOIN ("
763 "SELECT *, CASE WHEN @batch != t.batch THEN @rownum := 0 WHEN @batch = t.batch THEN @rownum := @rownum + 1 END AS rank, @batch := t.batch "
764 "FROM (SELECT @rownum := 0, @batch := 0, r.* FROM result r WHERE r.server_state=2 ORDER BY batch) t) r1 ON workunit.id=r1.workunitid "
765 "ORDER BY rank LIMIT %d)#",
766 enum_limit
767 );
768 order_clause = order_buf;
769 } else if (is_arg(argv[i], "purge_stale")) {
770 purge_stale_time = atoi(argv[++i])*60;
771 } else if (is_arg(argv[i], "appids")) {
772 if (!argv[++i]) {
773 log_messages.printf(MSG_CRITICAL, "%s requires an argument\n\n", argv[--i]);
774 usage(argv[0]);
775 exit(1);
776 }
777 strcat(mod_select_clause, " and workunit.appid in (");
778 safe_strcat(mod_select_clause, argv[i]);
779 strcat(mod_select_clause, ")");
780 } else if (is_arg(argv[i], "mod")) {
781 if (!argv[i+1] || !argv[i+2]) {
782 log_messages.printf(MSG_CRITICAL, "%s requires two arguments\n\n", argv[i]);
783 usage(argv[0]);
784 exit(1);
785 }
786 int n = atoi(argv[++i]);
787 int j = atoi(argv[++i]);
788 sprintf(mod_select_clause, "and r1.id %% %d = %d ", n, j);
789 is_main_feeder = (j==0);
790 } else if (is_arg(argv[i], "wmod")) {
791 if (!argv[i+1] || !argv[i+2]) {
792 log_messages.printf(MSG_CRITICAL, "%s requires two arguments\n\n", argv[i]);
793 usage(argv[0]);
794 exit(1);
795 }
796 int n = atoi(argv[++i]);
797 int j = atoi(argv[++i]);
798 sprintf(mod_select_clause, "and workunit.id %% %d = %d ", n, j);
799 is_main_feeder = (j==0);
800 } else if (is_arg(argv[i], "sleep_interval")) {
801 if (!argv[++i]) {
802 log_messages.printf(MSG_CRITICAL, "%s requires an argument\n\n", argv[--i]);
803 usage(argv[0]);
804 exit(1);
805 }
806 sleep_interval = atoi(argv[i]);
807 } else if (is_arg(argv[i], "v") || is_arg(argv[i], "version")) {
808 show_version();
809 exit(0);
810 } else if (is_arg(argv[i], "h") || is_arg(argv[i], "help")) {
811 usage(argv[0]);
812 exit(0);
813 } else {
814 log_messages.printf(MSG_CRITICAL, "unknown command line argument: %s\n\n", argv[i]);
815 usage(argv[0]);
816 exit(1);
817 }
818 }
819
820 retval = config.parse_file();
821 if (retval) {
822 log_messages.printf(MSG_CRITICAL,
823 "Can't parse config.xml: %s\n", boincerror(retval)
824 );
825 exit(1);
826 }
827
828 unlink(config.project_path(REREAD_DB_FILENAME));
829
830 log_messages.printf(MSG_NORMAL, "Starting\n");
831 show_version();
832
833 if (config.feeder_query_size) {
834 enum_limit = config.feeder_query_size;
835 }
836 if (config.shmem_work_items) {
837 num_work_items = config.shmem_work_items;
838 }
839 strlcpy(path, config.project_dir, sizeof(path));
840 get_key(path, 'a', sema_key);
841 destroy_semaphore(sema_key);
842 create_semaphore(sema_key);
843
844 retval = destroy_shmem(config.shmem_key);
845 if (retval) {
846 log_messages.printf(MSG_CRITICAL, "can't destroy shmem\n");
847 exit(1);
848 }
849
850 int shmem_size = sizeof(SCHED_SHMEM) + num_work_items*sizeof(WU_RESULT);
851 retval = create_shmem(config.shmem_key, shmem_size, 0 /* don't set GID */, &p);
852 if (retval) {
853 log_messages.printf(MSG_CRITICAL, "can't create shmem\n");
854 exit(1);
855 }
856 ssp = (SCHED_SHMEM*)p;
857 ssp->init(num_work_items);
858
859 atexit(cleanup_shmem);
860 install_stop_signal_handler();
861
862 retval = boinc_db.open(
863 config.db_name, config.db_host, config.db_user, config.db_passwd
864 );
865 if (retval) {
866 log_messages.printf(MSG_CRITICAL,
867 "boinc_db.open: %d; %s\n", retval, boinc_db.error_string()
868 );
869 exit(1);
870 }
871 retval = boinc_db.set_isolation_level(READ_UNCOMMITTED);
872 if (retval) {
873 log_messages.printf(MSG_CRITICAL,
874 "boinc_db.set_isolation_level: %d; %s\n", retval, boinc_db.error_string()
875 );
876 }
877 ssp->scan_tables();
878
879 log_messages.printf(MSG_NORMAL,
880 "read "
881 "%d platforms, "
882 "%d apps, "
883 "%d app_versions, "
884 "%d assignments\n",
885 ssp->nplatforms,
886 ssp->napps,
887 ssp->napp_versions,
888 ssp->nassignments
889 );
890 log_messages.printf(MSG_NORMAL,
891 "Using %d job slots\n", ssp->max_wu_results
892 );
893
894 app_indices = (int*) calloc(ssp->max_wu_results, sizeof(int));
895
896 // If all_apps is set, make an array saying which array slot
897 // is associated with which app
898 //
899 if (all_apps) {
900 napps = ssp->napps;
901 enum_sizes = (int*) calloc(ssp->napps, sizeof(int));
902 double* weights = (double*) calloc(ssp->napps, sizeof(double));
903 int* counts = (int*) calloc(ssp->napps, sizeof(int));
904 if (ssp->app_weight_sum == 0) {
905 for (i=0; i<ssp->napps; i++) {
906 ssp->apps[i].weight = 1;
907 }
908 ssp->app_weight_sum = ssp->napps;
909 }
910 for (i=0; i<ssp->napps; i++) {
911 weights[i] = ssp->apps[i].weight;
912 }
913 for (i=0; i<ssp->napps; i++) {
914 enum_sizes[i] = (int) floor(0.5 + enum_limit*(weights[i])/(ssp->app_weight_sum));
915 }
916 weighted_interleave(
917 weights, ssp->napps, ssp->max_wu_results, app_indices, counts
918 );
919 free(weights);
920 free(counts);
921 } else {
922 napps = 1;
923 }
924
925 hr_init();
926
927 if (using_hr && strlen(order_clause)) {
928 log_messages.printf(MSG_CRITICAL,
929 "Note: ordering options will not apply to apps for which homogeneous redundancy is used\n"
930 );
931 }
932
933 retval = ssp->perf_info.get_from_db();
934 if (retval) {
935 log_messages.printf(MSG_CRITICAL,
936 "PERF_INFO::get_from_db(): %d\n", retval
937 );
938 }
939
940 signal(SIGUSR1, show_state);
941
942 feeder_loop();
943 }
944
945 const char *BOINC_RCSID_57c87aa242 = "$Id$";
946