1 /*
2  * flow.h
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #ifndef FLOW_FLOW_H
22 #define FLOW_FLOW_H
23 #pragma once
24 
25 #pragma warning( disable: 4244 4267 ) // SOMEDAY: Carefully check for integer overflow issues (e.g. size_t to int conversions like this suppresses)
26 #pragma warning( disable: 4345 )
27 #pragma warning( error: 4239 )
28 
29 #include <vector>
30 #include <queue>
31 #include <map>
32 #include <unordered_map>
33 #include <set>
34 #include <functional>
35 #include <iostream>
36 #include <string>
37 #include <utility>
38 #include <algorithm>
39 
40 #include "flow/Platform.h"
41 #include "flow/FastAlloc.h"
42 #include "flow/IRandom.h"
43 #include "flow/serialize.h"
44 #include "flow/Deque.h"
45 #include "flow/ThreadPrimitives.h"
46 #include "flow/network.h"
47 
48 #include <boost/version.hpp>
49 
// Makes the generic comparison operators from std::rel_ops (!=, >, <=, >= synthesized from == and <)
// visible at global scope in every translation unit that includes this header.
using namespace std::rel_ops;

// TEST( condition ): code-coverage probe.
// When `condition` is true, the else-branch initializes a function-local static from a TraceEvent named
// "CodeCoverage" carrying the file, line and the text of the condition. Because it is a static
// initializer, that expression is evaluated at most once per expansion site; when `condition` is false
// nothing is done. The `if (!(condition)); else { ... }` form keeps the macro a single statement.
// NOTE(review): __test is never read by the macro itself; it appears to exist only to get once-only evaluation.
#define TEST( condition ) if (!(condition)); else { static TraceEvent* __test = &(TraceEvent("CodeCoverage").detail("File", __FILE__).detail("Line",__LINE__).detail("Condition", #condition)); }
53 
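/*
usage:
if (BUGGIFY) {
	// Code here is executed only on some runs (the section is activated with probability
	// P_BUGGIFIED_SECTION_ACTIVATED), and within such a run only some of the time (each
	// evaluation fires with probability P_BUGGIFIED_SECTION_FIRES).
}
*/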
61 
// Probabilities consumed by the buggify / validation machinery; defined elsewhere.
// Only P_BUGGIFIED_SECTION_FIRES and P_EXPENSIVE_VALIDATION are referenced by the macros in this header.
extern double P_BUGGIFIED_SECTION_ACTIVATED, P_BUGGIFIED_SECTION_FIRES, P_EXPENSIVE_VALIDATION;
// Per-call-site value looked up by source file and line. BUGGIFY_WITH_PROB treats a nonzero result as
// "this site may fire". NOTE(review): presumably this is where P_BUGGIFIED_SECTION_ACTIVATED is applied - confirm in the definition.
int getSBVar(std::string file, int line);
void enableBuggify(bool enabled);   // Currently controls buggification and (randomized) expensive validation
bool validationIsEnabled();

// True when getSBVar() is nonzero for the expanding file/line AND a fresh g_random->random01() draw is
// below x. The && short-circuits, so no random number is consumed when getSBVar() returns 0.
#define BUGGIFY_WITH_PROB(x) (getSBVar(__FILE__, __LINE__) && g_random->random01() < (x))
// BUGGIFY_WITH_PROB using the global firing probability P_BUGGIFIED_SECTION_FIRES.
#define BUGGIFY BUGGIFY_WITH_PROB(P_BUGGIFIED_SECTION_FIRES)
// True when validationIsEnabled() AND a fresh random01() draw is below P_EXPENSIVE_VALIDATION
// (again short-circuiting, so nothing is drawn while validation is disabled).
#define EXPENSIVE_VALIDATION (validationIsEnabled() && g_random->random01() < P_EXPENSIVE_VALIDATION)
70 
// String helpers implemented outside this header; only their signatures are visible here.

// NOTE(review): presumably parses a number followed by a unit suffix, falling back to default_unit when
// none is given, and returns an empty Optional on failure - confirm against the definition.
extern Optional<uint64_t> parse_with_suffix(std::string toparse, std::string default_unit = "");
// NOTE(review): presumably printf-style formatting that returns the result as a std::string - confirm.
extern std::string format(const char* form, ...);

// On success, returns the number of characters written. On failure, returns a negative number.
extern int vsformat(std::string &outputString, const char* form, va_list args);

// Each of the following comes in two flavours: one returning a Standalone<StringRef>, and one taking an
// Arena& and returning a plain StringRef.
// NOTE(review): the Arena overloads presumably allocate their result from the given arena - confirm.
extern Standalone<StringRef> strinc(StringRef const& str);
extern StringRef strinc(StringRef const& str, Arena& arena);
extern Standalone<StringRef> addVersionStampAtEnd(StringRef const& str);
extern StringRef addVersionStampAtEnd(StringRef const& str, Arena& arena);
81 
82 template <typename Iter>
concatenate(Iter b,Iter const & e,Arena & arena)83 StringRef concatenate( Iter b, Iter const& e, Arena& arena ) {
84 	int rsize = 0;
85 	Iter i = b;
86 	while(i != e) {
87 		rsize += i->size();
88 		++i;
89 	}
90 	uint8_t* s = new (arena) uint8_t[ rsize ];
91 	uint8_t* p = s;
92 	while(b != e) {
93 		memcpy(p, b->begin(),b->size());
94 		p += b->size();
95 		++b;
96 	}
97 	return StringRef(s, rsize);
98 }
99 
100 template <typename Iter>
concatenate(Iter b,Iter const & e)101 Standalone<StringRef> concatenate( Iter b, Iter const& e ) {
102 	Standalone<StringRef> r;
103 	((StringRef &)r) = concatenate(b, e, r.arena());
104 	return r;
105 }
106 
// Empty placeholder value type. It has no state, so serializing it reads and writes nothing.
class Void {
public:
	// Accepts any archive type and leaves it untouched.
	template <class Archive>
	void serialize(Archive&) {
	}
};
112 
// Empty tag type. In this header it selects the "never" overloads: Future<T>(Never), SAV<T>::send(Never)
// and SAV<T>::sendAndDelPromiseRef(Never), which put the SAV into its NEVER state instead of storing a value.
class Never {};
114 
115 template <class T>
116 class ErrorOr {
117 public:
ErrorOr()118 	ErrorOr() : error(default_error_or()) {}
ErrorOr(Error const & error)119 	ErrorOr(Error const& error) : error(error) {}
ErrorOr(const ErrorOr<T> & o)120 	ErrorOr(const ErrorOr<T>& o) : error(o.error) {
121 		if (present()) new (&value) T(o.get());
122 	}
123 
124 	template <class U>
ErrorOr(const U & t)125 	ErrorOr(const U& t) : error() { new (&value) T(t); }
126 
ErrorOr(Arena & a,const ErrorOr<T> & o)127 	ErrorOr(Arena& a, const ErrorOr<T>& o) : error(o.error) {
128 		if (present()) new (&value) T(a, o.get());
129 	}
expectedSize()130 	int expectedSize() const { return present() ? get().expectedSize() : 0; }
131 
castTo()132 	template <class R> ErrorOr<R> castTo() const {
133 		return map<R>([](const T& v){ return (R)v; });
134 	}
135 
map(std::function<R (T)> f)136 	template <class R> ErrorOr<R> map(std::function<R(T)> f) const {
137 		if (present()) {
138 			return ErrorOr<R>(f(get()));
139 		}
140 		else {
141 			return ErrorOr<R>(error);
142 		}
143 	}
144 
~ErrorOr()145 	~ErrorOr() {
146 		if (present()) ((T*)&value)->~T();
147 	}
148 
149 	ErrorOr & operator=(ErrorOr const& o) {
150 		if (present()) {
151 			((T*)&value)->~T();
152 		}
153 		if (o.present()) {
154 			new (&value) T(o.get());
155 		}
156 		error = o.error;
157 		return *this;
158 	}
159 
present()160 	bool present() const { return error.code() == invalid_error_code; }
get()161 	T& get() {
162 		UNSTOPPABLE_ASSERT(present());
163 		return *(T*)&value;
164 	}
get()165 	T const& get() const {
166 		UNSTOPPABLE_ASSERT(present());
167 		return *(T const*)&value;
168 	}
orDefault(T const & default_value)169 	T orDefault(T const& default_value) const { if (present()) return get(); else return default_value; }
170 
171 	template <class Ar>
serialize(Ar & ar)172 	void serialize(Ar& ar) {
173 		// SOMEDAY: specialize for space efficiency?
174 		serializer(ar, error);
175 		if (present()) {
176 			if (Ar::isDeserializing) new (&value) T();
177 			serializer(ar, *(T*)&value);
178 		}
179 	}
180 
isError()181 	bool isError() const { return error.code() != invalid_error_code; }
isError(int code)182 	bool isError(int code) const { return error.code() == code; }
getError()183 	Error getError() const { ASSERT(isError()); return error; }
184 
185 private:
186 	typename std::aligned_storage< sizeof(T), __alignof(T) >::type value;
187 	Error error;
188 };
189 
190 template <class T>
191 struct Callback {
192 	Callback<T> *prev, *next;
193 
fireCallback194 	virtual void fire(T const&) {}
errorCallback195 	virtual void error(Error) {}
unwaitCallback196 	virtual void unwait() {}
197 
insertCallback198 	void insert(Callback<T>* into) {
199 		// Add this (uninitialized) callback just after `into`
200 		this->prev = into;
201 		this->next = into->next;
202 		into->next->prev = this;
203 		into->next = this;
204 	}
205 
insertBackCallback206 	void insertBack(Callback<T>* into) {
207 		// Add this (uninitialized) callback just before `into`
208 		this->next = into;
209 		this->prev = into->prev;
210 		into->prev->next = this;
211 		into->prev = this;
212 	}
213 
insertChainCallback214 	void insertChain(Callback<T>* into) {
215 		// Combine this callback's (initialized) chain and `into`'s such that this callback is just after `into`
216 		auto p = this->prev;
217 		auto n = into->next;
218 		this->prev = into;
219 		into->next = this;
220 		p->next = n;
221 		n->prev = p;
222 	}
223 
removeCallback224 	void remove() {
225 		// Remove this callback from the list it is in, and call unwait() on the head of that list if this was the last callback
226 		next->prev = prev;
227 		prev->next = next;
228 		if (prev == next)
229 			next->unwait();
230 	}
231 
countCallbacksCallback232 	int countCallbacks() {
233 		int count = 0;
234 		for (Callback* c = next; c != this; c = c->next)
235 			count++;
236 		return count;
237 	}
238 };
239 
240 template <class T>
241 struct SingleCallback {
242 	// Used for waiting on FutureStreams, which don't support multiple callbacks
243 	SingleCallback<T> *next;
244 
fireSingleCallback245 	virtual void fire(T const&) {}
errorSingleCallback246 	virtual void error(Error) {}
unwaitSingleCallback247 	virtual void unwait() {}
248 
insertSingleCallback249 	void insert(SingleCallback<T>* into) {
250 		this->next = into->next;
251 		into->next = this;
252 	}
253 
removeSingleCallback254 	void remove() {
255 		ASSERT(next->next == this);
256 		next->next = next;
257 		next->unwait();
258 	}
259 };
260 
// SAV<T>: the shared state behind Promise<T> and Future<T>. It holds at most one value of type T
// (or an error), two reference counts, and - through its private Callback<T> base - the head of the
// circular list of callbacks waiting for the result.
//
// The state is encoded in error_state's code:
//   UNSET_ERROR_CODE (-3)  nothing has been sent yet
//   NEVER_ERROR_CODE (-2)  send(Never) was called; no value will ever be delivered
//   SET_ERROR_CODE   (-1)  value_storage holds a constructed T
//   anything greater       an error was sent
template <class T>
struct SAV : private Callback<T>, FastAllocated<SAV<T>> {
	int promises; // one for each promise (and one for an active actor if this is an actor)
	int futures;  // one for each future and one more if there are any callbacks

private:
	// Raw storage for the value; a T is only constructed in it once the state becomes SET_ERROR_CODE
	typename std::aligned_storage< sizeof(T), __alignof(T) >::type value_storage;
public:
	Error error_state;

	enum { UNSET_ERROR_CODE = -3, NEVER_ERROR_CODE, SET_ERROR_CODE };

	// Unchecked access to the stored value; only meaningful once a value has been sent
	T& value() { return *(T*)&value_storage; }

	// Starts in the UNSET state with an empty callback list (the head points at itself)
	SAV(int futures, int promises) : futures(futures), promises(promises), error_state(Error::fromCode(UNSET_ERROR_CODE)) {
		Callback<T>::prev = Callback<T>::next = this;
	}
	// Destroys the stored value only if one was actually constructed
	~SAV() {
		if (int16_t(error_state.code()) == SET_ERROR_CODE)
			value().~T();
	}

	// A value or an error has been sent (false for UNSET and NEVER)
	bool isSet() const { return int16_t(error_state.code()) > NEVER_ERROR_CODE; }
	// Nothing has been sent yet
	bool canBeSet() const { return int16_t(error_state.code()) == UNSET_ERROR_CODE; }
	// An error (rather than a value) has been sent
	bool isError() const { return int16_t(error_state.code()) > SET_ERROR_CODE; }

	// Returns the stored value, or throws the stored error. Asserts that something has been sent.
	T const& get() {
		ASSERT(isSet());
		if (isError()) throw error_state;
		return value();
	}

	// Stores the value and fires every waiting callback.
	// The loop re-reads the list head each time; it relies on each callback unlinking itself when fired.
	template <class U>
	void send(U && value) {
		ASSERT(canBeSet());
		new (&value_storage) T(std::forward<U>(value));
		this->error_state = Error::fromCode(SET_ERROR_CODE);
		while (Callback<T>::next != this)
			Callback<T>::next->fire(this->value());
	}

	// Marks the SAV as never becoming ready; no callbacks are notified
	void send(Never) {
		ASSERT(canBeSet());
		this->error_state = Error::fromCode(NEVER_ERROR_CODE);
	}

	// Stores the error (its code must be positive) and delivers it to every waiting callback
	void sendError(Error err) {
		ASSERT(canBeSet() && int16_t(err.code()) > 0);
		this->error_state = err;
		while (Callback<T>::next != this)
			Callback<T>::next->error(err);
	}

	// send() combined with dropping one promise reference
	template <class U>
	void sendAndDelPromiseRef(U && value) {
		ASSERT(canBeSet());
		if (promises == 1 && !futures) {
			// No one is left to receive the value, so we can just die
			destroy();
			return;
		}

		new (&value_storage) T(std::forward<U>(value));
		finishSendAndDelPromiseRef();
	}

	// Second half of sendAndDelPromiseRef(): marks the value as set, fires the callbacks, then drops
	// one promise reference and destroys this object if no references of either kind remain
	void finishSendAndDelPromiseRef() {
		// Call only after value_storage has already been initialized!
		this->error_state = Error::fromCode(SET_ERROR_CODE);
		while (Callback<T>::next != this)
			Callback<T>::next->fire(this->value());

		if (!--promises && !futures)
			destroy();
	}

	// send(Never) combined with dropping one promise reference
	void sendAndDelPromiseRef(Never) {
		ASSERT(canBeSet());
		this->error_state = Error::fromCode(NEVER_ERROR_CODE);
		if (!--promises && !futures)
			destroy();
	}

	// sendError() combined with dropping one promise reference
	void sendErrorAndDelPromiseRef(Error err) {
		ASSERT(canBeSet() && int16_t(err.code()) > 0);
		if (promises == 1 && !futures) {
			// No one is left to receive the value, so we can just die
			destroy();
			return;
		}

		this->error_state = err;
		while (Callback<T>::next != this)
			Callback<T>::next->error(err);

		if (!--promises && !futures)
			destroy();
	}

	void addPromiseRef() { promises++; }
	void addFutureRef() { futures++; }

	// Drops one promise reference. When the last one goes away while futures still exist and nothing
	// was ever sent, the waiters receive broken_promise(). Destroys this object once no futures remain.
	void delPromiseRef() {
		if (promises == 1) {
			if (futures && canBeSet()) {
				sendError(broken_promise());
				ASSERT(promises == 1);  // Once there is only one promise, there is no one else with the right to change the promise reference count
			}
			promises = 0;
			if (!futures)
				destroy();
		}
		else
			--promises;
	}
	// Drops one future reference. When the last one goes away, cancel() is called if promise references
	// remain; otherwise this object is destroyed.
	void delFutureRef() {
		if (!--futures) {
			if (promises)
				cancel();
			else
				destroy();
		}
	}

	int getFutureReferenceCount() const { return futures; }
	int getPromiseReferenceCount() const { return promises; }

	// Virtual so that derived types can customize destruction and cancellation; the defaults delete
	// this object and do nothing, respectively
	virtual void destroy() { delete this; }
	virtual void cancel() {}

	// Registers `cb` at the front of the callback list, consuming one future reference from the caller
	void addCallbackAndDelFutureRef(Callback<T>* cb) {
		// We are always *logically* dropping one future reference from this, but if we are adding a first callback
		// we also need to add one (since futures is defined as being +1 if there are any callbacks), so net nothing
		if (Callback<T>::next != this)
			delFutureRef();
		cb->insert(this);
	}

	void addYieldedCallbackAndDelFutureRef(Callback<T>* cb) {
		// Same contract as addCallbackAndDelFutureRef, except that the callback is placed at the end of the callback chain rather than at the beginning
		if (Callback<T>::next != this)
			delFutureRef();
		cb->insertBack(this);
	}

	// Same reference accounting as addCallbackAndDelFutureRef, but splices in cb's whole
	// already-linked chain (see Callback<T>::insertChain)
	void addCallbackChainAndDelFutureRef(Callback<T>* cb) {
		if (Callback<T>::next != this)
			delFutureRef();
		cb->insertChain(this);
	}

	// Called by Callback<T>::remove() on the list head when the last callback unlinks itself: releases
	// the extra future reference that was held on behalf of the callbacks
	virtual void unwait() {
		delFutureRef();
	}
	// NOTE(review): this takes no arguments, so it is a distinct virtual function and does not override
	// Callback<T>::fire(T const&). The send loops above never fire the list head itself.
	virtual void fire() { ASSERT(false); }
};
417 
418 template <class T>
419 struct NotifiedQueue : private SingleCallback<T>, FastAllocated<NotifiedQueue<T>> {
420 	int promises; // one for each promise (and one for an active actor if this is an actor)
421 	int futures;  // one for each future and one more if there are any callbacks
422 
423 	// Invariant: SingleCallback<T>::next==this || (queue.empty() && !error.isValid())
424 	std::queue<T, Deque<T>> queue;
425 	Error error;
426 
NotifiedQueueNotifiedQueue427 	NotifiedQueue(int futures, int promises) : futures(futures), promises(promises) {
428 		SingleCallback<T>::next = this;
429 	}
430 
isReadyNotifiedQueue431 	bool isReady() const { return !queue.empty() || error.isValid(); }
isErrorNotifiedQueue432 	bool isError() const { return queue.empty() && error.isValid(); }  // the *next* thing queued is an error
433 
popNotifiedQueue434 	T pop() {
435 		if (queue.empty()) {
436 			if (error.isValid()) throw error;
437 			throw internal_error();
438 		}
439 		auto copy = queue.front();
440 		queue.pop();
441 		return copy;
442 	}
443 
444 	template <class U>
sendNotifiedQueue445 	void send(U && value) {
446 		if (error.isValid()) return;
447 
448 		if (SingleCallback<T>::next != this) {
449 			SingleCallback<T>::next->fire(std::forward<U>(value));
450 		}
451 		else {
452 			queue.emplace(std::forward<U>(value));
453 		}
454 	}
455 
sendErrorNotifiedQueue456 	void sendError(Error err) {
457 		if (error.isValid()) return;
458 
459 		this->error = err;
460 		if (SingleCallback<T>::next != this)
461 			SingleCallback<T>::next->error(err);
462 	}
463 
addPromiseRefNotifiedQueue464 	void addPromiseRef() { promises++; }
addFutureRefNotifiedQueue465 	void addFutureRef() { futures++; }
466 
delPromiseRefNotifiedQueue467 	void delPromiseRef() {
468 		if (!--promises) {
469 			if (futures) {
470 				sendError(broken_promise());
471 			}
472 			else
473 				destroy();
474 		}
475 	}
delFutureRefNotifiedQueue476 	void delFutureRef() {
477 		if (!--futures) {
478 			if (promises)
479 				cancel();
480 			else
481 				destroy();
482 		}
483 	}
484 
getFutureReferenceCountNotifiedQueue485 	int getFutureReferenceCount() const { return futures; }
getPromiseReferenceCountNotifiedQueue486 	int getPromiseReferenceCount() const { return promises; }
487 
destroyNotifiedQueue488 	virtual void destroy() { delete this; }
cancelNotifiedQueue489 	virtual void cancel() {}
490 
addCallbackAndDelFutureRefNotifiedQueue491 	void addCallbackAndDelFutureRef(SingleCallback<T>* cb) {
492 		ASSERT(SingleCallback<T>::next == this);
493 		cb->insert(this);
494 	}
unwaitNotifiedQueue495 	virtual void unwait() {
496 		delFutureRef();
497 	}
fireNotifiedQueue498 	virtual void fire() { ASSERT(false); }
499 };
500 
501 
// Forward declaration: Future<T> below names Promise<T> as a friend before Promise is defined.
template <class T>
class Promise;
504 
505 template <class T>
506 class Future
507 {
508 public:
get()509 	T const& get() const { return sav->get(); }
getValue()510 	T getValue() const { return get(); }
511 
isValid()512 	bool isValid() const {
513 		return sav != 0;
514 	}
isReady()515 	bool isReady() const {
516 		return sav->isSet();
517 	}
isError()518 	bool isError() const {
519 		return sav->isError();
520 	}
getError()521 	Error& getError() const {
522 		ASSERT(isError());
523 		return sav->error_state;
524 	}
525 
Future()526 	Future() : sav(0) {}
Future(const Future<T> & rhs)527 	Future(const Future<T>& rhs) : sav(rhs.sav) {
528 		if (sav) sav->addFutureRef();
529 		//if (sav->endpoint.isValid()) cout << "Future copied for " << sav->endpoint.key << endl;
530 	}
Future(Future<T> && rhs)531 	Future(Future<T>&& rhs) BOOST_NOEXCEPT : sav(rhs.sav) {
532 		rhs.sav = 0;
533 		//if (sav->endpoint.isValid()) cout << "Future moved for " << sav->endpoint.key << endl;
534 	}
Future(const T & presentValue)535 	Future(const T& presentValue)
536 		: sav(new SAV<T>(1, 0))
537 	{
538 		sav->send(presentValue);
539 	}
Future(Never)540 	Future(Never)
541 		: sav(new SAV<T>(1, 0))
542 	{
543 		sav->send(Never());
544 	}
Future(const Error & error)545 	Future(const Error& error)
546 		: sav(new SAV<T>(1, 0))
547 	{
548 		sav->sendError(error);
549 	}
550 
551 #ifndef NO_INTELLISENSE
552 	template<class U>
553 	Future(const U&, typename std::enable_if<std::is_assignable<T, U>::value, int*>::type = 0) {}
554 #endif
555 
~Future()556 	~Future() {
557 		//if (sav && sav->endpoint.isValid()) cout << "Future destroyed for " << sav->endpoint.key << endl;
558 		if (sav) sav->delFutureRef();
559 	}
560 	void operator=(const Future<T>& rhs) {
561 		if (rhs.sav) rhs.sav->addFutureRef();
562 		if (sav) sav->delFutureRef();
563 		sav = rhs.sav;
564 	}
565 	void operator=(Future<T>&& rhs) BOOST_NOEXCEPT {
566 		if (sav != rhs.sav) {
567 			if (sav) sav->delFutureRef();
568 			sav = rhs.sav;
569 			rhs.sav = 0;
570 		}
571 	}
572 	bool operator == (const Future& rhs) { return rhs.sav == sav; }
573 	bool operator != (const Future& rhs) { return rhs.sav != sav; }
574 
cancel()575 	void cancel() {
576 		if (sav) sav->cancel();
577 	}
578 
addCallbackAndClear(Callback<T> * cb)579 	void addCallbackAndClear(Callback<T>* cb) {
580 		sav->addCallbackAndDelFutureRef(cb);
581 		sav = 0;
582 	}
583 
addYieldedCallbackAndClear(Callback<T> * cb)584 	void addYieldedCallbackAndClear(Callback<T>* cb) {
585 		sav->addYieldedCallbackAndDelFutureRef(cb);
586 		sav = 0;
587 	}
588 
addCallbackChainAndClear(Callback<T> * cb)589 	void addCallbackChainAndClear(Callback<T>* cb) {
590 		sav->addCallbackChainAndDelFutureRef(cb);
591 		sav = 0;
592 	}
593 
getFutureReferenceCount()594 	int getFutureReferenceCount() const { return sav->getFutureReferenceCount(); }
getPromiseReferenceCount()595 	int getPromiseReferenceCount() const { return sav->getPromiseReferenceCount(); }
596 
Future(SAV<T> * sav)597 	explicit Future(SAV<T> * sav) : sav(sav) {
598 		//if (sav->endpoint.isValid()) cout << "Future created for " << sav->endpoint.key << endl;
599 	}
600 
601 private:
602 	SAV<T>* sav;
603 	friend class Promise<T>;
604 };
605 
606 // This class is used by the flow compiler when generating code around wait statements to avoid confusing situations
607 // regarding Futures.
608 //
609 // For example, the following is legal with Future but not with StrictFuture:
610 //
611 // Future<T> x = ...
612 // T result = wait(x); // This is the correct code
613 // Future<T> result = wait(x); // This is legal if wait() generates Futures, but it's probably wrong. It's a compilation error if wait() generates StrictFutures.
614 template <class T>
615 class StrictFuture : public Future<T> {
616 public:
StrictFuture(Future<T> const & f)617 	inline StrictFuture(Future<T> const& f) : Future<T>(f) {}
StrictFuture(Never n)618 	inline StrictFuture(Never n) : Future<T>(n) {}
619 private:
StrictFuture(T t)620 	StrictFuture(T t) {}
StrictFuture(Error e)621 	StrictFuture(Error e) {}
622 };
623 
624 template <class T>
625 class Promise sealed
626 {
627 public:
628 	template <class U>
send(U && value)629 	void send(U && value) const {
630 		sav->send(std::forward<U>(value));
631 	}
632 	template <class E>
sendError(const E & exc)633 	void sendError(const E& exc) const { sav->sendError(exc); }
634 
getFuture()635 	Future<T> getFuture() const { sav->addFutureRef(); return Future<T>(sav); }
isSet()636 	bool isSet() { return sav->isSet(); }
canBeSet()637 	bool canBeSet() { return sav->canBeSet(); }
isValid()638 	bool isValid() const { return sav != NULL; }
Promise()639 	Promise() : sav(new SAV<T>(0, 1)) {}
Promise(const Promise & rhs)640 	Promise(const Promise& rhs) : sav(rhs.sav) { sav->addPromiseRef(); }
Promise(Promise && rhs)641 	Promise(Promise&& rhs) BOOST_NOEXCEPT : sav(rhs.sav) { rhs.sav = 0; }
~Promise()642 	~Promise() { if (sav) sav->delPromiseRef(); }
643 
644 	void operator=(const Promise& rhs) {
645 		if (rhs.sav) rhs.sav->addPromiseRef();
646 		if (sav) sav->delPromiseRef();
647 		sav = rhs.sav;
648 	}
649 	void operator=(Promise && rhs) BOOST_NOEXCEPT {
650 		if (sav != rhs.sav) {
651 			if (sav) sav->delPromiseRef();
652 			sav = rhs.sav;
653 			rhs.sav = 0;
654 		}
655 	}
reset()656 	void reset() {
657 		*this = Promise<T>();
658 	}
swap(Promise & other)659 	void swap(Promise& other) {
660 		std::swap(sav, other.sav);
661 	}
662 
663 	// Beware, these operations are very unsafe
extractRawPointer()664 	SAV<T>* extractRawPointer() { auto ptr = sav; sav = NULL; return ptr; }
sav(ptr)665 	explicit Promise<T>(SAV<T>* ptr) : sav(ptr) {}
666 
getFutureReferenceCount()667 	int getFutureReferenceCount() const { return sav->getFutureReferenceCount(); }
getPromiseReferenceCount()668 	int getPromiseReferenceCount() const { return sav->getPromiseReferenceCount(); }
669 
670 private:
671 	SAV<T> *sav;
672 };
673 
674 
675 template <class T>
676 class FutureStream {
677 public:
isValid()678 	bool isValid() const {
679 		return queue != 0;
680 	}
isReady()681 	bool isReady() const {
682 		return queue->isReady();
683 	}
isError()684 	bool isError() const {
685 		// This means that the next thing to be popped is an error - it will be false if there is an error in the stream but some actual data first
686 		return queue->isError();
687 	}
addCallbackAndClear(SingleCallback<T> * cb)688 	void addCallbackAndClear(SingleCallback<T>* cb) {
689 		queue->addCallbackAndDelFutureRef(cb);
690 		queue = 0;
691 	}
FutureStream()692 	FutureStream() : queue(NULL) {}
FutureStream(const FutureStream & rhs)693 	FutureStream(const FutureStream& rhs) : queue(rhs.queue) { queue->addFutureRef(); }
FutureStream(FutureStream && rhs)694 	FutureStream(FutureStream&& rhs) BOOST_NOEXCEPT : queue(rhs.queue) { rhs.queue = 0; }
~FutureStream()695 	~FutureStream() { if (queue) queue->delFutureRef(); }
696 	void operator=(const FutureStream& rhs) {
697 		rhs.queue->addFutureRef();
698 		if (queue) queue->delFutureRef();
699 		queue = rhs.queue;
700 	}
701 	void operator=(FutureStream&& rhs) BOOST_NOEXCEPT {
702 		if (rhs.queue != queue) {
703 			if (queue) queue->delFutureRef();
704 			queue = rhs.queue;
705 			rhs.queue = 0;
706 		}
707 	}
708 	bool operator == (const FutureStream& rhs) { return rhs.queue == queue; }
709 	bool operator != (const FutureStream& rhs) { return rhs.queue != queue; }
710 
pop()711 	T pop() {
712 		return queue->pop();
713 	}
getError()714 	Error getError() {
715 		ASSERT(queue->isError());
716 		return queue->error;
717 	}
718 
FutureStream(NotifiedQueue<T> * queue)719 	explicit FutureStream(NotifiedQueue<T>* queue) : queue(queue) {}
720 
721 private:
722 	NotifiedQueue<T>* queue;
723 };
724 
// Returns a const reference to the `reply` member of a request object. The return type is derived with
// decltype from fake<Request>() (declared elsewhere), an expression of type Request that is only ever
// used in this unevaluated context.
template <class Request>
decltype(fake<Request>().reply) const& getReplyPromise(Request const& r) { return r.reply; }
727 
728 
729 
// REPLY_TYPE(RequestType) names the value type produced by a request's reply promise, i.e. the type
// returned by reply.getFuture().getValue(). Two spellings are kept because:
// Neither of these implementations of REPLY_TYPE() works on both MSVC and g++, so...
#ifdef __GNUG__
// g++ flavour: compute the type inline with decltype, going through getReplyPromise()
#define REPLY_TYPE(RequestType) decltype( getReplyPromise( fake<RequestType>() ).getFuture().getValue() )
//#define REPLY_TYPE(RequestType) decltype( getReplyFuture( fake<RequestType>() ).getValue() )
#else
// Other compilers: compute the type through a helper trait
template <class T>
struct ReplyType {
	// Doing this calculation directly in the return value declaration for PromiseStream<T>::getReply()
	//   breaks IntelliSense in VS2010; this is a workaround.
	typedef decltype(fake<T>().reply.getFuture().getValue()) Type;
};
template <class T> class ReplyPromise;
// When the "request" is itself a ReplyPromise<T>, the reply type is simply T
template <class T>
struct ReplyType<ReplyPromise<T>> {
	typedef T Type;
};
#define REPLY_TYPE(RequestType) typename ReplyType<RequestType>::Type
#endif
748 
749 
750 
751 
752 template <class T>
753 class PromiseStream {
754 public:
755 	// stream.send( request )
756 	//   Unreliable at most once delivery: Delivers request unless there is a connection failure (zero or one times)
757 
758 	void send(const T& value) const {
759 		queue->send(value);
760 	}
761 	void sendError(const Error& error) const {
762 		queue->sendError(error);
763 	}
764 
765 	// stream.getReply( request )
766 	//   Reliable at least once delivery: Eventually delivers request at least once and returns one of the replies if communication is possible.  Might deliver request
767 	//      more than once.
768 	//   If a reply is returned, request was or will be delivered one or more times.
769 	//   If cancelled, request was or will be delivered zero or more times.
770 	template <class X>
771 	Future<REPLY_TYPE(X)> getReply(const X& value) const {
772 		send(value);
773 		return getReplyPromise(value).getFuture();
774 	}
775 	template <class X>
776 	Future<REPLY_TYPE(X)> getReply(const X& value, int taskID) const {
777 		setReplyPriority(value, taskID);
778 		return getReplyPromise(value).getFuture();
779 	}
780 
781 	template <class X>
782 	Future<X> getReply() const {
783 		return getReply(Promise<X>());
784 	}
785 	template <class X>
786 	Future<X> getReplyWithTaskID(int taskID) const {
787 		Promise<X> reply;
788 		reply.getEndpoint(taskID);
789 		return getReply(reply);
790 	}
791 
792 	FutureStream<T> getFuture() const { queue->addFutureRef(); return FutureStream<T>(queue); }
793 	PromiseStream() : queue(new NotifiedQueue<T>(0, 1)) {}
794 	PromiseStream(const PromiseStream& rhs) : queue(rhs.queue) { queue->addPromiseRef(); }
795 	PromiseStream(PromiseStream&& rhs) BOOST_NOEXCEPT : queue(rhs.queue) { rhs.queue = 0; }
796 	void operator=(const PromiseStream& rhs) {
797 		rhs.queue->addPromiseRef();
798 		if (queue) queue->delPromiseRef();
799 		queue = rhs.queue;
800 	}
801 	void operator=(PromiseStream&& rhs) BOOST_NOEXCEPT {
802 		if (queue != rhs.queue) {
803 			if (queue) queue->delPromiseRef();
804 			queue = rhs.queue;
805 			rhs.queue = 0;
806 		}
807 	}
808 	~PromiseStream() {
809 		if (queue)
810 			queue->delPromiseRef();
811 		//queue = (NotifiedQueue<T>*)0xdeadbeef;
812 	}
813 
814 	bool operator == (const PromiseStream<T>& rhs) const { return queue == rhs.queue; }
815 	bool isEmpty() const { return !queue->isReady(); }
816 
817 private:
818 	NotifiedQueue<T>* queue;
819 };
820 
821 
822 //extern int actorCount;
823 
// Runs the destructor of `obj` in place. No memory is released; the storage is simply left without a
// live object in it.
template <class Object>
static inline void destruct(Object& obj) {
	obj.~Object();
}
828 
829 template <class ReturnValue>
830 struct Actor : SAV<ReturnValue> {
831 	int8_t actor_wait_state;  // -1 means actor is cancelled; 0 means actor is not waiting; 1-N mean waiting in callback group #
832 
833 	Actor() : SAV<ReturnValue>(1, 1), actor_wait_state(0) { /*++actorCount;*/ }
834 	//~Actor() { --actorCount; }
835 };
836 
837 template <>
838 struct Actor<void> {
839 	// This specialization is for a void actor (one not returning a future, hence also uncancellable)
840 
841 	int8_t actor_wait_state;  // 0 means actor is not waiting; 1-N mean waiting in callback group #
842 
843 	Actor() : actor_wait_state(0) { /*++actorCount;*/ }
844 	//~Actor() { --actorCount; }
845 };
846 
847 template <class ActorType, int CallbackNumber, class ValueType>
848 struct ActorCallback : Callback<ValueType> {
849 	virtual void fire(ValueType const& value) {
850 		static_cast<ActorType*>(this)->a_callback_fire(this, value);
851 	}
852 	virtual void error(Error e) {
853 		static_cast<ActorType*>(this)->a_callback_error(this, e);
854 	}
855 };
856 
857 template <class ActorType, int CallbackNumber, class ValueType>
858 struct ActorSingleCallback : SingleCallback<ValueType> {
859 	virtual void fire(ValueType const& value) {
860 		static_cast<ActorType*>(this)->a_callback_fire(this, value);
861 	}
862 	virtual void error(Error e) {
863 		static_cast<ActorType*>(this)->a_callback_error(this, e);
864 	}
865 };
866 inline double now() { return g_network->now(); }
867 inline Future<Void> delay(double seconds, int taskID = TaskDefaultDelay) { return g_network->delay(seconds, taskID); }
868 inline Future<Void> delayUntil(double time, int taskID = TaskDefaultDelay) { return g_network->delay(std::max(0.0, time - g_network->now()), taskID); }
869 inline Future<Void> delayJittered(double seconds, int taskID = TaskDefaultDelay) { return g_network->delay(seconds*(FLOW_KNOBS->DELAY_JITTER_OFFSET + FLOW_KNOBS->DELAY_JITTER_RANGE*g_random->random01()), taskID); }
870 inline Future<Void> yield(int taskID = TaskDefaultYield) { return g_network->yield(taskID); }
871 inline bool check_yield(int taskID = TaskDefaultYield) { return g_network->check_yield(taskID); }
872 #include "flow/genericactors.actor.h"
873 #endif
874