// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4 
5 #ifndef PLATFORM_IMPL_TASK_RUNNER_H_
6 #define PLATFORM_IMPL_TASK_RUNNER_H_
7 
8 #include <condition_variable>  // NOLINT
9 #include <map>
10 #include <memory>
11 #include <mutex>
12 #include <thread>
13 #include <utility>
14 #include <vector>
15 
16 #include "absl/base/thread_annotations.h"
17 #include "absl/types/optional.h"
18 #include "platform/api/task_runner.h"
19 #include "platform/api/time.h"
20 #include "platform/base/error.h"
21 #include "util/trace_logging.h"
22 
23 namespace openscreen {
24 
25 class TaskRunnerImpl final : public TaskRunner {
26  public:
27   using Task = TaskRunner::Task;
28 
29   class TaskWaiter {
30    public:
31     virtual ~TaskWaiter() = default;
32 
33     // These calls should be thread-safe.  The absolute minimum is that
34     // OnTaskPosted must be safe to call from another thread while this is
35     // inside WaitForTaskToBePosted.  NOTE: There may be spurious wakeups from
36     // WaitForTaskToBePosted depending on whether the specific implementation
37     // chooses to clear queued WakeUps before entering WaitForTaskToBePosted.
38 
39     // Blocks until some event occurs, which means new tasks may have been
40     // posted.  Wait may only block up to |timeout| where 0 means don't block at
41     // all (not block forever).
42     virtual Error WaitForTaskToBePosted(Clock::duration timeout) = 0;
43 
44     // If a WaitForTaskToBePosted call is currently blocking, unblock it
45     // immediately.
46     virtual void OnTaskPosted() = 0;
47   };
48 
49   explicit TaskRunnerImpl(
50       ClockNowFunctionPtr now_function,
51       TaskWaiter* event_waiter = nullptr,
52       Clock::duration waiter_timeout = std::chrono::milliseconds(100));
53 
54   // TaskRunner overrides
55   ~TaskRunnerImpl() final;
56   void PostPackagedTask(Task task) final;
57   void PostPackagedTaskWithDelay(Task task, Clock::duration delay) final;
58   bool IsRunningOnTaskRunner() final;
59 
60   // Blocks the current thread, executing tasks from the queue with the desired
61   // timing; and does not return until some time after RequestStopSoon() is
62   // called.
63   void RunUntilStopped();
64 
65   // Blocks the current thread, executing tasks from the queue with the desired
66   // timing; and does not return until some time after the current process is
67   // signaled with SIGINT or SIGTERM, or after RequestStopSoon() is called.
68   void RunUntilSignaled();
69 
70   // Thread-safe method for requesting the TaskRunner to stop running after all
71   // non-delayed tasks in the queue have run. This behavior allows final
72   // clean-up tasks to be executed before the TaskRunner stops.
73   //
74   // If any non-delayed tasks post additional non-delayed tasks, those will be
75   // run as well before returning.
76   void RequestStopSoon();
77 
78  private:
79 #if defined(ENABLE_TRACE_LOGGING)
80   // Wrapper around a Task used to store the TraceId Metadata along with the
81   // task itself, and to set the current TraceIdHierarchy before executing the
82   // task.
83   class TaskWithMetadata {
84    public:
85     // NOTE: 'explicit' keyword omitted so that conversion construtor can be
86     // used. This simplifies switching between 'Task' and 'TaskWithMetadata'
87     // based on the compilation flag.
TaskWithMetadata(Task task)88     TaskWithMetadata(Task task)  // NOLINT
89         : task_(std::move(task)), trace_ids_(TRACE_HIERARCHY) {}
90 
operator()91     void operator()() {
92       TRACE_SET_HIERARCHY(trace_ids_);
93       std::move(task_)();
94     }
95 
96    private:
97     Task task_;
98     TraceIdHierarchy trace_ids_;
99   };
100 #else   // !defined(ENABLE_TRACE_LOGGING)
101   using TaskWithMetadata = Task;
102 #endif  // defined(ENABLE_TRACE_LOGGING)
103 
104   // Helper that runs all tasks in |running_tasks_| and then clears it.
105   void RunRunnableTasks();
106 
107   // Look at all tasks in the delayed task queue, then schedule them if the
108   // minimum delay time has elapsed.
109   void ScheduleDelayedTasks();
110 
111   // Transfers all ready-to-run tasks from |tasks_| to |running_tasks_|. If
112   // there are no ready-to-run tasks, and |is_running_| is true, this method
113   // will block waiting for new tasks. Returns true if any tasks were
114   // transferred.
115   bool GrabMoreRunnableTasks();
116 
117   const ClockNowFunctionPtr now_function_;
118 
119   // Flag that indicates whether the task runner loop should continue. This is
120   // only meant to be read/written on the thread executing RunUntilStopped().
121   bool is_running_;
122 
123   // This mutex is used for |tasks_| and |delayed_tasks_|, and also for
124   // notifying the run loop to wake up when it is waiting for a task to be added
125   // to the queue in |run_loop_wakeup_|.
126   std::mutex task_mutex_;
127   std::vector<TaskWithMetadata> tasks_ GUARDED_BY(task_mutex_);
128   std::multimap<Clock::time_point, TaskWithMetadata> delayed_tasks_
129       GUARDED_BY(task_mutex_);
130 
131   // When |task_waiter_| is nullptr, |run_loop_wakeup_| is used for sleeping the
132   // task runner.  Otherwise, |run_loop_wakeup_| isn't used and |task_waiter_|
133   // is used instead (along with |waiter_timeout_|).
134   std::condition_variable run_loop_wakeup_;
135   TaskWaiter* const task_waiter_;
136   Clock::duration waiter_timeout_;
137 
138   // To prevent excessive re-allocation of the underlying array of the |tasks_|
139   // vector, use an A/B vector-swap mechanism. |running_tasks_| starts out
140   // empty, and is swapped with |tasks_| when it is time to run the Tasks.
141   std::vector<TaskWithMetadata> running_tasks_;
142 
143   std::thread::id task_runner_thread_id_;
144 
145   OSP_DISALLOW_COPY_AND_ASSIGN(TaskRunnerImpl);
146 };
147 }  // namespace openscreen
148 
149 #endif  // PLATFORM_IMPL_TASK_RUNNER_H_
150