1 /*****************************************************************************\
2  **  agent.c - PMI2 handling thread
3  *****************************************************************************
4  *  Copyright (C) 2011-2012 National University of Defense Technology.
5  *  Written by Hongjia Cao <hjcao@nudt.edu.cn>.
6  *  All rights reserved.
7  *  Portions copyright (C) 2015 Mellanox Technologies Inc.
8  *  Written by Artem Y. Polyakov <artemp@mellanox.com>.
9  *  All rights reserved.
10  *
11  *  This file is part of Slurm, a resource management program.
12  *  For details, see <https://slurm.schedmd.com/>.
13  *  Please also read the included file: DISCLAIMER.
14  *
15  *  Slurm is free software; you can redistribute it and/or modify it under
16  *  the terms of the GNU General Public License as published by the Free
17  *  Software Foundation; either version 2 of the License, or (at your option)
18  *  any later version.
19  *
20  *  In addition, as a special exception, the copyright holders give permission
21  *  to link the code of portions of this program with the OpenSSL library under
22  *  certain conditions as described in each individual source file, and
23  *  distribute linked combinations including the two. You must obey the GNU
24  *  General Public License in all respects for all of the code used other than
25  *  OpenSSL. If you modify file(s) with this exception, you may extend this
26  *  exception to your version of the file(s), but you are not obligated to do
27  *  so. If you do not wish to do so, delete this exception statement from your
28  *  version.  If you delete this exception statement from all source files in
29  *  the program, then also delete it here.
30  *
31  *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
32  *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
33  *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
34  *  details.
35  *
36  *  You should have received a copy of the GNU General Public License along
37  *  with Slurm; if not, write to the Free Software Foundation, Inc.,
38  *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA.
39 \*****************************************************************************/
40 
41 #if defined(__FreeBSD__) || defined(__DragonFly__)
42 #include <sys/socket.h>	/* AF_INET */
43 #endif
44 
45 #include <arpa/inet.h>
46 #include <fcntl.h>
47 #include <signal.h>
48 #include <sys/types.h>
49 #include <sys/un.h>
50 #include <poll.h>
51 
52 #include "src/common/slurm_xlator.h"
53 #include "src/common/eio.h"
54 #include "src/common/macros.h"
55 #include "src/common/slurm_mpi.h"
56 #include "src/common/xstring.h"
57 #include "src/slurmd/slurmstepd/slurmstepd_job.h"
58 
59 #include "client.h"
60 #include "pmi.h"
61 #include "setup.h"
62 
63 static int *initialized = NULL;
64 static int *finalized = NULL;
65 
66 static eio_handle_t *pmi2_handle;
67 static pthread_mutex_t agent_mutex = PTHREAD_MUTEX_INITIALIZER;
68 static pthread_cond_t agent_running_cond = PTHREAD_COND_INITIALIZER;
69 static pthread_t _agent_tid = 0;
70 
71 static bool _tree_listen_readable(eio_obj_t *obj);
72 static int  _tree_listen_read(eio_obj_t *obj, List objs);
73 static struct io_operations tree_listen_ops = {
74 .readable    = &_tree_listen_readable,
75 .handle_read = &_tree_listen_read,
76 };
77 
78 static bool _task_readable(eio_obj_t *obj);
79 static int  _task_read(eio_obj_t *obj, List objs);
80 /* static bool _task_writable(eio_obj_t *obj); */
81 /* static int  _task_write(eio_obj_t *obj, List objs); */
82 static struct io_operations task_ops = {
83 .readable    =  &_task_readable,
84 .handle_read =  &_task_read,
85 };
86 
87 
88 static int _handle_pmi1_init(int fd, int lrank);
89 
90 /*********************************************************************/
91 
92 static int
_handle_task_request(int fd,int lrank)93 _handle_task_request(int fd, int lrank)
94 {
95 	int rc = SLURM_SUCCESS;
96 
97 	debug3("mpi/pmi2: in _handle_task_request, lrank=%d", lrank);
98 
99 	if (initialized[lrank] == 0) {
100 		rc = _handle_pmi1_init(fd, lrank);
101 		initialized[lrank] = 1;
102 	} else if (is_pmi11()) {
103 		rc = handle_pmi1_cmd(fd, lrank);
104 	} else if (is_pmi20()) {
105 		rc = handle_pmi2_cmd(fd, lrank);
106 	} else {
107 		fatal("this is impossible");
108 	}
109 	return rc;
110 }
111 
112 static int
_handle_tree_request(int fd)113 _handle_tree_request(int fd)
114 {
115 	uint32_t temp;
116 	int rc = SLURM_SUCCESS;
117 
118 	if (in_stepd()) {	/* skip uid passed from slurmd */
119 		safe_read(fd, &temp, sizeof(uint32_t));
120 		temp = ntohl(temp);
121 		debug3("mpi/pmi2: _handle_tree_request: req from uid %u", temp);
122 	}
123 	rc = handle_tree_cmd(fd);
124 	return rc;
125 rwfail:
126 	return SLURM_ERROR;
127 }
128 
129 /*********************************************************************/
130 
/*
 * Check whether a descriptor has data to read.
 *
 * Polls fd for POLLIN with a 10 millisecond timeout.
 *
 * fd - descriptor to probe
 * Returns true only when poll() reports exactly one ready descriptor and
 * POLLIN is set in its revents; false on timeout, error, or any other event.
 */
static bool
_is_fd_ready(int fd)
{
	struct pollfd probe = {
		.fd = fd,
		.events = POLLIN,
	};

	if (poll(&probe, 1, 10) != 1)
		return false;

	return (probe.revents & POLLIN) != 0;
}
144 
145 static bool
_tree_listen_readable(eio_obj_t * obj)146 _tree_listen_readable(eio_obj_t *obj)
147 {
148 	debug2("mpi/pmi2: _tree_listen_readable");
149 	if (obj->shutdown == true) {
150 		if (obj->fd != -1) {
151 			close(obj->fd);
152 			obj->fd = -1;
153 		}
154 		debug2("    false, shutdown");
155 		return false;
156 	}
157 	return true;
158 }
159 
160 static int
_tree_listen_read(eio_obj_t * obj,List objs)161 _tree_listen_read(eio_obj_t *obj, List objs)
162 {
163 	int sd;
164 	struct sockaddr addr;
165 	struct sockaddr_in *sin;
166 	socklen_t size = sizeof(addr);
167 	char buf[INET_ADDRSTRLEN];
168 
169 	debug2("mpi/pmi2: _tree_listen_read");
170 
171 	while (1) {
172 		/*
173 		 * Return early if fd is not now ready
174 		 */
175 		if (!_is_fd_ready(obj->fd))
176 			return 0;
177 
178 		while ((sd = accept(obj->fd, &addr, &size)) < 0) {
179 			if (errno == EINTR)
180 				continue;
181 			if (errno == EAGAIN)    /* No more connections */
182 				return 0;
183 			if ((errno == ECONNABORTED) ||
184 			    (errno == EWOULDBLOCK)) {
185 				return 0;
186 			}
187 			error("mpi/pmi2: unable to accept new connection: %m");
188 			return 0;
189 		}
190 
191 		if (! in_stepd()) {
192 			sin = (struct sockaddr_in *) &addr;
193 			inet_ntop(AF_INET, &sin->sin_addr, buf, INET_ADDRSTRLEN);
194 			debug3("mpi/pmi2: accepted tree connection: ip=%s sd=%d",
195 			       buf, sd);
196 		}
197 
198 		/* read command from socket and handle it */
199 		_handle_tree_request(sd);
200 		close(sd);
201 	}
202 	return 0;
203 }
204 
205 /*********************************************************************/
206 
207 static bool
_task_readable(eio_obj_t * obj)208 _task_readable(eio_obj_t *obj)
209 {
210 	int lrank;
211 
212 	debug2("mpi/pmi2: _task_readable");
213 
214 	lrank = (int)(long)(obj->arg);
215 	if (finalized[lrank] == 1) {
216 		debug2("    false, finalized");
217 		return false;
218 	}
219 
220 	if (obj->shutdown == true) {
221 		if (obj->fd != -1) {
222 			close(obj->fd);
223 			obj->fd = -1;
224 		}
225 		debug2("    false, shutdown");
226 		return false;
227 	}
228 	return true;
229 }
230 
231 static int
_task_read(eio_obj_t * obj,List objs)232 _task_read(eio_obj_t *obj, List objs)
233 {
234 	int rc, lrank;
235 
236 	lrank = (int)(long)(obj->arg);
237 	rc = _handle_task_request(obj->fd, lrank);
238 
239 	return rc;
240 }
241 
242 /*********************************************************************/
243 
244 /* the PMI1 init */
245 static int
_handle_pmi1_init(int fd,int lrank)246 _handle_pmi1_init(int fd, int lrank)
247 {
248 	char buf[64];
249 	int version, subversion;
250 	int n, rc = 0;
251 
252 	debug3("mpi/pmi2: in _handle_pmi1_init");
253 
254 	while ( (n = read(fd, buf, 64)) < 0 && errno == EINTR);
255 	if ((n < 0) || (n >= 64)) {
256 		error("mpi/pmi2: failed to read PMI1 init command");
257 		return SLURM_ERROR;
258 	}
259 	buf[n] = '\0';
260 
261 	n = sscanf(buf, "cmd=init pmi_version=%d pmi_subversion=%d\n",
262 		   &version, &subversion);
263 	if (n != 2) {
264 		error("mpi/pmi2: invalid PMI1 init command: `%s'", buf);
265 		rc = 1;
266 		version = 2;
267 		subversion = 0;
268 		goto send_response;
269 	}
270 
271 	rc = set_pmi_version(version, subversion);
272 	if (rc != SLURM_SUCCESS) {
273 		get_pmi_version(&version, &subversion);
274 	} else
275 		rc = 0;
276 
277 send_response:
278 	snprintf(buf, 64, "cmd=response_to_init rc=%d pmi_version=%d "
279 		 "pmi_subversion=%d\n", rc, version, subversion);
280 
281 	while ( (n = write(fd, buf, strlen(buf))) < 0 && errno == EINTR);
282 	if (n < 0) {
283 		error ("mpi/pmi2: failed to write PMI1 init response");
284 		return SLURM_ERROR;
285 	}
286 
287 	debug3("mpi/pmi2: out _handle_pmi1_init");
288 	return SLURM_SUCCESS;
289 }
290 
291 /*********************************************************************/
292 
293 
294 /*
295  * main loop of agent thread
296  */
297 static void *
_agent(void * unused)298 _agent(void * unused)
299 {
300 	eio_obj_t *tree_listen_obj, *task_obj;
301 	int i;
302 
303 	pmi2_handle = eio_handle_create(0);
304 
305 	//fd_set_nonblocking(tree_sock);
306 	tree_listen_obj = eio_obj_create(tree_sock, &tree_listen_ops,
307 					 (void *)(-1));
308 	eio_new_initial_obj(pmi2_handle, tree_listen_obj);
309 
310 	/* for stepd, add the sockets to tasks */
311 	if (in_stepd()) {
312 		for (i = 0; i < job_info.ltasks; i ++) {
313 			task_obj = eio_obj_create(STEPD_PMI_SOCK(i), &task_ops,
314 						  (void*)(long)(i));
315 			eio_new_initial_obj(pmi2_handle, task_obj);
316 		}
317 		initialized = xmalloc(job_info.ltasks * sizeof(int));
318 		finalized = xmalloc(job_info.ltasks * sizeof(int));
319 	}
320 
321 	slurm_mutex_lock(&agent_mutex);
322 	slurm_cond_signal(&agent_running_cond);
323 	slurm_mutex_unlock(&agent_mutex);
324 
325 	eio_handle_mainloop(pmi2_handle);
326 
327 	debug("mpi/pmi2: agent thread exit");
328 
329 	eio_handle_destroy(pmi2_handle);
330 
331 	return NULL;
332 }
333 
334 /*
335  * start the PMI2 agent thread
336  */
337 extern int
pmi2_start_agent(void)338 pmi2_start_agent(void)
339 {
340 	static bool first = true;
341 
342 	slurm_mutex_lock(&agent_mutex);
343 	if (!first) {
344 		slurm_mutex_unlock(&agent_mutex);
345 		return SLURM_SUCCESS;
346 	}
347 	first = false;
348 
349 	slurm_thread_create(&_agent_tid, _agent, NULL);
350 
351 	slurm_cond_wait(&agent_running_cond, &agent_mutex);
352 
353 	debug("mpi/pmi2: started agent thread");
354 
355 	slurm_mutex_unlock(&agent_mutex);
356 
357 	return SLURM_SUCCESS;
358 }
359 
360 /*
361  * stop the PMI2 agent thread
362  */
363 extern int
pmi2_stop_agent(void)364 pmi2_stop_agent(void)
365 {
366 	slurm_mutex_lock(&agent_mutex);
367 
368 	if (_agent_tid) {
369 		eio_signal_shutdown(pmi2_handle);
370 		/* wait for the agent thread to stop */
371 		pthread_join(_agent_tid, NULL);
372 		_agent_tid = 0;
373 	}
374 
375 	slurm_mutex_unlock(&agent_mutex);
376 
377 	return SLURM_SUCCESS;
378 }
379 
380 extern void
task_finalize(int lrank)381 task_finalize(int lrank)
382 {
383 	finalized[lrank] = 1;
384 }
385