1 // Copyright 2020 the V8 project authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef V8_LIBPLATFORM_DEFAULT_JOB_H_ 6 #define V8_LIBPLATFORM_DEFAULT_JOB_H_ 7 8 #include <atomic> 9 #include <memory> 10 11 #include "include/libplatform/libplatform-export.h" 12 #include "include/v8-platform.h" 13 #include "src/base/platform/condition-variable.h" 14 #include "src/base/platform/mutex.h" 15 16 namespace v8 { 17 namespace platform { 18 19 class V8_PLATFORM_EXPORT DefaultJobState 20 : public std::enable_shared_from_this<DefaultJobState> { 21 public: 22 class JobDelegate : public v8::JobDelegate { 23 public: 24 explicit JobDelegate(DefaultJobState* outer, bool is_joining_thread = false) outer_(outer)25 : outer_(outer), is_joining_thread_(is_joining_thread) {} 26 ~JobDelegate(); 27 NotifyConcurrencyIncrease()28 void NotifyConcurrencyIncrease() override { 29 outer_->NotifyConcurrencyIncrease(); 30 } ShouldYield()31 bool ShouldYield() override { 32 // Thread-safe but may return an outdated result. 33 return outer_->is_canceled_.load(std::memory_order_relaxed); 34 } 35 uint8_t GetTaskId() override; IsJoiningThread()36 bool IsJoiningThread() const override { return is_joining_thread_; } 37 38 private: 39 static constexpr uint8_t kInvalidTaskId = 40 std::numeric_limits<uint8_t>::max(); 41 42 DefaultJobState* outer_; 43 uint8_t task_id_ = kInvalidTaskId; 44 bool is_joining_thread_; 45 }; 46 47 DefaultJobState(Platform* platform, std::unique_ptr<JobTask> job_task, 48 TaskPriority priority, size_t num_worker_threads); 49 virtual ~DefaultJobState(); 50 51 void NotifyConcurrencyIncrease(); 52 uint8_t AcquireTaskId(); 53 void ReleaseTaskId(uint8_t task_id); 54 55 void Join(); 56 void CancelAndWait(); 57 void CancelAndDetach(); 58 bool IsActive(); 59 60 // Must be called before running |job_task_| for the first time. If it returns 61 // true, then the worker thread must contribute and must call DidRunTask(), or 62 // false if it should return. 63 bool CanRunFirstTask(); 64 // Must be called after running |job_task_|. Returns true if the worker thread 65 // must contribute again, or false if it should return. 66 bool DidRunTask(); 67 68 void UpdatePriority(TaskPriority); 69 70 private: 71 // Called from the joining thread. Waits for the worker count to be below or 72 // equal to max concurrency (will happen when a worker calls 73 // DidRunTask()). Returns true if the joining thread should run a task, or 74 // false if joining was completed and all other workers returned because 75 // there's no work remaining. 76 bool WaitForParticipationOpportunityLockRequired(); 77 78 // Returns GetMaxConcurrency() capped by the number of threads used by this 79 // job. 80 size_t CappedMaxConcurrency(size_t worker_count) const; 81 82 void CallOnWorkerThread(TaskPriority priority, std::unique_ptr<Task> task); 83 84 Platform* const platform_; 85 std::unique_ptr<JobTask> job_task_; 86 87 // All members below are protected by |mutex_|. 88 base::Mutex mutex_; 89 TaskPriority priority_; 90 // Number of workers running this job. 91 size_t active_workers_ = 0; 92 // Number of posted tasks that aren't running this job yet. 93 size_t pending_tasks_ = 0; 94 // Indicates if the job is canceled. 95 std::atomic_bool is_canceled_{false}; 96 // Number of worker threads available to schedule the worker task. 97 size_t num_worker_threads_; 98 // Signaled when a worker returns. 99 base::ConditionVariable worker_released_condition_; 100 101 std::atomic<uint32_t> assigned_task_ids_{0}; 102 }; 103 104 class V8_PLATFORM_EXPORT DefaultJobHandle : public JobHandle { 105 public: 106 explicit DefaultJobHandle(std::shared_ptr<DefaultJobState> state); 107 ~DefaultJobHandle() override; 108 109 DefaultJobHandle(const DefaultJobHandle&) = delete; 110 DefaultJobHandle& operator=(const DefaultJobHandle&) = delete; 111 NotifyConcurrencyIncrease()112 void NotifyConcurrencyIncrease() override { 113 state_->NotifyConcurrencyIncrease(); 114 } 115 116 void Join() override; 117 void Cancel() override; 118 void CancelAndDetach() override; 119 bool IsActive() override; IsValid()120 bool IsValid() override { return state_ != nullptr; } 121 UpdatePriorityEnabled()122 bool UpdatePriorityEnabled() const override { return true; } 123 124 void UpdatePriority(TaskPriority) override; 125 126 private: 127 std::shared_ptr<DefaultJobState> state_; 128 }; 129 130 class DefaultJobWorker : public Task { 131 public: DefaultJobWorker(std::weak_ptr<DefaultJobState> state,JobTask * job_task)132 DefaultJobWorker(std::weak_ptr<DefaultJobState> state, JobTask* job_task) 133 : state_(std::move(state)), job_task_(job_task) {} 134 ~DefaultJobWorker() override = default; 135 136 DefaultJobWorker(const DefaultJobWorker&) = delete; 137 DefaultJobWorker& operator=(const DefaultJobWorker&) = delete; 138 Run()139 void Run() override { 140 auto shared_state = state_.lock(); 141 if (!shared_state) return; 142 if (!shared_state->CanRunFirstTask()) return; 143 do { 144 // Scope of |delegate| must not outlive DidRunTask() so that associated 145 // state is freed before the worker becomes inactive. 146 DefaultJobState::JobDelegate delegate(shared_state.get()); 147 job_task_->Run(&delegate); 148 } while (shared_state->DidRunTask()); 149 } 150 151 private: 152 friend class DefaultJob; 153 154 std::weak_ptr<DefaultJobState> state_; 155 JobTask* job_task_; 156 }; 157 158 } // namespace platform 159 } // namespace v8 160 161 #endif // V8_LIBPLATFORM_DEFAULT_JOB_H_ 162