1 /*
2 Task Spooler - a task queue system for the unix user
3 Copyright (C) 2007-2009 Lluís Batlle i Rossell
4
5 Please find the license in the provided COPYING file.
6 */
7 #include <stdio.h>
8 #include <string.h>
9 #include <unistd.h>
10 #include <stdlib.h>
11 #include <sys/types.h>
12 #include <sys/socket.h>
13 #include <sys/time.h>
14 #include <signal.h>
15 #include "main.h"
16
17 static void c_end_of_job(const struct Result *res);
18 static void c_wait_job_send();
19 static void c_wait_running_job_send();
20
build_command_string()21 char *build_command_string()
22 {
23 int size;
24 int i;
25 int num;
26 char **array;
27 char *commandstring;
28
29 size = 0;
30 num = command_line.command.num;
31 array = command_line.command.array;
32
33 /* Count bytes needed */
34 for (i = 0; i < num; ++i)
35 {
36 /* The '1' is for spaces, and at the last i,
37 * for the null character */
38 size = size + strlen(array[i]) + 1;
39 }
40
41 /* Alloc */
42 commandstring = (char *) malloc(size);
43 if(commandstring == NULL)
44 error("Error in malloc for commandstring");
45
46 /* Build the command */
47 strcpy(commandstring, array[0]);
48 for (i = 1; i < num; ++i)
49 {
50 strcat(commandstring, " ");
51 strcat(commandstring, array[i]);
52 }
53
54 return commandstring;
55 }
56
c_new_job()57 void c_new_job()
58 {
59 struct msg m;
60 char *new_command;
61 char *myenv;
62
63 m.type = NEWJOB;
64
65 new_command = build_command_string();
66
67 myenv = get_environment();
68
69 /* global */
70 m.u.newjob.command_size = strlen(new_command) + 1; /* add null */
71 if (myenv)
72 m.u.newjob.env_size = strlen(myenv) + 1; /* add null */
73 else
74 m.u.newjob.env_size = 0;
75 if (command_line.label)
76 m.u.newjob.label_size = strlen(command_line.label) + 1; /* add null */
77 else
78 m.u.newjob.label_size = 0;
79 m.u.newjob.store_output = command_line.store_output;
80 m.u.newjob.do_depend = command_line.do_depend;
81 m.u.newjob.depend_on = command_line.depend_on;
82 m.u.newjob.should_keep_finished = command_line.should_keep_finished;
83 m.u.newjob.command_size = strlen(new_command) + 1; /* add null */
84 m.u.newjob.wait_enqueuing = command_line.wait_enqueuing;
85 m.u.newjob.num_slots = command_line.num_slots;
86
87 /* Send the message */
88 send_msg(server_socket, &m);
89
90 /* Send the command */
91 send_bytes(server_socket, new_command, m.u.newjob.command_size);
92
93 /* Send the label */
94 send_bytes(server_socket, command_line.label, m.u.newjob.label_size);
95
96 /* Send the environment */
97 send_bytes(server_socket, myenv, m.u.newjob.env_size);
98
99 free(new_command);
100 free(myenv);
101 }
102
c_wait_newjob_ok()103 int c_wait_newjob_ok()
104 {
105 struct msg m;
106 int res;
107
108 res = recv_msg(server_socket, &m);
109 if(res == -1)
110 error("Error in wait_newjob_ok");
111 if(m.type == NEWJOB_NOK)
112 {
113 fprintf(stderr, "Error, queue full\n");
114 exit(EXITCODE_QUEUE_FULL);
115 }
116 if(m.type != NEWJOB_OK)
117 error("Error getting the newjob_ok");
118
119 return m.u.jobid;
120 }
121
c_wait_server_commands()122 int c_wait_server_commands()
123 {
124 struct msg m;
125 int res;
126
127 while (1)
128 {
129 res = recv_msg(server_socket, &m);
130 if(res == -1)
131 error("Error in wait_server_commands");
132
133 if (res == 0)
134 break;
135 if(res != sizeof(m))
136 error("Error in wait_server_commands");
137 if (m.type == RUNJOB)
138 {
139 struct Result res;
140 res.skipped = 0;
141 /* These will send RUNJOB_OK */
142 if (command_line.do_depend && m.u.last_errorlevel != 0)
143 {
144 res.errorlevel = -1;
145 res.user_ms = 0.;
146 res.system_ms = 0.;
147 res.real_ms = 0.;
148 res.skipped = 1;
149 c_send_runjob_ok(0, -1);
150 }
151 else
152 run_job(&res);
153 c_end_of_job(&res);
154 return res.errorlevel;
155 }
156 }
157 return -1;
158 }
159
c_wait_server_lines()160 void c_wait_server_lines()
161 {
162 struct msg m;
163 int res;
164
165 while (1)
166 {
167 res = recv_msg(server_socket, &m);
168 if(res == -1)
169 error("Error in wait_server_lines");
170
171 if (res == 0)
172 break;
173 if(res != sizeof(m))
174 error("Error in wait_server_lines 2");
175 if (m.type == LIST_LINE)
176 {
177 char * buffer;
178 buffer = (char *) malloc(m.u.size);
179 recv_bytes(server_socket, buffer, m.u.size);
180 printf("%s", buffer);
181 free(buffer);
182 }
183 }
184 }
185
c_list_jobs()186 void c_list_jobs()
187 {
188 struct msg m;
189
190 m.type = LIST;
191
192 send_msg(server_socket, &m);
193 }
194
195 /* Exits if wrong */
c_check_version()196 void c_check_version()
197 {
198 struct msg m;
199 int res;
200
201 m.type = GET_VERSION;
202 /* Double send, so an old ts will answer for sure at least once */
203 send_msg(server_socket, &m);
204 send_msg(server_socket, &m);
205
206 /* Set up a 2 second timeout to receive the
207 version msg. */
208
209 res = recv_msg(server_socket, &m);
210 if(res == -1)
211 error("Error calling recv_msg in c_check_version");
212 if (m.type != VERSION || m.u.version != PROTOCOL_VERSION)
213 {
214 printf("Wrong server version. Received %i, expecting %i\n",
215 m.u.version, PROTOCOL_VERSION);
216
217 error("Wrong server version. Received %i, expecting %i",
218 m.u.version, PROTOCOL_VERSION);
219 }
220
221 /* Receive also the 2nd send_msg if we got the right version */
222 res = recv_msg(server_socket, &m);
223 if(res == -1)
224 error("Error calling the 2nd recv_msg in c_check_version");
225 }
226
c_show_info()227 void c_show_info()
228 {
229 struct msg m;
230 int res;
231
232 m.type = INFO;
233 m.u.jobid = command_line.jobid;
234
235 send_msg(server_socket, &m);
236
237 while (1)
238 {
239 res = recv_msg(server_socket, &m);
240 if(res == -1)
241 error("Error in wait_server_lines");
242
243 if (res == 0)
244 break;
245 if(res != sizeof(m))
246 error("Error in wait_server_lines 2");
247 if (m.type == INFO_DATA)
248 {
249 char * buffer;
250 enum { DSIZE = 1000 };
251
252 /* We're going to output data using the stdout fd */
253 fflush(stdout);
254 buffer = (char *) malloc(DSIZE);
255 do
256 {
257 res = recv(server_socket, buffer, DSIZE, 0);
258 if (res > 0)
259 write(1, buffer, res);
260 } while(res > 0);
261 free(buffer);
262 }
263 }
264 }
265
c_send_runjob_ok(const char * ofname,int pid)266 void c_send_runjob_ok(const char *ofname, int pid)
267 {
268 struct msg m;
269
270 /* Prepare the message */
271 m.type = RUNJOB_OK;
272 if (ofname) /* ofname == 0, skipped execution */
273 m.u.output.store_output = command_line.store_output;
274 else
275 m.u.output.store_output = 0;
276 m.u.output.pid = pid;
277 if (m.u.output.store_output)
278 m.u.output.ofilename_size = strlen(ofname) + 1;
279 else
280 m.u.output.ofilename_size = 0;
281
282 send_msg(server_socket, &m);
283
284 /* Send the filename */
285 if (command_line.store_output)
286 send_bytes(server_socket, ofname, m.u.output.ofilename_size);
287 }
288
c_end_of_job(const struct Result * res)289 static void c_end_of_job(const struct Result *res)
290 {
291 struct msg m;
292
293 m.type = ENDJOB;
294 m.u.result = *res; /* struct copy */
295
296 send_msg(server_socket, &m);
297 }
298
c_shutdown_server()299 void c_shutdown_server()
300 {
301 struct msg m;
302
303 m.type = KILL_SERVER;
304 send_msg(server_socket, &m);
305 }
306
c_clear_finished()307 void c_clear_finished()
308 {
309 struct msg m;
310
311 m.type = CLEAR_FINISHED;
312 send_msg(server_socket, &m);
313 }
314
get_output_file(int * pid)315 static char * get_output_file(int *pid)
316 {
317 struct msg m;
318 int res;
319 char *string = 0;
320
321 /* Send the request */
322 m.type = ASK_OUTPUT;
323 m.u.jobid = command_line.jobid;
324 send_msg(server_socket, &m);
325
326 /* Receive the answer */
327 res = recv_msg(server_socket, &m);
328 if(res != sizeof(m))
329 error("Error in get_output_file");
330 switch(m.type)
331 {
332 case ANSWER_OUTPUT:
333 if (m.u.output.store_output)
334 {
335 /* Receive the output file name */
336 string = 0;
337 if (m.u.output.ofilename_size > 0)
338 {
339 string = (char *) malloc(m.u.output.ofilename_size);
340 recv_bytes(server_socket, string, m.u.output.ofilename_size);
341 }
342 *pid = m.u.output.pid;
343 return string;
344 }
345 *pid = m.u.output.pid;
346 return 0;
347 /* WILL NOT GO FURTHER */
348 case LIST_LINE: /* Only ONE line accepted */
349 string = (char *) malloc(m.u.size);
350 res = recv_bytes(server_socket, string, m.u.size);
351 if(res != m.u.size)
352 error("Error in get_output_file line size");
353 fprintf(stderr, "Error in the request: %s",
354 string);
355 exit(-1);
356 /* WILL NOT GO FURTHER */
357 default:
358 warning("Wrong internal message in get_output_file line size");
359 }
360 /* This will never be reached */
361 return 0;
362 }
363
/* Shows the last lines of the stored output of job command_line.jobid.
 * Exits with -1 if get_output_file() gives no file name.
 * Returns what tail_file() returns. */
int c_tail()
{
    char *str;
    int pid;
    int res;

    str = get_output_file(&pid);
    if (str == 0)
    {
        fprintf(stderr, "The output is not stored. Cannot tail.\n");
        exit(-1);
    }

    c_wait_running_job_send();

    res = tail_file(str, 10 /* Last lines to show */);

    /* The name was allocated for us by get_output_file */
    free(str);

    return res;
}
379
/* Shows the whole stored output of job command_line.jobid.
 * Exits with -1 if get_output_file() gives no file name.
 * Returns what tail_file() returns. */
int c_cat()
{
    char *str;
    int pid;
    int res;

    str = get_output_file(&pid);
    if (str == 0)
    {
        fprintf(stderr, "The output is not stored. Cannot cat.\n");
        exit(-1);
    }
    c_wait_running_job_send();

    res = tail_file(str, -1 /* All the lines */);

    /* The name was allocated for us by get_output_file */
    free(str);

    return res;
}
394
/* Prints the name of the output file of job command_line.jobid, followed
 * by a newline. Exits with -1 if get_output_file() gives no name. */
void c_show_output_file()
{
    int job_pid;
    char *path;

    /* This will exit if there is any error */
    path = get_output_file(&job_pid);

    if (path != 0)
    {
        printf("%s\n", path);
        free(path);
        return;
    }

    fprintf(stderr, "The output is not stored.\n");
    exit(-1);
}
409
/* Prints the pid the server reports for job command_line.jobid, followed
 * by a newline. Prints -1 if get_output_file() leaves the pid untouched. */
void c_show_pid()
{
    /* Defined even if get_output_file does not set it */
    int pid = -1;
    char *ofname;

    /* This will exit if there is any error */
    ofname = get_output_file(&pid);
    /* Only the pid is wanted; the file name, if any, is ours to free */
    free(ofname);

    printf("%i\n", pid);
}
417
/* Sends SIGTERM to the process group of job command_line.jobid, using the
 * pid obtained through get_output_file(). Exits with -1 if the pid
 * received cannot be the id of a job's process group. */
void c_kill_job()
{
    int pid = 0;
    char *ofname;

    /* This will exit if there is any error */
    ofname = get_output_file(&pid);
    /* Only the pid is wanted; the file name, if any, is ours to free */
    free(ofname);

    /* Zero or negative values are not a pid, and 1 would turn the call
     * below into kill(-1, ...), which signals every process we are
     * allowed to signal. */
    if (pid <= 1)
    {
        fprintf(stderr, "Error: strange PID received: %i\n", pid);
        exit(-1);
    }

    /* Send SIGTERM to the process group, as pid is for process group */
    kill(-pid, SIGTERM);
}
433
c_remove_job()434 void c_remove_job()
435 {
436 struct msg m;
437 int res;
438 char *string = 0;
439
440 /* Send the request */
441 m.type = REMOVEJOB;
442 m.u.jobid = command_line.jobid;
443 send_msg(server_socket, &m);
444
445 /* Receive the answer */
446 res = recv_msg(server_socket, &m);
447 if(res != sizeof(m))
448 error("Error in remove_job");
449 switch(m.type)
450 {
451 case REMOVEJOB_OK:
452 return;
453 /* WILL NOT GO FURTHER */
454 case LIST_LINE: /* Only ONE line accepted */
455 string = (char *) malloc(m.u.size);
456 res = recv_bytes(server_socket, string, m.u.size);
457 fprintf(stderr, "Error in the request: %s",
458 string);
459 exit(-1);
460 /* WILL NOT GO FURTHER */
461 default:
462 warning("Wrong internal message in remove_job");
463 }
464 /* This will never be reached */
465 }
466
c_wait_job_recv()467 int c_wait_job_recv()
468 {
469 struct msg m;
470 int res;
471 char *string = 0;
472
473 /* Receive the answer */
474 res = recv_msg(server_socket, &m);
475 if(res != sizeof(m))
476 error("Error in wait_job");
477 switch(m.type)
478 {
479 case WAITJOB_OK:
480 return m.u.result.errorlevel;
481 /* WILL NOT GO FURTHER */
482 case LIST_LINE: /* Only ONE line accepted */
483 string = (char *) malloc(m.u.size);
484 res = recv_bytes(server_socket, string, m.u.size);
485 if(res != m.u.size)
486 error("Error in wait_job - line size");
487 fprintf(stderr, "Error in the request: %s",
488 string);
489 exit(-1);
490 /* WILL NOT GO FURTHER */
491 default:
492 warning("Wrong internal message in c_wait_job");
493 }
494 /* This will never be reached */
495 return -1;
496 }
497
c_wait_job_send()498 static void c_wait_job_send()
499 {
500 struct msg m;
501
502 /* Send the request */
503 m.type = WAITJOB;
504 m.u.jobid = command_line.jobid;
505 send_msg(server_socket, &m);
506 }
507
c_wait_running_job_send()508 static void c_wait_running_job_send()
509 {
510 struct msg m;
511
512 /* Send the request */
513 m.type = WAIT_RUNNING_JOB;
514 m.u.jobid = command_line.jobid;
515 send_msg(server_socket, &m);
516 }
517
/* Sends the wait request for the job and reads the answer.
 * Returns what c_wait_job_recv() returns (the errorlevel). */
int c_wait_job()
{
    int errorlevel;

    c_wait_job_send();
    errorlevel = c_wait_job_recv();

    return errorlevel;
}
524
/* Sends the wait-running request for the job and reads the answer.
 * Returns what c_wait_job_recv() returns (the errorlevel). */
int c_wait_running_job()
{
    int errorlevel;

    c_wait_running_job_send();
    errorlevel = c_wait_job_recv();

    return errorlevel;
}
531
c_send_max_slots(int max_slots)532 void c_send_max_slots(int max_slots)
533 {
534 struct msg m;
535
536 /* Send the request */
537 m.type = SET_MAX_SLOTS;
538 m.u.max_slots = command_line.max_slots;
539 send_msg(server_socket, &m);
540 }
541
c_get_max_slots()542 void c_get_max_slots()
543 {
544 struct msg m;
545 int res;
546
547 /* Send the request */
548 m.type = GET_MAX_SLOTS;
549 m.u.max_slots = command_line.max_slots;
550 send_msg(server_socket, &m);
551
552 /* Receive the answer */
553 res = recv_msg(server_socket, &m);
554 if(res != sizeof(m))
555 error("Error in move_urgent");
556 switch(m.type)
557 {
558 case GET_MAX_SLOTS_OK:
559 printf("%i\n", m.u.max_slots);
560 return;
561 default:
562 warning("Wrong internal message in get_max_slots");
563 }
564 }
565
c_move_urgent()566 void c_move_urgent()
567 {
568 struct msg m;
569 int res;
570 char *string = 0;
571
572 /* Send the request */
573 m.type = URGENT;
574 m.u.jobid = command_line.jobid;
575 send_msg(server_socket, &m);
576
577 /* Receive the answer */
578 res = recv_msg(server_socket, &m);
579 if(res != sizeof(m))
580 error("Error in move_urgent");
581 switch(m.type)
582 {
583 case URGENT_OK:
584 return;
585 /* WILL NOT GO FURTHER */
586 case LIST_LINE: /* Only ONE line accepted */
587 string = (char *) malloc(m.u.size);
588 res = recv_bytes(server_socket, string, m.u.size);
589 if(res != m.u.size)
590 error("Error in move_urgent - line size");
591 fprintf(stderr, "Error in the request: %s",
592 string);
593 exit(-1);
594 /* WILL NOT GO FURTHER */
595 default:
596 warning("Wrong internal message in move_urgent");
597 }
598 /* This will never be reached */
599 return;
600 }
601
c_get_state()602 void c_get_state()
603 {
604 struct msg m;
605 int res;
606 char *string = 0;
607
608 /* Send the request */
609 m.type = GET_STATE;
610 m.u.jobid = command_line.jobid;
611 send_msg(server_socket, &m);
612
613 /* Receive the answer */
614 res = recv_msg(server_socket, &m);
615 if(res != sizeof(m))
616 error("Error in get_state - line size");
617 switch(m.type)
618 {
619 case ANSWER_STATE:
620 printf("%s\n", jstate2string(m.u.state));
621 return;
622 /* WILL NOT GO FURTHER */
623 case LIST_LINE: /* Only ONE line accepted */
624 string = (char *) malloc(m.u.size);
625 res = recv_bytes(server_socket, string, m.u.size);
626 if(res != m.u.size)
627 error("Error in get_state - line size");
628 fprintf(stderr, "Error in the request: %s",
629 string);
630 exit(-1);
631 /* WILL NOT GO FURTHER */
632 default:
633 warning("Wrong internal message in get_state");
634 }
635 /* This will never be reached */
636 return;
637 }
638
c_swap_jobs()639 void c_swap_jobs()
640 {
641 struct msg m;
642 int res;
643 char *string = 0;
644
645 /* Send the request */
646 m.type = SWAP_JOBS;
647 m.u.swap.jobid1 = command_line.jobid;
648 m.u.swap.jobid2 = command_line.jobid2;
649 send_msg(server_socket, &m);
650
651 /* Receive the answer */
652 res = recv_msg(server_socket, &m);
653 if(res != sizeof(m))
654 error("Error in swap_jobs");
655 switch(m.type)
656 {
657 case SWAP_JOBS_OK:
658 return;
659 /* WILL NOT GO FURTHER */
660 case LIST_LINE: /* Only ONE line accepted */
661 string = (char *) malloc(m.u.size);
662 res = recv_bytes(server_socket, string, m.u.size);
663 if(res != m.u.size)
664 error("Error in swap_jobs - line size");
665 fprintf(stderr, "Error in the request: %s",
666 string);
667 exit(-1);
668 /* WILL NOT GO FURTHER */
669 default:
670 warning("Wrong internal message in swap_jobs");
671 }
672 /* This will never be reached */
673 return;
674 }
675