1 //----------------------------------------------------------------------------- 2 // Copyright (c) Microsoft Corporation. All rights reserved. 3 //----------------------------------------------------------------------------- 4 namespace System.ServiceModel.Dispatcher 5 { 6 using System; 7 using System.Collections.Generic; 8 using System.ServiceModel.Diagnostics; 9 using System.Runtime; 10 using System.ServiceModel.Channels; 11 using System.Threading; 12 13 class BufferedReceiveBinder : IChannelBinder 14 { 15 static Action<object> tryReceive = new Action<object>(BufferedReceiveBinder.TryReceive); 16 static AsyncCallback tryReceiveCallback = Fx.ThunkCallback(new AsyncCallback(TryReceiveCallback)); 17 18 IChannelBinder channelBinder; 19 InputQueue<RequestContextWrapper> inputQueue; 20 21 [Fx.Tag.SynchronizationObject(Blocking = true, Kind = Fx.Tag.SynchronizationKind.InterlockedNoSpin)] 22 int pendingOperationSemaphore; 23 BufferedReceiveBinder(IChannelBinder channelBinder)24 public BufferedReceiveBinder(IChannelBinder channelBinder) 25 { 26 this.channelBinder = channelBinder; 27 this.inputQueue = new InputQueue<RequestContextWrapper>(); 28 } 29 30 public IChannel Channel 31 { 32 get { return this.channelBinder.Channel; } 33 } 34 35 public bool HasSession 36 { 37 get { return this.channelBinder.HasSession; } 38 } 39 40 public Uri ListenUri 41 { 42 get { return this.channelBinder.ListenUri; } 43 } 44 45 public EndpointAddress LocalAddress 46 { 47 get { return this.channelBinder.LocalAddress; } 48 } 49 50 public EndpointAddress RemoteAddress 51 { 52 get { return this.channelBinder.RemoteAddress; } 53 } 54 Abort()55 public void Abort() 56 { 57 this.inputQueue.Close(); 58 this.channelBinder.Abort(); 59 } 60 CloseAfterFault(TimeSpan timeout)61 public void CloseAfterFault(TimeSpan timeout) 62 { 63 this.inputQueue.Close(); 64 this.channelBinder.CloseAfterFault(timeout); 65 } 66 67 // Locking: 68 // Only 1 channelBinder operation call should be active at any given time. All future calls 69 // will wait on the inputQueue. The semaphore is always released right before the Dispatch on the inputQueue. 70 // This protects a new call racing with an existing operation that is just about to fully complete. 71 TryReceive(TimeSpan timeout, out RequestContext requestContext)72 public bool TryReceive(TimeSpan timeout, out RequestContext requestContext) 73 { 74 if (Interlocked.CompareExchange(ref this.pendingOperationSemaphore, 1, 0) == 0) 75 { 76 ActionItem.Schedule(tryReceive, this); 77 } 78 79 RequestContextWrapper wrapper; 80 bool success = this.inputQueue.Dequeue(timeout, out wrapper); 81 82 if (success && wrapper != null) 83 { 84 requestContext = wrapper.RequestContext; 85 } 86 else 87 { 88 requestContext = null; 89 } 90 91 return success; 92 } 93 BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)94 public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state) 95 { 96 if (Interlocked.CompareExchange(ref this.pendingOperationSemaphore, 1, 0) == 0) 97 { 98 IAsyncResult result = this.channelBinder.BeginTryReceive(timeout, tryReceiveCallback, this); 99 if (result.CompletedSynchronously) 100 { 101 HandleEndTryReceive(result); 102 } 103 } 104 105 return this.inputQueue.BeginDequeue(timeout, callback, state); 106 } 107 EndTryReceive(IAsyncResult result, out RequestContext requestContext)108 public bool EndTryReceive(IAsyncResult result, out RequestContext requestContext) 109 { 110 RequestContextWrapper wrapper; 111 bool success = this.inputQueue.EndDequeue(result, out wrapper); 112 113 if (success && wrapper != null) 114 { 115 requestContext = wrapper.RequestContext; 116 } 117 else 118 { 119 requestContext = null; 120 } 121 return success; 122 } 123 CreateRequestContext(Message message)124 public RequestContext CreateRequestContext(Message message) 125 { 126 return this.channelBinder.CreateRequestContext(message); 127 } 128 Send(Message message, TimeSpan timeout)129 public void Send(Message message, TimeSpan timeout) 130 { 131 this.channelBinder.Send(message, timeout); 132 } 133 BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)134 public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state) 135 { 136 return this.channelBinder.BeginSend(message, timeout, callback, state); 137 } 138 EndSend(IAsyncResult result)139 public void EndSend(IAsyncResult result) 140 { 141 this.channelBinder.EndSend(result); 142 } 143 Request(Message message, TimeSpan timeout)144 public Message Request(Message message, TimeSpan timeout) 145 { 146 return this.channelBinder.Request(message, timeout); 147 } 148 BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state)149 public IAsyncResult BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state) 150 { 151 return this.channelBinder.BeginRequest(message, timeout, callback, state); 152 } 153 EndRequest(IAsyncResult result)154 public Message EndRequest(IAsyncResult result) 155 { 156 return this.channelBinder.EndRequest(result); 157 } 158 WaitForMessage(TimeSpan timeout)159 public bool WaitForMessage(TimeSpan timeout) 160 { 161 return this.channelBinder.WaitForMessage(timeout); 162 } 163 BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)164 public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state) 165 { 166 return this.channelBinder.BeginWaitForMessage(timeout, callback, state); 167 } 168 EndWaitForMessage(IAsyncResult result)169 public bool EndWaitForMessage(IAsyncResult result) 170 { 171 return this.channelBinder.EndWaitForMessage(result); 172 } 173 InjectRequest(RequestContext requestContext)174 internal void InjectRequest(RequestContext requestContext) 175 { 176 // Reuse the existing requestContext 177 this.inputQueue.EnqueueAndDispatch(new RequestContextWrapper(requestContext)); 178 } 179 180 // 181 // TryReceive threads 182 // 183 TryReceive(object state)184 static void TryReceive(object state) 185 { 186 BufferedReceiveBinder binder = (BufferedReceiveBinder)state; 187 188 RequestContext requestContext; 189 bool requiresDispatch = false; 190 try 191 { 192 if (binder.channelBinder.TryReceive(TimeSpan.MaxValue, out requestContext)) 193 { 194 requiresDispatch = binder.inputQueue.EnqueueWithoutDispatch(new RequestContextWrapper(requestContext), null); 195 } 196 } 197 catch (Exception exception) 198 { 199 if (Fx.IsFatal(exception)) 200 { 201 throw; 202 } 203 204 requiresDispatch = binder.inputQueue.EnqueueWithoutDispatch(exception, null); 205 } 206 finally 207 { 208 Interlocked.Exchange(ref binder.pendingOperationSemaphore, 0); 209 if (requiresDispatch) 210 { 211 binder.inputQueue.Dispatch(); 212 } 213 } 214 } 215 TryReceiveCallback(IAsyncResult result)216 static void TryReceiveCallback(IAsyncResult result) 217 { 218 if (result.CompletedSynchronously) 219 { 220 return; 221 } 222 223 HandleEndTryReceive(result); 224 } 225 HandleEndTryReceive(IAsyncResult result)226 static void HandleEndTryReceive(IAsyncResult result) 227 { 228 BufferedReceiveBinder binder = (BufferedReceiveBinder)result.AsyncState; 229 230 RequestContext requestContext; 231 bool requiresDispatch = false; 232 try 233 { 234 if (binder.channelBinder.EndTryReceive(result, out requestContext)) 235 { 236 requiresDispatch = binder.inputQueue.EnqueueWithoutDispatch(new RequestContextWrapper(requestContext), null); 237 } 238 } 239 catch (Exception exception) 240 { 241 if (Fx.IsFatal(exception)) 242 { 243 throw; 244 } 245 246 requiresDispatch = binder.inputQueue.EnqueueWithoutDispatch(exception, null); 247 } 248 finally 249 { 250 Interlocked.Exchange(ref binder.pendingOperationSemaphore, 0); 251 if (requiresDispatch) 252 { 253 binder.inputQueue.Dispatch(); 254 } 255 } 256 } 257 258 // A RequestContext may be 'null' (some pieces of ChannelHandler depend on this) but the InputQueue 259 // will not allow null items to be enqueued. Wrap the RequestContexts in another object to 260 // facilitate this semantic 261 class RequestContextWrapper 262 { RequestContextWrapper(RequestContext requestContext)263 public RequestContextWrapper(RequestContext requestContext) 264 { 265 this.RequestContext = requestContext; 266 } 267 268 public RequestContext RequestContext 269 { 270 get; 271 private set; 272 } 273 } 274 } 275 } 276