1 //---------------------------------------------------------------- 2 // Copyright (c) Microsoft Corporation. All rights reserved. 3 //---------------------------------------------------------------- 4 namespace System.ServiceModel.Activities.Dispatcher 5 { 6 using System.Activities.Hosting; 7 using System.Collections.Generic; 8 using System.Collections.ObjectModel; 9 using System.Runtime; 10 using System.Runtime.DurableInstancing; 11 using System.ServiceModel.Channels; 12 using System.Threading; 13 14 sealed class BufferedReceiveManager : IExtension<ServiceHostBase> 15 { 16 static AsyncCallback onEndAbandon; 17 Dictionary<InstanceKey, List<BufferedReceiveMessageProperty>> bufferedProperties; 18 PendingMessageThrottle throttle; 19 WorkflowServiceHost host; 20 21 int initialized; 22 23 [Fx.Tag.SynchronizationObject(Blocking = false)] 24 object thisLock; 25 BufferedReceiveManager(int maxPendingMessagesPerChannel)26 public BufferedReceiveManager(int maxPendingMessagesPerChannel) 27 { 28 this.throttle = new PendingMessageThrottle(maxPendingMessagesPerChannel); 29 this.thisLock = new object(); 30 } 31 BufferReceive(OperationContext operationContext, ReceiveContext receiveContext, string bookmarkName, BufferedReceiveState state, bool retry)32 public bool BufferReceive(OperationContext operationContext, ReceiveContext receiveContext, string bookmarkName, BufferedReceiveState state, bool retry) 33 { 34 Fx.Assert(receiveContext != null, "ReceiveContext must be present in order to perform buffering"); 35 36 bool success = false; 37 38 BufferedReceiveMessageProperty property = null; 39 if (BufferedReceiveMessageProperty.TryGet(operationContext.IncomingMessageProperties, out property)) 40 { 41 CorrelationMessageProperty correlation = null; 42 if (CorrelationMessageProperty.TryGet(operationContext.IncomingMessageProperties, out correlation)) 43 { 44 InstanceKey instanceKey = correlation.CorrelationKey; 45 int channelKey = operationContext.Channel.GetHashCode(); 46 if (this.throttle.Acquire(channelKey)) 47 { 48 try 49 { 50 // Tag the property with identifying data to be used during later processing 51 if (UpdateProperty(property, receiveContext, channelKey, bookmarkName, state)) 52 { 53 // Cleanup if we are notified the ReceiveContext faulted underneath us 54 receiveContext.Faulted += delegate(object sender, EventArgs e) 55 { 56 lock (this.thisLock) 57 { 58 if (this.bufferedProperties.ContainsKey(instanceKey)) 59 { 60 if (this.bufferedProperties[instanceKey].Remove(property)) 61 { 62 try 63 { 64 property.RequestContext.DelayClose(false); 65 property.RequestContext.Abort(); 66 } 67 catch (Exception exception) 68 { 69 if (Fx.IsFatal(exception)) 70 { 71 throw; 72 } 73 74 // ---- these exceptions as we are already on the error path 75 } 76 77 this.throttle.Release(channelKey); 78 } 79 } 80 } 81 }; 82 83 // Actual Buffering 84 lock (this.thisLock) 85 { 86 // Optimistic state check in case we just raced with the receiveContext 87 // faulting. If the receiveContext still faults after the state check, the above 88 // cleanup routine will handle things correctly. In both cases, a double-release 89 // of the throttle is protected. 90 if (receiveContext.State == ReceiveContextState.Received) 91 { 92 bool found = false; 93 // if the exception indicates retry-able (such as RetryException), 94 // we will simply retry. This happens when racing with abort and 95 // WF informing the client to retry (BufferedReceiveManager is a 96 // client in this case). 97 if (retry) 98 { 99 property.RequestContext.DelayClose(true); 100 property.RegisterForReplay(operationContext); 101 property.ReplayRequest(); 102 property.Notification.NotifyInvokeReceived(property.RequestContext.InnerRequestContext); 103 found = true; 104 } 105 else 106 { 107 ReadOnlyCollection<BookmarkInfo> bookmarks = this.host.DurableInstanceManager.PersistenceProviderDirectory.GetBookmarksForInstance(instanceKey); 108 // Retry in case match the existing bookmark 109 if (bookmarks != null) 110 { 111 for (int i = 0; i < bookmarks.Count; ++i) 112 { 113 BookmarkInfo bookmark = bookmarks[i]; 114 if (bookmark.BookmarkName == bookmarkName) 115 { 116 // Found it so retry... 117 property.RequestContext.DelayClose(true); 118 property.RegisterForReplay(operationContext); 119 property.ReplayRequest(); 120 property.Notification.NotifyInvokeReceived(property.RequestContext.InnerRequestContext); 121 found = true; 122 break; 123 } 124 } 125 } 126 } 127 128 if (!found) 129 { 130 List<BufferedReceiveMessageProperty> properties; 131 if (!this.bufferedProperties.TryGetValue(instanceKey, out properties)) 132 { 133 properties = new List<BufferedReceiveMessageProperty>(); 134 this.bufferedProperties.Add(instanceKey, properties); 135 } 136 property.RequestContext.DelayClose(true); 137 property.RegisterForReplay(operationContext); 138 properties.Add(property); 139 } 140 else 141 { 142 this.throttle.Release(channelKey); 143 } 144 success = true; 145 } 146 } 147 } 148 } 149 finally 150 { 151 if (!success) 152 { 153 this.throttle.Release(channelKey); 154 } 155 } 156 } 157 } 158 } 159 160 return success; 161 } 162 Retry(HashSet<InstanceKey> associatedInstances, ReadOnlyCollection<BookmarkInfo> availableBookmarks)163 public void Retry(HashSet<InstanceKey> associatedInstances, ReadOnlyCollection<BookmarkInfo> availableBookmarks) 164 { 165 List<BookmarkInfo> bookmarks = new List<BookmarkInfo>(availableBookmarks); 166 foreach (InstanceKey instanceKey in associatedInstances) 167 { 168 lock (this.thisLock) 169 { 170 if (this.bufferedProperties.ContainsKey(instanceKey)) 171 { 172 List<BufferedReceiveMessageProperty> properties = this.bufferedProperties[instanceKey]; 173 int index = 0; 174 175 while (index < properties.Count && bookmarks.Count > 0) 176 { 177 BufferedReceiveMessageProperty property = properties[index]; 178 179 // Determine if this property is now ready to be processed 180 int channelKey = 0; 181 bool found = false; 182 for (int i = 0; i < bookmarks.Count; ++i) 183 { 184 BookmarkInfo bookmark = (BookmarkInfo)bookmarks[i]; 185 PropertyData data = (PropertyData)property.UserState; 186 if (bookmark.BookmarkName == data.BookmarkName) 187 { 188 // Found it so retry... 189 bookmarks.RemoveAt(i); 190 channelKey = data.ChannelKey; 191 property.ReplayRequest(); 192 property.Notification.NotifyInvokeReceived(property.RequestContext.InnerRequestContext); 193 found = true; 194 break; 195 } 196 } 197 198 if (!found) 199 { 200 index++; 201 } 202 else 203 { 204 properties.RemoveAt(index); 205 this.throttle.Release(channelKey); 206 } 207 } 208 } 209 } 210 211 if (bookmarks.Count == 0) 212 { 213 break; 214 } 215 } 216 } 217 AbandonBufferedReceives(HashSet<InstanceKey> associatedInstances)218 public void AbandonBufferedReceives(HashSet<InstanceKey> associatedInstances) 219 { 220 foreach (InstanceKey instanceKey in associatedInstances) 221 { 222 lock (this.thisLock) 223 { 224 if (this.bufferedProperties.ContainsKey(instanceKey)) 225 { 226 foreach (BufferedReceiveMessageProperty property in this.bufferedProperties[instanceKey]) 227 { 228 PropertyData data = (PropertyData)property.UserState; 229 AbandonReceiveContext(data.ReceiveContext); 230 this.throttle.Release(data.ChannelKey); 231 } 232 233 this.bufferedProperties.Remove(instanceKey); 234 } 235 } 236 } 237 } 238 239 // clean up any remaining buffered receives as part of ServiceHost close. AbandonBufferedReceives()240 internal void AbandonBufferedReceives() 241 { 242 lock (this.thisLock) 243 { 244 foreach (List<BufferedReceiveMessageProperty> value in this.bufferedProperties.Values) 245 { 246 foreach (BufferedReceiveMessageProperty property in value) 247 { 248 PropertyData data = (PropertyData)property.UserState; 249 AbandonReceiveContext(data.ReceiveContext); 250 this.throttle.Release(data.ChannelKey); 251 } 252 } 253 this.bufferedProperties.Clear(); 254 } 255 } 256 257 // Best-effort to abandon the receiveContext AbandonReceiveContext(ReceiveContext receiveContext)258 internal static void AbandonReceiveContext(ReceiveContext receiveContext) 259 { 260 if (receiveContext != null) 261 { 262 if (onEndAbandon == null) 263 { 264 onEndAbandon = Fx.ThunkCallback(new AsyncCallback(OnEndAbandon)); 265 } 266 267 try 268 { 269 IAsyncResult result = receiveContext.BeginAbandon( 270 TimeSpan.MaxValue, onEndAbandon, receiveContext); 271 if (result.CompletedSynchronously) 272 { 273 HandleEndAbandon(result); 274 } 275 } 276 catch (Exception exception) 277 { 278 if (Fx.IsFatal(exception)) 279 { 280 throw; 281 } 282 283 // We ---- any Abandon exception - best effort. 284 FxTrace.Exception.AsWarning(exception); 285 } 286 } 287 } 288 HandleEndAbandon(IAsyncResult result)289 static bool HandleEndAbandon(IAsyncResult result) 290 { 291 ReceiveContext receiveContext = (ReceiveContext)result.AsyncState; 292 receiveContext.EndAbandon(result); 293 return true; 294 } 295 OnEndAbandon(IAsyncResult result)296 static void OnEndAbandon(IAsyncResult result) 297 { 298 if (result.CompletedSynchronously) 299 { 300 return; 301 } 302 303 try 304 { 305 HandleEndAbandon(result); 306 } 307 catch (Exception exception) 308 { 309 if (Fx.IsFatal(exception)) 310 { 311 throw; 312 } 313 314 // We ---- any Abandon exception - best effort. 315 FxTrace.Exception.AsWarning(exception); 316 } 317 } 318 Attach(ServiceHostBase owner)319 void IExtension<ServiceHostBase>.Attach(ServiceHostBase owner) 320 { 321 if (owner == null) 322 { 323 throw FxTrace.Exception.AsError(new ArgumentNullException("owner")); 324 } 325 326 if (Interlocked.CompareExchange(ref this.initialized, 1, 0) != 0) 327 { 328 throw FxTrace.Exception.AsError( 329 new InvalidOperationException(SR.BufferedReceiveBehaviorMultipleUse)); 330 } 331 332 owner.ThrowIfClosedOrOpened(); 333 334 Fx.Assert(owner is WorkflowServiceHost, "owner must be of WorkflowServiceHost type!"); 335 this.host = (WorkflowServiceHost)owner; 336 Initialize(); 337 } 338 Detach(ServiceHostBase owner)339 void IExtension<ServiceHostBase>.Detach(ServiceHostBase owner) 340 { 341 } 342 UpdateProperty(BufferedReceiveMessageProperty property, ReceiveContext receiveContext, int channelKey, string bookmarkName, BufferedReceiveState state)343 bool UpdateProperty(BufferedReceiveMessageProperty property, ReceiveContext receiveContext, int channelKey, string bookmarkName, BufferedReceiveState state) 344 { 345 // If there's data already there make sure the state is allowed 346 if (property.UserState == null) 347 { 348 property.UserState = new PropertyData() 349 { 350 ReceiveContext = receiveContext, 351 ChannelKey = channelKey, 352 BookmarkName = bookmarkName, 353 State = state 354 }; 355 } 356 else 357 { 358 PropertyData data = (PropertyData)property.UserState; 359 360 // We should not buffer twice at the same state 361 if (data.State == state) 362 { 363 return false; 364 } 365 366 data.State = state; 367 } 368 369 return true; 370 } 371 Initialize()372 void Initialize() 373 { 374 this.bufferedProperties = new Dictionary<InstanceKey, List<BufferedReceiveMessageProperty>>(); 375 } 376 377 class PendingMessageThrottle 378 { 379 [Fx.Tag.SynchronizationObject(Blocking = false)] 380 Dictionary<int, ThrottleEntry> pendingMessages; 381 382 int maxPendingMessagesPerChannel; 383 int warningRestoreLimit; 384 PendingMessageThrottle(int maxPendingMessagesPerChannel)385 public PendingMessageThrottle(int maxPendingMessagesPerChannel) 386 { 387 this.maxPendingMessagesPerChannel = maxPendingMessagesPerChannel; 388 this.warningRestoreLimit = (int)Math.Floor(0.7 * (double)maxPendingMessagesPerChannel); 389 this.pendingMessages = new Dictionary<int, ThrottleEntry>(); 390 } 391 Acquire(int channelKey)392 public bool Acquire(int channelKey) 393 { 394 lock (this.pendingMessages) 395 { 396 if (!this.pendingMessages.ContainsKey(channelKey)) 397 { 398 this.pendingMessages.Add(channelKey, new ThrottleEntry()); 399 } 400 401 ThrottleEntry entry = this.pendingMessages[channelKey]; 402 if (entry.Count < this.maxPendingMessagesPerChannel) 403 { 404 entry.Count++; 405 if (TD.PendingMessagesPerChannelRatioIsEnabled()) 406 { 407 TD.PendingMessagesPerChannelRatio(entry.Count, this.maxPendingMessagesPerChannel); 408 } 409 return true; 410 } 411 else 412 { 413 if (TD.MaxPendingMessagesPerChannelExceededIsEnabled()) 414 { 415 if (!entry.WarningIssued) 416 { 417 TD.MaxPendingMessagesPerChannelExceeded(this.maxPendingMessagesPerChannel); 418 entry.WarningIssued = true; 419 } 420 } 421 422 return false; 423 } 424 } 425 } 426 Release(int channelKey)427 public void Release(int channelKey) 428 { 429 lock (this.pendingMessages) 430 { 431 ThrottleEntry entry = this.pendingMessages[channelKey]; 432 Fx.Assert(entry.Count > 0, "The pending message throttle was released too many times"); 433 434 entry.Count--; 435 if (TD.PendingMessagesPerChannelRatioIsEnabled()) 436 { 437 TD.PendingMessagesPerChannelRatio(entry.Count, this.maxPendingMessagesPerChannel); 438 } 439 if (entry.Count == 0) 440 { 441 this.pendingMessages.Remove(channelKey); 442 } 443 else if (entry.Count < this.warningRestoreLimit) 444 { 445 entry.WarningIssued = false; 446 } 447 } 448 } 449 450 class ThrottleEntry 451 { ThrottleEntry()452 public ThrottleEntry() 453 { 454 } 455 456 public bool WarningIssued 457 { 458 get; 459 set; 460 } 461 462 public int Count 463 { 464 get; 465 set; 466 } 467 } 468 } 469 470 class PropertyData 471 { PropertyData()472 public PropertyData() 473 { 474 } 475 476 public ReceiveContext ReceiveContext 477 { 478 get; 479 set; 480 } 481 482 public int ChannelKey 483 { 484 get; 485 set; 486 } 487 488 public string BookmarkName 489 { 490 get; 491 set; 492 } 493 494 public BufferedReceiveState State 495 { 496 get; 497 set; 498 } 499 } 500 } 501 } 502