1 // 2 // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021 3 // 4 // Distributed under the Boost Software License, Version 1.0. (See accompanying 5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6 // 7 #pragma once 8 9 #include "td/actor/actor.h" 10 11 #include "td/utils/CancellationToken.h" 12 #include "td/utils/Closure.h" 13 #include "td/utils/common.h" 14 #include "td/utils/invoke.h" 15 #include "td/utils/MovableValue.h" 16 #include "td/utils/ScopeGuard.h" 17 #include "td/utils/Status.h" 18 19 #include <tuple> 20 #include <type_traits> 21 #include <utility> 22 23 namespace td { 24 25 template <class T = Unit> 26 class PromiseInterface { 27 public: 28 PromiseInterface() = default; 29 PromiseInterface(const PromiseInterface &) = delete; 30 PromiseInterface &operator=(const PromiseInterface &) = delete; 31 PromiseInterface(PromiseInterface &&) = default; 32 PromiseInterface &operator=(PromiseInterface &&) = default; 33 virtual ~PromiseInterface() = default; set_value(T && value)34 virtual void set_value(T &&value) { 35 set_result(std::move(value)); 36 } set_error(Status && error)37 virtual void set_error(Status &&error) { 38 set_result(std::move(error)); 39 } set_result(Result<T> && result)40 virtual void set_result(Result<T> &&result) { 41 if (result.is_ok()) { 42 set_value(result.move_as_ok()); 43 } else { 44 set_error(result.move_as_error()); 45 } 46 } operator()47 void operator()(T &&value) { 48 set_value(std::move(value)); 49 } operator()50 void operator()(Status &&error) { 51 set_error(std::move(error)); 52 } operator()53 void operator()(Result<T> &&result) { 54 set_result(std::move(result)); 55 } is_cancellable()56 virtual bool is_cancellable() const { 57 return false; 58 } is_canceled()59 virtual bool is_canceled() const { 60 return false; 61 } 62 start_migrate(int32 sched_id)63 virtual void start_migrate(int32 sched_id) { 64 } finish_migrate()65 virtual void finish_migrate() { 66 } 67 }; 68 69 namespace detail { 70 71 template <typename T> 72 struct GetArg final : public GetArg<decltype(&T::operator())> {}; 73 74 template <class C, class R, class Arg> 75 class GetArg<R (C::*)(Arg)> { 76 public: 77 using type = Arg; 78 }; 79 template <class C, class R, class Arg> 80 class GetArg<R (C::*)(Arg) const> { 81 public: 82 using type = Arg; 83 }; 84 85 template <class T> 86 using get_arg_t = std::decay_t<typename GetArg<T>::type>; 87 88 template <class T> 89 struct DropResult { 90 using type = T; 91 }; 92 93 template <class T> 94 struct DropResult<Result<T>> { 95 using type = T; 96 }; 97 98 template <class T> 99 using drop_result_t = typename DropResult<T>::type; 100 101 struct Ignore { 102 void operator()(Status &&error) { 103 error.ignore(); 104 } 105 }; 106 107 template <class ValueT, class FunctionOkT, class FunctionFailT = Ignore> 108 class LambdaPromise : public PromiseInterface<ValueT> { 109 enum class OnFail { None, Ok, Fail }; 110 111 public: 112 void set_value(ValueT &&value) override { 113 CHECK(has_lambda_.get()); 114 do_ok(ok_, std::move(value)); 115 on_fail_ = OnFail::None; 116 } 117 void set_error(Status &&error) override { 118 CHECK(has_lambda_.get()); 119 do_error(std::move(error)); 120 } 121 LambdaPromise(const LambdaPromise &other) = delete; 122 LambdaPromise &operator=(const LambdaPromise &other) = delete; 123 LambdaPromise(LambdaPromise &&other) = default; 124 LambdaPromise &operator=(LambdaPromise &&other) = default; 125 ~LambdaPromise() override { 126 if (has_lambda_.get()) { 127 do_error(Status::Error("Lost promise")); 128 } 129 } 130 131 template <class FromOkT, class FromFailT> 132 LambdaPromise(FromOkT &&ok, FromFailT &&fail, bool use_ok_as_fail) 133 : ok_(std::forward<FromOkT>(ok)) 134 , fail_(std::forward<FromFailT>(fail)) 135 , on_fail_(use_ok_as_fail ? OnFail::Ok : OnFail::Fail) 136 , has_lambda_(true) { 137 } 138 template <class FromOkT, std::enable_if_t<!std::is_same<std::decay_t<FromOkT>, LambdaPromise>::value, int> = 0> 139 LambdaPromise(FromOkT &&ok) : LambdaPromise(std::forward<FromOkT>(ok), Ignore(), true) { 140 } 141 142 private: 143 FunctionOkT ok_; 144 FunctionFailT fail_; 145 OnFail on_fail_ = OnFail::None; 146 MovableValue<bool> has_lambda_{false}; 147 148 void do_error(Status &&error) { 149 switch (on_fail_) { 150 case OnFail::None: 151 break; 152 case OnFail::Ok: 153 do_error(ok_, std::move(error)); 154 break; 155 case OnFail::Fail: 156 do_error(fail_, std::move(error)); 157 break; 158 } 159 on_fail_ = OnFail::None; 160 } 161 162 template <class F> 163 std::enable_if_t<is_callable<F, Result<ValueT>>::value, void> do_error(F &&f, Status &&status) { 164 f(Result<ValueT>(std::move(status))); 165 } 166 template <class Y, class F> 167 std::enable_if_t<!is_callable<F, Result<ValueT>>::value, void> do_error(F &&f, Y &&status) { 168 f(Auto()); 169 } 170 template <class F> 171 std::enable_if_t<is_callable<F, Result<ValueT>>::value, void> do_ok(F &&f, ValueT &&result) { 172 f(Result<ValueT>(std::move(result))); 173 } 174 template <class F> 175 std::enable_if_t<!is_callable<F, Result<ValueT>>::value, void> do_ok(F &&f, ValueT &&result) { 176 f(std::move(result)); 177 } 178 }; 179 } // namespace detail 180 181 template <class T> 182 class SafePromise; 183 184 template <class T = Unit> 185 class Promise; 186 187 constexpr std::false_type is_promise_interface(...); 188 189 template <class T> 190 constexpr std::true_type is_promise_interface(const PromiseInterface<T> &promise); 191 192 template <class T> 193 constexpr std::true_type is_promise_interface(const Promise<T> &promise); 194 195 template <class F> 196 constexpr bool is_promise_interface() { 197 return decltype(is_promise_interface(std::declval<F>()))::value; 198 } 199 200 constexpr std::false_type is_promise_interface_ptr(...); 201 202 template <class T> 203 constexpr std::true_type is_promise_interface_ptr(const unique_ptr<T> &promise); 204 205 template <class F> 206 constexpr bool is_promise_interface_ptr() { 207 return decltype(is_promise_interface_ptr(std::declval<F>()))::value; 208 } 209 210 template <class T = void, class F = void, std::enable_if_t<std::is_same<T, void>::value, bool> has_t = false> 211 auto lambda_promise(F &&f) { 212 return detail::LambdaPromise<detail::drop_result_t<detail::get_arg_t<std::decay_t<F>>>, std::decay_t<F>>( 213 std::forward<F>(f)); 214 } 215 template <class T = void, class F = void, std::enable_if_t<!std::is_same<T, void>::value, bool> has_t = true> 216 auto lambda_promise(F &&f) { 217 return detail::LambdaPromise<T, std::decay_t<F>>(std::forward<F>(f)); 218 } 219 220 template <class T, class F, std::enable_if_t<is_promise_interface<F>(), bool> from_promise_interface = true> 221 auto &&promise_interface(F &&f) { 222 return std::forward<F>(f); 223 } 224 225 template <class T, class F, std::enable_if_t<!is_promise_interface<F>(), bool> from_promise_interface = false> 226 auto promise_interface(F &&f) { 227 return lambda_promise<T>(std::forward<F>(f)); 228 } 229 230 template <class T, class F, std::enable_if_t<is_promise_interface_ptr<F>(), bool> from_promise_interface = true> 231 auto promise_interface_ptr(F &&f) { 232 return std::forward<F>(f); 233 } 234 template <class T, class F, std::enable_if_t<!is_promise_interface_ptr<F>(), bool> from_promise_interface = false> 235 auto promise_interface_ptr(F &&f) { 236 return td::make_unique<std::decay_t<decltype(promise_interface<T>(std::forward<F>(f)))>>( 237 promise_interface<T>(std::forward<F>(f))); 238 } 239 240 template <class T> 241 class Promise { 242 public: 243 void set_value(T &&value) { 244 if (!promise_) { 245 return; 246 } 247 promise_->set_value(std::move(value)); 248 promise_.reset(); 249 } 250 void set_error(Status &&error) { 251 if (!promise_) { 252 return; 253 } 254 promise_->set_error(std::move(error)); 255 promise_.reset(); 256 } 257 void set_result(Result<T> &&result) { 258 if (!promise_) { 259 return; 260 } 261 promise_->set_result(std::move(result)); 262 promise_.reset(); 263 } 264 template <class S> 265 void operator()(S &&result) { 266 if (!promise_) { 267 return; 268 } 269 promise_->operator()(std::forward<S>(result)); 270 promise_.reset(); 271 } 272 void reset() { 273 promise_.reset(); 274 } 275 void start_migrate(int32 sched_id) { 276 if (!promise_) { 277 return; 278 } 279 promise_->start_migrate(sched_id); 280 } 281 void finish_migrate() { 282 if (!promise_) { 283 return; 284 } 285 promise_->finish_migrate(); 286 } 287 bool is_cancellable() const { 288 if (!promise_) { 289 return false; 290 } 291 return promise_->is_cancellable(); 292 } 293 bool is_canceled() const { 294 if (!promise_) { 295 return false; 296 } 297 return promise_->is_canceled(); 298 } 299 unique_ptr<PromiseInterface<T>> release() { 300 return std::move(promise_); 301 } 302 303 Promise() = default; 304 explicit Promise(unique_ptr<PromiseInterface<T>> promise) : promise_(std::move(promise)) { 305 } 306 Promise(Auto) { 307 } 308 Promise(SafePromise<T> &&other); 309 Promise &operator=(SafePromise<T> &&other); 310 template <class F, std::enable_if_t<!std::is_same<std::decay_t<F>, Promise>::value, int> = 0> 311 Promise(F &&f) : promise_(promise_interface_ptr<T>(std::forward<F>(f))) { 312 } 313 314 explicit operator bool() { 315 return static_cast<bool>(promise_); 316 } 317 318 private: 319 unique_ptr<PromiseInterface<T>> promise_; 320 }; 321 322 template <class T> 323 void start_migrate(Promise<T> &promise, int32 sched_id) { 324 // promise.start_migrate(sched_id); 325 } 326 template <class T> 327 void finish_migrate(Promise<T> &promise) { 328 // promise.finish_migrate(); 329 } 330 331 template <class T = Unit> 332 class SafePromise { 333 public: 334 SafePromise(Promise<T> promise, Result<T> result) : promise_(std::move(promise)), result_(std::move(result)) { 335 } 336 SafePromise(const SafePromise &other) = delete; 337 SafePromise &operator=(const SafePromise &other) = delete; 338 SafePromise(SafePromise &&other) = default; 339 SafePromise &operator=(SafePromise &&other) = default; 340 ~SafePromise() { 341 if (promise_) { 342 promise_.set_result(std::move(result_)); 343 } 344 } 345 Promise<T> release() { 346 return std::move(promise_); 347 } 348 349 private: 350 Promise<T> promise_; 351 Result<T> result_; 352 }; 353 354 template <class T> 355 Promise<T>::Promise(SafePromise<T> &&other) : Promise(other.release()) { 356 } 357 template <class T> 358 Promise<T> &Promise<T>::operator=(SafePromise<T> &&other) { 359 *this = other.release(); 360 return *this; 361 } 362 363 namespace detail { 364 365 class EventPromise final : public PromiseInterface<Unit> { 366 public: 367 void set_value(Unit &&) final { 368 ok_.try_emit(); 369 fail_.clear(); 370 } 371 void set_error(Status &&) final { 372 do_set_error(); 373 } 374 375 EventPromise(const EventPromise &other) = delete; 376 EventPromise &operator=(const EventPromise &other) = delete; 377 EventPromise(EventPromise &&other) = delete; 378 EventPromise &operator=(EventPromise &&other) = delete; 379 ~EventPromise() final { 380 do_set_error(); 381 } 382 383 EventPromise() = default; 384 explicit EventPromise(EventFull ok) : ok_(std::move(ok)), use_ok_as_fail_(true) { 385 } 386 EventPromise(EventFull ok, EventFull fail) : ok_(std::move(ok)), fail_(std::move(fail)), use_ok_as_fail_(false) { 387 } 388 389 private: 390 EventFull ok_; 391 EventFull fail_; 392 bool use_ok_as_fail_ = false; 393 void do_set_error() { 394 if (use_ok_as_fail_) { 395 ok_.try_emit(); 396 } else { 397 ok_.clear(); 398 fail_.try_emit(); 399 } 400 } 401 }; 402 403 template <class PromiseT> 404 class CancellablePromise final : public PromiseT { 405 public: 406 template <class... ArgsT> 407 CancellablePromise(CancellationToken cancellation_token, ArgsT &&... args) 408 : PromiseT(std::forward<ArgsT>(args)...), cancellation_token_(std::move(cancellation_token)) { 409 } 410 bool is_cancellable() const final { 411 return true; 412 } 413 bool is_canceled() const final { 414 return static_cast<bool>(cancellation_token_); 415 } 416 417 private: 418 CancellationToken cancellation_token_; 419 }; 420 421 template <class... ArgsT> 422 class JoinPromise final : public PromiseInterface<Unit> { 423 public: 424 explicit JoinPromise(ArgsT &&... arg) : promises_(std::forward<ArgsT>(arg)...) { 425 } 426 void set_value(Unit &&) final { 427 tuple_for_each(promises_, [](auto &promise) { promise.set_value(Unit()); }); 428 } 429 void set_error(Status &&error) final { 430 tuple_for_each(promises_, [&error](auto &promise) { promise.set_error(error.clone()); }); 431 } 432 433 private: 434 std::tuple<std::decay_t<ArgsT>...> promises_; 435 }; 436 } // namespace detail 437 438 class SendClosure { 439 public: 440 template <class... ArgsT> 441 void operator()(ArgsT &&... args) const { 442 send_closure(std::forward<ArgsT>(args)...); 443 } 444 }; 445 446 //template <class T> 447 //template <class... ArgsT> 448 //auto Promise<T>::send_closure(ArgsT &&... args) { 449 // return [promise = std::move(*this), t = std::make_tuple(std::forward<ArgsT>(args)...)](auto &&r_res) mutable { 450 // TRY_RESULT_PROMISE(promise, res, std::move(r_res)); 451 // td2::call_tuple(SendClosure(), std::tuple_cat(std::move(t), std::make_tuple(std::move(res), std::move(promise)))); 452 // }; 453 //} 454 455 template <class... ArgsT> 456 auto promise_send_closure(ArgsT &&... args) { 457 return [t = std::make_tuple(std::forward<ArgsT>(args)...)](auto &&res) mutable { 458 call_tuple(SendClosure(), std::tuple_cat(std::move(t), std::make_tuple(std::forward<decltype(res)>(res)))); 459 }; 460 } 461 462 /*** FutureActor and PromiseActor ***/ 463 template <class T> 464 class FutureActor; 465 466 template <class T> 467 class PromiseActor; 468 469 template <class T> 470 class ActorTraits<FutureActor<T>> { 471 public: 472 static constexpr bool need_context = false; 473 static constexpr bool need_start_up = false; 474 }; 475 476 template <class T> 477 class PromiseActor final : public PromiseInterface<T> { 478 friend class FutureActor<T>; 479 enum State { Waiting, Hangup }; 480 481 public: 482 PromiseActor() = default; 483 PromiseActor(const PromiseActor &other) = delete; 484 PromiseActor &operator=(const PromiseActor &other) = delete; 485 PromiseActor(PromiseActor &&) = default; 486 PromiseActor &operator=(PromiseActor &&) = default; 487 ~PromiseActor() final { 488 close(); 489 } 490 491 void set_value(T &&value) final; 492 void set_error(Status &&error) final; 493 494 void close() { 495 future_id_.reset(); 496 } 497 498 // NB: if true is returned no further events will be sent 499 bool is_hangup() { 500 if (state_ == State::Hangup) { 501 return true; 502 } 503 if (!future_id_.is_alive()) { 504 state_ = State::Hangup; 505 future_id_.release(); 506 event_.clear(); 507 return true; 508 } 509 return false; 510 } 511 512 template <class S> 513 friend void init_promise_future(PromiseActor<S> *promise, FutureActor<S> *future); 514 515 bool empty_promise() { 516 return future_id_.empty(); 517 } 518 bool empty() { 519 return future_id_.empty(); 520 } 521 522 private: 523 ActorOwn<FutureActor<T>> future_id_; 524 EventFull event_; 525 State state_ = State::Hangup; 526 527 void init() { 528 state_ = State::Waiting; 529 event_.clear(); 530 } 531 }; 532 533 template <class T> 534 class FutureActor final : public Actor { 535 friend class PromiseActor<T>; 536 537 public: 538 enum State { Waiting, Ready }; 539 540 static constexpr int HANGUP_ERROR_CODE = 426487; 541 542 FutureActor() = default; 543 544 FutureActor(const FutureActor &other) = delete; 545 FutureActor &operator=(const FutureActor &other) = delete; 546 547 FutureActor(FutureActor &&other) = default; 548 FutureActor &operator=(FutureActor &&other) = default; 549 550 ~FutureActor() final = default; 551 552 bool is_ok() const { 553 return is_ready() && result_.is_ok(); 554 } 555 bool is_error() const { 556 CHECK(is_ready()); 557 return is_ready() && result_.is_error(); 558 } 559 T move_as_ok() { 560 return move_as_result().move_as_ok(); 561 } 562 Status move_as_error() TD_WARN_UNUSED_RESULT { 563 return move_as_result().move_as_error(); 564 } 565 Result<T> move_as_result() TD_WARN_UNUSED_RESULT { 566 CHECK(is_ready()); 567 SCOPE_EXIT { 568 do_stop(); 569 }; 570 return std::move(result_); 571 } 572 bool is_ready() const { 573 return !empty() && state_ == State::Ready; 574 } 575 576 void close() { 577 event_.clear(); 578 result_.clear(); 579 do_stop(); 580 } 581 582 void set_event(EventFull &&event) { 583 CHECK(!empty()); 584 event_ = std::move(event); 585 if (state_ != State::Waiting) { 586 event_.try_emit_later(); 587 } 588 } 589 590 State get_state() const { 591 return state_; 592 } 593 594 template <class S> 595 friend void init_promise_future(PromiseActor<S> *promise, FutureActor<S> *future); 596 597 private: 598 EventFull event_; 599 Result<T> result_ = Status::Error(500, "Empty FutureActor"); 600 State state_ = State::Waiting; 601 602 void set_value(T &&value) { 603 set_result(std::move(value)); 604 } 605 606 void set_error(Status &&error) { 607 set_result(std::move(error)); 608 } 609 610 void set_result(Result<T> &&result) { 611 CHECK(state_ == State::Waiting); 612 result_ = std::move(result); 613 state_ = State::Ready; 614 615 event_.try_emit_later(); 616 } 617 618 void hangup() final { 619 set_error(Status::Error<HANGUP_ERROR_CODE>()); 620 } 621 622 void start_up() final { 623 // empty 624 } 625 626 void init() { 627 CHECK(empty()); 628 state_ = State::Waiting; 629 event_.clear(); 630 } 631 }; 632 633 template <class T> 634 void PromiseActor<T>::set_value(T &&value) { 635 if (state_ == State::Waiting && !future_id_.empty()) { 636 send_closure(std::move(future_id_), &FutureActor<T>::set_value, std::move(value)); 637 } 638 } 639 template <class T> 640 void PromiseActor<T>::set_error(Status &&error) { 641 if (state_ == State::Waiting && !future_id_.empty()) { 642 send_closure(std::move(future_id_), &FutureActor<T>::set_error, std::move(error)); 643 } 644 } 645 646 template <class S> 647 void init_promise_future(PromiseActor<S> *promise, FutureActor<S> *future) { 648 promise->init(); 649 future->init(); 650 promise->future_id_ = register_actor("FutureActor", future); 651 652 CHECK(future->get_info() != nullptr); 653 } 654 655 template <class T> 656 class PromiseFuture { 657 public: 658 PromiseFuture() { 659 init_promise_future(&promise_, &future_); 660 } 661 PromiseActor<T> &promise() { 662 return promise_; 663 } 664 FutureActor<T> &future() { 665 return future_; 666 } 667 PromiseActor<T> &&move_promise() { 668 return std::move(promise_); 669 } 670 FutureActor<T> &&move_future() { 671 return std::move(future_); 672 } 673 674 private: 675 PromiseActor<T> promise_; 676 FutureActor<T> future_; 677 }; 678 679 template <ActorSendType send_type, class T, class ActorAT, class ActorBT, class ResultT, class... DestArgsT, 680 class... ArgsT> 681 FutureActor<T> send_promise(ActorId<ActorAT> actor_id, ResultT (ActorBT::*func)(PromiseActor<T> &&, DestArgsT...), 682 ArgsT &&... args) { 683 PromiseFuture<T> pf; 684 Scheduler::instance()->send_closure<send_type>( 685 std::move(actor_id), create_immediate_closure(func, pf.move_promise(), std::forward<ArgsT>(args)...)); 686 return pf.move_future(); 687 } 688 689 class PromiseCreator { 690 public: 691 using Ignore = detail::Ignore; 692 693 template <class OkT, class ArgT = detail::drop_result_t<detail::get_arg_t<OkT>>> 694 static Promise<ArgT> lambda(OkT &&ok) { 695 return Promise<ArgT>(td::make_unique<detail::LambdaPromise<ArgT, std::decay_t<OkT>>>(std::forward<OkT>(ok))); 696 } 697 698 template <class OkT, class FailT, class ArgT = detail::get_arg_t<OkT>> 699 static Promise<ArgT> lambda(OkT &&ok, FailT &&fail) { 700 return Promise<ArgT>(td::make_unique<detail::LambdaPromise<ArgT, std::decay_t<OkT>, std::decay_t<FailT>>>( 701 std::forward<OkT>(ok), std::forward<FailT>(fail), false)); 702 } 703 704 template <class OkT, class ArgT = detail::drop_result_t<detail::get_arg_t<OkT>>> 705 static auto cancellable_lambda(CancellationToken cancellation_token, OkT &&ok) { 706 return Promise<ArgT>(td::make_unique<detail::CancellablePromise<detail::LambdaPromise<ArgT, std::decay_t<OkT>>>>( 707 std::move(cancellation_token), std::forward<OkT>(ok))); 708 } 709 710 static Promise<> event(EventFull &&ok) { 711 return Promise<>(td::make_unique<detail::EventPromise>(std::move(ok))); 712 } 713 714 static Promise<> event(EventFull ok, EventFull fail) { 715 return Promise<>(td::make_unique<detail::EventPromise>(std::move(ok), std::move(fail))); 716 } 717 718 template <class... ArgsT> 719 static Promise<> join(ArgsT &&... args) { 720 return Promise<>(td::make_unique<detail::JoinPromise<ArgsT...>>(std::forward<ArgsT>(args)...)); 721 } 722 723 template <class T> 724 static Promise<T> from_promise_actor(PromiseActor<T> &&from) { 725 return Promise<T>(td::make_unique<PromiseActor<T>>(std::move(from))); 726 } 727 }; 728 729 } // namespace td 730