1 /***********************************************************************
2 * *
3 * This software is part of the ast package *
4 * Copyright (c) 1990-2013 AT&T Intellectual Property *
5 * and is licensed under the *
6 * Eclipse Public License, Version 1.0 *
7 * by AT&T Intellectual Property *
8 * *
9 * A copy of the License is available at *
10 * http://www.eclipse.org/org/documents/epl-v10.html *
11 * (with md5 checksum b35adb5213ca9657e911e9befb180842) *
12 * *
13 * Information and Software Systems Research *
14 * AT&T Research *
15 * Florham Park NJ *
16 * *
17 * Glenn Fowler <glenn.s.fowler@gmail.com> *
18 * *
19 ***********************************************************************/
20 #pragma prototyped
21
22 /*
23 * Glenn Fowler
24 * AT&T Research
25 *
26 * shared event daemon
27 */
28
/*
 * event service version -- reported by the info request and packed
 * into IDENT_VERSION for the db ident record
 */

#define EVENT_MAJOR	1
#define EVENT_MINOR	0
31
32 static const char usage[] =
33 "[-?\n@(#)$Id: event (AT&T Research) 2013-10-25 $\n]"
34 USAGE_LICENSE
35 "[+NAME?event - shared event client and server]"
36 "[+DESCRIPTION?\bevent\b is a shared event client and server. Events are "
37 "stored in a persistent database named by the \aname\a operand. Each "
38 "event has a name, an expiration, and a binary status \braised\b or "
39 "\bnot-raised\b. A non-existent event is \bnot-raised\b. Events may be "
40 "raised, deleted, cleared, tested and waited for. If no \brequest\b "
41 "operands are specified then requests are prompted for, with an "
42 "\bEVENT>\b prompt, and read from the standard input. Multiple command "
43 "line requests must be separated by \b:\b. In the following events "
44 "operands are matched by \bksh\b(1) patterns. The client requests are:]"
45 "{"
46 "[+all \aconnection\a?Raise all pending events for the "
47 "\aconnection\a. Mainly for debugging.]"
48 "[+clear \aevent\a ...?Mark \aevent\a not-raised but do not "
49 "delete from the database. This allows the events to be matched "
50 "by patterns.]"
51 "[+delete \aevent\a ...?Delete \aevent\a.]"
52 "[+exit?Close the client connection.]"
53 "[+hold [ \aevent\a ...]]?If \aevent\a operands are specified "
54 "then clients are not notified about the those events until they "
55 "are explicitly released by \brelease\b \aevent\a ... If no "
56 "events are specified then all current and future events will be "
57 "unconditionally held until a \brelease\b with no event "
58 "operands.]"
59 "[+info?List the server status pending events by client "
60 "connection. The list is terminated by a \bdone\b message.]"
61 "[+list [ \apattern\a ]]?Start an event dictionary scan and list "
62 "the first event. If \apattern\a is specified then only events "
63 "matching \apattern\a are listed.]"
64 "[+next?List the next event in the \blist\b event scan. The list "
65 "is terminated by a \bdone\b message.]"
66 "[+quit?Equivalent to exit.]"
67 "[+raise \aevent\a ...?Raise \aevent\a ...]"
68 "[+release [ \aevent\a ...]]?If \aevent\a operands are specified "
69 "then they are released from a previous \bhold\b \aevent\a ... "
70 "If no \aevent\a operands are specified then any previous "
71 "unconditional \bhold\b is turned off.]"
72 "[+set \aoption\a ...?]"
73 "[+stop?Terminate the server. Persistent data is preserved.]"
74 "[+test \aevent\a?Determine the \aevent\a status.]"
75 "[+wait \aevent\a?Wait for \aevent\a status to be \braised\b.]"
76 "}"
77 "[+?The \b--cs\b, \b--expire\b, \b--initialize\b, and \b--log\b options "
78 "apply to the initial service command, and the \b--expire\b, \b--log\b, "
79 "\b--newer\b, \b--older\b, and \b--quiet\b options apply to client "
80 "requests.]"
81 "[c:cs|connect-stream?Use \aconnect-stream\a instead of the default.]"
82 ":[connect-stream:=/dev/tcp/local/event]"
83 "[e:expire?Set the current event expiration to the \bdate\b(1) or "
84 "\bcron\b(1) expression \adate-expression\a.]:[date-expression]"
85 "[i:initialize?Initialize the service if it is not already running.]"
86 "[l!:log?Log server activity to \astate-name\a.log, where \astate-name\a "
87 "is the state path name sans suffix.]"
88 "[n:newer?Match events newer than \adate\a. If \b--older\b is also "
89 "specified then only event times > newer and < older match.]:[date]"
90 "[o:older?Match events older than \adate\a. If \b--newer\b is also "
91 "specified then only event times > newer and < older match.]:[date]"
92 "[q:quiet?Suppress request confirmation messages.]"
93 "\n"
94 "\nname [ request [ options ] [ arg ... ] ] [ : request ... ]\n"
95 "\n"
96 "[+CAVEATS?Expirations, logging and the \bset\b request are not "
97 "implemented yet.]"
98 "[+SEE ALSO?\bcoshell\b(1), \bcs\b(1), \bnmake\b(1), \bdbm\b(3), "
99 "\bndbm\b(3), \bgdbm\b(3)]"
100 ;
101
/* error message id; also a component of the default connect stream path */
static const char	command[] = "event";

/*
 * the db carries one reserved ident record, keyed by ident_key, that
 * db() creates and verifies: ident_name is copied into Data_t.expire,
 * IDENT_SWAP is stored in Data_t.time and IDENT_VERSION in Data_t.raise
 */

static const char	ident_key[] = "'//\t<(IDENT)>\t\\\\'";
static const char	ident_name[] = "EVEN";

#define IDENT_SWAP	0x01020304	/* byte order probe compared via swapop() */
#define IDENT_VERSION	((EVENT_MAJOR<<16)|(EVENT_MINOR))

/* true if s does not start with the first byte of ident_key, i.e. s may name an event */
#define EVENT(s)	(*((char*)(s))!=ident_key[0])
#define log		_log	/* gnu builtin? you've got to be kidding */
112
113 #include <ast.h>
114 #include <cdt.h>
115 #include <css.h>
116 #include <ctype.h>
117 #include <debug.h>
118 #include <error.h>
119 #include <namval.h>
120 #include <ls.h>
121 #include <regex.h>
122 #include <stdarg.h>
123 #include <swap.h>
124 #include <tok.h>
125 #include <tm.h>
126 #include <ast_ndbm.h>
127
128 #if !_use_ndbm
129
/*
 * fallback entry point compiled when no ndbm implementation is
 * available (!_use_ndbm): report the missing library and fail
 */

int
main(int argc, char** argv)
{
	/* neither argument is consulted here */
	NoP(argv);
	NoP(argc);
	error(3, "<ndbm.h> library required");
	return 1;
}
138
139 #else
140
/* NOTE(review): KEY_MAX and Key_t are not referenced in the code visible here */
#define KEY_MAX		64		/* max key length + 1		*/

struct Key_s; typedef struct Key_s Key_t;

/* Data_t.flags bits */
#define DATA_clear	0x00000001	/* explicit clear		*/
#define DATA_hold	0x00000002	/* explicit hold		*/

/* fixed width integer used for every persistent Data_t field */
typedef uint32_t Number_t;
149
/*
 * per-event record stored as the dbm value (sizeof(Data_t) bytes)
 *
 * db() reuses the same layout for the ident record: expire holds
 * ident_name, time holds IDENT_SWAP and raise holds IDENT_VERSION
 */

typedef struct Data_s			/* event data			*/
{
	Number_t	expire;		/* expiration seconds since epoch */
	Number_t	time;		/* last raise time		*/
	Number_t	raise;		/* total # raise requests	*/
	Number_t	flags;		/* DATA_* flags			*/
} Data_t;
157
/*
 * in-memory bucket for an event that at least one client is waiting
 * on -- created by post(), keyed by name in state->events, and deleted
 * by waitfree() when the waiting count drops to 0
 */

typedef struct Event_s			/* event bucket			*/
{
	Dtlink_t	link;		/* dictionary link		*/
	unsigned int	waiting;	/* # clients waiting		*/
	Data_t		data;		/* event persistent data	*/
	char		name[256];	/* event name			*/
} Event_t;
165
/*
 * one pending wait held by a connection; the per-connection
 * dictionary is keyed by the event bucket pointer itself
 */

typedef struct Waiting_s		/* pending event bucket		*/
{
	Dtlink_t	link;		/* dictionary link		*/
	int		id;		/* wait id, -1 if the request gave no id= */
	Event_t*	event;		/* event bucket pointer		*/
} Waiting_t;
172
/*
 * per-client state, allocated by acceptf() and released by confree()
 */

typedef struct Connection_s		/* client connection state	*/
{
	Dtlink_t	link;		/* list link			*/
	Csid_t		id;		/* connection id		*/
	Dt_t*		waiting;	/* pending events, opened on first post() */
	datum		list;		/* list finger: current key of the list/next scan */
	int		fd;		/* connection fd		*/
	int		all;		/* list all vs. list match	*/
	int		code;		/* request exit code: 1 warning, 2 error bits */
	int		quiet;		/* suppress response log messages */
	unsigned long	newer;		/* list --newer time		*/
	unsigned long	older;		/* list --older time		*/
	regex_t		re;		/* list request pattern		*/
} Connection_t;
187
/*
 * one row of the requests[] table searched by strpsearch()
 */

typedef struct Request_s		/* static request info		*/
{
	const char*	name;		/* request name			*/
	int		index;		/* REQ_* index			*/
	int		min;		/* min #args			*/
	int		max;		/* max #args, -1 in the table for no limit */
} Request_t;
195
/*
 * global program state
 *
 * disc must remain the first member: the css callbacks cast their
 * Cssdisc_t* argument directly to State_t*; confree() and waitfree()
 * recover the State_t from condisc/waitdisc via offsetof()
 */

typedef struct State_s			/* program state		*/
{
	Cssdisc_t	disc;		/* css discipline		*/
	Dtdisc_t	condisc;	/* connection dictionary discipline */
	Dtdisc_t	eventdisc;	/* event dictionary discipline	*/
	Dtdisc_t	waitdisc;	/* pending events dictionary discipline */
	unsigned int	active;		/* number of active clients	*/
	int		hold;		/* hold announcements		*/
	int		log;		/* log activity			*/
	int		major;		/* db major version		*/
	int		minor;		/* db minor version		*/
	int		swap;		/* datum <=> native int32_ swap	*/
	unsigned long	expire;		/* expiration in seconds	*/
	DBM*		dbm;		/* dbm handle			*/
	Dt_t*		connections;	/* active connection list	*/
	Dt_t*		events;		/* outstanding events dictionary */
	char*		service;	/* service connect stream path	*/
	char*		path;		/* event db path		*/
	Sfio_t*		logf;		/* log file stream		*/
	Sfio_t*		usrf;		/* usr buffer stream		*/
	Sfio_t*		tmp;		/* tmp buffer stream		*/
	char*		cmd[1024];	/* request command argv		*/
	char		req[8 * 1024];	/* request buffer		*/
} State_t;
220
221 #define REQ_all 1
222 #define REQ_clear 2
223 #define REQ_delete 3
224 #define REQ_exit 4
225 #define REQ_hold 5
226 #define REQ_info 6
227 #define REQ_list 7
228 #define REQ_next 8
229 #define REQ_raise 9
230 #define REQ_release 10
231 #define REQ_set 11
232 #define REQ_stop 12
233 #define REQ_test 13
234 #define REQ_wait 14
235
236 static const Request_t requests[] =
237 {
238 "a*ll", REQ_all, 1, 1,
239 "c*lear", REQ_clear, 1, -1,
240 "d*elete", REQ_delete, 1, -1,
241 "e*xit", REQ_exit, 0, 0,
242 "hold", REQ_hold, 0, -1,
243 "i*nfo", REQ_info, 0, 0,
244 "l*ist", REQ_list, 0, 1,
245 "n*ext", REQ_next, 0, 0,
246 "q*uit", REQ_exit, 0, 0,
247 "r*aise", REQ_raise, 1, -1,
248 "release", REQ_release, 0, -1,
249 "s*et", REQ_set, 0, 0,
250 "stop", REQ_stop, 0, 0,
251 "t*est", REQ_test, 1, 1,
252 "w*ait", REQ_wait, 1, 1,
253 };
254
255 /*
256 * generate a user response and log message
257 */
258
259 static void
log(State_t * state,Connection_t * con,int type,const char * format,...)260 log(State_t* state, Connection_t* con, int type, const char* format, ...)
261 {
262 va_list ap;
263 char* s;
264 int n;
265
266 va_start(ap, format);
267 if (format)
268 sfvprintf(state->tmp, format, ap);
269 va_end(ap);
270 if (type)
271 {
272 if (!(s = sfstruse(state->tmp)))
273 error(ERROR_SYSTEM|3, "out of space");
274 if (type != 'I' && state->log && state->logf)
275 sfprintf(state->logf, "%s (%03d) %c %s\n", fmttime("%K", time(NiL)), con ? con->fd : 0, toupper(type), s);
276 if (con && type != 'R' && type != 'S')
277 {
278 if (type != 'L' || !con->quiet)
279 debug_printf(con->fd, "%c %s\n", toupper(type), s);
280 if (type == 'W')
281 con->code |= 1;
282 else if (type == 'E')
283 con->code |= 2;
284 }
285 }
286 }
287
288 /*
289 * accept a new connection
290 */
291
292 static int
acceptf(Css_t * css,Cssfd_t * fp,Csid_t * ip,char ** av,Cssdisc_t * disc)293 acceptf(Css_t* css, Cssfd_t* fp, Csid_t* ip, char** av, Cssdisc_t* disc)
294 {
295 register State_t* state = (State_t*)disc;
296 register Connection_t* con;
297
298 if (!(con = newof(0, Connection_t, 1, 0)))
299 return -1;
300 fp->data = con;
301 con->id = *ip;
302 con->waiting = 0;
303 con->fd = fp->fd;
304 state->active++;
305 dtinsert(state->connections, con);
306 log(state, con, 'S', "accept connection -- %d active", state->active);
307 return fp->fd;
308 }
309
310 /*
311 * notify connections waiting on ep
312 */
313
314 static int
notify(State_t * state,Event_t * ep)315 notify(State_t* state, Event_t* ep)
316 {
317 Connection_t* cp;
318 Waiting_t* wp;
319 char* s;
320 size_t n;
321
322 for (cp = (Connection_t*)dtfirst(state->connections); cp; cp = (Connection_t*)dtnext(state->connections, cp))
323 if (cp->waiting && (wp = (Waiting_t*)dtmatch(cp->waiting, &ep)))
324 {
325 if (wp->id >= 0)
326 {
327 log(state, cp, 'x', "%d 0", wp->id);
328 n = sfstrtell(state->usrf);
329 if (!(s = sfstruse(state->usrf)))
330 error(ERROR_SYSTEM|3, "out of space");
331 write(cp->fd, s, n);
332 }
333 else if (!cp->quiet)
334 log(state, cp, 'i', "%s raised", ep->name);
335 n = ep->waiting == 1;
336 dtdelete(cp->waiting, wp);
337 if (n)
338 break;
339 }
340 return 0;
341 }
342
343 /*
344 * post pending event name for connection
345 */
346
347 static int
post(State_t * state,Connection_t * con,const char * name,int id)348 post(State_t* state, Connection_t* con, const char* name, int id)
349 {
350 Event_t* ep;
351 Waiting_t* wp;
352
353 if (!con->waiting && !(con->waiting = dtopen(&state->waitdisc, Dtset)))
354 {
355 error(ERROR_SYSTEM|3, "out of space [waiting]");
356 return -1;
357 }
358 if (ep = dtmatch(state->events, name))
359 {
360 if (dtmatch(con->waiting, &ep))
361 return 0;
362 }
363 else if (!(ep = newof(0, Event_t, 1, 0)))
364 {
365 error(ERROR_SYSTEM|3, "out of space [event]");
366 return -1;
367 }
368 else
369 {
370 strcpy(ep->name, name);
371 dtinsert(state->events, ep);
372 }
373 if (!(wp = newof(0, Waiting_t, 1, 0)))
374 {
375 error(ERROR_SYSTEM|3, "out of space [waiting]");
376 return -1;
377 }
378 ep->waiting++;
379 wp->id = id;
380 wp->event = ep;
381 dtinsert(con->waiting, wp);
382 return 0;
383 }
384
385 /*
386 * list server info/state
387 */
388
389 static int
info(State_t * state,Connection_t * con,Css_t * css)390 info(State_t* state, Connection_t* con, Css_t* css)
391 {
392 Connection_t* cp;
393 Waiting_t* wp;
394 int n;
395
396 log(state, con, 'I', "info server='%s' version=%d.%d host=%s pid=%d uid=%d gid=%d", fmtident(usage), EVENT_MAJOR, EVENT_MINOR, csname(css->state, 0), getpid(), geteuid(), getegid());
397 log(state, con, 'I', "info active=%d", state->active);
398 for (cp = (Connection_t*)dtfirst(state->connections); cp; cp = (Connection_t*)dtnext(state->connections, cp))
399 if (cp->waiting && (n = dtsize(cp->waiting)) > 0)
400 {
401 log(state, con, 0, "waiting connection=%d count=%d", cp->fd, n);
402 for (wp = (Waiting_t*)dtfirst(cp->waiting); wp; wp = (Waiting_t*)dtnext(cp->waiting, wp))
403 log(state, con, 0, " %s", wp->event->name);
404 log(state, con, 'I', 0);
405 }
406 log(state, con, 'I', "done");
407 return 0;
408 }
409
410 static int request(State_t*, Connection_t*, int, int, char**, unsigned long, unsigned long);
411
412 /*
413 * apply request r to one key
414 */
415
416 static int
apply(State_t * state,Connection_t * con,int id,int index,datum key,datum val,Data_t * dat)417 apply(State_t* state, Connection_t* con, int id, int index, datum key, datum val, Data_t* dat)
418 {
419 Event_t* e;
420 int n;
421
422 switch (index)
423 {
424 case REQ_clear:
425 dat->flags |= DATA_clear;
426 dat->time = time(NiL);
427 val.dptr = (void*)dat;
428 val.dsize = sizeof(*dat);
429 if (!(n = dbm_store(state->dbm, key, val, DBM_INSERT)) || n > 0 && !dbm_store(state->dbm, key, val, DBM_REPLACE))
430 log(state, con, 'L', "%s cleared", key.dptr);
431 else if (!dbm_error(state->dbm))
432 log(state, con, 'W', "%s unchanged", key.dptr);
433 else
434 {
435 dbm_clearerr(state->dbm);
436 log(state, con, 'E', "%s io error", key.dptr);
437 }
438 break;
439 case REQ_delete:
440 if (!dbm_delete(state->dbm, key))
441 {
442 log(state, con, 'L', "%s deleted", key.dptr);
443 return 1;
444 }
445 else if (!dbm_error(state->dbm))
446 log(state, con, 'W', "%s not in db", key.dptr);
447 else
448 {
449 dbm_clearerr(state->dbm);
450 log(state, con, 'E', "%s io error", key.dptr);
451 }
452 break;
453 case REQ_hold:
454 dat->flags |= DATA_hold;
455 dat->time = time(NiL);
456 val.dptr = (void*)dat;
457 val.dsize = sizeof(*dat);
458 if (!(n = dbm_store(state->dbm, key, val, DBM_INSERT)) || n > 0 && !dbm_store(state->dbm, key, val, DBM_REPLACE))
459 log(state, con, 'L', "%s held", key.dptr);
460 else if (!dbm_error(state->dbm))
461 log(state, con, 'W', "%s unchanged", key.dptr);
462 else
463 {
464 dbm_clearerr(state->dbm);
465 log(state, con, 'E', "%s io error", key.dptr);
466 }
467 break;
468 case REQ_raise:
469 dat->flags &= ~DATA_clear;
470 dat->time = time(NiL);
471 dat->raise++;
472 val.dptr = (void*)dat;
473 val.dsize = sizeof(*dat);
474 if (!(n = dbm_store(state->dbm, key, val, DBM_INSERT)) || n > 0 && !dbm_store(state->dbm, key, val, DBM_REPLACE))
475 {
476 if (!state->hold && (e = (Event_t*)dtmatch(state->events, key.dptr)))
477 notify(state, e);
478 log(state, con, 'I', "%s raised", key.dptr);
479 }
480 else if (!dbm_error(state->dbm))
481 log(state, con, 'W', "%s unchanged", key.dptr);
482 else
483 {
484 dbm_clearerr(state->dbm);
485 log(state, con, 'E', "%s io error", key.dptr);
486 }
487 break;
488 case REQ_release:
489 if (dat->flags & DATA_hold)
490 {
491 dat->flags &= ~DATA_hold;
492 if (dat->raise)
493 {
494 val.dptr = (void*)dat;
495 val.dsize = sizeof(*dat);
496 if (!(n = dbm_store(state->dbm, key, val, DBM_INSERT)) || n > 0 && !dbm_store(state->dbm, key, val, DBM_REPLACE))
497 log(state, con, 'L', "%s released", key.dptr);
498 else if (!dbm_error(state->dbm))
499 log(state, con, 'W', "%s unchanged", key.dptr);
500 else
501 {
502 dbm_clearerr(state->dbm);
503 log(state, con, 'E', "%s io error", key.dptr);
504 }
505 if (e = (Event_t*)dtmatch(state->events, key.dptr))
506 notify(state, e);
507 }
508 else if (!dbm_delete(state->dbm, key))
509 log(state, con, 'L', "%s deleted", key.dptr);
510 else if (!dbm_error(state->dbm))
511 log(state, con, 'W', "%s not in db", key.dptr);
512 else
513 {
514 dbm_clearerr(state->dbm);
515 log(state, con, 'E', "%s io error", key.dptr);
516 }
517 }
518 break;
519 case REQ_test:
520 if (val.dptr && !(dat->flags & DATA_clear))
521 {
522 if (state->hold)
523 log(state, con, 'W', "%s global hold", key.dptr);
524 else if (dat->flags & DATA_hold)
525 log(state, con, 'W', "%s explicit hold", key.dptr);
526 else
527 log(state, con, 'I', "%s raised", key.dptr);
528 }
529 else
530 log(state, con, 'I', "%s not-raised", key.dptr);
531 break;
532 case REQ_wait:
533 if (val.dptr && !state->hold && !(dat->flags & (DATA_clear|DATA_hold)))
534 log(state, con, 'I', "%s raised", key.dptr);
535 else if (post(state, con, key.dptr, id))
536 return -1;
537 break;
538 }
539 return 0;
540 }
541
542 /*
543 * apply request r to args a
544 */
545
546 static int
request(State_t * state,Connection_t * con,int id,int index,char ** a,unsigned long older,unsigned long newer)547 request(State_t* state, Connection_t* con, int id, int index, char** a, unsigned long older, unsigned long newer)
548 {
549 char* s;
550 int i;
551 Event_t* e;
552 datum key;
553 datum val;
554 Data_t dat;
555 regex_t re;
556 char buf[64];
557
558 while (s = *a++)
559 if (i = regcomp(&re, s, REG_SHELL|REG_AUGMENTED|REG_LEFT|REG_RIGHT))
560 {
561 regerror(i, &re, buf, sizeof(buf));
562 log(state, con, 'E', "%s: %s", s, buf);
563 }
564 else if (regstat(&re)->re_info & REG_LITERAL)
565 {
566 if (!EVENT(s))
567 {
568 log(state, con, 'E', "%s invalid event name", s);
569 return -1;
570 }
571 key.dptr = (void*)s;
572 key.dsize = strlen(s) + 1;
573 if (key.dsize >= sizeof(e->name))
574 s[(key.dsize = sizeof(e->name)) - 1] = 0;
575 val = dbm_fetch(state->dbm, key);
576 if (val.dptr)
577 {
578 if (val.dsize > sizeof(dat))
579 val.dsize = sizeof(dat);
580 swapmem(state->swap, val.dptr, &dat, sizeof(dat));
581 }
582 else
583 memset(&dat, 0, sizeof(dat));
584 if (apply(state, con, id, index, key, val, &dat))
585 return -1;
586 }
587 else
588 {
589 rescan:
590 for (key = dbm_firstkey(state->dbm); key.dptr; key = dbm_nextkey(state->dbm))
591 if (EVENT(key.dptr) && !regexec(&re, key.dptr, 0, NiL, 0))
592 {
593 val = dbm_fetch(state->dbm, key);
594 if (val.dsize > sizeof(dat))
595 val.dsize = sizeof(dat);
596 swapmem(state->swap, val.dptr, &dat, val.dsize);
597 if ((!older || dat.time < older) && (!newer || dat.time > newer))
598 {
599 if ((i = apply(state, con, id, index, key, val, &dat)) < 0)
600 return -1;
601 if (i > 0)
602 goto rescan;
603 }
604 }
605 }
606 return 0;
607 }
608
609 /*
610 * convert s to a date/time
611 */
612
613 static unsigned long
date(State_t * state,Connection_t * con,const char * s)614 date(State_t* state, Connection_t* con, const char* s)
615 {
616 unsigned long t;
617 char* e;
618 datum key;
619 datum val;
620 Data_t dat;
621
622 key.dptr = (void*)s;
623 key.dsize = strlen(s) + 1;
624 val = dbm_fetch(state->dbm, key);
625 if (val.dptr)
626 {
627 swapmem(state->swap, val.dptr, &dat, val.dsize);
628 t = dat.time;
629 }
630 else
631 {
632 t = tmdate(s, &e, NiL);
633 if (*e)
634 {
635 log(state, con, 'E', "%s: invalid date/time", s);
636 t = 0;
637 }
638 }
639 return t;
640 }
641
642 /*
643 * service a request
644 */
645
646 static int
actionf(register Css_t * css,register Cssfd_t * fp,Cssdisc_t * disc)647 actionf(register Css_t* css, register Cssfd_t* fp, Cssdisc_t* disc)
648 {
649 register State_t* state = (State_t*)disc;
650 register Connection_t* con;
651 char* s;
652 char* t;
653 char** a;
654 char** q;
655 Cssfd_t* f;
656 Request_t* r;
657 Event_t* e;
658 Waiting_t* w;
659 Connection_t* x;
660 int n;
661 int err;
662 int id;
663 unsigned long older;
664 unsigned long newer;
665 datum key;
666 datum val;
667 Data_t data;
668 char buf[64];
669
670 switch (fp->status)
671 {
672 case CS_POLL_CLOSE:
673 if (con = (Connection_t*)fp->data)
674 dtdelete(state->connections, con);
675 return 0;
676 case CS_POLL_READ:
677 con = (Connection_t*)fp->data;
678 if ((n = csread(css->state, fp->fd, state->req, sizeof(state->req), CS_LINE)) <= 0)
679 return -1;
680 state->req[--n] = 0;
681 log(state, con, 'R', "%s", state->req);
682 con->code = 0;
683 if (tokscan(state->req, NiL, " %v ", state->cmd, elementsof(state->cmd) - 1) > 0)
684 {
685 id = -1;
686 for (q = state->cmd; (s = *q) && (isalpha(*s) || *s == '_'); q++)
687 {
688 while (isalnum(*++s));
689 if (*s != '=')
690 break;
691 if ((s - *q) == 2 && *(s - 1) == 'd' && *(s - 2) == 'i')
692 id = (int)strtol(s + 1, NiL, 0);
693 }
694 s = *(a = q);
695 if (!(r = (Request_t*)strpsearch(requests, elementsof(requests), sizeof(requests[0]), s, NiL)))
696 log(state, con, 'E', "%s: unknown request", s);
697 else
698 {
699 opt_info.index = 0;
700 newer = older = 0;
701 err = 0;
702 sfstrseek(state->usrf, 0, SEEK_SET);
703 for (;;)
704 {
705 switch (optget(a, usage))
706 {
707 case 'e':
708 if (r->index == REQ_set)
709 {
710 state->expire = strelapsed(opt_info.arg, &t, 1);
711 if (*t)
712 {
713 log(state, con, 'E', "%s: invalid elapsed time expression", opt_info.arg);
714 err = 1;
715 break;
716 }
717 }
718 continue;
719 case 'l':
720 if (r->index == REQ_set)
721 state->log = opt_info.num;
722 continue;
723 case 'n':
724 newer = date(state, con, opt_info.arg);
725 continue;
726 case 'o':
727 older = date(state, con, opt_info.arg);
728 continue;
729 case 'q':
730 con->quiet = opt_info.num;
731 continue;
732 case '?':
733 case ':':
734 log(state, con, 'E', "%s: %s", s, opt_info.arg);
735 err = 1;
736 break;
737 }
738 break;
739 }
740 if (!err)
741 {
742 if (!*(a += opt_info.index))
743 {
744 if (newer || older)
745 {
746 a[0] = "*";
747 a[1] = 0;
748 n = 1;
749 }
750 else
751 n = 0;
752 }
753 else
754 n = a[1] ? 2 : 1;
755 if (r->min && n < r->min)
756 sfprintf(state->usrf, "E %s: at least %d argument%s expected\n", s, r->min, r->min == 1 ? "" : "s");
757 else if (r->max > 0 && n > r->max)
758 log(state, con, 'E', "%s: at most %d argument%s expected", s, r->max, r->max == 1 ? "" : "s");
759 else if (r->min == r->max && n != r->max)
760 log(state, con, 'E', "%s: %d argument%s expected", s, r->max, r->max == 1 ? "" : "s");
761 else
762 switch (r->index)
763 {
764 case REQ_all:
765 n = (int)strtol(a[0], &t, 0);
766 if (*t)
767 {
768 log(state, con, 'E', "%s: invalid numeric value", a[0]);
769 break;
770 }
771 else if (!(f = cssfd(css, n, 0)) || !(x = (Connection_t*)f->data))
772 {
773 log(state, con, 'E', "%d: invalid connection index", n);
774 break;
775 }
776 if (x->waiting)
777 {
778 n = x->quiet;
779 x->quiet = 1;
780 a = state->cmd;
781 for (w = (Waiting_t*)dtfirst(x->waiting); w; w = (Waiting_t*)dtnext(x->waiting, w))
782 {
783 if (a >= &state->cmd[elementsof(state->cmd)-1])
784 {
785 *a = 0;
786 if (request(state, x, -1, REQ_raise, a = state->cmd, older, newer))
787 break;
788 }
789 *a++ = w->event->name;
790 log(state, con, 'R', "%s %s", s, w->event->name);
791 }
792 if (a > state->cmd)
793 {
794 *a = 0;
795 request(state, x, -1, REQ_raise, state->cmd, older, newer);
796 }
797 x->quiet = n;
798 }
799 log(state, con, 'I', "done");
800 break;
801 case REQ_clear:
802 case REQ_delete:
803 case REQ_raise:
804 case REQ_test:
805 case REQ_wait:
806 if (request(state, con, id, r->index, a, older, newer))
807 return -1;
808 break;
809 case REQ_exit:
810 cssfd(css, fp->fd, CS_POLL_CLOSE);
811 break;
812 case REQ_info:
813 info(state, con, css);
814 break;
815 case REQ_hold:
816 if (!*a)
817 {
818 state->hold = 1;
819 sfprintf(state->usrf, "I holding\n");
820 }
821 else
822 if (request(state, con, id, r->index, a, older, newer))
823 return -1;
824 break;
825 case REQ_list:
826 con->all = 1;
827 if (s = *a)
828 {
829 if (n = regcomp(&con->re, s, REG_SHELL|REG_AUGMENTED|REG_LEFT|REG_RIGHT))
830 {
831 regerror(n, &con->re, buf, sizeof(buf));
832 log(state, con, 'E', "%s: %s", s, buf);
833 break;
834 }
835 con->all = 0;
836 }
837 con->list = dbm_firstkey(state->dbm);
838 if (!con->list.dptr)
839 {
840 log(state, con, 'I', "empty");
841 break;
842 }
843 con->newer = newer;
844 con->older = older;
845 goto list;
846 case REQ_next:
847 if (!con->list.dptr)
848 {
849 log(state, con, 'W', "next: must execute list first");
850 break;
851 }
852 for (;;)
853 {
854 con->list = dbm_nextkey(state->dbm);
855 if (!con->list.dptr)
856 {
857 log(state, con, 'I', "done");
858 break;
859 }
860 list:
861 if (EVENT(con->list.dptr) && (con->all || !regexec(&con->re, con->list.dptr, 0, NiL, 0)))
862 {
863 val = dbm_fetch(state->dbm, con->list);
864 if (val.dsize > sizeof(data))
865 val.dsize = sizeof(data);
866 swapmem(state->swap, val.dptr, &data, val.dsize);
867 if ((!con->older || data.time < con->older) && (!con->newer || data.time > con->newer))
868 {
869 log(state, con, 'I', "event %s %s %d%s%s", con->list.dptr, fmttime("%K", data.time), data.raise, (data.flags & DATA_clear) ? " CLEAR" : "", (data.flags & DATA_hold) ? " HOLD" : "");
870 break;
871 }
872 }
873 }
874 break;
875 case REQ_release:
876 if (!*a)
877 {
878 state->hold = 0;
879 sfprintf(state->usrf, "I released\n");
880 key = dbm_firstkey(state->dbm);
881 while (key.dptr)
882 {
883 val = dbm_fetch(state->dbm, key);
884 if (val.dsize > sizeof(data))
885 val.dsize = sizeof(data);
886 swapmem(state->swap, val.dptr, &data, val.dsize);
887 if (!(data.flags & (DATA_clear|DATA_hold)) && (e = (Event_t*)dtmatch(state->events, key.dptr)))
888 notify(state, e);
889 key = dbm_nextkey(state->dbm);
890 }
891 }
892 else
893 if (request(state, con, id, r->index, a, older, newer))
894 return -1;
895 break;
896 case REQ_set:
897 break;
898 case REQ_stop:
899 exit(0);
900 break;
901 }
902 }
903 }
904 if (sfstrtell(state->usrf))
905 {
906 if (id >= 0)
907 {
908 sfstrseek(state->usrf, 0, SEEK_SET);
909 log(state, con, 'x', "%d %d", id, con->code);
910 }
911 n = sfstrtell(state->usrf);
912 if (!(s = sfstruse(state->usrf)))
913 error(ERROR_SYSTEM|3, "out of space");
914 if (cswrite(css->state, fp->fd, s, n) != n)
915 return -1;
916 }
917 }
918 return 1;
919 }
920 return 0;
921 }
922
923 /*
924 * handle exceptions
925 */
926
927 static int
exceptf(Css_t * css,unsigned long op,unsigned long arg,Cssdisc_t * disc)928 exceptf(Css_t* css, unsigned long op, unsigned long arg, Cssdisc_t* disc)
929 {
930 register State_t* state = (State_t*)disc;
931
932 switch (op)
933 {
934 case CSS_CLOSE:
935 if (state->dbm)
936 {
937 dbm_close(state->dbm);
938 state->dbm = 0;
939 }
940 return 0;
941 case CSS_INTERRUPT:
942 error(ERROR_SYSTEM|3, "%s: interrupt exit", fmtsignal(arg));
943 return 0;
944 case CSS_WAKEUP:
945 error(2, "wakeup");
946 return 0;
947 }
948 error(ERROR_SYSTEM|3, "poll error op=0x%08x arg=0x%08x", op, arg);
949 return -1;
950 }
951
952 /*
953 * free connection
954 */
955
956 static void
confree(Dt_t * dt,void * obj,Dtdisc_t * disc)957 confree(Dt_t* dt, void* obj, Dtdisc_t* disc)
958 {
959 State_t* state = (State_t*)((char*)disc - offsetof(State_t, condisc));
960 Connection_t* con = (Connection_t*)obj;
961
962 NoP(dt);
963 NoP(disc);
964 state->active--;
965 if (con->waiting)
966 dtclose(con->waiting);
967 log(state, con, 'S', "drop connection -- %d active", state->active);
968 free(obj);
969 }
970
971 /*
972 * free event
973 */
974
975 static void
eventfree(Dt_t * dt,void * obj,Dtdisc_t * disc)976 eventfree(Dt_t* dt, void* obj, Dtdisc_t* disc)
977 {
978 NoP(dt);
979 NoP(disc);
980 free(obj);
981 }
982
983 /*
984 * free connection pending event
985 */
986
987 static void
waitfree(Dt_t * dt,void * obj,Dtdisc_t * disc)988 waitfree(Dt_t* dt, void* obj, Dtdisc_t* disc)
989 {
990 State_t* state = (State_t*)((char*)disc - offsetof(State_t, waitdisc));
991 Waiting_t* p = (Waiting_t*)obj;
992
993 NoP(dt);
994 if (--p->event->waiting == 0)
995 dtdelete(state->events, p->event);
996 free(obj);
997 }
998
999 /*
1000 * open and verify event db
1001 */
1002
1003 static void
db(register State_t * state)1004 db(register State_t* state)
1005 {
1006 datum key;
1007 datum val;
1008 Data_t data;
1009 uint32_t u4;
1010
1011 if (!(state->dbm = dbm_open(state->path, O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH)))
1012 error(ERROR_SYSTEM|3, "%s: cannot open database for read/write", state->path);
1013 key.dptr = (void*)ident_key;
1014 key.dsize = sizeof(ident_key);
1015 val = dbm_fetch(state->dbm, key);
1016 if (val.dptr)
1017 {
1018 if (val.dsize != sizeof(data))
1019 error(3, "%s: invalid db -- data size %d, expected %d", state->path, val.dsize, sizeof(data));
1020 memcpy(&data, val.dptr, val.dsize);
1021 if (memcmp(&data.expire, ident_name, sizeof(data.expire)))
1022 error(3, "%s: %s: invalid db -- ident mismatch, expected %s", state->path, ident_key, ident_name);
1023 u4 = IDENT_SWAP;
1024 if (state->swap = swapop(&u4, &data.time, 4))
1025 swapmem(state->swap, &data, &data, sizeof(data));
1026 }
1027 else
1028 {
1029 val = dbm_firstkey(state->dbm);
1030 if (val.dptr)
1031 error(3, "%s: %s: invalid db -- ident entry expected", state->path, ident_key);
1032 memset(&data, 0, sizeof(data));
1033 memcpy(&data.expire, ident_name, sizeof(data.expire));
1034 data.time = IDENT_SWAP;
1035 data.raise = IDENT_VERSION;
1036 val.dptr = (void*)&data;
1037 val.dsize = sizeof(data);
1038 if (dbm_store(state->dbm, key, val, DBM_INSERT))
1039 {
1040 dbm_clearerr(state->dbm);
1041 error(3, "%s: %s: db initial ident entry store failed", state->path, ident_key);
1042 }
1043 }
1044 state->major = (data.raise >> 16) & 0xffff;
1045 state->minor = data.raise & 0xffff;
1046 }
1047
1048 /*
1049 * client/server main
1050 */
1051
1052 int
main(int argc,char ** argv)1053 main(int argc, char** argv)
1054 {
1055 char* s;
1056 Css_t* css;
1057
1058 char* p = 0;
1059 int server = 0;
1060
1061 static State_t state;
1062
1063 NoP(argc);
1064 setlocale(LC_ALL, "");
1065 opt_info.argv = argv;
1066 state.log = 1;
1067 error_info.id = (char*)command;
1068 if (!(state.usrf = sfstropen()) || !(state.tmp = sfstropen()))
1069 error(3, "out of space [buf]");
1070
1071 /*
1072 * check the options
1073 */
1074
1075 for (;;)
1076 {
1077 switch (optget(argv, usage))
1078 {
1079 case 'c':
1080 p = opt_info.arg;
1081 continue;
1082 case 'e':
1083 state.expire = strelapsed(opt_info.arg, &s, 1);
1084 if (*s)
1085 error(2, "%s: invalid elapsed time expression", opt_info.arg);
1086 continue;
1087 case 'i':
1088 server = 1;
1089 continue;
1090 case 'l':
1091 state.log = opt_info.num;
1092 continue;
1093 case '?':
1094 error(ERROR_USAGE|4, "%s", opt_info.arg);
1095 continue;
1096 case ':':
1097 error(2, "%s", opt_info.arg);
1098 continue;
1099 }
1100 break;
1101 }
1102 if (error_info.errors || !(state.path = *(argv += opt_info.index)))
1103 error(ERROR_USAGE|4, "%s", optusage(NiL));
1104
1105 /*
1106 * get the connect stream path
1107 */
1108
1109 if (s = strrchr(state.path, '/'))
1110 s++;
1111 else
1112 s = state.path;
1113 if (p)
1114 sfprintf(state.usrf, "%s/%s", p, s);
1115 else
1116 sfprintf(state.usrf, "/dev/tcp/local/%s/%s", error_info.id, s);
1117 if (!(state.service = strdup(sfstruse(state.usrf))))
1118 error(3, "out of space [service]");
1119
1120 /*
1121 * either server or client at this point
1122 */
1123
1124 if (server)
1125 {
1126 umask(S_IWOTH);
1127 db(&state);
1128 state.condisc.link = offsetof(Event_t, link);
1129 state.condisc.freef = confree;
1130 if (!(state.connections = dtopen(&state.condisc, Dtlist)))
1131 error(ERROR_SYSTEM|3, "out of space [connection dictionary]");
1132 state.eventdisc.link = offsetof(Event_t, link);
1133 state.eventdisc.key = offsetof(Event_t, name);
1134 state.eventdisc.freef = eventfree;
1135 if (!(state.events = dtopen(&state.eventdisc, Dtoset)))
1136 error(ERROR_SYSTEM|3, "out of space [event dictionary]");
1137 state.waitdisc.link = offsetof(Waiting_t, link);
1138 state.waitdisc.key = offsetof(Waiting_t, event);
1139 state.waitdisc.size = sizeof(Event_t*);
1140 state.waitdisc.freef = waitfree;
1141 state.disc.version = CSS_VERSION;
1142 state.disc.flags = CSS_DAEMON|CSS_LOG|CSS_ERROR|CSS_INTERRUPT|CSS_WAKEUP|CSS_PRESERVE;
1143 state.disc.timeout = 60 * 60 * 1000L;
1144 state.disc.acceptf = acceptf;
1145 state.disc.actionf = actionf;
1146 state.disc.errorf = errorf;
1147 state.disc.exceptf = exceptf;
1148 if (!(css = cssopen(state.service, &state.disc)))
1149 return 1;
1150 umask(S_IWOTH);
1151 error_info.id = css->id;
1152 if (state.log)
1153 {
1154 sfprintf(state.tmp, "%s.log", state.path);
1155 if (!(s = sfstruse(state.tmp)))
1156 error(ERROR_SYSTEM|3, "out of space");
1157 if (state.logf = sfopen(NiL, s, "a"))
1158 sfset(state.logf, SF_LINE, 1);
1159 else
1160 error(ERROR_SYSTEM|2, "%s: cannot append log file", s);
1161 }
1162 log(&state, 0, 'S', "start service %s", fmtident(usage));
1163 csspoll(CS_NEVER, 0);
1164 log(&state, 0, 'S', "stop service");
1165 return 1;
1166 }
1167 return csclient(&cs, -1, state.service, "event> ", argv + 1, CS_CLIENT_ARGV|CS_CLIENT_SEP);
1168 }
1169
1170 #endif
1171