1 /*
2  * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
3  *                         University Research and Technology
4  *                         Corporation.  All rights reserved.
5  * Copyright (c) 2004-2006 The University of Tennessee and The University
6  *                         of Tennessee Research Foundation.  All rights
7  *                         reserved.
8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
9  *                         University of Stuttgart.  All rights reserved.
10  * Copyright (c) 2004-2005 The Regents of the University of California.
11  *                         All rights reserved.
12  * Copyright (c) 2008      Cisco Systems, Inc.  All rights reserved.
13  * Copyright (c) 2017      Intel, Inc. All rights reserved.
14  * Copyright (c) 2017      Mellanox Technologies. All rights reserved.
15  * $COPYRIGHT$
16  *
17  * Additional copyrights may follow
18  *
19  * $HEADER$
20  *
21  * These symbols are in a file by themselves to provide nice linker
22  * semantics.  Since linkers generally pull in symbols by object
23  * files, keeping these symbols as the only symbols in this file
24  * prevents utility programs such as "ompi_info" from having to import
25  * entire components just to query their version and parameters.
26  */
27 
28 #include "orte_config.h"
29 #include "orte/constants.h"
30 
31 #include <string.h>
32 #include <stdlib.h>
33 #ifdef HAVE_UNISTD_H
34 #include <unistd.h>
35 #endif
36 #include <time.h>
37 #include <errno.h>
38 
39 #include "opal/util/output.h"
40 
41 #include "orte/util/name_fns.h"
42 #include "orte/util/threads.h"
43 #include "orte/runtime/orte_globals.h"
44 #include "orte/mca/errmgr/errmgr.h"
45 #include "orte/mca/state/state.h"
46 
47 #include "orte/mca/iof/base/base.h"
48 
orte_iof_base_write_output(const orte_process_name_t * name,orte_iof_tag_t stream,const unsigned char * data,int numbytes,orte_iof_write_event_t * channel)49 int orte_iof_base_write_output(const orte_process_name_t *name, orte_iof_tag_t stream,
50                                const unsigned char *data, int numbytes,
51                                orte_iof_write_event_t *channel)
52 {
53     char starttag[ORTE_IOF_BASE_TAG_MAX], endtag[ORTE_IOF_BASE_TAG_MAX], *suffix;
54     orte_iof_write_output_t *output;
55     int i, j, k, starttaglen, endtaglen, num_buffered;
56     bool endtagged;
57     char qprint[10];
58 
59     OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
60                          "%s write:output setting up to write %d bytes to %s for %s on fd %d",
61                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
62                          (ORTE_IOF_STDIN & stream) ? "stdin" : ((ORTE_IOF_STDOUT & stream) ? "stdout" : ((ORTE_IOF_STDERR & stream) ? "stderr" : "stddiag")),
63                          ORTE_NAME_PRINT(name),
64                          (NULL == channel) ? -1 : channel->fd));
65 
66     /* setup output object */
67     output = OBJ_NEW(orte_iof_write_output_t);
68 
69     /* write output data to the corresponding tag */
70     if (ORTE_IOF_STDIN & stream) {
71         /* copy over the data to be written */
72         if (0 < numbytes) {
73             /* don't copy 0 bytes - we just need to pass
74              * the zero bytes so the fd can be closed
75              * after it writes everything out
76              */
77             memcpy(output->data, data, numbytes);
78         }
79         output->numbytes = numbytes;
80         goto process;
81     } else if (ORTE_IOF_STDOUT & stream) {
82         /* write the bytes to stdout */
83         suffix = "stdout";
84     } else if (ORTE_IOF_STDERR & stream) {
85         /* write the bytes to stderr */
86         suffix = "stderr";
87     } else if (ORTE_IOF_STDDIAG & stream) {
88         /* write the bytes to stderr */
89         suffix = "stddiag";
90     } else {
91         /* error - this should never happen */
92         ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS);
93         OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
94                              "%s stream %0x", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), stream));
95         return ORTE_ERR_VALUE_OUT_OF_BOUNDS;
96     }
97 
98     /* if this is to be xml tagged, create a tag with the correct syntax - we do not allow
99      * timestamping of xml output
100      */
101     if (orte_xml_output) {
102         snprintf(starttag, ORTE_IOF_BASE_TAG_MAX, "<%s rank=\"%s\">", suffix, ORTE_VPID_PRINT(name->vpid));
103         snprintf(endtag, ORTE_IOF_BASE_TAG_MAX, "</%s>", suffix);
104         goto construct;
105     }
106 
107     /* if we are to timestamp output, start the tag with that */
108     if (orte_timestamp_output) {
109         time_t mytime;
110         char *cptr;
111         /* get the timestamp */
112         time(&mytime);
113         cptr = ctime(&mytime);
114         cptr[strlen(cptr)-1] = '\0';  /* remove trailing newline */
115 
116         if (orte_tag_output) {
117             /* if we want it tagged as well, use both */
118             snprintf(starttag, ORTE_IOF_BASE_TAG_MAX, "%s[%s,%s]<%s>:",
119                      cptr, ORTE_LOCAL_JOBID_PRINT(name->jobid),
120                      ORTE_VPID_PRINT(name->vpid), suffix);
121         } else {
122             /* only use timestamp */
123             snprintf(starttag, ORTE_IOF_BASE_TAG_MAX, "%s<%s>:", cptr, suffix);
124         }
125         /* no endtag for this option */
126         memset(endtag, '\0', ORTE_IOF_BASE_TAG_MAX);
127         goto construct;
128     }
129 
130     if (orte_tag_output) {
131         snprintf(starttag, ORTE_IOF_BASE_TAG_MAX, "[%s,%s]<%s>:",
132                  ORTE_LOCAL_JOBID_PRINT(name->jobid),
133                  ORTE_VPID_PRINT(name->vpid), suffix);
134         /* no endtag for this option */
135         memset(endtag, '\0', ORTE_IOF_BASE_TAG_MAX);
136         goto construct;
137     }
138 
139     /* if we get here, then the data is not to be tagged - just copy it
140      * and move on to processing
141      */
142     if (0 < numbytes) {
143         /* don't copy 0 bytes - we just need to pass
144          * the zero bytes so the fd can be closed
145          * after it writes everything out
146          */
147         memcpy(output->data, data, numbytes);
148     }
149     output->numbytes = numbytes;
150     goto process;
151 
152   construct:
153     starttaglen = strlen(starttag);
154     endtaglen = strlen(endtag);
155     endtagged = false;
156     /* start with the tag */
157     for (j=0, k=0; j < starttaglen && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
158         output->data[k++] = starttag[j];
159     }
160     /* cycle through the data looking for <cr>
161      * and replace those with the tag
162      */
163     for (i=0; i < numbytes && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; i++) {
164         if (orte_xml_output) {
165             if ('&' == data[i]) {
166                 if (k+5 >= ORTE_IOF_BASE_TAGGED_OUT_MAX) {
167                     ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
168                     goto process;
169                 }
170                 snprintf(qprint, 10, "&amp;");
171                 for (j=0; j < (int)strlen(qprint) && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
172                     output->data[k++] = qprint[j];
173                 }
174             } else if ('<' == data[i]) {
175                 if (k+4 >= ORTE_IOF_BASE_TAGGED_OUT_MAX) {
176                     ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
177                     goto process;
178                 }
179                 snprintf(qprint, 10, "&lt;");
180                 for (j=0; j < (int)strlen(qprint) && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
181                     output->data[k++] = qprint[j];
182                 }
183             } else if ('>' == data[i]) {
184                 if (k+4 >= ORTE_IOF_BASE_TAGGED_OUT_MAX) {
185                     ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
186                     goto process;
187                 }
188                 snprintf(qprint, 10, "&gt;");
189                 for (j=0; j < (int)strlen(qprint) && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
190                     output->data[k++] = qprint[j];
191                 }
192             } else if (data[i] < 32 || data[i] > 127) {
193                 /* this is a non-printable character, so escape it too */
194                 if (k+7 >= ORTE_IOF_BASE_TAGGED_OUT_MAX) {
195                     ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
196                     goto process;
197                 }
198                 snprintf(qprint, 10, "&#%03d;", (int)data[i]);
199                 for (j=0; j < (int)strlen(qprint) && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
200                     output->data[k++] = qprint[j];
201                 }
202                 /* if this was a \n, then we also need to break the line with the end tag */
203                 if ('\n' == data[i] && (k+endtaglen+1) < ORTE_IOF_BASE_TAGGED_OUT_MAX) {
204                     /* we need to break the line with the end tag */
205                     for (j=0; j < endtaglen && k < ORTE_IOF_BASE_TAGGED_OUT_MAX-1; j++) {
206                         output->data[k++] = endtag[j];
207                     }
208                     /* move the <cr> over */
209                     output->data[k++] = '\n';
210                     /* if this isn't the end of the data buffer, add a new start tag */
211                     if (i < numbytes-1 && (k+starttaglen) < ORTE_IOF_BASE_TAGGED_OUT_MAX) {
212                         for (j=0; j < starttaglen && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
213                             output->data[k++] = starttag[j];
214                             endtagged = false;
215                         }
216                     } else {
217                         endtagged = true;
218                     }
219                 }
220             } else {
221                 output->data[k++] = data[i];
222             }
223         } else {
224             if ('\n' == data[i]) {
225                 /* we need to break the line with the end tag */
226                 for (j=0; j < endtaglen && k < ORTE_IOF_BASE_TAGGED_OUT_MAX-1; j++) {
227                     output->data[k++] = endtag[j];
228                 }
229                 /* move the <cr> over */
230                 output->data[k++] = '\n';
231                 /* if this isn't the end of the data buffer, add a new start tag */
232                 if (i < numbytes-1) {
233                     for (j=0; j < starttaglen && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
234                         output->data[k++] = starttag[j];
235                         endtagged = false;
236                     }
237                 } else {
238                     endtagged = true;
239                 }
240             } else {
241                 output->data[k++] = data[i];
242             }
243         }
244     }
245     if (!endtagged && k < ORTE_IOF_BASE_TAGGED_OUT_MAX) {
246         /* need to add an endtag */
247         for (j=0; j < endtaglen && k < ORTE_IOF_BASE_TAGGED_OUT_MAX-1; j++) {
248             output->data[k++] = endtag[j];
249         }
250         output->data[k] = '\n';
251     }
252     output->numbytes = k;
253 
254   process:
255     /* add this data to the write list for this fd */
256     opal_list_append(&channel->outputs, &output->super);
257 
258     /* record how big the buffer is */
259     num_buffered = opal_list_get_size(&channel->outputs);
260 
261     /* is the write event issued? */
262     if (!channel->pending) {
263         /* issue it */
264         OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
265                              "%s write:output adding write event",
266                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
267         ORTE_IOF_SINK_ACTIVATE(channel);
268     }
269 
270     return num_buffered;
271 }
272 
/*
 * Drain the output list of the write event attached to rev's sink.
 *
 * Each queued item gets a single write() attempt, in order.  As soon as
 * one write() returns fewer bytes than the item holds, no further writes
 * are attempted.  Every item is removed from the list and released
 * regardless.  Nothing is done when rev has no sink, the sink has no
 * write event, or the list is already empty.
 */
void orte_iof_base_static_dump_output(orte_iof_read_event_t *rev)
{
    orte_iof_write_event_t *wev;
    orte_iof_write_output_t *pending;
    bool discard = false;
    int rc;

    if (NULL == rev->sink) {
        return;
    }
    wev = rev->sink->wev;
    if (NULL == wev || opal_list_is_empty(&wev->outputs)) {
        return;
    }

    /* make one last attempt to write this out */
    for (pending = (orte_iof_write_output_t*)opal_list_remove_first(&wev->outputs);
         NULL != pending;
         pending = (orte_iof_write_output_t*)opal_list_remove_first(&wev->outputs)) {
        if (!discard) {
            rc = write(wev->fd, pending->data, pending->numbytes);
            if (rc < pending->numbytes) {
                /* short or failed write: don't retry, just clean
                 * out whatever is left on the list
                 */
                discard = true;
            }
        }
        OBJ_RELEASE(pending);
    }
}
298 
orte_iof_base_write_handler(int _fd,short event,void * cbdata)299 void orte_iof_base_write_handler(int _fd, short event, void *cbdata)
300 {
301     orte_iof_sink_t *sink = (orte_iof_sink_t*)cbdata;
302     orte_iof_write_event_t *wev = sink->wev;
303     opal_list_item_t *item;
304     orte_iof_write_output_t *output;
305     int num_written, total_written = 0;
306 
307     ORTE_ACQUIRE_OBJECT(sink);
308 
309     OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
310                          "%s write:handler writing data to %d",
311                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
312                          wev->fd));
313 
314     while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
315         output = (orte_iof_write_output_t*)item;
316         if (0 == output->numbytes) {
317             /* indicates we are to close this stream */
318             OBJ_RELEASE(sink);
319             return;
320         }
321         num_written = write(wev->fd, output->data, output->numbytes);
322         if (num_written < 0) {
323             if (EAGAIN == errno || EINTR == errno) {
324                 /* push this item back on the front of the list */
325                 opal_list_prepend(&wev->outputs, item);
326                 /* if the list is getting too large, abort */
327                 if (orte_iof_base.output_limit < opal_list_get_size(&wev->outputs)) {
328                     opal_output(0, "IO Forwarding is running too far behind - something is blocking us from writing");
329                     ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
330                     goto ABORT;
331                 }
332                 /* leave the write event running so it will call us again
333                  * when the fd is ready.
334                  */
335                 goto NEXT_CALL;
336             }
337             /* otherwise, something bad happened so all we can do is abort
338              * this attempt
339              */
340             OBJ_RELEASE(output);
341             goto ABORT;
342         } else if (num_written < output->numbytes) {
343             /* incomplete write - adjust data to avoid duplicate output */
344             memmove(output->data, &output->data[num_written], output->numbytes - num_written);
345             /* adjust the number of bytes remaining to be written */
346             output->numbytes -= num_written;
347             /* push this item back on the front of the list */
348             opal_list_prepend(&wev->outputs, item);
349             /* if the list is getting too large, abort */
350             if (orte_iof_base.output_limit < opal_list_get_size(&wev->outputs)) {
351                 opal_output(0, "IO Forwarding is running too far behind - something is blocking us from writing");
352                 ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
353                 goto ABORT;
354             }
355             /* leave the write event running so it will call us again
356              * when the fd is ready
357              */
358             goto NEXT_CALL;
359         }
360         OBJ_RELEASE(output);
361 
362         total_written += num_written;
363         if(wev->always_writable && (ORTE_IOF_SINK_BLOCKSIZE <= total_written)){
364             /* If this is a regular file it will never tell us it will block
365              * Write no more than ORTE_IOF_REGULARF_BLOCK at a time allowing
366              * other fds to progress
367              */
368             goto NEXT_CALL;
369         }
370     }
371   ABORT:
372     wev->pending = false;
373     ORTE_POST_OBJECT(wev);
374     return;
375 NEXT_CALL:
376     ORTE_IOF_SINK_ACTIVATE(wev);
377 }
378