// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// SpoolingTask.cs
//
// <OWNER>Microsoft</OWNER>
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics.Contracts;

namespace System.Linq.Parallel
{
    /// <summary>
    /// A factory class to execute spooling logic.
    /// </summary>
    internal static class SpoolingTask
    {
        //-----------------------------------------------------------------------------------
        // Creates and begins execution of a new spooling task. Executes synchronously,
        // and by the time this API has returned all of the results have been produced.
        //
        // Arguments:
        //     groupState    - values for inter-task communication
        //     partitions    - the producer enumerators
        //     channels      - the producer-consumer channels
        //     taskScheduler - the task manager on which to execute
        //

        internal static void SpoolStopAndGo<TInputOutput, TIgnoreKey>(
            QueryTaskGroupState groupState, PartitionedStream<TInputOutput, TIgnoreKey> partitions,
            SynchronousChannel<TInputOutput>[] channels, TaskScheduler taskScheduler)
        {
            Contract.Requires(partitions.PartitionCount == channels.Length);
            Contract.Requires(groupState != null);

            // Ensure all tasks in this query are parented under a common root.
            Task rootTask = new Task(
                () =>
                {
                    int maxToRunInParallel = partitions.PartitionCount - 1;

                    // A stop-and-go merge uses the current thread for one task and then blocks before
                    // returning to the caller, until all results have been accumulated. We do this by
                    // running the last partition on the calling thread.
                    for (int i = 0; i < maxToRunInParallel; i++)
                    {
                        TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] asynchronously", i);

                        QueryTask asyncTask = new StopAndGoSpoolingTask<TInputOutput, TIgnoreKey>(i, groupState, partitions[i], channels[i]);
                        asyncTask.RunAsynchronously(taskScheduler);
                    }

                    TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] synchronously", maxToRunInParallel);

                    // Run one task synchronously on the current thread.
                    QueryTask syncTask = new StopAndGoSpoolingTask<TInputOutput, TIgnoreKey>(
                        maxToRunInParallel, groupState, partitions[maxToRunInParallel], channels[maxToRunInParallel]);
                    syncTask.RunSynchronously(taskScheduler);
                });

            // Begin the query on the calling thread.
            groupState.QueryBegin(rootTask);

            // We don't want to return until the task is finished.  Run it on the calling thread.
            rootTask.RunSynchronously(taskScheduler);

            // Wait for the query to complete, propagate exceptions, and so on.
            // For pipelined queries, this step happens in the async enumerator.
            groupState.QueryEnd(false);
        }

        //-----------------------------------------------------------------------------------
        // Creates and begins execution of a new spooling task. Runs asynchronously.
        //
        // Arguments:
        //     groupState    - values for inter-task communication
        //     partitions    - the producer enumerators
        //     channels      - the producer-consumer channels
        //     taskScheduler - the task manager on which to execute
        //

        internal static void SpoolPipeline<TInputOutput, TIgnoreKey>(
            QueryTaskGroupState groupState, PartitionedStream<TInputOutput, TIgnoreKey> partitions,
            AsynchronousChannel<TInputOutput>[] channels, TaskScheduler taskScheduler)
        {
            Contract.Requires(partitions.PartitionCount == channels.Length);
            Contract.Requires(groupState != null);

            // Ensure all tasks in this query are parented under a common root. Because this
            // is a pipelined query, we detach it from the parent (to avoid blocking the calling
            // thread), and run the query on a separate thread.
            Task rootTask = new Task(
                () =>
                {
                    // Create tasks that will enumerate the partitions in parallel. Because we're pipelining,
                    // we will begin running these tasks in parallel and then return.
                    for (int i = 0; i < partitions.PartitionCount; i++)
                    {
                        TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] asynchronously", i);

                        QueryTask asyncTask = new PipelineSpoolingTask<TInputOutput, TIgnoreKey>(i, groupState, partitions[i], channels[i]);
                        asyncTask.RunAsynchronously(taskScheduler);
                    }
                });

            // Begin the query on the calling thread.
            groupState.QueryBegin(rootTask);

            // And schedule it for execution.  This is done after beginning to ensure no thread tries to
            // end the query before its root task has been recorded properly.
            rootTask.Start(taskScheduler);

            // We don't call QueryEnd here; when we return, the query is still executing, and the
            // last enumerator to be disposed of will call QueryEnd for us.
        }

        //-----------------------------------------------------------------------------------
        // Creates and begins execution of a new spooling task. This is a for-all style
        // execution, meaning that the query will be run fully (for effect) before returning
        // and that there are no channels into which data will be queued.
        //
        // Arguments:
        //     groupState    - values for inter-task communication
        //     partitions    - the producer enumerators
        //     taskScheduler - the task manager on which to execute
        //

        internal static void SpoolForAll<TInputOutput, TIgnoreKey>(
            QueryTaskGroupState groupState, PartitionedStream<TInputOutput, TIgnoreKey> partitions, TaskScheduler taskScheduler)
        {
            Contract.Requires(groupState != null);

            // Ensure all tasks in this query are parented under a common root.
            Task rootTask = new Task(
                () =>
                {
                    int maxToRunInParallel = partitions.PartitionCount - 1;

                    // Create tasks that will enumerate the partitions in parallel "for effect"; in other words,
                    // no data will be placed into any kind of producer-consumer channel.
                    for (int i = 0; i < maxToRunInParallel; i++)
                    {
                        TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] asynchronously", i);

                        QueryTask asyncTask = new ForAllSpoolingTask<TInputOutput, TIgnoreKey>(i, groupState, partitions[i]);
                        asyncTask.RunAsynchronously(taskScheduler);
                    }

                    TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] synchronously", maxToRunInParallel);

                    // Run one task synchronously on the current thread.
                    QueryTask syncTask = new ForAllSpoolingTask<TInputOutput, TIgnoreKey>(maxToRunInParallel, groupState, partitions[maxToRunInParallel]);
                    syncTask.RunSynchronously(taskScheduler);
                });

            // Begin the query on the calling thread.
            groupState.QueryBegin(rootTask);

            // We don't want to return until the task is finished.  Run it on the calling thread.
            rootTask.RunSynchronously(taskScheduler);

            // Wait for the query to complete, propagate exceptions, and so on.
            // For pipelined queries, this step happens in the async enumerator.
            groupState.QueryEnd(false);
        }
    }

    /// <summary>
    /// A spooling task handles marshaling data from a producer to a consumer. It's given
    /// a single enumerator object that contains all of the production algorithms, a single
    /// destination channel from which consumers draw results, and (optionally) a
    /// synchronization primitive using which to notify asynchronous consumers.
    /// </summary>
    /// <typeparam name="TInputOutput"></typeparam>
    /// <typeparam name="TIgnoreKey"></typeparam>
    internal class StopAndGoSpoolingTask<TInputOutput, TIgnoreKey> : SpoolingTaskBase
    {
        // The data source from which to pull data.
        private QueryOperatorEnumerator<TInputOutput, TIgnoreKey> m_source;

        // The destination channel into which data is placed. This can be null if we are
        // enumerating "for effect", e.g. forall loop.
        private SynchronousChannel<TInputOutput> m_destination;

        //-----------------------------------------------------------------------------------
        // Creates, but does not execute, a new spooling task.
        //
        // Arguments:
        //     taskIndex   - the unique index of this task
        //     source      - the producer enumerator
        //     destination - the destination channel into which to spool elements
        //
        // Assumptions:
        //     Source cannot be null, although the other arguments may be.
        //

        internal StopAndGoSpoolingTask(
            int taskIndex, QueryTaskGroupState groupState,
            QueryOperatorEnumerator<TInputOutput, TIgnoreKey> source, SynchronousChannel<TInputOutput> destination)
            : base(taskIndex, groupState)
        {
            // Use Contract.Assert (not Requires) to match the sibling spooling-task
            // constructors; Requires on an internal ctor has different semantics under
            // the Code Contracts rewriter.
            Contract.Assert(source != null);
            m_source = source;
            m_destination = destination;
        }

        //-----------------------------------------------------------------------------------
        // This method is responsible for enumerating results and enqueueing them to
        // the output channel(s) as appropriate.  Each derived class implements its own.
        //

        protected override void SpoolingWork()
        {
            // We just enumerate over the entire source data stream, placing each element
            // into the destination channel.
            TInputOutput current = default(TInputOutput);
            TIgnoreKey keyUnused = default(TIgnoreKey);

            QueryOperatorEnumerator<TInputOutput, TIgnoreKey> source = m_source;
            SynchronousChannel<TInputOutput> destination = m_destination;
            CancellationToken cancelToken = m_groupState.CancellationState.MergedCancellationToken;

            destination.Init();
            while (source.MoveNext(ref current, ref keyUnused))
            {
                // If an abort has been requested, stop this worker immediately.
                if (cancelToken.IsCancellationRequested)
                {
                    break;
                }

                destination.Enqueue(current);
            }
        }

        //-----------------------------------------------------------------------------------
        // Ensure we signal that the channel is complete.
        //

        protected override void SpoolingFinally()
        {
            // Call the base implementation.
            base.SpoolingFinally();

            // Signal that we are done, in the case of asynchronous consumption.
            if (m_destination != null)
            {
                m_destination.SetDone();
            }

            // Dispose of the source enumerator *after* signaling that the task is done.
            // We call Dispose() last to ensure that if it throws an exception, we will not cause a deadlock.
            m_source.Dispose();
        }
    }

    /// <summary>
    /// A spooling task handles marshaling data from a producer to a consumer. It's given
    /// a single enumerator object that contains all of the production algorithms, a single
    /// destination channel from which consumers draw results, and (optionally) a
    /// synchronization primitive using which to notify asynchronous consumers.
    /// </summary>
    /// <typeparam name="TInputOutput"></typeparam>
    /// <typeparam name="TIgnoreKey"></typeparam>
    internal class PipelineSpoolingTask<TInputOutput, TIgnoreKey> : SpoolingTaskBase
    {
        // The data source from which to pull data.
        private QueryOperatorEnumerator<TInputOutput, TIgnoreKey> m_source;

        // The destination channel into which data is placed. This can be null if we are
        // enumerating "for effect", e.g. forall loop.
        private AsynchronousChannel<TInputOutput> m_destination;

        //-----------------------------------------------------------------------------------
        // Creates, but does not execute, a new spooling task.
        //
        // Arguments:
        //     taskIndex   - the unique index of this task
        //     source      - the producer enumerator
        //     destination - the destination channel into which to spool elements
        //
        // Assumptions:
        //     Source cannot be null, although the other arguments may be.
        //

        internal PipelineSpoolingTask(
            int taskIndex, QueryTaskGroupState groupState,
            QueryOperatorEnumerator<TInputOutput, TIgnoreKey> source, AsynchronousChannel<TInputOutput> destination)
            : base(taskIndex, groupState)
        {
            Contract.Assert(source != null);
            m_source = source;
            m_destination = destination;
        }

        //-----------------------------------------------------------------------------------
        // This method is responsible for enumerating results and enqueueing them to
        // the output channel(s) as appropriate.  Each derived class implements its own.
        //

        protected override void SpoolingWork()
        {
            // We just enumerate over the entire source data stream, placing each element
            // into the destination channel.
            TInputOutput current = default(TInputOutput);
            TIgnoreKey keyUnused = default(TIgnoreKey);

            QueryOperatorEnumerator<TInputOutput, TIgnoreKey> source = m_source;
            AsynchronousChannel<TInputOutput> destination = m_destination;
            CancellationToken cancelToken = m_groupState.CancellationState.MergedCancellationToken;

            while (source.MoveNext(ref current, ref keyUnused))
            {
                // If an abort has been requested, stop this worker immediately.
                if (cancelToken.IsCancellationRequested)
                {
                    break;
                }

                destination.Enqueue(current);
            }

            // Flush remaining data to the query consumer in preparation for channel shutdown.
            destination.FlushBuffers();
        }

        //-----------------------------------------------------------------------------------
        // Ensure we signal that the channel is complete.
        //

        protected override void SpoolingFinally()
        {
            // Call the base implementation.
            base.SpoolingFinally();

            // Signal that we are done, in the case of asynchronous consumption.
            if (m_destination != null)
            {
                m_destination.SetDone();
            }

            // Dispose of the source enumerator *after* signaling that the task is done.
            // We call Dispose() last to ensure that if it throws an exception, we will not cause a deadlock.
            m_source.Dispose();
        }
    }

    /// <summary>
    /// A spooling task handles marshaling data from a producer to a consumer. It's given
    /// a single enumerator object that contains all of the production algorithms; this
    /// for-all variant enumerates purely for effect, so it has no destination channel.
    /// </summary>
    /// <typeparam name="TInputOutput"></typeparam>
    /// <typeparam name="TIgnoreKey"></typeparam>
    internal class ForAllSpoolingTask<TInputOutput, TIgnoreKey> : SpoolingTaskBase
    {
        // The data source from which to pull data.
        private QueryOperatorEnumerator<TInputOutput, TIgnoreKey> m_source;

        //-----------------------------------------------------------------------------------
        // Creates, but does not execute, a new spooling task.
        //
        // Arguments:
        //     taskIndex - the unique index of this task
        //     source    - the producer enumerator
        //
        // Assumptions:
        //     Source cannot be null, although the other arguments may be.
        //

        internal ForAllSpoolingTask(
            int taskIndex, QueryTaskGroupState groupState,
            QueryOperatorEnumerator<TInputOutput, TIgnoreKey> source)
            : base(taskIndex, groupState)
        {
            Contract.Assert(source != null);
            m_source = source;
        }

        //-----------------------------------------------------------------------------------
        // This method is responsible for enumerating results and enqueueing them to
        // the output channel(s) as appropriate.  Each derived class implements its own.
        //

        protected override void SpoolingWork()
        {
            // We just enumerate over the entire source data stream for effect.
            TInputOutput currentUnused = default(TInputOutput);
            TIgnoreKey keyUnused = default(TIgnoreKey);

            // Note: this only ever runs with a ForAll operator, and ForAllEnumerator performs cancellation checks.
            while (m_source.MoveNext(ref currentUnused, ref keyUnused))
                ;
        }

        //-----------------------------------------------------------------------------------
        // Ensure we release the source enumerator once the task is complete.
        //

        protected override void SpoolingFinally()
        {
            // Call the base implementation.
            base.SpoolingFinally();

            // Dispose of the source enumerator.
            m_source.Dispose();
        }
    }
}