1 //-----------------------------------------------------------------------------
2 // Copyright (c) Microsoft Corporation.  All rights reserved.
3 //-----------------------------------------------------------------------------
4 
5 namespace System.ServiceModel.Dispatcher
6 {
7     using System;
8     using System.Diagnostics;
9     using System.Globalization;
10     using System.Runtime;
11     using System.Runtime.CompilerServices;
12     using System.Runtime.Diagnostics;
13     using System.ServiceModel;
14     using System.ServiceModel.Activation;
15     using System.ServiceModel.Channels;
16     using System.ServiceModel.Description;
17     using System.ServiceModel.Diagnostics;
18     using System.ServiceModel.Diagnostics.Application;
19     using System.Threading;
20     using System.Transactions;
21     using System.Xml;
22     using SessionIdleManager = System.ServiceModel.Channels.ServiceChannel.SessionIdleManager;
23 
24     class ChannelHandler
25     {
26         public static readonly TimeSpan CloseAfterFaultTimeout = TimeSpan.FromSeconds(10);
27         public const string MessageBufferPropertyName = "_RequestMessageBuffer_";
28 
29         readonly IChannelBinder binder;
30         readonly DuplexChannelBinder duplexBinder;
31         readonly ServiceHostBase host;
32         readonly bool incrementedActivityCountInConstructor;
33         readonly bool isCallback;
34         readonly ListenerHandler listener;
35         readonly ServiceThrottle throttle;
36         readonly bool wasChannelThrottled;
37         readonly SessionIdleManager idleManager;
38         readonly bool sendAsynchronously;
39 
40         static AsyncCallback onAsyncReplyComplete = Fx.ThunkCallback(new AsyncCallback(ChannelHandler.OnAsyncReplyComplete));
41         static AsyncCallback onAsyncReceiveComplete = Fx.ThunkCallback(new AsyncCallback(ChannelHandler.OnAsyncReceiveComplete));
42         static Action<object> onContinueAsyncReceive = new Action<object>(ChannelHandler.OnContinueAsyncReceive);
43         static Action<object> onStartSyncMessagePump = new Action<object>(ChannelHandler.OnStartSyncMessagePump);
44         static Action<object> onStartAsyncMessagePump = new Action<object>(ChannelHandler.OnStartAsyncMessagePump);
45         static Action<object> onStartSingleTransactedBatch = new Action<object>(ChannelHandler.OnStartSingleTransactedBatch);
46         static Action<object> openAndEnsurePump = new Action<object>(ChannelHandler.OpenAndEnsurePump);
47 
48         RequestInfo requestInfo;
49         ServiceChannel channel;
50         bool doneReceiving;
51         bool hasRegisterBeenCalled;
52         bool hasSession;
53         int isPumpAcquired;
54         bool isChannelTerminated;
55         bool isConcurrent;
56         bool isManualAddressing;
57         MessageVersion messageVersion;
58         ErrorHandlingReceiver receiver;
59         bool receiveSynchronously;
60         bool receiveWithTransaction;
61         RequestContext replied;
62         RequestContext requestWaitingForThrottle;
63         WrappedTransaction acceptTransaction;
64         ServiceThrottle instanceContextThrottle;
65         SharedTransactedBatchContext sharedTransactedBatchContext;
66         TransactedBatchContext transactedBatchContext;
67         bool isMainTransactedBatchHandler;
68         EventTraceActivity eventTraceActivity;
69         SessionOpenNotification sessionOpenNotification;
70         bool needToCreateSessionOpenNotificationMessage;
71         bool shouldRejectMessageWithOnOpenActionHeader;
72 
ChannelHandler(MessageVersion messageVersion, IChannelBinder binder, ServiceChannel channel)73         internal ChannelHandler(MessageVersion messageVersion, IChannelBinder binder, ServiceChannel channel)
74         {
75             ClientRuntime clientRuntime = channel.ClientRuntime;
76 
77             this.messageVersion = messageVersion;
78             this.isManualAddressing = clientRuntime.ManualAddressing;
79             this.binder = binder;
80             this.channel = channel;
81 
82             this.isConcurrent = true;
83             this.duplexBinder = binder as DuplexChannelBinder;
84             this.hasSession = binder.HasSession;
85             this.isCallback = true;
86 
87             DispatchRuntime dispatchRuntime = clientRuntime.DispatchRuntime;
88             if (dispatchRuntime == null)
89             {
90                 this.receiver = new ErrorHandlingReceiver(binder, null);
91             }
92             else
93             {
94                 this.receiver = new ErrorHandlingReceiver(binder, dispatchRuntime.ChannelDispatcher);
95             }
96             this.requestInfo = new RequestInfo(this);
97 
98         }
99 
ChannelHandler(MessageVersion messageVersion, IChannelBinder binder, ServiceThrottle throttle, ListenerHandler listener, bool wasChannelThrottled, WrappedTransaction acceptTransaction, SessionIdleManager idleManager)100         internal ChannelHandler(MessageVersion messageVersion, IChannelBinder binder, ServiceThrottle throttle,
101             ListenerHandler listener, bool wasChannelThrottled, WrappedTransaction acceptTransaction, SessionIdleManager idleManager)
102         {
103             ChannelDispatcher channelDispatcher = listener.ChannelDispatcher;
104 
105             this.messageVersion = messageVersion;
106             this.isManualAddressing = channelDispatcher.ManualAddressing;
107             this.binder = binder;
108             this.throttle = throttle;
109             this.listener = listener;
110             this.wasChannelThrottled = wasChannelThrottled;
111 
112             this.host = listener.Host;
113             this.receiveSynchronously = channelDispatcher.ReceiveSynchronously;
114             this.sendAsynchronously = channelDispatcher.SendAsynchronously;
115             this.duplexBinder = binder as DuplexChannelBinder;
116             this.hasSession = binder.HasSession;
117             this.isConcurrent = ConcurrencyBehavior.IsConcurrent(channelDispatcher, this.hasSession);
118 
119             if (channelDispatcher.MaxPendingReceives > 1)
120             {
121                 // We need to preserve order if the ChannelHandler is not concurrent.
122                 this.binder = new MultipleReceiveBinder(
123                     this.binder,
124                     channelDispatcher.MaxPendingReceives,
125                     !this.isConcurrent);
126             }
127 
128             if (channelDispatcher.BufferedReceiveEnabled)
129             {
130                 this.binder = new BufferedReceiveBinder(this.binder);
131             }
132 
133             this.receiver = new ErrorHandlingReceiver(this.binder, channelDispatcher);
134             this.idleManager = idleManager;
135             Fx.Assert((this.idleManager != null) == (this.binder.HasSession && this.listener.ChannelDispatcher.DefaultCommunicationTimeouts.ReceiveTimeout != TimeSpan.MaxValue), "idle manager is present only when there is a session with a finite receive timeout");
136 
137             if (channelDispatcher.IsTransactedReceive && !channelDispatcher.ReceiveContextEnabled)
138             {
139                 receiveSynchronously = true;
140                 receiveWithTransaction = true;
141 
142                 if (channelDispatcher.MaxTransactedBatchSize > 0)
143                 {
144                     int maxConcurrentBatches = 1;
145                     if (null != throttle && throttle.MaxConcurrentCalls > 1)
146                     {
147                         maxConcurrentBatches = throttle.MaxConcurrentCalls;
148                         foreach (EndpointDispatcher endpointDispatcher in channelDispatcher.Endpoints)
149                         {
150                             if (ConcurrencyMode.Multiple != endpointDispatcher.DispatchRuntime.ConcurrencyMode)
151                             {
152                                 maxConcurrentBatches = 1;
153                                 break;
154                             }
155                         }
156                     }
157 
158                     this.sharedTransactedBatchContext = new SharedTransactedBatchContext(this, channelDispatcher, maxConcurrentBatches);
159                     this.isMainTransactedBatchHandler = true;
160                     this.throttle = null;
161                 }
162             }
163             else if (channelDispatcher.IsTransactedReceive && channelDispatcher.ReceiveContextEnabled && channelDispatcher.MaxTransactedBatchSize > 0)
164             {
165                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.IncompatibleBehaviors)));
166             }
167 
168             if (this.binder.HasSession)
169             {
170                 this.sessionOpenNotification = this.binder.Channel.GetProperty<SessionOpenNotification>();
171                 this.needToCreateSessionOpenNotificationMessage = this.sessionOpenNotification != null && this.sessionOpenNotification.IsEnabled;
172             }
173 
174             this.acceptTransaction = acceptTransaction;
175             this.requestInfo = new RequestInfo(this);
176 
177             if (this.listener.State == CommunicationState.Opened)
178             {
179                 this.listener.ChannelDispatcher.Channels.IncrementActivityCount();
180                 this.incrementedActivityCountInConstructor = true;
181             }
182         }
183 
184 
ChannelHandler(ChannelHandler handler, TransactedBatchContext context)185         internal ChannelHandler(ChannelHandler handler, TransactedBatchContext context)
186         {
187             this.messageVersion = handler.messageVersion;
188             this.isManualAddressing = handler.isManualAddressing;
189             this.binder = handler.binder;
190             this.listener = handler.listener;
191             this.wasChannelThrottled = handler.wasChannelThrottled;
192 
193             this.host = handler.host;
194             this.receiveSynchronously = true;
195             this.receiveWithTransaction = true;
196             this.duplexBinder = handler.duplexBinder;
197             this.hasSession = handler.hasSession;
198             this.isConcurrent = handler.isConcurrent;
199             this.receiver = handler.receiver;
200 
201             this.sharedTransactedBatchContext = context.Shared;
202             this.transactedBatchContext = context;
203             this.requestInfo = new RequestInfo(this);
204 
205             this.sendAsynchronously = handler.sendAsynchronously;
206             this.sessionOpenNotification = handler.sessionOpenNotification;
207             this.needToCreateSessionOpenNotificationMessage = handler.needToCreateSessionOpenNotificationMessage;
208             this.shouldRejectMessageWithOnOpenActionHeader = handler.shouldRejectMessageWithOnOpenActionHeader;
209         }
210 
211         internal IChannelBinder Binder
212         {
213             get { return this.binder; }
214         }
215 
216         internal ServiceChannel Channel
217         {
218             get { return this.channel; }
219         }
220 
221         internal bool HasRegisterBeenCalled
222         {
223             get { return this.hasRegisterBeenCalled; }
224         }
225 
226         internal InstanceContext InstanceContext
227         {
228             get { return (this.channel != null) ? this.channel.InstanceContext : null; }
229         }
230 
231         internal ServiceThrottle InstanceContextServiceThrottle
232         {
233             get
234             {
235                 return this.instanceContextThrottle;
236             }
237             set
238             {
239                 this.instanceContextThrottle = value;
240             }
241         }
242 
243         bool IsOpen
244         {
245             get { return this.binder.Channel.State == CommunicationState.Opened; }
246         }
247 
248         EndpointAddress LocalAddress
249         {
250             get
251             {
252                 if (this.binder != null)
253                 {
254                     IInputChannel input = this.binder.Channel as IInputChannel;
255                     if (input != null)
256                     {
257                         return input.LocalAddress;
258                     }
259 
260                     IReplyChannel reply = this.binder.Channel as IReplyChannel;
261                     if (reply != null)
262                     {
263                         return reply.LocalAddress;
264                     }
265                 }
266 
267                 return null;
268             }
269         }
270 
271         object ThisLock
272         {
273             get { return this; }
274         }
275 
276         EventTraceActivity EventTraceActivity
277         {
278             get
279             {
280                 if (this.eventTraceActivity == null)
281                 {
282                     this.eventTraceActivity = new EventTraceActivity();
283                 }
284                 return this.eventTraceActivity;
285             }
286         }
287 
Register(ChannelHandler handler)288         internal static void Register(ChannelHandler handler)
289         {
290             handler.Register();
291         }
292 
Register(ChannelHandler handler, RequestContext request)293         internal static void Register(ChannelHandler handler, RequestContext request)
294         {
295             BufferedReceiveBinder bufferedBinder = handler.Binder as BufferedReceiveBinder;
296             Fx.Assert(bufferedBinder != null, "ChannelHandler.Binder is not a BufferedReceiveBinder");
297 
298             bufferedBinder.InjectRequest(request);
299             handler.Register();
300         }
301 
Register()302         void Register()
303         {
304             this.hasRegisterBeenCalled = true;
305             if (this.binder.Channel.State == CommunicationState.Created)
306             {
307                 ActionItem.Schedule(openAndEnsurePump, this);
308             }
309             else
310             {
311                 this.EnsurePump();
312             }
313         }
314 
AsyncMessagePump()315         void AsyncMessagePump()
316         {
317             IAsyncResult result = this.BeginTryReceive();
318 
319             if ((result != null) && result.CompletedSynchronously)
320             {
321                 this.AsyncMessagePump(result);
322             }
323         }
324 
AsyncMessagePump(IAsyncResult result)325         void AsyncMessagePump(IAsyncResult result)
326         {
327             if (TD.ChannelReceiveStopIsEnabled())
328             {
329                 TD.ChannelReceiveStop(this.EventTraceActivity, this.GetHashCode());
330             }
331 
332             for (;;)
333             {
334                 RequestContext request;
335 
336                 while (!this.EndTryReceive(result, out request))
337                 {
338                     result = this.BeginTryReceive();
339 
340                     if ((result == null) || !result.CompletedSynchronously)
341                     {
342                         return;
343                     }
344                 }
345 
346                 if (!HandleRequest(request, null))
347                 {
348                     break;
349                 }
350 
351                 if (!TryAcquirePump())
352                 {
353                     break;
354                 }
355 
356                 result = this.BeginTryReceive();
357 
358                 if (result == null || !result.CompletedSynchronously)
359                 {
360                     break;
361                 }
362             }
363         }
364 
BeginTryReceive()365         IAsyncResult BeginTryReceive()
366         {
367             this.requestInfo.Cleanup();
368 
369             if (TD.ChannelReceiveStartIsEnabled())
370             {
371                 TD.ChannelReceiveStart(this.EventTraceActivity, this.GetHashCode());
372             }
373 
374             this.shouldRejectMessageWithOnOpenActionHeader = !this.needToCreateSessionOpenNotificationMessage;
375             if (this.needToCreateSessionOpenNotificationMessage)
376             {
377                 return new CompletedAsyncResult(ChannelHandler.onAsyncReceiveComplete, this);
378             }
379 
380             return this.receiver.BeginTryReceive(TimeSpan.MaxValue, ChannelHandler.onAsyncReceiveComplete, this);
381         }
382 
DispatchAndReleasePump(RequestContext request, bool cleanThread, OperationContext currentOperationContext)383         bool DispatchAndReleasePump(RequestContext request, bool cleanThread, OperationContext currentOperationContext)
384         {
385             ServiceChannel channel = this.requestInfo.Channel;
386             EndpointDispatcher endpoint = this.requestInfo.Endpoint;
387             bool releasedPump = false;
388 
389             try
390             {
391                 DispatchRuntime dispatchBehavior = this.requestInfo.DispatchRuntime;
392 
393                 if (channel == null || dispatchBehavior == null)
394                 {
395                     Fx.Assert("System.ServiceModel.Dispatcher.ChannelHandler.Dispatch(): (channel == null || dispatchBehavior == null)");
396                     return true;
397                 }
398 
399                 MessageBuffer buffer = null;
400                 Message message;
401 
402                 EventTraceActivity eventTraceActivity = TraceDispatchMessageStart(request.RequestMessage);
403                 AspNetEnvironment.Current.PrepareMessageForDispatch(request.RequestMessage);
404                 if (dispatchBehavior.PreserveMessage)
405                 {
406                     object previousBuffer = null;
407                     if (request.RequestMessage.Properties.TryGetValue(MessageBufferPropertyName, out previousBuffer))
408                     {
409                         buffer = (MessageBuffer)previousBuffer;
410                         message = buffer.CreateMessage();
411                     }
412                     else
413                     {
414                         //
415                         buffer = request.RequestMessage.CreateBufferedCopy(int.MaxValue);
416                         message = buffer.CreateMessage();
417                     }
418                 }
419                 else
420                 {
421                     message = request.RequestMessage;
422                 }
423 
424                 DispatchOperationRuntime operation = dispatchBehavior.GetOperation(ref message);
425                 if (operation == null)
426                 {
427                     Fx.Assert("ChannelHandler.Dispatch (operation == null)");
428                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(String.Format(CultureInfo.InvariantCulture, "No DispatchOperationRuntime found to process message.")));
429                 }
430 
431                 if (this.shouldRejectMessageWithOnOpenActionHeader && message.Headers.Action == OperationDescription.SessionOpenedAction)
432                 {
433                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.SFxNoEndpointMatchingAddressForConnectionOpeningMessage, message.Headers.Action, "Open")));
434                 }
435 
436                 if (MessageLogger.LoggingEnabled)
437                 {
438                     MessageLogger.LogMessage(ref message, (operation.IsOneWay ? MessageLoggingSource.ServiceLevelReceiveDatagram : MessageLoggingSource.ServiceLevelReceiveRequest) | MessageLoggingSource.LastChance);
439                 }
440 
441                 if (operation.IsTerminating && this.hasSession)
442                 {
443                     this.isChannelTerminated = true;
444                 }
445 
446                 bool hasOperationContextBeenSet;
447                 if (currentOperationContext != null)
448                 {
449                     hasOperationContextBeenSet = true;
450                     currentOperationContext.ReInit(request, message, channel);
451                 }
452                 else
453                 {
454                     hasOperationContextBeenSet = false;
455                     currentOperationContext = new OperationContext(request, message, channel, this.host);
456                 }
457 
458                 if (dispatchBehavior.PreserveMessage)
459                 {
460                     currentOperationContext.IncomingMessageProperties.Add(MessageBufferPropertyName, buffer);
461                 }
462 
463                 if (currentOperationContext.EndpointDispatcher == null && this.listener != null)
464                 {
465                     currentOperationContext.EndpointDispatcher = endpoint;
466                 }
467 
468                 MessageRpc rpc = new MessageRpc(request, message, operation, channel, this.host,
469                     this, cleanThread, currentOperationContext, this.requestInfo.ExistingInstanceContext, eventTraceActivity);
470 
471                 TraceUtility.MessageFlowAtMessageReceived(message, currentOperationContext, eventTraceActivity, true);
472 
473                 rpc.TransactedBatchContext = this.transactedBatchContext;
474 
475                 // passing responsibility for call throttle to MessageRpc
476                 // (MessageRpc implicitly owns this throttle once it's created)
477                 this.requestInfo.ChannelHandlerOwnsCallThrottle = false;
478                 // explicitly passing responsibility for instance throttle to MessageRpc
479                 rpc.MessageRpcOwnsInstanceContextThrottle = this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle;
480                 this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle = false;
481 
482                 // These need to happen before Dispatch but after accessing any ChannelHandler
483                 // state, because we go multi-threaded after this until we reacquire pump mutex.
484                 this.ReleasePump();
485                 releasedPump = true;
486 
487                 return operation.Parent.Dispatch(ref rpc, hasOperationContextBeenSet);
488             }
489             catch (Exception e)
490             {
491                 if (Fx.IsFatal(e))
492                 {
493                     throw;
494                 }
495                 return this.HandleError(e, request, channel);
496             }
497             finally
498             {
499                 if (!releasedPump)
500                 {
501                     this.ReleasePump();
502                 }
503             }
504         }
505 
DispatchDone()506         internal void DispatchDone()
507         {
508             if (this.throttle != null)
509             {
510                 this.throttle.DeactivateCall();
511             }
512         }
513 
GetSessionOpenNotificationRequestContext()514         RequestContext GetSessionOpenNotificationRequestContext()
515         {
516             Fx.Assert(this.sessionOpenNotification != null, "this.sessionOpenNotification should not be null.");
517             Message message = Message.CreateMessage(this.Binder.Channel.GetProperty<MessageVersion>(), OperationDescription.SessionOpenedAction);
518             Fx.Assert(this.LocalAddress != null, "this.LocalAddress should not be null.");
519             message.Headers.To = this.LocalAddress.Uri;
520             this.sessionOpenNotification.UpdateMessageProperties(message.Properties);
521             return this.Binder.CreateRequestContext(message);
522         }
523 
EndTryReceive(IAsyncResult result, out RequestContext requestContext)524         bool EndTryReceive(IAsyncResult result, out RequestContext requestContext)
525         {
526             bool valid;
527             if (this.needToCreateSessionOpenNotificationMessage)
528             {
529                 this.needToCreateSessionOpenNotificationMessage = false;
530                 Fx.Assert(result is CompletedAsyncResult, "result must be CompletedAsyncResult");
531                 CompletedAsyncResult.End(result);
532                 requestContext = this.GetSessionOpenNotificationRequestContext();
533                 valid = true;
534             }
535             else
536             {
537                 valid = this.receiver.EndTryReceive(result, out requestContext);
538             }
539 
540             if (valid)
541             {
542                 this.HandleReceiveComplete(requestContext);
543             }
544 
545             return valid;
546         }
547 
EnsureChannelAndEndpoint(RequestContext request)548         void EnsureChannelAndEndpoint(RequestContext request)
549         {
550             this.requestInfo.Channel = this.channel;
551 
552             if (this.requestInfo.Channel == null)
553             {
554                 bool addressMatched;
555                 if (this.hasSession)
556                 {
557                     this.requestInfo.Channel = this.GetSessionChannel(request.RequestMessage, out this.requestInfo.Endpoint, out addressMatched);
558                 }
559                 else
560                 {
561                     this.requestInfo.Channel = this.GetDatagramChannel(request.RequestMessage, out this.requestInfo.Endpoint, out addressMatched);
562                 }
563 
564                 if (this.requestInfo.Channel == null)
565                 {
566                     this.host.RaiseUnknownMessageReceived(request.RequestMessage);
567                     if (addressMatched)
568                     {
569                         this.ReplyContractFilterDidNotMatch(request);
570                     }
571                     else
572                     {
573                         this.ReplyAddressFilterDidNotMatch(request);
574                     }
575                 }
576             }
577             else
578             {
579                 this.requestInfo.Endpoint = this.requestInfo.Channel.EndpointDispatcher;
580 
581                 //For sessionful contracts, the InstanceContext throttle is not copied over to the channel
582                 //as we create the channel before acquiring the lock
583                 if (this.InstanceContextServiceThrottle != null && this.requestInfo.Channel.InstanceContextServiceThrottle == null)
584                 {
585                     this.requestInfo.Channel.InstanceContextServiceThrottle = this.InstanceContextServiceThrottle;
586                 }
587             }
588 
589             this.requestInfo.EndpointLookupDone = true;
590 
591             if (this.requestInfo.Channel == null)
592             {
593                 // SFx drops a message here
594                 TraceUtility.TraceDroppedMessage(request.RequestMessage, this.requestInfo.Endpoint);
595                 request.Close();
596                 return;
597             }
598 
599             if (this.requestInfo.Channel.HasSession || this.isCallback)
600             {
601                 this.requestInfo.DispatchRuntime = this.requestInfo.Channel.DispatchRuntime;
602             }
603             else
604             {
605                 this.requestInfo.DispatchRuntime = this.requestInfo.Endpoint.DispatchRuntime;
606             }
607         }
608 
EnsurePump()609         void EnsurePump()
610         {
611             if (null == this.sharedTransactedBatchContext || this.isMainTransactedBatchHandler)
612             {
613                 if (TryAcquirePump())
614                 {
615                     if (this.receiveSynchronously)
616                     {
617                         ActionItem.Schedule(ChannelHandler.onStartSyncMessagePump, this);
618                     }
619                     else
620                     {
621                         if (Thread.CurrentThread.IsThreadPoolThread)
622                         {
623                             IAsyncResult result = this.BeginTryReceive();
624                             if ((result != null) && result.CompletedSynchronously)
625                             {
626                                 ActionItem.Schedule(ChannelHandler.onContinueAsyncReceive, result);
627                             }
628                         }
629                         else
630                         {
631                             // Since this is not a threadpool thread, we don't know if this thread will exit
632                             // while the IO is still pending (which would cancel the IO), so we have to get
633                             // over to a threadpool thread which we know will not exit while there is pending IO.
634                             ActionItem.Schedule(ChannelHandler.onStartAsyncMessagePump, this);
635                         }
636                     }
637                 }
638             }
639             else
640             {
641                 ActionItem.Schedule(ChannelHandler.onStartSingleTransactedBatch, this);
642             }
643         }
644 
GetDatagramChannel(Message message, out EndpointDispatcher endpoint, out bool addressMatched)645         ServiceChannel GetDatagramChannel(Message message, out EndpointDispatcher endpoint, out bool addressMatched)
646         {
647             addressMatched = false;
648             endpoint = this.GetEndpointDispatcher(message, out addressMatched);
649 
650             if (endpoint == null)
651             {
652                 return null;
653             }
654 
655             if (endpoint.DatagramChannel == null)
656             {
657                 lock (this.listener.ThisLock)
658                 {
659                     if (endpoint.DatagramChannel == null)
660                     {
661                         endpoint.DatagramChannel = new ServiceChannel(this.binder, endpoint, this.listener.ChannelDispatcher, this.idleManager);
662                         this.InitializeServiceChannel(endpoint.DatagramChannel);
663                     }
664                 }
665             }
666 
667             return endpoint.DatagramChannel;
668         }
669 
GetEndpointDispatcher(Message message, out bool addressMatched)670         EndpointDispatcher GetEndpointDispatcher(Message message, out bool addressMatched)
671         {
672             return this.listener.Endpoints.Lookup(message, out addressMatched);
673         }
674 
GetSessionChannel(Message message, out EndpointDispatcher endpoint, out bool addressMatched)675         ServiceChannel GetSessionChannel(Message message, out EndpointDispatcher endpoint, out bool addressMatched)
676         {
677             addressMatched = false;
678 
679             if (this.channel == null)
680             {
681                 lock (this.ThisLock)
682                 {
683                     if (this.channel == null)
684                     {
685                         endpoint = this.GetEndpointDispatcher(message, out addressMatched);
686                         if (endpoint != null)
687                         {
688                             this.channel = new ServiceChannel(this.binder, endpoint, this.listener.ChannelDispatcher, this.idleManager);
689                             this.InitializeServiceChannel(this.channel);
690                         }
691                     }
692                 }
693             }
694 
695             if (this.channel == null)
696             {
697                 endpoint = null;
698             }
699             else
700             {
701                 endpoint = this.channel.EndpointDispatcher;
702             }
703             return this.channel;
704         }
705 
InitializeServiceChannel(ServiceChannel channel)706         void InitializeServiceChannel(ServiceChannel channel)
707         {
708             if (this.wasChannelThrottled)
709             {
710                 // TFS#500703, when the idle timeout was hit, the constructor of ServiceChannel will abort itself directly. So
711                 // the session throttle will not be released and thus lead to a service unavailablity.
712                 // Note that if the channel is already aborted, the next line "channel.ServiceThrottle = this.throttle;" will throw an exception,
713                 // so we are not going to do any more work inside this method.
714                 // Ideally we should do a thorough refactoring work for this throttling issue. However, it's too risky as a QFE. We should consider
715                 // this in a whole release.
716                 // Note that the "wasChannelThrottled" boolean will only be true if we aquired the session throttle. So we don't have to check HasSession
717                 // again here.
718                 if (channel.Aborted && this.throttle != null)
719                 {
720                     // This line will release the "session" throttle.
721                     this.throttle.DeactivateChannel();
722                 }
723 
724                 channel.ServiceThrottle = this.throttle;
725             }
726 
727             if (this.InstanceContextServiceThrottle != null)
728             {
729                 channel.InstanceContextServiceThrottle = this.InstanceContextServiceThrottle;
730             }
731 
732             ClientRuntime clientRuntime = channel.ClientRuntime;
733             if (clientRuntime != null)
734             {
735                 Type contractType = clientRuntime.ContractClientType;
736                 Type callbackType = clientRuntime.CallbackClientType;
737 
738                 if (contractType != null)
739                 {
740                     channel.Proxy = ServiceChannelFactory.CreateProxy(contractType, callbackType, MessageDirection.Output, channel);
741                 }
742             }
743 
744             if (this.listener != null)
745             {
746                 this.listener.ChannelDispatcher.InitializeChannel((IClientChannel)channel.Proxy);
747             }
748 
749             ((IChannel)channel).Open();
750         }
751 
ProvideFault(Exception e, ref ErrorHandlerFaultInfo faultInfo)752         void ProvideFault(Exception e, ref ErrorHandlerFaultInfo faultInfo)
753         {
754             if (this.listener != null)
755             {
756                 this.listener.ChannelDispatcher.ProvideFault(e, this.requestInfo.Channel == null ? this.binder.Channel.GetProperty<FaultConverter>() : this.requestInfo.Channel.GetProperty<FaultConverter>(), ref faultInfo);
757             }
758             else if (this.channel != null)
759             {
760                 DispatchRuntime dispatchBehavior = this.channel.ClientRuntime.CallbackDispatchRuntime;
761                 dispatchBehavior.ChannelDispatcher.ProvideFault(e, this.channel.GetProperty<FaultConverter>(), ref faultInfo);
762             }
763         }
764 
HandleError(Exception e)765         internal bool HandleError(Exception e)
766         {
767             ErrorHandlerFaultInfo dummy = new ErrorHandlerFaultInfo();
768             return this.HandleError(e, ref dummy);
769         }
770 
HandleError(Exception e, ref ErrorHandlerFaultInfo faultInfo)771         bool HandleError(Exception e, ref ErrorHandlerFaultInfo faultInfo)
772         {
773             if (e == null)
774             {
775                 Fx.Assert(SR.GetString(SR.GetString(SR.SFxNonExceptionThrown)));
776                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.GetString(SR.SFxNonExceptionThrown))));
777             }
778             if (this.listener != null)
779             {
780                 return listener.ChannelDispatcher.HandleError(e, ref faultInfo);
781             }
782             else if (this.channel != null)
783             {
784                 return this.channel.ClientRuntime.CallbackDispatchRuntime.ChannelDispatcher.HandleError(e, ref faultInfo);
785             }
786             else
787             {
788                 return false;
789             }
790         }
791 
HandleError(Exception e, RequestContext request, ServiceChannel channel)792         bool HandleError(Exception e, RequestContext request, ServiceChannel channel)
793         {
794             ErrorHandlerFaultInfo faultInfo = new ErrorHandlerFaultInfo(this.messageVersion.Addressing.DefaultFaultAction);
795             bool replied, replySentAsync;
796             ProvideFaultAndReplyFailure(request, e, ref faultInfo, out replied, out replySentAsync);
797 
798             if (!replySentAsync)
799             {
800                 return this.HandleErrorContinuation(e, request, channel, ref faultInfo, replied);
801             }
802             else
803             {
804                 return false;
805             }
806         }
807 
HandleErrorContinuation(Exception e, RequestContext request, ServiceChannel channel, ref ErrorHandlerFaultInfo faultInfo, bool replied)808         bool HandleErrorContinuation(Exception e, RequestContext request, ServiceChannel channel, ref ErrorHandlerFaultInfo faultInfo, bool replied)
809         {
810             if (replied)
811             {
812                 try
813                 {
814                     request.Close();
815                 }
816                 catch (Exception e1)
817                 {
818                     if (Fx.IsFatal(e1))
819                     {
820                         throw;
821                     }
822                     this.HandleError(e1);
823                 }
824             }
825             else
826             {
827                 request.Abort();
828             }
829             if (!this.HandleError(e, ref faultInfo) && this.hasSession)
830             {
831                 if (channel != null)
832                 {
833                     if (replied)
834                     {
835                         TimeoutHelper timeoutHelper = new TimeoutHelper(CloseAfterFaultTimeout);
836                         try
837                         {
838                             channel.Close(timeoutHelper.RemainingTime());
839                         }
840                         catch (Exception e2)
841                         {
842                             if (Fx.IsFatal(e2))
843                             {
844                                 throw;
845                             }
846                             this.HandleError(e2);
847                         }
848                         try
849                         {
850                             this.binder.CloseAfterFault(timeoutHelper.RemainingTime());
851                         }
852                         catch (Exception e3)
853                         {
854                             if (Fx.IsFatal(e3))
855                             {
856                                 throw;
857                             }
858                             this.HandleError(e3);
859                         }
860                     }
861                     else
862                     {
863                         channel.Abort();
864                         this.binder.Abort();
865                     }
866                 }
867                 else
868                 {
869                     if (replied)
870                     {
871                         try
872                         {
873                             this.binder.CloseAfterFault(CloseAfterFaultTimeout);
874                         }
875                         catch (Exception e4)
876                         {
877                             if (Fx.IsFatal(e4))
878                             {
879                                 throw;
880                             }
881                             this.HandleError(e4);
882                         }
883                     }
884                     else
885                     {
886                         this.binder.Abort();
887                     }
888                 }
889             }
890 
891             return true;
892         }
893 
HandleReceiveComplete(RequestContext context)894         void HandleReceiveComplete(RequestContext context)
895         {
896             try
897             {
898                 if (this.channel != null)
899                 {
900                     this.channel.HandleReceiveComplete(context);
901                 }
902                 else
903                 {
904                     if (context == null && this.hasSession)
905                     {
906                         bool close;
907                         lock (this.ThisLock)
908                         {
909                             close = !this.doneReceiving;
910                             this.doneReceiving = true;
911                         }
912 
913                         if (close)
914                         {
915                             this.receiver.Close();
916 
917                             if (this.idleManager != null)
918                             {
919                                 this.idleManager.CancelTimer();
920                             }
921 
922                             ServiceThrottle throttle = this.throttle;
923                             if (throttle != null)
924                             {
925                                 throttle.DeactivateChannel();
926                             }
927                         }
928                     }
929                 }
930             }
931             finally
932             {
933                 if ((context == null) && this.incrementedActivityCountInConstructor)
934                 {
935                     this.listener.ChannelDispatcher.Channels.DecrementActivityCount();
936                 }
937             }
938         }
939 
HandleRequest(RequestContext request, OperationContext currentOperationContext)940         bool HandleRequest(RequestContext request, OperationContext currentOperationContext)
941         {
942             if (request == null)
943             {
944                 // channel EOF, stop receiving
945                 return false;
946             }
947 
948             ServiceModelActivity activity = DiagnosticUtility.ShouldUseActivity ? TraceUtility.ExtractActivity(request) : null;
949 
950             using (ServiceModelActivity.BoundOperation(activity))
951             {
952                 if (this.HandleRequestAsReply(request))
953                 {
954                     this.ReleasePump();
955                     return true;
956                 }
957 
958                 if (this.isChannelTerminated)
959                 {
960                     this.ReleasePump();
961                     this.ReplyChannelTerminated(request);
962                     return true;
963                 }
964 
965                 if (this.requestInfo.RequestContext != null)
966                 {
967                     Fx.Assert("ChannelHandler.HandleRequest: this.requestInfo.RequestContext != null");
968                 }
969 
970                 this.requestInfo.RequestContext = request;
971 
972                 if (!this.TryAcquireCallThrottle(request))
973                 {
974                     // this.ThrottleAcquiredForCall will be called to continue
975                     return false;
976                 }
977 
978                 // NOTE: from here on down, ensure that this code is the same as ThrottleAcquiredForCall (see 55460)
979                 if (this.requestInfo.ChannelHandlerOwnsCallThrottle)
980                 {
981                     Fx.Assert("ChannelHandler.HandleRequest: this.requestInfo.ChannelHandlerOwnsCallThrottle");
982                 }
983                 this.requestInfo.ChannelHandlerOwnsCallThrottle = true;
984 
985                 if (!this.TryRetrievingInstanceContext(request))
986                 {
987                     //Would have replied and close the request.
988                     return true;
989                 }
990 
991                 this.requestInfo.Channel.CompletedIOOperation();
992 
993                 //Only acquire InstanceContext throttle if one doesnt already exist.
994                 if (!this.TryAcquireThrottle(request, (this.requestInfo.ExistingInstanceContext == null)))
995                 {
996                     // this.ThrottleAcquired will be called to continue
997                     return false;
998                 }
999                 if (this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle)
1000                 {
1001                     Fx.Assert("ChannelHandler.HandleRequest: this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle");
1002                 }
1003                 this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle = (this.requestInfo.ExistingInstanceContext == null);
1004 
1005                 if (!this.DispatchAndReleasePump(request, true, currentOperationContext))
1006                 {
1007                     // this.DispatchDone will be called to continue
1008                     return false;
1009                 }
1010             }
1011             return true;
1012         }
1013 
HandleRequestAsReply(RequestContext request)1014         bool HandleRequestAsReply(RequestContext request)
1015         {
1016             if (this.duplexBinder != null)
1017             {
1018                 if (this.duplexBinder.HandleRequestAsReply(request.RequestMessage))
1019                 {
1020                     return true;
1021                 }
1022             }
1023             return false;
1024         }
1025 
OnStartAsyncMessagePump(object state)1026         static void OnStartAsyncMessagePump(object state)
1027         {
1028             ((ChannelHandler)state).AsyncMessagePump();
1029         }
1030 
OnStartSyncMessagePump(object state)1031         static void OnStartSyncMessagePump(object state)
1032         {
1033             ChannelHandler handler = state as ChannelHandler;
1034 
1035             if (TD.ChannelReceiveStopIsEnabled())
1036             {
1037                 TD.ChannelReceiveStop(handler.EventTraceActivity, state.GetHashCode());
1038             }
1039 
1040             if (handler.receiveWithTransaction)
1041             {
1042                 handler.SyncTransactionalMessagePump();
1043             }
1044             else
1045             {
1046                 handler.SyncMessagePump();
1047             }
1048         }
1049 
OnStartSingleTransactedBatch(object state)1050         static void OnStartSingleTransactedBatch(object state)
1051         {
1052             ChannelHandler handler = state as ChannelHandler;
1053             handler.TransactedBatchLoop();
1054         }
1055 
OnAsyncReceiveComplete(IAsyncResult result)1056         static void OnAsyncReceiveComplete(IAsyncResult result)
1057         {
1058             if (!result.CompletedSynchronously)
1059             {
1060                 ((ChannelHandler)result.AsyncState).AsyncMessagePump(result);
1061             }
1062         }
1063 
OnContinueAsyncReceive(object state)1064         static void OnContinueAsyncReceive(object state)
1065         {
1066             IAsyncResult result = (IAsyncResult)state;
1067             ((ChannelHandler)result.AsyncState).AsyncMessagePump(result);
1068         }
1069 
OpenAndEnsurePump(object state)1070         static void OpenAndEnsurePump(object state)
1071         {
1072             ((ChannelHandler)state).OpenAndEnsurePump();
1073         }
1074 
OpenAndEnsurePump()1075         void OpenAndEnsurePump()
1076         {
1077             Exception exception = null;
1078             try
1079             {
1080                 this.binder.Channel.Open();
1081             }
1082             catch (Exception e)
1083             {
1084                 if (Fx.IsFatal(e))
1085                 {
1086                     throw;
1087                 }
1088                 exception = e;
1089             }
1090 
1091             if (exception != null)
1092             {
1093                 if (DiagnosticUtility.ShouldTraceWarning)
1094                 {
1095                     TraceUtility.TraceEvent(System.Diagnostics.TraceEventType.Warning,
1096                         TraceCode.FailedToOpenIncomingChannel,
1097                         SR.GetString(SR.TraceCodeFailedToOpenIncomingChannel));
1098                 }
1099                 SessionIdleManager idleManager = this.idleManager;
1100                 if (idleManager != null)
1101                 {
1102                     idleManager.CancelTimer();
1103                 }
1104                 if ((this.throttle != null) && this.hasSession)
1105                 {
1106                     this.throttle.DeactivateChannel();
1107                 }
1108 
1109                 bool errorHandled = this.HandleError(exception);
1110 
1111                 if (this.incrementedActivityCountInConstructor)
1112                 {
1113                     this.listener.ChannelDispatcher.Channels.DecrementActivityCount();
1114                 }
1115 
1116                 if (!errorHandled)
1117                 {
1118                     this.binder.Channel.Abort();
1119                 }
1120             }
1121             else
1122             {
1123                 this.EnsurePump();
1124             }
1125         }
1126 
TryReceive(TimeSpan timeout, out RequestContext requestContext)1127         bool TryReceive(TimeSpan timeout, out RequestContext requestContext)
1128         {
1129             this.shouldRejectMessageWithOnOpenActionHeader = !this.needToCreateSessionOpenNotificationMessage;
1130 
1131             bool valid;
1132             if (this.needToCreateSessionOpenNotificationMessage)
1133             {
1134                 this.needToCreateSessionOpenNotificationMessage = false;
1135                 requestContext = this.GetSessionOpenNotificationRequestContext();
1136                 valid = true;
1137             }
1138             else
1139             {
1140                 valid = this.receiver.TryReceive(timeout, out requestContext);
1141             }
1142 
1143             if (valid)
1144             {
1145                 this.HandleReceiveComplete(requestContext);
1146             }
1147 
1148             return valid;
1149         }
1150 
ReplyAddressFilterDidNotMatch(RequestContext request)1151         void ReplyAddressFilterDidNotMatch(RequestContext request)
1152         {
1153             FaultCode code = FaultCode.CreateSenderFaultCode(AddressingStrings.DestinationUnreachable,
1154                 this.messageVersion.Addressing.Namespace);
1155             string reason = SR.GetString(SR.SFxNoEndpointMatchingAddress, request.RequestMessage.Headers.To);
1156 
1157             ReplyFailure(request, code, reason);
1158         }
1159 
ReplyContractFilterDidNotMatch(RequestContext request)1160         void ReplyContractFilterDidNotMatch(RequestContext request)
1161         {
1162             // By default, the contract filter is just a filter over the set of initiating actions in
1163             // the contract, so we do error messages accordingly
1164             AddressingVersion addressingVersion = this.messageVersion.Addressing;
1165             if (addressingVersion != AddressingVersion.None && request.RequestMessage.Headers.Action == null)
1166             {
1167                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
1168                     new MessageHeaderException(
1169                     SR.GetString(SR.SFxMissingActionHeader, addressingVersion.Namespace), AddressingStrings.Action, addressingVersion.Namespace));
1170             }
1171             else
1172             {
1173                 // some of this code is duplicated in DispatchRuntime.UnhandledActionInvoker
1174                 // ideally both places would use FaultConverter and ActionNotSupportedException
1175                 FaultCode code = FaultCode.CreateSenderFaultCode(AddressingStrings.ActionNotSupported,
1176                     this.messageVersion.Addressing.Namespace);
1177                 string reason = SR.GetString(SR.SFxNoEndpointMatchingContract, request.RequestMessage.Headers.Action);
1178                 ReplyFailure(request, code, reason, this.messageVersion.Addressing.FaultAction);
1179             }
1180         }
1181 
ReplyChannelTerminated(RequestContext request)1182         void ReplyChannelTerminated(RequestContext request)
1183         {
1184             FaultCode code = FaultCode.CreateSenderFaultCode(FaultCodeConstants.Codes.SessionTerminated,
1185                 FaultCodeConstants.Namespaces.NetDispatch);
1186             string reason = SR.GetString(SR.SFxChannelTerminated0);
1187             string action = FaultCodeConstants.Actions.NetDispatcher;
1188             Message fault = Message.CreateMessage(this.messageVersion, code, reason, action);
1189             ReplyFailure(request, fault, action, reason, code);
1190         }
1191 
ReplyFailure(RequestContext request, FaultCode code, string reason)1192         void ReplyFailure(RequestContext request, FaultCode code, string reason)
1193         {
1194             string action = this.messageVersion.Addressing.DefaultFaultAction;
1195             ReplyFailure(request, code, reason, action);
1196         }
1197 
ReplyFailure(RequestContext request, FaultCode code, string reason, string action)1198         void ReplyFailure(RequestContext request, FaultCode code, string reason, string action)
1199         {
1200             Message fault = Message.CreateMessage(this.messageVersion, code, reason, action);
1201             ReplyFailure(request, fault, action, reason, code);
1202         }
1203 
ReplyFailure(RequestContext request, Message fault, string action, string reason, FaultCode code)1204         void ReplyFailure(RequestContext request, Message fault, string action, string reason, FaultCode code)
1205         {
1206             FaultException exception = new FaultException(reason, code);
1207             ErrorBehavior.ThrowAndCatch(exception);
1208             ErrorHandlerFaultInfo faultInfo = new ErrorHandlerFaultInfo(action);
1209             faultInfo.Fault = fault;
1210             bool replied, replySentAsync;
1211             ProvideFaultAndReplyFailure(request, exception, ref faultInfo, out replied, out replySentAsync);
1212             this.HandleError(exception, ref faultInfo);
1213         }
1214 
ProvideFaultAndReplyFailure(RequestContext request, Exception exception, ref ErrorHandlerFaultInfo faultInfo, out bool replied, out bool replySentAsync)1215         void ProvideFaultAndReplyFailure(RequestContext request, Exception exception, ref ErrorHandlerFaultInfo faultInfo, out bool replied, out bool replySentAsync)
1216         {
1217             replied = false;
1218             replySentAsync = false;
1219             bool requestMessageIsFault = false;
1220             try
1221             {
1222                 requestMessageIsFault = request.RequestMessage.IsFault;
1223             }
1224 #pragma warning suppress 56500 // covered by FxCOP
1225             catch (Exception e)
1226             {
1227                 if (Fx.IsFatal(e))
1228                 {
1229                     throw;
1230                 }
1231                 // ---- it
1232             }
1233 
1234             bool enableFaults = false;
1235             if (this.listener != null)
1236             {
1237                 enableFaults = this.listener.ChannelDispatcher.EnableFaults;
1238             }
1239             else if (this.channel != null && this.channel.IsClient)
1240             {
1241                 enableFaults = this.channel.ClientRuntime.EnableFaults;
1242             }
1243 
1244             if ((!requestMessageIsFault) && enableFaults)
1245             {
1246                 this.ProvideFault(exception, ref faultInfo);
1247                 if (faultInfo.Fault != null)
1248                 {
1249                     Message reply = faultInfo.Fault;
1250                     try
1251                     {
1252                         try
1253                         {
1254                             if (this.PrepareReply(request, reply))
1255                             {
1256                                 if (this.sendAsynchronously)
1257                                 {
1258                                     var state = new ContinuationState { ChannelHandler = this, Channel = channel, Exception = exception, FaultInfo = faultInfo, Request = request, Reply = reply };
1259                                     var result = request.BeginReply(reply, ChannelHandler.onAsyncReplyComplete, state);
1260                                     if (result.CompletedSynchronously)
1261                                     {
1262                                         ChannelHandler.AsyncReplyComplete(result, state);
1263                                         replied = true;
1264                                     }
1265                                     else
1266                                     {
1267                                         replySentAsync = true;
1268                                     }
1269                                 }
1270                                 else
1271                                 {
1272                                     request.Reply(reply);
1273                                     replied = true;
1274                                 }
1275                             }
1276                         }
1277                         finally
1278                         {
1279                             if (!replySentAsync)
1280                             {
1281                                 reply.Close();
1282                             }
1283                         }
1284                     }
1285 #pragma warning suppress 56500 // covered by FxCOP
1286                     catch (Exception e)
1287                     {
1288                         if (Fx.IsFatal(e))
1289                         {
1290                             throw;
1291                         }
1292                         this.HandleError(e);
1293                     }
1294                 }
1295             }
1296         }
1297 
1298         /// <summary>
1299         /// Prepares a reply that can either be sent asynchronously or synchronously depending on the value of
1300         /// sendAsynchronously
1301         /// </summary>
1302         /// <param name="request">The request context to prepare</param>
1303         /// <param name="reply">The reply to prepare</param>
1304         /// <returns>True if channel is open and prepared reply should be sent; otherwise false.</returns>
PrepareReply(RequestContext request, Message reply)1305         bool PrepareReply(RequestContext request, Message reply)
1306         {
1307             // Ensure we only reply once (we may hit the same error multiple times)
1308             if (this.replied == request)
1309             {
1310                 return false;
1311             }
1312             this.replied = request;
1313 
1314             bool canSendReply = true;
1315 
1316             Message requestMessage = null;
1317             try
1318             {
1319                 requestMessage = request.RequestMessage;
1320             }
1321 #pragma warning suppress 56500 // covered by FxCOP
1322             catch (Exception e)
1323             {
1324                 if (Fx.IsFatal(e))
1325                 {
1326                     throw;
1327                 }
1328                 // ---- it
1329             }
1330             if (!object.ReferenceEquals(requestMessage, null))
1331             {
1332                 UniqueId requestID = null;
1333                 try
1334                 {
1335                     requestID = requestMessage.Headers.MessageId;
1336                 }
1337                 catch (MessageHeaderException)
1338                 {
1339                     // ---- it - we don't need to correlate the reply if the MessageId header is bad
1340                 }
1341                 if (!object.ReferenceEquals(requestID, null) && !this.isManualAddressing)
1342                 {
1343                     System.ServiceModel.Channels.RequestReplyCorrelator.PrepareReply(reply, requestID);
1344                 }
1345                 if (!this.hasSession && !this.isManualAddressing)
1346                 {
1347                     try
1348                     {
1349                         canSendReply = System.ServiceModel.Channels.RequestReplyCorrelator.AddressReply(reply, requestMessage);
1350                     }
1351                     catch (MessageHeaderException)
1352                     {
1353                         // ---- it - we don't need to address the reply if the FaultTo header is bad
1354                     }
1355                 }
1356             }
1357 
1358             // ObjectDisposeException can happen
1359             // if the channel is closed in a different
1360             // thread. 99% this check will avoid false
1361             // exceptions.
1362             return this.IsOpen && canSendReply;
1363         }
1364 
AsyncReplyComplete(IAsyncResult result, ContinuationState state)1365         static void AsyncReplyComplete(IAsyncResult result, ContinuationState state)
1366         {
1367             try
1368             {
1369                 state.Request.EndReply(result);
1370             }
1371             catch (Exception e)
1372             {
1373                 DiagnosticUtility.TraceHandledException(e, System.Diagnostics.TraceEventType.Error);
1374 
1375                 if (Fx.IsFatal(e))
1376                 {
1377                     throw;
1378                 }
1379 
1380                 state.ChannelHandler.HandleError(e);
1381             }
1382 
1383             try
1384             {
1385                 state.Reply.Close();
1386             }
1387             catch (Exception e)
1388             {
1389                 DiagnosticUtility.TraceHandledException(e, System.Diagnostics.TraceEventType.Error);
1390 
1391                 if (Fx.IsFatal(e))
1392                 {
1393                     throw;
1394                 }
1395 
1396                 state.ChannelHandler.HandleError(e);
1397             }
1398 
1399             try
1400             {
1401                 state.ChannelHandler.HandleErrorContinuation(state.Exception, state.Request, state.Channel, ref state.FaultInfo, true);
1402             }
1403             catch (Exception e)
1404             {
1405                 DiagnosticUtility.TraceHandledException(e, System.Diagnostics.TraceEventType.Error);
1406 
1407                 if (Fx.IsFatal(e))
1408                 {
1409                     throw;
1410                 }
1411 
1412                 state.ChannelHandler.HandleError(e);
1413             }
1414 
1415             state.ChannelHandler.EnsurePump();
1416         }
1417 
OnAsyncReplyComplete(IAsyncResult result)1418         static void OnAsyncReplyComplete(IAsyncResult result)
1419         {
1420             if (result.CompletedSynchronously)
1421             {
1422                 return;
1423             }
1424 
1425             try
1426             {
1427                 var state = (ContinuationState)result.AsyncState;
1428                 ChannelHandler.AsyncReplyComplete(result, state);
1429             }
1430             catch (Exception e)
1431             {
1432                 DiagnosticUtility.TraceHandledException(e, System.Diagnostics.TraceEventType.Error);
1433 
1434                 if (Fx.IsFatal(e))
1435                 {
1436                     throw;
1437                 }
1438             }
1439         }
1440 
ReleasePump()1441         void ReleasePump()
1442         {
1443             if (this.isConcurrent)
1444             {
1445                 Interlocked.Exchange(ref this.isPumpAcquired, 0);
1446             }
1447         }
1448 
SyncMessagePump()1449         void SyncMessagePump()
1450         {
1451             OperationContext existingOperationContext = OperationContext.Current;
1452             try
1453             {
1454                 OperationContext currentOperationContext = new OperationContext(this.host);
1455                 OperationContext.Current = currentOperationContext;
1456 
1457                 for (;;)
1458                 {
1459                     RequestContext request;
1460 
1461                     this.requestInfo.Cleanup();
1462 
1463                     while (!TryReceive(TimeSpan.MaxValue, out request))
1464                     {
1465                     }
1466 
1467                     if (!HandleRequest(request, currentOperationContext))
1468                     {
1469                         break;
1470                     }
1471 
1472                     if (!TryAcquirePump())
1473                     {
1474                         break;
1475                     }
1476 
1477                     currentOperationContext.Recycle();
1478                 }
1479             }
1480             finally
1481             {
1482                 OperationContext.Current = existingOperationContext;
1483             }
1484         }
1485 
1486         [MethodImpl(MethodImplOptions.NoInlining)]
SyncTransactionalMessagePump()1487         void SyncTransactionalMessagePump()
1488         {
1489             for (;;)
1490             {
1491                 bool completedSynchronously;
1492                 if (null == sharedTransactedBatchContext)
1493                 {
1494                     completedSynchronously = TransactedLoop();
1495                 }
1496                 else
1497                 {
1498                     completedSynchronously = TransactedBatchLoop();
1499                 }
1500 
1501                 if (!completedSynchronously)
1502                 {
1503                     return;
1504                 }
1505             }
1506         }
1507 
TransactedLoop()1508         bool TransactedLoop()
1509         {
1510             try
1511             {
1512                 this.receiver.WaitForMessage();
1513             }
1514             catch (Exception ex)
1515             {
1516                 if (Fx.IsFatal(ex))
1517                 {
1518                     throw;
1519                 }
1520 
1521                 if (!this.HandleError(ex))
1522                 {
1523                     throw;
1524                 }
1525             }
1526 
1527             RequestContext request;
1528             Transaction tx = CreateOrGetAttachedTransaction();
1529             OperationContext existingOperationContext = OperationContext.Current;
1530 
1531             try
1532             {
1533                 OperationContext currentOperationContext = new OperationContext(this.host);
1534                 OperationContext.Current = currentOperationContext;
1535 
1536                 for (;;)
1537                 {
1538                     this.requestInfo.Cleanup();
1539 
1540                     bool received = TryTransactionalReceive(tx, out request);
1541 
1542                     if (!received)
1543                     {
1544                         return IsOpen;
1545                     }
1546 
1547                     if (null == request)
1548                     {
1549                         return false;
1550                     }
1551 
1552                     TransactionMessageProperty.Set(tx, request.RequestMessage);
1553 
1554                     if (!HandleRequest(request, currentOperationContext))
1555                     {
1556                         return false;
1557                     }
1558 
1559                     if (!TryAcquirePump())
1560                     {
1561                         return false;
1562                     }
1563 
1564                     tx = CreateOrGetAttachedTransaction();
1565                     currentOperationContext.Recycle();
1566                 }
1567             }
1568             finally
1569             {
1570                 OperationContext.Current = existingOperationContext;
1571             }
1572         }
1573 
TransactedBatchLoop()1574         bool TransactedBatchLoop()
1575         {
1576             if (null != this.transactedBatchContext)
1577             {
1578                 if (this.transactedBatchContext.InDispatch)
1579                 {
1580                     this.transactedBatchContext.ForceRollback();
1581                     this.transactedBatchContext.InDispatch = false;
1582                 }
1583                 if (!this.transactedBatchContext.IsActive)
1584                 {
1585                     if (!this.isMainTransactedBatchHandler)
1586                     {
1587                         return false;
1588                     }
1589                     this.transactedBatchContext = null;
1590                 }
1591             }
1592 
1593             if (null == this.transactedBatchContext)
1594             {
1595                 try
1596                 {
1597                     this.receiver.WaitForMessage();
1598                 }
1599                 catch (Exception ex)
1600                 {
1601                     if (Fx.IsFatal(ex))
1602                     {
1603                         throw;
1604                     }
1605 
1606                     if (!this.HandleError(ex))
1607                     {
1608                         throw;
1609                     }
1610                 }
1611                 this.transactedBatchContext = this.sharedTransactedBatchContext.CreateTransactedBatchContext();
1612             }
1613 
1614             OperationContext existingOperationContext = OperationContext.Current;
1615 
1616             try
1617             {
1618                 OperationContext currentOperationContext = new OperationContext(this.host);
1619                 OperationContext.Current = currentOperationContext;
1620 
1621                 RequestContext request;
1622 
1623                 while (this.transactedBatchContext.IsActive)
1624                 {
1625                     this.requestInfo.Cleanup();
1626 
1627                     bool valid = TryTransactionalReceive(this.transactedBatchContext.Transaction, out request);
1628 
1629                     if (!valid)
1630                     {
1631                         if (this.IsOpen)
1632                         {
1633                             this.transactedBatchContext.ForceCommit();
1634                             return true;
1635                         }
1636                         else
1637                         {
1638                             this.transactedBatchContext.ForceRollback();
1639                             return false;
1640                         }
1641                     }
1642 
1643                     if (null == request)
1644                     {
1645                         this.transactedBatchContext.ForceRollback();
1646                         return false;
1647                     }
1648 
1649                     TransactionMessageProperty.Set(this.transactedBatchContext.Transaction, request.RequestMessage);
1650 
1651                     this.transactedBatchContext.InDispatch = true;
1652                     if (!HandleRequest(request, currentOperationContext))
1653                     {
1654                         return false;
1655                     }
1656 
1657                     if (this.transactedBatchContext.InDispatch)
1658                     {
1659                         this.transactedBatchContext.ForceRollback();
1660                         this.transactedBatchContext.InDispatch = false;
1661                         return true;
1662                     }
1663 
1664                     if (!TryAcquirePump())
1665                     {
1666                         Fx.Assert("System.ServiceModel.Dispatcher.ChannelHandler.TransactedBatchLoop(): (TryAcquiredPump returned false)");
1667                         return false;
1668                     }
1669 
1670                     currentOperationContext.Recycle();
1671                 }
1672             }
1673             finally
1674             {
1675                 OperationContext.Current = existingOperationContext;
1676             }
1677             return true;
1678         }
1679 
CreateOrGetAttachedTransaction()1680         Transaction CreateOrGetAttachedTransaction()
1681         {
1682             if (null != this.acceptTransaction)
1683             {
1684                 lock (ThisLock)
1685                 {
1686                     if (null != this.acceptTransaction)
1687                     {
1688                         Transaction tx = this.acceptTransaction.Transaction;
1689                         this.acceptTransaction = null;
1690                         return tx;
1691                     }
1692                 }
1693             }
1694 
1695             if (null != this.InstanceContext && this.InstanceContext.HasTransaction)
1696             {
1697                 return InstanceContext.Transaction.Attached;
1698             }
1699             else
1700             {
1701                 return TransactionBehavior.CreateTransaction(
1702                     this.listener.ChannelDispatcher.TransactionIsolationLevel,
1703                     TransactionBehavior.NormalizeTimeout(this.listener.ChannelDispatcher.TransactionTimeout));
1704             }
1705         }
1706 
1707         // calls receive on the channel; returns false if no message during the "short timeout"
TryTransactionalReceive(Transaction tx, out RequestContext request)1708         bool TryTransactionalReceive(Transaction tx, out RequestContext request)
1709         {
1710             request = null;
1711             bool received = false;
1712 
1713             try
1714             {
1715                 using (TransactionScope scope = new TransactionScope(tx))
1716                 {
1717                     if (null != this.sharedTransactedBatchContext)
1718                     {
1719                         lock (this.sharedTransactedBatchContext.ReceiveLock)
1720                         {
1721                             if (this.transactedBatchContext.AboutToExpire)
1722                             {
1723                                 return false;
1724                             }
1725 
1726                             received = this.receiver.TryReceive(TimeSpan.Zero, out request);
1727                         }
1728                     }
1729                     else
1730                     {
1731                         TimeSpan receiveTimeout = TimeoutHelper.Min(this.listener.ChannelDispatcher.TransactionTimeout, this.listener.ChannelDispatcher.DefaultCommunicationTimeouts.ReceiveTimeout);
1732                         received = this.receiver.TryReceive(TransactionBehavior.NormalizeTimeout(receiveTimeout), out request);
1733                     }
1734                     scope.Complete();
1735                 }
1736 
1737                 if (received)
1738                 {
1739                     this.HandleReceiveComplete(request);
1740                 }
1741             }
1742             catch (ObjectDisposedException ex) // thrown from the transaction
1743             {
1744                 this.HandleError(ex);
1745                 request = null;
1746                 return false;
1747             }
1748             catch (TransactionException ex)
1749             {
1750                 this.HandleError(ex);
1751                 request = null;
1752                 return false;
1753             }
1754             catch (Exception ex)
1755             {
1756                 if (Fx.IsFatal(ex))
1757                 {
1758                     throw;
1759                 }
1760 
1761                 if (!this.HandleError(ex))
1762                 {
1763                     throw;
1764                 }
1765             }
1766 
1767             return received;
1768         }
1769 
1770         // This callback always occurs async and always on a dirty thread
ThrottleAcquiredForCall()1771         internal void ThrottleAcquiredForCall()
1772         {
1773             RequestContext request = this.requestWaitingForThrottle;
1774             this.requestWaitingForThrottle = null;
1775             if (this.requestInfo.ChannelHandlerOwnsCallThrottle)
1776             {
1777                 Fx.Assert("ChannelHandler.ThrottleAcquiredForCall: this.requestInfo.ChannelHandlerOwnsCallThrottle");
1778             }
1779             this.requestInfo.ChannelHandlerOwnsCallThrottle = true;
1780 
1781             if (!this.TryRetrievingInstanceContext(request))
1782             {
1783                 //Should reply/close request and also close the pump
1784                 this.EnsurePump();
1785                 return;
1786             }
1787 
1788             this.requestInfo.Channel.CompletedIOOperation();
1789 
1790             if (this.TryAcquireThrottle(request, (this.requestInfo.ExistingInstanceContext == null)))
1791             {
1792                 if (this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle)
1793                 {
1794                     Fx.Assert("ChannelHandler.ThrottleAcquiredForCall: this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle");
1795                 }
1796                 this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle = (this.requestInfo.ExistingInstanceContext == null);
1797 
1798                 if (this.DispatchAndReleasePump(request, false, null))
1799                 {
1800                     this.EnsurePump();
1801                 }
1802             }
1803         }
1804 
TryRetrievingInstanceContext(RequestContext request)1805         bool TryRetrievingInstanceContext(RequestContext request)
1806         {
1807             try
1808             {
1809                 return TryRetrievingInstanceContextCore(request);
1810             }
1811             catch (Exception ex)
1812             {
1813                 if (Fx.IsFatal(ex))
1814                 {
1815                     throw;
1816                 }
1817 
1818                 DiagnosticUtility.TraceHandledException(ex, TraceEventType.Error);
1819 
1820                 try
1821                 {
1822                     request.Close();
1823                 }
1824                 catch (Exception e)
1825                 {
1826                     if (Fx.IsFatal(e))
1827                     {
1828                         throw;
1829                     }
1830 
1831                     request.Abort();
1832                 }
1833 
1834                 return false;
1835             }
1836         }
1837 
1838         //Return: False denotes failure, Caller should discard the request.
1839         //      : True denotes operation is sucessful.
TryRetrievingInstanceContextCore(RequestContext request)1840         bool TryRetrievingInstanceContextCore(RequestContext request)
1841         {
1842             bool releasePump = true;
1843             try
1844             {
1845                 if (!this.requestInfo.EndpointLookupDone)
1846                 {
1847                     this.EnsureChannelAndEndpoint(request);
1848                 }
1849 
1850                 if (this.requestInfo.Channel == null)
1851                 {
1852                     return false;
1853                 }
1854 
1855                 if (this.requestInfo.DispatchRuntime != null)
1856                 {
1857                     IContextChannel transparentProxy = this.requestInfo.Channel.Proxy as IContextChannel;
1858                     try
1859                     {
1860                         this.requestInfo.ExistingInstanceContext = this.requestInfo.DispatchRuntime.InstanceContextProvider.GetExistingInstanceContext(request.RequestMessage, transparentProxy);
1861                         releasePump = false;
1862                     }
1863                     catch (Exception e)
1864                     {
1865                         if (Fx.IsFatal(e))
1866                         {
1867                             throw;
1868                         }
1869                         this.requestInfo.Channel = null;
1870                         this.HandleError(e, request, channel);
1871                         return false;
1872                     }
1873                 }
1874                 else
1875                 {
1876                     // This can happen if we are pumping for an async client,
1877                     // and we receive a bogus reply.  In that case, there is no
1878                     // DispatchRuntime, because we are only expecting replies.
1879                     //
1880                     // One possible fix for this would be in DuplexChannelBinder
1881                     // to drop all messages with a RelatesTo that do not match a
1882                     // pending request.
1883                     //
1884                     // However, that would not fix:
1885                     // (a) we could get a valid request message with a
1886                     // RelatesTo that we should try to process.
1887                     // (b) we could get a reply message that does not have
1888                     // a RelatesTo.
1889                     //
1890                     // So we do the null check here.
1891                     //
1892                     // SFx drops a message here
1893                     TraceUtility.TraceDroppedMessage(request.RequestMessage, this.requestInfo.Endpoint);
1894                     request.Close();
1895                     return false;
1896                 }
1897             }
1898             catch (Exception e)
1899             {
1900                 if (Fx.IsFatal(e))
1901                 {
1902                     throw;
1903                 }
1904 
1905                 this.HandleError(e, request, channel);
1906 
1907                 return false;
1908             }
1909             finally
1910             {
1911                 if (releasePump)
1912                 {
1913                     this.ReleasePump();
1914                 }
1915             }
1916             return true;
1917         }
1918 
1919         // This callback always occurs async and always on a dirty thread
ThrottleAcquired()1920         internal void ThrottleAcquired()
1921         {
1922             RequestContext request = this.requestWaitingForThrottle;
1923             this.requestWaitingForThrottle = null;
1924             if (this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle)
1925             {
1926                 Fx.Assert("ChannelHandler.ThrottleAcquired: this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle");
1927             }
1928             this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle = (this.requestInfo.ExistingInstanceContext == null);
1929 
1930             if (this.DispatchAndReleasePump(request, false, null))
1931             {
1932                 this.EnsurePump();
1933             }
1934         }
1935 
TryAcquireThrottle(RequestContext request, bool acquireInstanceContextThrottle)1936         bool TryAcquireThrottle(RequestContext request, bool acquireInstanceContextThrottle)
1937         {
1938             ServiceThrottle throttle = this.throttle;
1939             if ((throttle != null) && (throttle.IsActive))
1940             {
1941                 this.requestWaitingForThrottle = request;
1942 
1943                 if (throttle.AcquireInstanceContextAndDynamic(this, acquireInstanceContextThrottle))
1944                 {
1945                     this.requestWaitingForThrottle = null;
1946                     return true;
1947                 }
1948                 else
1949                 {
1950                     return false;
1951                 }
1952             }
1953             else
1954             {
1955                 return true;
1956             }
1957         }
1958 
TryAcquireCallThrottle(RequestContext request)1959         bool TryAcquireCallThrottle(RequestContext request)
1960         {
1961             ServiceThrottle throttle = this.throttle;
1962             if ((throttle != null) && (throttle.IsActive))
1963             {
1964                 this.requestWaitingForThrottle = request;
1965 
1966                 if (throttle.AcquireCall(this))
1967                 {
1968                     this.requestWaitingForThrottle = null;
1969                     return true;
1970                 }
1971                 else
1972                 {
1973                     return false;
1974                 }
1975             }
1976             else
1977             {
1978                 return true;
1979             }
1980         }
1981 
TryAcquirePump()1982         bool TryAcquirePump()
1983         {
1984             if (this.isConcurrent)
1985             {
1986                 return Interlocked.CompareExchange(ref this.isPumpAcquired, 1, 0) == 0;
1987             }
1988 
1989             return true;
1990         }
1991 
1992         struct RequestInfo
1993         {
1994             public EndpointDispatcher Endpoint;
1995             public InstanceContext ExistingInstanceContext;
1996             public ServiceChannel Channel;
1997             public bool EndpointLookupDone;
1998             public DispatchRuntime DispatchRuntime;
1999             public RequestContext RequestContext;
2000             public ChannelHandler ChannelHandler;
2001             public bool ChannelHandlerOwnsCallThrottle; // if true, we are responsible for call throttle
2002             public bool ChannelHandlerOwnsInstanceContextThrottle; // if true, we are responsible for instance/dynamic throttle
2003 
RequestInfoSystem.ServiceModel.Dispatcher.ChannelHandler.RequestInfo2004             public RequestInfo(ChannelHandler channelHandler)
2005             {
2006                 this.Endpoint = null;
2007                 this.ExistingInstanceContext = null;
2008                 this.Channel = null;
2009                 this.EndpointLookupDone = false;
2010                 this.DispatchRuntime = null;
2011                 this.RequestContext = null;
2012                 this.ChannelHandler = channelHandler;
2013                 this.ChannelHandlerOwnsCallThrottle = false;
2014                 this.ChannelHandlerOwnsInstanceContextThrottle = false;
2015             }
2016 
CleanupSystem.ServiceModel.Dispatcher.ChannelHandler.RequestInfo2017             public void Cleanup()
2018             {
2019                 if (this.ChannelHandlerOwnsInstanceContextThrottle)
2020                 {
2021                     this.ChannelHandler.throttle.DeactivateInstanceContext();
2022                     this.ChannelHandlerOwnsInstanceContextThrottle = false;
2023                 }
2024 
2025                 this.Endpoint = null;
2026                 this.ExistingInstanceContext = null;
2027                 this.Channel = null;
2028                 this.EndpointLookupDone = false;
2029                 this.RequestContext = null;
2030                 if (this.ChannelHandlerOwnsCallThrottle)
2031                 {
2032                     this.ChannelHandler.DispatchDone();
2033                     this.ChannelHandlerOwnsCallThrottle = false;
2034                 }
2035             }
2036         }
2037 
TraceDispatchMessageStart(Message message)2038         EventTraceActivity TraceDispatchMessageStart(Message message)
2039         {
2040             if (FxTrace.Trace.IsEnd2EndActivityTracingEnabled && message != null)
2041             {
2042                 EventTraceActivity eventTraceActivity = EventTraceActivityHelper.TryExtractActivity(message);
2043                 if (TD.DispatchMessageStartIsEnabled())
2044                 {
2045                     TD.DispatchMessageStart(eventTraceActivity);
2046                 }
2047                 return eventTraceActivity;
2048             }
2049 
2050             return null;
2051         }
2052 
2053         /// <summary>
2054         /// Data structure used to carry state for asynchronous replies
2055         /// </summary>
2056         struct ContinuationState
2057         {
2058             public ChannelHandler ChannelHandler;
2059             public Exception Exception;
2060             public RequestContext Request;
2061             public Message Reply;
2062             public ServiceChannel Channel;
2063             public ErrorHandlerFaultInfo FaultInfo;
2064         }
2065     }
2066 }
2067