1 /**
2  * This is a low-level messaging API upon which more structured or restrictive
3  * APIs may be built.  The general idea is that every messageable entity is
4  * represented by a common handle type called a Tid, which allows messages to
5  * be sent to logical threads that are executing in both the current process
6  * and in external processes using the same interface.  This is an important
7  * aspect of scalability because it allows the components of a program to be
8  * spread across available resources with few to no changes to the actual
9  * implementation.
10  *
11  * A logical thread is an execution context that has its own stack and which
12  * runs asynchronously to other logical threads.  These may be preemptively
13  * scheduled kernel threads, fibers (cooperative user-space threads), or some
14  * other concept with similar behavior.
15  *
16  * The type of concurrency used when logical threads are created is determined
17  * by the Scheduler selected at initialization time.  The default behavior is
18  * currently to create a new kernel thread per call to spawn, but other
19  * schedulers are available that multiplex fibers across the main thread or
20  * use some combination of the two approaches.
21  *
22  * Copyright: Copyright Sean Kelly 2009 - 2014.
23  * License:   <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>.
24  * Authors:   Sean Kelly, Alex Rønne Petersen, Martin Nowak
25  * Source:    $(PHOBOSSRC std/_concurrency.d)
26  */
27 /*          Copyright Sean Kelly 2009 - 2014.
28  * Distributed under the Boost Software License, Version 1.0.
29  *    (See accompanying file LICENSE_1_0.txt or copy at
30  *          http://www.boost.org/LICENSE_1_0.txt)
31  */
32 module std.concurrency;
33 
34 public import std.variant;
35 
36 import core.atomic;
37 import core.sync.condition;
38 import core.sync.mutex;
39 import core.thread;
40 import std.range.primitives;
41 import std.range.interfaces : InputRange;
42 import std.traits;
43 
44 ///
@system unittest
{
    __gshared string received;

    // Worker: waits for a single int from its owner, records it in
    // `received`, then acknowledges with `true`.
    static void spawnedFunc(Tid ownerTid)
    {
        import std.conv : text;

        receive((int number) {
            received = text("Received the number ", number);

            // Report success back to the owner thread.
            send(ownerTid, true);
        });
    }

    // Launch the worker and hand it our Tid so it can reply.
    auto childTid = spawn(&spawnedFunc, thisTid);

    // Deliver the number 42 to the worker.
    send(childTid, 42);

    // Block until the worker's acknowledgement arrives.
    auto wasSuccessful = receiveOnly!(bool);
    assert(wasSuccessful);
    assert(received == "Received the number 42");
}
72 
73 private
74 {
hasLocalAliasing(T...)75     template hasLocalAliasing(T...)
76     {
77         static if (!T.length)
78             enum hasLocalAliasing = false;
79         else
80             enum hasLocalAliasing = (std.traits.hasUnsharedAliasing!(T[0]) && !is(T[0] == Tid)) ||
81                                      std.concurrency.hasLocalAliasing!(T[1 .. $]);
82     }
83 
84     enum MsgType
85     {
86         standard,
87         priority,
88         linkDead,
89     }
90 
91     struct Message
92     {
93         MsgType type;
94         Variant data;
95 
96         this(T...)(MsgType t, T vals) if (T.length > 0)
97         {
98             static if (T.length == 1)
99             {
100                 type = t;
101                 data = vals[0];
102             }
103             else
104             {
105                 import std.typecons : Tuple;
106 
107                 type = t;
108                 data = Tuple!(T)(vals);
109             }
110         }
111 
convertsTo(T...)112         @property auto convertsTo(T...)()
113         {
114             static if (T.length == 1)
115             {
116                 return is(T[0] == Variant) || data.convertsTo!(T);
117             }
118             else
119             {
120                 import std.typecons : Tuple;
121                 return data.convertsTo!(Tuple!(T));
122             }
123         }
124 
get(T...)125         @property auto get(T...)()
126         {
127             static if (T.length == 1)
128             {
129                 static if (is(T[0] == Variant))
130                     return data;
131                 else
132                     return data.get!(T);
133             }
134             else
135             {
136                 import std.typecons : Tuple;
137                 return data.get!(Tuple!(T));
138             }
139         }
140 
map(Op)141         auto map(Op)(Op op)
142         {
143             alias Args = Parameters!(Op);
144 
145             static if (Args.length == 1)
146             {
147                 static if (is(Args[0] == Variant))
148                     return op(data);
149                 else
150                     return op(data.get!(Args));
151             }
152             else
153             {
154                 import std.typecons : Tuple;
155                 return op(data.get!(Tuple!(Args)).expand);
156             }
157         }
158     }
159 
checkops(T...)160     void checkops(T...)(T ops)
161     {
162         foreach (i, t1; T)
163         {
164             static assert(isFunctionPointer!t1 || isDelegate!t1);
165             alias a1 = Parameters!(t1);
166             alias r1 = ReturnType!(t1);
167 
168             static if (i < T.length - 1 && is(r1 == void))
169             {
170                 static assert(a1.length != 1 || !is(a1[0] == Variant),
171                               "function with arguments " ~ a1.stringof ~
172                               " occludes successive function");
173 
174                 foreach (t2; T[i + 1 .. $])
175                 {
176                     static assert(isFunctionPointer!t2 || isDelegate!t2);
177                     alias a2 = Parameters!(t2);
178 
179                     static assert(!is(a1 == a2),
180                         "function with arguments " ~ a1.stringof ~ " occludes successive function");
181                 }
182             }
183         }
184     }
185 
thisInfo()186     @property ref ThreadInfo thisInfo() nothrow
187     {
188         if (scheduler is null)
189             return ThreadInfo.thisInfo;
190         return scheduler.thisInfo;
191     }
192 }
193 
// Module destructor: runs ThreadInfo.cleanup() on the current thread's info,
// which closes its mailbox, sends linkDead notices and clears its registry
// entries.
static ~this()
{
    thisInfo.cleanup();
}
198 
199 // Exceptions
200 
201 /**
202  * Thrown on calls to $(D receiveOnly) if a message other than the type
203  * the receiving thread expected is sent.
204  */
class MessageMismatch : Exception
{
    /// Creates the exception; `msg` defaults to "Unexpected message type".
    this(string msg = "Unexpected message type") @safe pure nothrow @nogc
    {
        super(msg);
    }
}
213 
214 /**
215  * Thrown on calls to $(D receive) if the thread that spawned the receiving
216  * thread has terminated and no more messages exist.
217  */
class OwnerTerminated : Exception
{
    /// The Tid handed to the constructor.
    Tid tid;

    /// Creates the exception for `t`; `msg` defaults to "Owner terminated".
    this(Tid t, string msg = "Owner terminated") @safe pure nothrow @nogc
    {
        super(msg);
        this.tid = t;
    }
}
229 
230 /**
231  * Thrown if a linked thread has terminated.
232  */
class LinkTerminated : Exception
{
    /// The Tid handed to the constructor.
    Tid tid;

    /// Creates the exception for `t`; `msg` defaults to "Link terminated".
    this(Tid t, string msg = "Link terminated") @safe pure nothrow @nogc
    {
        super(msg);
        this.tid = t;
    }
}
244 
245 /**
246  * Thrown if a message was sent to a thread via
247  * $(REF prioritySend, std,concurrency) and the receiver does not have a handler
248  * for a message of this type.
249  */
class PriorityMessageException : Exception
{
    /**
     * The message that was sent.
     */
    Variant message;

    /// Wraps `vals`; the exception text is always "Priority message".
    this(Variant vals)
    {
        super("Priority message");
        this.message = vals;
    }
}
264 
265 /**
266  * Thrown on mailbox crowding if the mailbox is configured with
267  * $(D OnCrowding.throwException).
268  */
class MailboxFull : Exception
{
    /// The Tid handed to the constructor.
    Tid tid;

    /// Creates the exception for `t`; `msg` defaults to "Mailbox full".
    this(Tid t, string msg = "Mailbox full") @safe pure nothrow @nogc
    {
        super(msg);
        this.tid = t;
    }
}
280 
281 /**
282  * Thrown when a Tid is missing, e.g. when $(D ownerTid) doesn't
283  * find an owner thread.
284  */
class TidMissingException : Exception
{
    import std.exception : basicExceptionCtors;

    /// Constructors are supplied by std.exception.basicExceptionCtors.
    mixin basicExceptionCtors;
}
291 
292 
293 // Thread ID
294 
295 
296 /**
297  * An opaque type used to represent a logical thread.
298  */
struct Tid
{
private:
    // The mailbox this handle refers to; null for Tid.init.
    MessageBox mbox;

    // Wraps an existing mailbox in a handle.
    this(MessageBox m) @safe pure nothrow @nogc
    {
        this.mbox = m;
    }

public:

    /**
     * Writes a short identifying string of the form "Tid(<hex>)" to `sink`,
     * where the hex value is the mailbox reference.  Intended for logging
     * and debugging, i.e. telling whether two currently executing Tids are
     * the same.  A Tid created in the future may produce the same text as
     * one that has already terminated.
     */
    void toString(scope void delegate(const(char)[]) sink)
    {
        import std.format : formattedWrite;

        sink.formattedWrite("Tid(%x)", cast(void*) mbox);
    }

}
325 
@system unittest
{
    // text!Tid is @system
    import std.conv : text;

    // A default-initialised Tid has no mailbox and prints as zero.
    Tid blank;
    assert(text(blank) == "Tid(0)");

    // The current thread's Tid has a mailbox and therefore a non-zero id.
    auto mine = thisTid;
    assert(text(mine) != "Tid(0)");

    // A copy prints the same text as its source.
    auto mineCopy = mine;
    assert(text(mine) == text(mineCopy));
}
337 
338 /**
339  * Returns: The $(LREF Tid) of the caller's thread.
340  */
@property Tid thisTid() @safe
{
    // TODO: remove when concurrency is safe
    static auto trus() @trusted
    {
        // Create the mailbox the first time this thread asks for its Tid.
        if (thisInfo.ident == Tid.init)
            thisInfo.ident = Tid(new MessageBox);
        return thisInfo.ident;
    }

    return trus();
}
354 
355 /**
356  * Return the Tid of the thread which spawned the caller's thread.
357  *
358  * Throws: A $(D TidMissingException) exception if
359  * there is no owner thread.
360  */
@property Tid ownerTid()
{
    import std.exception : enforce;

    // An owner without a mailbox means this thread was never given one.
    auto parent = thisInfo.owner;
    enforce!TidMissingException(parent.mbox !is null, "Error: Thread has no owner thread.");
    return parent;
}
368 
@system unittest
{
    import std.exception : assertThrown;

    // Child: expects a greeting and answers its owner through ownerTid.
    static void fun()
    {
        string greeting = receiveOnly!string();
        assert(greeting == "Main calling");
        ownerTid.send("Child responding");
    }

    // This thread has no owner, so ownerTid must throw.
    assertThrown!TidMissingException(ownerTid);

    auto child = spawn(&fun);
    child.send("Main calling");

    string reply = receiveOnly!string();
    assert(reply == "Child responding");
}
386 
387 // Thread Creation
388 
// True when F can be started by spawn with arguments of types T: F must be
// callable, return void, accept T (allowing implicit conversions), and be
// either a function pointer or free of unshared aliasing.
private template isSpawnable(F, T...)
{
    // True when F2's parameters convert implicitly, position by position
    // starting at index i, to F1's parameters.
    template isParamsImplicitlyConvertible(F1, F2, int i = 0)
    {
        alias targetParams = Parameters!F1;
        alias sourceParams = Parameters!F2;

        static if (targetParams.length != sourceParams.length)
        {
            enum isParamsImplicitlyConvertible = false;
        }
        else static if (i == targetParams.length)
        {
            // Every position has been checked.
            enum isParamsImplicitlyConvertible = true;
        }
        else static if (!isImplicitlyConvertible!(sourceParams[i], targetParams[i]))
        {
            enum isParamsImplicitlyConvertible = false;
        }
        else
        {
            enum isParamsImplicitlyConvertible =
                isParamsImplicitlyConvertible!(F1, F2, i + 1);
        }
    }

    enum isSpawnable = isCallable!F
            && is(ReturnType!F == void)
            && isParamsImplicitlyConvertible!(F, void function(T))
            && (isFunctionPointer!F || !hasUnsharedAliasing!F);
}
410 
411 /**
412  * Starts fn(args) in a new logical thread.
413  *
414  * Executes the supplied function in a new logical thread represented by
415  * $(D Tid).  The calling thread is designated as the owner of the new thread.
416  * When the owner thread terminates an $(D OwnerTerminated) message will be
417  * sent to the new thread, causing an $(D OwnerTerminated) exception to be
418  * thrown on $(D receive()).
419  *
420  * Params:
421  *  fn   = The function to execute.
422  *  args = Arguments to the function.
423  *
424  * Returns:
425  *  A Tid representing the new logical thread.
426  *
427  * Notes:
428  *  $(D args) must not have unshared aliasing.  In other words, all arguments
429  *  to $(D fn) must either be $(D shared) or $(D immutable) or have no
430  *  pointer indirection.  This is necessary for enforcing isolation among
431  *  threads.
432  *
433  * Example:
434  * ---
435  * import std.stdio, std.concurrency;
436  *
437  * void f1(string str)
438  * {
439  *     writeln(str);
440  * }
441  *
442  * void f2(char[] str)
443  * {
444  *     writeln(str);
445  * }
446  *
447  * void main()
448  * {
449  *     auto str = "Hello, world";
450  *
451  *     // Works:  string is immutable.
452  *     auto tid1 = spawn(&f1, str);
453  *
454  *     // Fails:  char[] has mutable aliasing.
455  *     auto tid2 = spawn(&f2, str.dup);
456  *
457  *     // New thread with anonymous function
458  *     spawn({ writeln("This is so great!"); });
459  * }
460  * ---
461  */
Tid spawn(F, T...)(F fn, T args) if (isSpawnable!(F, T))
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");

    // Not linked: only the owner relationship is established.
    enum bool linked = false;
    return _spawn(linked, fn, args);
}
467 
468 /**
469  * Starts fn(args) in a logical thread and will receive a LinkTerminated
470  * message when the operation terminates.
471  *
472  * Executes the supplied function in a new logical thread represented by
473  * Tid.  This new thread is linked to the calling thread so that if either
474  * it or the calling thread terminates a LinkTerminated message will be sent
475  * to the other, causing a LinkTerminated exception to be thrown on receive().
476  * The owner relationship from spawn() is preserved as well, so if the link
477  * between threads is broken, owner termination will still result in an
478  * OwnerTerminated exception to be thrown on receive().
479  *
480  * Params:
481  *  fn   = The function to execute.
482  *  args = Arguments to the function.
483  *
484  * Returns:
485  *  A Tid representing the new thread.
486  */
Tid spawnLinked(F, T...)(F fn, T args) if (isSpawnable!(F, T))
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");

    // Same as spawn, but the new thread is recorded as linked.
    enum bool linked = true;
    return _spawn(linked, fn, args);
}
492 
493 /*
494  *
495  */
private Tid _spawn(F, T...)(bool linked, F fn, T args) if (isSpawnable!(F, T))
{
    // TODO: MessageList and &exec should be shared.
    auto childTid = Tid(new MessageBox);
    auto parentTid = thisTid;

    // Entry point of the new logical thread: install its identity and
    // owner, then run the user's function.
    void exec()
    {
        thisInfo.ident = childTid;
        thisInfo.owner = parentTid;
        fn(args);
    }

    // TODO: MessageList and &exec should be shared.
    if (scheduler is null)
    {
        // No scheduler installed: start a new Thread for this spawn.
        auto worker = new Thread(&exec);
        worker.start();
    }
    else
    {
        scheduler.spawn(&exec);
    }

    // Record the child in the caller's link table together with its flag.
    thisInfo.links[childTid] = linked;
    return childTid;
}
520 
@system unittest
{
    // Function pointers: the argument list must match the parameter list.
    void function() fn1;
    void function(int) fn2;
    static assert(__traits(compiles, spawn(fn1)));
    static assert(__traits(compiles, spawn(fn2, 2)));
    static assert(!__traits(compiles, spawn(fn1, 1)));
    static assert(!__traits(compiles, spawn(fn2)));

    // Delegates: each declaration is followed by the expected verdict.
    void delegate(int) shared dg1;
    static assert(__traits(compiles, spawn(dg1, 1)));

    shared(void delegate(int)) dg2;
    static assert(__traits(compiles, spawn(dg2, 2)));

    shared(void delegate(long) shared) dg3;
    static assert(__traits(compiles, spawn(dg3, 3)));

    shared(void delegate(real, int, long) shared) dg4;
    static assert(__traits(compiles, spawn(dg4, 4, 4, 4)));

    void delegate(int) immutable dg5;
    static assert(__traits(compiles, spawn(dg5, 5)));

    void delegate(int) dg6;
    static assert(!__traits(compiles, spawn(dg6, 6)));

    // Class instances with opCall: each pairing of object qualifier and
    // opCall qualifier is followed by the expected verdict.
    auto callable1  = new class{ void opCall(int) shared {} };
    static assert(!__traits(compiles, spawn(callable1,  1)));

    auto callable2  = cast(shared) new class{ void opCall(int) shared {} };
    static assert( __traits(compiles, spawn(callable2,  2)));

    auto callable3  = new class{ void opCall(int) immutable {} };
    static assert(!__traits(compiles, spawn(callable3,  3)));

    auto callable4  = cast(immutable) new class{ void opCall(int) immutable {} };
    static assert( __traits(compiles, spawn(callable4,  4)));

    auto callable5  = new class{ void opCall(int) {} };
    static assert(!__traits(compiles, spawn(callable5,  5)));

    auto callable6  = cast(shared) new class{ void opCall(int) immutable {} };
    static assert(!__traits(compiles, spawn(callable6,  6)));

    auto callable7  = cast(immutable) new class{ void opCall(int) shared {} };
    static assert(!__traits(compiles, spawn(callable7,  7)));

    auto callable8  = cast(shared) new class{ void opCall(int) const shared {} };
    static assert( __traits(compiles, spawn(callable8,  8)));

    auto callable9  = cast(const shared) new class{ void opCall(int) shared {} };
    static assert(!__traits(compiles, spawn(callable9,  9)));

    auto callable10 = cast(const shared) new class{ void opCall(int) const shared {} };
    static assert( __traits(compiles, spawn(callable10, 10)));

    auto callable11 = cast(immutable) new class{ void opCall(int) const shared {} };
    static assert( __traits(compiles, spawn(callable11, 11)));
}
566 
567 /**
568  * Places the values as a message at the back of tid's message queue.
569  *
570  * Sends the supplied value to the thread represented by tid.  As with
571  * $(REF spawn, std,concurrency), $(D T) must not have unshared aliasing.
572  */
void send(T...)(Tid tid, T vals)
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");

    // Ordinary messages travel as MsgType.standard.
    _send(MsgType.standard, tid, vals);
}
578 
579 /**
580  * Places the values as a message on the front of tid's message queue.
581  *
582  * Send a message to $(D tid) but place it at the front of $(D tid)'s message
583  * queue instead of at the back.  This function is typically used for
584  * out-of-band communication, to signal exceptional conditions, etc.
585  */
void prioritySend(T...)(Tid tid, T vals)
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");

    // Same path as send, but tagged MsgType.priority.
    enum kind = MsgType.priority;
    _send(kind, tid, vals);
}
591 
592 /*
593  * ditto
594  */
// Unchecked send: forwards `vals` to `tid` as a MsgType.standard message
// without the hasLocalAliasing check that send() performs.
private void _send(T...)(Tid tid, T vals)
{
    enum kind = MsgType.standard;
    _send(kind, tid, vals);
}
599 
600 /*
601  * Implementation of send.  This allows parameter checking to be different for
602  * both Tid.send() and .send().
603  */
// Wraps `vals` in a Message of the given type and puts it into tid's mailbox.
private void _send(T...)(MsgType type, Tid tid, T vals)
{
    auto envelope = Message(type, vals);
    tid.mbox.put(envelope);
}
609 
610 /**
611  * Receives a message from another thread.
612  *
613  * Receive a message from another thread, or block if no messages of the
614  * specified types are available.  This function works by pattern matching
615  * a message against a set of delegates and executing the first match found.
616  *
617  * If a delegate that accepts a $(REF Variant, std,variant) is included as
618  * the last argument to $(D receive), it will match any message that was not
619  * matched by an earlier delegate.  If more than one argument is sent,
620  * the $(D Variant) will contain a $(REF Tuple, std,typecons) of all values
621  * sent.
622  *
623  * Example:
624  * ---
625  * import std.stdio;
626  * import std.variant;
627  * import std.concurrency;
628  *
629  * void spawnedFunction()
630  * {
631  *     receive(
632  *         (int i) { writeln("Received an int."); },
633  *         (float f) { writeln("Received a float."); },
634  *         (Variant v) { writeln("Received some other type."); }
635  *     );
636  * }
637  *
638  * void main()
639  * {
640  *      auto tid = spawn(&spawnedFunction);
641  *      send(tid, 42);
642  * }
643  * ---
644  */
void receive(T...)(T ops)
in
{
    // The calling thread must already own a mailbox.
    assert(thisInfo.ident.mbox !is null,
           "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
}
body
{
    // Reject handler lists in which one handler hides a later one.
    checkops(ops);

    thisInfo.ident.mbox.get(ops);
}
658 
659 
@safe unittest
{
    // A Variant handler is accepted alone or in last position.
    static assert(__traits(compiles, {
        receive((Variant x) {});
        receive((int x) {}, (Variant x) {});
    }));

    // A Variant handler ahead of another handler is rejected.
    static assert(!__traits(compiles, {
        receive((Variant x) {}, (int x) {});
    }));

    // Two handlers with identical parameter lists are rejected.
    static assert(!__traits(compiles, {
        receive((int x) {}, (int x) {});
    }));
}
678 
679 // Make sure receive() works with free functions as well.
version (unittest)
{
    // Free function used as a receive() handler in the unittest that follows.
    private void receiveFunction(int x)
    {
    }
}
@safe unittest
{
    // A pointer to a free function is accepted, alone or with a Variant handler.
    static assert(__traits(compiles, {
        receive(&receiveFunction);
        receive(&receiveFunction, (Variant x) {});
    }));
}
692 
693 
// Return type of receiveOnly: T[0] for a single type, Tuple!(T) otherwise.
private template receiveOnlyRet(T...)
{
    static if (T.length != 1)
    {
        import std.typecons : Tuple;

        alias receiveOnlyRet = Tuple!(T);
    }
    else
    {
        alias receiveOnlyRet = T[0];
    }
}
706 
707 /**
708  * Receives only messages with arguments of types $(D T).
709  *
710  * Throws:  $(D MessageMismatch) if a message of types other than $(D T)
711  *          is received.
712  *
713  * Returns: The received message.  If $(D T.length) is greater than one,
714  *          the message will be packed into a $(REF Tuple, std,typecons).
715  *
716  * Example:
717  * ---
718  * import std.concurrency;
719  *
720  * void spawnedFunc()
721  * {
722  *     auto msg = receiveOnly!(int, string)();
723  *     assert(msg[0] == 42);
724  *     assert(msg[1] == "42");
725  * }
726  *
727  * void main()
728  * {
729  *     auto tid = spawn(&spawnedFunc);
730  *     send(tid, 42, "42");
731  * }
732  * ---
733  */
receiveOnlyRet!(T) receiveOnly(T...)()
in
{
    // The calling thread must already own a mailbox.
    assert(thisInfo.ident.mbox !is null,
        "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
}
body
{
    import std.format : format;
    import std.typecons : Tuple;

    Tuple!(T) payload;

    thisInfo.ident.mbox.get(
        // The expected message: store its values.
        (T val) {
            static if (T.length)
                payload.field = val;
        },
        // Termination notices are rethrown to the caller.
        (LinkTerminated e) { throw e; },
        (OwnerTerminated e) { throw e; },
        // Anything else is a mismatch.
        (Variant val) {
            static if (T.length > 1)
                string expected = T.stringof;
            else
                string expected = T[0].stringof;

            throw new MessageMismatch(
                format("Unexpected message type: expected '%s', got '%s'", expected, val.type.toString()));
        });

    // A single expected type is returned bare, several as a Tuple.
    static if (T.length == 1)
        return payload[0];
    else
        return payload;
}
767 
@system unittest
{
    // Child: expects a string; reports "" on success or the text of
    // whatever was thrown.
    static void t1(Tid mainTid)
    {
        try
        {
            receiveOnly!string();
            mainTid.send("");
        }
        catch (Throwable caught)
        {
            mainTid.send(caught.msg);
        }
    }

    auto child = spawn(&t1, thisTid);

    // Send an int where a string is expected.
    child.send(1);

    string outcome = receiveOnly!string();
    assert(outcome == "Unexpected message type: expected 'string', got 'int'");
}
788 
789 /**
790  * Tries to receive but will give up if no matches arrive within duration.
791  * Won't wait at all if provided $(REF Duration, core,time) is negative.
792  *
793  * Same as $(D receive) except that rather than wait forever for a message,
794  * it waits until either it receives a message or the given
795  * $(REF Duration, core,time) has passed. It returns $(D true) if it received a
796  * message and $(D false) if it timed out waiting for one.
797  */
bool receiveTimeout(T...)(Duration duration, T ops)
in
{
    // The calling thread must already own a mailbox.
    assert(thisInfo.ident.mbox !is null,
        "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
}
body
{
    // Reject handler lists in which one handler hides a later one.
    checkops(ops);

    // Pass the mailbox's result straight back to the caller.
    auto gotMessage = thisInfo.ident.mbox.get(duration, ops);
    return gotMessage;
}
810 
@safe unittest
{
    // A Variant handler is accepted alone or in last position.
    static assert(__traits(compiles,
    {
        receiveTimeout(msecs(0), (Variant x) {});
        receiveTimeout(msecs(0), (int x) {}, (Variant x) {});
    }));

    // A Variant handler ahead of another handler is rejected.
    static assert(!__traits(compiles,
    {
        receiveTimeout(msecs(0), (Variant x) {}, (int x) {});
    }));

    // Two handlers with identical parameter lists are rejected.
    static assert(!__traits(compiles,
    {
        receiveTimeout(msecs(0), (int x) {}, (int x) {});
    }));

    // A non-zero timeout compiles as well.
    static assert(__traits(compiles,
    {
        receiveTimeout(msecs(10), (int x) {}, (Variant x) {});
    }));
}
830 
831 // MessageBox Limits
832 
833 /**
834  * These behaviors may be specified when a mailbox is full.
835  */
enum OnCrowding
{
    /// Wait until room is available.
    block,

    /// Throw a MailboxFull exception.
    throwException,

    /// Abort the send and return.
    ignore
}
842 
private
{
    // Handler installed for OnCrowding.block: always returns true.
    bool onCrowdingBlock(Tid tid) @safe pure nothrow @nogc
    {
        return true;
    }

    // Handler installed for OnCrowding.throwException: never returns
    // normally, always throws MailboxFull carrying `tid`.
    bool onCrowdingThrow(Tid tid) @safe pure
    {
        throw new MailboxFull(tid);
    }

    // Handler installed for OnCrowding.ignore: always returns false.
    bool onCrowdingIgnore(Tid tid) @safe pure nothrow @nogc
    {
        return false;
    }
}
860 
861 /**
862  * Sets a maximum mailbox size.
863  *
864  * Sets a limit on the maximum number of user messages allowed in the mailbox.
865  * If this limit is reached, the caller attempting to add a new message will
866  * execute the behavior specified by doThis.  If messages is zero, the mailbox
867  * is unbounded.
868  *
869  * Params:
870  *  tid      = The Tid of the thread for which this limit should be set.
871  *  messages = The maximum number of messages or zero if no limit.
872  *  doThis   = The behavior executed when a message is sent to a full
873  *             mailbox.
874  */
void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis) @safe pure
{
    // Translate the enum into the matching built-in handler, then install
    // it together with the limit in a single call.
    bool function(Tid) handler;

    final switch (doThis)
    {
    case OnCrowding.block:
        handler = &onCrowdingBlock;
        break;
    case OnCrowding.throwException:
        handler = &onCrowdingThrow;
        break;
    case OnCrowding.ignore:
        handler = &onCrowdingIgnore;
        break;
    }

    tid.mbox.setMaxMsgs(messages, handler);
}
887 
888 /**
889  * Sets a maximum mailbox size.
890  *
891  * Sets a limit on the maximum number of user messages allowed in the mailbox.
892  * If this limit is reached, the caller attempting to add a new message will
893  * execute onCrowdingDoThis.  If messages is zero, the mailbox is unbounded.
894  *
895  * Params:
896  *  tid      = The Tid of the thread for which this limit should be set.
897  *  messages = The maximum number of messages or zero if no limit.
898  *  onCrowdingDoThis = The routine called when a message is sent to a full
899  *                     mailbox.
900  */
void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis)
{
    // Install the caller-supplied handler directly on tid's mailbox.
    auto box = tid.mbox;
    box.setMaxMsgs(messages, onCrowdingDoThis);
}
905 
// Process-wide name registry.  The functions in this module that touch these
// tables do so inside `synchronized (registryLock)`.
private
{
    // Registered name -> Tid.
    __gshared Tid[string] tidByName;

    // Tid -> every name registered for it.
    __gshared string[][Tid] namesByTid;
}
911 
// Returns the process-wide Mutex guarding the name registry.  Construction
// of the mutex is delegated to initOnce.
private @property Mutex registryLock()
{
    __gshared Mutex impl;

    initOnce!impl(new Mutex);
    return impl;
}
918 
// Removes every registry entry that belongs to the calling thread.
private void unregisterMe()
{
    auto me = thisInfo.ident;

    // A thread that never obtained a Tid cannot have registered anything.
    if (me == Tid.init)
        return;

    synchronized (registryLock)
    {
        auto allNames = me in namesByTid;
        if (allNames is null)
            return;

        // Drop each name first, then the per-Tid list itself.
        foreach (name; *allNames)
            tidByName.remove(name);
        namesByTid.remove(me);
    }
}
935 
936 /**
937  * Associates name with tid.
938  *
939  * Associates name with tid in a process-local map.  When the thread
940  * represented by tid terminates, any names associated with it will be
941  * automatically unregistered.
942  *
943  * Params:
944  *  name = The name to associate with tid.
 *  tid  = The Tid to register under name.
946  *
947  * Returns:
948  *  true if the name is available and tid is not known to represent a
949  *  defunct thread.
950  */
bool register(string name, Tid tid)
{
    // A Tid without a mailbox (e.g. Tid.init, which is what locate()
    // returns for an unknown name) does not represent a thread.  Refuse it
    // here rather than dereferencing a null mailbox below.
    if (tid.mbox is null)
        return false;

    synchronized (registryLock)
    {
        // The name must not already be taken.
        if (name in tidByName)
            return false;

        // A closed mailbox marks a thread already known to be defunct.
        if (tid.mbox.isClosed)
            return false;

        // Keep both directions of the mapping in step.
        namesByTid[tid] ~= name;
        tidByName[name] = tid;
        return true;
    }
}
964 
965 /**
966  * Removes the registered name associated with a tid.
967  *
968  * Params:
969  *  name = The name to unregister.
970  *
971  * Returns:
972  *  true if the name is registered, false if not.
973  */
bool unregister(string name)
{
    import std.algorithm.mutation : remove, SwapStrategy;
    import std.algorithm.searching : countUntil;

    synchronized (registryLock)
    {
        if (auto tid = name in tidByName)
        {
            // Drop the name from the owning Tid's list.  `remove` returns
            // the shortened slice and leaves the original length alone, so
            // the result must be stored back.  Without that the list keeps
            // a stale name, and unregisterMe() would later erase that name
            // from tidByName even if another thread had registered it in
            // the meantime.
            if (auto allNames = *tid in namesByTid)
            {
                auto pos = countUntil(*allNames, name);
                if (pos >= 0)
                    *allNames = remove!(SwapStrategy.unstable)(*allNames, pos);
            }

            tidByName.remove(name);
            return true;
        }
        return false;
    }
}
992 
993 /**
994  * Gets the Tid associated with name.
995  *
996  * Params:
997  *  name = The name to locate within the registry.
998  *
999  * Returns:
1000  *  The associated Tid or Tid.init if name is not registered.
1001  */
Tid locate(string name)
{
    synchronized (registryLock)
    {
        // `in` yields a pointer to the stored Tid, or null when absent.
        auto found = name in tidByName;
        if (found is null)
            return Tid.init;
        return *found;
    }
}
1011 
1012 /**
1013  * Encapsulates all implementation-level data needed for scheduling.
1014  *
1015  * When defining a Scheduler, an instance of this struct must be associated
1016  * with each logical thread.  It contains all implementation-level information
1017  * needed by the internal API.
1018  */
struct ThreadInfo
{
    Tid ident;
    bool[Tid] links;
    Tid owner;

    /**
     * Gets a thread-local instance of ThreadInfo.
     *
     * The returned instance is a function-local static and is handed out by
     * reference.  It should be used as the default instance when info is
     * requested for a thread that was not created by the Scheduler.
     */
    static @property ref thisInfo() nothrow
    {
        static ThreadInfo perThreadInstance;
        return perThreadInstance;
    }

    /**
     * Cleans up this ThreadInfo.
     *
     * This must be called when a scheduled thread terminates.  It closes the
     * thread's mailbox (if it has one), sends a linkDead message to every
     * linked thread and to the owner, and finally removes the thread's
     * registry entries.
     */
    void cleanup()
    {
        if (ident.mbox !is null)
        {
            ident.mbox.close();
        }

        // Iterate over an array of the keys rather than the associative
        // array itself.
        foreach (linked; links.keys)
        {
            _send(MsgType.linkDead, linked, ident);
        }

        if (owner != Tid.init)
        {
            _send(MsgType.linkDead, owner, ident);
        }

        unregisterMe(); // clean up registry entries
    }
}
1056 
1057 /**
1058  * A Scheduler controls how threading is performed by spawn.
1059  *
1060  * Implementing a Scheduler allows the concurrency mechanism used by this
1061  * module to be customized according to different needs.  By default, a call
1062  * to spawn will create a new kernel thread that executes the supplied routine
1063  * and terminates when finished.  But it is possible to create Schedulers that
1064  * reuse threads, that multiplex Fibers (coroutines) across a single thread,
1065  * or any number of other approaches.  By making the choice of Scheduler a
1066  * user-level option, std.concurrency may be used for far more types of
1067  * application than if this behavior were predefined.
1068  *
1069  * Example:
1070  * ---
1071  * import std.concurrency;
1072  * import std.stdio;
1073  *
1074  * void main()
1075  * {
1076  *     scheduler = new FiberScheduler;
1077  *     scheduler.start(
1078  *     {
1079  *         writeln("the rest of main goes here");
1080  *     });
1081  * }
1082  * ---
1083  *
1084  * Some schedulers have a dispatching loop that must run if they are to work
1085  * properly, so for the sake of consistency, when using a scheduler, start()
1086  * must be called within main().  This yields control to the scheduler and
1087  * will ensure that any spawned threads are executed in an expected manner.
1088  */
interface Scheduler
{
    /**
     * Spawns the supplied op and starts the Scheduler.
     *
     * This is intended to be called at the start of the program to yield all
     * scheduling to the active Scheduler instance.  This is necessary for
     * schedulers that explicitly dispatch threads rather than simply relying
     * on the operating system to do so, and so start should always be called
     * within main() to begin normal program execution.
     *
     * Params:
     *  op = A wrapper for whatever the main thread would have done in the
     *       absence of a custom scheduler.  It will be automatically executed
     *       via a call to spawn by the Scheduler.
     */
    void start(void delegate() op);

    /**
     * Assigns a logical thread to execute the supplied op.
     *
     * This routine is called by spawn.  It is expected to instantiate a new
     * logical thread and run the supplied operation.  This thread must call
     * thisInfo.cleanup() when the thread terminates if the scheduled thread
     * is not a kernel thread--all kernel threads will have their ThreadInfo
     * cleaned up automatically by a thread-local destructor.
     *
     * Params:
     *  op = The function to execute.  This may be the actual function passed
     *       by the user to spawn itself, or may be a wrapper function.
     */
    void spawn(void delegate() op);

    /**
     * Yields execution to another logical thread.
     *
     * This routine is called at various points within concurrency-aware APIs
     * to provide a scheduler a chance to yield execution when using some sort
     * of cooperative multithreading model.  If this is not appropriate, such
     * as when each logical thread is backed by a dedicated kernel thread,
     * this routine may be a no-op.
     */
    void yield() nothrow;

    /**
     * Returns an appropriate ThreadInfo instance.
     *
     * Returns an instance of ThreadInfo specific to the logical thread that
     * is calling this routine or, if the calling thread was not created by
     * this scheduler, returns ThreadInfo.thisInfo instead.
     */
    @property ref ThreadInfo thisInfo() nothrow;

    /**
     * Creates a Condition variable analog for signaling.
     *
     * Creates a new Condition variable analog which is used to check for and
     * to signal the addition of messages to a thread's message queue.  Like
     * yield, some schedulers may need to define custom behavior so that calls
     * to Condition.wait() yield to another thread when no new messages are
     * available instead of blocking.
     *
     * Params:
     *  m = The Mutex that will be associated with this condition.  It will be
     *      locked prior to any operation on the condition, and so in some
     *      cases a Scheduler may need to hold this reference and unlock the
     *      mutex before yielding execution to another logical thread.
     *
     * Returns:
     *  The Condition (or Condition subclass) to be used for signaling.
     */
    Condition newCondition(Mutex m) nothrow;
}
1159 
1160 /**
1161  * An example Scheduler using kernel threads.
1162  *
1163  * This is an example Scheduler that mirrors the default scheduling behavior
1164  * of creating one kernel thread per call to spawn.  It is fully functional
1165  * and may be instantiated and used, but is not a necessary part of the
1166  * default functioning of this module.
1167  */
class ThreadScheduler : Scheduler
{
    /**
     * Runs op directly on the calling thread; this approach needs no
     * dispatch loop.
     */
    void start(void delegate() op)
    {
        op();
    }

    /**
     * Creates a new kernel thread for op and starts it.
     */
    void spawn(void delegate() op)
    {
        (new Thread(op)).start();
    }

    /**
     * Does nothing: this scheduler performs no explicit multiplexing.
     */
    void yield() nothrow
    {
        // no explicit yield needed
    }

    /**
     * Returns ThreadInfo.thisInfo, the thread-local ThreadInfo instance,
     * which is the correct choice when every logical thread is a kernel
     * thread.
     */
    @property ref ThreadInfo thisInfo() nothrow
    {
        return ThreadInfo.thisInfo;
    }

    /**
     * Creates a plain Condition bound to m.  No custom behavior is needed.
     */
    Condition newCondition(Mutex m) nothrow
    {
        return new Condition(m);
    }
}
1213 
1214 /**
1215  * An example Scheduler using Fibers.
1216  *
1217  * This is an example scheduler that creates a new Fiber per call to spawn
1218  * and multiplexes the execution of all fibers within the main thread.
1219  */
class FiberScheduler : Scheduler
{
    /**
     * Creates a new Fiber for the supplied op and then runs the dispatcher,
     * which loops until no scheduled fibers remain.
     */
    void start(void delegate() op)
    {
        create(op);
        dispatch();
    }

    /**
     * Creates a new Fiber for the supplied op, adds it to the dispatch
     * list, and then yields.
     */
    void spawn(void delegate() op) nothrow
    {
        create(op);
        yield();
    }

    /**
     * If the caller is running inside a Fiber, this yields execution so that
     * another scheduled Fiber can run.  Outside of a Fiber it does nothing.
     */
    void yield() nothrow
    {
        // NOTE: It's possible that we should test whether the calling Fiber
        //       is an InfoFiber before yielding, but I think it's reasonable
        //       that any (non-Generator) fiber should yield here.
        if (Fiber.getThis() !is null)
            Fiber.yield();
    }

    /**
     * Returns an appropriate ThreadInfo instance.
     *
     * If the calling Fiber is an InfoFiber (the kind created by this
     * scheduler), its own ThreadInfo is returned; otherwise
     * ThreadInfo.thisInfo is returned.
     */
    @property ref ThreadInfo thisInfo() nothrow
    {
        if (auto infoFiber = cast(InfoFiber) Fiber.getThis())
            return infoFiber.info;
        return ThreadInfo.thisInfo;
    }

    /**
     * Returns a Condition analog that yields when wait or notify is called.
     */
    Condition newCondition(Mutex m) nothrow
    {
        return new FiberCondition(m);
    }

private:
    // A Fiber that carries its own ThreadInfo.
    static class InfoFiber : Fiber
    {
        ThreadInfo info;

        this(void delegate() op) nothrow
        {
            super(op);
        }
    }

    // Condition replacement that hands control to other fibers instead of
    // blocking.
    class FiberCondition : Condition
    {
        this(Mutex m) nothrow
        {
            super(m);
            notified = false;
        }

        // Keeps switching context until a notification has been recorded;
        // the flag is reset when the wait ends.
        override void wait() nothrow
        {
            scope (exit) notified = false;

            while (!notified)
            {
                switchContext();
            }
        }

        // Timed variant: yields until notified or until the period has run
        // out.  Returns whether a notification was seen.
        override bool wait(Duration period) nothrow
        {
            import core.time : MonoTime;

            scope (exit) notified = false;

            auto limit = MonoTime.currTime + period;
            while (!notified && !period.isNegative)
            {
                yield();
                // Recompute the remaining time after every yield.
                period = limit - MonoTime.currTime;
            }
            return notified;
        }

        override void notify() nothrow
        {
            notified = true;
            switchContext();
        }

        override void notifyAll() nothrow
        {
            notified = true;
            switchContext();
        }

    private:
        // Releases the associated mutex for the duration of the yield and
        // re-acquires it afterwards.
        void switchContext() nothrow
        {
            mutex_nothrow.unlock_nothrow();
            scope (exit) mutex_nothrow.lock_nothrow();
            yield();
        }

        bool notified;
    }

    // Round-robin loop over the scheduled fibers.  Terminated fibers are
    // removed from the list; the loop ends when the list is empty.
    void dispatch()
    {
        import std.algorithm.mutation : remove;

        while (m_fibers.length > 0)
        {
            auto thrown = m_fibers[m_pos].call(Fiber.Rethrow.no);
            // An OwnerTerminated escaping a fiber is ignored here; anything
            // else is propagated to the caller of dispatch.
            if (thrown !is null && cast(OwnerTerminated) thrown is null)
                throw thrown;

            if (m_fibers[m_pos].state == Fiber.State.TERM)
            {
                // After removal m_pos already refers to the next fiber;
                // wrap around if the removed fiber was the last one.
                m_fibers = remove(m_fibers, m_pos);
                if (m_pos >= m_fibers.length)
                    m_pos = 0;
            }
            else
            {
                // Advance to the next fiber, wrapping at the end.
                immutable wasLast = m_pos >= m_fibers.length - 1;
                ++m_pos;
                if (wasLast)
                    m_pos = 0;
            }
        }
    }

    // Wraps op so that the fiber's ThreadInfo is cleaned up when op ends,
    // and appends the resulting fiber to the dispatch list.
    void create(void delegate() op) nothrow
    {
        void runThenCleanup()
        {
            scope (exit) thisInfo.cleanup();
            op();
        }

        m_fibers ~= new InfoFiber(&runThenCleanup);
    }

    Fiber[] m_fibers;
    size_t m_pos;
}
1386 
// Exercises the Condition returned by FiberScheduler.newCondition using two
// hand-driven fibers: one that waits and one that notifies.
@system unittest
{
    // Waits on cond forever, counting each time wait() returns.
    static void receive(Condition cond, ref size_t received)
    {
        while (true)
        {
            synchronized (cond.mutex)
            {
                cond.wait();
                ++received;
            }
        }
    }

    // Notifies cond forever, counting each notification before it is sent.
    static void send(Condition cond, ref size_t sent)
    {
        while (true)
        {
            synchronized (cond.mutex)
            {
                ++sent;
                cond.notify();
            }
        }
    }

    auto fs = new FiberScheduler;
    auto mtx = new Mutex;
    auto cond = fs.newCondition(mtx);

    size_t received, sent;
    auto waiter = new Fiber({ receive(cond, received); }), notifier = new Fiber({ send(cond, sent); });
    // The waiter has not been notified yet, so it yields without counting.
    waiter.call();
    assert(received == 0);
    // The notifier records one send and yields from inside notify().
    notifier.call();
    assert(sent == 1);
    assert(received == 0);
    // Resuming the waiter lets it observe the notification exactly once.
    waiter.call();
    assert(received == 1);
    // Without a new notification a further resume must not count again.
    waiter.call();
    assert(received == 1);
}
1429 
/**
 * Sets the Scheduler behavior within the program.
 *
 * This variable sets the Scheduler behavior within this program.  When it is
 * null, the module's default behavior is used.  Typically, when setting a
 * Scheduler, scheduler.start() should be called in main; that call will not
 * return until program execution is complete.
 */
__gshared Scheduler scheduler;
1438 
1439 // Generator
1440 
1441 /**
1442  * If the caller is a Fiber and is not a Generator, this function will call
1443  * scheduler.yield() or Fiber.yield(), as appropriate.
1444  */
void yield() nothrow
{
    auto current = Fiber.getThis();

    // A running Generator must never be yielded from here.
    if (cast(IsGenerator) current)
        return;

    if (scheduler !is null)
    {
        // Let the installed scheduler decide how to yield.
        scheduler.yield();
    }
    else if (current)
    {
        // No scheduler installed: yield the plain fiber directly.
        Fiber.yield();
    }
}
1459 
/// Used to determine whether a Generator is running.  This is an empty marker
/// interface: Generator implements it, and the module-level yield() casts the
/// current Fiber to it to decide whether yielding is allowed.
private interface IsGenerator {}
1462 
1463 
1464 /**
1465  * A Generator is a Fiber that periodically returns values of type T to the
1466  * caller via yield.  This is represented as an InputRange.
1467  *
1468  * Example:
1469  * ---
1470  * import std.concurrency;
1471  * import std.stdio;
1472  *
1473  *
1474  * void main()
1475  * {
1476  *     auto tid = spawn(
1477  *     {
1478  *         while (true)
1479  *         {
1480  *             writeln(receiveOnly!int());
1481  *         }
1482  *     });
1483  *
1484  *     auto r = new Generator!int(
1485  *     {
1486  *         foreach (i; 1 .. 10)
1487  *             yield(i);
1488  *     });
1489  *
1490  *     foreach (e; r)
1491  *     {
1492  *         tid.send(e);
1493  *     }
1494  * }
1495  * ---
1496  */
class Generator(T) :
    Fiber, IsGenerator, InputRange!T
{
    /**
     * Creates a generator backed by a static D function.  The function is
     * called once during construction to prepare the range for iteration.
     *
     * Params:
     *  fn = The fiber function.
     *
     * In:
     *  fn must not be null.
     */
    this(void function() fn)
    {
        super(fn);
        call();
    }

    /**
     * Creates a generator backed by a static D function, using a fiber
     * stack of the given size.  The function is called once during
     * construction to prepare the range for iteration.
     *
     * Params:
     *  fn = The fiber function.
     *  sz = The stack size for this fiber.
     *
     * In:
     *  fn must not be null.
     */
    this(void function() fn, size_t sz)
    {
        super(fn, sz);
        call();
    }

    /**
     * Creates a generator backed by a delegate.  The delegate is called
     * once during construction to prepare the range for iteration.
     *
     * Params:
     *  dg = The fiber function.
     *
     * In:
     *  dg must not be null.
     */
    this(void delegate() dg)
    {
        super(dg);
        call();
    }

    /**
     * Creates a generator backed by a delegate, using a fiber stack of the
     * given size.  The delegate is called once during construction to
     * prepare the range for iteration.
     *
     * Params:
     *  dg = The fiber function.
     *  sz = The stack size for this fiber.
     *
     * In:
     *  dg must not be null.
     */
    this(void delegate() dg, size_t sz)
    {
        super(dg, sz);
        call();
    }

    /**
     * Returns true if the generator is empty, i.e. no value has been
     * yielded yet or the underlying fiber has terminated.
     */
    final bool empty() @property
    {
        if (m_value is null)
            return true;
        return state == State.TERM;
    }

    /**
     * Resumes the underlying function to obtain the next value.
     */
    final void popFront()
    {
        call();
    }

    /**
     * Returns the most recently generated value by shallow copy.
     */
    final T front() @property
    {
        return *m_value;
    }

    /**
     * Returns the most recently generated value without executing a
     * copy constructor. Will not compile for element types defining a
     * postblit, because Generator does not return by reference.
     */
    final T moveFront()
    {
        static if (hasElaborateCopyConstructor!T)
        {
            static assert(0,
                    "Fiber front is always rvalue and thus cannot be moved since it defines a postblit.");
        }
        else
        {
            return front;
        }
    }

    /// Iterates over the remaining values, stopping early if loopBody
    /// returns non-zero; that value is returned to the caller.
    final int opApply(scope int delegate(T) loopBody)
    {
        int result;
        while (!empty)
        {
            result = loopBody(front);
            if (result)
                break;
            popFront();
        }
        return result;
    }

    /// As above, additionally passing a zero-based counter to loopBody.
    final int opApply(scope int delegate(size_t, T) loopBody)
    {
        int result;
        size_t index;
        while (!empty)
        {
            result = loopBody(index, front);
            if (result)
                break;
            ++index;
            popFront();
        }
        return result;
    }
private:
    // Address of the value most recently handed to yield(); null until the
    // first value has been yielded.
    T* m_value;
}
1636 
1637 /**
1638  * Yields a value of type T to the caller of the currently executing
1639  * generator.
1640  *
1641  * Params:
1642  *  value = The value to yield.
1643  */
void yield(T)(ref T value)
{
    auto gen = cast(Generator!T) Fiber.getThis();

    // Only a currently executing Generator of matching element type may
    // receive the value.
    if (gen is null || gen.state != Fiber.State.EXEC)
        throw new Exception("yield(T) called with no active generator for the supplied type");

    // Publish the address of the caller's value, then suspend the fiber so
    // the consumer can read it.
    gen.m_value = &value;
    Fiber.yield();
}

/// ditto
void yield(T)(T value)
{
    // `value` is a named parameter (an lvalue) here, so this call is expected
    // to resolve to the `ref` overload above.
    yield(value);
}
1660 
// Runs the same spawn/Generator/send scenario under both example schedulers.
@system unittest
{
    import core.exception;
    import std.exception;

    // Installs s as the global scheduler, runs the scenario inside
    // scheduler.start(), and resets the global scheduler afterwards.
    static void testScheduler(Scheduler s)
    {
        scheduler = s;
        scheduler.start({
            // Receiver: expects the integers 1, 2, 3, ... in order.
            auto tid = spawn({
                int i;

                try
                {
                    for (i = 1; i < 10; i++)
                    {
                        assertNotThrown!AssertError(assert(receiveOnly!int() == i));
                    }
                }
                catch (OwnerTerminated e)
                {
                    // Expected once the sender has finished; only three
                    // values are ever sent.
                }

                // i will advance 1 past the last value expected
                assert(i == 4);
            });

            // Producer: yields exactly 1, 2 and 3.
            auto r = new Generator!int({
                // A double does not match Generator!int, so this must throw.
                assertThrown!Exception(yield(2.0));
                yield(); // ensure this is a no-op
                yield(1);
                yield(); // also once something has been yielded
                yield(2);
                yield(3);
            });

            // Forward every generated value to the receiver.
            foreach (e; r)
            {
                tid.send(e);
            }
        });
        scheduler = null;
    }

    testScheduler(new ThreadScheduler);
    testScheduler(new FiberScheduler);
}
1709 ///
@system unittest
{
    import std.range;

    // Reference behavior: an ordinary range wrapped as an InputRange object.
    InputRange!int myIota = iota(10).inputRangeObject;

    myIota.popFront();
    myIota.popFront();
    assert(myIota.moveFront == 2);
    assert(myIota.front == 2);
    myIota.popFront();
    assert(myIota.front == 3);

    // A Generator can be assigned to std.range.interfaces.InputRange
    // directly and must behave the same way as the range above.
    myIota = new Generator!int(
    {
        foreach (i; 0 .. 10) yield(i);
    });

    myIota.popFront();
    myIota.popFront();
    assert(myIota.moveFront == 2);
    assert(myIota.front == 2);
    myIota.popFront();
    assert(myIota.front == 3);

    // Indexed foreach over the remaining elements (3 .. 9): counter[0]
    // counts the iterations and counter[1] sums the indices 0 .. 6.
    size_t[2] counter = [0, 0];
    foreach (i, unused; myIota) counter[] += [1, i];

    assert(myIota.empty);
    assert(counter == [7, 21]);
}
1742 
1743 private
1744 {
1745     /*
1746      * A MessageBox is a message queue for one thread.  Other threads may send
1747      * messages to this owner by calling put(), and the owner receives them by
1748      * calling get().  The put() call is therefore effectively shared and the
1749      * get() call is effectively local.  setMaxMsgs may be used by any thread
1750      * to limit the size of the message queue.
1751      */
1752     class MessageBox
1753     {
this()1754         this() @trusted nothrow /* TODO: make @safe after relevant druntime PR gets merged */
1755         {
1756             m_lock = new Mutex;
1757             m_closed = false;
1758 
1759             if (scheduler is null)
1760             {
1761                 m_putMsg = new Condition(m_lock);
1762                 m_notFull = new Condition(m_lock);
1763             }
1764             else
1765             {
1766                 m_putMsg = scheduler.newCondition(m_lock);
1767                 m_notFull = scheduler.newCondition(m_lock);
1768             }
1769         }
1770 
1771         ///
isClosed()1772         final @property bool isClosed() @safe @nogc pure
1773         {
1774             synchronized (m_lock)
1775             {
1776                 return m_closed;
1777             }
1778         }
1779 
1780         /*
1781          * Sets a limit on the maximum number of user messages allowed in the
1782          * mailbox.  If this limit is reached, the caller attempting to add
1783          * a new message will execute call.  If num is zero, there is no limit
1784          * on the message queue.
1785          *
1786          * Params:
1787          *  num  = The maximum size of the queue or zero if the queue is
1788          *         unbounded.
1789          *  call = The routine to call when the queue is full.
1790          */
setMaxMsgs(size_t num,bool function (Tid)call)1791         final void setMaxMsgs(size_t num, bool function(Tid) call) @safe @nogc pure
1792         {
1793             synchronized (m_lock)
1794             {
1795                 m_maxMsgs = num;
1796                 m_onMaxMsgs = call;
1797             }
1798         }
1799 
1800         /*
1801          * If maxMsgs is not set, the message is added to the queue and the
1802          * owner is notified.  If the queue is full, the message will still be
1803          * accepted if it is a control message, otherwise onCrowdingDoThis is
1804          * called.  If the routine returns true, this call will block until
1805          * the owner has made space available in the queue.  If it returns
1806          * false, this call will abort.
1807          *
1808          * Params:
1809          *  msg = The message to put in the queue.
1810          *
1811          * Throws:
1812          *  An exception if the queue is full and onCrowdingDoThis throws.
1813          */
put(ref Message msg)1814         final void put(ref Message msg)
1815         {
1816             synchronized (m_lock)
1817             {
1818                 // TODO: Generate an error here if m_closed is true, or maybe
1819                 //       put a message in the caller's queue?
1820                 if (!m_closed)
1821                 {
1822                     while (true)
1823                     {
1824                         if (isPriorityMsg(msg))
1825                         {
1826                             m_sharedPty.put(msg);
1827                             m_putMsg.notify();
1828                             return;
1829                         }
1830                         if (!mboxFull() || isControlMsg(msg))
1831                         {
1832                             m_sharedBox.put(msg);
1833                             m_putMsg.notify();
1834                             return;
1835                         }
1836                         if (m_onMaxMsgs !is null && !m_onMaxMsgs(thisTid))
1837                         {
1838                             return;
1839                         }
1840                         m_putQueue++;
1841                         m_notFull.wait();
1842                         m_putQueue--;
1843                     }
1844                 }
1845             }
1846         }
1847 
1848         /*
1849          * Matches ops against each message in turn until a match is found.
1850          *
1851          * Params:
1852          *  ops = The operations to match.  Each may return a bool to indicate
1853          *        whether a message with a matching type is truly a match.
1854          *
1855          * Returns:
1856          *  true if a message was retrieved and false if not (such as if a
1857          *  timeout occurred).
1858          *
1859          * Throws:
1860          *  LinkTerminated if a linked thread terminated, or OwnerTerminated
1861          * if the owner thread terminates and no existing messages match the
1862          * supplied ops.
1863          */
get(T...)1864         bool get(T...)(scope T vals)
1865         {
1866             import std.meta : AliasSeq;
1867 
1868             static assert(T.length);
1869 
1870             static if (isImplicitlyConvertible!(T[0], Duration))
1871             {
1872                 alias Ops = AliasSeq!(T[1 .. $]);
1873                 alias ops = vals[1 .. $];
1874                 enum timedWait = true;
1875                 Duration period = vals[0];
1876             }
1877             else
1878             {
1879                 alias Ops = AliasSeq!(T);
1880                 alias ops = vals[0 .. $];
1881                 enum timedWait = false;
1882             }
1883 
1884             bool onStandardMsg(ref Message msg)
1885             {
1886                 foreach (i, t; Ops)
1887                 {
1888                     alias Args = Parameters!(t);
1889                     auto op = ops[i];
1890 
1891                     if (msg.convertsTo!(Args))
1892                     {
1893                         static if (is(ReturnType!(t) == bool))
1894                         {
1895                             return msg.map(op);
1896                         }
1897                         else
1898                         {
1899                             msg.map(op);
1900                             return true;
1901                         }
1902                     }
1903                 }
1904                 return false;
1905             }
1906 
1907             bool onLinkDeadMsg(ref Message msg)
1908             {
1909                 assert(msg.convertsTo!(Tid));
1910                 auto tid = msg.get!(Tid);
1911 
1912                 if (bool* pDepends = tid in thisInfo.links)
1913                 {
1914                     auto depends = *pDepends;
1915                     thisInfo.links.remove(tid);
1916                     // Give the owner relationship precedence.
1917                     if (depends && tid != thisInfo.owner)
1918                     {
1919                         auto e = new LinkTerminated(tid);
1920                         auto m = Message(MsgType.standard, e);
1921                         if (onStandardMsg(m))
1922                             return true;
1923                         throw e;
1924                     }
1925                 }
1926                 if (tid == thisInfo.owner)
1927                 {
1928                     thisInfo.owner = Tid.init;
1929                     auto e = new OwnerTerminated(tid);
1930                     auto m = Message(MsgType.standard, e);
1931                     if (onStandardMsg(m))
1932                         return true;
1933                     throw e;
1934                 }
1935                 return false;
1936             }
1937 
1938             bool onControlMsg(ref Message msg)
1939             {
1940                 switch (msg.type)
1941                 {
1942                 case MsgType.linkDead:
1943                     return onLinkDeadMsg(msg);
1944                 default:
1945                     return false;
1946                 }
1947             }
1948 
1949             bool scan(ref ListT list)
1950             {
1951                 for (auto range = list[]; !range.empty;)
1952                 {
1953                     // Only the message handler will throw, so if this occurs
1954                     // we can be certain that the message was handled.
1955                     scope (failure)
1956                         list.removeAt(range);
1957 
1958                     if (isControlMsg(range.front))
1959                     {
1960                         if (onControlMsg(range.front))
1961                         {
1962                             // Although the linkDead message is a control message,
1963                             // it can be handled by the user.  Since the linkDead
1964                             // message throws if not handled, if we get here then
1965                             // it has been handled and we can return from receive.
1966                             // This is a weird special case that will have to be
1967                             // handled in a more general way if more are added.
1968                             if (!isLinkDeadMsg(range.front))
1969                             {
1970                                 list.removeAt(range);
1971                                 continue;
1972                             }
1973                             list.removeAt(range);
1974                             return true;
1975                         }
1976                         range.popFront();
1977                         continue;
1978                     }
1979                     else
1980                     {
1981                         if (onStandardMsg(range.front))
1982                         {
1983                             list.removeAt(range);
1984                             return true;
1985                         }
1986                         range.popFront();
1987                         continue;
1988                     }
1989                 }
1990                 return false;
1991             }
1992 
1993             bool pty(ref ListT list)
1994             {
1995                 if (!list.empty)
1996                 {
1997                     auto range = list[];
1998 
1999                     if (onStandardMsg(range.front))
2000                     {
2001                         list.removeAt(range);
2002                         return true;
2003                     }
2004                     if (range.front.convertsTo!(Throwable))
2005                         throw range.front.get!(Throwable);
2006                     else if (range.front.convertsTo!(shared(Throwable)))
2007                         throw range.front.get!(shared(Throwable));
2008                     else
2009                         throw new PriorityMessageException(range.front.data);
2010                 }
2011                 return false;
2012             }
2013 
2014             static if (timedWait)
2015             {
2016                 import core.time : MonoTime;
2017                 auto limit = MonoTime.currTime + period;
2018             }
2019 
2020             while (true)
2021             {
2022                 ListT arrived;
2023 
2024                 if (pty(m_localPty) || scan(m_localBox))
2025                 {
2026                     return true;
2027                 }
2028                 yield();
2029                 synchronized (m_lock)
2030                 {
2031                     updateMsgCount();
2032                     while (m_sharedPty.empty && m_sharedBox.empty)
2033                     {
2034                         // NOTE: We're notifying all waiters here instead of just
2035                         //       a few because the onCrowding behavior may have
2036                         //       changed and we don't want to block sender threads
2037                         //       unnecessarily if the new behavior is not to block.
2038                         //       This will admittedly result in spurious wakeups
2039                         //       in other situations, but what can you do?
2040                         if (m_putQueue && !mboxFull())
2041                             m_notFull.notifyAll();
2042                         static if (timedWait)
2043                         {
2044                             if (period <= Duration.zero || !m_putMsg.wait(period))
2045                                 return false;
2046                         }
2047                         else
2048                         {
2049                             m_putMsg.wait();
2050                         }
2051                     }
2052                     m_localPty.put(m_sharedPty);
2053                     arrived.put(m_sharedBox);
2054                 }
2055                 if (m_localPty.empty)
2056                 {
2057                     scope (exit) m_localBox.put(arrived);
2058                     if (scan(arrived))
2059                     {
2060                         return true;
2061                     }
2062                     else
2063                     {
2064                         static if (timedWait)
2065                         {
2066                             period = limit - MonoTime.currTime;
2067                         }
2068                         continue;
2069                     }
2070                 }
2071                 m_localBox.put(arrived);
2072                 pty(m_localPty);
2073                 return true;
2074             }
2075         }
2076 
2077         /*
2078          * Called on thread termination.  This routine processes any remaining
2079          * control messages, clears out message queues, and sets a flag to
2080          * reject any future messages.
2081          */
close()2082         final void close()
2083         {
2084             static void onLinkDeadMsg(ref Message msg)
2085             {
2086                 assert(msg.convertsTo!(Tid));
2087                 auto tid = msg.get!(Tid);
2088 
2089                 thisInfo.links.remove(tid);
2090                 if (tid == thisInfo.owner)
2091                     thisInfo.owner = Tid.init;
2092             }
2093 
2094             static void sweep(ref ListT list)
2095             {
2096                 for (auto range = list[]; !range.empty; range.popFront())
2097                 {
2098                     if (range.front.type == MsgType.linkDead)
2099                         onLinkDeadMsg(range.front);
2100                 }
2101             }
2102 
2103             ListT arrived;
2104 
2105             sweep(m_localBox);
2106             synchronized (m_lock)
2107             {
2108                 arrived.put(m_sharedBox);
2109                 m_closed = true;
2110             }
2111             m_localBox.clear();
2112             sweep(arrived);
2113         }
2114 
2115     private:
2116         // Routines involving local data only, no lock needed.
2117 
mboxFull()2118         bool mboxFull() @safe @nogc pure nothrow
2119         {
2120             return m_maxMsgs && m_maxMsgs <= m_localMsgs + m_sharedBox.length;
2121         }
2122 
updateMsgCount()2123         void updateMsgCount() @safe @nogc pure nothrow
2124         {
2125             m_localMsgs = m_localBox.length;
2126         }
2127 
isControlMsg(ref Message msg)2128         bool isControlMsg(ref Message msg) @safe @nogc pure nothrow
2129         {
2130             return msg.type != MsgType.standard && msg.type != MsgType.priority;
2131         }
2132 
isPriorityMsg(ref Message msg)2133         bool isPriorityMsg(ref Message msg) @safe @nogc pure nothrow
2134         {
2135             return msg.type == MsgType.priority;
2136         }
2137 
isLinkDeadMsg(ref Message msg)2138         bool isLinkDeadMsg(ref Message msg) @safe @nogc pure nothrow
2139         {
2140             return msg.type == MsgType.linkDead;
2141         }
2142 
2143         alias OnMaxFn = bool function(Tid);
2144         alias ListT = List!(Message);
2145 
2146         ListT m_localBox;
2147         ListT m_localPty;
2148 
2149         Mutex m_lock;
2150         Condition m_putMsg;
2151         Condition m_notFull;
2152         size_t m_putQueue;
2153         ListT m_sharedBox;
2154         ListT m_sharedPty;
2155         OnMaxFn m_onMaxMsgs;
2156         size_t m_localMsgs;
2157         size_t m_maxMsgs;
2158         bool m_closed;
2159     }
2160 
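    /*
     * A singly linked list with constant-time append, used as the container
     * for the message queues.  Elements are visited through a forward Range,
     * which can also be passed to removeAt() to unlink the current element.
     * Nodes released by removeAt() are recycled through a free list shared by
     * all lists of the same element type.
     */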
List(T)2164     struct List(T)
2165     {
2166         struct Range
2167         {
2168             import std.exception : enforce;
2169 
2170             @property bool empty() const
2171             {
2172                 return !m_prev.next;
2173             }
2174 
2175             @property ref T front()
2176             {
2177                 enforce(m_prev.next, "invalid list node");
2178                 return m_prev.next.val;
2179             }
2180 
2181             @property void front(T val)
2182             {
2183                 enforce(m_prev.next, "invalid list node");
2184                 m_prev.next.val = val;
2185             }
2186 
2187             void popFront()
2188             {
2189                 enforce(m_prev.next, "invalid list node");
2190                 m_prev = m_prev.next;
2191             }
2192 
2193             private this(Node* p)
2194             {
2195                 m_prev = p;
2196             }
2197 
2198             private Node* m_prev;
2199         }
2200 
2201         void put(T val)
2202         {
2203             put(newNode(val));
2204         }
2205 
2206         void put(ref List!(T) rhs)
2207         {
2208             if (!rhs.empty)
2209             {
2210                 put(rhs.m_first);
2211                 while (m_last.next !is null)
2212                 {
2213                     m_last = m_last.next;
2214                     m_count++;
2215                 }
2216                 rhs.m_first = null;
2217                 rhs.m_last = null;
2218                 rhs.m_count = 0;
2219             }
2220         }
2221 
2222         Range opSlice()
2223         {
2224             return Range(cast(Node*)&m_first);
2225         }
2226 
2227         void removeAt(Range r)
2228         {
2229             import std.exception : enforce;
2230 
2231             assert(m_count);
2232             Node* n = r.m_prev;
2233             enforce(n && n.next, "attempting to remove invalid list node");
2234 
2235             if (m_last is m_first)
2236                 m_last = null;
2237             else if (m_last is n.next)
2238                 m_last = n; // nocoverage
2239             Node* to_free = n.next;
2240             n.next = n.next.next;
2241             freeNode(to_free);
2242             m_count--;
2243         }
2244 
2245         @property size_t length()
2246         {
2247             return m_count;
2248         }
2249 
2250         void clear()
2251         {
2252             m_first = m_last = null;
2253             m_count = 0;
2254         }
2255 
2256         @property bool empty()
2257         {
2258             return m_first is null;
2259         }
2260 
2261     private:
2262         struct Node
2263         {
2264             Node* next;
2265             T val;
2266 
2267             this(T v)
2268             {
2269                 val = v;
2270             }
2271         }
2272 
2273         static shared struct SpinLock
2274         {
2275             void lock() { while (!cas(&locked, false, true)) { Thread.yield(); } }
2276             void unlock() { atomicStore!(MemoryOrder.rel)(locked, false); }
2277             bool locked;
2278         }
2279 
2280         static shared SpinLock sm_lock;
2281         static shared Node* sm_head;
2282 
2283         Node* newNode(T v)
2284         {
2285             Node* n;
2286             {
2287                 sm_lock.lock();
2288                 scope (exit) sm_lock.unlock();
2289 
2290                 if (sm_head)
2291                 {
2292                     n = cast(Node*) sm_head;
2293                     sm_head = sm_head.next;
2294                 }
2295             }
2296             if (n)
2297             {
2298                 import std.conv : emplace;
2299                 emplace!Node(n, v);
2300             }
2301             else
2302             {
2303                 n = new Node(v);
2304             }
2305             return n;
2306         }
2307 
2308         void freeNode(Node* n)
2309         {
2310             // destroy val to free any owned GC memory
2311             destroy(n.val);
2312 
2313             sm_lock.lock();
2314             scope (exit) sm_lock.unlock();
2315 
2316             auto sn = cast(shared(Node)*) n;
2317             sn.next = sm_head;
2318             sm_head = sn;
2319         }
2320 
2321         void put(Node* n)
2322         {
2323             m_count++;
2324             if (!empty)
2325             {
2326                 m_last.next = n;
2327                 m_last = n;
2328                 return;
2329             }
2330             m_first = n;
2331             m_last = n;
2332         }
2333 
2334         Node* m_first;
2335         Node* m_last;
2336         size_t m_count;
2337     }
2338 }
2339 
version(unittest)2340 version (unittest)
2341 {
2342     import std.stdio;
2343     import std.typecons : tuple, Tuple;
2344 
2345     void testfn(Tid tid)
2346     {
2347         receive((float val) { assert(0); }, (int val, int val2) {
2348             assert(val == 42 && val2 == 86);
2349         });
2350         receive((Tuple!(int, int) val) { assert(val[0] == 42 && val[1] == 86); });
2351         receive((Variant val) {  });
2352         receive((string val) {
2353             if ("the quick brown fox" != val)
2354                 return false;
2355             return true;
2356         }, (string val) { assert(false); });
2357         prioritySend(tid, "done");
2358     }
2359 
2360     void runTest(Tid tid)
2361     {
2362         send(tid, 42, 86);
2363         send(tid, tuple(42, 86));
2364         send(tid, "hello", "there");
2365         send(tid, "the quick brown fox");
2366         receive((string val) { assert(val == "done"); });
2367     }
2368 
2369     void simpleTest()
2370     {
2371         auto tid = spawn(&testfn, thisTid);
2372         runTest(tid);
2373 
2374         // Run the test again with a limited mailbox size.
2375         tid = spawn(&testfn, thisTid);
2376         setMaxMailboxSize(tid, 2, OnCrowding.block);
2377         runTest(tid);
2378     }
2379 
2380     @system unittest
2381     {
2382         simpleTest();
2383     }
2384 
2385     @system unittest
2386     {
2387         scheduler = new ThreadScheduler;
2388         simpleTest();
2389         scheduler = null;
2390     }
2391 }
2392 
private @property Mutex initOnceLock()
{
    __gshared Mutex lock;

    // Fast path: the mutex has already been published.
    auto existing = cast() atomicLoad!(MemoryOrder.acq)(*cast(shared)&lock);
    if (existing !is null)
        return existing;

    // Create a candidate and try to install it.  If another thread installed
    // one first, the candidate is dropped and the installed mutex is returned.
    auto candidate = new Mutex;
    immutable installed = cas(cast(shared)&lock, cast(shared) null, cast(shared) candidate);
    if (installed)
        return candidate;
    return cast() atomicLoad!(MemoryOrder.acq)(*cast(shared)&lock);
}
2403 
2404 /**
2405  * Initializes $(D_PARAM var) with the lazy $(D_PARAM init) value in a
2406  * thread-safe manner.
2407  *
2408  * The implementation guarantees that all threads simultaneously calling
2409  * initOnce with the same $(D_PARAM var) argument block until $(D_PARAM var) is
2410  * fully initialized. All side-effects of $(D_PARAM init) are globally visible
2411  * afterwards.
2412  *
2413  * Params:
2414  *   var = The variable to initialize
2415  *   init = The lazy initializer value
2416  *
2417  * Returns:
2418  *   A reference to the initialized variable
2419  */
auto ref initOnce(alias var)(lazy typeof(var) init)
{
    // Forward to the mutex-taking overload, using the lock returned by
    // initOnceLock.  `init` stays lazy and is evaluated by that overload.
    auto sharedLock = initOnceLock;
    return initOnce!var(init, sharedLock);
}
2424 
2425 /// A typical use-case is to perform lazy but thread-safe initialization.
@system unittest
{
    static class MySingleton
    {
        // Creates the instance on first use and returns that same object on
        // every later call.
        static MySingleton instance()
        {
            static __gshared MySingleton inst;
            return initOnce!inst(new MySingleton);
        }
    }

    auto obtained = MySingleton.instance;
    assert(obtained !is null);
}
2439 
@system unittest
{
    static class MySingleton
    {
        static MySingleton instance()
        {
            static __gshared MySingleton inst;
            return initOnce!inst(new MySingleton);
        }

    private:
        // Every construction bumps the global counter and records its value.
        this()
        {
            val = ++cnt;
        }

        size_t val;
        static __gshared size_t cnt;
    }

    // Ten threads request the singleton; each reports the value it saw.
    foreach (_; 0 .. 10)
    {
        spawn({ send(ownerTid, MySingleton.instance.val); });
    }
    // Every reported value must match the one visible here ...
    foreach (_; 0 .. 10)
    {
        assert(receiveOnly!size_t == MySingleton.instance.val);
    }
    // ... and the constructor must have run exactly once.
    assert(MySingleton.cnt == 1);
}
2462 
2463 /**
2464  * Same as above, but takes a separate mutex instead of sharing one among
2465  * all initOnce instances.
2466  *
2467  * This should be used to avoid dead-locks when the $(D_PARAM init)
2468  * expression waits for the result of another thread that might also
2469  * call initOnce. Use with care.
2470  *
2471  * Params:
2472  *   var = The variable to initialize
2473  *   init = The lazy initializer value
2474  *   mutex = A mutex to prevent race conditions
2475  *
2476  * Returns:
2477  *   A reference to the initialized variable
2478  */
auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex)
{
    import core.atomic : atomicLoad, atomicStore, MemoryOrder;

    // check that var is global, can't take address of a TLS variable
    static assert(is(typeof({ __gshared p = &var; })),
        "var must be 'static shared' or '__gshared'.");

    // One flag per template instance, i.e. per `var`.
    static shared bool flag;

    // Already initialized: nothing to lock.
    if (atomicLoad!(MemoryOrder.acq)(flag))
        return var;

    synchronized (mutex)
    {
        // Re-check under the mutex; another thread may have finished
        // initialization while this one was waiting.
        if (!atomicLoad!(MemoryOrder.acq)(flag))
        {
            var = init;
            // The flag is stored only after var has been assigned.
            atomicStore!(MemoryOrder.rel)(flag, true);
        }
    }
    return var;
}
2500 
2501 /// Use a separate mutex when init blocks on another thread that might also call initOnce.
@system unittest
{
    import core.sync.mutex : Mutex;

    static shared bool varA;
    static shared bool varB;
    __gshared Mutex dedicated;
    dedicated = new Mutex;

    spawn({
        // use a different mutex for varB to avoid a dead-lock
        initOnce!varB(true, dedicated);
        send(ownerTid, true);
    });
    // init depends on the result of the spawned thread
    initOnce!varA(receiveOnly!bool);
    assert(varA == true);
    assert(varB == true);
}
2520 
@system unittest
{
    static shared bool sharedFlag;
    __gshared bool globalFlag;
    static bool tlsFlag;
    bool stackFlag;

    // Global storage is accepted.
    initOnce!sharedFlag(true);
    initOnce!globalFlag(true);

    // Thread-local and stack variables must be rejected at compile time.
    static assert(!__traits(compiles, initOnce!tlsFlag(true))); // TLS
    static assert(!__traits(compiles, initOnce!stackFlag(true))); // local variable
}
2532