1 /**
2 * This is a low-level messaging API upon which more structured or restrictive
3 * APIs may be built. The general idea is that every messageable entity is
4 * represented by a common handle type called a Tid, which allows messages to
5 * be sent to logical threads that are executing in both the current process
6 * and in external processes using the same interface. This is an important
7 * aspect of scalability because it allows the components of a program to be
8 * spread across available resources with few to no changes to the actual
9 * implementation.
10 *
11 * A logical thread is an execution context that has its own stack and which
12 * runs asynchronously to other logical threads. These may be preemptively
13 * scheduled kernel threads, fibers (cooperative user-space threads), or some
14 * other concept with similar behavior.
15 *
16 * The type of concurrency used when logical threads are created is determined
17 * by the Scheduler selected at initialization time. The default behavior is
18 * currently to create a new kernel thread per call to spawn, but other
19 * schedulers are available that multiplex fibers across the main thread or
20 * use some combination of the two approaches.
21 *
22 * Copyright: Copyright Sean Kelly 2009 - 2014.
23 * License: <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>.
24 * Authors: Sean Kelly, Alex Rønne Petersen, Martin Nowak
25 * Source: $(PHOBOSSRC std/_concurrency.d)
26 */
27 /* Copyright Sean Kelly 2009 - 2014.
28 * Distributed under the Boost Software License, Version 1.0.
29 * (See accompanying file LICENSE_1_0.txt or copy at
30 * http://www.boost.org/LICENSE_1_0.txt)
31 */
32 module std.concurrency;
33
34 public import std.variant;
35
36 import core.atomic;
37 import core.sync.condition;
38 import core.sync.mutex;
39 import core.thread;
40 import std.range.primitives;
41 import std.range.interfaces : InputRange;
42 import std.traits;
43
44 ///
@system unittest
{
    // Written by the worker thread, read by the owner after the handshake.
    __gshared string received;

    static void spawnedFunc(Tid ownerTid)
    {
        import std.conv : text;

        // Block until the owner sends an int, record it, then acknowledge
        // by sending `true` back to the owner.
        receive((int number) {
            received = text("Received the number ", number);
            ownerTid.send(true);
        });
    }

    // Launch the worker, handing it our own Tid so that it can reply.
    auto worker = spawn(&spawnedFunc, thisTid);

    // Hand the worker its input.
    worker.send(42);

    // Wait for the acknowledgement before inspecting the shared string.
    immutable acknowledged = receiveOnly!bool();
    assert(acknowledged);
    assert(received == "Received the number 42");
}
72
73 private
74 {
/*
 * Evaluates to true if at least one type in T has unshared aliasing
 * according to std.traits.hasUnsharedAliasing. Tid itself is exempt from
 * the check. An empty type list yields false.
 */
template hasLocalAliasing(T...)
{
    static if (T.length == 0)
    {
        enum hasLocalAliasing = false;
    }
    else static if (!is(T[0] == Tid) && std.traits.hasUnsharedAliasing!(T[0]))
    {
        // The head of the list already disqualifies the whole list.
        enum hasLocalAliasing = true;
    }
    else
    {
        // Head is acceptable; the answer depends on the remaining types.
        enum hasLocalAliasing = std.concurrency.hasLocalAliasing!(T[1 .. $]);
    }
}
83
/*
 * Classification attached to every Message.
 */
enum MsgType
{
    standard, // ordinary message, as posted by send()
    priority, // message posted by prioritySend()
    linkDead, // posted by ThreadInfo.cleanup() to links and owner on termination
}
90
/*
 * A single message: its classification plus the payload.
 *
 * A lone value is stored in the Variant as-is; several values are packed
 * into a std.typecons.Tuple before being stored.
 */
struct Message
{
    MsgType type;
    Variant data;

    /*
     * Builds a message of kind t that carries vals.
     */
    this(T...)(MsgType t, T vals) if (T.length > 0)
    {
        type = t;

        static if (T.length > 1)
        {
            import std.typecons : Tuple;

            data = Tuple!(T)(vals);
        }
        else
        {
            data = vals[0];
        }
    }

    /*
     * Reports whether the payload can be retrieved as T. Asking for a
     * single Variant always succeeds.
     */
    @property auto convertsTo(T...)()
    {
        static if (T.length != 1)
        {
            import std.typecons : Tuple;

            return data.convertsTo!(Tuple!(T));
        }
        else
        {
            return is(T[0] == Variant) || data.convertsTo!(T);
        }
    }

    /*
     * Retrieves the payload as T (as a Tuple!T when T is not exactly one
     * type). Asking for a single Variant returns the stored Variant itself.
     */
    @property auto get(T...)()
    {
        static if (T.length != 1)
        {
            import std.typecons : Tuple;

            return data.get!(Tuple!(T));
        }
        else static if (is(T[0] == Variant))
        {
            return data;
        }
        else
        {
            return data.get!(T);
        }
    }

    /*
     * Invokes op with the payload converted to op's parameter types and
     * returns whatever op returns.
     */
    auto map(Op)(Op op)
    {
        alias OpParams = Parameters!(Op);

        static if (OpParams.length != 1)
        {
            import std.typecons : Tuple;

            // Multi-parameter handler: unpack the stored tuple into arguments.
            return op(data.get!(Tuple!(OpParams)).expand);
        }
        else static if (is(OpParams[0] == Variant))
        {
            return op(data);
        }
        else
        {
            return op(data.get!(OpParams));
        }
    }
}
159
/*
 * Compile-time validation of a list of message handlers.
 *
 * Every handler must be a function pointer or a delegate. A void-returning
 * handler that is not last in the list must not take a single Variant, and
 * must not have the same parameter list as any later handler, because it
 * would occlude the handlers that follow it.
 */
void checkops(T...)(T ops)
{
    foreach (idx, Handler; T)
    {
        static assert(isFunctionPointer!Handler || isDelegate!Handler);
        alias HandlerArgs = Parameters!(Handler);
        alias HandlerRet = ReturnType!(Handler);

        static if (idx + 1 < T.length && is(HandlerRet == void))
        {
            static assert(!(HandlerArgs.length == 1 && is(HandlerArgs[0] == Variant)),
                          "function with arguments " ~ HandlerArgs.stringof ~
                          " occludes successive function");

            foreach (Later; T[idx + 1 .. $])
            {
                static assert(isFunctionPointer!Later || isDelegate!Later);
                alias LaterArgs = Parameters!(Later);

                static assert(!is(HandlerArgs == LaterArgs),
                              "function with arguments " ~ HandlerArgs.stringof ~
                              " occludes successive function");
            }
        }
    }
}
185
/*
 * Returns the ThreadInfo of the calling logical thread: the one supplied by
 * the installed scheduler if there is one, otherwise the default
 * ThreadInfo.thisInfo instance.
 */
@property ref ThreadInfo thisInfo() nothrow
{
    if (scheduler !is null)
        return scheduler.thisInfo;
    return ThreadInfo.thisInfo;
}
192 }
193
// Module destructor: cleans up the ThreadInfo returned by thisInfo (closes
// the mailbox, notifies links and owner, drops registry entries).
static ~this()
{
    thisInfo.cleanup();
}
198
199 // Exceptions
200
201 /**
202 * Thrown on calls to $(D receiveOnly) if a message other than the type
203 * the receiving thread expected is sent.
204 */
class MessageMismatch : Exception
{
    /// Params: msg = The error message; defaults to "Unexpected message type".
    this(string msg = "Unexpected message type") @safe pure nothrow @nogc
    {
        super(msg);
    }
}
213
214 /**
215 * Thrown on calls to $(D receive) if the thread that spawned the receiving
216 * thread has terminated and no more messages exist.
217 */
class OwnerTerminated : Exception
{
    /// The Tid that was passed to the constructor.
    Tid tid;

    ///
    this(Tid t, string msg = "Owner terminated") @safe pure nothrow @nogc
    {
        super(msg);
        tid = t;
    }
}
229
230 /**
231 * Thrown if a linked thread has terminated.
232 */
class LinkTerminated : Exception
{
    /// The Tid that was passed to the constructor.
    Tid tid;

    ///
    this(Tid t, string msg = "Link terminated") @safe pure nothrow @nogc
    {
        super(msg);
        tid = t;
    }
}
244
245 /**
246 * Thrown if a message was sent to a thread via
247 * $(REF prioritySend, std,concurrency) and the receiver does not have a handler
248 * for a message of this type.
249 */
class PriorityMessageException : Exception
{
    /**
     * The message that was sent.
     */
    Variant message;

    ///
    this(Variant vals)
    {
        super("Priority message");
        message = vals;
    }
}
264
265 /**
266 * Thrown on mailbox crowding if the mailbox is configured with
267 * $(D OnCrowding.throwException).
268 */
class MailboxFull : Exception
{
    /// The Tid that was passed to the constructor.
    Tid tid;

    ///
    this(Tid t, string msg = "Mailbox full") @safe pure nothrow @nogc
    {
        super(msg);
        tid = t;
    }
}
280
281 /**
282 * Thrown when a Tid is missing, e.g. when $(D ownerTid) doesn't
283 * find an owner thread.
284 */
class TidMissingException : Exception
{
    // Constructors are supplied by the std.exception.basicExceptionCtors mixin.
    import std.exception : basicExceptionCtors;
    ///
    mixin basicExceptionCtors;
}
291
292
293 // Thread ID
294
295
296 /**
297 * An opaque type used to represent a logical thread.
298 */
struct Tid
{
private:
    // The mailbox this handle refers to; null for Tid.init.
    MessageBox mbox;

    // Wraps an existing mailbox in a handle.
    this(MessageBox m) @safe pure nothrow @nogc
    {
        mbox = m;
    }

public:

    /**
     * Writes a short identifying string of the form "Tid(<hex>)" to sink,
     * where the hex value is the address of the underlying mailbox.
     *
     * The output is only meant for telling currently executing Tids apart,
     * e.g. in logs or while debugging. A Tid created in the future may
     * produce the same string as a Tid that has already terminated.
     */
    void toString(scope void delegate(const(char)[]) sink)
    {
        import std.format : formattedWrite;

        auto address = cast(void*) mbox;
        formattedWrite(sink, "Tid(%x)", address);
    }
}
325
@system unittest
{
    // text!Tid is @system
    import std.conv : text;

    // A default-initialized Tid has no mailbox and prints as address zero.
    Tid blank;
    assert(text(blank) == "Tid(0)");

    // The calling thread's Tid is backed by a real mailbox.
    auto mine = thisTid;
    assert(text(mine) != "Tid(0)");

    // Copies refer to the same mailbox and therefore print identically.
    auto copy = mine;
    assert(text(mine) == text(copy));
}
337
338 /**
339 * Returns: The $(LREF Tid) of the caller's thread.
340 */
@property Tid thisTid() @safe
{
    // TODO: remove when concurrency is safe
    // Creates the caller's mailbox on first use and returns its Tid.
    static Tid fetchOrCreate() @trusted
    {
        if (thisInfo.ident == Tid.init)
            thisInfo.ident = Tid(new MessageBox);
        return thisInfo.ident;
    }

    return fetchOrCreate();
}
354
355 /**
356 * Return the Tid of the thread which spawned the caller's thread.
357 *
358 * Throws: A $(D TidMissingException) exception if
359 * there is no owner thread.
360 */
@property Tid ownerTid()
{
    import std.exception : enforce;

    // An owner without a mailbox means no owner was ever recorded.
    auto owner = thisInfo.owner;
    enforce!TidMissingException(owner.mbox !is null, "Error: Thread has no owner thread.");
    return owner;
}
368
@system unittest
{
    import std.exception : assertThrown;

    // Child: expects a greeting and answers through its ownerTid.
    static void fun()
    {
        auto greeting = receiveOnly!string();
        assert(greeting == "Main calling");
        send(ownerTid, "Child responding");
    }

    // The thread running this test has no owner.
    assertThrown!TidMissingException(ownerTid);

    auto child = spawn(&fun);
    send(child, "Main calling");
    auto reply = receiveOnly!string();
    assert(reply == "Child responding");
}
386
387 // Thread Creation
388
/*
 * True if F can be started by spawn with arguments of types T: F must be
 * callable, return void, accept T (allowing implicit conversions), and be
 * either a function pointer or free of unshared aliasing.
 */
private template isSpawnable(F, T...)
{
    // True if F2's parameters convert implicitly, position by position,
    // to F1's parameters. i is the index at which checking resumes.
    template isParamsImplicitlyConvertible(F1, F2, int i = 0)
    {
        alias Wanted = Parameters!F1;
        alias Given = Parameters!F2;

        static if (Wanted.length != Given.length)
        {
            enum isParamsImplicitlyConvertible = false;
        }
        else static if (i == Wanted.length)
        {
            // Every position has been checked successfully.
            enum isParamsImplicitlyConvertible = true;
        }
        else static if (!isImplicitlyConvertible!(Given[i], Wanted[i]))
        {
            enum isParamsImplicitlyConvertible = false;
        }
        else
        {
            enum isParamsImplicitlyConvertible =
                isParamsImplicitlyConvertible!(F1, F2, i + 1);
        }
    }

    enum isSpawnable = isCallable!F && is(ReturnType!F == void)
        && isParamsImplicitlyConvertible!(F, void function(T))
        && (isFunctionPointer!F || !hasUnsharedAliasing!F);
}
410
411 /**
412 * Starts fn(args) in a new logical thread.
413 *
414 * Executes the supplied function in a new logical thread represented by
415 * $(D Tid). The calling thread is designated as the owner of the new thread.
416 * When the owner thread terminates an $(D OwnerTerminated) message will be
417 * sent to the new thread, causing an $(D OwnerTerminated) exception to be
418 * thrown on $(D receive()).
419 *
420 * Params:
421 * fn = The function to execute.
422 * args = Arguments to the function.
423 *
424 * Returns:
425 * A Tid representing the new logical thread.
426 *
427 * Notes:
428 * $(D args) must not have unshared aliasing. In other words, all arguments
429 * to $(D fn) must either be $(D shared) or $(D immutable) or have no
430 * pointer indirection. This is necessary for enforcing isolation among
431 * threads.
432 *
433 * Example:
434 * ---
435 * import std.stdio, std.concurrency;
436 *
437 * void f1(string str)
438 * {
439 * writeln(str);
440 * }
441 *
442 * void f2(char[] str)
443 * {
444 * writeln(str);
445 * }
446 *
447 * void main()
448 * {
449 * auto str = "Hello, world";
450 *
451 * // Works: string is immutable.
452 * auto tid1 = spawn(&f1, str);
453 *
454 * // Fails: char[] has mutable aliasing.
455 * auto tid2 = spawn(&f2, str.dup);
456 *
457 * // New thread with anonymous function
458 * spawn({ writeln("This is so great!"); });
459 * }
460 * ---
461 */
Tid spawn(F, T...)(F fn, T args) if (isSpawnable!(F, T))
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");

    // A plain spawn records the new thread as not linked.
    enum bool linked = false;
    return _spawn(linked, fn, args);
}
467
468 /**
469 * Starts fn(args) in a logical thread and will receive a LinkTerminated
470 * message when the operation terminates.
471 *
472 * Executes the supplied function in a new logical thread represented by
473 * Tid. This new thread is linked to the calling thread so that if either
474 * it or the calling thread terminates a LinkTerminated message will be sent
475 * to the other, causing a LinkTerminated exception to be thrown on receive().
476 * The owner relationship from spawn() is preserved as well, so if the link
477 * between threads is broken, owner termination will still result in an
478 * OwnerTerminated exception to be thrown on receive().
479 *
480 * Params:
481 * fn = The function to execute.
482 * args = Arguments to the function.
483 *
484 * Returns:
485 * A Tid representing the new thread.
486 */
Tid spawnLinked(F, T...)(F fn, T args) if (isSpawnable!(F, T))
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");

    // Same as spawn, except that the new thread is recorded as linked.
    enum bool linked = true;
    return _spawn(linked, fn, args);
}
492
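/*
 * Shared implementation of spawn and spawnLinked: starts fn(args) in a new
 * logical thread and records the new Tid, together with the linked flag, in
 * the caller's link table.
 */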
private Tid _spawn(F, T...)(bool linked, F fn, T args) if (isSpawnable!(F, T))
{
    // TODO: MessageList and &exec should be shared.
    auto childTid = Tid(new MessageBox);
    auto parentTid = thisTid;

    // Entry point of the new logical thread: publish its identity and its
    // owner in its own ThreadInfo, then run the user's function.
    void run()
    {
        thisInfo.ident = childTid;
        thisInfo.owner = parentTid;
        fn(args);
    }

    // TODO: MessageList and &exec should be shared.
    if (scheduler is null)
    {
        // No scheduler installed: start a dedicated core.thread.Thread.
        auto thread = new Thread(&run);
        thread.start();
    }
    else
    {
        scheduler.spawn(&run);
    }

    // Remember the child in the caller's link table along with the flag.
    thisInfo.links[childTid] = linked;
    return childTid;
}
520
@system unittest
{
    // Function pointers: the supplied arguments must match the parameters.
    void function() nullaryFn;
    void function(int) unaryFn;
    static assert( __traits(compiles, spawn(nullaryFn)));
    static assert( __traits(compiles, spawn(unaryFn, 2)));
    static assert(!__traits(compiles, spawn(nullaryFn, 1)));
    static assert(!__traits(compiles, spawn(unaryFn)));

    // Delegates: only the unqualified delegate is rejected.
    void delegate(int) shared sharedCtxDg;
    shared(void delegate(int)) sharedDg;
    shared(void delegate(long) shared) sharedLongDg;
    shared(void delegate(real, int, long) shared) sharedTripleDg;
    void delegate(int) immutable immutableCtxDg;
    void delegate(int) plainDg;
    static assert( __traits(compiles, spawn(sharedCtxDg, 1)));
    static assert( __traits(compiles, spawn(sharedDg, 2)));
    static assert( __traits(compiles, spawn(sharedLongDg, 3)));
    static assert( __traits(compiles, spawn(sharedTripleDg, 4, 4, 4)));
    static assert( __traits(compiles, spawn(immutableCtxDg, 5)));
    static assert(!__traits(compiles, spawn(plainDg, 6)));

    // Objects with opCall: various combinations of the qualifier on the
    // object and the qualifier on opCall.
    auto obj1 = new class{ void opCall(int) shared {} };
    auto obj2 = cast(shared) new class{ void opCall(int) shared {} };
    auto obj3 = new class{ void opCall(int) immutable {} };
    auto obj4 = cast(immutable) new class{ void opCall(int) immutable {} };
    auto obj5 = new class{ void opCall(int) {} };
    auto obj6 = cast(shared) new class{ void opCall(int) immutable {} };
    auto obj7 = cast(immutable) new class{ void opCall(int) shared {} };
    auto obj8 = cast(shared) new class{ void opCall(int) const shared {} };
    auto obj9 = cast(const shared) new class{ void opCall(int) shared {} };
    auto obj10 = cast(const shared) new class{ void opCall(int) const shared {} };
    auto obj11 = cast(immutable) new class{ void opCall(int) const shared {} };
    static assert(!__traits(compiles, spawn(obj1, 1)));
    static assert( __traits(compiles, spawn(obj2, 2)));
    static assert(!__traits(compiles, spawn(obj3, 3)));
    static assert( __traits(compiles, spawn(obj4, 4)));
    static assert(!__traits(compiles, spawn(obj5, 5)));
    static assert(!__traits(compiles, spawn(obj6, 6)));
    static assert(!__traits(compiles, spawn(obj7, 7)));
    static assert( __traits(compiles, spawn(obj8, 8)));
    static assert(!__traits(compiles, spawn(obj9, 9)));
    static assert( __traits(compiles, spawn(obj10, 10)));
    static assert( __traits(compiles, spawn(obj11, 11)));
}
566
567 /**
568 * Places the values as a message at the back of tid's message queue.
569 *
570 * Sends the supplied value to the thread represented by tid. As with
571 * $(REF spawn, std,concurrency), $(D T) must not have unshared aliasing.
572 */
void send(T...)(Tid tid, T vals)
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");

    // Ordinary sends are posted as standard messages.
    _send(MsgType.standard, tid, vals);
}
578
579 /**
580 * Places the values as a message on the front of tid's message queue.
581 *
582 * Send a message to $(D tid) but place it at the front of $(D tid)'s message
583 * queue instead of at the back. This function is typically used for
584 * out-of-band communication, to signal exceptional conditions, etc.
585 */
void prioritySend(T...)(Tid tid, T vals)
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");

    // Identical to send apart from the message classification.
    enum kind = MsgType.priority;
    _send(kind, tid, vals);
}
591
/*
 * Convenience overload of _send that posts vals to tid as a
 * MsgType.standard message.
 */
private void _send(T...)(Tid tid, T vals)
{
    // Forward to the general overload with the standard classification.
    _send(MsgType.standard, tid, vals);
}
599
600 /*
601 * Implementation of send. This allows parameter checking to be different for
602 * both Tid.send() and .send().
603 */
private void _send(T...)(MsgType type, Tid tid, T vals)
{
    // Wrap the values in a Message and hand it to the target's mailbox.
    auto envelope = Message(type, vals);
    tid.mbox.put(envelope);
}
609
610 /**
611 * Receives a message from another thread.
612 *
613 * Receive a message from another thread, or block if no messages of the
614 * specified types are available. This function works by pattern matching
615 * a message against a set of delegates and executing the first match found.
616 *
617 * If a delegate that accepts a $(REF Variant, std,variant) is included as
618 * the last argument to $(D receive), it will match any message that was not
619 * matched by an earlier delegate. If more than one argument is sent,
620 * the $(D Variant) will contain a $(REF Tuple, std,typecons) of all values
621 * sent.
622 *
623 * Example:
624 * ---
625 * import std.stdio;
626 * import std.variant;
627 * import std.concurrency;
628 *
629 * void spawnedFunction()
630 * {
631 * receive(
632 * (int i) { writeln("Received an int."); },
633 * (float f) { writeln("Received a float."); },
634 * (Variant v) { writeln("Received some other type."); }
635 * );
636 * }
637 *
638 * void main()
639 * {
640 * auto tid = spawn(&spawnedFunction);
641 * send(tid, 42);
642 * }
643 * ---
644 */
void receive(T...)(T ops)
in
{
    assert(thisInfo.ident.mbox !is null,
           "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
}
body
{
    // Reject handler lists in which an earlier handler hides a later one.
    checkops(ops);

    auto mailbox = thisInfo.ident.mbox;
    mailbox.get(ops);
}
658
659
@safe unittest
{
    // A Variant handler is accepted alone or as the last handler.
    static assert(__traits(compiles, { receive((Variant x) {}); }));
    static assert(__traits(compiles, { receive((int x) {}, (Variant x) {}); }));

    // A Variant handler ahead of another handler is rejected.
    static assert(!__traits(compiles, { receive((Variant x) {}, (int x) {}); }));

    // Two handlers with identical parameter lists are rejected.
    static assert(!__traits(compiles, { receive((int x) {}, (int x) {}); }));
}
678
679 // Make sure receive() works with free functions as well.
version (unittest)
{
    // Free function used below as a message handler.
    private void receiveFunction(int x) {}
}

@safe unittest
{
    // Pointers to free functions are accepted as handlers, either alone or
    // followed by a Variant catch-all.
    static assert(__traits(compiles, { receive(&receiveFunction); }));
    static assert(__traits(compiles, { receive(&receiveFunction, (Variant x) {}); }));
}
692
693
/*
 * Return type of receiveOnly!T: the type itself when exactly one type is
 * given, otherwise a std.typecons.Tuple of all given types.
 */
private template receiveOnlyRet(T...)
{
    static if (T.length != 1)
    {
        import std.typecons : Tuple;

        alias receiveOnlyRet = Tuple!(T);
    }
    else
    {
        alias receiveOnlyRet = T[0];
    }
}
706
707 /**
708 * Receives only messages with arguments of types $(D T).
709 *
710 * Throws: $(D MessageMismatch) if a message of types other than $(D T)
711 * is received.
712 *
713 * Returns: The received message. If $(D T.length) is greater than one,
714 * the message will be packed into a $(REF Tuple, std,typecons).
715 *
716 * Example:
717 * ---
718 * import std.concurrency;
719 *
720 * void spawnedFunc()
721 * {
722 * auto msg = receiveOnly!(int, string)();
723 * assert(msg[0] == 42);
724 * assert(msg[1] == "42");
725 * }
726 *
727 * void main()
728 * {
729 * auto tid = spawn(&spawnedFunc);
730 * send(tid, 42, "42");
731 * }
732 * ---
733 */
receiveOnlyRet!(T) receiveOnly(T...)()
in
{
    assert(thisInfo.ident.mbox !is null,
           "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
}
body
{
    import std.format : format;
    import std.typecons : Tuple;

    // Filled in by the first handler when a message of types T is matched.
    Tuple!(T) result;

    thisInfo.ident.mbox.get(
        (T payload) {
            static if (T.length)
                result.field = payload;
        },
        // Termination notices are rethrown to the caller unchanged.
        (LinkTerminated e) { throw e; },
        (OwnerTerminated e) { throw e; },
        // Anything else is a type mismatch.
        (Variant other) {
            static if (T.length > 1)
                string expected = T.stringof;
            else
                string expected = T[0].stringof;

            throw new MessageMismatch(
                format("Unexpected message type: expected '%s', got '%s'", expected, other.type.toString()));
        });

    // A single requested type is returned bare; several are returned packed.
    static if (T.length == 1)
        return result[0];
    else
        return result;
}
767
@system unittest
{
    // Child: waits for a string and reports back either an empty string on
    // success or the message of whatever was thrown.
    static void t1(Tid mainTid)
    {
        try
        {
            receiveOnly!string();
            send(mainTid, "");
        }
        catch (Throwable thrown)
        {
            send(mainTid, thrown.msg);
        }
    }

    auto child = spawn(&t1, thisTid);

    // Sending an int where a string is expected must produce a mismatch.
    send(child, 1);
    auto report = receiveOnly!string();
    assert(report == "Unexpected message type: expected 'string', got 'int'");
}
788
789 /**
790 * Tries to receive but will give up if no matches arrive within duration.
791 * Won't wait at all if provided $(REF Duration, core,time) is negative.
792 *
793 * Same as $(D receive) except that rather than wait forever for a message,
794 * it waits until either it receives a message or the given
795 * $(REF Duration, core,time) has passed. It returns $(D true) if it received a
796 * message and $(D false) if it timed out waiting for one.
797 */
bool receiveTimeout(T...)(Duration duration, T ops)
in
{
    assert(thisInfo.ident.mbox !is null,
           "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
}
body
{
    // Reject handler lists in which an earlier handler hides a later one.
    checkops(ops);

    auto mailbox = thisInfo.ident.mbox;
    return mailbox.get(duration, ops);
}
810
@safe unittest
{
    // A Variant handler is accepted alone or as the last handler.
    static assert(__traits(compiles, { receiveTimeout(msecs(0), (Variant x) {}); }));
    static assert(__traits(compiles, { receiveTimeout(msecs(0), (int x) {}, (Variant x) {}); }));

    // A Variant handler ahead of another handler is rejected.
    static assert(!__traits(compiles, { receiveTimeout(msecs(0), (Variant x) {}, (int x) {}); }));

    // Two handlers with identical parameter lists are rejected.
    static assert(!__traits(compiles, { receiveTimeout(msecs(0), (int x) {}, (int x) {}); }));

    // A non-zero timeout is accepted as well.
    static assert(__traits(compiles, { receiveTimeout(msecs(10), (int x) {}, (Variant x) {}); }));
}
830
831 // MessageBox Limits
832
833 /**
834 * These behaviors may be specified when a mailbox is full.
835 */
enum OnCrowding
{
    block, /// Wait until room is available.
    throwException, /// Throw a $(LREF MailboxFull) exception.
    ignore /// Abort the send and return.
}
842
// Crowding handler installed for OnCrowding.block: always returns true.
private bool onCrowdingBlock(Tid tid) @safe pure nothrow @nogc
{
    return true;
}

// Crowding handler installed for OnCrowding.throwException: never returns,
// throws MailboxFull carrying the given tid.
private bool onCrowdingThrow(Tid tid) @safe pure
{
    throw new MailboxFull(tid);
}

// Crowding handler installed for OnCrowding.ignore: always returns false.
private bool onCrowdingIgnore(Tid tid) @safe pure nothrow @nogc
{
    return false;
}
860
861 /**
862 * Sets a maximum mailbox size.
863 *
864 * Sets a limit on the maximum number of user messages allowed in the mailbox.
865 * If this limit is reached, the caller attempting to add a new message will
866 * execute the behavior specified by doThis. If messages is zero, the mailbox
867 * is unbounded.
868 *
869 * Params:
870 * tid = The Tid of the thread for which this limit should be set.
871 * messages = The maximum number of messages or zero if no limit.
872 * doThis = The behavior executed when a message is sent to a full
873 * mailbox.
874 */
void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis) @safe pure
{
    // Translate the requested behavior into the matching handler first ...
    bool function(Tid) onFull;
    final switch (doThis)
    {
    case OnCrowding.block:
        onFull = &onCrowdingBlock;
        break;
    case OnCrowding.throwException:
        onFull = &onCrowdingThrow;
        break;
    case OnCrowding.ignore:
        onFull = &onCrowdingIgnore;
        break;
    }

    // ... then install the limit together with that handler.
    tid.mbox.setMaxMsgs(messages, onFull);
}
887
888 /**
889 * Sets a maximum mailbox size.
890 *
891 * Sets a limit on the maximum number of user messages allowed in the mailbox.
892 * If this limit is reached, the caller attempting to add a new message will
893 * execute onCrowdingDoThis. If messages is zero, the mailbox is unbounded.
894 *
895 * Params:
896 * tid = The Tid of the thread for which this limit should be set.
897 * messages = The maximum number of messages or zero if no limit.
898 * onCrowdingDoThis = The routine called when a message is sent to a full
899 * mailbox.
900 */
void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis)
{
    // Install the limit and the caller-supplied handler on tid's mailbox.
    tid.mbox.setMaxMsgs(messages, onCrowdingDoThis);
}
905
private
{
    // Name registry shared by all threads (__gshared); register, unregister,
    // locate and unregisterMe access it inside synchronized (registryLock).
    __gshared Tid[string] tidByName;   // name -> registered Tid
    __gshared string[][Tid] namesByTid; // Tid -> names registered for it
}
911
// Returns the mutex guarding the name registry, creating it through
// initOnce on first use.
private @property Mutex registryLock()
{
    __gshared Mutex lock;
    initOnce!lock(new Mutex);
    return lock;
}
918
// Removes every registry entry that belongs to the calling thread's Tid.
private void unregisterMe()
{
    auto me = thisInfo.ident;

    // A thread that never obtained a Tid cannot have registered anything.
    if (me == Tid.init)
        return;

    synchronized (registryLock)
    {
        auto mine = me in namesByTid;
        if (mine is null)
            return;

        // Drop each name first, then the reverse-lookup entry itself.
        foreach (name; *mine)
            tidByName.remove(name);
        namesByTid.remove(me);
    }
}
935
936 /**
937 * Associates name with tid.
938 *
939 * Associates name with tid in a process-local map. When the thread
940 * represented by tid terminates, any names associated with it will be
941 * automatically unregistered.
942 *
943 * Params:
944 * name = The name to associate with tid.
 *  tid  = The Tid to register under name.
946 *
947 * Returns:
948 * true if the name is available and tid is not known to represent a
949 * defunct thread.
950 */
bool register(string name, Tid tid)
{
    // A default-initialized Tid (for example the result of locate() for an
    // unknown name) has no mailbox. It does not represent a thread, so refuse
    // it here instead of dereferencing the null mailbox below.
    if (tid.mbox is null)
        return false;

    synchronized (registryLock)
    {
        // Names are unique: refuse a name that is already taken.
        if (name in tidByName)
            return false;
        // A closed mailbox means the thread is known to be defunct.
        if (tid.mbox.isClosed)
            return false;
        // Keep both directions of the mapping in step.
        namesByTid[tid] ~= name;
        tidByName[name] = tid;
        return true;
    }
}
964
965 /**
966 * Removes the registered name associated with a tid.
967 *
968 * Params:
969 * name = The name to unregister.
970 *
971 * Returns:
972 * true if the name is registered, false if not.
973 */
bool unregister(string name)
{
    import std.algorithm.mutation : remove, SwapStrategy;
    import std.algorithm.searching : countUntil;

    synchronized (registryLock)
    {
        auto tid = name in tidByName;
        if (tid is null)
            return false;

        // Drop the name from the owner's reverse-lookup list as well.
        // std.algorithm's remove returns the shortened range and leaves the
        // length of the original slice untouched, so the result has to be
        // stored back. Otherwise the stale name stays in namesByTid, and
        // unregisterMe would later delete the name from tidByName even if
        // another thread has re-registered it in the meantime.
        if (auto allNames = *tid in namesByTid)
        {
            immutable pos = countUntil(*allNames, name);
            if (pos >= 0)
                *allNames = remove!(SwapStrategy.unstable)(*allNames, pos);
        }

        tidByName.remove(name);
        return true;
    }
}
992
993 /**
994 * Gets the Tid associated with name.
995 *
996 * Params:
997 * name = The name to locate within the registry.
998 *
999 * Returns:
1000 * The associated Tid or Tid.init if name is not registered.
1001 */
Tid locate(string name)
{
    synchronized (registryLock)
    {
        // `in` yields a pointer to the stored Tid, or null if absent.
        auto entry = name in tidByName;
        return entry is null ? Tid.init : *entry;
    }
}
1011
1012 /**
1013 * Encapsulates all implementation-level data needed for scheduling.
1014 *
1015 * When defining a Scheduler, an instance of this struct must be associated
1016 * with each logical thread. It contains all implementation-level information
1017 * needed by the internal API.
1018 */
struct ThreadInfo
{
    Tid ident;        // Tid of the logical thread this info belongs to
    bool[Tid] links;  // spawned Tids, each mapped to its linked flag
    Tid owner;        // Tid of the spawning thread, Tid.init if none

    /**
     * Gets a thread-local instance of ThreadInfo.
     *
     * This is the default instance, to be used when info is requested for
     * a thread that was not created by the Scheduler.
     */
    static @property ref thisInfo() nothrow
    {
        static ThreadInfo instance;
        return instance;
    }

    /**
     * Cleans up this ThreadInfo.
     *
     * This must be called when a scheduled thread terminates. It tears down
     * the messaging system for the thread and notifies interested parties of
     * the thread's termination.
     */
    void cleanup()
    {
        // Close our own mailbox, if one was ever created.
        if (ident.mbox !is null)
            ident.mbox.close();

        // Tell every recorded link, and then the owner, that we are gone.
        foreach (peer; links.keys)
            _send(MsgType.linkDead, peer, ident);
        if (owner != Tid.init)
            _send(MsgType.linkDead, owner, ident);

        // Finally drop any names registered for this thread.
        unregisterMe();
    }
}
1056
1057 /**
1058 * A Scheduler controls how threading is performed by spawn.
1059 *
1060 * Implementing a Scheduler allows the concurrency mechanism used by this
1061 * module to be customized according to different needs. By default, a call
1062 * to spawn will create a new kernel thread that executes the supplied routine
1063 * and terminates when finished. But it is possible to create Schedulers that
1064 * reuse threads, that multiplex Fibers (coroutines) across a single thread,
1065 * or any number of other approaches. By making the choice of Scheduler a
1066 * user-level option, std.concurrency may be used for far more types of
1067 * application than if this behavior were predefined.
1068 *
1069 * Example:
1070 * ---
1071 * import std.concurrency;
1072 * import std.stdio;
1073 *
1074 * void main()
1075 * {
1076 * scheduler = new FiberScheduler;
1077 * scheduler.start(
1078 * {
1079 * writeln("the rest of main goes here");
1080 * });
1081 * }
1082 * ---
1083 *
1084 * Some schedulers have a dispatching loop that must run if they are to work
1085 * properly, so for the sake of consistency, when using a scheduler, start()
1086 * must be called within main(). This yields control to the scheduler and
1087 * will ensure that any spawned threads are executed in an expected manner.
1088 */
interface Scheduler
{
    /**
     * Spawns the supplied op and starts the Scheduler.
     *
     * This is intended to be called at the start of the program to yield all
     * scheduling to the active Scheduler instance. This is necessary for
     * schedulers that explicitly dispatch threads rather than simply relying
     * on the operating system to do so, and so start should always be called
     * within main() to begin normal program execution.
     *
     * Params:
     *  op = A wrapper for whatever the main thread would have done in the
     *       absence of a custom scheduler. It will be automatically executed
     *       via a call to spawn by the Scheduler.
     */
    void start(void delegate() op);

    /**
     * Assigns a logical thread to execute the supplied op.
     *
     * This routine is called by spawn. It is expected to instantiate a new
     * logical thread and run the supplied operation. This thread must call
     * thisInfo.cleanup() when the thread terminates if the scheduled thread
     * is not a kernel thread--all kernel threads will have their ThreadInfo
     * cleaned up automatically by a thread-local destructor.
     *
     * Params:
     *  op = The function to execute. This may be the actual function passed
     *       by the user to spawn itself, or may be a wrapper function.
     */
    void spawn(void delegate() op);

    /**
     * Yields execution to another logical thread.
     *
     * This routine is called at various points within concurrency-aware APIs
     * to provide a scheduler a chance to yield execution when using some sort
     * of cooperative multithreading model. If this is not appropriate, such
     * as when each logical thread is backed by a dedicated kernel thread,
     * this routine may be a no-op.
     */
    void yield() nothrow;

    /**
     * Returns an appropriate ThreadInfo instance.
     *
     * Returns an instance of ThreadInfo specific to the logical thread that
     * is calling this routine or, if the calling thread was not created by
     * this scheduler, returns ThreadInfo.thisInfo instead.
     */
    @property ref ThreadInfo thisInfo() nothrow;

    /**
     * Creates a Condition variable analog for signaling.
     *
     * Creates a new Condition variable analog which is used to check for and
     * to signal the addition of messages to a thread's message queue. Like
     * yield, some schedulers may need to define custom behavior so that calls
     * to Condition.wait() yield to another thread when no new messages are
     * available instead of blocking.
     *
     * Params:
     *  m = The Mutex that will be associated with this condition. It will be
     *      locked prior to any operation on the condition, and so in some
     *      cases a Scheduler may need to hold this reference and unlock the
     *      mutex before yielding execution to another logical thread.
     */
    Condition newCondition(Mutex m) nothrow;
}
1159
/**
 * A reference Scheduler backed by kernel threads.
 *
 * ThreadScheduler reproduces the default scheduling behavior of this module:
 * each call to spawn is given its own kernel thread.  It is complete and may
 * be instantiated and used, but the module does not need it in order to
 * function.
 */
class ThreadScheduler : Scheduler
{
    /**
     * Invokes op directly on the calling thread.  This scheduler has no
     * dispatch loop to hand control to.
     */
    void start(void delegate() op)
    {
        op();
    }

    /**
     * Starts a new kernel thread whose body is the supplied op.
     */
    void spawn(void delegate() op)
    {
        (new Thread(op)).start();
    }

    /**
     * Intentionally empty: this scheduler performs no explicit multiplexing,
     * so there is nothing to yield to.
     */
    void yield() nothrow
    {
    }

    /**
     * Hands back ThreadInfo.thisInfo.  That instance is thread-local, which
     * is the correct granularity for one-kernel-thread-per-spawn.
     */
    @property ref ThreadInfo thisInfo() nothrow
    {
        return ThreadInfo.thisInfo;
    }

    /**
     * Returns a plain Condition bound to m; no custom behavior is required.
     */
    Condition newCondition(Mutex m) nothrow
    {
        return new Condition(m);
    }
}
1213
/**
 * An example Scheduler using Fibers.
 *
 * This is an example scheduler that creates a new Fiber per call to spawn
 * and multiplexes the execution of all fibers within the main thread.
 */
class FiberScheduler : Scheduler
{
    /**
     * This creates a new Fiber for the supplied op and then starts the
     * dispatcher.
     */
    void start(void delegate() op)
    {
        create(op);
        dispatch();
    }

    /**
     * This creates a new Fiber for the supplied op and adds it to the
     * dispatch list.  The caller then yields so that other fibers get a
     * chance to run.
     */
    void spawn(void delegate() op) nothrow
    {
        create(op);
        yield();
    }

    /**
     * If the caller is a scheduled Fiber, this yields execution to another
     * scheduled Fiber.
     */
    void yield() nothrow
    {
        // NOTE: It's possible that we should test whether the calling Fiber
        //       is an InfoFiber before yielding, but I think it's reasonable
        //       that any (non-Generator) fiber should yield here.
        if (Fiber.getThis())
            Fiber.yield();
    }

    /**
     * Returns an appropriate ThreadInfo instance.
     *
     * Returns a ThreadInfo instance specific to the calling Fiber if the
     * Fiber was created by this dispatcher, otherwise it returns
     * ThreadInfo.thisInfo.
     */
    @property ref ThreadInfo thisInfo() nothrow
    {
        // Fibers created by create() are InfoFibers; anything else (including
        // "no fiber at all") makes this cast yield null.
        auto f = cast(InfoFiber) Fiber.getThis();

        if (f !is null)
            return f.info;
        return ThreadInfo.thisInfo;
    }

    /**
     * Returns a Condition analog that yields when wait or notify is called.
     */
    Condition newCondition(Mutex m) nothrow
    {
        return new FiberCondition(m);
    }

private:
    // A Fiber that carries its own ThreadInfo, so every scheduled fiber can
    // act as a separate logical thread.
    static class InfoFiber : Fiber
    {
        ThreadInfo info;

        this(void delegate() op) nothrow
        {
            super(op);
        }
    }

    // Condition replacement that yields to other fibers instead of blocking
    // the thread.  A single boolean flag records a pending notification.
    class FiberCondition : Condition
    {
        this(Mutex m) nothrow
        {
            super(m);
            notified = false;
        }

        // Yields (with the mutex released) until some fiber has set the
        // notification flag, then consumes the flag.
        override void wait() nothrow
        {
            scope (exit) notified = false;

            while (!notified)
                switchContext();
        }

        // Like wait(), but gives up once period has elapsed.  Returns the
        // value of the flag at the moment the loop ends, i.e. true if a
        // notification arrived.
        // NOTE(review): this path calls yield() directly rather than
        // switchContext(), so the mutex is not released while yielding,
        // unlike the untimed wait() above -- confirm this is intended.
        override bool wait(Duration period) nothrow
        {
            import core.time : MonoTime;

            scope (exit) notified = false;

            for (auto limit = MonoTime.currTime + period;
                 !notified && !period.isNegative;
                 period = limit - MonoTime.currTime)
            {
                yield();
            }
            return notified;
        }

        // Sets the flag and yields so that a waiter may observe it.
        override void notify() nothrow
        {
            notified = true;
            switchContext();
        }

        // Identical to notify(): there is only one flag, not a waiter list.
        override void notifyAll() nothrow
        {
            notified = true;
            switchContext();
        }

    private:
        // Releases the associated mutex, yields, and re-acquires the mutex
        // when this fiber is resumed.
        void switchContext() nothrow
        {
            mutex_nothrow.unlock_nothrow();
            scope (exit) mutex_nothrow.lock_nothrow();
            yield();
        }

        private bool notified;
    }

private:
    // Runs the scheduled fibers in round-robin order until none are left.
    void dispatch()
    {
        import std.algorithm.mutation : remove;

        while (m_fibers.length > 0)
        {
            // Rethrow.no makes call() return the Throwable instead of
            // throwing it, so OwnerTerminated can be filtered out here.
            auto t = m_fibers[m_pos].call(Fiber.Rethrow.no);
            if (t !is null && !(cast(OwnerTerminated) t))
            {
                throw t;
            }
            if (m_fibers[m_pos].state == Fiber.State.TERM)
            {
                // Drop the finished fiber; m_pos now already refers to the
                // next one unless the removed fiber was the last element.
                if (m_pos >= (m_fibers = remove(m_fibers, m_pos)).length)
                    m_pos = 0;
            }
            else if (m_pos++ >= m_fibers.length - 1)
            {
                // Advanced past the end of the list: wrap around.
                m_pos = 0;
            }
        }
    }

    // Wraps op in an InfoFiber and appends it to the dispatch list.  The
    // wrapper guarantees thisInfo.cleanup() runs when op finishes or throws.
    void create(void delegate() op) nothrow
    {
        void wrap()
        {
            scope (exit)
            {
                thisInfo.cleanup();
            }
            op();
        }

        m_fibers ~= new InfoFiber(&wrap);
    }

private:
    Fiber[] m_fibers; // fibers that have not terminated yet
    size_t m_pos;     // index of the fiber dispatch() runs next
}
1386
// Exercises FiberCondition directly with two hand-driven fibers.
@system unittest
{
    // Waits on cond forever, counting each successful wake-up.
    static void receive(Condition cond, ref size_t received)
    {
        while (true)
        {
            synchronized (cond.mutex)
            {
                cond.wait();
                ++received;
            }
        }
    }

    // Notifies cond forever, counting each notification.
    static void send(Condition cond, ref size_t sent)
    {
        while (true)
        {
            synchronized (cond.mutex)
            {
                ++sent;
                cond.notify();
            }
        }
    }

    auto fs = new FiberScheduler;
    auto mtx = new Mutex;
    auto cond = fs.newCondition(mtx);

    size_t received, sent;
    auto waiter = new Fiber({ receive(cond, received); }),
        notifier = new Fiber({ send(cond, sent); });
    // The waiter parks inside wait() because nothing has been notified yet.
    waiter.call();
    assert(received == 0);
    // One notification is issued; notify() yields back to this test.
    notifier.call();
    assert(sent == 1);
    assert(received == 0);
    // The waiter consumes that notification and parks again.
    waiter.call();
    assert(received == 1);
    // No new notification: resuming the waiter must not count another one.
    waiter.call();
    assert(received == 1);
}
1429
/**
 * Sets the Scheduler behavior within the program.
 *
 * This variable sets the Scheduler behavior within this program.  Typically,
 * when setting a Scheduler, scheduler.start() should be called in main.  That
 * call to start() will not return until program execution is complete.
 *
 * While this variable is null, code in this module that consults it falls
 * back to plain Condition objects and to Fiber.yield.
 */
__gshared Scheduler scheduler;
1438
1439 // Generator
1440
/**
 * Gives other logical threads a chance to run.
 *
 * Nothing happens when the caller is a Generator.  Otherwise the call is
 * forwarded to scheduler.yield() if a scheduler is installed, or to
 * Fiber.yield() if there is no scheduler and the caller is a Fiber.
 */
void yield() nothrow
{
    auto current = Fiber.getThis();

    // Generators are never descheduled from here.
    if (cast(IsGenerator) current)
        return;

    if (scheduler !is null)
    {
        scheduler.yield();
        return;
    }

    // No scheduler installed: only a fiber has anything to yield to.
    if (current)
        Fiber.yield();
}
1459
/// Used to determine whether a Generator is running.  Generator implements
/// this empty interface so that yield() can detect one with a simple cast.
private interface IsGenerator {}
1462
1463
/**
 * A Fiber that hands values of type T back to its caller through yield.
 * The produced sequence is exposed as an InputRange.
 *
 * Example:
 * ---
 * import std.concurrency;
 * import std.stdio;
 *
 *
 * void main()
 * {
 *     auto tid = spawn(
 *     {
 *         while (true)
 *         {
 *             writeln(receiveOnly!int());
 *         }
 *     });
 *
 *     auto r = new Generator!int(
 *     {
 *         foreach (i; 1 .. 10)
 *             yield(i);
 *     });
 *
 *     foreach (e; r)
 *     {
 *         tid.send(e);
 *     }
 * }
 * ---
 */
class Generator(T) :
    Fiber, IsGenerator, InputRange!T
{
    /**
     * Creates a generator whose body is a static D function.  The function
     * is called once immediately to prepare the range for iteration.
     *
     * Params:
     *  fn = The fiber function.
     *
     * In:
     *  fn must not be null.
     */
    this(void function() fn)
    {
        super(fn);
        call();
    }

    /**
     * Creates a generator whose body is a static D function, using a caller
     * supplied stack size.  The function is called once immediately to
     * prepare the range for iteration.
     *
     * Params:
     *  fn = The fiber function.
     *  sz = The stack size for this fiber.
     *
     * In:
     *  fn must not be null.
     */
    this(void function() fn, size_t sz)
    {
        super(fn, sz);
        call();
    }

    /**
     * Creates a generator whose body is a delegate.  The delegate is called
     * once immediately to prepare the range for iteration.
     *
     * Params:
     *  dg = The fiber function.
     *
     * In:
     *  dg must not be null.
     */
    this(void delegate() dg)
    {
        super(dg);
        call();
    }

    /**
     * Creates a generator whose body is a delegate, using a caller supplied
     * stack size.  The delegate is called once immediately to prepare the
     * range for iteration.
     *
     * Params:
     *  dg = The fiber function.
     *  sz = The stack size for this fiber.
     *
     * In:
     *  dg must not be null.
     */
    this(void delegate() dg, size_t sz)
    {
        super(dg, sz);
        call();
    }

    /**
     * True when the fiber has terminated or no value has been yielded yet.
     */
    final bool empty() @property
    {
        return state == State.TERM || m_value is null;
    }

    /**
     * Resumes the underlying function so that it can produce the next value.
     */
    final void popFront()
    {
        call();
    }

    /**
     * Returns a shallow copy of the most recently yielded value.
     */
    final T front() @property
    {
        return *m_value;
    }

    /**
     * Returns the most recently yielded value without running a copy
     * constructor.  Element types that define a postblit are rejected at
     * compile time, because Generator does not return by reference.
     */
    final T moveFront()
    {
        static if (hasElaborateCopyConstructor!T)
        {
            static assert(0,
                    "Fiber front is always rvalue and thus cannot be moved since it defines a postblit.");
        }
        else
        {
            return front;
        }
    }

    /// foreach support: passes each remaining element to loopBody.
    final int opApply(scope int delegate(T) loopBody)
    {
        int result;
        while (!empty)
        {
            result = loopBody(front);
            if (result)
                break;
            popFront();
        }
        return result;
    }

    /// foreach support with a running index that starts at zero.
    final int opApply(scope int delegate(size_t, T) loopBody)
    {
        int result;
        size_t index;
        while (!empty)
        {
            result = loopBody(index, front);
            if (result)
                break;
            ++index;
            popFront();
        }
        return result;
    }
private:
    // Address of the value most recently passed to yield; null until then.
    T* m_value;
}
1636
/**
 * Hands a value of type T to the caller of the generator that is currently
 * executing, then suspends that generator.
 *
 * Params:
 *  value = The value to yield.
 *
 * Throws:
 *  Exception if the calling fiber is not an executing Generator!T.
 */
void yield(T)(ref T value)
{
    auto gen = cast(Generator!T) Fiber.getThis();

    if (gen is null || gen.state != Fiber.State.EXEC)
        throw new Exception("yield(T) called with no active generator for the supplied type");

    // Publish the address of the value, then return control to the consumer.
    gen.m_value = &value;
    Fiber.yield();
}

/// ditto
void yield(T)(T value)
{
    // The by-value parameter is passed on as an lvalue.
    yield(value);
}
1660
// Feeds the values of a Generator to a spawned receiver, once per scheduler.
@system unittest
{
    import core.exception;
    import std.exception;

    // Runs the generator/receiver scenario with s installed as scheduler.
    static void testScheduler(Scheduler s)
    {
        scheduler = s;
        // Uninstall the scheduler on every exit path.  Without this a
        // failure inside start() would leave the global set and affect
        // every test that runs afterwards.
        scope (exit) scheduler = null;

        scheduler.start({
            auto tid = spawn({
                int i;

                try
                {
                    for (i = 1; i < 10; i++)
                    {
                        assertNotThrown!AssertError(assert(receiveOnly!int() == i));
                    }
                }
                catch (OwnerTerminated e)
                {
                    // Expected: the owner finishes after sending 1, 2 and 3.
                }

                // i will advance 1 past the last value expected
                assert(i == 4);
            });

            auto r = new Generator!int({
                // A double cannot be yielded from a Generator!int.
                assertThrown!Exception(yield(2.0));
                yield(); // ensure this is a no-op
                yield(1);
                yield(); // also once something has been yielded
                yield(2);
                yield(3);
            });

            foreach (e; r)
            {
                tid.send(e);
            }
        });
    }

    testScheduler(new ThreadScheduler);
    testScheduler(new FiberScheduler);
}
///
@system unittest
{
    import std.range;

    InputRange!int myIota = iota(10).inputRangeObject;

    myIota.popFront();
    myIota.popFront();
    assert(myIota.moveFront == 2);
    assert(myIota.front == 2);
    myIota.popFront();
    assert(myIota.front == 3);

    //can be assigned to std.range.interfaces.InputRange directly
    myIota = new Generator!int(
    {
        foreach (i; 0 .. 10) yield(i);
    });

    // The generator behaves exactly like the iota range above.
    myIota.popFront();
    myIota.popFront();
    assert(myIota.moveFront == 2);
    assert(myIota.front == 2);
    myIota.popFront();
    assert(myIota.front == 3);

    // Indexed foreach over what is left (the values 3 through 9):
    // counter[0] counts the elements, counter[1] sums the indices 0 .. 6.
    size_t[2] counter = [0, 0];
    foreach (i, unused; myIota) counter[] += [1, i];

    assert(myIota.empty);
    assert(counter == [7, 21]);
}
1742
1743 private
1744 {
1745 /*
1746 * A MessageBox is a message queue for one thread. Other threads may send
1747 * messages to this owner by calling put(), and the owner receives them by
1748 * calling get(). The put() call is therefore effectively shared and the
1749 * get() call is effectively local. setMaxMsgs may be used by any thread
1750 * to limit the size of the message queue.
1751 */
1752 class MessageBox
1753 {
this()1754 this() @trusted nothrow /* TODO: make @safe after relevant druntime PR gets merged */
1755 {
1756 m_lock = new Mutex;
1757 m_closed = false;
1758
1759 if (scheduler is null)
1760 {
1761 m_putMsg = new Condition(m_lock);
1762 m_notFull = new Condition(m_lock);
1763 }
1764 else
1765 {
1766 m_putMsg = scheduler.newCondition(m_lock);
1767 m_notFull = scheduler.newCondition(m_lock);
1768 }
1769 }
1770
1771 ///
isClosed()1772 final @property bool isClosed() @safe @nogc pure
1773 {
1774 synchronized (m_lock)
1775 {
1776 return m_closed;
1777 }
1778 }
1779
1780 /*
1781 * Sets a limit on the maximum number of user messages allowed in the
1782 * mailbox. If this limit is reached, the caller attempting to add
1783 * a new message will execute call. If num is zero, there is no limit
1784 * on the message queue.
1785 *
1786 * Params:
1787 * num = The maximum size of the queue or zero if the queue is
1788 * unbounded.
1789 * call = The routine to call when the queue is full.
1790 */
setMaxMsgs(size_t num,bool function (Tid)call)1791 final void setMaxMsgs(size_t num, bool function(Tid) call) @safe @nogc pure
1792 {
1793 synchronized (m_lock)
1794 {
1795 m_maxMsgs = num;
1796 m_onMaxMsgs = call;
1797 }
1798 }
1799
1800 /*
1801 * If maxMsgs is not set, the message is added to the queue and the
1802 * owner is notified. If the queue is full, the message will still be
1803 * accepted if it is a control message, otherwise onCrowdingDoThis is
1804 * called. If the routine returns true, this call will block until
1805 * the owner has made space available in the queue. If it returns
1806 * false, this call will abort.
1807 *
1808 * Params:
1809 * msg = The message to put in the queue.
1810 *
1811 * Throws:
1812 * An exception if the queue is full and onCrowdingDoThis throws.
1813 */
put(ref Message msg)1814 final void put(ref Message msg)
1815 {
1816 synchronized (m_lock)
1817 {
1818 // TODO: Generate an error here if m_closed is true, or maybe
1819 // put a message in the caller's queue?
1820 if (!m_closed)
1821 {
1822 while (true)
1823 {
1824 if (isPriorityMsg(msg))
1825 {
1826 m_sharedPty.put(msg);
1827 m_putMsg.notify();
1828 return;
1829 }
1830 if (!mboxFull() || isControlMsg(msg))
1831 {
1832 m_sharedBox.put(msg);
1833 m_putMsg.notify();
1834 return;
1835 }
1836 if (m_onMaxMsgs !is null && !m_onMaxMsgs(thisTid))
1837 {
1838 return;
1839 }
1840 m_putQueue++;
1841 m_notFull.wait();
1842 m_putQueue--;
1843 }
1844 }
1845 }
1846 }
1847
        /*
         * Matches ops against each message in turn until a match is found.
         *
         * Params:
         *  vals = The operations to match, optionally preceded by a value
         *         convertible to Duration that is used as a timeout.  Each
         *         operation may return a bool to indicate whether a message
         *         with a matching type is truly a match.
         *
         * Returns:
         *  true if a message was retrieved and false if not (such as if a
         *  timeout occurred).
         *
         * Throws:
         *  LinkTerminated if a linked thread terminated, or OwnerTerminated
         *  if the owner thread terminates and no existing messages match the
         *  supplied ops.
         */
        bool get(T...)(scope T vals)
        {
            import std.meta : AliasSeq;

            static assert(T.length);

            // A leading Duration selects the timed variant; the remaining
            // arguments are the handlers in either case.
            static if (isImplicitlyConvertible!(T[0], Duration))
            {
                alias Ops = AliasSeq!(T[1 .. $]);
                alias ops = vals[1 .. $];
                enum timedWait = true;
                Duration period = vals[0];
            }
            else
            {
                alias Ops = AliasSeq!(T);
                alias ops = vals[0 .. $];
                enum timedWait = false;
            }

            // Tries the handlers in order.  The first one whose parameter
            // types the message converts to decides the outcome: a handler
            // returning bool reports whether it accepted the message, any
            // other handler always counts as a match.
            bool onStandardMsg(ref Message msg)
            {
                foreach (i, t; Ops)
                {
                    alias Args = Parameters!(t);
                    auto op = ops[i];

                    if (msg.convertsTo!(Args))
                    {
                        static if (is(ReturnType!(t) == bool))
                        {
                            return msg.map(op);
                        }
                        else
                        {
                            msg.map(op);
                            return true;
                        }
                    }
                }
                return false;
            }

            // Handles the notification that another thread terminated.  The
            // resulting LinkTerminated / OwnerTerminated is first offered to
            // the handlers and thrown only if none of them takes it.
            bool onLinkDeadMsg(ref Message msg)
            {
                assert(msg.convertsTo!(Tid));
                auto tid = msg.get!(Tid);

                if (bool* pDepends = tid in thisInfo.links)
                {
                    auto depends = *pDepends;
                    thisInfo.links.remove(tid);
                    // Give the owner relationship precedence.
                    if (depends && tid != thisInfo.owner)
                    {
                        auto e = new LinkTerminated(tid);
                        auto m = Message(MsgType.standard, e);
                        if (onStandardMsg(m))
                            return true;
                        throw e;
                    }
                }
                if (tid == thisInfo.owner)
                {
                    thisInfo.owner = Tid.init;
                    auto e = new OwnerTerminated(tid);
                    auto m = Message(MsgType.standard, e);
                    if (onStandardMsg(m))
                        return true;
                    throw e;
                }
                return false;
            }

            // Dispatches a control message; linkDead is the only type that
            // is acted upon here.
            bool onControlMsg(ref Message msg)
            {
                switch (msg.type)
                {
                case MsgType.linkDead:
                    return onLinkDeadMsg(msg);
                default:
                    return false;
                }
            }

            // Walks list from the front, removing the first message that is
            // handled.  Returns true if a message was delivered to a handler.
            bool scan(ref ListT list)
            {
                for (auto range = list[]; !range.empty;)
                {
                    // Only the message handler will throw, so if this occurs
                    // we can be certain that the message was handled.
                    scope (failure)
                        list.removeAt(range);

                    if (isControlMsg(range.front))
                    {
                        if (onControlMsg(range.front))
                        {
                            // Although the linkDead message is a control message,
                            // it can be handled by the user.  Since the linkDead
                            // message throws if not handled, if we get here then
                            // it has been handled and we can return from receive.
                            // This is a weird special case that will have to be
                            // handled in a more general way if more are added.
                            if (!isLinkDeadMsg(range.front))
                            {
                                list.removeAt(range);
                                continue;
                            }
                            list.removeAt(range);
                            return true;
                        }
                        range.popFront();
                        continue;
                    }
                    else
                    {
                        if (onStandardMsg(range.front))
                        {
                            list.removeAt(range);
                            return true;
                        }
                        range.popFront();
                        continue;
                    }
                }
                return false;
            }

            // Processes the first priority message, if any.  It is either
            // consumed by a handler or turned into an exception: the payload
            // itself when it is a Throwable, a PriorityMessageException
            // wrapping the payload otherwise.
            bool pty(ref ListT list)
            {
                if (!list.empty)
                {
                    auto range = list[];

                    if (onStandardMsg(range.front))
                    {
                        list.removeAt(range);
                        return true;
                    }
                    if (range.front.convertsTo!(Throwable))
                        throw range.front.get!(Throwable);
                    else if (range.front.convertsTo!(shared(Throwable)))
                        throw range.front.get!(shared(Throwable));
                    else
                        throw new PriorityMessageException(range.front.data);
                }
                return false;
            }

            static if (timedWait)
            {
                import core.time : MonoTime;
                // Absolute deadline, used to shrink period after each pass.
                auto limit = MonoTime.currTime + period;
            }

            while (true)
            {
                ListT arrived;

                // Messages already moved to the local queues come first.
                if (pty(m_localPty) || scan(m_localBox))
                {
                    return true;
                }
                yield();
                synchronized (m_lock)
                {
                    updateMsgCount();
                    while (m_sharedPty.empty && m_sharedBox.empty)
                    {
                        // NOTE: We're notifying all waiters here instead of just
                        //       a few because the onCrowding behavior may have
                        //       changed and we don't want to block sender threads
                        //       unnecessarily if the new behavior is not to block.
                        //       This will admittedly result in spurious wakeups
                        //       in other situations, but what can you do?
                        if (m_putQueue && !mboxFull())
                            m_notFull.notifyAll();
                        static if (timedWait)
                        {
                            if (period <= Duration.zero || !m_putMsg.wait(period))
                                return false;
                        }
                        else
                        {
                            m_putMsg.wait();
                        }
                    }
                    // Take everything the senders queued while the lock is
                    // held; the matching itself happens outside of the lock.
                    m_localPty.put(m_sharedPty);
                    arrived.put(m_sharedBox);
                }
                if (m_localPty.empty)
                {
                    // Whatever is left of arrived is appended to the local
                    // box on every way out of this scope.
                    scope (exit) m_localBox.put(arrived);
                    if (scan(arrived))
                    {
                        return true;
                    }
                    else
                    {
                        static if (timedWait)
                        {
                            period = limit - MonoTime.currTime;
                        }
                        continue;
                    }
                }
                // A priority message arrived: keep the new standard messages
                // for later and deal with the priority message right away.
                m_localBox.put(arrived);
                pty(m_localPty);
                return true;
            }
        }
2076
2077 /*
2078 * Called on thread termination. This routine processes any remaining
2079 * control messages, clears out message queues, and sets a flag to
2080 * reject any future messages.
2081 */
close()2082 final void close()
2083 {
2084 static void onLinkDeadMsg(ref Message msg)
2085 {
2086 assert(msg.convertsTo!(Tid));
2087 auto tid = msg.get!(Tid);
2088
2089 thisInfo.links.remove(tid);
2090 if (tid == thisInfo.owner)
2091 thisInfo.owner = Tid.init;
2092 }
2093
2094 static void sweep(ref ListT list)
2095 {
2096 for (auto range = list[]; !range.empty; range.popFront())
2097 {
2098 if (range.front.type == MsgType.linkDead)
2099 onLinkDeadMsg(range.front);
2100 }
2101 }
2102
2103 ListT arrived;
2104
2105 sweep(m_localBox);
2106 synchronized (m_lock)
2107 {
2108 arrived.put(m_sharedBox);
2109 m_closed = true;
2110 }
2111 m_localBox.clear();
2112 sweep(arrived);
2113 }
2114
2115 private:
2116 // Routines involving local data only, no lock needed.
2117
mboxFull()2118 bool mboxFull() @safe @nogc pure nothrow
2119 {
2120 return m_maxMsgs && m_maxMsgs <= m_localMsgs + m_sharedBox.length;
2121 }
2122
updateMsgCount()2123 void updateMsgCount() @safe @nogc pure nothrow
2124 {
2125 m_localMsgs = m_localBox.length;
2126 }
2127
isControlMsg(ref Message msg)2128 bool isControlMsg(ref Message msg) @safe @nogc pure nothrow
2129 {
2130 return msg.type != MsgType.standard && msg.type != MsgType.priority;
2131 }
2132
isPriorityMsg(ref Message msg)2133 bool isPriorityMsg(ref Message msg) @safe @nogc pure nothrow
2134 {
2135 return msg.type == MsgType.priority;
2136 }
2137
isLinkDeadMsg(ref Message msg)2138 bool isLinkDeadMsg(ref Message msg) @safe @nogc pure nothrow
2139 {
2140 return msg.type == MsgType.linkDead;
2141 }
2142
        alias OnMaxFn = bool function(Tid); // consulted by put() when the box is full
        alias ListT = List!(Message);

        // Queues that get() scans; get() and close() also touch them
        // without holding m_lock.
        ListT m_localBox;
        ListT m_localPty;

        Mutex m_lock;        // taken by put(), get(), close() and the accessors
        Condition m_putMsg;  // notified by put() after a message was queued
        Condition m_notFull; // notified by get() when waiting senders may retry
        size_t m_putQueue;   // number of senders currently waiting in put()
        ListT m_sharedBox;   // standard/control messages queued by put()
        ListT m_sharedPty;   // priority messages queued by put()
        OnMaxFn m_onMaxMsgs; // installed by setMaxMsgs(); may be null
        size_t m_localMsgs;  // m_localBox.length as of the last updateMsgCount()
        size_t m_maxMsgs;    // queue limit; zero means unbounded
        bool m_closed;       // set by close(); put() ignores messages afterwards
2159 }
2160
    /*
     * A minimal singly linked list used for the message queues.
     *
     * Elements are appended at the tail with put() and removed through a
     * Range with removeAt().  Removed nodes are not handed back to the
     * allocator directly; they are pushed onto a free list that is shared
     * by all List!T instances and reused by newNode().
     */
    struct List(T)
    {
        /*
         * Forward range over the list.  It stores a pointer to the node
         * *before* the current element, which is what allows removeAt() to
         * unlink the current element.
         */
        struct Range
        {
            import std.exception : enforce;

            @property bool empty() const
            {
                return !m_prev.next;
            }

            @property ref T front()
            {
                enforce(m_prev.next, "invalid list node");
                return m_prev.next.val;
            }

            @property void front(T val)
            {
                enforce(m_prev.next, "invalid list node");
                m_prev.next.val = val;
            }

            void popFront()
            {
                enforce(m_prev.next, "invalid list node");
                m_prev = m_prev.next;
            }

            private this(Node* p)
            {
                m_prev = p;
            }

            private Node* m_prev;
        }

        // Appends a single value.
        void put(T val)
        {
            put(newNode(val));
        }

        // Moves every element of rhs to the end of this list; rhs is empty
        // afterwards.
        void put(ref List!(T) rhs)
        {
            if (!rhs.empty)
            {
                // Link the whole chain in via its first node, then walk to
                // the real tail while counting the remaining nodes.
                put(rhs.m_first);
                while (m_last.next !is null)
                {
                    m_last = m_last.next;
                    m_count++;
                }
                rhs.m_first = null;
                rhs.m_last = null;
                rhs.m_count = 0;
            }
        }

        // Returns a range positioned at the first element.  The address of
        // m_first is used as a stand-in for a node in front of the first
        // element; this works because next is the first field of Node.
        Range opSlice()
        {
            return Range(cast(Node*)&m_first);
        }

        // Unlinks the element r currently refers to and recycles its node.
        // r itself stays usable and then refers to the following element.
        void removeAt(Range r)
        {
            import std.exception : enforce;

            assert(m_count);
            Node* n = r.m_prev;
            enforce(n && n.next, "attempting to remove invalid list node");

            // Keep m_last correct: the list becomes empty, or its tail is
            // the node being removed.
            if (m_last is m_first)
                m_last = null;
            else if (m_last is n.next)
                m_last = n; // nocoverage
            Node* to_free = n.next;
            n.next = n.next.next;
            freeNode(to_free);
            m_count--;
        }

        @property size_t length()
        {
            return m_count;
        }

        // Drops all elements.  The nodes are simply unreferenced here; they
        // are not pushed onto the free list.
        void clear()
        {
            m_first = m_last = null;
            m_count = 0;
        }

        @property bool empty()
        {
            return m_first is null;
        }

    private:
        struct Node
        {
            Node* next; // must stay the first field, see opSlice()
            T val;

            this(T v)
            {
                val = v;
            }
        }

        // Small spin lock that protects the shared free list.
        static shared struct SpinLock
        {
            void lock() { while (!cas(&locked, false, true)) { Thread.yield(); } }
            void unlock() { atomicStore!(MemoryOrder.rel)(locked, false); }
            bool locked;
        }

        static shared SpinLock sm_lock; // guards sm_head
        static shared Node* sm_head;    // head of the free list of recycled nodes

        // Produces a node holding v, reusing a recycled node when one is
        // available and allocating a new one otherwise.
        Node* newNode(T v)
        {
            Node* n;
            {
                sm_lock.lock();
                scope (exit) sm_lock.unlock();

                if (sm_head)
                {
                    n = cast(Node*) sm_head;
                    sm_head = sm_head.next;
                }
            }
            if (n)
            {
                import std.conv : emplace;
                emplace!Node(n, v);
            }
            else
            {
                n = new Node(v);
            }
            return n;
        }

        // Pushes n onto the free list for reuse by newNode().
        void freeNode(Node* n)
        {
            // destroy val to free any owned GC memory
            destroy(n.val);

            sm_lock.lock();
            scope (exit) sm_lock.unlock();

            auto sn = cast(shared(Node)*) n;
            sn.next = sm_head;
            sm_head = sn;
        }

        // Appends the node n.  Only n itself is counted, even if further
        // nodes are chained behind it.
        void put(Node* n)
        {
            m_count++;
            if (!empty)
            {
                m_last.next = n;
                m_last = n;
                return;
            }
            m_first = n;
            m_last = n;
        }

        Node* m_first;  // first element, null when the list is empty
        Node* m_last;   // last element, null when the list is empty
        size_t m_count; // number of elements
    }
2338 }
2339
version(unittest)2340 version (unittest)
2341 {
2342 import std.stdio;
2343 import std.typecons : tuple, Tuple;
2344
    // Receiving side of simpleTest.  Consumes the four messages sent by
    // runTest, in order, and then reports back to tid.
    void testfn(Tid tid)
    {
        // Two ints must reach the (int, int) handler, never the float one.
        receive((float val) { assert(0); }, (int val, int val2) {
            assert(val == 42 && val2 == 86);
        });
        receive((Tuple!(int, int) val) { assert(val[0] == 42 && val[1] == 86); });
        // Accepts the next message as a Variant and ignores its content.
        receive((Variant val) {  });
        // The first handler returns true for the expected text, so the
        // second string handler must not be reached.
        receive((string val) {
            if ("the quick brown fox" != val)
                return false;
            return true;
        }, (string val) { assert(false); });
        prioritySend(tid, "done");
    }
2359
    // Sending side of simpleTest.  Sends the four messages that testfn
    // expects and then waits for its "done" reply.
    void runTest(Tid tid)
    {
        send(tid, 42, 86);
        send(tid, tuple(42, 86));
        send(tid, "hello", "there");
        send(tid, "the quick brown fox");
        receive((string val) { assert(val == "done"); });
    }
2368
    // Runs the testfn/runTest exchange twice: first with the default
    // mailbox, then with a mailbox limited to two messages that blocks
    // senders when it is full.
    void simpleTest()
    {
        auto tid = spawn(&testfn, thisTid);
        runTest(tid);

        // Run the test again with a limited mailbox size.
        tid = spawn(&testfn, thisTid);
        setMaxMailboxSize(tid, 2, OnCrowding.block);
        runTest(tid);
    }
2379
    // simpleTest with whatever scheduler is currently installed.
    @system unittest
    {
        simpleTest();
    }
2384
2385 @system unittest
2386 {
2387 scheduler = new ThreadScheduler;
2388 simpleTest();
2389 scheduler = null;
2390 }
2391 }
2392
// Returns the process-wide mutex used by the single-argument initOnce.
// The mutex is created lazily on first use.
private @property Mutex initOnceLock()
{
    __gshared Mutex lock;
    // Fast path: the mutex has already been published.
    if (auto mtx = cast() atomicLoad!(MemoryOrder.acq)(*cast(shared)&lock))
        return mtx;
    // Create a candidate and try to install it.  The store only succeeds
    // while lock is still null.
    auto mtx = new Mutex;
    if (cas(cast(shared)&lock, cast(shared) null, cast(shared) mtx))
        return mtx;
    // Another caller installed a mutex first; use that one instead.
    return cast() atomicLoad!(MemoryOrder.acq)(*cast(shared)&lock);
}
2403
/**
 * Initializes $(D_PARAM var) with the lazy $(D_PARAM init) value in a
 * thread-safe manner.
 *
 * The implementation guarantees that all threads simultaneously calling
 * initOnce with the same $(D_PARAM var) argument block until $(D_PARAM var) is
 * fully initialized. All side-effects of $(D_PARAM init) are globally visible
 * afterwards.
 *
 * This overload forwards to the overload that takes a mutex, passing one
 * mutex that is shared by all callers of this overload.
 *
 * Params:
 *   var = The variable to initialize
 *   init = The lazy initializer value
 *
 * Returns:
 *   A reference to the initialized variable
 */
auto ref initOnce(alias var)(lazy typeof(var) init)
{
    return initOnce!var(init, initOnceLock);
}
2424
/// A typical use-case is to perform lazy but thread-safe initialization.
@system unittest
{
    static class MySingleton
    {
        // Creates the shared instance on first use and returns it.
        static MySingleton instance()
        {
            static __gshared MySingleton inst;
            return initOnce!inst(new MySingleton);
        }
    }

    assert(MySingleton.instance !is null);
}
2439
// Ten threads race for the singleton; it must be constructed exactly once.
@system unittest
{
    static class MySingleton
    {
        static MySingleton instance()
        {
            static __gshared MySingleton inst;
            return initOnce!inst(new MySingleton);
        }

    private:
        // Every construction bumps cnt and remembers its own number.
        this() { val = ++cnt; }
        size_t val;
        static __gshared size_t cnt;
    }

    foreach (_; 0 .. 10)
        spawn({ ownerTid.send(MySingleton.instance.val); });
    // Every thread must have seen the same instance as this thread.
    foreach (_; 0 .. 10)
        assert(receiveOnly!size_t == MySingleton.instance.val);
    assert(MySingleton.cnt == 1);
}
2462
/**
 * Same as above, but synchronizes on a caller supplied mutex instead of the
 * one shared among all initOnce instances.
 *
 * Use this overload to avoid dead-locks when the $(D_PARAM init) expression
 * waits for the result of another thread that might also call initOnce.
 * Use with care.
 *
 * Params:
 *   var = The variable to initialize
 *   init = The lazy initializer value
 *   mutex = A mutex to prevent race conditions
 *
 * Returns:
 *   A reference to the initialized variable
 */
auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex)
{
    // check that var is global, can't take address of a TLS variable
    static assert(is(typeof({ __gshared p = &var; })),
        "var must be 'static shared' or '__gshared'.");
    import core.atomic : atomicLoad, MemoryOrder, atomicStore;

    // One flag per instantiation, i.e. per var.
    static shared bool flag;

    // Fast path: initialization already completed, no locking required.
    if (atomicLoad!(MemoryOrder.acq)(flag))
        return var;

    synchronized (mutex)
    {
        // Check again under the lock; another thread may have finished the
        // initialization in the meantime.
        if (!atomicLoad!(MemoryOrder.acq)(flag))
        {
            var = init;
            atomicStore!(MemoryOrder.rel)(flag, true);
        }
    }
    return var;
}
2500
/// Use a separate mutex when init blocks on another thread that might also call initOnce.
@system unittest
{
    import core.sync.mutex : Mutex;

    static shared bool varA, varB;
    __gshared Mutex m;
    m = new Mutex;

    spawn({
        // use a different mutex for varB to avoid a dead-lock
        initOnce!varB(true, m);
        ownerTid.send(true);
    });
    // init depends on the result of the spawned thread
    initOnce!varA(receiveOnly!bool);
    assert(varA == true);
    assert(varB == true);
}
2520
// initOnce accepts only variables with global storage: static shared and
// __gshared compile, thread-local statics and locals are rejected.
@system unittest
{
    static shared bool a;
    __gshared bool b;
    static bool c;
    bool d;
    initOnce!a(true);
    initOnce!b(true);
    static assert(!__traits(compiles, initOnce!c(true))); // TLS
    static assert(!__traits(compiles, initOnce!d(true))); // local variable
}
2532