1 /* -*-C-*-
2 
3 Copyright (C) 1986, 1987, 1988, 1989, 1990, 1991, 1992, 1993, 1994,
4     1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005,
5     2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Massachusetts
6     Institute of Technology
7 
8 This file is part of MIT/GNU Scheme.
9 
10 MIT/GNU Scheme is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2 of the License, or (at
13 your option) any later version.
14 
15 MIT/GNU Scheme is distributed in the hope that it will be useful, but
16 WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18 General Public License for more details.
19 
20 You should have received a copy of the GNU General Public License
21 along with MIT/GNU Scheme; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301,
23 USA.
24 
25 */
26 
27 /* Scheme side of channel thread interface */
28 
29 #include "os2.h"
30 
31 static void run_channel_thread (void *);
32 static void start_readahead_thread (channel_context_t *);
33 static void send_readahead_ack (qid_t, enum readahead_ack_action);
34 static msg_t * new_message (void);
35 
36 typedef struct
37 {
38   LHANDLE handle;
39   qid_t qid;
40   channel_reader_t reader;
41 } thread_arg_t;
42 
43 void
OS2_start_channel_thread(Tchannel channel,channel_reader_t reader,channel_op_t operator)44 OS2_start_channel_thread (Tchannel channel,
45 			  channel_reader_t reader,
46 			  channel_op_t operator)
47 {
48   channel_context_t * context = (OS2_make_channel_context ());
49   thread_arg_t * arg = (OS_malloc (sizeof (thread_arg_t)));
50   (CHANNEL_OPERATOR_CONTEXT (channel)) = context;
51   OS2_open_qid ((CHANNEL_CONTEXT_READER_QID (context)), OS2_scheme_tqueue);
52   OS2_open_qid
53     ((CHANNEL_CONTEXT_WRITER_QID (context)), (OS2_make_std_tqueue ()));
54   (arg -> handle) = (CHANNEL_HANDLE (channel));
55   (arg -> qid) = (CHANNEL_CONTEXT_WRITER_QID (context));
56   (arg -> reader) = reader;
57   (CHANNEL_CONTEXT_TID (context))
58     = (OS2_beginthread (run_channel_thread, arg, 0));
59   (CHANNEL_OPERATOR (channel)) = operator;
60 }
61 
62 static void
run_channel_thread(void * arg)63 run_channel_thread (void * arg)
64 {
65   LHANDLE handle = (((thread_arg_t *) arg) -> handle);
66   qid_t qid = (((thread_arg_t *) arg) -> qid);
67   channel_reader_t reader = (((thread_arg_t *) arg) -> reader);
68   EXCEPTIONREGISTRATIONRECORD registration;
69   OS_free (arg);
70   (void) OS2_thread_initialize ((&registration), qid);
71   /* Wait for first read request before doing anything.  */
72   while ((OS2_wait_for_readahead_ack (qid)) == raa_read)
73     {
74       int eofp;
75       msg_t * message
76 	= ((*reader) (handle, qid, (OS2_make_readahead ()), (&eofp)));
77       if (message == 0)
78 	break;
79       OS2_send_message (qid, message);
80       if (eofp)
81 	break;
82     }
83   {
84     tqueue_t * tqueue = (OS2_qid_tqueue (qid));
85     OS2_close_qid (qid);
86     OS2_close_std_tqueue (tqueue);
87   }
88   OS2_endthread ();
89 }
90 
91 void
OS2_channel_thread_read_op(Tchannel channel,choparg_t arg1,choparg_t arg2,choparg_t arg3)92 OS2_channel_thread_read_op (Tchannel channel,
93 			    choparg_t arg1, choparg_t arg2, choparg_t arg3)
94 {
95   (* ((long *) arg3))
96     = (OS2_channel_thread_read
97        (channel, ((char *) arg1), ((size_t) arg2)));
98 }
99 
100 void
OS2_initialize_channel_thread_messages(void)101 OS2_initialize_channel_thread_messages (void)
102 {
103   SET_MSG_TYPE_LENGTH (mt_readahead, sm_readahead_t);
104   SET_MSG_TYPE_LENGTH (mt_readahead_ack, sm_readahead_ack_t);
105 }
106 
107 channel_context_t *
OS2_make_channel_context(void)108 OS2_make_channel_context (void)
109 {
110   channel_context_t * context = (OS_malloc (sizeof (channel_context_t)));
111   OS2_make_qid_pair ((& (CHANNEL_CONTEXT_READER_QID (context))),
112 		     (& (CHANNEL_CONTEXT_WRITER_QID (context))));
113   (CHANNEL_CONTEXT_EOFP (context)) = 0;
114   (CHANNEL_CONTEXT_FIRST_READ_P (context)) = 1;
115   return (context);
116 }
117 
118 void
OS2_channel_thread_close(Tchannel channel)119 OS2_channel_thread_close (Tchannel channel)
120 {
121   channel_context_t * context = (CHANNEL_OPERATOR_CONTEXT (channel));
122   /* Send a readahead ACK informing the channel thread to kill itself.
123      Then, close our end of the connection -- it's no longer needed.  */
124   send_readahead_ack ((CHANNEL_CONTEXT_READER_QID (context)), raa_close);
125   OS2_close_qid (CHANNEL_CONTEXT_READER_QID (context));
126   OS_free (context);
127   /* Finally, the caller must close the channel handle.  If the
128      channel thread is blocked in dos_read, this will break it out and
129      get it to kill itself.  There's no race, because the channel
130      thread won't try to close the handle, and if it breaks out of
131      dos_read before we do the close, it will see the readahead ACK we
132      just sent and that will kill it.  */
133 }
134 
135 qid_t
OS2_channel_thread_descriptor(Tchannel channel)136 OS2_channel_thread_descriptor (Tchannel channel)
137 {
138   channel_context_t * context = (CHANNEL_OPERATOR_CONTEXT (channel));
139   /* Make sure that the readahead thread is started, so that when
140      input arrives it will be registered properly so that the "select"
141      emulation will notice it.  */
142   start_readahead_thread (context);
143   return (CHANNEL_CONTEXT_READER_QID (context));
144 }
145 
146 static void
start_readahead_thread(channel_context_t * context)147 start_readahead_thread (channel_context_t * context)
148 {
149   /* Wake up the reader thread if this is the first time we are
150      operating on it.  This is necessary because we sometimes don't
151      want to read from the channel at all -- for example, when the
152      channel is the read side of a pipe that is being passed to a
153      child process.  */
154   if (CHANNEL_CONTEXT_FIRST_READ_P (context))
155     {
156       send_readahead_ack ((CHANNEL_CONTEXT_READER_QID (context)), raa_read);
157       (CHANNEL_CONTEXT_FIRST_READ_P (context)) = 0;
158     }
159 }
160 
161 msg_t *
OS2_make_readahead(void)162 OS2_make_readahead (void)
163 {
164   msg_t * message = (OS2_create_message (mt_readahead));
165   (SM_READAHEAD_INDEX (message)) = 0;
166   return (message);
167 }
168 
169 long
OS2_channel_thread_read(Tchannel channel,char * buffer,size_t size)170 OS2_channel_thread_read (Tchannel channel, char * buffer, size_t size)
171 {
172   channel_context_t * context = (CHANNEL_OPERATOR_CONTEXT (channel));
173   qid_t qid = (CHANNEL_CONTEXT_READER_QID (context));
174   msg_t * message;
175   unsigned short index;
176   unsigned short navail;
177   if ((CHANNEL_CONTEXT_EOFP (context)) || (size == 0))
178     return (0);
179   start_readahead_thread (context);
180   message = (OS2_receive_message (qid, (!CHANNEL_NONBLOCKING (channel)), 1));
181   if (message == 0)
182     return (-1);
183   if (OS2_error_message_p (message))
184     {
185       send_readahead_ack (qid, raa_read);
186       OS2_handle_error_message (message);
187     }
188   if ((MSG_TYPE (message)) != mt_readahead)
189     OS2_logic_error ("Illegal message from channel thread.");
190   index = (SM_READAHEAD_INDEX (message));
191   if (index == 0)
192     send_readahead_ack (qid, raa_read);
193   navail = ((SM_READAHEAD_SIZE (message)) - index);
194   if (navail == 0)
195     {
196       OS2_destroy_message (message);
197       (CHANNEL_CONTEXT_EOFP (context)) = 1;
198       return (0);
199     }
200   else if (navail <= size)
201     {
202       FASTCOPY (((SM_READAHEAD_DATA (message)) + index), buffer, navail);
203       OS2_destroy_message (message);
204       return (navail);
205     }
206   else
207     {
208       FASTCOPY (((SM_READAHEAD_DATA (message)) + index), buffer, size);
209       (SM_READAHEAD_INDEX (message)) += size;
210       OS2_unread_message (qid, message);
211       return (size);
212     }
213 }
214 
215 static void
send_readahead_ack(qid_t qid,enum readahead_ack_action action)216 send_readahead_ack (qid_t qid, enum readahead_ack_action action)
217 {
218   msg_t * message = (OS2_create_message (mt_readahead_ack));
219   (SM_READAHEAD_ACK_ACTION (message)) = action;
220   OS2_send_message (qid, message);
221 }
222 
223 enum readahead_ack_action
OS2_wait_for_readahead_ack(qid_t qid)224 OS2_wait_for_readahead_ack (qid_t qid)
225 {
226   /* Wait for an acknowledgement before starting another read.
227      This regulates the amount of data in the queue.  */
228   msg_t * message = (OS2_wait_for_message (qid, mt_readahead_ack));
229   enum readahead_ack_action action = (SM_READAHEAD_ACK_ACTION (message));
230   OS2_destroy_message (message);
231   return (action);
232 }
233 
234 void
OS2_readahead_buffer_insert(void * buffer,char c)235 OS2_readahead_buffer_insert (void * buffer, char c)
236 {
237   msg_t * last = (OS2_msg_fifo_last (buffer));
238   if ((last != 0) && ((SM_READAHEAD_SIZE (last)) < SM_READAHEAD_MAX))
239     ((SM_READAHEAD_DATA (last)) [(SM_READAHEAD_SIZE (last))++]) = c;
240   else
241     {
242       msg_t * message = (new_message ());
243       ((SM_READAHEAD_DATA (message)) [(SM_READAHEAD_SIZE (message))++]) = c;
244       OS2_msg_fifo_insert (buffer, message);
245     }
246 }
247 
248 static msg_t *
new_message(void)249 new_message (void)
250 {
251   msg_t * message = (OS2_make_readahead ());
252   (SM_READAHEAD_SIZE (message)) = 0;
253   return (message);
254 }
255 
256 char
OS2_readahead_buffer_rubout(void * buffer)257 OS2_readahead_buffer_rubout (void * buffer)
258 {
259   msg_t * message = (OS2_msg_fifo_last (buffer));
260   if (message == 0)
261     OS2_logic_error ("Rubout from empty readahead buffer.");
262   {
263     char c = ((SM_READAHEAD_DATA (message)) [--(SM_READAHEAD_SIZE (message))]);
264     if ((SM_READAHEAD_SIZE (message)) == 0)
265       {
266 	OS2_msg_fifo_remove_last (buffer);
267 	OS2_destroy_message (message);
268       }
269     return (c);
270   }
271 }
272 
273 msg_t *
OS2_readahead_buffer_read(void * buffer)274 OS2_readahead_buffer_read (void * buffer)
275 {
276   msg_t * message = (OS2_msg_fifo_remove (buffer));
277   return ((message == 0) ? (new_message ()) : message);
278 }
279