/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/futures/detail/Core.h>

#include <new>

#include <folly/lang/Assume.h>

namespace folly {
namespace futures {
namespace detail {

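// UniqueDeleter is the deleter for DeferredWrapper (declared in Core.h,
// presumably as std::unique_ptr<DeferredExecutor, UniqueDeleter>), so
// destroying a wrapper drops one keep-alive reference on the underlying
// DeferredExecutor instead of deleting it outright.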
void UniqueDeleter::operator()(DeferredExecutor* ptr) {
  if (ptr) {
    ptr->release();
  }
}

KeepAliveOrDeferred::KeepAliveOrDeferred() noexcept : state_(State::Deferred) {
  ::new (&deferred_) DW{};
}

KeepAliveOrDeferred::KeepAliveOrDeferred(KA ka) noexcept
    : state_(State::KeepAlive) {
  ::new (&keepAlive_) KA{std::move(ka)};
}

KeepAliveOrDeferred::KeepAliveOrDeferred(DW deferred) noexcept
    : state_(State::Deferred) {
  ::new (&deferred_) DW{std::move(deferred)};
}

KeepAliveOrDeferred::KeepAliveOrDeferred(KeepAliveOrDeferred&& other) noexcept
    : state_(other.state_) {
  switch (state_) {
    case State::Deferred:
      ::new (&deferred_) DW{std::move(other.deferred_)};
      break;
    case State::KeepAlive:
      ::new (&keepAlive_) KA{std::move(other.keepAlive_)};
      break;
  }
}

KeepAliveOrDeferred::~KeepAliveOrDeferred() {
  switch (state_) {
    case State::Deferred:
      deferred_.~DW();
      break;
    case State::KeepAlive:
      keepAlive_.~KA();
      break;
  }
}

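// The move assignment below uses the destroy-and-reconstruct idiom: this is
// only sound because the move constructor is noexcept. If construction could
// throw, *this would be left in a destroyed state, and running its destructor
// again later would be undefined behavior.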
KeepAliveOrDeferred& KeepAliveOrDeferred::operator=(
    KeepAliveOrDeferred&& other) noexcept {
  // This is safe to do because KeepAliveOrDeferred is nothrow
  // move-constructible.
  this->~KeepAliveOrDeferred();
  ::new (this) KeepAliveOrDeferred{std::move(other)};
  return *this;
}

DeferredExecutor* KeepAliveOrDeferred::getDeferredExecutor() const noexcept {
  switch (state_) {
    case State::Deferred:
      return deferred_.get();
    case State::KeepAlive:
      return nullptr;
  }
  assume_unreachable();
}

Executor* KeepAliveOrDeferred::getKeepAliveExecutor() const noexcept {
  switch (state_) {
    case State::Deferred:
      return nullptr;
    case State::KeepAlive:
      return keepAlive_.get();
  }
  assume_unreachable();
}

KeepAliveOrDeferred::KA KeepAliveOrDeferred::stealKeepAlive() && noexcept {
  switch (state_) {
    case State::Deferred:
      return KA{};
    case State::KeepAlive:
      return std::move(keepAlive_);
  }
  assume_unreachable();
}

KeepAliveOrDeferred::DW KeepAliveOrDeferred::stealDeferred() && noexcept {
  switch (state_) {
    case State::Deferred:
      return std::move(deferred_);
    case State::KeepAlive:
      return DW{};
  }
  assume_unreachable();
}

KeepAliveOrDeferred KeepAliveOrDeferred::copy() const {
  switch (state_) {
    case State::Deferred:
      if (auto def = getDeferredExecutor()) {
        return KeepAliveOrDeferred{def->copy()};
      } else {
        return KeepAliveOrDeferred{};
      }
    case State::KeepAlive:
      return KeepAliveOrDeferred{keepAlive_};
  }
  assume_unreachable();
}

/* explicit */ KeepAliveOrDeferred::operator bool() const noexcept {
  return getDeferredExecutor() || getKeepAliveExecutor();
}

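// A DeferredExecutor steps through a small state machine; the transitions
// below imply roughly:
//
//   EMPTY --addFrom()-----> HAS_FUNCTION  (work queued, no executor yet)
//   EMPTY --setExecutor()-> HAS_EXECUTOR  (executor set, no work yet)
//   EMPTY --detach()------> DETACHED      (abandoned without completing)
//
// addFrom() and setExecutor() may race: each first attempts a CAS away from
// EMPTY, and the loser observes the winner's state and finishes the handoff
// itself (running, enqueueing, or discarding func_).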
void DeferredExecutor::addFrom(
    Executor::KeepAlive<>&& completingKA,
    Executor::KeepAlive<>::KeepAliveFunc func) {
  auto state = state_.load(std::memory_order_acquire);
  if (state == State::DETACHED) {
    return;
  }

  // If we are completing on the current executor, call inline; otherwise,
  // add to the executor.
  auto addWithInline =
      [&](Executor::KeepAlive<>::KeepAliveFunc&& addFunc) mutable {
        if (completingKA.get() == executor_.get()) {
          addFunc(std::move(completingKA));
        } else {
          executor_.copy().add(std::move(addFunc));
        }
      };

  if (state == State::HAS_EXECUTOR) {
    addWithInline(std::move(func));
    return;
  }
  DCHECK(state == State::EMPTY);
  func_ = std::move(func);
  if (folly::atomic_compare_exchange_strong_explicit(
          &state_,
          &state,
          State::HAS_FUNCTION,
          std::memory_order_release,
          std::memory_order_acquire)) {
    return;
  }
  DCHECK(state == State::DETACHED || state == State::HAS_EXECUTOR);
  if (state == State::DETACHED) {
    std::exchange(func_, nullptr);
    return;
  }
  addWithInline(std::exchange(func_, nullptr));
}

Executor* DeferredExecutor::getExecutor() const {
  assert(executor_.get());
  return executor_.get();
}

void DeferredExecutor::setExecutor(folly::Executor::KeepAlive<> executor) {
  if (nestedExecutors_) {
    auto nestedExecutors = std::exchange(nestedExecutors_, nullptr);
    for (auto& nestedExecutor : *nestedExecutors) {
      assert(nestedExecutor.get());
      nestedExecutor.get()->setExecutor(executor.copy());
    }
  }
  executor_ = std::move(executor);
  auto state = state_.load(std::memory_order_acquire);
  if (state == State::EMPTY &&
      folly::atomic_compare_exchange_strong_explicit(
          &state_,
          &state,
          State::HAS_EXECUTOR,
          std::memory_order_release,
          std::memory_order_acquire)) {
    return;
  }

  DCHECK(state == State::HAS_FUNCTION);
  state_.store(State::HAS_EXECUTOR, std::memory_order_release);
  executor_.copy().add(std::exchange(func_, nullptr));
}

void DeferredExecutor::setNestedExecutors(
    std::vector<DeferredWrapper> executors) {
  DCHECK(!nestedExecutors_);
  nestedExecutors_ =
      std::make_unique<std::vector<DeferredWrapper>>(std::move(executors));
}

void DeferredExecutor::detach() {
  if (nestedExecutors_) {
    auto nestedExecutors = std::exchange(nestedExecutors_, nullptr);
    for (auto& nestedExecutor : *nestedExecutors) {
      assert(nestedExecutor.get());
      nestedExecutor.get()->detach();
    }
  }
  auto state = state_.load(std::memory_order_acquire);
  if (state == State::EMPTY &&
      folly::atomic_compare_exchange_strong_explicit(
          &state_,
          &state,
          State::DETACHED,
          std::memory_order_release,
          std::memory_order_acquire)) {
    return;
  }

  DCHECK(state == State::HAS_FUNCTION);
  state_.store(State::DETACHED, std::memory_order_release);
  std::exchange(func_, nullptr);
}

DeferredWrapper DeferredExecutor::copy() {
  acquire();
  return DeferredWrapper(this);
}

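// An illustrative keep-alive lifecycle (assuming keepAliveCount_ is
// initialized to 1 in Core.h, which the DCHECKs below rely on):
//
//   auto w = DeferredExecutor::create(); // count == 1, owned by w
//   auto w2 = w->copy();                 // acquire(): count == 2
//   w2.reset();                          // UniqueDeleter -> release(): 1
//   w.reset();                           // release(): 0, object deleted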
/* static */ DeferredWrapper DeferredExecutor::create() {
  return DeferredWrapper(new DeferredExecutor{});
}

DeferredExecutor::DeferredExecutor() {}

void DeferredExecutor::acquire() {
  auto keepAliveCount = keepAliveCount_.fetch_add(1, std::memory_order_relaxed);
  DCHECK_GT(keepAliveCount, 0);
}

void DeferredExecutor::release() {
  auto keepAliveCount = keepAliveCount_.fetch_sub(1, std::memory_order_acq_rel);
  DCHECK_GT(keepAliveCount, 0);
  if (keepAliveCount == 1) {
    delete this;
  }
}

InterruptHandler::~InterruptHandler() = default;

void InterruptHandler::acquire() {
  auto refCount = refCount_.fetch_add(1, std::memory_order_relaxed);
  DCHECK_GT(refCount, 0);
}

void InterruptHandler::release() {
  auto refCount = refCount_.fetch_sub(1, std::memory_order_acq_rel);
  DCHECK_GT(refCount, 0);
  if (refCount == 1) {
    delete this;
  }
}

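// State is a bit-flag enum, so `State() != (state & allowed)` below is true
// exactly when the core holds a result (OnlyResult) or has already delivered
// one (Done). Proxy cores are followed to the end of the proxy_ chain before
// the check.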
bool CoreBase::hasResult() const noexcept {
  constexpr auto allowed = State::OnlyResult | State::Done;
  auto core = this;
  auto state = core->state_.load(std::memory_order_acquire);
  while (state == State::Proxy) {
    core = core->proxy_;
    state = core->state_.load(std::memory_order_acquire);
  }
  return State() != (state & allowed);
}

Executor* CoreBase::getExecutor() const {
  if (!executor_.isKeepAlive()) {
    return nullptr;
  }
  return executor_.getKeepAliveExecutor();
}

DeferredExecutor* CoreBase::getDeferredExecutor() const {
  if (!executor_.isDeferred()) {
    return {};
  }

  return executor_.getDeferredExecutor();
}

DeferredWrapper CoreBase::stealDeferredExecutor() {
  if (executor_.isKeepAlive()) {
    return {};
  }

  return std::move(executor_).stealDeferred();
}

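// interrupt_ packs a tag and a pointer into a single word: the low bits
// (InterruptMask) encode which of the four interrupt states the core is in,
// and the remaining bits hold either an exception_wrapper* (HasObject) or an
// InterruptHandler* (HasHandler/Terminal). Illustrative transitions:
//
//   Initial --raise()---------> HasObject  (exception stored for a handler
//                                           registered later)
//   Initial --handler stored--> HasHandler --raise()--> Terminal (handler
//                                                       invoked once)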
void CoreBase::raise(exception_wrapper e) {
  if (hasResult()) {
    return;
  }
  auto interrupt = interrupt_.load(std::memory_order_acquire);
  switch (interrupt & InterruptMask) {
    case InterruptInitial: { // store the object
      assert(!interrupt);
      auto object = new exception_wrapper(std::move(e));
      auto exchanged = folly::atomic_compare_exchange_strong_explicit(
          &interrupt_,
          &interrupt,
          reinterpret_cast<uintptr_t>(object) | InterruptHasObject,
          std::memory_order_release,
          std::memory_order_acquire);
      if (exchanged) {
        return;
      }
      // lost the race!
      e = std::move(*object);
      delete object;
      if (interrupt & InterruptHasObject) { // ignore all calls after the first
        return;
      }
      assert(interrupt & InterruptHasHandler);
      FOLLY_FALLTHROUGH;
    }
    case InterruptHasHandler: { // invoke the stored handler
      auto pointer = interrupt & ~InterruptMask;
      auto exchanged = interrupt_.compare_exchange_strong(
          interrupt, pointer | InterruptTerminal, std::memory_order_relaxed);
      if (!exchanged) { // ignore all calls after the first
        return;
      }
      auto handler = reinterpret_cast<InterruptHandler*>(pointer);
      handler->handle(e);
      return;
    }
    case InterruptHasObject: // ignore all calls after the first
      return;
    case InterruptTerminal: // ignore all calls after the first
      return;
  }
}

void CoreBase::initCopyInterruptHandlerFrom(const CoreBase& other) {
  assert(!interrupt_.load(std::memory_order_relaxed));
  auto interrupt = other.interrupt_.load(std::memory_order_acquire);
  switch (interrupt & InterruptMask) {
    case InterruptHasHandler: { // copy the handler
      auto pointer = interrupt & ~InterruptMask;
      auto handler = reinterpret_cast<InterruptHandler*>(pointer);
      handler->acquire();
      interrupt_.store(
          pointer | InterruptHasHandler, std::memory_order_release);
      break;
    }
    case InterruptTerminal: { // copy the handler, if any
      auto pointer = interrupt & ~InterruptMask;
      auto handler = reinterpret_cast<InterruptHandler*>(pointer);
      if (handler) {
        handler->acquire();
        interrupt_.store(
            pointer | InterruptHasHandler, std::memory_order_release);
      }
      break;
    }
  }
}

class CoreBase::CoreAndCallbackReference {
 public:
  explicit CoreAndCallbackReference(CoreBase* core) noexcept : core_(core) {}

  ~CoreAndCallbackReference() noexcept { detach(); }

  CoreAndCallbackReference(CoreAndCallbackReference const& o) = delete;
  CoreAndCallbackReference& operator=(CoreAndCallbackReference const& o) =
      delete;
  CoreAndCallbackReference& operator=(CoreAndCallbackReference&&) = delete;

  CoreAndCallbackReference(CoreAndCallbackReference&& o) noexcept
      : core_(std::exchange(o.core_, nullptr)) {}

  CoreBase* getCore() const noexcept { return core_; }

 private:
  void detach() noexcept {
    if (core_) {
      core_->derefCallback();
      core_->detachOne();
    }
  }

  CoreBase* core_{nullptr};
};

CoreBase::CoreBase(State state, unsigned char attached)
    : state_(state), attached_(attached) {}

CoreBase::~CoreBase() {
  auto interrupt = interrupt_.load(std::memory_order_acquire);
  auto pointer = interrupt & ~InterruptMask;
  switch (interrupt & InterruptMask) {
    case InterruptHasHandler: {
      auto handler = reinterpret_cast<InterruptHandler*>(pointer);
      handler->release();
      break;
    }
    case InterruptHasObject: {
      auto object = reinterpret_cast<exception_wrapper*>(pointer);
      delete object;
      break;
    }
    case InterruptTerminal: {
      auto handler = reinterpret_cast<InterruptHandler*>(pointer);
      if (handler) {
        handler->release();
      }
      break;
    }
  }
}

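// Consumer half of the core's handshake. Roughly (Start is the freshly
// constructed state):
//
//   Start --setCallback_()--> OnlyCallback / OnlyCallbackAllowInline
//   Start --setResult_()----> OnlyResult
//   whichever side arrives second --> Done (doCallback fires)
//
// A core that has been pointed at another core is in the Proxy state, and
// the callback is forwarded there instead.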
void CoreBase::setCallback_(
    Callback&& callback,
    std::shared_ptr<folly::RequestContext>&& context,
    futures::detail::InlineContinuation allowInline) {
  DCHECK(!hasCallback());

  ::new (&callback_) Callback(std::move(callback));
  ::new (&context_) Context(std::move(context));

  auto state = state_.load(std::memory_order_acquire);
  State nextState = allowInline == futures::detail::InlineContinuation::permit
      ? State::OnlyCallbackAllowInline
      : State::OnlyCallback;

  if (state == State::Start) {
    if (folly::atomic_compare_exchange_strong_explicit(
            &state_,
            &state,
            nextState,
            std::memory_order_release,
            std::memory_order_acquire)) {
      return;
    }
    assume(state == State::OnlyResult || state == State::Proxy);
  }

  if (state == State::OnlyResult) {
    state_.store(State::Done, std::memory_order_relaxed);
    doCallback(Executor::KeepAlive<>{}, state);
    return;
  }

  if (state == State::Proxy) {
    return proxyCallback(state);
  }

  terminate_with<std::logic_error>("setCallback unexpected state");
}

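// Producer half of the same handshake: if the callback was installed first,
// setResult_ moves the core to Done and fires it.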
void CoreBase::setResult_(Executor::KeepAlive<>&& completingKA) {
  DCHECK(!hasResult());

  auto state = state_.load(std::memory_order_acquire);
  switch (state) {
    case State::Start:
      if (folly::atomic_compare_exchange_strong_explicit(
              &state_,
              &state,
              State::OnlyResult,
              std::memory_order_release,
              std::memory_order_acquire)) {
        return;
      }
      assume(
          state == State::OnlyCallback ||
          state == State::OnlyCallbackAllowInline);
      FOLLY_FALLTHROUGH;

    case State::OnlyCallback:
    case State::OnlyCallbackAllowInline:
      state_.store(State::Done, std::memory_order_relaxed);
      doCallback(std::move(completingKA), state);
      return;
    case State::OnlyResult:
    case State::Proxy:
    case State::Done:
    case State::Empty:
    default:
      terminate_with<std::logic_error>("setResult unexpected state");
  }
}

void CoreBase::setProxy_(CoreBase* proxy) {
  DCHECK(!hasResult());

  proxy_ = proxy;

  auto state = state_.load(std::memory_order_acquire);
  switch (state) {
    case State::Start:
      if (folly::atomic_compare_exchange_strong_explicit(
              &state_,
              &state,
              State::Proxy,
              std::memory_order_release,
              std::memory_order_acquire)) {
        break;
      }
      assume(
          state == State::OnlyCallback ||
          state == State::OnlyCallbackAllowInline);
      FOLLY_FALLTHROUGH;

    case State::OnlyCallback:
    case State::OnlyCallbackAllowInline:
      proxyCallback(state);
      break;
    case State::OnlyResult:
    case State::Proxy:
    case State::Done:
    case State::Empty:
    default:
      terminate_with<std::logic_error>("setProxy unexpected state");
  }

  detachOne();
}

// May be called at most once.
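// completingKA is the executor on which the result was produced; doAdd below
// compares it against the core's own executor to decide whether the callback
// may run inline instead of being re-queued.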
void CoreBase::doCallback(
    Executor::KeepAlive<>&& completingKA, State priorState) {
  DCHECK(state_ == State::Done);

  auto executor = std::exchange(executor_, KeepAliveOrDeferred{});

  // Customise inline behaviour.
  // If addCompletingKA is non-null, then we are allowing inline execution.
  auto doAdd = [](Executor::KeepAlive<>&& addCompletingKA,
                  KeepAliveOrDeferred&& currentExecutor,
                  auto&& keepAliveFunc) mutable {
    if (auto deferredExecutorPtr = currentExecutor.getDeferredExecutor()) {
      deferredExecutorPtr->addFrom(
          std::move(addCompletingKA), std::move(keepAliveFunc));
    } else {
      // If the executors match, call inline.
      auto currentKeepAlive = std::move(currentExecutor).stealKeepAlive();
      if (addCompletingKA.get() == currentKeepAlive.get()) {
        keepAliveFunc(std::move(currentKeepAlive));
      } else {
        std::move(currentKeepAlive).add(std::move(keepAliveFunc));
      }
    }
  };

  if (executor) {
    // If we are not allowing inline execution, clear the completing
    // KeepAlive to disallow it.
    if (!(priorState == State::OnlyCallbackAllowInline)) {
      completingKA = Executor::KeepAlive<>{};
    }
    exception_wrapper ew;
    // We need to reset `callback_` after it was executed (which can happen
    // through the executor or, if `Executor::add` throws, below). The
    // executor might discard the function without executing it (now or
    // later), in which case `callback_` also needs to be reset.
    // The `Core` has to be kept alive throughout that time, too. Hence we
    // increment `attached_` and `callbackReferences_` by two, and construct
    // exactly two `CoreAndCallbackReference` objects, which call
    // `derefCallback` and `detachOne` in their destructor. One will guard
    // this scope, the other one will guard the lambda passed to the executor.
    attached_.fetch_add(2, std::memory_order_relaxed);
    callbackReferences_.fetch_add(2, std::memory_order_relaxed);
    CoreAndCallbackReference guard_local_scope(this);
    CoreAndCallbackReference guard_lambda(this);
    try {
      doAdd(
          std::move(completingKA),
          std::move(executor),
          [core_ref =
               std::move(guard_lambda)](Executor::KeepAlive<>&& ka) mutable {
            auto cr = std::move(core_ref);
            CoreBase* const core = cr.getCore();
            RequestContextScopeGuard rctx(std::move(core->context_));
            core->callback_(*core, std::move(ka), nullptr);
          });
    } catch (...) {
      ew = exception_wrapper(std::current_exception());
    }
    if (ew) {
      RequestContextScopeGuard rctx(std::move(context_));
      callback_(*this, Executor::KeepAlive<>{}, &ew);
    }
  } else {
    attached_.fetch_add(1, std::memory_order_relaxed);
    SCOPE_EXIT {
      context_.~Context();
      callback_.~Callback();
      detachOne();
    };
    RequestContextScopeGuard rctx(std::move(context_));
    callback_(*this, std::move(completingKA), nullptr);
  }
}

void CoreBase::proxyCallback(State priorState) {
  // If the core being proxied had a callback that allows inline execution,
  // maintain this information in the proxy.
  futures::detail::InlineContinuation allowInline =
      (priorState == State::OnlyCallbackAllowInline
           ? futures::detail::InlineContinuation::permit
           : futures::detail::InlineContinuation::forbid);
  state_.store(State::Empty, std::memory_order_relaxed);
  proxy_->setExecutor(std::move(executor_));
  proxy_->setCallback_(std::move(callback_), std::move(context_), allowInline);
  proxy_->detachFuture();
  context_.~Context();
  callback_.~Callback();
}

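// Two counts govern teardown: attached_ keeps the CoreBase itself alive
// (detachOne deletes it at zero), while callbackReferences_ keeps callback_
// and context_ alive (derefCallback destroys them at zero). doCallback bumps
// both by two so that whichever of the executor or the enclosing scope
// finishes last performs the cleanup.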
void CoreBase::detachOne() noexcept {
  auto a = attached_.fetch_sub(1, std::memory_order_acq_rel);
  assert(a >= 1);
  if (a == 1) {
    delete this;
  }
}

void CoreBase::derefCallback() noexcept {
  auto c = callbackReferences_.fetch_sub(1, std::memory_order_acq_rel);
  assert(c >= 1);
  if (c == 1) {
    context_.~Context();
    callback_.~Callback();
  }
}

#if FOLLY_USE_EXTERN_FUTURE_UNIT
template class Core<folly::Unit>;
#endif

} // namespace detail
} // namespace futures
} // namespace folly