1 /*****************************************************************************\
2 * $Id: pstdout.c,v 1.10 2010-02-10 01:27:44 chu11 Exp $
3 *****************************************************************************
4 * Copyright (C) 2007-2015 Lawrence Livermore National Security, LLC.
5 * Copyright (C) 2007 The Regents of the University of California.
6 * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
7 * Written by Albert Chu <chu11@llnl.gov>
8 * UCRL-CODE-227589
9 *
10 * This file is part of pstdout, a library used to launch and manage
11 * the standard output of multiple threads. For details, see
12 * http://www.llnl.gov/linux/.
13 *
14 * Pstdout is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by the
16 * Free Software Foundation; either version 3 of the License, or (at your
17 * option) any later version.
18 *
19 * Pstdout is distributed in the hope that it will be useful, but
20 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
21 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22 * for more details.
23 *
24 * You should have received a copy of the GNU General Public License along
25 * with Pstdout. If not, see <http://www.gnu.org/licenses/>.
26 \*****************************************************************************/
27
28 /*
29 * Notes:
30 *
31 * Needs to be compiled with -D_REENTRANT
32 */
33
34 #ifdef HAVE_CONFIG_H
35 #include "config.h"
36 #endif /* HAVE_CONFIG_H */
37
38 #include <stdio.h>
39 #include <stdlib.h>
40 #include <stdint.h>
41 #if STDC_HEADERS
42 #include <string.h>
43 #endif /* STDC_HEADERS */
44 #if HAVE_PTHREAD_H
45 #include <pthread.h>
46 #endif /* HAVE_PTHREAD_H */
47 #include <signal.h>
48 #include <assert.h>
49 #include <errno.h>
50
51 #include "pstdout.h"
52 #include "cbuf.h"
53 #include "fi_hostlist.h"
54 #include "list.h"
55
56 /* max hostrange size is typically 16 bytes
57 *
58 * counting the comma, a 1000 node non-expanded hostrange list could
59 * be (16 + 1) * 1000 = 17000 bytes.
60 *
61 * So we round up from there to the nearest 16K
62 */
63 #define PSTDOUT_BUFLEN 32768
64
65 static char * pstdout_errmsg[] =
66 {
67 "success",
68 "library uninitialized",
69 "incorrect parameters passed in",
70 "out of memory",
71 "unknown internal error",
72 "error number out of range",
73 };
74
75 static uint32_t pstdout_debug_flags = PSTDOUT_DEBUG_NONE;
76 static uint32_t pstdout_output_flags = PSTDOUT_OUTPUT_STDOUT_DEFAULT | PSTDOUT_OUTPUT_STDERR_DEFAULT;
77 static unsigned int pstdout_fanout = PSTDOUT_FANOUT_DEFAULT;
78
79 static pthread_mutex_t pstdout_threadcount_mutex = PTHREAD_MUTEX_INITIALIZER;
80 static pthread_cond_t pstdout_threadcount_cond = PTHREAD_COND_INITIALIZER;
81 static int pstdout_threadcount = 0;
82
83 struct pstdout_thread_data {
84 char *hostname;
85 pthread_t tid;
86 pthread_attr_t attr;
87 int exit_code;
88 Pstdout_Thread pstdout_func;
89 void *arg;
90 };
91
92 struct pstdout_state {
93 uint32_t magic;
94 char *hostname;
95 cbuf_t p_stdout;
96 cbuf_t p_stderr;
97 char *buffer_stdout;
98 char *buffer_stderr;
99 unsigned int buffer_stdout_len;
100 unsigned int buffer_stderr_len;
101 int no_more_external_output;
102 pthread_mutex_t mutex;
103 };
104
105 #define PSTDOUT_STATE_MAGIC 0x76309ab3
106 #define PSTDOUT_STATE_CBUF_MIN 32
107 #define PSTDOUT_STATE_CBUF_MAX 2048
108
109 int pstdout_errnum = PSTDOUT_ERR_SUCCESS;
110
111 struct pstdout_consolidated_data {
112 fi_hostlist_t h;
113 char *output;
114 };
115
116 static List pstdout_consolidated_stdout = NULL;
117 static List pstdout_consolidated_stderr = NULL;
118
119 static pthread_mutex_t pstdout_consolidated_stdout_mutex = PTHREAD_MUTEX_INITIALIZER;
120 static pthread_mutex_t pstdout_consolidated_stderr_mutex = PTHREAD_MUTEX_INITIALIZER;
121
122 static int pstdout_initialized = 0;
123
124 static pthread_mutex_t pstdout_launch_mutex = PTHREAD_MUTEX_INITIALIZER;
125
126 static List pstdout_states = NULL;
127 static pthread_mutex_t pstdout_states_mutex = PTHREAD_MUTEX_INITIALIZER;
128
129 #ifndef HAVE_SIGHANDLER_T
130 typedef void (*sighandler_t)(int);
131 #endif /* HAVE_SIGHANDLER_T */
132
133 static struct pstdout_consolidated_data *
_pstdout_consolidated_data_create(const char * hostname,const char * output)134 _pstdout_consolidated_data_create(const char *hostname, const char *output)
135 {
136 struct pstdout_consolidated_data *cdata = NULL;
137
138 assert(hostname);
139 assert(output);
140
141 if (!(cdata = (struct pstdout_consolidated_data *)malloc(sizeof(struct pstdout_consolidated_data))))
142 {
143 pstdout_errnum = PSTDOUT_ERR_OUTMEM;
144 goto cleanup;
145 }
146 cdata->h = NULL;
147 cdata->output = NULL;
148
149 if (!(cdata->h = fi_hostlist_create(hostname)))
150 {
151 pstdout_errnum = PSTDOUT_ERR_OUTMEM;
152 goto cleanup;
153 }
154
155 if (!(cdata->output = strdup(output)))
156 {
157 pstdout_errnum = PSTDOUT_ERR_OUTMEM;
158 goto cleanup;
159 }
160
161 return cdata;
162
163 cleanup:
164 if (cdata)
165 {
166 if (cdata->h)
167 fi_hostlist_destroy(cdata->h);
168 free(cdata->output);
169 free(cdata);
170 }
171 return NULL;
172 }
173
174 static void
_pstdout_consolidated_data_destroy(void * x)175 _pstdout_consolidated_data_destroy(void *x)
176 {
177 struct pstdout_consolidated_data *cdata;
178
179 assert(x);
180
181 cdata = (struct pstdout_consolidated_data *)x;
182 if (cdata->h)
183 fi_hostlist_destroy(cdata->h);
184 free(cdata->output);
185 free(cdata);
186 }
187
188 static int
_pstdout_consolidated_data_compare(void * x,void * y)189 _pstdout_consolidated_data_compare(void *x, void *y)
190 {
191 struct pstdout_consolidated_data *cdataX;
192 struct pstdout_consolidated_data *cdataY;
193 int h_countX, h_countY;
194
195 assert(x);
196 assert(y);
197
198 cdataX = (struct pstdout_consolidated_data *)x;
199 cdataY = (struct pstdout_consolidated_data *)x;
200
201 assert(cdataX->h);
202 assert(cdataY->h);
203
204 h_countX = fi_hostlist_count(cdataX->h);
205 h_countY = fi_hostlist_count(cdataY->h);
206
207 if (h_countX < h_countY)
208 return -1;
209 if (h_countX > h_countY)
210 return 1;
211 return 0;
212 }
213
214 static int
_pstdout_consolidated_data_find(void * x,void * key)215 _pstdout_consolidated_data_find(void *x, void *key)
216 {
217 struct pstdout_consolidated_data *cdata;
218
219 assert(x);
220 assert(key);
221
222 cdata = (struct pstdout_consolidated_data *)x;
223
224 assert(cdata->output);
225
226 if (!strcmp(cdata->output, (char *)key))
227 return 1;
228 return 0;
229 }
230
231 static int
_pstdout_consolidated_data_delete_all(void * x,void * key)232 _pstdout_consolidated_data_delete_all(void *x, void *key)
233 {
234 return 1;
235 }
236
237 static int
_pstdout_states_delete_pointer(void * x,void * key)238 _pstdout_states_delete_pointer(void *x, void *key)
239 {
240 if (x == key)
241 return 1;
242 return 0;
243 }
244
245 int
pstdout_init(void)246 pstdout_init(void)
247 {
248 if (!pstdout_initialized)
249 {
250 if (!(pstdout_consolidated_stdout = list_create((ListDelF)_pstdout_consolidated_data_destroy)))
251 {
252 pstdout_errnum = PSTDOUT_ERR_OUTMEM;
253 goto cleanup;
254 }
255 if (!(pstdout_consolidated_stderr = list_create((ListDelF)_pstdout_consolidated_data_destroy)))
256 {
257 pstdout_errnum = PSTDOUT_ERR_OUTMEM;
258 goto cleanup;
259 }
260 if (!(pstdout_states = list_create((ListDelF)NULL)))
261 {
262 pstdout_errnum = PSTDOUT_ERR_OUTMEM;
263 goto cleanup;
264 }
265 pstdout_initialized++;
266 }
267
268 return 0;
269
270 cleanup:
271 if (pstdout_consolidated_stdout)
272 list_destroy(pstdout_consolidated_stdout);
273 if (pstdout_consolidated_stderr)
274 list_destroy(pstdout_consolidated_stderr);
275 if (pstdout_states)
276 list_destroy(pstdout_states);
277 return -1;
278 }
279
280 char *
pstdout_strerror(int errnum)281 pstdout_strerror(int errnum)
282 {
283 if (errnum >= PSTDOUT_ERR_SUCCESS && errnum <= PSTDOUT_ERR_ERRNUMRANGE)
284 return pstdout_errmsg[errnum];
285 else
286 return pstdout_errmsg[PSTDOUT_ERR_ERRNUMRANGE];
287 }
288
289 int
pstdout_set_debug_flags(unsigned int debug_flags)290 pstdout_set_debug_flags(unsigned int debug_flags)
291 {
292 int rc, rv = -1;
293
294 if (debug_flags & ~PSTDOUT_DEBUG_MASK)
295 {
296 pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
297 return -1;
298 }
299
300 if (debug_flags & PSTDOUT_DEBUG_NONE
301 && debug_flags & PSTDOUT_DEBUG_STANDARD)
302 {
303 pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
304 return -1;
305 }
306
307 if ((rc = pthread_mutex_lock(&pstdout_launch_mutex)))
308 {
309 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
310 fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
311 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
312 goto cleanup;
313 }
314
315 pstdout_debug_flags = debug_flags;
316
317
318 pstdout_errnum = PSTDOUT_ERR_SUCCESS;
319 rv = 0;
320 cleanup:
321 if ((rc = pthread_mutex_unlock(&pstdout_launch_mutex)))
322 {
323 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
324 fprintf(stderr, "pthread_mutex_unlock: %s\n", strerror(rc));
325 /* Don't change error code, just move on */
326 }
327 return rv;
328 }
329
330 int
pstdout_get_debug_flags(void)331 pstdout_get_debug_flags(void)
332 {
333 pstdout_errnum = PSTDOUT_ERR_SUCCESS;
334 return pstdout_debug_flags;
335 }
336
337 int
pstdout_set_output_flags(unsigned int output_flags)338 pstdout_set_output_flags(unsigned int output_flags)
339 {
340 int rc, rv = -1;
341
342 if (output_flags & ~PSTDOUT_OUTPUT_MASK)
343 {
344 pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
345 return -1;
346 }
347
348 if (((output_flags & PSTDOUT_OUTPUT_STDOUT_DEFAULT)
349 && (output_flags & PSTDOUT_OUTPUT_STDOUT_PREPEND_HOSTNAME))
350 || ((output_flags & PSTDOUT_OUTPUT_STDERR_DEFAULT)
351 && (output_flags & PSTDOUT_OUTPUT_STDERR_PREPEND_HOSTNAME))
352 || (!(output_flags & PSTDOUT_OUTPUT_STDOUT_DEFAULT)
353 && !(output_flags & PSTDOUT_OUTPUT_STDOUT_PREPEND_HOSTNAME))
354 || (!(output_flags & PSTDOUT_OUTPUT_STDERR_DEFAULT)
355 && !(output_flags & PSTDOUT_OUTPUT_STDERR_PREPEND_HOSTNAME)))
356 {
357 pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
358 return -1;
359 }
360
361 if (((output_flags & PSTDOUT_OUTPUT_STDOUT_PREPEND_HOSTNAME)
362 && (output_flags & PSTDOUT_OUTPUT_STDOUT_CONSOLIDATE))
363 || ((output_flags & PSTDOUT_OUTPUT_STDERR_PREPEND_HOSTNAME)
364 && (output_flags & PSTDOUT_OUTPUT_STDERR_CONSOLIDATE)))
365 {
366 pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
367 return -1;
368 }
369
370 if (((output_flags & PSTDOUT_OUTPUT_BUFFER_STDOUT)
371 && (output_flags & PSTDOUT_OUTPUT_STDOUT_CONSOLIDATE))
372 || ((output_flags & PSTDOUT_OUTPUT_BUFFER_STDERR)
373 && (output_flags & PSTDOUT_OUTPUT_STDERR_CONSOLIDATE)))
374 {
375 pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
376 return -1;
377 }
378
379 if ((rc = pthread_mutex_lock(&pstdout_launch_mutex)))
380 {
381 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
382 fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
383 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
384 goto cleanup;
385 }
386
387 pstdout_output_flags = output_flags;
388
389 pstdout_errnum = PSTDOUT_ERR_SUCCESS;
390 rv = 0;
391 cleanup:
392 if ((rc = pthread_mutex_unlock(&pstdout_launch_mutex)))
393 {
394 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
395 fprintf(stderr, "pthread_mutex_unlock: %s\n", strerror(rc));
396 /* Don't change error code, just move on */
397 }
398 return rv;
399 }
400
401 int
pstdout_get_output_flags(void)402 pstdout_get_output_flags(void)
403 {
404 pstdout_errnum = PSTDOUT_ERR_SUCCESS;
405 return pstdout_output_flags;
406 }
407
408 int
pstdout_set_fanout(unsigned int fanout)409 pstdout_set_fanout(unsigned int fanout)
410 {
411 int rc, rv = -1;
412
413 if (!(fanout >= PSTDOUT_FANOUT_MIN && fanout <= PSTDOUT_FANOUT_MAX))
414 {
415 pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
416 return -1;
417 }
418
419 if ((rc = pthread_mutex_lock(&pstdout_launch_mutex)))
420 {
421 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
422 fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
423 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
424 goto cleanup;
425 }
426
427 pstdout_fanout = fanout;
428
429 pstdout_errnum = PSTDOUT_ERR_SUCCESS;
430 rv = 0;
431 cleanup:
432 if ((rc = pthread_mutex_unlock(&pstdout_launch_mutex)))
433 {
434 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
435 fprintf(stderr, "pthread_mutex_unlock: %s\n", strerror(rc));
436 /* Don't change error code, just move on */
437 }
438
439 pstdout_errnum = PSTDOUT_ERR_SUCCESS;
440 return rv;
441 }
442
443 int
pstdout_get_fanout(void)444 pstdout_get_fanout(void)
445 {
446 pstdout_errnum = PSTDOUT_ERR_SUCCESS;
447 return pstdout_fanout;
448 }
449
450 int
pstdout_hostnames_count(const char * hostnames)451 pstdout_hostnames_count(const char *hostnames)
452 {
453 fi_hostlist_t h = NULL;
454 int count = 0;
455 int rv = -1;
456
457 if (!hostnames)
458 {
459 pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
460 return -1;
461 }
462
463 if (!(h = fi_hostlist_create(NULL)))
464 {
465 pstdout_errnum = PSTDOUT_ERR_OUTMEM;
466 goto cleanup;
467 }
468
469 if (!(count = fi_hostlist_push(h, hostnames)))
470 {
471 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
472 fprintf(stderr, "valid hostnames count == 0\n");
473 pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
474 goto cleanup;
475 }
476
477 rv = count;
478 cleanup:
479 if (h)
480 fi_hostlist_destroy(h);
481 return rv;
482 }
483
484 static int
_pstdout_print(pstdout_state_t pstate,int internal_to_pstdout,FILE * stream,const char * format,va_list ap)485 _pstdout_print(pstdout_state_t pstate,
486 int internal_to_pstdout,
487 FILE *stream,
488 const char *format,
489 va_list ap)
490 {
491 char *buf = NULL;
492 char *linebuf = NULL;
493 size_t buflen = PSTDOUT_BUFLEN;
494 size_t linebuflen = PSTDOUT_BUFLEN;
495 cbuf_t whichcbuf;
496 uint32_t whichdefaultmask;
497 uint32_t whichprependmask;
498 uint32_t whichbuffermask;
499 uint32_t whichconsolidatemask;
500 char **whichbuffer;
501 unsigned int *whichbufferlen;
502 int wlen;
503 int linelen;
504 int pstate_mutex_locked = 0;
505 int rc, rv = -1;
506
507 assert(pstate);
508 assert(pstate->magic == PSTDOUT_STATE_MAGIC);
509 assert(pstate->p_stdout);
510 assert(pstate->p_stderr);
511 assert(stream);
512 assert(stream == stdout || stream == stderr);
513 assert(format);
514
515 if (stream == stdout)
516 {
517 whichcbuf = pstate->p_stdout;
518 whichdefaultmask = PSTDOUT_OUTPUT_STDOUT_DEFAULT;
519 whichprependmask = PSTDOUT_OUTPUT_STDOUT_PREPEND_HOSTNAME;
520 whichbuffermask = PSTDOUT_OUTPUT_BUFFER_STDOUT;
521 whichconsolidatemask = PSTDOUT_OUTPUT_STDOUT_CONSOLIDATE;
522 whichbuffer = &(pstate->buffer_stdout);
523 whichbufferlen = &(pstate->buffer_stdout_len);
524 }
525 else
526 {
527 whichcbuf = pstate->p_stderr;
528 whichdefaultmask = PSTDOUT_OUTPUT_STDERR_DEFAULT;
529 whichprependmask = PSTDOUT_OUTPUT_STDERR_PREPEND_HOSTNAME;
530 whichbuffermask = PSTDOUT_OUTPUT_BUFFER_STDERR;
531 whichconsolidatemask = PSTDOUT_OUTPUT_STDERR_CONSOLIDATE;
532 whichbuffer = &(pstate->buffer_stderr);
533 whichbufferlen = &(pstate->buffer_stderr_len);
534 }
535
536 while (1)
537 {
538 va_list vacpy;
539
540 if (!(buf = (char *)realloc(buf, buflen)))
541 {
542 pstdout_errnum = PSTDOUT_ERR_OUTMEM;
543 goto cleanup;
544 }
545 memset(buf, '\0', PSTDOUT_BUFLEN);
546 va_copy(vacpy, ap);
547 wlen = vsnprintf(buf, buflen, format, vacpy);
548 va_end(vacpy);
549 if (wlen < buflen)
550 break;
551 buflen += PSTDOUT_BUFLEN;
552 }
553
554 if ((rc = pthread_mutex_lock(&(pstate->mutex))))
555 {
556 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
557 fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
558 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
559 goto cleanup;
560 }
561 pstate_mutex_locked++;
562
563 /* Protect from racing output when we are in a Ctrl+C flushing
564 * buffered output situation
565 */
566 if (!internal_to_pstdout && pstate->no_more_external_output)
567 goto cleanup;
568
569 if (cbuf_write(whichcbuf, buf, wlen, NULL) < 0)
570 {
571 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
572 fprintf(stderr, "cbuf_write: %s\n", strerror(errno));
573 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
574 goto cleanup;
575 }
576
577 while (1)
578 {
579 if (!(linebuf = (char *)realloc(linebuf, linebuflen)))
580 {
581 pstdout_errnum = PSTDOUT_ERR_OUTMEM;
582 goto cleanup;
583 }
584 memset(linebuf, '\0', PSTDOUT_BUFLEN);
585
586 while ((linelen = cbuf_read_line (whichcbuf, linebuf, linebuflen, 1)) > 0)
587 {
588 if (linelen >= linebuflen)
589 break;
590
591 if (!pstate->hostname
592 || ((pstdout_output_flags & whichdefaultmask)
593 && !(pstdout_output_flags & whichbuffermask)
594 && !(pstdout_output_flags & whichconsolidatemask)))
595
596 {
597 rv = fprintf(stream, "%s", linebuf);
598 fflush(stream);
599 }
600 else if (pstdout_output_flags & whichprependmask
601 && !(pstdout_output_flags & whichbuffermask)
602 && !(pstdout_output_flags & whichconsolidatemask))
603 {
604 rv = fprintf(stream, "%s: %s", pstate->hostname, linebuf);
605 fflush(stream);
606 }
607 else if (((pstdout_output_flags & whichdefaultmask)
608 && (pstdout_output_flags & whichbuffermask))
609 || (pstdout_output_flags & whichconsolidatemask))
610 {
611 if (!(*whichbuffer = (char *)realloc(*whichbuffer, *whichbufferlen + linelen)))
612 {
613 pstdout_errnum = PSTDOUT_ERR_OUTMEM;
614 goto cleanup;
615 }
616
617 /* Don't use snprintf, it will truncate b/c "snprintf and
618 vsnprintf do not write more than size bytes (including
619 the trailing '\0'). " */
620 memcpy(*whichbuffer + *whichbufferlen, linebuf, linelen);
621 *whichbufferlen += linelen;
622 rv = linelen;
623 }
624 else if ((pstdout_output_flags & whichprependmask)
625 && (pstdout_output_flags & whichbuffermask))
626 {
627 unsigned int hostname_len;
628 unsigned int extra_len;
629
630 /* + 2 is for the ": " */
631 hostname_len = strlen(pstate->hostname);
632 extra_len = hostname_len + 2;
633 if (!(*whichbuffer = (char *)realloc(*whichbuffer, *whichbufferlen + linelen + extra_len)))
634 {
635 pstdout_errnum = PSTDOUT_ERR_OUTMEM;
636 goto cleanup;
637 }
638
639 /* Don't use snprintf, it will truncate b/c "snprintf and
640 vsnprintf do not write more than size bytes (including
641 the trailing '\0'). " */
642 memcpy(*whichbuffer + *whichbufferlen,
643 pstate->hostname,
644 hostname_len);
645 memcpy(*whichbuffer + *whichbufferlen + hostname_len,
646 ": ",
647 2);
648 memcpy(*whichbuffer + *whichbufferlen + hostname_len + 2,
649 linebuf,
650 linelen);
651 *whichbufferlen += linelen + extra_len;
652 rv = linelen + extra_len;
653 }
654 else
655 {
656 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
657 return -1;
658 }
659 }
660
661 if (linelen < 0)
662 {
663 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
664 fprintf(stderr, "cbuf_read_line: %s\n", strerror(errno));
665 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
666 goto cleanup;
667 }
668
669 if (!linelen)
670 break;
671
672 linebuflen += PSTDOUT_BUFLEN;
673 }
674
675 if (rv < 0)
676 rv = 0;
677
678 pstdout_errnum = PSTDOUT_ERR_SUCCESS;
679 cleanup:
680 if (pstate_mutex_locked)
681 {
682 if ((rc = pthread_mutex_unlock(&(pstate->mutex))))
683 {
684 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
685 fprintf(stderr, "pthread_mutex_unlock: %s\n", strerror(rc));
686 /* Don't change error code, just move on */
687 }
688 }
689 free(buf);
690 free(linebuf);
691 return rv;
692 }
693
694 static void
_pstdout_print_wrapper(pstdout_state_t pstate,int internal_to_pstdout,FILE * stream,const char * format,...)695 _pstdout_print_wrapper(pstdout_state_t pstate,
696 int internal_to_pstdout,
697 FILE *stream,
698 const char *format, ...)
699 {
700 va_list ap;
701
702 assert(pstate);
703 assert(pstate->magic == PSTDOUT_STATE_MAGIC);
704 assert(stream);
705 assert(stream == stdout || stream == stderr);
706 assert(format);
707
708 va_start(ap, format);
709 _pstdout_print(pstate, internal_to_pstdout, stderr, format, ap);
710 va_end(ap);
711 }
712
713 int
pstdout_printf(pstdout_state_t pstate,const char * format,...)714 pstdout_printf(pstdout_state_t pstate, const char *format, ...)
715 {
716 va_list ap;
717 int rv;
718
719 if (!pstdout_initialized)
720 {
721 pstdout_errnum = PSTDOUT_ERR_UNINITIALIZED;
722 return -1;
723 }
724
725 if (!pstate || pstate->magic != PSTDOUT_STATE_MAGIC)
726 {
727 pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
728 return -1;
729 }
730
731 if (!format)
732 {
733 pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
734 return -1;
735 }
736
737 va_start(ap, format);
738 rv = _pstdout_print(pstate, 0, stdout, format, ap);
739 va_end(ap);
740 return rv;
741 }
742
743 int
pstdout_vprintf(pstdout_state_t pstate,const char * format,va_list ap)744 pstdout_vprintf(pstdout_state_t pstate, const char *format, va_list ap)
745 {
746 int rv;
747
748 if (!pstdout_initialized)
749 {
750 pstdout_errnum = PSTDOUT_ERR_UNINITIALIZED;
751 return -1;
752 }
753
754 if (!pstate || pstate->magic != PSTDOUT_STATE_MAGIC)
755 {
756 pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
757 return -1;
758 }
759
760 if (!format)
761 {
762 pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
763 return -1;
764 }
765
766 /* achu: va_list is defined by C standard as an object type, not
767 * necessarily pointer type. So can't do NULL on ap type. No known
768 * portable macro to test for validity of input.
769 */
770
771 rv = _pstdout_print(pstate, 0, stdout, format, ap);
772 return rv;
773 }
774
775 int
pstdout_fprintf(pstdout_state_t pstate,FILE * stream,const char * format,...)776 pstdout_fprintf(pstdout_state_t pstate, FILE *stream, const char *format, ...)
777 {
778 va_list ap;
779 int rv;
780
781 if (!pstdout_initialized)
782 {
783 pstdout_errnum = PSTDOUT_ERR_UNINITIALIZED;
784 return -1;
785 }
786
787 if (!pstate || pstate->magic != PSTDOUT_STATE_MAGIC)
788 {
789 pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
790 return -1;
791 }
792
793 if (!stream || !format)
794 {
795 pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
796 return -1;
797 }
798
799 if (stream != stdout && stream != stderr)
800 {
801 pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
802 return -1;
803 }
804
805 va_start(ap, format);
806 rv = _pstdout_print(pstate, 0, stream, format, ap);
807 va_end(ap);
808 return rv;
809 }
810
811 int
pstdout_vfprintf(pstdout_state_t pstate,FILE * stream,const char * format,va_list ap)812 pstdout_vfprintf(pstdout_state_t pstate, FILE *stream, const char *format,
813 va_list ap)
814 {
815 int rv;
816
817 if (!pstdout_initialized)
818 {
819 pstdout_errnum = PSTDOUT_ERR_UNINITIALIZED;
820 return -1;
821 }
822
823 if (!pstate || pstate->magic != PSTDOUT_STATE_MAGIC)
824 {
825 pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
826 return -1;
827 }
828
829 if (!stream || !format)
830 {
831 pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
832 return -1;
833 }
834
835 if (stream != stdout && stream != stderr)
836 {
837 pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
838 return -1;
839 }
840
841 rv = _pstdout_print(pstate, 0, stream, format, ap);
842 return rv;
843 }
844
845 void
pstdout_perror(pstdout_state_t pstate,const char * s)846 pstdout_perror(pstdout_state_t pstate, const char *s)
847 {
848 if (!pstdout_initialized)
849 {
850 pstdout_errnum = PSTDOUT_ERR_UNINITIALIZED;
851 return;
852 }
853
854 if (!pstate || pstate->magic != PSTDOUT_STATE_MAGIC)
855 {
856 pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
857 return;
858 }
859
860 if (!s)
861 {
862 pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
863 return;
864 }
865
866 _pstdout_print_wrapper(pstate, 0, stderr, "%s: %s\n", s, strerror(errno));
867 }
868
869 static int
_pstdout_state_init(pstdout_state_t pstate,const char * hostname)870 _pstdout_state_init(pstdout_state_t pstate, const char *hostname)
871 {
872 int rc;
873
874 assert(pstate);
875
876 memset(pstate, '\0', sizeof(struct pstdout_state));
877 pstate->magic = PSTDOUT_STATE_MAGIC;
878 pstate->hostname = (char *)hostname;
879
880 if (!(pstate->p_stdout = cbuf_create(PSTDOUT_STATE_CBUF_MIN, PSTDOUT_STATE_CBUF_MAX)))
881 {
882 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
883 fprintf(stderr, "cbuf_create: %s\n", strerror(errno));
884 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
885 return -1;
886 }
887 if (!(pstate->p_stderr = cbuf_create(PSTDOUT_STATE_CBUF_MIN, PSTDOUT_STATE_CBUF_MAX)))
888 {
889 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
890 fprintf(stderr, "cbuf_create: %s\n", strerror(errno));
891 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
892 return -1;
893 }
894 pstate->buffer_stdout = NULL;
895 pstate->buffer_stdout_len = 0;
896 pstate->buffer_stderr = NULL;
897 pstate->buffer_stderr_len = 0;
898 pstate->no_more_external_output = 0;
899
900 if ((rc = pthread_mutex_init(&(pstate->mutex), NULL)))
901 {
902 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
903 fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
904 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
905 return -1;
906 }
907 return 0;
908 }
909
910 static int
_pstdout_output_buffer_data(pstdout_state_t pstate,FILE * stream,char ** whichbuffer,unsigned int * whichbufferlen,uint32_t whichprependmask,uint32_t whichbuffermask,uint32_t whichconsolidatemask,List whichconsolidatedlist,pthread_mutex_t * whichconsolidatedmutex)911 _pstdout_output_buffer_data(pstdout_state_t pstate,
912 FILE *stream,
913 char **whichbuffer,
914 unsigned int *whichbufferlen,
915 uint32_t whichprependmask,
916 uint32_t whichbuffermask,
917 uint32_t whichconsolidatemask,
918 List whichconsolidatedlist,
919 pthread_mutex_t *whichconsolidatedmutex)
920 {
921 assert(pstate);
922 assert(pstate->magic == PSTDOUT_STATE_MAGIC);
923 assert(pstate->p_stdout);
924 assert(pstate->p_stderr);
925 assert(stream);
926 assert(stream == stdout || stream == stderr);
927 assert(whichbuffer);
928 assert(whichbufferlen);
929 assert(whichprependmask == PSTDOUT_OUTPUT_STDOUT_PREPEND_HOSTNAME
930 || whichprependmask == PSTDOUT_OUTPUT_STDERR_PREPEND_HOSTNAME);
931 assert(whichbuffermask == PSTDOUT_OUTPUT_BUFFER_STDOUT
932 || whichbuffermask == PSTDOUT_OUTPUT_BUFFER_STDERR);
933 assert(whichconsolidatemask == PSTDOUT_OUTPUT_STDOUT_CONSOLIDATE
934 || whichconsolidatemask == PSTDOUT_OUTPUT_STDERR_CONSOLIDATE);
935 assert(whichconsolidatedlist);
936 assert(whichconsolidatedmutex);
937
938 if ((*whichbuffer && *whichbufferlen)
939 && (pstdout_output_flags & whichbuffermask
940 || pstdout_output_flags & whichconsolidatemask))
941 {
942 /* Need to write a '\0' */
943 if (!(*whichbuffer = (char *)realloc(*whichbuffer, *whichbufferlen + 1)))
944 {
945 pstdout_errnum = PSTDOUT_ERR_OUTMEM;
946 goto cleanup;
947 }
948
949 (*whichbuffer)[*whichbufferlen] = '\0';
950 *whichbufferlen += 1;
951
952 if (pstdout_output_flags & whichbuffermask)
953 {
954 if (!(pstdout_output_flags & whichprependmask))
955 {
956 fprintf(stream, "----------------\n");
957 fprintf(stream, "%s\n", pstate->hostname);
958 fprintf(stream, "----------------\n");
959 }
960 fprintf(stream, "%s", *whichbuffer);
961 fflush(stream);
962 }
963 else
964 {
965 struct pstdout_consolidated_data *cdata;
966 int rc;
967
968 if ((rc = pthread_mutex_lock(whichconsolidatedmutex)))
969 {
970 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
971 fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
972 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
973 goto cleanup;
974 }
975
976 if (!(cdata = list_find_first(whichconsolidatedlist, _pstdout_consolidated_data_find, *whichbuffer)))
977 {
978 if (!(cdata = _pstdout_consolidated_data_create(pstate->hostname, *whichbuffer)))
979 goto cleanup;
980
981 if (!list_append(whichconsolidatedlist, cdata))
982 {
983 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
984 fprintf(stderr, "list_append: %s\n", strerror(errno));
985 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
986 _pstdout_consolidated_data_destroy(cdata);
987 goto cleanup;
988 }
989 }
990 else
991 {
992 if (!fi_hostlist_push(cdata->h, pstate->hostname))
993 {
994 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
995 fprintf(stderr, "fi_hostlist_push: %s\n", strerror(errno));
996 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
997 goto cleanup;
998 }
999 }
1000
1001 if ((rc = pthread_mutex_unlock(whichconsolidatedmutex)))
1002 {
1003 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1004 fprintf(stderr, "pthread_mutex_unlock: %s\n", strerror(rc));
1005 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1006 goto cleanup;
1007 }
1008
1009 }
1010 }
1011
1012 return 0;
1013
1014 cleanup:
1015 return -1;
1016 }
1017
1018 static int
_pstdout_output_finish(pstdout_state_t pstate)1019 _pstdout_output_finish(pstdout_state_t pstate)
1020 {
1021 int pstate_mutex_locked = 0;
1022 int rc, rv = -1;
1023
1024 assert(pstate);
1025 assert(pstate->magic == PSTDOUT_STATE_MAGIC);
1026 assert(pstate->p_stdout);
1027 assert(pstate->p_stderr);
1028
1029 if ((rc = pthread_mutex_lock(&(pstate->mutex))))
1030 {
1031 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1032 fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
1033 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1034 goto cleanup;
1035 }
1036 pstate_mutex_locked++;
1037
1038 /* If there is remaining junk in the cbufs, write a "\n" to it so we
1039 * finish off the line and get it flushed out.
1040 */
1041 if (!cbuf_is_empty(pstate->p_stdout))
1042 _pstdout_print_wrapper(pstate, 1, stdout, "\n");
1043
1044 if (!cbuf_is_empty(pstate->p_stderr))
1045 _pstdout_print_wrapper(pstate, 1, stderr, "\n");
1046
1047 if (_pstdout_output_buffer_data(pstate,
1048 stdout,
1049 &(pstate->buffer_stdout),
1050 &(pstate->buffer_stdout_len),
1051 PSTDOUT_OUTPUT_STDOUT_PREPEND_HOSTNAME,
1052 PSTDOUT_OUTPUT_BUFFER_STDOUT,
1053 PSTDOUT_OUTPUT_STDOUT_CONSOLIDATE,
1054 pstdout_consolidated_stdout,
1055 &pstdout_consolidated_stdout_mutex) < 0)
1056 goto cleanup;
1057
1058 if (_pstdout_output_buffer_data(pstate,
1059 stderr,
1060 &(pstate->buffer_stderr),
1061 &(pstate->buffer_stderr_len),
1062 PSTDOUT_OUTPUT_STDERR_PREPEND_HOSTNAME,
1063 PSTDOUT_OUTPUT_BUFFER_STDERR,
1064 PSTDOUT_OUTPUT_STDERR_CONSOLIDATE,
1065 pstdout_consolidated_stderr,
1066 &pstdout_consolidated_stderr_mutex) < 0)
1067 goto cleanup;
1068
1069 /* Only output from internal to pstdout is allowed */
1070 pstate->no_more_external_output = 1;
1071 rv = 0;
1072
1073 cleanup:
1074 if (pstate_mutex_locked)
1075 {
1076 if ((rc = pthread_mutex_unlock(&(pstate->mutex))))
1077 {
1078 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1079 fprintf(stderr, "pthread_mutex_unlock: %s\n", strerror(rc));
1080 /* Don't change error code, just move on */
1081 }
1082 }
1083 return rv;
1084 }
1085
1086 static void
_pstdout_state_cleanup(pstdout_state_t pstate)1087 _pstdout_state_cleanup(pstdout_state_t pstate)
1088 {
1089 assert(pstate);
1090 assert(pstate->magic == PSTDOUT_STATE_MAGIC);
1091
1092 if (pstate->p_stdout)
1093 cbuf_destroy(pstate->p_stdout);
1094 if (pstate->p_stderr)
1095 cbuf_destroy(pstate->p_stderr);
1096 free(pstate->buffer_stdout);
1097 free(pstate->buffer_stderr);
1098 memset(pstate, '\0', sizeof(struct pstdout_state));
1099 }
1100
1101 static void *
_pstdout_func_entry(void * arg)1102 _pstdout_func_entry(void *arg)
1103 {
1104 struct pstdout_thread_data *tdata = NULL;
1105 struct pstdout_state pstate;
1106 int rc;
1107
1108 tdata = (struct pstdout_thread_data *)arg;
1109
1110 if (_pstdout_state_init(&pstate, tdata->hostname) < 0)
1111 goto cleanup;
1112
1113 if ((rc = pthread_mutex_lock(&pstdout_states_mutex)))
1114 {
1115 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1116 fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
1117 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1118 goto cleanup;
1119 }
1120
1121 if (!list_append(pstdout_states, &pstate))
1122 {
1123 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1124 fprintf(stderr, "list_append: %s\n", strerror(errno));
1125 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1126 pthread_mutex_unlock(&pstdout_states_mutex);
1127 goto cleanup;
1128 }
1129
1130 if ((rc = pthread_mutex_unlock(&pstdout_states_mutex)))
1131 {
1132 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1133 fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
1134 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1135 goto cleanup;
1136 }
1137
1138 tdata->exit_code = (tdata->pstdout_func)(&pstate, tdata->hostname, tdata->arg);
1139
1140 if (_pstdout_output_finish(&pstate) < 0)
1141 goto cleanup;
1142
1143 cleanup:
1144 pthread_mutex_lock(&pstdout_states_mutex);
1145 list_delete_all(pstdout_states, _pstdout_states_delete_pointer, &pstate);
1146 pthread_mutex_unlock(&pstdout_states_mutex);
1147 _pstdout_state_cleanup(&pstate);
1148 pthread_mutex_lock(&pstdout_threadcount_mutex);
1149 pstdout_threadcount--;
1150 pthread_cond_signal(&pstdout_threadcount_cond);
1151 pthread_mutex_unlock(&pstdout_threadcount_mutex);
1152 return NULL;
1153 }
1154
1155 static int
_pstdout_output_consolidated(FILE * stream,List whichconsolidatedlist,pthread_mutex_t * whichconsolidatedmutex)1156 _pstdout_output_consolidated(FILE *stream,
1157 List whichconsolidatedlist,
1158 pthread_mutex_t *whichconsolidatedmutex)
1159 {
1160 struct pstdout_consolidated_data *cdata;
1161 ListIterator itr = NULL;
1162 int mutex_locked = 0;
1163 int rc, rv = -1;
1164
1165 assert(stream);
1166 assert(stream == stdout || stream == stderr);
1167 assert(whichconsolidatedlist);
1168 assert(whichconsolidatedmutex);
1169
1170 if ((rc = pthread_mutex_lock(whichconsolidatedmutex)))
1171 {
1172 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1173 fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
1174 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1175 goto cleanup;
1176 }
1177 mutex_locked++;
1178
1179 list_sort(whichconsolidatedlist, _pstdout_consolidated_data_compare);
1180
1181 if (!(itr = list_iterator_create (whichconsolidatedlist)))
1182 {
1183 pstdout_errnum = PSTDOUT_ERR_OUTMEM;
1184 goto cleanup;
1185 }
1186
1187 while ((cdata = list_next(itr)))
1188 {
1189 char hbuf[PSTDOUT_BUFLEN + 1];
1190
1191 memset(hbuf, '\0', PSTDOUT_BUFLEN + 1);
1192 fi_hostlist_sort(cdata->h);
1193 if (fi_hostlist_ranged_string(cdata->h, PSTDOUT_BUFLEN, hbuf) < 0)
1194 {
1195 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1196 fprintf(stderr, "fi_hostlist_ranged_string: %s\n", strerror(errno));
1197 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1198 goto cleanup;
1199 }
1200
1201 fprintf(stream, "----------------\n");
1202 fprintf(stream, "%s\n", hbuf);
1203 fprintf(stream, "----------------\n");
1204 fprintf(stream, "%s", cdata->output);
1205 }
1206
1207 rv = 0;
1208 cleanup:
1209 if (mutex_locked)
1210 {
1211 if ((rc = pthread_mutex_unlock(whichconsolidatedmutex)))
1212 {
1213 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1214 fprintf(stderr, "pthread_mutex_unlock: %s\n", strerror(rc));
1215 /* Don't change error code, just move on */
1216 }
1217 }
1218 if (itr)
1219 list_iterator_destroy(itr);
1220 return rv;
1221 }
1222
1223 static int
_pstdout_output_consolidated_finish(void)1224 _pstdout_output_consolidated_finish(void)
1225 {
1226 /* Output consolidated data */
1227 if (pstdout_output_flags & PSTDOUT_OUTPUT_STDOUT_CONSOLIDATE)
1228 {
1229 if (_pstdout_output_consolidated(stdout,
1230 pstdout_consolidated_stdout,
1231 &pstdout_consolidated_stdout_mutex) < 0)
1232 goto cleanup;
1233 }
1234
1235 if (pstdout_output_flags & PSTDOUT_OUTPUT_STDERR_CONSOLIDATE)
1236 {
1237 if (_pstdout_output_consolidated(stderr,
1238 pstdout_consolidated_stderr,
1239 &pstdout_consolidated_stderr_mutex) < 0)
1240 goto cleanup;
1241 }
1242
1243 return 0;
1244
1245 cleanup:
1246 return -1;
1247 }
1248
1249 static int
_pstdout_sigint_finish_output(void * x,void * arg)1250 _pstdout_sigint_finish_output(void *x, void *arg)
1251 {
1252 struct pstdout_state *pstate;
1253 assert(x);
1254
1255 pstate = (struct pstdout_state *)x;
1256
1257 if (_pstdout_output_finish(pstate) < 0)
1258 return -1;
1259
1260 /* The no_more_external_output flag set in _pstdout_output_finish()
1261 * protects from extraneous extra output from other threads after
1262 * this output.
1263 */
1264
1265 if (pstate->hostname
1266 && (pstdout_output_flags & PSTDOUT_OUTPUT_STDOUT_PREPEND_HOSTNAME)
1267 && !(pstdout_output_flags & PSTDOUT_OUTPUT_BUFFER_STDOUT)
1268 && !(pstdout_output_flags & PSTDOUT_OUTPUT_STDOUT_CONSOLIDATE))
1269 {
1270 fprintf(stdout, "%s: exiting session\n", pstate->hostname);
1271 fflush(stdout);
1272 }
1273
1274 /* Note: there isn't a race with any remaining threads and the below
1275 * output, b/c the _pstdout_output_finish() outputs buffered data or
1276 * puts the remaining data into the consolidated output list. Any extra
1277 * output from the user cannot be outputted.
1278 */
1279
1280 if (pstate->hostname
1281 && (pstdout_output_flags & PSTDOUT_OUTPUT_STDOUT_PREPEND_HOSTNAME)
1282 && (pstdout_output_flags & PSTDOUT_OUTPUT_BUFFER_STDOUT))
1283 {
1284 if ((pstate->buffer_stdout && pstate->buffer_stdout_len)
1285 || (pstate->buffer_stderr && pstate->buffer_stderr_len))
1286 fprintf(stdout, "%s: exiting session: current output flushed\n", pstate->hostname);
1287 else
1288 fprintf(stdout, "%s: exiting session\n", pstate->hostname);
1289 fflush(stdout);
1290 }
1291
1292 if (pstdout_output_flags & PSTDOUT_OUTPUT_STDOUT_CONSOLIDATE)
1293 {
1294 if ((pstate->buffer_stdout && pstate->buffer_stdout_len)
1295 || (pstate->buffer_stderr && pstate->buffer_stderr_len))
1296 fprintf(stdout, "%s: exiting session: current output consolidated\n", pstate->hostname);
1297 else
1298 fprintf(stdout, "%s: exiting session\n", pstate->hostname);
1299 }
1300
1301 return 0;
1302 }
1303
1304 void
_pstdout_sigint(int s)1305 _pstdout_sigint(int s)
1306 {
1307 int rc;
1308
1309 /* This is a last ditch effort, so no need to worry if we don't get
1310 * a lock or get an error or whatever.
1311 */
1312 if ((rc = pthread_mutex_lock(&pstdout_states_mutex)))
1313 {
1314 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1315 fprintf(stderr, "fi_hostlist_ranged_string: %s\n", strerror(rc));
1316 }
1317
1318 if (list_for_each(pstdout_states, _pstdout_sigint_finish_output, NULL) < 0)
1319 {
1320 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1321 fprintf(stderr, "list_for_each: %s\n", strerror(errno));
1322 }
1323
1324 _pstdout_output_consolidated_finish();
1325 exit (EXIT_FAILURE);
1326 }
1327
1328 int
pstdout_launch(const char * hostnames,Pstdout_Thread pstdout_func,void * arg)1329 pstdout_launch(const char *hostnames, Pstdout_Thread pstdout_func, void *arg)
1330 {
1331 struct pstdout_thread_data **tdata = NULL;
1332 struct pstdout_state pstate;
1333 unsigned int pstate_init = 0;
1334 fi_hostlist_iterator_t hitr = NULL;
1335 fi_hostlist_t h = NULL;
1336 int h_count = 0;
1337 char *host = NULL;
1338 int exit_code = -1;
1339 sighandler_t sighandler_save = NULL;
1340 int sighandler_set = 0;
1341 int rc;
1342 int i;
1343
1344 if (!pstdout_initialized)
1345 {
1346 pstdout_errnum = PSTDOUT_ERR_UNINITIALIZED;
1347 return -1;
1348 }
1349
1350 if (!pstdout_func)
1351 {
1352 pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
1353 return -1;
1354 }
1355
1356 if ((rc = pthread_mutex_lock(&pstdout_launch_mutex)))
1357 {
1358 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1359 fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
1360 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1361 goto cleanup;
1362 }
1363
1364 /* Special case */
1365 if (!hostnames)
1366 {
1367 if (_pstdout_state_init(&pstate, NULL) < 0)
1368 goto cleanup;
1369 pstate_init++;
1370
1371 exit_code = pstdout_func(&pstate, NULL, arg);
1372 pstdout_errnum = PSTDOUT_ERR_SUCCESS;
1373 goto cleanup;
1374 }
1375
1376 if (!(h = fi_hostlist_create(hostnames)))
1377 {
1378 pstdout_errnum = PSTDOUT_ERR_OUTMEM;
1379 goto cleanup;
1380 }
1381 h_count = fi_hostlist_count(h);
1382
1383 /* Sanity check */
1384 if (h_count <= 0)
1385 {
1386 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1387 fprintf(stderr, "h_count = %d\n", h_count);
1388 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1389 goto cleanup;
1390 }
1391
1392 /* Special case */
1393 if (h_count == 1)
1394 {
1395 if (_pstdout_state_init(&pstate, hostnames) < 0)
1396 goto cleanup;
1397 pstate_init++;
1398
1399 exit_code = pstdout_func(&pstate, hostnames, arg);
1400 pstdout_errnum = PSTDOUT_ERR_SUCCESS;
1401 goto cleanup;
1402 }
1403
1404 if ((sighandler_save = signal(SIGINT, _pstdout_sigint)) == SIG_ERR)
1405 {
1406 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1407 fprintf(stderr, "signal\n");
1408 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1409 goto cleanup;
1410 }
1411 sighandler_set++;
1412
1413 if (!(hitr = fi_hostlist_iterator_create(h)))
1414 {
1415 pstdout_errnum = PSTDOUT_ERR_OUTMEM;
1416 goto cleanup;
1417 }
1418
1419 if (!(tdata = (struct pstdout_thread_data **)malloc(sizeof(struct pstdout_thread_data *) * h_count)))
1420 {
1421 pstdout_errnum = PSTDOUT_ERR_OUTMEM;
1422 goto cleanup;
1423 }
1424 memset(tdata, '\0', sizeof(struct pstdout_thread_data *) * h_count);
1425
1426 i = 0;
1427 while ((host = fi_hostlist_next(hitr)))
1428 {
1429 if (!(tdata[i] = (struct pstdout_thread_data *)malloc(sizeof(struct pstdout_thread_data))))
1430 {
1431 pstdout_errnum = PSTDOUT_ERR_OUTMEM;
1432 goto cleanup;
1433 }
1434 memset(tdata[i], '\0', sizeof(struct pstdout_thread_data));
1435
1436 if (!(tdata[i]->hostname = strdup(host)))
1437 {
1438 pstdout_errnum = PSTDOUT_ERR_OUTMEM;
1439 goto cleanup;
1440 }
1441 tdata[i]->pstdout_func = pstdout_func;
1442 tdata[i]->arg = arg;
1443
1444 if ((rc = pthread_attr_init(&(tdata[i]->attr))))
1445 {
1446 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1447 fprintf(stderr, "pthread_attr_init: %s\n", strerror(rc));
1448 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1449 goto cleanup;
1450 }
1451
1452 if ((rc = pthread_attr_setdetachstate(&(tdata[i]->attr), PTHREAD_CREATE_DETACHED)))
1453 {
1454 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1455 fprintf(stderr, "pthread_attr_setdetachstate: %s\n", strerror(rc));
1456 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1457 goto cleanup;
1458 }
1459
1460 free(host);
1461 i++;
1462 }
1463 host = NULL;
1464
1465 fi_hostlist_iterator_destroy(hitr);
1466 hitr = NULL;
1467
1468 fi_hostlist_destroy(h);
1469 h = NULL;
1470
1471 /* Launch threads up to fanout */
1472 for (i = 0; i < h_count; i++)
1473 {
1474 if ((rc = pthread_mutex_lock(&pstdout_threadcount_mutex)))
1475 {
1476 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1477 fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
1478 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1479 goto cleanup;
1480 }
1481
1482 if (pstdout_threadcount == pstdout_fanout)
1483 {
1484 if ((rc = pthread_cond_wait(&pstdout_threadcount_cond, &pstdout_threadcount_mutex)))
1485 {
1486 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1487 fprintf(stderr, "pthread_cond_wait: %s\n", strerror(rc));
1488 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1489 goto cleanup;
1490 }
1491 }
1492
1493 if ((rc = pthread_create(&(tdata[i]->tid),
1494 &(tdata[i]->attr),
1495 _pstdout_func_entry,
1496 (void *) tdata[i])))
1497 {
1498 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1499 fprintf(stderr, "pthread_create: %s\n", strerror(rc));
1500 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1501 goto cleanup;
1502 }
1503
1504 pstdout_threadcount++;
1505
1506 if ((rc = pthread_mutex_unlock(&pstdout_threadcount_mutex)))
1507 {
1508 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1509 fprintf(stderr, "pthread_mutex_unlock: %s\n", strerror(rc));
1510 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1511 goto cleanup;
1512 }
1513 }
1514
1515 /* Wait for Threads to finish */
1516
1517 if ((rc = pthread_mutex_lock(&pstdout_threadcount_mutex)))
1518 {
1519 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1520 fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
1521 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1522 goto cleanup;
1523 }
1524
1525 while (pstdout_threadcount > 0)
1526 {
1527 if ((rc = pthread_cond_wait(&pstdout_threadcount_cond, &pstdout_threadcount_mutex)))
1528 {
1529 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1530 fprintf(stderr, "pthread_cond_wait: %s\n", strerror(rc));
1531 pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1532 goto cleanup;
1533 }
1534 }
1535
1536 if (_pstdout_output_consolidated_finish() < 0)
1537 goto cleanup;
1538
1539 /* Determine exit code */
1540 exit_code = 0;
1541 for (i = 0; i < h_count; i++)
1542 {
1543 if (tdata[i]->exit_code > exit_code)
1544 exit_code = tdata[i]->exit_code;
1545 }
1546
1547 cleanup:
1548 /* Cannot pass NULL for key, so just pass dummy key */
1549 list_delete_all(pstdout_consolidated_stdout, _pstdout_consolidated_data_delete_all, "");
1550 list_delete_all(pstdout_consolidated_stderr, _pstdout_consolidated_data_delete_all, "");
1551 if (pstate_init)
1552 _pstdout_state_cleanup(&pstate);
1553 if (tdata)
1554 {
1555 for (i = 0; i < h_count; i++)
1556 {
1557 if (tdata[i])
1558 {
1559 free(tdata[i]->hostname);
1560 pthread_attr_destroy(&(tdata[i]->attr));
1561 free(tdata[i]);
1562 }
1563 }
1564 free(tdata);
1565 }
1566 if (hitr)
1567 fi_hostlist_iterator_destroy(hitr);
1568 if (h)
1569 fi_hostlist_destroy(h);
1570 free(host);
1571 if ((rc = pthread_mutex_unlock(&pstdout_launch_mutex)))
1572 {
1573 if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1574 fprintf(stderr, "pthread_mutex_unlock: %s\n", strerror(rc));
1575 /* Don't change error code, just move on */
1576 }
1577 if (sighandler_set)
1578 signal(SIGINT, sighandler_save);
1579 return exit_code;
1580 }
1581
1582 int
PSTDOUT_PRINTF(pstdout_state_t pstate,const char * format,...)1583 PSTDOUT_PRINTF(pstdout_state_t pstate, const char *format, ...)
1584 {
1585 va_list ap;
1586 int rv;
1587
1588 if (!format)
1589 {
1590 pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
1591 return -1;
1592 }
1593
1594 va_start(ap, format);
1595 if (!pstate
1596 || pstate->magic != PSTDOUT_STATE_MAGIC
1597 || !pstdout_initialized)
1598 rv = vprintf(format, ap);
1599 else
1600 rv = _pstdout_print(pstate, 0, stdout, format, ap);
1601 va_end(ap);
1602 return rv;
1603 }
1604
1605 int
PSTDOUT_FPRINTF(pstdout_state_t pstate,FILE * stream,const char * format,...)1606 PSTDOUT_FPRINTF(pstdout_state_t pstate, FILE *stream, const char *format, ...)
1607 {
1608 va_list ap;
1609 int rv;
1610
1611 if (!stream || !format)
1612 {
1613 pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
1614 return -1;
1615 }
1616
1617 va_start(ap, format);
1618 if (!pstate
1619 || pstate->magic != PSTDOUT_STATE_MAGIC
1620 || (stream != stdout && stream != stderr)
1621 || !pstdout_initialized)
1622 rv = vfprintf(stream, format, ap);
1623 else
1624 rv = _pstdout_print(pstate, 0, stream, format, ap);
1625 va_end(ap);
1626 return rv;
1627 }
1628
1629 void
PSTDOUT_PERROR(pstdout_state_t pstate,const char * s)1630 PSTDOUT_PERROR(pstdout_state_t pstate, const char *s)
1631 {
1632 if (!s)
1633 {
1634 pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
1635 return;
1636 }
1637
1638 if (!pstate
1639 || pstate->magic != PSTDOUT_STATE_MAGIC
1640 || !pstdout_initialized)
1641 fprintf(stderr, "%s: %s\n", s, strerror(errno));
1642 else
1643 _pstdout_print_wrapper(pstate, 0, stderr, "%s: %s\n", s, strerror(errno));
1644 }
1645