//
// Copyright (c) ZeroC, Inc. All rights reserved.
//

using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace IceInternal
{
    //
    // Callback through which an outgoing async invocation reports its
    // progress. The bool returned by the handleXxx methods is handed back
    // by OutgoingAsyncBase.sentImpl/exceptionImpl/responseImpl; callers use
    // it to decide whether the matching invokeXxx method (which calls
    // handleInvokeXxx) must be run afterwards.
    //
    public interface OutgoingAsyncCompletionCallback
    {
        void init(OutgoingAsyncBase og);

        bool handleSent(bool done, bool alreadySent, OutgoingAsyncBase og);
        bool handleException(Ice.Exception ex, OutgoingAsyncBase og);
        bool handleResponse(bool userThread, bool ok, OutgoingAsyncBase og);

        void handleInvokeSent(bool sentSynchronously, bool done, bool alreadySent, OutgoingAsyncBase og);
        void handleInvokeException(Ice.Exception ex, OutgoingAsyncBase og);
        void handleInvokeResponse(bool ok, OutgoingAsyncBase og);
    }

    //
    // Base class of all outgoing async invocations. It owns the output and
    // input streams, the invocation/child observers, the cancellation
    // state and the completion callback, and implements the common
    // sent/exception/response bookkeeping.
    //
    public abstract class OutgoingAsyncBase
    {
        //
        // Marks the request as sent and done. Returns true if the sent
        // callback must be invoked by the caller.
        //
        public virtual bool sent()
        {
            return sentImpl(true);
        }

        //
        // Records the failure. Returns true if the exception callback
        // must be invoked by the caller.
        //
        public virtual bool exception(Ice.Exception ex)
        {
            return exceptionImpl(ex);
        }

        public virtual bool response()
        {
            Debug.Assert(false); // Must be overridden by request that can handle responses
            return false;
        }

        public void invokeSentAsync()
        {
            //
            // This is called when it's not safe to call the sent callback
            // synchronously from this thread. Instead the sent callback
            // is called asynchronously from the client thread pool.
            //
            try
            {
                instance_.clientThreadPool().dispatch(invokeSent, cachedConnection_);
            }
            catch(Ice.CommunicatorDestroyedException)
            {
                // Deliberately ignored: the sent notification is dropped if the
                // dispatch is refused with this exception.
            }
        }

        public void invokeExceptionAsync()
        {
            //
            // CommunicatorDestroyedException is the only exception that can propagate directly
            // from this method.
            //
            instance_.clientThreadPool().dispatch(invokeException, cachedConnection_);
        }

        public void invokeResponseAsync()
        {
            //
            // CommunicatorDestroyedException is the only exception that can propagate directly
            // from this method.
            //
            instance_.clientThreadPool().dispatch(invokeResponse, cachedConnection_);
        }

        //
        // Calls the completion callback's handleInvokeSent. Exceptions
        // raised by the callback are logged through warning(). The
        // invocation observer is detached if the request completed when
        // it was sent.
        //
        public void invokeSent()
        {
            try
            {
                _completionCallback.handleInvokeSent(sentSynchronously_, _doneInSent, _alreadySent, this);
            }
            catch(System.Exception ex)
            {
                warning(ex);
            }

            if(observer_ != null && _doneInSent)
            {
                observer_.detach();
                observer_ = null;
            }
        }

        //
        // Calls the completion callback's handleInvokeException with the
        // recorded exception, logs anything the callback raises, and
        // detaches the invocation observer.
        //
        public void invokeException()
        {
            try
            {
                try
                {
                    throw _ex;
                }
                catch(Ice.Exception ex)
                {
                    _completionCallback.handleInvokeException(ex, this);
                }
            }
            catch(System.Exception ex)
            {
                warning(ex);
            }

            if(observer_ != null)
            {
                observer_.detach();
                observer_ = null;
            }
        }

        //
        // Calls the completion callback's handleInvokeResponse, or
        // delegates to invokeException() if an exception was recorded.
        // Exceptions raised by the callback are logged through warning();
        // the invocation observer is detached afterwards.
        //
        public void invokeResponse()
        {
            if(_ex != null)
            {
                invokeException();
                return;
            }

            try
            {
                try
                {
                    _completionCallback.handleInvokeResponse((state_ & StateOK) != 0, this);
                }
                catch(Ice.Exception ex)
                {
                    if(_completionCallback.handleException(ex, this))
                    {
                        _completionCallback.handleInvokeException(ex, this);
                    }
                }
                catch(System.AggregateException ex)
                {
                    //
                    // Log the wrapped exception directly. Re-throwing it with
                    // "throw ex.InnerException" only to catch it below would
                    // reset its stack trace and hide where the callback failed.
                    //
                    warning(ex.InnerException ?? ex);
                }
            }
            catch(System.Exception ex)
            {
                warning(ex);
            }

            if(observer_ != null)
            {
                observer_.detach();
                observer_ = null;
            }
        }

        //
        // Registers the handler to notify when this invocation is
        // canceled. If cancel() was already called, the stored
        // cancellation exception is cleared and thrown instead.
        //
        public virtual void cancelable(CancellationHandler handler)
        {
            lock(this)
            {
                if(_cancellationException != null)
                {
                    try
                    {
                        throw _cancellationException;
                    }
                    catch(Ice.LocalException)
                    {
                        _cancellationException = null;
                        throw;
                    }
                }
                _cancellationHandler = handler;
            }
        }

        public void cancel()
        {
            cancel(new Ice.InvocationCanceledException());
        }

        //
        // Creates and attaches the remote child observer, if an
        // invocation observer is available. The reported size is the
        // output stream size minus the protocol header and 4 more bytes.
        //
        public void attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId)
        {
            Ice.Instrumentation.InvocationObserver observer = getObserver();
            if(observer != null)
            {
                int size = os_.size() - Protocol.headerSize - 4;
                childObserver_ = observer.getRemoteObserver(info, endpt, requestId, size);
                if(childObserver_ != null)
                {
                    childObserver_.attach();
                }
            }
        }

        //
        // Same as attachRemoteObserver, for collocated invocations.
        //
        public void attachCollocatedObserver(Ice.ObjectAdapter adapter, int requestId)
        {
            Ice.Instrumentation.InvocationObserver observer = getObserver();
            if(observer != null)
            {
                int size = os_.size() - Protocol.headerSize - 4;
                childObserver_ = observer.getCollocatedObserver(adapter, requestId, size);
                if(childObserver_ != null)
                {
                    childObserver_.attach();
                }
            }
        }

        public Ice.OutputStream getOs()
        {
            return os_;
        }

        public Ice.InputStream getIs()
        {
            return is_;
        }

        public virtual void throwUserException()
        {
        }

        public virtual void cacheMessageBuffers()
        {
        }

        public bool isSynchronous()
        {
            return synchronous_;
        }

        //
        // Streams that are not supplied are created with the current
        // protocol encoding. The completion callback may be null; when
        // it is not, its init() is called with this object.
        //
        protected OutgoingAsyncBase(Instance instance, OutgoingAsyncCompletionCallback completionCallback,
                                    Ice.OutputStream os = null, Ice.InputStream iss = null)
        {
            instance_ = instance;
            sentSynchronously_ = false;
            synchronous_ = false;
            _doneInSent = false;
            _alreadySent = false;
            state_ = 0;
            os_ = os ?? new Ice.OutputStream(instance, Ice.Util.currentProtocolEncoding);
            is_ = iss ?? new Ice.InputStream(instance, Ice.Util.currentProtocolEncoding);
            _completionCallback = completionCallback;
            if(_completionCallback != null)
            {
                _completionCallback.init(this);
            }
        }

        //
        // Records that the request was sent. When done is true the child
        // observer is detached, the cancellation handler is cleared and
        // the message buffers are offered for caching. Returns the result
        // of the completion callback's handleSent.
        //
        protected virtual bool sentImpl(bool done)
        {
            lock(this)
            {
                _alreadySent = (state_ & StateSent) > 0;
                state_ |= StateSent;
                if(done)
                {
                    _doneInSent = true;
                    if(childObserver_ != null)
                    {
                        childObserver_.detach();
                        childObserver_ = null;
                    }
                    _cancellationHandler = null;

                    //
                    // For oneway requests after the data has been sent
                    // the buffers can be reused unless this is a
                    // collocated invocation. For collocated invocations
                    // the buffer won't be reused because it has already
                    // been marked as cached in invokeCollocated.
                    //
                    cacheMessageBuffers();
                }

                bool invoke = _completionCallback.handleSent(done, _alreadySent, this);
                if(!invoke && _doneInSent && observer_ != null)
                {
                    // No callback will run later, so the observer is detached here.
                    observer_.detach();
                    observer_ = null;
                }
                return invoke;
            }
        }

        //
        // Records the exception, reports the failure to the observers and
        // clears the cancellation handler. Returns the result of the
        // completion callback's handleException.
        //
        protected virtual bool exceptionImpl(Ice.Exception ex)
        {
            lock(this)
            {
                _ex = ex;
                if(childObserver_ != null)
                {
                    childObserver_.failed(ex.ice_id());
                    childObserver_.detach();
                    childObserver_ = null;
                }
                _cancellationHandler = null;

                if(observer_ != null)
                {
                    observer_.failed(ex.ice_id());
                }
                bool invoke = _completionCallback.handleException(ex, this);
                if(!invoke && observer_ != null)
                {
                    observer_.detach();
                    observer_ = null;
                }
                return invoke;
            }
        }

        //
        // Records the response status (StateOK when ok is true) and
        // clears the cancellation handler. The returned value is the
        // incoming invoke flag combined with the result of handleResponse,
        // or the result of handleException if handleResponse raised an
        // Ice.Exception.
        //
        protected virtual bool responseImpl(bool userThread, bool ok, bool invoke)
        {
            lock(this)
            {
                if(ok)
                {
                    state_ |= StateOK;
                }

                _cancellationHandler = null;

                try
                {
                    invoke &= _completionCallback.handleResponse(userThread, ok, this);
                }
                catch(Ice.Exception ex)
                {
                    _ex = ex;
                    invoke = _completionCallback.handleException(ex, this);
                }
                if(!invoke && observer_ != null)
                {
                    observer_.detach();
                    observer_ = null;
                }
                return invoke;
            }
        }

        //
        // Stores the cancellation exception and, if a cancellation
        // handler is registered, notifies it outside of the lock.
        //
        protected void cancel(Ice.LocalException ex)
        {
            CancellationHandler handler;
            {
                lock(this)
                {
                    _cancellationException = ex;
                    if(_cancellationHandler == null)
                    {
                        return;
                    }
                    handler = _cancellationHandler;
                }
            }
            handler.asyncRequestCanceled(this, ex);
        }

        //
        // Logs an exception raised by a user callback, unless the
        // Ice.Warn.AMICallback property is set to 0 or less.
        //
        void warning(System.Exception ex)
        {
            if(instance_.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
            {
                instance_.initializationData().logger.warning("exception raised by AMI callback:\n" + ex);
            }
        }

        //
        // This virtual method is necessary for the communicator flush
        // batch requests implementation.
        //
        protected virtual Ice.Instrumentation.InvocationObserver getObserver()
        {
            return observer_;
        }

        public bool sentSynchronously()
        {
            return sentSynchronously_;
        }

        protected Instance instance_;
        protected Ice.Connection cachedConnection_;
        protected bool sentSynchronously_;
        protected bool synchronous_;
        protected int state_;

        protected Ice.Instrumentation.InvocationObserver observer_;
        protected Ice.Instrumentation.ChildInvocationObserver childObserver_;

        protected Ice.OutputStream os_;
        protected Ice.InputStream is_;

        private bool _doneInSent;
        private bool _alreadySent;
        private Ice.Exception _ex;
        private Ice.LocalException _cancellationException;
        private CancellationHandler _cancellationHandler;
        private OutgoingAsyncCompletionCallback _completionCallback;

        // Bit flags stored in state_.
        protected const int StateOK = 0x1;
        protected const int StateDone = 0x2;
        protected const int StateSent = 0x4;
        protected const int StateEndCalled = 0x8;
        protected const int StateCachedBuffers = 0x10;

        // Status flags returned by the invokeRemote/invokeCollocated/sendAsyncRequest calls.
        public const int AsyncStatusQueued = 0;
        public const int AsyncStatusSent = 1;
        public const int AsyncStatusInvokeSentCallback = 2;
    }

    //
    // Base class for proxy based invocations. This class handles the
    // retry for proxy invocations. It also ensures the child observer is
    // correctly notified of failures and makes sure the retry task is
    // correctly canceled when the invocation completes.
//
public abstract class ProxyOutgoingAsyncBase : OutgoingAsyncBase, TimerTask
{
    public abstract int invokeRemote(Ice.ConnectionI connection, bool compress, bool response);
    public abstract int invokeCollocated(CollocatedRequestHandler handler);

    //
    // Reports the failure to the child observer, clears the cached
    // connection and asks the proxy whether the request can be retried.
    // Returns false when a retry was queued; otherwise returns the result
    // of exceptionImpl for the exception raised by the retry decision.
    //
    public override bool exception(Ice.Exception exc)
    {
        if(childObserver_ != null)
        {
            childObserver_.failed(exc.ice_id());
            childObserver_.detach();
            childObserver_ = null;
        }

        cachedConnection_ = null;
        if(proxy_.iceReference().getInvocationTimeout() == -2)
        {
            instance_.timer().cancel(this);
        }

        //
        // NOTE: at this point, synchronization isn't needed, no other threads should be
        // calling on the callback.
        //
        try
        {
            //
            // It's important to let the retry queue do the retry even if
            // the retry interval is 0. This method can be called with the
            // connection locked so we can't just retry here.
            //
            instance_.retryQueue().add(this, proxy_.iceHandleException(exc, handler_, mode_, _sent, ref _cnt));
            return false;
        }
        catch(Ice.Exception ex)
        {
            return exceptionImpl(ex); // No retries, we're done
        }
    }

    //
    // When the invocation timeout is -2 and a connection is cached, a
    // timer task is scheduled with the connection's timeout (if > 0)
    // before the handler is registered with the base class.
    //
    public override void cancelable(CancellationHandler handler)
    {
        if(proxy_.iceReference().getInvocationTimeout() == -2 && cachedConnection_ != null)
        {
            int timeout = cachedConnection_.timeout();
            if(timeout > 0)
            {
                instance_.timer().schedule(this, timeout);
            }
        }
        base.cancelable(handler);
    }

    //
    // Clears the proxy's request handler and queues an immediate retry.
    // The ex parameter is not used by this method.
    //
    public void retryException(Ice.Exception ex)
    {
        try
        {
            //
            // It's important to let the retry queue do the retry. This is
            // called from the connect request handler and the retry could
            // end up waiting for the flush of the connection to be done.
            //
            proxy_.iceUpdateRequestHandler(handler_, null); // Clear request handler and always retry.
            instance_.retryQueue().add(this, 0);
        }
        catch(Ice.Exception exc)
        {
            if(exception(exc))
            {
                invokeExceptionAsync();
            }
        }
    }

    public void retry()
    {
        invokeImpl(false);
    }

    //
    // Completes the invocation with the given exception without any
    // retry. The exception callback is dispatched asynchronously when
    // exceptionImpl requests it.
    //
    public void abort(Ice.Exception ex)
    {
        Debug.Assert(childObserver_ == null);
        if(exceptionImpl(ex))
        {
            invokeExceptionAsync();
        }
        else if(ex is Ice.CommunicatorDestroyedException)
        {
            //
            // If it's a communicator destroyed exception, don't swallow
            // it but instead notify the user thread. Even if no callback
            // was provided.
            //
            throw ex;
        }
    }

    protected ProxyOutgoingAsyncBase(Ice.ObjectPrxHelperBase prx,
                                     OutgoingAsyncCompletionCallback completionCallback,
                                     Ice.OutputStream os = null,
                                     Ice.InputStream iss = null) :
        base(prx.iceReference().getInstance(), completionCallback, os, iss)
    {
        proxy_ = prx;
        mode_ = Ice.OperationMode.Normal;
        _cnt = 0;
        _sent = false;
    }

    //
    // Sends the request through the proxy's request handler, looping
    // until it is sent, queued for a delayed retry, or fails.
    //
    // userThread is true for the initial call (the invocation timeout
    // timer is scheduled if the timeout is > 0) and false for a retry
    // (the observer is told about the retry).
    //
    protected void invokeImpl(bool userThread)
    {
        try
        {
            if(userThread)
            {
                int invocationTimeout = proxy_.iceReference().getInvocationTimeout();
                if(invocationTimeout > 0)
                {
                    instance_.timer().schedule(this, invocationTimeout);
                }
            }
            else if(observer_ != null)
            {
                observer_.retried();
            }

            while(true)
            {
                try
                {
                    _sent = false;
                    handler_ = proxy_.iceGetRequestHandler();
                    int status = handler_.sendAsyncRequest(this);
                    if((status & AsyncStatusSent) != 0)
                    {
                        if(userThread)
                        {
                            sentSynchronously_ = true;
                            if((status & AsyncStatusInvokeSentCallback) != 0)
                            {
                                invokeSent(); // Call the sent callback from the user thread.
                            }
                        }
                        else
                        {
                            if((status & AsyncStatusInvokeSentCallback) != 0)
                            {
                                invokeSentAsync(); // Call the sent callback from a client thread pool thread.
                            }
                        }
                    }
                    return; // We're done!
                }
                catch(RetryException)
                {
                    proxy_.iceUpdateRequestHandler(handler_, null); // Clear request handler and always retry.
                }
                catch(Ice.Exception ex)
                {
                    if(childObserver_ != null)
                    {
                        childObserver_.failed(ex.ice_id());
                        childObserver_.detach();
                        childObserver_ = null;
                    }
                    // iceHandleException either returns the retry interval or throws
                    // (the throw is handled by the outer catch block below).
                    int interval = proxy_.iceHandleException(ex, handler_, mode_, _sent, ref _cnt);
                    if(interval > 0)
                    {
                        instance_.retryQueue().add(this, interval);
                        return;
                    }
                    else if(observer_ != null)
                    {
                        // Interval <= 0: retry immediately in this loop.
                        observer_.retried();
                    }
                }
            }
        }
        catch(Ice.Exception ex)
        {
            //
            // If called from the user thread we re-throw, the exception
            // will be caught by the caller and abort() will be called.
            //
            if(userThread)
            {
                throw;
            }
            else if(exceptionImpl(ex)) // No retries, we're done
            {
                invokeExceptionAsync();
            }
        }
    }

    //
    // Remembers that the request was sent (used for the retry decision)
    // and cancels the timer task once the request is done, unless the
    // invocation timeout is -1.
    //
    protected override bool sentImpl(bool done)
    {
        _sent = true;
        if(done)
        {
            if(proxy_.iceReference().getInvocationTimeout() != -1)
            {
                instance_.timer().cancel(this);
            }
        }
        return base.sentImpl(done);
    }

    //
    // Cancels the timer task (unless the invocation timeout is -1)
    // before recording the exception.
    //
    protected override bool exceptionImpl(Ice.Exception ex)
    {
        if(proxy_.iceReference().getInvocationTimeout() != -1)
        {
            instance_.timer().cancel(this);
        }
        return base.exceptionImpl(ex);
    }

    //
    // Cancels the timer task (unless the invocation timeout is -1)
    // before recording the response.
    //
    protected override bool responseImpl(bool userThread, bool ok, bool invoke)
    {
        if(proxy_.iceReference().getInvocationTimeout() != -1)
        {
            instance_.timer().cancel(this);
        }
        return base.responseImpl(userThread, ok, invoke);
    }

    //
    // Timer callback: cancels the invocation with a
    // ConnectionTimeoutException when the invocation timeout is -2, and
    // with an InvocationTimeoutException otherwise.
    //
    public void runTimerTask()
    {
        if(proxy_.iceReference().getInvocationTimeout() == -2)
        {
            cancel(new Ice.ConnectionTimeoutException());
        }
        else
        {
            cancel(new Ice.InvocationTimeoutException());
        }
    }

    protected readonly Ice.ObjectPrxHelperBase proxy_;
    protected RequestHandler handler_;
    protected Ice.OperationMode mode_;

    private int _cnt;   // Passed by reference to iceHandleException.
    private bool _sent; // Set by sentImpl, reset before each send attempt.
}

//
// Class for handling Slice operation invocations
//
public class OutgoingAsync : ProxyOutgoingAsyncBase
{
    public OutgoingAsync(Ice.ObjectPrxHelperBase prx, OutgoingAsyncCompletionCallback completionCallback,
                         Ice.OutputStream os = null, Ice.InputStream iss = null) :
        base(prx, completionCallback, os, iss)
    {
        encoding_ = Protocol.getCompatibleEncoding(proxy_.iceReference().getEncoding());
        synchronous_ = false;
    }

    //
    // Writes the request header to the output stream: the request or
    // batch request prologue, the identity, the facet, the operation
    // name, the mode and the context. Also creates the invocation
    // observer.
    //
    public void prepare(string operation, Ice.OperationMode mode, Dictionary<string, string> context)
    {
        Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(proxy_.iceReference().getProtocol()));

        mode_ = mode;

        observer_ = ObserverHelper.get(proxy_, operation, context);

        switch(proxy_.iceReference().getMode())
        {
            case Reference.Mode.ModeTwoway:
            case Reference.Mode.ModeOneway:
            case Reference.Mode.ModeDatagram:
            {
                os_.writeBlob(Protocol.requestHdr);
                break;
            }

            case Reference.Mode.ModeBatchOneway:
            case Reference.Mode.ModeBatchDatagram:
            {
                proxy_.iceGetBatchRequestQueue().prepareBatchRequest(os_);
                break;
            }
        }

        Reference rf = proxy_.iceReference();

        rf.getIdentity().ice_writeMembers(os_);

        //
        // For compatibility with the old FacetPath.
        //
        string facet = rf.getFacet();
        if(facet == null || facet.Length == 0)
        {
            os_.writeStringSeq(null);
        }
        else
        {
            string[] facetPath = { facet };
            os_.writeStringSeq(facetPath);
        }

        os_.writeString(operation);

        os_.writeByte((byte)mode);

        if(context != null)
        {
            //
            // Explicit context
            //
            Ice.ContextHelper.write(os_, context);
        }
        else
        {
            //
            // Implicit context
            //
            Ice.ImplicitContextI implicitContext = rf.getInstance().getImplicitContext();
            Dictionary<string, string> prxContext = rf.getContext();

            if(implicitContext == null)
            {
                Ice.ContextHelper.write(os_, prxContext);
            }
            else
            {
                implicitContext.write(prxContext, os_);
            }
        }
    }

    public override bool sent()
    {
        return base.sentImpl(!proxy_.ice_isTwoway()); // done = true if it's not a two-way proxy
    }

    //
    // Reads the reply status from the input stream. OK and user
    // exception replies are passed to responseImpl; every other status
    // is converted to the matching Ice exception and routed through
    // exception().
    //
    public override bool response()
    {
        //
        // NOTE: this method is called from ConnectionI.parseMessage
        // with the connection locked. Therefore, it must not invoke
        // any user callbacks.
        //
        Debug.Assert(proxy_.ice_isTwoway()); // Can only be called for twoways.

        if(childObserver_ != null)
        {
            childObserver_.reply(is_.size() - Protocol.headerSize - 4);
            childObserver_.detach();
            childObserver_ = null;
        }

        byte replyStatus;
        try
        {
            replyStatus = is_.readByte();

            switch(replyStatus)
            {
                case ReplyStatus.replyOK:
                {
                    break;
                }
                case ReplyStatus.replyUserException:
                {
                    if(observer_ != null)
                    {
                        observer_.userException();
                    }
                    break;
                }

                case ReplyStatus.replyObjectNotExist:
                case ReplyStatus.replyFacetNotExist:
                case ReplyStatus.replyOperationNotExist:
                {
                    Ice.Identity ident = new Ice.Identity();
                    ident.ice_readMembers(is_);

                    //
                    // For compatibility with the old FacetPath.
                    //
                    string[] facetPath = is_.readStringSeq();
                    ;
                    string facet;
                    if(facetPath.Length > 0)
                    {
                        if(facetPath.Length > 1)
                        {
                            throw new Ice.MarshalException();
                        }
                        facet = facetPath[0];
                    }
                    else
                    {
                        facet = "";
                    }

                    string operation = is_.readString();

                    Ice.RequestFailedException ex = null;
                    switch(replyStatus)
                    {
                        case ReplyStatus.replyObjectNotExist:
                        {
                            ex = new Ice.ObjectNotExistException();
                            break;
                        }

                        case ReplyStatus.replyFacetNotExist:
                        {
                            ex = new Ice.FacetNotExistException();
                            break;
                        }

                        case ReplyStatus.replyOperationNotExist:
                        {
                            ex = new Ice.OperationNotExistException();
                            break;
                        }

                        default:
                        {
                            // Not reachable: the enclosing case labels list the same three values.
                            Debug.Assert(false);
                            break;
                        }
                    }

                    ex.id = ident;
                    ex.facet = facet;
                    ex.operation = operation;
                    throw ex;
                }

                case ReplyStatus.replyUnknownException:
                case ReplyStatus.replyUnknownLocalException:
                case ReplyStatus.replyUnknownUserException:
                {
                    string unknown = is_.readString();

                    Ice.UnknownException ex = null;
                    switch(replyStatus)
                    {
                        case ReplyStatus.replyUnknownException:
                        {
                            ex = new Ice.UnknownException();
                            break;
                        }

                        case ReplyStatus.replyUnknownLocalException:
                        {
                            ex = new Ice.UnknownLocalException();
                            break;
                        }

                        case ReplyStatus.replyUnknownUserException:
                        {
                            ex = new Ice.UnknownUserException();
                            break;
                        }

                        default:
                        {
                            // Not reachable: the enclosing case labels list the same three values.
                            Debug.Assert(false);
                            break;
                        }
                    }

                    ex.unknown = unknown;
                    throw ex;
                }

                default:
                {
                    throw new Ice.UnknownReplyStatusException();
                }
            }

            return responseImpl(false, replyStatus == ReplyStatus.replyOK, true);
        }
        catch(Ice.Exception ex)
        {
            return exception(ex);
        }
    }

    public override int invokeRemote(Ice.ConnectionI connection, bool compress, bool response)
    {
        cachedConnection_ = connection;
        return connection.sendAsyncRequest(this, compress, response, 0);
    }

    public override int invokeCollocated(CollocatedRequestHandler handler)
    {
        // The stream cannot be cached if the proxy is not a twoway or there is an invocation timeout set.
        if(!proxy_.ice_isTwoway() || proxy_.iceReference().getInvocationTimeout() != -1)
        {
            // Disable caching by marking the streams as cached!
            state_ |= StateCachedBuffers;
        }
        return handler.invokeAsyncRequest(this, 0, synchronous_);
    }

    //
    // Gives up the batch stream for batch proxies, then aborts the
    // invocation through the base class.
    //
    public new void abort(Ice.Exception ex)
    {
        Reference.Mode mode = proxy_.iceReference().getMode();
        if(mode == Reference.Mode.ModeBatchOneway || mode == Reference.Mode.ModeBatchDatagram)
        {
            //
            // If we didn't finish a batch oneway or datagram request, we
            // must notify the connection that we give up ownership
            // of the batch stream.
            //
            proxy_.iceGetBatchRequestQueue().abortBatchRequest(os_);
        }

        base.abort(ex);
    }

    //
    // Sends the prepared request. Batch requests are handed to the batch
    // request queue and completed immediately; all other requests go
    // through invokeImpl.
    //
    protected void invoke(string operation, bool synchronous)
    {
        synchronous_ = synchronous;
        Reference.Mode mode = proxy_.iceReference().getMode();
        if(mode == Reference.Mode.ModeBatchOneway || mode == Reference.Mode.ModeBatchDatagram)
        {
            sentSynchronously_ = true;
            proxy_.iceGetBatchRequestQueue().finishBatchRequest(os_, proxy_, operation);
            responseImpl(true, true, false); // Don't call sent/completed callback for batch AMI requests
            return;
        }

        //
        // NOTE: invokeImpl doesn't throw so this can be called from the
        // try block with the catch block calling abort() in case of an
        // exception.
        //
        // NOTE(review): invokeImpl(true) re-throws Ice.Exception when it is
        // called from the user thread (see its catch block), so the "doesn't
        // throw" wording above looks stale -- confirm.
        //
        invokeImpl(true); // userThread = true
    }

    //
    // Marshals the request (header plus the encapsulated parameters, or
    // an empty encapsulation when write is null) and sends it. Any
    // Ice.Exception raised on the way is passed to abort().
    //
    public void invoke(string operation,
                       Ice.OperationMode mode,
                       Ice.FormatType format,
                       Dictionary<string, string> context,
                       bool synchronous,
                       System.Action<Ice.OutputStream> write)
    {
        try
        {
            prepare(operation, mode, context);
            if(write != null)
            {
                os_.startEncapsulation(encoding_, format);
                write(os_);
                os_.endEncapsulation();
            }
            else
            {
                os_.writeEmptyEncapsulation(encoding_);
            }
            invoke(operation, synchronous);
        }
        catch(Ice.Exception ex)
        {
            abort(ex);
        }
    }

    //
    // Reads the user exception from the input stream. The exception is
    // first offered to the userException_ delegate, if one is set; if
    // the delegate returns, an UnknownUserException carrying the
    // exception's type ID is thrown.
    //
    public override void throwUserException()
    {
        try
        {
            is_.startEncapsulation();
            is_.throwException();
        }
        catch(Ice.UserException ex)
        {
            is_.endEncapsulation();
            if(userException_!= null)
            {
                userException_.Invoke(ex);
            }
            throw new Ice.UnknownUserException(ex.ice_id());
        }
    }

    //
    // Resets the streams and hands them to the proxy for reuse, at most
    // once per invocation (guarded by StateCachedBuffers) and only when
    // the instance's cacheMessageBuffers() setting is > 0. The stream
    // fields are set to null afterwards.
    //
    public override void cacheMessageBuffers()
    {
        if(proxy_.iceReference().getInstance().cacheMessageBuffers() > 0)
        {
            lock(this)
            {
                if((state_ & StateCachedBuffers) > 0)
                {
                    return;
                }
                state_ |= StateCachedBuffers;
            }

            if(is_ != null)
            {
                is_.reset();
            }
            os_.reset();

            proxy_.cacheMessageBuffers(is_, os_);

            is_ = null;
            os_ = null;
        }
    }

    protected readonly Ice.EncodingVersion encoding_;
    protected System.Action<Ice.UserException> userException_;
}

//
// OutgoingAsync specialization that unmarshals a result of type T from
// the response using the read delegate passed to invoke.
//
public class OutgoingAsyncT<T> : OutgoingAsync
{
    public OutgoingAsyncT(Ice.ObjectPrxHelperBase prx,
                          OutgoingAsyncCompletionCallback completionCallback,
                          Ice.OutputStream os = null,
                          Ice.InputStream iss = null) :
        base(prx, completionCallback, os, iss)
    {
    }

    //
    // Stores the read and userException delegates, then marshals and
    // sends the request through the base class.
    //
    public void invoke(string operation,
                       Ice.OperationMode mode,
                       Ice.FormatType format,
                       Dictionary<string, string> context,
                       bool synchronous,
                       System.Action<Ice.OutputStream> write = null,
                       System.Action<Ice.UserException> userException = null,
                       System.Func<Ice.InputStream, T> read = null)
    {
        read_ = read;
        userException_ = userException;
        base.invoke(operation, mode, format, context, synchronous, write);
    }

    //
    // Returns the unmarshaled result when ok is true (default(T) if no
    // read delegate was given); otherwise throws through
    // throwUserException(). The message buffers are always offered for
    // caching afterwards.
    //
    public T getResult(bool ok)
    {
        try
        {
            if(ok)
            {
                if(read_ == null)
                {
                    if(is_ == null || is_.isEmpty())
                    {
                        //
                        // If there's no response (oneway, batch-oneway proxies), we just set the result
                        // on completion without reading anything from the input stream. This is required for
                        // batch invocations.
                        //
                    }
                    else
                    {
                        is_.skipEmptyEncapsulation();
                    }
                    return default(T);
                }
                else
                {
                    is_.startEncapsulation();
                    T r = read_(is_);
                    is_.endEncapsulation();
                    return r;
                }
            }
            else
            {
                throwUserException();
                return default(T); // make compiler happy
            }
        }
        finally
        {
            cacheMessageBuffers();
        }
    }

    protected System.Func<Ice.InputStream, T> read_;
}

//
// Class for handling the proxy's begin_ice_flushBatchRequest request.
//
class ProxyFlushBatchAsync : ProxyOutgoingAsyncBase
{
    public ProxyFlushBatchAsync(Ice.ObjectPrxHelperBase prx, OutgoingAsyncCompletionCallback completionCallback) :
        base(prx, completionCallback)
    {
    }

    //
    // Sends the swapped batch over the given connection. An empty batch
    // is completed locally without touching the connection.
    //
    public override int invokeRemote(Ice.ConnectionI connection, bool compress, bool response)
    {
        if(_batchRequestNum != 0)
        {
            cachedConnection_ = connection;
            return connection.sendAsyncRequest(this, compress, false, _batchRequestNum);
        }
        return completeEmptyBatch();
    }

    //
    // Collocated counterpart of invokeRemote.
    //
    public override int invokeCollocated(CollocatedRequestHandler handler)
    {
        if(_batchRequestNum != 0)
        {
            return handler.invokeAsyncRequest(this, _batchRequestNum, false);
        }
        return completeEmptyBatch();
    }

    //
    // Takes over the proxy's queued batch requests and starts the
    // invocation from the user thread.
    //
    public void invoke(string operation, bool synchronous)
    {
        Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(proxy_.iceReference().getProtocol()));
        synchronous_ = synchronous;
        observer_ = ObserverHelper.get(proxy_, operation, null);

        bool ignoredCompress; // Not used for proxy flush batch requests.
        _batchRequestNum = proxy_.iceGetBatchRequestQueue().swap(os_, out ignoredCompress);

        invokeImpl(true); // userThread = true
    }

    //
    // Nothing to flush: mark the request as sent and report whether the
    // sent callback must be invoked.
    //
    private int completeEmptyBatch()
    {
        int status = AsyncStatusSent;
        if(sent())
        {
            status |= AsyncStatusInvokeSentCallback;
        }
        return status;
    }

    private int _batchRequestNum;
}

//
// Class for handling the proxy's begin_ice_getConnection request.
//
class ProxyGetConnection : ProxyOutgoingAsyncBase
{
    public ProxyGetConnection(Ice.ObjectPrxHelperBase prx, OutgoingAsyncCompletionCallback completionCallback) :
        base(prx, completionCallback)
    {
    }

    //
    // Nothing is sent: the connection is cached and the invocation is
    // completed right away. The response callback is dispatched
    // asynchronously when responseImpl requests it.
    //
    public override int invokeRemote(Ice.ConnectionI connection, bool compress, bool response)
    {
        cachedConnection_ = connection;
        if(responseImpl(false, true, true))
        {
            invokeResponseAsync();
        }
        return AsyncStatusSent;
    }

    //
    // Collocated counterpart of invokeRemote; no connection is cached.
    //
    public override int invokeCollocated(CollocatedRequestHandler handler)
    {
        if(responseImpl(false, true, true))
        {
            invokeResponseAsync();
        }
        return AsyncStatusSent;
    }

    public Ice.Connection getConnection()
    {
        return cachedConnection_;
    }

    public void invoke(string operation, bool synchronous)
    {
        synchronous_ = synchronous;
        observer_ = ObserverHelper.get(proxy_, operation, null);
        invokeImpl(true); // userThread = true
    }
}

//
// Flushes the batch requests queued on a single connection.
//
class ConnectionFlushBatchAsync : OutgoingAsyncBase
{
    public ConnectionFlushBatchAsync(Ice.ConnectionI connection,
                                     Instance instance,
                                     OutgoingAsyncCompletionCallback completionCallback) :
        base(instance, completionCallback)
    {
        _connection = connection;
    }

    //
    // Takes over the connection's queued batch requests and sends them.
    // An empty batch is completed locally. Failures are recorded with
    // exception() and the exception callback is dispatched
    // asynchronously when requested.
    //
    public void invoke(string operation, Ice.CompressBatch compressBatch, bool synchronous)
    {
        synchronous_ = synchronous;
        observer_ = ObserverHelper.get(instance_, operation);
        try
        {
            int status;
            bool compress;
            int batchRequestNum = _connection.getBatchRequestQueue().swap(os_, out compress);
            if(batchRequestNum == 0)
            {
                status = AsyncStatusSent;
                if(sent())
                {
                    status = status | AsyncStatusInvokeSentCallback;
                }
            }
            else
            {
                // Yes/No force the compression flag; any other value keeps the
                // flag returned by swap().
                bool comp;
                if(compressBatch == Ice.CompressBatch.Yes)
                {
                    comp = true;
                }
                else if(compressBatch == Ice.CompressBatch.No)
                {
                    comp = false;
                }
                else
                {
                    comp = compress;
                }
                status = _connection.sendAsyncRequest(this, comp, false, batchRequestNum);
            }

            if((status & AsyncStatusSent) != 0)
            {
                sentSynchronously_ = true;
                if((status & AsyncStatusInvokeSentCallback) != 0)
                {
                    invokeSent();
                }
            }
        }
        catch(RetryException ex)
        {
            // No retry here: the exception carried by the RetryException is
            // reported as the failure.
            try
            {
                throw ex.get();
            }
            catch(Ice.LocalException ee)
            {
                if(exception(ee))
                {
                    invokeExceptionAsync();
                }
            }
        }
        catch(Ice.Exception ex)
        {
            if(exception(ex))
            {
                invokeExceptionAsync();
            }
        }
    }

    private readonly Ice.ConnectionI _connection;
};

public class CommunicatorFlushBatchAsync : OutgoingAsyncBase
{
    //
    // Per-connection flush request. Completion (sent or failed) is
    // reported to the owning CommunicatorFlushBatchAsync through
    // check(false); both overrides return false.
    //
    class FlushBatch : OutgoingAsyncBase
    {
        public FlushBatch(CommunicatorFlushBatchAsync outAsync,
                          Instance instance,
                          Ice.Instrumentation.InvocationObserver observer) : base(instance, null)
        {
            _outAsync = outAsync;
            _observer = observer;
        }

        public override bool
        sent()
        {
            if(childObserver_ != null)
            {
                childObserver_.detach();
                childObserver_ = null;
            }
            _outAsync.check(false);
            return false;
        }

        public override bool
        exception(Ice.Exception ex)
        {
            if(childObserver_ != null)
            {
                childObserver_.failed(ex.ice_id());
                childObserver_.detach();
                childObserver_ = null;
            }
            _outAsync.check(false);
            return false;
        }

        // Returns the observer passed to the constructor instead of observer_.
        protected override Ice.Instrumentation.InvocationObserver
        getObserver()
        {
            return _observer;
        }

        private CommunicatorFlushBatchAsync _outAsync;
        private Ice.Instrumentation.InvocationObserver _observer;
    };

    public CommunicatorFlushBatchAsync(Instance instance, OutgoingAsyncCompletionCallback callback) :
        base(instance, callback)
    {
        //
        // _useCount is initialized to 1 to prevent premature callbacks.
        // The caller must invoke ready() after all flush requests have
        // been initiated.
        //
        _useCount = 1;
    }

    //
    // Starts flushing the batch requests queued on the given connection.
    // The use count is incremented first; if a local exception is raised,
    // check(false) is called and the exception is re-thrown.
    // NOTE(review): check() is defined outside the visible part of this
    // file -- presumably it decrements the use count; confirm.
    //
    public void flushConnection(Ice.ConnectionI con, Ice.CompressBatch compressBatch)
    {
        lock(this)
        {
            ++_useCount;
        }

        try
        {
            var flushBatch = new FlushBatch(this, instance_, observer_);
            bool compress;
            int batchRequestNum = con.getBatchRequestQueue().swap(flushBatch.getOs(), out compress);
            if(batchRequestNum == 0)
            {
                flushBatch.sent();
            }
            else
            {
                // Yes/No force the compression flag; any other value keeps the
                // flag returned by swap().
                bool comp;
                if(compressBatch == Ice.CompressBatch.Yes)
                {
                    comp = true;
                }
                else if(compressBatch == Ice.CompressBatch.No)
                {
                    comp = false;
                }
                else
                {
                    comp = compress;
                }
                con.sendAsyncRequest(flushBatch, comp, false, batchRequestNum);
            }
        }
        catch(Ice.LocalException)
        {
            check(false);
            throw;
        }
    }

    public void invoke(string operation, Ice.CompressBatch compressBatch, bool synchronous)
    {
        synchronous_ =
synchronous; 1379 observer_ = ObserverHelper.get(instance_, operation); 1380 instance_.outgoingConnectionFactory().flushAsyncBatchRequests(compressBatch, this); 1381 instance_.objectAdapterFactory().flushAsyncBatchRequests(compressBatch, this); 1382 check(true); 1383 } 1384 check(bool userThread)1385 public void check(bool userThread) 1386 { 1387 lock(this) 1388 { 1389 Debug.Assert(_useCount > 0); 1390 if(--_useCount > 0) 1391 { 1392 return; 1393 } 1394 } 1395 1396 if(sentImpl(true)) 1397 { 1398 if(userThread) 1399 { 1400 sentSynchronously_ = true; 1401 invokeSent(); 1402 } 1403 else 1404 { 1405 invokeSentAsync(); 1406 } 1407 } 1408 } 1409 1410 private int _useCount; 1411 }; 1412 1413 public abstract class TaskCompletionCallback<T> : TaskCompletionSource<T>, OutgoingAsyncCompletionCallback 1414 { TaskCompletionCallback(System.IProgress<bool> progress, CancellationToken cancellationToken)1415 public TaskCompletionCallback(System.IProgress<bool> progress, CancellationToken cancellationToken) 1416 { 1417 progress_ = progress; 1418 _cancellationToken = cancellationToken; 1419 } 1420 init(OutgoingAsyncBase outgoing)1421 public void init(OutgoingAsyncBase outgoing) 1422 { 1423 if(_cancellationToken.CanBeCanceled) 1424 { 1425 _cancellationToken.Register(outgoing.cancel); 1426 } 1427 } 1428 handleSent(bool done, bool alreadySent, OutgoingAsyncBase og)1429 public bool handleSent(bool done, bool alreadySent, OutgoingAsyncBase og) 1430 { 1431 if(done && og.isSynchronous()) 1432 { 1433 Debug.Assert(progress_ == null); 1434 handleInvokeSent(false, done, alreadySent, og); 1435 return false; 1436 } 1437 return done || progress_ != null && !alreadySent; // Invoke the sent callback only if not already invoked. 
1438 } 1439 handleException(Ice.Exception ex, OutgoingAsyncBase og)1440 public bool handleException(Ice.Exception ex, OutgoingAsyncBase og) 1441 { 1442 // 1443 // If this is a synchronous call, we can notify the task from this thread to avoid 1444 // the thread context switch. We know there aren't any continuations setup with the 1445 // task. 1446 // 1447 if(og.isSynchronous()) 1448 { 1449 handleInvokeException(ex, og); 1450 return false; 1451 } 1452 else 1453 { 1454 return true; 1455 } 1456 } 1457 handleResponse(bool userThread, bool ok, OutgoingAsyncBase og)1458 public bool handleResponse(bool userThread, bool ok, OutgoingAsyncBase og) 1459 { 1460 // 1461 // If called from the user thread (only the case for batch requests) or if this 1462 // is a synchronous call, we can notify the task from this thread to avoid the 1463 // thread context switch. We know there aren't any continuations setup with the 1464 // task. 1465 // 1466 if(userThread || og.isSynchronous()) 1467 { 1468 handleInvokeResponse(ok, og); 1469 return false; 1470 } 1471 else 1472 { 1473 return true; 1474 } 1475 } 1476 handleInvokeSent(bool sentSynchronously, bool done, bool alreadySent, OutgoingAsyncBase og)1477 public virtual void handleInvokeSent(bool sentSynchronously, bool done, bool alreadySent, OutgoingAsyncBase og) 1478 { 1479 if(progress_ != null && !alreadySent) 1480 { 1481 progress_.Report(sentSynchronously); 1482 } 1483 if(done) 1484 { 1485 SetResult(default(T)); 1486 } 1487 } 1488 handleInvokeException(Ice.Exception ex, OutgoingAsyncBase og)1489 public void handleInvokeException(Ice.Exception ex, OutgoingAsyncBase og) 1490 { 1491 SetException(ex); 1492 } 1493 handleInvokeResponse(bool ok, OutgoingAsyncBase og)1494 abstract public void handleInvokeResponse(bool ok, OutgoingAsyncBase og); 1495 1496 private readonly CancellationToken _cancellationToken; 1497 1498 protected readonly System.IProgress<bool> progress_; 1499 } 1500 1501 public class OperationTaskCompletionCallback<T> : 
TaskCompletionCallback<T> 1502 { OperationTaskCompletionCallback(System.IProgress<bool> progress, CancellationToken cancellationToken)1503 public OperationTaskCompletionCallback(System.IProgress<bool> progress, CancellationToken cancellationToken) : 1504 base(progress, cancellationToken) 1505 { 1506 } 1507 handleInvokeResponse(bool ok, OutgoingAsyncBase og)1508 public override void handleInvokeResponse(bool ok, OutgoingAsyncBase og) 1509 { 1510 SetResult(((OutgoingAsyncT<T>)og).getResult(ok)); 1511 } 1512 } 1513 1514 public class FlushBatchTaskCompletionCallback : TaskCompletionCallback<object> 1515 { FlushBatchTaskCompletionCallback(System.IProgress<bool> progress = null, CancellationToken cancellationToken = new CancellationToken())1516 public FlushBatchTaskCompletionCallback(System.IProgress<bool> progress = null, 1517 CancellationToken cancellationToken = new CancellationToken()) : 1518 base(progress, cancellationToken) 1519 { 1520 } 1521 handleInvokeResponse(bool ok, OutgoingAsyncBase og)1522 public override void handleInvokeResponse(bool ok, OutgoingAsyncBase og) 1523 { 1524 SetResult(null); 1525 } 1526 } 1527 1528 abstract public class AsyncResultCompletionCallback : AsyncResultI, OutgoingAsyncCompletionCallback 1529 { AsyncResultCompletionCallback(Ice.Communicator com, Instance instance, string op, object cookie, Ice.AsyncCallback cb)1530 public AsyncResultCompletionCallback(Ice.Communicator com, Instance instance, string op, object cookie, 1531 Ice.AsyncCallback cb) : 1532 base(com, instance, op, cookie, cb) 1533 { 1534 } 1535 init(OutgoingAsyncBase outgoing)1536 public void init(OutgoingAsyncBase outgoing) 1537 { 1538 outgoing_ = outgoing; 1539 } 1540 handleSent(bool done, bool alreadySent, OutgoingAsyncBase og)1541 public bool handleSent(bool done, bool alreadySent, OutgoingAsyncBase og) 1542 { 1543 lock(this) 1544 { 1545 state_ |= StateSent; 1546 if(done) 1547 { 1548 state_ |= StateDone | StateOK; 1549 } 1550 if(waitHandle_ != null) 1551 { 1552 
waitHandle_.Set(); 1553 } 1554 Monitor.PulseAll(this); 1555 1556 // 1557 // Invoke the sent callback only if not already invoked. 1558 // 1559 return !alreadySent && sentCallback_ != null; 1560 } 1561 } 1562 handleException(Ice.Exception ex, OutgoingAsyncBase og)1563 public bool handleException(Ice.Exception ex, OutgoingAsyncBase og) 1564 { 1565 lock(this) 1566 { 1567 state_ |= StateDone; 1568 exception_ = ex; 1569 if(waitHandle_ != null) 1570 { 1571 waitHandle_.Set(); 1572 } 1573 Monitor.PulseAll(this); 1574 return completedCallback_ != null; 1575 } 1576 } 1577 handleResponse(bool userThread, bool ok, OutgoingAsyncBase og)1578 public bool handleResponse(bool userThread, bool ok, OutgoingAsyncBase og) 1579 { 1580 lock(this) 1581 { 1582 state_ |= StateDone; 1583 if(ok) 1584 { 1585 state_ |= StateOK; 1586 } 1587 if(waitHandle_ != null) 1588 { 1589 waitHandle_.Set(); 1590 } 1591 Monitor.PulseAll(this); 1592 return completedCallback_ != null; 1593 } 1594 } 1595 handleInvokeSent(bool sentSynchronously, bool done, bool alreadySent, OutgoingAsyncBase og)1596 public void handleInvokeSent(bool sentSynchronously, bool done, bool alreadySent, OutgoingAsyncBase og) 1597 { 1598 sentCallback_(this); 1599 } 1600 handleInvokeException(Ice.Exception ex, OutgoingAsyncBase og)1601 public void handleInvokeException(Ice.Exception ex, OutgoingAsyncBase og) 1602 { 1603 try 1604 { 1605 completedCallback_(this); 1606 } 1607 catch(Ice.Exception e) 1608 { 1609 throw new System.AggregateException(e); 1610 } 1611 } 1612 handleInvokeResponse(bool ok, OutgoingAsyncBase og)1613 public void handleInvokeResponse(bool ok, OutgoingAsyncBase og) 1614 { 1615 try 1616 { 1617 completedCallback_(this); 1618 } 1619 catch(Ice.Exception e) 1620 { 1621 throw new System.AggregateException(e); 1622 } 1623 } 1624 } 1625 1626 abstract public class ProxyAsyncResultCompletionCallback<T> : AsyncResultCompletionCallback, Ice.AsyncResult<T> 1627 { ProxyAsyncResultCompletionCallback(Ice.ObjectPrxHelperBase proxy, 
string operation, object cookie, Ice.AsyncCallback cb)1628 public ProxyAsyncResultCompletionCallback(Ice.ObjectPrxHelperBase proxy, string operation, object cookie, 1629 Ice.AsyncCallback cb) : 1630 base(proxy.ice_getCommunicator(), proxy.iceReference().getInstance(), operation, cookie, cb) 1631 { 1632 _proxy = proxy; 1633 } 1634 getProxy()1635 public override Ice.ObjectPrx getProxy() 1636 { 1637 return _proxy; 1638 } 1639 whenCompleted(Ice.ExceptionCallback excb)1640 new public Ice.AsyncResult<T> whenCompleted(Ice.ExceptionCallback excb) 1641 { 1642 base.whenCompleted(excb); 1643 return this; 1644 } 1645 whenCompleted(T cb, Ice.ExceptionCallback excb)1646 virtual public Ice.AsyncResult<T> whenCompleted(T cb, Ice.ExceptionCallback excb) 1647 { 1648 if(cb == null && excb == null) 1649 { 1650 throw new System.ArgumentException("callback is null"); 1651 } 1652 lock(this) 1653 { 1654 if(responseCallback_ != null || exceptionCallback_ != null) 1655 { 1656 throw new System.ArgumentException("callback already set"); 1657 } 1658 responseCallback_ = cb; 1659 exceptionCallback_ = excb; 1660 } 1661 setCompletedCallback(getCompletedCallback()); 1662 return this; 1663 } 1664 whenSent(Ice.SentCallback cb)1665 new public Ice.AsyncResult<T> whenSent(Ice.SentCallback cb) 1666 { 1667 base.whenSent(cb); 1668 return this; 1669 } 1670 1671 protected T responseCallback_; 1672 private Ice.ObjectPrx _proxy; 1673 } 1674 1675 public class OperationAsyncResultCompletionCallback<T, R> : ProxyAsyncResultCompletionCallback<T> 1676 { OperationAsyncResultCompletionCallback(System.Action<T, R> completed, Ice.ObjectPrxHelperBase proxy, string operation, object cookie, Ice.AsyncCallback callback)1677 public OperationAsyncResultCompletionCallback(System.Action<T, R> completed, 1678 Ice.ObjectPrxHelperBase proxy, 1679 string operation, 1680 object cookie, 1681 Ice.AsyncCallback callback) : 1682 base(proxy, operation, cookie, callback) 1683 { 1684 _completed = completed; 1685 } 1686 
getCompletedCallback()1687 override protected Ice.AsyncCallback getCompletedCallback() 1688 { 1689 return (Ice.AsyncResult r) => 1690 { 1691 Debug.Assert(r == this); 1692 try 1693 { 1694 R result = ((OutgoingAsyncT<R>)outgoing_).getResult(wait()); 1695 try 1696 { 1697 _completed(responseCallback_, result); 1698 } 1699 catch(Ice.Exception ex) 1700 { 1701 throw new System.AggregateException(ex); 1702 } 1703 } 1704 catch(Ice.Exception ex) 1705 { 1706 if(exceptionCallback_ != null) 1707 { 1708 exceptionCallback_.Invoke(ex); 1709 } 1710 } 1711 }; 1712 } 1713 1714 private System.Action<T, R> _completed; 1715 } 1716 } 1717