1 /*
2  * Copyright (c) 2000 by Matt Welsh and The Regents of the University of
3  * California. All rights reserved.
4  *
5  * Permission to use, copy, modify, and distribute this software and its
6  * documentation for any purpose, without fee, and without written agreement is
7  * hereby granted, provided that the above copyright notice and the following
8  * two paragraphs appear in all copies of this software.
9  *
10  * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
11  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
12  * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
13  * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
14  *
15  * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
16  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
17  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
18  * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
19  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
20  *
21  * Author: Matt Welsh <mdw@cs.berkeley.edu>
22  *
23  */
24 
25 package seda.sandStorm.lib.aSocket.nbio;
26 
27 import seda.sandStorm.api.*;
28 import seda.sandStorm.core.*;
29 import seda.sandStorm.lib.aSocket.*;
30 import seda.nbio.*;
31 
32 import java.net.*;
33 import java.io.*;
34 import java.util.*;
35 
36 /**
37  * Internal class used to represent state of an active socket connection.
38  */
39 class SockState extends seda.sandStorm.lib.aSocket.SockState {
40 
41   private static final boolean DEBUG = false;
42 
43   private NonblockingInputStream nbis;
44   private NonblockingOutputStream nbos;
45   private SelectItem readsi, writesi;
46 
47   private SelectSource read_selsource;
48   private SelectSource write_selsource;
49 
SockState(ATcpConnection conn, Socket nbsock, int writeClogThreshold)50   public SockState(ATcpConnection conn, Socket nbsock, int writeClogThreshold) throws IOException {
51     if (DEBUG) System.err.println("SockState: Constructor called with "+conn+", "+nbsock+", "+writeClogThreshold);
52     this.conn = conn;
53     this.nbsock = nbsock;
54     this.writeClogThreshold = writeClogThreshold;
55     this.write_selsource = null;
56 
57     if (DEBUG) System.err.println("Sockstate " + nbsock + ": Const creating nbis");
58     nbis = (NonblockingInputStream)nbsock.getInputStream();
59     if (DEBUG) System.err.println("Sockstate " + nbsock + ": Const creating nbos");
60     nbos = (NonblockingOutputStream)nbsock.getOutputStream();
61 
62     if (DEBUG) System.err.println("SockState "+nbsock+": Const creating readBuf of size "+aSocketConst.READ_BUFFER_SIZE);
63     readBuf = new byte[aSocketConst.READ_BUFFER_SIZE];
64 
65     if (DEBUG) System.err.println("SockState "+nbsock+": Setting flags");
66     outstanding_writes = 0;
67     numEmptyWrites = 0;
68     writeReqList = new ssLinkedList();
69 
70     clogged_qel = null;
71     clogged_numtries = 0;
72     if (DEBUG) System.err.println("SockState "+nbsock+": Const done");
73   }
74 
SockState(ATcpConnection conn, Socket nbsock, Integer writeClogThreshold)75   public SockState(ATcpConnection conn, Socket nbsock, Integer writeClogThreshold) throws IOException {
76       this(conn, nbsock, writeClogThreshold.intValue());
77     }
78 
79   // This is synchronized with close()
readInit(SelectSourceIF read_selsource, SinkIF compQ, int readClogTries)80   protected synchronized void readInit(SelectSourceIF read_selsource, SinkIF compQ, int readClogTries) {
81     if (DEBUG) System.err.println("readInit called on "+this);
82     if (closed) return; // May have been closed already
83     this.read_selsource = (SelectSource)read_selsource;
84     this.readCompQ = compQ;
85     this.readClogTries = readClogTries;
86     readsi = new SelectItem((NonblockingSocket)nbsock, this, Selectable.READ_READY);
87     this.read_selsource.register(readsi);
88   }
89 
90 
doRead()91   protected void doRead() {
92     if (DEBUG) System.err.println("SockState: doRead called");
93 
94     // When using SelectSource, we need this guard, since after closing
95     // a socket we may have outstanding read events still in the queue
96     if (closed) return;
97 
98     if (clogged_qel != null) {
99       // Try to drain the clogged element first
100       if (DEBUG) System.err.println("SockState: doRead draining clogged element "+clogged_qel);
101       try {
102 	readCompQ.enqueue(clogged_qel);
103 	clogged_qel = null;
104 	clogged_numtries = 0;
105       } catch (SinkFullException qfe) {
106 	// Nope, still clogged
107 	if ((readClogTries != -1) &&
108 	    (++clogged_numtries >= readClogTries)) {
109 	  if (DEBUG) System.err.println("SockState: warning: readClogTries exceeded, dropping "+clogged_qel);
110 	  clogged_qel = null;
111 	  clogged_numtries = 0;
112 	} else {
113 	  // Try again later
114 	  return;
115 	}
116       } catch (SinkException sce) {
117 	// Whoops - user went away - just drop
118 	this.close(null);
119       }
120     }
121 
122     int len;
123 
124     try {
125       if (DEBUG) System.err.println("SockState: doRead trying read");
126       len = nbis.read(readBuf, 0, READ_BUFFER_SIZE);
127       if (DEBUG) System.err.println("SockState: read returned "+len);
128 
129       if (len == 0) {
130 	// XXX MDW: Sometimes we get an error return result from
131 	// poll() which causes an attempted read here, but no
132 	// IOException. For now I am going to just drop the "null"
133 	// packet - on Linux it seems that certain TCP errors can
134 	// trigger this.
135 	//System.err.println("ss.doRead: Warning: Got empty read on socket");
136         readsi.revents = 0;
137 	return;
138       } else if (len < 0) {
139 	// Read failed - assume socket is dead
140 	if (DEBUG) System.err.println("ss.doRead: read failed, sock closed");
141 	this.close(readCompQ);
142 	readsi.revents = 0;
143 	return;
144       }
145     } catch (Exception e) {
146       // Read failed - assume socket is dead
147       if (DEBUG) System.err.println("ss.doRead: read got IOException: "+e.getMessage());
148       this.close(readCompQ);
149       readsi.revents = 0;
150       return;
151     }
152 
153     if (DEBUG) System.err.println("ss.doRead: Pushing up new ATcpInPacket, len="+len);
154 
155     pkt = new ATcpInPacket(conn, readBuf, len, aSocketConst.READ_BUFFER_COPY, seqNum);
156     // 0 is special (indicates no sequence number)
157     seqNum++; if (seqNum == 0) seqNum = 1;
158     if (aSocketConst.READ_BUFFER_COPY == false) {
159       readBuf = new byte[aSocketConst.READ_BUFFER_SIZE];
160     }
161 
162     try {
163       readCompQ.enqueue(pkt);
164     } catch (SinkFullException qfe) {
165       clogged_qel = pkt;
166       clogged_numtries = 0;
167       return;
168     } catch (SinkException sce) {
169       // User has gone away
170       this.close(null);
171       return;
172     }
173     readsi.revents = 0;
174   }
175 
176   // XXX This is synchronized with close() to avoid a race with close()
177   // removing the writeReqList while this method is being called.
178   // Probably a better way to do this...
addWriteRequest(aSocketRequest req, SelectSourceIF write_selsource)179   protected synchronized boolean addWriteRequest(aSocketRequest req, SelectSourceIF write_selsource) {
180     if (closed) return false;
181 
182     if (DEBUG) System.err.println("SockState: addWriteRequest called");
183 
184 
185     if (this.write_selsource == null) {
186       if (DEBUG) System.err.println("SockState: Setting selsource to "+write_selsource);
187       this.write_selsource = (SelectSource)write_selsource;
188       writesi = new SelectItem((NonblockingSocket)nbsock, this, Selectable.WRITE_READY);
189       ((SelectSource)write_selsource).register(writesi);
190       numActiveWriteSockets++;
191       if (DEBUG) System.err.println("SockState: Registered with selsource");
192     } else if (this.outstanding_writes == 0) {
193       numEmptyWrites = 0;
194       writeMaskEnable();
195     }
196 
197     if ((writeClogThreshold != -1) &&
198 	(this.outstanding_writes > writeClogThreshold)) {
199       if (DEBUG) System.err.println("SockState: warning: writeClogThreshold exceeded, dropping "+req);
200       if (req instanceof ATcpWriteRequest) return false;
201       if (req instanceof ATcpCloseRequest) {
202 	// Do immediate close: Assume socket is clogged
203 	ATcpCloseRequest creq = (ATcpCloseRequest)req;
204 	this.close(creq.compQ);
205 	return true;
206       }
207     }
208 
209     if (DEBUG) System.err.println("SockState: Adding writeReq to tail");
210     writeReqList.add_to_tail(req);
211     this.outstanding_writes++;
212     return true;
213   }
214 
initWrite(ATcpWriteRequest req)215   protected void initWrite(ATcpWriteRequest req) {
216     this.cur_write_req = req;
217     this.writeBuf = req.buf.data;
218     this.cur_offset = req.buf.offset;
219     this.cur_length_target = req.buf.size + cur_offset;
220   }
221 
tryWrite()222   protected boolean tryWrite() throws SinkClosedException {
223     try {
224       int tryLen;
225       if (MAX_WRITE_LEN == -1) {
226 	tryLen = cur_length_target - cur_offset;
227       } else {
228 	tryLen = Math.min(cur_length_target - cur_offset, MAX_WRITE_LEN);
229       }
230       cur_offset += nbos.nbWrite(writeBuf, cur_offset, tryLen);
231       if (DEBUG) System.err.println("SockState: tryWrite() of "+tryLen+" bytes (len="+cur_length_target+", off="+cur_offset);
232 
233     } catch (IOException ioe) {
234       // Assume this is because socket was already closed
235       this.close(null);
236       throw new SinkClosedException("tryWrite got exception doing write: "+ioe.getMessage());
237     }
238     if (cur_offset == cur_length_target) {
239       if (DEBUG) System.err.println("SockState: tryWrite() completed write of "+cur_length_target+" bytes");
240       return true;
241     }
242     else return false;
243   }
244 
writeMaskEnable()245   protected void writeMaskEnable() {
246     numActiveWriteSockets++;
247     writesi.events |= Selectable.WRITE_READY;
248     write_selsource.update(writesi);
249   }
250 
writeMaskDisable()251   protected void writeMaskDisable() {
252     numActiveWriteSockets--;
253     writesi.events &= ~(Selectable.WRITE_READY);
254     write_selsource.update(writesi);
255   }
256 
257   // XXX This is synchronized to avoid close() interfering with
258   // addWriteRequest
close(SinkIF closeEventQueue)259   protected synchronized void close(SinkIF closeEventQueue) {
260     if (closed) return;
261 
262     closed = true;
263 
264     if (DEBUG) System.err.println("SockState.close(): Deregistering with selsources");
265     if (read_selsource != null) read_selsource.deregister(readsi);
266     if (write_selsource != null) write_selsource.deregister(writesi);
267     if (DEBUG) System.err.println("SockState.close(): done deregistering with selsources");
268     // Eliminate write queue
269 
270     // XXX XXX XXX MDW: This introduces a race condition with
271     // addWriteRequest() -- need to serialize close() with other
272     // queue operations on the socket.
273     writeReqList = null;
274 
275     try {
276       if (DEBUG) System.err.println("SockState.close(): doing close ["+nbsock+"]");
277       nbsock.close();
278     } catch (IOException e) {
279       // Do nothing
280     }
281 
282     if (closeEventQueue != null) {
283       SinkClosedEvent sce = new SinkClosedEvent(conn);
284       closeEventQueue.enqueue_lossy(sce);
285     }
286   }
287 
288 }
289 
290