1 /***********************************************************************
2 * *
3 * This software is part of the ast package *
4 * Copyright (c) 1990-2011 AT&T Intellectual Property *
5 * and is licensed under the *
6 * Eclipse Public License, Version 1.0 *
7 * by AT&T Intellectual Property *
8 * *
9 * A copy of the License is available at *
10 * http://www.eclipse.org/org/documents/epl-v10.html *
11 * (with md5 checksum b35adb5213ca9657e911e9befb180842) *
12 * *
13 * Information and Software Systems Research *
14 * AT&T Research *
15 * Florham Park NJ *
16 * *
17 * Glenn Fowler <gsf@research.att.com> *
18 * *
19 ***********************************************************************/
20 #pragma prototyped
21
22 /*
23 * Glenn Fowler
24 * AT&T Research
25 *
26 * shared event daemon
27 */
28
/*
 * event service version: reported by the info request and packed
 * into IDENT_VERSION for the db ident entry
 */
#define EVENT_MAJOR	1
#define EVENT_MINOR	0
31
32 static const char usage[] =
33 "[-?\n@(#)$Id: event (AT&T Research) 2007-06-05 $\n]"
34 USAGE_LICENSE
35 "[+NAME?event - shared event client and server]"
36 "[+DESCRIPTION?\bevent\b is a shared event client and server. Events are "
37 "stored in a persistent database named by the \aname\a operand. Each "
38 "event has a name, an expiration, and a binary status \braised\b or "
39 "\bnot-raised\b. A non-existent event is \bnot-raised\b. Events may be "
40 "raised, deleted, cleared, tested and waited for. If no \brequest\b "
41 "operands are specified then requests are prompted for, with an "
42 "\bEVENT>\b prompt, and read from the standard input. Multiple command "
43 "line requests must be separated by \b:\b. In the following events "
44 "operands are matched by \bksh\b(1) patterns. The client requests are:]"
45 "{"
46 "[+all \aconnection\a?Raise all pending events for the "
47 "\aconnection\a. Mainly for debugging.]"
48 "[+clear \aevent\a ...?Mark \aevent\a not-raised but do not "
49 "delete from the database. This allows the events to be matched "
50 "by patterns.]"
51 "[+delete \aevent\a ...?Delete \aevent\a.]"
52 "[+exit?Close the client connection.]"
53 "[+hold [ \aevent\a ...]]?If \aevent\a operands are specified "
54 "then clients are not notified about the those events until they "
55 "are explicitly released by \brelease\b \aevent\a ... If no "
56 "events are specified then all current and future events will be "
57 "unconditionally held until a \brelease\b with no event "
58 "operands.]"
59 "[+info?List the server status pending events by client "
60 "connection. The list is terminated by a \bdone\b message.]"
61 "[+list [ \apattern\a ]]?Start an event dictionary scan and list "
62 "the first event. If \apattern\a is specified then only events "
63 "matching \apattern\a are listed.]"
64 "[+next?List the next event in the \blist\b event scan. The list "
65 "is terminated by a \bdone\b message.]"
66 "[+quit?Equivalent to exit.]"
67 "[+raise \aevent\a ...?Raise \aevent\a ...]"
68 "[+release [ \aevent\a ...]]?If \aevent\a operands are specified "
69 "then they are released from a previous \bhold\b \aevent\a ... "
70 "If no \aevent\a operands are specified then any previous "
71 "unconditional \bhold\b is turned off.]"
72 "[+set \aoption\a ...?]"
73 "[+stop?Terminate the server. Persistent data is preserved.]"
74 "[+test \aevent\a?Determine the \aevent\a status.]"
75 "[+wait \aevent\a?Wait for \aevent\a status to be \braised\b.]"
76 "}"
77 "[+?The \b--cs\b, \b--expire\b, \b--initialize\b, and \b--log\b options "
78 "apply to the initial service command, and the \b--expire\b, \b--log\b, "
79 "\b--newer\b, \b--older\b, and \b--quiet\b options apply to client "
80 "requests.]"
81 "[c:cs|connect-stream?Use \aconnect-stream\a instead of the default.]"
82 ":[connect-stream:=/dev/tcp/local/event]"
83 "[e:expire?Set the current event expiration to the \bdate\b(1) or "
84 "\bcron\b(1) expression \adate-expression\a.]:[date-expression]"
85 "[i:initialize?Initialize the service if it is not already running.]"
86 "[l!:log?Log server activity to \astate-name\a.log, where \astate-name\a "
87 "is the state path name sans suffix.]"
88 "[n:newer?Match events newer than \adate\a. If \b--older\b is also "
89 "specified then only event times > newer and < older match.]:[date]"
90 "[o:older?Match events older than \adate\a. If \b--newer\b is also "
91 "specified then only event times > newer and < older match.]:[date]"
92 "[q:quiet?Suppress request confirmation messages.]"
93 "\n"
94 "\nname [ request [ options ] [ arg ... ] ] [ : request ... ]\n"
95 "\n"
96 "[+CAVEATS?Expirations, logging and the \bset\b request are not "
97 "implemented yet.]"
98 "[+SEE ALSO?\bcoshell\b(1), \bcs\b(1), \bnmake\b(1), \bdbm\b(3), "
99 "\bndbm\b(3), \bgdbm\b(3)]"
100 ;
101
static const char	command[] = "event";	/* initial error_info.id */

/*
 * the db ident entry: db() looks it up by ident_key and checks that the
 * first Data_t field holds the ident_name bytes
 */
static const char	ident_key[] = "'//\t<(IDENT)>\t\\\\'";
static const char	ident_name[] = "EVEN";

#define IDENT_SWAP	0x01020304	/* byte order probe kept in the ident entry time field */
#define IDENT_VERSION	((EVENT_MAJOR<<16)|(EVENT_MINOR))	/* kept in the ident entry raise field */

/* true if key s does not start with the first ident_key character */
#define EVENT(s)	(*((char*)(s))!=ident_key[0])
#define log		_log	/* gnu builtin? you've got to be kidding */
112
113 #include <ast.h>
114 #include <cdt.h>
115 #include <css.h>
116 #include <ctype.h>
117 #include <debug.h>
118 #include <error.h>
119 #include <namval.h>
120 #include <ls.h>
121 #include <regex.h>
122 #include <stdarg.h>
123 #include <swap.h>
124 #include <tok.h>
125 #include <tm.h>
126 #include <ast_ndbm.h>
127
128 #if !_use_ndbm
129
/*
 * stand-in entry point for builds without ndbm support:
 * all it can do is report the missing library and fail
 */

int
main(int argc, char** argv)
{
	/* neither argument is used in this configuration */
	NoP(argc);
	NoP(argv);

	error(3, "<ndbm.h> library required");
	return 1;
}
138
139 #else
140
#define KEY_MAX		64		/* max key length + 1 */

typedef struct Key_s Key_t;

/* Data_t.flags bits */
#define DATA_clear	(1<<0)		/* explicit clear */
#define DATA_hold	(1<<1)		/* explicit hold */

typedef uint32_t Number_t;

/*
 * per-event data: the value stored in the db under the event name
 */
struct Data_s
{
	Number_t	expire;		/* expiration seconds since epoch	*/
	Number_t	time;		/* last raise time			*/
	Number_t	raise;		/* total # raise requests		*/
	Number_t	flags;		/* DATA_* flags				*/
};

typedef struct Data_s Data_t;
157
158 typedef struct Event_s /* event bucket */
159 {
160 Dtlink_t link; /* dictionary link */
161 unsigned int waiting; /* # clients waiting */
162 Data_t data; /* event persistent data */
163 char name[256]; /* event name */
164 } Event_t;
165
166 typedef struct Waiting_s /* pending event bucket */
167 {
168 Dtlink_t link; /* dictionary link */
169 int id; /* wait id */
170 Event_t* event; /* event bucket pointer */
171 } Waiting_t;
172
173 typedef struct Connection_s /* client connection state */
174 {
175 Dtlink_t link; /* list link */
176 Csid_t id; /* connection id */
177 Dt_t* waiting; /* pending events */
178 datum list; /* list finger */
179 int fd; /* connection fd */
180 int all; /* list all vs. list match */
181 int code; /* request exit code */
182 int quiet; /* suppress response log messages */
183 unsigned long newer; /* list --newer time */
184 unsigned long older; /* list --older time */
185 regex_t re; /* list request pattern */
186 } Connection_t;
187
/*
 * static description of one client request; see requests[]
 */
struct Request_s
{
	const char*	name;		/* request name			*/
	int		index;		/* REQ_* index			*/
	int		min;		/* min #args			*/
	int		max;		/* max #args			*/
};

typedef struct Request_s Request_t;
195
196 typedef struct State_s /* program state */
197 {
198 Cssdisc_t disc; /* css discipline */
199 Dtdisc_t condisc; /* connection dictionary discipline */
200 Dtdisc_t eventdisc; /* event dictionary discipline */
201 Dtdisc_t waitdisc; /* pending events dictionary discipline */
202 unsigned int active; /* number of active clients */
203 int hold; /* hold announcements */
204 int log; /* log activity */
205 int major; /* db major version */
206 int minor; /* db major version */
207 int swap; /* datum <=> native int32_ swap */
208 unsigned long expire; /* expiration in seconds */
209 DBM* dbm; /* dbm handle */
210 Dt_t* connections; /* active connection list */
211 Dt_t* events; /* outstanding events dictionary */
212 char* service; /* service connect stream path */
213 char* path; /* event db path */
214 Sfio_t* logf; /* log buffer stream */
215 Sfio_t* usrf; /* usr buffer stream */
216 Sfio_t* tmp; /* tmp buffer stream */
217 char* cmd[1024]; /* request command argv */
218 char req[8 * 1024]; /* request buffer */
219 } State_t;
220
/*
 * request indices: the Request_t.index values that actionf() and
 * apply() switch on
 */
enum
{
	REQ_all		= 1,
	REQ_clear	= 2,
	REQ_delete	= 3,
	REQ_exit	= 4,
	REQ_hold	= 5,
	REQ_info	= 6,
	REQ_list	= 7,
	REQ_next	= 8,
	REQ_raise	= 9,
	REQ_release	= 10,
	REQ_set		= 11,
	REQ_stop	= 12,
	REQ_test	= 13,
	REQ_wait	= 14
};
235
236 static const Request_t requests[] =
237 {
238 "a*ll", REQ_all, 1, 1,
239 "c*lear", REQ_clear, 1, -1,
240 "d*elete", REQ_delete, 1, -1,
241 "e*xit", REQ_exit, 0, 0,
242 "hold", REQ_hold, 0, -1,
243 "i*nfo", REQ_info, 0, 0,
244 "l*ist", REQ_list, 0, 1,
245 "n*ext", REQ_next, 0, 0,
246 "q*uit", REQ_exit, 0, 0,
247 "r*aise", REQ_raise, 1, -1,
248 "release", REQ_release, 0, -1,
249 "s*et", REQ_set, 0, 0,
250 "stop", REQ_stop, 0, 0,
251 "t*est", REQ_test, 1, 1,
252 "w*ait", REQ_wait, 1, 1,
253 };
254
255 /*
256 * generate a user response and log message
257 */
258
259 static void
log(State_t * state,Connection_t * con,int type,const char * format,...)260 log(State_t* state, Connection_t* con, int type, const char* format, ...)
261 {
262 va_list ap;
263 char* s;
264
265 va_start(ap, format);
266 if (format)
267 sfvprintf(state->tmp, format, ap);
268 va_end(ap);
269 if (type)
270 {
271 if (!(s = sfstruse(state->tmp)))
272 error(ERROR_SYSTEM|3, "out of space");
273 if (type != 'I' && state->log && state->logf)
274 sfprintf(state->logf, "%s (%03d) %c %s\n", fmttime("%K", time(NiL)), con ? con->fd : 0, type, s);
275 if (con && type != 'R' && type != 'S')
276 {
277 if (type != 'L' || !con->quiet)
278 sfprintf(state->usrf, "%c %s\n", type, s);
279 if (type == 'W')
280 con->code |= 1;
281 else if (type == 'E')
282 con->code |= 2;
283 }
284 }
285 }
286
287 /*
288 * accept a new connection
289 */
290
291 static int
acceptf(Css_t * css,Cssfd_t * fp,Csid_t * ip,char ** av,Cssdisc_t * disc)292 acceptf(Css_t* css, Cssfd_t* fp, Csid_t* ip, char** av, Cssdisc_t* disc)
293 {
294 register State_t* state = (State_t*)disc;
295 register Connection_t* con;
296
297 if (!(con = newof(0, Connection_t, 1, 0)))
298 return -1;
299 fp->data = con;
300 con->id = *ip;
301 con->waiting = 0;
302 con->fd = fp->fd;
303 state->active++;
304 dtinsert(state->connections, con);
305 log(state, con, 'S', "accept connection -- %d active", state->active);
306 return fp->fd;
307 }
308
309 /*
310 * notify connections waiting on ep
311 */
312
313 static int
notify(State_t * state,Event_t * ep)314 notify(State_t* state, Event_t* ep)
315 {
316 Connection_t* cp;
317 Waiting_t* wp;
318 char* s;
319 size_t n;
320
321 for (cp = (Connection_t*)dtfirst(state->connections); cp; cp = (Connection_t*)dtnext(state->connections, cp))
322 if (cp->waiting && (wp = (Waiting_t*)dtmatch(cp->waiting, &ep)))
323 {
324 if (wp->id >= 0)
325 {
326 log(state, cp, 'x', "%d 0", wp->id);
327 n = sfstrtell(state->usrf);
328 if (!(s = sfstruse(state->usrf)))
329 error(ERROR_SYSTEM|3, "out of space");
330 write(cp->fd, s, n);
331 }
332 else if (!cp->quiet)
333 log(state, cp, 'L', "%s raised", ep->name);
334 n = ep->waiting == 1;
335 dtdelete(cp->waiting, wp);
336 if (n)
337 break;
338 }
339 return 0;
340 }
341
342 /*
343 * post pending event name for connection
344 */
345
346 static int
post(State_t * state,Connection_t * con,const char * name,int id)347 post(State_t* state, Connection_t* con, const char* name, int id)
348 {
349 Event_t* ep;
350 Waiting_t* wp;
351
352 if (!con->waiting && !(con->waiting = dtopen(&state->waitdisc, Dtset)))
353 {
354 error(ERROR_SYSTEM|3, "out of space [waiting]");
355 return -1;
356 }
357 if (ep = dtmatch(state->events, name))
358 {
359 if (dtmatch(con->waiting, &ep))
360 return 0;
361 }
362 else if (!(ep = newof(0, Event_t, 1, 0)))
363 {
364 error(ERROR_SYSTEM|3, "out of space [event]");
365 return -1;
366 }
367 else
368 {
369 strcpy(ep->name, name);
370 dtinsert(state->events, ep);
371 }
372 if (!(wp = newof(0, Waiting_t, 1, 0)))
373 {
374 error(ERROR_SYSTEM|3, "out of space [waiting]");
375 return -1;
376 }
377 ep->waiting++;
378 wp->id = id;
379 wp->event = ep;
380 dtinsert(con->waiting, wp);
381 return 0;
382 }
383
384 /*
385 * list server info/state
386 */
387
388 static int
info(State_t * state,Connection_t * con,Css_t * css)389 info(State_t* state, Connection_t* con, Css_t* css)
390 {
391 Connection_t* cp;
392 Waiting_t* wp;
393 int n;
394
395 log(state, con, 'I', "info server='%s' version=%d.%d host=%s pid=%d uid=%d gid=%d", fmtident(usage), EVENT_MAJOR, EVENT_MINOR, csname(css->state, 0), getpid(), geteuid(), getegid());
396 log(state, con, 'I', "info active=%d", state->active);
397 for (cp = (Connection_t*)dtfirst(state->connections); cp; cp = (Connection_t*)dtnext(state->connections, cp))
398 if (cp->waiting && (n = dtsize(cp->waiting)) > 0)
399 {
400 log(state, con, 0, "waiting connection=%d count=%d", cp->fd, n);
401 for (wp = (Waiting_t*)dtfirst(cp->waiting); wp; wp = (Waiting_t*)dtnext(cp->waiting, wp))
402 log(state, con, 0, " %s", wp->event->name);
403 log(state, con, 'I', 0);
404 }
405 log(state, con, 'I', "done");
406 return 0;
407 }
408
409 static int request(State_t*, Connection_t*, int, int, char**, unsigned long, unsigned long);
410
411 /*
412 * apply request r to one key
413 */
414
415 static int
apply(State_t * state,Connection_t * con,int id,int index,datum key,datum val,Data_t * dat)416 apply(State_t* state, Connection_t* con, int id, int index, datum key, datum val, Data_t* dat)
417 {
418 Event_t* e;
419 int n;
420
421 switch (index)
422 {
423 case REQ_clear:
424 dat->flags |= DATA_clear;
425 dat->time = time(NiL);
426 val.dptr = (void*)dat;
427 val.dsize = sizeof(*dat);
428 if (!(n = dbm_store(state->dbm, key, val, DBM_INSERT)) || n > 0 && !dbm_store(state->dbm, key, val, DBM_REPLACE))
429 log(state, con, 'L', "%s cleared", key.dptr);
430 else if (!dbm_error(state->dbm))
431 log(state, con, 'W', "%s unchanged", key.dptr);
432 else
433 {
434 dbm_clearerr(state->dbm);
435 log(state, con, 'E', "%s io error", key.dptr);
436 }
437 break;
438 case REQ_delete:
439 if (!dbm_delete(state->dbm, key))
440 {
441 log(state, con, 'L', "%s deleted", key.dptr);
442 return 1;
443 }
444 else if (!dbm_error(state->dbm))
445 log(state, con, 'W', "%s not in db", key.dptr);
446 else
447 {
448 dbm_clearerr(state->dbm);
449 log(state, con, 'E', "%s io error", key.dptr);
450 }
451 break;
452 case REQ_hold:
453 dat->flags |= DATA_hold;
454 dat->time = time(NiL);
455 val.dptr = (void*)dat;
456 val.dsize = sizeof(*dat);
457 if (!(n = dbm_store(state->dbm, key, val, DBM_INSERT)) || n > 0 && !dbm_store(state->dbm, key, val, DBM_REPLACE))
458 log(state, con, 'L', "%s held", key.dptr);
459 else if (!dbm_error(state->dbm))
460 log(state, con, 'W', "%s unchanged", key.dptr);
461 else
462 {
463 dbm_clearerr(state->dbm);
464 log(state, con, 'E', "%s io error", key.dptr);
465 }
466 break;
467 case REQ_raise:
468 dat->flags &= ~DATA_clear;
469 dat->time = time(NiL);
470 dat->raise++;
471 val.dptr = (void*)dat;
472 val.dsize = sizeof(*dat);
473 if (!(n = dbm_store(state->dbm, key, val, DBM_INSERT)) || n > 0 && !dbm_store(state->dbm, key, val, DBM_REPLACE))
474 {
475 if (!state->hold && (e = (Event_t*)dtmatch(state->events, key.dptr)))
476 notify(state, e);
477 log(state, con, 'L', "%s raised", key.dptr);
478 }
479 else if (!dbm_error(state->dbm))
480 log(state, con, 'W', "%s unchanged", key.dptr);
481 else
482 {
483 dbm_clearerr(state->dbm);
484 log(state, con, 'E', "%s io error", key.dptr);
485 }
486 break;
487 case REQ_release:
488 if (dat->flags & DATA_hold)
489 {
490 dat->flags &= ~DATA_hold;
491 if (dat->raise)
492 {
493 val.dptr = (void*)dat;
494 val.dsize = sizeof(*dat);
495 if (!(n = dbm_store(state->dbm, key, val, DBM_INSERT)) || n > 0 && !dbm_store(state->dbm, key, val, DBM_REPLACE))
496 log(state, con, 'L', "%s released", key.dptr);
497 else if (!dbm_error(state->dbm))
498 log(state, con, 'W', "%s unchanged", key.dptr);
499 else
500 {
501 dbm_clearerr(state->dbm);
502 log(state, con, 'E', "%s io error", key.dptr);
503 }
504 if (e = (Event_t*)dtmatch(state->events, key.dptr))
505 notify(state, e);
506 }
507 else if (!dbm_delete(state->dbm, key))
508 log(state, con, 'L', "%s deleted", key.dptr);
509 else if (!dbm_error(state->dbm))
510 log(state, con, 'W', "%s not in db", key.dptr);
511 else
512 {
513 dbm_clearerr(state->dbm);
514 log(state, con, 'E', "%s io error", key.dptr);
515 }
516 }
517 break;
518 case REQ_test:
519 if (val.dptr && !(dat->flags & DATA_clear))
520 {
521 if (state->hold)
522 log(state, con, 'W', "%s global hold", key.dptr);
523 else if (dat->flags & DATA_hold)
524 log(state, con, 'W', "%s explicit hold", key.dptr);
525 else
526 log(state, con, 'I', "%s raised", key.dptr);
527 }
528 else
529 log(state, con, 'I', "%s not-raised", key.dptr);
530 break;
531 case REQ_wait:
532 if (val.dptr && !state->hold && !(dat->flags & (DATA_clear|DATA_hold)))
533 log(state, con, 'I', "%s raised", key.dptr);
534 else if (post(state, con, key.dptr, id))
535 return -1;
536 break;
537 }
538 return 0;
539 }
540
541 /*
542 * apply request r to args a
543 */
544
545 static int
request(State_t * state,Connection_t * con,int id,int index,char ** a,unsigned long older,unsigned long newer)546 request(State_t* state, Connection_t* con, int id, int index, char** a, unsigned long older, unsigned long newer)
547 {
548 char* s;
549 int i;
550 Event_t* e;
551 datum key;
552 datum val;
553 Data_t dat;
554 regex_t re;
555 char buf[64];
556
557 while (s = *a++)
558 if (i = regcomp(&re, s, REG_SHELL|REG_AUGMENTED|REG_LEFT|REG_RIGHT))
559 {
560 regerror(i, &re, buf, sizeof(buf));
561 log(state, con, 'E', "%s: %s", s, buf);
562 }
563 else if (regstat(&re)->re_info & REG_LITERAL)
564 {
565 if (!EVENT(s))
566 {
567 log(state, con, 'E', "%s invalid event name", s);
568 return -1;
569 }
570 key.dptr = (void*)s;
571 key.dsize = strlen(s) + 1;
572 if (key.dsize >= sizeof(e->name))
573 s[(key.dsize = sizeof(e->name)) - 1] = 0;
574 val = dbm_fetch(state->dbm, key);
575 if (val.dptr)
576 {
577 if (val.dsize > sizeof(dat))
578 val.dsize = sizeof(dat);
579 swapmem(state->swap, val.dptr, &dat, sizeof(dat));
580 }
581 else
582 memset(&dat, 0, sizeof(dat));
583 if (apply(state, con, id, index, key, val, &dat))
584 return -1;
585 }
586 else
587 {
588 rescan:
589 for (key = dbm_firstkey(state->dbm); key.dptr; key = dbm_nextkey(state->dbm))
590 if (EVENT(key.dptr) && !regexec(&re, key.dptr, 0, NiL, 0))
591 {
592 val = dbm_fetch(state->dbm, key);
593 if (val.dsize > sizeof(dat))
594 val.dsize = sizeof(dat);
595 swapmem(state->swap, val.dptr, &dat, val.dsize);
596 if ((!older || dat.time < older) && (!newer || dat.time > newer))
597 {
598 if ((i = apply(state, con, id, index, key, val, &dat)) < 0)
599 return -1;
600 if (i > 0)
601 goto rescan;
602 }
603 }
604 }
605 return 0;
606 }
607
608 /*
609 * convert s to a date/time
610 */
611
612 static unsigned long
date(State_t * state,Connection_t * con,const char * s)613 date(State_t* state, Connection_t* con, const char* s)
614 {
615 unsigned long t;
616 char* e;
617 datum key;
618 datum val;
619 Data_t dat;
620
621 key.dptr = (void*)s;
622 key.dsize = strlen(s) + 1;
623 val = dbm_fetch(state->dbm, key);
624 if (val.dptr)
625 {
626 swapmem(state->swap, val.dptr, &dat, val.dsize);
627 t = dat.time;
628 }
629 else
630 {
631 t = tmdate(s, &e, NiL);
632 if (*e)
633 {
634 log(state, con, 'E', "%s: invalid date/time", s);
635 t = 0;
636 }
637 }
638 return t;
639 }
640
641 /*
642 * service a request
643 */
644
645 static int
actionf(register Css_t * css,register Cssfd_t * fp,Cssdisc_t * disc)646 actionf(register Css_t* css, register Cssfd_t* fp, Cssdisc_t* disc)
647 {
648 register State_t* state = (State_t*)disc;
649 register Connection_t* con;
650 char* s;
651 char* t;
652 char** a;
653 char** q;
654 Cssfd_t* f;
655 Request_t* r;
656 Event_t* e;
657 Waiting_t* w;
658 Connection_t* x;
659 int n;
660 int err;
661 int id;
662 unsigned long older;
663 unsigned long newer;
664 datum key;
665 datum val;
666 Data_t data;
667 char buf[64];
668
669 switch (fp->status)
670 {
671 case CS_POLL_CLOSE:
672 if (con = (Connection_t*)fp->data)
673 dtdelete(state->connections, con);
674 return 0;
675 case CS_POLL_READ:
676 con = (Connection_t*)fp->data;
677 if ((n = csread(css->state, fp->fd, state->req, sizeof(state->req), CS_LINE)) <= 0)
678 return -1;
679 state->req[--n] = 0;
680 log(state, con, 'R', "%s", state->req);
681 con->code = 0;
682 if (tokscan(state->req, NiL, " %v ", state->cmd, elementsof(state->cmd) - 1) > 0)
683 {
684 id = -1;
685 for (q = state->cmd; (s = *q) && (isalpha(*s) || *s == '_'); q++)
686 {
687 while (isalnum(*++s));
688 if (*s != '=')
689 break;
690 if ((s - *q) == 2 && *(s - 1) == 'd' && *(s - 2) == 'i')
691 id = (int)strtol(s + 1, NiL, 0);
692 }
693 s = *(a = q);
694 if (!(r = (Request_t*)strpsearch(requests, elementsof(requests), sizeof(requests[0]), s, NiL)))
695 log(state, con, 'E', "%s: unknown request", s);
696 else
697 {
698 opt_info.index = 0;
699 newer = older = 0;
700 err = 0;
701 sfstrseek(state->usrf, 0, SEEK_SET);
702 for (;;)
703 {
704 switch (optget(a, usage))
705 {
706 case 'e':
707 if (r->index == REQ_set)
708 {
709 state->expire = strelapsed(opt_info.arg, &t, 1);
710 if (*t)
711 {
712 log(state, con, 'E', "%s: invalid elapsed time expression", opt_info.arg);
713 err = 1;
714 break;
715 }
716 }
717 continue;
718 case 'l':
719 if (r->index == REQ_set)
720 state->log = opt_info.num;
721 continue;
722 case 'n':
723 newer = date(state, con, opt_info.arg);
724 continue;
725 case 'o':
726 older = date(state, con, opt_info.arg);
727 continue;
728 case 'q':
729 con->quiet = opt_info.num;
730 continue;
731 case '?':
732 case ':':
733 log(state, con, 'E', "%s: %s", s, opt_info.arg);
734 err = 1;
735 break;
736 }
737 break;
738 }
739 if (!err)
740 {
741 if (!*(a += opt_info.index))
742 {
743 if (newer || older)
744 {
745 a[0] = "*";
746 a[1] = 0;
747 n = 1;
748 }
749 else
750 n = 0;
751 }
752 else
753 n = a[1] ? 2 : 1;
754 if (r->min && n < r->min)
755 sfprintf(state->usrf, "E %s: at least %d argument%s expected\n", s, r->min, r->min == 1 ? "" : "s");
756 else if (r->max > 0 && n > r->max)
757 log(state, con, 'E', "%s: at most %d argument%s expected", s, r->max, r->max == 1 ? "" : "s");
758 else if (r->min == r->max && n != r->max)
759 log(state, con, 'E', "%s: %d argument%s expected", s, r->max, r->max == 1 ? "" : "s");
760 else
761 switch (r->index)
762 {
763 case REQ_all:
764 n = (int)strtol(a[0], &t, 0);
765 if (*t)
766 {
767 log(state, con, 'E', "%s: invalid numeric value", a[0]);
768 break;
769 }
770 else if (!(f = cssfd(css, n, 0)) || !(x = (Connection_t*)f->data))
771 {
772 log(state, con, 'E', "%d: invalid connection index", n);
773 break;
774 }
775 if (x->waiting)
776 {
777 n = x->quiet;
778 x->quiet = 1;
779 a = state->cmd;
780 for (w = (Waiting_t*)dtfirst(x->waiting); w; w = (Waiting_t*)dtnext(x->waiting, w))
781 {
782 if (a >= &state->cmd[elementsof(state->cmd)-1])
783 {
784 *a = 0;
785 if (request(state, x, -1, REQ_raise, a = state->cmd, older, newer))
786 break;
787 }
788 *a++ = w->event->name;
789 log(state, con, 'R', "%s %s", s, w->event->name);
790 }
791 if (a > state->cmd)
792 {
793 *a = 0;
794 request(state, x, -1, REQ_raise, state->cmd, older, newer);
795 }
796 x->quiet = n;
797 }
798 log(state, con, 'I', "done");
799 break;
800 case REQ_clear:
801 case REQ_delete:
802 case REQ_raise:
803 case REQ_test:
804 case REQ_wait:
805 if (request(state, con, id, r->index, a, older, newer))
806 return -1;
807 break;
808 case REQ_exit:
809 cssfd(css, fp->fd, CS_POLL_CLOSE);
810 break;
811 case REQ_info:
812 info(state, con, css);
813 break;
814 case REQ_hold:
815 if (!*a)
816 {
817 state->hold = 1;
818 sfprintf(state->usrf, "I holding\n");
819 }
820 else
821 if (request(state, con, id, r->index, a, older, newer))
822 return -1;
823 break;
824 case REQ_list:
825 con->all = 1;
826 if (s = *a)
827 {
828 if (n = regcomp(&con->re, s, REG_SHELL|REG_AUGMENTED|REG_LEFT|REG_RIGHT))
829 {
830 regerror(n, &con->re, buf, sizeof(buf));
831 log(state, con, 'E', "%s: %s", s, buf);
832 break;
833 }
834 con->all = 0;
835 }
836 con->list = dbm_firstkey(state->dbm);
837 if (!con->list.dptr)
838 {
839 log(state, con, 'I', "empty");
840 break;
841 }
842 con->newer = newer;
843 con->older = older;
844 goto list;
845 case REQ_next:
846 if (!con->list.dptr)
847 {
848 log(state, con, 'W', "next: must execute list first");
849 break;
850 }
851 for (;;)
852 {
853 con->list = dbm_nextkey(state->dbm);
854 if (!con->list.dptr)
855 {
856 log(state, con, 'I', "done");
857 break;
858 }
859 list:
860 if (EVENT(con->list.dptr) && (con->all || !regexec(&con->re, con->list.dptr, 0, NiL, 0)))
861 {
862 val = dbm_fetch(state->dbm, con->list);
863 if (val.dsize > sizeof(data))
864 val.dsize = sizeof(data);
865 swapmem(state->swap, val.dptr, &data, val.dsize);
866 if ((!con->older || data.time < con->older) && (!con->newer || data.time > con->newer))
867 {
868 log(state, con, 'I', "event %s %s %d%s%s", con->list.dptr, fmttime("%K", data.time), data.raise, (data.flags & DATA_clear) ? " CLEAR" : "", (data.flags & DATA_hold) ? " HOLD" : "");
869 break;
870 }
871 }
872 }
873 break;
874 case REQ_release:
875 if (!*a)
876 {
877 state->hold = 0;
878 sfprintf(state->usrf, "I released\n");
879 key = dbm_firstkey(state->dbm);
880 while (key.dptr)
881 {
882 val = dbm_fetch(state->dbm, key);
883 if (val.dsize > sizeof(data))
884 val.dsize = sizeof(data);
885 swapmem(state->swap, val.dptr, &data, val.dsize);
886 if (!(data.flags & (DATA_clear|DATA_hold)) && (e = (Event_t*)dtmatch(state->events, key.dptr)))
887 notify(state, e);
888 key = dbm_nextkey(state->dbm);
889 }
890 }
891 else
892 if (request(state, con, id, r->index, a, older, newer))
893 return -1;
894 break;
895 case REQ_set:
896 break;
897 case REQ_stop:
898 exit(0);
899 break;
900 }
901 }
902 }
903 if (sfstrtell(state->usrf))
904 {
905 if (id >= 0)
906 {
907 sfstrseek(state->usrf, 0, SEEK_SET);
908 log(state, con, 'x', "%d %d", id, con->code);
909 }
910 n = sfstrtell(state->usrf);
911 if (!(s = sfstruse(state->usrf)))
912 error(ERROR_SYSTEM|3, "out of space");
913 if (cswrite(css->state, fp->fd, s, n) != n)
914 return -1;
915 }
916 }
917 return 1;
918 }
919 return 0;
920 }
921
922 /*
923 * handle exceptions
924 */
925
926 static int
exceptf(Css_t * css,unsigned long op,unsigned long arg,Cssdisc_t * disc)927 exceptf(Css_t* css, unsigned long op, unsigned long arg, Cssdisc_t* disc)
928 {
929 register State_t* state = (State_t*)disc;
930
931 switch (op)
932 {
933 case CSS_CLOSE:
934 if (state->dbm)
935 {
936 dbm_close(state->dbm);
937 state->dbm = 0;
938 }
939 return 0;
940 case CSS_INTERRUPT:
941 error(ERROR_SYSTEM|3, "%s: interrupt exit", fmtsignal(arg));
942 return 0;
943 case CSS_WAKEUP:
944 error(2, "wakeup");
945 return 0;
946 }
947 error(ERROR_SYSTEM|3, "poll error op=0x%08x arg=0x%08x", op, arg);
948 return -1;
949 }
950
951 /*
952 * free connection
953 */
954
955 static void
confree(Dt_t * dt,void * obj,Dtdisc_t * disc)956 confree(Dt_t* dt, void* obj, Dtdisc_t* disc)
957 {
958 State_t* state = (State_t*)((char*)disc - offsetof(State_t, condisc));
959 Connection_t* con = (Connection_t*)obj;
960
961 NoP(dt);
962 NoP(disc);
963 state->active--;
964 if (con->waiting)
965 dtclose(con->waiting);
966 log(state, con, 'S', "drop connection -- %d active", state->active);
967 free(obj);
968 }
969
970 /*
971 * free event
972 */
973
974 static void
eventfree(Dt_t * dt,void * obj,Dtdisc_t * disc)975 eventfree(Dt_t* dt, void* obj, Dtdisc_t* disc)
976 {
977 NoP(dt);
978 NoP(disc);
979 free(obj);
980 }
981
982 /*
983 * free connection pending event
984 */
985
986 static void
waitfree(Dt_t * dt,void * obj,Dtdisc_t * disc)987 waitfree(Dt_t* dt, void* obj, Dtdisc_t* disc)
988 {
989 State_t* state = (State_t*)((char*)disc - offsetof(State_t, waitdisc));
990 Waiting_t* p = (Waiting_t*)obj;
991
992 NoP(dt);
993 if (--p->event->waiting == 0)
994 dtdelete(state->events, p->event);
995 free(obj);
996 }
997
998 /*
999 * open and verify event db
1000 */
1001
1002 static void
db(register State_t * state)1003 db(register State_t* state)
1004 {
1005 datum key;
1006 datum val;
1007 Data_t data;
1008 uint32_t u4;
1009
1010 if (!(state->dbm = dbm_open(state->path, O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH)))
1011 error(ERROR_SYSTEM|3, "%s: cannot open database for read/write", state->path);
1012 key.dptr = (void*)ident_key;
1013 key.dsize = sizeof(ident_key);
1014 val = dbm_fetch(state->dbm, key);
1015 if (val.dptr)
1016 {
1017 if (val.dsize != sizeof(data))
1018 error(3, "%s: invalid db -- data size %d, expected %d", state->path, val.dsize, sizeof(data));
1019 memcpy(&data, val.dptr, val.dsize);
1020 if (memcmp(&data.expire, ident_name, sizeof(data.expire)))
1021 error(3, "%s: %s: invalid db -- ident mismatch, expected %s", state->path, ident_key, ident_name);
1022 u4 = IDENT_SWAP;
1023 if (state->swap = swapop(&u4, &data.time, 4))
1024 swapmem(state->swap, &data, &data, sizeof(data));
1025 }
1026 else
1027 {
1028 val = dbm_firstkey(state->dbm);
1029 if (val.dptr)
1030 error(3, "%s: %s: invalid db -- ident entry expected", state->path, ident_key);
1031 memset(&data, 0, sizeof(data));
1032 memcpy(&data.expire, ident_name, sizeof(data.expire));
1033 data.time = IDENT_SWAP;
1034 data.raise = IDENT_VERSION;
1035 val.dptr = (void*)&data;
1036 val.dsize = sizeof(data);
1037 if (dbm_store(state->dbm, key, val, DBM_INSERT))
1038 {
1039 dbm_clearerr(state->dbm);
1040 error(3, "%s: %s: db initial ident entry store failed", state->path, ident_key);
1041 }
1042 }
1043 state->major = (data.raise >> 16) & 0xffff;
1044 state->minor = data.raise & 0xffff;
1045 }
1046
1047 /*
1048 * client/server main
1049 */
1050
1051 int
main(int argc,char ** argv)1052 main(int argc, char** argv)
1053 {
1054 char* s;
1055 Css_t* css;
1056
1057 char* p = 0;
1058 int server = 0;
1059
1060 static State_t state;
1061
1062 NoP(argc);
1063 setlocale(LC_ALL, "");
1064 opt_info.argv = argv;
1065 state.log = 1;
1066 error_info.id = (char*)command;
1067 if (!(state.usrf = sfstropen()) || !(state.tmp = sfstropen()))
1068 error(3, "out of space [buf]");
1069
1070 /*
1071 * check the options
1072 */
1073
1074 for (;;)
1075 {
1076 switch (optget(argv, usage))
1077 {
1078 case 'c':
1079 p = opt_info.arg;
1080 continue;
1081 case 'e':
1082 state.expire = strelapsed(opt_info.arg, &s, 1);
1083 if (*s)
1084 error(2, "%s: invalid elapsed time expression", opt_info.arg);
1085 continue;
1086 case 'i':
1087 server = 1;
1088 continue;
1089 case 'l':
1090 state.log = opt_info.num;
1091 continue;
1092 case '?':
1093 error(ERROR_USAGE|4, "%s", opt_info.arg);
1094 continue;
1095 case ':':
1096 error(2, "%s", opt_info.arg);
1097 continue;
1098 }
1099 break;
1100 }
1101 if (error_info.errors || !(state.path = *(argv += opt_info.index)))
1102 error(ERROR_USAGE|4, "%s", optusage(NiL));
1103
1104 /*
1105 * get the connect stream path
1106 */
1107
1108 if (s = strrchr(state.path, '/'))
1109 s++;
1110 else
1111 s = state.path;
1112 if (p)
1113 sfprintf(state.usrf, "%s/%s", p, s);
1114 else
1115 sfprintf(state.usrf, "/dev/tcp/local/%s/%s", error_info.id, s);
1116 if (!(state.service = strdup(sfstruse(state.usrf))))
1117 error(3, "out of space [service]");
1118
1119 /*
1120 * either server or client at this point
1121 */
1122
1123 if (server)
1124 {
1125 umask(S_IWOTH);
1126 db(&state);
1127 state.condisc.link = offsetof(Event_t, link);
1128 state.condisc.freef = confree;
1129 if (!(state.connections = dtopen(&state.condisc, Dtlist)))
1130 error(ERROR_SYSTEM|3, "out of space [connection dictionary]");
1131 state.eventdisc.link = offsetof(Event_t, link);
1132 state.eventdisc.key = offsetof(Event_t, name);
1133 state.eventdisc.freef = eventfree;
1134 if (!(state.events = dtopen(&state.eventdisc, Dtoset)))
1135 error(ERROR_SYSTEM|3, "out of space [event dictionary]");
1136 state.waitdisc.link = offsetof(Waiting_t, link);
1137 state.waitdisc.key = offsetof(Waiting_t, event);
1138 state.waitdisc.size = sizeof(Event_t*);
1139 state.waitdisc.freef = waitfree;
1140 state.disc.version = CSS_VERSION;
1141 state.disc.flags = CSS_DAEMON|CSS_LOG|CSS_ERROR|CSS_INTERRUPT|CSS_WAKEUP|CSS_PRESERVE;
1142 state.disc.timeout = 60 * 60 * 1000L;
1143 state.disc.acceptf = acceptf;
1144 state.disc.actionf = actionf;
1145 state.disc.errorf = errorf;
1146 state.disc.exceptf = exceptf;
1147 if (!(css = cssopen(state.service, &state.disc)))
1148 return 1;
1149 umask(S_IWOTH);
1150 error_info.id = css->id;
1151 if (state.log)
1152 {
1153 sfprintf(state.tmp, "%s.log", state.path);
1154 if (!(s = sfstruse(state.tmp)))
1155 error(ERROR_SYSTEM|3, "out of space");
1156 if (state.logf = sfopen(NiL, s, "a"))
1157 sfset(state.logf, SF_LINE, 1);
1158 else
1159 error(ERROR_SYSTEM|2, "%s: cannot append log file", s);
1160 }
1161 log(&state, 0, 'S', "start service %s", fmtident(usage));
1162 csspoll(CS_NEVER, 0);
1163 log(&state, 0, 'S', "stop service");
1164 return 1;
1165 }
1166 return csclient(&cs, -1, state.service, "event> ", argv + 1, CS_CLIENT_ARGV|CS_CLIENT_SEP);
1167 }
1168
1169 #endif
1170