1 /*
2     Task Spooler - a task queue system for the unix user
3     Copyright (C) 2007-2009  Lluís Batlle i Rossell
4 
5     Please find the license in the provided COPYING file.
6 */
7 #include <stdio.h>
8 #include <string.h>
9 #include <unistd.h>
10 #include <stdlib.h>
11 #include <sys/types.h>
12 #include <sys/socket.h>
13 #include <sys/time.h>
14 #include <signal.h>
15 #include "main.h"
16 
17 static void c_end_of_job(const struct Result *res);
18 static void c_wait_job_send();
19 static void c_wait_running_job_send();
20 
build_command_string()21 char *build_command_string()
22 {
23     int size;
24     int i;
25     int num;
26     char **array;
27     char *commandstring;
28 
29     size = 0;
30     num = command_line.command.num;
31     array = command_line.command.array;
32 
33     /* Count bytes needed */
34     for (i = 0; i < num; ++i)
35     {
36         /* The '1' is for spaces, and at the last i,
37          * for the null character */
38         size = size + strlen(array[i]) + 1;
39     }
40 
41     /* Alloc */
42     commandstring = (char *) malloc(size);
43     if(commandstring == NULL)
44         error("Error in malloc for commandstring");
45 
46     /* Build the command */
47     strcpy(commandstring, array[0]);
48     for (i = 1; i < num; ++i)
49     {
50         strcat(commandstring, " ");
51         strcat(commandstring, array[i]);
52     }
53 
54     return commandstring;
55 }
56 
c_new_job()57 void c_new_job()
58 {
59     struct msg m;
60     char *new_command;
61     char *myenv;
62 
63     m.type = NEWJOB;
64 
65     new_command = build_command_string();
66 
67     myenv = get_environment();
68 
69     /* global */
70     m.u.newjob.command_size = strlen(new_command) + 1; /* add null */
71     if (myenv)
72         m.u.newjob.env_size = strlen(myenv) + 1; /* add null */
73     else
74         m.u.newjob.env_size = 0;
75     if (command_line.label)
76         m.u.newjob.label_size = strlen(command_line.label) + 1; /* add null */
77     else
78         m.u.newjob.label_size = 0;
79     m.u.newjob.store_output = command_line.store_output;
80     m.u.newjob.do_depend = command_line.do_depend;
81     m.u.newjob.depend_on = command_line.depend_on;
82     m.u.newjob.should_keep_finished = command_line.should_keep_finished;
83     m.u.newjob.command_size = strlen(new_command) + 1; /* add null */
84     m.u.newjob.wait_enqueuing = command_line.wait_enqueuing;
85     m.u.newjob.num_slots = command_line.num_slots;
86 
87     /* Send the message */
88     send_msg(server_socket, &m);
89 
90     /* Send the command */
91     send_bytes(server_socket, new_command, m.u.newjob.command_size);
92 
93     /* Send the label */
94     send_bytes(server_socket, command_line.label, m.u.newjob.label_size);
95 
96     /* Send the environment */
97     send_bytes(server_socket, myenv, m.u.newjob.env_size);
98 
99     free(new_command);
100     free(myenv);
101 }
102 
c_wait_newjob_ok()103 int c_wait_newjob_ok()
104 {
105     struct msg m;
106     int res;
107 
108     res = recv_msg(server_socket, &m);
109     if(res == -1)
110         error("Error in wait_newjob_ok");
111     if(m.type == NEWJOB_NOK)
112     {
113         fprintf(stderr, "Error, queue full\n");
114         exit(EXITCODE_QUEUE_FULL);
115     }
116     if(m.type != NEWJOB_OK)
117         error("Error getting the newjob_ok");
118 
119     return m.u.jobid;
120 }
121 
c_wait_server_commands()122 int c_wait_server_commands()
123 {
124     struct msg m;
125     int res;
126 
127     while (1)
128     {
129         res = recv_msg(server_socket, &m);
130         if(res == -1)
131             error("Error in wait_server_commands");
132 
133         if (res == 0)
134             break;
135         if(res != sizeof(m))
136             error("Error in wait_server_commands");
137         if (m.type == RUNJOB)
138         {
139             struct Result res;
140             res.skipped = 0;
141             /* These will send RUNJOB_OK */
142             if (command_line.do_depend && m.u.last_errorlevel != 0)
143             {
144                 res.errorlevel = -1;
145                 res.user_ms = 0.;
146                 res.system_ms = 0.;
147                 res.real_ms = 0.;
148                 res.skipped = 1;
149                 c_send_runjob_ok(0, -1);
150             }
151             else
152                 run_job(&res);
153             c_end_of_job(&res);
154             return res.errorlevel;
155         }
156     }
157     return -1;
158 }
159 
c_wait_server_lines()160 void c_wait_server_lines()
161 {
162     struct msg m;
163     int res;
164 
165     while (1)
166     {
167         res = recv_msg(server_socket, &m);
168         if(res == -1)
169             error("Error in wait_server_lines");
170 
171         if (res == 0)
172             break;
173         if(res != sizeof(m))
174             error("Error in wait_server_lines 2");
175         if (m.type == LIST_LINE)
176         {
177             char * buffer;
178             buffer = (char *) malloc(m.u.size);
179             recv_bytes(server_socket, buffer, m.u.size);
180             printf("%s", buffer);
181             free(buffer);
182         }
183     }
184 }
185 
c_list_jobs()186 void c_list_jobs()
187 {
188     struct msg m;
189 
190     m.type = LIST;
191 
192     send_msg(server_socket, &m);
193 }
194 
195 /* Exits if wrong */
c_check_version()196 void c_check_version()
197 {
198     struct msg m;
199     int res;
200 
201     m.type = GET_VERSION;
202     /* Double send, so an old ts will answer for sure at least once */
203     send_msg(server_socket, &m);
204     send_msg(server_socket, &m);
205 
206     /* Set up a 2 second timeout to receive the
207     version msg. */
208 
209     res = recv_msg(server_socket, &m);
210     if(res == -1)
211         error("Error calling recv_msg in c_check_version");
212     if (m.type != VERSION || m.u.version != PROTOCOL_VERSION)
213     {
214         printf("Wrong server version. Received %i, expecting %i\n",
215             m.u.version, PROTOCOL_VERSION);
216 
217         error("Wrong server version. Received %i, expecting %i",
218             m.u.version, PROTOCOL_VERSION);
219     }
220 
221     /* Receive also the 2nd send_msg if we got the right version */
222     res = recv_msg(server_socket, &m);
223     if(res == -1)
224         error("Error calling the 2nd recv_msg in c_check_version");
225 }
226 
c_show_info()227 void c_show_info()
228 {
229     struct msg m;
230     int res;
231 
232     m.type = INFO;
233     m.u.jobid = command_line.jobid;
234 
235     send_msg(server_socket, &m);
236 
237     while (1)
238     {
239         res = recv_msg(server_socket, &m);
240         if(res == -1)
241             error("Error in wait_server_lines");
242 
243         if (res == 0)
244             break;
245         if(res != sizeof(m))
246             error("Error in wait_server_lines 2");
247         if (m.type == INFO_DATA)
248         {
249             char * buffer;
250             enum { DSIZE = 1000 };
251 
252             /* We're going to output data using the stdout fd */
253             fflush(stdout);
254             buffer = (char *) malloc(DSIZE);
255             do
256             {
257                 res = recv(server_socket, buffer, DSIZE, 0);
258                 if (res > 0)
259                     write(1, buffer, res);
260             } while(res > 0);
261             free(buffer);
262         }
263     }
264 }
265 
c_send_runjob_ok(const char * ofname,int pid)266 void c_send_runjob_ok(const char *ofname, int pid)
267 {
268     struct msg m;
269 
270     /* Prepare the message */
271     m.type = RUNJOB_OK;
272     if (ofname) /* ofname == 0, skipped execution */
273 	m.u.output.store_output = command_line.store_output;
274     else
275 	m.u.output.store_output = 0;
276     m.u.output.pid = pid;
277     if (m.u.output.store_output)
278         m.u.output.ofilename_size = strlen(ofname) + 1;
279     else
280         m.u.output.ofilename_size = 0;
281 
282     send_msg(server_socket, &m);
283 
284     /* Send the filename */
285     if (command_line.store_output)
286         send_bytes(server_socket, ofname, m.u.output.ofilename_size);
287 }
288 
c_end_of_job(const struct Result * res)289 static void c_end_of_job(const struct Result *res)
290 {
291     struct msg m;
292 
293     m.type = ENDJOB;
294     m.u.result = *res; /* struct copy */
295 
296     send_msg(server_socket, &m);
297 }
298 
c_shutdown_server()299 void c_shutdown_server()
300 {
301     struct msg m;
302 
303     m.type = KILL_SERVER;
304     send_msg(server_socket, &m);
305 }
306 
c_clear_finished()307 void c_clear_finished()
308 {
309     struct msg m;
310 
311     m.type = CLEAR_FINISHED;
312     send_msg(server_socket, &m);
313 }
314 
get_output_file(int * pid)315 static char * get_output_file(int *pid)
316 {
317     struct msg m;
318     int res;
319     char *string = 0;
320 
321     /* Send the request */
322     m.type = ASK_OUTPUT;
323     m.u.jobid = command_line.jobid;
324     send_msg(server_socket, &m);
325 
326     /* Receive the answer */
327     res = recv_msg(server_socket, &m);
328     if(res != sizeof(m))
329         error("Error in get_output_file");
330     switch(m.type)
331     {
332     case ANSWER_OUTPUT:
333         if (m.u.output.store_output)
334         {
335             /* Receive the output file name */
336             string = 0;
337             if (m.u.output.ofilename_size > 0)
338             {
339                 string = (char *) malloc(m.u.output.ofilename_size);
340                 recv_bytes(server_socket, string, m.u.output.ofilename_size);
341             }
342             *pid = m.u.output.pid;
343             return string;
344         }
345         *pid = m.u.output.pid;
346         return 0;
347         /* WILL NOT GO FURTHER */
348     case LIST_LINE: /* Only ONE line accepted */
349         string = (char *) malloc(m.u.size);
350         res = recv_bytes(server_socket, string, m.u.size);
351         if(res != m.u.size)
352             error("Error in get_output_file line size");
353         fprintf(stderr, "Error in the request: %s",
354                 string);
355         exit(-1);
356         /* WILL NOT GO FURTHER */
357     default:
358         warning("Wrong internal message in get_output_file line size");
359     }
360     /* This will never be reached */
361     return 0;
362 }
363 
c_tail()364 int c_tail()
365 {
366     char *str;
367     int pid;
368     str = get_output_file(&pid);
369     if (str == 0)
370     {
371         fprintf(stderr, "The output is not stored. Cannot tail.\n");
372         exit(-1);
373     }
374 
375     c_wait_running_job_send();
376 
377     return tail_file(str, 10 /* Last lines to show */);
378 }
379 
c_cat()380 int c_cat()
381 {
382     char *str;
383     int pid;
384     str = get_output_file(&pid);
385     if (str == 0)
386     {
387         fprintf(stderr, "The output is not stored. Cannot cat.\n");
388         exit(-1);
389     }
390     c_wait_running_job_send();
391 
392     return tail_file(str, -1 /* All the lines */);
393 }
394 
c_show_output_file()395 void c_show_output_file()
396 {
397     char *str;
398     int pid;
399     /* This will exit if there is any error */
400     str = get_output_file(&pid);
401     if (str == 0)
402     {
403         fprintf(stderr, "The output is not stored.\n");
404         exit(-1);
405     }
406     printf("%s\n", str);
407     free(str);
408 }
409 
c_show_pid()410 void c_show_pid()
411 {
412     int pid;
413     /* This will exit if there is any error */
414     get_output_file(&pid);
415     printf("%i\n", pid);
416 }
417 
c_kill_job()418 void c_kill_job()
419 {
420     int pid = 0;
421     /* This will exit if there is any error */
422     get_output_file(&pid);
423 
424     if (pid == -1 || pid == 0)
425     {
426         fprintf(stderr, "Error: strange PID received: %i\n", pid);
427         exit(-1);
428     }
429 
430     /* Send SIGTERM to the process group, as pid is for process group */
431     kill(-pid, SIGTERM);
432 }
433 
c_remove_job()434 void c_remove_job()
435 {
436     struct msg m;
437     int res;
438     char *string = 0;
439 
440     /* Send the request */
441     m.type = REMOVEJOB;
442     m.u.jobid = command_line.jobid;
443     send_msg(server_socket, &m);
444 
445     /* Receive the answer */
446     res = recv_msg(server_socket, &m);
447     if(res != sizeof(m))
448         error("Error in remove_job");
449     switch(m.type)
450     {
451     case REMOVEJOB_OK:
452         return;
453         /* WILL NOT GO FURTHER */
454     case LIST_LINE: /* Only ONE line accepted */
455         string = (char *) malloc(m.u.size);
456         res = recv_bytes(server_socket, string, m.u.size);
457         fprintf(stderr, "Error in the request: %s",
458                 string);
459         exit(-1);
460         /* WILL NOT GO FURTHER */
461     default:
462         warning("Wrong internal message in remove_job");
463     }
464     /* This will never be reached */
465 }
466 
c_wait_job_recv()467 int c_wait_job_recv()
468 {
469     struct msg m;
470     int res;
471     char *string = 0;
472 
473     /* Receive the answer */
474     res = recv_msg(server_socket, &m);
475     if(res != sizeof(m))
476         error("Error in wait_job");
477     switch(m.type)
478     {
479     case WAITJOB_OK:
480         return m.u.result.errorlevel;
481         /* WILL NOT GO FURTHER */
482     case LIST_LINE: /* Only ONE line accepted */
483         string = (char *) malloc(m.u.size);
484         res = recv_bytes(server_socket, string, m.u.size);
485         if(res != m.u.size)
486             error("Error in wait_job - line size");
487         fprintf(stderr, "Error in the request: %s",
488                 string);
489         exit(-1);
490         /* WILL NOT GO FURTHER */
491     default:
492         warning("Wrong internal message in c_wait_job");
493     }
494     /* This will never be reached */
495     return -1;
496 }
497 
c_wait_job_send()498 static void c_wait_job_send()
499 {
500     struct msg m;
501 
502     /* Send the request */
503     m.type = WAITJOB;
504     m.u.jobid = command_line.jobid;
505     send_msg(server_socket, &m);
506 }
507 
c_wait_running_job_send()508 static void c_wait_running_job_send()
509 {
510     struct msg m;
511 
512     /* Send the request */
513     m.type = WAIT_RUNNING_JOB;
514     m.u.jobid = command_line.jobid;
515     send_msg(server_socket, &m);
516 }
517 
518 /* Returns the errorlevel */
c_wait_job()519 int c_wait_job()
520 {
521     c_wait_job_send();
522     return c_wait_job_recv();
523 }
524 
525 /* Returns the errorlevel */
c_wait_running_job()526 int c_wait_running_job()
527 {
528     c_wait_running_job_send();
529     return c_wait_job_recv();
530 }
531 
c_send_max_slots(int max_slots)532 void c_send_max_slots(int max_slots)
533 {
534     struct msg m;
535 
536     /* Send the request */
537     m.type = SET_MAX_SLOTS;
538     m.u.max_slots = command_line.max_slots;
539     send_msg(server_socket, &m);
540 }
541 
c_get_max_slots()542 void c_get_max_slots()
543 {
544     struct msg m;
545     int res;
546 
547     /* Send the request */
548     m.type = GET_MAX_SLOTS;
549     m.u.max_slots = command_line.max_slots;
550     send_msg(server_socket, &m);
551 
552     /* Receive the answer */
553     res = recv_msg(server_socket, &m);
554     if(res != sizeof(m))
555         error("Error in move_urgent");
556     switch(m.type)
557     {
558         case GET_MAX_SLOTS_OK:
559             printf("%i\n", m.u.max_slots);
560             return;
561         default:
562             warning("Wrong internal message in get_max_slots");
563     }
564 }
565 
c_move_urgent()566 void c_move_urgent()
567 {
568     struct msg m;
569     int res;
570     char *string = 0;
571 
572     /* Send the request */
573     m.type = URGENT;
574     m.u.jobid = command_line.jobid;
575     send_msg(server_socket, &m);
576 
577     /* Receive the answer */
578     res = recv_msg(server_socket, &m);
579     if(res != sizeof(m))
580         error("Error in move_urgent");
581     switch(m.type)
582     {
583     case URGENT_OK:
584         return;
585         /* WILL NOT GO FURTHER */
586     case LIST_LINE: /* Only ONE line accepted */
587         string = (char *) malloc(m.u.size);
588         res = recv_bytes(server_socket, string, m.u.size);
589         if(res != m.u.size)
590             error("Error in move_urgent - line size");
591         fprintf(stderr, "Error in the request: %s",
592                 string);
593         exit(-1);
594         /* WILL NOT GO FURTHER */
595     default:
596         warning("Wrong internal message in move_urgent");
597     }
598     /* This will never be reached */
599     return;
600 }
601 
c_get_state()602 void c_get_state()
603 {
604     struct msg m;
605     int res;
606     char *string = 0;
607 
608     /* Send the request */
609     m.type = GET_STATE;
610     m.u.jobid = command_line.jobid;
611     send_msg(server_socket, &m);
612 
613     /* Receive the answer */
614     res = recv_msg(server_socket, &m);
615     if(res != sizeof(m))
616         error("Error in get_state - line size");
617     switch(m.type)
618     {
619     case ANSWER_STATE:
620         printf("%s\n", jstate2string(m.u.state));
621         return;
622         /* WILL NOT GO FURTHER */
623     case LIST_LINE: /* Only ONE line accepted */
624         string = (char *) malloc(m.u.size);
625         res = recv_bytes(server_socket, string, m.u.size);
626         if(res != m.u.size)
627             error("Error in get_state - line size");
628         fprintf(stderr, "Error in the request: %s",
629                 string);
630         exit(-1);
631         /* WILL NOT GO FURTHER */
632     default:
633         warning("Wrong internal message in get_state");
634     }
635     /* This will never be reached */
636     return;
637 }
638 
c_swap_jobs()639 void c_swap_jobs()
640 {
641     struct msg m;
642     int res;
643     char *string = 0;
644 
645     /* Send the request */
646     m.type = SWAP_JOBS;
647     m.u.swap.jobid1 = command_line.jobid;
648     m.u.swap.jobid2 = command_line.jobid2;
649     send_msg(server_socket, &m);
650 
651     /* Receive the answer */
652     res = recv_msg(server_socket, &m);
653     if(res != sizeof(m))
654         error("Error in swap_jobs");
655     switch(m.type)
656     {
657     case SWAP_JOBS_OK:
658         return;
659         /* WILL NOT GO FURTHER */
660     case LIST_LINE: /* Only ONE line accepted */
661         string = (char *) malloc(m.u.size);
662         res = recv_bytes(server_socket, string, m.u.size);
663         if(res != m.u.size)
664             error("Error in swap_jobs - line size");
665         fprintf(stderr, "Error in the request: %s",
666                 string);
667         exit(-1);
668         /* WILL NOT GO FURTHER */
669     default:
670         warning("Wrong internal message in swap_jobs");
671     }
672     /* This will never be reached */
673     return;
674 }
675