1 //----------------------------------------------------------------
2 // Copyright (c) Microsoft Corporation.  All rights reserved.
3 //----------------------------------------------------------------
4 namespace System.ServiceModel.Activities.Dispatcher
5 {
6     using System.Activities.Hosting;
7     using System.Collections.Generic;
8     using System.Collections.ObjectModel;
9     using System.Runtime;
10     using System.Runtime.DurableInstancing;
11     using System.ServiceModel.Channels;
12     using System.Threading;
13 
14     sealed class BufferedReceiveManager : IExtension<ServiceHostBase>
15     {
16         static AsyncCallback onEndAbandon;
17         Dictionary<InstanceKey, List<BufferedReceiveMessageProperty>> bufferedProperties;
18         PendingMessageThrottle throttle;
19         WorkflowServiceHost host;
20 
21         int initialized;
22 
23         [Fx.Tag.SynchronizationObject(Blocking = false)]
24         object thisLock;
25 
BufferedReceiveManager(int maxPendingMessagesPerChannel)26         public BufferedReceiveManager(int maxPendingMessagesPerChannel)
27         {
28             this.throttle = new PendingMessageThrottle(maxPendingMessagesPerChannel);
29             this.thisLock = new object();
30         }
31 
BufferReceive(OperationContext operationContext, ReceiveContext receiveContext, string bookmarkName, BufferedReceiveState state, bool retry)32         public bool BufferReceive(OperationContext operationContext, ReceiveContext receiveContext, string bookmarkName, BufferedReceiveState state, bool retry)
33         {
34             Fx.Assert(receiveContext != null, "ReceiveContext must be present in order to perform buffering");
35 
36             bool success = false;
37 
38             BufferedReceiveMessageProperty property = null;
39             if (BufferedReceiveMessageProperty.TryGet(operationContext.IncomingMessageProperties, out property))
40             {
41                 CorrelationMessageProperty correlation = null;
42                 if (CorrelationMessageProperty.TryGet(operationContext.IncomingMessageProperties, out correlation))
43                 {
44                     InstanceKey instanceKey = correlation.CorrelationKey;
45                     int channelKey = operationContext.Channel.GetHashCode();
46                     if (this.throttle.Acquire(channelKey))
47                     {
48                         try
49                         {
50                             // Tag the property with identifying data to be used during later processing
51                             if (UpdateProperty(property, receiveContext, channelKey, bookmarkName, state))
52                             {
53                                 // Cleanup if we are notified the ReceiveContext faulted underneath us
54                                 receiveContext.Faulted += delegate(object sender, EventArgs e)
55                                 {
56                                     lock (this.thisLock)
57                                     {
58                                         if (this.bufferedProperties.ContainsKey(instanceKey))
59                                         {
60                                             if (this.bufferedProperties[instanceKey].Remove(property))
61                                             {
62                                                 try
63                                                 {
64                                                     property.RequestContext.DelayClose(false);
65                                                     property.RequestContext.Abort();
66                                                 }
67                                                 catch (Exception exception)
68                                                 {
69                                                     if (Fx.IsFatal(exception))
70                                                     {
71                                                         throw;
72                                                     }
73 
74                                                     // ---- these exceptions as we are already on the error path
75                                                 }
76 
77                                                 this.throttle.Release(channelKey);
78                                             }
79                                         }
80                                     }
81                                 };
82 
83                                 // Actual Buffering
84                                 lock (this.thisLock)
85                                 {
86                                     // Optimistic state check in case we just raced with the receiveContext
87                                     // faulting. If the receiveContext still faults after the state check, the above
88                                     // cleanup routine will handle things correctly. In both cases, a double-release
89                                     // of the throttle is protected.
90                                     if (receiveContext.State == ReceiveContextState.Received)
91                                     {
92                                         bool found = false;
93                                         // if the exception indicates retry-able (such as RetryException),
94                                         // we will simply retry.  This happens when racing with abort and
95                                         // WF informing the client to retry (BufferedReceiveManager is a
96                                         // client in this case).
97                                         if (retry)
98                                         {
99                                             property.RequestContext.DelayClose(true);
100                                             property.RegisterForReplay(operationContext);
101                                             property.ReplayRequest();
102                                             property.Notification.NotifyInvokeReceived(property.RequestContext.InnerRequestContext);
103                                             found = true;
104                                         }
105                                         else
106                                         {
107                                             ReadOnlyCollection<BookmarkInfo> bookmarks = this.host.DurableInstanceManager.PersistenceProviderDirectory.GetBookmarksForInstance(instanceKey);
108                                             // Retry in case match the existing bookmark
109                                             if (bookmarks != null)
110                                             {
111                                                 for (int i = 0; i < bookmarks.Count; ++i)
112                                                 {
113                                                     BookmarkInfo bookmark = bookmarks[i];
114                                                     if (bookmark.BookmarkName == bookmarkName)
115                                                     {
116                                                         // Found it so retry...
117                                                         property.RequestContext.DelayClose(true);
118                                                         property.RegisterForReplay(operationContext);
119                                                         property.ReplayRequest();
120                                                         property.Notification.NotifyInvokeReceived(property.RequestContext.InnerRequestContext);
121                                                         found = true;
122                                                         break;
123                                                     }
124                                                 }
125                                             }
126                                         }
127 
128                                         if (!found)
129                                         {
130                                             List<BufferedReceiveMessageProperty> properties;
131                                             if (!this.bufferedProperties.TryGetValue(instanceKey, out properties))
132                                             {
133                                                 properties = new List<BufferedReceiveMessageProperty>();
134                                                 this.bufferedProperties.Add(instanceKey, properties);
135                                             }
136                                             property.RequestContext.DelayClose(true);
137                                             property.RegisterForReplay(operationContext);
138                                             properties.Add(property);
139                                         }
140                                         else
141                                         {
142                                             this.throttle.Release(channelKey);
143                                         }
144                                         success = true;
145                                     }
146                                 }
147                             }
148                         }
149                         finally
150                         {
151                             if (!success)
152                             {
153                                 this.throttle.Release(channelKey);
154                             }
155                         }
156                     }
157                 }
158             }
159 
160             return success;
161         }
162 
Retry(HashSet<InstanceKey> associatedInstances, ReadOnlyCollection<BookmarkInfo> availableBookmarks)163         public void Retry(HashSet<InstanceKey> associatedInstances, ReadOnlyCollection<BookmarkInfo> availableBookmarks)
164         {
165             List<BookmarkInfo> bookmarks = new List<BookmarkInfo>(availableBookmarks);
166             foreach (InstanceKey instanceKey in associatedInstances)
167             {
168                 lock (this.thisLock)
169                 {
170                     if (this.bufferedProperties.ContainsKey(instanceKey))
171                     {
172                         List<BufferedReceiveMessageProperty> properties = this.bufferedProperties[instanceKey];
173                         int index = 0;
174 
175                         while (index < properties.Count && bookmarks.Count > 0)
176                         {
177                             BufferedReceiveMessageProperty property = properties[index];
178 
179                             // Determine if this property is now ready to be processed
180                             int channelKey = 0;
181                             bool found = false;
182                             for (int i = 0; i < bookmarks.Count; ++i)
183                             {
184                                 BookmarkInfo bookmark = (BookmarkInfo)bookmarks[i];
185                                 PropertyData data = (PropertyData)property.UserState;
186                                 if (bookmark.BookmarkName == data.BookmarkName)
187                                 {
188                                     // Found it so retry...
189                                     bookmarks.RemoveAt(i);
190                                     channelKey = data.ChannelKey;
191                                     property.ReplayRequest();
192                                     property.Notification.NotifyInvokeReceived(property.RequestContext.InnerRequestContext);
193                                     found = true;
194                                     break;
195                                 }
196                             }
197 
198                             if (!found)
199                             {
200                                 index++;
201                             }
202                             else
203                             {
204                                 properties.RemoveAt(index);
205                                 this.throttle.Release(channelKey);
206                             }
207                         }
208                     }
209                 }
210 
211                 if (bookmarks.Count == 0)
212                 {
213                     break;
214                 }
215             }
216         }
217 
AbandonBufferedReceives(HashSet<InstanceKey> associatedInstances)218         public void AbandonBufferedReceives(HashSet<InstanceKey> associatedInstances)
219         {
220             foreach (InstanceKey instanceKey in associatedInstances)
221             {
222                 lock (this.thisLock)
223                 {
224                     if (this.bufferedProperties.ContainsKey(instanceKey))
225                     {
226                         foreach (BufferedReceiveMessageProperty property in this.bufferedProperties[instanceKey])
227                         {
228                             PropertyData data = (PropertyData)property.UserState;
229                             AbandonReceiveContext(data.ReceiveContext);
230                             this.throttle.Release(data.ChannelKey);
231                         }
232 
233                         this.bufferedProperties.Remove(instanceKey);
234                     }
235                 }
236             }
237         }
238 
239         // clean up any remaining buffered receives as part of ServiceHost close.
AbandonBufferedReceives()240         internal void AbandonBufferedReceives()
241         {
242             lock (this.thisLock)
243             {
244                 foreach (List<BufferedReceiveMessageProperty> value in this.bufferedProperties.Values)
245                 {
246                     foreach (BufferedReceiveMessageProperty property in value)
247                     {
248                         PropertyData data = (PropertyData)property.UserState;
249                         AbandonReceiveContext(data.ReceiveContext);
250                         this.throttle.Release(data.ChannelKey);
251                     }
252                 }
253                 this.bufferedProperties.Clear();
254             }
255         }
256 
257         // Best-effort to abandon the receiveContext
AbandonReceiveContext(ReceiveContext receiveContext)258         internal static void AbandonReceiveContext(ReceiveContext receiveContext)
259         {
260             if (receiveContext != null)
261             {
262                 if (onEndAbandon == null)
263                 {
264                     onEndAbandon = Fx.ThunkCallback(new AsyncCallback(OnEndAbandon));
265                 }
266 
267                 try
268                 {
269                     IAsyncResult result = receiveContext.BeginAbandon(
270                         TimeSpan.MaxValue, onEndAbandon, receiveContext);
271                     if (result.CompletedSynchronously)
272                     {
273                         HandleEndAbandon(result);
274                     }
275                 }
276                 catch (Exception exception)
277                 {
278                     if (Fx.IsFatal(exception))
279                     {
280                         throw;
281                     }
282 
283                     // We ---- any Abandon exception - best effort.
284                     FxTrace.Exception.AsWarning(exception);
285                 }
286             }
287         }
288 
HandleEndAbandon(IAsyncResult result)289         static bool HandleEndAbandon(IAsyncResult result)
290         {
291             ReceiveContext receiveContext = (ReceiveContext)result.AsyncState;
292             receiveContext.EndAbandon(result);
293             return true;
294         }
295 
OnEndAbandon(IAsyncResult result)296         static void OnEndAbandon(IAsyncResult result)
297         {
298             if (result.CompletedSynchronously)
299             {
300                 return;
301             }
302 
303             try
304             {
305                 HandleEndAbandon(result);
306             }
307             catch (Exception exception)
308             {
309                 if (Fx.IsFatal(exception))
310                 {
311                     throw;
312                 }
313 
314                 // We ---- any Abandon exception - best effort.
315                 FxTrace.Exception.AsWarning(exception);
316             }
317         }
318 
Attach(ServiceHostBase owner)319         void IExtension<ServiceHostBase>.Attach(ServiceHostBase owner)
320         {
321             if (owner == null)
322             {
323                 throw FxTrace.Exception.AsError(new ArgumentNullException("owner"));
324             }
325 
326             if (Interlocked.CompareExchange(ref this.initialized, 1, 0) != 0)
327             {
328                 throw FxTrace.Exception.AsError(
329                     new InvalidOperationException(SR.BufferedReceiveBehaviorMultipleUse));
330             }
331 
332             owner.ThrowIfClosedOrOpened();
333 
334             Fx.Assert(owner is WorkflowServiceHost, "owner must be of WorkflowServiceHost type!");
335             this.host = (WorkflowServiceHost)owner;
336             Initialize();
337         }
338 
Detach(ServiceHostBase owner)339         void IExtension<ServiceHostBase>.Detach(ServiceHostBase owner)
340         {
341         }
342 
UpdateProperty(BufferedReceiveMessageProperty property, ReceiveContext receiveContext, int channelKey, string bookmarkName, BufferedReceiveState state)343         bool UpdateProperty(BufferedReceiveMessageProperty property, ReceiveContext receiveContext, int channelKey, string bookmarkName, BufferedReceiveState state)
344         {
345             // If there's data already there make sure the state is allowed
346             if (property.UserState == null)
347             {
348                 property.UserState = new PropertyData()
349                 {
350                     ReceiveContext = receiveContext,
351                     ChannelKey = channelKey,
352                     BookmarkName = bookmarkName,
353                     State = state
354                 };
355             }
356             else
357             {
358                 PropertyData data = (PropertyData)property.UserState;
359 
360                 // We should not buffer twice at the same state
361                 if (data.State == state)
362                 {
363                     return false;
364                 }
365 
366                 data.State = state;
367             }
368 
369             return true;
370         }
371 
Initialize()372         void Initialize()
373         {
374             this.bufferedProperties = new Dictionary<InstanceKey, List<BufferedReceiveMessageProperty>>();
375         }
376 
377         class PendingMessageThrottle
378         {
379             [Fx.Tag.SynchronizationObject(Blocking = false)]
380             Dictionary<int, ThrottleEntry> pendingMessages;
381 
382             int maxPendingMessagesPerChannel;
383             int warningRestoreLimit;
384 
PendingMessageThrottle(int maxPendingMessagesPerChannel)385             public PendingMessageThrottle(int maxPendingMessagesPerChannel)
386             {
387                 this.maxPendingMessagesPerChannel = maxPendingMessagesPerChannel;
388                 this.warningRestoreLimit = (int)Math.Floor(0.7 * (double)maxPendingMessagesPerChannel);
389                 this.pendingMessages = new Dictionary<int, ThrottleEntry>();
390             }
391 
Acquire(int channelKey)392             public bool Acquire(int channelKey)
393             {
394                 lock (this.pendingMessages)
395                 {
396                     if (!this.pendingMessages.ContainsKey(channelKey))
397                     {
398                         this.pendingMessages.Add(channelKey, new ThrottleEntry());
399                     }
400 
401                     ThrottleEntry entry = this.pendingMessages[channelKey];
402                     if (entry.Count < this.maxPendingMessagesPerChannel)
403                     {
404                         entry.Count++;
405                         if (TD.PendingMessagesPerChannelRatioIsEnabled())
406                         {
407                             TD.PendingMessagesPerChannelRatio(entry.Count, this.maxPendingMessagesPerChannel);
408                         }
409                         return true;
410                     }
411                     else
412                     {
413                         if (TD.MaxPendingMessagesPerChannelExceededIsEnabled())
414                         {
415                             if (!entry.WarningIssued)
416                             {
417                                 TD.MaxPendingMessagesPerChannelExceeded(this.maxPendingMessagesPerChannel);
418                                 entry.WarningIssued = true;
419                             }
420                         }
421 
422                         return false;
423                     }
424                 }
425             }
426 
Release(int channelKey)427             public void Release(int channelKey)
428             {
429                 lock (this.pendingMessages)
430                 {
431                     ThrottleEntry entry = this.pendingMessages[channelKey];
432                     Fx.Assert(entry.Count > 0, "The pending message throttle was released too many times");
433 
434                     entry.Count--;
435                     if (TD.PendingMessagesPerChannelRatioIsEnabled())
436                     {
437                         TD.PendingMessagesPerChannelRatio(entry.Count, this.maxPendingMessagesPerChannel);
438                     }
439                     if (entry.Count == 0)
440                     {
441                         this.pendingMessages.Remove(channelKey);
442                     }
443                     else if (entry.Count < this.warningRestoreLimit)
444                     {
445                         entry.WarningIssued = false;
446                     }
447                 }
448             }
449 
450             class ThrottleEntry
451             {
ThrottleEntry()452                 public ThrottleEntry()
453                 {
454                 }
455 
456                 public bool WarningIssued
457                 {
458                     get;
459                     set;
460                 }
461 
462                 public int Count
463                 {
464                     get;
465                     set;
466                 }
467             }
468         }
469 
470         class PropertyData
471         {
PropertyData()472             public PropertyData()
473             {
474             }
475 
476             public ReceiveContext ReceiveContext
477             {
478                 get;
479                 set;
480             }
481 
482             public int ChannelKey
483             {
484                 get;
485                 set;
486             }
487 
488             public string BookmarkName
489             {
490                 get;
491                 set;
492             }
493 
494             public BufferedReceiveState State
495             {
496                 get;
497                 set;
498             }
499         }
500     }
501 }
502