1 /*****************************************************************************\
2  *  $Id: pstdout.c,v 1.10 2010-02-10 01:27:44 chu11 Exp $
3  *****************************************************************************
4  *  Copyright (C) 2007-2015 Lawrence Livermore National Security, LLC.
5  *  Copyright (C) 2007 The Regents of the University of California.
6  *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
7  *  Written by Albert Chu <chu11@llnl.gov>
8  *  UCRL-CODE-227589
9  *
10  *  This file is part of pstdout, a library used to launch and manage
11  *  the standard output of multiple threads. For details, see
12  *  http://www.llnl.gov/linux/.
13  *
14  *  Pstdout is free software; you can redistribute it and/or modify
15  *  it under the terms of the GNU General Public License as published by the
16  *  Free Software Foundation; either version 3 of the License, or (at your
17  *  option) any later version.
18  *
19  *  Pstdout is distributed in the hope that it will be useful, but
20  *  WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
21  *  or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
22  *  for more details.
23  *
24  *  You should have received a copy of the GNU General Public License along
25  *  with Pstdout.  If not, see <http://www.gnu.org/licenses/>.
26 \*****************************************************************************/
27 
28 /*
29  * Notes:
30  *
31  * Needs to be compiled with -D_REENTRANT
32  */
33 
34 #ifdef HAVE_CONFIG_H
35 #include "config.h"
36 #endif /* HAVE_CONFIG_H */
37 
38 #include <stdio.h>
39 #include <stdlib.h>
40 #include <stdint.h>
41 #if STDC_HEADERS
42 #include <string.h>
43 #endif /* STDC_HEADERS */
44 #if HAVE_PTHREAD_H
45 #include <pthread.h>
46 #endif /* HAVE_PTHREAD_H */
47 #include <signal.h>
48 #include <assert.h>
49 #include <errno.h>
50 
51 #include "pstdout.h"
52 #include "cbuf.h"
53 #include "fi_hostlist.h"
54 #include "list.h"
55 
56 /* max hostrange size is typically 16 bytes
57  *
58  * counting the comma, a 1000 node non-expanded hostrange list could
59  * be (16 + 1) * 1000 = 17000 bytes.
60  *
61  * So we round up from there to the nearest 16K
62  */
63 #define PSTDOUT_BUFLEN            32768
64 
65 static char * pstdout_errmsg[] =
66   {
67     "success",
68     "library uninitialized",
69     "incorrect parameters passed in",
70     "out of memory",
71     "unknown internal error",
72     "error number out of range",
73   };
74 
75 static uint32_t pstdout_debug_flags = PSTDOUT_DEBUG_NONE;
76 static uint32_t pstdout_output_flags = PSTDOUT_OUTPUT_STDOUT_DEFAULT | PSTDOUT_OUTPUT_STDERR_DEFAULT;
77 static unsigned int pstdout_fanout = PSTDOUT_FANOUT_DEFAULT;
78 
79 static pthread_mutex_t pstdout_threadcount_mutex = PTHREAD_MUTEX_INITIALIZER;
80 static pthread_cond_t pstdout_threadcount_cond = PTHREAD_COND_INITIALIZER;
81 static int pstdout_threadcount = 0;
82 
83 struct pstdout_thread_data {
84   char *hostname;
85   pthread_t tid;
86   pthread_attr_t attr;
87   int exit_code;
88   Pstdout_Thread pstdout_func;
89   void *arg;
90 };
91 
92 struct pstdout_state {
93   uint32_t magic;
94   char *hostname;
95   cbuf_t p_stdout;
96   cbuf_t p_stderr;
97   char *buffer_stdout;
98   char *buffer_stderr;
99   unsigned int buffer_stdout_len;
100   unsigned int buffer_stderr_len;
101   int no_more_external_output;
102   pthread_mutex_t mutex;
103 };
104 
105 #define PSTDOUT_STATE_MAGIC    0x76309ab3
106 #define PSTDOUT_STATE_CBUF_MIN 32
107 #define PSTDOUT_STATE_CBUF_MAX 2048
108 
109 int pstdout_errnum = PSTDOUT_ERR_SUCCESS;
110 
111 struct pstdout_consolidated_data {
112   fi_hostlist_t h;
113   char *output;
114 };
115 
116 static List pstdout_consolidated_stdout = NULL;
117 static List pstdout_consolidated_stderr = NULL;
118 
119 static pthread_mutex_t pstdout_consolidated_stdout_mutex = PTHREAD_MUTEX_INITIALIZER;
120 static pthread_mutex_t pstdout_consolidated_stderr_mutex = PTHREAD_MUTEX_INITIALIZER;
121 
122 static int pstdout_initialized = 0;
123 
124 static pthread_mutex_t pstdout_launch_mutex = PTHREAD_MUTEX_INITIALIZER;
125 
126 static List pstdout_states = NULL;
127 static pthread_mutex_t pstdout_states_mutex = PTHREAD_MUTEX_INITIALIZER;
128 
129 #ifndef HAVE_SIGHANDLER_T
130 typedef void (*sighandler_t)(int);
131 #endif /* HAVE_SIGHANDLER_T */
132 
133 static struct pstdout_consolidated_data *
_pstdout_consolidated_data_create(const char * hostname,const char * output)134 _pstdout_consolidated_data_create(const char *hostname, const char *output)
135 {
136   struct pstdout_consolidated_data *cdata = NULL;
137 
138   assert(hostname);
139   assert(output);
140 
141   if (!(cdata = (struct pstdout_consolidated_data *)malloc(sizeof(struct pstdout_consolidated_data))))
142     {
143       pstdout_errnum = PSTDOUT_ERR_OUTMEM;
144       goto cleanup;
145     }
146   cdata->h = NULL;
147   cdata->output = NULL;
148 
149   if (!(cdata->h = fi_hostlist_create(hostname)))
150     {
151       pstdout_errnum = PSTDOUT_ERR_OUTMEM;
152       goto cleanup;
153     }
154 
155   if (!(cdata->output = strdup(output)))
156     {
157       pstdout_errnum = PSTDOUT_ERR_OUTMEM;
158       goto cleanup;
159     }
160 
161   return cdata;
162 
163  cleanup:
164   if (cdata)
165     {
166       if (cdata->h)
167         fi_hostlist_destroy(cdata->h);
168       free(cdata->output);
169       free(cdata);
170     }
171   return NULL;
172 }
173 
174 static void
_pstdout_consolidated_data_destroy(void * x)175 _pstdout_consolidated_data_destroy(void *x)
176 {
177   struct pstdout_consolidated_data *cdata;
178 
179   assert(x);
180 
181   cdata = (struct pstdout_consolidated_data *)x;
182   if (cdata->h)
183     fi_hostlist_destroy(cdata->h);
184   free(cdata->output);
185   free(cdata);
186 }
187 
188 static int
_pstdout_consolidated_data_compare(void * x,void * y)189 _pstdout_consolidated_data_compare(void *x, void *y)
190 {
191   struct pstdout_consolidated_data *cdataX;
192   struct pstdout_consolidated_data *cdataY;
193   int h_countX, h_countY;
194 
195   assert(x);
196   assert(y);
197 
198   cdataX = (struct pstdout_consolidated_data *)x;
199   cdataY = (struct pstdout_consolidated_data *)x;
200 
201   assert(cdataX->h);
202   assert(cdataY->h);
203 
204   h_countX = fi_hostlist_count(cdataX->h);
205   h_countY = fi_hostlist_count(cdataY->h);
206 
207   if (h_countX < h_countY)
208     return -1;
209   if (h_countX > h_countY)
210     return 1;
211   return 0;
212 }
213 
214 static int
_pstdout_consolidated_data_find(void * x,void * key)215 _pstdout_consolidated_data_find(void *x, void *key)
216 {
217   struct pstdout_consolidated_data *cdata;
218 
219   assert(x);
220   assert(key);
221 
222   cdata = (struct pstdout_consolidated_data *)x;
223 
224   assert(cdata->output);
225 
226   if (!strcmp(cdata->output, (char *)key))
227     return 1;
228   return 0;
229 }
230 
231 static int
_pstdout_consolidated_data_delete_all(void * x,void * key)232 _pstdout_consolidated_data_delete_all(void *x, void *key)
233 {
234   return 1;
235 }
236 
237 static int
_pstdout_states_delete_pointer(void * x,void * key)238 _pstdout_states_delete_pointer(void *x, void *key)
239 {
240   if (x == key)
241     return 1;
242   return 0;
243 }
244 
245 int
pstdout_init(void)246 pstdout_init(void)
247 {
248   if (!pstdout_initialized)
249     {
250       if (!(pstdout_consolidated_stdout = list_create((ListDelF)_pstdout_consolidated_data_destroy)))
251         {
252           pstdout_errnum = PSTDOUT_ERR_OUTMEM;
253           goto cleanup;
254         }
255       if (!(pstdout_consolidated_stderr = list_create((ListDelF)_pstdout_consolidated_data_destroy)))
256         {
257           pstdout_errnum = PSTDOUT_ERR_OUTMEM;
258           goto cleanup;
259         }
260       if (!(pstdout_states = list_create((ListDelF)NULL)))
261         {
262           pstdout_errnum = PSTDOUT_ERR_OUTMEM;
263           goto cleanup;
264         }
265       pstdout_initialized++;
266     }
267 
268   return 0;
269 
270  cleanup:
271   if (pstdout_consolidated_stdout)
272     list_destroy(pstdout_consolidated_stdout);
273   if (pstdout_consolidated_stderr)
274     list_destroy(pstdout_consolidated_stderr);
275   if (pstdout_states)
276     list_destroy(pstdout_states);
277   return -1;
278 }
279 
280 char *
pstdout_strerror(int errnum)281 pstdout_strerror(int errnum)
282 {
283   if (errnum >= PSTDOUT_ERR_SUCCESS && errnum <= PSTDOUT_ERR_ERRNUMRANGE)
284     return pstdout_errmsg[errnum];
285   else
286     return pstdout_errmsg[PSTDOUT_ERR_ERRNUMRANGE];
287 }
288 
289 int
pstdout_set_debug_flags(unsigned int debug_flags)290 pstdout_set_debug_flags(unsigned int debug_flags)
291 {
292   int rc, rv = -1;
293 
294   if (debug_flags & ~PSTDOUT_DEBUG_MASK)
295     {
296       pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
297       return -1;
298     }
299 
300   if (debug_flags & PSTDOUT_DEBUG_NONE
301       && debug_flags & PSTDOUT_DEBUG_STANDARD)
302     {
303       pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
304       return -1;
305     }
306 
307   if ((rc = pthread_mutex_lock(&pstdout_launch_mutex)))
308     {
309       if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
310         fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
311       pstdout_errnum = PSTDOUT_ERR_INTERNAL;
312       goto cleanup;
313     }
314 
315   pstdout_debug_flags = debug_flags;
316 
317 
318   pstdout_errnum = PSTDOUT_ERR_SUCCESS;
319   rv = 0;
320  cleanup:
321   if ((rc = pthread_mutex_unlock(&pstdout_launch_mutex)))
322     {
323       if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
324         fprintf(stderr, "pthread_mutex_unlock: %s\n", strerror(rc));
325       /* Don't change error code, just move on */
326     }
327   return rv;
328 }
329 
330 int
pstdout_get_debug_flags(void)331 pstdout_get_debug_flags(void)
332 {
333   pstdout_errnum = PSTDOUT_ERR_SUCCESS;
334   return pstdout_debug_flags;
335 }
336 
337 int
pstdout_set_output_flags(unsigned int output_flags)338 pstdout_set_output_flags(unsigned int output_flags)
339 {
340   int rc, rv = -1;
341 
342   if (output_flags & ~PSTDOUT_OUTPUT_MASK)
343     {
344       pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
345       return -1;
346     }
347 
348   if (((output_flags & PSTDOUT_OUTPUT_STDOUT_DEFAULT)
349        && (output_flags & PSTDOUT_OUTPUT_STDOUT_PREPEND_HOSTNAME))
350       || ((output_flags & PSTDOUT_OUTPUT_STDERR_DEFAULT)
351           && (output_flags & PSTDOUT_OUTPUT_STDERR_PREPEND_HOSTNAME))
352       || (!(output_flags & PSTDOUT_OUTPUT_STDOUT_DEFAULT)
353           && !(output_flags & PSTDOUT_OUTPUT_STDOUT_PREPEND_HOSTNAME))
354       || (!(output_flags & PSTDOUT_OUTPUT_STDERR_DEFAULT)
355           && !(output_flags & PSTDOUT_OUTPUT_STDERR_PREPEND_HOSTNAME)))
356     {
357       pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
358       return -1;
359     }
360 
361   if (((output_flags & PSTDOUT_OUTPUT_STDOUT_PREPEND_HOSTNAME)
362        && (output_flags & PSTDOUT_OUTPUT_STDOUT_CONSOLIDATE))
363       || ((output_flags & PSTDOUT_OUTPUT_STDERR_PREPEND_HOSTNAME)
364           && (output_flags & PSTDOUT_OUTPUT_STDERR_CONSOLIDATE)))
365     {
366       pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
367       return -1;
368     }
369 
370   if (((output_flags & PSTDOUT_OUTPUT_BUFFER_STDOUT)
371        && (output_flags & PSTDOUT_OUTPUT_STDOUT_CONSOLIDATE))
372       || ((output_flags & PSTDOUT_OUTPUT_BUFFER_STDERR)
373           && (output_flags & PSTDOUT_OUTPUT_STDERR_CONSOLIDATE)))
374     {
375       pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
376       return -1;
377     }
378 
379   if ((rc = pthread_mutex_lock(&pstdout_launch_mutex)))
380     {
381       if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
382         fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
383       pstdout_errnum = PSTDOUT_ERR_INTERNAL;
384       goto cleanup;
385     }
386 
387   pstdout_output_flags = output_flags;
388 
389   pstdout_errnum = PSTDOUT_ERR_SUCCESS;
390   rv = 0;
391  cleanup:
392   if ((rc = pthread_mutex_unlock(&pstdout_launch_mutex)))
393     {
394       if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
395         fprintf(stderr, "pthread_mutex_unlock: %s\n", strerror(rc));
396       /* Don't change error code, just move on */
397     }
398   return rv;
399 }
400 
401 int
pstdout_get_output_flags(void)402 pstdout_get_output_flags(void)
403 {
404   pstdout_errnum = PSTDOUT_ERR_SUCCESS;
405   return pstdout_output_flags;
406 }
407 
408 int
pstdout_set_fanout(unsigned int fanout)409 pstdout_set_fanout(unsigned int fanout)
410 {
411   int rc, rv = -1;
412 
413   if (!(fanout >= PSTDOUT_FANOUT_MIN && fanout <= PSTDOUT_FANOUT_MAX))
414     {
415       pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
416       return -1;
417     }
418 
419   if ((rc = pthread_mutex_lock(&pstdout_launch_mutex)))
420     {
421       if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
422         fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
423       pstdout_errnum = PSTDOUT_ERR_INTERNAL;
424       goto cleanup;
425     }
426 
427   pstdout_fanout = fanout;
428 
429   pstdout_errnum = PSTDOUT_ERR_SUCCESS;
430   rv = 0;
431  cleanup:
432   if ((rc = pthread_mutex_unlock(&pstdout_launch_mutex)))
433     {
434       if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
435         fprintf(stderr, "pthread_mutex_unlock: %s\n", strerror(rc));
436       /* Don't change error code, just move on */
437     }
438 
439   pstdout_errnum = PSTDOUT_ERR_SUCCESS;
440   return rv;
441 }
442 
443 int
pstdout_get_fanout(void)444 pstdout_get_fanout(void)
445 {
446   pstdout_errnum = PSTDOUT_ERR_SUCCESS;
447   return pstdout_fanout;
448 }
449 
450 int
pstdout_hostnames_count(const char * hostnames)451 pstdout_hostnames_count(const char *hostnames)
452 {
453   fi_hostlist_t h = NULL;
454   int count = 0;
455   int rv = -1;
456 
457   if (!hostnames)
458     {
459       pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
460       return -1;
461     }
462 
463   if (!(h = fi_hostlist_create(NULL)))
464     {
465       pstdout_errnum = PSTDOUT_ERR_OUTMEM;
466       goto cleanup;
467     }
468 
469   if (!(count = fi_hostlist_push(h, hostnames)))
470     {
471       if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
472         fprintf(stderr, "valid hostnames count == 0\n");
473       pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
474       goto cleanup;
475     }
476 
477   rv = count;
478  cleanup:
479   if (h)
480     fi_hostlist_destroy(h);
481   return rv;
482 }
483 
484 static int
_pstdout_print(pstdout_state_t pstate,int internal_to_pstdout,FILE * stream,const char * format,va_list ap)485 _pstdout_print(pstdout_state_t pstate,
486                int internal_to_pstdout,
487                FILE *stream,
488                const char *format,
489                va_list ap)
490 {
491   char *buf = NULL;
492   char *linebuf = NULL;
493   size_t buflen = PSTDOUT_BUFLEN;
494   size_t linebuflen = PSTDOUT_BUFLEN;
495   cbuf_t whichcbuf;
496   uint32_t whichdefaultmask;
497   uint32_t whichprependmask;
498   uint32_t whichbuffermask;
499   uint32_t whichconsolidatemask;
500   char **whichbuffer;
501   unsigned int *whichbufferlen;
502   int wlen;
503   int linelen;
504   int pstate_mutex_locked = 0;
505   int rc, rv = -1;
506 
507   assert(pstate);
508   assert(pstate->magic == PSTDOUT_STATE_MAGIC);
509   assert(pstate->p_stdout);
510   assert(pstate->p_stderr);
511   assert(stream);
512   assert(stream == stdout || stream == stderr);
513   assert(format);
514 
515   if (stream == stdout)
516     {
517       whichcbuf = pstate->p_stdout;
518       whichdefaultmask = PSTDOUT_OUTPUT_STDOUT_DEFAULT;
519       whichprependmask = PSTDOUT_OUTPUT_STDOUT_PREPEND_HOSTNAME;
520       whichbuffermask = PSTDOUT_OUTPUT_BUFFER_STDOUT;
521       whichconsolidatemask = PSTDOUT_OUTPUT_STDOUT_CONSOLIDATE;
522       whichbuffer = &(pstate->buffer_stdout);
523       whichbufferlen = &(pstate->buffer_stdout_len);
524     }
525   else
526     {
527       whichcbuf = pstate->p_stderr;
528       whichdefaultmask = PSTDOUT_OUTPUT_STDERR_DEFAULT;
529       whichprependmask = PSTDOUT_OUTPUT_STDERR_PREPEND_HOSTNAME;
530       whichbuffermask = PSTDOUT_OUTPUT_BUFFER_STDERR;
531       whichconsolidatemask = PSTDOUT_OUTPUT_STDERR_CONSOLIDATE;
532       whichbuffer = &(pstate->buffer_stderr);
533       whichbufferlen = &(pstate->buffer_stderr_len);
534     }
535 
536   while (1)
537     {
538       va_list vacpy;
539 
540       if (!(buf = (char *)realloc(buf, buflen)))
541         {
542           pstdout_errnum = PSTDOUT_ERR_OUTMEM;
543           goto cleanup;
544         }
545       memset(buf, '\0', PSTDOUT_BUFLEN);
546       va_copy(vacpy, ap);
547       wlen = vsnprintf(buf, buflen, format, vacpy);
548       va_end(vacpy);
549       if (wlen < buflen)
550         break;
551       buflen += PSTDOUT_BUFLEN;
552     }
553 
554   if ((rc = pthread_mutex_lock(&(pstate->mutex))))
555     {
556       if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
557         fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
558       pstdout_errnum = PSTDOUT_ERR_INTERNAL;
559       goto cleanup;
560     }
561   pstate_mutex_locked++;
562 
563   /* Protect from racing output when we are in a Ctrl+C flushing
564    * buffered output situation
565    */
566   if (!internal_to_pstdout && pstate->no_more_external_output)
567     goto cleanup;
568 
569   if (cbuf_write(whichcbuf, buf, wlen, NULL) < 0)
570     {
571       if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
572         fprintf(stderr, "cbuf_write: %s\n", strerror(errno));
573       pstdout_errnum = PSTDOUT_ERR_INTERNAL;
574       goto cleanup;
575     }
576 
577   while (1)
578     {
579       if (!(linebuf = (char *)realloc(linebuf, linebuflen)))
580         {
581           pstdout_errnum = PSTDOUT_ERR_OUTMEM;
582           goto cleanup;
583         }
584       memset(linebuf, '\0', PSTDOUT_BUFLEN);
585 
586       while ((linelen = cbuf_read_line (whichcbuf, linebuf, linebuflen, 1)) > 0)
587         {
588           if (linelen >= linebuflen)
589             break;
590 
591           if (!pstate->hostname
592               || ((pstdout_output_flags & whichdefaultmask)
593                   && !(pstdout_output_flags & whichbuffermask)
594                   && !(pstdout_output_flags & whichconsolidatemask)))
595 
596             {
597               rv = fprintf(stream, "%s", linebuf);
598               fflush(stream);
599             }
600           else if (pstdout_output_flags & whichprependmask
601                    && !(pstdout_output_flags & whichbuffermask)
602                    && !(pstdout_output_flags & whichconsolidatemask))
603             {
604               rv = fprintf(stream, "%s: %s", pstate->hostname, linebuf);
605               fflush(stream);
606             }
607           else if (((pstdout_output_flags & whichdefaultmask)
608                     && (pstdout_output_flags & whichbuffermask))
609                    || (pstdout_output_flags & whichconsolidatemask))
610             {
611               if (!(*whichbuffer = (char *)realloc(*whichbuffer, *whichbufferlen + linelen)))
612                 {
613                   pstdout_errnum = PSTDOUT_ERR_OUTMEM;
614                   goto cleanup;
615                 }
616 
617               /* Don't use snprintf, it will truncate b/c "snprintf and
618                  vsnprintf do not write more than size bytes (including
619                  the trailing '\0'). " */
620               memcpy(*whichbuffer + *whichbufferlen, linebuf, linelen);
621               *whichbufferlen += linelen;
622               rv = linelen;
623             }
624           else if ((pstdout_output_flags & whichprependmask)
625                    && (pstdout_output_flags & whichbuffermask))
626             {
627               unsigned int hostname_len;
628               unsigned int extra_len;
629 
630               /* + 2 is for the ": " */
631               hostname_len = strlen(pstate->hostname);
632               extra_len = hostname_len + 2;
633               if (!(*whichbuffer = (char *)realloc(*whichbuffer, *whichbufferlen + linelen + extra_len)))
634                 {
635                   pstdout_errnum = PSTDOUT_ERR_OUTMEM;
636                   goto cleanup;
637                 }
638 
639               /* Don't use snprintf, it will truncate b/c "snprintf and
640                  vsnprintf do not write more than size bytes (including
641                  the trailing '\0'). " */
642               memcpy(*whichbuffer + *whichbufferlen,
643                      pstate->hostname,
644                      hostname_len);
645               memcpy(*whichbuffer + *whichbufferlen + hostname_len,
646                      ": ",
647                      2);
648               memcpy(*whichbuffer + *whichbufferlen + hostname_len + 2,
649                      linebuf,
650                      linelen);
651               *whichbufferlen += linelen + extra_len;
652               rv = linelen + extra_len;
653             }
654           else
655             {
656               pstdout_errnum = PSTDOUT_ERR_INTERNAL;
657               return -1;
658             }
659         }
660 
661       if (linelen < 0)
662         {
663           if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
664             fprintf(stderr, "cbuf_read_line: %s\n", strerror(errno));
665           pstdout_errnum = PSTDOUT_ERR_INTERNAL;
666           goto cleanup;
667         }
668 
669       if (!linelen)
670         break;
671 
672       linebuflen += PSTDOUT_BUFLEN;
673     }
674 
675   if (rv < 0)
676     rv = 0;
677 
678   pstdout_errnum = PSTDOUT_ERR_SUCCESS;
679  cleanup:
680   if (pstate_mutex_locked)
681     {
682       if ((rc = pthread_mutex_unlock(&(pstate->mutex))))
683         {
684           if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
685             fprintf(stderr, "pthread_mutex_unlock: %s\n", strerror(rc));
686           /* Don't change error code, just move on */
687         }
688     }
689   free(buf);
690   free(linebuf);
691   return rv;
692 }
693 
694 static void
_pstdout_print_wrapper(pstdout_state_t pstate,int internal_to_pstdout,FILE * stream,const char * format,...)695 _pstdout_print_wrapper(pstdout_state_t pstate,
696                        int internal_to_pstdout,
697                        FILE *stream,
698                        const char *format, ...)
699 {
700   va_list ap;
701 
702   assert(pstate);
703   assert(pstate->magic == PSTDOUT_STATE_MAGIC);
704   assert(stream);
705   assert(stream == stdout || stream == stderr);
706   assert(format);
707 
708   va_start(ap, format);
709   _pstdout_print(pstate, internal_to_pstdout, stderr, format, ap);
710   va_end(ap);
711 }
712 
713 int
pstdout_printf(pstdout_state_t pstate,const char * format,...)714 pstdout_printf(pstdout_state_t pstate, const char *format, ...)
715 {
716   va_list ap;
717   int rv;
718 
719   if (!pstdout_initialized)
720     {
721       pstdout_errnum = PSTDOUT_ERR_UNINITIALIZED;
722       return -1;
723     }
724 
725   if (!pstate || pstate->magic != PSTDOUT_STATE_MAGIC)
726     {
727       pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
728       return -1;
729     }
730 
731   if (!format)
732     {
733       pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
734       return -1;
735     }
736 
737   va_start(ap, format);
738   rv = _pstdout_print(pstate, 0, stdout, format, ap);
739   va_end(ap);
740   return rv;
741 }
742 
743 int
pstdout_vprintf(pstdout_state_t pstate,const char * format,va_list ap)744 pstdout_vprintf(pstdout_state_t pstate, const char *format, va_list ap)
745 {
746   int rv;
747 
748   if (!pstdout_initialized)
749     {
750       pstdout_errnum = PSTDOUT_ERR_UNINITIALIZED;
751       return -1;
752     }
753 
754   if (!pstate || pstate->magic != PSTDOUT_STATE_MAGIC)
755     {
756       pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
757       return -1;
758     }
759 
760   if (!format)
761     {
762       pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
763       return -1;
764     }
765 
766   /* achu: va_list is defined by C standard as an object type, not
767    * necessarily pointer type.  So can't do NULL on ap type.  No known
768    * portable macro to test for validity of input.
769    */
770 
771   rv = _pstdout_print(pstate, 0, stdout, format, ap);
772   return rv;
773 }
774 
775 int
pstdout_fprintf(pstdout_state_t pstate,FILE * stream,const char * format,...)776 pstdout_fprintf(pstdout_state_t pstate, FILE *stream, const char *format, ...)
777 {
778   va_list ap;
779   int rv;
780 
781   if (!pstdout_initialized)
782     {
783       pstdout_errnum = PSTDOUT_ERR_UNINITIALIZED;
784       return -1;
785     }
786 
787   if (!pstate || pstate->magic != PSTDOUT_STATE_MAGIC)
788     {
789       pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
790       return -1;
791     }
792 
793   if (!stream || !format)
794     {
795       pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
796       return -1;
797     }
798 
799   if (stream != stdout && stream != stderr)
800     {
801       pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
802       return -1;
803     }
804 
805   va_start(ap, format);
806   rv = _pstdout_print(pstate, 0, stream, format, ap);
807   va_end(ap);
808   return rv;
809 }
810 
811 int
pstdout_vfprintf(pstdout_state_t pstate,FILE * stream,const char * format,va_list ap)812 pstdout_vfprintf(pstdout_state_t pstate, FILE *stream, const char *format,
813                  va_list ap)
814 {
815   int rv;
816 
817   if (!pstdout_initialized)
818     {
819       pstdout_errnum = PSTDOUT_ERR_UNINITIALIZED;
820       return -1;
821     }
822 
823   if (!pstate || pstate->magic != PSTDOUT_STATE_MAGIC)
824     {
825       pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
826       return -1;
827     }
828 
829   if (!stream || !format)
830     {
831       pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
832       return -1;
833     }
834 
835   if (stream != stdout && stream != stderr)
836     {
837       pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
838       return -1;
839     }
840 
841   rv = _pstdout_print(pstate, 0, stream, format, ap);
842   return rv;
843 }
844 
845 void
pstdout_perror(pstdout_state_t pstate,const char * s)846 pstdout_perror(pstdout_state_t pstate, const char *s)
847 {
848   if (!pstdout_initialized)
849     {
850       pstdout_errnum = PSTDOUT_ERR_UNINITIALIZED;
851       return;
852     }
853 
854   if (!pstate || pstate->magic != PSTDOUT_STATE_MAGIC)
855     {
856       pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
857       return;
858     }
859 
860   if (!s)
861     {
862       pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
863       return;
864     }
865 
866   _pstdout_print_wrapper(pstate, 0, stderr, "%s: %s\n", s, strerror(errno));
867 }
868 
869 static int
_pstdout_state_init(pstdout_state_t pstate,const char * hostname)870 _pstdout_state_init(pstdout_state_t pstate, const char *hostname)
871 {
872   int rc;
873 
874   assert(pstate);
875 
876   memset(pstate, '\0', sizeof(struct pstdout_state));
877   pstate->magic = PSTDOUT_STATE_MAGIC;
878   pstate->hostname = (char *)hostname;
879 
880   if (!(pstate->p_stdout = cbuf_create(PSTDOUT_STATE_CBUF_MIN, PSTDOUT_STATE_CBUF_MAX)))
881     {
882       if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
883         fprintf(stderr, "cbuf_create: %s\n", strerror(errno));
884       pstdout_errnum = PSTDOUT_ERR_INTERNAL;
885       return -1;
886     }
887   if (!(pstate->p_stderr = cbuf_create(PSTDOUT_STATE_CBUF_MIN, PSTDOUT_STATE_CBUF_MAX)))
888     {
889       if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
890         fprintf(stderr, "cbuf_create: %s\n", strerror(errno));
891       pstdout_errnum = PSTDOUT_ERR_INTERNAL;
892       return -1;
893     }
894   pstate->buffer_stdout = NULL;
895   pstate->buffer_stdout_len = 0;
896   pstate->buffer_stderr = NULL;
897   pstate->buffer_stderr_len = 0;
898   pstate->no_more_external_output = 0;
899 
900   if ((rc = pthread_mutex_init(&(pstate->mutex), NULL)))
901     {
902       if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
903         fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
904       pstdout_errnum = PSTDOUT_ERR_INTERNAL;
905       return -1;
906     }
907   return 0;
908 }
909 
910 static int
_pstdout_output_buffer_data(pstdout_state_t pstate,FILE * stream,char ** whichbuffer,unsigned int * whichbufferlen,uint32_t whichprependmask,uint32_t whichbuffermask,uint32_t whichconsolidatemask,List whichconsolidatedlist,pthread_mutex_t * whichconsolidatedmutex)911 _pstdout_output_buffer_data(pstdout_state_t pstate,
912                             FILE *stream,
913                             char **whichbuffer,
914                             unsigned int *whichbufferlen,
915                             uint32_t whichprependmask,
916                             uint32_t whichbuffermask,
917                             uint32_t whichconsolidatemask,
918                             List whichconsolidatedlist,
919                             pthread_mutex_t *whichconsolidatedmutex)
920 {
921   assert(pstate);
922   assert(pstate->magic == PSTDOUT_STATE_MAGIC);
923   assert(pstate->p_stdout);
924   assert(pstate->p_stderr);
925   assert(stream);
926   assert(stream == stdout || stream == stderr);
927   assert(whichbuffer);
928   assert(whichbufferlen);
929   assert(whichprependmask == PSTDOUT_OUTPUT_STDOUT_PREPEND_HOSTNAME
930          || whichprependmask == PSTDOUT_OUTPUT_STDERR_PREPEND_HOSTNAME);
931   assert(whichbuffermask == PSTDOUT_OUTPUT_BUFFER_STDOUT
932          || whichbuffermask == PSTDOUT_OUTPUT_BUFFER_STDERR);
933   assert(whichconsolidatemask == PSTDOUT_OUTPUT_STDOUT_CONSOLIDATE
934          || whichconsolidatemask == PSTDOUT_OUTPUT_STDERR_CONSOLIDATE);
935   assert(whichconsolidatedlist);
936   assert(whichconsolidatedmutex);
937 
938   if ((*whichbuffer && *whichbufferlen)
939       && (pstdout_output_flags & whichbuffermask
940           || pstdout_output_flags & whichconsolidatemask))
941     {
942       /* Need to write a '\0' */
943       if (!(*whichbuffer = (char *)realloc(*whichbuffer, *whichbufferlen + 1)))
944         {
945           pstdout_errnum = PSTDOUT_ERR_OUTMEM;
946           goto cleanup;
947         }
948 
949       (*whichbuffer)[*whichbufferlen] = '\0';
950       *whichbufferlen += 1;
951 
952       if (pstdout_output_flags & whichbuffermask)
953         {
954           if (!(pstdout_output_flags & whichprependmask))
955             {
956               fprintf(stream, "----------------\n");
957               fprintf(stream, "%s\n", pstate->hostname);
958               fprintf(stream, "----------------\n");
959             }
960           fprintf(stream, "%s", *whichbuffer);
961           fflush(stream);
962         }
963       else
964         {
965           struct pstdout_consolidated_data *cdata;
966           int rc;
967 
968           if ((rc = pthread_mutex_lock(whichconsolidatedmutex)))
969             {
970               if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
971                 fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
972               pstdout_errnum = PSTDOUT_ERR_INTERNAL;
973               goto cleanup;
974             }
975 
976           if (!(cdata = list_find_first(whichconsolidatedlist, _pstdout_consolidated_data_find, *whichbuffer)))
977             {
978               if (!(cdata = _pstdout_consolidated_data_create(pstate->hostname, *whichbuffer)))
979                 goto cleanup;
980 
981               if (!list_append(whichconsolidatedlist, cdata))
982                 {
983                   if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
984                     fprintf(stderr, "list_append: %s\n", strerror(errno));
985                   pstdout_errnum = PSTDOUT_ERR_INTERNAL;
986                   _pstdout_consolidated_data_destroy(cdata);
987                   goto cleanup;
988                 }
989             }
990           else
991             {
992               if (!fi_hostlist_push(cdata->h, pstate->hostname))
993                 {
994                   if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
995                     fprintf(stderr, "fi_hostlist_push: %s\n", strerror(errno));
996                   pstdout_errnum = PSTDOUT_ERR_INTERNAL;
997                   goto cleanup;
998                 }
999             }
1000 
1001           if ((rc = pthread_mutex_unlock(whichconsolidatedmutex)))
1002             {
1003               if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1004                 fprintf(stderr, "pthread_mutex_unlock: %s\n", strerror(rc));
1005               pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1006               goto cleanup;
1007             }
1008 
1009         }
1010     }
1011 
1012   return 0;
1013 
1014  cleanup:
1015   return -1;
1016 }
1017 
1018 static int
_pstdout_output_finish(pstdout_state_t pstate)1019 _pstdout_output_finish(pstdout_state_t pstate)
1020 {
1021   int pstate_mutex_locked = 0;
1022   int rc, rv = -1;
1023 
1024   assert(pstate);
1025   assert(pstate->magic == PSTDOUT_STATE_MAGIC);
1026   assert(pstate->p_stdout);
1027   assert(pstate->p_stderr);
1028 
1029   if ((rc = pthread_mutex_lock(&(pstate->mutex))))
1030     {
1031       if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1032         fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
1033       pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1034       goto cleanup;
1035     }
1036   pstate_mutex_locked++;
1037 
1038   /* If there is remaining junk in the cbufs, write a "\n" to it so we
1039    * finish off the line and get it flushed out.
1040    */
1041   if (!cbuf_is_empty(pstate->p_stdout))
1042     _pstdout_print_wrapper(pstate, 1, stdout, "\n");
1043 
1044   if (!cbuf_is_empty(pstate->p_stderr))
1045     _pstdout_print_wrapper(pstate, 1, stderr, "\n");
1046 
1047   if (_pstdout_output_buffer_data(pstate,
1048                                   stdout,
1049                                   &(pstate->buffer_stdout),
1050                                   &(pstate->buffer_stdout_len),
1051                                   PSTDOUT_OUTPUT_STDOUT_PREPEND_HOSTNAME,
1052                                   PSTDOUT_OUTPUT_BUFFER_STDOUT,
1053                                   PSTDOUT_OUTPUT_STDOUT_CONSOLIDATE,
1054                                   pstdout_consolidated_stdout,
1055                                   &pstdout_consolidated_stdout_mutex) < 0)
1056     goto cleanup;
1057 
1058   if (_pstdout_output_buffer_data(pstate,
1059                                   stderr,
1060                                   &(pstate->buffer_stderr),
1061                                   &(pstate->buffer_stderr_len),
1062                                   PSTDOUT_OUTPUT_STDERR_PREPEND_HOSTNAME,
1063                                   PSTDOUT_OUTPUT_BUFFER_STDERR,
1064                                   PSTDOUT_OUTPUT_STDERR_CONSOLIDATE,
1065                                   pstdout_consolidated_stderr,
1066                                   &pstdout_consolidated_stderr_mutex) < 0)
1067     goto cleanup;
1068 
1069   /* Only output from internal to pstdout is allowed */
1070   pstate->no_more_external_output = 1;
1071   rv = 0;
1072 
1073  cleanup:
1074   if (pstate_mutex_locked)
1075     {
1076       if ((rc = pthread_mutex_unlock(&(pstate->mutex))))
1077         {
1078           if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1079             fprintf(stderr, "pthread_mutex_unlock: %s\n", strerror(rc));
1080           /* Don't change error code, just move on */
1081         }
1082     }
1083   return rv;
1084 }
1085 
1086 static void
_pstdout_state_cleanup(pstdout_state_t pstate)1087 _pstdout_state_cleanup(pstdout_state_t pstate)
1088 {
1089   assert(pstate);
1090   assert(pstate->magic == PSTDOUT_STATE_MAGIC);
1091 
1092   if (pstate->p_stdout)
1093     cbuf_destroy(pstate->p_stdout);
1094   if (pstate->p_stderr)
1095     cbuf_destroy(pstate->p_stderr);
1096   free(pstate->buffer_stdout);
1097   free(pstate->buffer_stderr);
1098   memset(pstate, '\0', sizeof(struct pstdout_state));
1099 }
1100 
1101 static void *
_pstdout_func_entry(void * arg)1102 _pstdout_func_entry(void *arg)
1103 {
1104   struct pstdout_thread_data *tdata = NULL;
1105   struct pstdout_state pstate;
1106   int rc;
1107 
1108   tdata = (struct pstdout_thread_data *)arg;
1109 
1110   if (_pstdout_state_init(&pstate, tdata->hostname) < 0)
1111     goto cleanup;
1112 
1113   if ((rc = pthread_mutex_lock(&pstdout_states_mutex)))
1114     {
1115       if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1116         fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
1117       pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1118       goto cleanup;
1119     }
1120 
1121   if (!list_append(pstdout_states, &pstate))
1122     {
1123       if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1124         fprintf(stderr, "list_append: %s\n", strerror(errno));
1125       pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1126       pthread_mutex_unlock(&pstdout_states_mutex);
1127       goto cleanup;
1128     }
1129 
1130   if ((rc = pthread_mutex_unlock(&pstdout_states_mutex)))
1131     {
1132       if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1133         fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
1134       pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1135       goto cleanup;
1136     }
1137 
1138   tdata->exit_code = (tdata->pstdout_func)(&pstate, tdata->hostname, tdata->arg);
1139 
1140   if (_pstdout_output_finish(&pstate) < 0)
1141     goto cleanup;
1142 
1143  cleanup:
1144   pthread_mutex_lock(&pstdout_states_mutex);
1145   list_delete_all(pstdout_states, _pstdout_states_delete_pointer, &pstate);
1146   pthread_mutex_unlock(&pstdout_states_mutex);
1147   _pstdout_state_cleanup(&pstate);
1148   pthread_mutex_lock(&pstdout_threadcount_mutex);
1149   pstdout_threadcount--;
1150   pthread_cond_signal(&pstdout_threadcount_cond);
1151   pthread_mutex_unlock(&pstdout_threadcount_mutex);
1152   return NULL;
1153 }
1154 
1155 static int
_pstdout_output_consolidated(FILE * stream,List whichconsolidatedlist,pthread_mutex_t * whichconsolidatedmutex)1156 _pstdout_output_consolidated(FILE *stream,
1157                              List whichconsolidatedlist,
1158                              pthread_mutex_t *whichconsolidatedmutex)
1159 {
1160   struct pstdout_consolidated_data *cdata;
1161   ListIterator itr = NULL;
1162   int mutex_locked = 0;
1163   int rc, rv = -1;
1164 
1165   assert(stream);
1166   assert(stream == stdout || stream == stderr);
1167   assert(whichconsolidatedlist);
1168   assert(whichconsolidatedmutex);
1169 
1170   if ((rc = pthread_mutex_lock(whichconsolidatedmutex)))
1171     {
1172       if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1173         fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
1174       pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1175       goto cleanup;
1176     }
1177   mutex_locked++;
1178 
1179   list_sort(whichconsolidatedlist, _pstdout_consolidated_data_compare);
1180 
1181   if (!(itr = list_iterator_create (whichconsolidatedlist)))
1182     {
1183       pstdout_errnum = PSTDOUT_ERR_OUTMEM;
1184       goto cleanup;
1185     }
1186 
1187   while ((cdata = list_next(itr)))
1188     {
1189       char hbuf[PSTDOUT_BUFLEN + 1];
1190 
1191       memset(hbuf, '\0', PSTDOUT_BUFLEN + 1);
1192       fi_hostlist_sort(cdata->h);
1193       if (fi_hostlist_ranged_string(cdata->h, PSTDOUT_BUFLEN, hbuf) < 0)
1194         {
1195           if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1196             fprintf(stderr, "fi_hostlist_ranged_string: %s\n", strerror(errno));
1197           pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1198           goto cleanup;
1199         }
1200 
1201       fprintf(stream, "----------------\n");
1202       fprintf(stream, "%s\n", hbuf);
1203       fprintf(stream, "----------------\n");
1204       fprintf(stream, "%s", cdata->output);
1205     }
1206 
1207   rv = 0;
1208  cleanup:
1209   if (mutex_locked)
1210     {
1211       if ((rc = pthread_mutex_unlock(whichconsolidatedmutex)))
1212         {
1213           if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1214             fprintf(stderr, "pthread_mutex_unlock: %s\n", strerror(rc));
1215           /* Don't change error code, just move on */
1216         }
1217     }
1218   if (itr)
1219     list_iterator_destroy(itr);
1220   return rv;
1221 }
1222 
1223 static int
_pstdout_output_consolidated_finish(void)1224 _pstdout_output_consolidated_finish(void)
1225 {
1226   /* Output consolidated data */
1227   if (pstdout_output_flags & PSTDOUT_OUTPUT_STDOUT_CONSOLIDATE)
1228     {
1229       if (_pstdout_output_consolidated(stdout,
1230                                        pstdout_consolidated_stdout,
1231                                        &pstdout_consolidated_stdout_mutex) < 0)
1232         goto cleanup;
1233     }
1234 
1235   if (pstdout_output_flags & PSTDOUT_OUTPUT_STDERR_CONSOLIDATE)
1236     {
1237       if (_pstdout_output_consolidated(stderr,
1238                                        pstdout_consolidated_stderr,
1239                                        &pstdout_consolidated_stderr_mutex) < 0)
1240         goto cleanup;
1241     }
1242 
1243   return 0;
1244 
1245  cleanup:
1246   return -1;
1247 }
1248 
1249 static int
_pstdout_sigint_finish_output(void * x,void * arg)1250 _pstdout_sigint_finish_output(void *x, void *arg)
1251 {
1252   struct pstdout_state *pstate;
1253   assert(x);
1254 
1255   pstate = (struct pstdout_state *)x;
1256 
1257   if (_pstdout_output_finish(pstate) < 0)
1258     return -1;
1259 
1260   /* The no_more_external_output flag set in _pstdout_output_finish()
1261    * protects from extraneous extra output from other threads after
1262    * this output.
1263    */
1264 
1265   if (pstate->hostname
1266       && (pstdout_output_flags & PSTDOUT_OUTPUT_STDOUT_PREPEND_HOSTNAME)
1267       && !(pstdout_output_flags & PSTDOUT_OUTPUT_BUFFER_STDOUT)
1268       && !(pstdout_output_flags & PSTDOUT_OUTPUT_STDOUT_CONSOLIDATE))
1269     {
1270       fprintf(stdout, "%s: exiting session\n", pstate->hostname);
1271       fflush(stdout);
1272     }
1273 
1274   /* Note: there isn't a race with any remaining threads and the below
1275    * output, b/c the _pstdout_output_finish() outputs buffered data or
1276    * puts the remaining data into the consolidated output list.  Any extra
1277    * output from the user cannot be outputted.
1278    */
1279 
1280   if (pstate->hostname
1281       && (pstdout_output_flags & PSTDOUT_OUTPUT_STDOUT_PREPEND_HOSTNAME)
1282       && (pstdout_output_flags & PSTDOUT_OUTPUT_BUFFER_STDOUT))
1283     {
1284       if ((pstate->buffer_stdout && pstate->buffer_stdout_len)
1285           || (pstate->buffer_stderr && pstate->buffer_stderr_len))
1286         fprintf(stdout, "%s: exiting session: current output flushed\n", pstate->hostname);
1287       else
1288         fprintf(stdout, "%s: exiting session\n", pstate->hostname);
1289       fflush(stdout);
1290     }
1291 
1292   if (pstdout_output_flags & PSTDOUT_OUTPUT_STDOUT_CONSOLIDATE)
1293     {
1294       if ((pstate->buffer_stdout && pstate->buffer_stdout_len)
1295           || (pstate->buffer_stderr && pstate->buffer_stderr_len))
1296         fprintf(stdout, "%s: exiting session: current output consolidated\n", pstate->hostname);
1297       else
1298         fprintf(stdout, "%s: exiting session\n", pstate->hostname);
1299     }
1300 
1301   return 0;
1302 }
1303 
1304 void
_pstdout_sigint(int s)1305 _pstdout_sigint(int s)
1306 {
1307   int rc;
1308 
1309   /* This is a last ditch effort, so no need to worry if we don't get
1310    * a lock or get an error or whatever.
1311    */
1312   if ((rc = pthread_mutex_lock(&pstdout_states_mutex)))
1313     {
1314       if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1315         fprintf(stderr, "fi_hostlist_ranged_string: %s\n", strerror(rc));
1316     }
1317 
1318   if (list_for_each(pstdout_states, _pstdout_sigint_finish_output, NULL) < 0)
1319     {
1320       if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1321         fprintf(stderr, "list_for_each: %s\n", strerror(errno));
1322     }
1323 
1324   _pstdout_output_consolidated_finish();
1325   exit (EXIT_FAILURE);
1326 }
1327 
1328 int
pstdout_launch(const char * hostnames,Pstdout_Thread pstdout_func,void * arg)1329 pstdout_launch(const char *hostnames, Pstdout_Thread pstdout_func, void *arg)
1330 {
1331   struct pstdout_thread_data **tdata = NULL;
1332   struct pstdout_state pstate;
1333   unsigned int pstate_init = 0;
1334   fi_hostlist_iterator_t hitr = NULL;
1335   fi_hostlist_t h = NULL;
1336   int h_count = 0;
1337   char *host = NULL;
1338   int exit_code = -1;
1339   sighandler_t sighandler_save = NULL;
1340   int sighandler_set = 0;
1341   int rc;
1342   int i;
1343 
1344   if (!pstdout_initialized)
1345     {
1346       pstdout_errnum = PSTDOUT_ERR_UNINITIALIZED;
1347       return -1;
1348     }
1349 
1350   if (!pstdout_func)
1351     {
1352       pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
1353       return -1;
1354     }
1355 
1356   if ((rc = pthread_mutex_lock(&pstdout_launch_mutex)))
1357     {
1358       if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1359         fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
1360       pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1361       goto cleanup;
1362     }
1363 
1364   /* Special case */
1365   if (!hostnames)
1366     {
1367       if (_pstdout_state_init(&pstate, NULL) < 0)
1368         goto cleanup;
1369       pstate_init++;
1370 
1371       exit_code = pstdout_func(&pstate, NULL, arg);
1372       pstdout_errnum = PSTDOUT_ERR_SUCCESS;
1373       goto cleanup;
1374     }
1375 
1376   if (!(h = fi_hostlist_create(hostnames)))
1377     {
1378       pstdout_errnum = PSTDOUT_ERR_OUTMEM;
1379       goto cleanup;
1380     }
1381   h_count = fi_hostlist_count(h);
1382 
1383   /* Sanity check */
1384   if (h_count <= 0)
1385     {
1386       if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1387         fprintf(stderr, "h_count = %d\n", h_count);
1388       pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1389       goto cleanup;
1390     }
1391 
1392   /* Special case */
1393   if (h_count == 1)
1394     {
1395       if (_pstdout_state_init(&pstate, hostnames) < 0)
1396         goto cleanup;
1397       pstate_init++;
1398 
1399       exit_code = pstdout_func(&pstate, hostnames, arg);
1400       pstdout_errnum = PSTDOUT_ERR_SUCCESS;
1401       goto cleanup;
1402     }
1403 
1404   if ((sighandler_save = signal(SIGINT, _pstdout_sigint)) == SIG_ERR)
1405     {
1406       if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1407         fprintf(stderr, "signal\n");
1408       pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1409       goto cleanup;
1410     }
1411   sighandler_set++;
1412 
1413   if (!(hitr = fi_hostlist_iterator_create(h)))
1414     {
1415       pstdout_errnum = PSTDOUT_ERR_OUTMEM;
1416       goto cleanup;
1417     }
1418 
1419   if (!(tdata = (struct pstdout_thread_data **)malloc(sizeof(struct pstdout_thread_data *) * h_count)))
1420     {
1421       pstdout_errnum = PSTDOUT_ERR_OUTMEM;
1422       goto cleanup;
1423     }
1424   memset(tdata, '\0', sizeof(struct pstdout_thread_data *) * h_count);
1425 
1426   i = 0;
1427   while ((host = fi_hostlist_next(hitr)))
1428     {
1429       if (!(tdata[i] = (struct pstdout_thread_data *)malloc(sizeof(struct pstdout_thread_data))))
1430         {
1431           pstdout_errnum = PSTDOUT_ERR_OUTMEM;
1432           goto cleanup;
1433         }
1434       memset(tdata[i], '\0', sizeof(struct pstdout_thread_data));
1435 
1436       if (!(tdata[i]->hostname = strdup(host)))
1437         {
1438           pstdout_errnum = PSTDOUT_ERR_OUTMEM;
1439           goto cleanup;
1440         }
1441       tdata[i]->pstdout_func = pstdout_func;
1442       tdata[i]->arg = arg;
1443 
1444       if ((rc = pthread_attr_init(&(tdata[i]->attr))))
1445         {
1446           if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1447             fprintf(stderr, "pthread_attr_init: %s\n", strerror(rc));
1448           pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1449           goto cleanup;
1450         }
1451 
1452       if ((rc = pthread_attr_setdetachstate(&(tdata[i]->attr), PTHREAD_CREATE_DETACHED)))
1453         {
1454           if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1455             fprintf(stderr, "pthread_attr_setdetachstate: %s\n", strerror(rc));
1456           pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1457           goto cleanup;
1458         }
1459 
1460       free(host);
1461       i++;
1462     }
1463   host = NULL;
1464 
1465   fi_hostlist_iterator_destroy(hitr);
1466   hitr = NULL;
1467 
1468   fi_hostlist_destroy(h);
1469   h = NULL;
1470 
1471   /* Launch threads up to fanout */
1472   for (i = 0; i < h_count; i++)
1473     {
1474       if ((rc = pthread_mutex_lock(&pstdout_threadcount_mutex)))
1475         {
1476           if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1477             fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
1478           pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1479           goto cleanup;
1480         }
1481 
1482       if (pstdout_threadcount == pstdout_fanout)
1483         {
1484           if ((rc = pthread_cond_wait(&pstdout_threadcount_cond, &pstdout_threadcount_mutex)))
1485             {
1486               if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1487                 fprintf(stderr, "pthread_cond_wait: %s\n", strerror(rc));
1488               pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1489               goto cleanup;
1490             }
1491         }
1492 
1493       if ((rc = pthread_create(&(tdata[i]->tid),
1494                                &(tdata[i]->attr),
1495                                _pstdout_func_entry,
1496                                (void *) tdata[i])))
1497         {
1498           if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1499             fprintf(stderr, "pthread_create: %s\n", strerror(rc));
1500           pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1501           goto cleanup;
1502         }
1503 
1504       pstdout_threadcount++;
1505 
1506       if ((rc = pthread_mutex_unlock(&pstdout_threadcount_mutex)))
1507         {
1508           if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1509             fprintf(stderr, "pthread_mutex_unlock: %s\n", strerror(rc));
1510           pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1511           goto cleanup;
1512         }
1513     }
1514 
1515   /* Wait for Threads to finish */
1516 
1517   if ((rc = pthread_mutex_lock(&pstdout_threadcount_mutex)))
1518     {
1519       if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1520         fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
1521       pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1522       goto cleanup;
1523     }
1524 
1525   while (pstdout_threadcount > 0)
1526     {
1527       if ((rc = pthread_cond_wait(&pstdout_threadcount_cond, &pstdout_threadcount_mutex)))
1528         {
1529           if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1530             fprintf(stderr, "pthread_cond_wait: %s\n", strerror(rc));
1531           pstdout_errnum = PSTDOUT_ERR_INTERNAL;
1532           goto cleanup;
1533         }
1534     }
1535 
1536   if (_pstdout_output_consolidated_finish() < 0)
1537     goto cleanup;
1538 
1539   /* Determine exit code */
1540   exit_code = 0;
1541   for (i = 0; i < h_count; i++)
1542     {
1543       if (tdata[i]->exit_code > exit_code)
1544         exit_code = tdata[i]->exit_code;
1545     }
1546 
1547  cleanup:
1548   /* Cannot pass NULL for key, so just pass dummy key */
1549   list_delete_all(pstdout_consolidated_stdout, _pstdout_consolidated_data_delete_all, "");
1550   list_delete_all(pstdout_consolidated_stderr, _pstdout_consolidated_data_delete_all, "");
1551   if (pstate_init)
1552     _pstdout_state_cleanup(&pstate);
1553   if (tdata)
1554     {
1555       for (i = 0; i < h_count; i++)
1556         {
1557           if (tdata[i])
1558             {
1559               free(tdata[i]->hostname);
1560               pthread_attr_destroy(&(tdata[i]->attr));
1561               free(tdata[i]);
1562             }
1563         }
1564       free(tdata);
1565     }
1566   if (hitr)
1567     fi_hostlist_iterator_destroy(hitr);
1568   if (h)
1569     fi_hostlist_destroy(h);
1570   free(host);
1571   if ((rc = pthread_mutex_unlock(&pstdout_launch_mutex)))
1572     {
1573       if (pstdout_debug_flags & PSTDOUT_DEBUG_STANDARD)
1574         fprintf(stderr, "pthread_mutex_unlock: %s\n", strerror(rc));
1575       /* Don't change error code, just move on */
1576     }
1577   if (sighandler_set)
1578     signal(SIGINT, sighandler_save);
1579   return exit_code;
1580 }
1581 
1582 int
PSTDOUT_PRINTF(pstdout_state_t pstate,const char * format,...)1583 PSTDOUT_PRINTF(pstdout_state_t pstate, const char *format, ...)
1584 {
1585   va_list ap;
1586   int rv;
1587 
1588   if (!format)
1589     {
1590       pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
1591       return -1;
1592     }
1593 
1594   va_start(ap, format);
1595   if (!pstate
1596       || pstate->magic != PSTDOUT_STATE_MAGIC
1597       || !pstdout_initialized)
1598     rv = vprintf(format, ap);
1599   else
1600     rv = _pstdout_print(pstate, 0, stdout, format, ap);
1601   va_end(ap);
1602   return rv;
1603 }
1604 
1605 int
PSTDOUT_FPRINTF(pstdout_state_t pstate,FILE * stream,const char * format,...)1606 PSTDOUT_FPRINTF(pstdout_state_t pstate, FILE *stream, const char *format, ...)
1607 {
1608   va_list ap;
1609   int rv;
1610 
1611   if (!stream || !format)
1612     {
1613       pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
1614       return -1;
1615     }
1616 
1617   va_start(ap, format);
1618   if (!pstate
1619       || pstate->magic != PSTDOUT_STATE_MAGIC
1620       || (stream != stdout && stream != stderr)
1621       || !pstdout_initialized)
1622     rv = vfprintf(stream, format, ap);
1623   else
1624     rv = _pstdout_print(pstate, 0, stream, format, ap);
1625   va_end(ap);
1626   return rv;
1627 }
1628 
1629 void
PSTDOUT_PERROR(pstdout_state_t pstate,const char * s)1630 PSTDOUT_PERROR(pstdout_state_t pstate, const char *s)
1631 {
1632   if (!s)
1633     {
1634       pstdout_errnum = PSTDOUT_ERR_PARAMETERS;
1635       return;
1636     }
1637 
1638   if (!pstate
1639       || pstate->magic != PSTDOUT_STATE_MAGIC
1640       || !pstdout_initialized)
1641     fprintf(stderr, "%s: %s\n", s, strerror(errno));
1642   else
1643     _pstdout_print_wrapper(pstate, 0, stderr, "%s: %s\n", s, strerror(errno));
1644 }
1645