1 /***********************************************************************
2 *                                                                      *
3 *               This software is part of the ast package               *
4 *          Copyright (c) 1990-2011 AT&T Intellectual Property          *
5 *                      and is licensed under the                       *
6 *                 Eclipse Public License, Version 1.0                  *
7 *                    by AT&T Intellectual Property                     *
8 *                                                                      *
9 *                A copy of the License is available at                 *
10 *          http://www.eclipse.org/org/documents/epl-v10.html           *
11 *         (with md5 checksum b35adb5213ca9657e911e9befb180842)         *
12 *                                                                      *
13 *              Information and Software Systems Research               *
14 *                            AT&T Research                             *
15 *                           Florham Park NJ                            *
16 *                                                                      *
17 *                 Glenn Fowler <gsf@research.att.com>                  *
18 *                                                                      *
19 ***********************************************************************/
20 #pragma prototyped
21 /*
22  * Glenn Fowler
23  * AT&T Research
24  */
25 
26 static const char usage[] =
27 "[-?\n@(#)$Id: mbb (AT&T Research) 2000-05-09 $\n]"
28 USAGE_LICENSE
29 "[+NAME?mbb - message bulletin board server]"
30 "[+DESCRIPTION?\bmbb\b is a message bulletin board server. Each \bmbb\b"
31 "	instance is a session. Each session supports up to 64 message"
32 "	channels, labeled from 0 to 63. A client connects to \bmbb\b and"
33 "	provides a mask of channels that it is interested in. All subsequent"
34 "	messages in the mask are sent to the client. Channel 0 is reserved"
35 "	for service control messages.]"
36 "[+?A message is a newline terminated line with length 8K bytes max"
37 "	that does not contain the ASCII NUL character. A message must"
38 "	be prefixed by its channel number. The server changes this to"
39 "	\achannel\a.\aid\a where \aid\a is the server assigned client"
40 "	identification number. Messages with invalid or missing channel"
41 "	numbers are silently rejected.]"
42 
43 "[b:backlog?Every \an\ath client message with length > 1 will be treated as"
44 "	if only half the data were sent. This should kick in the message"
45 "	backlog logic.]#[n]"
46 "[d:debug?Set the debug trace level to \alevel\a. Higher levels produce more"
47 "	output.]#[level]"
48 "[t:timeout?The service will exit after a \atime\a period of client"
49 "	inactivity. The default is to run until the system crashes.]:[time]"
50 
51 "\n"
52 "\nconnect-stream\n"
53 "\n"
54 
55 "[+PROTOCOL?Channel 0 is for service control messages. The server and clients"
56 "	may send messages on channel 0, with the exception that client"
57 "	messages on channel 0 are not sent to the other clients. The control"
58 "	messages are:]{"
59 "		[+0 listen \amask\a?[client]] The client is interested in"
60 "			channel numbers in the bitmask \amask\a. The default"
61 "			\amask\a is \b0xfffffffffffffffe\b; i.e., all but"
62 "			channel 0.]"
63 "		[+0 join \aid\a?[server]] Client with server assigned id"
64 "			number \aid\a has joined the session.]"
65 "		[+0 drop \aid\a?[server]] Client with server assigned id"
66 "			number \aid\a has dropped from the session.]"
67 "	}"
68 
69 "[+SEE ALSO?\bcoshell\b(1), \bcs\b(1), \bss\b(1), \bcs\b(3)]"
70 ;
71 
72 #include <css.h>
73 #include <ctype.h>
74 #include <debug.h>
75 #include <error.h>
76 #include <ls.h>
77 #include <stdarg.h>
78 
79 typedef struct Log_s
80 {
81 	char			name[2];	/* file name		*/
82 	Sfio_t*			sp;		/* r/w stream		*/
83 	Sfoff_t			offset;		/* next write offset	*/
84 	size_t			blocked;	/* blocked connections	*/
85 } Log_t;
86 
87 typedef struct Connection_s
88 {
89 	struct Connection_s*	next;
90 	Cssfd_t*		fp;
91 	Sfulong_t		mask;
92 	Sfoff_t			blocked[2];
93 } Connection_t;
94 
95 typedef struct State_s
96 {
97 	Cssdisc_t		disc;
98 	Connection_t*		all;
99 	Sfio_t*			tmp;
100 	Log_t			logs[2];
101 	int			backlog;
102 	int			count;
103 	int			log;
104 	int			logged;
105 } State_t;
106 
#define ALL			(-1)		/* not referenced in the code visible here */
#define CHUNK			(4 << 10)	/* max bytes dump() replays per call	*/
#define HOG			(4 << 20)	/* log size at which note() tries the other log */

#define CHAN_DEFAULT		(~(Sfulong_t)1)			/* every channel but 0	*/
#define CHAN_VALID(c)		(!((c)<0||(c)>63))		/* channel in 0..63	*/
#define CHAN_MASK(c)		((Sfulong_t)1<<(c))		/* mask bit of channel c */

/* line buffer for client messages read by actionf() */
static char			buf[8 * 1024];
116 
117 static ssize_t
data(register State_t * state,register Connection_t * to,char * s,size_t n,int force)118 data(register State_t* state, register Connection_t* to, char* s, size_t n, int force)
119 {
120 	if (!force && n > 1 && state->backlog && ++state->count >= state->backlog)
121 	{
122 		message((-1, "[%d] %d backlog", __LINE__, to->fp->fd));
123 		state->count = 0;
124 		n /= 2;
125 	}
126 	return write(to->fp->fd, s, n);
127 }
128 
129 static int
note(Css_t * css,register Connection_t * to,int log,char * s,size_t n,int force,Cssdisc_t * disc)130 note(Css_t* css, register Connection_t* to, int log, char* s, size_t n, int force, Cssdisc_t* disc)
131 {
132 	register State_t*	state = (State_t*)disc;
133 	ssize_t			z;
134 
135 	if ((force || to->blocked[log] < 0) && (z = data(state, to, s, n, force)) != n)
136 	{
137 		if (!force && !state->logged)
138 		{
139 			state->logged = 1;
140 			if (!state->logs[log].sp)
141 			{
142 				state->logs[log].name[0] = '0' + log;
143 				remove(state->logs[log].name);
144 				if (!(state->logs[log].sp = sfopen(NiL, state->logs[log].name, "r+")))
145 					error(ERROR_SYSTEM|3, "%s: cannot create message log", state->logs[log].name);
146 				message((-1, "[%d] %s: create log", __LINE__, state->logs[log].name));
147 			}
148 			message((-1, "[%d] %s: %d log", __LINE__, state->logs[log].name, to->fp->fd));
149 			if (sfwrite(state->logs[log].sp, s, n) != n)
150 				error(ERROR_SYSTEM|3, "%s: log file write error", state->logs[log].name);
151 			if ((state->logs[log].offset += n) >= HOG && !state->logs[!log].sp)
152 				state->log = !log;
153 		}
154 		if (to->blocked[log] < 0)
155 		{
156 			message((-1, "[%d] %s: block", __LINE__, state->logs[log].name));
157 			state->logs[log].blocked++;
158 		}
159 		to->blocked[log] = state->logs[log].offset - n + z;
160 		message((-1, "[%d] %s: %d offset %I*d", __LINE__, state->logs[log].name, to->fp->fd, sizeof(to->blocked[log]), to->blocked[log]));
161 		cssfd(css, to->fp->fd, CS_POLL_READ|CS_POLL_WRITE);
162 		return 0;
163 	}
164 	if (to->blocked[log] >= 0)
165 	{
166 		message((-1, "[%d] %s: %d unblock", __LINE__, state->logs[log].name, to->fp->fd));
167 		to->blocked[log] = -1;
168 		if (!--state->logs[log].blocked)
169 		{
170 			sfclose(state->logs[log].sp);
171 			state->logs[log].sp = 0;
172 			state->logs[log].offset = 0;
173 			remove(state->logs[log].name);
174 			message((-1, "[%d] %s: clear", __LINE__, state->logs[log].name));
175 		}
176 	}
177 	return 1;
178 }
179 
180 static int
dump(Css_t * css,register Connection_t * con,int log,Cssdisc_t * disc)181 dump(Css_t* css, register Connection_t* con, int log, Cssdisc_t* disc)
182 {
183 	register State_t*	state = (State_t*)disc;
184 	char*			s;
185 	size_t			n;
186 	int			r;
187 
188 	n = state->logs[log].offset - con->blocked[log];
189 	if (n > CHUNK)
190 		n = CHUNK;
191 	if (sfseek(state->logs[log].sp, con->blocked[log], SEEK_SET) != con->blocked[log])
192 		error(ERROR_SYSTEM|3, "%s: cannot seek to %I*d", state->logs[log].name, sizeof(con->blocked[log]), con->blocked[log]);
193 	message((-1, "[%d] %s reserve n %I*d offset %I*d", __LINE__, state->logs[log].name, sizeof(n), n, sizeof(con->blocked[log]), con->blocked[log]));
194 	if (!(s = sfreserve(state->logs[log].sp, n, 0)))
195 		error(ERROR_SYSTEM|3, "%s: cannot reserve %d at %I*d", state->logs[log].name, sizeof(n), n, sizeof(con->blocked[log]), con->blocked[log]);
196 	r = note(css, con, log, s, n, 1, disc);
197 	if (state->logs[log].sp && sfseek(state->logs[log].sp, state->logs[log].offset, SEEK_SET) != state->logs[log].offset)
198 		error(ERROR_SYSTEM|3, "%s: cannot seek to %I*d", state->logs[log].name, sizeof(state->logs[log].offset), state->logs[log].offset);
199 	return r;
200 }
201 
202 static int
post(Css_t * css,Cssdisc_t * disc,Connection_t * from,register Connection_t * to,int channel,const char * format,...)203 post(Css_t* css, Cssdisc_t* disc, Connection_t* from, register Connection_t* to, int channel, const char* format, ...)
204 {
205 	State_t*		state = (State_t*)disc;
206 	char*			s;
207 	ssize_t			n;
208 	Sfulong_t		m;
209 	va_list			ap;
210 
211 	sfprintf(state->tmp, "%d", channel);
212 	if (from)
213 		sfprintf(state->tmp, ".%d", from->fp->fd);
214 	sfputc(state->tmp, ' ');
215 	va_start(ap, format);
216 	sfvprintf(state->tmp, format, ap);
217 	va_end(ap);
218 	sfputc(state->tmp, '\n');
219 	n = sfstrtell(state->tmp);
220 	if (!(s = sfstruse(state->tmp)))
221 		error(ERROR_SYSTEM|3, "out of space");
222 	m = CHAN_MASK(channel);
223 	state->logged = 0;
224 	if (!to)
225 	{
226 		for (to = state->all; to; to = to->next)
227 			if ((to->mask & m) && to != from)
228 				note(css, to, state->log, s, n, 0, disc);
229 	}
230 	else if (to->mask & m)
231 		note(css, to, state->log, s, n, 0, disc);
232 	return 0;
233 }
234 
235 static void
drop(Css_t * css,Connection_t * con,Cssdisc_t * disc)236 drop(Css_t* css, Connection_t* con, Cssdisc_t* disc)
237 {
238 	register State_t*	state = (State_t*)disc;
239 	register Connection_t*	cp;
240 	register Connection_t*	pp;
241 
242 	pp = 0;
243 	for (cp = state->all; cp; pp = cp, cp = cp->next)
244 		if (cp == con)
245 		{
246 			if (pp)
247 				pp->next = cp->next;
248 			else
249 				state->all = cp->next;
250 			cp->fp->data = 0;
251 			free(cp);
252 			post(css, disc, cp, NiL, 0, "drop");
253 			break;
254 		}
255 }
256 
257 static int
acceptf(Css_t * css,Cssfd_t * fp,Csid_t * ip,char ** av,Cssdisc_t * disc)258 acceptf(Css_t* css, Cssfd_t* fp, Csid_t* ip, char** av, Cssdisc_t* disc)
259 {
260 	register State_t*	state = (State_t*)disc;
261 	register Connection_t*	con;
262 	int			i;
263 
264 	NoP(ip);
265 	NoP(av);
266 	if (!(con = newof(0, Connection_t, 1, 0)))
267 		return -1;
268 	fp->data = con;
269 	con->fp = fp;
270 	con->mask = CHAN_DEFAULT;
271 	for (i = 0; i < elementsof(state->logs); i++)
272 		con->blocked[i] = -1;
273 	con->next = state->all;
274 	state->all = con;
275 	post(css, disc, con, NiL, 0, "join");
276 	return fp->fd;
277 }
278 
279 static int
actionf(register Css_t * css,register Cssfd_t * fp,Cssdisc_t * disc)280 actionf(register Css_t* css, register Cssfd_t* fp, Cssdisc_t* disc)
281 {
282 	register State_t*	state = (State_t*)disc;
283 	register Connection_t*	con;
284 	register char*		s;
285 	char*			e;
286 	int			n;
287 	int			c;
288 	Sfulong_t		m;
289 	Sfulong_t		o;
290 
291 	switch (fp->status)
292 	{
293 	case CS_POLL_CLOSE:
294 		if (!(con = (Connection_t*)fp->data))
295 			return -1;
296 		drop(css, con, disc);
297 		return 0;
298 	case CS_POLL_READ:
299 		if (!(con = (Connection_t*)fp->data))
300 			return -1;
301 		if ((n = csread(css->state, fp->fd, buf, sizeof(buf) - 1, CS_LINE)) <= 0)
302 		{
303 			drop(css, con, disc);
304 			return -1;
305 		}
306 		if (n > 0 && buf[n - 1] == '\n')
307 			n--;
308 		buf[n] = 0;
309 		for (s = buf; isspace(*s); s++);
310 		c = (int)strtol(s, &e, 0);
311 		if (CHAN_VALID(c) && e > s)
312 		{
313 			s = e;
314 			if (*s == '.')
315 				while (isdigit(*++s));
316 			for (; isspace(*s); s++);
317 			if (c == 0)
318 			{
319 				for (e = s; *s && !isspace(*s); s++);
320 				if (*s)
321 					for (*s++ = 0; isspace(*s); s++);
322 				switch (*e)
323 				{
324 				case 'm':
325 					if (!strcmp(e, "mask"))
326 					{
327 						o = con->mask;
328 						if (*s)
329 						{
330 							m = strtoull(s, &e, 0);
331 							if (e > s)
332 								con->mask = m;
333 						}
334 						post(css, disc, con, con, 0, "mask 0x%I*x 0x%I*x", sizeof(con->mask), con->mask, sizeof(o), o);
335 					}
336 					break;
337 				case 'q':
338 					/* might want privilege check here */
339 					if (!strcmp(e, "quit"))
340 						exit(0);
341 					break;
342 				}
343 			}
344 			else
345 				post(css, disc, con, NiL, c, "%s", s);
346 		}
347 		return 1;
348 	case CS_POLL_WRITE:
349 		if (!(con = (Connection_t*)fp->data))
350 			return -1;
351 		if ((con->blocked[!state->log] < 0 || dump(css, con, !state->log, disc)) && con->blocked[state->log] >= 0)
352 			dump(css, con, state->log, disc);
353 		return 1;
354 	}
355 	return 0;
356 }
357 
358 static int
exceptf(Css_t * css,unsigned long op,unsigned long arg,Cssdisc_t * disc)359 exceptf(Css_t* css, unsigned long op, unsigned long arg, Cssdisc_t* disc)
360 {
361 	switch (op)
362 	{
363 	case CSS_INTERRUPT:
364 		error(ERROR_SYSTEM|3, "%s: interrupt exit", fmtsignal(arg));
365 		return 0;
366 	case CSS_DORMANT:
367 		error(2, "service dormant exit");
368 		exit(0);
369 	}
370 	error(ERROR_SYSTEM|3, "poll error op=0x%08x arg=0x%08x", op, arg);
371 	return -1;
372 }
373 
374 int
main(int argc,char ** argv)375 main(int argc, char** argv)
376 {
377 	char*		e;
378 	State_t		state;
379 
380 	NoP(argc);
381 	error_info.id = "mbb";
382 	memset(&state, 0, sizeof(state));
383 	state.disc.version = CSS_VERSION;
384 	state.disc.flags = CSS_DAEMON|CSS_ERROR|CSS_INTERRUPT|CSS_LOG;
385 	state.disc.acceptf = acceptf;
386 	state.disc.actionf = actionf;
387 	state.disc.errorf = errorf;
388 	state.disc.exceptf = exceptf;
389 	for (;;)
390 	{
391 		switch (optget(argv, usage))
392 		{
393 		case 'b':
394 			state.backlog = opt_info.num;
395 			continue;
396 		case 'd':
397 			error_info.trace = -opt_info.num;
398 			continue;
399 		case 't':
400 			state.disc.timeout = strelapsed(opt_info.arg, &e, 1);
401 			if (*e)
402 				error(3, "%s: invalid timeout value", opt_info.arg);
403 			state.disc.flags |= CSS_DORMANT;
404 			continue;
405 		case '?':
406 			error(ERROR_USAGE|4, "%s", opt_info.arg);
407 			continue;
408 		case ':':
409 			error(2, "%s", opt_info.arg);
410 			continue;
411 		}
412 		break;
413 	}
414 	argv += opt_info.index;
415 	if (!argv[0] || argv[1])
416 		error(ERROR_USAGE|4, "%s", optusage(NiL));
417 	if (!(state.tmp = sfstropen()))
418 		error(ERROR_SYSTEM|3, "out of space [tmp stream]");
419 	if (!cssopen(argv[0], &state.disc))
420 		return 1;
421 	umask(S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
422 	csspoll(CS_NEVER, 0);
423 	return 1;
424 }
425