1 /**
2 $(D std._parallelism) implements high-level primitives for SMP _parallelism.
3 These include parallel foreach, parallel reduce, parallel eager map, pipelining
4 and future/promise _parallelism.  $(D std._parallelism) is recommended when the
5 same operation is to be executed in parallel on different data, or when a
6 function is to be executed in a background thread and its result returned to a
7 well-defined main thread.  For communication between arbitrary threads, see
8 $(D std.concurrency).
10 $(D std._parallelism) is based on the concept of a $(D Task).  A $(D Task) is an
11 object that represents the fundamental unit of work in this library and may be
12 executed in parallel with any other $(D Task).  Using $(D Task)
13 directly allows programming with a future/promise paradigm.  All other
14 supported _parallelism paradigms (parallel foreach, map, reduce, pipelining)
15 represent an additional level of abstraction over $(D Task).  They
16 automatically create one or more $(D Task) objects, or closely related types
17 that are conceptually identical but not part of the public API.
19 After creation, a $(D Task) may be executed in a new thread, or submitted
20 to a $(D TaskPool) for execution.  A $(D TaskPool) encapsulates a task queue
21 and its worker threads.  Its purpose is to efficiently map a large
22 number of $(D Task)s onto a smaller number of threads.  A task queue is a
23 FIFO queue of $(D Task) objects that have been submitted to the
24 $(D TaskPool) and are awaiting execution.  A worker thread is a thread that
25 is associated with exactly one task queue.  It executes the $(D Task) at the
26 front of its queue when the queue has work available, or sleeps when
27 no work is available.  Each task queue is associated with zero or
28 more worker threads.  If the result of a $(D Task) is needed before execution
29 by a worker thread has begun, the $(D Task) can be removed from the task queue
30 and executed immediately in the thread where the result is needed.
32 Warning:  Unless marked as $(D @trusted) or $(D @safe), artifacts in
33           this module allow implicit data sharing between threads and cannot
34           guarantee that client code is free from low level data races.
36 Source:    $(PHOBOSSRC std/_parallelism.d)
37 Author:  David Simcha
38 Copyright:  Copyright (c) 2009-2011, David Simcha.
39 License:    $(HTTP boost.org/LICENSE_1_0.txt, Boost License 1.0)
40 */
41 module std.parallelism;
///
@system unittest
{
    import std.algorithm.iteration : map;
    import std.math : approxEqual;
    import std.parallelism : taskPool;
    import std.range : iota;

    // Parallel reduce can be combined with
    // std.algorithm.iteration.map to interesting effect.
    // The following example (thanks to Russel Winder)
    // calculates pi by quadrature using
    // std.algorithm.iteration.map and TaskPool.reduce.
    // getTerm is evaluated in parallel as needed by
    // TaskPool.reduce.
    //
    // The quadrature is the midpoint rule applied to
    //     pi = integral over [0, 1] of 4 / (1 + x^2) dx.
    // [0, 1] is split into n slices of width delta, and term i samples
    // the integrand at the centre of slice i, x = (i + 0.5) * delta.
    //
    // Timings on an Intel i5-3450 quad core machine
    // for n = 1_000_000_000:
    //
    // TaskPool.reduce:       1.067 s
    // std.algorithm.reduce:  4.011 s

    enum n = 1_000_000;
    enum delta = 1.0 / n;

    alias getTerm = (int i)
    {
        // Midpoint of the i-th slice.  Using (i - 0.5) here would shift
        // every sample half a slice to the left and give an O(delta) error.
        immutable x = ( i + 0.5 ) * delta;
        return delta / ( 1.0 + x * x ) ;
    };

    immutable pi = 4.0 * taskPool.reduce!"a + b"(n.iota.map!getTerm);

    assert(pi.approxEqual(3.1415926));
}
79 import core.atomic;
80 import core.memory;
81 import core.sync.condition;
82 import core.thread;
84 import std.functional;
85 import std.meta;
86 import std.range.primitives;
87 import std.traits;
// Platforms on which the CPU count is queried through sysctlbyname().
version (OSX)               version = useSysctlbyname;
else version (FreeBSD)      version = useSysctlbyname;
else version (DragonFlyBSD) version = useSysctlbyname;
else version (NetBSD)       version = useSysctlbyname;
// Module constructors that initialize totalCPUs from the operating system.
// Every branch reports at least 1, so a failed query never yields 0 CPUs
// or a wrapped-around negative error code.
version (Windows)
{
    // BUGS:  Only works on Windows 2000 and above.
    shared static this()
    {
        import core.sys.windows.windows : SYSTEM_INFO, GetSystemInfo;
        import std.algorithm.comparison : max;

        SYSTEM_INFO si;
        GetSystemInfo(&si);
        totalCPUs = max(1, cast(uint) si.dwNumberOfProcessors);
    }

}
else version (linux)
{
    shared static this()
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;

        // sysconf() returns -1 on failure.  Casting that straight to uint
        // would report ~4 billion CPUs, so fall back to 1 instead.
        immutable online = sysconf(_SC_NPROCESSORS_ONLN);
        totalCPUs = online > 0 ? cast(uint) online : 1;
    }
}
else version (Solaris)
{
    shared static this()
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;

        // sysconf() returns -1 on failure; fall back to 1 (see linux branch).
        immutable online = sysconf(_SC_NPROCESSORS_ONLN);
        totalCPUs = online > 0 ? cast(uint) online : 1;
    }
}
else version (useSysctlbyname)
{
    extern(C) int sysctlbyname(
        const char *, void *, size_t *, void *, size_t
    );

    shared static this()
    {
        version (OSX)
        {
            auto nameStr = "machdep.cpu.core_count\0".ptr;
        }
        else version (FreeBSD)
        {
            auto nameStr = "hw.ncpu\0".ptr;
        }
        else version (DragonFlyBSD)
        {
            auto nameStr = "hw.ncpu\0".ptr;
        }
        else version (NetBSD)
        {
            auto nameStr = "hw.ncpu\0".ptr;
        }

        uint ans;
        size_t len = uint.sizeof;

        // sysctlbyname() returns 0 on success.  On failure `ans` would stay 0,
        // so treat both an error and a zero answer as "1 CPU".
        if (sysctlbyname(nameStr, &ans, &len, null, 0) != 0 || ans == 0)
        {
            ans = 1;
        }
        totalCPUs = ans;
    }

}
else
{
    static assert(0, "Don't know how to get N CPUs on this OS.");
}
// Largest data-cache line size (in bytes) reported by core.cpuid, or 0 if no
// usable value is reported.  Set once by the module constructor below.
immutable size_t cacheLineSize;
shared static this()
{
    import core.cpuid : datacache;

    size_t widest;
    foreach (level; datacache)
    {
        immutable candidate = level.lineSize;

        // Values equal to uint.max are ignored.  Presumably uint.max is
        // core.cpuid's "unknown" sentinel -- NOTE(review): confirm.
        if (candidate < uint.max && candidate > widest)
        {
            widest = candidate;
        }
    }

    cacheLineSize = widest;
}
/* Atomics code.  These forward to core.atomic, but are written like this
   for two reasons:

   1.  They used to actually contain ASM code and I don't want to have to change
       to directly calling core.atomic in a zillion different places.

   2.  core.atomic has some misc. issues that make my use cases difficult
       without wrapping it.  If I didn't wrap it, casts would be required
       basically everywhere.
*/
// Atomically stores newVal into stuff.  stuff is an ordinary (unshared)
// ubyte-sized integral or enum; the cast to shared is done here so that
// call sites don't need it.
private void atomicSetUbyte(T)(ref T stuff, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    shared(T)* target = cast(shared(T)*) &stuff;
    atomicStore(*target, newVal);
}
// Atomically loads val (an unshared ubyte-sized integral or enum) and
// returns it as a ubyte.
private ubyte atomicReadUbyte(T)(ref T val)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    shared(T)* source = cast(shared(T)*) &val;
    return atomicLoad(*source);
}
// Compare-and-swap on an unshared ubyte-sized integral or enum (e.g. the
// TaskStatus values).  Hiding the cast to shared here avoids a lot of
// annoying casts at call sites.  Returns the result of core.atomic.cas:
// true if stuff held testVal and now holds newVal.
private bool atomicCasUbyte(T)(ref T stuff, T testVal, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    shared(T)* target = cast(shared(T)*) &stuff;
    return core.atomic.cas(target, testVal, newVal);
}
222 /*--------------------- Generic helper functions, etc.------------------------*/
// The type obtained by applying all of `functions` (each passed through
// unaryFun, so string lambdas work) to a single element of R, combined with
// adjoin.
private template MapType(R, functions...)
{
    static assert(functions.length);

    alias combined = adjoin!(staticMap!(unaryFun, functions));

    // Placeholder element; it only appears inside typeof() below.
    ElementType!R e = void;
    alias MapType = typeof(combined(e));
}
// The type produced by binaryFun!fun when combining an accumulator of type E
// with an element of range R.
private template ReduceType(alias fun, R, E)
{
    alias Elem = ElementType!R;
    alias ReduceType = typeof(binaryFun!fun(E.init, Elem.init));
}
// Negation of std.traits.hasUnsharedAliasing, usable as a predicate with
// allSatisfy.
private enum bool noUnsharedAliasing(T) = !hasUnsharedAliasing!T;
// Whether a callable of type F may be run in parallel from @safe code via
// Task.executeInNewThread().  Running it on a TaskPool additionally
// requires isSafeReturn.  All four conditions must hold:
//   - F is @safe or @trusted;
//   - F does not return by ref;
//   - F is a function pointer, or otherwise carries no unshared aliasing;
//   - none of F's parameter types has unshared aliasing.
private template isSafeTask(F)
{
    enum attrs = functionAttributes!F;

    enum bool safeOrTrusted =
        (attrs & (FunctionAttribute.safe | FunctionAttribute.trusted)) != 0;
    enum bool returnsByValue = (attrs & FunctionAttribute.ref_) == 0;
    enum bool calleeUnshared = isFunctionPointer!F || !hasUnsharedAliasing!F;
    enum bool paramsUnshared = allSatisfy!(noUnsharedAliasing, Parameters!F);

    enum bool isSafeTask =
        safeOrTrusted && returnsByValue && calleeUnshared && paramsUnshared;
}
@safe unittest
{
    // Accepted or rejected based on attributes and parameter types.
    static assert( isSafeTask!(void function() @safe));
    static assert(!isSafeTask!(void function()));
    static assert( isSafeTask!(void function(uint, string) @trusted));
    static assert(!isSafeTask!(void function(uint, char[])));

    // A mutable-array return does not disqualify a task here; the return
    // type is checked separately (isSafeReturn).
    static assert( isSafeTask!(uint[] function(uint, string) pure @trusted));
}
// Decides whether a Task type T that already satisfies the other @safe
// requirements may also be run on a TaskPool.  A worker thread could hand
// back a value that also aliases its own thread-local storage.  That is
// impossible with executeInNewThread(), because the executing thread has
// terminated before the result becomes visible.  It is also harmless for pure
// functions, which cannot read global state.  Hence a return type with
// unshared aliasing is accepted only when T.isPure.
private template isSafeReturn(T)
{
    static if (hasUnsharedAliasing!(T.ReturnType))
    {
        enum isSafeReturn = T.isPure;
    }
    else
    {
        enum isSafeReturn = true;
    }
}
// True for random-access ranges whose elements can be assigned to.
private enum randAssignable(R) = isRandomAccessRange!R && hasAssignableElements!R;
// Lifecycle state of a task.  It is stored in AbstractTask.taskStatus as a
// raw ubyte and read/written through the atomic*Ubyte helpers.
private enum TaskStatus : ubyte
{
    notStarted = 0, // Not yet picked up for execution.
    inProgress = 1, // Being executed.
    done       = 2  // Finished; AbstractTask.exception may hold a Throwable.
}
// The type returned by calling the alias `fun` with arguments of types T.
private template AliasReturn(alias fun, T...)
{
    // Note the trailing `()`: the literal must be *called* inside typeof().
    // Without the call, typeof() yields the literal's own function/delegate
    // type instead of fun's return type.  Nothing is evaluated at run time,
    // so the uninitialized `args` are never read.
    alias AliasReturn = typeof({ T args; return fun(args); }());
}
// Folds one element into the running reduction state.  With a single
// function this is just binaryFun of it.  With several functions the state
// is a tuple, and field i is updated with the i-th function applied to
// (field i, rhs).
// Should be private, but std.algorithm.reduce is used in the zero-thread case
// and won't work w/ private.
template reduceAdjoin(functions...)
{
    static if (functions.length != 1)
    {
        T reduceAdjoin(T, U)(T lhs, U rhs)
        {
            alias reducers = staticMap!(binaryFun, functions);

            foreach (idx, FieldType; typeof(lhs.expand))
            {
                lhs.expand[idx] = reducers[idx](lhs.expand[idx], rhs);
            }

            return lhs;
        }
    }
    else
    {
        alias reduceAdjoin = binaryFun!(functions[0]);
    }
}
// Merges two partial reduction results.  With a single function this is
// binaryFun of it.  With several functions both sides are tuples, and field
// i is merged with the i-th function applied to (lhs field i, rhs field i).
private template reduceFinish(functions...)
{
    static if (functions.length != 1)
    {
        T reduceFinish(T)(T lhs, T rhs)
        {
            alias mergers = staticMap!(binaryFun, functions);

            foreach (idx, FieldType; typeof(lhs.expand))
            {
                lhs.expand[idx] = mergers[idx](lhs.expand[idx], rhs.expand[idx]);
            }

            return lhs;
        }
    }
    else
    {
        alias reduceFinish = binaryFun!(functions[0]);
    }
}
// True when the type is an instantiation of RoundRobinBuffer (with any two
// type arguments); the unspecialized overload answers false for everything
// else.
private enum isRoundRobin(R : RoundRobinBuffer!(C1, C2), C1, C2) = true;

/// Ditto
private enum isRoundRobin(T) = false;
@safe unittest
{
    alias Buffer = RoundRobinBuffer!(void delegate(char[]), bool delegate());

    static assert( isRoundRobin!Buffer);
    static assert(!isRoundRobin!uint);
}
// Common header embedded in every task type.  Polymorphism is C-style:
// instead of virtual methods, each task stores a pointer to the function that
// runs it.  This gives direct control over memory allocation (tasks can live
// on the stack or the GC heap).
private struct AbstractTask
{
    // Links for an intrusive doubly-linked list -- presumably TaskPool's
    // task queue (TaskPool holds head/tail AbstractTask pointers).
    AbstractTask* prev;
    AbstractTask* next;

    // Runs the task; job() passes it a pointer to this header.
    void function(void*) runTask;

    // Throwable captured from the task, if any; rethrown by done().
    Throwable exception;
    ubyte taskStatus = TaskStatus.notStarted;

    /* Atomically checks whether the task has finished.  Returns false while
       it has not.  Once it has, rethrows the stored exception if there is
       one, otherwise returns true. */
    bool done() @property
    {
        if (atomicReadUbyte(taskStatus) != TaskStatus.done)
        {
            return false;
        }

        if (exception)
        {
            throw exception;
        }

        return true;
    }

    // Executes the task by handing this header to its runTask function.
    void job()
    {
        runTask(&this);
    }
}
/**
$(D Task) represents the fundamental unit of work.  A $(D Task) may be
executed in parallel with any other $(D Task).  Using this struct directly
allows future/promise _parallelism.  In this paradigm, a function (or delegate
or other callable) is executed in a thread other than the one it was called
from.  The calling thread does not block while the function is being executed.
A call to $(D workForce), $(D yieldForce), or $(D spinForce) is used to
ensure that the $(D Task) has finished executing and to obtain the return
value, if any.  These functions and $(D done) also act as full memory barriers,
meaning that any memory writes made in the thread that executed the $(D Task)
are guaranteed to be visible in the calling thread after one of these functions
returns.

The $(REF task, std,parallelism) and $(REF scopedTask, std,parallelism) functions can
be used to create an instance of this struct.  See $(D task) for usage examples.

Function results are returned from $(D yieldForce), $(D spinForce) and
$(D workForce) by ref.  If $(D fun) returns by ref, the reference will point
to the returned reference of $(D fun).  Otherwise it will point to a
field in this struct.

Copying this struct provides no useful semantics.  If you want to pass it
around, do so by reference or pointer.  Note that the postblit is currently
$(I not) disabled (see the comment at the end of the struct body), so the
compiler will not reject an accidental copy.

Bugs:  Changes to $(D ref) and $(D out) arguments are not propagated to the
       call site, only to $(D args) in this struct.
*/
struct Task(alias fun, Args...)
{
    // Type-erased header shared with the task queue.  It must stay the first
    // field: runTask receives a pointer to this header, and impl() casts that
    // pointer straight back to Task*.
    AbstractTask base = {runTask : &impl};
    alias base this;

    private @property AbstractTask* basePtr()
    {
        return &base;
    }

    // Entry point stored in base.runTask.  It calls fun with the stored
    // arguments and records the result.  For a ref-returning fun it stores
    // the address of the returned reference; otherwise it stores the value.
    private static void impl(void* myTask)
    {
        import std.algorithm.internal : addressOf;

        Task* myCastedTask = cast(typeof(this)*) myTask;
        static if (is(ReturnType == void))
        {
            fun(myCastedTask._args);
        }
        else static if (is(typeof(addressOf(fun(myCastedTask._args)))))
        {
            myCastedTask.returnVal = addressOf(fun(myCastedTask._args));
        }
        else
        {
            myCastedTask.returnVal = fun(myCastedTask._args);
        }
    }

    // Pool the task was handed to.  It is null until the task is submitted;
    // enforcePool() rejects forcing an unsubmitted task.
    private TaskPool pool;
    private bool isScoped;  // True if created with scopedTask.

    // Stored arguments.  When fun is `run`, _args[0] is the callable itself
    // (function pointer, delegate, or opCall object) and the rest are its
    // arguments.
    Args _args;

    /**
    The arguments the function was called with.  Changes to $(D out) and
    $(D ref) arguments will be visible here.
    */
    static if (__traits(isSame, fun, run))
    {
        alias args = _args[1..$];
    }
    else
    {
        alias args = _args;
    }


    // The purpose of this code is to decide whether functions whose
    // return values have unshared aliasing can be executed via
    // TaskPool from @safe code.  See isSafeReturn.
    static if (__traits(isSame, fun, run))
    {
        static if (isFunctionPointer!(_args[0]))
        {
            // Nonzero iff the function pointer type is marked pure.
            private enum bool isPure =
            functionAttributes!(Args[0]) & FunctionAttribute.pure_;
        }
        else
        {
            // BUG:  Should check this for delegates too, but std.traits
            //       apparently doesn't allow this.  isPure is irrelevant
            //       for delegates, at least for now since shared delegates
            //       don't work.
            private enum bool isPure = false;
        }

    }
    else
    {
        // We already know that we can't execute aliases in @safe code, so
        // just put a dummy value here.
        private enum bool isPure = false;
    }


    /**
    The return type of the function called by this $(D Task).  This can be
    $(D void).
    */
    alias ReturnType = typeof(fun(_args));

    // Result storage.  A ref-returning fun stores a pointer; any other
    // non-void fun stores the value.  fixRef turns either form into a
    // `ref ReturnType`, which is what the *Force functions return.
    static if (!is(ReturnType == void))
    {
        static if (is(typeof(&fun(_args))))
        {
            // Ref return.
            ReturnType* returnVal;

            ref ReturnType fixRef(ReturnType* val)
            {
                return *val;
            }

        }
        else
        {
            ReturnType returnVal;

            ref ReturnType fixRef(ref ReturnType val)
            {
                return val;
            }
        }
    }

    // Throws (via std.exception.enforce) if the task has not been given a
    // pool yet.
    private void enforcePool()
    {
        import std.exception : enforce;
        enforce(this.pool !is null, "Job not submitted yet.");
    }

    static if (Args.length > 0)
    {
        private this(Args args)
        {
            _args = args;
        }
    }

    // Work around DMD bug 6588, allow immutable elements.
    // Member-wise assignment over every field.  It is @disable'd when some
    // argument type is not assignable.  NOTE(review): the result is returned
    // by value, i.e. as a copy of *this.
    static if (allSatisfy!(isAssignable, Args))
    {
        typeof(this) opAssign(typeof(this) rhs)
        {
            foreach (i, Type; typeof(this.tupleof))
            {
                this.tupleof[i] = rhs.tupleof[i];
            }
            return this;
        }
    }
    else
    {
        @disable typeof(this) opAssign(typeof(this) rhs)
        {
            assert(0);
        }
    }

    /**
    If the $(D Task) isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any.  If it's in progress,
    busy spin until it's done, then return the return value.  If it threw
    an exception, rethrow that exception.

    This function should be used when you expect the result of the
    $(D Task) to be available on a timescale shorter than that of an OS
    context switch.
     */
    @property ref ReturnType spinForce() @trusted
    {
        enforcePool();

        // Runs the task here if it has not started yet (the mechanics live in
        // TaskPool.tryDeleteExecute).
        this.pool.tryDeleteExecute(basePtr);

        // Busy-wait, polling the status atomically.
        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done) {}

        if (exception)
        {
            throw exception;
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If the $(D Task) isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any.  If it's in progress,
    wait on a condition variable.  If it threw an exception, rethrow that
    exception.

    This function should be used for expensive functions, as waiting on a
    condition variable introduces latency, but avoids wasted CPU cycles.
     */
    @property ref ReturnType yieldForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        // Fast path: already finished.  done() also rethrows any stored
        // exception.
        if (done)
        {
            static if (is(ReturnType == void))
            {
                return;
            }
            else
            {
                return fixRef(this.returnVal);
            }
        }

        // Slow path: hold the pool's waiter lock and sleep on its completion
        // condition until this task's status reads as done.
        pool.waiterLock();
        scope(exit) pool.waiterUnlock();

        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done)
        {
            pool.waitUntilCompletion();
        }

        if (exception)
        {
            throw exception; // nocoverage
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If this $(D Task) was not started yet, execute it in the current
    thread.  If it is finished, return its result.  If it is in progress,
    execute any other $(D Task) from the $(D TaskPool) instance that
    this $(D Task) was submitted to until this one
    is finished.  If it threw an exception, rethrow that exception.
    If no other tasks are available or this $(D Task) was executed using
    $(D executeInNewThread), wait on a condition variable.
     */
    @property ref ReturnType workForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        // Keep draining the pool's queue in this thread until our task is
        // done; fall back to yieldForce as soon as the queue is empty.
        while (true)
        {
            if (done)    // done() implicitly checks for exceptions.
            {
                static if (is(ReturnType == void))
                {
                    return;
                }
                else
                {
                    return fixRef(this.returnVal);
                }
            }

            AbstractTask* job;
            {
                // Locking explicitly and calling popNoSync() because
                // pop() waits on a condition variable if there are no Tasks
                // in the queue.

                pool.queueLock();
                scope(exit) pool.queueUnlock();
                job = pool.popNoSync();
            }


            if (job !is null)
            {

                // NOTE(review): stderr is not imported in the visible code;
                // this branch only compiles if std.stdio is in scope when the
                // verboseUnittest version is set.
                version (verboseUnittest)
                {
                    stderr.writeln("Doing workForce work.");
                }

                pool.doJob(job);

                if (done)
                {
                    static if (is(ReturnType == void))
                    {
                        return;
                    }
                    else
                    {
                        return fixRef(this.returnVal);
                    }
                }
            }
            else
            {
                version (verboseUnittest)
                {
                    stderr.writeln("Yield from workForce.");
                }

                return yieldForce;
            }
        }
    }

    /**
    Returns $(D true) if the $(D Task) is finished executing.

    Throws:  Rethrows any exception thrown during the execution of the
             $(D Task).
    */
    @property bool done() @trusted
    {
        // Explicitly forwarded for documentation purposes.
        return base.done;
    }

    /**
    Create a new thread for executing this $(D Task), execute it in the
    newly created thread, then terminate the thread.  This can be used for
    future/promise parallelism.  An explicit priority may be given
    to the $(D Task).  If one is provided, its value is forwarded to
    $(D core.thread.Thread.priority). See $(REF task, std,parallelism) for
    usage example.
    */
    void executeInNewThread() @trusted
    {
        // A single-task TaskPool is created for this task alone.
        pool = new TaskPool(basePtr);
    }

    /// Ditto
    void executeInNewThread(int priority) @trusted
    {
        pool = new TaskPool(basePtr, priority);
    }

    // A scoped task that was submitted but has not finished is forced here.
    // This keeps the stack frame holding it alive until its execution has
    // completed.
    @safe ~this()
    {
        if (isScoped && pool !is null && taskStatus != TaskStatus.done)
        {
            yieldForce;
        }
    }

    // When this is uncommented, it somehow gets called on returning from
    // scopedTask even though the struct shouldn't be getting copied.
    //@disable this(this) {}
}
// Adapter that lets Task work with runtime callables (function pointers,
// delegates, and objects with opCall), not just template aliases.
// Task!(run, F, Args) stores the callable as its first argument, and run
// invokes it with the remaining ones.  The arguments are taken by ref, so
// changes the callable makes to ref/out parameters land in the Task's stored
// copies (Task.args).
ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args)
{
    return fpOrDelegate(args);
}
/**
Creates a $(D Task) on the GC heap that calls an alias.  The resulting task
can be run with $(D Task.executeInNewThread) or handed to a
$(REF TaskPool, std,parallelism).  A globally accessible $(D TaskPool)
instance is provided by $(REF taskPool, std,parallelism).

Returns:  A pointer to the newly allocated $(D Task).

Example:
---
// Read two files into memory at the same time.
import std.file;
import std.parallelism : task;

void main()
{
    // Create a Task that reads foo.txt and start it in a new thread.
    auto file1Task = task!read("foo.txt");
    file1Task.executeInNewThread();

    // Meanwhile, read bar.txt in the current thread.
    auto file2Data = read("bar.txt");

    // Wait for foo.txt to be read and fetch its contents.
    auto file1Data = file1Task.yieldForce;
}
---

---
// Sorts an array using a parallel quick sort algorithm.
// The first partition is done serially.  Both recursion
// branches are then executed in parallel.
//
// Timings for sorting an array of 1,000,000 doubles on
// an Athlon 64 X2 dual core machine:
//
// This implementation:               176 milliseconds.
// Equivalent serial implementation:  280 milliseconds.
import std.algorithm : partition, sort, swap;
import std.parallelism : task, taskPool;

void parallelSort(T)(T[] data)
{
    // Sort small subarrays serially.
    if (data.length < 100)
    {
         sort(data);
         return;
    }

    // Partition the array.
    swap(data[$ / 2], data[$ - 1]);
    auto pivot = data[$ - 1];
    bool lessThanPivot(T elem) { return elem < pivot; }

    auto greaterEqual = partition!lessThanPivot(data[0..$ - 1]);
    swap(data[$ - greaterEqual.length - 1], data[$ - 1]);

    auto less = data[0..$ - greaterEqual.length - 1];
    greaterEqual = data[$ - greaterEqual.length..$];

    // Execute both recursion branches in parallel.
    auto recurseTask = task!parallelSort(greaterEqual);
    taskPool.put(recurseTask);
    parallelSort(less);
    recurseTask.yieldForce;
}
---
*/
auto task(alias fun, Args...)(Args args)
{
    auto created = new Task!(fun, Args)(args);
    return created;
}

/**
Creates a $(D Task) on the GC heap that calls a function pointer, a delegate,
or a class/struct with an overloaded opCall.

Example:
---
// Read two files in at the same time again,
// but this time use a function pointer instead
// of an alias to represent std.file.read.
import std.file;
import std.parallelism : task;

void main()
{
    // Create a Task that reads foo.txt and start it in a new thread.
    auto file1Task = task(&read, "foo.txt");
    file1Task.executeInNewThread();

    // Meanwhile, read bar.txt in the current thread.
    auto file2Data = read("bar.txt");

    // Wait for foo.txt to be read and fetch its contents.
    auto file1Data = file1Task.yieldForce;
}
---

Notes: This function takes a non-scope delegate, so it can be used with
       closures.  If a closure cannot be allocated because objects on the
       stack have scoped destruction, use $(D scopedTask), which takes a
       scope delegate.
 */
auto task(F, Args...)(F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    auto created = new Task!(run, F, Args)(delegateOrFp, args);
    return created;
}

/**
Version of $(D task) usable from $(D @safe) code.  It is used exactly like the
non-@safe overload, but safety imposes some restrictions:

1.  $(D fun) must be @safe or @trusted.

2.  $(D F) must not have any unshared aliasing as defined by
    $(REF hasUnsharedAliasing, std,traits).  This means it
    may not be an unshared delegate or a non-shared class or struct
    with overloaded $(D opCall).  This also precludes accepting template
    alias parameters.

3.  $(D Args) must not have unshared aliasing.

4.  $(D fun) must not return by reference.

5.  The return type must not have unshared aliasing unless $(D fun) is
    $(D pure) or the $(D Task) is executed via $(D executeInNewThread) instead
    of using a $(D TaskPool).

*/
@trusted auto task(F, Args...)(F fun, Args args)
if (is(typeof(fun(args))) && isSafeTask!F)
{
    auto created = new Task!(run, F, Args)(fun, args);
    return created;
}
/**
These functions create $(D Task) objects on the stack instead of the GC heap.
A $(D Task) created by $(D scopedTask) cannot outlive the scope it was
created in.

Reasons to prefer $(D scopedTask) over $(D task):

1.  The $(D Task) calls a delegate, and a closure cannot be allocated because
    objects on the stack have scoped destruction.  The delegate overload of
    $(D scopedTask) takes a $(D scope) delegate.

2.  As a micro-optimization, to avoid the heap allocation of $(D task) and of
    any closure.

Otherwise it is used exactly like $(D task).

Notes:  $(D Task) objects created using $(D scopedTask) will automatically
call $(D Task.yieldForce) in their destructor if necessary to ensure
the $(D Task) is complete before the stack frame they reside on is destroyed.
*/
auto scopedTask(alias fun, Args...)(Args args)
{
    auto onStack = Task!(fun, Args)(args);
    onStack.isScoped = true;
    return onStack;
}

/// Ditto
auto scopedTask(F, Args...)(scope F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    auto onStack = Task!(run, F, Args)(delegateOrFp, args);
    onStack.isScoped = true;
    return onStack;
}

/// Ditto
@trusted auto scopedTask(F, Args...)(F fun, Args args)
if (is(typeof(fun(args))) && isSafeTask!F)
{
    auto onStack = Task!(run, F, Args)(fun, args);
    onStack.isScoped = true;
    return onStack;
}
/**
The number of CPUs available on the current machine, as reported by the
operating system.  It is set once by a module constructor.  The value comes from
$(D GetSystemInfo)'s $(D dwNumberOfProcessors) on Windows,
$(D sysconf(_SC_NPROCESSORS_ONLN)) on Linux and Solaris, the
$(D machdep.cpu.core_count) sysctl on OS X, and $(D hw.ncpu) on FreeBSD,
DragonFlyBSD and NetBSD.  Whether logical processors or physical cores are
counted therefore depends on the platform.
*/
immutable uint totalCPUs;
/*
Thread subclass used for std.parallelism worker threads.  It exists for two
reasons:

1.  Its type lets the module destructor pick std.parallelism threads out of
    the list of all threads, so that daemon pool threads can be terminated.

2.  It records the TaskPool the thread belongs to.  The destructor needs that
    pool to stop it before joining the thread.
*/
private final class ParallelismThread : Thread
{
    // The pool this worker belongs to.
    TaskPool pool;

    this(void delegate() dg)
    {
        super(dg);
    }
}
// Kill daemon threads.  For every std.parallelism worker whose pool is a
// daemon pool, stop the pool and then join the worker.
shared static ~this()
{
    foreach (ref t; Thread)
    {
        if (auto worker = cast(ParallelismThread) t)
        {
            auto owner = worker.pool;
            if (owner.isDaemon)
            {
                owner.stop();
                worker.join();
            }
        }
    }
}
996 /**
997 This class encapsulates a task queue and a set of worker threads.  Its purpose
998 is to efficiently map a large number of $(D Task)s onto a smaller number of
999 threads.  A task queue is a FIFO queue of $(D Task) objects that have been
1000 submitted to the $(D TaskPool) and are awaiting execution.  A worker thread is a
1001 thread that executes the $(D Task) at the front of the queue when one is
1002 available and sleeps when the queue is empty.
In most cases this class should be used through the global instance available
via the $(REF taskPool, std,parallelism) property.
Occasionally it is useful to explicitly instantiate a $(D TaskPool):
1008 1.  When you want $(D TaskPool) instances with multiple priorities, for example
1009     a low priority pool and a high priority pool.
1011 2.  When the threads in the global task pool are waiting on a synchronization
1012     primitive (for example a mutex), and you want to parallelize the code that
1013     needs to run before these threads can be resumed.
1014  */
1015 final class TaskPool
1016 {
private:

    // A pool can either be a regular pool or a single-task pool.  A
    // single-task pool is a dummy pool that's fired up for
    // Task.executeInNewThread().
    bool isSingleTask;

    // Worker threads of a regular pool; created and started in this(size_t).
    ParallelismThread[] pool;
    // The one thread of a single-task pool; started in the private
    // single-task constructor and joined by waitUntilCompletion().
    Thread singleTaskThread;

    // Ends of the doubly linked FIFO task queue (linked through
    // AbstractTask.prev/next).  Guarded by queueMutex in regular pools.
    AbstractTask* head;
    AbstractTask* tail;
    // Lifecycle state; see PoolState below.
    PoolState status = PoolState.running;
    // Bound to queueMutex; idle workers wait on it in pop().
    Condition workerCondition;
    // Bound to waiterMutex; notified after each job in doJob().
    Condition waiterCondition;
    Mutex queueMutex;
    Mutex waiterMutex;  // For waiterCondition

    // The instanceStartIndex of the next instance that will be created.
    __gshared static size_t nextInstanceIndex = 1;

    // The index of the current thread.  Not __gshared/shared, so each thread
    // has its own copy; it is set from within the worker in startWorkLoop().
    static size_t threadIndex;

    // The index of the first thread in this instance.
    immutable size_t instanceStartIndex;

    // The index that the next thread to be initialized in this pool will have.
    size_t nextThreadIndex;

    enum PoolState : ubyte
    {
        running,    // new tasks may be submitted (abstractPutNoSync checks this)
        finishing,  // workers drain the queue, then switch to stopNow
        stopNow     // executeWorkLoop() exits
    }
doJob(AbstractTask * job)1054     void doJob(AbstractTask* job)
1055     {
1056         assert(job.taskStatus == TaskStatus.inProgress);
1057         assert(job.next is null);
1058         assert(job.prev is null);
1060         scope(exit)
1061         {
1062             if (!isSingleTask)
1063             {
1064                 waiterLock();
1065                 scope(exit) waiterUnlock();
1066                 notifyWaiters();
1067             }
1068         }
1070         try
1071         {
1072             job.job();
1073         }
1074         catch (Throwable e)
1075         {
1076             job.exception = e;
1077         }
1079         atomicSetUbyte(job.taskStatus, TaskStatus.done);
1080     }
1082     // This function is used for dummy pools created by Task.executeInNewThread().
doSingleTask()1083     void doSingleTask()
1084     {
1085         // No synchronization.  Pool is guaranteed to only have one thread,
1086         // and the queue is submitted to before this thread is created.
1087         assert(head);
1088         auto t = head;
1089         t.next = t.prev = head = null;
1090         doJob(t);
1091     }
1093     // This function performs initialization for each thread that affects
1094     // thread local storage and therefore must be done from within the
1095     // worker thread.  It then calls executeWorkLoop().
startWorkLoop()1096     void startWorkLoop()
1097     {
1098         // Initialize thread index.
1099         {
1100             queueLock();
1101             scope(exit) queueUnlock();
1102             threadIndex = nextThreadIndex;
1103             nextThreadIndex++;
1104         }
1106         executeWorkLoop();
1107     }
1109     // This is the main work loop that worker threads spend their time in
1110     // until they terminate.  It's also entered by non-worker threads when
1111     // finish() is called with the blocking variable set to true.
executeWorkLoop()1112     void executeWorkLoop()
1113     {
1114         while (atomicReadUbyte(status) != PoolState.stopNow)
1115         {
1116             AbstractTask* task = pop();
1117             if (task is null)
1118             {
1119                 if (atomicReadUbyte(status) == PoolState.finishing)
1120                 {
1121                     atomicSetUbyte(status, PoolState.stopNow);
1122                     return;
1123                 }
1124             }
1125             else
1126             {
1127                 doJob(task);
1128             }
1129         }
1130     }
1132     // Pop a task off the queue.
pop()1133     AbstractTask* pop()
1134     {
1135         queueLock();
1136         scope(exit) queueUnlock();
1137         auto ret = popNoSync();
1138         while (ret is null && status == PoolState.running)
1139         {
1140             wait();
1141             ret = popNoSync();
1142         }
1143         return ret;
1144     }
popNoSync()1146     AbstractTask* popNoSync()
1147     out(returned)
1148     {
1149         /* If task.prev and task.next aren't null, then another thread
1150          * can try to delete this task from the pool after it's
1151          * alreadly been deleted/popped.
1152          */
1153         if (returned !is null)
1154         {
1155             assert(returned.next is null);
1156             assert(returned.prev is null);
1157         }
1158     }
1159     body
1160     {
1161         if (isSingleTask) return null;
1163         AbstractTask* returned = head;
1164         if (head !is null)
1165         {
1166             head = head.next;
1167             returned.prev = null;
1168             returned.next = null;
1169             returned.taskStatus = TaskStatus.inProgress;
1170         }
1171         if (head !is null)
1172         {
1173             head.prev = null;
1174         }
1176         return returned;
1177     }
1179     // Push a task onto the queue.
abstractPut(AbstractTask * task)1180     void abstractPut(AbstractTask* task)
1181     {
1182         queueLock();
1183         scope(exit) queueUnlock();
1184         abstractPutNoSync(task);
1185     }
abstractPutNoSync(AbstractTask * task)1187     void abstractPutNoSync(AbstractTask* task)
1188     in
1189     {
1190         assert(task);
1191     }
1192     out
1193     {
1194         import std.conv : text;
1196         assert(tail.prev !is tail);
1197         assert(tail.next is null, text(tail.prev, '\t', tail.next));
1198         if (tail.prev !is null)
1199         {
1200             assert(tail.prev.next is tail, text(tail.prev, '\t', tail.next));
1201         }
1202     }
1203     body
1204     {
1205         // Not using enforce() to save on function call overhead since this
1206         // is a performance critical function.
1207         if (status != PoolState.running)
1208         {
1209             throw new Error(
1210                 "Cannot submit a new task to a pool after calling " ~
1211                 "finish() or stop()."
1212             );
1213         }
1215         task.next = null;
1216         if (head is null)   //Queue is empty.
1217         {
1218             head = task;
1219             tail = task;
1220             tail.prev = null;
1221         }
1222         else
1223         {
1224             assert(tail);
1225             task.prev = tail;
1226             tail.next = task;
1227             tail = task;
1228         }
1229         notify();
1230     }
abstractPutGroupNoSync(AbstractTask * h,AbstractTask * t)1232     void abstractPutGroupNoSync(AbstractTask* h, AbstractTask* t)
1233     {
1234         if (status != PoolState.running)
1235         {
1236             throw new Error(
1237                 "Cannot submit a new task to a pool after calling " ~
1238                 "finish() or stop()."
1239             );
1240         }
1242         if (head is null)
1243         {
1244             head = h;
1245             tail = t;
1246         }
1247         else
1248         {
1249             h.prev = tail;
1250             tail.next = h;
1251             tail = t;
1252         }
1254         notifyAll();
1255     }
tryDeleteExecute(AbstractTask * toExecute)1257     void tryDeleteExecute(AbstractTask* toExecute)
1258     {
1259         if (isSingleTask) return;
1261         if ( !deleteItem(toExecute) )
1262         {
1263             return;
1264         }
1266         try
1267         {
1268             toExecute.job();
1269         }
1270         catch (Exception e)
1271         {
1272             toExecute.exception = e;
1273         }
1275         atomicSetUbyte(toExecute.taskStatus, TaskStatus.done);
1276     }
deleteItem(AbstractTask * item)1278     bool deleteItem(AbstractTask* item)
1279     {
1280         queueLock();
1281         scope(exit) queueUnlock();
1282         return deleteItemNoSync(item);
1283     }
deleteItemNoSync(AbstractTask * item)1285     bool deleteItemNoSync(AbstractTask* item)
1286     {
1287         if (item.taskStatus != TaskStatus.notStarted)
1288         {
1289             return false;
1290         }
1291         item.taskStatus = TaskStatus.inProgress;
1293         if (item is head)
1294         {
1295             // Make sure head gets set properly.
1296             popNoSync();
1297             return true;
1298         }
1299         if (item is tail)
1300         {
1301             tail = tail.prev;
1302             if (tail !is null)
1303             {
1304                 tail.next = null;
1305             }
1306             item.next = null;
1307             item.prev = null;
1308             return true;
1309         }
1310         if (item.next !is null)
1311         {
1312             assert(item.next.prev is item);  // Check queue consistency.
1313             item.next.prev = item.prev;
1314         }
1315         if (item.prev !is null)
1316         {
1317             assert(item.prev.next is item);  // Check queue consistency.
1318             item.prev.next = item.next;
1319         }
1320         item.next = null;
1321         item.prev = null;
1322         return true;
1323     }
queueLock()1325     void queueLock()
1326     {
1327         assert(queueMutex);
1328         if (!isSingleTask) queueMutex.lock();
1329     }
queueUnlock()1331     void queueUnlock()
1332     {
1333         assert(queueMutex);
1334         if (!isSingleTask) queueMutex.unlock();
1335     }
waiterLock()1337     void waiterLock()
1338     {
1339         if (!isSingleTask) waiterMutex.lock();
1340     }
waiterUnlock()1342     void waiterUnlock()
1343     {
1344         if (!isSingleTask) waiterMutex.unlock();
1345     }
wait()1347     void wait()
1348     {
1349         if (!isSingleTask) workerCondition.wait();
1350     }
notify()1352     void notify()
1353     {
1354         if (!isSingleTask) workerCondition.notify();
1355     }
notifyAll()1357     void notifyAll()
1358     {
1359         if (!isSingleTask) workerCondition.notifyAll();
1360     }
waitUntilCompletion()1362     void waitUntilCompletion()
1363     {
1364         if (isSingleTask)
1365         {
1366             singleTaskThread.join();
1367         }
1368         else
1369         {
1370             waiterCondition.wait();
1371         }
1372     }
notifyWaiters()1374     void notifyWaiters()
1375     {
1376         if (!isSingleTask) waiterCondition.notifyAll();
1377     }
1379     // Private constructor for creating dummy pools that only have one thread,
1380     // only execute one Task, and then terminate.  This is used for
1381     // Task.executeInNewThread().
1382     this(AbstractTask* task, int priority = int.max)
1383     {
1384         assert(task);
1386         // Dummy value, not used.
1387         instanceStartIndex = 0;
1389         this.isSingleTask = true;
1390         task.taskStatus = TaskStatus.inProgress;
1391         this.head = task;
1392         singleTaskThread = new Thread(&doSingleTask);
1393         singleTaskThread.start();
1395         // Disabled until writing code to support
1396         // running thread with specified priority
1397         // See https://d.puremagic.com/issues/show_bug.cgi?id=8960
1399         /*if (priority != int.max)
1400         {
1401             singleTaskThread.priority = priority;
1402         }*/
1403     }
public:
    // Used by parallel_algorithm, but too unstable to document as public API.
    //
    // Returns the work unit size for a range of rangeLen elements: the whole
    // range when the pool has no worker threads, otherwise ceil(rangeLen /
    // (4 * (size + 1))), and never less than 1.
    size_t defaultWorkUnitSize(size_t rangeLen) const @safe pure nothrow
    {
        import std.algorithm.comparison : max;

        if (this.size == 0) return rangeLen;

        // Four units per slot; presumably one slot per worker plus the
        // submitting thread, hence size + 1.
        immutable size_t unitCount = 4 * (this.size + 1);
        size_t perUnit = rangeLen / unitCount;
        if (rangeLen % unitCount != 0) ++perUnit;  // round up
        return max(perUnit, 1);
    }
1422     /**
1423     Default constructor that initializes a $(D TaskPool) with
1424     $(D totalCPUs) - 1 worker threads.  The minus 1 is included because the
1425     main thread will also be available to do work.
1427     Note:  On single-core machines, the primitives provided by $(D TaskPool)
1428            operate transparently in single-threaded mode.
1429      */
this()1430     this() @trusted
1431     {
1432         this(totalCPUs - 1);
1433     }
1435     /**
1436     Allows for custom number of worker threads.
1437     */
this(size_t nWorkers)1438     this(size_t nWorkers) @trusted
1439     {
1440         synchronized(typeid(TaskPool))
1441         {
1442             instanceStartIndex = nextInstanceIndex;
1444             // The first worker thread to be initialized will have this index,
1445             // and will increment it.  The second worker to be initialized will
1446             // have this index plus 1.
1447             nextThreadIndex = instanceStartIndex;
1448             nextInstanceIndex += nWorkers;
1449         }
1451         queueMutex = new Mutex(this);
1452         waiterMutex = new Mutex();
1453         workerCondition = new Condition(queueMutex);
1454         waiterCondition = new Condition(waiterMutex);
1456         pool = new ParallelismThread[nWorkers];
1457         foreach (ref poolThread; pool)
1458         {
1459             poolThread = new ParallelismThread(&startWorkLoop);
1460             poolThread.pool = this;
1461             poolThread.start();
1462         }
1463     }
1465     /**
1466     Implements a parallel foreach loop over a range.  This works by implicitly
1467     creating and submitting one $(D Task) to the $(D TaskPool) for each worker
1468     thread.  A work unit is a set of consecutive elements of $(D range) to
1469     be processed by a worker thread between communication with any other
1470     thread.  The number of elements processed per work unit is controlled by the
1471     $(D workUnitSize) parameter.  Smaller work units provide better load
1472     balancing, but larger work units avoid the overhead of communicating
1473     with other threads frequently to fetch the next work unit.  Large work
1474     units also avoid false sharing in cases where the range is being modified.
1475     The less time a single iteration of the loop takes, the larger
1476     $(D workUnitSize) should be.  For very expensive loop bodies,
1477     $(D workUnitSize) should  be 1.  An overload that chooses a default work
1478     unit size is also available.
1480     Example:
1481     ---
1482     // Find the logarithm of every number from 1 to
1483     // 10_000_000 in parallel.
1484     auto logs = new double[10_000_000];
1486     // Parallel foreach works with or without an index
1487     // variable.  It can be iterate by ref if range.front
1488     // returns by ref.
1490     // Iterate over logs using work units of size 100.
1491     foreach (i, ref elem; taskPool.parallel(logs, 100))
1492     {
1493         elem = log(i + 1.0);
1494     }
1496     // Same thing, but use the default work unit size.
1497     //
1498     // Timings on an Athlon 64 X2 dual core machine:
1499     //
1500     // Parallel foreach:  388 milliseconds
1501     // Regular foreach:   619 milliseconds
1502     foreach (i, ref elem; taskPool.parallel(logs))
1503     {
1504         elem = log(i + 1.0);
1505     }
1506     ---
1508     Notes:
1510     The memory usage of this implementation is guaranteed to be constant
1511     in $(D range.length).
1513     Breaking from a parallel foreach loop via a break, labeled break,
1514     labeled continue, return or goto statement throws a
1515     $(D ParallelForeachError).
1517     In the case of non-random access ranges, parallel foreach buffers lazily
1518     to an array of size $(D workUnitSize) before executing the parallel portion
1519     of the loop.  The exception is that, if a parallel foreach is executed
1520     over a range returned by $(D asyncBuf) or $(D map), the copying is elided
1521     and the buffers are simply swapped.  In this case $(D workUnitSize) is
1522     ignored and the work unit size is set to the  buffer size of $(D range).
1524     A memory barrier is guaranteed to be executed on exit from the loop,
1525     so that results produced by all threads are visible in the calling thread.
1527     $(B Exception Handling):
1529     When at least one exception is thrown from inside a parallel foreach loop,
1530     the submission of additional $(D Task) objects is terminated as soon as
1531     possible, in a non-deterministic manner.  All executing or
1532     enqueued work units are allowed to complete.  Then, all exceptions that
1533     were thrown by any work unit are chained using $(D Throwable.next) and
1534     rethrown.  The order of the exception chaining is non-deterministic.
1535     */
1536     ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
1537     {
1538         import std.exception : enforce;
1539         enforce(workUnitSize > 0, "workUnitSize must be > 0.");
1540         alias RetType = ParallelForeach!R;
1541         return RetType(this, range, workUnitSize);
1542     }
1545     /// Ditto
1546     ParallelForeach!R parallel(R)(R range)
1547     {
1548         static if (hasLength!R)
1549         {
1550             // Default work unit size is such that we would use 4x as many
1551             // slots as are in this thread pool.
1552             size_t workUnitSize = defaultWorkUnitSize(range.length);
1553             return parallel(range, workUnitSize);
1554         }
1555         else
1556         {
1557             // Just use a really, really dumb guess if the user is too lazy to
1558             // specify.
1559             return parallel(range, 512);
1560         }
1561     }
1563     ///
amap(functions...)1564     template amap(functions...)
1565     {
1566         /**
1567         Eager parallel map.  The eagerness of this function means it has less
1568         overhead than the lazily evaluated $(D TaskPool.map) and should be
1569         preferred where the memory requirements of eagerness are acceptable.
1570         $(D functions) are the functions to be evaluated, passed as template
1571         alias parameters in a style similar to
1572         $(REF map, std,algorithm,iteration).
1573         The first argument must be a random access range. For performance
1574         reasons, amap will assume the range elements have not yet been
1575         initialized. Elements will be overwritten without calling a destructor
1576         nor doing an assignment. As such, the range must not contain meaningful
1577         data$(DDOC_COMMENT not a section): either un-initialized objects, or
1578         objects in their $(D .init) state.
1580         ---
1581         auto numbers = iota(100_000_000.0);
1583         // Find the square roots of numbers.
1584         //
1585         // Timings on an Athlon 64 X2 dual core machine:
1586         //
1587         // Parallel eager map:                   0.802 s
1588         // Equivalent serial implementation:     1.768 s
1589         auto squareRoots = taskPool.amap!sqrt(numbers);
1590         ---
1592         Immediately after the range argument, an optional work unit size argument
1593         may be provided.  Work units as used by $(D amap) are identical to those
1594         defined for parallel foreach.  If no work unit size is provided, the
1595         default work unit size is used.
1597         ---
1598         // Same thing, but make work unit size 100.
1599         auto squareRoots = taskPool.amap!sqrt(numbers, 100);
1600         ---
1602         An output range for returning the results may be provided as the last
1603         argument.  If one is not provided, an array of the proper type will be
1604         allocated on the garbage collected heap.  If one is provided, it must be a
1605         random access range with assignable elements, must have reference
1606         semantics with respect to assignment to its elements, and must have the
1607         same length as the input range.  Writing to adjacent elements from
1608         different threads must be safe.
1610         ---
1611         // Same thing, but explicitly allocate an array
1612         // to return the results in.  The element type
1613         // of the array may be either the exact type
1614         // returned by functions or an implicit conversion
1615         // target.
1616         auto squareRoots = new float[numbers.length];
1617         taskPool.amap!sqrt(numbers, squareRoots);
1619         // Multiple functions, explicit output range, and
1620         // explicit work unit size.
1621         auto results = new Tuple!(float, real)[numbers.length];
1622         taskPool.amap!(sqrt, log)(numbers, 100, results);
1623         ---
1625         Note:
1627         A memory barrier is guaranteed to be executed after all results are written
1628         but before returning so that results produced by all threads are visible
1629         in the calling thread.
1631         Tips:
1633         To perform the mapping operation in place, provide the same range for the
1634         input and output range.
1636         To parallelize the copying of a range with expensive to evaluate elements
1637         to an array, pass an identity function (a function that just returns
1638         whatever argument is provided to it) to $(D amap).
1640         $(B Exception Handling):
1642         When at least one exception is thrown from inside the map functions,
1643         the submission of additional $(D Task) objects is terminated as soon as
1644         possible, in a non-deterministic manner.  All currently executing or
1645         enqueued work units are allowed to complete.  Then, all exceptions that
1646         were thrown from any work unit are chained using $(D Throwable.next) and
1647         rethrown.  The order of the exception chaining is non-deterministic.
1648         */
1649         auto amap(Args...)(Args args)
1650         if (isRandomAccessRange!(Args[0]))
1651         {
1652             import std.conv : emplaceRef;
1654             alias fun = adjoin!(staticMap!(unaryFun, functions));
1656             alias range = args[0];
1657             immutable len = range.length;
1659             static if (
1660                 Args.length > 1 &&
1661                 randAssignable!(Args[$ - 1]) &&
1662                 is(MapType!(Args[0], functions) : ElementType!(Args[$ - 1]))
1663                 )
1664             {
1665                 import std.conv : text;
1666                 import std.exception : enforce;
1668                 alias buf = args[$ - 1];
1669                 alias args2 = args[0..$ - 1];
1670                 alias Args2 = Args[0..$ - 1];
1671                 enforce(buf.length == len,
1672                         text("Can't use a user supplied buffer that's the wrong ",
1673                              "size.  (Expected  :", len, " Got:  ", buf.length));
1674             }
1675             else static if (randAssignable!(Args[$ - 1]) && Args.length > 1)
1676             {
1677                 static assert(0, "Wrong buffer type.");
1678             }
1679             else
1680             {
1681                 import std.array : uninitializedArray;
1683                 auto buf = uninitializedArray!(MapType!(Args[0], functions)[])(len);
1684                 alias args2 = args;
1685                 alias Args2 = Args;
1686             }
1688             if (!len) return buf;
1690             static if (isIntegral!(Args2[$ - 1]))
1691             {
1692                 static assert(args2.length == 2);
1693                 auto workUnitSize = cast(size_t) args2[1];
1694             }
1695             else
1696             {
1697                 static assert(args2.length == 1, Args);
1698                 auto workUnitSize = defaultWorkUnitSize(range.length);
1699             }
1701             alias R = typeof(range);
1703             if (workUnitSize > len)
1704             {
1705                 workUnitSize = len;
1706             }
1708             // Handle as a special case:
1709             if (size == 0)
1710             {
1711                 size_t index = 0;
1712                 foreach (elem; range)
1713                 {
1714                     emplaceRef(buf[index++], fun(elem));
1715                 }
1716                 return buf;
1717             }
1719             // Effectively -1:  chunkIndex + 1 == 0:
1720             shared size_t workUnitIndex = size_t.max;
1721             shared bool shouldContinue = true;
1723             void doIt()
1724             {
1725                 import std.algorithm.comparison : min;
1727                 scope(failure)
1728                 {
1729                     // If an exception is thrown, all threads should bail.
1730                     atomicStore(shouldContinue, false);
1731                 }
1733                 while (atomicLoad(shouldContinue))
1734                 {
1735                     immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
1736                     immutable start = workUnitSize * myUnitIndex;
1737                     if (start >= len)
1738                     {
1739                         atomicStore(shouldContinue, false);
1740                         break;
1741                     }
1743                     immutable end = min(len, start + workUnitSize);
1745                     static if (hasSlicing!R)
1746                     {
1747                         auto subrange = range[start .. end];
1748                         foreach (i; start .. end)
1749                         {
1750                             emplaceRef(buf[i], fun(subrange.front));
1751                             subrange.popFront();
1752                         }
1753                     }
1754                     else
1755                     {
1756                         foreach (i; start .. end)
1757                         {
1758                             emplaceRef(buf[i], fun(range[i]));
1759                         }
1760                     }
1761                 }
1762             }
1764             submitAndExecute(this, &doIt);
1765             return buf;
1766         }
1767     }
1769     ///
map(functions...)1770     template map(functions...)
1771     {
1772         /**
1773         A semi-lazy parallel map that can be used for pipelining.  The map
1774         functions are evaluated for the first $(D bufSize) elements and stored in a
1775         buffer and made available to $(D popFront).  Meanwhile, in the
1776         background a second buffer of the same size is filled.  When the first
1777         buffer is exhausted, it is swapped with the second buffer and filled while
1778         the values from what was originally the second buffer are read.  This
1779         implementation allows for elements to be written to the buffer without
1780         the need for atomic operations or synchronization for each write, and
1781         enables the mapping function to be evaluated efficiently in parallel.
1783         $(D map) has more overhead than the simpler procedure used by $(D amap)
1784         but avoids the need to keep all results in memory simultaneously and works
1785         with non-random access ranges.
1787         Params:
1789         source = The input range to be mapped.  If $(D source) is not random
1790         access it will be lazily buffered to an array of size $(D bufSize) before
1791         the map function is evaluated.  (For an exception to this rule, see Notes.)
1793         bufSize = The size of the buffer to store the evaluated elements.
1795         workUnitSize = The number of elements to evaluate in a single
1796         $(D Task).  Must be less than or equal to $(D bufSize), and
1797         should be a fraction of $(D bufSize) such that all worker threads can be
1798         used.  If the default of size_t.max is used, workUnitSize will be set to
1799         the pool-wide default.
1801         Returns:  An input range representing the results of the map.  This range
1802                   has a length iff $(D source) has a length.
1804         Notes:
1806         If a range returned by $(D map) or $(D asyncBuf) is used as an input to
1807         $(D map), then as an optimization the copying from the output buffer
1808         of the first range to the input buffer of the second range is elided, even
1809         though the ranges returned by $(D map) and $(D asyncBuf) are non-random
1810         access ranges.  This means that the $(D bufSize) parameter passed to the
1811         current call to $(D map) will be ignored and the size of the buffer
1812         will be the buffer size of $(D source).
1814         Example:
1815         ---
1816         // Pipeline reading a file, converting each line
1817         // to a number, taking the logarithms of the numbers,
1818         // and performing the additions necessary to find
1819         // the sum of the logarithms.
1821         auto lineRange = File("numberList.txt").byLine();
1822         auto dupedLines = std.algorithm.map!"a.idup"(lineRange);
1823         auto nums = taskPool.map!(to!double)(dupedLines);
1824         auto logs = taskPool.map!log10(nums);
1826         double sum = 0;
1827         foreach (elem; logs)
1828         {
1829             sum += elem;
1830         }
1831         ---
1833         $(B Exception Handling):
1835         Any exceptions thrown while iterating over $(D source)
1836         or computing the map function are re-thrown on a call to $(D popFront) or,
1837         if thrown during construction, are simply allowed to propagate to the
1838         caller.  In the case of exceptions thrown while computing the map function,
1839         the exceptions are chained as in $(D TaskPool.amap).
1840         */
auto
map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max)
if (isInputRange!S)
{
    import std.exception : enforce;

    enforce(workUnitSize == size_t.max || workUnitSize <= bufSize,
            "Work unit size must be smaller than buffer size.");
    alias fun = adjoin!(staticMap!(unaryFun, functions));

    // Double-buffered lazy map: buf1 is exposed through the range
    // interface while nextBufTask, submitted to pool, maps the next batch
    // of input into buf2.  doBufSwap() exchanges the two once buf1 has
    // been consumed.
    static final class Map
    {
        // This is a class because the task needs to be located on the
        // heap and in the non-random access case source needs to be on
        // the heap, too.

    private:
        // True when S exposes buf1, bufPos and doBufSwap(), i.e. it looks
        // like the range returned by asyncBuf.  Its already-filled buffer
        // can then be taken over by swapping instead of copying element
        // by element.
        enum bufferTrick = is(typeof(source.buf1)) &&
            is(typeof(source.bufPos)) &&
            is(typeof(source.doBufSwap()));

        alias E = MapType!(S, functions);
        E[] buf1, buf2;       // buf1: read by front/popFront; buf2: next batch
        S source;
        TaskPool pool;
        Task!(run, E[] delegate(E[]), E[]) nextBufTask;  // runs fillBuf(buf2)
        size_t workUnitSize;
        size_t bufPos;        // index of front() within buf1
        bool lastTaskWaited;  // set by doBufSwap() once source is empty

        static if (isRandomAccessRange!S)
        {
            alias FromType = S;

            // Advances source past the elements fillBuf() has just taken
            // (at most buf1.length of them).
            void popSource()
            {
                import std.algorithm.comparison : min;

                static if (__traits(compiles, source[0 .. source.length]))
                {
                    source = source[min(buf1.length, source.length)..source.length];
                }
                else static if (__traits(compiles, source[0..$]))
                {
                    source = source[min(buf1.length, source.length)..$];
                }
                else
                {
                    static assert(0, "S must have slicing for Map."
                                  ~ "  " ~ S.stringof ~ " doesn't.");
                }
            }
        }
        else static if (bufferTrick)
        {
            // Make sure we don't have the buffer recycling overload of
            // asyncBuf.
            static if (
                is(typeof(source.source)) &&
                isRoundRobin!(typeof(source.source))
            )
            {
                static assert(0, "Cannot execute a parallel map on " ~
                              "the buffer recycling overload of asyncBuf."
                             );
            }

            alias FromType = typeof(source.buf1);
            FromType from;

            // Just swap our input buffer with source's output buffer.
            // No need to copy element by element.
            FromType dumpToFrom()
            {
                import std.algorithm.mutation : swap;

                // Resize our spare buffer to match source's current buffer
                // before handing it over.  `from` may be shorter than
                // source.buf1 here (a previous call sliced it by
                // source.bufPos), so this resize may need to grow it.
                from.length = source.buf1.length;
                swap(source.buf1, from);

                // Just in case this source has been popped before
                // being sent to map:
                from = from[source.bufPos..$];

                // Every element now in `from` has been taken out of source,
                // so source's remaining length drops by exactly from.length.
                // The bufPos elements before them were already subtracted by
                // source's own popFront().
                static if (is(typeof(source._length)))
                {
                    source._length -= from.length;
                }

                source.doBufSwap();

                return from;
            }
        }
        else
        {
            alias FromType = ElementType!S[];

            // The temporary array that data is copied to before being
            // mapped.
            FromType from;

            // Copies up to from.length elements out of source (popping
            // them) and returns the filled prefix of `from`.
            FromType dumpToFrom()
            {
                assert(from !is null);

                size_t i;
                for (; !source.empty && i < from.length; source.popFront())
                {
                    from[i++] = source.front;
                }

                from = from[0 .. i];
                return from;
            }
        }

        static if (hasLength!S)
        {
            size_t _length;

            public @property size_t length() const @safe pure nothrow
            {
                return _length;
            }
        }

        this(S source, size_t bufSize, size_t workUnitSize, TaskPool pool)
        {
            // When taking over source's buffers, ours must be the same size.
            static if (bufferTrick)
            {
                bufSize = source.buf1.length;
            }

            buf1.length = bufSize;
            buf2.length = bufSize;

            static if (!isRandomAccessRange!S)
            {
                from.length = bufSize;
            }

            this.workUnitSize = (workUnitSize == size_t.max) ?
                    pool.defaultWorkUnitSize(bufSize) : workUnitSize;
            this.source = source;
            this.pool = pool;

            static if (hasLength!S)
            {
                _length = source.length;
            }

            // Map the first batch synchronously, then start mapping the
            // second one in the background.
            buf1 = fillBuf(buf1);
            submitBuf2();
        }

        // Maps the next batch of input into buf and returns the filled
        // prefix of buf.  In the random access case the batch is a take()
        // of source, which popSource() then skips past; otherwise the batch
        // is the array produced by dumpToFrom().
        E[] fillBuf(E[] buf)
        {
            import std.algorithm.comparison : min;

            static if (isRandomAccessRange!S)
            {
                import std.range : take;
                auto toMap = take(source, buf.length);
                scope(success) popSource();
            }
            else
            {
                auto toMap = dumpToFrom();
            }

            buf = buf[0 .. min(buf.length, toMap.length)];

            // Handle as a special case:
            if (pool.size == 0)
            {
                size_t index = 0;
                foreach (elem; toMap)
                {
                    buf[index++] = fun(elem);
                }
                return buf;
            }

            pool.amap!functions(toMap, workUnitSize, buf);

            return buf;
        }

        // (Re)submits nextBufTask to pool so that it runs fillBuf(buf2).
        void submitBuf2()
        in
        {
            assert(nextBufTask.prev is null);
            assert(nextBufTask.next is null);
        } body
        {
            // Hack to reuse the task object.

            nextBufTask = typeof(nextBufTask).init;
            nextBufTask._args[0] = &fillBuf;
            nextBufTask._args[1] = buf2;
            pool.put(nextBufTask);
        }

        // Called once buf1 is exhausted.  Makes the batch returned by
        // nextBufTask.yieldForce current, recycles the old buf1 as the next
        // buffer to fill, and either resubmits the task or records that
        // source is empty.  On the call after that, the buffers are nulled
        // so that empty() becomes true.
        void doBufSwap()
        {
            if (lastTaskWaited)
            {
                // Then the source is empty.  Signal it here.
                buf1 = null;
                buf2 = null;

                static if (!isRandomAccessRange!S)
                {
                    from = null;
                }

                return;
            }

            buf2 = buf1;
            buf1 = nextBufTask.yieldForce;
            bufPos = 0;

            if (source.empty)
            {
                lastTaskWaited = true;
            }
            else
            {
                submitBuf2();
            }
        }

    public:
        @property auto front()
        {
            return buf1[bufPos];
        }

        void popFront()
        {
            static if (hasLength!S)
            {
                _length--;
            }

            bufPos++;
            if (bufPos >= buf1.length)
            {
                doBufSwap();
            }
        }

        static if (isInfinite!S)
        {
            enum bool empty = false;
        }
        else
        {

            bool empty() const @property
            {
                // popFront() sets this when source is empty
                return buf1.length == 0;
            }
        }
    }
    return new Map(source, bufSize, workUnitSize, this);
}
2113     }
2115     /**
2116     Given a $(D source) range that is expensive to iterate over, returns an
2117     input range that asynchronously buffers the contents of
2118     $(D source) into a buffer of $(D bufSize) elements in a worker thread,
2119     while making previously buffered elements from a second buffer, also of size
2120     $(D bufSize), available via the range interface of the returned
2121     object.  The returned range has a length iff $(D hasLength!S).
2122     $(D asyncBuf) is useful, for example, when performing expensive operations
2123     on the elements of ranges that represent data on a disk or network.
2125     Example:
2126     ---
2127     import std.algorithm, std.array, std.conv, std.stdio;
2129     void main()
2130     {
2131         // Fetch lines of a file in a background thread
2132         // while processing previously fetched lines,
2133         // dealing with byLine's buffer recycling by
2134         // eagerly duplicating every line.
2135         auto lines = File("foo.txt").byLine();
2136         auto duped = std.algorithm.map!"a.idup"(lines);
2138         // Fetch more lines in the background while we
2139         // process the lines already read into memory
2140         // into a matrix of doubles.
2141         double[][] matrix;
2142         auto asyncReader = taskPool.asyncBuf(duped);
2144         foreach (line; asyncReader)
2145         {
2146             auto ls = line.split("\t");
2147             matrix ~= to!(double[])(ls);
2148         }
2149     }
2150     ---
2152     $(B Exception Handling):
2154     Any exceptions thrown while iterating over $(D source) are re-thrown on a
2155     call to $(D popFront) or, if thrown during construction, simply
2156     allowed to propagate to the caller.
2157     */
auto asyncBuf(S)(S source, size_t bufSize = 100) if (isInputRange!S)
{
    import std.exception : enforce;

    // fillBuf() can never store anything in a zero-length buffer (and it
    // asserts that its buffer is non-null), so a bufSize of zero would give
    // a range that reports empty even when source is not.  Reject it here.
    enforce(bufSize > 0, "asyncBuf buffer size must be greater than zero.");

    static final class AsyncBuf
    {
        // This is a class because the task and source both need to be on
        // the heap.

        // The element type of S.
        alias E = ElementType!S;  // Needs to be here b/c of forward ref bugs.

    private:
        // NOTE: the bufferTrick path of TaskPool.map reads buf1, bufPos,
        // _length, source and doBufSwap() directly, so these member names
        // form an internal contract within this module.
        E[] buf1, buf2;     // buf1: read by front/popFront; buf2: being filled
        S source;
        TaskPool pool;
        Task!(run, E[] delegate(E[]), E[]) nextBufTask;  // runs fillBuf(buf2)
        size_t bufPos;      // index of front() within buf1
        bool lastTaskWaited;  // set by doBufSwap() once source is empty

        static if (hasLength!S)
        {
            size_t _length;

            // Available if hasLength!S.
            public @property size_t length() const @safe pure nothrow
            {
                return _length;
            }
        }

        this(S source, size_t bufSize, TaskPool pool)
        {
            buf1.length = bufSize;
            buf2.length = bufSize;

            this.source = source;
            this.pool = pool;

            static if (hasLength!S)
            {
                _length = source.length;
            }

            // Fill the first buffer synchronously, then start filling the
            // second one in the background.
            buf1 = fillBuf(buf1);
            submitBuf2();
        }

        // Copies up to buf.length elements from source into buf, popping
        // them from source, and returns the filled prefix of buf.
        E[] fillBuf(E[] buf)
        {
            assert(buf !is null);

            size_t i;
            for (; !source.empty && i < buf.length; source.popFront())
            {
                buf[i++] = source.front;
            }

            buf = buf[0 .. i];
            return buf;
        }

        // (Re)submits nextBufTask to pool so that it runs fillBuf(buf2).
        void submitBuf2()
        in
        {
            assert(nextBufTask.prev is null);
            assert(nextBufTask.next is null);
        } body
        {
            // Hack to reuse the task object.

            nextBufTask = typeof(nextBufTask).init;
            nextBufTask._args[0] = &fillBuf;
            nextBufTask._args[1] = buf2;
            pool.put(nextBufTask);
        }

        // Called once buf1 is exhausted.  Makes the buffer returned by
        // nextBufTask.yieldForce current, recycles the old buf1 as the next
        // buffer to fill, and either resubmits the task or records that
        // source is empty.  On the call after that, the buffers are nulled
        // so that empty() becomes true.
        void doBufSwap()
        {
            if (lastTaskWaited)
            {
                // Then source is empty.  Signal it here.
                buf1 = null;
                buf2 = null;
                return;
            }

            buf2 = buf1;
            buf1 = nextBufTask.yieldForce;
            bufPos = 0;

            if (source.empty)
            {
                lastTaskWaited = true;
            }
            else
            {
                submitBuf2();
            }
        }

    public:
        E front() @property
        {
            return buf1[bufPos];
        }

        void popFront()
        {
            static if (hasLength!S)
            {
                _length--;
            }

            bufPos++;
            if (bufPos >= buf1.length)
            {
                doBufSwap();
            }
        }

        static if (isInfinite!S)
        {
            enum bool empty = false;
        }

        else
        {
            ///
            bool empty() const @property
            {
                // popFront() sets this when source is empty:
                return buf1.length == 0;
            }
        }
    }
    return new AsyncBuf(source, bufSize, this);
}
2295     /**
2296     Given a callable object $(D next) that writes to a user-provided buffer and
2297     a second callable object $(D empty) that determines whether more data is
2298     available to write via $(D next), returns an input range that
2299     asynchronously calls $(D next) with a set of size $(D nBuffers) of buffers
2300     and makes the results available in the order they were obtained via the
2301     input range interface of the returned object.  Similarly to the
2302     input range overload of $(D asyncBuf), the first half of the buffers
2303     are made available via the range interface while the second half are
2304     filled and vice-versa.
2306     Params:
2308     next = A callable object that takes a single argument that must be an array
2309            with mutable elements.  When called, $(D next) writes data to
2310            the array provided by the caller.
2312     empty = A callable object that takes no arguments and returns a type
2313             implicitly convertible to $(D bool).  This is used to signify
2314             that no more data is available to be obtained by calling $(D next).
2316     initialBufSize = The initial size of each buffer.  If $(D next) takes its
2317                      array by reference, it may resize the buffers.
2319     nBuffers = The number of buffers to cycle through when calling $(D next).  Must be at least 2, since half of the buffers are read while the other half are filled.
2321     Example:
2322     ---
2323     // Fetch lines of a file in a background
2324     // thread while processing previously fetched
2325     // lines, without duplicating any lines.
2326     auto file = File("foo.txt");
2328     void next(ref char[] buf)
2329     {
2330         file.readln(buf);
2331     }
2333     // Fetch more lines in the background while we
2334     // process the lines already read into memory
2335     // into a matrix of doubles.
2336     double[][] matrix;
2337     auto asyncReader = taskPool.asyncBuf(&next, &file.eof);
2339     foreach (line; asyncReader)
2340     {
2341         auto ls = line.split("\t");
2342         matrix ~= to!(double[])(ls);
2343     }
2344     ---
2346     $(B Exception Handling):
2348     Any exceptions thrown by $(D next) or $(D empty) are re-thrown on a
2349     call to $(D popFront) or, if thrown during construction, simply allowed to propagate to the caller.
2351     Warning:
2353     Using the range returned by this function in a parallel foreach loop
2354     will not work because buffers may be overwritten while the task that
2355     processes them is in queue.  This is checked for at compile time
2356     and will result in a static assertion failure.
2357     */
auto asyncBuf(C1, C2)(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100)
if (is(typeof(C2.init()) : bool) &&
    Parameters!C1.length == 1 &&
    Parameters!C2.length == 0 &&
    isArray!(Parameters!C1[0])
) {
    import std.exception : enforce;

    // Half of the buffers are exposed through the range while the other
    // half are filled, so nBuffers / 2 becomes the buffer size of the
    // underlying input-range asyncBuf.  With fewer than two buffers that
    // size would be zero and no data could ever be delivered.
    enforce(nBuffers >= 2, "asyncBuf requires at least two buffers.");

    auto roundRobin = RoundRobinBuffer!(C1, C2)(next, empty, initialBufSize, nBuffers);
    return asyncBuf(roundRobin, nBuffers / 2);
}
2368     ///
template reduce(functions...)
{
    /**
    Parallel reduce on a random access range.  Except as otherwise noted,
    usage is similar to $(REF _reduce, std,algorithm,iteration).  This
    function works by splitting the range to be reduced into work units,
    which are slices to be reduced in parallel.  Once the results from all
    work units are computed, a final serial reduction is performed on these
    results to compute the final answer. Therefore, care must be taken to
    choose the seed value appropriately.

    Because the reduction is being performed in parallel, $(D functions)
    must be associative.  For notational simplicity, let # be an
    infix operator representing $(D functions).  Then, (a # b) # c must equal
    a # (b # c).  Floating point addition is not associative
    even though addition in exact arithmetic is.  Summing floating
    point numbers using this function may give different results than summing
    serially.  However, for many practical purposes floating point addition
    can be treated as associative.

    Note that, since $(D functions) are assumed to be associative,
    additional optimizations are made to the serial portion of the reduction
    algorithm. These take advantage of the instruction level parallelism of
    modern CPUs, in addition to the thread-level parallelism that the rest
    of this module exploits.  This can lead to better than linear speedups
    relative to $(REF _reduce, std,algorithm,iteration), especially for
    fine-grained benchmarks like dot products.

    An explicit seed may be provided as the first argument.  If
    provided, it is used as the seed for all work units and for the final
    reduction of results from all work units.  Therefore, if it is not the
    identity value for the operation being performed, results may differ
    from those generated by $(REF _reduce, std,algorithm,iteration) and
    may vary depending on how many work units are used.  The next argument
    must be the range to be reduced.
    ---
    // Find the sum of squares of a range in parallel, using
    // an explicit seed.
    //
    // Timings on an Athlon 64 X2 dual core machine:
    //
    // Parallel reduce:                     72 milliseconds
    // Using std.algorithm.reduce instead:  181 milliseconds
    auto nums = iota(10_000_000.0f);
    auto sumSquares = taskPool.reduce!"a + b"(
        0.0, std.algorithm.map!"a * a"(nums)
    );
    ---

    If no explicit seed is provided, the first element of each work unit
    is used as a seed.  For the final reduction, the result from the first
    work unit is used as the seed.
    ---
    // Find the sum of a range in parallel, using the first
    // element of each work unit as the seed.
    auto sum = taskPool.reduce!"a + b"(nums);
    ---

    An explicit work unit size may be specified as the last argument.
    Specifying too small a work unit size will effectively serialize the
    reduction, as the final reduction of the result of each work unit will
    dominate computation time.  If $(D TaskPool.size) for this instance
    is zero, this parameter is ignored and one work unit is used.
    Otherwise the work unit size must be greater than zero; a work unit
    size of zero throws an $(D Exception) whenever work units would need
    to be created.
    ---
    // Use a work unit size of 100.
    auto sum2 = taskPool.reduce!"a + b"(nums, 100);

    // Work unit size of 100 and explicit seed.
    auto sum3 = taskPool.reduce!"a + b"(0.0, nums, 100);
    ---

    Parallel reduce supports multiple functions, like
    $(D std.algorithm.reduce).
    ---
    // Find both the min and max of nums.
    auto minMax = taskPool.reduce!(min, max)(nums);
    assert(minMax[0] == reduce!min(nums));
    assert(minMax[1] == reduce!max(nums));
    ---

    $(B Exception Handling):

    After this function is finished executing, any exceptions thrown
    are chained together via $(D Throwable.next) and rethrown.  The chaining
    order is non-deterministic.
     */
    auto reduce(Args...)(Args args)
    {
        import core.exception : OutOfMemoryError;
        import std.conv : emplaceRef;
        import std.exception : enforce;

        alias fun = reduceAdjoin!functions;
        alias finishFun = reduceFinish!functions;

        // An integral last argument is the explicit work unit size; the
        // remaining arguments are [seed,] range.
        static if (isIntegral!(Args[$ - 1]))
        {
            size_t workUnitSize = cast(size_t) args[$ - 1];
            alias args2 = args[0..$ - 1];
            alias Args2 = Args[0..$ - 1];
        }
        else
        {
            alias args2 = args;
            alias Args2 = Args;
        }

        // Builds a start value from one element: the element itself for a
        // single function, otherwise a tuple holding a copy per function.
        auto makeStartValue(Type)(Type e)
        {
            static if (functions.length == 1)
            {
                return e;
            }
            else
            {
                typeof(adjoin!(staticMap!(binaryFun, functions))(e, e)) seed = void;
                foreach (i, T; seed.Types)
                {
                    emplaceRef(seed.expand[i], e);
                }

                return seed;
            }
        }

        static if (args2.length == 2)
        {
            static assert(isInputRange!(Args2[1]));
            alias range = args2[1];
            alias seed = args2[0];
            enum explicitSeed = true;

            static if (!is(typeof(workUnitSize)))
            {
                size_t workUnitSize = defaultWorkUnitSize(range.length);
            }
        }
        else
        {
            static assert(args2.length == 1);
            alias range = args2[0];

            static if (!is(typeof(workUnitSize)))
            {
                size_t workUnitSize = defaultWorkUnitSize(range.length);
            }

            enforce(!range.empty,
                "Cannot reduce an empty range with first element as start value.");

            auto seed = makeStartValue(range.front);
            enum explicitSeed = false;
            range.popFront();
        }

        alias E = typeof(seed);
        alias R = typeof(range);

        // Serially reduces range[lowerBound .. upperBound].
        E reduceOnRange(R range, size_t lowerBound, size_t upperBound)
        {
            // This is for exploiting instruction level parallelism by
            // using multiple accumulator variables within each thread,
            // since we're assuming functions are associative anyhow.

            // This is so that loops can be unrolled automatically.
            enum ilpTuple = AliasSeq!(0, 1, 2, 3, 4, 5);
            enum nILP = ilpTuple.length;
            immutable subSize = (upperBound - lowerBound) / nILP;

            if (subSize <= 1)
            {
                // Handle as a special case.
                static if (explicitSeed)
                {
                    E result = seed;
                }
                else
                {
                    E result = makeStartValue(range[lowerBound]);
                    lowerBound++;
                }

                foreach (i; lowerBound .. upperBound)
                {
                    result = fun(result, range[i]);
                }

                return result;
            }

            assert(subSize > 1);
            E[nILP] results;
            size_t[nILP] offsets;

            foreach (i; ilpTuple)
            {
                offsets[i] = lowerBound + subSize * i;

                static if (explicitSeed)
                {
                    results[i] = seed;
                }
                else
                {
                    results[i] = makeStartValue(range[offsets[i]]);
                    offsets[i]++;
                }
            }

            immutable nLoop = subSize - (!explicitSeed);
            foreach (i; 0 .. nLoop)
            {
                foreach (j; ilpTuple)
                {
                    results[j] = fun(results[j], range[offsets[j]]);
                    offsets[j]++;
                }
            }

            // Finish the remainder.
            foreach (i; nILP * subSize + lowerBound .. upperBound)
            {
                results[$ - 1] = fun(results[$ - 1], range[i]);
            }

            foreach (i; ilpTuple[1..$])
            {
                results[0] = finishFun(results[0], results[i]);
            }

            return results[0];
        }

        immutable len = range.length;
        if (len == 0)
        {
            return seed;
        }

        if (this.size == 0)
        {
            return finishFun(seed, reduceOnRange(range, 0, len));
        }

        // workUnitSize is used as a divisor below; zero would be an
        // integer division by zero.
        enforce(workUnitSize > 0, "Work unit size must be greater than zero.");

        // Unlike the rest of the functions here, I can't use the Task object
        // recycling trick here because this has to work on non-commutative
        // operations.  After all the tasks are done executing, fun() has to
        // be applied on the results of these to get a final result, but
        // it can't be evaluated out of order.

        if (workUnitSize > len)
        {
            workUnitSize = len;
        }

        immutable size_t nWorkUnits = (len / workUnitSize) + ((len % workUnitSize == 0) ? 0 : 1);
        assert(nWorkUnits * workUnitSize >= len);

        alias RTask = Task!(run, typeof(&reduceOnRange), R, size_t, size_t);
        RTask[] tasks;

        // Can't use alloca() due to Bug 3753.  Use a fixed buffer
        // backed by malloc().
        enum maxStack = 2_048;
        byte[maxStack] buf = void;
        immutable size_t nBytesNeeded = nWorkUnits * RTask.sizeof;

        // Decide once where the tasks live, so that the free() in the
        // scope(exit) below matches the malloc() exactly.
        immutable bool useStack = nBytesNeeded <= maxStack;

        import core.stdc.stdlib : malloc, free;
        if (useStack)
        {
            tasks = (cast(RTask*) buf.ptr)[0 .. nWorkUnits];
        }
        else
        {
            auto ptr = cast(RTask*) malloc(nBytesNeeded);
            if (!ptr)
            {
                throw new OutOfMemoryError(
                    "Out of memory in std.parallelism."
                );
            }

            tasks = ptr[0 .. nWorkUnits];
        }

        scope(exit)
        {
            if (!useStack)
            {
                free(tasks.ptr);
            }
        }

        foreach (ref t; tasks[])
            emplaceRef(t, RTask());

        // Hack to take the address of a nested function w/o
        // making a closure.
        static auto scopedAddress(D)(scope D del) @system
        {
            auto tmp = del;
            return tmp;
        }

        size_t curPos = 0;
        // Points `task` at the next work unit [curPos, curPos + workUnitSize).
        void useTask(ref RTask task)
        {
            import std.algorithm.comparison : min;

            task.pool = this;
            task._args[0] = scopedAddress(&reduceOnRange);
            task._args[3] = min(len, curPos + workUnitSize);  // upper bound.
            task._args[1] = range;  // range
            task._args[2] = curPos; // lower bound.

            curPos += workUnitSize;
        }

        foreach (ref task; tasks)
        {
            useTask(task);
        }

        // Link tasks[1 .. $] into a list and enqueue them as one group;
        // tasks[0] is run directly by the current thread below.
        foreach (i; 1 .. tasks.length - 1)
        {
            tasks[i].next = tasks[i + 1].basePtr;
            tasks[i + 1].prev = tasks[i].basePtr;
        }

        if (tasks.length > 1)
        {
            queueLock();
            scope(exit) queueUnlock();

            abstractPutGroupNoSync(
                tasks[1].basePtr,
                tasks[$ - 1].basePtr
            );
        }

        if (tasks.length > 0)
        {
            try
            {
                tasks[0].job();
            }
            catch (Throwable e)
            {
                tasks[0].exception = e;
            }
            tasks[0].taskStatus = TaskStatus.done;

            // Try to execute each of these in the current thread
            foreach (ref task; tasks[1..$])
            {
                tryDeleteExecute(task.basePtr);
            }
        }

        // Now that we've tried to execute every task, they're all either
        // done or in progress.  Force all of them.
        E result = seed;

        Throwable firstException, lastException;

        foreach (ref task; tasks)
        {
            try
            {
                task.yieldForce;
            }
            catch (Throwable e)
            {
                addToChain(e, firstException, lastException);
                continue;
            }

            if (!firstException) result = finishFun(result, task.returnVal);
        }

        if (firstException) throw firstException;

        return result;
    }
}
/**
Gets the index of the current thread relative to this $(D TaskPool).  Any
thread not in this pool will receive an index of 0.  The worker threads in
this pool receive unique indices of 1 through $(D this.size).

This function is useful for maintaining worker-local resources.

Example:
---
// Execute a loop that computes the greatest common
// divisor of every number from 0 through 999 with
// 42 in parallel.  Write the results out to
// a set of files, one for each thread.  This allows
// results to be written out without any synchronization.

import std.conv, std.range, std.numeric, std.stdio;

void main()
{
    auto fileHandles = new File[taskPool.size + 1];
    scope(exit) {
        foreach (ref handle; fileHandles)
        {
            handle.close();
        }
    }

    foreach (i, ref handle; fileHandles)
    {
        // Open for writing; File's default mode is read-only.
        handle = File("workerResults" ~ to!string(i) ~ ".txt", "w");
    }

    foreach (num; parallel(iota(1_000)))
    {
        auto outHandle = fileHandles[taskPool.workerIndex];
        outHandle.writeln(num, '\t', gcd(num, 42));
    }
}
---
*/
size_t workerIndex() @property @safe const nothrow
{
    // This pool's workers occupy the threadIndex range
    // [instanceStartIndex, instanceStartIndex + size).  They are mapped to
    // 1 .. size, and anything outside that range is mapped to 0.
    immutable rawInd = threadIndex;
    return (rawInd >= instanceStartIndex && rawInd < instanceStartIndex + size) ?
            (rawInd - instanceStartIndex + 1) : 0;
}
/**
Struct for creating worker-local storage.  Worker-local storage is
thread-local storage that exists only for worker threads in a given
$(D TaskPool) plus a single thread outside the pool.  It is allocated on the
garbage collected heap in a way that avoids _false sharing, and doesn't
necessarily have global scope within any thread.  It can be accessed from
any worker thread in the $(D TaskPool) that created it, and one thread
outside this $(D TaskPool).  All threads outside the pool that created a
given instance of worker-local storage share a single slot.

Since the underlying data for this struct is heap-allocated, this struct
has reference semantics when passed between functions.

The main use cases for $(D WorkerLocalStorage) are:

1.  Performing parallel reductions with an imperative, as opposed to
functional, programming style.  In this case, it's useful to treat
$(D WorkerLocalStorage) as local to each thread for only the parallel
portion of an algorithm.

2.  Recycling temporary buffers across iterations of a parallel foreach loop.

Example:
---
// Calculate pi as in our synopsis example, but
// use an imperative instead of a functional style.
immutable n = 1_000_000_000;
immutable delta = 1.0L / n;

auto sums = taskPool.workerLocalStorage(0.0L);
foreach (i; parallel(iota(n)))
{
    immutable x = ( i - 0.5L ) * delta;
    immutable toAdd = delta / ( 1.0 + x * x );
    sums.get += toAdd;
}

// Add up the results from each worker thread.
real pi = 0;
foreach (threadResult; sums.toRange)
{
    pi += 4.0L * threadResult;
}
---
 */
static struct WorkerLocalStorage(T)
{
private:
    TaskPool pool;

    // Number of slots: one per worker thread plus one shared by all
    // threads outside the pool.
    size_t size;

    // Distance in bytes between consecutive slots (T.sizeof rounded up to
    // a whole number of cache lines).
    size_t elemSize;

    // Shared flag cleared by toRange(); get() asserts on it.
    bool* stillThreadLocal;

    // Rounds num up to the nearest multiple of cacheLineSize.
    static size_t roundToLine(size_t num) pure nothrow
    {
        if (num % cacheLineSize == 0)
        {
            return num;
        }
        else
        {
            return ((num / cacheLineSize) + 1) * cacheLineSize;
        }
    }

    // Cache-line-aligned start of slot 0.
    void* data;

    // Allocates and default-initializes pool.size + 1 cache-line-padded slots.
    void initialize(TaskPool pool)
    {
        this.pool = pool;
        size = pool.size + 1;
        stillThreadLocal = new bool;
        *stillThreadLocal = true;

        // Determines whether the GC should scan the array.
        auto blkInfo = (typeid(T).flags & 1) ?
                       cast(GC.BlkAttr) 0 :
                       GC.BlkAttr.NO_SCAN;

        immutable nElem = pool.size + 1;
        elemSize = roundToLine(T.sizeof);

        // The + 3 is to pad one full cache line worth of space on either side
        // of the data structure to make sure false sharing with completely
        // unrelated heap data is prevented, and to provide enough padding to
        // make sure that data is cache line-aligned.
        data = GC.malloc(elemSize * (nElem + 3), blkInfo) + elemSize;

        // Cache line align data ptr.
        data = cast(void*) roundToLine(cast(size_t) data);

        foreach (i; 0 .. nElem)
        {
            this.opIndex(i) = T.init;
        }
    }

    // Returns slot `index` by reference, carrying the qualifiers of `this`.
    ref opIndex(this Qualified)(size_t index)
    {
        import std.conv : text;
        // On failure, report the offending index and the slot count.
        assert(index < size, text(index, '\t', size));
        return *(cast(CopyTypeQualifiers!(Qualified, T)*) (data + elemSize * index));
    }

    // Stores val into slot `index`.
    void opIndexAssign(T val, size_t index)
    {
        assert(index < size);
        *(cast(T*) (data + elemSize * index)) = val;
    }

public:
    /**
    Get the current thread's instance.  Returns by ref.
    Note that calling $(D get) from any thread
    outside the $(D TaskPool) that created this instance will return the
    same reference, so an instance of worker-local storage should only be
    accessed from one thread outside the pool that created it.  If this
    rule is violated, undefined behavior will result.

    If assertions are enabled and $(D toRange) has been called, then this
    WorkerLocalStorage instance is no longer worker-local and an assertion
    failure will result when calling this method.  This is not checked
    when assertions are disabled for performance reasons.
     */
    ref get(this Qualified)() @property
    {
        assert(*stillThreadLocal,
            "Cannot call get() on this instance of WorkerLocalStorage " ~
            "because it is no longer worker-local."
        );
        return opIndex(pool.workerIndex);
    }

    /**
    Assign a value to the current thread's instance.  This function has
    the same caveats as its overload.
    */
    void get(T val) @property
    {
        assert(*stillThreadLocal,
            "Cannot call get() on this instance of WorkerLocalStorage " ~
            "because it is no longer worker-local."
        );

        opIndexAssign(val, pool.workerIndex);
    }

    /**
    Returns a range view of the values for all threads, which can be used
    to further process the results of each thread after running the parallel
    part of your algorithm.  Do not use this method in the parallel portion
    of your algorithm.

    Calling this function sets a flag indicating that this struct is no
    longer worker-local, and attempting to use the $(D get) method again
    will result in an assertion failure if assertions are enabled.
     */
    WorkerLocalStorageRange!T toRange() @property
    {
        if (*stillThreadLocal)
        {
            *stillThreadLocal = false;

            // Make absolutely sure results are visible to all threads.
            // This is probably not necessary since some other
            // synchronization primitive will be used to signal that the
            // parallel part of the algorithm is done, but the
            // performance impact should be negligible, so it's better
            // to be safe.
            ubyte barrierDummy;
            atomicSetUbyte(barrierDummy, 1);
        }

        return WorkerLocalStorageRange!T(this);
    }
}
/**
Range primitives for worker-local storage.  The purpose of this is to
access results produced by each worker thread from a single thread once you
are no longer using the worker-local storage from multiple threads.
Do not use this struct in the parallel portion of your algorithm.

The proper way to instantiate this object is to call
$(D WorkerLocalStorage.toRange).  Once instantiated, this object behaves
as a finite random-access range with assignable, lvalue elements and
a length equal to the number of worker threads in the $(D TaskPool) that
created it plus 1.
 */
static struct WorkerLocalStorageRange(T)
{
private:
    WorkerLocalStorage!T workerLocalStorage;

    // Number of elements still visible through this range.
    size_t _length;

    // Slots consumed from the front by popFront().  Every index into
    // workerLocalStorage must be shifted by this amount.
    size_t beginOffset;

    this(WorkerLocalStorage!T wl)
    {
        this.workerLocalStorage = wl;
        _length = wl.size;
    }

public:
    ref front(this Qualified)() @property
    {
        return this[0];
    }

    ref back(this Qualified)() @property
    {
        return this[_length - 1];
    }

    void popFront()
    {
        if (_length > 0)
        {
            beginOffset++;
            _length--;
        }
    }

    void popBack()
    {
        if (_length > 0)
        {
            _length--;
        }
    }

    typeof(this) save() @property
    {
        return this;
    }

    ref opIndex(this Qualified)(size_t index)
    {
        assert(index < _length);
        return workerLocalStorage[index + beginOffset];
    }

    void opIndexAssign(T val, size_t index)
    {
        assert(index < _length);
        // Apply beginOffset exactly as opIndex does.  Otherwise, assignments
        // made after popFront() would land in the wrong slot.
        workerLocalStorage[index + beginOffset] = val;
    }

    typeof(this) opSlice(size_t lower, size_t upper)
    {
        assert(lower <= upper);
        assert(upper <= _length);
        auto newWl = this.workerLocalStorage;
        // The new range starts at a fresh (zero) offset, so fold the
        // elements already popped from the front into the data pointer.
        newWl.data += (beginOffset + lower) * newWl.elemSize;
        newWl.size = upper - lower;
        return typeof(this)(newWl);
    }

    bool empty() const @property
    {
        return length == 0;
    }

    size_t length() const @property
    {
        return _length;
    }
}
/**
Creates worker-local storage whose every slot starts out holding
$(D initialVal).  Because the argument is $(D lazy), it is evaluated once
per slot, which makes it easy to give each worker its own distinct class
instance.  See the $(D WorkerLocalStorage) struct for a usage example.
 */
WorkerLocalStorage!T workerLocalStorage(T)(lazy T initialVal = T.init)
{
    WorkerLocalStorage!T storage;
    storage.initialize(this);

    // One slot per worker thread, plus one for the outside thread.
    // initialVal is re-evaluated for each of them.
    immutable slotCount = size + 1;
    for (size_t slot = 0; slot < slotCount; ++slot)
    {
        storage[slot] = initialVal;
    }

    // Publish the initialized slots to the worker threads via a barrier.
    ubyte fence;
    atomicSetUbyte(fence, 0);

    return storage;
}
/**
Asks every worker thread to exit: each one leaves after its current
$(D Task), or right away if it is idle.  Anything still waiting in the
queue is not run, unless a later $(D Task.workForce), $(D Task.yieldForce)
or $(D Task.spinForce) call runs it.

Call this only once every $(D Task) has been waited on (so the queue is
known to be empty), or when the remaining queued tasks were speculative
and their results are no longer needed.
 */
void stop() @trusted
{
    queueLock();
    try
    {
        // Switch the pool state and wake every sleeping worker.
        atomicSetUbyte(status, PoolState.stopNow);
        notifyAll();
    }
    finally
    {
        queueUnlock();
    }
}
/**
Tells the worker threads to exit once the task queue has drained.

When $(D blocking) is true, the call does not return until every worker
thread has terminated.  This is handy when a $(D TaskPool) serves as a
simple scheduler for tasks whose results are never read (for example,
tasks that communicate through channels other than return values).

Warning:  Calling this function with $(D blocking = true) from a worker
          thread that is a member of the same $(D TaskPool) that
          $(D finish) is being called on will result in a deadlock.
 */
void finish(bool blocking = false) @trusted
{
    // Move running -> finishing under the queue lock, then wake the workers
    // so they notice the new state.
    queueLock();
    try
    {
        atomicCasUbyte(status, PoolState.running, PoolState.finishing);
        notifyAll();
    }
    finally
    {
        queueUnlock();
    }

    if (!blocking) return;

    // Use this thread as a worker until everything is finished.
    executeWorkLoop();

    foreach (worker; pool)
    {
        // Maybe there should be something here to prevent a thread
        // from calling join() on itself if this function is called
        // from a worker thread in the same pool, but:
        //
        // 1.  Using an if statement to skip join() would result in
        //     finish() returning without all tasks being finished.
        //
        // 2.  If an exception were thrown, it would bubble up to the
        //     Task from which finish() was called and likely be
        //     swallowed.
        worker.join();
    }
}
/**
The number of worker threads owned by this pool.  This count excludes any
thread that merely submits work.
*/
size_t size() @property @safe const pure nothrow
{
    return this.pool.length;
}
/**
Appends a $(D Task) to the back of this pool's task queue.  The task may be
given either by reference or by pointer.

Example:
---
import std.file;

// Create a task.
auto t = task!read("foo.txt");

// Add it to the queue to be executed.
taskPool.put(t);
---

Notes:

@trusted overloads of this function are called for $(D Task)s if
$(REF hasUnsharedAliasing, std,traits) is false for the $(D Task)'s
return type or the function the $(D Task) executes is $(D pure).
$(D Task) objects that meet all other requirements specified in the
$(D @trusted) overloads of $(D task) and $(D scopedTask) may be created
and executed from $(D @safe) code via $(D Task.executeInNewThread) but
not via $(D TaskPool).

Some overloads are @trusted even though they take the address of a
variable that may live on the stack.  This is sound because $(D Task)'s
destructor waits for the task to complete before its stack frame is torn
down.  The frame therefore cannot disappear while a $(D TaskPool) still
refers to the task.
*/
void put(alias fun, Args...)(ref Task!(fun, Args) task)
if (!isSafeReturn!(typeof(task)))
{
    // Bind the task to this pool, then enqueue its type-erased base.
    task.pool = this;
    auto base = task.basePtr;
    abstractPut(base);
}

/// Ditto
void put(alias fun, Args...)(Task!(fun, Args)* task)
if (!isSafeReturn!(typeof(*task)))
{
    import std.exception : enforce;
    // enforce hands back the pointer once it is known to be non-null.
    auto checked = enforce(task, "Cannot put a null Task on a TaskPool queue.");
    put(*checked);
}

/// Ditto
@trusted void put(alias fun, Args...)(ref Task!(fun, Args) task)
if (isSafeReturn!(typeof(task)))
{
    task.pool = this;
    auto base = task.basePtr;
    abstractPut(base);
}

/// Ditto
@trusted void put(alias fun, Args...)(Task!(fun, Args)* task)
if (isSafeReturn!(typeof(*task)))
{
    import std.exception : enforce;
    auto checked = enforce(task, "Cannot put a null Task on a TaskPool queue.");
    put(*checked);
}
/**
Query or change whether this pool's worker threads are daemon threads.
When every non-daemon thread has ended, daemon threads are terminated
automatically.  A non-daemon thread, by contrast, keeps the program alive
until it has finished.

Before the program can terminate, $(D stop) or $(D finish) must be called
on every active $(D TaskPool) that has non-daemon threads.

The worker threads of the pool returned by the $(D taskPool) property are
daemon threads by default.  Manually instantiated task pools get
non-daemon worker threads by default.

Note:  For a pool of size zero, the getter arbitrarily returns true and the
       setter does nothing.
*/
bool isDaemon() @property @trusted
{
    queueLock();
    scope(exit) queueUnlock();

    if (size == 0)
    {
        // No worker to ask; report true by convention.
        return true;
    }
    return pool[0].isDaemon;
}

/// Ditto
void isDaemon(bool newVal) @property @trusted
{
    queueLock();
    scope(exit) queueUnlock();

    foreach (worker; pool)
    {
        worker.isDaemon = newVal;
    }
}
/**
Read or set the OS scheduling priority of this pool's worker threads.
Both directions forward to $(D core.thread.Thread.priority), so a priority
value means exactly what it means in $(D core.thread).

Note:  For a pool of size zero, the getter arbitrarily returns
       $(D core.thread.Thread.PRIORITY_MIN) and the setter does nothing.
*/
int priority() @property @trusted
{
    if (size == 0)
    {
        return core.thread.Thread.PRIORITY_MIN;
    }
    return pool[0].priority;
}

/// Ditto
void priority(int newPriority) @property @trusted
{
    // An empty pool simply makes this loop a no-op.
    foreach (worker; pool)
    {
        worker.priority = newPriority;
    }
}
3285 }
/**
The process-wide default $(D TaskPool), created the first time it is
requested.  Non-worker threads may call this concurrently.  The pool's
workers are daemon threads, so the main thread may exit without first
calling $(D TaskPool.stop) or $(D TaskPool.finish).
*/
@property TaskPool taskPool() @trusted
{
    import std.concurrency : initOnce;
    __gshared TaskPool pool;

    // Builds the shared pool.  initOnce evaluates this at most once.
    static TaskPool makeDefaultPool()
    {
        auto created = new TaskPool(defaultPoolThreads);
        created.isDaemon = true;
        return created;
    }

    return initOnce!pool(makeDefaultPool());
}
private shared uint _defaultPoolThreads;

// Seeds the default worker count with totalCPUs - 1 at program start-up.
shared static this()
{
    uint workers = totalCPUs - 1;
    atomicStore(_defaultPoolThreads, workers);
}
/**
Read or set how many worker threads the $(D TaskPool) returned by
$(D taskPool) will have.  The default is $(D totalCPUs) - 1.  Setting this
after the first call to $(D taskPool) does not change the number of worker
threads in the instance that $(D taskPool) returns.
*/
@property uint defaultPoolThreads() @trusted
{
    immutable current = atomicLoad(_defaultPoolThreads);
    return current;
}

/// Ditto
@property void defaultPoolThreads(uint newVal) @trusted
{
    atomicStore(_defaultPoolThreads, newVal);
}
/**
Shorthand for $(D taskPool.parallel): run a parallel foreach loop on the
default $(D TaskPool) without naming the pool explicitly.

Example:
---
// Find the logarithm of every number from
// 1 to 1_000_000 in parallel, using the
// default TaskPool instance.
auto logs = new double[1_000_000];

foreach (i, ref elem; parallel(logs))
{
    elem = log(i + 1.0);
}
---

*/
ParallelForeach!R parallel(R)(R range)
{
    auto pool = taskPool;
    return pool.parallel(range);
}

/// Ditto
ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
{
    auto pool = taskPool;
    return pool.parallel(range, workUnitSize);
}
/*
Error raised when the body of a parallel foreach loop tries to leave the
loop early.
*/
class ParallelForeachError : Error
{
    this()
    {
        enum message = "Cannot break from a parallel foreach loop using break, return, labeled break/continue or goto statements.";
        super(message);
    }
}
3368 /*------Structs that implement opApply for parallel foreach.------------------*/
// True when R is a random-access range that also reports its length.
// Presumably this selects the indexed (random-access) parallel foreach path.
private enum bool randLen(R) = isRandomAccessRange!R && hasLength!R;
/*
Runs doIt once on every worker thread of pool and once on the calling
thread, then waits for all of them.

One scoped task is created per thread (pool.size + 1 in total).
tasks[0] is run directly in this thread, and tasks[1 .. $] are queued on
the pool in a single locked operation.  The current thread then calls
tryDeleteExecute on each queued task, trying to execute it here.  Finally,
every task is forced.  Any Throwables are chained via addToChain, and the
first one is rethrown after all tasks have been waited on.
*/
private void submitAndExecute(
    TaskPool pool,
    scope void delegate() doIt
)
{
    import core.exception : onOutOfMemoryError;
    import core.memory : GC;
    import core.stdc.stdlib : malloc, free;
    import core.stdc.string : memcpy;

    immutable nThreads = pool.size + 1;

    alias PTask = typeof(scopedTask(doIt));

    // The logical thing to do would be to just use alloca() here, but that
    // causes problems on Windows for reasons that I don't understand
    // (tentatively a compiler bug) and definitely doesn't work on Posix due
    // to Bug 3753.  Therefore, allocate a fixed buffer and fall back to
    // malloc() if someone's using a ridiculous amount of threads.
    //
    // The fixed buffer is raw storage rather than a PTask array, so that no
    // d'tors are called on uninitialized excess PTask instances.  It is an
    // array of pointers rather than bytes so that it is at least
    // pointer-aligned.  A byte array guarantees no alignment for the PTask
    // objects cast out of it.
    enum nBuf = 64;
    enum bufWords = (nBuf * PTask.sizeof + (void*).sizeof - 1) / (void*).sizeof;
    void*[bufWords] buf = void;
    PTask[] tasks;
    if (nThreads <= nBuf)
    {
        tasks = (cast(PTask*) buf.ptr)[0 .. nThreads];
    }
    else
    {
        auto ptr = cast(PTask*) malloc(nThreads * PTask.sizeof);

        // Throws druntime's preallocated OutOfMemoryError.  Constructing one
        // with a string would store that string as the *file name*, not the
        // message.
        if (!ptr) onOutOfMemoryError();

        // malloc'd memory is not scanned by the GC, but each task's
        // `exception` field (see tasks[0].exception below) can hold a
        // GC-allocated Throwable.  Register the block so that such
        // Throwables stay alive until yieldForce rethrows them.
        GC.addRange(ptr, nThreads * PTask.sizeof);
        tasks = ptr[0 .. nThreads];
    }

    scope(exit)
    {
        if (nThreads > nBuf)
        {
            GC.removeRange(tasks.ptr);
            free(tasks.ptr);
        }
    }

    foreach (ref t; tasks)
    {
        // This silly looking code is necessary to prevent d'tors from being
        // called on uninitialized objects.
        auto temp = scopedTask(doIt);
        memcpy(&t, &temp, PTask.sizeof);

        // This has to be done to t after copying, not temp before copying.
        // Otherwise, temp's destructor will sit here and wait for the
        // task to finish.
        t.pool = pool;
    }

    // Link tasks[1 .. $] into a doubly linked chain so that they can be
    // queued together.  With a single task the range 1 .. 0 is empty.
    foreach (i; 1 .. tasks.length - 1)
    {
        tasks[i].next = tasks[i + 1].basePtr;
        tasks[i + 1].prev = tasks[i].basePtr;
    }

    if (tasks.length > 1)
    {
        pool.queueLock();
        scope(exit) pool.queueUnlock();

        pool.abstractPutGroupNoSync(
            tasks[1].basePtr,
            tasks[$ - 1].basePtr
        );
    }

    if (tasks.length > 0)
    {
        try
        {
            tasks[0].job();
        }
        catch (Throwable e)
        {
            tasks[0].exception = e; // nocoverage
        }
        tasks[0].taskStatus = TaskStatus.done;

        // Try to execute each of these in the current thread
        foreach (ref task; tasks[1..$])
        {
            pool.tryDeleteExecute(task.basePtr);
        }
    }

    // Wait for every task.  Collect all failures first, so that no task is
    // left running while its stack-allocated storage is released.
    Throwable firstException, lastException;

    foreach (ref task; tasks)
    {
        try
        {
            task.yieldForce;
        }
        catch (Throwable e)
        {
            addToChain(e, firstException, lastException);
        }
    }

    if (firstException) throw firstException;
}
// Raises the error used when a parallel foreach body tries to exit early.
void foreachErr()
{
    auto err = new ParallelForeachError();
    throw err;
}
// Runs the parallel foreach body serially in the calling thread.  The
// callers use this when the pool has no worker threads.  If the body
// returns non-zero (an attempted break/return/goto), the loop stops and
// foreachErr() throws.
int doSizeZeroCase(R, Delegate)(ref ParallelForeach!R p, Delegate dg)
{
    with(p)
    {
        // True if the body takes (index, element) instead of (element).
        enum bool passIndex = Parameters!dg.length == 2;

        int status = 0;
        size_t pos = 0;

        // ElementType!R is spelled out in both loops so that strings are
        // iterated element-wise correctly.
        static if (hasLvalueElements!R)
        {
            foreach (ref ElementType!R elem; range)
            {
                static if (passIndex)
                    status = dg(pos, elem);
                else
                    status = dg(elem);

                if (status != 0) break;
                ++pos;
            }
        }
        else
        {
            foreach (ElementType!R elem; range)
            {
                static if (passIndex)
                    status = dg(pos, elem);
                else
                    status = dg(elem);

                if (status != 0) break;
                ++pos;
            }
        }

        if (status != 0) foreachErr();
        return status;
    }
}
/*
Code mixed in as the body of a parallel foreach over a random-access range
with length.  It expects `pool`, `range`, `workUnitSize` and `dg` to be in
scope at the mixin site.  Threads repeatedly claim the next block of
`workUnitSize` indices through an atomic counter until the range is used
up or some thread fails.
*/
private enum string parallelApplyMixinRandomAccess = q{
    // With no worker threads, fall back to a plain serial loop.
    if (pool.size == 0)
    {
        return doSizeZeroCase(this, dg);
    }

    // Whether the loop body takes an index in addition to the element.
    enum withIndex = Parameters!(typeof(dg)).length == 2;

    // Index of the most recently claimed work unit.  It starts at
    // size_t.max, so the first atomic increment wraps it around to 0.
    shared size_t workUnitIndex = size_t.max;
    immutable len = range.length;
    if (!len) return 0;

    shared bool shouldContinue = true;

    void doIt()
    {
        import std.algorithm.comparison : min;

        // If anything throws, every thread should stop claiming work.
        scope(failure) atomicStore(shouldContinue, false);

        for (;;)
        {
            if (!atomicLoad(shouldContinue)) break;

            immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
            immutable start = workUnitSize * myUnitIndex;
            if (start >= len)
            {
                // Range exhausted: let the other threads stop as well.
                atomicStore(shouldContinue, false);
                break;
            }

            immutable end = min(len, start + workUnitSize);
            foreach (i; start .. end)
            {
                // A non-zero result means the body tried to break, return
                // or goto out of the loop, which is not allowed here.
                static if (withIndex)
                    immutable stopRequested = dg(i, range[i]);
                else
                    immutable stopRequested = dg(range[i]);

                if (stopRequested) foreachErr();
            }
        }
    }

    // Run doIt on every worker thread and on this thread, then wait.
    submitAndExecute(pool, &doIt);

    return 0;
};
3593 enum string parallelApplyMixinInputRange = q{
3594     // Handle empty thread pool as special case.
3595     if (pool.size == 0)
3596     {
3597         return doSizeZeroCase(this, dg);
3598     }
3600     // Whether iteration is with or without an index variable.
3601     enum withIndex = Parameters!(typeof(dg)).length == 2;
3603     // This protects the range while copying it.
3604     auto rangeMutex = new Mutex();
3606     shared bool shouldContinue = true;
3608     // The total number of elements that have been popped off range.
3609     // This is updated only while protected by rangeMutex;
3610     size_t nPopped = 0;
3612     static if (
3613         is(typeof(range.buf1)) &&
3614         is(typeof(range.bufPos)) &&
3615         is(typeof(range.doBufSwap()))
3616     )
3617     {
3618         // Make sure we don't have the buffer recycling overload of
3619         // asyncBuf.
3620         static if (
3621             is(typeof(range.source)) &&
3622             isRoundRobin!(typeof(range.source))
3623         )
3624         {
3625             static assert(0, "Cannot execute a parallel foreach loop on " ~
3626             "the buffer recycling overload of asyncBuf.");
3627         }
3629         enum bool bufferTrick = true;
3630     }
3631     else
3632     {
3633         enum bool bufferTrick = false;
3634     }
3636     void doIt()
3637     {
3638         scope(failure)
3639         {
3640             // If an exception is thrown, all threads should bail.
3641             atomicStore(shouldContinue, false);
3642         }
3644         static if (hasLvalueElements!R)
3645         {
3646             alias Temp = ElementType!R*[];
3647             Temp temp;
3649             // Returns:  The previous value of nPopped.
3650             size_t makeTemp()
3651             {
3652                 import std.algorithm.internal : addressOf;
3653                 import std.array : uninitializedArray;
3655                 if (temp is null)
3656                 {
3657                     temp = uninitializedArray!Temp(workUnitSize);
3658                 }
3660                 rangeMutex.lock();
3661                 scope(exit) rangeMutex.unlock();
3663                 size_t i = 0;
3664                 for (; i < workUnitSize && !range.empty; range.popFront(), i++)
3665                 {
3666                     temp[i] = addressOf(range.front);
3667                 }
3669                 temp = temp[0 .. i];
3670                 auto ret = nPopped;
3671                 nPopped += temp.length;
3672                 return ret;
3673             }
3675         }
3676         else
3677         {
3679             alias Temp = ElementType!R[];
3680             Temp temp;
3682             // Returns:  The previous value of nPopped.
3683             static if (!bufferTrick) size_t makeTemp()
3684             {
3685                 import std.array : uninitializedArray;
3687                 if (temp is null)
3688                 {
3689                     temp = uninitializedArray!Temp(workUnitSize);
3690                 }
3692                 rangeMutex.lock();
3693                 scope(exit) rangeMutex.unlock();
3695                 size_t i = 0;
3696                 for (; i < workUnitSize && !range.empty; range.popFront(), i++)
3697                 {
3698                     temp[i] = range.front;
3699                 }
3701                 temp = temp[0 .. i];
3702                 auto ret = nPopped;
3703                 nPopped += temp.length;
3704                 return ret;
3705             }
3707             static if (bufferTrick) size_t makeTemp()
3708             {
3709                 import std.algorithm.mutation : swap;
3710                 rangeMutex.lock();
3711                 scope(exit) rangeMutex.unlock();
3713                 // Elide copying by just swapping buffers.
3714                 temp.length = range.buf1.length;
3715                 swap(range.buf1, temp);
3717                 // This is necessary in case popFront() has been called on
3718                 // range before entering the parallel foreach loop.
3719                 temp = temp[range.bufPos..$];
3721                 static if (is(typeof(range._length)))
3722                 {
3723                     range._length -= (temp.length - range.bufPos);
3724                 }
3726                 range.doBufSwap();
3727                 auto ret = nPopped;
3728                 nPopped += temp.length;
3729                 return ret;
3730             }
3731         }
3733         while (atomicLoad(shouldContinue))
3734         {
3735             auto overallIndex = makeTemp();
3736             if (temp.empty)
3737             {
3738                 atomicStore(shouldContinue, false);
3739                 break;
3740             }
3742             foreach (i; 0 .. temp.length)
3743             {
3744                 scope(success) overallIndex++;
3746                 static if (hasLvalueElements!R)
3747                 {
3748                     static if (withIndex)
3749                     {
3750                         if (dg(overallIndex, *temp[i])) foreachErr();
3751                     }
3752                     else
3753                     {
3754                         if (dg(*temp[i])) foreachErr();
3755                     }
3756                 }
3757                 else
3758                 {
3759                     static if (withIndex)
3760                     {
3761                         if (dg(overallIndex, temp[i])) foreachErr();
3762                     }
3763                     else
3764                     {
3765                         if (dg(temp[i])) foreachErr();
3766                     }
3767                 }
3768             }
3769         }
3770     }
3772     submitAndExecute(pool, &doIt);
3774     return 0;
3775 };
// Follows the `next` links of an exception chain starting at `e` and returns
// the final link of that chain.  A null `e` yields null.
private Throwable findLastException(Throwable e) pure nothrow
{
    for (Throwable link = e; link !is null; link = link.next)
    {
        if (link.next is null)
            return link;
    }
    return null;
}
// Appends the exception chain that starts at `e` to the chain delimited by
// `firstException` .. `lastException`, updating both ends in place.
//
// Params:
//   e              = head of the chain to append; null is ignored.
//   firstException = head of the accumulated chain (set if it was null).
//   lastException  = tail of the accumulated chain (moved to the tail of e's
//                    chain).
private void addToChain(
    Throwable e,
    ref Throwable firstException,
    ref Throwable lastException
) pure nothrow
{
    // Appending nothing must not disturb the chain.  Without this guard a
    // null `e` would reset lastException to null while firstException is
    // still set, and the next call would fail the assert below.
    if (e is null) return;

    if (firstException)
    {
        assert(lastException); // nocoverage
        lastException.next = e; // nocoverage
        lastException = findLastException(e); // nocoverage
    }
    else
    {
        firstException = e;
        lastException = findLastException(e);
    }
}
/*
Range wrapper used for parallel foreach over `range` on `pool`, handing out
work units of `workUnitSize` elements.  The loop machinery itself lives in the
parallelApplyMixin* strings, which refer to `pool`, `range`, `workUnitSize`
and the loop delegate `dg` by name, so those names must not change.
*/
private struct ParallelForeach(R)
{
    TaskPool pool;
    R range;
    size_t workUnitSize;
    alias E = ElementType!R;

    // Lvalue elements are passed by ref so the loop body can write through
    // to the underlying range.
    static if (hasLvalueElements!R)
    {
        alias NoIndexDg = int delegate(ref E);
        alias IndexDg = int delegate(size_t, ref E);
    }
    else
    {
        alias NoIndexDg = int delegate(E);
        alias IndexDg = int delegate(size_t, E);
    }

    // Pick the implementation once: the random-access variant when randLen!R
    // holds, otherwise the input-range variant.
    private enum string applyImpl = randLen!R
        ? parallelApplyMixinRandomAccess
        : parallelApplyMixinInputRange;

    int opApply(scope NoIndexDg dg)
    {
        mixin(applyImpl);
    }

    int opApply(scope IndexDg dg)
    {
        mixin(applyImpl);
    }
}
/*
This struct collects the output of a callable that writes data into a
caller-supplied buffer, cycling through a fixed number of such buffers.  It
exposes these buffers through an input range interface.  It is used internally
by the buffer-recycling overload of TaskPool.asyncBuf, which creates an
instance and forwards it to the input range overload of asyncBuf.
*/
private struct RoundRobinBuffer(C1, C2)
{
    // Constraints on C1/C2 are not repeated here; asyncBuf already checks them.

    // The buffer type is whatever nextDel takes as its first parameter;
    // T is that buffer's element type.
    alias Array = Parameters!(C1.init)[0];
    alias T = typeof(Array.init[0]);

    T[][] bufs;     // buffers handed out in rotation
    size_t index;   // position in bufs of the buffer exposed by front
    C1 nextDel;     // called with a buffer so it can be filled
    C2 emptyDel;    // polled by popFront to detect the end of the data
    bool _empty;    // latched to true once popFront sees the end
    bool primed;    // whether bufs[index] has been filled since index changed

    this(
        C1 nextDel,
        C2 emptyDel,
        size_t initialBufSize,
        size_t nBuffers
    ) {
        this.nextDel = nextDel;
        this.emptyDel = emptyDel;

        bufs = new T[][](nBuffers);
        foreach (i; 0 .. nBuffers)
        {
            bufs[i] = new T[](initialBufSize);
        }
    }

    // Fills the current buffer via nextDel; primed is set only if that
    // call returns normally.
    void prime()
    in
    {
        assert(!empty);
    }
    body
    {
        nextDel(bufs[index]);
        primed = true;
    }

    // The current buffer, lazily filled on first access after each popFront.
    T[] front() @property
    in
    {
        assert(!empty);
    }
    body
    {
        if (!primed)
        {
            prime();
        }
        return bufs[index];
    }

    // Advances to the next buffer in the rotation, or latches `empty` once
    // emptyDel reports that no data is left.
    void popFront()
    {
        immutable exhausted = empty || emptyDel();
        if (exhausted)
        {
            _empty = true;
            return;
        }

        index = (index + 1) % bufs.length;
        primed = false;
    }

    bool empty() @property const @safe pure nothrow
    {
        return _empty;
    }
}
version (unittest)
{
    import std.stdio;

    // Pool shared by the unittests in this module.  It is __gshared (one
    // instance for all threads, not thread-local) because, per the original
    // author, this was the only way to get nested maps to work.
    __gshared TaskPool poolInstance;
}
// These test basic functionality but don't stress test for threading bugs.
// These are the tests that should be run every time Phobos is compiled.
@system unittest
{
    import std.algorithm.comparison : equal, min, max;
    import std.algorithm.iteration : filter, map, reduce;
    import std.array : split;
    import std.conv : text;
    import std.exception : assertThrown;
    import std.math : approxEqual, sqrt, log;
    import std.range : indexed, iota, join;
    import std.typecons : Tuple, tuple;

    poolInstance = new TaskPool(2);
    scope(exit) poolInstance.stop();

    // The only way this can be verified is manually.
    debug(std_parallelism) stderr.writeln("totalCPUs = ", totalCPUs);

    auto oldPriority = poolInstance.priority;
    poolInstance.priority = Thread.PRIORITY_MAX;
    assert(poolInstance.priority == Thread.PRIORITY_MAX);

    poolInstance.priority = Thread.PRIORITY_MIN;
    assert(poolInstance.priority == Thread.PRIORITY_MIN);

    poolInstance.priority = oldPriority;
    assert(poolInstance.priority == oldPriority);

    static void refFun(ref uint num)
    {
        num++;
    }

    uint x;

    // Test task().  Each task stores its own copy of x in args, so every
    // check below expects exactly 1.
    auto t = task!refFun(x);
    poolInstance.put(t);
    t.yieldForce;
    assert(t.args[0] == 1);

    auto t2 = task(&refFun, x);
    poolInstance.put(t2);
    t2.yieldForce;
    assert(t2.args[0] == 1);

    // Test scopedTask().
    auto st = scopedTask!refFun(x);
    poolInstance.put(st);
    st.yieldForce;
    assert(st.args[0] == 1);

    auto st2 = scopedTask(&refFun, x);
    poolInstance.put(st2);
    st2.yieldForce;
    assert(st2.args[0] == 1);

    // Test executeInNewThread().
    auto ct = scopedTask!refFun(x);
    ct.executeInNewThread(Thread.PRIORITY_MAX);
    ct.yieldForce;
    assert(ct.args[0] == 1);

    // Test ref return.
    uint toInc = 0;
    static ref T makeRef(T)(ref T num)
    {
        return num;
    }

    auto t3 = task!makeRef(toInc);
    taskPool.put(t3);
    assert(t3.args[0] == 0);
    t3.spinForce++;
    assert(t3.args[0] == 1);

    // Checks that task/scopedTask/put/yieldForce are usable from @safe code.
    // The tasks are submitted to safePool itself; it has no workers, so
    // yieldForce runs them in the calling thread.
    static void testSafe() @safe {
        static int bump(int num)
        {
            return num + 1;
        }

        auto safePool = new TaskPool(0);
        auto t = task(&bump, 1);
        safePool.put(t);
        assert(t.yieldForce == 2);

        auto st = scopedTask(&bump, 1);
        safePool.put(st);
        assert(st.yieldForce == 2);
        safePool.stop();
    }
    testSafe();

    auto arr = [1,2,3,4,5];
    auto nums = new uint[5];
    auto nums2 = new uint[5];

    foreach (i, ref elem; poolInstance.parallel(arr))
    {
        elem++;
        nums[i] = cast(uint) i + 2;
        nums2[i] = elem;
    }

    assert(nums == [2,3,4,5,6], text(nums));
    assert(nums2 == nums, text(nums2));
    assert(arr == nums, text(arr));

    // Test const/immutable arguments.
    static int add(int lhs, int rhs)
    {
        return lhs + rhs;
    }
    immutable addLhs = 1;
    immutable addRhs = 2;
    auto addTask = task(&add, addLhs, addRhs);
    auto addScopedTask = scopedTask(&add, addLhs, addRhs);
    poolInstance.put(addTask);
    poolInstance.put(addScopedTask);
    assert(addTask.yieldForce == 3);
    assert(addScopedTask.yieldForce == 3);

    // Test parallel foreach with non-random access range.
    auto range = filter!"a != 666"([0, 1, 2, 3, 4]);

    foreach (i, elem; poolInstance.parallel(range))
    {
        nums[i] = cast(uint) i;
    }

    assert(nums == [0,1,2,3,4]);

    auto logs = new double[1_000_000];
    foreach (i, ref elem; poolInstance.parallel(logs))
    {
        elem = log(i + 1.0);
    }

    foreach (i, elem; logs)
    {
        assert(approxEqual(elem, cast(double) log(i + 1)));
    }

    assert(poolInstance.amap!"a * a"([1,2,3,4,5]) == [1,4,9,16,25]);
    assert(poolInstance.amap!"a * a"([1,2,3,4,5], new long[5]) == [1,4,9,16,25]);
    assert(poolInstance.amap!("a * a", "-a")([1,2,3]) ==
           [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);

    auto tupleBuf = new Tuple!(int, int)[3];
    poolInstance.amap!("a * a", "-a")([1,2,3], tupleBuf);
    assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
    poolInstance.amap!("a * a", "-a")([1,2,3], 5, tupleBuf);
    assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);

    // Test amap with a non-array buffer.
    auto toIndex = new int[5];
    auto ind = indexed(toIndex, [3, 1, 4, 0, 2]);
    poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], ind);
    assert(equal(ind, [2, 4, 6, 8, 10]));
    assert(equal(toIndex, [8, 4, 10, 2, 6]));
    poolInstance.amap!"a / 2"(ind, ind);
    assert(equal(ind, [1, 2, 3, 4, 5]));
    assert(equal(toIndex, [4, 2, 5, 1, 3]));

    auto buf = new int[5];
    poolInstance.amap!"a * a"([1,2,3,4,5], buf);
    assert(buf == [1,4,9,16,25]);
    poolInstance.amap!"a * a"([1,2,3,4,5], 4, buf);
    assert(buf == [1,4,9,16,25]);

    assert(poolInstance.reduce!"a + b"([1]) == 1);
    assert(poolInstance.reduce!"a + b"([1,2,3,4]) == 10);
    assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4]) == 10);
    assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4], 1) == 10);
    assert(poolInstance.reduce!(min, max)([1,2,3,4]) == tuple(1, 4));
    assert(poolInstance.reduce!("a + b", "a * b")(tuple(0, 1), [1,2,3,4]) ==
           tuple(10, 24));

    immutable serialAns = reduce!"a + b"(iota(1000));
    assert(poolInstance.reduce!"a + b"(0, iota(1000)) == serialAns);
    assert(poolInstance.reduce!"a + b"(iota(1000)) == serialAns);

    // Test worker-local storage.
    auto wl = poolInstance.workerLocalStorage(0);
    foreach (i; poolInstance.parallel(iota(1000), 1))
    {
        wl.get = wl.get + i;
    }

    auto wlRange = wl.toRange;
    auto parallelSum = poolInstance.reduce!"a + b"(wlRange);
    assert(parallelSum == 499500);
    assert(wlRange[0 .. 1][0] == wlRange[0]);
    assert(wlRange[1 .. 2][0] == wlRange[1]);

    // Test finish()
    {
        static void slowFun() { Thread.sleep(dur!"msecs"(1)); }

        auto pool1 = new TaskPool();
        auto tSlow = task!slowFun();
        pool1.put(tSlow);
        pool1.finish();
        tSlow.yieldForce;
        // Can't assert that pool1.status == PoolState.stopNow because status
        // doesn't change until after the "done" flag is set and the waiting
        // thread is woken up.

        auto pool2 = new TaskPool();
        auto tSlow2 = task!slowFun();
        pool2.put(tSlow2);
        pool2.finish(true); // blocking
        assert(tSlow2.done);

        // Test fix for Bug 8582 by making pool size zero.
        auto pool3 = new TaskPool(0);
        auto tSlow3 = task!slowFun();
        pool3.put(tSlow3);
        pool3.finish(true); // blocking
        assert(tSlow3.done);

        // This is correct because no thread will terminate unless pool2.status
        // and pool3.status have already been set to stopNow.
        assert(pool2.status == TaskPool.PoolState.stopNow);
        assert(pool3.status == TaskPool.PoolState.stopNow);
    }

    // Test default pool stuff.
    assert(taskPool.size == totalCPUs - 1);

    nums = new uint[1000];
    foreach (i; parallel(iota(1000)))
    {
        nums[i] = cast(uint) i;
    }
    assert(equal(nums, iota(1000)));

    assert(equal(
               poolInstance.map!"a * a"(iota(30_000_001), 10_000),
               map!"a * a"(iota(30_000_001))
           ));

    // The filter is to kill random access and test the non-random access
    // branch.
    assert(equal(
               poolInstance.map!"a * a"(
                   filter!"a == a"(iota(30_000_001)
                                  ), 10_000, 1000),
               map!"a * a"(iota(30_000_001))
           ));

    assert(
        reduce!"a + b"(0UL,
                       poolInstance.map!"a * a"(iota(3_000_001), 10_000)
                      ) ==
        reduce!"a + b"(0UL,
                       map!"a * a"(iota(3_000_001))
                      )
    );

    assert(equal(
               iota(1_000_002),
               poolInstance.asyncBuf(filter!"a == a"(iota(1_000_002)))
           ));

    {
        import std.conv : to;
        import std.file : deleteme;

        string temp_file = deleteme ~ "-tempDelMe.txt";
        auto file = File(temp_file, "wb");
        scope(exit)
        {
            file.close();
            import std.file;
            remove(temp_file);
        }

        auto written = [[1.0, 2, 3], [4.0, 5, 6], [7.0, 8, 9]];
        foreach (row; written)
        {
            file.writeln(join(to!(string[])(row), "\t"));
        }

        file = File(temp_file);

        void next(ref char[] buf)
        {
            file.readln(buf);
            import std.string : chomp;
            buf = chomp(buf);
        }

        double[][] read;
        auto asyncReader = taskPool.asyncBuf(&next, &file.eof);

        foreach (line; asyncReader)
        {
            if (line.length == 0) continue;
            auto ls = line.split("\t");
            read ~= to!(double[])(ls);
        }

        assert(read == written);
        file.close();
    }

    // Test Map/AsyncBuf chaining.  The source is -1.0, 0, 1, ..., 2_999_999;
    // the first element (sqrt of -1) is dropped by popFront, so the chain
    // must yield sqrt(k)^2 ~= k for k = 0 .. 2_999_999.

    auto abuf = poolInstance.asyncBuf(iota(-1.0, 3_000_000), 100);
    auto temp = poolInstance.map!sqrt(
                    abuf, 100, 5
                );
    auto lmchain = poolInstance.map!"a * a"(temp, 100, 5);
    lmchain.popFront();

    int ii;
    foreach (elem; lmchain)
    {
        // Previously mismatches were only printed; make them fail the test.
        assert(approxEqual(elem, ii), text(ii, '\t', elem));
        ii++;
    }
    assert(ii == 3_000_000, text(ii));

    // Test buffer trick in parallel foreach.
    abuf = poolInstance.asyncBuf(iota(-1.0, 1_000_000), 100);
    abuf.popFront();
    auto bufTrickTest = new size_t[abuf.length];
    foreach (i, elem; parallel(abuf))
    {
        bufTrickTest[i] = i;
    }

    assert(equal(iota(1_000_000), bufTrickTest));

    auto myTask = task!(std.math.abs)(-1);
    taskPool.put(myTask);
    assert(myTask.spinForce == 1);

    // Test that worker local storage from one pool receives an index of 0
    // when the index is queried w.r.t. another pool.  The only way to do this
    // is non-deterministically.
    foreach (i; parallel(iota(1000), 1))
    {
        assert(poolInstance.workerIndex == 0);
    }

    foreach (i; poolInstance.parallel(iota(1000), 1))
    {
        assert(taskPool.workerIndex == 0);
    }

    // Test exception handling.
    static void parallelForeachThrow()
    {
        foreach (elem; parallel(iota(10)))
        {
            throw new Exception("");
        }
    }

    assertThrown!Exception(parallelForeachThrow());

    static int reduceException(int a, int b)
    {
        throw new Exception("");
    }

    assertThrown!Exception(
        poolInstance.reduce!reduceException(iota(3))
    );

    static int mapException(int a)
    {
        throw new Exception("");
    }

    assertThrown!Exception(
        poolInstance.amap!mapException(iota(3))
    );

    static void mapThrow()
    {
        auto m = poolInstance.map!mapException(iota(3));
        m.popFront();
    }

    assertThrown!Exception(mapThrow());

    struct ThrowingRange
    {
        @property int front()
        {
            return 1;
        }
        void popFront()
        {
            throw new Exception("");
        }
        enum bool empty = false;
    }

    assertThrown!Exception(poolInstance.asyncBuf(ThrowingRange.init));
}
4347 //version = parallelismStressTest;
4349 // These are more like stress tests than real unit tests.  They print out
4350 // tons of stuff and should not be run every time make unittest is run.
version (parallelismStressTest)
{
    // Stress test: 10 attempts, each with pool sizes 0 and 4.  Every round
    // builds a fresh poolInstance, exercises parallel foreach (random-access
    // and filtered), amap, task + reduce, and nested parallel foreach, then
    // stops the pool.  Progress goes to stderr/stdout, and the test ends by
    // blocking on readln() until Enter is pressed.
    @safe unittest
    {
        size_t attempt;
        for (; attempt < 10; attempt++)
            foreach (poolSize; [0, 4])
        {

            poolInstance = new TaskPool(poolSize);

            uint[] numbers = new uint[1_000];

            foreach (i; poolInstance.parallel( iota(0, numbers.length)) )
            {
                numbers[i] = cast(uint) i;
            }

            // Make sure it works.
            foreach (i; 0 .. numbers.length)
            {
                assert(numbers[i] == i);
            }

            stderr.writeln("Done creating nums.");


            // filter removes random access, so this goes through the
            // input-range path of parallel foreach.
            auto myNumbers = filter!"a % 7 > 0"( iota(0, 1000));
            foreach (num; poolInstance.parallel(myNumbers))
            {
                assert(num % 7 > 0 && num < 1000);
            }
            stderr.writeln("Done modulus test.");

            uint[] squares = poolInstance.amap!"a * a"(numbers, 100);
            assert(squares.length == numbers.length);
            foreach (i, number; numbers)
            {
                assert(squares[i] == number * number);
            }
            stderr.writeln("Done squares.");

            auto sumFuture = task!( reduce!"a + b" )(numbers);
            poolInstance.put(sumFuture);

            // NOTE(review): sumSquares is computed but never checked.
            ulong sumSquares = 0;
            foreach (elem; numbers)
            {
                sumSquares += elem * elem;
            }

            uint mySum = sumFuture.spinForce();
            assert(mySum == 999 * 1000 / 2);

            auto mySumParallel = poolInstance.reduce!"a + b"(numbers);
            assert(mySum == mySumParallel);
            stderr.writeln("Done sums.");

            // Submitted but never forced before poolInstance.stop() below.
            auto myTask = task(
            {
                synchronized writeln("Our lives are parallel...Our lives are parallel.");
            });
            poolInstance.put(myTask);

            auto nestedOuter = "abcd";
            auto nestedInner =  iota(0, 10, 2);

            // Nested parallel foreach on the same pool, one element per work
            // unit; output lines are serialized with `synchronized`.
            foreach (i, letter; poolInstance.parallel(nestedOuter, 1))
            {
                foreach (j, number; poolInstance.parallel(nestedInner, 1))
                {
                    synchronized writeln(i, ": ", letter, "  ", j, ": ", number);
                }
            }

            poolInstance.stop();
        }

        assert(attempt == 10);
        writeln("Press enter to go to next round of unittests.");
        readln();
    }

    // These unittests are intended more for actual testing and not so much
    // as examples.
    //
    // Same 10 x {0, 4} pool-size loop.  Covers worker indexing, worker-local
    // storage, work distribution across threads (printed, not asserted),
    // nested parallel foreach / amap / reduce on a 1000x1000 matrix, function
    // pointer tasks, many queued tasks, and ref-returning non-random-access
    // ranges.
    @safe unittest
    {
        foreach (attempt; 0 .. 10)
        foreach (poolSize; [0, 4])
        {
            poolInstance = new TaskPool(poolSize);

            // Test indexing.
            stderr.writeln("Creator Raw Index:  ", poolInstance.threadIndex);
            assert(poolInstance.workerIndex() == 0);

            // Test worker-local storage.
            // The expected total implies poolInstance.size + 1 storage slots,
            // each starting at the initial value 1.
            auto workerLocalStorage = poolInstance.workerLocalStorage!uint(1);
            foreach (i; poolInstance.parallel(iota(0U, 1_000_000)))
            {
                workerLocalStorage.get++;
            }
            assert(reduce!"a + b"(workerLocalStorage.toRange) ==
            1_000_000 + poolInstance.size + 1);

            // Make sure work is reasonably balanced among threads.  This test is
            // non-deterministic and is more of a sanity check than something that
            // has an absolute pass/fail.
            // Counters are keyed by thread object address: one per pool
            // thread plus the current thread.
            shared(uint)[void*] nJobsByThread;
            foreach (thread; poolInstance.pool)
            {
                nJobsByThread[cast(void*) thread] = 0;
            }
            nJobsByThread[ cast(void*) Thread.getThis()] = 0;

            foreach (i; poolInstance.parallel( iota(0, 1_000_000), 100 ))
            {
                atomicOp!"+="( nJobsByThread[ cast(void*) Thread.getThis() ], 1);
            }

            stderr.writeln("\nCurrent thread is:  ",
            cast(void*) Thread.getThis());
            stderr.writeln("Workload distribution:  ");
            foreach (k, v; nJobsByThread)
            {
                stderr.writeln(k, '\t', v);
            }

            // Test whether amap can be nested.
            real[][] matrix = new real[][](1000, 1000);
            foreach (i; poolInstance.parallel( iota(0, matrix.length) ))
            {
                foreach (j; poolInstance.parallel( iota(0, matrix[0].length) ))
                {
                    matrix[i][j] = i * j;
                }
            }

            // Get around weird bugs having to do w/ sqrt being an intrinsic:
            static real mySqrt(real num)
            {
                return sqrt(num);
            }

            static real[] parallelSqrt(real[] nums)
            {
                return poolInstance.amap!mySqrt(nums);
            }

            // Outer amap over rows, inner amap over each row: nested pool use.
            real[][] sqrtMatrix = poolInstance.amap!parallelSqrt(matrix);

            foreach (i, row; sqrtMatrix)
            {
                foreach (j, elem; row)
                {
                    real shouldBe = sqrt( cast(real) i * j);
                    assert(approxEqual(shouldBe, elem));
                    sqrtMatrix[i][j] = shouldBe;
                }
            }

            auto saySuccess = task(
            {
                stderr.writeln(
                    "Success doing matrix stuff that involves nested pool use.");
            });
            poolInstance.put(saySuccess);
            saySuccess.workForce();

            // A more thorough test of amap, reduce:  Find the sum of the square roots of
            // matrix.

            static real parallelSum(real[] input)
            {
                return poolInstance.reduce!"a + b"(input);
            }

            auto sumSqrt = poolInstance.reduce!"a + b"(
                               poolInstance.amap!parallelSum(
                                   sqrtMatrix
                               )
                           );

            assert(approxEqual(sumSqrt, 4.437e8));
            stderr.writeln("Done sum of square roots.");

            // Test whether tasks work with function pointers.
            auto nanTask = task(&isNaN, 1.0L);
            poolInstance.put(nanTask);
            assert(nanTask.spinForce == false);

            if (poolInstance.size > 0)
            {
                // Test work waiting.
                // 1000 busy-loop tasks are all queued before any is forced.
                static void uselessFun()
                {
                    foreach (i; 0 .. 1_000_000) {}
                }

                auto uselessTasks = new typeof(task(&uselessFun))[1000];
                foreach (ref uselessTask; uselessTasks)
                {
                    uselessTask = task(&uselessFun);
                }
                foreach (ref uselessTask; uselessTasks)
                {
                    poolInstance.put(uselessTask);
                }
                foreach (ref uselessTask; uselessTasks)
                {
                    uselessTask.workForce();
                }
            }

            // Test the case of non-random access + ref returns.
            // RemoveRandom exposes only front/popFront/empty, and front
            // returns by ref, so `ref elem` below writes into nums.
            int[] nums = [1,2,3,4,5];
            static struct RemoveRandom
            {
                int[] arr;

                ref int front()
                {
                    return arr.front;
                }
                void popFront()
                {
                    arr.popFront();
                }
                bool empty()
                {
                    return arr.empty;
                }
            }

            auto refRange = RemoveRandom(nums);
            foreach (ref elem; poolInstance.parallel(refRange))
            {
                elem++;
            }
            assert(nums == [2,3,4,5,6], text(nums));
            stderr.writeln("Nums:  ", nums);

            poolInstance.stop();
        }
    }
}
version (unittest)
{
    // Helper type for the issue-12733 amap regression test.  Its invariant
    // requires `checksum` to still hold its initializer.  Presumably this
    // catches member calls such as opAssign on memory that was never
    // properly initialized — confirm against the issue.
    struct __S_12733
    {
        ulong n;
        ulong checksum = 1_234_567_890;

        this(ulong u)
        {
            n = u;
        }

        // Copies only the payload; the target keeps its own checksum.
        void opAssign(__S_12733 s)
        {
            this.n = s.n;
        }

        invariant()
        {
            assert(checksum == 1_234_567_890);
        }
    }

    static auto __genPair_12733(ulong n)
    {
        return __S_12733(n);
    }
}
// Regression test for issue 12733: amap producing __S_12733 values must not
// trip the struct's invariant.  The results must also match the inputs.
@system unittest
{
    immutable ulong[] data = [ 2UL^^59-1, 2UL^^59-1, 2UL^^59-1, 112_272_537_195_293UL ];

    auto result = taskPool.amap!__genPair_12733(data);

    // The original test only checked that amap did not fail; also verify
    // that every element was produced from the matching input.
    assert(result.length == data.length);
    foreach (i, r; result)
    {
        assert(r.n == data[i]);
        assert(r.checksum == 1_234_567_890);
    }
}
@safe unittest
{
    import std.range : iota;

    // Moved here from std.range, where it caused cycles: a parallel foreach
    // over an iota with a ulong upper bound must compile.
    enum parallelIotaCompiles =
        __traits(compiles, { foreach (i; iota(0, 100UL).parallel) {} });
    assert(parallelIotaCompiles);
}
@safe unittest
{
    import std.algorithm.iteration : each;

    long[] arr;

    // `each` with a string lambda must be applicable to the result of
    // `parallel` on a long[].
    static assert(__traits(compiles, {
        arr.parallel.each!"a++";
    }));
}
// https://issues.dlang.org/show_bug.cgi?id=17539
@system unittest
{
    import std.random : rndGen;

    // Compilation is what matters here: parallel foreach over rndGen with a
    // `break`, tolerating the ParallelForeachError the break may produce.
    try
    {
        foreach (rnd; rndGen.parallel)
            break;
    }
    catch (ParallelForeachError e)
    {
    }
}