1 /*  Part of XPCE --- The SWI-Prolog GUI toolkit
2 
3     Author:        Jan Wielemaker and Anjo Anjewierden
4     E-mail:        jan@swi.psy.uva.nl
5     WWW:           http://www.swi.psy.uva.nl/projects/xpce/
6     Copyright (c)  1985-2002, University of Amsterdam
7     All rights reserved.
8 
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions
11     are met:
12 
13     1. Redistributions of source code must retain the above copyright
14        notice, this list of conditions and the following disclaimer.
15 
16     2. Redistributions in binary form must reproduce the above copyright
17        notice, this list of conditions and the following disclaimer in
18        the documentation and/or other materials provided with the
19        distribution.
20 
21     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22     "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23     LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24     FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25     COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26     INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27     BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28     LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30     LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31     ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32     POSSIBILITY OF SUCH DAMAGE.
33 */
34 
35 #include <md.h>				/* get HAVE_'s */
36 
37 #if defined(HAVE_SOCKET) || defined(HAVE_WINSOCK) || defined(HAVE_FORK)
38 
39 #ifdef HAVE_WINSOCK
40 #include "mswinsock.h"
41 #define StreamError() SockError()
42 #else
43 #define StreamError() OsError()
44 #endif
45 
46 #include <h/kernel.h>
47 
48 #include <h/unix.h>
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52 #ifdef HAVE_SYS_TIME_H
53 #include <sys/time.h>
54 #endif
55 
56 static status recordSeparatorStream(Stream s, Any sep);
57 
58 #define OsError() getOsErrorPce(PCE)
59 
60 status
initialiseStream(Stream s,Int rfd,Int wfd,Code input,Any sep)61 initialiseStream(Stream s, Int rfd, Int wfd, Code input, Any sep)
62 { s->rdfd = s->wrfd = -1;
63   s->ws_ref = 0;
64   s->input_buffer = NULL;
65   s->input_allocated = s->input_p = 0;
66 
67   if ( isDefault(rfd) )   rfd = NIL;
68   if ( isDefault(wfd) )   wfd = NIL;
69   if ( isDefault(input) ) input = NIL;
70   if ( isDefault(sep) )   sep = newObject(ClassRegex, CtoName("\n"), EAV);
71 
72   if ( notNil(rfd) ) s->rdfd = valInt(rfd);
73   if ( notNil(wfd) ) s->wrfd = valInt(wfd);
74 
75   assign(s, input_message, input);
76   recordSeparatorStream(s, sep);
77 
78   succeed;
79 }
80 
81 
82 static status
unlinkStream(Stream s)83 unlinkStream(Stream s)
84 { return closeStream(s);
85 }
86 
87 		 /*******************************
88 		 *	    OPEN/CLOSE		*
89 		 *******************************/
90 
91 
92 status
closeStream(Stream s)93 closeStream(Stream s)
94 { closeOutputStream(s);
95   closeInputStream(s);
96 
97   ws_close_stream(s);
98 
99   succeed;
100 }
101 
102 
103 status
closeInputStream(Stream s)104 closeInputStream(Stream s)
105 { if ( s->rdfd >= 0 )
106   { DEBUG(NAME_stream, Cprintf("%s: Closing input\n", pp(s)));
107 
108     ws_close_input_stream(s);
109     s->rdfd = -1;
110 
111     if ( s->input_buffer )
112     { pceFree(s->input_buffer);
113       s->input_buffer = NULL;
114     }
115   }
116 
117   succeed;
118 }
119 
120 
121 status
closeOutputStream(Stream s)122 closeOutputStream(Stream s)
123 { if ( s->wrfd >= 0 )
124   { int input_too = (s->wrfd == s->rdfd);
125 
126     DEBUG(NAME_stream, Cprintf("%s: Closing output\n", pp(s)));
127 
128     ws_close_output_stream(s);
129     s->wrfd = -1;
130     if ( input_too )
131       closeInputStream(s);
132   }
133 
134   succeed;
135 }
136 
137 
138 status
inputStream(Stream s,Int fd)139 inputStream(Stream s, Int fd)
140 { if ( notDefault(fd) )
141   { if ( isNil(fd) )
142       closeInputStream(s);
143     else
144       s->rdfd = valInt(fd);		/* Unix only! */
145   }
146 
147 /*if ( notNil(s->input_message) )*/
148     ws_input_stream(s);
149 
150   succeed;
151 }
152 
153 
154 		 /*******************************
155 		 *        HANDLE INPUT		*
156 		 *******************************/
157 
158 
159 #define BLOCKSIZE 1024
160 #define ALLOCSIZE 1024
161 
162 #define Round(n, r) (((n) + (r) - 1) & ~((r)-1))
163 
164 void
add_data_stream(Stream s,char * data,int len)165 add_data_stream(Stream s, char *data, int len)
166 { char *q;
167 
168   if ( !s->input_buffer )
169   { s->input_allocated = Round(len+1, ALLOCSIZE);
170     s->input_buffer = pceMalloc(s->input_allocated);
171     s->input_p = 0;
172   } else if ( s->input_p + len >= s->input_allocated )
173   { s->input_allocated = Round(s->input_p + len + 1, ALLOCSIZE);
174     s->input_buffer = pceRealloc(s->input_buffer, s->input_allocated);
175   }
176 
177   q = (char *)&s->input_buffer[s->input_p];
178   memcpy(q, data, len);
179   s->input_p += len;
180 }
181 
182 
183 static void
write_byte(int byte)184 write_byte(int byte)
185 { if ( byte < 32 || (byte >= 127 && byte < 128+32) || byte == 255 )
186   { char buf[10];
187     char *prt = buf;
188 
189     switch(byte)
190     { case '\t':
191 	prt = "\\t";
192         break;
193       case '\r':
194 	prt = "\\r";
195 	break;
196       case '\n':
197 	prt = "\\n";
198 	break;
199       case '\b':
200 	prt = "\\b";
201 	break;
202       default:
203 	sprintf(buf, "<%d>", byte);
204     }
205 
206     Cprintf("%s", prt);
207   } else
208     Cputchar(byte);
209 }
210 
211 
212 static void
write_buffer(char * buf,int size)213 write_buffer(char *buf, int size)
214 { if ( size > 50 )
215   { write_buffer(buf, 25);
216     Cprintf(" ... ");
217     write_buffer(buf + size - 25, 25);
218   } else
219   { int n;
220 
221     for(n=0; n<size; n++)
222     { write_byte(buf[n]);
223     }
224   }
225 }
226 
227 
228 static void
dispatch_stream(Stream s,int size,int discard)229 dispatch_stream(Stream s, int size, int discard)
230 { string q;
231   AnswerMark mark;
232   Any str;
233 
234   assert(size <= s->input_p);
235 
236   markAnswerStack(mark);
237   str_set_n_ascii(&q, size, (char *)s->input_buffer);
238   str = StringToString(&q);
239   if ( discard )
240   { pceFree(s->input_buffer);
241     s->input_buffer = NULL;
242     s->input_allocated = s->input_p = 0;
243   } else
244   { memcpy((char *)s->input_buffer,
245 	   (char *)&s->input_buffer[size],
246 	   s->input_p - size);
247     s->input_p -= size;
248   }
249 
250   DEBUG(NAME_input,
251 	{ int n = valInt(getSizeCharArray(str));
252 
253 	  Cprintf("Sending: %d characters, `", n);
254 	  write_buffer(strName(str), n);
255 	  Cprintf("'\n\tLeft: %d characters, `", s->input_p);
256 	  write_buffer((char *)s->input_buffer, s->input_p);
257 	  Cprintf("'\n");
258 	});
259 
260   if ( notNil(s->input_message) )
261   { addCodeReference(s);
262     assert(isProperObject(s));
263     forwardReceiverCodev(s->input_message, s, 1, &str);
264     assert(isProperObject(s));
265     delCodeReference(s);
266   }
267 
268   rewindAnswerStack(mark, NIL);
269 }
270 
271 
272 static void
dispatch_input_stream(Stream s)273 dispatch_input_stream(Stream s)
274 { while( !onFlag(s, F_FREED|F_FREEING) && s->input_buffer && s->input_p > 0 )
275   { if ( isNil(s->record_separator) )
276     { dispatch_stream(s, s->input_p, TRUE);
277 
278       return;
279     }
280 
281     if ( isInteger(s->record_separator) )
282     { int bsize = valInt(s->record_separator);
283 
284       if ( bsize <= s->input_p )
285       {	dispatch_stream(s, bsize, FALSE);
286 	continue;
287       }
288 
289       return;
290     }
291 
292     if ( instanceOfObject(s->record_separator, ClassRegex) )
293     { Regex re = s->record_separator;
294       string str;
295 
296       str_set_n_ascii(&str, s->input_p, (char *)s->input_buffer);
297       if ( search_string_regex(re, &str) )
298       { int size = valInt(getRegisterEndRegex(s->record_separator, ZERO));
299 
300 	dispatch_stream(s, size, FALSE);
301 	continue;
302       }
303     }
304 
305     return;
306   }
307 }
308 
309 
310 
311 status
handleInputStream(Stream s)312 handleInputStream(Stream s)
313 { char buf[BLOCKSIZE];
314   int n;
315 
316   if ( onFlag(s, F_FREED|F_FREEING) )
317     fail;
318 
319   if ( (n = ws_read_stream_data(s, buf, BLOCKSIZE, DEFAULT)) > 0 )
320   { if ( isNil(s->input_message) )	/* modal */
321       add_data_stream(s, buf, n);
322     else if ( isNil(s->record_separator) && !s->input_buffer )
323     { string q;
324       Any str;
325       AnswerMark mark;
326       markAnswerStack(mark);
327 
328       DEBUG(NAME_input,
329 	    { Cprintf("Read (%d chars, unbuffered): `", n);
330 	      write_buffer(buf, n);
331 	      Cprintf("'\n");
332 	    });
333 
334       str_set_n_ascii(&q, n, buf);
335       str = StringToString(&q);
336       addCodeReference(s);
337       forwardReceiverCodev(s->input_message, s, 1, &str);
338       delCodeReference(s);
339 
340       rewindAnswerStack(mark, NIL);
341     } else
342     { add_data_stream(s, buf, n);
343 
344       DEBUG(NAME_input,
345 	    { Cprintf("Read (%d chars): `", n);
346 	      write_buffer((char *)&s->input_buffer[s->input_p-n], n);
347 	      Cprintf("'\n");
348 	    });
349 
350       dispatch_input_stream(s);
351     }
352   } else if ( n != -2 )			/* Win 9x errornous WSAEWOULDBLOCK */
353   {
354     DEBUG(NAME_stream,
355 	  if ( n < 0 )
356 	    Cprintf("Read failed: %s\n", strName(StreamError()));
357 	  else
358 	    Cprintf("%s: Got 0 characters: EOF\n", pp(s));
359 	 );
360     send(s, NAME_closeInput, EAV);
361     send(s, NAME_endOfFile, EAV);
362   }
363 
364   succeed;
365 }
366 
367 
368 		 /*******************************
369 		 *       OUTPUT HANDLING	*
370 		 *******************************/
371 
372 static status
appendStream(Stream s,CharArray data)373 appendStream(Stream s, CharArray data)
374 { PceString str = &data->data;
375   int l = str_datasize(str);
376 
377   return ws_write_stream_data(s, str->s_text, l);
378 }
379 
380 
381 static status
newlineStream(Stream s)382 newlineStream(Stream s)
383 { static char nl[] = "\n";
384 
385   return ws_write_stream_data(s, nl, 1);
386 }
387 
388 
389 static status
appendLineStream(Stream s,CharArray data)390 appendLineStream(Stream s, CharArray data)
391 { if ( !appendStream(s, data) ||
392        !newlineStream(s) )
393     fail;
394 
395   succeed;
396 }
397 
398 
399 static status
formatStream(Stream s,CharArray fmt,int argc,Any * argv)400 formatStream(Stream s, CharArray fmt, int argc, Any *argv)
401 { string tmp;
402   status rc;
403 
404   str_writefv(&tmp, fmt, argc, argv);
405   if ( isstrA(&tmp) )
406   { rc = ws_write_stream_data(s, tmp.s_textA, tmp.s_size);
407   } else
408   { Cprintf("TBD: wide characters in stream->format");
409 
410     rc = FALSE;
411   }
412 
413   str_unalloc(&tmp);
414 
415   return rc;
416 }
417 
418 
419 static status
waitStream(Stream s)420 waitStream(Stream s)
421 { while( s->rdfd >= 0 )
422     dispatchDisplayManager(TheDisplayManager(), DEFAULT, DEFAULT);
423 
424   succeed;
425 }
426 
427 		 /*******************************
428 		 *	  INPUT HANDLING	*
429 		 *******************************/
430 
431 static StringObj
getReadLineStream(Stream s,Real timeout)432 getReadLineStream(Stream s, Real timeout)
433 { unsigned long epoch, tmo, left;
434   int use_timeout;
435 
436   if ( instanceOfObject(timeout, ClassReal) )
437   { double v = valReal(timeout);
438 
439     if ( v < 0.0 )
440       answer((StringObj)NIL);
441 
442     epoch = mclock();
443     tmo = (unsigned long)(v * 1000.0);
444     use_timeout = TRUE;
445   } else
446   { use_timeout = FALSE;
447     epoch = tmo = 0L;		/* keep compiler happy */
448   }
449 
450   while(s->rdfd >= 0)
451   { if ( s->input_buffer )
452     { unsigned char *q;
453       int n;
454 
455       DEBUG(NAME_stream, Cprintf("Scanning %d chars\n", s->input_p));
456       for(n=s->input_p, q = s->input_buffer; n > 0; n--, q++)
457       { if ( *q == '\n' )
458 	{ string str;
459 	  size_t len = (q-s->input_buffer)+1;
460 	  StringObj rval;
461 
462 	  str_set_n_ascii(&str, len, (char *)s->input_buffer);
463 	  rval = StringToString(&str);
464 	  memmove((char *)s->input_buffer,
465 		  (char *)&s->input_buffer[len], s->input_p - len);
466 	  s->input_p -= len;
467 
468 	  return rval;
469 	}
470       }
471       DEBUG(NAME_stream, Cprintf("No newline, reading\n"));
472     }
473 
474     if ( use_timeout )
475     { unsigned long now = mclock();
476 
477       if ( now - epoch > tmo )
478 	answer((StringObj)NIL);
479       left = tmo - (now - epoch);
480     } else
481       left = 0;				/* keep compiler happy */
482 
483     if ( !ws_dispatch(DEFAULT, use_timeout ? toInt(left) : NIL) )
484       return (StringObj) NIL;
485   }
486 
487   fail;
488 }
489 
490 
491 static status
endOfFileStream(Stream s)492 endOfFileStream(Stream s)
493 { DEBUG(NAME_stream, Cprintf("Stream %s: end of output\n", pp(s)));
494 
495   succeed;
496 }
497 
498 
499 static status
recordSeparatorStream(Stream s,Any re)500 recordSeparatorStream(Stream s, Any re)
501 { if ( s->record_separator != re )
502   { if ( isInteger(re) && valInt(re) > STR_MAX_SIZE )
503       return errorPce(s, NAME_maxRecordSize, toInt(STR_MAX_SIZE));
504 
505     assign(s, record_separator, re);
506 
507     if ( instanceOfObject(re, ClassRegex) )
508       compileRegex(re, ON);
509 
510     dispatch_input_stream(s);		/* handle possible pending data */
511   }
512 
513   succeed;
514 }
515 
516 
517 static status
inputMessageStream(Stream s,Code msg)518 inputMessageStream(Stream s, Code msg)
519 { if ( s->input_message != msg )
520   { Code old = s->input_message;
521 
522     assign(s, input_message, msg);
523     if ( isNil(old) && notNil(msg) )
524     { ws_input_stream(s);
525     } else if ( notNil(old) && isNil(msg) )
526     { ws_no_input_stream(s);
527     }
528   }
529 
530   succeed;
531 }
532 
533 
534 		 /*******************************
535 		 *	      AS FILE		*
536 		 *******************************/
537 
538 static status
writeAsFileStream(Stream s,Int where,CharArray txt)539 writeAsFileStream(Stream s, Int where, CharArray txt)
540 { if ( notDefault(where) )
541     return errorPce(s, NAME_cannotSeekNonFile);
542 
543   return appendStream(s, txt);
544 }
545 
546 
547 		 /*******************************
548 		 *	 CLASS DECLARATION	*
549 		 *******************************/
550 
551 /* Type declarations */
552 
553 static char *T_format[] =
554         { "format=char_array", "argument=any ..." };
555 static char *T_initialise[] =
556         { "rfd=[int]", "wfd=[int]",
557 	  "input_message=[code]", "record_separator=[regex|int]" };
558 static char *T_writeAsFile[] =
559         { "at=[int]", "text=char_array" };
560 
561 /* Instance Variables */
562 
563 #define var_stream XPCE_var_stream	/* AIX 3.2.5 conflict */
564 
565 static vardecl var_stream[] =
566 { SV(NAME_inputMessage, "code*", IV_GET|IV_STORE,
567      inputMessageStream,
568      NAME_input, "Forwarded on input from the stream"),
569   SV(NAME_recordSeparator, "regex|int*", IV_GET|IV_STORE,
570      recordSeparatorStream,
571      NAME_input, "Regex that describes the record separator"),
572   IV(NAME_wrfd, "alien:int", IV_NONE,
573      NAME_internal, "File-handle to write to stream"),
574   IV(NAME_rdfd, "alien:int", IV_NONE,
575      NAME_internal, "File-handle to read from stream"),
576   IV(NAME_rdstream, "alien:FILE *", IV_NONE,
577      NAME_internal, "Stream used for <-read_line"),
578   IV(NAME_wsRef, "alien:WsRef", IV_NONE,
579      NAME_internal, "Window system synchronisation"),
580   IV(NAME_inputBuffer, "alien:char *", IV_NONE,
581      NAME_internal, "Buffer for collecting input-data"),
582   IV(NAME_inputAllocated, "alien:int", IV_NONE,
583      NAME_internal, "Allocated size of input_buffer"),
584   IV(NAME_inputP, "alien:int", IV_NONE,
585      NAME_internal, "Number of characters in input_buffer")
586 };
587 
588 /* Send Methods */
589 
590 static senddecl send_stream[] =
591 { SM(NAME_initialise, 4, T_initialise, initialiseStream,
592      DEFAULT, "Create stream"),
593   SM(NAME_unlink, 0, NULL, unlinkStream,
594      DEFAULT, "Cleanup stream"),
595   SM(NAME_wait, 0, NULL, waitStream,
596      NAME_control, "Wait for the complete output"),
597   SM(NAME_endOfFile, 0, NULL, endOfFileStream,
598      NAME_input, "Send when end-of-file is reached"),
599   SM(NAME_closeInput, 0, NULL, closeInputStream,
600      NAME_open, "Close input section of stream"),
601   SM(NAME_closeOutput, 0, NULL, closeOutputStream,
602      NAME_open, "Close output section of stream"),
603   SM(NAME_input, 1, "fd=[int]*", inputStream,
604      NAME_open, "Enable input from file-descriptor"),
605   SM(NAME_append, 1, "data=char_array", appendStream,
606      NAME_output, "Send data to stream"),
607   SM(NAME_appendLine, 1, "data=char_array", appendLineStream,
608      NAME_output, "->append and ->newline"),
609   SM(NAME_format, 2, T_format, formatStream,
610      NAME_output, "Format arguments and send to stream"),
611   SM(NAME_newline, 0, NULL, newlineStream,
612      NAME_output, "Send a newline to the stream"),
613   SM(NAME_writeAsFile, 2, T_writeAsFile, writeAsFileStream,
614      NAME_stream, "Allow pce_open(Socket, append, Stream)")
615 };
616 
617 /* Get Methods */
618 
619 static getdecl get_stream[] =
620 { GM(NAME_readLine, 1, "string*", "timeout=[real]", getReadLineStream,
621      NAME_input, "Read line with optional timeout (seconds)")
622 };
623 
624 /* Resources */
625 
626 #define rc_stream NULL
627 /*
628 static classvardecl rc_stream[] =
629 {
630 };
631 */
632 
633 /* Class Declaration */
634 
635 ClassDecl(stream_decls,
636           var_stream, send_stream, get_stream, rc_stream,
637           0, NULL,
638           "$Rev$");
639 
640 status
makeClassStream(Class class)641 makeClassStream(Class class)
642 { return declareClass(class, &stream_decls);
643 }
644 
645 #else /*O_NO_PROCESS && O_NO_SOCKET*/
646 
647 		 /*******************************
648 		 *	 CLASS DECLARATION	*
649 		 *******************************/
650 
651 /* Type declarations */
652 
653 
654 /* Instance Variables */
655 
656 static vardecl var_stream[] =
657 { IV(NAME_inputMessage, "code*", IV_BOTH,
658      NAME_input, "Forwarded on input from the stream"),
659   IV(NAME_recordSeparator, "regex|int*", IV_GET,
660      NAME_input, "Regex that describes the record separator"),
661   IV(NAME_wrfd, "alien:int", IV_NONE,
662      NAME_internal, "File-handle to write to stream"),
663   IV(NAME_rdfd, "alien:int", IV_NONE,
664      NAME_internal, "File-handle to read from stream"),
665   IV(NAME_rdstream, "alien:FILE *", IV_NONE,
666      NAME_internal, "Stream used for <-read_line"),
667   IV(NAME_inputBuffer, "alien:char *", IV_NONE,
668      NAME_internal, "Buffer for collecting input-data"),
669   IV(NAME_inputAllocated, "alien:int", IV_NONE,
670      NAME_internal, "Allocated size of input_buffer"),
671   IV(NAME_inputP, "alien:int", IV_NONE,
672      NAME_internal, "Number of characters in input_buffer"),
673   IV(NAME_wsRef, "alien:WsRef", IV_NONE,
674      NAME_internal, "Window System synchronisation")
675 };
676 
677 /* Send Methods */
678 
679 #define send_stream NULL
680 /*
681 static senddecl send_stream[] =
682 {
683 };
684 */
685 
686 /* Get Methods */
687 
688 #define get_stream NULL
689 /*
690 static getdecl get_stream[] =
691 {
692 };
693 */
694 
695 /* Resources */
696 
697 #define rc_stream NULL
698 /*
699 static classvardecl rc_stream[] =
700 {
701 };
702 */
703 
704 /* Class Declaration */
705 
706 ClassDecl(stream_decls,
707           var_stream, send_stream, get_stream, rc_stream,
708           0, NULL,
709           "$Rev$");
710 
711 status
makeClassStream(Class class)712 makeClassStream(Class class)
713 { return declareClass(class, &stream_decls);
714 }
715 
716 #endif /*O_NO_PROCESS && O_NO_SOCKET*/
717 
718