/*
 * Copyright (c) 1996, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package sun.rmi.transport.tcp;

import java.io.*;
import java.util.*;
import java.rmi.server.LogStream;

import sun.rmi.runtime.Log;

/**
 * ConnectionMultiplexer manages the transparent multiplexing of
 * multiple virtual connections from one endpoint to another through
 * one given real connection to that endpoint.  The input and output
 * streams for the underlying real connection must be supplied.
 * A callback object is also supplied to be informed of new virtual
 * connections opened by the remote endpoint.  After creation, the
 * run() method must be called in a thread created for demultiplexing
 * the connections.  The openConnection() method is called to
 * initiate a virtual connection from this endpoint.
 *
 * <p>Locking as used in this class: the connection table and the
 * connection count are only modified while holding the monitor of
 * {@code connectionTable}; every write to the underlying connection is
 * performed while holding the monitor of {@code dataOut}.
 *
 * @author Peter Jones
 */
@SuppressWarnings("deprecation")
final class ConnectionMultiplexer {

    /** "multiplex" log level */
    static int logLevel = LogStream.parseLevel(getLogLevel());

    /**
     * Reads the "sun.rmi.transport.tcp.multiplex.logLevel" system property
     * inside a privileged action.
     */
    private static String getLogLevel() {
        return java.security.AccessController.doPrivileged(
            new sun.security.action.GetPropertyAction(
                "sun.rmi.transport.tcp.multiplex.logLevel"));
    }

    /* multiplex system log */
    static final Log multiplexLog =
        Log.getLog("sun.rmi.transport.tcp.multiplex",
                   "multiplex", ConnectionMultiplexer.logLevel);

    /** multiplexing protocol operation codes */
    private static final int OPEN     = 0xE1;
    private static final int CLOSE    = 0xE2;
    private static final int CLOSEACK = 0xE3;
    private static final int REQUEST  = 0xE4;
    private static final int TRANSMIT = 0xE5;

    /** maximum allowed open connections */
    private static final int MAX_CONNECTIONS = 256;

    /** object to notify for new connections from remote endpoint */
    private final TCPChannel channel;

    /** input stream for underlying single connection */
    private final InputStream in;

    /** output stream for underlying single connection */
    private final OutputStream out;

    /** true if underlying connection originated from this endpoint
        (used for generating unique connection IDs) */
    private final boolean orig;

    /** layered stream for reading formatted data from underlying connection */
    private final DataInputStream dataIn;

    /** layered stream for writing formatted data to underlying connection */
    private final DataOutputStream dataOut;

    /** table holding currently open connection IDs and related info */
    private final Hashtable<Integer, MultiplexConnectionInfo> connectionTable =
        new Hashtable<>(7);

    /** number of currently open connections (guarded by connectionTable) */
    private int numConnections = 0;

    /** ID of last connection opened */
    private int lastID = 0x1001;

    /**
     * true if this mechanism is still alive.
     *
     * Written while holding the connectionTable monitor but read by the
     * send* methods while holding only the dataOut monitor; volatile makes
     * the write in shutDown() visible to those readers.
     */
    private volatile boolean alive = true;

    /**
     * Create a new ConnectionMultiplexer using the given underlying
     * input/output stream pair.  The run method must be called
     * (possibly on a new thread) to handle the demultiplexing.
     * @param channel object to notify when new connection is received
     * @param in input stream of underlying connection
     * @param out output stream of underlying connection
     * @param orig true if this endpoint initiated the underlying
     *        connection (needs to be set differently at both ends)
     */
    public ConnectionMultiplexer(
        TCPChannel channel,
        InputStream in,
        OutputStream out,
        boolean orig)
    {
        this.channel = channel;
        this.in = in;
        this.out = out;
        this.orig = orig;

        dataIn = new DataInputStream(in);
        dataOut = new DataOutputStream(out);
    }

    /**
     * Process multiplexing protocol received from underlying connection.
     *
     * Loops reading one operation code at a time and dispatching it to the
     * matching handler.  The loop only ends by exception; in every case the
     * multiplexer is shut down before the exception propagates.
     *
     * @throws IOException if reading from the underlying connection fails
     *         or the remote endpoint violates the protocol (unknown
     *         operation code, unknown or duplicate connection ID, or a
     *         CLOSEACK for a connection that was not closed)
     */
    public void run() throws IOException
    {
        try {
            while (true) {
                // read next op code from remote endpoint
                int op = dataIn.readUnsignedByte();
                switch (op) {
                case OPEN:
                    handleOpen();
                    break;
                case CLOSE:
                    handleClose();
                    break;
                case CLOSEACK:
                    handleCloseAck();
                    break;
                case REQUEST:
                    handleRequest();
                    break;
                case TRANSMIT:
                    handleTransmit();
                    break;
                default:
                    throw new IOException("Invalid operation: " +
                                          Integer.toHexString(op));
                }
            }
        } finally {
            shutDown();
        }
    }

    /**
     * Handle OPEN: remote endpoint initiating new connection.
     *
     * NOTE(review): unlike openConnection(), this path does not check the
     * connection limit before registering the connection -- confirm whether
     * that is intentional.
     */
    private void handleOpen() throws IOException
    {
        int id = dataIn.readUnsignedShort();

        if (multiplexLog.isLoggable(Log.VERBOSE)) {
            multiplexLog.log(Log.VERBOSE, "operation OPEN " + id);
        }

        if (connectionTable.get(id) != null) {
            throw new IOException("OPEN: Connection ID already exists");
        }
        MultiplexConnectionInfo info = newConnectionInfo(id);
        synchronized (connectionTable) {
            connectionTable.put(id, info);
            ++ numConnections;
        }
        sun.rmi.transport.Connection conn =
            new TCPConnection(channel, info.in, info.out);
        channel.acceptMultiplexConnection(conn);
    }

    /** Handle CLOSE: remote endpoint closing connection. */
    private void handleClose() throws IOException
    {
        int id = dataIn.readUnsignedShort();

        if (multiplexLog.isLoggable(Log.VERBOSE)) {
            multiplexLog.log(Log.VERBOSE, "operation CLOSE " + id);
        }

        MultiplexConnectionInfo info = requireConnection(id, "CLOSE");
        info.in.disconnect();
        info.out.disconnect();
        // only acknowledge if this side has not already closed the connection
        if (!info.closed) {
            sendCloseAck(info);
        }
        removeConnection(id);
    }

    /** Handle CLOSEACK: remote endpoint acknowledging close of connection. */
    private void handleCloseAck() throws IOException
    {
        int id = dataIn.readUnsignedShort();

        if (multiplexLog.isLoggable(Log.VERBOSE)) {
            multiplexLog.log(Log.VERBOSE,
                "operation CLOSEACK " + id);
        }

        MultiplexConnectionInfo info = requireConnection(id, "CLOSEACK");
        if (!info.closed) {
            throw new IOException("CLOSEACK: Connection not closed");
        }
        info.in.disconnect();
        info.out.disconnect();
        removeConnection(id);
    }

    /** Handle REQUEST: remote endpoint declaring additional bytes receivable. */
    private void handleRequest() throws IOException
    {
        int id = dataIn.readUnsignedShort();
        // the ID is validated before the length field is read
        MultiplexConnectionInfo info = requireConnection(id, "REQUEST");
        int length = dataIn.readInt();

        if (multiplexLog.isLoggable(Log.VERBOSE)) {
            multiplexLog.log(Log.VERBOSE,
                "operation REQUEST " + id + ": " + length);
        }

        info.out.request(length);
    }

    /** Handle TRANSMIT: remote endpoint transmitting data packet. */
    private void handleTransmit() throws IOException
    {
        int id = dataIn.readUnsignedShort();
        // the ID is validated before the length field is read
        MultiplexConnectionInfo info = requireConnection(id, "TRANSMIT");
        int length = dataIn.readInt();

        if (multiplexLog.isLoggable(Log.VERBOSE)) {
            multiplexLog.log(Log.VERBOSE,
                "operation TRANSMIT " + id + ": " + length);
        }

        info.in.receive(length, dataIn);
    }

    /**
     * Look up the bookkeeping entry for a connection ID received from the
     * remote endpoint.
     * @param id connection ID read from the underlying connection
     * @param op name of the operation being processed, used as the prefix
     *        of the exception message
     * @return the entry registered for the ID
     * @throws IOException if no connection is registered under the ID
     */
    private MultiplexConnectionInfo requireConnection(int id, String op)
        throws IOException
    {
        MultiplexConnectionInfo info = connectionTable.get(id);
        if (info == null) {
            throw new IOException(op + ": Invalid connection ID");
        }
        return info;
    }

    /** Drop a connection from the table and decrement the open count. */
    private void removeConnection(int id)
    {
        synchronized (connectionTable) {
            connectionTable.remove(id);
            -- numConnections;
        }
    }

    /**
     * Create the bookkeeping entry for a connection ID together with its
     * multiplexed input and output streams (2048 is passed to both stream
     * constructors).
     */
    private MultiplexConnectionInfo newConnectionInfo(int id)
    {
        MultiplexConnectionInfo info = new MultiplexConnectionInfo(id);
        info.in = new MultiplexInputStream(this, info, 2048);
        info.out = new MultiplexOutputStream(this, info, 2048);
        return info;
    }

    /**
     * Initiate a new multiplexed connection through the underlying
     * connection.
     *
     * @return a connection layered over the new multiplexed streams
     * @throws IOException if the multiplexer is no longer alive, if the
     *         maximum number of simultaneous connections is already open,
     *         or if writing the OPEN operation fails (in which case the
     *         multiplexer is shut down)
     */
    public synchronized TCPConnection openConnection() throws IOException
    {
        // generate ID that should not be already used
        // If all possible 32768 IDs are used,
        // this method will block searching for a new ID forever.
        int id;
        do {
            lastID = (++ lastID) & 0x7FFF;
            id = lastID;

            // The orig flag (copied to the high bit of the ID) is used
            // to have two distinct ranges to choose IDs from for the
            // two endpoints.
            if (orig) {
                id |= 0x8000;
            }
        } while (connectionTable.get(id) != null);

        // create multiplexing streams and bookkeeping information
        MultiplexConnectionInfo info = newConnectionInfo(id);

        // add to connection table if multiplexer has not died
        synchronized (connectionTable) {
            if (!alive) {
                throw new IOException("Multiplexer connection dead");
            }
            if (numConnections >= MAX_CONNECTIONS) {
                throw new IOException("Cannot exceed " + MAX_CONNECTIONS +
                    " simultaneous multiplexed connections");
            }
            connectionTable.put(id, info);
            ++ numConnections;
        }

        // inform remote endpoint of new connection
        synchronized (dataOut) {
            try {
                dataOut.writeByte(OPEN);
                dataOut.writeShort(id);
                dataOut.flush();
            } catch (IOException e) {
                multiplexLog.log(Log.BRIEF, "exception: ", e);

                shutDown();
                throw e;
            }
        }

        return new TCPConnection(channel, info.in, info.out);
    }

    /**
     * Shut down all connections and clean up.
     *
     * Only the first call has any effect; later calls return immediately.
     */
    public void shutDown()
    {
        // inform all associated streams
        synchronized (connectionTable) {
            // return if multiplexer already officially dead
            if (!alive) {
                return;
            }
            alive = false;

            Enumeration<MultiplexConnectionInfo> infos =
                connectionTable.elements();
            while (infos.hasMoreElements()) {
                MultiplexConnectionInfo info = infos.nextElement();
                info.in.disconnect();
                info.out.disconnect();
            }
            connectionTable.clear();
            numConnections = 0;
        }

        // close underlying connection, if possible (and not already done)
        try {
            in.close();
        } catch (IOException ignored) {
            // best effort: the connection is being abandoned anyway
        }
        try {
            out.close();
        } catch (IOException ignored) {
            // best effort: the connection is being abandoned anyway
        }
    }

    /**
     * Send request for more data on connection to remote endpoint.
     * Nothing is written if the multiplexer is dead or the connection is
     * already marked closed.
     * @param info connection information structure
     * @param len number of more bytes that can be received
     * @throws IOException if the write fails; the multiplexer is shut down
     *         before the exception is rethrown
     */
    void sendRequest(MultiplexConnectionInfo info, int len) throws IOException
    {
        synchronized (dataOut) {
            if (alive && !info.closed) {
                try {
                    dataOut.writeByte(REQUEST);
                    dataOut.writeShort(info.id);
                    dataOut.writeInt(len);
                    dataOut.flush();
                } catch (IOException e) {
                    multiplexLog.log(Log.BRIEF, "exception: ", e);

                    shutDown();
                    throw e;
                }
            }
        }
    }

    /**
     * Send packet of requested data on connection to remote endpoint.
     * Nothing is written if the multiplexer is dead or the connection is
     * already marked closed.
     * @param info connection information structure
     * @param buf array containing bytes to send
     * @param off offset of first array index of packet
     * @param len number of bytes in packet to send
     * @throws IOException if the write fails; the multiplexer is shut down
     *         before the exception is rethrown
     */
    void sendTransmit(MultiplexConnectionInfo info,
                      byte buf[], int off, int len) throws IOException
    {
        synchronized (dataOut) {
            if (alive && !info.closed) {
                try {
                    dataOut.writeByte(TRANSMIT);
                    dataOut.writeShort(info.id);
                    dataOut.writeInt(len);
                    dataOut.write(buf, off, len);
                    dataOut.flush();
                } catch (IOException e) {
                    multiplexLog.log(Log.BRIEF, "exception: ", e);

                    shutDown();
                    throw e;
                }
            }
        }
    }

    /**
     * Inform remote endpoint that connection has been closed.
     * The connection's output stream is always disconnected; the CLOSE
     * operation is only written (and the connection marked closed) if the
     * multiplexer is alive and the connection is not already marked closed.
     * @param info connection information structure
     * @throws IOException if the write fails; the multiplexer is shut down
     *         before the exception is rethrown
     */
    void sendClose(MultiplexConnectionInfo info) throws IOException
    {
        info.out.disconnect();
        synchronized (dataOut) {
            if (alive && !info.closed) {
                try {
                    dataOut.writeByte(CLOSE);
                    dataOut.writeShort(info.id);
                    dataOut.flush();
                    info.closed = true;
                } catch (IOException e) {
                    multiplexLog.log(Log.BRIEF, "exception: ", e);

                    shutDown();
                    throw e;
                }
            }
        }
    }

    /**
     * Acknowledge remote endpoint's closing of connection.
     * The CLOSEACK operation is only written (and the connection marked
     * closed) if the multiplexer is alive and the connection is not already
     * marked closed.
     * @param info connection information structure
     * @throws IOException if the write fails; the multiplexer is shut down
     *         before the exception is rethrown
     */
    void sendCloseAck(MultiplexConnectionInfo info) throws IOException
    {
        synchronized (dataOut) {
            if (alive && !info.closed) {
                try {
                    dataOut.writeByte(CLOSEACK);
                    dataOut.writeShort(info.id);
                    dataOut.flush();
                    info.closed = true;
                } catch (IOException e) {
                    multiplexLog.log(Log.BRIEF, "exception: ", e);

                    shutDown();
                    throw e;
                }
            }
        }
    }

    /**
     * Shut down connection upon finalization.
     */
    @Override
    protected void finalize() throws Throwable
    {
        // clean up first so that a failing super.finalize() cannot skip it
        try {
            shutDown();
        } finally {
            super.finalize();
        }
    }
}