1 // This file is part of BOINC.
2 // http://boinc.berkeley.edu
3 // Copyright (C) 2008 University of California
4 //
5 // BOINC is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Lesser General Public License
7 // as published by the Free Software Foundation,
8 // either version 3 of the License, or (at your option) any later version.
9 //
10 // BOINC is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
13 // See the GNU Lesser General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with BOINC.  If not, see <http://www.gnu.org/licenses/>.
17 
18 // Feeder: create a shared memory segment containing DB info,
19 // including an array of work items (results/workunits to send).
20 //
21 // Usage: feeder [ options ]
22 //  [ -d x ]                debug level x
23 //  [ --allapps ]           interleave results from all applications uniformly
24 //  [ --by_batch ]          interleave results from all batches uniformly
25 //  [ --random_order ]      order by "random" field of result
26 //  [ --random_order_db ]   randomize order with SQL rand(sysdate())
27 //  [ --priority_order ]    order by decreasing "priority" field of result
28 //  [ --priority_asc ]      order by increasing "priority" field of result
29 //  [ --priority_order_create_time ]
30 //                          order by priority, then by increasing WU create time
31 //  [ --mod n i ]           handle only results with (id mod n) == i
32 //  [ --wmod n i ]          handle only workunits with (id mod n) == i
33 //                          recommended if using HR with multiple schedulers
34 //  [ --sleep_interval x ]  sleep x seconds if nothing to do
35 //  [ --appids a1{,a2} ]    get work only for appids a1,...
36 //                          (comma-separated list)
37 //  [ --purge_stale x ]     remove work items from the shared memory segment
//                          that have been there for longer than x minutes
39 //                          but haven't been assigned
40 //
41 // The feeder tries to keep the work array filled.
42 // It maintains a DB enumerator (DB_WORK_ITEM).
43 // scan_work_array() scans the work array.
44 // looking for empty slots and trying to fill them in.
45 // The enumeration may return results already in the array.
46 // So, for each result, we scan the entire array to make sure
47 // it's not there already (can this be streamlined?)
48 //
49 // The length of the enum (max and actual) and the number of empty
50 // slots may differ; either one may be larger.
// New jobs may arrive (from the transitioner) at any time.
52 // So we use the following policies:
53 //
54 // - Restart the enum at most once during a given array scan
55 // - If a scan doesn't add anything (i.e. array is full, or nothing in DB)
56 //   sleep for N seconds
57 // - If an enumerated job was already in the array,
58 //   stop the scan and sleep for N seconds
59 // - Otherwise immediately start another scan
60 
61 // If --allapps is used:
62 // - there are separate DB enumerators for each app
63 // - the work array is interleaved by application, based on their weights.
64 //   slot_to_app[] maps slot (i.e. work array index) to app index.
65 //   app_count[] is the number of slots per app
66 //   (approximately proportional to its weight)
67 
68 // Homogeneous redundancy (HR):
69 // If HR is used, jobs can either be "uncommitted"
70 // (can send to any HR class)
71 // or "committed" (can send only to one HR class).
72 // The feeder tries to maintain a ratio of committed to uncommitted
73 // (generally 50/50) and, of committed jobs, ratios between HR classes
74 // (proportional to the total RAC of hosts in that class).
75 // This is to maximize the likelihood of having work for an average host.
76 //
77 // If you use different HR types between apps, you must use --allapps.
78 // Otherwise we wouldn't know how many slots to reserve for each HR type.
79 //
80 // It's OK to use HR for some apps and not others.
81 
82 // Trigger files:
83 // The feeder program periodically checks for two trigger files:
84 //
85 // stop_server:  destroy shmem and exit
86 //               leave trigger file there (for other daemons)
87 // reread_db:    update DB contents in existing shmem
88 //               delete trigger file
89 
90 // If you get an "Invalid argument" error when trying to run the feeder,
91 // it is likely that you aren't able to allocate enough shared memory.
92 // Either increase the maximum shared memory segment size in the kernel
93 // configuration, or decrease the MAX_PLATFORMS, MAX_APPS
94 // MAX_APP_VERSIONS, and MAX_WU_RESULTS in sched_shmem.h
95 
96 #include "config.h"
97 #include <cstdio>
98 #include <cstdlib>
99 #include <cstring>
100 #include <string>
101 #include <ctime>
102 #include <csignal>
103 #include <unistd.h>
104 #include <cmath>
105 #include <sys/types.h>
106 #include <sys/stat.h>
107 #include <sys/param.h>
108 #include <vector>
109 using std::vector;
110 
111 #include "boinc_db.h"
112 #include "error_numbers.h"
113 #include "filesys.h"
114 #include "shmem.h"
115 #include "str_util.h"
116 #include "svn_version.h"
117 #include "synch.h"
118 #include "util.h"
119 #include "version.h"
120 
121 #include "credit.h"
122 #include "sched_config.h"
123 #include "sched_shmem.h"
124 #include "sched_util.h"
125 #include "sched_msgs.h"
126 #include "hr_info.h"
127 #ifdef GCL_SIMULATOR
128 #include "gcl_simulator.h"
129 #endif
130 
// seconds to sleep when a scan adds no work (overridable with --sleep_interval)
#define DEFAULT_SLEEP_INTERVAL  5
// how often (seconds) the main feeder updates app version scale factors
#define AV_UPDATE_PERIOD      600

// trigger file: re-read DB tables into existing shmem (see check_reread_trigger())
#define REREAD_DB_FILENAME      "reread_db"

// phases of a DB enumeration during one array scan:
#define ENUM_FIRST_PASS     0   // an enumeration was already in progress at scan start
#define ENUM_SECOND_PASS    1   // enumeration was restarted from the top
#define ENUM_OVER           2   // hit end of enum twice this scan; stop trying

SCHED_SHMEM* ssp;
    // the shared-memory segment (created and attached in main())
key_t sema_key;
    // key for the lock semaphore, derived from the project dir
const char* order_clause="";
    // SQL ORDER BY fragment chosen by command-line options
char mod_select_clause[256];
    // SQL WHERE fragment built from --mod / --wmod / --appids
int sleep_interval = DEFAULT_SLEEP_INTERVAL;
    // seconds to sleep when there's nothing to do
bool all_apps = false;
    // set by --allapps: interleave work across apps by weight
int purge_stale_time = 0;
    // purge unassigned items older than this many seconds (0 = never);
    // set by --purge_stale, which takes minutes
int num_work_items = MAX_WU_RESULTS;
    // size of the shmem work array
int enum_limit = MAX_WU_RESULTS*2;
    // max # of jobs fetched per DB enumeration

// The following defined if --allapps:
int *enum_sizes;
    // the enum size per app; else not used
int *app_indices;
    // maps slot number to app index, else all zero
int napps;
    // number of apps, else one

HR_INFO hr_info;
    // homogeneous redundancy state; initialized in hr_init()
bool using_hr;
    // true iff any app is using HR
bool is_main_feeder = true;
    // false if using --mod or --wmod and this one isn't 0
163 
signal_handler(int)164 void signal_handler(int) {
165     log_messages.printf(MSG_NORMAL, "Signaled by simulator\n");
166     return;
167 }
168 
// Detach and destroy the shared-memory segment.
// Clear the ready flag first so schedulers stop using the segment
// while it's being torn down.
//
void cleanup_shmem() {
    ssp->ready = false;
    detach_shmem((void*)ssp);
    destroy_shmem(config.shmem_key);
}
174 
check_reread_trigger()175 int check_reread_trigger() {
176     FILE* f;
177     f = fopen(config.project_path(REREAD_DB_FILENAME), "r");
178     if (f) {
179         fclose(f);
180         log_messages.printf(MSG_NORMAL,
181             "Found trigger file %s; re-scanning database tables.\n",
182             REREAD_DB_FILENAME
183         );
184         ssp->init(num_work_items);
185         ssp->scan_tables();
186         ssp->perf_info.get_from_db();
187         int retval = unlink(config.project_path(REREAD_DB_FILENAME));
188         if (retval) {
189             // if we can't remove trigger file, exit to avoid infinite loop
190             //
191             log_messages.printf(MSG_CRITICAL,
192                 "Can't unlink trigger file; exiting\n"
193             );
194         }
195         log_messages.printf(MSG_NORMAL,
196             "Done re-scanning: trigger file removed.\n"
197         );
198     }
199     return 0;
200 }
201 
202 // Count the # of slots used by HR classes.
203 // This is done at the start of each array scan,
204 // and doesn't reflect slots that have been emptied out by the scheduler
205 //
hr_count_slots()206 void hr_count_slots() {
207     int i, j;
208 
209     for (i=1; i<HR_NTYPES; i++) {
210         if (!hr_info.type_being_used[i]) continue;
211         for (j=0; j<hr_nclasses[i]; j++) {
212             hr_info.cur_slots[i][j] = 0;
213         }
214     }
215     for (i=0; i<ssp->max_wu_results; i++) {
216         int app_index = app_indices[i];
217         int hrt = ssp->apps[app_index].homogeneous_redundancy;
218         if (!hrt) continue;
219 
220         WU_RESULT& wu_result = ssp->wu_results[i];
221         if (wu_result.state == WR_STATE_PRESENT) {
222             int hrc = wu_result.workunit.hr_class;
223             if (hrc < 0 || hrc >= hr_nclasses[hrt]) {
224                 log_messages.printf(MSG_CRITICAL,
225                     "HR class %d is out of range\n", hrc
226                 );
227                 continue;
228             }
229             hr_info.cur_slots[hrt][hrc]++;
230         }
231     }
232 }
233 
// Enumerate jobs from DB until find one that is not already in the work array.
// If find one, return true.
// If reach end of enum for second time on this array scan, return false
//
// enum_phase (in/out): ENUM_FIRST_PASS / ENUM_SECOND_PASS / ENUM_OVER
// ncollisions (in/out): # of enumerated jobs found already in the array
//
static bool get_job_from_db(
    DB_WORK_ITEM& wi,    // enumerator to get job from
    int app_index,       // if using --allapps, the app index
    int& enum_phase,
    int& ncollisions
) {
    bool collision;
    int retval, j, enum_size;
    char select_clause[256];

    // with --allapps, restrict the query to this app
    // and use the per-app (weight-proportional) enum size
    //
    if (all_apps) {
        sprintf(select_clause, "%s and r1.appid=%lu",
            mod_select_clause, ssp->apps[app_index].id
        );
        enum_size = enum_sizes[app_index];
    } else {
        safe_strcpy(select_clause, mod_select_clause);
        enum_size = enum_limit;
    }
    int hrt = ssp->apps[app_index].homogeneous_redundancy;

    while (1) {
        // NOTE(review): enumerate_all() appears to be the unordered variant
        // used when HR slot allocation is on — confirm in DB_WORK_ITEM
        //
        if (hrt && config.hr_allocate_slots) {
            retval = wi.enumerate_all(enum_size, select_clause);
        } else {
            retval = wi.enumerate(enum_size, select_clause, order_clause);
        }
        if (retval) {
            if (retval != ERR_DB_NOT_FOUND) {
                // If DB server dies, exit;
                // so /start (run from crontab) will restart us eventually.
                //
                log_messages.printf(MSG_CRITICAL,
                    "DB connection lost, exiting\n"
                );
                exit(0);
            }

            // we've reached the end of the result set
            //
            switch (enum_phase) {
            case ENUM_FIRST_PASS:
                enum_phase = ENUM_SECOND_PASS;
                ncollisions = 0;
                    // disregard collisions - maybe we'll find new jobs
                break;
            case ENUM_SECOND_PASS:
                enum_phase = ENUM_OVER;
                return false;
            }
            log_messages.printf(MSG_NORMAL,
                "restarted enumeration for appid %lu\n",
                ssp->apps[app_index].id
            );
        } else {
            // Check for invalid application ID
            //
            if (!ssp->lookup_app(wi.wu.appid)) {
#if 0
                log_messages.printf(MSG_CRITICAL,
                    "result [RESULT#%u] has bad appid %d; clean up your DB!\n",
                    wi.res_id, wi.wu.appid
                );
#endif
                continue;
            }

            // if the WU had an error, mark result as DIDNT_NEED
            // (don't put jobs from broken WUs into the array)
            //
            if (wi.wu.error_mask) {
                char buf[256];
                DB_RESULT result;
                result.id = wi.res_id;
                sprintf(buf, "server_state=%d, outcome=%d",
                    RESULT_SERVER_STATE_OVER,
                    RESULT_OUTCOME_DIDNT_NEED
                );
                result.update_field(buf);
                log_messages.printf(MSG_NORMAL,
                    "[RESULT#%lu] WU had error, marking as DIDNT_NEED\n",
                    wi.res_id
                );
                continue;
            }

            // Check for collision (i.e. this result already is in the array)
            //
            collision = false;
            for (j=0; j<ssp->max_wu_results; j++) {
                if (ssp->wu_results[j].state != WR_STATE_EMPTY && ssp->wu_results[j].resultid == wi.res_id) {
                    // If the result is already in shared mem,
                    // and another instance of the WU has been sent,
                    // bump the infeasible count to encourage
                    // it to get sent more quickly
                    //
                    if (ssp->wu_results[j].infeasible_count == 0) {
                        if (wi.wu.hr_class > 0) {
                            ssp->wu_results[j].infeasible_count++;
                        }
                    }
                    ncollisions++;
                    collision = true;
                    log_messages.printf(MSG_DEBUG,
                        "result [RESULT#%lu] already in array\n", wi.res_id
                    );
                    break;
                }
            }
            if (collision) {
                continue;
            }

            // if using HR, check whether we've exceeded quota for this class
            //
            if (hrt && config.hr_allocate_slots) {
                if (!hr_info.accept(hrt, wi.wu.hr_class)) {
                    log_messages.printf(MSG_DEBUG,
                        "rejecting [RESULT#%lu] because HR class %d/%d over quota\n",
                        wi.res_id, hrt, wi.wu.hr_class
                    );
                    continue;
                }
            }
            return true;
        }
    }
    return false;   // never reached
}
366 
// This function decides the interleaving used for --allapps.
// Inputs:
//   n (number of weights)
//   k (length of vector)
//   a set of weights w(0)..w(n-1)
// Outputs:
//   a vector v(0)..v(k-1) with values 0..n-1,
//     where each value occurs with the given weight,
//     and values are interleaved as much as possible.
//   a vector count(0)..count(n-1) saying how many times
//     each value occurs in v
//
void weighted_interleave(double* weights, int n, int k, int* v, int* count) {
    // x[i] is app i's "balance": the app with the largest balance
    // gets the next slot, then pays 1/weight for it.
    double *x = (double*) calloc(n, sizeof(double));
    int i;
    for (i=0; i<n; i++) {
        // make sure apps with no weight get no slots:
        // put them at -infinity so they are never selected.
        // (BUGFIX: the old value 1e-100 was *above* the weighted apps'
        // initial balance of 0, so a zero-weight app wrongly won the
        // first slot and then triggered a division by zero below.)
        if (weights[i] == 0) {
            x[i] = -HUGE_VAL;
        }
        count[i] = 0;
    }
    for (i=0; i<k; i++) {
        int best = 0;
        for (int j=1; j<n; j++) {
            if (x[j] > x[best]) {
                best = j;
            }
        }
        v[i] = best;
        // guard the division: best can have zero weight only if
        // all weights are zero (then app 0 absorbs every slot)
        if (weights[best] > 0) {
            x[best] -= 1/weights[best];
        }
        count[best]++;
    }
    free(x);
}
402 
403 // update the job size statistics fields of array entries
404 //
update_job_stats()405 static void update_job_stats() {
406     int i, n=0;
407     double sum=0, sum_sqr=0;
408 
409     for (i=0; i<ssp->max_wu_results; i++) {
410         WU_RESULT& wu_result = ssp->wu_results[i];
411         if (wu_result.state != WR_STATE_PRESENT) continue;
412         n++;
413         double e = wu_result.workunit.rsc_fpops_est;
414         sum += e;
415         sum_sqr += e*e;
416     }
417     double mean = 0;
418     double stdev = 1;
419     if (n != 0) {
420         mean = sum/n;
421         stdev = sqrt((sum_sqr - sum*mean)/n);
422     }
423     for (i=0; i<ssp->max_wu_results; i++) {
424         WU_RESULT& wu_result = ssp->wu_results[i];
425         if (wu_result.state != WR_STATE_PRESENT) continue;
426         double e = wu_result.workunit.rsc_fpops_est;
427         double diff = e - mean;
428         wu_result.fpops_size = diff/stdev;
429     }
430 }
431 
432 // We're purging this item because it's been in shared mem too long.
433 // In general it will get added again soon.
434 // But if it's committed to an HR class,
435 // it could be because it got sent to a rare host.
436 // Un-commit it by zeroing out the WU's hr class,
437 // and incrementing target_nresults
438 //
purge_stale(WU_RESULT & wu_result)439 static void purge_stale(WU_RESULT& wu_result) {
440     DB_WORKUNIT wu;
441     wu.id = wu_result.workunit.id;
442     if (wu_result.workunit.hr_class) {
443         char buf[256];
444         sprintf(buf,
445             "hr_class=0, target_nresults=target_nresults+1, transition_time=%ld",
446             time(0)
447         );
448         wu.update_field(buf);
449     }
450 }
451 
452 // Make one pass through the work array, filling in empty slots.
453 // Return true if we filled in any.
454 //
scan_work_array(vector<DB_WORK_ITEM> & work_items)455 static bool scan_work_array(vector<DB_WORK_ITEM> &work_items) {
456     int i;
457     bool found;
458     int enum_phase[napps];
459     int app_index;
460     int nadditions=0, ncollisions=0;
461 
462     for (i=0; i<napps; i++) {
463         if (work_items[i].cursor.active) {
464             enum_phase[i] = ENUM_FIRST_PASS;
465         } else {
466             enum_phase[i] = ENUM_SECOND_PASS;
467         }
468     }
469 
470     if (using_hr && config.hr_allocate_slots) {
471         hr_count_slots();
472     }
473 
474     for (i=0; i<ssp->max_wu_results; i++) {
475         app_index = app_indices[i];
476 
477         DB_WORK_ITEM& wi = work_items[app_index];
478         WU_RESULT& wu_result = ssp->wu_results[i];
479         switch (wu_result.state) {
480         case WR_STATE_PRESENT:
481             if (purge_stale_time && wu_result.time_added_to_shared_memory < (time(0) - purge_stale_time)) {
482                 log_messages.printf(MSG_NORMAL,
483                     "remove result [RESULT#%lu] from slot %d because it is stale\n",
484                     wu_result.resultid, i
485                 );
486                 purge_stale(wu_result);
487                 wu_result.state = WR_STATE_EMPTY;
488                 // fall through, refill this array slot
489             } else {
490                 break;
491             }
492         case WR_STATE_EMPTY:
493             if (enum_phase[app_index] == ENUM_OVER) continue;
494             found = get_job_from_db(
495                 wi, app_index, enum_phase[app_index], ncollisions
496             );
497             if (found) {
498                 log_messages.printf(MSG_NORMAL,
499                     "adding result [RESULT#%lu] in slot %d\n",
500                     wi.res_id, i
501                 );
502                 wu_result.resultid = wi.res_id;
503                 wu_result.res_priority = wi.res_priority;
504                 wu_result.res_server_state = wi.res_server_state;
505                 wu_result.res_report_deadline = wi.res_report_deadline;
506                 wu_result.workunit = wi.wu;
507                 wu_result.state = WR_STATE_PRESENT;
508                 // If the workunit has already been allocated to a certain
509                 // OS then it should be assigned quickly,
510                 // so we set its infeasible_count to 1
511                 //
512                 if (wi.wu.hr_class > 0) {
513                     wu_result.infeasible_count = 1;
514                 } else {
515                     wu_result.infeasible_count = 0;
516                 }
517                 // set the need_reliable flag if needed
518                 //
519                 wu_result.need_reliable = false;
520                 if (config.reliable_on_priority && wu_result.res_priority >= config.reliable_on_priority) {
521                     wu_result.need_reliable = true;
522                 }
523                 wu_result.time_added_to_shared_memory = time(0);
524                 nadditions++;
525             }
526             break;
527         default:
528             // here the state is a PID; see if it's still alive
529             //
530             int pid = wu_result.state;
531             struct stat s;
532             char buf[256];
533             sprintf(buf, "/proc/%d", pid);
534             log_messages.printf(MSG_NORMAL, "checking pid %d\n", pid);
535             if (stat(buf, &s)) {
536                 wu_result.state = WR_STATE_PRESENT;
537                 log_messages.printf(MSG_NORMAL,
538                     "Result reserved by non-existent process PID %d; resetting\n",
539                     pid
540                 );
541             }
542         }
543     }
544     log_messages.printf(MSG_DEBUG, "Added %d results to array\n", nadditions);
545     if (ncollisions) {
546         log_messages.printf(MSG_DEBUG,
547             "%d results already in array\n", ncollisions
548         );
549         return false;
550     }
551     if (nadditions == 0) {
552         return false;
553     }
554     return true;
555 }
556 
// Main loop: repeatedly scan the work array, filling empty slots from
// the DB; sleep when a scan accomplishes nothing.
// Also periodically updates app version scale factors (main feeder only)
// and checks the stop/reread trigger files.  Never returns.
//
void feeder_loop() {
    vector<DB_WORK_ITEM> work_items;
    double next_av_update_time=0;

    // may need one enumeration per app; create vector
    //
    work_items.resize(napps);

    while (1) {
        bool action;
        if (config.dont_send_jobs) {
            // project is configured not to send jobs; don't scan
            action = false;
        } else {
            action = scan_work_array(work_items);
        }
        // mark the segment usable by schedulers
        ssp->ready = true;
        if (!action) {
#ifdef GCL_SIMULATOR
            continue_simulation("feeder");
            log_messages.printf(MSG_DEBUG, "Waiting for signal\n");
            signal(SIGUSR2, simulator_signal_handler);
            pause();
#else
            log_messages.printf(MSG_DEBUG,
                "No action; sleeping %d sec\n", sleep_interval
            );
            daemon_sleep(sleep_interval);
#endif
        } else {
            if (config.job_size_matching) {
                // refresh per-job size stats used for job size matching
                update_job_stats();
            }
        }

        // periodically update app version scale factors,
        // but only if we're the main feeder (--mod/--wmod instance 0)
        //
        double now = dtime();
        if (is_main_feeder && now > next_av_update_time) {
            int retval = update_av_scales(ssp);
            if (retval) {
                log_messages.printf(MSG_CRITICAL,
                    "update_av_scales failed: %s\n", boincerror(retval)
                );
                exit(1);
            }
            next_av_update_time = now + AV_UPDATE_PERIOD;
        }
        fflush(stdout);
        check_stop_daemons();
        check_reread_trigger();
    }
}
607 
// see if we're using HR, and if so initialize the necessary data structures
//
// Exits with an error if the HR configuration is invalid
// (type out of range, both global and per-app HR, or differing
// per-app HR types without --allapps).
//
void hr_init() {
    int i, retval;
    bool apps_differ = false;
    bool some_app_uses_hr = false;
    int hrt, hr_type0 = ssp->apps[0].homogeneous_redundancy;

    using_hr = false;

    // scan apps: validate each per-app HR type, and note whether any
    // app uses HR and whether all apps use the same type
    //
    for (i=0; i<ssp->napps; i++) {
        hrt = ssp->apps[i].homogeneous_redundancy;
        if (hrt <0 || hrt >= HR_NTYPES) {
            log_messages.printf(MSG_CRITICAL,
                "HR type %d out of range for app %d\n", hrt, i
            );
            exit(1);
        }
        if (hrt) some_app_uses_hr = true;
        if (hrt != hr_type0) apps_differ = true;
    }
    if (config.homogeneous_redundancy) {
        // project-wide HR: applied uniformly to every app
        //
        log_messages.printf(MSG_NORMAL,
            "config HR is %d\n", config.homogeneous_redundancy
        );
        hrt = config.homogeneous_redundancy;
        if (hrt < 0 || hrt >= HR_NTYPES) {
            log_messages.printf(MSG_CRITICAL,
                "Main HR type %d out of range\n", hrt
            );
            exit(1);
        }
        if (some_app_uses_hr) {
            log_messages.printf(MSG_CRITICAL,
                "You can specify HR at global or app level, but not both\n"
            );
            exit(1);
        }
        for (i=0; i<ssp->napps; i++) {
            ssp->apps[i].homogeneous_redundancy = config.homogeneous_redundancy;
            ssp->apps[i].weight = 1;
        }

    } else {
        if (some_app_uses_hr) {
            // per-app HR with differing types requires --allapps,
            // since slots must be partitioned per app
            //
            if (apps_differ && !all_apps) {
                log_messages.printf(MSG_CRITICAL,
                    "You must use --allapps if apps have different HR\n"
                );
                exit(1);
            }
        } else {
            return;     // HR not being used
        }
    }
    using_hr = true;
    if (config.hr_allocate_slots) {
        hr_info.init();
        retval = hr_info.read_file();
        if (retval) {
            log_messages.printf(MSG_CRITICAL,
                "Can't read HR info file: %s\n", boincerror(retval)
            );
            exit(1);
        }

        // find the weight for each HR type
        //
        for (i=0; i<ssp->napps; i++) {
            hrt = ssp->apps[i].homogeneous_redundancy;
            hr_info.type_weights[hrt] += ssp->apps[i].weight;
            hr_info.type_being_used[hrt] = true;
        }

        // compute the slot allocations for HR classes
        //
        hr_info.allocate(ssp->max_wu_results);
        hr_info.show(stderr);
    }
}
688 
// write a summary of feeder state to stderr:
// the shmem contents, plus HR slot info if slot allocation is on.
// The int parameter is unused (signal-handler-style signature).
//
void show_state(int) {
    ssp->show(stderr);
    if (config.hr_allocate_slots) {
        hr_info.show(stderr);
    }
}
697 
// log the build's version string
//
void show_version() {
    log_messages.printf(MSG_NORMAL, "%s\n", SVN_VERSION);
}
701 
// print command-line help to stderr.
// name: argv[0], interpolated into the header and usage line
//
void usage(char *name) {
    fprintf(stderr,
        "%s creates a shared memory segment containing DB info,\n"
        "including an array of work items (results/workunits to send).\n\n"
        "Usage: %s [OPTION]...\n\n"
        "Options:\n"
        "  [ -d X | --debug_level X]        Set log verbosity to X (1..4)\n"
        "  [ --allapps ]                    Interleave results from all applications uniformly.\n"
        "  [ --by_batch ]                   Interleave results from all batches uniformly.\n"
        "  [ --random_order ]               order by \"random\" field of result\n"
        "  [ --random_order_db ]            randomize order with SQL rand(sysdate())\n"
        "  [ --priority_asc ]               order by increasing \"priority\" field of result\n"
        "  [ --priority_order ]             order by decreasing \"priority\" field of result\n"
        "  [ --priority_order_create_time ] order by priority, then by increasing WU create time\n"
        "  [ --purge_stale x ]              remove work items from the shared memory segment\n"
        "                                   that have been there for longer than x minutes\n"
        "                                   but haven't been assigned\n"
        "  [ --appids a1{,a2} ]             get work only for appids a1,... (comma-separated list)\n"
        "  [ --mod n i ]                    handle only results with (id mod n) == i\n"
        "  [ --wmod n i ]                   handle only workunits with (id mod n) == i\n"
        "  [ --sleep_interval x ]           sleep x seconds if nothing to do\n"
        "  [ -h | --help ]                  Shows this help text.\n"
        "  [ -v | --version ]               Shows version information.\n",
        name, name
    );
}
727 
main(int argc,char ** argv)728 int main(int argc, char** argv) {
729     int i, retval;
730     void* p;
731     char path[MAXPATHLEN], order_buf[1024];
732 
733     for (i=1; i<argc; i++) {
734         if (is_arg(argv[i], "d") || is_arg(argv[i], "debug_level")) {
735             if (!argv[++i]) {
736                 log_messages.printf(MSG_CRITICAL, "%s requires an argument\n\n", argv[--i]);
737                 usage(argv[0]);
738                 exit(1);
739             }
740             int dl = atoi(argv[i]);
741             log_messages.set_debug_level(dl);
742             if (dl == 4) g_print_queries = true;
743         } else if (is_arg(argv[i], "random_order")) {
744             order_clause = "order by r1.random ";
745         } else if (is_arg(argv[i], "random_order_db")) {
746             order_clause = "order by rand(sysdate()) ";
747         } else if (is_arg(argv[i], "allapps")) {
748             all_apps = true;
749         } else if (is_arg(argv[i], "priority_asc")) {
750             order_clause = "order by r1.priority asc ";
751         } else if (is_arg(argv[i], "priority_order")) {
752             order_clause = "order by r1.priority desc ";
753         } else if (is_arg(argv[i], "priority_order_create_time")) {
754             order_clause = "order by r1.priority desc, r1.workunitid";
755         } else if (is_arg(argv[i], "by_batch")) {
756             // Evenly distribute work among batches
757             // The 0=1 causes anything before the union statement
758             // to result in an empty set,
            // and the '#' at the end comments out anything following our query
760             // This has allowed us to inject a more customizable query
761             //
762             sprintf(order_buf, "and 0=1 union (SELECT r1.id, r1.priority, r1.server_state, r1.report_deadline, workunit.* FROM workunit JOIN ("
                // Tail of the --by_batch ORDER BY clause: a MySQL
                // user-variable subquery that numbers each result within its
                // batch (@rownum resets when @batch changes), so ordering by
                // "rank" interleaves results from all batches uniformly.
                "SELECT *, CASE WHEN @batch != t.batch THEN @rownum := 0 WHEN @batch = t.batch THEN @rownum := @rownum + 1 END AS rank, @batch := t.batch "
                "FROM (SELECT @rownum := 0, @batch := 0, r.* FROM result r WHERE r.server_state=2 ORDER BY batch) t) r1 ON workunit.id=r1.workunitid "
                "ORDER BY rank LIMIT %d)#",
                enum_limit
            );
            order_clause = order_buf;
        } else if (is_arg(argv[i], "purge_stale")) {
            // --purge_stale x: x is given in minutes; store seconds
            purge_stale_time = atoi(argv[++i])*60;
        } else if (is_arg(argv[i], "appids")) {
            if (!argv[++i]) {
                // --i undoes the ++i above so the message names the option itself
                log_messages.printf(MSG_CRITICAL, "%s requires an argument\n\n", argv[--i]);
                usage(argv[0]);
                exit(1);
            }
            // Restrict enumeration to the given comma-separated app IDs.
            // NOTE(review): plain strcat here and two lines below (vs
            // safe_strcat in between) is unbounded -- a long ID list could
            // overflow mod_select_clause; confirm the buffer size where it
            // is declared and use safe_strcat for all three appends.
            strcat(mod_select_clause, " and workunit.appid in (");
            safe_strcat(mod_select_clause, argv[i]);
            strcat(mod_select_clause, ")");
        } else if (is_arg(argv[i], "mod")) {
            if (!argv[i+1] || !argv[i+2]) {
                log_messages.printf(MSG_CRITICAL, "%s requires two arguments\n\n", argv[i]);
                usage(argv[0]);
                exit(1);
            }
            int n = atoi(argv[++i]);
            int j = atoi(argv[++i]);
            // Handle only results with (id mod n) == j.
            // NOTE(review): sprintf writes from the start of
            // mod_select_clause, so it discards any "--appids" clause added
            // earlier on the command line; option order matters here.
            sprintf(mod_select_clause, "and r1.id %% %d = %d ", n, j);
            // shard 0 is designated the "main" feeder -- presumably it
            // performs shared housekeeping; verify in feeder_loop()
            is_main_feeder = (j==0);
        } else if (is_arg(argv[i], "wmod")) {
            if (!argv[i+1] || !argv[i+2]) {
                log_messages.printf(MSG_CRITICAL, "%s requires two arguments\n\n", argv[i]);
                usage(argv[0]);
                exit(1);
            }
            int n = atoi(argv[++i]);
            int j = atoi(argv[++i]);
            // Same sharding as --mod, but on workunit ID
            // (recommended for homogeneous redundancy per the usage text).
            // NOTE(review): same overwrite/overflow caveats as --mod above.
            sprintf(mod_select_clause, "and workunit.id %% %d = %d ", n, j);
            is_main_feeder = (j==0);
        } else if (is_arg(argv[i], "sleep_interval")) {
            if (!argv[++i]) {
                log_messages.printf(MSG_CRITICAL, "%s requires an argument\n\n", argv[--i]);
                usage(argv[0]);
                exit(1);
            }
            // seconds to sleep when there is nothing to do
            sleep_interval = atoi(argv[i]);
        } else if (is_arg(argv[i], "v") || is_arg(argv[i], "version")) {
            show_version();
            exit(0);
        } else if (is_arg(argv[i], "h") || is_arg(argv[i], "help")) {
            usage(argv[0]);
            exit(0);
        } else {
            log_messages.printf(MSG_CRITICAL, "unknown command line argument: %s\n\n", argv[i]);
            usage(argv[0]);
            exit(1);
        }
    }
819 
    // Read the project's config.xml; the feeder cannot run without it.
    retval = config.parse_file();
    if (retval) {
        log_messages.printf(MSG_CRITICAL,
            "Can't parse config.xml: %s\n", boincerror(retval)
        );
        exit(1);
    }

    // Remove any leftover trigger file -- presumably a request to reread
    // DB tables, which is moot since we are about to read them fresh;
    // confirm against the code that polls REREAD_DB_FILENAME.
    unlink(config.project_path(REREAD_DB_FILENAME));

    log_messages.printf(MSG_NORMAL, "Starting\n");
    show_version();

    // config.xml may override the enumeration query size and the number
    // of work-item slots in shared memory.
    if (config.feeder_query_size) {
        enum_limit = config.feeder_query_size;
    }
    if (config.shmem_work_items) {
        num_work_items = config.shmem_work_items;
    }
    // Recreate (destroy, then create) the semaphore keyed off the project
    // directory; destroying first clears any instance left by a prior run.
    strlcpy(path, config.project_dir, sizeof(path));
    get_key(path, 'a', sema_key);
    destroy_semaphore(sema_key);
    create_semaphore(sema_key);

    // Likewise recreate the shared-memory segment from scratch.
    retval = destroy_shmem(config.shmem_key);
    if (retval) {
        log_messages.printf(MSG_CRITICAL, "can't destroy shmem\n");
        exit(1);
    }

    // Segment layout: one SCHED_SHMEM header followed by the work-item array.
    int shmem_size = sizeof(SCHED_SHMEM) + num_work_items*sizeof(WU_RESULT);
    retval = create_shmem(config.shmem_key, shmem_size, 0 /* don't set GID */, &p);
    if (retval) {
        log_messages.printf(MSG_CRITICAL, "can't create shmem\n");
        exit(1);
    }
    ssp = (SCHED_SHMEM*)p;
    ssp->init(num_work_items);

    // Detach/clean up the segment on any exit path; install the standard
    // BOINC stop-signal handler.
    atexit(cleanup_shmem);
    install_stop_signal_handler();

    retval = boinc_db.open(
        config.db_name, config.db_host, config.db_user, config.db_passwd
    );
    if (retval) {
        log_messages.printf(MSG_CRITICAL,
            "boinc_db.open: %d; %s\n", retval, boinc_db.error_string()
        );
        exit(1);
    }
    // READ UNCOMMITTED: dirty reads are acceptable for enumeration and
    // avoid lock contention with the scheduler/transitioner.
    // Note: failure here is logged but deliberately non-fatal.
    retval = boinc_db.set_isolation_level(READ_UNCOMMITTED);
    if (retval) {
        log_messages.printf(MSG_CRITICAL,
            "boinc_db.set_isolation_level: %d; %s\n", retval, boinc_db.error_string()
        );
    }
    // Populate the shared-memory tables (platforms, apps, app_versions, ...)
    ssp->scan_tables();

    log_messages.printf(MSG_NORMAL,
        "read "
        "%d platforms, "
        "%d apps, "
        "%d app_versions, "
        "%d assignments\n",
        ssp->nplatforms,
        ssp->napps,
        ssp->napp_versions,
        ssp->nassignments
    );
    log_messages.printf(MSG_NORMAL,
        "Using %d job slots\n", ssp->max_wu_results
    );

    // One app index per work-item slot.
    // NOTE(review): this calloc (and the three below) are not checked for
    // NULL; a failure would crash later rather than exit cleanly.
    app_indices = (int*) calloc(ssp->max_wu_results, sizeof(int));

    // If all_apps is set, make an array saying which array slot
    // is associated with which app
    //
    if (all_apps) {
        napps = ssp->napps;
        enum_sizes = (int*) calloc(ssp->napps, sizeof(int));
        double* weights = (double*) calloc(ssp->napps, sizeof(double));
        int* counts = (int*) calloc(ssp->napps, sizeof(int));
        // If no app weights were configured, weight all apps equally.
        if (ssp->app_weight_sum == 0) {
            for (i=0; i<ssp->napps; i++) {
                ssp->apps[i].weight = 1;
            }
            ssp->app_weight_sum = ssp->napps;
        }
        for (i=0; i<ssp->napps; i++) {
            weights[i] = ssp->apps[i].weight;
        }
        // Each app's enumeration size is its weight's share of enum_limit,
        // rounded to nearest.
        for (i=0; i<ssp->napps; i++) {
            enum_sizes[i] = (int) floor(0.5 + enum_limit*(weights[i])/(ssp->app_weight_sum));
        }
        // Assign shmem slots to apps in weighted round-robin order.
        weighted_interleave(
            weights, ssp->napps, ssp->max_wu_results, app_indices, counts
        );
        // enum_sizes and app_indices are kept for use by the feeder loop;
        // the scratch arrays are released here.
        free(weights);
        free(counts);
    } else {
        napps = 1;
    }

    hr_init();

    // Ordering options and homogeneous redundancy conflict; warn once.
    if (using_hr && strlen(order_clause)) {
        log_messages.printf(MSG_CRITICAL,
            "Note: ordering options will not apply to apps for which homogeneous redundancy is used\n"
        );
    }

    // Non-fatal: performance info is advisory.
    retval = ssp->perf_info.get_from_db();
    if (retval) {
        log_messages.printf(MSG_CRITICAL,
            "PERF_INFO::get_from_db(): %d\n", retval
        );
    }

    // SIGUSR1 dumps feeder state (see show_state).
    signal(SIGUSR1, show_state);

    // Enter the main loop; presumably it never returns (main has no
    // return statement after it) -- confirm in feeder_loop().
    feeder_loop();
}
944 
// CVS/SVN "$Id$" keyword, bound to a uniquely named symbol so the expanded
// revision string is retained in the compiled binary.
const char *BOINC_RCSID_57c87aa242 = "$Id$";
946