1 /***********************************************************************
2 * *
3 * This software is part of the ast package *
4 * Copyright (c) 1990-2011 AT&T Intellectual Property *
5 * and is licensed under the *
6 * Eclipse Public License, Version 1.0 *
7 * by AT&T Intellectual Property *
8 * *
9 * A copy of the License is available at *
10 * http://www.eclipse.org/org/documents/epl-v10.html *
11 * (with md5 checksum b35adb5213ca9657e911e9befb180842) *
12 * *
13 * Information and Software Systems Research *
14 * AT&T Research *
15 * Florham Park NJ *
16 * *
17 * Glenn Fowler <gsf@research.att.com> *
18 * *
19 ***********************************************************************/
20 #pragma prototyped
21 /*
22 * Glenn Fowler
23 * AT&T Research
24 */
25
26 static const char usage[] =
27 "[-?\n@(#)$Id: mbb (AT&T Research) 2000-05-09 $\n]"
28 USAGE_LICENSE
29 "[+NAME?mbb - message bulletin board server]"
30 "[+DESCRIPTION?\bmbb\b is a message bulletin board server. Each \bmbb\b"
31 " instance is a session. Each session supports up to 64 message"
32 " channels, labeled from 0 to 63. A client connects to \bmbb\b and"
33 " provides a mask of channels that it is interested in. All subsequent"
34 " messages in the mask are sent to the client. Channel 0 is reserved"
35 " for service control messages.]"
36 "[+?A message is a newline terminated line with length 8K bytes max"
37 " that does not contain the ASCII NUL character. A message must"
38 " be prefixed by its channel number. The server changes this to"
39 " \achannel\a.\aid\a where \aid\a is the server assigned client"
40 " identification number. Messages with invalid or missing channel"
41 " numbers are silently rejected.]"
42
43 "[b:backlog?Every \an\ath client message with length > 1 will be treated as"
44 " if only half the data were sent. This should kick in the message"
45 " backlog logic.]#[n]"
46 "[d:debug?Set the debug trace level to \alevel\a. Higher levels produce more"
47 " output.]#[level]"
48 "[t:timeout?The service will exit after a \atime\a period of client"
49 " inactivity. The default is to run until the system crashes.]:[time]"
50
51 "\n"
52 "\nconnect-stream\n"
53 "\n"
54
55 "[+PROTOCOL?Channel 0 is for service control messages. The server and clients"
56 " may send messages on channel 0, with the exception that client"
57 " messages on channel 0 are not sent to the other clients. The control"
58 " messages are:]{"
59 " [+0 listen \amask\a?[client]] The client is interested in"
60 " channel numbers in the bitmask \amask\a. The default"
61 " \amask\a is \b0xfffffffffffffffe\b; i.e., all but"
62 " channel 0.]"
63 " [+0 join \aid\a?[server]] Client with server assigned id"
64 " number \aid\a has joined the session.]"
65 " [+0 drop \aid\a?[server]] Client with server assigned id"
66 " number \aid\a has dropped from the session.]"
67 " }"
68
69 "[+SEE ALSO?\bcoshell\b(1), \bcs\b(1), \bss\b(1), \bcs\b(3)]"
70 ;
71
72 #include <css.h>
73 #include <ctype.h>
74 #include <debug.h>
75 #include <error.h>
76 #include <ls.h>
77 #include <stdarg.h>
78
79 typedef struct Log_s
80 {
81 char name[2]; /* file name */
82 Sfio_t* sp; /* r/w stream */
83 Sfoff_t offset; /* next write offset */
84 size_t blocked; /* blocked connections */
85 } Log_t;
86
87 typedef struct Connection_s
88 {
89 struct Connection_s* next;
90 Cssfd_t* fp;
91 Sfulong_t mask;
92 Sfoff_t blocked[2];
93 } Connection_t;
94
95 typedef struct State_s
96 {
97 Cssdisc_t disc;
98 Connection_t* all;
99 Sfio_t* tmp;
100 Log_t logs[2];
101 int backlog;
102 int count;
103 int log;
104 int logged;
105 } State_t;
106
107 #define ALL (-1)
108 #define CHUNK (4 * 1024)
109 #define HOG (4 * 1024 * 1024)
110
111 #define CHAN_DEFAULT (((Sfulong_t)~0)^1)
112 #define CHAN_VALID(c) ((c)>=0&&(c)<=63)
113 #define CHAN_MASK(c) (((Sfulong_t)1)<<(c))
114
115 static char buf[8 * 1024];
116
117 static ssize_t
data(register State_t * state,register Connection_t * to,char * s,size_t n,int force)118 data(register State_t* state, register Connection_t* to, char* s, size_t n, int force)
119 {
120 if (!force && n > 1 && state->backlog && ++state->count >= state->backlog)
121 {
122 message((-1, "[%d] %d backlog", __LINE__, to->fp->fd));
123 state->count = 0;
124 n /= 2;
125 }
126 return write(to->fp->fd, s, n);
127 }
128
129 static int
note(Css_t * css,register Connection_t * to,int log,char * s,size_t n,int force,Cssdisc_t * disc)130 note(Css_t* css, register Connection_t* to, int log, char* s, size_t n, int force, Cssdisc_t* disc)
131 {
132 register State_t* state = (State_t*)disc;
133 ssize_t z;
134
135 if ((force || to->blocked[log] < 0) && (z = data(state, to, s, n, force)) != n)
136 {
137 if (!force && !state->logged)
138 {
139 state->logged = 1;
140 if (!state->logs[log].sp)
141 {
142 state->logs[log].name[0] = '0' + log;
143 remove(state->logs[log].name);
144 if (!(state->logs[log].sp = sfopen(NiL, state->logs[log].name, "r+")))
145 error(ERROR_SYSTEM|3, "%s: cannot create message log", state->logs[log].name);
146 message((-1, "[%d] %s: create log", __LINE__, state->logs[log].name));
147 }
148 message((-1, "[%d] %s: %d log", __LINE__, state->logs[log].name, to->fp->fd));
149 if (sfwrite(state->logs[log].sp, s, n) != n)
150 error(ERROR_SYSTEM|3, "%s: log file write error", state->logs[log].name);
151 if ((state->logs[log].offset += n) >= HOG && !state->logs[!log].sp)
152 state->log = !log;
153 }
154 if (to->blocked[log] < 0)
155 {
156 message((-1, "[%d] %s: block", __LINE__, state->logs[log].name));
157 state->logs[log].blocked++;
158 }
159 to->blocked[log] = state->logs[log].offset - n + z;
160 message((-1, "[%d] %s: %d offset %I*d", __LINE__, state->logs[log].name, to->fp->fd, sizeof(to->blocked[log]), to->blocked[log]));
161 cssfd(css, to->fp->fd, CS_POLL_READ|CS_POLL_WRITE);
162 return 0;
163 }
164 if (to->blocked[log] >= 0)
165 {
166 message((-1, "[%d] %s: %d unblock", __LINE__, state->logs[log].name, to->fp->fd));
167 to->blocked[log] = -1;
168 if (!--state->logs[log].blocked)
169 {
170 sfclose(state->logs[log].sp);
171 state->logs[log].sp = 0;
172 state->logs[log].offset = 0;
173 remove(state->logs[log].name);
174 message((-1, "[%d] %s: clear", __LINE__, state->logs[log].name));
175 }
176 }
177 return 1;
178 }
179
180 static int
dump(Css_t * css,register Connection_t * con,int log,Cssdisc_t * disc)181 dump(Css_t* css, register Connection_t* con, int log, Cssdisc_t* disc)
182 {
183 register State_t* state = (State_t*)disc;
184 char* s;
185 size_t n;
186 int r;
187
188 n = state->logs[log].offset - con->blocked[log];
189 if (n > CHUNK)
190 n = CHUNK;
191 if (sfseek(state->logs[log].sp, con->blocked[log], SEEK_SET) != con->blocked[log])
192 error(ERROR_SYSTEM|3, "%s: cannot seek to %I*d", state->logs[log].name, sizeof(con->blocked[log]), con->blocked[log]);
193 message((-1, "[%d] %s reserve n %I*d offset %I*d", __LINE__, state->logs[log].name, sizeof(n), n, sizeof(con->blocked[log]), con->blocked[log]));
194 if (!(s = sfreserve(state->logs[log].sp, n, 0)))
195 error(ERROR_SYSTEM|3, "%s: cannot reserve %d at %I*d", state->logs[log].name, sizeof(n), n, sizeof(con->blocked[log]), con->blocked[log]);
196 r = note(css, con, log, s, n, 1, disc);
197 if (state->logs[log].sp && sfseek(state->logs[log].sp, state->logs[log].offset, SEEK_SET) != state->logs[log].offset)
198 error(ERROR_SYSTEM|3, "%s: cannot seek to %I*d", state->logs[log].name, sizeof(state->logs[log].offset), state->logs[log].offset);
199 return r;
200 }
201
202 static int
post(Css_t * css,Cssdisc_t * disc,Connection_t * from,register Connection_t * to,int channel,const char * format,...)203 post(Css_t* css, Cssdisc_t* disc, Connection_t* from, register Connection_t* to, int channel, const char* format, ...)
204 {
205 State_t* state = (State_t*)disc;
206 char* s;
207 ssize_t n;
208 Sfulong_t m;
209 va_list ap;
210
211 sfprintf(state->tmp, "%d", channel);
212 if (from)
213 sfprintf(state->tmp, ".%d", from->fp->fd);
214 sfputc(state->tmp, ' ');
215 va_start(ap, format);
216 sfvprintf(state->tmp, format, ap);
217 va_end(ap);
218 sfputc(state->tmp, '\n');
219 n = sfstrtell(state->tmp);
220 if (!(s = sfstruse(state->tmp)))
221 error(ERROR_SYSTEM|3, "out of space");
222 m = CHAN_MASK(channel);
223 state->logged = 0;
224 if (!to)
225 {
226 for (to = state->all; to; to = to->next)
227 if ((to->mask & m) && to != from)
228 note(css, to, state->log, s, n, 0, disc);
229 }
230 else if (to->mask & m)
231 note(css, to, state->log, s, n, 0, disc);
232 return 0;
233 }
234
235 static void
drop(Css_t * css,Connection_t * con,Cssdisc_t * disc)236 drop(Css_t* css, Connection_t* con, Cssdisc_t* disc)
237 {
238 register State_t* state = (State_t*)disc;
239 register Connection_t* cp;
240 register Connection_t* pp;
241
242 pp = 0;
243 for (cp = state->all; cp; pp = cp, cp = cp->next)
244 if (cp == con)
245 {
246 if (pp)
247 pp->next = cp->next;
248 else
249 state->all = cp->next;
250 cp->fp->data = 0;
251 free(cp);
252 post(css, disc, cp, NiL, 0, "drop");
253 break;
254 }
255 }
256
257 static int
acceptf(Css_t * css,Cssfd_t * fp,Csid_t * ip,char ** av,Cssdisc_t * disc)258 acceptf(Css_t* css, Cssfd_t* fp, Csid_t* ip, char** av, Cssdisc_t* disc)
259 {
260 register State_t* state = (State_t*)disc;
261 register Connection_t* con;
262 int i;
263
264 NoP(ip);
265 NoP(av);
266 if (!(con = newof(0, Connection_t, 1, 0)))
267 return -1;
268 fp->data = con;
269 con->fp = fp;
270 con->mask = CHAN_DEFAULT;
271 for (i = 0; i < elementsof(state->logs); i++)
272 con->blocked[i] = -1;
273 con->next = state->all;
274 state->all = con;
275 post(css, disc, con, NiL, 0, "join");
276 return fp->fd;
277 }
278
279 static int
actionf(register Css_t * css,register Cssfd_t * fp,Cssdisc_t * disc)280 actionf(register Css_t* css, register Cssfd_t* fp, Cssdisc_t* disc)
281 {
282 register State_t* state = (State_t*)disc;
283 register Connection_t* con;
284 register char* s;
285 char* e;
286 int n;
287 int c;
288 Sfulong_t m;
289 Sfulong_t o;
290
291 switch (fp->status)
292 {
293 case CS_POLL_CLOSE:
294 if (!(con = (Connection_t*)fp->data))
295 return -1;
296 drop(css, con, disc);
297 return 0;
298 case CS_POLL_READ:
299 if (!(con = (Connection_t*)fp->data))
300 return -1;
301 if ((n = csread(css->state, fp->fd, buf, sizeof(buf) - 1, CS_LINE)) <= 0)
302 {
303 drop(css, con, disc);
304 return -1;
305 }
306 if (n > 0 && buf[n - 1] == '\n')
307 n--;
308 buf[n] = 0;
309 for (s = buf; isspace(*s); s++);
310 c = (int)strtol(s, &e, 0);
311 if (CHAN_VALID(c) && e > s)
312 {
313 s = e;
314 if (*s == '.')
315 while (isdigit(*++s));
316 for (; isspace(*s); s++);
317 if (c == 0)
318 {
319 for (e = s; *s && !isspace(*s); s++);
320 if (*s)
321 for (*s++ = 0; isspace(*s); s++);
322 switch (*e)
323 {
324 case 'm':
325 if (!strcmp(e, "mask"))
326 {
327 o = con->mask;
328 if (*s)
329 {
330 m = strtoull(s, &e, 0);
331 if (e > s)
332 con->mask = m;
333 }
334 post(css, disc, con, con, 0, "mask 0x%I*x 0x%I*x", sizeof(con->mask), con->mask, sizeof(o), o);
335 }
336 break;
337 case 'q':
338 /* might want privilege check here */
339 if (!strcmp(e, "quit"))
340 exit(0);
341 break;
342 }
343 }
344 else
345 post(css, disc, con, NiL, c, "%s", s);
346 }
347 return 1;
348 case CS_POLL_WRITE:
349 if (!(con = (Connection_t*)fp->data))
350 return -1;
351 if ((con->blocked[!state->log] < 0 || dump(css, con, !state->log, disc)) && con->blocked[state->log] >= 0)
352 dump(css, con, state->log, disc);
353 return 1;
354 }
355 return 0;
356 }
357
358 static int
exceptf(Css_t * css,unsigned long op,unsigned long arg,Cssdisc_t * disc)359 exceptf(Css_t* css, unsigned long op, unsigned long arg, Cssdisc_t* disc)
360 {
361 switch (op)
362 {
363 case CSS_INTERRUPT:
364 error(ERROR_SYSTEM|3, "%s: interrupt exit", fmtsignal(arg));
365 return 0;
366 case CSS_DORMANT:
367 error(2, "service dormant exit");
368 exit(0);
369 }
370 error(ERROR_SYSTEM|3, "poll error op=0x%08x arg=0x%08x", op, arg);
371 return -1;
372 }
373
374 int
main(int argc,char ** argv)375 main(int argc, char** argv)
376 {
377 char* e;
378 State_t state;
379
380 NoP(argc);
381 error_info.id = "mbb";
382 memset(&state, 0, sizeof(state));
383 state.disc.version = CSS_VERSION;
384 state.disc.flags = CSS_DAEMON|CSS_ERROR|CSS_INTERRUPT|CSS_LOG;
385 state.disc.acceptf = acceptf;
386 state.disc.actionf = actionf;
387 state.disc.errorf = errorf;
388 state.disc.exceptf = exceptf;
389 for (;;)
390 {
391 switch (optget(argv, usage))
392 {
393 case 'b':
394 state.backlog = opt_info.num;
395 continue;
396 case 'd':
397 error_info.trace = -opt_info.num;
398 continue;
399 case 't':
400 state.disc.timeout = strelapsed(opt_info.arg, &e, 1);
401 if (*e)
402 error(3, "%s: invalid timeout value", opt_info.arg);
403 state.disc.flags |= CSS_DORMANT;
404 continue;
405 case '?':
406 error(ERROR_USAGE|4, "%s", opt_info.arg);
407 continue;
408 case ':':
409 error(2, "%s", opt_info.arg);
410 continue;
411 }
412 break;
413 }
414 argv += opt_info.index;
415 if (!argv[0] || argv[1])
416 error(ERROR_USAGE|4, "%s", optusage(NiL));
417 if (!(state.tmp = sfstropen()))
418 error(ERROR_SYSTEM|3, "out of space [tmp stream]");
419 if (!cssopen(argv[0], &state.disc))
420 return 1;
421 umask(S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
422 csspoll(CS_NEVER, 0);
423 return 1;
424 }
425