1 /* Part of XPCE --- The SWI-Prolog GUI toolkit
2
3 Author: Jan Wielemaker and Anjo Anjewierden
4 E-mail: jan@swi.psy.uva.nl
5 WWW: http://www.swi.psy.uva.nl/projects/xpce/
6 Copyright (c) 1985-2002, University of Amsterdam
7 All rights reserved.
8
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions
11 are met:
12
13 1. Redistributions of source code must retain the above copyright
14 notice, this list of conditions and the following disclaimer.
15
16 2. Redistributions in binary form must reproduce the above copyright
17 notice, this list of conditions and the following disclaimer in
18 the documentation and/or other materials provided with the
19 distribution.
20
21 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 POSSIBILITY OF SUCH DAMAGE.
33 */
34
35 #include <md.h> /* get HAVE_'s */
36
37 #if defined(HAVE_SOCKET) || defined(HAVE_WINSOCK) || defined(HAVE_FORK)
38
39 #ifdef HAVE_WINSOCK
40 #include "mswinsock.h"
41 #define StreamError() SockError()
42 #else
43 #define StreamError() OsError()
44 #endif
45
46 #include <h/kernel.h>
47
48 #include <h/unix.h>
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52 #ifdef HAVE_SYS_TIME_H
53 #include <sys/time.h>
54 #endif
55
56 static status recordSeparatorStream(Stream s, Any sep);
57
58 #define OsError() getOsErrorPce(PCE)
59
60 status
initialiseStream(Stream s,Int rfd,Int wfd,Code input,Any sep)61 initialiseStream(Stream s, Int rfd, Int wfd, Code input, Any sep)
62 { s->rdfd = s->wrfd = -1;
63 s->ws_ref = 0;
64 s->input_buffer = NULL;
65 s->input_allocated = s->input_p = 0;
66
67 if ( isDefault(rfd) ) rfd = NIL;
68 if ( isDefault(wfd) ) wfd = NIL;
69 if ( isDefault(input) ) input = NIL;
70 if ( isDefault(sep) ) sep = newObject(ClassRegex, CtoName("\n"), EAV);
71
72 if ( notNil(rfd) ) s->rdfd = valInt(rfd);
73 if ( notNil(wfd) ) s->wrfd = valInt(wfd);
74
75 assign(s, input_message, input);
76 recordSeparatorStream(s, sep);
77
78 succeed;
79 }
80
81
82 static status
unlinkStream(Stream s)83 unlinkStream(Stream s)
84 { return closeStream(s);
85 }
86
87 /*******************************
88 * OPEN/CLOSE *
89 *******************************/
90
91
92 status
closeStream(Stream s)93 closeStream(Stream s)
94 { closeOutputStream(s);
95 closeInputStream(s);
96
97 ws_close_stream(s);
98
99 succeed;
100 }
101
102
103 status
closeInputStream(Stream s)104 closeInputStream(Stream s)
105 { if ( s->rdfd >= 0 )
106 { DEBUG(NAME_stream, Cprintf("%s: Closing input\n", pp(s)));
107
108 ws_close_input_stream(s);
109 s->rdfd = -1;
110
111 if ( s->input_buffer )
112 { pceFree(s->input_buffer);
113 s->input_buffer = NULL;
114 }
115 }
116
117 succeed;
118 }
119
120
121 status
closeOutputStream(Stream s)122 closeOutputStream(Stream s)
123 { if ( s->wrfd >= 0 )
124 { int input_too = (s->wrfd == s->rdfd);
125
126 DEBUG(NAME_stream, Cprintf("%s: Closing output\n", pp(s)));
127
128 ws_close_output_stream(s);
129 s->wrfd = -1;
130 if ( input_too )
131 closeInputStream(s);
132 }
133
134 succeed;
135 }
136
137
138 status
inputStream(Stream s,Int fd)139 inputStream(Stream s, Int fd)
140 { if ( notDefault(fd) )
141 { if ( isNil(fd) )
142 closeInputStream(s);
143 else
144 s->rdfd = valInt(fd); /* Unix only! */
145 }
146
147 /*if ( notNil(s->input_message) )*/
148 ws_input_stream(s);
149
150 succeed;
151 }
152
153
154 /*******************************
155 * HANDLE INPUT *
156 *******************************/
157
158
159 #define BLOCKSIZE 1024
160 #define ALLOCSIZE 1024
161
162 #define Round(n, r) (((n) + (r) - 1) & ~((r)-1))
163
164 void
add_data_stream(Stream s,char * data,int len)165 add_data_stream(Stream s, char *data, int len)
166 { char *q;
167
168 if ( !s->input_buffer )
169 { s->input_allocated = Round(len+1, ALLOCSIZE);
170 s->input_buffer = pceMalloc(s->input_allocated);
171 s->input_p = 0;
172 } else if ( s->input_p + len >= s->input_allocated )
173 { s->input_allocated = Round(s->input_p + len + 1, ALLOCSIZE);
174 s->input_buffer = pceRealloc(s->input_buffer, s->input_allocated);
175 }
176
177 q = (char *)&s->input_buffer[s->input_p];
178 memcpy(q, data, len);
179 s->input_p += len;
180 }
181
182
/* Debug helper: print one byte in readable form.  Values below 32,
   values in 127..159 and 255 are escaped: tab, carriage return, newline
   and backspace as \t, \r, \n and \b, anything else as <decimal>.  All
   other values are written as-is. */

static void
write_byte(int byte)
{ char numeric[10];
  const char *text;
  int escape = ( byte < 32 ||
		 (byte >= 127 && byte < 128+32) ||
		 byte == 255 );

  if ( !escape )
  { Cputchar(byte);
    return;
  }

  if ( byte == '\t' )
    text = "\\t";
  else if ( byte == '\r' )
    text = "\\r";
  else if ( byte == '\n' )
    text = "\\n";
  else if ( byte == '\b' )
    text = "\\b";
  else
  { sprintf(numeric, "<%d>", byte);
    text = numeric;
  }

  Cprintf("%s", text);
}
210
211
/* Debug helper: print `size' bytes from `buf' using write_byte().
   Buffers longer than 50 bytes are abbreviated to their first and last
   25 bytes, separated by " ... ". */

static void
write_buffer(char *buf, int size)
{ if ( size > 50 )
  { write_buffer(buf, 25);
    Cprintf(" ... ");
    write_buffer(buf + size - 25, 25);
  } else
  { int n;

    for(n=0; n<size; n++)
    { /* Plain char may be signed: convert through unsigned char so that
	 bytes >= 0x80 reach write_byte() as 128..255 instead of as
	 negative numbers. */
      write_byte((unsigned char)buf[n]);
    }
  }
}
226
227
228 static void
dispatch_stream(Stream s,int size,int discard)229 dispatch_stream(Stream s, int size, int discard)
230 { string q;
231 AnswerMark mark;
232 Any str;
233
234 assert(size <= s->input_p);
235
236 markAnswerStack(mark);
237 str_set_n_ascii(&q, size, (char *)s->input_buffer);
238 str = StringToString(&q);
239 if ( discard )
240 { pceFree(s->input_buffer);
241 s->input_buffer = NULL;
242 s->input_allocated = s->input_p = 0;
243 } else
244 { memcpy((char *)s->input_buffer,
245 (char *)&s->input_buffer[size],
246 s->input_p - size);
247 s->input_p -= size;
248 }
249
250 DEBUG(NAME_input,
251 { int n = valInt(getSizeCharArray(str));
252
253 Cprintf("Sending: %d characters, `", n);
254 write_buffer(strName(str), n);
255 Cprintf("'\n\tLeft: %d characters, `", s->input_p);
256 write_buffer((char *)s->input_buffer, s->input_p);
257 Cprintf("'\n");
258 });
259
260 if ( notNil(s->input_message) )
261 { addCodeReference(s);
262 assert(isProperObject(s));
263 forwardReceiverCodev(s->input_message, s, 1, &str);
264 assert(isProperObject(s));
265 delCodeReference(s);
266 }
267
268 rewindAnswerStack(mark, NIL);
269 }
270
271
272 static void
dispatch_input_stream(Stream s)273 dispatch_input_stream(Stream s)
274 { while( !onFlag(s, F_FREED|F_FREEING) && s->input_buffer && s->input_p > 0 )
275 { if ( isNil(s->record_separator) )
276 { dispatch_stream(s, s->input_p, TRUE);
277
278 return;
279 }
280
281 if ( isInteger(s->record_separator) )
282 { int bsize = valInt(s->record_separator);
283
284 if ( bsize <= s->input_p )
285 { dispatch_stream(s, bsize, FALSE);
286 continue;
287 }
288
289 return;
290 }
291
292 if ( instanceOfObject(s->record_separator, ClassRegex) )
293 { Regex re = s->record_separator;
294 string str;
295
296 str_set_n_ascii(&str, s->input_p, (char *)s->input_buffer);
297 if ( search_string_regex(re, &str) )
298 { int size = valInt(getRegisterEndRegex(s->record_separator, ZERO));
299
300 dispatch_stream(s, size, FALSE);
301 continue;
302 }
303 }
304
305 return;
306 }
307 }
308
309
310
311 status
handleInputStream(Stream s)312 handleInputStream(Stream s)
313 { char buf[BLOCKSIZE];
314 int n;
315
316 if ( onFlag(s, F_FREED|F_FREEING) )
317 fail;
318
319 if ( (n = ws_read_stream_data(s, buf, BLOCKSIZE, DEFAULT)) > 0 )
320 { if ( isNil(s->input_message) ) /* modal */
321 add_data_stream(s, buf, n);
322 else if ( isNil(s->record_separator) && !s->input_buffer )
323 { string q;
324 Any str;
325 AnswerMark mark;
326 markAnswerStack(mark);
327
328 DEBUG(NAME_input,
329 { Cprintf("Read (%d chars, unbuffered): `", n);
330 write_buffer(buf, n);
331 Cprintf("'\n");
332 });
333
334 str_set_n_ascii(&q, n, buf);
335 str = StringToString(&q);
336 addCodeReference(s);
337 forwardReceiverCodev(s->input_message, s, 1, &str);
338 delCodeReference(s);
339
340 rewindAnswerStack(mark, NIL);
341 } else
342 { add_data_stream(s, buf, n);
343
344 DEBUG(NAME_input,
345 { Cprintf("Read (%d chars): `", n);
346 write_buffer((char *)&s->input_buffer[s->input_p-n], n);
347 Cprintf("'\n");
348 });
349
350 dispatch_input_stream(s);
351 }
352 } else if ( n != -2 ) /* Win 9x errornous WSAEWOULDBLOCK */
353 {
354 DEBUG(NAME_stream,
355 if ( n < 0 )
356 Cprintf("Read failed: %s\n", strName(StreamError()));
357 else
358 Cprintf("%s: Got 0 characters: EOF\n", pp(s));
359 );
360 send(s, NAME_closeInput, EAV);
361 send(s, NAME_endOfFile, EAV);
362 }
363
364 succeed;
365 }
366
367
368 /*******************************
369 * OUTPUT HANDLING *
370 *******************************/
371
372 static status
appendStream(Stream s,CharArray data)373 appendStream(Stream s, CharArray data)
374 { PceString str = &data->data;
375 int l = str_datasize(str);
376
377 return ws_write_stream_data(s, str->s_text, l);
378 }
379
380
381 static status
newlineStream(Stream s)382 newlineStream(Stream s)
383 { static char nl[] = "\n";
384
385 return ws_write_stream_data(s, nl, 1);
386 }
387
388
389 static status
appendLineStream(Stream s,CharArray data)390 appendLineStream(Stream s, CharArray data)
391 { if ( !appendStream(s, data) ||
392 !newlineStream(s) )
393 fail;
394
395 succeed;
396 }
397
398
399 static status
formatStream(Stream s,CharArray fmt,int argc,Any * argv)400 formatStream(Stream s, CharArray fmt, int argc, Any *argv)
401 { string tmp;
402 status rc;
403
404 str_writefv(&tmp, fmt, argc, argv);
405 if ( isstrA(&tmp) )
406 { rc = ws_write_stream_data(s, tmp.s_textA, tmp.s_size);
407 } else
408 { Cprintf("TBD: wide characters in stream->format");
409
410 rc = FALSE;
411 }
412
413 str_unalloc(&tmp);
414
415 return rc;
416 }
417
418
419 static status
waitStream(Stream s)420 waitStream(Stream s)
421 { while( s->rdfd >= 0 )
422 dispatchDisplayManager(TheDisplayManager(), DEFAULT, DEFAULT);
423
424 succeed;
425 }
426
427 /*******************************
428 * INPUT HANDLING *
429 *******************************/
430
431 static StringObj
getReadLineStream(Stream s,Real timeout)432 getReadLineStream(Stream s, Real timeout)
433 { unsigned long epoch, tmo, left;
434 int use_timeout;
435
436 if ( instanceOfObject(timeout, ClassReal) )
437 { double v = valReal(timeout);
438
439 if ( v < 0.0 )
440 answer((StringObj)NIL);
441
442 epoch = mclock();
443 tmo = (unsigned long)(v * 1000.0);
444 use_timeout = TRUE;
445 } else
446 { use_timeout = FALSE;
447 epoch = tmo = 0L; /* keep compiler happy */
448 }
449
450 while(s->rdfd >= 0)
451 { if ( s->input_buffer )
452 { unsigned char *q;
453 int n;
454
455 DEBUG(NAME_stream, Cprintf("Scanning %d chars\n", s->input_p));
456 for(n=s->input_p, q = s->input_buffer; n > 0; n--, q++)
457 { if ( *q == '\n' )
458 { string str;
459 size_t len = (q-s->input_buffer)+1;
460 StringObj rval;
461
462 str_set_n_ascii(&str, len, (char *)s->input_buffer);
463 rval = StringToString(&str);
464 memmove((char *)s->input_buffer,
465 (char *)&s->input_buffer[len], s->input_p - len);
466 s->input_p -= len;
467
468 return rval;
469 }
470 }
471 DEBUG(NAME_stream, Cprintf("No newline, reading\n"));
472 }
473
474 if ( use_timeout )
475 { unsigned long now = mclock();
476
477 if ( now - epoch > tmo )
478 answer((StringObj)NIL);
479 left = tmo - (now - epoch);
480 } else
481 left = 0; /* keep compiler happy */
482
483 if ( !ws_dispatch(DEFAULT, use_timeout ? toInt(left) : NIL) )
484 return (StringObj) NIL;
485 }
486
487 fail;
488 }
489
490
491 static status
endOfFileStream(Stream s)492 endOfFileStream(Stream s)
493 { DEBUG(NAME_stream, Cprintf("Stream %s: end of output\n", pp(s)));
494
495 succeed;
496 }
497
498
499 static status
recordSeparatorStream(Stream s,Any re)500 recordSeparatorStream(Stream s, Any re)
501 { if ( s->record_separator != re )
502 { if ( isInteger(re) && valInt(re) > STR_MAX_SIZE )
503 return errorPce(s, NAME_maxRecordSize, toInt(STR_MAX_SIZE));
504
505 assign(s, record_separator, re);
506
507 if ( instanceOfObject(re, ClassRegex) )
508 compileRegex(re, ON);
509
510 dispatch_input_stream(s); /* handle possible pending data */
511 }
512
513 succeed;
514 }
515
516
517 static status
inputMessageStream(Stream s,Code msg)518 inputMessageStream(Stream s, Code msg)
519 { if ( s->input_message != msg )
520 { Code old = s->input_message;
521
522 assign(s, input_message, msg);
523 if ( isNil(old) && notNil(msg) )
524 { ws_input_stream(s);
525 } else if ( notNil(old) && isNil(msg) )
526 { ws_no_input_stream(s);
527 }
528 }
529
530 succeed;
531 }
532
533
534 /*******************************
535 * AS FILE *
536 *******************************/
537
538 static status
writeAsFileStream(Stream s,Int where,CharArray txt)539 writeAsFileStream(Stream s, Int where, CharArray txt)
540 { if ( notDefault(where) )
541 return errorPce(s, NAME_cannotSeekNonFile);
542
543 return appendStream(s, txt);
544 }
545
546
547 /*******************************
548 * CLASS DECLARATION *
549 *******************************/
550
551 /* Type declarations */
552
/* Argument type vectors, referenced from send_stream[] below */

static char *T_format[] =		/* ->format */
{ "format=char_array",
  "argument=any ..."
};

static char *T_initialise[] =		/* ->initialise */
{ "rfd=[int]",
  "wfd=[int]",
  "input_message=[code]",
  "record_separator=[regex|int]"
};

static char *T_writeAsFile[] =		/* ->write_as_file */
{ "at=[int]",
  "text=char_array"
};
560
561 /* Instance Variables */
562
563 #define var_stream XPCE_var_stream /* AIX 3.2.5 conflict */
564
565 static vardecl var_stream[] =
566 { SV(NAME_inputMessage, "code*", IV_GET|IV_STORE,
567 inputMessageStream,
568 NAME_input, "Forwarded on input from the stream"),
569 SV(NAME_recordSeparator, "regex|int*", IV_GET|IV_STORE,
570 recordSeparatorStream,
571 NAME_input, "Regex that describes the record separator"),
572 IV(NAME_wrfd, "alien:int", IV_NONE,
573 NAME_internal, "File-handle to write to stream"),
574 IV(NAME_rdfd, "alien:int", IV_NONE,
575 NAME_internal, "File-handle to read from stream"),
576 IV(NAME_rdstream, "alien:FILE *", IV_NONE,
577 NAME_internal, "Stream used for <-read_line"),
578 IV(NAME_wsRef, "alien:WsRef", IV_NONE,
579 NAME_internal, "Window system synchronisation"),
580 IV(NAME_inputBuffer, "alien:char *", IV_NONE,
581 NAME_internal, "Buffer for collecting input-data"),
582 IV(NAME_inputAllocated, "alien:int", IV_NONE,
583 NAME_internal, "Allocated size of input_buffer"),
584 IV(NAME_inputP, "alien:int", IV_NONE,
585 NAME_internal, "Number of characters in input_buffer")
586 };
587
588 /* Send Methods */
589
590 static senddecl send_stream[] =
591 { SM(NAME_initialise, 4, T_initialise, initialiseStream,
592 DEFAULT, "Create stream"),
593 SM(NAME_unlink, 0, NULL, unlinkStream,
594 DEFAULT, "Cleanup stream"),
595 SM(NAME_wait, 0, NULL, waitStream,
596 NAME_control, "Wait for the complete output"),
597 SM(NAME_endOfFile, 0, NULL, endOfFileStream,
598 NAME_input, "Send when end-of-file is reached"),
599 SM(NAME_closeInput, 0, NULL, closeInputStream,
600 NAME_open, "Close input section of stream"),
601 SM(NAME_closeOutput, 0, NULL, closeOutputStream,
602 NAME_open, "Close output section of stream"),
603 SM(NAME_input, 1, "fd=[int]*", inputStream,
604 NAME_open, "Enable input from file-descriptor"),
605 SM(NAME_append, 1, "data=char_array", appendStream,
606 NAME_output, "Send data to stream"),
607 SM(NAME_appendLine, 1, "data=char_array", appendLineStream,
608 NAME_output, "->append and ->newline"),
609 SM(NAME_format, 2, T_format, formatStream,
610 NAME_output, "Format arguments and send to stream"),
611 SM(NAME_newline, 0, NULL, newlineStream,
612 NAME_output, "Send a newline to the stream"),
613 SM(NAME_writeAsFile, 2, T_writeAsFile, writeAsFileStream,
614 NAME_stream, "Allow pce_open(Socket, append, Stream)")
615 };
616
617 /* Get Methods */
618
619 static getdecl get_stream[] =
620 { GM(NAME_readLine, 1, "string*", "timeout=[real]", getReadLineStream,
621 NAME_input, "Read line with optional timeout (seconds)")
622 };
623
624 /* Resources */
625
626 #define rc_stream NULL
627 /*
628 static classvardecl rc_stream[] =
629 {
630 };
631 */
632
633 /* Class Declaration */
634
635 ClassDecl(stream_decls,
636 var_stream, send_stream, get_stream, rc_stream,
637 0, NULL,
638 "$Rev$");
639
640 status
makeClassStream(Class class)641 makeClassStream(Class class)
642 { return declareClass(class, &stream_decls);
643 }
644
645 #else /*O_NO_PROCESS && O_NO_SOCKET*/
646
647 /*******************************
648 * CLASS DECLARATION *
649 *******************************/
650
651 /* Type declarations */
652
653
654 /* Instance Variables */
655
656 static vardecl var_stream[] =
657 { IV(NAME_inputMessage, "code*", IV_BOTH,
658 NAME_input, "Forwarded on input from the stream"),
659 IV(NAME_recordSeparator, "regex|int*", IV_GET,
660 NAME_input, "Regex that describes the record separator"),
661 IV(NAME_wrfd, "alien:int", IV_NONE,
662 NAME_internal, "File-handle to write to stream"),
663 IV(NAME_rdfd, "alien:int", IV_NONE,
664 NAME_internal, "File-handle to read from stream"),
665 IV(NAME_rdstream, "alien:FILE *", IV_NONE,
666 NAME_internal, "Stream used for <-read_line"),
667 IV(NAME_inputBuffer, "alien:char *", IV_NONE,
668 NAME_internal, "Buffer for collecting input-data"),
669 IV(NAME_inputAllocated, "alien:int", IV_NONE,
670 NAME_internal, "Allocated size of input_buffer"),
671 IV(NAME_inputP, "alien:int", IV_NONE,
672 NAME_internal, "Number of characters in input_buffer"),
673 IV(NAME_wsRef, "alien:WsRef", IV_NONE,
674 NAME_internal, "Window System synchronisation")
675 };
676
677 /* Send Methods */
678
679 #define send_stream NULL
680 /*
681 static senddecl send_stream[] =
682 {
683 };
684 */
685
686 /* Get Methods */
687
688 #define get_stream NULL
689 /*
690 static getdecl get_stream[] =
691 {
692 };
693 */
694
695 /* Resources */
696
697 #define rc_stream NULL
698 /*
699 static classvardecl rc_stream[] =
700 {
701 };
702 */
703
704 /* Class Declaration */
705
706 ClassDecl(stream_decls,
707 var_stream, send_stream, get_stream, rc_stream,
708 0, NULL,
709 "$Rev$");
710
711 status
makeClassStream(Class class)712 makeClassStream(Class class)
713 { return declareClass(class, &stream_decls);
714 }
715
716 #endif /*O_NO_PROCESS && O_NO_SOCKET*/
717
718