1 /*****************************************************************************\
2 ** agent.c - PMI2 handling thread
3 *****************************************************************************
4 * Copyright (C) 2011-2012 National University of Defense Technology.
5 * Written by Hongjia Cao <hjcao@nudt.edu.cn>.
6 * All rights reserved.
7 * Portions copyright (C) 2015 Mellanox Technologies Inc.
8 * Written by Artem Y. Polyakov <artemp@mellanox.com>.
9 * All rights reserved.
10 *
11 * This file is part of Slurm, a resource management program.
12 * For details, see <https://slurm.schedmd.com/>.
13 * Please also read the included file: DISCLAIMER.
14 *
15 * Slurm is free software; you can redistribute it and/or modify it under
16 * the terms of the GNU General Public License as published by the Free
17 * Software Foundation; either version 2 of the License, or (at your option)
18 * any later version.
19 *
20 * In addition, as a special exception, the copyright holders give permission
21 * to link the code of portions of this program with the OpenSSL library under
22 * certain conditions as described in each individual source file, and
23 * distribute linked combinations including the two. You must obey the GNU
24 * General Public License in all respects for all of the code used other than
25 * OpenSSL. If you modify file(s) with this exception, you may extend this
26 * exception to your version of the file(s), but you are not obligated to do
27 * so. If you do not wish to do so, delete this exception statement from your
28 * version. If you delete this exception statement from all source files in
29 * the program, then also delete it here.
30 *
31 * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
32 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
33 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
34 * details.
35 *
36 * You should have received a copy of the GNU General Public License along
37 * with Slurm; if not, write to the Free Software Foundation, Inc.,
38 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
39 \*****************************************************************************/
40
41 #if defined(__FreeBSD__) || defined(__DragonFly__)
42 #include <sys/socket.h> /* AF_INET */
43 #endif
44
45 #include <arpa/inet.h>
46 #include <fcntl.h>
47 #include <signal.h>
48 #include <sys/types.h>
49 #include <sys/un.h>
50 #include <poll.h>
51
52 #include "src/common/slurm_xlator.h"
53 #include "src/common/eio.h"
54 #include "src/common/macros.h"
55 #include "src/common/slurm_mpi.h"
56 #include "src/common/xstring.h"
57 #include "src/slurmd/slurmstepd/slurmstepd_job.h"
58
59 #include "client.h"
60 #include "pmi.h"
61 #include "setup.h"
62
/*
 * Per-local-task flags, indexed by local rank.  Both arrays are only
 * allocated (by _agent()) when running inside slurmstepd.
 */
static int *initialized = NULL;	/* 1 once the task's first request was handled as PMI1 init */
static int *finalized = NULL;	/* 1 once task_finalize() was called for the task */

/* eio handle whose main loop is run by the agent thread */
static eio_handle_t *pmi2_handle;
/*
 * agent_mutex + agent_running_cond implement the agent start-up
 * handshake; agent_mutex also serializes pmi2_start_agent() and
 * pmi2_stop_agent().
 */
static pthread_mutex_t agent_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t agent_running_cond = PTHREAD_COND_INITIALIZER;
/* agent thread id; 0 before the thread is started and after it is joined */
static pthread_t _agent_tid = 0;
70
/* eio callbacks for the tree listening socket (tree_sock). */
static bool _tree_listen_readable(eio_obj_t *obj);
static int _tree_listen_read(eio_obj_t *obj, List objs);
static struct io_operations tree_listen_ops = {
	.readable = &_tree_listen_readable,
	.handle_read = &_tree_listen_read,
};

/*
 * eio callbacks for the per-task PMI sockets (only registered when running
 * in slurmstepd).  Only the read side is implemented; no writable or
 * handle_write callbacks are installed.
 */
static bool _task_readable(eio_obj_t *obj);
static int _task_read(eio_obj_t *obj, List objs);
/* static bool _task_writable(eio_obj_t *obj); */
/* static int _task_write(eio_obj_t *obj, List objs); */
static struct io_operations task_ops = {
	.readable = &_task_readable,
	.handle_read = &_task_read,
};


/* PMI1 "init" handshake, performed on a task's first request. */
static int _handle_pmi1_init(int fd, int lrank);
89
90 /*********************************************************************/
91
92 static int
_handle_task_request(int fd,int lrank)93 _handle_task_request(int fd, int lrank)
94 {
95 int rc = SLURM_SUCCESS;
96
97 debug3("mpi/pmi2: in _handle_task_request, lrank=%d", lrank);
98
99 if (initialized[lrank] == 0) {
100 rc = _handle_pmi1_init(fd, lrank);
101 initialized[lrank] = 1;
102 } else if (is_pmi11()) {
103 rc = handle_pmi1_cmd(fd, lrank);
104 } else if (is_pmi20()) {
105 rc = handle_pmi2_cmd(fd, lrank);
106 } else {
107 fatal("this is impossible");
108 }
109 return rc;
110 }
111
112 static int
_handle_tree_request(int fd)113 _handle_tree_request(int fd)
114 {
115 uint32_t temp;
116 int rc = SLURM_SUCCESS;
117
118 if (in_stepd()) { /* skip uid passed from slurmd */
119 safe_read(fd, &temp, sizeof(uint32_t));
120 temp = ntohl(temp);
121 debug3("mpi/pmi2: _handle_tree_request: req from uid %u", temp);
122 }
123 rc = handle_tree_cmd(fd);
124 return rc;
125 rwfail:
126 return SLURM_ERROR;
127 }
128
129 /*********************************************************************/
130
/*
 * Report whether `fd` can be read from (for a listening socket: has a
 * pending connection), waiting at most 10 milliseconds.  A timeout or a
 * poll() failure both yield false.
 */
static bool
_is_fd_ready(int fd)
{
	struct pollfd probe = {
		.fd = fd,
		.events = POLLIN,
	};

	if (poll(&probe, 1, 10) != 1)
		return false;

	return (probe.revents & POLLIN) != 0;
}
144
145 static bool
_tree_listen_readable(eio_obj_t * obj)146 _tree_listen_readable(eio_obj_t *obj)
147 {
148 debug2("mpi/pmi2: _tree_listen_readable");
149 if (obj->shutdown == true) {
150 if (obj->fd != -1) {
151 close(obj->fd);
152 obj->fd = -1;
153 }
154 debug2(" false, shutdown");
155 return false;
156 }
157 return true;
158 }
159
160 static int
_tree_listen_read(eio_obj_t * obj,List objs)161 _tree_listen_read(eio_obj_t *obj, List objs)
162 {
163 int sd;
164 struct sockaddr addr;
165 struct sockaddr_in *sin;
166 socklen_t size = sizeof(addr);
167 char buf[INET_ADDRSTRLEN];
168
169 debug2("mpi/pmi2: _tree_listen_read");
170
171 while (1) {
172 /*
173 * Return early if fd is not now ready
174 */
175 if (!_is_fd_ready(obj->fd))
176 return 0;
177
178 while ((sd = accept(obj->fd, &addr, &size)) < 0) {
179 if (errno == EINTR)
180 continue;
181 if (errno == EAGAIN) /* No more connections */
182 return 0;
183 if ((errno == ECONNABORTED) ||
184 (errno == EWOULDBLOCK)) {
185 return 0;
186 }
187 error("mpi/pmi2: unable to accept new connection: %m");
188 return 0;
189 }
190
191 if (! in_stepd()) {
192 sin = (struct sockaddr_in *) &addr;
193 inet_ntop(AF_INET, &sin->sin_addr, buf, INET_ADDRSTRLEN);
194 debug3("mpi/pmi2: accepted tree connection: ip=%s sd=%d",
195 buf, sd);
196 }
197
198 /* read command from socket and handle it */
199 _handle_tree_request(sd);
200 close(sd);
201 }
202 return 0;
203 }
204
205 /*********************************************************************/
206
207 static bool
_task_readable(eio_obj_t * obj)208 _task_readable(eio_obj_t *obj)
209 {
210 int lrank;
211
212 debug2("mpi/pmi2: _task_readable");
213
214 lrank = (int)(long)(obj->arg);
215 if (finalized[lrank] == 1) {
216 debug2(" false, finalized");
217 return false;
218 }
219
220 if (obj->shutdown == true) {
221 if (obj->fd != -1) {
222 close(obj->fd);
223 obj->fd = -1;
224 }
225 debug2(" false, shutdown");
226 return false;
227 }
228 return true;
229 }
230
231 static int
_task_read(eio_obj_t * obj,List objs)232 _task_read(eio_obj_t *obj, List objs)
233 {
234 int rc, lrank;
235
236 lrank = (int)(long)(obj->arg);
237 rc = _handle_task_request(obj->fd, lrank);
238
239 return rc;
240 }
241
242 /*********************************************************************/
243
244 /* the PMI1 init */
245 static int
_handle_pmi1_init(int fd,int lrank)246 _handle_pmi1_init(int fd, int lrank)
247 {
248 char buf[64];
249 int version, subversion;
250 int n, rc = 0;
251
252 debug3("mpi/pmi2: in _handle_pmi1_init");
253
254 while ( (n = read(fd, buf, 64)) < 0 && errno == EINTR);
255 if ((n < 0) || (n >= 64)) {
256 error("mpi/pmi2: failed to read PMI1 init command");
257 return SLURM_ERROR;
258 }
259 buf[n] = '\0';
260
261 n = sscanf(buf, "cmd=init pmi_version=%d pmi_subversion=%d\n",
262 &version, &subversion);
263 if (n != 2) {
264 error("mpi/pmi2: invalid PMI1 init command: `%s'", buf);
265 rc = 1;
266 version = 2;
267 subversion = 0;
268 goto send_response;
269 }
270
271 rc = set_pmi_version(version, subversion);
272 if (rc != SLURM_SUCCESS) {
273 get_pmi_version(&version, &subversion);
274 } else
275 rc = 0;
276
277 send_response:
278 snprintf(buf, 64, "cmd=response_to_init rc=%d pmi_version=%d "
279 "pmi_subversion=%d\n", rc, version, subversion);
280
281 while ( (n = write(fd, buf, strlen(buf))) < 0 && errno == EINTR);
282 if (n < 0) {
283 error ("mpi/pmi2: failed to write PMI1 init response");
284 return SLURM_ERROR;
285 }
286
287 debug3("mpi/pmi2: out _handle_pmi1_init");
288 return SLURM_SUCCESS;
289 }
290
291 /*********************************************************************/
292
293
294 /*
295 * main loop of agent thread
296 */
297 static void *
_agent(void * unused)298 _agent(void * unused)
299 {
300 eio_obj_t *tree_listen_obj, *task_obj;
301 int i;
302
303 pmi2_handle = eio_handle_create(0);
304
305 //fd_set_nonblocking(tree_sock);
306 tree_listen_obj = eio_obj_create(tree_sock, &tree_listen_ops,
307 (void *)(-1));
308 eio_new_initial_obj(pmi2_handle, tree_listen_obj);
309
310 /* for stepd, add the sockets to tasks */
311 if (in_stepd()) {
312 for (i = 0; i < job_info.ltasks; i ++) {
313 task_obj = eio_obj_create(STEPD_PMI_SOCK(i), &task_ops,
314 (void*)(long)(i));
315 eio_new_initial_obj(pmi2_handle, task_obj);
316 }
317 initialized = xmalloc(job_info.ltasks * sizeof(int));
318 finalized = xmalloc(job_info.ltasks * sizeof(int));
319 }
320
321 slurm_mutex_lock(&agent_mutex);
322 slurm_cond_signal(&agent_running_cond);
323 slurm_mutex_unlock(&agent_mutex);
324
325 eio_handle_mainloop(pmi2_handle);
326
327 debug("mpi/pmi2: agent thread exit");
328
329 eio_handle_destroy(pmi2_handle);
330
331 return NULL;
332 }
333
334 /*
335 * start the PMI2 agent thread
336 */
337 extern int
pmi2_start_agent(void)338 pmi2_start_agent(void)
339 {
340 static bool first = true;
341
342 slurm_mutex_lock(&agent_mutex);
343 if (!first) {
344 slurm_mutex_unlock(&agent_mutex);
345 return SLURM_SUCCESS;
346 }
347 first = false;
348
349 slurm_thread_create(&_agent_tid, _agent, NULL);
350
351 slurm_cond_wait(&agent_running_cond, &agent_mutex);
352
353 debug("mpi/pmi2: started agent thread");
354
355 slurm_mutex_unlock(&agent_mutex);
356
357 return SLURM_SUCCESS;
358 }
359
360 /*
361 * stop the PMI2 agent thread
362 */
363 extern int
pmi2_stop_agent(void)364 pmi2_stop_agent(void)
365 {
366 slurm_mutex_lock(&agent_mutex);
367
368 if (_agent_tid) {
369 eio_signal_shutdown(pmi2_handle);
370 /* wait for the agent thread to stop */
371 pthread_join(_agent_tid, NULL);
372 _agent_tid = 0;
373 }
374
375 slurm_mutex_unlock(&agent_mutex);
376
377 return SLURM_SUCCESS;
378 }
379
380 extern void
task_finalize(int lrank)381 task_finalize(int lrank)
382 {
383 finalized[lrank] = 1;
384 }
385