1 /*
2 * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
3 * University Research and Technology
4 * Corporation. All rights reserved.
5 * Copyright (c) 2004-2011 The University of Tennessee and The University
6 * of Tennessee Research Foundation. All rights
7 * reserved.
8 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
9 * University of Stuttgart. All rights reserved.
10 * Copyright (c) 2004-2005 The Regents of the University of California.
11 * All rights reserved.
12 * Copyright (c) 2013 Los Alamos National Security, LLC. All rights reserved.
13 * Copyright (c) 2013 Cisco Systems, Inc. All rights reserved.
14 * Copyright (c) 2015-2018 Intel, Inc. All rights reserved.
15 * Copyright (c) 2015-2018 Research Organization for Information Science
16 * and Technology (RIST). All rights reserved.
17 * Copyright (c) 2017 IBM Corporation. All rights reserved.
18 * Copyright (c) 2017 Mellanox Technologies. All rights reserved.
19 * $COPYRIGHT$
20 *
21 * Additional copyrights may follow
22 *
23 * $HEADER$
24 */
25
26
27 #include "orte_config.h"
28 #include "orte/constants.h"
29
30 #include <string.h>
31 #include <stdio.h>
32
33 #include "orte/mca/mca.h"
34 #include "opal/mca/base/base.h"
35 #include "opal/util/os_dirpath.h"
36 #include "opal/util/output.h"
37 #include "opal/util/basename.h"
38
39 #include "orte/util/proc_info.h"
40 #include "orte/runtime/orte_globals.h"
41 #include "orte/util/name_fns.h"
42 #include "orte/mca/rml/rml.h"
43
44 #include "orte/mca/iof/iof.h"
45 #include "orte/mca/iof/base/base.h"
46
47 /*
48 * The following file was created by configure. It contains extern
49 * statements and the definition of an array of pointers to each
50 * component's public orte_base_component_t struct.
51 */
52
53 #include "orte/mca/iof/base/static-components.h"
54
/*
 * The IOF module instance. It starts out zero-initialized, so all of
 * its members are zero/NULL until something outside this file fills
 * them in (presumably component selection -- confirm). That is why
 * orte_iof_base_close() tests orte_iof.finalize against NULL before
 * calling it.
 */
orte_iof_base_module_t orte_iof = {0};
56
57
58 /*
59 * Global variables
60 */
61
/*
 * Framework-wide IOF state, zero-initialized. Within this file it holds
 * the registered MCA parameter values (output_limit and
 * redirect_app_stderr_to_stdout) and the local output sinks
 * (iof_write_stdout and iof_write_stderr).
 */
orte_iof_base_t orte_iof_base = {0};
63
/*
 * Register the MCA parameters owned by the IOF base framework.
 *
 * Both defaults are stored before the variables are registered, and the
 * results of the registrations are deliberately discarded.
 *
 * flags: registration flags; not used by this function.
 *
 * Returns ORTE_SUCCESS unconditionally.
 */
static int orte_iof_base_register(mca_base_register_flag_t flags)
{
    /* defaults: note that the output limit default is INT_MAX even
     * though the help text below describes it as "unlimited" */
    orte_iof_base.output_limit = (size_t) INT_MAX;
    orte_iof_base.redirect_app_stderr_to_stdout = false;

    /* cap on the number of pending output messages */
    (void) mca_base_var_register("orte", "iof", "base", "output_limit",
                                 "Maximum backlog of output messages [default: unlimited]",
                                 MCA_BASE_VAR_TYPE_SIZE_T, NULL, 0, 0,
                                 OPAL_INFO_LVL_9,
                                 MCA_BASE_VAR_SCOPE_READONLY,
                                 &orte_iof_base.output_limit);

    /* whether application stderr is merged into stdout at the source */
    (void) mca_base_var_register("orte", "iof","base", "redirect_app_stderr_to_stdout",
                                 "Redirect application stderr to stdout at source (default: false)",
                                 MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
                                 OPAL_INFO_LVL_9,
                                 MCA_BASE_VAR_SCOPE_READONLY,
                                 &orte_iof_base.redirect_app_stderr_to_stdout);

    return ORTE_SUCCESS;
}
86
orte_iof_base_close(void)87 static int orte_iof_base_close(void)
88 {
89 /* Close the selected component */
90 if (NULL != orte_iof.finalize) {
91 orte_iof.finalize();
92 }
93
94 if (!ORTE_PROC_IS_DAEMON) {
95 if (NULL != orte_iof_base.iof_write_stdout) {
96 OBJ_RELEASE(orte_iof_base.iof_write_stdout);
97 }
98 if (!orte_xml_output && NULL != orte_iof_base.iof_write_stderr) {
99 OBJ_RELEASE(orte_iof_base.iof_write_stderr);
100 }
101 }
102 return mca_base_framework_components_close(&orte_iof_base_framework, NULL);
103 }
104
105
106 /**
107 * Function for finding and opening either all MCA components, or the one
108 * that was specifically requested via a MCA parameter.
109 */
orte_iof_base_open(mca_base_open_flag_t flags)110 static int orte_iof_base_open(mca_base_open_flag_t flags)
111 {
112 int xmlfd;
113
114 /* daemons do not need to do this as they do not write out stdout/err */
115 if (!ORTE_PROC_IS_DAEMON) {
116 if (orte_xml_output) {
117 if (NULL != orte_xml_fp) {
118 /* user wants all xml-formatted output sent to file */
119 xmlfd = fileno(orte_xml_fp);
120 } else {
121 xmlfd = 1;
122 }
123 /* setup the stdout event */
124 ORTE_IOF_SINK_DEFINE(&orte_iof_base.iof_write_stdout, ORTE_PROC_MY_NAME,
125 xmlfd, ORTE_IOF_STDOUT, orte_iof_base_write_handler);
126 /* don't create a stderr event - all output will go to
127 * the stdout channel
128 */
129 } else {
130 /* setup the stdout event */
131 ORTE_IOF_SINK_DEFINE(&orte_iof_base.iof_write_stdout, ORTE_PROC_MY_NAME,
132 1, ORTE_IOF_STDOUT, orte_iof_base_write_handler);
133 /* setup the stderr event */
134 ORTE_IOF_SINK_DEFINE(&orte_iof_base.iof_write_stderr, ORTE_PROC_MY_NAME,
135 2, ORTE_IOF_STDERR, orte_iof_base_write_handler);
136 }
137
138 /* do NOT set these file descriptors to non-blocking. If we do so,
139 * we set the file descriptor to non-blocking for everyone that has
140 * that file descriptor, which includes everyone else in our shell
141 * pipeline chain. (See
142 * http://lists.freebsd.org/pipermail/freebsd-hackers/2005-January/009742.html).
143 * This causes things like "mpirun -np 1 big_app | cat" to lose
144 * output, because cat's stdout is then ALSO non-blocking and cat
145 * isn't built to deal with that case (same with almost all other
146 * unix text utils).
147 */
148 }
149
150 /* Open up all available components */
151 return mca_base_framework_components_open(&orte_iof_base_framework, flags);
152 }
153
/*
 * Declare the "iof" framework of the "orte" project. The register, open
 * and close callbacks are the static functions defined earlier in this
 * file; mca_iof_base_static_components is expected to come from the
 * configure-generated static-components.h included above.
 */
MCA_BASE_FRAMEWORK_DECLARE(orte, iof, "ORTE I/O Forwarding",
                           orte_iof_base_register, orte_iof_base_open, orte_iof_base_close,
                           mca_iof_base_static_components, 0);
157
158
159 /* class instances */
/*
 * Constructor for orte_iof_job_t: builds the embedded xoff bitmap in
 * place and starts with no job data attached.
 */
static void orte_iof_job_construct(orte_iof_job_t *ptr)
{
    OBJ_CONSTRUCT(&ptr->xoff, opal_bitmap_t);
    ptr->jdata = NULL;
}
/*
 * Destructor for orte_iof_job_t: tears down the embedded xoff bitmap
 * and drops the reference on the job data, if one is attached.
 */
static void orte_iof_job_destruct(orte_iof_job_t *ptr)
{
    OBJ_DESTRUCT(&ptr->xoff);

    if (NULL != ptr->jdata) {
        OBJ_RELEASE(ptr->jdata);
    }
}
/* Class instance for orte_iof_job_t: parent class opal_object_t, using
 * the job constructor/destructor defined in this file. */
OBJ_CLASS_INSTANCE(orte_iof_job_t,
                   opal_object_t,
                   orte_iof_job_construct,
                   orte_iof_job_destruct);
176
/*
 * Constructor for orte_iof_proc_t: the stdin sink, every read event and
 * the subscriber list start out unset, and the copy flag defaults to
 * true.
 */
static void orte_iof_base_proc_construct(orte_iof_proc_t* ptr)
{
    ptr->copy = true;
    ptr->subscribers = NULL;

    /* no I/O endpoints are attached yet */
    ptr->stdinev = NULL;
    ptr->revstdout = NULL;
    ptr->revstderr = NULL;
#if OPAL_PMIX_V1
    ptr->revstddiag = NULL;
#endif
}
/*
 * Destructor for orte_iof_proc_t: releases whichever endpoints are
 * still attached -- the stdin sink, the stdout/stderr read events (plus
 * the stddiag read event when built with OPAL_PMIX_V1) -- and finally
 * the subscriber list.
 */
static void orte_iof_base_proc_destruct(orte_iof_proc_t* ptr)
{
    if (NULL != ptr->stdinev) {
        OBJ_RELEASE(ptr->stdinev);
    }
    if (NULL != ptr->revstdout) {
        OBJ_RELEASE(ptr->revstdout);
    }
    if (NULL != ptr->revstderr) {
        OBJ_RELEASE(ptr->revstderr);
    }
#if OPAL_PMIX_V1
    if (NULL != ptr->revstddiag) {
        OBJ_RELEASE(ptr->revstddiag);
    }
#endif
    /* subscribers is a pointer to a list, so the list itself is
     * released here rather than destructed in place */
    if (NULL != ptr->subscribers) {
        OPAL_LIST_RELEASE(ptr->subscribers);
    }
}
/* Class instance for orte_iof_proc_t: parent class opal_list_item_t,
 * using the proc constructor/destructor defined in this file. */
OBJ_CLASS_INSTANCE(orte_iof_proc_t,
                   opal_list_item_t,
                   orte_iof_base_proc_construct,
                   orte_iof_base_proc_destruct);
212
213
/*
 * Constructor for orte_iof_sink_t: allocates the sink's write event,
 * marks the daemon name as invalid and clears the xoff, exclusive and
 * closed flags.
 */
static void orte_iof_base_sink_construct(orte_iof_sink_t* ptr)
{
    /* every sink owns a freshly allocated write event */
    ptr->wev = OBJ_NEW(orte_iof_write_event_t);

    /* no daemon is associated with the sink yet */
    ptr->daemon.jobid = ORTE_JOBID_INVALID;
    ptr->daemon.vpid = ORTE_VPID_INVALID;

    ptr->closed = false;
    ptr->exclusive = false;
    ptr->xoff = false;
}
/*
 * Destructor for orte_iof_sink_t: releases the sink's write event.
 *
 * The write event is allocated unconditionally by the sink constructor,
 * and its own constructor leaves fd at -1 until a descriptor is
 * assigned. The release therefore must not depend on fd being valid:
 * otherwise a sink destroyed before it ever received a descriptor would
 * leak the write event (and the event allocated inside it). The write
 * event destructor only closes descriptors greater than 2, so releasing
 * an event whose fd is still -1 closes nothing.
 */
static void orte_iof_base_sink_destruct(orte_iof_sink_t* ptr)
{
    if (NULL != ptr->wev) {
        OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
                             "%s iof: closing sink for process %s on fd %d",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             ORTE_NAME_PRINT(&ptr->name), ptr->wev->fd));
        OBJ_RELEASE(ptr->wev);
    }
}
/* Class instance for orte_iof_sink_t: parent class opal_list_item_t,
 * using the sink constructor/destructor defined in this file. */
OBJ_CLASS_INSTANCE(orte_iof_sink_t,
                   opal_list_item_t,
                   orte_iof_base_sink_construct,
                   orte_iof_base_sink_destruct);
237
238
/*
 * Constructor for orte_iof_read_event_t: allocates the underlying event
 * and leaves the read event inactive, with no owning process, no sink,
 * no descriptor (fd == -1) and a zeroed timeval.
 */
static void orte_iof_base_read_event_construct(orte_iof_read_event_t* rev)
{
    rev->ev = opal_event_alloc();
    rev->active = false;

    /* nothing is attached yet */
    rev->fd = -1;
    rev->proc = NULL;
    rev->sink = NULL;

    rev->tv.tv_usec = 0;
    rev->tv.tv_sec = 0;
}
/*
 * Destructor for orte_iof_read_event_t: frees the underlying event,
 * closes the descriptor if one is open, and then drops the references
 * on the attached sink and on the owning process.
 */
static void orte_iof_base_read_event_destruct(orte_iof_read_event_t* rev)
{
    orte_iof_proc_t *owner = (orte_iof_proc_t*)rev->proc;

    opal_event_free(rev->ev);

    /* a negative fd means there is no descriptor to close */
    if (0 <= rev->fd) {
        OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
                             "%s iof: closing fd %d for process %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), rev->fd,
                             (NULL == owner) ? "UNKNOWN" : ORTE_NAME_PRINT(&owner->name)));
        close(rev->fd);
        rev->fd = -1;
    }

    if (NULL != rev->sink) {
        OBJ_RELEASE(rev->sink);
    }

    /* the owner is released last: its name is still needed by the
     * message above */
    if (NULL != owner) {
        OBJ_RELEASE(owner);
    }
}
/* Class instance for orte_iof_read_event_t: parent class opal_object_t,
 * using the read event constructor/destructor defined in this file. */
OBJ_CLASS_INSTANCE(orte_iof_read_event_t,
                   opal_object_t,
                   orte_iof_base_read_event_construct,
                   orte_iof_base_read_event_destruct);
273
/*
 * Constructor for orte_iof_write_event_t: allocates the underlying
 * event, builds the empty list of outputs, and leaves the write event
 * idle with no descriptor (fd == -1) and a zeroed timeval.
 */
static void orte_iof_base_write_event_construct(orte_iof_write_event_t* wev)
{
    wev->ev = opal_event_alloc();
    OBJ_CONSTRUCT(&wev->outputs, opal_list_t);

    /* no descriptor assigned, nothing pending */
    wev->fd = -1;
    wev->always_writable = false;
    wev->pending = false;

    wev->tv.tv_usec = 0;
    wev->tv.tv_sec = 0;
}
/*
 * Destructor for orte_iof_write_event_t: frees the underlying event,
 * closes the descriptor when this write event is responsible for it,
 * and destructs the list of outputs.
 *
 * The descriptor is left open in two cases: it is one of fds 0-2, or
 * (on the HNP) it is the descriptor behind orte_xml_fp.
 */
static void orte_iof_base_write_event_destruct(orte_iof_write_event_t* wev)
{
    bool is_xml_fd = false;

    opal_event_free(wev->ev);

    /* the XML output file is not closed here - it is dealt with later */
    if (ORTE_PROC_IS_HNP && NULL != orte_xml_fp) {
        is_xml_fd = (fileno(orte_xml_fp) == wev->fd);
    }

    if (!is_xml_fd && 2 < wev->fd) {
        OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
                             "%s iof: closing fd %d for write event",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd));
        close(wev->fd);
    }

    OBJ_DESTRUCT(&wev->outputs);
}
/* Class instance for orte_iof_write_event_t: parent class
 * opal_list_item_t, using the write event constructor/destructor
 * defined in this file. */
OBJ_CLASS_INSTANCE(orte_iof_write_event_t,
                   opal_list_item_t,
                   orte_iof_base_write_event_construct,
                   orte_iof_base_write_event_destruct);
307
/* Class instance for orte_iof_write_output_t: parent class
 * opal_list_item_t, with no constructor or destructor of its own.
 * NOTE(review): presumably these are the items queued on a write
 * event's outputs list -- confirm against the users of that list. */
OBJ_CLASS_INSTANCE(orte_iof_write_output_t,
                   opal_list_item_t,
                   NULL, NULL);
311