1 /***********************************************************************
2 * *
3 * This software is part of the ast package *
4 * Copyright (c) 1990-2011 AT&T Intellectual Property *
5 * and is licensed under the *
6 * Eclipse Public License, Version 1.0 *
7 * by AT&T Intellectual Property *
8 * *
9 * A copy of the License is available at *
10 * http://www.eclipse.org/org/documents/epl-v10.html *
11 * (with md5 checksum b35adb5213ca9657e911e9befb180842) *
12 * *
13 * Information and Software Systems Research *
14 * AT&T Research *
15 * Florham Park NJ *
16 * *
17 * Glenn Fowler <glenn.s.fowler@gmail.com> *
18 * *
19 ***********************************************************************/
20 #pragma prototyped
21 /*
22 * Glenn Fowler
23 * AT&T Bell Laboratories
24 *
25 * remote coshell server job and connection support
26 */
27
28 #include "service.h"
29
30 /*
31 * drop a connection
32 */
33
34 void
drop(register int fd)35 drop(register int fd)
36 {
37 register int i;
38 register int n;
39 Coshell_t* sp;
40 Cojob_t* jp;
41 Sfio_t* tp;
42
43 switch (state.con[fd].type)
44 {
45 case USER:
46 if (state.con[fd].info.user.running)
47 for (jp = state.job; jp <= state.jobmax; jp++)
48 if (jp->fd == fd && jp->pid)
49 {
50 jp->fd = 0;
51 jobkill(jp, SIGKILL);
52 }
53 /*FALLTHROUGH*/
54 case INIT:
55 for (i = 0; i < elementsof(state.con[fd].info.user.fds); i++)
56 if ((n = state.con[fd].info.user.fds[i]) != fd && n >= 0 && state.con[n].type)
57 {
58 close(n);
59 state.con[n].type = 0;
60 }
61 if (sp = state.con[fd].info.user.home)
62 sp->home--;
63 if (state.con[fd].info.user.pump)
64 free(state.con[fd].info.user.pump);
65 if (state.con[fd].info.user.expr)
66 free(state.con[fd].info.user.expr);
67 break;
68 case PASS:
69 if (tp = state.con[fd].info.pass.serialize)
70 {
71 state.con[fd].info.pass.serialize = 0;
72 cswrite(state.con[fd].info.pass.fd, sfstrbase(tp), sfstrtell(tp));
73 sfstrclose(tp);
74 }
75 if ((jp = state.con[fd].info.pass.job) && jp->pid)
76 {
77 if (--jp->ref <= 0)
78 jobdone(jp);
79 else if (!jp->lost)
80 jp->lost = cs.time + UPDATE;
81 }
82 break;
83 case POLL:
84 error(ERROR_SYSTEM|3, "lost connect stream");
85 break;
86 case SHELL:
87 sp = state.con[fd].info.shell;
88 sp->fd = -1;
89 shellclose(sp, -1);
90 break;
91 }
92 state.con[fd].type = 0;
93 state.con[fd].error = 0;
94 csfd(fd, CS_POLL_CLOSE);
95 }
96
97 /*
98 * check the job table for hogs running on busy shells
99 * or jobs queued on hung shells
100 * if only!=0 then only that shell is checked
101 */
102
103 void
jobcheck(Coshell_t * only)104 jobcheck(Coshell_t* only)
105 {
106 register Cojob_t* jp;
107 register Coshell_t* sp;
108 char* s;
109
110 for (jp = state.job; jp <= state.jobmax; jp++)
111 if (jp->pid && ((sp = jp->shell) == only || !only))
112 {
113 if (jp->pid > 0)
114 {
115 if (sp->update <= cs.time && sp->errors < ERRORS) update(sp);
116 if (jp->lost && jp->lost < cs.time)
117 {
118 message((-4, "jobcheck: %s: job %d pid %d lost", sp->name, jp->rid, jp->pid));
119 jp->sig = SIGKILL;
120 jobdone(jp);
121 continue;
122 }
123 if (sp->idle)
124 {
125 if (sp->stat.idle < state.busy && (!sp->bypass || !miscmatch(sp, sp->bypass)))
126 {
127 if (!jp->busy && state.grace) jp->busy = cs.time + state.grace;
128 else if (jp->busy < cs.time)
129 {
130 if (state.migrate)
131 {
132 int n;
133
134 error(ERROR_OUTPUT|2, state.con[jp->fd].info.user.fds[2], "%s: job=%d pid=%d %s", sp->name, jp - state.job, jp->pid, state.migrate);
135 n = sfprintf(state.string, "job=%d; pid=%d; host=%s; type=%s; %s\n", jp - state.job, jp->pid, sp->name, sp->type, state.migrate);
136 if (s = sfstruse(state.string))
137 cswrite(jp->shell->fd, s, n);
138 else
139 error(ERROR_OUTPUT|2, state.con[jp->fd].info.user.fds[2], "out of space");
140 jp->sig = SIGKILL;
141 jobdone(jp);
142 }
143 else
144 {
145 #ifdef SIGSTOP
146 error(ERROR_OUTPUT|2, state.con[jp->fd].info.user.fds[2], "%s: job=%d pid=%d stopped", sp->name, jp - state.job, jp->pid);
147 jobkill(jp, SIGSTOP);
148 #else
149 error(ERROR_OUTPUT|2, state.con[jp->fd].info.user.fds[2], "%s: job=%d pid=%d is a hog", sp->name, jp - state.job, jp->pid);
150 #endif
151 jp->busy = HOG;
152 }
153 }
154 continue;
155 }
156 }
157 if (jp->busy == HOG)
158 {
159 #ifdef SIGCONT
160 error(ERROR_OUTPUT|2, state.con[jp->fd].info.user.fds[2], "job %d pid %d restarted on %s", jp - state.job, jp->pid, sp->name);
161 jobkill(jp, SIGCONT);
162 #else
163 error(ERROR_OUTPUT|2, state.con[jp->fd].info.user.fds[2], "job %d pid %d is no longer hogging %s", jp - state.job, jp->pid, sp->name);
164 jp->busy = 0;
165 #endif
166 }
167 }
168 else if (jp->cmd)
169 {
170 if (jp->pid == QUEUE && (sp->fd > 0 || sp == &state.wait))
171 {
172 if (state.running < (state.perserver + state.jobwait) && state.con[jp->fd].info.user.running < (state.peruser + 1) && (sp == &state.wait || sp->running < ((state.perhost ? state.perhost : sp->cpu * state.percpu) + 1)))
173 shellexec(jp, jp->cmd, jp->fd);
174 }
175 else if (cs.time > jp->start + LOST)
176 {
177 message((-4, "jobcheck: %s: possibly hung %s", sp->name, fmtelapsed(cs.time - sp->start, 1)));
178 shellclose(sp, -1);
179 }
180 }
181 }
182 if (state.jobwait || state.shellwait) cswakeup(state.wakeup = UPDATE * 1000L);
183 else if (!state.busy || !state.running) cswakeup(state.wakeup = 0L);
184 else if (!only) cswakeup(state.wakeup = UPDATE * 1000L);
185 }
186
187 /*
188 * kill job with sig
189 */
190
191 void
jobkill(Cojob_t * jp,int sig)192 jobkill(Cojob_t* jp, int sig)
193 {
194 int n;
195 char buf[128];
196
197 if (jp->pid)
198 {
199 jp->sig = sig;
200 jp->busy = 0;
201 if (jp->pid > 0 && jp->shell->fd > 0)
202 {
203 n = sfsprintf(buf, sizeof(buf), "kill -%s -%d\n", fmtsignal(-sig), jp->pid);
204 cswrite(jp->shell->fd, buf, n);
205 message((-2, "killpg -%s %s.%d", fmtsignal(-sig), jp->shell->name, jp->pid));
206 }
207 if (sig == SIGKILL) jobdone(jp);
208 }
209 }
210
211 /*
212 * job jp is done
213 */
214
215 void
jobdone(register Cojob_t * jp)216 jobdone(register Cojob_t* jp)
217 {
218 register int n;
219 char buf[64];
220
221 jp->pid = 0;
222 jp->ref = 0;
223 if (jp->cmd)
224 {
225 free(jp->cmd);
226 jp->cmd = 0;
227 state.jobwait--;
228 }
229 if (jp->shell == &state.wait) state.joblimit--;
230 else if (jp->shell->running)
231 {
232 jp->shell->running--;
233 if (!--state.running)
234 {
235 state.real += cs.time - state.clock;
236 cswakeup(state.wakeup = 0L);
237 }
238 }
239 if (jp->fd > 0)
240 {
241 jp->shell->user += jp->user;
242 jp->shell->sys += jp->sys;
243 state.con[jp->fd].info.user.running--;
244 state.user += jp->user;
245 state.sys += jp->sys;
246 if (state.disable && (jp->sig == SIGKILL || jp->status > 128 && (jp->status % 128) == SIGKILL))
247 {
248 jp->shell->mode |= SHELL_DISABLE;
249 jp->shell->update = cs.time + state.disable;
250 }
251 #ifdef SIGCONT
252 if (jp->sig && jp->sig != SIGCONT)
253 #else
254 if (jp->sig)
255 #endif
256 jp->status = jp->sig + 128;
257 n = sfsprintf(buf, sizeof(buf), "x %d %d %s %s\n", jp->rid, jp->status, fmtelapsed(jp->user, CO_QUANT), fmtelapsed(jp->sys, CO_QUANT));
258 if (cswrite(state.con[jp->fd].info.user.fds[0], buf, n) != n)
259 drop(jp->fd);
260 }
261 if (state.joblimit) jobcheck(NiL);
262 }
263