/*
 * Copyright (c) 2000 by Matt Welsh and The Regents of the University of
 * California. All rights reserved.
 *
 * Permission to use, copy, modify, and distribute this software and its
 * documentation for any purpose, without fee, and without written agreement is
 * hereby granted, provided that the above copyright notice and the following
 * two paragraphs appear in all copies of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
 * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
 * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
 * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
 *
 * Author: Matt Welsh <mdw@cs.berkeley.edu>
 *
 */

package seda.sandStorm.lib.aSocket.nbio;

import seda.sandStorm.api.*;
import seda.sandStorm.core.*;
import seda.sandStorm.lib.aSocket.*;
import seda.nbio.*;

import java.net.*;
import java.io.*;
import java.util.*;

/**
 * Internal class used to represent state of an active socket connection.
 *
 * <p>This variant performs its I/O through the NBIO stream and select
 * classes ({@code NonblockingInputStream}, {@code NonblockingOutputStream},
 * {@code SelectSource}, {@code SelectItem}). Fields that are used here but
 * not declared in this file (for example {@code conn}, {@code nbsock},
 * {@code closed}, {@code readBuf}, {@code writeReqList}) are presumably
 * inherited from {@code seda.sandStorm.lib.aSocket.SockState} -- confirm
 * against the base class.
 *
 * <p>{@link #readInit}, {@link #addWriteRequest} and {@link #close} are
 * synchronized on this object; the remaining methods are not.
 */
class SockState extends seda.sandStorm.lib.aSocket.SockState {

  private static final boolean DEBUG = false;

  /** Input stream of the socket, obtained in the constructor. */
  private NonblockingInputStream nbis;
  /** Output stream of the socket, obtained in the constructor. */
  private NonblockingOutputStream nbos;
  /** Select items registered for read and write readiness, respectively. */
  private SelectItem readsi, writesi;

  /** Select source that {@link #readsi} is registered with; set by readInit(). */
  private SelectSource read_selsource;
  /** Select source that {@link #writesi} is registered with; null until the first write request. */
  private SelectSource write_selsource;

  /**
   * Create the state for a connection.
   *
   * @param conn the connection this state belongs to
   * @param nbsock the underlying socket; its streams are cast to the NBIO
   *   nonblocking stream types here and the socket itself is cast to
   *   {@code NonblockingSocket} later, so it must be an NBIO socket
   * @param writeClogThreshold number of outstanding writes above which
   *   addWriteRequest() starts rejecting requests; -1 disables the check
   * @throws IOException if the socket's streams cannot be obtained
   */
  public SockState(ATcpConnection conn, Socket nbsock, int writeClogThreshold) throws IOException {
    if (DEBUG) System.err.println("SockState: Constructor called with "+conn+", "+nbsock+", "+writeClogThreshold);
    this.conn = conn;
    this.nbsock = nbsock;
    this.writeClogThreshold = writeClogThreshold;
    this.write_selsource = null;

    if (DEBUG) System.err.println("Sockstate " + nbsock + ": Const creating nbis");
    nbis = (NonblockingInputStream)nbsock.getInputStream();
    if (DEBUG) System.err.println("Sockstate " + nbsock + ": Const creating nbos");
    nbos = (NonblockingOutputStream)nbsock.getOutputStream();

    if (DEBUG) System.err.println("SockState "+nbsock+": Const creating readBuf of size "+aSocketConst.READ_BUFFER_SIZE);
    readBuf = new byte[aSocketConst.READ_BUFFER_SIZE];

    if (DEBUG) System.err.println("SockState "+nbsock+": Setting flags");
    outstanding_writes = 0;
    numEmptyWrites = 0;
    writeReqList = new ssLinkedList();

    clogged_qel = null;
    clogged_numtries = 0;
    if (DEBUG) System.err.println("SockState "+nbsock+": Const done");
  }

  /**
   * Convenience constructor taking the clog threshold as an {@code Integer};
   * unboxes it and delegates to the {@code int} constructor.
   *
   * @throws IOException if the socket's streams cannot be obtained
   */
  public SockState(ATcpConnection conn, Socket nbsock, Integer writeClogThreshold) throws IOException {
    this(conn, nbsock, writeClogThreshold.intValue());
  }

  /**
   * Record the read-side configuration and register this socket for read
   * readiness with the given select source. Does nothing if the socket has
   * already been closed.
   *
   * @param read_selsource select source to register with; cast to
   *   {@code SelectSource}
   * @param compQ sink that incoming packets are enqueued onto by doRead()
   * @param readClogTries number of failed attempts to deliver a clogged
   *   packet before doRead() drops it; -1 means never drop
   */
  // This is synchronized with close()
  protected synchronized void readInit(SelectSourceIF read_selsource, SinkIF compQ, int readClogTries) {
    if (DEBUG) System.err.println("readInit called on "+this);
    if (closed) return; // May have been closed already
    this.read_selsource = (SelectSource)read_selsource;
    this.readCompQ = compQ;
    this.readClogTries = readClogTries;
    readsi = new SelectItem((NonblockingSocket)nbsock, this, Selectable.READ_READY);
    this.read_selsource.register(readsi);
  }

  /**
   * Perform one read on the socket and push the data to the read
   * completion queue as an {@code ATcpInPacket}.
   *
   * <p>If a previously read packet could not be enqueued (the sink was
   * full), that packet is retried first; no new data is read until it has
   * either been delivered or dropped after {@code readClogTries} attempts.
   * A negative read result or any exception from the read closes the
   * socket and reports the close on the read completion queue. A
   * {@code SinkException} from the completion queue closes the socket
   * without reporting.
   */
  protected void doRead() {
    if (DEBUG) System.err.println("SockState: doRead called");

    // When using SelectSource, we need this guard, since after closing
    // a socket we may have outstanding read events still in the queue
    if (closed) return;

    if (clogged_qel != null) {
      // Try to drain the clogged element first
      if (DEBUG) System.err.println("SockState: doRead draining clogged element "+clogged_qel);
      try {
        readCompQ.enqueue(clogged_qel);
        clogged_qel = null;
        clogged_numtries = 0;
      } catch (SinkFullException qfe) {
        // Nope, still clogged
        if ((readClogTries != -1) &&
            (++clogged_numtries >= readClogTries)) {
          if (DEBUG) System.err.println("SockState: warning: readClogTries exceeded, dropping "+clogged_qel);
          clogged_qel = null;
          clogged_numtries = 0;
        } else {
          // Try again later
          return;
        }
      } catch (SinkException sce) {
        // Whoops - user went away - just drop
        this.close(null);
        // The socket has just been closed: stop here rather than falling
        // through to a read on it. Clear the pending events as the other
        // close paths in this method do.
        readsi.revents = 0;
        return;
      }
    }

    int len;

    try {
      if (DEBUG) System.err.println("SockState: doRead trying read");
      len = nbis.read(readBuf, 0, READ_BUFFER_SIZE);
      if (DEBUG) System.err.println("SockState: read returned "+len);

      if (len == 0) {
        // XXX MDW: Sometimes we get an error return result from
        // poll() which causes an attempted read here, but no
        // IOException. For now I am going to just drop the "null"
        // packet - on Linux it seems that certain TCP errors can
        // trigger this.
        //System.err.println("ss.doRead: Warning: Got empty read on socket");
        readsi.revents = 0;
        return;
      } else if (len < 0) {
        // Read failed - assume socket is dead
        if (DEBUG) System.err.println("ss.doRead: read failed, sock closed");
        this.close(readCompQ);
        readsi.revents = 0;
        return;
      }
    } catch (Exception e) {
      // Read failed - assume socket is dead
      if (DEBUG) System.err.println("ss.doRead: read got IOException: "+e.getMessage());
      this.close(readCompQ);
      readsi.revents = 0;
      return;
    }

    if (DEBUG) System.err.println("ss.doRead: Pushing up new ATcpInPacket, len="+len);

    pkt = new ATcpInPacket(conn, readBuf, len, aSocketConst.READ_BUFFER_COPY, seqNum);
    // 0 is special (indicates no sequence number)
    seqNum++; if (seqNum == 0) seqNum = 1;
    if (aSocketConst.READ_BUFFER_COPY == false) {
      // The packet was handed readBuf without copying (presumably it keeps
      // a reference to it -- confirm in ATcpInPacket), so use a fresh
      // buffer for the next read.
      readBuf = new byte[aSocketConst.READ_BUFFER_SIZE];
    }

    try {
      readCompQ.enqueue(pkt);
    } catch (SinkFullException qfe) {
      // Remember the packet; the next doRead() call retries it first.
      clogged_qel = pkt;
      clogged_numtries = 0;
      return;
    } catch (SinkException sce) {
      // User has gone away
      this.close(null);
      return;
    }
    readsi.revents = 0;
  }

  /**
   * Queue a request on this socket's write request list.
   *
   * <p>The first call registers the socket for write readiness with the
   * given select source. Later calls re-enable the write mask if there
   * were no outstanding writes. When the clog threshold is enabled and
   * exceeded, an {@code ATcpWriteRequest} is rejected and an
   * {@code ATcpCloseRequest} closes the socket immediately; any other
   * request type is still queued.
   *
   * @param req the request to queue
   * @param write_selsource select source to register with on first use;
   *   cast to {@code SelectSource}
   * @return false if the socket is closed or a write request was dropped
   *   because of clogging; true if the request was queued or an immediate
   *   close was performed
   */
  // XXX This is synchronized with close() to avoid a race with close()
  // removing the writeReqList while this method is being called.
  // Probably a better way to do this...
  protected synchronized boolean addWriteRequest(aSocketRequest req, SelectSourceIF write_selsource) {
    if (closed) return false;

    if (DEBUG) System.err.println("SockState: addWriteRequest called");

    if (this.write_selsource == null) {
      if (DEBUG) System.err.println("SockState: Setting selsource to "+write_selsource);
      this.write_selsource = (SelectSource)write_selsource;
      writesi = new SelectItem((NonblockingSocket)nbsock, this, Selectable.WRITE_READY);
      ((SelectSource)write_selsource).register(writesi);
      numActiveWriteSockets++;
      if (DEBUG) System.err.println("SockState: Registered with selsource");
    } else if (this.outstanding_writes == 0) {
      numEmptyWrites = 0;
      writeMaskEnable();
    }

    if ((writeClogThreshold != -1) &&
        (this.outstanding_writes > writeClogThreshold)) {
      if (DEBUG) System.err.println("SockState: warning: writeClogThreshold exceeded, dropping "+req);
      if (req instanceof ATcpWriteRequest) return false;
      if (req instanceof ATcpCloseRequest) {
        // Do immediate close: Assume socket is clogged
        ATcpCloseRequest creq = (ATcpCloseRequest)req;
        this.close(creq.compQ);
        return true;
      }
    }

    if (DEBUG) System.err.println("SockState: Adding writeReq to tail");
    writeReqList.add_to_tail(req);
    this.outstanding_writes++;
    return true;
  }

  /**
   * Make the given request the current write: record its buffer, the
   * starting offset, and the offset at which the write is complete
   * (buffer offset plus buffer size).
   */
  protected void initWrite(ATcpWriteRequest req) {
    this.cur_write_req = req;
    this.writeBuf = req.buf.data;
    this.cur_offset = req.buf.offset;
    this.cur_length_target = req.buf.size + cur_offset;
  }

  /**
   * Attempt to write the remainder of the current request with a single
   * nonblocking write, capped at {@code MAX_WRITE_LEN} bytes unless that
   * constant is -1. Advances the current offset by the number of bytes
   * reported written.
   *
   * @return true if the current request has now been written completely,
   *   false if bytes remain
   * @throws SinkClosedException if the write raised an IOException; the
   *   socket is closed (without a close notification) before this is thrown
   */
  protected boolean tryWrite() throws SinkClosedException {
    try {
      int tryLen;
      if (MAX_WRITE_LEN == -1) {
        tryLen = cur_length_target - cur_offset;
      } else {
        tryLen = Math.min(cur_length_target - cur_offset, MAX_WRITE_LEN);
      }
      cur_offset += nbos.nbWrite(writeBuf, cur_offset, tryLen);
      if (DEBUG) System.err.println("SockState: tryWrite() of "+tryLen+" bytes (len="+cur_length_target+", off="+cur_offset);

    } catch (IOException ioe) {
      // Assume this is because socket was already closed
      this.close(null);
      throw new SinkClosedException("tryWrite got exception doing write: "+ioe.getMessage());
    }
    if (cur_offset == cur_length_target) {
      if (DEBUG) System.err.println("SockState: tryWrite() completed write of "+cur_length_target+" bytes");
      return true;
    }
    else return false;
  }

  /**
   * Turn on write-readiness notification for this socket and count it as
   * an active write socket. Requires that the socket has already been
   * registered for writing (writesi and write_selsource are set by the
   * first addWriteRequest() call).
   */
  protected void writeMaskEnable() {
    numActiveWriteSockets++;
    writesi.events |= Selectable.WRITE_READY;
    write_selsource.update(writesi);
  }

  /**
   * Turn off write-readiness notification for this socket and remove it
   * from the active write socket count. Same precondition as
   * writeMaskEnable().
   */
  protected void writeMaskDisable() {
    numActiveWriteSockets--;
    writesi.events &= ~(Selectable.WRITE_READY);
    write_selsource.update(writesi);
  }

  /**
   * Close the socket. Only the first call has any effect. Deregisters the
   * read and write select items (where registered), discards the write
   * request list, closes the underlying socket, and optionally reports
   * the close.
   *
   * @param closeEventQueue if non-null, a {@code SinkClosedEvent} for this
   *   connection is enqueued onto it with {@code enqueue_lossy}; pass null
   *   to close without notification
   */
  // XXX This is synchronized to avoid close() interfering with
  // addWriteRequest
  protected synchronized void close(SinkIF closeEventQueue) {
    if (closed) return;

    closed = true;

    if (DEBUG) System.err.println("SockState.close(): Deregistering with selsources");
    if (read_selsource != null) read_selsource.deregister(readsi);
    if (write_selsource != null) write_selsource.deregister(writesi);
    if (DEBUG) System.err.println("SockState.close(): done deregistering with selsources");
    // Eliminate write queue

    // XXX XXX XXX MDW: This introduces a race condition with
    // addWriteRequest() -- need to serialize close() with other
    // queue operations on the socket.
    writeReqList = null;

    try {
      if (DEBUG) System.err.println("SockState.close(): doing close ["+nbsock+"]");
      nbsock.close();
    } catch (IOException e) {
      // Do nothing: the close is best-effort and the socket is being
      // discarded regardless.
    }

    if (closeEventQueue != null) {
      SinkClosedEvent sce = new SinkClosedEvent(conn);
      closeEventQueue.enqueue_lossy(sce);
    }
  }

}