1 //-----------------------------------------------------------------------------
2 // Copyright (c) Microsoft Corporation.  All rights reserved.
3 //-----------------------------------------------------------------------------
4 namespace System.ServiceModel.Dispatcher
5 {
6     using System;
7     using System.Collections.Generic;
8     using System.ServiceModel.Diagnostics;
9     using System.Runtime;
10     using System.ServiceModel.Channels;
11     using System.Threading;
12 
13     class BufferedReceiveBinder : IChannelBinder
14     {
15         static Action<object> tryReceive = new Action<object>(BufferedReceiveBinder.TryReceive);
16         static AsyncCallback tryReceiveCallback = Fx.ThunkCallback(new AsyncCallback(TryReceiveCallback));
17 
18         IChannelBinder channelBinder;
19         InputQueue<RequestContextWrapper> inputQueue;
20 
21         [Fx.Tag.SynchronizationObject(Blocking = true, Kind = Fx.Tag.SynchronizationKind.InterlockedNoSpin)]
22         int pendingOperationSemaphore;
23 
BufferedReceiveBinder(IChannelBinder channelBinder)24         public BufferedReceiveBinder(IChannelBinder channelBinder)
25         {
26             this.channelBinder = channelBinder;
27             this.inputQueue = new InputQueue<RequestContextWrapper>();
28         }
29 
30         public IChannel Channel
31         {
32             get { return this.channelBinder.Channel; }
33         }
34 
35         public bool HasSession
36         {
37             get { return this.channelBinder.HasSession; }
38         }
39 
40         public Uri ListenUri
41         {
42             get { return this.channelBinder.ListenUri; }
43         }
44 
45         public EndpointAddress LocalAddress
46         {
47             get { return this.channelBinder.LocalAddress; }
48         }
49 
50         public EndpointAddress RemoteAddress
51         {
52             get { return this.channelBinder.RemoteAddress; }
53         }
54 
Abort()55         public void Abort()
56         {
57             this.inputQueue.Close();
58             this.channelBinder.Abort();
59         }
60 
CloseAfterFault(TimeSpan timeout)61         public void CloseAfterFault(TimeSpan timeout)
62         {
63             this.inputQueue.Close();
64             this.channelBinder.CloseAfterFault(timeout);
65         }
66 
67         // Locking:
68         // Only 1 channelBinder operation call should be active at any given time. All future calls
69         // will wait on the inputQueue. The semaphore is always released right before the Dispatch on the inputQueue.
70         // This protects a new call racing with an existing operation that is just about to fully complete.
71 
TryReceive(TimeSpan timeout, out RequestContext requestContext)72         public bool TryReceive(TimeSpan timeout, out RequestContext requestContext)
73         {
74             if (Interlocked.CompareExchange(ref this.pendingOperationSemaphore, 1, 0) == 0)
75             {
76                 ActionItem.Schedule(tryReceive, this);
77             }
78 
79             RequestContextWrapper wrapper;
80             bool success = this.inputQueue.Dequeue(timeout, out wrapper);
81 
82             if (success && wrapper != null)
83             {
84                 requestContext = wrapper.RequestContext;
85             }
86             else
87             {
88                 requestContext = null;
89             }
90 
91             return success;
92         }
93 
BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)94         public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
95         {
96             if (Interlocked.CompareExchange(ref this.pendingOperationSemaphore, 1, 0) == 0)
97             {
98                 IAsyncResult result = this.channelBinder.BeginTryReceive(timeout, tryReceiveCallback, this);
99                 if (result.CompletedSynchronously)
100                 {
101                     HandleEndTryReceive(result);
102                 }
103             }
104 
105             return this.inputQueue.BeginDequeue(timeout, callback, state);
106         }
107 
EndTryReceive(IAsyncResult result, out RequestContext requestContext)108         public bool EndTryReceive(IAsyncResult result, out RequestContext requestContext)
109         {
110             RequestContextWrapper wrapper;
111             bool success = this.inputQueue.EndDequeue(result, out wrapper);
112 
113             if (success && wrapper != null)
114             {
115                 requestContext = wrapper.RequestContext;
116             }
117             else
118             {
119                 requestContext = null;
120             }
121             return success;
122         }
123 
CreateRequestContext(Message message)124         public RequestContext CreateRequestContext(Message message)
125         {
126             return this.channelBinder.CreateRequestContext(message);
127         }
128 
Send(Message message, TimeSpan timeout)129         public void Send(Message message, TimeSpan timeout)
130         {
131             this.channelBinder.Send(message, timeout);
132         }
133 
BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)134         public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
135         {
136             return this.channelBinder.BeginSend(message, timeout, callback, state);
137         }
138 
EndSend(IAsyncResult result)139         public void EndSend(IAsyncResult result)
140         {
141             this.channelBinder.EndSend(result);
142         }
143 
Request(Message message, TimeSpan timeout)144         public Message Request(Message message, TimeSpan timeout)
145         {
146             return this.channelBinder.Request(message, timeout);
147         }
148 
BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state)149         public IAsyncResult BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state)
150         {
151             return this.channelBinder.BeginRequest(message, timeout, callback, state);
152         }
153 
EndRequest(IAsyncResult result)154         public Message EndRequest(IAsyncResult result)
155         {
156             return this.channelBinder.EndRequest(result);
157         }
158 
WaitForMessage(TimeSpan timeout)159         public bool WaitForMessage(TimeSpan timeout)
160         {
161             return this.channelBinder.WaitForMessage(timeout);
162         }
163 
BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)164         public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
165         {
166             return this.channelBinder.BeginWaitForMessage(timeout, callback, state);
167         }
168 
EndWaitForMessage(IAsyncResult result)169         public bool EndWaitForMessage(IAsyncResult result)
170         {
171             return this.channelBinder.EndWaitForMessage(result);
172         }
173 
InjectRequest(RequestContext requestContext)174         internal void InjectRequest(RequestContext requestContext)
175         {
176             // Reuse the existing requestContext
177             this.inputQueue.EnqueueAndDispatch(new RequestContextWrapper(requestContext));
178         }
179 
180         //
181         // TryReceive threads
182         //
183 
TryReceive(object state)184         static void TryReceive(object state)
185         {
186             BufferedReceiveBinder binder = (BufferedReceiveBinder)state;
187 
188             RequestContext requestContext;
189             bool requiresDispatch = false;
190             try
191             {
192                 if (binder.channelBinder.TryReceive(TimeSpan.MaxValue, out requestContext))
193                 {
194                     requiresDispatch = binder.inputQueue.EnqueueWithoutDispatch(new RequestContextWrapper(requestContext), null);
195                 }
196             }
197             catch (Exception exception)
198             {
199                 if (Fx.IsFatal(exception))
200                 {
201                     throw;
202                 }
203 
204                 requiresDispatch = binder.inputQueue.EnqueueWithoutDispatch(exception, null);
205             }
206             finally
207             {
208                 Interlocked.Exchange(ref binder.pendingOperationSemaphore, 0);
209                 if (requiresDispatch)
210                 {
211                     binder.inputQueue.Dispatch();
212                 }
213             }
214         }
215 
TryReceiveCallback(IAsyncResult result)216         static void TryReceiveCallback(IAsyncResult result)
217         {
218             if (result.CompletedSynchronously)
219             {
220                 return;
221             }
222 
223             HandleEndTryReceive(result);
224         }
225 
HandleEndTryReceive(IAsyncResult result)226         static void HandleEndTryReceive(IAsyncResult result)
227         {
228             BufferedReceiveBinder binder = (BufferedReceiveBinder)result.AsyncState;
229 
230             RequestContext requestContext;
231             bool requiresDispatch = false;
232             try
233             {
234                 if (binder.channelBinder.EndTryReceive(result, out requestContext))
235                 {
236                     requiresDispatch = binder.inputQueue.EnqueueWithoutDispatch(new RequestContextWrapper(requestContext), null);
237                 }
238             }
239             catch (Exception exception)
240             {
241                 if (Fx.IsFatal(exception))
242                 {
243                     throw;
244                 }
245 
246                 requiresDispatch = binder.inputQueue.EnqueueWithoutDispatch(exception, null);
247             }
248             finally
249             {
250                 Interlocked.Exchange(ref binder.pendingOperationSemaphore, 0);
251                 if (requiresDispatch)
252                 {
253                     binder.inputQueue.Dispatch();
254                 }
255             }
256         }
257 
258         // A RequestContext may be 'null' (some pieces of ChannelHandler depend on this) but the InputQueue
259         // will not allow null items to be enqueued. Wrap the RequestContexts in another object to
260         // facilitate this semantic
261         class RequestContextWrapper
262         {
RequestContextWrapper(RequestContext requestContext)263             public RequestContextWrapper(RequestContext requestContext)
264             {
265                 this.RequestContext = requestContext;
266             }
267 
268             public RequestContext RequestContext
269             {
270                 get;
271                 private set;
272             }
273         }
274     }
275 }
276