1 /*
2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
3  *                         University Research and Technology
4  *                         Corporation.  All rights reserved.
5  * Copyright (c) 2004-2011 The University of Tennessee and The University
6  *                         of Tennessee Research Foundation.  All rights
7  *                         reserved.
8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
9  *                         University of Stuttgart.  All rights reserved.
10  * Copyright (c) 2004-2005 The Regents of the University of California.
11  *                         All rights reserved.
12  * Copyright (c) 2008      Cisco Systems, Inc.  All rights reserved.
13  * Copyright (c) 2012-2013 Los Alamos National Security, LLC.
14  *                         All rights reserved.
15  * Copyright (c) 2015-2020 Intel, Inc.  All rights reserved.
16  * Copyright (c) 2017      IBM Corporation.  All rights reserved.
17  * Copyright (c) 2017      Mellanox Technologies. All rights reserved.
18  * Copyright (c) 2018      Research Organization for Information Science
19  *                         and Technology (RIST).  All rights reserved.
20  * $COPYRIGHT$
21  *
22  * Additional copyrights may follow
23  *
24  * $HEADER$
25  */
26 /**
27  * @file
28  *
29  * I/O Forwarding Service
30  */
31 
32 #ifndef PMIX_IOF_H
33 #define PMIX_IOF_H
34 
35 #include "src/include/pmix_config.h"
36 
37 #ifdef HAVE_SYS_TYPES_H
38 #include <sys/types.h>
39 #endif
40 #ifdef HAVE_SYS_UIO_H
41 #include <sys/uio.h>
42 #endif
43 #ifdef HAVE_NET_UIO_H
44 #include <net/uio.h>
45 #endif
46 #ifdef HAVE_UNISTD_H
47 #include <unistd.h>
48 #endif
49 #include <signal.h>
50 
51 #include "src/class/pmix_list.h"
52 #include "src/include/pmix_globals.h"
53 #include "src/util/fd.h"
54 
55 BEGIN_C_DECLS
56 
57 /*
58  * Maximum size of single msg
59  */
60 #define PMIX_IOF_BASE_MSG_MAX           4096
61 #define PMIX_IOF_BASE_TAG_MAX             50
62 #define PMIX_IOF_BASE_TAGGED_OUT_MAX    8192
63 #define PMIX_IOF_MAX_INPUT_BUFFERS        50
64 
65 typedef struct {
66     pmix_list_item_t super;
67     bool pending;
68     bool always_writable;
69     pmix_event_t ev;
70     struct timeval tv;
71     int fd;
72     pmix_list_t outputs;
73 } pmix_iof_write_event_t;
74 PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_iof_write_event_t);
75 
76 typedef struct {
77     pmix_list_item_t super;
78     pmix_proc_t name;
79     pmix_iof_channel_t tag;
80     pmix_iof_write_event_t wev;
81     bool xoff;
82     bool exclusive;
83     bool closed;
84 } pmix_iof_sink_t;
85 PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_iof_sink_t);
86 
87 typedef struct {
88     pmix_list_item_t super;
89     char data[PMIX_IOF_BASE_TAGGED_OUT_MAX];
90     int numbytes;
91 } pmix_iof_write_output_t;
92 PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_iof_write_output_t);
93 
94 typedef struct {
95     pmix_object_t super;
96     pmix_event_t ev;
97     struct timeval tv;
98     int fd;
99     bool active;
100     void *childproc;
101     bool always_readable;
102     pmix_proc_t name;
103     pmix_iof_channel_t channel;
104     pmix_proc_t *targets;
105     size_t ntargets;
106     pmix_info_t *directives;
107     size_t ndirs;
108 } pmix_iof_read_event_t;
109 PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_iof_read_event_t);
110 
111 
112 /* define a struct to hold booleans controlling the
113  * format/contents of the output */
114 typedef struct {
115     bool xml;
116     time_t timestamp;
117     bool tag;
118 } pmix_iof_flags_t;
119 
120 
121 /* Write event macro's */
122 
123 static inline bool
pmix_iof_fd_always_ready(int fd)124 pmix_iof_fd_always_ready(int fd)
125 {
126     return pmix_fd_is_regular(fd) ||
127            (pmix_fd_is_chardev(fd) && !isatty(fd)) ||
128            pmix_fd_is_blkdev(fd);
129 }
130 
131 #define PMIX_IOF_SINK_BLOCKSIZE (1024)
132 
133 #define PMIX_IOF_SINK_ACTIVATE(wev)                                     \
134     do {                                                                \
135         struct timeval *tv = NULL;                                      \
136         wev->pending = true;                                            \
137         PMIX_POST_OBJECT(wev);                                          \
138         if (wev->always_writable) {                                     \
139             /* Regular is always write ready. Use timer to activate */  \
140             tv = &wev->tv;                                              \
141         }                                                               \
142         if (pmix_event_add(&wev->ev, tv)) {                             \
143             PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);                         \
144         }                                                               \
145     } while(0);
146 
147 
148 /* define an output "sink", adding it to the provided
149  * endpoint list for this proc */
150 #define PMIX_IOF_SINK_DEFINE(snk, nm, fid, tg, wrthndlr)                    \
151     do {                                                                    \
152         PMIX_OUTPUT_VERBOSE((1, pmix_client_globals.iof_output,             \
153                             "defining endpt: file %s line %d fd %d",        \
154                             __FILE__, __LINE__, (fid)));                    \
155         PMIX_CONSTRUCT((snk), pmix_iof_sink_t);                             \
156         pmix_strncpy((snk)->name.nspace, (nm)->nspace, PMIX_MAX_NSLEN);    \
157         (snk)->name.rank = (nm)->rank;                                      \
158         (snk)->tag = (tg);                                                  \
159         if (0 <= (fid)) {                                                   \
160             (snk)->wev.fd = (fid);                                          \
161             (snk)->wev.always_writable =                                    \
162                     pmix_iof_fd_always_ready(fid);                          \
163             if ((snk)->wev.always_writable) {                               \
164                 pmix_event_evtimer_set(pmix_globals.evbase,                 \
165                                        &(snk)->wev.ev,  wrthndlr, (snk));   \
166             } else {                                                        \
167                 pmix_event_set(pmix_globals.evbase,                         \
168                                &(snk)->wev.ev, (snk)->wev.fd,               \
169                                PMIX_EV_WRITE,                               \
170                                wrthndlr, (snk));                            \
171             }                                                               \
172         }                                                                   \
173         PMIX_POST_OBJECT(snk);                                              \
174     } while(0);
175 
176 /* Read event macro's */
177 #define PMIX_IOF_READ_ADDEV(rev)                                \
178     do {                                                        \
179         struct timeval *tv = NULL;                              \
180         if ((rev)->always_readable) {                           \
181             tv = &(rev)->tv;                                    \
182         }                                                       \
183         if (pmix_event_add(&(rev)->ev, tv)) {                   \
184             PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);                 \
185         }                                                       \
186     } while(0);
187 
188 #define PMIX_IOF_READ_ACTIVATE(rev)                             \
189     do {                                                        \
190         (rev)->active = true;                                   \
191         PMIX_POST_OBJECT(rev);                                  \
192         PMIX_IOF_READ_ADDEV(rev);                               \
193     } while(0);
194 
195 
196 #define PMIX_IOF_READ_EVENT(rv, p, np, d, nd, fid, cbfunc, actv)        \
197     do {                                                                \
198         size_t _ii;                                                     \
199         pmix_iof_read_event_t *rev;                                     \
200         PMIX_OUTPUT_VERBOSE((1, pmix_client_globals.iof_output,         \
201                             "defining read event at: %s %d",            \
202                             __FILE__, __LINE__));                       \
203         rev = PMIX_NEW(pmix_iof_read_event_t);                          \
204         if (NULL != (p)) {                                              \
205             (rev)->ntargets = (np);                                     \
206             PMIX_PROC_CREATE((rev)->targets, (rev)->ntargets);          \
207             memcpy((rev)->targets, (p), (np) * sizeof(pmix_proc_t));    \
208         }                                                               \
209         if (NULL != (d) && 0 < (nd)) {                                  \
210             PMIX_INFO_CREATE((rev)->directives, (nd));                  \
211             (rev)->ndirs = (nd);                                        \
212             for (_ii=0; _ii < (size_t)nd; _ii++) {                      \
213                 PMIX_INFO_XFER(&((rev)->directives[_ii]), &((d)[_ii])); \
214             }                                                           \
215         }                                                               \
216         rev->fd = (fid);                                                \
217         rev->always_readable = pmix_iof_fd_always_ready(fid);           \
218         *(rv) = rev;                                                    \
219         if(rev->always_readable) {                                      \
220             pmix_event_evtimer_set(pmix_globals.evbase,                 \
221                                    &rev->ev, (cbfunc), rev);            \
222         } else {                                                        \
223             pmix_event_set(pmix_globals.evbase,                         \
224                            &rev->ev, (fid),                             \
225                            PMIX_EV_READ,                                \
226                            (cbfunc), rev);                              \
227         }                                                               \
228         if ((actv)) {                                                   \
229             PMIX_IOF_READ_ACTIVATE(rev)                                 \
230         }                                                               \
231     } while(0);
232 
233 
234 PMIX_EXPORT pmix_status_t pmix_iof_flush(void);
235 
236 PMIX_EXPORT pmix_status_t pmix_iof_write_output(const pmix_proc_t *name,
237                                                 pmix_iof_channel_t stream,
238                                                 const pmix_byte_object_t *bo,
239                                                 pmix_iof_flags_t *flags);
240 PMIX_EXPORT void pmix_iof_static_dump_output(pmix_iof_sink_t *sink);
241 PMIX_EXPORT void pmix_iof_write_handler(int fd, short event, void *cbdata);
242 PMIX_EXPORT bool pmix_iof_stdin_check(int fd);
243 PMIX_EXPORT void pmix_iof_read_local_handler(int unusedfd, short event, void *cbdata);
244 PMIX_EXPORT void pmix_iof_stdin_cb(int fd, short event, void *cbdata);
245 PMIX_EXPORT pmix_status_t pmix_iof_process_iof(pmix_iof_channel_t channels,
246                                                const pmix_proc_t *source,
247                                                const pmix_byte_object_t *bo,
248                                                const pmix_info_t *info, size_t ninfo,
249                                                const pmix_iof_req_t *req);
250 
251 END_C_DECLS
252 
253 #endif /* PMIX_IOF_H */
254