1 // 2 // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021 3 // 4 // Distributed under the Boost Software License, Version 1.0. (See accompanying 5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6 // 7 #pragma once 8 9 #include "td/actor/actor.h" 10 11 #include "td/utils/common.h" 12 #include "td/utils/port/thread.h" 13 #include "td/utils/Slice.h" 14 #include "td/utils/Time.h" 15 16 #if TD_PORT_WINDOWS 17 #include "td/utils/port/detail/Iocp.h" 18 #endif 19 20 #include <atomic> 21 #include <functional> 22 #include <mutex> 23 #include <utility> 24 25 namespace td { 26 27 class ConcurrentScheduler final : private Scheduler::Callback { 28 public: 29 void init(int32 threads_n); 30 finish_async()31 void finish_async() { 32 schedulers_[0]->finish(); 33 } wakeup()34 void wakeup() { 35 schedulers_[0]->wakeup(); 36 } get_main_guard()37 SchedulerGuard get_main_guard() { 38 return schedulers_[0]->get_guard(); 39 } 40 get_send_guard()41 SchedulerGuard get_send_guard() { 42 return schedulers_.back()->get_const_guard(); 43 } 44 45 void test_one_thread_run(); 46 is_finished()47 bool is_finished() const { 48 return is_finished_.load(std::memory_order_relaxed); 49 } 50 51 void start(); 52 run_main(double timeout)53 bool run_main(double timeout) { 54 return run_main(Timestamp::in(timeout)); 55 } 56 bool run_main(Timestamp timeout); 57 58 Timestamp get_main_timeout(); 59 static double emscripten_get_main_timeout(); 60 static void emscripten_clear_main_timeout(); 61 62 void finish(); 63 64 template <class ActorT, class... Args> create_actor_unsafe(int32 sched_id,Slice name,Args &&...args)65 ActorOwn<ActorT> create_actor_unsafe(int32 sched_id, Slice name, Args &&... args) { 66 #if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED 67 sched_id = 0; 68 #endif 69 CHECK(0 <= sched_id && sched_id < static_cast<int32>(schedulers_.size())); 70 auto guard = schedulers_[sched_id]->get_guard(); 71 return schedulers_[sched_id]->create_actor<ActorT>(name, std::forward<Args>(args)...); 72 } 73 74 template <class ActorT> register_actor_unsafe(int32 sched_id,Slice name,ActorT * actor)75 ActorOwn<ActorT> register_actor_unsafe(int32 sched_id, Slice name, ActorT *actor) { 76 #if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED 77 sched_id = 0; 78 #endif 79 CHECK(0 <= sched_id && sched_id < static_cast<int32>(schedulers_.size())); 80 auto guard = schedulers_[sched_id]->get_guard(); 81 return schedulers_[sched_id]->register_actor<ActorT>(name, actor); 82 } 83 84 private: 85 enum class State { Start, Run }; 86 State state_ = State::Start; 87 std::mutex at_finish_mutex_; 88 vector<std::function<void()>> at_finish_; // can be used during destruction by Scheduler destructors 89 vector<unique_ptr<Scheduler>> schedulers_; 90 std::atomic<bool> is_finished_{false}; 91 #if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED 92 vector<td::thread> threads_; 93 #endif 94 #if TD_PORT_WINDOWS 95 unique_ptr<detail::Iocp> iocp_; 96 td::thread iocp_thread_; 97 #endif 98 int32 extra_scheduler_ = 0; 99 100 void on_finish() final; 101 102 void register_at_finish(std::function<void()> f) final; 103 }; 104 105 } // namespace td 106