/**
$(D std._parallelism) implements high-level primitives for SMP _parallelism.
These include parallel foreach, parallel reduce, parallel eager map, pipelining
and future/promise _parallelism. $(D std._parallelism) is recommended when the
same operation is to be executed in parallel on different data, or when a
function is to be executed in a background thread and its result returned to a
well-defined main thread. For communication between arbitrary threads, see
$(D std.concurrency).

$(D std._parallelism) is based on the concept of a $(D Task). A $(D Task) is an
object that represents the fundamental unit of work in this library and may be
executed in parallel with any other $(D Task). Using $(D Task)
directly allows programming with a future/promise paradigm. All other
supported _parallelism paradigms (parallel foreach, map, reduce, pipelining)
represent an additional level of abstraction over $(D Task). They
automatically create one or more $(D Task) objects, or closely related types
that are conceptually identical but not part of the public API.

After creation, a $(D Task) may be executed in a new thread, or submitted
to a $(D TaskPool) for execution. A $(D TaskPool) encapsulates a task queue
and its worker threads. Its purpose is to efficiently map a large
number of $(D Task)s onto a smaller number of threads. A task queue is a
FIFO queue of $(D Task) objects that have been submitted to the
$(D TaskPool) and are awaiting execution. A worker thread is a thread that
is associated with exactly one task queue. It executes the $(D Task) at the
front of its queue when the queue has work available, or sleeps when
no work is available. Each task queue is associated with zero or
more worker threads. If the result of a $(D Task) is needed before execution
by a worker thread has begun, the $(D Task) can be removed from the task queue
and executed immediately in the thread where the result is needed.

Warning: Unless marked as $(D @trusted) or $(D @safe), artifacts in
this module allow implicit data sharing between threads and cannot
guarantee that client code is free from low level data races.

Source: $(PHOBOSSRC std/_parallelism.d)
Author: David Simcha
Copyright: Copyright (c) 2009-2011, David Simcha.
License: $(HTTP boost.org/LICENSE_1_0.txt, Boost License 1.0)
*/
module std.parallelism;

///
@system unittest
{
    import std.algorithm.iteration : map;
    import std.math : approxEqual;
    import std.parallelism : taskPool;
    import std.range : iota;

    // Parallel reduce can be combined with
    // std.algorithm.iteration.map to interesting effect.
    // The following example (thanks to Russel Winder)
    // calculates pi by quadrature using
    // std.algorithm.map and TaskPool.reduce.
    // getTerm is evaluated in parallel as needed by
    // TaskPool.reduce.
    //
    // Timings on an Intel i5-3450 quad core machine
    // for n = 1_000_000_000:
    //
    // TaskPool.reduce: 1.067 s
    // std.algorithm.reduce: 4.011 s

    enum n = 1_000_000;
    enum delta = 1.0 / n;

    alias getTerm = (int i)
    {
        immutable x = ( i - 0.5 ) * delta;
        return delta / ( 1.0 + x * x ) ;
    };

    immutable pi = 4.0 * taskPool.reduce!"a + b"(n.iota.map!getTerm);

    assert(pi.approxEqual(3.1415926));
}
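
// A second, minimal sketch of the future/promise style described in the
// module documentation, using only this module's own API (task,
// executeInNewThread, yieldForce). The function fib is a stand-in
// workload invented for this example.
@system unittest
{
    import std.parallelism : task;

    static long fib(int n)
    {
        return n < 2 ? n : fib(n - 1) + fib(n - 2);
    }

    // Start fib(20) in a new thread; the calling thread keeps going.
    auto t = task!fib(20);
    t.executeInNewThread();

    // Do other work here, then block until the result is ready.
    assert(t.yieldForce == 6765);
}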

import core.atomic;
import core.memory;
import core.sync.condition;
import core.thread;

import std.functional;
import std.meta;
import std.range.primitives;
import std.traits;

version (OSX)
{
    version = useSysctlbyname;
}
else version (FreeBSD)
{
    version = useSysctlbyname;
}
else version (DragonFlyBSD)
{
    version = useSysctlbyname;
}
else version (NetBSD)
{
    version = useSysctlbyname;
}


version (Windows)
{
    // BUGS: Only works on Windows 2000 and above.
    shared static this()
    {
        import core.sys.windows.windows : SYSTEM_INFO, GetSystemInfo;
        import std.algorithm.comparison : max;

        SYSTEM_INFO si;
        GetSystemInfo(&si);
        totalCPUs = max(1, cast(uint) si.dwNumberOfProcessors);
    }

}
else version (linux)
{
    shared static this()
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        totalCPUs = cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
}
else version (Solaris)
{
    shared static this()
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        totalCPUs = cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
}
else version (useSysctlbyname)
{
    extern(C) int sysctlbyname(
        const char *, void *, size_t *, void *, size_t
    );

    shared static this()
    {
        version (OSX)
        {
            auto nameStr = "machdep.cpu.core_count\0".ptr;
        }
        else version (FreeBSD)
        {
            auto nameStr = "hw.ncpu\0".ptr;
        }
        else version (DragonFlyBSD)
        {
            auto nameStr = "hw.ncpu\0".ptr;
        }
        else version (NetBSD)
        {
            auto nameStr = "hw.ncpu\0".ptr;
        }

        uint ans;
        size_t len = uint.sizeof;
        sysctlbyname(nameStr, &ans, &len, null, 0);
        totalCPUs = ans;
    }

}
else
{
    static assert(0, "Don't know how to get N CPUs on this OS.");
}

immutable size_t cacheLineSize;
shared static this()
{
    import core.cpuid : datacache;
    size_t lineSize = 0;
    foreach (cachelevel; datacache)
    {
        if (cachelevel.lineSize > lineSize && cachelevel.lineSize < uint.max)
        {
            lineSize = cachelevel.lineSize;
        }
    }

    cacheLineSize = lineSize;
}


/* Atomics code. These forward to core.atomic, but are written like this
   for two reasons:

   1. They used to actually contain ASM code and I don't want to have to change
      to directly calling core.atomic in a zillion different places.

   2. core.atomic has some misc. issues that make my use cases difficult
      without wrapping it. If I didn't wrap it, casts would be required
      basically everywhere.
*/
private void atomicSetUbyte(T)(ref T stuff, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    //core.atomic.cas(cast(shared) &stuff, stuff, newVal);
    atomicStore(*(cast(shared) &stuff), newVal);
}

private ubyte atomicReadUbyte(T)(ref T val)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    return atomicLoad(*(cast(shared) &val));
}

// This gets rid of the need for a lot of annoying casts in other parts of the
// code, when enums are involved.
private bool atomicCasUbyte(T)(ref T stuff, T testVal, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    return core.atomic.cas(cast(shared) &stuff, testVal, newVal);
}

/*--------------------- Generic helper functions, etc.------------------------*/
private template MapType(R, functions...)
{
    static assert(functions.length);

    ElementType!R e = void;
    alias MapType =
        typeof(adjoin!(staticMap!(unaryFun, functions))(e));
}

private template ReduceType(alias fun, R, E)
{
    alias ReduceType = typeof(binaryFun!fun(E.init, ElementType!R.init));
}

private template noUnsharedAliasing(T)
{
    enum bool noUnsharedAliasing = !hasUnsharedAliasing!T;
}

// This template tests whether a function may be executed in parallel from
// @safe code via Task.executeInNewThread(). There is an additional
// requirement for executing it via a TaskPool. (See isSafeReturn).
private template isSafeTask(F)
{
    enum bool isSafeTask =
        (functionAttributes!F & (FunctionAttribute.safe | FunctionAttribute.trusted)) != 0 &&
        (functionAttributes!F & FunctionAttribute.ref_) == 0 &&
        (isFunctionPointer!F || !hasUnsharedAliasing!F) &&
        allSatisfy!(noUnsharedAliasing, Parameters!F);
}

@safe unittest
{
    alias F1 = void function() @safe;
    alias F2 = void function();
    alias F3 = void function(uint, string) @trusted;
    alias F4 = void function(uint, char[]);

    static assert( isSafeTask!F1);
    static assert(!isSafeTask!F2);
    static assert( isSafeTask!F3);
    static assert(!isSafeTask!F4);

    alias F5 = uint[] function(uint, string) pure @trusted;
    static assert( isSafeTask!F5);
}

// This function decides whether Tasks that meet all of the other requirements
// for being executed from @safe code can be executed on a TaskPool.
// When executing via TaskPool, it's theoretically possible
// to return a value that is also pointed to by a worker thread's thread local
// storage. When executing from executeInNewThread(), the thread that executed
// the Task is terminated by the time the return value is visible in the calling
// thread, so this is a non-issue. It's also a non-issue for pure functions
// since they can't read global state.
private template isSafeReturn(T)
{
    static if (!hasUnsharedAliasing!(T.ReturnType))
    {
        enum isSafeReturn = true;
    }
    else static if (T.isPure)
    {
        enum isSafeReturn = true;
    }
    else
    {
        enum isSafeReturn = false;
    }
}

private template randAssignable(R)
{
    enum randAssignable = isRandomAccessRange!R && hasAssignableElements!R;
}

private enum TaskStatus : ubyte
{
    notStarted,
    inProgress,
    done
}

private template AliasReturn(alias fun, T...)
{
    alias AliasReturn = typeof({ T args; return fun(args); });
}

// Should be private, but std.algorithm.reduce is used in the zero-thread case
// and won't work w/ private.
template reduceAdjoin(functions...)
{
    static if (functions.length == 1)
    {
        alias reduceAdjoin = binaryFun!(functions[0]);
    }
    else
    {
        T reduceAdjoin(T, U)(T lhs, U rhs)
        {
            alias funs = staticMap!(binaryFun, functions);

            foreach (i, Unused; typeof(lhs.expand))
            {
                lhs.expand[i] = funs[i](lhs.expand[i], rhs);
            }

            return lhs;
        }
    }
}

private template reduceFinish(functions...)
{
    static if (functions.length == 1)
    {
        alias reduceFinish = binaryFun!(functions[0]);
    }
    else
    {
        T reduceFinish(T)(T lhs, T rhs)
        {
            alias funs = staticMap!(binaryFun, functions);

            foreach (i, Unused; typeof(lhs.expand))
            {
                lhs.expand[i] = funs[i](lhs.expand[i], rhs.expand[i]);
            }

            return lhs;
        }
    }
}

private template isRoundRobin(R : RoundRobinBuffer!(C1, C2), C1, C2)
{
    enum isRoundRobin = true;
}

private template isRoundRobin(T)
{
    enum isRoundRobin = false;
}

@safe unittest
{
    static assert( isRoundRobin!(RoundRobinBuffer!(void delegate(char[]), bool delegate())));
    static assert(!isRoundRobin!(uint));
}

// This is the base "class" for all of the other tasks. Using C-style
// polymorphism to allow more direct control over memory allocation, etc.
private struct AbstractTask
{
    AbstractTask* prev;
    AbstractTask* next;

    // Pointer to a function that executes this task.
    void function(void*) runTask;

    Throwable exception;
    ubyte taskStatus = TaskStatus.notStarted;

    bool done() @property
    {
        if (atomicReadUbyte(taskStatus) == TaskStatus.done)
        {
            if (exception)
            {
                throw exception;
            }

            return true;
        }

        return false;
    }

    void job()
    {
        runTask(&this);
    }
}
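
// How the C-style dispatch above plays out, as a sketch in comment form
// (illustrative only, not additional API): a concrete Task embeds an
// AbstractTask as its first member, so a pointer to the whole object can
// be treated as an AbstractTask*, and job() dispatches through the stored
// function pointer back to the concrete type:
//
//     AbstractTask* t = someTask.basePtr;
//     t.job(); // calls t.runTask(t), i.e. Task!(fun, Args).impl(t)
//
// where impl() casts the void* back to the concrete Task and calls fun.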

/**
$(D Task) represents the fundamental unit of work. A $(D Task) may be
executed in parallel with any other $(D Task). Using this struct directly
allows future/promise _parallelism. In this paradigm, a function (or delegate
or other callable) is executed in a thread other than the one it was called
from. The calling thread does not block while the function is being executed.
A call to $(D workForce), $(D yieldForce), or $(D spinForce) is used to
ensure that the $(D Task) has finished executing and to obtain the return
value, if any. These functions and $(D done) also act as full memory barriers,
meaning that any memory writes made in the thread that executed the $(D Task)
are guaranteed to be visible in the calling thread after one of these functions
returns.

The $(REF task, std,parallelism) and $(REF scopedTask, std,parallelism) functions can
be used to create an instance of this struct. See $(D task) for usage examples.

Function results are returned from $(D yieldForce), $(D spinForce) and
$(D workForce) by ref. If $(D fun) returns by ref, the reference will point
to the returned reference of $(D fun). Otherwise it will point to a
field in this struct.

Copying of this struct is disabled, since it would provide no useful semantics.
If you want to pass this struct around, you should do so by reference or
pointer.

Bugs: Changes to $(D ref) and $(D out) arguments are not propagated to the
call site, only to $(D args) in this struct.
*/
struct Task(alias fun, Args...)
{
    AbstractTask base = {runTask : &impl};
    alias base this;

    private @property AbstractTask* basePtr()
    {
        return &base;
    }

    private static void impl(void* myTask)
    {
        import std.algorithm.internal : addressOf;

        Task* myCastedTask = cast(typeof(this)*) myTask;
        static if (is(ReturnType == void))
        {
            fun(myCastedTask._args);
        }
        else static if (is(typeof(addressOf(fun(myCastedTask._args)))))
        {
            myCastedTask.returnVal = addressOf(fun(myCastedTask._args));
        }
        else
        {
            myCastedTask.returnVal = fun(myCastedTask._args);
        }
    }

    private TaskPool pool;
    private bool isScoped; // True if created with scopedTask.

    Args _args;

    /**
    The arguments the function was called with. Changes to $(D out) and
    $(D ref) arguments will be visible here.
    */
    static if (__traits(isSame, fun, run))
    {
        alias args = _args[1..$];
    }
    else
    {
        alias args = _args;
    }


    // The purpose of this code is to decide whether functions whose
    // return values have unshared aliasing can be executed via
    // TaskPool from @safe code. See isSafeReturn.
    static if (__traits(isSame, fun, run))
    {
        static if (isFunctionPointer!(_args[0]))
        {
            private enum bool isPure =
                functionAttributes!(Args[0]) & FunctionAttribute.pure_;
        }
        else
        {
            // BUG: Should check this for delegates too, but std.traits
            // apparently doesn't allow this. isPure is irrelevant
            // for delegates, at least for now since shared delegates
            // don't work.
            private enum bool isPure = false;
        }

    }
    else
    {
        // We already know that we can't execute aliases in @safe code, so
        // just put a dummy value here.
        private enum bool isPure = false;
    }


    /**
    The return type of the function called by this $(D Task). This can be
    $(D void).
    */
    alias ReturnType = typeof(fun(_args));

    static if (!is(ReturnType == void))
    {
        static if (is(typeof(&fun(_args))))
        {
            // Ref return.
            ReturnType* returnVal;

            ref ReturnType fixRef(ReturnType* val)
            {
                return *val;
            }

        }
        else
        {
            ReturnType returnVal;

            ref ReturnType fixRef(ref ReturnType val)
            {
                return val;
            }
        }
    }

    private void enforcePool()
    {
        import std.exception : enforce;
        enforce(this.pool !is null, "Job not submitted yet.");
    }

    static if (Args.length > 0)
    {
        private this(Args args)
        {
            _args = args;
        }
    }

    // Work around DMD bug 6588, allow immutable elements.
    static if (allSatisfy!(isAssignable, Args))
    {
        typeof(this) opAssign(typeof(this) rhs)
        {
            foreach (i, Type; typeof(this.tupleof))
            {
                this.tupleof[i] = rhs.tupleof[i];
            }
            return this;
        }
    }
    else
    {
        @disable typeof(this) opAssign(typeof(this) rhs)
        {
            assert(0);
        }
    }

    /**
    If the $(D Task) isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any. If it's in progress,
    busy spin until it's done, then return the return value. If it threw
    an exception, rethrow that exception.

    This function should be used when you expect the result of the
    $(D Task) to be available on a timescale shorter than that of an OS
    context switch.
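
    Example:
    ---
    // Illustrative sketch only; cheapSquare is a made-up stand-in for
    // work that finishes faster than an OS context switch.
    static int cheapSquare(int x) { return x * x; }

    auto t = task!cheapSquare(5);
    taskPool.put(t);           // submit to the global pool
    assert(t.spinForce == 25); // busy-wait for the result
    ---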
    */
    @property ref ReturnType spinForce() @trusted
    {
        enforcePool();

        this.pool.tryDeleteExecute(basePtr);

        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done) {}

        if (exception)
        {
            throw exception;
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If the $(D Task) isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any. If it's in progress,
    wait on a condition variable. If it threw an exception, rethrow that
    exception.

    This function should be used for expensive functions, as waiting on a
    condition variable introduces latency, but avoids wasted CPU cycles.
    */
    @property ref ReturnType yieldForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        if (done)
        {
            static if (is(ReturnType == void))
            {
                return;
            }
            else
            {
                return fixRef(this.returnVal);
            }
        }

        pool.waiterLock();
        scope(exit) pool.waiterUnlock();

        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done)
        {
            pool.waitUntilCompletion();
        }

        if (exception)
        {
            throw exception; // nocoverage
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If this $(D Task) was not started yet, execute it in the current
    thread. If it is finished, return its result. If it is in progress,
    execute any other $(D Task) from the $(D TaskPool) instance that
    this $(D Task) was submitted to until this one
    is finished. If it threw an exception, rethrow that exception.
    If no other tasks are available or this $(D Task) was executed using
    $(D executeInNewThread), wait on a condition variable.
    */
    @property ref ReturnType workForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        while (true)
        {
            if (done) // done() implicitly checks for exceptions.
            {
                static if (is(ReturnType == void))
                {
                    return;
                }
                else
                {
                    return fixRef(this.returnVal);
                }
            }

            AbstractTask* job;
            {
                // Locking explicitly and calling popNoSync() because
                // pop() waits on a condition variable if there are no Tasks
                // in the queue.

                pool.queueLock();
                scope(exit) pool.queueUnlock();
                job = pool.popNoSync();
            }


            if (job !is null)
            {

                version (verboseUnittest)
                {
                    stderr.writeln("Doing workForce work.");
                }

                pool.doJob(job);

                if (done)
                {
                    static if (is(ReturnType == void))
                    {
                        return;
                    }
                    else
                    {
                        return fixRef(this.returnVal);
                    }
                }
            }
            else
            {
                version (verboseUnittest)
                {
                    stderr.writeln("Yield from workForce.");
                }

                return yieldForce;
            }
        }
    }

    /**
    Returns $(D true) if the $(D Task) is finished executing.

    Throws: Rethrows any exception thrown during the execution of the
    $(D Task).
    */
    @property bool done() @trusted
    {
        // Explicitly forwarded for documentation purposes.
        return base.done;
    }

    /**
    Create a new thread for executing this $(D Task), execute it in the
    newly created thread, then terminate the thread. This can be used for
    future/promise parallelism. An explicit priority may be given
    to the $(D Task). If one is provided, its value is forwarded to
    $(D core.thread.Thread.priority). See $(REF task, std,parallelism) for
    usage example.
    */
    void executeInNewThread() @trusted
    {
        pool = new TaskPool(basePtr);
    }

    /// Ditto
    void executeInNewThread(int priority) @trusted
    {
        pool = new TaskPool(basePtr, priority);
    }

    @safe ~this()
    {
        if (isScoped && pool !is null && taskStatus != TaskStatus.done)
        {
            yieldForce;
        }
    }

    // When this is uncommented, it somehow gets called on returning from
    // scopedTask even though the struct shouldn't be getting copied.
    //@disable this(this) {}
}

// Calls $(D fpOrDelegate) with $(D args). This is an
// adapter that makes $(D Task) work with delegates, function pointers and
// functors instead of just aliases.
ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args)
{
    return fpOrDelegate(args);
}

/**
Creates a $(D Task) on the GC heap that calls an alias. This may be executed
via $(D Task.executeInNewThread) or by submitting to a
$(REF TaskPool, std,parallelism). A globally accessible instance of
$(D TaskPool) is provided by $(REF taskPool, std,parallelism).

Returns: A pointer to the $(D Task).

Example:
---
// Read two files into memory at the same time.
import std.file;

void main()
{
    // Create and execute a Task for reading
    // foo.txt.
    auto file1Task = task!read("foo.txt");
    file1Task.executeInNewThread();

    // Read bar.txt in parallel.
    auto file2Data = read("bar.txt");

    // Get the results of reading foo.txt.
    auto file1Data = file1Task.yieldForce;
}
---

---
// Sorts an array using a parallel quick sort algorithm.
// The first partition is done serially. Both recursion
// branches are then executed in parallel.
//
// Timings for sorting an array of 1,000,000 doubles on
// an Athlon 64 X2 dual core machine:
//
// This implementation: 176 milliseconds.
// Equivalent serial implementation: 280 milliseconds
void parallelSort(T)(T[] data)
{
    // Sort small subarrays serially.
    if (data.length < 100)
    {
        std.algorithm.sort(data);
        return;
    }

    // Partition the array.
    swap(data[$ / 2], data[$ - 1]);
    auto pivot = data[$ - 1];
    bool lessThanPivot(T elem) { return elem < pivot; }

    auto greaterEqual = partition!lessThanPivot(data[0..$ - 1]);
    swap(data[$ - greaterEqual.length - 1], data[$ - 1]);

    auto less = data[0..$ - greaterEqual.length - 1];
    greaterEqual = data[$ - greaterEqual.length..$];

    // Execute both recursion branches in parallel.
    auto recurseTask = task!parallelSort(greaterEqual);
    taskPool.put(recurseTask);
    parallelSort(less);
    recurseTask.yieldForce;
}
---
*/
auto task(alias fun, Args...)(Args args)
{
    return new Task!(fun, Args)(args);
}

/**
Creates a $(D Task) on the GC heap that calls a function pointer, delegate, or
class/struct with overloaded opCall.

Example:
---
// Read two files in at the same time again,
// but this time use a function pointer instead
// of an alias to represent std.file.read.
import std.file;

void main()
{
    // Create and execute a Task for reading
    // foo.txt.
    auto file1Task = task(&read, "foo.txt");
    file1Task.executeInNewThread();

    // Read bar.txt in parallel.
    auto file2Data = read("bar.txt");

    // Get the results of reading foo.txt.
    auto file1Data = file1Task.yieldForce;
}
---

Notes: This function takes a non-scope delegate, meaning it can be
used with closures. If you can't allocate a closure due to objects
on the stack that have scoped destruction, see $(D scopedTask), which
takes a scope delegate.
*/
auto task(F, Args...)(F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    return new Task!(run, F, Args)(delegateOrFp, args);
}

/**
Version of $(D task) usable from $(D @safe) code. Usage mechanics are
identical to the non-@safe case, but safety introduces some restrictions:

1. $(D fun) must be @safe or @trusted.

2. $(D F) must not have any unshared aliasing as defined by
$(REF hasUnsharedAliasing, std,traits). This means it
may not be an unshared delegate or a non-shared class or struct
with overloaded $(D opCall). This also precludes accepting template
alias parameters.

3. $(D Args) must not have unshared aliasing.

4. $(D fun) must not return by reference.

5. The return type must not have unshared aliasing unless $(D fun) is
$(D pure) or the $(D Task) is executed via $(D executeInNewThread) instead
of using a $(D TaskPool).

*/
@trusted auto task(F, Args...)(F fun, Args args)
if (is(typeof(fun(args))) && isSafeTask!F)
{
    return new Task!(run, F, Args)(fun, args);
}
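
// A hedged sketch of the @safe overload above. sum is a made-up @safe
// function; the point is that a @safe function pointer whose arguments
// and return type have no unshared aliasing satisfies isSafeTask.
@safe unittest
{
    import std.parallelism : task;

    static int sum(int a, int b) @safe { return a + b; }

    auto t = task(&sum, 40, 2);
    t.executeInNewThread();
    assert(t.yieldForce == 42);
}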

/**
These functions allow the creation of $(D Task) objects on the stack rather
than the GC heap. The lifetime of a $(D Task) created by $(D scopedTask)
cannot exceed the lifetime of the scope it was created in.

$(D scopedTask) might be preferred over $(D task):

1. When a $(D Task) that calls a delegate is being created and a closure
cannot be allocated due to objects on the stack that have scoped
destruction. The delegate overload of $(D scopedTask) takes a $(D scope)
delegate.

2. As a micro-optimization, to avoid the heap allocation associated with
$(D task) or with the creation of a closure.

Usage is otherwise identical to $(D task).

Notes: $(D Task) objects created using $(D scopedTask) will automatically
call $(D Task.yieldForce) in their destructor if necessary to ensure
the $(D Task) is complete before the stack frame they reside on is destroyed.
*/
auto scopedTask(alias fun, Args...)(Args args)
{
    auto ret = Task!(fun, Args)(args);
    ret.isScoped = true;
    return ret;
}

/// Ditto
auto scopedTask(F, Args...)(scope F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    auto ret = Task!(run, F, Args)(delegateOrFp, args);
    ret.isScoped = true;
    return ret;
}

/// Ditto
@trusted auto scopedTask(F, Args...)(F fun, Args args)
if (is(typeof(fun(args))) && isSafeTask!F)
{
    auto ret = Task!(run, F, Args)(fun, args);
    ret.isScoped = true;
    return ret;
}
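
// A minimal sketch of scopedTask: the Task lives on this unittest's stack
// frame, and its destructor calls yieldForce if needed before the frame
// is destroyed. triple is an invented example function.
@system unittest
{
    import std.parallelism : scopedTask, taskPool;

    static int triple(int x) { return 3 * x; }

    auto t = scopedTask!triple(14);
    taskPool.put(t);
    assert(t.yieldForce == 42);
}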

/**
The total number of CPU cores available on the current machine, as reported by
the operating system.
*/
immutable uint totalCPUs;

/*
This class serves two purposes:

1. It distinguishes std.parallelism threads from other threads so that
   the std.parallelism daemon threads can be terminated.

2. It adds a reference to the pool that the thread is a member of,
   which is also necessary to allow the daemon threads to be properly
   terminated.
*/
private final class ParallelismThread : Thread
{
    this(void delegate() dg)
    {
        super(dg);
    }

    TaskPool pool;
}

// Kill daemon threads.
shared static ~this()
{
    foreach (ref thread; Thread)
    {
        auto pthread = cast(ParallelismThread) thread;
        if (pthread is null) continue;
        auto pool = pthread.pool;
        if (!pool.isDaemon) continue;
        pool.stop();
        pthread.join();
    }
}

/**
This class encapsulates a task queue and a set of worker threads. Its purpose
is to efficiently map a large number of $(D Task)s onto a smaller number of
threads. A task queue is a FIFO queue of $(D Task) objects that have been
submitted to the $(D TaskPool) and are awaiting execution. A worker thread is a
thread that executes the $(D Task) at the front of the queue when one is
available and sleeps when the queue is empty.

This class should usually be used via the global instantiation
available via the $(REF taskPool, std,parallelism) property.
Occasionally it is useful to explicitly instantiate a $(D TaskPool):

1. When you want $(D TaskPool) instances with multiple priorities, for example
a low priority pool and a high priority pool.

2. When the threads in the global task pool are waiting on a synchronization
primitive (for example a mutex), and you want to parallelize the code that
needs to run before these threads can be resumed.
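
Example:
---
// A hedged sketch of explicit instantiation: a small dedicated pool that
// is shut down with finish() once the work is done. The loop body is
// purely illustrative.
auto pool = new TaskPool(2); // two worker threads

foreach (i, ref elem; pool.parallel(new double[1000]))
{
    elem = i * i;
}

pool.finish(); // let the workers terminate once the queue drains
---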
*/
final class TaskPool
{
private:

    // A pool can either be a regular pool or a single-task pool. A
    // single-task pool is a dummy pool that's fired up for
    // Task.executeInNewThread().
    bool isSingleTask;

    ParallelismThread[] pool;
    Thread singleTaskThread;

    AbstractTask* head;
    AbstractTask* tail;
    PoolState status = PoolState.running;
    Condition workerCondition;
    Condition waiterCondition;
    Mutex queueMutex;
    Mutex waiterMutex; // For waiterCondition

    // The instanceStartIndex of the next instance that will be created.
    __gshared static size_t nextInstanceIndex = 1;

    // The index of the current thread.
    static size_t threadIndex;

    // The index of the first thread in this instance.
    immutable size_t instanceStartIndex;

    // The index that the next thread to be initialized in this pool will have.
    size_t nextThreadIndex;

    enum PoolState : ubyte
    {
        running,
        finishing,
        stopNow
    }

    void doJob(AbstractTask* job)
    {
        assert(job.taskStatus == TaskStatus.inProgress);
        assert(job.next is null);
        assert(job.prev is null);

        scope(exit)
        {
            if (!isSingleTask)
            {
                waiterLock();
                scope(exit) waiterUnlock();
                notifyWaiters();
            }
        }

        try
        {
            job.job();
        }
        catch (Throwable e)
        {
            job.exception = e;
        }

        atomicSetUbyte(job.taskStatus, TaskStatus.done);
    }

    // This function is used for dummy pools created by Task.executeInNewThread().
    void doSingleTask()
    {
        // No synchronization. Pool is guaranteed to only have one thread,
        // and the queue is submitted to before this thread is created.
        assert(head);
        auto t = head;
        t.next = t.prev = head = null;
        doJob(t);
    }

    // This function performs initialization for each thread that affects
    // thread local storage and therefore must be done from within the
    // worker thread. It then calls executeWorkLoop().
    void startWorkLoop()
    {
        // Initialize thread index.
        {
            queueLock();
            scope(exit) queueUnlock();
            threadIndex = nextThreadIndex;
            nextThreadIndex++;
        }

        executeWorkLoop();
    }

    // This is the main work loop that worker threads spend their time in
    // until they terminate. It's also entered by non-worker threads when
    // finish() is called with the blocking variable set to true.
    void executeWorkLoop()
    {
        while (atomicReadUbyte(status) != PoolState.stopNow)
        {
            AbstractTask* task = pop();
            if (task is null)
            {
                if (atomicReadUbyte(status) == PoolState.finishing)
                {
                    atomicSetUbyte(status, PoolState.stopNow);
                    return;
                }
            }
            else
            {
                doJob(task);
            }
        }
    }

    // Pop a task off the queue.
    AbstractTask* pop()
    {
        queueLock();
        scope(exit) queueUnlock();
        auto ret = popNoSync();
        while (ret is null && status == PoolState.running)
        {
            wait();
            ret = popNoSync();
        }
        return ret;
    }

    AbstractTask* popNoSync()
    out(returned)
    {
        /* If task.prev and task.next aren't null, then another thread
         * can try to delete this task from the pool after it's
         * already been deleted/popped.
         */
        if (returned !is null)
        {
            assert(returned.next is null);
            assert(returned.prev is null);
        }
    }
    body
    {
        if (isSingleTask) return null;

        AbstractTask* returned = head;
        if (head !is null)
        {
            head = head.next;
            returned.prev = null;
            returned.next = null;
            returned.taskStatus = TaskStatus.inProgress;
        }
        if (head !is null)
        {
            head.prev = null;
        }

        return returned;
    }

    // Push a task onto the queue.
    void abstractPut(AbstractTask* task)
    {
        queueLock();
        scope(exit) queueUnlock();
        abstractPutNoSync(task);
    }

    void abstractPutNoSync(AbstractTask* task)
    in
    {
        assert(task);
    }
    out
    {
        import std.conv : text;

        assert(tail.prev !is tail);
        assert(tail.next is null, text(tail.prev, '\t', tail.next));
        if (tail.prev !is null)
        {
            assert(tail.prev.next is tail, text(tail.prev, '\t', tail.next));
        }
    }
    body
    {
        // Not using enforce() to save on function call overhead since this
        // is a performance critical function.
        if (status != PoolState.running)
        {
            throw new Error(
                "Cannot submit a new task to a pool after calling " ~
                "finish() or stop()."
            );
        }

        task.next = null;
        if (head is null) //Queue is empty.
        {
            head = task;
            tail = task;
            tail.prev = null;
        }
        else
        {
            assert(tail);
            task.prev = tail;
            tail.next = task;
            tail = task;
        }
        notify();
    }

    void abstractPutGroupNoSync(AbstractTask* h, AbstractTask* t)
    {
        if (status != PoolState.running)
        {
            throw new Error(
                "Cannot submit a new task to a pool after calling " ~
                "finish() or stop()."
            );
        }

        if (head is null)
        {
            head = h;
            tail = t;
        }
        else
        {
            h.prev = tail;
            tail.next = h;
            tail = t;
        }

        notifyAll();
    }

    void tryDeleteExecute(AbstractTask* toExecute)
    {
        if (isSingleTask) return;

        if ( !deleteItem(toExecute) )
        {
            return;
        }

        try
        {
            toExecute.job();
        }
        catch (Exception e)
        {
            toExecute.exception = e;
        }

        atomicSetUbyte(toExecute.taskStatus, TaskStatus.done);
    }

    bool deleteItem(AbstractTask* item)
    {
        queueLock();
        scope(exit) queueUnlock();
        return deleteItemNoSync(item);
    }

    bool deleteItemNoSync(AbstractTask* item)
    {
        if (item.taskStatus != TaskStatus.notStarted)
        {
            return false;
        }
        item.taskStatus = TaskStatus.inProgress;

        if (item is head)
        {
            // Make sure head gets set properly.
            popNoSync();
            return true;
        }
        if (item is tail)
        {
            tail = tail.prev;
            if (tail !is null)
            {
                tail.next = null;
            }
            item.next = null;
            item.prev = null;
            return true;
        }
        if (item.next !is null)
        {
            assert(item.next.prev is item); // Check queue consistency.
            item.next.prev = item.prev;
        }
        if (item.prev !is null)
        {
            assert(item.prev.next is item); // Check queue consistency.
            item.prev.next = item.next;
        }
        item.next = null;
        item.prev = null;
        return true;
    }

    void queueLock()
    {
        assert(queueMutex);
        if (!isSingleTask) queueMutex.lock();
    }

    void queueUnlock()
    {
        assert(queueMutex);
        if (!isSingleTask) queueMutex.unlock();
    }

    void waiterLock()
    {
        if (!isSingleTask) waiterMutex.lock();
    }

    void waiterUnlock()
    {
        if (!isSingleTask) waiterMutex.unlock();
    }

    void wait()
    {
        if (!isSingleTask) workerCondition.wait();
    }

    void notify()
    {
        if (!isSingleTask) workerCondition.notify();
    }

    void notifyAll()
    {
        if (!isSingleTask) workerCondition.notifyAll();
    }

    void waitUntilCompletion()
    {
        if (isSingleTask)
        {
            singleTaskThread.join();
        }
        else
        {
            waiterCondition.wait();
        }
    }

    void notifyWaiters()
    {
        if (!isSingleTask) waiterCondition.notifyAll();
    }

    // Private constructor for creating dummy pools that only have one thread,
    // only execute one Task, and then terminate. This is used for
    // Task.executeInNewThread().
    this(AbstractTask* task, int priority = int.max)
    {
        assert(task);

        // Dummy value, not used.
        instanceStartIndex = 0;

        this.isSingleTask = true;
        task.taskStatus = TaskStatus.inProgress;
        this.head = task;
        singleTaskThread = new Thread(&doSingleTask);
        singleTaskThread.start();

        // Disabled until writing code to support
        // running thread with specified priority
        // See https://d.puremagic.com/issues/show_bug.cgi?id=8960

        /*if (priority != int.max)
        {
            singleTaskThread.priority = priority;
        }*/
    }

public:
    // This is used in parallel_algorithm but is too unstable to document
    // as public API.
    size_t defaultWorkUnitSize(size_t rangeLen) const @safe pure nothrow
    {
        import std.algorithm.comparison : max;

        if (this.size == 0)
        {
            return rangeLen;
        }

        immutable size_t eightSize = 4 * (this.size + 1);
        auto ret = (rangeLen / eightSize) + ((rangeLen % eightSize == 0) ? 0 : 1);
        return max(ret, 1);
    }
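
    // Worked example of the formula above (comment only): with 7 worker
    // threads, size == 7 and eightSize == 4 * (7 + 1) == 32. For
    // rangeLen == 1_000_000 the division is exact, giving work units of
    // 31_250 elements, i.e. about four work units per "slot" (the worker
    // threads plus the submitting thread).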

    /**
    Default constructor that initializes a $(D TaskPool) with
    $(D totalCPUs) - 1 worker threads. The minus 1 is included because the
    main thread will also be available to do work.

    Note: On single-core machines, the primitives provided by $(D TaskPool)
    operate transparently in single-threaded mode.
    */
    this() @trusted
    {
        this(totalCPUs - 1);
    }

    /**
    Allows for custom number of worker threads.
    */
    this(size_t nWorkers) @trusted
    {
        synchronized(typeid(TaskPool))
        {
            instanceStartIndex = nextInstanceIndex;

            // The first worker thread to be initialized will have this index,
            // and will increment it. The second worker to be initialized will
            // have this index plus 1.
            nextThreadIndex = instanceStartIndex;
            nextInstanceIndex += nWorkers;
        }

        queueMutex = new Mutex(this);
        waiterMutex = new Mutex();
        workerCondition = new Condition(queueMutex);
        waiterCondition = new Condition(waiterMutex);

        pool = new ParallelismThread[nWorkers];
        foreach (ref poolThread; pool)
        {
            poolThread = new ParallelismThread(&startWorkLoop);
            poolThread.pool = this;
            poolThread.start();
        }
    }

    /**
    Implements a parallel foreach loop over a range. This works by implicitly
    creating and submitting one $(D Task) to the $(D TaskPool) for each worker
    thread. A work unit is a set of consecutive elements of $(D range) to
    be processed by a worker thread between communication with any other
    thread. The number of elements processed per work unit is controlled by the
    $(D workUnitSize) parameter. Smaller work units provide better load
    balancing, but larger work units avoid the overhead of communicating
    with other threads frequently to fetch the next work unit. Large work
    units also avoid false sharing in cases where the range is being modified.
    The less time a single iteration of the loop takes, the larger
    $(D workUnitSize) should be. For very expensive loop bodies,
    $(D workUnitSize) should be 1. An overload that chooses a default work
    unit size is also available.

    Example:
    ---
    // Find the logarithm of every number from 1 to
    // 10_000_000 in parallel.
    auto logs = new double[10_000_000];

    // Parallel foreach works with or without an index
    // variable. It can iterate by ref if range.front
    // returns by ref.

    // Iterate over logs using work units of size 100.
    foreach (i, ref elem; taskPool.parallel(logs, 100))
    {
        elem = log(i + 1.0);
    }

    // Same thing, but use the default work unit size.
    //
    // Timings on an Athlon 64 X2 dual core machine:
    //
    // Parallel foreach: 388 milliseconds
    // Regular foreach: 619 milliseconds
    foreach (i, ref elem; taskPool.parallel(logs))
    {
        elem = log(i + 1.0);
    }
    ---

    Notes:

    The memory usage of this implementation is guaranteed to be constant
    in $(D range.length).

    Breaking from a parallel foreach loop via a break, labeled break,
    labeled continue, return or goto statement throws a
    $(D ParallelForeachError).

    In the case of non-random access ranges, parallel foreach buffers lazily
    to an array of size $(D workUnitSize) before executing the parallel portion
    of the loop. The exception is that, if a parallel foreach is executed
    over a range returned by $(D asyncBuf) or $(D map), the copying is elided
    and the buffers are simply swapped. In this case $(D workUnitSize) is
    ignored and the work unit size is set to the buffer size of $(D range).

    A memory barrier is guaranteed to be executed on exit from the loop,
    so that results produced by all threads are visible in the calling thread.

    $(B Exception Handling):

    When at least one exception is thrown from inside a parallel foreach loop,
    the submission of additional $(D Task) objects is terminated as soon as
    possible, in a non-deterministic manner. All executing or
    enqueued work units are allowed to complete. Then, all exceptions that
    were thrown by any work unit are chained using $(D Throwable.next) and
    rethrown. The order of the exception chaining is non-deterministic.
    */
    ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
    {
        import std.exception : enforce;
        enforce(workUnitSize > 0, "workUnitSize must be > 0.");
        alias RetType = ParallelForeach!R;
        return RetType(this, range, workUnitSize);
    }


    /// Ditto
    ParallelForeach!R parallel(R)(R range)
    {
        static if (hasLength!R)
        {
            // Default work unit size is such that we would use 4x as many
            // slots as are in this thread pool.
            size_t workUnitSize = defaultWorkUnitSize(range.length);
            return parallel(range, workUnitSize);
        }
        else
        {
            // Just use a really, really dumb guess if the user is too lazy to
            // specify.
            return parallel(range, 512);
        }
    }

    ///
    template amap(functions...)
    {
        /**
        Eager parallel map. The eagerness of this function means it has less
        overhead than the lazily evaluated $(D TaskPool.map) and should be
        preferred where the memory requirements of eagerness are acceptable.
        $(D functions) are the functions to be evaluated, passed as template
        alias parameters in a style similar to
        $(REF map, std,algorithm,iteration).
        The first argument must be a random access range. For performance
        reasons, amap will assume the range elements have not yet been
        initialized. Elements will be overwritten without calling a destructor
        nor doing an assignment. As such, the range must not contain meaningful
        data$(DDOC_COMMENT not a section): either un-initialized objects, or
        objects in their $(D .init) state.

        ---
        auto numbers = iota(100_000_000.0);

        // Find the square roots of numbers.
        //
        // Timings on an Athlon 64 X2 dual core machine:
        //
        // Parallel eager map: 0.802 s
        // Equivalent serial implementation: 1.768 s
        auto squareRoots = taskPool.amap!sqrt(numbers);
        ---

        Immediately after the range argument, an optional work unit size argument
        may be provided. Work units as used by $(D amap) are identical to those
        defined for parallel foreach. If no work unit size is provided, the
        default work unit size is used.

        ---
        // Same thing, but make work unit size 100.
        auto squareRoots = taskPool.amap!sqrt(numbers, 100);
        ---

        An output range for returning the results may be provided as the last
        argument. If one is not provided, an array of the proper type will be
        allocated on the garbage collected heap. If one is provided, it must be a
        random access range with assignable elements, must have reference
        semantics with respect to assignment to its elements, and must have the
        same length as the input range. Writing to adjacent elements from
        different threads must be safe.

        ---
        // Same thing, but explicitly allocate an array
        // to return the results in. The element type
        // of the array may be either the exact type
        // returned by functions or an implicit conversion
        // target.
        auto squareRoots = new float[numbers.length];
        taskPool.amap!sqrt(numbers, squareRoots);

        // Multiple functions, explicit output range, and
        // explicit work unit size.
        auto results = new Tuple!(float, real)[numbers.length];
        taskPool.amap!(sqrt, log)(numbers, 100, results);
        ---

        Note:

        A memory barrier is guaranteed to be executed after all results are written
        but before returning so that results produced by all threads are visible
        in the calling thread.

        Tips:

        To perform the mapping operation in place, provide the same range for the
        input and output range.

        To parallelize the copying of a range with expensive to evaluate elements
        to an array, pass an identity function (a function that just returns
        whatever argument is provided to it) to $(D amap).
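
        For example (an illustrative sketch; $(D expensiveRange) stands in
        for any random access range with costly-to-compute elements):

        ---
        // The string lambda "a" is the identity function.
        auto copied = taskPool.amap!"a"(expensiveRange);
        ---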
1639
1640 $(B Exception Handling):
1641
1642 When at least one exception is thrown from inside the map functions,
1643 the submission of additional $(D Task) objects is terminated as soon as
1644 possible, in a non-deterministic manner. All currently executing or
1645 enqueued work units are allowed to complete. Then, all exceptions that
1646 were thrown from any work unit are chained using $(D Throwable.next) and
1647 rethrown. The order of the exception chaining is non-deterministic.
1648 */
1649 auto amap(Args...)(Args args)
1650 if (isRandomAccessRange!(Args[0]))
1651 {
1652 import std.conv : emplaceRef;
1653
1654 alias fun = adjoin!(staticMap!(unaryFun, functions));
1655
1656 alias range = args[0];
1657 immutable len = range.length;
1658
1659 static if (
1660 Args.length > 1 &&
1661 randAssignable!(Args[$ - 1]) &&
1662 is(MapType!(Args[0], functions) : ElementType!(Args[$ - 1]))
1663 )
1664 {
1665 import std.conv : text;
1666 import std.exception : enforce;
1667
1668 alias buf = args[$ - 1];
1669 alias args2 = args[0..$ - 1];
1670 alias Args2 = Args[0..$ - 1];
1671 enforce(buf.length == len,
1672 text("Can't use a user supplied buffer that's the wrong ",
1673 "size. (Expected :", len, " Got: ", buf.length));
1674 }
1675 else static if (randAssignable!(Args[$ - 1]) && Args.length > 1)
1676 {
1677 static assert(0, "Wrong buffer type.");
1678 }
1679 else
1680 {
1681 import std.array : uninitializedArray;
1682
1683 auto buf = uninitializedArray!(MapType!(Args[0], functions)[])(len);
1684 alias args2 = args;
1685 alias Args2 = Args;
1686 }
1687
1688 if (!len) return buf;
1689
1690 static if (isIntegral!(Args2[$ - 1]))
1691 {
1692 static assert(args2.length == 2);
1693 auto workUnitSize = cast(size_t) args2[1];
1694 }
1695 else
1696 {
1697 static assert(args2.length == 1, Args);
1698 auto workUnitSize = defaultWorkUnitSize(range.length);
1699 }
1700
1701 alias R = typeof(range);
1702
1703 if (workUnitSize > len)
1704 {
1705 workUnitSize = len;
1706 }
1707
1708 // Handle as a special case:
1709 if (size == 0)
1710 {
1711 size_t index = 0;
1712 foreach (elem; range)
1713 {
1714 emplaceRef(buf[index++], fun(elem));
1715 }
1716 return buf;
1717 }
1718
1719 // Effectively -1: chunkIndex + 1 == 0:
1720 shared size_t workUnitIndex = size_t.max;
1721 shared bool shouldContinue = true;
1722
1723 void doIt()
1724 {
1725 import std.algorithm.comparison : min;
1726
1727 scope(failure)
1728 {
1729 // If an exception is thrown, all threads should bail.
1730 atomicStore(shouldContinue, false);
1731 }
1732
1733 while (atomicLoad(shouldContinue))
1734 {
1735 immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
1736 immutable start = workUnitSize * myUnitIndex;
1737 if (start >= len)
1738 {
1739 atomicStore(shouldContinue, false);
1740 break;
1741 }
1742
1743 immutable end = min(len, start + workUnitSize);
1744
1745 static if (hasSlicing!R)
1746 {
1747 auto subrange = range[start .. end];
1748 foreach (i; start .. end)
1749 {
1750 emplaceRef(buf[i], fun(subrange.front));
1751 subrange.popFront();
1752 }
1753 }
1754 else
1755 {
1756 foreach (i; start .. end)
1757 {
1758 emplaceRef(buf[i], fun(range[i]));
1759 }
1760 }
1761 }
1762 }
1763
1764 submitAndExecute(this, &doIt);
1765 return buf;
1766 }
1767 }
1768
1769 ///
map(functions...)1770 template map(functions...)
1771 {
1772 /**
1773 A semi-lazy parallel map that can be used for pipelining. The map
1774 functions are evaluated for the first $(D bufSize) elements and stored in a
1775 buffer and made available to $(D popFront). Meanwhile, in the
1776 background a second buffer of the same size is filled. When the first
1777 buffer is exhausted, it is swapped with the second buffer and filled while
1778 the values from what was originally the second buffer are read. This
1779 implementation allows for elements to be written to the buffer without
1780 the need for atomic operations or synchronization for each write, and
1781 enables the mapping function to be evaluated efficiently in parallel.
1782
1783 $(D map) has more overhead than the simpler procedure used by $(D amap)
1784 but avoids the need to keep all results in memory simultaneously and works
1785 with non-random access ranges.
1786
1787 Params:
1788
1789 source = The input range to be mapped. If $(D source) is not random
1790 access it will be lazily buffered to an array of size $(D bufSize) before
1791 the map function is evaluated. (For an exception to this rule, see Notes.)
1792
1793 bufSize = The size of the buffer to store the evaluated elements.
1794
1795 workUnitSize = The number of elements to evaluate in a single
1796 $(D Task). Must be less than or equal to $(D bufSize), and
1797 should be a fraction of $(D bufSize) such that all worker threads can be
1798 used. If the default of size_t.max is used, workUnitSize will be set to
1799 the pool-wide default.
1800
1801 Returns: An input range representing the results of the map. This range
1802 has a length iff $(D source) has a length.
1803
1804 Notes:
1805
1806 If a range returned by $(D map) or $(D asyncBuf) is used as an input to
1807 $(D map), then as an optimization the copying from the output buffer
1808 of the first range to the input buffer of the second range is elided, even
1809 though the ranges returned by $(D map) and $(D asyncBuf) are non-random
1810 access ranges. This means that the $(D bufSize) parameter passed to the
1811 current call to $(D map) will be ignored and the size of the buffer
1812 will be the buffer size of $(D source).
1813
1814 Example:
1815 ---
1816 // Pipeline reading a file, converting each line
1817 // to a number, taking the logarithms of the numbers,
1818 // and performing the additions necessary to find
1819 // the sum of the logarithms.
1820
1821 auto lineRange = File("numberList.txt").byLine();
1822 auto dupedLines = std.algorithm.map!"a.idup"(lineRange);
1823 auto nums = taskPool.map!(to!double)(dupedLines);
1824 auto logs = taskPool.map!log10(nums);
1825
1826 double sum = 0;
1827 foreach (elem; logs)
1828 {
1829 sum += elem;
1830 }
1831 ---
1832
1833 $(B Exception Handling):
1834
1835 Any exceptions thrown while iterating over $(D source)
1836 or computing the map function are re-thrown on a call to $(D popFront) or,
1837 if thrown during construction, are simply allowed to propagate to the
1838 caller. In the case of exceptions thrown while computing the map function,
1839 the exceptions are chained as in $(D TaskPool.amap).
1840 */
1841 auto
1842 map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max)
1843 if (isInputRange!S)
1844 {
1845 import std.exception : enforce;
1846
1847 enforce(workUnitSize == size_t.max || workUnitSize <= bufSize,
1848 "Work unit size must be smaller than buffer size.");
1849 alias fun = adjoin!(staticMap!(unaryFun, functions));
1850
1851 static final class Map
1852 {
1853 // This is a class because the task needs to be located on the
1854 // heap and in the non-random access case source needs to be on
1855 // the heap, too.
1856
1857 private:
1858 enum bufferTrick = is(typeof(source.buf1)) &&
1859 is(typeof(source.bufPos)) &&
1860 is(typeof(source.doBufSwap()));
1861
1862 alias E = MapType!(S, functions);
1863 E[] buf1, buf2;
1864 S source;
1865 TaskPool pool;
1866 Task!(run, E[] delegate(E[]), E[]) nextBufTask;
1867 size_t workUnitSize;
1868 size_t bufPos;
1869 bool lastTaskWaited;
1870
1871 static if (isRandomAccessRange!S)
1872 {
1873 alias FromType = S;
1874
1875 void popSource()
1876 {
1877 import std.algorithm.comparison : min;
1878
1879 static if (__traits(compiles, source[0 .. source.length]))
1880 {
1881 source = source[min(buf1.length, source.length)..source.length];
1882 }
1883 else static if (__traits(compiles, source[0..$]))
1884 {
1885 source = source[min(buf1.length, source.length)..$];
1886 }
1887 else
1888 {
1889 static assert(0, "S must have slicing for Map."
1890 ~ " " ~ S.stringof ~ " doesn't.");
1891 }
1892 }
1893 }
1894 else static if (bufferTrick)
1895 {
1896 // Make sure we don't have the buffer recycling overload of
1897 // asyncBuf.
1898 static if (
1899 is(typeof(source.source)) &&
1900 isRoundRobin!(typeof(source.source))
1901 )
1902 {
1903 static assert(0, "Cannot execute a parallel map on " ~
1904 "the buffer recycling overload of asyncBuf."
1905 );
1906 }
1907
1908 alias FromType = typeof(source.buf1);
1909 FromType from;
1910
1911 // Just swap our input buffer with source's output buffer.
1912 // No need to copy element by element.
1913 FromType dumpToFrom()
1914 {
1915 import std.algorithm.mutation : swap;
1916
1917 assert(source.buf1.length <= from.length);
1918 from.length = source.buf1.length;
1919 swap(source.buf1, from);
1920
1921 // Just in case this source has been popped before
1922 // being sent to map:
1923 from = from[source.bufPos..$];
1924
1925 static if (is(typeof(source._length)))
1926 {
1927 source._length -= (from.length - source.bufPos);
1928 }
1929
1930 source.doBufSwap();
1931
1932 return from;
1933 }
1934 }
1935 else
1936 {
1937 alias FromType = ElementType!S[];
1938
1939 // The temporary array that data is copied to before being
1940 // mapped.
1941 FromType from;
1942
1943 FromType dumpToFrom()
1944 {
1945 assert(from !is null);
1946
1947 size_t i;
1948 for (; !source.empty && i < from.length; source.popFront())
1949 {
1950 from[i++] = source.front;
1951 }
1952
1953 from = from[0 .. i];
1954 return from;
1955 }
1956 }
1957
1958 static if (hasLength!S)
1959 {
1960 size_t _length;
1961
1962 public @property size_t length() const @safe pure nothrow
1963 {
1964 return _length;
1965 }
1966 }
1967
1968 this(S source, size_t bufSize, size_t workUnitSize, TaskPool pool)
1969 {
1970 static if (bufferTrick)
1971 {
1972 bufSize = source.buf1.length;
1973 }
1974
1975 buf1.length = bufSize;
1976 buf2.length = bufSize;
1977
1978 static if (!isRandomAccessRange!S)
1979 {
1980 from.length = bufSize;
1981 }
1982
1983 this.workUnitSize = (workUnitSize == size_t.max) ?
1984 pool.defaultWorkUnitSize(bufSize) : workUnitSize;
1985 this.source = source;
1986 this.pool = pool;
1987
1988 static if (hasLength!S)
1989 {
1990 _length = source.length;
1991 }
1992
1993 buf1 = fillBuf(buf1);
1994 submitBuf2();
1995 }
1996
1997 // The from parameter is a dummy and ignored in the random access
1998 // case.
1999 E[] fillBuf(E[] buf)
2000 {
2001 import std.algorithm.comparison : min;
2002
2003 static if (isRandomAccessRange!S)
2004 {
2005 import std.range : take;
2006 auto toMap = take(source, buf.length);
2007 scope(success) popSource();
2008 }
2009 else
2010 {
2011 auto toMap = dumpToFrom();
2012 }
2013
2014 buf = buf[0 .. min(buf.length, toMap.length)];
2015
2016 // Handle as a special case:
2017 if (pool.size == 0)
2018 {
2019 size_t index = 0;
2020 foreach (elem; toMap)
2021 {
2022 buf[index++] = fun(elem);
2023 }
2024 return buf;
2025 }
2026
2027 pool.amap!functions(toMap, workUnitSize, buf);
2028
2029 return buf;
2030 }
2031
2032 void submitBuf2()
2033 in
2034 {
2035 assert(nextBufTask.prev is null);
2036 assert(nextBufTask.next is null);
2037 } body
2038 {
2039 // Hack to reuse the task object.
2040
2041 nextBufTask = typeof(nextBufTask).init;
2042 nextBufTask._args[0] = &fillBuf;
2043 nextBufTask._args[1] = buf2;
2044 pool.put(nextBufTask);
2045 }
2046
2047 void doBufSwap()
2048 {
2049 if (lastTaskWaited)
2050 {
2051 // Then the source is empty. Signal it here.
2052 buf1 = null;
2053 buf2 = null;
2054
2055 static if (!isRandomAccessRange!S)
2056 {
2057 from = null;
2058 }
2059
2060 return;
2061 }
2062
2063 buf2 = buf1;
2064 buf1 = nextBufTask.yieldForce;
2065 bufPos = 0;
2066
2067 if (source.empty)
2068 {
2069 lastTaskWaited = true;
2070 }
2071 else
2072 {
2073 submitBuf2();
2074 }
2075 }
2076
2077 public:
2078 @property auto front()
2079 {
2080 return buf1[bufPos];
2081 }
2082
2083 void popFront()
2084 {
2085 static if (hasLength!S)
2086 {
2087 _length--;
2088 }
2089
2090 bufPos++;
2091 if (bufPos >= buf1.length)
2092 {
2093 doBufSwap();
2094 }
2095 }
2096
2097 static if (isInfinite!S)
2098 {
2099 enum bool empty = false;
2100 }
2101 else
2102 {
2103
2104 bool empty() const @property
2105 {
2106 // popFront() sets this when source is empty
2107 return buf1.length == 0;
2108 }
2109 }
2110 }
2111 return new Map(source, bufSize, workUnitSize, this);
2112 }
2113 }
2114
2115 /**
2116 Given a $(D source) range that is expensive to iterate over, returns an
2117 input range that asynchronously buffers the contents of
2118 $(D source) into a buffer of $(D bufSize) elements in a worker thread,
2119 while making previously buffered elements from a second buffer, also of size
2120 $(D bufSize), available via the range interface of the returned
2121 object. The returned range has a length iff $(D hasLength!S).
2122 $(D asyncBuf) is useful, for example, when performing expensive operations
2123 on the elements of ranges that represent data on a disk or network.
2124
2125 Example:
2126 ---
2127     import std.algorithm, std.array, std.conv, std.stdio;
2128
2129 void main()
2130 {
2131 // Fetch lines of a file in a background thread
2132 // while processing previously fetched lines,
2133 // dealing with byLine's buffer recycling by
2134 // eagerly duplicating every line.
2135 auto lines = File("foo.txt").byLine();
2136 auto duped = std.algorithm.map!"a.idup"(lines);
2137
2138 // Fetch more lines in the background while we
2139 // process the lines already read into memory
2140 // into a matrix of doubles.
2141 double[][] matrix;
2142 auto asyncReader = taskPool.asyncBuf(duped);
2143
2144 foreach (line; asyncReader)
2145 {
2146 auto ls = line.split("\t");
2147 matrix ~= to!(double[])(ls);
2148 }
2149 }
2150 ---
2151
2152 $(B Exception Handling):
2153
2154 Any exceptions thrown while iterating over $(D source) are re-thrown on a
2155 call to $(D popFront) or, if thrown during construction, simply
2156 allowed to propagate to the caller.
2157 */
2158 auto asyncBuf(S)(S source, size_t bufSize = 100) if (isInputRange!S)
2159 {
2160 static final class AsyncBuf
2161 {
2162 // This is a class because the task and source both need to be on
2163 // the heap.
2164
2165 // The element type of S.
2166 alias E = ElementType!S; // Needs to be here b/c of forward ref bugs.
2167
2168 private:
2169 E[] buf1, buf2;
2170 S source;
2171 TaskPool pool;
2172 Task!(run, E[] delegate(E[]), E[]) nextBufTask;
2173 size_t bufPos;
2174 bool lastTaskWaited;
2175
2176 static if (hasLength!S)
2177 {
2178 size_t _length;
2179
2180 // Available if hasLength!S.
2181         public @property size_t length() const @safe pure nothrow
2182 {
2183 return _length;
2184 }
2185 }
2186
2187     this(S source, size_t bufSize, TaskPool pool)
2188 {
2189 buf1.length = bufSize;
2190 buf2.length = bufSize;
2191
2192 this.source = source;
2193 this.pool = pool;
2194
2195 static if (hasLength!S)
2196 {
2197 _length = source.length;
2198 }
2199
2200 buf1 = fillBuf(buf1);
2201 submitBuf2();
2202 }
2203
2204     E[] fillBuf(E[] buf)
2205 {
2206 assert(buf !is null);
2207
2208 size_t i;
2209 for (; !source.empty && i < buf.length; source.popFront())
2210 {
2211 buf[i++] = source.front;
2212 }
2213
2214 buf = buf[0 .. i];
2215 return buf;
2216 }
2217
2218     void submitBuf2()
2219 in
2220 {
2221 assert(nextBufTask.prev is null);
2222 assert(nextBufTask.next is null);
2223 } body
2224 {
2225 // Hack to reuse the task object.
2226
2227 nextBufTask = typeof(nextBufTask).init;
2228 nextBufTask._args[0] = &fillBuf;
2229 nextBufTask._args[1] = buf2;
2230 pool.put(nextBufTask);
2231 }
2232
2233     void doBufSwap()
2234 {
2235 if (lastTaskWaited)
2236 {
2237 // Then source is empty. Signal it here.
2238 buf1 = null;
2239 buf2 = null;
2240 return;
2241 }
2242
2243 buf2 = buf1;
2244 buf1 = nextBufTask.yieldForce;
2245 bufPos = 0;
2246
2247 if (source.empty)
2248 {
2249 lastTaskWaited = true;
2250 }
2251 else
2252 {
2253 submitBuf2();
2254 }
2255 }
2256
2257 public:
2258     E front() @property
2259 {
2260 return buf1[bufPos];
2261 }
2262
2263     void popFront()
2264 {
2265 static if (hasLength!S)
2266 {
2267 _length--;
2268 }
2269
2270 bufPos++;
2271 if (bufPos >= buf1.length)
2272 {
2273 doBufSwap();
2274 }
2275 }
2276
2277 static if (isInfinite!S)
2278 {
2279 enum bool empty = false;
2280 }
2281
2282 else
2283 {
2284 ///
2285         bool empty() @property
2286 {
2287 // popFront() sets this when source is empty:
2288 return buf1.length == 0;
2289 }
2290 }
2291 }
2292 return new AsyncBuf(source, bufSize, this);
2293 }
2294
2295 /**
2296 Given a callable object $(D next) that writes to a user-provided buffer and
2297 a second callable object $(D empty) that determines whether more data is
2298 available to write via $(D next), returns an input range that
2299     asynchronously calls $(D next) with a set of $(D nBuffers) buffers
2300 and makes the results available in the order they were obtained via the
2301 input range interface of the returned object. Similarly to the
2302 input range overload of $(D asyncBuf), the first half of the buffers
2303 are made available via the range interface while the second half are
2304 filled and vice-versa.
2305
2306 Params:
2307
2308 next = A callable object that takes a single argument that must be an array
2309 with mutable elements. When called, $(D next) writes data to
2310 the array provided by the caller.
2311
2312 empty = A callable object that takes no arguments and returns a type
2313 implicitly convertible to $(D bool). This is used to signify
2314 that no more data is available to be obtained by calling $(D next).
2315
2316 initialBufSize = The initial size of each buffer. If $(D next) takes its
2317 array by reference, it may resize the buffers.
2318
2319 nBuffers = The number of buffers to cycle through when calling $(D next).
2320
2321 Example:
2322 ---
2323 // Fetch lines of a file in a background
2324 // thread while processing previously fetched
2325 // lines, without duplicating any lines.
2326 auto file = File("foo.txt");
2327
2328 void next(ref char[] buf)
2329 {
2330 file.readln(buf);
2331 }
2332
2333 // Fetch more lines in the background while we
2334 // process the lines already read into memory
2335 // into a matrix of doubles.
2336 double[][] matrix;
2337 auto asyncReader = taskPool.asyncBuf(&next, &file.eof);
2338
2339 foreach (line; asyncReader)
2340 {
2341 auto ls = line.split("\t");
2342 matrix ~= to!(double[])(ls);
2343 }
2344 ---
2345
2346 $(B Exception Handling):
2347
2348     Any exceptions thrown by calls to $(D next) or $(D empty) are re-thrown on a
2349 call to $(D popFront).
2350
2351 Warning:
2352
2353 Using the range returned by this function in a parallel foreach loop
2354 will not work because buffers may be overwritten while the task that
2355 processes them is in queue. This is checked for at compile time
2356 and will result in a static assertion failure.
2357 */
2358 auto asyncBuf(C1, C2)(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100)
2359 if (is(typeof(C2.init()) : bool) &&
2360 Parameters!C1.length == 1 &&
2361 Parameters!C2.length == 0 &&
2362 isArray!(Parameters!C1[0])
2363 ) {
2364 auto roundRobin = RoundRobinBuffer!(C1, C2)(next, empty, initialBufSize, nBuffers);
2365 return asyncBuf(roundRobin, nBuffers / 2);
2366 }
2367
2368 ///
2369 template reduce(functions...)
2370 {
2371 /**
2372 Parallel reduce on a random access range. Except as otherwise noted,
2373 usage is similar to $(REF _reduce, std,algorithm,iteration). This
2374 function works by splitting the range to be reduced into work units,
2375 which are slices to be reduced in parallel. Once the results from all
2376 work units are computed, a final serial reduction is performed on these
2377 results to compute the final answer. Therefore, care must be taken to
2378 choose the seed value appropriately.
2379
2380 Because the reduction is being performed in parallel, $(D functions)
2381 must be associative. For notational simplicity, let # be an
2382 infix operator representing $(D functions). Then, (a # b) # c must equal
2383 a # (b # c). Floating point addition is not associative
2384 even though addition in exact arithmetic is. Summing floating
2385 point numbers using this function may give different results than summing
2386 serially. However, for many practical purposes floating point addition
2387 can be treated as associative.
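
    For example (an illustrative sketch), summing the same values serially
    and in parallel may produce answers that differ in the low-order bits,
    because the work unit boundaries change the order in which terms are
    combined:
    ---
    auto terms = std.algorithm.map!"1.0 / (a * a)"(iota(1, 1_000_000));
    auto serialSum = std.algorithm.reduce!"a + b"(terms);
    auto parallelSum = taskPool.reduce!"a + b"(terms);
    // The two sums are approximately, but not necessarily
    // exactly, equal.
    ---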
2388
2389 Note that, since $(D functions) are assumed to be associative,
2390 additional optimizations are made to the serial portion of the reduction
2391 algorithm. These take advantage of the instruction level parallelism of
2392 modern CPUs, in addition to the thread-level parallelism that the rest
2393 of this module exploits. This can lead to better than linear speedups
2394 relative to $(REF _reduce, std,algorithm,iteration), especially for
2395 fine-grained benchmarks like dot products.
2396
2397 An explicit seed may be provided as the first argument. If
2398 provided, it is used as the seed for all work units and for the final
2399 reduction of results from all work units. Therefore, if it is not the
2400 identity value for the operation being performed, results may differ
2401     from those generated by $(REF _reduce, std,algorithm,iteration), and
2402     may vary depending on how many work units are used. The next argument must be
2403 the range to be reduced.
2404 ---
2405 // Find the sum of squares of a range in parallel, using
2406 // an explicit seed.
2407 //
2408 // Timings on an Athlon 64 X2 dual core machine:
2409 //
2410 // Parallel reduce: 72 milliseconds
2411 // Using std.algorithm.reduce instead: 181 milliseconds
2412 auto nums = iota(10_000_000.0f);
2413 auto sumSquares = taskPool.reduce!"a + b"(
2414 0.0, std.algorithm.map!"a * a"(nums)
2415 );
2416 ---
2417
2418 If no explicit seed is provided, the first element of each work unit
2419 is used as a seed. For the final reduction, the result from the first
2420 work unit is used as the seed.
2421 ---
2422 // Find the sum of a range in parallel, using the first
2423 // element of each work unit as the seed.
2424 auto sum = taskPool.reduce!"a + b"(nums);
2425 ---
2426
2427 An explicit work unit size may be specified as the last argument.
2428 Specifying too small a work unit size will effectively serialize the
2429 reduction, as the final reduction of the result of each work unit will
2430 dominate computation time. If $(D TaskPool.size) for this instance
2431 is zero, this parameter is ignored and one work unit is used.
2432 ---
2433 // Use a work unit size of 100.
2434 auto sum2 = taskPool.reduce!"a + b"(nums, 100);
2435
2436 // Work unit size of 100 and explicit seed.
2437 auto sum3 = taskPool.reduce!"a + b"(0.0, nums, 100);
2438 ---
2439
2440 Parallel reduce supports multiple functions, like
2441 $(D std.algorithm.reduce).
2442 ---
2443 // Find both the min and max of nums.
2444 auto minMax = taskPool.reduce!(min, max)(nums);
2445 assert(minMax[0] == reduce!min(nums));
2446 assert(minMax[1] == reduce!max(nums));
2447 ---
2448
2449 $(B Exception Handling):
2450
2451 After this function is finished executing, any exceptions thrown
2452 are chained together via $(D Throwable.next) and rethrown. The chaining
2453 order is non-deterministic.
2454 */
2455 auto reduce(Args...)(Args args)
2456 {
2457 import core.exception : OutOfMemoryError;
2458 import std.conv : emplaceRef;
2459 import std.exception : enforce;
2460
2461 alias fun = reduceAdjoin!functions;
2462 alias finishFun = reduceFinish!functions;
2463
2464 static if (isIntegral!(Args[$ - 1]))
2465 {
2466 size_t workUnitSize = cast(size_t) args[$ - 1];
2467 alias args2 = args[0..$ - 1];
2468 alias Args2 = Args[0..$ - 1];
2469 }
2470 else
2471 {
2472 alias args2 = args;
2473 alias Args2 = Args;
2474 }
2475
2476 auto makeStartValue(Type)(Type e)
2477 {
2478 static if (functions.length == 1)
2479 {
2480 return e;
2481 }
2482 else
2483 {
2484 typeof(adjoin!(staticMap!(binaryFun, functions))(e, e)) seed = void;
2485 foreach (i, T; seed.Types)
2486 {
2487 emplaceRef(seed.expand[i], e);
2488 }
2489
2490 return seed;
2491 }
2492 }
2493
2494 static if (args2.length == 2)
2495 {
2496 static assert(isInputRange!(Args2[1]));
2497 alias range = args2[1];
2498 alias seed = args2[0];
2499 enum explicitSeed = true;
2500
2501 static if (!is(typeof(workUnitSize)))
2502 {
2503 size_t workUnitSize = defaultWorkUnitSize(range.length);
2504 }
2505 }
2506 else
2507 {
2508 static assert(args2.length == 1);
2509 alias range = args2[0];
2510
2511 static if (!is(typeof(workUnitSize)))
2512 {
2513 size_t workUnitSize = defaultWorkUnitSize(range.length);
2514 }
2515
2516 enforce(!range.empty,
2517 "Cannot reduce an empty range with first element as start value.");
2518
2519 auto seed = makeStartValue(range.front);
2520 enum explicitSeed = false;
2521 range.popFront();
2522 }
2523
2524 alias E = typeof(seed);
2525 alias R = typeof(range);
2526
2527 E reduceOnRange(R range, size_t lowerBound, size_t upperBound)
2528 {
2529 // This is for exploiting instruction level parallelism by
2530 // using multiple accumulator variables within each thread,
2531 // since we're assuming functions are associative anyhow.
2532
2533 // This is so that loops can be unrolled automatically.
2534 enum ilpTuple = AliasSeq!(0, 1, 2, 3, 4, 5);
2535 enum nILP = ilpTuple.length;
2536 immutable subSize = (upperBound - lowerBound) / nILP;
2537
2538 if (subSize <= 1)
2539 {
2540 // Handle as a special case.
2541 static if (explicitSeed)
2542 {
2543 E result = seed;
2544 }
2545 else
2546 {
2547 E result = makeStartValue(range[lowerBound]);
2548 lowerBound++;
2549 }
2550
2551 foreach (i; lowerBound .. upperBound)
2552 {
2553 result = fun(result, range[i]);
2554 }
2555
2556 return result;
2557 }
2558
2559 assert(subSize > 1);
2560 E[nILP] results;
2561 size_t[nILP] offsets;
2562
2563 foreach (i; ilpTuple)
2564 {
2565 offsets[i] = lowerBound + subSize * i;
2566
2567 static if (explicitSeed)
2568 {
2569 results[i] = seed;
2570 }
2571 else
2572 {
2573 results[i] = makeStartValue(range[offsets[i]]);
2574 offsets[i]++;
2575 }
2576 }
2577
2578 immutable nLoop = subSize - (!explicitSeed);
2579 foreach (i; 0 .. nLoop)
2580 {
2581 foreach (j; ilpTuple)
2582 {
2583 results[j] = fun(results[j], range[offsets[j]]);
2584 offsets[j]++;
2585 }
2586 }
2587
2588 // Finish the remainder.
2589 foreach (i; nILP * subSize + lowerBound .. upperBound)
2590 {
2591 results[$ - 1] = fun(results[$ - 1], range[i]);
2592 }
2593
2594 foreach (i; ilpTuple[1..$])
2595 {
2596 results[0] = finishFun(results[0], results[i]);
2597 }
2598
2599 return results[0];
2600 }
2601
2602 immutable len = range.length;
2603 if (len == 0)
2604 {
2605 return seed;
2606 }
2607
2608 if (this.size == 0)
2609 {
2610 return finishFun(seed, reduceOnRange(range, 0, len));
2611 }
2612
2613 // Unlike the rest of the functions here, I can't use the Task object
2614 // recycling trick here because this has to work on non-commutative
2615 // operations. After all the tasks are done executing, fun() has to
2616 // be applied on the results of these to get a final result, but
2617 // it can't be evaluated out of order.
2618
2619 if (workUnitSize > len)
2620 {
2621 workUnitSize = len;
2622 }
2623
2624 immutable size_t nWorkUnits = (len / workUnitSize) + ((len % workUnitSize == 0) ? 0 : 1);
2625 assert(nWorkUnits * workUnitSize >= len);
2626
2627 alias RTask = Task!(run, typeof(&reduceOnRange), R, size_t, size_t);
2628 RTask[] tasks;
2629
2630 // Can't use alloca() due to Bug 3753. Use a fixed buffer
2631 // backed by malloc().
2632 enum maxStack = 2_048;
2633 byte[maxStack] buf = void;
2634 immutable size_t nBytesNeeded = nWorkUnits * RTask.sizeof;
2635
2636 import core.stdc.stdlib : malloc, free;
2637 if (nBytesNeeded < maxStack)
2638 {
2639 tasks = (cast(RTask*) buf.ptr)[0 .. nWorkUnits];
2640 }
2641 else
2642 {
2643 auto ptr = cast(RTask*) malloc(nBytesNeeded);
2644 if (!ptr)
2645 {
2646 throw new OutOfMemoryError(
2647 "Out of memory in std.parallelism."
2648 );
2649 }
2650
2651 tasks = ptr[0 .. nWorkUnits];
2652 }
2653
2654 scope(exit)
2655 {
2656             if (nBytesNeeded >= maxStack)
2657 {
2658 free(tasks.ptr);
2659 }
2660 }
2661
2662 foreach (ref t; tasks[])
2663 emplaceRef(t, RTask());
2664
2665 // Hack to take the address of a nested function w/o
2666 // making a closure.
2667 static auto scopedAddress(D)(scope D del) @system
2668 {
2669 auto tmp = del;
2670 return tmp;
2671 }
2672
2673 size_t curPos = 0;
2674 void useTask(ref RTask task)
2675 {
2676 import std.algorithm.comparison : min;
2677
2678 task.pool = this;
2679 task._args[0] = scopedAddress(&reduceOnRange);
2680 task._args[3] = min(len, curPos + workUnitSize); // upper bound.
2681 task._args[1] = range; // range
2682 task._args[2] = curPos; // lower bound.
2683
2684 curPos += workUnitSize;
2685 }
2686
2687 foreach (ref task; tasks)
2688 {
2689 useTask(task);
2690 }
2691
2692 foreach (i; 1 .. tasks.length - 1)
2693 {
2694 tasks[i].next = tasks[i + 1].basePtr;
2695 tasks[i + 1].prev = tasks[i].basePtr;
2696 }
2697
2698 if (tasks.length > 1)
2699 {
2700 queueLock();
2701 scope(exit) queueUnlock();
2702
2703 abstractPutGroupNoSync(
2704 tasks[1].basePtr,
2705 tasks[$ - 1].basePtr
2706 );
2707 }
2708
2709 if (tasks.length > 0)
2710 {
2711 try
2712 {
2713 tasks[0].job();
2714 }
2715 catch (Throwable e)
2716 {
2717 tasks[0].exception = e;
2718 }
2719 tasks[0].taskStatus = TaskStatus.done;
2720
2721 // Try to execute each of these in the current thread
2722 foreach (ref task; tasks[1..$])
2723 {
2724 tryDeleteExecute(task.basePtr);
2725 }
2726 }
2727
2728 // Now that we've tried to execute every task, they're all either
2729 // done or in progress. Force all of them.
2730 E result = seed;
2731
2732 Throwable firstException, lastException;
2733
2734 foreach (ref task; tasks)
2735 {
2736 try
2737 {
2738 task.yieldForce;
2739 }
2740 catch (Throwable e)
2741 {
2742 addToChain(e, firstException, lastException);
2743 continue;
2744 }
2745
2746 if (!firstException) result = finishFun(result, task.returnVal);
2747 }
2748
2749 if (firstException) throw firstException;
2750
2751 return result;
2752 }
2753 }
2754
2755 /**
2756 Gets the index of the current thread relative to this $(D TaskPool). Any
2757 thread not in this pool will receive an index of 0. The worker threads in
2758 this pool receive unique indices of 1 through $(D this.size).
2759
2760 This function is useful for maintaining worker-local resources.
2761
2762 Example:
2763 ---
2764 // Execute a loop that computes the greatest common
2765 // divisor of every number from 0 through 999 with
2766 // 42 in parallel. Write the results out to
2767 // a set of files, one for each thread. This allows
2768 // results to be written out without any synchronization.
2769
2770 import std.conv, std.range, std.numeric, std.stdio;
2771
2772 void main()
2773 {
2774     auto fileHandles = new File[taskPool.size + 1];
2775 scope(exit) {
2776 foreach (ref handle; fileHandles)
2777 {
2778 handle.close();
2779 }
2780 }
2781
2782 foreach (i, ref handle; fileHandles)
2783 {
2784         handle = File("workerResults" ~ to!string(i) ~ ".txt", "w");
2785 }
2786
2787 foreach (num; parallel(iota(1_000)))
2788 {
2789 auto outHandle = fileHandles[taskPool.workerIndex];
2790 outHandle.writeln(num, '\t', gcd(num, 42));
2791 }
2792 }
2793 ---
2794 */
2795     size_t workerIndex() @property @safe const nothrow
2796 {
2797 immutable rawInd = threadIndex;
2798 return (rawInd >= instanceStartIndex && rawInd < instanceStartIndex + size) ?
2799 (rawInd - instanceStartIndex + 1) : 0;
2800 }
2801
2802 /**
2803 Struct for creating worker-local storage. Worker-local storage is
2804 thread-local storage that exists only for worker threads in a given
2805 $(D TaskPool) plus a single thread outside the pool. It is allocated on the
2806 garbage collected heap in a way that avoids _false sharing, and doesn't
2807 necessarily have global scope within any thread. It can be accessed from
2808 any worker thread in the $(D TaskPool) that created it, and one thread
2809 outside this $(D TaskPool). All threads outside the pool that created a
2810 given instance of worker-local storage share a single slot.
2811
2812 Since the underlying data for this struct is heap-allocated, this struct
2813 has reference semantics when passed between functions.
2814
2815     The main use cases for $(D WorkerLocalStorage) are:
2816
2817 1. Performing parallel reductions with an imperative, as opposed to
2818 functional, programming style. In this case, it's useful to treat
2819     $(D WorkerLocalStorage) as local to each thread for only the parallel
2820 portion of an algorithm.
2821
2822 2. Recycling temporary buffers across iterations of a parallel foreach loop.
2823
2824 Example:
2825 ---
2826 // Calculate pi as in our synopsis example, but
2827 // use an imperative instead of a functional style.
2828 immutable n = 1_000_000_000;
2829 immutable delta = 1.0L / n;
2830
2831 auto sums = taskPool.workerLocalStorage(0.0L);
2832 foreach (i; parallel(iota(n)))
2833 {
2834 immutable x = ( i - 0.5L ) * delta;
2835 immutable toAdd = delta / ( 1.0 + x * x );
2836 sums.get += toAdd;
2837 }
2838
2839 // Add up the results from each worker thread.
2840 real pi = 0;
2841 foreach (threadResult; sums.toRange)
2842 {
2843 pi += 4.0L * threadResult;
2844 }
2845 ---
2846 */
2847     static struct WorkerLocalStorage(T)
2848 {
2849 private:
2850 TaskPool pool;
2851 size_t size;
2852
2853 size_t elemSize;
2854 bool* stillThreadLocal;
2855
2856 static size_t roundToLine(size_t num) pure nothrow
2857 {
2858 if (num % cacheLineSize == 0)
2859 {
2860 return num;
2861 }
2862 else
2863 {
2864 return ((num / cacheLineSize) + 1) * cacheLineSize;
2865 }
2866 }
2867
2868 void* data;
2869
2870 void initialize(TaskPool pool)
2871 {
2872 this.pool = pool;
2873 size = pool.size + 1;
2874 stillThreadLocal = new bool;
2875 *stillThreadLocal = true;
2876
2877 // Determines whether the GC should scan the array.
2878 auto blkInfo = (typeid(T).flags & 1) ?
2879 cast(GC.BlkAttr) 0 :
2880 GC.BlkAttr.NO_SCAN;
2881
2882 immutable nElem = pool.size + 1;
2883 elemSize = roundToLine(T.sizeof);
2884
2885 // The + 3 is to pad one full cache line worth of space on either side
2886 // of the data structure to make sure false sharing with completely
2887 // unrelated heap data is prevented, and to provide enough padding to
2888 // make sure that data is cache line-aligned.
2889 data = GC.malloc(elemSize * (nElem + 3), blkInfo) + elemSize;
2890
2891 // Cache line align data ptr.
2892 data = cast(void*) roundToLine(cast(size_t) data);
2893
2894 foreach (i; 0 .. nElem)
2895 {
2896 this.opIndex(i) = T.init;
2897 }
2898 }
2899
2900 ref opIndex(this Qualified)(size_t index)
2901 {
2902 import std.conv : text;
2903             assert(index < size, text(index, '\t', size));
2904 return *(cast(CopyTypeQualifiers!(Qualified, T)*) (data + elemSize * index));
2905 }
2906
2907 void opIndexAssign(T val, size_t index)
2908 {
2909 assert(index < size);
2910 *(cast(T*) (data + elemSize * index)) = val;
2911 }
2912
2913 public:
2914 /**
2915 Get the current thread's instance. Returns by ref.
2916 Note that calling $(D get) from any thread
2917 outside the $(D TaskPool) that created this instance will return the
2918 same reference, so an instance of worker-local storage should only be
2919 accessed from one thread outside the pool that created it. If this
2920 rule is violated, undefined behavior will result.
2921
2922 If assertions are enabled and $(D toRange) has been called, then this
2923 WorkerLocalStorage instance is no longer worker-local and an assertion
2924 failure will result when calling this method. This is not checked
2925 when assertions are disabled for performance reasons.
2926 */
2927 ref get(this Qualified)() @property
2928 {
2929 assert(*stillThreadLocal,
2930 "Cannot call get() on this instance of WorkerLocalStorage " ~
2931 "because it is no longer worker-local."
2932 );
2933 return opIndex(pool.workerIndex);
2934 }
2935
2936 /**
2937 Assign a value to the current thread's instance. This function has
2938 the same caveats as its overload.
2939 */
2940 void get(T val) @property
2941 {
2942 assert(*stillThreadLocal,
2943 "Cannot call get() on this instance of WorkerLocalStorage " ~
2944 "because it is no longer worker-local."
2945 );
2946
2947 opIndexAssign(val, pool.workerIndex);
2948 }
2949
2950 /**
2951 Returns a range view of the values for all threads, which can be used
2952 to further process the results of each thread after running the parallel
2953 part of your algorithm. Do not use this method in the parallel portion
2954 of your algorithm.
2955
2956 Calling this function sets a flag indicating that this struct is no
2957 longer worker-local, and attempting to use the $(D get) method again
2958 will result in an assertion failure if assertions are enabled.
2959 */
2960 WorkerLocalStorageRange!T toRange() @property
2961 {
2962 if (*stillThreadLocal)
2963 {
2964 *stillThreadLocal = false;
2965
2966 // Make absolutely sure results are visible to all threads.
2967 // This is probably not necessary since some other
2968 // synchronization primitive will be used to signal that the
2969 // parallel part of the algorithm is done, but the
2970 // performance impact should be negligible, so it's better
2971 // to be safe.
2972 ubyte barrierDummy;
2973 atomicSetUbyte(barrierDummy, 1);
2974 }
2975
2976 return WorkerLocalStorageRange!T(this);
2977 }
2978 }
2979
2980 /**
2981 Range primitives for worker-local storage. The purpose of this is to
2982 access results produced by each worker thread from a single thread once you
2983 are no longer using the worker-local storage from multiple threads.
2984 Do not use this struct in the parallel portion of your algorithm.
2985
2986 The proper way to instantiate this object is to call
2987 $(D WorkerLocalStorage.toRange). Once instantiated, this object behaves
2988 as a finite random-access range with assignable, lvalue elements and
2989 a length equal to the number of worker threads in the $(D TaskPool) that
2990 created it plus 1.
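
    Example (a minimal sketch):
    ---
    // Sum the per-thread partial results after the parallel
    // part of an algorithm has completed.
    auto partialSums = taskPool.workerLocalStorage(0.0);

    // ... a parallel foreach loop that accumulates into
    // partialSums.get ...

    double total = 0;
    foreach (partialSum; partialSums.toRange)
    {
        total += partialSum;
    }
    ---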
2991 */
2992     static struct WorkerLocalStorageRange(T)
2993 {
2994 private:
2995 WorkerLocalStorage!T workerLocalStorage;
2996
2997 size_t _length;
2998 size_t beginOffset;
2999
3000 this(WorkerLocalStorage!T wl)
3001 {
3002 this.workerLocalStorage = wl;
3003 _length = wl.size;
3004 }
3005
3006 public:
3007 ref front(this Qualified)() @property
3008 {
3009 return this[0];
3010 }
3011
3012 ref back(this Qualified)() @property
3013 {
3014 return this[_length - 1];
3015 }
3016
3017 void popFront()
3018 {
3019 if (_length > 0)
3020 {
3021 beginOffset++;
3022 _length--;
3023 }
3024 }
3025
3026 void popBack()
3027 {
3028 if (_length > 0)
3029 {
3030 _length--;
3031 }
3032 }
3033
3034 typeof(this) save() @property
3035 {
3036 return this;
3037 }
3038
3039 ref opIndex(this Qualified)(size_t index)
3040 {
3041 assert(index < _length);
3042 return workerLocalStorage[index + beginOffset];
3043 }
3044
3045 void opIndexAssign(T val, size_t index)
3046 {
3047 assert(index < _length);
3048         workerLocalStorage[index + beginOffset] = val;
3049 }
3050
3051 typeof(this) opSlice(size_t lower, size_t upper)
3052 {
3053 assert(upper <= _length);
3054 auto newWl = this.workerLocalStorage;
3055 newWl.data += lower * newWl.elemSize;
3056 newWl.size = upper - lower;
3057 return typeof(this)(newWl);
3058 }
3059
3060 bool empty() const @property
3061 {
3062 return length == 0;
3063 }
3064
3065 size_t length() const @property
3066 {
3067 return _length;
3068 }
3069 }
3070
3071 /**
3072 Creates an instance of worker-local storage, initialized with a given
3073 value. The value is $(D lazy) so that you can, for example, easily
3074 create one instance of a class for each worker. For usage example,
3075 see the $(D WorkerLocalStorage) struct.
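
    For example, the following sketch relies on the laziness of
    $(D initialVal) to give each worker slot its own object rather than a
    shared reference to a single one. ($(D Scratch) is a hypothetical
    example class.)
    ---
    class Scratch
    {
        int[] buf;
    }

    // new Scratch is evaluated once per worker slot, so every
    // worker gets a distinct instance.
    auto scratch = taskPool.workerLocalStorage(new Scratch);
    ---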
3076 */
3077 WorkerLocalStorage!T workerLocalStorage(T)(lazy T initialVal = T.init)
3078 {
3079 WorkerLocalStorage!T ret;
3080 ret.initialize(this);
3081 foreach (i; 0 .. size + 1)
3082 {
3083 ret[i] = initialVal;
3084 }
3085
3086 // Memory barrier to make absolutely sure that what we wrote is
3087 // visible to worker threads.
3088 ubyte barrierDummy;
3089 atomicSetUbyte(barrierDummy, 0);
3090
3091 return ret;
3092 }
3093
3094 /**
3095 Signals to all worker threads to terminate as soon as they are finished
3096 with their current $(D Task), or immediately if they are not executing a
3097 $(D Task). $(D Task)s that were in queue will not be executed unless
3098 a call to $(D Task.workForce), $(D Task.yieldForce) or $(D Task.spinForce)
3099 causes them to be executed.
3100
3101 Use only if you have waited on every $(D Task) and therefore know the
3102 queue is empty, or if you speculatively executed some tasks and no longer
3103 need the results.
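
    Example (an illustrative sketch of the speculative case):
    ---
    import std.file;

    // Speculatively start reading a file.
    auto pool = new TaskPool();
    auto t = task!read("foo.txt");
    pool.put(t);

    // Later, we decide the result is not needed after all, so
    // shut the pool down without draining its queue.
    pool.stop();
    ---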
3104 */
3105     void stop() @trusted
3106 {
3107 queueLock();
3108 scope(exit) queueUnlock();
3109 atomicSetUbyte(status, PoolState.stopNow);
3110 notifyAll();
3111 }
3112
3113 /**
3114 Signals worker threads to terminate when the queue becomes empty.
3115
3116 If blocking argument is true, wait for all worker threads to terminate
3117 before returning. This option might be used in applications where
3118     task results are never consumed, e.g. when $(D TaskPool) is employed as a
3119 rudimentary scheduler for tasks which communicate by means other than
3120 return values.
3121
3122 Warning: Calling this function with $(D blocking = true) from a worker
3123 thread that is a member of the same $(D TaskPool) that
3124 $(D finish) is being called on will result in a deadlock.
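
    Example (an illustrative sketch):
    ---
    import std.stdio;

    auto pool = new TaskPool();
    foreach (i; 0 .. 10)
    {
        // Fire-and-forget tasks whose return values are
        // never consumed.
        pool.put(task!writeln(i));
    }

    // No more tasks will be submitted. Let the queue drain,
    // then wait for the worker threads to terminate.
    pool.finish(true);
    ---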
3125 */
3126 void finish(bool blocking = false) @trusted
3127 {
3128 {
3129 queueLock();
3130 scope(exit) queueUnlock();
3131 atomicCasUbyte(status, PoolState.running, PoolState.finishing);
3132 notifyAll();
3133 }
3134 if (blocking)
3135 {
3136 // Use this thread as a worker until everything is finished.
3137 executeWorkLoop();
3138
3139             foreach (t; pool)
3140 {
3141 // Maybe there should be something here to prevent a thread
3142 // from calling join() on itself if this function is called
3143 // from a worker thread in the same pool, but:
3144 //
3145 // 1. Using an if statement to skip join() would result in
3146 // finish() returning without all tasks being finished.
3147 //
3148 // 2. If an exception were thrown, it would bubble up to the
3149 // Task from which finish() was called and likely be
3150 // swallowed.
3151 t.join();
3152 }
3153 }
3154 }
3155
3156 /// Returns the number of worker threads in the pool.
3157     @property size_t size() @safe const pure nothrow
3158 {
3159 return pool.length;
3160 }
3161
3162 /**
3163 Put a $(D Task) object on the back of the task queue. The $(D Task)
3164 object may be passed by pointer or reference.
3165
3166 Example:
3167 ---
3168 import std.file;
3169
3170 // Create a task.
3171 auto t = task!read("foo.txt");
3172
3173 // Add it to the queue to be executed.
3174 taskPool.put(t);
3175 ---
3176
3177 Notes:
3178
3179 @trusted overloads of this function are called for $(D Task)s if
3180 $(REF hasUnsharedAliasing, std,traits) is false for the $(D Task)'s
3181 return type or the function the $(D Task) executes is $(D pure).
3182 $(D Task) objects that meet all other requirements specified in the
3183 $(D @trusted) overloads of $(D task) and $(D scopedTask) may be created
3184 and executed from $(D @safe) code via $(D Task.executeInNewThread) but
3185 not via $(D TaskPool).
3186
3187 While this function takes the address of variables that may
3188 be on the stack, some overloads are marked as @trusted.
3189 $(D Task) includes a destructor that waits for the task to complete
3190 before destroying the stack frame it is allocated on. Therefore,
3191 it is impossible for the stack frame to be destroyed before the task is
3192 complete and no longer referenced by a $(D TaskPool).
3193 */
3194 void put(alias fun, Args...)(ref Task!(fun, Args) task)
3195 if (!isSafeReturn!(typeof(task)))
3196 {
3197 task.pool = this;
3198 abstractPut(task.basePtr);
3199 }
3200
3201 /// Ditto
3202 void put(alias fun, Args...)(Task!(fun, Args)* task)
3203 if (!isSafeReturn!(typeof(*task)))
3204 {
3205 import std.exception : enforce;
3206 enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
3207 put(*task);
3208 }
3209
3210 @trusted void put(alias fun, Args...)(ref Task!(fun, Args) task)
3211 if (isSafeReturn!(typeof(task)))
3212 {
3213 task.pool = this;
3214 abstractPut(task.basePtr);
3215 }
3216
3217 @trusted void put(alias fun, Args...)(Task!(fun, Args)* task)
3218 if (isSafeReturn!(typeof(*task)))
3219 {
3220 import std.exception : enforce;
3221 enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
3222 put(*task);
3223 }
3224
3225 /**
3226 These properties control whether the worker threads are daemon threads.
3227 A daemon thread is automatically terminated when all non-daemon threads
3228 have terminated. A non-daemon thread will prevent a program from
3229 terminating as long as it has not terminated.
3230
3231 If any $(D TaskPool) with non-daemon threads is active, either $(D stop)
3232 or $(D finish) must be called on it before the program can terminate.
3233
3234     The worker threads in the $(D TaskPool) instance returned by the
3235 $(D taskPool) property are daemon by default. The worker threads of
3236 manually instantiated task pools are non-daemon by default.
3237
3238 Note: For a size zero pool, the getter arbitrarily returns true and the
3239 setter has no effect.
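
    Example (a minimal sketch):
    ---
    // The worker threads of a manually instantiated pool are
    // non-daemon by default and would keep the program alive
    // until stop() or finish() is called. Make them daemon
    // threads so they cannot block program termination.
    auto pool = new TaskPool();
    pool.isDaemon = true;
    ---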
3240 */
3241     bool isDaemon() @property @trusted
3242 {
3243 queueLock();
3244 scope(exit) queueUnlock();
3245 return (size == 0) ? true : pool[0].isDaemon;
3246 }
3247
3248 /// Ditto
3249     void isDaemon(bool newVal) @property @trusted
3250 {
3251 queueLock();
3252 scope(exit) queueUnlock();
3253 foreach (thread; pool)
3254 {
3255 thread.isDaemon = newVal;
3256 }
3257 }
3258
3259 /**
3260 These functions allow getting and setting the OS scheduling priority of
3261 the worker threads in this $(D TaskPool). They forward to
3262 $(D core.thread.Thread.priority), so a given priority value here means the
3263 same thing as an identical priority value in $(D core.thread).
3264
3265 Note: For a size zero pool, the getter arbitrarily returns
3266 $(D core.thread.Thread.PRIORITY_MIN) and the setter has no effect.
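
    Example (an illustrative sketch):
    ---
    // Run the workers at minimum priority while doing
    // low-importance background work, then restore the
    // previous priority.
    immutable oldPriority = taskPool.priority;
    taskPool.priority = Thread.PRIORITY_MIN;
    scope(exit) taskPool.priority = oldPriority;
    ---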
3267 */
3268     int priority() @property @trusted
3269 {
3270 return (size == 0) ? core.thread.Thread.PRIORITY_MIN :
3271 pool[0].priority;
3272 }
3273
3274 /// Ditto
3275     void priority(int newPriority) @property @trusted
3276 {
3277 if (size > 0)
3278 {
3279 foreach (t; pool)
3280 {
3281 t.priority = newPriority;
3282 }
3283 }
3284 }
3285 }
3286
3287 /**
3288 Returns a lazily initialized global instantiation of $(D TaskPool).
3289 This function can safely be called concurrently from multiple non-worker
3290 threads. The worker threads in this pool are daemon threads, meaning that it
3291 is not necessary to call $(D TaskPool.stop) or $(D TaskPool.finish) before
3292 terminating the main thread.
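
Example (a minimal sketch):
---
// The global pool needs no setup or teardown; its daemon
// worker threads will not keep the program alive after the
// main thread terminates.
auto squares = taskPool.amap!"a * a"([1, 2, 3, 4]);
assert(squares == [1, 4, 9, 16]);
---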
3293 */
3294 @property TaskPool taskPool() @trusted
3295 {
3296 import std.concurrency : initOnce;
3297 __gshared TaskPool pool;
3298 return initOnce!pool({
3299 auto p = new TaskPool(defaultPoolThreads);
3300 p.isDaemon = true;
3301 return p;
3302 }());
3303 }
3304
3305 private shared uint _defaultPoolThreads;
3306 shared static this()
3307 {
3308 atomicStore(_defaultPoolThreads, totalCPUs - 1);
3309 }
3310
3311 /**
3312 These properties get and set the number of worker threads in the $(D TaskPool)
3313 instance returned by $(D taskPool). The default value is $(D totalCPUs) - 1.
3314 Calling the setter after the first call to $(D taskPool) does not change
3315 the number of worker threads in the instance returned by $(D taskPool).
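
Example (a minimal sketch; assumes $(D taskPool) has not yet been accessed
anywhere in the program):
---
defaultPoolThreads = 4;
assert(taskPool.size == 4);
---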
3316 */
3317 @property uint defaultPoolThreads() @trusted
3318 {
3319 return atomicLoad(_defaultPoolThreads);
3320 }
3321
3322 /// Ditto
3323 @property void defaultPoolThreads(uint newVal) @trusted
3324 {
3325 atomicStore(_defaultPoolThreads, newVal);
3326 }
3327
3328 /**
3329 Convenience functions that forward to $(D taskPool.parallel). The
3330 purpose of these is to make parallel foreach less verbose and more
3331 readable.
3332
3333 Example:
3334 ---
3335 // Find the logarithm of every number from
3336 // 1 to 1_000_000 in parallel, using the
3337 // default TaskPool instance.
3338 auto logs = new double[1_000_000];
3339
3340 foreach (i, ref elem; parallel(logs))
3341 {
3342 elem = log(i + 1.0);
3343 }
3344 ---
3345
3346 */
3347 ParallelForeach!R parallel(R)(R range)
3348 {
3349 return taskPool.parallel(range);
3350 }
3351
3352 /// Ditto
3353 ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
3354 {
3355 return taskPool.parallel(range, workUnitSize);
3356 }
3357
3358 // Thrown when a parallel foreach loop is broken from.
3359 class ParallelForeachError : Error
3360 {
3361     this()
3362 {
3363 super("Cannot break from a parallel foreach loop using break, return, "
3364 ~ "labeled break/continue or goto statements.");
3365 }
3366 }
3367
3368 /*------Structs that implement opApply for parallel foreach.------------------*/
3369 private template randLen(R)
3370 {
3371 enum randLen = isRandomAccessRange!R && hasLength!R;
3372 }
3373
3374 private void submitAndExecute(
3375 TaskPool pool,
3376 scope void delegate() doIt
3377 )
3378 {
3379 import core.exception : OutOfMemoryError;
3380 immutable nThreads = pool.size + 1;
3381
3382 alias PTask = typeof(scopedTask(doIt));
3383 import core.stdc.stdlib : malloc, free;
3384 import core.stdc.string : memcpy;
3385
3386 // The logical thing to do would be to just use alloca() here, but that
3387 // causes problems on Windows for reasons that I don't understand
3388 // (tentatively a compiler bug) and definitely doesn't work on Posix due
3389 // to Bug 3753. Therefore, allocate a fixed buffer and fall back to
3390     // malloc() if someone's using a ridiculous amount of threads. Also,
3391     // a byte array is used instead of a PTask array for the fixed buffer
3392     // to prevent destructors from being called on uninitialized excess
3393     // PTask instances.
3394 enum nBuf = 64;
3395 byte[nBuf * PTask.sizeof] buf = void;
3396 PTask[] tasks;
3397 if (nThreads <= nBuf)
3398 {
3399 tasks = (cast(PTask*) buf.ptr)[0 .. nThreads];
3400 }
3401 else
3402 {
3403 auto ptr = cast(PTask*) malloc(nThreads * PTask.sizeof);
3404 if (!ptr) throw new OutOfMemoryError("Out of memory in std.parallelism.");
3405 tasks = ptr[0 .. nThreads];
3406 }
3407
3408 scope(exit)
3409 {
3410 if (nThreads > nBuf)
3411 {
3412 free(tasks.ptr);
3413 }
3414 }
3415
3416 foreach (ref t; tasks)
3417 {
3418 import core.stdc.string : memcpy;
3419
3420 // This silly looking code is necessary to prevent d'tors from being
3421 // called on uninitialized objects.
3422 auto temp = scopedTask(doIt);
3423 memcpy(&t, &temp, PTask.sizeof);
3424
3425 // This has to be done to t after copying, not temp before copying.
3426 // Otherwise, temp's destructor will sit here and wait for the
3427 // task to finish.
3428 t.pool = pool;
3429 }
3430
3431 foreach (i; 1 .. tasks.length - 1)
3432 {
3433 tasks[i].next = tasks[i + 1].basePtr;
3434 tasks[i + 1].prev = tasks[i].basePtr;
3435 }
3436
3437 if (tasks.length > 1)
3438 {
3439 pool.queueLock();
3440 scope(exit) pool.queueUnlock();
3441
3442 pool.abstractPutGroupNoSync(
3443 tasks[1].basePtr,
3444 tasks[$ - 1].basePtr
3445 );
3446 }
3447
3448 if (tasks.length > 0)
3449 {
3450 try
3451 {
3452 tasks[0].job();
3453 }
3454 catch (Throwable e)
3455 {
3456 tasks[0].exception = e; // nocoverage
3457 }
3458 tasks[0].taskStatus = TaskStatus.done;
3459
3460 // Try to execute each of these in the current thread
3461 foreach (ref task; tasks[1..$])
3462 {
3463 pool.tryDeleteExecute(task.basePtr);
3464 }
3465 }
3466
3467 Throwable firstException, lastException;
3468
3469 foreach (i, ref task; tasks)
3470 {
3471 try
3472 {
3473 task.yieldForce;
3474 }
3475 catch (Throwable e)
3476 {
3477 addToChain(e, firstException, lastException);
3478 continue;
3479 }
3480 }
3481
3482 if (firstException) throw firstException;
3483 }
3484
3485 void foreachErr()
3486 {
3487 throw new ParallelForeachError();
3488 }
3489
3490 int doSizeZeroCase(R, Delegate)(ref ParallelForeach!R p, Delegate dg)
3491 {
3492 with(p)
3493 {
3494 int res = 0;
3495 size_t index = 0;
3496
3497 // The explicit ElementType!R in the foreach loops is necessary for
3498 // correct behavior when iterating over strings.
3499 static if (hasLvalueElements!R)
3500 {
3501 foreach (ref ElementType!R elem; range)
3502 {
3503 static if (Parameters!dg.length == 2)
3504 {
3505 res = dg(index, elem);
3506 }
3507 else
3508 {
3509 res = dg(elem);
3510 }
3511 if (res) break;
3512 index++;
3513 }
3514 }
3515 else
3516 {
3517 foreach (ElementType!R elem; range)
3518 {
3519 static if (Parameters!dg.length == 2)
3520 {
3521 res = dg(index, elem);
3522 }
3523 else
3524 {
3525 res = dg(elem);
3526 }
3527 if (res) break;
3528 index++;
3529 }
3530 }
3531 if (res) foreachErr;
3532 return res;
3533 }
3534 }
3535
3536 private enum string parallelApplyMixinRandomAccess = q{
3537 // Handle empty thread pool as special case.
3538 if (pool.size == 0)
3539 {
3540 return doSizeZeroCase(this, dg);
3541 }
3542
3543 // Whether iteration is with or without an index variable.
3544 enum withIndex = Parameters!(typeof(dg)).length == 2;
3545
3546     shared size_t workUnitIndex = size_t.max; // Effectively -1: workUnitIndex + 1 == 0
3547 immutable len = range.length;
3548 if (!len) return 0;
3549
3550 shared bool shouldContinue = true;
3551
3552 void doIt()
3553 {
3554 import std.algorithm.comparison : min;
3555
3556 scope(failure)
3557 {
3558 // If an exception is thrown, all threads should bail.
3559 atomicStore(shouldContinue, false);
3560 }
3561
3562 while (atomicLoad(shouldContinue))
3563 {
3564 immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
3565 immutable start = workUnitSize * myUnitIndex;
3566 if (start >= len)
3567 {
3568 atomicStore(shouldContinue, false);
3569 break;
3570 }
3571
3572 immutable end = min(len, start + workUnitSize);
3573
3574 foreach (i; start .. end)
3575 {
3576 static if (withIndex)
3577 {
3578 if (dg(i, range[i])) foreachErr();
3579 }
3580 else
3581 {
3582 if (dg(range[i])) foreachErr();
3583 }
3584 }
3585 }
3586 }
3587
3588 submitAndExecute(pool, &doIt);
3589
3590 return 0;
3591 };
3592
3593 enum string parallelApplyMixinInputRange = q{
3594 // Handle empty thread pool as special case.
3595 if (pool.size == 0)
3596 {
3597 return doSizeZeroCase(this, dg);
3598 }
3599
3600 // Whether iteration is with or without an index variable.
3601 enum withIndex = Parameters!(typeof(dg)).length == 2;
3602
3603 // This protects the range while copying it.
3604 auto rangeMutex = new Mutex();
3605
3606 shared bool shouldContinue = true;
3607
3608 // The total number of elements that have been popped off range.
3609     // This is updated only while protected by rangeMutex.
3610 size_t nPopped = 0;
3611
3612 static if (
3613 is(typeof(range.buf1)) &&
3614 is(typeof(range.bufPos)) &&
3615 is(typeof(range.doBufSwap()))
3616 )
3617 {
3618 // Make sure we don't have the buffer recycling overload of
3619 // asyncBuf.
3620 static if (
3621 is(typeof(range.source)) &&
3622 isRoundRobin!(typeof(range.source))
3623 )
3624 {
3625 static assert(0, "Cannot execute a parallel foreach loop on " ~
3626 "the buffer recycling overload of asyncBuf.");
3627 }
3628
3629 enum bool bufferTrick = true;
3630 }
3631 else
3632 {
3633 enum bool bufferTrick = false;
3634 }
3635
3636 void doIt()
3637 {
3638 scope(failure)
3639 {
3640 // If an exception is thrown, all threads should bail.
3641 atomicStore(shouldContinue, false);
3642 }
3643
3644 static if (hasLvalueElements!R)
3645 {
3646 alias Temp = ElementType!R*[];
3647 Temp temp;
3648
3649 // Returns: The previous value of nPopped.
3650 size_t makeTemp()
3651 {
3652 import std.algorithm.internal : addressOf;
3653 import std.array : uninitializedArray;
3654
3655 if (temp is null)
3656 {
3657 temp = uninitializedArray!Temp(workUnitSize);
3658 }
3659
3660 rangeMutex.lock();
3661 scope(exit) rangeMutex.unlock();
3662
3663 size_t i = 0;
3664 for (; i < workUnitSize && !range.empty; range.popFront(), i++)
3665 {
3666 temp[i] = addressOf(range.front);
3667 }
3668
3669 temp = temp[0 .. i];
3670 auto ret = nPopped;
3671 nPopped += temp.length;
3672 return ret;
3673 }
3674
3675 }
3676 else
3677 {
3678
3679 alias Temp = ElementType!R[];
3680 Temp temp;
3681
3682 // Returns: The previous value of nPopped.
3683 static if (!bufferTrick) size_t makeTemp()
3684 {
3685 import std.array : uninitializedArray;
3686
3687 if (temp is null)
3688 {
3689 temp = uninitializedArray!Temp(workUnitSize);
3690 }
3691
3692 rangeMutex.lock();
3693 scope(exit) rangeMutex.unlock();
3694
3695 size_t i = 0;
3696 for (; i < workUnitSize && !range.empty; range.popFront(), i++)
3697 {
3698 temp[i] = range.front;
3699 }
3700
3701 temp = temp[0 .. i];
3702 auto ret = nPopped;
3703 nPopped += temp.length;
3704 return ret;
3705 }
3706
3707 static if (bufferTrick) size_t makeTemp()
3708 {
3709 import std.algorithm.mutation : swap;
3710 rangeMutex.lock();
3711 scope(exit) rangeMutex.unlock();
3712
3713 // Elide copying by just swapping buffers.
3714 temp.length = range.buf1.length;
3715 swap(range.buf1, temp);
3716
3717 // This is necessary in case popFront() has been called on
3718 // range before entering the parallel foreach loop.
3719 temp = temp[range.bufPos..$];
3720
3721 static if (is(typeof(range._length)))
3722 {
3723 range._length -= (temp.length - range.bufPos);
3724 }
3725
3726 range.doBufSwap();
3727 auto ret = nPopped;
3728 nPopped += temp.length;
3729 return ret;
3730 }
3731 }
3732
3733 while (atomicLoad(shouldContinue))
3734 {
3735 auto overallIndex = makeTemp();
3736 if (temp.empty)
3737 {
3738 atomicStore(shouldContinue, false);
3739 break;
3740 }
3741
3742 foreach (i; 0 .. temp.length)
3743 {
3744 scope(success) overallIndex++;
3745
3746 static if (hasLvalueElements!R)
3747 {
3748 static if (withIndex)
3749 {
3750 if (dg(overallIndex, *temp[i])) foreachErr();
3751 }
3752 else
3753 {
3754 if (dg(*temp[i])) foreachErr();
3755 }
3756 }
3757 else
3758 {
3759 static if (withIndex)
3760 {
3761 if (dg(overallIndex, temp[i])) foreachErr();
3762 }
3763 else
3764 {
3765 if (dg(temp[i])) foreachErr();
3766 }
3767 }
3768 }
3769 }
3770 }
3771
3772 submitAndExecute(pool, &doIt);
3773
3774 return 0;
3775 };
3776
3777 // Calls e.next until the end of the chain is found.
3778 private Throwable findLastException(Throwable e) pure nothrow
3779 {
3780 if (e is null) return null;
3781
3782 while (e.next)
3783 {
3784 e = e.next;
3785 }
3786
3787 return e;
3788 }
3789
3790 // Adds e to the exception chain.
3791 private void addToChain(
3792 Throwable e,
3793 ref Throwable firstException,
3794 ref Throwable lastException
3795 ) pure nothrow
3796 {
3797 if (firstException)
3798 {
3799 assert(lastException); // nocoverage
3800 lastException.next = e; // nocoverage
3801 lastException = findLastException(e); // nocoverage
3802 }
3803 else
3804 {
3805 firstException = e;
3806 lastException = findLastException(e);
3807 }
3808 }
3809
3810 private struct ParallelForeach(R)
3811 {
3812 TaskPool pool;
3813 R range;
3814 size_t workUnitSize;
3815 alias E = ElementType!R;
3816
3817 static if (hasLvalueElements!R)
3818 {
3819 alias NoIndexDg = int delegate(ref E);
3820 alias IndexDg = int delegate(size_t, ref E);
3821 }
3822 else
3823 {
3824 alias NoIndexDg = int delegate(E);
3825 alias IndexDg = int delegate(size_t, E);
3826 }
3827
3828 int opApply(scope NoIndexDg dg)
3829 {
3830 static if (randLen!R)
3831 {
3832 mixin(parallelApplyMixinRandomAccess);
3833 }
3834 else
3835 {
3836 mixin(parallelApplyMixinInputRange);
3837 }
3838 }
3839
3840 int opApply(scope IndexDg dg)
3841 {
3842 static if (randLen!R)
3843 {
3844 mixin(parallelApplyMixinRandomAccess);
3845 }
3846 else
3847 {
3848 mixin(parallelApplyMixinInputRange);
3849 }
3850 }
3851 }
3852
3853 /*
3854 This struct buffers the output of a callable that writes data into a
3855 user-supplied buffer, using a set of buffers of some fixed size. It allows these
3856 buffers to be accessed with an input range interface. This is used internally
3857 in the buffer-recycling overload of TaskPool.asyncBuf, which creates an
3858 instance and forwards it to the input range overload of asyncBuf.
3859 */
3860 private struct RoundRobinBuffer(C1, C2)
3861 {
3862 // No need for constraints because they're already checked for in asyncBuf.
3863
3864 alias Array = Parameters!(C1.init)[0];
3865 alias T = typeof(Array.init[0]);
3866
3867 T[][] bufs;
3868 size_t index;
3869 C1 nextDel;
3870 C2 emptyDel;
3871 bool _empty;
3872 bool primed;
3873
3874 this(
3875 C1 nextDel,
3876 C2 emptyDel,
3877 size_t initialBufSize,
3878 size_t nBuffers
3879 ) {
3880 this.nextDel = nextDel;
3881 this.emptyDel = emptyDel;
3882 bufs.length = nBuffers;
3883
3884 foreach (ref buf; bufs)
3885 {
3886 buf.length = initialBufSize;
3887 }
3888 }
3889
3890 void prime()
3891 in
3892 {
3893 assert(!empty);
3894 }
3895 body
3896 {
3897 scope(success) primed = true;
3898 nextDel(bufs[index]);
3899 }
3900
3901
3902 T[] front() @property
3903 in
3904 {
3905 assert(!empty);
3906 }
3907 body
3908 {
3909 if (!primed) prime();
3910 return bufs[index];
3911 }
3912
3913 void popFront()
3914 {
3915 if (empty || emptyDel())
3916 {
3917 _empty = true;
3918 return;
3919 }
3920
3921 index = (index + 1) % bufs.length;
3922 primed = false;
3923 }
3924
3925 bool empty() @property const @safe pure nothrow
3926 {
3927 return _empty;
3928 }
3929 }
3930
3931 version (unittest)
3932 {
3933 // This was the only way I could get nested maps to work.
3934 __gshared TaskPool poolInstance;
3935
3936 import std.stdio;
3937 }
3938
3939 // These test basic functionality but don't stress test for threading bugs.
3940 // These are the tests that should be run every time Phobos is compiled.
3941 @system unittest
3942 {
3943 import std.algorithm.comparison : equal, min, max;
3944 import std.algorithm.iteration : filter, map, reduce;
3945 import std.array : split;
3946 import std.conv : text;
3947 import std.exception : assertThrown;
3948 import std.math : approxEqual, sqrt, log;
3949 import std.range : indexed, iota, join;
3950 import std.typecons : Tuple, tuple;
3951
3952 poolInstance = new TaskPool(2);
3953 scope(exit) poolInstance.stop();
3954
3955 // The only way this can be verified is manually.
3956 debug(std_parallelism) stderr.writeln("totalCPUs = ", totalCPUs);
3957
3958 auto oldPriority = poolInstance.priority;
3959 poolInstance.priority = Thread.PRIORITY_MAX;
3960 assert(poolInstance.priority == Thread.PRIORITY_MAX);
3961
3962 poolInstance.priority = Thread.PRIORITY_MIN;
3963 assert(poolInstance.priority == Thread.PRIORITY_MIN);
3964
3965 poolInstance.priority = oldPriority;
3966 assert(poolInstance.priority == oldPriority);
3967
3968     static void refFun(ref uint num)
3969 {
3970 num++;
3971 }
3972
3973 uint x;
3974
3975 // Test task().
3976 auto t = task!refFun(x);
3977 poolInstance.put(t);
3978 t.yieldForce;
3979 assert(t.args[0] == 1);
3980
3981 auto t2 = task(&refFun, x);
3982 poolInstance.put(t2);
3983 t2.yieldForce;
3984 assert(t2.args[0] == 1);
3985
3986 // Test scopedTask().
3987 auto st = scopedTask!refFun(x);
3988 poolInstance.put(st);
3989 st.yieldForce;
3990 assert(st.args[0] == 1);
3991
3992 auto st2 = scopedTask(&refFun, x);
3993 poolInstance.put(st2);
3994 st2.yieldForce;
3995 assert(st2.args[0] == 1);
3996
3997 // Test executeInNewThread().
3998 auto ct = scopedTask!refFun(x);
3999 ct.executeInNewThread(Thread.PRIORITY_MAX);
4000 ct.yieldForce;
4001 assert(ct.args[0] == 1);
4002
4003 // Test ref return.
4004 uint toInc = 0;
4005     static ref T makeRef(T)(ref T num)
4006 {
4007 return num;
4008 }
4009
4010 auto t3 = task!makeRef(toInc);
4011 taskPool.put(t3);
4012 assert(t3.args[0] == 0);
4013 t3.spinForce++;
4014 assert(t3.args[0] == 1);
4015
4016     static void testSafe() @safe {
4017 static int bump(int num)
4018 {
4019 return num + 1;
4020 }
4021
4022 auto safePool = new TaskPool(0);
4023 auto t = task(&bump, 1);
4024         safePool.put(t);
4025 assert(t.yieldForce == 2);
4026
4027 auto st = scopedTask(&bump, 1);
4028         safePool.put(st);
4029 assert(st.yieldForce == 2);
4030 safePool.stop();
4031 }
4032
4033 auto arr = [1,2,3,4,5];
4034 auto nums = new uint[5];
4035 auto nums2 = new uint[5];
4036
4037 foreach (i, ref elem; poolInstance.parallel(arr))
4038 {
4039 elem++;
4040 nums[i] = cast(uint) i + 2;
4041 nums2[i] = elem;
4042 }
4043
4044 assert(nums == [2,3,4,5,6], text(nums));
4045 assert(nums2 == nums, text(nums2));
4046 assert(arr == nums, text(arr));
4047
4048 // Test const/immutable arguments.
4049     static int add(int lhs, int rhs)
4050 {
4051 return lhs + rhs;
4052 }
4053 immutable addLhs = 1;
4054 immutable addRhs = 2;
4055 auto addTask = task(&add, addLhs, addRhs);
4056 auto addScopedTask = scopedTask(&add, addLhs, addRhs);
4057 poolInstance.put(addTask);
4058 poolInstance.put(addScopedTask);
4059 assert(addTask.yieldForce == 3);
4060 assert(addScopedTask.yieldForce == 3);
4061
4062 // Test parallel foreach with non-random access range.
4063 auto range = filter!"a != 666"([0, 1, 2, 3, 4]);
4064
4065 foreach (i, elem; poolInstance.parallel(range))
4066 {
4067 nums[i] = cast(uint) i;
4068 }
4069
4070 assert(nums == [0,1,2,3,4]);
4071
4072 auto logs = new double[1_000_000];
4073 foreach (i, ref elem; poolInstance.parallel(logs))
4074 {
4075 elem = log(i + 1.0);
4076 }
4077
4078     foreach (i, elem; logs)
4079 {
4080 assert(approxEqual(elem, cast(double) log(i + 1)));
4081 }
4082
    assert(poolInstance.amap!"a * a"([1,2,3,4,5]) == [1,4,9,16,25]);
    assert(poolInstance.amap!"a * a"([1,2,3,4,5], new long[5]) == [1,4,9,16,25]);
    assert(poolInstance.amap!("a * a", "-a")([1,2,3]) ==
           [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);

    auto tupleBuf = new Tuple!(int, int)[3];
    poolInstance.amap!("a * a", "-a")([1,2,3], tupleBuf);
    assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
    poolInstance.amap!("a * a", "-a")([1,2,3], 5, tupleBuf);
    assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);

    // Test amap with a non-array buffer.
    auto toIndex = new int[5];
    auto ind = indexed(toIndex, [3, 1, 4, 0, 2]);
    poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], ind);
    assert(equal(ind, [2, 4, 6, 8, 10]));
    assert(equal(toIndex, [8, 4, 10, 2, 6]));
    poolInstance.amap!"a / 2"(ind, ind);
    assert(equal(ind, [1, 2, 3, 4, 5]));
    assert(equal(toIndex, [4, 2, 5, 1, 3]));

    auto buf = new int[5];
    poolInstance.amap!"a * a"([1,2,3,4,5], buf);
    assert(buf == [1,4,9,16,25]);
    poolInstance.amap!"a * a"([1,2,3,4,5], 4, buf);
    assert(buf == [1,4,9,16,25]);

    assert(poolInstance.reduce!"a + b"([1]) == 1);
    assert(poolInstance.reduce!"a + b"([1,2,3,4]) == 10);
    assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4]) == 10);
    assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4], 1) == 10);
    assert(poolInstance.reduce!(min, max)([1,2,3,4]) == tuple(1, 4));
    assert(poolInstance.reduce!("a + b", "a * b")(tuple(0, 1), [1,2,3,4]) ==
           tuple(10, 24));

    immutable serialAns = reduce!"a + b"(iota(1000));
    assert(poolInstance.reduce!"a + b"(0, iota(1000)) == serialAns);
    assert(poolInstance.reduce!"a + b"(iota(1000)) == serialAns);

    // Test worker-local storage.
    auto wl = poolInstance.workerLocalStorage(0);
    foreach (i; poolInstance.parallel(iota(1000), 1))
    {
        wl.get = wl.get + i;
    }

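    // toRange exposes one element per worker thread plus one for the owning
    // thread, so summing it recovers the full total 0 + 1 + ... + 999 = 499500.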
    auto wlRange = wl.toRange;
    auto parallelSum = poolInstance.reduce!"a + b"(wlRange);
    assert(parallelSum == 499500);
    assert(wlRange[0 .. 1][0] == wlRange[0]);
    assert(wlRange[1 .. 2][0] == wlRange[1]);

    // Test finish()
    {
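        // finish() signals the workers to terminate once the queue empties;
        // finish(true) additionally blocks until they have all terminated.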
        static void slowFun() { Thread.sleep(dur!"msecs"(1)); }

        auto pool1 = new TaskPool();
        auto tSlow = task!slowFun();
        pool1.put(tSlow);
        pool1.finish();
        tSlow.yieldForce;
        // Can't assert that pool1.status == PoolState.stopNow because status
        // doesn't change until after the "done" flag is set and the waiting
        // thread is woken up.

        auto pool2 = new TaskPool();
        auto tSlow2 = task!slowFun();
        pool2.put(tSlow2);
        pool2.finish(true); // blocking
        assert(tSlow2.done);

        // Test fix for Bug 8582 by making pool size zero.
        auto pool3 = new TaskPool(0);
        auto tSlow3 = task!slowFun();
        pool3.put(tSlow3);
        pool3.finish(true); // blocking
        assert(tSlow3.done);

        // This is correct because no thread will terminate unless pool2.status
        // and pool3.status have already been set to stopNow.
        assert(pool2.status == TaskPool.PoolState.stopNow);
        assert(pool3.status == TaskPool.PoolState.stopNow);
    }

    // Test default pool stuff.
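    // The default pool uses totalCPUs - 1 worker threads because the thread
    // that queries taskPool is expected to do work as well.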
    assert(taskPool.size == totalCPUs - 1);

    nums = new uint[1000];
    foreach (i; parallel(iota(1000)))
    {
        nums[i] = cast(uint) i;
    }
    assert(equal(nums, iota(1000)));

    assert(equal(
        poolInstance.map!"a * a"(iota(30_000_001), 10_000),
        map!"a * a"(iota(30_000_001))
    ));

    // The filter is to kill random access and test the non-random access
    // branch.
    assert(equal(
        poolInstance.map!"a * a"(
            filter!"a == a"(iota(30_000_001)), 10_000, 1000),
        map!"a * a"(iota(30_000_001))
    ));

    assert(
        reduce!"a + b"(0UL,
            poolInstance.map!"a * a"(iota(3_000_001), 10_000)
        ) ==
        reduce!"a + b"(0UL,
            map!"a * a"(iota(3_000_001))
        )
    );

    assert(equal(
        iota(1_000_002),
        poolInstance.asyncBuf(filter!"a == a"(iota(1_000_002)))
    ));

    {
        import std.conv : to;
        import std.file : deleteme;

        string temp_file = deleteme ~ "-tempDelMe.txt";
        auto file = File(temp_file, "wb");
        scope(exit)
        {
            file.close();
            import std.file : remove;
            remove(temp_file);
        }

        auto written = [[1.0, 2, 3], [4.0, 5, 6], [7.0, 8, 9]];
        foreach (row; written)
        {
            file.writeln(join(to!(string[])(row), "\t"));
        }

        file = File(temp_file);

        void next(ref char[] buf)
        {
            file.readln(buf);
            import std.string : chomp;
            buf = chomp(buf);
        }

        double[][] read;
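        // This asyncBuf overload reads ahead in a background task, calling
        // next to fill each buffer until &file.eof returns true.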
        auto asyncReader = taskPool.asyncBuf(&next, &file.eof);

        foreach (line; asyncReader)
        {
            if (line.length == 0) continue;
            auto ls = line.split("\t");
            read ~= to!(double[])(ls);
        }

        assert(read == written);
        file.close();
    }

    // Test Map/AsyncBuf chaining.

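    // Each stage is semi-lazy: map pulls from the async buffer as it needs
    // input, so buffering, sqrt, and squaring can all proceed concurrently.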
    auto abuf = poolInstance.asyncBuf(iota(-1.0, 3_000_000), 100);
    auto temp = poolInstance.map!sqrt(
        abuf, 100, 5
    );
    auto lmchain = poolInstance.map!"a * a"(temp, 100, 5);
    lmchain.popFront();

    int ii;
    foreach (elem; lmchain)
    {
        if (!approxEqual(elem, ii))
        {
            stderr.writeln(ii, '\t', elem);
        }
        ii++;
    }

    // Test buffer trick in parallel foreach.
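    // (The "buffer trick": parallel foreach can presumably recycle an
    // AsyncBuf's internal buffers instead of copying elements out of them.)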
    abuf = poolInstance.asyncBuf(iota(-1.0, 1_000_000), 100);
    abuf.popFront();
    auto bufTrickTest = new size_t[abuf.length];
    foreach (i, elem; parallel(abuf))
    {
        bufTrickTest[i] = i;
    }

    assert(equal(iota(1_000_000), bufTrickTest));

    auto myTask = task!(std.math.abs)(-1);
    taskPool.put(myTask);
    assert(myTask.spinForce == 1);

    // Test that a worker thread of one pool reports an index of 0 when
    // workerIndex is queried w.r.t. another pool. The only way to test
    // this is non-deterministically.
    foreach (i; parallel(iota(1000), 1))
    {
        assert(poolInstance.workerIndex == 0);
    }

    foreach (i; poolInstance.parallel(iota(1000), 1))
    {
        assert(taskPool.workerIndex == 0);
    }

    // Test exception handling.
    static void parallelForeachThrow()
    {
        foreach (elem; parallel(iota(10)))
        {
            throw new Exception("");
        }
    }

    assertThrown!Exception(parallelForeachThrow());

    static int reduceException(int a, int b)
    {
        throw new Exception("");
    }

    assertThrown!Exception(
        poolInstance.reduce!reduceException(iota(3))
    );

    static int mapException(int a)
    {
        throw new Exception("");
    }

    assertThrown!Exception(
        poolInstance.amap!mapException(iota(3))
    );

    static void mapThrow()
    {
        auto m = poolInstance.map!mapException(iota(3));
        m.popFront();
    }

    assertThrown!Exception(mapThrow());

    struct ThrowingRange
    {
        @property int front()
        {
            return 1;
        }
        void popFront()
        {
            throw new Exception("");
        }
        enum bool empty = false;
    }

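    // The first buffer is filled eagerly, so the exception thrown by
    // ThrowingRange.popFront escapes from the asyncBuf call itself.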
    assertThrown!Exception(poolInstance.asyncBuf(ThrowingRange.init));
}

//version = parallelismStressTest;

// These are more like stress tests than real unit tests. They print out
// tons of stuff and should not be run every time make unittest is run.
version (parallelismStressTest)
{
    @safe unittest
    {
        size_t attempt;
        for (; attempt < 10; attempt++)
        foreach (poolSize; [0, 4])
        {
            poolInstance = new TaskPool(poolSize);

            uint[] numbers = new uint[1_000];

            foreach (i; poolInstance.parallel(iota(0, numbers.length)))
            {
                numbers[i] = cast(uint) i;
            }

            // Make sure it works.
            foreach (i; 0 .. numbers.length)
            {
                assert(numbers[i] == i);
            }

            stderr.writeln("Done creating nums.");

            auto myNumbers = filter!"a % 7 > 0"(iota(0, 1000));
            foreach (num; poolInstance.parallel(myNumbers))
            {
                assert(num % 7 > 0 && num < 1000);
            }
            stderr.writeln("Done modulus test.");

            uint[] squares = poolInstance.amap!"a * a"(numbers, 100);
            assert(squares.length == numbers.length);
            foreach (i, number; numbers)
            {
                assert(squares[i] == number * number);
            }
            stderr.writeln("Done squares.");

            auto sumFuture = task!(reduce!"a + b")(numbers);
            poolInstance.put(sumFuture);

            ulong sumSquares = 0;
            foreach (elem; numbers)
            {
                sumSquares += elem * elem;
            }

            uint mySum = sumFuture.spinForce();
            assert(mySum == 999 * 1000 / 2);

            auto mySumParallel = poolInstance.reduce!"a + b"(numbers);
            assert(mySum == mySumParallel);
            stderr.writeln("Done sums.");

            auto myTask = task(
            {
                synchronized writeln("Our lives are parallel...Our lives are parallel.");
            });
            poolInstance.put(myTask);

            auto nestedOuter = "abcd";
            auto nestedInner = iota(0, 10, 2);

            foreach (i, letter; poolInstance.parallel(nestedOuter, 1))
            {
                foreach (j, number; poolInstance.parallel(nestedInner, 1))
                {
                    synchronized writeln(i, ": ", letter, " ", j, ": ", number);
                }
            }

            poolInstance.stop();
        }

        assert(attempt == 10);
        writeln("Press enter to go to next round of unittests.");
        readln();
    }

    // These unittests are intended more for actual testing than as examples.
    @safe unittest
    {
        foreach (attempt; 0 .. 10)
        foreach (poolSize; [0, 4])
        {
            poolInstance = new TaskPool(poolSize);

            // Test indexing.
            stderr.writeln("Creator Raw Index: ", poolInstance.threadIndex);
            assert(poolInstance.workerIndex() == 0);

            // Test worker-local storage.
            auto workerLocalStorage = poolInstance.workerLocalStorage!uint(1);
            foreach (i; poolInstance.parallel(iota(0U, 1_000_000)))
            {
                workerLocalStorage.get++;
            }
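            // Each worker plus the submitting thread starts with its own copy
            // initialized to 1, hence the extra poolInstance.size + 1 counts.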
            assert(reduce!"a + b"(workerLocalStorage.toRange) ==
                   1_000_000 + poolInstance.size + 1);

            // Make sure work is reasonably balanced among threads. This test is
            // non-deterministic and is more of a sanity check than something that
            // has an absolute pass/fail.
            shared(uint)[void*] nJobsByThread;
            foreach (thread; poolInstance.pool)
            {
                nJobsByThread[cast(void*) thread] = 0;
            }
            nJobsByThread[cast(void*) Thread.getThis()] = 0;

            foreach (i; poolInstance.parallel(iota(0, 1_000_000), 100))
            {
                atomicOp!"+="(nJobsByThread[cast(void*) Thread.getThis()], 1);
            }

            stderr.writeln("\nCurrent thread is: ",
                cast(void*) Thread.getThis());
            stderr.writeln("Workload distribution: ");
            foreach (k, v; nJobsByThread)
            {
                stderr.writeln(k, '\t', v);
            }

            // Test whether amap can be nested.
            real[][] matrix = new real[][](1000, 1000);
            foreach (i; poolInstance.parallel(iota(0, matrix.length)))
            {
                foreach (j; poolInstance.parallel(iota(0, matrix[0].length)))
                {
                    matrix[i][j] = i * j;
                }
            }

            // Work around quirks of sqrt being a compiler intrinsic:
            static real mySqrt(real num)
            {
                return sqrt(num);
            }

            static real[] parallelSqrt(real[] nums)
            {
                return poolInstance.amap!mySqrt(nums);
            }

            real[][] sqrtMatrix = poolInstance.amap!parallelSqrt(matrix);

            foreach (i, row; sqrtMatrix)
            {
                foreach (j, elem; row)
                {
                    real shouldBe = sqrt(cast(real) i * j);
                    assert(approxEqual(shouldBe, elem));
                    sqrtMatrix[i][j] = shouldBe;
                }
            }

            auto saySuccess = task(
            {
                stderr.writeln(
                    "Success doing matrix stuff that involves nested pool use.");
            });
            poolInstance.put(saySuccess);
            saySuccess.workForce();

            // A more thorough test of amap and reduce: find the sum of the
            // square roots of all elements of matrix.
            static real parallelSum(real[] input)
            {
                return poolInstance.reduce!"a + b"(input);
            }

            auto sumSqrt = poolInstance.reduce!"a + b"(
                poolInstance.amap!parallelSum(
                    sqrtMatrix
                )
            );

            assert(approxEqual(sumSqrt, 4.437e8));
            stderr.writeln("Done sum of square roots.");

            // Test whether tasks work with function pointers.
            auto nanTask = task(&isNaN, 1.0L);
            poolInstance.put(nanTask);
            assert(nanTask.spinForce == false);

            if (poolInstance.size > 0)
            {
                // Test work waiting.
                static void uselessFun()
                {
                    foreach (i; 0 .. 1_000_000) {}
                }

                auto uselessTasks = new typeof(task(&uselessFun))[1000];
                foreach (ref uselessTask; uselessTasks)
                {
                    uselessTask = task(&uselessFun);
                }
                foreach (ref uselessTask; uselessTasks)
                {
                    poolInstance.put(uselessTask);
                }
                foreach (ref uselessTask; uselessTasks)
                {
                    uselessTask.workForce();
                }
            }

            // Test the case of non-random access + ref returns.
            int[] nums = [1,2,3,4,5];
            static struct RemoveRandom
            {
                int[] arr;

                ref int front()
                {
                    return arr.front;
                }
                void popFront()
                {
                    arr.popFront();
                }
                bool empty()
                {
                    return arr.empty;
                }
            }

            auto refRange = RemoveRandom(nums);
            foreach (ref elem; poolInstance.parallel(refRange))
            {
                elem++;
            }
            assert(nums == [2,3,4,5,6], text(nums));
            stderr.writeln("Nums: ", nums);

            poolInstance.stop();
        }
    }
}

version (unittest)
{
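    // Presumably a regression test for
    // https://issues.dlang.org/show_bug.cgi?id=12733: amap with a struct that
    // has an invariant and a user-defined opAssign.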
    struct __S_12733
    {
        invariant() { assert(checksum == 1_234_567_890); }
        this(ulong u) { n = u; }
        void opAssign(__S_12733 s) { this.n = s.n; }
        ulong n;
        ulong checksum = 1_234_567_890;
    }

    static auto __genPair_12733(ulong n) { return __S_12733(n); }
}

@system unittest
{
    immutable ulong[] data = [2UL ^^ 59 - 1, 2UL ^^ 59 - 1, 2UL ^^ 59 - 1, 112_272_537_195_293UL];

    auto result = taskPool.amap!__genPair_12733(data);
}

@safe unittest
{
    import std.range : iota;

    // This test was in std.range, but caused import cycles.
    assert(__traits(compiles, { foreach (i; iota(0, 100UL).parallel) {} }));
}

@safe unittest
{
    import std.algorithm.iteration : each;

    long[] arr;
    static assert(is(typeof({
        arr.parallel.each!"a++";
    })));
}

// https://issues.dlang.org/show_bug.cgi?id=17539
@system unittest
{
    import std.random : rndGen;
    // ensure compilation
    try foreach (rnd; rndGen.parallel) break;
    catch (ParallelForeachError e) {}
}