1 /*
2 * flow.h
3 *
4 * This source file is part of the FoundationDB open source project
5 *
6 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #ifndef FLOW_FLOW_H
22 #define FLOW_FLOW_H
23 #pragma once
24
25 #pragma warning( disable: 4244 4267 ) // SOMEDAY: Carefully check for integer overflow issues (e.g. size_t to int conversions like this suppresses)
26 #pragma warning( disable: 4345 )
27 #pragma warning( error: 4239 )
28
29 #include <vector>
30 #include <queue>
31 #include <map>
32 #include <unordered_map>
33 #include <set>
34 #include <functional>
35 #include <iostream>
36 #include <string>
37 #include <utility>
38 #include <algorithm>
39
40 #include "flow/Platform.h"
41 #include "flow/FastAlloc.h"
42 #include "flow/IRandom.h"
43 #include "flow/serialize.h"
44 #include "flow/Deque.h"
45 #include "flow/ThreadPrimitives.h"
46 #include "flow/network.h"
47
48 #include <boost/version.hpp>
49
50 using namespace std::rel_ops;
51
// Code-coverage probe. When `condition` evaluates to true, control reaches a function-local static whose
// initializer builds a "CodeCoverage" TraceEvent carrying the file, line and the text of the condition.
// Because the variable is static, that initializer runs at most once per expansion site; when `condition`
// is false nothing is executed. The stored pointer itself is never read by the macro.
#define TEST( condition ) if (!(condition)); else { static TraceEvent* __test = &(TraceEvent("CodeCoverage").detail("File", __FILE__).detail("Line",__LINE__).detail("Condition", #condition)); }
53
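/*
usage:
	if (BUGGIFY) {
		// code here is executed only on some runs (the section is activated with probability
		// P_BUGGIFIED_SECTION_ACTIVATED), and even when activated it fires only sometimes
		// (with probability P_BUGGIFIED_SECTION_FIRES on each evaluation)
	}
*/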
61
// Tunable probabilities used by the BUGGIFY / EXPENSIVE_VALIDATION macros below (defined elsewhere).
extern double P_BUGGIFIED_SECTION_ACTIVATED, P_BUGGIFIED_SECTION_FIRES, P_EXPENSIVE_VALIDATION;
// Per-site lookup keyed by source file and line. BUGGIFY_WITH_PROB treats a non-zero result as "this site is active".
int getSBVar(std::string file, int line);
void enableBuggify(bool enabled); // Currently controls buggification and (randomized) expensive validation
bool validationIsEnabled();

// True when getSBVar() is non-zero for the expanding file/line AND a fresh g_random->random01() draw is below x.
// The random draw is only made when the site is active (short-circuit &&).
#define BUGGIFY_WITH_PROB(x) (getSBVar(__FILE__, __LINE__) && g_random->random01() < (x))
// BUGGIFY_WITH_PROB using the default firing probability P_BUGGIFIED_SECTION_FIRES.
#define BUGGIFY BUGGIFY_WITH_PROB(P_BUGGIFIED_SECTION_FIRES)
// True when validation is enabled AND a fresh random draw is below P_EXPENSIVE_VALIDATION.
#define EXPENSIVE_VALIDATION (validationIsEnabled() && g_random->random01() < P_EXPENSIVE_VALIDATION)
70
// String utilities defined elsewhere; only their signatures are visible here.

// NOTE(review): presumably parses a number followed by a unit suffix, falling back to default_unit when
// no suffix is given, and returns an empty Optional on failure - confirm against the definition.
extern Optional<uint64_t> parse_with_suffix(std::string toparse, std::string default_unit = "");
// printf-style formatting into a std::string (takes a format string plus variadic arguments).
extern std::string format(const char* form, ...);

// On success, returns the number of characters written. On failure, returns a negative number.
extern int vsformat(std::string &outputString, const char* form, va_list args);

// Each helper comes in two flavours: one returning a Standalone<StringRef>, and one taking an Arena&
// and returning a plain StringRef.
// NOTE(review): presumably the Arena overloads allocate the result in `arena` - confirm against the definitions.
extern Standalone<StringRef> strinc(StringRef const& str);
extern StringRef strinc(StringRef const& str, Arena& arena);
extern Standalone<StringRef> addVersionStampAtEnd(StringRef const& str);
extern StringRef addVersionStampAtEnd(StringRef const& str, Arena& arena);
81
82 template <typename Iter>
concatenate(Iter b,Iter const & e,Arena & arena)83 StringRef concatenate( Iter b, Iter const& e, Arena& arena ) {
84 int rsize = 0;
85 Iter i = b;
86 while(i != e) {
87 rsize += i->size();
88 ++i;
89 }
90 uint8_t* s = new (arena) uint8_t[ rsize ];
91 uint8_t* p = s;
92 while(b != e) {
93 memcpy(p, b->begin(),b->size());
94 p += b->size();
95 ++b;
96 }
97 return StringRef(s, rsize);
98 }
99
100 template <typename Iter>
concatenate(Iter b,Iter const & e)101 Standalone<StringRef> concatenate( Iter b, Iter const& e ) {
102 Standalone<StringRef> r;
103 ((StringRef &)r) = concatenate(b, e, r.arena());
104 return r;
105 }
106
// Empty value type. Its serialize() accepts any archive and reads/writes nothing.
class Void {
public:
	template <class Archive>
	void serialize(Archive& /*archive*/) {
		// Intentionally empty: Void carries no data.
	}
};
112
// Empty tag type. In this file it selects the send(Never) / Future(Never) overloads, which put a
// result into the "never" state instead of storing a value.
class Never {};
114
115 template <class T>
116 class ErrorOr {
117 public:
ErrorOr()118 ErrorOr() : error(default_error_or()) {}
ErrorOr(Error const & error)119 ErrorOr(Error const& error) : error(error) {}
ErrorOr(const ErrorOr<T> & o)120 ErrorOr(const ErrorOr<T>& o) : error(o.error) {
121 if (present()) new (&value) T(o.get());
122 }
123
124 template <class U>
ErrorOr(const U & t)125 ErrorOr(const U& t) : error() { new (&value) T(t); }
126
ErrorOr(Arena & a,const ErrorOr<T> & o)127 ErrorOr(Arena& a, const ErrorOr<T>& o) : error(o.error) {
128 if (present()) new (&value) T(a, o.get());
129 }
expectedSize()130 int expectedSize() const { return present() ? get().expectedSize() : 0; }
131
castTo()132 template <class R> ErrorOr<R> castTo() const {
133 return map<R>([](const T& v){ return (R)v; });
134 }
135
map(std::function<R (T)> f)136 template <class R> ErrorOr<R> map(std::function<R(T)> f) const {
137 if (present()) {
138 return ErrorOr<R>(f(get()));
139 }
140 else {
141 return ErrorOr<R>(error);
142 }
143 }
144
~ErrorOr()145 ~ErrorOr() {
146 if (present()) ((T*)&value)->~T();
147 }
148
149 ErrorOr & operator=(ErrorOr const& o) {
150 if (present()) {
151 ((T*)&value)->~T();
152 }
153 if (o.present()) {
154 new (&value) T(o.get());
155 }
156 error = o.error;
157 return *this;
158 }
159
present()160 bool present() const { return error.code() == invalid_error_code; }
get()161 T& get() {
162 UNSTOPPABLE_ASSERT(present());
163 return *(T*)&value;
164 }
get()165 T const& get() const {
166 UNSTOPPABLE_ASSERT(present());
167 return *(T const*)&value;
168 }
orDefault(T const & default_value)169 T orDefault(T const& default_value) const { if (present()) return get(); else return default_value; }
170
171 template <class Ar>
serialize(Ar & ar)172 void serialize(Ar& ar) {
173 // SOMEDAY: specialize for space efficiency?
174 serializer(ar, error);
175 if (present()) {
176 if (Ar::isDeserializing) new (&value) T();
177 serializer(ar, *(T*)&value);
178 }
179 }
180
isError()181 bool isError() const { return error.code() != invalid_error_code; }
isError(int code)182 bool isError(int code) const { return error.code() == code; }
getError()183 Error getError() const { ASSERT(isError()); return error; }
184
185 private:
186 typename std::aligned_storage< sizeof(T), __alignof(T) >::type value;
187 Error error;
188 };
189
190 template <class T>
191 struct Callback {
192 Callback<T> *prev, *next;
193
fireCallback194 virtual void fire(T const&) {}
errorCallback195 virtual void error(Error) {}
unwaitCallback196 virtual void unwait() {}
197
insertCallback198 void insert(Callback<T>* into) {
199 // Add this (uninitialized) callback just after `into`
200 this->prev = into;
201 this->next = into->next;
202 into->next->prev = this;
203 into->next = this;
204 }
205
insertBackCallback206 void insertBack(Callback<T>* into) {
207 // Add this (uninitialized) callback just before `into`
208 this->next = into;
209 this->prev = into->prev;
210 into->prev->next = this;
211 into->prev = this;
212 }
213
insertChainCallback214 void insertChain(Callback<T>* into) {
215 // Combine this callback's (initialized) chain and `into`'s such that this callback is just after `into`
216 auto p = this->prev;
217 auto n = into->next;
218 this->prev = into;
219 into->next = this;
220 p->next = n;
221 n->prev = p;
222 }
223
removeCallback224 void remove() {
225 // Remove this callback from the list it is in, and call unwait() on the head of that list if this was the last callback
226 next->prev = prev;
227 prev->next = next;
228 if (prev == next)
229 next->unwait();
230 }
231
countCallbacksCallback232 int countCallbacks() {
233 int count = 0;
234 for (Callback* c = next; c != this; c = c->next)
235 count++;
236 return count;
237 }
238 };
239
240 template <class T>
241 struct SingleCallback {
242 // Used for waiting on FutureStreams, which don't support multiple callbacks
243 SingleCallback<T> *next;
244
fireSingleCallback245 virtual void fire(T const&) {}
errorSingleCallback246 virtual void error(Error) {}
unwaitSingleCallback247 virtual void unwait() {}
248
insertSingleCallback249 void insert(SingleCallback<T>* into) {
250 this->next = into->next;
251 into->next = this;
252 }
253
removeSingleCallback254 void remove() {
255 ASSERT(next->next == this);
256 next->next = next;
257 next->unwait();
258 }
259 };
260
261 template <class T>
262 struct SAV : private Callback<T>, FastAllocated<SAV<T>> {
263 int promises; // one for each promise (and one for an active actor if this is an actor)
264 int futures; // one for each future and one more if there are any callbacks
265
266 private:
267 typename std::aligned_storage< sizeof(T), __alignof(T) >::type value_storage;
268 public:
269 Error error_state;
270
271 enum { UNSET_ERROR_CODE = -3, NEVER_ERROR_CODE, SET_ERROR_CODE };
272
valueSAV273 T& value() { return *(T*)&value_storage; }
274
SAVSAV275 SAV(int futures, int promises) : futures(futures), promises(promises), error_state(Error::fromCode(UNSET_ERROR_CODE)) {
276 Callback<T>::prev = Callback<T>::next = this;
277 }
~SAVSAV278 ~SAV() {
279 if (int16_t(error_state.code()) == SET_ERROR_CODE)
280 value().~T();
281 }
282
isSetSAV283 bool isSet() const { return int16_t(error_state.code()) > NEVER_ERROR_CODE; }
canBeSetSAV284 bool canBeSet() const { return int16_t(error_state.code()) == UNSET_ERROR_CODE; }
isErrorSAV285 bool isError() const { return int16_t(error_state.code()) > SET_ERROR_CODE; }
286
getSAV287 T const& get() {
288 ASSERT(isSet());
289 if (isError()) throw error_state;
290 return value();
291 }
292
293 template <class U>
sendSAV294 void send(U && value) {
295 ASSERT(canBeSet());
296 new (&value_storage) T(std::forward<U>(value));
297 this->error_state = Error::fromCode(SET_ERROR_CODE);
298 while (Callback<T>::next != this)
299 Callback<T>::next->fire(this->value());
300 }
301
sendSAV302 void send(Never) {
303 ASSERT(canBeSet());
304 this->error_state = Error::fromCode(NEVER_ERROR_CODE);
305 }
306
sendErrorSAV307 void sendError(Error err) {
308 ASSERT(canBeSet() && int16_t(err.code()) > 0);
309 this->error_state = err;
310 while (Callback<T>::next != this)
311 Callback<T>::next->error(err);
312 }
313
314 template <class U>
sendAndDelPromiseRefSAV315 void sendAndDelPromiseRef(U && value) {
316 ASSERT(canBeSet());
317 if (promises == 1 && !futures) {
318 // No one is left to receive the value, so we can just die
319 destroy();
320 return;
321 }
322
323 new (&value_storage) T(std::forward<U>(value));
324 finishSendAndDelPromiseRef();
325 }
326
finishSendAndDelPromiseRefSAV327 void finishSendAndDelPromiseRef() {
328 // Call only after value_storage has already been initialized!
329 this->error_state = Error::fromCode(SET_ERROR_CODE);
330 while (Callback<T>::next != this)
331 Callback<T>::next->fire(this->value());
332
333 if (!--promises && !futures)
334 destroy();
335 }
336
sendAndDelPromiseRefSAV337 void sendAndDelPromiseRef(Never) {
338 ASSERT(canBeSet());
339 this->error_state = Error::fromCode(NEVER_ERROR_CODE);
340 if (!--promises && !futures)
341 destroy();
342 }
343
sendErrorAndDelPromiseRefSAV344 void sendErrorAndDelPromiseRef(Error err) {
345 ASSERT(canBeSet() && int16_t(err.code()) > 0);
346 if (promises == 1 && !futures) {
347 // No one is left to receive the value, so we can just die
348 destroy();
349 return;
350 }
351
352 this->error_state = err;
353 while (Callback<T>::next != this)
354 Callback<T>::next->error(err);
355
356 if (!--promises && !futures)
357 destroy();
358 }
359
addPromiseRefSAV360 void addPromiseRef() { promises++; }
addFutureRefSAV361 void addFutureRef() { futures++; }
362
delPromiseRefSAV363 void delPromiseRef() {
364 if (promises == 1) {
365 if (futures && canBeSet()) {
366 sendError(broken_promise());
367 ASSERT(promises == 1); // Once there is only one promise, there is no one else with the right to change the promise reference count
368 }
369 promises = 0;
370 if (!futures)
371 destroy();
372 }
373 else
374 --promises;
375 }
delFutureRefSAV376 void delFutureRef() {
377 if (!--futures) {
378 if (promises)
379 cancel();
380 else
381 destroy();
382 }
383 }
384
getFutureReferenceCountSAV385 int getFutureReferenceCount() const { return futures; }
getPromiseReferenceCountSAV386 int getPromiseReferenceCount() const { return promises; }
387
destroySAV388 virtual void destroy() { delete this; }
cancelSAV389 virtual void cancel() {}
390
addCallbackAndDelFutureRefSAV391 void addCallbackAndDelFutureRef(Callback<T>* cb) {
392 // We are always *logically* dropping one future reference from this, but if we are adding a first callback
393 // we also need to add one (since futures is defined as being +1 if there are any callbacks), so net nothing
394 if (Callback<T>::next != this)
395 delFutureRef();
396 cb->insert(this);
397 }
398
addYieldedCallbackAndDelFutureRefSAV399 void addYieldedCallbackAndDelFutureRef(Callback<T>* cb) {
400 // Same contract as addCallbackAndDelFutureRef, except that the callback is placed at the end of the callback chain rather than at the beginning
401 if (Callback<T>::next != this)
402 delFutureRef();
403 cb->insertBack(this);
404 }
405
addCallbackChainAndDelFutureRefSAV406 void addCallbackChainAndDelFutureRef(Callback<T>* cb) {
407 if (Callback<T>::next != this)
408 delFutureRef();
409 cb->insertChain(this);
410 }
411
unwaitSAV412 virtual void unwait() {
413 delFutureRef();
414 }
fireSAV415 virtual void fire() { ASSERT(false); }
416 };
417
418 template <class T>
419 struct NotifiedQueue : private SingleCallback<T>, FastAllocated<NotifiedQueue<T>> {
420 int promises; // one for each promise (and one for an active actor if this is an actor)
421 int futures; // one for each future and one more if there are any callbacks
422
423 // Invariant: SingleCallback<T>::next==this || (queue.empty() && !error.isValid())
424 std::queue<T, Deque<T>> queue;
425 Error error;
426
NotifiedQueueNotifiedQueue427 NotifiedQueue(int futures, int promises) : futures(futures), promises(promises) {
428 SingleCallback<T>::next = this;
429 }
430
isReadyNotifiedQueue431 bool isReady() const { return !queue.empty() || error.isValid(); }
isErrorNotifiedQueue432 bool isError() const { return queue.empty() && error.isValid(); } // the *next* thing queued is an error
433
popNotifiedQueue434 T pop() {
435 if (queue.empty()) {
436 if (error.isValid()) throw error;
437 throw internal_error();
438 }
439 auto copy = queue.front();
440 queue.pop();
441 return copy;
442 }
443
444 template <class U>
sendNotifiedQueue445 void send(U && value) {
446 if (error.isValid()) return;
447
448 if (SingleCallback<T>::next != this) {
449 SingleCallback<T>::next->fire(std::forward<U>(value));
450 }
451 else {
452 queue.emplace(std::forward<U>(value));
453 }
454 }
455
sendErrorNotifiedQueue456 void sendError(Error err) {
457 if (error.isValid()) return;
458
459 this->error = err;
460 if (SingleCallback<T>::next != this)
461 SingleCallback<T>::next->error(err);
462 }
463
addPromiseRefNotifiedQueue464 void addPromiseRef() { promises++; }
addFutureRefNotifiedQueue465 void addFutureRef() { futures++; }
466
delPromiseRefNotifiedQueue467 void delPromiseRef() {
468 if (!--promises) {
469 if (futures) {
470 sendError(broken_promise());
471 }
472 else
473 destroy();
474 }
475 }
delFutureRefNotifiedQueue476 void delFutureRef() {
477 if (!--futures) {
478 if (promises)
479 cancel();
480 else
481 destroy();
482 }
483 }
484
getFutureReferenceCountNotifiedQueue485 int getFutureReferenceCount() const { return futures; }
getPromiseReferenceCountNotifiedQueue486 int getPromiseReferenceCount() const { return promises; }
487
destroyNotifiedQueue488 virtual void destroy() { delete this; }
cancelNotifiedQueue489 virtual void cancel() {}
490
addCallbackAndDelFutureRefNotifiedQueue491 void addCallbackAndDelFutureRef(SingleCallback<T>* cb) {
492 ASSERT(SingleCallback<T>::next == this);
493 cb->insert(this);
494 }
unwaitNotifiedQueue495 virtual void unwait() {
496 delFutureRef();
497 }
fireNotifiedQueue498 virtual void fire() { ASSERT(false); }
499 };
500
501
502 template <class T>
503 class Promise;
504
505 template <class T>
506 class Future
507 {
508 public:
get()509 T const& get() const { return sav->get(); }
getValue()510 T getValue() const { return get(); }
511
isValid()512 bool isValid() const {
513 return sav != 0;
514 }
isReady()515 bool isReady() const {
516 return sav->isSet();
517 }
isError()518 bool isError() const {
519 return sav->isError();
520 }
getError()521 Error& getError() const {
522 ASSERT(isError());
523 return sav->error_state;
524 }
525
Future()526 Future() : sav(0) {}
Future(const Future<T> & rhs)527 Future(const Future<T>& rhs) : sav(rhs.sav) {
528 if (sav) sav->addFutureRef();
529 //if (sav->endpoint.isValid()) cout << "Future copied for " << sav->endpoint.key << endl;
530 }
Future(Future<T> && rhs)531 Future(Future<T>&& rhs) BOOST_NOEXCEPT : sav(rhs.sav) {
532 rhs.sav = 0;
533 //if (sav->endpoint.isValid()) cout << "Future moved for " << sav->endpoint.key << endl;
534 }
Future(const T & presentValue)535 Future(const T& presentValue)
536 : sav(new SAV<T>(1, 0))
537 {
538 sav->send(presentValue);
539 }
Future(Never)540 Future(Never)
541 : sav(new SAV<T>(1, 0))
542 {
543 sav->send(Never());
544 }
Future(const Error & error)545 Future(const Error& error)
546 : sav(new SAV<T>(1, 0))
547 {
548 sav->sendError(error);
549 }
550
551 #ifndef NO_INTELLISENSE
552 template<class U>
553 Future(const U&, typename std::enable_if<std::is_assignable<T, U>::value, int*>::type = 0) {}
554 #endif
555
~Future()556 ~Future() {
557 //if (sav && sav->endpoint.isValid()) cout << "Future destroyed for " << sav->endpoint.key << endl;
558 if (sav) sav->delFutureRef();
559 }
560 void operator=(const Future<T>& rhs) {
561 if (rhs.sav) rhs.sav->addFutureRef();
562 if (sav) sav->delFutureRef();
563 sav = rhs.sav;
564 }
565 void operator=(Future<T>&& rhs) BOOST_NOEXCEPT {
566 if (sav != rhs.sav) {
567 if (sav) sav->delFutureRef();
568 sav = rhs.sav;
569 rhs.sav = 0;
570 }
571 }
572 bool operator == (const Future& rhs) { return rhs.sav == sav; }
573 bool operator != (const Future& rhs) { return rhs.sav != sav; }
574
cancel()575 void cancel() {
576 if (sav) sav->cancel();
577 }
578
addCallbackAndClear(Callback<T> * cb)579 void addCallbackAndClear(Callback<T>* cb) {
580 sav->addCallbackAndDelFutureRef(cb);
581 sav = 0;
582 }
583
addYieldedCallbackAndClear(Callback<T> * cb)584 void addYieldedCallbackAndClear(Callback<T>* cb) {
585 sav->addYieldedCallbackAndDelFutureRef(cb);
586 sav = 0;
587 }
588
addCallbackChainAndClear(Callback<T> * cb)589 void addCallbackChainAndClear(Callback<T>* cb) {
590 sav->addCallbackChainAndDelFutureRef(cb);
591 sav = 0;
592 }
593
getFutureReferenceCount()594 int getFutureReferenceCount() const { return sav->getFutureReferenceCount(); }
getPromiseReferenceCount()595 int getPromiseReferenceCount() const { return sav->getPromiseReferenceCount(); }
596
Future(SAV<T> * sav)597 explicit Future(SAV<T> * sav) : sav(sav) {
598 //if (sav->endpoint.isValid()) cout << "Future created for " << sav->endpoint.key << endl;
599 }
600
601 private:
602 SAV<T>* sav;
603 friend class Promise<T>;
604 };
605
606 // This class is used by the flow compiler when generating code around wait statements to avoid confusing situations
607 // regarding Futures.
608 //
609 // For example, the following is legal with Future but not with StrictFuture:
610 //
611 // Future<T> x = ...
612 // T result = wait(x); // This is the correct code
613 // Future<T> result = wait(x); // This is legal if wait() generates Futures, but it's probably wrong. It's a compilation error if wait() generates StrictFutures.
614 template <class T>
615 class StrictFuture : public Future<T> {
616 public:
StrictFuture(Future<T> const & f)617 inline StrictFuture(Future<T> const& f) : Future<T>(f) {}
StrictFuture(Never n)618 inline StrictFuture(Never n) : Future<T>(n) {}
619 private:
StrictFuture(T t)620 StrictFuture(T t) {}
StrictFuture(Error e)621 StrictFuture(Error e) {}
622 };
623
624 template <class T>
625 class Promise sealed
626 {
627 public:
628 template <class U>
send(U && value)629 void send(U && value) const {
630 sav->send(std::forward<U>(value));
631 }
632 template <class E>
sendError(const E & exc)633 void sendError(const E& exc) const { sav->sendError(exc); }
634
getFuture()635 Future<T> getFuture() const { sav->addFutureRef(); return Future<T>(sav); }
isSet()636 bool isSet() { return sav->isSet(); }
canBeSet()637 bool canBeSet() { return sav->canBeSet(); }
isValid()638 bool isValid() const { return sav != NULL; }
Promise()639 Promise() : sav(new SAV<T>(0, 1)) {}
Promise(const Promise & rhs)640 Promise(const Promise& rhs) : sav(rhs.sav) { sav->addPromiseRef(); }
Promise(Promise && rhs)641 Promise(Promise&& rhs) BOOST_NOEXCEPT : sav(rhs.sav) { rhs.sav = 0; }
~Promise()642 ~Promise() { if (sav) sav->delPromiseRef(); }
643
644 void operator=(const Promise& rhs) {
645 if (rhs.sav) rhs.sav->addPromiseRef();
646 if (sav) sav->delPromiseRef();
647 sav = rhs.sav;
648 }
649 void operator=(Promise && rhs) BOOST_NOEXCEPT {
650 if (sav != rhs.sav) {
651 if (sav) sav->delPromiseRef();
652 sav = rhs.sav;
653 rhs.sav = 0;
654 }
655 }
reset()656 void reset() {
657 *this = Promise<T>();
658 }
swap(Promise & other)659 void swap(Promise& other) {
660 std::swap(sav, other.sav);
661 }
662
663 // Beware, these operations are very unsafe
extractRawPointer()664 SAV<T>* extractRawPointer() { auto ptr = sav; sav = NULL; return ptr; }
sav(ptr)665 explicit Promise<T>(SAV<T>* ptr) : sav(ptr) {}
666
getFutureReferenceCount()667 int getFutureReferenceCount() const { return sav->getFutureReferenceCount(); }
getPromiseReferenceCount()668 int getPromiseReferenceCount() const { return sav->getPromiseReferenceCount(); }
669
670 private:
671 SAV<T> *sav;
672 };
673
674
675 template <class T>
676 class FutureStream {
677 public:
isValid()678 bool isValid() const {
679 return queue != 0;
680 }
isReady()681 bool isReady() const {
682 return queue->isReady();
683 }
isError()684 bool isError() const {
685 // This means that the next thing to be popped is an error - it will be false if there is an error in the stream but some actual data first
686 return queue->isError();
687 }
addCallbackAndClear(SingleCallback<T> * cb)688 void addCallbackAndClear(SingleCallback<T>* cb) {
689 queue->addCallbackAndDelFutureRef(cb);
690 queue = 0;
691 }
FutureStream()692 FutureStream() : queue(NULL) {}
FutureStream(const FutureStream & rhs)693 FutureStream(const FutureStream& rhs) : queue(rhs.queue) { queue->addFutureRef(); }
FutureStream(FutureStream && rhs)694 FutureStream(FutureStream&& rhs) BOOST_NOEXCEPT : queue(rhs.queue) { rhs.queue = 0; }
~FutureStream()695 ~FutureStream() { if (queue) queue->delFutureRef(); }
696 void operator=(const FutureStream& rhs) {
697 rhs.queue->addFutureRef();
698 if (queue) queue->delFutureRef();
699 queue = rhs.queue;
700 }
701 void operator=(FutureStream&& rhs) BOOST_NOEXCEPT {
702 if (rhs.queue != queue) {
703 if (queue) queue->delFutureRef();
704 queue = rhs.queue;
705 rhs.queue = 0;
706 }
707 }
708 bool operator == (const FutureStream& rhs) { return rhs.queue == queue; }
709 bool operator != (const FutureStream& rhs) { return rhs.queue != queue; }
710
pop()711 T pop() {
712 return queue->pop();
713 }
getError()714 Error getError() {
715 ASSERT(queue->isError());
716 return queue->error;
717 }
718
FutureStream(NotifiedQueue<T> * queue)719 explicit FutureStream(NotifiedQueue<T>* queue) : queue(queue) {}
720
721 private:
722 NotifiedQueue<T>* queue;
723 };
724
// Returns a const reference to the `reply` member of a request object. The return type is computed
// with decltype on fake<Request>() (declared elsewhere) so that it follows the declared type of
// Request::reply.
template <class Request>
decltype(fake<Request>().reply) const& getReplyPromise(Request const& r) { return r.reply; }
727
728
729
// REPLY_TYPE(RequestType) names the type obtained from a request's reply: the type of
// reply.getFuture().getValue() for RequestType's `reply` member.
// Neither of these implementations of REPLY_TYPE() works on both MSVC and g++, so...
#ifdef __GNUG__
// g++ flavour: compute the type inline with decltype via getReplyPromise().
#define REPLY_TYPE(RequestType) decltype( getReplyPromise( fake<RequestType>() ).getFuture().getValue() )
//#define REPLY_TYPE(RequestType) decltype( getReplyFuture( fake<RequestType>() ).getValue() )
#else
// Other compilers: route the computation through a helper trait.
template <class T>
struct ReplyType {
	// Doing this calculation directly in the return value declaration for PromiseStream<T>::getReply()
	// breaks IntelliSense in VS2010; this is a workaround.
	typedef decltype(fake<T>().reply.getFuture().getValue()) Type;
};
template <class T> class ReplyPromise;
// When the "request" is itself a ReplyPromise<T>, the reply type is T.
template <class T>
struct ReplyType<ReplyPromise<T>> {
	typedef T Type;
};
#define REPLY_TYPE(RequestType) typename ReplyType<RequestType>::Type
#endif
748
749
750
751
752 template <class T>
753 class PromiseStream {
754 public:
755 // stream.send( request )
756 // Unreliable at most once delivery: Delivers request unless there is a connection failure (zero or one times)
757
758 void send(const T& value) const {
759 queue->send(value);
760 }
761 void sendError(const Error& error) const {
762 queue->sendError(error);
763 }
764
765 // stream.getReply( request )
766 // Reliable at least once delivery: Eventually delivers request at least once and returns one of the replies if communication is possible. Might deliver request
767 // more than once.
768 // If a reply is returned, request was or will be delivered one or more times.
769 // If cancelled, request was or will be delivered zero or more times.
770 template <class X>
771 Future<REPLY_TYPE(X)> getReply(const X& value) const {
772 send(value);
773 return getReplyPromise(value).getFuture();
774 }
775 template <class X>
776 Future<REPLY_TYPE(X)> getReply(const X& value, int taskID) const {
777 setReplyPriority(value, taskID);
778 return getReplyPromise(value).getFuture();
779 }
780
781 template <class X>
782 Future<X> getReply() const {
783 return getReply(Promise<X>());
784 }
785 template <class X>
786 Future<X> getReplyWithTaskID(int taskID) const {
787 Promise<X> reply;
788 reply.getEndpoint(taskID);
789 return getReply(reply);
790 }
791
792 FutureStream<T> getFuture() const { queue->addFutureRef(); return FutureStream<T>(queue); }
793 PromiseStream() : queue(new NotifiedQueue<T>(0, 1)) {}
794 PromiseStream(const PromiseStream& rhs) : queue(rhs.queue) { queue->addPromiseRef(); }
795 PromiseStream(PromiseStream&& rhs) BOOST_NOEXCEPT : queue(rhs.queue) { rhs.queue = 0; }
796 void operator=(const PromiseStream& rhs) {
797 rhs.queue->addPromiseRef();
798 if (queue) queue->delPromiseRef();
799 queue = rhs.queue;
800 }
801 void operator=(PromiseStream&& rhs) BOOST_NOEXCEPT {
802 if (queue != rhs.queue) {
803 if (queue) queue->delPromiseRef();
804 queue = rhs.queue;
805 rhs.queue = 0;
806 }
807 }
808 ~PromiseStream() {
809 if (queue)
810 queue->delPromiseRef();
811 //queue = (NotifiedQueue<T>*)0xdeadbeef;
812 }
813
814 bool operator == (const PromiseStream<T>& rhs) const { return queue == rhs.queue; }
815 bool isEmpty() const { return !queue->isReady(); }
816
817 private:
818 NotifiedQueue<T>* queue;
819 };
820
821
822 //extern int actorCount;
823
// Runs T's destructor on `object` in place. No memory is released; the storage itself is untouched.
template <class T>
static inline void destruct(T& object) {
	object.~T();
}
828
829 template <class ReturnValue>
830 struct Actor : SAV<ReturnValue> {
831 int8_t actor_wait_state; // -1 means actor is cancelled; 0 means actor is not waiting; 1-N mean waiting in callback group #
832
833 Actor() : SAV<ReturnValue>(1, 1), actor_wait_state(0) { /*++actorCount;*/ }
834 //~Actor() { --actorCount; }
835 };
836
837 template <>
838 struct Actor<void> {
839 // This specialization is for a void actor (one not returning a future, hence also uncancellable)
840
841 int8_t actor_wait_state; // 0 means actor is not waiting; 1-N mean waiting in callback group #
842
843 Actor() : actor_wait_state(0) { /*++actorCount;*/ }
844 //~Actor() { --actorCount; }
845 };
846
847 template <class ActorType, int CallbackNumber, class ValueType>
848 struct ActorCallback : Callback<ValueType> {
849 virtual void fire(ValueType const& value) {
850 static_cast<ActorType*>(this)->a_callback_fire(this, value);
851 }
852 virtual void error(Error e) {
853 static_cast<ActorType*>(this)->a_callback_error(this, e);
854 }
855 };
856
857 template <class ActorType, int CallbackNumber, class ValueType>
858 struct ActorSingleCallback : SingleCallback<ValueType> {
859 virtual void fire(ValueType const& value) {
860 static_cast<ActorType*>(this)->a_callback_fire(this, value);
861 }
862 virtual void error(Error e) {
863 static_cast<ActorType*>(this)->a_callback_error(this, e);
864 }
865 };
866 inline double now() { return g_network->now(); }
867 inline Future<Void> delay(double seconds, int taskID = TaskDefaultDelay) { return g_network->delay(seconds, taskID); }
868 inline Future<Void> delayUntil(double time, int taskID = TaskDefaultDelay) { return g_network->delay(std::max(0.0, time - g_network->now()), taskID); }
869 inline Future<Void> delayJittered(double seconds, int taskID = TaskDefaultDelay) { return g_network->delay(seconds*(FLOW_KNOBS->DELAY_JITTER_OFFSET + FLOW_KNOBS->DELAY_JITTER_RANGE*g_random->random01()), taskID); }
870 inline Future<Void> yield(int taskID = TaskDefaultYield) { return g_network->yield(taskID); }
871 inline bool check_yield(int taskID = TaskDefaultYield) { return g_network->check_yield(taskID); }
872 #include "flow/genericactors.actor.h"
873 #endif
874