1 /*
2 Task Spooler - a task queue system for the unix user
3 Copyright (C) 2007-2013 Lluís Batlle i Rossell
4
5 Please find the license in the provided COPYING file.
6 */
7 #include <stdlib.h>
8 #include <unistd.h>
9 #include <stdio.h>
10 #include <string.h>
11 #include <sys/time.h>
12 #include <time.h>
13 #include "main.h"
14
15 /* The list will access them */
16 int busy_slots = 0;
17 int max_slots = 1;
18
19 struct Notify
20 {
21 int socket;
22 int jobid;
23 struct Notify *next;
24 };
25
26 /* Globals */
27 static struct Job *firstjob = 0;
28 static struct Job *first_finished_job = 0;
29 static int jobids = 0;
30 /* This is used for dependencies from jobs
31 * already out of the queue */
32 static int last_errorlevel = 0; /* Before the first job, let's consider
33 a good previous result */
34 /* We need this to handle well "-d" after a "-nf" run */
35 static int last_finished_jobid;
36
37 static struct Notify *first_notify = 0;
38
39 int max_jobs;
40
41 static struct Job * get_job(int jobid);
42 void notify_errorlevel(struct Job *p);
43
send_list_line(int s,const char * str)44 static void send_list_line(int s, const char * str)
45 {
46 struct msg m;
47
48 /* Message */
49 m.type = LIST_LINE;
50 m.u.size = strlen(str) + 1;
51
52 send_msg(s, &m);
53
54 /* Send the line */
55 send_bytes(s, str, m.u.size);
56 }
57
send_urgent_ok(int s)58 static void send_urgent_ok(int s)
59 {
60 struct msg m;
61
62 /* Message */
63 m.type = URGENT_OK;
64
65 send_msg(s, &m);
66 }
67
send_swap_jobs_ok(int s)68 static void send_swap_jobs_ok(int s)
69 {
70 struct msg m;
71
72 /* Message */
73 m.type = SWAP_JOBS_OK;
74
75 send_msg(s, &m);
76 }
77
find_previous_job(const struct Job * final)78 static struct Job * find_previous_job(const struct Job *final)
79 {
80 struct Job *p;
81
82 /* Show Queued or Running jobs */
83 p = firstjob;
84 while(p != 0)
85 {
86 if (p->next == final)
87 return p;
88 p = p->next;
89 }
90
91 return 0;
92 }
93
findjob(int jobid)94 static struct Job * findjob(int jobid)
95 {
96 struct Job *p;
97
98 /* Show Queued or Running jobs */
99 p = firstjob;
100 while(p != 0)
101 {
102 if (p->jobid == jobid)
103 return p;
104 p = p->next;
105 }
106
107 return 0;
108 }
109
findjob_holding_client()110 static struct Job * findjob_holding_client()
111 {
112 struct Job *p;
113
114 /* Show Queued or Running jobs */
115 p = firstjob;
116 while(p != 0)
117 {
118 if (p->state == HOLDING_CLIENT)
119 return p;
120 p = p->next;
121 }
122
123 return 0;
124 }
125
find_finished_job(int jobid)126 static struct Job * find_finished_job(int jobid)
127 {
128 struct Job *p;
129
130 /* Show Queued or Running jobs */
131 p = first_finished_job;
132 while(p != 0)
133 {
134 if (p->jobid == jobid)
135 return p;
136 p = p->next;
137 }
138
139 return 0;
140 }
141
count_not_finished_jobs()142 static int count_not_finished_jobs()
143 {
144 int count=0;
145 struct Job *p;
146
147 /* Show Queued or Running jobs */
148 p = firstjob;
149 while(p != 0)
150 {
151 ++count;
152 p = p->next;
153 }
154 return count;
155 }
156
add_notify_errorlevel_to(struct Job * job,int jobid)157 static void add_notify_errorlevel_to(struct Job *job, int jobid)
158 {
159 int *p;
160 int newsize = (job->notify_errorlevel_to_size + 1)
161 * sizeof(int);
162 p = (int *) realloc(job->notify_errorlevel_to,
163 newsize);
164
165 if (p == 0)
166 error("Cannot allocate more memory for notify_errorlist_to for jobid %i,"
167 " having already %i elements",
168 job->jobid, job->notify_errorlevel_to_size);
169
170 job->notify_errorlevel_to = p;
171 job->notify_errorlevel_to_size += 1;
172 job->notify_errorlevel_to[job->notify_errorlevel_to_size - 1] = jobid;
173 }
174
s_mark_job_running(int jobid)175 void s_mark_job_running(int jobid)
176 {
177 struct Job *p;
178 p = findjob(jobid);
179 if (!p)
180 error("Cannot mark the jobid %i RUNNING.", jobid);
181 p->state = RUNNING;
182 }
183
184 /* -1 means nothing awaken, otherwise returns the jobid awaken */
wake_hold_client()185 int wake_hold_client()
186 {
187 struct Job *p;
188 p = findjob_holding_client();
189 if (p)
190 {
191 p->state = QUEUED;
192 return p->jobid;
193 }
194 return -1;
195 }
196
jstate2string(enum Jobstate s)197 const char * jstate2string(enum Jobstate s)
198 {
199 const char * jobstate;
200 switch(s)
201 {
202 case QUEUED:
203 jobstate = "queued";
204 break;
205 case RUNNING:
206 jobstate = "running";
207 break;
208 case FINISHED:
209 jobstate = "finished";
210 break;
211 case SKIPPED:
212 jobstate = "skipped";
213 break;
214 case HOLDING_CLIENT:
215 jobstate = "skipped";
216 break;
217 }
218 return jobstate;
219 }
220
s_list(int s)221 void s_list(int s)
222 {
223 struct Job *p;
224 char *buffer;
225
226 /* Times: 0.00/0.00/0.00 - 4+4+4+2 = 14*/
227 buffer = joblist_headers();
228 send_list_line(s,buffer);
229 free(buffer);
230
231 /* Show Queued or Running jobs */
232 p = firstjob;
233 while(p != 0)
234 {
235 if (p->state != HOLDING_CLIENT)
236 {
237 buffer = joblist_line(p);
238 send_list_line(s,buffer);
239 free(buffer);
240 }
241 p = p->next;
242 }
243
244 p = first_finished_job;
245
246 /* Show Finished jobs */
247 while(p != 0)
248 {
249 buffer = joblist_line(p);
250 send_list_line(s,buffer);
251 free(buffer);
252 p = p->next;
253 }
254 }
255
newjobptr()256 static struct Job * newjobptr()
257 {
258 struct Job *p;
259
260 if (firstjob == 0)
261 {
262 firstjob = (struct Job *) malloc(sizeof(*firstjob));
263 firstjob->next = 0;
264 firstjob->output_filename = 0;
265 firstjob->command = 0;
266 return firstjob;
267 }
268
269 p = firstjob;
270 while(p->next != 0)
271 p = p->next;
272
273 p->next = (struct Job *) malloc(sizeof(*p));
274 p->next->next = 0;
275 p->next->output_filename = 0;
276 p->next->command = 0;
277
278 return p->next;
279 }
280
281 /* Returns -1 if no last job id found */
find_last_jobid_in_queue(int neglect_jobid)282 static int find_last_jobid_in_queue(int neglect_jobid)
283 {
284 struct Job *p;
285 int last_jobid = -1;
286
287 p = firstjob;
288 while(p != 0)
289 {
290 if (p->jobid != neglect_jobid &&
291 p->jobid > last_jobid)
292 last_jobid = p->jobid;
293 p = p->next;
294 }
295
296 return last_jobid;
297 }
298
299 /* Returns -1 if no last job id found */
find_last_stored_jobid_finished()300 static int find_last_stored_jobid_finished()
301 {
302 struct Job *p;
303 int last_jobid = -1;
304
305 p = first_finished_job;
306 while(p != 0)
307 {
308 if (p->jobid > last_jobid)
309 last_jobid = p->jobid;
310 p = p->next;
311 }
312
313 return last_jobid;
314 }
315
316 /* Returns job id or -1 on error */
s_newjob(int s,struct msg * m)317 int s_newjob(int s, struct msg *m)
318 {
319 struct Job *p;
320 int res;
321
322 p = newjobptr();
323
324 p->jobid = jobids++;
325 if (count_not_finished_jobs() < max_jobs)
326 p->state = QUEUED;
327 else
328 p->state = HOLDING_CLIENT;
329 p->num_slots = m->u.newjob.num_slots;
330 p->store_output = m->u.newjob.store_output;
331 p->should_keep_finished = m->u.newjob.should_keep_finished;
332 p->notify_errorlevel_to = 0;
333 p->notify_errorlevel_to_size = 0;
334 p->do_depend = m->u.newjob.do_depend;
335 p->depend_on = -1; /* By default. May be overriden in the next conditions */
336 if (m->u.newjob.do_depend == 1)
337 {
338 /* Depend on the last queued job. */
339
340 /* As we already have 'p' in the queue,
341 * neglect it during the find_last_jobid_in_queue() */
342 if (m->u.newjob.depend_on == -1)
343 {
344 p->depend_on = find_last_jobid_in_queue(p->jobid);
345
346 /* We don't trust the last jobid in the queue (running or queued)
347 * if it's not the last added job. In that case, let
348 * the next control flow handle it as if it could not
349 * do_depend on any still queued job. */
350 if (last_finished_jobid > p->depend_on)
351 p->depend_on = -1;
352
353 /* If it's queued still without result, let it know
354 * its result to p when it finishes. */
355 if (p->depend_on != -1)
356 {
357 struct Job *depended_job;
358 depended_job = findjob(p->depend_on);
359 if (depended_job != 0)
360 add_notify_errorlevel_to(depended_job, p->jobid);
361 else
362 warning("The jobid %i is queued to do_depend on the jobid %i"
363 " suddenly non existent in the queue", p->jobid,
364 p->depend_on);
365 }
366 else /* Otherwise take the finished job, or the last_errorlevel */
367 {
368 if (m->u.newjob.depend_on == -1)
369 {
370 int ljobid = find_last_stored_jobid_finished();
371 p->depend_on = ljobid;
372
373 /* If we have a newer result stored, use it */
374 /* NOTE:
375 * Reading this now, I don't know how ljobid can be
376 * greater than last_finished_jobid */
377 if (last_finished_jobid < ljobid)
378 {
379 struct Job *parent;
380 parent = find_finished_job(ljobid);
381 if (!parent)
382 error("jobid %i suddenly disappeared from the finished list",
383 ljobid);
384 p->dependency_errorlevel = parent->result.errorlevel;
385 }
386 else
387 p->dependency_errorlevel = last_errorlevel;
388 }
389 }
390 }
391 else
392 {
393 /* The user decided what's the job this new job depends on */
394 struct Job *depended_job;
395
396 p->depend_on = m->u.newjob.depend_on;
397
398 depended_job = findjob(p->depend_on);
399 if (depended_job != 0)
400 add_notify_errorlevel_to(depended_job, p->jobid);
401 else
402 {
403 struct Job *parent;
404 parent = find_finished_job(p->depend_on);
405 if (parent)
406 {
407 p->dependency_errorlevel = parent->result.errorlevel;
408 }
409 else
410 {
411 /* We consider as if the job not found
412 didn't finish well */
413 p->dependency_errorlevel = -1;
414 }
415 }
416 }
417 }
418
419
420 pinfo_init(&p->info);
421 pinfo_set_enqueue_time(&p->info);
422
423 /* load the command */
424 p->command = malloc(m->u.newjob.command_size);
425 if (p->command == 0)
426 error("Cannot allocate memory in s_newjob command_size (%i)",
427 m->u.newjob.command_size);
428 res = recv_bytes(s, p->command, m->u.newjob.command_size);
429 if (res == -1)
430 error("wrong bytes received");
431
432 /* load the label */
433 p->label = 0;
434 if (m->u.newjob.label_size > 0)
435 {
436 char *ptr;
437 ptr = (char *) malloc(m->u.newjob.label_size);
438 if (ptr == 0)
439 error("Cannot allocate memory in s_newjob env_size(%i)",
440 m->u.newjob.env_size);
441 res = recv_bytes(s, ptr, m->u.newjob.label_size);
442 if (res == -1)
443 error("wrong bytes received");
444 p->label = ptr;
445 }
446
447 /* load the info */
448 if (m->u.newjob.env_size > 0)
449 {
450 char *ptr;
451 ptr = (char *) malloc(m->u.newjob.env_size);
452 if (ptr == 0)
453 error("Cannot allocate memory in s_newjob env_size(%i)",
454 m->u.newjob.env_size);
455 res = recv_bytes(s, ptr, m->u.newjob.env_size);
456 if (res == -1)
457 error("wrong bytes received");
458 pinfo_addinfo(&p->info, m->u.newjob.env_size+100,
459 "Environment:\n%s", ptr);
460 free(ptr);
461 }
462
463 return p->jobid;
464 }
465
466 /* This assumes the jobid exists */
s_removejob(int jobid)467 void s_removejob(int jobid)
468 {
469 struct Job *p;
470 struct Job *newnext;
471
472 if (firstjob->jobid == jobid)
473 {
474 struct Job *newfirst;
475
476 /* First job is to be removed */
477 newfirst = firstjob->next;
478 free(firstjob->command);
479 free(firstjob->output_filename);
480 pinfo_free(&firstjob->info);
481 free(firstjob->label);
482 free(firstjob);
483 firstjob = newfirst;
484 return;
485 }
486
487 p = firstjob;
488 /* Not first job */
489 while (p->next != 0)
490 {
491 if (p->next->jobid == jobid)
492 break;
493 p = p->next;
494 }
495 if (p->next == 0)
496 error("Job to be removed not found. jobid=%i", jobid);
497
498 newnext = p->next->next;
499
500 free(p->next->command);
501 free(p->next);
502 p->next = newnext;
503 }
504
505 /* -1 if no one should be run. */
next_run_job()506 int next_run_job()
507 {
508 struct Job *p;
509
510 const int free_slots = max_slots - busy_slots;
511
512 /* busy_slots may be bigger than the maximum slots,
513 * if the user was running many jobs, and suddenly
514 * trimmed the maximum slots down. */
515 if (free_slots <= 0)
516 return -1;
517
518 /* If there are no jobs to run... */
519 if (firstjob == 0)
520 return -1;
521
522 /* Look for a runnable task */
523 p = firstjob;
524 while(p != 0)
525 {
526 if (p->state == QUEUED)
527 {
528 if (p->depend_on >= 0)
529 {
530 struct Job *do_depend_job = get_job(p->depend_on);
531 /* We won't try to run any job do_depending on an unfinished
532 * job */
533 if (do_depend_job != NULL &&
534 (do_depend_job->state == QUEUED || do_depend_job->state == RUNNING))
535 {
536 /* Next try */
537 p = p->next;
538 continue;
539 }
540 }
541
542 if (free_slots >= p->num_slots)
543 {
544 busy_slots = busy_slots + p->num_slots;
545 return p->jobid;
546 }
547 }
548 p = p->next;
549 }
550
551 return -1;
552 }
553
554 /* Returns 1000 if no limit, The limit otherwise. */
get_max_finished_jobs()555 static int get_max_finished_jobs()
556 {
557 char *limit;
558
559 limit = getenv("TS_MAXFINISHED");
560 if (limit == NULL)
561 return 1000;
562 return abs(atoi(limit));
563 }
564
565 /* Add the job to the finished queue. */
new_finished_job(struct Job * j)566 static void new_finished_job(struct Job *j)
567 {
568 struct Job *p;
569 int count, max;
570
571 max = get_max_finished_jobs();
572 count = 0;
573
574 if (first_finished_job == 0 && count < max)
575 {
576 first_finished_job = j;
577 first_finished_job->next = 0;
578 return;
579 }
580
581 ++count;
582
583 p = first_finished_job;
584 while(p->next != 0)
585 {
586 p = p->next;
587 ++count;
588 }
589
590 /* If too many jobs, wipe out the first */
591 if (count >= max)
592 {
593 struct Job *tmp;
594 tmp = first_finished_job;
595 first_finished_job = first_finished_job->next;
596 free(tmp->command);
597 free(tmp->output_filename);
598 pinfo_free(&tmp->info);
599 free(tmp->label);
600 free(tmp);
601 }
602 p->next = j;
603 p->next->next = 0;
604
605 return;
606 }
607
job_is_in_state(int jobid,enum Jobstate state)608 static int job_is_in_state(int jobid, enum Jobstate state)
609 {
610 struct Job *p;
611
612 p = findjob(jobid);
613 if (p == 0)
614 return 0;
615 if (p->state == state)
616 return 1;
617 return 0;
618 }
619
job_is_running(int jobid)620 int job_is_running(int jobid)
621 {
622 return job_is_in_state(jobid, RUNNING);
623 }
624
job_is_holding_client(int jobid)625 int job_is_holding_client(int jobid)
626 {
627 return job_is_in_state(jobid, HOLDING_CLIENT);
628 }
629
in_notify_list(int jobid)630 static int in_notify_list(int jobid)
631 {
632 struct Notify *n, *tmp;
633
634 n = first_notify;
635 while (n != 0)
636 {
637 tmp = n;
638 n = n->next;
639 if (tmp->jobid == jobid)
640 return 1;
641 }
642 return 0;
643 }
644
job_finished(const struct Result * result,int jobid)645 void job_finished(const struct Result *result, int jobid)
646 {
647 struct Job *p;
648
649 if (busy_slots <= 0)
650 error("Wrong state in the server. busy_slots = %i instead of greater than 0", busy_slots);
651
652 p = findjob(jobid);
653 if (p == 0)
654 error("on jobid %i finished, it doesn't exist", jobid);
655
656 /* The job may be not only in running state, but also in other states, as
657 * we call this to clean up the jobs list in case of the client closing the
658 * connection. */
659 if (p->state == RUNNING)
660 busy_slots = busy_slots - p->num_slots;
661
662 /* Mark state */
663 if (result->skipped)
664 p->state = SKIPPED;
665 else
666 p->state = FINISHED;
667 p->result = *result;
668 last_finished_jobid = p->jobid;
669 notify_errorlevel(p);
670 pinfo_set_end_time(&p->info);
671
672 if (p->result.died_by_signal)
673 pinfo_addinfo(&p->info, 100, "Exit status: killed by signal %i\n", p->result.signal);
674 else
675 pinfo_addinfo(&p->info, 100, "Exit status: died with exit code %i\n", p->result.errorlevel);
676
677 /* Find the pointing node, to
678 * update it removing the finished job. */
679 {
680 struct Job **jpointer = 0;
681 struct Job *newfirst = p->next;
682 if (firstjob == p)
683 jpointer = &firstjob;
684 else
685 {
686 struct Job *p2;
687 p2 = firstjob;
688 while(p2 != 0)
689 {
690 if (p2->next == p)
691 {
692 jpointer = &(p2->next);
693 break;
694 }
695 p2 = p2->next;
696 }
697 }
698
699 /* Add it to the finished queue (maybe temporarily) */
700 if (p->should_keep_finished || in_notify_list(p->jobid))
701 new_finished_job(p);
702
703 /* Remove it from the run queue */
704 if (jpointer == 0)
705 error("Cannot remove a finished job from the "
706 "queue list (jobid=%i)", p->jobid);
707
708 *jpointer = newfirst;
709 }
710 }
711
s_clear_finished()712 void s_clear_finished()
713 {
714 struct Job *p;
715
716 if (first_finished_job == 0)
717 return;
718
719 p = first_finished_job;
720 first_finished_job = 0;
721
722 while (p != 0)
723 {
724 struct Job *tmp;
725 tmp = p->next;
726 free(p->command);
727 free(p->output_filename);
728 pinfo_free(&p->info);
729 free(p->label);
730 free(p);
731 p = tmp;
732 }
733 }
734
s_process_runjob_ok(int jobid,char * oname,int pid)735 void s_process_runjob_ok(int jobid, char *oname, int pid)
736 {
737 struct Job *p;
738 p = findjob(jobid);
739 if (p == 0)
740 error("Job %i already run not found on runjob_ok", jobid);
741 if (p->state != RUNNING)
742 error("Job %i not running, but %i on runjob_ok", jobid,
743 p->state);
744
745 p->pid = pid;
746 p->output_filename = oname;
747 pinfo_set_start_time(&p->info);
748 }
749
s_send_runjob(int s,int jobid)750 void s_send_runjob(int s, int jobid)
751 {
752 struct msg m;
753 struct Job *p;
754
755 p = findjob(jobid);
756 if (p == 0)
757 error("Job %i was expected to run", jobid);
758
759
760 m.type = RUNJOB;
761
762 /* TODO
763 * We should make the dependencies update the jobids they're do_depending on.
764 * Then, on finish, these could set the errorlevel to send to its dependency childs.
765 * We cannot consider that the jobs will leave traces in the finished job list (-nf?) . */
766
767 m.u.last_errorlevel = p->dependency_errorlevel;
768
769 send_msg(s, &m);
770 }
771
s_job_info(int s,int jobid)772 void s_job_info(int s, int jobid)
773 {
774 struct Job *p = 0;
775 struct msg m;
776
777 if (jobid == -1)
778 {
779 /* This means that we want the job info of the running task, or that
780 * of the last job run */
781 if (busy_slots > 0)
782 {
783 p = firstjob;
784 if (p == 0)
785 error("Internal state WAITING, but job not run."
786 "firstjob = %x", firstjob);
787 }
788 else
789 {
790 p = first_finished_job;
791 if (p == 0)
792 {
793 send_list_line(s, "No jobs.\n");
794 return;
795 }
796 while(p->next != 0)
797 p = p->next;
798 }
799 } else
800 {
801 p = firstjob;
802 while (p != 0 && p->jobid != jobid)
803 p = p->next;
804
805 /* Look in finished jobs if needed */
806 if (p == 0)
807 {
808 p = first_finished_job;
809 while (p != 0 && p->jobid != jobid)
810 p = p->next;
811 }
812 }
813
814 if (p == 0)
815 {
816 char tmp[50];
817 sprintf(tmp, "Job %i not finished or not running.\n", jobid);
818 send_list_line(s, tmp);
819 return;
820 }
821
822 m.type = INFO_DATA;
823 send_msg(s, &m);
824 pinfo_dump(&p->info, s);
825 fd_nprintf(s, 100, "Command: ");
826 if (p->depend_on != -1)
827 fd_nprintf(s, 100, "[%i]&& ", p->depend_on);
828 write(s, p->command, strlen(p->command));
829 fd_nprintf(s, 100, "\n");
830 fd_nprintf(s, 100, "Slots required: %i\n", p->num_slots);
831 fd_nprintf(s, 100, "Enqueue time: %s",
832 ctime(&p->info.enqueue_time.tv_sec));
833 if (p->state == RUNNING)
834 {
835 fd_nprintf(s, 100, "Start time: %s",
836 ctime(&p->info.start_time.tv_sec));
837 fd_nprintf(s, 100, "Time running: %fs\n",
838 pinfo_time_until_now(&p->info));
839 } else if (p->state == FINISHED)
840 {
841 fd_nprintf(s, 100, "Start time: %s",
842 ctime(&p->info.start_time.tv_sec));
843 fd_nprintf(s, 100, "End time: %s",
844 ctime(&p->info.end_time.tv_sec));
845 fd_nprintf(s, 100, "Time run: %fs\n",
846 pinfo_time_run(&p->info));
847 }
848 }
849
s_send_output(int s,int jobid)850 void s_send_output(int s, int jobid)
851 {
852 struct Job *p = 0;
853 struct msg m;
854
855 if (jobid == -1)
856 {
857 /* This means that we want the output info of the running task, or that
858 * of the last job run */
859 if (busy_slots > 0)
860 {
861 p = firstjob;
862 if (p == 0)
863 error("Internal state WAITING, but job not run."
864 "firstjob = %x", firstjob);
865 }
866 else
867 {
868 p = first_finished_job;
869 if (p == 0)
870 {
871 send_list_line(s, "No jobs.\n");
872 return;
873 }
874 while(p->next != 0)
875 p = p->next;
876 }
877 } else
878 {
879 p = get_job(jobid);
880 if (p != 0 && p->state != RUNNING
881 && p->state != FINISHED
882 && p->state != SKIPPED)
883 p = 0;
884 }
885
886 if (p == 0)
887 {
888 char tmp[50];
889 if (jobid == -1)
890 sprintf(tmp, "The last job has not finished or is not running.\n");
891 else
892 sprintf(tmp, "Job %i not finished or not running.\n", jobid);
893 send_list_line(s, tmp);
894 return;
895 }
896
897 if (p->state == SKIPPED)
898 {
899 char tmp[50];
900 if (jobid == -1)
901 sprintf(tmp, "The last job was skipped due to a dependency.\n");
902
903 else
904 sprintf(tmp, "Job %i was skipped due to a dependency.\n", jobid);
905 send_list_line(s, tmp);
906 return;
907 }
908
909 m.type = ANSWER_OUTPUT;
910 m.u.output.store_output = p->store_output;
911 m.u.output.pid = p->pid;
912 if (m.u.output.store_output && p->output_filename)
913 m.u.output.ofilename_size = strlen(p->output_filename) + 1;
914 else
915 m.u.output.ofilename_size = 0;
916 send_msg(s, &m);
917 if (m.u.output.ofilename_size > 0)
918 send_bytes(s, p->output_filename, m.u.output.ofilename_size);
919 }
920
notify_errorlevel(struct Job * p)921 void notify_errorlevel(struct Job *p)
922 {
923 int i;
924
925 last_errorlevel = p->result.errorlevel;
926
927 for(i = 0; i < p->notify_errorlevel_to_size; ++i)
928 {
929 struct Job *notified;
930 notified = get_job(p->notify_errorlevel_to[i]);
931 if (notified)
932 {
933 notified->dependency_errorlevel = p->result.errorlevel;
934 }
935 }
936 }
937
938 /* jobid is input/output. If the input is -1, it's changed to the jobid
939 * removed */
s_remove_job(int s,int * jobid)940 int s_remove_job(int s, int *jobid)
941 {
942 struct Job *p = 0;
943 struct msg m;
944 struct Job *before_p = 0;
945
946 if (*jobid == -1)
947 {
948 /* Find the last job added */
949 p = firstjob;
950 if (p != 0)
951 {
952 while (p->next != 0)
953 {
954 before_p = p;
955 p = p->next;
956 }
957 } else
958 {
959 /* last 'finished' */
960 p = first_finished_job;
961 if (p)
962 {
963 while (p->next != 0)
964 {
965 before_p = p;
966 p = p->next;
967 }
968 }
969 }
970 }
971 else
972 {
973 p = firstjob;
974 if (p != 0)
975 {
976 while (p->next != 0 && p->jobid != *jobid)
977 {
978 before_p = p;
979 p = p->next;
980 }
981 }
982
983 /* If not found, look in the 'finished' list */
984 if (p == 0 || p->jobid != *jobid)
985 {
986 p = first_finished_job;
987 if (p != 0)
988 {
989 while (p->next != 0 && p->jobid != *jobid)
990 {
991 before_p = p;
992 p = p->next;
993 }
994 if (p->jobid != *jobid)
995 p = 0;
996 }
997 }
998 }
999
1000 if (p == 0 || p->state == RUNNING || p == firstjob)
1001 {
1002 char tmp[50];
1003 if (*jobid == -1)
1004 sprintf(tmp, "The last job cannot be removed.\n");
1005 else
1006 sprintf(tmp, "The job %i cannot be removed.\n", *jobid);
1007 send_list_line(s, tmp);
1008 return 0;
1009 }
1010
1011 /* Return the jobid found */
1012 *jobid = p->jobid;
1013
1014 /* Tricks for the check_notify_list */
1015 p->state = FINISHED;
1016 p->result.errorlevel = -1;
1017 notify_errorlevel(p);
1018
1019 /* Notify the clients in wait_job */
1020 check_notify_list(m.u.jobid);
1021
1022 /* Update the list pointers */
1023 if (p == first_finished_job)
1024 first_finished_job = p->next;
1025 else
1026 before_p->next = p->next;
1027
1028 free(p->notify_errorlevel_to);
1029 free(p->command);
1030 free(p->output_filename);
1031 pinfo_free(&p->info);
1032 free(p->label);
1033 free(p);
1034
1035 m.type = REMOVEJOB_OK;
1036 send_msg(s, &m);
1037 return 1;
1038 }
1039
add_to_notify_list(int s,int jobid)1040 static void add_to_notify_list(int s, int jobid)
1041 {
1042 struct Notify *n;
1043 struct Notify *new;
1044
1045 new = (struct Notify *) malloc(sizeof(*new));
1046
1047 new->socket = s;
1048 new->jobid = jobid;
1049 new->next = 0;
1050
1051 n = first_notify;
1052 if (n == 0)
1053 {
1054 first_notify = new;
1055 return;
1056 }
1057
1058 while(n->next != 0)
1059 n = n->next;
1060
1061 n->next = new;
1062 }
1063
send_waitjob_ok(int s,int errorlevel)1064 static void send_waitjob_ok(int s, int errorlevel)
1065 {
1066 struct msg m;
1067
1068 m.type = WAITJOB_OK;
1069 m.u.result.errorlevel = errorlevel;
1070 send_msg(s, &m);
1071 }
1072
1073 static struct Job *
get_job(int jobid)1074 get_job(int jobid)
1075 {
1076 struct Job *j;
1077
1078 j = findjob(jobid);
1079 if (j != NULL)
1080 return j;
1081
1082 j = find_finished_job(jobid);
1083
1084 if (j != NULL)
1085 return j;
1086
1087 return 0;
1088 }
1089
1090 /* Don't complain, if the socket doesn't exist */
s_remove_notification(int s)1091 void s_remove_notification(int s)
1092 {
1093 struct Notify *n;
1094 struct Notify *previous;
1095 n = first_notify;
1096 while (n != 0 && n->socket != s)
1097 n = n->next;
1098 if (n == 0 || n->socket != s)
1099 return;
1100
1101 /* Remove the notification */
1102 previous = first_notify;
1103 if (n == previous)
1104 {
1105 first_notify = n->next;
1106 free(n);
1107 return;
1108 }
1109
1110 /* if not the first... */
1111 while(previous->next != n)
1112 previous = previous->next;
1113
1114 previous->next = n->next;
1115 free(n);
1116 }
1117
destroy_finished_job(struct Job * j)1118 static void destroy_finished_job(struct Job *j)
1119 {
1120 if (j == first_finished_job)
1121 first_finished_job = j->next;
1122 else
1123 {
1124 struct Job *i;
1125 for(i = first_finished_job; i != 0; ++i)
1126 {
1127 if (i->next == j)
1128 {
1129 i->next = j->next;
1130 break;
1131 }
1132 }
1133 if (i == 0) {
1134 error("Cannot destroy the expected job %i", j->jobid);
1135 }
1136 }
1137
1138 free(j->notify_errorlevel_to);
1139 free(j->command);
1140 free(j->output_filename);
1141 pinfo_free(&j->info);
1142 free(j->label);
1143 free(j);
1144 }
1145
1146 /* This is called when a job finishes */
check_notify_list(int jobid)1147 void check_notify_list(int jobid)
1148 {
1149 struct Notify *n, *tmp;
1150 struct Job *j;
1151
1152 n = first_notify;
1153 while (n != 0)
1154 {
1155 tmp = n;
1156 n = n->next;
1157 if (tmp->jobid == jobid)
1158 {
1159 j = get_job(jobid);
1160 /* If the job finishes, notify the waiter */
1161 if (j->state == FINISHED || j->state == SKIPPED)
1162 {
1163 send_waitjob_ok(tmp->socket, j->result.errorlevel);
1164 /* We want to get the next Nofity* before we remove
1165 * the actual 'n'. As s_remove_notification() simply
1166 * removes the element from the linked list, we can
1167 * safely follow on the list from n->next. */
1168 s_remove_notification(tmp->socket);
1169
1170 /* Remove the jobs that were temporarily in the finished list,
1171 * just for their notifiers. */
1172 if (!in_notify_list(jobid) && !j->should_keep_finished) {
1173 destroy_finished_job(j);
1174 }
1175 }
1176 }
1177 }
1178 }
1179
s_wait_job(int s,int jobid)1180 void s_wait_job(int s, int jobid)
1181 {
1182 struct Job *p = 0;
1183
1184 if (jobid == -1)
1185 {
1186 /* Find the last job added */
1187 p = firstjob;
1188
1189 if (p != 0)
1190 while (p->next != 0)
1191 p = p->next;
1192
1193 /* Look in finished jobs if needed */
1194 if (p == 0)
1195 {
1196 p = first_finished_job;
1197 if (p != 0)
1198 while (p->next != 0)
1199 p = p->next;
1200 }
1201 }
1202 else
1203 {
1204 p = firstjob;
1205 while (p != 0 && p->jobid != jobid)
1206 p = p->next;
1207
1208 /* Look in finished jobs if needed */
1209 if (p == 0)
1210 {
1211 p = first_finished_job;
1212 while (p != 0 && p->jobid != jobid)
1213 p = p->next;
1214 }
1215 }
1216
1217 if (p == 0)
1218 {
1219 char tmp[50];
1220 if (jobid == -1)
1221 sprintf(tmp, "The last job cannot be waited.\n");
1222 else
1223 sprintf(tmp, "The job %i cannot be waited.\n", jobid);
1224 send_list_line(s, tmp);
1225 return;
1226 }
1227
1228 if (p->state == FINISHED || p->state == SKIPPED)
1229 {
1230 send_waitjob_ok(s, p->result.errorlevel);
1231 }
1232 else
1233 add_to_notify_list(s, p->jobid);
1234 }
1235
s_wait_running_job(int s,int jobid)1236 void s_wait_running_job(int s, int jobid)
1237 {
1238 struct Job *p = 0;
1239
1240 /* The job finding algorithm should be similar to that of
1241 * s_send_output, because this will be used by "-t" and "-c" */
1242 if (jobid == -1)
1243 {
1244 /* This means that we want the output info of the running task, or that
1245 * of the last job run */
1246 if (busy_slots > 0)
1247 {
1248 p = firstjob;
1249 if (p == 0)
1250 error("Internal state WAITING, but job not run."
1251 "firstjob = %x", firstjob);
1252 }
1253 else
1254 {
1255 p = first_finished_job;
1256 if (p == 0)
1257 {
1258 send_list_line(s, "No jobs.\n");
1259 return;
1260 }
1261 while(p->next != 0)
1262 p = p->next;
1263 }
1264 }
1265 else
1266 {
1267 p = firstjob;
1268 while (p != 0 && p->jobid != jobid)
1269 p = p->next;
1270
1271 /* Look in finished jobs if needed */
1272 if (p == 0)
1273 {
1274 p = first_finished_job;
1275 while (p != 0 && p->jobid != jobid)
1276 p = p->next;
1277 }
1278 }
1279
1280 if (p == 0)
1281 {
1282 char tmp[50];
1283 if (jobid == -1)
1284 sprintf(tmp, "The last job cannot be waited.\n");
1285 else
1286 sprintf(tmp, "The job %i cannot be waited.\n", jobid);
1287 send_list_line(s, tmp);
1288 return;
1289 }
1290
1291 if (p->state == FINISHED || p->state == SKIPPED)
1292 {
1293 send_waitjob_ok(s, p->result.errorlevel);
1294 }
1295 else
1296 add_to_notify_list(s, p->jobid);
1297 }
1298
s_set_max_slots(int new_max_slots)1299 void s_set_max_slots(int new_max_slots)
1300 {
1301 if (new_max_slots > 0)
1302 max_slots = new_max_slots;
1303 else
1304 warning("Received new_max_slots=%i", new_max_slots);
1305 }
1306
s_get_max_slots(int s)1307 void s_get_max_slots(int s)
1308 {
1309 struct msg m;
1310
1311 /* Message */
1312 m.type = GET_MAX_SLOTS_OK;
1313 m.u.max_slots = max_slots;
1314
1315 send_msg(s, &m);
1316 }
1317
s_move_urgent(int s,int jobid)1318 void s_move_urgent(int s, int jobid)
1319 {
1320 struct Job *p = 0;
1321 struct Job *tmp1;
1322
1323 if (jobid == -1)
1324 {
1325 /* Find the last job added */
1326 p = firstjob;
1327
1328 if (p != 0)
1329 while (p->next != 0)
1330 p = p->next;
1331 }
1332 else
1333 {
1334 p = firstjob;
1335 while (p != 0 && p->jobid != jobid)
1336 p = p->next;
1337 }
1338
1339 if (p == 0 || firstjob->next == 0)
1340 {
1341 char tmp[50];
1342 if (jobid == -1)
1343 sprintf(tmp, "The last job cannot be urged.\n");
1344 else
1345 sprintf(tmp, "The job %i cannot be urged.\n", jobid);
1346 send_list_line(s, tmp);
1347 return;
1348 }
1349
1350 /* Interchange the pointers */
1351 tmp1 = find_previous_job(p);
1352 tmp1->next = p->next;
1353 p->next = firstjob->next;
1354 firstjob->next = p;
1355
1356
1357 send_urgent_ok(s);
1358 }
1359
s_swap_jobs(int s,int jobid1,int jobid2)1360 void s_swap_jobs(int s, int jobid1, int jobid2)
1361 {
1362 struct Job *p1, *p2;
1363 struct Job *prev1, *prev2;
1364 struct Job *tmp;
1365
1366 p1 = findjob(jobid1);
1367 p2 = findjob(jobid2);
1368
1369 if (p1 == 0 || p2 == 0 || p1 == firstjob || p2 == firstjob)
1370 {
1371 char prev[60];
1372 sprintf(prev, "The jobs %i and %i cannot be swapped.\n", jobid1, jobid2);
1373 send_list_line(s, prev);
1374 return;
1375 }
1376
1377 /* Interchange the pointers */
1378 prev1 = find_previous_job(p1);
1379 prev2 = find_previous_job(p2);
1380 prev1->next = p2;
1381 prev2->next = p1;
1382 tmp = p1->next;
1383 p1->next = p2->next;
1384 p2->next = tmp;
1385
1386 send_swap_jobs_ok(s);
1387 }
1388
send_state(int s,enum Jobstate state)1389 static void send_state(int s, enum Jobstate state)
1390 {
1391 struct msg m;
1392
1393 m.type = ANSWER_STATE;
1394 m.u.state = state;
1395
1396 send_msg(s, &m);
1397 }
1398
s_send_state(int s,int jobid)1399 void s_send_state(int s, int jobid)
1400 {
1401 struct Job *p = 0;
1402
1403 if (jobid == -1)
1404 {
1405 /* Find the last job added */
1406 p = firstjob;
1407
1408 if (p != 0)
1409 while (p->next != 0)
1410 p = p->next;
1411
1412 /* Look in finished jobs if needed */
1413 if (p == 0)
1414 {
1415 p = first_finished_job;
1416 if (p != 0)
1417 while (p->next != 0)
1418 p = p->next;
1419 }
1420
1421 }
1422 else
1423 {
1424 p = get_job(jobid);
1425 }
1426
1427 if (p == 0)
1428 {
1429 char tmp[50];
1430 if (jobid == -1)
1431 sprintf(tmp, "The last job cannot be stated.\n");
1432 else
1433 sprintf(tmp, "The job %i cannot be stated.\n", jobid);
1434 send_list_line(s, tmp);
1435 return;
1436 }
1437
1438 /* Interchange the pointers */
1439 send_state(s, p->state);
1440 }
1441
dump_job_struct(FILE * out,const struct Job * p)1442 static void dump_job_struct(FILE *out, const struct Job *p)
1443 {
1444 fprintf(out, " new_job\n");
1445 fprintf(out, " jobid %i\n", p->jobid);
1446 fprintf(out, " command \"%s\"\n", p->command);
1447 fprintf(out, " state %s\n",
1448 jstate2string(p->state));
1449 fprintf(out, " result.errorlevel %i\n", p->result.errorlevel);
1450 fprintf(out, " output_filename \"%s\"\n",
1451 p->output_filename ? p->output_filename : "NULL");
1452 fprintf(out, " store_output %i\n", p->store_output);
1453 fprintf(out, " pid %i\n", p->pid);
1454 fprintf(out, " should_keep_finished %i\n", p->should_keep_finished);
1455 }
1456
dump_jobs_struct(FILE * out)1457 void dump_jobs_struct(FILE *out)
1458 {
1459 const struct Job *p;
1460
1461 fprintf(out, "New_jobs\n");
1462
1463 p = firstjob;
1464 while (p != 0)
1465 {
1466 dump_job_struct(out, p);
1467 p = p->next;
1468 }
1469
1470 p = first_finished_job;
1471 while (p != 0)
1472 {
1473 dump_job_struct(out, p);
1474 p = p->next;
1475 }
1476 }
1477
dump_notify_struct(FILE * out,const struct Notify * n)1478 static void dump_notify_struct(FILE *out, const struct Notify *n)
1479 {
1480 fprintf(out, " notify\n");
1481 fprintf(out, " jobid %i\n", n->jobid);
1482 fprintf(out, " socket \"%i\"\n", n->socket);
1483 }
1484
dump_notifies_struct(FILE * out)1485 void dump_notifies_struct(FILE *out)
1486 {
1487 const struct Notify *n;
1488
1489 fprintf(out, "New_notifies\n");
1490
1491 n = first_notify;
1492 while (n != 0)
1493 {
1494 dump_notify_struct(out, n);
1495 n = n->next;
1496 }
1497 }
1498
joblist_dump(int fd)1499 void joblist_dump(int fd)
1500 {
1501 struct Job *p;
1502 char *buffer;
1503
1504 buffer = joblistdump_headers();
1505 write(fd,buffer, strlen(buffer));
1506 free(buffer);
1507
1508 /* We reuse the headers from the list */
1509 buffer = joblist_headers();
1510 write(fd, "# ", 2);
1511 write(fd, buffer, strlen(buffer));
1512
1513 /* Show Finished jobs */
1514 p = first_finished_job;
1515 while(p != 0)
1516 {
1517 buffer = joblist_line(p);
1518 write(fd, "# ", 2);
1519 write(fd,buffer, strlen(buffer));
1520 free(buffer);
1521 p = p->next;
1522 }
1523
1524 write(fd, "\n", 1);
1525
1526 /* Show Queued or Running jobs */
1527 p = firstjob;
1528 while(p != 0)
1529 {
1530 buffer = joblistdump_torun(p);
1531 write(fd,buffer,strlen(buffer));
1532 free(buffer);
1533 p = p->next;
1534 }
1535 }
1536