1 /*
2  * Copyright (c) 2001, 2013, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package com.sun.corba.se.impl.transport;
27 
28 import java.io.IOException;
29 import java.net.InetSocketAddress;
30 import java.net.Socket;
31 import java.nio.ByteBuffer;
32 import java.nio.channels.SelectableChannel;
33 import java.nio.channels.SelectionKey;
34 import java.nio.channels.SocketChannel;
35 import java.security.AccessController;
36 import java.security.PrivilegedAction;
37 import java.util.Collections;
38 import java.util.Hashtable;
39 import java.util.HashMap;
40 import java.util.Map;
41 
42 import org.omg.CORBA.COMM_FAILURE;
43 import org.omg.CORBA.CompletionStatus;
44 import org.omg.CORBA.DATA_CONVERSION;
45 import org.omg.CORBA.INTERNAL;
46 import org.omg.CORBA.MARSHAL;
47 import org.omg.CORBA.OBJECT_NOT_EXIST;
48 import org.omg.CORBA.SystemException;
49 
50 import com.sun.org.omg.SendingContext.CodeBase;
51 
52 import com.sun.corba.se.pept.broker.Broker;
53 import com.sun.corba.se.pept.encoding.InputObject;
54 import com.sun.corba.se.pept.encoding.OutputObject;
55 import com.sun.corba.se.pept.protocol.MessageMediator;
56 import com.sun.corba.se.pept.transport.Acceptor;
57 import com.sun.corba.se.pept.transport.Connection;
58 import com.sun.corba.se.pept.transport.ConnectionCache;
59 import com.sun.corba.se.pept.transport.ContactInfo;
60 import com.sun.corba.se.pept.transport.EventHandler;
61 import com.sun.corba.se.pept.transport.InboundConnectionCache;
62 import com.sun.corba.se.pept.transport.OutboundConnectionCache;
63 import com.sun.corba.se.pept.transport.ResponseWaitingRoom;
64 import com.sun.corba.se.pept.transport.Selector;
65 
66 import com.sun.corba.se.spi.ior.IOR;
67 import com.sun.corba.se.spi.ior.iiop.GIOPVersion;
68 import com.sun.corba.se.spi.logging.CORBALogDomains;
69 import com.sun.corba.se.spi.orb.ORB ;
70 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException;
71 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
72 import com.sun.corba.se.spi.orbutil.threadpool.Work;
73 import com.sun.corba.se.spi.protocol.CorbaMessageMediator;
74 import com.sun.corba.se.spi.transport.CorbaContactInfo;
75 import com.sun.corba.se.spi.transport.CorbaConnection;
76 import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom;
77 import com.sun.corba.se.spi.transport.ReadTimeouts;
78 
79 import com.sun.corba.se.impl.encoding.CachedCodeBase;
80 import com.sun.corba.se.impl.encoding.CDRInputStream_1_0;
81 import com.sun.corba.se.impl.encoding.CDROutputObject;
82 import com.sun.corba.se.impl.encoding.CDROutputStream_1_0;
83 import com.sun.corba.se.impl.encoding.CodeSetComponentInfo;
84 import com.sun.corba.se.impl.encoding.OSFCodeSetRegistry;
85 import com.sun.corba.se.impl.logging.ORBUtilSystemException;
86 import com.sun.corba.se.impl.orbutil.ORBConstants;
87 import com.sun.corba.se.impl.orbutil.ORBUtility;
88 import com.sun.corba.se.impl.protocol.giopmsgheaders.Message;
89 import com.sun.corba.se.impl.protocol.giopmsgheaders.MessageBase;
90 import com.sun.corba.se.impl.transport.CorbaResponseWaitingRoomImpl;
91 
92 /**
93  * @author Harold Carr
94  */
95 public class SocketOrChannelConnectionImpl
96     extends
97         EventHandlerBase
98     implements
99         CorbaConnection,
100         Work
101 {
    // Public, mutable, class-wide flag; off by default.
    // NOTE(review): judging by its name this presumably enables debug
    // printing of write-lock activity -- the code that reads it is outside
    // the visible part of the file; confirm.
    public static boolean dprintWriteLocks = false;

    //
    // New transport.
    //

    // NOTE(review): presumably the time at which this connection was queued
    // as a unit of Work; nothing in the visible part of the file reads or
    // writes it -- confirm against the Work accessors.
    protected long enqueueTime;

    // NIO channel backing this connection.  The constructors obtain it from
    // socket.getChannel(), so it is null when the socket is not
    // channel-backed.
    protected SocketChannel socketChannel;
getSocketChannel()111     public SocketChannel getSocketChannel()
112     {
113         return socketChannel;
114     }
115 
    // REVISIT:
    // protected for test: genericRPCMSGFramework.IIOPConnection constructor.
    // Assigned by the client-side constructor.  When non-null, readBits()
    // and finishReadingBits() use it to create message mediators.
    protected CorbaContactInfo contactInfo;
    // Assigned by the server-side constructor.  readBits() and
    // finishReadingBits() fall back to it when contactInfo is null.
    protected Acceptor acceptor;
    // NOTE(review): not assigned by any constructor in view -- presumably
    // installed later through a setter; confirm.
    protected ConnectionCache connectionCache;

    //
    // From iiop.Connection.java
    //

    protected Socket socket;    // The socket used for this connection.
    protected long timeStamp = 0;
    // Set to true only by the server-side constructor.
    protected boolean isServer = false;

    // Start at some value other than zero since this is a magic
    // value in some protocols.
    protected int requestId = 5;
    // Created in the base (ORB-only) constructor.
    protected CorbaResponseWaitingRoom responseWaitingRoom;
    // Connection state: OPENING after the client constructor, ESTABLISHED
    // after the server constructor; close() moves it to CLOSE_SENT.
    protected int state;
    // Monitor for state changes: close() synchronizes on it and calls
    // notifyAll() after updating state.
    protected java.lang.Object stateEvent = new java.lang.Object();
    // NOTE(review): presumably the monitor guarding writeLocked; its users
    // are outside the visible part of the file -- confirm.
    protected java.lang.Object writeEvent = new java.lang.Object();
    protected boolean writeLocked;
    protected int serverRequestCount = 0;

    // Server request map: used on the server side of Connection
    // Maps request ID to IIOPInputStream.
    // Created (as a synchronized HashMap) only by the server-side
    // constructor; it stays null on client connections.
    Map serverRequestMap = null;

    // This is a flag associated per connection telling us if the
    // initial set of sending contexts were sent to the receiver
    // already...
    protected boolean postInitialContexts = false;

    // Remote reference to CodeBase server (supplies
    // FullValueDescription, among other things)
    protected IOR codeBaseServerIOR;

    // CodeBase cache for this connection.  This will cache remote operations,
    // handle connecting, and ensure we don't do any remote operations until
    // necessary.
    protected CachedCodeBase cachedCodeBase = new CachedCodeBase(this);

    // Log wrapper for the RPC transport domain; obtained in the base
    // constructor and used to build the exceptions thrown by this class.
    protected ORBUtilSystemException wrapper ;

    // transport read timeout values
    // Both readFully() variants take their initial wait time and backoff
    // factor from here.
    protected ReadTimeouts readTimeouts;

    protected boolean shouldReadGiopHeaderOnly;

    // A message mediator used when shouldReadGiopHeaderOnly is
    // true to maintain request message state across execution in a
    // SelectorThread and WorkerThread.
    protected CorbaMessageMediator partialMessageMediator = null;
169 
170     // Used in genericRPCMSGFramework test.
SocketOrChannelConnectionImpl(ORB orb)171     protected SocketOrChannelConnectionImpl(ORB orb)
172     {
173         this.orb = orb;
174         wrapper = ORBUtilSystemException.get( orb,
175             CORBALogDomains.RPC_TRANSPORT ) ;
176 
177         setWork(this);
178         responseWaitingRoom = new CorbaResponseWaitingRoomImpl(orb, this);
179         setReadTimeouts(orb.getORBData().getTransportTCPReadTimeouts());
180     }
181 
182     // Both client and servers.
SocketOrChannelConnectionImpl(ORB orb, boolean useSelectThreadToWait, boolean useWorkerThread)183     protected SocketOrChannelConnectionImpl(ORB orb,
184                                             boolean useSelectThreadToWait,
185                                             boolean useWorkerThread)
186     {
187         this(orb) ;
188         setUseSelectThreadToWait(useSelectThreadToWait);
189         setUseWorkerThreadForEvent(useWorkerThread);
190     }
191 
192     // Client constructor.
SocketOrChannelConnectionImpl(ORB orb, CorbaContactInfo contactInfo, boolean useSelectThreadToWait, boolean useWorkerThread, String socketType, String hostname, int port)193     public SocketOrChannelConnectionImpl(ORB orb,
194                                          CorbaContactInfo contactInfo,
195                                          boolean useSelectThreadToWait,
196                                          boolean useWorkerThread,
197                                          String socketType,
198                                          String hostname,
199                                          int port)
200     {
201         this(orb, useSelectThreadToWait, useWorkerThread);
202 
203         this.contactInfo = contactInfo;
204 
205         try {
206             socket = orb.getORBData().getSocketFactory()
207                 .createSocket(socketType,
208                               new InetSocketAddress(hostname, port));
209             socketChannel = socket.getChannel();
210 
211             if (socketChannel != null) {
212                 boolean isBlocking = !useSelectThreadToWait;
213                 socketChannel.configureBlocking(isBlocking);
214             } else {
215                 // IMPORTANT: non-channel-backed sockets must use
216                 // dedicated reader threads.
217                 setUseSelectThreadToWait(false);
218             }
219             if (orb.transportDebugFlag) {
220                 dprint(".initialize: connection created: " + socket);
221             }
222         } catch (Throwable t) {
223             throw wrapper.connectFailure(t, socketType, hostname,
224                                          Integer.toString(port));
225         }
226         state = OPENING;
227     }
228 
229     // Client-side convenience.
SocketOrChannelConnectionImpl(ORB orb, CorbaContactInfo contactInfo, String socketType, String hostname, int port)230     public SocketOrChannelConnectionImpl(ORB orb,
231                                          CorbaContactInfo contactInfo,
232                                          String socketType,
233                                          String hostname,
234                                          int port)
235     {
236         this(orb, contactInfo,
237              orb.getORBData().connectionSocketUseSelectThreadToWait(),
238              orb.getORBData().connectionSocketUseWorkerThreadForEvent(),
239              socketType, hostname, port);
240     }
241 
242     // Server-side constructor.
SocketOrChannelConnectionImpl(ORB orb, Acceptor acceptor, Socket socket, boolean useSelectThreadToWait, boolean useWorkerThread)243     public SocketOrChannelConnectionImpl(ORB orb,
244                                          Acceptor acceptor,
245                                          Socket socket,
246                                          boolean useSelectThreadToWait,
247                                          boolean useWorkerThread)
248     {
249         this(orb, useSelectThreadToWait, useWorkerThread);
250 
251         this.socket = socket;
252         socketChannel = socket.getChannel();
253         if (socketChannel != null) {
254             // REVISIT
255             try {
256                 boolean isBlocking = !useSelectThreadToWait;
257                 socketChannel.configureBlocking(isBlocking);
258             } catch (IOException e) {
259                 RuntimeException rte = new RuntimeException();
260                 rte.initCause(e);
261                 throw rte;
262             }
263         }
264         this.acceptor = acceptor;
265 
266         serverRequestMap = Collections.synchronizedMap(new HashMap());
267         isServer = true;
268 
269         state = ESTABLISHED;
270     }
271 
272     // Server-side convenience
SocketOrChannelConnectionImpl(ORB orb, Acceptor acceptor, Socket socket)273     public SocketOrChannelConnectionImpl(ORB orb,
274                                          Acceptor acceptor,
275                                          Socket socket)
276     {
277         this(orb, acceptor, socket,
278              (socket.getChannel() == null
279               ? false
280               : orb.getORBData().connectionSocketUseSelectThreadToWait()),
281              (socket.getChannel() == null
282               ? false
283               : orb.getORBData().connectionSocketUseWorkerThreadForEvent()));
284     }
285 
286     ////////////////////////////////////////////////////
287     //
288     // framework.transport.Connection
289     //
290 
shouldRegisterReadEvent()291     public boolean shouldRegisterReadEvent()
292     {
293         return true;
294     }
295 
shouldRegisterServerReadEvent()296     public boolean shouldRegisterServerReadEvent()
297     {
298         return true;
299     }
300 
read()301     public boolean read()
302     {
303         try {
304             if (orb.transportDebugFlag) {
305                 dprint(".read->: " + this);
306             }
307             CorbaMessageMediator messageMediator = readBits();
308             if (messageMediator != null) {
309                 // Null can happen when client closes stream
310                 // causing purgecalls.
311                 return dispatch(messageMediator);
312             }
313             return true;
314         } finally {
315             if (orb.transportDebugFlag) {
316                 dprint(".read<-: " + this);
317             }
318         }
319     }
320 
readBits()321     protected CorbaMessageMediator readBits()
322     {
323         try {
324 
325             if (orb.transportDebugFlag) {
326                 dprint(".readBits->: " + this);
327             }
328 
329             MessageMediator messageMediator;
330             // REVISIT - use common factory base class.
331             if (contactInfo != null) {
332                 messageMediator =
333                     contactInfo.createMessageMediator(orb, this);
334             } else if (acceptor != null) {
335                 messageMediator = acceptor.createMessageMediator(orb, this);
336             } else {
337                 throw
338                     new RuntimeException("SocketOrChannelConnectionImpl.readBits");
339             }
340             return (CorbaMessageMediator) messageMediator;
341 
342         } catch (ThreadDeath td) {
343             if (orb.transportDebugFlag) {
344                 dprint(".readBits: " + this + ": ThreadDeath: " + td, td);
345             }
346             try {
347                 purgeCalls(wrapper.connectionAbort(td), false, false);
348             } catch (Throwable t) {
349                 if (orb.transportDebugFlag) {
350                     dprint(".readBits: " + this + ": purgeCalls: Throwable: " + t, t);
351                 }
352             }
353             throw td;
354         } catch (Throwable ex) {
355             if (orb.transportDebugFlag) {
356                 dprint(".readBits: " + this + ": Throwable: " + ex, ex);
357             }
358 
359             try {
360                 if (ex instanceof INTERNAL) {
361                     sendMessageError(GIOPVersion.DEFAULT_VERSION);
362                 }
363             } catch (IOException e) {
364                 if (orb.transportDebugFlag) {
365                     dprint(".readBits: " + this +
366                            ": sendMessageError: IOException: " + e, e);
367                 }
368             }
369             // REVISIT - make sure reader thread is killed.
370             Selector selector = orb.getTransportManager().getSelector(0);
371             if (selector != null) {
372                 selector.unregisterForEvent(this);
373             }
374             // Notify anyone waiting.
375             purgeCalls(wrapper.connectionAbort(ex), true, false);
376             // REVISIT
377             //keepRunning = false;
378             // REVISIT - if this is called after purgeCalls then
379             // the state of the socket is ABORT so the writeLock
380             // in close throws an exception.  It is ignored but
381             // causes IBM (screen scraping) tests to fail.
382             //close();
383         } finally {
384             if (orb.transportDebugFlag) {
385                 dprint(".readBits<-: " + this);
386             }
387         }
388         return null;
389     }
390 
finishReadingBits(MessageMediator messageMediator)391     protected CorbaMessageMediator finishReadingBits(MessageMediator messageMediator)
392     {
393         try {
394 
395             if (orb.transportDebugFlag) {
396                 dprint(".finishReadingBits->: " + this);
397             }
398 
399             // REVISIT - use common factory base class.
400             if (contactInfo != null) {
401                 messageMediator =
402                     contactInfo.finishCreatingMessageMediator(orb, this, messageMediator);
403             } else if (acceptor != null) {
404                 messageMediator =
405                     acceptor.finishCreatingMessageMediator(orb, this, messageMediator);
406             } else {
407                 throw
408                     new RuntimeException("SocketOrChannelConnectionImpl.finishReadingBits");
409             }
410             return (CorbaMessageMediator) messageMediator;
411 
412         } catch (ThreadDeath td) {
413             if (orb.transportDebugFlag) {
414                 dprint(".finishReadingBits: " + this + ": ThreadDeath: " + td, td);
415             }
416             try {
417                 purgeCalls(wrapper.connectionAbort(td), false, false);
418             } catch (Throwable t) {
419                 if (orb.transportDebugFlag) {
420                     dprint(".finishReadingBits: " + this + ": purgeCalls: Throwable: " + t, t);
421                 }
422             }
423             throw td;
424         } catch (Throwable ex) {
425             if (orb.transportDebugFlag) {
426                 dprint(".finishReadingBits: " + this + ": Throwable: " + ex, ex);
427             }
428 
429             try {
430                 if (ex instanceof INTERNAL) {
431                     sendMessageError(GIOPVersion.DEFAULT_VERSION);
432                 }
433             } catch (IOException e) {
434                 if (orb.transportDebugFlag) {
435                     dprint(".finishReadingBits: " + this +
436                            ": sendMessageError: IOException: " + e, e);
437                 }
438             }
439             // REVISIT - make sure reader thread is killed.
440             orb.getTransportManager().getSelector(0).unregisterForEvent(this);
441             // Notify anyone waiting.
442             purgeCalls(wrapper.connectionAbort(ex), true, false);
443             // REVISIT
444             //keepRunning = false;
445             // REVISIT - if this is called after purgeCalls then
446             // the state of the socket is ABORT so the writeLock
447             // in close throws an exception.  It is ignored but
448             // causes IBM (screen scraping) tests to fail.
449             //close();
450         } finally {
451             if (orb.transportDebugFlag) {
452                 dprint(".finishReadingBits<-: " + this);
453             }
454         }
455         return null;
456     }
457 
dispatch(CorbaMessageMediator messageMediator)458     protected boolean dispatch(CorbaMessageMediator messageMediator)
459     {
460         try {
461             if (orb.transportDebugFlag) {
462                 dprint(".dispatch->: " + this);
463             }
464 
465             //
466             // NOTE:
467             //
468             // This call is the transition from the tranport block
469             // to the protocol block.
470             //
471 
472             boolean result =
473                 messageMediator.getProtocolHandler()
474                 .handleRequest(messageMediator);
475 
476             return result;
477 
478         } catch (ThreadDeath td) {
479             if (orb.transportDebugFlag) {
480                 dprint(".dispatch: ThreadDeath", td );
481             }
482             try {
483                 purgeCalls(wrapper.connectionAbort(td), false, false);
484             } catch (Throwable t) {
485                 if (orb.transportDebugFlag) {
486                     dprint(".dispatch: purgeCalls: Throwable", t);
487                 }
488             }
489             throw td;
490         } catch (Throwable ex) {
491             if (orb.transportDebugFlag) {
492                 dprint(".dispatch: Throwable", ex ) ;
493             }
494 
495             try {
496                 if (ex instanceof INTERNAL) {
497                     sendMessageError(GIOPVersion.DEFAULT_VERSION);
498                 }
499             } catch (IOException e) {
500                 if (orb.transportDebugFlag) {
501                     dprint(".dispatch: sendMessageError: IOException", e);
502                 }
503             }
504             purgeCalls(wrapper.connectionAbort(ex), false, false);
505             // REVISIT
506             //keepRunning = false;
507         } finally {
508             if (orb.transportDebugFlag) {
509                 dprint(".dispatch<-: " + this);
510             }
511         }
512 
513         return true;
514     }
515 
shouldUseDirectByteBuffers()516     public boolean shouldUseDirectByteBuffers()
517     {
518         return getSocketChannel() != null;
519     }
520 
read(int size, int offset, int length, long max_wait_time)521     public ByteBuffer read(int size, int offset, int length, long max_wait_time)
522         throws IOException
523     {
524         if (shouldUseDirectByteBuffers()) {
525 
526             ByteBuffer byteBuffer =
527                 orb.getByteBufferPool().getByteBuffer(size);
528 
529             if (orb.transportDebugFlag) {
530                 // print address of ByteBuffer gotten from pool
531                 int bbAddress = System.identityHashCode(byteBuffer);
532                 StringBuffer sb = new StringBuffer(80);
533                 sb.append(".read: got ByteBuffer id (");
534                 sb.append(bbAddress).append(") from ByteBufferPool.");
535                 String msgStr = sb.toString();
536                 dprint(msgStr);
537             }
538 
539             byteBuffer.position(offset);
540             byteBuffer.limit(size);
541 
542             readFully(byteBuffer, length, max_wait_time);
543 
544             return byteBuffer;
545         }
546 
547         byte[] buf = new byte[size];
548         readFully(getSocket().getInputStream(), buf,
549                   offset, length, max_wait_time);
550         ByteBuffer byteBuffer = ByteBuffer.wrap(buf);
551         byteBuffer.limit(size);
552         return byteBuffer;
553     }
554 
read(ByteBuffer byteBuffer, int offset, int length, long max_wait_time)555     public ByteBuffer read(ByteBuffer byteBuffer, int offset,
556                            int length, long max_wait_time)
557         throws IOException
558     {
559         int size = offset + length;
560         if (shouldUseDirectByteBuffers()) {
561 
562             if (! byteBuffer.isDirect()) {
563                 throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket();
564             }
565             if (size > byteBuffer.capacity()) {
566                 if (orb.transportDebugFlag) {
567                     // print address of ByteBuffer being released
568                     int bbAddress = System.identityHashCode(byteBuffer);
569                     StringBuffer bbsb = new StringBuffer(80);
570                     bbsb.append(".read: releasing ByteBuffer id (")
571                         .append(bbAddress).append(") to ByteBufferPool.");
572                     String bbmsg = bbsb.toString();
573                     dprint(bbmsg);
574                 }
575                 orb.getByteBufferPool().releaseByteBuffer(byteBuffer);
576                 byteBuffer = orb.getByteBufferPool().getByteBuffer(size);
577             }
578             byteBuffer.position(offset);
579             byteBuffer.limit(size);
580             readFully(byteBuffer, length, max_wait_time);
581             byteBuffer.position(0);
582             byteBuffer.limit(size);
583             return byteBuffer;
584         }
585         if (byteBuffer.isDirect()) {
586             throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket();
587         }
588         byte[] buf = new byte[size];
589         readFully(getSocket().getInputStream(), buf,
590                   offset, length, max_wait_time);
591         return ByteBuffer.wrap(buf);
592     }
593 
readFully(ByteBuffer byteBuffer, int size, long max_wait_time)594     public void readFully(ByteBuffer byteBuffer, int size, long max_wait_time)
595         throws IOException
596     {
597         int n = 0;
598         int bytecount = 0;
599         long time_to_wait = readTimeouts.get_initial_time_to_wait();
600         long total_time_in_wait = 0;
601 
602         // The reading of data incorporates a strategy to detect a
603         // rogue client. The strategy is implemented as follows. As
604         // long as data is being read, at least 1 byte or more, we
605         // assume we have a well behaved client. If no data is read,
606         // then we sleep for a time to wait, re-calculate a new time to
607         // wait which is lengthier than the previous time spent waiting.
608         // Then, if the total time spent waiting does not exceed a
609         // maximum time we are willing to wait, we attempt another
610         // read. If the maximum amount of time we are willing to
611         // spend waiting for more data is exceeded, we throw an
612         // IOException.
613 
614         // NOTE: Reading of GIOP headers are treated with a smaller
615         //       maximum time to wait threshold. Based on extensive
616         //       performance testing, all GIOP headers are being
617         //       read in 1 read access.
618 
619         do {
620             bytecount = getSocketChannel().read(byteBuffer);
621 
622             if (bytecount < 0) {
623                 throw new IOException("End-of-stream");
624             }
625             else if (bytecount == 0) {
626                 try {
627                     Thread.sleep(time_to_wait);
628                     total_time_in_wait += time_to_wait;
629                     time_to_wait =
630                         (long)(time_to_wait*readTimeouts.get_backoff_factor());
631                 }
632                 catch (InterruptedException ie) {
633                     // ignore exception
634                     if (orb.transportDebugFlag) {
635                         dprint("readFully(): unexpected exception "
636                                 + ie.toString());
637                     }
638                 }
639             }
640             else {
641                 n += bytecount;
642             }
643         }
644         while (n < size && total_time_in_wait < max_wait_time);
645 
646         if (n < size && total_time_in_wait >= max_wait_time)
647         {
648             // failed to read entire message
649             throw wrapper.transportReadTimeoutExceeded(new Integer(size),
650                                       new Integer(n), new Long(max_wait_time),
651                                       new Long(total_time_in_wait));
652         }
653 
654         getConnectionCache().stampTime(this);
655     }
656 
657     // To support non-channel connections.
readFully(java.io.InputStream is, byte[] buf, int offset, int size, long max_wait_time)658     public void readFully(java.io.InputStream is, byte[] buf,
659                           int offset, int size, long max_wait_time)
660         throws IOException
661     {
662         int n = 0;
663         int bytecount = 0;
664         long time_to_wait = readTimeouts.get_initial_time_to_wait();
665         long total_time_in_wait = 0;
666 
667         // The reading of data incorporates a strategy to detect a
668         // rogue client. The strategy is implemented as follows. As
669         // long as data is being read, at least 1 byte or more, we
670         // assume we have a well behaved client. If no data is read,
671         // then we sleep for a time to wait, re-calculate a new time to
672         // wait which is lengthier than the previous time spent waiting.
673         // Then, if the total time spent waiting does not exceed a
674         // maximum time we are willing to wait, we attempt another
675         // read. If the maximum amount of time we are willing to
676         // spend waiting for more data is exceeded, we throw an
677         // IOException.
678 
679         // NOTE: Reading of GIOP headers are treated with a smaller
680         //       maximum time to wait threshold. Based on extensive
681         //       performance testing, all GIOP headers are being
682         //       read in 1 read access.
683 
684         do {
685             bytecount = is.read(buf, offset + n, size - n);
686             if (bytecount < 0) {
687                 throw new IOException("End-of-stream");
688             }
689             else if (bytecount == 0) {
690                 try {
691                     Thread.sleep(time_to_wait);
692                     total_time_in_wait += time_to_wait;
693                     time_to_wait =
694                         (long)(time_to_wait*readTimeouts.get_backoff_factor());
695                 }
696                 catch (InterruptedException ie) {
697                     // ignore exception
698                     if (orb.transportDebugFlag) {
699                         dprint("readFully(): unexpected exception "
700                                 + ie.toString());
701                     }
702                 }
703             }
704             else {
705                 n += bytecount;
706             }
707         }
708         while (n < size && total_time_in_wait < max_wait_time);
709 
710         if (n < size && total_time_in_wait >= max_wait_time)
711         {
712             // failed to read entire message
713             throw wrapper.transportReadTimeoutExceeded(new Integer(size),
714                                       new Integer(n), new Long(max_wait_time),
715                                       new Long(total_time_in_wait));
716         }
717 
718         getConnectionCache().stampTime(this);
719     }
720 
write(ByteBuffer byteBuffer)721     public void write(ByteBuffer byteBuffer)
722         throws IOException
723     {
724         if (shouldUseDirectByteBuffers()) {
725             /* NOTE: cannot perform this test.  If one ask for a
726                ByteBuffer from the pool which is bigger than the size
727                of ByteBuffers managed by the pool, then the pool will
728                return a HeapByteBuffer.
729             if (byteBuffer.hasArray()) {
730                 throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket();
731             }
732             */
733             // IMPORTANT: For non-blocking SocketChannels, there's no guarantee
734             //            all bytes are written on first write attempt.
735             do {
736                 getSocketChannel().write(byteBuffer);
737             }
738             while (byteBuffer.hasRemaining());
739 
740         } else {
741             if (! byteBuffer.hasArray()) {
742                 throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket();
743             }
744             byte[] tmpBuf = byteBuffer.array();
745             getSocket().getOutputStream().write(tmpBuf, 0, byteBuffer.limit());
746             getSocket().getOutputStream().flush();
747         }
748 
749         // TimeStamp connection to indicate it has been used
750         // Note granularity of connection usage is assumed for
751         // now to be that of a IIOP packet.
752         getConnectionCache().stampTime(this);
753     }
754 
755     /**
756      * Note:it is possible for this to be called more than once
757      */
close()758     public synchronized void close()
759     {
760         try {
761             if (orb.transportDebugFlag) {
762                 dprint(".close->: " + this);
763             }
764             writeLock();
765 
766             // REVISIT It will be good to have a read lock on the reader thread
767             // before we proceed further, to avoid the reader thread (server side)
768             // from processing requests. This avoids the risk that a new request
769             // will be accepted by ReaderThread while the ListenerThread is
770             // attempting to close this connection.
771 
772             if (isBusy()) { // we are busy!
773                 writeUnlock();
774                 if (orb.transportDebugFlag) {
775                     dprint(".close: isBusy so no close: " + this);
776                 }
777                 return;
778             }
779 
780             try {
781                 try {
782                     sendCloseConnection(GIOPVersion.V1_0);
783                 } catch (Throwable t) {
784                     wrapper.exceptionWhenSendingCloseConnection(t);
785                 }
786 
787                 synchronized ( stateEvent ){
788                     state = CLOSE_SENT;
789                     stateEvent.notifyAll();
790                 }
791 
792                 // stop the reader without causing it to do purgeCalls
793                 //Exception ex = new Exception();
794                 //reader.stop(ex); // REVISIT
795 
796                 // NOTE: !!!!!!
797                 // This does writeUnlock().
798                 purgeCalls(wrapper.connectionRebind(), false, true);
799 
800             } catch (Exception ex) {
801                 if (orb.transportDebugFlag) {
802                     dprint(".close: exception: " + this, ex);
803                 }
804             }
805             try {
806                 Selector selector = orb.getTransportManager().getSelector(0);
807                 if (selector != null) {
808                     selector.unregisterForEvent(this);
809                 }
810                 if (socketChannel != null) {
811                     socketChannel.close();
812                 }
813                 socket.close();
814             } catch (IOException e) {
815                 if (orb.transportDebugFlag) {
816                     dprint(".close: " + this, e);
817                 }
818             }
819             closeConnectionResources();
820         } finally {
821             if (orb.transportDebugFlag) {
822                 dprint(".close<-: " + this);
823             }
824         }
825     }
826 
closeConnectionResources()827     public void closeConnectionResources() {
828            if (orb.transportDebugFlag) {
829                dprint(".closeConnectionResources->: " + this);
830            }
831            Selector selector = orb.getTransportManager().getSelector(0);
832            if (selector != null) {
833                selector.unregisterForEvent(this);
834            }
835            try {
836              if (socketChannel != null)
837               socketChannel.close() ;
838                 if (socket != null && !socket.isClosed())
839                 socket.close() ;
840            } catch (IOException e) {
841              if (orb.transportDebugFlag) {
842                  dprint( ".closeConnectionResources: " + this, e ) ;
843              }
844            }
845            if (orb.transportDebugFlag) {
846                dprint(".closeConnectionResources<-: " + this);
847            }
848      }
849 
850 
getAcceptor()851     public Acceptor getAcceptor()
852     {
853         return acceptor;
854     }
855 
getContactInfo()856     public ContactInfo getContactInfo()
857     {
858         return contactInfo;
859     }
860 
getEventHandler()861     public EventHandler getEventHandler()
862     {
863         return this;
864     }
865 
createOutputObject(MessageMediator messageMediator)866     public OutputObject createOutputObject(MessageMediator messageMediator)
867     {
868         // REVISIT - remove this method from Connection and all it subclasses.
869         throw new RuntimeException("*****SocketOrChannelConnectionImpl.createOutputObject - should not be called.");
870     }
871 
872     // This is used by the GIOPOutputObject in order to
873     // throw the correct error when handling code sets.
874     // Can we determine if we are on the server side by
875     // other means?  XREVISIT
isServer()876     public boolean isServer()
877     {
878         return isServer;
879     }
880 
isBusy()881     public boolean isBusy()
882     {
883         if (serverRequestCount > 0 ||
884             getResponseWaitingRoom().numberRegistered() > 0)
885         {
886             return true;
887         } else {
888             return false;
889         }
890     }
891 
getTimeStamp()892     public long getTimeStamp()
893     {
894         return timeStamp;
895     }
896 
setTimeStamp(long time)897     public void setTimeStamp(long time)
898     {
899         timeStamp = time;
900     }
901 
setState(String stateString)902     public void setState(String stateString)
903     {
904         synchronized (stateEvent) {
905             if (stateString.equals("ESTABLISHED")) {
906                 state =  ESTABLISHED;
907                 stateEvent.notifyAll();
908             } else {
909                 // REVISIT: ASSERT
910             }
911         }
912     }
913 
    /**
     * Sets the writeLock for this connection.
     * If the writeLock is already set by someone else, block till the
     * writeLock is released and can be set by us.
     * IMPORTANT: this connection's lock must be acquired before
     * setting the writeLock and must be unlocked after setting the writeLock.
     *
     * The method loops on the connection state:
     * while OPENING it waits on stateEvent; while ESTABLISHED it either
     * takes the lock (writeLocked becomes true) or waits on writeEvent in
     * 100 ms slices; in ABORT it throws wrapper.writeErrorSend(); in
     * CLOSE_RECVD it throws wrapper.connectionCloseRebind(); any other
     * state value ends in a RuntimeException.
     * An InterruptedException while waiting is only reported through the
     * debug output and the loop continues.
     */
    public void writeLock()
    {
      try {
        if (dprintWriteLocks && orb.transportDebugFlag) {
            dprint(".writeLock->: " + this);
        }
        // Keep looping till we can set the writeLock.
        while ( true ) {
            // Snapshot of state; each case re-checks it under the monitor
            // before acting, and breaks out to re-evaluate if it changed.
            int localState = state;
            switch ( localState ) {

            case OPENING:
                synchronized (stateEvent) {
                    if (state != OPENING) {
                        // somebody has changed 'state' so be careful
                        break;
                    }
                    try {
                        // Woken by notifyAll() on stateEvent when the
                        // state is changed.
                        stateEvent.wait();
                    } catch (InterruptedException ie) {
                        if (orb.transportDebugFlag) {
                            dprint(".writeLock: OPENING InterruptedException: " + this);
                        }
                    }
                }
                // Loop back
                break;

            case ESTABLISHED:
                synchronized (writeEvent) {
                    if (!writeLocked) {
                        // Lock is free: take it and leave.
                        writeLocked = true;
                        return;
                    }

                    try {
                        // do not stay here too long if state != ESTABLISHED
                        // Bug 4752117
                        while (state == ESTABLISHED && writeLocked) {
                            writeEvent.wait(100);
                        }
                    } catch (InterruptedException ie) {
                        if (orb.transportDebugFlag) {
                            dprint(".writeLock: ESTABLISHED InterruptedException: " + this);
                        }
                    }
                }
                // Loop back
                // (the lock is not taken here; the next iteration re-reads
                // the state and tries again)
                break;

                //
                // XXX
                // Need to distinguish between client and server roles
                // here probably.
                //
            case ABORT:
                synchronized ( stateEvent ){
                    if (state != ABORT) {
                        break;
                    }
                    throw wrapper.writeErrorSend() ;
                }

            case CLOSE_RECVD:
                // the connection has been closed or closing
                // ==> throw rebind exception
                synchronized ( stateEvent ){
                    if (state != CLOSE_RECVD) {
                        break;
                    }
                    throw wrapper.connectionCloseRebind() ;
                }

            default:
                // Any state without its own case above ends up here.
                if (orb.transportDebugFlag) {
                    dprint(".writeLock: default: " + this);
                }
                // REVISIT
                throw new RuntimeException(".writeLock: bad state");
            }
        }
      } finally {
        if (dprintWriteLocks && orb.transportDebugFlag) {
            dprint(".writeLock<-: " + this);
        }
      }
    }
1008 
writeUnlock()1009     public void writeUnlock()
1010     {
1011         try {
1012             if (dprintWriteLocks && orb.transportDebugFlag) {
1013                 dprint(".writeUnlock->: " + this);
1014             }
1015             synchronized (writeEvent) {
1016                 writeLocked = false;
1017                 writeEvent.notify(); // wake up one guy waiting to write
1018             }
1019         } finally {
1020             if (dprintWriteLocks && orb.transportDebugFlag) {
1021                 dprint(".writeUnlock<-: " + this);
1022             }
1023         }
1024     }
1025 
1026     // Assumes the caller handles writeLock and writeUnlock
sendWithoutLock(OutputObject outputObject)1027     public void sendWithoutLock(OutputObject outputObject)
1028     {
1029         // Don't we need to check for CloseConnection
1030         // here?  REVISIT
1031 
1032         // XREVISIT - Shouldn't the MessageMediator
1033         // be the one to handle writing the data here?
1034 
1035         try {
1036 
1037             // Write the fragment/message
1038 
1039             CDROutputObject cdrOutputObject = (CDROutputObject) outputObject;
1040             cdrOutputObject.writeTo(this);
1041             // REVISIT - no flush?
1042             //socket.getOutputStream().flush();
1043 
1044         } catch (IOException e1) {
1045 
1046             /*
1047              * ADDED(Ram J) 10/13/2000 In the event of an IOException, try
1048              * sending a CancelRequest for regular requests / locate requests
1049              */
1050 
1051             // Since IIOPOutputStream's msgheader is set only once, and not
1052             // altered during sending multiple fragments, the original
1053             // msgheader will always have the requestId.
1054             // REVISIT This could be optimized to send a CancelRequest only
1055             // if any fragments had been sent already.
1056 
1057             /* REVISIT: MOVE TO SUBCONTRACT
1058             Message msg = os.getMessage();
1059             if (msg.getType() == Message.GIOPRequest ||
1060                     msg.getType() == Message.GIOPLocateRequest) {
1061                 GIOPVersion requestVersion = msg.getGIOPVersion();
1062                 int requestId = MessageBase.getRequestId(msg);
1063                 try {
1064                     sendCancelRequest(requestVersion, requestId);
1065                 } catch (IOException e2) {
1066                     // most likely an abortive connection closure.
1067                     // ignore, since nothing more can be done.
1068                     if (orb.transportDebugFlag) {
1069 
1070                 }
1071             }
1072             */
1073 
1074             // REVISIT When a send failure happens, purgeCalls() need to be
1075             // called to ensure that the connection is properly removed from
1076             // further usage (ie., cancelling pending requests with COMM_FAILURE
1077             // with an appropriate minor_code CompletionStatus.MAY_BE).
1078 
1079             // Relying on the IIOPOutputStream (as noted below) is not
1080             // sufficient as it handles COMM_FAILURE only for the final
1081             // fragment (during invoke processing). Note that COMM_FAILURE could
1082             // happen while sending the initial fragments.
1083             // Also the IIOPOutputStream does not properly close the connection.
1084             // It simply removes the connection from the table. An orderly
1085             // closure is needed (ie., cancel pending requests on the connection
1086             // COMM_FAILURE as well.
1087 
1088             // IIOPOutputStream will cleanup the connection info when it
1089             // sees this exception.
1090             SystemException exc = wrapper.writeErrorSend(e1);
1091             purgeCalls(exc, false, true);
1092             throw exc;
1093         }
1094     }
1095 
registerWaiter(MessageMediator messageMediator)1096     public void registerWaiter(MessageMediator messageMediator)
1097     {
1098         responseWaitingRoom.registerWaiter(messageMediator);
1099     }
1100 
unregisterWaiter(MessageMediator messageMediator)1101     public void unregisterWaiter(MessageMediator messageMediator)
1102     {
1103         responseWaitingRoom.unregisterWaiter(messageMediator);
1104     }
1105 
waitForResponse(MessageMediator messageMediator)1106     public InputObject waitForResponse(MessageMediator messageMediator)
1107     {
1108         return responseWaitingRoom.waitForResponse(messageMediator);
1109     }
1110 
setConnectionCache(ConnectionCache connectionCache)1111     public void setConnectionCache(ConnectionCache connectionCache)
1112     {
1113         this.connectionCache = connectionCache;
1114     }
1115 
getConnectionCache()1116     public ConnectionCache getConnectionCache()
1117     {
1118         return connectionCache;
1119     }
1120 
1121     ////////////////////////////////////////////////////
1122     //
1123     // EventHandler methods
1124     //
1125 
setUseSelectThreadToWait(boolean x)1126     public void setUseSelectThreadToWait(boolean x)
1127     {
1128         useSelectThreadToWait = x;
1129         // REVISIT - Reading of a GIOP header only is information
1130         //           that should be passed into the constructor
1131         //           from the SocketOrChannelConnection factory.
1132         setReadGiopHeaderOnly(shouldUseSelectThreadToWait());
1133     }
1134 
handleEvent()1135     public void handleEvent()
1136     {
1137         if (orb.transportDebugFlag) {
1138             dprint(".handleEvent->: " + this);
1139         }
1140         getSelectionKey().interestOps(getSelectionKey().interestOps() &
1141                                       (~ getInterestOps()));
1142 
1143         if (shouldUseWorkerThreadForEvent()) {
1144             Throwable throwable = null;
1145             try {
1146                 int poolToUse = 0;
1147                 if (shouldReadGiopHeaderOnly()) {
1148                     partialMessageMediator = readBits();
1149                     poolToUse =
1150                         partialMessageMediator.getThreadPoolToUse();
1151                 }
1152 
1153                 if (orb.transportDebugFlag) {
1154                     dprint(".handleEvent: addWork to pool: " + poolToUse);
1155                 }
1156                 orb.getThreadPoolManager().getThreadPool(poolToUse)
1157                     .getWorkQueue(0).addWork(getWork());
1158             } catch (NoSuchThreadPoolException e) {
1159                 throwable = e;
1160             } catch (NoSuchWorkQueueException e) {
1161                 throwable = e;
1162             }
1163             // REVISIT: need to close connection.
1164             if (throwable != null) {
1165                 if (orb.transportDebugFlag) {
1166                     dprint(".handleEvent: " + throwable);
1167                 }
1168                 INTERNAL i = new INTERNAL("NoSuchThreadPoolException");
1169                 i.initCause(throwable);
1170                 throw i;
1171             }
1172         } else {
1173             if (orb.transportDebugFlag) {
1174                 dprint(".handleEvent: doWork");
1175             }
1176             getWork().doWork();
1177         }
1178         if (orb.transportDebugFlag) {
1179             dprint(".handleEvent<-: " + this);
1180         }
1181     }
1182 
getChannel()1183     public SelectableChannel getChannel()
1184     {
1185         return socketChannel;
1186     }
1187 
getInterestOps()1188     public int getInterestOps()
1189     {
1190         return SelectionKey.OP_READ;
1191     }
1192 
1193     //    public Acceptor getAcceptor() - already defined above.
1194 
getConnection()1195     public Connection getConnection()
1196     {
1197         return this;
1198     }
1199 
1200     ////////////////////////////////////////////////////
1201     //
1202     // Work methods.
1203     //
1204 
getName()1205     public String getName()
1206     {
1207         return this.toString();
1208     }
1209 
doWork()1210     public void doWork()
1211     {
1212         try {
1213             if (orb.transportDebugFlag) {
1214                 dprint(".doWork->: " + this);
1215             }
1216 
1217             // IMPORTANT: Sanity checks on SelectionKeys such as
1218             //            SelectorKey.isValid() should not be done
1219             //            here.
1220             //
1221 
1222             if (!shouldReadGiopHeaderOnly()) {
1223                 read();
1224             }
1225             else {
1226                 // get the partialMessageMediator
1227                 // created by SelectorThread
1228                 CorbaMessageMediator messageMediator =
1229                                          this.getPartialMessageMediator();
1230 
1231                 // read remaining info needed in a MessageMediator
1232                 messageMediator = finishReadingBits(messageMediator);
1233 
1234                 if (messageMediator != null) {
1235                     // Null can happen when client closes stream
1236                     // causing purgecalls.
1237                     dispatch(messageMediator);
1238                 }
1239             }
1240         } catch (Throwable t) {
1241             if (orb.transportDebugFlag) {
1242                 dprint(".doWork: ignoring Throwable: "
1243                        + t
1244                        + " " + this);
1245             }
1246         } finally {
1247             if (orb.transportDebugFlag) {
1248                 dprint(".doWork<-: " + this);
1249             }
1250         }
1251     }
1252 
setEnqueueTime(long timeInMillis)1253     public void setEnqueueTime(long timeInMillis)
1254     {
1255         enqueueTime = timeInMillis;
1256     }
1257 
getEnqueueTime()1258     public long getEnqueueTime()
1259     {
1260         return enqueueTime;
1261     }
1262 
1263     ////////////////////////////////////////////////////
1264     //
1265     // spi.transport.CorbaConnection.
1266     //
1267 
1268     // IMPORTANT: Reader Threads must NOT read Giop header only.
shouldReadGiopHeaderOnly()1269     public boolean shouldReadGiopHeaderOnly() {
1270         return shouldReadGiopHeaderOnly;
1271     }
1272 
setReadGiopHeaderOnly(boolean shouldReadHeaderOnly)1273     protected void setReadGiopHeaderOnly(boolean shouldReadHeaderOnly) {
1274         shouldReadGiopHeaderOnly = shouldReadHeaderOnly;
1275     }
1276 
getResponseWaitingRoom()1277     public ResponseWaitingRoom getResponseWaitingRoom()
1278     {
1279         return responseWaitingRoom;
1280     }
1281 
    // REVISIT - interface defines isServer, but it is already defined in
    // a higher interface.
1284 
serverRequestMapPut(int requestId, CorbaMessageMediator messageMediator)1285     public void serverRequestMapPut(int requestId,
1286                                     CorbaMessageMediator messageMediator)
1287     {
1288         serverRequestMap.put(new Integer(requestId), messageMediator);
1289     }
1290 
serverRequestMapGet(int requestId)1291     public CorbaMessageMediator serverRequestMapGet(int requestId)
1292     {
1293         return (CorbaMessageMediator)
1294             serverRequestMap.get(new Integer(requestId));
1295     }
1296 
serverRequestMapRemove(int requestId)1297     public void serverRequestMapRemove(int requestId)
1298     {
1299         serverRequestMap.remove(new Integer(requestId));
1300     }
1301 
1302 
1303     // REVISIT: this is also defined in:
1304     // com.sun.corba.se.spi.legacy.connection.Connection
getSocket()1305     public java.net.Socket getSocket()
1306     {
1307         return socket;
1308     }
1309 
1310     /** It is possible for a Close Connection to have been
1311      ** sent here, but we will not check for this. A "lazy"
1312      ** Exception will be thrown in the Worker thread after the
1313      ** incoming request has been processed even though the connection
1314      ** is closed before the request is processed. This is o.k because
1315      ** it is a boundary condition. To prevent it we would have to add
1316      ** more locks which would reduce performance in the normal case.
1317      **/
serverRequestProcessingBegins()1318     public synchronized void serverRequestProcessingBegins()
1319     {
1320         serverRequestCount++;
1321     }
1322 
serverRequestProcessingEnds()1323     public synchronized void serverRequestProcessingEnds()
1324     {
1325         serverRequestCount--;
1326     }
1327 
1328     //
1329     //
1330     //
1331 
getNextRequestId()1332     public synchronized int getNextRequestId()
1333     {
1334         return requestId++;
1335     }
1336 
    // Negotiated code sets for char and wchar data.
    // setCodeSetContext assigns this only while it is still null.
    protected CodeSetComponentInfo.CodeSetContext codeSetContext = null;
1339 
getBroker()1340     public ORB getBroker()
1341     {
1342         return orb;
1343     }
1344 
getCodeSetContext()1345     public CodeSetComponentInfo.CodeSetContext getCodeSetContext() {
1346         // Needs to be synchronized for the following case when the client
1347         // doesn't send the code set context twice, and we have two threads
1348         // in ServerRequestDispatcher processCodeSetContext.
1349         //
1350         // Thread A checks to see if there is a context, there is none, so
1351         //     it calls setCodeSetContext, getting the synch lock.
1352         // Thread B checks to see if there is a context.  If we didn't synch,
1353         //     it might decide to outlaw wchar/wstring.
1354         if (codeSetContext == null) {
1355             synchronized(this) {
1356                 return codeSetContext;
1357             }
1358         }
1359 
1360         return codeSetContext;
1361     }
1362 
setCodeSetContext(CodeSetComponentInfo.CodeSetContext csc)1363     public synchronized void setCodeSetContext(CodeSetComponentInfo.CodeSetContext csc) {
1364         // Double check whether or not we need to do this
1365         if (codeSetContext == null) {
1366 
1367             if (OSFCodeSetRegistry.lookupEntry(csc.getCharCodeSet()) == null ||
1368                 OSFCodeSetRegistry.lookupEntry(csc.getWCharCodeSet()) == null) {
1369                 // If the client says it's negotiated a code set that
1370                 // isn't a fallback and we never said we support, then
1371                 // it has a bug.
1372                 throw wrapper.badCodesetsFromClient() ;
1373             }
1374 
1375             codeSetContext = csc;
1376         }
1377     }
1378 
1379     //
1380     // from iiop.IIOPConnection.java
1381     //
1382 
1383     // Map request ID to an InputObject.
1384     // This is so the client thread can start unmarshaling
1385     // the reply and remove it from the out_calls map while the
1386     // ReaderThread can still obtain the input stream to give
1387     // new fragments.  Only the ReaderThread touches the clientReplyMap,
1388     // so it doesn't incur synchronization overhead.
1389 
clientRequestMapGet(int requestId)1390     public MessageMediator clientRequestMapGet(int requestId)
1391     {
1392         return responseWaitingRoom.getMessageMediator(requestId);
1393     }
1394 
    // Single slot accessed through clientReply_1_1_Put/_Get/_Remove.
    // NOTE(review): presumably the GIOP 1.1 client reply in progress - confirm.
    protected MessageMediator clientReply_1_1;
1396 
clientReply_1_1_Put(MessageMediator x)1397     public void clientReply_1_1_Put(MessageMediator x)
1398     {
1399         clientReply_1_1 = x;
1400     }
1401 
clientReply_1_1_Get()1402     public MessageMediator clientReply_1_1_Get()
1403     {
1404         return  clientReply_1_1;
1405     }
1406 
clientReply_1_1_Remove()1407     public void clientReply_1_1_Remove()
1408     {
1409         clientReply_1_1 = null;
1410     }
1411 
    // Single slot accessed through serverRequest_1_1_Put/_Get/_Remove.
    // NOTE(review): presumably the GIOP 1.1 server request in progress - confirm.
    protected MessageMediator serverRequest_1_1;
1413 
serverRequest_1_1_Put(MessageMediator x)1414     public void serverRequest_1_1_Put(MessageMediator x)
1415     {
1416         serverRequest_1_1 = x;
1417     }
1418 
serverRequest_1_1_Get()1419     public MessageMediator serverRequest_1_1_Get()
1420     {
1421         return  serverRequest_1_1;
1422     }
1423 
serverRequest_1_1_Remove()1424     public void serverRequest_1_1_Remove()
1425     {
1426         serverRequest_1_1 = null;
1427     }
1428 
getStateString( int state )1429     protected String getStateString( int state )
1430     {
1431         synchronized ( stateEvent ){
1432             switch (state) {
1433             case OPENING : return "OPENING" ;
1434             case ESTABLISHED : return "ESTABLISHED" ;
1435             case CLOSE_SENT : return "CLOSE_SENT" ;
1436             case CLOSE_RECVD : return "CLOSE_RECVD" ;
1437             case ABORT : return "ABORT" ;
1438             default : return "???" ;
1439             }
1440         }
1441     }
1442 
isPostInitialContexts()1443     public synchronized boolean isPostInitialContexts() {
1444         return postInitialContexts;
1445     }
1446 
1447     // Can never be unset...
setPostInitialContexts()1448     public synchronized void setPostInitialContexts(){
1449         postInitialContexts = true;
1450     }
1451 
1452     /**
1453      * Wake up the outstanding requests on the connection, and hand them
1454      * COMM_FAILURE exception with a given minor code.
1455      *
1456      * Also, delete connection from connection table and
1457      * stop the reader thread.
1458 
1459      * Note that this should only ever be called by the Reader thread for
1460      * this connection.
1461      *
1462      * @param minor_code The minor code for the COMM_FAILURE major code.
1463      * @param die Kill the reader thread (this thread) before exiting.
1464      */
purgeCalls(SystemException systemException, boolean die, boolean lockHeld)1465     public void purgeCalls(SystemException systemException,
1466                            boolean die, boolean lockHeld)
1467     {
1468         int minor_code = systemException.minor;
1469 
1470         try{
1471             if (orb.transportDebugFlag) {
1472                 dprint(".purgeCalls->: "
1473                        + minor_code + "/" + die + "/" + lockHeld
1474                        + " " + this);
1475             }
1476 
1477             // If this invocation is a result of ThreadDeath caused
1478             // by a previous execution of this routine, just exit.
1479 
1480             synchronized ( stateEvent ){
1481                 if ((state == ABORT) || (state == CLOSE_RECVD)) {
1482                     if (orb.transportDebugFlag) {
1483                         dprint(".purgeCalls: exiting since state is: "
1484                                + getStateString(state)
1485                                + " " + this);
1486                     }
1487                     return;
1488                 }
1489             }
1490 
1491             // Grab the writeLock (freeze the calls)
1492             try {
1493                 if (!lockHeld) {
1494                     writeLock();
1495                 }
1496             } catch (SystemException ex) {
1497                 if (orb.transportDebugFlag)
1498                     dprint(".purgeCalls: SystemException" + ex
1499                            + "; continuing " + this);
1500             }
1501 
1502             // Mark the state of the connection
1503             // and determine the request status
1504             org.omg.CORBA.CompletionStatus completion_status;
1505             synchronized ( stateEvent ){
1506                 if (minor_code == ORBUtilSystemException.CONNECTION_REBIND) {
1507                     state = CLOSE_RECVD;
1508                     systemException.completed = CompletionStatus.COMPLETED_NO;
1509                 } else {
1510                     state = ABORT;
1511                     systemException.completed = CompletionStatus.COMPLETED_MAYBE;
1512                 }
1513                 stateEvent.notifyAll();
1514             }
1515 
1516             try {
1517                 socket.getInputStream().close();
1518                 socket.getOutputStream().close();
1519                 socket.close();
1520             } catch (Exception ex) {
1521                 if (orb.transportDebugFlag) {
1522                     dprint(".purgeCalls: Exception closing socket: " + ex
1523                            + " " + this);
1524                 }
1525             }
1526 
1527             // Signal all threads with outstanding requests on this
1528             // connection and give them the SystemException;
1529 
1530             responseWaitingRoom.signalExceptionToAllWaiters(systemException);
1531         } finally {
1532             if (contactInfo != null) {
1533                 ((OutboundConnectionCache)getConnectionCache()).remove(contactInfo);
1534             } else if (acceptor != null) {
1535                 ((InboundConnectionCache)getConnectionCache()).remove(this);
1536             }
1537 
1538             //
1539             // REVISIT: Stop the reader thread
1540             //
1541 
1542             // Signal all the waiters of the writeLock.
1543             // There are 4 types of writeLock waiters:
1544             // 1. Send waiters:
1545             // 2. SendReply waiters:
1546             // 3. cleanUp waiters:
1547             // 4. purge_call waiters:
1548             //
1549 
1550             writeUnlock();
1551 
1552             if (orb.transportDebugFlag) {
1553                 dprint(".purgeCalls<-: "
1554                        + minor_code + "/" + die + "/" + lockHeld
1555                        + " " + this);
1556             }
1557         }
1558     }
1559 
1560     /*************************************************************************
1561     * The following methods are for dealing with Connection cleaning for
1562     * better scalability of servers in high network load conditions.
1563     **************************************************************************/
1564 
sendCloseConnection(GIOPVersion giopVersion)1565     public void sendCloseConnection(GIOPVersion giopVersion)
1566         throws IOException
1567     {
1568         Message msg = MessageBase.createCloseConnection(giopVersion);
1569         sendHelper(giopVersion, msg);
1570     }
1571 
sendMessageError(GIOPVersion giopVersion)1572     public void sendMessageError(GIOPVersion giopVersion)
1573         throws IOException
1574     {
1575         Message msg = MessageBase.createMessageError(giopVersion);
1576         sendHelper(giopVersion, msg);
1577     }
1578 
1579     /**
1580      * Send a CancelRequest message. This does not lock the connection, so the
1581      * caller needs to ensure this method is called appropriately.
1582      * @exception IOException - could be due to abortive connection closure.
1583      */
sendCancelRequest(GIOPVersion giopVersion, int requestId)1584     public void sendCancelRequest(GIOPVersion giopVersion, int requestId)
1585         throws IOException
1586     {
1587 
1588         Message msg = MessageBase.createCancelRequest(giopVersion, requestId);
1589         sendHelper(giopVersion, msg);
1590     }
1591 
sendHelper(GIOPVersion giopVersion, Message msg)1592     protected void sendHelper(GIOPVersion giopVersion, Message msg)
1593         throws IOException
1594     {
1595         // REVISIT: See comments in CDROutputObject constructor.
1596         CDROutputObject outputObject =
1597             sun.corba.OutputStreamFactory.newCDROutputObject((ORB)orb, null, giopVersion,
1598                                 this, msg, ORBConstants.STREAM_FORMAT_VERSION_1);
1599         msg.write(outputObject);
1600 
1601         outputObject.writeTo(this);
1602     }
1603 
sendCancelRequestWithLock(GIOPVersion giopVersion, int requestId)1604     public void sendCancelRequestWithLock(GIOPVersion giopVersion,
1605                                           int requestId)
1606         throws IOException
1607     {
1608         writeLock();
1609         try {
1610             sendCancelRequest(giopVersion, requestId);
1611         } finally {
1612             writeUnlock();
1613         }
1614     }
1615 
1616     // Begin Code Base methods ---------------------------------------
1617     //
1618     // Set this connection's code base IOR.  The IOR comes from the
1619     // SendingContext.  This is an optional service context, but all
1620     // JavaSoft ORBs send it.
1621     //
1622     // The set and get methods don't need to be synchronized since the
1623     // first possible get would occur during reading a valuetype, and
1624     // that would be after the set.
1625 
1626     // Sets this connection's code base IOR.  This is done after
1627     // getting the IOR out of the SendingContext service context.
1628     // Our ORBs always send this, but it's optional in CORBA.
1629 
setCodeBaseIOR(IOR ior)1630     public final void setCodeBaseIOR(IOR ior) {
1631         codeBaseServerIOR = ior;
1632     }
1633 
getCodeBaseIOR()1634     public final IOR getCodeBaseIOR() {
1635         return codeBaseServerIOR;
1636     }
1637 
1638     // Get a CodeBase stub to use in unmarshaling.  The CachedCodeBase
1639     // won't connect to the remote codebase unless it's necessary.
getCodeBase()1640     public final CodeBase getCodeBase() {
1641         return cachedCodeBase;
1642     }
1643 
1644     // End Code Base methods -----------------------------------------
1645 
1646     // set transport read thresholds
setReadTimeouts(ReadTimeouts readTimeouts)1647     protected void setReadTimeouts(ReadTimeouts readTimeouts) {
1648         this.readTimeouts = readTimeouts;
1649     }
1650 
setPartialMessageMediator(CorbaMessageMediator messageMediator)1651     protected void setPartialMessageMediator(CorbaMessageMediator messageMediator) {
1652         partialMessageMediator = messageMediator;
1653     }
1654 
getPartialMessageMediator()1655     protected CorbaMessageMediator getPartialMessageMediator() {
1656         return partialMessageMediator;
1657     }
1658 
toString()1659     public String toString()
1660     {
1661         synchronized ( stateEvent ){
1662             return
1663                 "SocketOrChannelConnectionImpl[" + " "
1664                 + (socketChannel == null ?
1665                    socket.toString() : socketChannel.toString()) + " "
1666                 + getStateString( state ) + " "
1667                 + shouldUseSelectThreadToWait() + " "
1668                 + shouldUseWorkerThreadForEvent() + " "
1669                 + shouldReadGiopHeaderOnly()
1670                 + "]" ;
1671         }
1672     }
1673 
1674     // Must be public - used in encoding.
dprint(String msg)1675     public void dprint(String msg)
1676     {
1677         ORBUtility.dprint("SocketOrChannelConnectionImpl", msg);
1678     }
1679 
dprint(String msg, Throwable t)1680     protected void dprint(String msg, Throwable t)
1681     {
1682         dprint(msg);
1683         t.printStackTrace(System.out);
1684     }
1685 }
1686 
1687 // End of file.
1688