1 // Copyright 2020 the V8 project authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef V8_LIBPLATFORM_DEFAULT_JOB_H_
6 #define V8_LIBPLATFORM_DEFAULT_JOB_H_
7 
#include <atomic>
#include <limits>
#include <memory>
#include <utility>

#include "include/libplatform/libplatform-export.h"
#include "include/v8-platform.h"
#include "src/base/platform/condition-variable.h"
#include "src/base/platform/mutex.h"
15 
16 namespace v8 {
17 namespace platform {
18 
19 class V8_PLATFORM_EXPORT DefaultJobState
20     : public std::enable_shared_from_this<DefaultJobState> {
21  public:
22   class JobDelegate : public v8::JobDelegate {
23    public:
24     explicit JobDelegate(DefaultJobState* outer, bool is_joining_thread = false)
outer_(outer)25         : outer_(outer), is_joining_thread_(is_joining_thread) {}
26     ~JobDelegate();
27 
NotifyConcurrencyIncrease()28     void NotifyConcurrencyIncrease() override {
29       outer_->NotifyConcurrencyIncrease();
30     }
ShouldYield()31     bool ShouldYield() override {
32       // Thread-safe but may return an outdated result.
33       return outer_->is_canceled_.load(std::memory_order_relaxed);
34     }
35     uint8_t GetTaskId() override;
IsJoiningThread()36     bool IsJoiningThread() const override { return is_joining_thread_; }
37 
38    private:
39     static constexpr uint8_t kInvalidTaskId =
40         std::numeric_limits<uint8_t>::max();
41 
42     DefaultJobState* outer_;
43     uint8_t task_id_ = kInvalidTaskId;
44     bool is_joining_thread_;
45   };
46 
47   DefaultJobState(Platform* platform, std::unique_ptr<JobTask> job_task,
48                   TaskPriority priority, size_t num_worker_threads);
49   virtual ~DefaultJobState();
50 
51   void NotifyConcurrencyIncrease();
52   uint8_t AcquireTaskId();
53   void ReleaseTaskId(uint8_t task_id);
54 
55   void Join();
56   void CancelAndWait();
57   void CancelAndDetach();
58   bool IsActive();
59 
60   // Must be called before running |job_task_| for the first time. If it returns
61   // true, then the worker thread must contribute and must call DidRunTask(), or
62   // false if it should return.
63   bool CanRunFirstTask();
64   // Must be called after running |job_task_|. Returns true if the worker thread
65   // must contribute again, or false if it should return.
66   bool DidRunTask();
67 
68   void UpdatePriority(TaskPriority);
69 
70  private:
71   // Called from the joining thread. Waits for the worker count to be below or
72   // equal to max concurrency (will happen when a worker calls
73   // DidRunTask()). Returns true if the joining thread should run a task, or
74   // false if joining was completed and all other workers returned because
75   // there's no work remaining.
76   bool WaitForParticipationOpportunityLockRequired();
77 
78   // Returns GetMaxConcurrency() capped by the number of threads used by this
79   // job.
80   size_t CappedMaxConcurrency(size_t worker_count) const;
81 
82   void CallOnWorkerThread(TaskPriority priority, std::unique_ptr<Task> task);
83 
84   Platform* const platform_;
85   std::unique_ptr<JobTask> job_task_;
86 
87   // All members below are protected by |mutex_|.
88   base::Mutex mutex_;
89   TaskPriority priority_;
90   // Number of workers running this job.
91   size_t active_workers_ = 0;
92   // Number of posted tasks that aren't running this job yet.
93   size_t pending_tasks_ = 0;
94   // Indicates if the job is canceled.
95   std::atomic_bool is_canceled_{false};
96   // Number of worker threads available to schedule the worker task.
97   size_t num_worker_threads_;
98   // Signaled when a worker returns.
99   base::ConditionVariable worker_released_condition_;
100 
101   std::atomic<uint32_t> assigned_task_ids_{0};
102 };
103 
104 class V8_PLATFORM_EXPORT DefaultJobHandle : public JobHandle {
105  public:
106   explicit DefaultJobHandle(std::shared_ptr<DefaultJobState> state);
107   ~DefaultJobHandle() override;
108 
109   DefaultJobHandle(const DefaultJobHandle&) = delete;
110   DefaultJobHandle& operator=(const DefaultJobHandle&) = delete;
111 
NotifyConcurrencyIncrease()112   void NotifyConcurrencyIncrease() override {
113     state_->NotifyConcurrencyIncrease();
114   }
115 
116   void Join() override;
117   void Cancel() override;
118   void CancelAndDetach() override;
119   bool IsActive() override;
IsValid()120   bool IsValid() override { return state_ != nullptr; }
121 
UpdatePriorityEnabled()122   bool UpdatePriorityEnabled() const override { return true; }
123 
124   void UpdatePriority(TaskPriority) override;
125 
126  private:
127   std::shared_ptr<DefaultJobState> state_;
128 };
129 
130 class DefaultJobWorker : public Task {
131  public:
DefaultJobWorker(std::weak_ptr<DefaultJobState> state,JobTask * job_task)132   DefaultJobWorker(std::weak_ptr<DefaultJobState> state, JobTask* job_task)
133       : state_(std::move(state)), job_task_(job_task) {}
134   ~DefaultJobWorker() override = default;
135 
136   DefaultJobWorker(const DefaultJobWorker&) = delete;
137   DefaultJobWorker& operator=(const DefaultJobWorker&) = delete;
138 
Run()139   void Run() override {
140     auto shared_state = state_.lock();
141     if (!shared_state) return;
142     if (!shared_state->CanRunFirstTask()) return;
143     do {
144       // Scope of |delegate| must not outlive DidRunTask() so that associated
145       // state is freed before the worker becomes inactive.
146       DefaultJobState::JobDelegate delegate(shared_state.get());
147       job_task_->Run(&delegate);
148     } while (shared_state->DidRunTask());
149   }
150 
151  private:
152   friend class DefaultJob;
153 
154   std::weak_ptr<DefaultJobState> state_;
155   JobTask* job_task_;
156 };
157 
158 }  // namespace platform
159 }  // namespace v8
160 
161 #endif  // V8_LIBPLATFORM_DEFAULT_JOB_H_
162