1 //-----------------------------------------------------------------------------
2 // Copyright (c) Microsoft Corporation.  All rights reserved.
3 //-----------------------------------------------------------------------------
4 
5 namespace System.ServiceModel.Channels
6 {
7     using System;
8     using System.Collections.Generic;
9     using System.Diagnostics;
10     using System.Runtime;
11     using System.ServiceModel;
12     using System.ServiceModel.Diagnostics;
13     using System.ServiceModel.Dispatcher;
14     using System.Threading;
15     using System.Xml;
16     using System.ServiceModel.Diagnostics.Application;
17     using System.Diagnostics.CodeAnalysis;
18 
19     class ChannelDemuxer
20     {
21         public readonly static TimeSpan UseDefaultReceiveTimeout = TimeSpan.MinValue;
22 
23         TypedChannelDemuxer inputDemuxer;
24         TypedChannelDemuxer replyDemuxer;
25         Dictionary<Type, TypedChannelDemuxer> typeDemuxers;
26         TimeSpan peekTimeout;
27         int maxPendingSessions;
28 
ChannelDemuxer()29         public ChannelDemuxer()
30         {
31             this.peekTimeout = ChannelDemuxer.UseDefaultReceiveTimeout; //use the default receive timeout (original behavior)
32             this.maxPendingSessions = 10;
33             this.typeDemuxers = new Dictionary<Type, TypedChannelDemuxer>();
34         }
35 
36         public TimeSpan PeekTimeout
37         {
38             get
39             {
40                 return this.peekTimeout;
41             }
42             set
43             {
44                 this.peekTimeout = value;
45             }
46         }
47 
48         public int MaxPendingSessions
49         {
50             get
51             {
52                 return this.maxPendingSessions;
53             }
54             set
55             {
56                 this.maxPendingSessions = value;
57             }
58         }
59 
60         public IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
61             where TChannel : class, IChannel
62         {
63             return this.BuildChannelListener<TChannel>(context, new ChannelDemuxerFilter(new MatchAllMessageFilter(), 0));
64         }
65 
66         public IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context, ChannelDemuxerFilter filter)
67             where TChannel : class, IChannel
68         {
69             return GetTypedDemuxer(typeof(TChannel), context).BuildChannelListener<TChannel>(filter);
70         }
71 
CreateTypedDemuxer(Type channelType, BindingContext context)72         TypedChannelDemuxer CreateTypedDemuxer(Type channelType, BindingContext context)
73         {
74             if (channelType == typeof(IDuplexChannel))
75                 return (TypedChannelDemuxer)(object)new DuplexChannelDemuxer(context);
76             if (channelType == typeof(IInputSessionChannel))
77                 return (TypedChannelDemuxer)(object)new InputSessionChannelDemuxer(context, this.peekTimeout, this.maxPendingSessions);
78             if (channelType == typeof(IReplySessionChannel))
79                 return (TypedChannelDemuxer)(object)new ReplySessionChannelDemuxer(context, this.peekTimeout, this.maxPendingSessions);
80             if (channelType == typeof(IDuplexSessionChannel))
81                 return (TypedChannelDemuxer)(object)new DuplexSessionChannelDemuxer(context, this.peekTimeout, this.maxPendingSessions);
82             throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException());
83         }
84 
85         [SuppressMessage(FxCop.Category.Usage, "CA2301:EmbeddableTypesInContainersRule", MessageId = "typeDemuxers", Justification = "No need to support type equivalence here.")]
GetTypedDemuxer(Type channelType, BindingContext context)86         TypedChannelDemuxer GetTypedDemuxer(Type channelType, BindingContext context)
87         {
88             TypedChannelDemuxer typeDemuxer = null;
89             bool createdDemuxer = false;
90 
91             if (channelType == typeof(IInputChannel))
92             {
93                 if (this.inputDemuxer == null)
94                 {
95                     if (context.CanBuildInnerChannelListener<IReplyChannel>())
96                         this.inputDemuxer = this.replyDemuxer = new ReplyChannelDemuxer(context);
97                     else
98                         this.inputDemuxer = new InputChannelDemuxer(context);
99                     createdDemuxer = true;
100                 }
101                 typeDemuxer = this.inputDemuxer;
102             }
103             else if (channelType == typeof(IReplyChannel))
104             {
105                 if (this.replyDemuxer == null)
106                 {
107                     this.inputDemuxer = this.replyDemuxer = new ReplyChannelDemuxer(context);
108                     createdDemuxer = true;
109                 }
110                 typeDemuxer = this.replyDemuxer;
111             }
112             else if (!this.typeDemuxers.TryGetValue(channelType, out typeDemuxer))
113             {
114                 typeDemuxer = this.CreateTypedDemuxer(channelType, context);
115                 this.typeDemuxers.Add(channelType, typeDemuxer);
116                 createdDemuxer = true;
117             }
118 
119             if (!createdDemuxer)
120             {
121                 context.RemainingBindingElements.Clear();
122             }
123 
124             return (TypedChannelDemuxer)typeDemuxer;
125         }
126     }
127 
128     abstract class TypedChannelDemuxer
129     {
AbortMessage(RequestContext request)130         internal static void AbortMessage(RequestContext request)
131         {
132             // RequestContext.RequestMessage can throw an AddressMismatch exception.
133             try
134             {
135                 AbortMessage(request.RequestMessage);
136             }
137             catch (Exception e)
138             {
139                 if (Fx.IsFatal(e))
140                 {
141                     throw;
142                 }
143 
144                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
145             }
146         }
147 
AbortMessage(Message message)148         internal static void AbortMessage(Message message)
149         {
150             try
151             {
152                 message.Close();
153             }
154             catch (CommunicationException e)
155             {
156                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
157             }
158             catch (TimeoutException e)
159             {
160                 if (TD.CloseTimeoutIsEnabled())
161                 {
162                     TD.CloseTimeout(e.Message);
163                 }
164 
165                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
166             }
167         }
168 
169         public abstract IChannelListener<TChannel> BuildChannelListener<TChannel>(ChannelDemuxerFilter filter)
170             where TChannel : class, IChannel;
171     }
172 
173     //
174     // Datagram demuxers
175     //
176 
177     interface IChannelDemuxer
178     {
OnOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout)179         void OnOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout);
OnBeginOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout, AsyncCallback callback, object state)180         IAsyncResult OnBeginOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout, AsyncCallback callback, object state);
OnEndOuterListenerOpen(IAsyncResult result)181         void OnEndOuterListenerOpen(IAsyncResult result);
OnOuterListenerAbort(ChannelDemuxerFilter filter)182         void OnOuterListenerAbort(ChannelDemuxerFilter filter);
OnOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout)183         void OnOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout);
OnBeginOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout, AsyncCallback callback, object state)184         IAsyncResult OnBeginOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout, AsyncCallback callback, object state);
OnEndOuterListenerClose(IAsyncResult result)185         void OnEndOuterListenerClose(IAsyncResult result);
186     }
187 
188     abstract class DatagramChannelDemuxer<TInnerChannel, TInnerItem> : TypedChannelDemuxer, IChannelDemuxer
189         where TInnerChannel : class, IChannel
190         where TInnerItem : class, IDisposable
191     {
192         MessageFilterTable<IChannelListener> filterTable;
193         TInnerChannel innerChannel;
194         IChannelListener<TInnerChannel> innerListener;
195         static AsyncCallback onReceiveComplete = Fx.ThunkCallback(new AsyncCallback(OnReceiveCompleteStatic));
196         static Action<object> startReceivingStatic = new Action<object>(StartReceivingStatic);
197         Action onItemDequeued;
198         int openCount;
199         IChannelDemuxFailureHandler demuxFailureHandler;
200         // since the OnOuterListenerOpen method will be called for every outer listener and we will open
201         // the inner listener only once, we need to ensure that all the outer listeners wait till the
202         // inner listener is opened.
203         ThreadNeutralSemaphore openSemaphore;
204         Exception pendingInnerListenerOpenException;
205         bool abortOngoingOpen;
206 
DatagramChannelDemuxer(BindingContext context)207         public DatagramChannelDemuxer(BindingContext context)
208         {
209             this.filterTable = new MessageFilterTable<IChannelListener>();
210             this.innerListener = context.BuildInnerChannelListener<TInnerChannel>();
211             if (context.BindingParameters != null)
212             {
213                 this.demuxFailureHandler = context.BindingParameters.Find<IChannelDemuxFailureHandler>();
214             }
215             this.openSemaphore = new ThreadNeutralSemaphore(1);
216         }
217 
218         protected TInnerChannel InnerChannel
219         {
220             get { return this.innerChannel; }
221         }
222 
223         protected IChannelListener<TInnerChannel> InnerListener
224         {
225             get { return this.innerListener; }
226         }
227 
228         protected object ThisLock
229         {
230             get { return this; }
231         }
232 
233         protected IChannelDemuxFailureHandler DemuxFailureHandler
234         {
235             get { return this.demuxFailureHandler; }
236         }
237 
AbortItem(TInnerItem item)238         protected abstract void AbortItem(TInnerItem item);
BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)239         protected abstract IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state);
240         protected abstract LayeredChannelListener<TChannel> CreateListener<TChannel>(ChannelDemuxerFilter filter) where TChannel : class, IChannel;
Dispatch(IChannelListener listener)241         protected abstract void Dispatch(IChannelListener listener);
EndpointNotFound(TInnerItem item)242         protected abstract void EndpointNotFound(TInnerItem item);
EndReceive(IAsyncResult result)243         protected abstract TInnerItem EndReceive(IAsyncResult result);
EnqueueAndDispatch(IChannelListener listener, TInnerItem item, Action dequeuedCallback, bool canDispatchOnThisThread)244         protected abstract void EnqueueAndDispatch(IChannelListener listener, TInnerItem item, Action dequeuedCallback, bool canDispatchOnThisThread);
EnqueueAndDispatch(IChannelListener listener, Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread)245         protected abstract void EnqueueAndDispatch(IChannelListener listener, Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread);
GetMessage(TInnerItem item)246         protected abstract Message GetMessage(TInnerItem item);
247 
BuildChannelListener(ChannelDemuxerFilter filter)248         public override IChannelListener<TChannel> BuildChannelListener<TChannel>(ChannelDemuxerFilter filter)
249         {
250             LayeredChannelListener<TChannel> listener = this.CreateListener<TChannel>(filter);
251             listener.InnerChannelListener = this.innerListener;
252             return listener;
253         }
254 
255         // return false if BeginReceive should be called again
HandleReceiveResult(IAsyncResult result)256         bool HandleReceiveResult(IAsyncResult result)
257         {
258             TInnerItem item;
259             try
260             {
261                 item = this.EndReceive(result);
262             }
263             catch (CommunicationObjectFaultedException e)
264             {
265                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
266                 return true;
267             }
268             catch (CommunicationObjectAbortedException e)
269             {
270                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
271                 return true;
272             }
273             catch (ObjectDisposedException e)
274             {
275                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
276                 return true;
277             }
278             catch (CommunicationException e)
279             {
280                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
281                 return false;
282             }
283             catch (TimeoutException e)
284             {
285                 if (TD.ReceiveTimeoutIsEnabled())
286                 {
287                     TD.ReceiveTimeout(e.Message);
288                 }
289 
290                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
291                 return false;
292             }
293             catch (Exception e)
294             {
295                 if (Fx.IsFatal(e)) throw;
296                 this.HandleUnknownException(e);
297                 return true;
298             }
299 
300             if (item == null)
301             {
302                 if (this.innerChannel.State == CommunicationState.Opened)
303                 {
304                     if (DiagnosticUtility.ShouldTraceError)
305                     {
306                         TraceUtility.TraceEvent(TraceEventType.Error, TraceCode.PrematureDatagramEof, SR.GetString(SR.TraceCodePrematureDatagramEof),
307                             null, this.innerChannel, null);
308                     }
309                 }
310 
311                 return true;
312             }
313 
314             try
315             {
316                 return this.ProcessItem(item);
317             }
318             catch (CommunicationException e)
319             {
320                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
321                 return false;
322             }
323             catch (TimeoutException e)
324             {
325                 if (TD.ReceiveTimeoutIsEnabled())
326                 {
327                     TD.ReceiveTimeout(e.Message);
328                 }
329 
330                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
331                 return false;
332             }
333             catch (Exception e)
334             {
335                 if (Fx.IsFatal(e)) throw;
336                 this.HandleUnknownException(e);
337                 return true;
338             }
339         }
340 
MatchListener(Message message)341         IChannelListener MatchListener(Message message)
342         {
343             IChannelListener matchingListener = null;
344             lock (this.ThisLock)
345             {
346                 if (this.filterTable.GetMatchingValue(message, out matchingListener))
347                 {
348                     return matchingListener;
349                 }
350             }
351             return null;
352         }
353 
OnItemDequeued()354         void OnItemDequeued()
355         {
356             this.StartReceiving();
357         }
358 
StartReceivingStatic(object state)359         static void StartReceivingStatic(object state)
360         {
361             ((DatagramChannelDemuxer<TInnerChannel, TInnerItem>)state).StartReceiving();
362         }
363 
HandleUnknownException(Exception exception)364         protected void HandleUnknownException(Exception exception)
365         {
366             DiagnosticUtility.TraceHandledException(exception, TraceEventType.Error);
367 
368             IChannelListener listener = null;
369             lock (this.ThisLock)
370             {
371                 if (this.filterTable.Count > 0)
372                 {
373                     KeyValuePair<MessageFilter, IChannelListener>[] pairs = new KeyValuePair<MessageFilter, IChannelListener>[this.filterTable.Count];
374                     this.filterTable.CopyTo(pairs, 0);
375                     listener = pairs[0].Value;
376 
377                     if (this.onItemDequeued == null)
378                     {
379                         this.onItemDequeued = new Action(this.OnItemDequeued);
380                     }
381                     this.EnqueueAndDispatch(listener, exception, this.onItemDequeued, false);
382                 }
383             }
384         }
385 
AbortState()386         void AbortState()
387         {
388             if (this.innerChannel != null)
389             {
390                 this.innerChannel.Abort();
391             }
392             this.innerListener.Abort();
393         }
394 
OnOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout)395         public void OnOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout)
396         {
397             bool closeInnerChannelAndListener = false;
398 
399             TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
400             lock (this.ThisLock)
401             {
402                 if (this.filterTable.ContainsKey(filter.Filter))
403                 {
404                     this.filterTable.Remove(filter.Filter);
405                     if (--this.openCount == 0)
406                     {
407                         closeInnerChannelAndListener = true;
408                     }
409                 }
410             }
411             if (closeInnerChannelAndListener)
412             {
413                 bool closeSucceeded = false;
414                 try
415                 {
416                     if (this.innerChannel != null)
417                     {
418                         this.innerChannel.Close(timeoutHelper.RemainingTime());
419                     }
420                     this.innerListener.Close(timeoutHelper.RemainingTime());
421                     closeSucceeded = true;
422                 }
423                 finally
424                 {
425                     // we should abort the state since calling Abort on the channel demuxer will be a no-op
426                     // due to the reference count being 0
427                     if (!closeSucceeded)
428                     {
429                         AbortState();
430                     }
431                 }
432             }
433         }
434 
OnBeginOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout, AsyncCallback callback, object state)435         public IAsyncResult OnBeginOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout, AsyncCallback callback, object state)
436         {
437             bool closeInnerChannelAndListener = false;
438             lock (this.ThisLock)
439             {
440                 if (this.filterTable.ContainsKey(filter.Filter))
441                 {
442                     this.filterTable.Remove(filter.Filter);
443                     if (--this.openCount == 0)
444                     {
445                         closeInnerChannelAndListener = true;
446                     }
447                 }
448             }
449             if (!closeInnerChannelAndListener)
450             {
451                 return new CompletedAsyncResult(callback, state);
452             }
453             else
454             {
455                 return new CloseAsyncResult(this, timeout, callback, state);
456             }
457         }
458 
OnEndOuterListenerClose(IAsyncResult result)459         public void OnEndOuterListenerClose(IAsyncResult result)
460         {
461             if (result is CompletedAsyncResult)
462             {
463                 CompletedAsyncResult.End(result);
464             }
465             else
466             {
467                 CloseAsyncResult.End(result);
468             }
469         }
470 
OnOuterListenerAbort(ChannelDemuxerFilter filter)471         public void OnOuterListenerAbort(ChannelDemuxerFilter filter)
472         {
473             bool abortInnerChannelAndListener = false;
474             lock (this.ThisLock)
475             {
476                 if (this.filterTable.ContainsKey(filter.Filter))
477                 {
478                     this.filterTable.Remove(filter.Filter);
479                     if (--this.openCount == 0)
480                     {
481                         abortInnerChannelAndListener = true;
482                         this.abortOngoingOpen = true;
483                     }
484                 }
485             }
486             if (abortInnerChannelAndListener)
487             {
488                 AbortState();
489             }
490         }
491 
ThrowPendingOpenExceptionIfAny()492         void ThrowPendingOpenExceptionIfAny()
493         {
494             if (this.pendingInnerListenerOpenException != null)
495             {
496                 if (pendingInnerListenerOpenException is CommunicationObjectAbortedException)
497                 {
498                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperWarning(new CommunicationObjectAbortedException(SR.GetString(SR.PreviousChannelDemuxerOpenFailed, this.pendingInnerListenerOpenException.ToString())));
499                 }
500                 else if (pendingInnerListenerOpenException is CommunicationObjectFaultedException)
501                 {
502                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperWarning(new CommunicationObjectFaultedException(SR.GetString(SR.PreviousChannelDemuxerOpenFailed, this.pendingInnerListenerOpenException.ToString())));
503                 }
504                 else
505                 {
506                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperWarning(new CommunicationException(SR.GetString(SR.PreviousChannelDemuxerOpenFailed, this.pendingInnerListenerOpenException.ToString())));
507                 }
508             }
509         }
510 
ShouldOpenInnerListener(ChannelDemuxerFilter filter, IChannelListener listener)511         bool ShouldOpenInnerListener(ChannelDemuxerFilter filter, IChannelListener listener)
512         {
513             lock (this.ThisLock)
514             {
515                 // the listener's Abort may be racing with Open
516                 if (listener.State == CommunicationState.Closed || listener.State == CommunicationState.Closing)
517                 {
518                     return false;
519                 }
520                 this.filterTable.Add(filter.Filter, listener, filter.Priority);
521                 if (++this.openCount == 1)
522                 {
523                     this.abortOngoingOpen = false;
524                     return true;
525                 }
526             }
527             return false;
528         }
529 
OnOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout)530         public void OnOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout)
531         {
532             TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
533             this.openSemaphore.Enter(timeoutHelper.RemainingTime());
534             try
535             {
536                 bool openInnerListener = ShouldOpenInnerListener(filter, listener);
537                 if (openInnerListener)
538                 {
539                     try
540                     {
541                         this.innerListener.Open(timeoutHelper.RemainingTime());
542                         this.innerChannel = this.innerListener.AcceptChannel(timeoutHelper.RemainingTime());
543                         this.innerChannel.Open(timeoutHelper.RemainingTime());
544 
545                         lock (ThisLock)
546                         {
547                             if (this.abortOngoingOpen)
548                             {
549                                 this.AbortState();
550                                 return;
551                             }
552                         }
553 
554                         ActionItem.Schedule(startReceivingStatic, this);
555                     }
556 #pragma warning suppress 56500 // covered by FxCOP
557                     catch (Exception e)
558                     {
559                         this.pendingInnerListenerOpenException = e;
560                         throw;
561                     }
562                 }
563                 else
564                 {
565                     this.ThrowPendingOpenExceptionIfAny();
566                 }
567             }
568             finally
569             {
570                 this.openSemaphore.Exit();
571             }
572         }
573 
OnBeginOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout, AsyncCallback callback, object state)574         public IAsyncResult OnBeginOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout, AsyncCallback callback, object state)
575         {
576             return new OpenAsyncResult(this, filter, listener, timeout, callback, state);
577         }
578 
OnEndOuterListenerOpen(IAsyncResult result)579         public void OnEndOuterListenerOpen(IAsyncResult result)
580         {
581             OpenAsyncResult.End(result);
582         }
583 
OnReceiveComplete(IAsyncResult result)584         void OnReceiveComplete(IAsyncResult result)
585         {
586             if (!this.HandleReceiveResult(result))
587             {
588                 this.StartReceiving();
589             }
590         }
591 
OnReceiveCompleteStatic(IAsyncResult result)592         static void OnReceiveCompleteStatic(IAsyncResult result)
593         {
594             if (result.CompletedSynchronously)
595                 return;
596             ((DatagramChannelDemuxer<TInnerChannel, TInnerItem>)result.AsyncState).OnReceiveComplete(result);
597         }
598 
ProcessItem(TInnerItem item)599         bool ProcessItem(TInnerItem item)
600         {
601             try
602             {
603                 Message message = null;
604                 IChannelListener matchingListener = null;
605                 try
606                 {
607                     message = this.GetMessage(item);
608                     matchingListener = MatchListener(message);
609                 }
610                 // The message may be bad because of which running the listener filters may throw
611                 // In that case, continue receiving
612                 catch (CommunicationException e)
613                 {
614                     DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
615                     return false;
616                 }
617                 catch (MultipleFilterMatchesException e)
618                 {
619                     DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
620                     return false;
621                 }
622                 catch (XmlException e)
623                 {
624                     DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
625                     return false;
626                 }
627                 catch (Exception e)
628                 {
629                     if (Fx.IsFatal(e)) throw;
630                     this.HandleUnknownException(e);
631                     return true;
632                 }
633 
634                 if (matchingListener == null)
635                 {
636                     System.ServiceModel.Dispatcher.ErrorBehavior.ThrowAndCatch(
637                         new EndpointNotFoundException(SR.GetString(SR.UnableToDemuxChannel, message.Headers.Action)), message);
638                     // EndpointNotFound is responsible for closing the item
639                     this.EndpointNotFound(item);
640                     item = null;
641                     return false;
642                 }
643 
644                 if (this.onItemDequeued == null)
645                 {
646                     this.onItemDequeued = new Action(this.OnItemDequeued);
647                 }
648                 this.EnqueueAndDispatch(matchingListener, item, this.onItemDequeued, false);
649                 item = null;
650                 return true;
651             }
652             finally
653             {
654                 if (item != null)
655                 {
656                     this.AbortItem(item);
657                 }
658             }
659         }
660 
StartReceiving()661         void StartReceiving()
662         {
663             while (true)
664             {
665                 if (this.innerChannel.State != CommunicationState.Opened)
666                 {
667                     return;
668                 }
669 
670                 IAsyncResult result;
671 
672                 try
673                 {
674                     result = this.BeginReceive(TimeSpan.MaxValue, onReceiveComplete, this);
675                 }
676                 catch (CommunicationObjectFaultedException e)
677                 {
678                     DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
679                     return;
680                 }
681                 catch (CommunicationObjectAbortedException e)
682                 {
683                     DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
684                     return;
685                 }
686                 catch (ObjectDisposedException e)
687                 {
688                     DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
689                     return;
690                 }
691                 catch (CommunicationException e)
692                 {
693                     DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
694                     continue;
695                 }
696                 catch (TimeoutException e)
697                 {
698                     if (TD.ReceiveTimeoutIsEnabled())
699                     {
700                         TD.ReceiveTimeout(e.Message);
701                     }
702 
703                     DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
704                     continue;
705                 }
706                 catch (Exception e)
707                 {
708                     if (Fx.IsFatal(e)) throw;
709                     this.HandleUnknownException(e);
710                     return;
711                 }
712 
713                 if (!result.CompletedSynchronously)
714                 {
715                     return;
716                 }
717 
718                 if (this.HandleReceiveResult(result))
719                 {
720                     return;
721                 }
722             }
723         }
724 
725         class OpenAsyncResult : AsyncResult
726         {
727             static FastAsyncCallback waitOverCallback = new FastAsyncCallback(WaitOverCallback);
728             static AsyncCallback openListenerCallback = Fx.ThunkCallback(new AsyncCallback(OpenListenerCallback));
729             static AsyncCallback acceptChannelCallback = Fx.ThunkCallback(new AsyncCallback(AcceptChannelCallback));
730             static AsyncCallback openChannelCallback = Fx.ThunkCallback(new AsyncCallback(OpenChannelCallback));
731             DatagramChannelDemuxer<TInnerChannel, TInnerItem> channelDemuxer;
732             ChannelDemuxerFilter filter;
733             IChannelListener listener;
734             TimeoutHelper timeoutHelper;
735             bool openInnerListener;
736 
OpenAsyncResult(DatagramChannelDemuxer<TInnerChannel, TInnerItem> channelDemuxer, ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout, AsyncCallback callback, object state)737             public OpenAsyncResult(DatagramChannelDemuxer<TInnerChannel, TInnerItem> channelDemuxer, ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout, AsyncCallback callback, object state)
738                 : base(callback, state)
739             {
740                 this.channelDemuxer = channelDemuxer;
741                 this.filter = filter;
742                 this.listener = listener;
743                 this.timeoutHelper = new TimeoutHelper(timeout);
744                 if (!this.channelDemuxer.openSemaphore.EnterAsync(this.timeoutHelper.RemainingTime(), waitOverCallback, this))
745                 {
746                     return;
747                 }
748 
749                 bool onWaitOverSucceeded = false;
750                 bool completeSelf = false;
751                 try
752                 {
753                     completeSelf = this.OnWaitOver();
754                     onWaitOverSucceeded = true;
755                 }
756                 finally
757                 {
758                     if (!onWaitOverSucceeded)
759                     {
760                         Cleanup();
761                     }
762                 }
763                 if (completeSelf)
764                 {
765                     Cleanup();
766                     Complete(true);
767                 }
768             }
769 
WaitOverCallback(object state, Exception asyncException)770             static void WaitOverCallback(object state, Exception asyncException)
771             {
772                 OpenAsyncResult self = (OpenAsyncResult)state;
773                 Exception completionException = asyncException;
774                 bool completeSelf = false;
775 
776                 if (completionException != null)
777                 {
778                     completeSelf = true;
779                 }
780                 else
781                 {
782                     try
783                     {
784                         completeSelf = self.OnWaitOver();
785                     }
786 #pragma warning suppress 56500 // covered by FxCOP
787                     catch (Exception e)
788                     {
789                         if (Fx.IsFatal(e)) throw;
790                         completeSelf = true;
791                         completionException = e;
792                     }
793                 }
794 
795                 if (completeSelf)
796                 {
797                     self.Cleanup();
798                     self.Complete(false, completionException);
799                 }
800             }
801 
OnWaitOver()802             bool OnWaitOver()
803             {
804                 this.openInnerListener = this.channelDemuxer.ShouldOpenInnerListener(filter, listener);
805                 // the semaphore is obtained. Check if the inner listener needs to be opened. If not,
806                 // check if there is a pending exception obtained while opening the inner listener and throw
807                 // that
808                 if (!this.openInnerListener)
809                 {
810                     this.channelDemuxer.ThrowPendingOpenExceptionIfAny();
811                     return true;
812                 }
813                 else
814                 {
815                     return this.OnOpenInnerListener();
816                 }
817             }
818 
OnInnerListenerEndOpen(IAsyncResult result)819             bool OnInnerListenerEndOpen(IAsyncResult result)
820             {
821                 this.channelDemuxer.innerListener.EndOpen(result);
822                 result = this.channelDemuxer.innerListener.BeginAcceptChannel(this.timeoutHelper.RemainingTime(), acceptChannelCallback, this);
823 
824                 if (!result.CompletedSynchronously)
825                 {
826                     return false;
827                 }
828 
829                 return this.OnEndAcceptChannel(result);
830             }
831 
OnOpenInnerListener()832             bool OnOpenInnerListener()
833             {
834                 try
835                 {
836                     IAsyncResult result = this.channelDemuxer.innerListener.BeginOpen(timeoutHelper.RemainingTime(), openListenerCallback, this);
837                     if (!result.CompletedSynchronously)
838                     {
839                         return false;
840                     }
841                     this.OnInnerListenerEndOpen(result);
842                     return true;
843                 }
844 #pragma warning suppress 56500 // covered by FxCOP
845                 catch (Exception e)
846                 {
847                     this.channelDemuxer.pendingInnerListenerOpenException = e;
848                     throw;
849                 }
850             }
851 
OpenListenerCallback(IAsyncResult result)852             static void OpenListenerCallback(IAsyncResult result)
853             {
854                 if (result.CompletedSynchronously)
855                 {
856                     return;
857                 }
858                 OpenAsyncResult self = (OpenAsyncResult)result.AsyncState;
859                 Exception completionException = null;
860                 try
861                 {
862                     self.OnInnerListenerEndOpen(result);
863                 }
864 #pragma warning suppress 56500 // covered by FxCOP
865                 catch (Exception e)
866                 {
867                     if (Fx.IsFatal(e)) throw;
868                     completionException = e;
869                 }
870                 if (completionException != null)
871                 {
872                     self.channelDemuxer.pendingInnerListenerOpenException = completionException;
873                 }
874                 self.Cleanup();
875                 self.Complete(false, completionException);
876             }
877 
AcceptChannelCallback(IAsyncResult result)878             static void AcceptChannelCallback(IAsyncResult result)
879             {
880                 if (result.CompletedSynchronously)
881                 {
882                     return;
883                 }
884                 OpenAsyncResult self = (OpenAsyncResult)result.AsyncState;
885                 Exception completionException = null;
886                 bool completeSelf = false;
887                 try
888                 {
889                     completeSelf = self.OnEndAcceptChannel(result);
890                 }
891 #pragma warning suppress 56500 // covered by FxCOP
892                 catch (Exception e)
893                 {
894                     if (Fx.IsFatal(e)) throw;
895                     completionException = e;
896                     completeSelf = true;
897                 }
898                 if (completeSelf)
899                 {
900                     if (completionException != null)
901                     {
902                         self.channelDemuxer.pendingInnerListenerOpenException = completionException;
903                     }
904                     self.Cleanup();
905                     self.Complete(false, completionException);
906                 }
907             }
908 
OnEndAcceptChannel(IAsyncResult result)909             bool OnEndAcceptChannel(IAsyncResult result)
910             {
911                 this.channelDemuxer.innerChannel = this.channelDemuxer.innerListener.EndAcceptChannel(result);
912                 IAsyncResult openResult = this.channelDemuxer.innerChannel.BeginOpen(this.timeoutHelper.RemainingTime(), acceptChannelCallback, this);
913 
914                 if (!openResult.CompletedSynchronously)
915                 {
916                     return false;
917                 }
918 
919                 this.OnEndOpenChannel(openResult);
920                 return true;
921             }
922 
OpenChannelCallback(IAsyncResult result)923             static void OpenChannelCallback(IAsyncResult result)
924             {
925                 if (result.CompletedSynchronously)
926                 {
927                     return;
928                 }
929                 OpenAsyncResult self = (OpenAsyncResult)result.AsyncState;
930                 Exception completionException = null;
931                 try
932                 {
933                     self.OnEndOpenChannel(result);
934                 }
935 #pragma warning suppress 56500 // covered by FxCOP
936                 catch (Exception e)
937                 {
938                     if (Fx.IsFatal(e)) throw;
939                     completionException = e;
940                 }
941                 if (completionException != null)
942                 {
943                     self.channelDemuxer.pendingInnerListenerOpenException = completionException;
944                 }
945                 self.Cleanup();
946                 self.Complete(false, completionException);
947             }
948 
OnEndOpenChannel(IAsyncResult result)949             void OnEndOpenChannel(IAsyncResult result)
950             {
951                 this.channelDemuxer.innerChannel.EndOpen(result);
952 
953                 lock (this.channelDemuxer.ThisLock)
954                 {
955                     if (this.channelDemuxer.abortOngoingOpen)
956                     {
957                         this.channelDemuxer.AbortState();
958                         return;
959                     }
960                 }
961 
962                 ActionItem.Schedule(startReceivingStatic, this.channelDemuxer);
963             }
964 
Cleanup()965             void Cleanup()
966             {
967                 this.channelDemuxer.openSemaphore.Exit();
968             }
969 
End(IAsyncResult result)970             public static void End(IAsyncResult result)
971             {
972                 AsyncResult.End<OpenAsyncResult>(result);
973             }
974         }
975 
976         class CloseAsyncResult : AsyncResult
977         {
978             static AsyncCallback sharedCallback = Fx.ThunkCallback(new AsyncCallback(SharedCallback));
979             DatagramChannelDemuxer<TInnerChannel, TInnerItem> channelDemuxer;
980             TimeoutHelper timeoutHelper;
981             bool closedInnerChannel;
982 
CloseAsyncResult(DatagramChannelDemuxer<TInnerChannel, TInnerItem> channelDemuxer, TimeSpan timeout, AsyncCallback callback, object state)983             public CloseAsyncResult(DatagramChannelDemuxer<TInnerChannel, TInnerItem> channelDemuxer, TimeSpan timeout, AsyncCallback callback, object state)
984                 : base(callback, state)
985             {
986                 this.channelDemuxer = channelDemuxer;
987                 this.timeoutHelper = new TimeoutHelper(timeout);
988                 if (channelDemuxer.innerChannel != null)
989                 {
990                     bool closeSucceeded = false;
991                     try
992                     {
993                         IAsyncResult result = channelDemuxer.innerChannel.BeginClose(timeoutHelper.RemainingTime(), sharedCallback, this);
994                         if (!result.CompletedSynchronously)
995                         {
996                             closeSucceeded = true;
997                             return;
998                         }
999                         channelDemuxer.innerChannel.EndClose(result);
1000                         closeSucceeded = true;
1001                     }
1002                     finally
1003                     {
1004                         if (!closeSucceeded)
1005                         {
1006                             // we should abort the state since calling Abort on the channel demuxer will be a no-op
1007                             // due to the reference count being 0
1008                             this.channelDemuxer.AbortState();
1009                         }
1010                     }
1011                 }
1012                 if (OnInnerChannelClosed())
1013                 {
1014                     Complete(true);
1015                 }
1016             }
1017 
OnInnerChannelClosed()1018             bool OnInnerChannelClosed()
1019             {
1020                 this.closedInnerChannel = true;
1021                 bool closeSucceeded = false;
1022                 try
1023                 {
1024                     IAsyncResult result = channelDemuxer.innerListener.BeginClose(timeoutHelper.RemainingTime(), sharedCallback, this);
1025                     if (!result.CompletedSynchronously)
1026                     {
1027                         closeSucceeded = true;
1028                         return false;
1029                     }
1030                     channelDemuxer.innerListener.EndClose(result);
1031                     closeSucceeded = true;
1032                 }
1033                 finally
1034                 {
1035                     if (!closeSucceeded)
1036                     {
1037                         // we should abort the state since calling Abort on the channel demuxer will be a no-op
1038                         // due to the reference count being 0
1039                         channelDemuxer.AbortState();
1040                     }
1041                 }
1042                 return true;
1043             }
1044 
SharedCallback(IAsyncResult result)1045             static void SharedCallback(IAsyncResult result)
1046             {
1047                 if (result.CompletedSynchronously)
1048                 {
1049                     return;
1050                 }
1051                 CloseAsyncResult self = (CloseAsyncResult)result.AsyncState;
1052                 bool completeSelf = false;
1053                 Exception completionException = null;
1054                 bool closeSucceeded = false;
1055                 try
1056                 {
1057                     if (!self.closedInnerChannel)
1058                     {
1059                         self.channelDemuxer.innerChannel.EndClose(result);
1060                         completeSelf = self.OnInnerChannelClosed();
1061                         closeSucceeded = true;
1062                     }
1063                     else
1064                     {
1065                         self.channelDemuxer.innerListener.EndClose(result);
1066                         completeSelf = true;
1067                         closeSucceeded = true;
1068                     }
1069                 }
1070 #pragma warning suppress 56500 // covered by FxCOP
1071                 catch (Exception e)
1072                 {
1073                     if (Fx.IsFatal(e)) throw;
1074                     completeSelf = true;
1075                     completionException = e;
1076                 }
1077                 finally
1078                 {
1079                     if (!closeSucceeded)
1080                     {
1081                         // we should abort the state since calling Abort on the channel demuxer will be a no-op
1082                         // due to the reference count being 0
1083                         self.channelDemuxer.AbortState();
1084                     }
1085                 }
1086                 if (completeSelf)
1087                 {
1088                     self.Complete(false, completionException);
1089                 }
1090             }
1091 
End(IAsyncResult result)1092             public static void End(IAsyncResult result)
1093             {
1094                 AsyncResult.End<CloseAsyncResult>(result);
1095             }
1096         }
1097     }
1098 
1099     class InputChannelDemuxer : DatagramChannelDemuxer<IInputChannel, Message>
1100     {
InputChannelDemuxer(BindingContext context)1101         public InputChannelDemuxer(BindingContext context)
1102             : base(context)
1103         {
1104         }
1105 
AbortItem(Message message)1106         protected override void AbortItem(Message message)
1107         {
1108             AbortMessage(message);
1109         }
1110 
BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)1111         protected override IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
1112         {
1113             return this.InnerChannel.BeginReceive(timeout, callback, state);
1114         }
1115 
CreateListener(ChannelDemuxerFilter filter)1116         protected override LayeredChannelListener<IInputChannel> CreateListener<IInputChannel>(ChannelDemuxerFilter filter)
1117         {
1118             SingletonChannelListener<IInputChannel, InputChannel, Message> listener = new SingletonChannelListener<IInputChannel, InputChannel, Message>(filter, this);
1119             listener.Acceptor = (IChannelAcceptor<IInputChannel>)new InputChannelAcceptor(listener);
1120             return listener;
1121         }
1122 
Dispatch(IChannelListener listener)1123         protected override void Dispatch(IChannelListener listener)
1124         {
1125             SingletonChannelListener<IInputChannel, InputChannel, Message> singletonListener = (SingletonChannelListener<IInputChannel, InputChannel, Message>)listener;
1126             singletonListener.Dispatch();
1127         }
1128 
EndpointNotFound(Message message)1129         protected override void EndpointNotFound(Message message)
1130         {
1131             if (this.DemuxFailureHandler != null)
1132             {
1133                 this.DemuxFailureHandler.HandleDemuxFailure(message);
1134             }
1135             this.AbortItem(message);
1136         }
1137 
EndReceive(IAsyncResult result)1138         protected override Message EndReceive(IAsyncResult result)
1139         {
1140             return this.InnerChannel.EndReceive(result);
1141         }
1142 
EnqueueAndDispatch(IChannelListener listener, Message message, Action dequeuedCallback, bool canDispatchOnThisThread)1143         protected override void EnqueueAndDispatch(IChannelListener listener, Message message, Action dequeuedCallback, bool canDispatchOnThisThread)
1144         {
1145             SingletonChannelListener<IInputChannel, InputChannel, Message> singletonListener = (SingletonChannelListener<IInputChannel, InputChannel, Message>)listener;
1146             singletonListener.EnqueueAndDispatch(message, dequeuedCallback, canDispatchOnThisThread);
1147         }
1148 
EnqueueAndDispatch(IChannelListener listener, Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread)1149         protected override void EnqueueAndDispatch(IChannelListener listener, Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread)
1150         {
1151             SingletonChannelListener<IInputChannel, InputChannel, Message> singletonListener = (SingletonChannelListener<IInputChannel, InputChannel, Message>)listener;
1152             singletonListener.EnqueueAndDispatch(exception, dequeuedCallback, canDispatchOnThisThread);
1153         }
1154 
GetMessage(Message message)1155         protected override Message GetMessage(Message message)
1156         {
1157             return message;
1158         }
1159     }
1160 
1161     class DuplexChannelDemuxer : DatagramChannelDemuxer<IDuplexChannel, Message>
1162     {
DuplexChannelDemuxer(BindingContext context)1163         public DuplexChannelDemuxer(BindingContext context)
1164             : base(context)
1165         {
1166         }
1167 
AbortItem(Message message)1168         protected override void AbortItem(Message message)
1169         {
1170             AbortMessage(message);
1171         }
1172 
BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)1173         protected override IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
1174         {
1175             return this.InnerChannel.BeginReceive(timeout, callback, state);
1176         }
1177 
CreateListener(ChannelDemuxerFilter filter)1178         protected override LayeredChannelListener<IDuplexChannel> CreateListener<IDuplexChannel>(ChannelDemuxerFilter filter)
1179         {
1180             SingletonChannelListener<IDuplexChannel, DuplexChannel, Message> listener = new SingletonChannelListener<IDuplexChannel, DuplexChannel, Message>(filter, this);
1181             listener.Acceptor = (IChannelAcceptor<IDuplexChannel>)new DuplexChannelAcceptor(listener, this);
1182             return listener;
1183         }
1184 
Dispatch(IChannelListener listener)1185         protected override void Dispatch(IChannelListener listener)
1186         {
1187             SingletonChannelListener<IDuplexChannel, DuplexChannel, Message> singletonListener = (SingletonChannelListener<IDuplexChannel, DuplexChannel, Message>)listener;
1188             singletonListener.Dispatch();
1189         }
1190 
EndpointNotFound(Message message)1191         protected override void EndpointNotFound(Message message)
1192         {
1193             if (this.DemuxFailureHandler != null)
1194             {
1195                 this.DemuxFailureHandler.HandleDemuxFailure(message);
1196             }
1197             this.AbortItem(message);
1198         }
1199 
EndReceive(IAsyncResult result)1200         protected override Message EndReceive(IAsyncResult result)
1201         {
1202             return this.InnerChannel.EndReceive(result);
1203         }
1204 
EnqueueAndDispatch(IChannelListener listener, Message message, Action dequeuedCallback, bool canDispatchOnThisThread)1205         protected override void EnqueueAndDispatch(IChannelListener listener, Message message, Action dequeuedCallback, bool canDispatchOnThisThread)
1206         {
1207             SingletonChannelListener<IDuplexChannel, DuplexChannel, Message> singletonListener = (SingletonChannelListener<IDuplexChannel, DuplexChannel, Message>)listener;
1208             singletonListener.EnqueueAndDispatch(message, dequeuedCallback, canDispatchOnThisThread);
1209         }
1210 
EnqueueAndDispatch(IChannelListener listener, Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread)1211         protected override void EnqueueAndDispatch(IChannelListener listener, Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread)
1212         {
1213             SingletonChannelListener<IDuplexChannel, DuplexChannel, Message> singletonListener = (SingletonChannelListener<IDuplexChannel, DuplexChannel, Message>)listener;
1214             singletonListener.EnqueueAndDispatch(exception, dequeuedCallback, canDispatchOnThisThread);
1215         }
1216 
GetMessage(Message message)1217         protected override Message GetMessage(Message message)
1218         {
1219             return message;
1220         }
1221 
1222         class DuplexChannelAcceptor : SingletonChannelAcceptor<IDuplexChannel, DuplexChannel, Message>
1223         {
1224             DuplexChannelDemuxer demuxer;
1225 
DuplexChannelAcceptor(ChannelManagerBase channelManager, DuplexChannelDemuxer demuxer)1226             public DuplexChannelAcceptor(ChannelManagerBase channelManager, DuplexChannelDemuxer demuxer)
1227                 : base(channelManager)
1228             {
1229                 this.demuxer = demuxer;
1230             }
1231 
OnCreateChannel()1232             protected override DuplexChannel OnCreateChannel()
1233             {
1234                 return new DuplexChannelWrapper(this.ChannelManager, demuxer.InnerChannel);
1235             }
1236 
OnTraceMessageReceived(Message message)1237             protected override void OnTraceMessageReceived(Message message)
1238             {
1239                 if (DiagnosticUtility.ShouldTraceInformation)
1240                 {
1241                     TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.MessageReceived, SR.GetString(SR.TraceCodeMessageReceived),
1242                         MessageTransmitTraceRecord.CreateReceiveTraceRecord(message), this, null);
1243                 }
1244             }
1245         }
1246 
1247         class DuplexChannelWrapper : DuplexChannel
1248         {
1249             IDuplexChannel innerChannel;
1250 
DuplexChannelWrapper(ChannelManagerBase channelManager, IDuplexChannel innerChannel)1251             public DuplexChannelWrapper(ChannelManagerBase channelManager, IDuplexChannel innerChannel)
1252                 : base(channelManager, innerChannel.LocalAddress)
1253             {
1254                 this.innerChannel = innerChannel;
1255             }
1256 
1257             public override EndpointAddress RemoteAddress
1258             {
1259                 get { return this.innerChannel.RemoteAddress; }
1260             }
1261 
1262             public override Uri Via
1263             {
1264                 get { return this.innerChannel.Via; }
1265             }
1266 
OnSend(Message message, TimeSpan timeout)1267             protected override void OnSend(Message message, TimeSpan timeout)
1268             {
1269                 this.innerChannel.Send(message, timeout);
1270             }
1271 
OnBeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)1272             protected override IAsyncResult OnBeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
1273             {
1274                 return this.innerChannel.BeginSend(message, timeout, callback, state);
1275             }
1276 
OnEndSend(IAsyncResult result)1277             protected override void OnEndSend(IAsyncResult result)
1278             {
1279                 this.innerChannel.EndSend(result);
1280             }
1281 
OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)1282             protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
1283             {
1284                 return new CompletedAsyncResult(callback, state);
1285             }
1286 
OnEndOpen(IAsyncResult result)1287             protected override void OnEndOpen(IAsyncResult result)
1288             {
1289                 CompletedAsyncResult.End(result);
1290             }
1291 
OnOpen(TimeSpan timeout)1292             protected override void OnOpen(TimeSpan timeout)
1293             {
1294             }
1295         }
1296     }
1297 
1298     class ReplyChannelDemuxer : DatagramChannelDemuxer<IReplyChannel, RequestContext>
1299     {
ReplyChannelDemuxer(BindingContext context)1300         public ReplyChannelDemuxer(BindingContext context)
1301             : base(context)
1302         {
1303         }
1304 
AbortItem(RequestContext request)1305         protected override void AbortItem(RequestContext request)
1306         {
1307             AbortMessage(request);
1308             request.Abort();
1309         }
1310 
BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)1311         protected override IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
1312         {
1313             return this.InnerChannel.BeginReceiveRequest(timeout, callback, state);
1314         }
1315 
CreateListener(ChannelDemuxerFilter filter)1316         protected override LayeredChannelListener<TChannel> CreateListener<TChannel>(ChannelDemuxerFilter filter)
1317         {
1318             if (typeof(TChannel) == typeof(IInputChannel))
1319             {
1320                 SingletonChannelListener<IInputChannel, InputChannel, Message> listener = new SingletonChannelListener<IInputChannel, InputChannel, Message>(filter, this);
1321                 listener.Acceptor = (IChannelAcceptor<IInputChannel>)new InputChannelAcceptor(listener);
1322                 return (LayeredChannelListener<TChannel>)(object)listener;
1323             }
1324             else if (typeof(TChannel) == typeof(IReplyChannel))
1325             {
1326                 SingletonChannelListener<IReplyChannel, ReplyChannel, RequestContext> listener = new SingletonChannelListener<IReplyChannel, ReplyChannel, RequestContext>(filter, this);
1327                 listener.Acceptor = (IChannelAcceptor<IReplyChannel>)new ReplyChannelAcceptor(listener);
1328                 return (LayeredChannelListener<TChannel>)(object)listener;
1329             }
1330             else
1331             {
1332                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException());
1333             }
1334         }
1335 
Dispatch(IChannelListener listener)1336         protected override void Dispatch(IChannelListener listener)
1337         {
1338             SingletonChannelListener<IInputChannel, InputChannel, Message> inputListener = listener as SingletonChannelListener<IInputChannel, InputChannel, Message>;
1339             if (inputListener != null)
1340             {
1341                 inputListener.Dispatch();
1342                 return;
1343             }
1344             SingletonChannelListener<IReplyChannel, ReplyChannel, RequestContext> replyListener = listener as SingletonChannelListener<IReplyChannel, ReplyChannel, RequestContext>;
1345             if (replyListener != null)
1346             {
1347                 replyListener.Dispatch();
1348                 return;
1349             }
1350 
1351             throw Fx.AssertAndThrow("ReplyChannelDemuxer.Dispatch (false)");
1352         }
1353 
EndpointNotFoundCallback(IAsyncResult result)1354         void EndpointNotFoundCallback(IAsyncResult result)
1355         {
1356             if (result.CompletedSynchronously)
1357             {
1358                 return;
1359             }
1360             RequestContext item = (RequestContext)result.AsyncState;
1361             bool abortItem = true;
1362             try
1363             {
1364                 ReplyChannelDemuxFailureAsyncResult.End(result);
1365                 abortItem = false;
1366             }
1367             catch (TimeoutException e)
1368             {
1369                 if (TD.SendTimeoutIsEnabled())
1370                 {
1371                     TD.SendTimeout(e.Message);
1372                 }
1373 
1374                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1375             }
1376             catch (CommunicationException e)
1377             {
1378                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1379             }
1380             catch (ObjectDisposedException e)
1381             {
1382                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1383             }
1384             catch (Exception e)
1385             {
1386                 if (Fx.IsFatal(e)) throw;
1387                 this.HandleUnknownException(e);
1388             }
1389             finally
1390             {
1391                 if (abortItem)
1392                 {
1393                     this.AbortItem(item);
1394                 }
1395             }
1396         }
1397 
EndpointNotFound(RequestContext request)1398         protected override void EndpointNotFound(RequestContext request)
1399         {
1400             bool abortItem = true;
1401             try
1402             {
1403                 if (this.DemuxFailureHandler != null)
1404                 {
1405                     try
1406                     {
1407                         ReplyChannelDemuxFailureAsyncResult result = new ReplyChannelDemuxFailureAsyncResult(this.DemuxFailureHandler, request, Fx.ThunkCallback(new AsyncCallback(this.EndpointNotFoundCallback)), request);
1408                         result.Start();
1409                         if (!result.CompletedSynchronously)
1410                         {
1411                             abortItem = false;
1412                             return;
1413                         }
1414                         ReplyChannelDemuxFailureAsyncResult.End(result);
1415                         abortItem = false;
1416                     }
1417                     catch (CommunicationException e)
1418                     {
1419                         DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1420                     }
1421                     catch (TimeoutException e)
1422                     {
1423                         if (TD.SendTimeoutIsEnabled())
1424                         {
1425                             TD.SendTimeout(e.Message);
1426                         }
1427                         DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1428                     }
1429                     catch (ObjectDisposedException e)
1430                     {
1431                         DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1432                     }
1433                     catch (Exception e)
1434                     {
1435                         if (Fx.IsFatal(e)) throw;
1436                         this.HandleUnknownException(e);
1437                     }
1438                 }
1439             }
1440             finally
1441             {
1442                 if (abortItem)
1443                 {
1444                     this.AbortItem(request);
1445                 }
1446             }
1447         }
1448 
EndReceive(IAsyncResult result)1449         protected override RequestContext EndReceive(IAsyncResult result)
1450         {
1451             return this.InnerChannel.EndReceiveRequest(result);
1452         }
1453 
EnqueueAndDispatch(IChannelListener listener, RequestContext request, Action dequeuedCallback, bool canDispatchOnThisThread)1454         protected override void EnqueueAndDispatch(IChannelListener listener, RequestContext request, Action dequeuedCallback, bool canDispatchOnThisThread)
1455         {
1456             SingletonChannelListener<IInputChannel, InputChannel, Message> inputListener = listener as SingletonChannelListener<IInputChannel, InputChannel, Message>;
1457             if (inputListener != null)
1458             {
1459                 inputListener.EnqueueAndDispatch(request.RequestMessage, dequeuedCallback, canDispatchOnThisThread);
1460 
1461                 try
1462                 {
1463                     request.Close();
1464                 }
1465                 catch (CommunicationException e)
1466                 {
1467                     DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1468                 }
1469                 catch (TimeoutException e)
1470                 {
1471                     if (TD.CloseTimeoutIsEnabled())
1472                     {
1473                         TD.CloseTimeout(e.Message);
1474                     }
1475                     DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1476                 }
1477             }
1478             SingletonChannelListener<IReplyChannel, ReplyChannel, RequestContext> replyListener = listener as SingletonChannelListener<IReplyChannel, ReplyChannel, RequestContext>;
1479             if (replyListener != null)
1480             {
1481                 replyListener.EnqueueAndDispatch(request, dequeuedCallback, canDispatchOnThisThread);
1482                 return;
1483             }
1484 
1485             throw Fx.AssertAndThrow("ReplyChannelDemuxer.EnqueueAndDispatch (false)");
1486         }
1487 
EnqueueAndDispatch(IChannelListener listener, Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread)1488         protected override void EnqueueAndDispatch(IChannelListener listener, Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread)
1489         {
1490             SingletonChannelListener<IInputChannel, InputChannel, Message> inputListener = listener as SingletonChannelListener<IInputChannel, InputChannel, Message>;
1491             if (inputListener != null)
1492             {
1493                 inputListener.EnqueueAndDispatch(exception, dequeuedCallback, canDispatchOnThisThread);
1494                 return;
1495             }
1496 
1497             SingletonChannelListener<IReplyChannel, ReplyChannel, RequestContext> replyListener = listener as SingletonChannelListener<IReplyChannel, ReplyChannel, RequestContext>;
1498             if (replyListener != null)
1499             {
1500                 replyListener.EnqueueAndDispatch(exception, dequeuedCallback, canDispatchOnThisThread);
1501                 return;
1502             }
1503 
1504             throw Fx.AssertAndThrow("ReplyChannelDemuxer.EnqueueAndDispatch (false)");
1505         }
1506 
GetMessage(RequestContext request)1507         protected override Message GetMessage(RequestContext request)
1508         {
1509             return request.RequestMessage;
1510         }
1511     }
1512 
1513     interface IChannelDemuxerFilter
1514     {
1515         ChannelDemuxerFilter Filter { get; }
1516     }
1517 
1518     class SingletonChannelListener<TChannel, TQueuedChannel, TQueuedItem> : DelegatingChannelListener<TChannel>, IChannelDemuxerFilter
1519         where TChannel : class, IChannel
1520         where TQueuedChannel : InputQueueChannel<TQueuedItem>
1521         where TQueuedItem : class, IDisposable
1522     {
1523         ChannelDemuxerFilter filter;
1524         IChannelDemuxer channelDemuxer;
1525 
SingletonChannelListener(ChannelDemuxerFilter filter, IChannelDemuxer channelDemuxer)1526         public SingletonChannelListener(ChannelDemuxerFilter filter, IChannelDemuxer channelDemuxer)
1527             : base(true)
1528         {
1529             this.filter = filter;
1530             this.channelDemuxer = channelDemuxer;
1531         }
1532 
1533         public ChannelDemuxerFilter Filter
1534         {
1535             get { return this.filter; }
1536         }
1537 
1538         SingletonChannelAcceptor<TChannel, TQueuedChannel, TQueuedItem> SingletonAcceptor
1539         {
1540             get { return (SingletonChannelAcceptor<TChannel, TQueuedChannel, TQueuedItem>)base.Acceptor; }
1541             set { this.Acceptor = value; }
1542         }
1543 
OnOpen(TimeSpan timeout)1544         protected override void OnOpen(TimeSpan timeout)
1545         {
1546             TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
1547             this.channelDemuxer.OnOuterListenerOpen(this.filter, this, timeoutHelper.RemainingTime());
1548             base.OnOpen(timeoutHelper.RemainingTime());
1549         }
1550 
OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)1551         protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
1552         {
1553             return new ChainedAsyncResult(timeout, callback, state, this.OnBeginOuterListenerOpen, this.OnEndOuterListenerOpen, base.OnBeginOpen, base.OnEndOpen);
1554         }
1555 
OnEndOpen(IAsyncResult result)1556         protected override void OnEndOpen(IAsyncResult result)
1557         {
1558             ChainedAsyncResult.End(result);
1559         }
1560 
OnBeginOuterListenerOpen(TimeSpan timeout, AsyncCallback callback, object state)1561         IAsyncResult OnBeginOuterListenerOpen(TimeSpan timeout, AsyncCallback callback, object state)
1562         {
1563             return this.channelDemuxer.OnBeginOuterListenerOpen(this.filter, this, timeout, callback, state);
1564         }
1565 
OnEndOuterListenerOpen(IAsyncResult result)1566         void OnEndOuterListenerOpen(IAsyncResult result)
1567         {
1568             this.channelDemuxer.OnEndOuterListenerOpen(result);
1569         }
1570 
OnAbort()1571         protected override void OnAbort()
1572         {
1573             this.channelDemuxer.OnOuterListenerAbort(this.filter);
1574             base.OnAbort();
1575         }
1576 
OnClose(TimeSpan timeout)1577         protected override void OnClose(TimeSpan timeout)
1578         {
1579             TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
1580             this.channelDemuxer.OnOuterListenerClose(this.filter, timeoutHelper.RemainingTime());
1581             base.OnClose(timeoutHelper.RemainingTime());
1582         }
1583 
OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)1584         protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
1585         {
1586             return new ChainedAsyncResult(timeout, callback, state, this.OnBeginOuterListenerClose, this.OnEndOuterListenerClose, base.OnBeginClose, base.OnEndClose);
1587         }
1588 
OnEndClose(IAsyncResult result)1589         protected override void OnEndClose(IAsyncResult result)
1590         {
1591             ChainedAsyncResult.End(result);
1592         }
1593 
OnBeginOuterListenerClose(TimeSpan timeout, AsyncCallback callback, object state)1594         IAsyncResult OnBeginOuterListenerClose(TimeSpan timeout, AsyncCallback callback, object state)
1595         {
1596             return this.channelDemuxer.OnBeginOuterListenerClose(this.filter, timeout, callback, state);
1597         }
1598 
OnEndOuterListenerClose(IAsyncResult result)1599         void OnEndOuterListenerClose(IAsyncResult result)
1600         {
1601             this.channelDemuxer.OnEndOuterListenerClose(result);
1602         }
1603 
Dispatch()1604         public void Dispatch()
1605         {
1606             this.SingletonAcceptor.DispatchItems();
1607         }
1608 
EnqueueAndDispatch(TQueuedItem item, Action dequeuedCallback, bool canDispatchOnThisThread)1609         public void EnqueueAndDispatch(TQueuedItem item, Action dequeuedCallback, bool canDispatchOnThisThread)
1610         {
1611             this.SingletonAcceptor.EnqueueAndDispatch(item, dequeuedCallback, canDispatchOnThisThread);
1612         }
1613 
EnqueueAndDispatch(Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread)1614         public void EnqueueAndDispatch(Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread)
1615         {
1616             this.SingletonAcceptor.EnqueueAndDispatch(exception, dequeuedCallback, canDispatchOnThisThread);
1617         }
1618     }
1619 
1620     //
1621     // Session demuxers
1622     //
1623 
1624     abstract class SessionChannelDemuxer<TInnerChannel, TInnerItem> : TypedChannelDemuxer, IChannelDemuxer
1625         where TInnerChannel : class, IChannel
1626         where TInnerItem : class, IDisposable
1627     {
1628         IChannelDemuxFailureHandler demuxFailureHandler;
1629         MessageFilterTable<InputQueueChannelListener<TInnerChannel>> filterTable;
1630         IChannelListener<TInnerChannel> innerListener;
1631         static AsyncCallback onAcceptComplete = Fx.ThunkCallback(new AsyncCallback(OnAcceptCompleteStatic));
1632         static AsyncCallback onPeekComplete = Fx.ThunkCallback(new AsyncCallback(OnPeekCompleteStatic));
1633         Action onItemDequeued;
1634         static WaitCallback scheduleAcceptStatic = new WaitCallback(ScheduleAcceptStatic);
1635         static Action<object> startAcceptStatic = new Action<object>(StartAcceptStatic);
1636         Action<object> onStartAccepting;
1637         int openCount;
1638         ThreadNeutralSemaphore openSemaphore;
1639         Exception pendingExceptionOnOpen;
1640         bool abortOngoingOpen;
1641         FlowThrottle throttle;
1642         TimeSpan peekTimeout;
1643 
SessionChannelDemuxer(BindingContext context, TimeSpan peekTimeout, int maxPendingSessions)1644         public SessionChannelDemuxer(BindingContext context, TimeSpan peekTimeout, int maxPendingSessions)
1645         {
1646             if (context.BindingParameters != null)
1647             {
1648                 this.demuxFailureHandler = context.BindingParameters.Find<IChannelDemuxFailureHandler>();
1649             }
1650             this.innerListener = context.BuildInnerChannelListener<TInnerChannel>();
1651             this.filterTable = new MessageFilterTable<InputQueueChannelListener<TInnerChannel>>();
1652             this.openSemaphore = new ThreadNeutralSemaphore(1);
1653             this.peekTimeout = peekTimeout;
1654             this.throttle = new FlowThrottle(scheduleAcceptStatic, maxPendingSessions, null, null);
1655         }
1656 
1657         protected object ThisLock
1658         {
1659             get { return this; }
1660         }
1661 
1662         protected IChannelDemuxFailureHandler DemuxFailureHandler
1663         {
1664             get { return this.demuxFailureHandler; }
1665         }
1666 
1667         Action<object> OnStartAccepting
1668         {
1669             get
1670             {
1671                 if (this.onStartAccepting == null)
1672                 {
1673                     this.onStartAccepting = new Action<object>(OnStartAcceptingCallback);
1674                 }
1675 
1676                 return this.onStartAccepting;
1677             }
1678         }
1679 
AbortItem(TInnerItem item)1680         protected abstract void AbortItem(TInnerItem item);
BeginReceive(TInnerChannel channel, AsyncCallback callback, object state)1681         protected abstract IAsyncResult BeginReceive(TInnerChannel channel, AsyncCallback callback, object state);
BeginReceive(TInnerChannel channel, TimeSpan timeout, AsyncCallback callback, object state)1682         protected abstract IAsyncResult BeginReceive(TInnerChannel channel, TimeSpan timeout, AsyncCallback callback, object state);
CreateChannel(ChannelManagerBase channelManager, TInnerChannel innerChannel, TInnerItem firstItem)1683         protected abstract TInnerChannel CreateChannel(ChannelManagerBase channelManager, TInnerChannel innerChannel, TInnerItem firstItem);
EndpointNotFound(TInnerChannel channel, TInnerItem item)1684         protected abstract void EndpointNotFound(TInnerChannel channel, TInnerItem item);
EndReceive(TInnerChannel channel, IAsyncResult result)1685         protected abstract TInnerItem EndReceive(TInnerChannel channel, IAsyncResult result);
GetMessage(TInnerItem item)1686         protected abstract Message GetMessage(TInnerItem item);
1687 
BuildChannelListener(ChannelDemuxerFilter filter)1688         public override IChannelListener<TChannel> BuildChannelListener<TChannel>(ChannelDemuxerFilter filter)
1689         {
1690             Fx.Assert(typeof(TChannel) == typeof(TInnerChannel), "SessionChannelDemuxer.BuildChannelListener (typeof(TChannel) == typeof(TInnerChannel))");
1691 
1692             InputQueueChannelListener<TChannel> listener = new InputQueueChannelListener<TChannel>(filter, this);
1693             listener.InnerChannelListener = this.innerListener;
1694             return listener;
1695         }
1696 
1697         // return true if another BeginAcceptChannel should pend
BeginAcceptChannel(bool requiresThrottle, out IAsyncResult result)1698         bool BeginAcceptChannel(bool requiresThrottle, out IAsyncResult result)
1699         {
1700             result = null;
1701 
1702             if (requiresThrottle && !this.throttle.Acquire(this))
1703             {
1704                 return false;
1705             }
1706 
1707             bool releaseThrottle = true;
1708 
1709             try
1710             {
1711                 result = this.innerListener.BeginAcceptChannel(TimeSpan.MaxValue, onAcceptComplete, this);
1712                 releaseThrottle = false;
1713             }
1714             catch (CommunicationObjectFaultedException e)
1715             {
1716                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1717                 return false;
1718             }
1719             catch (CommunicationObjectAbortedException e)
1720             {
1721                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1722                 return false;
1723             }
1724             catch (ObjectDisposedException e)
1725             {
1726                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1727                 return false;
1728             }
1729             catch (CommunicationException e)
1730             {
1731                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1732                 return true;
1733             }
1734             catch (TimeoutException e)
1735             {
1736                 if (TD.OpenTimeoutIsEnabled())
1737                 {
1738                     TD.OpenTimeout(e.Message);
1739                 }
1740                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1741                 return true;
1742             }
1743             catch (Exception e)
1744             {
1745                 if (Fx.IsFatal(e)) throw;
1746                 this.HandleUnknownException(e);
1747                 releaseThrottle = false;
1748                 return false;
1749             }
1750             finally
1751             {
1752                 if (releaseThrottle)
1753                 {
1754                     this.throttle.Release();
1755                 }
1756             }
1757 
1758             return true;
1759         }
1760 
EndAcceptChannel(IAsyncResult result, out TInnerChannel channel)1761         bool EndAcceptChannel(IAsyncResult result, out TInnerChannel channel)
1762         {
1763             channel = null;
1764             bool releaseThrottle = true;
1765             try
1766             {
1767                 channel = this.innerListener.EndAcceptChannel(result);
1768                 releaseThrottle = (channel == null);
1769             }
1770             catch (CommunicationObjectFaultedException e)
1771             {
1772                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1773                 return false;
1774             }
1775             catch (CommunicationObjectAbortedException e)
1776             {
1777                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1778                 return false;
1779             }
1780             catch (ObjectDisposedException e)
1781             {
1782                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1783                 return false;
1784             }
1785             catch (CommunicationException e)
1786             {
1787                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1788                 return true;
1789             }
1790             catch (TimeoutException e)
1791             {
1792                 if (TD.OpenTimeoutIsEnabled())
1793                 {
1794                     TD.OpenTimeout(e.Message);
1795                 }
1796                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1797                 return true;
1798             }
1799             catch (Exception e)
1800             {
1801                 if (Fx.IsFatal(e)) throw;
1802                 this.HandleUnknownException(e);
1803                 releaseThrottle = false;
1804                 return false;
1805             }
1806             finally
1807             {
1808                 if (releaseThrottle)
1809                 {
1810                     throttle.Release();
1811                 }
1812             }
1813 
1814             return (channel != null);
1815         }
1816 
PeekChannel(TInnerChannel channel)1817         void PeekChannel(TInnerChannel channel)
1818         {
1819             bool releaseThrottle = true;
1820             try
1821             {
1822                 IAsyncResult peekResult = new PeekAsyncResult(this, channel, onPeekComplete, this);
1823                 releaseThrottle = false;
1824                 if (!peekResult.CompletedSynchronously)
1825                 {
1826                     return;
1827                 }
1828                 channel = null;
1829                 this.HandlePeekResult(peekResult);
1830             }
1831             catch (CommunicationException e)
1832             {
1833                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1834             }
1835             catch (TimeoutException e)
1836             {
1837                 if (TD.OpenTimeoutIsEnabled())
1838                 {
1839                     TD.OpenTimeout(e.Message);
1840                 }
1841                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1842             }
1843             catch (ObjectDisposedException e)
1844             {
1845                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1846             }
1847             catch (Exception e)
1848             {
1849                 if (Fx.IsFatal(e)) throw;
1850                 this.HandleUnknownException(e);
1851                 releaseThrottle = false;
1852             }
1853 
1854             if (channel != null)
1855             {
1856                 channel.Abort();
1857             }
1858 
1859             if (releaseThrottle)
1860             {
1861                 this.throttle.Release();
1862             }
1863         }
1864 
HandlePeekResult(IAsyncResult result)1865         void HandlePeekResult(IAsyncResult result)
1866         {
1867             TInnerChannel channel = null;
1868             TInnerItem item;
1869             bool abortChannel = false;
1870             bool releaseThrottle = true;
1871             try
1872             {
1873                 PeekAsyncResult.End(result, out channel, out item);
1874                 releaseThrottle = (item == null);
1875             }
1876             catch (ObjectDisposedException e)
1877             {
1878                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1879                 abortChannel = true;
1880                 return;
1881             }
1882             catch (CommunicationException e)
1883             {
1884                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1885                 abortChannel = true;
1886                 return;
1887             }
1888             catch (TimeoutException e)
1889             {
1890                 if (TD.OpenTimeoutIsEnabled())
1891                 {
1892                     TD.OpenTimeout(e.Message);
1893                 }
1894                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1895                 abortChannel = true;
1896                 return;
1897             }
1898             catch (Exception e)
1899             {
1900                 if (Fx.IsFatal(e)) throw;
1901                 this.HandleUnknownException(e);
1902                 releaseThrottle = false;
1903                 return;
1904             }
1905             finally
1906             {
1907                 if (abortChannel && channel != null)
1908                 {
1909                     channel.Abort();
1910                 }
1911 
1912                 if (releaseThrottle)
1913                 {
1914                     this.throttle.Release();
1915                 }
1916             }
1917 
1918             if (item != null)
1919             {
1920                 releaseThrottle = true;
1921 
1922                 try
1923                 {
1924                     this.ProcessItem(channel, item);
1925                     releaseThrottle = false;
1926                 }
1927                 catch (CommunicationException e)
1928                 {
1929                     DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1930                 }
1931                 catch (TimeoutException e)
1932                 {
1933                     if (TD.OpenTimeoutIsEnabled())
1934                     {
1935                         TD.OpenTimeout(e.Message);
1936                     }
1937                     DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1938                 }
1939                 catch (Exception e)
1940                 {
1941                     if (Fx.IsFatal(e)) throw;
1942                     this.HandleUnknownException(e);
1943                     releaseThrottle = false;
1944                 }
1945                 finally
1946                 {
1947                     if (releaseThrottle)
1948                     {
1949                         this.throttle.Release();
1950                     }
1951                 }
1952             }
1953         }
1954 
MatchListener(Message message)1955         InputQueueChannelListener<TInnerChannel> MatchListener(Message message)
1956         {
1957             InputQueueChannelListener<TInnerChannel> matchingListener = null;
1958             lock (this.ThisLock)
1959             {
1960                 if (this.filterTable.GetMatchingValue(message, out matchingListener))
1961                 {
1962                     return matchingListener;
1963                 }
1964             }
1965             return null;
1966         }
1967 
OnAcceptCompleteStatic(IAsyncResult result)1968         static void OnAcceptCompleteStatic(IAsyncResult result)
1969         {
1970             if (result.CompletedSynchronously)
1971             {
1972                 return;
1973             }
1974 
1975             ((SessionChannelDemuxer<TInnerChannel, TInnerItem>)result.AsyncState).OnStartAcceptingCallback(result);
1976         }
1977 
ScheduleAcceptStatic(object state)1978         static void ScheduleAcceptStatic(object state)
1979         {
1980             ActionItem.Schedule(startAcceptStatic, state);
1981         }
1982 
StartAcceptStatic(object state)1983         static void StartAcceptStatic(object state)
1984         {
1985             ((SessionChannelDemuxer<TInnerChannel, TInnerItem>)state).StartAccepting(false);
1986         }
1987 
ShouldStartAccepting(ChannelDemuxerFilter filter, IChannelListener listener)1988         bool ShouldStartAccepting(ChannelDemuxerFilter filter, IChannelListener listener)
1989         {
1990             lock (this.ThisLock)
1991             {
1992                 // the listener's Abort may be racing with Open
1993                 if (listener.State == CommunicationState.Closed || listener.State == CommunicationState.Closing)
1994                 {
1995                     return false;
1996                 }
1997 
1998                 this.filterTable.Add(filter.Filter, (InputQueueChannelListener<TInnerChannel>)(object)listener, filter.Priority);
1999                 if (++this.openCount == 1)
2000                 {
2001                     this.abortOngoingOpen = false;
2002                     return true;
2003                 }
2004             }
2005             return false;
2006         }
2007 
StartAccepting(bool requiresThrottle)2008         void StartAccepting(bool requiresThrottle)
2009         {
2010             IAsyncResult acceptResult;
2011             bool acceptValid = this.BeginAcceptChannel(requiresThrottle, out acceptResult);
2012             if (acceptValid && (acceptResult == null || acceptResult.CompletedSynchronously))
2013             {
2014                 // need to spawn another thread to process this completion
2015                 ActionItem.Schedule(OnStartAccepting, acceptResult);
2016             }
2017         }
2018 
OnItemDequeued()2019         void OnItemDequeued()
2020         {
2021             this.throttle.Release();
2022         }
2023 
ThrowPendingOpenExceptionIfAny()2024         void ThrowPendingOpenExceptionIfAny()
2025         {
2026             if (this.pendingExceptionOnOpen != null)
2027             {
2028                 if (pendingExceptionOnOpen is CommunicationObjectAbortedException)
2029                 {
2030                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperWarning(new CommunicationObjectAbortedException(SR.GetString(SR.PreviousChannelDemuxerOpenFailed, this.pendingExceptionOnOpen.ToString())));
2031                 }
2032                 else if (pendingExceptionOnOpen is CommunicationObjectFaultedException)
2033                 {
2034                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperWarning(new CommunicationObjectFaultedException(SR.GetString(SR.PreviousChannelDemuxerOpenFailed, this.pendingExceptionOnOpen.ToString())));
2035                 }
2036                 else
2037                 {
2038                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperWarning(new CommunicationException(SR.GetString(SR.PreviousChannelDemuxerOpenFailed, this.pendingExceptionOnOpen.ToString())));
2039                 }
2040             }
2041         }
2042 
OnOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout)2043         public void OnOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout)
2044         {
2045             TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
2046             this.openSemaphore.Enter(timeoutHelper.RemainingTime());
2047             try
2048             {
2049                 bool startAccepting = ShouldStartAccepting(filter, listener);
2050                 if (startAccepting)
2051                 {
2052                     try
2053                     {
2054                         this.innerListener.Open(timeoutHelper.RemainingTime());
2055                         StartAccepting(true);
2056                         lock (ThisLock)
2057                         {
2058                             if (this.abortOngoingOpen)
2059                             {
2060                                 this.innerListener.Abort();
2061                             }
2062                         }
2063                     }
2064 #pragma warning suppress 56500 // covered by FxCOP
2065                     catch (Exception e)
2066                     {
2067                         this.pendingExceptionOnOpen = e;
2068                         throw;
2069                     }
2070                 }
2071                 else
2072                 {
2073                     this.ThrowPendingOpenExceptionIfAny();
2074                 }
2075             }
2076             finally
2077             {
2078                 this.openSemaphore.Exit();
2079             }
2080         }
2081 
2082 
OnBeginOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout, AsyncCallback callback, object state)2083         public IAsyncResult OnBeginOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout, AsyncCallback callback, object state)
2084         {
2085             return new OpenAsyncResult(this, filter, listener, timeout, callback, state);
2086         }
2087 
OnEndOuterListenerOpen(IAsyncResult result)2088         public void OnEndOuterListenerOpen(IAsyncResult result)
2089         {
2090             OpenAsyncResult.End(result);
2091         }
2092 
ShouldCloseInnerListener(ChannelDemuxerFilter filter, bool aborted)2093         bool ShouldCloseInnerListener(ChannelDemuxerFilter filter, bool aborted)
2094         {
2095             lock (this.ThisLock)
2096             {
2097                 if (this.filterTable.ContainsKey(filter.Filter))
2098                 {
2099                     this.filterTable.Remove(filter.Filter);
2100                     if (--this.openCount == 0)
2101                     {
2102                         if (aborted)
2103                         {
2104                             this.abortOngoingOpen = true;
2105                         }
2106                         return true;
2107                     }
2108                 }
2109             }
2110             return false;
2111         }
2112 
OnOuterListenerAbort(ChannelDemuxerFilter filter)2113         public void OnOuterListenerAbort(ChannelDemuxerFilter filter)
2114         {
2115             if (ShouldCloseInnerListener(filter, true))
2116             {
2117                 innerListener.Abort();
2118             }
2119         }
2120 
OnOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout)2121         public void OnOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout)
2122         {
2123             if (ShouldCloseInnerListener(filter, false))
2124             {
2125                 bool closeSucceeded = false;
2126                 try
2127                 {
2128                     innerListener.Close(timeout);
2129                     closeSucceeded = true;
2130                 }
2131                 finally
2132                 {
2133                     if (!closeSucceeded)
2134                     {
2135                         // we should abort the state since calling Abort on the channel demuxer will be a no-op
2136                         // due to the reference count being 0
2137                         innerListener.Abort();
2138                     }
2139                 }
2140             }
2141         }
2142 
OnBeginOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout, AsyncCallback callback, object state)2143         public IAsyncResult OnBeginOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout, AsyncCallback callback, object state)
2144         {
2145             if (ShouldCloseInnerListener(filter, false))
2146             {
2147                 bool closeSucceeded = false;
2148                 try
2149                 {
2150                     IAsyncResult result = this.innerListener.BeginClose(timeout, callback, state);
2151                     closeSucceeded = true;
2152                     return result;
2153                 }
2154                 finally
2155                 {
2156                     if (!closeSucceeded)
2157                     {
2158                         // we should abort the state since calling Abort on the channel demuxer will be a no-op
2159                         // due to the reference count being 0
2160                         this.innerListener.Abort();
2161                     }
2162                 }
2163             }
2164             else
2165             {
2166                 return new CompletedAsyncResult(callback, state);
2167             }
2168         }
2169 
OnEndOuterListenerClose(IAsyncResult result)2170         public void OnEndOuterListenerClose(IAsyncResult result)
2171         {
2172             if (result is CompletedAsyncResult)
2173             {
2174                 CompletedAsyncResult.End(result);
2175             }
2176             else
2177             {
2178                 bool closeSucceeded = false;
2179                 try
2180                 {
2181                     this.innerListener.EndClose(result);
2182                     closeSucceeded = true;
2183                 }
2184                 finally
2185                 {
2186                     if (!closeSucceeded)
2187                     {
2188                         // we should abort the state since calling Abort on the channel demuxer will be a no-op
2189                         // due to the reference count being 0
2190                         this.innerListener.Abort();
2191                     }
2192                 }
2193             }
2194         }
2195 
OnStartAcceptingCallback(object state)2196         void OnStartAcceptingCallback(object state)
2197         {
2198             IAsyncResult result = (IAsyncResult)state;
2199             TInnerChannel channel = null;
2200 
2201             if (result == null || this.EndAcceptChannel(result, out channel))
2202             {
2203                 this.StartAccepting(channel);
2204             }
2205         }
2206 
OnPeekCompleteStatic(IAsyncResult result)2207         static void OnPeekCompleteStatic(IAsyncResult result)
2208         {
2209             if (result.CompletedSynchronously)
2210             {
2211                 return;
2212             }
2213 
2214             SessionChannelDemuxer<TInnerChannel, TInnerItem> demuxer
2215                 = (SessionChannelDemuxer<TInnerChannel, TInnerItem>)result.AsyncState;
2216 
2217             bool releaseThrottle = true;
2218 
2219             try
2220             {
2221                 demuxer.HandlePeekResult(result);
2222                 releaseThrottle = false;
2223             }
2224             catch (CommunicationException e)
2225             {
2226                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
2227             }
2228             catch (ObjectDisposedException e)
2229             {
2230                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
2231             }
2232             catch (Exception e)
2233             {
2234                 if (Fx.IsFatal(e)) throw;
2235                 demuxer.HandleUnknownException(e);
2236                 releaseThrottle = false;
2237             }
2238             finally
2239             {
2240                 if (releaseThrottle)
2241                 {
2242                     demuxer.throttle.Release();
2243                 }
2244             }
2245         }
2246 
ProcessItem(TInnerChannel channel, TInnerItem item)2247         void ProcessItem(TInnerChannel channel, TInnerItem item)
2248         {
2249             InputQueueChannelListener<TInnerChannel> listener = null;
2250             TInnerChannel wrappedChannel = null;
2251             bool releaseThrottle = true;
2252 
2253             try
2254             {
2255                 Message message = this.GetMessage(item);
2256                 try
2257                 {
2258                     listener = MatchListener(message);
2259                     releaseThrottle = (listener == null);
2260                 }
2261                 // MatchListener could run the filters against an untrusted message and could throw.
2262                 // If so, abort the session
2263                 catch (CommunicationException e)
2264                 {
2265                     DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
2266                     return;
2267                 }
2268                 catch (MultipleFilterMatchesException e)
2269                 {
2270                     DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
2271                     return;
2272                 }
2273                 catch (XmlException e)
2274                 {
2275                     DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
2276                     return;
2277                 }
2278                 finally
2279                 {
2280                     if (releaseThrottle)
2281                     {
2282                         this.throttle.Release();
2283                     }
2284                 }
2285 
2286                 if (listener == null)
2287                 {
2288                     try
2289                     {
2290                         throw TraceUtility.ThrowHelperError(new EndpointNotFoundException(SR.GetString(SR.UnableToDemuxChannel, message.Headers.Action)), message);
2291                     }
2292                     catch (EndpointNotFoundException e)
2293                     {
2294                         DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
2295                         this.EndpointNotFound(channel, item);
2296                         // EndpointNotFound is responsible for closing and aborting the channel
2297                         channel = null;
2298                         item = null;
2299                     }
2300                     return;
2301                 }
2302 
2303                 wrappedChannel = this.CreateChannel(listener, channel, item);
2304                 channel = null;
2305                 item = null;
2306             }
2307             finally
2308             {
2309                 if (item != null)
2310                 {
2311                     this.AbortItem(item);
2312                 }
2313                 if (channel != null)
2314                 {
2315                     channel.Abort();
2316                 }
2317             }
2318 
2319             bool enqueueSucceeded = false;
2320             try
2321             {
2322                 if (this.onItemDequeued == null)
2323                 {
2324                     this.onItemDequeued = new Action(this.OnItemDequeued);
2325                 }
2326 
2327                 listener.InputQueueAcceptor.EnqueueAndDispatch(wrappedChannel, this.onItemDequeued, false);
2328                 enqueueSucceeded = true;
2329             }
2330             catch (Exception e)
2331             {
2332                 if (Fx.IsFatal(e)) throw;
2333                 this.HandleUnknownException(e);
2334             }
2335             finally
2336             {
2337                 if (!enqueueSucceeded)
2338                 {
2339                     this.throttle.Release();
2340                     wrappedChannel.Abort();
2341                 }
2342             }
2343         }
2344 
HandleUnknownException(Exception exception)2345         protected void HandleUnknownException(Exception exception)
2346         {
2347             InputQueueChannelListener<TInnerChannel> listener = null;
2348 
2349             lock (this.ThisLock)
2350             {
2351                 if (this.filterTable.Count > 0)
2352                 {
2353                     KeyValuePair<MessageFilter, InputQueueChannelListener<TInnerChannel>>[] pairs = new KeyValuePair<MessageFilter, InputQueueChannelListener<TInnerChannel>>[this.filterTable.Count];
2354                     this.filterTable.CopyTo(pairs, 0);
2355                     listener = pairs[0].Value;
2356 
2357                     if (this.onItemDequeued == null)
2358                     {
2359                         this.onItemDequeued = new Action(OnItemDequeued);
2360                     }
2361 
2362                     listener.InputQueueAcceptor.EnqueueAndDispatch(exception, this.onItemDequeued, false);
2363                 }
2364             }
2365         }
2366 
StartAccepting(TInnerChannel channelToPeek)2367         void StartAccepting(TInnerChannel channelToPeek)
2368         {
2369             for (;;)
2370             {
2371                 IAsyncResult result;
2372                 bool acceptValid = this.BeginAcceptChannel(true, out result);
2373 
2374                 if (channelToPeek != null)
2375                 {
2376                     if (acceptValid && (result == null || result.CompletedSynchronously))
2377                     {
2378                         // need to spawn another thread to process this completion
2379                         // since we're going to process channelToPeek on this thread
2380                         ActionItem.Schedule(OnStartAccepting, result);
2381                     }
2382 
2383                     PeekChannel(channelToPeek);
2384                     return;
2385                 }
2386                 else
2387                 {
2388                     if (!acceptValid)
2389                     {
2390                         return; // we're done, listener is toast
2391                     }
2392 
2393                     if (result == null)
2394                     {
2395                         continue;
2396                     }
2397 
2398                     if (!result.CompletedSynchronously)
2399                     {
2400                         return;
2401                     }
2402 
2403                     if (!this.EndAcceptChannel(result, out channelToPeek))
2404                     {
2405                         return;
2406                     }
2407                 }
2408             }
2409         }
2410 
2411         class PeekAsyncResult : AsyncResult
2412         {
2413             TInnerChannel channel;
2414             SessionChannelDemuxer<TInnerChannel, TInnerItem> demuxer;
2415             TInnerItem item;
2416             static AsyncCallback onOpenComplete = Fx.ThunkCallback(new AsyncCallback(OnOpenCompleteStatic));
2417             static AsyncCallback onReceiveComplete = Fx.ThunkCallback(new AsyncCallback(OnReceiveCompleteStatic));
2418 
PeekAsyncResult(SessionChannelDemuxer<TInnerChannel, TInnerItem> demuxer, TInnerChannel channel, AsyncCallback callback, object state)2419             public PeekAsyncResult(SessionChannelDemuxer<TInnerChannel, TInnerItem> demuxer, TInnerChannel channel, AsyncCallback callback, object state)
2420                 : base(callback, state)
2421             {
2422                 this.demuxer = demuxer;
2423                 this.channel = channel;
2424                 IAsyncResult result = this.channel.BeginOpen(onOpenComplete, this);
2425                 if (!result.CompletedSynchronously)
2426                 {
2427                     return;
2428                 }
2429                 if (this.HandleOpenComplete(result))
2430                 {
2431                     this.Complete(true);
2432                 }
2433             }
2434 
End(IAsyncResult result, out TInnerChannel channel, out TInnerItem item)2435             public static void End(IAsyncResult result, out TInnerChannel channel, out TInnerItem item)
2436             {
2437                 PeekAsyncResult peekResult = AsyncResult.End<PeekAsyncResult>(result);
2438                 channel = peekResult.channel;
2439                 item = peekResult.item;
2440             }
2441 
HandleOpenComplete(IAsyncResult result)2442             bool HandleOpenComplete(IAsyncResult result)
2443             {
2444                 this.channel.EndOpen(result);
2445 
2446                 IAsyncResult receiveResult;
2447 
2448                 if (this.demuxer.peekTimeout == ChannelDemuxer.UseDefaultReceiveTimeout)
2449                 {
2450                     //use the default ReceiveTimeout for the channel
2451                     receiveResult = this.demuxer.BeginReceive(this.channel, onReceiveComplete, this);
2452                 }
2453                 else
2454                 {
2455                     receiveResult = this.demuxer.BeginReceive(this.channel, this.demuxer.peekTimeout, onReceiveComplete, this);
2456                 }
2457 
2458                 if (receiveResult.CompletedSynchronously)
2459                 {
2460                     this.HandleReceiveComplete(receiveResult);
2461                     return true;
2462                 }
2463 
2464                 return false;
2465             }
2466 
OnOpenCompleteStatic(IAsyncResult result)2467             static void OnOpenCompleteStatic(IAsyncResult result)
2468             {
2469                 if (result.CompletedSynchronously)
2470                     return;
2471 
2472                 PeekAsyncResult peekAsyncResult = (PeekAsyncResult)result.AsyncState;
2473 
2474                 bool completeSelf = false;
2475                 Exception exception = null;
2476 
2477                 try
2478                 {
2479                     completeSelf = peekAsyncResult.HandleOpenComplete(result);
2480                 }
2481                 catch (Exception e)
2482                 {
2483                     if (Fx.IsFatal(e))
2484                     {
2485                         throw;
2486                     }
2487                     exception = e;
2488                     completeSelf = true;
2489                 }
2490 
2491                 if (completeSelf)
2492                 {
2493                     peekAsyncResult.Complete(false, exception);
2494                 }
2495             }
2496 
HandleReceiveComplete(IAsyncResult result)2497             void HandleReceiveComplete(IAsyncResult result)
2498             {
2499                 this.item = this.demuxer.EndReceive(this.channel, result);
2500             }
2501 
OnReceiveCompleteStatic(IAsyncResult result)2502             static void OnReceiveCompleteStatic(IAsyncResult result)
2503             {
2504                 if (result.CompletedSynchronously)
2505                     return;
2506 
2507                 PeekAsyncResult peekAsyncResult = (PeekAsyncResult)result.AsyncState;
2508                 Exception exception = null;
2509 
2510                 try
2511                 {
2512                     peekAsyncResult.HandleReceiveComplete(result);
2513                 }
2514                 catch (Exception e)
2515                 {
2516                     if (Fx.IsFatal(e))
2517                     {
2518                         throw;
2519                     }
2520                     exception = e;
2521                 }
2522 
2523                 peekAsyncResult.Complete(false, exception);
2524             }
2525         }
2526 
2527         class OpenAsyncResult : AsyncResult
2528         {
2529             static FastAsyncCallback waitOverCallback = new FastAsyncCallback(WaitOverCallback);
2530             static AsyncCallback openListenerCallback = Fx.ThunkCallback(new AsyncCallback(OpenListenerCallback));
2531             SessionChannelDemuxer<TInnerChannel, TInnerItem> channelDemuxer;
2532             ChannelDemuxerFilter filter;
2533             IChannelListener listener;
2534             TimeoutHelper timeoutHelper;
2535             bool startAccepting;
2536 
OpenAsyncResult(SessionChannelDemuxer<TInnerChannel, TInnerItem> channelDemuxer, ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout, AsyncCallback callback, object state)2537             public OpenAsyncResult(SessionChannelDemuxer<TInnerChannel, TInnerItem> channelDemuxer, ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout, AsyncCallback callback, object state)
2538                 : base(callback, state)
2539             {
2540                 this.channelDemuxer = channelDemuxer;
2541                 this.filter = filter;
2542                 this.listener = listener;
2543                 this.timeoutHelper = new TimeoutHelper(timeout);
2544                 if (!this.channelDemuxer.openSemaphore.EnterAsync(this.timeoutHelper.RemainingTime(), waitOverCallback, this))
2545                 {
2546                     return;
2547                 }
2548 
2549                 bool waitOverSucceeded = false;
2550                 bool completeSelf = false;
2551                 try
2552                 {
2553                     completeSelf = this.OnWaitOver();
2554                     waitOverSucceeded = true;
2555                 }
2556                 finally
2557                 {
2558                     if (!waitOverSucceeded)
2559                     {
2560                         Cleanup();
2561                     }
2562                 }
2563                 if (completeSelf)
2564                 {
2565                     Cleanup();
2566                     Complete(true);
2567                 }
2568             }
2569 
WaitOverCallback(object state, Exception asyncException)2570             static void WaitOverCallback(object state, Exception asyncException)
2571             {
2572                 OpenAsyncResult self = (OpenAsyncResult)state;
2573                 bool completeSelf = false;
2574                 Exception completionException = asyncException;
2575                 if (completionException != null)
2576                 {
2577                     completeSelf = true;
2578                 }
2579                 else
2580                 {
2581                     try
2582                     {
2583                         completeSelf = self.OnWaitOver();
2584                     }
2585 #pragma warning suppress 56500 // covered by FxCOP
2586                     catch (Exception e)
2587                     {
2588                         if (Fx.IsFatal(e)) throw;
2589                         completeSelf = true;
2590                         completionException = e;
2591                     }
2592                 }
2593 
2594                 if (completeSelf)
2595                 {
2596                     self.Cleanup();
2597                     self.Complete(false, completionException);
2598                 }
2599             }
2600 
OnWaitOver()2601             bool OnWaitOver()
2602             {
2603                 this.startAccepting = this.channelDemuxer.ShouldStartAccepting(this.filter, this.listener);
2604                 if (!this.startAccepting)
2605                 {
2606                     this.channelDemuxer.ThrowPendingOpenExceptionIfAny();
2607                     return true;
2608                 }
2609                 else
2610                 {
2611                     return this.OnStartAccepting();
2612                 }
2613             }
2614 
OnEndInnerListenerOpen(IAsyncResult result)2615             void OnEndInnerListenerOpen(IAsyncResult result)
2616             {
2617                 this.channelDemuxer.innerListener.EndOpen(result);
2618                 this.channelDemuxer.StartAccepting(true);
2619                 lock (this.channelDemuxer.ThisLock)
2620                 {
2621                     if (this.channelDemuxer.abortOngoingOpen)
2622                     {
2623                         this.channelDemuxer.innerListener.Abort();
2624                     }
2625                 }
2626             }
2627 
OnStartAccepting()2628             bool OnStartAccepting()
2629             {
2630                 try
2631                 {
2632                     IAsyncResult result = this.channelDemuxer.innerListener.BeginOpen(timeoutHelper.RemainingTime(), openListenerCallback, this);
2633                     if (!result.CompletedSynchronously)
2634                     {
2635                         return false;
2636                     }
2637                     this.OnEndInnerListenerOpen(result);
2638                     return true;
2639                 }
2640 #pragma warning suppress 56500 // covered by FxCOP
2641                 catch (Exception e)
2642                 {
2643                     this.channelDemuxer.pendingExceptionOnOpen = e;
2644                     throw;
2645                 }
2646             }
2647 
OpenListenerCallback(IAsyncResult result)2648             static void OpenListenerCallback(IAsyncResult result)
2649             {
2650                 if (result.CompletedSynchronously)
2651                 {
2652                     return;
2653                 }
2654                 OpenAsyncResult self = (OpenAsyncResult)result.AsyncState;
2655                 Exception completionException = null;
2656                 try
2657                 {
2658                     self.OnEndInnerListenerOpen(result);
2659                 }
2660 #pragma warning suppress 56500 // covered by FxCOP
2661                 catch (Exception e)
2662                 {
2663                     if (Fx.IsFatal(e)) throw;
2664                     completionException = e;
2665                 }
2666                 if (completionException != null)
2667                 {
2668                     self.channelDemuxer.pendingExceptionOnOpen = completionException;
2669                 }
2670                 self.Cleanup();
2671                 self.Complete(false, completionException);
2672             }
2673 
Cleanup()2674             void Cleanup()
2675             {
2676                 this.channelDemuxer.openSemaphore.Exit();
2677             }
2678 
End(IAsyncResult result)2679             public static void End(IAsyncResult result)
2680             {
2681                 AsyncResult.End<OpenAsyncResult>(result);
2682             }
2683         }
2684     }
2685 
2686     class InputSessionChannelDemuxer : SessionChannelDemuxer<IInputSessionChannel, Message>
2687     {
InputSessionChannelDemuxer(BindingContext context, TimeSpan peekTimeout, int maxPendingSessions)2688         public InputSessionChannelDemuxer(BindingContext context, TimeSpan peekTimeout, int maxPendingSessions)
2689             : base(context, peekTimeout, maxPendingSessions)
2690         {
2691         }
2692 
AbortItem(Message message)2693         protected override void AbortItem(Message message)
2694         {
2695             AbortMessage(message);
2696         }
2697 
BeginReceive(IInputSessionChannel channel, AsyncCallback callback, object state)2698         protected override IAsyncResult BeginReceive(IInputSessionChannel channel, AsyncCallback callback, object state)
2699         {
2700             return channel.BeginReceive(callback, state);
2701         }
2702 
BeginReceive(IInputSessionChannel channel, TimeSpan timeout, AsyncCallback callback, object state)2703         protected override IAsyncResult BeginReceive(IInputSessionChannel channel, TimeSpan timeout, AsyncCallback callback, object state)
2704         {
2705             return channel.BeginReceive(timeout, callback, state);
2706         }
2707 
CreateChannel(ChannelManagerBase channelManager, IInputSessionChannel innerChannel, Message firstMessage)2708         protected override IInputSessionChannel CreateChannel(ChannelManagerBase channelManager, IInputSessionChannel innerChannel, Message firstMessage)
2709         {
2710             return new InputSessionChannelWrapper(channelManager, innerChannel, firstMessage);
2711         }
2712 
EndpointNotFound(IInputSessionChannel channel, Message message)2713         protected override void EndpointNotFound(IInputSessionChannel channel, Message message)
2714         {
2715             if (this.DemuxFailureHandler != null)
2716             {
2717                 this.DemuxFailureHandler.HandleDemuxFailure(message);
2718             }
2719             this.AbortItem(message);
2720             channel.Abort();
2721         }
2722 
EndReceive(IInputSessionChannel channel, IAsyncResult result)2723         protected override Message EndReceive(IInputSessionChannel channel, IAsyncResult result)
2724         {
2725             return channel.EndReceive(result);
2726         }
2727 
GetMessage(Message message)2728         protected override Message GetMessage(Message message)
2729         {
2730             return message;
2731         }
2732     }
2733 
2734     class InputSessionChannelWrapper : InputChannelWrapper, IInputSessionChannel
2735     {
InputSessionChannelWrapper(ChannelManagerBase channelManager, IInputSessionChannel innerChannel, Message firstMessage)2736         public InputSessionChannelWrapper(ChannelManagerBase channelManager, IInputSessionChannel innerChannel, Message firstMessage)
2737             : base(channelManager, innerChannel, firstMessage)
2738         {
2739         }
2740 
2741         new IInputSessionChannel InnerChannel
2742         {
2743             get { return (IInputSessionChannel)base.InnerChannel; }
2744         }
2745 
2746         public IInputSession Session
2747         {
2748             get { return this.InnerChannel.Session; }
2749         }
2750     }
2751 
2752     class DuplexSessionChannelDemuxer : SessionChannelDemuxer<IDuplexSessionChannel, Message>
2753     {
DuplexSessionChannelDemuxer(BindingContext context, TimeSpan peekTimeout, int maxPendingSessions)2754         public DuplexSessionChannelDemuxer(BindingContext context, TimeSpan peekTimeout, int maxPendingSessions)
2755             : base(context, peekTimeout, maxPendingSessions)
2756         {
2757         }
2758 
AbortItem(Message message)2759         protected override void AbortItem(Message message)
2760         {
2761             AbortMessage(message);
2762         }
2763 
BeginReceive(IDuplexSessionChannel channel, AsyncCallback callback, object state)2764         protected override IAsyncResult BeginReceive(IDuplexSessionChannel channel, AsyncCallback callback, object state)
2765         {
2766             return channel.BeginReceive(callback, state);
2767         }
2768 
BeginReceive(IDuplexSessionChannel channel, TimeSpan timeout, AsyncCallback callback, object state)2769         protected override IAsyncResult BeginReceive(IDuplexSessionChannel channel, TimeSpan timeout, AsyncCallback callback, object state)
2770         {
2771             return channel.BeginReceive(timeout, callback, state);
2772         }
2773 
CreateChannel(ChannelManagerBase channelManager, IDuplexSessionChannel innerChannel, Message firstMessage)2774         protected override IDuplexSessionChannel CreateChannel(ChannelManagerBase channelManager, IDuplexSessionChannel innerChannel, Message firstMessage)
2775         {
2776             return new DuplexSessionChannelWrapper(channelManager, innerChannel, firstMessage);
2777         }
2778 
EndpointNotFoundCallback(IAsyncResult result)2779         void EndpointNotFoundCallback(IAsyncResult result)
2780         {
2781             if (result.CompletedSynchronously)
2782             {
2783                 return;
2784             }
2785             ChannelAndMessageAsyncState channelAndMessage = (ChannelAndMessageAsyncState)result.AsyncState;
2786             bool doAbort = true;
2787             try
2788             {
2789                 DuplexSessionDemuxFailureAsyncResult.End(result);
2790                 doAbort = false;
2791             }
2792             catch (TimeoutException e)
2793             {
2794                 if (TD.SendTimeoutIsEnabled())
2795                 {
2796                     TD.SendTimeout(e.Message);
2797                 }
2798                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
2799             }
2800             catch (CommunicationException e)
2801             {
2802                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
2803             }
2804             catch (ObjectDisposedException e)
2805             {
2806                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
2807             }
2808             catch (Exception e)
2809             {
2810                 if (Fx.IsFatal(e)) throw;
2811                 this.HandleUnknownException(e);
2812             }
2813             finally
2814             {
2815                 if (doAbort)
2816                 {
2817                     this.AbortItem(channelAndMessage.message);
2818                     channelAndMessage.channel.Abort();
2819                 }
2820             }
2821         }
2822 
EndpointNotFound(IDuplexSessionChannel channel, Message message)2823         protected override void EndpointNotFound(IDuplexSessionChannel channel, Message message)
2824         {
2825             bool doAbort = true;
2826             try
2827             {
2828                 if (this.DemuxFailureHandler != null)
2829                 {
2830                     try
2831                     {
2832                         DuplexSessionDemuxFailureAsyncResult result = new DuplexSessionDemuxFailureAsyncResult(this.DemuxFailureHandler, channel, message, Fx.ThunkCallback(new AsyncCallback(this.EndpointNotFoundCallback)), new ChannelAndMessageAsyncState(channel, message));
2833                         result.Start();
2834                         if (!result.CompletedSynchronously)
2835                         {
2836                             doAbort = false;
2837                             return;
2838                         }
2839                         DuplexSessionDemuxFailureAsyncResult.End(result);
2840                         doAbort = false;
2841                     }
2842                     catch (CommunicationException e)
2843                     {
2844                         DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
2845                     }
2846                     catch (TimeoutException e)
2847                     {
2848                         if (TD.SendTimeoutIsEnabled())
2849                         {
2850                             TD.SendTimeout(e.Message);
2851                         }
2852                         DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
2853                     }
2854                     catch (ObjectDisposedException e)
2855                     {
2856                         DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
2857                     }
2858                     catch (Exception e)
2859                     {
2860                         if (Fx.IsFatal(e)) throw;
2861                         this.HandleUnknownException(e);
2862                     }
2863                 }
2864             }
2865             finally
2866             {
2867                 if (doAbort)
2868                 {
2869                     this.AbortItem(message);
2870                     channel.Abort();
2871                 }
2872             }
2873         }
2874 
EndReceive(IDuplexSessionChannel channel, IAsyncResult result)2875         protected override Message EndReceive(IDuplexSessionChannel channel, IAsyncResult result)
2876         {
2877             return channel.EndReceive(result);
2878         }
2879 
GetMessage(Message message)2880         protected override Message GetMessage(Message message)
2881         {
2882             return message;
2883         }
2884 
2885         struct ChannelAndMessageAsyncState
2886         {
2887             public IChannel channel;
2888             public Message message;
2889 
ChannelAndMessageAsyncStateSystem.ServiceModel.Channels.DuplexSessionChannelDemuxer.ChannelAndMessageAsyncState2890             public ChannelAndMessageAsyncState(IChannel channel, Message message)
2891             {
2892                 this.channel = channel;
2893                 this.message = message;
2894             }
2895         }
2896     }
2897 
2898     class DuplexSessionChannelWrapper : InputChannelWrapper, IDuplexSessionChannel
2899     {
DuplexSessionChannelWrapper(ChannelManagerBase channelManager, IDuplexSessionChannel innerChannel, Message firstMessage)2900         public DuplexSessionChannelWrapper(ChannelManagerBase channelManager, IDuplexSessionChannel innerChannel, Message firstMessage)
2901             : base(channelManager, innerChannel, firstMessage)
2902         {
2903         }
2904 
2905         new IDuplexSessionChannel InnerChannel
2906         {
2907             get { return (IDuplexSessionChannel)base.InnerChannel; }
2908         }
2909 
2910         public IDuplexSession Session
2911         {
2912             get { return InnerChannel.Session; }
2913         }
2914 
2915         public EndpointAddress RemoteAddress
2916         {
2917             get { return InnerChannel.RemoteAddress; }
2918         }
2919 
2920         public Uri Via
2921         {
2922             get { return InnerChannel.Via; }
2923         }
2924 
Send(Message message)2925         public void Send(Message message)
2926         {
2927             this.InnerChannel.Send(message);
2928         }
2929 
Send(Message message, TimeSpan timeout)2930         public void Send(Message message, TimeSpan timeout)
2931         {
2932             this.InnerChannel.Send(message, timeout);
2933         }
2934 
BeginSend(Message message, AsyncCallback callback, object state)2935         public IAsyncResult BeginSend(Message message, AsyncCallback callback, object state)
2936         {
2937             return this.InnerChannel.BeginSend(message, callback, state);
2938         }
2939 
BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)2940         public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
2941         {
2942             return this.InnerChannel.BeginSend(message, timeout, callback, state);
2943         }
2944 
EndSend(IAsyncResult result)2945         public void EndSend(IAsyncResult result)
2946         {
2947             this.InnerChannel.EndSend(result);
2948         }
2949     }
2950 
2951     class ReplySessionChannelDemuxer : SessionChannelDemuxer<IReplySessionChannel, RequestContext>
2952     {
ReplySessionChannelDemuxer(BindingContext context, TimeSpan peekTimeout, int maxPendingSessions)2953         public ReplySessionChannelDemuxer(BindingContext context, TimeSpan peekTimeout, int maxPendingSessions)
2954             : base(context, peekTimeout, maxPendingSessions)
2955         {
2956         }
2957 
AbortItem(RequestContext request)2958         protected override void AbortItem(RequestContext request)
2959         {
2960             AbortMessage(request);
2961             request.Abort();
2962         }
2963 
BeginReceive(IReplySessionChannel channel, AsyncCallback callback, object state)2964         protected override IAsyncResult BeginReceive(IReplySessionChannel channel, AsyncCallback callback, object state)
2965         {
2966             return channel.BeginReceiveRequest(callback, state);
2967         }
2968 
BeginReceive(IReplySessionChannel channel, TimeSpan timeout, AsyncCallback callback, object state)2969         protected override IAsyncResult BeginReceive(IReplySessionChannel channel, TimeSpan timeout, AsyncCallback callback, object state)
2970         {
2971             return channel.BeginReceiveRequest(timeout, callback, state);
2972         }
2973 
CreateChannel(ChannelManagerBase channelManager, IReplySessionChannel innerChannel, RequestContext firstRequest)2974         protected override IReplySessionChannel CreateChannel(ChannelManagerBase channelManager, IReplySessionChannel innerChannel, RequestContext firstRequest)
2975         {
2976             return new ReplySessionChannelWrapper(channelManager, innerChannel, firstRequest);
2977         }
2978 
EndpointNotFoundCallback(IAsyncResult result)2979         void EndpointNotFoundCallback(IAsyncResult result)
2980         {
2981             if (result.CompletedSynchronously)
2982             {
2983                 return;
2984             }
2985             ChannelAndRequestAsyncState channelAndRequest = (ChannelAndRequestAsyncState)result.AsyncState;
2986             bool doAbort = true;
2987             try
2988             {
2989                 ReplySessionDemuxFailureAsyncResult.End(result);
2990                 doAbort = false;
2991             }
2992             catch (TimeoutException e)
2993             {
2994                 if (TD.SendTimeoutIsEnabled())
2995                 {
2996                     TD.SendTimeout(e.Message);
2997                 }
2998                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
2999             }
3000             catch (CommunicationException e)
3001             {
3002                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
3003             }
3004             catch (ObjectDisposedException e)
3005             {
3006                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
3007             }
3008             catch (Exception e)
3009             {
3010                 if (Fx.IsFatal(e)) throw;
3011                 this.HandleUnknownException(e);
3012             }
3013             finally
3014             {
3015                 if (doAbort)
3016                 {
3017                     this.AbortItem(channelAndRequest.request);
3018                     channelAndRequest.channel.Abort();
3019                 }
3020             }
3021         }
3022 
EndpointNotFound(IReplySessionChannel channel, RequestContext request)3023         protected override void EndpointNotFound(IReplySessionChannel channel, RequestContext request)
3024         {
3025             bool doAbort = true;
3026             try
3027             {
3028                 if (this.DemuxFailureHandler != null)
3029                 {
3030                     try
3031                     {
3032                         ReplySessionDemuxFailureAsyncResult result = new ReplySessionDemuxFailureAsyncResult(this.DemuxFailureHandler, request, channel, Fx.ThunkCallback(new AsyncCallback(this.EndpointNotFoundCallback)), new ChannelAndRequestAsyncState(channel, request));
3033                         result.Start();
3034                         if (!result.CompletedSynchronously)
3035                         {
3036                             doAbort = false;
3037                             return;
3038                         }
3039                         ReplySessionDemuxFailureAsyncResult.End(result);
3040                         doAbort = false;
3041                     }
3042                     catch (CommunicationException e)
3043                     {
3044                         DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
3045                     }
3046                     catch (TimeoutException e)
3047                     {
3048                         if (TD.SendTimeoutIsEnabled())
3049                         {
3050                             TD.SendTimeout(e.Message);
3051                         }
3052                         DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
3053                     }
3054                     catch (ObjectDisposedException e)
3055                     {
3056                         DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
3057                     }
3058                     catch (Exception e)
3059                     {
3060                         if (Fx.IsFatal(e)) throw;
3061                         this.HandleUnknownException(e);
3062                     }
3063                 }
3064             }
3065             finally
3066             {
3067                 if (doAbort)
3068                 {
3069                     this.AbortItem(request);
3070                     channel.Abort();
3071                 }
3072             }
3073         }
3074 
EndReceive(IReplySessionChannel channel, IAsyncResult result)3075         protected override RequestContext EndReceive(IReplySessionChannel channel, IAsyncResult result)
3076         {
3077             return channel.EndReceiveRequest(result);
3078         }
3079 
GetMessage(RequestContext request)3080         protected override Message GetMessage(RequestContext request)
3081         {
3082             return request.RequestMessage;
3083         }
3084 
3085         struct ChannelAndRequestAsyncState
3086         {
3087             public IChannel channel;
3088             public RequestContext request;
3089 
ChannelAndRequestAsyncStateSystem.ServiceModel.Channels.ReplySessionChannelDemuxer.ChannelAndRequestAsyncState3090             public ChannelAndRequestAsyncState(IChannel channel, RequestContext request)
3091             {
3092                 this.channel = channel;
3093                 this.request = request;
3094             }
3095         }
3096     }
3097 
3098     class ReplySessionChannelWrapper : ReplyChannelWrapper, IReplySessionChannel
3099     {
ReplySessionChannelWrapper(ChannelManagerBase channelManager, IReplySessionChannel innerChannel, RequestContext firstRequest)3100         public ReplySessionChannelWrapper(ChannelManagerBase channelManager, IReplySessionChannel innerChannel, RequestContext firstRequest)
3101             : base(channelManager, innerChannel, firstRequest)
3102         {
3103         }
3104 
3105         new IReplySessionChannel InnerChannel
3106         {
3107             get { return (IReplySessionChannel)base.InnerChannel; }
3108         }
3109 
3110         public IInputSession Session
3111         {
3112             get { return this.InnerChannel.Session; }
3113         }
3114     }
3115 
3116     abstract class ChannelWrapper<TChannel, TItem> : LayeredChannel<TChannel>
3117         where TChannel : class, IChannel
3118         where TItem : class, IDisposable
3119     {
3120         TItem firstItem;
3121 
ChannelWrapper(ChannelManagerBase channelManager, TChannel innerChannel, TItem firstItem)3122         public ChannelWrapper(ChannelManagerBase channelManager, TChannel innerChannel, TItem firstItem)
3123             : base(channelManager, innerChannel)
3124         {
3125             this.firstItem = firstItem;
3126         }
3127 
CloseFirstItem(TimeSpan timeout)3128         protected abstract void CloseFirstItem(TimeSpan timeout);
3129 
GetFirstItem()3130         protected TItem GetFirstItem()
3131         {
3132             return Interlocked.Exchange<TItem>(ref this.firstItem, null);
3133         }
3134 
HaveFirstItem()3135         protected bool HaveFirstItem()
3136         {
3137             return (this.firstItem != null);
3138         }
3139 
OnAbort()3140         protected override void OnAbort()
3141         {
3142             base.OnAbort();
3143             this.CloseFirstItem(TimeSpan.Zero);
3144         }
3145 
OnClose(TimeSpan timeout)3146         protected override void OnClose(TimeSpan timeout)
3147         {
3148             TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
3149             this.CloseFirstItem(timeoutHelper.RemainingTime());
3150             base.OnClose(timeoutHelper.RemainingTime());
3151         }
3152 
OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)3153         protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
3154         {
3155             TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
3156             this.CloseFirstItem(timeoutHelper.RemainingTime());
3157             return base.OnBeginClose(timeoutHelper.RemainingTime(), callback, state);
3158         }
3159 
OnEndClose(IAsyncResult result)3160         protected override void OnEndClose(IAsyncResult result)
3161         {
3162             base.OnEndClose(result);
3163         }
3164 
3165         protected class ReceiveAsyncResult : AsyncResult
3166         {
3167             TItem item;
3168 
ReceiveAsyncResult(TItem item, AsyncCallback callback, object state)3169             public ReceiveAsyncResult(TItem item, AsyncCallback callback, object state)
3170                 : base(callback, state)
3171             {
3172                 this.item = item;
3173                 this.Complete(true);
3174             }
3175 
End(IAsyncResult result)3176             public static TItem End(IAsyncResult result)
3177             {
3178                 ReceiveAsyncResult receiveResult = AsyncResult.End<ReceiveAsyncResult>(result);
3179                 return receiveResult.item;
3180             }
3181         }
3182 
3183         protected class WaitAsyncResult : AsyncResult
3184         {
WaitAsyncResult(AsyncCallback callback, object state)3185             public WaitAsyncResult(AsyncCallback callback, object state)
3186                 : base(callback, state)
3187             {
3188                 this.Complete(true);
3189             }
3190 
End(IAsyncResult result)3191             public static bool End(IAsyncResult result)
3192             {
3193                 WaitAsyncResult waitResult = AsyncResult.End<WaitAsyncResult>(result);
3194                 return true;
3195             }
3196         }
3197     }
3198 
3199     class InputChannelWrapper : ChannelWrapper<IInputChannel, Message>, IInputChannel
3200     {
InputChannelWrapper(ChannelManagerBase channelManager, IInputChannel innerChannel, Message firstMessage)3201         public InputChannelWrapper(ChannelManagerBase channelManager, IInputChannel innerChannel, Message firstMessage)
3202             : base(channelManager, innerChannel, firstMessage)
3203         {
3204         }
3205 
3206         public EndpointAddress LocalAddress
3207         {
3208             get { return this.InnerChannel.LocalAddress; }
3209         }
3210 
OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)3211         protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
3212         {
3213             return new CompletedAsyncResult(callback, state);
3214         }
3215 
OnEndOpen(IAsyncResult result)3216         protected override void OnEndOpen(IAsyncResult result)
3217         {
3218             CompletedAsyncResult.End(result);
3219         }
3220 
OnOpen(TimeSpan timeout)3221         protected override void OnOpen(TimeSpan timeout)
3222         {
3223         }
3224 
CloseFirstItem(TimeSpan timeout)3225         protected override void CloseFirstItem(TimeSpan timeout)
3226         {
3227             Message message = this.GetFirstItem();
3228             if (message != null)
3229             {
3230                 TypedChannelDemuxer.AbortMessage(message);
3231             }
3232         }
3233 
Receive()3234         public Message Receive()
3235         {
3236             Message message = this.GetFirstItem();
3237             if (message != null)
3238                 return message;
3239             return this.InnerChannel.Receive();
3240         }
3241 
Receive(TimeSpan timeout)3242         public Message Receive(TimeSpan timeout)
3243         {
3244             Message message = this.GetFirstItem();
3245             if (message != null)
3246                 return message;
3247             return this.InnerChannel.Receive(timeout);
3248         }
3249 
BeginReceive(AsyncCallback callback, object state)3250         public IAsyncResult BeginReceive(AsyncCallback callback, object state)
3251         {
3252             Message message = this.GetFirstItem();
3253             if (message != null)
3254                 return new ReceiveAsyncResult(message, callback, state);
3255             return this.InnerChannel.BeginReceive(callback, state);
3256         }
3257 
BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)3258         public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
3259         {
3260             Message message = this.GetFirstItem();
3261             if (message != null)
3262                 return new ReceiveAsyncResult(message, callback, state);
3263             return this.InnerChannel.BeginReceive(timeout, callback, state);
3264         }
3265 
EndReceive(IAsyncResult result)3266         public Message EndReceive(IAsyncResult result)
3267         {
3268             if (result is ReceiveAsyncResult)
3269                 return ReceiveAsyncResult.End(result);
3270             return this.InnerChannel.EndReceive(result);
3271         }
3272 
TryReceive(TimeSpan timeout, out Message message)3273         public bool TryReceive(TimeSpan timeout, out Message message)
3274         {
3275             message = this.GetFirstItem();
3276             if (message != null)
3277                 return true;
3278             return this.InnerChannel.TryReceive(timeout, out message);
3279         }
3280 
BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)3281         public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
3282         {
3283             Message message = this.GetFirstItem();
3284             if (message != null)
3285                 return new ReceiveAsyncResult(message, callback, state);
3286             return this.InnerChannel.BeginTryReceive(timeout, callback, state);
3287         }
3288 
EndTryReceive(IAsyncResult result, out Message message)3289         public bool EndTryReceive(IAsyncResult result, out Message message)
3290         {
3291             if (result is ReceiveAsyncResult)
3292             {
3293                 message = ReceiveAsyncResult.End(result);
3294                 return true;
3295             }
3296             return this.InnerChannel.EndTryReceive(result, out message);
3297         }
3298 
WaitForMessage(TimeSpan timeout)3299         public bool WaitForMessage(TimeSpan timeout)
3300         {
3301             if (this.HaveFirstItem())
3302                 return true;
3303             return this.InnerChannel.WaitForMessage(timeout);
3304         }
3305 
BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)3306         public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
3307         {
3308             if (this.HaveFirstItem())
3309                 return new WaitAsyncResult(callback, state);
3310             return this.InnerChannel.BeginWaitForMessage(timeout, callback, state);
3311         }
3312 
EndWaitForMessage(IAsyncResult result)3313         public bool EndWaitForMessage(IAsyncResult result)
3314         {
3315             if (result is WaitAsyncResult)
3316                 return WaitAsyncResult.End(result);
3317             return this.InnerChannel.EndWaitForMessage(result);
3318         }
3319     }
3320 
3321     class ReplyChannelWrapper : ChannelWrapper<IReplyChannel, RequestContext>, IReplyChannel
3322     {
ReplyChannelWrapper(ChannelManagerBase channelManager, IReplyChannel innerChannel, RequestContext firstRequest)3323         public ReplyChannelWrapper(ChannelManagerBase channelManager, IReplyChannel innerChannel, RequestContext firstRequest)
3324             : base(channelManager, innerChannel, firstRequest)
3325         {
3326         }
3327 
3328         public EndpointAddress LocalAddress
3329         {
3330             get { return this.InnerChannel.LocalAddress; }
3331         }
3332 
OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)3333         protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
3334         {
3335             return new CompletedAsyncResult(callback, state);
3336         }
3337 
OnEndOpen(IAsyncResult result)3338         protected override void OnEndOpen(IAsyncResult result)
3339         {
3340             CompletedAsyncResult.End(result);
3341         }
3342 
OnOpen(TimeSpan timeout)3343         protected override void OnOpen(TimeSpan timeout)
3344         {
3345         }
3346 
CloseFirstItem(TimeSpan timeout)3347         protected override void CloseFirstItem(TimeSpan timeout)
3348         {
3349             RequestContext request = this.GetFirstItem();
3350             if (request != null)
3351             {
3352                 try
3353                 {
3354                     request.RequestMessage.Close();
3355                     request.Close(timeout);
3356                 }
3357                 catch (CommunicationException e)
3358                 {
3359                     DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
3360                 }
3361                 catch (TimeoutException e)
3362                 {
3363                     if (TD.CloseTimeoutIsEnabled())
3364                     {
3365                         TD.CloseTimeout(e.Message);
3366                     }
3367                     DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
3368                 }
3369             }
3370         }
3371 
ReceiveRequest()3372         public RequestContext ReceiveRequest()
3373         {
3374             RequestContext request = this.GetFirstItem();
3375             if (request != null)
3376                 return request;
3377             return this.InnerChannel.ReceiveRequest();
3378         }
3379 
ReceiveRequest(TimeSpan timeout)3380         public RequestContext ReceiveRequest(TimeSpan timeout)
3381         {
3382             RequestContext request = this.GetFirstItem();
3383             if (request != null)
3384                 return request;
3385             return this.InnerChannel.ReceiveRequest(timeout);
3386         }
3387 
BeginReceiveRequest(AsyncCallback callback, object state)3388         public IAsyncResult BeginReceiveRequest(AsyncCallback callback, object state)
3389         {
3390             RequestContext request = this.GetFirstItem();
3391             if (request != null)
3392                 return new ReceiveAsyncResult(request, callback, state);
3393             return this.InnerChannel.BeginReceiveRequest(callback, state);
3394         }
3395 
BeginReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)3396         public IAsyncResult BeginReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)
3397         {
3398             RequestContext request = this.GetFirstItem();
3399             if (request != null)
3400                 return new ReceiveAsyncResult(request, callback, state);
3401             return this.InnerChannel.BeginReceiveRequest(timeout, callback, state);
3402         }
3403 
EndReceiveRequest(IAsyncResult result)3404         public RequestContext EndReceiveRequest(IAsyncResult result)
3405         {
3406             if (result is ReceiveAsyncResult)
3407                 return ReceiveAsyncResult.End(result);
3408             return this.InnerChannel.EndReceiveRequest(result);
3409         }
3410 
TryReceiveRequest(TimeSpan timeout, out RequestContext request)3411         public bool TryReceiveRequest(TimeSpan timeout, out RequestContext request)
3412         {
3413             request = this.GetFirstItem();
3414             if (request != null)
3415                 return true;
3416             return this.InnerChannel.TryReceiveRequest(timeout, out request);
3417         }
3418 
BeginTryReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)3419         public IAsyncResult BeginTryReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)
3420         {
3421             RequestContext request = this.GetFirstItem();
3422             if (request != null)
3423                 return new ReceiveAsyncResult(request, callback, state);
3424             return this.InnerChannel.BeginTryReceiveRequest(timeout, callback, state);
3425         }
3426 
EndTryReceiveRequest(IAsyncResult result, out RequestContext request)3427         public bool EndTryReceiveRequest(IAsyncResult result, out RequestContext request)
3428         {
3429             if (result is ReceiveAsyncResult)
3430             {
3431                 request = ReceiveAsyncResult.End(result);
3432                 return true;
3433             }
3434             return this.InnerChannel.EndTryReceiveRequest(result, out request);
3435         }
3436 
WaitForRequest(TimeSpan timeout)3437         public bool WaitForRequest(TimeSpan timeout)
3438         {
3439             if (this.HaveFirstItem())
3440                 return true;
3441             return this.InnerChannel.WaitForRequest(timeout);
3442         }
3443 
BeginWaitForRequest(TimeSpan timeout, AsyncCallback callback, object state)3444         public IAsyncResult BeginWaitForRequest(TimeSpan timeout, AsyncCallback callback, object state)
3445         {
3446             if (this.HaveFirstItem())
3447                 return new WaitAsyncResult(callback, state);
3448             return this.InnerChannel.BeginWaitForRequest(timeout, callback, state);
3449         }
3450 
EndWaitForRequest(IAsyncResult result)3451         public bool EndWaitForRequest(IAsyncResult result)
3452         {
3453             if (result is WaitAsyncResult)
3454                 return WaitAsyncResult.End(result);
3455             return this.InnerChannel.EndWaitForRequest(result);
3456         }
3457     }
3458 
3459     class InputQueueChannelListener<TChannel> : DelegatingChannelListener<TChannel>
3460         where TChannel : class, IChannel
3461     {
3462         ChannelDemuxerFilter filter;
3463         IChannelDemuxer channelDemuxer;
3464 
InputQueueChannelListener(ChannelDemuxerFilter filter, IChannelDemuxer channelDemuxer)3465         public InputQueueChannelListener(ChannelDemuxerFilter filter, IChannelDemuxer channelDemuxer)
3466             : base(true)
3467         {
3468             this.filter = filter;
3469             this.channelDemuxer = channelDemuxer;
3470             this.Acceptor = new InputQueueChannelAcceptor<TChannel>(this);
3471         }
3472 
3473         public ChannelDemuxerFilter Filter
3474         {
3475             get { return this.filter; }
3476         }
3477 
3478         public InputQueueChannelAcceptor<TChannel> InputQueueAcceptor
3479         {
3480             get { return (InputQueueChannelAcceptor<TChannel>)base.Acceptor; }
3481         }
3482 
OnOpen(TimeSpan timeout)3483         protected override void OnOpen(TimeSpan timeout)
3484         {
3485             TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
3486             this.channelDemuxer.OnOuterListenerOpen(this.filter, this, timeoutHelper.RemainingTime());
3487             base.OnOpen(timeoutHelper.RemainingTime());
3488         }
3489 
OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)3490         protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
3491         {
3492             return new ChainedAsyncResult(timeout, callback, state, this.OnBeginOuterListenerOpen, this.OnEndOuterListenerOpen, base.OnBeginOpen, base.OnEndOpen);
3493         }
3494 
OnEndOpen(IAsyncResult result)3495         protected override void OnEndOpen(IAsyncResult result)
3496         {
3497             ChainedAsyncResult.End(result);
3498         }
3499 
OnBeginOuterListenerOpen(TimeSpan timeout, AsyncCallback callback, object state)3500         IAsyncResult OnBeginOuterListenerOpen(TimeSpan timeout, AsyncCallback callback, object state)
3501         {
3502             return this.channelDemuxer.OnBeginOuterListenerOpen(this.filter, this, timeout, callback, state);
3503         }
3504 
OnEndOuterListenerOpen(IAsyncResult result)3505         void OnEndOuterListenerOpen(IAsyncResult result)
3506         {
3507             this.channelDemuxer.OnEndOuterListenerOpen(result);
3508         }
3509 
OnAbort()3510         protected override void OnAbort()
3511         {
3512             this.channelDemuxer.OnOuterListenerAbort(this.filter);
3513             base.OnAbort();
3514         }
3515 
OnClose(TimeSpan timeout)3516         protected override void OnClose(TimeSpan timeout)
3517         {
3518             TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
3519             this.channelDemuxer.OnOuterListenerClose(this.filter, timeoutHelper.RemainingTime());
3520             base.OnClose(timeoutHelper.RemainingTime());
3521         }
3522 
OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)3523         protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
3524         {
3525             return new ChainedAsyncResult(timeout, callback, state, this.OnBeginOuterListenerClose, this.OnEndOuterListenerClose, base.OnBeginClose, base.OnEndClose);
3526         }
3527 
OnEndClose(IAsyncResult result)3528         protected override void OnEndClose(IAsyncResult result)
3529         {
3530             ChainedAsyncResult.End(result);
3531         }
3532 
OnBeginOuterListenerClose(TimeSpan timeout, AsyncCallback callback, object state)3533         IAsyncResult OnBeginOuterListenerClose(TimeSpan timeout, AsyncCallback callback, object state)
3534         {
3535             return this.channelDemuxer.OnBeginOuterListenerClose(this.filter, timeout, callback, state);
3536         }
3537 
OnEndOuterListenerClose(IAsyncResult result)3538         void OnEndOuterListenerClose(IAsyncResult result)
3539         {
3540             this.channelDemuxer.OnEndOuterListenerClose(result);
3541         }
3542     }
3543 
3544     //
3545     // Binding element
3546     //
3547 
3548     class ChannelDemuxerBindingElement : BindingElement
3549     {
3550         ChannelDemuxer demuxer;
3551         CachedBindingContextState cachedContextState;
3552         bool cacheContextState;
3553 
ChannelDemuxerBindingElement(bool cacheContextState)3554         public ChannelDemuxerBindingElement(bool cacheContextState)
3555         {
3556             this.cacheContextState = cacheContextState;
3557             if (cacheContextState)
3558             {
3559                 this.cachedContextState = new CachedBindingContextState();
3560             }
3561             this.demuxer = new ChannelDemuxer();
3562         }
3563 
ChannelDemuxerBindingElement(ChannelDemuxerBindingElement element)3564         public ChannelDemuxerBindingElement(ChannelDemuxerBindingElement element)
3565         {
3566             this.demuxer = element.demuxer;
3567             this.cacheContextState = element.cacheContextState;
3568             this.cachedContextState = element.cachedContextState;
3569         }
3570 
3571         public TimeSpan PeekTimeout
3572         {
3573             get
3574             {
3575                 return this.demuxer.PeekTimeout;
3576             }
3577             set
3578             {
3579                 if (value < TimeSpan.Zero && value != ChannelDemuxer.UseDefaultReceiveTimeout)
3580                 {
3581                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException("value"));
3582                 }
3583 
3584                 this.demuxer.PeekTimeout = value;
3585             }
3586         }
3587 
3588         public int MaxPendingSessions
3589         {
3590             get
3591             {
3592                 return this.demuxer.MaxPendingSessions;
3593             }
3594             set
3595             {
3596                 if (value < 1)
3597                 {
3598                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException(SR.GetString(SR.ValueMustBeGreaterThanZero)));
3599                 }
3600 
3601                 this.demuxer.MaxPendingSessions = value;
3602             }
3603         }
3604 
SubstituteCachedBindingContextParametersIfNeeded(BindingContext context)3605         void SubstituteCachedBindingContextParametersIfNeeded(BindingContext context)
3606         {
3607             if (!this.cacheContextState)
3608             {
3609                 return;
3610             }
3611             if (!this.cachedContextState.IsStateCached)
3612             {
3613                 foreach (object parameter in context.BindingParameters)
3614                 {
3615                     this.cachedContextState.CachedBindingParameters.Add(parameter);
3616                 }
3617                 this.cachedContextState.IsStateCached = true;
3618             }
3619             else
3620             {
3621                 context.BindingParameters.Clear();
3622                 foreach (object parameter in this.cachedContextState.CachedBindingParameters)
3623                 {
3624                     context.BindingParameters.Add(parameter);
3625                 }
3626             }
3627         }
3628 
BuildChannelFactory(BindingContext context)3629         public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
3630         {
3631             if (context == null)
3632                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("context");
3633 
3634             SubstituteCachedBindingContextParametersIfNeeded(context);
3635             return context.BuildInnerChannelFactory<TChannel>();
3636         }
3637 
3638 
BuildChannelListener(BindingContext context)3639         public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
3640         {
3641             if (context == null)
3642                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("context");
3643             ChannelDemuxerFilter filter = context.BindingParameters.Remove<ChannelDemuxerFilter>();
3644             SubstituteCachedBindingContextParametersIfNeeded(context);
3645             if (filter == null)
3646                 return demuxer.BuildChannelListener<TChannel>(context);
3647             else
3648                 return demuxer.BuildChannelListener<TChannel>(context, filter);
3649         }
3650 
CanBuildChannelFactory(BindingContext context)3651         public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
3652         {
3653             if (context == null)
3654                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("context");
3655 
3656             return context.CanBuildInnerChannelFactory<TChannel>();
3657         }
3658 
CanBuildChannelListener(BindingContext context)3659         public override bool CanBuildChannelListener<TChannel>(BindingContext context)
3660         {
3661             if (context == null)
3662                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("context");
3663 
3664             return context.CanBuildInnerChannelListener<TChannel>();
3665         }
3666 
Clone()3667         public override BindingElement Clone()
3668         {
3669             return new ChannelDemuxerBindingElement(this);
3670         }
3671 
GetProperty(BindingContext context)3672         public override T GetProperty<T>(BindingContext context)
3673         {
3674             // augment the context with cached binding parameters
3675             if (this.cacheContextState && this.cachedContextState.IsStateCached)
3676             {
3677                 for (int i = 0; i < this.cachedContextState.CachedBindingParameters.Count; ++i)
3678                 {
3679                     if (!context.BindingParameters.Contains(this.cachedContextState.CachedBindingParameters[i].GetType()))
3680                     {
3681                         context.BindingParameters.Add(this.cachedContextState.CachedBindingParameters[i]);
3682                     }
3683                 }
3684             }
3685             return context.GetInnerProperty<T>();
3686         }
3687 
3688         class CachedBindingContextState
3689         {
3690             public bool IsStateCached;
3691             public BindingParameterCollection CachedBindingParameters;
3692 
CachedBindingContextState()3693             public CachedBindingContextState()
3694             {
3695                 CachedBindingParameters = new BindingParameterCollection();
3696             }
3697         }
3698     }
3699 
3700     //
3701     // Demuxer filter
3702     //
3703 
3704     class ChannelDemuxerFilter
3705     {
3706         MessageFilter filter;
3707         int priority;
3708 
ChannelDemuxerFilter(MessageFilter filter, int priority)3709         public ChannelDemuxerFilter(MessageFilter filter, int priority)
3710         {
3711             this.filter = filter;
3712             this.priority = priority;
3713         }
3714 
3715         public MessageFilter Filter
3716         {
3717             get { return this.filter; }
3718         }
3719 
3720         public int Priority
3721         {
3722             get { return this.priority; }
3723         }
3724     }
3725 
3726     class ReplyChannelDemuxFailureAsyncResult : AsyncResult
3727     {
3728         static AsyncCallback demuxFailureHandlerCallback = Fx.ThunkCallback(new AsyncCallback(DemuxFailureHandlerCallback));
3729         IChannelDemuxFailureHandler demuxFailureHandler;
3730         RequestContext requestContext;
3731 
ReplyChannelDemuxFailureAsyncResult(IChannelDemuxFailureHandler demuxFailureHandler, RequestContext requestContext, AsyncCallback callback, object state)3732         public ReplyChannelDemuxFailureAsyncResult(IChannelDemuxFailureHandler demuxFailureHandler, RequestContext requestContext, AsyncCallback callback, object state)
3733             : base(callback, state)
3734         {
3735             if (demuxFailureHandler == null)
3736             {
3737                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("demuxFailureHandler");
3738             }
3739             if (requestContext == null)
3740             {
3741                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("requestContext");
3742             }
3743             this.demuxFailureHandler = demuxFailureHandler;
3744             this.requestContext = requestContext;
3745         }
3746 
Start()3747         public void Start()
3748         {
3749             IAsyncResult result = this.demuxFailureHandler.BeginHandleDemuxFailure(requestContext.RequestMessage, requestContext, demuxFailureHandlerCallback, this);
3750             if (!result.CompletedSynchronously)
3751             {
3752                 return;
3753             }
3754             this.demuxFailureHandler.EndHandleDemuxFailure(result);
3755             if (this.OnDemuxFailureHandled())
3756             {
3757                 Complete(true);
3758             }
3759         }
3760 
OnDemuxFailureHandled()3761         protected virtual bool OnDemuxFailureHandled()
3762         {
3763             requestContext.Close();
3764             return true;
3765         }
3766 
DemuxFailureHandlerCallback(IAsyncResult result)3767         static void DemuxFailureHandlerCallback(IAsyncResult result)
3768         {
3769             if (result.CompletedSynchronously)
3770             {
3771                 return;
3772             }
3773             ReplyChannelDemuxFailureAsyncResult self = (ReplyChannelDemuxFailureAsyncResult)(result.AsyncState);
3774             bool completeSelf = false;
3775             Exception completionException = null;
3776             try
3777             {
3778                 self.demuxFailureHandler.EndHandleDemuxFailure(result);
3779                 completeSelf = self.OnDemuxFailureHandled();
3780             }
3781 #pragma warning suppress 56500 // covered by FxCOP
3782             catch (Exception e)
3783             {
3784                 if (Fx.IsFatal(e)) throw;
3785                 completeSelf = true;
3786                 completionException = e;
3787             }
3788             if (completeSelf)
3789             {
3790                 self.Complete(false, completionException);
3791             }
3792         }
3793 
End(IAsyncResult result)3794         public static void End(IAsyncResult result)
3795         {
3796             AsyncResult.End<ReplyChannelDemuxFailureAsyncResult>(result);
3797         }
3798     }
3799 
3800     class ReplySessionDemuxFailureAsyncResult : ReplyChannelDemuxFailureAsyncResult
3801     {
3802         static AsyncCallback closeChannelCallback = Fx.ThunkCallback(new AsyncCallback(ChannelCloseCallback));
3803         IReplySessionChannel channel;
3804 
ReplySessionDemuxFailureAsyncResult(IChannelDemuxFailureHandler demuxFailureHandler, RequestContext requestContext, IReplySessionChannel channel, AsyncCallback callback, object state)3805         public ReplySessionDemuxFailureAsyncResult(IChannelDemuxFailureHandler demuxFailureHandler, RequestContext requestContext, IReplySessionChannel channel, AsyncCallback callback, object state)
3806             : base(demuxFailureHandler, requestContext, callback, state)
3807         {
3808             this.channel = channel;
3809         }
3810 
OnDemuxFailureHandled()3811         protected override bool OnDemuxFailureHandled()
3812         {
3813             base.OnDemuxFailureHandled();
3814             IAsyncResult result = this.channel.BeginClose(closeChannelCallback, this);
3815             if (!result.CompletedSynchronously)
3816             {
3817                 return false;
3818             }
3819             this.channel.EndClose(result);
3820             return true;
3821         }
3822 
ChannelCloseCallback(IAsyncResult result)3823         static void ChannelCloseCallback(IAsyncResult result)
3824         {
3825             if (result.CompletedSynchronously)
3826             {
3827                 return;
3828             }
3829             ReplySessionDemuxFailureAsyncResult self = (ReplySessionDemuxFailureAsyncResult)result.AsyncState;
3830             Exception completionException = null;
3831             try
3832             {
3833                 self.channel.EndClose(result);
3834             }
3835 #pragma warning suppress 56500 // covered by FxCOP
3836             catch (Exception e)
3837             {
3838                 if (Fx.IsFatal(e)) throw;
3839                 completionException = e;
3840             }
3841             self.Complete(false, completionException);
3842         }
3843 
End(IAsyncResult result)3844         public static new void End(IAsyncResult result)
3845         {
3846             AsyncResult.End<ReplySessionDemuxFailureAsyncResult>(result);
3847         }
3848     }
3849 
3850     class DuplexSessionDemuxFailureAsyncResult : AsyncResult
3851     {
3852         static AsyncCallback demuxFailureHandlerCallback = Fx.ThunkCallback(new AsyncCallback(DemuxFailureHandlerCallback));
3853         static AsyncCallback channelCloseCallback = Fx.ThunkCallback(new AsyncCallback(ChannelCloseCallback));
3854         IChannelDemuxFailureHandler demuxFailureHandler;
3855         IDuplexSessionChannel channel;
3856         Message message;
3857 
DuplexSessionDemuxFailureAsyncResult(IChannelDemuxFailureHandler demuxFailureHandler, IDuplexSessionChannel channel, Message message, AsyncCallback callback, object state)3858         public DuplexSessionDemuxFailureAsyncResult(IChannelDemuxFailureHandler demuxFailureHandler, IDuplexSessionChannel channel, Message message, AsyncCallback callback, object state)
3859             : base(callback, state)
3860         {
3861             if (demuxFailureHandler == null)
3862             {
3863                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("demuxFailureHandler");
3864             }
3865             if (channel == null)
3866             {
3867                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("channel");
3868             }
3869             this.demuxFailureHandler = demuxFailureHandler;
3870             this.channel = channel;
3871             this.message = message;
3872         }
3873 
Start()3874         public void Start()
3875         {
3876             IAsyncResult result = this.demuxFailureHandler.BeginHandleDemuxFailure(this.message, this.channel, demuxFailureHandlerCallback, this);
3877             if (!result.CompletedSynchronously)
3878             {
3879                 return;
3880             }
3881             this.demuxFailureHandler.EndHandleDemuxFailure(result);
3882             if (this.OnDemuxFailureHandled())
3883             {
3884                 Complete(true);
3885             }
3886         }
3887 
OnDemuxFailureHandled()3888         bool OnDemuxFailureHandled()
3889         {
3890             IAsyncResult result = this.channel.BeginClose(channelCloseCallback, this);
3891             if (!result.CompletedSynchronously)
3892             {
3893                 return false;
3894             }
3895             this.channel.EndClose(result);
3896             this.message.Close();
3897             return true;
3898         }
3899 
DemuxFailureHandlerCallback(IAsyncResult result)3900         static void DemuxFailureHandlerCallback(IAsyncResult result)
3901         {
3902             if (result.CompletedSynchronously)
3903             {
3904                 return;
3905             }
3906             DuplexSessionDemuxFailureAsyncResult self = (DuplexSessionDemuxFailureAsyncResult)result.AsyncState;
3907             bool completeSelf = false;
3908             Exception completionException = null;
3909             try
3910             {
3911                 self.demuxFailureHandler.EndHandleDemuxFailure(result);
3912                 completeSelf = self.OnDemuxFailureHandled();
3913             }
3914 #pragma warning suppress 56500 // covered by FxCOP
3915             catch (Exception e)
3916             {
3917                 if (Fx.IsFatal(e)) throw;
3918                 completeSelf = true;
3919                 completionException = e;
3920             }
3921             if (completeSelf)
3922             {
3923                 self.Complete(false, completionException);
3924             }
3925         }
3926 
ChannelCloseCallback(IAsyncResult result)3927         static void ChannelCloseCallback(IAsyncResult result)
3928         {
3929             if (result.CompletedSynchronously)
3930             {
3931                 return;
3932             }
3933             DuplexSessionDemuxFailureAsyncResult self = (DuplexSessionDemuxFailureAsyncResult)result.AsyncState;
3934             Exception completionException = null;
3935             try
3936             {
3937                 self.channel.EndClose(result);
3938                 self.message.Close();
3939             }
3940 #pragma warning suppress 56500 // covered by FxCOP
3941             catch (Exception e)
3942             {
3943                 if (Fx.IsFatal(e)) throw;
3944                 completionException = e;
3945             }
3946             self.Complete(false, completionException);
3947         }
3948 
End(IAsyncResult result)3949         public static void End(IAsyncResult result)
3950         {
3951             AsyncResult.End<DuplexSessionDemuxFailureAsyncResult>(result);
3952         }
3953     }
3954 
3955     interface IChannelDemuxFailureHandler
3956     {
HandleDemuxFailure(Message message)3957         void HandleDemuxFailure(Message message);
3958 
BeginHandleDemuxFailure(Message message, RequestContext faultContext, AsyncCallback callback, object state)3959         IAsyncResult BeginHandleDemuxFailure(Message message, RequestContext faultContext, AsyncCallback callback, object state);
BeginHandleDemuxFailure(Message message, IOutputChannel faultContext, AsyncCallback callback, object state)3960         IAsyncResult BeginHandleDemuxFailure(Message message, IOutputChannel faultContext, AsyncCallback callback, object state);
EndHandleDemuxFailure(IAsyncResult result)3961         void EndHandleDemuxFailure(IAsyncResult result);
3962     }
3963 }
3964