1 //
2 // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 #pragma once
8 
9 #include "td/actor/actor.h"
10 
11 #include "td/utils/common.h"
12 #include "td/utils/port/thread.h"
13 #include "td/utils/Slice.h"
14 #include "td/utils/Time.h"
15 
16 #if TD_PORT_WINDOWS
17 #include "td/utils/port/detail/Iocp.h"
18 #endif
19 
20 #include <atomic>
21 #include <functional>
22 #include <mutex>
23 #include <utility>
24 
25 namespace td {
26 
27 class ConcurrentScheduler final : private Scheduler::Callback {
28  public:
29   void init(int32 threads_n);
30 
finish_async()31   void finish_async() {
32     schedulers_[0]->finish();
33   }
wakeup()34   void wakeup() {
35     schedulers_[0]->wakeup();
36   }
get_main_guard()37   SchedulerGuard get_main_guard() {
38     return schedulers_[0]->get_guard();
39   }
40 
get_send_guard()41   SchedulerGuard get_send_guard() {
42     return schedulers_.back()->get_const_guard();
43   }
44 
45   void test_one_thread_run();
46 
is_finished()47   bool is_finished() const {
48     return is_finished_.load(std::memory_order_relaxed);
49   }
50 
51   void start();
52 
run_main(double timeout)53   bool run_main(double timeout) {
54     return run_main(Timestamp::in(timeout));
55   }
56   bool run_main(Timestamp timeout);
57 
58   Timestamp get_main_timeout();
59   static double emscripten_get_main_timeout();
60   static void emscripten_clear_main_timeout();
61 
62   void finish();
63 
64   template <class ActorT, class... Args>
create_actor_unsafe(int32 sched_id,Slice name,Args &&...args)65   ActorOwn<ActorT> create_actor_unsafe(int32 sched_id, Slice name, Args &&... args) {
66 #if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
67     sched_id = 0;
68 #endif
69     CHECK(0 <= sched_id && sched_id < static_cast<int32>(schedulers_.size()));
70     auto guard = schedulers_[sched_id]->get_guard();
71     return schedulers_[sched_id]->create_actor<ActorT>(name, std::forward<Args>(args)...);
72   }
73 
74   template <class ActorT>
register_actor_unsafe(int32 sched_id,Slice name,ActorT * actor)75   ActorOwn<ActorT> register_actor_unsafe(int32 sched_id, Slice name, ActorT *actor) {
76 #if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
77     sched_id = 0;
78 #endif
79     CHECK(0 <= sched_id && sched_id < static_cast<int32>(schedulers_.size()));
80     auto guard = schedulers_[sched_id]->get_guard();
81     return schedulers_[sched_id]->register_actor<ActorT>(name, actor);
82   }
83 
84  private:
85   enum class State { Start, Run };
86   State state_ = State::Start;
87   std::mutex at_finish_mutex_;
88   vector<std::function<void()>> at_finish_;  // can be used during destruction by Scheduler destructors
89   vector<unique_ptr<Scheduler>> schedulers_;
90   std::atomic<bool> is_finished_{false};
91 #if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
92   vector<td::thread> threads_;
93 #endif
94 #if TD_PORT_WINDOWS
95   unique_ptr<detail::Iocp> iocp_;
96   td::thread iocp_thread_;
97 #endif
98   int32 extra_scheduler_ = 0;
99 
100   void on_finish() final;
101 
102   void register_at_finish(std::function<void()> f) final;
103 };
104 
105 }  // namespace td
106