1 /* -*-C-*-
2
3 Copyright (C) 1986, 1987, 1988, 1989, 1990, 1991, 1992, 1993, 1994,
4 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005,
5 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Massachusetts
6 Institute of Technology
7
8 This file is part of MIT/GNU Scheme.
9
10 MIT/GNU Scheme is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2 of the License, or (at
13 your option) any later version.
14
15 MIT/GNU Scheme is distributed in the hope that it will be useful, but
16 WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with MIT/GNU Scheme; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301,
23 USA.
24
25 */
26
27 /* Scheme side of channel thread interface */
28
29 #include "os2.h"
30
31 static void run_channel_thread (void *);
32 static void start_readahead_thread (channel_context_t *);
33 static void send_readahead_ack (qid_t, enum readahead_ack_action);
34 static msg_t * new_message (void);
35
36 typedef struct
37 {
38 LHANDLE handle;
39 qid_t qid;
40 channel_reader_t reader;
41 } thread_arg_t;
42
43 void
OS2_start_channel_thread(Tchannel channel,channel_reader_t reader,channel_op_t operator)44 OS2_start_channel_thread (Tchannel channel,
45 channel_reader_t reader,
46 channel_op_t operator)
47 {
48 channel_context_t * context = (OS2_make_channel_context ());
49 thread_arg_t * arg = (OS_malloc (sizeof (thread_arg_t)));
50 (CHANNEL_OPERATOR_CONTEXT (channel)) = context;
51 OS2_open_qid ((CHANNEL_CONTEXT_READER_QID (context)), OS2_scheme_tqueue);
52 OS2_open_qid
53 ((CHANNEL_CONTEXT_WRITER_QID (context)), (OS2_make_std_tqueue ()));
54 (arg -> handle) = (CHANNEL_HANDLE (channel));
55 (arg -> qid) = (CHANNEL_CONTEXT_WRITER_QID (context));
56 (arg -> reader) = reader;
57 (CHANNEL_CONTEXT_TID (context))
58 = (OS2_beginthread (run_channel_thread, arg, 0));
59 (CHANNEL_OPERATOR (channel)) = operator;
60 }
61
62 static void
run_channel_thread(void * arg)63 run_channel_thread (void * arg)
64 {
65 LHANDLE handle = (((thread_arg_t *) arg) -> handle);
66 qid_t qid = (((thread_arg_t *) arg) -> qid);
67 channel_reader_t reader = (((thread_arg_t *) arg) -> reader);
68 EXCEPTIONREGISTRATIONRECORD registration;
69 OS_free (arg);
70 (void) OS2_thread_initialize ((®istration), qid);
71 /* Wait for first read request before doing anything. */
72 while ((OS2_wait_for_readahead_ack (qid)) == raa_read)
73 {
74 int eofp;
75 msg_t * message
76 = ((*reader) (handle, qid, (OS2_make_readahead ()), (&eofp)));
77 if (message == 0)
78 break;
79 OS2_send_message (qid, message);
80 if (eofp)
81 break;
82 }
83 {
84 tqueue_t * tqueue = (OS2_qid_tqueue (qid));
85 OS2_close_qid (qid);
86 OS2_close_std_tqueue (tqueue);
87 }
88 OS2_endthread ();
89 }
90
91 void
OS2_channel_thread_read_op(Tchannel channel,choparg_t arg1,choparg_t arg2,choparg_t arg3)92 OS2_channel_thread_read_op (Tchannel channel,
93 choparg_t arg1, choparg_t arg2, choparg_t arg3)
94 {
95 (* ((long *) arg3))
96 = (OS2_channel_thread_read
97 (channel, ((char *) arg1), ((size_t) arg2)));
98 }
99
100 void
OS2_initialize_channel_thread_messages(void)101 OS2_initialize_channel_thread_messages (void)
102 {
103 SET_MSG_TYPE_LENGTH (mt_readahead, sm_readahead_t);
104 SET_MSG_TYPE_LENGTH (mt_readahead_ack, sm_readahead_ack_t);
105 }
106
107 channel_context_t *
OS2_make_channel_context(void)108 OS2_make_channel_context (void)
109 {
110 channel_context_t * context = (OS_malloc (sizeof (channel_context_t)));
111 OS2_make_qid_pair ((& (CHANNEL_CONTEXT_READER_QID (context))),
112 (& (CHANNEL_CONTEXT_WRITER_QID (context))));
113 (CHANNEL_CONTEXT_EOFP (context)) = 0;
114 (CHANNEL_CONTEXT_FIRST_READ_P (context)) = 1;
115 return (context);
116 }
117
118 void
OS2_channel_thread_close(Tchannel channel)119 OS2_channel_thread_close (Tchannel channel)
120 {
121 channel_context_t * context = (CHANNEL_OPERATOR_CONTEXT (channel));
122 /* Send a readahead ACK informing the channel thread to kill itself.
123 Then, close our end of the connection -- it's no longer needed. */
124 send_readahead_ack ((CHANNEL_CONTEXT_READER_QID (context)), raa_close);
125 OS2_close_qid (CHANNEL_CONTEXT_READER_QID (context));
126 OS_free (context);
127 /* Finally, the caller must close the channel handle. If the
128 channel thread is blocked in dos_read, this will break it out and
129 get it to kill itself. There's no race, because the channel
130 thread won't try to close the handle, and if it breaks out of
131 dos_read before we do the close, it will see the readahead ACK we
132 just sent and that will kill it. */
133 }
134
135 qid_t
OS2_channel_thread_descriptor(Tchannel channel)136 OS2_channel_thread_descriptor (Tchannel channel)
137 {
138 channel_context_t * context = (CHANNEL_OPERATOR_CONTEXT (channel));
139 /* Make sure that the readahead thread is started, so that when
140 input arrives it will be registered properly so that the "select"
141 emulation will notice it. */
142 start_readahead_thread (context);
143 return (CHANNEL_CONTEXT_READER_QID (context));
144 }
145
146 static void
start_readahead_thread(channel_context_t * context)147 start_readahead_thread (channel_context_t * context)
148 {
149 /* Wake up the reader thread if this is the first time we are
150 operating on it. This is necessary because we sometimes don't
151 want to read from the channel at all -- for example, when the
152 channel is the read side of a pipe that is being passed to a
153 child process. */
154 if (CHANNEL_CONTEXT_FIRST_READ_P (context))
155 {
156 send_readahead_ack ((CHANNEL_CONTEXT_READER_QID (context)), raa_read);
157 (CHANNEL_CONTEXT_FIRST_READ_P (context)) = 0;
158 }
159 }
160
161 msg_t *
OS2_make_readahead(void)162 OS2_make_readahead (void)
163 {
164 msg_t * message = (OS2_create_message (mt_readahead));
165 (SM_READAHEAD_INDEX (message)) = 0;
166 return (message);
167 }
168
169 long
OS2_channel_thread_read(Tchannel channel,char * buffer,size_t size)170 OS2_channel_thread_read (Tchannel channel, char * buffer, size_t size)
171 {
172 channel_context_t * context = (CHANNEL_OPERATOR_CONTEXT (channel));
173 qid_t qid = (CHANNEL_CONTEXT_READER_QID (context));
174 msg_t * message;
175 unsigned short index;
176 unsigned short navail;
177 if ((CHANNEL_CONTEXT_EOFP (context)) || (size == 0))
178 return (0);
179 start_readahead_thread (context);
180 message = (OS2_receive_message (qid, (!CHANNEL_NONBLOCKING (channel)), 1));
181 if (message == 0)
182 return (-1);
183 if (OS2_error_message_p (message))
184 {
185 send_readahead_ack (qid, raa_read);
186 OS2_handle_error_message (message);
187 }
188 if ((MSG_TYPE (message)) != mt_readahead)
189 OS2_logic_error ("Illegal message from channel thread.");
190 index = (SM_READAHEAD_INDEX (message));
191 if (index == 0)
192 send_readahead_ack (qid, raa_read);
193 navail = ((SM_READAHEAD_SIZE (message)) - index);
194 if (navail == 0)
195 {
196 OS2_destroy_message (message);
197 (CHANNEL_CONTEXT_EOFP (context)) = 1;
198 return (0);
199 }
200 else if (navail <= size)
201 {
202 FASTCOPY (((SM_READAHEAD_DATA (message)) + index), buffer, navail);
203 OS2_destroy_message (message);
204 return (navail);
205 }
206 else
207 {
208 FASTCOPY (((SM_READAHEAD_DATA (message)) + index), buffer, size);
209 (SM_READAHEAD_INDEX (message)) += size;
210 OS2_unread_message (qid, message);
211 return (size);
212 }
213 }
214
215 static void
send_readahead_ack(qid_t qid,enum readahead_ack_action action)216 send_readahead_ack (qid_t qid, enum readahead_ack_action action)
217 {
218 msg_t * message = (OS2_create_message (mt_readahead_ack));
219 (SM_READAHEAD_ACK_ACTION (message)) = action;
220 OS2_send_message (qid, message);
221 }
222
223 enum readahead_ack_action
OS2_wait_for_readahead_ack(qid_t qid)224 OS2_wait_for_readahead_ack (qid_t qid)
225 {
226 /* Wait for an acknowledgement before starting another read.
227 This regulates the amount of data in the queue. */
228 msg_t * message = (OS2_wait_for_message (qid, mt_readahead_ack));
229 enum readahead_ack_action action = (SM_READAHEAD_ACK_ACTION (message));
230 OS2_destroy_message (message);
231 return (action);
232 }
233
234 void
OS2_readahead_buffer_insert(void * buffer,char c)235 OS2_readahead_buffer_insert (void * buffer, char c)
236 {
237 msg_t * last = (OS2_msg_fifo_last (buffer));
238 if ((last != 0) && ((SM_READAHEAD_SIZE (last)) < SM_READAHEAD_MAX))
239 ((SM_READAHEAD_DATA (last)) [(SM_READAHEAD_SIZE (last))++]) = c;
240 else
241 {
242 msg_t * message = (new_message ());
243 ((SM_READAHEAD_DATA (message)) [(SM_READAHEAD_SIZE (message))++]) = c;
244 OS2_msg_fifo_insert (buffer, message);
245 }
246 }
247
248 static msg_t *
new_message(void)249 new_message (void)
250 {
251 msg_t * message = (OS2_make_readahead ());
252 (SM_READAHEAD_SIZE (message)) = 0;
253 return (message);
254 }
255
256 char
OS2_readahead_buffer_rubout(void * buffer)257 OS2_readahead_buffer_rubout (void * buffer)
258 {
259 msg_t * message = (OS2_msg_fifo_last (buffer));
260 if (message == 0)
261 OS2_logic_error ("Rubout from empty readahead buffer.");
262 {
263 char c = ((SM_READAHEAD_DATA (message)) [--(SM_READAHEAD_SIZE (message))]);
264 if ((SM_READAHEAD_SIZE (message)) == 0)
265 {
266 OS2_msg_fifo_remove_last (buffer);
267 OS2_destroy_message (message);
268 }
269 return (c);
270 }
271 }
272
273 msg_t *
OS2_readahead_buffer_read(void * buffer)274 OS2_readahead_buffer_read (void * buffer)
275 {
276 msg_t * message = (OS2_msg_fifo_remove (buffer));
277 return ((message == 0) ? (new_message ()) : message);
278 }
279