1 //----------------------------------------------------------------------------- 2 // Copyright (c) Microsoft Corporation. All rights reserved. 3 //----------------------------------------------------------------------------- 4 5 namespace System.ServiceModel.Channels 6 { 7 using System; 8 using System.Collections.Generic; 9 using System.Diagnostics; 10 using System.Runtime; 11 using System.ServiceModel; 12 using System.ServiceModel.Diagnostics; 13 using System.ServiceModel.Dispatcher; 14 using System.Threading; 15 using System.Xml; 16 using System.ServiceModel.Diagnostics.Application; 17 using System.Diagnostics.CodeAnalysis; 18 19 class ChannelDemuxer 20 { 21 public readonly static TimeSpan UseDefaultReceiveTimeout = TimeSpan.MinValue; 22 23 TypedChannelDemuxer inputDemuxer; 24 TypedChannelDemuxer replyDemuxer; 25 Dictionary<Type, TypedChannelDemuxer> typeDemuxers; 26 TimeSpan peekTimeout; 27 int maxPendingSessions; 28 ChannelDemuxer()29 public ChannelDemuxer() 30 { 31 this.peekTimeout = ChannelDemuxer.UseDefaultReceiveTimeout; //use the default receive timeout (original behavior) 32 this.maxPendingSessions = 10; 33 this.typeDemuxers = new Dictionary<Type, TypedChannelDemuxer>(); 34 } 35 36 public TimeSpan PeekTimeout 37 { 38 get 39 { 40 return this.peekTimeout; 41 } 42 set 43 { 44 this.peekTimeout = value; 45 } 46 } 47 48 public int MaxPendingSessions 49 { 50 get 51 { 52 return this.maxPendingSessions; 53 } 54 set 55 { 56 this.maxPendingSessions = value; 57 } 58 } 59 60 public IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context) 61 where TChannel : class, IChannel 62 { 63 return this.BuildChannelListener<TChannel>(context, new ChannelDemuxerFilter(new MatchAllMessageFilter(), 0)); 64 } 65 66 public IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context, ChannelDemuxerFilter filter) 67 where TChannel : class, IChannel 68 { 69 return GetTypedDemuxer(typeof(TChannel), context).BuildChannelListener<TChannel>(filter); 70 } 71 CreateTypedDemuxer(Type channelType, BindingContext context)72 TypedChannelDemuxer CreateTypedDemuxer(Type channelType, BindingContext context) 73 { 74 if (channelType == typeof(IDuplexChannel)) 75 return (TypedChannelDemuxer)(object)new DuplexChannelDemuxer(context); 76 if (channelType == typeof(IInputSessionChannel)) 77 return (TypedChannelDemuxer)(object)new InputSessionChannelDemuxer(context, this.peekTimeout, this.maxPendingSessions); 78 if (channelType == typeof(IReplySessionChannel)) 79 return (TypedChannelDemuxer)(object)new ReplySessionChannelDemuxer(context, this.peekTimeout, this.maxPendingSessions); 80 if (channelType == typeof(IDuplexSessionChannel)) 81 return (TypedChannelDemuxer)(object)new DuplexSessionChannelDemuxer(context, this.peekTimeout, this.maxPendingSessions); 82 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException()); 83 } 84 85 [SuppressMessage(FxCop.Category.Usage, "CA2301:EmbeddableTypesInContainersRule", MessageId = "typeDemuxers", Justification = "No need to support type equivalence here.")] GetTypedDemuxer(Type channelType, BindingContext context)86 TypedChannelDemuxer GetTypedDemuxer(Type channelType, BindingContext context) 87 { 88 TypedChannelDemuxer typeDemuxer = null; 89 bool createdDemuxer = false; 90 91 if (channelType == typeof(IInputChannel)) 92 { 93 if (this.inputDemuxer == null) 94 { 95 if (context.CanBuildInnerChannelListener<IReplyChannel>()) 96 this.inputDemuxer = this.replyDemuxer = new ReplyChannelDemuxer(context); 97 else 98 this.inputDemuxer = new InputChannelDemuxer(context); 99 createdDemuxer = true; 100 } 101 typeDemuxer = this.inputDemuxer; 102 } 103 else if (channelType == typeof(IReplyChannel)) 104 { 105 if (this.replyDemuxer == null) 106 { 107 this.inputDemuxer = this.replyDemuxer = new ReplyChannelDemuxer(context); 108 createdDemuxer = true; 109 } 110 typeDemuxer = this.replyDemuxer; 111 } 112 else if (!this.typeDemuxers.TryGetValue(channelType, out typeDemuxer)) 113 { 114 typeDemuxer = this.CreateTypedDemuxer(channelType, context); 115 this.typeDemuxers.Add(channelType, typeDemuxer); 116 createdDemuxer = true; 117 } 118 119 if (!createdDemuxer) 120 { 121 context.RemainingBindingElements.Clear(); 122 } 123 124 return (TypedChannelDemuxer)typeDemuxer; 125 } 126 } 127 128 abstract class TypedChannelDemuxer 129 { AbortMessage(RequestContext request)130 internal static void AbortMessage(RequestContext request) 131 { 132 // RequestContext.RequestMessage can throw an AddressMismatch exception. 133 try 134 { 135 AbortMessage(request.RequestMessage); 136 } 137 catch (Exception e) 138 { 139 if (Fx.IsFatal(e)) 140 { 141 throw; 142 } 143 144 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 145 } 146 } 147 AbortMessage(Message message)148 internal static void AbortMessage(Message message) 149 { 150 try 151 { 152 message.Close(); 153 } 154 catch (CommunicationException e) 155 { 156 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 157 } 158 catch (TimeoutException e) 159 { 160 if (TD.CloseTimeoutIsEnabled()) 161 { 162 TD.CloseTimeout(e.Message); 163 } 164 165 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 166 } 167 } 168 169 public abstract IChannelListener<TChannel> BuildChannelListener<TChannel>(ChannelDemuxerFilter filter) 170 where TChannel : class, IChannel; 171 } 172 173 // 174 // Datagram demuxers 175 // 176 177 interface IChannelDemuxer 178 { OnOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout)179 void OnOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout); OnBeginOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout, AsyncCallback callback, object state)180 IAsyncResult OnBeginOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout, AsyncCallback callback, object state); OnEndOuterListenerOpen(IAsyncResult result)181 void OnEndOuterListenerOpen(IAsyncResult result); OnOuterListenerAbort(ChannelDemuxerFilter filter)182 void OnOuterListenerAbort(ChannelDemuxerFilter filter); OnOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout)183 void OnOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout); OnBeginOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout, AsyncCallback callback, object state)184 IAsyncResult OnBeginOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout, AsyncCallback callback, object state); OnEndOuterListenerClose(IAsyncResult result)185 void OnEndOuterListenerClose(IAsyncResult result); 186 } 187 188 abstract class DatagramChannelDemuxer<TInnerChannel, TInnerItem> : TypedChannelDemuxer, IChannelDemuxer 189 where TInnerChannel : class, IChannel 190 where TInnerItem : class, IDisposable 191 { 192 MessageFilterTable<IChannelListener> filterTable; 193 TInnerChannel innerChannel; 194 IChannelListener<TInnerChannel> innerListener; 195 static AsyncCallback onReceiveComplete = Fx.ThunkCallback(new AsyncCallback(OnReceiveCompleteStatic)); 196 static Action<object> startReceivingStatic = new Action<object>(StartReceivingStatic); 197 Action onItemDequeued; 198 int openCount; 199 IChannelDemuxFailureHandler demuxFailureHandler; 200 // since the OnOuterListenerOpen method will be called for every outer listener and we will open 201 // the inner listener only once, we need to ensure that all the outer listeners wait till the 202 // inner listener is opened. 203 ThreadNeutralSemaphore openSemaphore; 204 Exception pendingInnerListenerOpenException; 205 bool abortOngoingOpen; 206 DatagramChannelDemuxer(BindingContext context)207 public DatagramChannelDemuxer(BindingContext context) 208 { 209 this.filterTable = new MessageFilterTable<IChannelListener>(); 210 this.innerListener = context.BuildInnerChannelListener<TInnerChannel>(); 211 if (context.BindingParameters != null) 212 { 213 this.demuxFailureHandler = context.BindingParameters.Find<IChannelDemuxFailureHandler>(); 214 } 215 this.openSemaphore = new ThreadNeutralSemaphore(1); 216 } 217 218 protected TInnerChannel InnerChannel 219 { 220 get { return this.innerChannel; } 221 } 222 223 protected IChannelListener<TInnerChannel> InnerListener 224 { 225 get { return this.innerListener; } 226 } 227 228 protected object ThisLock 229 { 230 get { return this; } 231 } 232 233 protected IChannelDemuxFailureHandler DemuxFailureHandler 234 { 235 get { return this.demuxFailureHandler; } 236 } 237 AbortItem(TInnerItem item)238 protected abstract void AbortItem(TInnerItem item); BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)239 protected abstract IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state); 240 protected abstract LayeredChannelListener<TChannel> CreateListener<TChannel>(ChannelDemuxerFilter filter) where TChannel : class, IChannel; Dispatch(IChannelListener listener)241 protected abstract void Dispatch(IChannelListener listener); EndpointNotFound(TInnerItem item)242 protected abstract void EndpointNotFound(TInnerItem item); EndReceive(IAsyncResult result)243 protected abstract TInnerItem EndReceive(IAsyncResult result); EnqueueAndDispatch(IChannelListener listener, TInnerItem item, Action dequeuedCallback, bool canDispatchOnThisThread)244 protected abstract void EnqueueAndDispatch(IChannelListener listener, TInnerItem item, Action dequeuedCallback, bool canDispatchOnThisThread); EnqueueAndDispatch(IChannelListener listener, Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread)245 protected abstract void EnqueueAndDispatch(IChannelListener listener, Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread); GetMessage(TInnerItem item)246 protected abstract Message GetMessage(TInnerItem item); 247 BuildChannelListener(ChannelDemuxerFilter filter)248 public override IChannelListener<TChannel> BuildChannelListener<TChannel>(ChannelDemuxerFilter filter) 249 { 250 LayeredChannelListener<TChannel> listener = this.CreateListener<TChannel>(filter); 251 listener.InnerChannelListener = this.innerListener; 252 return listener; 253 } 254 255 // return false if BeginReceive should be called again HandleReceiveResult(IAsyncResult result)256 bool HandleReceiveResult(IAsyncResult result) 257 { 258 TInnerItem item; 259 try 260 { 261 item = this.EndReceive(result); 262 } 263 catch (CommunicationObjectFaultedException e) 264 { 265 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 266 return true; 267 } 268 catch (CommunicationObjectAbortedException e) 269 { 270 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 271 return true; 272 } 273 catch (ObjectDisposedException e) 274 { 275 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 276 return true; 277 } 278 catch (CommunicationException e) 279 { 280 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 281 return false; 282 } 283 catch (TimeoutException e) 284 { 285 if (TD.ReceiveTimeoutIsEnabled()) 286 { 287 TD.ReceiveTimeout(e.Message); 288 } 289 290 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 291 return false; 292 } 293 catch (Exception e) 294 { 295 if (Fx.IsFatal(e)) throw; 296 this.HandleUnknownException(e); 297 return true; 298 } 299 300 if (item == null) 301 { 302 if (this.innerChannel.State == CommunicationState.Opened) 303 { 304 if (DiagnosticUtility.ShouldTraceError) 305 { 306 TraceUtility.TraceEvent(TraceEventType.Error, TraceCode.PrematureDatagramEof, SR.GetString(SR.TraceCodePrematureDatagramEof), 307 null, this.innerChannel, null); 308 } 309 } 310 311 return true; 312 } 313 314 try 315 { 316 return this.ProcessItem(item); 317 } 318 catch (CommunicationException e) 319 { 320 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 321 return false; 322 } 323 catch (TimeoutException e) 324 { 325 if (TD.ReceiveTimeoutIsEnabled()) 326 { 327 TD.ReceiveTimeout(e.Message); 328 } 329 330 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 331 return false; 332 } 333 catch (Exception e) 334 { 335 if (Fx.IsFatal(e)) throw; 336 this.HandleUnknownException(e); 337 return true; 338 } 339 } 340 MatchListener(Message message)341 IChannelListener MatchListener(Message message) 342 { 343 IChannelListener matchingListener = null; 344 lock (this.ThisLock) 345 { 346 if (this.filterTable.GetMatchingValue(message, out matchingListener)) 347 { 348 return matchingListener; 349 } 350 } 351 return null; 352 } 353 OnItemDequeued()354 void OnItemDequeued() 355 { 356 this.StartReceiving(); 357 } 358 StartReceivingStatic(object state)359 static void StartReceivingStatic(object state) 360 { 361 ((DatagramChannelDemuxer<TInnerChannel, TInnerItem>)state).StartReceiving(); 362 } 363 HandleUnknownException(Exception exception)364 protected void HandleUnknownException(Exception exception) 365 { 366 DiagnosticUtility.TraceHandledException(exception, TraceEventType.Error); 367 368 IChannelListener listener = null; 369 lock (this.ThisLock) 370 { 371 if (this.filterTable.Count > 0) 372 { 373 KeyValuePair<MessageFilter, IChannelListener>[] pairs = new KeyValuePair<MessageFilter, IChannelListener>[this.filterTable.Count]; 374 this.filterTable.CopyTo(pairs, 0); 375 listener = pairs[0].Value; 376 377 if (this.onItemDequeued == null) 378 { 379 this.onItemDequeued = new Action(this.OnItemDequeued); 380 } 381 this.EnqueueAndDispatch(listener, exception, this.onItemDequeued, false); 382 } 383 } 384 } 385 AbortState()386 void AbortState() 387 { 388 if (this.innerChannel != null) 389 { 390 this.innerChannel.Abort(); 391 } 392 this.innerListener.Abort(); 393 } 394 OnOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout)395 public void OnOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout) 396 { 397 bool closeInnerChannelAndListener = false; 398 399 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); 400 lock (this.ThisLock) 401 { 402 if (this.filterTable.ContainsKey(filter.Filter)) 403 { 404 this.filterTable.Remove(filter.Filter); 405 if (--this.openCount == 0) 406 { 407 closeInnerChannelAndListener = true; 408 } 409 } 410 } 411 if (closeInnerChannelAndListener) 412 { 413 bool closeSucceeded = false; 414 try 415 { 416 if (this.innerChannel != null) 417 { 418 this.innerChannel.Close(timeoutHelper.RemainingTime()); 419 } 420 this.innerListener.Close(timeoutHelper.RemainingTime()); 421 closeSucceeded = true; 422 } 423 finally 424 { 425 // we should abort the state since calling Abort on the channel demuxer will be a no-op 426 // due to the reference count being 0 427 if (!closeSucceeded) 428 { 429 AbortState(); 430 } 431 } 432 } 433 } 434 OnBeginOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout, AsyncCallback callback, object state)435 public IAsyncResult OnBeginOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout, AsyncCallback callback, object state) 436 { 437 bool closeInnerChannelAndListener = false; 438 lock (this.ThisLock) 439 { 440 if (this.filterTable.ContainsKey(filter.Filter)) 441 { 442 this.filterTable.Remove(filter.Filter); 443 if (--this.openCount == 0) 444 { 445 closeInnerChannelAndListener = true; 446 } 447 } 448 } 449 if (!closeInnerChannelAndListener) 450 { 451 return new CompletedAsyncResult(callback, state); 452 } 453 else 454 { 455 return new CloseAsyncResult(this, timeout, callback, state); 456 } 457 } 458 OnEndOuterListenerClose(IAsyncResult result)459 public void OnEndOuterListenerClose(IAsyncResult result) 460 { 461 if (result is CompletedAsyncResult) 462 { 463 CompletedAsyncResult.End(result); 464 } 465 else 466 { 467 CloseAsyncResult.End(result); 468 } 469 } 470 OnOuterListenerAbort(ChannelDemuxerFilter filter)471 public void OnOuterListenerAbort(ChannelDemuxerFilter filter) 472 { 473 bool abortInnerChannelAndListener = false; 474 lock (this.ThisLock) 475 { 476 if (this.filterTable.ContainsKey(filter.Filter)) 477 { 478 this.filterTable.Remove(filter.Filter); 479 if (--this.openCount == 0) 480 { 481 abortInnerChannelAndListener = true; 482 this.abortOngoingOpen = true; 483 } 484 } 485 } 486 if (abortInnerChannelAndListener) 487 { 488 AbortState(); 489 } 490 } 491 ThrowPendingOpenExceptionIfAny()492 void ThrowPendingOpenExceptionIfAny() 493 { 494 if (this.pendingInnerListenerOpenException != null) 495 { 496 if (pendingInnerListenerOpenException is CommunicationObjectAbortedException) 497 { 498 throw DiagnosticUtility.ExceptionUtility.ThrowHelperWarning(new CommunicationObjectAbortedException(SR.GetString(SR.PreviousChannelDemuxerOpenFailed, this.pendingInnerListenerOpenException.ToString()))); 499 } 500 else if (pendingInnerListenerOpenException is CommunicationObjectFaultedException) 501 { 502 throw DiagnosticUtility.ExceptionUtility.ThrowHelperWarning(new CommunicationObjectFaultedException(SR.GetString(SR.PreviousChannelDemuxerOpenFailed, this.pendingInnerListenerOpenException.ToString()))); 503 } 504 else 505 { 506 throw DiagnosticUtility.ExceptionUtility.ThrowHelperWarning(new CommunicationException(SR.GetString(SR.PreviousChannelDemuxerOpenFailed, this.pendingInnerListenerOpenException.ToString()))); 507 } 508 } 509 } 510 ShouldOpenInnerListener(ChannelDemuxerFilter filter, IChannelListener listener)511 bool ShouldOpenInnerListener(ChannelDemuxerFilter filter, IChannelListener listener) 512 { 513 lock (this.ThisLock) 514 { 515 // the listener's Abort may be racing with Open 516 if (listener.State == CommunicationState.Closed || listener.State == CommunicationState.Closing) 517 { 518 return false; 519 } 520 this.filterTable.Add(filter.Filter, listener, filter.Priority); 521 if (++this.openCount == 1) 522 { 523 this.abortOngoingOpen = false; 524 return true; 525 } 526 } 527 return false; 528 } 529 OnOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout)530 public void OnOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout) 531 { 532 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); 533 this.openSemaphore.Enter(timeoutHelper.RemainingTime()); 534 try 535 { 536 bool openInnerListener = ShouldOpenInnerListener(filter, listener); 537 if (openInnerListener) 538 { 539 try 540 { 541 this.innerListener.Open(timeoutHelper.RemainingTime()); 542 this.innerChannel = this.innerListener.AcceptChannel(timeoutHelper.RemainingTime()); 543 this.innerChannel.Open(timeoutHelper.RemainingTime()); 544 545 lock (ThisLock) 546 { 547 if (this.abortOngoingOpen) 548 { 549 this.AbortState(); 550 return; 551 } 552 } 553 554 ActionItem.Schedule(startReceivingStatic, this); 555 } 556 #pragma warning suppress 56500 // covered by FxCOP 557 catch (Exception e) 558 { 559 this.pendingInnerListenerOpenException = e; 560 throw; 561 } 562 } 563 else 564 { 565 this.ThrowPendingOpenExceptionIfAny(); 566 } 567 } 568 finally 569 { 570 this.openSemaphore.Exit(); 571 } 572 } 573 OnBeginOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout, AsyncCallback callback, object state)574 public IAsyncResult OnBeginOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout, AsyncCallback callback, object state) 575 { 576 return new OpenAsyncResult(this, filter, listener, timeout, callback, state); 577 } 578 OnEndOuterListenerOpen(IAsyncResult result)579 public void OnEndOuterListenerOpen(IAsyncResult result) 580 { 581 OpenAsyncResult.End(result); 582 } 583 OnReceiveComplete(IAsyncResult result)584 void OnReceiveComplete(IAsyncResult result) 585 { 586 if (!this.HandleReceiveResult(result)) 587 { 588 this.StartReceiving(); 589 } 590 } 591 OnReceiveCompleteStatic(IAsyncResult result)592 static void OnReceiveCompleteStatic(IAsyncResult result) 593 { 594 if (result.CompletedSynchronously) 595 return; 596 ((DatagramChannelDemuxer<TInnerChannel, TInnerItem>)result.AsyncState).OnReceiveComplete(result); 597 } 598 ProcessItem(TInnerItem item)599 bool ProcessItem(TInnerItem item) 600 { 601 try 602 { 603 Message message = null; 604 IChannelListener matchingListener = null; 605 try 606 { 607 message = this.GetMessage(item); 608 matchingListener = MatchListener(message); 609 } 610 // The message may be bad because of which running the listener filters may throw 611 // In that case, continue receiving 612 catch (CommunicationException e) 613 { 614 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 615 return false; 616 } 617 catch (MultipleFilterMatchesException e) 618 { 619 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 620 return false; 621 } 622 catch (XmlException e) 623 { 624 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 625 return false; 626 } 627 catch (Exception e) 628 { 629 if (Fx.IsFatal(e)) throw; 630 this.HandleUnknownException(e); 631 return true; 632 } 633 634 if (matchingListener == null) 635 { 636 System.ServiceModel.Dispatcher.ErrorBehavior.ThrowAndCatch( 637 new EndpointNotFoundException(SR.GetString(SR.UnableToDemuxChannel, message.Headers.Action)), message); 638 // EndpointNotFound is responsible for closing the item 639 this.EndpointNotFound(item); 640 item = null; 641 return false; 642 } 643 644 if (this.onItemDequeued == null) 645 { 646 this.onItemDequeued = new Action(this.OnItemDequeued); 647 } 648 this.EnqueueAndDispatch(matchingListener, item, this.onItemDequeued, false); 649 item = null; 650 return true; 651 } 652 finally 653 { 654 if (item != null) 655 { 656 this.AbortItem(item); 657 } 658 } 659 } 660 StartReceiving()661 void StartReceiving() 662 { 663 while (true) 664 { 665 if (this.innerChannel.State != CommunicationState.Opened) 666 { 667 return; 668 } 669 670 IAsyncResult result; 671 672 try 673 { 674 result = this.BeginReceive(TimeSpan.MaxValue, onReceiveComplete, this); 675 } 676 catch (CommunicationObjectFaultedException e) 677 { 678 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 679 return; 680 } 681 catch (CommunicationObjectAbortedException e) 682 { 683 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 684 return; 685 } 686 catch (ObjectDisposedException e) 687 { 688 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 689 return; 690 } 691 catch (CommunicationException e) 692 { 693 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 694 continue; 695 } 696 catch (TimeoutException e) 697 { 698 if (TD.ReceiveTimeoutIsEnabled()) 699 { 700 TD.ReceiveTimeout(e.Message); 701 } 702 703 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 704 continue; 705 } 706 catch (Exception e) 707 { 708 if (Fx.IsFatal(e)) throw; 709 this.HandleUnknownException(e); 710 return; 711 } 712 713 if (!result.CompletedSynchronously) 714 { 715 return; 716 } 717 718 if (this.HandleReceiveResult(result)) 719 { 720 return; 721 } 722 } 723 } 724 725 class OpenAsyncResult : AsyncResult 726 { 727 static FastAsyncCallback waitOverCallback = new FastAsyncCallback(WaitOverCallback); 728 static AsyncCallback openListenerCallback = Fx.ThunkCallback(new AsyncCallback(OpenListenerCallback)); 729 static AsyncCallback acceptChannelCallback = Fx.ThunkCallback(new AsyncCallback(AcceptChannelCallback)); 730 static AsyncCallback openChannelCallback = Fx.ThunkCallback(new AsyncCallback(OpenChannelCallback)); 731 DatagramChannelDemuxer<TInnerChannel, TInnerItem> channelDemuxer; 732 ChannelDemuxerFilter filter; 733 IChannelListener listener; 734 TimeoutHelper timeoutHelper; 735 bool openInnerListener; 736 OpenAsyncResult(DatagramChannelDemuxer<TInnerChannel, TInnerItem> channelDemuxer, ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout, AsyncCallback callback, object state)737 public OpenAsyncResult(DatagramChannelDemuxer<TInnerChannel, TInnerItem> channelDemuxer, ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout, AsyncCallback callback, object state) 738 : base(callback, state) 739 { 740 this.channelDemuxer = channelDemuxer; 741 this.filter = filter; 742 this.listener = listener; 743 this.timeoutHelper = new TimeoutHelper(timeout); 744 if (!this.channelDemuxer.openSemaphore.EnterAsync(this.timeoutHelper.RemainingTime(), waitOverCallback, this)) 745 { 746 return; 747 } 748 749 bool onWaitOverSucceeded = false; 750 bool completeSelf = false; 751 try 752 { 753 completeSelf = this.OnWaitOver(); 754 onWaitOverSucceeded = true; 755 } 756 finally 757 { 758 if (!onWaitOverSucceeded) 759 { 760 Cleanup(); 761 } 762 } 763 if (completeSelf) 764 { 765 Cleanup(); 766 Complete(true); 767 } 768 } 769 WaitOverCallback(object state, Exception asyncException)770 static void WaitOverCallback(object state, Exception asyncException) 771 { 772 OpenAsyncResult self = (OpenAsyncResult)state; 773 Exception completionException = asyncException; 774 bool completeSelf = false; 775 776 if (completionException != null) 777 { 778 completeSelf = true; 779 } 780 else 781 { 782 try 783 { 784 completeSelf = self.OnWaitOver(); 785 } 786 #pragma warning suppress 56500 // covered by FxCOP 787 catch (Exception e) 788 { 789 if (Fx.IsFatal(e)) throw; 790 completeSelf = true; 791 completionException = e; 792 } 793 } 794 795 if (completeSelf) 796 { 797 self.Cleanup(); 798 self.Complete(false, completionException); 799 } 800 } 801 OnWaitOver()802 bool OnWaitOver() 803 { 804 this.openInnerListener = this.channelDemuxer.ShouldOpenInnerListener(filter, listener); 805 // the semaphore is obtained. Check if the inner listener needs to be opened. If not, 806 // check if there is a pending exception obtained while opening the inner listener and throw 807 // that 808 if (!this.openInnerListener) 809 { 810 this.channelDemuxer.ThrowPendingOpenExceptionIfAny(); 811 return true; 812 } 813 else 814 { 815 return this.OnOpenInnerListener(); 816 } 817 } 818 OnInnerListenerEndOpen(IAsyncResult result)819 bool OnInnerListenerEndOpen(IAsyncResult result) 820 { 821 this.channelDemuxer.innerListener.EndOpen(result); 822 result = this.channelDemuxer.innerListener.BeginAcceptChannel(this.timeoutHelper.RemainingTime(), acceptChannelCallback, this); 823 824 if (!result.CompletedSynchronously) 825 { 826 return false; 827 } 828 829 return this.OnEndAcceptChannel(result); 830 } 831 OnOpenInnerListener()832 bool OnOpenInnerListener() 833 { 834 try 835 { 836 IAsyncResult result = this.channelDemuxer.innerListener.BeginOpen(timeoutHelper.RemainingTime(), openListenerCallback, this); 837 if (!result.CompletedSynchronously) 838 { 839 return false; 840 } 841 this.OnInnerListenerEndOpen(result); 842 return true; 843 } 844 #pragma warning suppress 56500 // covered by FxCOP 845 catch (Exception e) 846 { 847 this.channelDemuxer.pendingInnerListenerOpenException = e; 848 throw; 849 } 850 } 851 OpenListenerCallback(IAsyncResult result)852 static void OpenListenerCallback(IAsyncResult result) 853 { 854 if (result.CompletedSynchronously) 855 { 856 return; 857 } 858 OpenAsyncResult self = (OpenAsyncResult)result.AsyncState; 859 Exception completionException = null; 860 try 861 { 862 self.OnInnerListenerEndOpen(result); 863 } 864 #pragma warning suppress 56500 // covered by FxCOP 865 catch (Exception e) 866 { 867 if (Fx.IsFatal(e)) throw; 868 completionException = e; 869 } 870 if (completionException != null) 871 { 872 self.channelDemuxer.pendingInnerListenerOpenException = completionException; 873 } 874 self.Cleanup(); 875 self.Complete(false, completionException); 876 } 877 AcceptChannelCallback(IAsyncResult result)878 static void AcceptChannelCallback(IAsyncResult result) 879 { 880 if (result.CompletedSynchronously) 881 { 882 return; 883 } 884 OpenAsyncResult self = (OpenAsyncResult)result.AsyncState; 885 Exception completionException = null; 886 bool completeSelf = false; 887 try 888 { 889 completeSelf = self.OnEndAcceptChannel(result); 890 } 891 #pragma warning suppress 56500 // covered by FxCOP 892 catch (Exception e) 893 { 894 if (Fx.IsFatal(e)) throw; 895 completionException = e; 896 completeSelf = true; 897 } 898 if (completeSelf) 899 { 900 if (completionException != null) 901 { 902 self.channelDemuxer.pendingInnerListenerOpenException = completionException; 903 } 904 self.Cleanup(); 905 self.Complete(false, completionException); 906 } 907 } 908 OnEndAcceptChannel(IAsyncResult result)909 bool OnEndAcceptChannel(IAsyncResult result) 910 { 911 this.channelDemuxer.innerChannel = this.channelDemuxer.innerListener.EndAcceptChannel(result); 912 IAsyncResult openResult = this.channelDemuxer.innerChannel.BeginOpen(this.timeoutHelper.RemainingTime(), acceptChannelCallback, this); 913 914 if (!openResult.CompletedSynchronously) 915 { 916 return false; 917 } 918 919 this.OnEndOpenChannel(openResult); 920 return true; 921 } 922 OpenChannelCallback(IAsyncResult result)923 static void OpenChannelCallback(IAsyncResult result) 924 { 925 if (result.CompletedSynchronously) 926 { 927 return; 928 } 929 OpenAsyncResult self = (OpenAsyncResult)result.AsyncState; 930 Exception completionException = null; 931 try 932 { 933 self.OnEndOpenChannel(result); 934 } 935 #pragma warning suppress 56500 // covered by FxCOP 936 catch (Exception e) 937 { 938 if (Fx.IsFatal(e)) throw; 939 completionException = e; 940 } 941 if (completionException != null) 942 { 943 self.channelDemuxer.pendingInnerListenerOpenException = completionException; 944 } 945 self.Cleanup(); 946 self.Complete(false, completionException); 947 } 948 OnEndOpenChannel(IAsyncResult result)949 void OnEndOpenChannel(IAsyncResult result) 950 { 951 this.channelDemuxer.innerChannel.EndOpen(result); 952 953 lock (this.channelDemuxer.ThisLock) 954 { 955 if (this.channelDemuxer.abortOngoingOpen) 956 { 957 this.channelDemuxer.AbortState(); 958 return; 959 } 960 } 961 962 ActionItem.Schedule(startReceivingStatic, this.channelDemuxer); 963 } 964 Cleanup()965 void Cleanup() 966 { 967 this.channelDemuxer.openSemaphore.Exit(); 968 } 969 End(IAsyncResult result)970 public static void End(IAsyncResult result) 971 { 972 AsyncResult.End<OpenAsyncResult>(result); 973 } 974 } 975 976 class CloseAsyncResult : AsyncResult 977 { 978 static AsyncCallback sharedCallback = Fx.ThunkCallback(new AsyncCallback(SharedCallback)); 979 DatagramChannelDemuxer<TInnerChannel, TInnerItem> channelDemuxer; 980 TimeoutHelper timeoutHelper; 981 bool closedInnerChannel; 982 CloseAsyncResult(DatagramChannelDemuxer<TInnerChannel, TInnerItem> channelDemuxer, TimeSpan timeout, AsyncCallback callback, object state)983 public CloseAsyncResult(DatagramChannelDemuxer<TInnerChannel, TInnerItem> channelDemuxer, TimeSpan timeout, AsyncCallback callback, object state) 984 : base(callback, state) 985 { 986 this.channelDemuxer = channelDemuxer; 987 this.timeoutHelper = new TimeoutHelper(timeout); 988 if (channelDemuxer.innerChannel != null) 989 { 990 bool closeSucceeded = false; 991 try 992 { 993 IAsyncResult result = channelDemuxer.innerChannel.BeginClose(timeoutHelper.RemainingTime(), sharedCallback, this); 994 if (!result.CompletedSynchronously) 995 { 996 closeSucceeded = true; 997 return; 998 } 999 channelDemuxer.innerChannel.EndClose(result); 1000 closeSucceeded = true; 1001 } 1002 finally 1003 { 1004 if (!closeSucceeded) 1005 { 1006 // we should abort the state since calling Abort on the channel demuxer will be a no-op 1007 // due to the reference count being 0 1008 this.channelDemuxer.AbortState(); 1009 } 1010 } 1011 } 1012 if (OnInnerChannelClosed()) 1013 { 1014 Complete(true); 1015 } 1016 } 1017 OnInnerChannelClosed()1018 bool OnInnerChannelClosed() 1019 { 1020 this.closedInnerChannel = true; 1021 bool closeSucceeded = false; 1022 try 1023 { 1024 IAsyncResult result = channelDemuxer.innerListener.BeginClose(timeoutHelper.RemainingTime(), sharedCallback, this); 1025 if (!result.CompletedSynchronously) 1026 { 1027 closeSucceeded = true; 1028 return false; 1029 } 1030 channelDemuxer.innerListener.EndClose(result); 1031 closeSucceeded = true; 1032 } 1033 finally 1034 { 1035 if (!closeSucceeded) 1036 { 1037 // we should abort the state since calling Abort on the channel demuxer will be a no-op 1038 // due to the reference count being 0 1039 channelDemuxer.AbortState(); 1040 } 1041 } 1042 return true; 1043 } 1044 SharedCallback(IAsyncResult result)1045 static void SharedCallback(IAsyncResult result) 1046 { 1047 if (result.CompletedSynchronously) 1048 { 1049 return; 1050 } 1051 CloseAsyncResult self = (CloseAsyncResult)result.AsyncState; 1052 bool completeSelf = false; 1053 Exception completionException = null; 1054 bool closeSucceeded = false; 1055 try 1056 { 1057 if (!self.closedInnerChannel) 1058 { 1059 self.channelDemuxer.innerChannel.EndClose(result); 1060 completeSelf = self.OnInnerChannelClosed(); 1061 closeSucceeded = true; 1062 } 1063 else 1064 { 1065 self.channelDemuxer.innerListener.EndClose(result); 1066 completeSelf = true; 1067 closeSucceeded = true; 1068 } 1069 } 1070 #pragma warning suppress 56500 // covered by FxCOP 1071 catch (Exception e) 1072 { 1073 if (Fx.IsFatal(e)) throw; 1074 completeSelf = true; 1075 completionException = e; 1076 } 1077 finally 1078 { 1079 if (!closeSucceeded) 1080 { 1081 // we should abort the state since calling Abort on the channel demuxer will be a no-op 1082 // due to the reference count being 0 1083 self.channelDemuxer.AbortState(); 1084 } 1085 } 1086 if (completeSelf) 1087 { 1088 self.Complete(false, completionException); 1089 } 1090 } 1091 End(IAsyncResult result)1092 public static void End(IAsyncResult result) 1093 { 1094 AsyncResult.End<CloseAsyncResult>(result); 1095 } 1096 } 1097 } 1098 1099 class InputChannelDemuxer : DatagramChannelDemuxer<IInputChannel, Message> 1100 { InputChannelDemuxer(BindingContext context)1101 public InputChannelDemuxer(BindingContext context) 1102 : base(context) 1103 { 1104 } 1105 AbortItem(Message message)1106 protected override void AbortItem(Message message) 1107 { 1108 AbortMessage(message); 1109 } 1110 BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)1111 protected override IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state) 1112 { 1113 return this.InnerChannel.BeginReceive(timeout, callback, state); 1114 } 1115 CreateListener(ChannelDemuxerFilter filter)1116 protected override LayeredChannelListener<IInputChannel> CreateListener<IInputChannel>(ChannelDemuxerFilter filter) 1117 { 1118 SingletonChannelListener<IInputChannel, InputChannel, Message> listener = new SingletonChannelListener<IInputChannel, InputChannel, Message>(filter, this); 1119 listener.Acceptor = (IChannelAcceptor<IInputChannel>)new InputChannelAcceptor(listener); 1120 return listener; 1121 } 1122 Dispatch(IChannelListener listener)1123 protected override void Dispatch(IChannelListener listener) 1124 { 1125 SingletonChannelListener<IInputChannel, InputChannel, Message> singletonListener = (SingletonChannelListener<IInputChannel, InputChannel, Message>)listener; 1126 singletonListener.Dispatch(); 1127 } 1128 EndpointNotFound(Message message)1129 protected override void EndpointNotFound(Message message) 1130 { 1131 if (this.DemuxFailureHandler != null) 1132 { 1133 this.DemuxFailureHandler.HandleDemuxFailure(message); 1134 } 1135 this.AbortItem(message); 1136 } 1137 EndReceive(IAsyncResult result)1138 protected override Message EndReceive(IAsyncResult result) 1139 { 1140 return this.InnerChannel.EndReceive(result); 1141 } 1142 EnqueueAndDispatch(IChannelListener listener, Message message, Action dequeuedCallback, bool canDispatchOnThisThread)1143 protected override void EnqueueAndDispatch(IChannelListener listener, Message message, Action dequeuedCallback, bool canDispatchOnThisThread) 1144 { 1145 SingletonChannelListener<IInputChannel, InputChannel, Message> singletonListener = (SingletonChannelListener<IInputChannel, InputChannel, Message>)listener; 1146 singletonListener.EnqueueAndDispatch(message, dequeuedCallback, canDispatchOnThisThread); 1147 } 1148 EnqueueAndDispatch(IChannelListener listener, Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread)1149 protected override void EnqueueAndDispatch(IChannelListener listener, Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread) 1150 { 1151 SingletonChannelListener<IInputChannel, InputChannel, Message> singletonListener = (SingletonChannelListener<IInputChannel, InputChannel, Message>)listener; 1152 singletonListener.EnqueueAndDispatch(exception, dequeuedCallback, canDispatchOnThisThread); 1153 } 1154 GetMessage(Message message)1155 protected override Message GetMessage(Message message) 1156 { 1157 return message; 1158 } 1159 } 1160 1161 class DuplexChannelDemuxer : DatagramChannelDemuxer<IDuplexChannel, Message> 1162 { DuplexChannelDemuxer(BindingContext context)1163 public DuplexChannelDemuxer(BindingContext context) 1164 : base(context) 1165 { 1166 } 1167 AbortItem(Message message)1168 protected override void AbortItem(Message message) 1169 { 1170 AbortMessage(message); 1171 } 1172 BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)1173 protected override IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state) 1174 { 1175 return this.InnerChannel.BeginReceive(timeout, callback, state); 1176 } 1177 CreateListener(ChannelDemuxerFilter filter)1178 protected override LayeredChannelListener<IDuplexChannel> CreateListener<IDuplexChannel>(ChannelDemuxerFilter filter) 1179 { 1180 SingletonChannelListener<IDuplexChannel, DuplexChannel, Message> listener = new SingletonChannelListener<IDuplexChannel, DuplexChannel, Message>(filter, this); 1181 listener.Acceptor = (IChannelAcceptor<IDuplexChannel>)new DuplexChannelAcceptor(listener, this); 1182 return listener; 1183 } 1184 Dispatch(IChannelListener listener)1185 protected override void Dispatch(IChannelListener listener) 1186 { 1187 SingletonChannelListener<IDuplexChannel, DuplexChannel, Message> singletonListener = (SingletonChannelListener<IDuplexChannel, DuplexChannel, Message>)listener; 1188 singletonListener.Dispatch(); 1189 } 1190 EndpointNotFound(Message message)1191 protected override void EndpointNotFound(Message message) 1192 { 1193 if (this.DemuxFailureHandler != null) 1194 { 1195 this.DemuxFailureHandler.HandleDemuxFailure(message); 1196 } 1197 this.AbortItem(message); 1198 } 1199 EndReceive(IAsyncResult result)1200 protected override Message EndReceive(IAsyncResult result) 1201 { 1202 return this.InnerChannel.EndReceive(result); 1203 } 1204 EnqueueAndDispatch(IChannelListener listener, Message message, Action dequeuedCallback, bool canDispatchOnThisThread)1205 protected override void EnqueueAndDispatch(IChannelListener listener, Message message, Action dequeuedCallback, bool canDispatchOnThisThread) 1206 { 1207 SingletonChannelListener<IDuplexChannel, DuplexChannel, Message> singletonListener = (SingletonChannelListener<IDuplexChannel, DuplexChannel, Message>)listener; 1208 singletonListener.EnqueueAndDispatch(message, dequeuedCallback, canDispatchOnThisThread); 1209 } 1210 EnqueueAndDispatch(IChannelListener listener, Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread)1211 protected override void EnqueueAndDispatch(IChannelListener listener, Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread) 1212 { 1213 SingletonChannelListener<IDuplexChannel, DuplexChannel, Message> singletonListener = (SingletonChannelListener<IDuplexChannel, DuplexChannel, Message>)listener; 1214 singletonListener.EnqueueAndDispatch(exception, dequeuedCallback, canDispatchOnThisThread); 1215 } 1216 GetMessage(Message message)1217 protected override Message GetMessage(Message message) 1218 { 1219 return message; 1220 } 1221 1222 class DuplexChannelAcceptor : SingletonChannelAcceptor<IDuplexChannel, DuplexChannel, Message> 1223 { 1224 DuplexChannelDemuxer demuxer; 1225 DuplexChannelAcceptor(ChannelManagerBase channelManager, DuplexChannelDemuxer demuxer)1226 public DuplexChannelAcceptor(ChannelManagerBase channelManager, DuplexChannelDemuxer demuxer) 1227 : base(channelManager) 1228 { 1229 this.demuxer = demuxer; 1230 } 1231 OnCreateChannel()1232 protected override DuplexChannel OnCreateChannel() 1233 { 1234 return new DuplexChannelWrapper(this.ChannelManager, demuxer.InnerChannel); 1235 } 1236 OnTraceMessageReceived(Message message)1237 protected override void OnTraceMessageReceived(Message message) 1238 { 1239 if (DiagnosticUtility.ShouldTraceInformation) 1240 { 1241 TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.MessageReceived, SR.GetString(SR.TraceCodeMessageReceived), 1242 MessageTransmitTraceRecord.CreateReceiveTraceRecord(message), this, null); 1243 } 1244 } 1245 } 1246 1247 class DuplexChannelWrapper : DuplexChannel 1248 { 1249 IDuplexChannel innerChannel; 1250 DuplexChannelWrapper(ChannelManagerBase channelManager, IDuplexChannel innerChannel)1251 public DuplexChannelWrapper(ChannelManagerBase channelManager, IDuplexChannel innerChannel) 1252 : base(channelManager, innerChannel.LocalAddress) 1253 { 1254 this.innerChannel = innerChannel; 1255 } 1256 1257 public override EndpointAddress RemoteAddress 1258 { 1259 get { return this.innerChannel.RemoteAddress; } 1260 } 1261 1262 public override Uri Via 1263 { 1264 get { return this.innerChannel.Via; } 1265 } 1266 OnSend(Message message, TimeSpan timeout)1267 protected override void OnSend(Message message, TimeSpan timeout) 1268 { 1269 this.innerChannel.Send(message, timeout); 1270 } 1271 OnBeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)1272 protected override IAsyncResult OnBeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state) 1273 { 1274 return this.innerChannel.BeginSend(message, timeout, callback, state); 1275 } 1276 OnEndSend(IAsyncResult result)1277 protected override void OnEndSend(IAsyncResult result) 1278 { 1279 this.innerChannel.EndSend(result); 1280 } 1281 OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)1282 protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) 1283 { 1284 return new CompletedAsyncResult(callback, state); 1285 } 1286 OnEndOpen(IAsyncResult result)1287 protected override void OnEndOpen(IAsyncResult result) 1288 { 1289 CompletedAsyncResult.End(result); 1290 } 1291 OnOpen(TimeSpan timeout)1292 protected override void OnOpen(TimeSpan timeout) 1293 { 1294 } 1295 } 1296 } 1297 1298 class ReplyChannelDemuxer : DatagramChannelDemuxer<IReplyChannel, RequestContext> 1299 { ReplyChannelDemuxer(BindingContext context)1300 public ReplyChannelDemuxer(BindingContext context) 1301 : base(context) 1302 { 1303 } 1304 AbortItem(RequestContext request)1305 protected override void AbortItem(RequestContext request) 1306 { 1307 AbortMessage(request); 1308 request.Abort(); 1309 } 1310 BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)1311 protected override IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state) 1312 { 1313 return this.InnerChannel.BeginReceiveRequest(timeout, callback, state); 1314 } 1315 CreateListener(ChannelDemuxerFilter filter)1316 protected override LayeredChannelListener<TChannel> CreateListener<TChannel>(ChannelDemuxerFilter filter) 1317 { 1318 if (typeof(TChannel) == typeof(IInputChannel)) 1319 { 1320 SingletonChannelListener<IInputChannel, InputChannel, Message> listener = new SingletonChannelListener<IInputChannel, InputChannel, Message>(filter, this); 1321 listener.Acceptor = (IChannelAcceptor<IInputChannel>)new InputChannelAcceptor(listener); 1322 return (LayeredChannelListener<TChannel>)(object)listener; 1323 } 1324 else if (typeof(TChannel) == typeof(IReplyChannel)) 1325 { 1326 SingletonChannelListener<IReplyChannel, ReplyChannel, RequestContext> listener = new SingletonChannelListener<IReplyChannel, ReplyChannel, RequestContext>(filter, this); 1327 listener.Acceptor = (IChannelAcceptor<IReplyChannel>)new ReplyChannelAcceptor(listener); 1328 return (LayeredChannelListener<TChannel>)(object)listener; 1329 } 1330 else 1331 { 1332 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException()); 1333 } 1334 } 1335 Dispatch(IChannelListener listener)1336 protected override void Dispatch(IChannelListener listener) 1337 { 1338 SingletonChannelListener<IInputChannel, InputChannel, Message> inputListener = listener as SingletonChannelListener<IInputChannel, InputChannel, Message>; 1339 if (inputListener != null) 1340 { 1341 inputListener.Dispatch(); 1342 return; 1343 } 1344 SingletonChannelListener<IReplyChannel, ReplyChannel, RequestContext> replyListener = listener as SingletonChannelListener<IReplyChannel, ReplyChannel, RequestContext>; 1345 if (replyListener != null) 1346 { 1347 replyListener.Dispatch(); 1348 return; 1349 } 1350 1351 throw Fx.AssertAndThrow("ReplyChannelDemuxer.Dispatch (false)"); 1352 } 1353 EndpointNotFoundCallback(IAsyncResult result)1354 void EndpointNotFoundCallback(IAsyncResult result) 1355 { 1356 if (result.CompletedSynchronously) 1357 { 1358 return; 1359 } 1360 RequestContext item = (RequestContext)result.AsyncState; 1361 bool abortItem = true; 1362 try 1363 { 1364 ReplyChannelDemuxFailureAsyncResult.End(result); 1365 abortItem = false; 1366 } 1367 catch (TimeoutException e) 1368 { 1369 if (TD.SendTimeoutIsEnabled()) 1370 { 1371 TD.SendTimeout(e.Message); 1372 } 1373 1374 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 1375 } 1376 catch (CommunicationException e) 1377 { 1378 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 1379 } 1380 catch (ObjectDisposedException e) 1381 { 1382 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 1383 } 1384 catch (Exception e) 1385 { 1386 if (Fx.IsFatal(e)) throw; 1387 this.HandleUnknownException(e); 1388 } 1389 finally 1390 { 1391 if (abortItem) 1392 { 1393 this.AbortItem(item); 1394 } 1395 } 1396 } 1397 EndpointNotFound(RequestContext request)1398 protected override void EndpointNotFound(RequestContext request) 1399 { 1400 bool abortItem = true; 1401 try 1402 { 1403 if (this.DemuxFailureHandler != null) 1404 { 1405 try 1406 { 1407 ReplyChannelDemuxFailureAsyncResult result = new ReplyChannelDemuxFailureAsyncResult(this.DemuxFailureHandler, request, Fx.ThunkCallback(new AsyncCallback(this.EndpointNotFoundCallback)), request); 1408 result.Start(); 1409 if (!result.CompletedSynchronously) 1410 { 1411 abortItem = false; 1412 return; 1413 } 1414 ReplyChannelDemuxFailureAsyncResult.End(result); 1415 abortItem = false; 1416 } 1417 catch (CommunicationException e) 1418 { 1419 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 1420 } 1421 catch (TimeoutException e) 1422 { 1423 if (TD.SendTimeoutIsEnabled()) 1424 { 1425 TD.SendTimeout(e.Message); 1426 } 1427 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 1428 } 1429 catch (ObjectDisposedException e) 1430 { 1431 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 1432 } 1433 catch (Exception e) 1434 { 1435 if (Fx.IsFatal(e)) throw; 1436 this.HandleUnknownException(e); 1437 } 1438 } 1439 } 1440 finally 1441 { 1442 if (abortItem) 1443 { 1444 this.AbortItem(request); 1445 } 1446 } 1447 } 1448 EndReceive(IAsyncResult result)1449 protected override RequestContext EndReceive(IAsyncResult result) 1450 { 1451 return this.InnerChannel.EndReceiveRequest(result); 1452 } 1453 EnqueueAndDispatch(IChannelListener listener, RequestContext request, Action dequeuedCallback, bool canDispatchOnThisThread)1454 protected override void EnqueueAndDispatch(IChannelListener listener, RequestContext request, Action dequeuedCallback, bool canDispatchOnThisThread) 1455 { 1456 SingletonChannelListener<IInputChannel, InputChannel, Message> inputListener = listener as SingletonChannelListener<IInputChannel, InputChannel, Message>; 1457 if (inputListener != null) 1458 { 1459 inputListener.EnqueueAndDispatch(request.RequestMessage, dequeuedCallback, canDispatchOnThisThread); 1460 1461 try 1462 { 1463 request.Close(); 1464 } 1465 catch (CommunicationException e) 1466 { 1467 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 1468 } 1469 catch (TimeoutException e) 1470 { 1471 if (TD.CloseTimeoutIsEnabled()) 1472 { 1473 TD.CloseTimeout(e.Message); 1474 } 1475 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 1476 } 1477 } 1478 SingletonChannelListener<IReplyChannel, ReplyChannel, RequestContext> replyListener = listener as SingletonChannelListener<IReplyChannel, ReplyChannel, RequestContext>; 1479 if (replyListener != null) 1480 { 1481 replyListener.EnqueueAndDispatch(request, dequeuedCallback, canDispatchOnThisThread); 1482 return; 1483 } 1484 1485 throw Fx.AssertAndThrow("ReplyChannelDemuxer.EnqueueAndDispatch (false)"); 1486 } 1487 EnqueueAndDispatch(IChannelListener listener, Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread)1488 protected override void EnqueueAndDispatch(IChannelListener listener, Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread) 1489 { 1490 SingletonChannelListener<IInputChannel, InputChannel, Message> inputListener = listener as SingletonChannelListener<IInputChannel, InputChannel, Message>; 1491 if (inputListener != null) 1492 { 1493 inputListener.EnqueueAndDispatch(exception, dequeuedCallback, canDispatchOnThisThread); 1494 return; 1495 } 1496 1497 SingletonChannelListener<IReplyChannel, ReplyChannel, RequestContext> replyListener = listener as SingletonChannelListener<IReplyChannel, ReplyChannel, RequestContext>; 1498 if (replyListener != null) 1499 { 1500 replyListener.EnqueueAndDispatch(exception, dequeuedCallback, canDispatchOnThisThread); 1501 return; 1502 } 1503 1504 throw Fx.AssertAndThrow("ReplyChannelDemuxer.EnqueueAndDispatch (false)"); 1505 } 1506 GetMessage(RequestContext request)1507 protected override Message GetMessage(RequestContext request) 1508 { 1509 return request.RequestMessage; 1510 } 1511 } 1512 1513 interface IChannelDemuxerFilter 1514 { 1515 ChannelDemuxerFilter Filter { get; } 1516 } 1517 1518 class SingletonChannelListener<TChannel, TQueuedChannel, TQueuedItem> : DelegatingChannelListener<TChannel>, IChannelDemuxerFilter 1519 where TChannel : class, IChannel 1520 where TQueuedChannel : InputQueueChannel<TQueuedItem> 1521 where TQueuedItem : class, IDisposable 1522 { 1523 ChannelDemuxerFilter filter; 1524 IChannelDemuxer channelDemuxer; 1525 SingletonChannelListener(ChannelDemuxerFilter filter, IChannelDemuxer channelDemuxer)1526 public SingletonChannelListener(ChannelDemuxerFilter filter, IChannelDemuxer channelDemuxer) 1527 : base(true) 1528 { 1529 this.filter = filter; 1530 this.channelDemuxer = channelDemuxer; 1531 } 1532 1533 public ChannelDemuxerFilter Filter 1534 { 1535 get { return this.filter; } 1536 } 1537 1538 SingletonChannelAcceptor<TChannel, TQueuedChannel, TQueuedItem> SingletonAcceptor 1539 { 1540 get { return (SingletonChannelAcceptor<TChannel, TQueuedChannel, TQueuedItem>)base.Acceptor; } 1541 set { this.Acceptor = value; } 1542 } 1543 OnOpen(TimeSpan timeout)1544 protected override void OnOpen(TimeSpan timeout) 1545 { 1546 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); 1547 this.channelDemuxer.OnOuterListenerOpen(this.filter, this, timeoutHelper.RemainingTime()); 1548 base.OnOpen(timeoutHelper.RemainingTime()); 1549 } 1550 OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)1551 protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) 1552 { 1553 return new ChainedAsyncResult(timeout, callback, state, this.OnBeginOuterListenerOpen, this.OnEndOuterListenerOpen, base.OnBeginOpen, base.OnEndOpen); 1554 } 1555 OnEndOpen(IAsyncResult result)1556 protected override void OnEndOpen(IAsyncResult result) 1557 { 1558 ChainedAsyncResult.End(result); 1559 } 1560 OnBeginOuterListenerOpen(TimeSpan timeout, AsyncCallback callback, object state)1561 IAsyncResult OnBeginOuterListenerOpen(TimeSpan timeout, AsyncCallback callback, object state) 1562 { 1563 return this.channelDemuxer.OnBeginOuterListenerOpen(this.filter, this, timeout, callback, state); 1564 } 1565 OnEndOuterListenerOpen(IAsyncResult result)1566 void OnEndOuterListenerOpen(IAsyncResult result) 1567 { 1568 this.channelDemuxer.OnEndOuterListenerOpen(result); 1569 } 1570 OnAbort()1571 protected override void OnAbort() 1572 { 1573 this.channelDemuxer.OnOuterListenerAbort(this.filter); 1574 base.OnAbort(); 1575 } 1576 OnClose(TimeSpan timeout)1577 protected override void OnClose(TimeSpan timeout) 1578 { 1579 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); 1580 this.channelDemuxer.OnOuterListenerClose(this.filter, timeoutHelper.RemainingTime()); 1581 base.OnClose(timeoutHelper.RemainingTime()); 1582 } 1583 OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)1584 protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state) 1585 { 1586 return new ChainedAsyncResult(timeout, callback, state, this.OnBeginOuterListenerClose, this.OnEndOuterListenerClose, base.OnBeginClose, base.OnEndClose); 1587 } 1588 OnEndClose(IAsyncResult result)1589 protected override void OnEndClose(IAsyncResult result) 1590 { 1591 ChainedAsyncResult.End(result); 1592 } 1593 OnBeginOuterListenerClose(TimeSpan timeout, AsyncCallback callback, object state)1594 IAsyncResult OnBeginOuterListenerClose(TimeSpan timeout, AsyncCallback callback, object state) 1595 { 1596 return this.channelDemuxer.OnBeginOuterListenerClose(this.filter, timeout, callback, state); 1597 } 1598 OnEndOuterListenerClose(IAsyncResult result)1599 void OnEndOuterListenerClose(IAsyncResult result) 1600 { 1601 this.channelDemuxer.OnEndOuterListenerClose(result); 1602 } 1603 Dispatch()1604 public void Dispatch() 1605 { 1606 this.SingletonAcceptor.DispatchItems(); 1607 } 1608 EnqueueAndDispatch(TQueuedItem item, Action dequeuedCallback, bool canDispatchOnThisThread)1609 public void EnqueueAndDispatch(TQueuedItem item, Action dequeuedCallback, bool canDispatchOnThisThread) 1610 { 1611 this.SingletonAcceptor.EnqueueAndDispatch(item, dequeuedCallback, canDispatchOnThisThread); 1612 } 1613 EnqueueAndDispatch(Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread)1614 public void EnqueueAndDispatch(Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread) 1615 { 1616 this.SingletonAcceptor.EnqueueAndDispatch(exception, dequeuedCallback, canDispatchOnThisThread); 1617 } 1618 } 1619 1620 // 1621 // Session demuxers 1622 // 1623 1624 abstract class SessionChannelDemuxer<TInnerChannel, TInnerItem> : TypedChannelDemuxer, IChannelDemuxer 1625 where TInnerChannel : class, IChannel 1626 where TInnerItem : class, IDisposable 1627 { 1628 IChannelDemuxFailureHandler demuxFailureHandler; 1629 MessageFilterTable<InputQueueChannelListener<TInnerChannel>> filterTable; 1630 IChannelListener<TInnerChannel> innerListener; 1631 static AsyncCallback onAcceptComplete = Fx.ThunkCallback(new AsyncCallback(OnAcceptCompleteStatic)); 1632 static AsyncCallback onPeekComplete = Fx.ThunkCallback(new AsyncCallback(OnPeekCompleteStatic)); 1633 Action onItemDequeued; 1634 static WaitCallback scheduleAcceptStatic = new WaitCallback(ScheduleAcceptStatic); 1635 static Action<object> startAcceptStatic = new Action<object>(StartAcceptStatic); 1636 Action<object> onStartAccepting; 1637 int openCount; 1638 ThreadNeutralSemaphore openSemaphore; 1639 Exception pendingExceptionOnOpen; 1640 bool abortOngoingOpen; 1641 FlowThrottle throttle; 1642 TimeSpan peekTimeout; 1643 SessionChannelDemuxer(BindingContext context, TimeSpan peekTimeout, int maxPendingSessions)1644 public SessionChannelDemuxer(BindingContext context, TimeSpan peekTimeout, int maxPendingSessions) 1645 { 1646 if (context.BindingParameters != null) 1647 { 1648 this.demuxFailureHandler = context.BindingParameters.Find<IChannelDemuxFailureHandler>(); 1649 } 1650 this.innerListener = context.BuildInnerChannelListener<TInnerChannel>(); 1651 this.filterTable = new MessageFilterTable<InputQueueChannelListener<TInnerChannel>>(); 1652 this.openSemaphore = new ThreadNeutralSemaphore(1); 1653 this.peekTimeout = peekTimeout; 1654 this.throttle = new FlowThrottle(scheduleAcceptStatic, maxPendingSessions, null, null); 1655 } 1656 1657 protected object ThisLock 1658 { 1659 get { return this; } 1660 } 1661 1662 protected IChannelDemuxFailureHandler DemuxFailureHandler 1663 { 1664 get { return this.demuxFailureHandler; } 1665 } 1666 1667 Action<object> OnStartAccepting 1668 { 1669 get 1670 { 1671 if (this.onStartAccepting == null) 1672 { 1673 this.onStartAccepting = new Action<object>(OnStartAcceptingCallback); 1674 } 1675 1676 return this.onStartAccepting; 1677 } 1678 } 1679 AbortItem(TInnerItem item)1680 protected abstract void AbortItem(TInnerItem item); BeginReceive(TInnerChannel channel, AsyncCallback callback, object state)1681 protected abstract IAsyncResult BeginReceive(TInnerChannel channel, AsyncCallback callback, object state); BeginReceive(TInnerChannel channel, TimeSpan timeout, AsyncCallback callback, object state)1682 protected abstract IAsyncResult BeginReceive(TInnerChannel channel, TimeSpan timeout, AsyncCallback callback, object state); CreateChannel(ChannelManagerBase channelManager, TInnerChannel innerChannel, TInnerItem firstItem)1683 protected abstract TInnerChannel CreateChannel(ChannelManagerBase channelManager, TInnerChannel innerChannel, TInnerItem firstItem); EndpointNotFound(TInnerChannel channel, TInnerItem item)1684 protected abstract void EndpointNotFound(TInnerChannel channel, TInnerItem item); EndReceive(TInnerChannel channel, IAsyncResult result)1685 protected abstract TInnerItem EndReceive(TInnerChannel channel, IAsyncResult result); GetMessage(TInnerItem item)1686 protected abstract Message GetMessage(TInnerItem item); 1687 BuildChannelListener(ChannelDemuxerFilter filter)1688 public override IChannelListener<TChannel> BuildChannelListener<TChannel>(ChannelDemuxerFilter filter) 1689 { 1690 Fx.Assert(typeof(TChannel) == typeof(TInnerChannel), "SessionChannelDemuxer.BuildChannelListener (typeof(TChannel) == typeof(TInnerChannel))"); 1691 1692 InputQueueChannelListener<TChannel> listener = new InputQueueChannelListener<TChannel>(filter, this); 1693 listener.InnerChannelListener = this.innerListener; 1694 return listener; 1695 } 1696 1697 // return true if another BeginAcceptChannel should pend BeginAcceptChannel(bool requiresThrottle, out IAsyncResult result)1698 bool BeginAcceptChannel(bool requiresThrottle, out IAsyncResult result) 1699 { 1700 result = null; 1701 1702 if (requiresThrottle && !this.throttle.Acquire(this)) 1703 { 1704 return false; 1705 } 1706 1707 bool releaseThrottle = true; 1708 1709 try 1710 { 1711 result = this.innerListener.BeginAcceptChannel(TimeSpan.MaxValue, onAcceptComplete, this); 1712 releaseThrottle = false; 1713 } 1714 catch (CommunicationObjectFaultedException e) 1715 { 1716 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 1717 return false; 1718 } 1719 catch (CommunicationObjectAbortedException e) 1720 { 1721 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 1722 return false; 1723 } 1724 catch (ObjectDisposedException e) 1725 { 1726 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 1727 return false; 1728 } 1729 catch (CommunicationException e) 1730 { 1731 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 1732 return true; 1733 } 1734 catch (TimeoutException e) 1735 { 1736 if (TD.OpenTimeoutIsEnabled()) 1737 { 1738 TD.OpenTimeout(e.Message); 1739 } 1740 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 1741 return true; 1742 } 1743 catch (Exception e) 1744 { 1745 if (Fx.IsFatal(e)) throw; 1746 this.HandleUnknownException(e); 1747 releaseThrottle = false; 1748 return false; 1749 } 1750 finally 1751 { 1752 if (releaseThrottle) 1753 { 1754 this.throttle.Release(); 1755 } 1756 } 1757 1758 return true; 1759 } 1760 EndAcceptChannel(IAsyncResult result, out TInnerChannel channel)1761 bool EndAcceptChannel(IAsyncResult result, out TInnerChannel channel) 1762 { 1763 channel = null; 1764 bool releaseThrottle = true; 1765 try 1766 { 1767 channel = this.innerListener.EndAcceptChannel(result); 1768 releaseThrottle = (channel == null); 1769 } 1770 catch (CommunicationObjectFaultedException e) 1771 { 1772 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 1773 return false; 1774 } 1775 catch (CommunicationObjectAbortedException e) 1776 { 1777 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 1778 return false; 1779 } 1780 catch (ObjectDisposedException e) 1781 { 1782 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 1783 return false; 1784 } 1785 catch (CommunicationException e) 1786 { 1787 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 1788 return true; 1789 } 1790 catch (TimeoutException e) 1791 { 1792 if (TD.OpenTimeoutIsEnabled()) 1793 { 1794 TD.OpenTimeout(e.Message); 1795 } 1796 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 1797 return true; 1798 } 1799 catch (Exception e) 1800 { 1801 if (Fx.IsFatal(e)) throw; 1802 this.HandleUnknownException(e); 1803 releaseThrottle = false; 1804 return false; 1805 } 1806 finally 1807 { 1808 if (releaseThrottle) 1809 { 1810 throttle.Release(); 1811 } 1812 } 1813 1814 return (channel != null); 1815 } 1816 PeekChannel(TInnerChannel channel)1817 void PeekChannel(TInnerChannel channel) 1818 { 1819 bool releaseThrottle = true; 1820 try 1821 { 1822 IAsyncResult peekResult = new PeekAsyncResult(this, channel, onPeekComplete, this); 1823 releaseThrottle = false; 1824 if (!peekResult.CompletedSynchronously) 1825 { 1826 return; 1827 } 1828 channel = null; 1829 this.HandlePeekResult(peekResult); 1830 } 1831 catch (CommunicationException e) 1832 { 1833 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 1834 } 1835 catch (TimeoutException e) 1836 { 1837 if (TD.OpenTimeoutIsEnabled()) 1838 { 1839 TD.OpenTimeout(e.Message); 1840 } 1841 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 1842 } 1843 catch (ObjectDisposedException e) 1844 { 1845 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 1846 } 1847 catch (Exception e) 1848 { 1849 if (Fx.IsFatal(e)) throw; 1850 this.HandleUnknownException(e); 1851 releaseThrottle = false; 1852 } 1853 1854 if (channel != null) 1855 { 1856 channel.Abort(); 1857 } 1858 1859 if (releaseThrottle) 1860 { 1861 this.throttle.Release(); 1862 } 1863 } 1864 HandlePeekResult(IAsyncResult result)1865 void HandlePeekResult(IAsyncResult result) 1866 { 1867 TInnerChannel channel = null; 1868 TInnerItem item; 1869 bool abortChannel = false; 1870 bool releaseThrottle = true; 1871 try 1872 { 1873 PeekAsyncResult.End(result, out channel, out item); 1874 releaseThrottle = (item == null); 1875 } 1876 catch (ObjectDisposedException e) 1877 { 1878 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 1879 abortChannel = true; 1880 return; 1881 } 1882 catch (CommunicationException e) 1883 { 1884 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 1885 abortChannel = true; 1886 return; 1887 } 1888 catch (TimeoutException e) 1889 { 1890 if (TD.OpenTimeoutIsEnabled()) 1891 { 1892 TD.OpenTimeout(e.Message); 1893 } 1894 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 1895 abortChannel = true; 1896 return; 1897 } 1898 catch (Exception e) 1899 { 1900 if (Fx.IsFatal(e)) throw; 1901 this.HandleUnknownException(e); 1902 releaseThrottle = false; 1903 return; 1904 } 1905 finally 1906 { 1907 if (abortChannel && channel != null) 1908 { 1909 channel.Abort(); 1910 } 1911 1912 if (releaseThrottle) 1913 { 1914 this.throttle.Release(); 1915 } 1916 } 1917 1918 if (item != null) 1919 { 1920 releaseThrottle = true; 1921 1922 try 1923 { 1924 this.ProcessItem(channel, item); 1925 releaseThrottle = false; 1926 } 1927 catch (CommunicationException e) 1928 { 1929 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 1930 } 1931 catch (TimeoutException e) 1932 { 1933 if (TD.OpenTimeoutIsEnabled()) 1934 { 1935 TD.OpenTimeout(e.Message); 1936 } 1937 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 1938 } 1939 catch (Exception e) 1940 { 1941 if (Fx.IsFatal(e)) throw; 1942 this.HandleUnknownException(e); 1943 releaseThrottle = false; 1944 } 1945 finally 1946 { 1947 if (releaseThrottle) 1948 { 1949 this.throttle.Release(); 1950 } 1951 } 1952 } 1953 } 1954 MatchListener(Message message)1955 InputQueueChannelListener<TInnerChannel> MatchListener(Message message) 1956 { 1957 InputQueueChannelListener<TInnerChannel> matchingListener = null; 1958 lock (this.ThisLock) 1959 { 1960 if (this.filterTable.GetMatchingValue(message, out matchingListener)) 1961 { 1962 return matchingListener; 1963 } 1964 } 1965 return null; 1966 } 1967 OnAcceptCompleteStatic(IAsyncResult result)1968 static void OnAcceptCompleteStatic(IAsyncResult result) 1969 { 1970 if (result.CompletedSynchronously) 1971 { 1972 return; 1973 } 1974 1975 ((SessionChannelDemuxer<TInnerChannel, TInnerItem>)result.AsyncState).OnStartAcceptingCallback(result); 1976 } 1977 ScheduleAcceptStatic(object state)1978 static void ScheduleAcceptStatic(object state) 1979 { 1980 ActionItem.Schedule(startAcceptStatic, state); 1981 } 1982 StartAcceptStatic(object state)1983 static void StartAcceptStatic(object state) 1984 { 1985 ((SessionChannelDemuxer<TInnerChannel, TInnerItem>)state).StartAccepting(false); 1986 } 1987 ShouldStartAccepting(ChannelDemuxerFilter filter, IChannelListener listener)1988 bool ShouldStartAccepting(ChannelDemuxerFilter filter, IChannelListener listener) 1989 { 1990 lock (this.ThisLock) 1991 { 1992 // the listener's Abort may be racing with Open 1993 if (listener.State == CommunicationState.Closed || listener.State == CommunicationState.Closing) 1994 { 1995 return false; 1996 } 1997 1998 this.filterTable.Add(filter.Filter, (InputQueueChannelListener<TInnerChannel>)(object)listener, filter.Priority); 1999 if (++this.openCount == 1) 2000 { 2001 this.abortOngoingOpen = false; 2002 return true; 2003 } 2004 } 2005 return false; 2006 } 2007 StartAccepting(bool requiresThrottle)2008 void StartAccepting(bool requiresThrottle) 2009 { 2010 IAsyncResult acceptResult; 2011 bool acceptValid = this.BeginAcceptChannel(requiresThrottle, out acceptResult); 2012 if (acceptValid && (acceptResult == null || acceptResult.CompletedSynchronously)) 2013 { 2014 // need to spawn another thread to process this completion 2015 ActionItem.Schedule(OnStartAccepting, acceptResult); 2016 } 2017 } 2018 OnItemDequeued()2019 void OnItemDequeued() 2020 { 2021 this.throttle.Release(); 2022 } 2023 ThrowPendingOpenExceptionIfAny()2024 void ThrowPendingOpenExceptionIfAny() 2025 { 2026 if (this.pendingExceptionOnOpen != null) 2027 { 2028 if (pendingExceptionOnOpen is CommunicationObjectAbortedException) 2029 { 2030 throw DiagnosticUtility.ExceptionUtility.ThrowHelperWarning(new CommunicationObjectAbortedException(SR.GetString(SR.PreviousChannelDemuxerOpenFailed, this.pendingExceptionOnOpen.ToString()))); 2031 } 2032 else if (pendingExceptionOnOpen is CommunicationObjectFaultedException) 2033 { 2034 throw DiagnosticUtility.ExceptionUtility.ThrowHelperWarning(new CommunicationObjectFaultedException(SR.GetString(SR.PreviousChannelDemuxerOpenFailed, this.pendingExceptionOnOpen.ToString()))); 2035 } 2036 else 2037 { 2038 throw DiagnosticUtility.ExceptionUtility.ThrowHelperWarning(new CommunicationException(SR.GetString(SR.PreviousChannelDemuxerOpenFailed, this.pendingExceptionOnOpen.ToString()))); 2039 } 2040 } 2041 } 2042 OnOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout)2043 public void OnOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout) 2044 { 2045 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); 2046 this.openSemaphore.Enter(timeoutHelper.RemainingTime()); 2047 try 2048 { 2049 bool startAccepting = ShouldStartAccepting(filter, listener); 2050 if (startAccepting) 2051 { 2052 try 2053 { 2054 this.innerListener.Open(timeoutHelper.RemainingTime()); 2055 StartAccepting(true); 2056 lock (ThisLock) 2057 { 2058 if (this.abortOngoingOpen) 2059 { 2060 this.innerListener.Abort(); 2061 } 2062 } 2063 } 2064 #pragma warning suppress 56500 // covered by FxCOP 2065 catch (Exception e) 2066 { 2067 this.pendingExceptionOnOpen = e; 2068 throw; 2069 } 2070 } 2071 else 2072 { 2073 this.ThrowPendingOpenExceptionIfAny(); 2074 } 2075 } 2076 finally 2077 { 2078 this.openSemaphore.Exit(); 2079 } 2080 } 2081 2082 OnBeginOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout, AsyncCallback callback, object state)2083 public IAsyncResult OnBeginOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout, AsyncCallback callback, object state) 2084 { 2085 return new OpenAsyncResult(this, filter, listener, timeout, callback, state); 2086 } 2087 OnEndOuterListenerOpen(IAsyncResult result)2088 public void OnEndOuterListenerOpen(IAsyncResult result) 2089 { 2090 OpenAsyncResult.End(result); 2091 } 2092 ShouldCloseInnerListener(ChannelDemuxerFilter filter, bool aborted)2093 bool ShouldCloseInnerListener(ChannelDemuxerFilter filter, bool aborted) 2094 { 2095 lock (this.ThisLock) 2096 { 2097 if (this.filterTable.ContainsKey(filter.Filter)) 2098 { 2099 this.filterTable.Remove(filter.Filter); 2100 if (--this.openCount == 0) 2101 { 2102 if (aborted) 2103 { 2104 this.abortOngoingOpen = true; 2105 } 2106 return true; 2107 } 2108 } 2109 } 2110 return false; 2111 } 2112 OnOuterListenerAbort(ChannelDemuxerFilter filter)2113 public void OnOuterListenerAbort(ChannelDemuxerFilter filter) 2114 { 2115 if (ShouldCloseInnerListener(filter, true)) 2116 { 2117 innerListener.Abort(); 2118 } 2119 } 2120 OnOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout)2121 public void OnOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout) 2122 { 2123 if (ShouldCloseInnerListener(filter, false)) 2124 { 2125 bool closeSucceeded = false; 2126 try 2127 { 2128 innerListener.Close(timeout); 2129 closeSucceeded = true; 2130 } 2131 finally 2132 { 2133 if (!closeSucceeded) 2134 { 2135 // we should abort the state since calling Abort on the channel demuxer will be a no-op 2136 // due to the reference count being 0 2137 innerListener.Abort(); 2138 } 2139 } 2140 } 2141 } 2142 OnBeginOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout, AsyncCallback callback, object state)2143 public IAsyncResult OnBeginOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout, AsyncCallback callback, object state) 2144 { 2145 if (ShouldCloseInnerListener(filter, false)) 2146 { 2147 bool closeSucceeded = false; 2148 try 2149 { 2150 IAsyncResult result = this.innerListener.BeginClose(timeout, callback, state); 2151 closeSucceeded = true; 2152 return result; 2153 } 2154 finally 2155 { 2156 if (!closeSucceeded) 2157 { 2158 // we should abort the state since calling Abort on the channel demuxer will be a no-op 2159 // due to the reference count being 0 2160 this.innerListener.Abort(); 2161 } 2162 } 2163 } 2164 else 2165 { 2166 return new CompletedAsyncResult(callback, state); 2167 } 2168 } 2169 OnEndOuterListenerClose(IAsyncResult result)2170 public void OnEndOuterListenerClose(IAsyncResult result) 2171 { 2172 if (result is CompletedAsyncResult) 2173 { 2174 CompletedAsyncResult.End(result); 2175 } 2176 else 2177 { 2178 bool closeSucceeded = false; 2179 try 2180 { 2181 this.innerListener.EndClose(result); 2182 closeSucceeded = true; 2183 } 2184 finally 2185 { 2186 if (!closeSucceeded) 2187 { 2188 // we should abort the state since calling Abort on the channel demuxer will be a no-op 2189 // due to the reference count being 0 2190 this.innerListener.Abort(); 2191 } 2192 } 2193 } 2194 } 2195 OnStartAcceptingCallback(object state)2196 void OnStartAcceptingCallback(object state) 2197 { 2198 IAsyncResult result = (IAsyncResult)state; 2199 TInnerChannel channel = null; 2200 2201 if (result == null || this.EndAcceptChannel(result, out channel)) 2202 { 2203 this.StartAccepting(channel); 2204 } 2205 } 2206 OnPeekCompleteStatic(IAsyncResult result)2207 static void OnPeekCompleteStatic(IAsyncResult result) 2208 { 2209 if (result.CompletedSynchronously) 2210 { 2211 return; 2212 } 2213 2214 SessionChannelDemuxer<TInnerChannel, TInnerItem> demuxer 2215 = (SessionChannelDemuxer<TInnerChannel, TInnerItem>)result.AsyncState; 2216 2217 bool releaseThrottle = true; 2218 2219 try 2220 { 2221 demuxer.HandlePeekResult(result); 2222 releaseThrottle = false; 2223 } 2224 catch (CommunicationException e) 2225 { 2226 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 2227 } 2228 catch (ObjectDisposedException e) 2229 { 2230 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 2231 } 2232 catch (Exception e) 2233 { 2234 if (Fx.IsFatal(e)) throw; 2235 demuxer.HandleUnknownException(e); 2236 releaseThrottle = false; 2237 } 2238 finally 2239 { 2240 if (releaseThrottle) 2241 { 2242 demuxer.throttle.Release(); 2243 } 2244 } 2245 } 2246 ProcessItem(TInnerChannel channel, TInnerItem item)2247 void ProcessItem(TInnerChannel channel, TInnerItem item) 2248 { 2249 InputQueueChannelListener<TInnerChannel> listener = null; 2250 TInnerChannel wrappedChannel = null; 2251 bool releaseThrottle = true; 2252 2253 try 2254 { 2255 Message message = this.GetMessage(item); 2256 try 2257 { 2258 listener = MatchListener(message); 2259 releaseThrottle = (listener == null); 2260 } 2261 // MatchListener could run the filters against an untrusted message and could throw. 2262 // If so, abort the session 2263 catch (CommunicationException e) 2264 { 2265 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 2266 return; 2267 } 2268 catch (MultipleFilterMatchesException e) 2269 { 2270 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 2271 return; 2272 } 2273 catch (XmlException e) 2274 { 2275 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 2276 return; 2277 } 2278 finally 2279 { 2280 if (releaseThrottle) 2281 { 2282 this.throttle.Release(); 2283 } 2284 } 2285 2286 if (listener == null) 2287 { 2288 try 2289 { 2290 throw TraceUtility.ThrowHelperError(new EndpointNotFoundException(SR.GetString(SR.UnableToDemuxChannel, message.Headers.Action)), message); 2291 } 2292 catch (EndpointNotFoundException e) 2293 { 2294 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 2295 this.EndpointNotFound(channel, item); 2296 // EndpointNotFound is responsible for closing and aborting the channel 2297 channel = null; 2298 item = null; 2299 } 2300 return; 2301 } 2302 2303 wrappedChannel = this.CreateChannel(listener, channel, item); 2304 channel = null; 2305 item = null; 2306 } 2307 finally 2308 { 2309 if (item != null) 2310 { 2311 this.AbortItem(item); 2312 } 2313 if (channel != null) 2314 { 2315 channel.Abort(); 2316 } 2317 } 2318 2319 bool enqueueSucceeded = false; 2320 try 2321 { 2322 if (this.onItemDequeued == null) 2323 { 2324 this.onItemDequeued = new Action(this.OnItemDequeued); 2325 } 2326 2327 listener.InputQueueAcceptor.EnqueueAndDispatch(wrappedChannel, this.onItemDequeued, false); 2328 enqueueSucceeded = true; 2329 } 2330 catch (Exception e) 2331 { 2332 if (Fx.IsFatal(e)) throw; 2333 this.HandleUnknownException(e); 2334 } 2335 finally 2336 { 2337 if (!enqueueSucceeded) 2338 { 2339 this.throttle.Release(); 2340 wrappedChannel.Abort(); 2341 } 2342 } 2343 } 2344 HandleUnknownException(Exception exception)2345 protected void HandleUnknownException(Exception exception) 2346 { 2347 InputQueueChannelListener<TInnerChannel> listener = null; 2348 2349 lock (this.ThisLock) 2350 { 2351 if (this.filterTable.Count > 0) 2352 { 2353 KeyValuePair<MessageFilter, InputQueueChannelListener<TInnerChannel>>[] pairs = new KeyValuePair<MessageFilter, InputQueueChannelListener<TInnerChannel>>[this.filterTable.Count]; 2354 this.filterTable.CopyTo(pairs, 0); 2355 listener = pairs[0].Value; 2356 2357 if (this.onItemDequeued == null) 2358 { 2359 this.onItemDequeued = new Action(OnItemDequeued); 2360 } 2361 2362 listener.InputQueueAcceptor.EnqueueAndDispatch(exception, this.onItemDequeued, false); 2363 } 2364 } 2365 } 2366 StartAccepting(TInnerChannel channelToPeek)2367 void StartAccepting(TInnerChannel channelToPeek) 2368 { 2369 for (;;) 2370 { 2371 IAsyncResult result; 2372 bool acceptValid = this.BeginAcceptChannel(true, out result); 2373 2374 if (channelToPeek != null) 2375 { 2376 if (acceptValid && (result == null || result.CompletedSynchronously)) 2377 { 2378 // need to spawn another thread to process this completion 2379 // since we're going to process channelToPeek on this thread 2380 ActionItem.Schedule(OnStartAccepting, result); 2381 } 2382 2383 PeekChannel(channelToPeek); 2384 return; 2385 } 2386 else 2387 { 2388 if (!acceptValid) 2389 { 2390 return; // we're done, listener is toast 2391 } 2392 2393 if (result == null) 2394 { 2395 continue; 2396 } 2397 2398 if (!result.CompletedSynchronously) 2399 { 2400 return; 2401 } 2402 2403 if (!this.EndAcceptChannel(result, out channelToPeek)) 2404 { 2405 return; 2406 } 2407 } 2408 } 2409 } 2410 2411 class PeekAsyncResult : AsyncResult 2412 { 2413 TInnerChannel channel; 2414 SessionChannelDemuxer<TInnerChannel, TInnerItem> demuxer; 2415 TInnerItem item; 2416 static AsyncCallback onOpenComplete = Fx.ThunkCallback(new AsyncCallback(OnOpenCompleteStatic)); 2417 static AsyncCallback onReceiveComplete = Fx.ThunkCallback(new AsyncCallback(OnReceiveCompleteStatic)); 2418 PeekAsyncResult(SessionChannelDemuxer<TInnerChannel, TInnerItem> demuxer, TInnerChannel channel, AsyncCallback callback, object state)2419 public PeekAsyncResult(SessionChannelDemuxer<TInnerChannel, TInnerItem> demuxer, TInnerChannel channel, AsyncCallback callback, object state) 2420 : base(callback, state) 2421 { 2422 this.demuxer = demuxer; 2423 this.channel = channel; 2424 IAsyncResult result = this.channel.BeginOpen(onOpenComplete, this); 2425 if (!result.CompletedSynchronously) 2426 { 2427 return; 2428 } 2429 if (this.HandleOpenComplete(result)) 2430 { 2431 this.Complete(true); 2432 } 2433 } 2434 End(IAsyncResult result, out TInnerChannel channel, out TInnerItem item)2435 public static void End(IAsyncResult result, out TInnerChannel channel, out TInnerItem item) 2436 { 2437 PeekAsyncResult peekResult = AsyncResult.End<PeekAsyncResult>(result); 2438 channel = peekResult.channel; 2439 item = peekResult.item; 2440 } 2441 HandleOpenComplete(IAsyncResult result)2442 bool HandleOpenComplete(IAsyncResult result) 2443 { 2444 this.channel.EndOpen(result); 2445 2446 IAsyncResult receiveResult; 2447 2448 if (this.demuxer.peekTimeout == ChannelDemuxer.UseDefaultReceiveTimeout) 2449 { 2450 //use the default ReceiveTimeout for the channel 2451 receiveResult = this.demuxer.BeginReceive(this.channel, onReceiveComplete, this); 2452 } 2453 else 2454 { 2455 receiveResult = this.demuxer.BeginReceive(this.channel, this.demuxer.peekTimeout, onReceiveComplete, this); 2456 } 2457 2458 if (receiveResult.CompletedSynchronously) 2459 { 2460 this.HandleReceiveComplete(receiveResult); 2461 return true; 2462 } 2463 2464 return false; 2465 } 2466 OnOpenCompleteStatic(IAsyncResult result)2467 static void OnOpenCompleteStatic(IAsyncResult result) 2468 { 2469 if (result.CompletedSynchronously) 2470 return; 2471 2472 PeekAsyncResult peekAsyncResult = (PeekAsyncResult)result.AsyncState; 2473 2474 bool completeSelf = false; 2475 Exception exception = null; 2476 2477 try 2478 { 2479 completeSelf = peekAsyncResult.HandleOpenComplete(result); 2480 } 2481 catch (Exception e) 2482 { 2483 if (Fx.IsFatal(e)) 2484 { 2485 throw; 2486 } 2487 exception = e; 2488 completeSelf = true; 2489 } 2490 2491 if (completeSelf) 2492 { 2493 peekAsyncResult.Complete(false, exception); 2494 } 2495 } 2496 HandleReceiveComplete(IAsyncResult result)2497 void HandleReceiveComplete(IAsyncResult result) 2498 { 2499 this.item = this.demuxer.EndReceive(this.channel, result); 2500 } 2501 OnReceiveCompleteStatic(IAsyncResult result)2502 static void OnReceiveCompleteStatic(IAsyncResult result) 2503 { 2504 if (result.CompletedSynchronously) 2505 return; 2506 2507 PeekAsyncResult peekAsyncResult = (PeekAsyncResult)result.AsyncState; 2508 Exception exception = null; 2509 2510 try 2511 { 2512 peekAsyncResult.HandleReceiveComplete(result); 2513 } 2514 catch (Exception e) 2515 { 2516 if (Fx.IsFatal(e)) 2517 { 2518 throw; 2519 } 2520 exception = e; 2521 } 2522 2523 peekAsyncResult.Complete(false, exception); 2524 } 2525 } 2526 2527 class OpenAsyncResult : AsyncResult 2528 { 2529 static FastAsyncCallback waitOverCallback = new FastAsyncCallback(WaitOverCallback); 2530 static AsyncCallback openListenerCallback = Fx.ThunkCallback(new AsyncCallback(OpenListenerCallback)); 2531 SessionChannelDemuxer<TInnerChannel, TInnerItem> channelDemuxer; 2532 ChannelDemuxerFilter filter; 2533 IChannelListener listener; 2534 TimeoutHelper timeoutHelper; 2535 bool startAccepting; 2536 OpenAsyncResult(SessionChannelDemuxer<TInnerChannel, TInnerItem> channelDemuxer, ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout, AsyncCallback callback, object state)2537 public OpenAsyncResult(SessionChannelDemuxer<TInnerChannel, TInnerItem> channelDemuxer, ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout, AsyncCallback callback, object state) 2538 : base(callback, state) 2539 { 2540 this.channelDemuxer = channelDemuxer; 2541 this.filter = filter; 2542 this.listener = listener; 2543 this.timeoutHelper = new TimeoutHelper(timeout); 2544 if (!this.channelDemuxer.openSemaphore.EnterAsync(this.timeoutHelper.RemainingTime(), waitOverCallback, this)) 2545 { 2546 return; 2547 } 2548 2549 bool waitOverSucceeded = false; 2550 bool completeSelf = false; 2551 try 2552 { 2553 completeSelf = this.OnWaitOver(); 2554 waitOverSucceeded = true; 2555 } 2556 finally 2557 { 2558 if (!waitOverSucceeded) 2559 { 2560 Cleanup(); 2561 } 2562 } 2563 if (completeSelf) 2564 { 2565 Cleanup(); 2566 Complete(true); 2567 } 2568 } 2569 WaitOverCallback(object state, Exception asyncException)2570 static void WaitOverCallback(object state, Exception asyncException) 2571 { 2572 OpenAsyncResult self = (OpenAsyncResult)state; 2573 bool completeSelf = false; 2574 Exception completionException = asyncException; 2575 if (completionException != null) 2576 { 2577 completeSelf = true; 2578 } 2579 else 2580 { 2581 try 2582 { 2583 completeSelf = self.OnWaitOver(); 2584 } 2585 #pragma warning suppress 56500 // covered by FxCOP 2586 catch (Exception e) 2587 { 2588 if (Fx.IsFatal(e)) throw; 2589 completeSelf = true; 2590 completionException = e; 2591 } 2592 } 2593 2594 if (completeSelf) 2595 { 2596 self.Cleanup(); 2597 self.Complete(false, completionException); 2598 } 2599 } 2600 OnWaitOver()2601 bool OnWaitOver() 2602 { 2603 this.startAccepting = this.channelDemuxer.ShouldStartAccepting(this.filter, this.listener); 2604 if (!this.startAccepting) 2605 { 2606 this.channelDemuxer.ThrowPendingOpenExceptionIfAny(); 2607 return true; 2608 } 2609 else 2610 { 2611 return this.OnStartAccepting(); 2612 } 2613 } 2614 OnEndInnerListenerOpen(IAsyncResult result)2615 void OnEndInnerListenerOpen(IAsyncResult result) 2616 { 2617 this.channelDemuxer.innerListener.EndOpen(result); 2618 this.channelDemuxer.StartAccepting(true); 2619 lock (this.channelDemuxer.ThisLock) 2620 { 2621 if (this.channelDemuxer.abortOngoingOpen) 2622 { 2623 this.channelDemuxer.innerListener.Abort(); 2624 } 2625 } 2626 } 2627 OnStartAccepting()2628 bool OnStartAccepting() 2629 { 2630 try 2631 { 2632 IAsyncResult result = this.channelDemuxer.innerListener.BeginOpen(timeoutHelper.RemainingTime(), openListenerCallback, this); 2633 if (!result.CompletedSynchronously) 2634 { 2635 return false; 2636 } 2637 this.OnEndInnerListenerOpen(result); 2638 return true; 2639 } 2640 #pragma warning suppress 56500 // covered by FxCOP 2641 catch (Exception e) 2642 { 2643 this.channelDemuxer.pendingExceptionOnOpen = e; 2644 throw; 2645 } 2646 } 2647 OpenListenerCallback(IAsyncResult result)2648 static void OpenListenerCallback(IAsyncResult result) 2649 { 2650 if (result.CompletedSynchronously) 2651 { 2652 return; 2653 } 2654 OpenAsyncResult self = (OpenAsyncResult)result.AsyncState; 2655 Exception completionException = null; 2656 try 2657 { 2658 self.OnEndInnerListenerOpen(result); 2659 } 2660 #pragma warning suppress 56500 // covered by FxCOP 2661 catch (Exception e) 2662 { 2663 if (Fx.IsFatal(e)) throw; 2664 completionException = e; 2665 } 2666 if (completionException != null) 2667 { 2668 self.channelDemuxer.pendingExceptionOnOpen = completionException; 2669 } 2670 self.Cleanup(); 2671 self.Complete(false, completionException); 2672 } 2673 Cleanup()2674 void Cleanup() 2675 { 2676 this.channelDemuxer.openSemaphore.Exit(); 2677 } 2678 End(IAsyncResult result)2679 public static void End(IAsyncResult result) 2680 { 2681 AsyncResult.End<OpenAsyncResult>(result); 2682 } 2683 } 2684 } 2685 2686 class InputSessionChannelDemuxer : SessionChannelDemuxer<IInputSessionChannel, Message> 2687 { InputSessionChannelDemuxer(BindingContext context, TimeSpan peekTimeout, int maxPendingSessions)2688 public InputSessionChannelDemuxer(BindingContext context, TimeSpan peekTimeout, int maxPendingSessions) 2689 : base(context, peekTimeout, maxPendingSessions) 2690 { 2691 } 2692 AbortItem(Message message)2693 protected override void AbortItem(Message message) 2694 { 2695 AbortMessage(message); 2696 } 2697 BeginReceive(IInputSessionChannel channel, AsyncCallback callback, object state)2698 protected override IAsyncResult BeginReceive(IInputSessionChannel channel, AsyncCallback callback, object state) 2699 { 2700 return channel.BeginReceive(callback, state); 2701 } 2702 BeginReceive(IInputSessionChannel channel, TimeSpan timeout, AsyncCallback callback, object state)2703 protected override IAsyncResult BeginReceive(IInputSessionChannel channel, TimeSpan timeout, AsyncCallback callback, object state) 2704 { 2705 return channel.BeginReceive(timeout, callback, state); 2706 } 2707 CreateChannel(ChannelManagerBase channelManager, IInputSessionChannel innerChannel, Message firstMessage)2708 protected override IInputSessionChannel CreateChannel(ChannelManagerBase channelManager, IInputSessionChannel innerChannel, Message firstMessage) 2709 { 2710 return new InputSessionChannelWrapper(channelManager, innerChannel, firstMessage); 2711 } 2712 EndpointNotFound(IInputSessionChannel channel, Message message)2713 protected override void EndpointNotFound(IInputSessionChannel channel, Message message) 2714 { 2715 if (this.DemuxFailureHandler != null) 2716 { 2717 this.DemuxFailureHandler.HandleDemuxFailure(message); 2718 } 2719 this.AbortItem(message); 2720 channel.Abort(); 2721 } 2722 EndReceive(IInputSessionChannel channel, IAsyncResult result)2723 protected override Message EndReceive(IInputSessionChannel channel, IAsyncResult result) 2724 { 2725 return channel.EndReceive(result); 2726 } 2727 GetMessage(Message message)2728 protected override Message GetMessage(Message message) 2729 { 2730 return message; 2731 } 2732 } 2733 2734 class InputSessionChannelWrapper : InputChannelWrapper, IInputSessionChannel 2735 { InputSessionChannelWrapper(ChannelManagerBase channelManager, IInputSessionChannel innerChannel, Message firstMessage)2736 public InputSessionChannelWrapper(ChannelManagerBase channelManager, IInputSessionChannel innerChannel, Message firstMessage) 2737 : base(channelManager, innerChannel, firstMessage) 2738 { 2739 } 2740 2741 new IInputSessionChannel InnerChannel 2742 { 2743 get { return (IInputSessionChannel)base.InnerChannel; } 2744 } 2745 2746 public IInputSession Session 2747 { 2748 get { return this.InnerChannel.Session; } 2749 } 2750 } 2751 2752 class DuplexSessionChannelDemuxer : SessionChannelDemuxer<IDuplexSessionChannel, Message> 2753 { DuplexSessionChannelDemuxer(BindingContext context, TimeSpan peekTimeout, int maxPendingSessions)2754 public DuplexSessionChannelDemuxer(BindingContext context, TimeSpan peekTimeout, int maxPendingSessions) 2755 : base(context, peekTimeout, maxPendingSessions) 2756 { 2757 } 2758 AbortItem(Message message)2759 protected override void AbortItem(Message message) 2760 { 2761 AbortMessage(message); 2762 } 2763 BeginReceive(IDuplexSessionChannel channel, AsyncCallback callback, object state)2764 protected override IAsyncResult BeginReceive(IDuplexSessionChannel channel, AsyncCallback callback, object state) 2765 { 2766 return channel.BeginReceive(callback, state); 2767 } 2768 BeginReceive(IDuplexSessionChannel channel, TimeSpan timeout, AsyncCallback callback, object state)2769 protected override IAsyncResult BeginReceive(IDuplexSessionChannel channel, TimeSpan timeout, AsyncCallback callback, object state) 2770 { 2771 return channel.BeginReceive(timeout, callback, state); 2772 } 2773 CreateChannel(ChannelManagerBase channelManager, IDuplexSessionChannel innerChannel, Message firstMessage)2774 protected override IDuplexSessionChannel CreateChannel(ChannelManagerBase channelManager, IDuplexSessionChannel innerChannel, Message firstMessage) 2775 { 2776 return new DuplexSessionChannelWrapper(channelManager, innerChannel, firstMessage); 2777 } 2778 EndpointNotFoundCallback(IAsyncResult result)2779 void EndpointNotFoundCallback(IAsyncResult result) 2780 { 2781 if (result.CompletedSynchronously) 2782 { 2783 return; 2784 } 2785 ChannelAndMessageAsyncState channelAndMessage = (ChannelAndMessageAsyncState)result.AsyncState; 2786 bool doAbort = true; 2787 try 2788 { 2789 DuplexSessionDemuxFailureAsyncResult.End(result); 2790 doAbort = false; 2791 } 2792 catch (TimeoutException e) 2793 { 2794 if (TD.SendTimeoutIsEnabled()) 2795 { 2796 TD.SendTimeout(e.Message); 2797 } 2798 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 2799 } 2800 catch (CommunicationException e) 2801 { 2802 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 2803 } 2804 catch (ObjectDisposedException e) 2805 { 2806 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 2807 } 2808 catch (Exception e) 2809 { 2810 if (Fx.IsFatal(e)) throw; 2811 this.HandleUnknownException(e); 2812 } 2813 finally 2814 { 2815 if (doAbort) 2816 { 2817 this.AbortItem(channelAndMessage.message); 2818 channelAndMessage.channel.Abort(); 2819 } 2820 } 2821 } 2822 EndpointNotFound(IDuplexSessionChannel channel, Message message)2823 protected override void EndpointNotFound(IDuplexSessionChannel channel, Message message) 2824 { 2825 bool doAbort = true; 2826 try 2827 { 2828 if (this.DemuxFailureHandler != null) 2829 { 2830 try 2831 { 2832 DuplexSessionDemuxFailureAsyncResult result = new DuplexSessionDemuxFailureAsyncResult(this.DemuxFailureHandler, channel, message, Fx.ThunkCallback(new AsyncCallback(this.EndpointNotFoundCallback)), new ChannelAndMessageAsyncState(channel, message)); 2833 result.Start(); 2834 if (!result.CompletedSynchronously) 2835 { 2836 doAbort = false; 2837 return; 2838 } 2839 DuplexSessionDemuxFailureAsyncResult.End(result); 2840 doAbort = false; 2841 } 2842 catch (CommunicationException e) 2843 { 2844 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 2845 } 2846 catch (TimeoutException e) 2847 { 2848 if (TD.SendTimeoutIsEnabled()) 2849 { 2850 TD.SendTimeout(e.Message); 2851 } 2852 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 2853 } 2854 catch (ObjectDisposedException e) 2855 { 2856 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 2857 } 2858 catch (Exception e) 2859 { 2860 if (Fx.IsFatal(e)) throw; 2861 this.HandleUnknownException(e); 2862 } 2863 } 2864 } 2865 finally 2866 { 2867 if (doAbort) 2868 { 2869 this.AbortItem(message); 2870 channel.Abort(); 2871 } 2872 } 2873 } 2874 EndReceive(IDuplexSessionChannel channel, IAsyncResult result)2875 protected override Message EndReceive(IDuplexSessionChannel channel, IAsyncResult result) 2876 { 2877 return channel.EndReceive(result); 2878 } 2879 GetMessage(Message message)2880 protected override Message GetMessage(Message message) 2881 { 2882 return message; 2883 } 2884 2885 struct ChannelAndMessageAsyncState 2886 { 2887 public IChannel channel; 2888 public Message message; 2889 ChannelAndMessageAsyncStateSystem.ServiceModel.Channels.DuplexSessionChannelDemuxer.ChannelAndMessageAsyncState2890 public ChannelAndMessageAsyncState(IChannel channel, Message message) 2891 { 2892 this.channel = channel; 2893 this.message = message; 2894 } 2895 } 2896 } 2897 2898 class DuplexSessionChannelWrapper : InputChannelWrapper, IDuplexSessionChannel 2899 { DuplexSessionChannelWrapper(ChannelManagerBase channelManager, IDuplexSessionChannel innerChannel, Message firstMessage)2900 public DuplexSessionChannelWrapper(ChannelManagerBase channelManager, IDuplexSessionChannel innerChannel, Message firstMessage) 2901 : base(channelManager, innerChannel, firstMessage) 2902 { 2903 } 2904 2905 new IDuplexSessionChannel InnerChannel 2906 { 2907 get { return (IDuplexSessionChannel)base.InnerChannel; } 2908 } 2909 2910 public IDuplexSession Session 2911 { 2912 get { return InnerChannel.Session; } 2913 } 2914 2915 public EndpointAddress RemoteAddress 2916 { 2917 get { return InnerChannel.RemoteAddress; } 2918 } 2919 2920 public Uri Via 2921 { 2922 get { return InnerChannel.Via; } 2923 } 2924 Send(Message message)2925 public void Send(Message message) 2926 { 2927 this.InnerChannel.Send(message); 2928 } 2929 Send(Message message, TimeSpan timeout)2930 public void Send(Message message, TimeSpan timeout) 2931 { 2932 this.InnerChannel.Send(message, timeout); 2933 } 2934 BeginSend(Message message, AsyncCallback callback, object state)2935 public IAsyncResult BeginSend(Message message, AsyncCallback callback, object state) 2936 { 2937 return this.InnerChannel.BeginSend(message, callback, state); 2938 } 2939 BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)2940 public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state) 2941 { 2942 return this.InnerChannel.BeginSend(message, timeout, callback, state); 2943 } 2944 EndSend(IAsyncResult result)2945 public void EndSend(IAsyncResult result) 2946 { 2947 this.InnerChannel.EndSend(result); 2948 } 2949 } 2950 2951 class ReplySessionChannelDemuxer : SessionChannelDemuxer<IReplySessionChannel, RequestContext> 2952 { ReplySessionChannelDemuxer(BindingContext context, TimeSpan peekTimeout, int maxPendingSessions)2953 public ReplySessionChannelDemuxer(BindingContext context, TimeSpan peekTimeout, int maxPendingSessions) 2954 : base(context, peekTimeout, maxPendingSessions) 2955 { 2956 } 2957 AbortItem(RequestContext request)2958 protected override void AbortItem(RequestContext request) 2959 { 2960 AbortMessage(request); 2961 request.Abort(); 2962 } 2963 BeginReceive(IReplySessionChannel channel, AsyncCallback callback, object state)2964 protected override IAsyncResult BeginReceive(IReplySessionChannel channel, AsyncCallback callback, object state) 2965 { 2966 return channel.BeginReceiveRequest(callback, state); 2967 } 2968 BeginReceive(IReplySessionChannel channel, TimeSpan timeout, AsyncCallback callback, object state)2969 protected override IAsyncResult BeginReceive(IReplySessionChannel channel, TimeSpan timeout, AsyncCallback callback, object state) 2970 { 2971 return channel.BeginReceiveRequest(timeout, callback, state); 2972 } 2973 CreateChannel(ChannelManagerBase channelManager, IReplySessionChannel innerChannel, RequestContext firstRequest)2974 protected override IReplySessionChannel CreateChannel(ChannelManagerBase channelManager, IReplySessionChannel innerChannel, RequestContext firstRequest) 2975 { 2976 return new ReplySessionChannelWrapper(channelManager, innerChannel, firstRequest); 2977 } 2978 EndpointNotFoundCallback(IAsyncResult result)2979 void EndpointNotFoundCallback(IAsyncResult result) 2980 { 2981 if (result.CompletedSynchronously) 2982 { 2983 return; 2984 } 2985 ChannelAndRequestAsyncState channelAndRequest = (ChannelAndRequestAsyncState)result.AsyncState; 2986 bool doAbort = true; 2987 try 2988 { 2989 ReplySessionDemuxFailureAsyncResult.End(result); 2990 doAbort = false; 2991 } 2992 catch (TimeoutException e) 2993 { 2994 if (TD.SendTimeoutIsEnabled()) 2995 { 2996 TD.SendTimeout(e.Message); 2997 } 2998 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 2999 } 3000 catch (CommunicationException e) 3001 { 3002 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 3003 } 3004 catch (ObjectDisposedException e) 3005 { 3006 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 3007 } 3008 catch (Exception e) 3009 { 3010 if (Fx.IsFatal(e)) throw; 3011 this.HandleUnknownException(e); 3012 } 3013 finally 3014 { 3015 if (doAbort) 3016 { 3017 this.AbortItem(channelAndRequest.request); 3018 channelAndRequest.channel.Abort(); 3019 } 3020 } 3021 } 3022 EndpointNotFound(IReplySessionChannel channel, RequestContext request)3023 protected override void EndpointNotFound(IReplySessionChannel channel, RequestContext request) 3024 { 3025 bool doAbort = true; 3026 try 3027 { 3028 if (this.DemuxFailureHandler != null) 3029 { 3030 try 3031 { 3032 ReplySessionDemuxFailureAsyncResult result = new ReplySessionDemuxFailureAsyncResult(this.DemuxFailureHandler, request, channel, Fx.ThunkCallback(new AsyncCallback(this.EndpointNotFoundCallback)), new ChannelAndRequestAsyncState(channel, request)); 3033 result.Start(); 3034 if (!result.CompletedSynchronously) 3035 { 3036 doAbort = false; 3037 return; 3038 } 3039 ReplySessionDemuxFailureAsyncResult.End(result); 3040 doAbort = false; 3041 } 3042 catch (CommunicationException e) 3043 { 3044 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 3045 } 3046 catch (TimeoutException e) 3047 { 3048 if (TD.SendTimeoutIsEnabled()) 3049 { 3050 TD.SendTimeout(e.Message); 3051 } 3052 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 3053 } 3054 catch (ObjectDisposedException e) 3055 { 3056 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 3057 } 3058 catch (Exception e) 3059 { 3060 if (Fx.IsFatal(e)) throw; 3061 this.HandleUnknownException(e); 3062 } 3063 } 3064 } 3065 finally 3066 { 3067 if (doAbort) 3068 { 3069 this.AbortItem(request); 3070 channel.Abort(); 3071 } 3072 } 3073 } 3074 EndReceive(IReplySessionChannel channel, IAsyncResult result)3075 protected override RequestContext EndReceive(IReplySessionChannel channel, IAsyncResult result) 3076 { 3077 return channel.EndReceiveRequest(result); 3078 } 3079 GetMessage(RequestContext request)3080 protected override Message GetMessage(RequestContext request) 3081 { 3082 return request.RequestMessage; 3083 } 3084 3085 struct ChannelAndRequestAsyncState 3086 { 3087 public IChannel channel; 3088 public RequestContext request; 3089 ChannelAndRequestAsyncStateSystem.ServiceModel.Channels.ReplySessionChannelDemuxer.ChannelAndRequestAsyncState3090 public ChannelAndRequestAsyncState(IChannel channel, RequestContext request) 3091 { 3092 this.channel = channel; 3093 this.request = request; 3094 } 3095 } 3096 } 3097 3098 class ReplySessionChannelWrapper : ReplyChannelWrapper, IReplySessionChannel 3099 { ReplySessionChannelWrapper(ChannelManagerBase channelManager, IReplySessionChannel innerChannel, RequestContext firstRequest)3100 public ReplySessionChannelWrapper(ChannelManagerBase channelManager, IReplySessionChannel innerChannel, RequestContext firstRequest) 3101 : base(channelManager, innerChannel, firstRequest) 3102 { 3103 } 3104 3105 new IReplySessionChannel InnerChannel 3106 { 3107 get { return (IReplySessionChannel)base.InnerChannel; } 3108 } 3109 3110 public IInputSession Session 3111 { 3112 get { return this.InnerChannel.Session; } 3113 } 3114 } 3115 3116 abstract class ChannelWrapper<TChannel, TItem> : LayeredChannel<TChannel> 3117 where TChannel : class, IChannel 3118 where TItem : class, IDisposable 3119 { 3120 TItem firstItem; 3121 ChannelWrapper(ChannelManagerBase channelManager, TChannel innerChannel, TItem firstItem)3122 public ChannelWrapper(ChannelManagerBase channelManager, TChannel innerChannel, TItem firstItem) 3123 : base(channelManager, innerChannel) 3124 { 3125 this.firstItem = firstItem; 3126 } 3127 CloseFirstItem(TimeSpan timeout)3128 protected abstract void CloseFirstItem(TimeSpan timeout); 3129 GetFirstItem()3130 protected TItem GetFirstItem() 3131 { 3132 return Interlocked.Exchange<TItem>(ref this.firstItem, null); 3133 } 3134 HaveFirstItem()3135 protected bool HaveFirstItem() 3136 { 3137 return (this.firstItem != null); 3138 } 3139 OnAbort()3140 protected override void OnAbort() 3141 { 3142 base.OnAbort(); 3143 this.CloseFirstItem(TimeSpan.Zero); 3144 } 3145 OnClose(TimeSpan timeout)3146 protected override void OnClose(TimeSpan timeout) 3147 { 3148 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); 3149 this.CloseFirstItem(timeoutHelper.RemainingTime()); 3150 base.OnClose(timeoutHelper.RemainingTime()); 3151 } 3152 OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)3153 protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state) 3154 { 3155 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); 3156 this.CloseFirstItem(timeoutHelper.RemainingTime()); 3157 return base.OnBeginClose(timeoutHelper.RemainingTime(), callback, state); 3158 } 3159 OnEndClose(IAsyncResult result)3160 protected override void OnEndClose(IAsyncResult result) 3161 { 3162 base.OnEndClose(result); 3163 } 3164 3165 protected class ReceiveAsyncResult : AsyncResult 3166 { 3167 TItem item; 3168 ReceiveAsyncResult(TItem item, AsyncCallback callback, object state)3169 public ReceiveAsyncResult(TItem item, AsyncCallback callback, object state) 3170 : base(callback, state) 3171 { 3172 this.item = item; 3173 this.Complete(true); 3174 } 3175 End(IAsyncResult result)3176 public static TItem End(IAsyncResult result) 3177 { 3178 ReceiveAsyncResult receiveResult = AsyncResult.End<ReceiveAsyncResult>(result); 3179 return receiveResult.item; 3180 } 3181 } 3182 3183 protected class WaitAsyncResult : AsyncResult 3184 { WaitAsyncResult(AsyncCallback callback, object state)3185 public WaitAsyncResult(AsyncCallback callback, object state) 3186 : base(callback, state) 3187 { 3188 this.Complete(true); 3189 } 3190 End(IAsyncResult result)3191 public static bool End(IAsyncResult result) 3192 { 3193 WaitAsyncResult waitResult = AsyncResult.End<WaitAsyncResult>(result); 3194 return true; 3195 } 3196 } 3197 } 3198 3199 class InputChannelWrapper : ChannelWrapper<IInputChannel, Message>, IInputChannel 3200 { InputChannelWrapper(ChannelManagerBase channelManager, IInputChannel innerChannel, Message firstMessage)3201 public InputChannelWrapper(ChannelManagerBase channelManager, IInputChannel innerChannel, Message firstMessage) 3202 : base(channelManager, innerChannel, firstMessage) 3203 { 3204 } 3205 3206 public EndpointAddress LocalAddress 3207 { 3208 get { return this.InnerChannel.LocalAddress; } 3209 } 3210 OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)3211 protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) 3212 { 3213 return new CompletedAsyncResult(callback, state); 3214 } 3215 OnEndOpen(IAsyncResult result)3216 protected override void OnEndOpen(IAsyncResult result) 3217 { 3218 CompletedAsyncResult.End(result); 3219 } 3220 OnOpen(TimeSpan timeout)3221 protected override void OnOpen(TimeSpan timeout) 3222 { 3223 } 3224 CloseFirstItem(TimeSpan timeout)3225 protected override void CloseFirstItem(TimeSpan timeout) 3226 { 3227 Message message = this.GetFirstItem(); 3228 if (message != null) 3229 { 3230 TypedChannelDemuxer.AbortMessage(message); 3231 } 3232 } 3233 Receive()3234 public Message Receive() 3235 { 3236 Message message = this.GetFirstItem(); 3237 if (message != null) 3238 return message; 3239 return this.InnerChannel.Receive(); 3240 } 3241 Receive(TimeSpan timeout)3242 public Message Receive(TimeSpan timeout) 3243 { 3244 Message message = this.GetFirstItem(); 3245 if (message != null) 3246 return message; 3247 return this.InnerChannel.Receive(timeout); 3248 } 3249 BeginReceive(AsyncCallback callback, object state)3250 public IAsyncResult BeginReceive(AsyncCallback callback, object state) 3251 { 3252 Message message = this.GetFirstItem(); 3253 if (message != null) 3254 return new ReceiveAsyncResult(message, callback, state); 3255 return this.InnerChannel.BeginReceive(callback, state); 3256 } 3257 BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)3258 public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state) 3259 { 3260 Message message = this.GetFirstItem(); 3261 if (message != null) 3262 return new ReceiveAsyncResult(message, callback, state); 3263 return this.InnerChannel.BeginReceive(timeout, callback, state); 3264 } 3265 EndReceive(IAsyncResult result)3266 public Message EndReceive(IAsyncResult result) 3267 { 3268 if (result is ReceiveAsyncResult) 3269 return ReceiveAsyncResult.End(result); 3270 return this.InnerChannel.EndReceive(result); 3271 } 3272 TryReceive(TimeSpan timeout, out Message message)3273 public bool TryReceive(TimeSpan timeout, out Message message) 3274 { 3275 message = this.GetFirstItem(); 3276 if (message != null) 3277 return true; 3278 return this.InnerChannel.TryReceive(timeout, out message); 3279 } 3280 BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)3281 public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state) 3282 { 3283 Message message = this.GetFirstItem(); 3284 if (message != null) 3285 return new ReceiveAsyncResult(message, callback, state); 3286 return this.InnerChannel.BeginTryReceive(timeout, callback, state); 3287 } 3288 EndTryReceive(IAsyncResult result, out Message message)3289 public bool EndTryReceive(IAsyncResult result, out Message message) 3290 { 3291 if (result is ReceiveAsyncResult) 3292 { 3293 message = ReceiveAsyncResult.End(result); 3294 return true; 3295 } 3296 return this.InnerChannel.EndTryReceive(result, out message); 3297 } 3298 WaitForMessage(TimeSpan timeout)3299 public bool WaitForMessage(TimeSpan timeout) 3300 { 3301 if (this.HaveFirstItem()) 3302 return true; 3303 return this.InnerChannel.WaitForMessage(timeout); 3304 } 3305 BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)3306 public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state) 3307 { 3308 if (this.HaveFirstItem()) 3309 return new WaitAsyncResult(callback, state); 3310 return this.InnerChannel.BeginWaitForMessage(timeout, callback, state); 3311 } 3312 EndWaitForMessage(IAsyncResult result)3313 public bool EndWaitForMessage(IAsyncResult result) 3314 { 3315 if (result is WaitAsyncResult) 3316 return WaitAsyncResult.End(result); 3317 return this.InnerChannel.EndWaitForMessage(result); 3318 } 3319 } 3320 3321 class ReplyChannelWrapper : ChannelWrapper<IReplyChannel, RequestContext>, IReplyChannel 3322 { ReplyChannelWrapper(ChannelManagerBase channelManager, IReplyChannel innerChannel, RequestContext firstRequest)3323 public ReplyChannelWrapper(ChannelManagerBase channelManager, IReplyChannel innerChannel, RequestContext firstRequest) 3324 : base(channelManager, innerChannel, firstRequest) 3325 { 3326 } 3327 3328 public EndpointAddress LocalAddress 3329 { 3330 get { return this.InnerChannel.LocalAddress; } 3331 } 3332 OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)3333 protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) 3334 { 3335 return new CompletedAsyncResult(callback, state); 3336 } 3337 OnEndOpen(IAsyncResult result)3338 protected override void OnEndOpen(IAsyncResult result) 3339 { 3340 CompletedAsyncResult.End(result); 3341 } 3342 OnOpen(TimeSpan timeout)3343 protected override void OnOpen(TimeSpan timeout) 3344 { 3345 } 3346 CloseFirstItem(TimeSpan timeout)3347 protected override void CloseFirstItem(TimeSpan timeout) 3348 { 3349 RequestContext request = this.GetFirstItem(); 3350 if (request != null) 3351 { 3352 try 3353 { 3354 request.RequestMessage.Close(); 3355 request.Close(timeout); 3356 } 3357 catch (CommunicationException e) 3358 { 3359 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 3360 } 3361 catch (TimeoutException e) 3362 { 3363 if (TD.CloseTimeoutIsEnabled()) 3364 { 3365 TD.CloseTimeout(e.Message); 3366 } 3367 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information); 3368 } 3369 } 3370 } 3371 ReceiveRequest()3372 public RequestContext ReceiveRequest() 3373 { 3374 RequestContext request = this.GetFirstItem(); 3375 if (request != null) 3376 return request; 3377 return this.InnerChannel.ReceiveRequest(); 3378 } 3379 ReceiveRequest(TimeSpan timeout)3380 public RequestContext ReceiveRequest(TimeSpan timeout) 3381 { 3382 RequestContext request = this.GetFirstItem(); 3383 if (request != null) 3384 return request; 3385 return this.InnerChannel.ReceiveRequest(timeout); 3386 } 3387 BeginReceiveRequest(AsyncCallback callback, object state)3388 public IAsyncResult BeginReceiveRequest(AsyncCallback callback, object state) 3389 { 3390 RequestContext request = this.GetFirstItem(); 3391 if (request != null) 3392 return new ReceiveAsyncResult(request, callback, state); 3393 return this.InnerChannel.BeginReceiveRequest(callback, state); 3394 } 3395 BeginReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)3396 public IAsyncResult BeginReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state) 3397 { 3398 RequestContext request = this.GetFirstItem(); 3399 if (request != null) 3400 return new ReceiveAsyncResult(request, callback, state); 3401 return this.InnerChannel.BeginReceiveRequest(timeout, callback, state); 3402 } 3403 EndReceiveRequest(IAsyncResult result)3404 public RequestContext EndReceiveRequest(IAsyncResult result) 3405 { 3406 if (result is ReceiveAsyncResult) 3407 return ReceiveAsyncResult.End(result); 3408 return this.InnerChannel.EndReceiveRequest(result); 3409 } 3410 TryReceiveRequest(TimeSpan timeout, out RequestContext request)3411 public bool TryReceiveRequest(TimeSpan timeout, out RequestContext request) 3412 { 3413 request = this.GetFirstItem(); 3414 if (request != null) 3415 return true; 3416 return this.InnerChannel.TryReceiveRequest(timeout, out request); 3417 } 3418 BeginTryReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)3419 public IAsyncResult BeginTryReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state) 3420 { 3421 RequestContext request = this.GetFirstItem(); 3422 if (request != null) 3423 return new ReceiveAsyncResult(request, callback, state); 3424 return this.InnerChannel.BeginTryReceiveRequest(timeout, callback, state); 3425 } 3426 EndTryReceiveRequest(IAsyncResult result, out RequestContext request)3427 public bool EndTryReceiveRequest(IAsyncResult result, out RequestContext request) 3428 { 3429 if (result is ReceiveAsyncResult) 3430 { 3431 request = ReceiveAsyncResult.End(result); 3432 return true; 3433 } 3434 return this.InnerChannel.EndTryReceiveRequest(result, out request); 3435 } 3436 WaitForRequest(TimeSpan timeout)3437 public bool WaitForRequest(TimeSpan timeout) 3438 { 3439 if (this.HaveFirstItem()) 3440 return true; 3441 return this.InnerChannel.WaitForRequest(timeout); 3442 } 3443 BeginWaitForRequest(TimeSpan timeout, AsyncCallback callback, object state)3444 public IAsyncResult BeginWaitForRequest(TimeSpan timeout, AsyncCallback callback, object state) 3445 { 3446 if (this.HaveFirstItem()) 3447 return new WaitAsyncResult(callback, state); 3448 return this.InnerChannel.BeginWaitForRequest(timeout, callback, state); 3449 } 3450 EndWaitForRequest(IAsyncResult result)3451 public bool EndWaitForRequest(IAsyncResult result) 3452 { 3453 if (result is WaitAsyncResult) 3454 return WaitAsyncResult.End(result); 3455 return this.InnerChannel.EndWaitForRequest(result); 3456 } 3457 } 3458 3459 class InputQueueChannelListener<TChannel> : DelegatingChannelListener<TChannel> 3460 where TChannel : class, IChannel 3461 { 3462 ChannelDemuxerFilter filter; 3463 IChannelDemuxer channelDemuxer; 3464 InputQueueChannelListener(ChannelDemuxerFilter filter, IChannelDemuxer channelDemuxer)3465 public InputQueueChannelListener(ChannelDemuxerFilter filter, IChannelDemuxer channelDemuxer) 3466 : base(true) 3467 { 3468 this.filter = filter; 3469 this.channelDemuxer = channelDemuxer; 3470 this.Acceptor = new InputQueueChannelAcceptor<TChannel>(this); 3471 } 3472 3473 public ChannelDemuxerFilter Filter 3474 { 3475 get { return this.filter; } 3476 } 3477 3478 public InputQueueChannelAcceptor<TChannel> InputQueueAcceptor 3479 { 3480 get { return (InputQueueChannelAcceptor<TChannel>)base.Acceptor; } 3481 } 3482 OnOpen(TimeSpan timeout)3483 protected override void OnOpen(TimeSpan timeout) 3484 { 3485 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); 3486 this.channelDemuxer.OnOuterListenerOpen(this.filter, this, timeoutHelper.RemainingTime()); 3487 base.OnOpen(timeoutHelper.RemainingTime()); 3488 } 3489 OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)3490 protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) 3491 { 3492 return new ChainedAsyncResult(timeout, callback, state, this.OnBeginOuterListenerOpen, this.OnEndOuterListenerOpen, base.OnBeginOpen, base.OnEndOpen); 3493 } 3494 OnEndOpen(IAsyncResult result)3495 protected override void OnEndOpen(IAsyncResult result) 3496 { 3497 ChainedAsyncResult.End(result); 3498 } 3499 OnBeginOuterListenerOpen(TimeSpan timeout, AsyncCallback callback, object state)3500 IAsyncResult OnBeginOuterListenerOpen(TimeSpan timeout, AsyncCallback callback, object state) 3501 { 3502 return this.channelDemuxer.OnBeginOuterListenerOpen(this.filter, this, timeout, callback, state); 3503 } 3504 OnEndOuterListenerOpen(IAsyncResult result)3505 void OnEndOuterListenerOpen(IAsyncResult result) 3506 { 3507 this.channelDemuxer.OnEndOuterListenerOpen(result); 3508 } 3509 OnAbort()3510 protected override void OnAbort() 3511 { 3512 this.channelDemuxer.OnOuterListenerAbort(this.filter); 3513 base.OnAbort(); 3514 } 3515 OnClose(TimeSpan timeout)3516 protected override void OnClose(TimeSpan timeout) 3517 { 3518 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); 3519 this.channelDemuxer.OnOuterListenerClose(this.filter, timeoutHelper.RemainingTime()); 3520 base.OnClose(timeoutHelper.RemainingTime()); 3521 } 3522 OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)3523 protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state) 3524 { 3525 return new ChainedAsyncResult(timeout, callback, state, this.OnBeginOuterListenerClose, this.OnEndOuterListenerClose, base.OnBeginClose, base.OnEndClose); 3526 } 3527 OnEndClose(IAsyncResult result)3528 protected override void OnEndClose(IAsyncResult result) 3529 { 3530 ChainedAsyncResult.End(result); 3531 } 3532 OnBeginOuterListenerClose(TimeSpan timeout, AsyncCallback callback, object state)3533 IAsyncResult OnBeginOuterListenerClose(TimeSpan timeout, AsyncCallback callback, object state) 3534 { 3535 return this.channelDemuxer.OnBeginOuterListenerClose(this.filter, timeout, callback, state); 3536 } 3537 OnEndOuterListenerClose(IAsyncResult result)3538 void OnEndOuterListenerClose(IAsyncResult result) 3539 { 3540 this.channelDemuxer.OnEndOuterListenerClose(result); 3541 } 3542 } 3543 3544 // 3545 // Binding element 3546 // 3547 3548 class ChannelDemuxerBindingElement : BindingElement 3549 { 3550 ChannelDemuxer demuxer; 3551 CachedBindingContextState cachedContextState; 3552 bool cacheContextState; 3553 ChannelDemuxerBindingElement(bool cacheContextState)3554 public ChannelDemuxerBindingElement(bool cacheContextState) 3555 { 3556 this.cacheContextState = cacheContextState; 3557 if (cacheContextState) 3558 { 3559 this.cachedContextState = new CachedBindingContextState(); 3560 } 3561 this.demuxer = new ChannelDemuxer(); 3562 } 3563 ChannelDemuxerBindingElement(ChannelDemuxerBindingElement element)3564 public ChannelDemuxerBindingElement(ChannelDemuxerBindingElement element) 3565 { 3566 this.demuxer = element.demuxer; 3567 this.cacheContextState = element.cacheContextState; 3568 this.cachedContextState = element.cachedContextState; 3569 } 3570 3571 public TimeSpan PeekTimeout 3572 { 3573 get 3574 { 3575 return this.demuxer.PeekTimeout; 3576 } 3577 set 3578 { 3579 if (value < TimeSpan.Zero && value != ChannelDemuxer.UseDefaultReceiveTimeout) 3580 { 3581 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException("value")); 3582 } 3583 3584 this.demuxer.PeekTimeout = value; 3585 } 3586 } 3587 3588 public int MaxPendingSessions 3589 { 3590 get 3591 { 3592 return this.demuxer.MaxPendingSessions; 3593 } 3594 set 3595 { 3596 if (value < 1) 3597 { 3598 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException(SR.GetString(SR.ValueMustBeGreaterThanZero))); 3599 } 3600 3601 this.demuxer.MaxPendingSessions = value; 3602 } 3603 } 3604 SubstituteCachedBindingContextParametersIfNeeded(BindingContext context)3605 void SubstituteCachedBindingContextParametersIfNeeded(BindingContext context) 3606 { 3607 if (!this.cacheContextState) 3608 { 3609 return; 3610 } 3611 if (!this.cachedContextState.IsStateCached) 3612 { 3613 foreach (object parameter in context.BindingParameters) 3614 { 3615 this.cachedContextState.CachedBindingParameters.Add(parameter); 3616 } 3617 this.cachedContextState.IsStateCached = true; 3618 } 3619 else 3620 { 3621 context.BindingParameters.Clear(); 3622 foreach (object parameter in this.cachedContextState.CachedBindingParameters) 3623 { 3624 context.BindingParameters.Add(parameter); 3625 } 3626 } 3627 } 3628 BuildChannelFactory(BindingContext context)3629 public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context) 3630 { 3631 if (context == null) 3632 throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("context"); 3633 3634 SubstituteCachedBindingContextParametersIfNeeded(context); 3635 return context.BuildInnerChannelFactory<TChannel>(); 3636 } 3637 3638 BuildChannelListener(BindingContext context)3639 public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context) 3640 { 3641 if (context == null) 3642 throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("context"); 3643 ChannelDemuxerFilter filter = context.BindingParameters.Remove<ChannelDemuxerFilter>(); 3644 SubstituteCachedBindingContextParametersIfNeeded(context); 3645 if (filter == null) 3646 return demuxer.BuildChannelListener<TChannel>(context); 3647 else 3648 return demuxer.BuildChannelListener<TChannel>(context, filter); 3649 } 3650 CanBuildChannelFactory(BindingContext context)3651 public override bool CanBuildChannelFactory<TChannel>(BindingContext context) 3652 { 3653 if (context == null) 3654 throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("context"); 3655 3656 return context.CanBuildInnerChannelFactory<TChannel>(); 3657 } 3658 CanBuildChannelListener(BindingContext context)3659 public override bool CanBuildChannelListener<TChannel>(BindingContext context) 3660 { 3661 if (context == null) 3662 throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("context"); 3663 3664 return context.CanBuildInnerChannelListener<TChannel>(); 3665 } 3666 Clone()3667 public override BindingElement Clone() 3668 { 3669 return new ChannelDemuxerBindingElement(this); 3670 } 3671 GetProperty(BindingContext context)3672 public override T GetProperty<T>(BindingContext context) 3673 { 3674 // augment the context with cached binding parameters 3675 if (this.cacheContextState && this.cachedContextState.IsStateCached) 3676 { 3677 for (int i = 0; i < this.cachedContextState.CachedBindingParameters.Count; ++i) 3678 { 3679 if (!context.BindingParameters.Contains(this.cachedContextState.CachedBindingParameters[i].GetType())) 3680 { 3681 context.BindingParameters.Add(this.cachedContextState.CachedBindingParameters[i]); 3682 } 3683 } 3684 } 3685 return context.GetInnerProperty<T>(); 3686 } 3687 3688 class CachedBindingContextState 3689 { 3690 public bool IsStateCached; 3691 public BindingParameterCollection CachedBindingParameters; 3692 CachedBindingContextState()3693 public CachedBindingContextState() 3694 { 3695 CachedBindingParameters = new BindingParameterCollection(); 3696 } 3697 } 3698 } 3699 3700 // 3701 // Demuxer filter 3702 // 3703 3704 class ChannelDemuxerFilter 3705 { 3706 MessageFilter filter; 3707 int priority; 3708 ChannelDemuxerFilter(MessageFilter filter, int priority)3709 public ChannelDemuxerFilter(MessageFilter filter, int priority) 3710 { 3711 this.filter = filter; 3712 this.priority = priority; 3713 } 3714 3715 public MessageFilter Filter 3716 { 3717 get { return this.filter; } 3718 } 3719 3720 public int Priority 3721 { 3722 get { return this.priority; } 3723 } 3724 } 3725 3726 class ReplyChannelDemuxFailureAsyncResult : AsyncResult 3727 { 3728 static AsyncCallback demuxFailureHandlerCallback = Fx.ThunkCallback(new AsyncCallback(DemuxFailureHandlerCallback)); 3729 IChannelDemuxFailureHandler demuxFailureHandler; 3730 RequestContext requestContext; 3731 ReplyChannelDemuxFailureAsyncResult(IChannelDemuxFailureHandler demuxFailureHandler, RequestContext requestContext, AsyncCallback callback, object state)3732 public ReplyChannelDemuxFailureAsyncResult(IChannelDemuxFailureHandler demuxFailureHandler, RequestContext requestContext, AsyncCallback callback, object state) 3733 : base(callback, state) 3734 { 3735 if (demuxFailureHandler == null) 3736 { 3737 throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("demuxFailureHandler"); 3738 } 3739 if (requestContext == null) 3740 { 3741 throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("requestContext"); 3742 } 3743 this.demuxFailureHandler = demuxFailureHandler; 3744 this.requestContext = requestContext; 3745 } 3746 Start()3747 public void Start() 3748 { 3749 IAsyncResult result = this.demuxFailureHandler.BeginHandleDemuxFailure(requestContext.RequestMessage, requestContext, demuxFailureHandlerCallback, this); 3750 if (!result.CompletedSynchronously) 3751 { 3752 return; 3753 } 3754 this.demuxFailureHandler.EndHandleDemuxFailure(result); 3755 if (this.OnDemuxFailureHandled()) 3756 { 3757 Complete(true); 3758 } 3759 } 3760 OnDemuxFailureHandled()3761 protected virtual bool OnDemuxFailureHandled() 3762 { 3763 requestContext.Close(); 3764 return true; 3765 } 3766 DemuxFailureHandlerCallback(IAsyncResult result)3767 static void DemuxFailureHandlerCallback(IAsyncResult result) 3768 { 3769 if (result.CompletedSynchronously) 3770 { 3771 return; 3772 } 3773 ReplyChannelDemuxFailureAsyncResult self = (ReplyChannelDemuxFailureAsyncResult)(result.AsyncState); 3774 bool completeSelf = false; 3775 Exception completionException = null; 3776 try 3777 { 3778 self.demuxFailureHandler.EndHandleDemuxFailure(result); 3779 completeSelf = self.OnDemuxFailureHandled(); 3780 } 3781 #pragma warning suppress 56500 // covered by FxCOP 3782 catch (Exception e) 3783 { 3784 if (Fx.IsFatal(e)) throw; 3785 completeSelf = true; 3786 completionException = e; 3787 } 3788 if (completeSelf) 3789 { 3790 self.Complete(false, completionException); 3791 } 3792 } 3793 End(IAsyncResult result)3794 public static void End(IAsyncResult result) 3795 { 3796 AsyncResult.End<ReplyChannelDemuxFailureAsyncResult>(result); 3797 } 3798 } 3799 3800 class ReplySessionDemuxFailureAsyncResult : ReplyChannelDemuxFailureAsyncResult 3801 { 3802 static AsyncCallback closeChannelCallback = Fx.ThunkCallback(new AsyncCallback(ChannelCloseCallback)); 3803 IReplySessionChannel channel; 3804 ReplySessionDemuxFailureAsyncResult(IChannelDemuxFailureHandler demuxFailureHandler, RequestContext requestContext, IReplySessionChannel channel, AsyncCallback callback, object state)3805 public ReplySessionDemuxFailureAsyncResult(IChannelDemuxFailureHandler demuxFailureHandler, RequestContext requestContext, IReplySessionChannel channel, AsyncCallback callback, object state) 3806 : base(demuxFailureHandler, requestContext, callback, state) 3807 { 3808 this.channel = channel; 3809 } 3810 OnDemuxFailureHandled()3811 protected override bool OnDemuxFailureHandled() 3812 { 3813 base.OnDemuxFailureHandled(); 3814 IAsyncResult result = this.channel.BeginClose(closeChannelCallback, this); 3815 if (!result.CompletedSynchronously) 3816 { 3817 return false; 3818 } 3819 this.channel.EndClose(result); 3820 return true; 3821 } 3822 ChannelCloseCallback(IAsyncResult result)3823 static void ChannelCloseCallback(IAsyncResult result) 3824 { 3825 if (result.CompletedSynchronously) 3826 { 3827 return; 3828 } 3829 ReplySessionDemuxFailureAsyncResult self = (ReplySessionDemuxFailureAsyncResult)result.AsyncState; 3830 Exception completionException = null; 3831 try 3832 { 3833 self.channel.EndClose(result); 3834 } 3835 #pragma warning suppress 56500 // covered by FxCOP 3836 catch (Exception e) 3837 { 3838 if (Fx.IsFatal(e)) throw; 3839 completionException = e; 3840 } 3841 self.Complete(false, completionException); 3842 } 3843 End(IAsyncResult result)3844 public static new void End(IAsyncResult result) 3845 { 3846 AsyncResult.End<ReplySessionDemuxFailureAsyncResult>(result); 3847 } 3848 } 3849 3850 class DuplexSessionDemuxFailureAsyncResult : AsyncResult 3851 { 3852 static AsyncCallback demuxFailureHandlerCallback = Fx.ThunkCallback(new AsyncCallback(DemuxFailureHandlerCallback)); 3853 static AsyncCallback channelCloseCallback = Fx.ThunkCallback(new AsyncCallback(ChannelCloseCallback)); 3854 IChannelDemuxFailureHandler demuxFailureHandler; 3855 IDuplexSessionChannel channel; 3856 Message message; 3857 DuplexSessionDemuxFailureAsyncResult(IChannelDemuxFailureHandler demuxFailureHandler, IDuplexSessionChannel channel, Message message, AsyncCallback callback, object state)3858 public DuplexSessionDemuxFailureAsyncResult(IChannelDemuxFailureHandler demuxFailureHandler, IDuplexSessionChannel channel, Message message, AsyncCallback callback, object state) 3859 : base(callback, state) 3860 { 3861 if (demuxFailureHandler == null) 3862 { 3863 throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("demuxFailureHandler"); 3864 } 3865 if (channel == null) 3866 { 3867 throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("channel"); 3868 } 3869 this.demuxFailureHandler = demuxFailureHandler; 3870 this.channel = channel; 3871 this.message = message; 3872 } 3873 Start()3874 public void Start() 3875 { 3876 IAsyncResult result = this.demuxFailureHandler.BeginHandleDemuxFailure(this.message, this.channel, demuxFailureHandlerCallback, this); 3877 if (!result.CompletedSynchronously) 3878 { 3879 return; 3880 } 3881 this.demuxFailureHandler.EndHandleDemuxFailure(result); 3882 if (this.OnDemuxFailureHandled()) 3883 { 3884 Complete(true); 3885 } 3886 } 3887 OnDemuxFailureHandled()3888 bool OnDemuxFailureHandled() 3889 { 3890 IAsyncResult result = this.channel.BeginClose(channelCloseCallback, this); 3891 if (!result.CompletedSynchronously) 3892 { 3893 return false; 3894 } 3895 this.channel.EndClose(result); 3896 this.message.Close(); 3897 return true; 3898 } 3899 DemuxFailureHandlerCallback(IAsyncResult result)3900 static void DemuxFailureHandlerCallback(IAsyncResult result) 3901 { 3902 if (result.CompletedSynchronously) 3903 { 3904 return; 3905 } 3906 DuplexSessionDemuxFailureAsyncResult self = (DuplexSessionDemuxFailureAsyncResult)result.AsyncState; 3907 bool completeSelf = false; 3908 Exception completionException = null; 3909 try 3910 { 3911 self.demuxFailureHandler.EndHandleDemuxFailure(result); 3912 completeSelf = self.OnDemuxFailureHandled(); 3913 } 3914 #pragma warning suppress 56500 // covered by FxCOP 3915 catch (Exception e) 3916 { 3917 if (Fx.IsFatal(e)) throw; 3918 completeSelf = true; 3919 completionException = e; 3920 } 3921 if (completeSelf) 3922 { 3923 self.Complete(false, completionException); 3924 } 3925 } 3926 ChannelCloseCallback(IAsyncResult result)3927 static void ChannelCloseCallback(IAsyncResult result) 3928 { 3929 if (result.CompletedSynchronously) 3930 { 3931 return; 3932 } 3933 DuplexSessionDemuxFailureAsyncResult self = (DuplexSessionDemuxFailureAsyncResult)result.AsyncState; 3934 Exception completionException = null; 3935 try 3936 { 3937 self.channel.EndClose(result); 3938 self.message.Close(); 3939 } 3940 #pragma warning suppress 56500 // covered by FxCOP 3941 catch (Exception e) 3942 { 3943 if (Fx.IsFatal(e)) throw; 3944 completionException = e; 3945 } 3946 self.Complete(false, completionException); 3947 } 3948 End(IAsyncResult result)3949 public static void End(IAsyncResult result) 3950 { 3951 AsyncResult.End<DuplexSessionDemuxFailureAsyncResult>(result); 3952 } 3953 } 3954 3955 interface IChannelDemuxFailureHandler 3956 { HandleDemuxFailure(Message message)3957 void HandleDemuxFailure(Message message); 3958 BeginHandleDemuxFailure(Message message, RequestContext faultContext, AsyncCallback callback, object state)3959 IAsyncResult BeginHandleDemuxFailure(Message message, RequestContext faultContext, AsyncCallback callback, object state); BeginHandleDemuxFailure(Message message, IOutputChannel faultContext, AsyncCallback callback, object state)3960 IAsyncResult BeginHandleDemuxFailure(Message message, IOutputChannel faultContext, AsyncCallback callback, object state); EndHandleDemuxFailure(IAsyncResult result)3961 void EndHandleDemuxFailure(IAsyncResult result); 3962 } 3963 } 3964