1 /***********************************************************************
2 *                                                                      *
3 *               This software is part of the ast package               *
4 *          Copyright (c) 1990-2011 AT&T Intellectual Property          *
5 *                      and is licensed under the                       *
6 *                 Eclipse Public License, Version 1.0                  *
7 *                    by AT&T Intellectual Property                     *
8 *                                                                      *
9 *                A copy of the License is available at                 *
10 *          http://www.eclipse.org/org/documents/epl-v10.html           *
11 *         (with md5 checksum b35adb5213ca9657e911e9befb180842)         *
12 *                                                                      *
13 *              Information and Software Systems Research               *
14 *                            AT&T Research                             *
15 *                           Florham Park NJ                            *
16 *                                                                      *
17 *               Glenn Fowler <glenn.s.fowler@gmail.com>                *
18 *                                                                      *
19 ***********************************************************************/
20 #pragma prototyped
21 /*
22  * Glenn Fowler
23  * AT&T Bell Laboratories
24  *
25  * remote coshell server job and connection support
26  */
27 
28 #include "service.h"
29 
30 /*
31  * drop a connection
32  */
33 
34 void
drop(register int fd)35 drop(register int fd)
36 {
37 	register int	i;
38 	register int	n;
39 	Coshell_t*	sp;
40 	Cojob_t*	jp;
41 	Sfio_t*		tp;
42 
43 	switch (state.con[fd].type)
44 	{
45 	case USER:
46 		if (state.con[fd].info.user.running)
47 			for (jp = state.job; jp <= state.jobmax; jp++)
48 				if (jp->fd == fd && jp->pid)
49 				{
50 					jp->fd = 0;
51 					jobkill(jp, SIGKILL);
52 				}
53 		/*FALLTHROUGH*/
54 	case INIT:
55 		for (i = 0; i < elementsof(state.con[fd].info.user.fds); i++)
56 			if ((n = state.con[fd].info.user.fds[i]) != fd && n >= 0 && state.con[n].type)
57 			{
58 				close(n);
59 				state.con[n].type = 0;
60 			}
61 		if (sp = state.con[fd].info.user.home)
62 			sp->home--;
63 		if (state.con[fd].info.user.pump)
64 			free(state.con[fd].info.user.pump);
65 		if (state.con[fd].info.user.expr)
66 			free(state.con[fd].info.user.expr);
67 		break;
68 	case PASS:
69 		if (tp = state.con[fd].info.pass.serialize)
70 		{
71 			state.con[fd].info.pass.serialize = 0;
72 			cswrite(state.con[fd].info.pass.fd, sfstrbase(tp), sfstrtell(tp));
73 			sfstrclose(tp);
74 		}
75 		if ((jp = state.con[fd].info.pass.job) && jp->pid)
76 		{
77 			if (--jp->ref <= 0)
78 				jobdone(jp);
79 			else if (!jp->lost)
80 				jp->lost = cs.time + UPDATE;
81 		}
82 		break;
83 	case POLL:
84 		error(ERROR_SYSTEM|3, "lost connect stream");
85 		break;
86 	case SHELL:
87 		sp = state.con[fd].info.shell;
88 		sp->fd = -1;
89 		shellclose(sp, -1);
90 		break;
91 	}
92 	state.con[fd].type = 0;
93 	state.con[fd].error = 0;
94 	csfd(fd, CS_POLL_CLOSE);
95 }
96 
97 /*
98  * check the job table for hogs running on busy shells
99  * or jobs queued on hung shells
100  * if only!=0 then only that shell is checked
101  */
102 
103 void
jobcheck(Coshell_t * only)104 jobcheck(Coshell_t* only)
105 {
106 	register Cojob_t*	jp;
107 	register Coshell_t*	sp;
108 	char*			s;
109 
110 	for (jp = state.job; jp <= state.jobmax; jp++)
111 		if (jp->pid && ((sp = jp->shell) == only || !only))
112 		{
113 			if (jp->pid > 0)
114 			{
115 				if (sp->update <= cs.time && sp->errors < ERRORS) update(sp);
116 				if (jp->lost && jp->lost < cs.time)
117 				{
118 					message((-4, "jobcheck: %s: job %d pid %d lost", sp->name, jp->rid, jp->pid));
119 					jp->sig = SIGKILL;
120 					jobdone(jp);
121 					continue;
122 				}
123 				if (sp->idle)
124 				{
125 					if (sp->stat.idle < state.busy && (!sp->bypass || !miscmatch(sp, sp->bypass)))
126 					{
127 						if (!jp->busy && state.grace) jp->busy = cs.time + state.grace;
128 						else if (jp->busy < cs.time)
129 						{
130 							if (state.migrate)
131 							{
132 								int	n;
133 
134 								error(ERROR_OUTPUT|2, state.con[jp->fd].info.user.fds[2], "%s: job=%d pid=%d %s", sp->name, jp - state.job, jp->pid, state.migrate);
135 								n = sfprintf(state.string, "job=%d; pid=%d; host=%s; type=%s; %s\n", jp - state.job, jp->pid, sp->name, sp->type, state.migrate);
136 								if (s = sfstruse(state.string))
137 									cswrite(jp->shell->fd, s, n);
138 								else
139 									error(ERROR_OUTPUT|2, state.con[jp->fd].info.user.fds[2], "out of space");
140 								jp->sig = SIGKILL;
141 								jobdone(jp);
142 							}
143 							else
144 							{
145 #ifdef SIGSTOP
146 								error(ERROR_OUTPUT|2, state.con[jp->fd].info.user.fds[2], "%s: job=%d pid=%d stopped", sp->name, jp - state.job, jp->pid);
147 								jobkill(jp, SIGSTOP);
148 #else
149 								error(ERROR_OUTPUT|2, state.con[jp->fd].info.user.fds[2], "%s: job=%d pid=%d is a hog", sp->name, jp - state.job, jp->pid);
150 #endif
151 								jp->busy = HOG;
152 							}
153 						}
154 						continue;
155 					}
156 				}
157 				if (jp->busy == HOG)
158 				{
159 #ifdef SIGCONT
160 					error(ERROR_OUTPUT|2, state.con[jp->fd].info.user.fds[2], "job %d pid %d restarted on %s", jp - state.job, jp->pid, sp->name);
161 					jobkill(jp, SIGCONT);
162 #else
163 					error(ERROR_OUTPUT|2, state.con[jp->fd].info.user.fds[2], "job %d pid %d is no longer hogging %s", jp - state.job, jp->pid, sp->name);
164 					jp->busy = 0;
165 #endif
166 				}
167 			}
168 			else if (jp->cmd)
169 			{
170 				if (jp->pid == QUEUE && (sp->fd > 0 || sp == &state.wait))
171 				{
172 					if (state.running < (state.perserver + state.jobwait) && state.con[jp->fd].info.user.running < (state.peruser + 1) && (sp == &state.wait || sp->running < ((state.perhost ? state.perhost : sp->cpu * state.percpu) + 1)))
173 						shellexec(jp, jp->cmd, jp->fd);
174 				}
175 				else if (cs.time > jp->start + LOST)
176 				{
177 					message((-4, "jobcheck: %s: possibly hung %s", sp->name, fmtelapsed(cs.time - sp->start, 1)));
178 					shellclose(sp, -1);
179 				}
180 			}
181 		}
182 	if (state.jobwait || state.shellwait) cswakeup(state.wakeup = UPDATE * 1000L);
183 	else if (!state.busy || !state.running) cswakeup(state.wakeup = 0L);
184 	else if (!only) cswakeup(state.wakeup = UPDATE * 1000L);
185 }
186 
187 /*
188  * kill job with sig
189  */
190 
191 void
jobkill(Cojob_t * jp,int sig)192 jobkill(Cojob_t* jp, int sig)
193 {
194 	int	n;
195 	char	buf[128];
196 
197 	if (jp->pid)
198 	{
199 		jp->sig = sig;
200 		jp->busy = 0;
201 		if (jp->pid > 0 && jp->shell->fd > 0)
202 		{
203 			n = sfsprintf(buf, sizeof(buf), "kill -%s -%d\n", fmtsignal(-sig), jp->pid);
204 			cswrite(jp->shell->fd, buf, n);
205 			message((-2, "killpg -%s %s.%d", fmtsignal(-sig), jp->shell->name, jp->pid));
206 		}
207 		if (sig == SIGKILL) jobdone(jp);
208 	}
209 }
210 
211 /*
212  * job jp is done
213  */
214 
215 void
jobdone(register Cojob_t * jp)216 jobdone(register Cojob_t* jp)
217 {
218 	register int	n;
219 	char		buf[64];
220 
221 	jp->pid = 0;
222 	jp->ref = 0;
223 	if (jp->cmd)
224 	{
225 		free(jp->cmd);
226 		jp->cmd = 0;
227 		state.jobwait--;
228 	}
229 	if (jp->shell == &state.wait) state.joblimit--;
230 	else if (jp->shell->running)
231 	{
232 		jp->shell->running--;
233 		if (!--state.running)
234 		{
235 			state.real += cs.time - state.clock;
236 			cswakeup(state.wakeup = 0L);
237 		}
238 	}
239 	if (jp->fd > 0)
240 	{
241 		jp->shell->user += jp->user;
242 		jp->shell->sys += jp->sys;
243 		state.con[jp->fd].info.user.running--;
244 		state.user += jp->user;
245 		state.sys += jp->sys;
246 		if (state.disable && (jp->sig == SIGKILL || jp->status > 128 && (jp->status % 128) == SIGKILL))
247 		{
248 			jp->shell->mode |= SHELL_DISABLE;
249 			jp->shell->update = cs.time + state.disable;
250 		}
251 #ifdef SIGCONT
252 		if (jp->sig && jp->sig != SIGCONT)
253 #else
254 		if (jp->sig)
255 #endif
256 			jp->status = jp->sig + 128;
257 		n = sfsprintf(buf, sizeof(buf), "x %d %d %s %s\n", jp->rid, jp->status, fmtelapsed(jp->user, CO_QUANT), fmtelapsed(jp->sys, CO_QUANT));
258 		if (cswrite(state.con[jp->fd].info.user.fds[0], buf, n) != n)
259 			drop(jp->fd);
260 	}
261 	if (state.joblimit) jobcheck(NiL);
262 }
263