1 //
2 // Copyright (c) ZeroC, Inc. All rights reserved.
3 //
4 
5 using System.Collections.Generic;
6 using System.Diagnostics;
7 using System.Threading;
8 using System.Threading.Tasks;
9 
10 namespace IceInternal
11 {
12     public interface OutgoingAsyncCompletionCallback
13     {
init(OutgoingAsyncBase og)14         void init(OutgoingAsyncBase og);
15 
handleSent(bool done, bool alreadySent, OutgoingAsyncBase og)16         bool handleSent(bool done, bool alreadySent, OutgoingAsyncBase og);
handleException(Ice.Exception ex, OutgoingAsyncBase og)17         bool handleException(Ice.Exception ex, OutgoingAsyncBase og);
handleResponse(bool userThread, bool ok, OutgoingAsyncBase og)18         bool handleResponse(bool userThread, bool ok, OutgoingAsyncBase og);
19 
handleInvokeSent(bool sentSynchronously, bool done, bool alreadySent, OutgoingAsyncBase og)20         void handleInvokeSent(bool sentSynchronously, bool done, bool alreadySent, OutgoingAsyncBase og);
handleInvokeException(Ice.Exception ex, OutgoingAsyncBase og)21         void handleInvokeException(Ice.Exception ex, OutgoingAsyncBase og);
handleInvokeResponse(bool ok, OutgoingAsyncBase og)22         void handleInvokeResponse(bool ok, OutgoingAsyncBase og);
23     }
24 
25     public abstract class OutgoingAsyncBase
26     {
sent()27         public virtual bool sent()
28         {
29             return sentImpl(true);
30         }
31 
exception(Ice.Exception ex)32         public virtual bool exception(Ice.Exception ex)
33         {
34             return exceptionImpl(ex);
35         }
36 
response()37         public virtual bool response()
38         {
39             Debug.Assert(false); // Must be overriden by request that can handle responses
40             return false;
41         }
42 
invokeSentAsync()43         public void invokeSentAsync()
44         {
45             //
46             // This is called when it's not safe to call the sent callback
47             // synchronously from this thread. Instead the exception callback
48             // is called asynchronously from the client thread pool.
49             //
50             try
51             {
52                 instance_.clientThreadPool().dispatch(invokeSent, cachedConnection_);
53             }
54             catch(Ice.CommunicatorDestroyedException)
55             {
56             }
57         }
58 
invokeExceptionAsync()59         public void invokeExceptionAsync()
60         {
61             //
62             // CommunicatorDestroyedCompleted is the only exception that can propagate directly
63             // from this method.
64             //
65             instance_.clientThreadPool().dispatch(invokeException, cachedConnection_);
66         }
67 
invokeResponseAsync()68         public void invokeResponseAsync()
69         {
70             //
71             // CommunicatorDestroyedCompleted is the only exception that can propagate directly
72             // from this method.
73             //
74             instance_.clientThreadPool().dispatch(invokeResponse, cachedConnection_);
75         }
76 
invokeSent()77         public void invokeSent()
78         {
79             try
80             {
81                 _completionCallback.handleInvokeSent(sentSynchronously_, _doneInSent, _alreadySent, this);
82             }
83             catch(System.Exception ex)
84             {
85                 warning(ex);
86             }
87 
88             if(observer_ != null && _doneInSent)
89             {
90                 observer_.detach();
91                 observer_ = null;
92             }
93         }
invokeException()94         public void invokeException()
95         {
96             try
97             {
98                 try
99                 {
100                     throw _ex;
101                 }
102                 catch(Ice.Exception ex)
103                 {
104                     _completionCallback.handleInvokeException(ex, this);
105                 }
106             }
107             catch(System.Exception ex)
108             {
109                 warning(ex);
110             }
111 
112             if(observer_ != null)
113             {
114                 observer_.detach();
115                 observer_ = null;
116             }
117         }
118 
invokeResponse()119         public void invokeResponse()
120         {
121             if(_ex != null)
122             {
123                 invokeException();
124                 return;
125             }
126 
127             try
128             {
129                 try
130                 {
131                     _completionCallback.handleInvokeResponse((state_ & StateOK) != 0, this);
132                 }
133                 catch(Ice.Exception ex)
134                 {
135                     if(_completionCallback.handleException(ex, this))
136                     {
137                         _completionCallback.handleInvokeException(ex, this);
138                     }
139                 }
140                 catch(System.AggregateException ex)
141                 {
142                     throw ex.InnerException;
143                 }
144             }
145             catch(System.Exception ex)
146             {
147                 warning(ex);
148             }
149 
150             if(observer_ != null)
151             {
152                 observer_.detach();
153                 observer_ = null;
154             }
155         }
156 
cancelable(CancellationHandler handler)157         public virtual void cancelable(CancellationHandler handler)
158         {
159             lock(this)
160             {
161                 if(_cancellationException != null)
162                 {
163                     try
164                     {
165                         throw _cancellationException;
166                     }
167                     catch(Ice.LocalException)
168                     {
169                         _cancellationException = null;
170                         throw;
171                     }
172                 }
173                 _cancellationHandler = handler;
174             }
175         }
cancel()176         public void cancel()
177         {
178             cancel(new Ice.InvocationCanceledException());
179         }
180 
attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId)181         public void attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId)
182         {
183             Ice.Instrumentation.InvocationObserver observer = getObserver();
184             if(observer != null)
185             {
186                 int size = os_.size() - Protocol.headerSize - 4;
187                 childObserver_ = observer.getRemoteObserver(info, endpt, requestId, size);
188                 if(childObserver_ != null)
189                 {
190                     childObserver_.attach();
191                 }
192             }
193         }
194 
attachCollocatedObserver(Ice.ObjectAdapter adapter, int requestId)195         public void attachCollocatedObserver(Ice.ObjectAdapter adapter, int requestId)
196         {
197             Ice.Instrumentation.InvocationObserver observer = getObserver();
198             if(observer != null)
199             {
200                 int size = os_.size() - Protocol.headerSize - 4;
201                 childObserver_ = observer.getCollocatedObserver(adapter, requestId, size);
202                 if(childObserver_ != null)
203                 {
204                     childObserver_.attach();
205                 }
206             }
207         }
208 
getOs()209         public Ice.OutputStream getOs()
210         {
211             return os_;
212         }
213 
getIs()214         public Ice.InputStream getIs()
215         {
216             return is_;
217         }
218 
throwUserException()219         public virtual void throwUserException()
220         {
221         }
222 
cacheMessageBuffers()223         public virtual void cacheMessageBuffers()
224         {
225         }
226 
isSynchronous()227         public bool isSynchronous()
228         {
229             return synchronous_;
230         }
231 
OutgoingAsyncBase(Instance instance, OutgoingAsyncCompletionCallback completionCallback, Ice.OutputStream os = null, Ice.InputStream iss = null)232         protected OutgoingAsyncBase(Instance instance, OutgoingAsyncCompletionCallback completionCallback,
233                                     Ice.OutputStream os = null, Ice.InputStream iss = null)
234         {
235             instance_ = instance;
236             sentSynchronously_ = false;
237             synchronous_ = false;
238             _doneInSent = false;
239             _alreadySent = false;
240             state_ = 0;
241             os_ = os ?? new Ice.OutputStream(instance, Ice.Util.currentProtocolEncoding);
242             is_ = iss ?? new Ice.InputStream(instance, Ice.Util.currentProtocolEncoding);
243             _completionCallback = completionCallback;
244             if(_completionCallback != null)
245             {
246                 _completionCallback.init(this);
247             }
248         }
249 
sentImpl(bool done)250         protected virtual bool sentImpl(bool done)
251         {
252             lock(this)
253             {
254                 _alreadySent = (state_ & StateSent) > 0;
255                 state_ |= StateSent;
256                 if(done)
257                 {
258                     _doneInSent = true;
259                     if(childObserver_ != null)
260                     {
261                         childObserver_.detach();
262                         childObserver_ = null;
263                     }
264                     _cancellationHandler = null;
265 
266                     //
267                     // For oneway requests after the data has been sent
268                     // the buffers can be reused unless this is a
269                     // collocated invocation. For collocated invocations
270                     // the buffer won't be reused because it has already
271                     // been marked as cached in invokeCollocated.
272                     //
273                     cacheMessageBuffers();
274                 }
275 
276                 bool invoke = _completionCallback.handleSent(done, _alreadySent, this);
277                 if(!invoke && _doneInSent && observer_ != null)
278                 {
279                     observer_.detach();
280                     observer_ = null;
281                 }
282                 return invoke;
283             }
284         }
285 
exceptionImpl(Ice.Exception ex)286         protected virtual bool exceptionImpl(Ice.Exception ex)
287         {
288             lock(this)
289             {
290                 _ex = ex;
291                 if(childObserver_ != null)
292                 {
293                     childObserver_.failed(ex.ice_id());
294                     childObserver_.detach();
295                     childObserver_ = null;
296                 }
297                 _cancellationHandler = null;
298 
299                 if(observer_ != null)
300                 {
301                     observer_.failed(ex.ice_id());
302                 }
303                 bool invoke = _completionCallback.handleException(ex, this);
304                 if(!invoke && observer_ != null)
305                 {
306                     observer_.detach();
307                     observer_ = null;
308                 }
309                 return invoke;
310             }
311         }
responseImpl(bool userThread, bool ok, bool invoke)312         protected virtual bool responseImpl(bool userThread, bool ok, bool invoke)
313         {
314             lock(this)
315             {
316                 if(ok)
317                 {
318                     state_ |= StateOK;
319                 }
320 
321                 _cancellationHandler = null;
322 
323                 try
324                 {
325                     invoke &= _completionCallback.handleResponse(userThread, ok, this);
326                 }
327                 catch(Ice.Exception ex)
328                 {
329                     _ex = ex;
330                     invoke = _completionCallback.handleException(ex, this);
331                 }
332                 if(!invoke && observer_ != null)
333                 {
334                     observer_.detach();
335                     observer_ = null;
336                 }
337                 return invoke;
338             }
339         }
340 
cancel(Ice.LocalException ex)341         protected void cancel(Ice.LocalException ex)
342         {
343             CancellationHandler handler;
344             {
345                 lock(this)
346                 {
347                     _cancellationException = ex;
348                     if(_cancellationHandler == null)
349                     {
350                         return;
351                     }
352                     handler = _cancellationHandler;
353                 }
354             }
355             handler.asyncRequestCanceled(this, ex);
356         }
357 
warning(System.Exception ex)358         void warning(System.Exception ex)
359         {
360             if(instance_.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
361             {
362                 instance_.initializationData().logger.warning("exception raised by AMI callback:\n" + ex);
363             }
364         }
365 
366         //
367         // This virtual method is necessary for the communicator flush
368         // batch requests implementation.
369         //
getObserver()370         virtual protected Ice.Instrumentation.InvocationObserver getObserver()
371         {
372             return observer_;
373         }
374 
sentSynchronously()375         public bool sentSynchronously()
376         {
377             return sentSynchronously_;
378         }
379 
380         protected Instance instance_;
381         protected Ice.Connection cachedConnection_;
382         protected bool sentSynchronously_;
383         protected bool synchronous_;
384         protected int state_;
385 
386         protected Ice.Instrumentation.InvocationObserver observer_;
387         protected Ice.Instrumentation.ChildInvocationObserver childObserver_;
388 
389         protected Ice.OutputStream os_;
390         protected Ice.InputStream is_;
391 
392         private bool _doneInSent;
393         private bool _alreadySent;
394         private Ice.Exception _ex;
395         private Ice.LocalException _cancellationException;
396         private CancellationHandler _cancellationHandler;
397         private OutgoingAsyncCompletionCallback _completionCallback;
398 
399         protected const int StateOK = 0x1;
400         protected const int StateDone = 0x2;
401         protected const int StateSent = 0x4;
402         protected const int StateEndCalled = 0x8;
403         protected const int StateCachedBuffers = 0x10;
404 
405         public const int AsyncStatusQueued = 0;
406         public const int AsyncStatusSent = 1;
407         public const int AsyncStatusInvokeSentCallback = 2;
408     }
409 
    //
    // Base class for proxy based invocations. This class handles the
    // retry for proxy invocations. It also ensures the child observer is
    // correctly notified of failures and makes sure the retry task is
    // correctly canceled when the invocation completes.
    //
416     public abstract class ProxyOutgoingAsyncBase : OutgoingAsyncBase, TimerTask
417     {
invokeRemote(Ice.ConnectionI connection, bool compress, bool response)418         public abstract int invokeRemote(Ice.ConnectionI connection, bool compress, bool response);
invokeCollocated(CollocatedRequestHandler handler)419         public abstract int invokeCollocated(CollocatedRequestHandler handler);
420 
exception(Ice.Exception exc)421         public override bool exception(Ice.Exception exc)
422         {
423             if(childObserver_ != null)
424             {
425                 childObserver_.failed(exc.ice_id());
426                 childObserver_.detach();
427                 childObserver_ = null;
428             }
429 
430             cachedConnection_ = null;
431             if(proxy_.iceReference().getInvocationTimeout() == -2)
432             {
433                 instance_.timer().cancel(this);
434             }
435 
436             //
437             // NOTE: at this point, synchronization isn't needed, no other threads should be
438             // calling on the callback.
439             //
440             try
441             {
442                 //
443                 // It's important to let the retry queue do the retry even if
444                 // the retry interval is 0. This method can be called with the
445                 // connection locked so we can't just retry here.
446                 //
447                 instance_.retryQueue().add(this, proxy_.iceHandleException(exc, handler_, mode_, _sent, ref _cnt));
448                 return false;
449             }
450             catch(Ice.Exception ex)
451             {
452                 return exceptionImpl(ex); // No retries, we're done
453             }
454         }
455 
cancelable(CancellationHandler handler)456         public override void cancelable(CancellationHandler handler)
457         {
458             if(proxy_.iceReference().getInvocationTimeout() == -2 && cachedConnection_ != null)
459             {
460                 int timeout = cachedConnection_.timeout();
461                 if(timeout > 0)
462                 {
463                     instance_.timer().schedule(this, timeout);
464                 }
465             }
466             base.cancelable(handler);
467         }
468 
retryException(Ice.Exception ex)469         public void retryException(Ice.Exception ex)
470         {
471             try
472             {
473                 //
474                 // It's important to let the retry queue do the retry. This is
475                 // called from the connect request handler and the retry might
476                 // require could end up waiting for the flush of the
477                 // connection to be done.
478                 //
479                 proxy_.iceUpdateRequestHandler(handler_, null); // Clear request handler and always retry.
480                 instance_.retryQueue().add(this, 0);
481             }
482             catch(Ice.Exception exc)
483             {
484                 if(exception(exc))
485                 {
486                     invokeExceptionAsync();
487                 }
488             }
489         }
490 
retry()491         public void retry()
492         {
493             invokeImpl(false);
494         }
abort(Ice.Exception ex)495         public void abort(Ice.Exception ex)
496         {
497             Debug.Assert(childObserver_ == null);
498             if(exceptionImpl(ex))
499             {
500                 invokeExceptionAsync();
501             }
502             else if(ex is Ice.CommunicatorDestroyedException)
503             {
504                 //
505                 // If it's a communicator destroyed exception, swallow
506                 // it but instead notify the user thread. Even if no callback
507                 // was provided.
508                 //
509                 throw ex;
510             }
511         }
512 
ProxyOutgoingAsyncBase(Ice.ObjectPrxHelperBase prx, OutgoingAsyncCompletionCallback completionCallback, Ice.OutputStream os = null, Ice.InputStream iss = null)513         protected ProxyOutgoingAsyncBase(Ice.ObjectPrxHelperBase prx,
514                                          OutgoingAsyncCompletionCallback completionCallback,
515                                          Ice.OutputStream os = null,
516                                          Ice.InputStream iss = null) :
517             base(prx.iceReference().getInstance(), completionCallback, os, iss)
518         {
519             proxy_ = prx;
520             mode_ = Ice.OperationMode.Normal;
521             _cnt = 0;
522             _sent = false;
523         }
524 
invokeImpl(bool userThread)525         protected void invokeImpl(bool userThread)
526         {
527             try
528             {
529                 if(userThread)
530                 {
531                     int invocationTimeout = proxy_.iceReference().getInvocationTimeout();
532                     if(invocationTimeout > 0)
533                     {
534                         instance_.timer().schedule(this, invocationTimeout);
535                     }
536                 }
537                 else if(observer_ != null)
538                 {
539                     observer_.retried();
540                 }
541 
542                 while(true)
543                 {
544                     try
545                     {
546                         _sent = false;
547                         handler_ = proxy_.iceGetRequestHandler();
548                         int status = handler_.sendAsyncRequest(this);
549                         if((status & AsyncStatusSent) != 0)
550                         {
551                             if(userThread)
552                             {
553                                 sentSynchronously_ = true;
554                                 if((status & AsyncStatusInvokeSentCallback) != 0)
555                                 {
556                                     invokeSent(); // Call the sent callback from the user thread.
557                                 }
558                             }
559                             else
560                             {
561                                 if((status & AsyncStatusInvokeSentCallback) != 0)
562                                 {
563                                     invokeSentAsync(); // Call the sent callback from a client thread pool thread.
564                                 }
565                             }
566                         }
567                         return; // We're done!
568                     }
569                     catch(RetryException)
570                     {
571                         proxy_.iceUpdateRequestHandler(handler_, null); // Clear request handler and always retry.
572                     }
573                     catch(Ice.Exception ex)
574                     {
575                         if(childObserver_ != null)
576                         {
577                             childObserver_.failed(ex.ice_id());
578                             childObserver_.detach();
579                             childObserver_ = null;
580                         }
581                         int interval = proxy_.iceHandleException(ex, handler_, mode_, _sent, ref _cnt);
582                         if(interval > 0)
583                         {
584                             instance_.retryQueue().add(this, interval);
585                             return;
586                         }
587                         else if(observer_ != null)
588                         {
589                             observer_.retried();
590                         }
591                     }
592                 }
593             }
594             catch(Ice.Exception ex)
595             {
596                 //
597                 // If called from the user thread we re-throw, the exception
598                 // will be catch by the caller and abort() will be called.
599                 //
600                 if(userThread)
601                 {
602                     throw;
603                 }
604                 else if(exceptionImpl(ex)) // No retries, we're done
605                 {
606                     invokeExceptionAsync();
607                 }
608             }
609         }
sentImpl(bool done)610         protected override bool sentImpl(bool done)
611         {
612             _sent = true;
613             if(done)
614             {
615                 if(proxy_.iceReference().getInvocationTimeout() != -1)
616                 {
617                     instance_.timer().cancel(this);
618                 }
619             }
620             return base.sentImpl(done);
621         }
exceptionImpl(Ice.Exception ex)622         protected override bool exceptionImpl(Ice.Exception ex)
623         {
624             if(proxy_.iceReference().getInvocationTimeout() != -1)
625             {
626                 instance_.timer().cancel(this);
627             }
628             return base.exceptionImpl(ex);
629         }
630 
responseImpl(bool userThread, bool ok, bool invoke)631         protected override bool responseImpl(bool userThread, bool ok, bool invoke)
632         {
633             if(proxy_.iceReference().getInvocationTimeout() != -1)
634             {
635                 instance_.timer().cancel(this);
636             }
637             return base.responseImpl(userThread, ok, invoke);
638         }
639 
runTimerTask()640         public void runTimerTask()
641         {
642             if(proxy_.iceReference().getInvocationTimeout() == -2)
643             {
644                 cancel(new Ice.ConnectionTimeoutException());
645             }
646             else
647             {
648                 cancel(new Ice.InvocationTimeoutException());
649             }
650         }
651 
652         protected readonly Ice.ObjectPrxHelperBase proxy_;
653         protected RequestHandler handler_;
654         protected Ice.OperationMode mode_;
655 
656         private int _cnt;
657         private bool _sent;
658     }
659 
660     //
661     // Class for handling Slice operation invocations
662     //
663     public class OutgoingAsync : ProxyOutgoingAsyncBase
664     {
OutgoingAsync(Ice.ObjectPrxHelperBase prx, OutgoingAsyncCompletionCallback completionCallback, Ice.OutputStream os = null, Ice.InputStream iss = null)665         public OutgoingAsync(Ice.ObjectPrxHelperBase prx, OutgoingAsyncCompletionCallback completionCallback,
666                              Ice.OutputStream os = null, Ice.InputStream iss = null) :
667             base(prx, completionCallback, os, iss)
668         {
669             encoding_ = Protocol.getCompatibleEncoding(proxy_.iceReference().getEncoding());
670             synchronous_ = false;
671         }
672 
prepare(string operation, Ice.OperationMode mode, Dictionary<string, string> context)673         public void prepare(string operation, Ice.OperationMode mode, Dictionary<string, string> context)
674         {
675             Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(proxy_.iceReference().getProtocol()));
676 
677             mode_ = mode;
678 
679             observer_ = ObserverHelper.get(proxy_, operation, context);
680 
681             switch(proxy_.iceReference().getMode())
682             {
683                 case Reference.Mode.ModeTwoway:
684                 case Reference.Mode.ModeOneway:
685                 case Reference.Mode.ModeDatagram:
686                 {
687                     os_.writeBlob(Protocol.requestHdr);
688                     break;
689                 }
690 
691                 case Reference.Mode.ModeBatchOneway:
692                 case Reference.Mode.ModeBatchDatagram:
693                 {
694                     proxy_.iceGetBatchRequestQueue().prepareBatchRequest(os_);
695                     break;
696                 }
697             }
698 
699             Reference rf = proxy_.iceReference();
700 
701             rf.getIdentity().ice_writeMembers(os_);
702 
703             //
704             // For compatibility with the old FacetPath.
705             //
706             string facet = rf.getFacet();
707             if(facet == null || facet.Length == 0)
708             {
709                 os_.writeStringSeq(null);
710             }
711             else
712             {
713                 string[] facetPath = { facet };
714                 os_.writeStringSeq(facetPath);
715             }
716 
717             os_.writeString(operation);
718 
719             os_.writeByte((byte)mode);
720 
721             if(context != null)
722             {
723                 //
724                 // Explicit context
725                 //
726                 Ice.ContextHelper.write(os_, context);
727             }
728             else
729             {
730                 //
731                 // Implicit context
732                 //
733                 Ice.ImplicitContextI implicitContext = rf.getInstance().getImplicitContext();
734                 Dictionary<string, string> prxContext = rf.getContext();
735 
736                 if(implicitContext == null)
737                 {
738                     Ice.ContextHelper.write(os_, prxContext);
739                 }
740                 else
741                 {
742                     implicitContext.write(prxContext, os_);
743                 }
744             }
745         }
sent()746         public override bool sent()
747         {
748             return base.sentImpl(!proxy_.ice_isTwoway()); // done = true if it's not a two-way proxy
749         }
750 
response()751         public override bool response()
752         {
753             //
754             // NOTE: this method is called from ConnectionI.parseMessage
755             // with the connection locked. Therefore, it must not invoke
756             // any user callbacks.
757             //
758             Debug.Assert(proxy_.ice_isTwoway()); // Can only be called for twoways.
759 
760             if(childObserver_ != null)
761             {
762                 childObserver_.reply(is_.size() - Protocol.headerSize - 4);
763                 childObserver_.detach();
764                 childObserver_ = null;
765             }
766 
767             byte replyStatus;
768             try
769             {
770                 replyStatus = is_.readByte();
771 
772                 switch(replyStatus)
773                 {
774                     case ReplyStatus.replyOK:
775                     {
776                         break;
777                     }
778                     case ReplyStatus.replyUserException:
779                     {
780                         if(observer_ != null)
781                         {
782                             observer_.userException();
783                         }
784                         break;
785                     }
786 
787                     case ReplyStatus.replyObjectNotExist:
788                     case ReplyStatus.replyFacetNotExist:
789                     case ReplyStatus.replyOperationNotExist:
790                     {
791                         Ice.Identity ident = new Ice.Identity();
792                         ident.ice_readMembers(is_);
793 
794                         //
795                         // For compatibility with the old FacetPath.
796                         //
797                         string[] facetPath = is_.readStringSeq();
798                         ;
799                         string facet;
800                         if(facetPath.Length > 0)
801                         {
802                             if(facetPath.Length > 1)
803                             {
804                                 throw new Ice.MarshalException();
805                             }
806                             facet = facetPath[0];
807                         }
808                         else
809                         {
810                             facet = "";
811                         }
812 
813                         string operation = is_.readString();
814 
815                         Ice.RequestFailedException ex = null;
816                         switch(replyStatus)
817                         {
818                             case ReplyStatus.replyObjectNotExist:
819                             {
820                                 ex = new Ice.ObjectNotExistException();
821                                 break;
822                             }
823 
824                             case ReplyStatus.replyFacetNotExist:
825                             {
826                                 ex = new Ice.FacetNotExistException();
827                                 break;
828                             }
829 
830                             case ReplyStatus.replyOperationNotExist:
831                             {
832                                 ex = new Ice.OperationNotExistException();
833                                 break;
834                             }
835 
836                             default:
837                             {
838                                 Debug.Assert(false);
839                                 break;
840                             }
841                         }
842 
843                         ex.id = ident;
844                         ex.facet = facet;
845                         ex.operation = operation;
846                         throw ex;
847                     }
848 
849                     case ReplyStatus.replyUnknownException:
850                     case ReplyStatus.replyUnknownLocalException:
851                     case ReplyStatus.replyUnknownUserException:
852                     {
853                         string unknown = is_.readString();
854 
855                         Ice.UnknownException ex = null;
856                         switch(replyStatus)
857                         {
858                             case ReplyStatus.replyUnknownException:
859                             {
860                                 ex = new Ice.UnknownException();
861                                 break;
862                             }
863 
864                             case ReplyStatus.replyUnknownLocalException:
865                             {
866                                 ex = new Ice.UnknownLocalException();
867                                 break;
868                             }
869 
870                             case ReplyStatus.replyUnknownUserException:
871                             {
872                                 ex = new Ice.UnknownUserException();
873                                 break;
874                             }
875 
876                             default:
877                             {
878                                 Debug.Assert(false);
879                                 break;
880                             }
881                         }
882 
883                         ex.unknown = unknown;
884                         throw ex;
885                     }
886 
887                     default:
888                     {
889                         throw new Ice.UnknownReplyStatusException();
890                     }
891                 }
892 
893                 return responseImpl(false, replyStatus == ReplyStatus.replyOK, true);
894             }
895             catch(Ice.Exception ex)
896             {
897                 return exception(ex);
898             }
899         }
900 
invokeRemote(Ice.ConnectionI connection, bool compress, bool response)901         public override int invokeRemote(Ice.ConnectionI connection, bool compress, bool response)
902         {
903             cachedConnection_ = connection;
904             return connection.sendAsyncRequest(this, compress, response, 0);
905         }
906 
invokeCollocated(CollocatedRequestHandler handler)907         public override int invokeCollocated(CollocatedRequestHandler handler)
908         {
909             // The stream cannot be cached if the proxy is not a twoway or there is an invocation timeout set.
910             if(!proxy_.ice_isTwoway() || proxy_.iceReference().getInvocationTimeout() != -1)
911             {
912                 // Disable caching by marking the streams as cached!
913                 state_ |= StateCachedBuffers;
914             }
915             return handler.invokeAsyncRequest(this, 0, synchronous_);
916         }
917 
abort(Ice.Exception ex)918         public new void abort(Ice.Exception ex)
919         {
920             Reference.Mode mode = proxy_.iceReference().getMode();
921             if(mode == Reference.Mode.ModeBatchOneway || mode == Reference.Mode.ModeBatchDatagram)
922             {
923                 //
924                 // If we didn't finish a batch oneway or datagram request, we
925                 // must notify the connection about that we give up ownership
926                 // of the batch stream.
927                 //
928                 proxy_.iceGetBatchRequestQueue().abortBatchRequest(os_);
929             }
930 
931             base.abort(ex);
932         }
933 
invoke(string operation, bool synchronous)934         protected void invoke(string operation, bool synchronous)
935         {
936             synchronous_ = synchronous;
937             Reference.Mode mode = proxy_.iceReference().getMode();
938             if(mode == Reference.Mode.ModeBatchOneway || mode == Reference.Mode.ModeBatchDatagram)
939             {
940                 sentSynchronously_ = true;
941                 proxy_.iceGetBatchRequestQueue().finishBatchRequest(os_, proxy_, operation);
942                 responseImpl(true, true, false); // Don't call sent/completed callback for batch AMI requests
943                 return;
944             }
945 
946             //
947             // NOTE: invokeImpl doesn't throw so this can be called from the
948             // try block with the catch block calling abort() in case of an
949             // exception.
950             //
951             invokeImpl(true); // userThread = true
952         }
953 
invoke(string operation, Ice.OperationMode mode, Ice.FormatType format, Dictionary<string, string> context, bool synchronous, System.Action<Ice.OutputStream> write)954         public void invoke(string operation,
955                            Ice.OperationMode mode,
956                            Ice.FormatType format,
957                            Dictionary<string, string> context,
958                            bool synchronous,
959                            System.Action<Ice.OutputStream> write)
960         {
961             try
962             {
963                 prepare(operation, mode, context);
964                 if(write != null)
965                 {
966                     os_.startEncapsulation(encoding_, format);
967                     write(os_);
968                     os_.endEncapsulation();
969                 }
970                 else
971                 {
972                     os_.writeEmptyEncapsulation(encoding_);
973                 }
974                 invoke(operation, synchronous);
975             }
976             catch(Ice.Exception ex)
977             {
978                 abort(ex);
979             }
980         }
981 
throwUserException()982         public override void throwUserException()
983         {
984             try
985             {
986                 is_.startEncapsulation();
987                 is_.throwException();
988             }
989             catch(Ice.UserException ex)
990             {
991                 is_.endEncapsulation();
992                 if(userException_!= null)
993                 {
994                     userException_.Invoke(ex);
995                 }
996                 throw new Ice.UnknownUserException(ex.ice_id());
997             }
998         }
999 
cacheMessageBuffers()1000         public override void cacheMessageBuffers()
1001         {
1002             if(proxy_.iceReference().getInstance().cacheMessageBuffers() > 0)
1003             {
1004                 lock(this)
1005                 {
1006                     if((state_ & StateCachedBuffers) > 0)
1007                     {
1008                         return;
1009                     }
1010                     state_ |= StateCachedBuffers;
1011                 }
1012 
1013                 if(is_ != null)
1014                 {
1015                     is_.reset();
1016                 }
1017                 os_.reset();
1018 
1019                 proxy_.cacheMessageBuffers(is_, os_);
1020 
1021                 is_ = null;
1022                 os_ = null;
1023             }
1024         }
1025 
        // Encoding version passed to the output stream when the request
        // encapsulation (or an empty encapsulation) is written.
        protected readonly Ice.EncodingVersion encoding_;
        // Optional callback that throwUserException() invokes with the
        // unmarshaled user exception before throwing Ice.UnknownUserException.
        protected System.Action<Ice.UserException> userException_;
1028     }
1029 
1030     public class OutgoingAsyncT<T> : OutgoingAsync
1031     {
OutgoingAsyncT(Ice.ObjectPrxHelperBase prx, OutgoingAsyncCompletionCallback completionCallback, Ice.OutputStream os = null, Ice.InputStream iss = null)1032         public OutgoingAsyncT(Ice.ObjectPrxHelperBase prx,
1033                               OutgoingAsyncCompletionCallback completionCallback,
1034                               Ice.OutputStream os = null,
1035                               Ice.InputStream iss = null) :
1036             base(prx, completionCallback, os, iss)
1037         {
1038         }
1039 
invoke(string operation, Ice.OperationMode mode, Ice.FormatType format, Dictionary<string, string> context, bool synchronous, System.Action<Ice.OutputStream> write = null, System.Action<Ice.UserException> userException = null, System.Func<Ice.InputStream, T> read = null)1040         public void invoke(string operation,
1041                            Ice.OperationMode mode,
1042                            Ice.FormatType format,
1043                            Dictionary<string, string> context,
1044                            bool synchronous,
1045                            System.Action<Ice.OutputStream> write = null,
1046                            System.Action<Ice.UserException> userException = null,
1047                            System.Func<Ice.InputStream, T> read = null)
1048         {
1049             read_ = read;
1050             userException_ = userException;
1051             base.invoke(operation, mode, format, context, synchronous, write);
1052         }
1053 
getResult(bool ok)1054         public T getResult(bool ok)
1055         {
1056             try
1057             {
1058                 if(ok)
1059                 {
1060                     if(read_ == null)
1061                     {
1062                         if(is_ == null || is_.isEmpty())
1063                         {
1064                             //
1065                             // If there's no response (oneway, batch-oneway proxies), we just set the result
1066                             // on completion without reading anything from the input stream. This is required for
1067                             // batch invocations.
1068                             //
1069                         }
1070                         else
1071                         {
1072                             is_.skipEmptyEncapsulation();
1073                         }
1074                         return default(T);
1075                     }
1076                     else
1077                     {
1078                         is_.startEncapsulation();
1079                         T r = read_(is_);
1080                         is_.endEncapsulation();
1081                         return r;
1082                     }
1083                 }
1084                 else
1085                 {
1086                     throwUserException();
1087                     return default(T); // make compiler happy
1088                 }
1089             }
1090             finally
1091             {
1092                 cacheMessageBuffers();
1093             }
1094         }
1095 
1096         protected System.Func<Ice.InputStream, T> read_;
1097     }
1098 
1099     //
1100     // Class for handling the proxy's begin_ice_flushBatchRequest request.
1101     //
1102     class ProxyFlushBatchAsync : ProxyOutgoingAsyncBase
1103     {
ProxyFlushBatchAsync(Ice.ObjectPrxHelperBase prx, OutgoingAsyncCompletionCallback completionCallback)1104         public ProxyFlushBatchAsync(Ice.ObjectPrxHelperBase prx, OutgoingAsyncCompletionCallback completionCallback) :
1105             base(prx, completionCallback)
1106         {
1107         }
1108 
invokeRemote(Ice.ConnectionI connection, bool compress, bool response)1109         public override int invokeRemote(Ice.ConnectionI connection, bool compress, bool response)
1110         {
1111             if(_batchRequestNum == 0)
1112             {
1113                 if(sent())
1114                 {
1115                     return AsyncStatusSent | AsyncStatusInvokeSentCallback;
1116                 }
1117                 else
1118                 {
1119                     return AsyncStatusSent;
1120                 }
1121             }
1122             cachedConnection_ = connection;
1123             return connection.sendAsyncRequest(this, compress, false, _batchRequestNum);
1124         }
1125 
invokeCollocated(CollocatedRequestHandler handler)1126         public override int invokeCollocated(CollocatedRequestHandler handler)
1127         {
1128             if(_batchRequestNum == 0)
1129             {
1130                 if(sent())
1131                 {
1132                     return AsyncStatusSent | AsyncStatusInvokeSentCallback;
1133                 }
1134                 else
1135                 {
1136                     return AsyncStatusSent;
1137                 }
1138             }
1139             return handler.invokeAsyncRequest(this, _batchRequestNum, false);
1140         }
1141 
invoke(string operation, bool synchronous)1142         public void invoke(string operation, bool synchronous)
1143         {
1144             Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(proxy_.iceReference().getProtocol()));
1145             synchronous_ = synchronous;
1146             observer_ = ObserverHelper.get(proxy_, operation, null);
1147             bool compress; // Not used for proxy flush batch requests.
1148             _batchRequestNum = proxy_.iceGetBatchRequestQueue().swap(os_, out compress);
1149             invokeImpl(true); // userThread = true
1150         }
1151 
1152         private int _batchRequestNum;
1153     }
1154 
1155     //
1156     // Class for handling the proxy's begin_ice_getConnection request.
1157     //
1158     class ProxyGetConnection : ProxyOutgoingAsyncBase
1159     {
ProxyGetConnection(Ice.ObjectPrxHelperBase prx, OutgoingAsyncCompletionCallback completionCallback)1160         public ProxyGetConnection(Ice.ObjectPrxHelperBase prx, OutgoingAsyncCompletionCallback completionCallback) :
1161             base(prx, completionCallback)
1162         {
1163         }
1164 
invokeRemote(Ice.ConnectionI connection, bool compress, bool response)1165         public override int invokeRemote(Ice.ConnectionI connection, bool compress, bool response)
1166         {
1167             cachedConnection_ = connection;
1168             if(responseImpl(false, true, true))
1169             {
1170                 invokeResponseAsync();
1171             }
1172             return AsyncStatusSent;
1173         }
1174 
invokeCollocated(CollocatedRequestHandler handler)1175         public override int invokeCollocated(CollocatedRequestHandler handler)
1176         {
1177             if(responseImpl(false, true, true))
1178             {
1179                 invokeResponseAsync();
1180             }
1181             return AsyncStatusSent;
1182         }
1183 
getConnection()1184         public Ice.Connection getConnection()
1185         {
1186             return cachedConnection_;
1187         }
1188 
invoke(string operation, bool synchronous)1189         public void invoke(string operation, bool synchronous)
1190         {
1191             synchronous_ = synchronous;
1192             observer_ = ObserverHelper.get(proxy_, operation, null);
1193             invokeImpl(true); // userThread = true
1194         }
1195     }
1196 
1197     class ConnectionFlushBatchAsync : OutgoingAsyncBase
1198     {
ConnectionFlushBatchAsync(Ice.ConnectionI connection, Instance instance, OutgoingAsyncCompletionCallback completionCallback)1199         public ConnectionFlushBatchAsync(Ice.ConnectionI connection,
1200                                          Instance instance,
1201                                          OutgoingAsyncCompletionCallback completionCallback) :
1202             base(instance, completionCallback)
1203         {
1204             _connection = connection;
1205         }
1206 
invoke(string operation, Ice.CompressBatch compressBatch, bool synchronous)1207         public void invoke(string operation, Ice.CompressBatch compressBatch, bool synchronous)
1208         {
1209             synchronous_ = synchronous;
1210             observer_ = ObserverHelper.get(instance_, operation);
1211             try
1212             {
1213                 int status;
1214                 bool compress;
1215                 int batchRequestNum = _connection.getBatchRequestQueue().swap(os_, out compress);
1216                 if(batchRequestNum == 0)
1217                 {
1218                     status = AsyncStatusSent;
1219                     if(sent())
1220                     {
1221                         status = status | AsyncStatusInvokeSentCallback;
1222                     }
1223                 }
1224                 else
1225                 {
1226                     bool comp;
1227                     if(compressBatch == Ice.CompressBatch.Yes)
1228                     {
1229                         comp = true;
1230                     }
1231                     else if(compressBatch == Ice.CompressBatch.No)
1232                     {
1233                         comp = false;
1234                     }
1235                     else
1236                     {
1237                         comp = compress;
1238                     }
1239                     status = _connection.sendAsyncRequest(this, comp, false, batchRequestNum);
1240                 }
1241 
1242                 if((status & AsyncStatusSent) != 0)
1243                 {
1244                     sentSynchronously_ = true;
1245                     if((status & AsyncStatusInvokeSentCallback) != 0)
1246                     {
1247                         invokeSent();
1248                     }
1249                 }
1250             }
1251             catch(RetryException ex)
1252             {
1253                 try
1254                 {
1255                     throw ex.get();
1256                 }
1257                 catch(Ice.LocalException ee)
1258                 {
1259                     if(exception(ee))
1260                     {
1261                         invokeExceptionAsync();
1262                     }
1263                 }
1264             }
1265             catch(Ice.Exception ex)
1266             {
1267                 if(exception(ex))
1268                 {
1269                     invokeExceptionAsync();
1270                 }
1271             }
1272         }
1273 
1274         private readonly Ice.ConnectionI _connection;
1275     };
1276 
1277     public class CommunicatorFlushBatchAsync : OutgoingAsyncBase
1278     {
1279         class FlushBatch : OutgoingAsyncBase
1280         {
FlushBatch(CommunicatorFlushBatchAsync outAsync, Instance instance, Ice.Instrumentation.InvocationObserver observer)1281             public FlushBatch(CommunicatorFlushBatchAsync outAsync,
1282                               Instance instance,
1283                               Ice.Instrumentation.InvocationObserver observer) : base(instance, null)
1284             {
1285                 _outAsync = outAsync;
1286                 _observer = observer;
1287             }
1288 
1289             public override bool
sent()1290             sent()
1291             {
1292                 if(childObserver_ != null)
1293                 {
1294                     childObserver_.detach();
1295                     childObserver_ = null;
1296                 }
1297                 _outAsync.check(false);
1298                 return false;
1299             }
1300 
1301             public override bool
exception(Ice.Exception ex)1302             exception(Ice.Exception ex)
1303             {
1304                 if(childObserver_ != null)
1305                 {
1306                     childObserver_.failed(ex.ice_id());
1307                     childObserver_.detach();
1308                     childObserver_ = null;
1309                 }
1310                 _outAsync.check(false);
1311                 return false;
1312             }
1313 
1314             protected override Ice.Instrumentation.InvocationObserver
getObserver()1315             getObserver()
1316             {
1317                 return _observer;
1318             }
1319 
1320             private CommunicatorFlushBatchAsync _outAsync;
1321             private Ice.Instrumentation.InvocationObserver _observer;
1322         };
1323 
CommunicatorFlushBatchAsync(Instance instance, OutgoingAsyncCompletionCallback callback)1324         public CommunicatorFlushBatchAsync(Instance instance, OutgoingAsyncCompletionCallback callback) :
1325             base(instance, callback)
1326         {
1327             //
1328             // _useCount is initialized to 1 to prevent premature callbacks.
1329             // The caller must invoke ready() after all flush requests have
1330             // been initiated.
1331             //
1332             _useCount = 1;
1333         }
1334 
flushConnection(Ice.ConnectionI con, Ice.CompressBatch compressBatch)1335         public void flushConnection(Ice.ConnectionI con, Ice.CompressBatch compressBatch)
1336         {
1337             lock(this)
1338             {
1339                 ++_useCount;
1340             }
1341 
1342             try
1343             {
1344                 var flushBatch = new FlushBatch(this, instance_, observer_);
1345                 bool compress;
1346                 int batchRequestNum = con.getBatchRequestQueue().swap(flushBatch.getOs(), out compress);
1347                 if(batchRequestNum == 0)
1348                 {
1349                     flushBatch.sent();
1350                 }
1351                 else
1352                 {
1353                     bool comp;
1354                     if(compressBatch == Ice.CompressBatch.Yes)
1355                     {
1356                         comp = true;
1357                     }
1358                     else if(compressBatch == Ice.CompressBatch.No)
1359                     {
1360                         comp = false;
1361                     }
1362                     else
1363                     {
1364                         comp = compress;
1365                     }
1366                     con.sendAsyncRequest(flushBatch, comp, false, batchRequestNum);
1367                 }
1368             }
1369             catch(Ice.LocalException)
1370             {
1371                 check(false);
1372                 throw;
1373             }
1374         }
1375 
invoke(string operation, Ice.CompressBatch compressBatch, bool synchronous)1376         public void invoke(string operation, Ice.CompressBatch compressBatch, bool synchronous)
1377         {
1378             synchronous_ = synchronous;
1379             observer_ = ObserverHelper.get(instance_, operation);
1380             instance_.outgoingConnectionFactory().flushAsyncBatchRequests(compressBatch, this);
1381             instance_.objectAdapterFactory().flushAsyncBatchRequests(compressBatch, this);
1382             check(true);
1383         }
1384 
check(bool userThread)1385         public void check(bool userThread)
1386         {
1387             lock(this)
1388             {
1389                 Debug.Assert(_useCount > 0);
1390                 if(--_useCount > 0)
1391                 {
1392                     return;
1393                 }
1394             }
1395 
1396             if(sentImpl(true))
1397             {
1398                 if(userThread)
1399                 {
1400                     sentSynchronously_ = true;
1401                     invokeSent();
1402                 }
1403                 else
1404                 {
1405                     invokeSentAsync();
1406                 }
1407             }
1408         }
1409 
1410         private int _useCount;
1411     };
1412 
1413     public abstract class TaskCompletionCallback<T> : TaskCompletionSource<T>, OutgoingAsyncCompletionCallback
1414     {
TaskCompletionCallback(System.IProgress<bool> progress, CancellationToken cancellationToken)1415         public TaskCompletionCallback(System.IProgress<bool> progress, CancellationToken cancellationToken)
1416         {
1417             progress_ = progress;
1418             _cancellationToken = cancellationToken;
1419         }
1420 
init(OutgoingAsyncBase outgoing)1421         public void init(OutgoingAsyncBase outgoing)
1422         {
1423             if(_cancellationToken.CanBeCanceled)
1424             {
1425                 _cancellationToken.Register(outgoing.cancel);
1426             }
1427         }
1428 
handleSent(bool done, bool alreadySent, OutgoingAsyncBase og)1429         public bool handleSent(bool done, bool alreadySent, OutgoingAsyncBase og)
1430         {
1431             if(done && og.isSynchronous())
1432             {
1433                 Debug.Assert(progress_ == null);
1434                 handleInvokeSent(false, done, alreadySent, og);
1435                 return false;
1436             }
1437             return done || progress_ != null && !alreadySent; // Invoke the sent callback only if not already invoked.
1438         }
1439 
handleException(Ice.Exception ex, OutgoingAsyncBase og)1440         public bool handleException(Ice.Exception ex, OutgoingAsyncBase og)
1441         {
1442             //
1443             // If this is a synchronous call, we can notify the task from this thread to avoid
1444             // the thread context switch. We know there aren't any continuations setup with the
1445             // task.
1446             //
1447             if(og.isSynchronous())
1448             {
1449                 handleInvokeException(ex, og);
1450                 return false;
1451             }
1452             else
1453             {
1454                 return true;
1455             }
1456         }
1457 
handleResponse(bool userThread, bool ok, OutgoingAsyncBase og)1458         public bool handleResponse(bool userThread, bool ok, OutgoingAsyncBase og)
1459         {
1460             //
1461             // If called from the user thread (only the case for batch requests) or if this
1462             // is a synchronous call, we can notify the task from this thread to avoid the
1463             // thread context switch. We know there aren't any continuations setup with the
1464             // task.
1465             //
1466             if(userThread || og.isSynchronous())
1467             {
1468                 handleInvokeResponse(ok, og);
1469                 return false;
1470             }
1471             else
1472             {
1473                 return true;
1474             }
1475         }
1476 
handleInvokeSent(bool sentSynchronously, bool done, bool alreadySent, OutgoingAsyncBase og)1477         public virtual void handleInvokeSent(bool sentSynchronously, bool done, bool alreadySent, OutgoingAsyncBase og)
1478         {
1479             if(progress_ != null && !alreadySent)
1480             {
1481                progress_.Report(sentSynchronously);
1482             }
1483             if(done)
1484             {
1485                 SetResult(default(T));
1486             }
1487         }
1488 
handleInvokeException(Ice.Exception ex, OutgoingAsyncBase og)1489         public void handleInvokeException(Ice.Exception ex, OutgoingAsyncBase og)
1490         {
1491             SetException(ex);
1492         }
1493 
handleInvokeResponse(bool ok, OutgoingAsyncBase og)1494         abstract public void handleInvokeResponse(bool ok, OutgoingAsyncBase og);
1495 
1496         private readonly CancellationToken _cancellationToken;
1497 
1498         protected readonly System.IProgress<bool> progress_;
1499     }
1500 
1501     public class OperationTaskCompletionCallback<T> : TaskCompletionCallback<T>
1502     {
OperationTaskCompletionCallback(System.IProgress<bool> progress, CancellationToken cancellationToken)1503         public OperationTaskCompletionCallback(System.IProgress<bool> progress, CancellationToken cancellationToken) :
1504             base(progress, cancellationToken)
1505         {
1506         }
1507 
handleInvokeResponse(bool ok, OutgoingAsyncBase og)1508         public override void handleInvokeResponse(bool ok, OutgoingAsyncBase og)
1509         {
1510             SetResult(((OutgoingAsyncT<T>)og).getResult(ok));
1511         }
1512     }
1513 
1514     public class FlushBatchTaskCompletionCallback : TaskCompletionCallback<object>
1515     {
FlushBatchTaskCompletionCallback(System.IProgress<bool> progress = null, CancellationToken cancellationToken = new CancellationToken())1516         public FlushBatchTaskCompletionCallback(System.IProgress<bool> progress = null,
1517                                                 CancellationToken cancellationToken = new CancellationToken()) :
1518             base(progress, cancellationToken)
1519         {
1520         }
1521 
handleInvokeResponse(bool ok, OutgoingAsyncBase og)1522         public override void handleInvokeResponse(bool ok, OutgoingAsyncBase og)
1523         {
1524             SetResult(null);
1525         }
1526     }
1527 
1528     abstract public class AsyncResultCompletionCallback : AsyncResultI, OutgoingAsyncCompletionCallback
1529     {
AsyncResultCompletionCallback(Ice.Communicator com, Instance instance, string op, object cookie, Ice.AsyncCallback cb)1530         public AsyncResultCompletionCallback(Ice.Communicator com, Instance instance, string op, object cookie,
1531                                              Ice.AsyncCallback cb) :
1532             base(com, instance, op, cookie, cb)
1533         {
1534         }
1535 
init(OutgoingAsyncBase outgoing)1536         public void init(OutgoingAsyncBase outgoing)
1537         {
1538             outgoing_ = outgoing;
1539         }
1540 
handleSent(bool done, bool alreadySent, OutgoingAsyncBase og)1541         public bool handleSent(bool done, bool alreadySent, OutgoingAsyncBase og)
1542         {
1543             lock(this)
1544             {
1545                 state_ |= StateSent;
1546                 if(done)
1547                 {
1548                     state_ |= StateDone | StateOK;
1549                 }
1550                 if(waitHandle_ != null)
1551                 {
1552                     waitHandle_.Set();
1553                 }
1554                 Monitor.PulseAll(this);
1555 
1556                 //
1557                 // Invoke the sent callback only if not already invoked.
1558                 //
1559                 return !alreadySent && sentCallback_ != null;
1560             }
1561         }
1562 
handleException(Ice.Exception ex, OutgoingAsyncBase og)1563         public bool handleException(Ice.Exception ex, OutgoingAsyncBase og)
1564         {
1565             lock(this)
1566             {
1567                 state_ |= StateDone;
1568                 exception_ = ex;
1569                 if(waitHandle_ != null)
1570                 {
1571                     waitHandle_.Set();
1572                 }
1573                 Monitor.PulseAll(this);
1574                 return completedCallback_ != null;
1575             }
1576         }
1577 
handleResponse(bool userThread, bool ok, OutgoingAsyncBase og)1578         public bool handleResponse(bool userThread, bool ok, OutgoingAsyncBase og)
1579         {
1580             lock(this)
1581             {
1582                 state_ |= StateDone;
1583                 if(ok)
1584                 {
1585                     state_ |= StateOK;
1586                 }
1587                 if(waitHandle_ != null)
1588                 {
1589                     waitHandle_.Set();
1590                 }
1591                 Monitor.PulseAll(this);
1592                 return completedCallback_ != null;
1593             }
1594         }
1595 
handleInvokeSent(bool sentSynchronously, bool done, bool alreadySent, OutgoingAsyncBase og)1596         public void handleInvokeSent(bool sentSynchronously, bool done, bool alreadySent, OutgoingAsyncBase og)
1597         {
1598             sentCallback_(this);
1599         }
1600 
handleInvokeException(Ice.Exception ex, OutgoingAsyncBase og)1601         public void handleInvokeException(Ice.Exception ex, OutgoingAsyncBase og)
1602         {
1603             try
1604             {
1605                 completedCallback_(this);
1606             }
1607             catch(Ice.Exception e)
1608             {
1609                 throw new System.AggregateException(e);
1610             }
1611         }
1612 
handleInvokeResponse(bool ok, OutgoingAsyncBase og)1613         public void handleInvokeResponse(bool ok, OutgoingAsyncBase og)
1614         {
1615             try
1616             {
1617                 completedCallback_(this);
1618             }
1619             catch(Ice.Exception e)
1620             {
1621                 throw new System.AggregateException(e);
1622             }
1623         }
1624     }
1625 
1626     abstract public class ProxyAsyncResultCompletionCallback<T> : AsyncResultCompletionCallback, Ice.AsyncResult<T>
1627     {
ProxyAsyncResultCompletionCallback(Ice.ObjectPrxHelperBase proxy, string operation, object cookie, Ice.AsyncCallback cb)1628         public ProxyAsyncResultCompletionCallback(Ice.ObjectPrxHelperBase proxy, string operation, object cookie,
1629                                                   Ice.AsyncCallback cb) :
1630             base(proxy.ice_getCommunicator(), proxy.iceReference().getInstance(), operation, cookie, cb)
1631         {
1632             _proxy = proxy;
1633         }
1634 
getProxy()1635         public override Ice.ObjectPrx getProxy()
1636         {
1637             return _proxy;
1638         }
1639 
whenCompleted(Ice.ExceptionCallback excb)1640         new public Ice.AsyncResult<T> whenCompleted(Ice.ExceptionCallback excb)
1641         {
1642             base.whenCompleted(excb);
1643             return this;
1644         }
1645 
whenCompleted(T cb, Ice.ExceptionCallback excb)1646         virtual public Ice.AsyncResult<T> whenCompleted(T cb, Ice.ExceptionCallback excb)
1647         {
1648             if(cb == null && excb == null)
1649             {
1650                 throw new System.ArgumentException("callback is null");
1651             }
1652             lock(this)
1653             {
1654                 if(responseCallback_ != null || exceptionCallback_ != null)
1655                 {
1656                     throw new System.ArgumentException("callback already set");
1657                 }
1658                 responseCallback_ = cb;
1659                 exceptionCallback_ = excb;
1660             }
1661             setCompletedCallback(getCompletedCallback());
1662             return this;
1663         }
1664 
whenSent(Ice.SentCallback cb)1665         new public Ice.AsyncResult<T> whenSent(Ice.SentCallback cb)
1666         {
1667             base.whenSent(cb);
1668             return this;
1669         }
1670 
1671         protected T responseCallback_;
1672         private Ice.ObjectPrx _proxy;
1673     }
1674 
1675     public class OperationAsyncResultCompletionCallback<T, R> : ProxyAsyncResultCompletionCallback<T>
1676     {
OperationAsyncResultCompletionCallback(System.Action<T, R> completed, Ice.ObjectPrxHelperBase proxy, string operation, object cookie, Ice.AsyncCallback callback)1677         public OperationAsyncResultCompletionCallback(System.Action<T, R> completed,
1678                                                       Ice.ObjectPrxHelperBase proxy,
1679                                                       string operation,
1680                                                       object cookie,
1681                                                       Ice.AsyncCallback callback) :
1682             base(proxy, operation, cookie, callback)
1683         {
1684             _completed = completed;
1685         }
1686 
getCompletedCallback()1687         override protected Ice.AsyncCallback getCompletedCallback()
1688         {
1689             return (Ice.AsyncResult r) =>
1690             {
1691                 Debug.Assert(r == this);
1692                 try
1693                 {
1694                     R result = ((OutgoingAsyncT<R>)outgoing_).getResult(wait());
1695                     try
1696                     {
1697                         _completed(responseCallback_, result);
1698                     }
1699                     catch(Ice.Exception ex)
1700                     {
1701                         throw new System.AggregateException(ex);
1702                     }
1703                 }
1704                 catch(Ice.Exception ex)
1705                 {
1706                     if(exceptionCallback_ != null)
1707                     {
1708                         exceptionCallback_.Invoke(ex);
1709                     }
1710                 }
1711             };
1712         }
1713 
1714         private System.Action<T, R> _completed;
1715     }
1716 }
1717