1 /*
2  * Copyright (c) 1996, 2013, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package sun.rmi.transport.tcp;
26 
27 import java.io.*;
28 import java.util.*;
29 import java.rmi.server.LogStream;
30 
31 import sun.rmi.runtime.Log;
32 
33 /**
34  * ConnectionMultiplexer manages the transparent multiplexing of
35  * multiple virtual connections from one endpoint to another through
36  * one given real connection to that endpoint.  The input and output
37  * streams for the the underlying real connection must be supplied.
38  * A callback object is also supplied to be informed of new virtual
39  * connections opened by the remote endpoint.  After creation, the
40  * run() method must be called in a thread created for demultiplexing
41  * the connections.  The openConnection() method is called to
42  * initiate a virtual connection from this endpoint.
43  *
44  * @author Peter Jones
45  */
46 @SuppressWarnings("deprecation")
47 final class ConnectionMultiplexer {
48 
49     /** "multiplex" log level */
50     static int logLevel = LogStream.parseLevel(getLogLevel());
51 
getLogLevel()52     private static String getLogLevel() {
53         return java.security.AccessController.doPrivileged(
54             new sun.security.action.GetPropertyAction("sun.rmi.transport.tcp.multiplex.logLevel"));
55     }
56 
57     /* multiplex system log */
58     static final Log multiplexLog =
59         Log.getLog("sun.rmi.transport.tcp.multiplex",
60                    "multiplex", ConnectionMultiplexer.logLevel);
61 
62     /** multiplexing protocol operation codes */
63     private final static int OPEN     = 0xE1;
64     private final static int CLOSE    = 0xE2;
65     private final static int CLOSEACK = 0xE3;
66     private final static int REQUEST  = 0xE4;
67     private final static int TRANSMIT = 0xE5;
68 
69     /** object to notify for new connections from remote endpoint */
70     private TCPChannel channel;
71 
72     /** input stream for underlying single connection */
73     private InputStream in;
74 
75     /** output stream for underlying single connection */
76     private OutputStream out;
77 
78     /** true if underlying connection originated from this endpoint
79         (used for generating unique connection IDs) */
80     private boolean orig;
81 
82     /** layered stream for reading formatted data from underlying connection */
83     private DataInputStream dataIn;
84 
85     /** layered stream for writing formatted data to underlying connection */
86     private DataOutputStream dataOut;
87 
88     /** table holding currently open connection IDs and related info */
89     private Hashtable<Integer, MultiplexConnectionInfo> connectionTable = new Hashtable<>(7);
90 
91     /** number of currently open connections */
92     private int numConnections = 0;
93 
94     /** maximum allowed open connections */
95     private final static int maxConnections = 256;
96 
97     /** ID of last connection opened */
98     private int lastID = 0x1001;
99 
100     /** true if this mechanism is still alive */
101     private boolean alive = true;
102 
103     /**
104      * Create a new ConnectionMultiplexer using the given underlying
105      * input/output stream pair.  The run method must be called
106      * (possibly on a new thread) to handle the demultiplexing.
107      * @param channel object to notify when new connection is received
108      * @param in input stream of underlying connection
109      * @param out output stream of underlying connection
110      * @param orig true if this endpoint intiated the underlying
111      *        connection (needs to be set differently at both ends)
112      */
ConnectionMultiplexer( TCPChannel channel, InputStream in, OutputStream out, boolean orig)113     public ConnectionMultiplexer(
114         TCPChannel    channel,
115         InputStream   in,
116         OutputStream  out,
117         boolean       orig)
118     {
119         this.channel = channel;
120         this.in      = in;
121         this.out     = out;
122         this.orig    = orig;
123 
124         dataIn = new DataInputStream(in);
125         dataOut = new DataOutputStream(out);
126     }
127 
128     /**
129      * Process multiplexing protocol received from underlying connection.
130      */
run()131     public void run() throws IOException
132     {
133         try {
134             int op, id, length;
135             MultiplexConnectionInfo info;
136 
137             while (true) {
138 
139                 // read next op code from remote endpoint
140                 op = dataIn.readUnsignedByte();
141                 switch (op) {
142 
143                 // remote endpoint initiating new connection
144                 case OPEN:
145                     id = dataIn.readUnsignedShort();
146 
147                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
148                         multiplexLog.log(Log.VERBOSE, "operation  OPEN " + id);
149                     }
150 
151                     info = connectionTable.get(id);
152                     if (info != null)
153                         throw new IOException(
154                             "OPEN: Connection ID already exists");
155                     info = new MultiplexConnectionInfo(id);
156                     info.in = new MultiplexInputStream(this, info, 2048);
157                     info.out = new MultiplexOutputStream(this, info, 2048);
158                     synchronized (connectionTable) {
159                         connectionTable.put(id, info);
160                         ++ numConnections;
161                     }
162                     sun.rmi.transport.Connection conn;
163                     conn = new TCPConnection(channel, info.in, info.out);
164                     channel.acceptMultiplexConnection(conn);
165                     break;
166 
167                 // remote endpoint closing connection
168                 case CLOSE:
169                     id = dataIn.readUnsignedShort();
170 
171                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
172                         multiplexLog.log(Log.VERBOSE, "operation  CLOSE " + id);
173                     }
174 
175                     info = connectionTable.get(id);
176                     if (info == null)
177                         throw new IOException(
178                             "CLOSE: Invalid connection ID");
179                     info.in.disconnect();
180                     info.out.disconnect();
181                     if (!info.closed)
182                         sendCloseAck(info);
183                     synchronized (connectionTable) {
184                         connectionTable.remove(id);
185                         -- numConnections;
186                     }
187                     break;
188 
189                 // remote endpoint acknowledging close of connection
190                 case CLOSEACK:
191                     id = dataIn.readUnsignedShort();
192 
193                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
194                         multiplexLog.log(Log.VERBOSE,
195                             "operation  CLOSEACK " + id);
196                     }
197 
198                     info = connectionTable.get(id);
199                     if (info == null)
200                         throw new IOException(
201                             "CLOSEACK: Invalid connection ID");
202                     if (!info.closed)
203                         throw new IOException(
204                             "CLOSEACK: Connection not closed");
205                     info.in.disconnect();
206                     info.out.disconnect();
207                     synchronized (connectionTable) {
208                         connectionTable.remove(id);
209                         -- numConnections;
210                     }
211                     break;
212 
213                 // remote endpoint declaring additional bytes receivable
214                 case REQUEST:
215                     id = dataIn.readUnsignedShort();
216                     info = connectionTable.get(id);
217                     if (info == null)
218                         throw new IOException(
219                             "REQUEST: Invalid connection ID");
220                     length = dataIn.readInt();
221 
222                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
223                         multiplexLog.log(Log.VERBOSE,
224                             "operation  REQUEST " + id + ": " + length);
225                     }
226 
227                     info.out.request(length);
228                     break;
229 
230                 // remote endpoint transmitting data packet
231                 case TRANSMIT:
232                     id = dataIn.readUnsignedShort();
233                     info = connectionTable.get(id);
234                     if (info == null)
235                         throw new IOException("SEND: Invalid connection ID");
236                     length = dataIn.readInt();
237 
238                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
239                         multiplexLog.log(Log.VERBOSE,
240                             "operation  TRANSMIT " + id + ": " + length);
241                     }
242 
243                     info.in.receive(length, dataIn);
244                     break;
245 
246                 default:
247                     throw new IOException("Invalid operation: " +
248                                           Integer.toHexString(op));
249                 }
250             }
251         } finally {
252             shutDown();
253         }
254     }
255 
256     /**
257      * Initiate a new multiplexed connection through the underlying
258      * connection.
259      */
openConnection()260     public synchronized TCPConnection openConnection() throws IOException
261     {
262         // generate ID that should not be already used
263         // If all possible 32768 IDs are used,
264         // this method will block searching for a new ID forever.
265         int id;
266         do {
267             lastID = (++ lastID) & 0x7FFF;
268             id = lastID;
269 
270             // The orig flag (copied to the high bit of the ID) is used
271             // to have two distinct ranges to choose IDs from for the
272             // two endpoints.
273             if (orig)
274                 id |= 0x8000;
275         } while (connectionTable.get(id) != null);
276 
277         // create multiplexing streams and bookkeeping information
278         MultiplexConnectionInfo info = new MultiplexConnectionInfo(id);
279         info.in = new MultiplexInputStream(this, info, 2048);
280         info.out = new MultiplexOutputStream(this, info, 2048);
281 
282         // add to connection table if multiplexer has not died
283         synchronized (connectionTable) {
284             if (!alive)
285                 throw new IOException("Multiplexer connection dead");
286             if (numConnections >= maxConnections)
287                 throw new IOException("Cannot exceed " + maxConnections +
288                     " simultaneous multiplexed connections");
289             connectionTable.put(id, info);
290             ++ numConnections;
291         }
292 
293         // inform remote endpoint of new connection
294         synchronized (dataOut) {
295             try {
296                 dataOut.writeByte(OPEN);
297                 dataOut.writeShort(id);
298                 dataOut.flush();
299             } catch (IOException e) {
300                 multiplexLog.log(Log.BRIEF, "exception: ", e);
301 
302                 shutDown();
303                 throw e;
304             }
305         }
306 
307         return new TCPConnection(channel, info.in, info.out);
308     }
309 
310     /**
311      * Shut down all connections and clean up.
312      */
shutDown()313     public void shutDown()
314     {
315         // inform all associated streams
316         synchronized (connectionTable) {
317             // return if multiplexer already officially dead
318             if (!alive)
319                 return;
320             alive = false;
321 
322             Enumeration<MultiplexConnectionInfo> enum_ =
323                     connectionTable.elements();
324             while (enum_.hasMoreElements()) {
325                 MultiplexConnectionInfo info = enum_.nextElement();
326                 info.in.disconnect();
327                 info.out.disconnect();
328             }
329             connectionTable.clear();
330             numConnections = 0;
331         }
332 
333         // close underlying connection, if possible (and not already done)
334         try {
335             in.close();
336         } catch (IOException e) {
337         }
338         try {
339             out.close();
340         } catch (IOException e) {
341         }
342     }
343 
344     /**
345      * Send request for more data on connection to remote endpoint.
346      * @param info connection information structure
347      * @param len number of more bytes that can be received
348      */
sendRequest(MultiplexConnectionInfo info, int len)349     void sendRequest(MultiplexConnectionInfo info, int len) throws IOException
350     {
351         synchronized (dataOut) {
352             if (alive && !info.closed)
353                 try {
354                     dataOut.writeByte(REQUEST);
355                     dataOut.writeShort(info.id);
356                     dataOut.writeInt(len);
357                     dataOut.flush();
358                 } catch (IOException e) {
359                     multiplexLog.log(Log.BRIEF, "exception: ", e);
360 
361                     shutDown();
362                     throw e;
363                 }
364         }
365     }
366 
367     /**
368      * Send packet of requested data on connection to remote endpoint.
369      * @param info connection information structure
370      * @param buf array containing bytes to send
371      * @param off offset of first array index of packet
372      * @param len number of bytes in packet to send
373      */
sendTransmit(MultiplexConnectionInfo info, byte buf[], int off, int len)374     void sendTransmit(MultiplexConnectionInfo info,
375                       byte buf[], int off, int len) throws IOException
376     {
377         synchronized (dataOut) {
378             if (alive && !info.closed)
379                 try {
380                     dataOut.writeByte(TRANSMIT);
381                     dataOut.writeShort(info.id);
382                     dataOut.writeInt(len);
383                     dataOut.write(buf, off, len);
384                     dataOut.flush();
385                 } catch (IOException e) {
386                     multiplexLog.log(Log.BRIEF, "exception: ", e);
387 
388                     shutDown();
389                     throw e;
390                 }
391         }
392     }
393 
394     /**
395      * Inform remote endpoint that connection has been closed.
396      * @param info connection information structure
397      */
sendClose(MultiplexConnectionInfo info)398     void sendClose(MultiplexConnectionInfo info) throws IOException
399     {
400         info.out.disconnect();
401         synchronized (dataOut) {
402             if (alive && !info.closed)
403                 try {
404                     dataOut.writeByte(CLOSE);
405                     dataOut.writeShort(info.id);
406                     dataOut.flush();
407                     info.closed = true;
408                 } catch (IOException e) {
409                     multiplexLog.log(Log.BRIEF, "exception: ", e);
410 
411                     shutDown();
412                     throw e;
413                 }
414         }
415     }
416 
417     /**
418      * Acknowledge remote endpoint's closing of connection.
419      * @param info connection information structure
420      */
sendCloseAck(MultiplexConnectionInfo info)421     void sendCloseAck(MultiplexConnectionInfo info) throws IOException
422     {
423         synchronized (dataOut) {
424             if (alive && !info.closed)
425                 try {
426                     dataOut.writeByte(CLOSEACK);
427                     dataOut.writeShort(info.id);
428                     dataOut.flush();
429                     info.closed = true;
430                 } catch (IOException e) {
431                     multiplexLog.log(Log.BRIEF, "exception: ", e);
432 
433                     shutDown();
434                     throw e;
435                 }
436         }
437     }
438 
439     /**
440      * Shut down connection upon finalization.
441      */
finalize()442     protected void finalize() throws Throwable
443     {
444         super.finalize();
445         shutDown();
446     }
447 }
448