// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// BatchBlock.cs
//
//
// A propagator block that groups individual messages into arrays of messages.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Contracts;
using System.Linq;
using System.Security;
using System.Threading.Tasks.Dataflow.Internal;

namespace System.Threading.Tasks.Dataflow
{
    /// <summary>Provides a dataflow block that batches inputs into arrays.</summary>
    /// <typeparam name="T">Specifies the type of data put into batches.</typeparam>
    [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
    [DebuggerTypeProxy(typeof(BatchBlock<>.DebugView))]
    [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
    public sealed class BatchBlock<T> : IPropagatorBlock<T, T[]>, IReceivableSourceBlock<T[]>, IDebuggerDisplay
    {
        /// <summary>The target half of this batch.</summary>
        private readonly BatchBlockTargetCore _target;
        /// <summary>The source half of this batch.</summary>
        private readonly SourceCore<T[]> _source;

        /// <summary>Initializes this <see cref="BatchBlock{T}"/> with the specified batch size.</summary>
        /// <param name="batchSize">The number of items to group into a batch.</param>
        /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
        public BatchBlock(Int32 batchSize) :
            this(batchSize, GroupingDataflowBlockOptions.Default)
        { }

        /// <summary>Initializes this <see cref="BatchBlock{T}"/> with the specified batch size and block options.</summary>
        /// <param name="batchSize">The number of items to group into a batch.</param>
        /// <param name="dataflowBlockOptions">The options with which to configure this <see cref="BatchBlock{T}"/>.</param>
        /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be no greater than the value of the BoundedCapacity option if a non-default value has been set.</exception>
        /// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
        public BatchBlock(Int32 batchSize, GroupingDataflowBlockOptions dataflowBlockOptions)
        {
            // Validate arguments
            if (batchSize < 1) throw new ArgumentOutOfRangeException("batchSize", SR.ArgumentOutOfRange_GenericPositive);
            if (dataflowBlockOptions == null) throw new ArgumentNullException("dataflowBlockOptions");
            if (dataflowBlockOptions.BoundedCapacity > 0 && dataflowBlockOptions.BoundedCapacity < batchSize) throw new ArgumentOutOfRangeException("batchSize", SR.ArgumentOutOfRange_BatchSizeMustBeNoGreaterThanBoundedCapacity);
            Contract.EndContractBlock();
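            // For illustration only (hypothetical values): the bound must be able to hold a full
            // batch, so a bound smaller than the batch size is rejected up front.
            //
            //   new BatchBlock<int>(2, new GroupingDataflowBlockOptions { BoundedCapacity = 2 }); // ok: bound == batch size
            //   new BatchBlock<int>(3, new GroupingDataflowBlockOptions { BoundedCapacity = 2 }); // throws ArgumentOutOfRangeException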
            // Ensure we have options that can't be changed by the caller
            dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();

            // Initialize bounding actions
            Action<ISourceBlock<T[]>, int> onItemsRemoved = null;
            Func<ISourceBlock<T[]>, T[], IList<T[]>, int> itemCountingFunc = null;
            if (dataflowBlockOptions.BoundedCapacity > 0)
            {
                onItemsRemoved = (owningSource, count) => ((BatchBlock<T>)owningSource)._target.OnItemsRemoved(count);
                itemCountingFunc = (owningSource, singleOutputItem, multipleOutputItems) => BatchBlockTargetCore.CountItems(singleOutputItem, multipleOutputItems);
            }

            // Initialize source
            _source = new SourceCore<T[]>(this, dataflowBlockOptions,
                owningSource => ((BatchBlock<T>)owningSource)._target.Complete(exception: null, dropPendingMessages: true, releaseReservedMessages: false),
                onItemsRemoved, itemCountingFunc);

            // Initialize target
            _target = new BatchBlockTargetCore(this, batchSize, batch => _source.AddMessage(batch), dataflowBlockOptions);

            // When the target is done, let the source know it won't be getting any more data
            _target.Completion.ContinueWith(delegate { _source.Complete(); },
                CancellationToken.None, Common.GetContinuationOptions(), TaskScheduler.Default);

            // It is possible that the source half may fault on its own, e.g. due to a task scheduler exception.
            // In those cases we need to fault the target half to drop its buffered messages and to release its
            // reservations. This should not create an infinite loop, because all our implementations are designed
            // to handle multiple completion requests and to carry over only one.
            _source.Completion.ContinueWith((completed, state) =>
            {
                var thisBlock = ((BatchBlock<T>)state) as IDataflowBlock;
                Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
                thisBlock.Fault(completed.Exception);
            }, this, CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);

            // Handle async cancellation requests by declining on the target
            Common.WireCancellationToComplete(
                dataflowBlockOptions.CancellationToken, _source.Completion, state => ((BatchBlockTargetCore)state).Complete(exception: null, dropPendingMessages: true, releaseReservedMessages: false), _target);
#if FEATURE_TRACING
            DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
            if (etwLog.IsEnabled())
            {
                etwLog.DataflowBlockCreated(this, dataflowBlockOptions);
            }
#endif
        }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
        public void Complete() { _target.Complete(exception: null, dropPendingMessages: false, releaseReservedMessages: false); }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
        void IDataflowBlock.Fault(Exception exception)
        {
            if (exception == null) throw new ArgumentNullException("exception");
            Contract.EndContractBlock();

            _target.Complete(exception, dropPendingMessages: true, releaseReservedMessages: false);
        }

        /// <summary>
        /// Triggers the <see cref="BatchBlock{T}"/> to initiate a batching operation even if the number
        /// of currently queued or postponed items is less than the <see cref="BatchSize"/>.
        /// </summary>
        /// <remarks>
        /// In greedy mode, a batch will be generated from queued items even if fewer exist than the batch size.
        /// In non-greedy mode, a batch will be generated asynchronously from postponed items even if
        /// fewer than the batch size can be consumed.
        /// </remarks>
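        /// <example>
        /// A minimal usage sketch (illustrative only):
        /// <code>
        /// var batch = new BatchBlock&lt;int&gt;(10);
        /// batch.Post(1);
        /// batch.Post(2);
        /// batch.TriggerBatch();            // force a batch from the two queued items
        /// int[] partial = batch.Receive(); // { 1, 2 }, even though BatchSize is 10
        /// </code>
        /// </example>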
        public void TriggerBatch() { _target.TriggerBatch(); }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
        public IDisposable LinkTo(ITargetBlock<T[]> target, DataflowLinkOptions linkOptions)
        {
            return _source.LinkTo(target, linkOptions);
        }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
        public Boolean TryReceive(Predicate<T[]> filter, out T[] item)
        {
            return _source.TryReceive(filter, out item);
        }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
        public bool TryReceiveAll(out IList<T[]> items) { return _source.TryReceiveAll(out items); }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="OutputCount"]/*' />
        public int OutputCount { get { return _source.OutputCount; } }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
        public Task Completion { get { return _source.Completion; } }

        /// <summary>Gets the size of the batches generated by this <see cref="BatchBlock{T}"/>.</summary>
        /// <remarks>
        /// If the number of items provided to the block is not evenly divisible by the batch size provided
        /// to the block's constructor, the block's final batch may contain fewer than the requested number of items.
        /// </remarks>
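        /// <example>
        /// An illustrative sketch of the final, smaller batch:
        /// <code>
        /// var batch = new BatchBlock&lt;int&gt;(3);
        /// for (int i = 0; i &lt; 7; i++) batch.Post(i);
        /// batch.Complete();
        /// // The block produces batches of sizes 3, 3, and finally 1.
        /// </code>
        /// </example>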
        public Int32 BatchSize { get { return _target.BatchSize; } }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
        DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, Boolean consumeToAccept)
        {
            return _target.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
        }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
        T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target, out Boolean messageConsumed)
        {
            return _source.ConsumeMessage(messageHeader, target, out messageConsumed);
        }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
        bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target)
        {
            return _source.ReserveMessage(messageHeader, target);
        }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
        void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target)
        {
            _source.ReleaseReservation(messageHeader, target);
        }

        /// <summary>Gets the number of messages waiting to be offered.
        /// This must only be used from the debugger as it avoids taking necessary locks.</summary>
        private int OutputCountForDebugger { get { return _source.GetDebuggingInformation().OutputCount; } }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
        public override string ToString() { return Common.GetNameForDebugger(this, _source.DataflowBlockOptions); }

        /// <summary>The data to display in the debugger display attribute.</summary>
        [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
        private object DebuggerDisplayContent
        {
            get
            {
                return string.Format("{0}, BatchSize={1}, OutputCount={2}",
                    Common.GetNameForDebugger(this, _source.DataflowBlockOptions),
                    BatchSize,
                    OutputCountForDebugger);
            }
        }
        /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
        object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }

        /// <summary>Provides a debugger type proxy for the Batch.</summary>
        private sealed class DebugView
        {
            /// <summary>The batch block being viewed.</summary>
            private BatchBlock<T> _batchBlock;
            /// <summary>The target half being viewed.</summary>
            private readonly BatchBlockTargetCore.DebuggingInformation _targetDebuggingInformation;
            /// <summary>The source half of the block being viewed.</summary>
            private readonly SourceCore<T[]>.DebuggingInformation _sourceDebuggingInformation;

            /// <summary>Initializes the debug view.</summary>
            /// <param name="batchBlock">The batch being viewed.</param>
            public DebugView(BatchBlock<T> batchBlock)
            {
                Contract.Requires(batchBlock != null, "Need a block with which to construct the debug view");
                _batchBlock = batchBlock;
                _targetDebuggingInformation = batchBlock._target.GetDebuggingInformation();
                _sourceDebuggingInformation = batchBlock._source.GetDebuggingInformation();
            }

            /// <summary>Gets the messages waiting to be processed.</summary>
            public IEnumerable<T> InputQueue { get { return _targetDebuggingInformation.InputQueue; } }
            /// <summary>Gets the messages waiting to be received.</summary>
            public IEnumerable<T[]> OutputQueue { get { return _sourceDebuggingInformation.OutputQueue; } }
            /// <summary>Gets the number of batches that have been completed.</summary>
            public long BatchesCompleted { get { return _targetDebuggingInformation.NumberOfBatchesCompleted; } }

            /// <summary>Gets the task being used for input processing.</summary>
            public Task TaskForInputProcessing { get { return _targetDebuggingInformation.TaskForInputProcessing; } }
            /// <summary>Gets the task being used for output processing.</summary>
            public Task TaskForOutputProcessing { get { return _sourceDebuggingInformation.TaskForOutputProcessing; } }

            /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
            public GroupingDataflowBlockOptions DataflowBlockOptions { get { return _targetDebuggingInformation.DataflowBlockOptions; } }
            /// <summary>Gets the size of batches generated by the block.</summary>
            public int BatchSize { get { return _batchBlock.BatchSize; } }
            /// <summary>Gets whether the block is declining further messages.</summary>
            public bool IsDecliningPermanently { get { return _targetDebuggingInformation.IsDecliningPermanently; } }
            /// <summary>Gets whether the block is completed.</summary>
            public bool IsCompleted { get { return _sourceDebuggingInformation.IsCompleted; } }
            /// <summary>Gets the block's Id.</summary>
            public int Id { get { return Common.GetBlockId(_batchBlock); } }

            /// <summary>Gets the messages postponed by this batch.</summary>
            public QueuedMap<ISourceBlock<T>, DataflowMessageHeader> PostponedMessages { get { return _targetDebuggingInformation.PostponedMessages; } }
            /// <summary>Gets the set of all targets linked from this block.</summary>
            public TargetRegistry<T[]> LinkedTargets { get { return _sourceDebuggingInformation.LinkedTargets; } }
            /// <summary>Gets the target for which the next message is reserved, if any.</summary>
            public ITargetBlock<T[]> NextMessageReservedFor { get { return _sourceDebuggingInformation.NextMessageReservedFor; } }
        }

        /// <summary>Provides the core target implementation for a Batch.</summary>
        [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
        [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
        private sealed class BatchBlockTargetCore
        {
            /// <summary>The messages in this target.</summary>
            private readonly Queue<T> _messages = new Queue<T>();
            /// <summary>A task representing the completion of the block.</summary>
            private readonly TaskCompletionSource<VoidResult> _completionTask = new TaskCompletionSource<VoidResult>();

            /// <summary>Gets the object used as the incoming lock.</summary>
            private object IncomingLock { get { return _completionTask; } }

            /// <summary>The target that owns this target core.</summary>
            private readonly BatchBlock<T> _owningBatch;
            /// <summary>The batch size.</summary>
            private readonly int _batchSize;
            /// <summary>State used when in non-greedy mode.</summary>
            private readonly NonGreedyState _nonGreedyState;
            /// <summary>Bounding state for when the block is executing in bounded mode.</summary>
            private readonly BoundingState _boundingState;
            /// <summary>The options associated with this block.</summary>
            private readonly GroupingDataflowBlockOptions _dataflowBlockOptions;
            /// <summary>The action invoked with a completed batch.</summary>
            private readonly Action<T[]> _batchCompletedAction;

            /// <summary>Whether to stop accepting new messages.</summary>
            private bool _decliningPermanently;
            /// <summary>The number of batches completed so far.</summary>
            private long _batchesCompleted;
            /// <summary>Whether someone has reserved the right to call CompleteBlockOncePossible.</summary>
            private bool _completionReserved;

            /// <summary>State used only when in non-greedy mode.</summary>
            private sealed class NonGreedyState
            {
                /// <summary>Collection of postponed messages.</summary>
                internal readonly QueuedMap<ISourceBlock<T>, DataflowMessageHeader> PostponedMessages;
                /// <summary>A temporary array used to store data retrieved from PostponedMessages.</summary>
                internal readonly KeyValuePair<ISourceBlock<T>, DataflowMessageHeader>[] PostponedMessagesTemp;
                /// <summary>A temporary list used in non-greedy mode when consuming postponed messages to store successfully reserved messages.</summary>
                internal readonly List<KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>> ReservedSourcesTemp;
                /// <summary>Whether the next batching operation should accept fewer than BatchSize items.</summary>
                /// <remarks>This value may be read without holding a lock, but it must only be written while holding the IncomingLock.</remarks>
                internal bool AcceptFewerThanBatchSize;
                /// <summary>The task used to process messages.</summary>
                internal Task TaskForInputProcessing;

                /// <summary>Initializes the NonGreedyState.</summary>
                /// <param name="batchSize">The batch size used by the BatchBlock.</param>
                internal NonGreedyState(int batchSize)
                {
                    // A non-greedy batch requires at least batchSize sources to be successful.
                    // Thus, we initialize our collections to be able to store at least that many elements
                    // in order to avoid unnecessary allocations below that point.
                    Contract.Requires(batchSize > 0, "A positive batch size is required");
                    PostponedMessages = new QueuedMap<ISourceBlock<T>, DataflowMessageHeader>(batchSize);
                    PostponedMessagesTemp = new KeyValuePair<ISourceBlock<T>, DataflowMessageHeader>[batchSize];
                    ReservedSourcesTemp = new List<KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>>(batchSize);
                }
            }

            /// <summary>Initializes this target core with the specified configuration.</summary>
            /// <param name="owningBatch">The owning batch target.</param>
            /// <param name="batchSize">The number of items to group into a batch.</param>
            /// <param name="batchCompletedAction">The delegate to invoke when a batch is completed.</param>
            /// <param name="dataflowBlockOptions">The options with which to configure this <see cref="BatchBlock{T}"/>. Assumed to be immutable.</param>
            /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
            /// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
            internal BatchBlockTargetCore(BatchBlock<T> owningBatch, Int32 batchSize, Action<T[]> batchCompletedAction, GroupingDataflowBlockOptions dataflowBlockOptions)
            {
                Contract.Requires(owningBatch != null, "This batch target core must be associated with a batch block.");
                Contract.Requires(batchSize >= 1, "Batch sizes must be positive.");
                Contract.Requires(batchCompletedAction != null, "Completion action must be specified.");
                Contract.Requires(dataflowBlockOptions != null, "Options required to configure the block.");

                // Store arguments
                _owningBatch = owningBatch;
                _batchSize = batchSize;
                _batchCompletedAction = batchCompletedAction;
                _dataflowBlockOptions = dataflowBlockOptions;

                // We'll be using _nonGreedyState even if we are greedy with bounding
                bool boundingEnabled = dataflowBlockOptions.BoundedCapacity > 0;
                if (!_dataflowBlockOptions.Greedy || boundingEnabled) _nonGreedyState = new NonGreedyState(batchSize);
                if (boundingEnabled) _boundingState = new BoundingState(dataflowBlockOptions.BoundedCapacity);
            }
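            // A sketch of which state objects the constructor above allocates for each configuration
            // (hypothetical option values, shown only to make the matrix concrete):
            //
            //   new GroupingDataflowBlockOptions()                                          -> neither (pure greedy, unbounded)
            //   new GroupingDataflowBlockOptions { Greedy = false }                         -> _nonGreedyState only
            //   new GroupingDataflowBlockOptions { BoundedCapacity = 100 }                  -> _nonGreedyState and _boundingState
            //   new GroupingDataflowBlockOptions { Greedy = false, BoundedCapacity = 100 }  -> _nonGreedyState and _boundingState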
            /// <summary>
            /// Triggers a batching operation even if the number of currently queued or postponed items is less than the <see cref="BatchSize"/>.
            /// </summary>
            internal void TriggerBatch()
            {
                lock (IncomingLock)
                {
                    // If we shouldn't be doing any more work, bail. Otherwise, note that we're willing to
                    // accept fewer items in the next batching operation, and ensure processing is kicked off.
                    if (!_decliningPermanently && !_dataflowBlockOptions.CancellationToken.IsCancellationRequested)
                    {
                        if (_nonGreedyState == null)
                        {
                            MakeBatchIfPossible(evenIfFewerThanBatchSize: true);
                        }
                        else
                        {
                            _nonGreedyState.AcceptFewerThanBatchSize = true;
                            ProcessAsyncIfNecessary();
                        }
                    }
                    CompleteBlockIfPossible();
                }
            }

            /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
            internal DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, Boolean consumeToAccept)
            {
                // Validate arguments
                if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
                if (source == null && consumeToAccept) throw new ArgumentException(SR.Argument_CantConsumeFromANullSource, "consumeToAccept");
                Contract.EndContractBlock();

                lock (IncomingLock)
                {
                    // If we shouldn't be accepting more messages, don't.
                    if (_decliningPermanently)
                    {
                        CompleteBlockIfPossible();
                        return DataflowMessageStatus.DecliningPermanently;
                    }

                    // We can directly accept the message if:
                    // 1) we are being greedy AND we are not bounding, OR
                    // 2) we are being greedy AND we are bounding AND there is room available AND there are no postponed messages AND we are not currently processing.
                    // (If there were any postponed messages, we would need to postpone so that ordering would be maintained.)
                    // (We should also postpone if we are currently processing, because there may be a race between consuming postponed messages and
                    // accepting new ones directly into the queue.)
                    if (_dataflowBlockOptions.Greedy &&
                        (_boundingState == null
                            ||
                         (_boundingState.CountIsLessThanBound && _nonGreedyState.PostponedMessages.Count == 0 && _nonGreedyState.TaskForInputProcessing == null)))
                    {
                        // Consume the message from the source if necessary
                        if (consumeToAccept)
                        {
                            Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");

                            bool consumed;
                            messageValue = source.ConsumeMessage(messageHeader, _owningBatch, out consumed);
                            if (!consumed) return DataflowMessageStatus.NotAvailable;
                        }

                        // Once consumed, enqueue it.
                        _messages.Enqueue(messageValue);
                        if (_boundingState != null) _boundingState.CurrentCount += 1; // track this new item against our bound

                        // Now start declining if the number of batches we've already made plus
                        // the number we can make from data already enqueued meets our quota.
                        if (!_decliningPermanently &&
                            (_batchesCompleted + (_messages.Count / _batchSize)) >= _dataflowBlockOptions.ActualMaxNumberOfGroups)
                        {
                            _decliningPermanently = true;
                        }

                        // Now that we have a message, see if we can make forward progress.
                        MakeBatchIfPossible(evenIfFewerThanBatchSize: false);

                        CompleteBlockIfPossible();
                        return DataflowMessageStatus.Accepted;
                    }
                    // Otherwise, we try to postpone if a source was provided
                    else if (source != null)
                    {
                        Debug.Assert(_nonGreedyState != null, "_nonGreedyState must have been initialized during construction in non-greedy mode.");

                        // We always postpone using _nonGreedyState even if we are being greedy with bounding
                        _nonGreedyState.PostponedMessages.Push(source, messageHeader);

                        // In non-greedy mode, we need to see if a batch could be completed
                        if (!_dataflowBlockOptions.Greedy) ProcessAsyncIfNecessary();

                        return DataflowMessageStatus.Postponed;
                    }
                    // We can't do anything else about this message
                    return DataflowMessageStatus.Declined;
                }
            }
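            // A usage-level illustration of the postponement path above (an illustrative sketch,
            // not part of this type): with Greedy = false, offered messages are postponed rather
            // than accepted, and are consumed only once a full batch's worth can be assembled.
            //
            //   var options = new GroupingDataflowBlockOptions { Greedy = false };
            //   var batch = new BatchBlock<string>(2, options);
            //   var sourceA = new BufferBlock<string>();
            //   var sourceB = new BufferBlock<string>();
            //   sourceA.LinkTo(batch);
            //   sourceB.LinkTo(batch);
            //   sourceA.Post("a");               // postponed; no batch can be formed yet
            //   sourceB.Post("b");               // two postponed messages can now be reserved and consumed
            //   string[] pair = batch.Receive(); // a batch containing "a" and "b"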
            /// <summary>Completes/faults the block.
            /// In general, it is not safe to pass releaseReservedMessages:true, because releasing of reserved messages
            /// is done without taking a lock. We pass releaseReservedMessages:true only when an exception has been
            /// caught inside the message processing loop which is a single instance at any given moment.</summary>
            [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
            internal void Complete(Exception exception, bool dropPendingMessages, bool releaseReservedMessages, bool revertProcessingState = false)
            {
                // Ensure that no new messages may be added
                lock (IncomingLock)
                {
                    // Faulting from outside is allowed until we start declining permanently.
                    // Faulting from inside is allowed at any time.
                    if (exception != null && (!_decliningPermanently || releaseReservedMessages))
                    {
                        // Record the exception in the source.
                        // The source, which exposes its Completion to the public, will take this
                        // into account and will complete in Faulted state.
                        _owningBatch._source.AddException(exception);
                    }

                    // Drop pending messages if requested
                    if (dropPendingMessages) _messages.Clear();
                }

                // Release reserved messages if requested.
                // This must be done from outside the lock.
                if (releaseReservedMessages)
                {
                    try { ReleaseReservedMessages(throwOnFirstException: false); }
                    catch (Exception e) { _owningBatch._source.AddException(e); }
                }

                // Triggering completion requires the lock
                lock (IncomingLock)
                {
                    // Revert the dirty processing state if requested
                    if (revertProcessingState)
                    {
                        Debug.Assert(_nonGreedyState != null && _nonGreedyState.TaskForInputProcessing != null,
                            "The processing state must be dirty when revertProcessingState==true.");
                        _nonGreedyState.TaskForInputProcessing = null;
                    }

                    // Trigger completion
                    _decliningPermanently = true;
                    CompleteBlockIfPossible();
                }
            }

            /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
            internal Task Completion { get { return _completionTask.Task; } }

            /// <summary>Gets the size of the batches generated by this <see cref="BatchBlock{T}"/>.</summary>
            internal Int32 BatchSize { get { return _batchSize; } }

            /// <summary>Gets whether the target has had cancellation requested or an exception has occurred.</summary>
            private bool CanceledOrFaulted
            {
                get
                {
                    return _dataflowBlockOptions.CancellationToken.IsCancellationRequested || _owningBatch._source.HasExceptions;
                }
            }

            /// <summary>Returns the available capacity to bring in postponed items. The exact values above _batchSize don't matter.</summary>
            private int BoundedCapacityAvailable
            {
                get
                {
                    Common.ContractAssertMonitorStatus(IncomingLock, held: true);

                    return _boundingState != null ?
                        _dataflowBlockOptions.BoundedCapacity - _boundingState.CurrentCount :
                        _batchSize;
                }
            }

            /// <summary>Completes the block once all completion conditions are met.</summary>
            private void CompleteBlockIfPossible()
            {
                Common.ContractAssertMonitorStatus(IncomingLock, held: true);

                if (!_completionReserved)
                {
                    bool currentlyProcessing = _nonGreedyState != null && _nonGreedyState.TaskForInputProcessing != null;
                    bool completedAllDesiredBatches = _batchesCompleted >= _dataflowBlockOptions.ActualMaxNumberOfGroups;
                    bool noMoreMessages = _decliningPermanently && _messages.Count < _batchSize;

                    bool complete = !currentlyProcessing && (completedAllDesiredBatches || noMoreMessages || CanceledOrFaulted);
                    if (complete)
                    {
                        _completionReserved = true;

                        // Make sure the target is declining
                        _decliningPermanently = true;

                        // If we still have straggling items remaining, make them into their own batch even though there are fewer than batchSize
                        if (_messages.Count > 0) MakeBatchIfPossible(evenIfFewerThanBatchSize: true);

                        // We need to complete the block, but we may have arrived here from an external
                        // call to the block. To avoid running arbitrary code in the form of
                        // completion task continuations in that case, do it in a separate task.
                        Task.Factory.StartNew(thisTargetCore =>
                        {
                            var targetCore = (BatchBlockTargetCore)thisTargetCore;

                            // Release any postponed messages
                            List<Exception> exceptions = null;
                            if (targetCore._nonGreedyState != null)
                            {
                                // Note: No locks should be held at this point
                                Common.ReleaseAllPostponedMessages(targetCore._owningBatch,
                                    targetCore._nonGreedyState.PostponedMessages,
                                    ref exceptions);
                            }

                            if (exceptions != null)
                            {
                                // It is important to migrate these exceptions to the source part of the owning batch,
                                // because that is the completion task that is publicly exposed.
                                targetCore._owningBatch._source.AddExceptions(exceptions);
                            }

                            // Target's completion task is only available internally with the sole purpose
                            // of releasing the task that completes the parent. Hence the actual reason
                            // for completing this task doesn't matter.
                            targetCore._completionTask.TrySetResult(default(VoidResult));
                        }, this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
                    }
                }
            }
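            // Completion as observed from the public surface (an illustrative sketch; Receive and
            // Completion belong to the owning block and the dataflow extension methods, not this helper):
            //
            //   var batch = new BatchBlock<int>(5);
            //   batch.Post(1); batch.Post(2);
            //   batch.Complete();
            //   int[] last = batch.Receive();    // { 1, 2 }: stragglers become a final, smaller batch
            //   await batch.Completion;          // completes once all output has been drained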
            /// <summary>
            /// Gets whether we should launch further synchronous or asynchronous processing
            /// to create batches.
            /// </summary>
            private bool BatchesNeedProcessing
            {
                get
                {
                    Common.ContractAssertMonitorStatus(IncomingLock, held: true);

                    // If we're currently processing asynchronously, let that async task
                    // handle all work; nothing more to do here. If we're not currently processing
                    // but cancellation has been requested, don't do more work either.
                    bool completedAllDesiredBatches = _batchesCompleted >= _dataflowBlockOptions.ActualMaxNumberOfGroups;
                    bool currentlyProcessing = _nonGreedyState != null && _nonGreedyState.TaskForInputProcessing != null;
                    if (completedAllDesiredBatches || currentlyProcessing || CanceledOrFaulted) return false;

                    // Now, if it's possible to create a batch from queued items or if there are enough
                    // postponed items to attempt a batch, batches need processing.
                    int neededMessageCountToCompleteBatch = _batchSize - _messages.Count;
                    int boundedCapacityAvailable = BoundedCapacityAvailable;

                    // We have items queued up sufficient to make up a batch
                    if (neededMessageCountToCompleteBatch <= 0) return true;

                    if (_nonGreedyState != null)
                    {
                        // We can make a triggered batch using postponed messages
                        if (_nonGreedyState.AcceptFewerThanBatchSize &&
                            (_messages.Count > 0 || (_nonGreedyState.PostponedMessages.Count > 0 && boundedCapacityAvailable > 0)))
                            return true;

                        if (_dataflowBlockOptions.Greedy)
                        {
                            // We are in greedy mode and we have postponed messages.
                            // (In greedy mode we only postpone due to lack of bounding capacity.)
                            // And now we have capacity to consume some postponed messages.
                            // (In greedy mode we can/should consume as many postponed messages as we can even
                            // if those messages are insufficient to make up a batch.)
                            if (_nonGreedyState.PostponedMessages.Count > 0 && boundedCapacityAvailable > 0) return true;
                        }
                        else
                        {
                            // We are in non-greedy mode and we have enough postponed messages and bounding capacity to make a full batch
                            if (_nonGreedyState.PostponedMessages.Count >= neededMessageCountToCompleteBatch &&
                                boundedCapacityAvailable >= neededMessageCountToCompleteBatch)
                                return true;
                        }
                    }

                    // There is no other reason to kick off a processing task
                    return false;
                }
            }

            /// <summary>Called when new messages are available to be processed.</summary>
            /// <param name="isReplacementReplica">Whether this call is the continuation of a previous message loop.</param>
            private void ProcessAsyncIfNecessary(bool isReplacementReplica = false)
            {
                Contract.Requires(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
                Common.ContractAssertMonitorStatus(IncomingLock, held: true);

                if (BatchesNeedProcessing)
                {
                    ProcessAsyncIfNecessary_Slow(isReplacementReplica);
                }
            }

            /// <summary>
            /// Slow path for ProcessAsyncIfNecessary.
            /// Separating out the slow path into its own method makes it more likely that the fast path method will get inlined.
            /// </summary>
            private void ProcessAsyncIfNecessary_Slow(bool isReplacementReplica)
            {
                Contract.Requires(BatchesNeedProcessing, "There must be a batch that needs processing.");

                // Create task and store into _taskForInputProcessing prior to scheduling the task
                // so that _taskForInputProcessing will be visibly set in the task loop.
                _nonGreedyState.TaskForInputProcessing = new Task(thisBatchTarget => ((BatchBlockTargetCore)thisBatchTarget).ProcessMessagesLoopCore(), this,
                    Common.GetCreationOptionsForTask(isReplacementReplica));

#if FEATURE_TRACING
                DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
                if (etwLog.IsEnabled())
                {
                    etwLog.TaskLaunchedForMessageHandling(
                        _owningBatch, _nonGreedyState.TaskForInputProcessing, DataflowEtwProvider.TaskLaunchedReason.ProcessingInputMessages,
                        _messages.Count + (_nonGreedyState != null ? _nonGreedyState.PostponedMessages.Count : 0));
                }
#endif

                // Start the task handling scheduling exceptions
                Exception exception = Common.StartTaskSafe(_nonGreedyState.TaskForInputProcessing, _dataflowBlockOptions.TaskScheduler);
                if (exception != null)
                {
                    // Get out from under currently held locks. Complete re-acquires the locks it needs.
                    Task.Factory.StartNew(exc => Complete(exception: (Exception)exc, dropPendingMessages: true, releaseReservedMessages: true, revertProcessingState: true),
                        exception, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
                }
            }
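            // The loop below honors MaxMessagesPerTask: each processing task iterates at most that
            // many times before yielding the thread and, if work remains, scheduling a replacement
            // task. A hypothetical configuration trading throughput for fairness might be:
            //
            //   new GroupingDataflowBlockOptions { Greedy = false, MaxMessagesPerTask = 1 }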
            /// <summary>Task body used to process messages.</summary>
            [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
            private void ProcessMessagesLoopCore()
            {
                Contract.Requires(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
                Common.ContractAssertMonitorStatus(IncomingLock, held: false);
                try
                {
                    int maxMessagesPerTask = _dataflowBlockOptions.ActualMaxMessagesPerTask;
                    int timesThroughLoop = 0;
                    bool madeProgress;
                    do
                    {
                        // Determine whether a batch has been forced/triggered.
                        // (If the value is read as false and is set to true immediately afterwards,
                        // we'll simply force the next time around. The only code that can
                        // set the value to false is this function, after reading a true value.)
                        bool triggered = Volatile.Read(ref _nonGreedyState.AcceptFewerThanBatchSize);

                        // Retrieve postponed items:
                        // In non-greedy mode: Reserve + Consume
                        // In greedy bounded mode: Consume (without a prior reservation)
                        if (!_dataflowBlockOptions.Greedy) RetrievePostponedItemsNonGreedy(allowFewerThanBatchSize: triggered);
                        else RetrievePostponedItemsGreedyBounded(allowFewerThanBatchSize: triggered);

                        // Try to make a batch if there are enough buffered messages
                        lock (IncomingLock)
                        {
                            madeProgress = MakeBatchIfPossible(evenIfFewerThanBatchSize: triggered);

                            // Reset the trigger flag if:
                            // - We made a batch, regardless of whether it came due to a trigger or not.
                            // - We tried to make a batch due to a trigger, but were unable to, which
                            //   could happen if we're unable to consume any of the postponed messages.
                            if (madeProgress || triggered) _nonGreedyState.AcceptFewerThanBatchSize = false;
                        }

                        timesThroughLoop++;
                    } while (madeProgress && timesThroughLoop < maxMessagesPerTask);
                }
                catch (Exception exc)
                {
                    Complete(exc, dropPendingMessages: false, releaseReservedMessages: true);
                }
                finally
                {
                    lock (IncomingLock)
                    {
                        // We're no longer processing, so null out the processing task
                        _nonGreedyState.TaskForInputProcessing = null;

                        // However, we may have given up early because we hit our own configured
                        // processing limits rather than because we ran out of work to do. If that's
                        // the case, make sure we spin up another task to keep going.
                        ProcessAsyncIfNecessary(isReplacementReplica: true);

                        // If, however, we stopped because we ran out of work to do and we
                        // know we'll never get more, then complete.
                        CompleteBlockIfPossible();
                    }
                }
            }
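            // MaxNumberOfGroups interacts with the helper below: once the configured number of
            // batches has been produced, the target declines further input. A minimal sketch
            // (illustrative values):
            //
            //   var options = new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 };
            //   var batch = new BatchBlock<int>(2, options);
            //   batch.Post(1);                 // accepted
            //   batch.Post(2);                 // accepted; fills the one permitted batch
            //   bool accepted = batch.Post(3); // false: the block is now declining permanently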
            /// <summary>Create a batch from the available items.</summary>
            /// <param name="evenIfFewerThanBatchSize">
            /// Whether to make a batch even if there are fewer than BatchSize items available.
            /// </param>
            /// <returns>true if a batch was created and published; otherwise, false.</returns>
            private bool MakeBatchIfPossible(bool evenIfFewerThanBatchSize)
            {
                Common.ContractAssertMonitorStatus(IncomingLock, held: true);

                // Is a full batch available?
                bool fullBatch = _messages.Count >= _batchSize;

                // If so, or if it's ok to make a batch with fewer than batchSize, make one.
                if (fullBatch || (evenIfFewerThanBatchSize && _messages.Count > 0))
                {
                    var newBatch = new T[fullBatch ? _batchSize : _messages.Count];
                    for (int i = 0; i < newBatch.Length; i++) newBatch[i] = _messages.Dequeue();
                    _batchCompletedAction(newBatch);
                    _batchesCompleted++;
                    if (_batchesCompleted >= _dataflowBlockOptions.ActualMaxNumberOfGroups) _decliningPermanently = true;
                    return true;
                }
                // No batch could be created
                else return false;
            }

            /// <summary>Retrieves postponed items in non-greedy mode if we have enough to make a batch.</summary>
            /// <param name="allowFewerThanBatchSize">Whether we'll accept consuming fewer elements than the defined batch size.</param>
            private void RetrievePostponedItemsNonGreedy(bool allowFewerThanBatchSize)
            {
                Contract.Requires(!_dataflowBlockOptions.Greedy, "This method may only be used in non-greedy mode.");
                Contract.Requires(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
                Common.ContractAssertMonitorStatus(IncomingLock, held: false);

                // Shortcuts just to keep the code cleaner
                QueuedMap<ISourceBlock<T>, DataflowMessageHeader> postponed = _nonGreedyState.PostponedMessages;
                KeyValuePair<ISourceBlock<T>, DataflowMessageHeader>[] postponedTemp = _nonGreedyState.PostponedMessagesTemp;
                List<KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>> reserved = _nonGreedyState.ReservedSourcesTemp;

                // Clear the temporary buffer. This is safe to do without a lock because
                // it is only accessed by the serial message loop.
                reserved.Clear();

                int poppedInitially;
                int boundedCapacityAvailable;
                lock (IncomingLock)
                {
                    // The queue must be empty between batches in non-greedy mode
                    Debug.Assert(_messages.Count == 0, "The queue must be empty between batches in non-greedy mode");

                    // If there are not enough postponed items (or if we're not allowing consumption), there's nothing more to be done
                    boundedCapacityAvailable = BoundedCapacityAvailable;
                    if (_decliningPermanently ||
                        postponed.Count == 0 ||
                        boundedCapacityAvailable <= 0 ||
                        (!allowFewerThanBatchSize && (postponed.Count < _batchSize || boundedCapacityAvailable < _batchSize)))
                        return;

                    // Grab an initial batch of postponed messages.
                    poppedInitially = postponed.PopRange(postponedTemp, 0, _batchSize);
                    Debug.Assert(allowFewerThanBatchSize ? poppedInitially > 0 : poppedInitially == _batchSize,
                        "We received fewer than we expected based on the previous check.");
                } // Release the lock. We must not hold it while calling Reserve/Consume/Release.

                // Try to reserve the initial batch of messages.
                for (int i = 0; i < poppedInitially; i++)
                {
                    KeyValuePair<ISourceBlock<T>, DataflowMessageHeader> sourceAndMessage = postponedTemp[i];
                    if (sourceAndMessage.Key.ReserveMessage(sourceAndMessage.Value, _owningBatch))
                    {
                        var reservedMessage = new KeyValuePair<DataflowMessageHeader, T>(sourceAndMessage.Value, default(T));
                        var reservedSourceAndMessage = new KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>(sourceAndMessage.Key, reservedMessage);
                        reserved.Add(reservedSourceAndMessage);
                    }
                }
                Array.Clear(postponedTemp, 0, postponedTemp.Length); // clear out the temp array so as not to hold onto messages too long

                // If we didn't reserve enough to make a batch, start picking off postponed messages
                // one by one until we either have enough reserved or we run out of messages
                while (reserved.Count < _batchSize)
                {
                    KeyValuePair<ISourceBlock<T>, DataflowMessageHeader> sourceAndMessage;
                    lock (IncomingLock)
                    {
                        if (!postponed.TryPop(out sourceAndMessage)) break;
                    } // Release the lock. We must not hold it while calling Reserve/Consume/Release.
                    if (sourceAndMessage.Key.ReserveMessage(sourceAndMessage.Value, _owningBatch))
                    {
                        var reservedMessage = new KeyValuePair<DataflowMessageHeader, T>(sourceAndMessage.Value, default(T));
                        var reservedSourceAndMessage = new KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>(sourceAndMessage.Key, reservedMessage);
                        reserved.Add(reservedSourceAndMessage);
                    }
                }

                Debug.Assert(reserved.Count <= _batchSize, "Expected the number of reserved sources to be <= the number needed for a batch.");

                // We've now reserved what we can. Either consume them all or release them all.
                if (reserved.Count > 0)
                {
                    // TriggerBatch adds a complication here. It's possible that while we've been reserving
                    // messages, Post has been used to queue up a bunch of messages to the batch,
                    // and that if the batch has a max group count and enough messages were posted,
                    // we could now be declining. In that case, if we don't specially handle the situation,
                    // we could consume messages that we won't be able to turn into a batch, since MaxNumberOfGroups
                    // implies the block will only ever output a maximum number of batches. To handle this,
                    // we start declining before consuming, now that we know we'll have enough to form a batch.
                    // (If an exception occurs after we do this, we'll be shutting down the block anyway.)
                    // This is also why we still reserve/consume rather than just consume in forced mode,
                    // so that we only consume if we're able to turn what we consume into a batch.
                    bool shouldProceedToConsume = true;
                    if (allowFewerThanBatchSize)
                    {
                        lock (IncomingLock)
                        {
                            if (!_decliningPermanently &&
                                (_batchesCompleted + 1) >= _dataflowBlockOptions.ActualMaxNumberOfGroups)
                            // Note that this logic differs from the other location where we do a similar check.
                            // Here we want to know whether we're one shy of meeting our quota, because we'll accept
                            // any size batch. Elsewhere, we need to know whether we have the right number of messages
                            // queued up.
                            {
                                shouldProceedToConsume = !_decliningPermanently;
                                _decliningPermanently = true;
                            }
                        }
                    }

                    if (shouldProceedToConsume && (allowFewerThanBatchSize || reserved.Count == _batchSize))
                    {
                        ConsumeReservedMessagesNonGreedy();
                    }
                    else
                    {
                        ReleaseReservedMessages(throwOnFirstException: true);
                    }
                }

                // Clear out the reserved list, so as not to hold onto values longer than necessary.
                // We don't do this in case of failure, because the higher-level exception handler
                // accesses the list to try to release reservations.
                reserved.Clear();
            }

            /// <summary>Retrieves postponed items in greedy bounded mode.</summary>
            /// <param name="allowFewerThanBatchSize">Whether we'll accept consuming fewer elements than the defined batch size.</param>
            private void RetrievePostponedItemsGreedyBounded(bool allowFewerThanBatchSize)
            {
                Contract.Requires(_dataflowBlockOptions.Greedy, "This method may only be used in greedy mode.");
                Contract.Requires(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
                Contract.Requires(_boundingState != null, "Bounding state is required when in bounded mode.");
                Common.ContractAssertMonitorStatus(IncomingLock, held: false);

                // Shortcuts just to keep the code cleaner
                QueuedMap<ISourceBlock<T>, DataflowMessageHeader> postponed = _nonGreedyState.PostponedMessages;
                KeyValuePair<ISourceBlock<T>, DataflowMessageHeader>[] postponedTemp = _nonGreedyState.PostponedMessagesTemp;
                List<KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>> reserved = _nonGreedyState.ReservedSourcesTemp;

                // Clear the temporary buffer. This is safe to do without a lock because
                // it is only accessed by the serial message loop.
                reserved.Clear();

                int poppedInitially;
                int boundedCapacityAvailable;
                int itemCountNeededToCompleteBatch;
                lock (IncomingLock)
                {
                    // If there are not enough postponed items (or if we're not allowing consumption), there's nothing more to be done
                    boundedCapacityAvailable = BoundedCapacityAvailable;
                    itemCountNeededToCompleteBatch = _batchSize - _messages.Count;
                    if (_decliningPermanently ||
                        postponed.Count == 0 ||
                        boundedCapacityAvailable <= 0)
                        return;

                    // Grab an initial batch of postponed messages.
                    if (boundedCapacityAvailable < itemCountNeededToCompleteBatch) itemCountNeededToCompleteBatch = boundedCapacityAvailable;
                    poppedInitially = postponed.PopRange(postponedTemp, 0, itemCountNeededToCompleteBatch);
                    Debug.Assert(poppedInitially > 0, "We received fewer than we expected based on the previous check.");
                } // Release the lock. We must not hold it while calling Reserve/Consume/Release.

                // Treat popped messages as reserved.
                // We don't have to formally reserve because we are in greedy mode.
                for (int i = 0; i < poppedInitially; i++)
                {
                    KeyValuePair<ISourceBlock<T>, DataflowMessageHeader> sourceAndMessage = postponedTemp[i];
                    var reservedMessage = new KeyValuePair<DataflowMessageHeader, T>(sourceAndMessage.Value, default(T));
                    var reservedSourceAndMessage = new KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>(sourceAndMessage.Key, reservedMessage);
                    reserved.Add(reservedSourceAndMessage);
                }
                Array.Clear(postponedTemp, 0, postponedTemp.Length); // clear out the temp array so as not to hold onto messages too long

                // If we didn't reserve enough to make a batch, start picking off postponed messages
                // one by one until we either have enough reserved or we run out of messages
                while (reserved.Count < itemCountNeededToCompleteBatch)
                {
                    KeyValuePair<ISourceBlock<T>, DataflowMessageHeader> sourceAndMessage;
                    lock (IncomingLock)
                    {
                        if (!postponed.TryPop(out sourceAndMessage)) break;
                    } // Release the lock. We must not hold it while calling Reserve/Consume/Release.

                    var reservedMessage = new KeyValuePair<DataflowMessageHeader, T>(sourceAndMessage.Value, default(T));
                    var reservedSourceAndMessage = new KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>(sourceAndMessage.Key, reservedMessage);
                    reserved.Add(reservedSourceAndMessage);
                }

                Debug.Assert(reserved.Count <= itemCountNeededToCompleteBatch, "Expected the number of reserved sources to be <= the number needed for a batch.");

                // We've gotten as many postponed messages as we can. Try to consume them.
                if (reserved.Count > 0)
                {
                    // TriggerBatch adds a complication here. It's possible that while we've been reserving
                    // messages, Post has been used to queue up a bunch of messages to the batch,
                    // and that if the batch has a max group count and enough messages were posted,
                    // we could now be declining. In that case, if we don't specially handle the situation,
                    // we could consume messages that we won't be able to turn into a batch, since MaxNumberOfGroups
                    // implies the block will only ever output a maximum number of batches. To handle this,
                    // we start declining before consuming, now that we know we'll have enough to form a batch.
                    // (If an exception occurs after we do this, we'll be shutting down the block anyway.)
                    // This is also why we still reserve/consume rather than just consume in forced mode,
                    // so that we only consume if we're able to turn what we consume into a batch.
                    bool shouldProceedToConsume = true;
                    if (allowFewerThanBatchSize)
                    {
                        lock (IncomingLock)
                        {
                            if (!_decliningPermanently &&
                                (_batchesCompleted + 1) >= _dataflowBlockOptions.ActualMaxNumberOfGroups)
                            // Note that this logic differs from the other location where we do a similar check.
                            // Here we want to know whether we're one shy of meeting our quota, because we'll accept
                            // any size batch. Elsewhere, we need to know whether we have the right number of messages
                            // queued up.
                            {
                                shouldProceedToConsume = !_decliningPermanently;
                                _decliningPermanently = true;
                            }
                        }
                    }

                    if (shouldProceedToConsume)
                    {
                        ConsumeReservedMessagesGreedyBounded();
                    }
                }

                // Clear out the reserved list, so as not to hold onto values longer than necessary.
                // We don't do this in case of failure, because the higher-level exception handler
                // accesses the list to try to release reservations.
                reserved.Clear();
            }

            /// <summary>
            /// Consumes all of the reserved messages stored in the non-greedy state's temporary reserved source list.
            /// </summary>
            private void ConsumeReservedMessagesNonGreedy()
            {
                Contract.Requires(!_dataflowBlockOptions.Greedy, "This method may only be used in non-greedy mode.");
                Contract.Requires(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
                Contract.Requires(_nonGreedyState.ReservedSourcesTemp != null, "ReservedSourcesTemp should have been initialized.");
                Common.ContractAssertMonitorStatus(IncomingLock, held: false);

                // Consume the reserved items and store the data.
                List<KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>> reserved = _nonGreedyState.ReservedSourcesTemp;
                for (int i = 0; i < reserved.Count; i++)
                {
                    // We can only store the data into _messages while holding the IncomingLock, we
                    // don't want to allocate extra objects for each batch, and we don't want to
                    // take and release the lock for each individual item... but we do need to use
                    // the consumed message rather than the initial one. To handle this, because KeyValuePair is immutable,
                    // we store a new KVP with the newly consumed message back into the temp list, so that we can
                    // then enumerate the temp list en masse while taking the lock once afterwards.
                    KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>> sourceAndMessage = reserved[i];
                    reserved[i] = default(KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>); // in case of exception from ConsumeMessage
                    bool consumed;
                    T consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value.Key, _owningBatch, out consumed);
                    if (!consumed)
                    {
                        // The protocol broke down, so throw an exception, as this is fatal. Before doing so, though,
                        // null out all of the messages we've already consumed, as a higher-level event handler
                        // should try to release everything in the reserved list.
                        for (int prev = 0; prev < i; prev++) reserved[prev] = default(KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>);
                        throw new InvalidOperationException(SR.InvalidOperation_FailedToConsumeReservedMessage);
                    }

                    var consumedMessage = new KeyValuePair<DataflowMessageHeader, T>(sourceAndMessage.Value.Key, consumedValue);
                    var consumedSourceAndMessage = new KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>(sourceAndMessage.Key, consumedMessage);
                    reserved[i] = consumedSourceAndMessage;
                }
                lock (IncomingLock)
                {
                    // Increment the bounding count with the number of consumed messages
                    if (_boundingState != null) _boundingState.CurrentCount += reserved.Count;

                    // Enqueue the consumed messages
                    foreach (KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>> sourceAndMessage in reserved)
                    {
                        _messages.Enqueue(sourceAndMessage.Value.Value);
                    }
                }
            }

            /// <summary>
            /// Consumes all of the reserved messages stored in the non-greedy state's temporary reserved source list.
            /// </summary>
            private void ConsumeReservedMessagesGreedyBounded()
            {
                Contract.Requires(_dataflowBlockOptions.Greedy, "This method may only be used in greedy mode.");
                Contract.Requires(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
                Contract.Requires(_nonGreedyState.ReservedSourcesTemp != null, "ReservedSourcesTemp should have been initialized.");
                Contract.Requires(_boundingState != null, "Bounded state is required for bounded mode.");
                Common.ContractAssertMonitorStatus(IncomingLock, held: false);

                // Consume the reserved items and store the data.
                int consumedCount = 0;
                List<KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>> reserved = _nonGreedyState.ReservedSourcesTemp;
                for (int i = 0; i < reserved.Count; i++)
                {
                    // We can only store the data into _messages while holding the IncomingLock, we
                    // don't want to allocate extra objects for each batch, and we don't want to
                    // take and release the lock for each individual item... but we do need to use
                    // the consumed message rather than the initial one. To handle this, because KeyValuePair is immutable,
                    // we store a new KVP with the newly consumed message back into the temp list, so that we can
                    // then enumerate the temp list en masse while taking the lock once afterwards.
                    KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>> sourceAndMessage = reserved[i];
                    reserved[i] = default(KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>); // in case of exception from ConsumeMessage
                    bool consumed;
                    T consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value.Key, _owningBatch, out consumed);
                    if (consumed)
                    {
                        var consumedMessage = new KeyValuePair<DataflowMessageHeader, T>(sourceAndMessage.Value.Key, consumedValue);
                        var consumedSourceAndMessage = new KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>(sourceAndMessage.Key, consumedMessage);
                        reserved[i] = consumedSourceAndMessage;

                        // Keep track of the actually consumed messages
                        consumedCount++;
                    }
                }
                lock (IncomingLock)
                {
                    // Increment the bounding count with the number of consumed messages
                    if (_boundingState != null) _boundingState.CurrentCount += consumedCount;

                    // Enqueue the consumed messages
                    foreach (KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>> sourceAndMessage in reserved)
                    {
                        // If we didn't consume this message, the KeyValuePair will be default, i.e. the source will be null
                        if (sourceAndMessage.Key != null) _messages.Enqueue(sourceAndMessage.Value.Value);
                    }
                }
            }

            /// <summary>
            /// Releases all of the reserved messages stored in the non-greedy state's temporary reserved source list.
            /// </summary>
            /// <param name="throwOnFirstException">
            /// Whether to allow an exception from a release to propagate immediately,
            /// or to delay propagation until all releases have been attempted.
            /// </param>
            [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
            internal void ReleaseReservedMessages(bool throwOnFirstException)
            {
                Common.ContractAssertMonitorStatus(IncomingLock, held: false);
                Debug.Assert(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
                Debug.Assert(_nonGreedyState.ReservedSourcesTemp != null, "Should have been initialized");

                List<Exception> exceptions = null;

                List<KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>> reserved = _nonGreedyState.ReservedSourcesTemp;
                for (int i = 0; i < reserved.Count; i++)
                {
                    KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>> sourceAndMessage = reserved[i];
                    reserved[i] = default(KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>);
                    ISourceBlock<T> source = sourceAndMessage.Key;
                    KeyValuePair<DataflowMessageHeader, T> message = sourceAndMessage.Value;
                    if (source != null && message.Key.IsValid)
                    {
                        try { source.ReleaseReservation(message.Key, _owningBatch); }
                        catch (Exception e)
                        {
                            if (throwOnFirstException) throw;
                            if (exceptions == null) exceptions = new List<Exception>(1);
                            exceptions.Add(e);
                        }
                    }
                }

                if (exceptions != null) throw new AggregateException(exceptions);
            }

            /// <summary>Notifies the block that one or more items were removed from the queue.</summary>
            /// <param name="numItemsRemoved">The number of items removed.</param>
            internal void OnItemsRemoved(int numItemsRemoved)
            {
                Contract.Requires(numItemsRemoved > 0, "Should only be called for a positive number of items removed.");
                Common.ContractAssertMonitorStatus(IncomingLock, held: false);

                // If we're bounding, we need to know when an item is removed so that we
                // can update the count that's mirroring the actual count in the source's queue,
                // and potentially kick off processing to start consuming postponed messages.
                if (_boundingState != null)
                {
                    lock (IncomingLock)
                    {
                        // Decrement the count, which mirrors the count in the source half
                        Debug.Assert(_boundingState.CurrentCount - numItemsRemoved >= 0,
                            "It should be impossible to have a negative number of items.");
                        _boundingState.CurrentCount -= numItemsRemoved;

                        ProcessAsyncIfNecessary();
                        CompleteBlockIfPossible();
                    }
                }
            }

            /// <summary>Counts the input items in a single output item or in a list of output items.</summary>
            /// <param name="singleOutputItem">A single output item. Only considered if multipleOutputItems == null.</param>
            /// <param name="multipleOutputItems">A list of output items. May be null.</param>
            internal static int CountItems(T[] singleOutputItem, IList<T[]> multipleOutputItems)
            {
                // If multipleOutputItems == null, then singleOutputItem is the subject of counting
                if (multipleOutputItems == null) return singleOutputItem.Length;

                // multipleOutputItems != null. Count the elements in each item.
                int count = 0;
                foreach (T[] item in multipleOutputItems) count += item.Length;
                return count;
            }
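            // For example (illustrative values only):
            //   CountItems(new[] { 1, 2, 3 }, null)                      == 3
            //   CountItems(null, new[] { new[] { 1, 2 }, new[] { 3 } })  == 3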
            /// <summary>Gets the number of messages waiting to be processed.
            /// This must only be used from the debugger as it avoids taking necessary locks.</summary>
            private int InputCountForDebugger { get { return _messages.Count; } }

            /// <summary>Gets information about this helper to be used for display in a debugger.</summary>
            /// <returns>Debugging information about this target.</returns>
            internal DebuggingInformation GetDebuggingInformation() { return new DebuggingInformation(this); }

            /// <summary>Gets the object to display in the debugger display attribute.</summary>
            [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
            private object DebuggerDisplayContent
            {
                get
                {
                    var displayBatch = _owningBatch as IDebuggerDisplay;
                    return string.Format("Block=\"{0}\"",
                        displayBatch != null ? displayBatch.Content : _owningBatch);
                }
            }

            /// <summary>Provides a wrapper for commonly needed debugging information.</summary>
            internal sealed class DebuggingInformation
            {
                /// <summary>The target being viewed.</summary>
                private BatchBlockTargetCore _target;

                /// <summary>Initializes the debugging helper.</summary>
                /// <param name="target">The target being viewed.</param>
                public DebuggingInformation(BatchBlockTargetCore target) { _target = target; }

                /// <summary>Gets the messages waiting to be processed.</summary>
                public IEnumerable<T> InputQueue { get { return _target._messages.ToList(); } }
                /// <summary>Gets the task being used for input processing.</summary>
                public Task TaskForInputProcessing { get { return _target._nonGreedyState != null ? _target._nonGreedyState.TaskForInputProcessing : null; } }
                /// <summary>Gets the collection of postponed messages.</summary>
                public QueuedMap<ISourceBlock<T>, DataflowMessageHeader> PostponedMessages { get { return _target._nonGreedyState != null ? _target._nonGreedyState.PostponedMessages : null; } }
                /// <summary>Gets whether the block is declining further messages.</summary>
                public bool IsDecliningPermanently { get { return _target._decliningPermanently; } }
                /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
                public GroupingDataflowBlockOptions DataflowBlockOptions { get { return _target._dataflowBlockOptions; } }
                /// <summary>Gets the number of batches that have been completed.</summary>
                public long NumberOfBatchesCompleted { get { return _target._batchesCompleted; } }
            }
        }
    }
}