1 /*
2     Task Spooler - a task queue system for the unix user
3     Copyright (C) 2007-2013  Lluís Batlle i Rossell
4 
5     Please find the license in the provided COPYING file.
6 */
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>

#include "main.h"
14 
/* Slot accounting, also accessed by the list code:
 *   busy_slots - slots taken by the jobs handed out to run
 *   max_slots  - how many slots may be taken at the same time */
int busy_slots = 0;
int max_slots = 1;
18 
/* One pending "wait for a job" request.  The requests are kept in a
 * singly linked list headed by first_notify. */
struct Notify
{
    int socket;             /* where the WAITJOB_OK answer is sent */
    int jobid;              /* the job whose end is awaited */
    struct Notify *next;    /* next request, 0 at the end of the list */
};
25 
/* Module state */

/* Head of the list of jobs that have not finished yet */
static struct Job *firstjob = NULL;
/* Head of the list of finished jobs that are still remembered */
static struct Job *first_finished_job = NULL;
/* Next job id to hand out */
static int jobids = 0;
/* Errorlevel of the most recently finished job.  This is used for
 * dependencies on jobs already out of the queue.  Before the first job,
 * let's consider a good previous result. */
static int last_errorlevel = 0;
/* Id of the most recently finished job.  We need this to handle well
 * "-d" after a "-nf" run. */
static int last_finished_jobid = 0;

/* Head of the list of clients waiting for some job to end */
static struct Notify *first_notify = NULL;

/* New jobs are put in HOLDING_CLIENT state instead of QUEUED once the
 * list of unfinished jobs is longer than this */
int max_jobs;

static struct Job *get_job(int jobid);
void notify_errorlevel(struct Job *p);
43 
send_list_line(int s,const char * str)44 static void send_list_line(int s, const char * str)
45 {
46     struct msg m;
47 
48     /* Message */
49     m.type = LIST_LINE;
50     m.u.size = strlen(str) + 1;
51 
52     send_msg(s, &m);
53 
54     /* Send the line */
55     send_bytes(s, str, m.u.size);
56 }
57 
send_urgent_ok(int s)58 static void send_urgent_ok(int s)
59 {
60     struct msg m;
61 
62     /* Message */
63     m.type = URGENT_OK;
64 
65     send_msg(s, &m);
66 }
67 
send_swap_jobs_ok(int s)68 static void send_swap_jobs_ok(int s)
69 {
70     struct msg m;
71 
72     /* Message */
73     m.type = SWAP_JOBS_OK;
74 
75     send_msg(s, &m);
76 }
77 
find_previous_job(const struct Job * final)78 static struct Job * find_previous_job(const struct Job *final)
79 {
80     struct Job *p;
81 
82     /* Show Queued or Running jobs */
83     p = firstjob;
84     while(p != 0)
85     {
86         if (p->next == final)
87             return p;
88         p = p->next;
89     }
90 
91     return 0;
92 }
93 
findjob(int jobid)94 static struct Job * findjob(int jobid)
95 {
96     struct Job *p;
97 
98     /* Show Queued or Running jobs */
99     p = firstjob;
100     while(p != 0)
101     {
102         if (p->jobid == jobid)
103             return p;
104         p = p->next;
105     }
106 
107     return 0;
108 }
109 
findjob_holding_client()110 static struct Job * findjob_holding_client()
111 {
112     struct Job *p;
113 
114     /* Show Queued or Running jobs */
115     p = firstjob;
116     while(p != 0)
117     {
118         if (p->state == HOLDING_CLIENT)
119             return p;
120         p = p->next;
121     }
122 
123     return 0;
124 }
125 
find_finished_job(int jobid)126 static struct Job * find_finished_job(int jobid)
127 {
128     struct Job *p;
129 
130     /* Show Queued or Running jobs */
131     p = first_finished_job;
132     while(p != 0)
133     {
134         if (p->jobid == jobid)
135             return p;
136         p = p->next;
137     }
138 
139     return 0;
140 }
141 
count_not_finished_jobs()142 static int count_not_finished_jobs()
143 {
144     int count=0;
145     struct Job *p;
146 
147     /* Show Queued or Running jobs */
148     p = firstjob;
149     while(p != 0)
150     {
151         ++count;
152         p = p->next;
153     }
154     return count;
155 }
156 
add_notify_errorlevel_to(struct Job * job,int jobid)157 static void add_notify_errorlevel_to(struct Job *job, int jobid)
158 {
159     int *p;
160     int newsize = (job->notify_errorlevel_to_size + 1)
161         * sizeof(int);
162     p = (int *) realloc(job->notify_errorlevel_to,
163             newsize);
164 
165     if (p == 0)
166         error("Cannot allocate more memory for notify_errorlist_to for jobid %i,"
167                 " having already %i elements",
168                 job->jobid, job->notify_errorlevel_to_size);
169 
170     job->notify_errorlevel_to = p;
171     job->notify_errorlevel_to_size += 1;
172     job->notify_errorlevel_to[job->notify_errorlevel_to_size - 1] = jobid;
173 }
174 
s_mark_job_running(int jobid)175 void s_mark_job_running(int jobid)
176 {
177     struct Job *p;
178     p = findjob(jobid);
179     if (!p)
180         error("Cannot mark the jobid %i RUNNING.", jobid);
181     p->state = RUNNING;
182 }
183 
184 /* -1 means nothing awaken, otherwise returns the jobid awaken */
wake_hold_client()185 int wake_hold_client()
186 {
187     struct Job *p;
188     p = findjob_holding_client();
189     if (p)
190     {
191         p->state = QUEUED;
192         return p->jobid;
193     }
194     return -1;
195 }
196 
jstate2string(enum Jobstate s)197 const char * jstate2string(enum Jobstate s)
198 {
199     const char * jobstate;
200     switch(s)
201     {
202         case QUEUED:
203             jobstate = "queued";
204             break;
205         case RUNNING:
206             jobstate = "running";
207             break;
208         case FINISHED:
209             jobstate = "finished";
210             break;
211         case SKIPPED:
212             jobstate = "skipped";
213             break;
214         case HOLDING_CLIENT:
215             jobstate = "skipped";
216             break;
217     }
218     return jobstate;
219 }
220 
s_list(int s)221 void s_list(int s)
222 {
223     struct Job *p;
224     char *buffer;
225 
226     /* Times:   0.00/0.00/0.00 - 4+4+4+2 = 14*/
227     buffer = joblist_headers();
228     send_list_line(s,buffer);
229     free(buffer);
230 
231     /* Show Queued or Running jobs */
232     p = firstjob;
233     while(p != 0)
234     {
235         if (p->state != HOLDING_CLIENT)
236         {
237             buffer = joblist_line(p);
238             send_list_line(s,buffer);
239             free(buffer);
240         }
241         p = p->next;
242     }
243 
244     p = first_finished_job;
245 
246     /* Show Finished jobs */
247     while(p != 0)
248     {
249         buffer = joblist_line(p);
250         send_list_line(s,buffer);
251         free(buffer);
252         p = p->next;
253     }
254 }
255 
newjobptr()256 static struct Job * newjobptr()
257 {
258     struct Job *p;
259 
260     if (firstjob == 0)
261     {
262         firstjob = (struct Job *) malloc(sizeof(*firstjob));
263         firstjob->next = 0;
264         firstjob->output_filename = 0;
265         firstjob->command = 0;
266         return firstjob;
267     }
268 
269     p = firstjob;
270     while(p->next != 0)
271         p = p->next;
272 
273     p->next = (struct Job *) malloc(sizeof(*p));
274     p->next->next = 0;
275     p->next->output_filename = 0;
276     p->next->command = 0;
277 
278     return p->next;
279 }
280 
281 /* Returns -1 if no last job id found */
find_last_jobid_in_queue(int neglect_jobid)282 static int find_last_jobid_in_queue(int neglect_jobid)
283 {
284     struct Job *p;
285     int last_jobid = -1;
286 
287     p = firstjob;
288     while(p != 0)
289     {
290         if (p->jobid != neglect_jobid &&
291             p->jobid > last_jobid)
292             last_jobid = p->jobid;
293         p = p->next;
294     }
295 
296     return last_jobid;
297 }
298 
299 /* Returns -1 if no last job id found */
find_last_stored_jobid_finished()300 static int find_last_stored_jobid_finished()
301 {
302     struct Job *p;
303     int last_jobid = -1;
304 
305     p = first_finished_job;
306     while(p != 0)
307     {
308         if (p->jobid > last_jobid)
309             last_jobid = p->jobid;
310         p = p->next;
311     }
312 
313     return last_jobid;
314 }
315 
316 /* Returns job id or -1 on error */
s_newjob(int s,struct msg * m)317 int s_newjob(int s, struct msg *m)
318 {
319     struct Job *p;
320     int res;
321 
322     p = newjobptr();
323 
324     p->jobid = jobids++;
325     if (count_not_finished_jobs() < max_jobs)
326         p->state = QUEUED;
327     else
328         p->state = HOLDING_CLIENT;
329     p->num_slots = m->u.newjob.num_slots;
330     p->store_output = m->u.newjob.store_output;
331     p->should_keep_finished = m->u.newjob.should_keep_finished;
332     p->notify_errorlevel_to = 0;
333     p->notify_errorlevel_to_size = 0;
334     p->do_depend = m->u.newjob.do_depend;
335     p->depend_on = -1; /* By default. May be overriden in the next conditions */
336     if (m->u.newjob.do_depend == 1)
337     {
338         /* Depend on the last queued job. */
339 
340         /* As we already have 'p' in the queue,
341          * neglect it during the find_last_jobid_in_queue() */
342         if (m->u.newjob.depend_on == -1)
343         {
344             p->depend_on = find_last_jobid_in_queue(p->jobid);
345 
346             /* We don't trust the last jobid in the queue (running or queued)
347              * if it's not the last added job. In that case, let
348              * the next control flow handle it as if it could not
349              * do_depend on any still queued job. */
350             if (last_finished_jobid > p->depend_on)
351                 p->depend_on = -1;
352 
353             /* If it's queued still without result, let it know
354              * its result to p when it finishes. */
355             if (p->depend_on != -1)
356             {
357                 struct Job *depended_job;
358                 depended_job = findjob(p->depend_on);
359                 if (depended_job != 0)
360                     add_notify_errorlevel_to(depended_job, p->jobid);
361                 else
362                     warning("The jobid %i is queued to do_depend on the jobid %i"
363                         " suddenly non existent in the queue", p->jobid,
364                         p->depend_on);
365             }
366             else /* Otherwise take the finished job, or the last_errorlevel */
367             {
368                 if (m->u.newjob.depend_on == -1)
369                 {
370                     int ljobid = find_last_stored_jobid_finished();
371                     p->depend_on = ljobid;
372 
373                     /* If we have a newer result stored, use it */
374                     /* NOTE:
375                      *   Reading this now, I don't know how ljobid can be
376                      *   greater than last_finished_jobid */
377                     if (last_finished_jobid < ljobid)
378                     {
379                         struct Job *parent;
380                         parent = find_finished_job(ljobid);
381                         if (!parent)
382                             error("jobid %i suddenly disappeared from the finished list",
383                                 ljobid);
384                         p->dependency_errorlevel = parent->result.errorlevel;
385                     }
386                     else
387                         p->dependency_errorlevel = last_errorlevel;
388                 }
389             }
390         }
391         else
392         {
393             /* The user decided what's the job this new job depends on */
394             struct Job *depended_job;
395 
396             p->depend_on = m->u.newjob.depend_on;
397 
398             depended_job = findjob(p->depend_on);
399             if (depended_job != 0)
400                 add_notify_errorlevel_to(depended_job, p->jobid);
401             else
402             {
403                 struct Job *parent;
404                 parent = find_finished_job(p->depend_on);
405                 if (parent)
406                 {
407                     p->dependency_errorlevel = parent->result.errorlevel;
408                 }
409                 else
410                 {
411                     /* We consider as if the job not found
412                        didn't finish well */
413                     p->dependency_errorlevel = -1;
414                 }
415             }
416         }
417     }
418 
419 
420     pinfo_init(&p->info);
421     pinfo_set_enqueue_time(&p->info);
422 
423     /* load the command */
424     p->command = malloc(m->u.newjob.command_size);
425     if (p->command == 0)
426         error("Cannot allocate memory in s_newjob command_size (%i)",
427                 m->u.newjob.command_size);
428     res = recv_bytes(s, p->command, m->u.newjob.command_size);
429     if (res == -1)
430         error("wrong bytes received");
431 
432     /* load the label */
433     p->label = 0;
434     if (m->u.newjob.label_size > 0)
435     {
436         char *ptr;
437         ptr = (char *) malloc(m->u.newjob.label_size);
438         if (ptr == 0)
439             error("Cannot allocate memory in s_newjob env_size(%i)",
440                     m->u.newjob.env_size);
441         res = recv_bytes(s, ptr, m->u.newjob.label_size);
442         if (res == -1)
443             error("wrong bytes received");
444         p->label = ptr;
445     }
446 
447     /* load the info */
448     if (m->u.newjob.env_size > 0)
449     {
450         char *ptr;
451         ptr = (char *) malloc(m->u.newjob.env_size);
452         if (ptr == 0)
453             error("Cannot allocate memory in s_newjob env_size(%i)",
454                     m->u.newjob.env_size);
455         res = recv_bytes(s, ptr, m->u.newjob.env_size);
456         if (res == -1)
457             error("wrong bytes received");
458         pinfo_addinfo(&p->info, m->u.newjob.env_size+100,
459                 "Environment:\n%s", ptr);
460         free(ptr);
461     }
462 
463     return p->jobid;
464 }
465 
466 /* This assumes the jobid exists */
s_removejob(int jobid)467 void s_removejob(int jobid)
468 {
469     struct Job *p;
470     struct Job *newnext;
471 
472     if (firstjob->jobid == jobid)
473     {
474         struct Job *newfirst;
475 
476         /* First job is to be removed */
477         newfirst = firstjob->next;
478         free(firstjob->command);
479         free(firstjob->output_filename);
480         pinfo_free(&firstjob->info);
481         free(firstjob->label);
482         free(firstjob);
483         firstjob = newfirst;
484         return;
485     }
486 
487     p = firstjob;
488     /* Not first job */
489     while (p->next != 0)
490     {
491         if (p->next->jobid == jobid)
492             break;
493         p = p->next;
494     }
495     if (p->next == 0)
496         error("Job to be removed not found. jobid=%i", jobid);
497 
498     newnext = p->next->next;
499 
500     free(p->next->command);
501     free(p->next);
502     p->next = newnext;
503 }
504 
505 /* -1 if no one should be run. */
next_run_job()506 int next_run_job()
507 {
508     struct Job *p;
509 
510     const int free_slots = max_slots - busy_slots;
511 
512     /* busy_slots may be bigger than the maximum slots,
513      * if the user was running many jobs, and suddenly
514      * trimmed the maximum slots down. */
515     if (free_slots <= 0)
516         return -1;
517 
518     /* If there are no jobs to run... */
519     if (firstjob == 0)
520         return -1;
521 
522     /* Look for a runnable task */
523     p = firstjob;
524     while(p != 0)
525     {
526         if (p->state == QUEUED)
527         {
528             if (p->depend_on >= 0)
529             {
530                 struct Job *do_depend_job = get_job(p->depend_on);
531                 /* We won't try to run any job do_depending on an unfinished
532                  * job */
533                 if (do_depend_job != NULL &&
534                     (do_depend_job->state == QUEUED || do_depend_job->state == RUNNING))
535                 {
536                     /* Next try */
537                     p = p->next;
538                     continue;
539                 }
540             }
541 
542             if (free_slots >= p->num_slots)
543             {
544                 busy_slots = busy_slots + p->num_slots;
545                 return p->jobid;
546             }
547         }
548         p = p->next;
549     }
550 
551     return -1;
552 }
553 
/* How many finished jobs may be remembered, from the environment
 * variable TS_MAXFINISHED.
 *
 * Returns 1000 if the variable is not set, or if it does not start with
 * a number.  Otherwise returns the absolute value of the leading number
 * (text after it is ignored), limited to INT_MAX.
 *
 * strtol() is used instead of atoi() because atoi() gives undefined
 * behaviour on values out of range, as abs() does on INT_MIN. */
static int get_max_finished_jobs()
{
    const int default_limit = 1000;
    const char *text;
    char *end;
    long value;

    text = getenv("TS_MAXFINISHED");
    if (text == NULL)
        return default_limit;

    value = strtol(text, &end, 10);
    if (end == text)
        return default_limit; /* No digits at all */

    /* Absolute value without overflow; strtol() already saturated the
     * value at LONG_MIN/LONG_MAX if it was out of range for a long */
    if (value < 0)
        value = (value < -(long) INT_MAX) ? (long) INT_MAX : -value;
    if (value > (long) INT_MAX)
        value = (long) INT_MAX;

    return (int) value;
}
564 
565 /* Add the job to the finished queue. */
new_finished_job(struct Job * j)566 static void new_finished_job(struct Job *j)
567 {
568     struct Job *p;
569     int count, max;
570 
571     max = get_max_finished_jobs();
572     count = 0;
573 
574     if (first_finished_job == 0 && count < max)
575     {
576         first_finished_job = j;
577         first_finished_job->next = 0;
578         return;
579     }
580 
581     ++count;
582 
583     p = first_finished_job;
584     while(p->next != 0)
585     {
586         p = p->next;
587         ++count;
588     }
589 
590     /* If too many jobs, wipe out the first */
591     if (count >= max)
592     {
593         struct Job *tmp;
594         tmp = first_finished_job;
595         first_finished_job = first_finished_job->next;
596         free(tmp->command);
597         free(tmp->output_filename);
598         pinfo_free(&tmp->info);
599         free(tmp->label);
600         free(tmp);
601     }
602     p->next = j;
603     p->next->next = 0;
604 
605     return;
606 }
607 
job_is_in_state(int jobid,enum Jobstate state)608 static int job_is_in_state(int jobid, enum Jobstate state)
609 {
610     struct Job *p;
611 
612     p = findjob(jobid);
613     if (p == 0)
614         return 0;
615     if (p->state == state)
616         return 1;
617     return 0;
618 }
619 
job_is_running(int jobid)620 int job_is_running(int jobid)
621 {
622     return job_is_in_state(jobid, RUNNING);
623 }
624 
job_is_holding_client(int jobid)625 int job_is_holding_client(int jobid)
626 {
627     return job_is_in_state(jobid, HOLDING_CLIENT);
628 }
629 
in_notify_list(int jobid)630 static int in_notify_list(int jobid)
631 {
632     struct Notify *n, *tmp;
633 
634     n = first_notify;
635     while (n != 0)
636     {
637         tmp = n;
638         n = n->next;
639         if (tmp->jobid == jobid)
640             return 1;
641     }
642     return 0;
643 }
644 
job_finished(const struct Result * result,int jobid)645 void job_finished(const struct Result *result, int jobid)
646 {
647     struct Job *p;
648 
649     if (busy_slots <= 0)
650         error("Wrong state in the server. busy_slots = %i instead of greater than 0", busy_slots);
651 
652     p = findjob(jobid);
653     if (p == 0)
654         error("on jobid %i finished, it doesn't exist", jobid);
655 
656     /* The job may be not only in running state, but also in other states, as
657      * we call this to clean up the jobs list in case of the client closing the
658      * connection. */
659     if (p->state == RUNNING)
660         busy_slots = busy_slots - p->num_slots;
661 
662     /* Mark state */
663     if (result->skipped)
664         p->state = SKIPPED;
665     else
666         p->state = FINISHED;
667     p->result = *result;
668     last_finished_jobid = p->jobid;
669     notify_errorlevel(p);
670     pinfo_set_end_time(&p->info);
671 
672     if (p->result.died_by_signal)
673         pinfo_addinfo(&p->info, 100, "Exit status: killed by signal %i\n", p->result.signal);
674     else
675         pinfo_addinfo(&p->info, 100, "Exit status: died with exit code %i\n", p->result.errorlevel);
676 
677     /* Find the pointing node, to
678      * update it removing the finished job. */
679     {
680         struct Job **jpointer = 0;
681         struct Job *newfirst = p->next;
682         if (firstjob == p)
683             jpointer = &firstjob;
684         else
685         {
686             struct Job *p2;
687             p2 = firstjob;
688             while(p2 != 0)
689             {
690                 if (p2->next == p)
691                 {
692                     jpointer = &(p2->next);
693                     break;
694                 }
695                 p2 = p2->next;
696             }
697         }
698 
699         /* Add it to the finished queue (maybe temporarily) */
700         if (p->should_keep_finished || in_notify_list(p->jobid))
701             new_finished_job(p);
702 
703         /* Remove it from the run queue */
704         if (jpointer == 0)
705             error("Cannot remove a finished job from the "
706                 "queue list (jobid=%i)", p->jobid);
707 
708         *jpointer = newfirst;
709     }
710 }
711 
s_clear_finished()712 void s_clear_finished()
713 {
714     struct Job *p;
715 
716     if (first_finished_job == 0)
717         return;
718 
719     p = first_finished_job;
720     first_finished_job = 0;
721 
722     while (p != 0)
723     {
724         struct Job *tmp;
725         tmp = p->next;
726         free(p->command);
727         free(p->output_filename);
728         pinfo_free(&p->info);
729         free(p->label);
730         free(p);
731         p = tmp;
732     }
733 }
734 
s_process_runjob_ok(int jobid,char * oname,int pid)735 void s_process_runjob_ok(int jobid, char *oname, int pid)
736 {
737     struct Job *p;
738     p = findjob(jobid);
739     if (p == 0)
740         error("Job %i already run not found on runjob_ok", jobid);
741     if (p->state != RUNNING)
742         error("Job %i not running, but %i on runjob_ok", jobid,
743                 p->state);
744 
745     p->pid = pid;
746     p->output_filename = oname;
747     pinfo_set_start_time(&p->info);
748 }
749 
s_send_runjob(int s,int jobid)750 void s_send_runjob(int s, int jobid)
751 {
752     struct msg m;
753     struct Job *p;
754 
755     p = findjob(jobid);
756     if (p == 0)
757         error("Job %i was expected to run", jobid);
758 
759 
760     m.type = RUNJOB;
761 
762     /* TODO
763      * We should make the dependencies update the jobids they're do_depending on.
764      * Then, on finish, these could set the errorlevel to send to its dependency childs.
765      * We cannot consider that the jobs will leave traces in the finished job list (-nf?) . */
766 
767     m.u.last_errorlevel = p->dependency_errorlevel;
768 
769     send_msg(s, &m);
770 }
771 
s_job_info(int s,int jobid)772 void s_job_info(int s, int jobid)
773 {
774     struct Job *p = 0;
775     struct msg m;
776 
777     if (jobid == -1)
778     {
779         /* This means that we want the job info of the running task, or that
780          * of the last job run */
781         if (busy_slots > 0)
782         {
783             p = firstjob;
784             if (p == 0)
785                 error("Internal state WAITING, but job not run."
786                         "firstjob = %x", firstjob);
787         }
788         else
789         {
790             p = first_finished_job;
791             if (p == 0)
792             {
793                 send_list_line(s, "No jobs.\n");
794                 return;
795             }
796             while(p->next != 0)
797                 p = p->next;
798         }
799     } else
800     {
801         p = firstjob;
802         while (p != 0 && p->jobid != jobid)
803             p = p->next;
804 
805         /* Look in finished jobs if needed */
806         if (p == 0)
807         {
808             p = first_finished_job;
809             while (p != 0 && p->jobid != jobid)
810                 p = p->next;
811         }
812     }
813 
814     if (p == 0)
815     {
816         char tmp[50];
817         sprintf(tmp, "Job %i not finished or not running.\n", jobid);
818         send_list_line(s, tmp);
819         return;
820     }
821 
822     m.type = INFO_DATA;
823     send_msg(s, &m);
824     pinfo_dump(&p->info, s);
825     fd_nprintf(s, 100, "Command: ");
826     if (p->depend_on != -1)
827         fd_nprintf(s, 100, "[%i]&& ", p->depend_on);
828     write(s, p->command, strlen(p->command));
829     fd_nprintf(s, 100, "\n");
830     fd_nprintf(s, 100, "Slots required: %i\n", p->num_slots);
831     fd_nprintf(s, 100, "Enqueue time: %s",
832             ctime(&p->info.enqueue_time.tv_sec));
833     if (p->state == RUNNING)
834     {
835         fd_nprintf(s, 100, "Start time: %s",
836                 ctime(&p->info.start_time.tv_sec));
837         fd_nprintf(s, 100, "Time running: %fs\n",
838                 pinfo_time_until_now(&p->info));
839     } else if (p->state == FINISHED)
840     {
841         fd_nprintf(s, 100, "Start time: %s",
842                 ctime(&p->info.start_time.tv_sec));
843         fd_nprintf(s, 100, "End time: %s",
844                 ctime(&p->info.end_time.tv_sec));
845         fd_nprintf(s, 100, "Time run: %fs\n",
846                 pinfo_time_run(&p->info));
847     }
848 }
849 
s_send_output(int s,int jobid)850 void s_send_output(int s, int jobid)
851 {
852     struct Job *p = 0;
853     struct msg m;
854 
855     if (jobid == -1)
856     {
857         /* This means that we want the output info of the running task, or that
858          * of the last job run */
859         if (busy_slots > 0)
860         {
861             p = firstjob;
862             if (p == 0)
863                 error("Internal state WAITING, but job not run."
864                         "firstjob = %x", firstjob);
865         }
866         else
867         {
868             p = first_finished_job;
869             if (p == 0)
870             {
871                 send_list_line(s, "No jobs.\n");
872                 return;
873             }
874             while(p->next != 0)
875                 p = p->next;
876         }
877     } else
878     {
879         p = get_job(jobid);
880         if (p != 0 && p->state != RUNNING
881             && p->state != FINISHED
882             && p->state != SKIPPED)
883             p = 0;
884     }
885 
886     if (p == 0)
887     {
888         char tmp[50];
889         if (jobid == -1)
890             sprintf(tmp, "The last job has not finished or is not running.\n");
891         else
892             sprintf(tmp, "Job %i not finished or not running.\n", jobid);
893         send_list_line(s, tmp);
894         return;
895     }
896 
897     if (p->state == SKIPPED)
898     {
899         char tmp[50];
900         if (jobid == -1)
901             sprintf(tmp,  "The last job was skipped due to a dependency.\n");
902 
903         else
904             sprintf(tmp, "Job %i was skipped due to a dependency.\n", jobid);
905         send_list_line(s, tmp);
906         return;
907     }
908 
909     m.type = ANSWER_OUTPUT;
910     m.u.output.store_output = p->store_output;
911     m.u.output.pid = p->pid;
912     if (m.u.output.store_output && p->output_filename)
913         m.u.output.ofilename_size = strlen(p->output_filename) + 1;
914     else
915         m.u.output.ofilename_size = 0;
916     send_msg(s, &m);
917     if (m.u.output.ofilename_size > 0)
918         send_bytes(s, p->output_filename, m.u.output.ofilename_size);
919 }
920 
notify_errorlevel(struct Job * p)921 void notify_errorlevel(struct Job *p)
922 {
923     int i;
924 
925     last_errorlevel = p->result.errorlevel;
926 
927     for(i = 0; i < p->notify_errorlevel_to_size; ++i)
928     {
929         struct Job *notified;
930         notified = get_job(p->notify_errorlevel_to[i]);
931         if (notified)
932         {
933             notified->dependency_errorlevel = p->result.errorlevel;
934         }
935     }
936 }
937 
938 /* jobid is input/output. If the input is -1, it's changed to the jobid
939  * removed */
s_remove_job(int s,int * jobid)940 int s_remove_job(int s, int *jobid)
941 {
942     struct Job *p = 0;
943     struct msg m;
944     struct Job *before_p = 0;
945 
946     if (*jobid == -1)
947     {
948         /* Find the last job added */
949         p = firstjob;
950         if (p != 0)
951         {
952             while (p->next != 0)
953             {
954                 before_p = p;
955                 p = p->next;
956             }
957         } else
958         {
959             /* last 'finished' */
960             p = first_finished_job;
961             if (p)
962             {
963                 while (p->next != 0)
964                 {
965                     before_p = p;
966                     p = p->next;
967                 }
968             }
969         }
970     }
971     else
972     {
973         p = firstjob;
974         if (p != 0)
975         {
976             while (p->next != 0 && p->jobid != *jobid)
977             {
978                 before_p = p;
979                 p = p->next;
980             }
981         }
982 
983         /* If not found, look in the 'finished' list */
984         if (p == 0 || p->jobid != *jobid)
985         {
986             p = first_finished_job;
987             if (p != 0)
988             {
989                 while (p->next != 0 && p->jobid != *jobid)
990                 {
991                     before_p = p;
992                     p = p->next;
993                 }
994                 if (p->jobid != *jobid)
995                     p = 0;
996             }
997         }
998     }
999 
1000     if (p == 0 || p->state == RUNNING || p == firstjob)
1001     {
1002         char tmp[50];
1003         if (*jobid == -1)
1004             sprintf(tmp, "The last job cannot be removed.\n");
1005         else
1006             sprintf(tmp, "The job %i cannot be removed.\n", *jobid);
1007         send_list_line(s, tmp);
1008         return 0;
1009     }
1010 
1011     /* Return the jobid found */
1012     *jobid = p->jobid;
1013 
1014     /* Tricks for the check_notify_list */
1015     p->state = FINISHED;
1016     p->result.errorlevel = -1;
1017     notify_errorlevel(p);
1018 
1019     /* Notify the clients in wait_job */
1020     check_notify_list(m.u.jobid);
1021 
1022     /* Update the list pointers */
1023     if (p == first_finished_job)
1024         first_finished_job = p->next;
1025     else
1026         before_p->next = p->next;
1027 
1028     free(p->notify_errorlevel_to);
1029     free(p->command);
1030     free(p->output_filename);
1031     pinfo_free(&p->info);
1032     free(p->label);
1033     free(p);
1034 
1035     m.type = REMOVEJOB_OK;
1036     send_msg(s, &m);
1037     return 1;
1038 }
1039 
add_to_notify_list(int s,int jobid)1040 static void add_to_notify_list(int s, int jobid)
1041 {
1042     struct Notify *n;
1043     struct Notify *new;
1044 
1045     new = (struct Notify *) malloc(sizeof(*new));
1046 
1047     new->socket = s;
1048     new->jobid = jobid;
1049     new->next = 0;
1050 
1051     n = first_notify;
1052     if (n == 0)
1053     {
1054         first_notify = new;
1055         return;
1056     }
1057 
1058     while(n->next != 0)
1059         n = n->next;
1060 
1061     n->next = new;
1062 }
1063 
send_waitjob_ok(int s,int errorlevel)1064 static void send_waitjob_ok(int s, int errorlevel)
1065 {
1066     struct msg m;
1067 
1068     m.type = WAITJOB_OK;
1069     m.u.result.errorlevel = errorlevel;
1070     send_msg(s, &m);
1071 }
1072 
/* Look for 'jobid' in the unfinished list first, and in the finished
 * list if it is not there.  Returns the job, or 0 if it is in none. */
static struct Job *
get_job(int jobid)
{
    struct Job *job = findjob(jobid);

    if (job == NULL)
        job = find_finished_job(jobid);

    return job;
}
1089 
1090 /* Don't complain, if the socket doesn't exist */
s_remove_notification(int s)1091 void s_remove_notification(int s)
1092 {
1093     struct Notify *n;
1094     struct Notify *previous;
1095     n = first_notify;
1096     while (n != 0 && n->socket != s)
1097         n = n->next;
1098     if (n == 0 || n->socket != s)
1099         return;
1100 
1101     /* Remove the notification */
1102     previous = first_notify;
1103     if (n == previous)
1104     {
1105         first_notify = n->next;
1106         free(n);
1107         return;
1108     }
1109 
1110     /* if not the first... */
1111     while(previous->next != n)
1112         previous = previous->next;
1113 
1114     previous->next = n->next;
1115     free(n);
1116 }
1117 
destroy_finished_job(struct Job * j)1118 static void destroy_finished_job(struct Job *j)
1119 {
1120     if (j == first_finished_job)
1121         first_finished_job = j->next;
1122     else
1123     {
1124         struct Job *i;
1125         for(i = first_finished_job; i != 0; ++i)
1126         {
1127             if (i->next == j)
1128             {
1129                 i->next = j->next;
1130                 break;
1131             }
1132         }
1133         if (i == 0) {
1134             error("Cannot destroy the expected job %i", j->jobid);
1135         }
1136     }
1137 
1138     free(j->notify_errorlevel_to);
1139     free(j->command);
1140     free(j->output_filename);
1141     pinfo_free(&j->info);
1142     free(j->label);
1143     free(j);
1144 }
1145 
1146 /* This is called when a job finishes */
check_notify_list(int jobid)1147 void check_notify_list(int jobid)
1148 {
1149     struct Notify *n, *tmp;
1150     struct Job *j;
1151 
1152     n = first_notify;
1153     while (n != 0)
1154     {
1155         tmp = n;
1156         n = n->next;
1157         if (tmp->jobid == jobid)
1158         {
1159             j = get_job(jobid);
1160             /* If the job finishes, notify the waiter */
1161             if (j->state == FINISHED || j->state == SKIPPED)
1162             {
1163                 send_waitjob_ok(tmp->socket, j->result.errorlevel);
1164                 /* We want to get the next Nofity* before we remove
1165                  * the actual 'n'. As s_remove_notification() simply
1166                  * removes the element from the linked list, we can
1167                  * safely follow on the list from n->next. */
1168                 s_remove_notification(tmp->socket);
1169 
1170                 /* Remove the jobs that were temporarily in the finished list,
1171                  * just for their notifiers. */
1172                 if (!in_notify_list(jobid) && !j->should_keep_finished) {
1173                     destroy_finished_job(j);
1174                 }
1175             }
1176         }
1177     }
1178 }
1179 
s_wait_job(int s,int jobid)1180 void s_wait_job(int s, int jobid)
1181 {
1182     struct Job *p = 0;
1183 
1184     if (jobid == -1)
1185     {
1186         /* Find the last job added */
1187         p = firstjob;
1188 
1189         if (p != 0)
1190             while (p->next != 0)
1191                 p = p->next;
1192 
1193         /* Look in finished jobs if needed */
1194         if (p == 0)
1195         {
1196             p = first_finished_job;
1197             if (p != 0)
1198                 while (p->next != 0)
1199                     p = p->next;
1200         }
1201     }
1202     else
1203     {
1204         p = firstjob;
1205         while (p != 0 && p->jobid != jobid)
1206             p = p->next;
1207 
1208         /* Look in finished jobs if needed */
1209         if (p == 0)
1210         {
1211             p = first_finished_job;
1212             while (p != 0 && p->jobid != jobid)
1213                 p = p->next;
1214         }
1215     }
1216 
1217     if (p == 0)
1218     {
1219         char tmp[50];
1220         if (jobid == -1)
1221             sprintf(tmp, "The last job cannot be waited.\n");
1222         else
1223             sprintf(tmp, "The job %i cannot be waited.\n", jobid);
1224         send_list_line(s, tmp);
1225         return;
1226     }
1227 
1228     if (p->state == FINISHED || p->state == SKIPPED)
1229     {
1230         send_waitjob_ok(s, p->result.errorlevel);
1231     }
1232     else
1233         add_to_notify_list(s, p->jobid);
1234 }
1235 
s_wait_running_job(int s,int jobid)1236 void s_wait_running_job(int s, int jobid)
1237 {
1238     struct Job *p = 0;
1239 
1240     /* The job finding algorithm should be similar to that of
1241      * s_send_output, because this will be used by "-t" and "-c" */
1242     if (jobid == -1)
1243     {
1244         /* This means that we want the output info of the running task, or that
1245          * of the last job run */
1246         if (busy_slots > 0)
1247         {
1248             p = firstjob;
1249             if (p == 0)
1250                 error("Internal state WAITING, but job not run."
1251                         "firstjob = %x", firstjob);
1252         }
1253         else
1254         {
1255             p = first_finished_job;
1256             if (p == 0)
1257             {
1258                 send_list_line(s, "No jobs.\n");
1259                 return;
1260             }
1261             while(p->next != 0)
1262                 p = p->next;
1263         }
1264     }
1265     else
1266     {
1267         p = firstjob;
1268         while (p != 0 && p->jobid != jobid)
1269             p = p->next;
1270 
1271         /* Look in finished jobs if needed */
1272         if (p == 0)
1273         {
1274             p = first_finished_job;
1275             while (p != 0 && p->jobid != jobid)
1276                 p = p->next;
1277         }
1278     }
1279 
1280     if (p == 0)
1281     {
1282         char tmp[50];
1283         if (jobid == -1)
1284             sprintf(tmp, "The last job cannot be waited.\n");
1285         else
1286             sprintf(tmp, "The job %i cannot be waited.\n", jobid);
1287         send_list_line(s, tmp);
1288         return;
1289     }
1290 
1291     if (p->state == FINISHED || p->state == SKIPPED)
1292     {
1293         send_waitjob_ok(s, p->result.errorlevel);
1294     }
1295     else
1296         add_to_notify_list(s, p->jobid);
1297 }
1298 
s_set_max_slots(int new_max_slots)1299 void s_set_max_slots(int new_max_slots)
1300 {
1301     if (new_max_slots > 0)
1302         max_slots = new_max_slots;
1303     else
1304         warning("Received new_max_slots=%i", new_max_slots);
1305 }
1306 
s_get_max_slots(int s)1307 void s_get_max_slots(int s)
1308 {
1309     struct msg m;
1310 
1311     /* Message */
1312     m.type = GET_MAX_SLOTS_OK;
1313     m.u.max_slots = max_slots;
1314 
1315     send_msg(s, &m);
1316 }
1317 
s_move_urgent(int s,int jobid)1318 void s_move_urgent(int s, int jobid)
1319 {
1320     struct Job *p = 0;
1321     struct Job *tmp1;
1322 
1323     if (jobid == -1)
1324     {
1325         /* Find the last job added */
1326         p = firstjob;
1327 
1328         if (p != 0)
1329             while (p->next != 0)
1330                 p = p->next;
1331     }
1332     else
1333     {
1334         p = firstjob;
1335         while (p != 0 && p->jobid != jobid)
1336             p = p->next;
1337     }
1338 
1339     if (p == 0 || firstjob->next == 0)
1340     {
1341         char tmp[50];
1342         if (jobid == -1)
1343             sprintf(tmp, "The last job cannot be urged.\n");
1344         else
1345             sprintf(tmp, "The job %i cannot be urged.\n", jobid);
1346         send_list_line(s, tmp);
1347         return;
1348     }
1349 
1350     /* Interchange the pointers */
1351     tmp1 = find_previous_job(p);
1352     tmp1->next = p->next;
1353     p->next = firstjob->next;
1354     firstjob->next = p;
1355 
1356 
1357     send_urgent_ok(s);
1358 }
1359 
s_swap_jobs(int s,int jobid1,int jobid2)1360 void s_swap_jobs(int s, int jobid1, int jobid2)
1361 {
1362     struct Job *p1, *p2;
1363     struct Job *prev1, *prev2;
1364     struct Job *tmp;
1365 
1366     p1 = findjob(jobid1);
1367     p2 = findjob(jobid2);
1368 
1369     if (p1 == 0 || p2 == 0 || p1 == firstjob || p2 == firstjob)
1370     {
1371         char prev[60];
1372         sprintf(prev, "The jobs %i and %i cannot be swapped.\n", jobid1, jobid2);
1373         send_list_line(s, prev);
1374         return;
1375     }
1376 
1377     /* Interchange the pointers */
1378     prev1 = find_previous_job(p1);
1379     prev2 = find_previous_job(p2);
1380     prev1->next = p2;
1381     prev2->next = p1;
1382     tmp = p1->next;
1383     p1->next = p2->next;
1384     p2->next = tmp;
1385 
1386     send_swap_jobs_ok(s);
1387 }
1388 
send_state(int s,enum Jobstate state)1389 static void send_state(int s, enum Jobstate state)
1390 {
1391     struct msg m;
1392 
1393     m.type = ANSWER_STATE;
1394     m.u.state = state;
1395 
1396     send_msg(s, &m);
1397 }
1398 
s_send_state(int s,int jobid)1399 void s_send_state(int s, int jobid)
1400 {
1401     struct Job *p = 0;
1402 
1403     if (jobid == -1)
1404     {
1405         /* Find the last job added */
1406         p = firstjob;
1407 
1408         if (p != 0)
1409             while (p->next != 0)
1410                 p = p->next;
1411 
1412         /* Look in finished jobs if needed */
1413         if (p == 0)
1414         {
1415             p = first_finished_job;
1416             if (p != 0)
1417                 while (p->next != 0)
1418                     p = p->next;
1419         }
1420 
1421     }
1422     else
1423     {
1424         p = get_job(jobid);
1425     }
1426 
1427     if (p == 0)
1428     {
1429         char tmp[50];
1430         if (jobid == -1)
1431             sprintf(tmp, "The last job cannot be stated.\n");
1432         else
1433             sprintf(tmp, "The job %i cannot be stated.\n", jobid);
1434         send_list_line(s, tmp);
1435         return;
1436     }
1437 
1438     /* Interchange the pointers */
1439     send_state(s, p->state);
1440 }
1441 
dump_job_struct(FILE * out,const struct Job * p)1442 static void dump_job_struct(FILE *out, const struct Job *p)
1443 {
1444     fprintf(out, "  new_job\n");
1445     fprintf(out, "    jobid %i\n", p->jobid);
1446     fprintf(out, "    command \"%s\"\n", p->command);
1447     fprintf(out, "    state %s\n",
1448             jstate2string(p->state));
1449     fprintf(out, "    result.errorlevel %i\n", p->result.errorlevel);
1450     fprintf(out, "    output_filename \"%s\"\n",
1451             p->output_filename ? p->output_filename : "NULL");
1452     fprintf(out, "    store_output %i\n", p->store_output);
1453     fprintf(out, "    pid %i\n", p->pid);
1454     fprintf(out, "    should_keep_finished %i\n", p->should_keep_finished);
1455 }
1456 
dump_jobs_struct(FILE * out)1457 void dump_jobs_struct(FILE *out)
1458 {
1459     const struct Job *p;
1460 
1461     fprintf(out, "New_jobs\n");
1462 
1463     p = firstjob;
1464     while (p != 0)
1465     {
1466         dump_job_struct(out, p);
1467         p = p->next;
1468     }
1469 
1470     p = first_finished_job;
1471     while (p != 0)
1472     {
1473         dump_job_struct(out, p);
1474         p = p->next;
1475     }
1476 }
1477 
dump_notify_struct(FILE * out,const struct Notify * n)1478 static void dump_notify_struct(FILE *out, const struct Notify *n)
1479 {
1480     fprintf(out, "  notify\n");
1481     fprintf(out, "    jobid %i\n", n->jobid);
1482     fprintf(out, "    socket \"%i\"\n", n->socket);
1483 }
1484 
dump_notifies_struct(FILE * out)1485 void dump_notifies_struct(FILE *out)
1486 {
1487     const struct Notify *n;
1488 
1489     fprintf(out, "New_notifies\n");
1490 
1491     n = first_notify;
1492     while (n != 0)
1493     {
1494         dump_notify_struct(out, n);
1495         n = n->next;
1496     }
1497 }
1498 
joblist_dump(int fd)1499 void joblist_dump(int fd)
1500 {
1501     struct Job *p;
1502     char *buffer;
1503 
1504     buffer = joblistdump_headers();
1505     write(fd,buffer, strlen(buffer));
1506     free(buffer);
1507 
1508     /* We reuse the headers from the list */
1509     buffer = joblist_headers();
1510     write(fd, "# ", 2);
1511     write(fd, buffer, strlen(buffer));
1512 
1513     /* Show Finished jobs */
1514     p = first_finished_job;
1515     while(p != 0)
1516     {
1517         buffer = joblist_line(p);
1518         write(fd, "# ", 2);
1519         write(fd,buffer, strlen(buffer));
1520         free(buffer);
1521         p = p->next;
1522     }
1523 
1524     write(fd, "\n", 1);
1525 
1526     /* Show Queued or Running jobs */
1527     p = firstjob;
1528     while(p != 0)
1529     {
1530         buffer = joblistdump_torun(p);
1531         write(fd,buffer,strlen(buffer));
1532         free(buffer);
1533         p = p->next;
1534     }
1535 }
1536