1 /*
2 * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
3 * University Research and Technology
4 * Corporation. All rights reserved.
5 * Copyright (c) 2004-2006 The University of Tennessee and The University
6 * of Tennessee Research Foundation. All rights
7 * reserved.
8 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
9 * University of Stuttgart. All rights reserved.
10 * Copyright (c) 2004-2005 The Regents of the University of California.
11 * All rights reserved.
12 * Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
13 * Copyright (c) 2017 Intel, Inc. All rights reserved.
14 * Copyright (c) 2017 Mellanox Technologies. All rights reserved.
15 * $COPYRIGHT$
16 *
17 * Additional copyrights may follow
18 *
19 * $HEADER$
20 *
21 * These symbols are in a file by themselves to provide nice linker
22 * semantics. Since linkers generally pull in symbols by object
23 * files, keeping these symbols as the only symbols in this file
24 * prevents utility programs such as "ompi_info" from having to import
25 * entire components just to query their version and parameters.
26 */
27
28 #include "orte_config.h"
29 #include "orte/constants.h"
30
31 #include <string.h>
32 #include <stdlib.h>
33 #ifdef HAVE_UNISTD_H
34 #include <unistd.h>
35 #endif
36 #include <time.h>
37 #include <errno.h>
38
39 #include "opal/util/output.h"
40
41 #include "orte/util/name_fns.h"
42 #include "orte/util/threads.h"
43 #include "orte/runtime/orte_globals.h"
44 #include "orte/mca/errmgr/errmgr.h"
45 #include "orte/mca/state/state.h"
46
47 #include "orte/mca/iof/base/base.h"
48
orte_iof_base_write_output(const orte_process_name_t * name,orte_iof_tag_t stream,const unsigned char * data,int numbytes,orte_iof_write_event_t * channel)49 int orte_iof_base_write_output(const orte_process_name_t *name, orte_iof_tag_t stream,
50 const unsigned char *data, int numbytes,
51 orte_iof_write_event_t *channel)
52 {
53 char starttag[ORTE_IOF_BASE_TAG_MAX], endtag[ORTE_IOF_BASE_TAG_MAX], *suffix;
54 orte_iof_write_output_t *output;
55 int i, j, k, starttaglen, endtaglen, num_buffered;
56 bool endtagged;
57 char qprint[10];
58
59 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
60 "%s write:output setting up to write %d bytes to %s for %s on fd %d",
61 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
62 (ORTE_IOF_STDIN & stream) ? "stdin" : ((ORTE_IOF_STDOUT & stream) ? "stdout" : ((ORTE_IOF_STDERR & stream) ? "stderr" : "stddiag")),
63 ORTE_NAME_PRINT(name),
64 (NULL == channel) ? -1 : channel->fd));
65
66 /* setup output object */
67 output = OBJ_NEW(orte_iof_write_output_t);
68
69 /* write output data to the corresponding tag */
70 if (ORTE_IOF_STDIN & stream) {
71 /* copy over the data to be written */
72 if (0 < numbytes) {
73 /* don't copy 0 bytes - we just need to pass
74 * the zero bytes so the fd can be closed
75 * after it writes everything out
76 */
77 memcpy(output->data, data, numbytes);
78 }
79 output->numbytes = numbytes;
80 goto process;
81 } else if (ORTE_IOF_STDOUT & stream) {
82 /* write the bytes to stdout */
83 suffix = "stdout";
84 } else if (ORTE_IOF_STDERR & stream) {
85 /* write the bytes to stderr */
86 suffix = "stderr";
87 } else if (ORTE_IOF_STDDIAG & stream) {
88 /* write the bytes to stderr */
89 suffix = "stddiag";
90 } else {
91 /* error - this should never happen */
92 ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS);
93 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
94 "%s stream %0x", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), stream));
95 return ORTE_ERR_VALUE_OUT_OF_BOUNDS;
96 }
97
98 /* if this is to be xml tagged, create a tag with the correct syntax - we do not allow
99 * timestamping of xml output
100 */
101 if (orte_xml_output) {
102 snprintf(starttag, ORTE_IOF_BASE_TAG_MAX, "<%s rank=\"%s\">", suffix, ORTE_VPID_PRINT(name->vpid));
103 snprintf(endtag, ORTE_IOF_BASE_TAG_MAX, "</%s>", suffix);
104 goto construct;
105 }
106
107 /* if we are to timestamp output, start the tag with that */
108 if (orte_timestamp_output) {
109 time_t mytime;
110 char *cptr;
111 /* get the timestamp */
112 time(&mytime);
113 cptr = ctime(&mytime);
114 cptr[strlen(cptr)-1] = '\0'; /* remove trailing newline */
115
116 if (orte_tag_output) {
117 /* if we want it tagged as well, use both */
118 snprintf(starttag, ORTE_IOF_BASE_TAG_MAX, "%s[%s,%s]<%s>:",
119 cptr, ORTE_LOCAL_JOBID_PRINT(name->jobid),
120 ORTE_VPID_PRINT(name->vpid), suffix);
121 } else {
122 /* only use timestamp */
123 snprintf(starttag, ORTE_IOF_BASE_TAG_MAX, "%s<%s>:", cptr, suffix);
124 }
125 /* no endtag for this option */
126 memset(endtag, '\0', ORTE_IOF_BASE_TAG_MAX);
127 goto construct;
128 }
129
130 if (orte_tag_output) {
131 snprintf(starttag, ORTE_IOF_BASE_TAG_MAX, "[%s,%s]<%s>:",
132 ORTE_LOCAL_JOBID_PRINT(name->jobid),
133 ORTE_VPID_PRINT(name->vpid), suffix);
134 /* no endtag for this option */
135 memset(endtag, '\0', ORTE_IOF_BASE_TAG_MAX);
136 goto construct;
137 }
138
139 /* if we get here, then the data is not to be tagged - just copy it
140 * and move on to processing
141 */
142 if (0 < numbytes) {
143 /* don't copy 0 bytes - we just need to pass
144 * the zero bytes so the fd can be closed
145 * after it writes everything out
146 */
147 memcpy(output->data, data, numbytes);
148 }
149 output->numbytes = numbytes;
150 goto process;
151
152 construct:
153 starttaglen = strlen(starttag);
154 endtaglen = strlen(endtag);
155 endtagged = false;
156 /* start with the tag */
157 for (j=0, k=0; j < starttaglen && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
158 output->data[k++] = starttag[j];
159 }
160 /* cycle through the data looking for <cr>
161 * and replace those with the tag
162 */
163 for (i=0; i < numbytes && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; i++) {
164 if (orte_xml_output) {
165 if ('&' == data[i]) {
166 if (k+5 >= ORTE_IOF_BASE_TAGGED_OUT_MAX) {
167 ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
168 goto process;
169 }
170 snprintf(qprint, 10, "&");
171 for (j=0; j < (int)strlen(qprint) && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
172 output->data[k++] = qprint[j];
173 }
174 } else if ('<' == data[i]) {
175 if (k+4 >= ORTE_IOF_BASE_TAGGED_OUT_MAX) {
176 ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
177 goto process;
178 }
179 snprintf(qprint, 10, "<");
180 for (j=0; j < (int)strlen(qprint) && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
181 output->data[k++] = qprint[j];
182 }
183 } else if ('>' == data[i]) {
184 if (k+4 >= ORTE_IOF_BASE_TAGGED_OUT_MAX) {
185 ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
186 goto process;
187 }
188 snprintf(qprint, 10, ">");
189 for (j=0; j < (int)strlen(qprint) && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
190 output->data[k++] = qprint[j];
191 }
192 } else if (data[i] < 32 || data[i] > 127) {
193 /* this is a non-printable character, so escape it too */
194 if (k+7 >= ORTE_IOF_BASE_TAGGED_OUT_MAX) {
195 ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
196 goto process;
197 }
198 snprintf(qprint, 10, "&#%03d;", (int)data[i]);
199 for (j=0; j < (int)strlen(qprint) && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
200 output->data[k++] = qprint[j];
201 }
202 /* if this was a \n, then we also need to break the line with the end tag */
203 if ('\n' == data[i] && (k+endtaglen+1) < ORTE_IOF_BASE_TAGGED_OUT_MAX) {
204 /* we need to break the line with the end tag */
205 for (j=0; j < endtaglen && k < ORTE_IOF_BASE_TAGGED_OUT_MAX-1; j++) {
206 output->data[k++] = endtag[j];
207 }
208 /* move the <cr> over */
209 output->data[k++] = '\n';
210 /* if this isn't the end of the data buffer, add a new start tag */
211 if (i < numbytes-1 && (k+starttaglen) < ORTE_IOF_BASE_TAGGED_OUT_MAX) {
212 for (j=0; j < starttaglen && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
213 output->data[k++] = starttag[j];
214 endtagged = false;
215 }
216 } else {
217 endtagged = true;
218 }
219 }
220 } else {
221 output->data[k++] = data[i];
222 }
223 } else {
224 if ('\n' == data[i]) {
225 /* we need to break the line with the end tag */
226 for (j=0; j < endtaglen && k < ORTE_IOF_BASE_TAGGED_OUT_MAX-1; j++) {
227 output->data[k++] = endtag[j];
228 }
229 /* move the <cr> over */
230 output->data[k++] = '\n';
231 /* if this isn't the end of the data buffer, add a new start tag */
232 if (i < numbytes-1) {
233 for (j=0; j < starttaglen && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
234 output->data[k++] = starttag[j];
235 endtagged = false;
236 }
237 } else {
238 endtagged = true;
239 }
240 } else {
241 output->data[k++] = data[i];
242 }
243 }
244 }
245 if (!endtagged && k < ORTE_IOF_BASE_TAGGED_OUT_MAX) {
246 /* need to add an endtag */
247 for (j=0; j < endtaglen && k < ORTE_IOF_BASE_TAGGED_OUT_MAX-1; j++) {
248 output->data[k++] = endtag[j];
249 }
250 output->data[k] = '\n';
251 }
252 output->numbytes = k;
253
254 process:
255 /* add this data to the write list for this fd */
256 opal_list_append(&channel->outputs, &output->super);
257
258 /* record how big the buffer is */
259 num_buffered = opal_list_get_size(&channel->outputs);
260
261 /* is the write event issued? */
262 if (!channel->pending) {
263 /* issue it */
264 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
265 "%s write:output adding write event",
266 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
267 ORTE_IOF_SINK_ACTIVATE(channel);
268 }
269
270 return num_buffered;
271 }
272
/*
 * Make a final, best-effort attempt to flush everything still queued
 * on the write event behind rev's sink, then empty the queue.
 *
 * Entries are written in list order.  As soon as one write() returns
 * fewer bytes than requested (including an error return), no further
 * writes are attempted; the remaining entries are simply released.
 * Nothing happens if rev has no sink, the sink has no write event, or
 * the queue is already empty.
 *
 * @param rev  read event whose sink should be drained
 */
void orte_iof_base_static_dump_output(orte_iof_read_event_t *rev)
{
    orte_iof_write_event_t *wev;
    orte_iof_write_output_t *entry;
    bool giving_up = false;
    int rc;

    if (NULL == rev->sink) {
        return;
    }
    wev = rev->sink->wev;
    if (NULL == wev || opal_list_is_empty(&wev->outputs)) {
        return;
    }

    /* drain the list, writing for as long as writes fully succeed */
    for (entry = (orte_iof_write_output_t*)opal_list_remove_first(&wev->outputs);
         NULL != entry;
         entry = (orte_iof_write_output_t*)opal_list_remove_first(&wev->outputs)) {
        if (!giving_up) {
            rc = write(wev->fd, entry->data, entry->numbytes);
            /* a short or failed write is not retried - from here on
             * the list is only cleaned out
             */
            giving_up = (rc < entry->numbytes);
        }
        OBJ_RELEASE(entry);
    }
}
298
orte_iof_base_write_handler(int _fd,short event,void * cbdata)299 void orte_iof_base_write_handler(int _fd, short event, void *cbdata)
300 {
301 orte_iof_sink_t *sink = (orte_iof_sink_t*)cbdata;
302 orte_iof_write_event_t *wev = sink->wev;
303 opal_list_item_t *item;
304 orte_iof_write_output_t *output;
305 int num_written, total_written = 0;
306
307 ORTE_ACQUIRE_OBJECT(sink);
308
309 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
310 "%s write:handler writing data to %d",
311 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
312 wev->fd));
313
314 while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
315 output = (orte_iof_write_output_t*)item;
316 if (0 == output->numbytes) {
317 /* indicates we are to close this stream */
318 OBJ_RELEASE(sink);
319 return;
320 }
321 num_written = write(wev->fd, output->data, output->numbytes);
322 if (num_written < 0) {
323 if (EAGAIN == errno || EINTR == errno) {
324 /* push this item back on the front of the list */
325 opal_list_prepend(&wev->outputs, item);
326 /* if the list is getting too large, abort */
327 if (orte_iof_base.output_limit < opal_list_get_size(&wev->outputs)) {
328 opal_output(0, "IO Forwarding is running too far behind - something is blocking us from writing");
329 ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
330 goto ABORT;
331 }
332 /* leave the write event running so it will call us again
333 * when the fd is ready.
334 */
335 goto NEXT_CALL;
336 }
337 /* otherwise, something bad happened so all we can do is abort
338 * this attempt
339 */
340 OBJ_RELEASE(output);
341 goto ABORT;
342 } else if (num_written < output->numbytes) {
343 /* incomplete write - adjust data to avoid duplicate output */
344 memmove(output->data, &output->data[num_written], output->numbytes - num_written);
345 /* adjust the number of bytes remaining to be written */
346 output->numbytes -= num_written;
347 /* push this item back on the front of the list */
348 opal_list_prepend(&wev->outputs, item);
349 /* if the list is getting too large, abort */
350 if (orte_iof_base.output_limit < opal_list_get_size(&wev->outputs)) {
351 opal_output(0, "IO Forwarding is running too far behind - something is blocking us from writing");
352 ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
353 goto ABORT;
354 }
355 /* leave the write event running so it will call us again
356 * when the fd is ready
357 */
358 goto NEXT_CALL;
359 }
360 OBJ_RELEASE(output);
361
362 total_written += num_written;
363 if(wev->always_writable && (ORTE_IOF_SINK_BLOCKSIZE <= total_written)){
364 /* If this is a regular file it will never tell us it will block
365 * Write no more than ORTE_IOF_REGULARF_BLOCK at a time allowing
366 * other fds to progress
367 */
368 goto NEXT_CALL;
369 }
370 }
371 ABORT:
372 wev->pending = false;
373 ORTE_POST_OBJECT(wev);
374 return;
375 NEXT_CALL:
376 ORTE_IOF_SINK_ACTIVATE(wev);
377 }
378