1 //
2 // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 #pragma once
8 
9 #include "td/actor/actor.h"
10 
11 #include "td/utils/CancellationToken.h"
12 #include "td/utils/Closure.h"
13 #include "td/utils/common.h"
14 #include "td/utils/invoke.h"
15 #include "td/utils/MovableValue.h"
16 #include "td/utils/ScopeGuard.h"
17 #include "td/utils/Status.h"
18 
19 #include <tuple>
20 #include <type_traits>
21 #include <utility>
22 
23 namespace td {
24 
25 template <class T = Unit>
26 class PromiseInterface {
27  public:
28   PromiseInterface() = default;
29   PromiseInterface(const PromiseInterface &) = delete;
30   PromiseInterface &operator=(const PromiseInterface &) = delete;
31   PromiseInterface(PromiseInterface &&) = default;
32   PromiseInterface &operator=(PromiseInterface &&) = default;
33   virtual ~PromiseInterface() = default;
set_value(T && value)34   virtual void set_value(T &&value) {
35     set_result(std::move(value));
36   }
set_error(Status && error)37   virtual void set_error(Status &&error) {
38     set_result(std::move(error));
39   }
set_result(Result<T> && result)40   virtual void set_result(Result<T> &&result) {
41     if (result.is_ok()) {
42       set_value(result.move_as_ok());
43     } else {
44       set_error(result.move_as_error());
45     }
46   }
operator()47   void operator()(T &&value) {
48     set_value(std::move(value));
49   }
operator()50   void operator()(Status &&error) {
51     set_error(std::move(error));
52   }
operator()53   void operator()(Result<T> &&result) {
54     set_result(std::move(result));
55   }
is_cancellable()56   virtual bool is_cancellable() const {
57     return false;
58   }
is_canceled()59   virtual bool is_canceled() const {
60     return false;
61   }
62 
start_migrate(int32 sched_id)63   virtual void start_migrate(int32 sched_id) {
64   }
finish_migrate()65   virtual void finish_migrate() {
66   }
67 };
68 
69 namespace detail {
70 
71 template <typename T>
72 struct GetArg final : public GetArg<decltype(&T::operator())> {};
73 
74 template <class C, class R, class Arg>
75 class GetArg<R (C::*)(Arg)> {
76  public:
77   using type = Arg;
78 };
79 template <class C, class R, class Arg>
80 class GetArg<R (C::*)(Arg) const> {
81  public:
82   using type = Arg;
83 };
84 
85 template <class T>
86 using get_arg_t = std::decay_t<typename GetArg<T>::type>;
87 
88 template <class T>
89 struct DropResult {
90   using type = T;
91 };
92 
93 template <class T>
94 struct DropResult<Result<T>> {
95   using type = T;
96 };
97 
98 template <class T>
99 using drop_result_t = typename DropResult<T>::type;
100 
101 struct Ignore {
102   void operator()(Status &&error) {
103     error.ignore();
104   }
105 };
106 
107 template <class ValueT, class FunctionOkT, class FunctionFailT = Ignore>
108 class LambdaPromise : public PromiseInterface<ValueT> {
109   enum class OnFail { None, Ok, Fail };
110 
111  public:
112   void set_value(ValueT &&value) override {
113     CHECK(has_lambda_.get());
114     do_ok(ok_, std::move(value));
115     on_fail_ = OnFail::None;
116   }
117   void set_error(Status &&error) override {
118     CHECK(has_lambda_.get());
119     do_error(std::move(error));
120   }
121   LambdaPromise(const LambdaPromise &other) = delete;
122   LambdaPromise &operator=(const LambdaPromise &other) = delete;
123   LambdaPromise(LambdaPromise &&other) = default;
124   LambdaPromise &operator=(LambdaPromise &&other) = default;
125   ~LambdaPromise() override {
126     if (has_lambda_.get()) {
127       do_error(Status::Error("Lost promise"));
128     }
129   }
130 
131   template <class FromOkT, class FromFailT>
132   LambdaPromise(FromOkT &&ok, FromFailT &&fail, bool use_ok_as_fail)
133       : ok_(std::forward<FromOkT>(ok))
134       , fail_(std::forward<FromFailT>(fail))
135       , on_fail_(use_ok_as_fail ? OnFail::Ok : OnFail::Fail)
136       , has_lambda_(true) {
137   }
138   template <class FromOkT, std::enable_if_t<!std::is_same<std::decay_t<FromOkT>, LambdaPromise>::value, int> = 0>
139   LambdaPromise(FromOkT &&ok) : LambdaPromise(std::forward<FromOkT>(ok), Ignore(), true) {
140   }
141 
142  private:
143   FunctionOkT ok_;
144   FunctionFailT fail_;
145   OnFail on_fail_ = OnFail::None;
146   MovableValue<bool> has_lambda_{false};
147 
148   void do_error(Status &&error) {
149     switch (on_fail_) {
150       case OnFail::None:
151         break;
152       case OnFail::Ok:
153         do_error(ok_, std::move(error));
154         break;
155       case OnFail::Fail:
156         do_error(fail_, std::move(error));
157         break;
158     }
159     on_fail_ = OnFail::None;
160   }
161 
162   template <class F>
163   std::enable_if_t<is_callable<F, Result<ValueT>>::value, void> do_error(F &&f, Status &&status) {
164     f(Result<ValueT>(std::move(status)));
165   }
166   template <class Y, class F>
167   std::enable_if_t<!is_callable<F, Result<ValueT>>::value, void> do_error(F &&f, Y &&status) {
168     f(Auto());
169   }
170   template <class F>
171   std::enable_if_t<is_callable<F, Result<ValueT>>::value, void> do_ok(F &&f, ValueT &&result) {
172     f(Result<ValueT>(std::move(result)));
173   }
174   template <class F>
175   std::enable_if_t<!is_callable<F, Result<ValueT>>::value, void> do_ok(F &&f, ValueT &&result) {
176     f(std::move(result));
177   }
178 };
179 }  // namespace detail
180 
181 template <class T>
182 class SafePromise;
183 
184 template <class T = Unit>
185 class Promise;
186 
187 constexpr std::false_type is_promise_interface(...);
188 
189 template <class T>
190 constexpr std::true_type is_promise_interface(const PromiseInterface<T> &promise);
191 
192 template <class T>
193 constexpr std::true_type is_promise_interface(const Promise<T> &promise);
194 
195 template <class F>
196 constexpr bool is_promise_interface() {
197   return decltype(is_promise_interface(std::declval<F>()))::value;
198 }
199 
200 constexpr std::false_type is_promise_interface_ptr(...);
201 
202 template <class T>
203 constexpr std::true_type is_promise_interface_ptr(const unique_ptr<T> &promise);
204 
205 template <class F>
206 constexpr bool is_promise_interface_ptr() {
207   return decltype(is_promise_interface_ptr(std::declval<F>()))::value;
208 }
209 
210 template <class T = void, class F = void, std::enable_if_t<std::is_same<T, void>::value, bool> has_t = false>
211 auto lambda_promise(F &&f) {
212   return detail::LambdaPromise<detail::drop_result_t<detail::get_arg_t<std::decay_t<F>>>, std::decay_t<F>>(
213       std::forward<F>(f));
214 }
215 template <class T = void, class F = void, std::enable_if_t<!std::is_same<T, void>::value, bool> has_t = true>
216 auto lambda_promise(F &&f) {
217   return detail::LambdaPromise<T, std::decay_t<F>>(std::forward<F>(f));
218 }
219 
220 template <class T, class F, std::enable_if_t<is_promise_interface<F>(), bool> from_promise_interface = true>
221 auto &&promise_interface(F &&f) {
222   return std::forward<F>(f);
223 }
224 
225 template <class T, class F, std::enable_if_t<!is_promise_interface<F>(), bool> from_promise_interface = false>
226 auto promise_interface(F &&f) {
227   return lambda_promise<T>(std::forward<F>(f));
228 }
229 
230 template <class T, class F, std::enable_if_t<is_promise_interface_ptr<F>(), bool> from_promise_interface = true>
231 auto promise_interface_ptr(F &&f) {
232   return std::forward<F>(f);
233 }
234 template <class T, class F, std::enable_if_t<!is_promise_interface_ptr<F>(), bool> from_promise_interface = false>
235 auto promise_interface_ptr(F &&f) {
236   return td::make_unique<std::decay_t<decltype(promise_interface<T>(std::forward<F>(f)))>>(
237       promise_interface<T>(std::forward<F>(f)));
238 }
239 
240 template <class T>
241 class Promise {
242  public:
243   void set_value(T &&value) {
244     if (!promise_) {
245       return;
246     }
247     promise_->set_value(std::move(value));
248     promise_.reset();
249   }
250   void set_error(Status &&error) {
251     if (!promise_) {
252       return;
253     }
254     promise_->set_error(std::move(error));
255     promise_.reset();
256   }
257   void set_result(Result<T> &&result) {
258     if (!promise_) {
259       return;
260     }
261     promise_->set_result(std::move(result));
262     promise_.reset();
263   }
264   template <class S>
265   void operator()(S &&result) {
266     if (!promise_) {
267       return;
268     }
269     promise_->operator()(std::forward<S>(result));
270     promise_.reset();
271   }
272   void reset() {
273     promise_.reset();
274   }
275   void start_migrate(int32 sched_id) {
276     if (!promise_) {
277       return;
278     }
279     promise_->start_migrate(sched_id);
280   }
281   void finish_migrate() {
282     if (!promise_) {
283       return;
284     }
285     promise_->finish_migrate();
286   }
287   bool is_cancellable() const {
288     if (!promise_) {
289       return false;
290     }
291     return promise_->is_cancellable();
292   }
293   bool is_canceled() const {
294     if (!promise_) {
295       return false;
296     }
297     return promise_->is_canceled();
298   }
299   unique_ptr<PromiseInterface<T>> release() {
300     return std::move(promise_);
301   }
302 
303   Promise() = default;
304   explicit Promise(unique_ptr<PromiseInterface<T>> promise) : promise_(std::move(promise)) {
305   }
306   Promise(Auto) {
307   }
308   Promise(SafePromise<T> &&other);
309   Promise &operator=(SafePromise<T> &&other);
310   template <class F, std::enable_if_t<!std::is_same<std::decay_t<F>, Promise>::value, int> = 0>
311   Promise(F &&f) : promise_(promise_interface_ptr<T>(std::forward<F>(f))) {
312   }
313 
314   explicit operator bool() {
315     return static_cast<bool>(promise_);
316   }
317 
318  private:
319   unique_ptr<PromiseInterface<T>> promise_;
320 };
321 
322 template <class T>
323 void start_migrate(Promise<T> &promise, int32 sched_id) {
324   // promise.start_migrate(sched_id);
325 }
326 template <class T>
327 void finish_migrate(Promise<T> &promise) {
328   // promise.finish_migrate();
329 }
330 
331 template <class T = Unit>
332 class SafePromise {
333  public:
334   SafePromise(Promise<T> promise, Result<T> result) : promise_(std::move(promise)), result_(std::move(result)) {
335   }
336   SafePromise(const SafePromise &other) = delete;
337   SafePromise &operator=(const SafePromise &other) = delete;
338   SafePromise(SafePromise &&other) = default;
339   SafePromise &operator=(SafePromise &&other) = default;
340   ~SafePromise() {
341     if (promise_) {
342       promise_.set_result(std::move(result_));
343     }
344   }
345   Promise<T> release() {
346     return std::move(promise_);
347   }
348 
349  private:
350   Promise<T> promise_;
351   Result<T> result_;
352 };
353 
354 template <class T>
355 Promise<T>::Promise(SafePromise<T> &&other) : Promise(other.release()) {
356 }
357 template <class T>
358 Promise<T> &Promise<T>::operator=(SafePromise<T> &&other) {
359   *this = other.release();
360   return *this;
361 }
362 
363 namespace detail {
364 
365 class EventPromise final : public PromiseInterface<Unit> {
366  public:
367   void set_value(Unit &&) final {
368     ok_.try_emit();
369     fail_.clear();
370   }
371   void set_error(Status &&) final {
372     do_set_error();
373   }
374 
375   EventPromise(const EventPromise &other) = delete;
376   EventPromise &operator=(const EventPromise &other) = delete;
377   EventPromise(EventPromise &&other) = delete;
378   EventPromise &operator=(EventPromise &&other) = delete;
379   ~EventPromise() final {
380     do_set_error();
381   }
382 
383   EventPromise() = default;
384   explicit EventPromise(EventFull ok) : ok_(std::move(ok)), use_ok_as_fail_(true) {
385   }
386   EventPromise(EventFull ok, EventFull fail) : ok_(std::move(ok)), fail_(std::move(fail)), use_ok_as_fail_(false) {
387   }
388 
389  private:
390   EventFull ok_;
391   EventFull fail_;
392   bool use_ok_as_fail_ = false;
393   void do_set_error() {
394     if (use_ok_as_fail_) {
395       ok_.try_emit();
396     } else {
397       ok_.clear();
398       fail_.try_emit();
399     }
400   }
401 };
402 
403 template <class PromiseT>
404 class CancellablePromise final : public PromiseT {
405  public:
406   template <class... ArgsT>
407   CancellablePromise(CancellationToken cancellation_token, ArgsT &&... args)
408       : PromiseT(std::forward<ArgsT>(args)...), cancellation_token_(std::move(cancellation_token)) {
409   }
410   bool is_cancellable() const final {
411     return true;
412   }
413   bool is_canceled() const final {
414     return static_cast<bool>(cancellation_token_);
415   }
416 
417  private:
418   CancellationToken cancellation_token_;
419 };
420 
421 template <class... ArgsT>
422 class JoinPromise final : public PromiseInterface<Unit> {
423  public:
424   explicit JoinPromise(ArgsT &&... arg) : promises_(std::forward<ArgsT>(arg)...) {
425   }
426   void set_value(Unit &&) final {
427     tuple_for_each(promises_, [](auto &promise) { promise.set_value(Unit()); });
428   }
429   void set_error(Status &&error) final {
430     tuple_for_each(promises_, [&error](auto &promise) { promise.set_error(error.clone()); });
431   }
432 
433  private:
434   std::tuple<std::decay_t<ArgsT>...> promises_;
435 };
436 }  // namespace detail
437 
438 class SendClosure {
439  public:
440   template <class... ArgsT>
441   void operator()(ArgsT &&... args) const {
442     send_closure(std::forward<ArgsT>(args)...);
443   }
444 };
445 
446 //template <class T>
447 //template <class... ArgsT>
448 //auto Promise<T>::send_closure(ArgsT &&... args) {
449 //  return [promise = std::move(*this), t = std::make_tuple(std::forward<ArgsT>(args)...)](auto &&r_res) mutable {
450 //    TRY_RESULT_PROMISE(promise, res, std::move(r_res));
451 //    td2::call_tuple(SendClosure(), std::tuple_cat(std::move(t), std::make_tuple(std::move(res), std::move(promise))));
452 //  };
453 //}
454 
455 template <class... ArgsT>
456 auto promise_send_closure(ArgsT &&... args) {
457   return [t = std::make_tuple(std::forward<ArgsT>(args)...)](auto &&res) mutable {
458     call_tuple(SendClosure(), std::tuple_cat(std::move(t), std::make_tuple(std::forward<decltype(res)>(res))));
459   };
460 }
461 
462 /*** FutureActor and PromiseActor ***/
463 template <class T>
464 class FutureActor;
465 
466 template <class T>
467 class PromiseActor;
468 
469 template <class T>
470 class ActorTraits<FutureActor<T>> {
471  public:
472   static constexpr bool need_context = false;
473   static constexpr bool need_start_up = false;
474 };
475 
476 template <class T>
477 class PromiseActor final : public PromiseInterface<T> {
478   friend class FutureActor<T>;
479   enum State { Waiting, Hangup };
480 
481  public:
482   PromiseActor() = default;
483   PromiseActor(const PromiseActor &other) = delete;
484   PromiseActor &operator=(const PromiseActor &other) = delete;
485   PromiseActor(PromiseActor &&) = default;
486   PromiseActor &operator=(PromiseActor &&) = default;
487   ~PromiseActor() final {
488     close();
489   }
490 
491   void set_value(T &&value) final;
492   void set_error(Status &&error) final;
493 
494   void close() {
495     future_id_.reset();
496   }
497 
498   // NB: if true is returned no further events will be sent
499   bool is_hangup() {
500     if (state_ == State::Hangup) {
501       return true;
502     }
503     if (!future_id_.is_alive()) {
504       state_ = State::Hangup;
505       future_id_.release();
506       event_.clear();
507       return true;
508     }
509     return false;
510   }
511 
512   template <class S>
513   friend void init_promise_future(PromiseActor<S> *promise, FutureActor<S> *future);
514 
515   bool empty_promise() {
516     return future_id_.empty();
517   }
518   bool empty() {
519     return future_id_.empty();
520   }
521 
522  private:
523   ActorOwn<FutureActor<T>> future_id_;
524   EventFull event_;
525   State state_ = State::Hangup;
526 
527   void init() {
528     state_ = State::Waiting;
529     event_.clear();
530   }
531 };
532 
533 template <class T>
534 class FutureActor final : public Actor {
535   friend class PromiseActor<T>;
536 
537  public:
538   enum State { Waiting, Ready };
539 
540   static constexpr int HANGUP_ERROR_CODE = 426487;
541 
542   FutureActor() = default;
543 
544   FutureActor(const FutureActor &other) = delete;
545   FutureActor &operator=(const FutureActor &other) = delete;
546 
547   FutureActor(FutureActor &&other) = default;
548   FutureActor &operator=(FutureActor &&other) = default;
549 
550   ~FutureActor() final = default;
551 
552   bool is_ok() const {
553     return is_ready() && result_.is_ok();
554   }
555   bool is_error() const {
556     CHECK(is_ready());
557     return is_ready() && result_.is_error();
558   }
559   T move_as_ok() {
560     return move_as_result().move_as_ok();
561   }
562   Status move_as_error() TD_WARN_UNUSED_RESULT {
563     return move_as_result().move_as_error();
564   }
565   Result<T> move_as_result() TD_WARN_UNUSED_RESULT {
566     CHECK(is_ready());
567     SCOPE_EXIT {
568       do_stop();
569     };
570     return std::move(result_);
571   }
572   bool is_ready() const {
573     return !empty() && state_ == State::Ready;
574   }
575 
576   void close() {
577     event_.clear();
578     result_.clear();
579     do_stop();
580   }
581 
582   void set_event(EventFull &&event) {
583     CHECK(!empty());
584     event_ = std::move(event);
585     if (state_ != State::Waiting) {
586       event_.try_emit_later();
587     }
588   }
589 
590   State get_state() const {
591     return state_;
592   }
593 
594   template <class S>
595   friend void init_promise_future(PromiseActor<S> *promise, FutureActor<S> *future);
596 
597  private:
598   EventFull event_;
599   Result<T> result_ = Status::Error(500, "Empty FutureActor");
600   State state_ = State::Waiting;
601 
602   void set_value(T &&value) {
603     set_result(std::move(value));
604   }
605 
606   void set_error(Status &&error) {
607     set_result(std::move(error));
608   }
609 
610   void set_result(Result<T> &&result) {
611     CHECK(state_ == State::Waiting);
612     result_ = std::move(result);
613     state_ = State::Ready;
614 
615     event_.try_emit_later();
616   }
617 
618   void hangup() final {
619     set_error(Status::Error<HANGUP_ERROR_CODE>());
620   }
621 
622   void start_up() final {
623     // empty
624   }
625 
626   void init() {
627     CHECK(empty());
628     state_ = State::Waiting;
629     event_.clear();
630   }
631 };
632 
633 template <class T>
634 void PromiseActor<T>::set_value(T &&value) {
635   if (state_ == State::Waiting && !future_id_.empty()) {
636     send_closure(std::move(future_id_), &FutureActor<T>::set_value, std::move(value));
637   }
638 }
639 template <class T>
640 void PromiseActor<T>::set_error(Status &&error) {
641   if (state_ == State::Waiting && !future_id_.empty()) {
642     send_closure(std::move(future_id_), &FutureActor<T>::set_error, std::move(error));
643   }
644 }
645 
646 template <class S>
647 void init_promise_future(PromiseActor<S> *promise, FutureActor<S> *future) {
648   promise->init();
649   future->init();
650   promise->future_id_ = register_actor("FutureActor", future);
651 
652   CHECK(future->get_info() != nullptr);
653 }
654 
655 template <class T>
656 class PromiseFuture {
657  public:
658   PromiseFuture() {
659     init_promise_future(&promise_, &future_);
660   }
661   PromiseActor<T> &promise() {
662     return promise_;
663   }
664   FutureActor<T> &future() {
665     return future_;
666   }
667   PromiseActor<T> &&move_promise() {
668     return std::move(promise_);
669   }
670   FutureActor<T> &&move_future() {
671     return std::move(future_);
672   }
673 
674  private:
675   PromiseActor<T> promise_;
676   FutureActor<T> future_;
677 };
678 
679 template <ActorSendType send_type, class T, class ActorAT, class ActorBT, class ResultT, class... DestArgsT,
680           class... ArgsT>
681 FutureActor<T> send_promise(ActorId<ActorAT> actor_id, ResultT (ActorBT::*func)(PromiseActor<T> &&, DestArgsT...),
682                             ArgsT &&... args) {
683   PromiseFuture<T> pf;
684   Scheduler::instance()->send_closure<send_type>(
685       std::move(actor_id), create_immediate_closure(func, pf.move_promise(), std::forward<ArgsT>(args)...));
686   return pf.move_future();
687 }
688 
689 class PromiseCreator {
690  public:
691   using Ignore = detail::Ignore;
692 
693   template <class OkT, class ArgT = detail::drop_result_t<detail::get_arg_t<OkT>>>
694   static Promise<ArgT> lambda(OkT &&ok) {
695     return Promise<ArgT>(td::make_unique<detail::LambdaPromise<ArgT, std::decay_t<OkT>>>(std::forward<OkT>(ok)));
696   }
697 
698   template <class OkT, class FailT, class ArgT = detail::get_arg_t<OkT>>
699   static Promise<ArgT> lambda(OkT &&ok, FailT &&fail) {
700     return Promise<ArgT>(td::make_unique<detail::LambdaPromise<ArgT, std::decay_t<OkT>, std::decay_t<FailT>>>(
701         std::forward<OkT>(ok), std::forward<FailT>(fail), false));
702   }
703 
704   template <class OkT, class ArgT = detail::drop_result_t<detail::get_arg_t<OkT>>>
705   static auto cancellable_lambda(CancellationToken cancellation_token, OkT &&ok) {
706     return Promise<ArgT>(td::make_unique<detail::CancellablePromise<detail::LambdaPromise<ArgT, std::decay_t<OkT>>>>(
707         std::move(cancellation_token), std::forward<OkT>(ok)));
708   }
709 
710   static Promise<> event(EventFull &&ok) {
711     return Promise<>(td::make_unique<detail::EventPromise>(std::move(ok)));
712   }
713 
714   static Promise<> event(EventFull ok, EventFull fail) {
715     return Promise<>(td::make_unique<detail::EventPromise>(std::move(ok), std::move(fail)));
716   }
717 
718   template <class... ArgsT>
719   static Promise<> join(ArgsT &&... args) {
720     return Promise<>(td::make_unique<detail::JoinPromise<ArgsT...>>(std::forward<ArgsT>(args)...));
721   }
722 
723   template <class T>
724   static Promise<T> from_promise_actor(PromiseActor<T> &&from) {
725     return Promise<T>(td::make_unique<PromiseActor<T>>(std::move(from)));
726   }
727 };
728 
729 }  // namespace td
730