1 /*
2  *	epos/src/agent.cc
3  *	(c) 1998-01 geo@cuni.cz
4  *
5     This program is free software; you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published by
7     the Free Software Foundation; either version 2 of the License, or
8     (at your option) any later version.
9 
10     This program is distributed in the hope that it will be useful,
11     but WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License in doc/COPYING for more details.
14  *
15  */
16 
17 #include "epos.h"
18 #include "agent.h"
19 #include "client.h"
20 #include "slab.h"
21 
22 #ifdef HAVE_SYS_TIME_H
23 	#include <sys/time.h>
24 #endif
25 
26 #ifdef HAVE_UNIX_H
27 	#include <unix.h>
28 #endif
29 
30 #ifdef HAVE_SYS_SOCKET_H
31 	#include <sys/socket.h>
32 #endif
33 
34 #ifndef HAVE_SOCKLEN_T
35 	#define socklen_t int
36 #endif
37 
38 #ifdef HAVE_WINSOCK2_H
39 	#include <winsock2.h>
40 #else
41 	#ifdef HAVE_WINSOCK_H
42 		#include <winsock.h>
43 	#endif
44 #endif
45 
46 #ifdef HAVE_SYS_STAT_H
47 	#include <sys/stat.h>
48 #endif
49 
50 #ifdef HAVE_FCNTL_H
51 	#include <fcntl.h>
52 #endif
53 
54 #ifdef HAVE_ERRNO_H
55 	#include <errno.h>
56 #endif
57 
58 #ifndef O_NONBLOCK
59 	#define O_NONBLOCK 0
60 #endif
61 
62 
63 #define DARK_ERRLOG 2	/* 2 == stderr; for global stdshriek and stddbg output */
64 
65 #ifdef HAVE_GETTIMEOFDAY
66 
agent_profile(const char * s)67 inline void agent_profile(const char *s)
68 {
69 	if (!scfg->profile || !*scfg->profile) return;
70 
71 	static FILE *log = NULL;
72 	if (!log) log = fopen(scfg->profile, "w", "profile");
73 	static struct timeval start, stop;
74 	if (!s) {
75 		if (gettimeofday(&start, NULL)) shriek(861, "profiler fails");
76 		int duration = start.tv_sec - stop.tv_sec;
77 		duration *= 1000000;
78 		duration += start.tv_usec - stop.tv_usec;
79 		fprintf(log, "%10ld", duration);
80 		fflush(log);
81 		if (gettimeofday(&start, NULL)) shriek(861, "profiler fails");
82 	} else {
83 		if (gettimeofday(&stop, NULL)) shriek(861, "profiler fails");
84 		int duration = stop.tv_sec - start.tv_sec;
85 		duration *= 1000000;
86 		duration += stop.tv_usec - start.tv_usec;
87 		fprintf(log, " %-13s%8ld\n", s, duration);
88 		fflush(log);
89 		if (gettimeofday(&stop, NULL)) shriek(861, "profiler fails");
90 	}
91 }
92 
93 #else
agent_profile(const char * s)94 	inline void agent_profile(const char *s) { return; }
95 #endif
96 
97 
agent(DATA_TYPE typein,DATA_TYPE typeout)98 agent::agent(DATA_TYPE typein, DATA_TYPE typeout)
99 {
100 	in = typein, out = typeout;
101 	next = prev = NULL, inb = NULL;
102 	pendin = pendout = NULL; pendcount = 0;
103 	c = NULL;
104 	dep = NULL;
105 	D_PRINT(1, "Creating a handler, intype %i, outtype %i\n", typein, typeout);
106 }
107 
~agent()108 agent::~agent()
109 {
110 //	D_PRINT(1, "Handler deleted.\n");
111 }
112 
113 inline void
unquench()114 agent::unquench()
115 {
116 	if (inb || pendin) schedule();
117 	/* can not unquench input agents */
118 }
119 
120 void
timeslice()121 agent::timeslice()
122 {
123 	c->enter();
124 	D_PRINT(1, "Timeslice for %s\n", name());
125 	D_PRINT(0, "pendcount %d, next->pendcount %d, pend_max %d\n", pendcount, next ? next->pendcount : -42, scfg->pend_max);
126 	if (next && next->pendcount > scfg->pend_max) {
127 		D_PRINT(1, "(satiated)\n");
128 		c->leave();
129 		return;
130 	}
131 	if (!inb && in != T_NONE) {
132 		if (!pendin) {
133 			/* We may take this path legally after an intr command */
134 			D_PRINT(2, "No input of type %i; shrugging off\n", in);
135 			c->leave();
136 			return;
137 		}
138 		D_PRINT(2, "Getting pending task, type %i\n", in);
139 		pend_ll *tmp = pendin;
140 		inb = tmp->d;
141 		pendin = tmp->next;
142 		delete tmp;
143 		if (pendcount-- == scfg->pend_min) prev->unquench();
144 		if (!pendin) {
145 			prev->pendout = NULL;
146 			if (pendcount) shriek(862, "pending count incorrect");
147 		}
148 	}
149 	try {
150 		agent_profile(NULL);
151 		run();
152 		agent_profile(name());
153 	} catch (command_failed *e) {
154 		if (!next) throw e;
155 		D_PRINT(2, "Processing failed, %d, %.60s\n", e->code, e->msg);
156 		reply(e->code, e->msg);
157 		delete e;
158 		finis(true);
159 	}
160 	if (pendcount && !inb) schedule();
161 	c->leave();
162 
163 	agent *tmp;
164 	for (agent *a = dep; a; a = tmp) {
165 		D_PRINT(1, "Scheduling %s because of %s\n", a->name(), name());
166 		tmp = a->dep;
167 		a->schedule();
168 		a->dep = NULL;
169 	}
170 	dep = NULL;
171 }
172 
173 bool
mktask(int)174 agent::mktask(int)
175 {
176 	return false;	// except for the input agent, agents can't start a task
177 }
178 
do_relax(void * ptr,DATA_TYPE type)179 inline void do_relax(void *ptr, DATA_TYPE type)
180 {
181 	if (ptr) switch(type) {
182 		case T_NONE:	break;
183 		case T_INPUT:	free(ptr); break;
184 		case T_STML:
185 		case T_TEXT:	free(ptr); break;
186 		case T_UNITS:	delete ((unit *)ptr); break;
187 		case T_SEGS:	free(ptr); break;
188 		case T_SSIF:	free(ptr); break;
189 		case T_WAVEFM:	delete((wavefm *)ptr); break;
190 	}
191 }
192 
193 void
relax()194 agent::relax()
195 {
196 	if (inb) {
197 		do_relax(inb, in);	/* branch likely */
198 		inb = NULL;
199 	}
200 	if (!next) return;
201 	for (pend_ll *p = next->pendin; p != NULL; ) {
202 		do_relax(p->d, out);
203 		pend_ll *n = p->next;
204 		next->pendcount--;
205 		delete p;
206 		p = n;
207 	}
208 	pendout = next->pendin = NULL;
209 	if (next->pendcount) shriek(862, "pending count incorrect");
210 }
211 
212 void
finis(bool err)213 agent::finis(bool err)
214 {
215 	if (!next) shriek(861, "Non-module finished");
216 	agent *a = next;
217 	while (a->next) a = a->next;
218 	a->finis(err);
219 }
220 
221 /*
222  *	There is a slight difference between relax() and brk().  Both discard all
223  *	pending input.  In addition, oa_wavefm::brk()  (only) discards any
224  *	waveform already written to the sound card;
225  *	relax() would call sync_soundcard() instead.
226  */
227 
brk()228 bool agent::brk()
229 {
230 	relax();
231 	D_PRINT(1, "interrupting an agent, intype %i, outtype %i\n", in, out);
232 	return true;
233 }
234 
235 void
pass(void * ptr)236 agent::pass(void *ptr)
237 {
238 	if (!ptr) shriek(862, "Nothing to pass");
239 	if (!next) shriek(862, "Nowhere to pass to");
240 	if (pendout || next->inb) {
241 		pendout = (pendout ? pendout->next : next->pendin) = new pend_ll(ptr, NULL);
242 		next->pendcount++;
243 	} else {
244 		next->inb = ptr;
245 		next->schedule();
246 	}
247 }
248 
249 class a_ascii : public agent
250 {
251 	virtual void run();
name()252 	virtual const char *name() { return "raw"; };
253    public:
a_ascii()254 	a_ascii() : agent(T_TEXT, T_UNITS) {};
255 };
256 
257 void
run()258 a_ascii::run()
259 {
260 	void *r = str2units((char *)inb);
261 	free(inb);
262 	inb = NULL;
263 	pass(r);
264 }
265 
266 
267 class a_stml : public agent
268 {
269 	virtual void run();
name()270 	virtual const char *name() { return "stml"; };
271    public:
a_stml()272 	a_stml() : agent(T_STML, T_UNITS) {};
273 };
274 
275 void
run()276 a_stml::run()
277 {
278 	shriek(462, "STML parser not available");
279 }
280 
281 
282 class a_rules : public agent
283 {
284 	virtual void run();
name()285 	virtual const char *name() { return "rules"; };
286    public:
a_rules()287 	a_rules() : agent(T_UNITS, T_UNITS) {};
288 };
289 
290 void
run()291 a_rules::run()
292 {
293 	void *r = inb;
294 	inb = NULL;
295 	this_lang->ruleset->apply((unit *)r);
296 	pass(r);
297 }
298 
299 
300 class a_print : public agent
301 {
302 	virtual void run();
name()303 	virtual const char *name() { return "print"; };
304    public:
a_print()305 	a_print() : agent(T_UNITS, T_TEXT) {};
306 };
307 
308 void
run()309 a_print::run()
310 {
311 	char *b;
312 
313 	b = ((unit *)inb)->gather(false /* no ^$ */, true /* incl. ssegs */ );
314 
315 	delete (unit *) inb;
316 	inb = NULL;
317 	pass(get_text_buffer(b));
318 }
319 
320 
321 class a_segs : public agent
322 {
323 	virtual void run();
name()324 	virtual const char *name() { return "segs"; };
325 	int position;
326    public:
a_segs()327 	a_segs() : agent(T_UNITS, T_SEGS) {position = 0;};
328 };
329 
330 #define INIT_SEGS_BS	2048
331 #define INIT_SSIF_BS	4096
332 
333 void
run()334 a_segs::run()
335 {
336 	int sbs = cfg->seg_buff_size ? cfg->seg_buff_size : INIT_SEGS_BS;
337 	segment *d = (segment *)xmalloc((sbs + 1) * sizeof(segment));
338 	segment *c = d + 1;
339 	int n;
340 	int items = 0;
341 
342 	unit *root = *(unit **)&inb;
343 	root->project(scfg->_segm_level);
344 again:
345 	n = root->write_segs(c, position, sbs);
346 	D_PRINT(1, "Writing at most %d segs: wrote %d segs\n", sbs, n);
347 	position += n;
348 	items += n;
349 	if (n >= sbs) {
350 		if (cfg->seg_buff_size) {
351 			shriek(462, "Cannot combine nonzero seg_buff_size with the traditional SSIF");
352 		} else {
353 			d = (segment *)xrealloc(d, (sbs + 1 + position) * sizeof(segment));
354 			c = d + 1 + position;
355 			goto again;
356 		}
357 	} else {
358 		delete root;
359 		inb = NULL;
360 		position = 0;
361 	}
362 	d->code = items;
363 	d->nothing = d->ll = 0;
364 	D_PRINT(1, "agent segs generated %d segments\n", n);
365 	pass(d);
366 }
367 
368 class a_ssif : public agent
369 {
370 	virtual void run();
name()371 	virtual const char *name() { return "to-mbrola"; };
372 	int position;
373    public:
a_ssif()374 	a_ssif() : agent(T_UNITS, T_SSIF) {position = 0;};
375 };
376 
377 void
run()378 a_ssif::run()
379 {
380 	int ssifbs = cfg->ssif_buff_size ? cfg->ssif_buff_size : INIT_SSIF_BS;
381 	char *d = (char *)xmalloc((ssifbs + 1));
382 	char *c = d /* + sizeof header */;
383 	int n;
384 	int items = 0;
385 
386 	unit *root = *(unit **)&inb;
387 	root->project(scfg->_phone_level);
388 again:
389 	n = root->write_ssif(c, position, ssifbs);
390 	position += n;
391 	items += n;
392 	if (cfg->ssif_buff_size && n >= ssifbs) {
393 		d = (char *)xrealloc(d, (ssifbs + 1 + position));
394 		c = d /* + sizeof header */ + strlen(d);		/* FIXME: efficiency */
395 		goto again;
396 	} else {
397 		delete root;
398 		inb = NULL;
399 		position = 0;
400 	}
401 	D_PRINT(1, "agent ssif generated %d items\n", n);
402 	pass(d);
403 }
404 
405 class a_chunk : public agent
406 {
407 	char *bm;
408 	virtual void run();
name()409 	virtual const char *name() { return "chunk"; };
410    public:
a_chunk()411 	a_chunk() : agent(T_TEXT, T_TEXT) { bm = NULL; };
412 };
413 
utt_break(char * t)414 inline char *utt_break(char *t)		/* returns ptr past the last char */
415 {
416 	char *r = t;
417 	do {				/* split between . ? ... and whitespace */
418 		r += strcspn(r, ".?");
419 		r += strspn(r, ".?");
420 	} while (!strchr(WHITESPACE, r[0]));
421 
422 	if (r - t > scfg->max_utterance) {
423 		r = t + strcspn(t, ".,?!:;");
424 		if (*r) r++;
425 	} else return r;
426 
427 	if (r - t > scfg->max_utterance) {
428 		r = t + strcspn(t, ".,?!:;-=+_~@#$%^&*\\|/ \t\n");
429 		if (*r) r++;
430 	} else return r;
431 
432 	if (r - t > scfg->max_utterance) {
433 		r = t + strcspn(t, "()<>{}[]'\"");
434 	} else return r;
435 
436 	if (r - t > scfg->max_utterance) {
437 		if (scfg->split_utterance < (int)strlen(t))
438 			r = t + scfg->split_utterance;
439 	}
440 
441 	return r;
442 }
443 
run()444 void a_chunk::run()
445 {
446 	if (!bm) {
447 		bm = (char *)inb;
448 	}
449 	char *tmp = bm;
450 	bm = utt_break(bm);
451 	D_PRINT(2, "Utterance chunking about to split off %d bytes\n", bm - tmp);
452 	if (bm && *bm) {
453 		char h = bm[0];
454 		bm[0] = 0;
455 		pass(get_text_buffer(tmp));
456 		bm[0] = h;
457 		schedule();
458 	} else {
459 		pass(get_text_buffer(tmp));
460 		free(inb);
461 		inb = NULL;
462 		bm = NULL;
463 	}
464 }
465 
466 class a_join : public agent
467 {
468 	char *heldout;
469 	virtual void run();
name()470 	virtual const char *name() { return "join"; };
471    public:
a_join()472 	a_join() : agent(T_TEXT, T_TEXT) { heldout = NULL; };
473 };
474 
475 void
run()476 a_join::run()
477 {
478 	char *b;
479 	char *last = (char *)inb;
480 	if (heldout) {
481 		b = (char *)xmalloc(strlen(heldout) + strlen(last) + 1);
482 		strcpy(b, heldout);
483 		strcat(b, last);
484 		free(last);
485 		free(heldout); heldout = NULL;
486 	} else b = last;
487 
488 	if (*utt_break(b)) {
489 		char *decodable = get_text_buffer(b);
490 		free(b);
491 		pass(decodable);
492 	} else heldout = b;
493 }
494 
495 class a_synth : public agent
496 {
497 	virtual void run();
name()498 	virtual const char *name() { return "synth"; };
499    protected:
500 	void init_syn();
501    public:
a_synth(DATA_TYPE din,DATA_TYPE dout)502 	a_synth(DATA_TYPE din, DATA_TYPE dout) : agent(din, dout) {};
a_synth()503 	a_synth() : agent(T_SEGS, T_WAVEFM) {};
504 };
505 
fallbackable_error(int code)506 bool fallbackable_error(int code)
507 {
508 	switch (this_lang->fallback_mode) {
509 		case 0: return false;
510 		case 1: return true;
511 		case 4: return code == 445 || code / 10 == 47;
512 		case 7: return code / 10 == 47;
513 		default: return code == this_lang->fallback_mode;
514 	}
515 }
516 
517 void
init_syn()518 a_synth::init_syn()
519 {
520 	try {
521 		this_voice->syn = this_voice->setup_synth();
522 	} catch (command_failed *e) {
523 		if (fallbackable_error(e->code)
524 				&& this_lang->fallback_voice
525 				&& *this_lang->fallback_voice) {
526 			voice_switch(this_lang->fallback_voice);
527 			if (this_lang->permanent_fallbacks) {
528 				c->leave();
529 				voice_switch(this_lang->fallback_voice);
530 				c->enter();
531 			}
532 			delete e;
533 			return;
534 		} else throw e;
535 	}
536 }
537 
538 void
run()539 a_synth::run()
540 {
541 	if (!this_voice) shriek(861, "No current voice");
542 	while (!this_voice->syn) init_syn();	// at most twice
543 	wavefm *wfm = new wavefm(this_voice);
544 	this_voice->syn->synsegs(this_voice, (segment *)inb + 1, ((segment*)inb)->code, wfm);
545 	D_PRINT(1, "a_synth processes %d segments\n", ((segment *)inb)->code);
546 
547 	free(inb);
548 	inb = NULL;
549 	pass(wfm);
550 }
551 
552 class a_syn : public a_synth
553 {
554 	virtual void run();
555    public:
a_syn()556 	a_syn() : a_synth(T_SSIF, T_WAVEFM) {};
557 };
558 
559 void
run()560 a_syn::run()
561 {
562 	if (!this_voice) shriek(861, "No current voice");
563 	while (!this_voice->syn) init_syn();	// at most twice
564 	wavefm *wfm = new wavefm(this_voice);
565 	this_voice->syn->synssif(this_voice, (char *)inb, wfm);
566 //	D_PRINT(1, "a_synth processes %d segments\n", ((segment *)inb)->code);
567 
568 	free(inb);
569 	inb = NULL;
570 	pass(wfm);
571 }
572 
573 template <DATA_TYPE TYPE> class a_type : public agent
574 {
575 	virtual void run();
name()576 	virtual const char *name() { return "type spec"; };
577    public:
a_type()578 	a_type() : agent(TYPE, TYPE) {};
579 };
580 
581 template <DATA_TYPE TYPE> void
run()582 a_type<TYPE>::run()
583 {
584 	void *outb = inb;
585 	inb = NULL;
586 	pass(outb);
587 }
588 
589 class a_io : public agent
590 {
591 	virtual void run() = 0;
592    protected:
593 	int socket;
594 	a_ttscp *dc;
595 	bool close_upon_exit;
596    public:
597 	a_io(const char *, DATA_TYPE, DATA_TYPE);
598 	virtual ~a_io();
599 };
600 
601 
602 #define LOCALSOUNDAGENT "localsound"
603 
604 extern int localsound;
605 
special_io(const char * name,DATA_TYPE intype)606 socky int special_io(const char *name, DATA_TYPE intype)
607 {
608 	if (intype == T_INPUT || strcmp(name, LOCALSOUNDAGENT))
609 		shriek(415, "Bad stream component %s", name);
610 	if (!cfg->localsound) shriek(453, "Not allowed to use localsound");
611 
612 	if (localsound != -1) return localsound;
613 	int r = open(scfg->local_sound_device, O_WRONLY | O_NONBLOCK);
614 	if (r == -1) shriek(462, "Could not open localsound device, error %d", errno);
615 	localsound = r;
616 	return r;
617 }
618 
619 void stretch_sleep_tables(socky int);
620 
a_io(const char * par,DATA_TYPE in,DATA_TYPE out)621 a_io::a_io(const char *par, DATA_TYPE in, DATA_TYPE out) : agent(in, out)
622 {
623 	char *filename;
624 
625 	close_upon_exit = false;
626 	dc = NULL;
627 
628 	switch(*par) {
629 		case '$': dc = data_conns->translate(par + 1);
630 			  if (!dc) shriek(444, "Not a known data connection handle");
631 			  else socket = (in == T_INPUT ? dc->c->config->get__sd_in() : dc->c->config->get__sd_out());
632 			  break;
633 		case '/': if (in == T_INPUT && !cfg->readfs)
634 				shriek(454, "No filesystem inputs allowed");
635 			  if (out == T_NONE && !cfg->writefs)
636 			  	shriek(454, "No filesystem outputs allowed");
637 			  filename = limit_pathname(par, cfg->pseudo_root_dir);
638 			  socket = open(filename, in == T_INPUT ? O_RDONLY | O_NONBLOCK | O_BINARY
639 						: O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK | O_BINARY, MODE_MASK);
640 			  free(filename);
641 			  if (socket == -1) shriek(445, "Cannot open file %s", par);
642 			  else close_upon_exit = true;
643 			  break;
644 		case '#': socket = special_io(par + 1, in);
645 			  break;
646 		default:  shriek(462, "unimplemented i/o agent class");
647 			/* if ever adding classes, take care of closing/nonclosing
648 			 * the socket upon exit        */
649 	}
650 	if (socket >= 0) stretch_sleep_tables(socket);
651 	D_PRINT(0, "I/O agent is %p\n", this);
652 }
653 
~a_io()654 a_io::~a_io()
655 {
656 	D_PRINT(0, "~a_io\n");
657 	if (close_upon_exit) close_and_invalidate(socket);
658 }
659 
660 class a_input : public a_io
661 {
662 	int toread;
663 	int offset;
664 
665 	virtual void run();
name()666 	virtual const char *name() { return "input"; };
667    protected:
668 	virtual bool mktask(int size);
669    public:
670 	a_input(const char *);
671 };
672 
a_input(const char * par)673 a_input::a_input(const char *par) : a_io(par, T_INPUT, T_TEXT)
674 {
675 }
676 
677 
run()678 void a_input::run()
679 {
680 	int res;
681 	D_PRINT(0, "Entering input agent\n");
682 	if (block_table[socket]) {
683 		D_PRINT(2, "avoiding a nested input\n");
684 		block(socket);
685 		return;
686 	}
687 	res = yread(socket, (char *)inb + offset, toread - offset);
688 	if (res == -1 && errno == EAGAIN) {
689 		D_PRINT(2, "avoiding an EAGAIN on input\n");
690 		block(socket);
691 		return;
692 	}
693 	if (res <= 0) {
694 		if (!dc) {
695 			if (res == 0) shriek(438, "end of file");
696 			else shriek(437, "read error");
697 		}
698 		if (dc->ctrl) dc->ctrl->deps->remove(dc->handle);
699 		c->leave();
700 		delete data_conns->remove(dc->handle);
701 		c->enter();
702 		shriek(436, "data conn %d lost reading", socket);
703 	}
704 	offset += res;
705 	if (offset == toread) {
706 		void *dta = inb;
707 		((char *)dta)[offset] = 0;
708 		switch (out) {
709 			case T_SEGS:
710 				if ((((segment *)inb)->code + 1) * (int)sizeof(segment) != offset)
711 					shriek(432, "Received bad segments: %d segs, %d bytes",
712 						((segment *)inb)->code, offset);
713 				break;
714 			case T_WAVEFM:
715 				wavefm *w;
716 				w = new wavefm(this_voice);
717 				w->become(inb, offset);
718 				free(inb);
719 				inb = NULL; toread = offset = 0;
720 				pass(w);
721 				return;
722 			case T_TEXT:
723 				encode_string((char *)inb, this_lang->charset, false);
724 			default: ;	/* otherwise no problem */
725 		}
726 		D_PRINT(2, "Read and about to process %s\n", (char *)dta);
727 		inb = NULL;
728 		toread = 0;	// superfluous
729 		offset = 0;	// superfluous
730 		pass(dta);
731 	} else block(socket);
732 }
733 
734 bool
mktask(int size)735 a_input::mktask(int size)
736 {
737 	D_PRINT(2, "%d bytes to be read\n", size);
738 	if (inb) return false;	// busy
739 	toread = size;
740 	D_PRINT(0, "Alloc in a_input:\n");
741 	inb = get_text_buffer(size);
742 	offset = 0;
743 	block(socket);
744 	D_PRINT(1, "Apply task has been scheduled\n");
745 	return true;
746 }
747 
748 
749 class a_output : public a_io
750 {
751 	virtual int insize() = 0;
decode()752 	virtual void decode() {};
753 	virtual void run();
name()754 	virtual const char *name() { return "output"; };
foreground()755 	bool foreground() {return ((stream *)next)->foreground(); };
756 	int written;
757 	bool decoded;
758    protected:
759 	void report(bool total, int written);
760    public:
a_output(const char * par,DATA_TYPE i)761 	a_output(const char *par, DATA_TYPE i) : a_io(par, i, T_NONE) {written = 0; decoded = false;};
762 };
763 
764 void
run()765 a_output::run()
766 {
767 	if (push_table[socket]) {
768 		push(socket);
769 		return;
770 	}
771 	int size = insize();
772 	if (!decoded) {
773 		decode();
774 		decoded = true;
775 	}
776 
777 	if (written) {
778 		int now_written = ywrite(socket, (char *)inb + written, size - written);
779 		if (now_written == -1) {
780 			if (errno == EAGAIN) {
781 				push(socket);
782 				return;
783 			}
784 			shriek(436, "data connection %d lost during writing", socket);
785 		}
786 		report(false, now_written);
787 		written += now_written;
788 	} else {
789 		written = ywrite(socket, (char *)inb, size);
790 		if (written) {
791 			if (written == -1) {
792 				if (errno == EAGAIN) {
793 					push(socket);
794 					return;
795 				}
796 				shriek(436, "data connection %d lost before writing", socket);
797 			}
798 			report(true, size);
799 			report(false, written);
800 		}
801 	}
802 	if (written == size) {
803 		written = 0;
804 		decoded = false;
805 		relax();
806 		finis(false);
807 	} else push(socket);
808 }
809 
810 void
report(bool total,int written)811 a_output::report(bool total, int written)
812 {
813 //	inb = NULL;
814 	if (foreground()) {
815 		reply(total ? "122 total bytes" : "123 written bytes");
816 		sprintf(scratch, " %d", written);
817 		sputs(scratch, cfg->get__sd_out());
818 		sputs("\r\n", cfg->get__sd_out());
819 	}
820 }
821 
822 
823 template <DATA_TYPE type> class oa_ascii : public a_output
824 {
insize()825 	virtual int insize() {
826 		return strlen((char *)inb);
827 	}
decode()828 	virtual void decode() {
829 		decode_string((char *)inb, this_lang->charset);
830 	}
831    public:
oa_ascii(const char * s)832 	oa_ascii(const char *s): a_output(s, type) {};
833 };
834 
835 //class oa_stml : public a_output
836 //{
837 //	virtual int insize() {
838 //		return strlen((char *)inb);
839 //	}
840 //   public:
841 //	oa_stml(const char *s): a_output(s, T_STML) {};
842 //};
843 
844 class oa_seg : public a_output
845 {
insize()846 	virtual int insize() {
847 		D_PRINT(1, "Sending %d segments\n", ((segment *)inb)->code);
848 		return ((segment *)inb)->code * sizeof(segment);
849 	}
850    public:
oa_seg(const char * s)851 	oa_seg(const char *s): a_output(s, T_SEGS) {};
852 };
853 
854 class oa_wavefm : public a_output
855 {
insize()856 	virtual int insize() {
857 		shriek(462, "abstract oa_wavefm::insize"); return 0;
858 	}
859 	virtual void run();
860 	virtual bool brk();
861 	bool attached;
862    public:
oa_wavefm(const char * s)863 	oa_wavefm(const char *s): a_output(s, T_WAVEFM) {attached = false;};
864 };
865 
866 void
run()867 oa_wavefm::run()
868 {
869 	wavefm *w = (wavefm *)inb;
870 
871 	if (!attached && !push_table[socket]) {
872 		w->attach(socket);
873 		report(false, w->written);
874 		attached = true;
875 	}
876 	bool to_do;
877 	while ((to_do = w->flush()) && w->written > 0) {
878 		report(false, w->written);
879 	}
880 
881 	if (to_do && w->written >= 0) push(socket);
882 	else {
883 		if (w->written == -1) {
884 			shriek(436, "data conn %d lost writing", socket);
885 		}
886 		if (w->written > 0) report(false, w->written);
887 		report(true, w->written_bytes());
888 		w->detach(socket);
889 		D_PRINT(1, "oa_wavefm wrote %d bytes\n", w->written_bytes());
890 		attached = false;
891 		delete w;
892 		inb = NULL;
893 		finis(false);
894 	}
895 }
896 
897 bool
brk()898 oa_wavefm::brk()
899 {
900 	if (inb) {
901 
902 		wavefm *w = (wavefm *)inb;
903 
904 		report(true, w->written_bytes());
905 		w->brk();
906 		if (attached) w->detach(socket);
907 		D_PRINT(1, "oa_wavefm wrote %d bytes\n", w->written_bytes());
908 		attached = false;
909 		relax();
910 	}
911 	finis(true);
912 	return true;
913 }
914 
915 
916 /*
917  *	A stream is a linked list of agents, one of them being the
918  *	stream agent itself. stream->head is an input agent.
919  */
920 
921 enum agent_type {AT_UNKNOWN, AT_CHUNK, AT_JOIN, AT_ASCII, AT_SSIF, AT_SEGS,
922 			AT_PRINT, AT_RULES, AT_STML, AT_SYN, AT_TRAD_SYNTH,
923 			AT_T_TEXT, AT_T_STML, AT_T_UNITS, AT_T_SSIF, AT_T_SEGS, AT_T_WAVEFM};
924 const char *agent_type_str = ":chunk:join:raw:dump:diphs:print:rules:stml:syn:synth:[t]:[s]:[i]:[p]:[d]:[w]:";
925 
make_agent(char * s,agent * preceding)926 agent *make_agent(char *s, agent *preceding)
927 {
928 	if (strchr("@#/.$", *s)) {
929 		if (!preceding) return new a_input(s);
930 		switch (preceding->out) {
931 			case T_TEXT:   return new oa_ascii<T_TEXT>(s);
932 			case T_STML:   return new oa_ascii<T_STML>(s);
933 			case T_UNITS: shriek(448, "Units are hard to output");
934 			case T_SSIF:   return new oa_ascii<T_SSIF>(s);
935 			case T_SEGS:  return new oa_seg(s);
936 			case T_WAVEFM: return new oa_wavefm(s);
937 			default: shriek(462, "unimplmd oa");
938 		}
939 	}
940 	switch ((agent_type)str2enum(s, agent_type_str, AT_UNKNOWN))
941 	{
942 		case AT_UNKNOWN: shriek(861, "Agent type bug.");
943 		case AT_ASCII: return new a_ascii;
944 		case AT_CHUNK: return new a_chunk;
945 		case AT_SSIF:  return new a_ssif;
946 		case AT_SEGS:  return new a_segs;
947 		case AT_JOIN:  return new a_join;
948 		case AT_PRINT: return new a_print;
949 		case AT_RULES: return new a_rules;
950 		case AT_STML:  return new a_stml;
951 		case AT_SYN:   return new a_syn;
952 		case AT_TRAD_SYNTH: return new a_synth;
953 
954 		case AT_T_TEXT:  return new a_type<T_TEXT>;
955 		case AT_T_STML:  return new a_type<T_STML>;
956 		case AT_T_UNITS: return new a_type<T_UNITS>;
957 		case AT_T_SSIF:  return new a_type<T_SSIF>;
958 		case AT_T_SEGS:  return new a_type<T_SEGS>;
959 		case AT_T_WAVEFM:return new a_type<T_WAVEFM>;
960 
961 		default:       shriek(415, "Unknown agent type %s", s); return NULL;
962 	}
963 }
964 
965 
stream(char * s,context * pc)966 stream::stream(char *s, context *pc) : agent(T_NONE, T_NONE)
967 {
968 	char *tmp;
969 	agent *a;
970 	agent *l = head = NULL;
971 
972 	callbk = NULL;
973 	c = pc;
974 
975 	tmp = strchr(s, LIST_DELIM);
976 	if (!tmp) shriek(415, "Bad stream syntax");
977 
978 	do {
979 		*tmp = 0;
980 		D_PRINT(1, "Making agent out of %s\n", s);
981 		try {
982 			a = make_agent(s, NULL); a->c = c;
983 		} catch (command_failed *e) {
984 			release_agents();
985 			throw e;
986 		}
987 		*tmp = LIST_DELIM;
988 		s = ++tmp;
989 		if (!l) head = a;
990 		else l->next = a;
991 		a->prev = l;
992 		l = a;
993 	} while((tmp = strchr(s, LIST_DELIM)));
994 	a = make_agent(s, a); a->c = c;
995 	l->next = a;
996 	a->prev = l;
997 	a->next = this;
998 	if (head->next != this) head->out = head->next->in;	/* adjust a_input type */
999 }
1000 
~stream()1001 stream::~stream()
1002 {
1003 	release_agents();
1004 //	delete c;
1005 }
1006 
1007 void
release_agents()1008 stream::release_agents()
1009 {
1010 	for (agent *a = head; a && a != this; ) {
1011 		agent *b = a;
1012 		a = a->next;
1013 		delete b;
1014 	}
1015 }
1016 
1017 void
apply(agent * ref,int bytes)1018 stream::apply(agent *ref, int bytes)
1019 {
1020 	D_PRINT(2, "In stream::apply %p %p %d\n", head, ref, bytes);
1021 	callbk = ref;
1022 	head->mktask(bytes);
1023 }
1024 
1025 bool
brk()1026 stream::brk()
1027 {
1028 	if (!callbk) return false;	/* break only if running */
1029 	for (agent *a = head; a && a != this; a = a->next)
1030 		a->brk();
1031 	reply("401 interrupted");
1032 	return true;
1033 }
1034 
1035 void
run()1036 stream::run()
1037 {
1038 	shriek(861, "scheduled a stream");
1039 }
1040 
1041 void
finis(bool err)1042 stream::finis(bool err)		// FIXME: simplify
1043 {
1044 	if (err) reply("191 finis recovery");
1045 	D_PRINT(2, "submitted a subtask\n");
1046 	for (agent *a = head; a != this; a = a->next) {
1047 		if (a->inb || a->pendin) {
1048 			D_PRINT(2, "more subtasks are pending\n");
1049 			if (err) {
1050 				a->relax();
1051 				D_PRINT(2, "subtasks discarded\n");
1052 				if  (callbk) callbk->schedule();
1053 				else shriek(862, "double fault - no callback");
1054 				callbk = NULL;
1055 				return;
1056 			}
1057 			return;
1058 		}
1059 	}
1060 	D_PRINT(2, "this has been the last subtask\n");
1061 	if (!err) reply("200 output OK");
1062 	if (callbk) callbk->schedule();
1063 	else shriek(862, "no callback");
1064 	callbk = NULL;
1065 }
1066 
1067 class a_disconnector : public agent
1068 {
1069 	virtual void run();
name()1070 	virtual const char *name() { return "disconnector"; };
1071 	a_protocol **to_delete;
1072 	int last;
1073 	int max;
1074    public:
1075 	void disconnect(a_protocol *);
1076 	a_disconnector();
1077 	virtual ~a_disconnector();
1078 };
1079 
a_disconnector()1080 a_disconnector::a_disconnector() : agent(T_NONE, T_NONE)
1081 {
1082 	to_delete = (a_protocol **)xmalloc(sizeof(void *));
1083 	last = 0;
1084 	max = 1;
1085 }
1086 
~a_disconnector()1087 a_disconnector::~a_disconnector()
1088 {
1089 	if (last) shriek(861, "Forgot to disconnect a protocol agent!");
1090 	free(to_delete);
1091 }
1092 
run()1093 void a_disconnector::run()
1094 {
1095 	if (!last) shriek(861, "Spurious disconnect");
1096 	delete to_delete[--last];
1097 	to_delete[last] = NULL;
1098 	D_PRINT(0, "Disconnect finished, last is %d\n", last);
1099 }
1100 
disconnect(a_protocol * moriturus)1101 void a_disconnector::disconnect(a_protocol *moriturus)
1102 {
1103 	if (last == max && ! (max & max - 1)) {
1104 		max <<= 1;
1105 		to_delete = (a_protocol **)xrealloc(to_delete, max * sizeof(void *));
1106 	}
1107 	to_delete[last++] = moriturus;
1108 	D_PRINT(0, "Disconnect requested, last is %d\n", last);
1109 	schedule();
1110 }
1111 
1112 a_disconnector disconnector;
1113 
a_protocol()1114 a_protocol::a_protocol() : agent(T_NONE, T_NONE)
1115 {
1116 	sgets_buff = get_text_cmd_buffer();
1117 	*sgets_buff = 0;
1118 	buffer = get_text_cmd_buffer();
1119 }
1120 
~a_protocol()1121 a_protocol::~a_protocol()
1122 {
1123 	free(sgets_buff);
1124 	free(buffer);
1125 }
1126 
run()1127 void a_protocol::run()
1128 {
1129 	int res;
1130 	res = sgets(buffer, cfg->max_net_cmd, cfg->get__sd_in(), sgets_buff);
1131 	if (res < 0) {
1132 		disconnect();
1133 		return;
1134 	}
1135 
1136 	encode_string(buffer, this_lang->charset, false);	// FIXME (alloc->true)
1137 
1138 	if ((int)strlen(buffer) >= cfg->max_net_cmd)
1139 		shriek(413, "Received command is too long");
1140 	if (res > 0 && *buffer) switch (run_command(buffer)) {
1141 		case PA_NEXT:
1142 			D_PRINT(0, "PA_NEXT\n");
1143 			if (strchr(sgets_buff, '\n')) schedule();
1144 			else block(cfg->get__sd_in());
1145 			return;
1146 		case PA_DONE:
1147 			D_PRINT(0, "PA_DONE\n");
1148 			disconnect();
1149 			return;
1150 		case PA_WAIT:
1151 			D_PRINT(0, "PA_WAIT\n");
1152 			return;
1153 		default:
1154 			shriek(861, "Bad protocol action\n");
1155 	}
1156 	block(cfg->get__sd_in());		/* partial line read */
1157 
1158 //	leave_context(i);
1159 
1160 //	non-blocking get_line etc.
1161 }
1162 
a_ttscp(socky int _sd_in,socky int _sd_out)1163 a_ttscp::a_ttscp(socky int _sd_in, socky int _sd_out) : a_protocol()
1164 {
1165 	c = new context(_sd_in, _sd_out);
1166 	c->enter();
1167 
1168 	handle = (char *)malloc(cfg->handle_size + 1);
1169 	do make_rnd_passwd(handle, cfg->handle_size);
1170 		while (data_conns->translate(handle));
1171 	ctrl_conns->add(handle, this);
1172 
1173 	sputs(
1174 		"TTSCP spoken here\r\n"
1175 		"protocol: 0\r\n"
1176 		"extensions:\r\n"
1177 		"server: Epos\r\n"
1178 		"release: " VERSION "\r\n"
1179 		"handle: ", cfg->get__sd_out());
1180 	sputs(		handle, cfg->get__sd_out());
1181 	sputs(	"\r\n", cfg->get__sd_out());
1182 	ctrl = NULL;
1183 	deps = new hash_table<char, a_ttscp>(4);
1184 	deps->dupdata = deps->dupkey = false;
1185 	c->leave();
1186 	block(_sd_in);
1187 }
1188 
1189 /*
1190  *	Warning: the following destructor runs in the master context;
1191  *	therefore, be careful with using cfg etc.
1192  */
1193 
~a_ttscp()1194 a_ttscp::~a_ttscp()
1195 {
1196 	c->enter();
1197 	if (cfg->current_stream) delete cfg->current_stream;
1198 	cfg->current_stream = NULL;
1199 	D_PRINT(2, "deleted context closes fd %d and %d\n", cfg->get__sd_in(), cfg->get__sd_out());
1200 	c->leave();
1201 	while (deps->items) {
1202 		a_ttscp *tmp = deps->translate(deps->get_random());
1203 		deps->remove(tmp->handle);
1204 		delete data_conns->remove(tmp->handle);
1205 	}
1206 	delete deps;
1207 	c->enter();
1208 	if (cfg->get__sd_in() != -1)
1209 		close_and_invalidate(cfg->get__sd_in());
1210 	if (cfg->get__sd_out() != -1 && cfg->get__sd_out() != cfg->get__sd_in())
1211 		close_and_invalidate(cfg->get__sd_out());
1212 	if (data_conns->translate(handle) || ctrl_conns->translate(handle))
1213 		shriek(862, "Forgot to forget a_ttscp");
1214 
1215 	free(handle);
1216 	c->leave();
1217 	delete c;
1218 	// close the descriptor? no, the ~context does that
1219 }
1220 
1221 bool
brk()1222 a_ttscp::brk()
1223 {
1224 	if (c->config->current_stream)
1225 		return c->config->current_stream->brk();
1226 	return false;
1227 }
1228 
1229 int
run_command(char * cmd)1230 a_ttscp::run_command(char *cmd)
1231 {
1232 	char *keyword;
1233 	char *param;
1234 
1235 	D_PRINT(2, "[ cmd] %s\n", cmd);
1236 
1237 	keyword = cmd + strspn (cmd, WHITESPACE);
1238 	param = keyword + strcspn(keyword, WHITESPACE);
1239 	if (param - keyword != 4) goto bad;	/* all cmds are 4 chars */
1240 
1241 	if (!*param) param = NULL;
1242 	else *param++ = 0;
1243 
1244 	if (param) param += strspn(param, WHITESPACE);
1245 
1246 	int i;
1247 	for (i=0; ttscp_cmd_set[i].name &&
1248 		(*(const int *)keyword != *(const int *)&ttscp_cmd_set[i].name);)
1249 			i++;
1250 	if (!ttscp_cmd_set[i].name) goto bad;
1251 
1252 	if (!param && ttscp_cmd_set[i].param == PAR_REQ) {
1253 		reply("417 parameter missing");
1254 		return PA_NEXT;
1255 	}
1256 	if (param && ttscp_cmd_set[i].param == PAR_FORBIDDEN) {
1257 		reply("416 parameter not allowed");
1258 		return PA_NEXT;
1259 	}
1260 
1261 	try {
1262 		return ttscp_cmd_set[i].impl(param, this);
1263 	} catch (command_failed *e) {
1264 		D_PRINT(2, "Command failed, %d, %.60s\n", e->code, e->msg);
1265 		reply(e->code, e->msg);
1266 		delete e;
1267 		return PA_NEXT;
1268 	} catch (connection_lost *d) {
1269 		D_PRINT(2, "Releasing a TTSCP control connection, %d, %.60s\n", d->code, d->msg);
1270 		reply(d->code, d->msg);		/* just in case */
1271 		reply(201, fmt("debug %d", cfg->get__sd_in()));
1272 		delete d;
1273 		return PA_DONE;
1274 	}
1275 
1276    bad:
1277 	cmd_bad(cmd);
1278 	return PA_NEXT;
1279 }
1280 
1281 void
disconnect()1282 a_ttscp::disconnect()
1283 {
1284 	D_PRINT(2, "ctrl conn %d lost\n", cfg->get__sd_in());
1285 	if (this != ctrl_conns->remove(handle) /* && this != data_conns->remove(handle) */ )
1286 		shriek(862, "Failed to disconnect a ctrl connection");
1287 	disconnector.disconnect(this);
1288 }
1289 
make_nonblocking(int f)1290 void make_nonblocking(int f)
1291 {
1292 #ifdef HAVE_WINSOCK
1293 	ioctlsocket((unsigned long int)f, FIONBIO, (unsigned long int *)&make_nonblocking);	// &make_nonblocking is a dummy non-NULL pointer
1294 #else
1295 	fcntl(f, F_SETFL, O_NONBLOCK);
1296 #endif
1297 }
1298 
1299 #ifndef HAVE_GETHOSTNAME
gethostname(char * b,size_t)1300 int gethostname(char *b, size_t)
1301 {
1302 	strcpy(b, "localhost");
1303 	return 0;
1304 }
1305 #endif
1306 
a_accept()1307 a_accept::a_accept() : agent(T_NONE, T_NONE)
1308 {
1309 	static sockaddr_in sa;
1310 
1311 	char one = 1;
1312 
1313 //	c = new context(-1, /*** dark errors ***/ DARK_ERRLOG);	//FIXME
1314 //	c->enter();
1315 
1316 	listening = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1317 	memset(&sa, 0, sizeof(sa));
1318 	gethostname(scratch, scfg->scratch_size - 1);
1319 	sa.sin_family = AF_INET;
1320 	sa.sin_addr.s_addr = htonl(scfg->local_only ? INADDR_LOOPBACK : INADDR_ANY);
1321 	sa.sin_port = htons(scfg->listen_port);
1322 	setsockopt(listening, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int));
1323 	D_PRINT(3, "* Binding to the TTSCP port %d.\n", scfg->listen_port);
1324 	if (bind(listening, (sockaddr *)&sa, sizeof (sa))) shriek(871, "Could not bind");
1325 	if (listen(listening, 64)) shriek(871, "Could not listen");
1326 	make_nonblocking(listening);
1327 
1328 	ia.sin_family = AF_INET;
1329 	ia.sin_addr.s_addr = htonl(INADDR_ANY);
1330 	ia.sin_port = 0;
1331 
1332 	block(listening);
1333 //	c->leave();
1334 }
1335 
~a_accept()1336 a_accept::~a_accept()
1337 {
1338 	close (listening);
1339 //	close (c->config->_sd_in);
1340 //	delete c;
1341 }
1342 
1343 void
run()1344 a_accept::run()
1345 {
1346 	static socklen_t sia = sizeof(sockaddr);	// Will __QNX__ complain?
1347 	int f = accept(listening, (sockaddr *)&ia, &sia);
1348 	if (f == -1) {
1349 //		shriek(871, "Cannot accept() - network problem (errno %d)", errno);
1350 		D_PRINT(3, "Cannot accept() - errno %d! Madly looping.\n", errno);
1351 		if (errno != EAGAIN) schedule();
1352 		return;
1353 	}
1354 	make_nonblocking(f);
1355 	D_PRINT(2, "Accepted %d (on %d).\n", f, listening);
1356 //	c->leave();
1357 	unuse(new a_ttscp(f, f));
1358 //	c->enter();
1359 	block(listening);
1360 }
1361 
1362 struct sched_aq
1363 {
1364 	agent *ag;
1365 	sched_aq *next;
1366 	sched_aq *prev;
1367 
1368 	void *operator new(size_t size);
1369 	void operator delete(void *ptr);
1370 };
1371 
1372 SLABIFY(sched_aq, sched_aq_slab, 341, shutdown_sched_aq);
1373 
1374 sched_aq *sched_head = NULL;
1375 sched_aq *sched_tail = NULL;
1376 
1377 int runnable_agents = 0;
1378 
1379 void
schedule()1380 agent::schedule()
1381 {
1382 	if (!this) shriek(862, "scheduling garbage");
1383 	runnable_agents++;
1384 	D_PRINT(1, "%d runnable agents\n", runnable_agents);
1385 	sched_aq *tmp = new sched_aq;
1386 	tmp->ag = this;
1387 	tmp->prev = NULL;
1388 	tmp->next = sched_head;
1389 	if (sched_head) sched_head->prev = tmp;
1390 	else sched_tail = tmp;
1391 	sched_head = tmp;
1392 }
1393 
sched_sel()1394 agent *sched_sel()
1395 {
1396 	agent *r;
1397 	sched_aq *tmp;
1398 	if (!sched_tail) shriek(862, "agent queue empty");
1399 	if (!sched_tail->prev) sched_head = NULL;
1400 	else sched_tail->prev->next = NULL;
1401 	r = sched_tail->ag;
1402 	tmp = sched_tail;
1403 	sched_tail = sched_tail->prev;
1404 	delete tmp;
1405 	runnable_agents--;
1406 	D_PRINT(1, "Agent %s\n", r->name());
1407 	return r;
1408 }
1409 
shutdown_agent_queue()1410 void shutdown_agent_queue()
1411 {
1412 	for (sched_aq *tmp = sched_head; tmp; tmp = tmp->next)
1413 		delete tmp;
1414 }
1415 
1416 agent **block_table = (agent **)xmalloc(1);
1417 agent **push_table = (agent **)xmalloc(1);
1418 fd_set block_set;
1419 fd_set push_set;
1420 socky int select_fd_max = 0;
1421 
stretch_sleep_tables(socky int fd)1422 void stretch_sleep_tables(socky int fd)
1423 {
1424 	if (select_fd_max <= fd) {
1425 		block_table = (agent **)xrealloc(block_table, (fd + 1) * sizeof(agent *));
1426 		push_table = (agent **)xrealloc(push_table, (fd + 1) * sizeof(agent *));
1427 		for ( ; select_fd_max <= fd; select_fd_max++) {
1428 			block_table[select_fd_max] = NULL;
1429 			push_table[select_fd_max] = NULL;
1430 		}
1431 	}
1432 }
1433 
1434 /*
1435  *	agent::run() should return after calling block() or push()
1436  */
1437 
1438 void
block(socky int fd)1439 agent::block(socky int fd)
1440 {
1441 	D_PRINT(1, "Sleeping on %d\n", fd);
1442 	stretch_sleep_tables(fd);
1443 	if (block_table[fd]) {
1444 		agent *a;
1445 
1446 		if (this == block_table[fd])
1447 			shriek(861, "Resleeping on %d", fd);
1448 		if (!FD_ISSET(fd, &block_set))
1449 			shriek(861, "Countersleeping on %d", fd);
1450 		for (a = block_table[fd]; a->dep; a = a->dep) ;
1451 		a->dep = this;
1452 	} else {
1453 		block_table[fd] = this;
1454 		FD_SET(fd, &block_set);
1455 	}
1456 	return;
1457 }
1458 
1459 void
push(socky int fd)1460 agent::push(socky int fd)
1461 {
1462 	D_PRINT(1, "Pushing on %d\n", fd);
1463 	stretch_sleep_tables(fd);
1464 	if (push_table[fd]) {
1465 		agent *a;
1466 
1467 		if (this == push_table[fd])
1468 			shriek(861, "Resleeping on %d", fd);
1469 		if (!FD_ISSET(fd, &push_set))
1470 			shriek(861, "Countersleeping on %d", fd);
1471 		for (a = push_table[fd]; a->dep; a = a->dep) ;
1472 		a->dep = this;
1473 	} else {
1474 		push_table[fd] = this;
1475 		FD_SET(fd, &push_set);
1476 	}
1477 	return;
1478 }
1479 
1480