1 // Copyright (c) Microsoft. All rights reserved.
2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
3 
4 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
5 //
6 // BatchBlock.cs
7 //
8 //
9 // A propagator block that groups individual messages into arrays of messages.
10 //
11 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
12 
13 using System.Collections.Generic;
14 using System.Diagnostics;
15 using System.Diagnostics.CodeAnalysis;
16 using System.Diagnostics.Contracts;
17 using System.Linq;
18 using System.Security;
19 using System.Threading.Tasks.Dataflow.Internal;
20 
21 namespace System.Threading.Tasks.Dataflow
22 {
23     /// <summary>Provides a dataflow block that batches inputs into arrays.</summary>
24     /// <typeparam name="T">Specifies the type of data put into batches.</typeparam>
25     [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
26     [DebuggerTypeProxy(typeof(BatchBlock<>.DebugView))]
27     [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
28     public sealed class BatchBlock<T> : IPropagatorBlock<T, T[]>, IReceivableSourceBlock<T[]>, IDebuggerDisplay
29     {
        /// <summary>The target half of this batch: receives offered messages and groups them into batches.</summary>
        private readonly BatchBlockTargetCore _target;
        /// <summary>The source half of this batch: receives completed batches and exposes them through linking and receive operations.</summary>
        private readonly SourceCore<T[]> _source;
34 
35         /// <summary>Initializes this <see cref="BatchBlock{T}"/> with the specified batch size.</summary>
36         /// <param name="batchSize">The number of items to group into a batch.</param>
37         /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
BatchBlock(Int32 batchSize)38         public BatchBlock(Int32 batchSize) :
39             this(batchSize, GroupingDataflowBlockOptions.Default)
40         { }
41 
        /// <summary>Initializes this <see cref="BatchBlock{T}"/> with the specified batch size and block options.</summary>
        /// <param name="batchSize">The number of items to group into a batch.</param>
        /// <param name="dataflowBlockOptions">The options with which to configure this <see cref="BatchBlock{T}"/>.</param>
        /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be no greater than the value of the BoundedCapacity option if a non-default value has been set.</exception>
        /// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
        public BatchBlock(Int32 batchSize, GroupingDataflowBlockOptions dataflowBlockOptions)
        {
            // Validate arguments
            if (batchSize < 1) throw new ArgumentOutOfRangeException("batchSize", SR.ArgumentOutOfRange_GenericPositive);
            if (dataflowBlockOptions == null) throw new ArgumentNullException("dataflowBlockOptions");
            // Only a positive BoundedCapacity is compared against the batch size.
            if (dataflowBlockOptions.BoundedCapacity > 0 && dataflowBlockOptions.BoundedCapacity < batchSize) throw new ArgumentOutOfRangeException("batchSize", SR.ArgumentOutOfRange_BatchSizeMustBeNoGreaterThanBoundedCapacity);
            Contract.EndContractBlock();

            // Ensure we have options that can't be changed by the caller
            dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();

            // Initialize bounding actions.
            // Both delegates stay null unless bounding is enabled (BoundedCapacity > 0).
            Action<ISourceBlock<T[]>, int> onItemsRemoved = null;
            Func<ISourceBlock<T[]>, T[], IList<T[]>, int> itemCountingFunc = null;
            if (dataflowBlockOptions.BoundedCapacity > 0)
            {
                // These delegates reach the target through the owning block instead of capturing _target,
                // which has not been assigned yet at this point in the constructor.
                onItemsRemoved = (owningSource, count) => ((BatchBlock<T>)owningSource)._target.OnItemsRemoved(count);
                itemCountingFunc = (owningSource, singleOutputItem, multipleOutputItems) => BatchBlockTargetCore.CountItems(singleOutputItem, multipleOutputItems);
            }

            // Initialize source.
            // The third argument completes the target without an exception and drops its pending messages;
            // NOTE(review): presumably SourceCore invokes it once the source itself completes - confirm against SourceCore.
            _source = new SourceCore<T[]>(this, dataflowBlockOptions,
                owningSource => ((BatchBlock<T>)owningSource)._target.Complete(exception: null, dropPendingMessages: true, releaseReservedMessages: false),
                onItemsRemoved, itemCountingFunc);

            // Initialize target. Every completed batch is forwarded to the source half.
            _target = new BatchBlockTargetCore(this, batchSize, batch => _source.AddMessage(batch), dataflowBlockOptions);

            // When the target is done, let the source know it won't be getting any more data
            _target.Completion.ContinueWith(delegate { _source.Complete(); },
                CancellationToken.None, Common.GetContinuationOptions(), TaskScheduler.Default);

            // It is possible that the source half may fault on its own, e.g. due to a task scheduler exception.
            // In those cases we need to fault the target half to drop its buffered messages and to release its
            // reservations. This should not create an infinite loop, because all our implementations are designed
            // to handle multiple completion requests and to carry over only one.
            // The block is passed as continuation state rather than captured by the lambda.
            _source.Completion.ContinueWith((completed, state) =>
            {
                var thisBlock = ((BatchBlock<T>)state) as IDataflowBlock;
                Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
                thisBlock.Fault(completed.Exception);
            }, this, CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);

            // Handle async cancellation requests by declining on the target
            Common.WireCancellationToComplete(
                dataflowBlockOptions.CancellationToken, _source.Completion, state => ((BatchBlockTargetCore)state).Complete(exception: null, dropPendingMessages: true, releaseReservedMessages: false), _target);
#if FEATURE_TRACING
            // Report block creation to the ETW provider when tracing is compiled in and enabled.
            DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
            if (etwLog.IsEnabled())
            {
                etwLog.DataflowBlockCreated(this, dataflowBlockOptions);
            }
#endif
        }
102 
103         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
Complete()104         public void Complete() { _target.Complete(exception: null, dropPendingMessages: false, releaseReservedMessages: false); }
105 
106         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
IDataflowBlock.Fault(Exception exception)107         void IDataflowBlock.Fault(Exception exception)
108         {
109             if (exception == null) throw new ArgumentNullException("exception");
110             Contract.EndContractBlock();
111 
112             _target.Complete(exception, dropPendingMessages: true, releaseReservedMessages: false);
113         }
114 
115         /// <summary>
116         /// Triggers the <see cref="BatchBlock{T}"/> to initiate a batching operation even if the number
117         /// of currently queued or postponed items is less than the <see cref="BatchSize"/>.
118         /// </summary>
119         /// <remarks>
120         /// In greedy mode, a batch will be generated from queued items even if fewer exist than the batch size.
121         /// In non-greedy mode, a batch will be generated asynchronously from postponed items even if
122         /// fewer than the batch size can be consumed.
123         /// </remarks>
TriggerBatch()124         public void TriggerBatch() { _target.TriggerBatch(); }
125 
126         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
LinkTo(ITargetBlock<T[]> target, DataflowLinkOptions linkOptions)127         public IDisposable LinkTo(ITargetBlock<T[]> target, DataflowLinkOptions linkOptions)
128         {
129             return _source.LinkTo(target, linkOptions);
130         }
131 
132         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
TryReceive(Predicate<T[]> filter, out T[] item)133         public Boolean TryReceive(Predicate<T[]> filter, out T[] item)
134         {
135             return _source.TryReceive(filter, out item);
136         }
137 
138         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
TryReceiveAll(out IList<T[]> items)139         public bool TryReceiveAll(out IList<T[]> items) { return _source.TryReceiveAll(out items); }
140 
141         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="OutputCount"]/*' />
142         public int OutputCount { get { return _source.OutputCount; } }
143 
144         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
145         public Task Completion { get { return _source.Completion; } }
146 
147         /// <summary>Gets the size of the batches generated by this <see cref="BatchBlock{T}"/>.</summary>
148         /// <remarks>
149         /// If the number of items provided to the block is not evenly divisible by the batch size provided
150         /// to the block's constructor, the block's final batch may contain fewer than the requested number of items.
151         /// </remarks>
152         public Int32 BatchSize { get { return _target.BatchSize; } }
153 
154         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, Boolean consumeToAccept)155         DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, Boolean consumeToAccept)
156         {
157             return _target.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
158         }
159 
160         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target, out Boolean messageConsumed)161         T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target, out Boolean messageConsumed)
162         {
163             return _source.ConsumeMessage(messageHeader, target, out messageConsumed);
164         }
165 
166         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target)167         bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target)
168         {
169             return _source.ReserveMessage(messageHeader, target);
170         }
171 
172         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target)173         void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target)
174         {
175             _source.ReleaseReservation(messageHeader, target);
176         }
177 
178         /// <summary>Gets the number of messages waiting to be offered.  This must only be used from the debugger as it avoids taking necessary locks.</summary>
179         private int OutputCountForDebugger { get { return _source.GetDebuggingInformation().OutputCount; } }
180 
181         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
ToString()182         public override string ToString() { return Common.GetNameForDebugger(this, _source.DataflowBlockOptions); }
183 
184         /// <summary>The data to display in the debugger display attribute.</summary>
185         [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
186         private object DebuggerDisplayContent
187         {
188             get
189             {
190                 return string.Format("{0}, BatchSize={1}, OutputCount={2}",
191                     Common.GetNameForDebugger(this, _source.DataflowBlockOptions),
192                     BatchSize,
193                     OutputCountForDebugger);
194             }
195         }
196         /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
197         object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
198 
199         /// <summary>Provides a debugger type proxy for the Batch.</summary>
200         private sealed class DebugView
201         {
202             /// <summary>The batch block being viewed.</summary>
203             private BatchBlock<T> _batchBlock;
204             /// <summary>The target half being viewed.</summary>
205             private readonly BatchBlockTargetCore.DebuggingInformation _targetDebuggingInformation;
206             /// <summary>The source half of the block being viewed.</summary>
207             private readonly SourceCore<T[]>.DebuggingInformation _sourceDebuggingInformation;
208 
209             /// <summary>Initializes the debug view.</summary>
210             /// <param name="batchBlock">The batch being viewed.</param>
DebugView(BatchBlock<T> batchBlock)211             public DebugView(BatchBlock<T> batchBlock)
212             {
213                 Contract.Requires(batchBlock != null, "Need a block with which to construct the debug view");
214                 _batchBlock = batchBlock;
215                 _targetDebuggingInformation = batchBlock._target.GetDebuggingInformation();
216                 _sourceDebuggingInformation = batchBlock._source.GetDebuggingInformation();
217             }
218 
219             /// <summary>Gets the messages waiting to be processed.</summary>
220             public IEnumerable<T> InputQueue { get { return _targetDebuggingInformation.InputQueue; } }
221             /// <summary>Gets the messages waiting to be received.</summary>
222             public IEnumerable<T[]> OutputQueue { get { return _sourceDebuggingInformation.OutputQueue; } }
223             /// <summary>Gets the number of batches that have been completed.</summary>
224             public long BatchesCompleted { get { return _targetDebuggingInformation.NumberOfBatchesCompleted; } }
225 
226             /// <summary>Gets the task being used for input processing.</summary>
227             public Task TaskForInputProcessing { get { return _targetDebuggingInformation.TaskForInputProcessing; } }
228             /// <summary>Gets the task being used for output processing.</summary>
229             public Task TaskForOutputProcessing { get { return _sourceDebuggingInformation.TaskForOutputProcessing; } }
230 
231             /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
232             public GroupingDataflowBlockOptions DataflowBlockOptions { get { return _targetDebuggingInformation.DataflowBlockOptions; } }
233             /// <summary>Gets the size of batches generated by the block.</summary>
234             public int BatchSize { get { return _batchBlock.BatchSize; } }
235             /// <summary>Gets whether the block is declining further messages.</summary>
236             public bool IsDecliningPermanently { get { return _targetDebuggingInformation.IsDecliningPermanently; } }
237             /// <summary>Gets whether the block is completed.</summary>
238             public bool IsCompleted { get { return _sourceDebuggingInformation.IsCompleted; } }
239             /// <summary>Gets the block's Id.</summary>
240             public int Id { get { return Common.GetBlockId(_batchBlock); } }
241 
242             /// <summary>Gets the messages postponed by this batch.</summary>
243             public QueuedMap<ISourceBlock<T>, DataflowMessageHeader> PostponedMessages { get { return _targetDebuggingInformation.PostponedMessages; } }
244             /// <summary>Gets the set of all targets linked from this block.</summary>
245             public TargetRegistry<T[]> LinkedTargets { get { return _sourceDebuggingInformation.LinkedTargets; } }
246             /// <summary>Gets the set of all targets linked from this block.</summary>
247             public ITargetBlock<T[]> NextMessageReservedFor { get { return _sourceDebuggingInformation.NextMessageReservedFor; } }
248         }
249 
250         /// <summary>Provides the core target implementation for a Batch.</summary>
251         [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
252         [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
253         private sealed class BatchBlockTargetCore
254         {
            /// <summary>The messages in this target.</summary>
            private readonly Queue<T> _messages = new Queue<T>();
            /// <summary>A task representing the completion of the block.</summary>
            private readonly TaskCompletionSource<VoidResult> _completionTask = new TaskCompletionSource<VoidResult>();

            /// <summary>Gets the object used as the incoming lock.</summary>
            /// <remarks>The completion source doubles as the lock object, so no separate lock object is allocated.</remarks>
            private object IncomingLock { get { return _completionTask; } }

            /// <summary>The target that owns this target core.</summary>
            private readonly BatchBlock<T> _owningBatch;
            /// <summary>The batch size.</summary>
            private readonly int _batchSize;
            /// <summary>
            /// State used when in non-greedy mode, and also when greedy with bounding enabled
            /// (it holds the postponed messages in both cases). Null otherwise.
            /// </summary>
            private readonly NonGreedyState _nonGreedyState;
            /// <summary>Bounding state for when the block is executing in bounded mode. Null when bounding is disabled.</summary>
            private readonly BoundingState _boundingState;
            /// <summary>The options associated with this block.</summary>
            private readonly GroupingDataflowBlockOptions _dataflowBlockOptions;
            /// <summary>The action invoked with a completed batch.</summary>
            private readonly Action<T[]> _batchCompletedAction;

            /// <summary>Whether to stop accepting new messages.</summary>
            private bool _decliningPermanently;
            /// <summary>The number of batches completed so far; compared against the maximum number of groups.</summary>
            private long _batchesCompleted;
            /// <summary>Whether someone has reserved the right to call CompleteBlockOncePossible.</summary>
            private bool _completionReserved;
282 
283             /// <summary>State used only when in non-greedy mode.</summary>
284             private sealed class NonGreedyState
285             {
286                 /// <summary>Collection of postponed messages.</summary>
287                 internal readonly QueuedMap<ISourceBlock<T>, DataflowMessageHeader> PostponedMessages;
288                 /// <summary>A temporary array used to store data retrieved from PostponedMessages.</summary>
289                 internal readonly KeyValuePair<ISourceBlock<T>, DataflowMessageHeader>[] PostponedMessagesTemp;
290                 /// <summary>A temporary list used in non-greedy mode when consuming postponed messages to store successfully reserved messages.</summary>
291                 internal readonly List<KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>> ReservedSourcesTemp;
292                 /// <summary>Whether the next batching operation should accept fewer than BatchSize items.</summary>
293                 /// <remarks>This value may be read not under a lock, but it must only be written to protected by the IncomingLock.</remarks>
294                 internal bool AcceptFewerThanBatchSize;
295                 /// <summary>The task used to process messages.</summary>
296                 internal Task TaskForInputProcessing;
297 
298                 /// <summary>Initializes the NonGreedyState.</summary>
299                 /// <param name="batchSize">The batch size used by the BatchBlock.</param>
NonGreedyState(int batchSize)300                 internal NonGreedyState(int batchSize)
301                 {
302                     // A non-greedy batch requires at least batchSize sources to be successful.
303                     // Thus, we initialize our collections to be able to store at least that many elements
304                     // in order to avoid unnecessary allocations below that point.
305                     Contract.Requires(batchSize > 0, "A positive batch size is required");
306                     PostponedMessages = new QueuedMap<ISourceBlock<T>, DataflowMessageHeader>(batchSize);
307                     PostponedMessagesTemp = new KeyValuePair<ISourceBlock<T>, DataflowMessageHeader>[batchSize];
308                     ReservedSourcesTemp = new List<KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>>(batchSize);
309                 }
310             }
311 
312             /// <summary>Initializes this target core with the specified configuration.</summary>
313             /// <param name="owningBatch">The owning batch target.</param>
314             /// <param name="batchSize">The number of items to group into a batch.</param>
315             /// <param name="batchCompletedAction">The delegate to invoke when a batch is completed.</param>
316             /// <param name="dataflowBlockOptions">The options with which to configure this <see cref="BatchBlock{T}"/>.  Assumed to be immutable.</param>
317             /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
318             /// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
BatchBlockTargetCore(BatchBlock<T> owningBatch, Int32 batchSize, Action<T[]> batchCompletedAction, GroupingDataflowBlockOptions dataflowBlockOptions)319             internal BatchBlockTargetCore(BatchBlock<T> owningBatch, Int32 batchSize, Action<T[]> batchCompletedAction, GroupingDataflowBlockOptions dataflowBlockOptions)
320             {
321                 Contract.Requires(owningBatch != null, "This batch target core must be associated with a batch block.");
322                 Contract.Requires(batchSize >= 1, "Batch sizes must be positive.");
323                 Contract.Requires(batchCompletedAction != null, "Completion action must be specified.");
324                 Contract.Requires(dataflowBlockOptions != null, "Options required to configure the block.");
325 
326                 // Store arguments
327                 _owningBatch = owningBatch;
328                 _batchSize = batchSize;
329                 _batchCompletedAction = batchCompletedAction;
330                 _dataflowBlockOptions = dataflowBlockOptions;
331 
332                 // We'll be using _nonGreedyState even if we are greedy with bounding
333                 bool boundingEnabled = dataflowBlockOptions.BoundedCapacity > 0;
334                 if (!_dataflowBlockOptions.Greedy || boundingEnabled) _nonGreedyState = new NonGreedyState(batchSize);
335                 if (boundingEnabled) _boundingState = new BoundingState(dataflowBlockOptions.BoundedCapacity);
336             }
337 
338             /// <summary>
339             /// Triggers a batching operation even if the number of currently queued or postponed items is less than the <see cref="BatchSize"/>.
340             /// </summary>
TriggerBatch()341             internal void TriggerBatch()
342             {
343                 lock (IncomingLock)
344                 {
345                     // If we shouldn't be doing any more work, bail.  Otherwise, note that we're willing to
346                     // accept fewer items in the next batching operation, and ensure processing is kicked off.
347                     if (!_decliningPermanently && !_dataflowBlockOptions.CancellationToken.IsCancellationRequested)
348                     {
349                         if (_nonGreedyState == null)
350                         {
351                             MakeBatchIfPossible(evenIfFewerThanBatchSize: true);
352                         }
353                         else
354                         {
355                             _nonGreedyState.AcceptFewerThanBatchSize = true;
356                             ProcessAsyncIfNecessary();
357                         }
358                     }
359                     CompleteBlockIfPossible();
360                 }
361             }
362 
            /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
            // Outcomes produced by this method:
            //   DecliningPermanently - the target has stopped accepting messages;
            //   Accepted             - the message was enqueued directly (greedy path);
            //   NotAvailable         - consumeToAccept was requested but the source did not hand over the message;
            //   Postponed            - the message was recorded for later consumption (requires a source);
            //   Declined             - the message could neither be accepted nor postponed.
            internal DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, Boolean consumeToAccept)
            {
                // Validate arguments
                if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
                if (source == null && consumeToAccept) throw new ArgumentException(SR.Argument_CantConsumeFromANullSource, "consumeToAccept");
                Contract.EndContractBlock();

                // The whole accept/postpone/decline decision is made under the incoming lock.
                lock (IncomingLock)
                {
                    // If we shouldn't be accepting more messages, don't.
                    if (_decliningPermanently)
                    {
                        CompleteBlockIfPossible();
                        return DataflowMessageStatus.DecliningPermanently;
                    }

                    // We can directly accept the message if:
                    //      1) we are being greedy AND we are not bounding, OR
                    //      2) we are being greedy AND we are bounding AND there is room available AND there are no postponed messages AND we are not currently processing.
                    // (If there were any postponed messages, we would need to postpone so that ordering would be maintained.)
                    // (We should also postpone if we are currently processing, because there may be a race between consuming postponed messages and
                    // accepting new ones directly into the queue.)
                    // Dereferencing _nonGreedyState below is safe: the constructor always allocates it when bounding is enabled.
                    if (_dataflowBlockOptions.Greedy &&
                            (_boundingState == null
                                ||
                             (_boundingState.CountIsLessThanBound && _nonGreedyState.PostponedMessages.Count == 0 && _nonGreedyState.TaskForInputProcessing == null)))
                    {
                        // Consume the message from the source if necessary
                        if (consumeToAccept)
                        {
                            Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");

                            bool consumed;
                            messageValue = source.ConsumeMessage(messageHeader, _owningBatch, out consumed);
                            if (!consumed) return DataflowMessageStatus.NotAvailable;
                        }

                        // Once consumed, enqueue it.
                        _messages.Enqueue(messageValue);
                        if (_boundingState != null) _boundingState.CurrentCount += 1; // track this new item against our bound

                        // Now start declining if the number of batches we've already made plus
                        // the number we can make from data already enqueued meets our quota.
                        // (Integer division: only full batches of queued items count toward the quota.)
                        if (!_decliningPermanently &&
                            (_batchesCompleted + (_messages.Count / _batchSize)) >= _dataflowBlockOptions.ActualMaxNumberOfGroups)
                        {
                            _decliningPermanently = true;
                        }

                        // Now that we have a message, see if we can make forward progress.
                        MakeBatchIfPossible(evenIfFewerThanBatchSize: false);

                        CompleteBlockIfPossible();
                        return DataflowMessageStatus.Accepted;
                    }
                    // Otherwise, we try to postpone if a source was provided
                    else if (source != null)
                    {
                        Debug.Assert(_nonGreedyState != null, "_nonGreedyState must have been initialized during construction in non-greedy mode.");

                        // We always postpone using _nonGreedyState even if we are being greedy with bounding
                        _nonGreedyState.PostponedMessages.Push(source, messageHeader);

                        // In non-greedy mode, we need to see if batch could be completed
                        // (In greedy-with-bounding mode no processing is started from here.)
                        if (!_dataflowBlockOptions.Greedy) ProcessAsyncIfNecessary();

                        return DataflowMessageStatus.Postponed;
                    }
                    // We can't do anything else about this message
                    return DataflowMessageStatus.Declined;
                }
            }
436 
            /// <summary>Completes/faults the block.
            /// In general, it is not safe to pass releaseReservedMessages:true, because releasing of reserved messages
            /// is done without taking a lock. We pass releaseReservedMessages:true only when an exception has been
            /// caught inside the message processing loop which is a single instance at any given moment.</summary>
            /// <param name="exception">
            /// The exception to record on the owning block's source, or null for a non-faulting completion.
            /// It is recorded only if the target is not yet declining permanently or
            /// <paramref name="releaseReservedMessages"/> is true.
            /// </param>
            /// <param name="dropPendingMessages">Whether to clear the queue of buffered messages.</param>
            /// <param name="releaseReservedMessages">Whether to release reserved messages; this is done outside the lock.</param>
            /// <param name="revertProcessingState">Whether to reset the input-processing task to null before triggering completion.</param>
            [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
            internal void Complete(Exception exception, bool dropPendingMessages, bool releaseReservedMessages, bool revertProcessingState = false)
            {
                // Ensure that no new messages may be added
                lock (IncomingLock)
                {
                    // Faulting from outside is allowed until we start declining permanently.
                    // Faulting from inside is allowed at any time.
                    // (releaseReservedMessages == true is what identifies a fault raised from inside here.)
                    if (exception != null && (!_decliningPermanently || releaseReservedMessages))
                    {
                        // Record the exception in the source.
                        // The source, which exposes its Completion to the public will take this
                        // into account and will complete in Faulted state.
                        _owningBatch._source.AddException(exception);
                    }

                    // Drop pending messages if requested
                    if (dropPendingMessages) _messages.Clear();
                }

                // Release reserved messages if requested.
                // This must be done from outside the lock.
                if (releaseReservedMessages)
                {
                    // Any exception thrown while releasing is recorded on the source rather than propagated to the caller.
                    try { ReleaseReservedMessages(throwOnFirstException: false); }
                    catch (Exception e) { _owningBatch._source.AddException(e); }
                }

                // Triggering completion requires the lock
                lock (IncomingLock)
                {
                    // Revert the dirty processing state if requested
                    if (revertProcessingState)
                    {
                        Debug.Assert(_nonGreedyState != null && _nonGreedyState.TaskForInputProcessing != null,
                                        "The processing state must be dirty when revertProcessingState==true.");
                        _nonGreedyState.TaskForInputProcessing = null;
                    }

                    // Trigger completion.
                    // From here on the target declines every further offer.
                    _decliningPermanently = true;
                    CompleteBlockIfPossible();
                }
            }
485 
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
internal Task Completion
{
    // Exposes the task of the target's internal completion source.
    get { return _completionTask.Task; }
}
488 
/// <summary>Gets the configured size of the batches produced by this <see cref="BatchBlock{T}"/>.</summary>
internal Int32 BatchSize
{
    get { return _batchSize; }
}
491 
/// <summary>
/// Gets whether cancellation has been requested on the block's cancellation token
/// or the owning batch's source has recorded any exceptions.
/// </summary>
private bool CanceledOrFaulted
{
    get
    {
        // Cancellation is checked first; the source is consulted only when it was not requested.
        if (_dataflowBlockOptions.CancellationToken.IsCancellationRequested) return true;

        return _owningBatch._source.HasExceptions;
    }
}
500 
/// <summary>
/// Gets the capacity that is currently available for bringing in postponed items.
/// Values above _batchSize are not meaningful to callers, so their exact magnitude doesn't matter.
/// </summary>
private int BoundedCapacityAvailable
{
    get
    {
        Common.ContractAssertMonitorStatus(IncomingLock, held: true);

        // With no bounding state there is nothing to subtract: report room for one whole batch.
        if (_boundingState == null) return _batchSize;

        // Otherwise it's whatever remains of the configured capacity.
        return _dataflowBlockOptions.BoundedCapacity - _boundingState.CurrentCount;
    }
}
513 
/// <summary>
/// Reserves and launches completion of the target when no input-processing task is outstanding and
/// at least one of the following holds: the maximum number of groups has been produced, the block is
/// declining permanently with fewer than a full batch queued, or the block is canceled or faulted.
/// Must be called with the incoming lock held.
/// </summary>
private void CompleteBlockIfPossible()
{
    Common.ContractAssertMonitorStatus(IncomingLock, held: true);

    // Completion is reserved at most once.
    if (_completionReserved) return;

    // Never complete while a processing task is still registered.
    bool processingInFlight = _nonGreedyState != null && _nonGreedyState.TaskForInputProcessing != null;
    if (processingInFlight) return;

    bool quotaReached = _batchesCompleted >= _dataflowBlockOptions.ActualMaxNumberOfGroups;
    bool inputExhausted = _decliningPermanently && _messages.Count < _batchSize;
    if (!(quotaReached || inputExhausted || CanceledOrFaulted)) return;

    _completionReserved = true;

    // From here on the target must refuse everything.
    _decliningPermanently = true;

    // Leftover items become a final batch, even if it is smaller than the batch size.
    if (_messages.Count > 0)
    {
        MakeBatchIfPossible(evenIfFewerThanBatchSize: true);
    }

    // This method may have been reached through an external call into the block. The remaining
    // work is pushed to a separate task so that arbitrary code, in the form of completion task
    // continuations, does not run on that caller.
    Task.Factory.StartNew(state =>
    {
        var target = (BatchBlockTargetCore)state;

        // Hand postponed messages back. No locks should be held while doing so.
        List<Exception> releaseErrors = null;
        if (target._nonGreedyState != null)
        {
            Common.ReleaseAllPostponedMessages(
                target._owningBatch,
                target._nonGreedyState.PostponedMessages,
                ref releaseErrors);
        }

        // Any failures must travel to the source half of the owning batch, because its
        // completion task is the one that is publicly exposed.
        if (releaseErrors != null)
        {
            target._owningBatch._source.AddExceptions(releaseErrors);
        }

        // The target's completion task is internal and only serves to release the task that
        // completes the parent, so the reason it completes with is irrelevant.
        target._completionTask.TrySetResult(default(VoidResult));
    }, this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
}
568 
/// <summary>
/// Gets whether further synchronous or asynchronous processing should be launched to create batches.
/// Must be read with the incoming lock held.
/// </summary>
private bool BatchesNeedProcessing
{
    get
    {
        Common.ContractAssertMonitorStatus(IncomingLock, held: true);

        // Nothing to launch when the group quota is used up, when a processing task is already
        // registered (it will handle all the work), or when the block is canceled or faulted.
        bool quotaReached = _batchesCompleted >= _dataflowBlockOptions.ActualMaxNumberOfGroups;
        bool loopAlreadyRunning = _nonGreedyState != null && _nonGreedyState.TaskForInputProcessing != null;
        if (quotaReached || loopAlreadyRunning || CanceledOrFaulted) return false;

        int shortfall = _batchSize - _messages.Count;
        int capacity = BoundedCapacityAvailable;

        // Enough items are already queued to form a batch.
        if (shortfall <= 0) return true;

        // Without non-greedy state there are no postponed messages to draw from.
        if (_nonGreedyState == null) return false;

        bool canPullPostponed = _nonGreedyState.PostponedMessages.Count > 0 && capacity > 0;

        // A triggered batch can be made from queued items or from postponed ones we have room for.
        if (_nonGreedyState.AcceptFewerThanBatchSize && (_messages.Count > 0 || canPullPostponed))
        {
            return true;
        }

        if (_dataflowBlockOptions.Greedy)
        {
            // Greedy mode postpones only for lack of bounding capacity, and it should consume
            // as many postponed messages as it can, even if they don't add up to a batch.
            return canPullPostponed;
        }

        // Non-greedy mode needs both enough postponed messages and enough capacity
        // to fill the batch completely.
        return _nonGreedyState.PostponedMessages.Count >= shortfall && capacity >= shortfall;
    }
}
623 
/// <summary>
/// Fast path invoked when new messages are available: launches processing only when
/// <see cref="BatchesNeedProcessing"/> reports there is work to do.
/// </summary>
/// <param name="isReplacementReplica">Whether this call is the continuation of a previous message loop.</param>
private void ProcessAsyncIfNecessary(bool isReplacementReplica = false)
{
    Contract.Requires(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
    Common.ContractAssertMonitorStatus(IncomingLock, held: true);

    if (!BatchesNeedProcessing) return;

    ProcessAsyncIfNecessary_Slow(isReplacementReplica);
}
636 
/// <summary>
/// Slow path for ProcessAsyncIfNecessary: creates, registers and starts the task that runs the message loop.
/// It lives in its own method so that the fast path is more likely to be inlined.
/// </summary>
/// <param name="isReplacementReplica">Forwarded to the helper that computes the task creation options.</param>
private void ProcessAsyncIfNecessary_Slow(bool isReplacementReplica)
{
    Contract.Requires(BatchesNeedProcessing, "There must be a batch that needs processing.");

    // The task is stored in the non-greedy state before it is scheduled, so that the
    // registration is already visible by the time the task body runs.
    var processingTask = new Task(
        state => ((BatchBlockTargetCore)state).ProcessMessagesLoopCore(),
        this,
        Common.GetCreationOptionsForTask(isReplacementReplica));
    _nonGreedyState.TaskForInputProcessing = processingTask;

#if FEATURE_TRACING
    DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
    if (etwLog.IsEnabled())
    {
        etwLog.TaskLaunchedForMessageHandling(
            _owningBatch, processingTask, DataflowEtwProvider.TaskLaunchedReason.ProcessingInputMessages,
            _messages.Count + (_nonGreedyState != null ? _nonGreedyState.PostponedMessages.Count : 0));
    }
#endif

    // Start the task; a scheduling failure is returned rather than thrown.
    Exception schedulingError = Common.StartTaskSafe(processingTask, _dataflowBlockOptions.TaskScheduler);
    if (schedulingError == null) return;

    // Scheduling failed. Fault the block from a separate task to get out from under the
    // locks currently held; Complete re-acquires the ones it needs and reverts the
    // processing state registered above.
    Task.Factory.StartNew(
        state => Complete(exception: (Exception)state, dropPendingMessages: true, releaseReservedMessages: true, revertProcessingState: true),
        schedulingError, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
}
669 
670 
/// <summary>
/// Task body of the message loop: repeatedly pulls in postponed items and turns buffered messages
/// into batches until no batch is produced or the per-task iteration limit is reached.
/// Must be entered without the incoming lock held.
/// </summary>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
private void ProcessMessagesLoopCore()
{
    Contract.Requires(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
    Common.ContractAssertMonitorStatus(IncomingLock, held: false);
    try
    {
        int iterationLimit = _dataflowBlockOptions.ActualMaxMessagesPerTask;
        int iterations = 0;
        bool batchProduced;
        do
        {
            // Snapshot the "batch was forced" flag. If it flips to true right after this read,
            // the next iteration picks it up; only this method ever resets it, and only after
            // having observed it as true.
            bool triggered = Volatile.Read(ref _nonGreedyState.AcceptFewerThanBatchSize);

            // Pull in postponed items:
            //   greedy bounded mode -> consume without a prior reservation
            //   non-greedy mode     -> reserve, then consume
            if (_dataflowBlockOptions.Greedy)
            {
                RetrievePostponedItemsGreedyBounded(allowFewerThanBatchSize: triggered);
            }
            else
            {
                RetrievePostponedItemsNonGreedy(allowFewerThanBatchSize: triggered);
            }

            lock (IncomingLock)
            {
                batchProduced = MakeBatchIfPossible(evenIfFewerThanBatchSize: triggered);

                // The trigger is cleared when a batch was made (for whatever reason), and also
                // when a triggered attempt produced nothing, which can happen if none of the
                // postponed messages could be consumed.
                if (batchProduced || triggered)
                {
                    _nonGreedyState.AcceptFewerThanBatchSize = false;
                }
            }

            iterations++;
        }
        while (batchProduced && iterations < iterationLimit);
    }
    catch (Exception loopError)
    {
        // Fault the block, keeping already buffered messages but releasing reservations.
        Complete(loopError, dropPendingMessages: false, releaseReservedMessages: true);
    }
    finally
    {
        lock (IncomingLock)
        {
            // This task is done processing; unregister it.
            _nonGreedyState.TaskForInputProcessing = null;

            // The loop may have stopped because of the configured limit rather than for
            // lack of work; if so, a replacement task is started to carry on.
            ProcessAsyncIfNecessary(isReplacementReplica: true);

            // If instead the work ran out and no more will ever arrive, complete.
            CompleteBlockIfPossible();
        }
    }
}
733 
/// <summary>Builds a batch from the queued messages and passes it to the batch-completed action.</summary>
/// <param name="evenIfFewerThanBatchSize">
/// Whether a batch may be made from fewer than BatchSize items, as long as at least one is queued.
/// </param>
/// <returns>true if a batch was created and published; otherwise, false.</returns>
private bool MakeBatchIfPossible(bool evenIfFewerThanBatchSize)
{
    Common.ContractAssertMonitorStatus(IncomingLock, held: true);

    int queued = _messages.Count;
    bool haveFullBatch = queued >= _batchSize;
    bool partialAllowed = evenIfFewerThanBatchSize && queued > 0;

    // Neither a full batch nor a permitted partial one is available.
    if (!haveFullBatch && !partialAllowed) return false;

    // A full batch takes exactly _batchSize items; a partial one takes everything queued.
    int size = haveFullBatch ? _batchSize : queued;
    var batch = new T[size];
    for (int slot = 0; slot < size; slot++)
    {
        batch[slot] = _messages.Dequeue();
    }

    _batchCompletedAction(batch);
    _batchesCompleted++;

    // Once the maximum number of groups has been produced, stop accepting for good.
    if (_batchesCompleted >= _dataflowBlockOptions.ActualMaxNumberOfGroups)
    {
        _decliningPermanently = true;
    }

    return true;
}
759 
/// <summary>
/// Retrieves postponed items in non-greedy mode: reserves postponed messages from their sources and then
/// either consumes all of the reservations or releases all of them.
/// Must be called without the incoming lock held.
/// </summary>
/// <param name="allowFewerThanBatchSize">Whether we'll accept consuming fewer elements than the defined batch size.</param>
private void RetrievePostponedItemsNonGreedy(bool allowFewerThanBatchSize)
{
    Contract.Requires(!_dataflowBlockOptions.Greedy, "This method may only be used in non-greedy mode.");
    Contract.Requires(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
    Common.ContractAssertMonitorStatus(IncomingLock, held: false);

    // Local aliases for the non-greedy state's collections.
    QueuedMap<ISourceBlock<T>, DataflowMessageHeader> postponedQueue = _nonGreedyState.PostponedMessages;
    KeyValuePair<ISourceBlock<T>, DataflowMessageHeader>[] scratch = _nonGreedyState.PostponedMessagesTemp;
    List<KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>> reservations = _nonGreedyState.ReservedSourcesTemp;

    // Only the serial message loop touches this list, so no lock is needed to reset it.
    reservations.Clear();

    int initialCount;
    lock (IncomingLock)
    {
        Debug.Assert(_messages.Count == 0, "The queue must be empty between batches in non-greedy mode");

        int capacity = BoundedCapacityAvailable;

        // Nothing can be pulled at all.
        if (_decliningPermanently || postponedQueue.Count == 0 || capacity <= 0) return;

        // A full batch is required but either the postponed messages or the capacity fall short.
        if (!allowFewerThanBatchSize && (postponedQueue.Count < _batchSize || capacity < _batchSize)) return;

        // Take up to a batch worth of postponed messages in one go.
        initialCount = postponedQueue.PopRange(scratch, 0, _batchSize);
        Debug.Assert(allowFewerThanBatchSize ? initialCount > 0 : initialCount == _batchSize,
            "We received fewer than we expected based on the previous check.");
    }

    // The lock is released here: Reserve/Consume/Release must not be called while holding it.
    for (int index = 0; index < initialCount; index++)
    {
        KeyValuePair<ISourceBlock<T>, DataflowMessageHeader> candidate = scratch[index];
        if (!candidate.Key.ReserveMessage(candidate.Value, _owningBatch)) continue;

        // The payload slot stays at default(T) at reservation time.
        reservations.Add(new KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>(
            candidate.Key,
            new KeyValuePair<DataflowMessageHeader, T>(candidate.Value, default(T))));
    }

    // Wipe the scratch array so it doesn't keep messages alive longer than needed.
    Array.Clear(scratch, 0, scratch.Length);

    // Some reservations may have failed. Top up one postponed message at a time until
    // a batch worth is reserved or the postponed queue runs dry.
    while (reservations.Count < _batchSize)
    {
        KeyValuePair<ISourceBlock<T>, DataflowMessageHeader> candidate;
        bool popped;
        lock (IncomingLock)
        {
            popped = postponedQueue.TryPop(out candidate);
        }
        if (!popped) break;

        // Again outside the lock for the call into the source.
        if (candidate.Key.ReserveMessage(candidate.Value, _owningBatch))
        {
            reservations.Add(new KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>(
                candidate.Key,
                new KeyValuePair<DataflowMessageHeader, T>(candidate.Value, default(T))));
        }
    }

    Debug.Assert(reservations.Count <= _batchSize, "Expected the number of reserved sources to be <= the number needed for a batch.");

    // Everything that could be reserved has been. Now it's all or nothing.
    if (reservations.Count > 0)
    {
        // Complication introduced by triggered batches: when the block has a maximum group count,
        // consuming here could pull in messages that can never be turned into a batch. To avoid
        // that, the block starts declining before consuming once it is one batch shy of its
        // quota (a triggered batch may have any size, hence "+ 1" rather than a message count).
        // If an exception occurs afterwards, the block is shutting down anyway.
        bool proceedToConsume = true;
        if (allowFewerThanBatchSize)
        {
            lock (IncomingLock)
            {
                if (!_decliningPermanently &&
                    (_batchesCompleted + 1) >= _dataflowBlockOptions.ActualMaxNumberOfGroups)
                {
                    // _decliningPermanently is known to be false here, so this assigns true.
                    proceedToConsume = !_decliningPermanently;
                    _decliningPermanently = true;
                }
            }
        }

        bool enoughForBatch = allowFewerThanBatchSize || reservations.Count == _batchSize;
        if (proceedToConsume && enoughForBatch)
        {
            ConsumeReservedMessagesNonGreedy();
        }
        else
        {
            ReleaseReservedMessages(throwOnFirstException: true);
        }
    }

    // Drop the references held by the reservation list. This is deliberately skipped when an
    // exception propagates, because the higher-level handler reads the list to release reservations.
    reservations.Clear();
}
876 
877             /// <summary>Retrieves postponed items in greedy bounded mode.</summary>
878             /// <remarks>Whether we'll accept consuming fewer elements than the defined batch size.</remarks>
RetrievePostponedItemsGreedyBounded(bool allowFewerThanBatchSize)879             private void RetrievePostponedItemsGreedyBounded(bool allowFewerThanBatchSize)
880             {
881                 Contract.Requires(_dataflowBlockOptions.Greedy, "This method may only be used in greedy mode.");
882                 Contract.Requires(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
883                 Contract.Requires(_boundingState != null, "Bounding state is required when in bounded mode.");
884                 Common.ContractAssertMonitorStatus(IncomingLock, held: false);
885 
886                 // Shortcuts just to keep the code cleaner
887                 QueuedMap<ISourceBlock<T>, DataflowMessageHeader> postponed = _nonGreedyState.PostponedMessages;
888                 KeyValuePair<ISourceBlock<T>, DataflowMessageHeader>[] postponedTemp = _nonGreedyState.PostponedMessagesTemp;
889                 List<KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>> reserved = _nonGreedyState.ReservedSourcesTemp;
890 
891                 // Clear the temporary buffer.  This is safe to do without a lock because
892                 // it is only accessed by the serial message loop.
893                 reserved.Clear();
894 
895                 int poppedInitially;
896                 int boundedCapacityAvailable;
897                 int itemCountNeededToCompleteBatch;
898                 lock (IncomingLock)
899                 {
900                     // If there are not enough postponed items (or if we're not allowing consumption), there's nothing more to be done
901                     boundedCapacityAvailable = BoundedCapacityAvailable;
902                     itemCountNeededToCompleteBatch = _batchSize - _messages.Count;
903                     if (_decliningPermanently ||
904                         postponed.Count == 0 ||
905                         boundedCapacityAvailable <= 0)
906                         return;
907 
908                     // Grab an initial batch of postponed messages.
909                     if (boundedCapacityAvailable < itemCountNeededToCompleteBatch) itemCountNeededToCompleteBatch = boundedCapacityAvailable;
910                     poppedInitially = postponed.PopRange(postponedTemp, 0, itemCountNeededToCompleteBatch);
911                     Debug.Assert(poppedInitially > 0, "We received fewer than we expected based on the previous check.");
912                 } // Release the lock.  We must not hold it while calling Reserve/Consume/Release.
913 
914                 // Treat popped messages as reserved.
915                 // We don't have to formally reserve because we are in greedy mode.
916                 for (int i = 0; i < poppedInitially; i++)
917                 {
918                     KeyValuePair<ISourceBlock<T>, DataflowMessageHeader> sourceAndMessage = postponedTemp[i];
919                     var reservedMessage = new KeyValuePair<DataflowMessageHeader, T>(sourceAndMessage.Value, default(T));
920                     var reservedSourceAndMessage = new KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>(sourceAndMessage.Key, reservedMessage);
921                     reserved.Add(reservedSourceAndMessage);
922                 }
923                 Array.Clear(postponedTemp, 0, postponedTemp.Length); // clear out the temp array so as not to hold onto messages too long
924 
925                 // If we didn't reserve enough to make a batch, start picking off postponed messages
926                 // one by one until we either have enough reserved or we run out of messages
927                 while (reserved.Count < itemCountNeededToCompleteBatch)
928                 {
929                     KeyValuePair<ISourceBlock<T>, DataflowMessageHeader> sourceAndMessage;
930                     lock (IncomingLock)
931                     {
932                         if (!postponed.TryPop(out sourceAndMessage)) break;
933                     } // Release the lock.  We must not hold it while calling Reserve/Consume/Release.
934 
935                     var reservedMessage = new KeyValuePair<DataflowMessageHeader, T>(sourceAndMessage.Value, default(T));
936                     var reservedSourceAndMessage = new KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>(sourceAndMessage.Key, reservedMessage);
937                     reserved.Add(reservedSourceAndMessage);
938                 }
939 
940                 Debug.Assert(reserved.Count <= itemCountNeededToCompleteBatch, "Expected the number of reserved sources to be <= the number needed for a batch.");
941 
942                 // We've gotten as many postponed messages as we can. Try to consume them.
943                 if (reserved.Count > 0)
944                 {
945                     // TriggerBatch adds a complication here.  It's possible that while we've been reserving
946                     // messages, Post has been used to queue up a bunch of messages to the batch,
947                     // and that if the batch has a max group count and enough messages were posted,
948                     // we could now be declining.  In that case, if we don't specially handle the situation,
949                     // we could consume messages that we won't be able to turn into a batch, since MaxNumberOfGroups
950                     // implies the block will only ever output a maximum number of batches.  To handle this,
951                     // we start declining before consuming, now that we know we'll have enough to form a batch.
952                     // (If an exception occurs after we do this, we'll be shutting down the block anyway.)
953                     // This is also why we still reserve/consume rather than just consume in forced mode,
954                     // so that we only consume if we're able to turn what we consume into a batch.
955                     bool shouldProceedToConsume = true;
956                     if (allowFewerThanBatchSize)
957                     {
958                         lock (IncomingLock)
959                         {
960                             if (!_decliningPermanently &&
961                                 (_batchesCompleted + 1) >= _dataflowBlockOptions.ActualMaxNumberOfGroups)
962                             // Note that this logic differs from the other location where we do a similar check.
963                             // Here we want to know whether we're one shy of meeting our quota, because we'll accept
964                             // any size batch.  Elsewhere, we need to know whether we have the right number of messages
965                             // queued up.
966                             {
967                                 shouldProceedToConsume = !_decliningPermanently;
968                                 _decliningPermanently = true;
969                             }
970                         }
971                     }
972 
973                     if (shouldProceedToConsume)
974                     {
975                         ConsumeReservedMessagesGreedyBounded();
976                     }
977                 }
978 
979                 // Clear out the reserved list, so as not to hold onto values longer than necessary.
980                 // We don't do this in case of failure, because the higher-level exception handler
981                 // accesses the list to try to release reservations.
982                 reserved.Clear();
983             }
984 
/// <summary>
/// Consumes every message recorded in the non-greedy state's temporary reserved source list
/// and appends the consumed values to the input queue.
/// </summary>
/// <remarks>
/// The sources are called while <c>IncomingLock</c> is not held; the lock is then taken once
/// to update the bounding count and to enqueue all of the consumed values together.
/// </remarks>
/// <exception cref="System.InvalidOperationException">
/// A source reported that it could not hand over a message that had been reserved.
/// </exception>
private void ConsumeReservedMessagesNonGreedy()
{
    Contract.Requires(!_dataflowBlockOptions.Greedy, "This method may only be used in non-greedy mode.");
    Contract.Requires(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
    Contract.Requires(_nonGreedyState.ReservedSourcesTemp != null, "ReservedSourcesTemp should have been initialized.");
    Common.ContractAssertMonitorStatus(IncomingLock, held: false);

    List<KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>> reservations = _nonGreedyState.ReservedSourcesTemp;
    var emptySlot = default(KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>);

    // Phase 1 (lock not held): ask each source for its reserved message.  The consumed value is
    // written back into the same slot of the temp list (as a new pair, since KeyValuePair is
    // immutable) so that no per-batch allocation is needed and the lock is only taken once below.
    int index = 0;
    while (index < reservations.Count)
    {
        ISourceBlock<T> source = reservations[index].Key;
        DataflowMessageHeader header = reservations[index].Value.Key;

        // Empty the slot before calling out, so that it is already cleared should ConsumeMessage throw.
        reservations[index] = emptySlot;

        bool consumed;
        T value = source.ConsumeMessage(header, _owningBatch, out consumed);
        if (consumed)
        {
            reservations[index] = new KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>(
                source,
                new KeyValuePair<DataflowMessageHeader, T>(header, value));
            index++;
            continue;
        }

        // The reservation protocol broke down, which is fatal.  Wipe the slots of the messages
        // that were already consumed, because a higher-level handler is expected to try to
        // release whatever remains in the list, and then fail.
        for (int earlier = 0; earlier < index; earlier++)
        {
            reservations[earlier] = emptySlot;
        }
        throw new InvalidOperationException(SR.InvalidOperation_FailedToConsumeReservedMessage);
    }

    // Phase 2 (lock held): account for, and enqueue, everything that was consumed.
    lock (IncomingLock)
    {
        if (_boundingState != null)
        {
            _boundingState.CurrentCount += reservations.Count;
        }

        for (int i = 0; i < reservations.Count; i++)
        {
            _messages.Enqueue(reservations[i].Value.Value);
        }
    }
}
1034 
/// <summary>
/// Attempts to consume each message recorded in the non-greedy state's temporary reserved source list,
/// and appends the values that were actually obtained to the input queue.
/// </summary>
/// <remarks>
/// A message that its source does not hand over is simply skipped; its slot in the temp list is
/// left at the default value and it is not counted against the bounding state.
/// </remarks>
private void ConsumeReservedMessagesGreedyBounded()
{
    Contract.Requires(_dataflowBlockOptions.Greedy, "This method may only be used in greedy mode.");
    Contract.Requires(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
    Contract.Requires(_nonGreedyState.ReservedSourcesTemp != null, "ReservedSourcesTemp should have been initialized.");
    Contract.Requires(_boundingState != null, "Bounded state is required for bounded mode.");
    Common.ContractAssertMonitorStatus(IncomingLock, held: false);

    List<KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>> reservations = _nonGreedyState.ReservedSourcesTemp;

    // Number of messages the sources really gave us; only these count toward the bound.
    int acquired = 0;

    // Phase 1 (lock not held): call each source.  Successful results are written back into the
    // temp list as new pairs (KeyValuePair is immutable), which avoids extra allocations per
    // batch and lets the lock be taken just once afterwards.
    for (int slot = 0; slot < reservations.Count; slot++)
    {
        ISourceBlock<T> source = reservations[slot].Key;
        DataflowMessageHeader header = reservations[slot].Value.Key;

        // Empty the slot first, so that it is already cleared should ConsumeMessage throw.
        reservations[slot] = default(KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>);

        bool consumed;
        T value = source.ConsumeMessage(header, _owningBatch, out consumed);
        if (!consumed)
        {
            continue;
        }

        reservations[slot] = new KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>(
            source,
            new KeyValuePair<DataflowMessageHeader, T>(header, value));
        acquired++;
    }

    // Phase 2 (lock held): update the bounding count and enqueue what was consumed.
    lock (IncomingLock)
    {
        if (_boundingState != null)
        {
            _boundingState.CurrentCount += acquired;
        }

        foreach (KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>> entry in reservations)
        {
            // A slot whose source is null is a default pair, i.e. a message that was not consumed.
            if (entry.Key != null)
            {
                _messages.Enqueue(entry.Value.Value);
            }
        }
    }
}
1084 
/// <summary>
/// Releases the reservation on every message still recorded in the non-greedy state's
/// temporary reserved source list, clearing each slot as it goes.
/// </summary>
/// <param name="throwOnFirstException">
/// true to let the first exception thrown by a release propagate immediately;
/// false to keep releasing the remaining messages and report all failures together at the end.
/// </param>
/// <exception cref="System.AggregateException">
/// <paramref name="throwOnFirstException"/> was false and one or more releases failed.
/// </exception>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
internal void ReleaseReservedMessages(bool throwOnFirstException)
{
    Common.ContractAssertMonitorStatus(IncomingLock, held: false);
    Debug.Assert(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
    Debug.Assert(_nonGreedyState.ReservedSourcesTemp != null, "Should have been initialized");

    List<KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>> pending = _nonGreedyState.ReservedSourcesTemp;

    // Allocated lazily: stays null unless a release fails in "collect all" mode.
    List<Exception> failures = null;

    for (int index = 0; index < pending.Count; index++)
    {
        // Take the entry out of the list before touching the source, so the slot is
        // already empty regardless of how the release call turns out.
        KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>> entry = pending[index];
        pending[index] = default(KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>);

        ISourceBlock<T> source = entry.Key;
        DataflowMessageHeader header = entry.Value.Key;

        // Slots without a source or without a valid header hold nothing to release.
        if (source == null || !header.IsValid)
        {
            continue;
        }

        try
        {
            source.ReleaseReservation(header, _owningBatch);
        }
        catch (Exception e)
        {
            if (throwOnFirstException) throw;

            if (failures == null)
            {
                failures = new List<Exception>(1);
            }
            failures.Add(e);
        }
    }

    if (failures != null)
    {
        throw new AggregateException(failures);
    }
}
1122 
/// <summary>Tells the target that items have been taken out of the queue.</summary>
/// <param name="numItemsRemoved">How many items were removed; expected to be positive.</param>
/// <remarks>
/// Only has an effect when the block is bounded.  In that case the bounding count is lowered
/// and the block gets a chance to start processing postponed messages or to complete.
/// </remarks>
internal void OnItemsRemoved(int numItemsRemoved)
{
    Contract.Requires(numItemsRemoved > 0, "Should only be called for a positive number of items removed.");
    Common.ContractAssertMonitorStatus(IncomingLock, held: false);

    // Without bounding there is no mirrored count to maintain, so there is nothing to do.
    if (_boundingState == null)
    {
        return;
    }

    lock (IncomingLock)
    {
        // The bounding count mirrors the count in the source half; bring it back in line.
        Debug.Assert(_boundingState.CurrentCount - numItemsRemoved >= 0,
            "It should be impossible to have a negative number of items.");
        _boundingState.CurrentCount -= numItemsRemoved;

        // Freed capacity may allow postponed messages to be consumed, or the block to finish.
        ProcessAsyncIfNecessary();
        CompleteBlockIfPossible();
    }
}
1147 
/// <summary>Counts the input items contained in one output batch or in a list of output batches.</summary>
/// <param name="singleOutputItem">A single output batch. Only looked at when <paramref name="multipleOutputItems"/> is null.</param>
/// <param name="multipleOutputItems">A list of output batches. May be null.</param>
/// <returns>
/// The summed lengths of the arrays in <paramref name="multipleOutputItems"/> when it is not null;
/// otherwise the length of <paramref name="singleOutputItem"/>.
/// </returns>
internal static int CountItems(T[] singleOutputItem, IList<T[]> multipleOutputItems)
{
    // When a list is supplied it takes precedence: total up the length of every batch in it.
    if (multipleOutputItems != null)
    {
        int total = 0;
        foreach (T[] batch in multipleOutputItems)
        {
            total += batch.Length;
        }
        return total;
    }

    // No list was supplied, so the single batch is the subject of the count.
    return singleOutputItem.Length;
}
1161 
/// <summary>Gets the number of messages waiting to be processed.</summary>
/// <remarks>This must only be used from the debugger, as it avoids taking necessary locks.</remarks>
private int InputCountForDebugger
{
    get
    {
        return _messages.Count;
    }
}
1164 
/// <summary>Creates a debugging view over this target for display in a debugger.</summary>
/// <returns>A new <see cref="DebuggingInformation"/> wrapping this target.</returns>
internal DebuggingInformation GetDebuggingInformation()
{
    var info = new DebuggingInformation(this);
    return info;
}
1168 
/// <summary>Gets the object shown by the debugger display attribute.</summary>
[SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
private object DebuggerDisplayContent
{
    get
    {
        // Show the owning block's own debugger content when it exposes IDebuggerDisplay;
        // otherwise fall back to the owning block object itself.
        object blockDescription = _owningBatch;
        var debuggerDisplay = _owningBatch as IDebuggerDisplay;
        if (debuggerDisplay != null)
        {
            blockDescription = debuggerDisplay.Content;
        }

        return string.Format("Block=\"{0}\"", blockDescription);
    }
}
1180 
/// <summary>Provides a wrapper for commonly needed debugging information.</summary>
/// <remarks>
/// Every member reads the wrapped target's fields directly, without taking any lock.
/// </remarks>
internal sealed class DebuggingInformation
{
    /// <summary>The target being viewed.  Assigned once, in the constructor.</summary>
    private readonly BatchBlockTargetCore _target;

    /// <summary>Initializes the debugging helper.</summary>
    /// <param name="target">The target being viewed.</param>
    public DebuggingInformation(BatchBlockTargetCore target) { _target = target; }

    /// <summary>Gets a copy of the messages waiting to be processed.</summary>
    public IEnumerable<T> InputQueue { get { return _target._messages.ToList(); } }
    /// <summary>Gets the task being used for input processing, or null when the target has no non-greedy state.</summary>
    public Task TaskForInputProcessing { get { return _target._nonGreedyState != null ? _target._nonGreedyState.TaskForInputProcessing : null; } }
    /// <summary>Gets the collection of postponed messages, or null when the target has no non-greedy state.</summary>
    public QueuedMap<ISourceBlock<T>, DataflowMessageHeader> PostponedMessages { get { return _target._nonGreedyState != null ? _target._nonGreedyState.PostponedMessages : null; } }
    /// <summary>Gets whether the block is declining further messages.</summary>
    public bool IsDecliningPermanently { get { return _target._decliningPermanently; } }
    /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
    public GroupingDataflowBlockOptions DataflowBlockOptions { get { return _target._dataflowBlockOptions; } }
    /// <summary>Gets the number of batches that have been completed.</summary>
    public long NumberOfBatchesCompleted { get { return _target._batchesCompleted; } }
}
1204         }
1205     }
1206 }
1207