1 /*****************************************************************************\
2  *  srun_comm.c - srun communications
3  *****************************************************************************
4  *  Copyright (C) 2002-2007 The Regents of the University of California.
5  *  Copyright (C) 2008-2010 Lawrence Livermore National Security.
6  *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
7  *  Written by Morris Jette <jette1@llnl.gov>
8  *  CODE-OCEC-09-009. All rights reserved.
9  *
10  *  This file is part of Slurm, a resource management program.
11  *  For details, see <https://slurm.schedmd.com/>.
12  *  Please also read the included file: DISCLAIMER.
13  *
14  *  Slurm is free software; you can redistribute it and/or modify it under
15  *  the terms of the GNU General Public License as published by the Free
16  *  Software Foundation; either version 2 of the License, or (at your option)
17  *  any later version.
18  *
19  *  In addition, as a special exception, the copyright holders give permission
20  *  to link the code of portions of this program with the OpenSSL library under
21  *  certain conditions as described in each individual source file, and
22  *  distribute linked combinations including the two. You must obey the GNU
23  *  General Public License in all respects for all of the code used other than
24  *  OpenSSL. If you modify file(s) with this exception, you may extend this
25  *  exception to your version of the file(s), but you are not obligated to do
26  *  so. If you do not wish to do so, delete this exception statement from your
27  *  version.  If you delete this exception statement from all source files in
28  *  the program, then also delete it here.
29  *
30  *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
31  *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
32  *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
33  *  details.
34  *
35  *  You should have received a copy of the GNU General Public License along
36  *  with Slurm; if not, write to the Free Software Foundation, Inc.,
37  *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA.
38 \*****************************************************************************/
39 
40 #include "config.h"
41 
42 #include <string.h>
43 
44 #include "src/common/node_select.h"
45 #include "src/common/xassert.h"
46 #include "src/common/xmalloc.h"
47 #include "src/common/xstring.h"
48 #include "src/slurmctld/agent.h"
49 #include "src/slurmctld/fed_mgr.h"
50 #include "src/slurmctld/proc_req.h"
51 #include "src/slurmctld/slurmctld.h"
52 #include "src/slurmctld/srun_comm.h"
53 
54 /* Launch the srun request. Note that retry is always zero since
55  * we don't want to clog the system up with messages destined for
56  * defunct srun processes
57  */
_srun_agent_launch(slurm_addr_t * addr,char * host,slurm_msg_type_t type,void * msg_args,uint16_t protocol_version)58 static void _srun_agent_launch(slurm_addr_t *addr, char *host,
59 			       slurm_msg_type_t type, void *msg_args,
60 			       uint16_t protocol_version)
61 {
62 	agent_arg_t *agent_args = xmalloc(sizeof(agent_arg_t));
63 
64 	agent_args->node_count = 1;
65 	agent_args->retry      = 0;
66 	agent_args->addr       = addr;
67 	agent_args->hostlist   = hostlist_create(host);
68 	agent_args->msg_type   = type;
69 	agent_args->msg_args   = msg_args;
70 	agent_args->protocol_version = protocol_version;
71 
72 	agent_queue_request(agent_args);
73 }
74 
/*
 * _pending_het_jobs - test whether any component of a heterogeneous job
 *	satisfies IS_JOB_PENDING()
 * IN job_ptr - job record to test
 * RET true if a component on the leader's het_job_list is pending,
 *	false otherwise. false is also returned when job_ptr has no
 *	het_job_id, when the leader record can not be found, or when the
 *	leader has no het_job_list.
 */
static bool _pending_het_jobs(job_record_t *job_ptr)
{
	job_record_t *leader, *component;
	ListIterator component_iter;
	bool found_pending = false;

	if (!job_ptr->het_job_id)
		return false;

	leader = find_job_record(job_ptr->het_job_id);
	if (leader == NULL) {
		error("Hetjob leader %pJ not found", job_ptr);
		return false;
	}
	if (leader->het_job_list == NULL) {
		error("Hetjob leader %pJ lacks het_job_list",
		      job_ptr);
		return false;
	}

	component_iter = list_iterator_create(leader->het_job_list);
	/* Stop walking the list as soon as one pending component is seen */
	while (!found_pending && (component = list_next(component_iter))) {
		if (component->het_job_id != leader->het_job_id) {
			/* Entry does not belong to this hetjob, skip it */
			error("%s: Bad het_job_list for %pJ",
			      __func__, leader);
		} else if (IS_JOB_PENDING(component)) {
			found_pending = true;
		}
	}
	list_iterator_destroy(component_iter);

	return found_pending;
}
111 
_free_srun_alloc(void * x)112 static void _free_srun_alloc(void *x)
113 {
114 	resource_allocation_response_msg_t *alloc_msg;
115 
116 	alloc_msg = (resource_allocation_response_msg_t *) x;
117 	/* NULL working_cluster_rec because it's pointing to global memory */
118 	alloc_msg->working_cluster_rec = NULL;
119 	slurm_free_resource_allocation_response_msg(alloc_msg);
120 }
121 
122 /*
123  * srun_allocate - notify srun of a resource allocation
124  * IN job_ptr - job allocated resources
125  */
srun_allocate(job_record_t * job_ptr)126 extern void srun_allocate(job_record_t *job_ptr)
127 {
128 	job_record_t *het_job, *het_job_leader;
129 	resource_allocation_response_msg_t *msg_arg = NULL;
130 	slurm_addr_t *addr;
131 	ListIterator iter;
132 	List job_resp_list = NULL;
133 
134 	xassert(job_ptr);
135 	if (!job_ptr || !job_ptr->alloc_resp_port || !job_ptr->alloc_node ||
136 	    !job_ptr->resp_host || !job_ptr->job_resrcs ||
137 	    !job_ptr->job_resrcs->cpu_array_cnt)
138 		return;
139 
140 	if (job_ptr->het_job_id == 0) {
141 		addr = xmalloc(sizeof(struct sockaddr_in));
142 		slurm_set_addr(addr, job_ptr->alloc_resp_port,
143 			job_ptr->resp_host);
144 
145 		msg_arg = build_alloc_msg(job_ptr, SLURM_SUCCESS, NULL);
146 		_srun_agent_launch(addr, job_ptr->alloc_node,
147 				   RESPONSE_RESOURCE_ALLOCATION, msg_arg,
148 				   job_ptr->start_protocol_ver);
149 	} else if (_pending_het_jobs(job_ptr)) {
150 		return;
151 	} else if ((het_job_leader = find_job_record(job_ptr->het_job_id))) {
152 		addr = xmalloc(sizeof(struct sockaddr_in));
153 		slurm_set_addr(addr, het_job_leader->alloc_resp_port,
154 			       het_job_leader->resp_host);
155 		job_resp_list = list_create(_free_srun_alloc);
156 		iter = list_iterator_create(het_job_leader->het_job_list);
157 		while ((het_job = list_next(iter))) {
158 			if (het_job_leader->het_job_id !=
159 				het_job->het_job_id) {
160 				error("%s: Bad het_job_list for %pJ",
161 				      __func__, het_job_leader);
162 				continue;
163 			}
164 			msg_arg = build_alloc_msg(het_job, SLURM_SUCCESS,
165 						  NULL);
166 			list_append(job_resp_list, msg_arg);
167 			msg_arg = NULL;
168 		}
169 		list_iterator_destroy(iter);
170 		_srun_agent_launch(addr, job_ptr->alloc_node,
171 				   RESPONSE_HET_JOB_ALLOCATION, job_resp_list,
172 				   job_ptr->start_protocol_ver);
173 	} else {
174 		error("%s: Can not find hetjob leader %pJ",
175 		      __func__, job_ptr);
176 	}
177 }
178 
179 /*
180  * srun_allocate_abort - notify srun of a resource allocation failure
181  * IN job_ptr - job allocated resources
182  */
srun_allocate_abort(job_record_t * job_ptr)183 extern void srun_allocate_abort(job_record_t *job_ptr)
184 {
185 	if (job_ptr && job_ptr->alloc_resp_port && job_ptr->alloc_node &&
186 	    job_ptr->resp_host) {
187 		slurm_addr_t * addr;
188 		srun_job_complete_msg_t *msg_arg;
189 		addr = xmalloc(sizeof(struct sockaddr_in));
190 		slurm_set_addr(addr, job_ptr->alloc_resp_port,
191 			       job_ptr->resp_host);
192 		msg_arg = xmalloc(sizeof(srun_timeout_msg_t));
193 		msg_arg->job_id   = job_ptr->job_id;
194 		msg_arg->step_id  = NO_VAL;
195 		_srun_agent_launch(addr, job_ptr->alloc_node,
196 				   SRUN_JOB_COMPLETE,
197 				   msg_arg,
198 				   job_ptr->start_protocol_ver);
199 	}
200 }
201 
202 /*
203  * srun_node_fail - notify srun of a node's failure
204  * IN job_ptr - job to notify
205  * IN node_name - name of failed node
206  */
srun_node_fail(job_record_t * job_ptr,char * node_name)207 extern void srun_node_fail(job_record_t *job_ptr, char *node_name)
208 {
209 #ifndef HAVE_FRONT_END
210 	node_record_t *node_ptr;
211 #endif
212 	int bit_position = -1;
213 	slurm_addr_t * addr;
214 	srun_node_fail_msg_t *msg_arg;
215 	ListIterator step_iterator;
216 	step_record_t *step_ptr;
217 
218 	xassert(job_ptr);
219 	xassert(node_name);
220 	if (!job_ptr || !IS_JOB_RUNNING(job_ptr))
221 		return;
222 
223 #ifdef HAVE_FRONT_END
224 	/* Purge all jobs steps in front end mode */
225 #else
226 	if (!node_name || (node_ptr = find_node_record(node_name)) == NULL)
227 		return;
228 	bit_position = node_ptr - node_record_table_ptr;
229 #endif
230 
231 	step_iterator = list_iterator_create(job_ptr->step_list);
232 	while ((step_ptr = list_next(step_iterator))) {
233 		if (step_ptr->step_node_bitmap == NULL)   /* pending step */
234 			continue;
235 		if ((bit_position >= 0) &&
236 		    (!bit_test(step_ptr->step_node_bitmap, bit_position)))
237 			continue;	/* job step not on this node */
238 		if ( (step_ptr->port    == 0)    ||
239 		     (step_ptr->host    == NULL) ||
240 		     (step_ptr->batch_step)      ||
241 		     (step_ptr->host[0] == '\0') )
242 			continue;
243 		addr = xmalloc(sizeof(struct sockaddr_in));
244 		slurm_set_addr(addr, step_ptr->port, step_ptr->host);
245 		msg_arg = xmalloc(sizeof(srun_node_fail_msg_t));
246 		msg_arg->job_id   = job_ptr->job_id;
247 		msg_arg->step_id  = step_ptr->step_id;
248 		msg_arg->nodelist = xstrdup(node_name);
249 		_srun_agent_launch(addr, step_ptr->host, SRUN_NODE_FAIL,
250 				   msg_arg, step_ptr->start_protocol_ver);
251 	}
252 	list_iterator_destroy(step_iterator);
253 
254 	if (job_ptr->other_port && job_ptr->alloc_node && job_ptr->resp_host) {
255 		addr = xmalloc(sizeof(struct sockaddr_in));
256 		slurm_set_addr(addr, job_ptr->other_port, job_ptr->resp_host);
257 		msg_arg = xmalloc(sizeof(srun_node_fail_msg_t));
258 		msg_arg->job_id   = job_ptr->job_id;
259 		msg_arg->step_id  = NO_VAL;
260 		msg_arg->nodelist = xstrdup(node_name);
261 		_srun_agent_launch(addr, job_ptr->alloc_node, SRUN_NODE_FAIL,
262 				   msg_arg, job_ptr->start_protocol_ver);
263 	}
264 }
265 
266 /* srun_ping - ping all srun commands that have not been heard from recently */
srun_ping(void)267 extern void srun_ping (void)
268 {
269 	ListIterator job_iterator;
270 	job_record_t *job_ptr;
271 	slurm_addr_t * addr;
272 	time_t now = time(NULL);
273 	time_t old = now - (slurmctld_conf.inactive_limit / 3) +
274 			   slurmctld_conf.msg_timeout + 1;
275 	srun_ping_msg_t *msg_arg;
276 
277 	if (slurmctld_conf.inactive_limit == 0)
278 		return;		/* No limit, don't bother pinging */
279 
280 	job_iterator = list_iterator_create(job_list);
281 	while ((job_ptr = list_next(job_iterator))) {
282 		xassert (job_ptr->magic == JOB_MAGIC);
283 
284 		if (!IS_JOB_RUNNING(job_ptr))
285 			continue;
286 
287 		if ((job_ptr->time_last_active <= old) && job_ptr->other_port
288 		    &&  job_ptr->alloc_node && job_ptr->resp_host) {
289 			addr = xmalloc(sizeof(struct sockaddr_in));
290 			slurm_set_addr(addr, job_ptr->other_port,
291 				job_ptr->resp_host);
292 			msg_arg = xmalloc(sizeof(srun_ping_msg_t));
293 			msg_arg->job_id  = job_ptr->job_id;
294 			msg_arg->step_id = NO_VAL;
295 			_srun_agent_launch(addr, job_ptr->alloc_node,
296 					   SRUN_PING, msg_arg,
297 					   job_ptr->start_protocol_ver);
298 		}
299 	}
300 
301 	list_iterator_destroy(job_iterator);
302 }
303 
304 /*
305  * srun_step_timeout - notify srun of a job step's imminent timeout
306  * IN step_ptr - pointer to the slurmctld step record
307  * IN timeout_val - when it is going to time out
308  */
srun_step_timeout(step_record_t * step_ptr,time_t timeout_val)309 extern void srun_step_timeout(step_record_t *step_ptr, time_t timeout_val)
310 {
311 	slurm_addr_t *addr;
312 	srun_timeout_msg_t *msg_arg;
313 
314 	xassert(step_ptr);
315 
316 	if (step_ptr->batch_step || !step_ptr->port
317 	    || !step_ptr->host || (step_ptr->host[0] == '\0'))
318 		return;
319 
320 	addr = xmalloc(sizeof(struct sockaddr_in));
321 	slurm_set_addr(addr, step_ptr->port, step_ptr->host);
322 	msg_arg = xmalloc(sizeof(srun_timeout_msg_t));
323 	msg_arg->job_id   = step_ptr->job_ptr->job_id;
324 	msg_arg->step_id  = step_ptr->step_id;
325 	msg_arg->timeout  = timeout_val;
326 	_srun_agent_launch(addr, step_ptr->host, SRUN_TIMEOUT, msg_arg,
327 			   step_ptr->start_protocol_ver);
328 }
329 
330 /*
331  * srun_timeout - notify srun of a job's imminent timeout
332  * IN job_ptr - pointer to the slurmctld job record
333  */
srun_timeout(job_record_t * job_ptr)334 extern void srun_timeout(job_record_t *job_ptr)
335 {
336 	slurm_addr_t * addr;
337 	srun_timeout_msg_t *msg_arg;
338 	ListIterator step_iterator;
339 	step_record_t *step_ptr;
340 
341 	xassert(job_ptr);
342 	if (!IS_JOB_RUNNING(job_ptr))
343 		return;
344 
345 	if (job_ptr->other_port && job_ptr->alloc_node && job_ptr->resp_host) {
346 		addr = xmalloc(sizeof(struct sockaddr_in));
347 		slurm_set_addr(addr, job_ptr->other_port, job_ptr->resp_host);
348 		msg_arg = xmalloc(sizeof(srun_timeout_msg_t));
349 		msg_arg->job_id   = job_ptr->job_id;
350 		msg_arg->step_id  = NO_VAL;
351 		msg_arg->timeout  = job_ptr->end_time;
352 		_srun_agent_launch(addr, job_ptr->alloc_node, SRUN_TIMEOUT,
353 				   msg_arg, job_ptr->start_protocol_ver);
354 	}
355 
356 
357 	step_iterator = list_iterator_create(job_ptr->step_list);
358 	while ((step_ptr = list_next(step_iterator)))
359 		srun_step_timeout(step_ptr, job_ptr->end_time);
360 	list_iterator_destroy(step_iterator);
361 }
362 
363 /*
364  * srun_user_message - Send arbitrary message to an srun job (no job steps)
365  */
srun_user_message(job_record_t * job_ptr,char * msg)366 extern int srun_user_message(job_record_t *job_ptr, char *msg)
367 {
368 	slurm_addr_t * addr;
369 	srun_user_msg_t *msg_arg;
370 
371 	xassert(job_ptr);
372 	if (!IS_JOB_PENDING(job_ptr) && !IS_JOB_RUNNING(job_ptr))
373 		return ESLURM_ALREADY_DONE;
374 
375 	if (job_ptr->other_port &&
376 	    job_ptr->resp_host && job_ptr->resp_host[0]) {
377 		addr = xmalloc(sizeof(struct sockaddr_in));
378 		slurm_set_addr(addr, job_ptr->other_port, job_ptr->resp_host);
379 		msg_arg = xmalloc(sizeof(srun_user_msg_t));
380 		msg_arg->job_id = job_ptr->job_id;
381 		msg_arg->msg    = xstrdup(msg);
382 		_srun_agent_launch(addr, job_ptr->resp_host, SRUN_USER_MSG,
383 				   msg_arg, job_ptr->start_protocol_ver);
384 		return SLURM_SUCCESS;
385 	} else if (job_ptr->batch_flag && IS_JOB_RUNNING(job_ptr)) {
386 #ifndef HAVE_FRONT_END
387 		node_record_t *node_ptr;
388 #endif
389 		job_notify_msg_t *notify_msg_ptr;
390 		agent_arg_t *agent_arg_ptr;
391 #ifdef HAVE_FRONT_END
392 		if (job_ptr->batch_host == NULL)
393 			return ESLURM_DISABLED;	/* no allocated nodes */
394 		agent_arg_ptr = xmalloc(sizeof(agent_arg_t));
395 		agent_arg_ptr->hostlist = hostlist_create(job_ptr->batch_host);
396 		if (!agent_arg_ptr->hostlist)
397 			fatal("Invalid srun host: %s", job_ptr->batch_host);
398 
399 		if (job_ptr->front_end_ptr)
400 			agent_arg_ptr->protocol_version =
401 				job_ptr->front_end_ptr->protocol_version;
402 
403 #else
404 		node_ptr = find_first_node_record(job_ptr->node_bitmap);
405 		if (node_ptr == NULL)
406 			return ESLURM_DISABLED;	/* no allocated nodes */
407 		agent_arg_ptr = xmalloc(sizeof(agent_arg_t));
408 		agent_arg_ptr->hostlist = hostlist_create(node_ptr->name);
409 		agent_arg_ptr->protocol_version = node_ptr->protocol_version;
410 		if (!agent_arg_ptr->hostlist)
411 			fatal("Invalid srun host: %s", node_ptr->name);
412 #endif
413 		notify_msg_ptr = (job_notify_msg_t *)
414 				 xmalloc(sizeof(job_notify_msg_t));
415 		notify_msg_ptr->job_id = job_ptr->job_id;
416 		notify_msg_ptr->message = xstrdup(msg);
417 		agent_arg_ptr->node_count = 1;
418 		agent_arg_ptr->retry = 0;
419 		agent_arg_ptr->msg_type = REQUEST_JOB_NOTIFY;
420 		agent_arg_ptr->msg_args = (void *) notify_msg_ptr;
421 		/* Launch the RPC via agent */
422 		agent_queue_request(agent_arg_ptr);
423 		return SLURM_SUCCESS;
424 	}
425 	return ESLURM_DISABLED;
426 }
427 
428 /*
429  * srun_job_complete - notify srun of a job's termination
430  * IN job_ptr - pointer to the slurmctld job record
431  */
srun_job_complete(job_record_t * job_ptr)432 extern void srun_job_complete(job_record_t *job_ptr)
433 {
434 	slurm_addr_t * addr;
435 	srun_job_complete_msg_t *msg_arg;
436 	ListIterator step_iterator;
437 	step_record_t *step_ptr;
438 
439 	xassert(job_ptr);
440 
441 	if (job_ptr->other_port && job_ptr->alloc_node && job_ptr->resp_host) {
442 		addr = xmalloc(sizeof(struct sockaddr_in));
443 		slurm_set_addr(addr, job_ptr->other_port, job_ptr->resp_host);
444 		msg_arg = xmalloc(sizeof(srun_job_complete_msg_t));
445 		msg_arg->job_id   = job_ptr->job_id;
446 		msg_arg->step_id  = NO_VAL;
447 		_srun_agent_launch(addr, job_ptr->alloc_node,
448 				   SRUN_JOB_COMPLETE, msg_arg,
449 				   job_ptr->start_protocol_ver);
450 	}
451 
452 	step_iterator = list_iterator_create(job_ptr->step_list);
453 	while ((step_ptr = list_next(step_iterator))) {
454 		if (step_ptr->batch_step)	/* batch script itself */
455 			continue;
456 		srun_step_complete(step_ptr);
457 	}
458 	list_iterator_destroy(step_iterator);
459 }
460 
461 /*
462  * srun_job_suspend - notify salloc of suspend/resume operation
463  * IN job_ptr - pointer to the slurmctld job record
464  * IN op - SUSPEND_JOB or RESUME_JOB (enum suspend_opts from slurm.h)
465  * RET - true if message send, otherwise false
466  */
srun_job_suspend(job_record_t * job_ptr,uint16_t op)467 extern bool srun_job_suspend(job_record_t *job_ptr, uint16_t op)
468 {
469 	slurm_addr_t * addr;
470 	suspend_msg_t *msg_arg;
471 	bool msg_sent = false;
472 
473 	xassert(job_ptr);
474 
475 	if (job_ptr->other_port && job_ptr->alloc_node && job_ptr->resp_host) {
476 		addr = xmalloc(sizeof(struct sockaddr_in));
477 		slurm_set_addr(addr, job_ptr->other_port, job_ptr->resp_host);
478 		msg_arg = xmalloc(sizeof(suspend_msg_t));
479 		msg_arg->job_id  = job_ptr->job_id;
480 		msg_arg->op     = op;
481 		_srun_agent_launch(addr, job_ptr->alloc_node,
482 				   SRUN_REQUEST_SUSPEND, msg_arg,
483 				   job_ptr->start_protocol_ver);
484 		msg_sent = true;
485 	}
486 	return msg_sent;
487 }
488 
489 /*
490  * srun_step_complete - notify srun of a job step's termination
491  * IN step_ptr - pointer to the slurmctld job step record
492  */
srun_step_complete(step_record_t * step_ptr)493 extern void srun_step_complete(step_record_t *step_ptr)
494 {
495 	slurm_addr_t * addr;
496 	srun_job_complete_msg_t *msg_arg;
497 
498 	xassert(step_ptr);
499 	if (step_ptr->port && step_ptr->host && step_ptr->host[0]) {
500 		addr = xmalloc(sizeof(struct sockaddr_in));
501 		slurm_set_addr(addr, step_ptr->port, step_ptr->host);
502 		msg_arg = xmalloc(sizeof(srun_job_complete_msg_t));
503 		msg_arg->job_id   = step_ptr->job_ptr->job_id;
504 		msg_arg->step_id  = step_ptr->step_id;
505 		_srun_agent_launch(addr, step_ptr->host, SRUN_JOB_COMPLETE,
506 				   msg_arg, step_ptr->start_protocol_ver);
507 	}
508 }
509 
510 /*
511  * srun_step_missing - notify srun that a job step is missing from
512  *		       a node we expect to find it on
513  * IN step_ptr  - pointer to the slurmctld job step record
514  * IN node_list - name of nodes we did not find the step on
515  */
srun_step_missing(step_record_t * step_ptr,char * node_list)516 extern void srun_step_missing(step_record_t *step_ptr, char *node_list)
517 {
518 	slurm_addr_t * addr;
519 	srun_step_missing_msg_t *msg_arg;
520 
521 	xassert(step_ptr);
522 	if (step_ptr->port && step_ptr->host && step_ptr->host[0]) {
523 		addr = xmalloc(sizeof(struct sockaddr_in));
524 		slurm_set_addr(addr, step_ptr->port, step_ptr->host);
525 		msg_arg = xmalloc(sizeof(srun_step_missing_msg_t));
526 		msg_arg->job_id   = step_ptr->job_ptr->job_id;
527 		msg_arg->step_id  = step_ptr->step_id;
528 		msg_arg->nodelist = xstrdup(node_list);
529 		_srun_agent_launch(addr, step_ptr->host, SRUN_STEP_MISSING,
530 				   msg_arg, step_ptr->start_protocol_ver);
531 	}
532 }
533 
534 /*
535  * srun_step_signal - notify srun that a job step should be signaled
536  * NOTE: Needed on BlueGene/Q to signal runjob process
537  * IN step_ptr  - pointer to the slurmctld job step record
538  * IN signal - signal number
539  */
srun_step_signal(step_record_t * step_ptr,uint16_t signal)540 extern void srun_step_signal(step_record_t *step_ptr, uint16_t signal)
541 {
542 	slurm_addr_t * addr;
543 	job_step_kill_msg_t *msg_arg;
544 
545 	xassert(step_ptr);
546 	if (step_ptr->port && step_ptr->host && step_ptr->host[0]) {
547 		addr = xmalloc(sizeof(struct sockaddr_in));
548 		slurm_set_addr(addr, step_ptr->port, step_ptr->host);
549 		msg_arg = xmalloc(sizeof(job_step_kill_msg_t));
550 		msg_arg->job_id      = step_ptr->job_ptr->job_id;
551 		msg_arg->job_step_id = step_ptr->step_id;
552 		msg_arg->signal      = signal;
553 		_srun_agent_launch(addr, step_ptr->host, SRUN_STEP_SIGNAL,
554 				   msg_arg, step_ptr->start_protocol_ver);
555 	}
556 }
557 
558 /*
559  * srun_exec - request that srun execute a specific command
560  *	and route it's output to stdout
561  * IN step_ptr - pointer to the slurmctld job step record
562  * IN argv - command and arguments to execute
563  */
srun_exec(step_record_t * step_ptr,char ** argv)564 extern void srun_exec(step_record_t *step_ptr, char **argv)
565 {
566 	slurm_addr_t * addr;
567 	srun_exec_msg_t *msg_arg;
568 	int cnt = 1, i;
569 
570 	xassert(step_ptr);
571 
572 	if (step_ptr->port && step_ptr->host && step_ptr->host[0]) {
573 		for (i=0; argv[i]; i++)
574 			cnt++;	/* start at 1 to include trailing NULL */
575 		addr = xmalloc(sizeof(struct sockaddr_in));
576 		slurm_set_addr(addr, step_ptr->port, step_ptr->host);
577 		msg_arg = xmalloc(sizeof(srun_exec_msg_t));
578 		msg_arg->job_id  = step_ptr->job_ptr->job_id;
579 		msg_arg->step_id = step_ptr->step_id;
580 		msg_arg->argc    = cnt;
581 		msg_arg->argv    = xmalloc(sizeof(char *) * cnt);
582 		for (i=0; i<cnt ; i++)
583 			msg_arg->argv[i] = xstrdup(argv[i]);
584 		_srun_agent_launch(addr, step_ptr->host, SRUN_EXEC,
585 				   msg_arg, step_ptr->start_protocol_ver);
586 	} else {
587 		error("srun_exec %pS lacks communication channel",
588 		      step_ptr);
589 	}
590 }
591 
592 /*
593  * srun_response - note that srun has responded
594  * IN job_id  - id of job responding
595  * IN step_id - id of step responding or NO_VAL if not a step
596  */
srun_response(uint32_t job_id,uint32_t step_id)597 extern void srun_response(uint32_t job_id, uint32_t step_id)
598 {
599 	job_record_t *job_ptr = find_job_record(job_id);
600 	time_t now = time(NULL);
601 
602 	if (job_ptr == NULL)
603 		return;
604 	job_ptr->time_last_active = now;
605 }
606