1 /*
2  * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
3  *                         University Research and Technology
4  *                         Corporation.  All rights reserved.
5  * Copyright (c) 2004-2011 The University of Tennessee and The University
6  *                         of Tennessee Research Foundation.  All rights
7  *                         reserved.
8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
9  *                         University of Stuttgart.  All rights reserved.
10  * Copyright (c) 2004-2005 The Regents of the University of California.
11  *                         All rights reserved.
12  * Copyright (c) 2013      Los Alamos National Security, LLC.  All rights reserved.
13  * Copyright (c) 2013      Cisco Systems, Inc.  All rights reserved.
14  * Copyright (c) 2015-2018 Intel, Inc. All rights reserved.
15  * Copyright (c) 2015-2018 Research Organization for Information Science
16  *                         and Technology (RIST).  All rights reserved.
17  * Copyright (c) 2017      IBM Corporation.  All rights reserved.
18  * Copyright (c) 2017      Mellanox Technologies. All rights reserved.
19  * $COPYRIGHT$
20  *
21  * Additional copyrights may follow
22  *
23  * $HEADER$
24  */
25 
26 
27 #include "orte_config.h"
28 #include "orte/constants.h"
29 
30 #include <string.h>
31 #include <stdio.h>
32 
33 #include "orte/mca/mca.h"
34 #include "opal/mca/base/base.h"
35 #include "opal/util/os_dirpath.h"
36 #include "opal/util/output.h"
37 #include "opal/util/basename.h"
38 
39 #include "orte/util/proc_info.h"
40 #include "orte/runtime/orte_globals.h"
41 #include "orte/util/name_fns.h"
42 #include "orte/mca/rml/rml.h"
43 
44 #include "orte/mca/iof/iof.h"
45 #include "orte/mca/iof/base/base.h"
46 
47 /*
48  * The following file was created by configure.  It contains extern
49  * statements and the definition of an array of pointers to each
50  * component's public orte_base_component_t struct.
51  */
52 
53 #include "orte/mca/iof/base/static-components.h"
54 
/* Function table of the active IOF module.  Zero-initialized so that its
 * function pointers (e.g. finalize, which orte_iof_base_close checks for
 * NULL before calling) are NULL until a module is installed - presumably
 * by component selection, which is not part of this file. */
orte_iof_base_module_t orte_iof = {0};


/*
 * Global variables
 */

/* Framework-wide IOF state: holds the MCA settings registered in
 * orte_iof_base_register (output_limit, redirect_app_stderr_to_stdout)
 * and the stdout/stderr sinks created in orte_iof_base_open. */
orte_iof_base_t orte_iof_base = {0};
63 
/*
 * Framework register hook.
 *
 * Registers two read-only MCA variables of the orte/iof/base framework:
 *   - "output_limit": maximum backlog of pending output messages
 *                     (default INT_MAX, advertised as "unlimited");
 *   - "redirect_app_stderr_to_stdout": merge application stderr into
 *                     stdout at the source (default false).
 * The value held in each storage location when it is registered acts as
 * that variable's default.  The flags argument is not used.
 *
 * Always returns ORTE_SUCCESS; registration results are ignored.
 */
static int orte_iof_base_register(mca_base_register_flag_t flags)
{
    /* backlog cap for pending output messages */
    orte_iof_base.output_limit = (size_t) INT_MAX;
    (void) mca_base_var_register("orte", "iof", "base", "output_limit",
                                 "Maximum backlog of output messages [default: unlimited]",
                                 MCA_BASE_VAR_TYPE_SIZE_T, NULL, 0, 0,
                                 OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY,
                                 &orte_iof_base.output_limit);

    /* optional merge of application stderr into stdout */
    orte_iof_base.redirect_app_stderr_to_stdout = false;
    (void) mca_base_var_register("orte", "iof", "base", "redirect_app_stderr_to_stdout",
                                 "Redirect application stderr to stdout at source (default: false)",
                                 MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
                                 OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY,
                                 &orte_iof_base.redirect_app_stderr_to_stdout);

    return ORTE_SUCCESS;
}
86 
orte_iof_base_close(void)87 static int orte_iof_base_close(void)
88 {
89     /* Close the selected component */
90     if (NULL != orte_iof.finalize) {
91         orte_iof.finalize();
92     }
93 
94     if (!ORTE_PROC_IS_DAEMON) {
95         if (NULL != orte_iof_base.iof_write_stdout) {
96             OBJ_RELEASE(orte_iof_base.iof_write_stdout);
97         }
98         if (!orte_xml_output && NULL != orte_iof_base.iof_write_stderr) {
99             OBJ_RELEASE(orte_iof_base.iof_write_stderr);
100         }
101     }
102     return mca_base_framework_components_close(&orte_iof_base_framework, NULL);
103 }
104 
105 
106 /**
107  * Function for finding and opening either all MCA components, or the one
108  * that was specifically requested via a MCA parameter.
109  */
orte_iof_base_open(mca_base_open_flag_t flags)110 static int orte_iof_base_open(mca_base_open_flag_t flags)
111 {
112     int xmlfd;
113 
114     /* daemons do not need to do this as they do not write out stdout/err */
115     if (!ORTE_PROC_IS_DAEMON) {
116         if (orte_xml_output) {
117             if (NULL != orte_xml_fp) {
118                 /* user wants all xml-formatted output sent to file */
119                 xmlfd = fileno(orte_xml_fp);
120             } else {
121                 xmlfd = 1;
122             }
123             /* setup the stdout event */
124             ORTE_IOF_SINK_DEFINE(&orte_iof_base.iof_write_stdout, ORTE_PROC_MY_NAME,
125                                  xmlfd, ORTE_IOF_STDOUT, orte_iof_base_write_handler);
126             /* don't create a stderr event - all output will go to
127              * the stdout channel
128              */
129         } else {
130             /* setup the stdout event */
131             ORTE_IOF_SINK_DEFINE(&orte_iof_base.iof_write_stdout, ORTE_PROC_MY_NAME,
132                                  1, ORTE_IOF_STDOUT, orte_iof_base_write_handler);
133             /* setup the stderr event */
134             ORTE_IOF_SINK_DEFINE(&orte_iof_base.iof_write_stderr, ORTE_PROC_MY_NAME,
135                                  2, ORTE_IOF_STDERR, orte_iof_base_write_handler);
136         }
137 
138         /* do NOT set these file descriptors to non-blocking. If we do so,
139          * we set the file descriptor to non-blocking for everyone that has
140          * that file descriptor, which includes everyone else in our shell
141          * pipeline chain.  (See
142          * http://lists.freebsd.org/pipermail/freebsd-hackers/2005-January/009742.html).
143          * This causes things like "mpirun -np 1 big_app | cat" to lose
144          * output, because cat's stdout is then ALSO non-blocking and cat
145          * isn't built to deal with that case (same with almost all other
146          * unix text utils).
147          */
148     }
149 
150     /* Open up all available components */
151     return mca_base_framework_components_open(&orte_iof_base_framework, flags);
152 }
153 
/* Declare the "iof" framework of project "orte", wiring in the register,
 * open and close hooks defined above and the statically-built component
 * list from static-components.h (framework flags: 0). */
MCA_BASE_FRAMEWORK_DECLARE(orte, iof, "ORTE I/O Forwarding",
                           orte_iof_base_register, orte_iof_base_open, orte_iof_base_close,
                           mca_iof_base_static_components, 0);
157 
158 
159 /* class instances */
/*
 * orte_iof_job_t: per-job IOF record.
 * Holds an optional reference to the job data (jdata), which is released
 * on destruction if set, and an embedded opal_bitmap_t named xoff.
 * xoff is presumably used for per-process flow-control flags; confirm
 * against its users.
 */
static void orte_iof_job_construct(orte_iof_job_t *job)
{
    OBJ_CONSTRUCT(&job->xoff, opal_bitmap_t);
    job->jdata = NULL;
}

static void orte_iof_job_destruct(orte_iof_job_t *job)
{
    /* drop the job-data reference, if one was taken */
    if (NULL != job->jdata) {
        OBJ_RELEASE(job->jdata);
    }
    OBJ_DESTRUCT(&job->xoff);
}

OBJ_CLASS_INSTANCE(orte_iof_job_t,
                   opal_object_t,
                   orte_iof_job_construct,
                   orte_iof_job_destruct);
176 
/*
 * orte_iof_proc_t: per-process IOF bookkeeping (a list item).
 * Every event pointer and the subscriber list start out NULL and are
 * released on destruction only if they were set.  "copy" defaults to
 * true.  revstddiag exists only when built with OPAL_PMIX_V1.
 */
static void orte_iof_base_proc_construct(orte_iof_proc_t* proc)
{
    proc->copy = true;
    proc->subscribers = NULL;
#if OPAL_PMIX_V1
    proc->revstddiag = NULL;
#endif
    proc->revstderr = NULL;
    proc->revstdout = NULL;
    proc->stdinev = NULL;
}

static void orte_iof_base_proc_destruct(orte_iof_proc_t* proc)
{
    /* stdin event first, then each output read event */
    if (NULL != proc->stdinev) {
        OBJ_RELEASE(proc->stdinev);
    }
    if (NULL != proc->revstdout) {
        OBJ_RELEASE(proc->revstdout);
    }
    if (NULL != proc->revstderr) {
        OBJ_RELEASE(proc->revstderr);
    }
#if OPAL_PMIX_V1
    if (NULL != proc->revstddiag) {
        OBJ_RELEASE(proc->revstddiag);
    }
#endif
    /* subscribers is a list object, released via OPAL_LIST_RELEASE */
    if (NULL != proc->subscribers) {
        OPAL_LIST_RELEASE(proc->subscribers);
    }
}

OBJ_CLASS_INSTANCE(orte_iof_proc_t,
                   opal_list_item_t,
                   orte_iof_base_proc_construct,
                   orte_iof_base_proc_destruct);
212 
213 
/*
 * orte_iof_sink_t: destination for forwarded I/O (a list item).
 *
 * The constructor always allocates a write event (wev) whose fd starts
 * at -1; the daemon name starts invalid and the xoff/exclusive/closed
 * flags start false.
 */
static void orte_iof_base_sink_construct(orte_iof_sink_t* ptr)
{
    ptr->daemon.jobid = ORTE_JOBID_INVALID;
    ptr->daemon.vpid = ORTE_VPID_INVALID;
    ptr->wev = OBJ_NEW(orte_iof_write_event_t);
    ptr->xoff = false;
    ptr->exclusive = false;
    ptr->closed = false;
}

/*
 * Destructor: release the write event created by the constructor.
 *
 * The write event must be released even when no descriptor was ever
 * attached (fd < 0).  Otherwise it leaks, together with its opal event
 * and pending-output list.  This is safe because the write-event
 * destructor only closes descriptors greater than 2.
 */
static void orte_iof_base_sink_destruct(orte_iof_sink_t* ptr)
{
    if (NULL == ptr->wev) {
        return;
    }
    if (0 <= ptr->wev->fd) {
        OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
                             "%s iof: closing sink for process %s on fd %d",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             ORTE_NAME_PRINT(&ptr->name), ptr->wev->fd));
    }
    OBJ_RELEASE(ptr->wev);
}

OBJ_CLASS_INSTANCE(orte_iof_sink_t,
                   opal_list_item_t,
                   orte_iof_base_sink_construct,
                   orte_iof_base_sink_destruct);
237 
238 
/*
 * orte_iof_read_event_t: read side of a forwarded stream.
 *
 * Owns an opal event allocated at construction.  It may hold a file
 * descriptor (-1 when none), a reference to a sink, and a reference to
 * the owning orte_iof_proc_t (proc).  On destruction the event is freed,
 * a valid fd is closed, and both references are dropped.
 */
static void orte_iof_base_read_event_construct(orte_iof_read_event_t* rd)
{
    rd->ev = opal_event_alloc();
    rd->fd = -1;
    rd->active = false;
    rd->proc = NULL;
    rd->sink = NULL;
    rd->tv.tv_sec = 0;
    rd->tv.tv_usec = 0;
}

static void orte_iof_base_read_event_destruct(orte_iof_read_event_t* rd)
{
    orte_iof_proc_t *owner = (orte_iof_proc_t*)rd->proc;

    opal_event_free(rd->ev);

    if (rd->fd >= 0) {
        OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
                             "%s iof: closing fd %d for process %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), rd->fd,
                             (NULL == owner) ? "UNKNOWN" : ORTE_NAME_PRINT(&owner->name)));
        close(rd->fd);
        rd->fd = -1;
    }

    /* drop the references this event holds */
    if (NULL != rd->sink) {
        OBJ_RELEASE(rd->sink);
    }
    if (NULL != owner) {
        OBJ_RELEASE(owner);
    }
}

OBJ_CLASS_INSTANCE(orte_iof_read_event_t,
                   opal_object_t,
                   orte_iof_base_read_event_construct,
                   orte_iof_base_read_event_destruct);
273 
/*
 * orte_iof_write_event_t: write side of a forwarded stream.
 *
 * Owns an opal event, an optional file descriptor (-1 when none) and a
 * list of queued orte_iof_write_output_t items still waiting to be
 * written.
 */
static void orte_iof_base_write_event_construct(orte_iof_write_event_t* wev)
{
    wev->pending = false;
    wev->always_writable = false;
    wev->fd = -1;
    OBJ_CONSTRUCT(&wev->outputs, opal_list_t);
    wev->ev = opal_event_alloc();
    wev->tv.tv_sec = 0;
    wev->tv.tv_usec = 0;
}

/*
 * Destructor.
 *
 * Frees the opal event and closes the descriptor unless it is one of the
 * standard streams (0-2) or, on the HNP, the XML output file's
 * descriptor, which is closed elsewhere.
 *
 * Any output still queued is released along with the list.  A plain
 * OBJ_DESTRUCT would leave those items unreferenced and leak them.
 */
static void orte_iof_base_write_event_destruct(orte_iof_write_event_t* wev)
{
    bool is_xml_fd = false;

    opal_event_free(wev->ev);

    if (ORTE_PROC_IS_HNP && NULL != orte_xml_fp) {
        /* don't close this one - will get it later */
        is_xml_fd = (fileno(orte_xml_fp) == wev->fd);
    }
    if (!is_xml_fd && 2 < wev->fd) {
        OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
                             "%s iof: closing fd %d for write event",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd));
        close(wev->fd);
    }

    OPAL_LIST_DESTRUCT(&wev->outputs);
}

OBJ_CLASS_INSTANCE(orte_iof_write_event_t,
                   opal_list_item_t,
                   orte_iof_base_write_event_construct,
                   orte_iof_base_write_event_destruct);
307 
/* orte_iof_write_output_t: a list item queued on a write event's
 * "outputs" list; it has no constructor or destructor of its own. */
OBJ_CLASS_INSTANCE(orte_iof_write_output_t,
                   opal_list_item_t,
                   NULL, NULL);
311