1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <folly/futures/detail/Core.h>
18
19 #include <new>
20
21 #include <folly/lang/Assume.h>
22
23 namespace folly {
24 namespace futures {
25 namespace detail {
26
operator ()(DeferredExecutor * ptr)27 void UniqueDeleter::operator()(DeferredExecutor* ptr) {
28 if (ptr) {
29 ptr->release();
30 }
31 }
32
KeepAliveOrDeferred()33 KeepAliveOrDeferred::KeepAliveOrDeferred() noexcept : state_(State::Deferred) {
34 ::new (&deferred_) DW{};
35 }
36
KeepAliveOrDeferred(KA ka)37 KeepAliveOrDeferred::KeepAliveOrDeferred(KA ka) noexcept
38 : state_(State::KeepAlive) {
39 ::new (&keepAlive_) KA{std::move(ka)};
40 }
41
KeepAliveOrDeferred(DW deferred)42 KeepAliveOrDeferred::KeepAliveOrDeferred(DW deferred) noexcept
43 : state_(State::Deferred) {
44 ::new (&deferred_) DW{std::move(deferred)};
45 }
46
KeepAliveOrDeferred(KeepAliveOrDeferred && other)47 KeepAliveOrDeferred::KeepAliveOrDeferred(KeepAliveOrDeferred&& other) noexcept
48 : state_(other.state_) {
49 switch (state_) {
50 case State::Deferred:
51 ::new (&deferred_) DW{std::move(other.deferred_)};
52 break;
53 case State::KeepAlive:
54 ::new (&keepAlive_) KA{std::move(other.keepAlive_)};
55 break;
56 }
57 }
58
~KeepAliveOrDeferred()59 KeepAliveOrDeferred::~KeepAliveOrDeferred() {
60 switch (state_) {
61 case State::Deferred:
62 deferred_.~DW();
63 break;
64 case State::KeepAlive:
65 keepAlive_.~KA();
66 break;
67 }
68 }
69
operator =(KeepAliveOrDeferred && other)70 KeepAliveOrDeferred& KeepAliveOrDeferred::operator=(
71 KeepAliveOrDeferred&& other) noexcept {
72 // This is safe to do because KeepAliveOrDeferred is nothrow
73 // move-constructible.
74 this->~KeepAliveOrDeferred();
75 ::new (this) KeepAliveOrDeferred{std::move(other)};
76 return *this;
77 }
78
getDeferredExecutor() const79 DeferredExecutor* KeepAliveOrDeferred::getDeferredExecutor() const noexcept {
80 switch (state_) {
81 case State::Deferred:
82 return deferred_.get();
83 case State::KeepAlive:
84 return nullptr;
85 }
86 assume_unreachable();
87 }
88
getKeepAliveExecutor() const89 Executor* KeepAliveOrDeferred::getKeepAliveExecutor() const noexcept {
90 switch (state_) {
91 case State::Deferred:
92 return nullptr;
93 case State::KeepAlive:
94 return keepAlive_.get();
95 }
96 assume_unreachable();
97 }
98
stealKeepAlive()99 KeepAliveOrDeferred::KA KeepAliveOrDeferred::stealKeepAlive() && noexcept {
100 switch (state_) {
101 case State::Deferred:
102 return KA{};
103 case State::KeepAlive:
104 return std::move(keepAlive_);
105 }
106 assume_unreachable();
107 }
108
stealDeferred()109 KeepAliveOrDeferred::DW KeepAliveOrDeferred::stealDeferred() && noexcept {
110 switch (state_) {
111 case State::Deferred:
112 return std::move(deferred_);
113 case State::KeepAlive:
114 return DW{};
115 }
116 assume_unreachable();
117 }
118
copy() const119 KeepAliveOrDeferred KeepAliveOrDeferred::copy() const {
120 switch (state_) {
121 case State::Deferred:
122 if (auto def = getDeferredExecutor()) {
123 return KeepAliveOrDeferred{def->copy()};
124 } else {
125 return KeepAliveOrDeferred{};
126 }
127 case State::KeepAlive:
128 return KeepAliveOrDeferred{keepAlive_};
129 }
130 assume_unreachable();
131 }
132
operator bool() const133 /* explicit */ KeepAliveOrDeferred::operator bool() const noexcept {
134 return getDeferredExecutor() || getKeepAliveExecutor();
135 }
136
addFrom(Executor::KeepAlive<> && completingKA,Executor::KeepAlive<>::KeepAliveFunc func)137 void DeferredExecutor::addFrom(
138 Executor::KeepAlive<>&& completingKA,
139 Executor::KeepAlive<>::KeepAliveFunc func) {
140 auto state = state_.load(std::memory_order_acquire);
141 if (state == State::DETACHED) {
142 return;
143 }
144
145 // If we are completing on the current executor, call inline, otherwise
146 // add
147 auto addWithInline =
148 [&](Executor::KeepAlive<>::KeepAliveFunc&& addFunc) mutable {
149 if (completingKA.get() == executor_.get()) {
150 addFunc(std::move(completingKA));
151 } else {
152 executor_.copy().add(std::move(addFunc));
153 }
154 };
155
156 if (state == State::HAS_EXECUTOR) {
157 addWithInline(std::move(func));
158 return;
159 }
160 DCHECK(state == State::EMPTY);
161 func_ = std::move(func);
162 if (folly::atomic_compare_exchange_strong_explicit(
163 &state_,
164 &state,
165 State::HAS_FUNCTION,
166 std::memory_order_release,
167 std::memory_order_acquire)) {
168 return;
169 }
170 DCHECK(state == State::DETACHED || state == State::HAS_EXECUTOR);
171 if (state == State::DETACHED) {
172 std::exchange(func_, nullptr);
173 return;
174 }
175 addWithInline(std::exchange(func_, nullptr));
176 }
177
getExecutor() const178 Executor* DeferredExecutor::getExecutor() const {
179 assert(executor_.get());
180 return executor_.get();
181 }
182
setExecutor(folly::Executor::KeepAlive<> executor)183 void DeferredExecutor::setExecutor(folly::Executor::KeepAlive<> executor) {
184 if (nestedExecutors_) {
185 auto nestedExecutors = std::exchange(nestedExecutors_, nullptr);
186 for (auto& nestedExecutor : *nestedExecutors) {
187 assert(nestedExecutor.get());
188 nestedExecutor.get()->setExecutor(executor.copy());
189 }
190 }
191 executor_ = std::move(executor);
192 auto state = state_.load(std::memory_order_acquire);
193 if (state == State::EMPTY &&
194 folly::atomic_compare_exchange_strong_explicit(
195 &state_,
196 &state,
197 State::HAS_EXECUTOR,
198 std::memory_order_release,
199 std::memory_order_acquire)) {
200 return;
201 }
202
203 DCHECK(state == State::HAS_FUNCTION);
204 state_.store(State::HAS_EXECUTOR, std::memory_order_release);
205 executor_.copy().add(std::exchange(func_, nullptr));
206 }
207
setNestedExecutors(std::vector<DeferredWrapper> executors)208 void DeferredExecutor::setNestedExecutors(
209 std::vector<DeferredWrapper> executors) {
210 DCHECK(!nestedExecutors_);
211 nestedExecutors_ =
212 std::make_unique<std::vector<DeferredWrapper>>(std::move(executors));
213 }
214
detach()215 void DeferredExecutor::detach() {
216 if (nestedExecutors_) {
217 auto nestedExecutors = std::exchange(nestedExecutors_, nullptr);
218 for (auto& nestedExecutor : *nestedExecutors) {
219 assert(nestedExecutor.get());
220 nestedExecutor.get()->detach();
221 }
222 }
223 auto state = state_.load(std::memory_order_acquire);
224 if (state == State::EMPTY &&
225 folly::atomic_compare_exchange_strong_explicit(
226 &state_,
227 &state,
228 State::DETACHED,
229 std::memory_order_release,
230 std::memory_order_acquire)) {
231 return;
232 }
233
234 DCHECK(state == State::HAS_FUNCTION);
235 state_.store(State::DETACHED, std::memory_order_release);
236 std::exchange(func_, nullptr);
237 }
238
copy()239 DeferredWrapper DeferredExecutor::copy() {
240 acquire();
241 return DeferredWrapper(this);
242 }
243
create()244 /* static */ DeferredWrapper DeferredExecutor::create() {
245 return DeferredWrapper(new DeferredExecutor{});
246 }
247
DeferredExecutor()248 DeferredExecutor::DeferredExecutor() {}
249
acquire()250 void DeferredExecutor::acquire() {
251 auto keepAliveCount = keepAliveCount_.fetch_add(1, std::memory_order_relaxed);
252 DCHECK_GT(keepAliveCount, 0);
253 }
254
release()255 void DeferredExecutor::release() {
256 auto keepAliveCount = keepAliveCount_.fetch_sub(1, std::memory_order_acq_rel);
257 DCHECK_GT(keepAliveCount, 0);
258 if (keepAliveCount == 1) {
259 delete this;
260 }
261 }
262
// Out-of-line defaulted destructor for InterruptHandler.
InterruptHandler::~InterruptHandler() = default;
264
acquire()265 void InterruptHandler::acquire() {
266 auto refCount = refCount_.fetch_add(1, std::memory_order_relaxed);
267 DCHECK_GT(refCount, 0);
268 }
269
release()270 void InterruptHandler::release() {
271 auto refCount = refCount_.fetch_sub(1, std::memory_order_acq_rel);
272 DCHECK_GT(refCount, 0);
273 if (refCount == 1) {
274 delete this;
275 }
276 }
277
hasResult() const278 bool CoreBase::hasResult() const noexcept {
279 constexpr auto allowed = State::OnlyResult | State::Done;
280 auto core = this;
281 auto state = core->state_.load(std::memory_order_acquire);
282 while (state == State::Proxy) {
283 core = core->proxy_;
284 state = core->state_.load(std::memory_order_acquire);
285 }
286 return State() != (state & allowed);
287 }
288
getExecutor() const289 Executor* CoreBase::getExecutor() const {
290 if (!executor_.isKeepAlive()) {
291 return nullptr;
292 }
293 return executor_.getKeepAliveExecutor();
294 }
295
getDeferredExecutor() const296 DeferredExecutor* CoreBase::getDeferredExecutor() const {
297 if (!executor_.isDeferred()) {
298 return {};
299 }
300
301 return executor_.getDeferredExecutor();
302 }
303
stealDeferredExecutor()304 DeferredWrapper CoreBase::stealDeferredExecutor() {
305 if (executor_.isKeepAlive()) {
306 return {};
307 }
308
309 return std::move(executor_).stealDeferred();
310 }
311
// Raises the interrupt `e` on this core.
//
// Does nothing if a result is already present. Otherwise the action depends
// on the tag bits of `interrupt_`:
//  - InterruptInitial:    `e` is heap-allocated and published with the
//                         InterruptHasObject tag. If publishing loses a race,
//                         the allocation is freed and either the call is
//                         ignored (an object was stored first) or the handler
//                         installed by the winner is invoked.
//  - InterruptHasHandler: the state is moved to InterruptTerminal (keeping
//                         the handler pointer) and the handler is invoked
//                         with `e`.
//  - InterruptHasObject / InterruptTerminal: the call is ignored.
void CoreBase::raise(exception_wrapper e) {
  if (hasResult()) {
    return;
  }
  auto interrupt = interrupt_.load(std::memory_order_acquire);
  switch (interrupt & InterruptMask) {
    case InterruptInitial: { // store the object
      assert(!interrupt);
      auto object = new exception_wrapper(std::move(e));
      auto exchanged = folly::atomic_compare_exchange_strong_explicit(
          &interrupt_,
          &interrupt,
          reinterpret_cast<uintptr_t>(object) | InterruptHasObject,
          std::memory_order_release,
          std::memory_order_acquire);
      if (exchanged) {
        return;
      }
      // lost the race!
      // Take the exception back out of the unpublished allocation so it can
      // still be passed to a handler below; `interrupt` now holds the value
      // installed by the winner.
      e = std::move(*object);
      delete object;
      if (interrupt & InterruptHasObject) { // ignore all calls after the first
        return;
      }
      assert(interrupt & InterruptHasHandler);
      FOLLY_FALLTHROUGH;
    }
    case InterruptHasHandler: { // invoke the stored handler
      // The non-tag bits of `interrupt` carry the InterruptHandler pointer.
      auto pointer = interrupt & ~InterruptMask;
      auto exchanged = interrupt_.compare_exchange_strong(
          interrupt, pointer | InterruptTerminal, std::memory_order_relaxed);
      if (!exchanged) { // ignore all calls after the first
        return;
      }
      auto handler = reinterpret_cast<InterruptHandler*>(pointer);
      handler->handle(e);
      return;
    }
    case InterruptHasObject: // ignore all calls after the first
      return;
    case InterruptTerminal: // ignore all calls after the first
      return;
  }
}
356
initCopyInterruptHandlerFrom(const CoreBase & other)357 void CoreBase::initCopyInterruptHandlerFrom(const CoreBase& other) {
358 assert(!interrupt_.load(std::memory_order_relaxed));
359 auto interrupt = other.interrupt_.load(std::memory_order_acquire);
360 switch (interrupt & InterruptMask) {
361 case InterruptHasHandler: { // copy the handler
362 auto pointer = interrupt & ~InterruptMask;
363 auto handler = reinterpret_cast<InterruptHandler*>(pointer);
364 handler->acquire();
365 interrupt_.store(
366 pointer | InterruptHasHandler, std::memory_order_release);
367 break;
368 }
369 case InterruptTerminal: { // copy the handler, if any
370 auto pointer = interrupt & ~InterruptMask;
371 auto handler = reinterpret_cast<InterruptHandler*>(pointer);
372 if (handler) {
373 handler->acquire();
374 interrupt_.store(
375 pointer | InterruptHasHandler, std::memory_order_release);
376 }
377 break;
378 }
379 }
380 }
381
382 class CoreBase::CoreAndCallbackReference {
383 public:
CoreAndCallbackReference(CoreBase * core)384 explicit CoreAndCallbackReference(CoreBase* core) noexcept : core_(core) {}
385
~CoreAndCallbackReference()386 ~CoreAndCallbackReference() noexcept { detach(); }
387
388 CoreAndCallbackReference(CoreAndCallbackReference const& o) = delete;
389 CoreAndCallbackReference& operator=(CoreAndCallbackReference const& o) =
390 delete;
391 CoreAndCallbackReference& operator=(CoreAndCallbackReference&&) = delete;
392
CoreAndCallbackReference(CoreAndCallbackReference && o)393 CoreAndCallbackReference(CoreAndCallbackReference&& o) noexcept
394 : core_(std::exchange(o.core_, nullptr)) {}
395
getCore() const396 CoreBase* getCore() const noexcept { return core_; }
397
398 private:
detach()399 void detach() noexcept {
400 if (core_) {
401 core_->derefCallback();
402 core_->detachOne();
403 }
404 }
405
406 CoreBase* core_{nullptr};
407 };
408
CoreBase(State state,unsigned char attached)409 CoreBase::CoreBase(State state, unsigned char attached)
410 : state_(state), attached_(attached) {}
411
~CoreBase()412 CoreBase::~CoreBase() {
413 auto interrupt = interrupt_.load(std::memory_order_acquire);
414 auto pointer = interrupt & ~InterruptMask;
415 switch (interrupt & InterruptMask) {
416 case InterruptHasHandler: {
417 auto handler = reinterpret_cast<InterruptHandler*>(pointer);
418 handler->release();
419 break;
420 }
421 case InterruptHasObject: {
422 auto object = reinterpret_cast<exception_wrapper*>(pointer);
423 delete object;
424 break;
425 }
426 case InterruptTerminal: {
427 auto handler = reinterpret_cast<InterruptHandler*>(pointer);
428 if (handler) {
429 handler->release();
430 }
431 break;
432 }
433 }
434 }
435
setCallback_(Callback && callback,std::shared_ptr<folly::RequestContext> && context,futures::detail::InlineContinuation allowInline)436 void CoreBase::setCallback_(
437 Callback&& callback,
438 std::shared_ptr<folly::RequestContext>&& context,
439 futures::detail::InlineContinuation allowInline) {
440 DCHECK(!hasCallback());
441
442 ::new (&callback_) Callback(std::move(callback));
443 ::new (&context_) Context(std::move(context));
444
445 auto state = state_.load(std::memory_order_acquire);
446 State nextState = allowInline == futures::detail::InlineContinuation::permit
447 ? State::OnlyCallbackAllowInline
448 : State::OnlyCallback;
449
450 if (state == State::Start) {
451 if (folly::atomic_compare_exchange_strong_explicit(
452 &state_,
453 &state,
454 nextState,
455 std::memory_order_release,
456 std::memory_order_acquire)) {
457 return;
458 }
459 assume(state == State::OnlyResult || state == State::Proxy);
460 }
461
462 if (state == State::OnlyResult) {
463 state_.store(State::Done, std::memory_order_relaxed);
464 doCallback(Executor::KeepAlive<>{}, state);
465 return;
466 }
467
468 if (state == State::Proxy) {
469 return proxyCallback(state);
470 }
471
472 terminate_with<std::logic_error>("setCallback unexpected state");
473 }
474
setResult_(Executor::KeepAlive<> && completingKA)475 void CoreBase::setResult_(Executor::KeepAlive<>&& completingKA) {
476 DCHECK(!hasResult());
477
478 auto state = state_.load(std::memory_order_acquire);
479 switch (state) {
480 case State::Start:
481 if (folly::atomic_compare_exchange_strong_explicit(
482 &state_,
483 &state,
484 State::OnlyResult,
485 std::memory_order_release,
486 std::memory_order_acquire)) {
487 return;
488 }
489 assume(
490 state == State::OnlyCallback ||
491 state == State::OnlyCallbackAllowInline);
492 FOLLY_FALLTHROUGH;
493
494 case State::OnlyCallback:
495 case State::OnlyCallbackAllowInline:
496 state_.store(State::Done, std::memory_order_relaxed);
497 doCallback(std::move(completingKA), state);
498 return;
499 case State::OnlyResult:
500 case State::Proxy:
501 case State::Done:
502 case State::Empty:
503 default:
504 terminate_with<std::logic_error>("setResult unexpected state");
505 }
506 }
507
setProxy_(CoreBase * proxy)508 void CoreBase::setProxy_(CoreBase* proxy) {
509 DCHECK(!hasResult());
510
511 proxy_ = proxy;
512
513 auto state = state_.load(std::memory_order_acquire);
514 switch (state) {
515 case State::Start:
516 if (folly::atomic_compare_exchange_strong_explicit(
517 &state_,
518 &state,
519 State::Proxy,
520 std::memory_order_release,
521 std::memory_order_acquire)) {
522 break;
523 }
524 assume(
525 state == State::OnlyCallback ||
526 state == State::OnlyCallbackAllowInline);
527 FOLLY_FALLTHROUGH;
528
529 case State::OnlyCallback:
530 case State::OnlyCallbackAllowInline:
531 proxyCallback(state);
532 break;
533 case State::OnlyResult:
534 case State::Proxy:
535 case State::Done:
536 case State::Empty:
537 default:
538 terminate_with<std::logic_error>("setCallback unexpected state");
539 }
540
541 detachOne();
542 }
543
// May be called at most once.
//
// Dispatches the stored callback once the core has reached State::Done.
//
// `completingKA` is the executor on which the result was completed; it is
// used to decide whether the callback may run inline. `priorState` is the
// state the core was in before moving to Done; inline execution is only
// considered when it is OnlyCallbackAllowInline.
//
// With an executor set, the callback is handed to that executor (or run
// inline when the executors match); if handing it over throws, the callback
// is invoked here with the captured exception. Without an executor, the
// callback is invoked directly in this call.
void CoreBase::doCallback(
    Executor::KeepAlive<>&& completingKA, State priorState) {
  DCHECK(state_ == State::Done);

  // Take the executor out of the core, leaving a default-constructed
  // KeepAliveOrDeferred behind.
  auto executor = std::exchange(executor_, KeepAliveOrDeferred{});

  // Customise inline behaviour
  // If addCompletingKA is non-null, then we are allowing inline execution
  auto doAdd = [](Executor::KeepAlive<>&& addCompletingKA,
                  KeepAliveOrDeferred&& currentExecutor,
                  auto&& keepAliveFunc) mutable {
    if (auto deferredExecutorPtr = currentExecutor.getDeferredExecutor()) {
      deferredExecutorPtr->addFrom(
          std::move(addCompletingKA), std::move(keepAliveFunc));
    } else {
      // If executors match call inline
      auto currentKeepAlive = std::move(currentExecutor).stealKeepAlive();
      if (addCompletingKA.get() == currentKeepAlive.get()) {
        keepAliveFunc(std::move(currentKeepAlive));
      } else {
        std::move(currentKeepAlive).add(std::move(keepAliveFunc));
      }
    }
  };

  if (executor) {
    // If we are not allowing inline, clear the completing KA to disallow
    if (!(priorState == State::OnlyCallbackAllowInline)) {
      completingKA = Executor::KeepAlive<>{};
    }
    exception_wrapper ew;
    // We need to reset `callback_` after it was executed (which can happen
    // through the executor or, if `Executor::add` throws, below). The
    // executor might discard the function without executing it (now or
    // later), in which case `callback_` also needs to be reset.
    // The `Core` has to be kept alive throughout that time, too. Hence we
    // increment `attached_` and `callbackReferences_` by two, and construct
    // exactly two `CoreAndCallbackReference` objects, which call
    // `derefCallback` and `detachOne` in their destructor. One will guard
    // this scope, the other one will guard the lambda passed to the executor.
    attached_.fetch_add(2, std::memory_order_relaxed);
    callbackReferences_.fetch_add(2, std::memory_order_relaxed);
    CoreAndCallbackReference guard_local_scope(this);
    CoreAndCallbackReference guard_lambda(this);
    try {
      doAdd(
          std::move(completingKA),
          std::move(executor),
          [core_ref =
               std::move(guard_lambda)](Executor::KeepAlive<>&& ka) mutable {
            // Move the guard into a local so it is released when this
            // invocation finishes rather than when the lambda is destroyed.
            auto cr = std::move(core_ref);
            CoreBase* const core = cr.getCore();
            RequestContextScopeGuard rctx(std::move(core->context_));
            core->callback_(*core, std::move(ka), nullptr);
          });
    } catch (...) {
      ew = exception_wrapper(std::current_exception());
    }
    if (ew) {
      // Handing the callback over threw: invoke the callback here, passing
      // the captured exception and an empty KeepAlive.
      RequestContextScopeGuard rctx(std::move(context_));
      callback_(*this, Executor::KeepAlive<>{}, &ew);
    }
  } else {
    // No executor: invoke the callback directly. One extra attachment is
    // taken for the duration of the call; on scope exit the context and
    // callback are destroyed and that attachment is dropped.
    attached_.fetch_add(1, std::memory_order_relaxed);
    SCOPE_EXIT {
      context_.~Context();
      callback_.~Callback();
      detachOne();
    };
    RequestContextScopeGuard rctx(std::move(context_));
    callback_(*this, std::move(completingKA), nullptr);
  }
}
618
proxyCallback(State priorState)619 void CoreBase::proxyCallback(State priorState) {
620 // If the state of the core being proxied had a callback that allows inline
621 // execution, maintain this information in the proxy
622 futures::detail::InlineContinuation allowInline =
623 (priorState == State::OnlyCallbackAllowInline
624 ? futures::detail::InlineContinuation::permit
625 : futures::detail::InlineContinuation::forbid);
626 state_.store(State::Empty, std::memory_order_relaxed);
627 proxy_->setExecutor(std::move(executor_));
628 proxy_->setCallback_(std::move(callback_), std::move(context_), allowInline);
629 proxy_->detachFuture();
630 context_.~Context();
631 callback_.~Callback();
632 }
633
detachOne()634 void CoreBase::detachOne() noexcept {
635 auto a = attached_.fetch_sub(1, std::memory_order_acq_rel);
636 assert(a >= 1);
637 if (a == 1) {
638 delete this;
639 }
640 }
641
derefCallback()642 void CoreBase::derefCallback() noexcept {
643 auto c = callbackReferences_.fetch_sub(1, std::memory_order_acq_rel);
644 assert(c >= 1);
645 if (c == 1) {
646 context_.~Context();
647 callback_.~Callback();
648 }
649 }
650
// Explicit instantiation of Core<folly::Unit>, emitted only when
// FOLLY_USE_EXTERN_FUTURE_UNIT is set.
#if FOLLY_USE_EXTERN_FUTURE_UNIT
template class Core<folly::Unit>;
#endif
654
655 } // namespace detail
656 } // namespace futures
657 } // namespace folly
658