1 /*
2 * epos/src/agent.cc
3 * (c) 1998-01 geo@cuni.cz
4 *
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License in doc/COPYING for more details.
14 *
15 */
16
17 #include "epos.h"
18 #include "agent.h"
19 #include "client.h"
20 #include "slab.h"
21
22 #ifdef HAVE_SYS_TIME_H
23 #include <sys/time.h>
24 #endif
25
26 #ifdef HAVE_UNIX_H
27 #include <unix.h>
28 #endif
29
30 #ifdef HAVE_SYS_SOCKET_H
31 #include <sys/socket.h>
32 #endif
33
34 #ifndef HAVE_SOCKLEN_T
35 #define socklen_t int
36 #endif
37
38 #ifdef HAVE_WINSOCK2_H
39 #include <winsock2.h>
40 #else
41 #ifdef HAVE_WINSOCK_H
42 #include <winsock.h>
43 #endif
44 #endif
45
46 #ifdef HAVE_SYS_STAT_H
47 #include <sys/stat.h>
48 #endif
49
50 #ifdef HAVE_FCNTL_H
51 #include <fcntl.h>
52 #endif
53
54 #ifdef HAVE_ERRNO_H
55 #include <errno.h>
56 #endif
57
58 #ifndef O_NONBLOCK
59 #define O_NONBLOCK 0
60 #endif
61
62
63 #define DARK_ERRLOG 2 /* 2 == stderr; for global stdshriek and stddbg output */
64
65 #ifdef HAVE_GETTIMEOFDAY
66
agent_profile(const char * s)67 inline void agent_profile(const char *s)
68 {
69 if (!scfg->profile || !*scfg->profile) return;
70
71 static FILE *log = NULL;
72 if (!log) log = fopen(scfg->profile, "w", "profile");
73 static struct timeval start, stop;
74 if (!s) {
75 if (gettimeofday(&start, NULL)) shriek(861, "profiler fails");
76 int duration = start.tv_sec - stop.tv_sec;
77 duration *= 1000000;
78 duration += start.tv_usec - stop.tv_usec;
79 fprintf(log, "%10ld", duration);
80 fflush(log);
81 if (gettimeofday(&start, NULL)) shriek(861, "profiler fails");
82 } else {
83 if (gettimeofday(&stop, NULL)) shriek(861, "profiler fails");
84 int duration = stop.tv_sec - start.tv_sec;
85 duration *= 1000000;
86 duration += stop.tv_usec - start.tv_usec;
87 fprintf(log, " %-13s%8ld\n", s, duration);
88 fflush(log);
89 if (gettimeofday(&stop, NULL)) shriek(861, "profiler fails");
90 }
91 }
92
93 #else
/* Without gettimeofday() profiling is compiled out: this stub does nothing. */
inline void agent_profile(const char *s)
{
	(void)s;
}
95 #endif
96
97
agent(DATA_TYPE typein,DATA_TYPE typeout)98 agent::agent(DATA_TYPE typein, DATA_TYPE typeout)
99 {
100 in = typein, out = typeout;
101 next = prev = NULL, inb = NULL;
102 pendin = pendout = NULL; pendcount = 0;
103 c = NULL;
104 dep = NULL;
105 D_PRINT(1, "Creating a handler, intype %i, outtype %i\n", typein, typeout);
106 }
107
~agent()108 agent::~agent()
109 {
110 // D_PRINT(1, "Handler deleted.\n");
111 }
112
113 inline void
unquench()114 agent::unquench()
115 {
116 if (inb || pendin) schedule();
117 /* can not unquench input agents */
118 }
119
/*
 *	Give this agent one slice of processing.
 *
 *	Runs inside the agent's context (c->enter() ... c->leave()).  The slice
 *	is declined if the successor already has more than scfg->pend_max
 *	buffers pending, or if there is no input to work on.  Otherwise the
 *	next pending buffer (if needed) becomes the current input, run() is
 *	called, and afterwards every agent chained through "dep" is scheduled.
 *
 *	A command_failed thrown by run() is reported with reply() and turned
 *	into finis(true); if this agent has no successor it is rethrown.
 */
void
agent::timeslice()
{
	c->enter();
	D_PRINT(1, "Timeslice for %s\n", name());
	D_PRINT(0, "pendcount %d, next->pendcount %d, pend_max %d\n", pendcount, next ? next->pendcount : -42, scfg->pend_max);
	/* back-pressure: do not produce more while the successor is satiated */
	if (next && next->pendcount > scfg->pend_max) {
		D_PRINT(1, "(satiated)\n");
		c->leave();
		return;
	}
	if (!inb && in != T_NONE) {
		if (!pendin) {
			/* We may take this path legally after an intr command */
			D_PRINT(2, "No input of type %i; shrugging off\n", in);
			c->leave();
			return;
		}
		D_PRINT(2, "Getting pending task, type %i\n", in);
		/* pop the head of the pending queue into inb */
		pend_ll *tmp = pendin;
		inb = tmp->d;
		pendin = tmp->next;
		delete tmp;
		/* the queue just dropped to below pend_min: let the predecessor run again */
		if (pendcount-- == scfg->pend_min) prev->unquench();
		if (!pendin) {
			/* queue emptied: the predecessor's tail pointer is stale now */
			prev->pendout = NULL;
			if (pendcount) shriek(862, "pending count incorrect");
		}
	}
	try {
		agent_profile(NULL);
		run();
		agent_profile(name());
	} catch (command_failed *e) {
		if (!next) throw e;
		D_PRINT(2, "Processing failed, %d, %.60s\n", e->code, e->msg);
		reply(e->code, e->msg);
		delete e;
		finis(true);
	}
	/* more queued input and nothing in progress: ask for another slice */
	if (pendcount && !inb) schedule();
	c->leave();

	/* schedule the agents chained through dep and dissolve the chain */
	agent *tmp;
	for (agent *a = dep; a; a = tmp) {
		D_PRINT(1, "Scheduling %s because of %s\n", a->name(), name());
		tmp = a->dep;
		a->schedule();
		a->dep = NULL;
	}
	dep = NULL;
}
172
173 bool
mktask(int)174 agent::mktask(int)
175 {
176 return false; // except for the input agent, agents can't start a task
177 }
178
do_relax(void * ptr,DATA_TYPE type)179 inline void do_relax(void *ptr, DATA_TYPE type)
180 {
181 if (ptr) switch(type) {
182 case T_NONE: break;
183 case T_INPUT: free(ptr); break;
184 case T_STML:
185 case T_TEXT: free(ptr); break;
186 case T_UNITS: delete ((unit *)ptr); break;
187 case T_SEGS: free(ptr); break;
188 case T_SSIF: free(ptr); break;
189 case T_WAVEFM: delete((wavefm *)ptr); break;
190 }
191 }
192
193 void
relax()194 agent::relax()
195 {
196 if (inb) {
197 do_relax(inb, in); /* branch likely */
198 inb = NULL;
199 }
200 if (!next) return;
201 for (pend_ll *p = next->pendin; p != NULL; ) {
202 do_relax(p->d, out);
203 pend_ll *n = p->next;
204 next->pendcount--;
205 delete p;
206 p = n;
207 }
208 pendout = next->pendin = NULL;
209 if (next->pendcount) shriek(862, "pending count incorrect");
210 }
211
212 void
finis(bool err)213 agent::finis(bool err)
214 {
215 if (!next) shriek(861, "Non-module finished");
216 agent *a = next;
217 while (a->next) a = a->next;
218 a->finis(err);
219 }
220
221 /*
222 * There is a slight difference between relax() and brk(). Both discard all
223 * pending input. In addition, oa_wavefm::brk() (only) discards any
224 * waveform already written to the sound card;
225 * relax() would call sync_soundcard() instead.
226 */
227
brk()228 bool agent::brk()
229 {
230 relax();
231 D_PRINT(1, "interrupting an agent, intype %i, outtype %i\n", in, out);
232 return true;
233 }
234
235 void
pass(void * ptr)236 agent::pass(void *ptr)
237 {
238 if (!ptr) shriek(862, "Nothing to pass");
239 if (!next) shriek(862, "Nowhere to pass to");
240 if (pendout || next->inb) {
241 pendout = (pendout ? pendout->next : next->pendin) = new pend_ll(ptr, NULL);
242 next->pendcount++;
243 } else {
244 next->inb = ptr;
245 next->schedule();
246 }
247 }
248
249 class a_ascii : public agent
250 {
251 virtual void run();
name()252 virtual const char *name() { return "raw"; };
253 public:
a_ascii()254 a_ascii() : agent(T_TEXT, T_UNITS) {};
255 };
256
257 void
run()258 a_ascii::run()
259 {
260 void *r = str2units((char *)inb);
261 free(inb);
262 inb = NULL;
263 pass(r);
264 }
265
266
267 class a_stml : public agent
268 {
269 virtual void run();
name()270 virtual const char *name() { return "stml"; };
271 public:
a_stml()272 a_stml() : agent(T_STML, T_UNITS) {};
273 };
274
275 void
run()276 a_stml::run()
277 {
278 shriek(462, "STML parser not available");
279 }
280
281
282 class a_rules : public agent
283 {
284 virtual void run();
name()285 virtual const char *name() { return "rules"; };
286 public:
a_rules()287 a_rules() : agent(T_UNITS, T_UNITS) {};
288 };
289
290 void
run()291 a_rules::run()
292 {
293 void *r = inb;
294 inb = NULL;
295 this_lang->ruleset->apply((unit *)r);
296 pass(r);
297 }
298
299
300 class a_print : public agent
301 {
302 virtual void run();
name()303 virtual const char *name() { return "print"; };
304 public:
a_print()305 a_print() : agent(T_UNITS, T_TEXT) {};
306 };
307
308 void
run()309 a_print::run()
310 {
311 char *b;
312
313 b = ((unit *)inb)->gather(false /* no ^$ */, true /* incl. ssegs */ );
314
315 delete (unit *) inb;
316 inb = NULL;
317 pass(get_text_buffer(b));
318 }
319
320
321 class a_segs : public agent
322 {
323 virtual void run();
name()324 virtual const char *name() { return "segs"; };
325 int position;
326 public:
a_segs()327 a_segs() : agent(T_UNITS, T_SEGS) {position = 0;};
328 };
329
/* Default buffer sizes, used when cfg->seg_buff_size or cfg->ssif_buff_size is zero */
#define INIT_SEGS_BS	2048	/* counted in segments */
#define INIT_SSIF_BS	4096	/* counted in bytes */
332
333 void
run()334 a_segs::run()
335 {
336 int sbs = cfg->seg_buff_size ? cfg->seg_buff_size : INIT_SEGS_BS;
337 segment *d = (segment *)xmalloc((sbs + 1) * sizeof(segment));
338 segment *c = d + 1;
339 int n;
340 int items = 0;
341
342 unit *root = *(unit **)&inb;
343 root->project(scfg->_segm_level);
344 again:
345 n = root->write_segs(c, position, sbs);
346 D_PRINT(1, "Writing at most %d segs: wrote %d segs\n", sbs, n);
347 position += n;
348 items += n;
349 if (n >= sbs) {
350 if (cfg->seg_buff_size) {
351 shriek(462, "Cannot combine nonzero seg_buff_size with the traditional SSIF");
352 } else {
353 d = (segment *)xrealloc(d, (sbs + 1 + position) * sizeof(segment));
354 c = d + 1 + position;
355 goto again;
356 }
357 } else {
358 delete root;
359 inb = NULL;
360 position = 0;
361 }
362 d->code = items;
363 d->nothing = d->ll = 0;
364 D_PRINT(1, "agent segs generated %d segments\n", n);
365 pass(d);
366 }
367
368 class a_ssif : public agent
369 {
370 virtual void run();
name()371 virtual const char *name() { return "to-mbrola"; };
372 int position;
373 public:
a_ssif()374 a_ssif() : agent(T_UNITS, T_SSIF) {position = 0;};
375 };
376
/*
 *	Convert the current unit structure to SSIF text using
 *	unit::write_ssif() and pass the resulting character buffer on.
 *
 *	NOTE(review): the buffer is only grown when cfg->ssif_buff_size is
 *	nonzero, whereas a_segs::run() grows its buffer only when
 *	cfg->seg_buff_size is zero.  With ssif_buff_size == 0 a result which
 *	fills INIT_SSIF_BS is passed on as it is -- confirm this is intended.
 */
void
a_ssif::run()
{
	int ssifbs = cfg->ssif_buff_size ? cfg->ssif_buff_size : INIT_SSIF_BS;
	char *d = (char *)xmalloc((ssifbs + 1));
	char *c = d /* + sizeof header */;	/* NB: shadows the agent's context member c */
	int n;
	int items = 0;		/* accumulated, but not used otherwise */

	unit *root = *(unit **)&inb;
	root->project(scfg->_phone_level);
again:
	n = root->write_ssif(c, position, ssifbs);
	position += n;
	items += n;
	if (cfg->ssif_buff_size && n >= ssifbs) {
		d = (char *)xrealloc(d, (ssifbs + 1 + position));
		/* continue behind what has been written so far; assumes d is NUL terminated -- TODO confirm */
		c = d /* + sizeof header */ + strlen(d);	/* FIXME: efficiency */
		goto again;
	} else {
		delete root;
		inb = NULL;
		position = 0;
	}
	D_PRINT(1, "agent ssif generated %d items\n", n);
	pass(d);
}
404
405 class a_chunk : public agent
406 {
407 char *bm;
408 virtual void run();
name()409 virtual const char *name() { return "chunk"; };
410 public:
a_chunk()411 a_chunk() : agent(T_TEXT, T_TEXT) { bm = NULL; };
412 };
413
utt_break(char * t)414 inline char *utt_break(char *t) /* returns ptr past the last char */
415 {
416 char *r = t;
417 do { /* split between . ? ... and whitespace */
418 r += strcspn(r, ".?");
419 r += strspn(r, ".?");
420 } while (!strchr(WHITESPACE, r[0]));
421
422 if (r - t > scfg->max_utterance) {
423 r = t + strcspn(t, ".,?!:;");
424 if (*r) r++;
425 } else return r;
426
427 if (r - t > scfg->max_utterance) {
428 r = t + strcspn(t, ".,?!:;-=+_~@#$%^&*\\|/ \t\n");
429 if (*r) r++;
430 } else return r;
431
432 if (r - t > scfg->max_utterance) {
433 r = t + strcspn(t, "()<>{}[]'\"");
434 } else return r;
435
436 if (r - t > scfg->max_utterance) {
437 if (scfg->split_utterance < (int)strlen(t))
438 r = t + scfg->split_utterance;
439 }
440
441 return r;
442 }
443
run()444 void a_chunk::run()
445 {
446 if (!bm) {
447 bm = (char *)inb;
448 }
449 char *tmp = bm;
450 bm = utt_break(bm);
451 D_PRINT(2, "Utterance chunking about to split off %d bytes\n", bm - tmp);
452 if (bm && *bm) {
453 char h = bm[0];
454 bm[0] = 0;
455 pass(get_text_buffer(tmp));
456 bm[0] = h;
457 schedule();
458 } else {
459 pass(get_text_buffer(tmp));
460 free(inb);
461 inb = NULL;
462 bm = NULL;
463 }
464 }
465
466 class a_join : public agent
467 {
468 char *heldout;
469 virtual void run();
name()470 virtual const char *name() { return "join"; };
471 public:
a_join()472 a_join() : agent(T_TEXT, T_TEXT) { heldout = NULL; };
473 };
474
475 void
run()476 a_join::run()
477 {
478 char *b;
479 char *last = (char *)inb;
480 if (heldout) {
481 b = (char *)xmalloc(strlen(heldout) + strlen(last) + 1);
482 strcpy(b, heldout);
483 strcat(b, last);
484 free(last);
485 free(heldout); heldout = NULL;
486 } else b = last;
487
488 if (*utt_break(b)) {
489 char *decodable = get_text_buffer(b);
490 free(b);
491 pass(decodable);
492 } else heldout = b;
493 }
494
495 class a_synth : public agent
496 {
497 virtual void run();
name()498 virtual const char *name() { return "synth"; };
499 protected:
500 void init_syn();
501 public:
a_synth(DATA_TYPE din,DATA_TYPE dout)502 a_synth(DATA_TYPE din, DATA_TYPE dout) : agent(din, dout) {};
a_synth()503 a_synth() : agent(T_SEGS, T_WAVEFM) {};
504 };
505
fallbackable_error(int code)506 bool fallbackable_error(int code)
507 {
508 switch (this_lang->fallback_mode) {
509 case 0: return false;
510 case 1: return true;
511 case 4: return code == 445 || code / 10 == 47;
512 case 7: return code / 10 == 47;
513 default: return code == this_lang->fallback_mode;
514 }
515 }
516
517 void
init_syn()518 a_synth::init_syn()
519 {
520 try {
521 this_voice->syn = this_voice->setup_synth();
522 } catch (command_failed *e) {
523 if (fallbackable_error(e->code)
524 && this_lang->fallback_voice
525 && *this_lang->fallback_voice) {
526 voice_switch(this_lang->fallback_voice);
527 if (this_lang->permanent_fallbacks) {
528 c->leave();
529 voice_switch(this_lang->fallback_voice);
530 c->enter();
531 }
532 delete e;
533 return;
534 } else throw e;
535 }
536 }
537
538 void
run()539 a_synth::run()
540 {
541 if (!this_voice) shriek(861, "No current voice");
542 while (!this_voice->syn) init_syn(); // at most twice
543 wavefm *wfm = new wavefm(this_voice);
544 this_voice->syn->synsegs(this_voice, (segment *)inb + 1, ((segment*)inb)->code, wfm);
545 D_PRINT(1, "a_synth processes %d segments\n", ((segment *)inb)->code);
546
547 free(inb);
548 inb = NULL;
549 pass(wfm);
550 }
551
552 class a_syn : public a_synth
553 {
554 virtual void run();
555 public:
a_syn()556 a_syn() : a_synth(T_SSIF, T_WAVEFM) {};
557 };
558
559 void
run()560 a_syn::run()
561 {
562 if (!this_voice) shriek(861, "No current voice");
563 while (!this_voice->syn) init_syn(); // at most twice
564 wavefm *wfm = new wavefm(this_voice);
565 this_voice->syn->synssif(this_voice, (char *)inb, wfm);
566 // D_PRINT(1, "a_synth processes %d segments\n", ((segment *)inb)->code);
567
568 free(inb);
569 inb = NULL;
570 pass(wfm);
571 }
572
573 template <DATA_TYPE TYPE> class a_type : public agent
574 {
575 virtual void run();
name()576 virtual const char *name() { return "type spec"; };
577 public:
a_type()578 a_type() : agent(TYPE, TYPE) {};
579 };
580
581 template <DATA_TYPE TYPE> void
run()582 a_type<TYPE>::run()
583 {
584 void *outb = inb;
585 inb = NULL;
586 pass(outb);
587 }
588
/*
 *	Common base of the input and output agents.  The constructor turns
 *	a textual stream component into a descriptor ("socket").
 */
class a_io : public agent
{
	virtual void run() = 0;
   protected:
	int socket;		/* descriptor to read from or write to */
	a_ttscp *dc;		/* data connection in use (only for '$' components), else NULL */
	bool close_upon_exit;	/* whether the destructor should close the descriptor */
   public:
	a_io(const char *, DATA_TYPE, DATA_TYPE);
	virtual ~a_io();
};
600
601
/* Name of the only special ("#...") stream component handled by special_io() */
#define LOCALSOUNDAGENT		"localsound"

/* Descriptor of the local sound device; -1 until special_io() has opened it */
extern int localsound;
605
special_io(const char * name,DATA_TYPE intype)606 socky int special_io(const char *name, DATA_TYPE intype)
607 {
608 if (intype == T_INPUT || strcmp(name, LOCALSOUNDAGENT))
609 shriek(415, "Bad stream component %s", name);
610 if (!cfg->localsound) shriek(453, "Not allowed to use localsound");
611
612 if (localsound != -1) return localsound;
613 int r = open(scfg->local_sound_device, O_WRONLY | O_NONBLOCK);
614 if (r == -1) shriek(462, "Could not open localsound device, error %d", errno);
615 localsound = r;
616 return r;
617 }
618
/* Defined elsewhere; a_io::a_io() calls it with every valid descriptor it obtains */
void stretch_sleep_tables(socky int);
620
/*
 *	Set up an I/O agent for the stream component "par".  The first
 *	character selects the kind of component:
 *	  '$'  a data connection handle, looked up in data_conns
 *	  '/'  a file; subject to cfg->readfs / cfg->writefs and confined
 *	       by limit_pathname() using cfg->pseudo_root_dir
 *	  '#'  a special device, see special_io()
 *	Anything else is refused with a 462.  Only descriptors opened here
 *	(files) are closed again by the destructor.
 */
a_io::a_io(const char *par, DATA_TYPE in, DATA_TYPE out) : agent(in, out)
{
	char *filename;

	close_upon_exit = false;
	dc = NULL;

	switch(*par) {
		case '$': dc = data_conns->translate(par + 1);
			if (!dc) shriek(444, "Not a known data connection handle");
			/* an input agent reads from the connection, an output agent writes to it */
			else socket = (in == T_INPUT ? dc->c->config->get__sd_in() : dc->c->config->get__sd_out());
			break;
		case '/': if (in == T_INPUT && !cfg->readfs)
				shriek(454, "No filesystem inputs allowed");
			if (out == T_NONE && !cfg->writefs)
				shriek(454, "No filesystem outputs allowed");
			filename = limit_pathname(par, cfg->pseudo_root_dir);
			socket = open(filename, in == T_INPUT ? O_RDONLY | O_NONBLOCK | O_BINARY
					: O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK | O_BINARY, MODE_MASK);
			free(filename);
			if (socket == -1) shriek(445, "Cannot open file %s", par);
			else close_upon_exit = true;
			break;
		case '#': socket = special_io(par + 1, in);
			break;
		default:  shriek(462, "unimplemented i/o agent class");
			/* if ever adding classes, take care of closing/nonclosing
			 * the socket upon exit					   */
	}
	if (socket >= 0) stretch_sleep_tables(socket);
	D_PRINT(0, "I/O agent is %p\n", this);
}
653
~a_io()654 a_io::~a_io()
655 {
656 D_PRINT(0, "~a_io\n");
657 if (close_upon_exit) close_and_invalidate(socket);
658 }
659
660 class a_input : public a_io
661 {
662 int toread;
663 int offset;
664
665 virtual void run();
name()666 virtual const char *name() { return "input"; };
667 protected:
668 virtual bool mktask(int size);
669 public:
670 a_input(const char *);
671 };
672
a_input(const char * par)673 a_input::a_input(const char *par) : a_io(par, T_INPUT, T_TEXT)
674 {
675 }
676
677
/*
 *	Read as much of the current task as is available.
 *
 *	If the descriptor is already being waited for, or the read would block,
 *	the agent just blocks on the descriptor again.  End of file or a read
 *	error is a 438/437 for plain descriptors; for a data connection the
 *	connection is removed and deleted and a 436 is raised.  Once all
 *	"toread" bytes have arrived, the buffer is checked or converted
 *	according to the output type and passed on.
 */
void a_input::run()
{
	int res;
	D_PRINT(0, "Entering input agent\n");
	if (block_table[socket]) {
		D_PRINT(2, "avoiding a nested input\n");
		block(socket);
		return;
	}
	res = yread(socket, (char *)inb + offset, toread - offset);
	if (res == -1 && errno == EAGAIN) {
		D_PRINT(2, "avoiding an EAGAIN on input\n");
		block(socket);
		return;
	}
	if (res <= 0) {
		if (!dc) {
			if (res == 0) shriek(438, "end of file");
			else shriek(437, "read error");
		}
		/* a lost data connection: detach it from its control connection and destroy it */
		if (dc->ctrl) dc->ctrl->deps->remove(dc->handle);
		c->leave();
		delete data_conns->remove(dc->handle);
		c->enter();
		shriek(436, "data conn %d lost reading", socket);
	}
	offset += res;
	if (offset == toread) {
		void *dta = inb;
		((char *)dta)[offset] = 0;
		switch (out) {
			case T_SEGS:
				/* the header segment plus "code" segments must account for every byte */
				if ((((segment *)inb)->code + 1) * (int)sizeof(segment) != offset)
					shriek(432, "Received bad segments: %d segs, %d bytes",
							((segment *)inb)->code, offset);
				break;
			case T_WAVEFM:
				/* wrap the raw bytes in a wavefm object; the raw buffer is not passed on */
				wavefm *w;
				w = new wavefm(this_voice);
				w->become(inb, offset);
				free(inb);
				inb = NULL; toread = offset = 0;
				pass(w);
				return;
			case T_TEXT:
				encode_string((char *)inb, this_lang->charset, false);
				/* falls through to the empty default */
			default: ;	/* otherwise no problem */
		}
		D_PRINT(2, "Read and about to process %s\n", (char *)dta);
		inb = NULL;
		toread = 0;	// superfluous
		offset = 0;	// superfluous
		pass(dta);
	} else block(socket);	/* incomplete: wait for more data */
}
733
734 bool
mktask(int size)735 a_input::mktask(int size)
736 {
737 D_PRINT(2, "%d bytes to be read\n", size);
738 if (inb) return false; // busy
739 toread = size;
740 D_PRINT(0, "Alloc in a_input:\n");
741 inb = get_text_buffer(size);
742 offset = 0;
743 block(socket);
744 D_PRINT(1, "Apply task has been scheduled\n");
745 return true;
746 }
747
748
/*
 *	Common base of the output agents, which write their input to
 *	a descriptor.  Derived classes say how many bytes the current input
 *	buffer holds (insize()) and may transform it before it is written
 *	(decode()).
 */
class a_output : public a_io
{
	virtual int insize() = 0;	/* size of the current input in bytes */
	virtual void decode() {};	/* called once per buffer before writing */
	virtual void run();
	virtual const char *name() { return "output"; };
	/* assumes "next" is the owning stream -- see stream::stream() */
	bool foreground() {return ((stream *)next)->foreground(); };
	int written;			/* bytes of the current buffer written so far */
	bool decoded;			/* whether decode() has been applied to the current buffer */
   protected:
	void report(bool total, int written);
   public:
	a_output(const char *par, DATA_TYPE i) : a_io(par, i, T_NONE) {written = 0; decoded = false;};
};
763
764 void
run()765 a_output::run()
766 {
767 if (push_table[socket]) {
768 push(socket);
769 return;
770 }
771 int size = insize();
772 if (!decoded) {
773 decode();
774 decoded = true;
775 }
776
777 if (written) {
778 int now_written = ywrite(socket, (char *)inb + written, size - written);
779 if (now_written == -1) {
780 if (errno == EAGAIN) {
781 push(socket);
782 return;
783 }
784 shriek(436, "data connection %d lost during writing", socket);
785 }
786 report(false, now_written);
787 written += now_written;
788 } else {
789 written = ywrite(socket, (char *)inb, size);
790 if (written) {
791 if (written == -1) {
792 if (errno == EAGAIN) {
793 push(socket);
794 return;
795 }
796 shriek(436, "data connection %d lost before writing", socket);
797 }
798 report(true, size);
799 report(false, written);
800 }
801 }
802 if (written == size) {
803 written = 0;
804 decoded = false;
805 relax();
806 finis(false);
807 } else push(socket);
808 }
809
810 void
report(bool total,int written)811 a_output::report(bool total, int written)
812 {
813 // inb = NULL;
814 if (foreground()) {
815 reply(total ? "122 total bytes" : "123 written bytes");
816 sprintf(scratch, " %d", written);
817 sputs(scratch, cfg->get__sd_out());
818 sputs("\r\n", cfg->get__sd_out());
819 }
820 }
821
822
823 template <DATA_TYPE type> class oa_ascii : public a_output
824 {
insize()825 virtual int insize() {
826 return strlen((char *)inb);
827 }
decode()828 virtual void decode() {
829 decode_string((char *)inb, this_lang->charset);
830 }
831 public:
oa_ascii(const char * s)832 oa_ascii(const char *s): a_output(s, type) {};
833 };
834
835 //class oa_stml : public a_output
836 //{
837 // virtual int insize() {
838 // return strlen((char *)inb);
839 // }
840 // public:
841 // oa_stml(const char *s): a_output(s, T_STML) {};
842 //};
843
844 class oa_seg : public a_output
845 {
insize()846 virtual int insize() {
847 D_PRINT(1, "Sending %d segments\n", ((segment *)inb)->code);
848 return ((segment *)inb)->code * sizeof(segment);
849 }
850 public:
oa_seg(const char * s)851 oa_seg(const char *s): a_output(s, T_SEGS) {};
852 };
853
/*
 *	Output agent for waveforms.  It has a run() of its own, which lets the
 *	wavefm object do the writing; therefore insize() must never be called.
 */
class oa_wavefm : public a_output
{
	virtual int insize() {
		shriek(462, "abstract oa_wavefm::insize"); return 0;
	}
	virtual void run();
	virtual bool brk();
	bool attached;		/* whether the current waveform is attached to the descriptor */
   public:
	oa_wavefm(const char *s): a_output(s, T_WAVEFM) {attached = false;};
};
865
/*
 *	Write out the current waveform.
 *
 *	The waveform is attached to the descriptor once and then flushed for
 *	as long as flush() reports more work and some bytes got written.  If
 *	work remains and no error is indicated, the agent waits for the
 *	descriptor; otherwise the totals are reported, the waveform is
 *	detached and deleted and finis(false) is called.  w->written == -1
 *	is treated as a lost connection (436).
 */
void
oa_wavefm::run()
{
	wavefm *w = (wavefm *)inb;

	if (!attached && !push_table[socket]) {
		w->attach(socket);
		report(false, w->written);
		attached = true;
	}
	bool to_do;
	while ((to_do = w->flush()) && w->written > 0) {
		report(false, w->written);
	}

	if (to_do && w->written >= 0) push(socket);	/* could not write everything now */
	else {
		if (w->written == -1) {
			shriek(436, "data conn %d lost writing", socket);
		}
		if (w->written > 0) report(false, w->written);
		report(true, w->written_bytes());
		w->detach(socket);
		D_PRINT(1, "oa_wavefm wrote %d bytes\n", w->written_bytes());
		attached = false;
		delete w;
		inb = NULL;
		finis(false);
	}
}
896
897 bool
brk()898 oa_wavefm::brk()
899 {
900 if (inb) {
901
902 wavefm *w = (wavefm *)inb;
903
904 report(true, w->written_bytes());
905 w->brk();
906 if (attached) w->detach(socket);
907 D_PRINT(1, "oa_wavefm wrote %d bytes\n", w->written_bytes());
908 attached = false;
909 relax();
910 }
911 finis(true);
912 return true;
913 }
914
915
916 /*
917 * A stream is a linked list of agents, one of them being the
918 * stream agent itself. stream->head is an input agent.
919 */
920
/*
 *	Kinds of non-I/O agents.  The order of the enumerators must match the
 *	order of the names in agent_type_str, which make_agent() feeds to
 *	str2enum(); the leading colon makes the empty name correspond to
 *	AT_UNKNOWN (presumably -- verify against str2enum()).
 */
enum agent_type {AT_UNKNOWN, AT_CHUNK, AT_JOIN, AT_ASCII, AT_SSIF, AT_SEGS,
		 AT_PRINT, AT_RULES, AT_STML, AT_SYN, AT_TRAD_SYNTH,
		 AT_T_TEXT, AT_T_STML, AT_T_UNITS, AT_T_SSIF, AT_T_SEGS, AT_T_WAVEFM};
const char *agent_type_str = ":chunk:join:raw:dump:diphs:print:rules:stml:syn:synth:[t]:[s]:[i]:[p]:[d]:[w]:";
925
/*
 *	Create the agent described by the stream component "s".
 *
 *	A component starting with one of "@#/.$" is an I/O agent: an input
 *	agent if there is no preceding agent, otherwise an output agent
 *	matching the output type of the preceding one.  Any other component
 *	is looked up in agent_type_str.
 *
 *	NOTE(review): the switch statements rely on shriek() not returning;
 *	otherwise "case T_UNITS" and "case AT_UNKNOWN" would fall through.
 *	NOTE(review): AT_UNKNOWN is also the default given to str2enum(), so
 *	an unrecognized name seemingly ends in the 861 rather than in the 415
 *	of the default branch -- confirm against str2enum().
 */
agent *make_agent(char *s, agent *preceding)
{
	if (strchr("@#/.$", *s)) {
		if (!preceding) return new a_input(s);
		switch (preceding->out) {
			case T_TEXT:   return new oa_ascii<T_TEXT>(s);
			case T_STML:   return new oa_ascii<T_STML>(s);
			case T_UNITS:  shriek(448, "Units are hard to output");
			case T_SSIF:   return new oa_ascii<T_SSIF>(s);
			case T_SEGS:   return new oa_seg(s);
			case T_WAVEFM: return new oa_wavefm(s);
			default:       shriek(462, "unimplmd oa");
		}
	}
	switch ((agent_type)str2enum(s, agent_type_str, AT_UNKNOWN))
	{
		case AT_UNKNOWN: shriek(861, "Agent type bug.");
		case AT_ASCII:   return new a_ascii;
		case AT_CHUNK:   return new a_chunk;
		case AT_SSIF:    return new a_ssif;
		case AT_SEGS:    return new a_segs;
		case AT_JOIN:    return new a_join;
		case AT_PRINT:   return new a_print;
		case AT_RULES:   return new a_rules;
		case AT_STML:    return new a_stml;
		case AT_SYN:     return new a_syn;
		case AT_TRAD_SYNTH: return new a_synth;

		/* the bracketed names create pass-through agents which only fix the data type */
		case AT_T_TEXT:  return new a_type<T_TEXT>;
		case AT_T_STML:  return new a_type<T_STML>;
		case AT_T_UNITS: return new a_type<T_UNITS>;
		case AT_T_SSIF:  return new a_type<T_SSIF>;
		case AT_T_SEGS:  return new a_type<T_SEGS>;
		case AT_T_WAVEFM:return new a_type<T_WAVEFM>;

		default:       shriek(415, "Unknown agent type %s", s); return NULL;
	}
}
964
965
stream(char * s,context * pc)966 stream::stream(char *s, context *pc) : agent(T_NONE, T_NONE)
967 {
968 char *tmp;
969 agent *a;
970 agent *l = head = NULL;
971
972 callbk = NULL;
973 c = pc;
974
975 tmp = strchr(s, LIST_DELIM);
976 if (!tmp) shriek(415, "Bad stream syntax");
977
978 do {
979 *tmp = 0;
980 D_PRINT(1, "Making agent out of %s\n", s);
981 try {
982 a = make_agent(s, NULL); a->c = c;
983 } catch (command_failed *e) {
984 release_agents();
985 throw e;
986 }
987 *tmp = LIST_DELIM;
988 s = ++tmp;
989 if (!l) head = a;
990 else l->next = a;
991 a->prev = l;
992 l = a;
993 } while((tmp = strchr(s, LIST_DELIM)));
994 a = make_agent(s, a); a->c = c;
995 l->next = a;
996 a->prev = l;
997 a->next = this;
998 if (head->next != this) head->out = head->next->in; /* adjust a_input type */
999 }
1000
~stream()1001 stream::~stream()
1002 {
1003 release_agents();
1004 // delete c;
1005 }
1006
1007 void
release_agents()1008 stream::release_agents()
1009 {
1010 for (agent *a = head; a && a != this; ) {
1011 agent *b = a;
1012 a = a->next;
1013 delete b;
1014 }
1015 }
1016
1017 void
apply(agent * ref,int bytes)1018 stream::apply(agent *ref, int bytes)
1019 {
1020 D_PRINT(2, "In stream::apply %p %p %d\n", head, ref, bytes);
1021 callbk = ref;
1022 head->mktask(bytes);
1023 }
1024
1025 bool
brk()1026 stream::brk()
1027 {
1028 if (!callbk) return false; /* break only if running */
1029 for (agent *a = head; a && a != this; a = a->next)
1030 a->brk();
1031 reply("401 interrupted");
1032 return true;
1033 }
1034
1035 void
run()1036 stream::run()
1037 {
1038 shriek(861, "scheduled a stream");
1039 }
1040
1041 void
finis(bool err)1042 stream::finis(bool err) // FIXME: simplify
1043 {
1044 if (err) reply("191 finis recovery");
1045 D_PRINT(2, "submitted a subtask\n");
1046 for (agent *a = head; a != this; a = a->next) {
1047 if (a->inb || a->pendin) {
1048 D_PRINT(2, "more subtasks are pending\n");
1049 if (err) {
1050 a->relax();
1051 D_PRINT(2, "subtasks discarded\n");
1052 if (callbk) callbk->schedule();
1053 else shriek(862, "double fault - no callback");
1054 callbk = NULL;
1055 return;
1056 }
1057 return;
1058 }
1059 }
1060 D_PRINT(2, "this has been the last subtask\n");
1061 if (!err) reply("200 output OK");
1062 if (callbk) callbk->schedule();
1063 else shriek(862, "no callback");
1064 callbk = NULL;
1065 }
1066
/*
 *	The disconnector deletes protocol agents on their behalf: an agent
 *	which wants to go away registers itself with disconnect() and gets
 *	deleted later, when the disconnector runs.
 */
class a_disconnector : public agent
{
	virtual void run();
	virtual const char *name() { return "disconnector"; };
	a_protocol **to_delete;		/* agents waiting to be deleted */
	int last;			/* number of agents waiting */
	int max;			/* allocated size of to_delete */
   public:
	void disconnect(a_protocol *);
	a_disconnector();
	virtual ~a_disconnector();
};
1079
a_disconnector()1080 a_disconnector::a_disconnector() : agent(T_NONE, T_NONE)
1081 {
1082 to_delete = (a_protocol **)xmalloc(sizeof(void *));
1083 last = 0;
1084 max = 1;
1085 }
1086
~a_disconnector()1087 a_disconnector::~a_disconnector()
1088 {
1089 if (last) shriek(861, "Forgot to disconnect a protocol agent!");
1090 free(to_delete);
1091 }
1092
run()1093 void a_disconnector::run()
1094 {
1095 if (!last) shriek(861, "Spurious disconnect");
1096 delete to_delete[--last];
1097 to_delete[last] = NULL;
1098 D_PRINT(0, "Disconnect finished, last is %d\n", last);
1099 }
1100
disconnect(a_protocol * moriturus)1101 void a_disconnector::disconnect(a_protocol *moriturus)
1102 {
1103 if (last == max && ! (max & max - 1)) {
1104 max <<= 1;
1105 to_delete = (a_protocol **)xrealloc(to_delete, max * sizeof(void *));
1106 }
1107 to_delete[last++] = moriturus;
1108 D_PRINT(0, "Disconnect requested, last is %d\n", last);
1109 schedule();
1110 }
1111
/* The single disconnector instance; used by a_ttscp::disconnect() */
a_disconnector disconnector;
1113
a_protocol()1114 a_protocol::a_protocol() : agent(T_NONE, T_NONE)
1115 {
1116 sgets_buff = get_text_cmd_buffer();
1117 *sgets_buff = 0;
1118 buffer = get_text_cmd_buffer();
1119 }
1120
~a_protocol()1121 a_protocol::~a_protocol()
1122 {
1123 free(sgets_buff);
1124 free(buffer);
1125 }
1126
/*
 *	Read one command line from the control connection and execute it.
 *
 *	A negative result of sgets() means the connection is gone.  What
 *	happens after a command depends on the value run_command() returns:
 *	  PA_NEXT  go on with the next command, at once if sgets_buff already
 *	           holds another complete line, else when more input arrives
 *	  PA_DONE  disconnect
 *	  PA_WAIT  do nothing; somebody else is expected to reschedule us
 *	If no (complete) command has been read, wait for more input.
 */
void a_protocol::run()
{
	int res;
	res = sgets(buffer, cfg->max_net_cmd, cfg->get__sd_in(), sgets_buff);
	if (res < 0) {
		disconnect();
		return;
	}

	encode_string(buffer, this_lang->charset, false);	// FIXME (alloc->true)

	if ((int)strlen(buffer) >= cfg->max_net_cmd)
		shriek(413, "Received command is too long");
	if (res > 0 && *buffer) switch (run_command(buffer)) {
		case PA_NEXT:
			D_PRINT(0, "PA_NEXT\n");
			if (strchr(sgets_buff, '\n')) schedule();
			else block(cfg->get__sd_in());
			return;
		case PA_DONE:
			D_PRINT(0, "PA_DONE\n");
			disconnect();
			return;
		case PA_WAIT:
			D_PRINT(0, "PA_WAIT\n");
			return;
		default:
			shriek(861, "Bad protocol action\n");
	}
	block(cfg->get__sd_in());		/* partial line read */

//	leave_context(i);

					// non-blocking get_line etc.
}
1162
a_ttscp(socky int _sd_in,socky int _sd_out)1163 a_ttscp::a_ttscp(socky int _sd_in, socky int _sd_out) : a_protocol()
1164 {
1165 c = new context(_sd_in, _sd_out);
1166 c->enter();
1167
1168 handle = (char *)malloc(cfg->handle_size + 1);
1169 do make_rnd_passwd(handle, cfg->handle_size);
1170 while (data_conns->translate(handle));
1171 ctrl_conns->add(handle, this);
1172
1173 sputs(
1174 "TTSCP spoken here\r\n"
1175 "protocol: 0\r\n"
1176 "extensions:\r\n"
1177 "server: Epos\r\n"
1178 "release: " VERSION "\r\n"
1179 "handle: ", cfg->get__sd_out());
1180 sputs( handle, cfg->get__sd_out());
1181 sputs( "\r\n", cfg->get__sd_out());
1182 ctrl = NULL;
1183 deps = new hash_table<char, a_ttscp>(4);
1184 deps->dupdata = deps->dupkey = false;
1185 c->leave();
1186 block(_sd_in);
1187 }
1188
1189 /*
1190 * Warning: the following destructor runs in the master context;
1191 * therefore, be careful with using cfg etc.
1192 */
1193
/*
 *	Tear the connection down: delete its current stream, delete all
 *	the data connections listed in deps, close the descriptors and
 *	release the handle and the context.  The context is entered only
 *	where cfg has to refer to this connection.
 */
a_ttscp::~a_ttscp()
{
	c->enter();
	if (cfg->current_stream) delete cfg->current_stream;
	cfg->current_stream = NULL;
	D_PRINT(2, "deleted context closes fd %d and %d\n", cfg->get__sd_in(), cfg->get__sd_out());
	c->leave();
	/* the dependent data connections are deleted outside our context */
	while (deps->items) {
		a_ttscp *tmp = deps->translate(deps->get_random());
		deps->remove(tmp->handle);
		delete data_conns->remove(tmp->handle);
	}
	delete deps;
	c->enter();
	if (cfg->get__sd_in() != -1)
		close_and_invalidate(cfg->get__sd_in());
	/* avoid closing the same descriptor twice */
	if (cfg->get__sd_out() != -1 && cfg->get__sd_out() != cfg->get__sd_in())
		close_and_invalidate(cfg->get__sd_out());
	/* whoever deletes us must have removed us from the tables already */
	if (data_conns->translate(handle) || ctrl_conns->translate(handle))
		shriek(862, "Forgot to forget a_ttscp");

	free(handle);
	c->leave();
	delete c;
	// close the descriptor? no, the ~context does that
}
1220
1221 bool
brk()1222 a_ttscp::brk()
1223 {
1224 if (c->config->current_stream)
1225 return c->config->current_stream->brk();
1226 return false;
1227 }
1228
1229 int
run_command(char * cmd)1230 a_ttscp::run_command(char *cmd)
1231 {
1232 char *keyword;
1233 char *param;
1234
1235 D_PRINT(2, "[ cmd] %s\n", cmd);
1236
1237 keyword = cmd + strspn (cmd, WHITESPACE);
1238 param = keyword + strcspn(keyword, WHITESPACE);
1239 if (param - keyword != 4) goto bad; /* all cmds are 4 chars */
1240
1241 if (!*param) param = NULL;
1242 else *param++ = 0;
1243
1244 if (param) param += strspn(param, WHITESPACE);
1245
1246 int i;
1247 for (i=0; ttscp_cmd_set[i].name &&
1248 (*(const int *)keyword != *(const int *)&ttscp_cmd_set[i].name);)
1249 i++;
1250 if (!ttscp_cmd_set[i].name) goto bad;
1251
1252 if (!param && ttscp_cmd_set[i].param == PAR_REQ) {
1253 reply("417 parameter missing");
1254 return PA_NEXT;
1255 }
1256 if (param && ttscp_cmd_set[i].param == PAR_FORBIDDEN) {
1257 reply("416 parameter not allowed");
1258 return PA_NEXT;
1259 }
1260
1261 try {
1262 return ttscp_cmd_set[i].impl(param, this);
1263 } catch (command_failed *e) {
1264 D_PRINT(2, "Command failed, %d, %.60s\n", e->code, e->msg);
1265 reply(e->code, e->msg);
1266 delete e;
1267 return PA_NEXT;
1268 } catch (connection_lost *d) {
1269 D_PRINT(2, "Releasing a TTSCP control connection, %d, %.60s\n", d->code, d->msg);
1270 reply(d->code, d->msg); /* just in case */
1271 reply(201, fmt("debug %d", cfg->get__sd_in()));
1272 delete d;
1273 return PA_DONE;
1274 }
1275
1276 bad:
1277 cmd_bad(cmd);
1278 return PA_NEXT;
1279 }
1280
1281 void
disconnect()1282 a_ttscp::disconnect()
1283 {
1284 D_PRINT(2, "ctrl conn %d lost\n", cfg->get__sd_in());
1285 if (this != ctrl_conns->remove(handle) /* && this != data_conns->remove(handle) */ )
1286 shriek(862, "Failed to disconnect a ctrl connection");
1287 disconnector.disconnect(this);
1288 }
1289
/*
 *	Switch the descriptor f to non-blocking mode.
 *
 *	Failures are ignored, the call is best effort.
 */
void make_nonblocking(int f)
{
#ifdef HAVE_WINSOCK
	/* FIONBIO enables non-blocking mode iff *argp is nonzero */
	unsigned long int enable = 1;
	ioctlsocket((unsigned long int)f, FIONBIO, &enable);
#else
	/*
	 *	F_SETFL replaces the whole set of file status flags, so the
	 *	flags already set (O_APPEND etc.) have to be fetched and kept.
	 *	If they cannot be read, fall back to setting O_NONBLOCK alone.
	 */
	int flags = fcntl(f, F_GETFL, 0);
	if (flags == -1) flags = 0;
	fcntl(f, F_SETFL, flags | O_NONBLOCK);
#endif
}
1298
#ifndef HAVE_GETHOSTNAME
/*
 *	Replacement for systems lacking gethostname(): always reports
 *	"localhost".  At most len bytes are written to b, including the
 *	terminating NUL; a name which does not fit is truncated.
 *
 *	Returns 0 on success, -1 if there is no room to store anything.
 */
int gethostname(char *b, size_t len)
{
	static const char fallback[] = "localhost";

	if (!b || !len) return -1;

	size_t n = sizeof(fallback) - 1;
	if (n > len - 1) n = len - 1;	/* leave room for the NUL */
	memcpy(b, fallback, n);
	b[n] = 0;
	return 0;
}
#endif
1306
a_accept()1307 a_accept::a_accept() : agent(T_NONE, T_NONE)
1308 {
1309 static sockaddr_in sa;
1310
1311 char one = 1;
1312
1313 // c = new context(-1, /*** dark errors ***/ DARK_ERRLOG); //FIXME
1314 // c->enter();
1315
1316 listening = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1317 memset(&sa, 0, sizeof(sa));
1318 gethostname(scratch, scfg->scratch_size - 1);
1319 sa.sin_family = AF_INET;
1320 sa.sin_addr.s_addr = htonl(scfg->local_only ? INADDR_LOOPBACK : INADDR_ANY);
1321 sa.sin_port = htons(scfg->listen_port);
1322 setsockopt(listening, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int));
1323 D_PRINT(3, "* Binding to the TTSCP port %d.\n", scfg->listen_port);
1324 if (bind(listening, (sockaddr *)&sa, sizeof (sa))) shriek(871, "Could not bind");
1325 if (listen(listening, 64)) shriek(871, "Could not listen");
1326 make_nonblocking(listening);
1327
1328 ia.sin_family = AF_INET;
1329 ia.sin_addr.s_addr = htonl(INADDR_ANY);
1330 ia.sin_port = 0;
1331
1332 block(listening);
1333 // c->leave();
1334 }
1335
~a_accept()1336 a_accept::~a_accept()
1337 {
1338 close (listening);
1339 // close (c->config->_sd_in);
1340 // delete c;
1341 }
1342
1343 void
run()1344 a_accept::run()
1345 {
1346 static socklen_t sia = sizeof(sockaddr); // Will __QNX__ complain?
1347 int f = accept(listening, (sockaddr *)&ia, &sia);
1348 if (f == -1) {
1349 // shriek(871, "Cannot accept() - network problem (errno %d)", errno);
1350 D_PRINT(3, "Cannot accept() - errno %d! Madly looping.\n", errno);
1351 if (errno != EAGAIN) schedule();
1352 return;
1353 }
1354 make_nonblocking(f);
1355 D_PRINT(2, "Accepted %d (on %d).\n", f, listening);
1356 // c->leave();
1357 unuse(new a_ttscp(f, f));
1358 // c->enter();
1359 block(listening);
1360 }
1361
1362 struct sched_aq
1363 {
1364 agent *ag;
1365 sched_aq *next;
1366 sched_aq *prev;
1367
1368 void *operator new(size_t size);
1369 void operator delete(void *ptr);
1370 };
1371
1372 SLABIFY(sched_aq, sched_aq_slab, 341, shutdown_sched_aq);
1373
1374 sched_aq *sched_head = NULL;
1375 sched_aq *sched_tail = NULL;
1376
1377 int runnable_agents = 0;
1378
1379 void
schedule()1380 agent::schedule()
1381 {
1382 if (!this) shriek(862, "scheduling garbage");
1383 runnable_agents++;
1384 D_PRINT(1, "%d runnable agents\n", runnable_agents);
1385 sched_aq *tmp = new sched_aq;
1386 tmp->ag = this;
1387 tmp->prev = NULL;
1388 tmp->next = sched_head;
1389 if (sched_head) sched_head->prev = tmp;
1390 else sched_tail = tmp;
1391 sched_head = tmp;
1392 }
1393
sched_sel()1394 agent *sched_sel()
1395 {
1396 agent *r;
1397 sched_aq *tmp;
1398 if (!sched_tail) shriek(862, "agent queue empty");
1399 if (!sched_tail->prev) sched_head = NULL;
1400 else sched_tail->prev->next = NULL;
1401 r = sched_tail->ag;
1402 tmp = sched_tail;
1403 sched_tail = sched_tail->prev;
1404 delete tmp;
1405 runnable_agents--;
1406 D_PRINT(1, "Agent %s\n", r->name());
1407 return r;
1408 }
1409
shutdown_agent_queue()1410 void shutdown_agent_queue()
1411 {
1412 for (sched_aq *tmp = sched_head; tmp; tmp = tmp->next)
1413 delete tmp;
1414 }
1415
1416 agent **block_table = (agent **)xmalloc(1);
1417 agent **push_table = (agent **)xmalloc(1);
1418 fd_set block_set;
1419 fd_set push_set;
1420 socky int select_fd_max = 0;
1421
stretch_sleep_tables(socky int fd)1422 void stretch_sleep_tables(socky int fd)
1423 {
1424 if (select_fd_max <= fd) {
1425 block_table = (agent **)xrealloc(block_table, (fd + 1) * sizeof(agent *));
1426 push_table = (agent **)xrealloc(push_table, (fd + 1) * sizeof(agent *));
1427 for ( ; select_fd_max <= fd; select_fd_max++) {
1428 block_table[select_fd_max] = NULL;
1429 push_table[select_fd_max] = NULL;
1430 }
1431 }
1432 }
1433
1434 /*
1435 * agent::run() should return after calling block() or push()
1436 */
1437
1438 void
block(socky int fd)1439 agent::block(socky int fd)
1440 {
1441 D_PRINT(1, "Sleeping on %d\n", fd);
1442 stretch_sleep_tables(fd);
1443 if (block_table[fd]) {
1444 agent *a;
1445
1446 if (this == block_table[fd])
1447 shriek(861, "Resleeping on %d", fd);
1448 if (!FD_ISSET(fd, &block_set))
1449 shriek(861, "Countersleeping on %d", fd);
1450 for (a = block_table[fd]; a->dep; a = a->dep) ;
1451 a->dep = this;
1452 } else {
1453 block_table[fd] = this;
1454 FD_SET(fd, &block_set);
1455 }
1456 return;
1457 }
1458
1459 void
push(socky int fd)1460 agent::push(socky int fd)
1461 {
1462 D_PRINT(1, "Pushing on %d\n", fd);
1463 stretch_sleep_tables(fd);
1464 if (push_table[fd]) {
1465 agent *a;
1466
1467 if (this == push_table[fd])
1468 shriek(861, "Resleeping on %d", fd);
1469 if (!FD_ISSET(fd, &push_set))
1470 shriek(861, "Countersleeping on %d", fd);
1471 for (a = push_table[fd]; a->dep; a = a->dep) ;
1472 a->dep = this;
1473 } else {
1474 push_table[fd] = this;
1475 FD_SET(fd, &push_set);
1476 }
1477 return;
1478 }
1479
1480