1 /* 2 * Copyright (c) 2001, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
24 */ 25 26 package com.sun.corba.se.impl.transport; 27 28 import java.io.IOException; 29 import java.net.InetSocketAddress; 30 import java.net.Socket; 31 import java.nio.ByteBuffer; 32 import java.nio.channels.SelectableChannel; 33 import java.nio.channels.SelectionKey; 34 import java.nio.channels.SocketChannel; 35 import java.security.AccessController; 36 import java.security.PrivilegedAction; 37 import java.util.Collections; 38 import java.util.Hashtable; 39 import java.util.HashMap; 40 import java.util.Map; 41 42 import org.omg.CORBA.COMM_FAILURE; 43 import org.omg.CORBA.CompletionStatus; 44 import org.omg.CORBA.DATA_CONVERSION; 45 import org.omg.CORBA.INTERNAL; 46 import org.omg.CORBA.MARSHAL; 47 import org.omg.CORBA.OBJECT_NOT_EXIST; 48 import org.omg.CORBA.SystemException; 49 50 import com.sun.org.omg.SendingContext.CodeBase; 51 52 import com.sun.corba.se.pept.broker.Broker; 53 import com.sun.corba.se.pept.encoding.InputObject; 54 import com.sun.corba.se.pept.encoding.OutputObject; 55 import com.sun.corba.se.pept.protocol.MessageMediator; 56 import com.sun.corba.se.pept.transport.Acceptor; 57 import com.sun.corba.se.pept.transport.Connection; 58 import com.sun.corba.se.pept.transport.ConnectionCache; 59 import com.sun.corba.se.pept.transport.ContactInfo; 60 import com.sun.corba.se.pept.transport.EventHandler; 61 import com.sun.corba.se.pept.transport.InboundConnectionCache; 62 import com.sun.corba.se.pept.transport.OutboundConnectionCache; 63 import com.sun.corba.se.pept.transport.ResponseWaitingRoom; 64 import com.sun.corba.se.pept.transport.Selector; 65 66 import com.sun.corba.se.spi.ior.IOR; 67 import com.sun.corba.se.spi.ior.iiop.GIOPVersion; 68 import com.sun.corba.se.spi.logging.CORBALogDomains; 69 import com.sun.corba.se.spi.orb.ORB ; 70 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException; 71 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException; 72 import com.sun.corba.se.spi.orbutil.threadpool.Work; 73 import 
com.sun.corba.se.spi.protocol.CorbaMessageMediator; 74 import com.sun.corba.se.spi.transport.CorbaContactInfo; 75 import com.sun.corba.se.spi.transport.CorbaConnection; 76 import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom; 77 import com.sun.corba.se.spi.transport.ReadTimeouts; 78 79 import com.sun.corba.se.impl.encoding.CachedCodeBase; 80 import com.sun.corba.se.impl.encoding.CDRInputStream_1_0; 81 import com.sun.corba.se.impl.encoding.CDROutputObject; 82 import com.sun.corba.se.impl.encoding.CDROutputStream_1_0; 83 import com.sun.corba.se.impl.encoding.CodeSetComponentInfo; 84 import com.sun.corba.se.impl.encoding.OSFCodeSetRegistry; 85 import com.sun.corba.se.impl.logging.ORBUtilSystemException; 86 import com.sun.corba.se.impl.orbutil.ORBConstants; 87 import com.sun.corba.se.impl.orbutil.ORBUtility; 88 import com.sun.corba.se.impl.protocol.giopmsgheaders.Message; 89 import com.sun.corba.se.impl.protocol.giopmsgheaders.MessageBase; 90 import com.sun.corba.se.impl.transport.CorbaResponseWaitingRoomImpl; 91 92 /** 93 * @author Harold Carr 94 */ 95 public class SocketOrChannelConnectionImpl 96 extends 97 EventHandlerBase 98 implements 99 CorbaConnection, 100 Work 101 { 102 public static boolean dprintWriteLocks = false; 103 104 // 105 // New transport. 106 // 107 108 protected long enqueueTime; 109 110 protected SocketChannel socketChannel; getSocketChannel()111 public SocketChannel getSocketChannel() 112 { 113 return socketChannel; 114 } 115 116 // REVISIT: 117 // protected for test: genericRPCMSGFramework.IIOPConnection constructor. 118 protected CorbaContactInfo contactInfo; 119 protected Acceptor acceptor; 120 protected ConnectionCache connectionCache; 121 122 // 123 // From iiop.Connection.java 124 // 125 126 protected Socket socket; // The socket used for this connection. 127 protected long timeStamp = 0; 128 protected boolean isServer = false; 129 130 // Start at some value other than zero since this is a magic 131 // value in some protocols. 
132 protected int requestId = 5; 133 protected CorbaResponseWaitingRoom responseWaitingRoom; 134 protected int state; 135 protected java.lang.Object stateEvent = new java.lang.Object(); 136 protected java.lang.Object writeEvent = new java.lang.Object(); 137 protected boolean writeLocked; 138 protected int serverRequestCount = 0; 139 140 // Server request map: used on the server side of Connection 141 // Maps request ID to IIOPInputStream. 142 Map serverRequestMap = null; 143 144 // This is a flag associated per connection telling us if the 145 // initial set of sending contexts were sent to the receiver 146 // already... 147 protected boolean postInitialContexts = false; 148 149 // Remote reference to CodeBase server (supplies 150 // FullValueDescription, among other things) 151 protected IOR codeBaseServerIOR; 152 153 // CodeBase cache for this connection. This will cache remote operations, 154 // handle connecting, and ensure we don't do any remote operations until 155 // necessary. 156 protected CachedCodeBase cachedCodeBase = new CachedCodeBase(this); 157 158 protected ORBUtilSystemException wrapper ; 159 160 // transport read timeout values 161 protected ReadTimeouts readTimeouts; 162 163 protected boolean shouldReadGiopHeaderOnly; 164 165 // A message mediator used when shouldReadGiopHeaderOnly is 166 // true to maintain request message state across execution in a 167 // SelectorThread and WorkerThread. 168 protected CorbaMessageMediator partialMessageMediator = null; 169 170 // Used in genericRPCMSGFramework test. SocketOrChannelConnectionImpl(ORB orb)171 protected SocketOrChannelConnectionImpl(ORB orb) 172 { 173 this.orb = orb; 174 wrapper = ORBUtilSystemException.get( orb, 175 CORBALogDomains.RPC_TRANSPORT ) ; 176 177 setWork(this); 178 responseWaitingRoom = new CorbaResponseWaitingRoomImpl(orb, this); 179 setReadTimeouts(orb.getORBData().getTransportTCPReadTimeouts()); 180 } 181 182 // Both client and servers. 
/**
     * Common constructor for client and server connections: performs the
     * base initialization and records the two threading options.
     *
     * @param orb                   the ORB this connection belongs to
     * @param useSelectThreadToWait passed to {@code setUseSelectThreadToWait}
     * @param useWorkerThread       passed to {@code setUseWorkerThreadForEvent}
     */
    protected SocketOrChannelConnectionImpl(ORB orb,
                                            boolean useSelectThreadToWait,
                                            boolean useWorkerThread)
    {
        this(orb) ;
        setUseSelectThreadToWait(useSelectThreadToWait);
        setUseWorkerThreadForEvent(useWorkerThread);
    }

    /**
     * Client constructor: creates the socket through the ORB's socket
     * factory and leaves the connection in the {@code OPENING} state.
     *
     * If the socket is channel-backed, the channel is made blocking exactly
     * when the select thread is NOT used to wait; otherwise use of the
     * select thread is switched off.
     *
     * @param orb                   the ORB this connection belongs to
     * @param contactInfo           contact info stored on this connection
     * @param useSelectThreadToWait whether a select thread waits for input
     * @param useWorkerThread       whether events are handled on a worker thread
     * @param socketType            socket type handed to the socket factory
     * @param hostname              host to connect to
     * @param port                  port to connect to
     * @throws org.omg.CORBA.SystemException the result of
     *         {@code wrapper.connectFailure(...)} when anything fails while
     *         creating or configuring the socket
     */
    public SocketOrChannelConnectionImpl(ORB orb,
                                         CorbaContactInfo contactInfo,
                                         boolean useSelectThreadToWait,
                                         boolean useWorkerThread,
                                         String socketType,
                                         String hostname,
                                         int port)
    {
        this(orb, useSelectThreadToWait, useWorkerThread);

        this.contactInfo = contactInfo;

        try {
            socket = orb.getORBData().getSocketFactory()
                .createSocket(socketType,
                              new InetSocketAddress(hostname, port));
            socketChannel = socket.getChannel();

            if (socketChannel != null) {
                boolean isBlocking = !useSelectThreadToWait;
                socketChannel.configureBlocking(isBlocking);
            } else {
                // IMPORTANT: non-channel-backed sockets must use
                // dedicated reader threads.
                setUseSelectThreadToWait(false);
            }
            if (orb.transportDebugFlag) {
                dprint(".initialize: connection created: " + socket);
            }
        } catch (Throwable t) {
            // The socket may already exist (e.g. configureBlocking failed).
            // Nobody will ever see this half-built connection, so release
            // the socket here instead of leaking it.
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException ignored) {
                    // best effort; the connect failure below is what matters
                }
            }
            throw wrapper.connectFailure(t, socketType, hostname,
                                         Integer.toString(port));
        }
        state = OPENING;
    }

    // Client-side convenience.
SocketOrChannelConnectionImpl(ORB orb, CorbaContactInfo contactInfo, String socketType, String hostname, int port)230 public SocketOrChannelConnectionImpl(ORB orb, 231 CorbaContactInfo contactInfo, 232 String socketType, 233 String hostname, 234 int port) 235 { 236 this(orb, contactInfo, 237 orb.getORBData().connectionSocketUseSelectThreadToWait(), 238 orb.getORBData().connectionSocketUseWorkerThreadForEvent(), 239 socketType, hostname, port); 240 } 241 242 // Server-side constructor. SocketOrChannelConnectionImpl(ORB orb, Acceptor acceptor, Socket socket, boolean useSelectThreadToWait, boolean useWorkerThread)243 public SocketOrChannelConnectionImpl(ORB orb, 244 Acceptor acceptor, 245 Socket socket, 246 boolean useSelectThreadToWait, 247 boolean useWorkerThread) 248 { 249 this(orb, useSelectThreadToWait, useWorkerThread); 250 251 this.socket = socket; 252 socketChannel = socket.getChannel(); 253 if (socketChannel != null) { 254 // REVISIT 255 try { 256 boolean isBlocking = !useSelectThreadToWait; 257 socketChannel.configureBlocking(isBlocking); 258 } catch (IOException e) { 259 RuntimeException rte = new RuntimeException(); 260 rte.initCause(e); 261 throw rte; 262 } 263 } 264 this.acceptor = acceptor; 265 266 serverRequestMap = Collections.synchronizedMap(new HashMap()); 267 isServer = true; 268 269 state = ESTABLISHED; 270 } 271 272 // Server-side convenience SocketOrChannelConnectionImpl(ORB orb, Acceptor acceptor, Socket socket)273 public SocketOrChannelConnectionImpl(ORB orb, 274 Acceptor acceptor, 275 Socket socket) 276 { 277 this(orb, acceptor, socket, 278 (socket.getChannel() == null 279 ? false 280 : orb.getORBData().connectionSocketUseSelectThreadToWait()), 281 (socket.getChannel() == null 282 ? 
false 283 : orb.getORBData().connectionSocketUseWorkerThreadForEvent())); 284 } 285 286 //////////////////////////////////////////////////// 287 // 288 // framework.transport.Connection 289 // 290 shouldRegisterReadEvent()291 public boolean shouldRegisterReadEvent() 292 { 293 return true; 294 } 295 shouldRegisterServerReadEvent()296 public boolean shouldRegisterServerReadEvent() 297 { 298 return true; 299 } 300 read()301 public boolean read() 302 { 303 try { 304 if (orb.transportDebugFlag) { 305 dprint(".read->: " + this); 306 } 307 CorbaMessageMediator messageMediator = readBits(); 308 if (messageMediator != null) { 309 // Null can happen when client closes stream 310 // causing purgecalls. 311 return dispatch(messageMediator); 312 } 313 return true; 314 } finally { 315 if (orb.transportDebugFlag) { 316 dprint(".read<-: " + this); 317 } 318 } 319 } 320 readBits()321 protected CorbaMessageMediator readBits() 322 { 323 try { 324 325 if (orb.transportDebugFlag) { 326 dprint(".readBits->: " + this); 327 } 328 329 MessageMediator messageMediator; 330 // REVISIT - use common factory base class. 
331 if (contactInfo != null) { 332 messageMediator = 333 contactInfo.createMessageMediator(orb, this); 334 } else if (acceptor != null) { 335 messageMediator = acceptor.createMessageMediator(orb, this); 336 } else { 337 throw 338 new RuntimeException("SocketOrChannelConnectionImpl.readBits"); 339 } 340 return (CorbaMessageMediator) messageMediator; 341 342 } catch (ThreadDeath td) { 343 if (orb.transportDebugFlag) { 344 dprint(".readBits: " + this + ": ThreadDeath: " + td, td); 345 } 346 try { 347 purgeCalls(wrapper.connectionAbort(td), false, false); 348 } catch (Throwable t) { 349 if (orb.transportDebugFlag) { 350 dprint(".readBits: " + this + ": purgeCalls: Throwable: " + t, t); 351 } 352 } 353 throw td; 354 } catch (Throwable ex) { 355 if (orb.transportDebugFlag) { 356 dprint(".readBits: " + this + ": Throwable: " + ex, ex); 357 } 358 359 try { 360 if (ex instanceof INTERNAL) { 361 sendMessageError(GIOPVersion.DEFAULT_VERSION); 362 } 363 } catch (IOException e) { 364 if (orb.transportDebugFlag) { 365 dprint(".readBits: " + this + 366 ": sendMessageError: IOException: " + e, e); 367 } 368 } 369 // REVISIT - make sure reader thread is killed. 370 Selector selector = orb.getTransportManager().getSelector(0); 371 if (selector != null) { 372 selector.unregisterForEvent(this); 373 } 374 // Notify anyone waiting. 375 purgeCalls(wrapper.connectionAbort(ex), true, false); 376 // REVISIT 377 //keepRunning = false; 378 // REVISIT - if this is called after purgeCalls then 379 // the state of the socket is ABORT so the writeLock 380 // in close throws an exception. It is ignored but 381 // causes IBM (screen scraping) tests to fail. 
382 //close(); 383 } finally { 384 if (orb.transportDebugFlag) { 385 dprint(".readBits<-: " + this); 386 } 387 } 388 return null; 389 } 390 finishReadingBits(MessageMediator messageMediator)391 protected CorbaMessageMediator finishReadingBits(MessageMediator messageMediator) 392 { 393 try { 394 395 if (orb.transportDebugFlag) { 396 dprint(".finishReadingBits->: " + this); 397 } 398 399 // REVISIT - use common factory base class. 400 if (contactInfo != null) { 401 messageMediator = 402 contactInfo.finishCreatingMessageMediator(orb, this, messageMediator); 403 } else if (acceptor != null) { 404 messageMediator = 405 acceptor.finishCreatingMessageMediator(orb, this, messageMediator); 406 } else { 407 throw 408 new RuntimeException("SocketOrChannelConnectionImpl.finishReadingBits"); 409 } 410 return (CorbaMessageMediator) messageMediator; 411 412 } catch (ThreadDeath td) { 413 if (orb.transportDebugFlag) { 414 dprint(".finishReadingBits: " + this + ": ThreadDeath: " + td, td); 415 } 416 try { 417 purgeCalls(wrapper.connectionAbort(td), false, false); 418 } catch (Throwable t) { 419 if (orb.transportDebugFlag) { 420 dprint(".finishReadingBits: " + this + ": purgeCalls: Throwable: " + t, t); 421 } 422 } 423 throw td; 424 } catch (Throwable ex) { 425 if (orb.transportDebugFlag) { 426 dprint(".finishReadingBits: " + this + ": Throwable: " + ex, ex); 427 } 428 429 try { 430 if (ex instanceof INTERNAL) { 431 sendMessageError(GIOPVersion.DEFAULT_VERSION); 432 } 433 } catch (IOException e) { 434 if (orb.transportDebugFlag) { 435 dprint(".finishReadingBits: " + this + 436 ": sendMessageError: IOException: " + e, e); 437 } 438 } 439 // REVISIT - make sure reader thread is killed. 440 orb.getTransportManager().getSelector(0).unregisterForEvent(this); 441 // Notify anyone waiting. 
442 purgeCalls(wrapper.connectionAbort(ex), true, false); 443 // REVISIT 444 //keepRunning = false; 445 // REVISIT - if this is called after purgeCalls then 446 // the state of the socket is ABORT so the writeLock 447 // in close throws an exception. It is ignored but 448 // causes IBM (screen scraping) tests to fail. 449 //close(); 450 } finally { 451 if (orb.transportDebugFlag) { 452 dprint(".finishReadingBits<-: " + this); 453 } 454 } 455 return null; 456 } 457 dispatch(CorbaMessageMediator messageMediator)458 protected boolean dispatch(CorbaMessageMediator messageMediator) 459 { 460 try { 461 if (orb.transportDebugFlag) { 462 dprint(".dispatch->: " + this); 463 } 464 465 // 466 // NOTE: 467 // 468 // This call is the transition from the tranport block 469 // to the protocol block. 470 // 471 472 boolean result = 473 messageMediator.getProtocolHandler() 474 .handleRequest(messageMediator); 475 476 return result; 477 478 } catch (ThreadDeath td) { 479 if (orb.transportDebugFlag) { 480 dprint(".dispatch: ThreadDeath", td ); 481 } 482 try { 483 purgeCalls(wrapper.connectionAbort(td), false, false); 484 } catch (Throwable t) { 485 if (orb.transportDebugFlag) { 486 dprint(".dispatch: purgeCalls: Throwable", t); 487 } 488 } 489 throw td; 490 } catch (Throwable ex) { 491 if (orb.transportDebugFlag) { 492 dprint(".dispatch: Throwable", ex ) ; 493 } 494 495 try { 496 if (ex instanceof INTERNAL) { 497 sendMessageError(GIOPVersion.DEFAULT_VERSION); 498 } 499 } catch (IOException e) { 500 if (orb.transportDebugFlag) { 501 dprint(".dispatch: sendMessageError: IOException", e); 502 } 503 } 504 purgeCalls(wrapper.connectionAbort(ex), false, false); 505 // REVISIT 506 //keepRunning = false; 507 } finally { 508 if (orb.transportDebugFlag) { 509 dprint(".dispatch<-: " + this); 510 } 511 } 512 513 return true; 514 } 515 shouldUseDirectByteBuffers()516 public boolean shouldUseDirectByteBuffers() 517 { 518 return getSocketChannel() != null; 519 } 520 read(int size, int offset, int 
length, long max_wait_time)521 public ByteBuffer read(int size, int offset, int length, long max_wait_time) 522 throws IOException 523 { 524 if (shouldUseDirectByteBuffers()) { 525 526 ByteBuffer byteBuffer = 527 orb.getByteBufferPool().getByteBuffer(size); 528 529 if (orb.transportDebugFlag) { 530 // print address of ByteBuffer gotten from pool 531 int bbAddress = System.identityHashCode(byteBuffer); 532 StringBuffer sb = new StringBuffer(80); 533 sb.append(".read: got ByteBuffer id ("); 534 sb.append(bbAddress).append(") from ByteBufferPool."); 535 String msgStr = sb.toString(); 536 dprint(msgStr); 537 } 538 539 byteBuffer.position(offset); 540 byteBuffer.limit(size); 541 542 readFully(byteBuffer, length, max_wait_time); 543 544 return byteBuffer; 545 } 546 547 byte[] buf = new byte[size]; 548 readFully(getSocket().getInputStream(), buf, 549 offset, length, max_wait_time); 550 ByteBuffer byteBuffer = ByteBuffer.wrap(buf); 551 byteBuffer.limit(size); 552 return byteBuffer; 553 } 554 read(ByteBuffer byteBuffer, int offset, int length, long max_wait_time)555 public ByteBuffer read(ByteBuffer byteBuffer, int offset, 556 int length, long max_wait_time) 557 throws IOException 558 { 559 int size = offset + length; 560 if (shouldUseDirectByteBuffers()) { 561 562 if (! 
byteBuffer.isDirect()) { 563 throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket(); 564 } 565 if (size > byteBuffer.capacity()) { 566 if (orb.transportDebugFlag) { 567 // print address of ByteBuffer being released 568 int bbAddress = System.identityHashCode(byteBuffer); 569 StringBuffer bbsb = new StringBuffer(80); 570 bbsb.append(".read: releasing ByteBuffer id (") 571 .append(bbAddress).append(") to ByteBufferPool."); 572 String bbmsg = bbsb.toString(); 573 dprint(bbmsg); 574 } 575 orb.getByteBufferPool().releaseByteBuffer(byteBuffer); 576 byteBuffer = orb.getByteBufferPool().getByteBuffer(size); 577 } 578 byteBuffer.position(offset); 579 byteBuffer.limit(size); 580 readFully(byteBuffer, length, max_wait_time); 581 byteBuffer.position(0); 582 byteBuffer.limit(size); 583 return byteBuffer; 584 } 585 if (byteBuffer.isDirect()) { 586 throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket(); 587 } 588 byte[] buf = new byte[size]; 589 readFully(getSocket().getInputStream(), buf, 590 offset, length, max_wait_time); 591 return ByteBuffer.wrap(buf); 592 } 593 readFully(ByteBuffer byteBuffer, int size, long max_wait_time)594 public void readFully(ByteBuffer byteBuffer, int size, long max_wait_time) 595 throws IOException 596 { 597 int n = 0; 598 int bytecount = 0; 599 long time_to_wait = readTimeouts.get_initial_time_to_wait(); 600 long total_time_in_wait = 0; 601 602 // The reading of data incorporates a strategy to detect a 603 // rogue client. The strategy is implemented as follows. As 604 // long as data is being read, at least 1 byte or more, we 605 // assume we have a well behaved client. If no data is read, 606 // then we sleep for a time to wait, re-calculate a new time to 607 // wait which is lengthier than the previous time spent waiting. 608 // Then, if the total time spent waiting does not exceed a 609 // maximum time we are willing to wait, we attempt another 610 // read. 
If the maximum amount of time we are willing to 611 // spend waiting for more data is exceeded, we throw an 612 // IOException. 613 614 // NOTE: Reading of GIOP headers are treated with a smaller 615 // maximum time to wait threshold. Based on extensive 616 // performance testing, all GIOP headers are being 617 // read in 1 read access. 618 619 do { 620 bytecount = getSocketChannel().read(byteBuffer); 621 622 if (bytecount < 0) { 623 throw new IOException("End-of-stream"); 624 } 625 else if (bytecount == 0) { 626 try { 627 Thread.sleep(time_to_wait); 628 total_time_in_wait += time_to_wait; 629 time_to_wait = 630 (long)(time_to_wait*readTimeouts.get_backoff_factor()); 631 } 632 catch (InterruptedException ie) { 633 // ignore exception 634 if (orb.transportDebugFlag) { 635 dprint("readFully(): unexpected exception " 636 + ie.toString()); 637 } 638 } 639 } 640 else { 641 n += bytecount; 642 } 643 } 644 while (n < size && total_time_in_wait < max_wait_time); 645 646 if (n < size && total_time_in_wait >= max_wait_time) 647 { 648 // failed to read entire message 649 throw wrapper.transportReadTimeoutExceeded(new Integer(size), 650 new Integer(n), new Long(max_wait_time), 651 new Long(total_time_in_wait)); 652 } 653 654 getConnectionCache().stampTime(this); 655 } 656 657 // To support non-channel connections. readFully(java.io.InputStream is, byte[] buf, int offset, int size, long max_wait_time)658 public void readFully(java.io.InputStream is, byte[] buf, 659 int offset, int size, long max_wait_time) 660 throws IOException 661 { 662 int n = 0; 663 int bytecount = 0; 664 long time_to_wait = readTimeouts.get_initial_time_to_wait(); 665 long total_time_in_wait = 0; 666 667 // The reading of data incorporates a strategy to detect a 668 // rogue client. The strategy is implemented as follows. As 669 // long as data is being read, at least 1 byte or more, we 670 // assume we have a well behaved client. 
If no data is read, 671 // then we sleep for a time to wait, re-calculate a new time to 672 // wait which is lengthier than the previous time spent waiting. 673 // Then, if the total time spent waiting does not exceed a 674 // maximum time we are willing to wait, we attempt another 675 // read. If the maximum amount of time we are willing to 676 // spend waiting for more data is exceeded, we throw an 677 // IOException. 678 679 // NOTE: Reading of GIOP headers are treated with a smaller 680 // maximum time to wait threshold. Based on extensive 681 // performance testing, all GIOP headers are being 682 // read in 1 read access. 683 684 do { 685 bytecount = is.read(buf, offset + n, size - n); 686 if (bytecount < 0) { 687 throw new IOException("End-of-stream"); 688 } 689 else if (bytecount == 0) { 690 try { 691 Thread.sleep(time_to_wait); 692 total_time_in_wait += time_to_wait; 693 time_to_wait = 694 (long)(time_to_wait*readTimeouts.get_backoff_factor()); 695 } 696 catch (InterruptedException ie) { 697 // ignore exception 698 if (orb.transportDebugFlag) { 699 dprint("readFully(): unexpected exception " 700 + ie.toString()); 701 } 702 } 703 } 704 else { 705 n += bytecount; 706 } 707 } 708 while (n < size && total_time_in_wait < max_wait_time); 709 710 if (n < size && total_time_in_wait >= max_wait_time) 711 { 712 // failed to read entire message 713 throw wrapper.transportReadTimeoutExceeded(new Integer(size), 714 new Integer(n), new Long(max_wait_time), 715 new Long(total_time_in_wait)); 716 } 717 718 getConnectionCache().stampTime(this); 719 } 720 write(ByteBuffer byteBuffer)721 public void write(ByteBuffer byteBuffer) 722 throws IOException 723 { 724 if (shouldUseDirectByteBuffers()) { 725 /* NOTE: cannot perform this test. If one ask for a 726 ByteBuffer from the pool which is bigger than the size 727 of ByteBuffers managed by the pool, then the pool will 728 return a HeapByteBuffer. 
729 if (byteBuffer.hasArray()) { 730 throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket(); 731 } 732 */ 733 // IMPORTANT: For non-blocking SocketChannels, there's no guarantee 734 // all bytes are written on first write attempt. 735 do { 736 getSocketChannel().write(byteBuffer); 737 } 738 while (byteBuffer.hasRemaining()); 739 740 } else { 741 if (! byteBuffer.hasArray()) { 742 throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket(); 743 } 744 byte[] tmpBuf = byteBuffer.array(); 745 getSocket().getOutputStream().write(tmpBuf, 0, byteBuffer.limit()); 746 getSocket().getOutputStream().flush(); 747 } 748 749 // TimeStamp connection to indicate it has been used 750 // Note granularity of connection usage is assumed for 751 // now to be that of a IIOP packet. 752 getConnectionCache().stampTime(this); 753 } 754 755 /** 756 * Note:it is possible for this to be called more than once 757 */ close()758 public synchronized void close() 759 { 760 try { 761 if (orb.transportDebugFlag) { 762 dprint(".close->: " + this); 763 } 764 writeLock(); 765 766 // REVISIT It will be good to have a read lock on the reader thread 767 // before we proceed further, to avoid the reader thread (server side) 768 // from processing requests. This avoids the risk that a new request 769 // will be accepted by ReaderThread while the ListenerThread is 770 // attempting to close this connection. 771 772 if (isBusy()) { // we are busy! 773 writeUnlock(); 774 if (orb.transportDebugFlag) { 775 dprint(".close: isBusy so no close: " + this); 776 } 777 return; 778 } 779 780 try { 781 try { 782 sendCloseConnection(GIOPVersion.V1_0); 783 } catch (Throwable t) { 784 wrapper.exceptionWhenSendingCloseConnection(t); 785 } 786 787 synchronized ( stateEvent ){ 788 state = CLOSE_SENT; 789 stateEvent.notifyAll(); 790 } 791 792 // stop the reader without causing it to do purgeCalls 793 //Exception ex = new Exception(); 794 //reader.stop(ex); // REVISIT 795 796 // NOTE: !!!!!! 
797 // This does writeUnlock(). 798 purgeCalls(wrapper.connectionRebind(), false, true); 799 800 } catch (Exception ex) { 801 if (orb.transportDebugFlag) { 802 dprint(".close: exception: " + this, ex); 803 } 804 } 805 try { 806 Selector selector = orb.getTransportManager().getSelector(0); 807 if (selector != null) { 808 selector.unregisterForEvent(this); 809 } 810 if (socketChannel != null) { 811 socketChannel.close(); 812 } 813 socket.close(); 814 } catch (IOException e) { 815 if (orb.transportDebugFlag) { 816 dprint(".close: " + this, e); 817 } 818 } 819 closeConnectionResources(); 820 } finally { 821 if (orb.transportDebugFlag) { 822 dprint(".close<-: " + this); 823 } 824 } 825 } 826 closeConnectionResources()827 public void closeConnectionResources() { 828 if (orb.transportDebugFlag) { 829 dprint(".closeConnectionResources->: " + this); 830 } 831 Selector selector = orb.getTransportManager().getSelector(0); 832 if (selector != null) { 833 selector.unregisterForEvent(this); 834 } 835 try { 836 if (socketChannel != null) 837 socketChannel.close() ; 838 if (socket != null && !socket.isClosed()) 839 socket.close() ; 840 } catch (IOException e) { 841 if (orb.transportDebugFlag) { 842 dprint( ".closeConnectionResources: " + this, e ) ; 843 } 844 } 845 if (orb.transportDebugFlag) { 846 dprint(".closeConnectionResources<-: " + this); 847 } 848 } 849 850 getAcceptor()851 public Acceptor getAcceptor() 852 { 853 return acceptor; 854 } 855 getContactInfo()856 public ContactInfo getContactInfo() 857 { 858 return contactInfo; 859 } 860 getEventHandler()861 public EventHandler getEventHandler() 862 { 863 return this; 864 } 865 createOutputObject(MessageMediator messageMediator)866 public OutputObject createOutputObject(MessageMediator messageMediator) 867 { 868 // REVISIT - remove this method from Connection and all it subclasses. 
869 throw new RuntimeException("*****SocketOrChannelConnectionImpl.createOutputObject - should not be called."); 870 } 871 872 // This is used by the GIOPOutputObject in order to 873 // throw the correct error when handling code sets. 874 // Can we determine if we are on the server side by 875 // other means? XREVISIT isServer()876 public boolean isServer() 877 { 878 return isServer; 879 } 880 isBusy()881 public boolean isBusy() 882 { 883 if (serverRequestCount > 0 || 884 getResponseWaitingRoom().numberRegistered() > 0) 885 { 886 return true; 887 } else { 888 return false; 889 } 890 } 891 getTimeStamp()892 public long getTimeStamp() 893 { 894 return timeStamp; 895 } 896 setTimeStamp(long time)897 public void setTimeStamp(long time) 898 { 899 timeStamp = time; 900 } 901 setState(String stateString)902 public void setState(String stateString) 903 { 904 synchronized (stateEvent) { 905 if (stateString.equals("ESTABLISHED")) { 906 state = ESTABLISHED; 907 stateEvent.notifyAll(); 908 } else { 909 // REVISIT: ASSERT 910 } 911 } 912 } 913 914 /** 915 * Sets the writeLock for this connection. 916 * If the writeLock is already set by someone else, block till the 917 * writeLock is released and can set by us. 918 * IMPORTANT: this connection's lock must be acquired before 919 * setting the writeLock and must be unlocked after setting the writeLock. 920 */ writeLock()921 public void writeLock() 922 { 923 try { 924 if (dprintWriteLocks && orb.transportDebugFlag) { 925 dprint(".writeLock->: " + this); 926 } 927 // Keep looping till we can set the writeLock. 
928 while ( true ) { 929 int localState = state; 930 switch ( localState ) { 931 932 case OPENING: 933 synchronized (stateEvent) { 934 if (state != OPENING) { 935 // somebody has changed 'state' so be careful 936 break; 937 } 938 try { 939 stateEvent.wait(); 940 } catch (InterruptedException ie) { 941 if (orb.transportDebugFlag) { 942 dprint(".writeLock: OPENING InterruptedException: " + this); 943 } 944 } 945 } 946 // Loop back 947 break; 948 949 case ESTABLISHED: 950 synchronized (writeEvent) { 951 if (!writeLocked) { 952 writeLocked = true; 953 return; 954 } 955 956 try { 957 // do not stay here too long if state != ESTABLISHED 958 // Bug 4752117 959 while (state == ESTABLISHED && writeLocked) { 960 writeEvent.wait(100); 961 } 962 } catch (InterruptedException ie) { 963 if (orb.transportDebugFlag) { 964 dprint(".writeLock: ESTABLISHED InterruptedException: " + this); 965 } 966 } 967 } 968 // Loop back 969 break; 970 971 // 972 // XXX 973 // Need to distinguish between client and server roles 974 // here probably. 
975 // 976 case ABORT: 977 synchronized ( stateEvent ){ 978 if (state != ABORT) { 979 break; 980 } 981 throw wrapper.writeErrorSend() ; 982 } 983 984 case CLOSE_RECVD: 985 // the connection has been closed or closing 986 // ==> throw rebind exception 987 synchronized ( stateEvent ){ 988 if (state != CLOSE_RECVD) { 989 break; 990 } 991 throw wrapper.connectionCloseRebind() ; 992 } 993 994 default: 995 if (orb.transportDebugFlag) { 996 dprint(".writeLock: default: " + this); 997 } 998 // REVISIT 999 throw new RuntimeException(".writeLock: bad state"); 1000 } 1001 } 1002 } finally { 1003 if (dprintWriteLocks && orb.transportDebugFlag) { 1004 dprint(".writeLock<-: " + this); 1005 } 1006 } 1007 } 1008 writeUnlock()1009 public void writeUnlock() 1010 { 1011 try { 1012 if (dprintWriteLocks && orb.transportDebugFlag) { 1013 dprint(".writeUnlock->: " + this); 1014 } 1015 synchronized (writeEvent) { 1016 writeLocked = false; 1017 writeEvent.notify(); // wake up one guy waiting to write 1018 } 1019 } finally { 1020 if (dprintWriteLocks && orb.transportDebugFlag) { 1021 dprint(".writeUnlock<-: " + this); 1022 } 1023 } 1024 } 1025 1026 // Assumes the caller handles writeLock and writeUnlock sendWithoutLock(OutputObject outputObject)1027 public void sendWithoutLock(OutputObject outputObject) 1028 { 1029 // Don't we need to check for CloseConnection 1030 // here? REVISIT 1031 1032 // XREVISIT - Shouldn't the MessageMediator 1033 // be the one to handle writing the data here? 1034 1035 try { 1036 1037 // Write the fragment/message 1038 1039 CDROutputObject cdrOutputObject = (CDROutputObject) outputObject; 1040 cdrOutputObject.writeTo(this); 1041 // REVISIT - no flush? 
1042 //socket.getOutputStream().flush(); 1043 1044 } catch (IOException e1) { 1045 1046 /* 1047 * ADDED(Ram J) 10/13/2000 In the event of an IOException, try 1048 * sending a CancelRequest for regular requests / locate requests 1049 */ 1050 1051 // Since IIOPOutputStream's msgheader is set only once, and not 1052 // altered during sending multiple fragments, the original 1053 // msgheader will always have the requestId. 1054 // REVISIT This could be optimized to send a CancelRequest only 1055 // if any fragments had been sent already. 1056 1057 /* REVISIT: MOVE TO SUBCONTRACT 1058 Message msg = os.getMessage(); 1059 if (msg.getType() == Message.GIOPRequest || 1060 msg.getType() == Message.GIOPLocateRequest) { 1061 GIOPVersion requestVersion = msg.getGIOPVersion(); 1062 int requestId = MessageBase.getRequestId(msg); 1063 try { 1064 sendCancelRequest(requestVersion, requestId); 1065 } catch (IOException e2) { 1066 // most likely an abortive connection closure. 1067 // ignore, since nothing more can be done. 1068 if (orb.transportDebugFlag) { 1069 1070 } 1071 } 1072 */ 1073 1074 // REVISIT When a send failure happens, purgeCalls() need to be 1075 // called to ensure that the connection is properly removed from 1076 // further usage (ie., cancelling pending requests with COMM_FAILURE 1077 // with an appropriate minor_code CompletionStatus.MAY_BE). 1078 1079 // Relying on the IIOPOutputStream (as noted below) is not 1080 // sufficient as it handles COMM_FAILURE only for the final 1081 // fragment (during invoke processing). Note that COMM_FAILURE could 1082 // happen while sending the initial fragments. 1083 // Also the IIOPOutputStream does not properly close the connection. 1084 // It simply removes the connection from the table. An orderly 1085 // closure is needed (ie., cancel pending requests on the connection 1086 // COMM_FAILURE as well. 1087 1088 // IIOPOutputStream will cleanup the connection info when it 1089 // sees this exception. 
1090 SystemException exc = wrapper.writeErrorSend(e1); 1091 purgeCalls(exc, false, true); 1092 throw exc; 1093 } 1094 } 1095 registerWaiter(MessageMediator messageMediator)1096 public void registerWaiter(MessageMediator messageMediator) 1097 { 1098 responseWaitingRoom.registerWaiter(messageMediator); 1099 } 1100 unregisterWaiter(MessageMediator messageMediator)1101 public void unregisterWaiter(MessageMediator messageMediator) 1102 { 1103 responseWaitingRoom.unregisterWaiter(messageMediator); 1104 } 1105 waitForResponse(MessageMediator messageMediator)1106 public InputObject waitForResponse(MessageMediator messageMediator) 1107 { 1108 return responseWaitingRoom.waitForResponse(messageMediator); 1109 } 1110 setConnectionCache(ConnectionCache connectionCache)1111 public void setConnectionCache(ConnectionCache connectionCache) 1112 { 1113 this.connectionCache = connectionCache; 1114 } 1115 getConnectionCache()1116 public ConnectionCache getConnectionCache() 1117 { 1118 return connectionCache; 1119 } 1120 1121 //////////////////////////////////////////////////// 1122 // 1123 // EventHandler methods 1124 // 1125 setUseSelectThreadToWait(boolean x)1126 public void setUseSelectThreadToWait(boolean x) 1127 { 1128 useSelectThreadToWait = x; 1129 // REVISIT - Reading of a GIOP header only is information 1130 // that should be passed into the constructor 1131 // from the SocketOrChannelConnection factory. 
1132 setReadGiopHeaderOnly(shouldUseSelectThreadToWait()); 1133 } 1134 handleEvent()1135 public void handleEvent() 1136 { 1137 if (orb.transportDebugFlag) { 1138 dprint(".handleEvent->: " + this); 1139 } 1140 getSelectionKey().interestOps(getSelectionKey().interestOps() & 1141 (~ getInterestOps())); 1142 1143 if (shouldUseWorkerThreadForEvent()) { 1144 Throwable throwable = null; 1145 try { 1146 int poolToUse = 0; 1147 if (shouldReadGiopHeaderOnly()) { 1148 partialMessageMediator = readBits(); 1149 poolToUse = 1150 partialMessageMediator.getThreadPoolToUse(); 1151 } 1152 1153 if (orb.transportDebugFlag) { 1154 dprint(".handleEvent: addWork to pool: " + poolToUse); 1155 } 1156 orb.getThreadPoolManager().getThreadPool(poolToUse) 1157 .getWorkQueue(0).addWork(getWork()); 1158 } catch (NoSuchThreadPoolException e) { 1159 throwable = e; 1160 } catch (NoSuchWorkQueueException e) { 1161 throwable = e; 1162 } 1163 // REVISIT: need to close connection. 1164 if (throwable != null) { 1165 if (orb.transportDebugFlag) { 1166 dprint(".handleEvent: " + throwable); 1167 } 1168 INTERNAL i = new INTERNAL("NoSuchThreadPoolException"); 1169 i.initCause(throwable); 1170 throw i; 1171 } 1172 } else { 1173 if (orb.transportDebugFlag) { 1174 dprint(".handleEvent: doWork"); 1175 } 1176 getWork().doWork(); 1177 } 1178 if (orb.transportDebugFlag) { 1179 dprint(".handleEvent<-: " + this); 1180 } 1181 } 1182 getChannel()1183 public SelectableChannel getChannel() 1184 { 1185 return socketChannel; 1186 } 1187 getInterestOps()1188 public int getInterestOps() 1189 { 1190 return SelectionKey.OP_READ; 1191 } 1192 1193 // public Acceptor getAcceptor() - already defined above. 1194 getConnection()1195 public Connection getConnection() 1196 { 1197 return this; 1198 } 1199 1200 //////////////////////////////////////////////////// 1201 // 1202 // Work methods. 
1203 // 1204 getName()1205 public String getName() 1206 { 1207 return this.toString(); 1208 } 1209 doWork()1210 public void doWork() 1211 { 1212 try { 1213 if (orb.transportDebugFlag) { 1214 dprint(".doWork->: " + this); 1215 } 1216 1217 // IMPORTANT: Sanity checks on SelectionKeys such as 1218 // SelectorKey.isValid() should not be done 1219 // here. 1220 // 1221 1222 if (!shouldReadGiopHeaderOnly()) { 1223 read(); 1224 } 1225 else { 1226 // get the partialMessageMediator 1227 // created by SelectorThread 1228 CorbaMessageMediator messageMediator = 1229 this.getPartialMessageMediator(); 1230 1231 // read remaining info needed in a MessageMediator 1232 messageMediator = finishReadingBits(messageMediator); 1233 1234 if (messageMediator != null) { 1235 // Null can happen when client closes stream 1236 // causing purgecalls. 1237 dispatch(messageMediator); 1238 } 1239 } 1240 } catch (Throwable t) { 1241 if (orb.transportDebugFlag) { 1242 dprint(".doWork: ignoring Throwable: " 1243 + t 1244 + " " + this); 1245 } 1246 } finally { 1247 if (orb.transportDebugFlag) { 1248 dprint(".doWork<-: " + this); 1249 } 1250 } 1251 } 1252 setEnqueueTime(long timeInMillis)1253 public void setEnqueueTime(long timeInMillis) 1254 { 1255 enqueueTime = timeInMillis; 1256 } 1257 getEnqueueTime()1258 public long getEnqueueTime() 1259 { 1260 return enqueueTime; 1261 } 1262 1263 //////////////////////////////////////////////////// 1264 // 1265 // spi.transport.CorbaConnection. 1266 // 1267 1268 // IMPORTANT: Reader Threads must NOT read Giop header only. 
shouldReadGiopHeaderOnly()1269 public boolean shouldReadGiopHeaderOnly() { 1270 return shouldReadGiopHeaderOnly; 1271 } 1272 setReadGiopHeaderOnly(boolean shouldReadHeaderOnly)1273 protected void setReadGiopHeaderOnly(boolean shouldReadHeaderOnly) { 1274 shouldReadGiopHeaderOnly = shouldReadHeaderOnly; 1275 } 1276 getResponseWaitingRoom()1277 public ResponseWaitingRoom getResponseWaitingRoom() 1278 { 1279 return responseWaitingRoom; 1280 } 1281 1282 // REVISIT - inteface defines isServer but already defined in 1283 // higher interface. 1284 serverRequestMapPut(int requestId, CorbaMessageMediator messageMediator)1285 public void serverRequestMapPut(int requestId, 1286 CorbaMessageMediator messageMediator) 1287 { 1288 serverRequestMap.put(new Integer(requestId), messageMediator); 1289 } 1290 serverRequestMapGet(int requestId)1291 public CorbaMessageMediator serverRequestMapGet(int requestId) 1292 { 1293 return (CorbaMessageMediator) 1294 serverRequestMap.get(new Integer(requestId)); 1295 } 1296 serverRequestMapRemove(int requestId)1297 public void serverRequestMapRemove(int requestId) 1298 { 1299 serverRequestMap.remove(new Integer(requestId)); 1300 } 1301 1302 1303 // REVISIT: this is also defined in: 1304 // com.sun.corba.se.spi.legacy.connection.Connection getSocket()1305 public java.net.Socket getSocket() 1306 { 1307 return socket; 1308 } 1309 1310 /** It is possible for a Close Connection to have been 1311 ** sent here, but we will not check for this. A "lazy" 1312 ** Exception will be thrown in the Worker thread after the 1313 ** incoming request has been processed even though the connection 1314 ** is closed before the request is processed. This is o.k because 1315 ** it is a boundary condition. To prevent it we would have to add 1316 ** more locks which would reduce performance in the normal case. 
1317 **/ serverRequestProcessingBegins()1318 public synchronized void serverRequestProcessingBegins() 1319 { 1320 serverRequestCount++; 1321 } 1322 serverRequestProcessingEnds()1323 public synchronized void serverRequestProcessingEnds() 1324 { 1325 serverRequestCount--; 1326 } 1327 1328 // 1329 // 1330 // 1331 getNextRequestId()1332 public synchronized int getNextRequestId() 1333 { 1334 return requestId++; 1335 } 1336 1337 // Negotiated code sets for char and wchar data 1338 protected CodeSetComponentInfo.CodeSetContext codeSetContext = null; 1339 getBroker()1340 public ORB getBroker() 1341 { 1342 return orb; 1343 } 1344 getCodeSetContext()1345 public CodeSetComponentInfo.CodeSetContext getCodeSetContext() { 1346 // Needs to be synchronized for the following case when the client 1347 // doesn't send the code set context twice, and we have two threads 1348 // in ServerRequestDispatcher processCodeSetContext. 1349 // 1350 // Thread A checks to see if there is a context, there is none, so 1351 // it calls setCodeSetContext, getting the synch lock. 1352 // Thread B checks to see if there is a context. If we didn't synch, 1353 // it might decide to outlaw wchar/wstring. 1354 if (codeSetContext == null) { 1355 synchronized(this) { 1356 return codeSetContext; 1357 } 1358 } 1359 1360 return codeSetContext; 1361 } 1362 setCodeSetContext(CodeSetComponentInfo.CodeSetContext csc)1363 public synchronized void setCodeSetContext(CodeSetComponentInfo.CodeSetContext csc) { 1364 // Double check whether or not we need to do this 1365 if (codeSetContext == null) { 1366 1367 if (OSFCodeSetRegistry.lookupEntry(csc.getCharCodeSet()) == null || 1368 OSFCodeSetRegistry.lookupEntry(csc.getWCharCodeSet()) == null) { 1369 // If the client says it's negotiated a code set that 1370 // isn't a fallback and we never said we support, then 1371 // it has a bug. 
1372 throw wrapper.badCodesetsFromClient() ; 1373 } 1374 1375 codeSetContext = csc; 1376 } 1377 } 1378 1379 // 1380 // from iiop.IIOPConnection.java 1381 // 1382 1383 // Map request ID to an InputObject. 1384 // This is so the client thread can start unmarshaling 1385 // the reply and remove it from the out_calls map while the 1386 // ReaderThread can still obtain the input stream to give 1387 // new fragments. Only the ReaderThread touches the clientReplyMap, 1388 // so it doesn't incur synchronization overhead. 1389 clientRequestMapGet(int requestId)1390 public MessageMediator clientRequestMapGet(int requestId) 1391 { 1392 return responseWaitingRoom.getMessageMediator(requestId); 1393 } 1394 1395 protected MessageMediator clientReply_1_1; 1396 clientReply_1_1_Put(MessageMediator x)1397 public void clientReply_1_1_Put(MessageMediator x) 1398 { 1399 clientReply_1_1 = x; 1400 } 1401 clientReply_1_1_Get()1402 public MessageMediator clientReply_1_1_Get() 1403 { 1404 return clientReply_1_1; 1405 } 1406 clientReply_1_1_Remove()1407 public void clientReply_1_1_Remove() 1408 { 1409 clientReply_1_1 = null; 1410 } 1411 1412 protected MessageMediator serverRequest_1_1; 1413 serverRequest_1_1_Put(MessageMediator x)1414 public void serverRequest_1_1_Put(MessageMediator x) 1415 { 1416 serverRequest_1_1 = x; 1417 } 1418 serverRequest_1_1_Get()1419 public MessageMediator serverRequest_1_1_Get() 1420 { 1421 return serverRequest_1_1; 1422 } 1423 serverRequest_1_1_Remove()1424 public void serverRequest_1_1_Remove() 1425 { 1426 serverRequest_1_1 = null; 1427 } 1428 getStateString( int state )1429 protected String getStateString( int state ) 1430 { 1431 synchronized ( stateEvent ){ 1432 switch (state) { 1433 case OPENING : return "OPENING" ; 1434 case ESTABLISHED : return "ESTABLISHED" ; 1435 case CLOSE_SENT : return "CLOSE_SENT" ; 1436 case CLOSE_RECVD : return "CLOSE_RECVD" ; 1437 case ABORT : return "ABORT" ; 1438 default : return "???" 
; 1439 } 1440 } 1441 } 1442 isPostInitialContexts()1443 public synchronized boolean isPostInitialContexts() { 1444 return postInitialContexts; 1445 } 1446 1447 // Can never be unset... setPostInitialContexts()1448 public synchronized void setPostInitialContexts(){ 1449 postInitialContexts = true; 1450 } 1451 1452 /** 1453 * Wake up the outstanding requests on the connection, and hand them 1454 * COMM_FAILURE exception with a given minor code. 1455 * 1456 * Also, delete connection from connection table and 1457 * stop the reader thread. 1458 1459 * Note that this should only ever be called by the Reader thread for 1460 * this connection. 1461 * 1462 * @param minor_code The minor code for the COMM_FAILURE major code. 1463 * @param die Kill the reader thread (this thread) before exiting. 1464 */ purgeCalls(SystemException systemException, boolean die, boolean lockHeld)1465 public void purgeCalls(SystemException systemException, 1466 boolean die, boolean lockHeld) 1467 { 1468 int minor_code = systemException.minor; 1469 1470 try{ 1471 if (orb.transportDebugFlag) { 1472 dprint(".purgeCalls->: " 1473 + minor_code + "/" + die + "/" + lockHeld 1474 + " " + this); 1475 } 1476 1477 // If this invocation is a result of ThreadDeath caused 1478 // by a previous execution of this routine, just exit. 
1479 1480 synchronized ( stateEvent ){ 1481 if ((state == ABORT) || (state == CLOSE_RECVD)) { 1482 if (orb.transportDebugFlag) { 1483 dprint(".purgeCalls: exiting since state is: " 1484 + getStateString(state) 1485 + " " + this); 1486 } 1487 return; 1488 } 1489 } 1490 1491 // Grab the writeLock (freeze the calls) 1492 try { 1493 if (!lockHeld) { 1494 writeLock(); 1495 } 1496 } catch (SystemException ex) { 1497 if (orb.transportDebugFlag) 1498 dprint(".purgeCalls: SystemException" + ex 1499 + "; continuing " + this); 1500 } 1501 1502 // Mark the state of the connection 1503 // and determine the request status 1504 org.omg.CORBA.CompletionStatus completion_status; 1505 synchronized ( stateEvent ){ 1506 if (minor_code == ORBUtilSystemException.CONNECTION_REBIND) { 1507 state = CLOSE_RECVD; 1508 systemException.completed = CompletionStatus.COMPLETED_NO; 1509 } else { 1510 state = ABORT; 1511 systemException.completed = CompletionStatus.COMPLETED_MAYBE; 1512 } 1513 stateEvent.notifyAll(); 1514 } 1515 1516 try { 1517 socket.getInputStream().close(); 1518 socket.getOutputStream().close(); 1519 socket.close(); 1520 } catch (Exception ex) { 1521 if (orb.transportDebugFlag) { 1522 dprint(".purgeCalls: Exception closing socket: " + ex 1523 + " " + this); 1524 } 1525 } 1526 1527 // Signal all threads with outstanding requests on this 1528 // connection and give them the SystemException; 1529 1530 responseWaitingRoom.signalExceptionToAllWaiters(systemException); 1531 } finally { 1532 if (contactInfo != null) { 1533 ((OutboundConnectionCache)getConnectionCache()).remove(contactInfo); 1534 } else if (acceptor != null) { 1535 ((InboundConnectionCache)getConnectionCache()).remove(this); 1536 } 1537 1538 // 1539 // REVISIT: Stop the reader thread 1540 // 1541 1542 // Signal all the waiters of the writeLock. 1543 // There are 4 types of writeLock waiters: 1544 // 1. Send waiters: 1545 // 2. SendReply waiters: 1546 // 3. cleanUp waiters: 1547 // 4. 
purge_call waiters: 1548 // 1549 1550 writeUnlock(); 1551 1552 if (orb.transportDebugFlag) { 1553 dprint(".purgeCalls<-: " 1554 + minor_code + "/" + die + "/" + lockHeld 1555 + " " + this); 1556 } 1557 } 1558 } 1559 1560 /************************************************************************* 1561 * The following methods are for dealing with Connection cleaning for 1562 * better scalability of servers in high network load conditions. 1563 **************************************************************************/ 1564 sendCloseConnection(GIOPVersion giopVersion)1565 public void sendCloseConnection(GIOPVersion giopVersion) 1566 throws IOException 1567 { 1568 Message msg = MessageBase.createCloseConnection(giopVersion); 1569 sendHelper(giopVersion, msg); 1570 } 1571 sendMessageError(GIOPVersion giopVersion)1572 public void sendMessageError(GIOPVersion giopVersion) 1573 throws IOException 1574 { 1575 Message msg = MessageBase.createMessageError(giopVersion); 1576 sendHelper(giopVersion, msg); 1577 } 1578 1579 /** 1580 * Send a CancelRequest message. This does not lock the connection, so the 1581 * caller needs to ensure this method is called appropriately. 1582 * @exception IOException - could be due to abortive connection closure. 1583 */ sendCancelRequest(GIOPVersion giopVersion, int requestId)1584 public void sendCancelRequest(GIOPVersion giopVersion, int requestId) 1585 throws IOException 1586 { 1587 1588 Message msg = MessageBase.createCancelRequest(giopVersion, requestId); 1589 sendHelper(giopVersion, msg); 1590 } 1591 sendHelper(GIOPVersion giopVersion, Message msg)1592 protected void sendHelper(GIOPVersion giopVersion, Message msg) 1593 throws IOException 1594 { 1595 // REVISIT: See comments in CDROutputObject constructor. 
1596 CDROutputObject outputObject = 1597 sun.corba.OutputStreamFactory.newCDROutputObject((ORB)orb, null, giopVersion, 1598 this, msg, ORBConstants.STREAM_FORMAT_VERSION_1); 1599 msg.write(outputObject); 1600 1601 outputObject.writeTo(this); 1602 } 1603 sendCancelRequestWithLock(GIOPVersion giopVersion, int requestId)1604 public void sendCancelRequestWithLock(GIOPVersion giopVersion, 1605 int requestId) 1606 throws IOException 1607 { 1608 writeLock(); 1609 try { 1610 sendCancelRequest(giopVersion, requestId); 1611 } finally { 1612 writeUnlock(); 1613 } 1614 } 1615 1616 // Begin Code Base methods --------------------------------------- 1617 // 1618 // Set this connection's code base IOR. The IOR comes from the 1619 // SendingContext. This is an optional service context, but all 1620 // JavaSoft ORBs send it. 1621 // 1622 // The set and get methods don't need to be synchronized since the 1623 // first possible get would occur during reading a valuetype, and 1624 // that would be after the set. 1625 1626 // Sets this connection's code base IOR. This is done after 1627 // getting the IOR out of the SendingContext service context. 1628 // Our ORBs always send this, but it's optional in CORBA. 1629 setCodeBaseIOR(IOR ior)1630 public final void setCodeBaseIOR(IOR ior) { 1631 codeBaseServerIOR = ior; 1632 } 1633 getCodeBaseIOR()1634 public final IOR getCodeBaseIOR() { 1635 return codeBaseServerIOR; 1636 } 1637 1638 // Get a CodeBase stub to use in unmarshaling. The CachedCodeBase 1639 // won't connect to the remote codebase unless it's necessary. 
/** Returns the cached code base held by this connection. */
public final CodeBase getCodeBase() {
    return cachedCodeBase;
}

// End Code Base methods -----------------------------------------

/** Stores the transport read thresholds for this connection. */
protected void setReadTimeouts(ReadTimeouts readTimeouts) {
    this.readTimeouts = readTimeouts;
}

/** Stores the partially read message mediator. */
protected void setPartialMessageMediator(CorbaMessageMediator messageMediator) {
    partialMessageMediator = messageMediator;
}

/** Returns the partially read message mediator. */
protected CorbaMessageMediator getPartialMessageMediator() {
    return partialMessageMediator;
}

/**
 * Describes this connection: the socket channel (or the socket when there
 * is no channel), the state name, and the three mode flags, separated by
 * single spaces. Built while holding the stateEvent monitor.
 */
public String toString() {
    synchronized ( stateEvent ){
        StringBuilder text = new StringBuilder("SocketOrChannelConnectionImpl[");
        text.append(" ");
        if (socketChannel == null) {
            text.append(socket.toString());
        } else {
            text.append(socketChannel.toString());
        }
        text.append(" ").append(getStateString( state ));
        text.append(" ").append(shouldUseSelectThreadToWait());
        text.append(" ").append(shouldUseWorkerThreadForEvent());
        text.append(" ").append(shouldReadGiopHeaderOnly());
        text.append("]");
        return text.toString();
    }
}

// Must be public - used in encoding.
/** Emits a debug message tagged with this class's name via ORBUtility. */
public void dprint(String msg) {
    ORBUtility.dprint("SocketOrChannelConnectionImpl", msg);
}

/** Emits the debug message, then prints the throwable's stack trace to System.out. */
protected void dprint(String msg, Throwable t) {
    dprint(msg);
    t.printStackTrace(System.out);
}
} // closes the enclosing class declaration (opened above this chunk)

// End of file.