1 // 2 // Copyright (c) ZeroC, Inc. All rights reserved. 3 // 4 5 namespace Ice 6 { 7 using System; 8 using System.Collections.Generic; 9 using System.Diagnostics; 10 using System.Text; 11 using System.Threading; 12 using System.Threading.Tasks; 13 using System.Linq; 14 15 using Instrumentation; 16 using IceInternal; 17 18 public sealed class ConnectionI : IceInternal.EventHandler, ResponseHandler, CancellationHandler, Connection 19 { 20 public interface StartCallback 21 { connectionStartCompleted(ConnectionI connection)22 void connectionStartCompleted(ConnectionI connection); connectionStartFailed(ConnectionI connection, LocalException ex)23 void connectionStartFailed(ConnectionI connection, LocalException ex); 24 } 25 26 private class TimeoutCallback : TimerTask 27 { TimeoutCallback(ConnectionI connection)28 public TimeoutCallback(ConnectionI connection) 29 { 30 _connection = connection; 31 } 32 runTimerTask()33 public void runTimerTask() 34 { 35 _connection.timedOut(); 36 } 37 38 private ConnectionI _connection; 39 } 40 start(StartCallback callback)41 public void start(StartCallback callback) 42 { 43 try 44 { 45 lock(this) 46 { 47 // 48 // The connection might already be closed if the communicator was destroyed. 49 // 50 if(_state >= StateClosed) 51 { 52 Debug.Assert(_exception != null); 53 throw _exception; 54 } 55 56 if(!initialize(SocketOperation.None) || !validate(SocketOperation.None)) 57 { 58 _startCallback = callback; 59 return; 60 } 61 62 // 63 // We start out in holding state. 64 // 65 setState(StateHolding); 66 } 67 } 68 catch(LocalException ex) 69 { 70 exception(ex); 71 callback.connectionStartFailed(this, _exception); 72 return; 73 } 74 75 callback.connectionStartCompleted(this); 76 } 77 startAndWait()78 public void startAndWait() 79 { 80 try 81 { 82 lock(this) 83 { 84 // 85 // The connection might already be closed if the communicator was destroyed. 86 // 87 if(_state >= StateClosed) 88 { 89 Debug.Assert(_exception != null); 90 throw _exception; 91 } 92 93 if(!initialize(SocketOperation.None) || !validate(SocketOperation.None)) 94 { 95 // 96 // Wait for the connection to be validated. 97 // 98 while(_state <= StateNotValidated) 99 { 100 Monitor.Wait(this); 101 } 102 103 if(_state >= StateClosing) 104 { 105 Debug.Assert(_exception != null); 106 throw _exception; 107 } 108 } 109 110 // 111 // We start out in holding state. 112 // 113 setState(StateHolding); 114 } 115 } 116 catch(LocalException ex) 117 { 118 exception(ex); 119 waitUntilFinished(); 120 return; 121 } 122 } 123 activate()124 public void activate() 125 { 126 lock(this) 127 { 128 if(_state <= StateNotValidated) 129 { 130 return; 131 } 132 133 if(_acmLastActivity > -1) 134 { 135 _acmLastActivity = Time.currentMonotonicTimeMillis(); 136 } 137 setState(StateActive); 138 } 139 } 140 hold()141 public void hold() 142 { 143 lock(this) 144 { 145 if(_state <= StateNotValidated) 146 { 147 return; 148 } 149 150 setState(StateHolding); 151 } 152 } 153 154 // DestructionReason. 155 public const int ObjectAdapterDeactivated = 0; 156 public const int CommunicatorDestroyed = 1; 157 destroy(int reason)158 public void destroy(int reason) 159 { 160 lock(this) 161 { 162 switch(reason) 163 { 164 case ObjectAdapterDeactivated: 165 { 166 setState(StateClosing, new ObjectAdapterDeactivatedException()); 167 break; 168 } 169 170 case CommunicatorDestroyed: 171 { 172 setState(StateClosing, new CommunicatorDestroyedException()); 173 break; 174 } 175 } 176 } 177 } 178 close(ConnectionClose mode)179 public void close(ConnectionClose mode) 180 { 181 lock(this) 182 { 183 if(mode == ConnectionClose.Forcefully) 184 { 185 setState(StateClosed, new ConnectionManuallyClosedException(false)); 186 } 187 else if(mode == ConnectionClose.Gracefully) 188 { 189 setState(StateClosing, new ConnectionManuallyClosedException(true)); 190 } 191 else 192 { 193 Debug.Assert(mode == ConnectionClose.GracefullyWithWait); 194 195 // 196 // Wait until all outstanding requests have been completed. 197 // 198 while(_asyncRequests.Count != 0) 199 { 200 Monitor.Wait(this); 201 } 202 203 setState(StateClosing, new ConnectionManuallyClosedException(true)); 204 } 205 } 206 } 207 isActiveOrHolding()208 public bool isActiveOrHolding() 209 { 210 lock(this) 211 { 212 return _state > StateNotValidated && _state < StateClosing; 213 } 214 } 215 isFinished()216 public bool isFinished() 217 { 218 // 219 // We can use TryLock here, because as long as there are still 220 // threads operating in this connection object, connection 221 // destruction is considered as not yet finished. 222 // 223 if(!Monitor.TryEnter(this)) 224 { 225 return false; 226 } 227 228 try 229 { 230 if(_state != StateFinished || _dispatchCount != 0) 231 { 232 return false; 233 } 234 235 Debug.Assert(_state == StateFinished); 236 return true; 237 } 238 finally 239 { 240 Monitor.Exit(this); 241 } 242 } 243 throwException()244 public void throwException() 245 { 246 lock(this) 247 { 248 if(_exception != null) 249 { 250 Debug.Assert(_state >= StateClosing); 251 throw _exception; 252 } 253 } 254 } 255 waitUntilHolding()256 public void waitUntilHolding() 257 { 258 lock(this) 259 { 260 while(_state < StateHolding || _dispatchCount > 0) 261 { 262 Monitor.Wait(this); 263 } 264 } 265 } 266 waitUntilFinished()267 public void waitUntilFinished() 268 { 269 lock(this) 270 { 271 // 272 // We wait indefinitely until the connection is finished and all 273 // outstanding requests are completed. Otherwise we couldn't 274 // guarantee that there are no outstanding calls when deactivate() 275 // is called on the servant locators. 276 // 277 while(_state < StateFinished || _dispatchCount > 0) 278 { 279 Monitor.Wait(this); 280 } 281 282 Debug.Assert(_state == StateFinished); 283 284 // 285 // Clear the OA. See bug 1673 for the details of why this is necessary. 286 // 287 _adapter = null; 288 } 289 } 290 updateObserver()291 public void updateObserver() 292 { 293 lock(this) 294 { 295 if(_state < StateNotValidated || _state > StateClosed) 296 { 297 return; 298 } 299 300 Debug.Assert(_instance.initializationData().observer != null); 301 _observer = _instance.initializationData().observer.getConnectionObserver(initConnectionInfo(), 302 _endpoint, 303 toConnectionState(_state), 304 _observer); 305 if(_observer != null) 306 { 307 _observer.attach(); 308 } 309 else 310 { 311 _writeStreamPos = -1; 312 _readStreamPos = -1; 313 } 314 } 315 } 316 monitor(long now, ACMConfig acm)317 public void monitor(long now, ACMConfig acm) 318 { 319 lock(this) 320 { 321 if(_state != StateActive) 322 { 323 return; 324 } 325 326 // 327 // We send a heartbeat if there was no activity in the last 328 // (timeout / 4) period. Sending a heartbeat sooner than 329 // really needed is safer to ensure that the receiver will 330 // receive the heartbeat in time. Sending the heartbeat if 331 // there was no activity in the last (timeout / 2) period 332 // isn't enough since monitor() is called only every (timeout 333 // / 2) period. 334 // 335 // Note that this doesn't imply that we are sending 4 heartbeats 336 // per timeout period because the monitor() method is still only 337 // called every (timeout / 2) period. 338 // 339 if(acm.heartbeat == ACMHeartbeat.HeartbeatAlways || 340 (acm.heartbeat != ACMHeartbeat.HeartbeatOff && _writeStream.isEmpty() && 341 now >= (_acmLastActivity + acm.timeout / 4))) 342 { 343 if(acm.heartbeat != ACMHeartbeat.HeartbeatOnDispatch || _dispatchCount > 0) 344 { 345 sendHeartbeatNow(); 346 } 347 } 348 349 if(_readStream.size() > Protocol.headerSize || !_writeStream.isEmpty()) 350 { 351 // 352 // If writing or reading, nothing to do, the connection 353 // timeout will kick-in if writes or reads don't progress. 354 // This check is necessary because the actitivy timer is 355 // only set when a message is fully read/written. 356 // 357 return; 358 } 359 360 if(acm.close != ACMClose.CloseOff && now >= (_acmLastActivity + acm.timeout)) 361 { 362 if(acm.close == ACMClose.CloseOnIdleForceful || 363 (acm.close != ACMClose.CloseOnIdle && (_asyncRequests.Count > 0))) 364 { 365 // 366 // Close the connection if we didn't receive a heartbeat in 367 // the last period. 368 // 369 setState(StateClosed, new ConnectionTimeoutException()); 370 } 371 else if(acm.close != ACMClose.CloseOnInvocation && 372 _dispatchCount == 0 && _batchRequestQueue.isEmpty() && 373 _asyncRequests.Count == 0) 374 { 375 // 376 // The connection is idle, close it. 377 // 378 setState(StateClosing, new ConnectionTimeoutException()); 379 } 380 } 381 } 382 } 383 sendAsyncRequest(OutgoingAsyncBase og, bool compress, bool response, int batchRequestNum)384 public int sendAsyncRequest(OutgoingAsyncBase og, bool compress, bool response, 385 int batchRequestNum) 386 { 387 OutputStream os = og.getOs(); 388 389 lock(this) 390 { 391 // 392 // If the exception is closed before we even have a chance 393 // to send our request, we always try to send the request 394 // again. 395 // 396 if(_exception != null) 397 { 398 throw new RetryException(_exception); 399 } 400 401 Debug.Assert(_state > StateNotValidated); 402 Debug.Assert(_state < StateClosing); 403 404 // 405 // Ensure the message isn't bigger than what we can send with the 406 // transport. 407 // 408 _transceiver.checkSendSize(os.getBuffer()); 409 410 // 411 // Notify the request that it's cancelable with this connection. 412 // This will throw if the request is canceled. 413 // 414 og.cancelable(this); 415 int requestId = 0; 416 if(response) 417 { 418 // 419 // Create a new unique request ID. 420 // 421 requestId = _nextRequestId++; 422 if(requestId <= 0) 423 { 424 _nextRequestId = 1; 425 requestId = _nextRequestId++; 426 } 427 428 // 429 // Fill in the request ID. 430 // 431 os.pos(Protocol.headerSize); 432 os.writeInt(requestId); 433 } 434 else if(batchRequestNum > 0) 435 { 436 os.pos(Protocol.headerSize); 437 os.writeInt(batchRequestNum); 438 } 439 440 og.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId); 441 442 int status = OutgoingAsyncBase.AsyncStatusQueued; 443 try 444 { 445 OutgoingMessage message = new OutgoingMessage(og, os, compress, requestId); 446 status = sendMessage(message); 447 } 448 catch(LocalException ex) 449 { 450 setState(StateClosed, ex); 451 Debug.Assert(_exception != null); 452 throw _exception; 453 } 454 455 if(response) 456 { 457 // 458 // Add to the async requests map. 459 // 460 _asyncRequests[requestId] = og; 461 } 462 return status; 463 } 464 } 465 getBatchRequestQueue()466 public BatchRequestQueue getBatchRequestQueue() 467 { 468 return _batchRequestQueue; 469 } 470 flushBatchRequests(CompressBatch compressBatch)471 public void flushBatchRequests(CompressBatch compressBatch) 472 { 473 try 474 { 475 var completed = new FlushBatchTaskCompletionCallback(); 476 var outgoing = new ConnectionFlushBatchAsync(this, _instance, completed); 477 outgoing.invoke(_flushBatchRequests_name, compressBatch, true); 478 completed.Task.Wait(); 479 } 480 catch(AggregateException ex) 481 { 482 throw ex.InnerException; 483 } 484 } 485 486 private class ConnectionFlushBatchCompletionCallback : AsyncResultCompletionCallback 487 { ConnectionFlushBatchCompletionCallback(Connection connection, Communicator communicator, Instance instance, string op, object cookie, AsyncCallback callback)488 public ConnectionFlushBatchCompletionCallback(Connection connection, 489 Communicator communicator, 490 Instance instance, 491 string op, 492 object cookie, 493 AsyncCallback callback) 494 : base(communicator, instance, op, cookie, callback) 495 { 496 _connection = connection; 497 } 498 getConnection()499 public override Connection getConnection() 500 { 501 return _connection; 502 } 503 getCompletedCallback()504 protected override AsyncCallback getCompletedCallback() 505 { 506 return (AsyncResult result) => 507 { 508 try 509 { 510 result.throwLocalException(); 511 } 512 catch(Exception ex) 513 { 514 if(exceptionCallback_ != null) 515 { 516 exceptionCallback_.Invoke(ex); 517 } 518 } 519 }; 520 } 521 522 private Connection _connection; 523 } 524 flushBatchRequestsAsync(CompressBatch compressBatch, IProgress<bool> progress = null, CancellationToken cancel = new CancellationToken())525 public Task flushBatchRequestsAsync(CompressBatch compressBatch, 526 IProgress<bool> progress = null, 527 CancellationToken cancel = new CancellationToken()) 528 { 529 var completed = new FlushBatchTaskCompletionCallback(progress, cancel); 530 var outgoing = new ConnectionFlushBatchAsync(this, _instance, completed); 531 outgoing.invoke(_flushBatchRequests_name, compressBatch, false); 532 return completed.Task; 533 } 534 begin_flushBatchRequests(CompressBatch compressBatch, AsyncCallback cb = null, object cookie = null)535 public AsyncResult begin_flushBatchRequests(CompressBatch compressBatch, 536 AsyncCallback cb = null, 537 object cookie = null) 538 { 539 var result = new ConnectionFlushBatchCompletionCallback(this, _communicator, _instance, 540 _flushBatchRequests_name, cookie, cb); 541 var outgoing = new ConnectionFlushBatchAsync(this, _instance, result); 542 outgoing.invoke(_flushBatchRequests_name, compressBatch, false); 543 return result; 544 } 545 end_flushBatchRequests(AsyncResult r)546 public void end_flushBatchRequests(AsyncResult r) 547 { 548 if(r != null && r.getConnection() != this) 549 { 550 const string msg = "Connection for call to end_" + _flushBatchRequests_name + 551 " does not match connection that was used to call corresponding begin_" + 552 _flushBatchRequests_name + " method"; 553 throw new ArgumentException(msg); 554 } 555 AsyncResultI.check(r, _flushBatchRequests_name).wait(); 556 } 557 558 private const string _flushBatchRequests_name = "flushBatchRequests"; 559 setCloseCallback(CloseCallback callback)560 public void setCloseCallback(CloseCallback callback) 561 { 562 lock(this) 563 { 564 if(_state >= StateClosed) 565 { 566 if(callback != null) 567 { 568 _threadPool.dispatch(() => 569 { 570 try 571 { 572 callback(this); 573 } 574 catch(System.Exception ex) 575 { 576 _logger.error("connection callback exception:\n" + ex + '\n' + _desc); 577 } 578 } , this); 579 } 580 } 581 else 582 { 583 _closeCallback = callback; 584 } 585 } 586 } 587 setHeartbeatCallback(HeartbeatCallback callback)588 public void setHeartbeatCallback(HeartbeatCallback callback) 589 { 590 lock(this) 591 { 592 if(_state >= StateClosed) 593 { 594 return; 595 } 596 _heartbeatCallback = callback; 597 } 598 } 599 heartbeat()600 public void heartbeat() 601 { 602 heartbeatAsync().Wait(); 603 } 604 605 private class HeartbeatCompletionCallback : AsyncResultCompletionCallback 606 { HeartbeatCompletionCallback(Ice.Connection connection, Ice.Communicator communicator, Instance instance, object cookie, Ice.AsyncCallback callback)607 public HeartbeatCompletionCallback(Ice.Connection connection, 608 Ice.Communicator communicator, 609 Instance instance, 610 object cookie, 611 Ice.AsyncCallback callback) 612 : base(communicator, instance, "heartbeat", cookie, callback) 613 { 614 _connection = connection; 615 } 616 getConnection()617 public override Ice.Connection getConnection() 618 { 619 return _connection; 620 } 621 getCompletedCallback()622 protected override Ice.AsyncCallback getCompletedCallback() 623 { 624 return (Ice.AsyncResult result) => 625 { 626 try 627 { 628 result.throwLocalException(); 629 } 630 catch(Ice.Exception ex) 631 { 632 if(exceptionCallback_ != null) 633 { 634 exceptionCallback_.Invoke(ex); 635 } 636 } 637 }; 638 } 639 640 private Ice.Connection _connection; 641 } 642 643 private class HeartbeatTaskCompletionCallback : TaskCompletionCallback<object> 644 { HeartbeatTaskCompletionCallback(System.IProgress<bool> progress, CancellationToken cancellationToken)645 public HeartbeatTaskCompletionCallback(System.IProgress<bool> progress, 646 CancellationToken cancellationToken) : 647 base(progress, cancellationToken) 648 { 649 } 650 handleInvokeResponse(bool ok, OutgoingAsyncBase og)651 public override void handleInvokeResponse(bool ok, OutgoingAsyncBase og) 652 { 653 SetResult(null); 654 } 655 } 656 657 private class HeartbeatAsync : OutgoingAsyncBase 658 { HeartbeatAsync(Ice.ConnectionI connection, Instance instance, OutgoingAsyncCompletionCallback completionCallback)659 public HeartbeatAsync(Ice.ConnectionI connection, 660 Instance instance, 661 OutgoingAsyncCompletionCallback completionCallback) : 662 base(instance, completionCallback) 663 { 664 _connection = connection; 665 } 666 invoke()667 public void invoke() 668 { 669 try 670 { 671 os_.writeBlob(IceInternal.Protocol.magic); 672 ProtocolVersion.ice_write(os_, Ice.Util.currentProtocol); 673 EncodingVersion.ice_write(os_, Ice.Util.currentProtocolEncoding); 674 os_.writeByte(IceInternal.Protocol.validateConnectionMsg); 675 os_.writeByte((byte)0); 676 os_.writeInt(IceInternal.Protocol.headerSize); // Message size. 677 678 int status = _connection.sendAsyncRequest(this, false, false, 0); 679 680 if((status & AsyncStatusSent) != 0) 681 { 682 sentSynchronously_ = true; 683 if((status & AsyncStatusInvokeSentCallback) != 0) 684 { 685 invokeSent(); 686 } 687 } 688 } 689 catch(RetryException ex) 690 { 691 try 692 { 693 throw ex.get(); 694 } 695 catch(Ice.LocalException ee) 696 { 697 if(exception(ee)) 698 { 699 invokeExceptionAsync(); 700 } 701 } 702 } 703 catch(Ice.Exception ex) 704 { 705 if(exception(ex)) 706 { 707 invokeExceptionAsync(); 708 } 709 } 710 } 711 712 private readonly Ice.ConnectionI _connection; 713 } 714 heartbeatAsync(IProgress<bool> progress = null, CancellationToken cancel = new CancellationToken())715 public Task heartbeatAsync(IProgress<bool> progress = null, CancellationToken cancel = new CancellationToken()) 716 { 717 var completed = new HeartbeatTaskCompletionCallback(progress, cancel); 718 var outgoing = new HeartbeatAsync(this, _instance, completed); 719 outgoing.invoke(); 720 return completed.Task; 721 } 722 begin_heartbeat(AsyncCallback cb = null, object cookie = null)723 public AsyncResult begin_heartbeat(AsyncCallback cb = null, object cookie = null) 724 { 725 var result = new HeartbeatCompletionCallback(this, _communicator, _instance, cookie, cb); 726 var outgoing = new HeartbeatAsync(this, _instance, result); 727 outgoing.invoke(); 728 return result; 729 } 730 end_heartbeat(AsyncResult r)731 public void end_heartbeat(AsyncResult r) 732 { 733 if(r != null && r.getConnection() != this) 734 { 735 const string msg = "Connection for call to end_heartbeat does not match connection that was used " + 736 "to call corresponding begin_heartbeat method"; 737 throw new ArgumentException(msg); 738 } 739 AsyncResultI.check(r, "heartbeat").wait(); 740 } 741 setACM(Optional<int> timeout, Optional<ACMClose> close, Optional<ACMHeartbeat> heartbeat)742 public void setACM(Optional<int> timeout, Optional<ACMClose> close, Optional<ACMHeartbeat> heartbeat) 743 { 744 lock(this) 745 { 746 if(timeout.HasValue && timeout.Value < 0) 747 { 748 throw new ArgumentException("invalid negative ACM timeout value"); 749 } 750 if(_monitor == null || _state >= StateClosed) 751 { 752 return; 753 } 754 755 if(_state == StateActive) 756 { 757 _monitor.remove(this); 758 } 759 _monitor = _monitor.acm(timeout, close, heartbeat); 760 761 if(_monitor.getACM().timeout <= 0) 762 { 763 _acmLastActivity = -1; // Disable the recording of last activity. 764 } 765 else if(_state == StateActive && _acmLastActivity == -1) 766 { 767 _acmLastActivity = Time.currentMonotonicTimeMillis(); 768 } 769 770 if(_state == StateActive) 771 { 772 _monitor.add(this); 773 } 774 } 775 } 776 getACM()777 public ACM getACM() 778 { 779 lock(this) 780 { 781 return _monitor != null ? _monitor.getACM() : new ACM(0, ACMClose.CloseOff, ACMHeartbeat.HeartbeatOff); 782 } 783 } 784 asyncRequestCanceled(OutgoingAsyncBase outAsync, LocalException ex)785 public void asyncRequestCanceled(OutgoingAsyncBase outAsync, LocalException ex) 786 { 787 // 788 // NOTE: This isn't called from a thread pool thread. 789 // 790 791 lock(this) 792 { 793 if(_state >= StateClosed) 794 { 795 return; // The request has already been or will be shortly notified of the failure. 796 } 797 798 OutgoingMessage o = _sendStreams.FirstOrDefault(m => m.outAsync == outAsync); 799 if(o != null) 800 { 801 if(o.requestId > 0) 802 { 803 _asyncRequests.Remove(o.requestId); 804 } 805 806 if(ex is ConnectionTimeoutException) 807 { 808 setState(StateClosed, ex); 809 } 810 else 811 { 812 // 813 // If the request is being sent, don't remove it from the send streams, 814 // it will be removed once the sending is finished. 815 // 816 if(o == _sendStreams.First.Value) 817 { 818 o.canceled(); 819 } 820 else 821 { 822 o.canceled(); 823 _sendStreams.Remove(o); 824 } 825 if(outAsync.exception(ex)) 826 { 827 outAsync.invokeExceptionAsync(); 828 } 829 } 830 return; 831 } 832 833 if(outAsync is OutgoingAsync) 834 { 835 foreach(KeyValuePair<int, OutgoingAsyncBase> kvp in _asyncRequests) 836 { 837 if(kvp.Value == outAsync) 838 { 839 if(ex is ConnectionTimeoutException) 840 { 841 setState(StateClosed, ex); 842 } 843 else 844 { 845 _asyncRequests.Remove(kvp.Key); 846 if(outAsync.exception(ex)) 847 { 848 outAsync.invokeExceptionAsync(); 849 } 850 } 851 return; 852 } 853 } 854 } 855 } 856 } 857 sendResponse(int requestId, OutputStream os, byte compressFlag, bool amd)858 public void sendResponse(int requestId, OutputStream os, byte compressFlag, bool amd) 859 { 860 lock(this) 861 { 862 Debug.Assert(_state > StateNotValidated); 863 864 try 865 { 866 if(--_dispatchCount == 0) 867 { 868 if(_state == StateFinished) 869 { 870 reap(); 871 } 872 Monitor.PulseAll(this); 873 } 874 875 if(_state >= StateClosed) 876 { 877 Debug.Assert(_exception != null); 878 throw _exception; 879 } 880 881 sendMessage(new OutgoingMessage(os, compressFlag > 0, true)); 882 883 if(_state == StateClosing && _dispatchCount == 0) 884 { 885 initiateShutdown(); 886 } 887 } 888 catch(LocalException ex) 889 { 890 setState(StateClosed, ex); 891 } 892 } 893 } 894 sendNoResponse()895 public void sendNoResponse() 896 { 897 lock(this) 898 { 899 Debug.Assert(_state > StateNotValidated); 900 901 try 902 { 903 if(--_dispatchCount == 0) 904 { 905 if(_state == StateFinished) 906 { 907 reap(); 908 } 909 Monitor.PulseAll(this); 910 } 911 912 if(_state >= StateClosed) 913 { 914 Debug.Assert(_exception != null); 915 throw _exception; 916 } 917 918 if(_state == StateClosing && _dispatchCount == 0) 919 { 920 initiateShutdown(); 921 } 922 } 923 catch(LocalException ex) 924 { 925 setState(StateClosed, ex); 926 } 927 } 928 } 929 systemException(int requestId, SystemException ex, bool amd)930 public bool systemException(int requestId, SystemException ex, bool amd) 931 { 932 return false; // System exceptions aren't marshalled. 933 } 934 invokeException(int requestId, LocalException ex, int invokeNum, bool amd)935 public void invokeException(int requestId, LocalException ex, int invokeNum, bool amd) 936 { 937 // 938 // Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't 939 // called in case of a fatal exception we decrement _dispatchCount here. 940 // 941 942 lock(this) 943 { 944 setState(StateClosed, ex); 945 946 if(invokeNum > 0) 947 { 948 Debug.Assert(_dispatchCount >= invokeNum); 949 _dispatchCount -= invokeNum; 950 if(_dispatchCount == 0) 951 { 952 if(_state == StateFinished) 953 { 954 reap(); 955 } 956 Monitor.PulseAll(this); 957 } 958 } 959 } 960 } 961 endpoint()962 public EndpointI endpoint() 963 { 964 return _endpoint; // No mutex protection necessary, _endpoint is immutable. 965 } 966 connector()967 public Connector connector() 968 { 969 return _connector; // No mutex protection necessary, _endpoint is immutable. 970 } 971 setAdapter(ObjectAdapter adapter)972 public void setAdapter(ObjectAdapter adapter) 973 { 974 if(adapter != null) 975 { 976 // Go through the adapter to set the adapter and servant manager on this connection 977 // to ensure the object adapter is still active. 978 ((ObjectAdapterI)adapter).setAdapterOnConnection(this); 979 } 980 else 981 { 982 lock(this) 983 { 984 if(_state <= StateNotValidated || _state >= StateClosing) 985 { 986 return; 987 } 988 _adapter = null; 989 _servantManager = null; 990 } 991 } 992 993 // 994 // We never change the thread pool with which we were initially 995 // registered, even if we add or remove an object adapter. 996 // 997 } 998 getAdapter()999 public ObjectAdapter getAdapter() 1000 { 1001 lock(this) 1002 { 1003 return _adapter; 1004 } 1005 } 1006 getEndpoint()1007 public Endpoint getEndpoint() 1008 { 1009 return _endpoint; // No mutex protection necessary, _endpoint is immutable. 1010 } 1011 createProxy(Identity ident)1012 public ObjectPrx createProxy(Identity ident) 1013 { 1014 // 1015 // Create a reference and return a reverse proxy for this 1016 // reference. 1017 // 1018 return _instance.proxyFactory().referenceToProxy(_instance.referenceFactory().create(ident, this)); 1019 } 1020 setAdapterAndServantManager(ObjectAdapter adapter, IceInternal.ServantManager servantManager)1021 public void setAdapterAndServantManager(ObjectAdapter adapter, IceInternal.ServantManager servantManager) 1022 { 1023 lock(this) 1024 { 1025 if(_state <= StateNotValidated || _state >= StateClosing) 1026 { 1027 return; 1028 } 1029 Debug.Assert(adapter != null); // Called by ObjectAdapterI::setAdapterOnConnection 1030 _adapter = adapter; 1031 _servantManager = servantManager; 1032 } 1033 } 1034 1035 // 1036 // Operations from EventHandler 1037 // startAsync(int operation, IceInternal.AsyncCallback cb, ref bool completedSynchronously)1038 public override bool startAsync(int operation, IceInternal.AsyncCallback cb, ref bool completedSynchronously) 1039 { 1040 if(_state >= StateClosed) 1041 { 1042 return false; 1043 } 1044 1045 try 1046 { 1047 if((operation & SocketOperation.Write) != 0) 1048 { 1049 if(_observer != null) 1050 { 1051 observerStartWrite(_writeStream.getBuffer()); 1052 } 1053 1054 bool completed; 1055 completedSynchronously = _transceiver.startWrite(_writeStream.getBuffer(), cb, this, out completed); 1056 if(completed && _sendStreams.Count > 0) 1057 { 1058 // The whole message is written, assume it's sent now for at-most-once semantics. 1059 _sendStreams.First.Value.isSent = true; 1060 } 1061 } 1062 else if((operation & SocketOperation.Read) != 0) 1063 { 1064 if(_observer != null && !_readHeader) 1065 { 1066 observerStartRead(_readStream.getBuffer()); 1067 } 1068 1069 completedSynchronously = _transceiver.startRead(_readStream.getBuffer(), cb, this); 1070 } 1071 } 1072 catch(LocalException ex) 1073 { 1074 setState(StateClosed, ex); 1075 return false; 1076 } 1077 return true; 1078 } 1079 finishAsync(int operation)1080 public override bool finishAsync(int operation) 1081 { 1082 try 1083 { 1084 if((operation & SocketOperation.Write) != 0) 1085 { 1086 IceInternal.Buffer buf = _writeStream.getBuffer(); 1087 int start = buf.b.position(); 1088 _transceiver.finishWrite(buf); 1089 if(_instance.traceLevels().network >= 3 && buf.b.position() != start) 1090 { 1091 StringBuilder s = new StringBuilder("sent "); 1092 s.Append(buf.b.position() - start); 1093 if(!_endpoint.datagram()) 1094 { 1095 s.Append(" of "); 1096 s.Append(buf.b.limit() - start); 1097 } 1098 s.Append(" bytes via "); 1099 s.Append(_endpoint.protocol()); 1100 s.Append("\n"); 1101 s.Append(ToString()); 1102 _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString()); 1103 } 1104 1105 if(_observer != null) 1106 { 1107 observerFinishWrite(_writeStream.getBuffer()); 1108 } 1109 } 1110 else if((operation & SocketOperation.Read) != 0) 1111 { 1112 IceInternal.Buffer buf = _readStream.getBuffer(); 1113 int start = buf.b.position(); 1114 _transceiver.finishRead(buf); 1115 if(_instance.traceLevels().network >= 3 && buf.b.position() != start) 1116 { 1117 StringBuilder s = new StringBuilder("received "); 1118 if(_endpoint.datagram()) 1119 { 1120 s.Append(buf.b.limit()); 1121 } 1122 else 1123 { 1124 s.Append(buf.b.position() - start); 1125 s.Append(" of "); 1126 s.Append(buf.b.limit() - start); 1127 } 1128 s.Append(" bytes via "); 1129 s.Append(_endpoint.protocol()); 1130 s.Append("\n"); 1131 s.Append(ToString()); 1132 _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString()); 1133 } 1134 1135 if(_observer != null && !_readHeader) 1136 { 1137 observerFinishRead(_readStream.getBuffer()); 1138 } 1139 } 1140 } 1141 catch(LocalException ex) 1142 { 1143 setState(StateClosed, ex); 1144 } 1145 return _state < StateClosed; 1146 } 1147 message(ref ThreadPoolCurrent current)1148 public override void message(ref ThreadPoolCurrent current) 1149 { 1150 StartCallback startCB = null; 1151 Queue<OutgoingMessage> sentCBs = null; 1152 MessageInfo info = new MessageInfo(); 1153 int dispatchCount = 0; 1154 1155 ThreadPoolMessage msg = new ThreadPoolMessage(this); 1156 try 1157 { 1158 lock(this) 1159 { 1160 if(!msg.startIOScope(ref current)) 1161 { 1162 return; 1163 } 1164 1165 if(_state >= StateClosed) 1166 { 1167 return; 1168 } 1169 1170 int readyOp = current.operation; 1171 try 1172 { 1173 unscheduleTimeout(current.operation); 1174 1175 int writeOp = SocketOperation.None; 1176 int readOp = SocketOperation.None; 1177 if((readyOp & SocketOperation.Write) != 0) 1178 { 1179 if(_observer != null) 1180 { 1181 observerStartWrite(_writeStream.getBuffer()); 1182 } 1183 writeOp = write(_writeStream.getBuffer()); 1184 if(_observer != null && (writeOp & SocketOperation.Write) == 0) 1185 { 1186 observerFinishWrite(_writeStream.getBuffer()); 1187 } 1188 } 1189 1190 while((readyOp & SocketOperation.Read) != 0) 1191 { 1192 IceInternal.Buffer buf = _readStream.getBuffer(); 1193 1194 if(_observer != null && !_readHeader) 1195 { 1196 observerStartRead(buf); 1197 } 1198 1199 readOp = read(buf); 1200 if((readOp & SocketOperation.Read) != 0) 1201 { 1202 break; 1203 } 1204 if(_observer != null && !_readHeader) 1205 { 1206 Debug.Assert(!buf.b.hasRemaining()); 1207 observerFinishRead(buf); 1208 } 1209 1210 if(_readHeader) // Read header if necessary. 1211 { 1212 _readHeader = false; 1213 1214 if(_observer != null) 1215 { 1216 _observer.receivedBytes(Protocol.headerSize); 1217 } 1218 1219 int pos = _readStream.pos(); 1220 if(pos < Protocol.headerSize) 1221 { 1222 // 1223 // This situation is possible for small UDP packets. 1224 // 1225 throw new IllegalMessageSizeException(); 1226 } 1227 1228 _readStream.pos(0); 1229 byte[] m = new byte[4]; 1230 m[0] = _readStream.readByte(); 1231 m[1] = _readStream.readByte(); 1232 m[2] = _readStream.readByte(); 1233 m[3] = _readStream.readByte(); 1234 if(m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1] || 1235 m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3]) 1236 { 1237 BadMagicException ex = new BadMagicException(); 1238 ex.badMagic = m; 1239 throw ex; 1240 } 1241 1242 ProtocolVersion pv = new ProtocolVersion(); 1243 pv.ice_readMembers(_readStream); 1244 Protocol.checkSupportedProtocol(pv); 1245 EncodingVersion ev = new EncodingVersion(); 1246 ev.ice_readMembers(_readStream); 1247 Protocol.checkSupportedProtocolEncoding(ev); 1248 1249 _readStream.readByte(); // messageType 1250 _readStream.readByte(); // compress 1251 int size = _readStream.readInt(); 1252 if(size < Protocol.headerSize) 1253 { 1254 throw new IllegalMessageSizeException(); 1255 } 1256 if(size > _messageSizeMax) 1257 { 1258 Ex.throwMemoryLimitException(size, _messageSizeMax); 1259 } 1260 if(size > _readStream.size()) 1261 { 1262 _readStream.resize(size); 1263 } 1264 _readStream.pos(pos); 1265 } 1266 1267 if(buf.b.hasRemaining()) 1268 { 1269 if(_endpoint.datagram()) 1270 { 1271 throw new DatagramLimitException(); // The message was truncated. 1272 } 1273 continue; 1274 } 1275 break; 1276 } 1277 1278 int newOp = readOp | writeOp; 1279 readyOp &= ~newOp; 1280 Debug.Assert(readyOp != 0 || newOp != 0); 1281 1282 if(_state <= StateNotValidated) 1283 { 1284 if(newOp != 0) 1285 { 1286 // 1287 // Wait for all the transceiver conditions to be 1288 // satisfied before continuing. 1289 // 1290 scheduleTimeout(newOp); 1291 _threadPool.update(this, current.operation, newOp); 1292 return; 1293 } 1294 1295 if(_state == StateNotInitialized && !initialize(current.operation)) 1296 { 1297 return; 1298 } 1299 1300 if(_state <= StateNotValidated && !validate(current.operation)) 1301 { 1302 return; 1303 } 1304 1305 _threadPool.unregister(this, current.operation); 1306 1307 // 1308 // We start out in holding state. 1309 // 1310 setState(StateHolding); 1311 if(_startCallback != null) 1312 { 1313 startCB = _startCallback; 1314 _startCallback = null; 1315 if(startCB != null) 1316 { 1317 ++dispatchCount; 1318 } 1319 } 1320 } 1321 else 1322 { 1323 Debug.Assert(_state <= StateClosingPending); 1324 1325 // 1326 // We parse messages first, if we receive a close 1327 // connection message we won't send more messages. 1328 // 1329 if((readyOp & SocketOperation.Read) != 0) 1330 { 1331 newOp |= parseMessage(ref info); 1332 dispatchCount += info.messageDispatchCount; 1333 } 1334 1335 if((readyOp & SocketOperation.Write) != 0) 1336 { 1337 newOp |= sendNextMessage(out sentCBs); 1338 if(sentCBs != null) 1339 { 1340 ++dispatchCount; 1341 } 1342 } 1343 1344 if(_state < StateClosed) 1345 { 1346 scheduleTimeout(newOp); 1347 _threadPool.update(this, current.operation, newOp); 1348 } 1349 } 1350 1351 if(_acmLastActivity > -1) 1352 { 1353 _acmLastActivity = Time.currentMonotonicTimeMillis(); 1354 } 1355 1356 if(dispatchCount == 0) 1357 { 1358 return; // Nothing to dispatch we're done! 1359 } 1360 1361 _dispatchCount += dispatchCount; 1362 1363 msg.completed(ref current); 1364 } 1365 catch(DatagramLimitException) // Expected. 1366 { 1367 if(_warnUdp) 1368 { 1369 _logger.warning(string.Format("maximum datagram size of {0} exceeded", _readStream.pos())); 1370 } 1371 _readStream.resize(Protocol.headerSize); 1372 _readStream.pos(0); 1373 _readHeader = true; 1374 return; 1375 } 1376 catch(SocketException ex) 1377 { 1378 setState(StateClosed, ex); 1379 return; 1380 } 1381 catch(LocalException ex) 1382 { 1383 if(_endpoint.datagram()) 1384 { 1385 if(_warn) 1386 { 1387 _logger.warning(string.Format("datagram connection exception:\n{0}\n{1}", ex, _desc)); 1388 } 1389 _readStream.resize(Protocol.headerSize); 1390 _readStream.pos(0); 1391 _readHeader = true; 1392 } 1393 else 1394 { 1395 setState(StateClosed, ex); 1396 } 1397 return; 1398 } 1399 1400 ThreadPoolCurrent c = current; 1401 _threadPool.dispatch(() => 1402 { 1403 dispatch(startCB, sentCBs, info); 1404 msg.destroy(ref c); 1405 }, this); 1406 } 1407 } 1408 finally 1409 { 1410 msg.finishIOScope(ref current); 1411 } 1412 1413 } 1414 dispatch(StartCallback startCB, Queue<OutgoingMessage> sentCBs, MessageInfo info)1415 private void dispatch(StartCallback startCB, Queue<OutgoingMessage> sentCBs, MessageInfo info) 1416 { 1417 int dispatchedCount = 0; 1418 1419 // 1420 // Notify the factory that the connection establishment and 1421 // validation has completed. 1422 // 1423 if(startCB != null) 1424 { 1425 startCB.connectionStartCompleted(this); 1426 ++dispatchedCount; 1427 } 1428 1429 // 1430 // Notify AMI calls that the message was sent. 1431 // 1432 if(sentCBs != null) 1433 { 1434 foreach(OutgoingMessage m in sentCBs) 1435 { 1436 if(m.invokeSent) 1437 { 1438 m.outAsync.invokeSent(); 1439 } 1440 if(m.receivedReply) 1441 { 1442 OutgoingAsync outAsync = (OutgoingAsync)m.outAsync; 1443 if(outAsync.response()) 1444 { 1445 outAsync.invokeResponse(); 1446 } 1447 } 1448 } 1449 ++dispatchedCount; 1450 } 1451 1452 // 1453 // Asynchronous replies must be handled outside the thread 1454 // synchronization, so that nested calls are possible. 1455 // 1456 if(info.outAsync != null) 1457 { 1458 info.outAsync.invokeResponse(); 1459 ++dispatchedCount; 1460 } 1461 1462 if(info.heartbeatCallback != null) 1463 { 1464 try 1465 { 1466 info.heartbeatCallback(this); 1467 } 1468 catch(System.Exception ex) 1469 { 1470 _logger.error("connection callback exception:\n" + ex + '\n' + _desc); 1471 } 1472 ++dispatchedCount; 1473 } 1474 1475 // 1476 // Method invocation (or multiple invocations for batch messages) 1477 // must be done outside the thread synchronization, so that nested 1478 // calls are possible. 1479 // 1480 if(info.invokeNum > 0) 1481 { 1482 invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, 1483 info.adapter); 1484 1485 // 1486 // Don't increase dispatchedCount, the dispatch count is 1487 // decreased when the incoming reply is sent. 1488 // 1489 } 1490 1491 // 1492 // Decrease dispatch count. 1493 // 1494 if(dispatchedCount > 0) 1495 { 1496 lock(this) 1497 { 1498 _dispatchCount -= dispatchedCount; 1499 if(_dispatchCount == 0) 1500 { 1501 // 1502 // Only initiate shutdown if not already done. It 1503 // might have already been done if the sent callback 1504 // or AMI callback was dispatched when the connection 1505 // was already in the closing state. 1506 // 1507 if(_state == StateClosing) 1508 { 1509 try 1510 { 1511 initiateShutdown(); 1512 } 1513 catch(Ice.LocalException ex) 1514 { 1515 setState(StateClosed, ex); 1516 } 1517 } 1518 else if(_state == StateFinished) 1519 { 1520 reap(); 1521 } 1522 Monitor.PulseAll(this); 1523 } 1524 } 1525 } 1526 } 1527 finished(ref ThreadPoolCurrent current)1528 public override void finished(ref ThreadPoolCurrent current) 1529 { 1530 lock(this) 1531 { 1532 Debug.Assert(_state == StateClosed); 1533 unscheduleTimeout(SocketOperation.Read | SocketOperation.Write); 1534 } 1535 1536 // 1537 // If there are no callbacks to call, we don't call ioCompleted() since we're not going 1538 // to call code that will potentially block (this avoids promoting a new leader and 1539 // unecessary thread creation, especially if this is called on shutdown). 1540 // 1541 if(_startCallback == null && _sendStreams.Count == 0 && _asyncRequests.Count == 0 && 1542 _closeCallback == null && _heartbeatCallback == null) 1543 { 1544 finish(); 1545 return; 1546 } 1547 1548 // 1549 // Unlike C++/Java, this method is called from an IO thread of the .NET thread 1550 // pool of from the communicator async IO thread. While it's fine to handle the 1551 // non-blocking activity of the connection from these threads, the dispatching 1552 // of the message must be taken care of by the Ice thread pool. 1553 // 1554 _threadPool.dispatch(finish, this); 1555 } 1556 finish()1557 private void finish() 1558 { 1559 if(!_initialized) 1560 { 1561 if(_instance.traceLevels().network >= 2) 1562 { 1563 StringBuilder s = new StringBuilder("failed to "); 1564 s.Append(_connector != null ? "establish" : "accept"); 1565 s.Append(" "); 1566 s.Append(_endpoint.protocol()); 1567 s.Append(" connection\n"); 1568 s.Append(ToString()); 1569 s.Append("\n"); 1570 s.Append(_exception); 1571 _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString()); 1572 } 1573 } 1574 else 1575 { 1576 if(_instance.traceLevels().network >= 1) 1577 { 1578 StringBuilder s = new StringBuilder("closed "); 1579 s.Append(_endpoint.protocol()); 1580 s.Append(" connection\n"); 1581 s.Append(ToString()); 1582 1583 // 1584 // Trace the cause of unexpected connection closures 1585 // 1586 if(!(_exception is CloseConnectionException || 1587 _exception is ConnectionManuallyClosedException || 1588 _exception is ConnectionTimeoutException || 1589 _exception is CommunicatorDestroyedException || 1590 _exception is ObjectAdapterDeactivatedException)) 1591 { 1592 s.Append("\n"); 1593 s.Append(_exception); 1594 } 1595 1596 _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString()); 1597 } 1598 } 1599 1600 if(_startCallback != null) 1601 { 1602 _startCallback.connectionStartFailed(this, _exception); 1603 _startCallback = null; 1604 } 1605 1606 if(_sendStreams.Count > 0) 1607 { 1608 if(!_writeStream.isEmpty()) 1609 { 1610 // 1611 // Return the stream to the outgoing call. This is important for 1612 // retriable AMI calls which are not marshalled again. 1613 // 1614 OutgoingMessage message = _sendStreams.First.Value; 1615 _writeStream.swap(message.stream); 1616 1617 // 1618 // The current message might be sent but not yet removed from _sendStreams. If 1619 // the response has been received in the meantime, we remove the message from 1620 // _sendStreams to not call finished on a message which is already done. 1621 // 1622 if(message.isSent || message.receivedReply) 1623 { 1624 if(message.sent() && message.invokeSent) 1625 { 1626 message.outAsync.invokeSent(); 1627 } 1628 if(message.receivedReply) 1629 { 1630 OutgoingAsync outAsync = (OutgoingAsync)message.outAsync; 1631 if(outAsync.response()) 1632 { 1633 outAsync.invokeResponse(); 1634 } 1635 } 1636 _sendStreams.RemoveFirst(); 1637 } 1638 } 1639 1640 foreach (OutgoingMessage o in _sendStreams) 1641 { 1642 o.completed(_exception); 1643 if(o.requestId > 0) // Make sure finished isn't called twice. 1644 { 1645 _asyncRequests.Remove(o.requestId); 1646 } 1647 } 1648 _sendStreams.Clear(); // Must be cleared before _requests because of Outgoing* references in OutgoingMessage 1649 } 1650 1651 foreach(OutgoingAsyncBase o in _asyncRequests.Values) 1652 { 1653 if(o.exception(_exception)) 1654 { 1655 o.invokeException(); 1656 } 1657 } 1658 _asyncRequests.Clear(); 1659 1660 // 1661 // Don't wait to be reaped to reclaim memory allocated by read/write streams. 1662 // 1663 _writeStream.clear(); 1664 _writeStream.getBuffer().clear(); 1665 _readStream.clear(); 1666 _readStream.getBuffer().clear(); 1667 _incomingCache = null; 1668 1669 if(_closeCallback != null) 1670 { 1671 try 1672 { 1673 _closeCallback(this); 1674 } 1675 catch(System.Exception ex) 1676 { 1677 _logger.error("connection callback exception:\n" + ex + '\n' + _desc); 1678 } 1679 _closeCallback = null; 1680 } 1681 1682 _heartbeatCallback = null; 1683 1684 // 1685 // This must be done last as this will cause waitUntilFinished() to return (and communicator 1686 // objects such as the timer might be destroyed too). 1687 // 1688 lock(this) 1689 { 1690 setState(StateFinished); 1691 1692 if(_dispatchCount == 0) 1693 { 1694 reap(); 1695 } 1696 } 1697 } 1698 ToString()1699 public override string ToString() 1700 { 1701 return _desc; // No mutex lock, _desc is immutable. 1702 } 1703 timedOut()1704 public void timedOut() 1705 { 1706 lock(this) 1707 { 1708 if(_state <= StateNotValidated) 1709 { 1710 setState(StateClosed, new ConnectTimeoutException()); 1711 } 1712 else if(_state < StateClosing) 1713 { 1714 setState(StateClosed, new TimeoutException()); 1715 } 1716 else if(_state < StateClosed) 1717 { 1718 setState(StateClosed, new CloseTimeoutException()); 1719 } 1720 } 1721 } 1722 type()1723 public string type() 1724 { 1725 return _type; // No mutex lock, _type is immutable. 1726 } 1727 timeout()1728 public int timeout() 1729 { 1730 return _endpoint.timeout(); // No mutex protection necessary, _endpoint is immutable. 1731 } 1732 getInfo()1733 public ConnectionInfo getInfo() 1734 { 1735 lock(this) 1736 { 1737 if(_state >= StateClosed) 1738 { 1739 throw _exception; 1740 } 1741 return initConnectionInfo(); 1742 } 1743 } 1744 setBufferSize(int rcvSize, int sndSize)1745 public void setBufferSize(int rcvSize, int sndSize) 1746 { 1747 lock(this) 1748 { 1749 if(_state >= StateClosed) 1750 { 1751 throw _exception; 1752 } 1753 _transceiver.setBufferSize(rcvSize, sndSize); 1754 _info = null; // Invalidate the cached connection info 1755 } 1756 } 1757 ice_toString_()1758 public string ice_toString_() 1759 { 1760 return ToString(); 1761 } 1762 exception(LocalException ex)1763 public void exception(LocalException ex) 1764 { 1765 lock(this) 1766 { 1767 setState(StateClosed, ex); 1768 } 1769 } 1770 getThreadPool()1771 public IceInternal.ThreadPool getThreadPool() 1772 { 1773 return _threadPool; 1774 } 1775 ConnectionI()1776 static ConnectionI() 1777 { 1778 _compressionSupported = IceInternal.BZip2.supported(); 1779 } 1780 ConnectionI(Communicator communicator, Instance instance, ACMMonitor monitor, Transceiver transceiver, Connector connector, EndpointI endpoint, ObjectAdapterI adapter)1781 internal ConnectionI(Communicator communicator, Instance instance, ACMMonitor monitor, Transceiver transceiver, 1782 Connector connector, EndpointI endpoint, ObjectAdapterI adapter) 1783 { 1784 _communicator = communicator; 1785 _instance = instance; 1786 _monitor = monitor; 1787 _transceiver = transceiver; 1788 _desc = transceiver.ToString(); 1789 _type = transceiver.protocol(); 1790 _connector = connector; 1791 _endpoint = endpoint; 1792 _adapter = adapter; 1793 InitializationData initData = instance.initializationData(); 1794 _logger = initData.logger; // Cached for better performance. 1795 _traceLevels = instance.traceLevels(); // Cached for better performance. 1796 _timer = instance.timer(); 1797 _writeTimeout = new TimeoutCallback(this); 1798 _writeTimeoutScheduled = false; 1799 _readTimeout = new TimeoutCallback(this); 1800 _readTimeoutScheduled = false; 1801 _warn = initData.properties.getPropertyAsInt("Ice.Warn.Connections") > 0; 1802 _warnUdp = initData.properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0; 1803 _cacheBuffers = instance.cacheMessageBuffers() > 0; 1804 if(_monitor != null && _monitor.getACM().timeout > 0) 1805 { 1806 _acmLastActivity = Time.currentMonotonicTimeMillis(); 1807 } 1808 else 1809 { 1810 _acmLastActivity = -1; 1811 } 1812 _nextRequestId = 1; 1813 _messageSizeMax = adapter != null ? adapter.messageSizeMax() : instance.messageSizeMax(); 1814 _batchRequestQueue = new BatchRequestQueue(instance, _endpoint.datagram()); 1815 _readStream = new InputStream(instance, Util.currentProtocolEncoding); 1816 _readHeader = false; 1817 _readStreamPos = -1; 1818 _writeStream = new OutputStream(instance, Util.currentProtocolEncoding); 1819 _writeStreamPos = -1; 1820 _dispatchCount = 0; 1821 _state = StateNotInitialized; 1822 1823 _compressionLevel = initData.properties.getPropertyAsIntWithDefault("Ice.Compression.Level", 1); 1824 if(_compressionLevel < 1) 1825 { 1826 _compressionLevel = 1; 1827 } 1828 else if(_compressionLevel > 9) 1829 { 1830 _compressionLevel = 9; 1831 } 1832 1833 if(adapter != null) 1834 { 1835 _servantManager = adapter.getServantManager(); 1836 } 1837 1838 try 1839 { 1840 if(adapter != null) 1841 { 1842 _threadPool = adapter.getThreadPool(); 1843 } 1844 else 1845 { 1846 _threadPool = instance.clientThreadPool(); 1847 } 1848 _threadPool.initialize(this); 1849 } 1850 catch(LocalException) 1851 { 1852 throw; 1853 } 1854 catch(System.Exception ex) 1855 { 1856 throw new SyscallException(ex); 1857 } 1858 } 1859 1860 private const int StateNotInitialized = 0; 1861 private const int StateNotValidated = 1; 1862 private const int StateActive = 2; 1863 private const int StateHolding = 3; 1864 private const int StateClosing = 4; 1865 private const int StateClosingPending = 5; 1866 private const int StateClosed = 6; 1867 private const int StateFinished = 7; 1868 setState(int state, LocalException ex)1869 private void setState(int state, LocalException ex) 1870 { 1871 // 1872 // If setState() is called with an exception, then only closed 1873 // and closing states are permissible. 1874 // 1875 Debug.Assert(state >= StateClosing); 1876 1877 if(_state == state) // Don't switch twice. 1878 { 1879 return; 1880 } 1881 1882 if(_exception == null) 1883 { 1884 // 1885 // If we are in closed state, an exception must be set. 1886 // 1887 Debug.Assert(_state != StateClosed); 1888 1889 _exception = ex; 1890 1891 // 1892 // We don't warn if we are not validated. 1893 // 1894 if(_warn && _validated) 1895 { 1896 // 1897 // Don't warn about certain expected exceptions. 1898 // 1899 if(!(_exception is CloseConnectionException || 1900 _exception is ConnectionManuallyClosedException || 1901 _exception is ConnectionTimeoutException || 1902 _exception is CommunicatorDestroyedException || 1903 _exception is ObjectAdapterDeactivatedException || 1904 (_exception is ConnectionLostException && _state >= StateClosing))) 1905 { 1906 warning("connection exception", _exception); 1907 } 1908 } 1909 } 1910 1911 // 1912 // We must set the new state before we notify requests of any 1913 // exceptions. Otherwise new requests may retry on a 1914 // connection that is not yet marked as closed or closing. 1915 // 1916 setState(state); 1917 } 1918 setState(int state)1919 private void setState(int state) 1920 { 1921 // 1922 // We don't want to send close connection messages if the endpoint 1923 // only supports oneway transmission from client to server. 1924 // 1925 if(_endpoint.datagram() && state == StateClosing) 1926 { 1927 state = StateClosed; 1928 } 1929 1930 // 1931 // Skip graceful shutdown if we are destroyed before validation. 1932 // 1933 if(_state <= StateNotValidated && state == StateClosing) 1934 { 1935 state = StateClosed; 1936 } 1937 1938 if(_state == state) // Don't switch twice. 1939 { 1940 return; 1941 } 1942 1943 try 1944 { 1945 switch(state) 1946 { 1947 case StateNotInitialized: 1948 { 1949 Debug.Assert(false); 1950 break; 1951 } 1952 1953 case StateNotValidated: 1954 { 1955 if(_state != StateNotInitialized) 1956 { 1957 Debug.Assert(_state == StateClosed); 1958 return; 1959 } 1960 break; 1961 } 1962 1963 case StateActive: 1964 { 1965 // 1966 // Can only switch from holding or not validated to 1967 // active. 1968 // 1969 if(_state != StateHolding && _state != StateNotValidated) 1970 { 1971 return; 1972 } 1973 _threadPool.register(this, SocketOperation.Read); 1974 break; 1975 } 1976 1977 case StateHolding: 1978 { 1979 // 1980 // Can only switch from active or not validated to 1981 // holding. 1982 // 1983 if(_state != StateActive && _state != StateNotValidated) 1984 { 1985 return; 1986 } 1987 if(_state == StateActive) 1988 { 1989 _threadPool.unregister(this, SocketOperation.Read); 1990 } 1991 break; 1992 } 1993 1994 case StateClosing: 1995 case StateClosingPending: 1996 { 1997 // 1998 // Can't change back from closing pending. 1999 // 2000 if(_state >= StateClosingPending) 2001 { 2002 return; 2003 } 2004 break; 2005 } 2006 2007 case StateClosed: 2008 { 2009 if(_state == StateFinished) 2010 { 2011 return; 2012 } 2013 2014 _batchRequestQueue.destroy(_exception); 2015 _threadPool.finish(this); 2016 _transceiver.close(); 2017 break; 2018 } 2019 2020 case StateFinished: 2021 { 2022 Debug.Assert(_state == StateClosed); 2023 _transceiver.destroy(); 2024 _communicator = null; 2025 break; 2026 } 2027 } 2028 } 2029 catch(LocalException ex) 2030 { 2031 _logger.error("unexpected connection exception:\n" + ex + "\n" + _transceiver.ToString()); 2032 } 2033 2034 // 2035 // We only register with the connection monitor if our new state 2036 // is StateActive. Otherwise we unregister with the connection 2037 // monitor, but only if we were registered before, i.e., if our 2038 // old state was StateActive. 2039 // 2040 if(_monitor != null) 2041 { 2042 if(state == StateActive) 2043 { 2044 if(_acmLastActivity > -1) 2045 { 2046 _acmLastActivity = Time.currentMonotonicTimeMillis(); 2047 } 2048 _monitor.add(this); 2049 } 2050 else if(_state == StateActive) 2051 { 2052 _monitor.remove(this); 2053 } 2054 } 2055 2056 if(_instance.initializationData().observer != null) 2057 { 2058 ConnectionState oldState = toConnectionState(_state); 2059 ConnectionState newState = toConnectionState(state); 2060 if(oldState != newState) 2061 { 2062 _observer = _instance.initializationData().observer.getConnectionObserver(initConnectionInfo(), 2063 _endpoint, 2064 newState, 2065 _observer); 2066 if(_observer != null) 2067 { 2068 _observer.attach(); 2069 } 2070 else 2071 { 2072 _writeStreamPos = -1; 2073 _readStreamPos = -1; 2074 } 2075 } 2076 if(_observer != null && state == StateClosed && _exception != null) 2077 { 2078 if(!(_exception is CloseConnectionException || 2079 _exception is ConnectionManuallyClosedException || 2080 _exception is ConnectionTimeoutException || 2081 _exception is CommunicatorDestroyedException || 2082 _exception is ObjectAdapterDeactivatedException || 2083 (_exception is ConnectionLostException && _state >= StateClosing))) 2084 { 2085 _observer.failed(_exception.ice_id()); 2086 } 2087 } 2088 } 2089 _state = state; 2090 2091 Monitor.PulseAll(this); 2092 2093 if(_state == StateClosing && _dispatchCount == 0) 2094 { 2095 try 2096 { 2097 initiateShutdown(); 2098 } 2099 catch(LocalException ex) 2100 { 2101 setState(StateClosed, ex); 2102 } 2103 } 2104 } 2105 initiateShutdown()2106 private void initiateShutdown() 2107 { 2108 Debug.Assert(_state == StateClosing && _dispatchCount == 0); 2109 2110 if(_shutdownInitiated) 2111 { 2112 return; 2113 } 2114 _shutdownInitiated = true; 2115 2116 if(!_endpoint.datagram()) 2117 { 2118 // 2119 // Before we shut down, we send a close connection message. 2120 // 2121 OutputStream os = new OutputStream(_instance, Util.currentProtocolEncoding); 2122 os.writeBlob(Protocol.magic); 2123 Util.currentProtocol.ice_writeMembers(os); 2124 Util.currentProtocolEncoding.ice_writeMembers(os); 2125 os.writeByte(Protocol.closeConnectionMsg); 2126 os.writeByte(_compressionSupported ? (byte)1 : (byte)0); 2127 os.writeInt(Protocol.headerSize); // Message size. 2128 2129 if((sendMessage(new OutgoingMessage(os, false, false)) & OutgoingAsyncBase.AsyncStatusSent) != 0) 2130 { 2131 setState(StateClosingPending); 2132 2133 // 2134 // Notify the transceiver of the graceful connection closure. 2135 // 2136 int op = _transceiver.closing(true, _exception); 2137 if(op != 0) 2138 { 2139 scheduleTimeout(op); 2140 _threadPool.register(this, op); 2141 } 2142 } 2143 } 2144 } 2145 sendHeartbeatNow()2146 private void sendHeartbeatNow() 2147 { 2148 Debug.Assert(_state == StateActive); 2149 2150 if(!_endpoint.datagram()) 2151 { 2152 OutputStream os = new OutputStream(_instance, Util.currentProtocolEncoding); 2153 os.writeBlob(Protocol.magic); 2154 Util.currentProtocol.ice_writeMembers(os); 2155 Util.currentProtocolEncoding.ice_writeMembers(os); 2156 os.writeByte(Protocol.validateConnectionMsg); 2157 os.writeByte(0); 2158 os.writeInt(Protocol.headerSize); // Message size. 2159 try 2160 { 2161 sendMessage(new OutgoingMessage(os, false, false)); 2162 } 2163 catch(LocalException ex) 2164 { 2165 setState(StateClosed, ex); 2166 Debug.Assert(_exception != null); 2167 } 2168 } 2169 } 2170 initialize(int operation)2171 private bool initialize(int operation) 2172 { 2173 int s = _transceiver.initialize(_readStream.getBuffer(), _writeStream.getBuffer(), ref _hasMoreData); 2174 if(s != SocketOperation.None) 2175 { 2176 scheduleTimeout(s); 2177 _threadPool.update(this, operation, s); 2178 return false; 2179 } 2180 2181 // 2182 // Update the connection description once the transceiver is initialized. 2183 // 2184 _desc = _transceiver.ToString(); 2185 _initialized = true; 2186 setState(StateNotValidated); 2187 2188 return true; 2189 } 2190 validate(int operation)2191 private bool validate(int operation) 2192 { 2193 if(!_endpoint.datagram()) // Datagram connections are always implicitly validated. 2194 { 2195 if(_adapter != null) // The server side has the active role for connection validation. 2196 { 2197 if(_writeStream.size() == 0) 2198 { 2199 _writeStream.writeBlob(Protocol.magic); 2200 Util.currentProtocol.ice_writeMembers(_writeStream); 2201 Util.currentProtocolEncoding.ice_writeMembers(_writeStream); 2202 _writeStream.writeByte(Protocol.validateConnectionMsg); 2203 _writeStream.writeByte(0); // Compression status (always zero for validate connection). 2204 _writeStream.writeInt(Protocol.headerSize); // Message size. 2205 TraceUtil.traceSend(_writeStream, _logger, _traceLevels); 2206 _writeStream.prepareWrite(); 2207 } 2208 2209 if(_observer != null) 2210 { 2211 observerStartWrite(_writeStream.getBuffer()); 2212 } 2213 2214 if(_writeStream.pos() != _writeStream.size()) 2215 { 2216 int op = write(_writeStream.getBuffer()); 2217 if(op != 0) 2218 { 2219 scheduleTimeout(op); 2220 _threadPool.update(this, operation, op); 2221 return false; 2222 } 2223 } 2224 2225 if(_observer != null) 2226 { 2227 observerFinishWrite(_writeStream.getBuffer()); 2228 } 2229 } 2230 else // The client side has the passive role for connection validation. 2231 { 2232 if(_readStream.size() == 0) 2233 { 2234 _readStream.resize(Protocol.headerSize); 2235 _readStream.pos(0); 2236 } 2237 2238 if(_observer != null) 2239 { 2240 observerStartRead(_readStream.getBuffer()); 2241 } 2242 2243 if(_readStream.pos() != _readStream.size()) 2244 { 2245 int op = read(_readStream.getBuffer()); 2246 if(op != 0) 2247 { 2248 scheduleTimeout(op); 2249 _threadPool.update(this, operation, op); 2250 return false; 2251 } 2252 } 2253 2254 if(_observer != null) 2255 { 2256 observerFinishRead(_readStream.getBuffer()); 2257 } 2258 2259 Debug.Assert(_readStream.pos() == Protocol.headerSize); 2260 _readStream.pos(0); 2261 byte[] m = _readStream.readBlob(4); 2262 if(m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1] || 2263 m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3]) 2264 { 2265 BadMagicException ex = new BadMagicException(); 2266 ex.badMagic = m; 2267 throw ex; 2268 } 2269 2270 ProtocolVersion pv = new ProtocolVersion(); 2271 pv.ice_readMembers(_readStream); 2272 Protocol.checkSupportedProtocol(pv); 2273 2274 EncodingVersion ev = new EncodingVersion(); 2275 ev.ice_readMembers(_readStream); 2276 Protocol.checkSupportedProtocolEncoding(ev); 2277 2278 byte messageType = _readStream.readByte(); 2279 if(messageType != Protocol.validateConnectionMsg) 2280 { 2281 throw new ConnectionNotValidatedException(); 2282 } 2283 _readStream.readByte(); // Ignore compression status for validate connection. 2284 int size = _readStream.readInt(); 2285 if(size != Protocol.headerSize) 2286 { 2287 throw new IllegalMessageSizeException(); 2288 } 2289 TraceUtil.traceRecv(_readStream, _logger, _traceLevels); 2290 2291 _validated = true; 2292 } 2293 } 2294 2295 _writeStream.resize(0); 2296 _writeStream.pos(0); 2297 2298 _readStream.resize(Protocol.headerSize); 2299 _readStream.pos(0); 2300 _readHeader = true; 2301 2302 if(_instance.traceLevels().network >= 1) 2303 { 2304 StringBuilder s = new StringBuilder(); 2305 if(_endpoint.datagram()) 2306 { 2307 s.Append("starting to "); 2308 s.Append(_connector != null ? "send" : "receive"); 2309 s.Append(" "); 2310 s.Append(_endpoint.protocol()); 2311 s.Append(" messages\n"); 2312 s.Append(_transceiver.toDetailedString()); 2313 } 2314 else 2315 { 2316 s.Append(_connector != null ? "established" : "accepted"); 2317 s.Append(" "); 2318 s.Append(_endpoint.protocol()); 2319 s.Append(" connection\n"); 2320 s.Append(ToString()); 2321 } 2322 _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString()); 2323 } 2324 2325 return true; 2326 } 2327 sendNextMessage(out Queue<OutgoingMessage> callbacks)2328 private int sendNextMessage(out Queue<OutgoingMessage> callbacks) 2329 { 2330 callbacks = null; 2331 2332 if(_sendStreams.Count == 0) 2333 { 2334 return SocketOperation.None; 2335 } 2336 else if(_state == StateClosingPending && _writeStream.pos() == 0) 2337 { 2338 // Message wasn't sent, empty the _writeStream, we're not going to send more data. 2339 OutgoingMessage message = _sendStreams.First.Value; 2340 _writeStream.swap(message.stream); 2341 return SocketOperation.None; 2342 } 2343 2344 Debug.Assert(!_writeStream.isEmpty() && _writeStream.pos() == _writeStream.size()); 2345 try 2346 { 2347 while(true) 2348 { 2349 // 2350 // Notify the message that it was sent. 2351 // 2352 OutgoingMessage message = _sendStreams.First.Value; 2353 _writeStream.swap(message.stream); 2354 if(message.sent()) 2355 { 2356 if(callbacks == null) 2357 { 2358 callbacks = new Queue<OutgoingMessage>(); 2359 } 2360 callbacks.Enqueue(message); 2361 } 2362 _sendStreams.RemoveFirst(); 2363 2364 // 2365 // If there's nothing left to send, we're done. 2366 // 2367 if(_sendStreams.Count == 0) 2368 { 2369 break; 2370 } 2371 2372 // 2373 // If we are in the closed state or if the close is 2374 // pending, don't continue sending. 2375 // 2376 // This can occur if parseMessage (called before 2377 // sendNextMessage by message()) closes the connection. 2378 // 2379 if(_state >= StateClosingPending) 2380 { 2381 return SocketOperation.None; 2382 } 2383 2384 // 2385 // Otherwise, prepare the next message stream for writing. 2386 // 2387 message = _sendStreams.First.Value; 2388 Debug.Assert(!message.prepared); 2389 OutputStream stream = message.stream; 2390 2391 message.stream = doCompress(message.stream, message.compress); 2392 message.stream.prepareWrite(); 2393 message.prepared = true; 2394 2395 TraceUtil.traceSend(stream, _logger, _traceLevels); 2396 _writeStream.swap(message.stream); 2397 2398 // 2399 // Send the message. 2400 // 2401 if(_observer != null) 2402 { 2403 observerStartWrite(_writeStream.getBuffer()); 2404 } 2405 if(_writeStream.pos() != _writeStream.size()) 2406 { 2407 int op = write(_writeStream.getBuffer()); 2408 if(op != 0) 2409 { 2410 return op; 2411 } 2412 } 2413 if(_observer != null) 2414 { 2415 observerFinishWrite(_writeStream.getBuffer()); 2416 } 2417 } 2418 2419 // 2420 // If all the messages were sent and we are in the closing state, we schedule 2421 // the close timeout to wait for the peer to close the connection. 2422 // 2423 if(_state == StateClosing && _shutdownInitiated) 2424 { 2425 setState(StateClosingPending); 2426 int op = _transceiver.closing(true, _exception); 2427 if(op != 0) 2428 { 2429 return op; 2430 } 2431 } 2432 } 2433 catch(LocalException ex) 2434 { 2435 setState(StateClosed, ex); 2436 } 2437 return SocketOperation.None; 2438 } 2439 sendMessage(OutgoingMessage message)2440 private int sendMessage(OutgoingMessage message) 2441 { 2442 Debug.Assert(_state < StateClosed); 2443 2444 if(_sendStreams.Count > 0) 2445 { 2446 message.adopt(); 2447 _sendStreams.AddLast(message); 2448 return OutgoingAsyncBase.AsyncStatusQueued; 2449 } 2450 2451 // 2452 // Attempt to send the message without blocking. If the send blocks, we use 2453 // asynchronous I/O or we request the caller to call finishSendMessage() outside 2454 // the synchronization. 2455 // 2456 2457 Debug.Assert(!message.prepared); 2458 2459 OutputStream stream = message.stream; 2460 2461 message.stream = doCompress(stream, message.compress); 2462 message.stream.prepareWrite(); 2463 message.prepared = true; 2464 2465 TraceUtil.traceSend(stream, _logger, _traceLevels); 2466 2467 // 2468 // Send the message without blocking. 2469 // 2470 if(_observer != null) 2471 { 2472 observerStartWrite(message.stream.getBuffer()); 2473 } 2474 int op = write(message.stream.getBuffer()); 2475 if(op == 0) 2476 { 2477 if(_observer != null) 2478 { 2479 observerFinishWrite(message.stream.getBuffer()); 2480 } 2481 2482 int status = OutgoingAsyncBase.AsyncStatusSent; 2483 if(message.sent()) 2484 { 2485 status = status | OutgoingAsyncBase.AsyncStatusInvokeSentCallback; 2486 } 2487 2488 if(_acmLastActivity > -1) 2489 { 2490 _acmLastActivity = Time.currentMonotonicTimeMillis(); 2491 } 2492 return status; 2493 } 2494 2495 message.adopt(); 2496 2497 _writeStream.swap(message.stream); 2498 _sendStreams.AddLast(message); 2499 scheduleTimeout(op); 2500 _threadPool.register(this, op); 2501 return OutgoingAsyncBase.AsyncStatusQueued; 2502 } 2503 doCompress(OutputStream uncompressed, bool compress)2504 private OutputStream doCompress(OutputStream uncompressed, bool compress) 2505 { 2506 if(_compressionSupported) 2507 { 2508 if(compress && uncompressed.size() >= 100) 2509 { 2510 // 2511 // Do compression. 2512 // 2513 IceInternal.Buffer cbuf = BZip2.compress(uncompressed.getBuffer(), Protocol.headerSize, 2514 _compressionLevel); 2515 if(cbuf != null) 2516 { 2517 OutputStream cstream = 2518 new OutputStream(uncompressed.instance(), uncompressed.getEncoding(), cbuf, true); 2519 2520 // 2521 // Set compression status. 2522 // 2523 cstream.pos(9); 2524 cstream.writeByte(2); 2525 2526 // 2527 // Write the size of the compressed stream into the header. 2528 // 2529 cstream.pos(10); 2530 cstream.writeInt(cstream.size()); 2531 2532 // 2533 // Write the compression status and size of the compressed stream into the header of the 2534 // uncompressed stream -- we need this to trace requests correctly. 2535 // 2536 uncompressed.pos(9); 2537 uncompressed.writeByte(2); 2538 uncompressed.writeInt(cstream.size()); 2539 2540 return cstream; 2541 } 2542 } 2543 } 2544 2545 uncompressed.pos(9); 2546 uncompressed.writeByte((byte)((_compressionSupported && compress) ? 1 : 0)); 2547 2548 // 2549 // Not compressed, fill in the message size. 2550 // 2551 uncompressed.pos(10); 2552 uncompressed.writeInt(uncompressed.size()); 2553 2554 return uncompressed; 2555 } 2556 2557 private struct MessageInfo 2558 { 2559 public InputStream stream; 2560 public int invokeNum; 2561 public int requestId; 2562 public byte compress; 2563 public ServantManager servantManager; 2564 public ObjectAdapter adapter; 2565 public OutgoingAsyncBase outAsync; 2566 public HeartbeatCallback heartbeatCallback; 2567 public int messageDispatchCount; 2568 } 2569 parseMessage(ref MessageInfo info)2570 private int parseMessage(ref MessageInfo info) 2571 { 2572 Debug.Assert(_state > StateNotValidated && _state < StateClosed); 2573 2574 info.stream = new InputStream(_instance, Util.currentProtocolEncoding); 2575 _readStream.swap(info.stream); 2576 _readStream.resize(Protocol.headerSize); 2577 _readStream.pos(0); 2578 _readHeader = true; 2579 2580 Debug.Assert(info.stream.pos() == info.stream.size()); 2581 2582 // 2583 // Connection is validated on first message. This is only used by 2584 // setState() to check wether or not we can print a connection 2585 // warning (a client might close the connection forcefully if the 2586 // connection isn't validated). 2587 // 2588 _validated = true; 2589 2590 try 2591 { 2592 // 2593 // The magic and version fields have already been checked. 2594 // 2595 info.stream.pos(8); 2596 byte messageType = info.stream.readByte(); 2597 info.compress = info.stream.readByte(); 2598 if(info.compress == 2) 2599 { 2600 if(_compressionSupported) 2601 { 2602 IceInternal.Buffer ubuf = BZip2.uncompress(info.stream.getBuffer(), Protocol.headerSize, 2603 _messageSizeMax); 2604 info.stream = new InputStream(info.stream.instance(), info.stream.getEncoding(), ubuf, true); 2605 } 2606 else 2607 { 2608 string lib = AssemblyUtil.isWindows ? "bzip2.dll" : "libbz2.so.1"; 2609 FeatureNotSupportedException ex = new FeatureNotSupportedException(); 2610 ex.unsupportedFeature = "Cannot uncompress compressed message: " + lib + " not found"; 2611 throw ex; 2612 } 2613 } 2614 info.stream.pos(Protocol.headerSize); 2615 2616 switch(messageType) 2617 { 2618 case Protocol.closeConnectionMsg: 2619 { 2620 TraceUtil.traceRecv(info.stream, _logger, _traceLevels); 2621 if(_endpoint.datagram()) 2622 { 2623 if(_warn) 2624 { 2625 _logger.warning("ignoring close connection message for datagram connection:\n" + _desc); 2626 } 2627 } 2628 else 2629 { 2630 setState(StateClosingPending, new CloseConnectionException()); 2631 2632 // 2633 // Notify the transceiver of the graceful connection closure. 2634 // 2635 int op = _transceiver.closing(false, _exception); 2636 if(op != 0) 2637 { 2638 return op; 2639 } 2640 setState(StateClosed); 2641 } 2642 break; 2643 } 2644 2645 case Protocol.requestMsg: 2646 { 2647 if(_state >= StateClosing) 2648 { 2649 TraceUtil.trace("received request during closing\n" + 2650 "(ignored by server, client will retry)", info.stream, _logger, 2651 _traceLevels); 2652 } 2653 else 2654 { 2655 TraceUtil.traceRecv(info.stream, _logger, _traceLevels); 2656 info.requestId = info.stream.readInt(); 2657 info.invokeNum = 1; 2658 info.servantManager = _servantManager; 2659 info.adapter = _adapter; 2660 ++info.messageDispatchCount; 2661 } 2662 break; 2663 } 2664 2665 case Protocol.requestBatchMsg: 2666 { 2667 if(_state >= StateClosing) 2668 { 2669 TraceUtil.trace("received batch request during closing\n" + 2670 "(ignored by server, client will retry)", info.stream, _logger, 2671 _traceLevels); 2672 } 2673 else 2674 { 2675 TraceUtil.traceRecv(info.stream, _logger, _traceLevels); 2676 info.invokeNum = info.stream.readInt(); 2677 if(info.invokeNum < 0) 2678 { 2679 info.invokeNum = 0; 2680 throw new UnmarshalOutOfBoundsException(); 2681 } 2682 info.servantManager = _servantManager; 2683 info.adapter = _adapter; 2684 info.messageDispatchCount += info.invokeNum; 2685 } 2686 break; 2687 } 2688 2689 case Protocol.replyMsg: 2690 { 2691 TraceUtil.traceRecv(info.stream, _logger, _traceLevels); 2692 info.requestId = info.stream.readInt(); 2693 if(_asyncRequests.TryGetValue(info.requestId, out info.outAsync)) 2694 { 2695 _asyncRequests.Remove(info.requestId); 2696 2697 info.outAsync.getIs().swap(info.stream); 2698 2699 // 2700 // If we just received the reply for a request which isn't acknowledge as 2701 // sent yet, we queue the reply instead of processing it right away. It 2702 // will be processed once the write callback is invoked for the message. 2703 // 2704 OutgoingMessage message = _sendStreams.Count > 0 ? _sendStreams.First.Value : null; 2705 if(message != null && message.outAsync == info.outAsync) 2706 { 2707 message.receivedReply = true; 2708 } 2709 else if(info.outAsync.response()) 2710 { 2711 ++info.messageDispatchCount; 2712 } 2713 else 2714 { 2715 info.outAsync = null; 2716 } 2717 Monitor.PulseAll(this); // Notify threads blocked in close() 2718 } 2719 break; 2720 } 2721 2722 case Protocol.validateConnectionMsg: 2723 { 2724 TraceUtil.traceRecv(info.stream, _logger, _traceLevels); 2725 if(_heartbeatCallback != null) 2726 { 2727 info.heartbeatCallback = _heartbeatCallback; 2728 ++info.messageDispatchCount; 2729 } 2730 break; 2731 } 2732 2733 default: 2734 { 2735 TraceUtil.trace("received unknown message\n(invalid, closing connection)", 2736 info.stream, _logger, _traceLevels); 2737 throw new UnknownMessageException(); 2738 } 2739 } 2740 } 2741 catch(LocalException ex) 2742 { 2743 if(_endpoint.datagram()) 2744 { 2745 if(_warn) 2746 { 2747 _logger.warning("datagram connection exception:\n" + ex.ToString() + "\n" + _desc); 2748 } 2749 } 2750 else 2751 { 2752 setState(StateClosed, ex); 2753 } 2754 } 2755 2756 return _state == StateHolding ? SocketOperation.None : SocketOperation.Read; 2757 } 2758 invokeAll(InputStream stream, int invokeNum, int requestId, byte compress, ServantManager servantManager, ObjectAdapter adapter)2759 private void invokeAll(InputStream stream, int invokeNum, int requestId, byte compress, 2760 ServantManager servantManager, ObjectAdapter adapter) 2761 { 2762 // 2763 // Note: In contrast to other private or protected methods, this 2764 // operation must be called *without* the mutex locked. 2765 // 2766 2767 Incoming inc = null; 2768 try 2769 { 2770 while(invokeNum > 0) 2771 { 2772 // 2773 // Prepare the invocation. 2774 // 2775 bool response = !_endpoint.datagram() && requestId != 0; 2776 Debug.Assert(!response || invokeNum == 1); 2777 2778 inc = getIncoming(adapter, response, compress, requestId); 2779 2780 // 2781 // Dispatch the invocation. 2782 // 2783 inc.invoke(servantManager, stream); 2784 2785 --invokeNum; 2786 2787 reclaimIncoming(inc); 2788 inc = null; 2789 } 2790 2791 stream.clear(); 2792 } 2793 catch(LocalException ex) 2794 { 2795 invokeException(requestId, ex, invokeNum, false); 2796 } 2797 finally 2798 { 2799 if(inc != null) 2800 { 2801 reclaimIncoming(inc); 2802 } 2803 } 2804 } 2805 scheduleTimeout(int status)2806 private void scheduleTimeout(int status) 2807 { 2808 int timeout; 2809 if(_state < StateActive) 2810 { 2811 DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); 2812 if(defaultsAndOverrides.overrideConnectTimeout) 2813 { 2814 timeout = defaultsAndOverrides.overrideConnectTimeoutValue; 2815 } 2816 else 2817 { 2818 timeout = _endpoint.timeout(); 2819 } 2820 } 2821 else if(_state < StateClosingPending) 2822 { 2823 if(_readHeader) // No timeout for reading the header. 2824 { 2825 status &= ~SocketOperation.Read; 2826 } 2827 timeout = _endpoint.timeout(); 2828 } 2829 else 2830 { 2831 DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); 2832 if(defaultsAndOverrides.overrideCloseTimeout) 2833 { 2834 timeout = defaultsAndOverrides.overrideCloseTimeoutValue; 2835 } 2836 else 2837 { 2838 timeout = _endpoint.timeout(); 2839 } 2840 } 2841 2842 if(timeout < 0) 2843 { 2844 return; 2845 } 2846 2847 if((status & SocketOperation.Read) != 0) 2848 { 2849 if(_readTimeoutScheduled) 2850 { 2851 _timer.cancel(_readTimeout); 2852 } 2853 _timer.schedule(_readTimeout, timeout); 2854 _readTimeoutScheduled = true; 2855 } 2856 if((status & (SocketOperation.Write | SocketOperation.Connect)) != 0) 2857 { 2858 if(_writeTimeoutScheduled) 2859 { 2860 _timer.cancel(_writeTimeout); 2861 } 2862 _timer.schedule(_writeTimeout, timeout); 2863 _writeTimeoutScheduled = true; 2864 } 2865 } 2866 unscheduleTimeout(int status)2867 private void unscheduleTimeout(int status) 2868 { 2869 if((status & SocketOperation.Read) != 0 && _readTimeoutScheduled) 2870 { 2871 _timer.cancel(_readTimeout); 2872 _readTimeoutScheduled = false; 2873 } 2874 if((status & (SocketOperation.Write | SocketOperation.Connect)) != 0 && 2875 _writeTimeoutScheduled) 2876 { 2877 _timer.cancel(_writeTimeout); 2878 _writeTimeoutScheduled = false; 2879 } 2880 } 2881 initConnectionInfo()2882 private ConnectionInfo initConnectionInfo() 2883 { 2884 if(_state > StateNotInitialized && _info != null) // Update the connection info until it's initialized 2885 { 2886 return _info; 2887 } 2888 2889 try 2890 { 2891 _info = _transceiver.getInfo(); 2892 } 2893 catch(LocalException) 2894 { 2895 _info = new ConnectionInfo(); 2896 } 2897 for(ConnectionInfo info = _info; info != null; info = info.underlying) 2898 { 2899 info.connectionId = _endpoint.connectionId(); 2900 info.adapterName = _adapter != null ? _adapter.getName() : ""; 2901 info.incoming = _connector == null; 2902 } 2903 return _info; 2904 } 2905 reap()2906 private void reap() 2907 { 2908 if(_monitor != null) 2909 { 2910 _monitor.reap(this); 2911 } 2912 if(_observer != null) 2913 { 2914 _observer.detach(); 2915 } 2916 } 2917 toConnectionState(int state)2918 ConnectionState toConnectionState(int state) 2919 { 2920 return connectionStateMap[state]; 2921 } 2922 warning(string msg, System.Exception ex)2923 private void warning(string msg, System.Exception ex) 2924 { 2925 _logger.warning(msg + ":\n" + ex + "\n" + _transceiver.ToString()); 2926 } 2927 observerStartRead(IceInternal.Buffer buf)2928 private void observerStartRead(IceInternal.Buffer buf) 2929 { 2930 if(_readStreamPos >= 0) 2931 { 2932 Debug.Assert(!buf.empty()); 2933 _observer.receivedBytes(buf.b.position() - _readStreamPos); 2934 } 2935 _readStreamPos = buf.empty() ? -1 : buf.b.position(); 2936 } 2937 observerFinishRead(IceInternal.Buffer buf)2938 private void observerFinishRead(IceInternal.Buffer buf) 2939 { 2940 if(_readStreamPos == -1) 2941 { 2942 return; 2943 } 2944 Debug.Assert(buf.b.position() >= _readStreamPos); 2945 _observer.receivedBytes(buf.b.position() - _readStreamPos); 2946 _readStreamPos = -1; 2947 } 2948 observerStartWrite(IceInternal.Buffer buf)2949 private void observerStartWrite(IceInternal.Buffer buf) 2950 { 2951 if(_writeStreamPos >= 0) 2952 { 2953 Debug.Assert(!buf.empty()); 2954 _observer.sentBytes(buf.b.position() - _writeStreamPos); 2955 } 2956 _writeStreamPos = buf.empty() ? -1 : buf.b.position(); 2957 } 2958 observerFinishWrite(IceInternal.Buffer buf)2959 private void observerFinishWrite(IceInternal.Buffer buf) 2960 { 2961 if(_writeStreamPos == -1) 2962 { 2963 return; 2964 } 2965 if(buf.b.position() > _writeStreamPos) 2966 { 2967 _observer.sentBytes(buf.b.position() - _writeStreamPos); 2968 } 2969 _writeStreamPos = -1; 2970 } 2971 getIncoming(ObjectAdapter adapter, bool response, byte compress, int requestId)2972 private Incoming getIncoming(ObjectAdapter adapter, bool response, byte compress, int requestId) 2973 { 2974 Incoming inc = null; 2975 2976 if(_cacheBuffers) 2977 { 2978 lock(_incomingCacheMutex) 2979 { 2980 if(_incomingCache == null) 2981 { 2982 inc = new Incoming(_instance, this, this, adapter, response, compress, requestId); 2983 } 2984 else 2985 { 2986 inc = _incomingCache; 2987 _incomingCache = _incomingCache.next; 2988 inc.reset(_instance, this, this, adapter, response, compress, requestId); 2989 inc.next = null; 2990 } 2991 } 2992 } 2993 else 2994 { 2995 inc = new Incoming(_instance, this, this, adapter, response, compress, requestId); 2996 } 2997 2998 return inc; 2999 } 3000 reclaimIncoming(Incoming inc)3001 internal void reclaimIncoming(Incoming inc) 3002 { 3003 if(_cacheBuffers && inc.reclaim()) 3004 { 3005 lock(_incomingCacheMutex) 3006 { 3007 inc.next = _incomingCache; 3008 _incomingCache = inc; 3009 } 3010 } 3011 } 3012 read(IceInternal.Buffer buf)3013 private int read(IceInternal.Buffer buf) 3014 { 3015 int start = buf.b.position(); 3016 int op = _transceiver.read(buf, ref _hasMoreData); 3017 if(_instance.traceLevels().network >= 3 && buf.b.position() != start) 3018 { 3019 StringBuilder s = new StringBuilder("received "); 3020 if(_endpoint.datagram()) 3021 { 3022 s.Append(buf.b.limit()); 3023 } 3024 else 3025 { 3026 s.Append(buf.b.position() - start); 3027 s.Append(" of "); 3028 s.Append(buf.b.limit() - start); 3029 } 3030 s.Append(" bytes via "); 3031 s.Append(_endpoint.protocol()); 3032 s.Append("\n"); 3033 s.Append(ToString()); 3034 _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString()); 3035 } 3036 return op; 3037 } 3038 write(IceInternal.Buffer buf)3039 private int write(IceInternal.Buffer buf) 3040 { 3041 int start = buf.b.position(); 3042 int op = _transceiver.write(buf); 3043 if(_instance.traceLevels().network >= 3 && buf.b.position() != start) 3044 { 3045 StringBuilder s = new StringBuilder("sent "); 3046 s.Append(buf.b.position() - start); 3047 if(!_endpoint.datagram()) 3048 { 3049 s.Append(" of "); 3050 s.Append(buf.b.limit() - start); 3051 } 3052 s.Append(" bytes via "); 3053 s.Append(_endpoint.protocol()); 3054 s.Append("\n"); 3055 s.Append(ToString()); 3056 _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString()); 3057 } 3058 return op; 3059 } 3060 3061 private class OutgoingMessage 3062 { OutgoingMessage(OutputStream stream, bool compress, bool adopt)3063 internal OutgoingMessage(OutputStream stream, bool compress, bool adopt) 3064 { 3065 this.stream = stream; 3066 this.compress = compress; 3067 _adopt = adopt; 3068 } 3069 OutgoingMessage(OutgoingAsyncBase outAsync, OutputStream stream, bool compress, int requestId)3070 internal OutgoingMessage(OutgoingAsyncBase outAsync, OutputStream stream, bool compress, int requestId) 3071 { 3072 this.outAsync = outAsync; 3073 this.stream = stream; 3074 this.compress = compress; 3075 this.requestId = requestId; 3076 } 3077 canceled()3078 internal void canceled() 3079 { 3080 Debug.Assert(outAsync != null); // Only requests can timeout. 3081 outAsync = null; 3082 } 3083 adopt()3084 internal void adopt() 3085 { 3086 if(_adopt) 3087 { 3088 OutputStream stream = new OutputStream(this.stream.instance(), Util.currentProtocolEncoding); 3089 stream.swap(this.stream); 3090 this.stream = stream; 3091 _adopt = false; 3092 } 3093 } 3094 sent()3095 internal bool sent() 3096 { 3097 stream = null; 3098 if(outAsync != null) 3099 { 3100 invokeSent = outAsync.sent(); 3101 return invokeSent ||receivedReply; 3102 } 3103 return false; 3104 } 3105 completed(LocalException ex)3106 internal void completed(LocalException ex) 3107 { 3108 if(outAsync != null) 3109 { 3110 if(outAsync.exception(ex)) 3111 { 3112 outAsync.invokeException(); 3113 } 3114 } 3115 stream = null; 3116 } 3117 3118 internal OutputStream stream; 3119 internal OutgoingAsyncBase outAsync; 3120 internal bool compress; 3121 internal int requestId; 3122 internal bool _adopt; 3123 internal bool prepared; 3124 internal bool isSent; 3125 internal bool invokeSent; 3126 internal bool receivedReply; 3127 } 3128 3129 private Communicator _communicator; 3130 private Instance _instance; 3131 private ACMMonitor _monitor; 3132 private Transceiver _transceiver; 3133 private string _desc; 3134 private string _type; 3135 private Connector _connector; 3136 private EndpointI _endpoint; 3137 3138 private ObjectAdapter _adapter; 3139 private ServantManager _servantManager; 3140 3141 private Logger _logger; 3142 private TraceLevels _traceLevels; 3143 private IceInternal.ThreadPool _threadPool; 3144 3145 private IceInternal.Timer _timer; 3146 private TimerTask _writeTimeout; 3147 private bool _writeTimeoutScheduled; 3148 private TimerTask _readTimeout; 3149 private bool _readTimeoutScheduled; 3150 3151 private StartCallback _startCallback = null; 3152 3153 private bool _warn; 3154 private bool _warnUdp; 3155 3156 private long _acmLastActivity; 3157 3158 private int _compressionLevel; 3159 3160 private int _nextRequestId; 3161 3162 private Dictionary<int, OutgoingAsyncBase> _asyncRequests = new Dictionary<int, OutgoingAsyncBase>(); 3163 3164 private LocalException _exception; 3165 3166 private readonly int _messageSizeMax; 3167 private BatchRequestQueue _batchRequestQueue; 3168 3169 private LinkedList<OutgoingMessage> _sendStreams = new LinkedList<OutgoingMessage>(); 3170 3171 private InputStream _readStream; 3172 private bool _readHeader; 3173 private OutputStream _writeStream; 3174 3175 private ConnectionObserver _observer; 3176 private int _readStreamPos; 3177 private int _writeStreamPos; 3178 3179 private int _dispatchCount; 3180 3181 private int _state; // The current state. 3182 private bool _shutdownInitiated = false; 3183 private bool _initialized = false; 3184 private bool _validated = false; 3185 3186 private Incoming _incomingCache; 3187 private object _incomingCacheMutex = new object(); 3188 3189 private static bool _compressionSupported; 3190 3191 private bool _cacheBuffers; 3192 3193 private ConnectionInfo _info; 3194 3195 private CloseCallback _closeCallback; 3196 private HeartbeatCallback _heartbeatCallback; 3197 3198 private static ConnectionState[] connectionStateMap = new ConnectionState[] { 3199 ConnectionState.ConnectionStateValidating, // StateNotInitialized 3200 ConnectionState.ConnectionStateValidating, // StateNotValidated 3201 ConnectionState.ConnectionStateActive, // StateActive 3202 ConnectionState.ConnectionStateHolding, // StateHolding 3203 ConnectionState.ConnectionStateClosing, // StateClosing 3204 ConnectionState.ConnectionStateClosing, // StateClosingPending 3205 ConnectionState.ConnectionStateClosed, // StateClosed 3206 ConnectionState.ConnectionStateClosed, // StateFinished 3207 }; 3208 } 3209 } 3210