1 /**
2 $(D std._parallelism) implements high-level primitives for SMP _parallelism.
3 These include parallel foreach, parallel reduce, parallel eager map, pipelining
4 and future/promise _parallelism.  $(D std._parallelism) is recommended when the
5 same operation is to be executed in parallel on different data, or when a
6 function is to be executed in a background thread and its result returned to a
7 well-defined main thread.  For communication between arbitrary threads, see
8 $(D std.concurrency).
9 
10 $(D std._parallelism) is based on the concept of a $(D Task).  A $(D Task) is an
11 object that represents the fundamental unit of work in this library and may be
12 executed in parallel with any other $(D Task).  Using $(D Task)
13 directly allows programming with a future/promise paradigm.  All other
14 supported _parallelism paradigms (parallel foreach, map, reduce, pipelining)
15 represent an additional level of abstraction over $(D Task).  They
16 automatically create one or more $(D Task) objects, or closely related types
17 that are conceptually identical but not part of the public API.
18 
19 After creation, a $(D Task) may be executed in a new thread, or submitted
20 to a $(D TaskPool) for execution.  A $(D TaskPool) encapsulates a task queue
21 and its worker threads.  Its purpose is to efficiently map a large
22 number of $(D Task)s onto a smaller number of threads.  A task queue is a
23 FIFO queue of $(D Task) objects that have been submitted to the
24 $(D TaskPool) and are awaiting execution.  A worker thread is a thread that
25 is associated with exactly one task queue.  It executes the $(D Task) at the
26 front of its queue when the queue has work available, or sleeps when
27 no work is available.  Each task queue is associated with zero or
28 more worker threads.  If the result of a $(D Task) is needed before execution
29 by a worker thread has begun, the $(D Task) can be removed from the task queue
30 and executed immediately in the thread where the result is needed.
31 
32 Warning:  Unless marked as $(D @trusted) or $(D @safe), artifacts in
33           this module allow implicit data sharing between threads and cannot
34           guarantee that client code is free from low level data races.
35 
36 Source:    $(PHOBOSSRC std/_parallelism.d)
37 Author:  David Simcha
38 Copyright:  Copyright (c) 2009-2011, David Simcha.
39 License:    $(HTTP boost.org/LICENSE_1_0.txt, Boost License 1.0)
40 */
41 module std.parallelism;
42 
43 ///
@system unittest
{
    import std.algorithm.iteration : map;
    import std.math : approxEqual;
    import std.parallelism : taskPool;
    import std.range : iota;

    // Combining a parallel reduce with std.algorithm.iteration.map:
    // the example below (thanks to Russel Winder) approximates pi by
    // quadrature.  Each term is produced lazily by map and is evaluated
    // in parallel as TaskPool.reduce consumes it.
    //
    // Timings on an Intel i5-3450 quad core machine
    // for n = 1_000_000_000:
    //
    // TaskPool.reduce:       1.067 s
    // std.algorithm.reduce:  4.011 s

    enum n = 1_000_000;
    enum delta = 1.0 / n;

    // Area of the i-th rectangle under 1 / (1 + x^2), sampled at its midpoint.
    alias getTerm = (int i)
    {
        immutable midpoint = ( i - 0.5 ) * delta;
        return delta / ( 1.0 + midpoint * midpoint ) ;
    };

    auto terms = iota(n).map!getTerm;
    immutable quarterPi = taskPool.reduce!"a + b"(terms);
    immutable pi = 4.0 * quarterPi;

    assert(pi.approxEqual(3.1415926));
}
78 
79 import core.atomic;
80 import core.memory;
81 import core.sync.condition;
82 import core.thread;
83 
84 import std.functional;
85 import std.meta;
86 import std.range.primitives;
87 import std.traits;
88 
// Platforms on which the CPU count is queried through sysctlbyname().
// The `useSysctlbyname` version identifier set here selects the matching
// `shared static this()` further down in this module.
version (OSX)
{
    version = useSysctlbyname;
}
else version (FreeBSD)
{
    version = useSysctlbyname;
}
else version (DragonFlyBSD)
{
    version = useSysctlbyname;
}
else version (NetBSD)
{
    version = useSysctlbyname;
}
105 
106 
// Platform-specific module constructors that initialize `totalCPUs`.
// Every branch guarantees a value of at least 1, so a failing OS query can
// never yield a pool sized from 0 or from a wrapped-around error code.
version (Windows)
{
    // BUGS:  Only works on Windows 2000 and above.
    shared static this()
    {
        import core.sys.windows.windows : SYSTEM_INFO, GetSystemInfo;
        import std.algorithm.comparison : max;

        SYSTEM_INFO si;
        GetSystemInfo(&si);
        totalCPUs = max(1, cast(uint) si.dwNumberOfProcessors);
    }

}
else version (linux)
{
    shared static this()
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;

        // sysconf() signals failure with -1; a blind cast(uint) would turn
        // that into uint.max, so fall back to a single CPU instead.
        immutable online = sysconf(_SC_NPROCESSORS_ONLN);
        totalCPUs = online > 0 ? cast(uint) online : 1;
    }
}
else version (Solaris)
{
    shared static this()
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;

        // Same failure convention as on Linux: -1 means "unknown".
        immutable online = sysconf(_SC_NPROCESSORS_ONLN);
        totalCPUs = online > 0 ? cast(uint) online : 1;
    }
}
else version (useSysctlbyname)
{
    extern(C) int sysctlbyname(
        const char *, void *, size_t *, void *, size_t
    );

    shared static this()
    {
        // Name of the sysctl node that holds the CPU count on each platform.
        version (OSX)
        {
            auto nameStr = "machdep.cpu.core_count\0".ptr;
        }
        else version (FreeBSD)
        {
            auto nameStr = "hw.ncpu\0".ptr;
        }
        else version (DragonFlyBSD)
        {
            auto nameStr = "hw.ncpu\0".ptr;
        }
        else version (NetBSD)
        {
            auto nameStr = "hw.ncpu\0".ptr;
        }

        uint ans;
        size_t len = uint.sizeof;

        // A non-zero return value means the query failed and `ans` was not
        // filled in; treat that (or a reported count of 0) as one CPU.
        immutable rc = sysctlbyname(nameStr, &ans, &len, null, 0);
        totalCPUs = (rc == 0 && ans > 0) ? ans : 1;
    }

}
else
{
    static assert(0, "Don't know how to get N CPUs on this OS.");
}
173 
// Largest data-cache line size reported by core.cpuid, or 0 when no cache
// level reports a usable value.  Set once by the module constructor below.
immutable size_t cacheLineSize;
shared static this()
{
    import core.cpuid : datacache;

    size_t widest = 0;
    foreach (level; datacache)
    {
        immutable candidate = level.lineSize;

        // Entries at uint.max are skipped -- presumably placeholders for
        // cache levels that are absent; confirm against core.cpuid.
        if (candidate < uint.max && candidate > widest)
        {
            widest = candidate;
        }
    }

    cacheLineSize = widest;
}
189 
190 
191 /* Atomics code.  These forward to core.atomic, but are written like this
192    for two reasons:
193 
   1.  They used to actually contain ASM code and I don't want to have to
       change to directly calling core.atomic in a zillion different places.
196 
197    2.  core.atomic has some misc. issues that make my use cases difficult
198        without wrapping it.  If I didn't wrap it, casts would be required
199        basically everywhere.
200 */
// Atomically stores `newVal` into `stuff`.  The target is viewed as shared
// only for the duration of the store so that callers need no casts.
private void atomicSetUbyte(T)(ref T stuff, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    auto target = cast(shared(T)*) &stuff;
    atomicStore(*target, newVal);
}
207 
// Atomically loads the current value of `val` and returns it as a ubyte.
private ubyte atomicReadUbyte(T)(ref T val)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    auto source = cast(shared(T)*) &val;
    return atomicLoad(*source);
}
213 
// Compare-and-swap on a ubyte-compatible value: stores `newVal` into `stuff`
// only if it currently equals `testVal`, and reports whether the swap took
// place.  Wrapping core.atomic.cas here spares the rest of the module a lot
// of casts, in particular where enums are involved.
private bool atomicCasUbyte(T)(ref T stuff, T testVal, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    auto target = cast(shared(T)*) &stuff;
    return core.atomic.cas(target, testVal, newVal);
}
221 
222 /*--------------------- Generic helper functions, etc.------------------------*/
// Type produced by applying `functions` (adjoined when there are several)
// to one element of the range type `R`.
private template MapType(R, functions...)
{
    static assert(functions.length);

    alias mappers = staticMap!(unaryFun, functions);

    // Never read at run time; exists only so that typeof() sees an lvalue
    // of the element type.
    ElementType!R sample = void;

    alias MapType = typeof(adjoin!mappers(sample));
}
231 
// Type of `fun(seed, element)` for a seed of type `E` and an element of the
// range type `R`.
private template ReduceType(alias fun, R, E)
{
    alias combine = binaryFun!fun;
    alias ReduceType = typeof(combine(E.init, ElementType!R.init));
}
236 
// Negation of std.traits.hasUnsharedAliasing, in a form usable as a
// predicate for allSatisfy.
private enum bool noUnsharedAliasing(T) = !hasUnsharedAliasing!T;
241 
// Decides whether a callable of type F may be run in parallel from @safe
// code through Task.executeInNewThread().  Running it on a TaskPool carries
// one further requirement; see isSafeReturn.
private template isSafeTask(F)
{
    private enum attrs = functionAttributes!F;

    // The callable itself must be memory-safe checked or vouched for.
    private enum bool isSafeOrTrusted =
        (attrs & (FunctionAttribute.safe | FunctionAttribute.trusted)) != 0;

    // Returning by reference is not allowed.
    private enum bool returnsByValue = (attrs & FunctionAttribute.ref_) == 0;

    // Either a plain function pointer, or a callable free of unshared aliasing.
    private enum bool calleeIsShareable =
        isFunctionPointer!F || !hasUnsharedAliasing!F;

    // No parameter type may carry unshared aliasing.
    private enum bool paramsAreShareable =
        allSatisfy!(noUnsharedAliasing, Parameters!F);

    enum bool isSafeTask =
        isSafeOrTrusted && returnsByValue && calleeIsShareable && paramsAreShareable;
}
253 
@safe unittest
{
    // Accepted: @safe or @trusted function pointers whose parameters have
    // no unshared aliasing (the return type is not examined here).
    static assert( isSafeTask!(void function() @safe));
    static assert( isSafeTask!(void function(uint, string) @trusted));
    static assert( isSafeTask!(uint[] function(uint, string) pure @trusted));

    // Rejected: not marked @safe/@trusted.
    static assert(!isSafeTask!(void function()));
    static assert(!isSafeTask!(void function(uint, char[])));
}
269 
// Decides whether a Task type T that already meets every other requirement
// for @safe execution may additionally be run on a TaskPool.
//
// On a TaskPool it is theoretically possible for the returned value to also
// be referenced from a worker thread's thread-local storage.  With
// executeInNewThread() the executing thread has terminated by the time the
// caller can see the result, so the question does not arise there.  Pure
// functions are fine as well because they cannot read global state.
private template isSafeReturn(T)
{
    static if (hasUnsharedAliasing!(T.ReturnType))
    {
        // Aliasing in the result is tolerated only for pure functions.
        enum bool isSafeReturn = T.isPure;
    }
    else
    {
        enum bool isSafeReturn = true;
    }
}
293 
// True for random-access ranges whose elements can be assigned to.
private enum randAssignable(R) = isRandomAccessRange!R && hasAssignableElements!R;
298 
// Life-cycle state of a task, stored in a single byte so it can be read and
// written through the atomic ubyte helpers in this module.
private enum TaskStatus : ubyte
{
    notStarted = 0, // initial state of every AbstractTask
    inProgress = 1, // presumably set while the task body is running -- confirm in TaskPool
    done       = 2  // execution has finished
}
305 
// typeof() of a literal that declares arguments of the types `T` and
// returns `fun` applied to them.
private alias AliasReturn(alias fun, T...) =
    typeof({ T params; return fun(params); });
310 
// Folds one new element into an accumulator.  With a single function this
// is just that binary function; with several, the accumulator is a tuple
// and function i updates field i using the same right-hand element.
//
// Should be private, but std.algorithm.reduce is used in the zero-thread
// case and won't work w/ private.
template reduceAdjoin(functions...)
{
    static if (functions.length == 1)
    {
        alias reduceAdjoin = binaryFun!(functions[0]);
    }
    else
    {
        T reduceAdjoin(T, U)(T lhs, U rhs)
        {
            alias funs = staticMap!(binaryFun, functions);

            // Unrolled at compile time: one update per tuple field.
            foreach (i, ref slot; lhs.expand)
            {
                slot = funs[i](slot, rhs);
            }

            return lhs;
        }
    }
}
334 
// Merges two partial reduction results.  With a single function this is
// just that binary function; with several, both operands are tuples and
// function i combines field i of each.
private template reduceFinish(functions...)
{
    static if (functions.length == 1)
    {
        alias reduceFinish = binaryFun!(functions[0]);
    }
    else
    {
        T reduceFinish(T)(T lhs, T rhs)
        {
            alias funs = staticMap!(binaryFun, functions);

            // Unrolled at compile time: one merge per tuple field.
            foreach (i, ref slot; lhs.expand)
            {
                slot = funs[i](slot, rhs.expand[i]);
            }

            return lhs;
        }
    }
}
356 
// Compile-time test for "is this type an instantiation of RoundRobinBuffer?".
// The specialized overload matches RoundRobinBuffer!(C1, C2); every other
// type falls through to the generic overload.
private enum isRoundRobin(R : RoundRobinBuffer!(C1, C2), C1, C2) = true;

private enum isRoundRobin(T) = false;
366 
// isRoundRobin must accept an instantiation of RoundRobinBuffer and reject
// an unrelated type.
@safe unittest
{
    static assert( isRoundRobin!(RoundRobinBuffer!(void delegate(char[]), bool delegate())));
    static assert(!isRoundRobin!(uint));
}
372 
// Common header shared by every task type.  Polymorphism is done C-style
// (an embedded struct plus a function pointer) rather than with classes, to
// keep direct control over memory allocation, etc.
private struct AbstractTask
{
    // Links to the neighbouring tasks.
    AbstractTask* prev;
    AbstractTask* next;

    // Executes the concrete task; receives a pointer to this header.
    void function(void*) runTask;

    // When non-null, rethrown by `done`.
    Throwable exception;

    // Holds a TaskStatus value; accessed through the atomic ubyte helpers.
    ubyte taskStatus = TaskStatus.notStarted;

    // Returns true once the status is TaskStatus.done and false before
    // that.  If the finished task has a stored exception, it is rethrown
    // instead of returning.
    bool done() @property
    {
        if (atomicReadUbyte(taskStatus) != TaskStatus.done)
        {
            return false;
        }

        if (exception)
        {
            throw exception;
        }

        return true;
    }

    // Runs the task by calling `runTask` with the address of this header.
    void job()
    {
        runTask(&this);
    }
}
406 
407 /**
408 $(D Task) represents the fundamental unit of work.  A $(D Task) may be
409 executed in parallel with any other $(D Task).  Using this struct directly
410 allows future/promise _parallelism.  In this paradigm, a function (or delegate
411 or other callable) is executed in a thread other than the one it was called
412 from.  The calling thread does not block while the function is being executed.
413 A call to $(D workForce), $(D yieldForce), or $(D spinForce) is used to
414 ensure that the $(D Task) has finished executing and to obtain the return
415 value, if any.  These functions and $(D done) also act as full memory barriers,
416 meaning that any memory writes made in the thread that executed the $(D Task)
417 are guaranteed to be visible in the calling thread after one of these functions
418 returns.
419 
420 The $(REF task, std,parallelism) and $(REF scopedTask, std,parallelism) functions can
421 be used to create an instance of this struct.  See $(D task) for usage examples.
422 
423 Function results are returned from $(D yieldForce), $(D spinForce) and
424 $(D workForce) by ref.  If $(D fun) returns by ref, the reference will point
425 to the returned reference of $(D fun).  Otherwise it will point to a
426 field in this struct.
427 
428 Copying of this struct is disabled, since it would provide no useful semantics.
429 If you want to pass this struct around, you should do so by reference or
430 pointer.
431 
432 Bugs:  Changes to $(D ref) and $(D out) arguments are not propagated to the
433        call site, only to $(D args) in this struct.
434 */
struct Task(alias fun, Args...)
{
    // Embedded task header.  Its runTask pointer is bound to `impl`, so
    // code that holds only an AbstractTask* can still execute this Task.
    AbstractTask base = {runTask : &impl};
    alias base this;

    // Address of the embedded header, as handed to the TaskPool functions.
    private @property AbstractTask* basePtr()
    {
        return &base;
    }

    // Type-erased entry point stored in base.runTask.  Recovers the
    // concrete Task from the untyped pointer, calls `fun` with the stored
    // arguments and records the result (if any) in returnVal.
    private static void impl(void* myTask)
    {
        import std.algorithm.internal : addressOf;

        Task* myCastedTask = cast(typeof(this)*) myTask;
        static if (is(ReturnType == void))
        {
            fun(myCastedTask._args);
        }
        else static if (is(typeof(addressOf(fun(myCastedTask._args)))))
        {
            // Taken when addressOf() accepts the call result -- presumably
            // the ref-return case, where returnVal is a pointer.
            myCastedTask.returnVal = addressOf(fun(myCastedTask._args));
        }
        else
        {
            myCastedTask.returnVal = fun(myCastedTask._args);
        }
    }

    // Null until the Task has been handed to a pool (see enforcePool).
    // executeInNewThread assigns a fresh single-task pool here; presumably
    // TaskPool sets it as well when the Task is submitted -- confirm there.
    private TaskPool pool;
    private bool isScoped;  // True if created with scopedTask.

    // Stored call arguments.  When `fun` is `run`, element 0 is the
    // callable itself and the user-visible arguments start at index 1.
    Args _args;

    /**
    The arguments the function was called with.  Changes to $(D out) and
    $(D ref) arguments will be visible here.
    */
    static if (__traits(isSame, fun, run))
    {
        alias args = _args[1..$];
    }
    else
    {
        alias args = _args;
    }


    // The purpose of this code is to decide whether functions whose
    // return values have unshared aliasing can be executed via
    // TaskPool from @safe code.  See isSafeReturn.
    static if (__traits(isSame, fun, run))
    {
        static if (isFunctionPointer!(_args[0]))
        {
            private enum bool isPure =
            functionAttributes!(Args[0]) & FunctionAttribute.pure_;
        }
        else
        {
            // BUG:  Should check this for delegates too, but std.traits
            //       apparently doesn't allow this.  isPure is irrelevant
            //       for delegates, at least for now since shared delegates
            //       don't work.
            private enum bool isPure = false;
        }

    }
    else
    {
        // We already know that we can't execute aliases in @safe code, so
        // just put a dummy value here.
        private enum bool isPure = false;
    }


    /**
    The return type of the function called by this $(D Task).  This can be
    $(D void).
    */
    alias ReturnType = typeof(fun(_args));

    // Storage for the result.  fixRef turns that storage back into a
    // reference so the *Force functions can all return by ref.
    static if (!is(ReturnType == void))
    {
        static if (is(typeof(&fun(_args))))
        {
            // Ref return.
            ReturnType* returnVal;

            ref ReturnType fixRef(ReturnType* val)
            {
                return *val;
            }

        }
        else
        {
            ReturnType returnVal;

            ref ReturnType fixRef(ref ReturnType val)
            {
                return val;
            }
        }
    }

    // Throws (via std.exception.enforce) if the Task has no pool yet.
    private void enforcePool()
    {
        import std.exception : enforce;
        enforce(this.pool !is null, "Job not submitted yet.");
    }

    // Only needed when there are arguments to store; with no arguments the
    // default-initialized struct is used as is.
    static if (Args.length > 0)
    {
        private this(Args args)
        {
            _args = args;
        }
    }

    // Work around DMD bug 6588, allow immutable elements.
    static if (allSatisfy!(isAssignable, Args))
    {
        // Field-by-field copy of every member, in declaration order.
        typeof(this) opAssign(typeof(this) rhs)
        {
            foreach (i, Type; typeof(this.tupleof))
            {
                this.tupleof[i] = rhs.tupleof[i];
            }
            return this;
        }
    }
    else
    {
        // At least one argument type is not assignable, so assignment of
        // the whole Task is disabled.
        @disable typeof(this) opAssign(typeof(this) rhs)
        {
            assert(0);
        }
    }

    /**
    If the $(D Task) isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any.  If it's in progress,
    busy spin until it's done, then return the return value.  If it threw
    an exception, rethrow that exception.

    This function should be used when you expect the result of the
    $(D Task) to be available on a timescale shorter than that of an OS
    context switch.
     */
    @property ref ReturnType spinForce() @trusted
    {
        enforcePool();

        // Presumably runs the task right here if no worker has taken it
        // yet -- tryDeleteExecute is defined in TaskPool.
        this.pool.tryDeleteExecute(basePtr);

        // Busy-wait on the atomically read status.
        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done) {}

        if (exception)
        {
            throw exception;
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If the $(D Task) isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any.  If it's in progress,
    wait on a condition variable.  If it threw an exception, rethrow that
    exception.

    This function should be used for expensive functions, as waiting on a
    condition variable introduces latency, but avoids wasted CPU cycles.
     */
    @property ref ReturnType yieldForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        // Fast path: already finished (done rethrows a stored exception).
        if (done)
        {
            static if (is(ReturnType == void))
            {
                return;
            }
            else
            {
                return fixRef(this.returnVal);
            }
        }

        // Slow path: hold the pool's waiter lock for the rest of the
        // function and keep waiting until the status reads done.
        pool.waiterLock();
        scope(exit) pool.waiterUnlock();

        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done)
        {
            pool.waitUntilCompletion();
        }

        if (exception)
        {
            throw exception; // nocoverage
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If this $(D Task) was not started yet, execute it in the current
    thread.  If it is finished, return its result.  If it is in progress,
    execute any other $(D Task) from the $(D TaskPool) instance that
    this $(D Task) was submitted to until this one
    is finished.  If it threw an exception, rethrow that exception.
    If no other tasks are available or this $(D Task) was executed using
    $(D executeInNewThread), wait on a condition variable.
     */
    @property ref ReturnType workForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        while (true)
        {
            if (done)    // done() implicitly checks for exceptions.
            {
                static if (is(ReturnType == void))
                {
                    return;
                }
                else
                {
                    return fixRef(this.returnVal);
                }
            }

            AbstractTask* job;
            {
                // Locking explicitly and calling popNoSync() because
                // pop() waits on a condition variable if there are no Tasks
                // in the queue.

                pool.queueLock();
                scope(exit) pool.queueUnlock();
                job = pool.popNoSync();
            }


            if (job !is null)
            {

                version (verboseUnittest)
                {
                    stderr.writeln("Doing workForce work.");
                }

                // Run some other queued task while this one is pending.
                pool.doJob(job);

                // Re-check right away instead of waiting for the next
                // loop iteration.
                if (done)
                {
                    static if (is(ReturnType == void))
                    {
                        return;
                    }
                    else
                    {
                        return fixRef(this.returnVal);
                    }
                }
            }
            else
            {
                version (verboseUnittest)
                {
                    stderr.writeln("Yield from workForce.");
                }

                // Nothing left to help with: fall back to a blocking wait.
                return yieldForce;
            }
        }
    }

    /**
    Returns $(D true) if the $(D Task) is finished executing.

    Throws:  Rethrows any exception thrown during the execution of the
             $(D Task).
    */
    @property bool done() @trusted
    {
        // Explicitly forwarded for documentation purposes.
        return base.done;
    }

    /**
    Create a new thread for executing this $(D Task), execute it in the
    newly created thread, then terminate the thread.  This can be used for
    future/promise parallelism.  An explicit priority may be given
    to the $(D Task).  If one is provided, its value is forwarded to
    $(D core.thread.Thread.priority). See $(REF task, std,parallelism) for
    usage example.
    */
    void executeInNewThread() @trusted
    {
        // A dedicated pool is constructed around this one task.
        pool = new TaskPool(basePtr);
    }

    /// Ditto
    void executeInNewThread(int priority) @trusted
    {
        pool = new TaskPool(basePtr, priority);
    }

    // A scoped Task that was handed to a pool and has not finished is
    // waited for here, so it is complete before its storage goes away.
    @safe ~this()
    {
        if (isScoped && pool !is null && taskStatus != TaskStatus.done)
        {
            yieldForce;
        }
    }

    // When this is uncommented, it somehow gets called on returning from
    // scopedTask even though the struct shouldn't be getting copied.
    //@disable this(this) {}
}
766 
// Calls $(D fpOrDelegate) with $(D args) and returns its result.  This is an
// adapter that makes $(D Task) work with delegates, function pointers and
// functors instead of just aliases.  Task recognizes this adapter with
// __traits(isSame, fun, run), in which case the first stored argument is the
// callable itself.
//
// Params:
//     fpOrDelegate = the callable to invoke
//     args         = arguments forwarded to it; taken by ref
ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args)
{
    return fpOrDelegate(args);
}
774 
775 /**
776 Creates a $(D Task) on the GC heap that calls an alias.  This may be executed
777 via $(D Task.executeInNewThread) or by submitting to a
778 $(REF TaskPool, std,parallelism).  A globally accessible instance of
779 $(D TaskPool) is provided by $(REF taskPool, std,parallelism).
780 
781 Returns:  A pointer to the $(D Task).
782 
783 Example:
784 ---
785 // Read two files into memory at the same time.
786 import std.file;
787 
788 void main()
789 {
790     // Create and execute a Task for reading
791     // foo.txt.
792     auto file1Task = task!read("foo.txt");
793     file1Task.executeInNewThread();
794 
795     // Read bar.txt in parallel.
796     auto file2Data = read("bar.txt");
797 
798     // Get the results of reading foo.txt.
799     auto file1Data = file1Task.yieldForce;
800 }
801 ---
802 
803 ---
804 // Sorts an array using a parallel quick sort algorithm.
805 // The first partition is done serially.  Both recursion
806 // branches are then executed in parallel.
807 //
808 // Timings for sorting an array of 1,000,000 doubles on
809 // an Athlon 64 X2 dual core machine:
810 //
811 // This implementation:               176 milliseconds.
812 // Equivalent serial implementation:  280 milliseconds
813 void parallelSort(T)(T[] data)
814 {
815     // Sort small subarrays serially.
816     if (data.length < 100)
817     {
818          std.algorithm.sort(data);
819          return;
820     }
821 
822     // Partition the array.
823     swap(data[$ / 2], data[$ - 1]);
824     auto pivot = data[$ - 1];
825     bool lessThanPivot(T elem) { return elem < pivot; }
826 
827     auto greaterEqual = partition!lessThanPivot(data[0..$ - 1]);
828     swap(data[$ - greaterEqual.length - 1], data[$ - 1]);
829 
830     auto less = data[0..$ - greaterEqual.length - 1];
831     greaterEqual = data[$ - greaterEqual.length..$];
832 
833     // Execute both recursion branches in parallel.
834     auto recurseTask = task!parallelSort(greaterEqual);
835     taskPool.put(recurseTask);
836     parallelSort(less);
837     recurseTask.yieldForce;
838 }
839 ---
840 */
auto task(alias fun, Args...)(Args args)
{
    // Allocated with `new`; the caller receives a pointer to the Task.
    alias Created = Task!(fun, Args);
    return new Created(args);
}
845 
846 /**
847 Creates a $(D Task) on the GC heap that calls a function pointer, delegate, or
848 class/struct with overloaded opCall.
849 
850 Example:
851 ---
852 // Read two files in at the same time again,
853 // but this time use a function pointer instead
854 // of an alias to represent std.file.read.
855 import std.file;
856 
857 void main()
858 {
859     // Create and execute a Task for reading
860     // foo.txt.
861     auto file1Task = task(&read, "foo.txt");
862     file1Task.executeInNewThread();
863 
864     // Read bar.txt in parallel.
865     auto file2Data = read("bar.txt");
866 
867     // Get the results of reading foo.txt.
868     auto file1Data = file1Task.yieldForce;
869 }
870 ---
871 
872 Notes: This function takes a non-scope delegate, meaning it can be
873        used with closures.  If you can't allocate a closure due to objects
874        on the stack that have scoped destruction, see $(D scopedTask), which
875        takes a scope delegate.
876  */
auto task(F, Args...)(F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    // The callable travels as the first stored argument of the `run`
    // adapter; the user's arguments follow it.
    alias Created = Task!(run, F, Args);
    return new Created(delegateOrFp, args);
}
882 
883 /**
884 Version of $(D task) usable from $(D @safe) code.  Usage mechanics are
885 identical to the non-@safe case, but safety introduces some restrictions:
886 
887 1.  $(D fun) must be @safe or @trusted.
888 
889 2.  $(D F) must not have any unshared aliasing as defined by
890     $(REF hasUnsharedAliasing, std,traits).  This means it
891     may not be an unshared delegate or a non-shared class or struct
892     with overloaded $(D opCall).  This also precludes accepting template
893     alias parameters.
894 
895 3.  $(D Args) must not have unshared aliasing.
896 
897 4.  $(D fun) must not return by reference.
898 
899 5.  The return type must not have unshared aliasing unless $(D fun) is
900     $(D pure) or the $(D Task) is executed via $(D executeInNewThread) instead
901     of using a $(D TaskPool).
902 
903 */
@trusted auto task(F, Args...)(F fun, Args args)
if (is(typeof(fun(args))) && isSafeTask!F)
{
    // Chosen only when isSafeTask!F holds, which is what justifies the
    // @trusted marking.  The callable is stored as the first argument of
    // the `run` adapter.
    alias Created = Task!(run, F, Args);
    return new Created(fun, args);
}
909 
910 /**
911 These functions allow the creation of $(D Task) objects on the stack rather
912 than the GC heap.  The lifetime of a $(D Task) created by $(D scopedTask)
913 cannot exceed the lifetime of the scope it was created in.
914 
915 $(D scopedTask) might be preferred over $(D task):
916 
917 1.  When a $(D Task) that calls a delegate is being created and a closure
918     cannot be allocated due to objects on the stack that have scoped
919     destruction.  The delegate overload of $(D scopedTask) takes a $(D scope)
920     delegate.
921 
922 2.  As a micro-optimization, to avoid the heap allocation associated with
923     $(D task) or with the creation of a closure.
924 
925 Usage is otherwise identical to $(D task).
926 
927 Notes:  $(D Task) objects created using $(D scopedTask) will automatically
928 call $(D Task.yieldForce) in their destructor if necessary to ensure
929 the $(D Task) is complete before the stack frame they reside on is destroyed.
930 */
auto scopedTask(alias fun, Args...)(Args args)
{
    // Constructed by value (no `new`) and returned by value.
    auto ret = Task!(fun, Args)(args);
    // Makes the Task destructor wait for completion if the Task is still
    // running in a pool when it is destroyed.
    ret.isScoped = true;
    return ret;
}
937 
938 /// Ditto
auto scopedTask(F, Args...)(scope F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    // Constructed by value (no `new`); the callable is stored as the first
    // argument of the `run` adapter.
    auto ret = Task!(run, F, Args)(delegateOrFp, args);
    // Makes the Task destructor wait for completion if the Task is still
    // running in a pool when it is destroyed.
    ret.isScoped = true;
    return ret;
}
946 
947 /// Ditto
@trusted auto scopedTask(F, Args...)(F fun, Args args)
if (is(typeof(fun(args))) && isSafeTask!F)
{
    // Chosen only when isSafeTask!F holds.  Constructed by value (no
    // `new`); the callable is stored as the first argument of `run`.
    auto ret = Task!(run, F, Args)(fun, args);
    // Makes the Task destructor wait for completion if the Task is still
    // running in a pool when it is destroyed.
    ret.isScoped = true;
    return ret;
}
955 
/**
The total number of CPU cores available on the current machine, as reported by
the operating system.

The value is assigned once, by the platform-specific shared module constructor
of this module, and does not change afterwards.
*/
immutable uint totalCPUs;
961 
/*
Thread subclass used for the worker threads of this module.  It exists for
two reasons:

1.  Its distinct type lets std.parallelism threads be told apart from all
    other threads, so that the std.parallelism daemon threads can be
    terminated.

2.  It carries a reference to the pool the thread belongs to, which is
    likewise needed to terminate the daemon threads properly.
*/
private final class ParallelismThread : Thread
{
    // The pool this thread is a member of.
    TaskPool pool;

    // Forwards the thread body to core.thread.Thread.
    this(void delegate() dg)
    {
        super(dg);
    }
}
981 
// Kill daemon threads: at module teardown, every ParallelismThread that
// belongs to a daemon pool has its pool stopped and is then joined.
shared static ~this()
{
    foreach (ref thread; Thread)
    {
        // Threads not created by this module fail the cast and are skipped.
        if (auto worker = cast(ParallelismThread) thread)
        {
            auto owner = worker.pool;
            if (owner.isDaemon)
            {
                owner.stop();
                worker.join();
            }
        }
    }
}
995 
996 /**
997 This class encapsulates a task queue and a set of worker threads.  Its purpose
998 is to efficiently map a large number of $(D Task)s onto a smaller number of
999 threads.  A task queue is a FIFO queue of $(D Task) objects that have been
1000 submitted to the $(D TaskPool) and are awaiting execution.  A worker thread is a
1001 thread that executes the $(D Task) at the front of the queue when one is
1002 available and sleeps when the queue is empty.
1003 
1004 This class should usually be used via the global instantiation
1005 available via the $(REF taskPool, std,parallelism) property.
1006 Occasionally it is useful to explicitly instantiate a $(D TaskPool):
1007 
1008 1.  When you want $(D TaskPool) instances with multiple priorities, for example
1009     a low priority pool and a high priority pool.
1010 
1011 2.  When the threads in the global task pool are waiting on a synchronization
1012     primitive (for example a mutex), and you want to parallelize the code that
1013     needs to run before these threads can be resumed.
1014  */
1015 final class TaskPool
1016 {
1017 private:
1018 
1019     // A pool can either be a regular pool or a single-task pool.  A
1020     // single-task pool is a dummy pool that's fired up for
1021     // Task.executeInNewThread().
1022     bool isSingleTask;
1023 
1024     ParallelismThread[] pool;
1025     Thread singleTaskThread;
1026 
1027     AbstractTask* head;
1028     AbstractTask* tail;
1029     PoolState status = PoolState.running;
1030     Condition workerCondition;
1031     Condition waiterCondition;
1032     Mutex queueMutex;
1033     Mutex waiterMutex;  // For waiterCondition
1034 
1035     // The instanceStartIndex of the next instance that will be created.
1036     __gshared static size_t nextInstanceIndex = 1;
1037 
1038     // The index of the current thread.
1039     static size_t threadIndex;
1040 
1041     // The index of the first thread in this instance.
1042     immutable size_t instanceStartIndex;
1043 
1044     // The index that the next thread to be initialized in this pool will have.
1045     size_t nextThreadIndex;
1046 
    // Lifecycle state of a pool, stored in the status field.
    enum PoolState : ubyte
    {
        // Initial state.  The only state in which abstractPutNoSync() and
        // abstractPutGroupNoSync() accept new tasks, and the only state in
        // which pop() blocks waiting for work.
        running,

        // A thread in executeWorkLoop() that finds the queue empty while the
        // pool is in this state switches the pool to stopNow and returns.
        // NOTE(review): presumably entered via finish() -- confirm.
        finishing,

        // executeWorkLoop() exits as soon as it observes this state.
        stopNow
    }
1053 
doJob(AbstractTask * job)1054     void doJob(AbstractTask* job)
1055     {
1056         assert(job.taskStatus == TaskStatus.inProgress);
1057         assert(job.next is null);
1058         assert(job.prev is null);
1059 
1060         scope(exit)
1061         {
1062             if (!isSingleTask)
1063             {
1064                 waiterLock();
1065                 scope(exit) waiterUnlock();
1066                 notifyWaiters();
1067             }
1068         }
1069 
1070         try
1071         {
1072             job.job();
1073         }
1074         catch (Throwable e)
1075         {
1076             job.exception = e;
1077         }
1078 
1079         atomicSetUbyte(job.taskStatus, TaskStatus.done);
1080     }
1081 
1082     // This function is used for dummy pools created by Task.executeInNewThread().
doSingleTask()1083     void doSingleTask()
1084     {
1085         // No synchronization.  Pool is guaranteed to only have one thread,
1086         // and the queue is submitted to before this thread is created.
1087         assert(head);
1088         auto t = head;
1089         t.next = t.prev = head = null;
1090         doJob(t);
1091     }
1092 
1093     // This function performs initialization for each thread that affects
1094     // thread local storage and therefore must be done from within the
1095     // worker thread.  It then calls executeWorkLoop().
startWorkLoop()1096     void startWorkLoop()
1097     {
1098         // Initialize thread index.
1099         {
1100             queueLock();
1101             scope(exit) queueUnlock();
1102             threadIndex = nextThreadIndex;
1103             nextThreadIndex++;
1104         }
1105 
1106         executeWorkLoop();
1107     }
1108 
1109     // This is the main work loop that worker threads spend their time in
1110     // until they terminate.  It's also entered by non-worker threads when
1111     // finish() is called with the blocking variable set to true.
executeWorkLoop()1112     void executeWorkLoop()
1113     {
1114         while (atomicReadUbyte(status) != PoolState.stopNow)
1115         {
1116             AbstractTask* task = pop();
1117             if (task is null)
1118             {
1119                 if (atomicReadUbyte(status) == PoolState.finishing)
1120                 {
1121                     atomicSetUbyte(status, PoolState.stopNow);
1122                     return;
1123                 }
1124             }
1125             else
1126             {
1127                 doJob(task);
1128             }
1129         }
1130     }
1131 
1132     // Pop a task off the queue.
pop()1133     AbstractTask* pop()
1134     {
1135         queueLock();
1136         scope(exit) queueUnlock();
1137         auto ret = popNoSync();
1138         while (ret is null && status == PoolState.running)
1139         {
1140             wait();
1141             ret = popNoSync();
1142         }
1143         return ret;
1144     }
1145 
popNoSync()1146     AbstractTask* popNoSync()
1147     out(returned)
1148     {
1149         /* If task.prev and task.next aren't null, then another thread
1150          * can try to delete this task from the pool after it's
1151          * alreadly been deleted/popped.
1152          */
1153         if (returned !is null)
1154         {
1155             assert(returned.next is null);
1156             assert(returned.prev is null);
1157         }
1158     }
1159     body
1160     {
1161         if (isSingleTask) return null;
1162 
1163         AbstractTask* returned = head;
1164         if (head !is null)
1165         {
1166             head = head.next;
1167             returned.prev = null;
1168             returned.next = null;
1169             returned.taskStatus = TaskStatus.inProgress;
1170         }
1171         if (head !is null)
1172         {
1173             head.prev = null;
1174         }
1175 
1176         return returned;
1177     }
1178 
1179     // Push a task onto the queue.
abstractPut(AbstractTask * task)1180     void abstractPut(AbstractTask* task)
1181     {
1182         queueLock();
1183         scope(exit) queueUnlock();
1184         abstractPutNoSync(task);
1185     }
1186 
abstractPutNoSync(AbstractTask * task)1187     void abstractPutNoSync(AbstractTask* task)
1188     in
1189     {
1190         assert(task);
1191     }
1192     out
1193     {
1194         import std.conv : text;
1195 
1196         assert(tail.prev !is tail);
1197         assert(tail.next is null, text(tail.prev, '\t', tail.next));
1198         if (tail.prev !is null)
1199         {
1200             assert(tail.prev.next is tail, text(tail.prev, '\t', tail.next));
1201         }
1202     }
1203     body
1204     {
1205         // Not using enforce() to save on function call overhead since this
1206         // is a performance critical function.
1207         if (status != PoolState.running)
1208         {
1209             throw new Error(
1210                 "Cannot submit a new task to a pool after calling " ~
1211                 "finish() or stop()."
1212             );
1213         }
1214 
1215         task.next = null;
1216         if (head is null)   //Queue is empty.
1217         {
1218             head = task;
1219             tail = task;
1220             tail.prev = null;
1221         }
1222         else
1223         {
1224             assert(tail);
1225             task.prev = tail;
1226             tail.next = task;
1227             tail = task;
1228         }
1229         notify();
1230     }
1231 
abstractPutGroupNoSync(AbstractTask * h,AbstractTask * t)1232     void abstractPutGroupNoSync(AbstractTask* h, AbstractTask* t)
1233     {
1234         if (status != PoolState.running)
1235         {
1236             throw new Error(
1237                 "Cannot submit a new task to a pool after calling " ~
1238                 "finish() or stop()."
1239             );
1240         }
1241 
1242         if (head is null)
1243         {
1244             head = h;
1245             tail = t;
1246         }
1247         else
1248         {
1249             h.prev = tail;
1250             tail.next = h;
1251             tail = t;
1252         }
1253 
1254         notifyAll();
1255     }
1256 
tryDeleteExecute(AbstractTask * toExecute)1257     void tryDeleteExecute(AbstractTask* toExecute)
1258     {
1259         if (isSingleTask) return;
1260 
1261         if ( !deleteItem(toExecute) )
1262         {
1263             return;
1264         }
1265 
1266         try
1267         {
1268             toExecute.job();
1269         }
1270         catch (Exception e)
1271         {
1272             toExecute.exception = e;
1273         }
1274 
1275         atomicSetUbyte(toExecute.taskStatus, TaskStatus.done);
1276     }
1277 
deleteItem(AbstractTask * item)1278     bool deleteItem(AbstractTask* item)
1279     {
1280         queueLock();
1281         scope(exit) queueUnlock();
1282         return deleteItemNoSync(item);
1283     }
1284 
deleteItemNoSync(AbstractTask * item)1285     bool deleteItemNoSync(AbstractTask* item)
1286     {
1287         if (item.taskStatus != TaskStatus.notStarted)
1288         {
1289             return false;
1290         }
1291         item.taskStatus = TaskStatus.inProgress;
1292 
1293         if (item is head)
1294         {
1295             // Make sure head gets set properly.
1296             popNoSync();
1297             return true;
1298         }
1299         if (item is tail)
1300         {
1301             tail = tail.prev;
1302             if (tail !is null)
1303             {
1304                 tail.next = null;
1305             }
1306             item.next = null;
1307             item.prev = null;
1308             return true;
1309         }
1310         if (item.next !is null)
1311         {
1312             assert(item.next.prev is item);  // Check queue consistency.
1313             item.next.prev = item.prev;
1314         }
1315         if (item.prev !is null)
1316         {
1317             assert(item.prev.next is item);  // Check queue consistency.
1318             item.prev.next = item.next;
1319         }
1320         item.next = null;
1321         item.prev = null;
1322         return true;
1323     }
1324 
queueLock()1325     void queueLock()
1326     {
1327         assert(queueMutex);
1328         if (!isSingleTask) queueMutex.lock();
1329     }
1330 
queueUnlock()1331     void queueUnlock()
1332     {
1333         assert(queueMutex);
1334         if (!isSingleTask) queueMutex.unlock();
1335     }
1336 
waiterLock()1337     void waiterLock()
1338     {
1339         if (!isSingleTask) waiterMutex.lock();
1340     }
1341 
waiterUnlock()1342     void waiterUnlock()
1343     {
1344         if (!isSingleTask) waiterMutex.unlock();
1345     }
1346 
wait()1347     void wait()
1348     {
1349         if (!isSingleTask) workerCondition.wait();
1350     }
1351 
notify()1352     void notify()
1353     {
1354         if (!isSingleTask) workerCondition.notify();
1355     }
1356 
notifyAll()1357     void notifyAll()
1358     {
1359         if (!isSingleTask) workerCondition.notifyAll();
1360     }
1361 
waitUntilCompletion()1362     void waitUntilCompletion()
1363     {
1364         if (isSingleTask)
1365         {
1366             singleTaskThread.join();
1367         }
1368         else
1369         {
1370             waiterCondition.wait();
1371         }
1372     }
1373 
notifyWaiters()1374     void notifyWaiters()
1375     {
1376         if (!isSingleTask) waiterCondition.notifyAll();
1377     }
1378 
1379     // Private constructor for creating dummy pools that only have one thread,
1380     // only execute one Task, and then terminate.  This is used for
1381     // Task.executeInNewThread().
1382     this(AbstractTask* task, int priority = int.max)
1383     {
1384         assert(task);
1385 
1386         // Dummy value, not used.
1387         instanceStartIndex = 0;
1388 
1389         this.isSingleTask = true;
1390         task.taskStatus = TaskStatus.inProgress;
1391         this.head = task;
1392         singleTaskThread = new Thread(&doSingleTask);
1393         singleTaskThread.start();
1394 
1395         // Disabled until writing code to support
1396         // running thread with specified priority
1397         // See https://d.puremagic.com/issues/show_bug.cgi?id=8960
1398 
1399         /*if (priority != int.max)
1400         {
1401             singleTaskThread.priority = priority;
1402         }*/
1403     }
1404 
1405 public:
1406     // This is used in parallel_algorithm but is too unstable to document
1407     // as public API.
defaultWorkUnitSize(size_t rangeLen)1408     size_t defaultWorkUnitSize(size_t rangeLen) const @safe pure nothrow
1409     {
1410         import std.algorithm.comparison : max;
1411 
1412         if (this.size == 0)
1413         {
1414             return rangeLen;
1415         }
1416 
1417         immutable size_t eightSize = 4 * (this.size + 1);
1418         auto ret = (rangeLen / eightSize) + ((rangeLen % eightSize == 0) ? 0 : 1);
1419         return max(ret, 1);
1420     }
1421 
1422     /**
1423     Default constructor that initializes a $(D TaskPool) with
1424     $(D totalCPUs) - 1 worker threads.  The minus 1 is included because the
1425     main thread will also be available to do work.
1426 
1427     Note:  On single-core machines, the primitives provided by $(D TaskPool)
1428            operate transparently in single-threaded mode.
1429      */
    this() @trusted
    {
        // Delegates to the size_t constructor.  totalCPUs is a uint, so the
        // subtraction is done in unsigned arithmetic.
        // NOTE(review): a totalCPUs of 0 would wrap around here; presumably
        // it is always at least 1 -- confirm where it is initialized.
        this(totalCPUs - 1);
    }
1434 
1435     /**
1436     Allows for custom number of worker threads.
1437     */
this(size_t nWorkers)1438     this(size_t nWorkers) @trusted
1439     {
1440         synchronized(typeid(TaskPool))
1441         {
1442             instanceStartIndex = nextInstanceIndex;
1443 
1444             // The first worker thread to be initialized will have this index,
1445             // and will increment it.  The second worker to be initialized will
1446             // have this index plus 1.
1447             nextThreadIndex = instanceStartIndex;
1448             nextInstanceIndex += nWorkers;
1449         }
1450 
1451         queueMutex = new Mutex(this);
1452         waiterMutex = new Mutex();
1453         workerCondition = new Condition(queueMutex);
1454         waiterCondition = new Condition(waiterMutex);
1455 
1456         pool = new ParallelismThread[nWorkers];
1457         foreach (ref poolThread; pool)
1458         {
1459             poolThread = new ParallelismThread(&startWorkLoop);
1460             poolThread.pool = this;
1461             poolThread.start();
1462         }
1463     }
1464 
1465     /**
1466     Implements a parallel foreach loop over a range.  This works by implicitly
1467     creating and submitting one $(D Task) to the $(D TaskPool) for each worker
1468     thread.  A work unit is a set of consecutive elements of $(D range) to
1469     be processed by a worker thread between communication with any other
1470     thread.  The number of elements processed per work unit is controlled by the
1471     $(D workUnitSize) parameter.  Smaller work units provide better load
1472     balancing, but larger work units avoid the overhead of communicating
1473     with other threads frequently to fetch the next work unit.  Large work
1474     units also avoid false sharing in cases where the range is being modified.
1475     The less time a single iteration of the loop takes, the larger
1476     $(D workUnitSize) should be.  For very expensive loop bodies,
1477     $(D workUnitSize) should  be 1.  An overload that chooses a default work
1478     unit size is also available.
1479 
1480     Example:
1481     ---
1482     // Find the logarithm of every number from 1 to
1483     // 10_000_000 in parallel.
1484     auto logs = new double[10_000_000];
1485 
1486     // Parallel foreach works with or without an index
    // variable.  It can iterate by ref if range.front
    // returns by ref.
1489 
1490     // Iterate over logs using work units of size 100.
1491     foreach (i, ref elem; taskPool.parallel(logs, 100))
1492     {
1493         elem = log(i + 1.0);
1494     }
1495 
1496     // Same thing, but use the default work unit size.
1497     //
1498     // Timings on an Athlon 64 X2 dual core machine:
1499     //
1500     // Parallel foreach:  388 milliseconds
1501     // Regular foreach:   619 milliseconds
1502     foreach (i, ref elem; taskPool.parallel(logs))
1503     {
1504         elem = log(i + 1.0);
1505     }
1506     ---
1507 
1508     Notes:
1509 
1510     The memory usage of this implementation is guaranteed to be constant
1511     in $(D range.length).
1512 
1513     Breaking from a parallel foreach loop via a break, labeled break,
1514     labeled continue, return or goto statement throws a
1515     $(D ParallelForeachError).
1516 
1517     In the case of non-random access ranges, parallel foreach buffers lazily
1518     to an array of size $(D workUnitSize) before executing the parallel portion
1519     of the loop.  The exception is that, if a parallel foreach is executed
1520     over a range returned by $(D asyncBuf) or $(D map), the copying is elided
1521     and the buffers are simply swapped.  In this case $(D workUnitSize) is
1522     ignored and the work unit size is set to the  buffer size of $(D range).
1523 
1524     A memory barrier is guaranteed to be executed on exit from the loop,
1525     so that results produced by all threads are visible in the calling thread.
1526 
1527     $(B Exception Handling):
1528 
1529     When at least one exception is thrown from inside a parallel foreach loop,
1530     the submission of additional $(D Task) objects is terminated as soon as
1531     possible, in a non-deterministic manner.  All executing or
1532     enqueued work units are allowed to complete.  Then, all exceptions that
1533     were thrown by any work unit are chained using $(D Throwable.next) and
1534     rethrown.  The order of the exception chaining is non-deterministic.
1535     */
1536     ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
1537     {
1538         import std.exception : enforce;
1539         enforce(workUnitSize > 0, "workUnitSize must be > 0.");
1540         alias RetType = ParallelForeach!R;
1541         return RetType(this, range, workUnitSize);
1542     }
1543 
1544 
1545     /// Ditto
1546     ParallelForeach!R parallel(R)(R range)
1547     {
1548         static if (hasLength!R)
1549         {
1550             // Default work unit size is such that we would use 4x as many
1551             // slots as are in this thread pool.
1552             size_t workUnitSize = defaultWorkUnitSize(range.length);
1553             return parallel(range, workUnitSize);
1554         }
1555         else
1556         {
1557             // Just use a really, really dumb guess if the user is too lazy to
1558             // specify.
1559             return parallel(range, 512);
1560         }
1561     }
1562 
1563     ///
amap(functions...)1564     template amap(functions...)
1565     {
1566         /**
1567         Eager parallel map.  The eagerness of this function means it has less
1568         overhead than the lazily evaluated $(D TaskPool.map) and should be
1569         preferred where the memory requirements of eagerness are acceptable.
1570         $(D functions) are the functions to be evaluated, passed as template
1571         alias parameters in a style similar to
1572         $(REF map, std,algorithm,iteration).
1573         The first argument must be a random access range. For performance
1574         reasons, amap will assume the range elements have not yet been
1575         initialized. Elements will be overwritten without calling a destructor
1576         nor doing an assignment. As such, the range must not contain meaningful
1577         data$(DDOC_COMMENT not a section): either un-initialized objects, or
1578         objects in their $(D .init) state.
1579 
1580         ---
1581         auto numbers = iota(100_000_000.0);
1582 
1583         // Find the square roots of numbers.
1584         //
1585         // Timings on an Athlon 64 X2 dual core machine:
1586         //
1587         // Parallel eager map:                   0.802 s
1588         // Equivalent serial implementation:     1.768 s
1589         auto squareRoots = taskPool.amap!sqrt(numbers);
1590         ---
1591 
1592         Immediately after the range argument, an optional work unit size argument
1593         may be provided.  Work units as used by $(D amap) are identical to those
1594         defined for parallel foreach.  If no work unit size is provided, the
1595         default work unit size is used.
1596 
1597         ---
1598         // Same thing, but make work unit size 100.
1599         auto squareRoots = taskPool.amap!sqrt(numbers, 100);
1600         ---
1601 
1602         An output range for returning the results may be provided as the last
1603         argument.  If one is not provided, an array of the proper type will be
1604         allocated on the garbage collected heap.  If one is provided, it must be a
1605         random access range with assignable elements, must have reference
1606         semantics with respect to assignment to its elements, and must have the
1607         same length as the input range.  Writing to adjacent elements from
1608         different threads must be safe.
1609 
1610         ---
1611         // Same thing, but explicitly allocate an array
1612         // to return the results in.  The element type
1613         // of the array may be either the exact type
1614         // returned by functions or an implicit conversion
1615         // target.
1616         auto squareRoots = new float[numbers.length];
1617         taskPool.amap!sqrt(numbers, squareRoots);
1618 
1619         // Multiple functions, explicit output range, and
1620         // explicit work unit size.
1621         auto results = new Tuple!(float, real)[numbers.length];
1622         taskPool.amap!(sqrt, log)(numbers, 100, results);
1623         ---
1624 
1625         Note:
1626 
1627         A memory barrier is guaranteed to be executed after all results are written
1628         but before returning so that results produced by all threads are visible
1629         in the calling thread.
1630 
1631         Tips:
1632 
1633         To perform the mapping operation in place, provide the same range for the
1634         input and output range.
1635 
1636         To parallelize the copying of a range with expensive to evaluate elements
1637         to an array, pass an identity function (a function that just returns
1638         whatever argument is provided to it) to $(D amap).
1639 
1640         $(B Exception Handling):
1641 
1642         When at least one exception is thrown from inside the map functions,
1643         the submission of additional $(D Task) objects is terminated as soon as
1644         possible, in a non-deterministic manner.  All currently executing or
1645         enqueued work units are allowed to complete.  Then, all exceptions that
1646         were thrown from any work unit are chained using $(D Throwable.next) and
1647         rethrown.  The order of the exception chaining is non-deterministic.
1648         */
1649         auto amap(Args...)(Args args)
1650         if (isRandomAccessRange!(Args[0]))
1651         {
1652             import std.conv : emplaceRef;
1653 
1654             alias fun = adjoin!(staticMap!(unaryFun, functions));
1655 
1656             alias range = args[0];
1657             immutable len = range.length;
1658 
1659             static if (
1660                 Args.length > 1 &&
1661                 randAssignable!(Args[$ - 1]) &&
1662                 is(MapType!(Args[0], functions) : ElementType!(Args[$ - 1]))
1663                 )
1664             {
1665                 import std.conv : text;
1666                 import std.exception : enforce;
1667 
1668                 alias buf = args[$ - 1];
1669                 alias args2 = args[0..$ - 1];
1670                 alias Args2 = Args[0..$ - 1];
1671                 enforce(buf.length == len,
1672                         text("Can't use a user supplied buffer that's the wrong ",
1673                              "size.  (Expected  :", len, " Got:  ", buf.length));
1674             }
1675             else static if (randAssignable!(Args[$ - 1]) && Args.length > 1)
1676             {
1677                 static assert(0, "Wrong buffer type.");
1678             }
1679             else
1680             {
1681                 import std.array : uninitializedArray;
1682 
1683                 auto buf = uninitializedArray!(MapType!(Args[0], functions)[])(len);
1684                 alias args2 = args;
1685                 alias Args2 = Args;
1686             }
1687 
1688             if (!len) return buf;
1689 
1690             static if (isIntegral!(Args2[$ - 1]))
1691             {
1692                 static assert(args2.length == 2);
1693                 auto workUnitSize = cast(size_t) args2[1];
1694             }
1695             else
1696             {
1697                 static assert(args2.length == 1, Args);
1698                 auto workUnitSize = defaultWorkUnitSize(range.length);
1699             }
1700 
1701             alias R = typeof(range);
1702 
1703             if (workUnitSize > len)
1704             {
1705                 workUnitSize = len;
1706             }
1707 
1708             // Handle as a special case:
1709             if (size == 0)
1710             {
1711                 size_t index = 0;
1712                 foreach (elem; range)
1713                 {
1714                     emplaceRef(buf[index++], fun(elem));
1715                 }
1716                 return buf;
1717             }
1718 
1719             // Effectively -1:  chunkIndex + 1 == 0:
1720             shared size_t workUnitIndex = size_t.max;
1721             shared bool shouldContinue = true;
1722 
1723             void doIt()
1724             {
1725                 import std.algorithm.comparison : min;
1726 
1727                 scope(failure)
1728                 {
1729                     // If an exception is thrown, all threads should bail.
1730                     atomicStore(shouldContinue, false);
1731                 }
1732 
1733                 while (atomicLoad(shouldContinue))
1734                 {
1735                     immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
1736                     immutable start = workUnitSize * myUnitIndex;
1737                     if (start >= len)
1738                     {
1739                         atomicStore(shouldContinue, false);
1740                         break;
1741                     }
1742 
1743                     immutable end = min(len, start + workUnitSize);
1744 
1745                     static if (hasSlicing!R)
1746                     {
1747                         auto subrange = range[start .. end];
1748                         foreach (i; start .. end)
1749                         {
1750                             emplaceRef(buf[i], fun(subrange.front));
1751                             subrange.popFront();
1752                         }
1753                     }
1754                     else
1755                     {
1756                         foreach (i; start .. end)
1757                         {
1758                             emplaceRef(buf[i], fun(range[i]));
1759                         }
1760                     }
1761                 }
1762             }
1763 
1764             submitAndExecute(this, &doIt);
1765             return buf;
1766         }
1767     }
1768 
1769     ///
map(functions...)1770     template map(functions...)
1771     {
1772         /**
1773         A semi-lazy parallel map that can be used for pipelining.  The map
1774         functions are evaluated for the first $(D bufSize) elements and stored in a
1775         buffer and made available to $(D popFront).  Meanwhile, in the
1776         background a second buffer of the same size is filled.  When the first
1777         buffer is exhausted, it is swapped with the second buffer and filled while
1778         the values from what was originally the second buffer are read.  This
1779         implementation allows for elements to be written to the buffer without
1780         the need for atomic operations or synchronization for each write, and
1781         enables the mapping function to be evaluated efficiently in parallel.
1782 
1783         $(D map) has more overhead than the simpler procedure used by $(D amap)
1784         but avoids the need to keep all results in memory simultaneously and works
1785         with non-random access ranges.
1786 
1787         Params:
1788 
1789         source = The input range to be mapped.  If $(D source) is not random
1790         access it will be lazily buffered to an array of size $(D bufSize) before
1791         the map function is evaluated.  (For an exception to this rule, see Notes.)
1792 
1793         bufSize = The size of the buffer to store the evaluated elements.
1794 
1795         workUnitSize = The number of elements to evaluate in a single
1796         $(D Task).  Must be less than or equal to $(D bufSize), and
1797         should be a fraction of $(D bufSize) such that all worker threads can be
1798         used.  If the default of size_t.max is used, workUnitSize will be set to
1799         the pool-wide default.
1800 
1801         Returns:  An input range representing the results of the map.  This range
1802                   has a length iff $(D source) has a length.
1803 
1804         Notes:
1805 
1806         If a range returned by $(D map) or $(D asyncBuf) is used as an input to
1807         $(D map), then as an optimization the copying from the output buffer
1808         of the first range to the input buffer of the second range is elided, even
1809         though the ranges returned by $(D map) and $(D asyncBuf) are non-random
1810         access ranges.  This means that the $(D bufSize) parameter passed to the
1811         current call to $(D map) will be ignored and the size of the buffer
1812         will be the buffer size of $(D source).
1813 
1814         Example:
1815         ---
1816         // Pipeline reading a file, converting each line
1817         // to a number, taking the logarithms of the numbers,
1818         // and performing the additions necessary to find
1819         // the sum of the logarithms.
1820 
1821         auto lineRange = File("numberList.txt").byLine();
1822         auto dupedLines = std.algorithm.map!"a.idup"(lineRange);
1823         auto nums = taskPool.map!(to!double)(dupedLines);
1824         auto logs = taskPool.map!log10(nums);
1825 
1826         double sum = 0;
1827         foreach (elem; logs)
1828         {
1829             sum += elem;
1830         }
1831         ---
1832 
1833         $(B Exception Handling):
1834 
1835         Any exceptions thrown while iterating over $(D source)
1836         or computing the map function are re-thrown on a call to $(D popFront) or,
1837         if thrown during construction, are simply allowed to propagate to the
1838         caller.  In the case of exceptions thrown while computing the map function,
1839         the exceptions are chained as in $(D TaskPool.amap).
1840         */
1841         auto
1842         map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max)
1843         if (isInputRange!S)
1844         {
1845             import std.exception : enforce;
1846 
1847             enforce(workUnitSize == size_t.max || workUnitSize <= bufSize,
1848                     "Work unit size must be smaller than buffer size.");
1849             alias fun = adjoin!(staticMap!(unaryFun, functions));
1850 
1851             static final class Map
1852             {
1853                 // This is a class because the task needs to be located on the
1854                 // heap and in the non-random access case source needs to be on
1855                 // the heap, too.
1856 
1857             private:
1858                 enum bufferTrick = is(typeof(source.buf1)) &&
1859                 is(typeof(source.bufPos)) &&
1860                 is(typeof(source.doBufSwap()));
1861 
1862                 alias E = MapType!(S, functions);
1863                 E[] buf1, buf2;
1864                 S source;
1865                 TaskPool pool;
1866                 Task!(run, E[] delegate(E[]), E[]) nextBufTask;
1867                 size_t workUnitSize;
1868                 size_t bufPos;
1869                 bool lastTaskWaited;
1870 
1871             static if (isRandomAccessRange!S)
1872             {
1873                 alias FromType = S;
1874 
1875                 void popSource()
1876                 {
1877                     import std.algorithm.comparison : min;
1878 
1879                     static if (__traits(compiles, source[0 .. source.length]))
1880                     {
1881                         source = source[min(buf1.length, source.length)..source.length];
1882                     }
1883                     else static if (__traits(compiles, source[0..$]))
1884                     {
1885                         source = source[min(buf1.length, source.length)..$];
1886                     }
1887                     else
1888                     {
1889                         static assert(0, "S must have slicing for Map."
1890                                       ~ "  " ~ S.stringof ~ " doesn't.");
1891                     }
1892                 }
1893             }
1894             else static if (bufferTrick)
1895             {
1896                 // Make sure we don't have the buffer recycling overload of
1897                 // asyncBuf.
1898                 static if (
1899                     is(typeof(source.source)) &&
1900                     isRoundRobin!(typeof(source.source))
1901                 )
1902                 {
1903                     static assert(0, "Cannot execute a parallel map on " ~
1904                                   "the buffer recycling overload of asyncBuf."
1905                                  );
1906                 }
1907 
1908                 alias FromType = typeof(source.buf1);
1909                 FromType from;
1910 
1911                 // Just swap our input buffer with source's output buffer.
1912                 // No need to copy element by element.
1913                 FromType dumpToFrom()
1914                 {
1915                     import std.algorithm.mutation : swap;
1916 
1917                     assert(source.buf1.length <= from.length);
1918                     from.length = source.buf1.length;
1919                     swap(source.buf1, from);
1920 
1921                     // Just in case this source has been popped before
1922                     // being sent to map:
1923                     from = from[source.bufPos..$];
1924 
1925                     static if (is(typeof(source._length)))
1926                     {
1927                         source._length -= (from.length - source.bufPos);
1928                     }
1929 
1930                     source.doBufSwap();
1931 
1932                     return from;
1933                 }
1934             }
1935             else
1936             {
1937                 alias FromType = ElementType!S[];
1938 
1939                 // The temporary array that data is copied to before being
1940                 // mapped.
1941                 FromType from;
1942 
1943                 FromType dumpToFrom()
1944                 {
1945                     assert(from !is null);
1946 
1947                     size_t i;
1948                     for (; !source.empty && i < from.length; source.popFront())
1949                     {
1950                         from[i++] = source.front;
1951                     }
1952 
1953                     from = from[0 .. i];
1954                     return from;
1955                 }
1956             }
1957 
1958             static if (hasLength!S)
1959             {
1960                 size_t _length;
1961 
1962                 public @property size_t length() const @safe pure nothrow
1963                 {
1964                     return _length;
1965                 }
1966             }
1967 
1968                 this(S source, size_t bufSize, size_t workUnitSize, TaskPool pool)
1969                 {
1970                     static if (bufferTrick)
1971                     {
1972                         bufSize = source.buf1.length;
1973                     }
1974 
1975                     buf1.length = bufSize;
1976                     buf2.length = bufSize;
1977 
1978                     static if (!isRandomAccessRange!S)
1979                     {
1980                         from.length = bufSize;
1981                     }
1982 
1983                     this.workUnitSize = (workUnitSize == size_t.max) ?
1984                             pool.defaultWorkUnitSize(bufSize) : workUnitSize;
1985                     this.source = source;
1986                     this.pool = pool;
1987 
1988                     static if (hasLength!S)
1989                     {
1990                         _length = source.length;
1991                     }
1992 
1993                     buf1 = fillBuf(buf1);
1994                     submitBuf2();
1995                 }
1996 
1997                 // The from parameter is a dummy and ignored in the random access
1998                 // case.
1999                 E[] fillBuf(E[] buf)
2000                 {
2001                     import std.algorithm.comparison : min;
2002 
2003                     static if (isRandomAccessRange!S)
2004                     {
2005                         import std.range : take;
2006                         auto toMap = take(source, buf.length);
2007                         scope(success) popSource();
2008                     }
2009                     else
2010                     {
2011                         auto toMap = dumpToFrom();
2012                     }
2013 
2014                     buf = buf[0 .. min(buf.length, toMap.length)];
2015 
2016                     // Handle as a special case:
2017                     if (pool.size == 0)
2018                     {
2019                         size_t index = 0;
2020                         foreach (elem; toMap)
2021                         {
2022                             buf[index++] = fun(elem);
2023                         }
2024                         return buf;
2025                     }
2026 
2027                     pool.amap!functions(toMap, workUnitSize, buf);
2028 
2029                     return buf;
2030                 }
2031 
2032                 void submitBuf2()
2033                 in
2034                 {
2035                     assert(nextBufTask.prev is null);
2036                     assert(nextBufTask.next is null);
2037                 } body
2038                 {
2039                     // Hack to reuse the task object.
2040 
2041                     nextBufTask = typeof(nextBufTask).init;
2042                     nextBufTask._args[0] = &fillBuf;
2043                     nextBufTask._args[1] = buf2;
2044                     pool.put(nextBufTask);
2045                 }
2046 
2047                 void doBufSwap()
2048                 {
2049                     if (lastTaskWaited)
2050                     {
2051                         // Then the source is empty.  Signal it here.
2052                         buf1 = null;
2053                         buf2 = null;
2054 
2055                         static if (!isRandomAccessRange!S)
2056                         {
2057                             from = null;
2058                         }
2059 
2060                         return;
2061                     }
2062 
2063                     buf2 = buf1;
2064                     buf1 = nextBufTask.yieldForce;
2065                     bufPos = 0;
2066 
2067                     if (source.empty)
2068                     {
2069                         lastTaskWaited = true;
2070                     }
2071                     else
2072                     {
2073                         submitBuf2();
2074                     }
2075                 }
2076 
2077             public:
2078                 @property auto front()
2079                 {
2080                     return buf1[bufPos];
2081                 }
2082 
2083                 void popFront()
2084                 {
2085                     static if (hasLength!S)
2086                     {
2087                         _length--;
2088                     }
2089 
2090                     bufPos++;
2091                     if (bufPos >= buf1.length)
2092                     {
2093                         doBufSwap();
2094                     }
2095                 }
2096 
2097                 static if (isInfinite!S)
2098                 {
2099                     enum bool empty = false;
2100                 }
2101                 else
2102                 {
2103 
2104                     bool empty() const @property
2105                     {
2106                         // popFront() sets this when source is empty
2107                         return buf1.length == 0;
2108                     }
2109                 }
2110             }
2111             return new Map(source, bufSize, workUnitSize, this);
2112         }
2113     }
2114 
2115     /**
2116     Given a $(D source) range that is expensive to iterate over, returns an
2117     input range that asynchronously buffers the contents of
2118     $(D source) into a buffer of $(D bufSize) elements in a worker thread,
2119     while making previously buffered elements from a second buffer, also of size
2120     $(D bufSize), available via the range interface of the returned
2121     object.  The returned range has a length iff $(D hasLength!S).
2122     $(D asyncBuf) is useful, for example, when performing expensive operations
2123     on the elements of ranges that represent data on a disk or network.
2124 
2125     Example:
2126     ---
    import std.algorithm, std.array, std.conv, std.parallelism, std.stdio;
2128 
2129     void main()
2130     {
2131         // Fetch lines of a file in a background thread
2132         // while processing previously fetched lines,
2133         // dealing with byLine's buffer recycling by
2134         // eagerly duplicating every line.
2135         auto lines = File("foo.txt").byLine();
2136         auto duped = std.algorithm.map!"a.idup"(lines);
2137 
2138         // Fetch more lines in the background while we
2139         // process the lines already read into memory
2140         // into a matrix of doubles.
2141         double[][] matrix;
2142         auto asyncReader = taskPool.asyncBuf(duped);
2143 
2144         foreach (line; asyncReader)
2145         {
2146             auto ls = line.split("\t");
2147             matrix ~= to!(double[])(ls);
2148         }
2149     }
2150     ---
2151 
2152     $(B Exception Handling):
2153 
2154     Any exceptions thrown while iterating over $(D source) are re-thrown on a
2155     call to $(D popFront) or, if thrown during construction, simply
2156     allowed to propagate to the caller.
2157     */
2158     auto asyncBuf(S)(S source, size_t bufSize = 100) if (isInputRange!S)
2159     {
2160         static final class AsyncBuf
2161         {
2162             // This is a class because the task and source both need to be on
2163             // the heap.
2164 
2165             // The element type of S.
2166             alias E = ElementType!S;  // Needs to be here b/c of forward ref bugs.
2167 
2168         private:
2169             E[] buf1, buf2;
2170             S source;
2171             TaskPool pool;
2172             Task!(run, E[] delegate(E[]), E[]) nextBufTask;
2173             size_t bufPos;
2174             bool lastTaskWaited;
2175 
2176             static if (hasLength!S)
2177             {
2178                 size_t _length;
2179 
2180                 // Available if hasLength!S.
length()2181                 public @property size_t length() const @safe pure nothrow
2182                 {
2183                     return _length;
2184                 }
2185             }
2186 
this(S source,size_t bufSize,TaskPool pool)2187             this(S source, size_t bufSize, TaskPool pool)
2188             {
2189                 buf1.length = bufSize;
2190                 buf2.length = bufSize;
2191 
2192                 this.source = source;
2193                 this.pool = pool;
2194 
2195                 static if (hasLength!S)
2196                 {
2197                     _length = source.length;
2198                 }
2199 
2200                 buf1 = fillBuf(buf1);
2201                 submitBuf2();
2202             }
2203 
fillBuf(E[]buf)2204             E[] fillBuf(E[] buf)
2205             {
2206                 assert(buf !is null);
2207 
2208                 size_t i;
2209                 for (; !source.empty && i < buf.length; source.popFront())
2210                 {
2211                     buf[i++] = source.front;
2212                 }
2213 
2214                 buf = buf[0 .. i];
2215                 return buf;
2216             }
2217 
submitBuf2()2218             void submitBuf2()
2219             in
2220             {
2221                 assert(nextBufTask.prev is null);
2222                 assert(nextBufTask.next is null);
2223             } body
2224             {
2225                 // Hack to reuse the task object.
2226 
2227                 nextBufTask = typeof(nextBufTask).init;
2228                 nextBufTask._args[0] = &fillBuf;
2229                 nextBufTask._args[1] = buf2;
2230                 pool.put(nextBufTask);
2231             }
2232 
doBufSwap()2233             void doBufSwap()
2234             {
2235                 if (lastTaskWaited)
2236                 {
2237                     // Then source is empty.  Signal it here.
2238                     buf1 = null;
2239                     buf2 = null;
2240                     return;
2241                 }
2242 
2243                 buf2 = buf1;
2244                 buf1 = nextBufTask.yieldForce;
2245                 bufPos = 0;
2246 
2247                 if (source.empty)
2248                 {
2249                     lastTaskWaited = true;
2250                 }
2251                 else
2252                 {
2253                     submitBuf2();
2254                 }
2255             }
2256 
2257         public:
front()2258             E front() @property
2259             {
2260                 return buf1[bufPos];
2261             }
2262 
popFront()2263             void popFront()
2264             {
2265                 static if (hasLength!S)
2266                 {
2267                     _length--;
2268                 }
2269 
2270                 bufPos++;
2271                 if (bufPos >= buf1.length)
2272                 {
2273                     doBufSwap();
2274                 }
2275             }
2276 
2277             static if (isInfinite!S)
2278             {
2279                 enum bool empty = false;
2280             }
2281 
2282             else
2283             {
2284                 ///
empty()2285                 bool empty() @property
2286                 {
2287                     // popFront() sets this when source is empty:
2288                     return buf1.length == 0;
2289                 }
2290             }
2291         }
2292         return new AsyncBuf(source, bufSize, this);
2293     }
2294 
2295     /**
2296     Given a callable object $(D next) that writes to a user-provided buffer and
2297     a second callable object $(D empty) that determines whether more data is
2298     available to write via $(D next), returns an input range that
2299     asynchronously calls $(D next) with a set of size $(D nBuffers) of buffers
2300     and makes the results available in the order they were obtained via the
2301     input range interface of the returned object.  Similarly to the
2302     input range overload of $(D asyncBuf), the first half of the buffers
2303     are made available via the range interface while the second half are
2304     filled and vice-versa.
2305 
2306     Params:
2307 
2308     next = A callable object that takes a single argument that must be an array
2309            with mutable elements.  When called, $(D next) writes data to
2310            the array provided by the caller.
2311 
2312     empty = A callable object that takes no arguments and returns a type
2313             implicitly convertible to $(D bool).  This is used to signify
2314             that no more data is available to be obtained by calling $(D next).
2315 
2316     initialBufSize = The initial size of each buffer.  If $(D next) takes its
2317                      array by reference, it may resize the buffers.
2318 
2319     nBuffers = The number of buffers to cycle through when calling $(D next).
2320 
2321     Example:
2322     ---
2323     // Fetch lines of a file in a background
2324     // thread while processing previously fetched
2325     // lines, without duplicating any lines.
2326     auto file = File("foo.txt");
2327 
2328     void next(ref char[] buf)
2329     {
2330         file.readln(buf);
2331     }
2332 
2333     // Fetch more lines in the background while we
2334     // process the lines already read into memory
2335     // into a matrix of doubles.
2336     double[][] matrix;
2337     auto asyncReader = taskPool.asyncBuf(&next, &file.eof);
2338 
2339     foreach (line; asyncReader)
2340     {
2341         auto ls = line.split("\t");
2342         matrix ~= to!(double[])(ls);
2343     }
2344     ---
2345 
2346     $(B Exception Handling):
2347 
2348     Any exceptions thrown while iterating over $(D range) are re-thrown on a
2349     call to $(D popFront).
2350 
2351     Warning:
2352 
2353     Using the range returned by this function in a parallel foreach loop
2354     will not work because buffers may be overwritten while the task that
2355     processes them is in queue.  This is checked for at compile time
2356     and will result in a static assertion failure.
2357     */
2358     auto asyncBuf(C1, C2)(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100)
2359     if (is(typeof(C2.init()) : bool) &&
2360         Parameters!C1.length == 1 &&
2361         Parameters!C2.length == 0 &&
2362         isArray!(Parameters!C1[0])
2363     ) {
2364         auto roundRobin = RoundRobinBuffer!(C1, C2)(next, empty, initialBufSize, nBuffers);
2365         return asyncBuf(roundRobin, nBuffers / 2);
2366     }
2367 
2368     ///
reduce(functions...)2369     template reduce(functions...)
2370     {
2371         /**
2372         Parallel reduce on a random access range.  Except as otherwise noted,
2373         usage is similar to $(REF _reduce, std,algorithm,iteration).  This
2374         function works by splitting the range to be reduced into work units,
2375         which are slices to be reduced in parallel.  Once the results from all
2376         work units are computed, a final serial reduction is performed on these
2377         results to compute the final answer. Therefore, care must be taken to
2378         choose the seed value appropriately.
2379 
2380         Because the reduction is being performed in parallel, $(D functions)
2381         must be associative.  For notational simplicity, let # be an
2382         infix operator representing $(D functions).  Then, (a # b) # c must equal
2383         a # (b # c).  Floating point addition is not associative
2384         even though addition in exact arithmetic is.  Summing floating
2385         point numbers using this function may give different results than summing
2386         serially.  However, for many practical purposes floating point addition
2387         can be treated as associative.
2388 
2389         Note that, since $(D functions) are assumed to be associative,
2390         additional optimizations are made to the serial portion of the reduction
2391         algorithm. These take advantage of the instruction level parallelism of
2392         modern CPUs, in addition to the thread-level parallelism that the rest
2393         of this module exploits.  This can lead to better than linear speedups
2394         relative to $(REF _reduce, std,algorithm,iteration), especially for
2395         fine-grained benchmarks like dot products.
2396 
2397         An explicit seed may be provided as the first argument.  If
2398         provided, it is used as the seed for all work units and for the final
2399         reduction of results from all work units.  Therefore, if it is not the
2400         identity value for the operation being performed, results may differ
2401         from those generated by $(REF _reduce, std,algorithm,iteration) or
2402         depending on how many work units are used.  The next argument must be
2403         the range to be reduced.
2404         ---
2405         // Find the sum of squares of a range in parallel, using
2406         // an explicit seed.
2407         //
2408         // Timings on an Athlon 64 X2 dual core machine:
2409         //
2410         // Parallel reduce:                     72 milliseconds
2411         // Using std.algorithm.reduce instead:  181 milliseconds
2412         auto nums = iota(10_000_000.0f);
2413         auto sumSquares = taskPool.reduce!"a + b"(
2414             0.0, std.algorithm.map!"a * a"(nums)
2415         );
2416         ---
2417 
2418         If no explicit seed is provided, the first element of each work unit
2419         is used as a seed.  For the final reduction, the result from the first
2420         work unit is used as the seed.
2421         ---
2422         // Find the sum of a range in parallel, using the first
2423         // element of each work unit as the seed.
2424         auto sum = taskPool.reduce!"a + b"(nums);
2425         ---
2426 
2427         An explicit work unit size may be specified as the last argument.
2428         Specifying too small a work unit size will effectively serialize the
2429         reduction, as the final reduction of the result of each work unit will
2430         dominate computation time.  If $(D TaskPool.size) for this instance
2431         is zero, this parameter is ignored and one work unit is used.
2432         ---
2433         // Use a work unit size of 100.
2434         auto sum2 = taskPool.reduce!"a + b"(nums, 100);
2435 
2436         // Work unit size of 100 and explicit seed.
2437         auto sum3 = taskPool.reduce!"a + b"(0.0, nums, 100);
2438         ---
2439 
2440         Parallel reduce supports multiple functions, like
2441         $(D std.algorithm.reduce).
2442         ---
2443         // Find both the min and max of nums.
2444         auto minMax = taskPool.reduce!(min, max)(nums);
2445         assert(minMax[0] == reduce!min(nums));
2446         assert(minMax[1] == reduce!max(nums));
2447         ---
2448 
2449         $(B Exception Handling):
2450 
2451         After this function is finished executing, any exceptions thrown
2452         are chained together via $(D Throwable.next) and rethrown.  The chaining
2453         order is non-deterministic.
2454          */
2455         auto reduce(Args...)(Args args)
2456         {
2457             import core.exception : OutOfMemoryError;
2458             import std.conv : emplaceRef;
2459             import std.exception : enforce;
2460 
2461             alias fun = reduceAdjoin!functions;
2462             alias finishFun = reduceFinish!functions;
2463 
2464             static if (isIntegral!(Args[$ - 1]))
2465             {
2466                 size_t workUnitSize = cast(size_t) args[$ - 1];
2467                 alias args2 = args[0..$ - 1];
2468                 alias Args2 = Args[0..$ - 1];
2469             }
2470             else
2471             {
2472                 alias args2 = args;
2473                 alias Args2 = Args;
2474             }
2475 
2476             auto makeStartValue(Type)(Type e)
2477             {
2478                 static if (functions.length == 1)
2479                 {
2480                     return e;
2481                 }
2482                 else
2483                 {
2484                     typeof(adjoin!(staticMap!(binaryFun, functions))(e, e)) seed = void;
2485                     foreach (i, T; seed.Types)
2486                     {
2487                         emplaceRef(seed.expand[i], e);
2488                     }
2489 
2490                     return seed;
2491                 }
2492             }
2493 
2494             static if (args2.length == 2)
2495             {
2496                 static assert(isInputRange!(Args2[1]));
2497                 alias range = args2[1];
2498                 alias seed = args2[0];
2499                 enum explicitSeed = true;
2500 
2501                 static if (!is(typeof(workUnitSize)))
2502                 {
2503                     size_t workUnitSize = defaultWorkUnitSize(range.length);
2504                 }
2505             }
2506             else
2507             {
2508                 static assert(args2.length == 1);
2509                 alias range = args2[0];
2510 
2511                 static if (!is(typeof(workUnitSize)))
2512                 {
2513                     size_t workUnitSize = defaultWorkUnitSize(range.length);
2514                 }
2515 
2516                 enforce(!range.empty,
2517                     "Cannot reduce an empty range with first element as start value.");
2518 
2519                 auto seed = makeStartValue(range.front);
2520                 enum explicitSeed = false;
2521                 range.popFront();
2522             }
2523 
2524             alias E = typeof(seed);
2525             alias R = typeof(range);
2526 
2527             E reduceOnRange(R range, size_t lowerBound, size_t upperBound)
2528             {
2529                 // This is for exploiting instruction level parallelism by
2530                 // using multiple accumulator variables within each thread,
2531                 // since we're assuming functions are associative anyhow.
2532 
2533                 // This is so that loops can be unrolled automatically.
2534                 enum ilpTuple = AliasSeq!(0, 1, 2, 3, 4, 5);
2535                 enum nILP = ilpTuple.length;
2536                 immutable subSize = (upperBound - lowerBound) / nILP;
2537 
2538                 if (subSize <= 1)
2539                 {
2540                     // Handle as a special case.
2541                     static if (explicitSeed)
2542                     {
2543                         E result = seed;
2544                     }
2545                     else
2546                     {
2547                         E result = makeStartValue(range[lowerBound]);
2548                         lowerBound++;
2549                     }
2550 
2551                     foreach (i; lowerBound .. upperBound)
2552                     {
2553                         result = fun(result, range[i]);
2554                     }
2555 
2556                     return result;
2557                 }
2558 
2559                 assert(subSize > 1);
2560                 E[nILP] results;
2561                 size_t[nILP] offsets;
2562 
2563                 foreach (i; ilpTuple)
2564                 {
2565                     offsets[i] = lowerBound + subSize * i;
2566 
2567                     static if (explicitSeed)
2568                     {
2569                         results[i] = seed;
2570                     }
2571                     else
2572                     {
2573                         results[i] = makeStartValue(range[offsets[i]]);
2574                         offsets[i]++;
2575                     }
2576                 }
2577 
2578                 immutable nLoop = subSize - (!explicitSeed);
2579                 foreach (i; 0 .. nLoop)
2580                 {
2581                     foreach (j; ilpTuple)
2582                     {
2583                         results[j] = fun(results[j], range[offsets[j]]);
2584                         offsets[j]++;
2585                     }
2586                 }
2587 
2588                 // Finish the remainder.
2589                 foreach (i; nILP * subSize + lowerBound .. upperBound)
2590                 {
2591                     results[$ - 1] = fun(results[$ - 1], range[i]);
2592                 }
2593 
2594                 foreach (i; ilpTuple[1..$])
2595                 {
2596                     results[0] = finishFun(results[0], results[i]);
2597                 }
2598 
2599                 return results[0];
2600             }
2601 
2602             immutable len = range.length;
2603             if (len == 0)
2604             {
2605                 return seed;
2606             }
2607 
2608             if (this.size == 0)
2609             {
2610                 return finishFun(seed, reduceOnRange(range, 0, len));
2611             }
2612 
2613             // Unlike the rest of the functions here, I can't use the Task object
2614             // recycling trick here because this has to work on non-commutative
2615             // operations.  After all the tasks are done executing, fun() has to
2616             // be applied on the results of these to get a final result, but
2617             // it can't be evaluated out of order.
2618 
2619             if (workUnitSize > len)
2620             {
2621                 workUnitSize = len;
2622             }
2623 
2624             immutable size_t nWorkUnits = (len / workUnitSize) + ((len % workUnitSize == 0) ? 0 : 1);
2625             assert(nWorkUnits * workUnitSize >= len);
2626 
2627             alias RTask = Task!(run, typeof(&reduceOnRange), R, size_t, size_t);
2628             RTask[] tasks;
2629 
2630             // Can't use alloca() due to Bug 3753.  Use a fixed buffer
2631             // backed by malloc().
2632             enum maxStack = 2_048;
2633             byte[maxStack] buf = void;
2634             immutable size_t nBytesNeeded = nWorkUnits * RTask.sizeof;
2635 
2636             import core.stdc.stdlib : malloc, free;
2637             if (nBytesNeeded < maxStack)
2638             {
2639                 tasks = (cast(RTask*) buf.ptr)[0 .. nWorkUnits];
2640             }
2641             else
2642             {
2643                 auto ptr = cast(RTask*) malloc(nBytesNeeded);
2644                 if (!ptr)
2645                 {
2646                     throw new OutOfMemoryError(
2647                         "Out of memory in std.parallelism."
2648                     );
2649                 }
2650 
2651                 tasks = ptr[0 .. nWorkUnits];
2652             }
2653 
2654             scope(exit)
2655             {
2656                 if (nBytesNeeded > maxStack)
2657                 {
2658                     free(tasks.ptr);
2659                 }
2660             }
2661 
2662             foreach (ref t; tasks[])
2663                 emplaceRef(t, RTask());
2664 
2665             // Hack to take the address of a nested function w/o
2666             // making a closure.
2667             static auto scopedAddress(D)(scope D del) @system
2668             {
2669                 auto tmp = del;
2670                 return tmp;
2671             }
2672 
2673             size_t curPos = 0;
2674             void useTask(ref RTask task)
2675             {
2676                 import std.algorithm.comparison : min;
2677 
2678                 task.pool = this;
2679                 task._args[0] = scopedAddress(&reduceOnRange);
2680                 task._args[3] = min(len, curPos + workUnitSize);  // upper bound.
2681                 task._args[1] = range;  // range
2682                 task._args[2] = curPos; // lower bound.
2683 
2684                 curPos += workUnitSize;
2685             }
2686 
2687             foreach (ref task; tasks)
2688             {
2689                 useTask(task);
2690             }
2691 
2692             foreach (i; 1 .. tasks.length - 1)
2693             {
2694                 tasks[i].next = tasks[i + 1].basePtr;
2695                 tasks[i + 1].prev = tasks[i].basePtr;
2696             }
2697 
2698             if (tasks.length > 1)
2699             {
2700                 queueLock();
2701                 scope(exit) queueUnlock();
2702 
2703                 abstractPutGroupNoSync(
2704                     tasks[1].basePtr,
2705                     tasks[$ - 1].basePtr
2706                 );
2707             }
2708 
2709             if (tasks.length > 0)
2710             {
2711                 try
2712                 {
2713                     tasks[0].job();
2714                 }
2715                 catch (Throwable e)
2716                 {
2717                     tasks[0].exception = e;
2718                 }
2719                 tasks[0].taskStatus = TaskStatus.done;
2720 
2721                 // Try to execute each of these in the current thread
2722                 foreach (ref task; tasks[1..$])
2723                 {
2724                     tryDeleteExecute(task.basePtr);
2725                 }
2726             }
2727 
2728             // Now that we've tried to execute every task, they're all either
2729             // done or in progress.  Force all of them.
2730             E result = seed;
2731 
2732             Throwable firstException, lastException;
2733 
2734             foreach (ref task; tasks)
2735             {
2736                 try
2737                 {
2738                     task.yieldForce;
2739                 }
2740                 catch (Throwable e)
2741                 {
2742                     addToChain(e, firstException, lastException);
2743                     continue;
2744                 }
2745 
2746                 if (!firstException) result = finishFun(result, task.returnVal);
2747             }
2748 
2749             if (firstException) throw firstException;
2750 
2751             return result;
2752         }
2753     }
2754 
2755     /**
2756     Gets the index of the current thread relative to this $(D TaskPool).  Any
2757     thread not in this pool will receive an index of 0.  The worker threads in
2758     this pool receive unique indices of 1 through $(D this.size).
2759 
2760     This function is useful for maintaining worker-local resources.
2761 
2762     Example:
2763     ---
2764     // Execute a loop that computes the greatest common
2765     // divisor of every number from 0 through 999 with
2766     // 42 in parallel.  Write the results out to
2767     // a set of files, one for each thread.  This allows
2768     // results to be written out without any synchronization.
2769 
2770     import std.conv, std.range, std.numeric, std.stdio;
2771 
2772     void main()
2773     {
        auto fileHandles = new File[taskPool.size + 1];
2775         scope(exit) {
2776             foreach (ref handle; fileHandles)
2777             {
2778                 handle.close();
2779             }
2780         }
2781 
2782         foreach (i, ref handle; fileHandles)
2783         {
            handle = File("workerResults" ~ to!string(i) ~ ".txt", "w");
2785         }
2786 
2787         foreach (num; parallel(iota(1_000)))
2788         {
2789             auto outHandle = fileHandles[taskPool.workerIndex];
2790             outHandle.writeln(num, '\t', gcd(num, 42));
2791         }
2792     }
2793     ---
2794     */
workerIndex()2795     size_t workerIndex() @property @safe const nothrow
2796     {
2797         immutable rawInd = threadIndex;
2798         return (rawInd >= instanceStartIndex && rawInd < instanceStartIndex + size) ?
2799                 (rawInd - instanceStartIndex + 1) : 0;
2800     }
2801 
2802     /**
2803     Struct for creating worker-local storage.  Worker-local storage is
2804     thread-local storage that exists only for worker threads in a given
2805     $(D TaskPool) plus a single thread outside the pool.  It is allocated on the
2806     garbage collected heap in a way that avoids _false sharing, and doesn't
2807     necessarily have global scope within any thread.  It can be accessed from
2808     any worker thread in the $(D TaskPool) that created it, and one thread
2809     outside this $(D TaskPool).  All threads outside the pool that created a
2810     given instance of worker-local storage share a single slot.
2811 
2812     Since the underlying data for this struct is heap-allocated, this struct
2813     has reference semantics when passed between functions.
2814 
    The main use cases for $(D WorkerLocalStorage) are:
2816 
2817     1.  Performing parallel reductions with an imperative, as opposed to
2818     functional, programming style.  In this case, it's useful to treat
    $(D WorkerLocalStorage) as local to each thread for only the parallel
2820     portion of an algorithm.
2821 
2822     2.  Recycling temporary buffers across iterations of a parallel foreach loop.
2823 
2824     Example:
2825     ---
2826     // Calculate pi as in our synopsis example, but
2827     // use an imperative instead of a functional style.
2828     immutable n = 1_000_000_000;
2829     immutable delta = 1.0L / n;
2830 
2831     auto sums = taskPool.workerLocalStorage(0.0L);
2832     foreach (i; parallel(iota(n)))
2833     {
2834         immutable x = ( i - 0.5L ) * delta;
2835         immutable toAdd = delta / ( 1.0 + x * x );
2836         sums.get += toAdd;
2837     }
2838 
2839     // Add up the results from each worker thread.
2840     real pi = 0;
2841     foreach (threadResult; sums.toRange)
2842     {
2843         pi += 4.0L * threadResult;
2844     }
2845     ---
2846      */
WorkerLocalStorage(T)2847     static struct WorkerLocalStorage(T)
2848     {
2849     private:
2850         TaskPool pool;
2851         size_t size;
2852 
2853         size_t elemSize;
2854         bool* stillThreadLocal;
2855 
2856         static size_t roundToLine(size_t num) pure nothrow
2857         {
2858             if (num % cacheLineSize == 0)
2859             {
2860                 return num;
2861             }
2862             else
2863             {
2864                 return ((num / cacheLineSize) + 1) * cacheLineSize;
2865             }
2866         }
2867 
2868         void* data;
2869 
2870         void initialize(TaskPool pool)
2871         {
2872             this.pool = pool;
2873             size = pool.size + 1;
2874             stillThreadLocal = new bool;
2875             *stillThreadLocal = true;
2876 
2877             // Determines whether the GC should scan the array.
2878             auto blkInfo = (typeid(T).flags & 1) ?
2879                            cast(GC.BlkAttr) 0 :
2880                            GC.BlkAttr.NO_SCAN;
2881 
2882             immutable nElem = pool.size + 1;
2883             elemSize = roundToLine(T.sizeof);
2884 
2885             // The + 3 is to pad one full cache line worth of space on either side
2886             // of the data structure to make sure false sharing with completely
2887             // unrelated heap data is prevented, and to provide enough padding to
2888             // make sure that data is cache line-aligned.
2889             data = GC.malloc(elemSize * (nElem + 3), blkInfo) + elemSize;
2890 
2891             // Cache line align data ptr.
2892             data = cast(void*) roundToLine(cast(size_t) data);
2893 
2894             foreach (i; 0 .. nElem)
2895             {
2896                 this.opIndex(i) = T.init;
2897             }
2898         }
2899 
2900         ref opIndex(this Qualified)(size_t index)
2901         {
2902             import std.conv : text;
2903             assert(index < size, text(index, '\t', uint.max));
2904             return *(cast(CopyTypeQualifiers!(Qualified, T)*) (data + elemSize * index));
2905         }
2906 
2907         void opIndexAssign(T val, size_t index)
2908         {
2909             assert(index < size);
2910             *(cast(T*) (data + elemSize * index)) = val;
2911         }
2912 
2913     public:
2914         /**
2915         Get the current thread's instance.  Returns by ref.
2916         Note that calling $(D get) from any thread
2917         outside the $(D TaskPool) that created this instance will return the
2918         same reference, so an instance of worker-local storage should only be
2919         accessed from one thread outside the pool that created it.  If this
2920         rule is violated, undefined behavior will result.
2921 
2922         If assertions are enabled and $(D toRange) has been called, then this
2923         WorkerLocalStorage instance is no longer worker-local and an assertion
2924         failure will result when calling this method.  This is not checked
2925         when assertions are disabled for performance reasons.
2926          */
2927         ref get(this Qualified)() @property
2928         {
2929             assert(*stillThreadLocal,
2930                 "Cannot call get() on this instance of WorkerLocalStorage " ~
2931                 "because it is no longer worker-local."
2932             );
2933             return opIndex(pool.workerIndex);
2934         }
2935 
2936         /**
2937         Assign a value to the current thread's instance.  This function has
2938         the same caveats as its overload.
2939         */
2940         void get(T val) @property
2941         {
2942             assert(*stillThreadLocal,
2943                 "Cannot call get() on this instance of WorkerLocalStorage " ~
2944                 "because it is no longer worker-local."
2945             );
2946 
2947             opIndexAssign(val, pool.workerIndex);
2948         }
2949 
2950         /**
2951         Returns a range view of the values for all threads, which can be used
2952         to further process the results of each thread after running the parallel
2953         part of your algorithm.  Do not use this method in the parallel portion
2954         of your algorithm.
2955 
2956         Calling this function sets a flag indicating that this struct is no
2957         longer worker-local, and attempting to use the $(D get) method again
2958         will result in an assertion failure if assertions are enabled.
2959          */
2960         WorkerLocalStorageRange!T toRange() @property
2961         {
2962             if (*stillThreadLocal)
2963             {
2964                 *stillThreadLocal = false;
2965 
2966                 // Make absolutely sure results are visible to all threads.
2967                 // This is probably not necessary since some other
2968                 // synchronization primitive will be used to signal that the
2969                 // parallel part of the algorithm is done, but the
2970                 // performance impact should be negligible, so it's better
2971                 // to be safe.
2972                 ubyte barrierDummy;
2973                 atomicSetUbyte(barrierDummy, 1);
2974             }
2975 
2976             return WorkerLocalStorageRange!T(this);
2977         }
2978     }
2979 
2980     /**
2981     Range primitives for worker-local storage.  The purpose of this is to
2982     access results produced by each worker thread from a single thread once you
2983     are no longer using the worker-local storage from multiple threads.
2984     Do not use this struct in the parallel portion of your algorithm.
2985 
2986     The proper way to instantiate this object is to call
2987     $(D WorkerLocalStorage.toRange).  Once instantiated, this object behaves
2988     as a finite random-access range with assignable, lvalue elements and
2989     a length equal to the number of worker threads in the $(D TaskPool) that
2990     created it plus 1.
2991      */
WorkerLocalStorageRange(T)2992     static struct WorkerLocalStorageRange(T)
2993     {
2994     private:
2995         WorkerLocalStorage!T workerLocalStorage;
2996 
2997         size_t _length;
2998         size_t beginOffset;
2999 
3000         this(WorkerLocalStorage!T wl)
3001         {
3002             this.workerLocalStorage = wl;
3003             _length = wl.size;
3004         }
3005 
3006     public:
3007         ref front(this Qualified)() @property
3008         {
3009             return this[0];
3010         }
3011 
3012         ref back(this Qualified)() @property
3013         {
3014             return this[_length - 1];
3015         }
3016 
3017         void popFront()
3018         {
3019             if (_length > 0)
3020             {
3021                 beginOffset++;
3022                 _length--;
3023             }
3024         }
3025 
3026         void popBack()
3027         {
3028             if (_length > 0)
3029             {
3030                 _length--;
3031             }
3032         }
3033 
3034         typeof(this) save() @property
3035         {
3036             return this;
3037         }
3038 
3039         ref opIndex(this Qualified)(size_t index)
3040         {
3041             assert(index < _length);
3042             return workerLocalStorage[index + beginOffset];
3043         }
3044 
3045         void opIndexAssign(T val, size_t index)
3046         {
3047             assert(index < _length);
3048             workerLocalStorage[index] = val;
3049         }
3050 
3051         typeof(this) opSlice(size_t lower, size_t upper)
3052         {
3053             assert(upper <= _length);
3054             auto newWl = this.workerLocalStorage;
3055             newWl.data += lower * newWl.elemSize;
3056             newWl.size = upper - lower;
3057             return typeof(this)(newWl);
3058         }
3059 
3060         bool empty() const @property
3061         {
3062             return length == 0;
3063         }
3064 
3065         size_t length() const @property
3066         {
3067             return _length;
3068         }
3069     }
3070 
3071     /**
3072     Creates an instance of worker-local storage, initialized with a given
3073     value.  The value is $(D lazy) so that you can, for example, easily
3074     create one instance of a class for each worker.  For usage example,
3075     see the $(D WorkerLocalStorage) struct.
3076      */
3077     WorkerLocalStorage!T workerLocalStorage(T)(lazy T initialVal = T.init)
3078     {
3079         WorkerLocalStorage!T ret;
3080         ret.initialize(this);
3081         foreach (i; 0 .. size + 1)
3082         {
3083             ret[i] = initialVal;
3084         }
3085 
3086         // Memory barrier to make absolutely sure that what we wrote is
3087         // visible to worker threads.
3088         ubyte barrierDummy;
3089         atomicSetUbyte(barrierDummy, 0);
3090 
3091         return ret;
3092     }
3093 
3094     /**
3095     Signals to all worker threads to terminate as soon as they are finished
3096     with their current $(D Task), or immediately if they are not executing a
3097     $(D Task).  $(D Task)s that were in queue will not be executed unless
3098     a call to $(D Task.workForce), $(D Task.yieldForce) or $(D Task.spinForce)
3099     causes them to be executed.
3100 
3101     Use only if you have waited on every $(D Task) and therefore know the
3102     queue is empty, or if you speculatively executed some tasks and no longer
3103     need the results.
3104      */
    void stop() @trusted
    {
        // Change the pool state while holding the queue lock; scope(exit)
        // releases the lock on every way out of this function.
        queueLock();
        scope(exit) queueUnlock();

        // Switch the pool into the stopNow state.
        atomicSetUbyte(status, PoolState.stopNow);

        // NOTE(review): presumably wakes waiting workers so they observe the
        // new state -- confirm against notifyAll.
        notifyAll();
    }
3112 
3113     /**
3114     Signals worker threads to terminate when the queue becomes empty.
3115 
3116     If blocking argument is true, wait for all worker threads to terminate
3117     before returning.  This option might be used in applications where
3118     task results are never consumed-- e.g. when $(D TaskPool) is employed as a
3119     rudimentary scheduler for tasks which communicate by means other than
3120     return values.
3121 
3122     Warning:  Calling this function with $(D blocking = true) from a worker
3123               thread that is a member of the same $(D TaskPool) that
3124               $(D finish) is being called on will result in a deadlock.
3125      */
3126     void finish(bool blocking = false) @trusted
3127     {
3128         {
3129             queueLock();
3130             scope(exit) queueUnlock();
3131             atomicCasUbyte(status, PoolState.running, PoolState.finishing);
3132             notifyAll();
3133         }
3134         if (blocking)
3135         {
3136             // Use this thread as a worker until everything is finished.
3137             executeWorkLoop();
3138 
foreach(t;pool)3139             foreach (t; pool)
3140             {
3141                 // Maybe there should be something here to prevent a thread
3142                 // from calling join() on itself if this function is called
3143                 // from a worker thread in the same pool, but:
3144                 //
3145                 // 1.  Using an if statement to skip join() would result in
3146                 //     finish() returning without all tasks being finished.
3147                 //
3148                 // 2.  If an exception were thrown, it would bubble up to the
3149                 //     Task from which finish() was called and likely be
3150                 //     swallowed.
3151                 t.join();
3152             }
3153         }
3154     }
3155 
3156     /// Returns the number of worker threads in the pool.
    @property size_t size() @safe const pure nothrow
    {
        // The worker count is the length of the `pool` array.
        return pool.length;
    }
3161 
3162     /**
3163     Put a $(D Task) object on the back of the task queue.  The $(D Task)
3164     object may be passed by pointer or reference.
3165 
3166     Example:
3167     ---
3168     import std.file;
3169 
3170     // Create a task.
3171     auto t = task!read("foo.txt");
3172 
3173     // Add it to the queue to be executed.
3174     taskPool.put(t);
3175     ---
3176 
3177     Notes:
3178 
3179     @trusted overloads of this function are called for $(D Task)s if
3180     $(REF hasUnsharedAliasing, std,traits) is false for the $(D Task)'s
3181     return type or the function the $(D Task) executes is $(D pure).
3182     $(D Task) objects that meet all other requirements specified in the
3183     $(D @trusted) overloads of $(D task) and $(D scopedTask) may be created
3184     and executed from $(D @safe) code via $(D Task.executeInNewThread) but
3185     not via $(D TaskPool).
3186 
3187     While this function takes the address of variables that may
3188     be on the stack, some overloads are marked as @trusted.
3189     $(D Task) includes a destructor that waits for the task to complete
3190     before destroying the stack frame it is allocated on.  Therefore,
3191     it is impossible for the stack frame to be destroyed before the task is
3192     complete and no longer referenced by a $(D TaskPool).
3193     */
    void put(alias fun, Args...)(ref Task!(fun, Args) task)
    if (!isSafeReturn!(typeof(task)))
    {
        // By-reference overload for tasks where isSafeReturn is false.
        // Record this pool as the task's owner, then enqueue it through its
        // base pointer.
        task.pool = this;
        abstractPut(task.basePtr);
    }

    /// Ditto
    void put(alias fun, Args...)(Task!(fun, Args)* task)
    if (!isSafeReturn!(typeof(*task)))
    {
        // By-pointer overload: reject null, then forward to the by-reference
        // overload above.
        import std.exception : enforce;
        enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
        put(*task);
    }

    // @trusted counterpart of the by-reference overload, selected when
    // isSafeReturn is true for the task type.  The body is the same.
    @trusted void put(alias fun, Args...)(ref Task!(fun, Args) task)
    if (isSafeReturn!(typeof(task)))
    {
        task.pool = this;
        abstractPut(task.basePtr);
    }

    // @trusted counterpart of the by-pointer overload, selected when
    // isSafeReturn is true for the pointed-to task type.
    @trusted void put(alias fun, Args...)(Task!(fun, Args)* task)
    if (isSafeReturn!(typeof(*task)))
    {
        import std.exception : enforce;
        enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
        put(*task);
    }
3224 
3225     /**
3226     These properties control whether the worker threads are daemon threads.
3227     A daemon thread is automatically terminated when all non-daemon threads
3228     have terminated.  A non-daemon thread will prevent a program from
3229     terminating as long as it has not terminated.
3230 
3231     If any $(D TaskPool) with non-daemon threads is active, either $(D stop)
3232     or $(D finish) must be called on it before the program can terminate.
3233 
    The worker threads in the $(D TaskPool) instance returned by the
3235     $(D taskPool) property are daemon by default.  The worker threads of
3236     manually instantiated task pools are non-daemon by default.
3237 
3238     Note:  For a size zero pool, the getter arbitrarily returns true and the
3239            setter has no effect.
3240     */
isDaemon()3241     bool isDaemon() @property @trusted
3242     {
3243         queueLock();
3244         scope(exit) queueUnlock();
3245         return (size == 0) ? true : pool[0].isDaemon;
3246     }
3247 
3248     /// Ditto
isDaemon(bool newVal)3249     void isDaemon(bool newVal) @property @trusted
3250     {
3251         queueLock();
3252         scope(exit) queueUnlock();
3253         foreach (thread; pool)
3254         {
3255             thread.isDaemon = newVal;
3256         }
3257     }
3258 
3259     /**
3260     These functions allow getting and setting the OS scheduling priority of
3261     the worker threads in this $(D TaskPool).  They forward to
3262     $(D core.thread.Thread.priority), so a given priority value here means the
3263     same thing as an identical priority value in $(D core.thread).
3264 
3265     Note:  For a size zero pool, the getter arbitrarily returns
3266            $(D core.thread.Thread.PRIORITY_MIN) and the setter has no effect.
3267     */
priority()3268     int priority() @property @trusted
3269     {
3270         return (size == 0) ? core.thread.Thread.PRIORITY_MIN :
3271         pool[0].priority;
3272     }
3273 
3274     /// Ditto
priority(int newPriority)3275     void priority(int newPriority) @property @trusted
3276     {
3277         if (size > 0)
3278         {
3279             foreach (t; pool)
3280             {
3281                 t.priority = newPriority;
3282             }
3283         }
3284     }
3285 }
3286 
3287 /**
3288 Returns a lazily initialized global instantiation of $(D TaskPool).
3289 This function can safely be called concurrently from multiple non-worker
3290 threads.  The worker threads in this pool are daemon threads, meaning that it
3291 is not necessary to call $(D TaskPool.stop) or $(D TaskPool.finish) before
3292 terminating the main thread.
3293 */
@property TaskPool taskPool() @trusted
{
    import std.concurrency : initOnce;

    __gshared TaskPool pool;

    // Builds the global pool: defaultPoolThreads workers, marked as daemon
    // threads.
    static TaskPool makeDaemonPool()
    {
        auto created = new TaskPool(defaultPoolThreads);
        created.isDaemon = true;
        return created;
    }

    // initOnce takes its argument lazily, so makeDaemonPool only runs when
    // initOnce actually evaluates it.
    return initOnce!pool(makeDaemonPool());
}
3304 
// Backing store for the defaultPoolThreads property.  Written with
// atomicStore and read with atomicLoad.
private shared uint _defaultPoolThreads;

// Shared module constructor: seeds the default worker count with
// totalCPUs - 1.
shared static this()
{
    atomicStore(_defaultPoolThreads, totalCPUs - 1);
}
3310 
3311 /**
3312 These properties get and set the number of worker threads in the $(D TaskPool)
3313 instance returned by $(D taskPool).  The default value is $(D totalCPUs) - 1.
Calling the setter after the first call to $(D taskPool) does not change the
number of worker threads in the instance returned by $(D taskPool).
3316 */
@property uint defaultPoolThreads() @trusted
{
    // Atomic read of the shared setting.
    return atomicLoad(_defaultPoolThreads);
}
3321 
3322 /// Ditto
@property void defaultPoolThreads(uint newVal) @trusted
{
    // Atomic write of the shared setting.  taskPool reads it when it
    // constructs the global pool.
    atomicStore(_defaultPoolThreads, newVal);
}
3327 
3328 /**
Convenience functions that forward to $(D taskPool.parallel).  The
3330 purpose of these is to make parallel foreach less verbose and more
3331 readable.
3332 
3333 Example:
3334 ---
3335 // Find the logarithm of every number from
3336 // 1 to 1_000_000 in parallel, using the
3337 // default TaskPool instance.
3338 auto logs = new double[1_000_000];
3339 
3340 foreach (i, ref elem; parallel(logs))
3341 {
3342     elem = log(i + 1.0);
3343 }
3344 ---
3345 
3346 */
ParallelForeach!R parallel(R)(R range)
{
    // Forward to the global pool with no explicit work unit size.
    return taskPool.parallel(range);
}
3351 
3352 /// Ditto
ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
{
    // Forward to the global pool with the caller's work unit size.
    return taskPool.parallel(range, workUnitSize);
}
3357 
3358 // Thrown when a parallel foreach loop is broken from.
class ParallelForeachError : Error
{
    // Takes no arguments: the message is fixed because the only cause is an
    // attempt to leave the body of a parallel foreach loop early.
    this()
    {
        enum string message =
            "Cannot break from a parallel foreach loop using break, return, "
            ~ "labeled break/continue or goto statements.";

        super(message);
    }
}
3367 
3368 /*------Structs that implement opApply for parallel foreach.------------------*/
// True when R is a random-access range that also has a length.
private enum bool randLen(R) = isRandomAccessRange!R && hasLength!R;
3373 
// Runs `doIt` once per participating thread and waits for every run to end.
//
// pool.size + 1 scoped tasks wrapping `doIt` are created.  The first is run
// directly on the calling thread, the rest are queued on `pool`, and the
// caller then tries to run the still-queued ones itself.  After all tasks
// have been forced, the exceptions they threw are chained and the first one
// is rethrown.
//
// Params:
//     pool = the TaskPool whose queue receives the tasks
//     doIt = the delegate every task executes
//
// Throws: OutOfMemoryError if the fallback malloc fails; otherwise the first
// of the chained Throwables raised while forcing the tasks.
private void submitAndExecute(
    TaskPool pool,
    scope void delegate() doIt
)
{
    import core.exception : OutOfMemoryError;
    // One task per worker thread plus one for the calling thread.
    immutable nThreads = pool.size + 1;

    alias PTask = typeof(scopedTask(doIt));
    import core.stdc.stdlib : malloc, free;
    import core.stdc.string : memcpy;

    // The logical thing to do would be to just use alloca() here, but that
    // causes problems on Windows for reasons that I don't understand
    // (tentatively a compiler bug) and definitely doesn't work on Posix due
    // to Bug 3753.  Therefore, allocate a fixed buffer and fall back to
    // malloc() if someone's using a ridiculous amount of threads.  Also,
    // using a byte array instead of a PTask array as the fixed buffer
    // is to prevent d'tors from being called on uninitialized excess PTask
    // instances.
    enum nBuf = 64;
    byte[nBuf * PTask.sizeof] buf = void;
    PTask[] tasks;
    if (nThreads <= nBuf)
    {
        // Common case: carve the task array out of the stack buffer.
        tasks = (cast(PTask*) buf.ptr)[0 .. nThreads];
    }
    else
    {
        auto ptr = cast(PTask*) malloc(nThreads * PTask.sizeof);
        if (!ptr) throw new OutOfMemoryError("Out of memory in std.parallelism.");
        tasks = ptr[0 .. nThreads];
    }

    // Release the malloc'd array on every exit path.  The condition is the
    // exact complement of the stack-buffer test above.
    scope(exit)
    {
        if (nThreads > nBuf)
        {
            free(tasks.ptr);
        }
    }

    foreach (ref t; tasks)
    {
        import core.stdc.string : memcpy;

        // This silly looking code is necessary to prevent d'tors from being
        // called on uninitialized objects.
        auto temp = scopedTask(doIt);
        memcpy(&t, &temp, PTask.sizeof);

        // This has to be done to t after copying, not temp before copying.
        // Otherwise, temp's destructor will sit here and wait for the
        // task to finish.
        t.pool = pool;
    }

    // Link tasks[1 .. $] into a doubly linked chain.  tasks[0] is left out
    // because it is run directly below instead of being queued.
    foreach (i; 1 .. tasks.length - 1)
    {
        tasks[i].next = tasks[i + 1].basePtr;
        tasks[i + 1].prev = tasks[i].basePtr;
    }

    if (tasks.length > 1)
    {
        // Hand the whole chain to the pool under a single lock acquisition.
        pool.queueLock();
        scope(exit) pool.queueUnlock();

        pool.abstractPutGroupNoSync(
            tasks[1].basePtr,
            tasks[$ - 1].basePtr
        );
    }

    if (tasks.length > 0)
    {
        // Run the first task inline on the calling thread.  Its exception,
        // if any, is stored on the task so the yieldForce loop below reports
        // it like the others.
        try
        {
            tasks[0].job();
        }
        catch (Throwable e)
        {
            tasks[0].exception = e; // nocoverage
        }
        tasks[0].taskStatus = TaskStatus.done;

        // Try to execute each of these in the current thread
        foreach (ref task; tasks[1..$])
        {
            pool.tryDeleteExecute(task.basePtr);
        }
    }

    Throwable firstException, lastException;

    // Force every task, collecting whatever was thrown into one chain so
    // that all tasks are waited on before anything propagates.
    foreach (i, ref task; tasks)
    {
        try
        {
            task.yieldForce;
        }
        catch (Throwable e)
        {
            addToChain(e, firstException, lastException);
            continue;
        }
    }

    if (firstException) throw firstException;
}
3484 
// Throws ParallelForeachError.  The parallel foreach code calls this when a
// loop body delegate returns a nonzero value.
void foreachErr()
{
    throw new ParallelForeachError();
}
3489 
// Serial fallback for a parallel foreach on a pool with no worker threads:
// iterates p.range in the calling thread and feeds every element (and its
// running index, when the delegate takes two parameters) to dg.  A nonzero
// result from dg stops the loop and raises ParallelForeachError through
// foreachErr.
int doSizeZeroCase(R, Delegate)(ref ParallelForeach!R p, Delegate dg)
{
    with(p)
    {
        // Whether the loop body expects (index, element) or only (element).
        enum bool passesIndex = Parameters!dg.length == 2;

        int status = 0;
        size_t position = 0;

        // The explicit ElementType!R in the foreach loops is necessary for
        // correct behavior when iterating over strings.
        static if (hasLvalueElements!R)
        {
            // Lvalue elements are passed by reference.
            foreach (ref ElementType!R elem; range)
            {
                static if (passesIndex)
                {
                    status = dg(position, elem);
                }
                else
                {
                    status = dg(elem);
                }

                if (status != 0) break;
                ++position;
            }
        }
        else
        {
            // Elements are rvalues here, so each one is a copy.
            foreach (ElementType!R elem; range)
            {
                static if (passesIndex)
                {
                    status = dg(position, elem);
                }
                else
                {
                    status = dg(elem);
                }

                if (status != 0) break;
                ++position;
            }
        }

        // A nonzero status means the body tried to leave the loop early.
        if (status != 0) foreachErr();
        return status;
    }
}
3535 
// Source text for the body of a parallel foreach over a random-access range.
// The code refers to `pool`, `range`, `workUnitSize` and `dg`, which must be
// in scope where the string is mixed in.
//
// Outline of the mixed-in code:
//   * A pool of size zero is delegated to doSizeZeroCase.
//   * An empty range returns 0 immediately.
//   * Otherwise each run of the nested doIt atomically increments a shared
//     work unit counter to claim the next block of workUnitSize indices,
//     calls dg for every index in that block, and stops once the claimed
//     block starts at or past the range length.
//   * A nonzero result from dg calls foreachErr.
//   * On failure, scope(failure) clears shouldContinue, which ends the
//     while loop of every other run.
//   * doIt is handed to submitAndExecute together with the pool.
private enum string parallelApplyMixinRandomAccess = q{
    // Handle empty thread pool as special case.
    if (pool.size == 0)
    {
        return doSizeZeroCase(this, dg);
    }

    // Whether iteration is with or without an index variable.
    enum withIndex = Parameters!(typeof(dg)).length == 2;

    shared size_t workUnitIndex = size_t.max;  // Effectively -1:  chunkIndex + 1 == 0
    immutable len = range.length;
    if (!len) return 0;

    shared bool shouldContinue = true;

    void doIt()
    {
        import std.algorithm.comparison : min;

        scope(failure)
        {
            // If an exception is thrown, all threads should bail.
            atomicStore(shouldContinue, false);
        }

        while (atomicLoad(shouldContinue))
        {
            immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
            immutable start = workUnitSize * myUnitIndex;
            if (start >= len)
            {
                atomicStore(shouldContinue, false);
                break;
            }

            immutable end = min(len, start + workUnitSize);

            foreach (i; start .. end)
            {
                static if (withIndex)
                {
                    if (dg(i, range[i])) foreachErr();
                }
                else
                {
                    if (dg(range[i])) foreachErr();
                }
            }
        }
    }

    submitAndExecute(pool, &doIt);

    return 0;
};
3592 
/*
String mixin used as the body of ParallelForeach.opApply when randLen!R is
false.  The mixin site must provide `pool`, `range`, `workUnitSize`, the range
type `R` and the loop-body delegate `dg`.

Each execution of doIt() repeatedly calls makeTemp(), which moves up to
workUnitSize elements (or their addresses, when R has lvalue elements) out of
the range while holding rangeMutex, and then applies dg to that private batch
after the mutex has been released.  When the range exposes buf1, bufPos and
doBufSwap(), a whole buffer is swapped out instead of being copied element by
element.  The mixed-in body returns 0, or the result of doSizeZeroCase when
pool.size is 0.
*/
enum string parallelApplyMixinInputRange = q{
    // Handle empty thread pool as special case.
    if (pool.size == 0)
    {
        return doSizeZeroCase(this, dg);
    }

    // Whether iteration is with or without an index variable.
    enum withIndex = Parameters!(typeof(dg)).length == 2;

    // This protects the range while copying it.
    auto rangeMutex = new Mutex();

    // Cleared when the range is exhausted or when a loop body throws.
    shared bool shouldContinue = true;

    // The total number of elements that have been popped off range.
    // This is updated only while protected by rangeMutex;
    size_t nPopped = 0;

    // bufferTrick is true when range has the members that the buffer-swapping
    // makeTemp() below relies on.
    static if (
        is(typeof(range.buf1)) &&
        is(typeof(range.bufPos)) &&
        is(typeof(range.doBufSwap()))
    )
    {
        // Make sure we don't have the buffer recycling overload of
        // asyncBuf.
        static if (
            is(typeof(range.source)) &&
            isRoundRobin!(typeof(range.source))
        )
        {
            static assert(0, "Cannot execute a parallel foreach loop on " ~
            "the buffer recycling overload of asyncBuf.");
        }

        enum bool bufferTrick = true;
    }
    else
    {
        enum bool bufferTrick = false;
    }

    void doIt()
    {
        scope(failure)
        {
            // If an exception is thrown, all threads should bail.
            atomicStore(shouldContinue, false);
        }

        static if (hasLvalueElements!R)
        {
            // Store pointers to the elements so that dg can receive them by
            // reference.
            alias Temp = ElementType!R*[];
            Temp temp;

            // Returns:  The previous value of nPopped.
            size_t makeTemp()
            {
                import std.algorithm.internal : addressOf;
                import std.array : uninitializedArray;

                // Allocate the batch buffer on first use only.
                if (temp is null)
                {
                    temp = uninitializedArray!Temp(workUnitSize);
                }

                rangeMutex.lock();
                scope(exit) rangeMutex.unlock();

                size_t i = 0;
                for (; i < workUnitSize && !range.empty; range.popFront(), i++)
                {
                    temp[i] = addressOf(range.front);
                }

                // Keep only the slots that were actually filled.
                temp = temp[0 .. i];
                auto ret = nPopped;
                nPopped += temp.length;
                return ret;
            }

        }
        else
        {

            // Elements cannot be referenced in place, so copy them by value.
            alias Temp = ElementType!R[];
            Temp temp;

            // Returns:  The previous value of nPopped.
            static if (!bufferTrick) size_t makeTemp()
            {
                import std.array : uninitializedArray;

                // Allocate the batch buffer on first use only.
                if (temp is null)
                {
                    temp = uninitializedArray!Temp(workUnitSize);
                }

                rangeMutex.lock();
                scope(exit) rangeMutex.unlock();

                size_t i = 0;
                for (; i < workUnitSize && !range.empty; range.popFront(), i++)
                {
                    temp[i] = range.front;
                }

                // Keep only the slots that were actually filled.
                temp = temp[0 .. i];
                auto ret = nPopped;
                nPopped += temp.length;
                return ret;
            }

            // Returns:  The previous value of nPopped.
            static if (bufferTrick) size_t makeTemp()
            {
                import std.algorithm.mutation : swap;
                rangeMutex.lock();
                scope(exit) rangeMutex.unlock();

                // Elide copying by just swapping buffers.
                temp.length = range.buf1.length;
                swap(range.buf1, temp);

                // This is necessary in case popFront() has been called on
                // range before entering the parallel foreach loop.
                temp = temp[range.bufPos..$];

                // NOTE(review): temp has already been sliced by bufPos on the
                // line above, so temp.length is already the number of
                // elements taken here; subtracting bufPos a second time looks
                // like it leaves _length too large whenever bufPos != 0 --
                // confirm against the asyncBuf range implementation.
                static if (is(typeof(range._length)))
                {
                    range._length -= (temp.length - range.bufPos);
                }

                range.doBufSwap();
                auto ret = nPopped;
                nPopped += temp.length;
                return ret;
            }
        }

        while (atomicLoad(shouldContinue))
        {
            // Index, counted from the start of the loop, of the first element
            // of this batch.
            auto overallIndex = makeTemp();
            if (temp.empty)
            {
                // The range is exhausted: tell the other executions of doIt()
                // to stop as well.
                atomicStore(shouldContinue, false);
                break;
            }

            foreach (i; 0 .. temp.length)
            {
                // Advance the running index after each element whose loop
                // body did not throw.
                scope(success) overallIndex++;

                // A non-zero result from dg means the loop body tried to
                // leave the loop early; that is handed to foreachErr().
                static if (hasLvalueElements!R)
                {
                    static if (withIndex)
                    {
                        if (dg(overallIndex, *temp[i])) foreachErr();
                    }
                    else
                    {
                        if (dg(*temp[i])) foreachErr();
                    }
                }
                else
                {
                    static if (withIndex)
                    {
                        if (dg(overallIndex, temp[i])) foreachErr();
                    }
                    else
                    {
                        if (dg(temp[i])) foreachErr();
                    }
                }
            }
        }
    }

    // NOTE(review): submitAndExecute is defined outside this view; presumably
    // it runs doIt on the pool's workers and on the calling thread and
    // rethrows any exception -- confirm.
    submitAndExecute(pool, &doIt);

    return 0;
};
3776 
// Walks the `next` links starting at e and returns the final Throwable of the
// chain.  A null argument yields null.
private Throwable findLastException(Throwable e) pure nothrow
{
    if (e is null) return null;

    Throwable tail = e;
    for (Throwable succ = tail.next; succ !is null; succ = succ.next)
    {
        tail = succ;
    }

    return tail;
}
3789 
// Appends e (together with anything already linked behind it) to the
// exception chain described by firstException/lastException.  If the chain is
// still empty, e becomes its head.  In both cases lastException ends up
// referring to the last Throwable reachable from e.
private void addToChain(
    Throwable e,
    ref Throwable firstException,
    ref Throwable lastException
) pure nothrow
{
    if (firstException is null)
    {
        // Empty chain: e is the new head.
        firstException = e;
    }
    else
    {
        // Non-empty chain: hook e behind the current tail.
        assert(lastException); // nocoverage
        lastException.next = e; // nocoverage
    }

    lastException = findLastException(e);
}
3809 
/*
Aggregate that makes a range iterable with foreach through opApply, running
the loop body via the given TaskPool.  Both opApply overloads mix in one of
two string mixins, chosen at compile time by randLen!R.
*/
private struct ParallelForeach(R)
{
    TaskPool pool;          // Pool used to execute the loop body.
    R range;                // The range being iterated.
    size_t workUnitSize;    // Number of elements handed out per work unit.
    alias E = ElementType!R;

    // Loop-body delegate types: elements are passed by ref only when the
    // range has lvalue elements.
    static if (!hasLvalueElements!R)
    {
        alias NoIndexDg = int delegate(E);
        alias IndexDg = int delegate(size_t, E);
    }
    else
    {
        alias NoIndexDg = int delegate(ref E);
        alias IndexDg = int delegate(size_t, ref E);
    }

    // foreach (elem; ...)
    int opApply(scope NoIndexDg dg)
    {
        static if (!randLen!R)
        {
            mixin(parallelApplyMixinInputRange);
        }
        else
        {
            mixin(parallelApplyMixinRandomAccess);
        }
    }

    // foreach (index, elem; ...)
    int opApply(scope IndexDg dg)
    {
        static if (!randLen!R)
        {
            mixin(parallelApplyMixinInputRange);
        }
        else
        {
            mixin(parallelApplyMixinRandomAccess);
        }
    }
}
3852 
/*
Input range over a fixed set of reusable buffers.  A user-supplied callable
(nextDel) fills the buffer it is handed, and a second callable (emptyDel)
reports whether the underlying data is exhausted.  The buffers are cycled
through in round-robin order.  This is used internally by the buffer-recycling
overload of TaskPool.asyncBuf, which creates an instance and forwards it to
the input range overload of asyncBuf.
*/
private struct RoundRobinBuffer(C1, C2)
{
    // No need for constraints because they're already checked for in asyncBuf.

    alias Array = Parameters!(C1.init)[0];
    alias T = typeof(Array.init[0]);

    T[][] bufs;     // The pool of buffers.
    size_t index;   // Position in bufs of the current front buffer.
    C1 nextDel;     // Fills the buffer passed to it.
    C2 emptyDel;    // Returns true once no more data is available.
    bool _empty;    // Latched by popFront() when the data has run out.
    bool primed;    // Whether bufs[index] has been filled since the last pop.

    this(
        C1 nextDel,
        C2 emptyDel,
        size_t initialBufSize,
        size_t nBuffers
    ) {
        this.nextDel = nextDel;
        this.emptyDel = emptyDel;
        bufs.length = nBuffers;

        // Give every buffer its starting length.
        for (size_t slot = 0; slot < bufs.length; ++slot)
        {
            bufs[slot].length = initialBufSize;
        }
    }

    // Fills the current buffer; it counts as primed only if nextDel returns
    // normally.
    void prime()
    in
    {
        assert(!empty);
    }
    body
    {
        nextDel(bufs[index]);
        primed = true;
    }


    // Returns the current buffer, filling it first if that has not been done
    // since the last popFront().
    T[] front() @property
    in
    {
        assert(!empty);
    }
    body
    {
        if (!primed)
        {
            prime();
        }

        return bufs[index];
    }

    // Moves on to the next buffer, or marks the range empty when emptyDel
    // reports that the data has run out.
    void popFront()
    {
        if (!empty && !emptyDel())
        {
            index = (index + 1) % bufs.length;
            primed = false;
            return;
        }

        _empty = true;
    }

    bool empty() @property const @safe pure nothrow
    {
        return _empty;
    }
}
3930 
version (unittest)
{
    import std.stdio;

    // Pool instance shared by the unittests in this module.  It lives at
    // module scope as a __gshared variable because, according to the original
    // author, this was the only way to get nested maps to work.
    __gshared TaskPool poolInstance;
}
3938 
// These test basic functionality but don't stress test for threading bugs.
// These are the tests that should be run every time Phobos is compiled.
@system unittest
{
    import std.algorithm.comparison : equal, min, max;
    import std.algorithm.iteration : filter, map, reduce;
    import std.array : split;
    import std.conv : text;
    import std.exception : assertThrown;
    import std.math : approxEqual, sqrt, log;
    import std.range : indexed, iota, join;
    import std.typecons : Tuple, tuple;

    poolInstance = new TaskPool(2);
    scope(exit) poolInstance.stop();

    // The only way this can be verified is manually.
    debug(std_parallelism) stderr.writeln("totalCPUs = ", totalCPUs);

    // Test that the priority property round-trips, then restore it.
    auto oldPriority = poolInstance.priority;
    poolInstance.priority = Thread.PRIORITY_MAX;
    assert(poolInstance.priority == Thread.PRIORITY_MAX);

    poolInstance.priority = Thread.PRIORITY_MIN;
    assert(poolInstance.priority == Thread.PRIORITY_MIN);

    poolInstance.priority = oldPriority;
    assert(poolInstance.priority == oldPriority);

    static void refFun(ref uint num)
    {
        num++;
    }

    uint x;

    // Test task().
    auto t = task!refFun(x);
    poolInstance.put(t);
    t.yieldForce;
    assert(t.args[0] == 1);

    auto t2 = task(&refFun, x);
    poolInstance.put(t2);
    t2.yieldForce;
    assert(t2.args[0] == 1);

    // Test scopedTask().
    auto st = scopedTask!refFun(x);
    poolInstance.put(st);
    st.yieldForce;
    assert(st.args[0] == 1);

    auto st2 = scopedTask(&refFun, x);
    poolInstance.put(st2);
    st2.yieldForce;
    assert(st2.args[0] == 1);

    // Test executeInNewThread().
    auto ct = scopedTask!refFun(x);
    ct.executeInNewThread(Thread.PRIORITY_MAX);
    ct.yieldForce;
    assert(ct.args[0] == 1);

    // Test ref return.
    uint toInc = 0;
    static ref T makeRef(T)(ref T num)
    {
        return num;
    }

    auto t3 = task!makeRef(toInc);
    taskPool.put(t3);
    assert(t3.args[0] == 0);
    t3.spinForce++;
    assert(t3.args[0] == 1);

    // Test that tasks built from @safe function pointers can be created,
    // submitted and forced from @safe code.  The tasks go to the local
    // zero-worker pool created for this purpose, so they are executed by the
    // thread that forces them.
    static void testSafe() @safe {
        static int bump(int num)
        {
            return num + 1;
        }

        auto safePool = new TaskPool(0);
        auto t = task(&bump, 1);
        safePool.put(t);
        assert(t.yieldForce == 2);

        auto st = scopedTask(&bump, 1);
        safePool.put(st);
        assert(st.yieldForce == 2);
        safePool.stop();
    }

    // Run it: defining the function alone only proves that it compiles.
    testSafe();

    auto arr = [1,2,3,4,5];
    auto nums = new uint[5];
    auto nums2 = new uint[5];

    // Test parallel foreach with an index and ref elements over an array.
    foreach (i, ref elem; poolInstance.parallel(arr))
    {
        elem++;
        nums[i] = cast(uint) i + 2;
        nums2[i] = elem;
    }

    assert(nums == [2,3,4,5,6], text(nums));
    assert(nums2 == nums, text(nums2));
    assert(arr == nums, text(arr));

    // Test const/immutable arguments.
    static int add(int lhs, int rhs)
    {
        return lhs + rhs;
    }
    immutable addLhs = 1;
    immutable addRhs = 2;
    auto addTask = task(&add, addLhs, addRhs);
    auto addScopedTask = scopedTask(&add, addLhs, addRhs);
    poolInstance.put(addTask);
    poolInstance.put(addScopedTask);
    assert(addTask.yieldForce == 3);
    assert(addScopedTask.yieldForce == 3);

    // Test parallel foreach with non-random access range.
    auto range = filter!"a != 666"([0, 1, 2, 3, 4]);

    foreach (i, elem; poolInstance.parallel(range))
    {
        nums[i] = cast(uint) i;
    }

    assert(nums == [0,1,2,3,4]);

    // Test parallel foreach over a large array against a serial computation.
    auto logs = new double[1_000_000];
    foreach (i, ref elem; poolInstance.parallel(logs))
    {
        elem = log(i + 1.0);
    }

    foreach (i, elem; logs)
    {
        assert(approxEqual(elem, cast(double) log(i + 1)));
    }

    // Test amap, with and without caller-supplied buffers and work unit sizes.
    assert(poolInstance.amap!"a * a"([1,2,3,4,5]) == [1,4,9,16,25]);
    assert(poolInstance.amap!"a * a"([1,2,3,4,5], new long[5]) == [1,4,9,16,25]);
    assert(poolInstance.amap!("a * a", "-a")([1,2,3]) ==
           [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);

    auto tupleBuf = new Tuple!(int, int)[3];
    poolInstance.amap!("a * a", "-a")([1,2,3], tupleBuf);
    assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
    poolInstance.amap!("a * a", "-a")([1,2,3], 5, tupleBuf);
    assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);

    // Test amap with a non-array buffer.
    auto toIndex = new int[5];
    auto ind = indexed(toIndex, [3, 1, 4, 0, 2]);
    poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], ind);
    assert(equal(ind, [2, 4, 6, 8, 10]));
    assert(equal(toIndex, [8, 4, 10, 2, 6]));
    poolInstance.amap!"a / 2"(ind, ind);
    assert(equal(ind, [1, 2, 3, 4, 5]));
    assert(equal(toIndex, [4, 2, 5, 1, 3]));

    auto buf = new int[5];
    poolInstance.amap!"a * a"([1,2,3,4,5], buf);
    assert(buf == [1,4,9,16,25]);
    poolInstance.amap!"a * a"([1,2,3,4,5], 4, buf);
    assert(buf == [1,4,9,16,25]);

    // Test reduce, with and without seeds, with multiple functions.
    assert(poolInstance.reduce!"a + b"([1]) == 1);
    assert(poolInstance.reduce!"a + b"([1,2,3,4]) == 10);
    assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4]) == 10);
    assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4], 1) == 10);
    assert(poolInstance.reduce!(min, max)([1,2,3,4]) == tuple(1, 4));
    assert(poolInstance.reduce!("a + b", "a * b")(tuple(0, 1), [1,2,3,4]) ==
           tuple(10, 24));

    immutable serialAns = reduce!"a + b"(iota(1000));
    assert(poolInstance.reduce!"a + b"(0, iota(1000)) == serialAns);
    assert(poolInstance.reduce!"a + b"(iota(1000)) == serialAns);

    // Test worker-local storage.
    auto wl = poolInstance.workerLocalStorage(0);
    foreach (i; poolInstance.parallel(iota(1000), 1))
    {
        wl.get = wl.get + i;
    }

    auto wlRange = wl.toRange;
    auto parallelSum = poolInstance.reduce!"a + b"(wlRange);
    assert(parallelSum == 499500);
    assert(wlRange[0 .. 1][0] == wlRange[0]);
    assert(wlRange[1 .. 2][0] == wlRange[1]);

    // Test finish()
    {
        static void slowFun() { Thread.sleep(dur!"msecs"(1)); }

        auto pool1 = new TaskPool();
        auto tSlow = task!slowFun();
        pool1.put(tSlow);
        pool1.finish();
        tSlow.yieldForce;
        // Can't assert that pool1.status == PoolState.stopNow because status
        // doesn't change until after the "done" flag is set and the waiting
        // thread is woken up.

        auto pool2 = new TaskPool();
        auto tSlow2 = task!slowFun();
        pool2.put(tSlow2);
        pool2.finish(true); // blocking
        assert(tSlow2.done);

        // Test fix for Bug 8582 by making pool size zero.
        auto pool3 = new TaskPool(0);
        auto tSlow3 = task!slowFun();
        pool3.put(tSlow3);
        pool3.finish(true); // blocking
        assert(tSlow3.done);

        // This is correct because no thread will terminate unless pool2.status
        // and pool3.status have already been set to stopNow.
        assert(pool2.status == TaskPool.PoolState.stopNow);
        assert(pool3.status == TaskPool.PoolState.stopNow);
    }

    // Test default pool stuff.
    assert(taskPool.size == totalCPUs - 1);

    nums = new uint[1000];
    foreach (i; parallel(iota(1000)))
    {
        nums[i] = cast(uint) i;
    }
    assert(equal(nums, iota(1000)));

    // Test the lazy parallel map against the serial map.
    assert(equal(
               poolInstance.map!"a * a"(iota(30_000_001), 10_000),
               map!"a * a"(iota(30_000_001))
           ));

    // The filter is to kill random access and test the non-random access
    // branch.
    assert(equal(
               poolInstance.map!"a * a"(
                   filter!"a == a"(iota(30_000_001)
                                  ), 10_000, 1000),
               map!"a * a"(iota(30_000_001))
           ));

    assert(
        reduce!"a + b"(0UL,
                       poolInstance.map!"a * a"(iota(3_000_001), 10_000)
                      ) ==
        reduce!"a + b"(0UL,
                       map!"a * a"(iota(3_000_001))
                      )
    );

    // Test asyncBuf over a non-random access range.
    assert(equal(
               iota(1_000_002),
               poolInstance.asyncBuf(filter!"a == a"(iota(1_000_002)))
           ));

    // Test the buffer-recycling overload of asyncBuf by reading a small
    // tab-separated file back line by line.
    {
        import std.conv : to;
        import std.file : deleteme;

        string temp_file = deleteme ~ "-tempDelMe.txt";
        auto file = File(temp_file, "wb");
        scope(exit)
        {
            file.close();
            import std.file;
            remove(temp_file);
        }

        auto written = [[1.0, 2, 3], [4.0, 5, 6], [7.0, 8, 9]];
        foreach (row; written)
        {
            file.writeln(join(to!(string[])(row), "\t"));
        }

        // Reopen the same file for reading.
        file = File(temp_file);

        void next(ref char[] buf)
        {
            file.readln(buf);
            import std.string : chomp;
            buf = chomp(buf);
        }

        double[][] read;
        auto asyncReader = taskPool.asyncBuf(&next, &file.eof);

        foreach (line; asyncReader)
        {
            if (line.length == 0) continue;
            auto ls = line.split("\t");
            read ~= to!(double[])(ls);
        }

        assert(read == written);
        file.close();
    }

    // Test Map/AsyncBuf chaining.

    auto abuf = poolInstance.asyncBuf(iota(-1.0, 3_000_000), 100);
    auto temp = poolInstance.map!sqrt(
                    abuf, 100, 5
                );
    auto lmchain = poolInstance.map!"a * a"(temp, 100, 5);
    // Drop the first element, which is the square of sqrt(-1.0).
    lmchain.popFront();

    // The remaining elements must be 0, 1, 2, ... in order.  Fail on the
    // first mismatch instead of merely printing it, and make sure that no
    // element was lost along the chain.
    int ii;
    foreach ( elem; (lmchain))
    {
        assert(approxEqual(elem, ii), text(ii, '\t', elem));
        ii++;
    }
    assert(ii == 3_000_000, text(ii));

    // Test buffer trick in parallel foreach.
    abuf = poolInstance.asyncBuf(iota(-1.0, 1_000_000), 100);
    abuf.popFront();
    auto bufTrickTest = new size_t[abuf.length];
    foreach (i, elem; parallel(abuf))
    {
        bufTrickTest[i] = i;
    }

    assert(equal(iota(1_000_000), bufTrickTest));

    auto myTask = task!(std.math.abs)(-1);
    taskPool.put(myTask);
    assert(myTask.spinForce == 1);

    // Test that worker local storage from one pool receives an index of 0
    // when the index is queried w.r.t. another pool.  The only way to do this
    // is non-deterministically.
    foreach (i; parallel(iota(1000), 1))
    {
        assert(poolInstance.workerIndex == 0);
    }

    foreach (i; poolInstance.parallel(iota(1000), 1))
    {
        assert(taskPool.workerIndex == 0);
    }

    // Test exception handling.
    static void parallelForeachThrow()
    {
        foreach (elem; parallel(iota(10)))
        {
            throw new Exception("");
        }
    }

    assertThrown!Exception(parallelForeachThrow());

    static int reduceException(int a, int b)
    {
        throw new Exception("");
    }

    assertThrown!Exception(
        poolInstance.reduce!reduceException(iota(3))
    );

    static int mapException(int a)
    {
        throw new Exception("");
    }

    assertThrown!Exception(
        poolInstance.amap!mapException(iota(3))
    );

    static void mapThrow()
    {
        auto m = poolInstance.map!mapException(iota(3));
        m.popFront();
    }

    assertThrown!Exception(mapThrow());

    // Infinite range whose popFront() always throws.
    struct ThrowingRange
    {
        @property int front()
        {
            return 1;
        }
        void popFront()
        {
            throw new Exception("");
        }
        enum bool empty = false;
    }

    assertThrown!Exception(poolInstance.asyncBuf(ThrowingRange.init));
}
4346 
4347 //version = parallelismStressTest;
4348 
// These are more like stress tests than real unit tests.  They print out
// tons of stuff and should not be run every time make unittest is run.
// They are compiled only when the parallelismStressTest version is set (see
// the commented-out version statement above).
//
// NOTE(review): both blocks are marked @safe, yet they use the __gshared
// poolInstance and perform console I/O, and they call names (iota, filter,
// reduce, sqrt, approxEqual, isNaN, text) for which no import is visible in
// this block.  Presumably they no longer compile when the version is enabled
// -- confirm before relying on them.
version (parallelismStressTest)
{
    // Exercises parallel foreach, amap, reduce, tasks and nested parallel
    // foreach for pool sizes 0 and 4, ten rounds each, then waits for the
    // user to press enter.
    @safe unittest
    {
        size_t attempt;
        for (; attempt < 10; attempt++)
            foreach (poolSize; [0, 4])
        {

            poolInstance = new TaskPool(poolSize);

            uint[] numbers = new uint[1_000];

            foreach (i; poolInstance.parallel( iota(0, numbers.length)) )
            {
                numbers[i] = cast(uint) i;
            }

            // Make sure it works.
            foreach (i; 0 .. numbers.length)
            {
                assert(numbers[i] == i);
            }

            stderr.writeln("Done creating nums.");


            // Parallel foreach over a non-random access (filtered) range.
            auto myNumbers = filter!"a % 7 > 0"( iota(0, 1000));
            foreach (num; poolInstance.parallel(myNumbers))
            {
                assert(num % 7 > 0 && num < 1000);
            }
            stderr.writeln("Done modulus test.");

            uint[] squares = poolInstance.amap!"a * a"(numbers, 100);
            assert(squares.length == numbers.length);
            foreach (i, number; numbers)
            {
                assert(squares[i] == number * number);
            }
            stderr.writeln("Done squares.");

            // Compute the sum in a task while this thread does other work.
            auto sumFuture = task!( reduce!"a + b" )(numbers);
            poolInstance.put(sumFuture);

            // NOTE(review): sumSquares is accumulated but never checked.
            ulong sumSquares = 0;
            foreach (elem; numbers)
            {
                sumSquares += elem * elem;
            }

            uint mySum = sumFuture.spinForce();
            assert(mySum == 999 * 1000 / 2);

            auto mySumParallel = poolInstance.reduce!"a + b"(numbers);
            assert(mySum == mySumParallel);
            stderr.writeln("Done sums.");

            auto myTask = task(
            {
                synchronized writeln("Our lives are parallel...Our lives are parallel.");
            });
            poolInstance.put(myTask);

            auto nestedOuter = "abcd";
            auto nestedInner =  iota(0, 10, 2);

            // Nested parallel foreach on the same pool.
            foreach (i, letter; poolInstance.parallel(nestedOuter, 1))
            {
                foreach (j, number; poolInstance.parallel(nestedInner, 1))
                {
                    synchronized writeln(i, ": ", letter, "  ", j, ": ", number);
                }
            }

            poolInstance.stop();
        }

        assert(attempt == 10);
        writeln("Press enter to go to next round of unittests.");
        readln();
    }

    // These unittests are intended more for actual testing and not so much
    // as examples.
    @safe unittest
    {
        foreach (attempt; 0 .. 10)
        foreach (poolSize; [0, 4])
        {
            poolInstance = new TaskPool(poolSize);

            // Test indexing.
            stderr.writeln("Creator Raw Index:  ", poolInstance.threadIndex);
            assert(poolInstance.workerIndex() == 0);

            // Test worker-local storage.
            // Every slot starts at 1, so the expected total is the number of
            // increments plus one per slot (pool.size + 1 slots).
            auto workerLocalStorage = poolInstance.workerLocalStorage!uint(1);
            foreach (i; poolInstance.parallel(iota(0U, 1_000_000)))
            {
                workerLocalStorage.get++;
            }
            assert(reduce!"a + b"(workerLocalStorage.toRange) ==
            1_000_000 + poolInstance.size + 1);

            // Make sure work is reasonably balanced among threads.  This test is
            // non-deterministic and is more of a sanity check than something that
            // has an absolute pass/fail.
            shared(uint)[void*] nJobsByThread;
            foreach (thread; poolInstance.pool)
            {
                nJobsByThread[cast(void*) thread] = 0;
            }
            nJobsByThread[ cast(void*) Thread.getThis()] = 0;

            foreach (i; poolInstance.parallel( iota(0, 1_000_000), 100 ))
            {
                atomicOp!"+="( nJobsByThread[ cast(void*) Thread.getThis() ], 1);
            }

            stderr.writeln("\nCurrent thread is:  ",
            cast(void*) Thread.getThis());
            stderr.writeln("Workload distribution:  ");
            foreach (k, v; nJobsByThread)
            {
                stderr.writeln(k, '\t', v);
            }

            // Test whether amap can be nested.
            real[][] matrix = new real[][](1000, 1000);
            foreach (i; poolInstance.parallel( iota(0, matrix.length) ))
            {
                foreach (j; poolInstance.parallel( iota(0, matrix[0].length) ))
                {
                    matrix[i][j] = i * j;
                }
            }

            // Get around weird bugs having to do w/ sqrt being an intrinsic:
            static real mySqrt(real num)
            {
                return sqrt(num);
            }

            static real[] parallelSqrt(real[] nums)
            {
                return poolInstance.amap!mySqrt(nums);
            }

            real[][] sqrtMatrix = poolInstance.amap!parallelSqrt(matrix);

            // Check every entry against a serially computed square root.
            foreach (i, row; sqrtMatrix)
            {
                foreach (j, elem; row)
                {
                    real shouldBe = sqrt( cast(real) i * j);
                    assert(approxEqual(shouldBe, elem));
                    sqrtMatrix[i][j] = shouldBe;
                }
            }

            auto saySuccess = task(
            {
                stderr.writeln(
                    "Success doing matrix stuff that involves nested pool use.");
            });
            poolInstance.put(saySuccess);
            saySuccess.workForce();

            // A more thorough test of amap, reduce:  Find the sum of the square roots of
            // matrix.

            static real parallelSum(real[] input)
            {
                return poolInstance.reduce!"a + b"(input);
            }

            auto sumSqrt = poolInstance.reduce!"a + b"(
                               poolInstance.amap!parallelSum(
                                   sqrtMatrix
                               )
                           );

            assert(approxEqual(sumSqrt, 4.437e8));
            stderr.writeln("Done sum of square roots.");

            // Test whether tasks work with function pointers.
            auto nanTask = task(&isNaN, 1.0L);
            poolInstance.put(nanTask);
            assert(nanTask.spinForce == false);

            if (poolInstance.size > 0)
            {
                // Test work waiting.
                static void uselessFun()
                {
                    foreach (i; 0 .. 1_000_000) {}
                }

                auto uselessTasks = new typeof(task(&uselessFun))[1000];
                foreach (ref uselessTask; uselessTasks)
                {
                    uselessTask = task(&uselessFun);
                }
                foreach (ref uselessTask; uselessTasks)
                {
                    poolInstance.put(uselessTask);
                }
                foreach (ref uselessTask; uselessTasks)
                {
                    uselessTask.workForce();
                }
            }

            // Test the case of non-random access + ref returns.
            int[] nums = [1,2,3,4,5];
            // Input range over an int[] that returns front by reference and
            // offers no random access.
            static struct RemoveRandom
            {
                int[] arr;

                ref int front()
                {
                    return arr.front;
                }
                void popFront()
                {
                    arr.popFront();
                }
                bool empty()
                {
                    return arr.empty;
                }
            }

            auto refRange = RemoveRandom(nums);
            foreach (ref elem; poolInstance.parallel(refRange))
            {
                elem++;
            }
            assert(nums == [2,3,4,5,6], text(nums));
            stderr.writeln("Nums:  ", nums);

            poolInstance.stop();
        }
    }
}
4597 
version (unittest)
{
    /**
     * Test-only value type with a self-checking invariant.
     *
     * `checksum` has a fixed default initializer and the invariant asserts
     * that it still holds that value, so calling a public member on an
     * instance that was never properly initialized trips the assertion.
     * NOTE(review): the name suggests this is the regression fixture for
     * Bugzilla issue 12733 (amap assigning into uninitialized storage) --
     * confirm against the issue tracker.
     */
    struct __S_12733
    {
        // Holds only while `checksum` still has its default-initialized value.
        invariant() { assert(checksum == 1_234_567_890); }

        /// Stores `u` as the payload; `checksum` keeps its default value.
        this(ulong u) { n = u; }

        /// Copies only the payload `n` from `s`; `checksum` is not written.
        void opAssign(__S_12733 s) { this.n = s.n; }

        /// Payload value.
        ulong n;

        /// Sentinel verified by the invariant.
        ulong checksum = 1_234_567_890;
    }

    /// Mapping function used with `amap` in the unittest below: wraps `n`
    /// in a `__S_12733`.
    static auto __genPair_12733(ulong n) { return __S_12733(n); }
}
4611 
@system unittest
{
    // 2^59 - 1, repeated three times, followed by one smaller value.
    enum ulong big = 2UL^^59 - 1;
    immutable ulong[] inputs = [ big, big, big, 112_272_537_195_293UL ];

    // The mapped array is never inspected; the test passes as long as the
    // amap call itself completes without tripping the invariant declared
    // in __S_12733.
    auto mapped = taskPool.amap!__genPair_12733(inputs);
}
4618 
@safe unittest
{
    import std.range : iota;

    // Formerly lived in std.range, where it caused import cycles.
    // Only checks that a parallel foreach over a ulong iota compiles;
    // the loop is never executed.
    enum bool iotaParallelCompiles =
        __traits(compiles, { foreach (i; iota(0, 100UL).parallel) {} });

    assert(iotaParallelCompiles);
}
4626 
@safe unittest
{
    import std.algorithm.iteration : each;

    // Compile-time probe only: `each` with a mutating string lambda must be
    // accepted on the result of `parallel`. Nothing is run.
    long[] values;
    static assert(
        is(typeof({ values.parallel.each!"a++"; }))
    );
}
4636 
4637 // https://issues.dlang.org/show_bug.cgi?id=17539
@system unittest
{
    import std.random : rndGen;

    // Ensure compilation: a parallel foreach over the default random
    // generator must be accepted. The loop body leaves immediately.
    try
    {
        foreach (rnd; rndGen.parallel)
            break;
    }
    catch (ParallelForeachError e)
    {
        // Deliberately ignored: this is the error the `break` above is
        // expected to raise, and only compilation is under test here.
    }
}
4645