1 /***********************************************************************
2 *                                                                      *
3 *               This software is part of the ast package               *
4 *          Copyright (c) 1990-2011 AT&T Intellectual Property          *
5 *                      and is licensed under the                       *
6 *                 Eclipse Public License, Version 1.0                  *
7 *                    by AT&T Intellectual Property                     *
8 *                                                                      *
9 *                A copy of the License is available at                 *
10 *          http://www.eclipse.org/org/documents/epl-v10.html           *
11 *         (with md5 checksum b35adb5213ca9657e911e9befb180842)         *
12 *                                                                      *
13 *              Information and Software Systems Research               *
14 *                            AT&T Research                             *
15 *                           Florham Park NJ                            *
16 *                                                                      *
17 *                 Glenn Fowler <gsf@research.att.com>                  *
18 *                                                                      *
19 ***********************************************************************/
20 #pragma prototyped
21 
22 /*
23  * Glenn Fowler
24  * AT&T Research
25  *
26  * shared event daemon
27  */
28 
29 #define EVENT_MAJOR		1
30 #define EVENT_MINOR		0
31 
32 static const char usage[] =
33 "[-?\n@(#)$Id: event (AT&T Research) 2007-06-05 $\n]"
34 USAGE_LICENSE
35 "[+NAME?event - shared event client and server]"
36 "[+DESCRIPTION?\bevent\b is a shared event client and server. Events are "
37     "stored in a persistent database named by the \aname\a operand. Each "
38     "event has a name, an expiration, and a binary status \braised\b or "
39     "\bnot-raised\b. A non-existent event is \bnot-raised\b. Events may be "
40     "raised, deleted, cleared, tested and waited for. If no \brequest\b "
41     "operands are specified then requests are prompted for, with an "
42     "\bEVENT>\b prompt, and read from the standard input. Multiple command "
43     "line requests must be separated by \b:\b. In the following events "
44     "operands are matched by \bksh\b(1) patterns. The client requests are:]"
45     "{"
46         "[+all \aconnection\a?Raise all pending events for the "
47             "\aconnection\a. Mainly for debugging.]"
48         "[+clear \aevent\a ...?Mark \aevent\a not-raised but do not "
49             "delete from the database. This allows the events to be matched "
50             "by patterns.]"
51         "[+delete \aevent\a ...?Delete \aevent\a.]"
52         "[+exit?Close the client connection.]"
53         "[+hold [ \aevent\a ...]]?If \aevent\a operands are specified "
54             "then clients are not notified about the those events until they "
55             "are explicitly released by \brelease\b \aevent\a ... If no "
56             "events are specified then all current and future events will be "
57             "unconditionally held until a \brelease\b with no event "
58             "operands.]"
59         "[+info?List the server status pending events by client "
60             "connection. The list is terminated by a \bdone\b message.]"
61         "[+list [ \apattern\a ]]?Start an event dictionary scan and list "
62             "the first event. If \apattern\a is specified then only events "
63             "matching \apattern\a are listed.]"
64         "[+next?List the next event in the \blist\b event scan. The list "
65             "is terminated by a \bdone\b message.]"
66         "[+quit?Equivalent to exit.]"
67         "[+raise \aevent\a ...?Raise \aevent\a ...]"
68         "[+release [ \aevent\a ...]]?If \aevent\a operands are specified "
69             "then they are released from a previous \bhold\b \aevent\a ... "
70             "If no \aevent\a operands are specified then any previous "
71             "unconditional \bhold\b is turned off.]"
72         "[+set \aoption\a ...?]"
73         "[+stop?Terminate the server. Persistent data is preserved.]"
74         "[+test \aevent\a?Determine the \aevent\a status.]"
75         "[+wait \aevent\a?Wait for \aevent\a status to be \braised\b.]"
76     "}"
77 "[+?The \b--cs\b, \b--expire\b, \b--initialize\b, and \b--log\b options "
78     "apply to the initial service command, and the \b--expire\b, \b--log\b, "
79     "\b--newer\b, \b--older\b, and \b--quiet\b options apply to client "
80     "requests.]"
81 "[c:cs|connect-stream?Use \aconnect-stream\a instead of the default.]"
82     ":[connect-stream:=/dev/tcp/local/event]"
83 "[e:expire?Set the current event expiration to the \bdate\b(1) or "
84     "\bcron\b(1) expression \adate-expression\a.]:[date-expression]"
85 "[i:initialize?Initialize the service if it is not already running.]"
86 "[l!:log?Log server activity to \astate-name\a.log, where \astate-name\a "
87     "is the state path name sans suffix.]"
88 "[n:newer?Match events newer than \adate\a. If \b--older\b is also "
89     "specified then only event times > newer and < older match.]:[date]"
90 "[o:older?Match events older than \adate\a. If \b--newer\b is also "
91     "specified then only event times > newer and < older match.]:[date]"
92 "[q:quiet?Suppress request confirmation messages.]"
93 "\n"
94 "\nname [ request [ options ] [ arg ... ] ] [ : request ... ]\n"
95 "\n"
96 "[+CAVEATS?Expirations, logging and the \bset\b request are not "
97     "implemented yet.]"
98 "[+SEE ALSO?\bcoshell\b(1), \bcs\b(1), \bnmake\b(1), \bdbm\b(3), "
99     "\bndbm\b(3), \bgdbm\b(3)]"
100 ;
101 
102 static const char	command[] = "event";
103 
104 static const char	ident_key[] = "'//\t<(IDENT)>\t\\\\'";
105 static const char	ident_name[] = "EVEN";
106 
107 #define IDENT_SWAP	0x01020304
108 #define IDENT_VERSION	((EVENT_MAJOR<<16)|(EVENT_MINOR))
109 
110 #define EVENT(s)	(*((char*)(s))!=ident_key[0])
111 #define log		_log	/* gnu builtin? you've got to be kidding */
112 
113 #include <ast.h>
114 #include <cdt.h>
115 #include <css.h>
116 #include <ctype.h>
117 #include <debug.h>
118 #include <error.h>
119 #include <namval.h>
120 #include <ls.h>
121 #include <regex.h>
122 #include <stdarg.h>
123 #include <swap.h>
124 #include <tok.h>
125 #include <tm.h>
126 #include <ast_ndbm.h>
127 
128 #if !_use_ndbm
129 
130 int
main(int argc,char ** argv)131 main(int argc, char** argv)
132 {
133 	NoP(argc);
134 	NoP(argv);
135 	error(3, "<ndbm.h> library required");
136 	return 1;
137 }
138 
139 #else
140 
141 #define KEY_MAX		64		/* max key length + 1			*/
142 
143 struct Key_s; typedef struct Key_s Key_t;
144 
145 #define DATA_clear	0x00000001	/* explicit clear			*/
146 #define DATA_hold	0x00000002	/* explicit hold			*/
147 
148 typedef uint32_t Number_t;
149 
150 typedef struct Data_s			/* event data				*/
151 {
152 	Number_t		expire;	/* expiration seconds since epoch	*/
153 	Number_t		time;	/* last raise time			*/
154 	Number_t		raise;	/* total # raise requests		*/
155 	Number_t		flags;	/* DATA_* flags				*/
156 } Data_t;
157 
158 typedef struct Event_s			/* event bucket				*/
159 {
160 	Dtlink_t	link;		/* dictionary link			*/
161 	unsigned int	waiting;	/* # clients waiting			*/
162 	Data_t		data;		/* event persistent data		*/
163 	char		name[256];	/* event name				*/
164 } Event_t;
165 
166 typedef struct Waiting_s		/* pending event bucket			*/
167 {
168 	Dtlink_t	link;		/* dictionary link			*/
169 	int		id;		/* wait id				*/
170 	Event_t*	event;		/* event bucket pointer			*/
171 } Waiting_t;
172 
173 typedef struct Connection_s		/* client connection state		*/
174 {
175 	Dtlink_t	link;		/* list link				*/
176 	Csid_t		id;		/* connection id			*/
177 	Dt_t*		waiting;	/* pending events			*/
178 	datum		list;		/* list finger				*/
179 	int		fd;		/* connection fd			*/
180 	int		all;		/* list all vs. list match		*/
181 	int		code;		/* request exit code			*/
182 	int		quiet;		/* suppress response log messages	*/
183 	unsigned long	newer;		/* list --newer time			*/
184 	unsigned long	older;		/* list --older time			*/
185 	regex_t		re;		/* list request pattern			*/
186 } Connection_t;
187 
188 typedef struct Request_s		/* static request info			*/
189 {
190 	const char*	name;		/* request name				*/
191 	int		index;		/* REQ_* index				*/
192 	int		min;		/* min #args				*/
193 	int		max;		/* max #args				*/
194 } Request_t;
195 
196 typedef struct State_s			/* program state			*/
197 {
198 	Cssdisc_t	disc;		/* css discipline			*/
199 	Dtdisc_t	condisc;	/* connection dictionary discipline	*/
200 	Dtdisc_t	eventdisc;	/* event dictionary discipline		*/
201 	Dtdisc_t	waitdisc;	/* pending events dictionary discipline	*/
202 	unsigned int	active;		/* number of active clients		*/
203 	int		hold;		/* hold announcements			*/
204 	int		log;		/* log activity				*/
205 	int		major;		/* db major version			*/
206 	int		minor;		/* db major version			*/
207 	int		swap;		/* datum <=> native int32_ swap		*/
208 	unsigned long	expire;		/* expiration in seconds		*/
209 	DBM*		dbm;		/* dbm handle				*/
210 	Dt_t*		connections;	/* active connection list		*/
211 	Dt_t*		events;		/* outstanding events dictionary	*/
212 	char*		service;	/* service connect stream path		*/
213 	char*		path;		/* event db path			*/
214 	Sfio_t*		logf;		/* log buffer stream			*/
215 	Sfio_t*		usrf;		/* usr buffer stream			*/
216 	Sfio_t*		tmp;		/* tmp buffer stream			*/
217 	char*		cmd[1024];	/* request command argv			*/
218 	char		req[8 * 1024];	/* request buffer			*/
219 } State_t;
220 
221 #define REQ_all			1
222 #define REQ_clear		2
223 #define REQ_delete		3
224 #define REQ_exit		4
225 #define REQ_hold		5
226 #define REQ_info		6
227 #define REQ_list		7
228 #define REQ_next		8
229 #define REQ_raise		9
230 #define REQ_release		10
231 #define REQ_set			11
232 #define REQ_stop		12
233 #define REQ_test		13
234 #define REQ_wait		14
235 
236 static const Request_t	requests[] =
237 {
238 	"a*ll",		REQ_all,	1,	1,
239 	"c*lear",	REQ_clear,	1,	-1,
240 	"d*elete",	REQ_delete,	1,	-1,
241 	"e*xit",	REQ_exit,	0,	0,
242 	"hold",		REQ_hold,	0,	-1,
243 	"i*nfo",	REQ_info,	0,	0,
244 	"l*ist",	REQ_list,	0,	1,
245 	"n*ext",	REQ_next,	0,	0,
246 	"q*uit",	REQ_exit,	0,	0,
247 	"r*aise",	REQ_raise,	1,	-1,
248 	"release",	REQ_release,	0,	-1,
249 	"s*et",		REQ_set,	0,	0,
250 	"stop",		REQ_stop,	0,	0,
251 	"t*est",	REQ_test,	1,	1,
252 	"w*ait",	REQ_wait,	1,	1,
253 };
254 
255 /*
256  * generate a user response and log message
257  */
258 
259 static void
log(State_t * state,Connection_t * con,int type,const char * format,...)260 log(State_t* state, Connection_t* con, int type, const char* format, ...)
261 {
262 	va_list		ap;
263 	char*		s;
264 
265 	va_start(ap, format);
266 	if (format)
267 		sfvprintf(state->tmp, format, ap);
268 	va_end(ap);
269 	if (type)
270 	{
271 		if (!(s = sfstruse(state->tmp)))
272 			error(ERROR_SYSTEM|3, "out of space");
273 		if (type != 'I' && state->log && state->logf)
274 			sfprintf(state->logf, "%s (%03d) %c %s\n", fmttime("%K", time(NiL)), con ? con->fd : 0, type, s);
275 		if (con && type != 'R' && type != 'S')
276 		{
277 			if (type != 'L' || !con->quiet)
278 				sfprintf(state->usrf, "%c %s\n", type, s);
279 			if (type == 'W')
280 				con->code |= 1;
281 			else if (type == 'E')
282 				con->code |= 2;
283 		}
284 	}
285 }
286 
287 /*
288  * accept a new connection
289  */
290 
291 static int
acceptf(Css_t * css,Cssfd_t * fp,Csid_t * ip,char ** av,Cssdisc_t * disc)292 acceptf(Css_t* css, Cssfd_t* fp, Csid_t* ip, char** av, Cssdisc_t* disc)
293 {
294 	register State_t*	state = (State_t*)disc;
295 	register Connection_t*	con;
296 
297 	if (!(con = newof(0, Connection_t, 1, 0)))
298 		return -1;
299 	fp->data = con;
300 	con->id = *ip;
301 	con->waiting = 0;
302 	con->fd = fp->fd;
303 	state->active++;
304 	dtinsert(state->connections, con);
305 	log(state, con, 'S', "accept connection -- %d active", state->active);
306 	return fp->fd;
307 }
308 
309 /*
310  * notify connections waiting on ep
311  */
312 
313 static int
notify(State_t * state,Event_t * ep)314 notify(State_t* state, Event_t* ep)
315 {
316 	Connection_t*	cp;
317 	Waiting_t*	wp;
318 	char*		s;
319 	size_t		n;
320 
321 	for (cp = (Connection_t*)dtfirst(state->connections); cp; cp = (Connection_t*)dtnext(state->connections, cp))
322 		if (cp->waiting && (wp = (Waiting_t*)dtmatch(cp->waiting, &ep)))
323 		{
324 			if (wp->id >= 0)
325 			{
326 				log(state, cp, 'x', "%d 0", wp->id);
327 				n = sfstrtell(state->usrf);
328 				if (!(s = sfstruse(state->usrf)))
329 					error(ERROR_SYSTEM|3, "out of space");
330 				write(cp->fd, s, n);
331 			}
332 			else if (!cp->quiet)
333 				log(state, cp, 'L', "%s raised", ep->name);
334 			n = ep->waiting == 1;
335 			dtdelete(cp->waiting, wp);
336 			if (n)
337 				break;
338 		}
339 	return 0;
340 }
341 
342 /*
343  * post pending event name for connection
344  */
345 
346 static int
post(State_t * state,Connection_t * con,const char * name,int id)347 post(State_t* state, Connection_t* con, const char* name, int id)
348 {
349 	Event_t*	ep;
350 	Waiting_t*	wp;
351 
352 	if (!con->waiting && !(con->waiting = dtopen(&state->waitdisc, Dtset)))
353 	{
354 		error(ERROR_SYSTEM|3, "out of space [waiting]");
355 		return -1;
356 	}
357 	if (ep = dtmatch(state->events, name))
358 	{
359 		if (dtmatch(con->waiting, &ep))
360 			return 0;
361 	}
362 	else if (!(ep = newof(0, Event_t, 1, 0)))
363 	{
364 		error(ERROR_SYSTEM|3, "out of space [event]");
365 		return -1;
366 	}
367 	else
368 	{
369 		strcpy(ep->name, name);
370 		dtinsert(state->events, ep);
371 	}
372 	if (!(wp = newof(0, Waiting_t, 1, 0)))
373 	{
374 		error(ERROR_SYSTEM|3, "out of space [waiting]");
375 		return -1;
376 	}
377 	ep->waiting++;
378 	wp->id = id;
379 	wp->event = ep;
380 	dtinsert(con->waiting, wp);
381 	return 0;
382 }
383 
384 /*
385  * list server info/state
386  */
387 
388 static int
info(State_t * state,Connection_t * con,Css_t * css)389 info(State_t* state, Connection_t* con, Css_t* css)
390 {
391 	Connection_t*	cp;
392 	Waiting_t*	wp;
393 	int		n;
394 
395 	log(state, con, 'I', "info server='%s' version=%d.%d host=%s pid=%d uid=%d gid=%d", fmtident(usage), EVENT_MAJOR, EVENT_MINOR, csname(css->state, 0), getpid(), geteuid(), getegid());
396 	log(state, con, 'I', "info active=%d", state->active);
397 	for (cp = (Connection_t*)dtfirst(state->connections); cp; cp = (Connection_t*)dtnext(state->connections, cp))
398 		if (cp->waiting && (n = dtsize(cp->waiting)) > 0)
399 		{
400 			log(state, con, 0, "waiting connection=%d count=%d", cp->fd, n);
401 			for (wp = (Waiting_t*)dtfirst(cp->waiting); wp; wp = (Waiting_t*)dtnext(cp->waiting, wp))
402 				log(state, con, 0, " %s", wp->event->name);
403 			log(state, con, 'I', 0);
404 		}
405 	log(state, con, 'I', "done");
406 	return 0;
407 }
408 
409 static int	request(State_t*, Connection_t*, int, int, char**, unsigned long, unsigned long);
410 
411 /*
412  * apply request r to one key
413  */
414 
415 static int
apply(State_t * state,Connection_t * con,int id,int index,datum key,datum val,Data_t * dat)416 apply(State_t* state, Connection_t* con, int id, int index, datum key, datum val, Data_t* dat)
417 {
418 	Event_t*	e;
419 	int		n;
420 
421 	switch (index)
422 	{
423 	case REQ_clear:
424 		dat->flags |= DATA_clear;
425 		dat->time = time(NiL);
426 		val.dptr = (void*)dat;
427 		val.dsize = sizeof(*dat);
428 		if (!(n = dbm_store(state->dbm, key, val, DBM_INSERT)) || n > 0 && !dbm_store(state->dbm, key, val, DBM_REPLACE))
429 			log(state, con, 'L', "%s cleared", key.dptr);
430 		else if (!dbm_error(state->dbm))
431 			log(state, con, 'W', "%s unchanged", key.dptr);
432 		else
433 		{
434 			dbm_clearerr(state->dbm);
435 			log(state, con, 'E', "%s io error", key.dptr);
436 		}
437 		break;
438 	case REQ_delete:
439 		if (!dbm_delete(state->dbm, key))
440 		{
441 			log(state, con, 'L', "%s deleted", key.dptr);
442 			return 1;
443 		}
444 		else if (!dbm_error(state->dbm))
445 			log(state, con, 'W', "%s not in db", key.dptr);
446 		else
447 		{
448 			dbm_clearerr(state->dbm);
449 			log(state, con, 'E', "%s io error", key.dptr);
450 		}
451 		break;
452 	case REQ_hold:
453 		dat->flags |= DATA_hold;
454 		dat->time = time(NiL);
455 		val.dptr = (void*)dat;
456 		val.dsize = sizeof(*dat);
457 		if (!(n = dbm_store(state->dbm, key, val, DBM_INSERT)) || n > 0 && !dbm_store(state->dbm, key, val, DBM_REPLACE))
458 			log(state, con, 'L', "%s held", key.dptr);
459 		else if (!dbm_error(state->dbm))
460 			log(state, con, 'W', "%s unchanged", key.dptr);
461 		else
462 		{
463 			dbm_clearerr(state->dbm);
464 			log(state, con, 'E', "%s io error", key.dptr);
465 		}
466 		break;
467 	case REQ_raise:
468 		dat->flags &= ~DATA_clear;
469 		dat->time = time(NiL);
470 		dat->raise++;
471 		val.dptr = (void*)dat;
472 		val.dsize = sizeof(*dat);
473 		if (!(n = dbm_store(state->dbm, key, val, DBM_INSERT)) || n > 0 && !dbm_store(state->dbm, key, val, DBM_REPLACE))
474 		{
475 			if (!state->hold && (e = (Event_t*)dtmatch(state->events, key.dptr)))
476 				notify(state, e);
477 			log(state, con, 'L', "%s raised", key.dptr);
478 		}
479 		else if (!dbm_error(state->dbm))
480 			log(state, con, 'W', "%s unchanged", key.dptr);
481 		else
482 		{
483 			dbm_clearerr(state->dbm);
484 			log(state, con, 'E', "%s io error", key.dptr);
485 		}
486 		break;
487 	case REQ_release:
488 		if (dat->flags & DATA_hold)
489 		{
490 			dat->flags &= ~DATA_hold;
491 			if (dat->raise)
492 			{
493 				val.dptr = (void*)dat;
494 				val.dsize = sizeof(*dat);
495 				if (!(n = dbm_store(state->dbm, key, val, DBM_INSERT)) || n > 0 && !dbm_store(state->dbm, key, val, DBM_REPLACE))
496 					log(state, con, 'L', "%s released", key.dptr);
497 				else if (!dbm_error(state->dbm))
498 					log(state, con, 'W', "%s unchanged", key.dptr);
499 				else
500 				{
501 					dbm_clearerr(state->dbm);
502 					log(state, con, 'E', "%s io error", key.dptr);
503 				}
504 				if (e = (Event_t*)dtmatch(state->events, key.dptr))
505 					notify(state, e);
506 			}
507 			else if (!dbm_delete(state->dbm, key))
508 				log(state, con, 'L', "%s deleted", key.dptr);
509 			else if (!dbm_error(state->dbm))
510 				log(state, con, 'W', "%s not in db", key.dptr);
511 			else
512 			{
513 				dbm_clearerr(state->dbm);
514 				log(state, con, 'E', "%s io error", key.dptr);
515 			}
516 		}
517 		break;
518 	case REQ_test:
519 		if (val.dptr && !(dat->flags & DATA_clear))
520 		{
521 			if (state->hold)
522 				log(state, con, 'W', "%s global hold", key.dptr);
523 			else if (dat->flags & DATA_hold)
524 				log(state, con, 'W', "%s explicit hold", key.dptr);
525 			else
526 				log(state, con, 'I', "%s raised", key.dptr);
527 		}
528 		else
529 			log(state, con, 'I', "%s not-raised", key.dptr);
530 		break;
531 	case REQ_wait:
532 		if (val.dptr && !state->hold && !(dat->flags & (DATA_clear|DATA_hold)))
533 			log(state, con, 'I', "%s raised", key.dptr);
534 		else if (post(state, con, key.dptr, id))
535 			return -1;
536 		break;
537 	}
538 	return 0;
539 }
540 
541 /*
542  * apply request r to args a
543  */
544 
545 static int
request(State_t * state,Connection_t * con,int id,int index,char ** a,unsigned long older,unsigned long newer)546 request(State_t* state, Connection_t* con, int id, int index, char** a, unsigned long older, unsigned long newer)
547 {
548 	char*			s;
549 	int			i;
550 	Event_t*		e;
551 	datum			key;
552 	datum			val;
553 	Data_t			dat;
554 	regex_t			re;
555 	char			buf[64];
556 
557 	while (s = *a++)
558 		if (i = regcomp(&re, s, REG_SHELL|REG_AUGMENTED|REG_LEFT|REG_RIGHT))
559 		{
560 			regerror(i, &re, buf, sizeof(buf));
561 			log(state, con, 'E', "%s: %s", s, buf);
562 		}
563 		else if (regstat(&re)->re_info & REG_LITERAL)
564 		{
565 			if (!EVENT(s))
566 			{
567 				log(state, con, 'E', "%s invalid event name", s);
568 				return -1;
569 			}
570 			key.dptr = (void*)s;
571 			key.dsize = strlen(s) + 1;
572 			if (key.dsize >= sizeof(e->name))
573 				s[(key.dsize = sizeof(e->name)) - 1] = 0;
574 			val = dbm_fetch(state->dbm, key);
575 			if (val.dptr)
576 			{
577 				if (val.dsize > sizeof(dat))
578 					val.dsize = sizeof(dat);
579 				swapmem(state->swap, val.dptr, &dat, sizeof(dat));
580 			}
581 			else
582 				memset(&dat, 0, sizeof(dat));
583 			if (apply(state, con, id, index, key, val, &dat))
584 				return -1;
585 		}
586 		else
587 		{
588 		rescan:
589 			for (key = dbm_firstkey(state->dbm); key.dptr; key = dbm_nextkey(state->dbm))
590 				if (EVENT(key.dptr) && !regexec(&re, key.dptr, 0, NiL, 0))
591 				{
592 					val = dbm_fetch(state->dbm, key);
593 					if (val.dsize > sizeof(dat))
594 						val.dsize = sizeof(dat);
595 					swapmem(state->swap, val.dptr, &dat, val.dsize);
596 					if ((!older || dat.time < older) && (!newer || dat.time > newer))
597 					{
598 						if ((i = apply(state, con, id, index, key, val, &dat)) < 0)
599 							return -1;
600 						if (i > 0)
601 							goto rescan;
602 					}
603 				}
604 		}
605 	return 0;
606 }
607 
608 /*
609  * convert s to a date/time
610  */
611 
612 static unsigned long
date(State_t * state,Connection_t * con,const char * s)613 date(State_t* state, Connection_t* con, const char* s)
614 {
615 	unsigned long	t;
616 	char*		e;
617 	datum		key;
618 	datum		val;
619 	Data_t		dat;
620 
621 	key.dptr = (void*)s;
622 	key.dsize = strlen(s) + 1;
623 	val = dbm_fetch(state->dbm, key);
624 	if (val.dptr)
625 	{
626 		swapmem(state->swap, val.dptr, &dat, val.dsize);
627 		t = dat.time;
628 	}
629 	else
630 	{
631 		t = tmdate(s, &e, NiL);
632 		if (*e)
633 		{
634 			log(state, con, 'E', "%s: invalid date/time", s);
635 			t = 0;
636 		}
637 	}
638 	return t;
639 }
640 
641 /*
642  * service a request
643  */
644 
645 static int
actionf(register Css_t * css,register Cssfd_t * fp,Cssdisc_t * disc)646 actionf(register Css_t* css, register Cssfd_t* fp, Cssdisc_t* disc)
647 {
648 	register State_t*	state = (State_t*)disc;
649 	register Connection_t*	con;
650 	char*			s;
651 	char*			t;
652 	char**			a;
653 	char**			q;
654 	Cssfd_t*		f;
655 	Request_t*		r;
656 	Event_t*		e;
657 	Waiting_t*		w;
658 	Connection_t*		x;
659 	int			n;
660 	int			err;
661 	int			id;
662 	unsigned long		older;
663 	unsigned long		newer;
664 	datum			key;
665 	datum			val;
666 	Data_t			data;
667 	char			buf[64];
668 
669 	switch (fp->status)
670 	{
671 	case CS_POLL_CLOSE:
672 		if (con = (Connection_t*)fp->data)
673 			dtdelete(state->connections, con);
674 		return 0;
675 	case CS_POLL_READ:
676 		con = (Connection_t*)fp->data;
677 		if ((n = csread(css->state, fp->fd, state->req, sizeof(state->req), CS_LINE)) <= 0)
678 			return -1;
679 		state->req[--n] = 0;
680 		log(state, con, 'R', "%s", state->req);
681 		con->code = 0;
682 		if (tokscan(state->req, NiL, " %v ", state->cmd, elementsof(state->cmd) - 1) > 0)
683 		{
684 			id = -1;
685 			for (q = state->cmd; (s = *q) && (isalpha(*s) || *s == '_'); q++)
686 			{
687 				while (isalnum(*++s));
688 				if (*s != '=')
689 					break;
690 				if ((s - *q) == 2 && *(s - 1) == 'd' && *(s - 2) == 'i')
691 					id = (int)strtol(s + 1, NiL, 0);
692 			}
693 			s = *(a = q);
694 			if (!(r = (Request_t*)strpsearch(requests, elementsof(requests), sizeof(requests[0]), s, NiL)))
695 				log(state, con, 'E', "%s: unknown request", s);
696 			else
697 			{
698 				opt_info.index = 0;
699 				newer = older = 0;
700 				err = 0;
701 				sfstrseek(state->usrf, 0, SEEK_SET);
702 				for (;;)
703 				{
704 					switch (optget(a, usage))
705 					{
706 					case 'e':
707 						if (r->index == REQ_set)
708 						{
709 							state->expire = strelapsed(opt_info.arg, &t, 1);
710 							if (*t)
711 							{
712 								log(state, con, 'E', "%s: invalid elapsed time expression", opt_info.arg);
713 								err = 1;
714 								break;
715 							}
716 						}
717 						continue;
718 					case 'l':
719 						if (r->index == REQ_set)
720 							state->log = opt_info.num;
721 						continue;
722 					case 'n':
723 						newer = date(state, con, opt_info.arg);
724 						continue;
725 					case 'o':
726 						older = date(state, con, opt_info.arg);
727 						continue;
728 					case 'q':
729 						con->quiet = opt_info.num;
730 						continue;
731 					case '?':
732 					case ':':
733 						log(state, con, 'E', "%s: %s", s, opt_info.arg);
734 						err = 1;
735 						break;
736 					}
737 					break;
738 				}
739 				if (!err)
740 				{
741 					if (!*(a += opt_info.index))
742 					{
743 						if (newer || older)
744 						{
745 							a[0] = "*";
746 							a[1] = 0;
747 							n = 1;
748 						}
749 						else
750 							n = 0;
751 					}
752 					else
753 						n = a[1] ? 2 : 1;
754 					if (r->min && n < r->min)
755 						sfprintf(state->usrf, "E %s: at least %d argument%s expected\n", s, r->min, r->min == 1 ? "" : "s");
756 					else if (r->max > 0 && n > r->max)
757 						log(state, con, 'E', "%s: at most %d argument%s expected", s, r->max, r->max == 1 ? "" : "s");
758 					else if (r->min == r->max && n != r->max)
759 						log(state, con, 'E', "%s: %d argument%s expected", s, r->max, r->max == 1 ? "" : "s");
760 					else
761 						switch (r->index)
762 						{
763 						case REQ_all:
764 							n = (int)strtol(a[0], &t, 0);
765 							if (*t)
766 							{
767 								log(state, con, 'E', "%s: invalid numeric value", a[0]);
768 								break;
769 							}
770 							else if (!(f = cssfd(css, n, 0)) || !(x = (Connection_t*)f->data))
771 							{
772 								log(state, con, 'E', "%d: invalid connection index", n);
773 								break;
774 							}
775 							if (x->waiting)
776 							{
777 								n = x->quiet;
778 								x->quiet = 1;
779 								a = state->cmd;
780 								for (w = (Waiting_t*)dtfirst(x->waiting); w; w = (Waiting_t*)dtnext(x->waiting, w))
781 								{
782 									if (a >= &state->cmd[elementsof(state->cmd)-1])
783 									{
784 										*a = 0;
785 										if (request(state, x, -1, REQ_raise, a = state->cmd, older, newer))
786 											break;
787 									}
788 									*a++ = w->event->name;
789 									log(state, con, 'R', "%s %s", s, w->event->name);
790 								}
791 								if (a > state->cmd)
792 								{
793 									*a = 0;
794 									request(state, x, -1, REQ_raise, state->cmd, older, newer);
795 								}
796 								x->quiet = n;
797 							}
798 							log(state, con, 'I', "done");
799 							break;
800 						case REQ_clear:
801 						case REQ_delete:
802 						case REQ_raise:
803 						case REQ_test:
804 						case REQ_wait:
805 							if (request(state, con, id, r->index, a, older, newer))
806 								return -1;
807 							break;
808 						case REQ_exit:
809 							cssfd(css, fp->fd, CS_POLL_CLOSE);
810 							break;
811 						case REQ_info:
812 							info(state, con, css);
813 							break;
814 						case REQ_hold:
815 							if (!*a)
816 							{
817 								state->hold = 1;
818 								sfprintf(state->usrf, "I holding\n");
819 							}
820 							else
821 								if (request(state, con, id, r->index, a, older, newer))
822 									return -1;
823 							break;
824 						case REQ_list:
825 							con->all = 1;
826 							if (s = *a)
827 							{
828 								if (n = regcomp(&con->re, s, REG_SHELL|REG_AUGMENTED|REG_LEFT|REG_RIGHT))
829 								{
830 									regerror(n, &con->re, buf, sizeof(buf));
831 									log(state, con, 'E', "%s: %s", s, buf);
832 									break;
833 								}
834 								con->all = 0;
835 							}
836 							con->list = dbm_firstkey(state->dbm);
837 							if (!con->list.dptr)
838 							{
839 								log(state, con, 'I', "empty");
840 								break;
841 							}
842 							con->newer = newer;
843 							con->older = older;
844 							goto list;
845 						case REQ_next:
846 							if (!con->list.dptr)
847 							{
848 								log(state, con, 'W', "next: must execute list first");
849 								break;
850 							}
851 							for (;;)
852 							{
853 								con->list = dbm_nextkey(state->dbm);
854 								if (!con->list.dptr)
855 								{
856 									log(state, con, 'I', "done");
857 									break;
858 								}
859 						list:
860 								if (EVENT(con->list.dptr) && (con->all || !regexec(&con->re, con->list.dptr, 0, NiL, 0)))
861 								{
862 									val = dbm_fetch(state->dbm, con->list);
863 									if (val.dsize > sizeof(data))
864 										val.dsize = sizeof(data);
865 									swapmem(state->swap, val.dptr, &data, val.dsize);
866 									if ((!con->older || data.time < con->older) && (!con->newer || data.time > con->newer))
867 									{
868 										log(state, con, 'I', "event %s %s %d%s%s", con->list.dptr, fmttime("%K", data.time), data.raise, (data.flags & DATA_clear) ? " CLEAR" : "", (data.flags & DATA_hold) ? " HOLD" : "");
869 										break;
870 									}
871 								}
872 							}
873 							break;
874 						case REQ_release:
875 							if (!*a)
876 							{
877 								state->hold = 0;
878 								sfprintf(state->usrf, "I released\n");
879 								key = dbm_firstkey(state->dbm);
880 								while (key.dptr)
881 								{
882 									val = dbm_fetch(state->dbm, key);
883 									if (val.dsize > sizeof(data))
884 										val.dsize = sizeof(data);
885 									swapmem(state->swap, val.dptr, &data, val.dsize);
886 									if (!(data.flags & (DATA_clear|DATA_hold)) && (e = (Event_t*)dtmatch(state->events, key.dptr)))
887 										notify(state, e);
888 									key = dbm_nextkey(state->dbm);
889 								}
890 							}
891 							else
892 								if (request(state, con, id, r->index, a, older, newer))
893 									return -1;
894 							break;
895 						case REQ_set:
896 							break;
897 						case REQ_stop:
898 							exit(0);
899 							break;
900 						}
901 				}
902 			}
903 			if (sfstrtell(state->usrf))
904 			{
905 				if (id >= 0)
906 				{
907 					sfstrseek(state->usrf, 0, SEEK_SET);
908 					log(state, con, 'x', "%d %d", id, con->code);
909 				}
910 				n = sfstrtell(state->usrf);
911 				if (!(s = sfstruse(state->usrf)))
912 					error(ERROR_SYSTEM|3, "out of space");
913 				if (cswrite(css->state, fp->fd, s, n) != n)
914 					return -1;
915 			}
916 		}
917 		return 1;
918 	}
919 	return 0;
920 }
921 
922 /*
923  * handle exceptions
924  */
925 
926 static int
exceptf(Css_t * css,unsigned long op,unsigned long arg,Cssdisc_t * disc)927 exceptf(Css_t* css, unsigned long op, unsigned long arg, Cssdisc_t* disc)
928 {
929 	register State_t*	state = (State_t*)disc;
930 
931 	switch (op)
932 	{
933 	case CSS_CLOSE:
934 		if (state->dbm)
935 		{
936 			dbm_close(state->dbm);
937 			state->dbm = 0;
938 		}
939 		return 0;
940 	case CSS_INTERRUPT:
941 		error(ERROR_SYSTEM|3, "%s: interrupt exit", fmtsignal(arg));
942 		return 0;
943 	case CSS_WAKEUP:
944 		error(2, "wakeup");
945 		return 0;
946 	}
947 	error(ERROR_SYSTEM|3, "poll error op=0x%08x arg=0x%08x", op, arg);
948 	return -1;
949 }
950 
951 /*
952  * free connection
953  */
954 
955 static void
confree(Dt_t * dt,void * obj,Dtdisc_t * disc)956 confree(Dt_t* dt, void* obj, Dtdisc_t* disc)
957 {
958 	State_t*	state = (State_t*)((char*)disc - offsetof(State_t, condisc));
959 	Connection_t*	con = (Connection_t*)obj;
960 
961 	NoP(dt);
962 	NoP(disc);
963 	state->active--;
964 	if (con->waiting)
965 		dtclose(con->waiting);
966 	log(state, con, 'S', "drop connection -- %d active", state->active);
967 	free(obj);
968 }
969 
970 /*
971  * free event
972  */
973 
974 static void
eventfree(Dt_t * dt,void * obj,Dtdisc_t * disc)975 eventfree(Dt_t* dt, void* obj, Dtdisc_t* disc)
976 {
977 	NoP(dt);
978 	NoP(disc);
979 	free(obj);
980 }
981 
982 /*
983  * free connection pending event
984  */
985 
986 static void
waitfree(Dt_t * dt,void * obj,Dtdisc_t * disc)987 waitfree(Dt_t* dt, void* obj, Dtdisc_t* disc)
988 {
989 	State_t*	state = (State_t*)((char*)disc - offsetof(State_t, waitdisc));
990 	Waiting_t*	p = (Waiting_t*)obj;
991 
992 	NoP(dt);
993 	if (--p->event->waiting == 0)
994 		dtdelete(state->events, p->event);
995 	free(obj);
996 }
997 
998 /*
999  * open and verify event db
1000  */
1001 
1002 static void
db(register State_t * state)1003 db(register State_t* state)
1004 {
1005 	datum		key;
1006 	datum		val;
1007 	Data_t		data;
1008 	uint32_t	u4;
1009 
1010 	if (!(state->dbm = dbm_open(state->path, O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH)))
1011 		error(ERROR_SYSTEM|3, "%s: cannot open database for read/write", state->path);
1012 	key.dptr = (void*)ident_key;
1013 	key.dsize = sizeof(ident_key);
1014 	val = dbm_fetch(state->dbm, key);
1015 	if (val.dptr)
1016 	{
1017 		if (val.dsize != sizeof(data))
1018 			error(3, "%s: invalid db -- data size %d, expected %d", state->path, val.dsize, sizeof(data));
1019 		memcpy(&data, val.dptr, val.dsize);
1020 		if (memcmp(&data.expire, ident_name, sizeof(data.expire)))
1021 			error(3, "%s: %s: invalid db -- ident mismatch, expected %s", state->path, ident_key, ident_name);
1022 		u4 = IDENT_SWAP;
1023 		if (state->swap = swapop(&u4, &data.time, 4))
1024 			swapmem(state->swap, &data, &data, sizeof(data));
1025 	}
1026 	else
1027 	{
1028 		val = dbm_firstkey(state->dbm);
1029 		if (val.dptr)
1030 			error(3, "%s: %s: invalid db -- ident entry expected", state->path, ident_key);
1031 		memset(&data, 0, sizeof(data));
1032 		memcpy(&data.expire, ident_name, sizeof(data.expire));
1033 		data.time = IDENT_SWAP;
1034 		data.raise = IDENT_VERSION;
1035 		val.dptr = (void*)&data;
1036 		val.dsize = sizeof(data);
1037 		if (dbm_store(state->dbm, key, val, DBM_INSERT))
1038 		{
1039 			dbm_clearerr(state->dbm);
1040 			error(3, "%s: %s: db initial ident entry store failed", state->path, ident_key);
1041 		}
1042 	}
1043 	state->major = (data.raise >> 16) & 0xffff;
1044 	state->minor = data.raise & 0xffff;
1045 }
1046 
1047 /*
1048  * client/server main
1049  */
1050 
1051 int
main(int argc,char ** argv)1052 main(int argc, char** argv)
1053 {
1054 	char*			s;
1055 	Css_t*			css;
1056 
1057 	char*			p = 0;
1058 	int			server = 0;
1059 
1060 	static State_t		state;
1061 
1062 	NoP(argc);
1063 	setlocale(LC_ALL, "");
1064 	opt_info.argv = argv;
1065 	state.log = 1;
1066 	error_info.id = (char*)command;
1067 	if (!(state.usrf = sfstropen()) || !(state.tmp = sfstropen()))
1068 		error(3, "out of space [buf]");
1069 
1070 	/*
1071 	 * check the options
1072 	 */
1073 
1074 	for (;;)
1075 	{
1076 		switch (optget(argv, usage))
1077 		{
1078 		case 'c':
1079 			p = opt_info.arg;
1080 			continue;
1081 		case 'e':
1082 			state.expire = strelapsed(opt_info.arg, &s, 1);
1083 			if (*s)
1084 				error(2, "%s: invalid elapsed time expression", opt_info.arg);
1085 			continue;
1086 		case 'i':
1087 			server = 1;
1088 			continue;
1089 		case 'l':
1090 			state.log = opt_info.num;
1091 			continue;
1092 		case '?':
1093 			error(ERROR_USAGE|4, "%s", opt_info.arg);
1094 			continue;
1095 		case ':':
1096 			error(2, "%s", opt_info.arg);
1097 			continue;
1098 		}
1099 		break;
1100 	}
1101 	if (error_info.errors || !(state.path = *(argv += opt_info.index)))
1102 		error(ERROR_USAGE|4, "%s", optusage(NiL));
1103 
1104 	/*
1105 	 * get the connect stream path
1106 	 */
1107 
1108 	if (s = strrchr(state.path, '/'))
1109 		s++;
1110 	else
1111 		s = state.path;
1112 	if (p)
1113 		sfprintf(state.usrf, "%s/%s", p, s);
1114 	else
1115 		sfprintf(state.usrf, "/dev/tcp/local/%s/%s", error_info.id, s);
1116 	if (!(state.service = strdup(sfstruse(state.usrf))))
1117 		error(3, "out of space [service]");
1118 
1119 	/*
1120 	 * either server or client at this point
1121 	 */
1122 
1123 	if (server)
1124 	{
1125 		umask(S_IWOTH);
1126 		db(&state);
1127 		state.condisc.link = offsetof(Event_t, link);
1128 		state.condisc.freef = confree;
1129 		if (!(state.connections = dtopen(&state.condisc, Dtlist)))
1130 			error(ERROR_SYSTEM|3, "out of space [connection dictionary]");
1131 		state.eventdisc.link = offsetof(Event_t, link);
1132 		state.eventdisc.key = offsetof(Event_t, name);
1133 		state.eventdisc.freef = eventfree;
1134 		if (!(state.events = dtopen(&state.eventdisc, Dtoset)))
1135 			error(ERROR_SYSTEM|3, "out of space [event dictionary]");
1136 		state.waitdisc.link = offsetof(Waiting_t, link);
1137 		state.waitdisc.key = offsetof(Waiting_t, event);
1138 		state.waitdisc.size = sizeof(Event_t*);
1139 		state.waitdisc.freef = waitfree;
1140 		state.disc.version = CSS_VERSION;
1141 		state.disc.flags = CSS_DAEMON|CSS_LOG|CSS_ERROR|CSS_INTERRUPT|CSS_WAKEUP|CSS_PRESERVE;
1142 		state.disc.timeout = 60 * 60 * 1000L;
1143 		state.disc.acceptf = acceptf;
1144 		state.disc.actionf = actionf;
1145 		state.disc.errorf = errorf;
1146 		state.disc.exceptf = exceptf;
1147 		if (!(css = cssopen(state.service, &state.disc)))
1148 			return 1;
1149 		umask(S_IWOTH);
1150 		error_info.id = css->id;
1151 		if (state.log)
1152 		{
1153 			sfprintf(state.tmp, "%s.log", state.path);
1154 			if (!(s = sfstruse(state.tmp)))
1155 				error(ERROR_SYSTEM|3, "out of space");
1156 			if (state.logf = sfopen(NiL, s, "a"))
1157 				sfset(state.logf, SF_LINE, 1);
1158 			else
1159 				error(ERROR_SYSTEM|2, "%s: cannot append log file", s);
1160 		}
1161 		log(&state, 0, 'S', "start service %s", fmtident(usage));
1162 		csspoll(CS_NEVER, 0);
1163 		log(&state, 0, 'S', "stop service");
1164 		return 1;
1165 	}
1166 	return csclient(&cs, -1, state.service, "event> ", argv + 1, CS_CLIENT_ARGV|CS_CLIENT_SEP);
1167 }
1168 
1169 #endif
1170