1 /*
2 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
3 * University Research and Technology
4 * Corporation. All rights reserved.
5 * Copyright (c) 2004-2011 The University of Tennessee and The University
6 * of Tennessee Research Foundation. All rights
7 * reserved.
8 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
9 * University of Stuttgart. All rights reserved.
10 * Copyright (c) 2004-2005 The Regents of the University of California.
11 * All rights reserved.
12 * Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
13 * Copyright (c) 2012-2013 Los Alamos National Security, LLC.
14 * All rights reserved.
15 * Copyright (c) 2015-2020 Intel, Inc. All rights reserved.
16 * Copyright (c) 2017 IBM Corporation. All rights reserved.
17 * Copyright (c) 2017 Mellanox Technologies. All rights reserved.
18 * Copyright (c) 2018 Research Organization for Information Science
19 * and Technology (RIST). All rights reserved.
20 * $COPYRIGHT$
21 *
22 * Additional copyrights may follow
23 *
24 * $HEADER$
25 */
26 /**
27 * @file
28 *
29 * I/O Forwarding Service
30 */
31
32 #ifndef PMIX_IOF_H
33 #define PMIX_IOF_H
34
35 #include "src/include/pmix_config.h"
36
37 #ifdef HAVE_SYS_TYPES_H
38 #include <sys/types.h>
39 #endif
40 #ifdef HAVE_SYS_UIO_H
41 #include <sys/uio.h>
42 #endif
43 #ifdef HAVE_NET_UIO_H
44 #include <net/uio.h>
45 #endif
46 #ifdef HAVE_UNISTD_H
47 #include <unistd.h>
48 #endif
49 #include <signal.h>
50
51 #include "src/class/pmix_list.h"
52 #include "src/include/pmix_globals.h"
53 #include "src/util/fd.h"
54
55 BEGIN_C_DECLS
56
/*
 * Size limits used by the I/O forwarding code
 */
/* Maximum size of single msg */
#define PMIX_IOF_BASE_MSG_MAX 4096
/* NOTE(review): not referenced in this header; presumably the maximum
 * length of a tag prepended to forwarded output - confirm at use sites */
#define PMIX_IOF_BASE_TAG_MAX 50
/* Capacity of the data[] buffer in pmix_iof_write_output_t */
#define PMIX_IOF_BASE_TAGGED_OUT_MAX 8192
/* NOTE(review): not referenced in this header; presumably a cap on the
 * number of queued input buffers - confirm at use sites */
#define PMIX_IOF_MAX_INPUT_BUFFERS 50
64
/*
 * Write-side event state for one output file descriptor.
 * Embedded in pmix_iof_sink_t as its "wev" member.
 */
typedef struct {
    pmix_list_item_t super;     /* list-item base class */
    bool pending;               /* set to true by PMIX_IOF_SINK_ACTIVATE before the event is added */
    bool always_writable;       /* result of pmix_iof_fd_always_ready() on fd (set by PMIX_IOF_SINK_DEFINE) */
    pmix_event_t ev;            /* PMIX_EV_WRITE event on fd, or a timer event when always_writable is true */
    struct timeval tv;          /* timeout handed to pmix_event_add() when always_writable is true */
    int fd;                     /* descriptor the event is registered for */
    pmix_list_t outputs;        /* NOTE(review): presumably queued pmix_iof_write_output_t items - confirm */
} pmix_iof_write_event_t;
PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_iof_write_event_t);
75
/*
 * An output "sink": a destination for forwarded output, initialized in
 * place by PMIX_IOF_SINK_DEFINE.
 */
typedef struct {
    pmix_list_item_t super;         /* list-item base class */
    pmix_proc_t name;               /* nspace/rank copied from the proc given to PMIX_IOF_SINK_DEFINE */
    pmix_iof_channel_t tag;         /* channel value given to PMIX_IOF_SINK_DEFINE */
    pmix_iof_write_event_t wev;     /* write event state for the sink's descriptor */
    bool xoff;                      /* NOTE(review): not touched in this header; presumably a flow-control flag - confirm */
    bool exclusive;                 /* NOTE(review): not touched in this header; meaning to be confirmed at use sites */
    bool closed;                    /* NOTE(review): not touched in this header; presumably set once the sink is closed - confirm */
} pmix_iof_sink_t;
PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_iof_sink_t);
86
/*
 * One buffered chunk of output, kept as a list item.
 */
typedef struct {
    pmix_list_item_t super;                     /* list-item base class */
    char data[PMIX_IOF_BASE_TAGGED_OUT_MAX];    /* fixed-size payload buffer */
    int numbytes;                               /* NOTE(review): presumably the count of valid bytes in data - confirm */
} pmix_iof_write_output_t;
PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_iof_write_output_t);
93
/*
 * State for one read-side IOF event. Instances are allocated and filled
 * in by PMIX_IOF_READ_EVENT.
 */
typedef struct {
    pmix_object_t super;            /* object base class */
    pmix_event_t ev;                /* PMIX_EV_READ event on fd, or a timer event when always_readable is true */
    struct timeval tv;              /* timeout handed to pmix_event_add() when always_readable is true */
    int fd;                         /* descriptor given to PMIX_IOF_READ_EVENT */
    bool active;                    /* set to true by PMIX_IOF_READ_ACTIVATE before the event is added */
    void *childproc;                /* NOTE(review): not touched in this header; owner/meaning to be confirmed */
    bool always_readable;           /* result of pmix_iof_fd_always_ready() on fd */
    pmix_proc_t name;               /* NOTE(review): not set by the macros in this header - confirm at use sites */
    pmix_iof_channel_t channel;     /* NOTE(review): not set by the macros in this header - confirm at use sites */
    pmix_proc_t *targets;           /* copy of the proc array passed to PMIX_IOF_READ_EVENT */
    size_t ntargets;                /* number of entries in targets */
    pmix_info_t *directives;        /* copies of the directives passed to PMIX_IOF_READ_EVENT */
    size_t ndirs;                   /* number of entries in directives */
} pmix_iof_read_event_t;
PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_iof_read_event_t);
110
111
/* define a struct to hold the settings controlling the
 * format/contents of the output (passed to pmix_iof_write_output) */
typedef struct {
    bool xml;           /* NOTE(review): presumably selects XML-formatted output - confirm */
    time_t timestamp;   /* a time_t rather than a bool; NOTE(review): how the value is interpreted is not visible here */
    bool tag;           /* NOTE(review): presumably enables tagging of output lines - confirm */
} pmix_iof_flags_t;
119
120
/* Write event macros */
122
/**
 * Report whether a file descriptor is treated as always ready for I/O.
 *
 * The result is true when the descriptor is a regular file, a character
 * device that is not a tty, or a block device. The macros in this header
 * use it to choose a timer event instead of an fd-triggered event.
 *
 * @param fd  file descriptor to classify
 * @return    true if fd falls in one of the categories above
 */
static inline bool
pmix_iof_fd_always_ready(int fd)
{
    return pmix_fd_is_regular(fd) ||
           (pmix_fd_is_chardev(fd) && !isatty(fd)) ||
           pmix_fd_is_blkdev(fd);
}
130
/* NOTE(review): not referenced in this header; presumably the chunk size
 * used when writing to a sink - confirm at use sites */
#define PMIX_IOF_SINK_BLOCKSIZE (1024)
132
/*
 * Mark a write event as pending and add it to its event base.
 *
 * wev - pointer to a pmix_iof_write_event_t; any pointer-valued
 *       expression is accepted (e.g. &sink->wev), since every use of
 *       the argument is parenthesized.
 *
 * When (wev)->always_writable is set, the event is added with the
 * timeout stored in (wev)->tv; otherwise it is added with no timeout.
 * A non-zero return from pmix_event_add() is logged as
 * PMIX_ERR_BAD_PARAM.
 *
 * The expansion ends in a semicolon (kept for existing call sites), so
 * do not use it as the unbraced body of an "if" that has an "else".
 */
#define PMIX_IOF_SINK_ACTIVATE(wev)                                     \
    do {                                                                \
        struct timeval *_sink_tv = NULL;                                \
        (wev)->pending = true;                                          \
        PMIX_POST_OBJECT(wev);                                          \
        if ((wev)->always_writable) {                                   \
            /* Regular is always write ready. Use timer to activate */  \
            _sink_tv = &(wev)->tv;                                      \
        }                                                               \
        if (pmix_event_add(&(wev)->ev, _sink_tv)) {                     \
            PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);                         \
        }                                                               \
    } while(0);
146
147
/*
 * Construct an output "sink" in place and bind it to a descriptor.
 *
 * snk      - pointer to storage for a pmix_iof_sink_t; constructed here
 * nm       - pointer to the proc whose nspace and rank are copied in
 * fid      - target file descriptor; when negative, the write event is
 *            left as the constructor set it
 * tg       - channel value stored in the sink's tag
 * wrthndlr - callback registered with the event; receives (snk) as data
 *
 * For a non-negative fid, a timer event is set up when
 * pmix_iof_fd_always_ready(fid) is true, and a PMIX_EV_WRITE event on
 * the descriptor otherwise. Arguments may be evaluated more than once.
 */
#define PMIX_IOF_SINK_DEFINE(snk, nm, fid, tg, wrthndlr)                    \
    do {                                                                    \
        PMIX_OUTPUT_VERBOSE((1, pmix_client_globals.iof_output,             \
                             "defining endpt: file %s line %d fd %d",       \
                             __FILE__, __LINE__, (fid)));                   \
        PMIX_CONSTRUCT((snk), pmix_iof_sink_t);                             \
        pmix_strncpy((snk)->name.nspace, (nm)->nspace, PMIX_MAX_NSLEN);     \
        (snk)->name.rank = (nm)->rank;                                      \
        (snk)->tag = (tg);                                                  \
        if ((fid) >= 0) {                                                   \
            (snk)->wev.fd = (fid);                                          \
            (snk)->wev.always_writable = pmix_iof_fd_always_ready(fid);     \
            if (!(snk)->wev.always_writable) {                              \
                /* wait for the descriptor to become writable */            \
                pmix_event_set(pmix_globals.evbase,                         \
                               &(snk)->wev.ev, (snk)->wev.fd,               \
                               PMIX_EV_WRITE,                               \
                               wrthndlr, (snk));                            \
            } else {                                                        \
                /* always writable: drive the handler from a timer */       \
                pmix_event_evtimer_set(pmix_globals.evbase,                 \
                                       &(snk)->wev.ev, wrthndlr, (snk));    \
            }                                                               \
        }                                                                   \
        PMIX_POST_OBJECT(snk);                                              \
    } while(0);
175
/* Read event macros */
/*
 * Add a read event to its event base.
 *
 * rev - pointer to a pmix_iof_read_event_t
 *
 * The timeout stored in (rev)->tv is used only when
 * (rev)->always_readable is set; otherwise the event is added with no
 * timeout. A non-zero return from pmix_event_add() is logged as
 * PMIX_ERR_BAD_PARAM. The expansion ends in a semicolon.
 */
#define PMIX_IOF_READ_ADDEV(rev)                                        \
    do {                                                                \
        struct timeval *_wait =                                         \
            (rev)->always_readable ? &(rev)->tv : NULL;                 \
        if (0 != pmix_event_add(&(rev)->ev, _wait)) {                   \
            PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);                         \
        }                                                               \
    } while(0);
187
/*
 * Mark a read event active and add it to its event base.
 *
 * rev - pointer to a pmix_iof_read_event_t
 *
 * The active flag is written and PMIX_POST_OBJECT is invoked before
 * PMIX_IOF_READ_ADDEV adds the event.
 * NOTE(review): presumably so the event handler observes the flag -
 * confirm before changing the statement order.
 * The expansion ends in a semicolon.
 */
#define PMIX_IOF_READ_ACTIVATE(rev) \
do { \
(rev)->active = true; \
PMIX_POST_OBJECT(rev); \
PMIX_IOF_READ_ADDEV(rev); \
} while(0);
194
195
196 #define PMIX_IOF_READ_EVENT(rv, p, np, d, nd, fid, cbfunc, actv) \
197 do { \
198 size_t _ii; \
199 pmix_iof_read_event_t *rev; \
200 PMIX_OUTPUT_VERBOSE((1, pmix_client_globals.iof_output, \
201 "defining read event at: %s %d", \
202 __FILE__, __LINE__)); \
203 rev = PMIX_NEW(pmix_iof_read_event_t); \
204 if (NULL != (p)) { \
205 (rev)->ntargets = (np); \
206 PMIX_PROC_CREATE((rev)->targets, (rev)->ntargets); \
207 memcpy((rev)->targets, (p), (np) * sizeof(pmix_proc_t)); \
208 } \
209 if (NULL != (d) && 0 < (nd)) { \
210 PMIX_INFO_CREATE((rev)->directives, (nd)); \
211 (rev)->ndirs = (nd); \
212 for (_ii=0; _ii < (size_t)nd; _ii++) { \
213 PMIX_INFO_XFER(&((rev)->directives[_ii]), &((d)[_ii])); \
214 } \
215 } \
216 rev->fd = (fid); \
217 rev->always_readable = pmix_iof_fd_always_ready(fid); \
218 *(rv) = rev; \
219 if(rev->always_readable) { \
220 pmix_event_evtimer_set(pmix_globals.evbase, \
221 &rev->ev, (cbfunc), rev); \
222 } else { \
223 pmix_event_set(pmix_globals.evbase, \
224 &rev->ev, (fid), \
225 PMIX_EV_READ, \
226 (cbfunc), rev); \
227 } \
228 if ((actv)) { \
229 PMIX_IOF_READ_ACTIVATE(rev) \
230 } \
231 } while(0);
232
233
/*
 * Exported IOF entry points. Their definitions are not visible in this
 * header, so the notes below are limited to what the signatures show;
 * anything marked NOTE(review) is inferred from names and needs to be
 * confirmed against the implementation.
 */

/* Takes no arguments and returns a pmix_status_t.
 * NOTE(review): presumably flushes pending IOF output - confirm. */
PMIX_EXPORT pmix_status_t pmix_iof_flush(void);

/* Takes a proc name, a channel, a byte object and a pmix_iof_flags_t
 * (formatting settings); returns a pmix_status_t.
 * NOTE(review): presumably queues bo for output on the given stream on
 * behalf of name - confirm. */
PMIX_EXPORT pmix_status_t pmix_iof_write_output(const pmix_proc_t *name,
pmix_iof_channel_t stream,
const pmix_byte_object_t *bo,
pmix_iof_flags_t *flags);
/* NOTE(review): presumably writes out whatever is still queued on the
 * sink - confirm. */
PMIX_EXPORT void pmix_iof_static_dump_output(pmix_iof_sink_t *sink);
/* Event-callback shaped signature (fd, event flags, opaque cbdata).
 * NOTE(review): presumably the handler to pass as wrthndlr to
 * PMIX_IOF_SINK_DEFINE - confirm. */
PMIX_EXPORT void pmix_iof_write_handler(int fd, short event, void *cbdata);
/* NOTE(review): presumably reports whether stdin on fd may be read by
 * this process - confirm. */
PMIX_EXPORT bool pmix_iof_stdin_check(int fd);
/* Event-callback shaped signature; the fd argument is named unusedfd.
 * NOTE(review): presumably the read callback for PMIX_IOF_READ_EVENT -
 * confirm. */
PMIX_EXPORT void pmix_iof_read_local_handler(int unusedfd, short event, void *cbdata);
/* Event-callback shaped signature (fd, event flags, opaque cbdata).
 * NOTE(review): presumably the callback for stdin events - confirm. */
PMIX_EXPORT void pmix_iof_stdin_cb(int fd, short event, void *cbdata);
/* Takes a channel set, the source proc, a byte object, an info array with
 * its length, and an IOF request; returns a pmix_status_t.
 * NOTE(review): presumably delivers received IOF data for the given
 * request - confirm. */
PMIX_EXPORT pmix_status_t pmix_iof_process_iof(pmix_iof_channel_t channels,
const pmix_proc_t *source,
const pmix_byte_object_t *bo,
const pmix_info_t *info, size_t ninfo,
const pmix_iof_req_t *req);
250
251 END_C_DECLS
252
253 #endif /* PMIX_IOF_H */
254