1 /***********************************************************************
2 *                                                                      *
3 *               This software is part of the ast package               *
4 *          Copyright (c) 1990-2013 AT&T Intellectual Property          *
5 *                      and is licensed under the                       *
6 *                 Eclipse Public License, Version 1.0                  *
7 *                    by AT&T Intellectual Property                     *
8 *                                                                      *
9 *                A copy of the License is available at                 *
10 *          http://www.eclipse.org/org/documents/epl-v10.html           *
11 *         (with md5 checksum b35adb5213ca9657e911e9befb180842)         *
12 *                                                                      *
13 *              Information and Software Systems Research               *
14 *                            AT&T Research                             *
15 *                           Florham Park NJ                            *
16 *                                                                      *
17 *               Glenn Fowler <glenn.s.fowler@gmail.com>                *
18 *                                                                      *
19 ***********************************************************************/
20 #pragma prototyped
21 
22 /*
23  * Glenn Fowler
24  * AT&T Research
25  *
26  * shared event daemon
27  */
28 
29 #define EVENT_MAJOR		1
30 #define EVENT_MINOR		0
31 
32 static const char usage[] =
33 "[-?\n@(#)$Id: event (AT&T Research) 2013-10-25 $\n]"
34 USAGE_LICENSE
35 "[+NAME?event - shared event client and server]"
36 "[+DESCRIPTION?\bevent\b is a shared event client and server. Events are "
37     "stored in a persistent database named by the \aname\a operand. Each "
38     "event has a name, an expiration, and a binary status \braised\b or "
39     "\bnot-raised\b. A non-existent event is \bnot-raised\b. Events may be "
40     "raised, deleted, cleared, tested and waited for. If no \brequest\b "
41     "operands are specified then requests are prompted for, with an "
42     "\bEVENT>\b prompt, and read from the standard input. Multiple command "
43     "line requests must be separated by \b:\b. In the following events "
44     "operands are matched by \bksh\b(1) patterns. The client requests are:]"
45     "{"
46         "[+all \aconnection\a?Raise all pending events for the "
47             "\aconnection\a. Mainly for debugging.]"
48         "[+clear \aevent\a ...?Mark \aevent\a not-raised but do not "
49             "delete from the database. This allows the events to be matched "
50             "by patterns.]"
51         "[+delete \aevent\a ...?Delete \aevent\a.]"
52         "[+exit?Close the client connection.]"
53         "[+hold [ \aevent\a ...]]?If \aevent\a operands are specified "
54             "then clients are not notified about the those events until they "
55             "are explicitly released by \brelease\b \aevent\a ... If no "
56             "events are specified then all current and future events will be "
57             "unconditionally held until a \brelease\b with no event "
58             "operands.]"
59         "[+info?List the server status pending events by client "
60             "connection. The list is terminated by a \bdone\b message.]"
61         "[+list [ \apattern\a ]]?Start an event dictionary scan and list "
62             "the first event. If \apattern\a is specified then only events "
63             "matching \apattern\a are listed.]"
64         "[+next?List the next event in the \blist\b event scan. The list "
65             "is terminated by a \bdone\b message.]"
66         "[+quit?Equivalent to exit.]"
67         "[+raise \aevent\a ...?Raise \aevent\a ...]"
68         "[+release [ \aevent\a ...]]?If \aevent\a operands are specified "
69             "then they are released from a previous \bhold\b \aevent\a ... "
70             "If no \aevent\a operands are specified then any previous "
71             "unconditional \bhold\b is turned off.]"
72         "[+set \aoption\a ...?]"
73         "[+stop?Terminate the server. Persistent data is preserved.]"
74         "[+test \aevent\a?Determine the \aevent\a status.]"
75         "[+wait \aevent\a?Wait for \aevent\a status to be \braised\b.]"
76     "}"
77 "[+?The \b--cs\b, \b--expire\b, \b--initialize\b, and \b--log\b options "
78     "apply to the initial service command, and the \b--expire\b, \b--log\b, "
79     "\b--newer\b, \b--older\b, and \b--quiet\b options apply to client "
80     "requests.]"
81 "[c:cs|connect-stream?Use \aconnect-stream\a instead of the default.]"
82     ":[connect-stream:=/dev/tcp/local/event]"
83 "[e:expire?Set the current event expiration to the \bdate\b(1) or "
84     "\bcron\b(1) expression \adate-expression\a.]:[date-expression]"
85 "[i:initialize?Initialize the service if it is not already running.]"
86 "[l!:log?Log server activity to \astate-name\a.log, where \astate-name\a "
87     "is the state path name sans suffix.]"
88 "[n:newer?Match events newer than \adate\a. If \b--older\b is also "
89     "specified then only event times > newer and < older match.]:[date]"
90 "[o:older?Match events older than \adate\a. If \b--newer\b is also "
91     "specified then only event times > newer and < older match.]:[date]"
92 "[q:quiet?Suppress request confirmation messages.]"
93 "\n"
94 "\nname [ request [ options ] [ arg ... ] ] [ : request ... ]\n"
95 "\n"
96 "[+CAVEATS?Expirations, logging and the \bset\b request are not "
97     "implemented yet.]"
98 "[+SEE ALSO?\bcoshell\b(1), \bcs\b(1), \bnmake\b(1), \bdbm\b(3), "
99     "\bndbm\b(3), \bgdbm\b(3)]"
100 ;
101 
102 static const char	command[] = "event";
103 
104 static const char	ident_key[] = "'//\t<(IDENT)>\t\\\\'";
105 static const char	ident_name[] = "EVEN";
106 
107 #define IDENT_SWAP	0x01020304
108 #define IDENT_VERSION	((EVENT_MAJOR<<16)|(EVENT_MINOR))
109 
110 #define EVENT(s)	(*((char*)(s))!=ident_key[0])
111 #define log		_log	/* gnu builtin? you've got to be kidding */
112 
113 #include <ast.h>
114 #include <cdt.h>
115 #include <css.h>
116 #include <ctype.h>
117 #include <debug.h>
118 #include <error.h>
119 #include <namval.h>
120 #include <ls.h>
121 #include <regex.h>
122 #include <stdarg.h>
123 #include <swap.h>
124 #include <tok.h>
125 #include <tm.h>
126 #include <ast_ndbm.h>
127 
128 #if !_use_ndbm
129 
130 int
main(int argc,char ** argv)131 main(int argc, char** argv)
132 {
133 	NoP(argc);
134 	NoP(argv);
135 	error(3, "<ndbm.h> library required");
136 	return 1;
137 }
138 
139 #else
140 
141 #define KEY_MAX		64		/* max key length + 1			*/
142 
143 struct Key_s; typedef struct Key_s Key_t;
144 
145 #define DATA_clear	0x00000001	/* explicit clear			*/
146 #define DATA_hold	0x00000002	/* explicit hold			*/
147 
148 typedef uint32_t Number_t;
149 
150 typedef struct Data_s			/* event data				*/
151 {
152 	Number_t		expire;	/* expiration seconds since epoch	*/
153 	Number_t		time;	/* last raise time			*/
154 	Number_t		raise;	/* total # raise requests		*/
155 	Number_t		flags;	/* DATA_* flags				*/
156 } Data_t;
157 
158 typedef struct Event_s			/* event bucket				*/
159 {
160 	Dtlink_t	link;		/* dictionary link			*/
161 	unsigned int	waiting;	/* # clients waiting			*/
162 	Data_t		data;		/* event persistent data		*/
163 	char		name[256];	/* event name				*/
164 } Event_t;
165 
166 typedef struct Waiting_s		/* pending event bucket			*/
167 {
168 	Dtlink_t	link;		/* dictionary link			*/
169 	int		id;		/* wait id				*/
170 	Event_t*	event;		/* event bucket pointer			*/
171 } Waiting_t;
172 
173 typedef struct Connection_s		/* client connection state		*/
174 {
175 	Dtlink_t	link;		/* list link				*/
176 	Csid_t		id;		/* connection id			*/
177 	Dt_t*		waiting;	/* pending events			*/
178 	datum		list;		/* list finger				*/
179 	int		fd;		/* connection fd			*/
180 	int		all;		/* list all vs. list match		*/
181 	int		code;		/* request exit code			*/
182 	int		quiet;		/* suppress response log messages	*/
183 	unsigned long	newer;		/* list --newer time			*/
184 	unsigned long	older;		/* list --older time			*/
185 	regex_t		re;		/* list request pattern			*/
186 } Connection_t;
187 
188 typedef struct Request_s		/* static request info			*/
189 {
190 	const char*	name;		/* request name				*/
191 	int		index;		/* REQ_* index				*/
192 	int		min;		/* min #args				*/
193 	int		max;		/* max #args				*/
194 } Request_t;
195 
196 typedef struct State_s			/* program state			*/
197 {
198 	Cssdisc_t	disc;		/* css discipline			*/
199 	Dtdisc_t	condisc;	/* connection dictionary discipline	*/
200 	Dtdisc_t	eventdisc;	/* event dictionary discipline		*/
201 	Dtdisc_t	waitdisc;	/* pending events dictionary discipline	*/
202 	unsigned int	active;		/* number of active clients		*/
203 	int		hold;		/* hold announcements			*/
204 	int		log;		/* log activity				*/
205 	int		major;		/* db major version			*/
206 	int		minor;		/* db major version			*/
207 	int		swap;		/* datum <=> native int32_ swap		*/
208 	unsigned long	expire;		/* expiration in seconds		*/
209 	DBM*		dbm;		/* dbm handle				*/
210 	Dt_t*		connections;	/* active connection list		*/
211 	Dt_t*		events;		/* outstanding events dictionary	*/
212 	char*		service;	/* service connect stream path		*/
213 	char*		path;		/* event db path			*/
214 	Sfio_t*		logf;		/* log file stream			*/
215 	Sfio_t*		usrf;		/* usr buffer stream			*/
216 	Sfio_t*		tmp;		/* tmp buffer stream			*/
217 	char*		cmd[1024];	/* request command argv			*/
218 	char		req[8 * 1024];	/* request buffer			*/
219 } State_t;
220 
221 #define REQ_all			1
222 #define REQ_clear		2
223 #define REQ_delete		3
224 #define REQ_exit		4
225 #define REQ_hold		5
226 #define REQ_info		6
227 #define REQ_list		7
228 #define REQ_next		8
229 #define REQ_raise		9
230 #define REQ_release		10
231 #define REQ_set			11
232 #define REQ_stop		12
233 #define REQ_test		13
234 #define REQ_wait		14
235 
236 static const Request_t	requests[] =
237 {
238 	"a*ll",		REQ_all,	1,	1,
239 	"c*lear",	REQ_clear,	1,	-1,
240 	"d*elete",	REQ_delete,	1,	-1,
241 	"e*xit",	REQ_exit,	0,	0,
242 	"hold",		REQ_hold,	0,	-1,
243 	"i*nfo",	REQ_info,	0,	0,
244 	"l*ist",	REQ_list,	0,	1,
245 	"n*ext",	REQ_next,	0,	0,
246 	"q*uit",	REQ_exit,	0,	0,
247 	"r*aise",	REQ_raise,	1,	-1,
248 	"release",	REQ_release,	0,	-1,
249 	"s*et",		REQ_set,	0,	0,
250 	"stop",		REQ_stop,	0,	0,
251 	"t*est",	REQ_test,	1,	1,
252 	"w*ait",	REQ_wait,	1,	1,
253 };
254 
255 /*
256  * generate a user response and log message
257  */
258 
259 static void
log(State_t * state,Connection_t * con,int type,const char * format,...)260 log(State_t* state, Connection_t* con, int type, const char* format, ...)
261 {
262 	va_list		ap;
263 	char*		s;
264 	int		n;
265 
266 	va_start(ap, format);
267 	if (format)
268 		sfvprintf(state->tmp, format, ap);
269 	va_end(ap);
270 	if (type)
271 	{
272 		if (!(s = sfstruse(state->tmp)))
273 			error(ERROR_SYSTEM|3, "out of space");
274 		if (type != 'I' && state->log && state->logf)
275 			sfprintf(state->logf, "%s (%03d) %c %s\n", fmttime("%K", time(NiL)), con ? con->fd : 0, toupper(type), s);
276 		if (con && type != 'R' && type != 'S')
277 		{
278 			if (type != 'L' || !con->quiet)
279 				debug_printf(con->fd, "%c %s\n", toupper(type), s);
280 			if (type == 'W')
281 				con->code |= 1;
282 			else if (type == 'E')
283 				con->code |= 2;
284 		}
285 	}
286 }
287 
288 /*
289  * accept a new connection
290  */
291 
292 static int
acceptf(Css_t * css,Cssfd_t * fp,Csid_t * ip,char ** av,Cssdisc_t * disc)293 acceptf(Css_t* css, Cssfd_t* fp, Csid_t* ip, char** av, Cssdisc_t* disc)
294 {
295 	register State_t*	state = (State_t*)disc;
296 	register Connection_t*	con;
297 
298 	if (!(con = newof(0, Connection_t, 1, 0)))
299 		return -1;
300 	fp->data = con;
301 	con->id = *ip;
302 	con->waiting = 0;
303 	con->fd = fp->fd;
304 	state->active++;
305 	dtinsert(state->connections, con);
306 	log(state, con, 'S', "accept connection -- %d active", state->active);
307 	return fp->fd;
308 }
309 
310 /*
311  * notify connections waiting on ep
312  */
313 
314 static int
notify(State_t * state,Event_t * ep)315 notify(State_t* state, Event_t* ep)
316 {
317 	Connection_t*	cp;
318 	Waiting_t*	wp;
319 	char*		s;
320 	size_t		n;
321 
322 	for (cp = (Connection_t*)dtfirst(state->connections); cp; cp = (Connection_t*)dtnext(state->connections, cp))
323 		if (cp->waiting && (wp = (Waiting_t*)dtmatch(cp->waiting, &ep)))
324 		{
325 			if (wp->id >= 0)
326 			{
327 				log(state, cp, 'x', "%d 0", wp->id);
328 				n = sfstrtell(state->usrf);
329 				if (!(s = sfstruse(state->usrf)))
330 					error(ERROR_SYSTEM|3, "out of space");
331 				write(cp->fd, s, n);
332 			}
333 			else if (!cp->quiet)
334 				log(state, cp, 'i', "%s raised", ep->name);
335 			n = ep->waiting == 1;
336 			dtdelete(cp->waiting, wp);
337 			if (n)
338 				break;
339 		}
340 	return 0;
341 }
342 
343 /*
344  * post pending event name for connection
345  */
346 
347 static int
post(State_t * state,Connection_t * con,const char * name,int id)348 post(State_t* state, Connection_t* con, const char* name, int id)
349 {
350 	Event_t*	ep;
351 	Waiting_t*	wp;
352 
353 	if (!con->waiting && !(con->waiting = dtopen(&state->waitdisc, Dtset)))
354 	{
355 		error(ERROR_SYSTEM|3, "out of space [waiting]");
356 		return -1;
357 	}
358 	if (ep = dtmatch(state->events, name))
359 	{
360 		if (dtmatch(con->waiting, &ep))
361 			return 0;
362 	}
363 	else if (!(ep = newof(0, Event_t, 1, 0)))
364 	{
365 		error(ERROR_SYSTEM|3, "out of space [event]");
366 		return -1;
367 	}
368 	else
369 	{
370 		strcpy(ep->name, name);
371 		dtinsert(state->events, ep);
372 	}
373 	if (!(wp = newof(0, Waiting_t, 1, 0)))
374 	{
375 		error(ERROR_SYSTEM|3, "out of space [waiting]");
376 		return -1;
377 	}
378 	ep->waiting++;
379 	wp->id = id;
380 	wp->event = ep;
381 	dtinsert(con->waiting, wp);
382 	return 0;
383 }
384 
385 /*
386  * list server info/state
387  */
388 
389 static int
info(State_t * state,Connection_t * con,Css_t * css)390 info(State_t* state, Connection_t* con, Css_t* css)
391 {
392 	Connection_t*	cp;
393 	Waiting_t*	wp;
394 	int		n;
395 
396 	log(state, con, 'I', "info server='%s' version=%d.%d host=%s pid=%d uid=%d gid=%d", fmtident(usage), EVENT_MAJOR, EVENT_MINOR, csname(css->state, 0), getpid(), geteuid(), getegid());
397 	log(state, con, 'I', "info active=%d", state->active);
398 	for (cp = (Connection_t*)dtfirst(state->connections); cp; cp = (Connection_t*)dtnext(state->connections, cp))
399 		if (cp->waiting && (n = dtsize(cp->waiting)) > 0)
400 		{
401 			log(state, con, 0, "waiting connection=%d count=%d", cp->fd, n);
402 			for (wp = (Waiting_t*)dtfirst(cp->waiting); wp; wp = (Waiting_t*)dtnext(cp->waiting, wp))
403 				log(state, con, 0, " %s", wp->event->name);
404 			log(state, con, 'I', 0);
405 		}
406 	log(state, con, 'I', "done");
407 	return 0;
408 }
409 
410 static int	request(State_t*, Connection_t*, int, int, char**, unsigned long, unsigned long);
411 
412 /*
413  * apply request r to one key
414  */
415 
416 static int
apply(State_t * state,Connection_t * con,int id,int index,datum key,datum val,Data_t * dat)417 apply(State_t* state, Connection_t* con, int id, int index, datum key, datum val, Data_t* dat)
418 {
419 	Event_t*	e;
420 	int		n;
421 
422 	switch (index)
423 	{
424 	case REQ_clear:
425 		dat->flags |= DATA_clear;
426 		dat->time = time(NiL);
427 		val.dptr = (void*)dat;
428 		val.dsize = sizeof(*dat);
429 		if (!(n = dbm_store(state->dbm, key, val, DBM_INSERT)) || n > 0 && !dbm_store(state->dbm, key, val, DBM_REPLACE))
430 			log(state, con, 'L', "%s cleared", key.dptr);
431 		else if (!dbm_error(state->dbm))
432 			log(state, con, 'W', "%s unchanged", key.dptr);
433 		else
434 		{
435 			dbm_clearerr(state->dbm);
436 			log(state, con, 'E', "%s io error", key.dptr);
437 		}
438 		break;
439 	case REQ_delete:
440 		if (!dbm_delete(state->dbm, key))
441 		{
442 			log(state, con, 'L', "%s deleted", key.dptr);
443 			return 1;
444 		}
445 		else if (!dbm_error(state->dbm))
446 			log(state, con, 'W', "%s not in db", key.dptr);
447 		else
448 		{
449 			dbm_clearerr(state->dbm);
450 			log(state, con, 'E', "%s io error", key.dptr);
451 		}
452 		break;
453 	case REQ_hold:
454 		dat->flags |= DATA_hold;
455 		dat->time = time(NiL);
456 		val.dptr = (void*)dat;
457 		val.dsize = sizeof(*dat);
458 		if (!(n = dbm_store(state->dbm, key, val, DBM_INSERT)) || n > 0 && !dbm_store(state->dbm, key, val, DBM_REPLACE))
459 			log(state, con, 'L', "%s held", key.dptr);
460 		else if (!dbm_error(state->dbm))
461 			log(state, con, 'W', "%s unchanged", key.dptr);
462 		else
463 		{
464 			dbm_clearerr(state->dbm);
465 			log(state, con, 'E', "%s io error", key.dptr);
466 		}
467 		break;
468 	case REQ_raise:
469 		dat->flags &= ~DATA_clear;
470 		dat->time = time(NiL);
471 		dat->raise++;
472 		val.dptr = (void*)dat;
473 		val.dsize = sizeof(*dat);
474 		if (!(n = dbm_store(state->dbm, key, val, DBM_INSERT)) || n > 0 && !dbm_store(state->dbm, key, val, DBM_REPLACE))
475 		{
476 			if (!state->hold && (e = (Event_t*)dtmatch(state->events, key.dptr)))
477 				notify(state, e);
478 			log(state, con, 'I', "%s raised", key.dptr);
479 		}
480 		else if (!dbm_error(state->dbm))
481 			log(state, con, 'W', "%s unchanged", key.dptr);
482 		else
483 		{
484 			dbm_clearerr(state->dbm);
485 			log(state, con, 'E', "%s io error", key.dptr);
486 		}
487 		break;
488 	case REQ_release:
489 		if (dat->flags & DATA_hold)
490 		{
491 			dat->flags &= ~DATA_hold;
492 			if (dat->raise)
493 			{
494 				val.dptr = (void*)dat;
495 				val.dsize = sizeof(*dat);
496 				if (!(n = dbm_store(state->dbm, key, val, DBM_INSERT)) || n > 0 && !dbm_store(state->dbm, key, val, DBM_REPLACE))
497 					log(state, con, 'L', "%s released", key.dptr);
498 				else if (!dbm_error(state->dbm))
499 					log(state, con, 'W', "%s unchanged", key.dptr);
500 				else
501 				{
502 					dbm_clearerr(state->dbm);
503 					log(state, con, 'E', "%s io error", key.dptr);
504 				}
505 				if (e = (Event_t*)dtmatch(state->events, key.dptr))
506 					notify(state, e);
507 			}
508 			else if (!dbm_delete(state->dbm, key))
509 				log(state, con, 'L', "%s deleted", key.dptr);
510 			else if (!dbm_error(state->dbm))
511 				log(state, con, 'W', "%s not in db", key.dptr);
512 			else
513 			{
514 				dbm_clearerr(state->dbm);
515 				log(state, con, 'E', "%s io error", key.dptr);
516 			}
517 		}
518 		break;
519 	case REQ_test:
520 		if (val.dptr && !(dat->flags & DATA_clear))
521 		{
522 			if (state->hold)
523 				log(state, con, 'W', "%s global hold", key.dptr);
524 			else if (dat->flags & DATA_hold)
525 				log(state, con, 'W', "%s explicit hold", key.dptr);
526 			else
527 				log(state, con, 'I', "%s raised", key.dptr);
528 		}
529 		else
530 			log(state, con, 'I', "%s not-raised", key.dptr);
531 		break;
532 	case REQ_wait:
533 		if (val.dptr && !state->hold && !(dat->flags & (DATA_clear|DATA_hold)))
534 			log(state, con, 'I', "%s raised", key.dptr);
535 		else if (post(state, con, key.dptr, id))
536 			return -1;
537 		break;
538 	}
539 	return 0;
540 }
541 
542 /*
543  * apply request r to args a
544  */
545 
546 static int
request(State_t * state,Connection_t * con,int id,int index,char ** a,unsigned long older,unsigned long newer)547 request(State_t* state, Connection_t* con, int id, int index, char** a, unsigned long older, unsigned long newer)
548 {
549 	char*			s;
550 	int			i;
551 	Event_t*		e;
552 	datum			key;
553 	datum			val;
554 	Data_t			dat;
555 	regex_t			re;
556 	char			buf[64];
557 
558 	while (s = *a++)
559 		if (i = regcomp(&re, s, REG_SHELL|REG_AUGMENTED|REG_LEFT|REG_RIGHT))
560 		{
561 			regerror(i, &re, buf, sizeof(buf));
562 			log(state, con, 'E', "%s: %s", s, buf);
563 		}
564 		else if (regstat(&re)->re_info & REG_LITERAL)
565 		{
566 			if (!EVENT(s))
567 			{
568 				log(state, con, 'E', "%s invalid event name", s);
569 				return -1;
570 			}
571 			key.dptr = (void*)s;
572 			key.dsize = strlen(s) + 1;
573 			if (key.dsize >= sizeof(e->name))
574 				s[(key.dsize = sizeof(e->name)) - 1] = 0;
575 			val = dbm_fetch(state->dbm, key);
576 			if (val.dptr)
577 			{
578 				if (val.dsize > sizeof(dat))
579 					val.dsize = sizeof(dat);
580 				swapmem(state->swap, val.dptr, &dat, sizeof(dat));
581 			}
582 			else
583 				memset(&dat, 0, sizeof(dat));
584 			if (apply(state, con, id, index, key, val, &dat))
585 				return -1;
586 		}
587 		else
588 		{
589 		rescan:
590 			for (key = dbm_firstkey(state->dbm); key.dptr; key = dbm_nextkey(state->dbm))
591 				if (EVENT(key.dptr) && !regexec(&re, key.dptr, 0, NiL, 0))
592 				{
593 					val = dbm_fetch(state->dbm, key);
594 					if (val.dsize > sizeof(dat))
595 						val.dsize = sizeof(dat);
596 					swapmem(state->swap, val.dptr, &dat, val.dsize);
597 					if ((!older || dat.time < older) && (!newer || dat.time > newer))
598 					{
599 						if ((i = apply(state, con, id, index, key, val, &dat)) < 0)
600 							return -1;
601 						if (i > 0)
602 							goto rescan;
603 					}
604 				}
605 		}
606 	return 0;
607 }
608 
609 /*
610  * convert s to a date/time
611  */
612 
613 static unsigned long
date(State_t * state,Connection_t * con,const char * s)614 date(State_t* state, Connection_t* con, const char* s)
615 {
616 	unsigned long	t;
617 	char*		e;
618 	datum		key;
619 	datum		val;
620 	Data_t		dat;
621 
622 	key.dptr = (void*)s;
623 	key.dsize = strlen(s) + 1;
624 	val = dbm_fetch(state->dbm, key);
625 	if (val.dptr)
626 	{
627 		swapmem(state->swap, val.dptr, &dat, val.dsize);
628 		t = dat.time;
629 	}
630 	else
631 	{
632 		t = tmdate(s, &e, NiL);
633 		if (*e)
634 		{
635 			log(state, con, 'E', "%s: invalid date/time", s);
636 			t = 0;
637 		}
638 	}
639 	return t;
640 }
641 
642 /*
643  * service a request
644  */
645 
646 static int
actionf(register Css_t * css,register Cssfd_t * fp,Cssdisc_t * disc)647 actionf(register Css_t* css, register Cssfd_t* fp, Cssdisc_t* disc)
648 {
649 	register State_t*	state = (State_t*)disc;
650 	register Connection_t*	con;
651 	char*			s;
652 	char*			t;
653 	char**			a;
654 	char**			q;
655 	Cssfd_t*		f;
656 	Request_t*		r;
657 	Event_t*		e;
658 	Waiting_t*		w;
659 	Connection_t*		x;
660 	int			n;
661 	int			err;
662 	int			id;
663 	unsigned long		older;
664 	unsigned long		newer;
665 	datum			key;
666 	datum			val;
667 	Data_t			data;
668 	char			buf[64];
669 
670 	switch (fp->status)
671 	{
672 	case CS_POLL_CLOSE:
673 		if (con = (Connection_t*)fp->data)
674 			dtdelete(state->connections, con);
675 		return 0;
676 	case CS_POLL_READ:
677 		con = (Connection_t*)fp->data;
678 		if ((n = csread(css->state, fp->fd, state->req, sizeof(state->req), CS_LINE)) <= 0)
679 			return -1;
680 		state->req[--n] = 0;
681 		log(state, con, 'R', "%s", state->req);
682 		con->code = 0;
683 		if (tokscan(state->req, NiL, " %v ", state->cmd, elementsof(state->cmd) - 1) > 0)
684 		{
685 			id = -1;
686 			for (q = state->cmd; (s = *q) && (isalpha(*s) || *s == '_'); q++)
687 			{
688 				while (isalnum(*++s));
689 				if (*s != '=')
690 					break;
691 				if ((s - *q) == 2 && *(s - 1) == 'd' && *(s - 2) == 'i')
692 					id = (int)strtol(s + 1, NiL, 0);
693 			}
694 			s = *(a = q);
695 			if (!(r = (Request_t*)strpsearch(requests, elementsof(requests), sizeof(requests[0]), s, NiL)))
696 				log(state, con, 'E', "%s: unknown request", s);
697 			else
698 			{
699 				opt_info.index = 0;
700 				newer = older = 0;
701 				err = 0;
702 				sfstrseek(state->usrf, 0, SEEK_SET);
703 				for (;;)
704 				{
705 					switch (optget(a, usage))
706 					{
707 					case 'e':
708 						if (r->index == REQ_set)
709 						{
710 							state->expire = strelapsed(opt_info.arg, &t, 1);
711 							if (*t)
712 							{
713 								log(state, con, 'E', "%s: invalid elapsed time expression", opt_info.arg);
714 								err = 1;
715 								break;
716 							}
717 						}
718 						continue;
719 					case 'l':
720 						if (r->index == REQ_set)
721 							state->log = opt_info.num;
722 						continue;
723 					case 'n':
724 						newer = date(state, con, opt_info.arg);
725 						continue;
726 					case 'o':
727 						older = date(state, con, opt_info.arg);
728 						continue;
729 					case 'q':
730 						con->quiet = opt_info.num;
731 						continue;
732 					case '?':
733 					case ':':
734 						log(state, con, 'E', "%s: %s", s, opt_info.arg);
735 						err = 1;
736 						break;
737 					}
738 					break;
739 				}
740 				if (!err)
741 				{
742 					if (!*(a += opt_info.index))
743 					{
744 						if (newer || older)
745 						{
746 							a[0] = "*";
747 							a[1] = 0;
748 							n = 1;
749 						}
750 						else
751 							n = 0;
752 					}
753 					else
754 						n = a[1] ? 2 : 1;
755 					if (r->min && n < r->min)
756 						sfprintf(state->usrf, "E %s: at least %d argument%s expected\n", s, r->min, r->min == 1 ? "" : "s");
757 					else if (r->max > 0 && n > r->max)
758 						log(state, con, 'E', "%s: at most %d argument%s expected", s, r->max, r->max == 1 ? "" : "s");
759 					else if (r->min == r->max && n != r->max)
760 						log(state, con, 'E', "%s: %d argument%s expected", s, r->max, r->max == 1 ? "" : "s");
761 					else
762 						switch (r->index)
763 						{
764 						case REQ_all:
765 							n = (int)strtol(a[0], &t, 0);
766 							if (*t)
767 							{
768 								log(state, con, 'E', "%s: invalid numeric value", a[0]);
769 								break;
770 							}
771 							else if (!(f = cssfd(css, n, 0)) || !(x = (Connection_t*)f->data))
772 							{
773 								log(state, con, 'E', "%d: invalid connection index", n);
774 								break;
775 							}
776 							if (x->waiting)
777 							{
778 								n = x->quiet;
779 								x->quiet = 1;
780 								a = state->cmd;
781 								for (w = (Waiting_t*)dtfirst(x->waiting); w; w = (Waiting_t*)dtnext(x->waiting, w))
782 								{
783 									if (a >= &state->cmd[elementsof(state->cmd)-1])
784 									{
785 										*a = 0;
786 										if (request(state, x, -1, REQ_raise, a = state->cmd, older, newer))
787 											break;
788 									}
789 									*a++ = w->event->name;
790 									log(state, con, 'R', "%s %s", s, w->event->name);
791 								}
792 								if (a > state->cmd)
793 								{
794 									*a = 0;
795 									request(state, x, -1, REQ_raise, state->cmd, older, newer);
796 								}
797 								x->quiet = n;
798 							}
799 							log(state, con, 'I', "done");
800 							break;
801 						case REQ_clear:
802 						case REQ_delete:
803 						case REQ_raise:
804 						case REQ_test:
805 						case REQ_wait:
806 							if (request(state, con, id, r->index, a, older, newer))
807 								return -1;
808 							break;
809 						case REQ_exit:
810 							cssfd(css, fp->fd, CS_POLL_CLOSE);
811 							break;
812 						case REQ_info:
813 							info(state, con, css);
814 							break;
815 						case REQ_hold:
816 							if (!*a)
817 							{
818 								state->hold = 1;
819 								sfprintf(state->usrf, "I holding\n");
820 							}
821 							else
822 								if (request(state, con, id, r->index, a, older, newer))
823 									return -1;
824 							break;
825 						case REQ_list:
826 							con->all = 1;
827 							if (s = *a)
828 							{
829 								if (n = regcomp(&con->re, s, REG_SHELL|REG_AUGMENTED|REG_LEFT|REG_RIGHT))
830 								{
831 									regerror(n, &con->re, buf, sizeof(buf));
832 									log(state, con, 'E', "%s: %s", s, buf);
833 									break;
834 								}
835 								con->all = 0;
836 							}
837 							con->list = dbm_firstkey(state->dbm);
838 							if (!con->list.dptr)
839 							{
840 								log(state, con, 'I', "empty");
841 								break;
842 							}
843 							con->newer = newer;
844 							con->older = older;
845 							goto list;
846 						case REQ_next:
847 							if (!con->list.dptr)
848 							{
849 								log(state, con, 'W', "next: must execute list first");
850 								break;
851 							}
852 							for (;;)
853 							{
854 								con->list = dbm_nextkey(state->dbm);
855 								if (!con->list.dptr)
856 								{
857 									log(state, con, 'I', "done");
858 									break;
859 								}
860 						list:
861 								if (EVENT(con->list.dptr) && (con->all || !regexec(&con->re, con->list.dptr, 0, NiL, 0)))
862 								{
863 									val = dbm_fetch(state->dbm, con->list);
864 									if (val.dsize > sizeof(data))
865 										val.dsize = sizeof(data);
866 									swapmem(state->swap, val.dptr, &data, val.dsize);
867 									if ((!con->older || data.time < con->older) && (!con->newer || data.time > con->newer))
868 									{
869 										log(state, con, 'I', "event %s %s %d%s%s", con->list.dptr, fmttime("%K", data.time), data.raise, (data.flags & DATA_clear) ? " CLEAR" : "", (data.flags & DATA_hold) ? " HOLD" : "");
870 										break;
871 									}
872 								}
873 							}
874 							break;
875 						case REQ_release:
876 							if (!*a)
877 							{
878 								state->hold = 0;
879 								sfprintf(state->usrf, "I released\n");
880 								key = dbm_firstkey(state->dbm);
881 								while (key.dptr)
882 								{
883 									val = dbm_fetch(state->dbm, key);
884 									if (val.dsize > sizeof(data))
885 										val.dsize = sizeof(data);
886 									swapmem(state->swap, val.dptr, &data, val.dsize);
887 									if (!(data.flags & (DATA_clear|DATA_hold)) && (e = (Event_t*)dtmatch(state->events, key.dptr)))
888 										notify(state, e);
889 									key = dbm_nextkey(state->dbm);
890 								}
891 							}
892 							else
893 								if (request(state, con, id, r->index, a, older, newer))
894 									return -1;
895 							break;
896 						case REQ_set:
897 							break;
898 						case REQ_stop:
899 							exit(0);
900 							break;
901 						}
902 				}
903 			}
904 			if (sfstrtell(state->usrf))
905 			{
906 				if (id >= 0)
907 				{
908 					sfstrseek(state->usrf, 0, SEEK_SET);
909 					log(state, con, 'x', "%d %d", id, con->code);
910 				}
911 				n = sfstrtell(state->usrf);
912 				if (!(s = sfstruse(state->usrf)))
913 					error(ERROR_SYSTEM|3, "out of space");
914 				if (cswrite(css->state, fp->fd, s, n) != n)
915 					return -1;
916 			}
917 		}
918 		return 1;
919 	}
920 	return 0;
921 }
922 
923 /*
924  * handle exceptions
925  */
926 
927 static int
exceptf(Css_t * css,unsigned long op,unsigned long arg,Cssdisc_t * disc)928 exceptf(Css_t* css, unsigned long op, unsigned long arg, Cssdisc_t* disc)
929 {
930 	register State_t*	state = (State_t*)disc;
931 
932 	switch (op)
933 	{
934 	case CSS_CLOSE:
935 		if (state->dbm)
936 		{
937 			dbm_close(state->dbm);
938 			state->dbm = 0;
939 		}
940 		return 0;
941 	case CSS_INTERRUPT:
942 		error(ERROR_SYSTEM|3, "%s: interrupt exit", fmtsignal(arg));
943 		return 0;
944 	case CSS_WAKEUP:
945 		error(2, "wakeup");
946 		return 0;
947 	}
948 	error(ERROR_SYSTEM|3, "poll error op=0x%08x arg=0x%08x", op, arg);
949 	return -1;
950 }
951 
952 /*
953  * free connection
954  */
955 
956 static void
confree(Dt_t * dt,void * obj,Dtdisc_t * disc)957 confree(Dt_t* dt, void* obj, Dtdisc_t* disc)
958 {
959 	State_t*	state = (State_t*)((char*)disc - offsetof(State_t, condisc));
960 	Connection_t*	con = (Connection_t*)obj;
961 
962 	NoP(dt);
963 	NoP(disc);
964 	state->active--;
965 	if (con->waiting)
966 		dtclose(con->waiting);
967 	log(state, con, 'S', "drop connection -- %d active", state->active);
968 	free(obj);
969 }
970 
971 /*
972  * free event
973  */
974 
975 static void
eventfree(Dt_t * dt,void * obj,Dtdisc_t * disc)976 eventfree(Dt_t* dt, void* obj, Dtdisc_t* disc)
977 {
978 	NoP(dt);
979 	NoP(disc);
980 	free(obj);
981 }
982 
983 /*
984  * free connection pending event
985  */
986 
987 static void
waitfree(Dt_t * dt,void * obj,Dtdisc_t * disc)988 waitfree(Dt_t* dt, void* obj, Dtdisc_t* disc)
989 {
990 	State_t*	state = (State_t*)((char*)disc - offsetof(State_t, waitdisc));
991 	Waiting_t*	p = (Waiting_t*)obj;
992 
993 	NoP(dt);
994 	if (--p->event->waiting == 0)
995 		dtdelete(state->events, p->event);
996 	free(obj);
997 }
998 
999 /*
1000  * open and verify event db
1001  */
1002 
1003 static void
db(register State_t * state)1004 db(register State_t* state)
1005 {
1006 	datum		key;
1007 	datum		val;
1008 	Data_t		data;
1009 	uint32_t	u4;
1010 
1011 	if (!(state->dbm = dbm_open(state->path, O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH)))
1012 		error(ERROR_SYSTEM|3, "%s: cannot open database for read/write", state->path);
1013 	key.dptr = (void*)ident_key;
1014 	key.dsize = sizeof(ident_key);
1015 	val = dbm_fetch(state->dbm, key);
1016 	if (val.dptr)
1017 	{
1018 		if (val.dsize != sizeof(data))
1019 			error(3, "%s: invalid db -- data size %d, expected %d", state->path, val.dsize, sizeof(data));
1020 		memcpy(&data, val.dptr, val.dsize);
1021 		if (memcmp(&data.expire, ident_name, sizeof(data.expire)))
1022 			error(3, "%s: %s: invalid db -- ident mismatch, expected %s", state->path, ident_key, ident_name);
1023 		u4 = IDENT_SWAP;
1024 		if (state->swap = swapop(&u4, &data.time, 4))
1025 			swapmem(state->swap, &data, &data, sizeof(data));
1026 	}
1027 	else
1028 	{
1029 		val = dbm_firstkey(state->dbm);
1030 		if (val.dptr)
1031 			error(3, "%s: %s: invalid db -- ident entry expected", state->path, ident_key);
1032 		memset(&data, 0, sizeof(data));
1033 		memcpy(&data.expire, ident_name, sizeof(data.expire));
1034 		data.time = IDENT_SWAP;
1035 		data.raise = IDENT_VERSION;
1036 		val.dptr = (void*)&data;
1037 		val.dsize = sizeof(data);
1038 		if (dbm_store(state->dbm, key, val, DBM_INSERT))
1039 		{
1040 			dbm_clearerr(state->dbm);
1041 			error(3, "%s: %s: db initial ident entry store failed", state->path, ident_key);
1042 		}
1043 	}
1044 	state->major = (data.raise >> 16) & 0xffff;
1045 	state->minor = data.raise & 0xffff;
1046 }
1047 
1048 /*
1049  * client/server main
1050  */
1051 
1052 int
main(int argc,char ** argv)1053 main(int argc, char** argv)
1054 {
1055 	char*			s;
1056 	Css_t*			css;
1057 
1058 	char*			p = 0;
1059 	int			server = 0;
1060 
1061 	static State_t		state;
1062 
1063 	NoP(argc);
1064 	setlocale(LC_ALL, "");
1065 	opt_info.argv = argv;
1066 	state.log = 1;
1067 	error_info.id = (char*)command;
1068 	if (!(state.usrf = sfstropen()) || !(state.tmp = sfstropen()))
1069 		error(3, "out of space [buf]");
1070 
1071 	/*
1072 	 * check the options
1073 	 */
1074 
1075 	for (;;)
1076 	{
1077 		switch (optget(argv, usage))
1078 		{
1079 		case 'c':
1080 			p = opt_info.arg;
1081 			continue;
1082 		case 'e':
1083 			state.expire = strelapsed(opt_info.arg, &s, 1);
1084 			if (*s)
1085 				error(2, "%s: invalid elapsed time expression", opt_info.arg);
1086 			continue;
1087 		case 'i':
1088 			server = 1;
1089 			continue;
1090 		case 'l':
1091 			state.log = opt_info.num;
1092 			continue;
1093 		case '?':
1094 			error(ERROR_USAGE|4, "%s", opt_info.arg);
1095 			continue;
1096 		case ':':
1097 			error(2, "%s", opt_info.arg);
1098 			continue;
1099 		}
1100 		break;
1101 	}
1102 	if (error_info.errors || !(state.path = *(argv += opt_info.index)))
1103 		error(ERROR_USAGE|4, "%s", optusage(NiL));
1104 
1105 	/*
1106 	 * get the connect stream path
1107 	 */
1108 
1109 	if (s = strrchr(state.path, '/'))
1110 		s++;
1111 	else
1112 		s = state.path;
1113 	if (p)
1114 		sfprintf(state.usrf, "%s/%s", p, s);
1115 	else
1116 		sfprintf(state.usrf, "/dev/tcp/local/%s/%s", error_info.id, s);
1117 	if (!(state.service = strdup(sfstruse(state.usrf))))
1118 		error(3, "out of space [service]");
1119 
1120 	/*
1121 	 * either server or client at this point
1122 	 */
1123 
1124 	if (server)
1125 	{
1126 		umask(S_IWOTH);
1127 		db(&state);
1128 		state.condisc.link = offsetof(Event_t, link);
1129 		state.condisc.freef = confree;
1130 		if (!(state.connections = dtopen(&state.condisc, Dtlist)))
1131 			error(ERROR_SYSTEM|3, "out of space [connection dictionary]");
1132 		state.eventdisc.link = offsetof(Event_t, link);
1133 		state.eventdisc.key = offsetof(Event_t, name);
1134 		state.eventdisc.freef = eventfree;
1135 		if (!(state.events = dtopen(&state.eventdisc, Dtoset)))
1136 			error(ERROR_SYSTEM|3, "out of space [event dictionary]");
1137 		state.waitdisc.link = offsetof(Waiting_t, link);
1138 		state.waitdisc.key = offsetof(Waiting_t, event);
1139 		state.waitdisc.size = sizeof(Event_t*);
1140 		state.waitdisc.freef = waitfree;
1141 		state.disc.version = CSS_VERSION;
1142 		state.disc.flags = CSS_DAEMON|CSS_LOG|CSS_ERROR|CSS_INTERRUPT|CSS_WAKEUP|CSS_PRESERVE;
1143 		state.disc.timeout = 60 * 60 * 1000L;
1144 		state.disc.acceptf = acceptf;
1145 		state.disc.actionf = actionf;
1146 		state.disc.errorf = errorf;
1147 		state.disc.exceptf = exceptf;
1148 		if (!(css = cssopen(state.service, &state.disc)))
1149 			return 1;
1150 		umask(S_IWOTH);
1151 		error_info.id = css->id;
1152 		if (state.log)
1153 		{
1154 			sfprintf(state.tmp, "%s.log", state.path);
1155 			if (!(s = sfstruse(state.tmp)))
1156 				error(ERROR_SYSTEM|3, "out of space");
1157 			if (state.logf = sfopen(NiL, s, "a"))
1158 				sfset(state.logf, SF_LINE, 1);
1159 			else
1160 				error(ERROR_SYSTEM|2, "%s: cannot append log file", s);
1161 		}
1162 		log(&state, 0, 'S', "start service %s", fmtident(usage));
1163 		csspoll(CS_NEVER, 0);
1164 		log(&state, 0, 'S', "stop service");
1165 		return 1;
1166 	}
1167 	return csclient(&cs, -1, state.service, "event> ", argv + 1, CS_CLIENT_ARGV|CS_CLIENT_SEP);
1168 }
1169 
1170 #endif
1171